Coverage for frappe_manager / utils / subprocess.py: 89%

57 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-07-02 18:13 +0530

1""" 

2Generic subprocess streaming utilities. 

3 

4This module provides generic command execution with real-time output streaming 

5that works for any subprocess (Docker, acme.sh, git, npm, etc.). 

6""" 

7 

8import os 

9from collections.abc import Iterator 

10from queue import Queue 

11from subprocess import PIPE, Popen 

12from threading import Thread 

13 

14from frappe_manager.logger import log 

15 

16 

17def reader(pipe, pipe_name: str, queue: Queue): 

18 """ 

19 Reads lines from a pipe and puts them into a queue. 

20 

21 This function runs in a daemon thread to continuously read from 

22 stdout or stderr without blocking the main process. 

23 

24 Args: 

25 pipe: The pipe to read from (stdout or stderr) 

26 pipe_name: Name identifier ("stdout" or "stderr") 

27 queue: Queue to put the lines into 

28 """ 

29 logger = log.get_logger() 

30 try: 

31 buf = b"" 

32 with pipe: 

33 while True: 

34 chunk = pipe.read(4096) 

35 if not chunk: 

36 if buf: 

37 line = buf.decode(errors="replace").strip("\r\n") 

38 if line: 

39 logger.debug(line) 

40 queue.put((pipe_name, line.encode())) 

41 break 

42 buf += chunk 

43 while True: 

44 for sep in (b"\n", b"\r"): 

45 idx = buf.find(sep) 

46 if idx != -1: 

47 line = buf[:idx].decode(errors="replace").strip("\r\n") 

48 buf = buf[idx + 1 :] 

49 if line: 

50 logger.debug(line) 

51 queue.put((pipe_name, line.encode())) 

52 break 

53 else: 

54 break 

55 finally: 

56 queue.put(None) 

57 

58 

59def stream_command_output( 

60 cmd: list, 

61 env: dict[str, str] | None = None, 

62 cwd: str | None = None, 

63) -> Iterator[tuple[str, bytes]]: 

64 """ 

65 Execute a command and stream stdout/stderr output in real-time. 

66 

67 This is a generic subprocess streaming function that works for any command. 

68 It yields (source, line) tuples as output is produced, then yields 

69 ("exit_code", code) when the process completes. 

70 

71 Unlike stream_stdout_and_stderr in utils/docker.py, this function does NOT 

72 raise exceptions on non-zero exit codes. It simply yields the exit code, 

73 allowing the caller to decide how to handle failures. 

74 

75 Args: 

76 cmd: Command to execute as list of strings 

77 env: Environment variables (merged with os.environ if provided) 

78 cwd: Working directory for command execution 

79 

80 Yields: 

81 Tuple[str, bytes]: 

82 - ("stdout", line) for stdout output 

83 - ("stderr", line) for stderr output 

84 - ("exit_code", code) when process completes 

85 

86 Example: 

87 >>> for source, line in stream_command_output(["echo", "hello"]): 

88 ... if source == "exit_code": 

89 ... exit_code = int(line.decode()) 

90 ... print(f"Exited with: {exit_code}") 

91 ... else: 

92 ... print(f"{source}: {line.decode()}") 

93 

94 Note: 

95 This function uses daemon threads to read from stdout/stderr pipes, 

96 preventing deadlocks when the process produces large amounts of output. 

97 """ 

98 logger = log.get_logger() 

99 logger.debug("- -" * 10) 

100 logger.debug(f"COMMAND: {' '.join(cmd)}") 

101 

102 # Prepare environment 

103 if env is not None: 

104 subprocess_env = dict(os.environ) 

105 subprocess_env.update(env) 

106 else: 

107 subprocess_env = None 

108 

109 # Convert all elements to strings 

110 cmd = list(map(str, cmd)) 

111 

112 # Start process with pipes 

113 process = Popen(cmd, stdout=PIPE, stderr=PIPE, env=subprocess_env, cwd=cwd) 

114 

115 # Setup queue and reader threads 

116 q = Queue() 

117 

118 # Use daemon threads to avoid hanging on ctrl+c 

119 stdout_thread = Thread(target=reader, args=[process.stdout, "stdout", q]) 

120 stdout_thread.daemon = True 

121 stdout_thread.start() 

122 

123 stderr_thread = Thread(target=reader, args=[process.stderr, "stderr", q]) 

124 stderr_thread.daemon = True 

125 stderr_thread.start() 

126 

127 # Yield output as it arrives 

128 for _ in range(2): # Wait for both threads to finish 

129 for source, line in iter(q.get, None): 

130 yield source, line 

131 

132 # Wait for process to complete and yield exit code 

133 exit_code = process.wait() 

134 

135 logger.debug(f"RETURN CODE: {exit_code}") 

136 logger.debug("- -" * 10) 

137 

138 yield ("exit_code", str(exit_code).encode())