Coverage for src / sql_tool / core / timescaledb.py: 74%
124 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-14 15:28 -0500
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-14 15:28 -0500
1"""TimescaleDB administration operations.
3Framework-agnostic business logic for TimescaleDB commands.
4CLI layer in cli/commands/ts.py provides the typer interface.
5"""
7from __future__ import annotations
9from typing import TYPE_CHECKING, Any
11from sql_tool.core.exceptions import SqlToolError
13if TYPE_CHECKING:
14 from sql_tool.core.client import PgClient
15 from sql_tool.core.models import QueryResult
def check_timescaledb_available(client: PgClient) -> None:
    """Verify that the ``timescaledb`` extension exists in the connected database.

    Issues one ``pg_extension`` lookup through ``client.execute_query``.

    Raises:
        SqlToolError: If the lookup returns no rows or a falsy first column.
    """
    probe = """
        SELECT EXISTS(
            SELECT 1 FROM pg_extension WHERE extname = 'timescaledb'
        ) AS timescaledb_installed
    """
    rows = client.execute_query(probe).rows
    installed = bool(rows) and bool(rows[0][0])
    if installed:
        return
    raise SqlToolError(
        "TimescaleDB extension is not installed in this database. "
        "Install it with: CREATE EXTENSION IF NOT EXISTS timescaledb;"
    )
33# ---------------------------------------------------------------------------
34# Read-only queries
35# ---------------------------------------------------------------------------
def list_hypertables(
    client: PgClient,
    *,
    schema_filter: str | None = None,
) -> QueryResult:
    """List hypertables with first dimension, size and chunk/compression figures.

    Args:
        client: Object whose ``execute_query(sql, params)`` runs the statement.
        schema_filter: When truthy, only hypertables in this schema are returned.

    Returns:
        The value returned by ``client.execute_query``; rows are ordered by
        hypertable schema and name.
    """
    bind: dict[str, Any] | None = {"schema": schema_filter} if schema_filter else None
    schema_predicate = "WHERE h.hypertable_schema = %(schema)s" if schema_filter else ""

    query = f"""
        SELECT
            h.hypertable_schema,
            h.hypertable_name,
            d.column_name,
            d.time_interval::text,
            hypertable_size((quote_ident(h.hypertable_schema) || '.' || quote_ident(h.hypertable_name))::regclass) AS size_bytes,
            (SELECT COUNT(*) FILTER (WHERE NOT c.is_compressed) FROM timescaledb_information.chunks c
             WHERE c.hypertable_schema = h.hypertable_schema AND c.hypertable_name = h.hypertable_name) AS uncompr_chunks,
            (SELECT COUNT(*) FILTER (WHERE c.is_compressed) FROM timescaledb_information.chunks c
             WHERE c.hypertable_schema = h.hypertable_schema AND c.hypertable_name = h.hypertable_name) AS compr_chunks,
            cs.before_compression_total_bytes,
            cs.after_compression_total_bytes,
            h.compression_enabled
        FROM timescaledb_information.hypertables h
        LEFT JOIN timescaledb_information.dimensions d
            ON h.hypertable_schema = d.hypertable_schema
            AND h.hypertable_name = d.hypertable_name
            AND d.dimension_number = 1
        LEFT JOIN LATERAL (
            SELECT
                SUM(before_compression_total_bytes)::bigint AS before_compression_total_bytes,
                SUM(after_compression_total_bytes)::bigint AS after_compression_total_bytes
            FROM hypertable_compression_stats(
                (quote_ident(h.hypertable_schema) || '.' || quote_ident(h.hypertable_name))::regclass
            )
        ) cs ON true
        {schema_predicate}
        ORDER BY h.hypertable_schema, h.hypertable_name
    """
    return client.execute_query(query, bind)
