Localtab Group
Localtab
titleKafka 2.12-0.11.0.0

The publishMessages operation allows you to publish messages to Kafka brokers via Kafka topics.

Code Block
languagexml
titlepublishMessages
<kafkaTransport.publishMessages>
    <topic>topicName</topic>
    <partitionNo>partitionNo</partitionNo>
</kafkaTransport.publishMessages>
Properties
  • topic: The name of the topic.
  • partitionNo: The partition number of the topic.
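
As a rough illustration of the two properties, the following sketch publishes to partition 0 of a topic named test. Both values are assumptions chosen for this example and are not defaults. Use a topic and a partition number that exist on your broker.

```xml
<!-- Illustrative values only: "test" and "0" are assumed for this sketch. -->
<kafkaTransport.publishMessages>
    <topic>test</topic>
    <partitionNo>0</partitionNo>
</kafkaTransport.publishMessages>
```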

Info

If required, you can add custom headers to the records in the publishMessages operation:

Code Block
languagexml
<topic.Content-Type>Value</topic.Content-Type>

For example, for a topic named topicName, you can add the header parameter to the publishMessages operation as follows:

Code Block
languagexml
<kafkaTransport.publishMessages configKey="kafka_init">
    <topic>topicName</topic>
    <partitionNo>partitionNo</partitionNo>
    <topicName.Content-Type>Value</topicName.Content-Type>
</kafkaTransport.publishMessages>
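
To make the header naming concrete, here is a minimal sketch with assumed values: a topic named test, partition 0, and a Content-Type header set to application/json. It assumes that the header element is prefixed with the topic name, as the topicName example above suggests, and that an init configuration is already stored under the kafka_init key.

```xml
<!-- Sketch only: the topic name, partition number, and header value are assumed. -->
<kafkaTransport.publishMessages configKey="kafka_init">
    <topic>test</topic>
    <partitionNo>0</partitionNo>
    <!-- Header element name: the topic name, a dot, then the header name. -->
    <test.Content-Type>application/json</test.Content-Type>
</kafkaTransport.publishMessages>
```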

Sample scenario

Following is a sample scenario that demonstrates how to send messages to a Kafka broker via Kafka topics.

Prerequisites
  • Run the following command to start the ZooKeeper server:

    Code Block
    bin/zookeeper-server-start.sh config/zookeeper.properties

    You will see the following log:

  • Run the following command to start the Kafka server:

    Code Block
    bin/kafka-server-start.sh config/server.properties

    You will see the following log:

Building the sample

Following is a sample proxy service that illustrates how you can connect to a Kafka broker with the init operation, and then use the publishMessages operation to publish messages via the topic without security.

Code Block
languagexml
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="KafkaTransport"
       transports="https http"
       startOnLoad="true"
       trace="disable">
   <description/>
   <target>
      <inSequence>
         <kafkaTransport.init>
            <bootstrapServers>localhost:9092</bootstrapServers>
            <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
            <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
            <maxPoolSize>100</maxPoolSize>
         </kafkaTransport.init>
         <kafkaTransport.publishMessages>
            <topic>test</topic>
         </kafkaTransport.publishMessages>
      </inSequence>
   </target>
</proxy>

Following is a sample proxy service that illustrates how you can connect to a Kafka broker with the init operation, and then use the publishMessages operation to publish messages via the topic with security.

Code Block
languagexml
titleproxy with security
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="testKafka"
       startOnLoad="true"
       statistics="disable"
       trace="disable"
       transports="http,https">
   <target>
      <inSequence>
         <kafkaTransport.init>
            <bootstrapServers>localhost:9092</bootstrapServers>
            <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
            <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
            <securityProtocol>SSL</securityProtocol>
            <sslTruststoreLocation>{path-to-certificate}/kafka.server.truststore.jks</sslTruststoreLocation>
            <sslTruststorePassword>test1234</sslTruststorePassword>
            <sslKeystoreLocation>{path-to-certificate}/kafka.server.keystore.jks</sslKeystoreLocation>
            <sslKeystorePassword>test1234</sslKeystorePassword>
            <sslKeyPassword>test1234</sslKeyPassword>
            <maxPoolSize>100</maxPoolSize>
         </kafkaTransport.init>
         <kafkaTransport.publishMessages>
            <topic>test</topic>
         </kafkaTransport.publishMessages>
      </inSequence>
   </target>
   <description/>
</proxy>

Following is a sample proxy service that illustrates how to send an acknowledgment containing the Kafka metadata (topic, partition, and offset) so that you can confirm that the publishMessages operation succeeded.

Here, the custom faultHandlerSeq sequence sends an acknowledgment with an error status if the operation fails.

Code Block
languagexml
titleproxy with success or failure status
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="KafkaTransport"
       startOnLoad="true"
       statistics="disable"
       trace="disable"
       transports="http,https">
   <target faultSequence="faultHandlerSeq">
      <inSequence>
         <kafkaTransport.init>
            <bootstrapServers>localhost:9092</bootstrapServers>
            <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
            <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
            <maxPoolSize>50</maxPoolSize>
            <requestTimeout>10000</requestTimeout>
            <acks>all</acks>
            <timeout>8000</timeout>
            <metadataFetchTimeout>5000</metadataFetchTimeout>
         </kafkaTransport.init>
         <kafkaTransport.publishMessages>
            <topic>test</topic>
         </kafkaTransport.publishMessages>
         <payloadFactory media-type="json">
            <format>
                {"topic":"$1", "partition":"$2", "offset":"$3"}
            </format>
            <args>
               <arg evaluator="xml" expression="$ctx:topic"/>
               <arg evaluator="xml" expression="$ctx:partition"/>
               <arg evaluator="xml" expression="$ctx:offset"/>
            </args>
         </payloadFactory>
         <property name="messageType" scope="axis2" value="application/json"/>
         <respond/>
      </inSequence>
   </target>
   <description/>
</proxy>
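
The faultHandlerSeq sequence itself is not shown in this sample. One possible shape for it, offered as a sketch and not as the connector's own definition, is a sequence that builds a JSON error payload from the standard Synapse fault properties ERROR_CODE and ERROR_MESSAGE and responds to the caller. The payload field names and the 500 status code are assumptions made for illustration.

```xml
<!-- Hypothetical fault sequence: the payload layout and status code are assumed. -->
<sequence xmlns="http://ws.apache.org/ns/synapse" name="faultHandlerSeq">
   <payloadFactory media-type="json">
      <format>
          {"status":"failed", "errorCode":"$1", "errorMessage":"$2"}
      </format>
      <args>
         <!-- ERROR_CODE and ERROR_MESSAGE are set by Synapse when a fault occurs. -->
         <arg evaluator="xml" expression="$ctx:ERROR_CODE"/>
         <arg evaluator="xml" expression="$ctx:ERROR_MESSAGE"/>
      </args>
   </payloadFactory>
   <!-- Return an HTTP 500 status with a JSON body. -->
   <property name="HTTP_SC" scope="axis2" value="500"/>
   <property name="messageType" scope="axis2" value="application/json"/>
   <respond/>
</sequence>
```

With a sequence along these lines deployed under the name faultHandlerSeq, the faultSequence attribute on the proxy target routes failures to it, so the caller receives an error status in place of the topic, partition, and offset payload.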
Executing the sample

Send a message to the Kafka broker using a sample client.

The following diagram illustrates how to send a message using JMeter:

Analyzing the output

Run the following command to verify the messages:

Code Block
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

You will see the following message content:

Code Block
{"name":"sample"}

This demonstrates how the Kafka connector publishes messages to Kafka brokers.

Localtab
titleKafka 2.9.2-0.8.1.1

The publishMessages operation allows you to publish messages to Kafka brokers using Kafka topics.

Code Block
languagexml
titlepublishMessages
<kafkaTransport.publishMessages>
    <topic>topicName</topic>
</kafkaTransport.publishMessages>
Properties
  • topic: The name of the topic.

Sample scenario

Following is a sample scenario that demonstrates how to send messages to a Kafka broker using Kafka topics.

Prerequisites
  • Download and install Apache Kafka. For more information, see the Apache Kafka documentation.
  • Copy the following client libraries from the <KAFKA_HOME>/lib directory to the <ESB_HOME>/repository/components/lib directory.

    • kafka_2.9.2-0.8.1.1.jar
    • scala-library-2.9.2.jar
    • zkclient-0.3.jar
    • zookeeper-3.3.4.jar
    • metrics-core-2.2.0.jar
  • Run the following command to start the ZooKeeper server:

    Code Block
    bin/zookeeper-server-start.sh config/zookeeper.properties

    You will see the following log:

  • Run the following command to start the Kafka server:

    Code Block
    bin/kafka-server-start.sh config/server.properties

    You will see the following log:

Building the sample

Following is a sample proxy service that illustrates how you can connect to a Kafka broker with the init operation and use the publishMessages operation to publish messages to the topic.

Code Block
languagexml
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="KafkaTransport"
       transports="https http"
       startOnLoad="true"
       trace="disable">
   <description/>
   <target>
      <inSequence>
         <kafkaTransport.init>
            <brokerList>localhost:9092</brokerList>
         </kafkaTransport.init>
         <kafkaTransport.publishMessages>
            <topic>test</topic>
         </kafkaTransport.publishMessages>
      </inSequence>
   </target>
</proxy>   

Following is a sample client (JMeter) screen that shows how to send a message:

Analyzing the output

  • Run the following command to verify the messages:

    Code Block
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

    You will see the following message content:

    <name>sample</name>

    This demonstrates that the Kafka connector publishes messages to Kafka brokers.