.NETで名前付きパイプを試す(4) - 複数のクライアントに対応したサーバにする

もう少しNamedPipeClientStreamとNamedPipeServerStreamを試してみます。

前回のサーバは1つのクライアントの1つのリクエストを処理するだけでプログラムが終了しています。

ichiroku11.hatenablog.jp

これではさすがにサーバとは言えないと思うので、もう少しサーバっぽくしたいと思います。具体的には次のことに対応していきます。

  • 1つのサーバで複数のクライアントからのリクエストを処理できるようにする
  • 複数のリクエストを並列処理できるようにする

また前回までは「プロセス間」通信ということを確認したかったのでプロジェクトを分けましたが、今回は実行結果がわかりやすいように1つのプロジェクトにしてスレッド間で試しています。

複数のクライアントに対応したサーバ

まずはサーバのプログラムを終了しないようにして、複数のクライアントからのリクエストを処理できるようにします。

Serverメソッド内の処理全体をwhile文で囲んでいます。当然なのかもしれませんが、クライアントの NamedPipeClientStreamで接続が閉じられるとサーバのNamedPipeServerStreamも接続が閉じられるようで、NamedPipeServerStreamのインスタンスはwhile文の中で都度生成しています。ストリームのインスタンスは使い回すものじゃないのかなと。

serverIdは識別用です。後半でサーバを複数作るときに使います。

// サーバ
private static async Task Server(int serverId, Func<Message, Message> process) {
    while (true) {
        using (var stream = new NamedPipeServerStream("testpipe")) {
            // クライアントからの接続を待つ
            Console.WriteLine($"Server#{serverId} waiting");
            await stream.WaitForConnectionAsync();
            Console.WriteLine($"Server#{serverId} connected");

            // クライアントからリクエストを受信する
            var request = default(Message);
            using (var reader = new BinaryReader(stream, Encoding.UTF8, true)) {
                request = reader.ReadObject<Message>();
            }
            Console.WriteLine($"Server#{serverId} {nameof(request)}: {request}");

            // リクエストを処理してレスポンスを作る
            var response = process(request);

            // クライアントにレスポンスを送信する
            Console.WriteLine($"Server#{serverId} {nameof(response)}: {response}");
            using (var writer = new BinaryWriter(stream, Encoding.UTF8, true)) {
                writer.WriteObject(response);
            }
        }
    }
}

// サーバでの処理(Serverメソッドの引数に渡す)
// Contentの文字列を逆順にする処理
private static Message ServerProcess(Message message) {
    return new Message {
        Id = message.Id,
        Content = new string(message.Content.Reverse().ToArray()),
    };
}

複数のクライアントから接続

続いてクライアントです。clientIdは識別用です。

// クライアント
private static async Task<Message> Client(int clientId, Message request) {
    using (var stream = new NamedPipeClientStream("testpipe")) {
        // サーバに接続
        Console.WriteLine($"Client#{clientId} connecting");
        await stream.ConnectAsync();
        Console.WriteLine($"Client#{clientId} connected");

        // サーバにリクエストを送信する
        Console.WriteLine($"Client#{clientId} {nameof(request)}: {request}");
        using (var writer = new BinaryWriter(stream, Encoding.UTF8, true)) {
            writer.WriteObject(request);
        }

        // サーバからレスポンスを受信する
        var response = default(Message);
        using (var reader = new BinaryReader(stream, Encoding.UTF8, true)) {
            response = reader.ReadObject<Message>();
        }
        Console.WriteLine($"Client#{clientId} {nameof(response)}: {response}");

        return response;
    }
}

実行する

クライアントを2つとサーバを1つ動かしてみます。

サーバが1つなのでクライアントは同時に接続できていないことが確認できます。サーバではリクエストを1つずつ順に処理していることもわかります。

static void Main(string[] args) {
    Task.WaitAll(new[] {
        // クライアント
        Client(1, new Message { Id = 10, Content = "あいうえお", }),
        Client(2, new Message { Id = 20, Content = "かきくけこ", }),
        // サーバ
        Server(1, ServerProcess),
    });
}

// 実行結果(実行するごとに変わる部分もありますが)
/*
Client#1 connecting
Client#2 connecting
Server#1 waiting
Client#2 connected // Client#2が接続(Client#1はまだ接続待ち)
Server#1 connected
Client#2 request: { Id = 20, Content = "かきくけこ" }
Server#1 request: { Id = 20, Content = "かきくけこ" }
Server#1 response: { Id = 20, Content = "こけくきか" }
Client#2 response: { Id = 20, Content = "こけくきか" }
Server#1 waiting
Server#1 connected
Client#1 connected // ここでやっとClient#1が接続できる
Client#1 request: { Id = 10, Content = "あいうえお" }
Server#1 request: { Id = 10, Content = "あいうえお" }
Server#1 response: { Id = 10, Content = "おえういあ" }
Client#1 response: { Id = 10, Content = "おえういあ" }
Server#1 waiting
*/

複数のリクエストを並列に処理するサーバ

さらにサーバでは複数のリクエストを並列に処理できるようにしてみます。

NamedPipeServerStreamのコンストラクタにはサーバインスタンスの最大数を指定できるオーバーロードがあります。

このコンストラクタを使って複数のサーバインスタンスを用意できるように修正します。今回は最大数を2にしてみます。(ちなみに上記のサーバはサーバインスタンスの最大数は1でした。)

// サーバ
private static async Task Server(int serverId, Func<Message, Message> process) {
    while (true) {
        // サーバインスタンスを2に
        // using (var stream = new NamedPipeServerStream("testpipe")) {
        using (var stream = new NamedPipeServerStream("testpipe", PipeDirection.InOut, 2)) {
            // この部分は上記のServerメソッドと同じなので省略
        }
    }
}

実行する

クライアントとサーバを2つずつ動かしてみます。それぞれのサーバでリクエストが処理されていることを確認できます。

static void Main(string[] args) {
    Task.WaitAll(new[] {
        // クライアント
        Client(1, new Message { Id = 10, Content = "あいうえお", }),
        Client(2, new Message { Id = 20, Content = "かきくけこ", }),
        // サーバ
        Server(1, ServerProcess),
        Server(2, ServerProcess),
    });
}

// 実行結果(実行するごとに変わる部分もありますが)
/*
Client#1 connecting
Client#2 connecting
Server#1 waiting
Client#1 connected // Client#1が接続
Server#1 connected
Server#2 waiting
Server#2 connected
Client#2 connected // Client#2が接続
Client#2 request: { Id = 20, Content = "かきくけこ" }
Server#2 request: { Id = 20, Content = "かきくけこ" }
Server#2 response: { Id = 20, Content = "こけくきか" }
Client#2 response: { Id = 20, Content = "こけくきか" }
Server#2 waiting
Client#1 request: { Id = 10, Content = "あいうえお" }
Server#1 request: { Id = 10, Content = "あいうえお" }
Server#1 response: { Id = 10, Content = "おえういあ" }
Client#1 response: { Id = 10, Content = "おえういあ" }
Server#1 waiting
*/

これでNamedPipeClientStreamとNamedPipeServerStreamの基本的な使い方は把握できたかなーと思います。

ソース一式はこちらに。

.NETで名前付きパイプを試す · GitHub

おしまい。