1"""TimescaleDB administration commands.
3Thin CLI layer: typer decorators, argument parsing, output formatting.
4Business logic delegated to core.timescaledb module.
5"""
7from __future__ import annotations
9from typing import Annotated, Any
11import typer
13from sql_tool.cli.commands._shared import (
14 apply_local_format_options,
15 get_client,
16 is_table_format,
17 output_result,
18 parse_table_arg,
19)
20from sql_tool.cli.helpers import (
21 format_relative_time,
22 format_size_compact,
23 format_size_gb,
24 format_timestamp,
25 normalize_pg_interval,
26)
27from sql_tool.cli.output import OutputFormat
28from sql_tool.core.models import ColumnMeta, QueryResult
29from sql_tool.core.timescaledb import (
30 add_compression_policy,
31 alter_compression_settings,
32 check_timescaledb_available,
33 compress_single_chunk,
34 compression_stats,
35 count_compressed_chunks,
36 list_chunk_info,
37 list_chunks,
38 list_compressed_chunk_names,
39 list_compression_settings,
40 list_continuous_aggregates,
41 list_hypertables,
42 list_job_history,
43 list_jobs,
44 list_refresh_status,
45 list_retention_policies,
46 parse_chunk_id,
47 recompress_chunk,
48 remove_compression_policy,
49 set_chunk_time_interval,
50 set_compression_enabled,
51)


ts_app = typer.Typer(help="TimescaleDB administration commands")


@ts_app.callback(invoke_without_command=True)
def ts_callback(
    ctx: typer.Context,
    database: Annotated[
        str | None,
        typer.Option("--database", "-d", help="Database name"),
    ] = None,
) -> None:
    if database is not None:
        ctx.ensure_object(dict)["database"] = database
    if not ctx.invoked_subcommand:
        typer.echo(ctx.get_help())
        raise typer.Exit()
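

# Usage sketch (the `sql-tool` console-script name is an assumption; this
# module only defines the `ts` sub-app):
#   sql-tool ts -d metrics_db hypertables
# The --database value is stashed in ctx.obj, where get_client() can
# presumably pick it up when a subcommand opens a connection.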


@ts_app.command("hypertables")
def hypertables_command(
    ctx: typer.Context,
    schema_filter: Annotated[
        str | None,
        typer.Option("--schema", "-s", help="Filter by schema"),
    ] = None,
    format: Annotated[
        OutputFormat | None,
        typer.Option("--format", "-f", help="Output format: table|json|csv"),
    ] = None,
    table: Annotated[
        bool,
        typer.Option("--table", help="Shorthand for --format table"),
    ] = False,
    compact: Annotated[
        bool,
        typer.Option("--compact", help="Compact JSON output (no indentation)"),
    ] = False,
    width: Annotated[
        int | None,
        typer.Option("--width", help="Column width for table format"),
    ] = None,
    no_header: Annotated[
        bool,
        typer.Option("--no-header", help="Suppress header row in CSV output"),
    ] = False,
) -> None:
    """List hypertables with size, chunk counts (compressed/uncompressed), and compression info."""
    apply_local_format_options(
        ctx,
        format=format,
        table=table,
        compact=compact,
        width=width,
        no_header=no_header,
    )

    effective_schema = schema_filter or ctx.ensure_object(dict).get("schema")

    with get_client(ctx) as client:
        check_timescaledb_available(client)
        raw = list_hypertables(client, schema_filter=effective_schema)

    is_tbl = is_table_format(ctx)
    fmt_size = format_size_compact if is_tbl else (lambda b: str(b or 0))

    total_size = 0
    total_uncompr = 0
    total_compr = 0
    total_before = 0
    total_after = 0

    rows: list[tuple[Any, ...]] = []
    for (
        schema,
        tbl_name,
        col,
        interval,
        size,
        uncompr_c,
        compr_c,
        before_bytes,
        after_bytes,
        compr_enabled,
    ) in raw.rows:
        total_size += size or 0
        total_uncompr += uncompr_c or 0
        total_compr += compr_c or 0
        total_before += before_bytes or 0
        total_after += after_bytes or 0
        uncompr_size = (size or 0) - (after_bytes or 0) if after_bytes else (size or 0)
        rows.append(
            (
                schema,
                tbl_name,
                col,
                normalize_pg_interval(interval),
                fmt_size(size),
                uncompr_c,
                compr_c,
                fmt_size(uncompr_size) if uncompr_size else "-",
                fmt_size(before_bytes) if before_bytes else "-",
                fmt_size(after_bytes) if after_bytes else "-",
                compr_enabled,
            )
        )

    total_uncompr_size = total_size - total_after
    rows.append(
        (
            "TOTAL",
            "",
            "",
            "",
            fmt_size(total_size),
            total_uncompr,
            total_compr,
            fmt_size(total_uncompr_size) if total_uncompr_size else "-",
            fmt_size(total_before) if total_before else "-",
            fmt_size(total_after) if total_after else "-",
            "",
        )
    )

    result = QueryResult(
        columns=[
            ColumnMeta(name="schema", type_oid=25, type_name="text"),
            ColumnMeta(name="table", type_oid=25, type_name="text"),
            ColumnMeta(name="time_col", type_oid=25, type_name="text"),
            ColumnMeta(name="chunk_iv", type_oid=25, type_name="text"),
            ColumnMeta(name="size", type_oid=25, type_name="text"),
            ColumnMeta(name="uncompr", type_oid=20, type_name="int8"),
            ColumnMeta(name="compr", type_oid=20, type_name="int8"),
            ColumnMeta(name="uncompr_size", type_oid=25, type_name="text"),
            ColumnMeta(name="before_size", type_oid=25, type_name="text"),
            ColumnMeta(name="after_size", type_oid=25, type_name="text"),
            ColumnMeta(name="compr_on", type_oid=16, type_name="bool"),
        ],
        rows=rows,
        row_count=len(rows),
        status_message=raw.status_message,
    )
    output_result(ctx, result)
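

# Usage sketch (hypothetical entry point):
#   sql-tool ts hypertables --schema public --table
# "uncompr_size" is derived rather than queried: table size minus
# post-compression bytes when any chunks are compressed, otherwise the full
# table size.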


@ts_app.command("chunks")
def chunks_command(
    ctx: typer.Context,
    hypertable: Annotated[
        str, typer.Argument(help="Hypertable name (schema.table or table)")
    ],
    format: Annotated[
        OutputFormat | None,
        typer.Option("--format", "-f", help="Output format: table|json|csv"),
    ] = None,
    table: Annotated[
        bool,
        typer.Option("--table", help="Shorthand for --format table"),
    ] = False,
    compact: Annotated[
        bool,
        typer.Option("--compact", help="Compact JSON output (no indentation)"),
    ] = False,
    width: Annotated[
        int | None,
        typer.Option("--width", help="Column width for table format"),
    ] = None,
    no_header: Annotated[
        bool,
        typer.Option("--no-header", help="Suppress header row in CSV output"),
    ] = False,
) -> None:
    """Show chunks for a hypertable with size and compression status.

    Supports schema-qualified names like 'public.metrics' or just 'metrics'."""
    apply_local_format_options(
        ctx,
        format=format,
        table=table,
        compact=compact,
        width=width,
        no_header=no_header,
    )

    schema_name, table_name = parse_table_arg(hypertable)

    with get_client(ctx) as client:
        check_timescaledb_available(client)
        raw = list_chunks(client, schema_name, table_name)

    is_tbl = is_table_format(ctx)
    fmt_size = format_size_compact if is_tbl else (lambda b: str(b or 0))

    total_bytes = 0
    uncompr_count = 0
    compr_count = 0
    uncompr_bytes = 0
    compr_bytes = 0

    rows: list[tuple[Any, ...]] = []
    for chunk_name, range_start, range_end, is_compressed, size_bytes in raw.rows:
        total_bytes += size_bytes or 0
        if is_compressed:
            compr_count += 1
            compr_bytes += size_bytes or 0
        else:
            uncompr_count += 1
            uncompr_bytes += size_bytes or 0
        rows.append(
            (chunk_name, range_start, range_end, is_compressed, fmt_size(size_bytes))
        )

    rows.append(("TOTAL", "", "", "", fmt_size(total_bytes)))

    result = QueryResult(
        columns=[
            ColumnMeta(name="chunk_name", type_oid=25, type_name="text"),
            ColumnMeta(name="range_start", type_oid=25, type_name="text"),
            ColumnMeta(name="range_end", type_oid=25, type_name="text"),
            ColumnMeta(name="is_compressed", type_oid=16, type_name="bool"),
            ColumnMeta(name="size", type_oid=25, type_name="text"),
        ],
        rows=rows,
        row_count=len(rows),
        status_message=raw.status_message,
    )
    output_result(ctx, result)

    if is_tbl:
        typer.echo(
            f"\nUncompressed: {uncompr_count} chunks, {format_size_compact(uncompr_bytes)}\n"
            f"Compressed: {compr_count} chunks, {format_size_compact(compr_bytes)}\n"
            f"Total: {uncompr_count + compr_count} chunks, {format_size_compact(total_bytes)}",
            err=True,
        )
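

# Usage sketch (hypothetical entry point):
#   sql-tool ts chunks public.metrics --table
# The summary block is written to stderr (err=True) so json/csv output on
# stdout stays machine-readable.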


@ts_app.command("compression")
def compression_command(
    ctx: typer.Context,
    hypertable: Annotated[
        str | None,
        typer.Argument(help="Hypertable name (schema.table or table). Omit for all."),
    ] = None,
    format: Annotated[
        OutputFormat | None,
        typer.Option("--format", "-f", help="Output format: table|json|csv"),
    ] = None,
    table: Annotated[
        bool,
        typer.Option("--table", help="Shorthand for --format table"),
    ] = False,
    compact: Annotated[
        bool,
        typer.Option("--compact", help="Compact JSON output (no indentation)"),
    ] = False,
    width: Annotated[
        int | None,
        typer.Option("--width", help="Column width for table format"),
    ] = None,
    no_header: Annotated[
        bool,
        typer.Option("--no-header", help="Suppress header row in CSV output"),
    ] = False,
) -> None:
    """Show compression statistics per hypertable with compressed/uncompressed sizes."""
    apply_local_format_options(
        ctx,
        format=format,
        table=table,
        compact=compact,
        width=width,
        no_header=no_header,
    )

    schema_name = table_name = None
    if hypertable:
        schema_name, table_name = parse_table_arg(hypertable)

    with get_client(ctx) as client:
        check_timescaledb_available(client)
        raw = compression_stats(client, schema=schema_name, table=table_name)

    fmt_size = format_size_compact if is_table_format(ctx) else format_size_gb
    rows = [
        (schema, tbl, total, compr, fmt_size(before), fmt_size(after))
        for schema, tbl, total, compr, before, after in raw.rows
    ]

    result = QueryResult(
        columns=[
            ColumnMeta(name="schema", type_oid=25, type_name="text"),
            ColumnMeta(name="table", type_oid=25, type_name="text"),
            ColumnMeta(name="chunks", type_oid=20, type_name="int8"),
            ColumnMeta(name="compr", type_oid=20, type_name="int8"),
            ColumnMeta(name="before", type_oid=25, type_name="text"),
            ColumnMeta(name="after", type_oid=25, type_name="text"),
        ],
        rows=rows,
        row_count=len(rows),
        status_message=raw.status_message,
    )
    output_result(ctx, result)
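

# Usage sketch (hypothetical entry point):
#   sql-tool ts compression public.metrics -f json
# Table output uses the compact human-readable sizes; json/csv use
# format_size_gb (GB-denominated, judging by the name) for stable numbers.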


@ts_app.command("caggs")
def caggs_command(
    ctx: typer.Context,
    format: Annotated[
        OutputFormat | None,
        typer.Option("--format", "-f", help="Output format: table|json|csv"),
    ] = None,
    table: Annotated[
        bool,
        typer.Option("--table", help="Shorthand for --format table"),
    ] = False,
    compact: Annotated[
        bool,
        typer.Option("--compact", help="Compact JSON output (no indentation)"),
    ] = False,
    width: Annotated[
        int | None,
        typer.Option("--width", help="Column width for table format"),
    ] = None,
    no_header: Annotated[
        bool,
        typer.Option("--no-header", help="Suppress header row in CSV output"),
    ] = False,
) -> None:
    """List continuous aggregates with source hypertable and materialization details."""
    apply_local_format_options(
        ctx,
        format=format,
        table=table,
        compact=compact,
        width=width,
        no_header=no_header,
    )

    with get_client(ctx) as client:
        check_timescaledb_available(client)
        result = list_continuous_aggregates(client)

    output_result(ctx, result)
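

# caggs, retention, and refresh-status share the thinnest pattern in this
# module: parse format flags, verify the extension, run one core query, and
# print the result unmodified. Usage sketch (hypothetical entry point):
#   sql-tool ts caggs --table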


@ts_app.command("retention")
def retention_command(
    ctx: typer.Context,
    format: Annotated[
        OutputFormat | None,
        typer.Option("--format", "-f", help="Output format: table|json|csv"),
    ] = None,
    table: Annotated[
        bool,
        typer.Option("--table", help="Shorthand for --format table"),
    ] = False,
    compact: Annotated[
        bool,
        typer.Option("--compact", help="Compact JSON output (no indentation)"),
    ] = False,
    width: Annotated[
        int | None,
        typer.Option("--width", help="Column width for table format"),
    ] = None,
    no_header: Annotated[
        bool,
        typer.Option("--no-header", help="Suppress header row in CSV output"),
    ] = False,
) -> None:
    """List retention policies with drop interval, schedule, and execution times."""
    apply_local_format_options(
        ctx,
        format=format,
        table=table,
        compact=compact,
        width=width,
        no_header=no_header,
    )

    with get_client(ctx) as client:
        check_timescaledb_available(client)
        result = list_retention_policies(client)

    output_result(ctx, result)


@ts_app.command("refresh-status")
def refresh_status_command(
    ctx: typer.Context,
    format: Annotated[
        OutputFormat | None,
        typer.Option("--format", "-f", help="Output format: table|json|csv"),
    ] = None,
    table: Annotated[
        bool,
        typer.Option("--table", help="Shorthand for --format table"),
    ] = False,
    compact: Annotated[
        bool,
        typer.Option("--compact", help="Compact JSON output (no indentation)"),
    ] = False,
    width: Annotated[
        int | None,
        typer.Option("--width", help="Column width for table format"),
    ] = None,
    no_header: Annotated[
        bool,
        typer.Option("--no-header", help="Suppress header row in CSV output"),
    ] = False,
) -> None:
    """Refresh schedule, last run status, and execution statistics for each continuous aggregate."""
    apply_local_format_options(
        ctx,
        format=format,
        table=table,
        compact=compact,
        width=width,
        no_header=no_header,
    )

    with get_client(ctx) as client:
        check_timescaledb_available(client)
        result = list_refresh_status(client)

    output_result(ctx, result)


@ts_app.command("jobs")
def jobs_command(
    ctx: typer.Context,
    history: Annotated[
        bool,
        typer.Option("--history", help="Show job execution history (last 20 runs)"),
    ] = False,
    job_id: Annotated[
        int | None,
        typer.Option("--job", help="Filter by job ID (use with --history)"),
    ] = None,
    format: Annotated[
        OutputFormat | None,
        typer.Option("--format", "-f", help="Output format: table|json|csv"),
    ] = None,
    table: Annotated[
        bool,
        typer.Option("--table", help="Shorthand for --format table"),
    ] = False,
    compact: Annotated[
        bool,
        typer.Option("--compact", help="Compact JSON output (no indentation)"),
    ] = False,
    width: Annotated[
        int | None,
        typer.Option("--width", help="Column width for table format"),
    ] = None,
    no_header: Annotated[
        bool,
        typer.Option("--no-header", help="Suppress header row in CSV output"),
    ] = False,
) -> None:
    """Show all TimescaleDB background jobs with schedule and execution stats."""
    apply_local_format_options(
        ctx,
        format=format,
        table=table,
        compact=compact,
        width=width,
        no_header=no_header,
    )
    if history:
        _jobs_history_output(ctx, job_id)
        return

    with get_client(ctx) as client:
        check_timescaledb_available(client)
        raw = list_jobs(client)

    is_tbl = is_table_format(ctx)

    if is_tbl:
        rows = [
            (
                jid,
                app_name,
                hypertable_val,
                normalize_pg_interval(schedule),
                format_relative_time(last_run_secs),
                status,
                format_relative_time(-next_start_secs)
                if next_start_secs and next_start_secs < 0
                else (
                    f"in {format_relative_time(next_start_secs).replace(' ago', '')}"
                    if next_start_secs
                    else "-"
                ),
                total_runs,
                total_successes,
                total_failures,
            )
            for (
                jid,
                app_name,
                hypertable_val,
                schedule,
                _last_run,
                last_run_secs,
                status,
                _next_start,
                next_start_secs,
                total_runs,
                total_successes,
                total_failures,
            ) in raw.rows
        ]
    else:
        rows = [
            (
                jid,
                app_name,
                hypertable_val,
                normalize_pg_interval(schedule),
                format_timestamp(last_run),
                status,
                format_timestamp(next_start),
                total_runs,
                total_successes,
                total_failures,
            )
            for (
                jid,
                app_name,
                hypertable_val,
                schedule,
                last_run,
                _last_run_secs,
                status,
                next_start,
                _next_start_secs,
                total_runs,
                total_successes,
                total_failures,
            ) in raw.rows
        ]

    result = QueryResult(
        columns=[
            ColumnMeta(name="job_id", type_oid=23, type_name="int4"),
            ColumnMeta(name="application_name", type_oid=25, type_name="text"),
            ColumnMeta(name="hypertable", type_oid=25, type_name="text"),
            ColumnMeta(name="schedule", type_oid=25, type_name="text"),
            ColumnMeta(name="last_run", type_oid=25, type_name="text"),
            ColumnMeta(name="last_run_status", type_oid=25, type_name="text"),
            ColumnMeta(name="next_start", type_oid=25, type_name="text"),
            ColumnMeta(name="total_runs", type_oid=20, type_name="int8"),
            ColumnMeta(name="total_successes", type_oid=20, type_name="int8"),
            ColumnMeta(name="total_failures", type_oid=20, type_name="int8"),
        ],
        rows=rows,
        row_count=len(rows),
        status_message=raw.status_message,
    )
    output_result(ctx, result)
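

# Usage sketches (hypothetical entry point):
#   sql-tool ts jobs --table               # all background jobs
#   sql-tool ts jobs --history --job 1002  # recent runs of one job
# In table mode next_start is rendered relative ("in 5m"); a negative
# next_start_secs (a start time already in the past) is shown via
# format_relative_time instead, so an overdue job reads as elapsed time.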


def _jobs_history_output(ctx: typer.Context, job_id: int | None) -> None:
    with get_client(ctx) as client:
        check_timescaledb_available(client)
        try:
            raw = list_job_history(client, job_id=job_id)
        except Exception:
            typer.echo("No job history table found.", err=True)
            raise typer.Exit(1) from None
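        # The broad `except Exception` appears deliberate: on servers whose
        # TimescaleDB version exposes no job-history table, the query fails
        # and the user gets a clean exit(1) message instead of a traceback.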

    rows = [
        (
            jid,
            succeeded,
            format_timestamp(exec_start),
            format_timestamp(exec_finish),
            error_data[:200] if error_data else "-",
        )
        for jid, succeeded, exec_start, exec_finish, error_data in raw.rows
    ]

    result = QueryResult(
        columns=[
            ColumnMeta(name="job_id", type_oid=23, type_name="int4"),
            ColumnMeta(name="succeeded", type_oid=16, type_name="bool"),
            ColumnMeta(name="execution_start", type_oid=25, type_name="text"),
            ColumnMeta(name="execution_finish", type_oid=25, type_name="text"),
            ColumnMeta(name="error_data", type_oid=25, type_name="text"),
        ],
        rows=rows,
        row_count=len(rows),
        status_message=raw.status_message,
    )
    output_result(ctx, result)


@ts_app.command("compression-settings")
def compression_settings_command(
    ctx: typer.Context,
    hypertable: Annotated[
        str | None,
        typer.Argument(help="Hypertable name (schema.table or table). Omit for all."),
    ] = None,
    schema_filter: Annotated[
        str | None,
        typer.Option("--schema", "-s", help="Filter by schema"),
    ] = None,
    no_policy: Annotated[
        bool,
        typer.Option("--no-policy", help="Hide policy columns (show settings only)"),
    ] = False,
    format: Annotated[
        OutputFormat | None,
        typer.Option("--format", "-f", help="Output format: table|json|csv"),
    ] = None,
    table: Annotated[
        bool,
        typer.Option("--table", help="Shorthand for --format table"),
    ] = False,
    compact: Annotated[
        bool,
        typer.Option("--compact", help="Compact JSON output (no indentation)"),
    ] = False,
    width: Annotated[
        int | None,
        typer.Option("--width", help="Column width for table format"),
    ] = None,
    no_header: Annotated[
        bool,
        typer.Option("--no-header", help="Suppress header row in CSV output"),
    ] = False,
) -> None:
    """Show compression settings and policy info for hypertables."""
    apply_local_format_options(
        ctx,
        format=format,
        table=table,
        compact=compact,
        width=width,
        no_header=no_header,
    )

    schema_name = table_name = None
    if hypertable:
        schema_name, table_name = parse_table_arg(hypertable)

    eff_schema = schema_filter or ctx.ensure_object(dict).get("schema")

    with get_client(ctx) as client:
        check_timescaledb_available(client)
        raw = list_compression_settings(
            client,
            schema=schema_name,
            table=table_name,
            effective_schema=eff_schema if not hypertable else None,
            include_policy=not no_policy,
        )

    rows: list[tuple[Any, ...]]
    if no_policy:
        rows = [
            (
                schema,
                tbl,
                time_col or "-",
                normalize_pg_interval(chunk_iv) if chunk_iv else "-",
                segment_by or "-",
                order_by or "-",
            )
            for schema, tbl, time_col, chunk_iv, segment_by, order_by in raw.rows
        ]
        columns = [
            ColumnMeta(name="schema", type_oid=25, type_name="text"),
            ColumnMeta(name="table", type_oid=25, type_name="text"),
            ColumnMeta(name="time_col", type_oid=25, type_name="text"),
            ColumnMeta(name="chunk_iv", type_oid=25, type_name="text"),
            ColumnMeta(name="seg_by", type_oid=25, type_name="text"),
            ColumnMeta(name="order_by", type_oid=25, type_name="text"),
        ]
    else:
        rows = [
            (
                schema,
                tbl,
                time_col or "-",
                normalize_pg_interval(chunk_iv) if chunk_iv else "-",
                segment_by or "-",
                order_by or "-",
                normalize_pg_interval(compress_after) if compress_after else "-",
                normalize_pg_interval(sched) if sched else "-",
                format_timestamp(last_run),
                format_timestamp(next_start),
                status or "-",
            )
            for (
                schema,
                tbl,
                time_col,
                chunk_iv,
                segment_by,
                order_by,
                compress_after,
                sched,
                last_run,
                next_start,
                status,
            ) in raw.rows
        ]
        columns = [
            ColumnMeta(name="schema", type_oid=25, type_name="text"),
            ColumnMeta(name="table", type_oid=25, type_name="text"),
            ColumnMeta(name="time_col", type_oid=25, type_name="text"),
            ColumnMeta(name="chunk_iv", type_oid=25, type_name="text"),
            ColumnMeta(name="seg_by", type_oid=25, type_name="text"),
            ColumnMeta(name="order_by", type_oid=25, type_name="text"),
            ColumnMeta(name="compress", type_oid=25, type_name="text"),
            ColumnMeta(name="sched", type_oid=25, type_name="text"),
            ColumnMeta(name="last_run", type_oid=25, type_name="text"),
            ColumnMeta(name="next_start", type_oid=25, type_name="text"),
            ColumnMeta(name="status", type_oid=25, type_name="text"),
        ]

    result = QueryResult(
        columns=columns,
        rows=rows,
        row_count=len(rows),
        status_message=raw.status_message,
    )
    output_result(ctx, result)
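

# Usage sketches (hypothetical entry point):
#   sql-tool ts compression-settings public.metrics
#   sql-tool ts compression-settings -s public --no-policy
# Precedence note: naming a hypertable wins over the schema filter; the
# effective_schema fallback is only passed when no table argument is given.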


@ts_app.command("compression-set")
def compression_set_command(
    ctx: typer.Context,
    hypertable: Annotated[
        str, typer.Argument(help="Hypertable name (schema.table or table)")
    ],
    segmentby: Annotated[
        str | None,
        typer.Option("--segmentby", help="Comma-separated segment_by columns"),
    ] = None,
    orderby: Annotated[
        str | None,
        typer.Option(
            "--orderby", help="Comma-separated order_by columns (e.g. 'timestamp DESC')"
        ),
    ] = None,
    enable: Annotated[
        bool,
        typer.Option("--enable", help="Enable compression on the hypertable"),
    ] = False,
    disable: Annotated[
        bool,
        typer.Option("--disable", help="Disable compression on the hypertable"),
    ] = False,
    policy: Annotated[
        str | None,
        typer.Option(
            "--policy", help="Set compression policy interval (e.g. '1 hour', '7 days')"
        ),
    ] = None,
    schedule: Annotated[
        str | None,
        typer.Option(
            "--schedule", help="Policy schedule interval (default: TimescaleDB default)"
        ),
    ] = None,
    remove_policy_flag: Annotated[
        bool,
        typer.Option("--remove-policy", help="Remove the compression policy"),
    ] = False,
    chunk_interval: Annotated[
        str | None,
        typer.Option(
            "--chunk-interval", help="Set chunk interval (e.g. '4 hours', '1 day')"
        ),
    ] = None,
) -> None:
    """Configure compression settings, policies, and chunk interval for a hypertable."""
    if enable and disable:
        typer.echo("Error: --enable and --disable are mutually exclusive.", err=True)
        raise typer.Exit(1)

    if policy and remove_policy_flag:
        typer.echo(
            "Error: --policy and --remove-policy are mutually exclusive.", err=True
        )
        raise typer.Exit(1)

    has_action = any(
        [
            segmentby,
            orderby,
            enable,
            disable,
            policy,
            remove_policy_flag,
            chunk_interval,
        ]
    )
    if not has_action:
        typer.echo(
            "Error: Provide at least one of --segmentby, --orderby, --enable, --disable, "
            "--policy, --remove-policy, --chunk-interval.",
            err=True,
        )
        raise typer.Exit(1)

    schema_name, table_name = parse_table_arg(hypertable)
    fqn = f"{schema_name}.{table_name}"

    typer.echo(f"Applying to {fqn}:", err=True)
    if segmentby:
        typer.echo(f"  segment_by: {segmentby}", err=True)
    if orderby:
        typer.echo(f"  order_by: {orderby}", err=True)
    if enable:
        typer.echo("  compression: enable", err=True)
    if disable:
        typer.echo("  compression: disable", err=True)
    if policy:
        typer.echo(f"  policy: compress_after {policy}", err=True)
    if schedule:
        typer.echo(f"  schedule: {schedule}", err=True)
    if remove_policy_flag:
        typer.echo("  policy: remove", err=True)
    if chunk_interval:
        typer.echo(f"  chunk_interval: {chunk_interval}", err=True)
    if segmentby or orderby:
        typer.echo("Warning: Existing compressed chunks keep old settings.", err=True)
    if chunk_interval:
        typer.echo("Note: Chunk interval only affects new chunks.", err=True)

    if not typer.confirm("Proceed?"):
        raise typer.Abort()

    with get_client(ctx) as client:
        check_timescaledb_available(client)

        if segmentby or orderby:
            alter_compression_settings(
                client, fqn, segmentby=segmentby, orderby=orderby
            )

        if enable:
            set_compression_enabled(client, fqn, enabled=True)
        elif disable:
            set_compression_enabled(client, fqn, enabled=False)

        if policy:
            job_result = add_compression_policy(
                client,
                schema_name,
                table_name,
                policy,
                schedule=schedule,
            )
            if job_result:
                typer.echo(f"  Policy set (job_id: {job_result})", err=True)

        if remove_policy_flag:
            remove_compression_policy(client, schema_name, table_name)

        if chunk_interval:
            set_chunk_time_interval(client, schema_name, table_name, chunk_interval)

    typer.echo(f"Done. Settings applied to {fqn}.", err=True)
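

# Usage sketches (hypothetical entry point):
#   sql-tool ts compression-set public.metrics --segmentby device_id --enable
#   sql-tool ts compression-set public.metrics --policy '7 days' --schedule '1 hour'
#   sql-tool ts compression-set public.metrics --remove-policy
# Every requested action is echoed to stderr for review and gated behind a
# typer.confirm prompt before any DDL runs.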


@ts_app.command("recompress")
def recompress_command(
    ctx: typer.Context,
    hypertable: Annotated[
        str, typer.Argument(help="Hypertable name (schema.table or table)")
    ],
) -> None:
    """Decompress and recompress all chunks with current settings."""
    schema_name, table_name = parse_table_arg(hypertable)
    fqn = f"{schema_name}.{table_name}"

    with get_client(ctx) as client:
        check_timescaledb_available(client)
        chunk_count = count_compressed_chunks(client, schema_name, table_name)

    if chunk_count == 0:
        typer.echo(f"No compressed chunks found for {fqn}.", err=True)
        raise typer.Exit()

    typer.echo(
        f"This will decompress and recompress {chunk_count} chunks in {fqn}.\n"
        "This is an expensive operation.",
        err=True,
    )
    if not typer.confirm("Proceed?"):
        raise typer.Abort()

    with get_client(ctx, timeout=600) as client:
        chunk_names = list_compressed_chunk_names(client, schema_name, table_name)

        for i, chunk_name in enumerate(chunk_names, 1):
            typer.echo(
                f"Recompressing chunk {i}/{chunk_count}: {chunk_name}...", err=True
            )
            recompress_chunk(client, chunk_name)

    typer.echo(f"Recompressed {chunk_count} chunks in {fqn}.", err=True)
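

# Usage sketch (hypothetical entry point):
#   sql-tool ts recompress public.metrics
# The actual work runs on a second client opened with timeout=600, since
# decompress-and-recompress cycles can easily outlast a default timeout.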


@ts_app.command("compress")
def compress_command(
    ctx: typer.Context,
    hypertable: Annotated[
        str, typer.Argument(help="Hypertable name (schema.table or table)")
    ],
    chunk: Annotated[
        int | None,
        typer.Option(
            "--chunk",
            help="Compress specific chunk by ID (e.g. 11420 from _hyper_16_11420_chunk)",
        ),
    ] = None,
) -> None:
    """Compress chunks for a hypertable.

    Without --chunk, compresses all uncompressed chunks except the latest (active) one.
    With --chunk ID, compresses a specific chunk.
    """
    schema_name, table_name = parse_table_arg(hypertable)

    timeout = 300 if chunk is not None else 600

    with get_client(ctx, timeout=timeout) as client:
        check_timescaledb_available(client)
        chunks = list_chunk_info(client, schema_name, table_name)

        if chunk is not None:
            _compress_specific_chunk(client, chunks, chunk, schema_name, table_name)
        else:
            _compress_all_uncompressed(client, chunks)
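

# Usage sketches (hypothetical entry point):
#   sql-tool ts compress public.metrics                # all but the newest chunk
#   sql-tool ts compress public.metrics --chunk 11420  # one chunk, by numeric ID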


def _compress_specific_chunk(
    client: Any,
    chunks: list[tuple[str, bool]],
    chunk_id: int,
    schema_name: str,
    table_name: str,
) -> None:
    chunk_map: dict[int, tuple[str, bool]] = {}
    for name, compressed in chunks:
        cid = parse_chunk_id(name)
        if cid is not None:
            chunk_map[cid] = (name, compressed)

    if chunk_id not in chunk_map:
        typer.echo(
            f"Error: Chunk ID {chunk_id} not found for {schema_name}.{table_name}.",
            err=True,
        )
        raise typer.Exit(1)

    chunk_name, is_compressed = chunk_map[chunk_id]
    if is_compressed:
        typer.echo(f"Chunk {chunk_name} is already compressed.", err=True)
        raise typer.Exit()

    typer.echo(f"Compressing {chunk_name}...", err=True)
    compress_single_chunk(client, chunk_name)
    typer.echo(f"Compressed {chunk_name}.", err=True)


def _compress_all_uncompressed(
    client: Any,
    chunks: list[tuple[str, bool]],
) -> None:
    if not chunks:
        typer.echo("No chunks found.", err=True)
        raise typer.Exit()
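
    # This assumes list_chunk_info returns chunks in time order, so the last
    # entry is the newest (active) chunk, which is deliberately skipped.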
    candidates = chunks[:-1]
    to_compress = [name for name, compressed in candidates if not compressed]

    if not to_compress:
        typer.echo(
            "No uncompressed chunks to compress (excluding active chunk).", err=True
        )
        raise typer.Exit()

    typer.echo(
        f"Compressing {len(to_compress)} uncompressed chunks "
        f"(excluding latest active chunk)...",
        err=True,
    )

    for i, chunk_name in enumerate(to_compress, 1):
        typer.echo(
            f"Compressing chunk {i}/{len(to_compress)}: {chunk_name}...", err=True
        )
        compress_single_chunk(client, chunk_name)