All docs This doc

Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

The publishMessage operation Go to the tab of the Kafka version that you use:

Localtab Group
Localtab
titleKafka 2.12-0.11.0.0

The publishMessages operation allows you to publish messages to the Kafka

...

brokers via Kafka topics.

Code Block
languagexml
titlepublishMessages
<kafkaTransport.publishMessages>
    <!-- Name of the topic-->
    <topic>topicName</topic>
    <!-- The partition number of the topic-->
    <partitionNo>partitionNo</partitionNo>
</kafkaTransport.publishMessages>
Properties
  • topic: The name of the topic.

Sample scenario

...

Info

If required, you can add custom headers to the records in the publishMessage operation:

Code Block
languagexml
<topic.Content-Type>Value</topic.Content-Type>

You can add the parameter as follows in the publishMessage operation:

Code Block
languagexml
<kafkaTransport.publishMessage configKey="kafka_init">
    <topic>topicName</topic>
    <partitionNo>partitionNo</partitionNo>
    <topicName.Content-Type>Value</topicName.Content-Type>
</kafkaTransport.publishMessage>

Anchor
KafkaConnectorSample
KafkaConnectorSample
Sample scenario

Given below is a sample scenario that demonstrates how to send messages to a Kafka broker via Kafka topics.

Prerequisites
  • Run the following command to start the ZooKeeper server:

    Code Block
    bin/zookeeper-server-start.sh config/zookeeper.properties

    You see the following log:
    Image Added

  • Run the following command to start the Kafka server:

    Code Block
    bin/kafka-server-start.sh config/server.properties

    You will see the following log:
    Image Added

Building the sample

Given below is a sample proxy service that illustrates how you can connect to a Kakfa broker with the init operation and then use the publishMessages operation to publish messages via the topic without security.

Anchor
withoutSecurity
withoutSecurity

Code Block
languagexml
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="KafkaTransport"
       transports="https http"
       startOnLoad="true"
       trace="disable">
   <description/>
   <target>
        <inSequence>
     	<kafkaTransport.init>
        	<bootstrapServers>localhost:9092</bootstrapServers>
        	<keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>                                                                                              
        	<valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass> 
            <maxPoolSize>100</maxPoolSize>                                                                                       
     	</kafkaTransport.init>
     	<kafkaTransport.publishMessages>
        	<topic>test</topic>
     	</kafkaTransport.publishMessages>
  	</inSequence>
   </target>
</proxy>  

Anchor
withSecurity
withSecurity

Given below is a sample proxy service that illustrates how you can connect to a Kakfa broker with the init operation and then use the publishMessages operation to publish messages via the topic with security.

Code Block
languagexml
titleproxy with security
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="testKafka"
       startOnLoad="true"
       statistics="disable"
       trace="disable"
       transports="http,https">
   <target>
      <inSequence>
         <kafkaTransport.init>
            <bootstrapServers>localhost:9092</bootstrapServers>
            <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
            <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
            <securityProtocol>SSL</securityProtocol>
			<sslTruststoreLocation>{path-to-certificate}/kafka.server.truststore.jks</sslTruststoreLocation>
            <sslTruststorePassword>test1234</sslTruststorePassword>
            <sslKeystoreLocation>{path-to-certificate}/kafka.server.keystore.jks</sslKeystoreLocation>
            <sslKeystorePassword>test1234</sslKeystorePassword>
            <sslKeyPassword>test1234</sslKeyPassword>
	    	<maxPoolSize>100</maxPoolSize>
         </kafkaTransport.init>
         <kafkaTransport.publishMessages>
            <topic>test</topic>
         </kafkaTransport.publishMessages>
      </inSequence>
   </target>
   <description/>
</proxy>

Given below is a sample proxy service that illustrates how to acknowledge the backend with Kafka metadata (topic, partition, offset) in order to check the success response with the publishMessages operation.

Here, the faultHandlerSeq custom sequence is used to acknowledge the backend with error status on failure cases.

Code Block
languagexml
titleproxy with success or failiure status
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="KafkaTransport"
       startOnLoad="true"
       statistics="disable"
       trace="disable"
       transports="http,https">
   <target faultSequence="faultHandlerSeq">
      <inSequence>
         <kafkaTransport.init>
            <bootstrapServers>localhost:9092</bootstrapServers>
            <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
            <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
            <maxPoolSize>50</maxPoolSize>
            <requestTimeout>10000</requestTimeout>
            <acks>all</acks>
            <timeout>8000</timeout>
            <metadataFetchTimeout>5000</metadataFetchTimeout>
         </kafkaTransport.init>
         <kafkaTransport.publishMessages>
            <topic>test</topic>
         </kafkaTransport.publishMessages>
         <payloadFactory media-type="json">
            <format> 
                {"topic":"$1", "partition":"$2", "offset":"$3"}
          </format>
            <args>
               <arg evaluator="xml" expression="$ctx:topic"/>
               <arg evaluator="xml" expression="$ctx:partition"/>
               <arg evaluator="xml" expression="$ctx:offset"/>
            </args>
         </payloadFactory>
         <property name="messageType" scope="axis2" value="application/json"/>
         <respond/>
      </inSequence>
   </target>
   <description/>
</proxy>

Executing the sample

Send a message to the Kafka broker using a sample client. The following digram illustrates how to send a message using Jmeter:

Image Added

Analyzing the output

Run the following command to verify the messages:

Code Block
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

You see the following message content:

Code Block
{"name":"sample"}

This demonstrates how the Kafka connector publishes messages to the Kafka brokers.

Localtab
titleKafka 2.9.2-0.8.1.1

The publishMessage operation allows you to publish messages to Kafka brokers using Kafka topics.

Code Block
languagexml
titlepublishMessages
<kafkaTransport.publishMessages>
	<topic>topicName</topic>
</kafkaTransport.publishMessages>

Sample scenario

Given below is a sample scenario that demonstrates how to send messages to a Kafka broker using Kafka topics.

Prerequisites

...

  •  directory to the <ESB_HOME>/repository/components/lib directory.

    • kafka_2.9.2-0.8.1.1.jar
    • scala-library-2.9.2.jar
    • zkclient-0.3.jar
    • zookeeper-3.3.4.jar
    • metrics-core-2.2.0.jar
  • Run the following command to start the ZooKeeper server:

    Code Block
    bin/zookeeper-server-start.sh config/zookeeper.properties

    You

...

  • see the following log:
    Image Modified

  • Run the following command to start the Kafka server:

    Code Block
    bin/kafka-server-start.sh config/server.properties

    You

...

  • see the following log:
    Image Modified

Building the sample

...

Given below is a sample proxy service that illustrates how you can connect to a Kakfa broker with

...

the init

...

 operation and use the publishMessages

...

 operation to publish the messages with the topic.

Code Block
languagexml
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="KafkaTransport"
       transports="https http"
       startOnLoad="true"
       trace="disable">
   <description/>
   <target>
      <inSequence>
         <kafkaTransport.init>
            <brokerList>localhost:9092</brokerList>
         </kafkaTransport.init>
         <kafkaTransport.publishMessages>
            <topic>test</topic>
         </kafkaTransport.publishMessages>
      </inSequence>
   </target>
</proxy>   

...

Given below is a sample client (JMeter) screen that displays how to send a message:

Image Modified

Analyzing the output

Run the following command to verify the messages:

Code Block
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

You

...

see the following message content:

...

<name>sample</name>.

...

This demonstrates that the Kafka connector publishes messages to Kafka brokers.