Coverage for src / sql_tool / cli / commands / ts.py: 70%

260 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-14 15:28 -0500

1"""TimescaleDB administration commands. 

2 

3Thin CLI layer: typer decorators, argument parsing, output formatting. 

4Business logic delegated to core.timescaledb module. 

5""" 

6 

7from __future__ import annotations 

8 

9from typing import Annotated, Any 

10 

11import typer 

12 

13from sql_tool.cli.commands._shared import ( 

14 apply_local_format_options, 

15 get_client, 

16 is_table_format, 

17 output_result, 

18 parse_table_arg, 

19) 

20from sql_tool.cli.helpers import ( 

21 format_relative_time, 

22 format_size_compact, 

23 format_size_gb, 

24 format_timestamp, 

25 normalize_pg_interval, 

26) 

27from sql_tool.cli.output import OutputFormat 

28from sql_tool.core.models import ColumnMeta, QueryResult 

29from sql_tool.core.timescaledb import ( 

30 add_compression_policy, 

31 alter_compression_settings, 

32 check_timescaledb_available, 

33 compress_single_chunk, 

34 compression_stats, 

35 count_compressed_chunks, 

36 list_chunk_info, 

37 list_chunks, 

38 list_compressed_chunk_names, 

39 list_compression_settings, 

40 list_continuous_aggregates, 

41 list_hypertables, 

42 list_job_history, 

43 list_jobs, 

44 list_refresh_status, 

45 list_retention_policies, 

46 parse_chunk_id, 

47 recompress_chunk, 

48 remove_compression_policy, 

49 set_chunk_time_interval, 

50 set_compression_enabled, 

51) 

52 

53ts_app = typer.Typer(help="TimescaleDB administration commands") 

54 

55 

@ts_app.callback(invoke_without_command=True)
def ts_callback(
    ctx: typer.Context,
    database: Annotated[
        str | None,
        typer.Option("--database", "-d", help="Database name"),
    ] = None,
) -> None:
    # Group-level callback for the `ts` command group.
    # An explicit --database value is stored on the shared context dict so
    # that subcommands can read it from ctx.
    if database is not None:
        shared = ctx.ensure_object(dict)
        shared["database"] = database

    # A subcommand was requested: nothing more to do at group level.
    if ctx.invoked_subcommand:
        return

    # Bare invocation of the group: print the help text and exit.
    typer.echo(ctx.get_help())
    raise typer.Exit()

69 

70 

@ts_app.command("hypertables")
def hypertables_command(
    ctx: typer.Context,
    schema_filter: Annotated[
        str | None,
        typer.Option("--schema", "-s", help="Filter by schema"),
    ] = None,
    format: Annotated[
        OutputFormat | None,
        typer.Option("--format", "-f", help="Output format: table|json|csv"),
    ] = None,
    table: Annotated[
        bool,
        typer.Option("--table", help="Shorthand for --format table"),
    ] = False,
    compact: Annotated[
        bool,
        typer.Option("--compact", help="Compact JSON output (no indentation)"),
    ] = False,
    width: Annotated[
        int | None,
        typer.Option("--width", help="Column width for table format"),
    ] = None,
    no_header: Annotated[
        bool,
        typer.Option("--no-header", help="Suppress header row in CSV output"),
    ] = False,
) -> None:
    """List hypertables with size, chunk counts (compressed/uncompressed), and compression info."""
    apply_local_format_options(
        ctx, format=format, table=table, compact=compact, width=width, no_header=no_header
    )

    # The per-command --schema wins over a schema stored on the context.
    effective_schema = schema_filter or ctx.ensure_object(dict).get("schema")

    with get_client(ctx) as client:
        check_timescaledb_available(client)
        raw = list_hypertables(client, schema_filter=effective_schema)

    # Table output gets compact human-readable sizes; other formats get the
    # byte count rendered as a string (missing values become "0").
    if is_table_format(ctx):
        render_size = format_size_compact
    else:

        def render_size(num_bytes: Any) -> str:
            return str(num_bytes or 0)

    def size_or_dash(num_bytes: Any) -> Any:
        # Zero / missing sizes are shown as "-" in the optional columns.
        return render_size(num_bytes) if num_bytes else "-"

    def text_col(name: str) -> ColumnMeta:
        return ColumnMeta(name=name, type_oid=25, type_name="text")

    def int8_col(name: str) -> ColumnMeta:
        return ColumnMeta(name=name, type_oid=20, type_name="int8")

    sum_size = 0
    sum_uncompr_chunks = 0
    sum_compr_chunks = 0
    sum_before = 0
    sum_after = 0

    rows: list[tuple[Any, ...]] = []
    for record in raw.rows:
        (
            schema,
            tbl_name,
            col,
            interval,
            size,
            uncompr_c,
            compr_c,
            before_bytes,
            after_bytes,
            compr_enabled,
        ) = record

        size_val = size or 0
        sum_size += size_val
        sum_uncompr_chunks += uncompr_c or 0
        sum_compr_chunks += compr_c or 0
        sum_before += before_bytes or 0
        sum_after += after_bytes or 0

        # Uncompressed portion = total size minus the compressed ("after") size,
        # when a compressed size is reported at all.
        uncompr_size = size_val - after_bytes if after_bytes else size_val

        rows.append(
            (
                schema,
                tbl_name,
                col,
                normalize_pg_interval(interval),
                render_size(size),
                uncompr_c,
                compr_c,
                size_or_dash(uncompr_size),
                size_or_dash(before_bytes),
                size_or_dash(after_bytes),
                compr_enabled,
            )
        )

    # Trailing summary row across all listed hypertables.
    rows.append(
        (
            "TOTAL",
            "",
            "",
            "",
            render_size(sum_size),
            sum_uncompr_chunks,
            sum_compr_chunks,
            size_or_dash(sum_size - sum_after),
            size_or_dash(sum_before),
            size_or_dash(sum_after),
            "",
        )
    )

    columns = [
        text_col("schema"),
        text_col("table"),
        text_col("time_col"),
        text_col("chunk_iv"),
        text_col("size"),
        int8_col("uncompr"),
        int8_col("compr"),
        text_col("uncompr_size"),
        text_col("before_size"),
        text_col("after_size"),
        ColumnMeta(name="compr_on", type_oid=16, type_name="bool"),
    ]
    output_result(
        ctx,
        QueryResult(
            columns=columns,
            rows=rows,
            row_count=len(rows),
            status_message=raw.status_message,
        ),
    )

