
This section explains, through an example scenario, how the Composed Msg. Processor EIP can be implemented using WSO2 ESB. The following topics are covered:

Introduction to Composed Msg. Processor

The Composed Msg. Processor EIP is used to process a composite message. It maintains the overall message flow when processing a message consisting of multiple elements, each of which might require different processing. The Composed Message Processor splits the message up, routes the sub-messages to the appropriate destinations, and then aggregates the responses back into a single message. For more information, refer to http://www.eaipatterns.com/DistributionAggregate.html.
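The split, route, and aggregate steps described above can be sketched in a few lines of Python. This is only an illustration of the pattern, not of how WSO2 ESB implements it; the handler functions, stock thresholds, and order structure are hypothetical.

```python
from typing import Callable, Dict, List


# Hypothetical inventory checks; each one stands in for a separate system.
def check_widget(item: dict) -> dict:
    return {**item, "in_stock": item["qty"] <= 10}


def check_gadget(item: dict) -> dict:
    return {**item, "in_stock": item["qty"] <= 5}


# Routing table: item type -> handler responsible for that type.
ROUTES: Dict[str, Callable[[dict], dict]] = {
    "widget": check_widget,
    "gadget": check_gadget,
}


def process_order(order: dict) -> dict:
    # Splitter: break the composite message into its elements.
    items: List[dict] = order["items"]
    # Router: pick a handler for each element based on its content.
    results = [ROUTES[item["type"]](item) for item in items]
    # Aggregator: recombine the individual results into a single message.
    return {
        "order_id": order["order_id"],
        "items": results,
        "valid": all(r["in_stock"] for r in results),
    }


order = {
    "order_id": 1,
    "items": [
        {"type": "widget", "qty": 3},
        {"type": "gadget", "qty": 9},
    ],
}
print(process_order(order))
```

The caller sees one request and one response, even though each item was handled by a different function.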

Figure 1: Composed Msg. Processor EIP

Example scenario

This example scenario demonstrates splitting a message into several requests, which are routed to different servers; the responses are then merged and sent back to the client. In this scenario, each server instance acts as an inventory controller. The user can include multiple requests in a single request message. The Iterate mediator splits the request message into its individual requests. The Switch mediator inspects the content of each request and decides the routing destination, and the Send mediator then sends the request to the respective endpoint. The responses, which come from different endpoints, are merged by the Aggregate mediator before the ESB sends them back to the client.

The diagram below depicts how to simulate the example scenario using WSO2 ESB.

Figure 2: Composed Msg. Processor Example Scenario

Before digging into implementation details, let's take a look at the relationship between the example scenario and the Composed Msg. Processor EIP by comparing their core components.

Composed Msg. Processor EIP (Figure 1) | Composed Msg. Processor Example Scenario (Figure 2)
New Order                              | Stock Quote Request
Splitter                               | Iterate Mediator
Router                                 | Switch Mediator
Widget/Gadget Inventory                | Stock Quote Service Instance
Aggregator                             | Aggregate Mediator
Validated Order                        | Aggregated Stock Quote Response

Environment setup

  1. Download and install WSO2 ESB from http://wso2.com/products/enterprise-service-bus. For a list of prerequisites and step-by-step installation instructions, refer to the Installation Guide in the WSO2 ESB documentation.
  2. Start two Sample Axis2 server instances on ports 9000 and 9001. For instructions, refer to the section Setting up the ESB Samples - Starting the Axis2 server in the WSO2 ESB documentation.

ESB configuration

Start the ESB server and log in to its management console UI (https://localhost:9443/carbon). In the management console, navigate to the Main menu and click Source View in the Service Bus section. Next, copy and paste the following configuration, which helps you explore the example scenario, into the source view.

<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://ws.apache.org/ns/synapse">
    <proxy name="ComposedMessageProxy" startOnLoad="true">
        <target>
            <inSequence>
                <log level="full"/>
                <iterate xmlns:m0="http://services.samples" preservePayload="true" attachPath="//m0:getQuote" expression="//m0:getQuote/m0:request">
                    <target>
                        <sequence>
                            <switch xmlns:m1="http://services.samples/xsd" source="//m1:symbol">
                                <case regex="IBM">
                                    <send>
                                        <endpoint>
                                            <address uri="http://localhost:9000/services/SimpleStockQuoteService"/>
                                        </endpoint>
                                    </send>
                                </case>
                                <case regex="WSO2">
                                    <send>
                                        <endpoint>
                                            <address uri="http://localhost:9001/services/SimpleStockQuoteService"/>
                                        </endpoint>
                                    </send>
                                </case>
                                <default>
                                    <drop/>
                                </default>
                            </switch>
                        </sequence>
                    </target>
                </iterate>
            </inSequence>
            <outSequence>
                <aggregate>
                    <completeCondition>
                        <messageCount/>
                    </completeCondition>
                    <onComplete xmlns:m0="http://services.samples" expression="//m0:getQuoteResponse">
                        <send/>
                    </onComplete>
                </aggregate>
            </outSequence>
        </target>
        <publishWSDL uri="file:samples/service-bus/resources/proxy/sample_proxy_1.wsdl"/>
    </proxy>

</definitions>

Simulating the sample scenario

Send the following request using a SOAP client such as SoapUI:

<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ser="http://services.samples" xmlns:xsd="http://services.samples/xsd">
   <soapenv:Header/>
   <soapenv:Body>
      <ser:getQuote>     
         <ser:request>           
            <xsd:symbol>IBM</xsd:symbol>
         </ser:request>
          <ser:request>            
            <xsd:symbol>WSO2</xsd:symbol>
         </ser:request>
         <ser:request>           
            <xsd:symbol>IBM</xsd:symbol>
         </ser:request>
      </ser:getQuote>
   </soapenv:Body>
</soapenv:Envelope>
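If you prefer a script to a GUI client, the same request can be built and posted with Python's standard library. This is a minimal sketch under stated assumptions: the proxy URL assumes the ESB's default HTTP port (8280) and the proxy name from the configuration above, and the SOAPAction value urn:getQuote is assumed from the sample stock quote service. Adjust both to match your environment. The function names are hypothetical.

```python
import urllib.request
from xml.sax.saxutils import escape

# Assumption: default ESB HTTP port and the proxy name used in this example.
PROXY_URL = "http://localhost:8280/services/ComposedMessageProxy"


def build_envelope(symbols):
    """Build a getQuote envelope with one request element per symbol."""
    requests = "".join(
        f"<ser:request><xsd:symbol>{escape(s)}</xsd:symbol></ser:request>"
        for s in symbols
    )
    return (
        '<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" '
        'xmlns:ser="http://services.samples" xmlns:xsd="http://services.samples/xsd">'
        "<soapenv:Header/><soapenv:Body>"
        f"<ser:getQuote>{requests}</ser:getQuote>"
        "</soapenv:Body></soapenv:Envelope>"
    )


def send_quotes(symbols, url=PROXY_URL, timeout=30):
    """POST the envelope to the proxy and return the raw response text."""
    request = urllib.request.Request(
        url,
        data=build_envelope(symbols).encode("utf-8"),
        headers={
            "Content-Type": "text/xml; charset=UTF-8",
            "SOAPAction": "urn:getQuote",  # assumed action name
        },
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.read().decode("utf-8")


# Requires the ESB and both Axis2 servers to be running:
# print(send_quotes(["IBM", "WSO2", "IBM"]))
```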

The ESB returns a single response message in which the three quote responses are merged together.

SOAP response of Composed Message Processor

<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
   <soapenv:Body>
      <ns:getQuoteResponse xmlns:ns="http://services.samples">
         <ns:return xsi:type="ax21:GetQuoteResponse" xmlns:ax21="http://services.samples/xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
            <ax21:change>-2.3524349166988148</ax21:change>
            <ax21:earnings>13.039287427943174</ax21:earnings>
            <ax21:high>-147.82215416089954</ax21:high>
            <ax21:last>150.42944098590854</ax21:last>
            <ax21:lastTradeTimestamp>Wed Nov 29 13:10:16 IST 2017</ax21:lastTradeTimestamp>
            <ax21:low>156.00309322666212</ax21:low>
            <ax21:marketCap>4.0704702811974496E7</ax21:marketCap>
            <ax21:name>IBM Company</ax21:name>
            <ax21:open>156.54563823303528</ax21:open>
            <ax21:peRatio>24.744940446245423</ax21:peRatio>
            <ax21:percentageChange>-1.4082628986156709</ax21:percentageChange>
            <ax21:prevClose>167.04515321757532</ax21:prevClose>
            <ax21:symbol>IBM</ax21:symbol>
            <ax21:volume>19856</ax21:volume>
         </ns:return>
      </ns:getQuoteResponse>
      <ns:getQuoteResponse xmlns:ns="http://services.samples">
         <ns:return xsi:type="ax21:GetQuoteResponse" xmlns:ax21="http://services.samples/xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
            <ax21:change>4.338630296226518</ax21:change>
            <ax21:earnings>-9.981195004808995</ax21:earnings>
            <ax21:high>-93.23101400112802</ax21:high>
            <ax21:last>93.78054915571116</ax21:last>
            <ax21:lastTradeTimestamp>Wed Nov 29 13:10:16 IST 2017</ax21:lastTradeTimestamp>
            <ax21:low>-91.67657115464598</ax21:low>
            <ax21:marketCap>1.2048601227631677E7</ax21:marketCap>
            <ax21:name>IBM Company</ax21:name>
            <ax21:open>97.71302391406073</ax21:open>
            <ax21:peRatio>23.753214298084966</ax21:peRatio>
            <ax21:percentageChange>-4.997221061621991</ax21:percentageChange>
            <ax21:prevClose>-86.8208598884415</ax21:prevClose>
            <ax21:symbol>IBM</ax21:symbol>
            <ax21:volume>7120</ax21:volume>
         </ns:return>
      </ns:getQuoteResponse>
      <ns:getQuoteResponse xmlns:ns="http://services.samples">
         <ns:return xsi:type="ax21:GetQuoteResponse" xmlns:ax21="http://services.samples/xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
            <ax21:change>-2.8758173622682595</ax21:change>
            <ax21:earnings>13.587800448945982</ax21:earnings>
            <ax21:high>-60.60915794654595</ax21:high>
            <ax21:last>61.521647372674394</ax21:last>
            <ax21:lastTradeTimestamp>Wed Nov 29 13:10:16 IST 2017</ax21:lastTradeTimestamp>
            <ax21:low>63.7318273151116</ax21:low>
            <ax21:marketCap>1319867.136948647</ax21:marketCap>
            <ax21:name>WSO2 Company</ax21:name>
            <ax21:open>64.03367826249405</ax21:open>
            <ax21:peRatio>23.802458318708982</ax21:peRatio>
            <ax21:percentageChange>-4.319528792063117</ax21:percentageChange>
            <ax21:prevClose>66.57710830756369</ax21:prevClose>
            <ax21:symbol>WSO2</ax21:symbol>
            <ax21:volume>16990</ax21:volume>
         </ns:return>
      </ns:getQuoteResponse>
   </soapenv:Body>
</soapenv:Envelope>
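To confirm that a response really contains one getQuoteResponse per request, you can parse it and list the symbols. The sketch below uses Python's xml.etree.ElementTree on a trimmed copy of the response above (only the symbol and last elements are kept); the summarize function is a hypothetical helper.

```python
import xml.etree.ElementTree as ET

# Namespace prefixes as they appear in the response shown above.
NS = {
    "soapenv": "http://schemas.xmlsoap.org/soap/envelope/",
    "ns": "http://services.samples",
    "ax21": "http://services.samples/xsd",
}

# Trimmed copy of the aggregated response, for illustration only.
SAMPLE = """<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
  <soapenv:Body>
    <ns:getQuoteResponse xmlns:ns="http://services.samples">
      <ns:return xmlns:ax21="http://services.samples/xsd">
        <ax21:last>150.42944098590854</ax21:last>
        <ax21:symbol>IBM</ax21:symbol>
      </ns:return>
    </ns:getQuoteResponse>
    <ns:getQuoteResponse xmlns:ns="http://services.samples">
      <ns:return xmlns:ax21="http://services.samples/xsd">
        <ax21:last>93.78054915571116</ax21:last>
        <ax21:symbol>IBM</ax21:symbol>
      </ns:return>
    </ns:getQuoteResponse>
    <ns:getQuoteResponse xmlns:ns="http://services.samples">
      <ns:return xmlns:ax21="http://services.samples/xsd">
        <ax21:last>61.521647372674394</ax21:last>
        <ax21:symbol>WSO2</ax21:symbol>
      </ns:return>
    </ns:getQuoteResponse>
  </soapenv:Body>
</soapenv:Envelope>"""


def summarize(response_xml):
    """Return (symbol, last price) for each merged getQuoteResponse."""
    root = ET.fromstring(response_xml)
    quotes = []
    for response in root.findall("./soapenv:Body/ns:getQuoteResponse", NS):
        ret = response.find("ns:return", NS)
        symbol = ret.findtext("ax21:symbol", namespaces=NS)
        last = float(ret.findtext("ax21:last", namespaces=NS))
        quotes.append((symbol, last))
    return quotes


for symbol, last in summarize(SAMPLE):
    print(symbol, last)
```

The order of the merged responses depends on when each back-end reply arrives, so it need not match the order of the requests.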


How the implementation works

Let's investigate the elements of the ESB configuration in detail. The line numbers below refer to the ESB configuration shown above.

  • iterate [line 7 in ESB config] - The Iterate mediator splits the message into one message per element matched by its XPath expression and applies the sequence inside its target to each of them. In this example, it produces one message for each request element inside getQuote in the incoming request. Because preservePayload is true, each request is attached under the getQuote element named in attachPath, so every split message is itself a complete getQuote request.
  • switch [line 10 in ESB config] - The Switch mediator evaluates the XPath expression given in its source attribute (//m1:symbol) against the message and extracts the stock symbol.
  • case [line 11 in ESB config] - The extracted value is matched against the regular expression of each case.
  • send [line 12 in ESB config] - When a matching case is found, the Send mediator routes the message to the endpoint indicated in the address URI. Messages that match no case fall through to default and are dropped.
  • aggregate [line 34 in ESB config] - The Aggregate mediator merges the response messages for requests made by the Iterate or Clone mediators. The completion condition can specify the minimum or maximum number of messages to be collected. When aggregation completes, the elements selected by the onComplete expression (//m0:getQuoteResponse) are merged into a single message and the sequence inside onComplete is run, which here sends the merged response back to the client.
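The behaviour of the mediators listed above can be mimicked in plain Python to make the message flow easier to follow. This is a rough simulation, not the ESB's implementation: the two server functions are hypothetical stand-ins for the Axis2 services, and the case expressions are assumed to need a match on the whole symbol value.

```python
import copy
import re
import xml.etree.ElementTree as ET

SER = "http://services.samples"
XSD = "http://services.samples/xsd"

REQUEST = f"""<ser:getQuote xmlns:ser="{SER}" xmlns:xsd="{XSD}">
  <ser:request><xsd:symbol>IBM</xsd:symbol></ser:request>
  <ser:request><xsd:symbol>WSO2</xsd:symbol></ser:request>
  <ser:request><xsd:symbol>IBM</xsd:symbol></ser:request>
</ser:getQuote>"""


def iterate(get_quote):
    """Split: yield one getQuote per request (preservePayload + attachPath)."""
    for request in get_quote.findall(f"{{{SER}}}request"):
        part = ET.Element(get_quote.tag)
        part.append(copy.deepcopy(request))
        yield part


# Hypothetical stand-ins for the services on ports 9000 and 9001.
def server_9000(symbol):
    return f"{symbol} quote from 9000"


def server_9001(symbol):
    return f"{symbol} quote from 9001"


# One (regex, endpoint) pair per case element of the switch.
CASES = [(re.compile("IBM"), server_9000), (re.compile("WSO2"), server_9001)]


def switch_and_send(part):
    """Route: read the symbol, match it against each case, call the endpoint."""
    symbol = part.findtext(f".//{{{XSD}}}symbol", default="")
    for regex, endpoint in CASES:
        if regex.fullmatch(symbol):  # assumption: whole value must match
            return endpoint(symbol)
    return None  # default case: the message is dropped


def run(request_xml):
    """Aggregate: collect the responses of all routed parts into one list."""
    parts = iterate(ET.fromstring(request_xml))
    return [r for r in map(switch_and_send, parts) if r is not None]


print(run(REQUEST))
```

Unlike this sequential sketch, the ESB sends the split messages to the back ends independently and merges the responses as they arrive.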