def list_chunks(client: PgClient, schema: str, table: str) -> QueryResult:
    """List the chunks of one hypertable with their time range, state and size.

    Args:
        client: Object whose ``execute_query(sql, params)`` runs the statement.
        schema: Hypertable schema, bound as a query parameter.
        table: Hypertable name, bound as a query parameter.

    Returns:
        The value returned by ``client.execute_query``; rows are ordered by
        ``range_start``.
    """
    # The chunk relation is resolved from the view's own chunk_schema column
    # instead of assuming '_timescaledb_internal', so hypertables whose chunks
    # live in a custom associated schema are sized correctly.
    sql = """
        SELECT
            c.chunk_name,
            c.range_start::text AS range_start,
            c.range_end::text AS range_end,
            c.is_compressed,
            pg_total_relation_size((quote_ident(c.chunk_schema) || '.' || quote_ident(c.chunk_name))::regclass) AS chunk_size_bytes
        FROM timescaledb_information.chunks c
        WHERE c.hypertable_schema = %(schema)s
          AND c.hypertable_name = %(table)s
        ORDER BY c.range_start
    """
    return client.execute_query(sql, {"schema": schema, "table": table})
def compression_stats(
    client: PgClient,
    *,
    schema: str | None = None,
    table: str | None = None,
) -> QueryResult:
    """Report chunk counts and before/after compression bytes per hypertable.

    The result is narrowed to a single hypertable only when *both* ``schema``
    and ``table`` are truthy; supplying just one of them applies no filter.

    Returns:
        The value returned by ``client.execute_query``; rows are ordered by
        hypertable schema and name.
    """
    single_table = bool(schema and table)
    bind: dict[str, Any] = {"schema": schema, "table": table} if single_table else {}
    table_predicate = ""
    if single_table:
        table_predicate = """
        WHERE h.hypertable_schema = %(schema)s
          AND h.hypertable_name = %(table)s
        """

    query = f"""
        SELECT
            h.hypertable_schema,
            h.hypertable_name,
            (SELECT COUNT(*) FROM timescaledb_information.chunks c
             WHERE c.hypertable_schema = h.hypertable_schema
               AND c.hypertable_name = h.hypertable_name) AS total_chunks,
            (SELECT COUNT(*) FROM timescaledb_information.chunks c
             WHERE c.hypertable_schema = h.hypertable_schema
               AND c.hypertable_name = h.hypertable_name
               AND c.is_compressed) AS compr_chunks,
            d.before_compression_total_bytes,
            d.after_compression_total_bytes
        FROM timescaledb_information.hypertables h
        LEFT JOIN LATERAL (
            SELECT
                SUM(before_compression_total_bytes)::bigint AS before_compression_total_bytes,
                SUM(after_compression_total_bytes)::bigint AS after_compression_total_bytes
            FROM hypertable_compression_stats(
                (quote_ident(h.hypertable_schema) || '.' || quote_ident(h.hypertable_name))::regclass
            )
        ) d ON true
        {table_predicate}
        ORDER BY h.hypertable_schema, h.hypertable_name
    """
    # An empty mapping is passed on as None, i.e. "no bound parameters".
    return client.execute_query(query, bind or None)
def list_continuous_aggregates(client: PgClient) -> QueryResult:
    """List continuous aggregates with their source and materialization hypertables.

    The statement is sent without bound parameters and the rows come back
    ordered by view schema and view name.
    """
    query = """
        SELECT
            ca.view_schema,
            ca.view_name,
            format('%I.%I', ca.hypertable_schema, ca.hypertable_name) AS source_hypertable,
            ca.materialized_only,
            ca.compression_enabled,
            format('%I.%I', ca.materialization_hypertable_schema, ca.materialization_hypertable_name) AS materialization_hypertable,
            ca.finalized
        FROM timescaledb_information.continuous_aggregates ca
        ORDER BY ca.view_schema, ca.view_name
    """
    outcome = client.execute_query(query)
    return outcome
def list_retention_policies(client: PgClient) -> QueryResult:
    """List jobs whose ``proc_name`` is ``policy_retention`` with their run stats.

    Job rows are left-joined to ``job_stats``, so a job without statistics
    still appears (with NULL run columns). No parameters are bound.
    """
    query = """
        SELECT
            j.hypertable_schema,
            j.hypertable_name,
            j.config->>'drop_after' AS drop_after,
            j.schedule_interval::text AS schedule_interval,
            js.last_run_started_at::text AS last_run_started_at,
            js.next_start::text AS next_start
        FROM timescaledb_information.jobs j
        LEFT JOIN timescaledb_information.job_stats js
            ON j.job_id = js.job_id
        WHERE j.proc_name = 'policy_retention'
        ORDER BY j.hypertable_schema, j.hypertable_name
    """
    outcome = client.execute_query(query)
    return outcome
