Examples¶
Publish-Subscribe¶
Here is an example of using txZMQ with publish and subscribe (examples/push_pull.py):
#!env/bin/python
"""
Example txzmq client.
examples/pub_sub.py --method=bind --endpoint=ipc:///tmp/sock --mode=publisher
examples/pub_sub.py --method=connect --endpoint=ipc:///tmp/sock --mode=subscriber
"""
import os
import sys
import time
from optparse import OptionParser
from twisted.internet import reactor
rootdir = os.path.realpath(os.path.join(os.path.dirname(sys.argv[0]), '..'))
sys.path.append(rootdir)
os.chdir(rootdir)
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPubConnection, ZmqSubConnection
parser = OptionParser("")
parser.add_option("-m", "--method", dest="method", help="0MQ socket connection: bind|connect")
parser.add_option("-e", "--endpoint", dest="endpoint", help="0MQ Endpoint")
parser.add_option("-M", "--mode", dest="mode", help="Mode: publisher|subscriber")
parser.set_defaults(method="connect", endpoint="epgm://eth1;239.0.5.3:10011")
(options, args) = parser.parse_args()
zf = ZmqFactory()
e = ZmqEndpoint(options.method, options.endpoint)
if options.mode == "publisher":
s = ZmqPubConnection(zf, e)
def publish():
data = str(time.time())
print "publishing %r" % data
s.publish(data)
reactor.callLater(1, publish)
publish()
else:
s = ZmqSubConnection(zf, e)
s.subscribe("")
def doPrint(*args):
print "message received: %r" % (args, )
s.gotMessage = doPrint
reactor.run()
The same example is available in the source code. You can run it from the checkout directory with the following commands (in two different terminals):
examples/pub_sub.py --method=bind --endpoint=ipc:///tmp/sock --mode=publisher
examples/pub_sub.py --method=connect --endpoint=ipc:///tmp/sock --mode=subscriber
Push-Pull¶
Example for push and pull socket is available in examples/push_pull.py.
#!env/bin/python
"""
Example txzmq client.
examples/push_pull.py --method=bind --endpoint=ipc:///tmp/sock
--mode=push
examples/push_pull.py --method=connect --endpoint=ipc:///tmp/sock
--mode=pull
"""
import os
import socket
import sys
import time
import zmq
from optparse import OptionParser
from twisted.internet import reactor
rootdir = os.path.realpath(os.path.join(os.path.dirname(sys.argv[0]), '..'))
sys.path.insert(0, rootdir)
os.chdir(rootdir)
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPushConnection, ZmqPullConnection
parser = OptionParser("")
parser.add_option("-m", "--method", dest="method", help="0MQ socket connection: bind|connect")
parser.add_option("-e", "--endpoint", dest="endpoint", help="0MQ Endpoint")
parser.add_option("-M", "--mode", dest="mode", help="Mode: push|pull")
parser.set_defaults(method="connect", endpoint="ipc:///tmp/txzmq-pc-demo")
(options, args) = parser.parse_args()
zf = ZmqFactory()
e = ZmqEndpoint(options.method, options.endpoint)
if options.mode == "push":
s = ZmqPushConnection(zf, e)
def produce():
data = [str(time.time()), socket.gethostname()]
print "producing %r" % data
try:
s.push(data)
except zmq.error.Again:
print "Skipping, no pull consumers..."
reactor.callLater(1, produce)
reactor.callWhenRunning(reactor.callLater, 1, produce)
else:
s = ZmqPullConnection(zf, e)
def doPrint(message):
print "consuming %r" % (message,)
s.onPull = doPrint
reactor.run()