其他分享
首页 > 其他分享> > Flask+Vue+Echarts+Mysql+websocket 实战(四)

Flask+Vue+Echarts+Mysql+websocket 实战(四)

作者:互联网

一、后台编写

思路:本质是数据的展示,因此只是调用数据库查询方法给前端使用即可。由于物联网水质监测仪目前还未调试好,因此模拟数据采集到数据库,做一个定时器做数据插入的功能(预计本周可以调试完毕,到时候传感器会定时采集数据传入服务器数据库,和此效果相同,先预留读取接口),然后websocket保持对服务器的读取,查询到数据返回给前端。

db_operate.py

import sys
sys.path.append(r"E:pycharm2020projectsplatform1.0")
import json
import time
import random

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
import pymysql
import threading

from util.ecodings import class_to_dict, Decimal_and_DateEncoder
from util.settings import DevelopmentConfig

pymysql.install_as_MySQLdb()

app = Flask(__name__)
# 读取配置,包含数据库配置
app.config.from_object(DevelopmentConfig)

# 创建数据库sqlalchemy工具对象
db = SQLAlchemy(app)


# Flask从数据库已有表自动生成
# model flask-sqlacodegen "mysql+pymysql://root:root@120.78.94.58:3306/monitor_sys_data" --tables
# monitor_data --outfile "model.py" --flask
class MonitorDatum(db.Model):
    __tablename__ = monitor_data

    id = db.Column(db.Integer, primary_key=True)
    date_time = db.Column(db.DateTime)
    water_type = db.Column(db.Float(asdecimal=True))
    device_id = db.Column(db.Float(asdecimal=True))
    temperature = db.Column(db.Float(asdecimal=True))
    ph = db.Column(db.Float(asdecimal=True))
    solinity = db.Column(db.Float(asdecimal=True))
    dissolved_oxygen = db.Column(db.Float(asdecimal=True))
    light = db.Column(db.String(6))
    velocity = db.Column(db.String(6))
    data_type = db.Column(db.Float(asdecimal=True))


def insert():
    print("定时器启动了")
    print(threading.current_thread())  # 查看当前线程
    record_t = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    temp = round(random.uniform(15, 25), 2)  # 产生一个0-40之间的数字,保留两位小数
    ph = round(random.uniform(7, 8), 3)
    sol = round(random.uniform(100, 150), 1)
    dod = round(random.uniform(5, 8), 2)
    # 预留光照,如果是传入为字符串,需要将小数转为字符串,再插入数据库
    ins = MonitorDatum(date_time=record_t, temperature=temp, ph=ph, solinity=sol, dissolved_oxygen=dod, light=30)
    db.session.add(ins)
    db.session.commit()
    print(插入成功!)
    timer = threading.Timer(5, insert)  # 在insert函数结束之前我再开启一个定时器
    timer.start()


def create():
    # 创建所有表
    db.create_all()


def drop():
    # 删除所有表
    db.drop_all()


def query():
    # 清空缓存
    db.session.commit()
    # 查询最近一条数据
    # 只有最后加.all()才能读到实例,order_by和limit是条件查询
    new = db.session.query(MonitorDatum).order_by(MonitorDatum.id.desc()).limit(1).all()
    # [{temperature: 23.18, id: 5, record_t: datetime.datetime(2022, 10, 8, 10, 41, 35)}]  list
    result = class_to_dict(new)
    # 取的时间json.dumps无法对字典中的datetime时间格式数据进行转化。因此需要添加特殊日期格式转化
    result[0] = json.loads(json.dumps(result[0], cls=Decimal_and_DateEncoder))
    # print(result[0])  # {temperature: 23.18, id: 5, record_t: "2022-10-08 10:41:35"}
    # 由于数据库中光照的类型只能为字符型"light": 30,在这里需要把光照转为int型,才能传给前端显示
    result[0][light] = int(result[0][light])
    return result[0]
    # {"temperature": 23.72, "id": 16, "water_type": null, "solinity": 102.1, "light": 30,
    # "data_type": null, "device_id": null, "date_time": "2022-10-17 00:12:37", "ph": 7.731,
    # "dissolved_oxygen": 6.97, "velocity": null}  返回是一个字典


if __name__ == __main__:
    # res = query()
    # print(res)
    # # print(type(res))
    # print(res[date_time])
    # print(type(res[date_time]))
    # print(res[temperature])
    # print(type(res[temperature]))
    # 创建一个定时器,在程序运行在之后我开启一个insert函数
    t1 = threading.Timer(5, function=insert)  # 第一个参数是时间,例:过5s之后我执行后面的一个函数,开启一个线程
    t1.start()
# 控制台运行,5s定时向数据库插入
python db_operate.py

app_db_data.py

import sys

sys.path.append(r"E:pycharm2020projectsplatform1.0")
from db_manage.db_operate import query
from threading import Lock
from flask import Flask, render_template
from flask_socketio import SocketIO
from flask_cors import CORS

app = Flask(__name__)
CORS(app)  # 跨域问题
app.config[SECRET_KEY] = secret!
socketio = SocketIO()
socketio.init_app(app, async_mode=None, cors_allowed_origins=*)
thread = None
thread_lock = Lock()


# 后台线程 产生数据,即刻推送至前端
def background_thread():
    count = 0
    while True:
        socketio.sleep(5)
        res = query()
        # 1个时间,5个水质参数
        record_t = res[date_time]
        temperature = res[temperature]
        ph = res[ph]
        sol = res[solinity]
        dod = res[dissolved_oxygen]
        light = res[light]
        socketio.emit(server_response,
                      {
          
   data: [record_t, temperature, ph, sol, dod, light]},
                      namespace=/test)


@socketio.on(conn, namespace=/test)
def test_connect():
    print(触发)
    global thread
    with thread_lock:
        if thread is None:
            thread = socketio.start_background_task(target=background_thread)


if __name__ == __main__:
    socketio.run(app, host=127.0.0.1, port=5000, debug=True)
# 控制台运行,服务器此时会不断地向前端推送数据(1个时间,5个水质参数)
python app_db_data.py

这是程序定时插入的数据库数据,后面开始光照强度是固定“30”,其余的都不需要。

二、前端编写

1.总体架构

vue就不介绍了,之前的文章都有写过,这里挑重要点写: 首先创建好vue项目后,登录啥的我也不写了,直接写Home.vue(主页),包含header(顶部栏)、sidebar(侧边栏)和content(主要内容)3个组件,顶部栏和侧边栏都是固定的,变化的是content,项目结构如下: 本demo只写主界面中的基地总览和水质检测两部分,前端设计的还很烂,慢慢完善吧。

标签:Flask,vue,echarts,mysql,WebSocket,实战
来源: