Coverage for src / sql_tool / core / timescaledb.py: 74%

124 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-14 15:28 -0500

1"""TimescaleDB administration operations. 

2 

3Framework-agnostic business logic for TimescaleDB commands. 

4CLI layer in cli/commands/ts.py provides the typer interface. 

5""" 

6 

7from __future__ import annotations 

8 

9from typing import TYPE_CHECKING, Any 

10 

11from sql_tool.core.exceptions import SqlToolError 

12 

13if TYPE_CHECKING: 

14 from sql_tool.core.client import PgClient 

15 from sql_tool.core.models import QueryResult 

16 

17 

def check_timescaledb_available(client: PgClient) -> None:
    """Verify the TimescaleDB extension is installed; raise SqlToolError if not."""
    probe = """
    SELECT EXISTS(
        SELECT 1 FROM pg_extension WHERE extname = 'timescaledb'
    ) AS timescaledb_installed
    """
    rows = client.execute_query(probe).rows
    # pg_extension has one row per installed extension; EXISTS yields a single boolean.
    installed = bool(rows) and bool(rows[0][0])
    if not installed:
        raise SqlToolError(
            "TimescaleDB extension is not installed in this database. "
            "Install it with: CREATE EXTENSION IF NOT EXISTS timescaledb;"
        )

31 

32 

33# --------------------------------------------------------------------------- 

34# Read-only queries 

35# --------------------------------------------------------------------------- 

36 

37 

def list_hypertables(
    client: PgClient,
    *,
    schema_filter: str | None = None,
) -> QueryResult:
    """List hypertables with their time dimension, size, chunk counts and
    compression stats, optionally restricted to one schema."""
    if not schema_filter:
        where_clause = ""
        params: dict[str, Any] | None = None
    else:
        where_clause = "WHERE h.hypertable_schema = %(schema)s"
        params = {"schema": schema_filter}

    sql = f"""
    SELECT
        h.hypertable_schema,
        h.hypertable_name,
        d.column_name,
        d.time_interval::text,
        hypertable_size((quote_ident(h.hypertable_schema) || '.' || quote_ident(h.hypertable_name))::regclass) AS size_bytes,
        (SELECT COUNT(*) FILTER (WHERE NOT c.is_compressed) FROM timescaledb_information.chunks c
         WHERE c.hypertable_schema = h.hypertable_schema AND c.hypertable_name = h.hypertable_name) AS uncompr_chunks,
        (SELECT COUNT(*) FILTER (WHERE c.is_compressed) FROM timescaledb_information.chunks c
         WHERE c.hypertable_schema = h.hypertable_schema AND c.hypertable_name = h.hypertable_name) AS compr_chunks,
        cs.before_compression_total_bytes,
        cs.after_compression_total_bytes,
        h.compression_enabled
    FROM timescaledb_information.hypertables h
    LEFT JOIN timescaledb_information.dimensions d
        ON h.hypertable_schema = d.hypertable_schema
        AND h.hypertable_name = d.hypertable_name
        AND d.dimension_number = 1
    LEFT JOIN LATERAL (
        SELECT
            SUM(before_compression_total_bytes)::bigint AS before_compression_total_bytes,
            SUM(after_compression_total_bytes)::bigint AS after_compression_total_bytes
        FROM hypertable_compression_stats(
            (quote_ident(h.hypertable_schema) || '.' || quote_ident(h.hypertable_name))::regclass
        )
    ) cs ON true
    {where_clause}
    ORDER BY h.hypertable_schema, h.hypertable_name
    """
    return client.execute_query(sql, params)

81 

82 

def list_chunks(client: PgClient, schema: str, table: str) -> QueryResult:
    """Return per-chunk time ranges, compression flags and sizes for one hypertable."""
    query = """
    SELECT
        c.chunk_name,
        c.range_start::text AS range_start,
        c.range_end::text AS range_end,
        c.is_compressed,
        pg_total_relation_size(('_timescaledb_internal.' || quote_ident(c.chunk_name))::regclass) AS chunk_size_bytes
    FROM timescaledb_information.chunks c
    WHERE c.hypertable_schema = %(schema)s
      AND c.hypertable_name = %(table)s
    ORDER BY c.range_start
    """
    bind = {"schema": schema, "table": table}
    return client.execute_query(query, bind)

97 

98 

def compression_stats(
    client: PgClient,
    *,
    schema: str | None = None,
    table: str | None = None,
) -> QueryResult:
    """Query before/after compression byte counts per hypertable.

    When both schema and table are given the report covers that single
    hypertable; otherwise every hypertable is included.
    """
    targeted = bool(schema and table)
    params: dict[str, Any] = {"schema": schema, "table": table} if targeted else {}
    where_clause = ""
    if targeted:
        where_clause = """
        WHERE h.hypertable_schema = %(schema)s
          AND h.hypertable_name = %(table)s
        """

    sql = f"""
    SELECT
        h.hypertable_schema,
        h.hypertable_name,
        (SELECT COUNT(*) FROM timescaledb_information.chunks c
         WHERE c.hypertable_schema = h.hypertable_schema
           AND c.hypertable_name = h.hypertable_name) AS total_chunks,
        (SELECT COUNT(*) FROM timescaledb_information.chunks c
         WHERE c.hypertable_schema = h.hypertable_schema
           AND c.hypertable_name = h.hypertable_name
           AND c.is_compressed) AS compr_chunks,
        d.before_compression_total_bytes,
        d.after_compression_total_bytes
    FROM timescaledb_information.hypertables h
    LEFT JOIN LATERAL (
        SELECT
            SUM(before_compression_total_bytes)::bigint AS before_compression_total_bytes,
            SUM(after_compression_total_bytes)::bigint AS after_compression_total_bytes
        FROM hypertable_compression_stats(
            (quote_ident(h.hypertable_schema) || '.' || quote_ident(h.hypertable_name))::regclass
        )
    ) d ON true
    {where_clause}
    ORDER BY h.hypertable_schema, h.hypertable_name
    """
    return client.execute_query(sql, params or None)

142 

143 

def list_continuous_aggregates(client: PgClient) -> QueryResult:
    """List continuous aggregates with their source and materialization hypertables."""
    return client.execute_query(
        """
    SELECT
        ca.view_schema,
        ca.view_name,
        format('%I.%I', ca.hypertable_schema, ca.hypertable_name) AS source_hypertable,
        ca.materialized_only,
        ca.compression_enabled,
        format('%I.%I', ca.materialization_hypertable_schema, ca.materialization_hypertable_name) AS materialization_hypertable,
        ca.finalized
    FROM timescaledb_information.continuous_aggregates ca
    ORDER BY ca.view_schema, ca.view_name
    """
    )

158 

159 

def list_retention_policies(client: PgClient) -> QueryResult:
    """List retention policies (policy_retention jobs) with their schedule and run stats."""
    query = """
    SELECT
        j.hypertable_schema,
        j.hypertable_name,
        j.config->>'drop_after' AS drop_after,
        j.schedule_interval::text AS schedule_interval,
        js.last_run_started_at::text AS last_run_started_at,
        js.next_start::text AS next_start
    FROM timescaledb_information.jobs j
    LEFT JOIN timescaledb_information.job_stats js
        ON j.job_id = js.job_id
    WHERE j.proc_name = 'policy_retention'
    ORDER BY j.hypertable_schema, j.hypertable_name
    """
    return client.execute_query(query)

176 

177 

def list_refresh_status(client: PgClient) -> QueryResult:
    """Report schedule and run statistics for continuous-aggregate refresh jobs."""
    query = """
    SELECT
        j.hypertable_schema,
        j.hypertable_name,
        j.schedule_interval::text AS schedule_interval,
        js.last_run_started_at::text AS last_run_started_at,
        js.next_start::text AS next_start,
        js.last_run_status,
        js.total_runs,
        js.total_successes,
        js.total_failures
    FROM timescaledb_information.jobs j
    LEFT JOIN timescaledb_information.job_stats js
        ON j.job_id = js.job_id
    WHERE j.proc_name = 'policy_refresh_continuous_aggregate'
    ORDER BY j.hypertable_schema, j.hypertable_name
    """
    return client.execute_query(query)

