守护进程
使用原生subprocess
以下是一个简单的 Python 脚本示例,它使用 subprocess
模块来启动 FastAPI 应用,并监控其运行状态。如果 FastAPI 进程意外退出,该脚本将尝试重新启动它。
import subprocess
import time
import os
def start_fastapi_app():
# 假设你的 FastAPI 应用的主模块是 main.py,并且使用 uvicorn 运行
command = ['uvicorn', 'main:app', '--host', '0.0.0.0', '--port', '8000']
return subprocess.Popen(command)
def monitor_fastapi_process(process):
while True:
if process.poll() is not None: # 如果进程已退出
print("FastAPI 进程已退出,正在尝试重启...")
time.sleep(5) # 等待一段时间再重启
process = start_fastapi_app() # 重启 FastAPI 应用
time.sleep(1) # 每秒检查一次进程状态
if __name__ == "__main__":
process = start_fastapi_app()
monitor_fastapi_process(process)
使用python-diamond
import subprocess
import time
def start_fastapi():
# 启动 FastAPI 的命令
command = ['uvicorn', 'main:app', '--host', '0.0.0.0', '--port', '8000']
subprocess.Popen(command)
def is_fastapi_running():
# 检查 FastAPI 是否正在运行的逻辑(可能需要实现更复杂的检查)
# 这里只是一个简单的示例
try:
# 你可以尝试连接到 FastAPI 的某个端点或使用其他方法
pass # 实现你的检查逻辑
return True
except:
return False
def main():
while True:
if not is_fastapi_running():
print("FastAPI 进程未运行,正在尝试重启...")
start_fastapi()
time.sleep(5) # 每隔一段时间检查一次
if __name__ == "__main__":
main()
使用Service
库
原地址:http://www.coolpython.net/informal_essay/20-09/easy-way-create-daemon.html
import time
import sys
from service import Service
def my_task():
"""
这是你的任务
"""
for i in range(10):
print('ok')
time.sleep(5)
class TaskService(Service):
def __init__(self, *args, **kwargs):
super(TaskService, self).__init__(*args, **kwargs)
def run(self):
my_task()
if __name__ == '__main__':
if len(sys.argv) != 2:
sys.exit('Syntax: %s COMMAND' % sys.argv[0])
cmd = sys.argv[1].lower()
service = TaskService('my_service', pid_dir='/tmp')
if cmd == 'start':
service.start()
elif cmd == 'stop':
service.stop()
elif cmd == 'kill':
service.kill()
elif cmd == 'pid':
print(service.get_pid())
elif cmd == 'status':
if service.is_running():
print("Service is running.")
else:
print("Service is not running.")
else:
sys.exit('Unknown command "%s".' % cmd)