Implementing an event-notification mechanism

Table 85 shows the support that the Java connector library provides for the development of an event-notification mechanism:

Table 85. Support for an event-notification mechanism
Java connector library support For more information
The following classes for the encapsulation of access to the event store:
  • CWConnectorEvent
  • CWConnectorEventStatusConstants
  • CWConnectorEventStore
  • CWConnectorEventStoreFactory
Obtaining access to the event store
A poll method, pollForEvents(), that polls the event store at a specified frequency. Implementing the pollForEvents() method

Note:
For an introduction to event notification, see Event notification.. For a discussion of event-notification mechanisms and the implementation of pollForEvents(), see Event notification.

Obtaining access to the event store

If a connector is expected to process information that originates in its application, it must obtain access to the application's event store. Table 86 shows the support that the Java connector library provides in support of obtaining access to an event store from within a Java connector.

Table 86. Support for defining access to an event store
Java connector library class Description
Event store CWConnectorEventStoreFactory Provides a single method that creates an event-store object
CWConnectorEventStore Represents the event store
Event CWConnectorEvent Represents an event object, which provides access to an event record within the Java connector.

Defining the event store

As Table 86 shows, the Java connector library provides the following classes to define an event store:

CWConnectorEventStore class

The CWConnectorEventStore class defines an event store. As Table 87 shows, this class provides an additional layer for standardizing the event retrieval, processing, and archiving mechanisms.

Table 87. Methods of the CWConnectorEventStore class
Event-store task CWConnectorEventStore method Implementation status
Event retrieval fetchEvents() Must be implemented
getBO() Implementation provided in base class--however, you must override this implementation if your connector does not support the RetrieveByContent verb.
getNextEvent() Implementation provided in base class
Event processing recoverInProgressEvents() Must be implemented
resubmitArchivedEvents() Must be implemented
setEventStatus() Must be implemented
setEventsToProcess() Implementation provided in base class
updateEventStatus() Implementation provided in base class
Archiving archiveEvent() Must be implemented--if the connector supports archiving.
deleteEvent() Must be implemented
Error processing getTerminate(),, setTerminate() Implementation provided in base class
Resource cleanup cleanupResources() Not required for the event-store class but must be implemented if resources used to access the event store need to be released.

To define an event store, follow these steps:

  1. Extend the CWConnectorEventStore class, naming your new class to identify the event store that your connector accesses.
  2. Define any additional data members that your event store might require.

    The CWConnectorEventStore class contains a single data member: an events vector array called eventsToProcess. Events retrieved from the event store are saved in this Java Vector object. Declare any other information that is required to access the application's event and archive stores as data members in your extended CWConnectorEventStore class. This information should include the location of the event and archive stores. For example:

  3. Implement the appropriate abstract methods within the CWConnectorEventStore class (see Table 87) to provide access to the event store.

    You can implement those CWConnectorEventStore methods that your event store requires, with the following conditions:

  4. Access the CWConnectorEventStore methods as needed to perform event retrieval, event processing, and archiving from within the pollForEvents() poll method. For more information, see Implementing the pollForEvents() method.

Note:
For more information on the methods of CWConnectorEventStore, see CWConnectorEventStore class.
CWConnectorEventStoreFactory interface

The CWConnectorEventStoreFactory interface defines an event-store factory, which provides a method to instantiate an event store, as Table 88 shows.

Table 88. Method of the CWConnectorEventStoreFactory interface
CWConnectorEventStoreFactory method Implementation status
getEventStore() Must be implemented

To define an event-store factory, follow these steps:

  1. Create a new event-store-factory class to implement the CWConnectorEventStoreFactory interface. Name your new class to include the name of the event store that your CWConnectorEventStore class accesses.
  2. Implement the getEventStore() method of the CWConnectorEventStoreFactory interface within your event-store-factory class to provide an event-store factory for your extended CWConnectorEventStore class.
  3. Determine whether to use the default implementation of the getEventStore() method in the CWConnectorAgent class to instantiate an event store. The default implementation of the pollForEvents() method uses this getEventStore() method to obtain a reference to the event store.

Defining an event object

The Java connector obtains event records from the event store and encapsulates them as event objects. The event-store class builds event objects for each event record that the connector retrieves from the event store. The information in each event object is then used to build and retrieve the business object that the connector sends to the integration broker.

The default event object that CWConnectorEvent defines contains the event information in Table 46.. The CWConnectorEvent class provides access methods for this information, as Table 89 shows.

Table 89. Methods to retrieve information in an event object
Element CWConnectorEvent method
Event Id getEventID()
Business object name getBusObjName()
Business object verb getVerb()
Object key getIDValues(),, getKeyDelimiter()

These CWConnectorEvent methods provide access to the actual data values that identify the business object. The getIDValues() method assumes that this data is a name/value pair. For example, if the object key contains data for the ContractId attribute in the business object, the name/value pair in the business object data would be: ContractId=45381If the object key in the event record contains a concatenation of fields, the getIDValues() assumes that each name/value pair is separated by a delimiter, which the getKeyDelimiter() method returns. The delimiter should be configurable as set by the PollAttributeDelimiter connector configuration property. The default value for the delimiter is a colon (:).

Priority getPriority()
Timestamp getEventTimeStamp()
Status getStatus()

Use the following methods to set event status: getNextEvent(),, recoverInProgressEvents(),, resubmitArchivedEvents(),, setEventStatus(), updateEventStatus().

Description A text string describing the event.
ConnectorID getConnectorID()

In addition to providing the standard information in an event record (shown in Table 89), the event object also provides accessor methods for the information shown in Table 90..

Table 90. Additional event information in the event object
Element Description Accessor method
Effective date Date on which the event becomes active and should be processed. This information might be useful when there is a change to an object in one system that should not be propagated until the date on which it becomes effective (such as a salary change). getEffectiveDate()
Event source Source from where the event originated. This information might be needed by a connector that needs to track the event source for archiving. getEventSource(),, setEventSource()
Triggering user User identifier (ID) associated with the user that triggered this event. This information can be used to avoid synchronization problems between two systems. getTriggeringUser()

