robotengine.ho_robot

ho_robot 是 robotengine 控制 ho 机器人的节点。

ho_link 与 机器人之间的通讯是自动的,在连接好设备并确定串口是正常开启后,会自动与机器人进行通讯并更新。

如果配置了 url ho_link 节点会自动发送机器人的状态 HoState 到 url 指定的地址。

ho_link 会不断被动地接收机器人的状态并更新,但是不会主动向机器人发送数据。

使用 ho_link.update() 函数可以向机器人发送数据。

挂载 ho_link 节点后,_process()的处理速度会显著受到影响,请酌情调整 engine 的运行频率。

  1"""
  2
  3ho_robot 是 robotengine 控制 ho 机器人的节点。
  4
  5ho_link 与 机器人之间的通讯是自动的,在连接好设备并确定串口是正常开启后,会自动与机器人进行通讯并更新。
  6
  7如果配置了 url ho_link 节点会自动发送机器人的状态 HoState 到 url 指定的地址。
  8
  9ho_link 会不断被动地接收机器人的状态并更新,但是不会主动向机器人发送数据。
 10
 11使用 ho_link.update() 函数可以向机器人发送数据。
 12
 13挂载 ho_link 节点后,_process()的处理速度会显著受到影响,请酌情调整 engine 的运行频率。
 14"""
 15
 16from robotengine.node import Node
 17from robotengine.serial_io import SerialIO, DeviceType, CheckSumType
 18from robotengine.tools import hex2str, warning, error, info, near, vector_angle, vector_length, compute_vector_projection, find_closest_vectors
 19from robotengine.signal import Signal
 20from robotengine.timer import Timer
 21from typing import List, Tuple
 22from enum import Enum
 23import requests
 24import threading
 25import time
 26import random
 27import multiprocessing
 28import tkinter as tk
 29from ttkbootstrap import ttk
 30import ttkbootstrap as ttkb
 31from fastapi import FastAPI, Request
 32import uvicorn
 33from urllib.parse import urlparse
 34import copy
 35import math
 36import json
 37import os
 38
 39class HoMode(Enum):
 40    """ Ho 电机模态 """
 41    S = 0
 42    """ 停止 """
 43    I = 1
 44    """ 电流控制 """
 45    V = 2
 46    """ 速度控制 """
 47    P = 3
 48    """ 位置控制 """
 49
 50class AlignState:
 51    """ 帧和时间戳对齐的状态数据 """
 52    def __init__(self, id: int, i: float, v: float, p: float, frame: int, timestamp: float) -> None:
 53        """ 
 54        初始化对齐状态数据
 55
 56            :param id: 电机 id
 57            :param i: 电流
 58            :param v: 速度
 59            :param p: 位置
 60            :param frame: 当前帧
 61            :param timestamp: 当前时间戳 
 62        """
 63        self.id = id
 64        """ 电机 id """
 65        self.i: float = i
 66        """ 电流 """
 67        self.v: float = v
 68        """ 速度 """
 69        self.p: float = p
 70        """ 位置 """
 71        self.frame = frame
 72        """ 此状态数据对应的帧 """
 73        self.timestamp = timestamp
 74        """ 此状态数据对应的时间戳 """
 75
 76    def to_dict(self):
 77        """ 转换为字典 """
 78        return {
 79            "id": self.id,
 80            "i": self.i,
 81            "v": self.v,
 82            "p": self.p,
 83            "frame": self.frame,
 84            "timestamp": self.timestamp
 85        }
 86
 87    def __repr__(self):
 88        return f"AlignState(id={self.id}, i={round(self.i, 2)}, v={round(self.v, 2)}, p={round(self.p, 2)}, frame={self.frame}, timestamp={round(self.timestamp, 2)})"
 89    
 90class HoState:
 91    """ Ho 机器人状态 """
 92    def __init__(self, states: List[AlignState], random_state=False) -> None:
 93        """ 
 94        初始化 Ho 机器人状态
 95
 96            :param states: 帧和时间戳对齐的状态数据列表
 97            :param random_state: 是否随机生成状态数据
 98        """
 99        if not random_state:
