ActiveMQ in a master-slave setup

This guide walks through the process of configuring two ActiveMQ instances in a master-slave setup. This kind of setup is typically done to improve performance (by distributing load) and to provide high availability (by adding a failover instance), both of which are important when running Martini in a production environment.

Martini uses the configured broker when indexing data with Tracker and Monitor; a message is created, published, received, and resolved for every transaction. It is therefore safe to assume that improving ActiveMQ's performance will also improve Martini's overall performance, especially if your organization relies heavily on search indices.

Prerequisites

To implement this setup, you must have two ActiveMQ instances running: one will act as the master, and the other as the slave. It is recommended to host these instances on separate machines, but if that arrangement is not an option, deploying them on the same machine is also possible. An NFS server is also required in order to share data between the instances.

Configuration, authentication and authorization

Before diving into this guide, it is recommended to read the previous pages on general remote ActiveMQ instance configuration, as well as setting up remote ActiveMQ instances with authentication and authorization.

Procedure

Configuring the NFS server & NFS clients

A network file system (NFS) involves a server and one or more clients. NFS lets you mount remote directories so that machines on a network can share and access files directly.

Below are the steps required to configure the NFS server and NFS clients on CentOS 7. It is possible to use a different operating system, as long as you can correctly configure the NFS server and its clients.

Configuring the NFS server

  1. Determine which instance or machine will act as the NFS server.
  2. Install NFS packages using yum:

    yum install nfs-utils
    

    nfs-utils package

    The nfs-utils package provides the server and client tools needed to use the kernel's NFS support.

  3. Determine and create the directory which will be shared for the NFS setup.

    TORO recommends creating and sharing the /datastore/activemq-data directory, but you may choose a different directory that better suits your needs.

    mkdir -p /datastore/activemq-data
    
  4. Set the NFS directory's permissions.

    Once you have created the NFS directory on the server, set its permissions using the chmod and chown commands, replacing $user and $group with the user and group that should own the directory.

    chmod -R 7777 /datastore/activemq-data && chown $user:$group /datastore/activemq-data
    
  5. Start and enable services.

    systemctl enable rpcbind
    systemctl enable nfs-server
    systemctl enable nfs-lock
    systemctl enable nfs-idmap
    systemctl start rpcbind
    systemctl start nfs-server
    systemctl start nfs-lock
    systemctl start nfs-idmap
    
  6. Share the NFS directory to enable client access.

    1. First, open the /etc/exports file and edit it with your preferred text editor.

      vi /etc/exports
      
    2. Next, add the following line, replacing <nfs-client-ip> with the client's IP address:

      /datastore/activemq-data    <nfs-client-ip>(rw,sync,no_root_squash,no_all_squash)
      

    A connection must be possible

    The server and client must be able to reach each other over the network; verify this with ping before proceeding.

  7. Restart the NFS service to apply the new export:

    systemctl restart nfs-server
    
  8. Configure the firewall for CentOS 7.

    firewall-cmd --permanent --zone=public --add-service=nfs
    firewall-cmd --permanent --zone=public --add-service=mountd
    firewall-cmd --permanent --zone=public --add-service=rpc-bind   
    firewall-cmd --reload
    
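Before configuring the clients, you can confirm that the directory is actually being exported. This is an optional check using standard nfs-utils commands; exportfs re-reads /etc/exports, and showmount queries the running server:

exportfs -ra                # re-read /etc/exports and apply any changes
exportfs -v                 # list the directories currently being exported
showmount -e localhost      # display the export list as a client would see it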

Configuring the NFS client

  1. Install NFS packages using yum:

    yum install nfs-utils
    
  2. Create the NFS directory mount point(s).

    mkdir -p /datastore/activemq-data
    
  3. Mount the directory.

    mount -t nfs <nfs-server-ip>:/datastore/activemq-data /datastore/activemq-data
    
  4. Verify the NFS mount.

    df -kh
    

    After executing this command, you should see a list of mounted file systems, and the NFS share configured earlier should appear in that list.
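For a more targeted check, you can query the mount point directly. This is optional; findmnt ships with util-linux on CentOS 7:

findmnt /datastore/activemq-data    # shows the NFS source, mount point, and mount options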

Setting up a permanent NFS mount

By default, NFS directories must be remounted after every reboot. To make sure the mount is restored automatically, follow the steps below:

  1. Open the fstab file using the text editor of your choice.

    vi /etc/fstab
    
  2. Add the following line so the mount point is automatically restored after a reboot:

    <nfs-server-ip>:/datastore/activemq-data    /datastore/activemq-data    nfs   defaults 0 0
    
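Rather than rebooting to test the new entry, you can ask mount to process fstab immediately. A quick sketch, assuming the share is currently mounted:

umount /datastore/activemq-data    # detach the existing mount
mount -a                           # mount everything listed in /etc/fstab
df -kh                             # confirm the share is mounted again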

Configuring Martini

To tell Martini that there will be two ActiveMQ instances, edit the activemq.uri property in the application.properties file. This property holds the list of broker connection URLs, separated by commas when there is more than one broker, as is the case for this setup. For example:

activemq.uri=failover:tcp://<activemq1-ip-address>:61616,tcp://<activemq2-ip-address>:61616?closeAsync=false

Embedded ActiveMQ

You can add an embedded ActiveMQ instance as the failover option by doing something like:

activemq.uri=failover:tcp://<activemq1-ip-address>:61616,tcp://<activemq2-ip-address>:61616,tcp://0.0.0.0:61616?closeAsync=false

... where tcp://0.0.0.0:61616 is the URL for the embedded ActiveMQ instance.

Duplicate JMS client IDs

No two instances should use the same jms.client-id, or the connection will fail.
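For example, two Martini instances sharing the brokers above could each set a distinct client ID in their application.properties files. The values below are illustrative, not prescribed:

# Instance 1
jms.client-id=martini-node1

# Instance 2
jms.client-id=martini-node2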

ActiveMQ configuration

Remote ActiveMQ instances can be configured via XML or via the command line. TORO and ActiveMQ both recommend configuring your instance via XML, as this allows for more advanced configuration options and flexibility. The following sections also assume XML configuration as they discuss which components of the ActiveMQ instance should be configured for a master-slave setup.

Configuring the broker bean element

The broker bean element is used to configure ActiveMQ instance-wide properties. TORO recommends the following setup for your remote ActiveMQ instance:

<broker xmlns="http://activemq.apache.org/schema/core"
    brokerName="activemq-broker1"
    dataDirectory="${activemq.data}"
    useJmx="true" advisorySupport="true"
    persistent="true"
    deleteAllMessagesOnStartup="false"
    useShutdownHook="false"
    schedulerSupport="true"
    start="false">
<!-- ... -->
</broker>

In fact, the configuration above is used for remote ActiveMQ instances deployed for Martini instances on TORO Cloud.
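The slave instance can reuse the same broker configuration. One convention, assumed here for illustration rather than required by this guide, is to give the second broker a distinct brokerName so the two instances are easy to tell apart in logs and JMX:

<broker xmlns="http://activemq.apache.org/schema/core"
    brokerName="activemq-broker2"
    dataDirectory="${activemq.data}"
    useJmx="true" advisorySupport="true"
    persistent="true"
    deleteAllMessagesOnStartup="false"
    useShutdownHook="false"
    schedulerSupport="true"
    start="false">
<!-- ... -->
</broker>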

Configuring destination policies

According to the ActiveMQ documentation on per destination policies:

"Multiple different policies can be applied per destination (queue or topic), or using wildcard notation to apply to a hierarchy of queues or topics, making it possible, therefore, to configure how different regions of the JMS destination space are handled."

When using XML configuration, destination policies are defined using the <destinationPolicy> tag, which is under the <broker> XML element. For example, the following configuration sets the behavior of topics:

<destinationPolicy>
    <policyMap>
        <policyEntries>
            <policyEntry topic=">"
                producerFlowControl="false"
                gcInactiveDestinations="true"
                inactiveTimeoutBeforeGC="5000"
                memoryLimit="5mb"
                expireMessagesPeriod="0"
                advisoryWhenFull="true"
                maxDestinations="200" >
                <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="20"/> <!-- 20 pending messages -->
                </pendingMessageLimitStrategy>
                <dispatchPolicy>
                    <strictOrderDispatchPolicy />
                </dispatchPolicy>
                <subscriptionRecoveryPolicy>
                    <timedSubscriptionRecoveryPolicy recoverDuration="60000" /> <!-- 1 minute -->
                </subscriptionRecoveryPolicy>
            </policyEntry>
        </policyEntries>
    </policyMap>
</destinationPolicy>

Changes

With the runtimeConfigurationPlugin enabled (included in the full configuration below), destination policy changes are applied as soon as the file is saved. Check your logs for confirmation.

Configuring the management context

This step is optional, but if you'd like to be able to control the behavior of the broker via JMX MBeans, you must configure the ActiveMQ instance's management context. Doing so will allow you to connect to, monitor, and control the instance remotely via JConsole.

<managementContext>
    <managementContext createConnector="true" rmiServerPort="1098" connectorPort="1099"/>
</managementContext>

With this, you will be able to connect to the service via JConsole using the URL service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi.
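For example, to open that connection from another machine, pass the URL to JConsole directly, replacing localhost with the broker's address. JConsole ships with the JDK:

jconsole service:jmx:rmi:///jndi/rmi://<activemq1-ip-address>:1099/jmxrmi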

Configuring the KahaDB directory

According to the ActiveMQ website, "KahaDB is a file-based persistence database that is local to the message broker that is using it." You must set KahaDB's directory to /datastore/activemq-data, which was created earlier, like so:

<persistenceAdapter>
    <kahaDB directory="/datastore/activemq-data" indexWriteBatchSize="1000" enableIndexWriteAsync="true"/>
</persistenceAdapter>
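This shared directory is also what drives failover in a shared-file-system master-slave setup: both brokers must point at the same KahaDB store on the NFS share. The first broker to acquire the store's file lock becomes the master, while the other blocks as the slave until the lock is released. As an optional check, you can inspect the store and see which process holds the lock (lslocks is part of util-linux):

ls /datastore/activemq-data     # typically shows db-*.log, db.data, db.redo, and lock
lslocks | grep activemq-data    # show which PID currently holds the KahaDB lock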

Configuring transport connectors

Since this master-slave setup features failover, you must use the OpenWire transport connector. This transport connector allows brokers and clients to talk to each other over the network. The configuration below shows the transport connectors for the master broker instance:

<transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?jms.useAsyncSend=true&amp;randomize=false&amp;initialReconnectDelay=100&amp;maximumConnections=1000&amp;nested.wireFormat.maxInactivityDuration=1000" updateClusterClients="true"/>
    <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?jms.useAsyncSend=true&amp;randomize=false&amp;initialReconnectDelay=100&amp;maximumConnections=1000&amp;nested.wireFormat.maxInactivityDuration=1000" updateClusterClients="true"/>
    <!-- Other transport connectors... -->
</transportConnectors>

Default ports

  • 61616 is the default TCP port
  • 8161 is the default web console port
  • 5672 is the default AMQP port
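If the brokers sit behind firewalld, as in the NFS steps earlier, remember to open the ports for whichever connectors you actually use. A sketch for the OpenWire and AMQP ports configured above:

firewall-cmd --permanent --zone=public --add-port=61616/tcp    # OpenWire
firewall-cmd --permanent --zone=public --add-port=5672/tcp     # AMQP
firewall-cmd --reload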

Summary

After following this guide, you should now have an ActiveMQ master-slave setup for Martini. Your final activemq.xml file should look roughly like this:

<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<!-- START SNIPPET: example -->
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans                             http://www.springframework.org/schema/beans/spring-beans.xsd                            http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
        </property>
    </bean>
    <!-- Allows access to the server logs -->
    <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery" lazy-init="false" scope="singleton" init-method="start" destroy-method="stop" />
    <!-- The <broker> element is used to configure the ActiveMQ broker. -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="activemq-broker1" dataDirectory="${activemq.data}" useJmx="true" advisorySupport="true" persistent="true" deleteAllMessagesOnStartup="false" useShutdownHook="false" schedulerSupport="true" start="false">
        <!--
            For better performances use VM cursor and small memory limit.
            For more information, see:

            http://activemq.apache.org/message-cursors.html

            Also, if your producer is "hanging", it's probably due to producer flow control.
            For more information, see:
            http://activemq.apache.org/producer-flow-control.html
        -->
        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry topic="&gt;" producerFlowControl="false" gcInactiveDestinations="true" inactiveTimeoutBeforeGC="5000" memoryLimit="5mb" expireMessagesPeriod="0" advisoryWhenFull="true" maxDestinations="200">
                        <!--
                            The constantPendingMessageLimitStrategy is used to prevent
                            slow topic consumers to block producers and affect other consumers
                            by limiting the number of messages that are retained
                            For more information, see:

                            http://activemq.apache.org/slow-consumer-handling.html
                        -->
                        <pendingMessageLimitStrategy>
                            <constantPendingMessageLimitStrategy limit="20" /> <!-- 20 pending messages -->
                        </pendingMessageLimitStrategy>
                        <dispatchPolicy>
                            <strictOrderDispatchPolicy />
                        </dispatchPolicy>
                        <subscriptionRecoveryPolicy>
                            <timedSubscriptionRecoveryPolicy recoverDuration="60000" /> <!-- 1 minute -->
                        </subscriptionRecoveryPolicy>
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>
        <!--
            The managementContext is used to configure how ActiveMQ is exposed in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:

            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="true" rmiServerPort="1098" connectorPort="1099" />
        </managementContext>
        <plugins>
            <jaasAuthenticationPlugin configuration="activemq-domain" />
            <runtimeConfigurationPlugin checkPeriod="1000" />
            <authorizationPlugin>
                <map>
                    <authorizationMap>
                        <authorizationEntries>
                            <!-- To secure every topic, applications need to be configured to include a unique identifier in their topic names -->
                            <authorizationEntry topic="jmsPrefix.statistics.&gt;" read="admins" write="admins" admin="admins" />
                            <authorizationEntry topic="jmsPrefix.io.toro.martini.&gt;" read="admins" write="admins" admin="admins" />
                            <authorizationEntry queue="jmsPrefix.io.toro.martini.&gt;" read="admins" write="admins" admin="admins" />
                            <authorizationEntry topic="ActiveMQ.Advisory.&gt;" read="admins" write="admins" admin="admins" />
                        </authorizationEntries>
                    </authorizationMap>
                </map>
            </authorizationPlugin>
        </plugins>
        <!--
            Configure message persistence for the broker. The default persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
            For more information, see:

            http://activemq.apache.org/persistence.html
        -->
        <persistenceAdapter>
            <kahaDB directory="/datastore/activemq-data" indexWriteBatchSize="1000" enableIndexWriteAsync="true" />
        </persistenceAdapter>
        <!--
            The systemUsage controls the maximum amount of space the broker will
            use before disabling caching and/or slowing down producers. For more information, see:
            http://activemq.apache.org/producer-flow-control.html
            If using ActiveMQ embedded - the following limits could safely be used:

        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="20 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="1 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="100 mb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>
        -->
        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb" />
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb" />
                </tempUsage>
            </systemUsage>
        </systemUsage>
        <!--
            The transport connectors expose ActiveMQ over a given protocol to
            clients and other brokers. For more information, see:

            http://activemq.apache.org/configuring-transports.html
        -->
        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?jms.useAsyncSend=true&amp;randomize=false&amp;initialReconnectDelay=100&amp;maximumConnections=1000&amp;nested.wireFormat.maxInactivityDuration=1000" updateClusterClients="true" />
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?jms.useAsyncSend=true&amp;randomize=false&amp;initialReconnectDelay=100&amp;maximumConnections=1000&amp;nested.wireFormat.maxInactivityDuration=1000" updateClusterClients="true" />
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
        </transportConnectors>
        <!-- destroy the spring context on shutdown to stop jetty -->
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>
    </broker>
    <!--
        Enable web consoles, REST and Ajax APIs and demos
        The web console requires by default login; you can disable this in the jetty.xml file
        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
    -->
    <import resource="jetty.xml" />
</beans>
<!-- END SNIPPET: example -->

Jetty settings

The activemq.xml file will often either contain Jetty settings or import them from another file. For example:

<import resource="jetty.xml"/>

This setup does not require the web console. If you're not using it to manage ActiveMQ, leaving it enabled may be a security risk.
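If you decide to disable the web console, one way is to comment out the import in activemq.xml and restart the broker:

<!-- <import resource="jetty.xml"/> -->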