Toro Cloud Dev Center


RabbitMQ as the instance message broker

RabbitMQ, like most messaging systems, provides a common platform for sending and receiving messages. It is a message broker that operates using the Advanced Message Queuing Protocol (AMQP)[1]. Martini supports brokers that implement the Java Message Service (JMS)[2] API. RabbitMQ is not a JMS provider, but it supports JMS through its JMS topic exchange plugin and JMS client libraries. Martini needs these components to connect to a RabbitMQ broker.

Limitations

As of this writing, RabbitMQ's JMS client does not fully implement the JMS API. Using unsupported methods may result in connection failures or undelivered messages. As a consequence of these limitations, Martini uses an in-memory broker for WebSocket messages. No WebSocket messages will be routed to RabbitMQ.

Prerequisites

Before connecting to a RabbitMQ broker, the following components must be added:

  • RabbitMQ topic exchange plugin, which ships with RabbitMQ but must be enabled by executing:

    rabbitmq-plugins enable rabbitmq_jms_topic_exchange

    After executing the command, the plugin is activated automatically. A broker restart is not needed.

  • RabbitMQ Management Plugin, which Martini runtime uses to query the list of destination names.

  • RabbitMQ JMS client library v2.6.0, which contains RabbitMQ's implementation of the JMS API. You can download the JAR file here.

  • RabbitMQ Java client library v5.16.0, which is a dependency of the JMS client library required for Java applications to interact with RabbitMQ. You can also download the JAR file here.

After downloading the required libraries, place them in <martini-home>/lib/ext/.

Configuration

  1. Modify the configuration file.

    The configuration for RabbitMQ JMS clients is located in <martini-home>/conf/broker/rabbitmq.xml. Open this file and change the uri property value to amqp://{username}:{password}@{broker-ip-address}:{port}. Variables enclosed in braces ({}) must be substituted accordingly.

  2. Use the configuration file.

    By default, Martini uses an embedded instance of ActiveMQ. To configure Martini to use the RabbitMQ configuration file instead, open the <martini-home>/data/override.properties file and then add or change the following property:

    jms.configuration-file=rabbitmq
    
  3. Restart Martini.

    The configuration takes effect only after you restart your Martini instance.

    If everything is configured correctly, Martini should be able to start. You can check the connection status under the Connections tab using the RabbitMQ management plugin.

    RabbitMQ web interface, Connections tab
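The uri value from step 1 can be assembled programmatically. The following is a minimal sketch, not part of Martini: the function name build_amqp_uri is hypothetical, the default port 5672 is RabbitMQ's standard AMQP port, and percent-encoding the credentials is an assumption based on general URI rules, so verify that your broker setup accepts encoded credentials.

```python
from urllib.parse import quote


def build_amqp_uri(username: str, password: str, host: str, port: int = 5672) -> str:
    """Assemble amqp://{username}:{password}@{broker-ip-address}:{port}."""
    # Percent-encode the credentials so characters such as '@', ':' or '/'
    # cannot be mistaken for URI delimiters (assumption: the client decodes them).
    user = quote(username, safe="")
    secret = quote(password, safe="")
    return f"amqp://{user}:{secret}@{host}:{port}"


if __name__ == "__main__":
    # Hypothetical credentials and address, for illustration only.
    print(build_amqp_uri("martini", "p@ss/word", "192.168.1.10"))
```

The resulting string is what goes into the uri property of rabbitmq.xml.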

RabbitMQ is not a JMS provider. However, the JMS Client for RabbitMQ implements the JMS 1.1 specification on top of the RabbitMQ Java client, which allows new and existing JMS applications to connect to RabbitMQ.

Review the RabbitMQ documentation on JMS client compatibility before attempting any integration.

Martini runtime supports publishing and receiving JMS messages. Publishing and receiving AMQP-formatted messages, however, requires special handling.

Defining AMQP destination

The behaviour of RabbitMQ varies depending on whether it receives a message formatted as JMS or AMQP. Consider all the clients of the RabbitMQ broker and the native message format each of them uses. If all clients use the same message format, they can all publish and subscribe to the same RabbitMQ destination.

For Martini runtime to send or publish to an AMQP queue, the destination requires at least one of the following parameters.

Parameter | Default value | Description
--- | --- | ---
amqp | false | Set to true to treat the destination as an AMQP destination.
exchangeName | Default RabbitMQ exchange name | Name of the exchange the message will be sent to. Required only for sending an AMQP message, not for receiving. If the destination name has no amqp parameter, the presence of an exchangeName parameter, empty or not, automatically sets amqp to true internally.
routingKey | Queue/Topic destination name | Routing key used when the message is sent. Required only for sending an AMQP message, not for receiving. If the destination name has no amqp parameter, the presence of a routingKey parameter, empty or not, automatically sets amqp to true internally.

An example destination string for AMQP looks like this:

    queue://com.torocloud.Hello?amqp=true&exchangeName=NotADefaultExchange&routingKey=johndoe
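To make the parameter rules concrete, here is a minimal sketch that builds a destination string and decides whether it would be treated as AMQP. The helper names build_destination and is_amqp are hypothetical and not part of Martini. Treating a bare routingKey as implying amqp=true is an assumption drawn from the "at least one parameter" rule, and whether Martini decodes percent-encoded parameter values is also an assumption.

```python
from urllib.parse import parse_qs, urlencode, urlsplit


def build_destination(kind: str, name: str, **params: str) -> str:
    """Build a string such as queue://name?amqp=true&exchangeName=..."""
    base = f"{kind}://{name}"
    # urlencode keeps the keyword order and percent-encodes special characters.
    return f"{base}?{urlencode(params)}" if params else base


def is_amqp(destination: str) -> bool:
    """Approximate the rules from the parameter table (illustrative only)."""
    query = parse_qs(urlsplit(destination).query, keep_blank_values=True)
    if "amqp" in query:
        # An explicit amqp parameter wins.
        return query["amqp"][-1].lower() == "true"
    # Otherwise the mere presence of exchangeName (or, by assumption,
    # routingKey) implies amqp=true, even when the value is empty.
    return "exchangeName" in query or "routingKey" in query


if __name__ == "__main__":
    dest = build_destination(
        "queue", "com.torocloud.Hello",
        amqp="true", exchangeName="NotADefaultExchange", routingKey="johndoe",
    )
    print(dest, is_amqp(dest))
```

A plain queue://com.torocloud.Hello has no parameters, so is_amqp returns False for it and the destination is handled as JMS.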

The destination string is mainly used in places such as a JMS listener endpoint or JMSFunction.

Sending & receiving message

AMQP message

Sending

To send a message to an AMQP queue, set the amqp parameter to true in your destination string. If exchangeName and routingKey are empty, their default values are used.

When you select an AMQP destination in the destination selector GUI of Martini Desktop or Martini Online, it always returns AMQP destinations with exchangeName and routingKey set to their default values.

To send messages with a custom exchangeName and routingKey, create a service that uses JMSFunction and define the destination name with the necessary parameters.

AMQP messages sent to a JMS queue will not be deserialized properly

Sending an AMQP-formatted message to an exchange that has bindings to a JMS queue results in improper deserialization. Ensure that the queue's bindings do not include the jms.durable.queues exchange.
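The binding check can be automated. The sketch below assumes you have already obtained a queue's bindings as a list of dictionaries with a "source" key naming the exchange, which is the shape the RabbitMQ management plugin's HTTP API uses for binding objects. The function name and the sample data are hypothetical.

```python
from typing import Iterable, Mapping

JMS_QUEUE_EXCHANGE = "jms.durable.queues"


def is_bound_to_jms_exchange(bindings: Iterable[Mapping[str, object]]) -> bool:
    """Return True if any binding originates from the jms.durable.queues exchange."""
    return any(b.get("source") == JMS_QUEUE_EXCHANGE for b in bindings)


if __name__ == "__main__":
    # Hypothetical bindings for a queue named "orders".
    sample = [
        {"source": "", "destination": "orders", "routing_key": "orders"},
        {"source": "jms.durable.queues", "destination": "orders", "routing_key": "orders"},
    ]
    if is_bound_to_jms_exchange(sample):
        print("Queue is bound to a JMS exchange; AMQP messages may not deserialize.")
```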

Receiving

To listen for AMQP messages, configure your JMS listener endpoint to listen to a destination with the amqp parameter set to true.

A message received from AMQP is delivered as either a BytesMessage or a TextMessage (the latter only if the AMQP sender included the message header JMSType=TextMessage).

JMS incompatibility

A JMS listener endpoint listening to an AMQP queue will not be able to deserialize messages that came from a JMS client.

JMS message

Queue
Sending

Sending a JMS message to a non-existent JMS destination automatically creates the destination queue in the RabbitMQ broker. Sending a message to a pre-existing destination, however, may fail unless the queue is configured as described below.

The RabbitMQ JMS client requires a pre-existing destination queue to have the following configuration:

Config name | Expected value
--- | ---
Queue type | classic
Durability | Durable
Auto delete | true
If the existing destination queue is not configured properly, Martini Runtime logs a PRECONDITION_FAILED channel error.
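A pre-existing queue can be checked against these expectations before Martini connects to it. This is a minimal sketch under stated assumptions: the queue description is a dictionary with "type", "durable" and "auto_delete" keys (modeled on the queue objects of the RabbitMQ management HTTP API), the expected values are copied from the table above, and the function name is hypothetical.

```python
# Expected values taken from the configuration table above.
EXPECTED_QUEUE_CONFIG = {"type": "classic", "durable": True, "auto_delete": True}


def config_mismatches(queue: dict) -> dict:
    """Map each offending setting to an (actual, expected) pair."""
    return {
        key: (queue.get(key), expected)
        for key, expected in EXPECTED_QUEUE_CONFIG.items()
        if queue.get(key) != expected
    }


if __name__ == "__main__":
    # Hypothetical queue description for illustration.
    existing = {"name": "com.jms.Queue", "type": "quorum", "durable": True, "auto_delete": True}
    for key, (actual, expected) in config_mismatches(existing).items():
        print(f"{key}: found {actual!r}, expected {expected!r}")
```

An empty result means the queue matches the table and should not trigger the PRECONDITION_FAILED error for these settings.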

Receiving

Receiving a JMS message works out of the box. An AMQP message, however, will not be deserialized properly unless the destination has the amqp parameter set to true. If the amqp parameter for the destination is set to false, Martini Runtime logs a message similar to the one below when it receives an AMQP message:

31/10/22 08:45:55.501 WARN  [DefaultMessageListenerContainer] Setup of JMS message listener invoker failed for 
destination 'queue://com.jms.Queue' - trying to recover. Cause: invalid stream header: 65617265
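The "invalid stream header" text is the message Java's object deserialization raises when a payload does not start with the serialization magic bytes AC ED 00 05; the hex digits that follow are the first four bytes it actually read. The sketch below decodes them to show what the payload started with. The function name is hypothetical, and reading the log this way is an interpretation, not something the Martini documentation states.

```python
# Magic number and version that open every Java serialization stream.
JAVA_SERIALIZATION_HEADER = bytes.fromhex("aced0005")


def describe_stream_header(hex_header: str) -> str:
    """Explain the hex digits reported after 'invalid stream header:'."""
    raw = bytes.fromhex(hex_header)
    if raw == JAVA_SERIALIZATION_HEADER:
        return "valid Java serialization header"
    return f"not a Java serialization stream; payload starts with {raw!r}"


if __name__ == "__main__":
    print(describe_stream_header("65617265"))
```

For the log line above, the four bytes decode to the ASCII text "eare", which suggests the listener tried to read a plain AMQP payload as a serialized JMS message.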

Topic
Sending

Unlike queues, which are created automatically in the RabbitMQ broker when a message is sent, topics are created only on subscription, that is, when a JMS listener endpoint is created. The topic is not registered in the RabbitMQ broker under its destination name but under an auto-generated name following the pattern jms-cons-{UUID}. See here for more information.
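When browsing queues in the broker, the auto-generated subscription queues can be picked out by name. The sketch below assumes the {UUID} part uses the canonical 8-4-4-4-12 hexadecimal form; the function name and the sample queue names are hypothetical.

```python
import re

# jms-cons- followed by a UUID in canonical textual form (assumed).
SUBSCRIPTION_QUEUE = re.compile(
    r"^jms-cons-[0-9a-fA-F]{8}(?:-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$"
)


def is_subscription_queue(name: str) -> bool:
    """Return True for names that follow the jms-cons-{UUID} pattern."""
    return SUBSCRIPTION_QUEUE.match(name) is not None


if __name__ == "__main__":
    names = ["com.jms.Queue", "jms-cons-123e4567-e89b-12d3-a456-426614174000"]
    print([n for n in names if is_subscription_queue(n)])
```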

Sending a message to a topic will work out of the box without any conditions.

Receiving

Receiving a JMS message with the JMS listener works out of the box. An AMQP message, however, will not be deserialized properly, and Martini runtime will log a message similar to the one below.

31/10/22 08:45:55.501 WARN  [DefaultMessageListenerContainer] Setup of JMS message listener invoker failed for 
destination 'queue://com.jms.Topic' - trying to recover. Cause: invalid stream header: 65617265


  1. Advanced Message Queuing Protocol (AMQP) is a standard for asynchronous messaging and is designed to support a large variety of messaging applications.

  2. Java Message Service (JMS) is a set of interfaces that allows Java applications to communicate with other messaging implementations.