ActiveMQ Generic Typed Client for Queue Access

The .NET client for ActiveMQ can send and receive messages, but it requires explicit knowledge of both the name of the queue to read from or push to and the type of the objects serialized into that queue. I find that neither piece of information is necessary for communication. Both add complexity, and they invite instability when differently typed objects are serialized and sent to the same queue.

A better approach I’ve found is to abstract away both the specific queue being used and the type of the object being pulled from it. This is done by using the fully qualified type name of the object being pushed as the queue name. You no longer need to specify which queue a message should be sent to, because the queue is determined by the type of the object being serialized into the message. As a side effect, you no longer need to worry about the type of the object being deserialized either, since each queue only ever holds one type of object.

Let’s start off with a definition of the object we’ll serialize and push to the queue.

using System;

namespace Organization.SubNamespace
{
    public class User
    {
        public string Name { get; set; }

        public int Age { get; set; }

        public DateTime Birthday { get; set; }
    }
}

Now that we have an object to serialize, let’s look at the base definition of the queue service and some helper methods we’ll use.

using System;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.ActiveMQ.Commands;

public class GenericQueueService<T>
    : IQueuePublisher<T>, IQueueConsumer<T>, IAsyncQueueConsumer<T> where T : class
{
    private readonly IQueue queue;
    private readonly Serializer serializer;
    private ISession session;
    private IMessageConsumer messageConsumer;
    private IMessageProducer messageProducer;

    public event EventHandler<ConsumeQueueItemEventArgs<T>> ConsumeQueueItem;

    public GenericQueueService()
    {
        queue = new ActiveMQQueue(typeof(T).FullName);
        serializer = new Serializer();
    }

    private void EnsureConsumerExists()
    {
        if (messageConsumer != null)
        {
            return;
        }

        if (session == null)
        {
            session = CreateSession();
        }

        messageConsumer = session.CreateConsumer(queue);
    }

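    private static ISession CreateSession()
    {
        IConnectionFactory connectionFactory =
            new ConnectionFactory(Settings.Default.MQServer);

        // Note: the connection is not stored anywhere, so it is never explicitly closed.
        IConnection connection = connectionFactory.CreateConnection();
        connection.Start();

        // ClientAcknowledge means consumed messages must be acknowledged explicitly.
        return connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
    }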

    private void EnsureProducerExists()
    {
        if (messageProducer != null)
        {
            return;
        }

        if (session == null)
        {
            session = CreateSession();
        }

        messageProducer = session.CreateProducer(queue);
    }
}

On the class definition I’ve identified a number of interfaces that can be used to get at specific parts of this service, depending on whether there’s a need to push messages to the queue or pull messages from it. Also included is an async-specific listener so that messages can be pulled asynchronously. This implementation requires that a separate service be instantiated for each type of object that will be pushed to or pulled from the queue.
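The interfaces and the event argument type are not shown in this post. Here is a minimal sketch of what they might look like, inferred only from the members the service exposes. The member names match the methods of the service, but the exact shapes, and the `Item` property in particular, are assumptions.

```csharp
using System;

// Hypothetical shapes, inferred from how GenericQueueService<T> is used in this post.
public interface IQueuePublisher<T> where T : class
{
    void Publish(T publishObject);
}

public interface IQueueConsumer<T> where T : class
{
    T Consume();
}

public interface IAsyncQueueConsumer<T> where T : class
{
    event EventHandler<ConsumeQueueItemEventArgs<T>> ConsumeQueueItem;

    void StartAsyncConsumer();
}

// Carries the deserialized item to event subscribers.
public class ConsumeQueueItemEventArgs<T> : EventArgs where T : class
{
    public ConsumeQueueItemEventArgs(T item)
    {
        Item = item;
    }

    // "Item" is an assumed property name.
    public T Item { get; private set; }
}
```

Splitting the contract this way lets calling code depend only on the capability it needs, for example taking an `IQueuePublisher<User>` where nothing is ever consumed.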

Notice that when the new ActiveMQQueue object is created in the constructor, the fully qualified type name is used as the name of the queue to communicate with. A feature I like about ActiveMQ is that if a queue doesn’t exist yet, pushing a message will create it on the fly.
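To make the naming convention concrete, the following sketch evaluates the same expression the constructor uses, `typeof(T).FullName`, for the `User` class defined earlier. The `QueueNameDemo` helper is hypothetical and exists only for illustration.

```csharp
using System;

namespace Organization.SubNamespace
{
    public class User { }
}

public static class QueueNameDemo
{
    // Mirrors the expression in the service constructor:
    // the queue name is simply the full name of the message type.
    public static string QueueNameFor<T>() where T : class
    {
        return typeof(T).FullName;
    }
}

// QueueNameDemo.QueueNameFor<Organization.SubNamespace.User>()
// returns "Organization.SubNamespace.User"
```

One caveat: for nested types `FullName` uses `+` as the separator, and for closed generic types it embeds the assembly-qualified names of the type arguments, so plain non-generic message classes give the most readable queue names.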

public void Publish(T publishObject)
{
    EnsureProducerExists();

    ITextMessage textMessage = session.CreateTextMessage(serializer.Serialize(publishObject));
    messageProducer.Send(textMessage);
}

The above method first ensures that a producer exists which can push messages to the queue. Next the object is serialized and the result is passed to a TextMessage, which is used to host the serialized object. Finally the message is sent to the server.
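The `Serializer` class itself is not shown in this post. As a rough sketch, assuming an XML-based implementation built on the framework’s `XmlSerializer` (the real class may well use a different format), it could look like this:

```csharp
using System.IO;
using System.Xml.Serialization;

// Hypothetical stand-in for the Serializer used by the queue service.
public class Serializer
{
    // Turns an object into an XML string suitable for a text message body.
    public string Serialize<T>(T value) where T : class
    {
        var xmlSerializer = new XmlSerializer(typeof(T));
        using (var writer = new StringWriter())
        {
            xmlSerializer.Serialize(writer, value);
            return writer.ToString();
        }
    }

    // Rebuilds an object of type T from the XML produced by Serialize.
    public T Deserialize<T>(string text) where T : class
    {
        var xmlSerializer = new XmlSerializer(typeof(T));
        using (var reader = new StringReader(text))
        {
            return (T)xmlSerializer.Deserialize(reader);
        }
    }
}
```

With `XmlSerializer`, message types need to be public and have a parameterless constructor, which the `User` class above satisfies.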

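public T Consume()
{
    EnsureConsumerExists();

    IMessage message = messageConsumer.Receive();
    ITextMessage textMessage = message as ITextMessage;
    if (textMessage == null)
    {
        throw new MQException("Message pulled from queue must be of type ITextMessage.");
    }

    T item = ProcessTextMessage(textMessage);

    // The session is created with ClientAcknowledge, so the message has to be
    // acknowledged explicitly; otherwise the broker will redeliver it later.
    message.Acknowledge();
    return item;
}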

private T ProcessTextMessage(ITextMessage textMessage)
{
    if (textMessage == null)
    {
        throw new MQException("Message from queue must be of type ITextMessage.");
    }

    return serializer.Deserialize<T>(textMessage.Text);
}

Here we see how to consume messages synchronously. The message is retrieved from the queue and cast to an ITextMessage, and its contents are then deserialized back into an object.

public void StartAsyncConsumer()
{
    EnsureConsumerExists();

    messageConsumer.Listener += messageConsumer_Listener;
}

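private void messageConsumer_Listener(IMessage message)
{
    T item = ProcessTextMessage(message as ITextMessage);

    // Copy the delegate so a concurrent unsubscribe cannot null it between check and call.
    EventHandler<ConsumeQueueItemEventArgs<T>> handler = ConsumeQueueItem;
    if (handler != null)
    {
        handler(this, new ConsumeQueueItemEventArgs<T>(item));
    }

    // ClientAcknowledge sessions require an explicit acknowledgement once handling is done.
    message.Acknowledge();
}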

The asynchronous consumer attaches a listener to the message consumer. Each incoming message is deserialized and handed to subscribers through the ConsumeQueueItem event. As it stands, an instance of the queue service has to be created for each object type that is pushed to the message queue. An enhancement would be a single service that accepts multiple types and works out for itself which queue each message should be sent to or retrieved from.
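One possible shape for a single multi-type service is a thin wrapper that lazily creates one typed service per message type and routes each call by its generic argument. This is only a sketch: `MultiTypeQueueService` is a hypothetical name, and the code relies on the `GenericQueueService<T>` class from this post being in scope.

```csharp
using System;
using System.Collections.Concurrent;

// Sketch only: depends on GenericQueueService<T> as defined in this post.
public class MultiTypeQueueService
{
    // One lazily created typed service per message type, keyed by that type.
    private readonly ConcurrentDictionary<Type, object> services =
        new ConcurrentDictionary<Type, object>();

    private GenericQueueService<T> GetService<T>() where T : class
    {
        return (GenericQueueService<T>)services.GetOrAdd(
            typeof(T), _ => new GenericQueueService<T>());
    }

    // The queue is chosen by the compile-time type of the argument.
    public void Publish<T>(T publishObject) where T : class
    {
        GetService<T>().Publish(publishObject);
    }

    // The queue is chosen by the requested type.
    public T Consume<T>() where T : class
    {
        return GetService<T>().Consume();
    }
}
```

Calling code would then write `service.Publish(user)` or `service.Consume<User>()` without creating a typed service first. Each underlying service still opens its own session on first use, so sharing a connection across types would be a further refinement.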
