4
4
import asyncio
5
5
import traceback
6
6
7
- from pymodbus .exceptions import NoSuchSlaveException
7
+ from pymodbus .exceptions import ModbusIOException , NoSuchSlaveException
8
8
from pymodbus .logging import Log
9
9
from pymodbus .pdu .pdu import ExceptionResponse
10
10
from pymodbus .transaction import TransactionManager
@@ -29,9 +29,6 @@ def __init__(self, owner):
29
29
self .server = owner
30
30
self .framer = self .server .framer (self .server .decoder )
31
31
self .running = False
32
- self .handler_task = None # coroutine to be run on asyncio loop
33
- self .databuffer = b''
34
- self .loop = asyncio .get_running_loop ()
35
32
super ().__init__ (
36
33
params ,
37
34
self .framer ,
@@ -44,8 +41,7 @@ def __init__(self, owner):
44
41
45
42
def callback_new_connection (self ) -> ModbusProtocol :
46
43
"""Call when listener receive new connection request."""
47
- Log .debug ("callback_new_connection called" )
48
- return ServerRequestHandler (self )
44
+ raise RuntimeError ("callback_new_connection should never be called" )
49
45
50
46
def callback_connected (self ) -> None :
51
47
"""Call when connection is succcesfull."""
@@ -54,27 +50,11 @@ def callback_connected(self) -> None:
54
50
if self .server .broadcast_enable :
55
51
if 0 not in slaves :
56
52
slaves .append (0 )
57
- try :
58
- self .running = True
59
-
60
- # schedule the connection handler on the event loop
61
- self .handler_task = asyncio .create_task (self .handle ())
62
- self .handler_task .set_name ("server connection handler" )
63
- except Exception as exc : # pylint: disable=broad-except
64
- Log .error (
65
- "Server callback_connected exception: {}; {}" ,
66
- exc ,
67
- traceback .format_exc (),
68
- )
69
53
70
54
def callback_disconnected (self , call_exc : Exception | None ) -> None :
71
55
"""Call when connection is lost."""
72
56
super ().callback_disconnected (call_exc )
73
57
try :
74
- if self .handler_task :
75
- self .handler_task .cancel ()
76
- if hasattr (self .server , "on_connection_lost" ):
77
- self .server .on_connection_lost ()
78
58
if call_exc is None :
79
59
Log .debug (
80
60
"Handler for stream [{}] has been canceled" , self .comm_params .comm_name
@@ -93,66 +73,46 @@ def callback_disconnected(self, call_exc: Exception | None) -> None:
93
73
traceback .format_exc (),
94
74
)
95
75
96
- async def handle (self ) -> None :
97
- """Coroutine which represents a single master <=> slave conversation.
98
-
99
- Once the client connection is established, the data chunks will be
100
- fed to this coroutine via the asyncio.Queue object which is fed by
101
- the ServerRequestHandler class's callback Future.
102
-
103
- This callback future gets data from either asyncio.BaseProtocol.data_received
104
- or asyncio.DatagramProtocol.datagram_received.
76
+ def callback_data (self , data : bytes , addr : tuple | None = None ) -> int :
77
+ """Handle received data."""
78
+ try :
79
+ used_len = super ().callback_data (data , addr )
80
+ except ModbusIOException :
81
+ response = ExceptionResponse (
82
+ 40 ,
83
+ exception_code = ExceptionResponse .ILLEGAL_FUNCTION
84
+ )
85
+ self .server_send (response , 0 )
86
+ return (len (data ))
87
+ if self .last_pdu :
88
+ if self .is_server :
89
+ self .loop .call_soon (self .handle_later )
90
+ else :
91
+ self .response_future .set_result (True )
92
+ return used_len
105
93
106
- This function will execute without blocking in the while-loop and
107
- yield to the asyncio event loop when the frame is exhausted.
108
- As a result, multiple clients can be interleaved without any
109
- interference between them.
110
- """
111
- while self .running :
112
- try :
113
- pdu , * addr , exc = await self .server_execute ()
114
- if exc :
115
- pdu = ExceptionResponse (
116
- 40 ,
117
- exception_code = ExceptionResponse .ILLEGAL_FUNCTION
118
- )
119
- self .server_send (pdu , 0 )
120
- continue
121
- await self .server_async_execute (pdu , * addr )
122
- except asyncio .CancelledError :
123
- # catch and ignore cancellation errors
124
- if self .running :
125
- Log .debug (
126
- "Handler for stream [{}] has been canceled" , self .comm_params .comm_name
127
- )
128
- self .running = False
129
- except Exception as exc : # pylint: disable=broad-except
130
- # force TCP socket termination as framer
131
- # should handle application layer errors
132
- Log .error (
133
- 'Unknown exception "{}" on stream {} forcing disconnect' ,
134
- exc ,
135
- self .comm_params .comm_name ,
136
- )
137
- self .close ()
138
- self .callback_disconnected (exc )
94
+ def handle_later (self ):
95
+ """Change sync (async not allowed in call_soon) to async."""
96
+ asyncio .run_coroutine_threadsafe (self .handle_request (), self .loop )
139
97
140
- async def server_async_execute (self , request , * addr ):
98
+ async def handle_request (self ):
141
99
"""Handle request."""
142
100
broadcast = False
101
+ if not self .last_pdu :
102
+ return
143
103
try :
144
- if self .server .broadcast_enable and not request .dev_id :
104
+ if self .server .broadcast_enable and not self . last_pdu .dev_id :
145
105
broadcast = True
146
106
# if broadcasting then execute on all slave contexts,
147
107
# note response will be ignored
148
108
for dev_id in self .server .context .slaves ():
149
- response = await request .update_datastore (self .server .context [dev_id ])
109
+ response = await self . last_pdu .update_datastore (self .server .context [dev_id ])
150
110
else :
151
- context = self .server .context [request .dev_id ]
152
- response = await request .update_datastore (context )
111
+ context = self .server .context [self . last_pdu .dev_id ]
112
+ response = await self . last_pdu .update_datastore (context )
153
113
154
114
except NoSuchSlaveException :
155
- Log .error ("requested slave does not exist: {}" , request .dev_id )
115
+ Log .error ("requested slave does not exist: {}" , self . last_pdu .dev_id )
156
116
if self .server .ignore_missing_slaves :
157
117
return # the client will simply timeout waiting for a response
158
118
response = ExceptionResponse (0x00 , ExceptionResponse .GATEWAY_NO_RESPONSE )
@@ -165,9 +125,9 @@ async def server_async_execute(self, request, *addr):
165
125
response = ExceptionResponse (0x00 , ExceptionResponse .SLAVE_FAILURE )
166
126
# no response when broadcasting
167
127
if not broadcast :
168
- response .transaction_id = request .transaction_id
169
- response .dev_id = request .dev_id
170
- self .server_send (response , * addr )
128
+ response .transaction_id = self . last_pdu .transaction_id
129
+ response .dev_id = self . last_pdu .dev_id
130
+ self .server_send (response , self . last_addr )
171
131
172
132
def server_send (self , pdu , addr ):
173
133
"""Send message."""
0 commit comments