diff --git a/usblib.py b/usblib.py index df9ef69..0844da0 100755 --- a/usblib.py +++ b/usblib.py @@ -171,9 +171,11 @@ def shell_usbdevice(fd, device): } banner = ( "Variables:\n" - + "\n".join(["{:>12}: {}".format(k, repr(v)) for k, v in namespace.items()]) - + "\n" - ) + + "\n".join( + "{:>12}: {}".format(k, repr(v)) for k, v in namespace.items() + ) + ) + "\n" + shell = InteractiveShellEmbed.instance(banner1=banner, user_ns=namespace) shell() @@ -245,7 +247,7 @@ def read_until(self, expected, size=-1): def contains(self, expected): with self.lock: - return -1 != self.buf.find(expected) + return self.buf.find(expected) != -1 def peek(self, size): return self.buf[:size] @@ -269,10 +271,7 @@ def __init__(self, duration): self.is_infinite = duration is None self.is_non_blocking = duration == 0 self.duration = duration - if duration is not None: - self.target_time = self.TIME() + duration - else: - self.target_time = None + self.target_time = self.TIME() + duration if duration is not None else None def expired(self): """Return a boolean, telling if the timeout has expired.""" @@ -286,13 +285,13 @@ def time_left(self): return None else: delta = self.target_time - self.TIME() - if delta > self.duration: - # clock jumped, recalculate - self.target_time = self.TIME() + self.duration - return self.duration - else: + if delta <= self.duration: return max(0, delta) + # clock jumped, recalculate + self.target_time = self.TIME() + self.duration + return self.duration + def restart(self, duration): """\ Restart a timeout, only supported if a timeout was already set up @@ -463,7 +462,7 @@ def get_endpoints(device): # -------------------------------- def send_ctrl_cmd(self, request, value=0, data=None, intf=0): - ret = self._device.ctrl_transfer( + return self._device.ctrl_transfer( CP210x_REQTYPE_HOST2DEVICE, request, wValue=value, @@ -471,7 +470,6 @@ def send_ctrl_cmd(self, request, value=0, data=None, intf=0): data_or_wLength=data, timeout=None, ) - return ret def recv_ctrl_cmd(self, request, blen, value=0, intf=0): buf = usb.util.create_buffer(blen) @@ -668,17 +666,14 @@ def set_DTR(self, on): self.send_ctrl_cmd(CP210x_SET_MHS, CP210x_MHS_DTR_OFF, None) def get_modem_state(self): - buf = self.recv_ctrl_cmd(CP210x_GET_MDMSTS, 1) - return buf + return self.recv_ctrl_cmd(CP210x_GET_MDMSTS, 1) def get_comm_status(self): - buf = self.recv_ctrl_cmd(CP210x_GET_COMM_STATUS, 19) - return buf + return self.recv_ctrl_cmd(CP210x_GET_COMM_STATUS, 19) def get_CTL(self): buf = self.recv_ctrl_cmd(CP210x_GET_LINE_CTL, 2) - val = struct.unpack(" 0: - rlen = size - len(data) - else: - rlen = size + rlen = size - len(data) if size > 0 else size chunk = buf.read_until(expected_last, rlen) data += chunk @@ -862,7 +854,7 @@ def read_until_or_none(self, expected=b"\n", size=None, timeout=None): # check if needle in size limit if size > 0 and size < len(buf): data_peek = buf.peek(size) - if -1 == data_peek.find(expected): + if data_peek.find(expected) == -1: return None # needle should be in limit @@ -1101,10 +1093,7 @@ def prepare_usb_cp210x(self, intf=0, baudRate=DEFAULT_BAUDRATE): return False ret = self.send_ctrl_cmd(CP210x_SET_MHS, CP210x_MHS_DEFAULT, None) - if ret < 0: - return False - - return True + return ret >= 0 def open(self, _async=True): if self._is_open: @@ -1126,7 +1115,7 @@ def read_dump_forever(self): while True: try: data = device.read(endp_in.bEndpointAddress, endp_in.wMaxPacketSize) - text = "".join([chr(v) for v in data]) + text = "".join(chr(v) for v in data) print(text, end="", flush=True) except libusb1.USBError: pass diff --git a/usbtest_rw1.py b/usbtest_rw1.py index 66278c0..f1e911f 100644 --- a/usbtest_rw1.py +++ b/usbtest_rw1.py @@ -44,7 +44,7 @@ def dump_test(ser): def dumper(): while not stop: data = ser._buf_in.read(100) - text = "".join([chr(v) for v in data]) + text = "".join(chr(v) for v in data) print(text, end="", flush=True) t = threading.Thread(target=dumper) diff --git a/usbtest_rw_buf.py b/usbtest_rw_buf.py index 42bd1a1..330fbb8 100644 --- a/usbtest_rw_buf.py +++ b/usbtest_rw_buf.py @@ -69,7 +69,7 @@ def dumper(): if not data: print(" no data") continue - text = "".join([chr(v) for v in data]) + text = "".join(chr(v) for v in data) print(" data:", text, end="\n", flush=True) print("read stopped")