200 行,一个 PyQt(PySide6)窗口 + 后台 aiohttp 服务的例子
作者:互联网
直接上代码
import sys
from typing import Dict, List
from aiohttp import web
import asyncio
from functools import wraps
from blinker import Signal
from PySide6 import QtCore as qc
from PySide6 import QtGui as qg
from PySide6 import QtWidgets as qw
def get(path):
    """Mark an async handler method as a GET route for *path*.

    The returned decorator wraps the coroutine method so every call is
    tracked on the owning instance: ``self.count`` is the number of calls
    currently in flight and ``self.max`` is the highest value ``self.count``
    has reached.  Both counters are only modified while ``self.lock`` is held.

    The wrapper is tagged with ``path`` and ``method`` attributes so the
    routes can be discovered and registered later.

    Args:
        path: URL path stored on the wrapper as its ``path`` attribute.

    Returns:
        A decorator that takes the coroutine method and returns the
        counting wrapper.
    """

    def _f1(fn):
        @wraps(fn)
        async def _f2(self: "Routes", *args, **kw):
            async with self.lock:
                self.count += 1
                self.max = max(self.max, self.count)
            try:
                return await fn(self, *args, **kw)
            finally:
                # Release the in-flight slot even when the handler raises or
                # is cancelled; otherwise ``count`` would drift upwards.
                async with self.lock:
                    self.count -= 1

        _f2.path = path
        _f2.method = "get"
        return _f2

    return _f1
class Routes:
    """HTTP handlers that register themselves on an aiohttp application.

    Every callable attribute whose name starts with ``get_`` is registered
    as a GET route, using the ``path`` attribute attached to it by the
    ``get`` decorator.  ``lock``, ``count`` and ``max`` are the per-instance
    state that the ``get`` wrapper reads and updates.
    """

    def __init__(self, app: web.Application) -> None:
        """Initialise the request counters and register all routes on *app*."""
        self.lock = asyncio.Lock()
        self.count = 0
        self.max = 0
        self.add_routes(app)

    @get("/")
    async def get_g1(self, request):
        """Handle ``GET /`` by returning a small demo mapping."""
        return {1: 2, 2: 3, 3: "中国"}

    def add_routes(self, app: web.Application):
        """Collect every ``get_*`` handler and add it to *app* as a GET route."""
        route_defs = []
        for attr_name in self.__dir__():
            if not attr_name.startswith("get_"):
                continue
            handler = getattr(self, attr_name)
            if not callable(handler):
                continue
            # A ``get_*`` callable must carry the tags set by the ``get``
            # decorator.  ``method`` is checked but every handler is
            # registered through ``web.get``.
            assert hasattr(handler, "path")
            assert hasattr(handler, "method")
            route_defs.append(web.get(handler.path, handler))
        app.add_routes(route_defs)
async def logger_mdw(app, handler):
    """Middleware factory that prints each request's method and path.

    Args:
        app: The application the middleware is installed on (unused).
        handler: The next handler in the chain.

    Returns:
        A coroutine function that prints the request line, then awaits
        *handler* and returns whatever it returned.
    """

    async def log_then_forward(request: web.Request):
        print(request.method, request.path)
        outcome = await handler(request)
        return outcome

    return log_then_forward
async def reponse_mdw(app, handler):
    """Middleware factory that turns plain handler results into responses.

    The wrapped handler may return:

    * a ``web.Response`` - passed through unchanged;
    * a ``str``, ``int`` or ``float`` - sent as text via ``str()``;
    * a ``dict`` or ``list`` - sent through ``web.json_response``;
    * anything else - sent as text via ``str()``.

    Args:
        app: The application the middleware is installed on (unused).
        handler: The next handler in the chain.

    Returns:
        A coroutine function performing the conversion described above.
    """

    async def to_response(request: web.Request):
        result = await handler(request)
        print(type(result))

        if isinstance(result, web.Response):
            print(3)
            return result

        if isinstance(result, (str, int, float)):
            print(1)
            return web.Response(text=str(result))

        if isinstance(result, (dict, list)):
            return web.json_response(result)

        # Fallback: any other object is rendered with str().
        print(4)
        return web.Response(text=str(result))

    return to_response
