omniNotify Logo Free High Performance CORBA Notification Service
from AT&T Laboratories
ATT logo omniORB Home
AT&T Research
Home SF Project Features Release
Notes
Download Instal-
lation
Documen-
tation
Patch &
Bug List
More
Info
Keep in
Touch

Back to Documentation Home Page

Developing Push Consumers

This section goes over the details of the StructuredPushConsumer_i class found in sample_clients.h / sample_clients.cc.  This class does all the real work of the example program struct_push_consumer.cc.  (As discussed in the Common Code section, the main program creates a StructuredPushConsumer_i object using the class's create function, tells the new client object to connect to its proxy using the connect method, and then waits for the desired number of events to be consumed using the wait_done method.)

Before we begin looking at the code, here are a few things you should know. Clients of a notification channel connect to that channel in 3 steps: (1) acquire a reference to an admin object; (2) from the admin, acquire a reference to a proxy object object; (3) connect to that proxy. Any number of clients can use the same admin object, but there is a 1-to-1 mapping between client and proxy objects (each client has its own proxy). The admin and proxy objects are the client's 2 contact points with the notification channel. The client invokes methods on these objects to carry out such actions as adding or removing filtering constraints. A filter that is added at an admin applies to all clients connected to any proxy acquired from that admin (allowing a set of clients to share the filter). A filter added to a proxy only applies to the 1 client that is connected to that proxy.

The rest of this section covers the class declaration (in sample_clients.h) and the class implementation (in sample_clients.cc).


Class Declaration (sample_clients.h)

Here is the beginning of the class declaration:


class StructuredPushConsumer_i :
  public GenericWorker,
  public POA_CosNotifyComm::StructuredPushConsumer,
  public PortableServer::RefCountServantBase

A push consumer of structured events must implement the CosNotifyComm::StructuredPushConsumer interface.  Thus, class StructuredPushConsumer_i inherits from the POA skeleton class POA_CosNotifyComm::StructuredPushConsumer.  It also inherits from the ``mix-in'' class PortableServer::RefCountServantBase to enable automatic object disposal based on reference counting: the POA will destroy the object when its reference count reaches zero. Finally, the class inherits from class GenericWorker. GenericWorker is defined at the top of sample_clients.h. Any class that inherits from GenericWorker must implement the start_working method, which will run in its own thread. As we'll see below, start_working will be used to monitor that liveness of the notification channel.

The class has one constructor and one static function called create. New instances of the class are normally generated using the create function. This function has a large number of arguments, used to configure the consumer in various ways. Most of these parameters have default values; only the first 2 parameters are mandatory. The declarations of the constructor and static create function:


  StructuredPushConsumer_i(CosNA_StructuredProxyPushSupplier_ptr proxy,
                           CORBA::ULong max_events,
                           const char* objnm,
                           consume_structured_fn* consume_fn,
                           type_change_fn* change_fn,
                           CORBA::ULong millisecs,
                           CORBA::Boolean verbose);

  static StructuredPushConsumer_i*
    create(CORBA::ORB_ptr orb,
           CosNA_EventChannel_ptr channel,
           CORBA::ULong max_events = 0,
           CORBA::ULong batch_size = 0,
           const char* objnm = "StructuredPushConsumer_i",
           const char* proxy_ior_file = "",
           consume_structured_fn* consume_fn = sample_consume_structured_fn,
           type_change_fn* change_fn = sample_offer_change_fn,
           CosN_EventTypeSeq* evs_ptr = 0,
           const char* constraint_expr = "",
           CORBA::ULong millisecs = 0,
           CORBA::Boolean verbose = 0);

Here is a rundown of the parameters:

* batch_size is passed to all create functions so that there is a consistent interface across all 12 classes in sample_clients.h, but it is only used by the push and pull batch supplier classes.

Next, there are declarations of two kinds of methods.


  // IDL methods
  void push_structured_event(const CosN_StructuredEvent & data);
  void disconnect_structured_push_consumer();
  void offer_change(const CosN_EventTypeSeq& added,
                    const CosN_EventTypeSeq& deled);

The IDL Methods are the methods required by any class implementing CosNotifyComm::StructuredPushConsumer;   these methods are invoked remotely by the notification channel.


  // Local methods
  CORBA::Boolean connect();
  CORBA::Boolean wait_done();
  void cleanup();
  void* start_working(void *);

The ``local methods'' are additional methods that are only invoked locally on the implementation object.  The first 3 methods are invoked by the main program after creating a new consumer object.

Finally, the class has some private state:


protected:
  CosNA_StructuredProxyPushSupplier_var _my_proxy;
  const char*                            _obj_name;
  consume_structured_fn*                 _consume_fn;
  type_change_fn*                        _change_fn;
  CORBA::ULong                           _num_events;
  CORBA::ULong                           _max_events;
  CORBA::ULong                           _millisecs;
  CORBA::Boolean                         _verbose;
  CORBA::Boolean                         _done;
  CORBA::Boolean                         _com_err;
  omni_mutex                             _oplock;
  omni_condition                         _finish;
  GenericBoundWorkerThread*              _worker;
};

