添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
Collectives™ on Stack Overflow

Find centralized, trusted content and collaborate around the technologies you use most.

Learn more about Collectives

Teams

Q&A for work

Connect and share knowledge within a single location that is structured and easy to search.

Learn more about Teams

Does anyone have an example of using a NetMQPoller in conjunction with sending messages from an asynchronous task? i.e.

// 1. Receive a message from one of many clients
// 2. Handle client requests on multiple async Task's (may be file bound)
// 3. Respond to clients when long running Task finishes

We are currently using a very barebones Dealer/Router setup for multiple clients to talk to a single server. On the server we pass each client request on to a pool of Tasks/Threads that handle the request, generate a response, and then attempt to respond to the original client. Because I'm new to this I didn't realize calling the client on a different Task/Thread would bork things but it makes sense now.

What I'm looking for is the right way to handle this. I could definitely have a BlockingCollection of messages to send back and just service that from the original thread, but I recently discovered the NetMQPoller and it sort of sounds like it has some of this built in. I am not 100% sure if it can help me send messages on the original thread or not but there are a lot of parts to it that sound promising. Can anyone point me at an example of sending messages back to the Dealer/client from the Router/server on a different thread than the original message was received on?

UPDATE I think I may have sort-of solved my own question by combining

  • Router-Dealer example with the
  • WithPoller unit test from NetMQUnitTests.cs
  •     class Program
            static void Main( string[] args )
                // NOTES
                // 1. Use ThreadLocal<DealerSocket> where each thread has
                //    its own client DealerSocket to talk to server
                // 2. Each thread can send using it own socket
                // 3. Each thread socket is added to poller
                const int delay = 3000; // millis
                var clientSocketPerThread = new ThreadLocal<DealerSocket>();
                using( var server = new RouterSocket("@tcp://127.0.0.1:5556") )
                using( var queue = new NetMQQueue<MyResponse>() )
                using( var poller = new NetMQPoller { queue } )
                    // Start some threads, each with its own DealerSocket
                    // to talk to the server socket. Creates lots of sockets,
                    // but no nasty race conditions no shared state, each
                    // thread has its own socket, happy days.
                    for( int i = 0; i < 3; i++ )
                        Task.Factory.StartNew(state =>
                            DealerSocket client = null;
                            if( !clientSocketPerThread.IsValueCreated )
                                client = new DealerSocket();
                                client.Options.Identity =
                                    Encoding.Unicode.GetBytes(state.ToString());
                                client.Connect("tcp://127.0.0.1:5556");
                                client.ReceiveReady += Client_ReceiveReady;
                                clientSocketPerThread.Value = client;
                                poller.Add(client);
                                client = clientSocketPerThread.Value;
                            while( true )
                                var messageToServer = new NetMQMessage();
                                messageToServer.AppendEmptyFrame();
                                messageToServer.Append(state.ToString());
                                Console.WriteLine("======================================");
                                Console.WriteLine(" OUTGOING MESSAGE TO SERVER ");
                                Console.WriteLine("======================================");
                                PrintFrames("Client Sending", messageToServer);
                                client.SendMultipartMessage(messageToServer);
                                Thread.Sleep(delay);
                        }, string.Format("client {0}", i), TaskCreationOptions.LongRunning);
                    queue.ReceiveReady += ( sender, e ) =>
                        var queueItem = e.Queue.Dequeue();
                        var messageToClient = new NetMQMessage();
                        messageToClient.Append(queueItem.ClientId);
                        messageToClient.AppendEmptyFrame();
                        messageToClient.Append(queueItem.MessageToClient);
                        server.SendMultipartMessage(messageToClient);
                    // start the poller
                    poller.RunAsync();
                    // server loop
                    while( true )
                        var clientMessage = server.ReceiveMultipartMessage();
                        Console.WriteLine("======================================");
                        Console.WriteLine(" INCOMING CLIENT MESSAGE FROM CLIENT ");
                        Console.WriteLine("======================================");
                        PrintFrames("Server receiving", clientMessage);
                        if( clientMessage.FrameCount == 3 )
                            var clientAddress = clientMessage[0];
                            var clientOriginalMessage = clientMessage[2].ConvertToString();
                            string response = string.Format("{0} back from server {1}",
                                clientOriginalMessage, DateTime.Now.ToLongTimeString());
                            Task.Factory.StartNew(async() =>
                                await Task.Delay(200);
                                var clientResponse = new MyResponse()
                                    ClientId = clientAddress,
                                    MessageToClient = response
                                queue.Enqueue(clientResponse);
                            }, TaskCreationOptions.LongRunning);
            static void PrintFrames( string operationType, NetMQMessage message )
                for( int i = 0; i < message.FrameCount; i++ )
                    Console.WriteLine("{0} Socket : Frame[{1}] = {2}", operationType, i,
                        message[i].ConvertToString());
            static void Client_ReceiveReady( object sender, NetMQSocketEventArgs e )
                bool hasmore = false;
                e.Socket.ReceiveFrameString(out hasmore);
                if( hasmore )
                    string result = e.Socket.ReceiveFrameString(out hasmore);
                    Console.WriteLine("REPLY {0}", result);
            class MyResponse
                public NetMQFrame ClientId;
                public string MessageToClient;
            

    Thanks for contributing an answer to Stack Overflow!

    • Please be sure to answer the question. Provide details and share your research!

    But avoid

    • Asking for help, clarification, or responding to other answers.
    • Making statements based on opinion; back them up with references or personal experience.

    To learn more, see our tips on writing great answers.