Coverage for frappe_manager / utils / subprocess.py: 89%
57 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
« prev ^ index » next coverage.py v7.13.5, created at 2026-07-02 18:13 +0530
1"""
2Generic subprocess streaming utilities.
4This module provides generic command execution with real-time output streaming
5that works for any subprocess (Docker, acme.sh, git, npm, etc.).
6"""
8import os
9from collections.abc import Iterator
10from queue import Queue
11from subprocess import PIPE, Popen
12from threading import Thread
14from frappe_manager.logger import log
17def reader(pipe, pipe_name: str, queue: Queue):
18 """
19 Reads lines from a pipe and puts them into a queue.
21 This function runs in a daemon thread to continuously read from
22 stdout or stderr without blocking the main process.
24 Args:
25 pipe: The pipe to read from (stdout or stderr)
26 pipe_name: Name identifier ("stdout" or "stderr")
27 queue: Queue to put the lines into
28 """
29 logger = log.get_logger()
30 try:
31 buf = b""
32 with pipe:
33 while True:
34 chunk = pipe.read(4096)
35 if not chunk:
36 if buf:
37 line = buf.decode(errors="replace").strip("\r\n")
38 if line:
39 logger.debug(line)
40 queue.put((pipe_name, line.encode()))
41 break
42 buf += chunk
43 while True:
44 for sep in (b"\n", b"\r"):
45 idx = buf.find(sep)
46 if idx != -1:
47 line = buf[:idx].decode(errors="replace").strip("\r\n")
48 buf = buf[idx + 1 :]
49 if line:
50 logger.debug(line)
51 queue.put((pipe_name, line.encode()))
52 break
53 else:
54 break
55 finally:
56 queue.put(None)
59def stream_command_output(
60 cmd: list,
61 env: dict[str, str] | None = None,
62 cwd: str | None = None,
63) -> Iterator[tuple[str, bytes]]:
64 """
65 Execute a command and stream stdout/stderr output in real-time.
67 This is a generic subprocess streaming function that works for any command.
68 It yields (source, line) tuples as output is produced, then yields
69 ("exit_code", code) when the process completes.
71 Unlike stream_stdout_and_stderr in utils/docker.py, this function does NOT
72 raise exceptions on non-zero exit codes. It simply yields the exit code,
73 allowing the caller to decide how to handle failures.
75 Args:
76 cmd: Command to execute as list of strings
77 env: Environment variables (merged with os.environ if provided)
78 cwd: Working directory for command execution
80 Yields:
81 Tuple[str, bytes]:
82 - ("stdout", line) for stdout output
83 - ("stderr", line) for stderr output
84 - ("exit_code", code) when process completes
86 Example:
87 >>> for source, line in stream_command_output(["echo", "hello"]):
88 ... if source == "exit_code":
89 ... exit_code = int(line.decode())
90 ... print(f"Exited with: {exit_code}")
91 ... else:
92 ... print(f"{source}: {line.decode()}")
94 Note:
95 This function uses daemon threads to read from stdout/stderr pipes,
96 preventing deadlocks when the process produces large amounts of output.
97 """
98 logger = log.get_logger()
99 logger.debug("- -" * 10)
100 logger.debug(f"COMMAND: {' '.join(cmd)}")
102 # Prepare environment
103 if env is not None:
104 subprocess_env = dict(os.environ)
105 subprocess_env.update(env)
106 else:
107 subprocess_env = None
109 # Convert all elements to strings
110 cmd = list(map(str, cmd))
112 # Start process with pipes
113 process = Popen(cmd, stdout=PIPE, stderr=PIPE, env=subprocess_env, cwd=cwd)
115 # Setup queue and reader threads
116 q = Queue()
118 # Use daemon threads to avoid hanging on ctrl+c
119 stdout_thread = Thread(target=reader, args=[process.stdout, "stdout", q])
120 stdout_thread.daemon = True
121 stdout_thread.start()
123 stderr_thread = Thread(target=reader, args=[process.stderr, "stderr", q])
124 stderr_thread.daemon = True
125 stderr_thread.start()
127 # Yield output as it arrives
128 for _ in range(2): # Wait for both threads to finish
129 for source, line in iter(q.get, None):
130 yield source, line
132 # Wait for process to complete and yield exit code
133 exit_code = process.wait()
135 logger.debug(f"RETURN CODE: {exit_code}")
136 logger.debug("- -" * 10)
138 yield ("exit_code", str(exit_code).encode())