197 

198 

def list_jobs(client: PgClient) -> QueryResult:
    """List all background jobs with schedule, last/next run info and run counters.

    last_run_seconds / next_start_seconds are NULL-guarded against the
    +/-infinity timestamps TimescaleDB uses for never-run / paused jobs.
    """
    query = """
    SELECT
        j.job_id,
        j.application_name,
        CASE WHEN j.hypertable_schema IS NOT NULL
             THEN format('%I.%I', j.hypertable_schema, j.hypertable_name)
             ELSE NULL END AS hypertable,
        j.schedule_interval::text AS schedule,
        js.last_run_started_at::text AS last_run,
        CASE WHEN js.last_run_started_at IS NOT NULL
              AND js.last_run_started_at > '-infinity'::timestamptz
              AND js.last_run_started_at < 'infinity'::timestamptz
             THEN EXTRACT(EPOCH FROM (now() - js.last_run_started_at))
             ELSE NULL END AS last_run_seconds,
        js.last_run_status,
        js.next_start::text AS next_start,
        CASE WHEN js.next_start IS NOT NULL
              AND js.next_start > '-infinity'::timestamptz
              AND js.next_start < 'infinity'::timestamptz
             THEN EXTRACT(EPOCH FROM (js.next_start - now()))
             ELSE NULL END AS next_start_seconds,
        js.total_runs,
        js.total_successes,
        js.total_failures
    FROM timescaledb_information.jobs j
    LEFT JOIN timescaledb_information.job_stats js
        ON j.job_id = js.job_id
    ORDER BY j.job_id
    """
    return client.execute_query(query)

230 

231 

# Candidate job-history tables, preferred schema first; older TimescaleDB
# versions only ship bgw_job_stat.
_JOB_HISTORY_TABLES = [
    "_timescaledb_internal.bgw_job_stat_history",
    "_timescaledb_internal.bgw_job_stat",
]


def list_job_history(
    client: PgClient,
    *,
    job_id: int | None = None,
) -> QueryResult:
    """Return the 20 most recent job executions.

    Tries bgw_job_stat_history first, falling back to bgw_job_stat for
    TimescaleDB versions that predate the history table.

    Args:
        client: Database client used to execute the query.
        job_id: Optional job id to filter on; None means all jobs.

    Raises:
        SqlToolError: If no job-history table could be queried.
    """
    # Explicit None check (not truthiness) so a job_id of 0 still filters.
    if job_id is not None:
        where_clause = "WHERE job_id = %(job_id)s"
        params: dict[str, Any] | None = {"job_id": job_id}
    else:
        where_clause = ""
        params = None

    for table_name in _JOB_HISTORY_TABLES:
        sql = f"""
        SELECT
            job_id,
            succeeded,
            execution_start::text,
            execution_finish::text,
            data::text AS error_data
        FROM {table_name}
        {where_clause}
        ORDER BY execution_start DESC
        LIMIT 20
        """
        # Keep the try body minimal: only the execution can legitimately fail
        # (the table may not exist in this TimescaleDB version).
        try:
            return client.execute_query(sql, params)
        except Exception:  # noqa: BLE001 - table may not exist in this TS version
            continue

    raise SqlToolError("No job history table found")

266 

267 

