Package deepsport_utilities
Sub-modules
deepsport_utilities.calibdeepsport_utilities.courtdeepsport_utilities.datasetdeepsport_utilities.dsdeepsport_utilities.transformsdeepsport_utilities.utils
Functions
def deserialize_keys(keys, type)-
Expand source code
def deserialize_keys(keys, type): return list(map(lambda k: type(*k), keys)) def export_dataset(dataset: mlworkflow.datasets.Dataset,
prefix: str,
keys=None,
verbose: bool = True,
download: bool = True)-
Expand source code
def export_dataset(dataset: Dataset, prefix: str, keys=None, verbose: bool=True, download: bool=True): """ Export a dataset to disk by saving - serialized dataset items in their original database format in a json file, - and a list of necessary files in a txt file. Arguments: - dataset: the dataset to export - prefix: the prefix of the output files """ files = [] items = [] keys = keys or dataset.keys for key in keys: item = dataset.query_item(key, download=download) files = files + list(item.files) items.append(item.db_item) filename = prefix + '-index.json' with open(filename, 'w') as fd: json.dump(items, fd) if verbose: print(f"Dataset index successfully created in '{filename}'") filename = prefix + '-files.txt' with open(filename, 'w') as fd: files = map(lambda x:x+'\n', files) fd.writelines(files) if verbose: print(f"Dataset file list successfully created in '{filename}'") print(f"You can zip them by running\n$ zip -r <filename>.zip `cat {filename}`")Export a dataset to disk by saving - serialized dataset items in their original database format in a json file, - and a list of necessary files in a txt file.
Arguments
- dataset: the dataset to export
- prefix: the prefix of the output files
def find(path, dirs=None, verbose=True, fail=True)-
Expand source code
def find(path, dirs=None, verbose=True, fail=True): if os.path.isabs(path): if not os.path.isfile(path) and not os.path.isdir(path): if not fail: not verbose or print(f"{path} not found") return None raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), path) return path dirs = dirs or [os.getcwd(), *os.getenv("DATA_PATH", "").split(":")] for dirname in dirs: if dirname is None: continue tmp_path = os.path.join(dirname, path) if os.path.isfile(tmp_path) or os.path.isdir(tmp_path): not verbose or print("{} found in {}".format(path, tmp_path)) return tmp_path if not fail: not verbose or print(f"{path} not found (searched in {dirs})") return None raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), "{} (searched in {})".format(path, dirs)) def import_dataset(dataset_type: mlworkflow.datasets.Dataset, filename: str, **dataset_config)-
Expand source code
def import_dataset(dataset_type: Dataset, filename: str, **dataset_config): """ Import a dataset exported by `export_dataset` by providing the - dataset Type, - the serialized dataset items in their original database format in a json file, - and the config dictionary required to build each dataset item (dataset dependant) """ return ImportedDataset(filename=filename, dataset_type=dataset_type, **dataset_config)Import a dataset exported by
export_dataset()by providing the - dataset Type, - the serialized dataset items in their original database format in a json file, - and the config dictionary required to build each dataset item (dataset dependant) def serialize_keys(keys)-
Expand source code
def serialize_keys(keys): return list(map(tuple, keys)) # keys should already be tuples by design, but here we remove any reference to possible NamedTuple
Classes
class Court (rule_type='FIBA')-
Expand source code
class Court(): """ Defines the world 3D coordinates of a Basketball court. Please refer to calib.py if you want to obtain their 2D image coordinates """ def __init__(self, rule_type="FIBA"): self.rule_type = rule_type court_type = court_types_from_rule_type.get(rule_type, rule_type) self.court_definition = court_definitions[court_type] self.w, self.h = self.court_definition.width, self.court_definition.height @property def size(self): return self.w, self.h @property def corners(self): return Point3D([0,self.w, self.w,0],[0,0,self.h,self.h],[0,0,0,0]) @property def edges(self): for c1, c2 in zip(self.corners, self.corners.close()[:,1:]): yield c1, c2 def visible_edges(self, calib: Calib): for edge in self.edges: try: yield calib.visible_edge(edge) except ValueError: continue def projects_in(self, calib: Calib, point2D): point3D = calib.project_2D_to_3D(point2D, Z=0) return point3D.x >= 0 and point3D.x <= self.w and point3D.y >= 0 and point3D.y <= self.h def draw_rim(self, image, calib: Calib, color=(128,200,90), thickness=2): pd = ProjectiveDrawer(calib, color, thickness=thickness) offset = self.court_definition.rim_center_offset h = self.court_definition.rim_height r = self.court_definition.rim_radius pd.draw_arc(image, Point3D(offset, self.h/2, -h), r) pd.draw_arc(image, Point3D(self.w-offset, self.h/2, -h), r) def draw_poles(self, image, calib: Calib, color=(10,10,10)): pass def draw_net(self, image, calib: Calib, color=(128,90,200)): offset = self.court_definition.rim_center_offset r = self.court_definition.rim_radius angles = np.linspace(0, np.pi*2, 20) pd = ProjectiveDrawer(calib, color) for x in [offset, self.w-offset]: # two rims center = Point3D(x, self.h/2, -self.court_definition.rim_height) xts = np.cos(angles)*r + center.x yts = np.sin(angles)*r + center.y xbs = np.cos(angles)*r/2 + center.x ybs = np.sin(angles)*r/2 + center.y zs = np.ones_like(angles)*center.z points = Point3D([Point3D(x,y,z) for x,y,z in zip(np.concatenate((xts,xbs)),np.concatenate((yts,ybs)),np.concatenate((zs,zs+40)))]) pd.polylines(image, points) def _get_three_points_anchors(self): def sign(value): return -1 if value < 0 else 1 # +----------+----------------------------+-----------------------------+----------------------------+------------+ # | | | ^ | | | # | | | RIM | | | | # | | | offset | | | | # | | | v | | | # | | | -----( X ) | | | # | 3-POINTS | | ------- ^ | | | # | limit | ------|- | | | | | # |<-------->| ------- | \ | | | | # | | ------- Z | '. | | | | # | |------- | alpha `---| | | | # | | | | | | | # | \ | | 3-POINTS | / | # | | | | dist | | | # | \ | | | / | # | \ +-----+-----------------+-----+ / | # | \ | | | / | # | '. \ | / .' | # | '-. '. | .' .-' | # | '-. `-----´ .-' | # | '--. | .--' | # | '--. | .--' | # | '---___ | ___-ˇ-' | # | ''-------___v____-------'' | # | | # The goal of this function is to determine the "alpha" angle (see illustration above). # alpha: angle (in radian) between a a line at the center of the court height, and where the 3-points arc meets the straight line (nearby the "3-POINTS limit" annotation here above) # Y: line from the center of the rim to the point where the 3-points arc meets the straight line (nearby the "3-POINTS limit" annotation here above) # See https://mathworld.wolfram.com/Circle-LineIntersection.html for terminology # Note that all the computation are done on a euclidean standard orthonormal basis (positive abscissa pointing to the right, positive ordinate pointing to the top) y1 = 0 y2 = - self.w/2 # Arbitary point on the extension of the 3-points line x1 = -(self.h/2 - self.court_definition.three_point_limit) # Considers the center of the 3-points circle at (0, 0), with y-axis inverted x2 = x1 dx = x2 - x1 dy = y2 - y1 dr = np.sqrt(dx**2 + dy**2) det = x1*y2 - x2*y1 discriminant = (self.court_definition.three_point_distance**2) * (dr**2) - det**2 assert discriminant >= 0 # Two points of intersection (one in the court, the other before the baseline) or 1 point intersection x_intersection = (det*dy + sign(dy)*dx*np.sqrt(discriminant)) / (dr**2) y_intersection = (-det*dx + abs(dy)*np.sqrt(discriminant)) / (dr**2) if y_intersection >= 0: # The estimated intersection is before the rim, compute the second intersection x_intersection = (det*dy - sign(dy)*dx*np.sqrt(discriminant)) / (dr**2) y_intersection = (-det*dx - abs(dy)*np.sqrt(discriminant)) / (dr**2) # Compute alpha, the angle between the Z line and the vertical "3-points dist" line Z_line_vector = [0 - 0, 0 + self.court_definition.three_point_distance] three_point_dist_line_vector = [0 - x_intersection, 0 - y_intersection] alpha = np.arccos(np.dot(Z_line_vector, three_point_dist_line_vector)/(np.linalg.norm(Z_line_vector)*np.linalg.norm(three_point_dist_line_vector))) return alpha, -y_intersection + self.court_definition.rim_center_offset def draw_lines(self, image, calib: Calib, color=(255,255,0), thickness=5): """ If color is None, use incremental color """ pd = ProjectiveDrawer(calib, color, thickness) N = 100 # maximum number of lines _color = iter(range(1, N)) if color is None else iter([color]*N) # draw court borders for edge in self.edges:#self.visible_edges(calib): pd.draw_line(image, edge[0], edge[1], color=next(_color)) # draw midcourt-line pd.draw_line(image, Point3D(self.w/2,0,0), Point3D(self.w/2,self.h,0), color=next(_color)) # draw central circle r = self.court_definition.circle_diameter/2 pd.draw_arc(image, Point3D(self.w/2,self.h/2,0), r, color=next(_color)) # draw paints w, l = self.court_definition.key_area_width, self.court_definition.key_area_length # left paint pd.draw_line(image, Point3D(0, self.h/2-w/2, 0), Point3D(l, self.h/2-w/2, 0), color=next(_color)) pd.draw_line(image, Point3D(l, self.h/2-w/2, 0), Point3D(l, self.h/2+w/2, 0), color=next(_color)) pd.draw_line(image, Point3D(0, self.h/2+w/2, 0), Point3D(l, self.h/2+w/2, 0), color=next(_color)) pd.draw_arc(image, Point3D(l, self.h/2,0), r, -np.pi/2, np.pi/2, color=next(_color)) # right paint pd.draw_line(image, Point3D(self.w, self.h/2-w/2, 0), Point3D(self.w-l, self.h/2-w/2, 0), color=next(_color)) pd.draw_line(image, Point3D(self.w-l, self.h/2-w/2, 0), Point3D(self.w-l, self.h/2+w/2, 0), color=next(_color)) pd.draw_line(image, Point3D(self.w, self.h/2+w/2, 0), Point3D(self.w-l, self.h/2+w/2, 0), color=next(_color)) pd.draw_arc(image, Point3D(self.w-l,self.h/2,0), r, np.pi/2, 3*np.pi/2, color=next(_color)) # draw 3-points line offset = self.court_definition.rim_center_offset r = self.court_definition.three_point_distance arc_rad, x_intersection = self._get_three_points_anchors() # left 3-points-line pd.draw_arc(image, Point3D(offset,self.h/2,0), r, -arc_rad, arc_rad, color=next(_color)) pd.draw_line(image, Point3D(0,self.court_definition.three_point_limit,0), Point3D(x_intersection,self.court_definition.three_point_limit,0), color=next(_color)) pd.draw_line(image, Point3D(0,self.h-self.court_definition.three_point_limit,0), Point3D(x_intersection,self.h-self.court_definition.three_point_limit,0), color=next(_color)) # right 3-points-line pd.draw_arc(image, Point3D(self.w-offset,self.h/2,0), r, np.pi -arc_rad, np.pi + arc_rad, color=next(_color)) pd.draw_line(image, Point3D(self.w,self.court_definition.three_point_limit,0), Point3D(self.w-x_intersection,self.court_definition.three_point_limit,0), color=next(_color)) pd.draw_line(image, Point3D(self.w,self.h-self.court_definition.three_point_limit,0), Point3D(self.w-x_intersection,self.h-self.court_definition.three_point_limit,0), color=next(_color)) def fill_court(self, image, calib, color=(255, 0, 255)): pd = ProjectiveDrawer(calib, color) pd.fill_polygon(image, self.corners) def fill_court_coordinates(self, image, calib): pass @property def left_key_area(self): """ Return the 3D coordinates of the 4 corners of the left key area """ return Point3D([0, self.court_definition.key_area_length, self.court_definition.key_area_length, 0], # x [self.h/2 - self.court_definition.key_area_width/2, self.h/2 - self.court_definition.key_area_width/2, self.h/2 + self.court_definition.key_area_width/2, self.h/2 + self.court_definition.key_area_width/2], # y [0,0,0,0]) # z @property def right_key_area(self): """ Return the 3D coordinates of the 4 corners of the right key area """ return Point3D([self.w - self.court_definition.key_area_length, self.w, self.w, self.w - self.court_definition.key_area_length], #x [self.h/2 - self.court_definition.key_area_width/2, self.h/2 - self.court_definition.key_area_width/2, self.h/2 + self.court_definition.key_area_width/2, self.h/2 + self.court_definition.key_area_width/2], # y [0,0,0,0]) # z @property def left_board(self): """ Return the 3D coordinates of the 4 corners of the left basketball board (panel) """ board_offset = self.court_definition.board_offset board_width = self.court_definition.board_width board_height = self.court_definition.board_height board_elevation = self.court_definition.board_elevation return Point3D([0+board_offset, 0+board_offset, 0+board_offset, 0+board_offset], # x [self.h/2-board_width/2,self.h/2+board_width/2, self.h/2+board_width/2, self.h/2-board_width/2], # y [-board_elevation, -board_elevation, -board_elevation-board_height, -board_elevation-board_height]) # z @property def right_board(self): """ Return the 3D coordinates of the 4 corners of the right basketball board (panel) """ board_offset = self.court_definition.board_offset board_width = self.court_definition.board_width board_height = self.court_definition.board_height board_elevation = self.court_definition.board_elevation return Point3D([self.w-board_offset, self.w-board_offset, self.w-board_offset, self.w-board_offset], # x [self.h/2-board_width/2,self.h/2+board_width/2, self.h/2+board_width/2, self.h/2-board_width/2], # y [-board_elevation, -board_elevation, -board_elevation-board_height, -board_elevation-board_height]) # z def fill_board(self, image, calib: Calib, color=(128,255,128)): pd = ProjectiveDrawer(calib, color) pd.fill_polygon(image, self.left_board) pd.fill_polygon(image, self.right_board)Defines the world 3D coordinates of a Basketball court. Please refer to calib.py if you want to obtain their 2D image coordinates
Instance variables
prop corners-
Expand source code
@property def corners(self): return Point3D([0,self.w, self.w,0],[0,0,self.h,self.h],[0,0,0,0]) prop edges-
Expand source code
@property def edges(self): for c1, c2 in zip(self.corners, self.corners.close()[:,1:]): yield c1, c2 prop left_board-
Expand source code
@property def left_board(self): """ Return the 3D coordinates of the 4 corners of the left basketball board (panel) """ board_offset = self.court_definition.board_offset board_width = self.court_definition.board_width board_height = self.court_definition.board_height board_elevation = self.court_definition.board_elevation return Point3D([0+board_offset, 0+board_offset, 0+board_offset, 0+board_offset], # x [self.h/2-board_width/2,self.h/2+board_width/2, self.h/2+board_width/2, self.h/2-board_width/2], # y [-board_elevation, -board_elevation, -board_elevation-board_height, -board_elevation-board_height]) # zReturn the 3D coordinates of the 4 corners of the left basketball board (panel)
prop left_key_area-
Expand source code
@property def left_key_area(self): """ Return the 3D coordinates of the 4 corners of the left key area """ return Point3D([0, self.court_definition.key_area_length, self.court_definition.key_area_length, 0], # x [self.h/2 - self.court_definition.key_area_width/2, self.h/2 - self.court_definition.key_area_width/2, self.h/2 + self.court_definition.key_area_width/2, self.h/2 + self.court_definition.key_area_width/2], # y [0,0,0,0]) # zReturn the 3D coordinates of the 4 corners of the left key area
prop right_board-
Expand source code
@property def right_board(self): """ Return the 3D coordinates of the 4 corners of the right basketball board (panel) """ board_offset = self.court_definition.board_offset board_width = self.court_definition.board_width board_height = self.court_definition.board_height board_elevation = self.court_definition.board_elevation return Point3D([self.w-board_offset, self.w-board_offset, self.w-board_offset, self.w-board_offset], # x [self.h/2-board_width/2,self.h/2+board_width/2, self.h/2+board_width/2, self.h/2-board_width/2], # y [-board_elevation, -board_elevation, -board_elevation-board_height, -board_elevation-board_height]) # zReturn the 3D coordinates of the 4 corners of the right basketball board (panel)
prop right_key_area-
Expand source code
@property def right_key_area(self): """ Return the 3D coordinates of the 4 corners of the right key area """ return Point3D([self.w - self.court_definition.key_area_length, self.w, self.w, self.w - self.court_definition.key_area_length], #x [self.h/2 - self.court_definition.key_area_width/2, self.h/2 - self.court_definition.key_area_width/2, self.h/2 + self.court_definition.key_area_width/2, self.h/2 + self.court_definition.key_area_width/2], # y [0,0,0,0]) # zReturn the 3D coordinates of the 4 corners of the right key area
prop size-
Expand source code
@property def size(self): return self.w, self.h
Methods
def draw_lines(self, image, calib: calib3d.calib.Calib, color=(255, 255, 0), thickness=5)-
Expand source code
def draw_lines(self, image, calib: Calib, color=(255,255,0), thickness=5): """ If color is None, use incremental color """ pd = ProjectiveDrawer(calib, color, thickness) N = 100 # maximum number of lines _color = iter(range(1, N)) if color is None else iter([color]*N) # draw court borders for edge in self.edges:#self.visible_edges(calib): pd.draw_line(image, edge[0], edge[1], color=next(_color)) # draw midcourt-line pd.draw_line(image, Point3D(self.w/2,0,0), Point3D(self.w/2,self.h,0), color=next(_color)) # draw central circle r = self.court_definition.circle_diameter/2 pd.draw_arc(image, Point3D(self.w/2,self.h/2,0), r, color=next(_color)) # draw paints w, l = self.court_definition.key_area_width, self.court_definition.key_area_length # left paint pd.draw_line(image, Point3D(0, self.h/2-w/2, 0), Point3D(l, self.h/2-w/2, 0), color=next(_color)) pd.draw_line(image, Point3D(l, self.h/2-w/2, 0), Point3D(l, self.h/2+w/2, 0), color=next(_color)) pd.draw_line(image, Point3D(0, self.h/2+w/2, 0), Point3D(l, self.h/2+w/2, 0), color=next(_color)) pd.draw_arc(image, Point3D(l, self.h/2,0), r, -np.pi/2, np.pi/2, color=next(_color)) # right paint pd.draw_line(image, Point3D(self.w, self.h/2-w/2, 0), Point3D(self.w-l, self.h/2-w/2, 0), color=next(_color)) pd.draw_line(image, Point3D(self.w-l, self.h/2-w/2, 0), Point3D(self.w-l, self.h/2+w/2, 0), color=next(_color)) pd.draw_line(image, Point3D(self.w, self.h/2+w/2, 0), Point3D(self.w-l, self.h/2+w/2, 0), color=next(_color)) pd.draw_arc(image, Point3D(self.w-l,self.h/2,0), r, np.pi/2, 3*np.pi/2, color=next(_color)) # draw 3-points line offset = self.court_definition.rim_center_offset r = self.court_definition.three_point_distance arc_rad, x_intersection = self._get_three_points_anchors() # left 3-points-line pd.draw_arc(image, Point3D(offset,self.h/2,0), r, -arc_rad, arc_rad, color=next(_color)) pd.draw_line(image, Point3D(0,self.court_definition.three_point_limit,0), Point3D(x_intersection,self.court_definition.three_point_limit,0), color=next(_color)) pd.draw_line(image, Point3D(0,self.h-self.court_definition.three_point_limit,0), Point3D(x_intersection,self.h-self.court_definition.three_point_limit,0), color=next(_color)) # right 3-points-line pd.draw_arc(image, Point3D(self.w-offset,self.h/2,0), r, np.pi -arc_rad, np.pi + arc_rad, color=next(_color)) pd.draw_line(image, Point3D(self.w,self.court_definition.three_point_limit,0), Point3D(self.w-x_intersection,self.court_definition.three_point_limit,0), color=next(_color)) pd.draw_line(image, Point3D(self.w,self.h-self.court_definition.three_point_limit,0), Point3D(self.w-x_intersection,self.h-self.court_definition.three_point_limit,0), color=next(_color))If color is None, use incremental color
def draw_net(self, image, calib: calib3d.calib.Calib, color=(128, 90, 200))-
Expand source code
def draw_net(self, image, calib: Calib, color=(128,90,200)): offset = self.court_definition.rim_center_offset r = self.court_definition.rim_radius angles = np.linspace(0, np.pi*2, 20) pd = ProjectiveDrawer(calib, color) for x in [offset, self.w-offset]: # two rims center = Point3D(x, self.h/2, -self.court_definition.rim_height) xts = np.cos(angles)*r + center.x yts = np.sin(angles)*r + center.y xbs = np.cos(angles)*r/2 + center.x ybs = np.sin(angles)*r/2 + center.y zs = np.ones_like(angles)*center.z points = Point3D([Point3D(x,y,z) for x,y,z in zip(np.concatenate((xts,xbs)),np.concatenate((yts,ybs)),np.concatenate((zs,zs+40)))]) pd.polylines(image, points) def draw_poles(self, image, calib: calib3d.calib.Calib, color=(10, 10, 10))-
Expand source code
def draw_poles(self, image, calib: Calib, color=(10,10,10)): pass def draw_rim(self, image, calib: calib3d.calib.Calib, color=(128, 200, 90), thickness=2)-
Expand source code
def draw_rim(self, image, calib: Calib, color=(128,200,90), thickness=2): pd = ProjectiveDrawer(calib, color, thickness=thickness) offset = self.court_definition.rim_center_offset h = self.court_definition.rim_height r = self.court_definition.rim_radius pd.draw_arc(image, Point3D(offset, self.h/2, -h), r) pd.draw_arc(image, Point3D(self.w-offset, self.h/2, -h), r) def fill_board(self, image, calib: calib3d.calib.Calib, color=(128, 255, 128))-
Expand source code
def fill_board(self, image, calib: Calib, color=(128,255,128)): pd = ProjectiveDrawer(calib, color) pd.fill_polygon(image, self.left_board) pd.fill_polygon(image, self.right_board) def fill_court(self, image, calib, color=(255, 0, 255))-
Expand source code
def fill_court(self, image, calib, color=(255, 0, 255)): pd = ProjectiveDrawer(calib, color) pd.fill_polygon(image, self.corners) def fill_court_coordinates(self, image, calib)-
Expand source code
def fill_court_coordinates(self, image, calib): pass def projects_in(self, calib: calib3d.calib.Calib, point2D)-
Expand source code
def projects_in(self, calib: Calib, point2D): point3D = calib.project_2D_to_3D(point2D, Z=0) return point3D.x >= 0 and point3D.x <= self.w and point3D.y >= 0 and point3D.y <= self.h def visible_edges(self, calib: calib3d.calib.Calib)-
Expand source code
def visible_edges(self, calib: Calib): for edge in self.edges: try: yield calib.visible_edge(edge) except ValueError: continue
class GenericItem-
Expand source code
class GenericItem(metaclass=abc.ABCMeta): """ Python object describing dataset item. .. important:: Attributes that require files to be downloaded (like images) should be decorated with `functools.cached_property` to prevent being read before they get downloaded. """ @abc.abstractproperty def key(self): """ Generates the key associated to the Item. Key should to be immutable (eg: NamedTuple). """ raise NotImplementedError @property def db_item(self): """ Returns the db_item that creates the python object """ raise NotImplementedError @abc.abstractproperty def files(self): """ List files stored on remote storage that belong to the object """ raise NotImplementedErrorPython object describing dataset item.
Important
Attributes that require files to be downloaded (like images) should be decorated with
functools.cached_propertyto prevent being read before they get downloaded.Subclasses
Instance variables
prop db_item-
Expand source code
@property def db_item(self): """ Returns the db_item that creates the python object """ raise NotImplementedErrorReturns the db_item that creates the python object
prop files-
Expand source code
@abc.abstractproperty def files(self): """ List files stored on remote storage that belong to the object """ raise NotImplementedErrorList files stored on remote storage that belong to the object
prop key-
Expand source code
@abc.abstractproperty def key(self): """ Generates the key associated to the Item. Key should to be immutable (eg: NamedTuple). """ raise NotImplementedErrorGenerates the key associated to the Item. Key should to be immutable (eg: NamedTuple).
class InstantsDataset-
Expand source code
class InstantsDataset(Dataset): items_type = InstantAncestors
- mlworkflow.datasets.Dataset
Class variables
var items_type-
Python object describing dataset item.
Important
Attributes that require files to be downloaded (like images) should be decorated with
functools.cached_propertyto prevent being read before they get downloaded.
class Stage (value, names=None, *, module=None, qualname=None, type=None, start=1)-
Expand source code
class Stage(IntFlag): TRAIN = 1 EVAL = 2An enumeration.
Ancestors
- enum.IntFlag
- builtins.int
- enum.Flag
- enum.Enum
Class variables
var EVALvar TRAIN
class Subset (name: str,
stage: Stage,
dataset: mlworkflow.datasets.Dataset,
keys=None,
repetitions=1,
desc=None)-
Expand source code
class Subset: def __init__(self, name: str, stage: Stage, dataset: Dataset, keys=None, repetitions=1, desc=None): keys = keys if keys is not None else dataset.keys.all() assert isinstance(keys, (tuple, list)), f"Received instance of {type(keys)} for subset {name}" #assert DataAugmentationDataset.__name__ == dataset.__class__.__name__, "dataset must be an instance of DataAugmentationDataset" # comparing name rather than class enables class being defined in shared git-subtree self.name = name self.type = stage self.dataset = dataset#FilteredDataset(dataset, predicate=lambda k,v: v is not None) self._keys = keys self.keys = keys self.repetitions = repetitions self.desc = desc self.is_training = self.type == Stage.TRAIN loop = None if self.is_training else repetitions self.shuffled_keys = pseudo_random(evolutive=self.is_training)(self.shuffled_keys) self.query_item = pseudo_random(loop=loop, input_dependent=True)(self.query_item) def shuffled_keys(self): keys = self.keys * self.repetitions return random.sample(keys, len(keys)) if self.is_training else keys def __len__(self): return len(self.keys)*self.repetitions def __str__(self): return f"{self.__class__.__name__}<{self.name}>({len(self.keys)}*{self.repetitions})" def query_item(self, key): return self.dataset.query_item(key) def chunkify(self, keys, chunk_size, drop_last=True): d = [] for k in keys: try: v = self.query_item(k) except KeyError: continue if v is None: continue d.append((k, v)) if len(d) == chunk_size: # yield complete sublist and create a new list yield d d = [] if not drop_last and d: yield d def batches(self, batch_size, keys=None, drop_last=True, *args, **kwargs): keys = keys or self.shuffled_keys() for chunk in self.chunkify(keys, chunk_size=batch_size, drop_last=drop_last): keys, batch = list(zip(*chunk)) # transforms list of (k,v) into list of (k) and list of (v) yield keys, collate_fn(batch)Subclasses
Methods
def batches(self, batch_size, keys=None, drop_last=True, *args, **kwargs)-
Expand source code
def batches(self, batch_size, keys=None, drop_last=True, *args, **kwargs): keys = keys or self.shuffled_keys() for chunk in self.chunkify(keys, chunk_size=batch_size, drop_last=drop_last): keys, batch = list(zip(*chunk)) # transforms list of (k,v) into list of (k) and list of (v) yield keys, collate_fn(batch) def chunkify(self, keys, chunk_size, drop_last=True)-
Expand source code
def chunkify(self, keys, chunk_size, drop_last=True): d = [] for k in keys: try: v = self.query_item(k) except KeyError: continue if v is None: continue d.append((k, v)) if len(d) == chunk_size: # yield complete sublist and create a new list yield d d = [] if not drop_last and d: yield d def query_item(self, key)-
Expand source code
def query_item(self, key): return self.dataset.query_item(key) def shuffled_keys(self)-
Expand source code
def shuffled_keys(self): keys = self.keys * self.repetitions return random.sample(keys, len(keys)) if self.is_training else keys