__init__.py (2388B) download
1from datetime import datetime
2from typing import Any, Callable, Dict, Optional
3from serial import Serial
4from serial.tools.list_ports import comports
5from .exception import RemoteException
6
7import json
8import traceback
9
10
# Signature of a registered command handler: it receives the request's decoded
# JSON parameters and returns a JSON-serialisable dict, or None for "no payload".
CommandHandler = Callable[[Dict[str, Any]], Optional[Dict[str, Any]]]


class Remote:
    """Serve named commands over a serial port using a line-based protocol.

    A request is one line of the form ``<command>`` or ``<command> <json>``.
    Every request is answered with one line, ``<status>`` or
    ``<status> <json>``, terminated by ``\\n``. ``status`` is ``ok`` on
    success, the ``name`` of a raised ``RemoteException`` (``bad-request``
    and ``bad-command`` are raised by this class), or ``unknown`` when the
    handler fails with any other exception.
    """

    # Class-level default registry. Each instance takes its own copy in
    # ``__init__`` so that registering on one Remote does not leak into others.
    commands: Dict[str, CommandHandler] = dict()
    # Loop flag read by ``run``; cleared by ``stop``.
    running = True

    @staticmethod
    def list_ports():
        """Return the ports reported by ``serial.tools.list_ports.comports``."""
        return comports()

    def __init__(self, baud: int):
        """Create a remote that will open its port at ``baud`` baud."""
        self.baud = baud
        # Copy rather than alias: the class attribute is a single mutable dict
        # that would otherwise be shared by every instance.
        self.commands = dict(self.commands)

    def command(self, name: str):
        """Return a decorator that registers a handler under ``name``.

        The decorated function is stored in ``self.commands`` and returned
        unchanged, so it can still be called directly.
        """
        def inner(func: CommandHandler):
            self.commands[name] = func
            return func

        return inner

    @staticmethod
    def _split_request(line: str) -> tuple:
        """Split a raw request line into ``(command, params_raw)``.

        Trailing ``\\r``/``\\n`` are removed first, because ``readline`` hands
        back the line terminator and it must not end up in the command name.
        ``params_raw`` is ``None`` when the line contains no space.
        """
        command, separator, params_raw = line.rstrip('\r\n').partition(' ')
        return command, (params_raw if separator else None)

    def run(self, serial_path, timeout=1):
        """Open ``serial_path`` and serve requests until stopped.

        The loop ends when ``stop`` is called (checked once per read, so at
        most ``timeout`` seconds later on an idle line) or on
        ``KeyboardInterrupt``. The port is closed on every exit path.

        Args:
            serial_path: Port name passed to ``Serial``.
            timeout: Read timeout in seconds passed to ``Serial``.
        """
        self.running = True
        # The context manager closes the port even if a write fails or an
        # interrupt arrives outside the inner try block.
        with Serial(port=serial_path, baudrate=self.baud,
                    timeout=timeout) as serial:
            while self.running:
                command = ''
                status = ''
                params = None
                response = None
                try:
                    line = serial.readline().decode(errors='ignore')
                    if line == '':
                        # Nothing read (the read timed out): poll again so
                        # ``self.running`` is re-checked.
                        continue

                    command, params_raw = self._split_request(line)
                    if params_raw is None:
                        params = dict()
                    else:
                        try:
                            params = json.loads(params_raw)
                        except json.JSONDecodeError as err:
                            raise RemoteException('bad-request') from err

                    if command not in self.commands:
                        raise RemoteException('bad-command')

                    res = self.commands[command](params) or dict()

                    # Serialise first: a non-serialisable result must be
                    # reported as a failure, not as ``ok``.
                    response = json.dumps(res)
                    status = 'ok'
                except RemoteException as err:
                    status = err.name
                except KeyboardInterrupt:
                    break
                except Exception:
                    print(f'Error handling {command} ({params}):')
                    traceback.print_exc()
                    status = 'unknown'

                print(
                    f'{datetime.now().strftime("%d/%m/%y %H:%M:%S")} | {command} -> { status }')

                # Send the whole reply in one write.
                reply = status.encode()
                if response is not None:
                    reply += b' ' + response.encode()
                serial.write(reply + b'\n')

    def stop(self):
        """Ask ``run`` to exit after its current iteration."""
        self.running = False