def list_compression_settings(
    client: PgClient,
    *,
    schema: str | None = None,
    table: str | None = None,
    effective_schema: str | None = None,
    include_policy: bool = True,
) -> QueryResult:
    """Query segment_by/order_by settings, optionally joined with policy schedule.

    Filtering: an explicit schema+table pair wins; otherwise effective_schema
    (if any) restricts to one schema; otherwise all hypertables are listed.
    """
    params: dict[str, Any] = {}
    filters: list[str] = []
    if schema and table:
        filters = [
            "cs.hypertable_schema = %(schema)s",
            "cs.hypertable_name = %(table)s",
        ]
        params = {"schema": schema, "table": table}
    elif effective_schema:
        filters = ["cs.hypertable_schema = %(schema)s"]
        params = {"schema": effective_schema}

    where_clause = "WHERE " + " AND ".join(filters) if filters else ""

    if include_policy:
        policy_join = """
    LEFT JOIN (
        SELECT
            j.hypertable_schema AS p_schema,
            j.hypertable_name AS p_table,
            j.config->>'compress_after' AS compress_after,
            j.schedule_interval::text AS sched,
            js.last_run_started_at::text AS last_run,
            js.next_start::text AS next_start,
            js.last_run_status AS status
        FROM timescaledb_information.jobs j
        LEFT JOIN timescaledb_information.job_stats js ON j.job_id = js.job_id
        WHERE j.proc_name = 'policy_compression'
    ) pol ON pol.p_schema = cs.hypertable_schema AND pol.p_table = cs.hypertable_name
    """
        policy_select = """,
        pol.compress_after,
        pol.sched,
        pol.last_run,
        pol.next_start,
        pol.status"""
        policy_group = ", pol.compress_after, pol.sched, pol.last_run, pol.next_start, pol.status"
    else:
        policy_join = policy_select = policy_group = ""

    sql = f"""
    SELECT
        cs.hypertable_schema,
        cs.hypertable_name,
        d.column_name AS time_col,
        d.time_interval::text AS chunk_iv,
        STRING_AGG(
            CASE WHEN cs.segmentby_column_index IS NOT NULL THEN cs.attname END,
            ', ' ORDER BY cs.segmentby_column_index
        ) AS segment_by,
        STRING_AGG(
            CASE WHEN cs.orderby_column_index IS NOT NULL THEN
                cs.attname ||
                CASE WHEN cs.orderby_asc THEN '' ELSE ' DESC' END ||
                CASE WHEN cs.orderby_nullsfirst THEN ' NULLS FIRST' ELSE '' END
            END,
            ', ' ORDER BY cs.orderby_column_index
        ) AS order_by{policy_select}
    FROM timescaledb_information.compression_settings cs
    LEFT JOIN timescaledb_information.dimensions d
        ON cs.hypertable_schema = d.hypertable_schema
        AND cs.hypertable_name = d.hypertable_name
        AND d.dimension_number = 1
    {policy_join}
    {where_clause}
    GROUP BY cs.hypertable_schema, cs.hypertable_name, d.column_name, d.time_interval{policy_group}
    ORDER BY cs.hypertable_schema, cs.hypertable_name
    """
    return client.execute_query(sql, params or None)

345 

346 

347# --------------------------------------------------------------------------- 

348# Mutation operations 

349# --------------------------------------------------------------------------- 

350 

351 

def alter_compression_settings(
    client: PgClient,
    fqn: str,
    *,
    segmentby: str | None = None,
    orderby: str | None = None,
) -> None:
    """Update compress_segmentby / compress_orderby on a hypertable.

    No-op when neither option is given.

    NOTE(review): fqn and the column lists are interpolated directly into the
    statement — presumably they come from a trusted operator/CLI; confirm upstream.
    """
    settings: list[str] = []
    if segmentby:
        settings.append(f"timescaledb.compress_segmentby = '{segmentby}'")
    if orderby:
        settings.append(f"timescaledb.compress_orderby = '{orderby}'")
    if not settings:
        return
    client.execute_query(f"ALTER TABLE {fqn} SET ({', '.join(settings)})")

366 

367 

def set_compression_enabled(client: PgClient, fqn: str, *, enabled: bool) -> None:
    """Enable or disable native compression on the hypertable identified by fqn."""
    option = "timescaledb.compress" if enabled else "timescaledb.compress = false"
    client.execute_query(f"ALTER TABLE {fqn} SET ({option})")

373 

374 

def add_compression_policy(
    client: PgClient,
    schema: str,
    table: str,
    compress_after: str,
    *,
    schedule: str | None = None,
) -> int | None:
    """Create a compression policy on schema.table.

    Args:
        compress_after: Interval string; chunks older than this are compressed.
        schedule: Optional interval string for the policy's run cadence.

    Returns:
        The job_id of the created policy, or None if nothing was created
        (the call uses if_not_exists => true).
    """
    params: dict[str, Any] = {
        "schema": schema,
        "table": table,
        "after": compress_after,
    }
    schedule_arg = ""
    if schedule:
        params["schedule"] = schedule
        schedule_arg = "schedule_interval => %(schedule)s::interval,"

    sql = f"""
    SELECT add_compression_policy(
        (quote_ident(%(schema)s) || '.' || quote_ident(%(table)s))::regclass,
        compress_after => %(after)s::interval,
        {schedule_arg}
        if_not_exists => true
    )
    """
    result = client.execute_query(sql, params)
    return result.rows[0][0] if result.rows else None

