添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

Rabbitmq Connect与Channel

RabbitMQ官方提供了Connection对象,本质就是一个TCP连接对象。
Channels对象,虚拟连接。虚拟连接建立在上面Connection对象的TCP连接中。数据流动都是在Channel中进行的。每个Connection对象的虚拟连接也是有限的,如果单个Connnection的Channel对象超出指定范围了,也会有性能问题,另外一个TCP连接上的多个虚拟连接,实际在传输数据时,传输数据的虚拟连接还是独占了TCP连接,其它虚拟连接在排队等待。

在单个的 Connection 对象创建多个 Channel 来实现数据传输,在 channel 信息比较大的情况下, Connection 带宽会限制消息的传输。那么需要设计 Connection 池,将流量分摊到不同的 connection 上。

官网对于Connection的解读:

AMQP 0-9-1 connections are typically long-lived. AMQP 0-9-1 is an application level protocol that uses TCP for reliable delivery. Connections use authentication and can be protected using TLS. When an application no longer needs to be connected to the server, it should gracefully close its AMQP 0-9-1 connection instead of abruptly closing the underlying TCP connection.

大概意思就是: AMQP 0-9-1 一般是一个 TCP 的长链接,当应用程序不再需要连接到服务器时,应该正常关闭 AMQP 0-9-1 连接而不是关闭 TCP 连接。

官网对于Channel的解读:

Some applications need multiple connections to the broker. However, it is undesirable to keep many TCP connections open at the same time because doing so consumes system resources and makes it more difficult to configure firewalls. AMQP 0-9-1 connections are multiplexed with channels that can be thought of as "lightweight connections that share a single TCP connection".
Every protocol operation performed by a client happens on a channel. Communication on a particular channel is completely separate from communication on another channel, therefore every protocol method also carries a channel ID (a.k.a. channel number), an integer that both the broker and clients use to figure out which channel the method is for.
A channel only exists in the context of a connection and never on its own. When a connection is closed, so are all channels on it.

For applications that use multiple threads/processes for processing, it is very common to open a new channel per thread/process and not share channels between them.

大概的意思就是: 一些应用需要同时创建多个连接到 broker 也就是 RabbitMQ 服务器上。然而因为防火墙的存在,很难同时创建多个连接。 AMQP 0-9-1 连接使用多个 channel 连接实现对单一 Connection 的复用。
客户端的每一个协议操作都发送在 channel 上。每个协议方法携带者 channel ID broker client 使用 channel ID 来确定方法对应的 channel 。因此实现 channel 之间的数据隔离。
channel 不能单独存在,仅存在 connection 上下文中。当 connection 关闭时, channel 也会关闭。
多线程/进程之间打开一个 channel 但不共享 channels 是很普遍的。

通道和并发注意事项(线程安全)

As a rule of thumb, sharing Channel instances between threads is something to be avoided. Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads.

要避免一个反例, 为每一个发布的消息新建一个 channel ,开辟一个新的 channel 需要一个网络的往返,这种模式是很低效的。 channel 保持合理的存活时间。

It is possible to use channel pooling to avoid concurrent publishing on a shared channel: once a thread is done working with a channel, it returns it to the pool, making the channel available for another thread. Channel pooling can be thought of as a specific synchronization solution. It is recommended that an existing pooling library is used instead of a homegrown solution. For example, Spring AMQP which comes with a ready-to-use channel pooling feature.

可以使用 channel pool 来避免共享 channel 上并发发布:一旦一个线程使用完了 channel ,那么它将返回到 pool 中。其他线程便可使用这个 Channel 。线程池是一个解决方案,可以使用 Spring AMQP 线程池而不是自己开发。

总结: 频繁建立 TCP 连接和 channel 连接是消耗性能的,于是我们希望可以共享 connection 或者 channel 。达到连接的复用

