Free High Performance CORBA Notification Service from AT&T Laboratories |
omniORB Home AT&T Research |
Back to Documentation Home Page
This section goes over the details of the StructuredPullConsumer_i
class
found in sample_clients.h / sample_clients.cc. This class does all the real work of
the example program struct_push_consumer.cc. (As discussed in the Common Code section, the main program creates a
StructuredPushConsumer_i object using the class's create
function, tells the
new client object to connect to its proxy using the connect
method, and then
waits for the desired number of events to be consumed using the wait_done
method.)
This section assumes you have read the section covering class StructuredPushConsumer_i. All the client code has the same basic structure, so we do not repeat things here that are covered in that section.
The rest of this section covers the class declaration (in sample_clients.h) and the class implementation (in sample_clients.cc).
Here is the beginning of the class declaration:
class StructuredPullConsumer_i : public GenericWorker, public POA_CosNotifyComm::StructuredPullConsumer, public PortableServer::RefCountServantBase |
A pull consumer of structured events must implement the CosNotifyComm::StructuredPullConsumer
interface. Thus, class StructuredPullConsumer_i
inherits from the POA
skeleton class POA_CosNotifyComm::StructuredPullConsumer
. It also
inherits from the ``mix-in'' class PortableServer::RefCountServantBase
to
enable automatic object disposal based on reference counting: the POA will destroy the
object when its reference count reaches zero. Finally, the class inherits from class GenericWorker
.
GenericWorker is defined at the top of sample_clients.h. Any class that inherits from
GenericWorker must implement the start_working
method, which will run in its
own thread. As we'll see below, start_working
will be used to pull events
from the notification channel.
The rest of the class declaration is very similar to the push consumer class declaration, with obvious changes. For eample, the type of _my_proxy is CosNotifyChannelAdmin::StructuredProxyPullSupplier_var. (A structured pull consumer connects to a structured proxy pull supplier.) The local methods are the same, and the IDL Methods are almost the same -- a pull consumer does not have a push_structured_events method.
Here is the complete class declaration:
class StructuredPullConsumer_i : public GenericWorker, public POA_CosNotifyComm::StructuredPullConsumer, public PortableServer::RefCountServantBase { public: StructuredPullConsumer_i(CosNA_StructuredProxyPullSupplier_ptr proxy, CORBA::ULong max_events, const char* objnm, consume_structured_fn* consume_fn, type_change_fn* change_fn, CORBA::ULong millisecs, CORBA::Boolean verbose); static StructuredPullConsumer_i* create(CORBA::ORB_ptr orb, CosNA_EventChannel_ptr channel, CORBA::ULong max_events = 0, CORBA::ULong batch_size = 0, const char* objnm = "StructuredPullConsumer_i", const char* proxy_ior_file = "", consume_structured_fn* consume_fn = sample_consume_structured_fn, type_change_fn* change_fn = sample_offer_change_fn, CosN_EventTypeSeq* evs_ptr = 0, const char* constraint_expr = "", CORBA::ULong millisecs = 0, CORBA::Boolean verbose = 0); // IDL methods void disconnect_structured_push_consumer(); void offer_change(const CosN_EventTypeSeq& added, const CosN_EventTypeSeq& deled); // Local methods CORBA::Boolean connect(); CORBA::Boolean wait_done(); void cleanup(); void* start_working(void *); protected: CosNA_StructuredProxyPullSupplier_var _my_proxy; const char* _obj_name; consume_structured_fn* _consume_fn; type_change_fn* _change_fn; CORBA::ULong _num_events; CORBA::ULong _max_events; CORBA::ULong _millisecs; CORBA::Boolean _verbose; CORBA::Boolean _done; CORBA::Boolean _com_err; omni_mutex _oplock; omni_condition _finish; GenericBoundWorkerThread* _worker; }; |
The two classes have similar constructors and static create methods. In both cases the default consume function is sample_consume_structured_fn and the default offer change function is sample_offer_change_fn. The main difference between the classes is that rather than providing a push_structured_event method which the channel inovoked to provide events to the consumer, a pull consumer requests events from the channel by invoking pull_structured_event on its proxy. It performs this call in a worker thread, pausing for _millisecs milliseconds between each pull call. (A setting of zero means pull as fast as possible.) Because this worker thread is constantly polling the channel, there is no need for a separate 'ping thread' to monitor the liveness of the proxy.
Thus, the key method is the start_working method, which is spawned in a background thread by the connect method:
void* StructuredPullConsumer_i::start_working(void *) { CosNC_StructuredPullConsumer_var bump_my_refcount_during_outcalls = _this(); unsigned long t_secs = 0, t_nanosecs = 0; unsigned long d_secs = 0, d_nanosecs = 0; if (_millisecs) { d_secs = _millisecs / 1000; d_nanosecs = (_millisecs % 1000) * 1000000; } CosN_StructuredEvent* data = 0; // invariants: _oplock is held entering top of loop and // also whenever we break out of loop // (it is *not* held during pull_structured_event(), yield(), and timedwait() calls) _oplock.lock(); if (_verbose && (!_done)) cout << _obj_name << ": Spawned thread entering main pull loop" << endl; while ( 1 ) { if (_done) break; // must have disconnected _oplock.unlock(); // do not hold oplock across pull try { data = _my_proxy->pull_structured_event(); } catch (...) { _oplock.lock(); if (_done) break; // must have disconnected during pull if (_verbose) cout << _obj_name << ": communication error while calling pull_structured_event()" << endl; _com_err = 1; break; // break from while loop -- done } _oplock.lock(); if (_done) break; // must have disconnected during pull if (!data) { if (_verbose) cout << _obj_name << ": strange failure: pull_structured_event() returned nil" << endl; _com_err = 1; break; // break from while loop -- done } // got an event _num_events++; if (_consume_fn) (*_consume_fn)(*data, _obj_name, _num_events, _verbose); else if (_verbose) cout << _obj_name << ": event count = " << _num_events << endl; delete data; data = 0; if (_max_events && (_num_events >= _max_events)) { if (_verbose) cout << _obj_name << ": DONE [max_events reached]" << endl; break; // done } if (_millisecs) { // sleep for specified interval omni_thread::get_time(&t_secs, &t_nanosecs, d_secs, d_nanosecs); _finish.timedwait(t_secs, t_nanosecs); // releases _oplock during wait } else { // use yield to let other threads have a shot at _oplock _oplock.unlock(); omni_thread::yield(); _oplock.lock(); } // continue pulling } // done delete data; if (!_done) { _done = 1; _finish.broadcast(); } _oplock.unlock(); return 0; } |
start_working enters a while loop, while(1), which loops until a break statement is executed. At the top of the loop, _oplock is always held. Each time through the loop, the lock is released, pull_structured_event is called, then the lock is reacquired. As with a ping loop, it is important to release the lock across a call to a remote object so that other methods can execute during this call.
There are three things that cause a break from the while loop: _done has been set to 1 by another method (such as disconnect_structured_pull_consumer); the target event count is reached; an error occurs during a pull call. For the latter 2 cases, the code following the while loop will set _done to true and signal the _finish condition variable using broadcast. This will wake any thread waiting on that condition (see wait_done). For the case where an error occurs during a pull call, _com_err is set to true before executing the break statement.
For each event returned by pull_structured_event, the method increments the count of the number of events received, _num_events. If a consume function has been specified, it is invoked. If _num_events has reached _max_events, a break occurs.
After processing the event, if _millisecs is zero then omni_thread::yield() is called at the bottom of the while loop, otherwise timewait is called with the specified number of milliseconds (converted to an absolute seconds/nanoseconds time value). The yield is done to ensure that the thread running the start_working method does not 'hog' the processor... it gives other methods that need _oplock a chance to run. (Since start_working releases the lock on each pull, the yield call may not be strictly necessary.)
Here is the complete listing for the implementation of StructuredPullConsumer_i.
StructuredPullConsumer_i:: StructuredPullConsumer_i(CosNA_StructuredProxyPullSupplier_ptr proxy, CORBA::ULong max_events, const char* objnm, consume_structured_fn* consume_fn, type_change_fn* change_fn, CORBA::ULong millisecs, CORBA::Boolean verbose) : _my_proxy(proxy), _obj_name(objnm), _consume_fn(consume_fn), _change_fn(change_fn), _num_events(0), _max_events(max_events), _millisecs(millisecs), _verbose(verbose), _done(0), _com_err(0), _oplock(), _finish(&_oplock), _worker(0) { // nothing else to do } StructuredPullConsumer_i* StructuredPullConsumer_i::create(CORBA::ORB_ptr orb, CosNA_EventChannel_ptr channel, CORBA::ULong max_events, CORBA::ULong batch_size, const char* objnm, const char* proxy_ior_file, consume_structured_fn* consume_fn, type_change_fn* change_fn, CosN_EventTypeSeq* evs_ptr, const char* constraint_expr, CORBA::ULong millisecs, CORBA::Boolean verbose) { // Obtain appropriate proxy object CosNA_ProxySupplier_var generic_proxy = get_proxy_supplier(channel, CosNA_STRUCTURED_EVENT, 0, verbose); // 1 is push 0 is pull CosNA_StructuredProxyPullSupplier_ptr proxy = CosNA_StructuredProxyPullSupplier::_narrow(generic_proxy); if ( CORBA::is_nil(proxy) ) { return 0; // get_proxy_supplier failed } // If evs or constraint_expr are non-empty, add a filter to proxy if ((evs_ptr) && sample_add_filter(channel, proxy, *evs_ptr, constraint_expr, objnm, verbose)) { return 0; // adding filter failed } // write proxy IOR to file write_ior_to_file(orb, proxy, proxy_ior_file, verbose); // Construct a client StructuredPullConsumer_i* client = new StructuredPullConsumer_i(proxy, max_events, objnm, consume_fn, change_fn, millisecs, verbose); return client; } CORBA::Boolean StructuredPullConsumer_i::connect() { omni_mutex_lock l(_oplock); if (_done) return 0; _com_err = 0; try { _my_proxy->connect_structured_pull_consumer(_this()); if (_change_fn) { _my_proxy->obtain_offered_types(CosNA_NONE_NOW_UPDATES_ON); } else { _my_proxy->obtain_offered_types(CosNA_NONE_NOW_UPDATES_OFF); } } catch (CORBA::BAD_PARAM& ex) { cerr << _obj_name << ": BAD_PARAM Exception while connecting" << endl; return 1; // error } catch (CosEventChannelAdmin::AlreadyConnected& ex) { cerr << _obj_name << ": Already connected" << endl; return 1; // error } catch (...) { cerr << _obj_name << ": Failed to connect" << endl; return 1; // error } // spawn a thread to do pulling if (_verbose) cout << _obj_name << ": Connected to proxy, ready to consume events" << endl; _worker = new GenericBoundWorkerThread(this); return 0; // OK } void StructuredPullConsumer_i::cleanup() { _oplock.lock(); if (_worker || (!_done)) { cerr << "Coding error: only call c->cleanup() after c->wait_done()" << endl; _oplock.unlock(); return; } // this method takes sole ownership of _my_proxy ref CosNA_StructuredProxyPullSupplier_var proxy = _my_proxy; _my_proxy = CosNA_StructuredProxyPullSupplier::_nil(); // do not hold oplock while invoking disconnect _oplock.unlock(); try { if ((!_com_err) && (!CORBA::is_nil(proxy))) proxy->disconnect_structured_pull_supplier(); } catch(...) {} } void* StructuredPullConsumer_i::start_working(void *) { CosNC_StructuredPullConsumer_var bump_my_refcount_during_outcalls = _this(); unsigned long t_secs = 0, t_nanosecs = 0; unsigned long d_secs = 0, d_nanosecs = 0; if (_millisecs) { d_secs = _millisecs / 1000; d_nanosecs = (_millisecs % 1000) * 1000000; } CosN_StructuredEvent* data = 0; // invariants: _oplock is held entering top of loop and // also whenever we break out of loop // (it is *not* held during pull_structured_event(), yield(), and timedwait() calls) _oplock.lock(); if (_verbose && (!_done)) cout << _obj_name << ": Spawned thread entering main pull loop" << endl; while ( 1 ) { if (_done) break; // must have disconnected _oplock.unlock(); // do not hold oplock across pull try { data = _my_proxy->pull_structured_event(); } catch (...) { _oplock.lock(); if (_done) break; // must have disconnected during pull if (_verbose) cout << _obj_name << ": communication error while calling pull_structured_event()" << endl; _com_err = 1; break; // break from while loop -- done } _oplock.lock(); if (_done) break; // must have disconnected during pull if (!data) { if (_verbose) cout << _obj_name << ": strange failure: pull_structured_event() returned nil" << endl; _com_err = 1; break; // break from while loop -- done } // got an event _num_events++; if (_consume_fn) (*_consume_fn)(*data, _obj_name, _num_events, _verbose); else if (_verbose) cout << _obj_name << ": event count = " << _num_events << endl; delete data; data = 0; if (_max_events && (_num_events >= _max_events)) { if (_verbose) cout << _obj_name << ": DONE [max_events reached]" << endl; break; // done } if (_millisecs) { // sleep for specified interval omni_thread::get_time(&t_secs, &t_nanosecs, d_secs, d_nanosecs); _finish.timedwait(t_secs, t_nanosecs); // this release _oplock during wait } else { // use yield to let other threads have a shot at _oplock _oplock.unlock(); omni_thread::yield(); _oplock.lock(); } // continue pulling } // done delete data; if (!_done) { _done = 1; _finish.broadcast(); } _oplock.unlock(); return 0; } void StructuredPullConsumer_i::disconnect_structured_pull_consumer() { if (_done) return; // see "Optimization Notes" at top omni_mutex_lock l(_oplock); if (_done) return; if (_verbose) cout << _obj_name << ": disconnected" << endl; _done = 1; _finish.broadcast(); } CORBA::Boolean StructuredPullConsumer_i::wait_done() { _oplock.lock(); while (!_done) { _finish.wait(); } // using tmp_worker ensures only one wait_done call tries join() GenericBoundWorkerThread* tmp_worker = _worker; _worker = 0; _oplock.unlock(); if (tmp_worker) { tmp_worker->join(0); } return _com_err; } void StructuredPullConsumer_i::offer_change(const CosN_EventTypeSeq& added, const CosN_EventTypeSeq& deled) { if (_done) return; // see "Optimization Notes" at top omni_mutex_lock l(_oplock); if (_done) return; if (_verbose) cout << _obj_name << ": offer_change received" << endl; if (_change_fn) (*_change_fn)(added, deled, _obj_name, _verbose); } |
Back to Documentation Home Page
For comments, feedback, etc, please see the 'Keep in touch' page. |