Source code for ase2sprkkr.common.grammar_types.data

"""This module contains special GrammarTypes used for large data in output files"""

from .grammar_type import GrammarType, compare_numpy_values
from ..decorators import add_to_signature, cached_property
import pyparsing as pp
import re
from ..grammar import SkipToRegex
import io
import numpy as np
import copy
import os
from typing import Union


class RestOfTheFile(GrammarType):
    """Match anything up to the end of the file"""

    datatype = str
    datatype_name = "string"

    # DOTALL lets ``.`` cross newlines, so the greedy ``.*`` consumes all
    # the remaining input in a single match.
    _grammar = pp.Regex(".*$", re.MULTILINE | re.DOTALL)
    _grammar.set_parse_action(lambda tokens: tokens[0])
    # Whitespace in front of the match is part of the data: do not skip it.
    _grammar.skipWhitespace = False

    def grammar_name(self):
        """Return the human-readable name of this grammar type."""
        return "<the rest of the file>"
class Prefixed(GrammarType):
    """This value consists of a few lines, each prefixed with a given prefix"""

    @add_to_signature(GrammarType.__init__, prepend=True)
    def __init__(self, data_prefix, allow_empty=True, *args, **kwargs):
        """
        Parameters
        ----------
        data_prefix
          The string every line of the value starts with.

        allow_empty
          If True, the grammar also matches when no prefixed line is present.

        *args, **kwargs
          Any other arguments are passed to the
          :meth:`GrammarType constructor<GrammarType.__init__>`
        """
        self.data_prefix = data_prefix
        self.allow_empty = allow_empty
        super().__init__(*args, **kwargs)

    @cached_property
    def _grammar(self):
        """Regex grammar matching one or more consecutive prefixed lines."""
        pref = re.escape(self.data_prefix)
        out = f"({pref}[^\n]*)(\n{pref}[^\n]*)*"
        if self.allow_empty:
            out = f"({out})?"
        return pp.Regex(out)

    def _string(self, value):
        """Return ``value`` with ``data_prefix`` inserted at the start of each line."""
        prefix = self.data_prefix
        # A callable replacement is used so that backslashes in the prefix
        # are not interpreted as regex escapes. Group 1 is either the empty
        # match at the very beginning of the text or a newline; the prefix
        # goes right after it.
        return re.sub("(^|\n)", lambda match: match.group(1) + prefix, value)