def list_refresh_status(client: PgClient) -> QueryResult:
    """List ``policy_refresh_continuous_aggregate`` jobs with run counters.

    Job rows are left-joined to ``job_stats``; the statement carries no bound
    parameters and orders by hypertable schema and name.
    """
    query = """
        SELECT
            j.hypertable_schema,
            j.hypertable_name,
            j.schedule_interval::text AS schedule_interval,
            js.last_run_started_at::text AS last_run_started_at,
            js.next_start::text AS next_start,
            js.last_run_status,
            js.total_runs,
            js.total_successes,
            js.total_failures
        FROM timescaledb_information.jobs j
        LEFT JOIN timescaledb_information.job_stats js
            ON j.job_id = js.job_id
        WHERE j.proc_name = 'policy_refresh_continuous_aggregate'
        ORDER BY j.hypertable_schema, j.hypertable_name
    """
    outcome = client.execute_query(query)
    return outcome
def list_jobs(client: PgClient) -> QueryResult:
    """List every background job together with its latest run statistics.

    Besides the textual timestamps, the query derives ``last_run_seconds``
    (time since the last run) and ``next_start_seconds`` (time until the next
    run); both are NULL when the underlying timestamp is NULL or +/-infinity.
    Rows are ordered by ``job_id`` and no parameters are bound.
    """
    query = """
        SELECT
            j.job_id,
            j.application_name,
            CASE WHEN j.hypertable_schema IS NOT NULL
                 THEN format('%I.%I', j.hypertable_schema, j.hypertable_name)
                 ELSE NULL END AS hypertable,
            j.schedule_interval::text AS schedule,
            js.last_run_started_at::text AS last_run,
            CASE WHEN js.last_run_started_at IS NOT NULL
                      AND js.last_run_started_at > '-infinity'::timestamptz
                      AND js.last_run_started_at < 'infinity'::timestamptz
                 THEN EXTRACT(EPOCH FROM (now() - js.last_run_started_at))
                 ELSE NULL END AS last_run_seconds,
            js.last_run_status,
            js.next_start::text AS next_start,
            CASE WHEN js.next_start IS NOT NULL
                      AND js.next_start > '-infinity'::timestamptz
                      AND js.next_start < 'infinity'::timestamptz
                 THEN EXTRACT(EPOCH FROM (js.next_start - now()))
                 ELSE NULL END AS next_start_seconds,
            js.total_runs,
            js.total_successes,
            js.total_failures
        FROM timescaledb_information.jobs j
        LEFT JOIN timescaledb_information.job_stats js
            ON j.job_id = js.job_id
        ORDER BY j.job_id
    """
    outcome = client.execute_query(query)
    return outcome
# Candidate relations holding job run history, tried in this order.
_JOB_HISTORY_TABLES = [
    "_timescaledb_internal.bgw_job_stat_history",
    "_timescaledb_internal.bgw_job_stat",
]


def list_job_history(
    client: PgClient,
    *,
    job_id: int | None = None,
) -> QueryResult:
    """Return the 20 most recent job executions, newest first.

    Each relation in ``_JOB_HISTORY_TABLES`` is queried in turn and the first
    query that succeeds supplies the result.

    Args:
        client: Object whose ``execute_query(sql, params)`` runs the statement.
        job_id: Restrict the history to this job. ``None`` means all jobs;
            any integer, including ``0``, is applied as a filter.

    Returns:
        The value returned by the first successful ``client.execute_query``.

    Raises:
        SqlToolError: If every candidate query fails. The last underlying
            error is attached as ``__cause__`` so it is not lost.
    """
    # Compare against None explicitly so a falsy id (0) still filters.
    filtered = job_id is not None
    where_clause = "WHERE job_id = %(job_id)s" if filtered else ""
    params: dict[str, Any] | None = {"job_id": job_id} if filtered else None

    last_error: Exception | None = None
    for table_name in _JOB_HISTORY_TABLES:
        sql = f"""
            SELECT
                job_id,
                succeeded,
                execution_start::text,
                execution_finish::text,
                data::text AS error_data
            FROM {table_name}
            {where_clause}
            ORDER BY execution_start DESC
            LIMIT 20
        """
        try:
            return client.execute_query(sql, params)
        except Exception as exc:  # noqa: BLE001 - table may not exist in this TS version
            # NOTE(review): if the client runs inside an open transaction, a
            # failed statement may poison the fallback query - confirm that
            # the client rolls back or uses autocommit.
            last_error = exc

    raise SqlToolError("No job history table found") from last_error
