All docs This doc
Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 26 Next »

Go to the tab of the Kafka version that you use:

The publishMessages operation allows you to publish messages to the Kafka brokers via Kafka topics.

publishMessages
<kafkaTransport.publishMessages>
    <topic>topicName</topic>
    <partitionNo>partitionNo</partitionNo>
</kafkaTransport.publishMessages>
Properties
  • topic: The name of the topic.
  • partitionNo: The partition number of the topic.

If required, you can add custom headers to the records in the publishMessage operation:

<topic.Content-Type>Value</topic.Content-Type>

You can add the parameter as follows in the publishMessage operation:

<kafkaTransport.publishMessage configKey="kafka_init">
    <topic>topicName</topic>
    <partitionNo>partitionNo</partitionNo>
    <topicName.Content-Type>Value</topicName.Content-Type>
</kafkaTransport.publishMessage>

Sample scenario

Given below is a sample scenario that demonstrates how to send messages to a Kafka broker via Kafka topics.

Prerequisites
  • Run the following command to start the ZooKeeper server:

    bin/zookeeper-server-start.sh config/zookeeper.properties

    You see the following log:

  • Run the following command to start the Kafka server:

    bin/kafka-server-start.sh config/server.properties

    You will see the following log:

Building the sample

Given below is a sample proxy service that illustrates how you can connect to a Kakfa broker with the init operation and then use the publishMessages operation to publish messages via the topic without security.

<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="KafkaTransport"
       transports="https http"
       startOnLoad="true"
       trace="disable">
   <description/>
   <target>
        <inSequence>
     	<kafkaTransport.init>
        	<bootstrapServers>localhost:9092</bootstrapServers>
        	<keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>                                                                                              
        	<valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass> 
            <maxPoolSize>100</maxPoolSize>                                                                                       
     	</kafkaTransport.init>
     	<kafkaTransport.publishMessages>
        	<topic>test</topic>
     	</kafkaTransport.publishMessages>
  	</inSequence>
   </target>
</proxy>  

Given below is a sample proxy service that illustrates how you can connect to a Kakfa broker with the init operation and then use the publishMessages operation to publish messages via the topic with security.

proxy with security
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="testKafka"
       startOnLoad="true"
       statistics="disable"
       trace="disable"
       transports="http,https">
   <target>
      <inSequence>
         <kafkaTransport.init>
            <bootstrapServers>localhost:9092</bootstrapServers>
            <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
            <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
            <securityProtocol>SSL</securityProtocol>
			<sslTruststoreLocation>{path-to-certificate}/kafka.server.truststore.jks</sslTruststoreLocation>
            <sslTruststorePassword>test1234</sslTruststorePassword>
            <sslKeystoreLocation>{path-to-certificate}/kafka.server.keystore.jks</sslKeystoreLocation>
            <sslKeystorePassword>test1234</sslKeystorePassword>
            <sslKeyPassword>test1234</sslKeyPassword>
	    	<maxPoolSize>100</maxPoolSize>
         </kafkaTransport.init>
         <kafkaTransport.publishMessages>
            <topic>test</topic>
         </kafkaTransport.publishMessages>
      </inSequence>
   </target>
   <description/>
</proxy>

Given below is a sample proxy service that illustrates how to acknowledge the backend with Kafka metadata (topic, partition, offset) in order to check the success response with the publishMessages operation.

Here, the faultHandlerSeq custom sequence is used to acknowledge the backend with error status on failure cases.

proxy with success or failiure status
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="KafkaTransport"
       startOnLoad="true"
       statistics="disable"
       trace="disable"
       transports="http,https">
   <target faultSequence="faultHandlerSeq">
      <inSequence>
         <kafkaTransport.init>
            <bootstrapServers>localhost:9092</bootstrapServers>
            <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
            <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
            <maxPoolSize>50</maxPoolSize>
            <requestTimeout>10000</requestTimeout>
            <acks>all</acks>
            <timeout>8000</timeout>
            <metadataFetchTimeout>5000</metadataFetchTimeout>
         </kafkaTransport.init>
         <kafkaTransport.publishMessages>
            <topic>test</topic>
         </kafkaTransport.publishMessages>
         <payloadFactory media-type="json">
            <format> 
                {"topic":"$1", "partition":"$2", "offset":"$3"}
          </format>
            <args>
               <arg evaluator="xml" expression="$ctx:topic"/>
               <arg evaluator="xml" expression="$ctx:partition"/>
               <arg evaluator="xml" expression="$ctx:offset"/>
            </args>
         </payloadFactory>
         <property name="messageType" scope="axis2" value="application/json"/>
         <respond/>
      </inSequence>
   </target>
   <description/>
</proxy>
Executing the sample

Send a message to the Kafka broker using a sample client. The following digram illustrates how to send a message using Jmeter:

Analyzing the output

Run the following command to verify the messages:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

You see the following message content:

{"name":"sample"}

This demonstrates how the Kafka connector publishes messages to the Kafka brokers.

The publishMessage operation allows you to publish messages to Kafka brokers using Kafka topics.

publishMessages
<kafkaTransport.publishMessages>
	<topic>topicName</topic>
</kafkaTransport.publishMessages>
Properties
  • topic: The name of the topic.

Sample scenario

Given below is a sample scenario that demonstrates how to send messages to a Kafka broker using Kafka topics.

Prerequisites
  • Download and install Apache Kafka. For more information, see Apache Kafka documentation
  • Copy the following client libraries from the <KAFKA_HOME>/lib directory to the <ESB_HOME>/repository/components/lib directory.

    • kafka_2.9.2-0.8.1.1.jar
    • scala-library-2.9.2.jar
    • zkclient-0.3.jar
    • zookeeper-3.3.4.jar
    • metrics-core-2.2.0.jar
  • Run the following command to start the ZooKeeper server:

    bin/zookeeper-server-start.sh config/zookeeper.properties

    You see the following log:

  • Run the following command to start the Kafka server:

    bin/kafka-server-start.sh config/server.properties

    You see the following log:

Building the sample

Given below is a sample proxy service that illustrates how you can connect to a Kakfa broker with the init operation and use the publishMessages operation to publish the messages with the topic.

<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="KafkaTransport"
       transports="https http"
       startOnLoad="true"
       trace="disable">
   <description/>
   <target>
      <inSequence>
         <kafkaTransport.init>
            <brokerList>localhost:9092</brokerList>
         </kafkaTransport.init>
         <kafkaTransport.publishMessages>
            <topic>test</topic>
         </kafkaTransport.publishMessages>
      </inSequence>
   </target>
</proxy>   

Given below is a sample client (JMeter) screen that displays how to send a message:

Analyzing the output

Run the following command to verify the messages:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

You see the following message content: <name>sample</name>.

This demonstrates that the Kafka connector publishes messages to Kafka brokers.

  • No labels