This documentation is for WSO2 Enterprise Integrator version 6.0.0 . View documentation for the latest release in the 6.x.x family and the latest release in the 7.x.x family.

All docs This doc
Skip to end of metadata
Go to start of metadata

You can send request-response messages using the RabbitMQ transport by implementing a Remote Procedure Call(RPC) scenario with RabbitMQ.

The following diagram illustrates a remote procedure call scenario with RabbitMQ:

 

The remote procedure call works as follows:

  • When WSO2 Enterprise Integrator(WSO2 EI) starts up, it creates an anonymous, exclusive callback queue. 
  • For a remote procedure call request, WSO2 EI sends a message with the following properties:
    • reply_to : This is set to the callback queue
    • correlation_id : This is set to a unique value for every request.
  • The request is then sent to the rpc_queue.
  • The RPC Server waits for requests on that queue. When a request appears, it does the job and sends a message with the result back to the WSO2 EI server, using the queue from the reply_to field with the same correlation_id.

  • WSO2 EI waits for data on the reply_to queue. When a message appears, it checks the correlation_id property. If it matches the value from the request, it returns the response to the application.

The following is a sample proxy service named RabbitMQRPCProxy that sends request-response messages using the RabbitMQ transport.

<proxy xmlns="http://ws.apache.org/ns/synapse"
   	name="RabbitMQRPCProxy"
   	startOnLoad="true"
   	trace="enable"
       transports="http">
   <description/>
   <target>
  	<inSequence>
     	<log level="full">
        	<property name="received" value="true"/>
     	</log>
     	<send>
        	<endpoint>
           	<address uri="rabbitmq://?rabbitmq.server.host.name=localhost&amp;rabbitmq.server.port=5672&amp;rabbitmq.server.user.name=guest&amp;rabbitmq.server.password=guest&amp;rabbitmq.queue.name=rpc_queue&amp;rabbitmq.queue.routing.key=rpc_queue&amp;rabbitmq.replyto.name=dummy"/>
        	</endpoint>
     	</send>
  	</inSequence>
  	<outSequence>
     	<log level="full">
        	<property name="response" value="true"/>
     	</log>
     	<send/>
  	</outSequence>
   </target>
</proxy>

The following is the code for a sample RPC server:

package rpc;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;

public class RPCServer {

    public static void main(String[] argv) {
        Connection connection = null;
        Channel channel;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume("rpc_queue", false, consumer);

            System.out.println(" [x] Awaiting RPC requests");

            while (true) {
                String response = null;
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                BasicProperties props = delivery.getProperties();
                BasicProperties replyProps =
                        new BasicProperties.Builder().correlationId(props.getCorrelationId()).contentType("text/xml")
                                                     .build();

                response =
                        "<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\" " +
                        "xmlns:ser=\"http://services.samples\" xmlns:xsd=\"http://services.samples/xsd\">\n" +
                        "   <soapenv:Header/>\n" +
                        "   <soapenv:Body>\n" +
                        "      <ser:placeOrder>\n" +
                        "         <!--Optional:-->\n" +
                        "         <ser:order>\n" +
                        "            <!--Optional:-->\n" +
                        "            <xsd:price>10</xsd:price>\n" +
                        "            <!--Optional:-->\n" +
                        "            <xsd:quantity>5</xsd:quantity>\n" +
                        "            <!--Optional:-->\n" +
                        "            <xsd:symbol>RMQ</xsd:symbol>\n" +
                        "         </ser:order>\n" +
                        "      </ser:placeOrder>\n" +
                        "   </soapenv:Body>\n" +
                        "</soapenv:Envelope>";

                String replyToQueue = props.getReplyTo();
                System.out.println("Publishing to : " + replyToQueue);
                channel.basicPublish("", replyToQueue, replyProps, response.getBytes("UTF-8"));
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
  • No labels