跳转到主要内容HOME
KOLIKO / 20262024年10月29日 / 7 min read

Flask后端框架

介绍flask这个后端框架的使用经验

2024年10月29日7 min read
未分类

Flask后端框架

项目部署

安装环境

先到宝塔面板上软件商店找到并且安装python的项目管理器,以及进程守护管理器

image-20230429232505231

image-20230429234214320

安装好之后打开python项目管理器软件,在左侧的版本管理中安装自己需要的python版本

image-20230429232538770

部署项目

点击添加项目,按图所示进行部署(这里的前提是你已经把flask项目放到了你的自己选择的文件夹,图中所示我放到的是/www/WEB的文件夹当中)

<img src="images/Flask.assets/image-20230429232713012.png" alt="image-20230429232713012" style="zoom: 80%;" />

值得注意的是,我们如果需要他自动帮我们装一些module的话,需要在项目文件夹里面新建一个requirements.txt,然后写上我们需要的模块比如这样,建议这一步在写代码的时候就新建文件一起写上。

image-20230429232943504

最后添加项目和安装模块成功,正常的话可以通过ip进行访问,或者你的域名绑定了服务器应该也可以直接通过http进行域名访问

添加ssl证书

使用的过程中,因为有时候写小程序会需要用到域名和ssl证书,这里简单介绍一下如何添加证书

申请证书

这里可以在腾讯云之类的地方申请一个免费的ssl证书,拿到之后下载nginx版本的证书就好。

参考宝塔linux配置nginx HTTPS证书和域名进行操作

使用nginx进行反向代理

image-20230430104128521

在nginx的文件夹里新建.conf文件,进行配置的代理,这里我使用的是单域名多端口的分配,使用/flask路径来分发到5000端口的flask项目中,这里的/默认分发的是springboot的项目8080端口。

