Coverage for /home/deng/Projects/ete4/hackathon/ete4/ete4/parser/text_arraytable.py: 10%

73 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-03-21 09:19 +0100

1import re 

2from sys import stderr 

3import numpy 

4 

5from ete4.core import arraytable 

6 

7__all__ = ['read_arraytable', 'write_arraytable'] 

8 

9 

def read_arraytable(matrix_file, mtype="float", arraytable_object=None):
    """Read a tab-delimited text matrix into an array table.

    The expected layout is a header line whose first field starts with
    ``#NAMES`` (case-insensitive) followed by the column names, and then
    one line per row: the row name followed by one value per column.
    Other lines starting with ``#`` are comments and empty lines are
    skipped.  Empty value fields are stored as ``numpy.nan``.

    Duplicated column or row names are made unique by appending
    ``_<occurrence number>`` and a warning is printed to stderr.

    Args:
        matrix_file: Either the matrix text itself (any string containing
            a newline) or the path of a file to read it from.
        mtype: dtype passed to ``astype`` for the resulting matrix; it is
            also stored as ``A.mtype``.
        arraytable_object: Existing table object to fill.  When None a new
            ``arraytable.ArrayTable`` is created.

    Returns:
        The filled table object (the one passed in, if any).

    Raises:
        ValueError: If a data line appears before any column names were
            read, or if a data line does not have exactly one value per
            column.
    """
    A = arraytable.ArrayTable() if arraytable_object is None else arraytable_object
    A.mtype = mtype

    temp_matrix = []
    rowname_counter = {}
    colname_counter = {}
    row_dup_flag = False
    col_dup_flag = False

    # A string containing a newline is taken to be the matrix content
    # itself; anything else is treated as a path to open.
    if "\n" in matrix_file:
        matrix_data = matrix_file.split("\n")
    else:
        # Read everything up front so the handle is closed even if
        # parsing fails below.
        with open(matrix_file) as handle:
            matrix_data = handle.readlines()

    for line in matrix_data:
        line = line.strip("\n")
        if not line:
            continue

        fields = line.split("\t")

        if line.startswith("#"):
            # Only "#NAMES" lines carry information; other "#" lines are
            # plain comments.
            if re.match("#NAMES", fields[0], re.IGNORECASE):
                for colname in fields[1:]:
                    colname = colname.strip()

                    # Count occurrences under the original name so that a
                    # repeated name gets its occurrence number as suffix.
                    colname_counter[colname] = colname_counter.get(colname, 0) + 1
                    if colname in A.colValues:
                        colname += "_%d" % colname_counter[colname]
                        col_dup_flag = True

                    A.colValues[colname] = None
                    A.colNames.append(colname)

                if col_dup_flag:
                    print("Duplicated column names were renamed.", file=stderr)
            continue

        # Data lines are only valid once column names are known.
        if not A.colNames:
            raise ValueError("Column names are required.")

        if len(fields) - 1 != len(A.colNames):
            raise ValueError("Invalid number of columns. Expecting:%d" % len(A.colNames))

        # First field is the row name; the rest are the row values.
        rowname = fields.pop(0).strip()

        rowname_counter[rowname] = rowname_counter.get(rowname, 0) + 1
        if rowname in A.rowValues:
            rowname += "_%d" % rowname_counter[rowname]
            row_dup_flag = True

        A.rowValues[rowname] = None
        A.rowNames.append(rowname)

        # Blank cells become NaN; everything else stays as text and is
        # converted by astype() below.
        temp_matrix.append([numpy.nan if f.strip() == "" else f for f in fields])

    if row_dup_flag:
        print("Duplicated row names were renamed.", file=stderr)

    # Convert all read lines into a numpy matrix of the requested type.
    vmatrix = numpy.array(temp_matrix).astype(A.mtype)

    # Let the table link its row/column names to the matrix vectors.
    A._link_names2matrix(vmatrix)
    return A

100 

def write_arraytable(A, fname, colnames=None):
    """Write an array table to a tab-delimited text file.

    The output starts with a ``#NAMES`` header line listing the written
    column names, followed by one line per row in ``A.rowNames``: the row
    name and then that row's value for each selected column.

    Args:
        A: Table object providing ``colNames``, ``rowNames`` and
            ``get_several_column_vectors(colnames)``.
        fname: Path of the file to write; it is overwritten if it exists.
        colnames: Names of the columns to write, in output order.  When
            None or empty, all columns in ``A.colNames`` are written.
    """
    # An omitted or empty selection means "every column".
    colnames = list(colnames) if colnames else list(A.colNames)

    # Swap the first two axes of the column vectors so that the matrix
    # can be indexed by row position.
    matrix = A.get_several_column_vectors(colnames).swapaxes(0, 1)

    # The context manager closes the file even if a write fails midway.
    with open(fname, "w") as out:
        print('\t'.join(["#NAMES"] + colnames), file=out)
        for index, rname in enumerate(A.rowNames):
            print('\t'.join(map(str, [rname] + matrix[index].tolist())), file=out)