omniNotify: Free High Performance CORBA Notification Service
from AT&T Laboratories

Developing Push Suppliers

This section goes over the details of the StructuredPushSupplier_i class found in sample_clients.h / sample_clients.cc.  This class does all the real work of the example program struct_push_supplier.cc.  (As discussed in the Common Code section, the main program creates a StructuredPushSupplier_i object using the class's create function, tells the new client object to connect to its proxy using the connect method, and then waits for the desired number of events to be supplied using the wait_done method.)

This section assumes you have read the section covering class StructuredPushConsumer_i. All the client code has the same basic structure, so we do not repeat things here that are covered in that section.

The rest of this section covers the class declaration (in sample_clients.h) and the class implementation (in sample_clients.cc).


Class Declaration (sample_clients.h)

Here is the beginning of the class declaration:


class StructuredPushSupplier_i :
  public GenericWorker,
  public POA_CosNotifyComm::StructuredPushSupplier,
  public PortableServer::RefCountServantBase

A push supplier of structured events must implement the CosNotifyComm::StructuredPushSupplier interface.  Thus, class StructuredPushSupplier_i inherits from the POA skeleton class POA_CosNotifyComm::StructuredPushSupplier.  It also inherits from the "mix-in" class PortableServer::RefCountServantBase to enable automatic object disposal based on reference counting: the POA will destroy the object when its reference count reaches zero. Finally, the class inherits from class GenericWorker. GenericWorker is defined at the top of sample_clients.h. Any class that inherits from GenericWorker must implement the start_working method, which will run in its own thread. As we'll see below, start_working will be used to push events to the notification channel.

The constructor, create function, and most of the methods are the same as in the declaration for StructuredPushConsumer_i, with these exceptions.  First, _my_proxy has type CosNA_StructuredProxyPushConsumer_var (a structured push supplier must connect to a structured proxy push consumer). Second, rather than having an offer_change method, suppliers have a subscription_change method. (The default change function for all suppliers is sample_subscription_change_fn.) Third, the create call takes a function used for supplying structured events, instead of a function for consuming events. The default supply function is sample_supply_structured_fn, and is found in sample_functions.cc. Finally, there is no push method: a push supplier does not implement one. Instead, the proxy provides the method push_structured_event, which the supplier calls to push events to the channel.

The following is the complete class declaration.

class StructuredPushSupplier_i :
  public GenericWorker,
  public POA_CosNotifyComm::StructuredPushSupplier,
  public PortableServer::RefCountServantBase
{
public:
  StructuredPushSupplier_i(CosNA_StructuredProxyPushConsumer_ptr proxy,
                           CORBA::ULong max_events,
                           const char* objnm,
                           supply_structured_fn* supply_fn,
                           type_change_fn* change_fn,
                           CORBA::ULong millisecs,
                           CORBA::Boolean verbose);

  static StructuredPushSupplier_i*
    create(CORBA::ORB_ptr orb,
           CosNA_EventChannel_ptr channel,
           CORBA::ULong max_events = 0,
           CORBA::ULong batch_size = 0,
           const char* objnm = "StructuredPushSupplier_i",
           const char* proxy_ior_file = "",
           supply_structured_fn* supply_fn = sample_supply_structured_fn,
           type_change_fn* change_fn = sample_subscription_change_fn,
           CosN_EventTypeSeq* evs_ptr = 0,
           const char* constraint_expr = "",
           CORBA::ULong millisecs = 0,
           CORBA::Boolean verbose = 0);

  // IDL methods
  void disconnect_structured_push_supplier();
  void subscription_change(const CosN_EventTypeSeq& added,
                           const CosN_EventTypeSeq& deled);
  // Local methods
  CORBA::Boolean connect();
  CORBA::Boolean wait_done();
  void cleanup();
  void* start_working(void *);

protected:
  CosNA_StructuredProxyPushConsumer_var   _my_proxy;
  const char*                             _obj_name;
  supply_structured_fn*                   _supply_fn;
  type_change_fn*                         _change_fn;
  CORBA::ULong                            _num_events;
  CORBA::ULong                            _max_events;
  CORBA::ULong                            _millisecs;
  CORBA::Boolean                          _verbose;
  CORBA::Boolean                          _done;
  CORBA::Boolean                          _com_err;
  omni_mutex                              _oplock;
  omni_condition                          _finish;
  GenericBoundWorkerThread*               _worker;
};

 


Class Implementation (sample_clients.cc)

The push supplier classes are very similar in structure to the pull consumer classes -- both use a worker thread to either pull events from the channel or push events to the channel.  For this class, events are pushed using the proxy's push_structured_event method.  The supplier performs this call in a worker thread, pausing for _millisecs milliseconds between each push call.  (A setting of zero means push as fast as possible.)  Because this worker thread is constantly communicating with the channel, there is no need for a separate 'ping thread' to monitor the liveness of the proxy.

start_working Method

As with the pull consumer, the key method is the start_working method:


void* StructuredPushSupplier_i::start_working(void *) {
  CosNC_StructuredPushSupplier_var bump_my_refcount_during_outcalls = _this();

  unsigned long t_secs = 0, t_nanosecs = 0;
  unsigned long d_secs = 0, d_nanosecs = 0;
  if (_millisecs) {
    d_secs = _millisecs / 1000;
    d_nanosecs = (_millisecs % 1000) * 1000000;
  }
  CosN_StructuredEvent* data = new CosN_StructuredEvent;
  _oplock.lock();
  if (_verbose && (!_done))
    cout << _obj_name << ": Spawned thread entering main push loop" << endl;
  // invariants: _oplock is held entering top of loop and
  // also whenever we break out of loop
  // (it is *not* held during push_structured_event(), yield() and timedwait() calls)
  while ( 1 ) {
    if (_done) break; // must have disconnected
    if ((*_supply_fn)(*data, _obj_name, _num_events+1, _verbose)) {
      _oplock.unlock(); // do not hold oplock across push
      try {
        _my_proxy->push_structured_event(*data);
      } catch (...) {
        _oplock.lock();
        if (_done) break; // must have disconnected during push
        if (_verbose)
          cout << _obj_name << ": communication error while calling push_structured_event()" << endl;
        _com_err = 1;
        break; // break from while loop -- done
      }
      _oplock.lock();
      if (_done) break; // must have disconnected during push
      _num_events++;
      if (_max_events && (_num_events >= _max_events)) {
        if (_verbose)
          cout << _obj_name << ": DONE [max_events reached]" << endl;
        break; // done
      }
    }
    // wait or yield then try again
    if (_millisecs) { // sleep for specified interval
      omni_thread::get_time(&t_secs, &t_nanosecs, d_secs, d_nanosecs);
      _finish.timedwait(t_secs, t_nanosecs); // releases _oplock during wait
    } else { // use yield to let other threads have a shot at _oplock
      _oplock.unlock();
      omni_thread::yield();
      _oplock.lock();
    }
    // continue pushing
  }
  // done
  delete data;
  if (!_done) {
    _done = 1;
    _finish.broadcast();
  }
  _oplock.unlock();
  return 0;
}

 

start_working enters a while loop, while(1), which loops until a break statement is executed.   At the top of the loop, _oplock is always held.  Each time through the loop, the lock is released, push_structured_event is called, then the lock is reacquired.  As with a ping loop, it is important to release the lock across a call to a remote object so that other methods can execute during this call.  
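The locking discipline described above can be sketched with standard C++ primitives. This is only an illustration under stated assumptions: PushLoop, run, and remote_call_ are hypothetical names, std::mutex stands in for omni_mutex, and a std::function stands in for the remote push_structured_event call.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical stand-in for the supplier's push loop (not omniNotify code).
class PushLoop {
public:
  explicit PushLoop(std::function<void()> remote_call)
    : remote_call_(std::move(remote_call)) {
    assert(static_cast<bool>(remote_call_));  // a callable is required
  }

  // Plays the role of another method that needs the lock, such as a
  // disconnect request arriving while a push is in progress.
  void disconnect() {
    std::lock_guard<std::mutex> l(oplock_);
    done_ = true;
  }

  // Returns the number of completed pushes. As in the sample code,
  // max_events == 0 means "no limit".
  unsigned long run(unsigned long max_events) {
    std::unique_lock<std::mutex> l(oplock_);  // lock held at top of loop
    unsigned long num_events = 0;
    while (!done_) {
      l.unlock();        // do not hold the lock across the outcall
      remote_call_();
      l.lock();          // reacquire before touching shared state
      if (done_) break;  // disconnected during the call
      ++num_events;
      if (max_events && num_events >= max_events) break;
    }
    return num_events;
  }

private:
  std::mutex oplock_;
  bool done_ = false;
  std::function<void()> remote_call_;
};
```

If run held the lock while invoking remote_call_, a disconnect arriving during the call would block until the call returned (and would deadlock if it were issued from within the call itself). Releasing the lock lets such methods proceed.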

There are three things that cause a break from the while loop:  _done has been set to 1 by another method (such as disconnect_structured_push_supplier); the target event count is reached; or an error occurs during a push call.  In the latter two cases, the code following the while loop sets _done to true and signals the _finish condition variable using broadcast.  This wakes any thread waiting on that condition (see wait_done).  When an error occurs during a push call, _com_err is also set to true before the break statement executes.
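The interplay between the _done flag, the broadcast, and wait_done can be sketched with std::mutex and std::condition_variable in place of omni_mutex and omni_condition. The class name Completion and its finish method are hypothetical. The sketch folds the "set _com_err, then set _done and broadcast" steps into one call for brevity, which is a simplification of the sample code.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical helper showing the done-flag / broadcast / wait pattern.
class Completion {
public:
  // Worker side: record whether an error ended the loop, then mark the
  // work as done and wake every waiting thread (only the first time).
  void finish(bool com_err) {
    std::lock_guard<std::mutex> l(oplock_);
    if (com_err) com_err_ = true;
    if (!done_) {
      done_ = true;
      finish_.notify_all();  // counterpart of _finish.broadcast()
    }
  }

  // Main-program side: block until done, then report the error flag.
  bool wait_done() {
    std::unique_lock<std::mutex> l(oplock_);
    while (!done_) {
      finish_.wait(l);  // releases the lock while waiting
    }
    assert(l.owns_lock());  // wait() returns with the lock reacquired
    return com_err_;
  }

private:
  std::mutex oplock_;
  std::condition_variable finish_;
  bool done_ = false;
  bool com_err_ = false;
};
```

Re-checking done_ in a loop around the wait guards against spurious wakeups and against the case where finish runs before wait_done is ever called.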

Prior to performing a push, _supply_fn is used to obtain the structured event to be pushed to the channel.  (The default function, sample_supply_structured_fn, provides events of type Stock::Ticker and Bond::Ticker with stock and bond price quotes.  See sample_functions.cc.)  The function need not return an event; if it does not, the push call is skipped and the loop continues (after yielding or waiting, as discussed below).  For each successful push, the method increments the count of the number of events supplied, _num_events.  If _max_events is non-zero and _num_events has reached _max_events, a break occurs.
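To illustrate how a supply function that sometimes declines to supply an event interacts with the event count, here is a self-contained sketch. Everything in it is a stand-in: Event replaces CosN_StructuredEvent, the supply_fn typedef is only inferred from the call in start_working (the real supply_structured_fn type may differ), and alternating_supply_fn and collect are hypothetical names.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for CosN_StructuredEvent.
struct Event {
  std::string domain_name;
  std::string type_name;
  double price = 0.0;
};

// Assumed shape of a supply function: fill in 'event' and return true,
// or return false to supply nothing on this request.
typedef bool supply_fn(Event& event, const char* obj_name,
                       unsigned long event_num, bool verbose);

// Hypothetical supply function: alternates Stock and Bond Ticker events
// and declines every third request.
bool alternating_supply_fn(Event& event, const char* /*obj_name*/,
                           unsigned long event_num, bool /*verbose*/) {
  static unsigned long requests = 0;
  ++requests;
  if (requests % 3 == 0) return false;  // nothing to supply this time
  event.domain_name = (event_num % 2 == 1) ? "Stock" : "Bond";
  event.type_name = "Ticker";
  event.price = 100.0 + static_cast<double>(event_num);
  return true;
}

// Simplified loop: gathers events instead of pushing them to a proxy.
// Unlike the sample code, max_events == 0 here means "collect nothing".
std::vector<Event> collect(supply_fn* fn, unsigned long max_events) {
  assert(fn != nullptr);
  std::vector<Event> pushed;
  Event data;
  unsigned long num_events = 0;
  while (num_events < max_events) {
    if ((*fn)(data, "sketch", num_events + 1, false)) {
      pushed.push_back(data);  // stands in for push_structured_event
      ++num_events;            // counted only after a successful push
    }
    // When fn returns false, nothing is pushed and the loop retries.
  }
  return pushed;
}
```

Because the count advances only after a push, a declined request leaves the event number unchanged, so the next request is made with the same event number.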

At the bottom of the while loop, if _millisecs is zero then omni_thread::yield() is called; otherwise timedwait is called on _finish with the specified number of milliseconds (converted to an absolute seconds/nanoseconds time value).  The yield ensures that the thread running the start_working method does not 'hog' the processor: it gives other methods that need _oplock a chance to run.  (Since start_working releases the lock on each push, the yield call may not be strictly necessary.)
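The millisecond-to-deadline arithmetic can be shown in isolation. In the sample code the absolute time comes from omni_thread::get_time. The sketch below only mimics the arithmetic with hypothetical names (TimeSpec, split_millis, absolute_deadline) and takes the current time as a parameter rather than reading a clock.

```cpp
#include <cassert>

// A seconds/nanoseconds pair; nanosecs is kept below one billion.
struct TimeSpec {
  unsigned long secs;
  unsigned long nanosecs;
};

const unsigned long NANOS_PER_SEC = 1000000000UL;

// Split a millisecond interval into whole seconds plus leftover
// nanoseconds, mirroring the d_secs / d_nanosecs computation.
TimeSpec split_millis(unsigned long millisecs) {
  TimeSpec t;
  t.secs = millisecs / 1000;
  t.nanosecs = (millisecs % 1000) * 1000000UL;
  return t;
}

// Add a relative interval to "now" to obtain an absolute deadline,
// carrying any nanosecond overflow into the seconds field.
TimeSpec absolute_deadline(TimeSpec now, TimeSpec delta) {
  assert(now.nanosecs < NANOS_PER_SEC && delta.nanosecs < NANOS_PER_SEC);
  TimeSpec t;
  t.secs = now.secs + delta.secs;
  t.nanosecs = now.nanosecs + delta.nanosecs;
  if (t.nanosecs >= NANOS_PER_SEC) {
    t.nanosecs -= NANOS_PER_SEC;
    t.secs += 1;
  }
  return t;
}
```

For example, an interval of 1500 milliseconds splits into 1 second and 500000000 nanoseconds. Added to a current time of 10 seconds and 900000000 nanoseconds, it gives a deadline of 12 seconds and 400000000 nanoseconds.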

Complete Implementation Listing

The complete listing of the class implementation is presented below.

StructuredPushSupplier_i::
StructuredPushSupplier_i(CosNA_StructuredProxyPushConsumer_ptr proxy,
                         CORBA::ULong max_events, const char* objnm,
                         supply_structured_fn* supply_fn, type_change_fn* change_fn,
                         CORBA::ULong millisecs, CORBA::Boolean verbose) :
  _my_proxy(proxy), _obj_name(objnm), _supply_fn(supply_fn), _change_fn(change_fn),
  _num_events(0), _max_events(max_events),
  _millisecs(millisecs), _verbose(verbose), _done(0), _com_err(0),
  _oplock(), _finish(&_oplock), _worker(0)
{
  // providing explicit NULL for supply_fn is not OK -- must have a valid function
  if (!_supply_fn) _supply_fn = sample_supply_structured_fn;
}

StructuredPushSupplier_i*
StructuredPushSupplier_i::create(CORBA::ORB_ptr orb,
                                  CosNA_EventChannel_ptr channel,
                                  CORBA::ULong max_events,
                                  CORBA::ULong batch_size,
                                  const char* objnm,
                                  const char* proxy_ior_file,
                                  supply_structured_fn* supply_fn,
                                  type_change_fn* change_fn,
                                  CosN_EventTypeSeq* evs_ptr,
                                  const char* constraint_expr,
                                  CORBA::ULong millisecs,
                                  CORBA::Boolean verbose)
{
  // Obtain appropriate proxy object
  CosNA_ProxyConsumer_var generic_proxy =
    get_proxy_consumer(channel, CosNA_STRUCTURED_EVENT, 1, verbose); // 1 means push 0 means pull
  CosNA_StructuredProxyPushConsumer_ptr proxy =
    CosNA_StructuredProxyPushConsumer::_narrow(generic_proxy);
  if ( CORBA::is_nil(proxy) ) {
    return 0; // get_proxy_consumer failed
  }
  // If evs or constraint_expr are non-empty, add a filter to proxy
  if ((evs_ptr) &&
      sample_add_filter(channel, proxy, *evs_ptr, constraint_expr, objnm, verbose)) {
    return 0; // adding filter failed
  }
  // write proxy IOR to file
  write_ior_to_file(orb, proxy, proxy_ior_file, verbose);
  // Construct a client
  StructuredPushSupplier_i* client =
    new StructuredPushSupplier_i(proxy, max_events, objnm, supply_fn, change_fn, millisecs, verbose);
  return client;
}

CORBA::Boolean StructuredPushSupplier_i::connect() {
  omni_mutex_lock l(_oplock);
  if (_done) return 0;
  _com_err = 0;
  try {
    _my_proxy->connect_structured_push_supplier(_this());
    if (_change_fn) {
      _my_proxy->obtain_subscription_types(CosNA_NONE_NOW_UPDATES_ON);
    } else {
      _my_proxy->obtain_subscription_types(CosNA_NONE_NOW_UPDATES_OFF);
    }
  } catch (CORBA::BAD_PARAM& ex) {
    cerr << _obj_name << ": BAD_PARAM Exception while connecting" << endl;
    return 1; // error
  } catch (CosEventChannelAdmin::AlreadyConnected& ex) {
    cerr << _obj_name << ": Already connected" << endl;
    return 1; // error
  } catch (...) {
    cerr << _obj_name << ": Failed to connect" << endl;
    return 1; // error
  }
  // spawn a thread to do pushing
  if (_verbose)
    cout << _obj_name << ": Connected to proxy, ready to supply events" << endl;
  _worker = new GenericBoundWorkerThread(this);
  return 0; // OK
}

void StructuredPushSupplier_i::cleanup() {
  _oplock.lock();
  if (_worker || (!_done)) {
    cerr << "Coding error: only call c->cleanup() after c->wait_done()" << endl;
    _oplock.unlock();
    return;
  }
  // this method takes sole ownership of _my_proxy ref
  CosNA_StructuredProxyPushConsumer_var proxy = _my_proxy;
  _my_proxy = CosNA_StructuredProxyPushConsumer::_nil();
  // do not hold oplock while invoking disconnect
  _oplock.unlock();
  try {
    if ((!_com_err) && (!CORBA::is_nil(proxy)))
      proxy->disconnect_structured_push_consumer();
  } catch(...) {}
}

void* StructuredPushSupplier_i::start_working(void *) {
  CosNC_StructuredPushSupplier_var bump_my_refcount_during_outcalls = _this();

  unsigned long t_secs = 0, t_nanosecs = 0;
  unsigned long d_secs = 0, d_nanosecs = 0;
  if (_millisecs) {
    d_secs = _millisecs / 1000;
    d_nanosecs = (_millisecs % 1000) * 1000000;
  }
  CosN_StructuredEvent* data = new CosN_StructuredEvent;
  _oplock.lock();
  if (_verbose && (!_done))
    cout << _obj_name << ": Spawned thread entering main push loop" << endl;
  // invariants: _oplock is held entering top of loop and
  // also whenever we break out of loop
  // (it is *not* held during push_structured_event(), yield() and timedwait() calls)
  while ( 1 ) {
    if (_done) break; // must have disconnected
    if ((*_supply_fn)(*data, _obj_name, _num_events+1, _verbose)) {
      _oplock.unlock(); // do not hold oplock across push
      try {
        _my_proxy->push_structured_event(*data);
      } catch (...) {
        _oplock.lock();
        if (_done) break; // must have disconnected during push
        if (_verbose)
          cout << _obj_name << ": communication error while calling push_structured_event()" << endl;
        _com_err = 1;
        break; // break from while loop -- done
      }
      _oplock.lock();
      if (_done) break; // must have disconnected during push
      _num_events++;
      if (_max_events && (_num_events >= _max_events)) {
        if (_verbose)
          cout << _obj_name << ": DONE [max_events reached]" << endl;
        break; // done
      }
    }
    // wait or yield then try again
    if (_millisecs) { // sleep for specified interval
      omni_thread::get_time(&t_secs, &t_nanosecs, d_secs, d_nanosecs);
      _finish.timedwait(t_secs, t_nanosecs); // releases _oplock during wait
    } else { // use yield to let other threads have a shot at _oplock
      _oplock.unlock();
      omni_thread::yield();
      _oplock.lock();
    }
    // continue pushing
  }
  // done
  delete data;
  if (!_done) {
    _done = 1;
    _finish.broadcast();
  }
  _oplock.unlock();
  return 0;
}

void StructuredPushSupplier_i::disconnect_structured_push_supplier()
{
  if (_done) return; // see "Optimization Notes" at top

  omni_mutex_lock l(_oplock);
  if (_done) return;
  if (_verbose)
    cout << _obj_name << ": disconnected" << endl;
  _done = 1;
  _finish.broadcast();
}

CORBA::Boolean StructuredPushSupplier_i::wait_done() {
  _oplock.lock();
  while (!_done) {
    _finish.wait();
  }
  // using tmp_worker ensures only one wait_done call tries join()
  GenericBoundWorkerThread* tmp_worker = _worker;
  _worker = 0;
  _oplock.unlock();
  if (tmp_worker) {
    tmp_worker->join(0);
  }
  return _com_err;
}

void StructuredPushSupplier_i::subscription_change(const CosN_EventTypeSeq& added,
                        const CosN_EventTypeSeq& deled)
{
  if (_done) return; // see "Optimization Notes" at top

  omni_mutex_lock l(_oplock);
  if (_done) return;
  if (_verbose)
    cout << _obj_name << ": subscription_change received" << endl;
  if (_change_fn) (*_change_fn)(added, deled, _obj_name, _verbose);
}

 

