Skip to content
This repository has been archived by the owner on Jul 8, 2022. It is now read-only.

Commit

Permalink
Merge pull request #702 from mliszcz/backport-fix-496-blind-event-cli…
Browse files Browse the repository at this point in the history
…ents-after-dev-restart

Backport fix for #496 and #361 - event clients lost after dev restart
  • Loading branch information
t-b authored May 28, 2020
2 parents 4f8a0ba + 6a0ee49 commit 85057ac
Show file tree
Hide file tree
Showing 7 changed files with 374 additions and 12 deletions.
84 changes: 84 additions & 0 deletions cpp_test_suite/new_tests/cxx_dserver_misc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,27 @@ using namespace std;
#undef SUITE_NAME
#define SUITE_NAME DServerMiscTestSuite

template <typename TEvent>
struct EventCallback : public Tango::CallBack
{
EventCallback()
: num_of_all_events(0)
, num_of_error_events(0)
{}

void push_event(TEvent* event)
{
num_of_all_events++;
if (event->err)
{
num_of_error_events++;
}
}

int num_of_all_events;
int num_of_error_events;
};

class DServerMiscTestSuite: public CxxTest::TestSuite
{
protected:
Expand Down Expand Up @@ -211,6 +232,69 @@ cout << "str = " << str << endl;
TS_ASSERT(dserver->info().server_id == full_ds_name);
TS_ASSERT(dserver->info().server_version == server_version);
}

/* Tests that subscriber can receive events immediately after
* a device restart without a need to wait for re-subscription.
*/
void test_event_subscription_recovery_after_device_restart()
{
EventCallback<Tango::EventData> callback{};

std::string attribute_name = "event_change_tst";

TS_ASSERT_THROWS_NOTHING(device1->subscribe_event(
attribute_name,
Tango::USER_EVENT,
&callback));

TS_ASSERT_THROWS_NOTHING(device1->command_inout("IOPushEvent"));
Tango_sleep(2);
TS_ASSERT_EQUALS(2, callback.num_of_all_events);
TS_ASSERT_EQUALS(0, callback.num_of_error_events);

{
Tango::DeviceData input{};
input << device1_name;
TS_ASSERT_THROWS_NOTHING(dserver->command_inout("DevRestart", input));
}

TS_ASSERT_THROWS_NOTHING(device1->command_inout("IOPushEvent"));
Tango_sleep(2);
TS_ASSERT_EQUALS(3, callback.num_of_all_events);
TS_ASSERT_EQUALS(0, callback.num_of_error_events);
}

/* Tests that attribute configuration change event
* is sent to all subscribers after device restart.
*/
void test_attr_conf_change_event_after_device_restart()
{
EventCallback<Tango::AttrConfEventData> callback{};

const std::string attribute_name = "event_change_tst";

int subscription = 0;
TS_ASSERT_THROWS_NOTHING(subscription = device1->subscribe_event(
attribute_name,
Tango::ATTR_CONF_EVENT,
&callback));

Tango_sleep(1);
TS_ASSERT_EQUALS(1, callback.num_of_all_events);
TS_ASSERT_EQUALS(0, callback.num_of_error_events);

{
Tango::DeviceData input{};
input << device1_name;
TS_ASSERT_THROWS_NOTHING(dserver->command_inout("DevRestart", input));
}

Tango_sleep(1);
TS_ASSERT_EQUALS(2, callback.num_of_all_events);
TS_ASSERT_EQUALS(0, callback.num_of_error_events);

TS_ASSERT_THROWS_NOTHING(device1->unsubscribe_event(subscription));
}
};
#undef cout
#endif // DServerMiscTestSuite_h
34 changes: 34 additions & 0 deletions cppapi/server/device.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6148,6 +6148,25 @@ void DeviceImpl::get_event_param(vector<EventPar> &eve)
}
}

void DeviceImpl::get_event_param(vector<EventSubscriptionState> &eve)
{
ZmqEventSupplier *event_supplier_zmq = Util::instance()->get_zmq_event_supplier();

if (event_supplier_zmq->any_dev_intr_client(this) == true)
{
EventSubscriptionState ep;

ep.notifd = false;
ep.zmq = true;
ep.attribute_name = "";
ep.quality = false;
ep.data_ready = false;
ep.dev_intr_change = true;

eve.push_back(ep);
}
}

//+-----------------------------------------------------------------------------------------------------------------
//
// method :
Expand Down Expand Up @@ -6177,6 +6196,21 @@ void DeviceImpl::set_event_param(vector<EventPar> &eve)
}
}

void DeviceImpl::set_event_param(vector<EventSubscriptionState> &eve)
{
for (size_t loop = 0; loop < eve.size(); loop++)
{
if (eve[loop].attribute_name.empty())
{
if (eve[loop].dev_intr_change == true)
{
set_event_intr_change_subscription(time(NULL));
}
break;
}
}
}

//+-----------------------------------------------------------------------------------------------------------------
//
// method :
Expand Down
6 changes: 4 additions & 2 deletions cppapi/server/device.h
Original file line number Diff line number Diff line change
Expand Up @@ -3414,8 +3414,10 @@ class DeviceImpl : public virtual POA_Tango::Device
void disable_intr_change_ev() {intr_change_ev = false;}
bool is_intr_change_ev_enable() {return intr_change_ev;}