100            self._states = states
101        else:
102            self._states = []
103            for i in range(1, 9):
104                self._states.append(AlignState(i, random.uniform(-1.0, 1.0), random.uniform(-360.0, 360.0), random.uniform(-1000.0, 1000.0), 0, 0.0))
105
106    def get_state(self, id: int) -> AlignState:
107        """ 
108        获取指定 id 的状态 
109        """
110        for state in self._states:
111            if state.id == id:
112                return state
113        return None
114    
115    def get_states(self) -> List[AlignState]:
116        """ 
117        获取所有状态 
118        """
119        return copy.deepcopy(self._states)
120    
121    def to_dict(self):
122        """ 
123        转换为字典 
124        """
125        return {
126            "states": [state.to_dict() for state in self._states]
127        }
128    
129    def __repr__(self):
130        state_str = ""
131        for state in self._states:
132            state_str += str(state)
133            if state != self._states[-1]:
134                state_str += "\n"
135        return f"HoState(\n{state_str})"
136
137class HoLink(Node):
138    """ Ho 机器人链接节点 """
139    def __init__(self, name="HoLink", buffer_capacity: int=1024, urls: List[str]=[], warn=True) -> None:
140        """ 
141        初始化 Ho 机器人链接节点 
142
143            :param name: 节点名称
144            :param buffer_capacity: 存储状态数据的缓冲区的容量
145            :param urls: 服务地址列表
146            :param read_mode: 串口读取模式
147            :param warn: 是否显示警告
148        """
149        super().__init__(name)
150        self._data_length = 84
151        self._receive_data = None
152        self._urls = urls
153        self._warn = warn
154        
155        if self._urls:
156            self._shutdown = multiprocessing.Event()
157            self._pending_capacity = 256
158            self._pending_requests = multiprocessing.Queue()
159            self._http_process = multiprocessing.Process(target=self._http_request, daemon=True, name=self.name+"HttpProcess")
160            self._http_process.start()
161
162        self._buffer_capacity: int = buffer_capacity
163        """ 存储状态数据的缓冲区的容量 """
164        self._state_buffer: List[HoState] = []
165        """ 存储状态数据的缓冲区 """
166
167        self.sio: SerialIO = SerialIO(name="HoSerialIO", device_type=DeviceType.STM32F407, checksum_type=CheckSumType.SUM16, header=[0x0D, 0x0A], warn=warn, baudrate=1000000, timeout=1.0)
168        """ 串口节点 HoLink 会主动挂载一个已经配置好的串口节点 """
169        self.add_child(self.sio)
170
171        self.receive: Signal = Signal(bytes)
172        """ 信号,当接收到数据时触发(无论是否通过校验和) """
173        self.ho_state_update: Signal = Signal(HoState)
174        """ 信号,当接收到数据并成功通过校验和,将状态数据更新到信号参数中时触发 """
175
176    def _ready(self) -> None:
177        pass
178
179    def get_ho_state(self) -> HoState:
180        """
181        获取机器人当前最新的状态数据
182        """
183        if len(self._state_buffer) == 0:
184            return None
185        return self._state_buffer[-1]
186    
187    def get_ho_state_buffer(self) -> List[HoState]:
188        """
189        获取机器人当前的状态数据缓冲区
190        """
191        return copy.deepcopy(self._state_buffer)
192
193    def _add_pending_request(self, ho_state: HoState):
194        """ 
195        向请求队列中添加请求 
196        """
197        self._pending_requests.put(ho_state)
198        if self._pending_requests.qsize() > self._pending_capacity:
199            if self._warn:
200                warning(f"{self.name}{self._urls} 发送请求时,请求队列已满,将丢弃最早的请求,可能会导致数据丢失")
201            self._pending_requests.get()
202
203    def _send_request(self, ho_state_dict: dict) -> None:
204        """
205        向服务地址发送请求
206        """
207        try:
208            for _url in self._urls:
209                response = requests.post(_url, json=ho_state_dict, timeout=0.1)
210
211        except requests.RequestException as e:
212            if self._warn:
213                warning(f"请求失败: {e}")
214        except Exception as e:
215            if self._warn:
216                warning(f"发生未知错误: {e}")
217
218    def _http_request(self):
219        info(f"{self.name} 已开启向服务地址 {self._urls} 发送数据的功能")
220        while not self._shutdown.is_set():
221            if not self._pending_requests.empty():
222                ho_state = self._pending_requests.get()
223                self._send_request(ho_state.to_dict())
224
225    def update(self, id: int, mode: HoMode, i: float, v: float, p: float) -> None:
226        """ 
227        向机器人发送数据 
228        """
229        data = bytes([id]) + bytes([mode.value]) + self._encode(p, 100.0, 4) + \
230            self._encode(v, 100.0, 4) + self._encode(i, 100.0, 2)
231        # print(f"发送数据: {hex2str(data)}")
232        self.sio.transmit(data)
233
234    def _process(self, delta) -> None:
235        self._receive_data = self.sio.receive(self._data_length)
236        if self._receive_data:
237            if self.sio.check_sum(self._receive_data):
238                states = []
239                receive_data = self._receive_data[2:-2]
240
241                id = 1
242                for i in range(0, 80, 10):
243                    _data = receive_data[i:i+10]
244                    _p = self._decode(_data[0:4], 100.0, 4)
245                    _v = self._decode(_data[4:8], 100.0, 4)
246                    _i = self._decode(_data[8:10], 100.0, 2)
247
248                    align_state = AlignState(id=id, i=_i, v=_v, p=_p, frame=self.engine.get_frame(), timestamp=self.engine.get_timestamp())
249                    states.append(align_state)
250                    id += 1
251
252                ho_state = HoState(states)
253                self._state_buffer.append(ho_state)
254
255                if len(self._state_buffer) > self._buffer_capacity:
256                    self._state_buffer.pop(0)
257
258                self.ho_state_update.emit(ho_state)
259                if self._urls:
260                    self._add_pending_request(ho_state)
261            else:
262                if self._warn:
263                    warning(f"{self.name} 长度为 {len(self._receive_data)} 的数据 {hex2str(self._receive_data)} 校验和错误")
264            self.receive.emit(self._receive_data)
265
266    def _encode(self, value: float, scale_factor: float, byte_length: int) -> bytes:
267        max_value = (1 << (8 * byte_length - 1))
268        max_scaled_value = max_value / scale_factor
269
270        if abs(value) >= max_scaled_value:
271            raise ValueError(f"要编码的值 {round(value, 2)} 超出范围 [-{max_scaled_value}, {max_scaled_value}]")
272
273        encoded_value = int(value * scale_factor) + max_value
274        
275        max_value_for_length = (1 << (8 * byte_length)) - 1
276        if encoded_value > max_value_for_length:
277            raise ValueError(f"编码值 {encoded_value} 超出了 {byte_length} 字节的最大值 {max_value_for_length}")
278
279        byte_data = []
280        for i in range(byte_length):
281            byte_data.insert(0, encoded_value & 0xFF)
282            encoded_value >>= 8
283
284        return bytes(byte_data)
285
286    def _decode(self, data: bytes, scale_factor: float, byte_length: int) -> float:
287        if len(data) != byte_length:
288            raise ValueError(f"数据长度 {len(data)} 与指定的字节长度 {byte_length} 不匹配")
289        max_value = (1 << (8 * byte_length - 1))
290
291        decoded_value = 0
292        for i in range(byte_length):
293            decoded_value <<= 8
294            decoded_value |= data[i]
295        
296        decoded_value -= max_value
297
298        return decoded_value / scale_factor
299    
300    # def _on_engine_exit(self):
301    #     if self._urls:
302    #         self._shutdown.set()
303    #         self._http_process.join()
304            
305
306class HoServer:
307    def __init__(self, url: str, capacity=1024, ui: bool=True, ui_frequency: float=30.0) -> None:
308        """
309        初始化 HoServer 实例。
310
311            :param url: 服务器的 URL。
312            :param capacity: 数据缓冲区的最大容量。
313            :param ui: 是否启用 UI 界面。
314            :param ui_frequency: UI 更新频率(Hz)。
315        """
316        self._urls = url
317        parsed_url = urlparse(url)
318        self._host = parsed_url.hostname
319        self._port = parsed_url.port
320        self._path = parsed_url.path
321
322        self._ui = ui
323        self._ui_frequency = ui_frequency
324        self._capacity = capacity
325        self._data_buffer = []
326        """ 
327        数据缓冲区 
328        """
329
330        self._data_queue = multiprocessing.Queue()
331        self._shutdown = multiprocessing.Event()
332
333        # 启动 FastAPI 应用进程
334        self._app_process = multiprocessing.Process(target=self._run_app, args=(self._path, self._host, self._port), daemon=True)
335
336    def _update_data(self):
337        """
338        从数据队列中读取数据并更新缓冲区。
339        """
340        while not self._shutdown.is_set():
341            if not self._data_queue.empty():
342                ho_state = self._data_queue.get()
343                self._data_buffer.append(ho_state)
344                if len(self._data_buffer) > self._capacity:
345                    self._data_buffer.pop(0)
346
347    def has_data(self) -> bool:
348        """
349        检查缓冲区中是否有数据。
350
351            :return: 如果缓冲区中有数据,则返回 True,否则返回 False。
352        """
353        return len(self._data_buffer) > 0
354
355    def get_data(self) -> HoState:
356        """
357        获取缓冲区中最新的数据。
358
359            :return: 缓冲区中最老的数据,如果缓冲区为空,则返回 None。
360        """
361        if not self.has_data():
362            return None
363        return self._data_buffer.pop(0)
364    
365    def get_data_buffer(self) -> List[HoState]:
366        """
367        获取缓冲区。
368
369        注意:若需要从数据缓冲区中读取数据,请尽快取出,否则缓冲区溢出后最开始的数据会丢失
370
371            :return: 缓冲区。
372        """
373        return copy.deepcopy(self._data_buffer)
374    
375    def flush(self) -> None:
376        """
377        清空缓冲区。
378        """
379        self._data_buffer.clear()
380    
381    def length(self) -> int:
382        """
383        获取缓冲区中的数据长度。
384
385            :return: 缓冲区中的数据长度。
386        """
387        return len(self._data_buffer)
388
389    def _init_ui(self) -> None:
390        """
391        初始化 UI。
392        """
393        self.root = tk.Tk()
394        self.root.title("HoServer")
395        self.root.geometry("800x600")
396
397    def run(self) -> None:
398        """
399        启动服务器并运行 UI 更新线程(如果启用 UI)。
400        """
401        self._app_process.start()
402
403        # 数据更新线程
404        self._data_thread = threading.Thread(target=self._update_data, daemon=True)
405        self._data_thread.start()
406
407        if self._ui:
408            self._init_ui()
409            # UI 更新线程
410            self._ui_thread = threading.Thread(target=self._update_ui, daemon=True)
411            self._ui_thread.start()
412
413            self.root.mainloop()
414
415    def _run_app(self, path: str, host: str, port: int) -> None:
416        """
417        启动 FastAPI 服务器并监听请求。
418
419            :param path: API 路径。
420            :param host: 服务器主机。
421            :param port: 服务器端口。
422        """
423        app = FastAPI()
424        app.add_api_route(path, self._handle_data, methods=["POST"])
425
426        uvicorn.run(app, host=host, port=port)
427
428    async def _handle_data(self, request: Request) -> dict:
429        """
430        处理接收到的 POST 请求数据。
431
432            :param request: FastAPI 请求对象。
433            :return: 处理结果。
434        """
435        json_data = await request.json()
436        states_data = json_data.get("states", [])
437
438        states = []
439        for state_data in states_data:
440            state = AlignState(
441                id=state_data["id"],
442                i=state_data["i"],
443                v=state_data["v"],
444                p=state_data["p"],
445                frame=state_data["frame"],
446                timestamp=state_data["timestamp"]
447            )
448            states.append(state)
449        
450        ho_state = HoState(states=states)
451        self._data_queue.put(ho_state)
452        return {"message": "Data received"}
453
454    def _init_ui(self) -> None:
455        """
456        初始化 UI 界面。
457        """
458        self.root = ttkb.Window(themename="superhero", title="HoServer")
459
460        frame = ttk.Frame(self.root)
461        frame.pack(padx=10, pady=10)
462
463        columns = ['Id', 'Frame', 'Timestamp', 'i', 'v', 'p']
464        self.entries = {}
465
466        # 创建表头
467        for col, column_name in enumerate(columns):
468            label = ttk.Label(frame, text=column_name, width=5)
469            label.grid(row=0, column=col, padx=5, pady=5)
470
471        # 创建数据输入框
472        for row in range(8):
473            id_label = ttk.Label(frame, text=f"{row + 1}", width=5)
474            id_label.grid(row=row + 1, column=0, padx=5, pady=5)
475            for col in range(5):
476                entry = ttk.Entry(frame, width=15, state='normal')
477                entry.grid(row=row + 1, column=col + 1, padx=5, pady=10)
478                self.entries[(row, col)] = entry
479
480    def _update_ui(self) -> None:
481        """
482        根据数据缓冲区更新 UI 界面。
483        """
484        def update() -> None:
485            if len(self._data_buffer) == 0:
486                return
487            ho_state = self._data_buffer[-1]
488            
489            # 清空当前数据
490            for row in range(8):
491                for col in range(5):
492                    self.entries[(row, col)].delete(0, tk.END)
493
494            # 更新数据
495            for row in range(8):
496                align_state = ho_state.get_state(row + 1)
497                self.entries[(row, 0)].insert(0, str(align_state.frame))
498                self.entries[(row, 1)].insert(0, str(align_state.timestamp))
499                self.entries[(row, 2)].insert(0, str(round(align_state.i, 2)))
500                self.entries[(row, 3)].insert(0, str(round(align_state.v, 2)))
501                self.entries[(row, 4)].insert(0, str(round(align_state.p, 2)))
502
503        time_interval = 1.0 / self._ui_frequency
504        while not self._shutdown.is_set():
505            time.sleep(time_interval)
506
507            self.root.after(0, update)
508
509
510    def __del__(self) -> None:
511        """
512        清理资源,停止线程和进程。
513        """
514        self._shutdown.set()
515        self._app_process.join()
516        self._data_thread.join()
517        if self._ui:
518            self._ui_thread.join()
519
520
521class ManualState(Enum):
522    """ 手动状态枚举 """
523    IDLE = 0
524    """ 空闲状态,所有电机停止运动 """
525    RUNNING = 1
526    """ 运行状态 """
527    RETURNING = 2
528    """ 返回状态 """
529    ZEROING = 3
530    """ 设置零点状态 """
531
532class HoManual(Node):
533    """ Ho 机器人的手动控制节点 """
534    from robotengine import InputEvent
535    def __init__(self, link: HoLink, name="Manual", rotation_velocity: float = 360.0, running_scale: float=100.0, zeroing_scale: float=100.0, axis_threshold: float=0.1) -> None:
536        """
537        初始化 HoManual 实例。
538
539            :param link: HoLink 实例。
540            :param name: 节点名称。
541            :param rotation_velocity: 旋转速度(度/秒)。
542            :param running_scale: 运行状态的缩放因子。
543            :param zeroing_scale: 设置零点状态的缩放因子。
544            :param axis_threshold: 轴的阈值。
545        """
546        from robotengine import StateMachine
547        super().__init__(name)
548        self._debug = False
549        self._valid = True
550
551        self._link = link
552        self._link.ho_state_update.connect(self._on_ho_state_update)
553        
554        self.state_machine = StateMachine(ManualState.IDLE, name="HoManualStateMachine")
555        self.add_child(self.state_machine)
556        
557        self._zero_angles = {
558            4: 0.0,
559            5: 0.0,
560            6: 0.0,
561            7: 0.0,
562        }
563        self._zero_index = 4
564
565        self._is_tension = False
566        self._is_rotation = False
567        self._rotation_velocity = rotation_velocity
568        self._base_angles = [math.pi / 4, math.pi / 4 * 3, math.pi / 4 * 5, math.pi / 4 * 7]
569
570        self._running_scale = running_scale
571        self._zeroing_scale = zeroing_scale
572        self._axis_threshold = axis_threshold
573
574        self._before_returning = None
575
576        self.exit()
577
578    def _init(self):
579        _load_zero_angles = self._load_from_json("zero_angles.json")
580        if _load_zero_angles:
581            info("成功加载 zero_angles.json")
582            for i in range(4, 8):
583                self._zero_angles[i] = _load_zero_angles[str(i)]
584                info(f"{i}: {self._zero_angles[i]}")
585
586    def _input(self, event: InputEvent) -> None:
587        if not self._valid:
588            return
589        
590        state = self.state_machine.current_state
591
592        if event.is_action_pressed("X"):
593            self.tension(not self._is_tension)
594
595        elif event.is_action_pressed("Y"):
596            self.rotation(not self._is_rotation, self._rotation_velocity)
597
598        if state == ManualState.ZEROING:
599            if event.is_action_pressed("LEFT_SHOULDER"):
600                self._change_index(-1)
601
602            elif event.is_action_pressed("RIGHT_SHOULDER"):
603                self._change_index(1)
604
605            elif event.is_action_pressed("A"):
606                if self._debug:
607                    return
608                ho_state = self._link.get_ho_state()
609                if not ho_state:
610                    return
611                for i in range(4, 8):
612                    state = ho_state.get_state(i)
613                    self._zero_angles[i] = state.p
614                self._save_to_json("zero_angles.json", self._zero_angles)
615
616    def _change_index(self, dir: int) -> None:
617        self.lock(self._zero_index)
618        self._zero_index += dir
619        if self._zero_index > 7:
620            self._zero_index = 4
621        elif self._zero_index < 4:
622            self._zero_index = 7
623        info(f"     当前电机: {self._zero_index}")
624
625    def _on_ho_state_update(self, ho_state: HoState):
626        if not self._valid:
627            return
628        
629        state = self.state_machine.current_state
630
631        if state == ManualState.IDLE:
632            pass
633
634        elif state == ManualState.RUNNING:
635            x_value = self._threshold(self.input.get_action_strength("RIGHT_X"))
636            y_value = self._threshold(self.input.get_action_strength("LEFT_Y"))
637            self.turn(x_value, y_value, ho_state)
638
639            l_value = self._threshold(self.input.get_action_strength("TRIGGER_LEFT"))
640            r_value = self._threshold(self.input.get_action_strength("TRIGGER_RIGHT"))
641            self.move(l_value, r_value, ho_state)
642
643        elif state == ManualState.RETURNING:
644            for i in range(4, 8):
645                self._link.update(i, HoMode.P, 2.0, 100.0, self._zero_angles[i])
646
647        elif state == ManualState.ZEROING:
648            direction = self.input.get_action_strength("LEFT_Y")
649            direction = self._threshold(direction)
650            velocity = direction * self._zeroing_scale
651            if not self._debug:
652                self._link.update(self._zero_index, HoMode.V, 2.0, velocity, 0.0)
653
654    def tick(self, state: ManualState, delta: float) -> None:
655        if state == ManualState.IDLE:
656            pass
657
658        elif state == ManualState.RUNNING:
659            pass
660
661        elif state == ManualState.RETURNING:
662            pass
663
664        elif state == ManualState.ZEROING:
665            pass
666
667    def _threshold(self, value: float) -> float:
668        if abs(value) < self._axis_threshold:
669            return 0.0
670        return value
671
672    def get_next_state(self, state: ManualState) -> ManualState:
673        if state == ManualState.IDLE:
674            if self.input.is_action_pressed("START"):
675                self.input.flush_action("START")
676                return ManualState.RUNNING
677            
678            if self.input.is_action_pressed("B"):
679                self.input.flush_action("B")
680                return ManualState.RETURNING
681            
682            elif self.input.is_action_pressed("RIGHT_STICK"):
683                self.input.flush_action("RIGHT_STICK")
684                return ManualState.ZEROING
685
686        elif state == ManualState.RUNNING:
687            if self.input.is_action_pressed("START"):
688                self.input.flush_action("START")
689                return ManualState.IDLE
690            
691            if self.input.is_action_pressed("B"):
692                self.input.flush_action("B")
693                return ManualState.RETURNING
694
695        elif state == ManualState.RETURNING:
696            if self.input.is_action_pressed("B"):
697                self.input.flush_action("B")
698                return self._before_returning
699
700        elif state == ManualState.ZEROING:
701            if self.input.is_action_pressed("RIGHT_STICK"):
702                self.input.flush_action("RIGHT_STICK")
703                return ManualState.IDLE
704
705        return self.state_machine.KEEP_CURRENT
706    
707    def transition_state(self, from_state: ManualState, to_state: ManualState) -> None:
708        print("")
709        info(f"{from_state if from_state is not None else 'START'} -> {to_state}")
710        info(f"TENSION: {self._is_tension}, ROTATION: {self._is_rotation}")
711
712        if from_state == ManualState.IDLE:
713            pass
714
715        elif from_state == ManualState.RUNNING:
716            pass
717
718        elif from_state == ManualState.RETURNING:
719            pass
720
721        elif from_state == ManualState.ZEROING:
722            pass
723
724        info("      Y: 开关旋转")
725        info("      X: 开关张紧")
726        if to_state == ManualState.IDLE:
727            for i in range(1, 9):
728                if i == 2 or i == 3:
729                    continue
730                self.stop(i)
731            info("      START: 进入 RUNNING 状态")
732            info("      B: 进入 RETURNING 状态")
733            info("      RIGHT_STICK: 进入 ZEROING 状态")
734
735        elif to_state == ManualState.RUNNING:
736            for i in range(1, 9):
737                if i == 2 or i == 3:
738                    continue
739                self.lock(i)
740            info("      START: 返回 IDLE 状态")
741            info("      B: 进入 RETURNING 状态")
742    
743        elif to_state == ManualState.RETURNING:
744            self.lock(1)
745            self._before_returning = from_state
746            info("      B: 返回之前的状态")
747
748        elif to_state == ManualState.ZEROING:
749            for i in range(1, 9):
750                if i == 2 or i == 3:
751                    continue
752                self.lock(i)
753            info("      RIGHT_STICK: 返回 IDLE 状态")
754            info("      LEFT_SHOULDER: 切换到上一个电机")
755            info("      RIGHT_SHOULDER: 切换到下一个电机")
756            info("      A: 保存当前位置为零点")
757            info(f"      当前电机: {self._zero_index}")
758    
759    def lock(self, id: int) -> None:
760        """
761        锁定指定的电机。
762
763            :param id: 电机编号
764        """
765        if self._debug:
766            info(f"{self.name} 锁定电机 {id}")
767            return
768        self._link.update(id, HoMode.V, 2.0, 0.0, 0.0)
769
770    def lock_all(self) -> None:
771        """
772        锁定所有的电机。
773        """
774        for i in range(1, 9):
775            self.lock(i)
776
777    def stop(self, id: int) -> None:
778        """
779        停止指定的电机。
780
781            :param id: 电机编号
782        """
783        if self._debug:
784            info(f"{self.name} 停止电机 {id}")
785            return
786        self._link.update(id, HoMode.S, 0.0, 0.0, 0.0)
787
788    def stop_all(self) -> None:
789        """
790        停止所有的电机。
791        """
792        for i in range(1, 9):
793            self.stop(i)
794
795    def tension(self, on: bool, i: float=0.8) -> None:
796        """
797        驱动牵引电机,张紧导管
798
799            :param on: 是否开启牵引
800            :param i: 牵引电流(A)
801        """
802        self._is_tension = on
803        if on:
804            self._link.update(2, HoMode.V, i, -360.0, 0.0)
805            self._link.update(3, HoMode.V, i, 360.0, 0.0)
806        else:
807            self.stop(2)
808            self.stop(3)
809
810    def rotation(self, on: bool, velocity: float = 360.0) -> None:
811        """
812        驱动旋转电机,旋转换能器
813
814            :param on: 是否开启旋转
815            :param velocity: 旋转速度(度/秒)
816        """
817        self._is_rotation = on
818        if on:
819            self._link.update(8, HoMode.V, 2.0, velocity, 0.0)
820        else:
821            self.stop(8)
822
823    def turn(self, x_value: float, y_value: float, ho_state: HoState) -> None:
824        """
825        驱动转向电机,转向导管
826
827            :param x_value: 横向控制值
828            :param y_value: 纵向控制值
829            :param ho_state: Ho 机器人状态
830        """
831        if x_value == 0 and y_value == 0:
832            for i in range(4, 8):
833                self._link.update(i, HoMode.V, 2.0, 0.0, 0.0)
834        else:
835            projection = compute_vector_projection(x_value, y_value, self._base_angles)
836            control_values = [v * self._running_scale for v in projection]
837
838            for i in range(4, 8):
839                if control_values[i-4] > 0:
840                    a_id = i
841                    b_id = (i + 2) % 4 + 4
842                    a_state = ho_state.get_state(a_id)
843                    b_state = ho_state.get_state(b_id)
844                    a_near = near(a_state.p, self._zero_angles[a_id])
845                    b_near = near(b_state.p, self._zero_angles[b_id])
846
847                    if a_near and not b_near:
848                        self._link.update(b_id, HoMode.P, 2.0, control_values[i-4], self._zero_angles[b_id])
849                    elif (not a_near and b_near) or (a_near and b_near):
850                        self._link.update(a_id, HoMode.V, 2.0, control_values[i-4], 0.0)
851                    elif not a_near and not b_near:
852                        self._link.update(a_id, HoMode.V, 2.0, control_values[i-4], 0.0)
853                        self._link.update(b_id, HoMode.P, 2.0, control_values[i-4], self._zero_angles[b_id])
854
855    def move(self, l_value: float, r_value: float, ho_state: HoState) -> None:
856        """
857        驱动移动电机,移动导管
858
859            :param l_value: 左侧移动控制值
860            :param r_value: 右侧移动控制值
861            :param ho_state: Ho 机器人状态
862        """
863        if l_value != 0 and r_value != 0:
864            self._link.update(1, HoMode.V, 2.0, 0.0, 0.0)
865
866        elif l_value != 0 and r_value== 0:
867            self._link.update(1, HoMode.V, 2.0, -l_value * self._running_scale, 0.0)
868
869        elif r_value != 0 and l_value== 0:
870            self._link.update(1, HoMode.V, 2.0, r_value * self._running_scale, 0.0)
871
872        else:
873            self._link.update(1, HoMode.V, 2.0, 0.0, 0.0)
874
875    def is_running(self) -> bool:
876        """
877        判断当前节点是否处于运行状态。
878
879            :return: 如果当前节点处于运行状态,则返回 True,否则返回 False。
880        """
881        return self._valid
882
883    def enter(self) -> None:
884        """
885        进入节点。
886        """
887        self.state_machine.freeze = False
888        self.state_machine.first_tick = True
889        self.state_machine.current_state = ManualState.IDLE
890        self._is_rotation = False
891        self._is_tension = False
892        self._valid = True
893
894    def exit(self) -> None:
895        """
896        退出节点。
897        """
898        self.state_machine.freeze = True
899        self.state_machine.first_tick = True
900        self.state_machine.current_state = ManualState.IDLE
901        self._valid = False
902        self._is_rotation = False
903        self._is_tension = False
904        self.stop_all()
905
906    def _save_to_json(self, file_name, data):
907        with open(file_name, 'w') as f:
908            json.dump(data, f)
909        info(f"     {self.name} 保存 {file_name} 成功")
910
911    def _load_from_json(self, file_name):
912        if not os.path.exists(file_name):
913            warning(f"{file_name} 不存在,无法读取 zero_angles 将使用 0.0 作为初始值")
914            return None
915        with open(file_name, 'r') as f:
916            return json.load(f)
class HoMode(enum.Enum):
40class HoMode(Enum):
41    """ Ho 电机模态 """
42    S = 0
43    """ 停止 """
44    I = 1
45    """ 电流控制 """
46    V = 2
47    """ 速度控制 """
48    P = 3
49    """ 位置控制 """

