omniNotify: Free High Performance CORBA Notification Service
from AT&T Laboratories

Developing Event Supplier Applications

omniNotify supports both push and pull event communication for suppliers of events. A push supplier sends events to an omniNotify event channel by invoking a push operation on the proxy object it is connected to. A pull supplier implements a pull operation that the event channel invokes to retrieve any events the supplier has available. The event messages sent to an event channel by a push supplier, or requested from a pull supplier by an event channel, belong to one of three categories: untyped CORBA::Any values, CosNotification::StructuredEvent messages, and CosNotification::EventBatch sequences of structured events.

To develop event supplier applications, you must implement these suppliers as normal CORBA applications that communicate with event channels through the CORBA Notification Service IDL interfaces. These IDL definitions are supplied with omniNotify.
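The difference between the two communication models can be sketched without any CORBA machinery. The following is only a toy model under stated assumptions: Event, ToyChannel and ToyPullSupplier are hypothetical names that do not appear in the Notification Service IDL, and the boolean result of try_pull loosely plays the role of the has_event flag of a real pull supplier.

```cpp
// Standalone model of the push and pull styles; plain C++ only, no CORBA.
// All names here (Event, ToyChannel, ToyPullSupplier) are hypothetical.
#include <cassert>
#include <string>
#include <vector>

struct Event {
    std::string type_name;
    long        value;
};

// Stand-in for the channel side of a connection.
class ToyChannel {
public:
    // Push model: the supplier calls into the channel.
    void push(const Event& e) { received_.push_back(e); }

    // Pull model: the channel calls into the supplier until it has nothing left.
    template <typename PullSupplier>
    void drain(PullSupplier& s) {
        Event e;
        while (s.try_pull(e)) received_.push_back(e);
    }

    const std::vector<Event>& received() const { return received_; }

private:
    std::vector<Event> received_;
};

// A pull supplier only answers requests; it never initiates a transfer.
class ToyPullSupplier {
public:
    explicit ToyPullSupplier(long count) : remaining_(count), next_(0) {}

    // Returns false when no event is available, true otherwise.
    bool try_pull(Event& out) {
        if (remaining_ == 0) return false;
        --remaining_;
        out.type_name = "Long";
        out.value     = next_++;
        return true;
    }

private:
    long remaining_;
    long next_;
};

// Push three events, then let the channel pull two more from a pull supplier.
inline std::vector<Event> run_push_then_pull() {
    ToyChannel channel;
    for (long i = 0; i < 3; ++i) {
        Event e;
        e.type_name = "Long";
        e.value     = i;
        channel.push(e);
    }
    ToyPullSupplier supplier(2);
    channel.drain(supplier);
    return channel.received();
}
```

In the push case the supplier decides when an event is transferred; in the pull case the channel does, which is why a pull supplier has to be ready for a request at any time.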

In the remainder of this document we will present two example event supplier applications. Both of these applications use CosNotification::StructuredEvent messages, and they differ in the event communication model they implement; one is a push supplier and the other is a pull supplier. In both applications, the following steps have to be executed before any event communication takes place:

  1. Implement an instance of the appropriate supplier interface;
  2. Obtain a reference to a CosNotifyChannelAdmin::EventChannel object in the omniNotify notification server;
  3. Obtain a reference to a CosNotifyChannelAdmin::SupplierAdmin object using the above reference;
  4. Create an instance of the appropriate proxy object using the above admin reference;
  5. Connect the supplier object created in the first step to the proxy instance of the previous step.
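The order of the five steps above can be illustrated with a small stand-in model. This is a sketch under stated assumptions, not the CosNotifyChannelAdmin API: ToySupplier, ToyProxyConsumer, ToySupplierAdmin and SetupChannel are hypothetical types, and the rule that a proxy accepts only one connection is a simplification chosen for the example.

```cpp
// Toy model of the five setup steps; plain C++, not the CosNotifyChannelAdmin API.
// Every type below is a hypothetical stand-in.
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <vector>

struct ToySupplier { std::string name; };           // step 1: the supplier object

class ToyProxyConsumer {
public:
    ToyProxyConsumer() : supplier_(nullptr) {}

    // Step 5: connect a supplier; a second connection attempt is rejected.
    bool connect(ToySupplier* s) {
        if (supplier_ != nullptr || s == nullptr) return false;
        supplier_ = s;
        return true;
    }
    bool connected() const { return supplier_ != nullptr; }

private:
    ToySupplier* supplier_;
};

class ToySupplierAdmin {
public:
    // Step 4: the admin creates and owns proxies and reports an ID for each.
    ToyProxyConsumer& obtain_proxy(std::size_t& proxy_id) {
        proxies_.push_back(std::unique_ptr<ToyProxyConsumer>(new ToyProxyConsumer()));
        proxy_id = proxies_.size() - 1;
        return *proxies_.back();
    }

private:
    std::vector<std::unique_ptr<ToyProxyConsumer>> proxies_;
};

class SetupChannel {
public:
    // Step 3: the channel creates and owns admins and reports an ID for each.
    ToySupplierAdmin& new_admin(std::size_t& admin_id) {
        admins_.push_back(std::unique_ptr<ToySupplierAdmin>(new ToySupplierAdmin()));
        admin_id = admins_.size() - 1;
        return *admins_.back();
    }

private:
    std::vector<std::unique_ptr<ToySupplierAdmin>> admins_;
};

// Walk through the steps in order; returns true if the chain behaved as expected.
inline bool run_setup_steps() {
    ToySupplier supplier;                                     // step 1
    supplier.name = "demo";
    SetupChannel channel;                                     // step 2 (a real client resolves it remotely)
    std::size_t admin_id = 0, proxy_id = 0;
    ToySupplierAdmin& admin = channel.new_admin(admin_id);    // step 3
    ToyProxyConsumer& proxy = admin.obtain_proxy(proxy_id);   // step 4
    bool first  = proxy.connect(&supplier);                   // step 5
    bool second = proxy.connect(&supplier);                   // rejected: already connected
    return first && !second && proxy.connected();
}
```

Each object is obtained from the one created in the previous step, which is why the steps cannot be reordered.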

Developing a Push Supplier

Our supplier implements the CosNotifyComm::StructuredPushSupplier interface. This is done by inheriting from the corresponding skeleton class, which is generated by the IDL compiler, and implementing all abstract methods defined in this class.

// -*- Mode: C++; -*-

class StructuredPushSupplier_i :
            public virtual CosNotifyComm::_sk_StructuredPushSupplier {
public:
    StructuredPushSupplier_i()                    {;}
    void disconnect_structured_push_supplier()    {;}
    void subscription_change(const CosNotification::EventTypeSeq& added,
                             const CosNotification::EventTypeSeq& removed)
           { CORBA::ULong ix;
             for (ix=0; ix < added.length(); ix++) {
                 cout << "+ " << added[ix].domain_name <<
                         "::" << added[ix].type_name   << endl;
             }
             for (ix=0; ix < removed.length(); ix++) {
                  cout << "- " << removed[ix].domain_name <<
                          "::" << removed[ix].type_name   << endl;
             }
           }
};

We can locate a reference to the default event channel created by the omniNotify notification server using the CORBA Naming Service. The code segment below illustrates how this is done, assuming that the name of the default channel is "EventChannel" and the name of the Naming Service is "NameService".

// -*- Mode: C++; -*-

CORBA::ORB_var                          orb;
CosNaming::Name                         name;
CORBA::Object_var                       echannel_ref;
CORBA::Object_var                       name_service;
CosNaming::NamingContext_var            name_context;
CosNotifyChannelAdmin::EventChannel_var channel;

// The following assumes that the orb reference is initialized.

name_service = orb->resolve_initial_references("NameService");
name_context = CosNaming::NamingContext::_narrow(name_service);
name.length(1);
name[0].id   = CORBA::string_dup("EventChannel");
name[0].kind = CORBA::string_dup("EventChannel");
echannel_ref = name_context->resolve(name);
channel      = CosNotifyChannelAdmin::EventChannel::_narrow(echannel_ref);

The following code segment illustrates how we can connect the event supplier to the channel. First, we need to obtain a reference to a CosNotifyChannelAdmin::SupplierAdmin object. This can be done by either requesting the creation of a new object or retrieving a reference to the default SupplierAdmin object that is created by each channel. The ID of this object is 0. Next, we create a StructuredProxyPushConsumer object using the SupplierAdmin reference from the previous step. Once this is done, we connect the supplier instance to this proxy object and start supplying structured events to the channel.

// -*- Mode: C++; -*-

CosNotifyChannelAdmin::AdminID                         saID;
CosNotifyChannelAdmin::ProxyID                         pxID;
CosNotifyChannelAdmin::SupplierAdmin_var               sadmin;
CosNotifyChannelAdmin::ProxyConsumer_var               tmp_cons;
CosNotifyChannelAdmin::StructuredProxyPushConsumer_var prx_cons;

StructuredPushSupplier_i* supplier = new StructuredPushSupplier_i();

sadmin   = channel->new_for_suppliers(CosNotifyChannelAdmin::AND_OP, saID);
tmp_cons = sadmin->obtain_notification_push_consumer(
                             CosNotifyChannelAdmin::STRUCTURED_EVENT, pxID);
prx_cons =
     CosNotifyChannelAdmin::StructuredProxyPushConsumer::_narrow(tmp_cons);
prx_cons->connect_structured_push_supplier(supplier);

The following code segment illustrates the transfer of structured events to the channel. The structure of these events is very simple: the domain name is set to "READY" and the type name is set to "Long". The payload of each event is stored in the remainder_of_body field and contains the value of a counter of type CORBA::Long. Here, the supplier pushes numEvents events to the channel without carrying out any other operation between successive pushes.

// -*- Mode: C++; -*-

CosNotification::StructuredEvent se;

se.header.fixed_header.event_type.domain_name = CORBA::string_dup("READY");
se.header.fixed_header.event_type.type_name   = CORBA::string_dup("Long");
se.header.fixed_header.event_name             = CORBA::string_dup("");
se.header.variable_header.length(0);
se.filterable_data.length(0);

for (CORBA::Long i = 0; i < numEvents; i++) {
     se.remainder_of_body <<= i;
     prx_cons->push_structured_event( se );
}
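To make the layout of the event easier to see, the fields set above can be mirrored in a plain C++ struct. This is a rough sketch only: ToyStructuredEvent is a hypothetical type, not the class generated from the CosNotification IDL, its string pairs are simplified stand-ins for the name/value sequences, and a plain long replaces the CORBA::Any payload.

```cpp
// Plain C++ mirror of the structured event fields used above; no CORBA types.
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in, NOT the IDL-generated CosNotification::StructuredEvent.
struct ToyStructuredEvent {
    std::string domain_name;   // header.fixed_header.event_type.domain_name
    std::string type_name;     // header.fixed_header.event_type.type_name
    std::string event_name;    // header.fixed_header.event_name
    std::vector<std::pair<std::string, std::string>> variable_header;  // left empty here
    std::vector<std::pair<std::string, std::string>> filterable_data;  // left empty here
    long remainder_of_body;    // stands in for the CORBA::Any payload
};

// Fill in the constant part once, as the code above does before its loop.
inline ToyStructuredEvent make_template() {
    ToyStructuredEvent se;
    se.domain_name       = "READY";
    se.type_name         = "Long";
    se.event_name        = "";
    se.remainder_of_body = 0;
    return se;
}

// Only the payload changes from one event to the next.
inline std::vector<ToyStructuredEvent> make_events(long num_events) {
    ToyStructuredEvent se = make_template();
    std::vector<ToyStructuredEvent> out;
    for (long i = 0; i < num_events; ++i) {
        se.remainder_of_body = i;
        out.push_back(se);
    }
    return out;
}

// Same "domain::type" form that subscription_change prints.
inline std::string event_type_key(const ToyStructuredEvent& e) {
    return e.domain_name + "::" + e.type_name;
}
```

Reusing one event object and overwriting only the payload avoids rebuilding the header for every push, which is the same choice the code above makes.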

The complete listing of the push supplier application is presented below, assuming that omniORB is being used.

// -*- Mode: C++; -*-

#include <iostream.h>
#include "CosNotifyComm.hh"
#include "CosNotifyChannelAdmin.hh"

class StructuredPushSupplier_i :
            public virtual CosNotifyComm::_sk_StructuredPushSupplier {
public:
    StructuredPushSupplier_i()                    {;}
    void disconnect_structured_push_supplier()    {;}
    void subscription_change(const CosNotification::EventTypeSeq& added,
                             const CosNotification::EventTypeSeq& removed)
           { CORBA::ULong ix;
             for (ix=0; ix < added.length(); ix++) {
                 cout << "+ " << added[ix].domain_name <<
                         "::" << added[ix].type_name   << endl;
             }
             for (ix=0; ix < removed.length(); ix++) {
                  cout << "- " << removed[ix].domain_name <<
                          "::" << removed[ix].type_name   << endl;
             }
           }
};

//////////////////////////////////////////////////////////////////////////

int main(int argc, char** argv)
{
  CORBA::Long numEvents = 100;

  try {
     CosNaming::Name                                        name;
     CORBA::Object_var                                      echannel_ref;
     CORBA::Object_var                                      name_service;
     CosNaming::NamingContext_var                           name_context;
     CosNotifyChannelAdmin::AdminID                         saID;
     CosNotifyChannelAdmin::ProxyID                         pxID;
     CosNotification::StructuredEvent                       se;
     CosNotifyChannelAdmin::EventChannel_var                channel;
     CosNotifyChannelAdmin::SupplierAdmin_var               sadmin;
     CosNotifyChannelAdmin::ProxyConsumer_var               tmp_cons;
     CosNotifyChannelAdmin::StructuredProxyPushConsumer_var prx_cons;

     CORBA::ORB_var orb = CORBA::ORB_init(argc, argv, "omniORB2");
     CORBA::BOA_var boa = orb->BOA_init(argc, argv, "omniORB2_BOA");
     StructuredPushSupplier_i* supplier = new StructuredPushSupplier_i();

     // Register the supplier object with the ORB

     supplier->_obj_is_ready(boa);

     // Obtain reference to the CORBA Naming Service and resolve 'EventChannel'

     name_service = orb->resolve_initial_references("NameService");
     name_context = CosNaming::NamingContext::_narrow(name_service);
     if ( CORBA::is_nil(name_context) ) {
        cerr << "Failed to obtain context for 'NameService'" << endl;
        return -1;
     }
     name.length(1);
     name[0].id   = CORBA::string_dup("EventChannel");
     name[0].kind = CORBA::string_dup("EventChannel");
     echannel_ref = name_context->resolve(name);
     channel = CosNotifyChannelAdmin::EventChannel::_narrow(echannel_ref);
     if ( CORBA::is_nil(channel) ) {
         cerr << "Failed to obtain reference to 'EventChannel'" << endl;
         return -1;
     }

     // Create new SupplierAdmin and proxy objects, and connect the supplier

     sadmin = channel->new_for_suppliers(
                          CosNotifyChannelAdmin::AND_OP, saID);
     if ( CORBA::is_nil(sadmin) ) {
        cerr << "Failed to create new SupplierAdmin object" << endl;
        return -1;
     }
     tmp_cons = sadmin->obtain_notification_push_consumer(
                          CosNotifyChannelAdmin::STRUCTURED_EVENT, pxID);
     prx_cons =
     CosNotifyChannelAdmin::StructuredProxyPushConsumer::_narrow(tmp_cons);
     if ( CORBA::is_nil(prx_cons) ) {
        cerr << "Failed to create PushConsumer Proxy object" << endl;
        return -1;
     }
     prx_cons->connect_structured_push_supplier(supplier);

     // Inform the BOA that we are ready to receive callbacks from the event channel, if any

     boa->impl_is_ready(0, 1);

     // Populate the common fields of each structured event that will be supplied

     se.header.fixed_header.event_type.domain_name =
                                         CORBA::string_dup("READY");
     se.header.fixed_header.event_type.type_name   =
                                         CORBA::string_dup("Long");
     se.header.fixed_header.event_name = CORBA::string_dup("");
     se.header.variable_header.length(0);
     se.filterable_data.length(0);

     for (CORBA::Long i = 0; i < numEvents; i++) {
        se.remainder_of_body <<= i;
        prx_cons->push_structured_event( se );
     }

     // No more events to be supplied. Disconnect the supplier and destroy the admin object

     prx_cons->disconnect_structured_push_consumer();
     sadmin->destroy();
  } catch ( ... ) {
     // Handle any exception that may have been raised during one of the above calls
     return -1;
  }

  return 0;
}

 


Developing a Pull Supplier

The development of a pull event supplier is similar to that of a push event supplier. The main difference is that the supplier implements the CosNotifyComm::StructuredPullSupplier interface, whose pull_structured_event and try_pull_structured_event operations are invoked by the event channel. The StructuredPullSupplier_i class in the listing that follows is a simple implementation of this interface.

Since the event channel may invoke the pull_structured_event and try_pull_structured_event methods of the pull supplier at any time between the connection and disconnection times, the implementation of the pull supplier application should be prepared to handle such asynchronous callback invocations. The complete listing of a simple pull supplier application is presented below.

// -*- Mode: C++; -*-

#include <iostream.h>
#include "CosNotifyComm.hh"
#include "CosNotifyChannelAdmin.hh"

CORBA::Long numEvents = 100;        // Remaining number of events to be pulled

class StructuredPullSupplier_i :
            public virtual CosNotifyComm::_sk_StructuredPullSupplier {
public:
    StructuredPullSupplier_i() : next_number(0)   {;}
    void disconnect_structured_pull_supplier()    {;}
    void subscription_change(const CosNotification::EventTypeSeq& added,
                             const CosNotification::EventTypeSeq& removed)
         {
           CORBA::ULong ix;
           for (ix=0; ix < added.length(); ix++) {
               cout << "+ " << added[ix].domain_name <<
                       "::" << added[ix].type_name   << endl;
           }
           for (ix=0; ix < removed.length(); ix++) {
                cout << "- " << removed[ix].domain_name <<
                        "::" << removed[ix].type_name   << endl;
           }
         }
    CosNotification::StructuredEvent* pull_structured_event()
         {
            CosNotification::StructuredEvent* se =
                                 new CosNotification::StructuredEvent;
            if ( numEvents == 0 )
                return se;
            se->header.fixed_header.event_type.domain_name =
                                 CORBA::string_dup("READY");
            se->header.fixed_header.event_type.type_name =
                                 CORBA::string_dup("Long");
            se->header.fixed_header.event_name = CORBA::string_dup("");
            se->header.variable_header.length(0);
            se->filterable_data.length(0);
            se->remainder_of_body <<= next_number++;
            numEvents -= 1;
            return se;
         }
    CosNotification::StructuredEvent* try_pull_structured_event(
                                            CORBA::Boolean& has_event)
         {
            has_event = (numEvents == 0) ? 0 : 1;
            return pull_structured_event();
         }
private:
  CORBA::Long next_number;
};

//////////////////////////////////////////////////////////////////////////

int main(int argc, char** argv)
{
  try {
     CosNaming::Name                                        name;
     CORBA::Object_var                                      echannel_ref;
     CORBA::Object_var                                      name_service;
     CosNaming::NamingContext_var                           name_context;
     CosNotifyChannelAdmin::AdminID                         saID;
     CosNotifyChannelAdmin::ProxyID                         pxID;
     CosNotification::StructuredEvent                       se;
     CosNotifyChannelAdmin::EventChannel_var                channel;
     CosNotifyChannelAdmin::SupplierAdmin_var               sadmin;
     CosNotifyChannelAdmin::ProxyConsumer_var               tmp_cons;
     CosNotifyChannelAdmin::StructuredProxyPullConsumer_var prx_cons;

     CORBA::ORB_var orb = CORBA::ORB_init(argc, argv, "omniORB2");
     CORBA::BOA_var boa = orb->BOA_init(argc, argv, "omniORB2_BOA");
     StructuredPullSupplier_i* supplier = new StructuredPullSupplier_i();

     // Register the supplier object with the ORB

     supplier->_obj_is_ready(boa);

     // Obtain reference to the CORBA Naming Service and resolve 'EventChannel'

     name_service = orb->resolve_initial_references("NameService");
     name_context = CosNaming::NamingContext::_narrow(name_service);
     if ( CORBA::is_nil(name_context) ) {
        cerr << "Failed to obtain context for 'NameService'" << endl;
        return -1;
     }
     name.length(1);
     name[0].id   = CORBA::string_dup("EventChannel");
     name[0].kind = CORBA::string_dup("EventChannel");
     echannel_ref = name_context->resolve(name);
     channel = CosNotifyChannelAdmin::EventChannel::_narrow(echannel_ref);
     if ( CORBA::is_nil(channel) ) {
         cerr << "Failed to obtain reference to 'EventChannel'" << endl;
         return -1;
     }

     // Create a new SupplierAdmin object, create a proxy object, and connect the supplier

     sadmin = channel->new_for_suppliers(
                          CosNotifyChannelAdmin::AND_OP, saID);
     if ( CORBA::is_nil(sadmin) ) {
        cerr << "Failed to create new SupplierAdmin object" << endl;
        return -1;
     }
     tmp_cons = sadmin->obtain_notification_pull_consumer(
                          CosNotifyChannelAdmin::STRUCTURED_EVENT, pxID);
     prx_cons =
     CosNotifyChannelAdmin::StructuredProxyPullConsumer::_narrow(tmp_cons);
     if ( CORBA::is_nil(prx_cons) ) {
        cerr << "Failed to create PullConsumer Proxy object" << endl;
        return -1;
     }
     prx_cons->connect_structured_pull_supplier(supplier);

     // Inform the BOA that we are ready to receive callbacks from the event channel, if any

     boa->impl_is_ready(0, 1);

     // Wait until there are no more events to be pulled -- sleep 5 seconds between checks

     while ( numEvents != 0 )
        omni_thread::sleep(5);

     // No more events to be supplied. Disconnect the supplier and destroy the admin object

     prx_cons->disconnect_structured_pull_consumer();
     sadmin->destroy();
  } catch ( ... ) {
     // Handle any exception that may have been raised during one of the above calls
     return -1;
  }

  return 0;
}
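In the listing above, numEvents is read by the main thread and modified inside the pull callbacks, and try_pull_structured_event checks the counter before a separate call decrements it. If the ORB dispatches callbacks on other threads, one way to handle such asynchronous invocations is to guard the counter with a mutex so that the check and the decrement happen as one step. The sketch below is only an illustration under that assumption; PullCounter is a hypothetical helper that is not part of omniNotify or omniORB, and it uses the C++11 standard library rather than omni_thread.

```cpp
// Mutex-protected event counter for a pull supplier; plain C++11, no CORBA.
#include <cassert>
#include <mutex>

// Hypothetical helper, not part of omniNotify or omniORB.
class PullCounter {
public:
    explicit PullCounter(long total) : remaining_(total), next_(0) {}

    // Atomically checks for an available event and claims it.
    // Returns true and stores the next counter value in 'value' on success;
    // returns false and leaves 'value' untouched when nothing is left.
    bool try_take(long& value) {
        std::lock_guard<std::mutex> lock(mu_);
        if (remaining_ == 0) return false;
        --remaining_;
        value = next_++;
        return true;
    }

    // Number of events not yet handed out; safe to poll from another thread.
    long remaining() const {
        std::lock_guard<std::mutex> lock(mu_);
        return remaining_;
    }

private:
    mutable std::mutex mu_;   // guards remaining_ and next_
    long remaining_;
    long next_;
};
```

With a helper like this, both pull operations could call try_take to decide whether to fill in an event, and the wait loop in main could poll remaining() instead of reading the global directly.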

 

