gRPCサービスのサーバーストリーミング・メソッドのサンプルを作成した。
(注意:2024年1月15日更新→.NET5.0はサポート終了しました。サンプルコードは.NET8.0に更新しました)
機能はファイルダウンロードだ。
これまでの gRPC のサンプルソースにサーバーストリーミング・メソッドを追加した。
以下がサンプルコードへのGitHubリンクになる。
gRPCサービス側
WPFクライアント側
また、テスト用に自由にファイルサイズを指定してテスト用のバイナリファイルを作成するツールを作成した。
以下がそのツールのソースと実行ファイルである。
非常に簡単なコマンドなので誰でもすぐに作れると思うが、いちいち自分で作る必要もないかと思うので共有する。
コンソールで、
dummyfile testfile.bin /s:1M /d
と入力すると「testfile.bin」というダミーファイルを作成する。
/s はファイルサイズを指定し、/d はファイル内部にダミーデータを書き込む事を指定する。
ダミーデータは、0x01から0xFFまでの数値を繰り返し書き込む。
全て .NET Core6.0 で稼働する。
サンプルコードの機能と使い方
今回、追加した機能はこれまでと同じように既存のサンプルに追加して作成している。
gRPCサービス側を起動しておき、WPFクライアント側を起動して操作する。
画面全体は以下のようになる。
その内、今回追加したのが以下の部分だ。
「ダウンロードファイル名」でgRPCサービス側(サーバー側)のファイル名を指定する。
「ダウンロード」ボタンをクリックすると保存ダイアログが出て、保存先を指定するとダウンロード処理が始まる。
「キャンセル」ボタンをクリックするとダウンロードを中断する。
gRPCサービスの解説
追加したコードの解説をする。
[greet.proto]の編集
[greet.proto]へ追加したのは以下のコードになる。
service Greeter {
//中略
// Server streaming
rpc FileDownload(FileDownloadRequest) returns (stream FileDownloadStream);
}
サーバーストリーミング・メソッドは返り値「returns」の指定に「stream」を宣言する。
呼び出し側はコールに必要なパラメータを単項メソッドと同じように指定するだけだ。
追加した「message」は以下になる。
// Call Server streaming
message FileDownloadRequest {
string fileName = 1;
}
// Server streaming
message FileDownloadStream {
string fileName = 1;
int64 fileSize = 2;
bytes binary = 3;
}
こちらの宣言の仕方は単項メソッドと変わらない。
FileDownloadStream の「bytes binary」はダウンロードするファイルを分割してダウンロードするためのパケットの格納容器になる。
サーバーストリーミングでは stream FileDownloadStream を繰り返しクライアントへ送信する事で、大量のデータを継続的回線で送信する。
実際の処理もループ処理になっている。
サーバーからクライアントへのストリーミング送信がサーバーストリーミング。
クライアントからサーバーへのストリーミング送信がクライアントストリーミング。
[GreeterService.cs]への編集
rpc FileDownload に対応するメソッドは以下になる。
/// <summary>
/// ファイルダウンロード(サーバー ストリーミング メソッド)
/// </summary>
/// <param name="request"></param>
/// <param name="responseStream">ストリーミング</param>
/// <param name="context"></param>
/// <returns></returns>
public override async Task FileDownload(FileDownloadRequest request,
IServerStreamWriter<FileDownloadStream> responseStream, ServerCallContext context)
{
const int BufferSize = 10240;
byte[] buffer = new byte[BufferSize];
string currentDir = Directory.GetCurrentDirectory();
Console.WriteLine("$CurrentDirectory = {0}", currentDir);
using(var fs = new FileStream(request.FileName, FileMode.Open, FileAccess.Read))
{
int downloadSize = 0;
int readSize = 0;
while ((readSize = fs.Read(buffer, 0, BufferSize)) > 0)
{
Console.WriteLine("ダウンロード リクエスト");
//クライアントからキャンセルされたら終了する。
if (context.CancellationToken.IsCancellationRequested)
{
Console.WriteLine("キャンセル リクエスト");
break;
}
FileDownloadStream fileDownloadStream = new FileDownloadStream();
fileDownloadStream.Binary = Google.Protobuf.ByteString.CopyFrom(buffer);
fileDownloadStream.FileName = request.FileName;
fileDownloadStream.FileSize = readSize;
await responseStream.WriteAsync(fileDownloadStream);
await Task.Delay(TimeSpan.FromSeconds(1));
//await Task.Delay(TimeSpan.FromSeconds(1), context.CancellationToken);
downloadSize += readSize;
Console.WriteLine("{0}byte ダウンロード", downloadSize);
}
}
}
Directory.GetCurrentDirectory() は、サービス側のどこのフォルダーで稼働しているか表示する為に呼び出しているだけなので、本番では必要無い処理だ。
request.FileName でクライアントから要求ファイルの名称が送られてくるので、これをFileStreamで開いて読み込んでいる。
while ((readSize = fs.Read(buffer, 0, BufferSize)) > 0) で、BufferSizeで示されたバイト数ごとに、要求ファイルをバッファに読み込むループを回す。
FileDownloadStream fileDownloadStream に、gRPCで送信するパケットを作成して、
await responseStream.WriteAsync(fileDownloadStream);
で、クライアントへ送信する。
await Task.Delay(TimeSpan.FromSeconds(1));
Task.Delay はパケットを一回送信するごとに、一秒待機している。
必ずしも必要ではないのだが、これが無いとキャンセルを検知できない。
一秒も待たせる必要はないので、実用処理ではもっと短い時間を指定すると良い。
ちなみにその下の
//await Task.Delay(TimeSpan.FromSeconds(1), context.CancellationToken);
に入れ替えてやると、キャンセル時に例外インスタンスを生成してくれる。
以下の条件文がクライアントから送信される「キャンセル」イベントを検出している。
if (context.CancellationToken.IsCancellationRequested)
downloadSize += readSize; はダウンロードの進捗を表示する為の処理で、実用処理では必要無い。
全ての Console.WriteLine も実用処理では必要無い。
WPFクライアント側のコード解説
この画面のXAMLは以下のようになる。
[MainWindow.xaml]
<StackPanel Orientation="Vertical" Grid.Row="12" Height="30" VerticalAlignment="Top" Background="PeachPuff">
<StackPanel Orientation="Horizontal" Height="25" VerticalAlignment="Top" Background="PeachPuff">
<TextBlock Text="ダウンロードファイル名" Width="125" TextAlignment="Left" VerticalAlignment="Center" />
<TextBox x:Name="xDownloadFileNameTextBox" Width="450" TextAlignment="Left" Text=".TestDatatestfile.bin" VerticalAlignment="Center" />
<Button x:Name="FileDownloadButton" Content="ダウンロード" Width="100" Margin="30,0,0,0" VerticalAlignment="Center" Click="FileDownloadButton_Click"/>
<Button x:Name="CancelButton" Content="キャンセル" Width="100" Margin="30,0,0,0" Click="CancelButton_Click" IsEnabled="False" VerticalAlignment="Center"/>
<TextBlock x:Name="xDownloadMessage" Text="" Width="250" Margin="10,0,0,0" TextAlignment="Left" VerticalAlignment="Center"/>
</StackPanel>
</StackPanel>
xDownloadFileNameTextBox にダウンロードするサービス側のファイル名を指定する。
FileDownloadButton でダウンロード処理を実行する。
FileDownloadButton_Click がそのイベントメソッドである。
CancelButton でキャンセルイベントをサービスへ送る。
CancelButton_Click がそのイベントメソッドである。
xDownloadMessage に処理状況を表示する。
メッセージボックスの代りである。
次にコードビハインドの追加コードを以下に示す。
[MainWindow.xaml.cs]
/// <summary>
/// ファイルダウンロードのgRPCサービスメソッドのインスタンス
/// </summary>
private AsyncServerStreamingCall<FileDownloadStream> _responseFileDownload;
/// <summary>
/// ファイルダウンロード
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private async void FileDownloadButton_Click(object sender, RoutedEventArgs e)
{
string fileName = System.IO.Path.GetFileName(this.xDownloadFileNameTextBox.Text);
SaveFileDialog saveFileDialog = new SaveFileDialog();
saveFileDialog.Filter = "txt files (*.txt)|*.txt";
saveFileDialog.FileName = fileName;
if (this.xDownloadFileNameTextBox.Text.Length == 0)
{
return;
}
if (saveFileDialog.ShowDialog() == false)
return;
this.FileDownloadButton.IsEnabled = false;
this.CancelButton.IsEnabled = true;
this.xDownloadMessage.Text = fileName + " ダウンロード中";
FileDownloadRequest fileDownloadRequest = new FileDownloadRequest();
fileDownloadRequest.FileName = this.xDownloadFileNameTextBox.Text;
// gRPC サービスを呼び出す。
//var response = this.grpcClient.GreeterClient.FileDownload(fileDownloadRequest);
this._responseFileDownload = this.grpcClient.GreeterClient.FileDownload(fileDownloadRequest);
try
{
using (var fs = new FileStream(saveFileDialog.FileName, FileMode.Create, FileAccess.Write))
{
int offset = 0;
while (await this._responseFileDownload?.ResponseStream.MoveNext())
{
var reply = this._responseFileDownload.ResponseStream.Current;
byte[] bin = reply.Binary.ToByteArray();
fs.Write(bin, offset, (int)reply.FileSize);
}
}
this.xDownloadMessage.Text = "";
}
catch (RpcException rpcEx) when (rpcEx.StatusCode == StatusCode.Cancelled)
{
this.xDownloadMessage.Text = "ダウンロードをキャンセルしました。";
}
this.FileDownloadButton.IsEnabled = true;
this.CancelButton.IsEnabled = false;
}
/// <summary>
/// ファイルダウンロードのキャンセル
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void CancelButton_Click(object sender, RoutedEventArgs e)
{
if (this._responseFileDownload == null)
return;
this._responseFileDownload.Dispose();
}
これまでのサンプルでは、gRPCメソッドのインスタンスはメソッドのローカルスコープに宣言していたが、これはクラスのプロパティにインスタンスを持つ。
private AsyncServerStreamingCall<FileDownloadStream> _responseFileDownload;
なぜかと言えば、このインスタンスは非同期で実行され、ストリーミング処理である為、単項メソッドのようにコールして直ぐに終わらないからである。
呼び出してから非同期バックグラウンドで処理が終わるまで、常駐する。
だからインスタンスへの参照をプロパティに持つ。
インスタンスへの参照を失っても動作するが、「キャンセル」イベントを呼び出す時にこのインスタンスへの参照が必要なので、プロパティに持っている。
先に「キャンセル」イベントの呼び出し方を説明するが、ストリーミング処理のインスタンスである「_responseFileDownload」の「Dispose()」を呼び出す事で、gRPCサービスへ「キャンセル」イベントを送る事ができる。
private void CancelButton_Click(object sender, RoutedEventArgs e)
{
if (this._responseFileDownload == null)
return;
this._responseFileDownload.Dispose();
}
this._responseFileDownload への参照が無ければ「キャンセル」イベントは呼び出せないわけだ。
ファイルダウンロードのメイン処理は FileDownloadButton_Click になる。
gRPCメソッドの呼び出し方は単項メソッドと同じだ。
this._responseFileDownload = this.grpcClient.GreeterClient.FileDownload(fileDownloadRequest);
fileDownloadRequest には呼び出すファイル名だけが格納されている。
非同期処理なので結果は直ぐに帰ってくる。
次に FileStream でダウンロードパケットを書き込むファイルを作成して開き、gRPCサービスから送られるパケットループと同様に、パケットループでファイルにパケットの中身を書きこむ。
using (var fs = new FileStream(saveFileDialog.FileName, FileMode.Create, FileAccess.Write))
{
int offset = 0;
while (await this._responseFileDownload?.ResponseStream.MoveNext())
{
var reply = this._responseFileDownload.ResponseStream.Current;
byte[] bin = reply.Binary.ToByteArray();
fs.Write(bin, offset, (int)reply.FileSize);
}
}
await this._responseFileDownload?.ResponseStream.MoveNext()
は、サーバーストリーミングの次のパケットを呼び出す要求を出す。
次が無ければ FALSE を返す。
var reply = this._responseFileDownload.ResponseStream.Current;
で、次のパケットのレスポンスを受け取り、
byte[] bin = reply.Binary.ToByteArray();
で、gRPCバイナリ型を byte 型配列に変換する。
それを fs.write でファイルに書き込む。
これをパケットの数だけ繰り返す。
これが主なストリーミング処理の中身である。
「キャンセル」イベントの取得の為、例外インスタンスを捉えている。
catch (RpcException rpcEx) when (rpcEx.StatusCode == StatusCode.Cancelled)
RpcException は Grpc.Core のメンバである。
gRPCの例外インスタンスを纏めている。
rpcEx.StatusCode の値がキャンセルを示す「StatusCode.Cancelled」であれば、キャンセルが要求された事を意味するので、処理を中断する。
ストリーミング処理は非同期バックグラウンドで稼働しているので、CancelButton_Click の処理と、FileDownloadButton_Click の処理は互いに非同期で独立して動作しているため、キャンセルボタンをクリックしたイベントは別途 FileDownloadButton_Click 側で捉えなければならない。
だからこんな処理になる。
サーバーストリーミングのサンプルでした
以上でサンプルの解説を終わる。
ストリーミング処理は要するに非同期のパケットループ処理なので、分かってしまうと簡単だ。
しかし、最初に分からない状態で作ると、それなりに悩む時間を消費する。
このサンプルで無駄な時間を節約できれば幸いだ。
次はクライアントストリーミングを作る予定だ。
ファイルアップロードになると思う。
双方向ストリーミングはどうするか悩んでいる。
何を作れば良いか分からないからだ。
電話でも作るか ?