omniNotify Logo Free High Performance CORBA Notification Service
from AT&T Laboratories
ATT logo omniORB Home
AT&T Research
Home SF Project Features Release
Notes
Download Instal-
lation
Documen-
tation
Patch &
Bug List
More
Info
Keep in
Touch

Back to Documentation Home Page

Developing Event Consumer Applications

Event consumers receive events from an event channel they are connected with, without any knowledge of the suppliers of those events. omniNotify supports both push and pull event communication for consumers of events. Push consumers have to implement a push operation which is invoked by the event channel when the channel has events to forward to these consumer. On the other hand, pull consumers can retrieve events from the channel by invoking a pull operation on the proxy objects they are connected with.

The event messages sent to a push consumer by an event channel or requested from an event channel by a pull consumer belong to one of three categories:

To develop event consumer applications, you must implement these consumers as normal CORBA applications that communicate with event channels through the CORBA Notification Service IDL interfaces. These IDL definitions are supplied with omniNotify.

In the remainder of this document we will present two example event consumer applications. Both of these applications use CosNotification::StructuredEvent messages, and they differ in the event communication model they implement; one is a push consumer and the other is a pull consumer. In both applications, the following steps have to be executed before any event communication takes place:

  1. Implement an instance of the appropriate consumer interface;
  2. Obtain a reference to a CosNotifyChannelAdmin::EventChannel object in the omniNotify notification server;
  3. Obtain a reference to a CosNotifyChannelAdmin::ConsumerAdmin object using the above reference;
  4. Create an instance of the appropriate proxy object using the above admin reference;
  5. Connect the consumer object created in the first step to the proxy instance of the previous step.

Developing a Push Consumer

Our consumer implements the CosNotifyComm::StructuredPushConsumer interface. This is done by inheriting from the corresponding skeleton class, which is generated by the IDL compiler, and implementing all abstract methods defined in this class. Obtaining a reference to an event channel, creating or using an existing administrative object, creating a new proxy object, and connecting the consumer to the proxy is done in the way presented in the development of a supplier application. The complete listing of the push consumer application is presented below.

// -*- Mode: C++; -*-

#include <iostream.h>
#include "CosNotifyComm.hh"
#include "CosNotifyChannelAdmin.hh"

class StructuredPushConsumer_i :
            public virtual CosNotifyComm::_sk_StructuredPushConsumer {
public:
    StructuredPushConsumer_i() : num_events(0)    {;}
    void disconnect_structured_push_consumer()    {;}
    void offer_change(const CosNotification::EventTypeSeq& added,
                      const CosNotification::EventTypeSeq& removed)
           { CORBA::ULong ix;
             for (ix=0; ix < added.length(); ix++) {
                 cout << "+ " << added[ix].domain_name <<
                         "::" << added[ix].type_name   << endl;
             }
             for (ix=0; ix < removed.length(); ix++) {
                  cout << "- " << removed[ix].domain_name <<
                          "::" << removed[ix].type_name   << endl;
             }
           }
    void push_structured_event(const CosNotification::StructuredEvent& se)
           {
             CORBA::Long val;
             se.remainder_of_body >>= val;
             cout << se.header.fixed_header.event_type.domain_name << "::"
                  << se.header.fixed_header.event_type.type_name   << " : "
                  << val << endl;
             num_events += 1;
           }
    CORBA::Long get_num_events() const   { return num_events; }
private:
    //
The following is used to keep track of the number of events pushed by the event channel
    CORBA::Long num_events;
};

//////////////////////////////////////////////////////////////////////////

