77
88"""
99
10+ from typing import Any , Optional , Tuple
11+ from can import typechecking
12+
1013import time
1114import logging
1215
@@ -52,38 +55,39 @@ class slcanBus(BusABC):
5255
5356 def __init__ (
5457 self ,
55- channel ,
56- ttyBaudrate = 115200 ,
57- bitrate = None ,
58- btr = None ,
59- sleep_after_open = _SLEEP_AFTER_SERIAL_OPEN ,
60- rtscts = False ,
61- ** kwargs
62- ):
58+ channel : typechecking . ChannelStr ,
59+ ttyBaudrate : int = 115200 ,
60+ bitrate : Optional [ int ] = None ,
61+ btr : Optional [ str ] = None ,
62+ sleep_after_open : float = _SLEEP_AFTER_SERIAL_OPEN ,
63+ rtscts : bool = False ,
64+ ** kwargs : Any
65+ ) -> None :
6366 """
6467 :raise ValueError: if both *bitrate* and *btr* are set
6568
66- :param str channel:
69+ :param channel:
6770 port of underlying serial or usb device (e.g. /dev/ttyUSB0, COM8, ...)
6871 Must not be empty.
69- :param int ttyBaudrate:
72+ :param ttyBaudrate:
7073 baudrate of underlying serial or usb device
71- :param int bitrate:
74+ :param bitrate:
7275 Bitrate in bit/s
73- :param str btr:
76+ :param btr:
7477 BTR register value to set custom can speed
75- :param float poll_interval:
78+ :param poll_interval:
7679 Poll interval in seconds when reading messages
77- :param float sleep_after_open:
80+ :param sleep_after_open:
7881 Time to wait in seconds after opening serial connection
79- :param bool rtscts:
82+ :param rtscts:
8083 turn hardware handshake (RTS/CTS) on and off
8184 """
8285
8386 if not channel : # if None or empty
8487 raise TypeError ("Must specify a serial port." )
8588 if "@" in channel :
86- (channel , ttyBaudrate ) = channel .split ("@" )
89+ (channel , baudrate ) = channel .split ("@" )
90+ ttyBaudrate = int (baudrate )
8791 self .serialPortOrig = serial .serial_for_url (
8892 channel , baudrate = ttyBaudrate , rtscts = rtscts
8993 )
@@ -104,36 +108,38 @@ def __init__(
104108 channel , ttyBaudrate = 115200 , bitrate = None , rtscts = False , ** kwargs
105109 )
106110
107- def set_bitrate (self , bitrate ) :
111+ def set_bitrate (self , bitrate : int ) -> None :
108112 """
109113 :raise ValueError: if both *bitrate* is not among the possible values
110114
111- :param int bitrate:
115+ :param bitrate:
112116 Bitrate in bit/s
113117 """
114118 self .close ()
115119 if bitrate in self ._BITRATES :
116120 self ._write (self ._BITRATES [bitrate ])
117121 else :
118122 raise ValueError (
119- "Invalid bitrate, choose one of " + (", " .join (self ._BITRATES )) + "."
123+ "Invalid bitrate, choose one of "
124+ + (", " .join (str (k ) for k in self ._BITRATES .keys ()))
125+ + "."
120126 )
121127 self .open ()
122128
123- def set_bitrate_reg (self , btr ) :
129+ def set_bitrate_reg (self , btr : str ) -> None :
124130 """
125- :param str btr:
131+ :param btr:
126132 BTR register value to set custom can speed
127133 """
128134 self .close ()
129135 self ._write ("s" + btr )
130136 self .open ()
131137
132- def _write (self , string ) :
138+ def _write (self , string : str ) -> None :
133139 self .serialPortOrig .write (string .encode () + self .LINE_TERMINATOR )
134140 self .serialPortOrig .flush ()
135141
136- def _read (self , timeout ) :
142+ def _read (self , timeout : Optional [ float ]) -> Optional [ str ] :
137143
138144 # first read what is already in receive buffer
139145 while self .serialPortOrig .in_waiting :
@@ -165,18 +171,20 @@ def _read(self, timeout):
165171 break
166172 return string
167173
168- def flush (self ):
174+ def flush (self ) -> None :
169175 del self ._buffer [:]
170176 while self .serialPortOrig .in_waiting :
171177 self .serialPortOrig .read ()
172178
173- def open (self ):
179+ def open (self ) -> None :
174180 self ._write ("O" )
175181
176- def close (self ):
182+ def close (self ) -> None :
177183 self ._write ("C" )
178184
179- def _recv_internal (self , timeout ):
185+ def _recv_internal (
186+ self , timeout : Optional [float ]
187+ ) -> Tuple [Optional [Message ], bool ]:
180188
181189 canId = None
182190 remote = False
@@ -223,7 +231,7 @@ def _recv_internal(self, timeout):
223231 return msg , False
224232 return None , False
225233
226- def send (self , msg , timeout = None ):
234+ def send (self , msg : Message , timeout : Optional [ float ] = None ) -> None :
227235 if timeout != self .serialPortOrig .write_timeout :
228236 self .serialPortOrig .write_timeout = timeout
229237 if msg .is_remote_frame :
@@ -239,20 +247,21 @@ def send(self, msg, timeout=None):
239247 sendStr += "" .join (["%02X" % b for b in msg .data ])
240248 self ._write (sendStr )
241249
242- def shutdown (self ):
250+ def shutdown (self ) -> None :
243251 self .close ()
244252 self .serialPortOrig .close ()
245253
246- def fileno (self ):
254+ def fileno (self ) -> int :
247255 if hasattr (self .serialPortOrig , "fileno" ):
248256 return self .serialPortOrig .fileno ()
249257 # Return an invalid file descriptor on Windows
250258 return - 1
251259
252- def get_version (self , timeout ):
260+ def get_version (
261+ self , timeout : Optional [float ]
262+ ) -> Tuple [Optional [int ], Optional [int ]]:
253263 """Get HW and SW version of the slcan interface.
254264
255- :type timeout: int or None
256265 :param timeout:
257266 seconds to wait for version or None to wait indefinitely
258267
@@ -288,14 +297,12 @@ def get_version(self, timeout):
288297 else :
289298 return None , None
290299
291- def get_serial_number (self , timeout ) :
300+ def get_serial_number (self , timeout : Optional [ float ]) -> Optional [ str ] :
292301 """Get serial number of the slcan interface.
293302
294- :type timeout: int or None
295303 :param timeout:
296304 seconds to wait for serial number or None to wait indefinitely
297305
298- :rtype str or None
299306 :return:
300307 None on timeout or a str object.
301308 """
0 commit comments