The members _obj_name, _consume_fn, _change_fn, _max_events, _millisecs, and _verbose simply keep track of configuration parameters passed to the constructor.  The member _my_proxy is a reference to the proxy object obtained from the channel (via an admin object -- a reference to the admin object is not retained because this client does not do anything further with the admin after obtaining _my_proxy).  The _done flag tracks the done condition discussed above, while _com_err tracks whether the ping loop has detected a communications error.  The mutex _oplock is used to ensure that only one method at a time is reading or modifying these members, while the condition variable _finish is used to wait for _done to become true.  The _worker member keeps track of the spawned worker thread.

Complete Declaration

Here is the complete class declaration:


class StructuredPushConsumer_i :
  public GenericWorker,
  public POA_CosNotifyComm::StructuredPushConsumer,
  public PortableServer::RefCountServantBase
{
public:
  StructuredPushConsumer_i(CosNA_StructuredProxyPushSupplier_ptr proxy,
                           CORBA::ULong max_events,
                           const char* objnm,
                           consume_structured_fn* consume_fn,
                           type_change_fn* change_fn,
                           CORBA::ULong millisecs,
                           CORBA::Boolean verbose);

  static StructuredPushConsumer_i*
    create(CORBA::ORB_ptr orb,
           CosNA_EventChannel_ptr channel,
           CORBA::ULong max_events = 0,
           CORBA::ULong batch_size = 0,
           const char* objnm = "StructuredPushConsumer_i",
           const char* proxy_ior_file = "",
           consume_structured_fn* consume_fn = sample_consume_structured_fn,
           type_change_fn* change_fn = sample_offer_change_fn,
           CosN_EventTypeSeq* evs_ptr = 0,
           const char* constraint_expr = "",
           CORBA::ULong millisecs = 0,
           CORBA::Boolean verbose = 0);

  // IDL methods
  void push_structured_event(const CosN_StructuredEvent & data);
  void disconnect_structured_push_consumer();
  void offer_change(const CosN_EventTypeSeq& added,
                    const CosN_EventTypeSeq& deled);

  // Local methods
  CORBA::Boolean connect();
  CORBA::Boolean wait_done();
  void cleanup();
  void* start_working(void *);

protected:
  CosNA_StructuredProxyPushSupplier_var _my_proxy;
  const char*                            _obj_name;
  consume_structured_fn*                 _consume_fn;
  type_change_fn*                        _change_fn;
  CORBA::ULong                           _num_events;
  CORBA::ULong                           _max_events;
  CORBA::ULong                           _millisecs;
  CORBA::Boolean                         _verbose;
  CORBA::Boolean                         _done;
  CORBA::Boolean                         _com_err;
  omni_mutex                             _oplock;
  omni_condition                         _finish;
  GenericBoundWorkerThread*              _worker;
};

 


Class Implementation (sample_clients.cc)

Optimization Note

A number of methods have the following form:


void method_name (...) {
  if (_done) return;
  ...
  omni_mutex_lock l(_oplock);
  if (_done) return;
  ...
  }

The _done check prior to obtaining the lock is an optimization; it could be removed and the code would still be correct. It is useful because it avoids unnecessary synchronization during shutdown/cleanup of the client.

Because _done can change while the lock is not held, any time the code first acquires (or releases then re-acquires) _oplock, it must do another _done check. So the 2nd check, which looks redundant, is actually required!

Constructor

The constructor for StructuredPushConsumer_i simply initializes all of the private members:


StructuredPushConsumer_i::
StructuredPushConsumer_i(CosNA_StructuredProxyPushSupplier_ptr proxy,
                          CORBA::ULong max_events, const char* objnm,
                          consume_structured_fn* consume_fn, type_change_fn* change_fn,
                          CORBA::ULong millisecs, CORBA::Boolean verbose) :
  _my_proxy(proxy), _obj_name(objnm), _consume_fn(consume_fn), _change_fn(change_fn),
  _num_events(0), _max_events(max_events), _millisecs(millisecs), _verbose(verbose),
  _done(0), _com_err(0), _oplock(), _finish(&_oplock), _worker(0)
{
// nothing else to do
}

The only non-obvious initialization is that the _finish condition variable is initailized with a pointer to the _oplock mutex -- a condition variable must always be associated with a mutex, and a method must hold the mutex before waiting on the mutex.   (See the omnithread documentation for the full mutex and condition variable APIs.)

Static create function

The static create function does four things.


StructuredPushConsumer_i*
StructuredPushConsumer_i::create(CORBA::ORB_ptr orb,
                                  CosNA_EventChannel_ptr channel,
                                  CORBA::ULong max_events,
                                  CORBA::ULong batch_size,
                                  const char* objnm,
                                  const char* proxy_ior_file,
                                  consume_structured_fn* consume_fn,
                                  type_change_fn* change_fn,
                                  CosN_EventTypeSeq* evs_ptr,
                                  const char* constraint_expr,
                                  CORBA::ULong millisecs,
                                   CORBA::Boolean verbose)
{
  // (1) Obtain appropriate proxy object
  CosNA_ProxySupplier_var generic_proxy =
  get_proxy_supplier(channel, CosNA_STRUCTURED_EVENT, 1, verbose); // 1 is push 0 is pull
  CosNA_StructuredProxyPushSupplier_ptr proxy =
   CosNA_StructuredProxyPushSupplier::_narrow(generic_proxy);
  if ( CORBA::is_nil(proxy) ) {
    return 0; // get_proxy_supplier failed
  }
  // (2) If evs or constraint_expr are non-empty, add a filter to proxy
  if ((evs_ptr) &&
       sample_add_filter(channel, proxy, *evs_ptr, constraint_expr, objnm, verbose)) {
    return 0; // adding filter failed
  }
  // (3) write proxy IOR to file
  write_ior_to_file(orb, proxy, proxy_ior_file, verbose);
  // (4) Construct a client
  StructuredPushConsumer_i* client =
  new StructuredPushConsumer_i(proxy, max_events, objnm, consume_fn, change_fn, millisecs, verbose);
  return client;
}

(1) It obtains an appropriate proxy object using the helper function get_proxy_supplier. The returned proxy reference is a generic proxy supplier; it is narrowed to a CosNotifyChannelAdmin::StructuredProxyPushSupplier.  Note that consumers connect to proxy suppliers, while suppliers connect to proxy consumers.  Thus our structured   push consumer client must connect to a structured proxy push supplier.

(2) If a list of event types or a constraint expression is specified, the helper function sample_add_filter is called.  This helper function constructs a filter with one constraint that captures the required event types and the constraint expression and adds the filter to the consumer's proxy.

(3) The helper routine write_ior_to_file is called with the proxy_ior_file.   This string may be an empty string, in which case nothing is done, otherwise the proxy's object reference is stringified and written to the specified file.

(4) A new StructuredPushConsumer_i object is created with appropriate parameters as passed in to the create function, and a reference to this object is returned by create.

Connect Method

The connect method connects the client to the proxy that was obtained during the create call.  Once a push consumer is connected,  the channel will begin pushing events to it, so this is the step that starts the consuming of events.  After connecting, the obtain_offered_types method is invoked on the proxy with one of two arguments:   CosNA_NONE_NOW_UPDATES_ON  enables offer_change messages, while CosNA_NONE_NOW_UPDATES_OFF  disables  offer_change messages.  The messages are only enabled if a non-NULL offer_change function was provided to create (as stored in _change_fn).  Finally, if a non-zero millisecs value was provided to create, then a new worker thread is created -- the thread constructor immediately executes the method start_worker (discussed next).


