添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

使用 Pika 连接 rabbitmq 集群

使用 python 编程经常会用到 pika 来向 rabbitmq 发送消息,单个 rabbitmq 节点连接比较简单,本文介绍使用 rabbitmq 集群情况下的连接方式。

vip 连接方式

在 client 与 rabbitmq server 之间通过 haproxy 等负载均衡来提供 vip,我使用的环境就是采用这种方式,但是遇到某一节点挂掉时再访问 vip 连接 rabbitmq 集群会连接失败,常见 log 如下:





1


2


3


4


5


6


7


8


9


10


11


12


13


14


15


16


17


18


19


20




​<URLParameters host=10.10.11.1 port=5672 virtual_host=/ ssl=False>​


​DEBUG:pika.adapters.select_connection:Using EPollPoller​


​DEBUG:pika.callback:Added: {​ ​​ ​'callback'​ ​​ ​: <bound method SelectConnection._on_connection_start of <SelectConnection CLOSED socket=None ​ ​​ ​params​ ​​ ​=<URLParameters host=10.10.11.1 port=5672 virtual_host=/ ssl=False>>>, ​ ​​ ​'only'​ ​​ ​: None, ​ ​​ ​'one_shot'​ ​​ ​: True, ​ ​​ ​'arguments'​ ​​ ​: None, ​ ​​ ​'calls'​ ​​ ​: 1}​


​DEBUG:pika.callback:Added: {​ ​​ ​'callback'​ ​​ ​: <bound method SelectConnection._on_connection_close of <SelectConnection CLOSED socket=None ​ ​​ ​params​ ​​ ​=<URLParameters host=10.10.11.1 port=5672 virtual_host=/ ssl=False>>>, ​ ​​ ​'only'​ ​​ ​: None, ​ ​​ ​'one_shot'​ ​​ ​: True, ​ ​​ ​'arguments'​ ​​ ​: None, ​ ​​ ​'calls'​ ​​ ​: 1}​


​DEBUG:pika.callback:Added: {​ ​​ ​'callback'​ ​​ ​: <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult ​ ​​ ​object​ ​​ ​ ​at 0x7f869db987a0>>, ​ ​​ ​'only'​ ​​ ​: None, ​ ​​ ​'one_shot'​ ​​ ​: False, ​ ​​ ​'arguments'​ ​​ ​: None}​


​DEBUG:pika.callback:Added: {​ ​​ ​'callback'​ ​​ ​: <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult ​ ​​ ​object​ ​​ ​ ​at 0x7f869db98758>>, ​ ​​ ​'only'​ ​​ ​: None, ​ ​​ ​'one_shot'​ ​​ ​: False, ​ ​​ ​'arguments'​ ​​ ​: None}​


​DEBUG:pika.callback:Added: {​ ​​ ​'callback'​ ​​ ​: <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult ​ ​​ ​object​ ​​ ​ ​at 0x7f869db987e8>>, ​ ​​ ​'only'​ ​​ ​: None, ​ ​​ ​'one_shot'​ ​​ ​: False, ​ ​​ ​'arguments'​ ​​ ​: None}​


​DEBUG:pika.adapters.select_connection:call_later: added timeout <pika.adapters.select_connection._Timeout ​ ​​ ​object​ ​​ ​ ​at 0x7f869dc0b6d0> with deadline=1538140088.706256 and callback=<bound method SelectConnection._on_connect_timer of <SelectConnection INIT socket=None ​ ​​ ​params​ ​​ ​=<URLParameters host=10.10.11.1 port=5672 virtual_host=/ ssl=False>>>; now=1538140088.71; delay=0​


​INFO:pika.adapters.base_connection:Pika version 0.12.0 connecting to 10.10.11.1:5672​


​ERROR:pika.adapters.base_connection:Read empty data, calling disconnect​


​INFO:pika.connection:Disconnected ​ ​​ ​from​ ​​ ​ ​RabbitMQ at 10.10.11.1:5672 (-1): EOF​


​ERROR:pika.connection:Incompatible Protocol Versions​


​ERROR:pika.connection:Connection setup failed due to The protocol returned ​ ​​ ​by​ ​​ ​ ​the server ​ ​​ ​is​ ​​ ​ ​not supported: (-1, ​ ​​ ​'EOF'​ ​​ ​)​


​DEBUG:pika.callback:Processing 0:_on_connection_error​


