ZeroMQ分享-part2

ZeroMQ API Reference

void *zmq_ctx_new();

创建一个ZMQ的上下文环境,是ZMQ一切的开始。

  • 线程安全,不需要自己加锁和信号量处理。
  • Return:成功的话返回一个句柄;否则返回NULL并设置errno。
  • int zmq_ctx_shutdown(void *context);
    

    关闭ZMQ的上下文环境。

  • 基于该上下文环境的socket进行的阻塞操作立即返回ETERM。
  • zmq_close()也会立即返回ETERM。
  • Return:成功的话返回0;否则返回-1并设置errno。
  • 注意 该函数并没有释放ZMQ分配的资源。因此,对于彻底关闭ZMQ通讯而言,执行该函数是可选的,调用之后仍需调用zmq_ctx_term释放ZMQ分配的所有资源。

    int zmq_ctx_term(void *context);
    

    销毁ZMQ的上下文环境,释放ZMQ分配的所有资源,是ZMQ一切的结束。执行zmq_ctx_shutdown的所有操作。

  • 在打断所有阻塞的调用后,zmq_ctx_term自己会阻塞直到所有context上的socket执行zmq_close()完毕。
  • Return:成功的话返回0;否二返回-1并设置errno。
  • void *zmq_socket(void *context, int type);
    

    创建一个给定上下文环境下的ZMQ socket,type指定了ZMQ通讯的模式。

  • 返回的socket是无状态的,还需要调用zmq_connect连接到端点,或zmq_bind绑定到指定端口/IP
  • Return:成功的话返回一个句柄;否则返回NULL并设置errno。
  • ZMQ socket不是线程安全的,尽量不要多线程共享一个zmq socket。
  • 传统的socket提供的是同步的字节流或数据包通讯接口;而ZMQ socket提供的是异步的消息队列,传递的不是离散的数据包或连续的字节流,而是离散的“消息”。
  • 传统的socket提供的是一对一或多对一(多个客户端,一个服务器),特殊情况下会有一对多(多播);而ZMQ socket提供的是多对多模型,客户端可以通过zmq_connect连接多个服务器,服务器端可以通过zmq_bind连接多个客户端。
  • int zmq_close(void *socket);
    

    关闭指定的socket上的所有连接,未到达的数据会被丢弃。

  • 默认情况下,未发送的数据不会被丢弃,这会使zmq_ctx_term函数调用发生堵塞(30秒)。可以通过zmq_setsockopt的ZMQ_LINGER改变策略。
  • Return:成功的话返回0;否则返回-1并设置errno。
  • int zmq_connect(void *socket, const char *endpoint);
    

    (客户端)与指定的端点建立连接,通过endpoint参数指定了通讯协议。

  • 除ZMQ_PAIR类型的socket,任何ZMQ socket都支持通过zmq_connect与多个端点建立连接,具体的机制取决于socket模式。
  • Return:成功的话返回0;否则返回-1并设置errno。
  • 传统的connect函数一般不马上返回(三次握手),一旦超时则socket不再可用,必须关闭;zmq_connect总是立即返回,但是成功调用函数并不表明已经建立连接。因此,绝大多数情况下,服务器调用zmq_bind与客户端调用zmq_connect的顺序无所谓。
  • 除了ZMQ_ROUTER模式外,调用zmq_connect后socket进入就绪状态;而调用zmq_bind后socket进入静音状态,根据zmq_socket类型会阻塞或返回并丢弃消息。
  • int zmq_disconnect(void *socket, const char *endpoint);
    

    (客户端)关闭指定的socket上的指定连接,未到达的数据会被丢弃。

  • 默认情况下,与zmq_close相同,未发送的数据不会被丢弃。
  • Return:成功的话返回0;否则返回-1并设置errno。
  • int zmq_bind(void *socket, const char *endpoint);
    

    (服务器)将socket与指定的端点绑定,接收客户端的连接。

  • 除ZMQ_PAIR类型的socket,任何ZMQ socket都支持通过zmq_bind与多个端点绑定,接收客户的连接请求。
  • Return:成功的话返回0;否则返回-1并设置errno。
  • 注意 调用zmq_bind后的socket进入静音状态,根据zmq_socket类型会阻塞或立即返回并丢弃消息

    int zmq_unbind(void *socket, const char *endpoint);
    

    (服务器)将socket与指定的端点解绑,不再接收向该端点提出连接请求的客户端连接。

  • 端点包含通配符“*”时,需通过getsockopt的ZMQ_LAST_ENDPOINT获得真实的端点再调用zmq_unbind解绑。例如:
  • /* Create a ZMQ_SUB socket */
    void *socket = zmq_socket(context, ZMQ_SUB);
    assert(socket);
    /* Bind it to the system-assigned ephemeral port using a TCP transport */
    rc = zmq_bind(socket, "tcp://127.0.0.1:*");
    assert(rc == 0);
    /* Obtain real endpoint */
    const size_t buf_size = 32;
    char buf[buf_size];
    rc = zmq_getsockopt(socket, ZMQ_LAST_ENDPOINT, buf,
                           (size_t *)&buf_size);
    assert (rc == 0);
    /* Unbind socket by real endpoint */
    rc = zmq_unbind(socket, buf); 
    assert (rc == 0);
    
  • Return:成功的话返回0;否则返回-1并设置errno。
  • int zmq_setsockopt(void *socket, int option_name,
                        const void *option_value,size_t option_len);
    

    为指定的socket设定option_name,值由option_value指定。为了使选项生效,要在建立连接(如zmq_connect)之前调用zmq_setsockopt。

  • ZMQ_BACKLOG:设定面向连接的协议中,socket队列的最大长度,默认是100。
  • ZMQ_CONNECT_TIMEOUT:为connect()系统调用设置超时时间,默认是0。
  • ZMQ_IPV6:设置值为1表示启用IPv6协议,默认是0。
  • ZMQ_LINGER:设置调用zmq_disconnect或zmq_close后尚未接收消息的等待时间。该选项将进一步影响zmq_ctx_term的阻塞时间。设置为-1表示不丢弃任何消息,zmq_ctx_term将会一直阻塞直到消息全部接收;设置为0表示丢弃消息并立刻返回;设置为正值x表示,x毫秒后将返回,默认是30000毫秒。
  • ZMQ_RCVTIMEO:设置recv操作的超时时间。在没有消息接收的情况下,设置为0时,zmq_recv将立即返回-1,并设置errno为EAGAIN;设置为-1时,zmq_recv会一直阻塞直到有消息到达;设置为x时,zmq_recv将等待x毫秒再返回。
  • 注意 该选项会被zmq_recv的flag参数ZMQ_DONTWAIT屏蔽。

  • ZMQ_SNDTIMEO:设置send操作的超时时间,与ZMQ_RCVTIMEO类似。
  • 注意 该选项会被zmq_send的flag参数ZMQ_DONTWAIT屏蔽。

    int zmq_getsockopt(void *socket, int option_name,
                        void *option_value,size_t *option_len);
    

    从指定的socket中提取指定option_name,保存在option_value中。

  • ZMQ_THREAD_SAFE:返回一个布尔类型,表示当前的socket是否是线程安全的。
  • ZMQ_LAST_ENDPOINT:获得试用通配符的端点的真实值。需要注意的是,若TCP的主机为INADDR_ANY,则返回0.0.0.0。
  • ZMQ_RCVMORE:返回一个布尔类型,表示从socket最后接收到的消息是否还有后续消息。该选项由zmq_send的flags设置。
  • int zmq_recv(void *socket, void *buf,size_t len, int flags);
    

    从指定的socket接收消息,并存储在buf中。

  • 如果接收的消息长度大于参数len,消息会被截断。
  • 若flags参数为0,则在无消息时zmq_recv会阻塞直到消息到来;若flags为ZMQ_DONTWAIT,则zmq_recv会立即返回EAGAIN。
  • 注意 当socket无数据接收时,若flags参数设置为ZMQ_DONTWAIT则zmq_recv立即返回-1,忽略zmq_setsockopt的ZMQ_RCVTIMEO设置的超时时间。

  • Return:成功时返回接收的比特数;否则返回-1并设置errno。
  • int zmq_send(void *socket, void *buf,size_t len, int flags);
    

    将buf中的消息放入指定socket的发送队列。

  • 若flags参数为ZMQ_DONTWAIT,则在无建立的连接情况下,zmq_send立即返回EAGAIN。若flags参数为ZMQ_SNDMORE,则表明消息将分片发送。
  • Return:成功时返回消息的比特数;否则返回-1并设置errno。
  • 成功调用zmq_send并不能说明消息已经传输到网络,只能保证消息被放入消息队列,而ZMQ对消息队列中的消息负责。
    对于多片的消息,ZMQ会“原子”地发送。即要么消息全部送达,要么消息全部未送达。
  • 当socket无数据发送时,若flags参数设置为ZMQ_DONTWAIT则zmq_send立即返回-1,忽略zmq_setsockopt的ZMQ_SNDTIMEO设置的超时时间。
  • const char *zmq_strerror(int errnum);
    

    根据给定的errno序号得到相应的出错信息。

  • ZMQ定义了额外的errno,因此应用程序应该调用zmq_strerror而不是strerror来获得出错信息。
  • Return:返回一个字符串的首地址。
  • int zmq_poll(zmq_pollitem_t *items, int nitems,long timeout);
    

    提供ZMQ的多路I/O复用机制。

  • items参数的结构如下,其中,socket为ZMQ socket,而fd为标准的posix文件描述符,两者填一个。调用zmq_poll之前为items的每个成员指定events:
  • ZMQ_POLLIN, ZMQ_POLLOUT, ZMQ_POLLERR, ZMQ_POLLPRI
  • typedef struct{
        void *socket;
        int fd;
        short events;
        short revents;
    } zmq_pollitem_t;
    
  • Return:成功时返回revents触发并与events匹配的items成员个数;否则返回-1并设置errno。
  • zmq_poll提供了与传统的select和poll类似的“轮询”功能。在单线程下可以同时监控多个套接字的响应状态,使得服务端能够同时处理多个客户端的请求,也可以使客户端同时向多个服务端发送请求。
  • 其实,对于ZMQ来说,zmq_poll是不必要的,因为zmq_connect和zmq_bind支持连接/绑定多个端点。完全可以使用它们取代zmq_poll的功能。例如:
  • // using zmq_poll implements I/O multiplexing
    vector<string> zmq_url/* */;
    int port_num/* */;
    vector<void *> receiver;
    vector<void *> context;
    zmq_pollitem_t *poll_item;int ret;
    poll_item = (zmq_pollitem_t*)malloc(sizeof(zmq_pollitem_t) * port_num);
    for (int i = 0; i < port_num; ++i) {
            void *aContext, *aReceiver;
            aContext = zmq_ctx_new();
            aReceiver = zmq_socket(aContext, ZMQ_PULL);
            int timeout = 1000;  // milliseconds 
            zmq_setsockopt(aReceiver, ZMQ_RCVTIMEO, &timeout, sizeof(int)); 
            ret = zmq_connect(aReceiver, zmq_url[i].c_str());    
            assert(0 == ret);    
            receiver.push_back(aReceiver);    
            context.push_back(aContext);    
            poll_item[i].socket = receiver[i];    
            poll_item[i].fd = 0;    
            poll_item[i].events = ZMQ_POLLIN;    
            poll_item[i].revents = 0;
    ret = zmq_poll(poll_item, port_num, 0);
    assert(-1 != ret);
    for (int i = 0; i < port_num; ++i) {  
            if (poll_item[i].revents & ZMQ_POLLIN) {        
                ret = zmq_recv(receiver[i] ...       
    // using zmq_connect implements I/O multiplexing
    vector<string> zmq_url/* */;
    int port_num/* */;
    void *context = zmq_ctx_new();
    void *receiver = zmq_socket(context, ZMQ_PULL);
    zmq_setsockopt(receiver, ZMQ_RCVTIMEO, &timeout, sizeof(int));
    int timeout = 1000;  // milliseconds
    int ret;
    for (int i = 0; i < port_num; ++i) {  
            ret = zmq_connect(receiver, zmq_url[i].c_str());