omniNotify: Free High Performance CORBA Notification Service
from AT&T Laboratories

Developing Pull Consumers

This section goes over the details of the StructuredPullConsumer_i class found in sample_clients.h / sample_clients.cc.  This class does all the real work of the example program struct_pull_consumer.cc.  (As discussed in the Common Code section, the main program creates a StructuredPullConsumer_i object using the class's create function, tells the new client object to connect to its proxy using the connect method, and then waits for the desired number of events to be consumed using the wait_done method.)

This section assumes you have read the section covering class StructuredPushConsumer_i. All the client code has the same basic structure, so we do not repeat things here that are covered in that section.

The rest of this section covers the class declaration (in sample_clients.h) and the class implementation (in sample_clients.cc).


Class Declaration (sample_clients.h)

Here is the beginning of the class declaration:


class StructuredPullConsumer_i :
  public GenericWorker,
  public POA_CosNotifyComm::StructuredPullConsumer,
  public PortableServer::RefCountServantBase

A pull consumer of structured events must implement the CosNotifyComm::StructuredPullConsumer interface.  Thus, class StructuredPullConsumer_i inherits from the POA skeleton class POA_CosNotifyComm::StructuredPullConsumer.  It also inherits from the "mix-in" class PortableServer::RefCountServantBase to enable automatic object disposal based on reference counting: the POA will destroy the object when its reference count reaches zero. Finally, the class inherits from class GenericWorker. GenericWorker is defined at the top of sample_clients.h. Any class that inherits from GenericWorker must implement the start_working method, which will run in its own thread. As we'll see below, start_working will be used to pull events from the notification channel.
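The worker arrangement described above can be illustrated without any CORBA dependencies. The following is a minimal sketch only: Worker, BoundWorkerThread, and CountingWorker are hypothetical names, and std::thread stands in for the omnithread classes. It is not the actual definition of GenericWorker or GenericBoundWorkerThread from sample_clients.h.

```cpp
#include <atomic>
#include <thread>

// Hypothetical stand-in for GenericWorker: a single pure virtual entry point.
class Worker {
public:
  virtual ~Worker() = default;
  virtual void* start_working(void* arg) = 0;
};

// Hypothetical stand-in for GenericBoundWorkerThread: spawns a thread that
// runs worker->start_working() and can be joined later.
class BoundWorkerThread {
public:
  explicit BoundWorkerThread(Worker* worker)
    : _thread([worker] { worker->start_working(nullptr); }) {}
  ~BoundWorkerThread() { join(); }
  void join() { if (_thread.joinable()) _thread.join(); }
private:
  std::thread _thread;
};

// Example worker: counts up to a limit inside its own thread.
class CountingWorker : public Worker {
public:
  explicit CountingWorker(int limit) : _limit(limit), _count(0) {}
  void* start_working(void*) override {
    for (int i = 0; i < _limit; ++i) ++_count;
    return nullptr;
  }
  int count() const { return _count.load(); }
private:
  int _limit;
  std::atomic<int> _count;
};
```

In this sketch the class that does the work only supplies start_working; thread creation and joining live in the separate wrapper, which is the same division of labor the text describes.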

The rest of the class declaration is very similar to the push consumer class declaration, with obvious changes. For example, the type of _my_proxy is CosNotifyChannelAdmin::StructuredProxyPullSupplier_var.  (A structured pull consumer connects to a structured proxy pull supplier.)  The local methods are the same, and the IDL methods are almost the same: a pull consumer does not have a push_structured_event method, and its disconnect method is named disconnect_structured_pull_consumer.

Complete Declaration

Here is the complete class declaration:


class StructuredPullConsumer_i :
  public GenericWorker,
  public POA_CosNotifyComm::StructuredPullConsumer,
  public PortableServer::RefCountServantBase
{
public:
  StructuredPullConsumer_i(CosNA_StructuredProxyPullSupplier_ptr proxy,
                           CORBA::ULong max_events,
                           const char* objnm,
                           consume_structured_fn* consume_fn,
                           type_change_fn* change_fn,
                           CORBA::ULong millisecs,
                           CORBA::Boolean verbose);

  static StructuredPullConsumer_i*
    create(CORBA::ORB_ptr orb,
           CosNA_EventChannel_ptr channel,
           CORBA::ULong max_events = 0,
           CORBA::ULong batch_size = 0,
           const char* objnm = "StructuredPullConsumer_i",
           const char* proxy_ior_file = "",
           consume_structured_fn* consume_fn = sample_consume_structured_fn,
           type_change_fn* change_fn = sample_offer_change_fn,
           CosN_EventTypeSeq* evs_ptr = 0,
           const char* constraint_expr = "",
           CORBA::ULong millisecs = 0,
           CORBA::Boolean verbose = 0);

  // IDL methods
  void disconnect_structured_pull_consumer();
  void offer_change(const CosN_EventTypeSeq& added,
                    const CosN_EventTypeSeq& deled);

  // Local methods
  CORBA::Boolean connect();
  CORBA::Boolean wait_done();
  void cleanup();
  void* start_working(void *);

protected:
  CosNA_StructuredProxyPullSupplier_var  _my_proxy;
  const char*                            _obj_name;
  consume_structured_fn*                 _consume_fn;
  type_change_fn*                        _change_fn;
  CORBA::ULong                           _num_events;
  CORBA::ULong                           _max_events;
  CORBA::ULong                           _millisecs;
  CORBA::Boolean                         _verbose;
  CORBA::Boolean                         _done;
  CORBA::Boolean                         _com_err;
  omni_mutex                             _oplock;
  omni_condition                         _finish;
  GenericBoundWorkerThread*              _worker;
};

Class Implementation (sample_clients.cc)

The two classes have similar constructors and static create methods.  In both cases the default consume function is sample_consume_structured_fn and the default offer change function is sample_offer_change_fn.  The main difference between the classes is that, rather than providing a push_structured_event method which the channel invokes to deliver events to the consumer, a pull consumer requests events from the channel by invoking pull_structured_event on its proxy.  It performs this call in a worker thread, pausing for _millisecs milliseconds between pull calls.  (A setting of zero means pull as fast as possible.)  Because this worker thread is constantly polling the channel, there is no need for a separate 'ping thread' to monitor the liveness of the proxy.

Thus, the key method is the start_working method, which is spawned in a background thread by the connect method:

start_working Method

void* StructuredPullConsumer_i::start_working(void *) {
  CosNC_StructuredPullConsumer_var bump_my_refcount_during_outcalls = _this();
  unsigned long t_secs = 0, t_nanosecs = 0;
  unsigned long d_secs = 0, d_nanosecs = 0;
  if (_millisecs) {
    d_secs = _millisecs / 1000;
    d_nanosecs = (_millisecs % 1000) * 1000000;
  }
  CosN_StructuredEvent* data = 0;
  // invariants: _oplock is held entering top of loop and
  // also whenever we break out of loop
  // (it is *not* held during pull_structured_event(), yield(), and timedwait() calls)
  _oplock.lock();
  if (_verbose && (!_done))
    cout << _obj_name << ": Spawned thread entering main pull loop" << endl;
  while ( 1 ) {
    if (_done) break; // must have disconnected
    _oplock.unlock(); // do not hold oplock across pull
    try {
      data = _my_proxy->pull_structured_event();
    } catch (...) {
      _oplock.lock();
      if (_done) break; // must have disconnected during pull
      if (_verbose)
        cout << _obj_name << ": communication error while calling pull_structured_event()" << endl;
      _com_err = 1;
      break; // break from while loop -- done
    }
    _oplock.lock();
    if (_done) break; // must have disconnected during pull
    if (!data) {
      if (_verbose)
        cout << _obj_name << ": strange failure: pull_structured_event() returned nil" << endl;
      _com_err = 1;
      break; // break from while loop -- done
    }
    // got an event
    _num_events++;
    if (_consume_fn)
      (*_consume_fn)(*data, _obj_name, _num_events, _verbose);
    else if (_verbose)
      cout << _obj_name << ": event count = " << _num_events << endl;
    delete data;
    data = 0;
    if (_max_events && (_num_events >= _max_events)) {
      if (_verbose)
        cout << _obj_name << ": DONE [max_events reached]" << endl;
      break; // done
    }
    if (_millisecs) { // sleep for specified interval
      omni_thread::get_time(&t_secs, &t_nanosecs, d_secs, d_nanosecs);
      _finish.timedwait(t_secs, t_nanosecs); // releases _oplock during wait
    } else { // use yield to let other threads have a shot at _oplock
      _oplock.unlock();
      omni_thread::yield();
      _oplock.lock();
    }
    // continue pulling
  }
  // done
  delete data;
  if (!_done) {
    _done = 1;
    _finish.broadcast();
  }
  _oplock.unlock();
  return 0;
}

start_working enters a while loop, while(1), which loops until a break statement is executed.   At the top of the loop, _oplock is always held.  Each time through the loop, the lock is released, pull_structured_event is called, then the lock is reacquired.  As with a ping loop, it is important to release the lock across a call to a remote object so that other methods can execute during this call.  
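The unlock, call, relock pattern can be shown in isolation. This is a sketch under stated assumptions: PullState and pull_once are hypothetical names, std::mutex stands in for omni_mutex, and remote_pull is a placeholder for the call to pull_structured_event.

```cpp
#include <functional>
#include <mutex>

// Hypothetical consumer state guarded by a mutex (stand-in for _oplock, _done).
struct PullState {
  std::mutex oplock;
  bool done = false;
  int num_events = 0;
};

// Performs one pull with the lock released across the (possibly slow) call.
// Precondition: the caller holds state.oplock through `lk`.
// Returns true if the event was counted, false if `done` was set meanwhile.
bool pull_once(PullState& state, std::unique_lock<std::mutex>& lk,
               const std::function<int()>& remote_pull) {
  lk.unlock();                   // do not hold the lock across the remote call
  int event = remote_pull();     // other methods may take the lock during this
  (void)event;                   // the payload is irrelevant to this sketch
  lk.lock();                     // reacquire before touching shared state
  if (state.done) return false;  // disconnected while the pull was in flight
  ++state.num_events;
  return true;
}
```

The re-check of done after reacquiring the lock matters: anything may have changed while the lock was released, so the state read before the call cannot be trusted afterwards.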

There are three things that cause a break from the while loop: _done has been set to 1 by another method (such as disconnect_structured_pull_consumer); the target event count is reached; or an error occurs during a pull call.  In the latter two cases, the code following the while loop sets _done to true and signals the _finish condition variable using broadcast, which wakes any thread waiting on that condition (see wait_done).  When an error occurs during a pull call, _com_err is set to true before the break statement executes.
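The interaction between the done flag, the condition variable, and a waiting thread can be sketched with the standard library. DoneFlag is a hypothetical class for illustration; std::condition_variable::notify_all plays the role that broadcast plays on _finish, and this is not the code from sample_clients.cc.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

class DoneFlag {
public:
  // Marks completion once and wakes every waiter
  // (mirrors: _done = 1; _finish.broadcast();).
  void finish(bool com_err) {
    std::lock_guard<std::mutex> g(_lock);
    if (_done) return;           // first caller wins, later calls are no-ops
    _com_err = com_err;
    _done = true;
    _finish.notify_all();
  }

  // Blocks until finish() has been called, then returns the error flag.
  bool wait_done() {
    std::unique_lock<std::mutex> lk(_lock);
    _finish.wait(lk, [this] { return _done; });  // loops on spurious wakeups
    return _com_err;
  }

private:
  std::mutex _lock;
  std::condition_variable _finish;
  bool _done = false;
  bool _com_err = false;
};
```

Waiting on a predicate rather than a bare wait corresponds to the while (!_done) loop in wait_done: a woken thread always re-checks the flag before proceeding.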

For each event returned by pull_structured_event, the method  increments the count of the number of events received, _num_events.  If a consume function has been specified, it is invoked.   If _num_events has reached _max_events, a break occurs.  

After processing the event, if _millisecs is zero then omni_thread::yield() is called at the bottom of the while loop; otherwise timedwait is called on _finish with the specified number of milliseconds (converted to an absolute seconds/nanoseconds time value).  The yield ensures that the thread running the start_working method does not 'hog' the processor: it gives other methods that need _oplock a chance to run.  (Since start_working releases the lock on each pull, the yield call may not be strictly necessary.)
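The time arithmetic behind the sleep can be worked through in a few lines. This is an illustrative sketch: TimeSpec, split_millisecs, and absolute_deadline are hypothetical helpers. In the listing, the relative-to-absolute step is delegated to omni_thread::get_time, so the second function only approximates what such a conversion has to do.

```cpp
// Hypothetical pair of seconds and nanoseconds, as used by the listing's
// t_secs/t_nanosecs and d_secs/d_nanosecs variables.
struct TimeSpec {
  unsigned long secs;
  unsigned long nanosecs;
};

// Split a millisecond interval into whole seconds plus leftover nanoseconds
// (same arithmetic as the d_secs / d_nanosecs lines in start_working).
TimeSpec split_millisecs(unsigned long millisecs) {
  return { millisecs / 1000, (millisecs % 1000) * 1000000UL };
}

// Add a relative interval to "now", carrying nanosecond overflow into seconds.
// Assumes both nanosecs fields are below one billion on entry.
TimeSpec absolute_deadline(TimeSpec now, TimeSpec delta) {
  TimeSpec t { now.secs + delta.secs, now.nanosecs + delta.nanosecs };
  if (t.nanosecs >= 1000000000UL) {
    t.secs += 1;
    t.nanosecs -= 1000000000UL;
  }
  return t;
}
```

For example, 2500 milliseconds splits into 2 seconds and 500,000,000 nanoseconds; added to a current time of 10 s + 700,000,000 ns, the deadline is 13 s + 200,000,000 ns.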

Complete Implementation

Here is the complete listing for the implementation of StructuredPullConsumer_i.


StructuredPullConsumer_i::
StructuredPullConsumer_i(CosNA_StructuredProxyPullSupplier_ptr proxy,
                         CORBA::ULong max_events, const char* objnm,
                         consume_structured_fn* consume_fn, type_change_fn* change_fn,
                         CORBA::ULong millisecs, CORBA::Boolean verbose) :
  _my_proxy(proxy), _obj_name(objnm), _consume_fn(consume_fn), _change_fn(change_fn),
  _num_events(0), _max_events(max_events), _millisecs(millisecs), _verbose(verbose),
  _done(0), _com_err(0), _oplock(), _finish(&_oplock), _worker(0)
{
  // nothing else to do
}

StructuredPullConsumer_i*
StructuredPullConsumer_i::create(CORBA::ORB_ptr orb,
                                 CosNA_EventChannel_ptr channel,
                                 CORBA::ULong max_events,
                                 CORBA::ULong batch_size,
                                 const char* objnm,
                                 const char* proxy_ior_file,
                                 consume_structured_fn* consume_fn,
                                 type_change_fn* change_fn,
                                 CosN_EventTypeSeq* evs_ptr,
                                 const char* constraint_expr,
                                 CORBA::ULong millisecs,
                                 CORBA::Boolean verbose)
{
  // Obtain appropriate proxy object
  CosNA_ProxySupplier_var generic_proxy =
    get_proxy_supplier(channel, CosNA_STRUCTURED_EVENT, 0, verbose); // 1 is push 0 is pull
  CosNA_StructuredProxyPullSupplier_ptr proxy =
    CosNA_StructuredProxyPullSupplier::_narrow(generic_proxy);
  if ( CORBA::is_nil(proxy) ) {
    return 0; // get_proxy_supplier failed
  }
  // If evs or constraint_expr are non-empty, add a filter to proxy
  if ((evs_ptr) &&
      sample_add_filter(channel, proxy, *evs_ptr, constraint_expr, objnm, verbose)) {
    return 0; // adding filter failed
  }
  // write proxy IOR to file
  write_ior_to_file(orb, proxy, proxy_ior_file, verbose);
  // Construct a client
  StructuredPullConsumer_i* client =
    new StructuredPullConsumer_i(proxy, max_events, objnm, consume_fn, change_fn, millisecs, verbose);
  return client;
}


CORBA::Boolean StructuredPullConsumer_i::connect() {
  omni_mutex_lock l(_oplock);
  if (_done) return 0;
  _com_err = 0;
  try {
    _my_proxy->connect_structured_pull_consumer(_this());
    if (_change_fn) {
      _my_proxy->obtain_offered_types(CosNA_NONE_NOW_UPDATES_ON);
    } else {
      _my_proxy->obtain_offered_types(CosNA_NONE_NOW_UPDATES_OFF);
    }
  } catch (CORBA::BAD_PARAM& ex) {
    cerr << _obj_name << ": BAD_PARAM Exception while connecting" << endl;
    return 1; // error
  } catch (CosEventChannelAdmin::AlreadyConnected& ex) {
    cerr << _obj_name << ": Already connected" << endl;
    return 1; // error
  } catch (...) {
    cerr << _obj_name << ": Failed to connect" << endl;
    return 1; // error
  }
  // spawn a thread to do pulling
  if (_verbose)
    cout << _obj_name << ": Connected to proxy, ready to consume events" << endl;
  _worker = new GenericBoundWorkerThread(this);
  return 0; // OK
}

void StructuredPullConsumer_i::cleanup() {
  _oplock.lock();
  if (_worker || (!_done)) {
    cerr << "Coding error: only call c->cleanup() after c->wait_done()" << endl;
    _oplock.unlock();
    return;
  }
  // this method takes sole ownership of _my_proxy ref
  CosNA_StructuredProxyPullSupplier_var proxy = _my_proxy;
  _my_proxy = CosNA_StructuredProxyPullSupplier::_nil();
  // do not hold oplock while invoking disconnect
  _oplock.unlock();
  try {
    if ((!_com_err) && (!CORBA::is_nil(proxy)))
      proxy->disconnect_structured_pull_supplier();
  } catch(...) {}
}

void* StructuredPullConsumer_i::start_working(void *) {
  CosNC_StructuredPullConsumer_var bump_my_refcount_during_outcalls = _this();

  unsigned long t_secs = 0, t_nanosecs = 0;
  unsigned long d_secs = 0, d_nanosecs = 0;
  if (_millisecs) {
    d_secs = _millisecs / 1000;
    d_nanosecs = (_millisecs % 1000) * 1000000;
  }
  CosN_StructuredEvent* data = 0;
  // invariants: _oplock is held entering top of loop and
  // also whenever we break out of loop
  // (it is *not* held during pull_structured_event(), yield(), and timedwait() calls)
  _oplock.lock();
  if (_verbose && (!_done))
    cout << _obj_name << ": Spawned thread entering main pull loop" << endl;
  while ( 1 ) {
    if (_done) break; // must have disconnected
    _oplock.unlock(); // do not hold oplock across pull
    try {
      data = _my_proxy->pull_structured_event();
    } catch (...) {
      _oplock.lock();
      if (_done) break; // must have disconnected during pull
      if (_verbose)
        cout << _obj_name << ": communication error while calling pull_structured_event()" << endl;
      _com_err = 1;
      break; // break from while loop -- done
    }
    _oplock.lock();
    if (_done) break; // must have disconnected during pull
    if (!data) {
      if (_verbose)
        cout << _obj_name << ": strange failure: pull_structured_event() returned nil" << endl;
      _com_err = 1;
      break; // break from while loop -- done
    }
    // got an event
    _num_events++;
    if (_consume_fn)
      (*_consume_fn)(*data, _obj_name, _num_events, _verbose);
    else if (_verbose)
      cout << _obj_name << ": event count = " << _num_events << endl;
    delete data;
    data = 0;
    if (_max_events && (_num_events >= _max_events)) {
      if (_verbose)
        cout << _obj_name << ": DONE [max_events reached]" << endl;
      break; // done
    }
    if (_millisecs) { // sleep for specified interval
      omni_thread::get_time(&t_secs, &t_nanosecs, d_secs, d_nanosecs);
      _finish.timedwait(t_secs, t_nanosecs); // releases _oplock during wait
    } else { // use yield to let other threads have a shot at _oplock
      _oplock.unlock();
      omni_thread::yield();
      _oplock.lock();
    }
    // continue pulling
  }
  // done
  delete data;
  if (!_done) {
    _done = 1;
    _finish.broadcast();
  }
  _oplock.unlock();
  return 0;
}


void StructuredPullConsumer_i::disconnect_structured_pull_consumer()
{
  if (_done) return; // see "Optimization Notes" at top

  omni_mutex_lock l(_oplock);
  if (_done) return;
  if (_verbose)
    cout << _obj_name << ": disconnected" << endl;
  _done = 1;
  _finish.broadcast();
}

CORBA::Boolean StructuredPullConsumer_i::wait_done() {
  _oplock.lock();
  while (!_done) {
    _finish.wait();
  }
  // using tmp_worker ensures only one wait_done call tries join()
  GenericBoundWorkerThread* tmp_worker = _worker;
  _worker = 0;
  _oplock.unlock();
  if (tmp_worker) {
    tmp_worker->join(0);
  }
  return _com_err;
}

void StructuredPullConsumer_i::offer_change(const CosN_EventTypeSeq& added,
                                             const CosN_EventTypeSeq& deled)
{
  if (_done) return; // see "Optimization Notes" at top

  omni_mutex_lock l(_oplock);
  if (_done) return;
  if (_verbose)
    cout << _obj_name << ": offer_change received" << endl;
  if (_change_fn) (*_change_fn)(added, deled, _obj_name, _verbose);
}


 




For comments, feedback, etc, please see the 'Keep in touch' page.