​DEBUG:pika.callback:Calling <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult ​ ​​ ​object​ ​​ ​ ​at 0x7f869db987a0>> ​ ​​ ​for​ ​​ ​ ​"0:_on_connection_error"​


​DEBUG:pika.callback:Processing 0:_on_connection_closed​


​DEBUG:pika.callback:Calling <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult ​ ​​ ​object​ ​​ ​ ​at 0x7f869db987e8>> ​ ​​ ​for​ ​​ ​ ​"0:_on_connection_closed"​


​DEBUG:pika.callback:Incremented callback reference counter: {​ ​​ ​'callback'​ ​​ ​: <bound method SelectConnection._on_connection_start of <SelectConnection CLOSED socket=None ​ ​​ ​params​ ​​ ​=<URLParameters host=10.10.11.1 port=5672 virtual_host=/ ssl=False>>>, ​ ​​ ​'only'​ ​​ ​: None, ​ ​​ ​'one_shot'​ ​​ ​: True, ​ ​​ ​'arguments'​ ​​ ​: None, ​ ​​ ​'calls'​ ​​ ​: 2}​


​DEBUG:pika.callback:Incremented callback reference counter: {​ ​​ ​'callback'​ ​​ ​: <bound method SelectConnection._on_connection_close of <SelectConnection CLOSED socket=None ​ ​​ ​params​ ​​ ​=<URLParameters host=10.10.11.1 port=5672 virtual_host=/ ssl=False>>>, ​ ​​ ​'only'​ ​​ ​: None, ​ ​​ ​'one_shot'​ ​​ ​: True, ​ ​​ ​'arguments'​ ​​ ​: None, ​ ​​ ​'calls'​ ​​ ​: 2}​


