This page describes how to install and configure the RabbitMQ AMQP transport.
AMQP is a wire-level messaging protocol that describes the format of the data that is sent across the network. If a system or application can read and write AMQP, it can exchange messages with any other system or application that understands AMQP, regardless of the implementation language.
The RabbitMQ AMQP transport is implemented using the RabbitMQ Java Client. It allows you to send or receive AMQP messages by calling an AMQP broker (RabbitMQ) directly without the need to use different transport mechanisms, such as JMS.
The following diagram illustrates a scenario where WSO2 ESB uses the RabbitMQ AMQP transport to exchange messages between RabbitMQ Java Clients by calling RabbitMQ brokers.
The RabbitMQ AMQP transport is developed as a separate module of the transports project. You install it using the Feature Manager in the Management Console.
To install the RabbitMQ AMQP transport as a feature:
p2-repo.zip file and copy its path (such as /home/product/p2-repo).
You are now ready to configure the listener and sender of the transport in axis2.xml.
In the transport listeners section, add the following RabbitMQ transport listener, replacing the values with your host, port, username, and password to connect to the AMQP broker (you can optionally create multiple connection factories if you want this listener to connect to multiple brokers):
<transportReceiver name="rabbitmq" class="org.apache.axis2.transport.rabbitmq.RabbitMQListener">
    <parameter name="AMQPConnectionFactory" locked="false">
        <parameter name="rabbitmq.server.host.name" locked="false">192.168.0.3</parameter>
        <parameter name="rabbitmq.server.port" locked="false">5672</parameter>
        <parameter name="rabbitmq.server.user.name" locked="false">user</parameter>
        <parameter name="rabbitmq.server.password" locked="false">abc123</parameter>
    </parameter>
</transportReceiver>
In the transport senders section, add the following RabbitMQ transport sender, which will be used for sending AMQP messages to a queue:
<transportSender name="rabbitmq" class="org.apache.axis2.transport.rabbitmq.RabbitMQSender"/>
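Before restarting the server, it can help to confirm that the edited configuration is well-formed and that the connection factories are nested as intended. The following is a minimal sketch that uses only the JDK's XML parser; it is not part of the transport. The class name, the `<axisconfig>` wrapper, and the second connection factory (`SecondConnectionFactory` on 192.168.0.4) are hypothetical and only illustrate the "multiple connection factories" case.

```java
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

final class RabbitMqTransportConfigCheck {

    // Sample fragment: the listener and sender from this page plus one hypothetical factory.
    static final String SAMPLE =
        "<axisconfig>"
      + "<transportReceiver name=\"rabbitmq\" class=\"org.apache.axis2.transport.rabbitmq.RabbitMQListener\">"
      + "<parameter name=\"AMQPConnectionFactory\" locked=\"false\">"
      + "<parameter name=\"rabbitmq.server.host.name\" locked=\"false\">192.168.0.3</parameter>"
      + "<parameter name=\"rabbitmq.server.port\" locked=\"false\">5672</parameter>"
      + "</parameter>"
      // Hypothetical second connection factory pointing at another broker
      + "<parameter name=\"SecondConnectionFactory\" locked=\"false\">"
      + "<parameter name=\"rabbitmq.server.host.name\" locked=\"false\">192.168.0.4</parameter>"
      + "<parameter name=\"rabbitmq.server.port\" locked=\"false\">5672</parameter>"
      + "</parameter>"
      + "</transportReceiver>"
      + "<transportSender name=\"rabbitmq\" class=\"org.apache.axis2.transport.rabbitmq.RabbitMQSender\"/>"
      + "</axisconfig>";

    /** Returns factory name -> (parameter name -> value) for the listener named "rabbitmq". */
    static Map<String, Map<String, String>> connectionFactories(String xml) {
        Map<String, Map<String, String>> factories = new LinkedHashMap<>();
        NodeList receivers = parse(xml).getElementsByTagName("transportReceiver");
        for (int i = 0; i < receivers.getLength(); i++) {
            Element receiver = (Element) receivers.item(i);
            if (!"rabbitmq".equals(receiver.getAttribute("name"))) {
                continue;
            }
            // Each direct <parameter> child of the listener is one connection factory.
            for (Node f = receiver.getFirstChild(); f != null; f = f.getNextSibling()) {
                if (!isParameter(f)) {
                    continue;
                }
                Map<String, String> settings = new LinkedHashMap<>();
                for (Node p = f.getFirstChild(); p != null; p = p.getNextSibling()) {
                    if (isParameter(p)) {
                        settings.put(((Element) p).getAttribute("name"), p.getTextContent().trim());
                    }
                }
                factories.put(((Element) f).getAttribute("name"), settings);
            }
        }
        return factories;
    }

    /** Returns true if a transportSender named "rabbitmq" is present. */
    static boolean hasSender(String xml) {
        NodeList senders = parse(xml).getElementsByTagName("transportSender");
        for (int i = 0; i < senders.getLength(); i++) {
            if ("rabbitmq".equals(((Element) senders.item(i)).getAttribute("name"))) {
                return true;
            }
        }
        return false;
    }

    private static boolean isParameter(Node node) {
        return node.getNodeType() == Node.ELEMENT_NODE && "parameter".equals(node.getNodeName());
    }

    private static Document parse(String xml) {
        try {
            return DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));
        } catch (Exception e) {
            throw new IllegalArgumentException("Configuration is not well-formed XML", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(connectionFactories(SAMPLE));
        System.out.println("rabbitmq sender present: " + hasSender(SAMPLE));
    }
}
```

The check only inspects the XML structure; it does not connect to a broker or validate the credentials.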
You are now ready to create the RabbitMQ proxy service.
Following is a sample RabbitMQ proxy service named AMQPProxy that consumes AMQP messages from one RabbitMQ broker and publishes them to another:
<proxy xmlns="http://ws.apache.org/ns/synapse" name="AMQPProxy" transports="rabbitmq" statistics="disable" trace="disable" startOnLoad="true">
    <target>
        <inSequence>
            <log level="full"/>
            <property name="OUT_ONLY" value="true"/>
            <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
        </inSequence>
        <endpoint>
            <address uri="rabbitmq:/AMQPProxy?rabbitmq.server.host.name=192.168.0.3&amp;rabbitmq.server.port=5672&amp;rabbitmq.server.user.name=user&amp;rabbitmq.server.password=abc123&amp;rabbitmq.queue.name=queue2&amp;rabbitmq.exchange.name=exchange2"/>
        </endpoint>
    </target>
    <parameter name="rabbitmq.queue.name">queue1</parameter>
    <parameter name="rabbitmq.exchange.name">exchange1</parameter>
    <parameter name="rabbitmq.connection.factory">AMQPConnectionFactory</parameter>
    <description></description>
</proxy>
Note the following:
- The transport is identified by the keyname rabbitmq. You specify this keyname in the transports parameter (transports="rabbitmq").
- The endpoint URI is prefixed with rabbitmq so that the RabbitMQ AMQP transport will be used to publish the message. Be sure to specify the rest of the parameters in the URI as shown above. If you do not want to specify which RabbitMQ exchange to use, leave the value blank to use the default exchange.
- The rabbitmq.queue.name parameter specifies the queue on which the proxy service will listen and consume messages. If you do not specify a name for this parameter, the name of the proxy service is used as the queue name.
- The rabbitmq.connection.factory parameter specifies the connection factory that is used to listen on the queue and consume messages. In this example, it is set to the name of the connection factory defined in the listener we created earlier (AMQPConnectionFactory).
- The rabbitmq.exchange.name parameter specifies the RabbitMQ exchange to which the queue is bound. If you do not want to use a specific exchange, leave this value blank to use the default exchange.

Following is a quick reference to all the RabbitMQ AMQP transport properties. The rabbitmq.server properties refer to the server where RabbitMQ is running.
- rabbitmq.server.host.name – Host name of the server.
- rabbitmq.server.port – Port value of the server.

You can modify the sample proxy service above to handle cases where you only want to receive AMQP messages but send messages in a different format, or receive messages in a different format and send AMQP messages. You can also modify the proxy service to work with a different transport. For example, you could create a proxy that uses the RabbitMQ AMQP transport to listen to messages and then sends them over HTTP or JMS.
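When you adapt the sample proxy service to a different broker or queue, the endpoint URI is the part that is easiest to get wrong, because it packs every connection parameter into a single query string. The following is a minimal sketch of assembling such a URI from a parameter map. The class and method names are hypothetical, values are inserted as-is (no percent-encoding is applied, since this page does not say whether the transport decodes it), and the path segment is simply the one used in the sample (AMQPProxy). Because the URI sits inside an XML attribute, each & separator has to be written as &amp;amp; in the configuration file, which the second helper takes care of.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class RabbitMqEndpointUri {

    /** Joins the parameters into rabbitmq:/<path>?key=value&key=value, keeping map order. */
    static String build(String path, Map<String, String> params) {
        StringBuilder uri = new StringBuilder("rabbitmq:/").append(path);
        char separator = '?';
        for (Map.Entry<String, String> entry : params.entrySet()) {
            uri.append(separator).append(entry.getKey()).append('=').append(entry.getValue());
            separator = '&';
        }
        return uri.toString();
    }

    /** Escapes a value so it can be placed inside a double-quoted XML attribute. */
    static String escapeForXmlAttribute(String value) {
        return value.replace("&", "&amp;").replace("<", "&lt;").replace("\"", "&quot;");
    }

    public static void main(String[] args) {
        // Same values as the sample proxy service on this page
        Map<String, String> params = new LinkedHashMap<>();
        params.put("rabbitmq.server.host.name", "192.168.0.3");
        params.put("rabbitmq.server.port", "5672");
        params.put("rabbitmq.server.user.name", "user");
        params.put("rabbitmq.server.password", "abc123");
        params.put("rabbitmq.queue.name", "queue2");
        params.put("rabbitmq.exchange.name", "exchange2");

        String uri = build("AMQPProxy", params);
        System.out.println("<address uri=\"" + escapeForXmlAttribute(uri) + "\"/>");
    }
}
```

A LinkedHashMap is used so that the parameters appear in the order they were added, which makes the generated URI easy to compare with the sample.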
This section provides sample Java clients that you can use to send and receive AMQP messages. These clients can be used to test the scenario where the Sender publishes a message to a RabbitMQ AMQP queue, which is consumed by the product and published to another queue, which in turn is consumed by the Receiver. When you run these clients, the Receiver will get the messages sent by the Sender, confirming that you have correctly configured the RabbitMQ AMQP transport.
The following Java client sends SOAP XML messages to a RabbitMQ queue. This client was tested with SOAP messages sent and consumed from AMQP broker queues with content type “text/xml”. When specifying the queue name for publishing messages, be sure to specify the same queue where the RabbitMQ transport listener is listening.
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

// host, port, userName, password, queueName, exchangeName, routingKey, name, messageID,
// replyToAddress, correlationId, and contentEncoding are assumed to be defined by the caller.
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setUsername(userName);
factory.setPassword(password);
factory.setPort(port);

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// Declare the queue and a durable direct exchange, then bind them
channel.queueDeclare(queueName, false, false, false, null);
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueBind(queueName, exchangeName, routingKey);

// The message to be sent
String message = "<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\">\n"
        + "<soapenv:Header/>\n"
        + "<soapenv:Body>\n"
        + "  <p:greet xmlns:p=\"http://greet.service.kishanthan.org\">\n"
        + "    <in>" + name + "</in>\n"
        + "  </p:greet>\n"
        + "</soapenv:Body>\n"
        + "</soapenv:Envelope>";

// Populate the AMQP message properties
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.messageId(messageID);
builder.contentType("text/xml");
builder.replyTo(replyToAddress);
builder.correlationId(correlationId);
builder.contentEncoding(contentEncoding);

// Custom user properties
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("SOAP_ACTION", "greet");
builder.headers(headers);

// Publish the message to the exchange, using the routing key from the binding above
// so that the direct exchange routes it to the queue
channel.basicPublish(exchangeName, routingKey, builder.build(), message.getBytes());

// Release the channel and connection when done
channel.close();
connection.close();
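The sender builds the SOAP envelope by string concatenation, so a name that contains characters such as & or < would produce a message that is not well-formed XML. The following is a minimal sketch of building the same envelope with the name escaped, plus a well-formedness check using the JDK's XML parser. The class and method names are hypothetical; the envelope uses the standard SOAP 1.1 namespace and the greet namespace from the sample.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import org.xml.sax.InputSource;

final class GreetEnvelope {

    /** Escapes the characters that are not allowed in XML element text. */
    static String escapeXml(String text) {
        return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }

    /** Builds the greet request envelope with the given name as the payload. */
    static String build(String name) {
        return "<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\">\n"
                + "<soapenv:Header/>\n"
                + "<soapenv:Body>\n"
                + "  <p:greet xmlns:p=\"http://greet.service.kishanthan.org\">\n"
                + "    <in>" + escapeXml(name) + "</in>\n"
                + "  </p:greet>\n"
                + "</soapenv:Body>\n"
                + "</soapenv:Envelope>";
    }

    /** Returns true if the string parses as namespace-aware XML. */
    static boolean isWellFormed(String xml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            factory.newDocumentBuilder().parse(new InputSource(new StringReader(xml)));
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String envelope = build("Tom & Jerry");
        System.out.println(envelope);
        System.out.println("well-formed: " + isWellFormed(envelope));
    }
}
```

The result of build(name) can be passed to the sender in place of the concatenated message string.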
When specifying the queue name for consuming messages, be sure to specify the same queue configured in the proxy service endpoint.
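import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

// hostName, port, userName, password, queueName, exchangeName, and routingKey
// are assumed to be defined by the caller.
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(hostName);
factory.setUsername(userName);
factory.setPassword(password);
factory.setPort(port);

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// Declare the queue and a durable direct exchange, then bind them
channel.queueDeclare(queueName, false, false, false, null);
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueBind(queueName, exchangeName, routingKey);

// Create the consumer (QueueingConsumer is only available in RabbitMQ Java Client versions before 5.0)
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer); // true = automatic acknowledgement

// Start consuming messages; nextDelivery() blocks until a message arrives
while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println("Received: " + message);
}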
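To confirm that the Receiver got what the Sender published, it is often enough to pull the greeting name back out of the received body. The following is a minimal sketch that does this with the JDK's XML parser, assuming the message arrives as the same SOAP envelope the Sender built. The class and method names are hypothetical; the input corresponds to the bytes returned by delivery.getBody() in the Receiver.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

final class GreetMessageReader {

    private static final String GREET_NS = "http://greet.service.kishanthan.org";

    /**
     * Returns the text of the <in> element inside the greet request,
     * or null if the body does not contain one.
     */
    static String extractName(byte[] body) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            Document doc = factory.newDocumentBuilder().parse(new ByteArrayInputStream(body));

            // Locate <p:greet> by namespace so the prefix used by the sender does not matter
            NodeList greets = doc.getElementsByTagNameNS(GREET_NS, "greet");
            if (greets.getLength() == 0) {
                return null;
            }
            // <in> carries no prefix in the sample message
            NodeList in = ((Element) greets.item(0)).getElementsByTagName("in");
            return in.getLength() == 0 ? null : in.item(0).getTextContent();
        } catch (Exception e) {
            throw new IllegalArgumentException("Message body is not well-formed XML", e);
        }
    }

    public static void main(String[] args) {
        byte[] body = ("<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\">"
                + "<soapenv:Body><p:greet xmlns:p=\"" + GREET_NS + "\"><in>WSO2</in></p:greet>"
                + "</soapenv:Body></soapenv:Envelope>").getBytes(StandardCharsets.UTF_8);
        System.out.println(extractName(body));
    }
}
```

Inside the Receiver's loop, the call would be extractName(delivery.getBody()).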