195 

196 

@ts_app.command("chunks")
def chunks_command(
    ctx: typer.Context,
    hypertable: Annotated[
        str, typer.Argument(help="Hypertable name (schema.table or table)")
    ],
    format: Annotated[
        OutputFormat | None,
        typer.Option("--format", "-f", help="Output format: table|json|csv"),
    ] = None,
    table: Annotated[
        bool,
        typer.Option("--table", help="Shorthand for --format table"),
    ] = False,
    compact: Annotated[
        bool,
        typer.Option("--compact", help="Compact JSON output (no indentation)"),
    ] = False,
    width: Annotated[
        int | None,
        typer.Option("--width", help="Column width for table format"),
    ] = None,
    no_header: Annotated[
        bool,
        typer.Option("--no-header", help="Suppress header row in CSV output"),
    ] = False,
) -> None:
    """Show chunks for a hypertable with size and compression status.

    Supports schema-qualified names like 'public.metrics' or just 'metrics'."""
    # --format is typed as OutputFormat (like every other command in this
    # module) so an invalid value is rejected by the CLI parser instead of
    # surfacing as an unhandled ValueError from OutputFormat(...).
    apply_local_format_options(
        ctx,
        format=format,
        table=table,
        compact=compact,
        width=width,
        no_header=no_header,
    )

    schema_name, table_name = parse_table_arg(hypertable)

    with get_client(ctx) as client:
        check_timescaledb_available(client)
        raw = list_chunks(client, schema_name, table_name)

    # Table output gets compact human-readable sizes; other formats get the
    # byte count rendered as a string (missing values become "0").
    is_tbl = is_table_format(ctx)
    fmt_size = format_size_compact if is_tbl else (lambda b: str(b or 0))

    total_bytes = 0
    uncompr_count = 0
    compr_count = 0
    uncompr_bytes = 0
    compr_bytes = 0

    rows: list[tuple[Any, ...]] = []
    for chunk_name, range_start, range_end, is_compressed, size_bytes in raw.rows:
        chunk_bytes = size_bytes or 0
        total_bytes += chunk_bytes
        if is_compressed:
            compr_count += 1
            compr_bytes += chunk_bytes
        else:
            uncompr_count += 1
            uncompr_bytes += chunk_bytes
        rows.append(
            (chunk_name, range_start, range_end, is_compressed, fmt_size(size_bytes))
        )

    # Trailing summary row with the combined size of all chunks.
    rows.append(("TOTAL", "", "", "", fmt_size(total_bytes)))

    result = QueryResult(
        columns=[
            ColumnMeta(name="chunk_name", type_oid=25, type_name="text"),
            ColumnMeta(name="range_start", type_oid=25, type_name="text"),
            ColumnMeta(name="range_end", type_oid=25, type_name="text"),
            ColumnMeta(name="is_compressed", type_oid=16, type_name="bool"),
            ColumnMeta(name="size", type_oid=25, type_name="text"),
        ],
        rows=rows,
        row_count=len(rows),
        status_message=raw.status_message,
    )
    output_result(ctx, result)

    # In table mode, add a compressed/uncompressed breakdown on stderr so it
    # does not mix into the table written by output_result.
    if is_tbl:
        typer.echo(
            f"\nUncompressed: {uncompr_count} chunks, {format_size_compact(uncompr_bytes)}\n"
            f"Compressed: {compr_count} chunks, {format_size_compact(compr_bytes)}\n"
            f"Total: {uncompr_count + compr_count} chunks, {format_size_compact(total_bytes)}",
            err=True,
        )

287 

288 

@ts_app.command("compression")
def compression_command(
    ctx: typer.Context,
    hypertable: Annotated[
        str | None,
        typer.Argument(help="Hypertable name (schema.table or table). Omit for all."),
    ] = None,
    format: Annotated[
        OutputFormat | None,
        typer.Option("--format", "-f", help="Output format: table|json|csv"),
    ] = None,
    table: Annotated[
        bool,
        typer.Option("--table", help="Shorthand for --format table"),
    ] = False,
    compact: Annotated[
        bool,
        typer.Option("--compact", help="Compact JSON output (no indentation)"),
    ] = False,
    width: Annotated[
        int | None,
        typer.Option("--width", help="Column width for table format"),
    ] = None,
    no_header: Annotated[
        bool,
        typer.Option("--no-header", help="Suppress header row in CSV output"),
    ] = False,
) -> None:
    """Show compression statistics per hypertable with compressed/uncompressed sizes."""
    apply_local_format_options(
        ctx, format=format, table=table, compact=compact, width=width, no_header=no_header
    )

    # No hypertable argument means "all hypertables": both filters stay None.
    if hypertable:
        schema_name, table_name = parse_table_arg(hypertable)
    else:
        schema_name = None
        table_name = None

    with get_client(ctx) as client:
        check_timescaledb_available(client)
        raw = compression_stats(client, schema=schema_name, table=table_name)

    # Compact sizes for table output, the GB formatter for everything else.
    if is_table_format(ctx):
        render_size = format_size_compact
    else:
        render_size = format_size_gb

    rows = []
    for schema, tbl, total, compr, before, after in raw.rows:
        rows.append((schema, tbl, total, compr, render_size(before), render_size(after)))

    columns = [
        ColumnMeta(name="schema", type_oid=25, type_name="text"),
        ColumnMeta(name="table", type_oid=25, type_name="text"),
        ColumnMeta(name="chunks", type_oid=20, type_name="int8"),
        ColumnMeta(name="compr", type_oid=20, type_name="int8"),
        ColumnMeta(name="before", type_oid=25, type_name="text"),
        ColumnMeta(name="after", type_oid=25, type_name="text"),
    ]
    output_result(
        ctx,
        QueryResult(
            columns=columns,
            rows=rows,
            row_count=len(rows),
            status_message=raw.status_message,
        ),
    )

355 

356 

@ts_app.command("caggs")
def caggs_command(
    ctx: typer.Context,
    format: Annotated[
        OutputFormat | None,
        typer.Option("--format", "-f", help="Output format: table|json|csv"),
    ] = None,
    table: Annotated[
        bool,
        typer.Option("--table", help="Shorthand for --format table"),
    ] = False,
    compact: Annotated[
        bool,
        typer.Option("--compact", help="Compact JSON output (no indentation)"),
    ] = False,
    width: Annotated[
        int | None,
        typer.Option("--width", help="Column width for table format"),
    ] = None,
    no_header: Annotated[
        bool,
        typer.Option("--no-header", help="Suppress header row in CSV output"),
    ] = False,
) -> None:
    """List continuous aggregates with source hypertable and materialization details."""
    # Apply this command's own formatting flags to the context.
    local_options = {
        "format": format,
        "table": table,
        "compact": compact,
        "width": width,
        "no_header": no_header,
    }
    apply_local_format_options(ctx, **local_options)

    # The core layer already returns a ready-to-print result; no reshaping here.
    with get_client(ctx) as client:
        check_timescaledb_available(client)
        aggregates = list_continuous_aggregates(client)

    output_result(ctx, aggregates)

