Coverage for /home/deng/Projects/ete4/hackathon/ete4/ete4/parser/ete_format.py: 22%

55 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-03-21 09:19 +0100

1""" 

Prototype of the "future ete format", an alternative to the newick format,
that encodes types too (and is probably faster).

4""" 

5 

6import io 

7import json 

8import pickle 

9import base64 

10import gzip 

11import ete4 

12 

13 

def pickle_pack(data):
    """Return `data` pickled and base64-encoded, as a text string.

    The base64 alphabet has no tabs or newlines, so the result can be
    stored as a single field of a tab-separated line.
    """
    pickled = pickle.dumps(data)
    return base64.b64encode(pickled).decode()

16 

def pickle_unpack(data):
    """Return the object stored in `data`, a base64-encoded pickle.

    This is the inverse of pickle_pack().

    SECURITY: unpickling can run arbitrary code. Only call this on data
    that comes from a trusted source.
    """
    pickled = base64.b64decode(data)
    return pickle.loads(pickled)

19 

def b64gzip_pack(data):
    """Return the text `data` gzip-compressed and base64-encoded.

    `data` is a str; it is encoded as UTF-8 before compressing. The result
    is a str containing only base64 characters.
    """
    compressed = gzip.compress(data.encode('utf-8'))
    return base64.b64encode(compressed).decode()

22 

def b64gzip_unpack(data):
    """Return the bytes stored in `data`, a base64-encoded gzip stream.

    Note that the result is bytes, not str: callers that packed text with
    b64gzip_pack() need to .decode() it to get the text back.
    """
    compressed = base64.b64decode(data)
    return gzip.decompress(compressed)

25 

26 

def dumps(t, encoder='pickle', pack=False):
    """Return the tree rooted at `t` serialized in the ete format.

    Every node is written as two tab-separated lines:

        p <id> <encoded props>    the properties of the node
        t <id> <parent id>        the topology (empty parent id for the root)

    Args:
        t: Node to serialize together with all its descendants. It must
            provide traverse(), and every node must have the attributes
            `props` (a dict), `children` and `up`.
        encoder: How to encode the properties of each node. Either
            'pickle' (base64-encoded pickle) or 'json'.
        pack: If true, return the text gzip-compressed and base64-encoded
            instead of the plain text.

    Returns:
        A str with the serialized tree.

    Note:
        As a side effect, every serialized node gets a numeric id stored
        in its props['__id'] (and the id is part of the encoded props).
    """
    assert encoder in ['pickle', 'json'], f'Invalid encoder: {encoder}'

    # Number the nodes; the ids are what the topology lines refer to.
    for i, node in enumerate(t.traverse()):
        node.props['__id'] = i

    encode = json.dumps if encoder == 'json' else pickle_pack

    lines = []
    pending = [t]  # used as a stack, so parents are written before children
    while pending:
        node = pending.pop()
        pending.extend(node.children)

        node_id = node.props['__id']

        # `t` is the root of what we write even if it hangs from a bigger
        # tree: its parent was not numbered above and is not in the output.
        if node is t or node.up is None:
            parent_id = ''
        else:
            parent_id = node.up.props['__id']

        lines.append(f'p\t{node_id}\t{encode(node.props)}\n')
        lines.append(f't\t{node_id}\t{parent_id}\n')

    text = ''.join(lines)

    return b64gzip_pack(text) if pack else text

56 

57 

def loads(INPUT, encoder='pickle', unpack=False):
    """Return the root of the tree described by `INPUT` in the ete format.

    The expected input is what dumps() writes: tab-separated lines, either
    'p <id> <encoded props>' or 't <id> <parent id>', where an empty parent
    id marks the root. A 't' line must come after the lines of its parent.

    Args:
        INPUT: Text with the serialized tree (or, if `unpack` is true, that
            text gzip-compressed and base64-encoded).
        encoder: How the properties of each node are encoded. Either
            'pickle' (base64-encoded pickle) or 'json'. It must match the
            encoder used to write the input.
        unpack: If true, decompress `INPUT` before parsing it.

    Returns:
        The root node (an ete4.Tree), or None if the input has no root line.

    Raises:
        ValueError: If a non-blank line does not have exactly 3 fields.
        KeyError: If a node refers to a parent id that has not appeared yet.

    SECURITY: with encoder='pickle' the properties are unpickled, which can
    run arbitrary code. Only load pickle-encoded data from a trusted source
    (or use encoder='json').
    """
    if unpack:
        INPUT = b64gzip_unpack(INPUT).decode()

    assert encoder in ['pickle', 'json'], f'Invalid encoder: {encoder}'

    decode = pickle_unpack if encoder == 'pickle' else json.loads

    id2node = {}
    root = None
    for line in io.StringIO(INPUT):
        if not line.strip():
            continue  # tolerate blank lines, like extra trailing newlines

        etype, nid, content = map(str.strip, line.split('\t'))

        # Always fetch the node that this line is about, creating it the
        # first time its id appears, so no line acts on a previous node.
        node = id2node.get(nid)
        if node is None:
            node = id2node[nid] = ete4.Tree()

        if etype == 'p':
            node.props = decode(content)
        elif etype == 't':
            if content:  # the content is the id of the parent
                parent = id2node[content]
                parent.add_child(node)
                node.up = parent
            else:  # no parent: this is the root
                root = node

    return root