1server{ 2 #端口 3 listen 443 ssl; 4 #域名 5 ##需要修改 6 server_name www.zgmzgm.top zgmzgm.top; 7 #配置可以使用访问 8 9 10 #日志配置 11 if ($time_iso8601 ~ "^(\d{4})-(\d{2})-(\d{2})") { 12 set $year $1; 13 set $month $2; 14 } 15 ##需要修改 16 access_log /www/WEB/logs/access_$year-$month.log; 17 18 #证书 19 ##需要修改 20 ssl_certificate /www/server/panel/vhost/cert/zgmzgm.top/zgmzgm.top_bundle.pem; 21 ssl_certificate_key /www/server/panel/vhost/cert/zgmzgm.top/zgmzgm.top.key; 22 23 #请求超时时间 24 ssl_session_timeout 5m; 25 #协议 26 ssl_protocols TLSv1 TLSv1.1 TLSv1.2; 27 #加密算法 28 ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:HIGH:!aNULL:!MD5:!RC4:!DHE; 29 ssl_prefer_server_ciphers on; 30 location / { 31 proxy_set_header Host $host; 32 proxy_set_header X-Real-IP $remote_addr; 33 proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; 34 ##需要修改 35 proxy_pass http://127.0.0.1:8080/; 36 37 } 38 39 location /flask/ { 40 proxy_set_header Host $host; 41 proxy_set_header X-Real-IP $remote_addr; 42 proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; 43 ##需要修改 44 proxy_pass http://127.0.0.1:5000/; 45 46 } 47} 48 49 50 51#将http转https 52server{ 53 listen 80; 54 ##需要修改 55 server_name www.zgmzgm.top zgmzgm.top; 56 #配置可以使用(www.)domain.com访问 57 #server_name www.domain.com domain.com; 58 59 ##需要修改 60 rewrite ^/(.*)$ https://www.zgmzgm.top:443/$1 permanent; 61 62}

如何写接口?

参考

Python 查询数据库数据转成json格式输出到文件

GET请求

这里简单展示一个获取所有的示例

1 def selectAll(): 2 # 连接数据库 3 conn = pymysql.connect(host='42.193.160.52', # 连接名称,默认127.0.0.1 4 user='dianxinbei', # 用户名 5 password='A8SdsZAWMpGkKmGb', # 密码 6 port=3306, # 端口,默认为3306 7 db='dianxinbei', # 数据库名称 8 charset='utf8' # 字符编码 9 ) 10 11 cur = conn.cursor() # 生成游标对象 12 sql = "select * from team" 13 try: 14 cur.execute(sql) 15 data = cur.fetchall() 16 17 cur.close() # 关闭游 p标 18 conn.close() # 关闭连接 19 jsonData = [] 20 for row in data: 21 result = {} 22 result['stu1_id'] = row[0] 23 result['stu2_id'] = row[1] 24 result['stu3_id'] = row[2] 25 result['stu1_name'] = row[3] 26 result['stu2_name'] = row[4] 27 result['stu3_name'] = row[5] 28 result['stu1_phone'] = row[6] 29 result['team_title'] = row[7] 30 result['team_name'] = row[8] 31 result['team_school'] = row[9] 32 result['team_id'] = row[10] 33 result['team_score'] = row[11] 34 jsonData.append(result) 35 except Exception as e: 36 cur.close() # 关闭游标 37 conn.close() # 关闭连接 38 print('error') 39 else: 40 return jsonData
1@app.route('/teams', methods=['get']) 2def selectAll(): 3 result = {'status': 200, 'data': dao.selectAll()} 4 jsonResult = json.dumps(result) 5 return Response(jsonResult, mimetype='application/json') #声明Content-Type为json格式

POST请求

DAO层代码

使用%占位符可以做到防注入

1# 添加队伍 2def addTeam(team): 3 conn = pymysql.connect(host='42.193.160.52', # 连接名称,默认127.0.0.1 4 user='dianxinbei', # 用户名 5 password='A8SdsZAWMpGkKmGb', # 密码 6 port=3306, # 端口,默认为3306 7 db='dianxinbei', # 数据库名称 8 charset='utf8' # 字符编码 9 ) 10 11 cur = conn.cursor() # 生成游标对象 12 sql = "Replace team (team_name,team_school,team_title,stu1_id,stu1_name,stu1_phone,stu2_id,stu2_name,stu3_id," \ 13 "stu3_name) value (%(team_name)s,%(team_school)s,%(team_title)s,%(stu1_id)s,%(stu1_name)s,%(stu1_phone)s," \ 14 "%(stu2_id)s,%(stu2_name)s,%(stu3_id)s,%(stu3_name)s)" 15 try: 16 cur.execute(sql, team) 17 conn.commit() 18 cur.close() # 关闭游标 19 conn.close() # 关闭连接 20 21 return 1 22 except Exception as e: 23 conn.rollback() 24 cur.close() # 关闭游标 25 conn.close() # 关闭连接 26 print('error') 27 return 0

controller层代码(写在app.py就行)

1@app.route('/teams', methods=['GET', 'POST']) 2def selectAll(): 3 global result, jsonResult 4 #处理get请求 5 if request.method == 'GET': 6 result = {'status': 200, 'data': dao.selectAll()} 7 jsonResult = json.dumps(result) 8 return Response(jsonResult, mimetype='application/json') # 声明请求头Content-Type为json格式 9 #处理post请求 10 elif request.method == 'POST': 11 try: 12 result = {'status': 200} 13 data = request.get_json() 14 dao.addTeam(data) 15 jsonResult = json.dumps(result) 16 return Response(jsonResult, mimetype='application/json') 17 except: 18 result['status'] = 400 19 return Response(jsonResult, mimetype='application/json')

axios层获取数据

参考上面的两个get和post请求,分别对应的axios层的js代码如下(注意是在vue.js框架里使用的)

1//添加队伍数据 2 addTeam() { 3 alert("提交成功") 4 //发送请求数据 5 axios.post("/teams",this.tableData).then((res) => { 6 }); 7 //测试使用 8 },
1//获取表单数据 2 getAll() { 3 axios.get("/teams").then((res) => { 4 this.tableData = res.data.data; 5 console.log(res.data); 6 }); 7 },

mqtt

pip3 install flask-mqtt

使用模板进行订阅和发布

1from flask import Flask 2import eventlet 3from flask_mqtt import Mqtt 4 5eventlet.monkey_patch() 6 7app = Flask(__name__) 8 9app.config['SECRET'] = 'my secret key' 10app.config['TEMPLATES_AUTO_RELOAD'] = True 11app.config['MQTT_BROKER_URL'] = 'zgmzgm.top' 12app.config['MQTT_BROKER_PORT'] = 1883 13app.config['MQTT_USERNAME'] = '' 14app.config['MQTT_PASSWORD'] = '' 15app.config['MQTT_KEEPALIVE'] = 5 16app.config['MQTT_TLS_ENABLED'] = False 17app.config['MQTT_CLEAN_SESSION'] = True 18 19mqtt = Mqtt(app) 20 21 22@app.route('/') 23def index(): 24 return "hello mqtt_flask" 25 26 27@app.route('/hello') 28def hello(): 29 mqtt.publish('hello', 'hello, this is flask') 30 print("[mqtt] publish successfully") 31 return "publish successfully" 32 33 34@mqtt.on_connect() 35def handle_connect(client, userdata, flags, rc): 36 mqtt.subscribe('hello') 37 print("[mqtt] has listen topic hello") 38 39 40@mqtt.on_message() 41def handle_mqtt_message(client, userdata, message): 42 data = dict( 43 topic=message.topic, 44 payload=message.payload.decode() 45 ) 46 print(data) 47 48 49if __name__ == '__main__': 50 app.run()

python的json解包和封装包

1import json 2json.loads() 3json.dumps()