411 

412 

def remove_compression_policy(client: PgClient, schema: str, table: str) -> None:
    """Drop the compression policy attached to schema.table."""
    statement = """
    SELECT remove_compression_policy(
        (quote_ident(%(schema)s) || '.' || quote_ident(%(table)s))::regclass
    )
    """
    client.execute_query(statement, {"schema": schema, "table": table})

420 

421 

def set_chunk_time_interval(
    client: PgClient, schema: str, table: str, interval: str
) -> None:
    """Change the chunk time interval of schema.table to the given interval string."""
    statement = """
    SELECT set_chunk_time_interval(
        (quote_ident(%(schema)s) || '.' || quote_ident(%(table)s))::regclass,
        %(interval)s::interval
    )
    """
    bind = {"schema": schema, "table": table, "interval": interval}
    client.execute_query(statement, bind)

432 

433 

def count_compressed_chunks(client: PgClient, schema: str, table: str) -> int:
    """Return how many chunks of schema.table are currently compressed."""
    query = """
    SELECT COUNT(*) FROM timescaledb_information.chunks
    WHERE hypertable_schema = %(schema)s
      AND hypertable_name = %(table)s
      AND is_compressed = true
    """
    rows = client.execute_query(query, {"schema": schema, "table": table}).rows
    return rows[0][0] if rows else 0

443 

444 

def list_compressed_chunk_names(client: PgClient, schema: str, table: str) -> list[str]:
    """Return the names of compressed chunks of schema.table, oldest first."""
    query = """
    SELECT chunk_name FROM timescaledb_information.chunks
    WHERE hypertable_schema = %(schema)s
      AND hypertable_name = %(table)s
      AND is_compressed = true
    ORDER BY range_start
    """
    result = client.execute_query(query, {"schema": schema, "table": table})
    # Each row has exactly one column: the chunk name.
    return [name for (name,) in result.rows]

455 

456 

def recompress_chunk(client: PgClient, chunk_name: str) -> None:
    """Rewrite one chunk by decompressing and then compressing it again."""
    target = "_timescaledb_internal." + chunk_name
    # Order matters: the chunk must be decompressed before it can be recompressed.
    for routine in ("decompress_chunk", "compress_chunk"):
        client.execute_query(f"SELECT {routine}('{target}'::regclass)")

461 

462 

def list_chunk_info(
    client: PgClient, schema: str, table: str
) -> list[tuple[str, bool]]:
    """Return (chunk_name, is_compressed) pairs for schema.table, oldest first."""
    query = """
    SELECT chunk_name, is_compressed
    FROM timescaledb_information.chunks
    WHERE hypertable_schema = %(schema)s
      AND hypertable_name = %(table)s
    ORDER BY range_start
    """
    result = client.execute_query(query, {"schema": schema, "table": table})
    return [(name, compressed) for name, compressed in result.rows]

475 

476 

def compress_single_chunk(client: PgClient, chunk_name: str) -> None:
    """Compress one chunk given its bare name in the _timescaledb_internal schema."""
    statement = "SELECT compress_chunk('_timescaledb_internal.{}'::regclass)".format(chunk_name)
    client.execute_query(statement)

480 

481 

def parse_chunk_id(chunk_name: str) -> int | None:
    """Extract numeric chunk ID from chunk name like '_hyper_16_11420_chunk'.

    Returns None when the name has too few underscore-separated parts or the
    expected ID segment is not an integer.
    """
    pieces = chunk_name.split("_")
    if len(pieces) < 4:
        return None
    try:
        # Name layout is ..._<hypertable_id>_<chunk_id>_chunk, so the chunk
        # id is the second-to-last segment.
        return int(pieces[-2])
    except ValueError:
        return None