Messaging services with Node.js, Part 1

Get to know how to use messaging services with Node.js. If you aren’t familiar with Node.js, I suggest that you read our post about first steps with Node.js.

In this blog post I will share my experiences working with the most common libraries for developing messaging services.

Before we get started, I would like to remind you that the node package manager, npm, offers a lot of different modules. Also, in a networked application you may have different endpoints with different roles that communicate in different ways. In this first part of the series, we will look at the publish/subscribe messaging pattern. Later I will show you the request/reply and push/pull patterns.

The publish/subscribe pattern is very useful when an endpoint (the publisher) needs to send messages to other endpoints (the subscribers), even if the publisher doesn’t know who the subscribers are. This pattern gives you great scalability and flexibility in your network topology.
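To make that decoupling concrete, here is a minimal in-process sketch that uses Node’s built-in EventEmitter instead of 0MQ. The names bus, 'news' and received are made up for illustration; the only point is that the publisher emits on a channel without holding any reference to the subscribers.

```javascript
'use strict';
const EventEmitter = require('events');

// Hypothetical in-process "channel"; 0MQ plays this role across the network.
const bus = new EventEmitter();
const received = [];

// Two independent subscribers; the publisher never references them.
bus.on('news', (msg) => received.push('subscriber A got: ' + msg));
bus.on('news', (msg) => received.push('subscriber B got: ' + msg));

// The publisher only knows the channel name.
bus.emit('news', 'longship sighted');

console.log(received);
```

Adding a third subscriber would not require any change to the emitting side, which is the property the pattern is valued for.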

So, let’s get started!

We’ll use a cross-platform library called 0MQ (Zero-M-Q). It is a very common library for working with message queues, since it provides high-scalability, low-latency messaging and has an event-loop-based development model.

If you want to install the 0MQ library, you can find the instructions on the official page: http://www.zeromq.org

In my case, I’m using Mac OS X with Homebrew:

[Screenshot: installing the 0MQ core library with Homebrew]

After you install the 0MQ base library, you’ll need to install the 0MQ node module. Create a directory named node-messaging-services, open a terminal and navigate to this directory. Once you’ve created a node project under it, you can install the zmq node module:

[Screenshot: installing the zmq module with npm]

After that, you will have the library listed in your package.json and, of course, the zmq node module under the node_modules folder.

The Publisher

The next step is to create two new files: the first one named viking-publisher.js, and the second one named test.txt (its only purpose is to generate some events that we can send to the subscribers).

All the code for our publisher endpoint will be in the viking-publisher.js file, so we should enter the following code:

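'use strict';
const
    fs = require('fs'),
    zmq = require('zmq'),

    // Create the publisher endpoint
    vik_publisher = zmq.socket('pub'),
    // File to watch, taken from the command line
    filename = process.argv[2];

// Publish a message every time the watched file changes
fs.watch(filename, function () {
    // send() does no formatting, so we serialise the message to JSON ourselves
    vik_publisher.send(JSON.stringify({
        type: 'changed',
        file: filename,
        timestamp: Date.now()
    }));
});

// Listen on TCP port 3000 for subscribers
vik_publisher.bind('tcp://*:3000', function (err) {
    if (err) {
        throw err;
    }
    console.log('Listening for some viking subscriber');
});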

What is happening in the code? Well, the first lines import the fs module (for reading from and writing to the file system) and the zmq module (the library that we installed earlier). After that, we create the publisher vik_publisher and take the first argument from the command line. This argument gives us the path of the file that we want to watch (in our case, the file will be test.txt).
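The publisher above reads process.argv[2] directly, so starting it without an argument leaves the file name undefined. As a small sketch of how that could be guarded, the helper below (getFilename is a hypothetical name, not part of the original code) picks the file name from an argv-style array and fails with a usage message when it is missing.

```javascript
'use strict';

// Hypothetical helper: pick the file to watch from an argv-style array.
// argv[0] is the node binary, argv[1] the script path, argv[2] the first user argument.
function getFilename(argv) {
    const filename = argv[2];
    if (!filename) {
        throw new Error('Usage: node viking-publisher.js <file-to-watch>');
    }
    return filename;
}

// Simulated command line: node viking-publisher.js test.txt
console.log(getFilename(['node', 'viking-publisher.js', 'test.txt']));
```

In the real script you would pass process.argv to the helper instead of the simulated array.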

The fs.watch function creates a file-system watcher whose callback invokes the publisher’s send() method every time the file changes. It’s important to know that the send() method doesn’t do any formatting of the message itself (it only pushes bytes down the wire), so it’s our job to serialise and deserialise any messages sent through 0MQ.

Finally, the code binds the publisher, telling it to listen on TCP port 3000. To run the publisher, start the script with node and pass the file to watch as its argument (for example, node viking-publisher.js test.txt):
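The serialise/deserialise round trip can be tried on its own, without any socket. This is only a sketch: encodeEvent and decodeEvent are hypothetical helper names, and it assumes the message reaches the subscriber as a Buffer, which is how the zmq module delivers it as far as I know.

```javascript
'use strict';

// Hypothetical helpers mirroring what the publisher and subscriber do by hand.
function encodeEvent(file, timestamp) {
    // Serialise to a JSON string, then to bytes, which is what travels on the wire.
    return Buffer.from(JSON.stringify({ type: 'changed', file: file, timestamp: timestamp }));
}

function decodeEvent(data) {
    // The receiving side gets raw bytes; turn them back into a string, then an object.
    return JSON.parse(data.toString());
}

const wire = encodeEvent('test.txt', 1500000000000);
const decoded = decodeEvent(wire);
console.log(decoded.type, decoded.file, new Date(decoded.timestamp).toISOString());
```

JSON is just one choice here; since 0MQ only moves bytes, any format that both endpoints agree on would work.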

[Screenshot: starting the publisher in the terminal]

Now, we need to implement a subscriber.

The Subscriber

Create a new file named viking-subscriber.js. Then, write the following code:

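'use strict';
const
    zmq = require('zmq'),
    // Create the subscriber endpoint
    viking_subscriber = zmq.socket('sub');

// Subscribe to all messages (an empty prefix matches everything)
viking_subscriber.subscribe("");

// Handle each message sent by the publisher
viking_subscriber.on("message", function (data) {
    const
        message = JSON.parse(data),
        date = new Date(message.timestamp);
    console.log("Viking modified the file: " + message.file + " changed at " + date);
});

// Connect to the publisher
viking_subscriber.connect("tcp://localhost:3000");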

The first lines of this code import the zmq library and create the subscriber viking_subscriber.

Then, with viking_subscriber.subscribe(""), we tell the subscriber that we want to receive all messages. With viking_subscriber.on("message", ...), we tell it how to process each message that the publisher sends.

Finally, we tell it where the publisher is with viking_subscriber.connect("tcp://localhost:3000").

Now let’s run the subscriber (for example, node viking-subscriber.js):
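The empty string works as "everything" because 0MQ subscriptions are prefix filters: a message is delivered when its first bytes match a subscribed prefix. The sketch below models that rule in plain JavaScript. It is not the library’s code, and matchesSubscription is a hypothetical name used only for illustration.

```javascript
'use strict';

// Hypothetical model of prefix filtering; the real check happens inside 0MQ.
function matchesSubscription(prefix, message) {
    const p = Buffer.from(prefix);
    const m = Buffer.from(message);
    // A message matches when it starts with the subscribed prefix.
    return m.length >= p.length && m.subarray(0, p.length).equals(p);
}

const msg = '{"type":"changed","file":"test.txt"}';
console.log(matchesSubscription('', msg));        // empty prefix
console.log(matchesSubscription('{"type"', msg)); // prefix the message starts with
console.log(matchesSubscription('weather', msg)); // unrelated prefix
```

A subscriber that only cares about some messages could therefore pass a non-empty prefix to subscribe(), provided the publisher puts a matching topic at the start of each message.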

[Screenshot: starting the subscriber in the terminal]

If we modify the test.txt file, we will see in the subscriber window the message from the publisher, telling us that the file was modified:

[Screenshot: the subscriber printing the message it received]

Great! Now viking-publisher.js and viking-subscriber.js are able to communicate successfully over a 0MQ socket!

A great feature of 0MQ is that if you kill the publisher process, nothing happens to the subscriber process: it keeps waiting for messages even while the publisher is down. When the publisher starts up again, 0MQ automatically reconnects the endpoints. It doesn’t matter which endpoint starts up first; 0MQ gives you this stability without any extra work on your part.

If you want to download this code, please follow the link

Thanks for reading and stay tuned for part 2!
