Collectives™ on Stack Overflow
Find centralized, trusted content and collaborate around the technologies you use most.
Learn more about Collectives
Teams
Q&A for work
Connect and share knowledge within a single location that is structured and easy to search.
Learn more about Teams
Does anyone have an example of using a NetMQPoller in conjunction with sending messages from an asynchronous task?
i.e.
// 1. Receive a message from one of many clients
// 2. Handle client requests on multiple async Task's (may be file bound)
// 3. Respond to clients when long running Task finishes
We are currently using a very barebones Dealer/Router setup for multiple clients to talk to a single server. On the server we pass each client request on to a pool of Tasks/Threads that handle the request, generate a response, and then attempt to respond to the original client. Because I'm new to this I didn't realize calling the client on a different Task/Thread would bork things but it makes sense now.
What I'm looking for is the right way to handle this. I could definitely have a BlockingCollection of messages to send back and just service that from the original thread, but I recently discovered the NetMQPoller and it sort of sounds like it has some of this built in. I am not 100% sure if it can help me send messages on the original thread or not but there are a lot of parts to it that sound promising. Can anyone point me at an example of sending messages back to the Dealer/client from the Router/server on a different thread than the original message was received on?
UPDATE
I think I may have sort-of solved my own question by combining
Router-Dealer example with the
WithPoller
unit test from NetMQUnitTests.cs
class Program
static void Main( string[] args )
// NOTES
// 1. Use ThreadLocal<DealerSocket> where each thread has
// its own client DealerSocket to talk to server
// 2. Each thread can send using it own socket
// 3. Each thread socket is added to poller
const int delay = 3000; // millis
var clientSocketPerThread = new ThreadLocal<DealerSocket>();
using( var server = new RouterSocket("@tcp://127.0.0.1:5556") )
using( var queue = new NetMQQueue<MyResponse>() )
using( var poller = new NetMQPoller { queue } )
// Start some threads, each with its own DealerSocket
// to talk to the server socket. Creates lots of sockets,
// but no nasty race conditions no shared state, each
// thread has its own socket, happy days.
for( int i = 0; i < 3; i++ )
Task.Factory.StartNew(state =>
DealerSocket client = null;
if( !clientSocketPerThread.IsValueCreated )
client = new DealerSocket();
client.Options.Identity =
Encoding.Unicode.GetBytes(state.ToString());
client.Connect("tcp://127.0.0.1:5556");
client.ReceiveReady += Client_ReceiveReady;
clientSocketPerThread.Value = client;
poller.Add(client);
client = clientSocketPerThread.Value;
while( true )
var messageToServer = new NetMQMessage();
messageToServer.AppendEmptyFrame();
messageToServer.Append(state.ToString());
Console.WriteLine("======================================");
Console.WriteLine(" OUTGOING MESSAGE TO SERVER ");
Console.WriteLine("======================================");
PrintFrames("Client Sending", messageToServer);
client.SendMultipartMessage(messageToServer);
Thread.Sleep(delay);
}, string.Format("client {0}", i), TaskCreationOptions.LongRunning);
queue.ReceiveReady += ( sender, e ) =>
var queueItem = e.Queue.Dequeue();
var messageToClient = new NetMQMessage();
messageToClient.Append(queueItem.ClientId);
messageToClient.AppendEmptyFrame();
messageToClient.Append(queueItem.MessageToClient);
server.SendMultipartMessage(messageToClient);
// start the poller
poller.RunAsync();
// server loop
while( true )
var clientMessage = server.ReceiveMultipartMessage();
Console.WriteLine("======================================");
Console.WriteLine(" INCOMING CLIENT MESSAGE FROM CLIENT ");
Console.WriteLine("======================================");
PrintFrames("Server receiving", clientMessage);
if( clientMessage.FrameCount == 3 )
var clientAddress = clientMessage[0];
var clientOriginalMessage = clientMessage[2].ConvertToString();
string response = string.Format("{0} back from server {1}",
clientOriginalMessage, DateTime.Now.ToLongTimeString());
Task.Factory.StartNew(async() =>
await Task.Delay(200);
var clientResponse = new MyResponse()
ClientId = clientAddress,
MessageToClient = response
queue.Enqueue(clientResponse);
}, TaskCreationOptions.LongRunning);
static void PrintFrames( string operationType, NetMQMessage message )
for( int i = 0; i < message.FrameCount; i++ )
Console.WriteLine("{0} Socket : Frame[{1}] = {2}", operationType, i,
message[i].ConvertToString());
static void Client_ReceiveReady( object sender, NetMQSocketEventArgs e )
bool hasmore = false;
e.Socket.ReceiveFrameString(out hasmore);
if( hasmore )
string result = e.Socket.ReceiveFrameString(out hasmore);
Console.WriteLine("REPLY {0}", result);
class MyResponse
public NetMQFrame ClientId;
public string MessageToClient;
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.