class RawData(GrammarType):
    """Match anything up to the end of the file or to the given delimiter"""

    @add_to_signature(GrammarType.__init__)
    def __init__(
        self,
        *args,
        lines=None,
        indented=False,
        line_length=None,
        ends_with: Union[str, re.Pattern] = None,
        ends_with_str=None,
        include_ends_with=False,
        **kwargs,
    ):
        """
        Parameters
        ----------
        lines
          Number of lines to read. Can be given as string - then the value
          of the given option determines the number of lines.

        indented
          If there are <n> spaces before data, pass n to this arg.
          If the file has the following structure:

          .. code-block:: text

              .......................................
                  ....rest of the split line.....
                  ....rest of the split line.
              ....... The second line................
                  ..... the rest of the ............
                  ............ second line ...
                  ............

          Pass a tuple with two integers into this argument. The first number
          of the tuple is the max. number of characters on a line, longer
          lines will be split. The second number is the number of spaces
          placed at the beginning of the new lines created by splitting the
          old one.

        line_length
          Wrap the lines longer than a given number

        ends_with
          The data ends with a given string.

        ends_with_str
          If ends_with is regex, print this at the end of the data

        include_ends_with
          Include the ending delimiter to the data.

        **kwargs
          Any other arguments are passed to the
          :meth:`GrammarType constructor<GrammarType.__init__>`
        """
        self.ends_with = ends_with
        self.ends_with_str = self.ends_with if ends_with_str is None else ends_with_str
        # An integer (including the default False, i.e. zero) is turned into
        # that many spaces; a tuple is stored as it is.
        self.indented = " " * indented if isinstance(indented, int) else indented
        self.lines = lines
        self.line_length = line_length
        self.include_ends_with = include_ends_with
        # Callable that unregisters the grammar hook set up in added_to_container.
        self.remove_forward = None
        super().__init__(*args, **kwargs)

    def _n_lines_grammar(self, lines):
        """Return a grammar for n lines of text"""
        out = pp.Regex(f"([^\n]*\n){{{lines - 1}}}[^\n]*(?=\n|$)", re.S)
        out.leave_whitespace()
        return out

    def _grammar(self, param_name=False):
        """Build the grammar that reads the raw text.

        The text is delimited, in this order of precedence, by the number of
        lines, by the ending delimiter, or by the end of the file.
        """
        if self.lines:
            if isinstance(self.lines, int):
                out = self._n_lines_grammar(self.lines)
            else:
                # The number of lines is given by another option: the actual
                # grammar is filled in by the hook set in added_to_container.
                out = self.forward
        elif self.ends_with:
            if isinstance(self.ends_with, re.Pattern):
                out = SkipToRegex(self.ends_with, include_pattern=self.include_ends_with)
            else:
                out = pp.SkipTo(pp.Suppress(self.ends_with), include=self.include_ends_with)
                # NOTE(review): the nesting of this call could not be told
                # from the flattened source; confirm whether it should apply
                # to the SkipToRegex branch as well.
                out.set_parse_action(lambda x: x[0])
        else:
            out = RestOfTheFile._grammar.copy()

        # NOTE(review): written_delimiter is not set by RawData.__init__; the
        # line_length handling presumably relies on a subclass providing it.
        def parse(v):
            v = v[0]
            if self.indented:
                if isinstance(self.indented, tuple):
                    # Join the continuation lines back to their first line.
                    v = v.replace("\n" + " " * self.indented[1], "")
                else:
                    v = re.sub(f"(^|\n){self.indented}", r"\1", v)
            if self.line_length:
                # Undo the wrapping of long lines done by _string.
                v = re.sub(
                    f"([^\n]{{{self.line_length}}}[^{self.written_delimiter}\n]*)\n",
                    f"\\1{self.written_delimiter}",
                    v,
                )
            return v

        if self.indented or self.line_length:
            out.add_parse_action(parse)
        return out

    def _string(self, val):
        """Return the text to be written: wrapped, indented and terminated
        according to the settings given to the constructor."""
        out = str(val)
        if self.line_length:
            # Break a line at the first delimiter after line_length characters.
            out = re.sub(
                f"([^\n]{{{self.line_length}}}[^{self.written_delimiter}\n]*){self.written_delimiter}",
                "\\1\n",
                out,
            )
        indented = self.indented
        if indented:
            if isinstance(indented, tuple):
                first = indented[0]
                nexts = first - indented[1]
                prefix = " " * indented[1]

                def g():
                    # The first chunk of a line has the full width, the
                    # following ones are shortened by the indentation.
                    for i in out.split("\n"):
                        yield i[:first]
                        s = first
                        ln = len(i)
                        while s < ln:
                            e = s + nexts
                            yield prefix + i[s:e]
                            s = e

                out = "\n".join(g())
            else:
                # Indent every line except an empty one at the very end.
                out = re.sub("(^|\n(?!$))", r"\1" + indented, out)
        if self.ends_with_str and self.include_ends_with:
            out += self.ends_with_str
        return out

    def added_to_container(self, container):
        """Bind the grammar to the option that holds the number of lines.

        Nothing is done unless ``lines`` names another option. Otherwise, a
        hook is added to the grammar of that option, which sets the grammar
        of this value as soon as the number of lines has been parsed.
        """
        # NOTE(review): as far as the flattened source shows, this early
        # return also skips the call of super().added_to_container - confirm.
        if not self.lines or isinstance(self.lines, int):
            return
        if self.remove_forward:
            # Unregister the hook from the previous container. The original
            # code only referenced the callable, without calling it.
            self.remove_forward()
            self.remove_forward = None
        if container:
            self.forward = pp.Forward()
            obj = container[self.lines]

            def paction(parsed):
                self.forward << self._n_lines_grammar(parsed[0][1])
                return parsed

            def hook(grammar):
                grammar.add_parse_action(paction)

            obj.add_grammar_hook(hook)
            self.remove_forward = lambda: obj.remove_grammar_hook(hook)
        else:
            self.remove_forward = None
        super().added_to_container(container)

    def __del__(self):
        """Unregister the grammar hook, if one is still registered."""
        # getattr: __del__ can run on an instance whose __init__ has failed
        # before the attribute was set.
        remove = getattr(self, "remove_forward", None)
        if remove:
            self.remove_forward = None
            remove()

    def convert(self, val):
        """Convert the value to string."""
        return str(val)