def list_compression_settings(
    client: PgClient,
    *,
    schema: str | None = None,
    table: str | None = None,
    effective_schema: str | None = None,
    include_policy: bool = True,
) -> QueryResult:
    """Summarise segment-by / order-by compression settings per hypertable.

    Filtering precedence:
      * ``schema`` and ``table`` both truthy -> that single hypertable;
      * otherwise a truthy ``effective_schema`` -> every hypertable in it;
      * otherwise no filter.

    When ``include_policy`` is true the ``policy_compression`` job (if any) is
    left-joined in and its ``compress_after``, schedule, last/next run and
    status columns are appended to the select list and the GROUP BY.

    Returns:
        The value returned by ``client.execute_query``; rows are ordered by
        hypertable schema and name.
    """
    bind: dict[str, Any]
    if schema and table:
        filters = [
            "cs.hypertable_schema = %(schema)s",
            "cs.hypertable_name = %(table)s",
        ]
        bind = {"schema": schema, "table": table}
    elif effective_schema:
        filters = ["cs.hypertable_schema = %(schema)s"]
        bind = {"schema": effective_schema}
    else:
        filters = []
        bind = {}
    where_clause = ("WHERE " + " AND ".join(filters)) if filters else ""

    policy_columns = ("compress_after", "sched", "last_run", "next_start", "status")
    policy_join = ""
    policy_select = ""
    policy_group_by = ""
    if include_policy:
        policy_join = """
        LEFT JOIN (
            SELECT
                j.hypertable_schema AS p_schema,
                j.hypertable_name AS p_table,
                j.config->>'compress_after' AS compress_after,
                j.schedule_interval::text AS sched,
                js.last_run_started_at::text AS last_run,
                js.next_start::text AS next_start,
                js.last_run_status AS status
            FROM timescaledb_information.jobs j
            LEFT JOIN timescaledb_information.job_stats js ON j.job_id = js.job_id
            WHERE j.proc_name = 'policy_compression'
        ) pol ON pol.p_schema = cs.hypertable_schema AND pol.p_table = cs.hypertable_name
        """
        # Same five policy columns feed both the select list and the GROUP BY.
        policy_select = "".join(f",\n            pol.{col}" for col in policy_columns)
        policy_group_by = "".join(f", pol.{col}" for col in policy_columns)

    query = f"""
        SELECT
            cs.hypertable_schema,
            cs.hypertable_name,
            d.column_name AS time_col,
            d.time_interval::text AS chunk_iv,
            STRING_AGG(
                CASE WHEN cs.segmentby_column_index IS NOT NULL THEN cs.attname END,
                ', ' ORDER BY cs.segmentby_column_index
            ) AS segment_by,
            STRING_AGG(
                CASE WHEN cs.orderby_column_index IS NOT NULL THEN
                    cs.attname ||
                    CASE WHEN cs.orderby_asc THEN '' ELSE ' DESC' END ||
                    CASE WHEN cs.orderby_nullsfirst THEN ' NULLS FIRST' ELSE '' END
                END,
                ', ' ORDER BY cs.orderby_column_index
            ) AS order_by{policy_select}
        FROM timescaledb_information.compression_settings cs
        LEFT JOIN timescaledb_information.dimensions d
            ON cs.hypertable_schema = d.hypertable_schema
            AND cs.hypertable_name = d.hypertable_name
            AND d.dimension_number = 1
        {policy_join}
        {where_clause}
        GROUP BY cs.hypertable_schema, cs.hypertable_name, d.column_name, d.time_interval
            {policy_group_by}
        ORDER BY cs.hypertable_schema, cs.hypertable_name
    """
    # An empty mapping is passed on as None, i.e. "no bound parameters".
    return client.execute_query(query, bind or None)
