Source code for mtc.base.vector
import io
import textwrap
from typing import Self
from .parser import Parser
from .utils import int_to_bytes, bytes_to_int, bytes_needed, printable_bytes_truncate
[docs]
class Vector(Parser, metaclass=VectorMeta):
data_type: type[Parser]
max_length: int = 1
min_length: int = 1
# this is computed in metaclass
marker_size: int
def __init__(self, /, *value: Parser) -> None:
self.value = value
self._bytes_cache: bytes | None = None
[docs]
def to_bytes(self) -> bytes:
if self._bytes_cache is None:
# using BytesIO because repeated byte concatenation is slow
bio = io.BytesIO()
for item in self.value:
bio.write(item.to_bytes())
b = bio.getvalue()
self._bytes_cache = int_to_bytes(len(b), self.marker_size) + b
return self._bytes_cache
[docs]
@classmethod
def parse(cls, data: io.BufferedIOBase) -> Self:
size = bytes_to_int(data.read(cls.marker_size))
if not cls.min_length <= size <= cls.max_length:
raise cls.ParsingError(data.tell() - cls.marker_size, data.tell(),
f"Invalid vector size {size} outside {cls.min_length}-{cls.max_length}")
l = []
offset_start = data.tell()
while data.tell() - offset_start < size:
result = cls.data_type.parse(data)
l.append(result)
if data.tell() - offset_start > size:
raise cls.ParsingError(offset_start + size, data.tell(), "extra data read while processing vector")
return cls(*l)
[docs]
@classmethod
def skip(cls, stream: io.BufferedIOBase) -> None:
size = bytes_to_int(stream.read(cls.marker_size))
stream.seek(size, io.SEEK_CUR)
[docs]
def print(self) -> str:
header = "-" * 20 + f"Vector {self.__class__.__name__} ({len(self)})" + "-" * 20 + "\n"
footer = "-" * 18 + f"End vector {self.__class__.__name__}" + "-" * 18
inner = ""
for v in self.value:
inner += v.print() + "\n"
return header + textwrap.indent(inner, "\t") + footer
[docs]
def validate(self) -> None:
for i, v in enumerate(self.value):
if not isinstance(v, self.data_type):
raise ValueError(
f"Input item {i} to {self.__class__.__name__} is not of type {self.data_type.__name__}. Found {type(v).__name__}")
# a special type of vector that can probably be implemented as a Vector of chars
[docs]
class OpaqueVector(Parser, metaclass=VectorMeta):
min_length: int = 0
max_length: int = 0
# this is computed in metaclass
marker_size: int
def __init__(self, /, value: bytes) -> None:
self.value = value
[docs]
def to_bytes(self) -> bytes:
# vector size marker then value
return int_to_bytes(len(self.value), self.marker_size) + self.value
[docs]
@classmethod
def parse(cls, stream: io.BufferedIOBase) -> Self:
size = bytes_to_int(stream.read(cls.marker_size))
if not cls.min_length <= size <= cls.max_length:
raise cls.ParsingError(stream.tell() - cls.marker_size, stream.tell(),
f"Invalid vector size {size} outside {cls.min_length}-{cls.max_length}")
return cls(stream.read(size))
[docs]
def validate(self) -> None:
if not self.min_length <= len(self.value) <= self.max_length:
raise self.ValidationError(
f"Invalid data size {len(self.value)}. Must be between {self.min_length} and {self.max_length}")
[docs]
def print(self) -> str:
b = self.value
return f"{len(b) + self.marker_size} {self.__class__.__name__} {printable_bytes_truncate(b, 80)}"
[docs]
@classmethod
def skip(cls, stream: io.BufferedIOBase) -> None:
size = bytes_to_int(stream.read(cls.marker_size))
stream.seek(size, io.SEEK_CUR)
[docs]
class Array(Parser):
length: int
def __init__(self, /, value: bytes) -> None:
self.value = value
[docs]
def to_bytes(self) -> bytes:
return self.value
[docs]
@classmethod
def parse(cls, stream: io.BufferedIOBase) -> Self:
return cls(stream.read(cls.length))
[docs]
@classmethod
def skip(cls, stream: io.BufferedIOBase) -> None:
stream.seek(cls.length, io.SEEK_CUR)
[docs]
def validate(self) -> None:
if len(self.value) != self.length:
raise self.ValidationError(
f"Invalid data size {len(self.value)}. Must be of length {self.length}")
[docs]
def print(self) -> str:
b = self.value
return f"{self.length} Array {self.__class__.__name__} {printable_bytes_truncate(b, 80)}"
__all__ = ["OpaqueVector", "Vector", "Array"]