396 

397 

@ts_app.command("retention")
def retention_command(
    ctx: typer.Context,
    format: Annotated[
        OutputFormat | None,
        typer.Option("--format", "-f", help="Output format: table|json|csv"),
    ] = None,
    table: Annotated[
        bool,
        typer.Option("--table", help="Shorthand for --format table"),
    ] = False,
    compact: Annotated[
        bool,
        typer.Option("--compact", help="Compact JSON output (no indentation)"),
    ] = False,
    width: Annotated[
        int | None,
        typer.Option("--width", help="Column width for table format"),
    ] = None,
    no_header: Annotated[
        bool,
        typer.Option("--no-header", help="Suppress header row in CSV output"),
    ] = False,
) -> None:
    """List retention policies with drop interval, schedule, and execution times."""
    # Apply this command's own formatting flags to the context.
    local_options = {
        "format": format,
        "table": table,
        "compact": compact,
        "width": width,
        "no_header": no_header,
    }
    apply_local_format_options(ctx, **local_options)

    # The core layer already returns a ready-to-print result; no reshaping here.
    with get_client(ctx) as client:
        check_timescaledb_available(client)
        policies = list_retention_policies(client)

    output_result(ctx, policies)

437 

438 

@ts_app.command("refresh-status")
def refresh_status_command(
    ctx: typer.Context,
    format: Annotated[
        OutputFormat | None,
        typer.Option("--format", "-f", help="Output format: table|json|csv"),
    ] = None,
    table: Annotated[
        bool,
        typer.Option("--table", help="Shorthand for --format table"),
    ] = False,
    compact: Annotated[
        bool,
        typer.Option("--compact", help="Compact JSON output (no indentation)"),
    ] = False,
    width: Annotated[
        int | None,
        typer.Option("--width", help="Column width for table format"),
    ] = None,
    no_header: Annotated[
        bool,
        typer.Option("--no-header", help="Suppress header row in CSV output"),
    ] = False,
) -> None:
    """Refresh schedule, last run status, and execution statistics for each continuous aggregate."""
    # Apply this command's own formatting flags to the context.
    local_options = {
        "format": format,
        "table": table,
        "compact": compact,
        "width": width,
        "no_header": no_header,
    }
    apply_local_format_options(ctx, **local_options)

    # The core layer already returns a ready-to-print result; no reshaping here.
    with get_client(ctx) as client:
        check_timescaledb_available(client)
        status = list_refresh_status(client)

    output_result(ctx, status)

478 

479 

