Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout option to call_service messages #984

Draft
wants to merge 5 commits into
base: ros2
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ROSBRIDGE_PROTOCOL.md
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,8 @@ Calls a ROS service.
"service": <string>,
(optional) "args": <list<json>>,
(optional) "fragment_size": <int>,
(optional) "compression": <string>
(optional) "compression": <string>,
(optional) "timeout": <float>
}
```

Expand All @@ -449,6 +450,7 @@ Calls a ROS service.
before it is fragmented
* **compression** – an optional string to specify the compression scheme to be
used on messages. Valid values are "none" and "png"
* **timeout** – the time, in seconds, to wait for a response from the server


Stops advertising an external ROS service server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def call_service(self, message):
fragment_size = message.get("fragment_size", None)
compression = message.get("compression", "none")
args = message.get("args", [])
timeout = message.get("timeout", 5.0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the previous default was to never time out. Should this continue to be the case?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to retain current behavior, it should by default never timeout.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though I think we could change the defaults for rolling and future distros. @sea-bass what do you think about branching out for Jazzy ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm OK either way.

It would be the "right" thing to do to branch out for Jazzy, but I think we still have a few more months where it's okay to land breaking changes to Jazzy.

As an unofficial rule, I would say we can keep putting changes into Jazzy until May 2025 where you may see more people upgrading from ROS 1.


if CallService.services_glob is not None and CallService.services_glob:
self.protocol.log(
Expand Down Expand Up @@ -112,7 +113,14 @@ def call_service(self, message):
e_cb = partial(self._failure, cid, service)

# Run service caller in the same thread.
ServiceCaller(trim_servicename(service), args, s_cb, e_cb, self.protocol.node_handle).run()
ServiceCaller(
trim_servicename(service),
args,
timeout,
s_cb,
e_cb,
self.protocol.node_handle,
).run()

def _success(self, cid, service, fragment_size, compression, message):
outgoing_message = {
Expand Down
44 changes: 34 additions & 10 deletions rosbridge_library/src/rosbridge_library/internal/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,9 @@
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

import time
from threading import Thread
from threading import Condition, Thread
from typing import Any, Callable, Optional

import rclpy
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.expand_topic_name import expand_topic_name
from rclpy.node import Node
Expand All @@ -58,6 +56,7 @@ def __init__(
self,
service: str,
args: dict,
timeout: float,
success_callback: Callable[[dict], None],
error_callback: Callable[[Exception], None],
node_handle: Node,
Expand All @@ -71,6 +70,8 @@ def __init__(
ordered list, or a dict of name-value pairs. Anything else will be
treated as though no arguments were provided (which is still valid for
some kinds of service)
timeout -- the time, in seconds, to wait for a response from the server.
A non-positive value means no timeout.
success_callback -- a callback to call with the JSON result of the
service call
error_callback -- a callback to call if an error occurs. The
Expand All @@ -81,14 +82,22 @@ def __init__(
self.daemon = True
self.service = service
self.args = args
self.timeout = timeout
self.success = success_callback
self.error = error_callback
self.node_handle = node_handle

def run(self) -> None:
try:
# Call the service and pass the result to the success handler
self.success(call_service(self.node_handle, self.service, args=self.args))
self.success(
call_service(
self.node_handle,
self.service,
args=self.args,
server_response_timeout=self.timeout,
)
)
except Exception as e:
# On error, just pass the exception to the error handler
self.error(e)
Expand All @@ -114,8 +123,8 @@ def call_service(
node_handle: Node,
service: str,
args: Optional[dict] = None,
server_timeout_time: float = 1.0,
sleep_time: float = 0.001,
server_ready_timeout: float = 1.0,
server_response_timeout: float = 5.0,
) -> dict:
# Given the service name, fetch the type and class of the service,
# and a request instance
Expand All @@ -141,20 +150,35 @@ def call_service(
service_class, service, callback_group=ReentrantCallbackGroup()
)

if not client.wait_for_service(server_timeout_time):
if not client.wait_for_service(server_ready_timeout):
node_handle.destroy_client(client)
raise InvalidServiceException(service)

future = client.call_async(inst)
while rclpy.ok() and not future.done():
time.sleep(sleep_time)
condition = Condition()

def future_done_callback():
with condition:
condition.notify_all()

future.add_done_callback(lambda future: future_done_callback())

with condition:
if not condition.wait_for(
lambda: future.done(),
timeout=(server_response_timeout if server_response_timeout > 0 else None),
):
future.cancel()
node_handle.destroy_client(client)
raise Exception("Timeout exceeded while waiting for service response")

result = future.result()

node_handle.destroy_client(client)
if result is not None:
# Turn the response into JSON and pass to the callback
json_response = extract_values(result)
else:
raise Exception(result)
raise Exception("Service call returned None")

return json_response
2 changes: 2 additions & 0 deletions rosbridge_library/test/internal/services/test_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def start(self):
thread = services.ServiceCaller(
self.name,
gen,
5.0,
self.success,
self.error,
self.node,
Expand Down Expand Up @@ -201,6 +202,7 @@ def error():
services.ServiceCaller(
self.node.get_name() + "/list_parameters",
None,
5.0,
success,
error,
self.node,
Expand Down
Loading