Ho 电机模态

S = <HoMode.S: 0>

停止

I = <HoMode.I: 1>

电流控制

V = <HoMode.V: 2>

速度控制

P = <HoMode.P: 3>

位置控制

Inherited Members
enum.Enum
name
value
class AlignState:
51class AlignState:
52    """ 帧和时间戳对齐的状态数据 """
53    def __init__(self, id: int, i: float, v: float, p: float, frame: int, timestamp: float) -> None:
54        """ 
55        初始化对齐状态数据
56
57            :param id: 电机 id
58            :param i: 电流
59            :param v: 速度
60            :param p: 位置
61            :param frame: 当前帧
62            :param timestamp: 当前时间戳 
63        """
64        self.id = id
65        """ 电机 id """
66        self.i: float = i
67        """ 电流 """
68        self.v: float = v
69        """ 速度 """
70        self.p: float = p
71        """ 位置 """
72        self.frame = frame
73        """ 此状态数据对应的帧 """
74        self.timestamp = timestamp
75        """ 此状态数据对应的时间戳 """
76
77    def to_dict(self):
78        """ 转换为字典 """
79        return {
80            "id": self.id,
81            "i": self.i,
82            "v": self.v,
83            "p": self.p,
84            "frame": self.frame,
85            "timestamp": self.timestamp
86        }
87
88    def __repr__(self):
89        return f"AlignState(id={self.id}, i={round(self.i, 2)}, v={round(self.v, 2)}, p={round(self.p, 2)}, frame={self.frame}, timestamp={round(self.timestamp, 2)})"

帧和时间戳对齐的状态数据

AlignState(id: int, i: float, v: float, p: float, frame: int, timestamp: float)
53    def __init__(self, id: int, i: float, v: float, p: float, frame: int, timestamp: float) -> None:
54        """ 
55        初始化对齐状态数据
56
57            :param id: 电机 id
58            :param i: 电流
59            :param v: 速度
60            :param p: 位置
61            :param frame: 当前帧
62            :param timestamp: 当前时间戳 
63        """
64        self.id = id
65        """ 电机 id """
66        self.i: float = i
67        """ 电流 """
68        self.v: float = v
69        """ 速度 """
70        self.p: float = p
71        """ 位置 """
72        self.frame = frame
73        """ 此状态数据对应的帧 """
74        self.timestamp = timestamp
75        """ 此状态数据对应的时间戳 """

初始化对齐状态数据

:param id: 电机 id
:param i: 电流
:param v: 速度
:param p: 位置
:param frame: 当前帧
:param timestamp: 当前时间戳
id

电机 id

i: float

电流

v: float

速度

p: float

位置

frame

此状态数据对应的帧

timestamp

此状态数据对应的时间戳

def to_dict(self):
77    def to_dict(self):
78        """ 转换为字典 """
79        return {
80            "id": self.id,
81            "i": self.i,
82            "v": self.v,
83            "p": self.p,
84            "frame": self.frame,
85            "timestamp": self.timestamp
86        }

转换为字典

class HoState:
 91class HoState:
 92    """ Ho 机器人状态 """
 93    def __init__(self, states: List[AlignState], random_state=False) -> None:
 94        """ 
 95        初始化 Ho 机器人状态
 96
 97            :param states: 帧和时间戳对齐的状态数据列表
 98            :param random_state: 是否随机生成状态数据
 99        """
100        if not random_state:
101            self._states = states
102        else:
103            self._states = []
104            for i in range(1, 9):
105                self._states.append(AlignState(i, random.uniform(-1.0, 1.0), random.uniform(-360.0, 360.0), random.uniform(-1000.0, 1000.0), 0, 0.0))
106
107    def get_state(self, id: int) -> AlignState:
108        """ 
109        获取指定 id 的状态 
110        """
111        for state in self._states:
112            if state.id == id:
113                return state
114        return None
115    
116    def get_states(self) -> List[AlignState]:
117        """ 
118        获取所有状态 
119        """
120        return copy.deepcopy(self._states)
121    
122    def to_dict(self):
123        """ 
124        转换为字典 
125        """
126        return {
127            "states": [state.to_dict() for state in self._states]
128        }
129    
130    def __repr__(self):
131        state_str = ""
132        for state in self._states:
133            state_str += str(state)
134            if state != self._states[-1]:
135                state_str += "\n"
136        return f"HoState(\n{state_str})"

Ho 机器人状态

HoState(states: List[AlignState], random_state=False)
 93    def __init__(self, states: List[AlignState], random_state=False) -> None:
 94        """ 
 95        初始化 Ho 机器人状态
 96
 97            :param states: 帧和时间戳对齐的状态数据列表
 98            :param random_state: 是否随机生成状态数据
 99        """
100        if not random_state:
101            self._states = states
102        else:
103            self._states = []
104            for i in range(1, 9):
105                self._states.append(AlignState(i, random.uniform(-1.0, 1.0), random.uniform(-360.0, 360.0), random.uniform(-1000.0, 1000.0), 0, 0.0))

初始化 Ho 机器人状态

:param states: 帧和时间戳对齐的状态数据列表
:param random_state: 是否随机生成状态数据
def get_state(self, id: int) -> AlignState:
107    def get_state(self, id: int) -> AlignState:
108        """ 
109        获取指定 id 的状态 
110        """
111        for state in self._states:
112            if state.id == id:
113                return state
114        return None

获取指定 id 的状态

def get_states(self) -> List[AlignState]:
116    def get_states(self) -> List[AlignState]:
117        """ 
118        获取所有状态 
119        """
120        return copy.deepcopy(self._states)

获取所有状态

def to_dict(self):
122    def to_dict(self):
123        """ 
124        转换为字典 
125        """
126        return {
127            "states": [state.to_dict() for state in self._states]
128        }

转换为字典

class HoServer:
307class HoServer:
308    def __init__(self, url: str, capacity=1024, ui: bool=True, ui_frequency: float=30.0) -> None:
309        """
310        初始化 HoServer 实例。
311
312            :param url: 服务器的 URL。
313            :param capacity: 数据缓冲区的最大容量。
314            :param ui: 是否启用 UI 界面。
315            :param ui_frequency: UI 更新频率(Hz)。
316        """
317        self._urls = url
318        parsed_url = urlparse(url)
319        self._host = parsed_url.hostname
320        self._port = parsed_url.port
321        self._path = parsed_url.path
322
323        self._ui = ui
324        self._ui_frequency = ui_frequency
325        self._capacity = capacity
326        self._data_buffer = []
327        """ 
328        数据缓冲区 
329        """
330
331        self._data_queue = multiprocessing.Queue()
332        self._shutdown = multiprocessing.Event()
333
334        # 启动 FastAPI 应用进程
335        self._app_process = multiprocessing.Process(target=self._run_app, args=(self._path, self._host, self._port), daemon=True)
336
337    def _update_data(self):
338        """
339        从数据队列中读取数据并更新缓冲区。
340        """
341        while not self._shutdown.is_set():
342            if not self._data_queue.empty():
343                ho_state = self._data_queue.get()
344                self._data_buffer.append(ho_state)
345                if len(self._data_buffer) > self._capacity:
346                    self._data_buffer.pop(0)
347
348    def has_data(self) -> bool:
349        """
350        检查缓冲区中是否有数据。
351
352            :return: 如果缓冲区中有数据,则返回 True,否则返回 False。
353        """
354        return len(self._data_buffer) > 0
355
356    def get_data(self) -> HoState:
357        """
358        获取缓冲区中最新的数据。
359
360            :return: 缓冲区中最老的数据,如果缓冲区为空,则返回 None。
361        """
362        if not self.has_data():
363            return None
364        return self._data_buffer.pop(0)
365    
366    def get_data_buffer(self) -> List[HoState]:
367        """
368        获取缓冲区。
369
370        注意:若需要从数据缓冲区中读取数据,请尽快取出,否则缓冲区溢出后最开始的数据会丢失
371
372            :return: 缓冲区。
373        """
374        return copy.deepcopy(self._data_buffer)
375    
376    def flush(self) -> None:
377        """
378        清空缓冲区。
379        """
380        self._data_buffer.clear()
381    
382    def length(self) -> int:
383        """
384        获取缓冲区中的数据长度。
385
386            :return: 缓冲区中的数据长度。
387        """
388        return len(self._data_buffer)
389
390    def _init_ui(self) -> None:
391        """
392        初始化 UI。
393        """
394        self.root = tk.Tk()
395        self.root.title("HoServer")
396        self.root.geometry("800x600")
397
398    def run(self) -> None:
399        """
400        启动服务器并运行 UI 更新线程(如果启用 UI)。
401        """
402        self._app_process.start()
403
404        # 数据更新线程
405        self._data_thread = threading.Thread(target=self._update_data, daemon=True)
406        self._data_thread.start()
407
408        if self._ui:
409            self._init_ui()
410            # UI 更新线程
411            self._ui_thread = threading.Thread(target=self._update_ui, daemon=True)
412            self._ui_thread.start()
413
414            self.root.mainloop()
415
416    def _run_app(self, path: str, host: str, port: int) -> None:
417        """
418        启动 FastAPI 服务器并监听请求。
419
420            :param path: API 路径。
421            :param host: 服务器主机。
422            :param port: 服务器端口。
423        """
424        app = FastAPI()
425        app.add_api_route(path, self._handle_data, methods=["POST"])
426
427        uvicorn.run(app, host=host, port=port)
428
429    async def _handle_data(self, request: Request) -> dict:
430        """
431        处理接收到的 POST 请求数据。
432
433            :param request: FastAPI 请求对象。
434            :return: 处理结果。
435        """
436        json_data = await request.json()
437        states_data = json_data.get("states", [])
438
439        states = []
440        for state_data in states_data:
441            state = AlignState(
442                id=state_data["id"],
443                i=state_data["i"],
444                v=state_data["v"],
445                p=state_data["p"],
446                frame=state_data["frame"],
447                timestamp=state_data["timestamp"]
448            )
449            states.append(state)
450        
451        ho_state = HoState(states=states)
452        self._data_queue.put(ho_state)
453        return {"message": "Data received"}
454
455    def _init_ui(self) -> None:
456        """
457        初始化 UI 界面。
458        """
459        self.root = ttkb.Window(themename="superhero", title="HoServer")
460
461        frame = ttk.Frame(self.root)
462        frame.pack(padx=10, pady=10)
463
464        columns = ['Id', 'Frame', 'Timestamp', 'i', 'v', 'p']
465        self.entries = {}
466
467        # 创建表头
468        for col, column_name in enumerate(columns):
469            label = ttk.Label(frame, text=column_name, width=5)
470            label.grid(row=0, column=col, padx=5, pady=5)
471
472        # 创建数据输入框
473        for row in range(8):
474            id_label = ttk.Label(frame, text=f"{row + 1}", width=5)
475            id_label.grid(row=row + 1, column=0, padx=5, pady=5)
476            for col in range(5):
477                entry = ttk.Entry(frame, width=15, state='normal')
478                entry.grid(row=row + 1, column=col + 1, padx=5, pady=10)
479                self.entries[(row, col)] = entry
480
481    def _update_ui(self) -> None:
482        """
483        根据数据缓冲区更新 UI 界面。
484        """
485        def update() -> None:
486            if len(self._data_buffer) == 0:
487                return
488            ho_state = self._data_buffer[-1]
489            
490            # 清空当前数据
491            for row in range(8):
492                for col in range(5):
493                    self.entries[(row, col)].delete(0, tk.END)
494
495            # 更新数据
496            for row in range(8):
497                align_state = ho_state.get_state(row + 1)
498                self.entries[(row, 0)].insert(0, str(align_state.frame))
499                self.entries[(row, 1)].insert(0, str(align_state.timestamp))
500                self.entries[(row, 2)].insert(0, str(round(align_state.i, 2)))
501                self.entries[(row, 3)].insert(0, str(round(align_state.v, 2)))
502                self.entries[(row, 4)].insert(0, str(round(align_state.p, 2)))
503
504        time_interval = 1.0 / self._ui_frequency
505        while not self._shutdown.is_set():
506            time.sleep(time_interval)
507
508            self.root.after(0, update)
509
510
511    def __del__(self) -> None:
512        """
513        清理资源,停止线程和进程。
514        """
515        self._shutdown.set()
516        self._app_process.join()
517        self._data_thread.join()
518        if self._ui:
519            self._ui_thread.join()
HoServer(url: str, capacity=1024, ui: bool = True, ui_frequency: float = 30.0)
308    def __init__(self, url: str, capacity=1024, ui: bool=True, ui_frequency: float=30.0) -> None:
309        """
310        初始化 HoServer 实例。
311
312            :param url: 服务器的 URL。
313            :param capacity: 数据缓冲区的最大容量。
314            :param ui: 是否启用 UI 界面。
315            :param ui_frequency: UI 更新频率(Hz)。
316        """
317        self._urls = url
318        parsed_url = urlparse(url)
319        self._host = parsed_url.hostname
320        self._port = parsed_url.port
321        self._path = parsed_url.path
322
323        self._ui = ui
324        self._ui_frequency = ui_frequency
325        self._capacity = capacity
326        self._data_buffer = []
327        """ 
328        数据缓冲区 
329        """
330
331        self._data_queue = multiprocessing.Queue()
332        self._shutdown = multiprocessing.Event()
333
334        # 启动 FastAPI 应用进程
335        self._app_process = multiprocessing.Process(target=self._run_app, args=(self._path, self._host, self._port), daemon=True)

初始化 HoServer 实例。

:param url: 服务器的 URL。
:param capacity: 数据缓冲区的最大容量。
:param ui: 是否启用 UI 界面。
:param ui_frequency: UI 更新频率(Hz)。
def has_data(self) -> bool:
348    def has_data(self) -> bool:
349        """
350        检查缓冲区中是否有数据。
351
352            :return: 如果缓冲区中有数据,则返回 True,否则返回 False。
353        """
354        return len(self._data_buffer) > 0

检查缓冲区中是否有数据。

:return: 如果缓冲区中有数据,则返回 True,否则返回 False。
def get_data(self) -> HoState:
356    def get_data(self) -> HoState:
357        """
358        获取缓冲区中最新的数据。
359
360            :return: 缓冲区中最老的数据,如果缓冲区为空,则返回 None。
361        """
362        if not self.has_data():
363            return None
364        return self._data_buffer.pop(0)

获取缓冲区中最新的数据。

:return: 缓冲区中最老的数据,如果缓冲区为空,则返回 None。
def get_data_buffer(self) -> List[HoState]:
366    def get_data_buffer(self) -> List[HoState]:
367        """
368        获取缓冲区。
369
370        注意:若需要从数据缓冲区中读取数据,请尽快取出,否则缓冲区溢出后最开始的数据会丢失
371
372            :return: 缓冲区。
373        """
374        return copy.deepcopy(self._data_buffer)

获取缓冲区。

注意:若需要从数据缓冲区中读取数据,请尽快取出,否则缓冲区溢出后最开始的数据会丢失

:return: 缓冲区。
def flush(self) -> None:
376    def flush(self) -> None:
377        """
378        清空缓冲区。
379        """
380        self._data_buffer.clear()

清空缓冲区。

def length(self) -> int:
382    def length(self) -> int:
383        """
384        获取缓冲区中的数据长度。
385
386            :return: 缓冲区中的数据长度。
387        """
388        return len(self._data_buffer)

获取缓冲区中的数据长度。

:return: 缓冲区中的数据长度。
def run(self) -> None:
398    def run(self) -> None:
399        """
400        启动服务器并运行 UI 更新线程(如果启用 UI)。
401        """
402        self._app_process.start()
403
404        # 数据更新线程
405        self._data_thread = threading.Thread(target=self._update_data, daemon=True)
406        self._data_thread.start()
407
408        if self._ui:
409            self._init_ui()
410            # UI 更新线程
411            self._ui_thread = threading.Thread(target=self._update_ui, daemon=True)
412            self._ui_thread.start()
413
414            self.root.mainloop()

启动服务器并运行 UI 更新线程(如果启用 UI)。

class ManualState(enum.Enum):
522class ManualState(Enum):
523    """ 手动状态枚举 """
524    IDLE = 0
525    """ 空闲状态,所有电机停止运动 """
526    RUNNING = 1
527    """ 运行状态 """
528    RETURNING = 2
529    """ 返回状态 """
530    ZEROING = 3
531    """ 设置零点状态 """

手动状态枚举

IDLE = <ManualState.IDLE: 0>

空闲状态,所有电机停止运动

RUNNING = <ManualState.RUNNING: 1>

运行状态

RETURNING = <ManualState.RETURNING: 2>

返回状态

ZEROING = <ManualState.ZEROING: 3>

设置零点状态

Inherited Members
enum.Enum
name
value
class HoManual(robotengine.node.Node):
533class HoManual(Node):
534    """ Ho 机器人的手动控制节点 """
535    from robotengine import InputEvent
536    def __init__(self, link: HoLink, name="Manual", rotation_velocity: float = 360.0, running_scale: float=100.0, zeroing_scale: float=100.0, axis_threshold: float=0.1) -> None:
537        """
538        初始化 HoManual 实例。
539
540            :param link: HoLink 实例。
541            :param name: 节点名称。
542            :param rotation_velocity: 旋转速度(度/秒)。
543            :param running_scale: 运行状态的缩放因子。
544            :param zeroing_scale: 设置零点状态的缩放因子。
545            :param axis_threshold: 轴的阈值。
546        """
547        from robotengine import StateMachine
548        super().__init__(name)
549        self._debug = False
550        self._valid = True
551
552        self._link = link
553        self._link.ho_state_update.connect(self._on_ho_state_update)
554        
555        self.state_machine = StateMachine(ManualState.IDLE, name="HoManualStateMachine")
556        self.add_child(self.state_machine)
557        
558        self._zero_angles = {
559            4: 0.0,
560            5: 0.0,
561            6: 0.0,
562            7: 0.0,
563        }
564        self._zero_index = 4
565
566        self._is_tension = False
567        self._is_rotation = False
568        self._rotation_velocity = rotation_velocity
569        self._base_angles = [math.pi / 4, math.pi / 4 * 3, math.pi / 4 * 5, math.pi / 4 * 7]
570
571        self._running_scale = running_scale
572        self._zeroing_scale = zeroing_scale
573        self._axis_threshold = axis_threshold
574
575        self._before_returning = None
576
577        self.exit()
578
579    def _init(self):
580        _load_zero_angles = self._load_from_json("zero_angles.json")
581        if _load_zero_angles:
582            info("成功加载 zero_angles.json")
583            for i in range(4, 8):
584                self._zero_angles[i] = _load_zero_angles[str(i)]
585                info(f"{i}: {self._zero_angles[i]}")
586
587    def _input(self, event: InputEvent) -> None:
588        if not self._valid:
589            return
590        
591        state = self.state_machine.current_state
592
593        if event.is_action_pressed("X"):
594            self.tension(not self._is_tension)
595
596        elif event.is_action_pressed("Y"):
597            self.rotation(not self._is_rotation, self._rotation_velocity)
598
599        if state == ManualState.ZEROING:
600            if event.is_action_pressed("LEFT_SHOULDER"):
601                self._change_index(-1)
602
603            elif event.is_action_pressed("RIGHT_SHOULDER"):
604                self._change_index(1)
605
606            elif event.is_action_pressed("A"):
607                if self._debug:
608                    return
609                ho_state = self._link.get_ho_state()
610                if not ho_state:
611                    return
612                for i in range(4, 8):
613                    state = ho_state.get_state(i)
614                    self._zero_angles[i] = state.p
615                self._save_to_json("zero_angles.json", self._zero_angles)
616
617    def _change_index(self, dir: int) -> None:
618        self.lock(self._zero_index)
619        self._zero_index += dir
620        if self._zero_index > 7:
621            self._zero_index = 4
622        elif self._zero_index < 4:
623            self._zero_index = 7
624        info(f"     当前电机: {self._zero_index}")
625
626    def _on_ho_state_update(self, ho_state: HoState):
627        if not self._valid:
628            return
629        
630        state = self.state_machine.current_state
631
632        if state == ManualState.IDLE:
633            pass
634
635        elif state == ManualState.RUNNING:
636            x_value = self._threshold(self.input.get_action_strength("RIGHT_X"))
637            y_value = self._threshold(self.input.get_action_strength("LEFT_Y"))
638            self.turn(x_value, y_value, ho_state)
639
640            l_value = self._threshold(self.input.get_action_strength("TRIGGER_LEFT"))
641            r_value = self._threshold(self.input.get_action_strength("TRIGGER_RIGHT"))
642            self.move(l_value, r_value, ho_state)
643
644        elif state == ManualState.RETURNING:
645            for i in range(4, 8):
646                self._link.update(i, HoMode.P, 2.0, 100.0, self._zero_angles[i])
647
648        elif state == ManualState.ZEROING:
649            direction = self.input.get_action_strength("LEFT_Y")
650            direction = self._threshold(direction)
651            velocity = direction * self._zeroing_scale
652            if not self._debug:
653                self._link.update(self._zero_index, HoMode.V, 2.0, velocity, 0.0)
654
655    def tick(self, state: ManualState, delta: float) -> None:
656        if state == ManualState.IDLE:
657            pass
658
659        elif state == ManualState.RUNNING:
660            pass
661
662        elif state == ManualState.RETURNING:
663            pass
664
665        elif state == ManualState.ZEROING:
666            pass
667
668    def _threshold(self, value: float) -> float:
669        if abs(value) < self._axis_threshold:
670            return 0.0
671        return value
672
673    def get_next_state(self, state: ManualState) -> ManualState:
674        if state == ManualState.IDLE:
675            if self.input.is_action_pressed("START"):
676                self.input.flush_action("START")
677                return ManualState.RUNNING
678            
679            if self.input.is_action_pressed("B"):
680                self.input.flush_action("B")
681                return ManualState.RETURNING
682            
683            elif self.input.is_action_pressed("RIGHT_STICK"):
684                self.input.flush_action("RIGHT_STICK")
685                return ManualState.ZEROING
686
687        elif state == ManualState.RUNNING:
688            if self.input.is_action_pressed("START"):
689                self.input.flush_action("START")
690                return ManualState.IDLE
691            
692            if self.input.is_action_pressed("B"):
693                self.input.flush_action("B")
694                return ManualState.RETURNING
695
696        elif state == ManualState.RETURNING:
697            if self.input.is_action_pressed("B"):
698                self.input.flush_action("B")
699                return self._before_returning
700
701        elif state == ManualState.ZEROING:
702            if self.input.is_action_pressed("RIGHT_STICK"):
703                self.input.flush_action("RIGHT_STICK")
704                return ManualState.IDLE
705
706        return self.state_machine.KEEP_CURRENT
707    
708    def transition_state(self, from_state: ManualState, to_state: ManualState) -> None:
709        print("")
710        info(f"{from_state if from_state is not None else 'START'} -> {to_state}")
711        info(f"TENSION: {self._is_tension}, ROTATION: {self._is_rotation}")
712
713        if from_state == ManualState.IDLE:
714            pass
715
716        elif from_state == ManualState.RUNNING:
717            pass
718
719        elif from_state == ManualState.RETURNING:
720            pass
721
722        elif from_state == ManualState.ZEROING:
723            pass
724
725        info("      Y: 开关旋转")
726        info("      X: 开关张紧")
727        if to_state == ManualState.IDLE:
728            for i in range(1, 9):
729                if i == 2 or i == 3:
730                    continue
731                self.stop(i)
732            info("      START: 进入 RUNNING 状态")
733            info("      B: 进入 RETURNING 状态")
734            info("      RIGHT_STICK: 进入 ZEROING 状态")
735
736        elif to_state == ManualState.RUNNING:
737            for i in range(1, 9):
738                if i == 2 or i == 3:
739                    continue
740                self.lock(i)
741            info("      START: 返回 IDLE 状态")
742            info("      B: 进入 RETURNING 状态")
743    
744        elif to_state == ManualState.RETURNING:
745            self.lock(1)
746            self._before_returning = from_state
747            info("      B: 返回之前的状态")
748
749        elif to_state == ManualState.ZEROING:
750            for i in range(1, 9):
751                if i == 2 or i == 3:
752                    continue
753                self.lock(i)
754            info("      RIGHT_STICK: 返回 IDLE 状态")
755            info("      LEFT_SHOULDER: 切换到上一个电机")
756            info("      RIGHT_SHOULDER: 切换到下一个电机")
757            info("      A: 保存当前位置为零点")
758            info(f"      当前电机: {self._zero_index}")
759    
760    def lock(self, id: int) -> None:
761        """
762        锁定指定的电机。
763
764            :param id: 电机编号
765        """
766        if self._debug:
767            info(f"{self.name} 锁定电机 {id}")
768            return
769        self._link.update(id, HoMode.V, 2.0, 0.0, 0.0)
770
771    def lock_all(self) -> None:
772        """
773        锁定所有的电机。
774        """
775        for i in range(1, 9):
776            self.lock(i)
777
778    def stop(self, id: int) -> None:
779        """
780        停止指定的电机。
781
782            :param id: 电机编号
783        """
784        if self._debug:
785            info(f"{self.name} 停止电机 {id}")
786            return
787        self._link.update(id, HoMode.S, 0.0, 0.0, 0.0)
788
789    def stop_all(self) -> None:
790        """
791        停止所有的电机。
792        """
793        for i in range(1, 9):
794            self.stop(i)
795
796    def tension(self, on: bool, i: float=0.8) -> None:
797        """
798        驱动牵引电机,张紧导管
799
800            :param on: 是否开启牵引
801            :param i: 牵引电流(A)
802        """
803        self._is_tension = on
804        if on:
805            self._link.update(2, HoMode.V, i, -360.0, 0.0)
806            self._link.update(3, HoMode.V, i, 360.0, 0.0)
807        else:
808            self.stop(2)
809            self.stop(3)
810
811    def rotation(self, on: bool, velocity: float = 360.0) -> None:
812        """
813        驱动旋转电机,旋转换能器
814
815            :param on: 是否开启旋转
816            :param velocity: 旋转速度(度/秒)
817        """
818        self._is_rotation = on
819        if on:
820            self._link.update(8, HoMode.V, 2.0, velocity, 0.0)
821        else:
822            self.stop(8)
823
824    def turn(self, x_value: float, y_value: float, ho_state: HoState) -> None:
825        """
826        驱动转向电机,转向导管
827
828            :param x_value: 横向控制值
829            :param y_value: 纵向控制值
830            :param ho_state: Ho 机器人状态
831        """
832        if x_value == 0 and y_value == 0:
833            for i in range(4, 8):
834                self._link.update(i, HoMode.V, 2.0, 0.0, 0.0)
835        else:
836            projection = compute_vector_projection(x_value, y_value, self._base_angles)
837            control_values = [v * self._running_scale for v in projection]
838
839            for i in range(4, 8):
840                if control_values[i-4] > 0:
841                    a_id = i
842                    b_id = (i + 2) % 4 + 4
843                    a_state = ho_state.get_state(a_id)
844                    b_state = ho_state.get_state(b_id)
845                    a_near = near(a_state.p, self._zero_angles[a_id])
846                    b_near = near(b_state.p, self._zero_angles[b_id])
847
848                    if a_near and not b_near:
849                        self._link.update(b_id, HoMode.P, 2.0, control_values[i-4], self._zero_angles[b_id])
850                    elif (not a_near and b_near) or (a_near and b_near):
851                        self._link.update(a_id, HoMode.V, 2.0, control_values[i-4], 0.0)
852                    elif not a_near and not b_near:
853                        self._link.update(a_id, HoMode.V, 2.0, control_values[i-4], 0.0)
854                        self._link.update(b_id, HoMode.P, 2.0, control_values[i-4], self._zero_angles[b_id])
855
856    def move(self, l_value: float, r_value: float, ho_state: HoState) -> None:
857        """
858        驱动移动电机,移动导管
859
860            :param l_value: 左侧移动控制值
861            :param r_value: 右侧移动控制值
862            :param ho_state: Ho 机器人状态
863        """
864        if l_value != 0 and r_value != 0:
865            self._link.update(1, HoMode.V, 2.0, 0.0, 0.0)
866
867        elif l_value != 0 and r_value== 0:
868            self._link.update(1, HoMode.V, 2.0, -l_value * self._running_scale, 0.0)
869
870        elif r_value != 0 and l_value== 0:
871            self._link.update(1, HoMode.V, 2.0, r_value * self._running_scale, 0.0)
872
873        else:
874            self._link.update(1, HoMode.V, 2.0, 0.0, 0.0)
875
876    def is_running(self) -> bool:
877        """
878        判断当前节点是否处于运行状态。
879
880            :return: 如果当前节点处于运行状态,则返回 True,否则返回 False。
881        """
882        return self._valid
883
884    def enter(self) -> None:
885        """
886        进入节点。
887        """
888        self.state_machine.freeze = False
889        self.state_machine.first_tick = True
890        self.state_machine.current_state = ManualState.IDLE
891        self._is_rotation = False
892        self._is_tension = False
893        self._valid = True
894
895    def exit(self) -> None:
896        """
897        退出节点。
898        """
899        self.state_machine.freeze = True
900        self.state_machine.first_tick = True
901        self.state_machine.current_state = ManualState.IDLE
902        self._valid = False
903        self._is_rotation = False
904        self._is_tension = False
905        self.stop_all()
906
907    def _save_to_json(self, file_name, data):
908        with open(file_name, 'w') as f:
909            json.dump(data, f)
910        info(f"     {self.name} 保存 {file_name} 成功")
911
912    def _load_from_json(self, file_name):
913        if not os.path.exists(file_name):
914            warning(f"{file_name} 不存在,无法读取 zero_angles 将使用 0.0 作为初始值")
915            return None
916        with open(file_name, 'r') as f:
917            return json.load(f)