@ts_app.command("jobs")
def jobs_command(
    ctx: typer.Context,
    history: Annotated[
        bool,
        typer.Option("--history", help="Show job execution history (last 20 runs)"),
    ] = False,
    job_id: Annotated[
        int | None,
        typer.Option("--job", help="Filter by job ID (use with --history)"),
    ] = None,
    format: Annotated[
        OutputFormat | None,
        typer.Option("--format", "-f", help="Output format: table|json|csv"),
    ] = None,
    table: Annotated[
        bool,
        typer.Option("--table", help="Shorthand for --format table"),
    ] = False,
    compact: Annotated[
        bool,
        typer.Option("--compact", help="Compact JSON output (no indentation)"),
    ] = False,
    width: Annotated[
        int | None,
        typer.Option("--width", help="Column width for table format"),
    ] = None,
    no_header: Annotated[
        bool,
        typer.Option("--no-header", help="Suppress header row in CSV output"),
    ] = False,
) -> None:
    """Show all TimescaleDB background jobs with schedule and execution stats."""
    apply_local_format_options(
        ctx, format=format, table=table, compact=compact, width=width, no_header=no_header
    )

    # --history switches to the execution-history view and nothing else runs.
    if history:
        _jobs_history_output(ctx, job_id)
        return

    with get_client(ctx) as client:
        check_timescaledb_available(client)
        raw = list_jobs(client)

    def next_start_label(secs: Any) -> str:
        # Missing/zero offset -> "-"; negative offset -> rendered as a past
        # relative time; positive offset -> "in <duration>".
        if not secs:
            return "-"
        if secs < 0:
            return format_relative_time(-secs)
        return f"in {format_relative_time(secs).replace(' ago', '')}"

    rows: list[tuple[Any, ...]] = []
    if is_table_format(ctx):
        # Table view: relative times derived from the *_secs columns.
        for (
            jid,
            app_name,
            hypertable_val,
            schedule,
            _last_run,
            last_run_secs,
            status,
            _next_start,
            next_start_secs,
            total_runs,
            total_successes,
            total_failures,
        ) in raw.rows:
            rows.append(
                (
                    jid,
                    app_name,
                    hypertable_val,
                    normalize_pg_interval(schedule),
                    format_relative_time(last_run_secs),
                    status,
                    next_start_label(next_start_secs),
                    total_runs,
                    total_successes,
                    total_failures,
                )
            )
    else:
        # Other formats: formatted timestamps from the timestamp columns.
        for (
            jid,
            app_name,
            hypertable_val,
            schedule,
            last_run,
            _last_run_secs,
            status,
            next_start,
            _next_start_secs,
            total_runs,
            total_successes,
            total_failures,
        ) in raw.rows:
            rows.append(
                (
                    jid,
                    app_name,
                    hypertable_val,
                    normalize_pg_interval(schedule),
                    format_timestamp(last_run),
                    status,
                    format_timestamp(next_start),
                    total_runs,
                    total_successes,
                    total_failures,
                )
            )

    columns = [
        ColumnMeta(name="job_id", type_oid=23, type_name="int4"),
        ColumnMeta(name="application_name", type_oid=25, type_name="text"),
        ColumnMeta(name="hypertable", type_oid=25, type_name="text"),
        ColumnMeta(name="schedule", type_oid=25, type_name="text"),
        ColumnMeta(name="last_run", type_oid=25, type_name="text"),
        ColumnMeta(name="last_run_status", type_oid=25, type_name="text"),
        ColumnMeta(name="next_start", type_oid=25, type_name="text"),
        ColumnMeta(name="total_runs", type_oid=20, type_name="int8"),
        ColumnMeta(name="total_successes", type_oid=20, type_name="int8"),
        ColumnMeta(name="total_failures", type_oid=20, type_name="int8"),
    ]
    output_result(
        ctx,
        QueryResult(
            columns=columns,
            rows=rows,
            row_count=len(rows),
            status_message=raw.status_message,
        ),
    )

588 

589 

def _jobs_history_output(ctx: typer.Context, job_id: int | None) -> None:
    """Print job execution history, optionally restricted to one job.

    Args:
        ctx: Typer context used to obtain the client and output settings.
        job_id: Job ID passed through to ``list_job_history``; ``None`` means
            no job filter is supplied.

    Raises:
        typer.Exit: With code 1 when the history query fails.
    """
    with get_client(ctx) as client:
        check_timescaledb_available(client)
        try:
            raw = list_job_history(client, job_id=job_id)
        except Exception as exc:
            # Any failure of the history query is reported as a missing
            # history table. The underlying error is printed as well so that
            # unrelated failures are not hidden behind that fixed message.
            typer.echo("No job history table found.", err=True)
            typer.echo(f"  ({type(exc).__name__}: {exc})", err=True)
            raise typer.Exit(1) from None

    rows = [
        (
            jid,
            succeeded,
            format_timestamp(exec_start),
            format_timestamp(exec_finish),
            # Truncate long error payloads to 200 characters. str() is a no-op
            # for strings and keeps the slice valid if the value arrives as a
            # non-string object — NOTE(review): confirm the column type
            # returned by list_job_history.
            str(error_data)[:200] if error_data else "-",
        )
        for jid, succeeded, exec_start, exec_finish, error_data in raw.rows
    ]

    result = QueryResult(
        columns=[
            ColumnMeta(name="job_id", type_oid=23, type_name="int4"),
            ColumnMeta(name="succeeded", type_oid=16, type_name="bool"),
            ColumnMeta(name="execution_start", type_oid=25, type_name="text"),
            ColumnMeta(name="execution_finish", type_oid=25, type_name="text"),
            ColumnMeta(name="error_data", type_oid=25, type_name="text"),
        ],
        rows=rows,
        row_count=len(rows),
        status_message=raw.status_message,
    )
    output_result(ctx, result)

623 

624 

