ADVANCED
Message Queue Integration
Test message queue systems like ActiveMQ, RabbitMQ, and JMS using Karate's listen/signal pattern for event-driven and async architectures.
Why Test Message Queues?
Message queue testing validates critical async workflows in modern distributed systems:
- Event-driven validation: Verify messages flow correctly through your architecture
- End-to-end coverage: Test complete workflows from API triggers to queue consumers
- Async workflow testing: Confirm eventual consistency and message processing
- Integration confidence: Ensure producers and consumers work together correctly
Core Pattern: Listen and Signal
Basic Listen and Signal
The listen
keyword waits for async events, and karate.signal()
triggers the wait from Java callbacks:
Feature: Basic message queue pattern
Background:
# Custom Java class that wraps your queue consumer
* def QueueConsumer = Java.type('com.mycompany.QueueConsumer')
* def queue = new QueueConsumer('test-queue')
# JavaScript handler that signals when message arrives
* def messageHandler = function(msg) { karate.signal(msg) }
# Register handler with queue consumer
* queue.listen(karate.toJava(messageHandler))
Scenario: Wait for queue message
# Trigger action that sends message to queue
Given url apiUrl + '/trigger-event'
And request { eventType: 'user-registered', userId: 123 }
When method post
Then status 202
# Wait for message on queue (timeout 5 seconds)
* listen 5000
* def message = listenResult
* match message.eventType == 'user-registered'
* match message.userId == 123
Understanding karate.toJava()
JavaScript functions must be wrapped with karate.toJava()
when passed to Java code:
Scenario: JavaScript to Java conversion
# JavaScript function
* def myHandler = function(data) {
karate.log('Received:', data);
karate.signal(data);
}
# Convert to Java Function interface
* def javaHandler = karate.toJava(myHandler)
# Pass to Java consumer
* def Consumer = Java.type('com.example.MessageConsumer')
* Consumer.registerCallback(javaHandler)
Handling Timeouts
Always check for null when listen times out:
Scenario: Handle listen timeout
* listen 5000
# listenResult will be null if timeout occurs
* if (listenResult == null) karate.fail('Timeout waiting for message')
# Safe to access message
* match listenResult.status == 'completed'
ActiveMQ Integration
ActiveMQ Setup
Connect to ActiveMQ and consume messages from queues or topics:
// Example Java wrapper class for ActiveMQ
public class ActiveMQConsumer {
private final ConnectionFactory factory;
private final Session session;
private final MessageConsumer consumer;
public ActiveMQConsumer(String brokerUrl, String queueName) throws Exception {
this.factory = new ActiveMQConnectionFactory(brokerUrl);
Connection connection = factory.createConnection();
connection.start();
this.session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
this.consumer = session.createConsumer(queue);
}
public void listen(Function<String, Void> handler) {
consumer.setMessageListener(msg -> {
try {
String text = ((TextMessage) msg).getText();
handler.apply(text);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}
ActiveMQ Queue Example
Test order processing workflow with ActiveMQ queues:
Feature: ActiveMQ order processing
Background:
* def ActiveMQConsumer = Java.type('com.mycompany.ActiveMQConsumer')
* def orderQueue = new ActiveMQConsumer('tcp://localhost:61616', 'order-events')
* def orderId = null
* def orderHandler = function(msg) {
var order = JSON.parse(msg);
if (order.id == orderId) {
karate.signal(order);
}
}
* orderQueue.listen(karate.toJava(orderHandler))
Scenario: Order creation via ActiveMQ
# Create order via REST API
Given url apiUrl + '/orders'
And request { customerId: 123, items: [{ sku: 'WIDGET-001', quantity: 2 }] }
When method post
Then status 201
* orderId = response.id
# Wait for order event on ActiveMQ queue
* listen 10000
* def orderEvent = listenResult
* match orderEvent.id == orderId
* match orderEvent.status == 'PENDING'
* match orderEvent.total == '#number'
ActiveMQ Topic Subscription
Subscribe to topics for publish-subscribe patterns:
Feature: ActiveMQ topic subscription
Background:
* def ActiveMQTopicConsumer = Java.type('com.mycompany.ActiveMQTopicConsumer')
* def notificationTopic = new ActiveMQTopicConsumer('tcp://localhost:61616', 'notifications')
* def notificationHandler = function(msg) {
var notification = JSON.parse(msg);
if (notification.type === 'email-sent') {
karate.signal(notification);
}
}
* notificationTopic.subscribe(karate.toJava(notificationHandler))
Scenario: Email notification via topic
# Trigger email send
Given url apiUrl + '/notifications/send'
And request { userId: 456, template: 'welcome', email: 'user@example.com' }
When method post
Then status 202
# Wait for notification on topic
* listen 8000
* def notification = listenResult
* match notification.type == 'email-sent'
* match notification.recipient == 'user@example.com'
* match notification.sentAt == '#string'
RabbitMQ Integration
RabbitMQ Connection
Connect to RabbitMQ exchanges and queues:
// Example RabbitMQ consumer wrapper
public class RabbitMQConsumer {
private final Connection connection;
private final Channel channel;
public RabbitMQConsumer(String host, int port, String exchange, String queue) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
this.connection = factory.newConnection();
this.channel = connection.createChannel();
channel.exchangeDeclare(exchange, "topic", true);
channel.queueDeclare(queue, true, false, false, null);
channel.queueBind(queue, exchange, "#");
}
public void subscribe(Function<String, Void> handler) {
DeliverCallback callback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
handler.apply(message);
};
channel.basicConsume(queue, true, callback, consumerTag -> {});
}
}
RabbitMQ Work Queue Pattern
Distribute tasks across multiple workers using work queues:
Feature: RabbitMQ work queue
Background:
* def RabbitMQConsumer = Java.type('com.mycompany.RabbitMQConsumer')
* def taskQueue = new RabbitMQConsumer('localhost', 5672, 'tasks', 'image-processing')
* def taskHandler = function(msg) {
var task = JSON.parse(msg);
if (task.status === 'completed') {
karate.signal(task);
}
}
* taskQueue.subscribe(karate.toJava(taskHandler))
Scenario: Image processing task
# Submit image processing task
Given url apiUrl + '/images/process'
And multipart file image = { read: 'test-image.jpg', contentType: 'image/jpeg' }
And multipart field operation = 'thumbnail'
When method post
Then status 202
* def taskId = response.taskId
# Wait for task completion message
* listen 15000
* def taskResult = listenResult
* match taskResult.taskId == taskId
* match taskResult.status == 'completed'
* match taskResult.outputUrl == '#string'
RabbitMQ Routing Patterns
Use routing keys for targeted message delivery:
Feature: RabbitMQ routing
Background:
* def RabbitMQConsumer = Java.type('com.mycompany.RabbitMQConsumer')
# Subscribe to high-priority messages only
* def priorityQueue = new RabbitMQConsumer('localhost', 5672, 'events', 'high-priority')
* def handler = function(msg) { karate.signal(JSON.parse(msg)) }
* priorityQueue.subscribe(karate.toJava(handler))
Scenario: Priority message routing
# Send high-priority alert
Given url apiUrl + '/alerts'
And request { severity: 'critical', message: 'Service down', priority: 'high' }
When method post
Then status 201
# Verify message routed to high-priority queue
* listen 5000
* def alert = listenResult
* match alert.severity == 'critical'
* match alert.priority == 'high'
JMS (Java Message Service)
Generic JMS Setup
Use standard JMS API for queue-agnostic integration:
// Generic JMS consumer
public class JMSConsumer {
private final Session session;
private final MessageConsumer consumer;
public JMSConsumer(ConnectionFactory factory, String queueName) throws Exception {
Connection connection = factory.createConnection();
connection.start();
this.session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
this.consumer = session.createConsumer(queue);
}
public void listen(Function<String, Void> handler) throws Exception {
consumer.setMessageListener(msg -> {
try {
if (msg instanceof TextMessage) {
handler.apply(((TextMessage) msg).getText());
}
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}
JMS Message Selectors
Filter messages using JMS selectors:
Feature: JMS message selectors
Background:
* def ConnectionFactory = Java.type('org.apache.activemq.ActiveMQConnectionFactory')
* def factory = new ConnectionFactory('tcp://localhost:61616')
* def JMSConsumer = Java.type('com.mycompany.JMSConsumer')
# Create consumer with message selector
* def selector = "priority > 5 AND type = 'ORDER'"
* def consumer = new JMSConsumer(factory, 'orders', selector)
* def handler = function(msg) { karate.signal(JSON.parse(msg)) }
* consumer.listen(karate.toJava(handler))
Scenario: High-priority order filtering
# Create high-priority order
Given url apiUrl + '/orders'
And request { customerId: 789, priority: 8, items: ['item1'] }
When method post
Then status 201
# Verify only high-priority messages received
* listen 5000
* def order = listenResult
* match order.priority == 8
* match order.customerId == 789
Transactional Messaging
Handle transactional message processing:
Feature: Transactional JMS
Background:
* def TransactionalJMS = Java.type('com.mycompany.TransactionalJMSConsumer')
* def txConsumer = new TransactionalJMS('tcp://localhost:61616', 'payments')
* def paymentHandler = function(msg) {
var payment = JSON.parse(msg);
karate.signal(payment);
return true; # Acknowledge message
}
* txConsumer.listen(karate.toJava(paymentHandler))
Scenario: Process payment transaction
# Initiate payment
Given url apiUrl + '/payments'
And request { amount: 99.99, currency: 'USD', orderId: 'ORD-12345' }
When method post
Then status 200
# Wait for transactional confirmation
* listen 10000
* def paymentConfirmation = listenResult
* match paymentConfirmation.status == 'COMMITTED'
* match paymentConfirmation.transactionId == '#string'
Message Correlation
Correlation ID Pattern
Track related messages across distributed workflows:
Feature: Message correlation
Background:
* def correlationId = java.util.UUID.randomUUID().toString()
* def messages = []
* def QueueConsumer = Java.type('com.mycompany.QueueConsumer')
* def consumer = new QueueConsumer('workflow-events')
* def correlationHandler = function(msg) {
var event = JSON.parse(msg);
if (event.correlationId === correlationId) {
messages.push(event);
if (messages.length >= 3) {
karate.signal(messages);
}
}
}
* consumer.listen(karate.toJava(correlationHandler))
Scenario: Multi-step workflow correlation
# Start workflow with correlation ID
Given url apiUrl + '/workflows'
And request {
correlationId: '#(correlationId)',
steps: ['validate', 'process', 'notify']
}
When method post
Then status 202
# Wait for all correlated events
* listen 20000
* def workflowEvents = listenResult
* match workflowEvents == '#[3]'
* match each workflowEvents[*].correlationId == correlationId
* match workflowEvents[0].step == 'validate'
* match workflowEvents[1].step == 'process'
* match workflowEvents[2].step == 'notify'
Request-Reply Pattern
Implement synchronous-style communication over async queues:
Feature: Request-reply over queues
Background:
* def replyQueueId = java.util.UUID.randomUUID().toString()
* def ReplyConsumer = Java.type('com.mycompany.QueueConsumer')
* def replyQueue = new ReplyConsumer('reply-' + replyQueueId)
* def replyHandler = function(msg) { karate.signal(JSON.parse(msg)) }
* replyQueue.listen(karate.toJava(replyHandler))
Scenario: Send request and wait for reply
# Send request to processing queue
Given url apiUrl + '/queue/process'
And request {
data: 'complex-calculation',
replyTo: 'reply-#(replyQueueId)'
}
When method post
Then status 202
# Wait for reply on dedicated queue
* listen 10000
* def reply = listenResult
* match reply.result == '#number'
* match reply.processingTime == '#number'
Error Handling and Timeouts
Dead Letter Queue Handling
Monitor and validate dead letter queue behavior:
Feature: Dead letter queue handling
Background:
* def DLQConsumer = Java.type('com.mycompany.QueueConsumer')
* def dlq = new DLQConsumer('DLQ.failed-messages')
* def dlqHandler = function(msg) { karate.signal(JSON.parse(msg)) }
* dlq.listen(karate.toJava(dlqHandler))
Scenario: Verify failed message routing to DLQ
# Send invalid message that will fail processing
Given url apiUrl + '/messages'
And request { invalid: 'data', missing: 'required-fields' }
When method post
Then status 202
# Verify message ends up in DLQ
* listen 15000
* def dlqMessage = listenResult
* match dlqMessage.error == '#string'
* match dlqMessage.retryCount == '#number'
* match dlqMessage.originalMessage == '#present'
Retry Strategies
Implement message retry with backoff:
Feature: Message retry handling
Background:
* def retryCount = 0
* def maxRetries = 3
* def RetryConsumer = Java.type('com.mycompany.QueueConsumer')
* def consumer = new RetryConsumer('retry-queue')
* def retryHandler = function(msg) {
var message = JSON.parse(msg);
retryCount = message.retryCount || 0;
karate.signal(message);
}
* consumer.listen(karate.toJava(retryHandler))
Scenario: Message retry with exponential backoff
# Send message that may need retries
Given url apiUrl + '/unreliable-process'
And request { data: 'test', retryPolicy: 'exponential' }
When method post
Then status 202
# Wait for final retry or success
* listen 30000
* def finalMessage = listenResult
* assert finalMessage.retryCount <= maxRetries
* match finalMessage.status contains ['SUCCESS', 'FAILED']
Best Practices
Resource Cleanup
Always clean up queue consumers after tests:
Feature: Queue resource management
Background:
* def consumers = []
* def createConsumer = function(queueName) {
var QueueConsumer = Java.type('com.mycompany.QueueConsumer');
var consumer = new QueueConsumer(queueName);
consumers.push(consumer);
return consumer;
}
Scenario: Proper cleanup
* def consumer = createConsumer('test-queue')
* def handler = function(msg) { karate.signal(msg) }
* consumer.listen(karate.toJava(handler))
# Test logic here
* listen 5000
# Cleanup all consumers
* def cleanup = function() { consumers.forEach(c => c.close()) }
* cleanup()
Test Isolation
Ensure queue tests do not interfere with each other:
Feature: Test isolation for queues
Background:
# Use unique queue names per test
* def testId = java.util.UUID.randomUUID().toString()
* def queueName = 'test-queue-' + testId
* def QueueConsumer = Java.type('com.mycompany.QueueConsumer')
* def consumer = new QueueConsumer(queueName)
Scenario: Isolated queue test
# Each scenario uses its own queue
* def handler = function(msg) { karate.signal(msg) }
* consumer.listen(karate.toJava(handler))
# Send message to isolated queue
Given url apiUrl + '/send-to-queue'
And request { queueName: '#(queueName)', message: 'test' }
When method post
* listen 5000
* match listenResult == 'test'
Timeout Configuration
Set appropriate timeouts based on queue characteristics:
Queue Type | Recommended Timeout | Use Case |
---|---|---|
Fast processing | 2-5 seconds | Real-time notifications |
Moderate processing | 10-15 seconds | Order processing, emails |
Heavy processing | 30-60 seconds | Video transcoding, reports |
Batch processing | 2-5 minutes | Nightly jobs, aggregations |
Next Steps
- Learn about async patterns: Polling and Async Operations
- Understand Java integration: Java API
- Add conditional logic: Conditional Logic
- Debug async issues: Debugging