347# ---------------------------------------------------------------------------
348# Mutation operations
349# ---------------------------------------------------------------------------
def alter_compression_settings(
    client: PgClient,
    fqn: str,
    *,
    segmentby: str | None = None,
    orderby: str | None = None,
) -> None:
    """Set ``compress_segmentby`` / ``compress_orderby`` on a table.

    Only the options given a truthy value are included; when neither is
    supplied no statement is executed.

    Args:
        client: Object whose ``execute_query(sql)`` runs the statement.
        fqn: Table reference spliced verbatim into ``ALTER TABLE``. It must
            already be a trusted, correctly quoted identifier - it is not
            escaped here.
        segmentby: Value for ``timescaledb.compress_segmentby``.
        orderby: Value for ``timescaledb.compress_orderby``.
    """

    def _literal(value: str) -> str:
        # Render as a single-quoted SQL string constant, doubling embedded
        # quotes so the value cannot terminate the literal early.
        # NOTE(review): relies on standard_conforming_strings being on.
        return "'" + value.replace("'", "''") + "'"

    requested = (
        ("timescaledb.compress_segmentby", segmentby),
        ("timescaledb.compress_orderby", orderby),
    )
    options = [f"{name} = {_literal(value)}" for name, value in requested if value]
    if options:
        client.execute_query(f"ALTER TABLE {fqn} SET ({', '.join(options)})")
def set_compression_enabled(client: PgClient, fqn: str, *, enabled: bool) -> None:
    """Switch the ``timescaledb.compress`` table option on or off.

    ``fqn`` is placed into the ``ALTER TABLE`` statement exactly as given.
    """
    option = "timescaledb.compress" if enabled else "timescaledb.compress = false"
    client.execute_query(f"ALTER TABLE {fqn} SET ({option})")
def add_compression_policy(
    client: PgClient,
    schema: str,
    table: str,
    compress_after: str,
    *,
    schedule: str | None = None,
) -> int | None:
    """Call ``add_compression_policy`` for a hypertable with ``if_not_exists``.

    Args:
        client: Object whose ``execute_query(sql, params)`` runs the statement.
        schema: Hypertable schema (bound parameter, quoted server-side).
        table: Hypertable name (bound parameter, quoted server-side).
        compress_after: Value cast to ``interval`` for ``compress_after``.
        schedule: When truthy, also passed as ``schedule_interval``.

    Returns:
        First column of the first result row, or ``None`` if no row came back.
    """
    bind: dict[str, Any] = {"schema": schema, "table": table, "after": compress_after}
    schedule_argument = ""
    if schedule:
        schedule_argument = "schedule_interval => %(schedule)s::interval,"
        bind["schedule"] = schedule

    query = f"""
        SELECT add_compression_policy(
            (quote_ident(%(schema)s) || '.' || quote_ident(%(table)s))::regclass,
            compress_after => %(after)s::interval,
            {schedule_argument}
            if_not_exists => true
        )
    """
    rows = client.execute_query(query, bind).rows
    if not rows:
        return None
    return rows[0][0]
def remove_compression_policy(client: PgClient, schema: str, table: str) -> None:
    """Call ``remove_compression_policy`` for the given hypertable.

    ``schema`` and ``table`` travel as bound parameters and are quoted
    server-side with ``quote_ident`` before the ``regclass`` cast.
    """
    target = {"schema": schema, "table": table}
    statement = """
        SELECT remove_compression_policy(
            (quote_ident(%(schema)s) || '.' || quote_ident(%(table)s))::regclass
        )
    """
    client.execute_query(statement, target)
