# Source code for ifgen.plugins.struct_receiver

"""
A struct-receiver interface implementation.
"""

# built-in
from typing import Any

# third-party
from vcorelib.io.arbiter import ARBITER
from vcorelib.names import to_snake
from vcorelib.paths import rel

# internal
from ifgen.enums import Language
from ifgen.generation.interface import GenerateTask
from ifgen.generation.python import python_imports
from ifgen.struct import header_for_type
from ifgen.struct.python import python_dependencies


def get_receiver_struct_names(task: GenerateTask) -> list[str]:
    """
    Get names of structs to create receiver entries for.

    A struct is selected when its ``codec`` entry is truthy, its optional
    ``allocate`` entry (treated as ``True`` when absent) is truthy, and the
    configuration's ``check_name`` accepts the struct's name.

    :param task: The generation task whose environment configuration
                 (``task.env.config``) is inspected.
    :returns: The selected struct names, in the iteration order of the
              configuration's ``structs`` mapping (empty if there is none).
    """

    config = task.env.config

    # The conditions short-circuit left to right, so check_name is only
    # consulted for structs that already passed the codec/allocate checks.
    return [
        name
        for name, struct in config.data.get("structs", {}).items()
        if struct["codec"]
        and struct.get("allocate", True)
        and config.check_name(name)
    ]
def python_struct_receiver(task: GenerateTask) -> None:
    """
    Python struct receiver generation.

    Writes a Python module that instantiates ``RECEIVER = StructReceiver(...)``
    with every struct returned by :func:`get_receiver_struct_names`, registers
    a handler for each struct whose ``control`` entry is falsy, and then
    encodes a companion ``.yaml`` file (``task.path`` with its suffix
    replaced) describing each struct's name, control flag and protocol
    factory path.

    :param task: The generation task providing the output writer, the
                 environment configuration and the output path.
    :raises AssertionError: If encoding the companion YAML file reports
                            failure.
    """

    structs = get_receiver_struct_names(task)
    struct_configs = task.env.config.data["structs"]

    # A struct is a "control" struct unless its configuration says otherwise.
    is_control = {
        struct: struct_configs[struct].get("control", True)
        for struct in structs
    }

    with task.boilerplate(json=False, parent_depth=2) as writer:
        python_imports(
            writer,
            third_party={
                "runtimepy.codec.protocol.receiver": ["StructReceiver"]
            },
            internal=python_dependencies(
                [], structs, struct_prefix="..structs"
            ),
            final_empty=1,
        )

        writer.write("RECEIVER = StructReceiver(")
        with writer.indented():
            for struct in structs:
                writer.write(struct + ",")
        writer.write(")")

        # Non-control (telemetry) structs additionally get a handler
        # registered on the receiver.
        tlm_structs = [struct for struct in structs if not is_control[struct]]
        if tlm_structs:
            writer.empty()
            writer.write("# Non-control structures.")
            for struct in tlm_structs:
                writer.write(f"RECEIVER.add_handler({struct}.id())")

    # Dotted package prefix of the Python output directory relative to the
    # project root; identical for every struct, so it is computed once.
    package_parts = list(
        rel(
            task.env.directories[Language.PYTHON].output,
            base=task.env.root_path,
        ).parts
    )

    runtimepy_structs: list[Any] = []
    for struct in structs:
        snake = to_snake(struct)
        runtimepy_structs.append(
            {
                "name": snake,
                "config": {
                    "control": is_control[struct],
                    "protocol_factory": ".".join(
                        package_parts + ["structs", snake, struct]
                    ),
                },
            }
        )

    # Perform the write unconditionally: wrapping the call in an 'assert'
    # statement would skip it entirely when Python runs with -O.
    yaml_path = task.path.with_suffix(".yaml")
    encoded = ARBITER.encode(yaml_path, {"structs": runtimepy_structs})[0]
    if not encoded:
        raise AssertionError(f"Couldn't encode '{yaml_path}'!")
