python-sqlalchemy

0x01 安装服务

基础环境

1
2
python3.6
mysql5.0

安装mysql

1
2
3
4
sudo yum install mysql-server
sudo systemctl start mysqld
sudo grep 'temporary password' /var/log/mysqld.log 查看初始化密码
sudo mysql_secure_installation

安装flask项目的依赖

1
2
3
4
5
6
7
8
9
flask
flask-mysqldb
flask_script
flask_migrate
flask_apscheduler
pyjwt
flask_restful
flask_cors
sqlalchemy

0x02 中文乱码解决

sqlite bug解决

1
2
3
4
5
6
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table:
第一次启动应用的时候,BUG解决:
$ python db.py shell
>>> from app.users.model import db
>>> db.create_all()
>>> exit

flask_sqlalchemy 中文乱码解决方式

1.修改代码部分

1
将"db = SQLAlchemy(app)"改为"db = SQLAlchemy(app, use_native_unicode="utf8")"

2.进入mysql的配置文件目录, 编辑my.cnf配置文件

1
2
3
4
5
6
7
8
9
10
cd /etc/mysql/
vim my.cnf
[mysqld]
character_set_server = utf8

[client]
default-character-set = utf8

[mysql]
default-character-set = utf8

3.重启mysql

1
sudo systemctl restart mysqld

4.进入mysql,重新查看状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
mysql> status
--------------
mysql Ver 14.14 Distrib 5.7.25, for Linux (x86_64) using EditLine wrapper

Connection id: 2
Current database:
Current user: test@localhost
SSL: Not in use
Current pager: stdout
Using outfile: ''
Using delimiter: ;
Server version: 5.7.25 MySQL Community Server (GPL)
Protocol version: 10
Connection: Localhost via UNIX socket
Server characterset: utf8
Db characterset: utf8
Client characterset: utf8
Conn. characterset: utf8
UNIX socket: /var/lib/mysql/mysql.sock
Uptime: 27 sec

5.如果还是不行,修改数据库的连接方式

1
2
'mysql://root:password@localhost/database'   改成如下:
'mysql+mysqlconnector://root:password@localhost/database'

0x03 基础数据模板

app/init.py

1
db = SQLAlchemy(use_native_unicode="utf8")

app/models/monitor_analysis_logger.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#!/usr/bin/env python
# coding=UTF-8
'''
@Author: Bing
@Description:
@Date: 2019-02-25 14:58:11
'''

import time
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.exc import SQLAlchemyError

from app import db

class MonitorAnalysisLogger(db.Model):
'''
@Description: 监控分析引擎运行日志
@param {type}
@return:
'''
__tablename__ = "monitor_analysis_logger"
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
engine = db.Column(db.String(250), unique=True, nullable=False, comment="监控引擎名称")
message = db.Column(db.Text, nullable=False, comment="日志信息")
ctime = db.Column(db.Integer, comment="创建时间")

def __init__(self, engine, message):
self.engine = engine
self.message = message

def __str__(self):
return "MonitorAnalysisLogger(id='%s')" % self.id

def get(self, id):
return self.query.filter_by(id=id).first()

def add(self, obj):
db.session.add(obj)
return session_commit()

def update(self):
return session_commit()

def delete(self, id):
self.query.filter_by(id=id).delete()
return session_commit()


def session_commit():
try:
db.session.commit()
except SQLAlchemyError as e:
db.session.rollback()
reason = str(e)
return reason

一些基础操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
查询所有:
result = MonitorResourceConfig.query.all() # 单个first(), 最后一个last()
for row in result:
temp = {}
temp["id"] = row.id
temp["topic"] = row.topic
temp["category"] = row.category
temp["segmentation"] = row.segmentation
temp["paramater"] = row.paramater
temp["uid"] = Users.get(Users, row.uid).username
temp["status"] = row.status
temp["ctime"] = row.ctime
data.append(temp)
return jsonify(common.trueReturn(data, ""))

增加数据:
count = MonitorResourceConfig.query.filter_by(topic=topic).count()
if count == 0:
monitor = MonitorResourceConfig(topic=topic, category=category, segmentation=segmentation, paramater=paramater, uid = user.id )
result = MonitorResourceConfig.add(MonitorResourceConfig, monitor)
if monitor.id:
return jsonify(common.trueReturn("", "添加成功"))
else:
return jsonify(common.falseReturn("", "添加失败" ))

更新:
ids = request.json.get("id")
monitor = MonitorResourceConfig.query.filter_by(id=ids).first()
if monitor.status == 1:
monitor.status = 0
else:
monitor.status = 1
monitor.update()

0x04 sqlalchemy常用操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#查询一个trace中flow个数(to count flows of specific trace)
session.query(Flow).filter(Flow.trace_id == 1).count()

#查询一个trace中不同srcIP的个数 (to count distinct srcIP)
from sqlalchemy import distinct
from config import *
session = DBSession()
session.query(Flow.srcIP).filter(Flow.trace_id == 1).distinct().count()

