Coverage for fss\common\config.py: 66%

61 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-12 22:20 +0800

1"""Configuration in the project""" 

2 

3import os 

4import subprocess 

5import time 

6from typing import Union, Tuple 

7from loguru import logger 

8from pathlib import Path as path 

9 

10from pydantic.v1 import BaseSettings 

11 

# Absolute path of this module, and the directory three levels above it,
# which is where the dotenv files are looked up.
current_file_path = os.path.abspath(__file__)
env_directory = path(current_file_path).parent.parent.parent

# Use ".env" when it exists in that directory; otherwise fall back to the
# ".env.example" file in the same directory.
_primary_env_file = os.path.join(env_directory, ".env")
ENV_FILE = (
    _primary_env_file
    if os.path.exists(_primary_env_file)
    else os.path.join(env_directory, ".env.example")
)

19 

20 

class Configs(BaseSettings):
    """Typed project settings, loaded by pydantic ``BaseSettings``.

    Values are read from the process environment and from the dotenv file
    selected in ``ENV_FILE``. Only the two token-expiry fields declare a
    default; every other field has none.
    """

    # --- application / server ---
    app_name: str
    app_desc: str
    mode: str
    port: int

    # --- token lifetimes, expressed in minutes ---
    access_token_expire_minutes: int = 60 * 24  # 24 hours
    refresh_token_expire_minutes: int = 60 * 24 * 30  # 30 days

    # --- timezone names: win_tz is passed to ``tzutil`` on Windows,
    # linux_tz is exported as ``TZ`` elsewhere (see init_tz) ---
    win_tz: str
    linux_tz: str

    # Either an integer or a string; complete() falls back to a CPU-based
    # count when the string is not numeric.
    workers: Union[str, int]

    # --- HTTP layer (presumably delimited lists; confirm against consumers) ---
    backend_cors_origins: str
    white_list_routes: str

    # --- signing / security (presumably for token signing; TODO confirm) ---
    algorithm: str
    secret_key: str

    # Sink path handed to loguru by init_log().
    log_file: str
    api_version: str
    sqlalchemy_database_url: str

    # --- cache (NOTE(review): cache_port is declared as str, not int) ---
    enable_redis: bool
    cache_pass: str
    cache_host: str
    cache_port: str

    enable_swagger: bool

    class Config:
        env_file = ENV_FILE


# Settings instance created at import time; the helper functions in this
# module read their values from it.
configs = Configs()

49 

50 

def init_log() -> None:
    """Add the file named by ``configs.log_file`` as a loguru sink."""
    log_sink = configs.log_file
    logger.add(log_sink)

53 

54 

def init_tz() -> None:
    """Apply the configured timezone for the current platform.

    On Windows (``os.name == "nt"``) this runs ``tzutil /s <configs.win_tz>``.
    On every other platform it sets the ``TZ`` environment variable to
    ``configs.linux_tz`` and calls ``time.tzset()``.

    Errors raised while applying the timezone are logged via loguru rather
    than propagated, so a failure here does not abort start-up.
    """
    if os.name == "nt":
        try:
            subprocess.run(["tzutil", "/s", configs.win_tz], check=True)
        except (subprocess.CalledProcessError, OSError) as e:
            # CalledProcessError: tzutil ran but exited non-zero.
            # OSError: tzutil could not be launched at all (for example it is
            # not on PATH, which surfaces as FileNotFoundError). Both are
            # treated the same way so this branch stays best-effort, like
            # the non-Windows branch below.
            logger.error(f"Error setting timezone: {e}")
    else:
        try:
            os.environ["TZ"] = configs.linux_tz
            # tzset() re-reads TZ so the time module picks up the new value.
            time.tzset()
        except Exception as e:
            logger.error(f"Error setting timezone: {e}")

67 

68 

def complete() -> Tuple[int, int]:
    """Return ``(port, workers)`` resolved from ``configs``.

    ``configs.workers`` is returned unchanged when it is already an ``int``.
    Otherwise it is converted with ``int()``; if that raises ``ValueError``
    the worker count becomes ``max(cpu_count - 2, 1)``.
    """
    port = configs.port
    worker_setting = configs.workers

    # Already numeric: nothing to resolve.
    if isinstance(worker_setting, int):
        return port, worker_setting

    try:
        worker_total = int(worker_setting)
    except ValueError:
        # Non-numeric value: derive the count from the machine's CPUs,
        # leaving two cores free but never dropping below one worker.
        import multiprocessing

        worker_total = max(multiprocessing.cpu_count() - 2, 1)
    return port, worker_total