
Currently, you can use this resequencer pattern only through the Management Console of WSO2 Enterprise Service Bus (ESB) 4.9.0, via the WSO2-CARBON-PATCH-4.4.0-1935 WUM update. For more information on how to update your WSO2 product distribution using WUM, see Updating WSO2 Products. Tooling support is planned for later WSO2 ESB versions.

The following topics explain, through an example scenario, how you can implement the Resequencer EIP using WSO2 ESB. 

Introduction to Resequencer

The Resequencer EIP puts a stream of related but out-of-sequence messages back into the correct order. It collects and reorders messages so that they can be published to the output channel in a specific order. For more information, go to Enterprise Integration Patterns Documentation.

[Figure: resequencer pattern]

Example scenario

Messages in the ESB usually take different mediation paths due to constructs like routers and filters. The time taken to process each message varies depending on the characteristics of each router. As a result, some messages might arrive earlier than others, which is a problem in situations where the order of message delivery is important. To overcome this issue, WSO2 ESB provides the Resequencer EIP.

This example scenario demonstrates WSO2 ESB collecting incoming messages using a message store and resequencing or reordering them based on a definitive sequence number derived from some part of the message.
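The core idea of collecting messages and releasing them by sequence number can be sketched in a few lines of Python. This is only an illustration of the pattern, not the code of the Resequence Message Store. The class name `Resequencer`, the method `accept`, and the assumption that numbering starts at 1 are all hypothetical.

```python
from typing import Dict, List


class Resequencer:
    """Buffers out-of-order messages and releases them in sequence order."""

    def __init__(self, first_seq: int = 1) -> None:
        # Assumption for this sketch: sequence numbers start at 1 and have no gaps.
        self.next_seq = first_seq          # sequence number expected next
        self.pending: Dict[int, str] = {}  # messages that arrived too early

    def accept(self, seq: int, message: str) -> List[str]:
        """Store one message and return every message that is now deliverable."""
        self.pending[seq] = message
        released = []
        # Drain the buffer for as long as the next expected number is present.
        while self.next_seq in self.pending:
            released.append(self.pending.pop(self.next_seq))
            self.next_seq += 1
        return released


if __name__ == "__main__":
    resequencer = Resequencer()
    for seq, msg in [(2, "b"), (1, "a"), (3, "c")]:
        print(seq, "->", resequencer.accept(seq, msg))
```

A message that arrives ahead of its turn is held back, and it is released together with any later buffered messages as soon as the gap before it is filled.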

The diagram below depicts how to simulate the example scenario using the WSO2 ESB.

[Figure: resequencer example scenario]

The following is a comparison of the core components of the pattern explaining the relationship between this example scenario and the Resequencer EIP.


Resequencer EIP     | Resequencer Example Scenario
Incoming messages   | Simple Stock requests
Resequencer         | Resequence Message Store with any Message Processor
Outgoing messages   | Simple Stock Quote requests

The ESB configuration

Given below is the ESB configuration for simulating the example scenario explained above.

<definitions xmlns="http://ws.apache.org/ns/synapse">
<messageStore xmlns="http://ws.apache.org/ns/synapse"
              class="org.apache.synapse.message.store.impl.resequencer.ResequenceMessageStore"
              name="RStore">
   <parameter name="store.resequence.timeout">-1</parameter>
   <parameter name="store.producer.guaranteed.delivery.enable">false</parameter>
   <parameter name="store.failover.message.store.name">RStore</parameter>
   <parameter xmlns:m0="http://services.samples"
              name="store.resequence.id.path"
              expression="substring-after(//m0:placeOrder/m0:order/m0:symbol,'-')"/>
   <parameter name="store.jdbc.password">root</parameter>
   <parameter name="store.jdbc.driver">com.mysql.jdbc.Driver</parameter>
   <parameter name="store.jdbc.username">root</parameter>
   <parameter name="store.jdbc.connection.url">jdbc:mysql://localhost:3306/resequenceDB</parameter>
   <parameter name="store.jdbc.table">tbl_resequence</parameter>
</messageStore>
<messageProcessor
        class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor"
        messageStore="RStore" name="ScheduledProcessor" targetEndpoint="StockQuoteServiceEp">
        <parameter name="max.delivery.drop">Disabled</parameter>
        <parameter name="client.retry.interval">1000</parameter>
        <parameter name="max.delivery.attempts">5</parameter>
        <parameter name="member.count">1</parameter>
        <parameter name="interval">1000</parameter>
        <parameter name="throttle">false</parameter>
        <parameter name="target.endpoint">StockQuoteServiceEp</parameter>
        <parameter name="message.processor.fault.sequence">fault</parameter>
        <parameter name="is.active">true</parameter>
</messageProcessor> 
<endpoint name="StockQuoteServiceEp">
        <address uri="http://localhost:9000/services/SimpleStockQuoteService"/>
</endpoint> 
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="MessageStoreProxy"
       startOnLoad="true"
       statistics="disable"
       trace="disable"
       transports="https,http">
   <target>
      <inSequence>
         <property name="FORCE_SC_ACCEPTED" scope="axis2" value="true"/>
         <property name="OUT_ONLY" value="true"/>
         <log level="custom">
            <property name="Message Store Invoked" value="true"/>
         </log>
         <log level="full"/>
         <store messageStore="RStore"/>
      </inSequence>
   </target>
   <publishWSDL uri="http://localhost:9000/services/SimpleStockQuoteService?wsdl"/>
   <description/>
</proxy>                   
</definitions>

The configuration elements

The elements used in the above ESB configuration are explained below.

  • <proxy> - This is the proxy service that should be invoked to execute the configuration. 
  • <inSequence> - A message is first received by the proxy service, and then directed to this sequence.
  • <store> - This mediator, placed inside the <inSequence>, adds the incoming message to the Message Store named RStore.
  • <messageStore> - This collects and re-orders the stored messages based on a defined sequence.
  • <messageProcessor> - This listens to the Message Store and dispatches the message to the specified endpoint. 
  • <endpoint> - The destination to which the re-ordered messages are sent.

Simulating the example scenario

The following sections demonstrate how you can try out the example scenario explained above.

Setting up the environment

You need to set up the ESB, and the back-end service:

  1. To deploy the back-end service SimpleStockQuoteService, navigate to the <ESB_HOME>/samples/axis2Server/src/SimpleStockQuoteService directory and execute the ant command in a new Console tab. 

    For more information on deploying sample back-end services, go to Deploying sample back-end services in WSO2 ESB documentation.

  2. To start an Axis2 server instance, navigate to the <ESB_HOME>/samples/axis2Server directory and execute the corresponding command in a Console tab. 

    • On Windows: axis2server.bat
    • On Linux/Solaris: ./axis2server.sh

    Start only one instance of the back-end Axis2 service (Stock Quote Service) to simulate this example.

  3. Follow the steps below to set up the database.
    • Add the relevant database drivers into the <ESB_HOME>/repository/components/lib directory.

    • Name the database as resequenceDB.

    • If you are setting up a MySQL database, execute the following DB script, to create the required table in the database.

      You can create a similar script based on the database you want to set up.

      CREATE TABLE `tbl_lastprocessid` (
        `statement` varchar(20) NOT NULL,
        `seq_id` bigint(20) NOT NULL,
        PRIMARY KEY (`statement`),
        KEY `seq_id` (`seq_id`)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
      
      CREATE TABLE `tbl_resequence` (
        `indexId` bigint(20) NOT NULL AUTO_INCREMENT,
        `msg_id` varchar(200) NOT NULL,
        `seq_id` bigint(20) NOT NULL,
        `message` blob NOT NULL,
        PRIMARY KEY (`indexId`),
        KEY `msg_id` (`msg_id`),
        KEY `seq_id` (`seq_id`)
      ) ENGINE=InnoDB AUTO_INCREMENT=59 DEFAULT CHARSET=utf8;
  4. Download the ResquencerCapp_1.0.0.car file. You upload it to WSO2 ESB through the Management Console in the steps that follow.

  5. Run the WSO2 ESB server. For instructions, see Running WSO2 ESB via Tooling in WSO2 ESB Documentation.

  6. Log in to the Management Console, click Main, and then click Add in the Carbon Applications menu.

  7. Browse and select the ResquencerCapp_1.0.0.car file you downloaded, and click Upload.

Creating the artifacts

You can skip this section if you are using the ResquencerCapp_1.0.0.car file to try out this example scenario. The file already includes these artifacts, so you need not create them manually.

Follow the steps below to create the artifacts of the ESB configuration above.

  1. Use the configuration below to create a Resequence Message Store. For instructions, see Resequence Message Store in WSO2 ESB Documentation.

    <?xml version="1.0" encoding="UTF-8"?>
    <messageStore xmlns="http://ws.apache.org/ns/synapse"
                  class="org.apache.synapse.message.store.impl.resequencer.ResequenceMessageStore"
                  name="RStore">
       <parameter name="store.resequence.timeout">-1</parameter>
       <parameter name="store.producer.guaranteed.delivery.enable">false</parameter>
       <parameter name="store.failover.message.store.name">RStore</parameter>
       <parameter xmlns:m0="http://services.samples"
                  name="store.resequence.id.path"
                  expression="substring-after(//m0:placeOrder/m0:order/m0:symbol,'-')"/>
       <parameter name="store.jdbc.password">root</parameter>
       <parameter name="store.jdbc.driver">com.mysql.jdbc.Driver</parameter>
       <parameter name="store.jdbc.username">root</parameter>
       <parameter name="store.jdbc.connection.url">jdbc:mysql://localhost:3306/resequenceDB</parameter>
       <parameter name="store.jdbc.table">tbl_resequence</parameter>
    </messageStore>
  2. Use the configuration below to create a Scheduled Message Forwarding Processor, which will listen to the Resequence Message Store and will dispatch the message to the relevant endpoint. For instructions, see Scheduled Message Forwarding Processor in WSO2 ESB Documentation.

    Specify the value of the member.count property as 1. Otherwise, if multiple message processors query the Store in parallel, messages will be duplicated and the resequencing order will be affected.

    <messageProcessor
            class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor"
            messageStore="RStore" name="ScheduledProcessor" targetEndpoint="StockQuoteServiceEp">
            <parameter name="max.delivery.drop">Disabled</parameter>
            <parameter name="client.retry.interval">1000</parameter>
            <parameter name="max.delivery.attempts">5</parameter>
            <parameter name="member.count">1</parameter>
            <parameter name="interval">1000</parameter>
            <parameter name="throttle">false</parameter>
            <parameter name="target.endpoint">StockQuoteServiceEp</parameter>
            <parameter name="message.processor.fault.sequence">fault</parameter>
            <parameter name="is.active">true</parameter>
    </messageProcessor>
  3. Use the configuration below to create an Address Endpoint to which the ordered messages should be dispatched. For instructions, see Address Endpoint in WSO2 ESB Documentation.

    <endpoint name="StockQuoteServiceEp">
            <address uri="http://localhost:9000/services/SimpleStockQuoteService"/>
    </endpoint>
  4. Use the configuration below to create a Custom Proxy Service, which will accept incoming requests and add messages into the Resequence Message Store. For instructions, see Adding a Proxy Service in WSO2 ESB Documentation.

    <?xml version="1.0" encoding="UTF-8"?>
    <proxy xmlns="http://ws.apache.org/ns/synapse"
           name="MessageStoreProxy"
           startOnLoad="true"
           statistics="disable"
           trace="disable"
           transports="https,http">
       <target>
          <inSequence>
             <property name="FORCE_SC_ACCEPTED" scope="axis2" value="true"/>
             <property name="OUT_ONLY" value="true"/>
             <log level="custom">
                <property name="Message Store Invoked" value="true"/>
             </log>
             <log level="full"/>
             <store messageStore="RStore"/>
          </inSequence>
       </target>
       <publishWSDL uri="http://localhost:9000/services/SimpleStockQuoteService?wsdl"/>
       <description/>
    </proxy>
    

    The above proxy service accepts the incoming messages and inserts them into the Resequence Message Store, which re-orders the messages if they are not in order. The incoming message expected by the proxy service looks like the following.

    <?xml version="1.0" encoding="utf-8"?>
    <soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
       <soapenv:Header xmlns:wsa="http://www.w3.org/2005/08/addressing">
          <wsa:To>http://localhost:8280/services/MessageStoreProxy</wsa:To>
          <wsa:ReplyTo>
             <wsa:Address>http://www.w3.org/2005/08/addressing/none</wsa:Address>
          </wsa:ReplyTo>
          <wsa:MessageID>urn:uuid:82532168-db9f-4d4b-a48d-1727af42935e</wsa:MessageID>
          <wsa:Action>urn:placeOrder</wsa:Action>
       </soapenv:Header>
       <soapenv:Body>
          <m0:placeOrder xmlns:m0="http://services.samples">
             <m0:order>
                <m0:price>70.74859146053822</m0:price>
                <m0:quantity>5815</m0:quantity>
                <m0:symbol>WSO2-3</m0:symbol>
             </m0:order>
          </m0:placeOrder>
       </soapenv:Body>
    </soapenv:Envelope>
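To make the role of the store.resequence.id.path parameter concrete, the following Python sketch mimics what the expression substring-after(//m0:placeOrder/m0:order/m0:symbol,'-') yields for the sample message. It is an approximation under stated assumptions: Python's ElementTree does not implement the XPath substring-after function, so str.partition stands in for it, and converting the result to an integer is an assumption suggested by the bigint seq_id column. The function name `sequence_id` is hypothetical.

```python
import xml.etree.ElementTree as ET

# Namespace prefix used by the expression in the message store configuration.
NS = {"m0": "http://services.samples"}

# Body of the sample request; the SOAP header is omitted for brevity.
SAMPLE = """<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
  <soapenv:Body>
    <m0:placeOrder xmlns:m0="http://services.samples">
      <m0:order>
        <m0:price>70.74859146053822</m0:price>
        <m0:quantity>5815</m0:quantity>
        <m0:symbol>WSO2-3</m0:symbol>
      </m0:order>
    </m0:placeOrder>
  </soapenv:Body>
</soapenv:Envelope>"""


def sequence_id(envelope_xml: str) -> int:
    """Return the number that follows the first '-' in the m0:symbol element."""
    root = ET.fromstring(envelope_xml)
    symbol = root.find(".//m0:placeOrder/m0:order/m0:symbol", NS)
    if symbol is None or not symbol.text:
        raise ValueError("no m0:symbol element found")
    # Stand-in for XPath substring-after(symbol, '-'): text after the first '-'.
    _, separator, tail = symbol.text.partition("-")
    if not separator or not tail:
        raise ValueError("symbol carries no sequence number: %r" % symbol.text)
    return int(tail)


if __name__ == "__main__":
    print(sequence_id(SAMPLE))
```

For the sample message, the symbol WSO2-3 maps to sequence number 3, which is the value the messages are ordered by in this scenario.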

Executing the sample and analysing the output

Follow the steps below to send a request to the ESB using the Stock Quote Client application. For more information on the Stock Quote Client, go to the ESB documentation.

  1. Open a new terminal, and navigate to the <ESB_HOME>/samples/axis2Client/ directory. The Stock Quote client application is stored in this directory.
  2. Execute the following command to send the first request to the ESB.

    ant stockquote -Daddurl=http://localhost:8280/services/MessageStoreProxy -Dmode=placeorder -Dsymbol=WSO2-1

    You will see the following output printed in the Axis2 Server Console:

    samples.services.SimpleStockQuoteService  :: Accepted order #xx for : xx stocks of WSO2-1 at $ xxxxxx
  3. Execute the following command to send the second request to the ESB.

    ant stockquote -Daddurl=http://localhost:8280/services/MessageStoreProxy -Dmode=placeorder -Dsymbol=WSO2-4

    You will not see any new output printed in the Axis2 Server Console.

    This is because this request carries sequence number 4, denoted by -Dsymbol=WSO2-4, so the Store holds it and waits, for the time defined, for the second message, denoted by -Dsymbol=WSO2-2, to arrive.

  4. Execute the following command to send the third request to the ESB.

    ant stockquote -Daddurl=http://localhost:8280/services/MessageStoreProxy -Dmode=placeorder -Dsymbol=WSO2-3

    You will not see any new output printed in the Axis2 Server Console.

    This is because this request carries sequence number 3, denoted by -Dsymbol=WSO2-3, and the Store is still waiting, for the time defined, for the second message, denoted by -Dsymbol=WSO2-2, to arrive.

  5. Execute the following command to send the fourth request to the ESB.

    ant stockquote -Daddurl=http://localhost:8280/services/MessageStoreProxy -Dmode=placeorder -Dsymbol=WSO2-2

    You will see the following output printed in the Axis2 Server Console, in the proper sequence:

    samples.services.SimpleStockQuoteService  :: Accepted order #xx for : xx stocks of WSO2-1 at $ xxxxxx
    samples.services.SimpleStockQuoteService  :: Accepted order #xx for : xx stocks of WSO2-2 at $ xxxxxx
    samples.services.SimpleStockQuoteService  :: Accepted order #xx for : xx stocks of WSO2-3 at $ xxxxxx
    samples.services.SimpleStockQuoteService  :: Accepted order #xx for : xx stocks of WSO2-4 at $ xxxxxx
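The walkthrough above can be replayed offline with a small Python simulation. It is a sketch of the expected behaviour only, assuming that sequence numbers start at 1 and that the Store waits for a missing number without timing out. The function name `replay` is hypothetical and the code does not call WSO2 ESB.

```python
import heapq
from typing import List


def replay(arrivals: List[int], first_seq: int = 1) -> List[List[int]]:
    """Return, for each arrival, the sequence numbers forwarded at that step."""
    waiting: List[int] = []  # min-heap of sequence numbers that are held back
    expected = first_seq     # assumption: numbering starts at 1 with no gaps
    trace = []
    for seq in arrivals:
        heapq.heappush(waiting, seq)
        forwarded = []
        # Forward messages while the smallest held number is the expected one.
        while waiting and waiting[0] == expected:
            forwarded.append(heapq.heappop(waiting))
            expected += 1
        trace.append(forwarded)
    return trace


if __name__ == "__main__":
    order_sent = [1, 4, 3, 2]  # the order used in the steps above
    for seq, out in zip(order_sent, replay(order_sent)):
        symbols = ["WSO2-%d" % n for n in out]
        print("sent WSO2-%d: forwarded %s" % (seq, symbols))
```

With the order 1, 4, 3, 2, the first request is forwarded at once, the next two are held back, and the last request releases messages 2, 3, and 4 together, which matches the console output described in the steps.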