class sv(qc.QObject):
    """Runs the aiohttp server on a given event loop until asked to stop.

    ``start()`` blocks the calling thread while the loop serves requests;
    ``stop()`` resolves the future the server is waiting on and is scheduled
    through ``call_soon_threadsafe`` so it can be invoked from another thread.
    The ``sg`` signal is sent once the server has been shut down.
    """

    sg = Signal()

    def __init__(self, loop: asyncio.AbstractEventLoop, ft: asyncio.Future) -> None:
        """Store the loop to run on and the future whose result means "stop".

        Args:
            loop: Event loop the server runs on.
            ft: Future awaited by the server; resolving it triggers shutdown.
        """
        super().__init__()
        self.loop = loop
        self.ft = ft
        # Runner of the currently running server, kept so that it can be
        # cleaned up on shutdown; ``None`` while no server is running.
        self._runner = None

    async def run_server(self):
        """Build the application, start listening and return the TCP site.

        Returns:
            The started ``web.TCPSite`` bound to 127.0.0.1:8080.

        Raises:
            Exception: Whatever ``site.start()`` raises (for instance when
                the port is unavailable); the runner is cleaned up first.
        """
        server = web.Application()
        server.middlewares.append(logger_mdw)
        server.middlewares.append(reponse_mdw)
        Routes(server)
        runner = web.AppRunner(server)
        await runner.setup()
        host, port = "127.0.0.1", 8080
        site = web.TCPSite(runner, host, port)
        try:
            await site.start()
        except Exception:
            # Do not leak the already set-up runner when binding fails.
            await runner.cleanup()
            raise
        self._runner = runner
        print(f"======= Serving on http://{host}:{port}/ ======")
        return site

    async def server_container(self):
        """Serve until ``self.ft`` completes, then shut down and send ``sg``."""
        site = await self.run_server()
        try:
            await self.ft  # wait for the agreed stop signal
        finally:
            await site.stop()
            # Stopping the site alone leaves the runner and application
            # un-cleaned; release them as well.
            runner, self._runner = self._runner, None
            if runner is not None:
                await runner.cleanup()
        self.sg.send()

    def start(self):
        """Run the server on ``self.loop``; blocks until it has shut down."""
        self.loop.run_until_complete(self.server_container())

    def stop(self):
        """Request shutdown by resolving ``self.ft`` on the server's loop."""

        def _release():
            # A second stop request must not raise InvalidStateError.
            if not self.ft.done():
                self.ft.set_result(None)

        self.loop.call_soon_threadsafe(_release)
class newth(qc.QThread):
    """Qt thread that hosts a private asyncio loop running the ``sv`` server."""

    # Server object created inside ``run()``; absent until the thread has
    # started executing.
    v: sv

    def run(self):
        """Create an event loop, run the server on it and close the loop.

        Exceptions raised while creating or running the server are printed
        rather than propagated out of the thread.
        """
        loop = asyncio.new_event_loop()
        try:
            self.v = sv(loop, loop.create_future())
            self.v.moveToThread(self)
            self.v.start()
        except Exception as e:
            print(e)
        finally:
            # Release the loop's resources; a new loop is created for every
            # run, so leaving it open would leak one loop per start/stop.
            loop.close()
            print("server finished ..........")

    def stop(self):
        """Ask the running server to shut down; no-op if none was created."""
        server = getattr(self, "v", None)
        if server is None:
            return
        try:
            server.stop()
        except RuntimeError:
            # The loop has already been closed, i.e. the server is already
            # down and there is nothing left to stop.
            pass
class mw(qw.QDialog):
    """Dialog with a single button that starts and stops the server thread."""

    def __init__(self) -> None:
        """Build the one-button layout; no server thread exists initially."""
        super().__init__()
        self.th = None
        layout = qw.QHBoxLayout()
        self.setLayout(layout)
        self.b1 = qw.QPushButton("start")
        self.b1.clicked.connect(self.on_b1_clicked)
        layout.addWidget(self.b1)
        self.resize(200, 200)

    def on_b1_clicked(self):
        """Start the server thread, or ask the running one to stop.

        The button is disabled here and re-enabled by ``b1_update`` once the
        thread reports that it has started or finished.
        """
        self.b1.setEnabled(False)
        if self.th is not None:
            self.th.stop()
            return
        worker = newth()
        self.th = worker
        worker.started.connect(self.b1_update)
        worker.finished.connect(self.b1_update)
        worker.start()

    def b1_update(self):
        """Refresh the button after the thread has started or finished."""
        stopped = self.th is None or self.th.isFinished()
        if stopped:
            # Drop the finished thread so the next click creates a new one.
            self.th = None
        self.b1.setEnabled(True)
        self.b1.setText("start" if stopped else "stop")

    def closeEvent(self, arg__1: qg.QCloseEvent) -> None:
        """Refuse to close while the server runs; request a stop instead."""
        if self.th is None:
            arg__1.accept()
            return
        self.on_b1_clicked()
        arg__1.ignore()
if __name__ == "__main__":
    app = qw.QApplication()
    ss = mw()
    ss.show()
    # Hand Qt's exit status to the process instead of discarding it.
    sys.exit(app.exec())
标签:__,200,return,AIOHTTP,web,self,PYQT,b1,def 来源: https://www.cnblogs.com/unm001/p/16217525.html