def set_chunk_time_interval(
    client: PgClient, schema: str, table: str, interval: str
) -> None:
    """Call ``set_chunk_time_interval`` for a hypertable.

    All three values are sent as bound parameters; ``interval`` is cast to
    ``interval`` inside the statement.
    """
    bind = {"schema": schema, "table": table, "interval": interval}
    statement = """
        SELECT set_chunk_time_interval(
            (quote_ident(%(schema)s) || '.' || quote_ident(%(table)s))::regclass,
            %(interval)s::interval
        )
    """
    client.execute_query(statement, bind)
def count_compressed_chunks(client: PgClient, schema: str, table: str) -> int:
    """Count the compressed chunks of one hypertable.

    Returns:
        First column of the first result row, or ``0`` when no row came back.
    """
    query = """
        SELECT COUNT(*) FROM timescaledb_information.chunks
        WHERE hypertable_schema = %(schema)s
          AND hypertable_name = %(table)s
          AND is_compressed = true
    """
    rows = client.execute_query(query, {"schema": schema, "table": table}).rows
    if not rows:
        return 0
    return rows[0][0]
def list_compressed_chunk_names(client: PgClient, schema: str, table: str) -> list[str]:
    """Return the names of a hypertable's compressed chunks, ordered by range start."""
    query = """
        SELECT chunk_name FROM timescaledb_information.chunks
        WHERE hypertable_schema = %(schema)s
          AND hypertable_name = %(table)s
          AND is_compressed = true
        ORDER BY range_start
    """
    fetched = client.execute_query(query, {"schema": schema, "table": table})
    names = []
    for record in fetched.rows:
        names.append(record[0])
    return names
def recompress_chunk(client: PgClient, chunk_name: str) -> None:
    """Decompress and then compress a chunk in ``_timescaledb_internal``.

    The chunk name is sent as a bound parameter and quoted server-side with
    ``quote_ident`` rather than being spliced into the SQL text, so names
    containing quotes or mixed case resolve to the intended relation.

    The two statements are issued separately; if the second one fails the
    chunk is left decompressed.

    Args:
        client: Object whose ``execute_query(sql, params)`` runs the statement.
        chunk_name: Unqualified, unquoted chunk name.
    """
    params = {"chunk": chunk_name}
    for function in ("decompress_chunk", "compress_chunk"):
        client.execute_query(
            f"SELECT {function}(('_timescaledb_internal.' || quote_ident(%(chunk)s))::regclass)",
            params,
        )
def list_chunk_info(
    client: PgClient, schema: str, table: str
) -> list[tuple[str, bool]]:
    """Return ``(chunk_name, is_compressed)`` pairs for a hypertable.

    Pairs follow the query order, i.e. ascending ``range_start``.
    """
    query = """
        SELECT chunk_name, is_compressed
        FROM timescaledb_information.chunks
        WHERE hypertable_schema = %(schema)s
          AND hypertable_name = %(table)s
        ORDER BY range_start
    """
    fetched = client.execute_query(query, {"schema": schema, "table": table})
    pairs = []
    for record in fetched.rows:
        pairs.append((record[0], record[1]))
    return pairs
def compress_single_chunk(client: PgClient, chunk_name: str) -> None:
    """Compress one chunk located in ``_timescaledb_internal``.

    The chunk name is sent as a bound parameter and quoted server-side with
    ``quote_ident`` rather than being spliced into the SQL text, so names
    containing quotes or mixed case resolve to the intended relation.

    Args:
        client: Object whose ``execute_query(sql, params)`` runs the statement.
        chunk_name: Unqualified, unquoted chunk name.
    """
    client.execute_query(
        "SELECT compress_chunk(('_timescaledb_internal.' || quote_ident(%(chunk)s))::regclass)",
        {"chunk": chunk_name},
    )
def parse_chunk_id(chunk_name: str) -> int | None:
    """Pull the numeric chunk ID out of a name such as ``_hyper_16_11420_chunk``.

    The name is split on underscores and the second-to-last piece is parsed
    as an integer.

    Returns:
        The parsed ID, or ``None`` when the name has fewer than four
        underscore-separated pieces or that piece is not an integer.
    """
    pieces = chunk_name.split("_")
    if len(pieces) < 4:
        return None
    candidate = pieces[-2]
    try:
        return int(candidate)
    except ValueError:
        return None