omniNotify: Free High Performance CORBA Notification Service from AT&T Laboratories

Developing Event Filtering Applications

In the original CORBA Event Service model, every consumer application connected to an event channel receives all events that are announced to the channel. While this may be desirable in some application domains, it is inefficient when consumers are interested in only a subset of the announced events. The CORBA Notification Service solves this problem: by using filter objects, clients can subscribe to the precise set of events they are interested in receiving.

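Each filter object encapsulates a set of one or more constraints, each of which is specified in a constraint grammar. Each constraint consists of two components: a sequence of event types to which the constraint applies, and a constraint expression, which is a boolean expression over the contents of an event.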

Filter objects implement the CosNotifyFilter::Filter interface. This interface includes two operations that evaluate the constraints associated with a filter against a given event: match_structured and match. The former is used for events of type CosNotification::StructuredEvent and the latter for events of type CORBA::Any. Each operation returns TRUE when at least one of the constraints associated with the filter object evaluates to TRUE, and FALSE otherwise.
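The evaluation rule described above can be pictured with a small stand-alone model. This is only a sketch in plain C++: EventType, Event, Constraint, type_applies, and filter_match are hypothetical names invented for illustration, not omniNotify or CosNotifyFilter types, and a function object stands in for the constraint expression string. Wildcards and empty event type lists are not modelled.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-ins for illustration; not the CosNotification types.
struct EventType {
    std::string domain_name;
    std::string type_name;
};

struct Event {
    EventType type;
    std::map<std::string, double> fields;  // simplified, numeric-only event data
};

// One constraint: the event types it applies to, plus a boolean expression.
struct Constraint {
    std::vector<EventType> event_types;
    std::function<bool(const Event&)> expr;  // stands in for the expression string
};

// True when the event's type is one of the types listed by the constraint.
bool type_applies(const Constraint& c, const Event& e) {
    for (const EventType& t : c.event_types) {
        if (t.domain_name == e.type.domain_name &&
            t.type_name == e.type.type_name) {
            return true;
        }
    }
    return false;
}

// A filter matches when at least one of its constraints evaluates to true.
bool filter_match(const std::vector<Constraint>& constraints, const Event& e) {
    for (const Constraint& c : constraints) {
        if (type_applies(c, e) && c.expr(e)) {
            return true;
        }
    }
    return false;
}
```

In this model, a filter holding two constraints on the same event type, one testing for a price above 25.0 and one for a price below 5.0, matches an event priced at 3.0 through the second constraint alone.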

Filter objects can be associated with both administrative and proxy objects using the add_filter operation of the CosNotifyFilter::FilterAdmin interface, which is inherited by these objects. There is no limit on the number of filters that can be associated with any of them. When multiple filters are associated with an administrative or proxy object, an event is considered to satisfy these filters when the corresponding match operation of at least one of them returns TRUE. Filters associated with an administrative object are inherited by all proxy objects created by it. Consequently, each proxy object may have two sets of filters: those added to it by invoking its add_filter operation and those inherited from its administrative object. These two sets of filters can be combined using AND or OR semantics. The choice is made when the administrative object is created, for example by passing CosNotifyChannelAdmin::AND_OP to new_for_consumers.
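The way the two filter sets combine can be sketched with a stand-alone model. The names Filter, GroupOperator, any_filter_matches, and event_passes are hypothetical and exist only for this illustration; an event is reduced to a single price, and the sketch assumes both sets hold at least one filter. How an empty set is treated is not modelled here and should be checked against the Notification Service specification.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical model; an event is reduced to a single price for brevity.
using Filter = std::function<bool(double)>;

// Named after the AND_OP / OR_OP values passed to new_for_consumers.
enum class GroupOperator { AND_OP, OR_OP };

// Within one set, an event satisfies the filters when at least one matches.
bool any_filter_matches(const std::vector<Filter>& filters, double price) {
    for (const Filter& f : filters) {
        if (f(price)) {
            return true;
        }
    }
    return false;
}

// Combine the admin-level and proxy-level results with AND or OR semantics.
bool event_passes(GroupOperator op,
                  const std::vector<Filter>& admin_filters,
                  const std::vector<Filter>& proxy_filters,
                  double price) {
    const bool admin_ok = any_filter_matches(admin_filters, price);
    const bool proxy_ok = any_filter_matches(proxy_filters, price);
    return op == GroupOperator::AND_OP ? (admin_ok && proxy_ok)
                                       : (admin_ok || proxy_ok);
}
```

With an admin-level filter accepting prices above 25.0 and proxy-level filters accepting prices below 40.0 or above 100.0, a price of 50.375 satisfies only the admin-level set, so in this model it passes under OR_OP but not under AND_OP.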


Developing an Event Filtering Application

Filter objects are created by filter factories. Each omniNotify event channel has a default filter factory that can be used for creating filters using the extended Trader Constraint Language defined in the OMG Notification Service specification. The following code segment shows how to create a filter using the default filter factory.

// -*- Mode: C++; -*-

CosNotifyFilter::Filter_var             filterp;
CosNotifyFilter::FilterFactory_var      fltfact;
CosNotifyChannelAdmin::EventChannel_var channel;

// We assume that we have initialized channel to point to an event channel instance

fltfact = channel->default_filter_factory();
filterp = fltfact->create_filter("EXTENDED_TCL");

Once a filter is created, a sequence of constraints can be added to it. Each constraint in this sequence has the following structure:

struct ConstraintExp {
     CosNotification::EventTypeSeq event_types;
     string                         constraint_expr;
};

The following code segment shows how to add a constraint that evaluates to true for events having domain name "Financial", type name "StockQuote", and an attribute "Price" whose value is greater than 25.0.

// -*- Mode: C++; -*-

CosNotifyFilter::Filter_var            filterp;
CosNotifyFilter::ConstraintExpSeq      cexpr;
CosNotifyFilter::ConstraintInfoSeq_var cinfo;

cexpr.length(1);
cexpr[0].event_types.length(1);
cexpr[0].event_types[0].domain_name = CORBA::string_dup("Financial");
cexpr[0].event_types[0].type_name   = CORBA::string_dup("StockQuote");
cexpr[0].constraint_expr            = CORBA::string_dup("$Price > 25.0");

// We assume that we have initialized filterp to point to a filter object

cinfo = filterp->add_constraints(cexpr);
if ( cinfo->length() != 1 ) {
    cerr << "Failed to add constraint to filter object" << endl;
}

Once the constraint has been added, we can attach the filter object to a proxy object to enable filtering of events. This is done by invoking the add_filter operation on the proxy object. The filter then lets an event through only when a supplier supplies an event that matches its constraint. The code segment below shows how to construct such an event.

// -*- Mode: C++; -*-

CosNotification::StructuredEvent se;

se.header.fixed_header.event_type.domain_name =
                                CORBA::string_dup("Financial");
se.header.fixed_header.event_type.type_name =
                                CORBA::string_dup("StockQuote");
se.filterable_data.length(2);
se.filterable_data[0].name    = CORBA::string_dup("TickerSymbol");
se.filterable_data[0].value <<= (const char *) "T";
se.filterable_data[1].name    = CORBA::string_dup("Price");
se.filterable_data[1].value <<= (CORBA::Double) 50.375;
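
To see why this event satisfies the constraint "$Price > 25.0", it may help to picture how the name after the "$" is looked up among the filterable data fields. The function below is a toy evaluator written for this page, not the extended Trader Constraint Language implementation: eval_greater_than is a hypothetical name, it accepts only the exact form "$Name > number" with spaces around the operator, and it holds numeric fields only (so the TickerSymbol string is left out).

```cpp
#include <cassert>
#include <map>
#include <sstream>
#include <string>

// Toy evaluator for constraints of the exact form "$Name > number".
// Illustrative only; it does not implement the extended TCL grammar.
bool eval_greater_than(const std::string& constraint,
                       const std::map<std::string, double>& filterable_data) {
    std::istringstream in(constraint);
    std::string name;
    std::string op;
    double threshold = 0.0;
    if (!(in >> name >> op >> threshold)) {
        return false;  // malformed constraint
    }
    if (name.size() < 2 || name[0] != '$' || op != ">") {
        return false;  // form not supported by this sketch
    }
    // Look up the field named after the '$' in the event's filterable data.
    const auto it = filterable_data.find(name.substr(1));
    if (it == filterable_data.end()) {
        return false;  // the event does not carry the named field
    }
    return it->second > threshold;
}
```

Under these assumptions, a field map holding Price = 50.375 makes the constraint true, while Price = 20.0, or an event with no Price field at all, makes it false.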

The complete listing of a push consumer that uses filtering is presented below.

// -*- Mode: C++; -*-

#include <iostream.h>
#include "CosNotifyComm.hh"
#include "CosNotifyChannelAdmin.hh"

class StructuredPushConsumer_i :
            public virtual CosNotifyComm::_sk_StructuredPushConsumer {
public:
    StructuredPushConsumer_i() : num_events(0)    {;}
    void disconnect_structured_push_consumer()    {;}
    void offer_change(const CosNotification::EventTypeSeq& added,
                      const CosNotification::EventTypeSeq& removed)
           { CORBA::ULong ix;
             for (ix=0; ix < added.length(); ix++) {
                 cout << "+ " << added[ix].domain_name <<
                         "::" << added[ix].type_name   << endl;
             }
             for (ix=0; ix < removed.length(); ix++) {
                  cout << "- " << removed[ix].domain_name <<
                          "::" << removed[ix].type_name   << endl;
             }
           }
    void push_structured_event(const CosNotification::StructuredEvent& se)
           {
             CORBA::Double val;
             const char*    ticker;
             // Ignore events that do not carry the expected
             // (TickerSymbol, Price) fields in this order
             if ( se.filterable_data.length() < 2 ||
                  !(se.filterable_data[0].value >>= ticker) ||
                  !(se.filterable_data[1].value >>= val) ) {
                 return;
             }
             cout << se.header.fixed_header.event_type.domain_name << "::"
                  << se.header.fixed_header.event_type.type_name   << " : "
                  << ticker << " price " << val << endl;
             num_events += 1;
           }
    CORBA::Long get_num_events() const   { return num_events; }
private:
    // The following is used to keep track of the number of events pushed by the event channel
    CORBA::Long num_events;
};

//////////////////////////////////////////////////////////////////////////

int main(int argc, char** argv)
{
  CORBA::Long numEvents = 100;

  try {
     CosNaming::Name                                        name;
     CORBA::Object_var                                      echannel_ref;
     CORBA::Object_var                                      name_service;
     CosNaming::NamingContext_var                           name_context;
     CosNotifyChannelAdmin::AdminID                         saID;
     CosNotifyChannelAdmin::ProxyID                         pxID;
     CosNotifyFilter::FilterID                              flID;
     CosNotifyFilter::Filter_var                            filterp;
     CosNotifyFilter::FilterFactory_var                      fltfact;
     CosNotifyFilter::ConstraintExpSeq                      cexpr;
     CosNotifyFilter::ConstraintInfoSeq_var                 cinfo;
     CosNotifyChannelAdmin::EventChannel_var                channel;
     CosNotifyChannelAdmin::ConsumerAdmin_var               cadmin;
     CosNotifyChannelAdmin::ProxySupplier_var               tmp_supl;
     CosNotifyChannelAdmin::StructuredProxyPushSupplier_var prx_supl;

     CORBA::ORB_var orb = CORBA::ORB_init(argc, argv, "omniORB2");
     CORBA::BOA_var boa = orb->BOA_init(argc, argv, "omniORB2_BOA");
     StructuredPushConsumer_i* consumer = new StructuredPushConsumer_i();

     // Register the consumer object with the underlying ORB

     consumer->_obj_is_ready(boa);

     // Obtain reference to the CORBA Naming Service and resolve 'EventChannel'

     name_service = orb->resolve_initial_references("NameService");
     name_context = CosNaming::NamingContext::_narrow(name_service);
     if ( CORBA::is_nil(name_context) ) {
        cerr << "Failed to obtain context for 'NameService'" << endl;
        return -1;
     }
     name.length(1);
     name[0].id   = CORBA::string_dup("EventChannel");
     name[0].kind = CORBA::string_dup("EventChannel");
     echannel_ref = name_context->resolve(name);
     channel = CosNotifyChannelAdmin::EventChannel::_narrow(echannel_ref);
     if ( CORBA::is_nil(channel) ) {
        cerr << "Failed to obtain reference to 'EventChannel'" << endl;
        return -1;
     }

     // Create new ConsumerAdmin and proxy objects

     cadmin = channel->new_for_consumers(
                          CosNotifyChannelAdmin::AND_OP, saID);
     if ( CORBA::is_nil(cadmin) ) {
        cerr << "Failed to create new ConsumerAdmin object" << endl;
        return -1;
     }
     tmp_supl = cadmin->obtain_notification_push_supplier(
                          CosNotifyChannelAdmin::STRUCTURED_EVENT, pxID);
     prx_supl =
      CosNotifyChannelAdmin::StructuredProxyPushSupplier::_narrow(tmp_supl);
     if ( CORBA::is_nil(prx_supl) ) {
        cerr << "Failed to create PushSupplier Proxy object" << endl;
        return -1;
     }

     // Create the filter object, add a constraint to it, and attach it to the proxy

     fltfact = channel->default_filter_factory();
     filterp = fltfact->create_filter("EXTENDED_TCL");
    
     cexpr.length(1);
     cexpr[0].event_types.length(1);
     cexpr[0].event_types[0].domain_name = CORBA::string_dup("Financial");
     cexpr[0].event_types[0].type_name = CORBA::string_dup("StockQuote");
     cexpr[0].constraint_expr = CORBA::string_dup("$Price > 25.0");

     cinfo = filterp->add_constraints(cexpr);
     if ( cinfo->length() != 1 ) {
        cerr << "Failed to add constraint to filter object" << endl;
        return -1;
     }

     flID = prx_supl->add_filter(filterp);

     // Connect the consumer to the proxy and inform the BOA that we can receive events/callbacks

     prx_supl->connect_structured_push_consumer(consumer);

     boa->impl_is_ready(0, 1);

     // Wait until at least numEvents are pushed  -- sleep 5 seconds between checks

     while ( consumer->get_num_events() < numEvents )
        omni_thread::sleep(5);

     // No more events to be consumed. Disconnect the consumer and destroy the admin object

     prx_supl->disconnect_structured_push_supplier();
     cadmin->destroy();
  } catch ( ... ) {
     // Handle any exception that may have been raised during one of the above calls
     return -1;
  }

  return 0;
}

 