@ts_app.command("compression-settings")
def compression_settings_command(
    ctx: typer.Context,
    hypertable: Annotated[
        str | None,
        typer.Argument(help="Hypertable name (schema.table or table). Omit for all."),
    ] = None,
    schema_filter: Annotated[
        str | None,
        typer.Option("--schema", "-s", help="Filter by schema"),
    ] = None,
    no_policy: Annotated[
        bool,
        typer.Option("--no-policy", help="Hide policy columns (show settings only)"),
    ] = False,
    format: Annotated[
        OutputFormat | None,
        typer.Option("--format", "-f", help="Output format: table|json|csv"),
    ] = None,
    table: Annotated[
        bool,
        typer.Option("--table", help="Shorthand for --format table"),
    ] = False,
    compact: Annotated[
        bool,
        typer.Option("--compact", help="Compact JSON output (no indentation)"),
    ] = False,
    width: Annotated[
        int | None,
        typer.Option("--width", help="Column width for table format"),
    ] = None,
    no_header: Annotated[
        bool,
        typer.Option("--no-header", help="Suppress header row in CSV output"),
    ] = False,
) -> None:
    """Show compression settings and policy info for hypertables."""
    apply_local_format_options(
        ctx, format=format, table=table, compact=compact, width=width, no_header=no_header
    )

    if hypertable:
        schema_name, table_name = parse_table_arg(hypertable)
    else:
        schema_name = None
        table_name = None

    # The per-command --schema wins over a schema stored on the context; the
    # schema filter is only forwarded when no explicit hypertable was given.
    eff_schema = schema_filter or ctx.ensure_object(dict).get("schema")
    forwarded_schema = None if hypertable else eff_schema

    with get_client(ctx) as client:
        check_timescaledb_available(client)
        raw = list_compression_settings(
            client,
            schema=schema_name,
            table=table_name,
            effective_schema=forwarded_schema,
            include_policy=not no_policy,
        )

    def or_dash(value: Any) -> Any:
        # Empty / missing values are displayed as "-".
        return value or "-"

    def interval_or_dash(value: Any) -> Any:
        return normalize_pg_interval(value) if value else "-"

    def text_col(name: str) -> ColumnMeta:
        return ColumnMeta(name=name, type_oid=25, type_name="text")

    # Columns shared by both views (settings only / settings plus policy).
    columns = [
        text_col(name)
        for name in ("schema", "table", "time_col", "chunk_iv", "seg_by", "order_by")
    ]

    rows: list[tuple[Any, ...]] = []
    if no_policy:
        for schema, tbl, time_col, chunk_iv, segment_by, order_by in raw.rows:
            rows.append(
                (
                    schema,
                    tbl,
                    or_dash(time_col),
                    interval_or_dash(chunk_iv),
                    or_dash(segment_by),
                    or_dash(order_by),
                )
            )
    else:
        for (
            schema,
            tbl,
            time_col,
            chunk_iv,
            segment_by,
            order_by,
            compress_after,
            sched,
            last_run,
            next_start,
            status,
        ) in raw.rows:
            rows.append(
                (
                    schema,
                    tbl,
                    or_dash(time_col),
                    interval_or_dash(chunk_iv),
                    or_dash(segment_by),
                    or_dash(order_by),
                    interval_or_dash(compress_after),
                    interval_or_dash(sched),
                    format_timestamp(last_run),
                    format_timestamp(next_start),
                    or_dash(status),
                )
            )
        columns.extend(
            text_col(name)
            for name in ("compress", "sched", "last_run", "next_start", "status")
        )

    output_result(
        ctx,
        QueryResult(
            columns=columns,
            rows=rows,
            row_count=len(rows),
            status_message=raw.status_message,
        ),
    )

746 

747 

