zeromq,zero , How do I add a pipeline to a REQ-REP in ZeroMQ?

How do I add a pipeline to a REQ-REP in ZeroMQ?


Tag: zeromq,zero

I am experimenting with ZeroMQ where I want to create a server that does :


I want to sequentially receives data query requests, push it through a inproc pipeline to parallelise the data query and the sink merges the data back. After the sink merges the data together, the sink sends the merged data as the reply back to the request.

Is this possible? How would it look? I am not sure if the push/pull will preserve client's address for the REP socket to send back to.


Assuming that each client has only a single request out at any one time.

Is this possible?

Yes, but with different socket types.

How would it look?

(in C)

What you may like to do is shift from a ZMQ_REP socket on the external server socket to a ZMQ_ROUTER socket. The Router/Dealer sockets have identities which can allow you to have multiple requests in your pipeline and still respond correctly to each.

The Asynchronous Client/Server Pattern:

The only hitch in this is that you will need to manage the multiple parts of the ZMQ message. The first part is the identity. Second is null. Third is the data. As long as you REPLY in the same order as the REQUEST the identity will guide your response's data to the correct client. I wrapped my requests in a struct:

struct msg {
   zmq_msg * identity;
   zmq_msg * nullMsg;
   zmq_msg * data;

Make sure to use zmq_msg_more when receiving messages and set the more flag when sending correctly.

I am not sure if the push/pull will preserve client's address for the REP socket to send back to.

You are correct. A push pull pattern would not allow for specifying of the return address between multiple clients.


Using process instead of thread with zeromq

I'm reading this code But when I've tried to replace threading.Thread by multiprocessing.Process I got the error Assertion failed: ok (mailbox.cpp:84) Code is import time import threading import zmq def worker_routine(worker_url, context=None): """Worker routine""" context = context or zmq.Context.instance() # Socket to talk to dispatcher socket = context.socket(zmq.REP) socket.connect(worker_url)...

0 not an [int] in PowerShell?

My understanding is that (mathematically) 0 is an Int. And yet if (0 -as [int]){"Int"}else{"Not"} returns Not, at least for me in PS 2.0. Is this a bug in v2, or am I misunderstanding things? I have worked around the issue by testing for -as [int] -and -ne 0, but...

TCP tunneling through Linux tap device

I've created a tap0 device (IP, and am using zeromq's pgm pub/sub (e.g. pgm://; to transport Ethernet frames from the tap to the zmq subscribers and vice versa. The idea is to create a virtual network using pgm. I have 2 tap hosts on the network:, They...

Akka clustering conflicts

The Akka doc talks about a variety of seemingly inter-related Akka technologies without distinguishing much between them: Akka Networking Akka Remoting Akka Clustering The Akka ZeroMQ module My understanding is that "Akka Networking" is simply a module/lib that gives Akka the ability to speak to remote actor systems over TCP....

ZeroMQ word count app gives error when you compile in spark 1.2.1

I'm trying to setup zeromq data stream to spark. Basically I took the ZeroMQWordCount.scala app an tried to recompile it and run it. I have zeromq 2.1 installed, and spark 1.2.1 here is my scala code: package org.apache.spark.examples.streaming import import import akka.zeromq._ import akka.zeromq.Subscribe import akka.util.ByteString import org.apache.spark.streaming.{Seconds,...

Reason for zmq error code 156384763

I am using zmq req/rep pattern communication. The implementation is pretty simple, the req sends some data and waits on recv. The rep receives the data, process and reply back. //REQ zmq_connect zmq_send zmq_recv //blocking zmq_close //REP zmq_bind while(true) { while(data_received) { //miscellaneous process zmq_recv //non-blocking Print zmq_error_no if zmq_recv...

Asynchronous Client/Server pattern in Python ZeroMQ

I have 3 programs written in Python, which need to be connected. 2 programs X and Y gather some information, which are sent by them to program Z. Program Z analyzes the data and send to program X and Y some decisions. Number of programs similar to X and Y...

java convert string to int and remove trailing zeros

Hi everyone I am currently trying to convert a string to an arrayList in reverse and then remove any trailing zeros eg "001203" converts to (3,0.2,1,0,0) and then (3,0,2,1) my code is currently public convert(String nums) { List = new ArrayList<Integer>(); // create arraylist for (int i = nums.length(); i...

How to make my cheque generator output exactly what it is instead of “Zero” in C

My Cheque generator program has worked flawlessly for any input you give it to make it output the numerals in words. for example if I were to input "1234.56" it will out put "One Thousand Two Hundred Thirty Four Dollars and ... 56 Cents". However whenever I want to output...

with height in percentage has 0 height when inside table-cell in FF and IE

I want two vertically stacked empty divs (to display background images) to share exactly 50% of the height of the parent container minus a fixed gap (e.g. 20px). I'm using calc() for this - as Opera Mini doesn't matter in my case. I got this working fine on all browsers:...

python, zeromq, how to abort context.socket.recv() the right way?

I have a small software where I have a separate thread which is waiting for ZeroMQ messages. I am using the PUB/SUB communication protocol of ZeroMQ. Currently I am aborting that thread by setting a variable "cont_loop" to False. But I discovered that, when no messages arrive to the ZeroMQ...

Initializing ZeroMQ 2.2 message in “almost always auto” style wants to use private constructor

I have written a C++11 program which uses ZeroMQ. In one particular line I want to create a new message as a local variable and initialize it with the size of a vector called serialized, using the "almost always auto"-style syntax: auto zm = zmq::message_t {serialized.size()}; This compiles fine on...

VS2013 Error: LNK2019 When trying to build ZeroMQ server

I'm trying to build this simple ZeroMQ server in C++ on Visual Studio 2013. #include "stdafx.h" #include "zmq.hpp" #include <string> #include <iostream> #include <windows.h> using namespace std; int _tmain(int argc, _TCHAR* argv[]) { // Prepare context and socket zmq::context_t ctx(1); zmq::socket_t sckt(ctx, ZMQ_REP); sckt.bind("tcp://*:5555"); while (true) { zmq::message_t request; //...

Using ZeroMQ or other MQ to send objects [closed]

I am wondering how I can send objects via a message queue. My question is two-fold: (1) are there any message queues like ZeroMQ that support Java objects and JSON out of the box? (2) are there any message queues that don't require you to serialize/deserialize objects on both ends?...

How to choose zmq maximum buffer size properly?

According to ZMQ-manual The ZMQ_SNDBUF option shall retrieve the underlying kernel transmit buffer size for the specified socket. A value of zero means that the OS default is in effect. For details refer to your operating system documentation for the SO_SNDBUF socket option. How can I get it in a...

python 3.4 string manipulation - truncating leading zeros

There are a number of existing posts on this topic, but I cannot get this simple piece of code to work correctly. I have tried many many times with no success... per = 5 if per < 10 == True: ms = 'fwd_%sd' % str(per).zfill(2) else: ms = 'fwd_%sd' %...

Is MemoryMappedFile.CreateNew(…) guaranteed to create a file with zeroes?

As the title says; Can I rely on MemoryMappedFile to always create a file full of zeroes? I've tested by reading some newly created files but this "testing" seems futile, altough I have yet to see a non-zero byte. Is there any documentation on this that I have missed, or...

IObservable with NetMQ receive

I'm trying to write a typical stock trading program, which receives stock tickers/orders/trades from netmq, turn the streams into IObservable, and show them on a WPF frontend. I try to use async/await with NetMQ blocking ReceiveString (suppose I am expecting some string input) so that the ReceiveString loop wouldn't block...

Serializing Protobuf Object and Sending with ØMQ/ZMQ

I have a protobuf object that I am sending from a C# application (using clrZmq) to a C++ service (using the zmq C++ bindings) on a local machine (for testing). I attempt to send my object from C# using the following Taurus.Odds odds = Util.GetFakeOdds(); using (var context = ZmqContext.Create())...

Python GUI not capturing zeromq message

I have a small program where I am not receiving a response from the server. This is using python 3.4 and latest zeromq and pyqt5 on Ubuntu 14.04. The client GUI sends a message to the server which gets it and responds but the client does not see the...

How does ZeroMQ connect and bind work internally

I am experimenting with ZeroMQ. And I found it really interesting that in ZeroMQ, it does not matter whether either connect or bind happens first. I tried looking into the source code of ZeroMQ but it was too big to find anything. The code is as follows. # client side...

MATLAB imagesc: Use black as mid-color for value 0

I want to use imagesc to plot a quantity where all of positive, negative, and close-to-zero values are important. I want the close-to-zeros to be black, but there is no default colormap that provides such a feature. For example, using colormap(hot) assigns black to the lowest (in this case: negative)...

ZMQ Pattern Dealer/Router HeartBeating

I have a Dealer socket in client side, who is connected to Router socket in server side. I often see Heartbeating mechanism : the server regularly send message to the client in order that client knows if he is correctly connect to the server, so the client can reconnect if...

MessagePack and datetime

I need a fast way to send 300 short messages a second over zeromq between the python multiprocessing processes. Each message needs to contain an ID and time.time() Msgpack seems like the best way to serialize the dict before sending it via zeromq, and conveniently, msgpack has an example of...

Is it possible to differentiate between 0 and -0?

I know that the integer values 0 and -0 are essentially the same. But, I am wondering if it is possible to differentiate between them. For example, how do I know if a variable was assigned -0? bool IsNegative(int num) { // How ? } int num = -0; int...

Node.js socket.send( ) functions failing to complete before exit

In some Node.js scripts that I have written, I notice that even if the last line is a synchronous call, sometimes it doesn't complete before Node.js exits. I have never seen a console.log statement fail to run/complete before exiting, but I have seen some other statements fail to complete before...

Gathering distributed data into central database

I was assigned to update existing system of gathering data coming from points of sale and inserting it into central database. The one that is working now is based on FTP/SFTP transmission, where the information is sent once a day, usually at night. Unfortunately, because of unstable connection links (low...

ZMQ pub/sub subscribe

I am having trouble figuring out how to subscribe to a particularly "channel" with ZMQ with regard to its pub/sub functionality. Here is the publisher: var zmq = require('zmq'); var pub = zmq.socket('pub'); pub.bindSync('tcp://'); setInterval(function(){ pub.send('pub msg'); },500); here is the subscriber: var sub = zmq.socket('sub'); sub.connect('tcp://'); sub.subscribe(''); //herein lies...

ZeroMQ pattern for load balancing work across workers based on idleness

I have a single producer and n workers that I only want to give work to when they're not already processing a unit of work and I'm struggling to find a good zeroMQ pattern. 1) REQ/REP The producer is the requestor and creates a connection to each worker. It tracks...

ZeroMQ choose recipient

I'm new to ZeroMQ (and to networking in general), and have a question about using ZeroMQ in a setup where multiple clients connect to a single server. My situation is as follows: --1 server --multiple clients --Clients send messages to server: I've already figured out how to do this part....

ZeroMQ pattern for multiple asynchronous requests to single endpoint

I'm using zmq to develop a distributed application having the following network topology: a client node that initiates a request and a server node that replies to requests. Since the client is a node.js application I can't block after a send call to wait the response, so the scenario is...

Ros Navigation Stack - Issue with zero and negative goal values

I’m following the example of with navigation stack. If I set the goal position with positive values the turtle moves as expected, but If it’s set to zero or negative values the turtle rotates in the same place, and It returns the message: “[ERROR] [1433622909.572352221]: Aborting because a valid...

c++ gives Segmentation Fault from text file of all 0's

I'm writing a really simple code (or so I thought) that requires 2 text files to be read in. One is full of a bunch of data points (of type double) and the other one happens to be a bunch of 0's (this is only SOMETIMES filled with 0's, sometimes...

Request Aggregator / Middle-tier design pattern for costly requests

I'm working on a program that will have multiple threads requiring information from a web-service that can handle requests such as: "Give me [Var1, Var2, Var3] for [Object1, Object2, ... Object20]" and the resulting reply will give me a, in this case, 20-node XML (one for each object), each node...

Displaying a zero

I'm filling a template with some values being numbers, using str_replace. When a number is 0 (zero), PHP thinks it's false and write nothing. I tried every solution found on SO and the web : intval($myZeroVar) floatval($myZeroVar) strval($myZeroVar) (string)$myZeroVar ''.$myZeroVar random tests using number_format, sprintf, str_pad... NONE of these are...

python fuction finding root( or zero) with minimum distance from real root epsilon

So its and exercises for python i am totally stuck! you have a random Function in [a,b] you already know that the a is negative and b is positive and it has only ONE root. The true root is : -0.94564927392359 and you have to make a def that will...

How ZMQ works between 2 different machines

I am not sure I understand fundamentally how ZMQ (or any message queue) knows how to communicate between two servers that otherwise don't know anything about each other. For example, using the request/reply pattern: the requester will bind to a host and port like so: var requester = zmq.socket("req"); requester.bind('tcp://*:5555'),...

zeromq.node setsockopt throws invalid argument error

This code... var zmq = require('zmq'); var req = zmq.socket('req'); req.setsockopt('hwm', 10); // or req.setsockopt(zmq.ZMQ_HWM, 10); ...throws this error: /Users/.../node_modules/zmq/lib/index.js:246 this._zmq.setsockopt(opts[opt] || opt, val); ^ Error: Invalid argument at Socket.setsockopt (/Users/.../node_modules/zmq/lib/index.js:246:13) ps. bonus point, is there a way to do something like this zmq.socket('req', {'hwm': 10}) ? zeromq.node 2.11.1 &...

Understanding ZeroMQ

So as I have asked in a previous post, I want to be able to make programs or functions written in different languages to communicate between them. I have come across zeromq recently and I'm trying to figure out whether or not this is something that could help me since...

Can you create a centralized topic in ZeroMQ?

I would really like to try ZeroMQ and I am wondering whether my problem can be solved using it. THE PROBLEM: I have multiple subscribers and multiple publishers. In a centralized broker architecture the publishers would publish a message to a topic (kind of like multicast address) and the subscribers...

How to use ProtoBuf extensions to get polymorphism in C++

I am trying to define a common basic message which defines the type of the message (for easier parsing) and is then extended with the actual message. The messages will be used in an RPC way. My .proto file syntax = "proto2"; package userapi; // wrapper for commands message Command...

Listening for connect/connection events within ZMQ request-reply pattern

In the replier or 'server' of the ZMQ request-reply pattern, I would like to listen to requestors connecting to my replier/server. I have this code: var zmqConfig = {...}; var replier = zmq.socket('rep'); var address = 'tcp://'.concat(':').concat(zmqConfig.port); replier.bind(address, function (err) { if (err) { } }); replier.on('message', function () {...

What open ports are required on firewall to allow for salt-stack remote execution?

The documentation on saltstack appears to be unclear regarding what ports are required from the salt-master -> salt-minion (apparently none are required). It suggests that ports only need to be opened from the salt-minion -> salt-master. (See: If however commands are executed remotely on the salt-master targeted to a...

Multi core ZeroMQ?

ZeroMQ is used for receiving input parameters.. def server(): rep = context.socket(zmq.REP) rep.bind('tcp://*:{}'.format(PORT)) while True: data = rep.recv_json() result = calculate(data) rep.send_json(result) The calculation method is called calculate, after finished, result would be sent to client through ZMQ. Base on my test, it currently uses only 1 core of the...

PyZMQ and Django: connection from different view

I am writing a Django web application where the user can do different operations from every view and when the user submit the form a JSON is pushed to ZeroMQ and waits for a response (REQ-REP). My problem is that the operations are too slow. I don't know if the...

How to setup a ZMQ PUB/SUB pattern to serve only for pre-authorized subscriber(s)

How can I implement or do kind of "hack" in PUB-SUB pattern to get an ability to publish only to authorized subscribers, disconnect unauthorized subscribers etc? I googled for this problem, but all the answers very similar to set subscribe filter in subscriber side. But I want, as I said,...

error installing zmq (ZeroMQ)

I am trying to install node module zmq I have referred to this However i get this error while installing the module 0 info it worked if it ends with ok 1 verbose cli [ 'C:\\Program Files (x86)\\nodejs\\\\node.exe', 1 verbose cli 'C:\\Program Files (x86)\\nodejs\\node_modules\\npm\\bin\\npm-cli.js', 1 verbose cli 'install', 1...

ZMQ: No subscription message on XPUB socket for multiple subscribers (Last Value Caching pattern)

I implemented the Last Value Caching (LVC) example of ZMQ (, but can't get a 2nd subscriber to register at the backend. The first time a subscriber comes on board, the event[0] == b'\x01' condition is met and the cached value is sent, but the second subscriber (same topic) doesn't...

ZeroMQ: Executing request in serial fashion on REQ-Router socket

I have built front end with Router sockets-- that forwards the request to REQ-REP Thread like in example But I have only one worker. So in order to process I need to wait untill REQ socket get free. But I cannot figure out a way to wait untill one...

0MQ req wait for rep in Node.js

I'm new to javascript, Node.js, and 0MQ, so n00b * 3 here. I want to set up a simple request and reply, but I want the client to wait for a response before sending out the next request. The zguide goes over this, but the Node.js version does not behave...