​ERROR:pika.adapters.blocking_connection:Connection open failed - The protocol returned ​ ​​ ​by​ ​​ ​ ​the server ​ ​​ ​is​ ​​ ​ ​not supported: (-1​



这个报错通常是由于网络问题导致,尝试过 Telnet 连接 vip 和端口,都正常返回,目前还未找到 pika 访问 vip 连接 rabbitmq 失败的原因,所以采用类似于 openstack 中连接 rabbitmq 的方式,配置多主机列表,建立连接池。

配置 multiple hosts

openstack 配置transport_url 采用 rabbitmq 集群 host 列表方式,然后在 oslo.message 中建立连接池,通过 kombu 来使用 rabbitmq。参考这种方式,用 pika 实现。

pika 的官方文档中有示例参考 ​ ​blocking_consume_recover_multiple_hosts​

实际实现的时候会抛异常,原因是传递给 pika 需要是个实例而不是列表,官网上提供的方式把 host url 参数化后直接放到列表里传给 pika 进行连接:





1


2


3


4


5




​node1 = pika.URLParameters(​ ​​ ​'amqp://node1'​ ​​ ​)​


​node2 = pika.URLParameters(​ ​​ ​'amqp://node2'​ ​​ ​)​


​node3 = pika.URLParameters(​ ​​ ​'amqp://node3'​ ​​ ​)​


​all_endpoints = [node1, node2, node3]​


​connection = pika.BlockingConnection(all_endpoints)​



实际执行后报错如下:





1




​Expected instance of Parameters, not [.........]​



github 上提交的 issue: ​ ​parameters error​

目前不支持直接传入多 host url 来池化 rabbitmq 集群的连接,所以要在应用程序中单独实现。

用一个简单 for 循环来做:





1


2


3


4


5


6


7


8


9




​random.shuffle(all_endpoints)​


​for​ ​​ ​ ​url ​ ​​ ​in​ ​​ ​ ​all_endpoints:​


​try​ ​​ ​:​


​logging.basicConfig(level=logging.DEBUG)​


​connection = pika.BlockingConnection(url)​


​except Exception ​ ​​ ​as​ ​​ ​ ​ex:​


​print str(ex)​


​else​ ​​ ​:​


​break​



用 shuffle 来改变列表中的 host 顺序,可以起到负载均衡的作用。


使用 Pika 连接 rabbitmq 集群

使用 python 编程经常会用到 pika 来向 rabbitmq 发送消息,单个 rabbitmq 节点连接比较简单,本文介绍使用 rabbitmq 集群情况下的连接方式。

vip 连接方式

在 client 与 rabbitmq server 之间通过 haproxy 等负载均衡来提供 vip,我使用的环境就是采用这种方式,但是遇到某一节点挂掉时再访问 vip 连接 rabbitmq 集群会连接失败,常见 log 如下:





1


2


3


4


5


6


7


8


9


10


11


12


13


14


15


16


17


18


19


20




​<URLParameters host=10.10.11.1 port=5672 virtual_host=/ ssl=False>​


​DEBUG:pika.adapters.select_connection:Using EPollPoller​


​DEBUG:pika.callback:Added: {​ ​​ ​'callback'​ ​​ ​: <bound method SelectConnection._on_connection_start of <SelectConnection CLOSED socket=None ​ ​​ ​params​ ​​ ​=<URLParameters host=10.10.11.1 port=5672 virtual_host=/ ssl=False>>>, ​ ​​ ​'only'​ ​​ ​: None, ​ ​​ ​'one_shot'​ ​​ ​: True, ​ ​​ ​'arguments'​ ​​ ​: None, ​ ​​ ​'calls'​ ​​ ​: 1}​


​DEBUG:pika.callback:Added: {​ ​​ ​'callback'​ ​​ ​: <bound method SelectConnection._on_connection_close of <SelectConnection CLOSED socket=None ​ ​​ ​params​ ​​ ​=<URLParameters host=10.10.11.1 port=5672 virtual_host=/ ssl=False>>>, ​ ​​ ​'only'​ ​​ ​: None, ​ ​​ ​'one_shot'​ ​​ ​: True, ​ ​​ ​'arguments'​ ​​ ​: None, ​ ​​ ​'calls'​ ​​ ​: 1}​


​DEBUG:pika.callback:Added: {​ ​​ ​'callback'​ ​​ ​: <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult ​ ​​ ​object​ ​​ ​ ​at 0x7f869db987a0>>, ​ ​​ ​'only'​ ​​ ​: None, ​ ​​ ​'one_shot'​ ​​ ​: False, ​ ​​ ​'arguments'​ ​​ ​: None}​


​DEBUG:pika.callback:Added: {​ ​​ ​'callback'​ ​​ ​: <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult ​ ​​ ​object​ ​​ ​ ​at 0x7f869db98758>>, ​ ​​ ​'only'​ ​​ ​: None, ​ ​​ ​'one_shot'​ ​​ ​: False, ​ ​​ ​'arguments'​ ​​ ​: None}​


​DEBUG:pika.callback:Added: {​ ​​ ​'callback'​ ​​ ​: <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult ​ ​​ ​object​ ​​ ​ ​at 0x7f869db987e8>>, ​ ​​ ​'only'​ ​​ ​: None, ​ ​​ ​'one_shot'​ ​​ ​: False, ​ ​​ ​'arguments'​ ​​ ​: None}​


​DEBUG:pika.adapters.select_connection:call_later: added timeout <pika.adapters.select_connection._Timeout ​ ​​ ​object​ ​​ ​ ​at 0x7f869dc0b6d0> with deadline=1538140088.706256 and callback=<bound method SelectConnection._on_connect_timer of <SelectConnection INIT socket=None ​ ​​ ​params​ ​​ ​=<URLParameters host=10.10.11.1 port=5672 virtual_host=/ ssl=False>>>; now=1538140088.71; delay=0​


​INFO:pika.adapters.base_connection:Pika version 0.12.0 connecting to 10.10.11.1:5672​


​ERROR:pika.adapters.base_connection:Read empty data, calling disconnect​


​INFO:pika.connection:Disconnected ​ ​​ ​from​ ​​ ​ ​RabbitMQ at 10.10.11.1:5672 (-1): EOF​


​ERROR:pika.connection:Incompatible Protocol Versions​


​ERROR:pika.connection:Connection setup failed due to The protocol returned ​ ​​ ​by​ ​​ ​ ​the server ​ ​​ ​is​ ​​ ​ ​not supported: (-1, ​ ​​ ​'EOF'​ ​​ ​)​


​DEBUG:pika.callback:Processing 0:_on_connection_error​


​DEBUG:pika.callback:Calling <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult ​ ​​ ​object​ ​​ ​ ​at 0x7f869db987a0>> ​ ​​ ​for​ ​​ ​ ​"0:_on_connection_error"​


​DEBUG:pika.callback:Processing 0:_on_connection_closed​


​DEBUG:pika.callback:Calling <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult ​ ​​ ​object​ ​​ ​ ​at 0x7f869db987e8>> ​ ​​ ​for​ ​​ ​ ​"0:_on_connection_closed"​


​DEBUG:pika.callback:Incremented callback reference counter: {​ ​​ ​'callback'​ ​​ ​: <bound method SelectConnection._on_connection_start of <SelectConnection CLOSED socket=None ​ ​​ ​params​ ​​ ​=<URLParameters host=10.10.11.1 port=5672 virtual_host=/ ssl=False>>>, ​ ​​ ​'only'​ ​​ ​: None, ​ ​​ ​'one_shot'​ ​​ ​: True, ​ ​​ ​'arguments'​ ​​ ​: None, ​ ​​ ​'calls'​ ​​ ​: 2}​


​DEBUG:pika.callback:Incremented callback reference counter: {​ ​​ ​'callback'​ ​​ ​: <bound method SelectConnection._on_connection_close of <SelectConnection CLOSED socket=None ​ ​​ ​params​ ​​ ​=<URLParameters host=10.10.11.1 port=5672 virtual_host=/ ssl=False>>>, ​ ​​ ​'only'​ ​​ ​: None, ​ ​​ ​'one_shot'​ ​​ ​: True, ​ ​​ ​'arguments'​ ​​ ​: None, ​ ​​ ​'calls'​ ​​ ​: 2}​


​ERROR:pika.adapters.blocking_connection:Connection open failed - The protocol returned ​ ​​ ​by​ ​​ ​ ​the server ​ ​​ ​is​ ​​ ​ ​not supported: (-1​



这个报错通常是由于网络问题导致,尝试过 Telnet 连接 vip 和端口,都正常返回,目前还未找到 pika 访问 vip 连接 rabbitmq 失败的原因,所以采用类似于 openstack 中连接 rabbitmq 的方式,配置多主机列表,建立连接池。

配置 multiple hosts

openstack 配置transport_url 采用 rabbitmq 集群 host 列表方式,然后在 oslo.message 中建立连接池,通过 kombu 来使用 rabbitmq。参考这种方式,用 pika 实现。

pika 的官方文档中有示例参考 ​ ​blocking_consume_recover_multiple_hosts​

实际实现的时候会抛异常,原因是传递给 pika 需要是个实例而不是列表,官网上提供的方式把 host url 参数化后直接放到列表里传给 pika 进行连接:





1


2


3


4


5




​node1 = pika.URLParameters(​ ​​ ​'amqp://node1'​ ​​ ​)​


​node2 = pika.URLParameters(​ ​​ ​'amqp://node2'​ ​​ ​)​


​node3 = pika.URLParameters(​ ​​ ​'amqp://node3'​ ​​ ​)​


​all_endpoints = [node1, node2, node3]​


​connection = pika.BlockingConnection(all_endpoints)​



实际执行后报错如下:





1




​Expected instance of Parameters, not [.........]​



github 上提交的 issue: ​ ​parameters error​

目前不支持直接传入多 host url 来池化 rabbitmq 集群的连接,所以要在应用程序中单独实现。

用一个简单 for 循环来做:





1


2


3


4


5


6


7


8


9




​random.shuffle(all_endpoints)​


​for​ ​​ ​ ​url ​ ​​ ​in​ ​​ ​ ​all_endpoints:​


​try​ ​​ ​:​


​logging.basicConfig(level=logging.DEBUG)​


​connection = pika.BlockingConnection(url)​


​except Exception ​ ​​ ​as​ ​​ ​ ​ex:​


​print str(ex)​


​else​ ​​ ​:​


​break​



用 shuffle 来改变列表中的 host 顺序,可以起到负载均衡的作用。


对于有序数据,通常可以采用二分查找法。对于无序数据,则只能挨个查找。在本节中,我们将讨论有关并行的无序数组的搜索实现。 给定一个数组,我们要查找满足条件的元素。对于串行程序来说,只要遍历一下数组就可以得到结果。但如果要使用并行方式,则需要额外增加一些线程间的通信机制,使各个线程可以有效地运行。 一种简单的策略就是将原始数据集合按照期望的线程数进行分割。每个线程各自独立搜索

centos 开启redis后台运行 centos7 redis

redis安装 在虚拟机创建redis文件夹 mkdir –p /usr/local/src/redis 进入文件夹 cd /usr/local/src/redis 通过Xhell传输redis到虚拟机中,或者通过CentOS tools可以直接拖拽文件到虚拟机的制定位置解压 redis文件 tar xzf redis-3.0.7.tar.gz 解压完毕后,