@ts_app.command("compression-set")
def compression_set_command(
    ctx: typer.Context,
    hypertable: Annotated[
        str, typer.Argument(help="Hypertable name (schema.table or table)")
    ],
    segmentby: Annotated[
        str | None,
        typer.Option("--segmentby", help="Comma-separated segment_by columns"),
    ] = None,
    orderby: Annotated[
        str | None,
        typer.Option(
            "--orderby", help="Comma-separated order_by columns (e.g. 'timestamp DESC')"
        ),
    ] = None,
    enable: Annotated[
        bool,
        typer.Option("--enable", help="Enable compression on the hypertable"),
    ] = False,
    disable: Annotated[
        bool,
        typer.Option("--disable", help="Disable compression on the hypertable"),
    ] = False,
    policy: Annotated[
        str | None,
        typer.Option(
            "--policy", help="Set compression policy interval (e.g. '1 hour', '7 days')"
        ),
    ] = None,
    schedule: Annotated[
        str | None,
        typer.Option(
            "--schedule", help="Policy schedule interval (default: TimescaleDB default)"
        ),
    ] = None,
    remove_policy_flag: Annotated[
        bool,
        typer.Option("--remove-policy", help="Remove the compression policy"),
    ] = False,
    chunk_interval: Annotated[
        str | None,
        typer.Option(
            "--chunk-interval", help="Set chunk interval (e.g. '4 hours', '1 day')"
        ),
    ] = None,
) -> None:
    """Configure compression settings, policies, and chunk interval for a hypertable."""
    # --- Argument validation (all failures exit with code 1) ---------------
    if enable and disable:
        typer.echo("Error: --enable and --disable are mutually exclusive.", err=True)
        raise typer.Exit(1)

    if policy and remove_policy_flag:
        typer.echo(
            "Error: --policy and --remove-policy are mutually exclusive.", err=True
        )
        raise typer.Exit(1)

    # The schedule is only ever passed to add_compression_policy, so without
    # --policy it would be silently dropped. Reject it up front instead of
    # showing a plan that cannot be honoured.
    if schedule and not policy:
        typer.echo("Error: --schedule requires --policy.", err=True)
        raise typer.Exit(1)

    has_action = any(
        [
            segmentby,
            orderby,
            enable,
            disable,
            policy,
            remove_policy_flag,
            chunk_interval,
        ]
    )
    if not has_action:
        typer.echo(
            "Error: Provide at least one of --segmentby, --orderby, --enable, --disable, "
            "--policy, --remove-policy, --chunk-interval.",
            err=True,
        )
        raise typer.Exit(1)

    schema_name, table_name = parse_table_arg(hypertable)
    fqn = f"{schema_name}.{table_name}"

    # --- Show the planned changes on stderr and ask for confirmation -------
    typer.echo(f"Applying to {fqn}:", err=True)
    if segmentby:
        typer.echo(f" segment_by: {segmentby}", err=True)
    if orderby:
        typer.echo(f" order_by: {orderby}", err=True)
    if enable:
        typer.echo(" compression: enable", err=True)
    if disable:
        typer.echo(" compression: disable", err=True)
    if policy:
        typer.echo(f" policy: compress_after {policy}", err=True)
        if schedule:
            typer.echo(f" schedule: {schedule}", err=True)
    if remove_policy_flag:
        typer.echo(" policy: remove", err=True)
    if chunk_interval:
        typer.echo(f" chunk_interval: {chunk_interval}", err=True)
    if segmentby or orderby:
        typer.echo("Warning: Existing compressed chunks keep old settings.", err=True)
    if chunk_interval:
        typer.echo("Note: Chunk interval only affects new chunks.", err=True)

    if not typer.confirm("Proceed?"):
        raise typer.Abort()

    # --- Apply the requested changes in a fixed order ----------------------
    with get_client(ctx) as client:
        check_timescaledb_available(client)

        if segmentby or orderby:
            alter_compression_settings(
                client, fqn, segmentby=segmentby, orderby=orderby
            )

        if enable:
            set_compression_enabled(client, fqn, enabled=True)
        elif disable:
            set_compression_enabled(client, fqn, enabled=False)

        if policy:
            job_result = add_compression_policy(
                client,
                schema_name,
                table_name,
                policy,
                schedule=schedule,
            )
            # Report the job id only when the core layer returned one.
            if job_result:
                typer.echo(f" Policy set (job_id: {job_result})", err=True)

        if remove_policy_flag:
            remove_compression_policy(client, schema_name, table_name)

        if chunk_interval:
            set_chunk_time_interval(client, schema_name, table_name, chunk_interval)

    typer.echo(f"Done. Settings applied to {fqn}.", err=True)

884 

885 

@ts_app.command("recompress")
def recompress_command(
    ctx: typer.Context,
    hypertable: Annotated[
        str, typer.Argument(help="Hypertable name (schema.table or table)")
    ],
) -> None:
    """Decompress and recompress all chunks with current settings."""
    schema_name, table_name = parse_table_arg(hypertable)
    fqn = f"{schema_name}.{table_name}"

    # First connection: only used to size the job for the confirmation prompt.
    with get_client(ctx) as client:
        check_timescaledb_available(client)
        chunk_count = count_compressed_chunks(client, schema_name, table_name)

    if chunk_count == 0:
        typer.echo(f"No compressed chunks found for {fqn}.", err=True)
        raise typer.Exit()

    typer.echo(
        f"This will decompress and recompress {chunk_count} chunks in {fqn}.\n"
        "This is an expensive operation.",
        err=True,
    )
    if not typer.confirm("Proceed?"):
        raise typer.Abort()

    # Second connection, opened with timeout=600 for the actual work.
    with get_client(ctx, timeout=600) as client:
        chunk_names = list(list_compressed_chunk_names(client, schema_name, table_name))
        # The set of compressed chunks may have changed while waiting for the
        # confirmation, so progress and the summary are based on the list that
        # is actually processed rather than on the earlier count.
        total = len(chunk_names)

        for i, chunk_name in enumerate(chunk_names, 1):
            typer.echo(f"Recompressing chunk {i}/{total}: {chunk_name}...", err=True)
            recompress_chunk(client, chunk_name)

    typer.echo(f"Recompressed {total} chunks in {fqn}.", err=True)

