This documentation is for WSO2 Complex Event Processor 4.0.0. View documentation for the latest release.
WSO2 Complex Event Processor is succeeded by WSO2 Stream Processor. To view the latest documentation for WSO2 SP, see WSO2 Stream Processor Documentation.

In addition to the default receiver types, you can define your own custom receiver, which gives you more flexibility in receiving events that are sent to WSO2 products. Because each event receiver implementation is an OSGi bundle, you can easily deploy it to, or undeploy it from, the WSO2 product. To create a custom event receiver, import the org.wso2.carbon.event.input.adapter.core package, which provides the skeleton classes and interfaces that a custom receiver implementation requires.

Implementing InputEventAdapter Interface

The org.wso2.carbon.event.input.adapter.core.InputEventAdapter interface contains the logic that is used to receive events. Override the following methods when implementing your own custom receiver.

  1. void init(InputEventAdapterListener eventAdapterListener) throws InputEventAdapterException

    This method is called when the event receiver bundle is initialized. Include any code that must run when the OSGi bundle is loaded in this method.

  2. void testConnect() throws TestConnectionNotSupportedException, InputEventAdapterRuntimeException, ConnectionUnavailableException

    This method checks whether the receiving server is available.

  3. void connect() throws InputEventAdapterRuntimeException, ConnectionUnavailableException

    The connect() method is called after the init() method. It connects to the receiving end; if the receiving end is not available, a ConnectionUnavailableException is thrown.

  4. void disconnect()

    This method is called to disconnect from the connected receiving server.

  5. void destroy()

    This method is called when an event receiver is removed. Perform any cleanup that is required for removing the receiver here.

  6. boolean isEventDuplicatedInCluster()

    Returns true if an event is duplicated in a cluster, and false otherwise. This is useful in a clustered deployment.

  7. boolean isPolling()

    Checks whether events get accumulated at the adapter and clients connect to it to collect events.
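
The order in which the lifecycle methods above are invoked can be sketched as follows. This is only an illustration: ReceiverLifecycle is a hypothetical stand-in for the lifecycle part of InputEventAdapter (the checked exceptions are omitted), and the order init, connect, disconnect, destroy is an assumption based on the method descriptions, not a guarantee taken from the product.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the lifecycle part of InputEventAdapter.
// The real interface is in org.wso2.carbon.event.input.adapter.core and
// declares checked exceptions that are omitted here.
interface ReceiverLifecycle {
    void init();
    void connect();
    void disconnect();
    void destroy();
}

// Records each callback so that the call order can be inspected.
class RecordingReceiver implements ReceiverLifecycle {
    final List<String> calls = new ArrayList<>();

    @Override public void init()       { calls.add("init"); }
    @Override public void connect()    { calls.add("connect"); }
    @Override public void disconnect() { calls.add("disconnect"); }
    @Override public void destroy()    { calls.add("destroy"); }
}

class LifecycleDemo {
    // Drives a receiver through the assumed order of lifecycle calls.
    static List<String> runLifecycle(RecordingReceiver receiver) {
        receiver.init();        // bundle initialization
        receiver.connect();     // called after init()
        receiver.disconnect();  // called when the connection is no longer needed
        receiver.destroy();     // called when the receiver is removed
        return receiver.calls;
    }

    public static void main(String[] args) {
        System.out.println(runLifecycle(new RecordingReceiver()));
    }
}
```

Keeping resource acquisition in connect() and release in disconnect() and destroy() means a receiver can be reconnected without being re-created.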

Below is a sample File Tail Receiver implementation of the methods described above:

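public class FileTailEventAdapter implements InputEventAdapter {

    private static final Log log = LogFactory.getLog(FileTailEventAdapter.class);

    // State referenced by the methods below.
    private final InputEventAdapterConfiguration eventAdapterConfiguration;
    private final Map<String, String> globalProperties;
    private final ExecutorService singleThreadedExecutor = Executors.newSingleThreadExecutor();
    private InputEventAdapterListener eventAdapterListener;
    private FileTailerManager fileTailerManager;

    // Matches the constructor call made in FileTailEventAdapterFactory.createEventAdapter().
    public FileTailEventAdapter(InputEventAdapterConfiguration eventAdapterConfiguration,
                                Map<String, String> globalProperties) {
        this.eventAdapterConfiguration = eventAdapterConfiguration;
        this.globalProperties = globalProperties;
    }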

    @Override
    public void init(InputEventAdapterListener eventAdapterListener) throws InputEventAdapterException {
        validateInputEventAdapterConfigurations();
        this.eventAdapterListener = eventAdapterListener;
    }

    @Override
    public void testConnect() throws TestConnectionNotSupportedException {
        throw new TestConnectionNotSupportedException("not-supported");
    }

    @Override
    public void connect() {
        createFileAdapterListener();
    }

    @Override
    public void disconnect() {
        if (fileTailerManager != null) {
            fileTailerManager.getTailer().stop();
        }
    }

    @Override
    public void destroy() {
    }

    @Override
    public boolean isEventDuplicatedInCluster() {
        return Boolean.parseBoolean(globalProperties.get(EventAdapterConstants.EVENTS_DUPLICATED_IN_CLUSTER));
    }

    @Override
    public boolean isPolling() {
        return true;
    }

    private void validateInputEventAdapterConfigurations() throws InputEventAdapterException {
        String delayInMillisProperty = eventAdapterConfiguration.getProperties().get(FileTailEventAdapterConstants.EVENT_ADAPTER_DELAY_MILLIS);
        // The delay is optional, so validate it only when a value has been supplied.
        if (delayInMillisProperty == null || delayInMillisProperty.trim().isEmpty()) {
            return;
        }
        try {
            Integer.parseInt(delayInMillisProperty);
        } catch (NumberFormatException e) {
            throw new InputEventAdapterException("Invalid value set for property Delay: " + delayInMillisProperty, e);
        }
    }
    
    private void createFileAdapterListener() {
        if(log.isDebugEnabled()){
            log.debug("New subscriber added for " + eventAdapterConfiguration.getName());
        }
        String delayInMillisProperty = eventAdapterConfiguration.getProperties().get(FileTailEventAdapterConstants.EVENT_ADAPTER_DELAY_MILLIS);
        int delayInMillis = FileTailEventAdapterConstants.DEFAULT_DELAY_MILLIS;
        if (delayInMillisProperty != null && (!delayInMillisProperty.trim().isEmpty())) {
            delayInMillis = Integer.parseInt(delayInMillisProperty);
        }
        boolean startFromEnd = false;
        String startFromEndProperty = eventAdapterConfiguration.getProperties().get(FileTailEventAdapterConstants.EVENT_ADAPTER_START_FROM_END);
        if (startFromEndProperty != null && (!startFromEndProperty.trim().isEmpty())) {
            startFromEnd = Boolean.parseBoolean(startFromEndProperty);
        }
        String filePath = eventAdapterConfiguration.getProperties().get(
                FileTailEventAdapterConstants.EVENT_ADAPTER_CONF_FILEPATH);
        FileTailerListener listener = new FileTailerListener(new File(filePath).getName(), eventAdapterListener);
        Tailer tailer = new Tailer(new File(filePath), listener, delayInMillis, startFromEnd);
        fileTailerManager = new FileTailerManager(tailer, listener);
        singleThreadedExecutor.execute(tailer);
    }
}
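
The sample above passes a FileTailerListener to the Tailer, but that class is not shown. A minimal sketch of what such a listener might do is given below, assuming that it forwards each line read from the file to the adapter listener as one event. EventSink is a hypothetical stand-in for InputEventAdapterListener, and LineForwardingListener is an illustrative name, not the class shipped with the product.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for InputEventAdapterListener; only an event callback is modelled.
interface EventSink {
    void onEvent(Object event);
}

// Sketch of a tail listener: forwards each non-blank line as one event.
class LineForwardingListener {
    private final String fileName;
    private final EventSink sink;

    LineForwardingListener(String fileName, EventSink sink) {
        this.fileName = fileName;
        this.sink = sink;
    }

    // Intended to be called once for every line appended to the tailed file.
    void handle(String line) {
        if (line == null || line.trim().isEmpty()) {
            return; // skip blank lines instead of emitting empty events
        }
        sink.onEvent(line);
    }

    String getFileName() {
        return fileName;
    }
}

class LineForwardingDemo {
    // Feeds the given lines through a listener and returns the events it emitted.
    static List<Object> forward(String... lines) {
        List<Object> received = new ArrayList<>();
        LineForwardingListener listener = new LineForwardingListener("access.log", received::add);
        for (String line : lines) {
            listener.handle(line);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(forward("first event", "   ", "second event"));
    }
}
```

In a real adapter the handle method would typically be driven by the Tailer thread, so the sink it calls should be safe to invoke from that thread.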

Implementing InputEventAdapterFactory Class

The org.wso2.carbon.event.input.adapter.core.InputEventAdapterFactory class is the factory that creates your event receiver type. Override the following methods when extending it for your own custom receiver.

  1. public String getType()

    This method returns the receiver type as a String.

  2. public List<String> getSupportedMessageFormats()

    This method specifies the message formats that the receiver type supports.

  3. public List<Property> getPropertyList()

    This method defines the properties of the receiver. The properties defined here can be configured from the management console.

  4. public String getUsageTips()

    This method specifies any hints to be displayed in the management console.

  5. public InputEventAdapter createEventAdapter(InputEventAdapterConfiguration eventAdapterConfiguration, Map<String, String> globalProperties)

    This method creates the receiver from the given event adapter configuration and the global properties, which are common to every adapter type.

Below is a sample File Tail Receiver implementation of the InputEventAdapterFactory class: 

public class FileTailEventAdapterFactory extends InputEventAdapterFactory {
    @Override
    public String getType() {
        return FileTailEventAdapterConstants.EVENT_ADAPTER_TYPE_FILE;
    }

    @Override
    public List<String> getSupportedMessageFormats() {
        List<String> supportInputMessageTypes = new ArrayList<String>();
        supportInputMessageTypes.add(MessageType.TEXT);
        return supportInputMessageTypes;
    }

    @Override
    public List<Property> getPropertyList() {
        List<Property> propertyList = new ArrayList<Property>();
        Property filePath = new Property(FileTailEventAdapterConstants.EVENT_ADAPTER_CONF_FILEPATH);
        filePath.setDisplayName(
                resourceBundle.getString(FileTailEventAdapterConstants.EVENT_ADAPTER_CONF_FILEPATH));
        filePath.setRequired(true);
        filePath.setHint(resourceBundle.getString(FileTailEventAdapterConstants.EVENT_ADAPTER_CONF_FILEPATH_HINT));
        propertyList.add(filePath);

        Property delayInMillis = new Property(FileTailEventAdapterConstants.EVENT_ADAPTER_DELAY_MILLIS);
        delayInMillis.setDisplayName(
                resourceBundle.getString(FileTailEventAdapterConstants.EVENT_ADAPTER_DELAY_MILLIS));
        delayInMillis.setHint(resourceBundle.getString(FileTailEventAdapterConstants.EVENT_ADAPTER_DELAY_MILLIS_HINT));
        propertyList.add(delayInMillis);
        Property startFromEndProperty = new Property(FileTailEventAdapterConstants.EVENT_ADAPTER_START_FROM_END);
        startFromEndProperty.setRequired(true);
        startFromEndProperty.setDisplayName(
                resourceBundle.getString(FileTailEventAdapterConstants.EVENT_ADAPTER_START_FROM_END));
        startFromEndProperty.setOptions(new String[]{"true", "false"});
        startFromEndProperty.setDefaultValue("true");
        startFromEndProperty.setHint(resourceBundle.getString(
                FileTailEventAdapterConstants.EVENT_ADAPTER_START_FROM_END_HINT));
        propertyList.add(startFromEndProperty);
        return propertyList;
    }

    @Override
    public String getUsageTips() {
        return resourceBundle.getString(FileTailEventAdapterConstants.EVENT_ADAPTER_USAGE_TIPS_FILE);
    }

    @Override
    public InputEventAdapter createEventAdapter(InputEventAdapterConfiguration eventAdapterConfiguration,
                                                Map<String, String> globalProperties) {
        return new FileTailEventAdapter(eventAdapterConfiguration, globalProperties);
    }

}
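
Both samples refer to a FileTailEventAdapterConstants class that is not shown. A possible shape for it is sketched below. The constant names are taken from the samples above, but every value is an assumption chosen for illustration. Note that the factory uses the same constants as property names and as resource bundle keys, so the resource bundle would need an entry for each key.

```java
// Sketch of the constants holder referenced by the adapter and the factory.
// The names come from the samples above; all values are assumptions.
final class FileTailEventAdapterConstants {

    private FileTailEventAdapterConstants() {
        // constants only, not meant to be instantiated
    }

    // Receiver type returned by getType() (assumed value).
    static final String EVENT_ADAPTER_TYPE_FILE = "file-tail";
    static final String EVENT_ADAPTER_USAGE_TIPS_FILE = "filetail.usage.tips";

    // Property keys, also used as resource bundle keys for display names (assumed values).
    static final String EVENT_ADAPTER_CONF_FILEPATH = "filepath";
    static final String EVENT_ADAPTER_CONF_FILEPATH_HINT = "filepath.hint";
    static final String EVENT_ADAPTER_DELAY_MILLIS = "delayInMillis";
    static final String EVENT_ADAPTER_DELAY_MILLIS_HINT = "delayInMillis.hint";
    static final String EVENT_ADAPTER_START_FROM_END = "startFromEnd";
    static final String EVENT_ADAPTER_START_FROM_END_HINT = "startFromEnd.hint";

    // Delay used when the delay property is not set (assumed value).
    static final int DEFAULT_DELAY_MILLIS = 1000;
}
```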

Exposing Custom Event Receiver as an OSGi Service

In addition, you can maintain a service class under the internal\ds\ directory to expose the custom event receiver implementation as an OSGi service. The service must be exposed as the InputEventAdapterFactory type. Below is a sample service class for a File Tail Receiver:

/**
 * @scr.component component.name="input.File.AdapterService.component" immediate="true"
 * @scr.reference name="configurationcontext.service"
 * interface="org.wso2.carbon.utils.ConfigurationContextService" cardinality="1..1"
 * policy="dynamic" bind="setConfigurationContextService" unbind="unsetConfigurationContextService"
 */
 
public class FileTailEventAdapterServiceDS {
    private static final Log log = LogFactory.getLog(FileTailEventAdapterServiceDS.class);

    protected void activate(ComponentContext context) {
        try {
            InputEventAdapterFactory testInEventAdapterFactory = new FileTailEventAdapterFactory();
            context.getBundleContext().registerService(InputEventAdapterFactory.class.getName(),
                    testInEventAdapterFactory, null);
            if (log.isDebugEnabled()) {
                log.debug("Successfully deployed the TailFile input event adapter service");
            }
        } catch (RuntimeException e) {
            log.error("Can not create the TailFile input event adapter service ", e);
        }
    }

    protected void setConfigurationContextService(
            ConfigurationContextService configurationContextService) {
        FileTailEventAdapterServiceHolder.registerConfigurationContextService(configurationContextService);
    }

    protected void unsetConfigurationContextService(
            ConfigurationContextService configurationContextService) {
        FileTailEventAdapterServiceHolder.unregisterConfigurationContextService(configurationContextService);
    }
}
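
The service class above delegates to a FileTailEventAdapterServiceHolder that is not shown. A minimal sketch of such a holder follows, assuming that it only keeps a static reference to the bound service. ConfigurationContextServiceStub is a hypothetical stand-in for org.wso2.carbon.utils.ConfigurationContextService, and the getter is an illustrative addition that does not appear in the sample.

```java
// Hypothetical stand-in for org.wso2.carbon.utils.ConfigurationContextService.
interface ConfigurationContextServiceStub {
}

// Sketch of a holder that keeps the service bound by the declarative services component.
final class FileTailEventAdapterServiceHolder {

    // volatile because bind and unbind may run on a different thread than readers
    private static volatile ConfigurationContextServiceStub configurationContextService;

    private FileTailEventAdapterServiceHolder() {
        // static holder, not meant to be instantiated
    }

    static void registerConfigurationContextService(ConfigurationContextServiceStub service) {
        configurationContextService = service;
    }

    static void unregisterConfigurationContextService(ConfigurationContextServiceStub service) {
        // Clear the reference only if the service being unbound is the one currently held.
        if (configurationContextService == service) {
            configurationContextService = null;
        }
    }

    // Illustrative accessor for code that needs the bound service.
    static ConfigurationContextServiceStub getConfigurationContextService() {
        return configurationContextService;
    }
}
```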


Furthermore, you can have a utility directory, internal\util\, where you can place the utility classes required for the custom receiver implementation.

Deploying Custom Event Receiver

To deploy a custom event receiver in WSO2 CEP 4.0.0, implement the custom event receiver type, build the project, and copy the OSGi bundle created in the "target" folder into the <CEP_HOME>/repository/components/dropins directory. When the CEP server starts, the newly created event receiver type service appears in the startup logs. The new custom event receiver type is also visible in the UI with its properties, and you can create multiple instances of it.
