Skip to main content

ADVANCED

Message Queue Integration

Test message queue systems like ActiveMQ, RabbitMQ, and JMS using Karate's listen keyword and karate.signal() for async event handling. The pattern works with any queue system through custom Java wrappers.

On this page:

Listen and Signal

The listen keyword waits for an async event, and karate.signal() triggers completion. The listenResult variable holds the signaled value.

Gherkin
Feature: Basic async pattern

Scenario: Wait for async event
# Setup handler that signals when message arrives
* def handler = function(msg){ karate.signal(msg) }

# Trigger some async operation
Given url 'https://jsonplaceholder.typicode.com'
And path 'posts'
And request { title: 'Test', body: 'Content', userId: 1 }
When method post
Then status 201

# Wait for async event (timeout in milliseconds)
* listen 5000
* def result = listenResult
* match result == '#notnull'

The listen keyword blocks until either:

  • karate.signal(value) is called (from any thread)
  • The timeout expires (listenResult will be null)

Queue Consumer Pattern

The core pattern involves a custom Java class that wraps your message queue client. The Java class calls back into Karate when messages arrive.

Gherkin
Feature: ActiveMQ queue testing

Background:
# Load custom Java queue consumer class
* def QueueConsumer = Java.type('mock.contract.QueueConsumer')
* def queue = new QueueConsumer(queueName)

# Create handler that signals Karate when message arrives
* def handler = function(msg){ karate.signal(msg) }

# Register handler - karate.toJava() converts JS function to Java Function
* queue.listen(karate.toJava(handler))

* url paymentServiceUrl + '/payments'

Scenario: Create payment and verify queue message
Given request { amount: 5.67, description: 'test one' }
When method post
Then status 200
And match response == { id: '#number', amount: 5.67, description: 'test one' }
And def id = response.id

# Wait for message on queue
* listen 5000
* json shipment = listenResult
* match shipment == { paymentId: '#(id)', status: 'shipped' }
karate.toJava()

JavaScript functions must be wrapped with karate.toJava() when passed to Java code. This converts the JS function to a Java Function interface that your queue consumer can call.

Java Consumer Wrapper

Your Java wrapper class handles the queue-specific connection logic and calls the handler function when messages arrive:

Java
public class QueueConsumer {
private final MessageConsumer consumer;

public QueueConsumer(String queueName) throws Exception {
// Queue-specific connection setup
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
this.consumer = session.createConsumer(queue);
}

public void listen(Function<String, Void> handler) {
consumer.setMessageListener(msg -> {
try {
String text = ((TextMessage) msg).getText();
handler.apply(text); // Calls back to karate.signal()
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}
Java Function Pattern

For better performance with multi-threaded code, consider using a Java factory method that returns a Function directly instead of wrapping JavaScript functions:

public static Function<String, Void> createHandler(Object karateSignal) {
return msg -> { /* call karate.signal */ return null; };
}

Message Filtering

Filter messages in your handler to wait for specific events:

Gherkin
Feature: Filtered message handling

Background:
* def QueueConsumer = Java.type('com.mycompany.QueueConsumer')
* def orderQueue = new QueueConsumer('order-events')
* def targetOrderId = null

# Handler filters for specific order ID
* def orderHandler = function(msg) {
var order = JSON.parse(msg);
if (order.id == targetOrderId) {
karate.signal(order);
}
}
* orderQueue.listen(karate.toJava(orderHandler))

Scenario: Wait for specific order event
# Create order via API
Given url apiUrl + '/orders'
And request { customerId: 123, items: [{ sku: 'WIDGET-001', quantity: 2 }] }
When method post
Then status 201
* targetOrderId = response.id

# Wait for matching order event
* listen 10000
* def orderEvent = listenResult
* match orderEvent.id == targetOrderId
* match orderEvent.status == 'PENDING'

Message Correlation

Track related messages across distributed workflows using correlation IDs:

Gherkin
Feature: Correlated message handling

Background:
* def correlationId = java.util.UUID.randomUUID().toString()
* def messages = []

* def QueueConsumer = Java.type('com.mycompany.QueueConsumer')
* def consumer = new QueueConsumer('workflow-events')

# Collect all messages with matching correlation ID
* def correlationHandler = function(msg) {
var event = JSON.parse(msg);
if (event.correlationId == correlationId) {
messages.push(event);
if (messages.length >= 3) {
karate.signal(messages);
}
}
}
* consumer.listen(karate.toJava(correlationHandler))

Scenario: Multi-step workflow with correlation
# Start workflow with correlation ID
Given url apiUrl + '/workflows'
And request { correlationId: '#(correlationId)', steps: ['validate', 'process', 'notify'] }
When method post
Then status 202

# Wait for all correlated events
* listen 20000
* def workflowEvents = listenResult
* match workflowEvents == '#[3]'
* match each workflowEvents[*].correlationId == correlationId

Handling Timeouts

Always check for null when listen times out:

Gherkin
Feature: Timeout handling

Scenario: Handle missing message
* def QueueConsumer = Java.type('com.mycompany.QueueConsumer')
* def queue = new QueueConsumer('test-queue')
* def handler = function(msg){ karate.signal(msg) }
* queue.listen(karate.toJava(handler))

# Trigger async operation
Given url apiUrl + '/trigger'
When method post

# Wait with timeout
* listen 5000

# Check if message arrived
* if (listenResult == null) karate.fail('Timeout waiting for queue message')

# Safe to use result
* match listenResult.status == 'completed'

Timeout Guidelines

Processing TypeRecommended Timeout
Real-time notifications2-5 seconds
Order processing10-15 seconds
Heavy processing (video, reports)30-60 seconds
Batch jobs2-5 minutes

Dead Letter Queue Testing

Verify failed messages route to dead letter queues:

Gherkin
Feature: Dead letter queue validation

Background:
* def DLQConsumer = Java.type('com.mycompany.QueueConsumer')
* def dlq = new DLQConsumer('DLQ.failed-messages')
* def dlqHandler = function(msg){ karate.signal(JSON.parse(msg)) }
* dlq.listen(karate.toJava(dlqHandler))

Scenario: Invalid message goes to DLQ
# Send invalid message that will fail processing
Given url apiUrl + '/messages'
And request { invalid: 'data', missing: 'required-fields' }
When method post
Then status 202

# Verify message ends up in DLQ
* listen 15000
* def dlqMessage = listenResult
* match dlqMessage.error == '#string'
* match dlqMessage.originalMessage == '#present'

Test Isolation

Use unique queue names per test to prevent interference:

Gherkin
Feature: Isolated queue tests

Background:
# Unique queue name per test run
* def testId = java.util.UUID.randomUUID().toString()
* def queueName = 'test-queue-' + testId

* def QueueConsumer = Java.type('com.mycompany.QueueConsumer')
* def consumer = new QueueConsumer(queueName)
* def handler = function(msg){ karate.signal(msg) }
* consumer.listen(karate.toJava(handler))

Scenario: Isolated test
Given url apiUrl + '/send-to-queue'
And request { queueName: '#(queueName)', message: 'test data' }
When method post

* listen 5000
* match listenResult == 'test data'

Next Steps