923 

924 

@ts_app.command("compress")
def compress_command(
    ctx: typer.Context,
    hypertable: Annotated[
        str, typer.Argument(help="Hypertable name (schema.table or table)")
    ],
    chunk: Annotated[
        int | None,
        typer.Option(
            "--chunk",
            help="Compress specific chunk by ID (e.g. 11420 from _hyper_16_11420_chunk)",
        ),
    ] = None,
) -> None:
    """Compress chunks for a hypertable.

    Without --chunk, compresses all uncompressed chunks except the latest (active) one.
    With --chunk ID, compresses a specific chunk.
    """
    schema_name, table_name = parse_table_arg(hypertable)

    # A single chunk gets a timeout of 300, the bulk run 600.
    single_chunk = chunk is not None
    if single_chunk:
        timeout = 300
    else:
        timeout = 600

    with get_client(ctx, timeout=timeout) as client:
        check_timescaledb_available(client)
        chunks = list_chunk_info(client, schema_name, table_name)

        if not single_chunk:
            _compress_all_uncompressed(client, chunks)
        else:
            _compress_specific_chunk(client, chunks, chunk, schema_name, table_name)

956 

957 

def _compress_specific_chunk(
    client: Any,
    chunks: list[tuple[str, bool]],
    chunk_id: int,
    schema_name: str,
    table_name: str,
) -> None:
    """Compress the single chunk whose parsed ID equals ``chunk_id``.

    Args:
        client: Client handle passed through to ``compress_single_chunk``.
        chunks: ``(chunk_name, is_compressed)`` pairs for the hypertable.
        chunk_id: ID to look up among the IDs parsed from the chunk names.
        schema_name: Schema of the hypertable (used in the error message).
        table_name: Name of the hypertable (used in the error message).

    Raises:
        typer.Exit: Code 1 when no chunk has that ID; default exit code when
            the chunk is already compressed.
    """
    # Index chunks by parsed ID; names that do not yield an ID are skipped.
    by_id: dict[int, tuple[str, bool]] = {
        parsed: (name, compressed)
        for name, compressed in chunks
        if (parsed := parse_chunk_id(name)) is not None
    }

    match = by_id.get(chunk_id)
    if match is None:
        typer.echo(
            f"Error: Chunk ID {chunk_id} not found for {schema_name}.{table_name}.",
            err=True,
        )
        raise typer.Exit(1)

    chunk_name, already_compressed = match
    if already_compressed:
        typer.echo(f"Chunk {chunk_name} is already compressed.", err=True)
        raise typer.Exit()

    typer.echo(f"Compressing {chunk_name}...", err=True)
    compress_single_chunk(client, chunk_name)
    typer.echo(f"Compressed {chunk_name}.", err=True)

986 

987 

988def _compress_all_uncompressed( 

989 client: Any, 

990 chunks: list[tuple[str, bool]], 

991) -> None: 

992 if not chunks: 

993 typer.echo("No chunks found.", err=True) 

994 raise typer.Exit() 

995 

996 candidates = chunks[:-1] 

997 to_compress = [ 

998 (name, idx) 

999 for idx, (name, compressed) in enumerate(candidates) 

1000 if not compressed 

1001 ] 

1002 

1003 if not to_compress: 

1004 typer.echo( 

1005 "No uncompressed chunks to compress (excluding active chunk).", err=True 

1006 ) 

1007 raise typer.Exit() 

1008 

1009 typer.echo( 

1010 f"Compressing {len(to_compress)} uncompressed chunks " 

1011 f"(excluding latest active chunk)...", 

1012 err=True, 

1013 ) 

1014 

1015 for i, (chunk_name, _) in enumerate(to_compress, 1): 

1016 typer.echo( 

1017 f"Compressing chunk {i}/{len(to_compress)}: {chunk_name}...", err=True 

1018 ) 

1019 compress_single_chunk(client, chunk_name)