This documentation is for WSO2 Stream Processor 4.2.0. View documentation for the latest release.

A Siddhi Application is a combination of multiple Siddhi executional elements. A Siddhi executional element can be a Siddhi Query or a Siddhi Partition. When defining a Siddhi application, you can specify a number of parallel instances to be created for each executional element, and how each executional element must be isolated for an SP instance. Based on this, the initial Siddhi application is divided into multiple Siddhi applications and deployed in different SP instances.

Any standalone Siddhi Application can be converted into a distributed Siddhi Application by adding @dist annotations. By adding these annotations, you can divide the Siddhi Application into multiple execution groups and run them in parallel with multiple instances per group.

Annotations

The supported annotations are described in the sections that follow.

Executional elements

A distributed Siddhi application can contain one or more of the following elements:

Stateless queries: Queries that consider only the currently incoming events when generating an output (e.g., filters).
Stateful queries: Queries that consider both currently incoming events and past events when generating an output (e.g., windows, sequences, and patterns).
Partitions: Collections of stream definitions and Siddhi queries that are separated from each other within a Siddhi application so that events can be processed in parallel and in isolation.
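
To make the three element types concrete, the following is a minimal sketch with one query of each kind. It reuses the TempStream definition from the sample application later on this page. The query names and MaxTempStream are hypothetical and chosen only for illustration.

```sql
define stream TempStream (deviceID long, roomNo int, temp double);

-- Stateless query: a filter that looks only at the current event.
@info(name = 'filter-query')
from TempStream[temp > 40]
select roomNo, temp
insert into HighTempStream;

-- Stateful query: a time window aggregates current and past events.
@info(name = 'window-query')
from TempStream#window.time(2 min)
select roomNo, avg(temp) as avgTemp
group by roomNo
insert into AvgTempStream;

-- Partition: events are processed in isolation per deviceID value.
@info(name = 'partition-query')
partition with (deviceID of TempStream)
begin
    from TempStream#window.length(10)
    select deviceID, max(temp) as maxTemp
    insert into MaxTempStream;
end;
```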

Execution group annotation

Syntax: @dist(execGroup='name of the group<string>')
Description: This annotation specifies the execution group of a query. An execution group is a collection of queries that is executed as a single unit. You can add this annotation at the query level and specify a name for the group. Queries with the same execution group name are considered part of the same group. If you do not specify an execution group name, a system-generated name in the {Siddhi app name}-random UUID format is assigned. When grouping queries, the following scenarios result in validation exceptions, and application deployment is halted as a result:
  • The same window is referenced from two different execution groups.
  • The same in-memory table is referenced from two different execution groups.
  • Two partitions with the same key reside within the same execution group.
  • The same stream is referenced as an unpartitioned stream within a partition and also in a normal query within the same execution group.
Applicable Level: Query
Example
Code Block
languagesql
@info(name = 'query-1')
@dist(execGroup='group-1')
from TempStream#window.time(2 min)
select avg(temp) as avgTemp, roomNo, deviceID
insert all events into AvgTempStream;
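
As a further illustration of how group names determine grouping, the sketch below places two queries in one execution group and a third in another. The query names and MaxTempStream are hypothetical. The stream definition is copied from the sample application later on this page.

```sql
define stream TempStream (deviceID long, roomNo int, temp double);

-- Both queries name 'group-1', so they are treated as one execution group.
@info(name = 'avg-temp')
@dist(execGroup='group-1')
from TempStream#window.time(2 min)
select avg(temp) as avgTemp, roomNo, deviceID
insert all events into AvgTempStream;

@info(name = 'max-temp')
@dist(execGroup='group-1')
from TempStream#window.time(2 min)
select max(temp) as maxTemp, roomNo
insert into MaxTempStream;

-- A different name places this query in a separate execution group.
@info(name = 'high-temp')
@dist(execGroup='group-2')
from TempStream[temp > 40]
select roomNo, temp
insert into HighTempStream;
```

Each query here declares its own window inline. No defined window or in-memory table is shared across the two groups, which is one of the scenarios listed above as causing a validation exception.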

Parallel annotation

Syntax: @dist(parallel='no of parallel instances<int>')
Description: This annotation specifies the execution parallelism of an execution group (i.e., the number of instances in which the executional elements of the execution group must be executed in parallel). Parallelism is always defined against an execution group. If no parallelism is specified, the default is 1. If it is specified, that number of instances of the execution group is created, and all these instances work in parallel. When defining parallelism, the following scenarios result in validation exceptions, and application deployment is halted as a result:
  • A defined window is referenced within an execution group of which the parallelism is greater than 1.
  • An in-memory table is referenced within an execution group of which the parallelism is greater than 1.
  • A window query is included within an execution group of which the parallelism is greater than 1.
  • A pattern query is included within an execution group of which the parallelism is greater than 1.
  • A sequence query is included within an execution group of which the parallelism is greater than 1.
  • A join query is included within an execution group of which the parallelism is greater than 1.
  • A window within a partition, where the window is not defined against the partitioned stream or an inner stream, is included within an execution group of which the parallelism is greater than 1.
  • Different parallelism values are specified within the same execution group.
Applicable Level: Query
Example
Code Block
@info(name = 'query-3')
@dist(parallel='2')
from TempStream [(roomNo >= 100 and roomNo < 110) and temp > 40 ]
select roomNo, temp
insert into HighTempStream;
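
The execGroup and parallel keys can also be combined in a single @dist annotation, as the sample application later on this page does. The following sketch shows two queries in one group that declare the same parallelism. The group name 'filters', the query names, and LowTempStream are hypothetical.

```sql
define stream TempStream (deviceID long, roomNo int, temp double);

-- Stateless filters only: window, join, pattern, and sequence queries are
-- listed above as invalid when the group parallelism is greater than 1.
@info(name = 'high-temp')
@dist(execGroup='filters', parallel='2')
from TempStream[temp > 40]
select roomNo, temp
insert into HighTempStream;

-- Same group, so the same parallel value is repeated here.
@info(name = 'low-temp')
@dist(execGroup='filters', parallel='2')
from TempStream[temp < 10]
select roomNo, temp
insert into LowTempStream;
```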

Transport channel creation annotation

Syntax: @App:transportChannelCreationEnabled('true|false')
Description: This annotation specifies whether Stream Processor managers are allowed to create the Kafka topics that are required for application deployment. By default, this is set to true, and the topics are created when the application is deployed. If the annotation is set to false, you need to create the required intermediate topics before deploying the application. If the required topics do not exist when you deploy the application, the deployment fails and is aborted.
Applicable Level: Application
Example
Code Block
languagesql
@App:name('wso2-app')
@App:transportChannelCreationEnabled('false')

Creating a distributed Siddhi application

This section explains how to write distributed Siddhi applications by assigning executional elements to different execution groups.

Annotations

The following annotations are used when writing a distributed Siddhi application.

...

All the executional elements with the same execution group are executed in the same Siddhi application. When different execution groups are mentioned within the same distributed Siddhi application, WSO2 SP initiates a separate Siddhi Application per execution group. In each separated Siddhi application, only the executional elements assigned to the relevant execution group are executed.

Executional elements that have no execution group assigned to them are executed in a separate SP instance.

...

The number of instances in which the executional element must be executed in parallel. All the executional elements assigned to a specific execution group (i.e., via the @dist(execGroup) annotation) must have the same number of parallel instances specified. If there is a mismatch in the parallel instances specified for an execution group, an exception occurs.

When the number of parallel instances to be run is not given for the executional elements assigned to an execution group, only one Siddhi application is initiated for that execution group.

User given source parallelism annotation

Syntax: @dist(parallel='no of parallel receiver instances<int>')
Description: This annotation specifies the number of parallel receiver instances to be created for a user given source. In a distributed deployment, user given sources are extracted as separate passthrough Siddhi applications and deployed on designated receiver nodes. If adequate receiver nodes are not available, the deployment is aborted. If you do not specify a parallel count, it defaults to 1.
Applicable Level: Stream
Example
Code Block
languagesql
@source(type='inMemory', topic='stock', @map(type='json'), @dist(parallel='2'))
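
Because this annotation applies at the stream level, the @source annotation above is attached to a stream definition. A minimal sketch follows. StockStream and its attributes are hypothetical.

```sql
-- StockStream and its attributes are hypothetical.
@source(type='inMemory', topic='stock', @map(type='json'), @dist(parallel='2'))
define stream StockStream (symbol string, price float, volume long);
```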


Example

The following is a sample distributed Siddhi application.

Code Block
languagesql
@App:name('wso2-app')

@Source(type='http', @map(type='json'), @dist(parallel='3'))
define stream TempStream(deviceID long, roomNo int, temp double);

@Source(type='http', @map(type='json'), @dist(parallel='3'))
define stream RegulatorStream(deviceID long, roomNo int, isOn bool);

-- group-1: stateful queries (window, join, pattern) with the default parallelism of 1.
@info(name = 'query-1') @dist(execGroup='group-1')
from TempStream#window.time(2 min)
select avg(temp) as avgTemp, roomNo, deviceID
insert all events into AvgTempStream;

@info(name = 'query-2') @dist(execGroup='group-1')
from TempStream[temp > 30.0]#window.time(1 min) as T
    join RegulatorStream[isOn == false]#window.length(1) as R
    on T.roomNo == R.roomNo
select T.roomNo, R.deviceID, 'start' as action
insert into RegulatorActionStream;

@info(name = 'query-3') @dist(execGroup='group-1')
from every( e1=TempStream ) -> e2=TempStream[e1.roomNo == roomNo and (e1.temp + 5) <= temp] within 10 min
select e1.roomNo, e1.temp as initialTemp, e2.temp as finalTemp
insert into AlertStream;

-- group-2: a stateless filter executed in three parallel instances.
@info(name = 'query-4') @dist(execGroup='group-2', parallel='3')
from TempStream[(roomNo >= 100 and roomNo < 110) and temp > 40]
select roomNo, temp
insert into HighTempStream;

-- group-3: a partition executed in two parallel instances.
@info(name = 'query-5') @dist(execGroup='group-3', parallel='2')
partition with ( deviceID of TempStream )
begin
    from TempStream#window.time(1 min)
    select roomNo, deviceID, temp, avg(temp) as avgTemp
    insert into #AvgTempStream;

    from #AvgTempStream[avgTemp > 20]#window.length(10)
    select roomNo, deviceID, max(temp) as maxTemp
    insert into deviceTempStream;
end;

In the above Siddhi application, there are two user given sources. An execution group is created for each source, with a passthrough query that fetches data from the user given source and inserts it into Kafka. The details of these execution groups are as follows:

Group name: wso2-app-passthrough-xxxxx
Queries included: wso2-app-passthrough-xxxxx-1 (auto generated query)
Parallelism: 1

There are three user-defined groups. group-1 is defined without specifying parallelism. Therefore, the default parallelism of 1 applies.


  • Details of group-1 are as follows:
    Group name: wso2-app-group-1
    Queries included: query-1, query-2, query-3
    Parallelism: 1
     
  • Details of group-2 are as follows:
    Group name: wso2-app-group-2
    Queries included: query-4
    Parallelism: 3

  • Details of group-3 are as follows:
    Group name: wso2-app-group-3
    Queries included: query-5
    Parallelism: 2


All of these groups are deployed in worker nodes as separate parallel Siddhi applications, with the number of instances matching the parallelism of each group. Failure to deploy any of the groups results in the whole deployment being aborted.