Thursday, October 29, 2009

Writing an Erlang PubSub Client with Exmpp

I've been working on a system to monitor and debug web applications remotely. One of its goals is to send notifications when something goes wrong, e.g. the load spikes or the database is overloaded. While I won't describe the whole system in this post, I'd like to present one of its components: a PubSub client that connects to an XMPP server to be notified of events.

In my case I've already set up an Ejabberd server, where I've configured a couple of PubSub nodes for testing purposes. In this tutorial we will see how to create an Erlang client to receive such notifications.

Regarding the PubSub service, you can read about it here. Basically, you create a PubSub node and publish notifications to it. Users who are interested in those notifications subscribe to the node and receive them. It works similarly to Twitter, where you follow someone and then receive their messages in your timeline.
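To make the node/subscriber relationship concrete, here is a toy, in-memory model in plain Erlang. It only illustrates the idea and assumes nothing about XMPP or Exmpp: the module toy_pubsub and its functions are hypothetical, and a real PubSub service lives on the XMPP server, not in your process.

```erlang
-module(toy_pubsub).
-export([new/0, create_node/2, subscribe/3, publish/3]).

%% Toy model only: a map from node name to its subscribers, kept in
%% subscription order. Nothing here talks to an XMPP server.
new() -> #{}.

create_node(Node, Nodes) ->
    Nodes#{Node => []}.

%% Adds User to Node's subscribers (once); crashes with {badkey, Node}
%% if the node was never created.
subscribe(User, Node, Nodes) ->
    Subs = maps:get(Node, Nodes),
    case lists:member(User, Subs) of
        true  -> Nodes;
        false -> Nodes#{Node := Subs ++ [User]}
    end.

%% Publishing yields one {User, Node, Item} delivery per subscriber.
publish(Item, Node, Nodes) ->
    [{User, Node, Item} || User <- maps:get(Node, Nodes)].
```

Publishing to a node with no subscribers simply delivers nothing, which is also what happens on a real PubSub node.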

Our client will be written in Erlang and will use the Exmpp library. Follow the instructions on its site to install it.

NOTE: when I first started with Exmpp, the code kept failing. After some research I found that the problem was caused by the way the library was compiled on Snow Leopard. If you run into the same problem, run the configure command like this:

CC='gcc -m32' CFLAGS=-m32 LDFLAGS=-m32 ./configure

I got that trick from here, and it works with Exmpp as well. The -m32 flags make the compiler and linker produce 32-bit code.

First we create the folder pubsub_client and, inside it, set up the structure for our project, following the recommendations found here.

It will look like this:

pubsub_client/
- ebin/
- include/
- priv/
- src/

At the root of the project we will add a Makefile (its code is provided at the end of the post) and a shell script, start-dev.sh, that launches the Erlang console:

#!/bin/sh
cd `dirname $0`
exec erl -pa $PWD/ebin -boot start_sasl -s exmpp

This script adds the compiled files in ebin to the code load path, boots Erlang with SASL (start_sasl) and then, via -s exmpp, calls exmpp:start/0 to start the exmpp application, which is required in order to use the functions from that library.

Our client will be built around the gen_server behaviour. When launched, it will connect to the Ejabberd server and wait for notifications; when they arrive, we will print them to the tty. We will also add a function that lets us subscribe to PubSub nodes.

To start, let's create a file called pubsub_client.erl inside the src folder and add the following content to it:

-module(pubsub_client).

-behaviour(gen_server).

-export([start/4, start_link/4, stop/0]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-include_lib("exmpp/include/exmpp.hrl").
-include_lib("exmpp/include/exmpp_client.hrl").

-record(state, {session, jid}).

Here we define our module name, pubsub_client, and declare that it implements the gen_server behaviour. Then we export the functions to start and stop the client. Below that we export the required gen_server callbacks, and we include the exmpp.hrl and exmpp_client.hrl headers because we will need some records and macros defined there.

The last line defines a record called state, which will hold our XMPP session and our JID.

Now let's write the init/1 function, where we will initialize our connection to the server.

init({Host, Port, User, Password}) ->
    {ok, {MySession, MyJID}} = pubsub_utils:connect(Host, Port, User, Password),
    {ok, #state{session=MySession, jid=MyJID}}.

This function expects a four-element tuple with the connection parameters: the server Host and Port, the User name and the Password. Those parameters are passed to the pubsub_utils:connect/4 function, which returns {ok, {Session, JID}} on success. If everything is OK, our init function returns {ok, State}, where State is the record we defined before, holding our Session and our JID.
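It helps to see what the {ok, {MySession, MyJID}} match buys us: if the connection fails, init/1 crashes with a badmatch, and gen_server:start returns an error tuple instead of a running client. The sketch below assumes a hypothetical fake_connect/1 in place of pubsub_utils:connect/4, so it runs without an XMPP server.

```erlang
-module(init_demo).
-behaviour(gen_server).
-export([start/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-record(state, {session, jid}).

start(Mode) -> gen_server:start(?MODULE, Mode, []).

%% Hypothetical stand-in for pubsub_utils:connect/4.
fake_connect(ok)   -> {ok, {fake_session, "tutorial@localhost"}};
fake_connect(fail) -> {error, auth_failed}.

%% Same shape as pubsub_client:init/1: a failed connect is a badmatch,
%% which crashes init/1 and makes the start call return {error, ...}.
init(Mode) ->
    {ok, {Session, JID}} = fake_connect(Mode),
    {ok, #state{session = Session, jid = JID}}.

handle_call(_Request, _From, State) -> {reply, ok, State}.
handle_cast(_Msg, State) -> {noreply, State}.
handle_info(_Info, State) -> {noreply, State}.
terminate(_Reason, _State) -> ok.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
```

Because gen_server:start is not linked, the caller survives the failed start; with start_link the caller would also receive the exit signal.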

Then we have to add the functions that will take care of starting and stopping the system:

start(Host, Port, User, Password) ->
    gen_server:start({local, ?MODULE}, ?MODULE, {Host, Port, User, Password}, []).

start_link(Host, Port, User, Password) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, {Host, Port, User, Password}, []).

stop() ->
    gen_server:cast(?MODULE, stop).

The functions start/4 and start_link/4 both expect four parameters, which will be passed to the init function. The only difference is that start_link/4 links the client to the calling process. You can read more about them and the whole gen_server behaviour here.

Then, with stop/0, we send an asynchronous request to our process telling it to stop. Asynchronous requests are sent using gen_server:cast/2 and are processed by the module's handle_cast/2 callback. These are our handle_cast/2 clauses:

handle_cast(stop, State) -> {stop, normal, State};
handle_cast(_Msg, State) -> {noreply, State}.

The first clause expects only the stop message, while the second one is a catch-all. Because the first clause returns {stop, normal, State}, gen_server will call our terminate/2 callback, which looks like this:

terminate(_Reason, #state{session=MySession}) ->
    pubsub_utils:disconnect(MySession),
    ok.

As you can see, we first call pubsub_utils:disconnect/1, passing our session as a parameter, to close our XMPP connection, and then we return ok.
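To see the stop path on its own, here is a minimal gen_server, independent of Exmpp, whose terminate/2 reports back to an observer process instead of calling pubsub_utils:disconnect/1. The module name stop_demo and the {terminated, Reason} message are made up for the illustration.

```erlang
-module(stop_demo).
-behaviour(gen_server).
-export([start/1, stop/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%% The state is just the pid to notify from terminate/2, standing in
%% for the XMPP session that pubsub_client closes there.
start(Observer) ->
    gen_server:start({local, ?MODULE}, ?MODULE, Observer, []).

stop() -> gen_server:cast(?MODULE, stop).

init(Observer) -> {ok, Observer}.

handle_call(_Request, _From, State) -> {reply, ok, State}.

%% Returning {stop, normal, State} makes gen_server call terminate/2.
handle_cast(stop, State) -> {stop, normal, State};
handle_cast(_Msg, State) -> {noreply, State}.

handle_info(_Info, State) -> {noreply, State}.

%% Where pubsub_client disconnects, we just tell the observer.
terminate(Reason, Observer) ->
    Observer ! {terminated, Reason},
    ok.

code_change(_OldVsn, State, _Extra) -> {ok, State}.
```

The process exits with reason normal right after terminate/2 returns, which a monitor can confirm.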

NOTE: The link provided at the end of the post contains the source of the pubsub_utils module along with the whole project. You can read more about the implementation of the connection method here.

So far we can connect to and disconnect from the Ejabberd server. Now let's write the function that will handle the notifications.

When a notification is received, the Exmpp library sends it as a message to our gen_server process. When a gen_server process receives a message that was not generated by gen_server:cast or gen_server:call (or their variants), it is processed by the handle_info/2 callback. This callback expects two parameters, Info and State, where Info is the received message, which in our case will be the XMPP packet.

Here's our implementation:

handle_info(#received_packet{packet_type='message'}=Packet, State) ->
    process_received_packet(Packet, Fun),
    {noreply, State};
handle_info(_Info, State) ->
    {noreply, State}.

NOTE: Fun is unbound in this code, so it won't compile yet; we will come back to it later.

The first clause expects a received_packet record as the message. That record is defined inside exmpp_client.hrl and is used by the Exmpp library to deliver packets. The second handle_info/2 clause works as a catch-all.

In our case the packet must be of the message type, which we specify in the pattern match. If it is, we delegate the processing to the process_received_packet/2 function.
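The split between calls/casts and handle_info/2 can be tried without Exmpp. In this sketch a hypothetical #packet{} record stands in for #received_packet{}, packets are sent with a plain !, the same way Exmpp delivers them as ordinary messages, and only packets of type message are kept; received/0 is a made-up helper to inspect the result.

```erlang
-module(info_demo).
-behaviour(gen_server).
-export([start/0, received/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%% Hypothetical stand-in for Exmpp's #received_packet{} record.
-record(packet, {type, body}).

start() -> gen_server:start({local, ?MODULE}, ?MODULE, [], []).

%% Synchronous query returning the bodies collected so far, oldest first.
received() -> gen_server:call(?MODULE, received).

init([]) -> {ok, []}.

handle_call(received, _From, Bodies) ->
    {reply, lists:reverse(Bodies), Bodies}.

handle_cast(_Msg, State) -> {noreply, State}.

%% Plain messages land here. The pattern keeps only 'message' packets;
%% anything else falls through to the catch-all clause.
handle_info(#packet{type = message, body = Body}, Bodies) ->
    {noreply, [Body | Bodies]};
handle_info(_Other, State) ->
    {noreply, State}.

terminate(_Reason, _State) -> ok.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
```

Since messages from one process to another arrive in order, the received/0 call is handled after the packets sent before it.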

Now it is time to process the notification, which in our case means printing it to the tty. Here's the code:

process_received_packet(#received_packet{raw_packet=Raw}, Fun) ->
    Event = exmpp_xml:get_element(Raw, ?NS_PUBSUB_EVENT, 'event'),
    Items = exmpp_xml:get_element(Event, 'items'),
    exmpp_xml:foreach(Fun, Items),
    ok.

To accomplish this we have to parse the incoming packet, and process_received_packet/2 takes care of that. In its head we extract the Raw packet via pattern matching. Then we use the exmpp_xml:get_element/3 function to get the received PubSub event. This helper from the Exmpp library expects the XML element from which we will extract the child, the namespace the child should have (?NS_PUBSUB_EVENT in our case) and the name of the child element. In our example we want the event element.

From that element we then extract the items element, which is what we are interested in. It has one or more item children (note the singular), and each item wraps the notification we expect.

Once we have Items, we pass it to the exmpp_xml:foreach/2 function along with the Fun parameter. exmpp_xml:foreach/2 iterates over the child elements of Items and applies the anonymous function contained in Fun to each of them. In case you need it, the first argument passed to the anonymous function is the element given to foreach, here Items itself. For this to work, handle_info/2 needs a Fun to pass to process_received_packet/2.
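If you want to experiment with the event/items/item walk without an XMPP server, OTP's xmerl can parse a sample notification. This is a sketch, not Exmpp's implementation: child/2 and map_children/2 are hypothetical helpers that only mimic exmpp_xml:get_element and exmpp_xml:foreach, they match on element names without checking namespaces, and the <log/> payload is assumed from the handler fun in this post.

```erlang
-module(event_items).
-export([logs/1]).

-include_lib("xmerl/include/xmerl.hrl").

%% Returns the text of every <log/> found under
%% message/event/items/item, in document order.
logs(XmlString) ->
    {Message, _Rest} = xmerl_scan:string(XmlString),
    Items = child(child(Message, event), items),
    %% Like the fun given to exmpp_xml:foreach/2, it receives the
    %% parent element (Items) and one child element (an item).
    Fun = fun(_Parent, Item) ->
              case child(Item, log) of
                  undefined -> [];
                  Log -> [text(Log)]
              end
          end,
    lists:append(map_children(Fun, Items)).

%% First child element called Name, or undefined.
%% Unlike exmpp_xml:get_element/3, no namespace check is done.
child(undefined, _Name) ->
    undefined;
child(#xmlElement{content = Content}, Name) ->
    case [E || #xmlElement{name = N} = E <- Content, N =:= Name] of
        [First | _] -> First;
        [] -> undefined
    end.

%% Applies Fun(Parent, Child) to each child element and collects the
%% results, whereas the post's code uses the fun only for its side effect.
map_children(_Fun, undefined) ->
    [];
map_children(Fun, #xmlElement{content = Content} = Parent) ->
    [Fun(Parent, E) || #xmlElement{} = E <- Content].

%% Character data directly inside an element, concatenated.
text(#xmlElement{content = Content}) ->
    lists:append([V || #xmlText{value = V} <- Content]).
```

A message without an event element, such as an ordinary chat message, simply yields an empty list here.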

Let's declare that fun inside the init/1 function and add it to the state of the process. This will be our new init/1 function:

init({Host, Port, User, Password}) ->
    {ok, {MySession, MyJID}} = pubsub_utils:connect(Host, Port, User, Password),
    Fun = fun(_XML_Element, Child) ->
              case exmpp_xml:get_element(Child, 'log') of
                  undefined -> not_a_notification;
                  Log ->
                      io:format("Notification: ~s~n", [exmpp_xml:get_cdata_as_list(Log)])
              end
          end,
    {ok, #state{session=MySession, jid=MyJID, on_message=Fun}}.

Our fun takes two parameters, as we discussed, and prints the log to the tty using io:format/2; a bit of XML processing extracts the log text from the item. The fun is stored in the process State, in a new on_message field of our state record, which we have to add for this to work:

-record(state, {session, jid, on_message}).

With that code in place, we change the head of the first handle_info/2 clause so we can extract the Fun from the state record:

handle_info(#received_packet{packet_type='message'}=Packet, #state{on_message=Fun}=State) ->

The final piece of the puzzle is to actually subscribe to a node. We will add a function to our process API that does this for us:

subscribe(Service, Node) ->
    gen_server:call(?MODULE, {subscribe, Service, Node}).

And we export it by adding the following line after the export of the gen_server callbacks:

-export([subscribe/2]).

Our subscribe/2 function sends a synchronous call to our process with two parameters: the PubSub service name and the Node. The next step is to implement the handle_call/3 callback:

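handle_call({subscribe, Service, Node}, _From, #state{session=MySession, jid=MyJID}=State) ->
    IQ = exmpp_client_pubsub:subscribe(exmpp_jid:to_list(MyJID), Service, Node),
    PacketId = exmpp_session:send_packet(MySession, exmpp_stanza:set_sender(IQ, MyJID)),
    PacketId2 = erlang:binary_to_list(PacketId),
    Reply =
        receive
            #received_packet{id=PacketId2, raw_packet=Raw} ->
                case exmpp_iq:is_error(Raw) of
                    true -> error;
                    _ -> ok
                end
        %% Give up before gen_server:call/2's default 5-second timeout,
        %% so the server never blocks forever waiting for a reply.
        after 4000 ->
            timeout
        end,
    {reply, Reply, State};
%% Catch-all so unknown calls don't crash the process.
handle_call(_Request, _From, State) ->
    {reply, ok, State}.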

There we use a helper function from the Exmpp library to generate an IQ stanza that is sent to the XMPP server to subscribe our user. The interesting part of that code is the receive block, so let's review it. When we send an IQ packet to the server, it replies with another IQ packet. To match requests with responses, the XMPP protocol adds an id attribute to the stanza. That's why we only wait for messages sent to our process that carry that packet id.

You may be wondering why we have a receive block here at all. We want to know whether our subscription succeeded or failed. Without it, the reply would be handled by the handle_info/2 function, where it would be quite hard to track the IQ ids and match them to their requests. In our simple case that might be easy, but in a more complex scenario we could run into problems when our process receives several messages concurrently. The trade-off is that the gen_server cannot handle other requests while it waits; any notifications that arrive meanwhile stay in the mailbox and are handled by handle_info/2 afterwards.
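The id-matching idea does not depend on XMPP, so here is a plain-Erlang sketch of it. The message shapes {request, From, Id, Payload} and {response, Id, Result} and the fake server are hypothetical; make_ref/0 plays the role of the stanza id. The fake server deliberately sends an unrelated response first, to show that the selective receive skips it and leaves it in the mailbox.

```erlang
-module(iq_correlation).
-export([request/2, start_server/0]).

%% Sends a request tagged with a fresh id and waits only for the
%% response that carries the same id.
request(Server, Payload) ->
    Id = make_ref(),
    Server ! {request, self(), Id, Payload},
    %% Selective receive: other messages stay in the mailbox.
    receive
        {response, Id, Result} -> {ok, Result}
    after 5000 ->
        timeout
    end.

%% Hypothetical server: answers each request with an unrelated
%% response (a different id) followed by the real one.
start_server() ->
    spawn(fun loop/0).

loop() ->
    receive
        {request, From, Id, Payload} ->
            From ! {response, make_ref(), unrelated},
            From ! {response, Id, {echo, Payload}},
            loop()
    end.
```

After request/2 returns, the unrelated response is still waiting in the caller's mailbox, just as other XMPP packets would be waiting for handle_info/2.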

To try our module we can do the following sequence at the command line:

cd /path/to/our/project
make
./start-dev.sh

There we compile the code and launch the Erlang console. Then inside the console we input the following:

pubsub_publisher:start("localhost", 5222, "publisher", "password").
pubsub_publisher:create_node("pubsub.localhost", "logs").

Then we can open a new terminal window and run:

cd /path/to/our/project
./start-dev.sh

And, inside that second Erlang console:

pubsub_client:start("localhost", 5222, "tutorial", "password").
pubsub_client:subscribe("pubsub.localhost", "logs").

Then we can go back to the first window and issue this command:

pubsub_publisher:send_message("pubsub.localhost", "some notification", "logs").

If everything worked well, then we should see the message being displayed where the pubsub_client is running.

BONUS: Display the log message as a Growl notification

You may be wondering why, in the code above, we pass the anonymous function around in the process state. I did that so that later we can hook in a new function to process the notifications, say, to send them directly to Growl :) (for non-Mac users: Growl is a free application for the Mac that displays notifications on the screen; more info here).

If you review the code provided at the end of this post, you'll find a function, pubsub_client:use_growl/0, that swaps the log handler when called: instead of printing the notifications to the tty, it forwards them to Growl. Since the implementation is easy to understand, I won't explain it here.
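The mechanism use_growl/0 relies on, replacing a fun stored in the gen_server state, can be sketched on its own. This is not the project's code: set_handler/1 and notify/1 are hypothetical, the handlers take one argument instead of the two-argument fun used above, and they return tagged tuples instead of printing or calling Growl.

```erlang
-module(handler_swap).
-behaviour(gen_server).
-export([start/1, set_handler/1, notify/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%% Same idea as pubsub_client's on_message field.
-record(state, {on_message}).

start(Fun) -> gen_server:start({local, ?MODULE}, ?MODULE, Fun, []).

%% Hypothetical API: replace the fun used for incoming notifications.
set_handler(Fun) when is_function(Fun, 1) ->
    gen_server:call(?MODULE, {set_handler, Fun}).

%% Hypothetical stand-in for a notification arriving; it returns what
%% the current handler produced so the swap is easy to observe.
notify(Text) -> gen_server:call(?MODULE, {notify, Text}).

init(Fun) -> {ok, #state{on_message = Fun}}.

handle_call({set_handler, Fun}, _From, State) ->
    {reply, ok, State#state{on_message = Fun}};
handle_call({notify, Text}, _From, #state{on_message = Fun} = State) ->
    {reply, Fun(Text), State}.

handle_cast(_Msg, State) -> {noreply, State}.
handle_info(_Info, State) -> {noreply, State}.
terminate(_Reason, _State) -> ok.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
```

Because the handler lives in the state, swapping it needs no restart and no change to the code that receives notifications.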

Thanks for reading this far; I hope this tutorial will push you to create some nice Erlang applications using Exmpp and PubSub.

PubSub client Code.

2 comments:

Artem Golovinsky said...

hi,
I try to use your client with superfeedr.com and get some errors. May be could you help me in my issue?

Alvaro said...

Hi Garry, I don't know which errors you are getting, but I guess the problem is that the exmpp library has changed. I submitted some bug fixes related to this, and after that the API changed. So you probably have to check their API docs, which are pretty good BTW.