gRPCサービスのクライアントストリーミング・メソッドのサンプルを作成した。
(注意:2024年1月15日更新→.NET5.0はサポート終了しました。サンプルコードは.NET8.0に更新しました)
機能はファイルアップロードだ。
これまでの gRPC のサンプルソースにクライアントストリーミング・メソッドを追加した。
以下がサンプルコードへのGitHubリンクになる。
gRPCサービス側
WPFクライアント側
先日紹介したが、テスト用に自由にファイルサイズを指定してテスト用のバイナリファイルを作成するツールを作成した。
以下がそのツールのソースと実行ファイルである。
コンソールで、
dummyfile testfile.bin /s:1M /d
と入力すると「testfile.bin」というダミーファイルを作成する。
/s はファイルサイズを指定し、/d はファイル内部にダミーデータを書き込む事を指定する。
ダミーデータは、0x01から0xFFまでの数値を繰り返し書き込む。
全て .NET6.0 で稼働する。
サンプルコードの機能と使い方
今回、追加した機能はこれまでと同じように既存のサンプルに追加して作成している。
gRPCサービス側を起動しておき、WPFクライアント側を起動して操作する。
画面全体は以下のようになる。
その内、今回追加したのが以下の部分だ。
「アップロード」ボタンをクリックするとオープンダイアログが出て、ファイルを指定するとアップロード処理が始まる。
「キャンセル」ボタンをクリックするとアップロードを中断する。
gRPCサービスの解説
追加したコードの解説をする。
[greet.proto]の編集
[greet.proto]へ追加したのは以下のコードになる。
service Greeter {
// Client Streaming
rpc FileUpload(stream FileUploadStream) returns (FileUploadResponse);
}
クライアントストリーミング・メソッドは引数の指定に「stream」を宣言する。
返り値「returns」の側は単項メソッドと同じように指定するだけだ。
追加した「message」は以下になる。
// Client Streaming
message FileUploadStream {
string fileName = 1;
int64 fileSize = 2;
bytes binary = 3;
}
// Client Response
message FileUploadResponse {
string fileName = 1;
string result = 2;
}
こちらの宣言の仕方は単項メソッドと変わらない。
FileUploadStream の「bytes binary」はアップロードするファイルを分割してダウンロードするためのパケットの格納容器になる。
クライアントストリーミングでは stream FileUploadStream を繰り返しサーバーへ送信する事で、大量のデータを継続的回線で送信する。
実際の処理もループ処理になっている。
[GreeterService.cs]への編集
rpc FileUpload に対応するメソッドは以下になる。
/// <summary>
/// ファイルアップロード(クライアント ストリーミング メソッド)
/// </summary>
/// <param name="requestStream">ストリーミング</param>
/// <param name="context"></param>
/// <returns></returns>
public override async Task<FileUploadResponse> FileUpload(
IAsyncStreamReader<FileUploadStream> requestStream, ServerCallContext context)
{
string fileName = null;
string result = "Untreated";
FileStream fs = null;
try
{
await foreach (var message in requestStream.ReadAllAsync())
{
if(fs == null)
{
fileName = message.FileName;
fs = new FileStream(fileName, FileMode.Create, FileAccess.Write);
}
byte[] bin = message.Binary.ToByteArray();
int size = (int)message.FileSize;
fs.Write(bin, 0, size);
}
result = "Success";
}
catch (Exception ex)
{
result = ex.Message;
}
finally
{
if (fs != null)
{
fs.Close();
fs.Dispose();
}
}
return new FileUploadResponse() { FileName = fileName, Result = result };
}
クライアントストリーミングは、クライアントからループ処理で送られてくるパケットを、引数の
IAsyncStreamReader<FileUploadStream> requestStream
から受け取るだけの処理となる。
よって、requestStream からパケットを読み取るループ処理だけになる。
await foreach (var message in requestStream.ReadAllAsync())
requestStream.ReadAllAsync() は、クライアントからのストリーミングを非同期に一括で受け取りコレクションで保持する。
それを foreach のループで取得する。
このサーバー側処理では、受け取ったパケットを FileStream でサーバー側のストレージに書き込み保存している。
結果は、
return new FileUploadResponse() { FileName = fileName, Result = result };
で、クライアントへ返している。
それだけだ。
サーバー側の処理はとても単純だ。
WPFクライアント側のコード解説
この画面のXAMLは以下のようになる。
[MainWindow.xaml]
<StackPanel Orientation="Vertical" Grid.Row="13" Height="30" VerticalAlignment="Top" Background="Silver">
<StackPanel Orientation="Horizontal" Height="25" VerticalAlignment="Top" Background="Silver">
<Button x:Name="FileUploadButton" Content="ファイル アップロード" Width="100" Margin="30,0,0,0" VerticalAlignment="Center" Click="FileUploadButton_Click"/>
<Button x:Name="CancelUploadButton" Content="キャンセル" Width="100" Margin="30,0,0,0" Click="CancelUploadButton_Click" IsEnabled="False" VerticalAlignment="Center"/>
<TextBlock x:Name="xUploadMessage" Text="" Width="550" Margin="10,0,0,0" TextAlignment="Left" VerticalAlignment="Center"/>
</StackPanel>
</StackPanel>
FileUploadButton でオープンファイルダイアログを開き、ファイルを選ぶと、アップロード処理を実行する。
FileUploadButton_Click がそのイベントメソッドである。
CancelUploadButton でキャンセルフラグをTrueにする。
CancelUploadButton_Click がそのイベントメソッドである。
xUploadMessage に処理状況を表示する。
メッセージボックスの代りである。
次にコードビハインドの追加コードを以下に示す。
[MainWindow.xaml.cs]
/// <summary>
/// キャンセルした
/// </summary>
private bool _uploadCanceled = false;
/// <summary>
/// ファイルアップロード
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private async void FileUploadButton_Click(object sender, RoutedEventArgs e)
{
AsyncClientStreamingCall<FileUploadStream, FileUploadResponse> _callFileUpload;
string filePath;
string fileName;
const int BufferSize = 10240;
byte[] bin = new byte[BufferSize];
this._uploadCanceled = false;
OpenFileDialog openFileDialog = new OpenFileDialog();
var dlg = openFileDialog.ShowDialog();
if(dlg == false)
{
return;
}
filePath = openFileDialog.FileName;
fileName = System.IO.Path.GetFileName(filePath);
this.FileUploadButton.IsEnabled = false;
this.CancelUploadButton.IsEnabled = true;
this.xUploadMessage.Text = fileName + " アップロード中";
// gRPC メッセージ 宣言
FileUploadStream fileUploadStream = new FileUploadStream();
fileUploadStream.FileName = fileName;
fileUploadStream.FileSize = BufferSize;
// gRPC サービスを呼び出す。
_callFileUpload = this.grpcClient.GreeterClient.FileUpload();
using (var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read))
{
int sendSize = 0;
int readSize = 0;
while((readSize = fs.Read(bin, 0, BufferSize)) > 0)
{
if (this._uploadCanceled)
{
break;
}
fileUploadStream.Binary = Google.Protobuf.ByteString.CopyFrom(bin);
await _callFileUpload.RequestStream.WriteAsync(fileUploadStream);
await Task.Delay(TimeSpan.FromSeconds(1));
this.xUploadMessage.Text = fileName + " アップロード中 / Send Byte=" + (sendSize += readSize);
}
await _callFileUpload.RequestStream.CompleteAsync();
}
this.FileUploadButton.IsEnabled = true;
this.CancelUploadButton.IsEnabled = false;
if (this._uploadCanceled)
{
this.xUploadMessage.Text = "キャンセルしました";
}
else
{
this.xUploadMessage.Text = "Result = " + _callFileUpload.ResponseAsync.Result.Result.ToString();
}
}
/// <summary>
/// ファイルアップロードのキャンセル
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void CancelUploadButton_Click(object sender, RoutedEventArgs e)
{
this._uploadCanceled = true;
}
ファイルアップロードのメイン処理は FileUploadButton_Click になる。
gRPCメソッドの呼び出し方は単項メソッドとは異なり、呼び出しでは引数を指定しない。
// gRPC サービスを呼び出す。
_callFileUpload = this.grpcClient.GreeterClient.FileUpload();
この処理では BufferSize のバイト数ごとに、指定したファイルをループで読み込み、送信用のパケットを作成する。
そのパケットを同一ループで、以下の処理により、サーバーへ送信する。
fileUploadStream.Binary = Google.Protobuf.ByteString.CopyFrom(bin);
await _callFileUpload.RequestStream.WriteAsync(fileUploadStream);
await Task.Delay(TimeSpan.FromSeconds(1));
Task.Delay は一秒間スリープしているだけで、無くても良い。
キャンセルを取得する為に付けている。
WriteAsync メソッドで、サーバーへパケットを送っている。
ループの度に内容が変わるのは fileUploadStream.Binary だけなので、ループの中ではこれしか更新していない。
他はループの外で設定した。
// gRPC メッセージ 宣言
FileUploadStream fileUploadStream = new FileUploadStream();
fileUploadStream.FileName = fileName;
fileUploadStream.FileSize = BufferSize;
これは、内容次第でループの中で設定しても良い。
クライアントストリーミングが終了したら、サーバーへ終了を通知してやる必要がある。
await _callFileUpload.RequestStream.CompleteAsync();
FileStream の使い方の解説は省略する。
キャンセル処理は単純に「フラグ」を立てて、ループの中でそのフラグを確認して、立っていたら終了する。
クライアント側で制御できるので、中断処理は単純だ。
private void CancelUploadButton_Click(object sender, RoutedEventArgs e)
{
this._uploadCanceled = true;
}
while((readSize = fs.Read(bin, 0, BufferSize)) > 0)
{
if (this._uploadCanceled)
{
break;
}
private bool _uploadCanceled = false;
クライアントストリーミングのサンプルでした
サーバーストリーミングより簡単だったと思う。
お役に立てば幸いだ。
次は双方向ストリーミングを作る予定だ。
電話は難しいので、
単純にクライアントから送られたバイナリデータを、簡単に加工して、送り返す処理にしようと思っている。