Coverage for /Users/ajo/work/jumpstarter/jumpstarter/packages/jumpstarter-driver-http/jumpstarter_driver_http/driver.py: 59%

75 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-26 15:50 +0200

1import os 

2from dataclasses import dataclass, field 

3from typing import Optional 

4 

5import anyio 

6import anyio.from_thread 

7from aiohttp import web 

8from jumpstarter_driver_opendal.driver import Opendal 

9 

10from jumpstarter.common.ipaddr import get_ip_address 

11from jumpstarter.driver import Driver, export 

12 

13 

class HttpServerError(Exception):
    """Root of the exception hierarchy for HTTP server driver failures."""

16 

17 

class FileWriteError(HttpServerError):
    """Signals that writing a file failed."""

20 

21 

@dataclass(kw_only=True)
class HttpServer(Driver):
    """HTTP Server driver for Jumpstarter.

    Serves the contents of ``root_dir`` as static files through an aiohttp
    application, and registers an Opendal ``fs`` child driver named
    ``storage`` rooted at the same directory.
    """

    root_dir: str = "/var/www"
    # When left as None, __post_init__ fills this in via get_ip_address().
    host: str | None = field(default=None)
    port: int = 8080
    # NOTE(review): not read anywhere in this class -- confirm whether it is
    # consumed elsewhere before relying on it.
    timeout: int = field(default=600)
    app: web.Application = field(init=False, default_factory=web.Application)
    # None while the server is stopped; set by start(), cleared by stop()/close().
    runner: Optional[web.AppRunner] = field(init=False, default=None)

    def __post_init__(self):
        """Create ``root_dir``, register the storage child and the static route."""
        if hasattr(super(), "__post_init__"):
            super().__post_init__()

        os.makedirs(self.root_dir, exist_ok=True)

        self.children["storage"] = Opendal(scheme="fs", kwargs={"root": self.root_dir})
        self.app.router.add_routes(
            [
                web.static("/", self.root_dir),
            ]
        )
        if self.host is None:
            self.host = get_ip_address(logger=self.logger)

    @classmethod
    def client(cls) -> str:
        """Return the import path of the corresponding client"""
        return "jumpstarter_driver_http.client.HttpServerClient"

    @export
    async def start(self):
        """
        Start the HTTP server.

        Logs a warning and returns without doing anything when the server is
        already running.

        Raises:
            Exception: Any error raised while setting up the runner or binding
                the TCP site is re-raised unchanged, after the partially
                started runner has been cleaned up and ``self.runner`` has
                been reset to None so that ``start()`` can be retried.
        """
        if self.runner is not None:
            self.logger.warning("HTTP server is already running.")
            return

        # Publish the runner before the first await so a concurrent start()
        # sees the server as already starting.
        runner = web.AppRunner(self.app)
        self.runner = runner
        try:
            await runner.setup()
            site = web.TCPSite(runner, self.host, self.port)
            await site.start()
        except Exception:
            # Roll back: without this, a failed start (e.g. port already in
            # use) would leave self.runner set, and every later start() would
            # report "already running" while nothing is listening.
            try:
                await runner.cleanup()
            except Exception as cleanup_err:
                self.logger.warning(f"HTTP server cleanup after failed start also failed: {cleanup_err}")
            finally:
                self.runner = None
            raise

        self.logger.info(f"HTTP server started at http://{self.host}:{self.port}")

    @export
    async def stop(self):
        """
        Stop the HTTP server.

        Logs a warning and returns without doing anything when the server is
        not running.

        Raises:
            Exception: Any error raised by the runner's ``cleanup()`` is
                propagated; in that case ``self.runner`` is left set.
        """
        if self.runner is None:
            self.logger.warning("HTTP server is not running.")
            return

        await self.runner.cleanup()
        self.logger.info("HTTP server stopped.")
        self.runner = None

    @export
    def get_url(self) -> str:
        """
        Get the base URL of the HTTP server.

        Returns:
            str: Base URL of the HTTP server.
        """
        return f"http://{self.host}:{self.port}"

    @export
    def get_host(self) -> str | None:
        """
        Get the host IP address of the HTTP server.

        Returns:
            str: Host IP address.
        """
        return self.host

    @export
    def get_port(self) -> int:
        """
        Get the port number of the HTTP server.

        Returns:
            int: Port number.
        """
        return self.port

    def close(self):
        """Best-effort synchronous shutdown of the server, then close the base driver.

        Any failure during cleanup is logged as a warning and swallowed; the
        runner reference is dropped regardless, and ``super().close()`` is
        always called.
        """
        if self.runner:
            try:
                # NOTE(review): anyio.from_thread.run is documented as callable
                # only from an AnyIO worker thread, while anyio.get_current_task
                # needs a running async task -- confirm this path can actually
                # succeed in the contexts close() is called from.
                if anyio.get_current_task():
                    anyio.from_thread.run(self._async_cleanup)
            except Exception as e:
                self.logger.warning(f"HTTP server cleanup failed synchronously: {e}")
            self.runner = None
        super().close()

    async def _async_cleanup(self):
        """Shut down and clean up the runner, logging instead of raising on failure."""
        try:
            if self.runner:
                await self.runner.shutdown()
                await self.runner.cleanup()
                self.logger.info("HTTP server cleanup completed asynchronously.")
        except Exception as e:
            self.logger.error(f"HTTP server cleanup failed asynchronously: {e}")