If your event record requires information beyond what the default event class provides (Table 89 and Table 90), you can take the following steps:

  1. Extend the CWConnectorEvent class, naming your new class to identify the event store whose event records your event class encapsulates.
  2. Define any additional data members that your event might require.

    The CWConnectorEvent class contains the data members whose accessor methods are listed in Table 89 and Table 90.. Any other information that is required to access the application's event records needs to be declared as data members in your extended CWConnectorEvent class.

  3. Provide accessor methods for any data members you add to your extended CWConnectorEvent class.

    To support true encapsulation, your data members should be private members of your extended CWConnectorEvent class. To provide access to these data members, you define a "get" methods to retrieve each data member's value. You can also define "set" methods for those data members that connector developers are allowed to set.

Note:
For more information on the methods of CWConnectorEvent, see CWConnectorEvent class.

Implementing the pollForEvents() method

For a Java connector, the CWConnectorAgent class defines the pollForEvents() method. This class provides a default implementation of pollForEvents(). You can use this default implementation or override the method with your own poll method. However, the pollForEvents() method must be implemented.

The Java-based pseudo-code in Figure 62 shows the basic logic flow for a pollForEvents() method. The method first retrieves a set of events from the event store. For each event, the method calls the isSubscribed() method to determine whether any subscriptions exist for the corresponding business object. If there are subscriptions, the method retrieves the data from the application, creates a new business object, and calls gotApplEvent() to send the business object to InterChange Server. If there are no subscriptions, the method archives the event record with a status value of unprocessed.

