Coverage for common/spreadsheet.py: 28%

115 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2026-02-05 06:46 -0600

1""" 

2crate_anon/common/spreadsheet.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CRATE. 

10 

11 CRATE is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CRATE is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CRATE. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26Functions for reading/writing spreadsheets. 

27 

28""" 

29 

30# ============================================================================= 

31# Imports 

32# ============================================================================= 

33 

34import csv 

35from enum import Enum 

36import logging 

37import os 

38from typing import Any, Dict, Iterable, List, Sequence, TextIO 

39 

40from cardinal_pythonlib.file_io import smart_open 

41import openpyxl 

42import pyexcel_ods 

43import pyexcel_xlsx 

44 

45log = logging.getLogger(__name__) 

46 

47 

48# ============================================================================= 

49# Constants 

50# ============================================================================= 

51 

52SPREADSHEET_ROW_TYPE = Sequence[Any] 

53# ... a row is a sequence of cell values 

54SINGLE_SPREADSHEET_TYPE = Iterable[SPREADSHEET_ROW_TYPE] 

55# ... iterable of rows 

56SINGLE_SPREADSHEET_GENERATOR_TYPE = Iterable[SPREADSHEET_ROW_TYPE] 

57MULTIPLE_SPREADSHEET_TYPE = Dict[str, SINGLE_SPREADSHEET_TYPE] 

58# ... maps spreadsheet names to spreadsheets 

59 

60 

61# ============================================================================= 

62# Enums 

63# ============================================================================= 

64 

65 

66class SpreadsheetFileExtensions(Enum): 

67 CSV = ".csv" 

68 TSV = ".tsv" 

69 ODS = ".ods" 

70 XLSX = ".xlsx" 

71 

72 

73# ============================================================================= 

74# Reading methods 

75# ============================================================================= 

76 

77 

78def skip_spreadsheet_row(row: SPREADSHEET_ROW_TYPE) -> bool: 

79 """ 

80 Should we skip a row, because it's empty or starts with a comment? 

81 """ 

82 if not row: 

83 return True 

84 first = row[0] 

85 if isinstance(first, str) and first.strip().startswith("#"): 

86 return True 

87 return not any(v for v in row) 

88 

89 

90def gen_rows_from_csv(filename: str) -> SINGLE_SPREADSHEET_GENERATOR_TYPE: 

91 """ 

92 Generates rows from a CSV file. 

93 """ 

94 log.debug(f"Loading as CSV: {filename}") 

95 with open(filename, "r") as csvfile: 

96 reader = csv.reader(csvfile) 

97 for row in reader: 

98 if skip_spreadsheet_row(row): 

99 continue 

100 yield row 

101 

102 

103def gen_rows_from_tsv(filename: str) -> SINGLE_SPREADSHEET_GENERATOR_TYPE: 

104 """ 

105 Generates rows from a TSV file. 

106 """ 

107 log.debug(f"Loading as TSV: {filename}") 

108 with open(filename, "r") as tsvfile: 

109 reader = csv.reader(tsvfile, delimiter="\t") 

110 for row in reader: 

111 if skip_spreadsheet_row(row): 

112 continue 

113 yield row 

114 

115 

116def gen_rows_from_xlsx(filename: str) -> SINGLE_SPREADSHEET_GENERATOR_TYPE: 

117 """ 

118 Generates rows from an XLSX file, reading the first sheet. 

119 """ 

120 log.debug(f"Loading as XLSX: {filename}") 

121 workbook = openpyxl.load_workbook(filename) 

122 # ... NB potential bug using read_only; see postcodes.py 

123 worksheet = workbook.active # first sheet, by default 

124 for sheet_row in worksheet.iter_rows(): 

125 row = ["" if cell.value is None else cell.value for cell in sheet_row] 

126 if skip_spreadsheet_row(row): 

127 continue 

128 yield row 

129 

130 

131def gen_rows_from_ods(filename: str) -> SINGLE_SPREADSHEET_GENERATOR_TYPE: 

132 """ 

133 Generates rows from an ODS file, reading the first sheet. 

134 """ 

135 log.debug(f"Loading as ODS: {filename}") 

136 data = pyexcel_ods.get_data(filename) # type: MULTIPLE_SPREADSHEET_TYPE 

137 # ... but it's an ordered dictionary, so: 

138 first_key = next(iter(data)) 

139 first_sheet_rows = data[first_key] 

140 for row in first_sheet_rows: 

141 if skip_spreadsheet_row(row): 

142 continue 

143 yield row 

144 

145 

146def gen_rows_from_spreadsheet( 

147 filename: str, 

148) -> SINGLE_SPREADSHEET_GENERATOR_TYPE: 

149 """ 

150 Generates rows from a spreadsheet-type file, autodetecting it. 

151 

152 Args: 

153 filename: 

154 Filename to read. 

155 """ 

156 _, ext = os.path.splitext(filename) 

157 if ext == SpreadsheetFileExtensions.CSV.value: 

158 row_gen = gen_rows_from_csv(filename) 

159 elif ext == SpreadsheetFileExtensions.ODS.value: 

160 row_gen = gen_rows_from_ods(filename) 

161 elif ext == SpreadsheetFileExtensions.TSV.value: 

162 row_gen = gen_rows_from_tsv(filename) 

163 elif ext == SpreadsheetFileExtensions.XLSX.value: 

