gRPC 是可以在任何環(huán)境中運(yùn)行的現(xiàn)代開源高性能 RPC 框架。它可以通過可插拔的支持來有效地連接數(shù)據(jù)中心內(nèi)和跨數(shù)據(jù)中心的服務(wù),以實(shí)現(xiàn)負(fù)載平衡,跟蹤,運(yùn)行狀況檢查和身份驗(yàn)證。它也適用于分布式計(jì)算的最后一英里,以將設(shè)備,移動(dòng)應(yīng)用程序和瀏覽器連接到后端服務(wù)。
用Python編寫簡單的gRPC服務(wù)
grpc官網(wǎng)python參考:https://www.grpc.io/docs/languages/python/quickstart/
http://grpc.github.io/grpc/python/grpc.html
- Python 3.5 or higher
- pip version 9.0.1 or higher
安裝gRPC相關(guān)的庫
grpcio-tools主要用根據(jù)我們的protocol buffer定義來生成Python代碼,官方解釋是Protobuf code generator for gRPC。
#apt install python3-pip
pip install grpcio
pip install protobuf
pip install grpcio_tools
編寫proto文件
proto是一個(gè)協(xié)議文件,客戶端和服務(wù)器的通信接口正是通過proto文件協(xié)定的,可以根據(jù)不同語言生成對應(yīng)語言的代碼文件。
heartbeat.proto文件:
syntax = "proto3";
message HeartbeatRequest{
string Host = 1;
int32 Mem = 2;
int32 Disk = 3;
int32 Cpu = 4;
int64 Timestamp = 5;
int64 Seq = 6;
}
message HeartbeatResponse{
int32 ErrCode = 1;
string ErrMsg = 2;
}
heartbeat_service.proto
syntax = "proto3";
import "heartbeat.proto";
// HeartBeatService
service HeartBeatService{
rpc HeartBeat(HeartbeatRequest) returns(HeartbeatResponse){}
}
核心 就是一個(gè) 用于生成需要用到數(shù)據(jù)類型的文件;一個(gè)就是用于生成相關(guān)調(diào)用方法的類。 一個(gè)定義數(shù)據(jù)類型,一個(gè)用于定義方法。
通過proto生成.py文件
proto文件需要通過protoc生成對應(yīng)的.py文件。protoc的下載地址 。下載解壓之后,將解壓目錄添加到path的環(huán)境變量中。
pip install grpcio
install grpcio-tools
#pip install --upgrade protobuf
注意:【下面命令是在proto文件所在的目錄執(zhí)行的,-I 用來指定proto的目錄是 . 】
python -m grpc_tools.protoc -I=. --python_out=.. heartbeat.proto
python -m grpc_tools.protoc -I=. --grpc_python_out=.. heartbeat_service.proto
- -I 指定proto所在目錄
- -m 指定通過protoc生成py文件
- –python_out生成py文件的輸出路徑
- heartbeat.proto、heartbeat_service.proto為 輸入的proto文件
- 生成的文件名中 xxx_pb2.py 就是我們剛才創(chuàng)建數(shù)據(jù)結(jié)構(gòu)文件,里面有定義函數(shù)參數(shù)和返回?cái)?shù)據(jù)結(jié)構(gòu); xxx_pb2_grpc.py 就是我們定義的函數(shù),定義了我們客服端rpc將來要調(diào)用方法。
編譯客戶端和服務(wù)端代碼
服務(wù)端
#!/usr/bin/env python
# coding=utf-8
import sys
from concurrent import futures
import time
import grpc
from google.protobuf.json_format import MessageToJson
import heartbeat_service_pb2_grpc
import heartbeat_pb2
from lib.core.log import LOGGER
class HeartBeatSrv(heartbeat_service_pb2_grpc.HeartBeatServiceServicer):
def HeartBeat(self, msg, context):
try:
# LOGGER.info(MessageToJson(msg, preserving_proto_field_name=True))
body = MessageToJson(msg, preserving_proto_field_name=True)
LOGGER.info("Get Heartbeat Request: %s", body)
response = heartbeat_pb2.HeartbeatResponse()
response.ErrCode = 0000
response.ErrMsg = "success"
return response
except Exception as e:
print("exception in heartbeat")
LOGGER.error("RPC Service exception: %s", e)
response = heartbeat_pb2.HeartbeatResponse()
response.ErrCode = 500
response.ErrMsg = "rpc error: %s" % e
return response
def server(host, rpc_port):
# 這里通過thread pool來并發(fā)處理server的任務(wù)
# 定義服務(wù)器并設(shè)置最大連接數(shù),concurrent.futures是一個(gè)并發(fā)庫,類似于線程池的概念
grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=100))
# 不使用SSL
grpc_server.add_insecure_port('[::]' + ':' + str(rpc_port))
# This method is only safe to call before the server is started.
# 綁定處理器HeartBeatSrv(自己實(shí)現(xiàn)了處理函數(shù))
heartbeat_service_pb2_grpc.add_HeartBeatServiceServicer_to_server(HeartBeatSrv(), grpc_server)
# 該方法只能調(diào)用一次, start() 不會(huì)阻塞
# 啟動(dòng)服務(wù)器
grpc_server.start()
LOGGER.info("server start...")
while 1:
time.sleep(10)
#grpc_server.wait_for_termination()
def main():
try:
LOGGER.info("begin start server")
rpc_port = 8090
host = "::"
server(host, rpc_port)
except Exception as e:
LOGGER.error("server start error: %s", e)
time.sleep(5)
if __name__ == '__main__':
LOGGER.info(sys.path)
main()
客戶端
from time import sleep
import grpc
import heartbeat_pb2
import heartbeat_service_pb2_grpc
from lib.core.log import LOGGER
def run(seq):
option = [('grpc.keepalive_timeout_ms', 10000)]
#
with grpc.insecure_channel(target='127.0.0.1:8090', options=option) as channel:
# 客戶端實(shí)例
stub = heartbeat_service_pb2_grpc.HeartBeatServiceStub(channel)
# stub調(diào)用服務(wù)端方法
response = stub.HeartBeat(heartbeat_pb2.HeartbeatRequest(Host='hello grpc', Seq=seq), timeout=10)
LOGGER.info("response ErrCode:%s", response.ErrCode)
if __name__ == '__main__':
for i in range(1, 10000):
LOGGER.info("i: %s", i)
sleep(3)
run(i)
參考
使用Python實(shí)現(xiàn)gRPC通信
參考URL: https://zhuanlan.zhihu.com/p/363810793
python grpc搭構(gòu)服務(wù)
https://www.jianshu.com/p/10d9ca034567
python grpc 服務(wù)端和客戶端調(diào)用demo
參考URL: https://blog.csdn.net/qq_42363032/article/details/115282405
到此這篇關(guān)于用Python編寫簡單的gRPC服務(wù)的文章就介紹到這了,更多相關(guān)Python gRPC服務(wù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!