int main(int argc, char** argv)
{
  CORBA::Long numEvents = 100;

  try {
     CosNaming::Name                                        name;
     CORBA::Object_var                                      echannel_ref;
     CORBA::Object_var                                      name_service;
     CosNaming::NamingContext_var                           name_context;
     CosNotifyChannelAdmin::AdminID                         saID;
     CosNotifyChannelAdmin::ProxyID                         pxID;
     CosNotifyChannelAdmin::EventChannel_var                channel;
     CosNotifyChannelAdmin::ConsumerAdmin_var               cadmin;
     CosNotifyChannelAdmin::ProxySupplier_var               tmp_supl;
     CosNotifyChannelAdmin::StructuredProxyPushSupplier_var prx_supl;

     CORBA::ORB_var orb = CORBA::ORB_init(argc, argv, "omniORB2");
     CORBA::BOA_var boa = orb->BOA_init(argc, argv, "omniORB2_BOA");
     StructuredPushConsumer_i* consumer = new StructuredPushConsumer_i();

     // Register the consumer object with the undelying ORB

     consumer->_obj_is_ready(boa);

     // Obtain reference to the CORBA Naming Service and resolve 'EventChannel'

     name_service = orb->resolve_initial_references("NameService");
     name_context = CosNaming::NamingContext::_narrow(name_service);
     if ( CORBA::is_nil(name_context) ) {
        cerr << "Failed to obtain context for 'NameService'" << endl;
        return -1;
     }
     name.length(1);
     name[0].id   = CORBA::string_dup("EventChannel");
     name[0].kind = CORBA::string_dup("EventChannel");
     echannel_ref = name_context->resolve(name);
     channel = CosNotifyChannelAdmin::EventChannel::_narrow(echannel_ref);
     if ( CORBA::is_nil(channel) ) {
         cerr << "Failed to obtain reference to 'EventChannel'" << endl;
         return -1;
     }

     // Create new ConsumerAdmin   and proxy objects, and connect the consumer

     cadmin = channel->new_for_consumers(
                          CosNotifyChannelAdmin::AND_OP, saID);
     if ( CORBA::is_nil(cadmin) ) {
        cerr << "Failed to create new ConsumerAdmin object" << endl;
        return -1;
     }
     tmp_supl = cadmin->obtain_notification_push_supplier(
                          CosNotifyChannelAdmin::STRUCTURED_EVENT, pxID);
     prx_supl =
      CosNotifyChannelAdmin::StructuredProxyPushSupplier::_narrow(tmp_supl);
     if ( CORBA::is_nil(prx_supl) ) {
        cerr << "Failed to create PushSupplier Proxy object" << endl;
        return -1;
     }
     prx_supl->connect_structured_push_consumer(consumer);

     // Inform the BOA that we are ready to receive events/callbacks from the event channel

     boa->impl_is_ready(0, 1);

     // Wait until at least numEvents are pushed  -- sleep 5 seconds between checks

     while ( consumer->get_num_events() < numEvents )
        omni_thread::sleep(5);

     // No more events to be consumed. Disconnect the consumer and destroy the admin object

     prx_supl->disconnect_structured_push_supplier();
     cadmin->destroy();
  } catch ( ... ) {
     //
Handle any exception that may have been raised during one of the above calls
     return -1;
  }

  return 0;
}

 


Developing a Pull Consumer

Our consumer implements the CosNotifyComm::StructuredPullConsumer interface. This is done by inheriting from the corresponding skeleton class, which is generated by the IDL compiler, and implementing all abstract methods defined in this class. Obtaining a reference to an event channel, creating or using an existing administrative object, creating a new proxy object, and connecting the consumer to the proxy is done in the way presented in the development of a supplier application. The complete listing of the pull consumer application is presented below.

// -*- Mode: C++; -*-

#include <iostream.h>
#include "CosNotifyComm.hh"
#include "CosNotifyChannelAdmin.hh"

class StructuredPullConsumer_i :
            public virtual CosNotifyComm::_sk_StructuredPullConsumer {
public:
    StructuredPullConsumer_i()                     {;}
    void disconnect_structured_pull_consumer()    {;}
    void offer_change(const CosNotification::EventTypeSeq& added,
                      const CosNotification::EventTypeSeq& removed)
           { CORBA::ULong ix;
             for (ix=0; ix < added.length(); ix++) {
                 cout << "+ " << added[ix].domain_name <<
                         "::" << added[ix].type_name   << endl;
             }
             for (ix=0; ix < removed.length(); ix++) {
                  cout << "- " << removed[ix].domain_name <<
                          "::" << removed[ix].type_name   << endl;
             }
           }
};