164 row_gen = gen_rows_from_xlsx(filename) 

165 else: 

166 raise ValueError(f"Unknown spreadsheet extension: {ext!r}") 

167 for row in row_gen: 

168 yield row 

169 

170 

171# ============================================================================= 

172# Writing methods 

173# ============================================================================= 

174 

175 

176def make_safe_for_spreadsheet(x: Any) -> Any: 

177 """ 

178 Helper function for :func:`remove_none_values_from_spreadsheet`. 

179 """ 

180 return "" if x is None else x 

181 

182 

183def remove_none_values_from_spreadsheet( 

184 data: MULTIPLE_SPREADSHEET_TYPE, 

185) -> MULTIPLE_SPREADSHEET_TYPE: 

186 """ 

187 The ODS writer does not cope with ``None`` values, giving: 

188 

189 .. code-block:: 

190 

191 AttributeError: 'NoneType' object has no attribute 'split' 

192 

193 Here, we transform ``None`` values to the empty string. 

194 """ 

195 result = {} 

196 for sheetname, sheetdata in data.items(): 

197 converted_sheetdata = [] # type: List[List[Any]] 

198 for row in sheetdata: 

199 converted_row = [make_safe_for_spreadsheet(x) for x in row] 

200 converted_sheetdata.append(converted_row) 

201 result[sheetname] = converted_sheetdata 

202 return result 

203 

204 

205def write_csv(filename: str, rows: SINGLE_SPREADSHEET_TYPE) -> None: 

206 """ 

207 Writes to a comma-separated values (CSV) file. 

208 

209 Empty (null) values are translated to "". 

210 

211 Args: 

212 rows: 

213 Rows to write. (The first row is often a header row.) 

214 filename: 

215 Name of file to write. 

216 """ 

217 log.info(f"Saving as CSV: {filename}") 

218 with smart_open(filename, "wt") as f: # type: TextIO 

219 writer = csv.writer(f) 

220 writer.writerows(rows) 

221 

222 

223def write_tsv(filename: str, rows: SINGLE_SPREADSHEET_TYPE) -> None: 

224 """ 

225 Writes to a tab-separated values (TSV) file. 

226 

227 Empty (null) values are translated to "". 

228 

229 Args: 

230 rows: 

231 Rows to write. (The first row is often a header row.) 

232 filename: 

233 Name of file to write. 

234 """ 

235 log.info(f"Saving as TSV: {filename}") 

236 with smart_open(filename, "wt") as f: # type: TextIO 

237 writer = csv.writer(f, delimiter="\t") 

238 writer.writerows(rows) 

239 

240 

241def write_ods(filename: str, data: MULTIPLE_SPREADSHEET_TYPE) -> None: 

242 """ 

243 Writes to an OpenOffice spreadsheet (ODS) file. 

244 

245 Args: 

246 data: 

247 See :func:`write_spreadsheet`. 

248 filename: 

249 Name of file to write. 

250 """ 

251 log.info(f"Saving as ODS: {filename}") 

252 pyexcel_ods.save_data(filename, data) 

253 

254 

255def write_xlsx(filename: str, data: MULTIPLE_SPREADSHEET_TYPE) -> None: 

256 """ 

257 Writes to an OpenOffice spreadsheet (ODS) file. 

258 

259 Args: 

260 data: 

261 See :func:`write_spreadsheet`. 

262 filename: 

263 Name of file to write. 

264 """ 

265 log.info(f"Saving as XLSX: {filename}") 

266 pyexcel_xlsx.save_data(filename, data) 

267 

268 

269def write_spreadsheet( 

270 filename: str, data: MULTIPLE_SPREADSHEET_TYPE, filetype: str = None 

271) -> None: 

272 """ 

273 Writes to a spreadsheet-style file, autodetecting it. 

274 

275 Args: 

276 filename: 

277 Name of file to write, or "-" for stdout (in which case the 

278 filetype is forced to TSV). 

279 data: 

280 A dictionary whose keys are spreadsheet names and whose 

281 corresponding values contain spreadsheet data. (For TSV, which is a 

282 single-sheet format, only the first value is used.) Each dictionary 

283 value is an iterable containing rows, and each row is an iterable 

284 of cell data items. 

285 filetype: 

286 File type as one of the string values of SpreadsheetFileExtensions; 

287 alternatively, use ``None`` to autodetect from the filename. 

288 """ 

289 ext = filetype or os.path.splitext(filename)[1] 

290 if filename == "-" or ext == SpreadsheetFileExtensions.TSV.value: 

291 first_key = next(iter(data)) 

292 # https://stackoverflow.com/questions/30362391/how-do-you-find-the-first-key-in-a-dictionary # noqa: E501 

293 first_sheet = data[first_key] 

294 write_tsv(filename, first_sheet) 

295 elif ext == SpreadsheetFileExtensions.CSV.value: 

296 first_key = next(iter(data)) 

297 first_sheet = data[first_key] 

298 write_csv(filename, first_sheet) 

299 elif ext == SpreadsheetFileExtensions.ODS.value: 

300 # The ODS writer does not like None values. 

301 write_ods(filename, remove_none_values_from_spreadsheet(data)) 

302 elif ext == SpreadsheetFileExtensions.XLSX.value: 

303 write_xlsx(filename, data) 

304 else: 

305 raise ValueError(f"Unknown spreadsheet extension: {ext!r}")