Module serato_data.parser

Generic parser for Serato data files.

Classes

class Parser (filename: str)
Expand source code
class Parser:
    """\
    Parse a Serato file.

    A file is read as a flat sequence of chunks.  Each chunk starts with a
    header (see `CHUNK_HDR`) holding a 4-byte chunk type and a big-endian
    unsigned 32-bit payload length, followed by the payload itself.

    Subclasses must define their fields to parse and how to retrieve the
    filename.
    """

    DATACLASS: t.Type = dict
    """Dataclass for entries in the file accepting keyword arguments"""

    FIELDS: t.Dict[
        bytes,
        t.Union[
            t.Literal[False],
            t.Tuple[str, t.Callable[[bytes], t.Any], t.Optional[t.Callable[[t.Any], t.Any]]],
        ],
    ] = {}
    """\
    Mapping of prefix to field parser.  If the field parser is False, the field
    is ignored.  Fields that are not present in the mapping are logged as
    errors and skipped.

    The field parser is a tuple of (key, parser, formatter):

      * key: Key to use for this field as passed into the dataclass
      * parser: Callback taking the raw bytes read from the file for this field and producing an appropriate value
      * formatter: Callback taking the parsed value and performing any additional processing on it
    """

    SKIP_CTYPES: t.Collection[bytes] = []
    """List of chunk types to skip when parsing"""

    MAIN_CTYPE: bytes = b''
    """Chunk type containing the main data for this parser subclass"""

    HAS_VERSION: bool = True
    """If true, a version chunk is expected and parsed"""

    CHUNK_HDR: struct.Struct = struct.Struct('>4sI')
    """Header format for chunk"""

    def __init__(self, filename: str):
        """\
        Create a new parser for the given filename.  The file is not read
        here; see `load()`.
        """

        self.filename: str = filename
        self._version: t.Optional[str] = None
        self._version_str: t.Optional[str] = None
        # None means "not loaded yet"; load() replaces it with a list.
        self.data = None

    def _read_chunks(self, data: bytes) -> t.Iterator[t.Tuple[bytes, bytes]]:
        """\
        Read chunks from a block of data containing chunks.

        Yields `(chunk type, payload)` pairs in file order.  Empty (or None)
        input yields nothing.  Raises ParserReadError if the data ends in the
        middle of a chunk header or payload.
        """

        if not data:
            return

        hdr_size = self.CHUNK_HDR.size
        total = len(data)
        # Walk the buffer by offset instead of re-slicing the remainder on
        # every chunk, which would copy the tail of the buffer each time.
        pos = 0
        while pos < total:
            remaining = total - pos
            if remaining < hdr_size:
                raise ParserReadError("Failed to read chunk header: need {}b, have {}b".format(hdr_size, remaining))
            ctype, clen = self.CHUNK_HDR.unpack_from(data, pos)
            pos += hdr_size
            remaining = total - pos
            if remaining < clen:
                raise ParserReadError("Failed to read chunk data: need {}b, have {}b".format(clen, remaining))
            yield ctype, data[pos:pos + clen]
            pos += clen

    def _read_chunks_nested_one(self, expected_ctype: bytes, raw: bytes) -> t.Optional[bytes]:
        """\
        Read one chunk from a block of data containing exactly one chunk of an
        expected type.

        Returns the payload of that chunk, or None when `raw` contains no
        chunk at all.  Raises ParserReadError if a second chunk follows or the
        chunk has a different type than `expected_ctype`.
        """

        out = None
        for count, (ctype, payload) in enumerate(self._read_chunks(raw)):
            if count > 0:
                raise ParserReadError("Expected 1 chunk of type {}, found more than 1 chunk".format(expected_ctype))
            if ctype != expected_ctype:
                raise ParserReadError("Expected 1 chunk of type {}, found type {}".format(expected_ctype, ctype))
            out = payload
        return out

    def _read_file(self) -> t.Iterator[t.Tuple[bytes, bytes]]:
        """\
        Read chunks from the file this parser was constructed with.

        The whole file is read into memory when this method is called; the
        returned iterator then walks the in-memory bytes.
        """

        with open(self.filename, 'rb') as fp:
            contents = fp.read()
        return self._read_chunks(contents)

    def _parse_fields(self, raw: bytes) -> t.Dict[str, t.Any]:
        """\
        Parse fields from a chunk of data.

        Every key declared in `FIELDS` is present in the result; fields that
        are missing from the data or fail to parse are set to None.  Unknown
        fields and parse failures are logged and skipped, not raised.
        """

        out = {}
        for ctype, craw in self._read_chunks(raw):
            fdata = self.FIELDS.get(ctype)
            if fdata is False:
                # Explicitly ignored field.
                continue
            if fdata is None:
                logger.error("Failed to parse field %s: %s", repr(ctype), repr(craw))
                continue
            fname, parser, recast = fdata
            try:
                out[fname] = parser(craw)
                if recast:
                    out[fname] = recast(out[fname])
            except Exception as e:
                # Best effort: a single bad field must not abort the entry.
                logger.error("Failed to parse field %s: %s :: %s", fname, repr(craw), str(e))
                out.pop(fname, None)
        # Guarantee a value for every declared field.
        for fdata in self.FIELDS.values():
            if fdata:
                out.setdefault(fdata[0], None)
        return out

    def make_dataclass_args(self, *args, **kwargs) -> t.Tuple[t.Collection, t.Dict]:
        """\
        Generate the args and kwargs for the dataclass.  By default, args will
        be empty and kwargs will be the fields parsed from a chunk.  Subclasses
        may override this in order to pass different args.
        """

        return args, kwargs

    def loaded(self):
        """\
        Actions to perform after load.  This can be overridden to implement
        functionality like sorting the parsed data.
        """

        pass

    def load(self):
        """\
        Parse the file passed in the constructor.  Generally should not be
        overridden, as it may be called many times.  In general, for tasks that
        should be done once when the data is loaded, override `loaded()`
        instead.

        Files are not loaded on construction, but when a property or method is
        accessed that requires data from the file.  While load may be called
        many times, it will not reload the file once it has been loaded.

        If loading fails, the exception propagates and the parser is reset to
        its unloaded state, so that a later call retries instead of silently
        exposing empty or partial data.
        """

        if self.data is not None:
            return

        # Mark as "loading" up front so that re-entrant calls (e.g. from
        # loaded() or make_dataclass_args() touching a property) return early.
        self.data = []
        try:
            logger.debug("Parsing file %s", self.filename)
            for ctype, raw in self._read_file():
                if ctype in self.SKIP_CTYPES:
                    logger.debug("In %s, found chunk type %s, ignoring", self.filename, repr(ctype))
                    continue
                elif self.HAS_VERSION and ctype == b'vrsn':
                    logger.debug("In %s, found version chunk, parsing", self.filename)
                    ver, verstr = self.parse_version(raw)
                    logger.debug("Reading %s file %s: %s v%s", self.__class__.__name__, self.filename, verstr, ver)
                    self._version = ver
                    self._version_str = verstr
                elif ctype == self.MAIN_CTYPE:
                    logger.debug("In %s, found %s chunk, parsing one adat chunk", self.filename, repr(ctype))
                    data = self._read_chunks_nested_one(b'adat', raw)
                    parsed_fields = self._parse_fields(data)
                    a, ka = self.make_dataclass_args(**parsed_fields)
                    self.data.append(self.DATACLASS(*a, **ka))
                else:
                    logger.error("In %s, failed to parse chunk %s (main is %s): %s", self.filename, repr(ctype), repr(self.MAIN_CTYPE), repr(raw))
            self.loaded()
        except BaseException:
            # Roll back to "not loaded" so the failure is not mistaken for an
            # empty file on the next access.
            self.data = None
            self._version = None
            self._version_str = None
            raise

    @property
    def version(self) -> t.Optional[str]:
        """Return the version parsed from the file, if any"""

        self.load()
        return self._version

    @property
    def version_str(self) -> t.Optional[str]:
        """Return the version string parsed from the file, if any"""

        self.load()
        return self._version_str

    def __iter__(self):
        """Iterate over data parsed from the file."""

        self.load()
        return iter(self.data)

    # Parser functions

    @classmethod
    def parse_version(cls, raw: bytes) -> t.Tuple[str, str]:
        """\
        Parse a version from the file.  This is automatically used if
        `HAS_VERSION` is True, and a version chunk is encountered.

        The decoded text is split at the first '/' into `(version,
        version string)`.  Any further '/' stays in the version string; if
        there is no '/', the version string is empty.
        """

        ver, _, verstr = cls.parse_field_str(raw).partition('/')
        return ver, verstr

    @classmethod
    def parse_field_str(cls, raw: bytes) -> str:
        """\
        Parse a string field from the file.  Null bytes are removed and decoding
        to unicode is attempted with utf8, then ascii, then latin1.  A
        UnicodeDecodeError from the last codec is propagated.
        """

        out = raw.replace(b'\x00', b'')
        for codec in ('utf8', 'ascii'):
            try:
                return out.decode(codec)
            except UnicodeDecodeError:
                continue
        # Last resort; an error here is not caught.
        return out.decode('latin1')

    @classmethod
    def parse_field_int(cls, raw: bytes) -> int:
        """Parse a big-endian unsigned 32-bit integer field from the file."""

        return struct.unpack('>I', raw)[0]

    @classmethod
    def parse_field_byte(cls, raw: bytes) -> int:
        """\
        Parse a byte field from the file.  Bytes are treated as short integers.
        """

        return struct.unpack('>B', raw)[0]

    @classmethod
    def parse_field_bool(cls, raw: bytes) -> bool:
        """\
        Parse a boolean field from the file.  Boolean fields are internally
        stored as bytes.
        """

        return bool(cls.parse_field_byte(raw))

    @classmethod
    def parse_field_arrow(cls, raw: bytes):
        """\
        Parse a date/time field from the file.  These fields are stored as
        integer unix timestamps; these timestamps are parsed into Arrow objects
        for easier usage.
        """

        ts = cls.parse_field_int(raw)
        return arrow.get(ts)