//////////////////////////////////////////////////////////////////////////

int main(int argc, char** argv)
{
  CORBA::Long numEvents = 100;

  try {
     CosNaming::Name                                        name;
     CORBA::Object_var                                      echannel_ref;
     CORBA::Object_var                                      name_service;
     CosNaming::NamingContext_var                           name_context;
     CosNotifyChannelAdmin::AdminID                         saID;
     CosNotifyChannelAdmin::ProxyID                         pxID;
     CosNotification::StructuredEvent*                       se;
     CosNotifyChannelAdmin::EventChannel_var                channel;
     CosNotifyChannelAdmin::ConsumerAdmin_var               cadmin;
     CosNotifyChannelAdmin::ProxySupplier_var               tmp_supl;
     CosNotifyChannelAdmin::StructuredProxyPullSupplier_var prx_supl;

     CORBA::ORB_var orb = CORBA::ORB_init(argc, argv, "omniORB2");
     CORBA::BOA_var boa = orb->BOA_init(argc, argv, "omniORB2_BOA");
     StructuredPullConsumer_i* consumer = new StructuredPullConsumer_i();

     // Register the consumer object with the undelying ORB

     consumer->_obj_is_ready(boa);

     // Obtain reference to the CORBA Naming Service and resolve 'EventChannel'

     name_service = orb->resolve_initial_references("NameService");
     name_context = CosNaming::NamingContext::_narrow(name_service);
     if ( CORBA::is_nil(name_context) ) {
        cerr << "Failed to obtain context for NameService" << endl;
        return -1;
     }
     name.length(1);
     name[0].id   = CORBA::string_dup("EventChannel");
     name[0].kind = CORBA::string_dup("EventChannel");
     echannel_ref = name_context->resolve(name);
     channel = CosNotifyChannelAdmin::EventChannel::_narrow(echannel_ref);
     if ( CORBA::is_nil(channel) ) {
         cerr << "Failed to obtain reference to 'EventChannel'" << endl;
         return -1;
     }

     // Create new ConsumerAdmin and proxy objects, and connect the consumer

     cadmin = channel->new_for_consumers(
                          CosNotifyChannelAdmin::AND_OP, saID);
     if ( CORBA::is_nil(cadmin) ) {
        cerr << "Failed to create new ConsumerAdmin object" << endl;
        return -1;
     }
     tmp_supl = cadmin->obtain_notification_pull_supplier(
                          CosNotifyChannelAdmin::STRUCTURED_EVENT, pxID);
     prx_supl =
      CosNotifyChannelAdmin::StructuredProxyPullSupplier::_narrow(tmp_supl);
     if ( CORBA::is_nil(prx_supl) ) {
        cerr << "Failed to create PullSupplier Proxy object" << endl;
        return -1;
     }
     prx_supl->connect_structured_pull_consumer(consumer);

     // Inform the BOA that we are ready to receive events/callbacks from the event channel

     boa->impl_is_ready(0, 1);

     // Pull numEvents structured events from the channel

     for ( CORBA::Long ix = 0; ix < numEvents; ix++ ) {
         CORBA::Long val;
         se = prx_supl->pull_structured_event();
         se->remainder_of_body >>= val;
         cout << se->header.fixed_header.event_type.domain_name << "::"
              << se->header.fixed_header.event_type.type_name   << " : "
              << val << endl;
     }

 

     // No more events to be consumed. Disconnect the consumer and destroy the admin object

     prx_supl->disconnect_structured_pull_supplier();
     cadmin->destroy();
  } catch ( ... ) {
     //
Handle any exception that may have been raised during one of the above calls
     return -1;
  }

  return 0;
}

 

Back to Documentation Home Page


For comments, feedback, etc, please see the 'Keep in touch' page.