
Before you use the Kafka connector, download and install Apache Kafka.

Go to the section below that matches your Kafka version to see how to configure the Kafka operations.

These instructions are for kafka_2.12-1.0.0 and Java 1.8.

First, copy the following client libraries from the <KAFKA_HOME>/lib directory to the <ESB_HOME>/repository/components/lib directory:

To use the Kafka connector, add the <kafkaTransport.init> element to your configuration with or without security. For detailed information on how to enable TLS authentication for the Kafka broker, producer, and consumer, see Enabling Security.

Creating a producer with security

Given below is a sample configuration that creates a producer with security:

init with security
<kafkaTransport.init>
     <bootstrapServers>localhost:9093</bootstrapServers>
     <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
     <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
     <securityProtocol>SSL</securityProtocol>
     <sslTruststoreLocation>/home/hariprasath/Desktop/kafkaNewJira/certKafka/kafka.server.truststore.jks</sslTruststoreLocation>
     <sslTruststorePassword>test1234</sslTruststorePassword>
     <sslKeystoreLocation>/home/hariprasath/Desktop/kafkaNewJira/certKafka/kafka.server.keystore.jks</sslKeystoreLocation>
     <sslKeystorePassword>test1234</sslKeystorePassword>
     <sslKeyPassword>test1234</sslKeyPassword>
</kafkaTransport.init>

Here's a description of the properties that you can use in the <kafkaTransport.init> configuration:

  • bootstrapServers: Required. The Kafka brokers, listed as host1:port1,host2:port2.

  • keySerializerClass: Required. The serializer class for the key, which implements the Serializer interface.

  • valueSerializerClass: Required. The serializer class for the value, which implements the Serializer interface.

  • acks: The number of acknowledgments that the producer requires the leader to receive before considering a request to be complete.

  • bufferMemory: The total bytes of memory the producer can use to buffer records waiting to be sent to the server.

  • compressionType: The compression type for all data generated by the producer.

  • retries: Set this to a value greater than zero if you want the client to retry failed requests automatically. The client then resends any record whose send fails with a potentially transient error.

  • sslKeyPassword: The password of the private key in the keystore file. Setting this for the client is optional.

  • sslKeystoreLocation: The location of the keystore file. Set this when you want to have two-way authentication for the client. Setting this for the client is optional.

  • sslKeystorePassword: The store password for the keystore file. You need to set this only if sslKeystoreLocation is configured.

  • sslTruststoreLocation: The location of the trust store file.

  • sslTruststorePassword: The password for the trust store file.

  • batchSize: The maximum size, in bytes, of a batch of records that the producer groups together when multiple records are sent to the same partition.

  • clientId: The client identifier that you pass to the server when making requests.

  • connectionsMaxIdleTime: The duration, in milliseconds, after which idle connections should be closed.

  • lingerTime: The time, in milliseconds, that the producer waits before sending a record so that other records can be batched with it. This is useful for reducing the number of requests sent under moderate load: instead of sending each record immediately, the producer adds a small artificial delay so that records sent in the meantime can be batched together.

  • maxBlockTime: The maximum amount of time, in milliseconds, that the KafkaProducer.send() and KafkaProducer.partitionsFor() methods can block.

  • maxRequestSize: The maximum size of a request in bytes.

  • partitionerClass: The partitioner class that implements the Partitioner interface.

  • receiveBufferBytes: The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.

  • requestTimeout: The maximum amount of time, in milliseconds, that a client can wait for the server to respond to a request.

  • saslJaasConfig: JAAS login context parameters for SASL connections in the format used by JAAS configuration files. 

  • saslKerberosServiceName: The Kerberos principal name that Kafka runs as.

  • securityProtocol: The protocol used to communicate with brokers.

  • sendBufferBytes: The size of the TCP send buffer (SO_SNDBUF) to use when sending data.

  • sslEnabledProtocols: The list of protocols enabled for SSL connections.

  • sslKeystoreType: The format of the keystore file. Setting this for the client is optional.

  • sslProtocol: The SSL protocol used to generate the SSLContext.

  • sslProvider: The name of the security provider used for SSL connections. The default value is the default security provider of the JVM.

  • sslTruststoreType: The format of the trust store file.

  • timeout: The maximum amount of time, in milliseconds, that the server can wait for acknowledgments from followers to meet the acknowledgment requirements that the producer has specified with the acks configuration.

  • blockOnBufferFull: Set this to true if you want the producer to stop accepting new records when the memory buffer is full. If blocking is not desirable, set this property to false, which causes the producer to throw an exception if a record is sent when the memory buffer is full.

  • maxInFlightRequestsPerConnection: The maximum number of unacknowledged requests that the client can send via a single connection before blocking.

  • metadataFetchTimeout: The maximum amount of time, in milliseconds, to block and wait for the metadata fetch to succeed before throwing an exception to the client.

  • metadataMaxAge: The period of time, in milliseconds, after which you should force-refresh the metadata even if there are no partition leadership changes to proactively discover any new brokers or partitions.

  • metricReporters: A list of classes to use as metrics reporters.

  • metricsNumSamples: The number of samples maintained to compute metrics.

  • metricsSampleWindow: The window of time, in milliseconds, that a metric sample is computed over.

  • reconnectBackoff: The amount of time, in milliseconds, to wait before attempting to reconnect to a given host.

  • retryBackoff: The amount of time, in milliseconds, to wait before attempting to retry a failed request to a given topic partition.

  • saslKerberosKinitCmd: The Kerberos kinit command path.

  • saslKerberosMinTimeBeforeRelogin: Login thread sleep time, in milliseconds, between refresh attempts.

  • saslKerberosTicketRenewJitter: Percentage of random jitter added to the renewal time.

  • saslKerberosTicketRenewWindowFactor: The login thread sleeps until the specified window factor of time from the last refresh to the ticket's expiry is reached, after which it will try to renew the ticket.

  • sslCipherSuites: A list of cipher suites.

  • sslEndpointIdentificationAlgorithm: The endpoint identification algorithm to validate the server hostname using the server certificate.

  • sslKeymanagerAlgorithm: The algorithm used by the key manager factory for SSL connections. The default value is the key manager factory algorithm configured for the Java Virtual Machine.

  • sslSecureRandomImplementation: The SecureRandom PRNG implementation to use for SSL cryptography operations.

  • sslTrustmanagerAlgorithm: The algorithm used by the trust manager factory for SSL connections. The default value is the trust manager factory algorithm configured for the Java Virtual Machine.

  • maxPoolSize: The maximum number of message requests that can share the Kafka connection.

    Performance tuning tip

    For better throughput, configure the <maxPoolSize> parameter in the <kafkaTransport.init> configuration as follows: <maxPoolSize>20</maxPoolSize>

    By default, a new Kafka connection is created for each message request. If you set the maxPoolSize parameter, up to that many message requests share a Kafka connection.
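
For example, several of the optional properties above can be combined to favor throughput over latency. The following is a sketch only: the values are illustrative rather than recommended, and it assumes that the connector passes each value unchanged to the corresponding Kafka producer setting (for example, lingerTime to linger.ms).

init with batching and compression
<kafkaTransport.init>
    <bootstrapServers>localhost:9092</bootstrapServers>
    <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
    <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
    <!-- Wait up to 10 ms so that more records can join the same batch -->
    <lingerTime>10</lingerTime>
    <!-- Upper bound, in bytes, on each per-partition batch -->
    <batchSize>32768</batchSize>
    <!-- Compress whole batches to reduce network usage -->
    <compressionType>lz4</compressionType>
    <!-- Let up to 20 message requests share a Kafka connection -->
    <maxPoolSize>20</maxPoolSize>
</kafkaTransport.init>

A batch is sent as soon as it reaches batchSize bytes, even if lingerTime has not yet elapsed, so lingerTime only caps the extra delay that batching can add.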

Creating a producer without security

Given below is a sample configuration that creates a producer without security.

init
<kafkaTransport.init>
	<bootstrapServers>localhost:9092</bootstrapServers>
	<keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
	<valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
</kafkaTransport.init>
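
When delivery guarantees and ordering matter more than latency, the delivery-related properties described for the producer can be added to this configuration. The following is a sketch, assuming the connector passes each value unchanged to the Kafka producer; the retry count and backoff are illustrative.

init with ordered delivery
<kafkaTransport.init>
    <bootstrapServers>localhost:9092</bootstrapServers>
    <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
    <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
    <!-- The leader waits for all in-sync replicas to acknowledge each record -->
    <acks>all</acks>
    <!-- Resend records whose send fails with a transient error -->
    <retries>3</retries>
    <!-- Wait 100 ms before each retry -->
    <retryBackoff>100</retryBackoff>
    <!-- One unacknowledged request at a time, so a retry cannot overtake a later batch -->
    <maxInFlightRequestsPerConnection>1</maxInFlightRequestsPerConnection>
</kafkaTransport.init>

With retries greater than zero and more than one in-flight request per connection, a failed batch that is retried can be written after a batch that was sent later. Limiting in-flight requests to 1 trades some throughput for strict per-partition ordering.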

These instructions are for kafka_2.12-0.11.0.0 and Java 1.8.

First, copy the following client libraries from the <KAFKA_HOME>/lib directory to the <ESB_HOME>/repository/components/lib directory:

To use the Kafka connector, add the <kafkaTransport.init> element to your configuration with or without security. For detailed information on how to enable TLS authentication for the Kafka broker, producer, and consumer, see Enabling Security.

Creating a producer with security

Given below is a sample configuration that creates a producer with security:

init with security
<kafkaTransport.init>
     <bootstrapServers>localhost:9093</bootstrapServers>
     <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
     <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
     <securityProtocol>SSL</securityProtocol>
     <sslTruststoreLocation>/home/hariprasath/Desktop/kafkaNewJira/certKafka/kafka.server.truststore.jks</sslTruststoreLocation>
     <sslTruststorePassword>test1234</sslTruststorePassword>
     <sslKeystoreLocation>/home/hariprasath/Desktop/kafkaNewJira/certKafka/kafka.server.keystore.jks</sslKeystoreLocation>
     <sslKeystorePassword>test1234</sslKeystorePassword>
     <sslKeyPassword>test1234</sslKeyPassword>
</kafkaTransport.init>

Here's a description of the properties that you can use in the <kafkaTransport.init> configuration:

  • bootstrapServers: Required. The Kafka brokers, listed as host1:port1,host2:port2.

  • keySerializerClass: Required. The serializer class for the key, which implements the Serializer interface.

  • valueSerializerClass: Required. The serializer class for the value, which implements the Serializer interface.

  • acks: The number of acknowledgments that the producer requires the leader to receive before considering a request to be complete.

  • bufferMemory: The total bytes of memory the producer can use to buffer records waiting to be sent to the server.

  • compressionType: The compression type for all data generated by the producer.

  • retries: Set this to a value greater than zero if you want the client to retry failed requests automatically. The client then resends any record whose send fails with a potentially transient error.

  • sslKeyPassword: The password of the private key in the keystore file. Setting this for the client is optional.

  • sslKeystoreLocation: The location of the keystore file. Set this when you want to have two-way authentication for the client. Setting this for the client is optional.

  • sslKeystorePassword: The store password for the keystore file. You need to set this only if sslKeystoreLocation is configured.

  • sslTruststoreLocation: The location of the trust store file.

  • sslTruststorePassword: The password for the trust store file.

  • batchSize: The maximum size, in bytes, of a batch of records that the producer groups together when multiple records are sent to the same partition.

  • clientId: The client identifier that you pass to the server when making requests.

  • connectionsMaxIdleTime: The duration, in milliseconds, after which idle connections should be closed.

  • lingerTime: The time, in milliseconds, that the producer waits before sending a record so that other records can be batched with it. This is useful for reducing the number of requests sent under moderate load: instead of sending each record immediately, the producer adds a small artificial delay so that records sent in the meantime can be batched together.

  • maxBlockTime: The maximum amount of time, in milliseconds, that the KafkaProducer.send() and KafkaProducer.partitionsFor() methods can block.

  • maxRequestSize: The maximum size of a request in bytes.

  • partitionerClass: The partitioner class that implements the Partitioner interface.

  • receiveBufferBytes: The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.

  • requestTimeout: The maximum amount of time, in milliseconds, that a client can wait for the server to respond to a request.

  • saslJaasConfig: JAAS login context parameters for SASL connections in the format used by JAAS configuration files. 

  • saslKerberosServiceName: The Kerberos principal name that Kafka runs as.

  • securityProtocol: The protocol used to communicate with brokers.

  • sendBufferBytes: The size of the TCP send buffer (SO_SNDBUF) to use when sending data.

  • sslEnabledProtocols: The list of protocols enabled for SSL connections.

  • sslKeystoreType: The format of the keystore file. Setting this for the client is optional.

  • sslProtocol: The SSL protocol used to generate the SSLContext.

  • sslProvider: The name of the security provider used for SSL connections. The default value is the default security provider of the JVM.

  • sslTruststoreType: The format of the trust store file.

  • timeout: The maximum amount of time, in milliseconds, that the server can wait for acknowledgments from followers to meet the acknowledgment requirements that the producer has specified with the acks configuration.

  • blockOnBufferFull: Set this to true if you want the producer to stop accepting new records when the memory buffer is full. If blocking is not desirable, set this property to false, which causes the producer to throw an exception if a record is sent when the memory buffer is full.

  • maxInFlightRequestsPerConnection: The maximum number of unacknowledged requests that the client can send via a single connection before blocking.

  • metadataFetchTimeout: The maximum amount of time, in milliseconds, to block and wait for the metadata fetch to succeed before throwing an exception to the client.

  • metadataMaxAge: The period of time, in milliseconds, after which you should force-refresh the metadata even if there are no partition leadership changes to proactively discover any new brokers or partitions.

  • metricReporters: A list of classes to use as metrics reporters.

  • metricsNumSamples: The number of samples maintained to compute metrics.

  • metricsSampleWindow: The window of time, in milliseconds, that a metric sample is computed over.

  • reconnectBackoff: The amount of time, in milliseconds, to wait before attempting to reconnect to a given host.

  • retryBackoff: The amount of time, in milliseconds, to wait before attempting to retry a failed request to a given topic partition.

  • saslKerberosKinitCmd: The Kerberos kinit command path.

  • saslKerberosMinTimeBeforeRelogin: Login thread sleep time, in milliseconds, between refresh attempts.

  • saslKerberosTicketRenewJitter: Percentage of random jitter added to the renewal time.

  • saslKerberosTicketRenewWindowFactor: The login thread sleeps until the specified window factor of time from the last refresh to the ticket's expiry is reached, after which it will try to renew the ticket.

  • sslCipherSuites: A list of cipher suites.

  • sslEndpointIdentificationAlgorithm: The endpoint identification algorithm to validate the server hostname using the server certificate.

  • sslKeymanagerAlgorithm: The algorithm used by the key manager factory for SSL connections. The default value is the key manager factory algorithm configured for the Java Virtual Machine.

  • sslSecureRandomImplementation: The SecureRandom PRNG implementation to use for SSL cryptography operations.

  • sslTrustmanagerAlgorithm: The algorithm used by the trust manager factory for SSL connections. The default value is the trust manager factory algorithm configured for the Java Virtual Machine.

  • maxPoolSize: The maximum number of message requests that can share the Kafka connection.

    Performance tuning tip

    For better throughput, configure the <maxPoolSize> parameter in the <kafkaTransport.init> configuration as follows: <maxPoolSize>20</maxPoolSize>

    By default, a new Kafka connection is created for each message request. If you set the maxPoolSize parameter, up to that many message requests share a Kafka connection.
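
The optional SSL properties can also tighten the sample configuration with security. The following sketch allows only TLS 1.2 and turns on broker host name verification. The file paths and passwords are placeholders, and host name verification succeeds only if the host name in the broker certificate (its CN or a subject alternative name) matches the host given in bootstrapServers.

init with TLS hardening
<kafkaTransport.init>
    <bootstrapServers>localhost:9093</bootstrapServers>
    <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
    <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
    <securityProtocol>SSL</securityProtocol>
    <!-- Placeholder paths and passwords; replace them with your own -->
    <sslTruststoreLocation>/path/to/kafka.client.truststore.jks</sslTruststoreLocation>
    <sslTruststorePassword>truststore-password</sslTruststorePassword>
    <sslKeystoreLocation>/path/to/kafka.client.keystore.jks</sslKeystoreLocation>
    <sslKeystorePassword>keystore-password</sslKeystorePassword>
    <sslKeyPassword>key-password</sslKeyPassword>
    <!-- Accept TLS 1.2 connections only -->
    <sslEnabledProtocols>TLSv1.2</sslEnabledProtocols>
    <!-- Check the broker certificate against the broker host name -->
    <sslEndpointIdentificationAlgorithm>HTTPS</sslEndpointIdentificationAlgorithm>
</kafkaTransport.init>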

Creating a producer without security

Given below is a sample configuration that creates a producer without security.

init
<kafkaTransport.init>
	<bootstrapServers>localhost:9092</bootstrapServers>
	<keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
	<valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
</kafkaTransport.init>
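
Neither sample uses SASL. The following is a sketch of a Kerberos (GSSAPI) producer built from the SASL properties described for the producer. It assumes a broker listener that accepts SASL_PLAINTEXT and a broker running under the Kerberos service name kafka; the port, keytab path, and principal are placeholders. No mechanism is set because GSSAPI is the Kafka client's default SASL mechanism.

init with Kerberos
<kafkaTransport.init>
    <bootstrapServers>localhost:9092</bootstrapServers>
    <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
    <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
    <securityProtocol>SASL_PLAINTEXT</securityProtocol>
    <!-- Must match the Kerberos principal name that the broker runs as -->
    <saslKerberosServiceName>kafka</saslKerberosServiceName>
    <!-- Placeholder keytab path and principal -->
    <saslJaasConfig>com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/path/to/kafka_client.keytab" principal="kafka-client@EXAMPLE.COM";</saslJaasConfig>
</kafkaTransport.init>

If traffic must also be encrypted, use SASL_SSL instead, together with the SSL truststore properties.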

These instructions are for kafka_2.9.2-0.8.1.1.

Copy the following client libraries from the <KAFKA_HOME>/lib directory to the <ESB_HOME>/repository/components/lib directory.

  • kafka_2.9.2-0.8.1.1.jar

  • scala-library-2.9.2.jar

  • zkclient-0.3.jar

  • zookeeper-3.3.4.jar

  • metrics-core-2.2.0.jar

  • kafka-clients-0.8.x.x.jar (only if you use kafka_2.x.x-0.8.2.0 or later)

To use the Kafka connector, add the <kafkaTransport.init> element to your configuration before carrying out any Kafka producer operation.

<kafkaTransport.init>
      <brokerList>localhost:9092</brokerList>
</kafkaTransport.init>
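
Here's a description of the properties that you can use in the <kafkaTransport.init> configuration:

  • brokerList: Required. The Kafka broker list. The format is host1:port1,host2:port2.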

  • serializationClass: The serializer class for messages. The default encoder takes a byte array and returns the same byte array.

  • producerType: Whether the messages are sent asynchronously or synchronously.

  • requiredAck: The value that controls when a produce request is considered completed.

  • keySerializerClass: The serializer class for keys.

  • partitionClass: The class for partitioning messages among sub-topics.

  • compressionCodec: The compression codec for all data generated by this producer.

  • compressedTopics: Whether compression should be turned on for particular topics.

  • messageSendMaxRetries: The number of times the producer should automatically retry a failed send request. 

  • retryBackOff: The amount of time the producer should wait before refreshing the metadata.

  • refreshInterval: The interval after which the producer should refresh the topic metadata from brokers when there is a failure. 

  • bufferingMaxMessages: The maximum number of unsent messages that can be queued up in the producer when messages are sent asynchronously. 

  • batchNoMessages: The number of messages to send in one batch when messages are sent asynchronously.

  • sendBufferSize: The socket write buffer size.

  • requestTimeout: The amount of time, in milliseconds, that the broker waits while trying to meet the requiredAck requirement before sending an error back to the client.

  • bufferingMaxTime: The maximum time to buffer data when messages are sent asynchronously.

  • enqueueTimeout: The amount of time to block before dropping messages, when messages are sent asynchronously and the buffer has reached the value specified as bufferingMaxMessages.

  • clientId: The user-specified string sent in each request to help trace calls.

  • maxPoolSize: The maximum number of message requests that can share the Kafka connection.

    Performance tuning tip

    For better throughput, configure the <maxPoolSize> parameter in the <kafkaTransport.init> configuration as follows: <maxPoolSize>20</maxPoolSize>

    By default, a new Kafka connection is created for each message request. If you set the maxPoolSize parameter, up to that many message requests share a Kafka connection.
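
For example, the following sketch configures an asynchronous producer for this Kafka version from the properties above. It assumes the connector accepts the standard Kafka 0.8 producer values: async or sync for producerType, and 0, 1, or -1 for requiredAck. The second broker address and the batching values are illustrative.

init (asynchronous producer)
<kafkaTransport.init>
    <!-- Comma-separated broker list; the second broker is hypothetical -->
    <brokerList>localhost:9092,localhost:9093</brokerList>
    <!-- Encode messages as strings instead of passing raw bytes through -->
    <serializationClass>kafka.serializer.StringEncoder</serializationClass>
    <!-- Queue messages and send them in batches from a background thread -->
    <producerType>async</producerType>
    <!-- The leader acknowledges each produce request after writing it -->
    <requiredAck>1</requiredAck>
    <!-- Send up to 200 messages per batch -->
    <batchNoMessages>200</batchNoMessages>
    <!-- Buffer messages for at most 5000 ms before sending -->
    <bufferingMaxTime>5000</bufferingMaxTime>
    <maxPoolSize>20</maxPoolSize>
</kafkaTransport.init>

In asynchronous mode, messages that are still buffered can be lost if the producer stops, so use the sync producer type where each message must be confirmed before the flow continues.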

Now that you have connected to Kafka, let's publish messages to the Kafka brokers.