void get_event_param(vector<EventPar> &);
void set_event_param(vector<EventPar> &);
void get_event_param(vector<EventPar> &); // Deprecated, use EventSubscriptionState overload
void set_event_param(vector<EventPar> &); // Deprecated, use EventSubscriptionState overload
void get_event_param(vector<EventSubscriptionState>&);
void set_event_param(vector<EventSubscriptionState>&);

void set_client_lib(int _l) {if (count(client_lib.begin(),client_lib.end(),_l)==0)client_lib.push_back(_l);}

Expand Down
61 changes: 55 additions & 6 deletions cppapi/server/dserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ void DServer::restart(string &d_name)

vector<PollObj *> &p_obj = dev_to_del->get_poll_obj_list();
vector<Pol> dev_pol;
vector<EventPar> eve;
vector<EventSubscriptionState> eve;

for (i = 0;i < p_obj.size();i++)
{
Expand Down Expand Up @@ -1098,6 +1098,16 @@ void DServer::restart(string &d_name)
event_supplier_zmq->push_dev_intr_change_event(new_dev,false,cmds_list,atts_list);
}
}

// Attribute properties may have changed after the restart.
// Push an attribute configuration event to all registered subscribers.

vector<Tango::Attribute*>& dev_att_list = new_dev->get_device_attr()->get_attribute_list();
vector<Tango::Attribute*>::iterator ite_att;
for (ite_att = dev_att_list.begin(); ite_att != dev_att_list.end() ; ++ite_att)
{
new_dev->push_att_conf_event(*ite_att);
}
}

//+-----------------------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -1172,7 +1182,7 @@ void ServRestartThread::run(void *ptr)
// Memorize event parameters and devices interface
//

map<string,vector<EventPar> > map_events;
map<string,vector<EventSubscriptionState> > map_events;
map<string,DevIntr> map_dev_inter;

dev->mem_event_par(map_events);
Expand Down Expand Up @@ -1204,22 +1214,22 @@ void ServRestartThread::run(void *ptr)
dev->set_poll_th_pool_size(DEFAULT_POLLING_THREADS_POOL_SIZE);

tg->set_svr_starting(true);

vector<DeviceClass *> empty_class;
tg->set_class_list(&empty_class);

{
AutoPyLock PyLo;
dev->init_device();
}

//
// Set the class list pointer in the Util class and add the DServer object class
//

tg->set_class_list(&(dev->get_class_list()));
tg->add_class_to_list(dev->get_device_class());

tg->set_svr_starting(false);

//
Expand Down Expand Up @@ -1975,6 +1985,25 @@ void DServer::mem_event_par(map<string,vector<EventPar> > &_map)
}
}

void DServer::mem_event_par(map<string,vector<EventSubscriptionState> > &_map)
{
for (size_t i = 0;i < class_list.size();i++)
{
vector<DeviceImpl *> &dev_list = class_list[i]->get_device_list();
for (size_t j = 0;j < dev_list.size();j++)
{
vector<EventSubscriptionState> eve;
dev_list[j]->get_device_attr()->get_event_param(eve);
dev_list[j]->get_event_param(eve);

if (eve.size() != 0)
{
_map.insert(make_pair(dev_list[j]->get_name(),eve));
}
}
}
}

//+-----------------------------------------------------------------------------------------------------------------
//
// method :
Expand Down Expand Up @@ -2009,6 +2038,26 @@ void DServer::apply_event_par(map<string,vector<EventPar> > &_map)
}
}

void DServer::apply_event_par(map<string,vector<EventSubscriptionState> > &_map)
{
for (size_t i = 0;i < class_list.size();i++)
{
vector<DeviceImpl *> &dev_list = class_list[i]->get_device_list();
for (size_t j = 0;j < dev_list.size();j++)
{
string &dev_name = dev_list[j]->get_name();
map<string,vector<EventSubscriptionState> >::iterator ite;
ite = _map.find(dev_name);

if (ite != _map.end())
{
dev_list[j]->get_device_attr()->set_event_param(ite->second);
dev_list[j]->set_event_param(ite->second);
}
}
}
}

//+-----------------------------------------------------------------------------------------------------------------
//
// method :
Expand Down
7 changes: 5 additions & 2 deletions cppapi/server/dserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,10 @@ public :
void _create_cpp_class(const char *c1,const char *c2) {this->create_cpp_class(c1,c2);}

void mcast_event_for_att(string &,string &,vector<string> &);
void mem_event_par(map<string, vector<EventPar> > &);
void apply_event_par(map<string,vector<EventPar> > &);
void mem_event_par(map<string, vector<EventPar> > &); // Deprecated, use EventSubscriptionState overload
void apply_event_par(map<string,vector<EventPar> > &); // Deprecated, use EventSubscriptionState overload
void mem_event_par(map<string, vector<EventSubscriptionState> >&);
void apply_event_par(map<string,vector<EventSubscriptionState> >&);

void mem_devices_interface(map<string,DevIntr> &);
void changed_devices_interface(map<string,DevIntr> &);
Expand Down Expand Up @@ -158,6 +160,7 @@ protected :
static ClassFactoryFuncPtr class_factory_func_ptr;

private:

#if ((defined _TG_WINDOWS_) && (defined TANGO_HAS_DLL) && !(defined _TANGO_LIB))
__declspec(dllexport) void class_factory();
#else
Expand Down
Loading

0 comments on commit 85057ac

Please sign in to comment.