import os
import os.path as osp
import subprocess
from collections.abc import Iterator
from contextlib import AbstractContextManager
import cameralib
import msgpack_numpy
import numpy as np
from scipy.spatial.transform import Rotation as R
[docs]
class BodyCompressor(AbstractContextManager):
"""Compresses body data to a file using xz compression.
The data is quantized, difference encoded, serialized using ``msgpack_numpy`` then compressed
using ``xz`` and written to the file.
Args:
path: path to the output file
metadata: metadata to be stored in the beginning of the file
quantization_mm: quantization level for the vertices and joints in millimeters.
Coordinates are rounded to the nearest multiple of this value.
"""
def __init__(self, path: str, metadata=None, quantization_mm: float = 0.5, n_threads: int = 0):
os.makedirs(osp.dirname(path), exist_ok=True)
self.f = open(path, 'wb')
self.proc = subprocess.Popen(
['xz', f'--threads={n_threads}', '-5', '--to-stdout'],
stdin=subprocess.PIPE, stdout=self.f)
self.proc.stdin.write(msgpack_numpy.packb(metadata))
self.quantization_mm = quantization_mm
[docs]
def append(self, **kwargs):
"""Append data for frame to the file.
Args:
**kwargs: data to be stored. Supported keys are
- vertices: (N, 3) float32 array of vertices in millimeters
- joints: (N, 3) float32 array of joints in millimeters
- vertex_uncertainties: (N,) float32 array of vertex uncertainties in meters
- joint_uncertainties: (N,) float32 array of joint uncertainties in meters
- camera: cameralib.Camera object or dict with camera parameters
Other keys are also allowed, but they will not be quantized.
"""
compressed = compress(kwargs, quantization_mm=self.quantization_mm)
packed = msgpack_numpy.packb(compressed)
self.proc.stdin.write(packed)
[docs]
def close(self):
"""Wait for the currently pending compression to finish then close the file."""
self.proc.stdin.close()
self.proc.wait()
self.f.close()
[docs]
def __exit__(self, exc_type, exc_value, traceback):
"""Close the file and remove it if an exception occurred."""
if exc_type is not None:
self.f.close()
os.remove(self.f.name)
self.proc.kill()
else:
self.close()
[docs]
class BodyDecompressor(AbstractContextManager, Iterator):
"""Decompresses body data from a file compressed using BodyCompressor.
The data is decompressed using xz, deserialized using msgpack_numpy, difference decoded and
unquantized.
Args:
path: path to the compressed file
"""
def __init__(self, path):
self.f = open(path, 'rb')
self.proc = subprocess.Popen(
['xz', '--threads=0', '--decompress', '--to-stdout'],
stdin=self.f, stdout=subprocess.PIPE)
self.unpacker = msgpack_numpy.Unpacker(self.proc.stdout)
self.metadata = next(self.unpacker)
[docs]
def __next__(self) -> dict:
"""Read the next frame and decode it from the file."""
return decompress(dict(next(self.unpacker)))
[docs]
def close(self):
"""Terminate the decompression process and close the file."""
self.proc.terminate()
self.f.close()
[docs]
def __exit__(self, *args):
"""Close the decompressor."""
self.close()
def quantize_diff(x, factor=2, axis=-2):
return np.diff(np.round(x * factor), prepend=0, axis=axis).astype(np.int32), factor
def unquantize_diff(x, axis=-2):
x, factor = x
return np.cumsum(x, axis=axis).astype(np.float32) / factor
def compress(d, quantization_mm=0.5):
factor = 1 / quantization_mm
for name in ['vertices', 'joints']:
d[f'qd_{name}'] = quantize_diff(d.pop(name), factor=factor, axis=-2)
for name in ['vertex_uncertainties', 'joint_uncertainties']:
if name in d:
d[f'qd_{name}'] = quantize_diff(d.pop(name), factor=factor * 1000, axis=-1)
if 'camera' in d:
if isinstance(d['camera'], cameralib.Camera):
d['camera'] = cam_to_dict(d['camera'])
return d
def decompress(d, decode_camera=False):
for name in ['vertices', 'joints']:
if f'qd_{name}' in d:
d[name] = unquantize_diff(d.pop(f'qd_{name}'), axis=-2)
for name in ['vertex_uncertainties', 'joint_uncertainties']:
if f'qd_{name}' in d:
d[name] = unquantize_diff(d.pop(f'qd_{name}'), axis=-1)
if 'camera' in d and decode_camera:
d['camera'] = dict_to_cam(d['camera'])
return d
def cam_to_dict(cam):
d = dict(
rotvec_w2c=mat2rotvec(cam.R),
loc=cam.t,
intr=cam.intrinsic_matrix[:2],
up=cam.world_up)
if cam.distortion_coeffs is not None and np.count_nonzero(cam.distortion_coeffs) > 0:
d['distcoef'] = cam.distortion_coeffs
return d
def dict_to_cam(d):
return cameralib.Camera(
rot_world_to_cam=rotvec2mat(d['rotvec_w2c']),
optical_center=np.array(d['loc']),
intrinsic_matrix=np.concatenate([d['intr'], np.array([[0, 0, 1]])]),
distortion_coeffs=d.get('distcoef', None),
world_up=d.get('up', (0, 0, 1))
)
def rotvec2mat(rotvec):
return R.from_rotvec(rotvec).as_matrix()
def mat2rotvec(rotmat):
return R.from_matrix(rotmat).as_rotvec()