This documentation is for WSO2 Data Analytics Server 3.0.0.

Introduction to data publisher

A data publisher allows you to send data to a predefined set of data fields in a DAS/CEP server. The data structure with predefined fields is defined in an event stream. The data is converted to the format defined by the event stream and sent via the WSO2 data-bridge component. You can also send custom key-value pairs with data events.

Custom fields with data stream

The data bridge data agent has a map data structure that lets you send an arbitrary number of string key-value pairs with each event. The other data structures are three object arrays that hold the metadata, correlation data, and payload data values of the fixed stream definition. The key-value pairs in the map can change from event to event, but both keys and values must be strings.

These custom key-value pairs fall into three groups according to how they are transmitted. The group is decided by how the key starts.

  1. When the key starts with meta: The data field is treated as a custom metadata field. It is sent with the metadata and saved in Cassandra with the meta_ key prefix.
  2. When the key starts with correlation: The data field is treated as a custom correlation data field. It is sent with the correlation data and saved in Cassandra with the correlation_ key prefix.
  3. When the key starts with payload or any other string: The data field is treated as a custom payload data field. It is sent with the payload data and saved in Cassandra with the payload_ key prefix.
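
The following minimal Java sketch illustrates the grouping rule above by sorting the keys of a custom string map into the three groups. It is a hypothetical model and not the data bridge's own code. CustomFieldGroup and CustomFieldRouter are invented names, and the sample keys are made up. The sketch reads the rule literally as a plain prefix check on meta and correlation, and treats every other key as payload.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** The three transmission groups, each with the key prefix used when the field is stored. */
enum CustomFieldGroup {
    META("meta_"),
    CORRELATION("correlation_"),
    PAYLOAD("payload_");

    final String storagePrefix;

    CustomFieldGroup(String storagePrefix) {
        this.storagePrefix = storagePrefix;
    }

    /** Keys starting with "meta" or "correlation" get those groups; any other key is payload. */
    static CustomFieldGroup forKey(String key) {
        if (key.startsWith("meta")) {
            return META;
        }
        if (key.startsWith("correlation")) {
            return CORRELATION;
        }
        return PAYLOAD; // keys starting with "payload" and any other string
    }
}

final class CustomFieldRouter {
    private CustomFieldRouter() {
    }

    /** Splits a custom string map into the three groups, keeping the input map's iteration order. */
    static Map<CustomFieldGroup, List<String>> groupKeys(Map<String, String> customFields) {
        Map<CustomFieldGroup, List<String>> groups = new LinkedHashMap<>();
        for (CustomFieldGroup group : CustomFieldGroup.values()) {
            groups.put(group, new ArrayList<>());
        }
        for (String key : customFields.keySet()) {
            groups.get(CustomFieldGroup.forKey(key)).add(key);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Hypothetical custom fields; all keys and values are strings, as the map requires.
        Map<String, String> custom = new LinkedHashMap<>();
        custom.put("meta_host", "web01");
        custom.put("correlation_requestId", "r-42");
        custom.put("payload_status", "200");
        custom.put("region", "eu-west"); // no meta/correlation prefix, so treated as payload
        groupKeys(custom).forEach((group, keys) ->
                System.out.println(group + " (stored with prefix " + group.storagePrefix + "): " + keys));
    }
}
```

Under this literal reading the check is a plain prefix match. A hypothetical key such as metadataVersion would therefore also fall into the metadata group, so choose key names with the rule in mind.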

Dependencies

To publish data to WSO2 DAS/CEP through a custom data agent, you need the following dependencies. You can add them either to your class path or to your POM file.

Adding dependencies using class path

Add the JAR files listed below to your class path. Note that ${carbon.analytics.common.version} refers to the version of the carbon-analytics-common GitHub repository (https://github.com/wso2/carbon-analytics-common/).

It is always recommended to use the JAR files from the latest released version. In the DAS 3.0.0 release, the version of the carbon-analytics-common repository is 5.0.6. Therefore, to build your data publisher with version 5.0.6, set the version property in your pom.xml file as follows:

<carbon.analytics.common.version>5.0.6</carbon.analytics.common.version>

  • org.wso2.carbon.logging_4.3.0.jar

  • commons-pool-1.5.6.wso2v1.jar

  • google-collect_1.0.0.wso2v2.jar

  • org.wso2.carbon.utils_4.3.0.jar

  • org.wso2.carbon.base_4.3.0.jar

  • axiom_1.2.11.wso2v5.jar

  • httpclient-4.2.5.wso2v1.jar

  • libthrift-0.7.0.wso2v2.jar

  • slf4j.log4j12-1.6.1.jar

  • slf4j.api-1.6.1.jar

  • org.wso2.carbon.databridge.agent-${carbon.analytics.common.version}.jar

  • org.wso2.carbon.databridge.commons.thrift-${carbon.analytics.common.version}.jar

  • org.wso2.carbon.databridge.commons-${carbon.analytics.common.version}.jar

  • disruptor-2.10.4.wso2v2.jar
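
Before you write any publishing code, you may want to confirm that these JARs are actually visible to your application. The following sketch tries to load a few classes by name without initializing them. DataBridgeClasspathCheck is a hypothetical helper. The fully qualified class names are assumptions based on the AgentHolder, DataPublisher, Event, and DataBridgeCommonsUtils classes used later on this page, so adjust them if your carbon-analytics-common version packages these classes differently.

```java
import java.util.ArrayList;
import java.util.List;

final class DataBridgeClasspathCheck {
    // Assumed fully qualified names of the agent classes used in the samples on this page.
    static final String[] REQUIRED_CLASSES = {
        "org.wso2.carbon.databridge.agent.AgentHolder",
        "org.wso2.carbon.databridge.agent.DataPublisher",
        "org.wso2.carbon.databridge.commons.Event",
        "org.wso2.carbon.databridge.commons.utils.DataBridgeCommonsUtils"
    };

    private DataBridgeClasspathCheck() {
    }

    /** Returns the class names that cannot be loaded; classes are loaded but not initialized. */
    static List<String> missingClasses(String... classNames) {
        List<String> missing = new ArrayList<>();
        ClassLoader loader = DataBridgeClasspathCheck.class.getClassLoader();
        for (String name : classNames) {
            try {
                Class.forName(name, false, loader);
            } catch (ClassNotFoundException | LinkageError e) {
                // LinkageError covers a class whose own dependencies (other JARs) are missing.
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> missing = missingClasses(REQUIRED_CLASSES);
        if (missing.isEmpty()) {
            System.out.println("All checked data bridge classes are on the class path.");
        } else {
            System.out.println("Not loadable from the class path: " + missing);
        }
    }
}
```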

Adding dependencies using POM file

Alternatively, add the following Maven repository and dependency entries to your POM file. Note that ${carbon.analytics.common.version} refers to the version of the carbon-analytics-common GitHub repository (https://github.com/wso2/carbon-analytics-common/). It is always recommended to use the dependency entries from the latest released version.

Maven repository
<repositories>
	<repository>
		<id>wso2.releases</id>
		<name>WSO2 internal Repository</name>
		<url>https://maven.wso2.org/nexus/content/repositories/releases/</url>
		<releases>
			<enabled>true</enabled>
			<updatePolicy>daily</updatePolicy>
			<checksumPolicy>ignore</checksumPolicy>
		</releases>
	</repository>
</repositories>

<properties>
    <carbon.analytics.common.version>5.0.6</carbon.analytics.common.version>
</properties>
Maven pom dependency
<dependency>
    <groupId>org.wso2.carbon.analytics-common</groupId>
    <artifactId>org.wso2.carbon.databridge.agent</artifactId>
    <version>${carbon.analytics.common.version}</version>
</dependency>
<dependency>
    <groupId>org.wso2.carbon.analytics-common</groupId>
    <artifactId>org.wso2.carbon.databridge.commons</artifactId>
    <version>${carbon.analytics.common.version}</version>
</dependency>
<dependency>
    <groupId>org.wso2.carbon.analytics-common</groupId>
    <artifactId>org.wso2.carbon.databridge.commons.thrift</artifactId>
    <version>${carbon.analytics.common.version}</version>
</dependency>

Set the carbon.analytics.common.version property to the version you want to use. Every <version> element above refers to it.

Configuring the data agent

A data agent is a single controller for all the data publishers created through it. These data publishers share the data agent's resources, such as its client pool. The Thrift data agent is available by default. You can also extend the framework and write a new data agent, such as a binary data agent.

Follow the steps below to configure a data agent.

  1. Load the following sample configuration to define the data agents in the JVM.

    <DataAgentsConfiguration>
        <Agent>
            <Name>Thrift</Name>
            <DataEndpointClass>org.wso2.carbon.databridge.agent.internal.endpoint.thrift.ThriftDataEndpoint</DataEndpointClass>
            <TrustSore>src/main/resources/client-truststore.jks</TrustSore>
            <TrustSorePassword>wso2carbon</TrustSorePassword>
            <QueueSize>32768</QueueSize>
            <BatchSize>200</BatchSize>
            <CorePoolSize>5</CorePoolSize>
            <MaxPoolSize>10</MaxPoolSize>
            <KeepAliveTimeInPool>20</KeepAliveTimeInPool>
            <ReconnectionInterval>30</ReconnectionInterval>
            <MaxTransportPoolSize>250</MaxTransportPoolSize>
            <MaxIdleConnections>250</MaxIdleConnections>
            <EvictionTimePeriod>5500</EvictionTimePeriod>
            <MinIdleTimeInPool>5000</MinIdleTimeInPool>
            <SecureMaxTransportPoolSize>250</SecureMaxTransportPoolSize>
            <SecureMaxIdleConnections>250</SecureMaxIdleConnections>
            <SecureEvictionTimePeriod>5500</SecureEvictionTimePeriod>
            <SecureMinIdleTimeInPool>5000</SecureMinIdleTimeInPool>
        </Agent>
    
        <Agent>
            <Name>Binary</Name>
            <DataEndpointClass>org.wso2.carbon.databridge.agent.internal.endpoint.binary.BinaryDataEndpoint</DataEndpointClass>
            <TrustSore>src/main/resources/client-truststore.jks</TrustSore>
            <TrustSorePassword>wso2carbon</TrustSorePassword>
            <QueueSize>32768</QueueSize>
            <BatchSize>200</BatchSize>
            <CorePoolSize>5</CorePoolSize>
            <MaxPoolSize>10</MaxPoolSize>
            <KeepAliveTimeInPool>20</KeepAliveTimeInPool>
            <ReconnectionInterval>30</ReconnectionInterval>
            <MaxTransportPoolSize>250</MaxTransportPoolSize>
            <MaxIdleConnections>250</MaxIdleConnections>
            <EvictionTimePeriod>5500</EvictionTimePeriod>
            <MinIdleTimeInPool>5000</MinIdleTimeInPool>
            <SecureMaxTransportPoolSize>250</SecureMaxTransportPoolSize>
            <SecureMaxIdleConnections>250</SecureMaxIdleConnections>
            <SecureEvictionTimePeriod>5500</SecureEvictionTimePeriod>
            <SecureMinIdleTimeInPool>5000</SecureMinIdleTimeInPool>
        </Agent>
    </DataAgentsConfiguration>

    To tune the above parameters in the <DAS_HOME>/repository/conf/data-bridge/data-agent-conf.xml file for better performance, follow the instructions in Performance Tuning.

  2. Set the path to the data agent configuration file as follows: AgentHolder.setConfigPath("/path/to/data/agent/conf.xml");
  3. Instantiate the data publisher using one of the following constructors:

    • DataPublisher dataPublisher = new DataPublisher(url, username, password);
    • DataPublisher dataPublisher = new DataPublisher(receiverURLSet, username, password);
    • DataPublisher dataPublisher = new DataPublisher(receiverURLSet, authURLSet, username, password);

      For information on the receiverURLSet and authURLSet parameters, see Setting up Multi Receiver and Load Balancing Data Agent. For example, if you pass tcp://localhost:7611|tcp://localhost:7612|tcp://localhost:7613 as the receiverURLSet, the corresponding authURLSet is ssl://localhost:7711|ssl://localhost:7712|ssl://localhost:7713.

      In all the above constructors, the default data agent (the one configured in the first Agent element of the above configuration) is used to create the data publisher. If you have configured only the Thrift data agent in the <DAS_HOME>/repository/conf/data-bridge/data-agent-conf.xml file, you get a Thrift-based data publisher instance.

      However, if you have configured more than one type of data agent in the <DAS_HOME>/repository/conf/data-bridge/data-agent-conf.xml file (for example, the Binary agent in the above sample configuration), you can pass an additional type argument that denotes the type of data publisher to create. For example, to get a binary data publisher instance, pass binary as the type as shown below.

      DataPublisher dataPublisher = new DataPublisher("binary", receiverURLSet, authURLSet, username, password);
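
The steps above rely on two conventions. First, the first Agent element in the configuration is the default data agent. Second, in the example, each receiver URL has a matching ssl:// auth URL on a port 100 higher. The following standalone sketch checks both. DataAgentSetupHelper is hypothetical and uses only the JDK's DOM parser. authUrlSetFor assumes the plain |-separated form and the 100-port offset shown in the example (7611 to 7711). It does not handle the other URL set syntaxes described in Setting up Multi Receiver and Load Balancing Data Agent.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

final class DataAgentSetupHelper {
    private DataAgentSetupHelper() {
    }

    /** Returns each Agent's Name in document order; the first entry is the default data agent. */
    static List<String> agentNames(String configXml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(configXml.getBytes(StandardCharsets.UTF_8)));
            NodeList agents = doc.getElementsByTagName("Agent");
            List<String> names = new ArrayList<>();
            for (int i = 0; i < agents.getLength(); i++) {
                NodeList nameNodes = ((Element) agents.item(i)).getElementsByTagName("Name");
                if (nameNodes.getLength() > 0) {
                    names.add(nameNodes.item(0).getTextContent().trim());
                }
            }
            return names;
        } catch (ParserConfigurationException | SAXException | IOException e) {
            throw new IllegalArgumentException("Cannot parse data agent configuration", e);
        }
    }

    /**
     * Builds an authURLSet for a plain "|"-separated receiverURLSet by switching each URL to
     * ssl:// and adding portOffset to its port (100 in the example above).
     */
    static String authUrlSetFor(String receiverUrlSet, int portOffset) {
        List<String> authUrls = new ArrayList<>();
        for (String receiver : receiverUrlSet.split("\\|")) {
            URI uri = URI.create(receiver.trim());
            authUrls.add("ssl://" + uri.getHost() + ":" + (uri.getPort() + portOffset));
        }
        return String.join("|", authUrls);
    }

    public static void main(String[] args) {
        String config = "<DataAgentsConfiguration>"
                + "<Agent><Name>Thrift</Name></Agent>"
                + "<Agent><Name>Binary</Name></Agent>"
                + "</DataAgentsConfiguration>";
        List<String> names = agentNames(config);
        System.out.println("Agents: " + names + ", default: " + names.get(0));

        String receivers = "tcp://localhost:7611|tcp://localhost:7612|tcp://localhost:7613";
        // prints ssl://localhost:7711|ssl://localhost:7712|ssl://localhost:7713
        System.out.println(authUrlSetFor(receivers, 100));
    }
}
```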

Data publisher sample

As a prerequisite for this sample, you need to define the streams in the receiver server (WSO2 DAS/CEP). For information on defining event streams, see Understanding Event Streams and Event Tables.

Follow the procedure below to use the data publisher.

  1. Initialize the data publisher as follows, where getDataAgentConfigPath() returns the path to your data agent configuration file.

    AgentHolder.setConfigPath(getDataAgentConfigPath());

    DataPublisher dataPublisher = new DataPublisher(url, username, password);
  2. Generate the ID of the stream to which you are going to publish events, as follows.

    String streamId = DataBridgeCommonsUtils.generateStreamId(HTTPD_LOG_STREAM, VERSION);

  3. Publish the events using any of the following methods.
    • publish() is a blocking call. It blocks until the event is put into the disruptor. If the disruptor is full, it waits until space becomes available.

      Event event = new Event(streamId, System.currentTimeMillis(), new Object[]{"external"}, null,
      new Object[]{aLog});
      dataPublisher.publish(event);
    • tryPublish(event) is non-blocking. If space is available in the disruptor, the event is inserted. If the disruptor is full, the call returns false immediately without waiting, and the event is not published.

      Event event = new Event(streamId, System.currentTimeMillis(), new Object[]{"external"}, null,
      new Object[]{aLog});
      dataPublisher.tryPublish(event);
    • tryPublish with a timeout (in milliseconds) waits for at most that long. If space is available in the disruptor, the event is inserted. If the disruptor is full, the call waits up to the specified time for space to free up. If the timeout is reached, it returns false and the event is not published.

      Event event = new Event(streamId, System.currentTimeMillis(), new Object[]{"external"}, null,
      new Object[]{aLog});
      dataPublisher.tryPublish(event, 100);

      When you use the tryPublish API, always check its return value. If it returns false (that is, the event was not published), slow down the rate at which you send events, for example by sleeping before the next attempt. This keeps the sending loop from overusing CPU resources and blocking them for other operations.

    For more information on the usage of data publishers, see the sample in the <DAS_HOME>/samples/httpd-logs/ directory.
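
The following sketch illustrates, without a running server, the difference between the three publishing methods and the advice to back off when tryPublish returns false. It is an analogy and not the agent's implementation. BoundedEventBuffer stands in for the disruptor using a java.util.concurrent.ArrayBlockingQueue, whose put, offer, and timed offer methods respectively block, fail immediately, and wait up to a timeout. PublishBackoff is a hypothetical helper, and its doubling sleep is one possible choice rather than a documented recommendation.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Stand-in for the publisher's bounded buffer (the real agent uses a disruptor, not this queue). */
final class BoundedEventBuffer<E> {
    private final BlockingQueue<E> queue;

    BoundedEventBuffer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Like publish(event): blocks the caller until there is free space. */
    void publish(E event) throws InterruptedException {
        queue.put(event);
    }

    /** Like tryPublish(event): returns false immediately if the buffer is full. */
    boolean tryPublish(E event) {
        return queue.offer(event);
    }

    /** Like tryPublish(event, timeout): waits up to timeoutMs for space, then returns false. */
    boolean tryPublish(E event, long timeoutMs) {
        try {
            return queue.offer(event, timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            return false;
        }
    }
}

final class PublishBackoff {
    private PublishBackoff() {
    }

    /**
     * Calls a non-blocking publish attempt up to maxAttempts times, sleeping between failures
     * (doubling the pause up to maxSleepMs) so that a full buffer does not cause a busy loop.
     * Returns true as soon as an attempt succeeds.
     */
    static boolean publishWithBackoff(BooleanSupplier attempt, int maxAttempts,
                                      long initialSleepMs, long maxSleepMs) {
        long sleepMs = initialSleepMs;
        for (int i = 1; i <= maxAttempts; i++) {
            if (attempt.getAsBoolean()) {
                return true;
            }
            if (i < maxAttempts) {
                try {
                    Thread.sleep(sleepMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
                sleepMs = Math.min(sleepMs * 2, maxSleepMs);
            }
        }
        return false;
    }

    public static void main(String[] args) {
        BoundedEventBuffer<String> buffer = new BoundedEventBuffer<>(2);
        buffer.tryPublish("event-1");
        buffer.tryPublish("event-2");
        System.out.println("tryPublish on a full buffer: " + buffer.tryPublish("event-3"));
        System.out.println("tryPublish with 50 ms timeout: " + buffer.tryPublish("event-3", 50));
        boolean sent = publishWithBackoff(() -> buffer.tryPublish("event-3"), 3, 10, 40);
        System.out.println("sent after back-off: " + sent); // false: nothing drains this buffer
    }
}
```

With the real agent, the attempt passed to publishWithBackoff could be a lambda such as () -> dataPublisher.tryPublish(event), reusing the dataPublisher and event from the steps above.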