Parse a Serato file.

Subclasses must define their fields to parse and how to retrieve the filename.

Create a new parser for the given filename

Subclasses

Class variables

var CHUNK_HDR : _struct.Struct

Header format for chunk

var DATACLASS : Type

Dataclass for entries in the file accepting keyword arguments

var FIELDS : Dict[bytes, Literal[False] | Tuple[str, Callable[[bytes], Any], Callable[[Any], Any] | None]]

Mapping of prefix to field parser. If the field parser is False, the field is ignored. Fields that are not present in the mapping are logged as errors and skipped.

The field parser is a tuple of (key, parser, formatter):

  • key: Key to use for this field as passed into the dataclass
  • parser: Callback taking the raw bytes read from the file for this field and producing an appropriate value
  • formatter: Callback taking the parsed value and performing any additional processing on it
var HAS_VERSION : bool

If true, a version chunk is expected and parsed

var MAIN_CTYPE : bytes

Chunk type containing the main data for this parser subclass

var SKIP_CTYPES : Collection[bytes]

List of chunk types to skip when parsing

Static methods

def parse_field_arrow(raw: bytes)

Parse a date/time field from the file. These fields are stored as integer Unix timestamps, which are parsed into Arrow objects for easier usage.

def parse_field_bool(raw: bytes) ‑> bool

