Free High Performance CORBA Notification Service from AT&T Laboratories |
omniORB Home AT&T Research |
Back to Documentation Home Page
This section goes over the details of the StructuredPushSupplier_i
class
found in sample_clients.h / sample_clients.cc. This class does all the real work of
the example program struct_push_supplier.cc. (As discussed in the Common Code section, the main program creates a
StructuredPushSupplier_i object using the class's create
function, tells the
new client object to connect to its proxy using the connect
method, and then
waits for the desired number of events to be supplied using the wait_done
method.)
This section assumes you have read the section covering class StructuredPushConsumer_i. All the client code has the same basic structure, so we do not repeat things here that are covered in that section.
The rest of this section covers the class declaration (in sample_clients.h) and the class implementation (in sample_clients.cc).
Here is the beginning of the class declaration:
class StructuredPushSupplier_i : public GenericWorker, public POA_CosNotifyComm::StructuredPushSupplier, public PortableServer::RefCountServantBase |
A push supplier of structured events must implement the CosNotifyComm::StructuredPushSupplier
interface. Thus, class StructuredPushSupplier_i
inherits from the POA
skeleton class POA_CosNotifyComm::StructuredPushSupplie
r. It also
inherits from the ``mix-in'' class PortableServer::RefCountServantBase
to
enable automatic object disposal based on reference counting: the POA will destroy the
object when its reference count reaches zero. Finally, the class inherits from class GenericWorker
.
GenericWorker is defined at the top of sample_clients.h. Any class that inherits from
GenericWorker must implement the start_working
method, which will run in its
own thread. As we'll see below, start_working
will be used to push events to
the notification channel.
The constructor, create function, and most of the methods are the same as in the
declaration for StructuredPushConsumer_i, with these exceptions. First, _my_proxy
has type CosNA_StructuredProxyPushConsumer_var
(a structured push supplier
must connect to a structured proxy push consumer). Second, rather than having an
offer_change method, suppliers have a subscription_change method. (The default change
function for all suppliers is sample_subscription_change_fn.) Third, the create call takes
a function used for supplying structured events, instead of a function for consuming
events. The default supply function is sample_supply_structured_fn, and is found in
sample_functions.cc. Finally, there is no push method. A push supplier does not have a
method related to pushing. The proxy provides the method push_structured_event to support
pushing of events to the channel.
The following is the complete class declaration.
class StructuredPushSupplier_i : public GenericWorker, public POA_CosNotifyComm::StructuredPushSupplier, public PortableServer::RefCountServantBase { public: StructuredPushSupplier_i(CosNA_StructuredProxyPushConsumer_ptr proxy, CORBA::ULong max_events, const char* objnm, supply_structured_fn* supply_fn, type_change_fn* change_fn, CORBA::ULong millisecs, CORBA::Boolean verbose); static StructuredPushSupplier_i* create(CORBA::ORB_ptr orb, CosNA_EventChannel_ptr channel, CORBA::ULong max_events = 0, CORBA::ULong batch_size = 0, const char* objnm = "StructuredPushSupplier_i", const char* proxy_ior_file = "", supply_structured_fn* supply_fn = sample_supply_structured_fn, type_change_fn* change_fn = sample_subscription_change_fn, CosN_EventTypeSeq* evs_ptr = 0, const char* constraint_expr = "", CORBA::ULong millisecs = 0, CORBA::Boolean verbose = 0); // IDL methods void disconnect_structured_push_supplier(); void subscription_change(const CosN_EventTypeSeq& added, const CosN_EventTypeSeq& deled); // Local methods CORBA::Boolean connect(); CORBA::Boolean wait_done(); void cleanup(); void* start_working(void *); protected: CosNA_StructuredProxyPushConsumer_var _my_proxy; const char* _obj_name; supply_structured_fn* _supply_fn; type_change_fn* _change_fn; CORBA::ULong _num_events; CORBA::ULong _max_events; CORBA::ULong _millisecs; CORBA::Boolean _verbose; CORBA::Boolean _done; CORBA::Boolean _com_err; omni_mutex _oplock; omni_condition _finish; GenericBoundWorkerThread* _worker; }; |
The push supplier classes are very similar in structure to the pull consumer classes -- both use a worker thread to either pull events from the channel or push events to the channel. For this class, events are pushed using the proxy's push_structured_event method. The supplier performs this call in a worker thread, pausing for _millisecs milliseconds between each push call. (A setting of zero means push as fast as possible.) Because this worker thread is constantly communicating with the channel, there is no need for a separate 'ping thread' to monitor the liveness of the proxy.
As with the pull consumer, the key method is the start_working method:
void* StructuredPushSupplier_i::start_working(void *) { CosNC_StructuredPushSupplier_var bump_my_refcount_during_outcalls = _this(); unsigned long t_secs = 0, t_nanosecs = 0; unsigned long d_secs = 0, d_nanosecs = 0; if (_millisecs) { d_secs = _millisecs / 1000; d_nanosecs = (_millisecs % 1000) * 1000000; } CosN_StructuredEvent* data = new CosN_StructuredEvent; _oplock.lock(); if (_verbose && (!_done)) cout << _obj_name << ": Spawned thread entering main push loop" << endl; // invariants: _oplock is held entering top of loop and // also whenever we break out of loop // (it is *not* held during push_structured_event(), yield() and timedwait() calls) while ( 1 ) { if (_done) break; // must have disconnected if ((*_supply_fn)(*data, _obj_name, _num_events+1, _verbose)) { _oplock.unlock(); // do not hold oplock across push try { _my_proxy->push_structured_event(*data); } catch (...) { _oplock.lock(); if (_done) break; // must have disconnected during push if (_verbose) cout << _obj_name << ": communication error while calling push_structured_event()" << endl; _com_err = 1; break; // break from while loop -- done } _oplock.lock(); if (_done) break; // must have disconnected during push _num_events++; if (_max_events && (_num_events >= _max_events)) { if (_verbose) cout << _obj_name << ": DONE [max_events reached]" << endl; break; // done } } // wait or yield then try again if (_millisecs) { // sleep for specified interval omni_thread::get_time(&t_secs, &t_nanosecs, d_secs, d_nanosecs); _finish.timedwait(t_secs, t_nanosecs); // releases _oplock during wait } else { // use yield to let other threads have a shot at _oplock _oplock.unlock(); omni_thread::yield(); _oplock.lock(); } // continue pushing } // done delete data; if (!_done) { _done = 1; _finish.broadcast(); } _oplock.unlock(); return 0; } |
start_working enters a while loop, while(1), which loops until a break statement is executed. At the top of the loop, _oplock is always held. Each time through the loop, the lock is released, push_structured_event is called, then the lock is reacquired. As with a ping loop, it is important to release the lock across a call to a remote object so that other methods can execute during this call.
There are three things that cause a break from the while loop: _done has been set to 1 by another method (such as disconnect_structured_push_supplier); the target event count is reached; an error occurs during a push call. For the latter 2 cases, the code following the while loop will set _done to true and signal the _finish condition variable using broadcast. This will wake any thread waiting on that condition (see wait_done). For the case where an error occurs during a push call, _com_err is set to true before executing the break statement.
Prior to performing a push, the _consume_fn is used to obtain the structured event to be pushed to the channel. (The default function, sample_push_structured_fn, provides events of type Stock::Ticker and Bond::Ticker with stock and bond price quotes. See sample_functions.cc.) The function need not return an event; if it does not, the push call is skipped and the loop continues (after yielding or waiting, as discussed below). For each successful push, the method increments the count of the number of events received, _num_events. If _num_events has reached _max_events, a break occurs.
At the bottom of the while loop, if _millisecs is zero then omni_thread::yield() is called, otherwise timewait is called with the specified number of milliseconds (converted to an absolute seconds/nanoseconds time value). The yield is done to ensure that the thread running the start_working method does not 'hog' the processor... it gives other methods that need _oplock a chance to run. (Since start_working releases the lock on each push, the yield call may not be strictly necessary.)
The complete listing of the class implementation is presented below.
StructuredPushSupplier_i:: StructuredPushSupplier_i(CosNA_StructuredProxyPushConsumer_ptr proxy, CORBA::ULong max_events, const char* objnm, supply_structured_fn* supply_fn, type_change_fn* change_fn, CORBA::ULong millisecs, CORBA::Boolean verbose) : _my_proxy(proxy), _obj_name(objnm), _supply_fn(supply_fn), _change_fn(change_fn), _num_events(0), _max_events(max_events), _millisecs(millisecs), _verbose(verbose), _done(0), _com_err(0), _oplock(), _finish(&_oplock), _worker(0) { // providing explict NULL for supply_fn is not OK -- must have a valid function if (!_supply_fn) _supply_fn = sample_supply_structured_fn; } StructuredPushSupplier_i* StructuredPushSupplier_i::create(CORBA::ORB_ptr orb, CosNA_EventChannel_ptr channel, CORBA::ULong max_events, CORBA::ULong batch_size, const char* objnm, const char* proxy_ior_file, supply_structured_fn* supply_fn, type_change_fn* change_fn, CosN_EventTypeSeq* evs_ptr, const char* constraint_expr, CORBA::ULong millisecs, CORBA::Boolean verbose) { // Obtain appropriate proxy object CosNA_ProxyConsumer_var generic_proxy = get_proxy_consumer(channel, CosNA_STRUCTURED_EVENT, 1, verbose); // 1 means push 0 means pull CosNA_StructuredProxyPushConsumer_ptr proxy = CosNA_StructuredProxyPushConsumer::_narrow(generic_proxy); if ( CORBA::is_nil(proxy) ) { return 0; // get_proxy_consumer failed } // If evs or constraint_expr are non-empty, add a filter to proxy if ((evs_ptr) && sample_add_filter(channel, proxy, *evs_ptr, constraint_expr, objnm, verbose)) { return 0; // adding filter failed } // write proxy IOR to file write_ior_to_file(orb, proxy, proxy_ior_file, verbose); // Construct a client StructuredPushSupplier_i* client = new StructuredPushSupplier_i(proxy, max_events, objnm, supply_fn, change_fn, millisecs, verbose); return client; } CORBA::Boolean StructuredPushSupplier_i::connect() { omni_mutex_lock l(_oplock); if (_done) return 0; _com_err = 0; try { _my_proxy->connect_structured_push_supplier(_this()); if (_change_fn) { _my_proxy->obtain_subscription_types(CosNA_NONE_NOW_UPDATES_ON); } else { _my_proxy->obtain_subscription_types(CosNA_NONE_NOW_UPDATES_OFF); } } catch (CORBA::BAD_PARAM& ex) { cerr << _obj_name << ": BAD_PARAM Exception while connecting" << endl; return 1; // error } catch (CosEventChannelAdmin::AlreadyConnected& ex) { cerr << _obj_name << ": Already connected" << endl; return 1; // error } catch (...) { cerr << _obj_name << ": Failed to connect" << endl; return 1; // error } // spawn a thread to do pushing if (_verbose) cout << _obj_name << ": Connected to proxy, ready to supply events" << endl; _worker = new GenericBoundWorkerThread(this); return 0; // OK } void StructuredPushSupplier_i::cleanup() { _oplock.lock(); if (_worker || (!_done)) { cerr << "Coding error: only call c->cleanup() after c->wait_done()" << endl; _oplock.unlock(); return; } // this method takes sole ownership of _my_proxy ref CosNA_StructuredProxyPushConsumer_var proxy = _my_proxy; _my_proxy = CosNA_StructuredProxyPushConsumer::_nil(); // do not hold oplock while invoking disconnect _oplock.unlock(); try { if ((!_com_err) && (!CORBA::is_nil(proxy))) proxy->disconnect_structured_push_consumer(); } catch(...) {} } void* StructuredPushSupplier_i::start_working(void *) { CosNC_StructuredPushSupplier_var bump_my_refcount_during_outcalls = _this(); unsigned long t_secs = 0, t_nanosecs = 0; unsigned long d_secs = 0, d_nanosecs = 0; if (_millisecs) { d_secs = _millisecs / 1000; d_nanosecs = (_millisecs % 1000) * 1000000; } CosN_StructuredEvent* data = new CosN_StructuredEvent; _oplock.lock(); if (_verbose && (!_done)) cout << _obj_name << ": Spawned thread entering main push loop" << endl; // invariants: _oplock is held entering top of loop and // also whenever we break out of loop // (it is *not* held during push_structured_event(), yield() and timedwait() calls) while ( 1 ) { if (_done) break; // must have disconnected if ((*_supply_fn)(*data, _obj_name, _num_events+1, _verbose)) { _oplock.unlock(); // do not hold oplock across push try { _my_proxy->push_structured_event(*data); } catch (...) { _oplock.lock(); if (_done) break; // must have disconnected during push if (_verbose) cout << _obj_name << ": communication error while calling push_structured_event()" << endl; _com_err = 1; break; // break from while loop -- done } _oplock.lock(); if (_done) break; // must have disconnected during push _num_events++; if (_max_events && (_num_events >= _max_events)) { if (_verbose) cout << _obj_name << ": DONE [max_events reached]" << endl; break; // done } } // wait or yield then try again if (_millisecs) { // sleep for specified interval omni_thread::get_time(&t_secs, &t_nanosecs, d_secs, d_nanosecs); _finish.timedwait(t_secs, t_nanosecs); // releases _oplock during wait } else { // use yield to let other threads have a shot at _oplock _oplock.unlock(); omni_thread::yield(); _oplock.lock(); } // continue pushing } // done delete data; if (!_done) { _done = 1; _finish.broadcast(); } _oplock.unlock(); return 0; } void StructuredPushSupplier_i::disconnect_structured_push_supplier() { if (_done) return; // see "Optimization Notes" at top omni_mutex_lock l(_oplock); if (_done) return; if (_verbose) cout << _obj_name << ": disconnected" << endl; _done = 1; _finish.broadcast(); } CORBA::Boolean StructuredPushSupplier_i::wait_done() { _oplock.lock(); while (!_done) { _finish.wait(); } // using tmp_worker ensures only one wait_done call tries join() GenericBoundWorkerThread* tmp_worker = _worker; _worker = 0; _oplock.unlock(); if (tmp_worker) { tmp_worker->join(0); } return _com_err; } void StructuredPushSupplier_i::subscription_change(const CosN_EventTypeSeq& added, const CosN_EventTypeSeq& deled) { if (_done) return; // see "Optimization Notes" at top omni_mutex_lock l(_oplock); if (_done) return; if (_verbose) cout << _obj_name << ": subscription_change received" << endl; if (_change_fn) (*_change_fn)(added, deled, _obj_name, _verbose); } |
Back to Documentation Home Page
For comments, feedback, etc, please see the 'Keep in touch' page. |