remote.py (3977B) download
1from threading import Thread
2from time import sleep
3import tkinter as tk
4from tkinter import Button, OptionMenu, StringVar, Tk, Label
5from http.client import HTTPConnection
6from typing import Optional
7
8from remote import Remote
9
10import json
11import sys
12import asyncio
13import websockets
14
15
16WEBSOCKET_PORT = 1612
17host, port = 'localhost', 5000
18
19remote = Remote(115200)
20token: Optional[str] = None
21
22
23@remote.command("set_token")
24def set_token(req):
25 global token
26 token = req['token']
27
28
29@remote.command("send")
30def send_http(params):
31 method, endpoint, body = params["method"], params["endpoint"], params["body"]
32
33 print(body)
34
35 client = HTTPConnection(host, port)
36 client.request(method, endpoint, json.dumps(body))
37 res = client.getresponse()
38 response = json.load(res)
39
40 print(response)
41
42 return dict(code=res.status, body=response)
43
44
45token = 'abcdefghijklmnoq'
46
47
48async def websocket_handler(ws, _):
49 if await ws.recv() == 'token':
50 if token:
51 await ws.send(token)
52 await ws.close()
53
54
55class RemoteWindow(Tk):
56 running = False
57 closed = False
58 disconnecting = False
59
60 def __init__(self):
61 super().__init__()
62
63 self.title('Team Benni - Remote')
64 self.geometry('500x100')
65 self.protocol("WM_DELETE_WINDOW", self.on_close)
66
67 self.columnconfigure(0, weight=1)
68 self.columnconfigure(1, weight=3)
69# self.columnconfigure(2, weight=3)
70
71 self.devices = Remote.list_ports()
72 self.device_names = [
73 f'{p.name} ({p.description})' for p in self.devices]
74
75 self.dev_var = StringVar(self, self.device_names[0])
76
77 self.label = Label(self, text='Not connected')
78 self.label['anchor'] = tk.CENTER
79 self.label.grid(column=0, row=0, sticky=tk.W,
80 padx=5, pady=5, columnspan=2)
81
82 self.dev_label = Label(self, text='Device:')
83 self.dev_label.grid(column=0, row=1, sticky=tk.E, padx=5, pady=5)
84
85 self.dev_menu = OptionMenu(self, self.dev_var, *self.device_names)
86 self.dev_menu.grid(column=1, row=1, sticky=tk.E, padx=5, pady=5)
87
88 self.connect_button = Button(
89 self, text="Connect", command=self.on_connect)
90 self.connect_button.grid(column=1, row=3, sticky=tk.E, padx=5, pady=5)
91
92 async def run_websocket(self):
93 async with websockets.serve(websocket_handler, '0.0.0.0', # type: ignore
94 WEBSOCKET_PORT):
95 while self.running:
96 await asyncio.sleep(1)
97
98 def on_connect(self):
99 if self.disconnecting:
100 return
101 self.running = not self.running
102 if self.running:
103 port = self.devices[self.device_names.index(self.dev_var.get())]
104
105 self.websocket_thread = Thread(
106 target=lambda: asyncio.run(self.run_websocket()))
107 self.remote_thread = Thread(
108 target=lambda: remote.run(port.device))
109
110 self.websocket_thread.start()
111 self.remote_thread.start()
112
113 self.label['text'] = f'Connected to {port.name}'
114 if port.description != 'n/a':
115 self.label['text'] += f' ({port.description})'
116 self.connect_button['text'] = 'Disconnect'
117 else:
118 remote.stop()
119 self.disconnecting = True
120
121 self.connect_button['text'] = 'Disconnecting...'
122
123 def on_close(self):
124 if self.running:
125 self.on_connect()
126
127 self.closed = True
128
129 def run(self):
130 while not self.closed or self.disconnecting:
131 if self.disconnecting and not self.remote_thread.is_alive() and not self.websocket_thread.is_alive():
132 self.label['text'] = 'Not connected'
133 self.connect_button['text'] = 'Connect'
134 self.disconnecting = False
135
136 sleep(0.1)
137 self.update()
138
139
140if __name__ == "__main__":
141 win = RemoteWindow()
142 win.run()