CORBA::Boolean StructuredPushConsumer_i::connect() {
  omni_mutex_lock l(_oplock);
  if (_done) return 0;
  _com_err = 0;
  try {
    _my_proxy->connect_structured_push_consumer(_this());
    if (_change_fn) {
      _my_proxy->obtain_offered_types(CosNA_NONE_NOW_UPDATES_ON);
    } else {
      _my_proxy->obtain_offered_types(CosNA_NONE_NOW_UPDATES_OFF);
    }
  } catch (CORBA::BAD_PARAM& ex) {
    cerr << _obj_name << ": BAD_PARAM Exception while connecting" << endl;
    return 1; // error
  } catch (CosEventChannelAdmin::AlreadyConnected& ex) {
    cerr << _obj_name << ": Already connected" << endl;
    return 1; // error
  } catch (...) {
    cerr << _obj_name << ": Failed to connect" << endl;
    return 1; // error
  }
  if (_verbose) cout << _obj_name << ": Connected to proxy, ready to consume events" << endl;
  // if _millisecs is set, spawn a thread to ping the proxy
  if (_millisecs)
  _worker = new GenericBoundWorkerThread(this);
  return 0; // OK
}

Start_working Method

The start_working method performs one call to the proxy every _millisecs milliseconds.   The only purpose of the call is check whether the proxy is still responding to messages.  (The MyType() method is used, where the return value is ignored.)  If one of these 'ping' calls fails, then the _done flag is set to true to indicate a done condition has been reached, the _com_err flag is set to true to indicate a communication error occurred, and the _finish condition variable is signalled (using broadcast) to wake up any thread that is waiting on this condition variable.  (See the wait_done method below.)   The milliseconds value must be converted to seconds and nanoseconds --   d_secs and d_nanosecs hold this conversion.  The call

   omni_thread::get_time(&t_secs, &t_nanosecs, d_secs, d_nanosecs);

stores a time that is d_secs seconds and d_nanosecs nanoseconds later than the current clock time into the variables t_secs and t_nanosecs, and these values are then used in the timed_wait call, which takes an absolute clock time.  Note that the method terminates (which terminates the worker thread) as soon as it detects that _done is true.

An important part of the start_working code is its lock management paradigm.  It releases _oplock before invoking the 'ping' on the proxy, and  reacquires _oplock so that it can safely check and update the _done flag, _com_err flag, and so on.  Its call to timed_wait also implicitly releases _oplock during the wait period and reacquires the lock upon return.  As a general rule, an object should not hold a critical lock (such as a lock that controls access to all of its private state) during a call to a remove object (such as the proxy).  Remote calls can take a long time, and no other work will occur across a remote call if such a critical lock is held across the call.


void* StructuredPushConsumer_i::start_working(void *) {
  CosNC_StructuredPushConsumer_var bump_my_refcount_during_outcalls = _this();

  _oplock.lock();
  if (_verbose && (!_done) && _millisecs)
    cout << _obj_name << ": Spawned thread entering ping loop" << endl;
  // invariant: _oplock held at top of loop
  while ( 1 ) {
    if (_done || (_millisecs == 0)) break;
    unsigned long t_secs = 0, t_nanosecs = 0;
    unsigned long d_secs = 0, d_nanosecs = 0;
    d_secs = _millisecs / 1000;
    d_nanosecs = (_millisecs % 1000) * 1000000;
    _oplock.unlock(); // do not hold oplock across ping
    try {
      _my_proxy->MyType();
    } catch (...) {
      _oplock.lock();
      if (_done) break;
      if (_verbose) cout << _obj_name << ": communication error while pinging proxy" << endl;
      _done = 1;
      _finish.broadcast();
      _com_err = 1;
      break; // break from while loop -- done
    }
    _oplock.lock();
    if (_done) break; // must have disconnected during pull
    // sleep for specified interval
    omni_thread::get_time(&t_secs, &t_nanosecs, d_secs, d_nanosecs);
    _finish.timedwait(t_secs, t_nanosecs); // releases _oplock during wait
    // continue ping loop
  }
  // done
  _oplock.unlock();
  return 0;
}

Push_structured_event Method

