A-A+
非异步async函数调用异步函数

【注意:此文章为博主原创文章!转载需注意,请带原文链接,至少也要是txt格式!】
如果你有一个非异步(同步)函数,但需要调用异步的 `websocket.send(content)`,你有几种选择来处理这个情况。
方法1: 使用 asyncio.run() 或 asyncio.get_event_loop().run_until_complete()
如果你在一个同步函数中,并且只需要调用一次异步操作,你可以使用 asyncio.run() 或 asyncio.get_event_loop().run_until_complete()。
import asyncio
def handle_logic_sync(content, websocket):
# 同步处理逻辑
processed_content = f"Processed: {content}"
# 使用 asyncio.run 调用异步方法
asyncio.run(websocket_send(processed_content, websocket))
async def websocket_send(content, websocket):
await websocket.send(content)
在这种情况下,你的同步函数 handle_logic_sync 通过 asyncio.run() 来调用异步的 websocket.send()。
方法2: 在已有的事件循环中使用 asyncio.ensure_future()
如果你的同步函数是在一个已经运行的异步上下文中调用的,那么可以使用 asyncio.ensure_future() 来调度异步任务。
import asyncio
def handle_logic_sync(content, websocket):
# 同步处理逻辑
processed_content = f"Processed: {content}"
# 在当前事件循环中创建任务
asyncio.ensure_future(websocket_send(processed_content, websocket))
async def websocket_send(content, websocket):
await websocket.send(content)
`asyncio.ensure_future()` 会将任务放入事件循环中,但不会阻塞当前的同步函数。
方法3: 使用 run_in_executor() 在线程中运行同步代码
如果你希望在一个独立的线程中运行同步代码,并且在执行异步任务之前进行一些同步计算,可以使用 run_in_executor()。
import asyncio
def handle_logic_sync(content, websocket):
# 同步处理逻辑
processed_content = f"Processed: {content}"
# 在事件循环中异步调用
loop = asyncio.get_event_loop()
loop.run_in_executor(None, lambda: asyncio.run(websocket_send(processed_content, websocket)))
async def websocket_send(content, websocket):
await websocket.send(content)
在这种情况下,你的同步逻辑会在一个单独的线程中运行,然后在事件循环中异步调用 websocket.send()。
方法4: 使用 asyncio.to_thread(Python 3.9+)
如果你使用 Python 3.9 或更高版本,可以使用 asyncio.to_thread() 来将同步代码转移到后台线程中运行。
import asyncio
def handle_logic_sync(content, websocket):
# 同步处理逻辑
processed_content = f"Processed: {content}"
# 使用 asyncio.to_thread 调用异步方法
asyncio.run(asyncio.to_thread(websocket_send, processed_content, websocket))
async def websocket_send(content, websocket):
await websocket.send(content)
这允许你在异步代码中更方便地调用同步函数,并在它们完成后继续异步执行。
### 总结
选择合适的方法取决于你的应用场景。如果你需要在一个已经运行的事件循环中调用异步代码,asyncio.ensure_future() 通常是最合适的方法。如果你只是在同步代码中简单地调用异步操作,可以考虑使用 asyncio.run()。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 | import asyncio
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from pydantic import BaseModel
from typing import Dict, List
import websockets
app = FastAPI()
# 用于存储 WebSocket 连接及其对应的监听任务和消息列表
connections: Dict[str, Dict] = {}
class Credentials(BaseModel):
username: str
password: str
class Message(BaseModel):
username: str
content: str
async def websocket_listener(username: str, websocket):
while True:
try:
message = await websocket.recv()
connections[username]["messages"].append(message)
except websockets.ConnectionClosed:
break
async def send_message_and_wait(username, message):
try:
# 清空该用户的消息列表
connections[username]["messages"] = []
# 发送消息到 WebSocket 服务器
await connections[username]['websocket'].send(message)
# 等待 3 秒内没有收到新的消息后再继续
await asyncio.sleep(3)
return connections[username]["messages"]
except Exception as e:
print(f"Error while sending message: {e}")
@app.post("/login/")
async def login(credentials: Credentials):
username = credentials.username
password = credentials.password
if username in connections:
raise HTTPException(status_code=400, detail="User is already connected")
try:
websocket = await websockets.connect('wss://woj.app/')
connections[username] = {
"websocket": websocket,
"messages": [],
"listener_task": asyncio.create_task(websocket_listener(username, websocket))
}
# 发送账号和密码到 WebSocket 服务器
await send_message_and_wait(username, f"{username}:{password}")
# 返回所有接收到的消息
return {"message": connections[username]["messages"]}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to connect: {str(e)}")
@app.get("/messages/{username}")
async def get_messages(username: str):
if username not in connections:
raise HTTPException(status_code=404, detail="User not connected")
return {"messages": connections[username]["messages"]}
@app.post("/send/")
async def send_message(message: Message):
username = message.username
content = message.content
if username not in connections:
raise HTTPException(status_code=404, detail="User not connected")
# 发送消息并等待响应
response = await send_message_and_wait(username, content)
return {"messages": response}
@app.post("/logout/{username}")
async def logout(username: str):
if username not in connections:
raise HTTPException(status_code=404, detail="User not connected")
websocket = connections[username]["websocket"]
listener_task = connections[username]["listener_task"]
# 关闭 WebSocket 连接
await websocket.close()
listener_task.cancel()
# 删除用户连接
del connections[username]
return {"message": "Connection closed"}
# 启动应用
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000) |
布施恩德可便相知重
微信扫一扫打赏
支付宝扫一扫打赏