Parameters: Connectionsize:int类型,Rabbitmqpool池连接的最大数 Channelsize:int类型,Rabbitmqpool池Channel的最大数 return:None # 单例保证唯一 class Rabbitmqpool: # 定义类属性 __instance = None __lock = threading.Lock() def __init__ (self, Connectionsize, Channelsize): self.maxConnectionsize = Connectionsize self.maxChannelsize = Channelsize self.nowConnectionsize = 0 self.nowChannelsize = 0 self.connectpool = {} self.channelpool = {} self.certdic = {} def __new__ (cls, Connectionsize, Channelsize): if not cls. __instance : cls. __instance = object. __new__ (cls) return cls. __instance function: 获取一个空闲Channel或者新建一个Channel Parameters: return: channel:channel cname:连接名 def get_channel(self): try : self. __lock .acquire() cname = "" channel = None # 在已存在键中查找空闲Channel for connectionname in self.connectpool: if len(self.channelpool[connectionname]) != 0: channel = self.channelpool[connectionname][-1 ] cname = connectionname self.channelpool[connectionname] = self.channelpool[connectionname][0:-1 ] print ( " 取出一个Channel " ) break # 如果没有找到空闲Channel,canme为"",则新建一个Channel if cname == "" : if self.nowChannelsize < self.maxChannelsize: # 从连接池返回一个连接的名字 if len(self.connectpool) != 0: cname = random.choice(list(self.connectpool)) # 根据名字拿到此连接,传入连接和Pool池创建Channel CreateChannel(self.connectpool[cname], self) # 得到一个新Channel channel = self.channelpool[cname][-1 ] self.channelpool[cname] = self.channelpool[cname][0:-1 ] print ( " 创建一个Channel " ) # 如果没有连接,则新建连接与channel else : if len(self.certdic) != 0: cert = random.choice(list(self.certdic)) cname = str(uuid.uuid4().int) print ( " 创建一个连接 " ) CreateConnection(str(self.certdic[cert][ " rabbitmq_host " ]), str(self.certdic[cert][ " rabbitmq_port " ]), str(self.certdic[cert][ " rabbitmq_virtual_host " ]), str(self.certdic[cert][ " rabbitmq_user " ]), str(self.certdic[cert][ " rabbitmq_password " ]), self, cname) CreateChannel(self.connectpool[cname], self) # 得到一个新Channel channel = self.channelpool[cname][-1 ] self.channelpool[cname] = self.channelpool[cname][0:-1 ] print ( " 创建一个Channel " ) else : print ( " 无法创建Channel,无连接凭证,不能创建连接! " ) else : print ( " 无法创建Channel,超过限制 " ) finally : self. __lock .release() return channel, cname def create_channel(self): try : self. __lock .acquire() if len(self.certdic) != 0: cert = random.choice(list(self.certdic)) cname = str(uuid.uuid4().int) print ( " 创建一个连接 " ) CreateConnection(str(self.certdic[cert][ " rabbitmq_host " ]), str(self.certdic[cert][ " rabbitmq_port " ]), str(self.certdic[cert][ " rabbitmq_virtual_host " ]), str(self.certdic[cert][ " rabbitmq_user " ]), str(self.certdic[cert][ " rabbitmq_password " ]), self, cname) CreateChannel(self.connectpool[cname], self) # 得到一个新Channel channel = self.channelpool[cname][-1 ] self.channelpool[cname] = self.channelpool[cname][0:-1 ] print ( " 创建一个Channel " ) return channel, cname else : print ( " 无法创建Channel,无连接凭证,不能创建连接! " ) return None, "" finally : self. __lock .release() def return_channel(self, channel, connectionname): try : self. __lock .acquire() self.channelpool[connectionname].append(channel) finally : self. __lock .release() def closepool(self): def delconnection(self, connectionname): try : self. __lock .acquire() if connectionname in self.connectpool: del self.connectpool[connectionname] self.nowConnectionsize = self.nowConnectionsize -1 self.nowChannelsize = self.nowChannelsize - len(self.channelpool[connectionname]) del self.channelpool[connectionname] finally : self. __lock .release() def get_certtemplate(self): return { " rabbitmq_host " : "" , " rabbitmq_port " : 5672, " rabbitmq_virtual_host " : "" , " rabbitmq_user " : "" , " rabbitmq_password " : "" } def addcert(self,cert): self.certdic[cert[ " rabbitmq_host " ]] = cert # 连接可以自己创建 class CreateConnection: def __init__ (self, rabbitmq_host, rabbitmq_port, rabbitmq_virtual_host, rabbitmq_user, rabbitmq_password, Rabbitmqpool, Connectionname = str(uuid.uuid4().int), heartbeat=60 ): if Rabbitmqpool.nowConnectionsize < Rabbitmqpool.maxConnectionsize: if Connectionname not in Rabbitmqpool.connectpool: self.rabbitmq_user = str(rabbitmq_user) self.rabbitmq_password = str(rabbitmq_password) self.rabbitmq_host = rabbitmq_host self.rabbitmq_port = rabbitmq_port self.rabbitmq_virtual_host = rabbitmq_virtual_host self.connectionname = Connectionname print (self.rabbitmq_user,self.rabbitmq_password,self.rabbitmq_host,self.rabbitmq_port,self.rabbitmq_virtual_host,self.connectionname) credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_password) try : self.connection = pika.BlockingConnection( pika.ConnectionParameters( host = rabbitmq_host, port = rabbitmq_port, virtual_host = rabbitmq_virtual_host, heartbeat = heartbeat, credentials = credentials)) Rabbitmqpool.connectpool[Connectionname] = self Rabbitmqpool.nowConnectionsize += 1 if self.connectionname not in Rabbitmqpool.channelpool: Rabbitmqpool.channelpool[self.connectionname] = [] print ( " 创建连接: " , Connectionname) except Exception as e: print ( " 创建连接失败: " , e) else : print ( " 创建连接失败,此连接名已存在: " , Connectionname) else : print ( " 创建连接失败,连接池已满,无法创建连接池 " ) def get_connection(self): return self.connection class CreateChannel: def __init__ (self, Connection, Rabbitmqpool): Rabbitmqpool.channelpool[Connection.connectionname].append(Connection.get_connection().channel()) Rabbitmqpool.nowChannelsize += 1

我这里并没有增加过期时间:

rabbitmq_password
= "" rabbitmq_virtual_host = "" credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_password) Pool = Rabbitmqpool(3, 20 ) cert = Pool.get_certtemplate() cert[ ' rabbitmq_host ' ] = rabbitmq_host cert[ ' rabbitmq_virtual_host ' ] = rabbitmq_virtual_host cert[ ' rabbitmq_user ' ] = rabbitmq_user cert[ ' rabbitmq_password ' ] = rabbitmq_password cert[ ' rabbitmq_port ' ] = rabbitmq_port Pool.addcert(cert)

发送消息代码

 try:
        c, cname = Pool.get_channel()
        c.basic_publish(exchange='',
                        routing_key='队列名',
                        body=str(data),
        Pool.return_channel(c, cname)
    except Exception as e:
        print("发送错误:",e) #链接过期
        Pool.delconnection(cname) #channel过期时,删除此链接和此链接下的所有channel
        c, cname = Pool.create_channel() #创建一个新的链接和channel
        c.basic_publish(exchange='',
                        routing_key='队列名',
                        body=str(senddata),
        Pool.return_channel(c, cname)

refer:

https://www.jianshu.com/p/24e541170ace