The push_structured_event method is called by the channel for each structured event to be consumed.  This method  increments the count of the number of events received, _num_events.  If a consume function has been specified, it is invoked.   The default consume function for a structured push consumer is called sample_consume_structured_fn and can be found in sample_functions.cc.  If _num_events has reached _max_events, the specified target event count, then _done is set to true and the condition variable _finish is signalled using broadcast.  This will wake any thread waiting on that condition (see wait_done).


void StructuredPushConsumer_i::push_structured_event(const CosN_StructuredEvent& data)
{
  if (_done) return; // see "Optimization Notes" at top

  omni_mutex_lock l(_oplock);
  if (_done) return;
  _num_events++;
  if (_consume_fn)
    (*_consume_fn)(data, _obj_name, _num_events, _verbose);
  else if (_verbose) cout << _obj_name << ": event count = " << _num_events << endl;
  if (_max_events && (_num_events >= _max_events)) {
    if (_verbose) cout << _obj_name << ": DONE [max_events reached]" << endl;
    _done = 1;
    _finish.broadcast();
  }
}

Disconnect_structured_push_consumer Method

The disconnect_structured_push_consumer method is not used (it is not called by the channel).  An implementation must be provided, however, because it is part of the API for any StructuredPushConsumer.  In this code below, the _done condition is set and the condition variable _finish is signalled using broadcast.


void StructuredPushConsumer_i::disconnect_structured_push_consumer()
{
  if (_done) return; // see "Optimization Notes" at top

  omni_mutex_lock l(_oplock);
  if (_done) return;
  if (_verbose) cout << _obj_name << ": disconnected" << endl;
  _done = 1;
  _finish.broadcast();
}

Wait_done Method

The wait_done method waits for the _done flag to become true (if it is not already true) by waiting on the _finsih condition variable.  Three methods above can set _done to true, and each signals _finish once _done is true.   After _done is true, wait_done calls 'join' on the worker thread; this call returns once the worker thread terminates (the worker thread will terminate as soon as it detects _done is true).   wait_done returns _com_err, which will only be true if an exception occurred when 'pinging' the proxy.

Note that a local variable, tmp_worker, is used to do the join after releasing the lock; it is important not to hold the lock while waiting for the worker, since the worker may need to acquire teh lock to terminate.  _worker is set to NULL before releasing the lock so that only one call to _wait_done will result in a join call (thus the need to introduce a temporary variable to hold the value of  _worker before changing its value to NULL).


CORBA::Boolean StructuredPushConsumer_i::wait_done() {
  _oplock.lock();
  while (!_done) {
    _finish.wait();
  }
  // using tmp_worker ensures only one wait_done call tries join()
   GenericBoundWorkerThread* tmp_worker = _worker;
  _worker = 0;
  _oplock.unlock();
  if (tmp_worker) {
    tmp_worker->join(0);
  }
  return _com_err;
}

Offer_change Method

The offer_change method calls the change function that was passed to create, passing in the added and deleted event type sequences.  (If a NULL pointer was used for the function pointer, no call is made.)  Note that offer_change, like push_structured_event, does nothing if the _done flag has been set to true.


void StructuredPushConsumer_i::offer_change(const CosN_EventTypeSeq& added,
                    const CosN_EventTypeSeq& deled)
{
  if (_done) return; // see "Optimization Notes" at top

  omni_mutex_lock l(_oplock);
  if (_done) return;
  if (_verbose) cout << _obj_name << ": offer_change received" << endl;
  if (_change_fn) (*_change_fn)(added, deled, _obj_name, _verbose);
}

Cleanup Method

The cleanup method returns immediately with an error output if either _worker is non-NULL or _done is false; both of these tests indicate that wait_done has not been called (ntoe that wait_done sets _worker to NULL).   Otherwise, it sets a local variable, proxy, to the value of _my_proxy, and sets _my_proxy to be a nil reference.   It then releases the lock and calls the method disconnect_structured_push_supplier on the proxy reference.  By setting _my_proxy to a nil reference, it ensures that only one call to cleanup will result in a disconnect call on the proxy.  Also note that the disconnect is not performed if a 'ping' resulted in an error (on the assumption that the proxy is no longer valid).  As with the ping call, cleanup follows the general rule that _oplock should not be held during a call to a remote object (the proxy).


