This section goes over the details of the StructuredPushConsumer_i class found in sample_clients.h / sample_clients.cc. This class does all the real work of the example program struct_push_consumer.cc. (As discussed in the Common Code section, the main program creates a StructuredPushConsumer_i object using the class's create function, tells the new client object to connect to its proxy using the connect method, and then waits for the desired number of events to be consumed using the wait_done method.)
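To make that flow concrete, here is a minimal sketch of how a main program might drive the class. The wrapper function run_consumer and the argument values are illustrative only (they are not taken from struct_push_consumer.cc), and the sketch assumes sample_clients.h has been included and that the ORB and channel references have been obtained as described in the Common Code section.

// Sketch only: driving StructuredPushConsumer_i from a main program.
// Assumes an initialized ORB and a channel reference (see Common Code).
int run_consumer(CORBA::ORB_ptr orb, CosNA_EventChannel_ptr channel)
{
  StructuredPushConsumer_i* client =
      StructuredPushConsumer_i::create(orb, channel,
                                       10,               // max_events
                                       0,                // batch_size (unused)
                                       "demo_consumer"); // objnm
  if (!client)           return 1;              // create failed
  if (client->connect()) return 1;              // immediate connect error
  CORBA::Boolean com_err = client->wait_done(); // block until a done condition
  client->cleanup();                            // disconnect from the channel
  // (servant disposal via POA reference counting not shown)
  return com_err ? 1 : 0;
}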
Before we begin looking at the code, here are a few things you should know. Clients of a notification channel connect to that channel in 3 steps: (1) acquire a reference to an admin object; (2) from the admin, acquire a reference to a proxy object; (3) connect to that proxy. Any number of clients can use the same admin object, but there is a 1-to-1 mapping between client and proxy objects (each client has its own proxy). The admin and proxy objects are the client's 2 contact points with the notification channel. The client invokes methods on these objects to carry out such actions as adding or removing filtering constraints. A filter that is added at an admin applies to all clients connected to any proxy acquired from that admin (allowing a set of clients to share the filter). A filter added to a proxy only applies to the 1 client that is connected to that proxy.
The rest of this section covers the class declaration (in sample_clients.h) and the class implementation (in sample_clients.cc).
Here is the beginning of the class declaration:
class StructuredPushConsumer_i :
  public GenericWorker,
  public POA_CosNotifyComm::StructuredPushConsumer,
  public PortableServer::RefCountServantBase
A push consumer of structured events must implement the CosNotifyComm::StructuredPushConsumer interface. Thus, class StructuredPushConsumer_i inherits from the POA skeleton class POA_CosNotifyComm::StructuredPushConsumer. It also inherits from the ``mix-in'' class PortableServer::RefCountServantBase to enable automatic object disposal based on reference counting: the POA will destroy the object when its reference count reaches zero. Finally, the class inherits from class GenericWorker.
GenericWorker is defined at the top of sample_clients.h. Any class that inherits from GenericWorker must implement the start_working method, which will run in its own thread. As we'll see below, start_working will be used to monitor the liveness of the notification channel.
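The exact definition of GenericWorker is in sample_clients.h; as a rough sketch (an assumption, not a copy of the real code), it can be pictured as an abstract base class along these lines:

// Illustrative sketch only -- the real GenericWorker is defined in
// sample_clients.h and may differ in detail.
class GenericWorker {
public:
  virtual ~GenericWorker() {}
  // Body of the worker thread; a thread wrapper such as
  // GenericBoundWorkerThread runs this in its own thread of control.
  virtual void* start_working(void*) = 0;
};

GenericBoundWorkerThread, used in the connect method below, is the thread wrapper that runs a GenericWorker's start_working method in a newly spawned thread.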
The class has one constructor and one static function called create. New instances of the class are normally generated using the create function. This function has a large number of arguments, used to configure the consumer in various ways. Most of these parameters have default values; only the first 2 parameters are mandatory. Here are the declarations of the constructor and the static create function:
StructuredPushConsumer_i(CosNA_StructuredProxyPushSupplier_ptr proxy,
                         CORBA::ULong max_events,
                         const char* objnm,
                         consume_structured_fn* consume_fn,
                         type_change_fn* change_fn,
                         CORBA::ULong millisecs,
                         CORBA::Boolean verbose);

static StructuredPushConsumer_i*
create(CORBA::ORB_ptr orb,
       CosNA_EventChannel_ptr channel,
       CORBA::ULong max_events = 0,
       CORBA::ULong batch_size = 0,
       const char* objnm = "StructuredPushConsumer_i",
       const char* proxy_ior_file = "",
       consume_structured_fn* consume_fn = sample_consume_structured_fn,
       type_change_fn* change_fn = sample_offer_change_fn,
       CosN_EventTypeSeq* evs_ptr = 0,
       const char* constraint_expr = "",
       CORBA::ULong millisecs = 0,
       CORBA::Boolean verbose = 0);
Here is a rundown of the parameters:
orb: A reference to the ORB (which must be initialized).

channel: A reference to a notification channel.

max_events: The number of events to consume (the default value of 0 means consume 'forever').

batch_size: Not relevant for this class.*

objnm: A descriptive name for the object -- used in verbose output messages.

proxy_ior_file: A filename used for storing a stringified representation of the IOR (object reference) of the consumer's proxy.

consume_fn: A pointer to a function that will "consume" one structured event at a time. The default function is called sample_consume_structured_fn and can be found in sample_functions.cc. By using an external function, we have separated the actual processing of incoming events from the program logic that does all the dirty work of setting up a push consumer connection, monitoring the connection, and shutting down the connection. We review the sample consume function later. (A sketch of a custom consume function, together with a full create call, appears after this list.)

change_fn: A pointer to a function that handles any offer_change messages sent to the client. The default function is called sample_offer_change_fn and can be found in sample_functions.cc. Again, we decided to separate this processing from the rest of the code. (sample_offer_change_fn simply prints a description of the added and deleted event types that were sent in the offer_change message. In some cases it makes sense for offer_change messages to cause a consumer to change the set of event types that it subscribes to, but our client does not do this.)

evs_ptr and constraint_expr: These two parameters are used to (optionally) add a filter to the consumer's proxy. evs_ptr is a pointer to an event type sequence, and constraint_expr is a pointer to a string containing a constraint expression. If evs_ptr is NULL, no filter is added. (This is the default.)

millisecs: The number of milliseconds between each 'ping' sent to the client's proxy by a special background thread which monitors the liveness of this proxy. If the value is zero, no background thread is used (no pinging is done).

verbose: If set to true, descriptive messages are output tracking the progress of the consumer. In addition, the verbose flag is passed to the consume function, and controls whether some output is generated for each event that is pushed to the consumer.

* batch_size is passed to all create functions so that there is a consistent interface across all 12 classes in sample_clients.h, but it is only used by the push and pull batch supplier classes.
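As an illustration only (not code from the samples), here is a sketch of a custom consume function and a create call that installs it. The function name my_consume_fn and all of the argument values are hypothetical; the signature of the consume function is inferred from the way _consume_fn is invoked in push_structured_event below (the real typedef is in sample_clients.h), and orb and channel are assumed to have been obtained already.

// Hypothetical consume function: print the type of every incoming event.
void my_consume_fn(const CosN_StructuredEvent& event,
                   const char* obj_name,
                   CORBA::ULong event_num,
                   CORBA::Boolean verbose)
{
  cout << obj_name << ": event #" << event_num
       << " domain="
       << (const char*)event.header.fixed_header.event_type.domain_name
       << " type="
       << (const char*)event.header.fixed_header.event_type.type_name
       << endl;
}

// ... later, e.g. in main():
// consume 100 events, ping the proxy every 5 seconds, verbose output
StructuredPushConsumer_i* client =
    StructuredPushConsumer_i::create(orb, channel,
                                     100,                    // max_events
                                     0,                      // batch_size (unused)
                                     "my_consumer",          // objnm
                                     "",                     // no proxy IOR file
                                     my_consume_fn,          // custom consume function
                                     sample_offer_change_fn, // default change function
                                     0,                      // evs_ptr: no filter
                                     "",                     // constraint_expr
                                     5000,                   // millisecs between pings
                                     1);                     // verbose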
Next, there are declarations of two kinds of methods.
// IDL methods
void push_structured_event(const CosN_StructuredEvent& data);
void disconnect_structured_push_consumer();
void offer_change(const CosN_EventTypeSeq& added,
                  const CosN_EventTypeSeq& deled);
The IDL methods are the methods required by any class implementing CosNotifyComm::StructuredPushConsumer; these methods are invoked remotely by the notification channel.

push_structured_event: used to push events to the consumer.

disconnect_structured_push_consumer: used to inform the consumer it is no longer connected (not used).

offer_change: used to inform the consumer of changes in the set of event types offered across all the suppliers connected to the channel.

// Local methods
CORBA::Boolean connect();
CORBA::Boolean wait_done();
void cleanup();
void* start_working(void *);
The ``local methods'' are additional methods that are only invoked locally on the implementation object. The first 3 methods are invoked by the main program after creating a new consumer object.
connect: Causes the consumer to connect to its proxy; it will then start receiving events. Returns a boolean indicating whether an immediate error occurred (true means there was an error).

wait_done: Waits until a done condition is reached, then returns. Returns a boolean indicating whether there was a communication error during a ping attempt (true means there was an error). There are 3 done conditions: the max_events goal specified in the create call is reached; a communication error occurs while 'pinging' the proxy; the disconnect_structured_push_consumer method is called.

cleanup: Causes the consumer to disconnect from the channel (as long as a communication error has not occurred and the disconnect method has not been called). This method should only be called after calling wait_done.

start_working: This method runs in a separate worker thread. If the millisecs argument to create was non-zero, the connect method spawns a worker thread after successfully connecting the consumer to its proxy. This method periodically 'pings' the proxy, and if a ping request fails it triggers the done condition.

Finally, the class has some private state:
protected:
  CosNA_StructuredProxyPushSupplier_var _my_proxy;
  const char*                           _obj_name;
  consume_structured_fn*                _consume_fn;
  type_change_fn*                       _change_fn;
  CORBA::ULong                          _num_events;
  CORBA::ULong                          _max_events;
  CORBA::ULong                          _millisecs;
  CORBA::Boolean                        _verbose;
  CORBA::Boolean                        _done;
  CORBA::Boolean                        _com_err;
  omni_mutex                            _oplock;
  omni_condition                        _finish;
  GenericBoundWorkerThread*             _worker;
};
The members _obj_name, _consume_fn, _change_fn, _max_events, _millisecs, and _verbose simply keep track of configuration parameters passed to the constructor, while _num_events counts the events consumed so far. The member _my_proxy is a reference to the proxy object obtained from the channel (via an admin object -- a reference to the admin object is not retained because this client does not do anything further with the admin after obtaining _my_proxy). The _done flag tracks the done condition discussed above, while _com_err tracks whether the ping loop has detected a communications error. The mutex _oplock is used to ensure that only one method at a time is reading or modifying these members, while the condition variable _finish is used to wait for _done to become true. The _worker member keeps track of the spawned worker thread.
Here is the complete class declaration:
class StructuredPushConsumer_i :
  public GenericWorker,
  public POA_CosNotifyComm::StructuredPushConsumer,
  public PortableServer::RefCountServantBase
{
public:
  StructuredPushConsumer_i(CosNA_StructuredProxyPushSupplier_ptr proxy,
                           CORBA::ULong max_events,
                           const char* objnm,
                           consume_structured_fn* consume_fn,
                           type_change_fn* change_fn,
                           CORBA::ULong millisecs,
                           CORBA::Boolean verbose);

  static StructuredPushConsumer_i*
  create(CORBA::ORB_ptr orb,
         CosNA_EventChannel_ptr channel,
         CORBA::ULong max_events = 0,
         CORBA::ULong batch_size = 0,
         const char* objnm = "StructuredPushConsumer_i",
         const char* proxy_ior_file = "",
         consume_structured_fn* consume_fn = sample_consume_structured_fn,
         type_change_fn* change_fn = sample_offer_change_fn,
         CosN_EventTypeSeq* evs_ptr = 0,
         const char* constraint_expr = "",
         CORBA::ULong millisecs = 0,
         CORBA::Boolean verbose = 0);

  // IDL methods
  void push_structured_event(const CosN_StructuredEvent& data);
  void disconnect_structured_push_consumer();
  void offer_change(const CosN_EventTypeSeq& added,
                    const CosN_EventTypeSeq& deled);

  // Local methods
  CORBA::Boolean connect();
  CORBA::Boolean wait_done();
  void cleanup();
  void* start_working(void *);

protected:
  CosNA_StructuredProxyPushSupplier_var _my_proxy;
  const char*                           _obj_name;
  consume_structured_fn*                _consume_fn;
  type_change_fn*                       _change_fn;
  CORBA::ULong                          _num_events;
  CORBA::ULong                          _max_events;
  CORBA::ULong                          _millisecs;
  CORBA::Boolean                        _verbose;
  CORBA::Boolean                        _done;
  CORBA::Boolean                        _com_err;
  omni_mutex                            _oplock;
  omni_condition                        _finish;
  GenericBoundWorkerThread*             _worker;
};
A number of methods have the following form:
void method_name(...)
{
  if (_done) return;
  ...
  omni_mutex_lock l(_oplock);
  if (_done) return;
  ...
}
The _done check prior to obtaining the lock is an optimization; it could be removed and the code would still be correct. It is useful because it avoids unnecessary synchronization during shutdown/cleanup of the client.
Because _done can change while the lock is not held, any time the code first acquires (or releases then re-acquires) _oplock, it must do another _done check. So the 2nd check, which looks redundant, is actually required!
The constructor for StructuredPushConsumer_i simply initializes all of the private members:
StructuredPushConsumer_i::
StructuredPushConsumer_i(CosNA_StructuredProxyPushSupplier_ptr proxy,
                         CORBA::ULong max_events,
                         const char* objnm,
                         consume_structured_fn* consume_fn,
                         type_change_fn* change_fn,
                         CORBA::ULong millisecs,
                         CORBA::Boolean verbose)
  : _my_proxy(proxy), _obj_name(objnm),
    _consume_fn(consume_fn), _change_fn(change_fn),
    _num_events(0), _max_events(max_events),
    _millisecs(millisecs), _verbose(verbose),
    _done(0), _com_err(0),
    _oplock(), _finish(&_oplock), _worker(0)
{
  // nothing else to do
}
The only non-obvious initialization is that the _finish condition variable is initialized with a pointer to the _oplock mutex -- a condition variable must always be associated with a mutex, and a method must hold that mutex before waiting on the condition variable. (See the omnithread documentation for the full mutex and condition variable APIs.)
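For reference, the basic omnithread pattern looks like this (a generic sketch, not code from sample_clients.cc):

// Generic omnithread wait/signal pattern: the condition is bound to a
// mutex at construction, and that mutex must be held both when waiting
// and when changing the predicate the waiter is checking.
#include <omnithread.h>

static omni_mutex     guard;
static omni_condition ready_cond(&guard);   // condition bound to 'guard'
static bool           ready = false;

void wait_until_ready()
{
  omni_mutex_lock l(guard);   // hold 'guard' before waiting (released on scope exit)
  while (!ready)
    ready_cond.wait();        // atomically releases 'guard' while blocked
}

void announce_ready()
{
  omni_mutex_lock l(guard);
  ready = true;
  ready_cond.broadcast();     // wake every thread blocked in wait()
}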
The static create function does four things.
StructuredPushConsumer_i*
StructuredPushConsumer_i::create(CORBA::ORB_ptr orb,
                                 CosNA_EventChannel_ptr channel,
                                 CORBA::ULong max_events,
                                 CORBA::ULong batch_size,
                                 const char* objnm,
                                 const char* proxy_ior_file,
                                 consume_structured_fn* consume_fn,
                                 type_change_fn* change_fn,
                                 CosN_EventTypeSeq* evs_ptr,
                                 const char* constraint_expr,
                                 CORBA::ULong millisecs,
                                 CORBA::Boolean verbose)
{
  // (1) Obtain appropriate proxy object
  CosNA_ProxySupplier_var generic_proxy =
    get_proxy_supplier(channel, CosNA_STRUCTURED_EVENT, 1, verbose); // 1 is push, 0 is pull
  CosNA_StructuredProxyPushSupplier_ptr proxy =
    CosNA_StructuredProxyPushSupplier::_narrow(generic_proxy);
  if ( CORBA::is_nil(proxy) ) {
    return 0; // get_proxy_supplier failed
  }

  // (2) If evs or constraint_expr are non-empty, add a filter to proxy
  if ((evs_ptr) && sample_add_filter(channel, proxy, *evs_ptr,
                                     constraint_expr, objnm, verbose)) {
    return 0; // adding filter failed
  }

  // (3) Write proxy IOR to file
  write_ior_to_file(orb, proxy, proxy_ior_file, verbose);

  // (4) Construct a client
  StructuredPushConsumer_i* client =
    new StructuredPushConsumer_i(proxy, max_events, objnm, consume_fn,
                                 change_fn, millisecs, verbose);
  return client;
}
(1) It obtains an appropriate proxy object using the helper function get_proxy_supplier. The returned proxy reference is a generic proxy supplier; it is narrowed to a CosNotifyChannelAdmin::StructuredProxyPushSupplier. Note that consumers connect to proxy suppliers, while suppliers connect to proxy consumers. Thus our structured push consumer client must connect to a structured proxy push supplier.
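get_proxy_supplier itself is defined elsewhere in the sample code. The following sketch, with the hypothetical name obtain_structured_push_proxy and written with the full CosNotifyChannelAdmin scoped names (the samples use CosNA_ shorthand typedefs for the same types), shows roughly the two channel calls such a helper must make for the structured push case:

// Sketch of the admin/proxy acquisition steps for a structured push consumer.
CosNotifyChannelAdmin::ProxySupplier_ptr
obtain_structured_push_proxy(CosNotifyChannelAdmin::EventChannel_ptr channel)
{
  // (a) admin object -- may be shared by many consumers
  CosNotifyChannelAdmin::ConsumerAdmin_var admin =
      channel->default_consumer_admin();
  if (CORBA::is_nil(admin))
    return CosNotifyChannelAdmin::ProxySupplier::_nil();
  // (b) proxy object -- one per consumer
  CosNotifyChannelAdmin::ProxyID prx_id;
  try {
    return admin->obtain_notification_push_supplier(
        CosNotifyChannelAdmin::STRUCTURED_EVENT, prx_id);
  } catch (CosNotifyChannelAdmin::AdminLimitExceeded&) {
    return CosNotifyChannelAdmin::ProxySupplier::_nil();
  }
}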
(2) If a list of event types or a constraint expression is specified, the helper function sample_add_filter is called. This helper function constructs a filter with one constraint that captures the required event types and the constraint expression and adds the filter to the consumer's proxy.
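sample_add_filter is also defined elsewhere in the sample code; its exact signature differs from the sketch below. As a rough illustration of the essential steps, using the standard CosNotifyFilter calls and a hypothetical helper name:

// Sketch: build one constraint from the event types and constraint
// expression, wrap it in a filter from the channel's default filter
// factory, and attach the filter to the proxy (true means failure).
CORBA::Boolean
add_simple_filter(CosNotifyChannelAdmin::EventChannel_ptr channel,
                  CosNotifyChannelAdmin::ProxySupplier_ptr proxy,
                  const CosNotification::EventTypeSeq&     evs,
                  const char*                              constraint_expr)
{
  try {
    CosNotifyFilter::FilterFactory_var ffact =
        channel->default_filter_factory();
    CosNotifyFilter::Filter_var filter =
        ffact->create_filter("EXTENDED_TCL");        // the standard grammar
    CosNotifyFilter::ConstraintExpSeq exp(1);
    exp.length(1);
    exp[0].event_types     = evs;
    exp[0].constraint_expr = constraint_expr;
    CosNotifyFilter::ConstraintInfoSeq_var added = filter->add_constraints(exp);
    proxy->add_filter(filter);   // filter applies only to this one proxy/consumer
    return 0;
  } catch (...) {
    return 1;
  }
}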
(3) The helper routine write_ior_to_file is called with the proxy_ior_file string. This string may be empty, in which case nothing is done; otherwise the proxy's object reference is stringified and written to the specified file.
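A sketch of what such a helper boils down to (hypothetical name write_ior, minimal error handling; the real write_ior_to_file takes additional arguments):

#include <fstream>

// Stringify an object reference and write it to the named file;
// an empty file name means "skip this step".
void write_ior(CORBA::ORB_ptr orb, CORBA::Object_ptr obj, const char* fname)
{
  if (!fname || !*fname) return;                // nothing to do
  CORBA::String_var ior = orb->object_to_string(obj);
  std::ofstream out(fname);
  out << (const char*)ior << std::endl;         // one stringified IOR per file
}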
(4) A new StructuredPushConsumer_i object is created with appropriate parameters as passed in to the create function, and a reference to this object is returned by create.
The connect method connects the client to the proxy that was obtained during the create call. Once a push consumer is connected, the channel will begin pushing events to it, so this is the step that starts the consuming of events. After connecting, the obtain_offered_types method is invoked on the proxy with one of two arguments: CosNA_NONE_NOW_UPDATES_ON enables offer_change messages, while CosNA_NONE_NOW_UPDATES_OFF disables them. The messages are only enabled if a non-NULL change function was provided to create (as stored in _change_fn). Finally, if a non-zero millisecs value was provided to create, a new worker thread is created -- the thread constructor immediately executes the method start_working (discussed next).
CORBA::Boolean StructuredPushConsumer_i::connect()
{
  omni_mutex_lock l(_oplock);
  if (_done) return 0;
  _com_err = 0;
  try {
    _my_proxy->connect_structured_push_consumer(_this());
    if (_change_fn) {
      _my_proxy->obtain_offered_types(CosNA_NONE_NOW_UPDATES_ON);
    } else {
      _my_proxy->obtain_offered_types(CosNA_NONE_NOW_UPDATES_OFF);
    }
  } catch (CORBA::BAD_PARAM& ex) {
    cerr << _obj_name << ": BAD_PARAM Exception while connecting" << endl;
    return 1; // error
  } catch (CosEventChannelAdmin::AlreadyConnected& ex) {
    cerr << _obj_name << ": Already connected" << endl;
    return 1; // error
  } catch (...) {
    cerr << _obj_name << ": Failed to connect" << endl;
    return 1; // error
  }
  if (_verbose) cout << _obj_name << ": Connected to proxy, ready to consume events" << endl;
  // if _millisecs is set, spawn a thread to ping the proxy
  if (_millisecs)
    _worker = new GenericBoundWorkerThread(this);
  return 0; // OK
}
The start_working method performs one call to the proxy every _millisecs milliseconds. The only purpose of the call is to check whether the proxy is still responding to requests. (The MyType() method is called and its return value is ignored.) If one of these 'ping' calls fails, the _done flag is set to true to indicate a done condition has been reached, the _com_err flag is set to true to indicate a communication error occurred, and the _finish condition variable is signalled (using broadcast) to wake up any thread that is waiting on this condition variable. (See the wait_done method below.) The milliseconds value must be converted to seconds and nanoseconds -- d_secs and d_nanosecs hold this conversion. The call
omni_thread::get_time(&t_secs, &t_nanosecs, d_secs, d_nanosecs);
stores a time that is d_secs seconds and d_nanosecs nanoseconds later than the current clock time into the variables t_secs and t_nanosecs; these values are then used in the timedwait call, which takes an absolute clock time. Note that the method terminates (which terminates the worker thread) as soon as it detects that _done is true.
An important part of the start_working code is its lock management discipline. It releases _oplock before invoking the 'ping' on the proxy, and reacquires _oplock so that it can safely check and update the _done flag, _com_err flag, and so on. Its call to timedwait also implicitly releases _oplock during the wait period and reacquires the lock upon return. As a general rule, an object should not hold a critical lock (such as a lock that controls access to all of its private state) during a call to a remote object (such as the proxy). Remote calls can take a long time, and no other work on the object can proceed while such a critical lock is held across the call.
void* StructuredPushConsumer_i::start_working(void *)
{
  CosNC_StructuredPushConsumer_var bump_my_refcount_during_outcalls = _this();

  _oplock.lock();
  if (_verbose && (!_done) && _millisecs)
    cout << _obj_name << ": Spawned thread entering ping loop" << endl;
  // invariant: _oplock held at top of loop
  while ( 1 ) {
    if (_done || (_millisecs == 0)) break;
    unsigned long t_secs = 0, t_nanosecs = 0;
    unsigned long d_secs = 0, d_nanosecs = 0;
    d_secs = _millisecs / 1000;
    d_nanosecs = (_millisecs % 1000) * 1000000;
    _oplock.unlock(); // do not hold oplock across ping
    try {
      _my_proxy->MyType();
    } catch (...) {
      _oplock.lock();
      if (_done) break;
      if (_verbose) cout << _obj_name << ": communication error while pinging proxy" << endl;
      _done = 1;
      _finish.broadcast();
      _com_err = 1;
      break; // break from while loop -- done
    }
    _oplock.lock();
    if (_done) break; // must have disconnected during ping
    // sleep for specified interval
    omni_thread::get_time(&t_secs, &t_nanosecs, d_secs, d_nanosecs);
    _finish.timedwait(t_secs, t_nanosecs); // releases _oplock during wait
    // continue ping loop
  }
  // done
  _oplock.unlock();
  return 0;
}
The push_structured_event method is called by the channel for each structured event to be consumed. This method increments the count of the number of events received, _num_events. If a consume function has been specified, it is invoked. The default consume function for a structured push consumer is called sample_consume_structured_fn and can be found in sample_functions.cc. If _num_events has reached _max_events, the specified target event count, then _done is set to true and the condition variable _finish is signalled using broadcast. This will wake any thread waiting on that condition (see wait_done).
void StructuredPushConsumer_i::push_structured_event(const CosN_StructuredEvent& data)
{
  if (_done) return; // see "Optimization Notes" at top
  omni_mutex_lock l(_oplock);
  if (_done) return;

  _num_events++;
  if (_consume_fn)
    (*_consume_fn)(data, _obj_name, _num_events, _verbose);
  else if (_verbose)
    cout << _obj_name << ": event count = " << _num_events << endl;
  if (_max_events && (_num_events >= _max_events)) {
    if (_verbose) cout << _obj_name << ": DONE [max_events reached]" << endl;
    _done = 1;
    _finish.broadcast();
  }
}
The disconnect_structured_push_consumer method is not used (it is not called by the channel). An implementation must be provided, however, because it is part of the API for any StructuredPushConsumer. In the code below, the _done condition is set and the condition variable _finish is signalled using broadcast.
void StructuredPushConsumer_i::disconnect_structured_push_consumer()
{
  if (_done) return; // see "Optimization Notes" at top
  omni_mutex_lock l(_oplock);
  if (_done) return;

  if (_verbose) cout << _obj_name << ": disconnected" << endl;
  _done = 1;
  _finish.broadcast();
}
The wait_done method waits for the _done flag to become true (if it is not already true) by waiting on the _finish condition variable. Three methods above can set _done to true, and each signals _finish once _done is true. After _done is true, wait_done calls 'join' on the worker thread; this call returns once the worker thread terminates (the worker thread will terminate as soon as it detects _done is true). wait_done returns _com_err, which will only be true if an exception occurred when 'pinging' the proxy.
Note that a local variable, tmp_worker, is used to do the join after releasing the lock; it is important not to hold the lock while waiting for the worker, since the worker may need to acquire the lock to terminate. _worker is set to NULL before releasing the lock so that only one call to wait_done will result in a join call (hence the need for a temporary variable to hold the value of _worker before setting it to NULL).
CORBA::Boolean StructuredPushConsumer_i::wait_done()
{
  _oplock.lock();
  while (!_done) {
    _finish.wait();
  }
  // using tmp_worker ensures only one wait_done call tries join()
  GenericBoundWorkerThread* tmp_worker = _worker;
  _worker = 0;
  _oplock.unlock();
  if (tmp_worker) {
    tmp_worker->join(0);
  }
  return _com_err;
}
The offer_change method calls the change function that was passed to create, passing in the added and deleted event type sequences. (If a NULL pointer was used for the function pointer, no call is made.) Note that offer_change, like push_structured_event, does nothing if the _done flag has been set to true.
void StructuredPushConsumer_i::offer_change(const CosN_EventTypeSeq& added,
                                            const CosN_EventTypeSeq& deled)
{
  if (_done) return; // see "Optimization Notes" at top
  omni_mutex_lock l(_oplock);
  if (_done) return;

  if (_verbose) cout << _obj_name << ": offer_change received" << endl;
  if (_change_fn)
    (*_change_fn)(added, deled, _obj_name, _verbose);
}
The cleanup method returns immediately with an error message if either _worker is non-NULL or _done is false; both of these tests indicate that wait_done has not been called (note that wait_done sets _worker to NULL). Otherwise, it sets a local variable, proxy, to the value of _my_proxy, and sets _my_proxy to a nil reference. It then releases the lock and calls the method disconnect_structured_push_supplier on the proxy reference. By setting _my_proxy to a nil reference, it ensures that only one call to cleanup will result in a disconnect call on the proxy. Also note that the disconnect is not performed if a 'ping' resulted in an error (on the assumption that the proxy is no longer valid). As with the ping call, cleanup follows the general rule that _oplock should not be held during a call to a remote object (the proxy).
void StructuredPushConsumer_i::cleanup()
{
  _oplock.lock();
  if (_worker || (!_done)) {
    cerr << "Coding error: only call c->cleanup() after c->wait_done()" << endl;
    _oplock.unlock();
    return;
  }
  // this method takes sole ownership of _my_proxy ref
  CosNA_StructuredProxyPushSupplier_var proxy = _my_proxy;
  _my_proxy = CosNA_StructuredProxyPushSupplier::_nil();
  // do not hold oplock while invoking disconnect
  _oplock.unlock();
  try {
    if ((!_com_err) && (!CORBA::is_nil(proxy)))
      proxy->disconnect_structured_push_supplier();
  } catch(...) {}
}
Here is the complete listing for the implementation of StructuredPushConsumer_i.
StructuredPushConsumer_i::
StructuredPushConsumer_i(CosNA_StructuredProxyPushSupplier_ptr proxy,
                         CORBA::ULong max_events,
                         const char* objnm,
                         consume_structured_fn* consume_fn,
                         type_change_fn* change_fn,
                         CORBA::ULong millisecs,
                         CORBA::Boolean verbose)
  : _my_proxy(proxy), _obj_name(objnm),
    _consume_fn(consume_fn), _change_fn(change_fn),
    _num_events(0), _max_events(max_events),
    _millisecs(millisecs), _verbose(verbose),
    _done(0), _com_err(0),
    _oplock(), _finish(&_oplock), _worker(0)
{
  // nothing else to do
}

StructuredPushConsumer_i*
StructuredPushConsumer_i::create(CORBA::ORB_ptr orb,
                                 CosNA_EventChannel_ptr channel,
                                 CORBA::ULong max_events,
                                 CORBA::ULong batch_size,
                                 const char* objnm,
                                 const char* proxy_ior_file,
                                 consume_structured_fn* consume_fn,
                                 type_change_fn* change_fn,
                                 CosN_EventTypeSeq* evs_ptr,
                                 const char* constraint_expr,
                                 CORBA::ULong millisecs,
                                 CORBA::Boolean verbose)
{
  // Obtain appropriate proxy object
  CosNA_ProxySupplier_var generic_proxy =
    get_proxy_supplier(channel, CosNA_STRUCTURED_EVENT, 1, verbose); // 1 is push, 0 is pull
  CosNA_StructuredProxyPushSupplier_ptr proxy =
    CosNA_StructuredProxyPushSupplier::_narrow(generic_proxy);
  if ( CORBA::is_nil(proxy) ) {
    return 0; // get_proxy_supplier failed
  }
  // If evs or constraint_expr are non-empty, add a filter to proxy
  if ((evs_ptr) && sample_add_filter(channel, proxy, *evs_ptr,
                                     constraint_expr, objnm, verbose)) {
    return 0; // adding filter failed
  }
  // write proxy IOR to file
  write_ior_to_file(orb, proxy, proxy_ior_file, verbose);
  // Construct a client
  StructuredPushConsumer_i* client =
    new StructuredPushConsumer_i(proxy, max_events, objnm, consume_fn,
                                 change_fn, millisecs, verbose);
  return client;
}

CORBA::Boolean StructuredPushConsumer_i::connect()
{
  omni_mutex_lock l(_oplock);
  if (_done) return 0;
  _com_err = 0;
  try {
    _my_proxy->connect_structured_push_consumer(_this());
    if (_change_fn) {
      _my_proxy->obtain_offered_types(CosNA_NONE_NOW_UPDATES_ON);
    } else {
      _my_proxy->obtain_offered_types(CosNA_NONE_NOW_UPDATES_OFF);
    }
  } catch (CORBA::BAD_PARAM& ex) {
    cerr << _obj_name << ": BAD_PARAM Exception while connecting" << endl;
    return 1; // error
  } catch (CosEventChannelAdmin::AlreadyConnected& ex) {
    cerr << _obj_name << ": Already connected" << endl;
    return 1; // error
  } catch (...) {
    cerr << _obj_name << ": Failed to connect" << endl;
    return 1; // error
  }
  if (_verbose) cout << _obj_name << ": Connected to proxy, ready to consume events" << endl;
  // if _millisecs is set, spawn a thread to ping the proxy
  if (_millisecs)
    _worker = new GenericBoundWorkerThread(this);
  return 0; // OK
}

// While _millisecs is positive, pings proxy every _millisecs milliseconds.
// Sets error code and sets _done to 1 if there is an error during ping.
void* StructuredPushConsumer_i::start_working(void *)
{
  CosNC_StructuredPushConsumer_var bump_my_refcount_during_outcalls = _this();

  _oplock.lock();
  if (_verbose && (!_done) && _millisecs)
    cout << _obj_name << ": Spawned thread entering ping loop" << endl;
  // invariant: _oplock held at top of loop
  while ( 1 ) {
    if (_done || (_millisecs == 0)) break;
    unsigned long t_secs = 0, t_nanosecs = 0;
    unsigned long d_secs = 0, d_nanosecs = 0;
    d_secs = _millisecs / 1000;
    d_nanosecs = (_millisecs % 1000) * 1000000;
    _oplock.unlock(); // do not hold oplock across ping
    try {
      _my_proxy->MyType();
    } catch (...) {
      _oplock.lock();
      if (_done) break;
      if (_verbose) cout << _obj_name << ": communication error while pinging proxy" << endl;
      _done = 1;
      _finish.broadcast();
      _com_err = 1;
      break; // break from while loop -- done
    }
    _oplock.lock();
    if (_done) break; // must have disconnected during ping
    // sleep for specified interval
    omni_thread::get_time(&t_secs, &t_nanosecs, d_secs, d_nanosecs);
    _finish.timedwait(t_secs, t_nanosecs); // releases _oplock during wait
    // continue ping loop
  }
  // done
  _oplock.unlock();
  return 0;
}

void StructuredPushConsumer_i::push_structured_event(const CosN_StructuredEvent& data)
{
  if (_done) return; // see "Optimization Notes" at top
  omni_mutex_lock l(_oplock);
  if (_done) return;

  _num_events++;
  if (_consume_fn)
    (*_consume_fn)(data, _obj_name, _num_events, _verbose);
  else if (_verbose)
    cout << _obj_name << ": event count = " << _num_events << endl;
  if (_max_events && (_num_events >= _max_events)) {
    if (_verbose) cout << _obj_name << ": DONE [max_events reached]" << endl;
    _done = 1;
    _finish.broadcast();
  }
}

void StructuredPushConsumer_i::disconnect_structured_push_consumer()
{
  if (_done) return; // see "Optimization Notes" at top
  omni_mutex_lock l(_oplock);
  if (_done) return;

  if (_verbose) cout << _obj_name << ": disconnected" << endl;
  _done = 1;
  _finish.broadcast();
}

CORBA::Boolean StructuredPushConsumer_i::wait_done()
{
  _oplock.lock();
  while (!_done) {
    _finish.wait();
  }
  // using tmp_worker ensures only one wait_done call tries join()
  GenericBoundWorkerThread* tmp_worker = _worker;
  _worker = 0;
  _oplock.unlock();
  if (tmp_worker) {
    tmp_worker->join(0);
  }
  return _com_err;
}

void StructuredPushConsumer_i::offer_change(const CosN_EventTypeSeq& added,
                                            const CosN_EventTypeSeq& deled)
{
  if (_done) return; // see "Optimization Notes" at top
  omni_mutex_lock l(_oplock);
  if (_done) return;

  if (_verbose) cout << _obj_name << ": offer_change received" << endl;
  if (_change_fn)
    (*_change_fn)(added, deled, _obj_name, _verbose);
}

void StructuredPushConsumer_i::cleanup()
{
  _oplock.lock();
  if (_worker || (!_done)) {
    cerr << "Coding error: only call c->cleanup() after c->wait_done()" << endl;
    _oplock.unlock();
    return;
  }
  // this method takes sole ownership of _my_proxy ref
  CosNA_StructuredProxyPushSupplier_var proxy = _my_proxy;
  _my_proxy = CosNA_StructuredProxyPushSupplier::_nil();
  // do not hold oplock while invoking disconnect
  _oplock.unlock();
  try {
    if ((!_com_err) && (!CORBA::is_nil(proxy)))
      proxy->disconnect_structured_push_supplier();
  } catch(...) {}
}