Rabbitmq Connect与Channel
RabbitMQ官方提供了Connection对象,本质就是一个TCP连接对象。
Channels对象,虚拟连接。虚拟连接建立在上面Connection对象的TCP连接中。数据流动都是在Channel中进行的。每个Connection对象的虚拟连接也是有限的,如果单个Connnection的Channel对象超出指定范围了,也会有性能问题,另外一个TCP连接上的多个虚拟连接,实际在传输数据时,传输数据的虚拟连接还是独占了TCP连接,其它虚拟连接在排队等待。
在单个的
Connection
对象创建多个
Channel
来实现数据传输,在
channel
信息比较大的情况下,
Connection
带宽会限制消息的传输。那么需要设计
Connection
池,将流量分摊到不同的
connection
上。
官网对于Connection的解读:
AMQP 0-9-1 connections are typically long-lived. AMQP 0-9-1 is an application level protocol that uses TCP for reliable delivery. Connections use authentication and can be protected using TLS. When an application no longer needs to be connected to the server, it should gracefully close its AMQP 0-9-1 connection instead of abruptly closing the underlying TCP connection.
大概意思就是:
AMQP 0-9-1
一般是一个
TCP
的长链接,当应用程序不再需要连接到服务器时,应该正常关闭
AMQP 0-9-1
连接而不是关闭
TCP
连接。
官网对于Channel的解读:
Some applications need multiple connections to the broker. However, it is undesirable to keep many TCP connections open at the same time because doing so consumes system resources and makes it more difficult to configure firewalls. AMQP 0-9-1 connections are multiplexed with
channels
that can be thought of as "lightweight connections that share a single TCP connection".
Every protocol operation performed by a client happens on a channel. Communication on a particular channel is completely separate from communication on another channel, therefore every protocol method also carries a channel ID (a.k.a. channel number), an integer that both the broker and clients use to figure out which channel the method is for.
A channel only exists in the context of a connection and never on its own. When a connection is closed, so are all channels on it.
For applications that use multiple threads/processes for processing, it is very common to open a new channel per thread/process and not share channels between them.
大概的意思就是:
一些应用需要同时创建多个连接到
broker
也就是
RabbitMQ
服务器上。然而因为防火墙的存在,很难同时创建多个连接。
AMQP 0-9-1
连接使用多个
channel
连接实现对单一
Connection
的复用。
客户端的每一个协议操作都发送在
channel
上。每个协议方法携带者
channel ID
。
broker
和
client
使用
channel ID
来确定方法对应的
channel
。因此实现
channel
之间的数据隔离。
channel
不能单独存在,仅存在
connection
上下文中。当
connection
关闭时,
channel
也会关闭。
多线程/进程之间打开一个
channel
但不共享
channels
是很普遍的。
通道和并发注意事项(线程安全)
As a rule of thumb, sharing Channel instances between threads is something to be avoided. Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads.
要避免一个反例,
为每一个发布的消息新建一个
channel
,开辟一个新的
channel
需要一个网络的往返,这种模式是很低效的。
channel
保持合理的存活时间。
It is possible to use channel pooling to avoid concurrent publishing on a shared channel: once a thread is done working with a channel, it returns it to the pool, making the channel available for another thread. Channel pooling can be thought of as a specific synchronization solution. It is recommended that an existing pooling library is used instead of a homegrown solution. For example,
Spring AMQP
which comes with a ready-to-use channel pooling feature.
可以使用
channel pool
来避免共享
channel
上并发发布:一旦一个线程使用完了
channel
,那么它将返回到
pool
中。其他线程便可使用这个
Channel
。线程池是一个解决方案,可以使用
Spring AMQP
线程池而不是自己开发。
总结:
频繁建立
TCP
连接和
channel
连接是消耗性能的,于是我们希望可以共享
connection
或者
channel
。达到连接的复用
Parameters:
Connectionsize:int类型,Rabbitmqpool池连接的最大数
Channelsize:int类型,Rabbitmqpool池Channel的最大数
return:None
#
单例保证唯一
class
Rabbitmqpool:
#
定义类属性
__instance
=
None
__lock
=
threading.Lock()
def
__init__
(self, Connectionsize, Channelsize):
self.maxConnectionsize
=
Connectionsize
self.maxChannelsize
=
Channelsize
self.nowConnectionsize
=
0
self.nowChannelsize
=
0
self.connectpool
=
{}
self.channelpool
=
{}
self.certdic
=
{}
def
__new__
(cls, Connectionsize, Channelsize):
if
not
cls.
__instance
:
cls.
__instance
= object.
__new__
(cls)
return
cls.
__instance
function: 获取一个空闲Channel或者新建一个Channel
Parameters:
return:
channel:channel
cname:连接名
def
get_channel(self):
try
:
self.
__lock
.acquire()
cname
=
""
channel
=
None
#
在已存在键中查找空闲Channel
for
connectionname
in
self.connectpool:
if
len(self.channelpool[connectionname]) !=
0:
channel
= self.channelpool[connectionname][-1
]
cname
=
connectionname
self.channelpool[connectionname]
= self.channelpool[connectionname][0:-1
]
print
(
"
取出一个Channel
"
)
break
#
如果没有找到空闲Channel,canme为"",则新建一个Channel
if
cname ==
""
:
if
self.nowChannelsize <
self.maxChannelsize:
#
从连接池返回一个连接的名字
if
len(self.connectpool) !=
0:
cname
=
random.choice(list(self.connectpool))
#
根据名字拿到此连接,传入连接和Pool池创建Channel
CreateChannel(self.connectpool[cname], self)
#
得到一个新Channel
channel = self.channelpool[cname][-1
]
self.channelpool[cname]
= self.channelpool[cname][0:-1
]
print
(
"
创建一个Channel
"
)
#
如果没有连接,则新建连接与channel
else
:
if
len(self.certdic) !=
0:
cert
=
random.choice(list(self.certdic))
cname
=
str(uuid.uuid4().int)
print
(
"
创建一个连接
"
)
CreateConnection(str(self.certdic[cert][
"
rabbitmq_host
"
]), str(self.certdic[cert][
"
rabbitmq_port
"
]),
str(self.certdic[cert][
"
rabbitmq_virtual_host
"
]),
str(self.certdic[cert][
"
rabbitmq_user
"
]),
str(self.certdic[cert][
"
rabbitmq_password
"
]), self, cname)
CreateChannel(self.connectpool[cname], self)
#
得到一个新Channel
channel = self.channelpool[cname][-1
]
self.channelpool[cname]
= self.channelpool[cname][0:-1
]
print
(
"
创建一个Channel
"
)
else
:
print
(
"
无法创建Channel,无连接凭证,不能创建连接!
"
)
else
:
print
(
"
无法创建Channel,超过限制
"
)
finally
:
self.
__lock
.release()
return
channel, cname
def
create_channel(self):
try
:
self.
__lock
.acquire()
if
len(self.certdic) !=
0:
cert
=
random.choice(list(self.certdic))
cname
=
str(uuid.uuid4().int)
print
(
"
创建一个连接
"
)
CreateConnection(str(self.certdic[cert][
"
rabbitmq_host
"
]), str(self.certdic[cert][
"
rabbitmq_port
"
]),
str(self.certdic[cert][
"
rabbitmq_virtual_host
"
]),
str(self.certdic[cert][
"
rabbitmq_user
"
]),
str(self.certdic[cert][
"
rabbitmq_password
"
]), self, cname)
CreateChannel(self.connectpool[cname], self)
#
得到一个新Channel
channel = self.channelpool[cname][-1
]
self.channelpool[cname]
= self.channelpool[cname][0:-1
]
print
(
"
创建一个Channel
"
)
return
channel, cname
else
:
print
(
"
无法创建Channel,无连接凭证,不能创建连接!
"
)
return
None,
""
finally
:
self.
__lock
.release()
def
return_channel(self, channel, connectionname):
try
:
self.
__lock
.acquire()
self.channelpool[connectionname].append(channel)
finally
:
self.
__lock
.release()
def
closepool(self):
def
delconnection(self, connectionname):
try
:
self.
__lock
.acquire()
if
connectionname
in
self.connectpool:
del
self.connectpool[connectionname]
self.nowConnectionsize
= self.nowConnectionsize -1
self.nowChannelsize
= self.nowChannelsize -
len(self.channelpool[connectionname])
del
self.channelpool[connectionname]
finally
:
self.
__lock
.release()
def
get_certtemplate(self):
return
{
"
rabbitmq_host
"
:
""
,
"
rabbitmq_port
"
: 5672,
"
rabbitmq_virtual_host
"
:
""
,
"
rabbitmq_user
"
:
""
,
"
rabbitmq_password
"
:
""
}
def
addcert(self,cert):
self.certdic[cert[
"
rabbitmq_host
"
]] =
cert
#
连接可以自己创建
class
CreateConnection:
def
__init__
(self, rabbitmq_host, rabbitmq_port, rabbitmq_virtual_host, rabbitmq_user, rabbitmq_password,
Rabbitmqpool, Connectionname
= str(uuid.uuid4().int), heartbeat=60
):
if
Rabbitmqpool.nowConnectionsize <
Rabbitmqpool.maxConnectionsize:
if
Connectionname
not
in
Rabbitmqpool.connectpool:
self.rabbitmq_user
=
str(rabbitmq_user)
self.rabbitmq_password
=
str(rabbitmq_password)
self.rabbitmq_host
=
rabbitmq_host
self.rabbitmq_port
=
rabbitmq_port
self.rabbitmq_virtual_host
=
rabbitmq_virtual_host
self.connectionname
=
Connectionname
print
(self.rabbitmq_user,self.rabbitmq_password,self.rabbitmq_host,self.rabbitmq_port,self.rabbitmq_virtual_host,self.connectionname)
credentials
=
pika.PlainCredentials(rabbitmq_user, rabbitmq_password)
try
:
self.connection
=
pika.BlockingConnection(
pika.ConnectionParameters(
host
=
rabbitmq_host,
port
=
rabbitmq_port,
virtual_host
=
rabbitmq_virtual_host,
heartbeat
=
heartbeat,
credentials
=
credentials))
Rabbitmqpool.connectpool[Connectionname]
=
self
Rabbitmqpool.nowConnectionsize
+= 1
if
self.connectionname
not
in
Rabbitmqpool.channelpool:
Rabbitmqpool.channelpool[self.connectionname]
=
[]
print
(
"
创建连接:
"
, Connectionname)
except
Exception as e:
print
(
"
创建连接失败:
"
, e)
else
:
print
(
"
创建连接失败,此连接名已存在:
"
, Connectionname)
else
:
print
(
"
创建连接失败,连接池已满,无法创建连接池
"
)
def
get_connection(self):
return
self.connection
class
CreateChannel:
def
__init__
(self, Connection, Rabbitmqpool):
Rabbitmqpool.channelpool[Connection.connectionname].append(Connection.get_connection().channel())
Rabbitmqpool.nowChannelsize
+= 1
我这里并没有增加过期时间:
rabbitmq_password
= ""
rabbitmq_virtual_host
= ""
credentials
=
pika.PlainCredentials(rabbitmq_user, rabbitmq_password)
Pool
= Rabbitmqpool(3, 20
)
cert
=
Pool.get_certtemplate()
cert[
'
rabbitmq_host
'
] =
rabbitmq_host
cert[
'
rabbitmq_virtual_host
'
] =
rabbitmq_virtual_host
cert[
'
rabbitmq_user
'
] =
rabbitmq_user
cert[
'
rabbitmq_password
'
] =
rabbitmq_password
cert[
'
rabbitmq_port
'
] =
rabbitmq_port
Pool.addcert(cert)
发送消息代码
try:
c, cname = Pool.get_channel()
c.basic_publish(exchange='',
routing_key='队列名',
body=str(data),
Pool.return_channel(c, cname)
except Exception as e:
print("发送错误:",e) #链接过期
Pool.delconnection(cname) #channel过期时,删除此链接和此链接下的所有channel
c, cname = Pool.create_channel() #创建一个新的链接和channel
c.basic_publish(exchange='',
routing_key='队列名',
body=str(senddata),
Pool.return_channel(c, cname)
refer:
https://www.jianshu.com/p/24e541170ace