void StructuredPushConsumer_i::cleanup() {
  _oplock.lock();
  if (_worker || (!_done)) {
    cerr << "Coding error: only call c->cleanup() after c->wait_done()" << endl;
    _oplock.unlock();
    return;
  }
  // this method takes sole ownership of _my_proxy ref
  CosNA_StructuredProxyPushSupplier_var proxy = _my_proxy;
  _my_proxy = CosNA_StructuredProxyPushSupplier::_nil();
  // do not hold oplock while invoking disconnect
  _oplock.unlock();
  try {
    if ((!_com_err) && (!CORBA::is_nil(proxy)))
      proxy->disconnect_structured_push_supplier();
  } catch(...) {}
}

Complete Implementation

Here is the complete listing for the implementation of StructuredPushConsumer_i.


StructuredPushConsumer_i::
StructuredPushConsumer_i(CosNA_StructuredProxyPushSupplier_ptr proxy,
                          CORBA::ULong max_events, const char* objnm,
                          consume_structured_fn* consume_fn, type_change_fn* change_fn,
                          CORBA::ULong millisecs, CORBA::Boolean verbose) :
  _my_proxy(proxy), _obj_name(objnm), _consume_fn(consume_fn), _change_fn(change_fn),
  _num_events(0), _max_events(max_events), _millisecs(millisecs), _verbose(verbose),
  _done(0), _com_err(0), _oplock(), _finish(&_oplock), _worker(0)
{
// nothing else to do
}

StructuredPushConsumer_i*
StructuredPushConsumer_i::create(CORBA::ORB_ptr orb,
                                  CosNA_EventChannel_ptr channel,
                                  CORBA::ULong max_events,
                                  CORBA::ULong batch_size,
                                  const char* objnm,
                                  const char* proxy_ior_file,
                                  consume_structured_fn* consume_fn,
                                  type_change_fn* change_fn,
                                  CosN_EventTypeSeq* evs_ptr,
                                  const char* constraint_expr,
                                  CORBA::ULong millisecs,
                                   CORBA::Boolean verbose)
{
  // Obtain appropriate proxy object
  CosNA_ProxySupplier_var generic_proxy =
  get_proxy_supplier(channel, CosNA_STRUCTURED_EVENT, 1, verbose); // 1 is push 0 is pull
  CosNA_StructuredProxyPushSupplier_ptr proxy =
   CosNA_StructuredProxyPushSupplier::_narrow(generic_proxy);
  if ( CORBA::is_nil(proxy) ) {
    return 0; // get_proxy_supplier failed
  }
  // If evs or constraint_expr are non-empty, add a filter to proxy
  if ((evs_ptr) &&
       sample_add_filter(channel, proxy, *evs_ptr, constraint_expr, objnm, verbose)) {
    return 0; // adding filter failed
  }
  // write proxy IOR to file
  write_ior_to_file(orb, proxy, proxy_ior_file, verbose);
  // Construct a client
  StructuredPushConsumer_i* client =
  new StructuredPushConsumer_i(proxy, max_events, objnm, consume_fn, change_fn, millisecs, verbose);
  return client;
}


CORBA::Boolean StructuredPushConsumer_i::connect() {
  omni_mutex_lock l(_oplock);
  if (_done) return 0;
  _com_err = 0;
  try {
    _my_proxy->connect_structured_push_consumer(_this());
    if (_change_fn) {
      _my_proxy->obtain_offered_types(CosNA_NONE_NOW_UPDATES_ON);
    } else {
      _my_proxy->obtain_offered_types(CosNA_NONE_NOW_UPDATES_OFF);
    }
  } catch (CORBA::BAD_PARAM& ex) {
    cerr << _obj_name << ": BAD_PARAM Exception while connecting" << endl;
    return 1; // error
  } catch (CosEventChannelAdmin::AlreadyConnected& ex) {
    cerr << _obj_name << ": Already connected" << endl;
    return 1; // error
  } catch (...) {
    cerr << _obj_name << ": Failed to connect" << endl;
    return 1; // error
  }
  if (_verbose) cout << _obj_name << ": Connected to proxy, ready to consume events" << endl;
  // if _millisecs is set, spawn a thread to ping the proxy
  if (_millisecs)
  _worker = new GenericBoundWorkerThread(this);
  return 0; // OK
}