#查询一个trace中不同的dstIP和dstPort对的个数(to count distinct dstIP and dstPort)
session.query(Flow.dstIP, Flow.dstPort).filter(Flow.trace_id == 1).distinct().count()

#查询指定列的数据,返回一个KeyedTuple数据类型的列表( get a tuple list of specified columns )
n = session.query(Flow.dstIP, Flow.dstPort).filter(Flow.trace_id == 1).all()
# The type of n is list.
# The type of n[0] is sqlalchemy.util._collections.KeyedTuple

#查询指定列中的所有不同值( get a distinct tuple list of specified columns)
n = session.query(Flow.dstIP, Flow.dstPort).filter(Flow.trace_id == 1).distinct().all()

#获得一列数据的平均值(get average value of a column)
# sql language: select avg(txPkt) from Flow
from sqlalchemy.sql import func
q = session.query(func.avg(Flow.txPkt)).filter(Flow.trace_id == 1)


#多列数据平均值的计算(compute average values of columns)
q = session.query((func.avg(Flow.txPkt)+func.avg(Flow.rxPkt))/2).filter(Flow.trace_id == 1)

#对查询到的数据排序(order by )
from sqlalchemy import desc
q = session.query(Flow.timestamp).filter(trace_id == 1).order_by(desc(Flow.timestamp))
q = Monitor.query.order_by(Monitor.id.desc()).limit(100)

#分组查询
q = session.query(Flow.dstIP, Flow.dstPort, func.count(Flow.id)).filter(Flow.trace_id == tid).group_by(Flow.dstIP, Flow.dstPort).all()

#查询中,常用的过滤操作
等于(equals), 例如 query.filter(name == 'Jack')
不等于(not equals), 例如 query.filter(name != 'Jack')
在列表中(in), 例如 query.filter(name.in_(['Micheal', 'Bob', 'Jack']))
不在列表中(not in), 例如query.filter(~name.in_(['Micheal', 'Bob', 'Jack']))
空值(null), 例如 query.filter(name == None)
不是空值(not null), 例如 query.filter(name != None)
与(and), 例如 query.filter(and_(name == 'Andy', fullname == 'Andy Liu' ))
and_可以省略, 例如 VulCategrory.query.filter_by( platform = platform, vul = vultype ).first()
或(or), 例如 query.filter(or_(name == 'Andy', name == 'Micheal'))


#修改一个数据(update a value)
session.query(Flow).filter(Flow.dstIP == dstIP, Flow.dstPort == dstPort, Flow.trace_id == 1).update({'cluster_id' : 0})

#插入一行数据(insert a row)
session = DBSession()
cluster = Clusters(trace_id = tid, cluster_id = cid, \
dstIP = dIP, dstPort = dPort, \
avgPkt = aPkt, avgByte = aByte, \
size = count)
session.add(cluster)
session.commit() # commit or flush
session.close()

#删除一行数据(delete a row )
session = DBSession()
session.query(Clusters).filter(Clusters.trace_id = 2).delete()
session.commit() # commit or flush
session.close()

#简单查询
print(session.query(User).all())
print(session.query(User.name, User.fullname).all())
print(session.query(User, User.name).all())


#带条件查询
print(session.query(User).filter_by(name='user1').all())
print(session.query(User).filter(User.name == "user").all())
print(session.query(User).filter(User.name.like("user%")).all())


#多条件查询
print(session.query(User).filter(and_(User.name.like("user%"), User.fullname.like("first%"))).all())
print(session.query(User).filter(or_(User.name.like("user%"), User.password != None)).all())


#sql过滤
print(session.query(User).filter("id>:id").params(id=1).all())


#关联查询
print(session.query(User, Address).filter(User.id == Address.user_id).all())
print(session.query(User).join(User.addresses).all())
print(session.query(User).outerjoin(User.addresses).all())


#聚合查询
print(session.query(User.name, func.count('*').label("user_count")).group_by(User.name).all())
print(session.query(User.name, func.sum(User.id).label("user_id_sum")).group_by(User.name).all())


#子查询
stmt = session.query(Address.user_id, func.count('*').label("address_count")).group_by(Address.user_id).subquery()
print(session.query(User, stmt.c.address_count).outerjoin((stmt, User.id == stmt.c.user_id)).order_by(User.id).all())


#exists
print(session.query(User).filter(exists().where(Address.user_id == User.id)))
print(session.query(User).filter(User.addresses.any()))


限制返回字段查询
person = session.query(Person.name, Person.created_at,Person.updated_at).filter_by(name="zhongwei").order_by(Person.created_at).first()


记录总数查询:
from sqlalchemy import func
# count User records, without
# using a subquery.
session.query(func.count(User.id))
# return count of user "id" grouped
# by "name"
session.query(func.count(User.id)).group_by(User.name)

# 去重复
from sqlalchemy import distinct
# count distinct "name" values
session.query(func.count(distinct(User.name)))
# 去重复必须使用db
from app import db
result_vul = db.session.query(distinct(VulCategrory.platform))
坚持原创技术分享,您的支持将鼓励我继续创作!