This documentation is for WSO2 Enterprise Integrator version 6.1.1 . View documentation for the latest release.

All docs This doc
Skip to end of metadata
Go to start of metadata

Note that WSO2 EI is shipped with the following changes to what is mentioned in this documentation :

  • <PRODUCT_HOME>/ repository/samples/ directory that includes all Integration profile samples is changed to <EI_HOME>/ samples/service-bus/.
  • <PRODUCT_HOME>/ repository/samples/resources/ directory that includes all artifacts related to the Integration profile samples is changed to <EI_HOME>/ samples/service-bus/resources/.

Introduction

This sample demonstrates how one way message bridging from Kafka to HTTP can be done using the inbound kafka endpoint.

Prerequisites

  • Download and install Apache Kafka. For more information, see Apache Kafka documentation
  • Copy the following client libraries from the <KAFKA_HOME>/lib directory to the <EI_HOME>/lib directory.

    • kafka_2.9.2-0.8.1.1.jar
    • scala-library-2.9.2.jar
    • zkclient-0.3.jar
    • zookeeper-3.3.4.jar
    • metrics-core-2.2.0.jar

    Note

    • If you are using kafka_2.x.x-0.8.2.0 or later, you also need to add the kafka-clients-0.8.x.x.jar file to the <EI_HOME>/lib  directory.
    • If you are using a newer version of ZooKeeper, follow the steps below:

        1. Create a directory named conf inside the <EI_HOME>/repository directory.
        2. Create a directory named identity inside the <EI_HOME>/repository/conf directory.
        3. Add the jaas.conf file to the <EI_HOME>/repository/conf/identity directory. This is required because Kerberos authentication is enforced on newer versions of ZooKeeper.
  • Run the following command to start the ZooKeeper server:

    bin/zookeeper-server-start.sh config/zookeeper.properties

    You will see the following log:

  • Run the following command to start the Kafka server

    bin/kafka-server-start.sh config/server.properties

    You will see the following log:

Building the sample

The XML configuration for this sample is as follows:

<definitions xmlns="http://ws.apache.org/ns/synapse">
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
 name="KAFKAListenerEP"
 sequence="TestIn"
 onError="fault"
 protocol="kafka"
 suspend="false">
 <parameters>
     <parameter name="interval">10</parameter>
     <parameter name="consumer.type">highlevel</parameter>
     <parameter name="content.type">application/xml</parameter>
     <parameter name="coordination">false</parameter>
     <parameter name="sequential">false</parameter>
     <parameter name="topics">test</parameter>
     <parameter name="zookeeper.connect">localhost:2181</parameter>
     <parameter name="group.id">test-1</parameter>
 </parameters>
</inboundEndpoint>
 
<sequence xmlns="http://ws.apache.org/ns/synapse" name="TestIn">
 <log level="full"/>
 <drop/>
</sequence>
 
</definitions>

This configuration file synapse_sample_904.xml is available in the <EI_HOME>/samples/service-bus directory.

To build the sample

  • Start the ESB with the sample 904 configuration. For instructions on starting a sample ESB configuration, see Starting the ESB with a sample configuration.
    The operation log keeps running until the server starts, which usually takes several seconds. Wait until the server has fully booted up and displays a message similar to "WSO2 Carbon started in n seconds."

Executing the sample

  • Run the following on the Kafka command line to create a topic named test with a single partition and only one replica:

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
  • Run the following on the Kafka command line to send a message to the Kafka brokers. You can also use the WSO2 ESB Kafka producer connector to send the message to the Kafka brokers.

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Analyzing the output


You will see the following Message content:

<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:wsa="http://www.w3.org/2005/08/addressing"><soapenv:Body><m0:getQuote xmlns:m0="http://services.samples">
<m0:request><m0:symbol>IBM</m0:symbol></m0:request></m0:getQuote></soapenv:Body></soapenv:Envelope>

The Kafka inbound gets the messages from the Kafka brokers and logs the messages in the ESB


  • No labels