Coverage for /home/deng/Projects/metatree_drawer/metatreedrawer/treeprofiler/src/b64pickle.py: 22%

58 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-08-07 10:33 +0200

1 

2import sys 

3import io 

4import json 

5import pickle 

6import base64 

7import gzip 

8from ete4 import Tree 

9 

def pickle_pack(data):
    """Pickle *data* and return the result as base64-encoded text.

    Args:
        data: Any picklable object.

    Returns:
        str: The base64 representation of ``pickle.dumps(data)``.
    """
    pickled = pickle.dumps(data)
    encoded = base64.b64encode(pickled)
    return encoded.decode()

12 

def pickle_unpack(data):
    """Decode base64 *data* and unpickle the resulting bytes.

    NOTE: this calls ``pickle.loads``, which can execute arbitrary code;
    only feed it data that comes from a trusted source.

    Args:
        data: Base64 text (or bytes) wrapping a pickle payload.

    Returns:
        The object stored in the pickle payload.
    """
    pickled = base64.b64decode(data)
    return pickle.loads(pickled)

15 

def b64gzip_pack(data):
    """Gzip-compress the text *data* and return it as base64-encoded text.

    Args:
        data: A ``str``; it is encoded as UTF-8 before compression.

    Returns:
        str: The base64 representation of the gzip-compressed bytes.
    """
    raw = bytes(data, 'utf-8')
    compressed = gzip.compress(raw)
    encoded = base64.b64encode(compressed)
    return encoded.decode()

18 

def b64gzip_unpack(data):
    """Decode base64 *data* and gunzip it.

    Args:
        data: Base64 text (or bytes) wrapping a gzip stream.

    Returns:
        bytes: The decompressed payload; it is not decoded to text here.
    """
    compressed = base64.b64decode(data)
    return gzip.decompress(compressed)

21 

22 

def dumps(t, encoder='pickle', pack=False):
    """Serialize a tree into a line-oriented, tab-separated text format.

    Every node is first tagged with a numeric ``__id`` property: its index
    in ``t.traverse()``. The tag is left on the input tree and is part of
    the serialized properties. Each node then contributes two lines:

    * ``p<TAB>id<TAB>payload`` -- the node's ``props`` encoded with *encoder*
    * ``t<TAB>id<TAB>parent_id`` -- the topology; the parent field is empty
      for a node without a parent

    Nodes are written in preorder with siblings in their original order, so
    a reader that attaches children in line order rebuilds the same child
    ordering.

    Args:
        t: Root node. It must provide ``traverse()``, ``props``,
            ``children`` and ``up``.
        encoder: ``'pickle'`` (base64-wrapped pickle) or ``'json'``.
        pack: When true, the text is gzip-compressed and base64-encoded
            before it is returned.

    Returns:
        str: The serialized tree (packed if *pack* is true).

    Raises:
        ValueError: If *encoder* is neither ``'pickle'`` nor ``'json'``.
    """
    # Validate before touching the tree so a bad call has no side effects.
    if encoder not in ('pickle', 'json'):
        raise ValueError("Invalid encoder")

    for i, n in enumerate(t.traverse()):
        n.props['__id'] = i

    out = io.StringIO()
    pending = [t]
    while pending:
        n = pending.pop()
        # The stack is LIFO: pushing the children reversed makes the first
        # child come out first, which keeps sibling order across a round trip.
        pending.extend(reversed(n.children))

        if encoder == 'json':
            packed_content = json.dumps(n.props)
        else:
            packed_content = pickle_pack(n.props)

        node_id = n.props['__id']
        parent_id = n.up.props['__id'] if n.up else ''
        out.write(f"p\t{node_id}\t{packed_content}\n")
        out.write(f"t\t{node_id}\t{parent_id}\n")

    text = out.getvalue()
    return b64gzip_pack(text) if pack else text

52 

def loads(INPUT, encoder='pickle', unpack=False):
    """Rebuild a tree from the text produced by ``dumps``.

    The input is a sequence of tab-separated records, one per line:

    * ``p<TAB>id<TAB>payload`` -- properties of node *id*, encoded with
      *encoder*
    * ``t<TAB>id<TAB>parent_id`` -- attaches node *id* under *parent_id*;
      an empty parent field marks the root

    Records may appear in any order: a node is created the first time its
    id is seen, whether as a record's own id or as a parent. Blank lines
    are skipped and records with an unknown type are ignored.

    NOTE: with ``encoder='pickle'`` the payloads go through ``pickle.loads``,
    which can execute arbitrary code; only load data from a trusted source.

    Args:
        INPUT: The serialized tree. When *unpack* is true it is the
            base64/gzip form produced with ``dumps(..., pack=True)``.
        encoder: ``'pickle'`` or ``'json'``; must match the one used to dump.
        unpack: When true, *INPUT* is base64-decoded and gunzipped first.

    Returns:
        The root ``Tree`` node, or ``None`` if no root record was found.

    Raises:
        ValueError: If *encoder* is neither ``'pickle'`` nor ``'json'``, or
            if a non-blank line has fewer than three tab-separated fields.
    """
    if encoder not in ('pickle', 'json'):
        raise ValueError("Invalid encoder")

    if unpack:
        INPUT = b64gzip_unpack(INPUT).decode()

    id2node = {}
    root = None

    def node_for(node_id):
        # Always resolve the node from its id; never rely on a variable left
        # over from a previous line.
        found = id2node.get(node_id)
        if found is None:
            found = id2node[node_id] = Tree()
        return found

    for line in io.StringIO(INPUT):
        if not line.strip():
            continue
        # Split the raw line: stripping first would drop the trailing empty
        # parent field of the root record.
        etype, nid, b = map(str.strip, line.split('\t', 2))
        node = node_for(nid)

        if etype == 'p':
            if encoder == 'pickle':
                node.props = pickle_unpack(b)
            elif encoder == 'json':
                node.props = json.loads(b)
        elif etype == 't':
            if b:
                parent = node_for(b)
                parent.add_child(node)
                node.up = parent
            else:
                root = node
    return root

78 return root