Apache ActiveMQ Load Balancing Example
1. Introduction
Apache ActiveMQ (AMQ) is a message broker which transfers the message from the sender to the receiver. Load Balancing is the process of distributing data across services for better performance.
In this example, we will demonstrate how to build a load-balanced AMQ client application.
2. The Component Diagram
In this example, we will demonstrate two forms of load balancing outlined in the diagram:
- A message producer sends messages to multiple AMQ brokers
- Messages in a queue are consumed by multiple competing consumers
3. Technologies used
The example code in this article was built and run using:
- Java 1.8.101 (1.8.x will do fine)
- Maven 3.3.9 (3.3.x will do fine)
- Apache ActiveMQ 5.8.0 and 5.15.0 (others will do fine)
- Spring JMS 4.1.5.RELEASE (others will do fine)
- Eclipse Neon (Any Java IDE would work)
4. Start two ActiveMQ Brokers
4.1 Configure ActiveMQ with Non-Default Port
4.1.1 Update activemq.xml
Navigate to the ..\apache-activemq-5.8.0\conf directory. Update the activemq.xml file at the transportConnector element.
activemq.xml transportConnectors
<transportConnectors> <transportConnector name="openwire" uri="tcp://0.0.0.0:61716?maximumConnections=1000&wireformat.maxFrameSize=104857600"/> </transportConnectors>
4.1.2 Update jetty.xml
Go to the ..\apache-activemq-5.8.0\conf directory. Update the jetty.xml file at the bean element .
jetty.xml port
<bean id="Connector" class="org.eclipse.jetty.server.nio.SelectChannelConnector">
<property name="port" value="8761" />
</bean>4.2 Start ActiveMQ Brokers
In this example, we will start two AMQ instances:
- Broker 1 – AMQ 5.15.0 at default port 61616/8161
- Broker 2 – AMQ 5.8.0 at port 61716/8761
Go to the ..\apache-activemq-5.x.0\bin directory. Then click the activemq.bat file.
If you can go to http://localhost:8161/admin/index.jsp, then the broker 1 is started fine.
You do the same for broker 2 at http://localhost:8761/admin/index.jsp.
5. Producer Load Balancing Example
In this example, we will demonstrate how to build MessageSender which uses the Round-robin method to send messages to two AMQ brokers.
5.1 Dependency
Add dependency to Maven pom.xml.
pom.xml
<dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>4.1.5.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.1.5.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.1.5.RELEASE</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.0</version> </dependency> </dependencies>
5.2 Constants
There are five constants value used in this example.
DemoConstants
package jcg.demo.util;
import java.util.Random;
/**
* The constant data used in this Demo
* @author Mary.Zheng
*
*/
public final class DemoConstants{
public static final int MESSAGE_SIZE = 100;
public static final String PRODUCER_DESTINATION = "test.queue.lb.producer";
public static final String CONSUMER_DESTINATION = "test.queue.lb.consumer";
public static String BROKER_1_URI = "tcp://localhost:61616";
public static String BROKER_2_URI = "tcp://localhost:61716";
public static String buildDummyMessage() {
Random rand = new Random();
int value = rand.nextInt(MESSAGE_SIZE);
return "dummy message " + value;
}
}
5.3 Spring configuration
Add the JMS Spring configuration.
JmsConfig
package jcg.demo.spring.config;
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import jcg.demo.spring.jms.component.JmsExceptionListener;
import jcg.demo.util.DemoConstants;
@Configuration
@EnableJms
@ComponentScan(basePackages = "jcg.demo.spring.jms.component")
public class JmsConfig {
@Bean
@Autowired
public ConnectionFactory jmsConnectionFactory(JmsExceptionListener jmsExceptionListener) {
return createJmsConnectionFactory(DemoConstants.BROKER_1_URI, jmsExceptionListener);
}
@Bean
@Autowired
public ConnectionFactory jmsConnectionFactory_2(JmsExceptionListener jmsExceptionListener) {
return createJmsConnectionFactory(DemoConstants.BROKER_2_URI, jmsExceptionListener);
}
private ConnectionFactory createJmsConnectionFactory(String brokerURI, JmsExceptionListener jmsExceptionListener) {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURI);
activeMQConnectionFactory.setExceptionListener(jmsExceptionListener);
CachingConnectionFactory pooledConnection = new CachingConnectionFactory(activeMQConnectionFactory);
return pooledConnection;
}
@Bean(name = "jmsQueueTemplate_1")
@Autowired
public JmsTemplate createJmsQueueTemplate(ConnectionFactory jmsConnectionFactory) {
return new JmsTemplate(jmsConnectionFactory);
}
@Bean(name = "jmsQueueTemplate_2")
@Autowired
public JmsTemplate createJmsQueueTemplate_2(ConnectionFactory jmsConnectionFactory_2) {
return new JmsTemplate(jmsConnectionFactory_2);
}
}
- line 25: Create connection factory to broker 1
- line 31: Create connection factory to broker 2
- line 42: Create
JmsTemplateto broker 1 - line 48: Create
JmsTemplateto broker 2
5.4 MessageSender
Create MessageSender Spring component to send messages.
MessageSender
package jcg.demo.spring.jms.component;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
/**
* This is Spring component which finds the load-balanced JmsTemplate via
* Round-Robin from the list of available JmsQueueTemplates to send the message
*
* @author Mary.Zheng
*
*/
@Component
public class MessageSender {
@Autowired
private List jmsQueueTemplates = new ArrayList();
private AtomicInteger current = new AtomicInteger(0);
private JmsTemplate findJmsTemplate_LB() {
int cur = current.getAndIncrement();
int index = cur % jmsQueueTemplates.size();
System.out.println("\tFind Load balanced JmsTemplate[ " + index + " ]");
return jmsQueueTemplates.get(index);
}
public void postToQueue(final String queueName, final String message) {
System.out.println("MessageSender postToQueue started");
this.findJmsTemplate_LB().send(queueName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
});
}
}
- line 25: Spring dependency injection will add both broker’s
JmsTemplatetojmsQueueTemplates - line 30-36: Use Round-robin logic to find the
JmsTemplate - line 40: Send the message with load-balanced
JmsTemplate
5.5 MessageProducerApp
Create MessageProducer application.
MessageProducerApp
package jcg.demo.activemqlb.producer;
import java.util.Scanner;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Configuration;
import jcg.demo.spring.config.JmsConfig;
import jcg.demo.spring.jms.component.MessageSender;
import jcg.demo.util.DemoConstants;
@Configuration
public class MessageProducerApp {
public static void main(String[] args) throws Exception {
try (AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(JmsConfig.class)) {
context.register(MessageProducerApp.class);
String queueName = readDestination();
MessageSender springJmsProducer = (MessageSender) context.getBean("messageSender");
for (int i = 0; i < DemoConstants.MESSAGE_SIZE; i++) {
springJmsProducer.postToQueue(queueName, DemoConstants.buildDummyMessage());
}
}
}
private static String readDestination() {
System.out.println("Enter Destination: P - Producer, C - Consumer : ");
try (Scanner scanIn = new Scanner(System.in)) {
String inputString = scanIn.nextLine();
scanIn.close();
if (inputString.equalsIgnoreCase("P")) {
return DemoConstants.PRODUCER_DESTINATION;
}
return DemoConstants.CONSUMER_DESTINATION;
}
}
}
- line 16: Start Spring context from
JmsConfig - line 20: Get
messageSenderSpring bean
5.6 Execute MessageProducerApp
Below is the application output when you input P on the prompt. Make sure both brokers are running.
Execution Output
Enter Destination: P - Producer, C - Consumer : P MessageSender postToQueue started Find Load balanced JmsTemplate[ 0 ] MessageSender postToQueue started Find Load balanced JmsTemplate[ 1 ] MessageSender postToQueue started Find Load balanced JmsTemplate[ 0 ] MessageSender postToQueue started Find Load balanced JmsTemplate[ 1 ] ......
As you see here, two JmsTemplates take turns to send a total of 100 messages to their connected broker.
Go to http://localhost:8161/admin/queues.jsp for broker 1 and http://localhost:8761/admin/queues.jsp for broker 2. You should see that each broker has 50 pending messages at test.queue.lb.producer.
6. Consumer Load Balancing Example
In this example, we will demonstrate how to build the MessageConsumerApp which consumes the messages from a queue. We also show how to run two of them concurrently.
6.1 MessageConsumerWithPrefetch
AMQ brokers set default prefetch size 1000, so we have to set the prefetch size to 1 to allow two consumers consume messages concurrently.
MessageConsumerWithPrefetch
package jcg.demo.activemqlb.consumer;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* A simple message consumer which consumes the message from ActiveMQ Broker
* with pre-fetch size set to 1 instead of default 1000.
*
* @author Mary.Zheng
*
*/
public class MessageConsumerWithPrefetch implements MessageListener {
private static final String JMS_PREFETCH_POLICY_ALL_1 = "?jms.prefetchPolicy.all=1";
private String activeMqBrokerUri;
private String username;
private String password;
private String destinationName;
public MessageConsumerWithPrefetch(String activeMqBrokerUri, String username, String password) {
super();
this.activeMqBrokerUri = activeMqBrokerUri + JMS_PREFETCH_POLICY_ALL_1;
this.username = username;
this.password = password;
}
public void run() throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(username, password, activeMqBrokerUri);
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
setComsumer(session);
connection.start();
System.out.println(String.format("MessageConsumerWithPrefetch Waiting for messages at %s from %s",
destinationName, this.activeMqBrokerUri));
}
private void setComsumer(Session session) throws JMSException {
Destination destination = session.createQueue(destinationName);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(this);
}
@Override
public void onMessage(Message message) {
String msg;
try {
msg = String.format("MessageConsumerWithPrefetch Received message [ %s ]",
((TextMessage) message).getText());
Thread.sleep(10000);// sleep for 10 seconds
System.out.println(msg);
} catch (JMSException | InterruptedException e) {
e.printStackTrace();
}
}
public String getDestinationName() {
return destinationName;
}
public void setDestinationName(String destinationName) {
this.destinationName = destinationName;
}
}
- line 23, 31 : Set the AMQ
prefetchPolicy
6.2 MessageConsumerApp
Create MessageConsumerApp which consumes from the consumer queue based on the selected broker.
MessageConsumerApp
package jcg.demo.activemqlb.consumer;
import java.util.Scanner;
import javax.jms.JMSException;
import jcg.demo.util.DemoConstants;
public class MessageConsumerApp {
public static void main(String[] args) {
String brokerUri = readBrokerInstance();
consume_queue_with_prefetchsize(brokerUri);
}
private static void consume_queue_with_prefetchsize(String brokerUri) {
MessageConsumerWithPrefetch queueMsgListener = new MessageConsumerWithPrefetch(brokerUri, "admin", "admin");
queueMsgListener.setDestinationName(DemoConstants.CONSUMER_DESTINATION);
try {
queueMsgListener.run();
} catch (JMSException e) {
e.printStackTrace();
}
}
private static String readBrokerInstance() {
System.out.println("MessageConsumerApp listens at Broker Instance ( 1 or 2 ): ");
try (Scanner scanIn = new Scanner(System.in)) {
String inputString = scanIn.nextLine();
scanIn.close();
if (inputString.equalsIgnoreCase("1")) {
return DemoConstants.BROKER_1_URI;
}
return DemoConstants.BROKER_2_URI;
}
}
}
6.3 Execute MessageConsumerApp in Eclipse
Starts the MessageConsumerApp via Eclipse.
MessageConsumerApp Output
MessageConsumerApp listens at Broker Instance ( 1 or 2 ): 1 MessageConsumerWithPrefetch Waiting for messages at test.queue.lb.consumer from tcp://localhost:61616?jms.prefetchPolicy.all=1
6.4 Execute MessageConsumerApp via Jar command
First, export the MessageConsumerApp as a jar: activemq-lb.jar. Open the command prompt and enter the command java -jar activemq-lb.jar.
MessageConsumerApp Output
C:\JDK8_CTLSS\Java Code Geek Examples>java -jar activemq-lb.jar MessageConsumerApp listens at Broker Instance ( 1 or 2 ): 1 MessageConsumerWithPrefetch Waiting for messages at test.queue.lb.consumer from tcp://localhost:61616?jms.prefetchPolicy.all=1
6.5 Summary
There are two consumer applications listening at test.queue.lb.consumer after the steps 6.3 and 6.4,
Monitoring both output while executing the MessageProducerApp built at step 5.5 to send 100 messages to test.queue.lb.consumer. You should see both consumers are receiving the messages. The screenshot below shows both consumers consumed 25 messages from test.queue.lb.consumer after all messages are processed.
6.6 Things to consider
The AMQ message is dispatched based on the First-In, First-Out(FIFO) algorithm. If the messages must be processed based on the order entered, then running the consumer concurrently must be planned accordingly to avoid an error. Please check out ActiveMQ tutorial for detail.
7. Conclusion
In this example, we built two Java AMQ client applications:
MessageProducerAppsends the message to two AMQ brokers via Round-robin algorithm to reduce the data load at each AMQ Broker- Two
MessageConsumerAppsconsume the messages from the same queue to reduce the data load at the AMQ queue
8. Download the Source Code
This example built two Java AMQ client applications (producer and consumer) to achieve the load balancing requirement.
You can download the full source code of this example here: Apache ActiveMQ Load Balancing Example