class NumpyArray(RawData):
    """Match anything up to the end of the file or to the given delimiter, as numpy array"""

    array_access = True

    @add_to_signature(GrammarType.__init__)
    def __init__(
        self,
        *args,
        delimiter=None,
        shape=None,
        written_shape=None,
        item_format="% .18e",
        dtype=None,
        dtypes=None,
        no_newline_at_end=True,
        **kwargs,
    ):
        """
        Parameters
        ----------
        delimiter
          None - default behavior.
          int - the number will take given fixed number of chars

        shape
          Resize to given shape after read

        written_shape
          Resize to given shape before writing

        item_format
          Output format of the array (just for writing).

        dtype
          Type of the resulting data. Pass ``'line'`` to get array of whole lines

        dtypes
          More dtypes can be given. Then, the first, that match the data, is used

        no_newline_at_end
          If True, the newline after the last written row is removed.

        **kwargs
          Any other arguments are passed to the
          :meth:`GrammarType constructor<GrammarType.__init__>`

        Raises
        ------
        ValueError
          If both ``dtype`` and ``dtypes`` are given.
        """
        self.delimiter = delimiter
        self.written_delimiter = delimiter or " "
        self.written_shape = written_shape
        self.item_format = item_format
        self.shape = shape
        self.no_newline_at_end = no_newline_at_end
        if dtypes is None:
            if dtype == "line":
                dtypes = dtype
            else:
                dtypes = [dtype]
        else:
            if dtype is not None:
                raise ValueError("Use either dtype or dtypes, but not both")
        self.dtypes = dtypes
        super().__init__(*args, **kwargs)

    def _validate(self, value, why="set"):
        """Only numpy arrays are valid values."""
        return isinstance(value, np.ndarray)

    def convert(self, value):
        """Convert the value to a numpy array."""
        return np.asarray(value)

    def _string(self, value):
        """Write the array as text and pass it to :meth:`RawData._string`."""
        # An integer delimiter means fixed-width columns on reading: there
        # is no separator to write then, the width has to come from
        # item_format. The original code computed this value, but passed
        # written_delimiter to savetxt, which wrote the integer as digits.
        delimiter = "" if isinstance(self.delimiter, int) else self.written_delimiter
        if self.written_shape:
            value = value.reshape(self.written_shape)
        buffer = io.StringIO()
        np.savetxt(buffer, value, delimiter=delimiter, fmt=self.item_format)
        out = buffer.getvalue()
        # Drop the newline after the last row. The check also makes empty
        # output safe, where seeking one character back would fail.
        if self.no_newline_at_end and out.endswith("\n"):
            out = out[:-1]
        return super()._string(out)

    is_the_same_value = staticmethod(compare_numpy_values)

    def _grammar(self, param_name=False):
        """Extend the grammar of :class:`RawData` by the conversion of the
        parsed text to a numpy array."""
        grammar = super()._grammar(param_name)

        def parse(v):
            v = v[0]
            if self.dtypes == "line":
                v = np.array([i.rstrip() for i in v.split("\n")], dtype=object)
            else:
                last_error = None
                for dt in self.dtypes:
                    try:
                        v = np.genfromtxt(io.StringIO(v), delimiter=self.delimiter, dtype=dt)
                        break
                    except Exception as error:
                        # Remember the failure and try the next dtype. The
                        # original code dropped the exception, so the real
                        # cause was never reported.
                        last_error = error
                else:
                    if not last_error:
                        raise ValueError("No dtype specified")
                    raise last_error
            if self.shape:
                v.shape = self.shape
            return v

        grammar.add_parse_action(parse)
        return grammar

    def copy_value(self, value):
        """Return a deep copy of the value."""
        return copy.deepcopy(value)