Skip to content

ControlInterface wrapper to help with timeouts #203

@migurski

Description

@migurski

I use set_rt_frame_socket to send individual frames to my light strings. These are shown for 60 seconds before the string reverts to its last stored movie. Because the xled.control.ControlInterface does not set a short timeout when communicating with the lights, sometimes I see many-minute hangs during which communication is lost. These failed connections are intermittent, so in the absence of an xled timeout argument I’ve found that this wrapper class works well:

class TimeoutControlInterface:
    interface = None

    def __init__(self, *args, **kwargs):
        """Initialize xled.control.ControlInterface with same arguments"""
        self.interface = xled.control.ControlInterface(*args, **kwargs)

    def __getattr__(self, name):
        """Retrieve versions of self.interface methods that use a threaded timeout"""
        original_method, responses = getattr(self.interface, name), []

        def wrapped_method(*args, **kwargs):
            """Call the original method and stash its response"""
            responses.append(original_method(*args, **kwargs))

        def threaded_method(*args, **kwargs):
            """Call the wrapped method inside a thread and return its response"""
            thread = threading.Thread(target=wrapped_method, args=args, kwargs=kwargs)
            thread.start()
            thread.join(timeout=5.0)
            if thread.is_alive():
                raise Exception("Thread still alive")
            return responses[0]

        return threaded_method

It’s used just like xled.control.ControlInterface but raises an exception when a method call requires more than five seconds to complete. Hope it’s useful to someome!

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions