robotengine.ho_robot
ho_robot 是 robotengine 控制 ho 机器人的节点。
ho_link 与 机器人之间的通讯是自动的,在连接好设备并确定串口是正常开启后,会自动与机器人进行通讯并更新。
如果配置了 url ho_link 节点会自动发送机器人的状态 HoState 到 url 指定的地址。
ho_link 会不断被动地接收机器人的状态并更新,但是不会主动向机器人发送数据。
使用 ho_link.update() 函数可以向机器人发送数据。
挂载 ho_link 节点后,_process()的处理速度会显著受到影响,请酌情调整 engine 的运行频率。
1""" 2 3ho_robot 是 robotengine 控制 ho 机器人的节点。 4 5ho_link 与 机器人之间的通讯是自动的,在连接好设备并确定串口是正常开启后,会自动与机器人进行通讯并更新。 6 7如果配置了 url ho_link 节点会自动发送机器人的状态 HoState 到 url 指定的地址。 8 9ho_link 会不断被动地接收机器人的状态并更新,但是不会主动向机器人发送数据。 10 11使用 ho_link.update() 函数可以向机器人发送数据。 12 13挂载 ho_link 节点后,_process()的处理速度会显著受到影响,请酌情调整 engine 的运行频率。 14""" 15 16from robotengine.node import Node 17from robotengine.serial_io import SerialIO, DeviceType, CheckSumType 18from robotengine.tools import hex2str, warning, error, info, near, vector_angle, vector_length, compute_vector_projection, find_closest_vectors 19from robotengine.signal import Signal 20from robotengine.timer import Timer 21from typing import List, Tuple 22from enum import Enum 23import requests 24import threading 25import time 26import random 27import multiprocessing 28import tkinter as tk 29from ttkbootstrap import ttk 30import ttkbootstrap as ttkb 31from fastapi import FastAPI, Request 32import uvicorn 33from urllib.parse import urlparse 34import copy 35import math 36import json 37import os 38 39class HoMode(Enum): 40 """ Ho 电机模态 """ 41 S = 0 42 """ 停止 """ 43 I = 1 44 """ 电流控制 """ 45 V = 2 46 """ 速度控制 """ 47 P = 3 48 """ 位置控制 """ 49 50class AlignState: 51 """ 帧和时间戳对齐的状态数据 """ 52 def __init__(self, id: int, i: float, v: float, p: float, frame: int, timestamp: float) -> None: 53 """ 54 初始化对齐状态数据 55 56 :param id: 电机 id 57 :param i: 电流 58 :param v: 速度 59 :param p: 位置 60 :param frame: 当前帧 61 :param timestamp: 当前时间戳 62 """ 63 self.id = id 64 """ 电机 id """ 65 self.i: float = i 66 """ 电流 """ 67 self.v: float = v 68 """ 速度 """ 69 self.p: float = p 70 """ 位置 """ 71 self.frame = frame 72 """ 此状态数据对应的帧 """ 73 self.timestamp = timestamp 74 """ 此状态数据对应的时间戳 """ 75 76 def to_dict(self): 77 """ 转换为字典 """ 78 return { 79 "id": self.id, 80 "i": self.i, 81 "v": self.v, 82 "p": self.p, 83 "frame": self.frame, 84 "timestamp": self.timestamp 85 } 86 87 def __repr__(self): 88 return f"AlignState(id={self.id}, i={round(self.i, 2)}, v={round(self.v, 2)}, p={round(self.p, 2)}, frame={self.frame}, timestamp={round(self.timestamp, 2)})" 89 90class HoState: 91 """ Ho 机器人状态 """ 92 def __init__(self, states: List[AlignState], random_state=False) -> None: 93 """ 94 初始化 Ho 机器人状态 95 96 :param states: 帧和时间戳对齐的状态数据列表 97 :param random_state: 是否随机生成状态数据 98 """ 99 if not random_state: 100 self._states = states 101 else: 102 self._states = [] 103 for i in range(1, 9): 104 self._states.append(AlignState(i, random.uniform(-1.0, 1.0), random.uniform(-360.0, 360.0), random.uniform(-1000.0, 1000.0), 0, 0.0)) 105 106 def get_state(self, id: int) -> AlignState: 107 """ 108 获取指定 id 的状态 109 """ 110 for state in self._states: 111 if state.id == id: 112 return state 113 return None 114 115 def get_states(self) -> List[AlignState]: 116 """ 117 获取所有状态 118 """ 119 return copy.deepcopy(self._states) 120 121 def to_dict(self): 122 """ 123 转换为字典 124 """ 125 return { 126 "states": [state.to_dict() for state in self._states] 127 } 128 129 def __repr__(self): 130 state_str = "" 131 for state in self._states: 132 state_str += str(state) 133 if state != self._states[-1]: 134 state_str += "\n" 135 return f"HoState(\n{state_str})" 136 137class HoLink(Node): 138 """ Ho 机器人链接节点 """ 139 def __init__(self, name="HoLink", buffer_capacity: int=1024, urls: List[str]=[], warn=True) -> None: 140 """ 141 初始化 Ho 机器人链接节点 142 143 :param name: 节点名称 144 :param buffer_capacity: 存储状态数据的缓冲区的容量 145 :param urls: 服务地址列表 146 :param read_mode: 串口读取模式 147 :param warn: 是否显示警告 148 """ 149 super().__init__(name) 150 self._data_length = 84 151 self._receive_data = None 152 self._urls = urls 153 self._warn = warn 154 155 if self._urls: 156 self._shutdown = multiprocessing.Event() 157 self._pending_capacity = 256 158 self._pending_requests = multiprocessing.Queue() 159 self._http_process = multiprocessing.Process(target=self._http_request, daemon=True, name=self.name+"HttpProcess") 160 self._http_process.start() 161 162 self._buffer_capacity: int = buffer_capacity 163 """ 存储状态数据的缓冲区的容量 """ 164 self._state_buffer: List[HoState] = [] 165 """ 存储状态数据的缓冲区 """ 166 167 self.sio: SerialIO = SerialIO(name="HoSerialIO", device_type=DeviceType.STM32F407, checksum_type=CheckSumType.SUM16, header=[0x0D, 0x0A], warn=warn, baudrate=1000000, timeout=1.0) 168 """ 串口节点 HoLink 会主动挂载一个已经配置好的串口节点 """ 169 self.add_child(self.sio) 170 171 self.receive: Signal = Signal(bytes) 172 """ 信号,当接收到数据时触发(无论是否通过校验和) """ 173 self.ho_state_update: Signal = Signal(HoState) 174 """ 信号,当接收到数据并成功通过校验和,将状态数据更新到信号参数中时触发 """ 175 176 def _ready(self) -> None: 177 pass 178 179 def get_ho_state(self) -> HoState: 180 """ 181 获取机器人当前最新的状态数据 182 """ 183 if len(self._state_buffer) == 0: 184 return None 185 return self._state_buffer[-1] 186 187 def get_ho_state_buffer(self) -> List[HoState]: 188 """ 189 获取机器人当前的状态数据缓冲区 190 """ 191 return copy.deepcopy(self._state_buffer) 192 193 def _add_pending_request(self, ho_state: HoState): 194 """ 195 向请求队列中添加请求 196 """ 197 self._pending_requests.put(ho_state) 198 if self._pending_requests.qsize() > self._pending_capacity: 199 if self._warn: 200 warning(f"{self.name} 向 {self._urls} 发送请求时,请求队列已满,将丢弃最早的请求,可能会导致数据丢失") 201 self._pending_requests.get() 202 203 def _send_request(self, ho_state_dict: dict) -> None: 204 """ 205 向服务地址发送请求 206 """ 207 try: 208 for _url in self._urls: 209 response = requests.post(_url, json=ho_state_dict, timeout=0.1) 210 211 except requests.RequestException as e: 212 if self._warn: 213 warning(f"请求失败: {e}") 214 except Exception as e: 215 if self._warn: 216 warning(f"发生未知错误: {e}") 217 218 def _http_request(self): 219 info(f"{self.name} 已开启向服务地址 {self._urls} 发送数据的功能") 220 while not self._shutdown.is_set(): 221 if not self._pending_requests.empty(): 222 ho_state = self._pending_requests.get() 223 self._send_request(ho_state.to_dict()) 224 225 def update(self, id: int, mode: HoMode, i: float, v: float, p: float) -> None: 226 """ 227 向机器人发送数据 228 """ 229 data = bytes([id]) + bytes([mode.value]) + self._encode(p, 100.0, 4) + \ 230 self._encode(v, 100.0, 4) + self._encode(i, 100.0, 2) 231 # print(f"发送数据: {hex2str(data)}") 232 self.sio.transmit(data) 233 234 def _process(self, delta) -> None: 235 self._receive_data = self.sio.receive(self._data_length) 236 if self._receive_data: 237 if self.sio.check_sum(self._receive_data): 238 states = [] 239 receive_data = self._receive_data[2:-2] 240 241 id = 1 242 for i in range(0, 80, 10): 243 _data = receive_data[i:i+10] 244 _p = self._decode(_data[0:4], 100.0, 4) 245 _v = self._decode(_data[4:8], 100.0, 4) 246 _i = self._decode(_data[8:10], 100.0, 2) 247 248 align_state = AlignState(id=id, i=_i, v=_v, p=_p, frame=self.engine.get_frame(), timestamp=self.engine.get_timestamp()) 249 states.append(align_state) 250 id += 1 251 252 ho_state = HoState(states) 253 self._state_buffer.append(ho_state) 254 255 if len(self._state_buffer) > self._buffer_capacity: 256 self._state_buffer.pop(0) 257 258 self.ho_state_update.emit(ho_state) 259 if self._urls: 260 self._add_pending_request(ho_state) 261 else: 262 if self._warn: 263 warning(f"{self.name} 长度为 {len(self._receive_data)} 的数据 {hex2str(self._receive_data)} 校验和错误") 264 self.receive.emit(self._receive_data) 265 266 def _encode(self, value: float, scale_factor: float, byte_length: int) -> bytes: 267 max_value = (1 << (8 * byte_length - 1)) 268 max_scaled_value = max_value / scale_factor 269 270 if abs(value) >= max_scaled_value: 271 raise ValueError(f"要编码的值 {round(value, 2)} 超出范围 [-{max_scaled_value}, {max_scaled_value}]") 272 273 encoded_value = int(value * scale_factor) + max_value 274 275 max_value_for_length = (1 << (8 * byte_length)) - 1 276 if encoded_value > max_value_for_length: 277 raise ValueError(f"编码值 {encoded_value} 超出了 {byte_length} 字节的最大值 {max_value_for_length}") 278 279 byte_data = [] 280 for i in range(byte_length): 281 byte_data.insert(0, encoded_value & 0xFF) 282 encoded_value >>= 8 283 284 return bytes(byte_data) 285 286 def _decode(self, data: bytes, scale_factor: float, byte_length: int) -> float: 287 if len(data) != byte_length: 288 raise ValueError(f"数据长度 {len(data)} 与指定的字节长度 {byte_length} 不匹配") 289 max_value = (1 << (8 * byte_length - 1)) 290 291 decoded_value = 0 292 for i in range(byte_length): 293 decoded_value <<= 8 294 decoded_value |= data[i] 295 296 decoded_value -= max_value 297 298 return decoded_value / scale_factor 299 300 # def _on_engine_exit(self): 301 # if self._urls: 302 # self._shutdown.set() 303 # self._http_process.join() 304 305 306class HoServer: 307 def __init__(self, url: str, capacity=1024, ui: bool=True, ui_frequency: float=30.0) -> None: 308 """ 309 初始化 HoServer 实例。 310 311 :param url: 服务器的 URL。 312 :param capacity: 数据缓冲区的最大容量。 313 :param ui: 是否启用 UI 界面。 314 :param ui_frequency: UI 更新频率(Hz)。 315 """ 316 self._urls = url 317 parsed_url = urlparse(url) 318 self._host = parsed_url.hostname 319 self._port = parsed_url.port 320 self._path = parsed_url.path 321 322 self._ui = ui 323 self._ui_frequency = ui_frequency 324 self._capacity = capacity 325 self._data_buffer = [] 326 """ 327 数据缓冲区 328 """ 329 330 self._data_queue = multiprocessing.Queue() 331 self._shutdown = multiprocessing.Event() 332 333 # 启动 FastAPI 应用进程 334 self._app_process = multiprocessing.Process(target=self._run_app, args=(self._path, self._host, self._port), daemon=True) 335 336 def _update_data(self): 337 """ 338 从数据队列中读取数据并更新缓冲区。 339 """ 340 while not self._shutdown.is_set(): 341 if not self._data_queue.empty(): 342 ho_state = self._data_queue.get() 343 self._data_buffer.append(ho_state) 344 if len(self._data_buffer) > self._capacity: 345 self._data_buffer.pop(0) 346 347 def has_data(self) -> bool: 348 """ 349 检查缓冲区中是否有数据。 350 351 :return: 如果缓冲区中有数据,则返回 True,否则返回 False。 352 """ 353 return len(self._data_buffer) > 0 354 355 def get_data(self) -> HoState: 356 """ 357 获取缓冲区中最新的数据。 358 359 :return: 缓冲区中最老的数据,如果缓冲区为空,则返回 None。 360 """ 361 if not self.has_data(): 362 return None 363 return self._data_buffer.pop(0) 364 365 def get_data_buffer(self) -> List[HoState]: 366 """ 367 获取缓冲区。 368 369 注意:若需要从数据缓冲区中读取数据,请尽快取出,否则缓冲区溢出后最开始的数据会丢失 370 371 :return: 缓冲区。 372 """ 373 return copy.deepcopy(self._data_buffer) 374 375 def flush(self) -> None: 376 """ 377 清空缓冲区。 378 """ 379 self._data_buffer.clear() 380 381 def length(self) -> int: 382 """ 383 获取缓冲区中的数据长度。 384 385 :return: 缓冲区中的数据长度。 386 """ 387 return len(self._data_buffer) 388 389 def _init_ui(self) -> None: 390 """ 391 初始化 UI。 392 """ 393 self.root = tk.Tk() 394 self.root.title("HoServer") 395 self.root.geometry("800x600") 396 397 def run(self) -> None: 398 """ 399 启动服务器并运行 UI 更新线程(如果启用 UI)。 400 """ 401 self._app_process.start() 402 403 # 数据更新线程 404 self._data_thread = threading.Thread(target=self._update_data, daemon=True) 405 self._data_thread.start() 406 407 if self._ui: 408 self._init_ui() 409 # UI 更新线程 410 self._ui_thread = threading.Thread(target=self._update_ui, daemon=True) 411 self._ui_thread.start() 412 413 self.root.mainloop() 414 415 def _run_app(self, path: str, host: str, port: int) -> None: 416 """ 417 启动 FastAPI 服务器并监听请求。 418 419 :param path: API 路径。 420 :param host: 服务器主机。 421 :param port: 服务器端口。 422 """ 423 app = FastAPI() 424 app.add_api_route(path, self._handle_data, methods=["POST"]) 425 426 uvicorn.run(app, host=host, port=port) 427 428 async def _handle_data(self, request: Request) -> dict: 429 """ 430 处理接收到的 POST 请求数据。 431 432 :param request: FastAPI 请求对象。 433 :return: 处理结果。 434 """ 435 json_data = await request.json() 436 states_data = json_data.get("states", []) 437 438 states = [] 439 for state_data in states_data: 440 state = AlignState( 441 id=state_data["id"], 442 i=state_data["i"], 443 v=state_data["v"], 444 p=state_data["p"], 445 frame=state_data["frame"], 446 timestamp=state_data["timestamp"] 447 ) 448 states.append(state) 449 450 ho_state = HoState(states=states) 451 self._data_queue.put(ho_state) 452 return {"message": "Data received"} 453 454 def _init_ui(self) -> None: 455 """ 456 初始化 UI 界面。 457 """ 458 self.root = ttkb.Window(themename="superhero", title="HoServer") 459 460 frame = ttk.Frame(self.root) 461 frame.pack(padx=10, pady=10) 462 463 columns = ['Id', 'Frame', 'Timestamp', 'i', 'v', 'p'] 464 self.entries = {} 465 466 # 创建表头 467 for col, column_name in enumerate(columns): 468 label = ttk.Label(frame, text=column_name, width=5) 469 label.grid(row=0, column=col, padx=5, pady=5) 470 471 # 创建数据输入框 472 for row in range(8): 473 id_label = ttk.Label(frame, text=f"{row + 1}", width=5) 474 id_label.grid(row=row + 1, column=0, padx=5, pady=5) 475 for col in range(5): 476 entry = ttk.Entry(frame, width=15, state='normal') 477 entry.grid(row=row + 1, column=col + 1, padx=5, pady=10) 478 self.entries[(row, col)] = entry 479 480 def _update_ui(self) -> None: 481 """ 482 根据数据缓冲区更新 UI 界面。 483 """ 484 def update() -> None: 485 if len(self._data_buffer) == 0: 486 return 487 ho_state = self._data_buffer[-1] 488 489 # 清空当前数据 490 for row in range(8): 491 for col in range(5): 492 self.entries[(row, col)].delete(0, tk.END) 493 494 # 更新数据 495 for row in range(8): 496 align_state = ho_state.get_state(row + 1) 497 self.entries[(row, 0)].insert(0, str(align_state.frame)) 498 self.entries[(row, 1)].insert(0, str(align_state.timestamp)) 499 self.entries[(row, 2)].insert(0, str(round(align_state.i, 2))) 500 self.entries[(row, 3)].insert(0, str(round(align_state.v, 2))) 501 self.entries[(row, 4)].insert(0, str(round(align_state.p, 2))) 502 503 time_interval = 1.0 / self._ui_frequency 504 while not self._shutdown.is_set(): 505 time.sleep(time_interval) 506 507 self.root.after(0, update) 508 509 510 def __del__(self) -> None: 511 """ 512 清理资源,停止线程和进程。 513 """ 514 self._shutdown.set() 515 self._app_process.join() 516 self._data_thread.join() 517 if self._ui: 518 self._ui_thread.join() 519 520 521class ManualState(Enum): 522 """ 手动状态枚举 """ 523 IDLE = 0 524 """ 空闲状态,所有电机停止运动 """ 525 RUNNING = 1 526 """ 运行状态 """ 527 RETURNING = 2 528 """ 返回状态 """ 529 ZEROING = 3 530 """ 设置零点状态 """ 531 532class HoManual(Node): 533 """ Ho 机器人的手动控制节点 """ 534 from robotengine import InputEvent 535 def __init__(self, link: HoLink, name="Manual", rotation_velocity: float = 360.0, running_scale: float=100.0, zeroing_scale: float=100.0, axis_threshold: float=0.1) -> None: 536 """ 537 初始化 HoManual 实例。 538 539 :param link: HoLink 实例。 540 :param name: 节点名称。 541 :param rotation_velocity: 旋转速度(度/秒)。 542 :param running_scale: 运行状态的缩放因子。 543 :param zeroing_scale: 设置零点状态的缩放因子。 544 :param axis_threshold: 轴的阈值。 545 """ 546 from robotengine import StateMachine 547 super().__init__(name) 548 self._debug = False 549 self._valid = True 550 551 self._link = link 552 self._link.ho_state_update.connect(self._on_ho_state_update) 553 554 self.state_machine = StateMachine(ManualState.IDLE, name="HoManualStateMachine") 555 self.add_child(self.state_machine) 556 557 self._zero_angles = { 558 4: 0.0, 559 5: 0.0, 560 6: 0.0, 561 7: 0.0, 562 } 563 self._zero_index = 4 564 565 self._is_tension = False 566 self._is_rotation = False 567 self._rotation_velocity = rotation_velocity 568 self._base_angles = [math.pi / 4, math.pi / 4 * 3, math.pi / 4 * 5, math.pi / 4 * 7] 569 570 self._running_scale = running_scale 571 self._zeroing_scale = zeroing_scale 572 self._axis_threshold = axis_threshold 573 574 self._before_returning = None 575 576 self.exit() 577 578 def _init(self): 579 _load_zero_angles = self._load_from_json("zero_angles.json") 580 if _load_zero_angles: 581 info("成功加载 zero_angles.json") 582 for i in range(4, 8): 583 self._zero_angles[i] = _load_zero_angles[str(i)] 584 info(f"{i}: {self._zero_angles[i]}") 585 586 def _input(self, event: InputEvent) -> None: 587 if not self._valid: 588 return 589 590 state = self.state_machine.current_state 591 592 if event.is_action_pressed("X"): 593 self.tension(not self._is_tension) 594 595 elif event.is_action_pressed("Y"): 596 self.rotation(not self._is_rotation, self._rotation_velocity) 597 598 if state == ManualState.ZEROING: 599 if event.is_action_pressed("LEFT_SHOULDER"): 600 self._change_index(-1) 601 602 elif event.is_action_pressed("RIGHT_SHOULDER"): 603 self._change_index(1) 604 605 elif event.is_action_pressed("A"): 606 if self._debug: 607 return 608 ho_state = self._link.get_ho_state() 609 if not ho_state: 610 return 611 for i in range(4, 8): 612 state = ho_state.get_state(i) 613 self._zero_angles[i] = state.p 614 self._save_to_json("zero_angles.json", self._zero_angles) 615 616 def _change_index(self, dir: int) -> None: 617 self.lock(self._zero_index) 618 self._zero_index += dir 619 if self._zero_index > 7: 620 self._zero_index = 4 621 elif self._zero_index < 4: 622 self._zero_index = 7 623 info(f" 当前电机: {self._zero_index}") 624 625 def _on_ho_state_update(self, ho_state: HoState): 626 if not self._valid: 627 return 628 629 state = self.state_machine.current_state 630 631 if state == ManualState.IDLE: 632 pass 633 634 elif state == ManualState.RUNNING: 635 x_value = self._threshold(self.input.get_action_strength("RIGHT_X")) 636 y_value = self._threshold(self.input.get_action_strength("LEFT_Y")) 637 self.turn(x_value, y_value, ho_state) 638 639 l_value = self._threshold(self.input.get_action_strength("TRIGGER_LEFT")) 640 r_value = self._threshold(self.input.get_action_strength("TRIGGER_RIGHT")) 641 self.move(l_value, r_value, ho_state) 642 643 elif state == ManualState.RETURNING: 644 for i in range(4, 8): 645 self._link.update(i, HoMode.P, 2.0, 100.0, self._zero_angles[i]) 646 647 elif state == ManualState.ZEROING: 648 direction = self.input.get_action_strength("LEFT_Y") 649 direction = self._threshold(direction) 650 velocity = direction * self._zeroing_scale 651 if not self._debug: 652 self._link.update(self._zero_index, HoMode.V, 2.0, velocity, 0.0) 653 654 def tick(self, state: ManualState, delta: float) -> None: 655 if state == ManualState.IDLE: 656 pass 657 658 elif state == ManualState.RUNNING: 659 pass 660 661 elif state == ManualState.RETURNING: 662 pass 663 664 elif state == ManualState.ZEROING: 665 pass 666 667 def _threshold(self, value: float) -> float: 668 if abs(value) < self._axis_threshold: 669 return 0.0 670 return value 671 672 def get_next_state(self, state: ManualState) -> ManualState: 673 if state == ManualState.IDLE: 674 if self.input.is_action_pressed("START"): 675 self.input.flush_action("START") 676 return ManualState.RUNNING 677 678 if self.input.is_action_pressed("B"): 679 self.input.flush_action("B") 680 return ManualState.RETURNING 681 682 elif self.input.is_action_pressed("RIGHT_STICK"): 683 self.input.flush_action("RIGHT_STICK") 684 return ManualState.ZEROING 685 686 elif state == ManualState.RUNNING: 687 if self.input.is_action_pressed("START"): 688 self.input.flush_action("START") 689 return ManualState.IDLE 690 691 if self.input.is_action_pressed("B"): 692 self.input.flush_action("B") 693 return ManualState.RETURNING 694 695 elif state == ManualState.RETURNING: 696 if self.input.is_action_pressed("B"): 697 self.input.flush_action("B") 698 return self._before_returning 699 700 elif state == ManualState.ZEROING: 701 if self.input.is_action_pressed("RIGHT_STICK"): 702 self.input.flush_action("RIGHT_STICK") 703 return ManualState.IDLE 704 705 return self.state_machine.KEEP_CURRENT 706 707 def transition_state(self, from_state: ManualState, to_state: ManualState) -> None: 708 print("") 709 info(f"{from_state if from_state is not None else 'START'} -> {to_state}") 710 info(f"TENSION: {self._is_tension}, ROTATION: {self._is_rotation}") 711 712 if from_state == ManualState.IDLE: 713 pass 714 715 elif from_state == ManualState.RUNNING: 716 pass 717 718 elif from_state == ManualState.RETURNING: 719 pass 720 721 elif from_state == ManualState.ZEROING: 722 pass 723 724 info(" Y: 开关旋转") 725 info(" X: 开关张紧") 726 if to_state == ManualState.IDLE: 727 for i in range(1, 9): 728 if i == 2 or i == 3: 729 continue 730 self.stop(i) 731 info(" START: 进入 RUNNING 状态") 732 info(" B: 进入 RETURNING 状态") 733 info(" RIGHT_STICK: 进入 ZEROING 状态") 734 735 elif to_state == ManualState.RUNNING: 736 for i in range(1, 9): 737 if i == 2 or i == 3: 738 continue 739 self.lock(i) 740 info(" START: 返回 IDLE 状态") 741 info(" B: 进入 RETURNING 状态") 742 743 elif to_state == ManualState.RETURNING: 744 self.lock(1) 745 self._before_returning = from_state 746 info(" B: 返回之前的状态") 747 748 elif to_state == ManualState.ZEROING: 749 for i in range(1, 9): 750 if i == 2 or i == 3: 751 continue 752 self.lock(i) 753 info(" RIGHT_STICK: 返回 IDLE 状态") 754 info(" LEFT_SHOULDER: 切换到上一个电机") 755 info(" RIGHT_SHOULDER: 切换到下一个电机") 756 info(" A: 保存当前位置为零点") 757 info(f" 当前电机: {self._zero_index}") 758 759 def lock(self, id: int) -> None: 760 """ 761 锁定指定的电机。 762 763 :param id: 电机编号 764 """ 765 if self._debug: 766 info(f"{self.name} 锁定电机 {id}") 767 return 768 self._link.update(id, HoMode.V, 2.0, 0.0, 0.0) 769 770 def lock_all(self) -> None: 771 """ 772 锁定所有的电机。 773 """ 774 for i in range(1, 9): 775 self.lock(i) 776 777 def stop(self, id: int) -> None: 778 """ 779 停止指定的电机。 780 781 :param id: 电机编号 782 """ 783 if self._debug: 784 info(f"{self.name} 停止电机 {id}") 785 return 786 self._link.update(id, HoMode.S, 0.0, 0.0, 0.0) 787 788 def stop_all(self) -> None: 789 """ 790 停止所有的电机。 791 """ 792 for i in range(1, 9): 793 self.stop(i) 794 795 def tension(self, on: bool, i: float=0.8) -> None: 796 """ 797 驱动牵引电机,张紧导管 798 799 :param on: 是否开启牵引 800 :param i: 牵引电流(A) 801 """ 802 self._is_tension = on 803 if on: 804 self._link.update(2, HoMode.V, i, -360.0, 0.0) 805 self._link.update(3, HoMode.V, i, 360.0, 0.0) 806 else: 807 self.stop(2) 808 self.stop(3) 809 810 def rotation(self, on: bool, velocity: float = 360.0) -> None: 811 """ 812 驱动旋转电机,旋转换能器 813 814 :param on: 是否开启旋转 815 :param velocity: 旋转速度(度/秒) 816 """ 817 self._is_rotation = on 818 if on: 819 self._link.update(8, HoMode.V, 2.0, velocity, 0.0) 820 else: 821 self.stop(8) 822 823 def turn(self, x_value: float, y_value: float, ho_state: HoState) -> None: 824 """ 825 驱动转向电机,转向导管 826 827 :param x_value: 横向控制值 828 :param y_value: 纵向控制值 829 :param ho_state: Ho 机器人状态 830 """ 831 if x_value == 0 and y_value == 0: 832 for i in range(4, 8): 833 self._link.update(i, HoMode.V, 2.0, 0.0, 0.0) 834 else: 835 projection = compute_vector_projection(x_value, y_value, self._base_angles) 836 control_values = [v * self._running_scale for v in projection] 837 838 for i in range(4, 8): 839 if control_values[i-4] > 0: 840 a_id = i 841 b_id = (i + 2) % 4 + 4 842 a_state = ho_state.get_state(a_id) 843 b_state = ho_state.get_state(b_id) 844 a_near = near(a_state.p, self._zero_angles[a_id]) 845 b_near = near(b_state.p, self._zero_angles[b_id]) 846 847 if a_near and not b_near: 848 self._link.update(b_id, HoMode.P, 2.0, control_values[i-4], self._zero_angles[b_id]) 849 elif (not a_near and b_near) or (a_near and b_near): 850 self._link.update(a_id, HoMode.V, 2.0, control_values[i-4], 0.0) 851 elif not a_near and not b_near: 852 self._link.update(a_id, HoMode.V, 2.0, control_values[i-4], 0.0) 853 self._link.update(b_id, HoMode.P, 2.0, control_values[i-4], self._zero_angles[b_id]) 854 855 def move(self, l_value: float, r_value: float, ho_state: HoState) -> None: 856 """ 857 驱动移动电机,移动导管 858 859 :param l_value: 左侧移动控制值 860 :param r_value: 右侧移动控制值 861 :param ho_state: Ho 机器人状态 862 """ 863 if l_value != 0 and r_value != 0: 864 self._link.update(1, HoMode.V, 2.0, 0.0, 0.0) 865 866 elif l_value != 0 and r_value== 0: 867 self._link.update(1, HoMode.V, 2.0, -l_value * self._running_scale, 0.0) 868 869 elif r_value != 0 and l_value== 0: 870 self._link.update(1, HoMode.V, 2.0, r_value * self._running_scale, 0.0) 871 872 else: 873 self._link.update(1, HoMode.V, 2.0, 0.0, 0.0) 874 875 def is_running(self) -> bool: 876 """ 877 判断当前节点是否处于运行状态。 878 879 :return: 如果当前节点处于运行状态,则返回 True,否则返回 False。 880 """ 881 return self._valid 882 883 def enter(self) -> None: 884 """ 885 进入节点。 886 """ 887 self.state_machine.freeze = False 888 self.state_machine.first_tick = True 889 self.state_machine.current_state = ManualState.IDLE 890 self._is_rotation = False 891 self._is_tension = False 892 self._valid = True 893 894 def exit(self) -> None: 895 """ 896 退出节点。 897 """ 898 self.state_machine.freeze = True 899 self.state_machine.first_tick = True 900 self.state_machine.current_state = ManualState.IDLE 901 self._valid = False 902 self._is_rotation = False 903 self._is_tension = False 904 self.stop_all() 905 906 def _save_to_json(self, file_name, data): 907 with open(file_name, 'w') as f: 908 json.dump(data, f) 909 info(f" {self.name} 保存 {file_name} 成功") 910 911 def _load_from_json(self, file_name): 912 if not os.path.exists(file_name): 913 warning(f"{file_name} 不存在,无法读取 zero_angles 将使用 0.0 作为初始值") 914 return None 915 with open(file_name, 'r') as f: 916 return json.load(f)
class
HoMode(enum.Enum):
40class HoMode(Enum): 41 """ Ho 电机模态 """ 42 S = 0 43 """ 停止 """ 44 I = 1 45 """ 电流控制 """ 46 V = 2 47 """ 速度控制 """ 48 P = 3 49 """ 位置控制 """
Ho 电机模态
Inherited Members
- enum.Enum
- name
- value
class
AlignState:
51class AlignState: 52 """ 帧和时间戳对齐的状态数据 """ 53 def __init__(self, id: int, i: float, v: float, p: float, frame: int, timestamp: float) -> None: 54 """ 55 初始化对齐状态数据 56 57 :param id: 电机 id 58 :param i: 电流 59 :param v: 速度 60 :param p: 位置 61 :param frame: 当前帧 62 :param timestamp: 当前时间戳 63 """ 64 self.id = id 65 """ 电机 id """ 66 self.i: float = i 67 """ 电流 """ 68 self.v: float = v 69 """ 速度 """ 70 self.p: float = p 71 """ 位置 """ 72 self.frame = frame 73 """ 此状态数据对应的帧 """ 74 self.timestamp = timestamp 75 """ 此状态数据对应的时间戳 """ 76 77 def to_dict(self): 78 """ 转换为字典 """ 79 return { 80 "id": self.id, 81 "i": self.i, 82 "v": self.v, 83 "p": self.p, 84 "frame": self.frame, 85 "timestamp": self.timestamp 86 } 87 88 def __repr__(self): 89 return f"AlignState(id={self.id}, i={round(self.i, 2)}, v={round(self.v, 2)}, p={round(self.p, 2)}, frame={self.frame}, timestamp={round(self.timestamp, 2)})"
帧和时间戳对齐的状态数据
AlignState(id: int, i: float, v: float, p: float, frame: int, timestamp: float)
53 def __init__(self, id: int, i: float, v: float, p: float, frame: int, timestamp: float) -> None: 54 """ 55 初始化对齐状态数据 56 57 :param id: 电机 id 58 :param i: 电流 59 :param v: 速度 60 :param p: 位置 61 :param frame: 当前帧 62 :param timestamp: 当前时间戳 63 """ 64 self.id = id 65 """ 电机 id """ 66 self.i: float = i 67 """ 电流 """ 68 self.v: float = v 69 """ 速度 """ 70 self.p: float = p 71 """ 位置 """ 72 self.frame = frame 73 """ 此状态数据对应的帧 """ 74 self.timestamp = timestamp 75 """ 此状态数据对应的时间戳 """
初始化对齐状态数据
:param id: 电机 id
:param i: 电流
:param v: 速度
:param p: 位置
:param frame: 当前帧
:param timestamp: 当前时间戳
class
HoState:
91class HoState: 92 """ Ho 机器人状态 """ 93 def __init__(self, states: List[AlignState], random_state=False) -> None: 94 """ 95 初始化 Ho 机器人状态 96 97 :param states: 帧和时间戳对齐的状态数据列表 98 :param random_state: 是否随机生成状态数据 99 """ 100 if not random_state: 101 self._states = states 102 else: 103 self._states = [] 104 for i in range(1, 9): 105 self._states.append(AlignState(i, random.uniform(-1.0, 1.0), random.uniform(-360.0, 360.0), random.uniform(-1000.0, 1000.0), 0, 0.0)) 106 107 def get_state(self, id: int) -> AlignState: 108 """ 109 获取指定 id 的状态 110 """ 111 for state in self._states: 112 if state.id == id: 113 return state 114 return None 115 116 def get_states(self) -> List[AlignState]: 117 """ 118 获取所有状态 119 """ 120 return copy.deepcopy(self._states) 121 122 def to_dict(self): 123 """ 124 转换为字典 125 """ 126 return { 127 "states": [state.to_dict() for state in self._states] 128 } 129 130 def __repr__(self): 131 state_str = "" 132 for state in self._states: 133 state_str += str(state) 134 if state != self._states[-1]: 135 state_str += "\n" 136 return f"HoState(\n{state_str})"
Ho 机器人状态
HoState(states: List[AlignState], random_state=False)
93 def __init__(self, states: List[AlignState], random_state=False) -> None: 94 """ 95 初始化 Ho 机器人状态 96 97 :param states: 帧和时间戳对齐的状态数据列表 98 :param random_state: 是否随机生成状态数据 99 """ 100 if not random_state: 101 self._states = states 102 else: 103 self._states = [] 104 for i in range(1, 9): 105 self._states.append(AlignState(i, random.uniform(-1.0, 1.0), random.uniform(-360.0, 360.0), random.uniform(-1000.0, 1000.0), 0, 0.0))
初始化 Ho 机器人状态
:param states: 帧和时间戳对齐的状态数据列表
:param random_state: 是否随机生成状态数据
107 def get_state(self, id: int) -> AlignState: 108 """ 109 获取指定 id 的状态 110 """ 111 for state in self._states: 112 if state.id == id: 113 return state 114 return None
获取指定 id 的状态
138class HoLink(Node): 139 """ Ho 机器人链接节点 """ 140 def __init__(self, name="HoLink", buffer_capacity: int=1024, urls: List[str]=[], warn=True) -> None: 141 """ 142 初始化 Ho 机器人链接节点 143 144 :param name: 节点名称 145 :param buffer_capacity: 存储状态数据的缓冲区的容量 146 :param urls: 服务地址列表 147 :param read_mode: 串口读取模式 148 :param warn: 是否显示警告 149 """ 150 super().__init__(name) 151 self._data_length = 84 152 self._receive_data = None 153 self._urls = urls 154 self._warn = warn 155 156 if self._urls: 157 self._shutdown = multiprocessing.Event() 158 self._pending_capacity = 256 159 self._pending_requests = multiprocessing.Queue() 160 self._http_process = multiprocessing.Process(target=self._http_request, daemon=True, name=self.name+"HttpProcess") 161 self._http_process.start() 162 163 self._buffer_capacity: int = buffer_capacity 164 """ 存储状态数据的缓冲区的容量 """ 165 self._state_buffer: List[HoState] = [] 166 """ 存储状态数据的缓冲区 """ 167 168 self.sio: SerialIO = SerialIO(name="HoSerialIO", device_type=DeviceType.STM32F407, checksum_type=CheckSumType.SUM16, header=[0x0D, 0x0A], warn=warn, baudrate=1000000, timeout=1.0) 169 """ 串口节点 HoLink 会主动挂载一个已经配置好的串口节点 """ 170 self.add_child(self.sio) 171 172 self.receive: Signal = Signal(bytes) 173 """ 信号,当接收到数据时触发(无论是否通过校验和) """ 174 self.ho_state_update: Signal = Signal(HoState) 175 """ 信号,当接收到数据并成功通过校验和,将状态数据更新到信号参数中时触发 """ 176 177 def _ready(self) -> None: 178 pass 179 180 def get_ho_state(self) -> HoState: 181 """ 182 获取机器人当前最新的状态数据 183 """ 184 if len(self._state_buffer) == 0: 185 return None 186 return self._state_buffer[-1] 187 188 def get_ho_state_buffer(self) -> List[HoState]: 189 """ 190 获取机器人当前的状态数据缓冲区 191 """ 192 return copy.deepcopy(self._state_buffer) 193 194 def _add_pending_request(self, ho_state: HoState): 195 """ 196 向请求队列中添加请求 197 """ 198 self._pending_requests.put(ho_state) 199 if self._pending_requests.qsize() > self._pending_capacity: 200 if self._warn: 201 warning(f"{self.name} 向 {self._urls} 发送请求时,请求队列已满,将丢弃最早的请求,可能会导致数据丢失") 202 self._pending_requests.get() 203 204 def _send_request(self, ho_state_dict: dict) -> None: 205 """ 206 向服务地址发送请求 207 """ 208 try: 209 for _url in self._urls: 210 response = requests.post(_url, json=ho_state_dict, timeout=0.1) 211 212 except requests.RequestException as e: 213 if self._warn: 214 warning(f"请求失败: {e}") 215 except Exception as e: 216 if self._warn: 217 warning(f"发生未知错误: {e}") 218 219 def _http_request(self): 220 info(f"{self.name} 已开启向服务地址 {self._urls} 发送数据的功能") 221 while not self._shutdown.is_set(): 222 if not self._pending_requests.empty(): 223 ho_state = self._pending_requests.get() 224 self._send_request(ho_state.to_dict()) 225 226 def update(self, id: int, mode: HoMode, i: float, v: float, p: float) -> None: 227 """ 228 向机器人发送数据 229 """ 230 data = bytes([id]) + bytes([mode.value]) + self._encode(p, 100.0, 4) + \ 231 self._encode(v, 100.0, 4) + self._encode(i, 100.0, 2) 232 # print(f"发送数据: {hex2str(data)}") 233 self.sio.transmit(data) 234 235 def _process(self, delta) -> None: 236 self._receive_data = self.sio.receive(self._data_length) 237 if self._receive_data: 238 if self.sio.check_sum(self._receive_data): 239 states = [] 240 receive_data = self._receive_data[2:-2] 241 242 id = 1 243 for i in range(0, 80, 10): 244 _data = receive_data[i:i+10] 245 _p = self._decode(_data[0:4], 100.0, 4) 246 _v = self._decode(_data[4:8], 100.0, 4) 247 _i = self._decode(_data[8:10], 100.0, 2) 248 249 align_state = AlignState(id=id, i=_i, v=_v, p=_p, frame=self.engine.get_frame(), timestamp=self.engine.get_timestamp()) 250 states.append(align_state) 251 id += 1 252 253 ho_state = HoState(states) 254 self._state_buffer.append(ho_state) 255 256 if len(self._state_buffer) > self._buffer_capacity: 257 self._state_buffer.pop(0) 258 259 self.ho_state_update.emit(ho_state) 260 if self._urls: 261 self._add_pending_request(ho_state) 262 else: 263 if self._warn: 264 warning(f"{self.name} 长度为 {len(self._receive_data)} 的数据 {hex2str(self._receive_data)} 校验和错误") 265 self.receive.emit(self._receive_data) 266 267 def _encode(self, value: float, scale_factor: float, byte_length: int) -> bytes: 268 max_value = (1 << (8 * byte_length - 1)) 269 max_scaled_value = max_value / scale_factor 270 271 if abs(value) >= max_scaled_value: 272 raise ValueError(f"要编码的值 {round(value, 2)} 超出范围 [-{max_scaled_value}, {max_scaled_value}]") 273 274 encoded_value = int(value * scale_factor) + max_value 275 276 max_value_for_length = (1 << (8 * byte_length)) - 1 277 if encoded_value > max_value_for_length: 278 raise ValueError(f"编码值 {encoded_value} 超出了 {byte_length} 字节的最大值 {max_value_for_length}") 279 280 byte_data = [] 281 for i in range(byte_length): 282 byte_data.insert(0, encoded_value & 0xFF) 283 encoded_value >>= 8 284 285 return bytes(byte_data) 286 287 def _decode(self, data: bytes, scale_factor: float, byte_length: int) -> float: 288 if len(data) != byte_length: 289 raise ValueError(f"数据长度 {len(data)} 与指定的字节长度 {byte_length} 不匹配") 290 max_value = (1 << (8 * byte_length - 1)) 291 292 decoded_value = 0 293 for i in range(byte_length): 294 decoded_value <<= 8 295 decoded_value |= data[i] 296 297 decoded_value -= max_value 298 299 return decoded_value / scale_factor 300 301 # def _on_engine_exit(self): 302 # if self._urls: 303 # self._shutdown.set() 304 # self._http_process.join()
Ho 机器人链接节点
HoLink( name='HoLink', buffer_capacity: int = 1024, urls: List[str] = [], warn=True)
140 def __init__(self, name="HoLink", buffer_capacity: int=1024, urls: List[str]=[], warn=True) -> None: 141 """ 142 初始化 Ho 机器人链接节点 143 144 :param name: 节点名称 145 :param buffer_capacity: 存储状态数据的缓冲区的容量 146 :param urls: 服务地址列表 147 :param read_mode: 串口读取模式 148 :param warn: 是否显示警告 149 """ 150 super().__init__(name) 151 self._data_length = 84 152 self._receive_data = None 153 self._urls = urls 154 self._warn = warn 155 156 if self._urls: 157 self._shutdown = multiprocessing.Event() 158 self._pending_capacity = 256 159 self._pending_requests = multiprocessing.Queue() 160 self._http_process = multiprocessing.Process(target=self._http_request, daemon=True, name=self.name+"HttpProcess") 161 self._http_process.start() 162 163 self._buffer_capacity: int = buffer_capacity 164 """ 存储状态数据的缓冲区的容量 """ 165 self._state_buffer: List[HoState] = [] 166 """ 存储状态数据的缓冲区 """ 167 168 self.sio: SerialIO = SerialIO(name="HoSerialIO", device_type=DeviceType.STM32F407, checksum_type=CheckSumType.SUM16, header=[0x0D, 0x0A], warn=warn, baudrate=1000000, timeout=1.0) 169 """ 串口节点 HoLink 会主动挂载一个已经配置好的串口节点 """ 170 self.add_child(self.sio) 171 172 self.receive: Signal = Signal(bytes) 173 """ 信号,当接收到数据时触发(无论是否通过校验和) """ 174 self.ho_state_update: Signal = Signal(HoState) 175 """ 信号,当接收到数据并成功通过校验和,将状态数据更新到信号参数中时触发 """
初始化 Ho 机器人链接节点
:param name: 节点名称
:param buffer_capacity: 存储状态数据的缓冲区的容量
:param urls: 服务地址列表
:param read_mode: 串口读取模式
:param warn: 是否显示警告
180 def get_ho_state(self) -> HoState: 181 """ 182 获取机器人当前最新的状态数据 183 """ 184 if len(self._state_buffer) == 0: 185 return None 186 return self._state_buffer[-1]
获取机器人当前最新的状态数据
188 def get_ho_state_buffer(self) -> List[HoState]: 189 """ 190 获取机器人当前的状态数据缓冲区 191 """ 192 return copy.deepcopy(self._state_buffer)
获取机器人当前的状态数据缓冲区
226 def update(self, id: int, mode: HoMode, i: float, v: float, p: float) -> None: 227 """ 228 向机器人发送数据 229 """ 230 data = bytes([id]) + bytes([mode.value]) + self._encode(p, 100.0, 4) + \ 231 self._encode(v, 100.0, 4) + self._encode(i, 100.0, 2) 232 # print(f"发送数据: {hex2str(data)}") 233 self.sio.transmit(data)
向机器人发送数据
class
HoServer:
307class HoServer: 308 def __init__(self, url: str, capacity=1024, ui: bool=True, ui_frequency: float=30.0) -> None: 309 """ 310 初始化 HoServer 实例。 311 312 :param url: 服务器的 URL。 313 :param capacity: 数据缓冲区的最大容量。 314 :param ui: 是否启用 UI 界面。 315 :param ui_frequency: UI 更新频率(Hz)。 316 """ 317 self._urls = url 318 parsed_url = urlparse(url) 319 self._host = parsed_url.hostname 320 self._port = parsed_url.port 321 self._path = parsed_url.path 322 323 self._ui = ui 324 self._ui_frequency = ui_frequency 325 self._capacity = capacity 326 self._data_buffer = [] 327 """ 328 数据缓冲区 329 """ 330 331 self._data_queue = multiprocessing.Queue() 332 self._shutdown = multiprocessing.Event() 333 334 # 启动 FastAPI 应用进程 335 self._app_process = multiprocessing.Process(target=self._run_app, args=(self._path, self._host, self._port), daemon=True) 336 337 def _update_data(self): 338 """ 339 从数据队列中读取数据并更新缓冲区。 340 """ 341 while not self._shutdown.is_set(): 342 if not self._data_queue.empty(): 343 ho_state = self._data_queue.get() 344 self._data_buffer.append(ho_state) 345 if len(self._data_buffer) > self._capacity: 346 self._data_buffer.pop(0) 347 348 def has_data(self) -> bool: 349 """ 350 检查缓冲区中是否有数据。 351 352 :return: 如果缓冲区中有数据,则返回 True,否则返回 False。 353 """ 354 return len(self._data_buffer) > 0 355 356 def get_data(self) -> HoState: 357 """ 358 获取缓冲区中最新的数据。 359 360 :return: 缓冲区中最老的数据,如果缓冲区为空,则返回 None。 361 """ 362 if not self.has_data(): 363 return None 364 return self._data_buffer.pop(0) 365 366 def get_data_buffer(self) -> List[HoState]: 367 """ 368 获取缓冲区。 369 370 注意:若需要从数据缓冲区中读取数据,请尽快取出,否则缓冲区溢出后最开始的数据会丢失 371 372 :return: 缓冲区。 373 """ 374 return copy.deepcopy(self._data_buffer) 375 376 def flush(self) -> None: 377 """ 378 清空缓冲区。 379 """ 380 self._data_buffer.clear() 381 382 def length(self) -> int: 383 """ 384 获取缓冲区中的数据长度。 385 386 :return: 缓冲区中的数据长度。 387 """ 388 return len(self._data_buffer) 389 390 def _init_ui(self) -> None: 391 """ 392 初始化 UI。 393 """ 394 self.root = tk.Tk() 395 self.root.title("HoServer") 396 self.root.geometry("800x600") 397 398 def run(self) -> None: 399 """ 400 启动服务器并运行 UI 更新线程(如果启用 UI)。 401 """ 402 self._app_process.start() 403 404 # 数据更新线程 405 self._data_thread = threading.Thread(target=self._update_data, daemon=True) 406 self._data_thread.start() 407 408 if self._ui: 409 self._init_ui() 410 # UI 更新线程 411 self._ui_thread = threading.Thread(target=self._update_ui, daemon=True) 412 self._ui_thread.start() 413 414 self.root.mainloop() 415 416 def _run_app(self, path: str, host: str, port: int) -> None: 417 """ 418 启动 FastAPI 服务器并监听请求。 419 420 :param path: API 路径。 421 :param host: 服务器主机。 422 :param port: 服务器端口。 423 """ 424 app = FastAPI() 425 app.add_api_route(path, self._handle_data, methods=["POST"]) 426 427 uvicorn.run(app, host=host, port=port) 428 429 async def _handle_data(self, request: Request) -> dict: 430 """ 431 处理接收到的 POST 请求数据。 432 433 :param request: FastAPI 请求对象。 434 :return: 处理结果。 435 """ 436 json_data = await request.json() 437 states_data = json_data.get("states", []) 438 439 states = [] 440 for state_data in states_data: 441 state = AlignState( 442 id=state_data["id"], 443 i=state_data["i"], 444 v=state_data["v"], 445 p=state_data["p"], 446 frame=state_data["frame"], 447 timestamp=state_data["timestamp"] 448 ) 449 states.append(state) 450 451 ho_state = HoState(states=states) 452 self._data_queue.put(ho_state) 453 return {"message": "Data received"} 454 455 def _init_ui(self) -> None: 456 """ 457 初始化 UI 界面。 458 """ 459 self.root = ttkb.Window(themename="superhero", title="HoServer") 460 461 frame = ttk.Frame(self.root) 462 frame.pack(padx=10, pady=10) 463 464 columns = ['Id', 'Frame', 'Timestamp', 'i', 'v', 'p'] 465 self.entries = {} 466 467 # 创建表头 468 for col, column_name in enumerate(columns): 469 label = ttk.Label(frame, text=column_name, width=5) 470 label.grid(row=0, column=col, padx=5, pady=5) 471 472 # 创建数据输入框 473 for row in range(8): 474 id_label = ttk.Label(frame, text=f"{row + 1}", width=5) 475 id_label.grid(row=row + 1, column=0, padx=5, pady=5) 476 for col in range(5): 477 entry = ttk.Entry(frame, width=15, state='normal') 478 entry.grid(row=row + 1, column=col + 1, padx=5, pady=10) 479 self.entries[(row, col)] = entry 480 481 def _update_ui(self) -> None: 482 """ 483 根据数据缓冲区更新 UI 界面。 484 """ 485 def update() -> None: 486 if len(self._data_buffer) == 0: 487 return 488 ho_state = self._data_buffer[-1] 489 490 # 清空当前数据 491 for row in range(8): 492 for col in range(5): 493 self.entries[(row, col)].delete(0, tk.END) 494 495 # 更新数据 496 for row in range(8): 497 align_state = ho_state.get_state(row + 1) 498 self.entries[(row, 0)].insert(0, str(align_state.frame)) 499 self.entries[(row, 1)].insert(0, str(align_state.timestamp)) 500 self.entries[(row, 2)].insert(0, str(round(align_state.i, 2))) 501 self.entries[(row, 3)].insert(0, str(round(align_state.v, 2))) 502 self.entries[(row, 4)].insert(0, str(round(align_state.p, 2))) 503 504 time_interval = 1.0 / self._ui_frequency 505 while not self._shutdown.is_set(): 506 time.sleep(time_interval) 507 508 self.root.after(0, update) 509 510 511 def __del__(self) -> None: 512 """ 513 清理资源,停止线程和进程。 514 """ 515 self._shutdown.set() 516 self._app_process.join() 517 self._data_thread.join() 518 if self._ui: 519 self._ui_thread.join()
HoServer(url: str, capacity=1024, ui: bool = True, ui_frequency: float = 30.0)
308 def __init__(self, url: str, capacity=1024, ui: bool=True, ui_frequency: float=30.0) -> None: 309 """ 310 初始化 HoServer 实例。 311 312 :param url: 服务器的 URL。 313 :param capacity: 数据缓冲区的最大容量。 314 :param ui: 是否启用 UI 界面。 315 :param ui_frequency: UI 更新频率(Hz)。 316 """ 317 self._urls = url 318 parsed_url = urlparse(url) 319 self._host = parsed_url.hostname 320 self._port = parsed_url.port 321 self._path = parsed_url.path 322 323 self._ui = ui 324 self._ui_frequency = ui_frequency 325 self._capacity = capacity 326 self._data_buffer = [] 327 """ 328 数据缓冲区 329 """ 330 331 self._data_queue = multiprocessing.Queue() 332 self._shutdown = multiprocessing.Event() 333 334 # 启动 FastAPI 应用进程 335 self._app_process = multiprocessing.Process(target=self._run_app, args=(self._path, self._host, self._port), daemon=True)
初始化 HoServer 实例。
:param url: 服务器的 URL。
:param capacity: 数据缓冲区的最大容量。
:param ui: 是否启用 UI 界面。
:param ui_frequency: UI 更新频率(Hz)。
def
has_data(self) -> bool:
348 def has_data(self) -> bool: 349 """ 350 检查缓冲区中是否有数据。 351 352 :return: 如果缓冲区中有数据,则返回 True,否则返回 False。 353 """ 354 return len(self._data_buffer) > 0
检查缓冲区中是否有数据。
:return: 如果缓冲区中有数据,则返回 True,否则返回 False。
356 def get_data(self) -> HoState: 357 """ 358 获取缓冲区中最新的数据。 359 360 :return: 缓冲区中最老的数据,如果缓冲区为空,则返回 None。 361 """ 362 if not self.has_data(): 363 return None 364 return self._data_buffer.pop(0)
获取缓冲区中最新的数据。
:return: 缓冲区中最老的数据,如果缓冲区为空,则返回 None。
366 def get_data_buffer(self) -> List[HoState]: 367 """ 368 获取缓冲区。 369 370 注意:若需要从数据缓冲区中读取数据,请尽快取出,否则缓冲区溢出后最开始的数据会丢失 371 372 :return: 缓冲区。 373 """ 374 return copy.deepcopy(self._data_buffer)
获取缓冲区。
注意:若需要从数据缓冲区中读取数据,请尽快取出,否则缓冲区溢出后最开始的数据会丢失
:return: 缓冲区。
def
length(self) -> int:
382 def length(self) -> int: 383 """ 384 获取缓冲区中的数据长度。 385 386 :return: 缓冲区中的数据长度。 387 """ 388 return len(self._data_buffer)
获取缓冲区中的数据长度。
:return: 缓冲区中的数据长度。
def
run(self) -> None:
398 def run(self) -> None: 399 """ 400 启动服务器并运行 UI 更新线程(如果启用 UI)。 401 """ 402 self._app_process.start() 403 404 # 数据更新线程 405 self._data_thread = threading.Thread(target=self._update_data, daemon=True) 406 self._data_thread.start() 407 408 if self._ui: 409 self._init_ui() 410 # UI 更新线程 411 self._ui_thread = threading.Thread(target=self._update_ui, daemon=True) 412 self._ui_thread.start() 413 414 self.root.mainloop()
启动服务器并运行 UI 更新线程(如果启用 UI)。
class
ManualState(enum.Enum):
522class ManualState(Enum): 523 """ 手动状态枚举 """ 524 IDLE = 0 525 """ 空闲状态,所有电机停止运动 """ 526 RUNNING = 1 527 """ 运行状态 """ 528 RETURNING = 2 529 """ 返回状态 """ 530 ZEROING = 3 531 """ 设置零点状态 """
手动状态枚举
Inherited Members
- enum.Enum
- name
- value
533class HoManual(Node): 534 """ Ho 机器人的手动控制节点 """ 535 from robotengine import InputEvent 536 def __init__(self, link: HoLink, name="Manual", rotation_velocity: float = 360.0, running_scale: float=100.0, zeroing_scale: float=100.0, axis_threshold: float=0.1) -> None: 537 """ 538 初始化 HoManual 实例。 539 540 :param link: HoLink 实例。 541 :param name: 节点名称。 542 :param rotation_velocity: 旋转速度(度/秒)。 543 :param running_scale: 运行状态的缩放因子。 544 :param zeroing_scale: 设置零点状态的缩放因子。 545 :param axis_threshold: 轴的阈值。 546 """ 547 from robotengine import StateMachine 548 super().__init__(name) 549 self._debug = False 550 self._valid = True 551 552 self._link = link 553 self._link.ho_state_update.connect(self._on_ho_state_update) 554 555 self.state_machine = StateMachine(ManualState.IDLE, name="HoManualStateMachine") 556 self.add_child(self.state_machine) 557 558 self._zero_angles = { 559 4: 0.0, 560 5: 0.0, 561 6: 0.0, 562 7: 0.0, 563 } 564 self._zero_index = 4 565 566 self._is_tension = False 567 self._is_rotation = False 568 self._rotation_velocity = rotation_velocity 569 self._base_angles = [math.pi / 4, math.pi / 4 * 3, math.pi / 4 * 5, math.pi / 4 * 7] 570 571 self._running_scale = running_scale 572 self._zeroing_scale = zeroing_scale 573 self._axis_threshold = axis_threshold 574 575 self._before_returning = None 576 577 self.exit() 578 579 def _init(self): 580 _load_zero_angles = self._load_from_json("zero_angles.json") 581 if _load_zero_angles: 582 info("成功加载 zero_angles.json") 583 for i in range(4, 8): 584 self._zero_angles[i] = _load_zero_angles[str(i)] 585 info(f"{i}: {self._zero_angles[i]}") 586 587 def _input(self, event: InputEvent) -> None: 588 if not self._valid: 589 return 590 591 state = self.state_machine.current_state 592 593 if event.is_action_pressed("X"): 594 self.tension(not self._is_tension) 595 596 elif event.is_action_pressed("Y"): 597 self.rotation(not self._is_rotation, self._rotation_velocity) 598 599 if state == ManualState.ZEROING: 600 if event.is_action_pressed("LEFT_SHOULDER"): 601 self._change_index(-1) 602 603 elif event.is_action_pressed("RIGHT_SHOULDER"): 604 self._change_index(1) 605 606 elif event.is_action_pressed("A"): 607 if self._debug: 608 return 609 ho_state = self._link.get_ho_state() 610 if not ho_state: 611 return 612 for i in range(4, 8): 613 state = ho_state.get_state(i) 614 self._zero_angles[i] = state.p 615 self._save_to_json("zero_angles.json", self._zero_angles) 616 617 def _change_index(self, dir: int) -> None: 618 self.lock(self._zero_index) 619 self._zero_index += dir 620 if self._zero_index > 7: 621 self._zero_index = 4 622 elif self._zero_index < 4: 623 self._zero_index = 7 624 info(f" 当前电机: {self._zero_index}") 625 626 def _on_ho_state_update(self, ho_state: HoState): 627 if not self._valid: 628 return 629 630 state = self.state_machine.current_state 631 632 if state == ManualState.IDLE: 633 pass 634 635 elif state == ManualState.RUNNING: 636 x_value = self._threshold(self.input.get_action_strength("RIGHT_X")) 637 y_value = self._threshold(self.input.get_action_strength("LEFT_Y")) 638 self.turn(x_value, y_value, ho_state) 639 640 l_value = self._threshold(self.input.get_action_strength("TRIGGER_LEFT")) 641 r_value = self._threshold(self.input.get_action_strength("TRIGGER_RIGHT")) 642 self.move(l_value, r_value, ho_state) 643 644 elif state == ManualState.RETURNING: 645 for i in range(4, 8): 646 self._link.update(i, HoMode.P, 2.0, 100.0, self._zero_angles[i]) 647 648 elif state == ManualState.ZEROING: 649 direction = self.input.get_action_strength("LEFT_Y") 650 direction = self._threshold(direction) 651 velocity = direction * self._zeroing_scale 652 if not self._debug: 653 self._link.update(self._zero_index, HoMode.V, 2.0, velocity, 0.0) 654 655 def tick(self, state: ManualState, delta: float) -> None: 656 if state == ManualState.IDLE: 657 pass 658 659 elif state == ManualState.RUNNING: 660 pass 661 662 elif state == ManualState.RETURNING: 663 pass 664 665 elif state == ManualState.ZEROING: 666 pass 667 668 def _threshold(self, value: float) -> float: 669 if abs(value) < self._axis_threshold: 670 return 0.0 671 return value 672 673 def get_next_state(self, state: ManualState) -> ManualState: 674 if state == ManualState.IDLE: 675 if self.input.is_action_pressed("START"): 676 self.input.flush_action("START") 677 return ManualState.RUNNING 678 679 if self.input.is_action_pressed("B"): 680 self.input.flush_action("B") 681 return ManualState.RETURNING 682 683 elif self.input.is_action_pressed("RIGHT_STICK"): 684 self.input.flush_action("RIGHT_STICK") 685 return ManualState.ZEROING 686 687 elif state == ManualState.RUNNING: 688 if self.input.is_action_pressed("START"): 689 self.input.flush_action("START") 690 return ManualState.IDLE 691 692 if self.input.is_action_pressed("B"): 693 self.input.flush_action("B") 694 return ManualState.RETURNING 695 696 elif state == ManualState.RETURNING: 697 if self.input.is_action_pressed("B"): 698 self.input.flush_action("B") 699 return self._before_returning 700 701 elif state == ManualState.ZEROING: 702 if self.input.is_action_pressed("RIGHT_STICK"): 703 self.input.flush_action("RIGHT_STICK") 704 return ManualState.IDLE 705 706 return self.state_machine.KEEP_CURRENT 707 708 def transition_state(self, from_state: ManualState, to_state: ManualState) -> None: 709 print("") 710 info(f"{from_state if from_state is not None else 'START'} -> {to_state}") 711 info(f"TENSION: {self._is_tension}, ROTATION: {self._is_rotation}") 712 713 if from_state == ManualState.IDLE: 714 pass 715 716 elif from_state == ManualState.RUNNING: 717 pass 718 719 elif from_state == ManualState.RETURNING: 720 pass 721 722 elif from_state == ManualState.ZEROING: 723 pass 724 725 info(" Y: 开关旋转") 726 info(" X: 开关张紧") 727 if to_state == ManualState.IDLE: 728 for i in range(1, 9): 729 if i == 2 or i == 3: 730 continue 731 self.stop(i) 732 info(" START: 进入 RUNNING 状态") 733 info(" B: 进入 RETURNING 状态") 734 info(" RIGHT_STICK: 进入 ZEROING 状态") 735 736 elif to_state == ManualState.RUNNING: 737 for i in range(1, 9): 738 if i == 2 or i == 3: 739 continue 740 self.lock(i) 741 info(" START: 返回 IDLE 状态") 742 info(" B: 进入 RETURNING 状态") 743 744 elif to_state == ManualState.RETURNING: 745 self.lock(1) 746 self._before_returning = from_state 747 info(" B: 返回之前的状态") 748 749 elif to_state == ManualState.ZEROING: 750 for i in range(1, 9): 751 if i == 2 or i == 3: 752 continue 753 self.lock(i) 754 info(" RIGHT_STICK: 返回 IDLE 状态") 755 info(" LEFT_SHOULDER: 切换到上一个电机") 756 info(" RIGHT_SHOULDER: 切换到下一个电机") 757 info(" A: 保存当前位置为零点") 758 info(f" 当前电机: {self._zero_index}") 759 760 def lock(self, id: int) -> None: 761 """ 762 锁定指定的电机。 763 764 :param id: 电机编号 765 """ 766 if self._debug: 767 info(f"{self.name} 锁定电机 {id}") 768 return 769 self._link.update(id, HoMode.V, 2.0, 0.0, 0.0) 770 771 def lock_all(self) -> None: 772 """ 773 锁定所有的电机。 774 """ 775 for i in range(1, 9): 776 self.lock(i) 777 778 def stop(self, id: int) -> None: 779 """ 780 停止指定的电机。 781 782 :param id: 电机编号 783 """ 784 if self._debug: 785 info(f"{self.name} 停止电机 {id}") 786 return 787 self._link.update(id, HoMode.S, 0.0, 0.0, 0.0) 788 789 def stop_all(self) -> None: 790 """ 791 停止所有的电机。 792 """ 793 for i in range(1, 9): 794 self.stop(i) 795 796 def tension(self, on: bool, i: float=0.8) -> None: 797 """ 798 驱动牵引电机,张紧导管 799 800 :param on: 是否开启牵引 801 :param i: 牵引电流(A) 802 """ 803 self._is_tension = on 804 if on: 805 self._link.update(2, HoMode.V, i, -360.0, 0.0) 806 self._link.update(3, HoMode.V, i, 360.0, 0.0) 807 else: 808 self.stop(2) 809 self.stop(3) 810 811 def rotation(self, on: bool, velocity: float = 360.0) -> None: 812 """ 813 驱动旋转电机,旋转换能器 814 815 :param on: 是否开启旋转 816 :param velocity: 旋转速度(度/秒) 817 """ 818 self._is_rotation = on 819 if on: 820 self._link.update(8, HoMode.V, 2.0, velocity, 0.0) 821 else: 822 self.stop(8) 823 824 def turn(self, x_value: float, y_value: float, ho_state: HoState) -> None: 825 """ 826 驱动转向电机,转向导管 827 828 :param x_value: 横向控制值 829 :param y_value: 纵向控制值 830 :param ho_state: Ho 机器人状态 831 """ 832 if x_value == 0 and y_value == 0: 833 for i in range(4, 8): 834 self._link.update(i, HoMode.V, 2.0, 0.0, 0.0) 835 else: 836 projection = compute_vector_projection(x_value, y_value, self._base_angles) 837 control_values = [v * self._running_scale for v in projection] 838 839 for i in range(4, 8): 840 if control_values[i-4] > 0: 841 a_id = i 842 b_id = (i + 2) % 4 + 4 843 a_state = ho_state.get_state(a_id) 844 b_state = ho_state.get_state(b_id) 845 a_near = near(a_state.p, self._zero_angles[a_id]) 846 b_near = near(b_state.p, self._zero_angles[b_id]) 847 848 if a_near and not b_near: 849 self._link.update(b_id, HoMode.P, 2.0, control_values[i-4], self._zero_angles[b_id]) 850 elif (not a_near and b_near) or (a_near and b_near): 851 self._link.update(a_id, HoMode.V, 2.0, control_values[i-4], 0.0) 852 elif not a_near and not b_near: 853 self._link.update(a_id, HoMode.V, 2.0, control_values[i-4], 0.0) 854 self._link.update(b_id, HoMode.P, 2.0, control_values[i-4], self._zero_angles[b_id]) 855 856 def move(self, l_value: float, r_value: float, ho_state: HoState) -> None: 857 """ 858 驱动移动电机,移动导管 859 860 :param l_value: 左侧移动控制值 861 :param r_value: 右侧移动控制值 862 :param ho_state: Ho 机器人状态 863 """ 864 if l_value != 0 and r_value != 0: 865 self._link.update(1, HoMode.V, 2.0, 0.0, 0.0) 866 867 elif l_value != 0 and r_value== 0: 868 self._link.update(1, HoMode.V, 2.0, -l_value * self._running_scale, 0.0) 869 870 elif r_value != 0 and l_value== 0: 871 self._link.update(1, HoMode.V, 2.0, r_value * self._running_scale, 0.0) 872 873 else: 874 self._link.update(1, HoMode.V, 2.0, 0.0, 0.0) 875 876 def is_running(self) -> bool: 877 """ 878 判断当前节点是否处于运行状态。 879 880 :return: 如果当前节点处于运行状态,则返回 True,否则返回 False。 881 """ 882 return self._valid 883 884 def enter(self) -> None: 885 """ 886 进入节点。 887 """ 888 self.state_machine.freeze = False 889 self.state_machine.first_tick = True 890 self.state_machine.current_state = ManualState.IDLE 891 self._is_rotation = False 892 self._is_tension = False 893 self._valid = True 894 895 def exit(self) -> None: 896 """ 897 退出节点。 898 """ 899 self.state_machine.freeze = True 900 self.state_machine.first_tick = True 901 self.state_machine.current_state = ManualState.IDLE 902 self._valid = False 903 self._is_rotation = False 904 self._is_tension = False 905 self.stop_all() 906 907 def _save_to_json(self, file_name, data): 908 with open(file_name, 'w') as f: 909 json.dump(data, f) 910 info(f" {self.name} 保存 {file_name} 成功") 911 912 def _load_from_json(self, file_name): 913 if not os.path.exists(file_name): 914 warning(f"{file_name} 不存在,无法读取 zero_angles 将使用 0.0 作为初始值") 915 return None 916 with open(file_name, 'r') as f: 917 return json.load(f)
Ho 机器人的手动控制节点
HoManual( link: HoLink, name='Manual', rotation_velocity: float = 360.0, running_scale: float = 100.0, zeroing_scale: float = 100.0, axis_threshold: float = 0.1)
536 def __init__(self, link: HoLink, name="Manual", rotation_velocity: float = 360.0, running_scale: float=100.0, zeroing_scale: float=100.0, axis_threshold: float=0.1) -> None: 537 """ 538 初始化 HoManual 实例。 539 540 :param link: HoLink 实例。 541 :param name: 节点名称。 542 :param rotation_velocity: 旋转速度(度/秒)。 543 :param running_scale: 运行状态的缩放因子。 544 :param zeroing_scale: 设置零点状态的缩放因子。 545 :param axis_threshold: 轴的阈值。 546 """ 547 from robotengine import StateMachine 548 super().__init__(name) 549 self._debug = False 550 self._valid = True 551 552 self._link = link 553 self._link.ho_state_update.connect(self._on_ho_state_update) 554 555 self.state_machine = StateMachine(ManualState.IDLE, name="HoManualStateMachine") 556 self.add_child(self.state_machine) 557 558 self._zero_angles = { 559 4: 0.0, 560 5: 0.0, 561 6: 0.0, 562 7: 0.0, 563 } 564 self._zero_index = 4 565 566 self._is_tension = False 567 self._is_rotation = False 568 self._rotation_velocity = rotation_velocity 569 self._base_angles = [math.pi / 4, math.pi / 4 * 3, math.pi / 4 * 5, math.pi / 4 * 7] 570 571 self._running_scale = running_scale 572 self._zeroing_scale = zeroing_scale 573 self._axis_threshold = axis_threshold 574 575 self._before_returning = None 576 577 self.exit()
初始化 HoManual 实例。
:param link: HoLink 实例。
:param name: 节点名称。
:param rotation_velocity: 旋转速度(度/秒)。
:param running_scale: 运行状态的缩放因子。
:param zeroing_scale: 设置零点状态的缩放因子。
:param axis_threshold: 轴的阈值。
673 def get_next_state(self, state: ManualState) -> ManualState: 674 if state == ManualState.IDLE: 675 if self.input.is_action_pressed("START"): 676 self.input.flush_action("START") 677 return ManualState.RUNNING 678 679 if self.input.is_action_pressed("B"): 680 self.input.flush_action("B") 681 return ManualState.RETURNING 682 683 elif self.input.is_action_pressed("RIGHT_STICK"): 684 self.input.flush_action("RIGHT_STICK") 685 return ManualState.ZEROING 686 687 elif state == ManualState.RUNNING: 688 if self.input.is_action_pressed("START"): 689 self.input.flush_action("START") 690 return ManualState.IDLE 691 692 if self.input.is_action_pressed("B"): 693 self.input.flush_action("B") 694 return ManualState.RETURNING 695 696 elif state == ManualState.RETURNING: 697 if self.input.is_action_pressed("B"): 698 self.input.flush_action("B") 699 return self._before_returning 700 701 elif state == ManualState.ZEROING: 702 if self.input.is_action_pressed("RIGHT_STICK"): 703 self.input.flush_action("RIGHT_STICK") 704 return ManualState.IDLE 705 706 return self.state_machine.KEEP_CURRENT
708 def transition_state(self, from_state: ManualState, to_state: ManualState) -> None: 709 print("") 710 info(f"{from_state if from_state is not None else 'START'} -> {to_state}") 711 info(f"TENSION: {self._is_tension}, ROTATION: {self._is_rotation}") 712 713 if from_state == ManualState.IDLE: 714 pass 715 716 elif from_state == ManualState.RUNNING: 717 pass 718 719 elif from_state == ManualState.RETURNING: 720 pass 721 722 elif from_state == ManualState.ZEROING: 723 pass 724 725 info(" Y: 开关旋转") 726 info(" X: 开关张紧") 727 if to_state == ManualState.IDLE: 728 for i in range(1, 9): 729 if i == 2 or i == 3: 730 continue 731 self.stop(i) 732 info(" START: 进入 RUNNING 状态") 733 info(" B: 进入 RETURNING 状态") 734 info(" RIGHT_STICK: 进入 ZEROING 状态") 735 736 elif to_state == ManualState.RUNNING: 737 for i in range(1, 9): 738 if i == 2 or i == 3: 739 continue 740 self.lock(i) 741 info(" START: 返回 IDLE 状态") 742 info(" B: 进入 RETURNING 状态") 743 744 elif to_state == ManualState.RETURNING: 745 self.lock(1) 746 self._before_returning = from_state 747 info(" B: 返回之前的状态") 748 749 elif to_state == ManualState.ZEROING: 750 for i in range(1, 9): 751 if i == 2 or i == 3: 752 continue 753 self.lock(i) 754 info(" RIGHT_STICK: 返回 IDLE 状态") 755 info(" LEFT_SHOULDER: 切换到上一个电机") 756 info(" RIGHT_SHOULDER: 切换到下一个电机") 757 info(" A: 保存当前位置为零点") 758 info(f" 当前电机: {self._zero_index}")
def
lock(self, id: int) -> None:
760 def lock(self, id: int) -> None: 761 """ 762 锁定指定的电机。 763 764 :param id: 电机编号 765 """ 766 if self._debug: 767 info(f"{self.name} 锁定电机 {id}") 768 return 769 self._link.update(id, HoMode.V, 2.0, 0.0, 0.0)
锁定指定的电机。
:param id: 电机编号
def
lock_all(self) -> None:
771 def lock_all(self) -> None: 772 """ 773 锁定所有的电机。 774 """ 775 for i in range(1, 9): 776 self.lock(i)
锁定所有的电机。
def
stop(self, id: int) -> None:
778 def stop(self, id: int) -> None: 779 """ 780 停止指定的电机。 781 782 :param id: 电机编号 783 """ 784 if self._debug: 785 info(f"{self.name} 停止电机 {id}") 786 return 787 self._link.update(id, HoMode.S, 0.0, 0.0, 0.0)
停止指定的电机。
:param id: 电机编号
def
stop_all(self) -> None:
789 def stop_all(self) -> None: 790 """ 791 停止所有的电机。 792 """ 793 for i in range(1, 9): 794 self.stop(i)
停止所有的电机。
def
tension(self, on: bool, i: float = 0.8) -> None:
796 def tension(self, on: bool, i: float=0.8) -> None: 797 """ 798 驱动牵引电机,张紧导管 799 800 :param on: 是否开启牵引 801 :param i: 牵引电流(A) 802 """ 803 self._is_tension = on 804 if on: 805 self._link.update(2, HoMode.V, i, -360.0, 0.0) 806 self._link.update(3, HoMode.V, i, 360.0, 0.0) 807 else: 808 self.stop(2) 809 self.stop(3)
驱动牵引电机,张紧导管
:param on: 是否开启牵引
:param i: 牵引电流(A)
def
rotation(self, on: bool, velocity: float = 360.0) -> None:
811 def rotation(self, on: bool, velocity: float = 360.0) -> None: 812 """ 813 驱动旋转电机,旋转换能器 814 815 :param on: 是否开启旋转 816 :param velocity: 旋转速度(度/秒) 817 """ 818 self._is_rotation = on 819 if on: 820 self._link.update(8, HoMode.V, 2.0, velocity, 0.0) 821 else: 822 self.stop(8)
驱动旋转电机,旋转换能器
:param on: 是否开启旋转
:param velocity: 旋转速度(度/秒)
824 def turn(self, x_value: float, y_value: float, ho_state: HoState) -> None: 825 """ 826 驱动转向电机,转向导管 827 828 :param x_value: 横向控制值 829 :param y_value: 纵向控制值 830 :param ho_state: Ho 机器人状态 831 """ 832 if x_value == 0 and y_value == 0: 833 for i in range(4, 8): 834 self._link.update(i, HoMode.V, 2.0, 0.0, 0.0) 835 else: 836 projection = compute_vector_projection(x_value, y_value, self._base_angles) 837 control_values = [v * self._running_scale for v in projection] 838 839 for i in range(4, 8): 840 if control_values[i-4] > 0: 841 a_id = i 842 b_id = (i + 2) % 4 + 4 843 a_state = ho_state.get_state(a_id) 844 b_state = ho_state.get_state(b_id) 845 a_near = near(a_state.p, self._zero_angles[a_id]) 846 b_near = near(b_state.p, self._zero_angles[b_id]) 847 848 if a_near and not b_near: 849 self._link.update(b_id, HoMode.P, 2.0, control_values[i-4], self._zero_angles[b_id]) 850 elif (not a_near and b_near) or (a_near and b_near): 851 self._link.update(a_id, HoMode.V, 2.0, control_values[i-4], 0.0) 852 elif not a_near and not b_near: 853 self._link.update(a_id, HoMode.V, 2.0, control_values[i-4], 0.0) 854 self._link.update(b_id, HoMode.P, 2.0, control_values[i-4], self._zero_angles[b_id])
驱动转向电机,转向导管
:param x_value: 横向控制值
:param y_value: 纵向控制值
:param ho_state: Ho 机器人状态
856 def move(self, l_value: float, r_value: float, ho_state: HoState) -> None: 857 """ 858 驱动移动电机,移动导管 859 860 :param l_value: 左侧移动控制值 861 :param r_value: 右侧移动控制值 862 :param ho_state: Ho 机器人状态 863 """ 864 if l_value != 0 and r_value != 0: 865 self._link.update(1, HoMode.V, 2.0, 0.0, 0.0) 866 867 elif l_value != 0 and r_value== 0: 868 self._link.update(1, HoMode.V, 2.0, -l_value * self._running_scale, 0.0) 869 870 elif r_value != 0 and l_value== 0: 871 self._link.update(1, HoMode.V, 2.0, r_value * self._running_scale, 0.0) 872 873 else: 874 self._link.update(1, HoMode.V, 2.0, 0.0, 0.0)
驱动移动电机,移动导管
:param l_value: 左侧移动控制值
:param r_value: 右侧移动控制值
:param ho_state: Ho 机器人状态
def
is_running(self) -> bool:
876 def is_running(self) -> bool: 877 """ 878 判断当前节点是否处于运行状态。 879 880 :return: 如果当前节点处于运行状态,则返回 True,否则返回 False。 881 """ 882 return self._valid
判断当前节点是否处于运行状态。
:return: 如果当前节点处于运行状态,则返回 True,否则返回 False。
def
enter(self) -> None:
884 def enter(self) -> None: 885 """ 886 进入节点。 887 """ 888 self.state_machine.freeze = False 889 self.state_machine.first_tick = True 890 self.state_machine.current_state = ManualState.IDLE 891 self._is_rotation = False 892 self._is_tension = False 893 self._valid = True
进入节点。
def
exit(self) -> None:
895 def exit(self) -> None: 896 """ 897 退出节点。 898 """ 899 self.state_machine.freeze = True 900 self.state_machine.first_tick = True 901 self.state_machine.current_state = ManualState.IDLE 902 self._valid = False 903 self._is_rotation = False 904 self._is_tension = False 905 self.stop_all()
退出节点。
class
HoManual.InputEvent:
139class InputEvent: 140 """ 输入事件基类 """ 141 def __init__(self): 142 pass 143 144 def get_action_strength(self, action: str) -> float: 145 """ 返回某个动作的强度 """ 146 pass 147 148 def is_action_pressed(self, action: str) -> bool: 149 """ 检查某个动作是否被按下 """ 150 pass 151 152 def is_action_released(self, action: str) -> bool: 153 """ 检查某个动作是否被释放 """ 154 pass
输入事件基类