添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
gRPC在Python的异步非阻塞实现方式

gRPC在Python的异步非阻塞实现方式

前言

之前写过两篇文章讲述了RPC服务的概念和gRPC的基本使用、proto语法、TLS认证、异常处理和重连重试等等的教程。两篇文章传送入口: 1. gRPC基本使用教程 2. gRPC身份认证与流式通信

当我们真正把gRPC服务部署到生产环境上的时候,除了上诉这些基本使用和安全认证之外,我们还要考虑性能问题。本文主要探讨一下如何解决在请求并发数较高时如何保证gRPC服务的性能。

问题&分析

问题阐述

  1. 在生产环境中,api的gRPC客户端在连接和调用gRPC服务端的时候出现性能瓶颈,尤其是api服务在QPS陡增的时候,服务实例数扩容到了最大值,仍然有一些请求出现502,即由于阻塞gRPC请求导致请求队列中的一些请求响应返回给负载均衡器的时间超过了负载均衡器的超时时限;
  2. 定时脚本在连接和调研gRPC服务端时也是阻塞式请求,处理效率低。

原因分析

Python由于“大家都懂”的 GIL 问题,在使用多线程时CPU操作是无法分配在多核进行的,故而导致很多框架多线程实际上是阻塞的,但是在进行一些机器I/O或者网络I/O请求时,python的多线程或者协程是可以不阻塞的,也就是说GIL锁限制的是CPU密集型的服务的性能,而不会限制I/O密集型的服务性能。所以我们在使用gRPC这种网络I/O服务的时候,也可以达到非阻塞的效果。本文的问题即是如何让gRPC服务达到非阻塞的效果。

解决方案

由于 asyncio 的引入,使得python目前的协程得到较为完善的补充。在进行I/O操作时可以采用协程的方式,不占用I/O等待时间,让协程去处理I/O请求,主进程可以继续监听下一个请求,等协程的请求回调了再交付给主进程继续执行下去,以达到提高性能的目的,这就是异步非阻塞( aio )的方式。 之前的文章讲述过一些常用的支持aio的库如aiohttp、aiomysql等等,本文我们采用的是aio版本的gRPC库,目前在pypi上的 grpcio 库,1.35.0版本开始已经可以支持asyncio的方式,GitHub链接: github.com/grpc/grpc

完整gRPC asyncio api文档: grpc.github.io/grpc/pyt 我们分别通过在服务端和客户端来实践一下gRPC的aio实现:

服务端

原服务端实现方式

import os
import sys
import time
import grpc
import asyncio
from concurrent import futures
async def start_server():
    # start rpc service
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=40), options=[
        ('grpc.max_send_message_length', 100 * 1024 * 1024),
        ('grpc.max_receive_message_length', 100 * 1024 * 1024),
        ('grpc.enable_retries', 1),
    xxx_pb2_grpc.add_xxx_to_server(<rpc_function>, server) # 加入服务
    server.add_insecure_port('[::]:50051')
    server.start()
    # since server.start() will not block,
    # a sleep-loop is added to keep alive
    try:
        while True:
            time.sleep(86400)
    except KeyboardInterrupt:
        server.stop(0)
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait([start_server()]))
    loop.close()

aio的服务端实现方式:

import os
import sys
import time
import grpc
from grpc.experimental import aio
import asyncio
from concurrent import futures
async def start_server():
    # start rpc service
    server = aio.server(futures.ThreadPoolExecutor(max_workers=40), options=[
        ('grpc.so_reuseport', 0),
        ('grpc.max_send_message_length', 100 * 1024 * 1024),
        ('grpc.max_receive_message_length', 100 * 1024 * 1024),
        ('grpc.enable_retries', 1),
    xxx_pb2_grpc.add_xxx_to_server(<rpc_function>, server)  # 加入服务
    server.add_insecure_port('[::]:50051')
    await server.start()
    # since server.start() will not block,
    # a sleep-loop is added to keep alive
    try:
        await server.wait_for_termination()
    except KeyboardInterrupt:
        await server.stop(None)
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait([start_server()]))
    loop.close()

采用aio的实现方式,只需要使用1.35.0版本及以上的grpcio库,在创建服务端的时候将grpc.server()改为aio.server(),服务启动的类方法采用async和await的方式即可完成。

注:服务端开启了aio的支持,是既可以支持阻塞式的客户端请求,也支持客户端非阻塞式请求的,所以对于旧版本客户端是直接兼容的

客户端

服务端增加了非阻塞式请求的支持,客户端则可以采用阻塞和非阻塞的方式来调用:

异步非阻塞方式

import grpc
import sys
import os
from . import rpc_config
class RpcClient(object):
    # rpc_client = {}
    rpc_client = None
    @staticmethod
    def get_rpc_channel(host, port):
        options = rpc_config.RPC_OPTIONS
        # OPTIONS配置可根据需要自行设置:
        #RPC_OPTIONS = [('grpc.max_send_message_length', 100 * 1024 * 1024),
        #       ('grpc.max_receive_message_length', 100 * 1024 * 1024),
        #       ('grpc.enable_retries', 1),
        #       ('grpc.service_config',
        #        '{"retryPolicy": {"maxAttempts": 4, "initialBackoff": "0.1s", '
        #        '"maxBackoff": "1s", "backoffMutiplier": 2, '
        #        '"retryableStatusCodes": ["UNAVAILABLE"]}}'),
        #       ]
        channel = grpc.insecure_channel("{}:{}".format(host, port),
                                        options=options)
        return channel
    def load_sub_rpc(self, platform, host, port, db_type):
        function return rpc instance
        :param platform
        :param host
        :param port
        :param db_type
        :return: instance
        channel = self.get_rpc_channel(host, port)
        stub = xxx_pb2_grpc.xxxStub(channel)
        return stub

同步阻塞方式

import grpc
import sys
import os
from . import rpc_config
class RpcClient(object):
    rpc_client = {}
    @staticmethod
    def get_rpc_channel(host, port, is_aio=False):
        options = rpc_config.RPC_OPTIONS
        # OPTIONS配置可根据需要自行设置:
        #RPC_OPTIONS = [('grpc.max_send_message_length', 100 * 1024 * 1024),
        #       ('grpc.max_receive_message_length', 100 * 1024 * 1024),
        #       ('grpc.enable_retries', 1),
        #       ('grpc.service_config',
        #        '{"retryPolicy": {"maxAttempts": 4, "initialBackoff": "0.1s", '
        #        '"maxBackoff": "1s", "backoffMutiplier": 2, '
        #        '"retryableStatusCodes": ["UNAVAILABLE"]}}'),
        #       ]
        if is_aio:
            channel = grpc.aio.insecure_channel("{}:{}".format(host, port),
                                                options=options)
        else:
            channel = grpc.insecure_channel("{}:{}".format(host, port),
                                            options=options)
        return channel
    def load_sub_rpc(self, platform, host, port, db_type, is_aio=False):
        function return rpc instance
        :param platform
        :param host
        :param port
        :param db_type
        :param is_aio