Figure 62. Java pollForEvents() example
public int pollForEvents() 
{       
   int status = 0; 
   get the events from the event store
   for (events 1 to MaxEvents in event store) {
      extract BOName, verb, and key from the event record 
        if(ConnectorBase.isSubscribed(BOName,BOverb) {
            BO = JavaConnectorUtil.createBusinessObject(BOName) 
            BO.setAttrValue(key) 

            retrieve application data using doVerbFor() 
            BO.setVerb(Retrieve) 
            BO.doVerbFor()
            BO.setVerb(BOverb) 
            status = gotApplEvent(BusinessObject);

            archive event record with success or failure status 
        } 
        else {
            archive item with unsubscribed status
   }
   return status;
}

Note:
For a flow chart of the poll method's basic logic, see Figure 27..

This section provides more detailed information on each of the steps in the basic logic for the event processing that the pollForEvents() method typically performs. Table 91 summarizes these basic steps.

Table 91. Basic logic of the pollForEvents() method
Step For more information
1. Set up a subscription manager for the connector. Accessing a subscription manager
2. Verify that the connector still has a valid connection to the event store. Verifying the connection before accessing the event store
3. Retrieve specified number of event records from the event store and store them in an events array. Cycle through the events array. For each event, mark the event in the event store as In-Progress and begin processing. Retrieving event records
4. Get the business object name, verb, and key data from the event record. Getting the business object name, verb, and key
5. Check for subscriptions to the event. Checking for subscriptions to the event
If the event has subscribers:
  • Retrieve application data and create the business object.
Retrieving application data
  • Send the business object to the connector framework for event delivery.
Sending the business object to the connector framework
  • Complete event processing.
Completing the processing of an event
If the event does not have subscribers, update the event status to Unsubscribed. Checking for subscriptions to the event
6. Archive the event. Archiving the event
7. Release resources used to access the event store.

Accessing a subscription manager

As part of connector initialization, the connector framework instantiates a subscription manager. This subscription manager keeps the subscription list current. (For more information, see Business object subscription and publishing.) A connector has access to the subscription manager and the connector subscription list through a subscription handler, which is included in the connector base class. It can use methods of this class to determine whether business objects have subscribers and to send business objects to the connector controller.

Note:
Unlike a C++ connector, a Java connector does not need to set up a subscription handler. This functionality is handled in the CWConnectorAgent class.

Verifying the connection before accessing the event store

When the agentInit() method in the connector class initializes the application-specific component, one of its most common tasks is to establish a connection to the application. The poll method requires access to the event store. Therefore, before the pollForEvents() method begins processing events, it should verify that the connector is still connected to the application. The way to perform this verification is application-specific. Consult your application documentation for more information.

A good design practice is to code the connector application-specific component so that it shuts down whenever the connection to the application is lost. If the connection has been lost, the connector should not continue with event polling. Instead, it should return APPRESPONSETIMEOUT to notify the connector framework of the loss of connection to the application.

Note:
To surface an APPRESPONSETIMEOUT outcome status returned by the doVerbFor() from within pollForEvents(), use the getTerminate() method of the CWConnectorEventStore class. For more information, see Retrieving application data.

Retrieving event records

To send event notifications to the connector framework, the poll method must first retrieve event records from the event store. Table 92 lists the methods that the Java connector library provides to retrieve event records from the event store.

Table 92. Classes and methods for event retrieval
Java connector library class Method
CWConnectorAgent getEventStore()
CWConnectorEventStoreFactory getEventStore()
CWConnectorEventStore fetchEvents(),, getNextEvent(),,updateEventStatus()

The poll method can retrieve one event record at a time and process it or it can retrieve a specified number of event records per poll and cache them to an events array. Processing multiple events per poll can improve performance when the application generates large numbers of events.

The number of events picked up in any polling cycle should be configurable using the connector configuration property PollQuantity. At install time, a system administrator sets the value of PollQuantity to an appropriate number, such as 50. The poll method can use the getConfigProp() to retrieve the value of the PollQuantity property, and then retrieve the specified number of event records and process them in a single poll.

The connector should assign the In-Progress status to any event that it has read out of the event store and has started to process. If the connector terminates while processing an event and before updating the event status to indicate that the event was either sent or failed, it will leave an In-Progress event in the table. For more information on how recover these In-Progress events, see Recovering In-Progress events.

The Java connector library provides the CWConnectorEventStore class to represent an event store. To retrieve event records from this event store, the poll method takes the following actions:

  1. Instantiate an event-store object with the getEventStore() method that is defined in the CWConnectorAgent class. The default implementation of this method calls the getEventStore() of the event-store-factory class named in the EventStoreFactory connector configuration property. The event-store-factory class implements the CWConnectorEventStoreFactory interface for your event store. For more information, see CWConnectorEventStoreFactory interface.
  2. Retrieve a specified number of event records from the event store with the fetchEvents() method.

    You must implement the fetchEvents() method as part of the CWConnectorEventStore class. This method can use the value of the PollQuantity connector configuration property as the number of event records to retrieve. The method must take the following actions:

    The fetchEvents() method should throw the StatusChangeFailedException exception if the application is unable to fetch events because it is unable to access the event store. When the pollForEvents() method catches this exception, it can return the APPRESPONSETIMEOUT outcome status to indicate the lack of response from the application's event store.

Setting the event status to IN_PROGRESS indicates that the poll method has begun processing on the event. Figure 63 shows a code fragment that retrieves event records from the event store, accessing each as an event object.

Figure 63. Retrieving event records from the event store
// Instantiate event store
CWConnectorEventStore evts=getEventStore();

// Fetch PollQuantity number of events from the application.
try 
   {
   evts.fetchEvents();
   } 
catch (StatusChangeFailedException e) 
   {
   // log error message
   return CWConnectorConstant.FAIL;
   }
}
// Get the property values for PollQuantity
int pollQuantity;
String poll=CWConnectorUtil.getConfigProp("PollQuantity");
if (poll == null || poll.equals(""))
   pollQuantity=1;
else
   pollQuantity=Integer.parseInt(poll);

   for (int i=0; i < pollQuantity; i++)
   {
      // Process each event retrieved from the application.
      // Get the next event to be processed.
      evtObj=evts.getNextEvent();

Getting the business object name, verb, and key

Once the connector has retrieved an event, it extracts the event ID, the object key, and the name and verb of the business object from the event record. The connector uses the business object name and verb to determine whether the integration broker is interested in this type of business object. If the business object and its active verb have subscribers, the connector uses the entity key to retrieve the complete set of data.

Table 93 lists the methods that the Java connector library provides to obtain the name of the business object definition and the verb from the retrieved event records.

Table 93. Methods for obtaining event information
Java connector library class Method
CWConnectorEvent getBusObjName(),, getVerb()

Important:
The connector should send the business object with the same verb that was in the event record.

Once the getNextEvent() method has retrieved an event object to be processed, the Java connector can use the appropriate accessor methods of the CWConnectorEvent class to obtain the information needed to check for an event subscription, as follows:

Event ID getEventID()
Business object name getBusObjName()
Verb getVerb()
Object key getIDValues()

For sample code that uses these accessor methods, see Figure 64..

Checking for subscriptions to the event

To determine whether the integration broker is interested in receiving a particular business object and verb, the poll method calls the isSubscribed() method. The isSubscribed() method takes the name of the current business object and a verb as arguments. The name of the business object and verb must match the name of the business object and verb in the repository.

WebSphere InterChange Server

If your business integration system uses InterChange Server, the poll method can determine if any collaboration subscribes to the business object with a particular verb. At initialization, the connector framework requests its subscription list from the connector controller at connector initialization. At runtime, the application-specific component can use isSubscribed() to query the connector framework to verify that some collaboration subscribes to a particular business object. The application-specific connector component can send the event only if some collaboration is currently subscribed.

Other integration brokers

If your business integration system uses a WebSphere message broker (WebSphere MQ Integrator, WebSphere MQ Integrator Broker, or WebSphere Business Integration Message Broker) or WebSphere Application Server, the connector framework assumes that the integration broker is interested in all the connector's supported business objects. If the poll method uses the isSubscribed() method to query the connector framework about subscriptions for a particular business object, the method returns true for every business object that the connector supports.

Table 94 lists the methods that the Java connector library provides to check for subscriptions to the event.

Table 94. Classes and methods for checking subscriptions
Java connector library class Method
CWConnectorAgent isSubscribed()
CWConnectorEventStore updateEventStatus(),, archiveEvent(),, deleteEvent()

Based on the value that isSubscribed() returns, the poll method should take one of the following actions based on whether there are subscribers for the event:

For a Java connector, the isSubscribed() method is defined in the CWConnectorAgent class because the subscription manager is part of the connector base class. The method returns true if there are subscribers and false if there are no subscribers. Figure 64 shows a code fragment that checks for subscriptions in a Java connector.

Figure 64. Checking for an event subscription
if (isSubscribed(evtObj.getBusObjName(),evtObj.getVerb())) {
      // handle event
} else
   {
      // Update the event status to UNSUBSCRIBED.
      evts.updateEventStatus(evtObj,
         CWConnectorEventStatusConstants.UNSUBSCRIBED);

      // Archive the event (if archiving is supported) 
      return CWConnectorConstant.FAIL;
   }

If no subscriptions exist for the event, this code fragment uses the updateEventStatus() method to update the event's status to UNSUBSCRIBED and then archives the event.

Events that have subscriptions

If there are subscribers for an event, the connector takes the following actions:

Connector action taken For more information
Retrieve the complete set of business object data from the entity in the application database. Retrieving application data
Send the business object to the connector framework, which routes it to the integration broker. Sending the business object to the connector framework
Complete the processing on the event. Completing the processing of an event
Archive the event (if archiving is implemented) in case the integration broker subscribes at a later time. Archiving the event
Events that do not have subscriptions

If there are no subscriptions for the event, the connector should take the following actions:

No other processing should be done with unsubscribed events. If at a later date, the integration broker subscribes to these events, a system administrator can move the unsubscribed event records from the archive store back to the event store.

Retrieving application data

If there are subscribers for an event, the poll method must take the following steps:

  1. Retrieve the complete set of data for the entity from the application.

    To retrieve the complete set of entity data, the poll method must use name of the entity's key information (which is stored in the event) to locate the entity in the application. The poll method must retrieve the complete set of application data when the event has the following verbs:

    For a Delete event from an application that supports physical deletes, the application may have already deleted the entity from the database, and the connector may not be able to retrieve the entity data. For information on delete processing, see Processing Delete events.

  2. Package the entity data in a business object.

    Once the populated business object exists, the poll method can publish the business object to subscribers.

Table 95 lists the method that the Java connector library provides to retrieve entity data from the application database and populate a business object.

Table 95. Method for retrieving business object data
Java connector library class Method
CWConnectorEventStore getBO()

Note:
If the event is a delete operation and the application supports physical deletions of data, the data has most likely been deleted from the application, and the connector cannot retrieve the data. In this case, the connector simply creates a business object, sets the key from the object key of the event record, and sends the business object.

For a Java connector, the standard way of retrieving application data from within pollForEvents() is to use the getBO() method in the CWConnectorEventStore class. This method takes the following steps:

If the call to getBO() is successful, it returns the populated CWConnectorBusObj object. The following line shows a call to getBO() that returns a populated CWConnectorBusObj object called bo:

bo = evts.getBO(evtObj);

In case the getBO() call is not successful, the poll method should take the following steps:

The ObjectEventId attribute is used in the IBM WebSphere business integration system to track the flow of business objects through the system. In addition, it is used to keep track of child business objects across requests and responses, as child business objects in a hierarchical business object request might be reordered in a response business object.

Connectors are not required to populate ObjectEventId attributes for either a parent business object or its children. If business objects do not have values for ObjectEventId attributes, the business integration system generates values for them. However, if a connector populates child ObjectEventIds, the values must be unique across all other ObjectEventId values for that particular business object regardless of level of hierarchy. ObjectEventId values can be generated as part of the event notification mechanism. For suggestions on how to generate ObjectEventId values, see Event identifier.

Sending the business object to the connector framework

Once the data for the business object has been retrieved, the poll method performs the following tasks:

Table 96 lists the methods that the Java connector library provides to perform these tasks.

Table 96. Classes and methods for setting the verb and sending the business object
Java connector library class Method
CWConnectorBusObj setVerb()
CWConnectorEvent getVerb()
CWConnectorAgent gotApplEvent()
Setting the business object verb

To set the verb in a business object to the verb specified in the event record, the poll method calls the business object method setVerb(). The poll method should set the verb to the same verb that was in the event record in the event store.

Note:
If the event is a physical delete, use the object keys from the event record to set the keys in the business object, and set the verb to Delete.

For a Java connector, the populated CWConnectorBusObj object that the getBO() method returns still has a verb of RetrieveByContent. The poll method must set the business object's verb to its original value with the setVerb() method of the CWConnectorBusObj class, as the following code fragment shows:

// Set verb to action as indicated in the event record

busObj.setVerb(evntObj.getVerb());

In this code fragment, the poll method uses the getVerb() of the CWConnectorEvent class to obtain the verb from the event record. This verb is then copied into the business object with setVerb().

Sending the business object

The poll method uses the method gotApplEvent() to send the business object to the connector framework. This method takes the following steps:

The connector framework does some processing on the event object to serialize the data and ensure that it is persisted properly. It then makes sure the event is sent.

WebSphere InterChange Server

If your business integration system uses InterChange Server, the connector framework makes sure the event is either sent to the ICS through CORBA IIOP or written to a queue (if you are using queues for event notification). If sending the event to ICS, the connector framework forwards the business object to the connector controller, which in turn performs any mapping required to transform the application-specific business object to a generic business object. The connector controller can then send the generic business object to the appropriate collaboration.

Other integration brokers

If your business integration system uses a WebSphere message broker (WebSphere MQ Integrator, WebSphere MQ Integrator Broker, or WebSphere Business Integration Message Broker) or WebSphere Application Server, the connector framework makes sure the event is converted to an XML WebSphere MQ message and written to the appropriate MQ queue.

The poll method should check the return code from gotApplEvent() to ensure that any error conditions are handled appropriately. For example, until the event delivery is successful, the poll method should not remove the event from the event store. Instead, the poll method should update the event record's status to reflect the results of the event delivery. Table 97 shows the possible event-status values, based on the return code from gotApplEvent().

Table 97. Possible event status after event delivery with gotApplEvent()
State of event delivery Return code of gotApplEvent() Event status
If the event delivery is successful SUCCEED SUCCESS
If no subscription exists for the event NO_SUBSCRIPTION_FOUND UNSUBSCRIBED
If the connector has been paused CONNECTOR_NOT_ACTIVE READY_FOR_POLL
If the event delivery fails FAIL ERROR_POSTING_EVENT

The gotApplEvent() method returns SUCCEED if the connector framework successfully delivers the business object. The poll method checks the return code from gotApplEvent() to ensure that the event record's status is updated appropriately. If gotApplEvent() returns any return code except FAIL, the poll method returns SUCCEED so that it continues to poll for events. However, on a FAIL return code from gotApplEvent(), event delivery has failed so the poll method logs an error message and fails.

Table 98 shows the actions that pollForEvents() takes based on the gotApplEvent() return code.

Table 98. Possible pollForEvents() actions after event delivery with gotApplEvent()
Return code of gotApplEvent() Actions in pollForEvents()
SUCCEED

  1. Reset the event status to SUCCESS.
  2. If the ArchiveProcessed connector property is set to true, archive the event and delete it from the event store.
  3. Continue polling.
NO_SUBSCRIPTION_FOUND

  1. Log an error message.
  2. Reset the event status to UNSUBSCRIBED.
  3. If the ArchiveProcessed connector property is set to true, archive the event and delete it from the event store.
  4. Continue polling.
CONNECTOR_NOT_ACTIVE

  1. Log an informational message at a trace level of 3.
  2. Prepare the event for future re-execution:
    • For application adapters, reset the event status to READY_FOR_POLL.
    • For technology adapters, push back the event (if possible).
  3. Return SUCCEED as the pollForEvents() outcome status.

Note:
In this case, the event is not archived.
FAIL

  1. Log an error message.
  2. Reset the event status to ERROR_POSTING_EVENT.
  3. If the ArchiveProcessed connector property is set to true, archive the event and delete it from the event store.
  4. Return FAIL as the pollForEvents() outcome status.

As Table 98 shows, the action that pollForEvents() takes when the gotApplEvents() method returns an outcome status of CONNECTOR_NOT_ACTIVE depends on the type of connector you have created. For an application connector (in particular a connector whose application uses a database as its event store), the pollForEvents() method should reset the event's status to READY_FOR_POLL to revert an event back to its "unprocessed" state.

However, for technology connectors (in particular, those that do not use event tables and therefore cannot always revert an event back to an "unprocessed" state), the connector can hold the event in memory and return an outcome status of SUCCEED from pollForEvents(), rather than attempting to "push" the event back. The connector should keep this event in memory until the adapter is re-activated and pollForEvents() is again invoked. At this time, the connector can try to republish the event.

The following code fragment shows how this functionality might be implemented.

BusinessObject eventOnHold;

pollForEvents(...)
{
   ...
   if eventOnHold != null
   {
      event = eventOnHold;
      eventOnHold = null;
   }

   else
   {
      event = getNextUnprocessedEvent();
   }

   ...

   result = gotApplEvent( event );
   if (result == CWConnectorConstant.CONNECTOR_NOT_ACTIVE )
   {
      eventOnHold = event;
      return CWConnectorConstant.SUCCEED;
   }

Note:
Keep in mind that if you pause the adapter while it is actively processing an event and then later terminate this adapter (or it terminates unexpectedly on its own), "in-doubt" events can result for these events that the connector (using the above logic) has copied to memory. Different adapters have different strategies for how to handle in-doubt events. However, the result of this logic can mean the creation of "in-doubt" events even though the adapter was seemingly terminated properly. These events are not lost.

When implementing the pollForEvents() response to the CONNECTOR_NOT_ACTIVE return status, keep in mind that the programming approaches discussed here assume that the adapter places an event in an "in-progress" state while it processes and sends the event to the integration broker. However, not all adapters are implemented this way. An adapter might simply receive an event from a source and then call gotApplEvent() to send it to the integration broker. If this adapter terminates in the time between when it receives the event and when it calls gotApplEvent(), the event is lost. When such an adapter is restarted, it has no way of reprocessing the event.

Completing the processing of an event

The processing of an event is complete with the completion of the tasks in Table 99.

Table 99. Steps in processing an event
Event-processing task For more information
The poll method has retrieved the application data for the event and created a business object that represents the event. Retrieving application data
The poll method has sent the business object to the connector framework. Sending the business object to the connector framework

Note:
For hierarchical business objects, the event processing is complete when the poll method has retrieved the application data for the parent business object and all child business objects and sent the complete hierarchical business object to the connector framework. The event notification mechanism must retrieve and send the entire hierarchical business object, not just the parent business object.

The poll method must ensure that the event status correctly reflects the completion of the event processing. Therefore, it must handle both of the following conditions:

Handling successful event processing

The processing of an event is successful when the tasks in Table 99 successfully complete. The following steps show how the poll method should finish processing a successful event:

  1. Receive a "success" return code from the gotApplEvent() method signifying the connector framework's successful delivery of the business object to the messaging system.
  2. Copy the event to the archive store. For more information, see Archiving the event.
  3. Set the status of the event in the archive store.
  4. Delete the event record from the event store.

    Until the event delivery is successful, the poll method should not remove the event from the event table.

Note:
The order of the steps might be different for different implementations.
Handling unsuccessful event processing

If an error occurs in processing an event, the connector should update the event status to indicate that an error has occurred. Table 100 shows the possible event-status values, based on errors that can occur during event processing.

Table 100. Possible event status after errors in event processing
State of event delivery Event status Does polling terminate?
If an error occurs in processing an event ERROR_PROCESSING_EVENT No, retrieve the next event from the event store
If the event delivery fails ERROR_POSTING_EVENT Yes
If no subscriptions exist for the event UNSUBSCRIBED No, retrieve the next event from the event store

For example, if there are no application entities matching the entity key, the event status should be updated to "error processing event". If the event cannot be successfully delivered, its event status should be updated to "error posting event". As discussed in Sending the business object, the poll method should check the return code from gotApplEvent() to ensure that any errors that are returned are handled appropriately.

In any case, the event should be left in the event store to be analyzed by a system administrator. When the poll method queries for events, it should exclude events with the error status so that these events are not picked up. Once an event's error condition has been resolved, the system administrator can manually reset the event status so that the event is picked up by the connector on the next poll.

Archiving the event

Archiving an event consists of moving the event record from the event store to an archive store. The Java connector library provides the CWConnectorEventStore class to represent an event store, which includes the archive store. Table 101 lists the methods that the Java connector library provides to archive events.

Table 101. Methods for archiving events
Java connector library class Method
CWConnectorEventStore updateEventStatus(), archiveEvent(), deleteEvent()

Note:
For a general introduction to archiving, see Archiving events..

To archive event records from this event store, the poll method takes the following actions:

  1. Ensure that archiving is implemented by checking the value of the appropriate connector configuration property, such as ArchiveProcessed. For more information, see Configuring a connector for archiving..
  2. Copy the event record from the archive store to the event store with the archiveEvent() method.

    To provide event archiving, you must implement the archiveEvent() method as part of the CWConnectorEventStore class. This method identifies the event record to copy by its event ID.

    The archiveEvents() method should throw the ArchiveFailedException exception if the application is unable to archive the event because it is unable to access the event store. When the pollForEvents() method catches this exception, it can return the APPRESPONSETIMEOUT outcome status to indicate the lack of response from the application's event store.

  3. Update the event status of the archive record with the updateEventStatus() method to reflect the reason for archiving the event.

    Table 102 shows the likely event-status constants that the archive record will have.

    Table 102. Event-status constants in an archive record
    Event status Description
    SUCCESS The event was detected, and the connector created a business object for the event and sent the business object to the connector framework. For more information, see Handling successful event processing.
    UNSUBSCRIBED The event was detected, but there were no subscriptions for the event, so the event was not sent to the connector framework and on to the integration broker. For more information, see Checking for subscriptions to the event.
    ERROR_PROCESSING_EVENT The event was detected, but the connector encountered an error when trying to process the event. The error occurred either in the process of building a business object for the event or in sending the business object to connector framework. For more information, see Handling unsuccessful event processing.

    The updateEventStatus() method should throw the StatusChangeFailedException exception if the application is unable to change the event status because it is unable to access the event store. When the pollForEvents() method catches this exception, it can return the APPRESPONSETIMEOUT outcome status to indicate the lack of response from the application's event store.

  4. Delete the event record from the event store with the deleteEvent() method.

    You must implement the deleteEvent() method as part of the CWConnectorEventStore class. This method uses the event ID to identify the event record to delete.

    The deleteEvents() method should throw the DeleteFailedException exception if the application is unable to delete the event because it is unable to access the event store. When the pollForEvents() method catches this exception, it can return the APPRESPONSETIMEOUT outcome status to indicate the lack of response from the application's event store.

Figure 65 contains a code fragment that archives an event.

Figure 65. Archiving an event
// Archive the event if ArchiveProcessed is set to true.
if (arcProcessed.equalsIgnoreCase("true")) {
   // Archive the event in the application's archive store.
   evts.archiveEvent(evtObj.getEventID());

   // Delete the event from the event store.
   evts.deleteEvent(evtObj.getEventID());
   }

After archiving is complete, your poll method should set the appropriate return code:

Releasing event-store resources

Often, the pollForEvents() method needs to allocate resources to access the event store. To prevent excessive memory usage by these resources, you can release them at the end of the poll method. Table 103 lists the methods that the Java connector library provides to release event-store resources.

Table 103. Method for releasing event-store resources
Java connector library class Method
CWConnectorEventStore cleanupResources()

For example, if the event store is implemented as event tables in a database, pollForEvents() might allocate SQL cursors to access these tables. You can implement a cleanupResources() method to free these SQL cursors. At the end of pollForEvents(), you can then call cleanupResources() to free the memory that these cursors use.

Note:
The CWConnectorEventStore class does not provide a default implementation of the cleanupResources() method. To free event-store resources, you must override cleanupResources() with a version that releases the resources needed to access your event store.

Default implementation of the Java pollForEvents()

Figure 66 shows the default implementation of the pollForEvents() in the CWConnectorAgent class. You can use this default implementation, which follows the basic logic outlined in Basic logic for pollForEvents(), or you can override this method with your own implementation.

Figure 66. Implementation of basic logic for pollForEvents()
/**
 * Default implementation of pollForEvents.
 */
public int pollForEvents() {
   CWConnectorUtil.traceWrite(
      CWConnectorLogAndTrace.LEVEL5,"Entering pollForEvents.");

   // Get the EventStoreFactory implementation name from the
   // getEventStore() method.
   CWConnectorEventStore evts=getEventStore();
   if (evts==null)
   {
      CWConnectorUtil.generateAndLogMsg(10533, 
         CWConnectorLogAndTrace.XRD_ERROR, 0, 0);
      return CWConnectorConstant.APPRESPONSETIMEOUT
   }
   try { //finally block
       // Fetch PollQuantity number of events from the application.
      try {
         evts.fetchEvents();
      } catch (StatusChangeFailedException e) {
         CWConnectorUtil.generateAndLogMsg(10533,
            CWConnectorLogAndTrace.XRD_ERROR,0,0);
         CWConnectorUtil.logMsg(e.getMessage());
         e.printStackTrace();
         return CWConnectorConstant.APPRESPONSETIMEOUT;
      }

   // Get the property values for PollQuantity and ArchiveProcessed.
   int pollQuantity;
   String poll=CWConnectorUtil.getConfigProp("PollQuantity");
   try {
      if (poll == null || poll.equals(""))
         pollQuantity=1;
      else
         pollQuantity=Integer.parseInt(poll);
   } catch (NumberFormatException e) {
      CWConnectorUtil.generateAndLogMsg(10544, 
         CWConnectorLogAndTrace.XRD_ERROR, 0);
      CWConnectorUtil.logMsg(e.getMessage());
      e.printStackTrace();
      return CWConnectorConstant.FAIL;
   }

   String arcProcessed=CWConnectorUtil.getConfigProp(
      "ArchiveProcessed");

   // In case the ArchiveProcessed property is not set, use true 
   // as default.
   if (arcProcessed == null || arcProcessed.equals(""))
      arcProcessed=CWConnectorAttrType.TRUESTRING;
   CWConnectorEvent evtObj;
   CWConnectorBusObj bo=null;
   try {
      for (int i=0; i < pollQuantity; i++){

         // Process each event retrieved from the application.
         // Get the next event to be processed.
         evtObj=evts.getNextEvent();

         // A null return indicates that there were no events with
         // READY_FOR_POLL status. Return SUCCESS.
         if (evtObj == null) {
            CWConnectorUtil.generateAndLogMsg(10534,
               CWConnectorLogAndTrace.XRD_INFO,0,0);
            return CWConnectorConstant.SUCCEED;
         }
         // Check if the connector has subscribed to the event
         // generated for the business object.
         boolean isSub=isSubscribed(evtObj.getBusObjName(),
            evtObj.getVerb());
         if (isSub) {
            // Retrieve the complete CWConnectorBusObj corresponding 
            // to the object using the getBO method in
            // CWConnectorEventStore. This method sets the verb on a
            // temporary business object to RetrieveByContent 
            // and retrieves the corresponding data information to be
            // filled in the business object from the application.
            try {
               bo = evts.getBO(evtObj);
               // Terminate flag will be set in the event store when
               // the doVerbFor method returns APPRESPONSETIMEOUT in
               // getBO.
               if (evts.getTerminate())
                  return CWConnectorConstant.APPRESPONSETIMEOUT;
            }catch (AttributeNotFoundException e) {
               CWConnectorUtil.generateAndLogMsg(10536,
                  CWConnectorLogAndTrace.XRD_ERROR, 0, 2,
                  "getBO","AttributeNotFoundException");
               CWConnectorUtil.logMsg(e.getMessage());
               e.printStackTrace();
               // Update the event status to ERROR_PROCESSING_EVENT
               evts.updateEventStatus(evtObj,
                  CWConnectorEventStatusConstants.ERROR_PROCESSING_EVENT);
               if (arcProcessed.equalsIgnoreCase(CWConnectorAttrType.TRUESTRING))
               {
                  // Archive the event in the application's archive store
                  evts.archiveEvent(evtObj.getEventID());
                  // Delete the event from the event store
                  evts.deleteEvent(evtObj.getEventID());
               }
               continue;
            }catch (SpecNameNotFoundException e) {
               CWConnectorUtil.generateAndLogMsg(10536,
                  CWConnectorLogAndTrace.XRD_ERROR, 0, 2,
                  "getBO","SpecNameNotFoundException");
               CWConnectorUtil.logMsg(e.getMessage());
               e.printStackTrace();
               // Update the event status to ERROR_PROCESSING_EVENT
               evts.updateEventStatus(evtObj,
                  CWConnectorEventStatusConstants.ERROR_PROCESSING_EVENT);
               if (arcProcessed.equalsIgnoreCase(CWConnectorAttrType.TRUESTRING))
               {
                  // Archive the event in the application's archive store
                  evts.archiveEvent(evtObj.getEventID());
                  // Delete the event from the event store
                  evts.deleteEvent(evtObj.getEventID());
               }
               continue;
            }catch (InvalidVerbException e) {
               CWConnectorUtil.generateAndLogMsg(10536,
                  CWConnectorLogAndTrace.XRD_ERROR, 0, 2,
                  "getBO","InvalidVerbException");
               CWConnectorUtil.logMsg(e.getMessage());
               e.printStackTrace();
               // Update the event status to ERROR_PROCESSING_EVENT
               evts.updateEventStatus(evtObj,
                  CWConnectorEventStatusConstants.ERROR_PROCESSING_EVENT);
               if (arcProcessed.equalsIgnoreCase(CWConnectorAttrType.TRUESTRING))
               {
                  // Archive the event in the application's archive store
                  evts.archiveEvent(evtObj.getEventID());
                  // Delete the event from the event store
                  evts.deleteEvent(evtObj.getEventID());
               }
               continue;
            }catch (WrongAttributeException e) {
               CWConnectorUtil.generateAndLogMsg(10536,
                  CWConnectorLogAndTrace.XRD_ERROR, 0, 2,
                  "getBO","WrongAttributeException");
               CWConnectorUtil.logMsg(e.getMessage());
               e.printStackTrace();
               // Update the event status to ERROR_PROCESSING_EVENT
               evts.updateEventStatus(evtObj,
                  CWConnectorEventStatusConstants.ERROR_PROCESSING_EVENT);
               if (arcProcessed.equalsIgnoreCase(CWConnectorAttrType.TRUESTRING))
               {
                  // Archive the event in the application's archive store
                  evts.archiveEvent(evtObj.getEventID());
                  // Delete the event from the event store
                  evts.deleteEvent(evtObj.getEventID());
               }
               continue;
            }catch (AttributeValueException e) {
               CWConnectorUtil.generateAndLogMsg(10536,
                  CWConnectorLogAndTrace.XRD_ERROR, 0, 2,
                  "getBO","AttributeValueException");
               CWConnectorUtil.logMsg(e.getMessage());
               e.printStackTrace();
               // Update the event status to ERROR_PROCESSING_EVENT
               evts.updateEventStatus(evtObj,
                  CWConnectorEventStatusConstants.ERROR_PROCESSING_EVENT);
               if (arcProcessed.equalsIgnoreCase(CWConnectorAttrType.TRUESTRING))
               {
                  // Archive the event in the application's archive store
                  evts.archiveEvent(evtObj.getEventID());
                  // Delete the event from the event store
                  evts.deleteEvent(evtObj.getEventID());
               }
               continue;
            }catch (AttributeNullValueException e) {
               CWConnectorUtil.generateAndLogMsg(10536,
                  CWConnectorLogAndTrace.XRD_ERROR, 0, 2,
                  "getBO","AttributeNullValueException");
               CWConnectorUtil.logMsg(e.getMessage());
               e.printStackTrace();
               // Update the event status to ERROR_PROCESSING_EVENT
               evts.updateEventStatus(evtObj,
                  CWConnectorEventStatusConstants.ERROR_PROCESSING_EVENT);
               if (arcProcessed.equalsIgnoreCase(CWConnectorAttrType.TRUESTRING))
               {
                  // Archive the event in the application's archive store
                  evts.archiveEvent(evtObj.getEventID());
                  // Delete the event from the event store
                  evts.deleteEvent(evtObj.getEventID());
               }
               continue;
            }

            // Log a fatal error in case the object is not found.
            if (evtObj.getStatus()==
         CWConnectorEventStatusConstants.ERROR_OBJECT_NOT_FOUND) {
               CWConnectorUtil.generateAndLogMsg(10543,
                  CWConnectorLogAndTrace.XRD_FATAL,0,0);
               // Update the event status to ERROR_OBJECT_NOT_FOUND
               evts.updateEventStatus(evtObj,
                  CWConnectorEventStatusConstants.ERROR_OBJECT_NOT_FOUND);
               if (arcProcessed.equalsIgnoreCase(CWConnectorAttrType.TRUESTRING))
               {
                  // Archive the event in the application's archive store
                  evts.archiveEvent(evtObj.getEventID());
                  // Delete the event from the event store
                  evts.deleteEvent(evtObj.getEventID());
               }
               continue;
            }
            // In case the business object is null, the retrieve call 
            // returned an error.
            if (bo == null) {
               CWConnectorUtil.generateAndLogMsg(10335,
                  CWConnectorLogAndTrace.XRD_ERROR,0,0);
               // Update the event status to ERROR_PROCESSING_EVENT
               evts.updateEventStatus(evtObj,
                  CWConnectorEventStatusConstants.ERROR_PROCESSING_EVENT);
               if (arcProcessed.equalsIgnoreCase(CWConnectorAttrType.TRUESTRING))
               {
                  // Archive the event in the application's archive store
                  evts.archiveEvent(evtObj.getEventID());
                  // Delete the event from the event store
                  evts.deleteEvent(evtObj.getEventID());
               }
               continue;
            }
   
            // Set the processing verb on the business object.
            try {
               bo.setVerb(evtObj.getVerb());
            } catch(InvalidVerbException e){
               CWConnectorUtil.generateAndLogMsg(10536,
                  CWConnectorLogAndTrace.XRD_ERROR, 0, 2,
                  "setVerb","InvalidVerbException");
               CWConnectorUtil.logMsg(e.getMessage());
               e.printStackTrace();
               // Update the event status to ERROR_PROCESSING_EVENT
               evts.updateEventStatus(evtObj,
                  CWConnectorEventStatusConstants.ERROR_PROCESSING_EVENT);
               if (arcProcessed.equalsIgnoreCase(CWConnectorAttrType.TRUESTRING))
               {
                  // Archive the event in the application's archive store
                  evts.archiveEvent(evtObj.getEventID());
                  // Delete the event from the event store
                  evts.deleteEvent(evtObj.getEventID());
               }
               continue;
            }
            // Check again for subscription.
            if (isSubscribed(bo.getName(),bo.getVerb())){
               // Send the event to integration broker.
               int stat=gotApplEvent(bo);
               if (stat == CWConnectorConstant.CONNECTOR_NOT_ACTIVE){
                  CWConnectorUtil.generateAndTraceMsg(
                     CWConnectorLogAndTrace.LEVEL3, 10551,
                     CWConnectorLogAndTrace.XRD_INFO, 0, 0);
                  evts.updateEventStatus(evtObj,
                     CWConnectorEventStatusConstants.READY_FOR_ROLL);
                  // No need to archive the event, as the status is reset to
                  // READY_FOR_POLL. It is as if this event never reached the 
                  // connector for processing.
                  return CWConnectorConstant.SUCCEED;
               }
               if (stat == CWConnectorConstant.NO_SUBSCRIPTION_FOUND){
                  CWConnectorUtil.generateAndLogMsg(10552,
                     CWConnectorLogAndTrace.XRD_ERROR, 0, 0);
                  // Update the event status to UNSUBSCRIBED.
                  evts.updateEventStatus(evtObj,
                     CWConnectorEventStatusConstants.UNSUBSCRIBED);
                  if (arcProcessed.equalsIgnoreCase(
                        CWConnectorAttrType.TRUESTRING)) {
                     // Archive the event in the application's archive store
                     evts.archiveEvent(evtObj.getEventID());
                     // Delete the event from the event store
                     evts.deleteEvent(evtObj.getEventID());
                  }
                  continue;
               }
               if (stat == CWConnectorConstant.SUCCEED){
                  // Update the event status to SUCCESS.
                  evts.updateEventStatus(evtObj,
                     CWConnectorEventStatusConstants.SUCCESS);
                  if (arcProcessed.equalsIgnoreCase(
                        CWConnectorAttrType.TRUESTRING)) {
                     // Archive the event in the application's archive store
                     evts.archiveEvent(evtObj.getEventID());
                     // Delete the event from the event store
                     evts.deleteEvent(evtObj.getEventID());
                  }
                  continue;
               } else // gotApplEvent returned FAIL
               {
                  CWConnectorUtil.generateAndLogMsg(10532,
                     CWConnectorLogAndTrace.XRD_ERROR,0,0);
                  // Update the event status to ERROR_POSTING_EVENT.
                  evts.updateEventStatus(evtObj,
            CWConnectorEventStatusConstants.ERROR_POSTING_EVENT);
                  // Archive the event if ArchiveProcessed is set
                  // to true.
                  if (arcProcessed.equalsIgnoreCase(
                        CWConnectorAttrType.TRUESTRING)) {
                     // Archive the event in the application's
                     // archive store.
                     evts.archiveEvent(evtObj.getEventID());
                     // Delete the event from the event store.
                     evts.deleteEvent(evtObj.getEventID());
                  }
                  return CWConnectorConstant.FAIL;
               }

            }  else // Event unsubscribed.
            {
               CWConnectorUtil.generateAndLogMsg(10552,
                  CWConnectorLogAndTrace.XRD_ERROR, 0, 0);
               // Update the event status to UNSUBSCRIBED.
               evts.updateEventStatus(evtObj,
                  CWConnectorEventStatusConstants.UNSUBSCRIBED);
               // Archive the event if ArchiveProcessed is set 
               // to true.
               if (arcProcessed.equalsIgnoreCase(
                     CWConnectorAttrType.TRUESTRING)) {
                  // Archive the event in the application's 
                  // archive store.
                  evts.archiveEvent(evtObj.getEventID());
                  // Delete the event from the event store.
                  evts.deleteEvent(evtObj.getEventID());
               }
               continue;
            }

         } else
         {
            CWConnectorUtil.generateAndLogMsg(10552,
               CWConnectorLogAndTrace.XRD_ERROR, 0, 0);
            // Update the event status to UNSUBSCRIBED.
            evts.updateEventStatus(evtObj,
               CWConnectorEventStatusConstants.UNSUBSCRIBED);
            // Archive the event if ArchiveProcessed is set
            // to true.
            if (arcProcessed.equalsIgnoreCase(
                  CWConnectorAttrType.TRUESTRING)) {
               // Archive the event in the application's 
               // archive store.
               evts.archiveEvent(evtObj.getEventID());
               // Delete the event from the event store.
               evts.deleteEvent(evtObj.getEventID());
            }
            continue;
         }
      } //For loop
}
   } catch (StatusChangeFailedException e){
      CWConnectorUtil.generateAndLogMsg(10536,
         CWConnectorLogAndTrace.XRD_ERROR, 0, 2,
         "updateEventStatus","StatusChangeFailedException");
      CWConnectorUtil.logMsg(e.getMessage());
      e.printStackTrace();
      return CWConnectorConstant.APPRESPONSETIMEOUT;
   } catch (InvalidStatusChangeException e){
      CWConnectorUtil.generateAndLogMsg(10536,
         CWConnectorLogAndTrace.XRD_ERROR, 0, 2,
         "updateEventStatus","InvalidStatusChangeException");
      CWConnectorUtil.logMsg(e.getMessage());
      e.printStackTrace();
      return CWConnectorConstant.APPRESPONSETIMEOUT;
   } catch (ArchiveFailedException e){
      CWConnectorUtil.generateAndLogMsg(10536,
         CWConnectorLogAndTrace.XRD_ERROR, 0, 2,
         "archiveEvent","ArchiveFailedException");
      CWConnectorUtil.logMsg(e.getMessage());
      e.printStackTrace();
      return CWConnectorConstant.APPRESPONSETIMEOUT;
   } catch (DeleteFailedException e){
      CWConnectorUtil.generateAndLogMsg(10536,
         CWConnectorLogAndTrace.XRD_ERROR, 0, 2,
         "deleteEvent","DeleteFailedException");
      CWConnectorUtil.logMsg(e.getMessage());
      e.printStackTrace();
      return CWConnectorConstant.APPRESPONSETIMEOUT;
   } catch (AttributeNullValueException e) {
      CWConnectorUtil.generateAndLogMsg(10536,
         CWConnectorLogAndTrace.XRD_ERROR, 0, 2,
         "get method in event store","AttributeNullValueException");
      CWConnectorUtil.logMsg(e.getMessage());
      e.printStackTrace();
      return CWConnectorConstant.FAIL;
   }
   } finally {
      evts.cleanupResources();
   }
   return CWConnectorConstant.SUCCEED;
}

Copyright IBM Corporation 1997, 2004. All Rights Reserved.