This documentation is for WSO2 Enterprise Service Bus version 4.9.0.

You can send request-response messages using the RabbitMQ transport by implementing a remote procedure call (RPC) scenario with RabbitMQ.

The following diagram illustrates a remote procedure call scenario with RabbitMQ:


The remote procedure call works as follows:

  • When WSO2 ESB starts up, it creates an anonymous exclusive callback queue. 
  • For a remote procedure call request, the ESB sends a message with the following properties:
    • reply_to - This is set to the callback queue.
    • correlation_id - This is set to a unique value for every request.
  • The request is then sent to the rpc_queue.
  • The RPC server waits for requests on that queue. When a request appears, it processes the request and sends a message with the result back to the ESB, using the queue named in the reply_to property and the same correlation_id as the request.
  • The ESB waits for data on the callback queue. When a message appears, it checks the correlation_id property. If it matches the value from the request, it returns the response to the application.
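
The steps above can be sketched without a broker. The following is a minimal in-memory illustration, not the ESB's implementation: the class name RpcCorrelationSketch, its methods, the "echo:" reply, and the use of plain Java queues in place of RabbitMQ queues are all assumptions made for this example. It shows only how reply_to routes the reply and how correlation_id matches it to the request.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;

// Hypothetical in-memory model of the RPC flow; no RabbitMQ broker is involved.
public class RpcCorrelationSketch {

    // A message carrying the two properties the RPC flow relies on.
    static final class Message {
        final String replyTo;
        final String correlationId;
        final String body;

        Message(String replyTo, String correlationId, String body) {
            this.replyTo = replyTo;
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    // Stand-ins for broker queues, keyed by queue name.
    private final Map<String, Queue<Message>> queues = new HashMap<>();

    // Stand-in for the anonymous exclusive callback queue created at startup.
    private final String callbackQueue = "callback-" + UUID.randomUUID();

    private Queue<Message> queue(String name) {
        return queues.computeIfAbsent(name, k -> new ArrayDeque<>());
    }

    // Client side: publish a request to rpc_queue with reply_to and a fresh correlation_id.
    public String sendRequest(String body) {
        String correlationId = UUID.randomUUID().toString();
        queue("rpc_queue").add(new Message(callbackQueue, correlationId, body));
        return correlationId;
    }

    // Server side: take one request and reply on its reply_to queue,
    // copying the request's correlation_id onto the reply.
    public void serveOne() {
        Message request = queue("rpc_queue").remove();
        queue(request.replyTo).add(new Message(null, request.correlationId, "echo:" + request.body));
    }

    // Client side: read the callback queue until a reply with the expected
    // correlation_id appears. Non-matching replies are discarded in this sketch.
    // Returns null if no matching reply is queued.
    public String receiveReply(String correlationId) {
        Message reply;
        while ((reply = queue(callbackQueue).poll()) != null) {
            if (correlationId.equals(reply.correlationId)) {
                return reply.body;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        RpcCorrelationSketch rpc = new RpcCorrelationSketch();
        String id = rpc.sendRequest("getQuote");
        rpc.serveOne();
        System.out.println(rpc.receiveReply(id)); // prints "echo:getQuote"
    }
}
```

Because all replies share one callback queue, the correlation_id is the only thing that ties a reply to its request, which is why it must be unique for every request.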

Following is a sample proxy service named RabbitMQRPCProxy that sends request-response messages using the RabbitMQ transport.

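<proxy xmlns="" name="RabbitMQRPCProxy">
   <target>
      <inSequence>
         <log level="full"><property name="received" value="true"/></log>
         <send><endpoint><address uri="rabbitmq://?;rabbitmq.server.port=5672&amp;;rabbitmq.server.password=guest&amp;;rabbitmq.queue.routing.key=rpc_queue&amp;"/></endpoint></send>
      </inSequence>
      <outSequence>
         <log level="full"><property name="response" value="true"/></log>
         <send/>
      </outSequence>
   </target>
</proxy>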
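
To try the proxy service, an application can post a SOAP message to it over HTTP. The following is a rough client sketch under several assumptions that are not part of the sample above: the ESB runs on localhost with the default HTTP port 8280, the proxy is exposed at /services/RabbitMQRPCProxy, the SOAPAction value is urn:mediate, and the request body is a placeOrder message. The class name ProxyClientSketch and its helper methods are hypothetical. It requires Java 11 or later for java.net.http.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical HTTP client for the RabbitMQRPCProxy proxy service.
public class ProxyClientSketch {

    // Assumption: ESB on localhost, default HTTP port 8280.
    public static final String PROXY_URL = "http://localhost:8280/services/RabbitMQRPCProxy";

    // Builds a SOAP 1.1 placeOrder envelope (payload shape is an assumption).
    public static String buildEnvelope(String symbol, int quantity, int price) {
        return "<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\" "
                + "xmlns:ser=\"http://services.samples\" xmlns:xsd=\"http://services.samples/xsd\">"
                + "<soapenv:Header/><soapenv:Body><ser:placeOrder><ser:order>"
                + "<xsd:price>" + price + "</xsd:price>"
                + "<xsd:quantity>" + quantity + "</xsd:quantity>"
                + "<xsd:symbol>" + symbol + "</xsd:symbol>"
                + "</ser:order></ser:placeOrder></soapenv:Body></soapenv:Envelope>";
    }

    // Builds the HTTP POST request without sending it.
    public static HttpRequest buildRequest(String url, String envelope) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "text/xml; charset=UTF-8")
                .header("SOAPAction", "\"urn:mediate\"") // assumed action name
                .POST(HttpRequest.BodyPublishers.ofString(envelope))
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpRequest request = buildRequest(PROXY_URL, buildEnvelope("RMQ", 5, 10));
        // Blocks until the ESB returns the reply it received on the callback queue.
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode());
        System.out.println(response.body());
    }
}
```

The HTTP call returns only after the ESB has received the matching reply, so the RPC server must be running and consuming from rpc_queue before the request is sent.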

Following is the code for a sample RPC server:

package rpc;

import java.io.IOException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;


public class RPCServerV {

    public static void main(String[] argv) {
        Connection connection = null;
        Channel channel;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            connection = factory.newConnection();
            channel = connection.createChannel();
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume("rpc_queue", false, consumer);

            System.out.println(" [x] Awaiting RPC requests");

            while (true) {
                String response = null;
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                BasicProperties props = delivery.getProperties();
                // Copy the request's correlation ID so the ESB can match this reply to its request.
                BasicProperties replyProps =
                        new BasicProperties.Builder().correlationId(props.getCorrelationId()).contentType("text/xml")
                                .build();

                response =
                        "<soapenv:Envelope xmlns:soapenv=\"\" " +
                        "xmlns:ser=\"http://services.samples\" xmlns:xsd=\"http://services.samples/xsd\">\n" +
                        "   <soapenv:Header/>\n" +
                        "   <soapenv:Body>\n" +
                        "      <ser:placeOrder>\n" +
                        "         <!--Optional:-->\n" +
                        "         <ser:order>\n" +
                        "            <!--Optional:-->\n" +
                        "            <xsd:price>10</xsd:price>\n" +
                        "            <!--Optional:-->\n" +
                        "            <xsd:quantity>5</xsd:quantity>\n" +
                        "            <!--Optional:-->\n" +
                        "            <xsd:symbol>RMQ</xsd:symbol>\n" +
                        "         </ser:order>\n" +
                        "      </ser:placeOrder>\n" +
                        "   </soapenv:Body>\n" +
                        "</soapenv:Envelope>";

                String replyToQueue = props.getReplyTo();
                System.out.println("Publishing to : " + replyToQueue);
                channel.basicPublish("", replyToQueue, replyProps, response.getBytes("UTF-8"));
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