# pylint: disable=too-many-statements
def cpp_struct_receiver(task: GenerateTask) -> None:
    """
    C++ struct receiver generation.

    Writes a C++ header declaring a ``StructReceiver`` struct that holds one
    instance and one optional handler per struct returned by
    :func:`get_receiver_struct_names`, drop counters, and a
    ``handle_message`` method that reads a struct identifier from the front
    of a buffer and dispatches on it.

    :param task: The generation task providing the output writer and the
                 environment configuration.
    """

    structs = get_receiver_struct_names(task)
    struct_configs = task.env.config.data["structs"]

    # Fully-qualified C++ type name for each struct. A new list is built so
    # the configuration's own "namespace" list is never mutated.
    names_qualified = {
        struct: "::".join(
            [*struct_configs[struct].get("namespace", []), struct]
        )
        for struct in structs
    }

    with task.boilerplate(
        json=False,
        includes=["<functional>"]
        + [header_for_type(x, task) for x in structs],
        parent_depth=2,
    ) as writer:
        # Handler type aliases.
        writer.write("template <ifgen_struct T>")
        writer.write("using struct_handler = std::function<void(const T &)>;")
        writer.empty()

        writer.write("using non_struct_handler =")
        with writer.indented():
            writer.write(
                "std::function<std::size_t(const std::byte *, std::size_t)>;"
            )
        writer.empty()

        writer.write("struct StructReceiver")
        with writer.scope(suffix=";"):
            # One instance plus one (initially unset) handler per struct.
            for struct in structs:
                snake = to_snake(struct)
                writer.write(f"{names_qualified[struct]} {snake};")
                writer.write(
                    f"struct_handler<decltype({snake})> "
                    f"{snake}_handler = nullptr;"
                )
                writer.empty()

            writer.write("non_struct_handler non_struct = nullptr;")
            writer.empty()

            writer.write("std::size_t dropped_messages = 0;")
            writer.write("std::size_t dropped_bytes = 0;")
            writer.empty()

            # Helper that accounts for dropped data and zeroes the length.
            writer.write("inline void drop_message(std::size_t &len)")
            with writer.scope():
                writer.write("if (len)")
                with writer.scope():
                    writer.write("dropped_messages++;")
                    writer.write("dropped_bytes += len;")
                    writer.write("len = 0;")
            writer.empty()

            writer.write("template <std::endian endianness = default_endian>")
            writer.write(
                "void handle_message(const std::byte *data, std::size_t len)"
            )
            with writer.scope():
                # Not even enough bytes for an identifier.
                writer.write("if (len < sizeof(struct_id_t))")
                with writer.scope():
                    writer.write("drop_message(len);")
                    writer.write("return;")
                writer.empty()

                writer.c_comment("Read identifier and advance buffer.")
                writer.write("struct_id_t ident = handle_endian<endianness>(")
                with writer.indented():
                    writer.write(
                        "*reinterpret_cast<const struct_id_t *>(data));"
                    )
                writer.write("data += sizeof(struct_id_t);")
                writer.write("len -= sizeof(struct_id_t);")
                writer.empty()

                writer.write("switch (ident)")
                writer.write("{")

                # Identifier 0 is routed to the optional non-struct handler.
                # NOTE(review): the emitted code subtracts the handler's
                # return value from 'len' without a bound check visible
                # here - confirm handlers never return more than 'len'.
                writer.write("case 0:")
                with writer.indented():
                    writer.write("if (non_struct)")
                    with writer.scope():
                        writer.write("auto result = non_struct(data, len);")
                        writer.write("if (result)")
                        with writer.scope():
                            writer.write("data += result;")
                            writer.write("len -= result;")
                        writer.write("else")
                        with writer.scope():
                            writer.write("drop_message(len);")
                    writer.write("else")
                    with writer.scope():
                        writer.write("drop_message(len);")
                    writer.write("break;")

                # One case per struct: decode when enough bytes remain,
                # invoke the handler if one is set, then advance the buffer.
                for struct in structs:
                    snake = to_snake(struct)
                    writer.write(f"case decltype({snake})::id:")
                    with writer.indented():
                        writer.write(f"if (len >= decltype({snake})::size)")
                        with writer.scope():
                            writer.write(f"{snake}.decode<endianness>(")
                            with writer.indented():
                                writer.write(
                                    f"reinterpret_cast<const decltype"
                                    f"({snake}) *>(data)->raw_ro());"
                                )
                            with writer.padding():
                                writer.write(f"if ({snake}_handler)")
                                with writer.scope():
                                    writer.write(f"{snake}_handler({snake});")
                            writer.write(f"data += decltype({snake})::size;")
                            writer.write(f"len -= decltype({snake})::size;")
                        writer.write("else")
                        with writer.scope():
                            writer.write("drop_message(len);")
                        writer.write("break;")

                writer.write("default:")
                with writer.indented():
                    writer.c_comment("Couldn't match any identifier.")
                    writer.write("drop_message(len);")

                writer.write("}")
                writer.empty()

                # The emitted method calls itself again while bytes remain.
                writer.c_comment("Continue if more bytes remain.")
                writer.write("if (len)")
                with writer.scope():
                    writer.write("handle_message<endianness>(data, len);")