zeromq
ZeroMQ只是一个库,要想在Python中使用需要下载相应的模块python3-zmq,下面直接介绍它的3中基本的模式。
1.request-Reply
serverimport zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
while True:
# Wait for next request from client
message = socket.recv()
print("Received request: ", message.decode('utf-8'))
time.sleep(1) # Do some 'work'
# Send reply back to client
socket.send_string("World")
clientimport zmq
context = zmq.Context()
subscriber = context.socket(zmq.REQ)
subscriber.connect("tcp://localhost:5555")
for request in range(1, 10):
print("Sending request ", request, "...")
subscriber.send_string("Hello")
# Get the reply.
message = subscriber.recv()
print("Received reply ", request, "[", message.decode('utf-8'), "]")
- 服务端绑定端口,准守格式protocal://interface:port,protocal定义了数据传输时的协议,例中使用tcp协议(一般是tcp, udp,不能使用http,因为http是应用层的协议,socket位于应用层和运输层之间,应该使用socket下一层的协议),interface指ip地址,例中用 '*' 号代表了本机所有IP地址(多宿主主机有多个IP地址)
- 客户端连接服务器绑定的端口,用于和服务端之间相互传输数据,为什么服务端用bind, 客户端用connect,stackoverflow上有个回答做了很好的解释:https://stackoverflow.com/questions/27014955/socket-connect-vs-bind
- socket之间传递的是byte数据,接收到data后使用utf-8解码查看,发送字符串需使用send_string接口
2. Publish-Subscribe
server
# -*- coding=utf-8 -*-
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5555")
i = 1
while True:
print('发送消息')
socket.send_string("消息群发" + str(i))
time.sleep(1)
i += 1
client1
# -*- coding=utf-8 -*-
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555")
socket.setsockopt_string(zmq.SUBSCRIBE, '') # 消息过滤
while True:
response = socket.recv() # 消息发送是以bytes形式,接收后需要解码
print("response: %s" % response.decode('utf-8'))
client2
# -*- coding=utf-8 -*-
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555")
socket.setsockopt_string(zmq.SUBSCRIBE, '')
while True:
response = socket.recv();
print("response: %s" % response.decode('utf-8'))
- 服务端socket类型为zmq.PUB, 客户端为zmq.SUB。
- 服务端逻辑独立,无论有没有客户端订阅,都会按照自己的逻辑发送广播
3. parallel Pipeline
server
# -*- coding=utf-8 -*-
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5557")
i = 1
while True:
socket.send_string("测试消息:" + str(i))
print("已发送")
time.sleep(1)
i += 1
worker1
# -*- coding=utf-8 -*-
import zmq
context = zmq.Context()
recive = context.socket(zmq.PULL)
recive.connect('tcp://127.0.0.1:5557')
sender = context.socket(zmq.PUSH)
sender.bind('tcp://*:5558')
while True:
data = recive.recv()
print("正在转发..." + data.decode('utf-8'))
sender.send(data)
worker2
# -*- coding=utf-8 -*-
import zmq
context = zmq.Context()
recive = context.socket(zmq.PULL)
recive.connect('tcp://127.0.0.1:5557')
sender = context.socket(zmq.PUSH)
sender.bind('tcp://*:5559')
while True:
data = recive.recv()
print("正在转发..." + data.decode('utf-8'))
sender.send(data)
client1, client2, client3
# -*- coding=utf-8 -*-
import zmq
import sys
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.connect("tcp://127.0.0.1:5558")
socket.connect("tcp://127.0.0.1:5559")
while True:
response = socket.recv()
print("response: %s" % response.decode('utf-8'))
- worker1, worker2分摊由server发出的消息,然后再分摊给client1, client2, client3,Push/Pull 的特点是无论是 Push 端还是 Pull 端都可以做 server,bind 到某个地址等待对方访问。如果我们在 Push 端绑定地址,那么这是一个 Push server,对应的 Pull clients 可以 connect 到这个 Push server 往外拉数据;反之,如果我们建立一个 Pull server,对应的 Push clients 可以 connect 到这个 Pull server 往里压数据。例中,worker1, worker2作为server端的Pull client, client端的Push Server。Server端只能有一个,因为server需要bind socket,而客户端只需要connect到server的socket就行
- server socket send数据时如果没有client socket连接,那么send将会阻塞,直到有client socket连接上
文章参考:
https://segmentfault.com/a/1190000012010573#articleHeader3
https://zhuanlan.zhihu.com/p/22947038
相关阅读
https://gitee.com/solym/ZeroMQ-Guide-Zh https://github.com/booksbyus/zguide ZMQ接口文档的官方网站 : http://api.zeromq.org