// While _millisecs is positive, pings proxy every _millisecs milliseconds.
// Sets error code and sets _done to 1 if there is an error during ping.
void* StructuredPushConsumer_i::start_working(void *) {
  CosNC_StructuredPushConsumer_var bump_my_refcount_during_outcalls = _this();

  _oplock.lock();
  if (_verbose && (!_done) && _millisecs)
    cout << _obj_name << ": Spawned thread entering ping loop" << endl;
  // invariant: _oplock held at top of loop
  while ( 1 ) {
    if (_done || (_millisecs == 0)) break;
    unsigned long t_secs = 0, t_nanosecs = 0;
    unsigned long d_secs = 0, d_nanosecs = 0;
    d_secs = _millisecs / 1000;
    d_nanosecs = (_millisecs % 1000) * 1000000;
    _oplock.unlock(); // do not hold oplock across ping
    try {
      _my_proxy->MyType();
    } catch (...) {
      _oplock.lock();
      if (_done) break;
      if (_verbose) cout << _obj_name << ": communication error while pinging proxy" << endl;
      _done = 1;
      _finish.broadcast();
      _com_err = 1;
      break; // break from while loop -- done
    }
    _oplock.lock();
    if (_done) break; // must have disconnected during pull
    // sleep for specified interval
    omni_thread::get_time(&t_secs, &t_nanosecs, d_secs, d_nanosecs);
    _finish.timedwait(t_secs, t_nanosecs); // this release _oplock during wait
    // continue ping loop
  }
  // done
  _oplock.unlock();
  return 0;
}

void StructuredPushConsumer_i::push_structured_event(const CosN_StructuredEvent& data)
{
  if (_done) return; // see "Optimization Notes" at top

  omni_mutex_lock l(_oplock);
  if (_done) return;
  _num_events++;
  if (_consume_fn)
    (*_consume_fn)(data, _obj_name, _num_events, _verbose);
  else if (_verbose) cout << _obj_name << ": event count = " << _num_events << endl;
  if (_max_events && (_num_events >= _max_events)) {
    if (_verbose) cout << _obj_name << ": DONE [max_events reached]" << endl;
    _done = 1;
    _finish.broadcast();
  }
}

void StructuredPushConsumer_i::disconnect_structured_push_consumer()
{
  if (_done) return; // see "Optimization Notes" at top

  omni_mutex_lock l(_oplock);
  if (_done) return;
  if (_verbose) cout << _obj_name << ": disconnected" << endl;
  _done = 1;
  _finish.broadcast();
}

CORBA::Boolean StructuredPushConsumer_i::wait_done() {
  _oplock.lock();
  while (!_done) {
    _finish.wait();
  }
  // using tmp_worker ensures only one wait_done call tries join()
   GenericBoundWorkerThread* tmp_worker = _worker;
  _worker = 0;
  _oplock.unlock();
  if (tmp_worker) {
    tmp_worker->join(0);
  }
  return _com_err;
}

void StructuredPushConsumer_i::offer_change(const CosN_EventTypeSeq& added,
                                             const CosN_EventTypeSeq& deled)
{
  if (_done) return; // see "Optimization Notes" at top

  omni_mutex_lock l(_oplock);
  if (_done) return;
  if (_verbose) cout << _obj_name << ": offer_change received" << endl;
  if (_change_fn) (*_change_fn)(added, deled, _obj_name, _verbose);
}

void StructuredPushConsumer_i::cleanup() {
  _oplock.lock();
  if (_worker || (!_done)) {
    cerr << "Coding error: only call c->cleanup() after c->wait_done()" << endl;
    _oplock.unlock();
    return;
  }
  // this method takes sole ownership of _my_proxy ref
  CosNA_StructuredProxyPushSupplier_var proxy = _my_proxy;
  _my_proxy = CosNA_StructuredProxyPushSupplier::_nil();
  // do not hold oplock while invoking disconnect
  _oplock.unlock();
  try {
    if ((!_com_err) && (!CORBA::is_nil(proxy)))
      proxy->disconnect_structured_push_supplier();
  } catch(...) {}
}

 


Back to Documentation Home Page


For comments, feedback, etc, please see the 'Keep in touch' page.