diff --git a/cpp_test_suite/new_tests/cxx_dserver_misc.cpp b/cpp_test_suite/new_tests/cxx_dserver_misc.cpp index 8cf0bcb3b..6b1dfa6f2 100644 --- a/cpp_test_suite/new_tests/cxx_dserver_misc.cpp +++ b/cpp_test_suite/new_tests/cxx_dserver_misc.cpp @@ -14,6 +14,27 @@ using namespace std; #undef SUITE_NAME #define SUITE_NAME DServerMiscTestSuite +template +struct EventCallback : public Tango::CallBack +{ + EventCallback() + : num_of_all_events(0) + , num_of_error_events(0) + {} + + void push_event(TEvent* event) + { + num_of_all_events++; + if (event->err) + { + num_of_error_events++; + } + } + + int num_of_all_events; + int num_of_error_events; +}; + class DServerMiscTestSuite: public CxxTest::TestSuite { protected: @@ -211,6 +232,69 @@ cout << "str = " << str << endl; TS_ASSERT(dserver->info().server_id == full_ds_name); TS_ASSERT(dserver->info().server_version == server_version); } + + /* Tests that subscriber can receive events immediately after + * a device restart without a need to wait for re-subscription. + */ + void test_event_subscription_recovery_after_device_restart() + { + EventCallback callback{}; + + std::string attribute_name = "event_change_tst"; + + TS_ASSERT_THROWS_NOTHING(device1->subscribe_event( + attribute_name, + Tango::USER_EVENT, + &callback)); + + TS_ASSERT_THROWS_NOTHING(device1->command_inout("IOPushEvent")); + Tango_sleep(2); + TS_ASSERT_EQUALS(2, callback.num_of_all_events); + TS_ASSERT_EQUALS(0, callback.num_of_error_events); + + { + Tango::DeviceData input{}; + input << device1_name; + TS_ASSERT_THROWS_NOTHING(dserver->command_inout("DevRestart", input)); + } + + TS_ASSERT_THROWS_NOTHING(device1->command_inout("IOPushEvent")); + Tango_sleep(2); + TS_ASSERT_EQUALS(3, callback.num_of_all_events); + TS_ASSERT_EQUALS(0, callback.num_of_error_events); + } + + /* Tests that attribute configuration change event + * is sent to all subscribers after device restart. + */ + void test_attr_conf_change_event_after_device_restart() + { + EventCallback callback{}; + + const std::string attribute_name = "event_change_tst"; + + int subscription = 0; + TS_ASSERT_THROWS_NOTHING(subscription = device1->subscribe_event( + attribute_name, + Tango::ATTR_CONF_EVENT, + &callback)); + + Tango_sleep(1); + TS_ASSERT_EQUALS(1, callback.num_of_all_events); + TS_ASSERT_EQUALS(0, callback.num_of_error_events); + + { + Tango::DeviceData input{}; + input << device1_name; + TS_ASSERT_THROWS_NOTHING(dserver->command_inout("DevRestart", input)); + } + + Tango_sleep(1); + TS_ASSERT_EQUALS(2, callback.num_of_all_events); + TS_ASSERT_EQUALS(0, callback.num_of_error_events); + + TS_ASSERT_THROWS_NOTHING(device1->unsubscribe_event(subscription)); + } }; #undef cout #endif // DServerMiscTestSuite_h diff --git a/cppapi/server/device.cpp b/cppapi/server/device.cpp index c408ec0e4..98c2fb62a 100644 --- a/cppapi/server/device.cpp +++ b/cppapi/server/device.cpp @@ -6148,6 +6148,25 @@ void DeviceImpl::get_event_param(vector &eve) } } +void DeviceImpl::get_event_param(vector &eve) +{ + ZmqEventSupplier *event_supplier_zmq = Util::instance()->get_zmq_event_supplier(); + + if (event_supplier_zmq->any_dev_intr_client(this) == true) + { + EventSubscriptionState ep; + + ep.notifd = false; + ep.zmq = true; + ep.attribute_name = ""; + ep.quality = false; + ep.data_ready = false; + ep.dev_intr_change = true; + + eve.push_back(ep); + } +} + //+----------------------------------------------------------------------------------------------------------------- // // method : @@ -6177,6 +6196,21 @@ void DeviceImpl::set_event_param(vector &eve) } } +void DeviceImpl::set_event_param(vector &eve) +{ + for (size_t loop = 0; loop < eve.size(); loop++) + { + if (eve[loop].attribute_name.empty()) + { + if (eve[loop].dev_intr_change == true) + { + set_event_intr_change_subscription(time(NULL)); + } + break; + } + } +} + //+----------------------------------------------------------------------------------------------------------------- // // method : diff --git a/cppapi/server/device.h b/cppapi/server/device.h index 6714be908..d21579325 100644 --- a/cppapi/server/device.h +++ b/cppapi/server/device.h @@ -3414,8 +3414,10 @@ class DeviceImpl : public virtual POA_Tango::Device void disable_intr_change_ev() {intr_change_ev = false;} bool is_intr_change_ev_enable() {return intr_change_ev;} - void get_event_param(vector &); - void set_event_param(vector &); + void get_event_param(vector &); // Deprecated, use EventSubscriptionState overload + void set_event_param(vector &); // Deprecated, use EventSubscriptionState overload + void get_event_param(vector&); + void set_event_param(vector&); void set_client_lib(int _l) {if (count(client_lib.begin(),client_lib.end(),_l)==0)client_lib.push_back(_l);} diff --git a/cppapi/server/dserver.cpp b/cppapi/server/dserver.cpp index 72682e16f..f06008149 100644 --- a/cppapi/server/dserver.cpp +++ b/cppapi/server/dserver.cpp @@ -861,7 +861,7 @@ void DServer::restart(string &d_name) vector &p_obj = dev_to_del->get_poll_obj_list(); vector dev_pol; - vector eve; + vector eve; for (i = 0;i < p_obj.size();i++) { @@ -1098,6 +1098,16 @@ void DServer::restart(string &d_name) event_supplier_zmq->push_dev_intr_change_event(new_dev,false,cmds_list,atts_list); } } + +// Attribute properties may have changed after the restart. +// Push an attribute configuration event to all registered subscribers. + + vector& dev_att_list = new_dev->get_device_attr()->get_attribute_list(); + vector::iterator ite_att; + for (ite_att = dev_att_list.begin(); ite_att != dev_att_list.end() ; ++ite_att) + { + new_dev->push_att_conf_event(*ite_att); + } } //+----------------------------------------------------------------------------------------------------------------- @@ -1172,7 +1182,7 @@ void ServRestartThread::run(void *ptr) // Memorize event parameters and devices interface // - map > map_events; + map > map_events; map map_dev_inter; dev->mem_event_par(map_events); @@ -1204,22 +1214,22 @@ void ServRestartThread::run(void *ptr) dev->set_poll_th_pool_size(DEFAULT_POLLING_THREADS_POOL_SIZE); tg->set_svr_starting(true); - + vector empty_class; tg->set_class_list(&empty_class); - + { AutoPyLock PyLo; dev->init_device(); } - + // // Set the class list pointer in the Util class and add the DServer object class // tg->set_class_list(&(dev->get_class_list())); tg->add_class_to_list(dev->get_device_class()); - + tg->set_svr_starting(false); // @@ -1975,6 +1985,25 @@ void DServer::mem_event_par(map > &_map) } } +void DServer::mem_event_par(map > &_map) +{ + for (size_t i = 0;i < class_list.size();i++) + { + vector &dev_list = class_list[i]->get_device_list(); + for (size_t j = 0;j < dev_list.size();j++) + { + vector eve; + dev_list[j]->get_device_attr()->get_event_param(eve); + dev_list[j]->get_event_param(eve); + + if (eve.size() != 0) + { + _map.insert(make_pair(dev_list[j]->get_name(),eve)); + } + } + } +} + //+----------------------------------------------------------------------------------------------------------------- // // method : @@ -2009,6 +2038,26 @@ void DServer::apply_event_par(map > &_map) } } +void DServer::apply_event_par(map > &_map) +{ + for (size_t i = 0;i < class_list.size();i++) + { + vector &dev_list = class_list[i]->get_device_list(); + for (size_t j = 0;j < dev_list.size();j++) + { + string &dev_name = dev_list[j]->get_name(); + map >::iterator ite; + ite = _map.find(dev_name); + + if (ite != _map.end()) + { + dev_list[j]->get_device_attr()->set_event_param(ite->second); + dev_list[j]->set_event_param(ite->second); + } + } + } +} + //+----------------------------------------------------------------------------------------------------------------- // // method : diff --git a/cppapi/server/dserver.h b/cppapi/server/dserver.h index 510c9b5e2..6165b57bd 100644 --- a/cppapi/server/dserver.h +++ b/cppapi/server/dserver.h @@ -127,8 +127,10 @@ public : void _create_cpp_class(const char *c1,const char *c2) {this->create_cpp_class(c1,c2);} void mcast_event_for_att(string &,string &,vector &); - void mem_event_par(map > &); - void apply_event_par(map > &); + void mem_event_par(map > &); // Deprecated, use EventSubscriptionState overload + void apply_event_par(map > &); // Deprecated, use EventSubscriptionState overload + void mem_event_par(map >&); + void apply_event_par(map >&); void mem_devices_interface(map &); void changed_devices_interface(map &); @@ -158,6 +160,7 @@ protected : static ClassFactoryFuncPtr class_factory_func_ptr; private: + #if ((defined _TG_WINDOWS_) && (defined TANGO_HAS_DLL) && !(defined _TANGO_LIB)) __declspec(dllexport) void class_factory(); #else diff --git a/cppapi/server/multiattribute.cpp b/cppapi/server/multiattribute.cpp index 1de10a070..c5611e6cf 100644 --- a/cppapi/server/multiattribute.cpp +++ b/cppapi/server/multiattribute.cpp @@ -1586,6 +1586,91 @@ void MultiAttribute::get_event_param(vector &eve) } } +void MultiAttribute::get_event_param(vector &eve) +{ + unsigned int i; + + for (i = 0;i < attr_list.size();i++) + { + bool once_more = false; + vector ch; + vector ar; + vector pe; + vector us; + vector ac; + bool dr = false; + bool qu = false; + + if (attr_list[i]->change_event_subscribed() == true) + { + once_more = true; + ch = attr_list[i]->get_client_lib(CHANGE_EVENT); + } + + if (attr_list[i]->quality_event_subscribed() == true) + { + once_more = true; + qu = true; + } + + if (attr_list[i]->periodic_event_subscribed() == true) + { + once_more = true; + pe = attr_list[i]->get_client_lib(PERIODIC_EVENT); + } + + if (attr_list[i]->archive_event_subscribed() == true) + { + once_more = true; + ar = attr_list[i]->get_client_lib(ARCHIVE_EVENT); + } + + if (attr_list[i]->user_event_subscribed() == true) + { + once_more = true; + us = attr_list[i]->get_client_lib(USER_EVENT); + } + + if (attr_list[i]->attr_conf_event_subscribed() == true) + { + once_more = true; + ac = attr_list[i]->get_client_lib(ATTR_CONF_EVENT); + } + + if (attr_list[i]->data_ready_event_subscribed() == true) + { + once_more = true; + dr = true; + } + + if (once_more == true) + { + EventSubscriptionState ep; + + if (attr_list[i]->use_notifd_event() == true) + ep.notifd = true; + else + ep.notifd = false; + + if (attr_list[i]->use_zmq_event() == true) + ep.zmq = true; + else + ep.zmq = false; + + ep.attribute_name = attr_list[i]->get_name();; + ep.change = ch; + ep.quality = qu; + ep.archive = ar; + ep.periodic = pe; + ep.user = us; + ep.att_conf = ac; + ep.data_ready = dr; + + eve.push_back(ep); + } + } +} + //+----------------------------------------------------------------------------------------------------------------- // // method : @@ -1614,32 +1699,128 @@ void MultiAttribute::set_event_param(vector &eve) if (eve[i].change.empty() == false) { + std::string event_name = EventName[CHANGE_EVENT]; for (ite = eve[i].change.begin();ite != eve[i].change.end();++ite) + { att.set_change_event_sub(*ite); + att.set_client_lib(*ite, event_name); + } } if (eve[i].periodic.empty() == false) { + std::string event_name = EventName[PERIODIC_EVENT]; for (ite = eve[i].periodic.begin();ite != eve[i].periodic.end();++ite) + { att.set_periodic_event_sub(*ite); + att.set_client_lib(*ite, event_name); + } } if (eve[i].archive.empty() == false) { + std::string event_name = EventName[ARCHIVE_EVENT]; for (ite = eve[i].archive.begin();ite != eve[i].archive.end();++ite) + { att.set_archive_event_sub(*ite); + att.set_client_lib(*ite, event_name); + } } if (eve[i].att_conf.empty() == false) { + std::string event_name = EventName[ATTR_CONF_EVENT]; for (ite = eve[i].att_conf.begin();ite != eve[i].att_conf.end();++ite) + { att.set_att_conf_event_sub(*ite); + att.set_client_lib(*ite, event_name); + } } if (eve[i].user.empty() == false) { + std::string event_name = EventName[USER_EVENT]; for (ite = eve[i].user.begin();ite != eve[i].user.end();++ite) + { att.set_user_event_sub(*ite); + att.set_client_lib(*ite, event_name); + } + } + + if (eve[i].quality == true) + att.set_quality_event_sub(); + if (eve[i].data_ready == true) + att.set_data_ready_event_sub(); + } + + if (eve[i].notifd == true) + att.set_use_notifd_event(); + if (eve[i].zmq == true) + att.set_use_zmq_event(); + } + } +} + +void MultiAttribute::set_event_param(vector &eve) +{ + for (size_t i = 0;i < eve.size();i++) + { + if (! eve[i].attribute_name.empty()) + { + Tango::Attribute &att = get_attr_by_name(eve[i].attribute_name.c_str()); + + { + omni_mutex_lock oml(EventSupplier::get_event_mutex()); + vector::iterator ite; + + if (eve[i].change.empty() == false) + { + std::string event_name = EventName[CHANGE_EVENT]; + for (ite = eve[i].change.begin();ite != eve[i].change.end();++ite) + { + att.set_change_event_sub(*ite); + att.set_client_lib(*ite, event_name); + } + } + + if (eve[i].periodic.empty() == false) + { + std::string event_name = EventName[PERIODIC_EVENT]; + for (ite = eve[i].periodic.begin();ite != eve[i].periodic.end();++ite) + { + att.set_periodic_event_sub(*ite); + att.set_client_lib(*ite, event_name); + } + } + + if (eve[i].archive.empty() == false) + { + std::string event_name = EventName[ARCHIVE_EVENT]; + for (ite = eve[i].archive.begin();ite != eve[i].archive.end();++ite) + { + att.set_archive_event_sub(*ite); + att.set_client_lib(*ite, event_name); + } + } + + if (eve[i].att_conf.empty() == false) + { + std::string event_name = EventName[ATTR_CONF_EVENT]; + for (ite = eve[i].att_conf.begin();ite != eve[i].att_conf.end();++ite) + { + att.set_att_conf_event_sub(*ite); + att.set_client_lib(*ite, event_name); + } + } + + if (eve[i].user.empty() == false) + { + std::string event_name = EventName[USER_EVENT]; + for (ite = eve[i].user.begin();ite != eve[i].user.end();++ite) + { + att.set_user_event_sub(*ite); + att.set_client_lib(*ite, event_name); + } } if (eve[i].quality == true) diff --git a/cppapi/server/multiattribute.h b/cppapi/server/multiattribute.h index f5657be52..853332a8b 100644 --- a/cppapi/server/multiattribute.h +++ b/cppapi/server/multiattribute.h @@ -61,6 +61,13 @@ struct EventPar bool zmq; }; +// This structure is intended to be used in place of EventPar. Field attribute_name +// replaces attr_id. EventPar is left untouched for backward compatibility with 9.3. +struct EventSubscriptionState : public EventPar +{ + std::string attribute_name; +}; + //============================================================================= // // The MultiAttribute class @@ -283,8 +290,10 @@ class MultiAttribute void remove_attribute(string &,bool); vector &get_w_attr_list() {return writable_attr_list;} bool is_att_quality_alarmed(); - void get_event_param(vector &); - void set_event_param(vector &); + void get_event_param(vector &); // Deprecated, use EventSubscriptionState overload + void set_event_param(vector &); // Deprecated, use EventSubscriptionState overload + void get_event_param(vector &); + void set_event_param(vector &); void add_alarmed_quality_factor(string &); void add_default(vector &,string &,string &,long); void add_attr(Attribute *att);