Parse a boolean field from the file. Boolean fields are internally stored as bytes.

def parse_field_byte(raw: bytes) ‑> int

Parse a byte field from the file. Bytes are treated as short integers.

def parse_field_int(raw: bytes) ‑> int

Parse an integer field from the file.

def parse_field_str(raw: bytes) ‑> str

Parse a string field from the file. Null bytes are removed and decoding to unicode is attempted; if the field cannot be decoded, raises UnicodeDecodeError.

def parse_version(raw: bytes) ‑> Tuple[str, str]

Parse a version from the file. This is automatically used if HAS_VERSION is True, and a version chunk is encountered.

Instance variables

prop version : str | None
Expand source code
@property
def version(self) -> t.Optional[str]:
    """\
    Version parsed from the file, or None if no version was parsed.

    Reading this property calls `load()` first.
    """

    self.load()
    parsed_version = self._version
    return parsed_version

Return the version parsed from the file, if any

prop version_str : str | None
Expand source code
@property
def version_str(self) -> t.Optional[str]:
    """\
    Version string parsed from the file, or None if none was parsed.

    Reading this property calls `load()` first.
    """

    self.load()
    parsed_version_str = self._version_str
    return parsed_version_str

Return the version string parsed from the file, if any

Methods

def load(self)
Expand source code
def load(self):
    """\
    Parse the file passed in the constructor.  Generally should not be
    overridden, as it may be called many times.  In general, for tasks that
    should be done once when the data is loaded, override `loaded()`
    instead.

    Files are not loaded on construction, but when a property or method is
    accessed that requires data from the file.  While load may be called
    many times, it will not reload the file once it has been loaded.

    If loading fails, the exception propagates and the parser is reset to
    its unloaded state, so that a later call retries instead of silently
    exposing empty or partial data.
    """

    if self.data is not None:
        return

    # Mark as "loading" up front so that re-entrant calls (e.g. from
    # loaded() or make_dataclass_args() touching a property) return early.
    self.data = []
    try:
        logger.debug("Parsing file %s", self.filename)
        for ctype, raw in self._read_file():
            if ctype in self.SKIP_CTYPES:
                logger.debug("In %s, found chunk type %s, ignoring", self.filename, repr(ctype))
                continue
            elif self.HAS_VERSION and ctype == b'vrsn':
                logger.debug("In %s, found version chunk, parsing", self.filename)
                ver, verstr = self.parse_version(raw)
                logger.debug("Reading %s file %s: %s v%s", self.__class__.__name__, self.filename, verstr, ver)
                self._version = ver
                self._version_str = verstr
            elif ctype == self.MAIN_CTYPE:
                logger.debug("In %s, found %s chunk, parsing one adat chunk", self.filename, repr(ctype))
                data = self._read_chunks_nested_one(b'adat', raw)
                parsed_fields = self._parse_fields(data)
                a, ka = self.make_dataclass_args(**parsed_fields)
                self.data.append(self.DATACLASS(*a, **ka))
            else:
                logger.error("In %s, failed to parse chunk %s (main is %s): %s", self.filename, repr(ctype), repr(self.MAIN_CTYPE), repr(raw))
        self.loaded()
    except BaseException:
        # Roll back to "not loaded" so the failure is not mistaken for an
        # empty file on the next access.
        self.data = None
        self._version = None
        self._version_str = None
        raise

Parse the file passed in the constructor. Generally should not be overridden, as it may be called many times. In general, for tasks that should be done once when the data is loaded, override loaded() instead.

Files are not loaded on construction, but when a property or method is accessed that requires data from the file. While load may be called many times, it will not reload the file once it has been loaded.

def loaded(self)
Expand source code
def loaded(self):
    """\
    Hook for work to do after a load.  The base implementation does
    nothing; override it to post-process the parsed data, for example to
    sort it.
    """

    return None

Actions to perform after load. This can be overridden to implement functionality like sorting the parsed data.

def make_dataclass_args(self, *args, **kwargs) ‑> Tuple[Collection, Dict]
Expand source code
def make_dataclass_args(self, *args, **kwargs) -> t.Tuple[t.Collection, t.Dict]:
    """\
    Build the positional and keyword arguments handed to the dataclass.

    The base implementation passes its input straight through: whatever
    positional arguments were given come back as the first element and the
    keyword arguments as the second.  Override this in a subclass to
    construct the dataclass with different arguments.
    """

    dataclass_call = (args, kwargs)
    return dataclass_call

Generate the args and kwargs for the dataclass. By default, args will be empty and kwargs will be the fields parsed from a chunk. Subclasses may override this in order to pass different args.