Skip to main content

守护进程

使用原生subprocess

以下是一个简单的 Python 脚本示例,它使用 subprocess 模块来启动 FastAPI 应用,并监控其运行状态。如果 FastAPI 进程意外退出,该脚本将尝试重新启动它。

import subprocess  
import time
import os

def start_fastapi_app():
# 假设你的 FastAPI 应用的主模块是 main.py,并且使用 uvicorn 运行
command = ['uvicorn', 'main:app', '--host', '0.0.0.0', '--port', '8000']
return subprocess.Popen(command)

def monitor_fastapi_process(process):
while True:
if process.poll() is not None: # 如果进程已退出
print("FastAPI 进程已退出,正在尝试重启...")
time.sleep(5) # 等待一段时间再重启
process = start_fastapi_app() # 重启 FastAPI 应用
time.sleep(1) # 每秒检查一次进程状态

if __name__ == "__main__":
process = start_fastapi_app()
monitor_fastapi_process(process)

使用python-diamond

import subprocess  
import time

def start_fastapi():
# 启动 FastAPI 的命令
command = ['uvicorn', 'main:app', '--host', '0.0.0.0', '--port', '8000']
subprocess.Popen(command)

def is_fastapi_running():
# 检查 FastAPI 是否正在运行的逻辑(可能需要实现更复杂的检查)
# 这里只是一个简单的示例
try:
# 你可以尝试连接到 FastAPI 的某个端点或使用其他方法
pass # 实现你的检查逻辑
return True
except:
return False

def main():
while True:
if not is_fastapi_running():
print("FastAPI 进程未运行,正在尝试重启...")
start_fastapi()
time.sleep(5) # 每隔一段时间检查一次

if __name__ == "__main__":
main()

使用Service

原地址:http://www.coolpython.net/informal_essay/20-09/easy-way-create-daemon.html

import time
import sys
from service import Service

def my_task():
"""
这是你的任务
"""
for i in range(10):
print('ok')
time.sleep(5)


class TaskService(Service):
def __init__(self, *args, **kwargs):
super(TaskService, self).__init__(*args, **kwargs)

def run(self):
my_task()

if __name__ == '__main__':

if len(sys.argv) != 2:
sys.exit('Syntax: %s COMMAND' % sys.argv[0])

cmd = sys.argv[1].lower()
service = TaskService('my_service', pid_dir='/tmp')

if cmd == 'start':
service.start()
elif cmd == 'stop':
service.stop()
elif cmd == 'kill':
service.kill()
elif cmd == 'pid':
print(service.get_pid())
elif cmd == 'status':
if service.is_running():
print("Service is running.")
else:
print("Service is not running.")
else:
sys.exit('Unknown command "%s".' % cmd)