Ho 机器人的手动控制节点

HoManual( link: HoLink, name='Manual', rotation_velocity: float = 360.0, running_scale: float = 100.0, zeroing_scale: float = 100.0, axis_threshold: float = 0.1)
536    def __init__(self, link: HoLink, name="Manual", rotation_velocity: float = 360.0, running_scale: float=100.0, zeroing_scale: float=100.0, axis_threshold: float=0.1) -> None:
537        """
538        初始化 HoManual 实例。
539
540            :param link: HoLink 实例。
541            :param name: 节点名称。
542            :param rotation_velocity: 旋转速度(度/秒)。
543            :param running_scale: 运行状态的缩放因子。
544            :param zeroing_scale: 设置零点状态的缩放因子。
545            :param axis_threshold: 轴的阈值。
546        """
547        from robotengine import StateMachine
548        super().__init__(name)
549        self._debug = False
550        self._valid = True
551
552        self._link = link
553        self._link.ho_state_update.connect(self._on_ho_state_update)
554        
555        self.state_machine = StateMachine(ManualState.IDLE, name="HoManualStateMachine")
556        self.add_child(self.state_machine)
557        
558        self._zero_angles = {
559            4: 0.0,
560            5: 0.0,
561            6: 0.0,
562            7: 0.0,
563        }
564        self._zero_index = 4
565
566        self._is_tension = False
567        self._is_rotation = False
568        self._rotation_velocity = rotation_velocity
569        self._base_angles = [math.pi / 4, math.pi / 4 * 3, math.pi / 4 * 5, math.pi / 4 * 7]
570
571        self._running_scale = running_scale
572        self._zeroing_scale = zeroing_scale
573        self._axis_threshold = axis_threshold
574
575        self._before_returning = None
576
577        self.exit()

初始化 HoManual 实例。

