Coverage for /home/deng/Projects/ete4/hackathon/ete4/ete4/parser/text_arraytable.py: 10%
73 statements
« prev ^ index » next coverage.py v7.2.7, created at 2024-03-21 09:19 +0100
« prev ^ index » next coverage.py v7.2.7, created at 2024-03-21 09:19 +0100
1import re
2from sys import stderr
3import numpy
5from ete4.core import arraytable
7__all__ = ['read_arraytable', 'write_arraytable']
def _parse_arraytable_lines(A, lines):
    """Register row/column names on *A* and return the raw matrix rows.

    *lines* is any iterable of text lines (a list of strings or an open
    file).  Column names found in a ``#NAMES`` line are added to
    ``A.colNames`` / ``A.colValues`` and row names to ``A.rowNames`` /
    ``A.rowValues``.  The returned list holds one list of raw field
    values per data row; blank fields are replaced by ``numpy.nan``.

    Raises ValueError if a data row shows up before any column names were
    read, or if its number of fields does not match the column names.
    """
    raw_rows = []
    rowname_counter = {}
    colname_counter = {}
    row_dup_flag = False
    col_dup_flag = False

    for line in lines:
        # Only the newline is removed: tabs and spaces may be meaningful
        # (e.g. a trailing tab marks an empty last field).
        line = line.strip("\n")
        if not line:
            continue
        fields = line.split("\t")

        if line[0] == '#':
            if not re.match("#NAMES", fields[0], re.IGNORECASE):
                # Plain comment line.
                continue

            # Header line: every field after the first is a column name.
            for colname in fields[1:]:
                colname = colname.strip()
                # Duplicated column names get a numeric suffix.
                colname_counter[colname] = colname_counter.get(colname, 0) + 1
                if colname in A.colValues:
                    colname += "_%d" % colname_counter[colname]
                    col_dup_flag = True
                A.colValues[colname] = None
                A.colNames.append(colname)
            if col_dup_flag:
                print("Duplicated column names were renamed.", file=stderr)
            continue

        # Data rows are only accepted once column names are known.
        if not A.colNames:
            raise ValueError("Column names are required.")

        # First field is the row name, the rest must match the columns.
        if len(fields) - 1 != len(A.colNames):
            raise ValueError("Invalid number of columns. Expecting:%d" % len(A.colNames))

        rowname = fields.pop(0).strip()
        # Duplicated row names get a numeric suffix.
        rowname_counter[rowname] = rowname_counter.get(rowname, 0) + 1
        if rowname in A.rowValues:
            rowname += "_%d" % rowname_counter[rowname]
            row_dup_flag = True
        A.rowValues[rowname] = None
        A.rowNames.append(rowname)

        # Blank fields are stored as NaN.
        raw_rows.append([numpy.nan if f.strip() == "" else f for f in fields])

    if row_dup_flag:
        print("Duplicated row names were renamed.", file=stderr)

    return raw_rows


def read_arraytable(matrix_file, mtype="float", arraytable_object = None):
    """Read a tab-delimited text matrix into an array table.

    :param matrix_file: Either the path of the file to read or, if the
        string contains a newline, the matrix text itself.
    :param mtype: Type the values are converted to (passed to numpy's
        ``astype``); it is also stored as ``A.mtype``.
    :param arraytable_object: Object to fill. If None, a new
        ``arraytable.ArrayTable`` is created.

    Returns the filled array table object.

    Raises ValueError if data rows appear before a ``#NAMES`` line or if
    a row has a different number of values than there are column names.
    """
    if arraytable_object is None:
        A = arraytable.ArrayTable()
    else:
        A = arraytable_object
    A.mtype = mtype

    if "\n" in matrix_file:
        # Multi-line input is taken as the matrix content itself.
        temp_matrix = _parse_arraytable_lines(A, matrix_file.split("\n"))
    else:
        # Otherwise it is a path; make sure the file gets closed.
        with open(matrix_file) as matrix_data:
            temp_matrix = _parse_arraytable_lines(A, matrix_data)

    # Convert all read lines into a numpy matrix
    vmatrix = numpy.array(temp_matrix).astype(A.mtype)

    # Updates indexes to link names and vectors in matrix
    A._link_names2matrix(vmatrix)
    return A
def write_arraytable(A, fname, colnames=None):
    """Write an array table to a tab-delimited text file.

    The first line written is ``#NAMES`` followed by the column names;
    each following line holds a row name and that row's values.

    :param A: Array table to dump. Its ``rowNames``, ``colNames`` and
        ``get_several_column_vectors`` members are used.
    :param fname: Path of the output file (overwritten if it exists).
    :param colnames: Names of the columns to write, in order. If None or
        empty, all columns in ``A.colNames`` are written.
    """
    # An omitted or empty selection means "every column".
    if not colnames:
        colnames = A.colNames
    # Work on a list copy so that any sequence can be joined with the header.
    colnames = list(colnames)

    # Swap axes so that the matrix can be indexed by row position.
    matrix = A.get_several_column_vectors(colnames).swapaxes(0, 1)

    with open(fname, "w") as OUT:
        print('\t'.join(["#NAMES"] + colnames), file=OUT)
        for index, rname in enumerate(A.rowNames):
            print('\t'.join(map(str, [rname] + matrix[index].tolist())), file=OUT)