:param link: HoLink 实例。
:param name: 节点名称。
:param rotation_velocity: 旋转速度(度/秒)。
:param running_scale: 运行状态的缩放因子。
:param zeroing_scale: 设置零点状态的缩放因子。
:param axis_threshold: 轴的阈值。
state_machine
def tick(self, state: ManualState, delta: float) -> None:
655    def tick(self, state: ManualState, delta: float) -> None:
656        if state == ManualState.IDLE:
657            pass
658
659        elif state == ManualState.RUNNING:
660            pass
661
662        elif state == ManualState.RETURNING:
663            pass
664
665        elif state == ManualState.ZEROING:
666            pass
def get_next_state( self, state: ManualState) -> ManualState:
673    def get_next_state(self, state: ManualState) -> ManualState:
674        if state == ManualState.IDLE:
675            if self.input.is_action_pressed("START"):
676                self.input.flush_action("START")
677                return ManualState.RUNNING
678            
679            if self.input.is_action_pressed("B"):
680                self.input.flush_action("B")
681                return ManualState.RETURNING
682            
683            elif self.input.is_action_pressed("RIGHT_STICK"):
684                self.input.flush_action("RIGHT_STICK")
685                return ManualState.ZEROING
686
687        elif state == ManualState.RUNNING:
688            if self.input.is_action_pressed("START"):
689                self.input.flush_action("START")
690                return ManualState.IDLE
691            
692            if self.input.is_action_pressed("B"):
693                self.input.flush_action("B")
694                return ManualState.RETURNING
695
696        elif state == ManualState.RETURNING:
697            if self.input.is_action_pressed("B"):
698                self.input.flush_action("B")
699                return self._before_returning
700
701        elif state == ManualState.ZEROING:
702            if self.input.is_action_pressed("RIGHT_STICK"):
703                self.input.flush_action("RIGHT_STICK")
704                return ManualState.IDLE
705
706        return self.state_machine.KEEP_CURRENT
def transition_state( self, from_state: ManualState, to_state: ManualState) -> None:
708    def transition_state(self, from_state: ManualState, to_state: ManualState) -> None:
709        print("")
710        info(f"{from_state if from_state is not None else 'START'} -> {to_state}")
711        info(f"TENSION: {self._is_tension}, ROTATION: {self._is_rotation}")
712
713        if from_state == ManualState.IDLE:
714            pass
715
716        elif from_state == ManualState.RUNNING:
717            pass
718
719        elif from_state == ManualState.RETURNING:
720            pass
721
722        elif from_state == ManualState.ZEROING:
723            pass
724
725        info("      Y: 开关旋转")
726        info("      X: 开关张紧")
727        if to_state == ManualState.IDLE:
728            for i in range(1, 9):
729                if i == 2 or i == 3:
730                    continue
731                self.stop(i)
732            info("      START: 进入 RUNNING 状态")
733            info("      B: 进入 RETURNING 状态")
734            info("      RIGHT_STICK: 进入 ZEROING 状态")
735
736        elif to_state == ManualState.RUNNING:
737            for i in range(1, 9):
738                if i == 2 or i == 3:
739                    continue
740                self.lock(i)
741            info("      START: 返回 IDLE 状态")
742            info("      B: 进入 RETURNING 状态")
743    
744        elif to_state == ManualState.RETURNING:
745            self.lock(1)
746            self._before_returning = from_state
747            info("      B: 返回之前的状态")
748
749        elif to_state == ManualState.ZEROING:
750            for i in range(1, 9):
751                if i == 2 or i == 3:
752                    continue
753                self.lock(i)
754            info("      RIGHT_STICK: 返回 IDLE 状态")
755            info("      LEFT_SHOULDER: 切换到上一个电机")
756            info("      RIGHT_SHOULDER: 切换到下一个电机")
757            info("      A: 保存当前位置为零点")
758            info(f"      当前电机: {self._zero_index}")
def lock(self, id: int) -> None:
760    def lock(self, id: int) -> None:
761        """
762        锁定指定的电机。
763
764            :param id: 电机编号
765        """
766        if self._debug:
767            info(f"{self.name} 锁定电机 {id}")
768            return
769        self._link.update(id, HoMode.V, 2.0, 0.0, 0.0)

锁定指定的电机。

:param id: 电机编号
def lock_all(self) -> None:
771    def lock_all(self) -> None:
772        """
773        锁定所有的电机。
774        """
775        for i in range(1, 9):
776            self.lock(i)

锁定所有的电机。

def stop(self, id: int) -> None:
778    def stop(self, id: int) -> None:
779        """
780        停止指定的电机。
781
782            :param id: 电机编号
783        """
784        if self._debug:
785            info(f"{self.name} 停止电机 {id}")
786            return
787        self._link.update(id, HoMode.S, 0.0, 0.0, 0.0)

停止指定的电机。

:param id: 电机编号
def stop_all(self) -> None:
789    def stop_all(self) -> None:
790        """
791        停止所有的电机。
792        """
793        for i in range(1, 9):
794            self.stop(i)

停止所有的电机。

def tension(self, on: bool, i: float = 0.8) -> None:
796    def tension(self, on: bool, i: float=0.8) -> None:
797        """
798        驱动牵引电机,张紧导管
799
800            :param on: 是否开启牵引
801            :param i: 牵引电流(A)
802        """
803        self._is_tension = on
804        if on:
805            self._link.update(2, HoMode.V, i, -360.0, 0.0)
806            self._link.update(3, HoMode.V, i, 360.0, 0.0)
807        else:
808            self.stop(2)
809            self.stop(3)

驱动牵引电机,张紧导管

:param on: 是否开启牵引
:param i: 牵引电流(A)
def rotation(self, on: bool, velocity: float = 360.0) -> None:
811    def rotation(self, on: bool, velocity: float = 360.0) -> None:
812        """
813        驱动旋转电机,旋转换能器
814
815            :param on: 是否开启旋转
816            :param velocity: 旋转速度(度/秒)
817        """
818        self._is_rotation = on
819        if on:
820            self._link.update(8, HoMode.V, 2.0, velocity, 0.0)
821        else:
822            self.stop(8)

驱动旋转电机,旋转换能器

:param on: 是否开启旋转
:param velocity: 旋转速度(度/秒)
def turn( self, x_value: float, y_value: float, ho_state: HoState) -> None:
824    def turn(self, x_value: float, y_value: float, ho_state: HoState) -> None:
825        """
826        驱动转向电机,转向导管
827
828            :param x_value: 横向控制值
829            :param y_value: 纵向控制值
830            :param ho_state: Ho 机器人状态
831        """
832        if x_value == 0 and y_value == 0:
833            for i in range(4, 8):
834                self._link.update(i, HoMode.V, 2.0, 0.0, 0.0)
835        else:
836            projection = compute_vector_projection(x_value, y_value, self._base_angles)
837            control_values = [v * self._running_scale for v in projection]
838
839            for i in range(4, 8):
840                if control_values[i-4] > 0:
841                    a_id = i
842                    b_id = (i + 2) % 4 + 4
843                    a_state = ho_state.get_state(a_id)
844                    b_state = ho_state.get_state(b_id)
845                    a_near = near(a_state.p, self._zero_angles[a_id])
846                    b_near = near(b_state.p, self._zero_angles[b_id])
847
848                    if a_near and not b_near:
849                        self._link.update(b_id, HoMode.P, 2.0, control_values[i-4], self._zero_angles[b_id])
850                    elif (not a_near and b_near) or (a_near and b_near):
851                        self._link.update(a_id, HoMode.V, 2.0, control_values[i-4], 0.0)
852                    elif not a_near and not b_near:
853                        self._link.update(a_id, HoMode.V, 2.0, control_values[i-4], 0.0)
854                        self._link.update(b_id, HoMode.P, 2.0, control_values[i-4], self._zero_angles[b_id])

驱动转向电机,转向导管

:param x_value: 横向控制值
:param y_value: 纵向控制值
:param ho_state: Ho 机器人状态
def move( self, l_value: float, r_value: float, ho_state: HoState) -> None:
856    def move(self, l_value: float, r_value: float, ho_state: HoState) -> None:
857        """
858        驱动移动电机,移动导管
859
860            :param l_value: 左侧移动控制值
861            :param r_value: 右侧移动控制值
862            :param ho_state: Ho 机器人状态
863        """
864        if l_value != 0 and r_value != 0:
865            self._link.update(1, HoMode.V, 2.0, 0.0, 0.0)
866
867        elif l_value != 0 and r_value== 0:
868            self._link.update(1, HoMode.V, 2.0, -l_value * self._running_scale, 0.0)
869
870        elif r_value != 0 and l_value== 0:
871            self._link.update(1, HoMode.V, 2.0, r_value * self._running_scale, 0.0)
872
873        else:
874            self._link.update(1, HoMode.V, 2.0, 0.0, 0.0)

驱动移动电机,移动导管

:param l_value: 左侧移动控制值
:param r_value: 右侧移动控制值
:param ho_state: Ho 机器人状态
def is_running(self) -> bool:
876    def is_running(self) -> bool:
877        """
878        判断当前节点是否处于运行状态。
879
880            :return: 如果当前节点处于运行状态,则返回 True,否则返回 False。
881        """
882        return self._valid

判断当前节点是否处于运行状态。

:return: 如果当前节点处于运行状态,则返回 True,否则返回 False。
def enter(self) -> None:
884    def enter(self) -> None:
885        """
886        进入节点。
887        """
888        self.state_machine.freeze = False
889        self.state_machine.first_tick = True
890        self.state_machine.current_state = ManualState.IDLE
891        self._is_rotation = False
892        self._is_tension = False
893        self._valid = True

进入节点。

def exit(self) -> None:
895    def exit(self) -> None:
896        """
897        退出节点。
898        """
899        self.state_machine.freeze = True
900        self.state_machine.first_tick = True
901        self.state_machine.current_state = ManualState.IDLE
902        self._valid = False
903        self._is_rotation = False
904        self._is_tension = False
905        self.stop_all()

退出节点。

class HoManual.InputEvent:
139class InputEvent:
140    """ 输入事件基类 """
141    def __init__(self):
142        pass
143
144    def get_action_strength(self, action: str) -> float:
145        """ 返回某个动作的强度 """
146        pass
147
148    def is_action_pressed(self, action: str) -> bool:
149        """ 检查某个动作是否被按下 """
150        pass
151
152    def is_action_released(self, action: str) -> bool:
153        """ 检查某个动作是否被释放 """
154        pass

输入事件基类