ORM 江湖
曾幾何時,程序員因為懼怕SQL而在開發(fā)的時候小心翼翼的寫著sql,心中總是少不了恐慌,萬一不小心sql語句出錯,搞壞了數據庫怎么辦?又或者為了獲取一些數據,什么內外左右連接,函數存儲過程等等。毫無疑問,不搞懂這些,怎么都覺得變扭,說不定某天就跳進了坑里,叫天天不應,喊地地不答。
ORM 的出現,讓畏懼SQL的開發(fā)者,在坑里看見了爬出去的繩索,仿佛天空并不是那么黑暗,至少再暗,我們也有了眼睛。顧名思義,ORM 對象關系映射,簡而言之,就是把數據庫的一個個table(表),映射為編程語言的class(類)。
python中比較著名的ORM框架有很多,大名頂頂的 SQLAlchemy 是python世界里當仁不讓的ORM框架。江湖中peewee,strom, pyorm,SQLObject 各領風騷,可是最終還是SQLAlchemy 傲視群雄。
SQLAlchemy 簡介
SQLAlchemy 分為兩個部分,一個用于 ORM 的對象映射,另外一個是核心的 SQL expression 。第一個很好理解,純粹的ORM,后面這個不是 ORM,而是DBAPI的封裝,當然也提供了很多方法,避免了直接寫sql,而是通過一些sql表達式。使用 SQLAlchemy 則可以分為三種方式。
- 使用 sql expression ,通過 SQLAlchemy 的方法寫sql表達式,簡介的寫sql
- 使用 raw sql, 直接書寫 sql
- 使用 ORM 避開直接書寫 sql
本文先探討 SQLAlchemy的 sql expresstion 部分的用法。主要還是跟著官方的 SQL Expression Language Tutorial.介紹
為什么要學習 sql expresstion ,而不直接上 ORM?因為后面這個兩個是 orm 的基礎。并且,即是不使用orm,后面這兩個也能很好的完成工作,并且代碼的可讀性更好。純粹把SQLAlchemy當成dbapi使用。首先SQLAlchemy 內建數據庫連接池,解決了連接操作相關繁瑣的處理。其次,提供方便的強大的log功能,最后,復雜的查詢語句,依靠單純的ORM比較難實現。
實戰(zhàn)
連接數據庫
首先需要導入 sqlalchemy 庫,然后建立數據庫連接,這里使用 mysql。通過create_engine方法進行
1
2
|
from sqlalchemy import create_engine engine = create_engine( "mysql://root:@localhost:3306/webpy?charset=utf8" ,encoding = "utf-8" , echo = True ) |
create_engine 方法進行數據庫連接,返回一個 db 對象。里面的參數表示
數據庫類型://用戶名:密碼(沒有密碼則為空,不填)@數據庫主機地址/數據庫名?編碼
echo = True 是為了方便 控制臺 logging 輸出一些sql信息,默認是False
通過這個engine對象可以直接execute 進行查詢,例如 engine.execute("SELECT * FROM user") 也可以通過 engine 獲取連接在查詢,例如 conn = engine.connect() 通過 conn.execute()方法進行查詢。兩者有什么差別呢?
直接使用engine的execute執(zhí)行sql的方式, 叫做connnectionless執(zhí)行,
借助 engine.connect()獲取conn, 然后通過conn執(zhí)行sql, 叫做connection執(zhí)行
主要差別在于是否使用transaction模式, 如果不涉及transaction, 兩種方法效果是一樣的. 官網推薦使用后者。
定義表
定義數據表,才能進行sql表達式的操作,畢竟sql表達式的表的確定,是sqlalchemy制定的,如果數據庫已經存在了數據表還需要定義么?當然,這里其實是一個映射關系,如果不指定,查詢表達式就不知道是附加在那個表的操作,當然定義的時候,注意表名和字段名,代碼和數據的必須保持一致。定義好之后,就能創(chuàng)建數據表,一旦創(chuàng)建了,再次運行創(chuàng)建的代碼,數據庫是不會創(chuàng)建的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
# -*- coding: utf-8 -*- __author__ = 'ghost' from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey # 連接數據庫 engine = create_engine( "mysql://root:@localhost:3306/webpy?charset=utf8" ,encoding = "utf-8" , echo = True ) # 獲取元數據 metadata = MetaData() # 定義表 user = Table( 'user' , metadata, Column( 'id' , Integer, primary_key = True ), Column( 'name' , String( 20 )), Column( 'fullname' , String( 40 )), ) address = Table( 'address' , metadata, Column( 'id' , Integer, primary_key = True ), Column( 'user_id' , None , ForeignKey( 'user.id' )), Column( 'email' , String( 60 ), nullable = False ) ) # 創(chuàng)建數據表,如果數據表存在,則忽視 metadata.create_all(engine) # 獲取數據庫連接 conn = engine.connect() |
插入 insert
有了數據表和連接對象,對應數據庫操作就簡單了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
>>> i = user.insert() # 使用查詢 >>> i <sqlalchemy.sql.dml.Insert object at 0x0000000002637748 > >>> print i # 內部構件的sql語句 INSERT INTO "user" ( id , name, fullname) VALUES (: id , :name, :fullname) >>> u = dict (name = 'jack' , fullname = 'jack Jone' ) >>> r = conn.execute(i, * * u) # 執(zhí)行查詢,第一個為查詢對象,第二個參數為一個插入數據字典,如果插入的是多個對象,就把對象字典放在列表里面 >>> r <sqlalchemy.engine.result.ResultProxy object at 0x0000000002EF9390 > >>> r.inserted_primary_key # 返回插入行 主鍵 id [ 4L ] >>> addresses [{ 'user_id' : 1 , 'email' : '[email protected]' }, { 'user_id' : 1 , 'email' : '[email protected]' }, { 'user_id' : 2 , 'email' : '[email protected]' }, { 'user_id' : 2 , 'email' : '[email protected]' }] >>> i = address.insert() >>> r = conn.execute(i, addresses) # 插入多條記錄 >>> r <sqlalchemy.engine.result.ResultProxy object at 0x0000000002EB5080 > >>> r.rowcount #返回影響的行數 4L >>> i = user.insert().values(name = 'tom' , fullname = 'tom Jim' ) >>> i. compile () <sqlalchemy.sql.compiler.SQLCompiler object at 0x0000000002F6F390 > >>> print i. compile () INSERT INTO "user" (name, fullname) VALUES (:name, :fullname) >>> print i. compile ().params { 'fullname' : 'tom Jim' , 'name' : 'tom' } >>> r = conn.execute(i) >>> r.rowcount 1L |
查詢 select
查詢方式很靈活,多數時候使用 sqlalchemy.sql 下面的 select方法
1
2
3
4
5
6
|
>>> s = select([user]) # 查詢 user表 >>> s <sqlalchemy.sql.selectable.Select at 0x25a7748 ; Select object > >>> print s SELECT "user" . id , "user" .name, "user" .fullname FROM "user" |
如果需要查詢自定義的字段,可是使用 user 的cloumn 對象,例如
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
>>> user.c # 表 user 的字段column對象 <sqlalchemy.sql.base.ImmutableColumnCollection object at 0x0000000002E804A8 > >>> print user.c [ 'user.id' , 'user.name' , 'user.fullname' ] >>> s = select([user.c.name,user.c.fullname]) >>> r = conn.execute(s) >>> r <sqlalchemy.engine.result.ResultProxy object at 0x00000000025A7748 > >>> r.rowcount # 影響的行數 5L >>> ru = r.fetchall() >>> ru [(u 'hello' , u 'hello world' ), (u 'Jack' , u 'Jack Jone' ), (u 'Jack' , u 'Jack Jone' ), (u 'jack' , u 'jack Jone' ), (u 'tom' , u 'tom Jim' )] >>> r <sqlalchemy.engine.result.ResultProxy object at 0x00000000025A7748 > >>> r.closed # 只要 r.fetchall() 之后,就會自動關閉 ResultProxy 對象 True |
同時查詢兩個表
1
2
3
4
5
6
7
|
>>> s = select([user.c.name, address.c.user_id]).where(user.c. id = = address.c.user_id) # 使用了字段和字段比較的條件 >>> s <sqlalchemy.sql.selectable.Select at 0x2f03390 ; Select object > >>> print s SELECT "user" .name, address.user_id FROM "user" , address WHERE "user" . id = address.user_id |
操作符
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
>>> print user.c. id = = address.c.user_id # 返回一個編譯的字符串 "user" . id = address.user_id >>> print user.c. id = = 7 "user" . id = :id_1 # 編譯成為帶參數的sql 語句片段字符串 >>> print user.c. id ! = 7 "user" . id ! = :id_1 >>> print user.c. id > 7 "user" . id > :id_1 >>> print user.c. id = = None "user" . id IS NULL >>> print user.c. id + address.c. id # 使用兩個整形的變成 + "user" . id + address. id >>> print user.c.name + address.c.email # 使用兩個字符串 變成 || "user" .name || address.email |
操作連接
這里的連接指條件查詢的時候,邏輯運算符的連接,即 and or 和 not
1
2
3
4
5
6
7
8
9
10
|
>>> print and_( user.c.name.like( 'j%' ), user.c. id = = address.c.user_id, or_( ), not_(user.c. id > 5 )) "user" .name LIKE :name_1 AND "user" . id = address.user_id AND (address.email = :email_1 OR address.email = :email_2) AND "user" . id < = :id_1 >>> |
得到的結果為 編譯的sql語句片段,下面看一個完整的例子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
>>> se_sql = [(user.c.fullname + ", " + address.c.email).label( 'title' )] >>> wh_sql = and_( user.c. id = = address.c.user_id, user.c.name.between( 'm' , 'z' ), or_( address.c.email.like( '%@aol.com' ), address.c.email.like( '%@msn.com' ) ) ) >>> print wh_sql "user" . id = address.user_id AND "user" .name BETWEEN :name_1 AND :name_2 AND (address.email LIKE :email_1 OR address.email LIKE :email_2) >>> s = select(se_sql).where(wh_sql) >>> print s SELECT "user" .fullname || :fullname_1 || address.email AS title FROM "user" , address WHERE "user" . id = address.user_id AND "user" .name BETWEEN :name_1 AND :name_2 AND (address.email LIKE :email_1 OR address.email LIKE :email_2) >>> r = conn.execute(s) >>> r.fetchall() |
使用 raw sql 方式
遇到負責的sql語句的時候,可以使用 sqlalchemy.sql 下面的 text 函數。將字符串的sql語句包裝編譯成為 execute執(zhí)行需要的sql對象。例如:、
1
2
3
4
5
6
7
8
|
>>> text_sql = "SELECT id, name, fullname FROM user WHERE id=:id" # 原始sql語句,參數用( :value)表示 >>> s = text(text_sql) >>> print s SELECT id , name, fullname FROM user WHERE id = : id >>> s <sqlalchemy.sql.elements.TextClause object at 0x0000000002587668 > >>> conn.execute(s, id = 3 ).fetchall() # id=3 傳遞:id參數 [( 3L , u 'Jack' , u 'Jack Jone' )] |
連接 join
連接有join 和 outejoin 兩個方法,join 有兩個參數,第一個是join 的表,第二個是on 的條件,joing之后必須要配合select_from 方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
>>> print user.join(address) "user" JOIN address ON "user" . id = address.user_id # 因為開啟了外鍵 ,所以join 能只能識別 on 條件 >>> print user.join(address, address.c.user_id = = user.c. id ) # 手動指定 on 條件 "user" JOIN address ON address.user_id = "user" . id >>> s = select([user.c.name, address.c.email]).select_from(user.join(address, user.c. id = = address.c.user_id)) # 被jion的sql語句需要用 select_from方法配合 >>> s <sqlalchemy.sql.selectable.Select at 0x2eb63c8 ; Select object > >>> print s SELECT "user" .name, address.email FROM "user" JOIN address ON "user" . id = address.user_id >>> conn.execute(s).fetchall() [(u 'hello' , u '[email protected]' ), (u 'hello' , u '[email protected]' ), (u 'hello' , u '[email protected]' ), (u 'hello' , u '[email protected]' ), (u 'Jack' , u '[email protected]' ), (u 'Jack' , u '[email protected]' ), (u 'Jack' , u '[email protected]' ), (u 'Jack' , u '[email protected]' )] |
排序 分組 分頁
排序使用 order_by 方法,分組是 group_by ,分頁自然就是limit 和 offset兩個方法配合
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
>>> s = select([user.c.name]).order_by(user.c.name) # order_by >>> print s SELECT "user" .name FROM "user" ORDER BY "user" .name >>> s = select([user]).order_by(user.c.name.desc()) >>> print s SELECT "user" . id , "user" .name, "user" .fullname FROM "user" ORDER BY "user" .name DESC >>> s = select([user]).group_by(user.c.name) # group_by >>> print s SELECT "user" . id , "user" .name, "user" .fullname FROM "user" GROUP BY "user" .name >>> s = select([user]).order_by(user.c.name.desc()).limit( 1 ).offset( 3 ) # limit(1).offset(3) >>> print s SELECT "user" . id , "user" .name, "user" .fullname FROM "user" ORDER BY "user" .name DESC LIMIT :param_1 OFFSET :param_2 [( 4L , u 'jack' , u 'jack Jone' )] |
更新 update
前面都是一些查詢,更新和插入的方法很像,都是 表下面的方法,不同的是,update 多了一個 where 方法 用來選擇過濾
1
2
3
4
5
6
7
8
9
10
11
12
|
>>> s = user.update() >>> print s UPDATE "user" SET id = : id , name = :name, fullname = :fullname >>> s = user.update().values(fullname = user.c.name) # values 指定了更新的字段 >>> print s UPDATE "user" SET fullname = "user" .name >>> s = user.update().where(user.c.name = = 'jack' ).values(name = 'ed' ) # where 進行選擇過濾 >>> print s UPDATE "user" SET name = :name WHERE "user" .name = :name_1 >>> r = conn.execute(s) >>> print r.rowcount # 影響行數 3 |
還有一個高級用法,就是一次命令執(zhí)行多個記錄的更新,需要用到 bindparam 方法
1
2
3
4
5
6
7
8
9
|
>>> s = user.update().where(user.c.name = = bindparam( 'oldname' )).values(name = bindparam( 'newname' )) # oldname 與下面的傳入的從拿書進行綁定,newname也一樣 >>> print s UPDATE "user" SET name = :newname WHERE "user" .name = :oldname >>> u = [{ 'oldname' : 'hello' , 'newname' : 'edd' }, { 'oldname' : 'ed' , 'newname' : 'mary' }, { 'oldname' : 'tom' , 'newname' : 'jake' }] >>> r = conn.execute(s, u) >>> r.rowcount 5L |
刪除 delete
刪除比較容易,調用 delete方法即可,不加 where 過濾,則刪除所有數據,但是不會drop掉表,等于清空了數據表
1
2
3
4
5
6
7
8
|
>>> r = conn.execute(address.delete()) # 清空表 >>> print r <sqlalchemy.engine.result.ResultProxy object at 0x0000000002EAF550 > >>> r.rowcount 8L >>> r = conn.execute(users.delete().where(users.c.name > 'm' )) # 刪除記錄 >>> r.rowcount 3L |
flask-sqlalchemy
SQLAlchemy已經成為了python世界里面orm的標準,flask是一個輕巧的web框架,可以自由的使用orm,其中flask-sqlalchemy是專門為flask指定的插件。
安裝flask-sqlalchemy
1
|
pip install flask-sqlalchemy |
初始化sqlalchemy
1
2
3
4
5
6
7
8
9
10
11
|
from flask import Flask from flask.ext.sqlalchemy import SQLAlchemy app = Flask(__name__) # dialect+driver://username:password@host:port/database?charset=utf8 # 配置 sqlalchemy 數據庫驅動://數據庫用戶名:密碼@主機地址:端口/數據庫?編碼 app.config[ 'SQLALCHEMY_DATABASE_URI' ] = 'mysql://root:@localhost:3306/sqlalchemy?charset=utf8' # 初始化 db = SQLAlchemy(app) |
定義model
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
class User(db.Model): """ 定義了三個字段, 數據庫表名為model名小寫 """ id = db.Column(db.Integer, primary_key = True ) username = db.Column(db.String( 80 ), unique = True ) email = db.Column(db.String( 120 ), unique = True ) def __init__( self , username, email): self .username = username self .email = email def __repr__( self ): return '<User %r>' % self .username def save( self ): db.session.add( self ) db.session.commit() |
創(chuàng)建數據表
數據包的創(chuàng)建使用sqlalchemy app,如果表已經存在,則忽略,如果不存在,則新建
1
2
3
4
5
|
>>> from yourapp import db, User >>> db.session.add(u) # 添加session >>> db.session.commit() # 提交查詢 >>> users = User.query. all () # 查詢 |
需要注意的是,如果要插入中文,必須插入 unicode字符串
1
2
|
>>> u.save() |
定義關系
關系型數據庫,最重要的就是關系。通常關系分為 一對一(例如無限級欄目),一對多(文章和欄目),多對多(文章和標簽)
one to many:
我們定義一個Category(欄目)和Post(文章),兩者是一對多的關系,一個欄目有許多文章,一個文章屬于一個欄目。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
class Category(db.Model): id = db.Column(db.Integer, primary_key = True ) name = db.Column(db.String( 50 )) def __init__( self , name): self .name = name def __repr__( self ): return '<Category %r>' % self .name class Post(db.Model): """ 定義了五個字段,分別是 id,title,body,pub_date,category_id """ id = db.Column(db.Integer, primary_key = True ) title = db.Column(db.String( 80 )) body = db.Column(db.Text) pub_date = db.Column(db.String( 20 )) # 用于外鍵的字段 category_id = db.Column(db.Integer, db.ForeignKey( 'category.id' )) # 外鍵對象,不會生成數據庫實際字段 # backref指反向引用,也就是外鍵Category通過backref(post_set)查詢Post category = db.relationship( 'Category' , backref = db.backref( 'post_set' , lazy = 'dynamic' )) def __init__( self , title, body, category, pub_date = None ): self .title = title self .body = body if pub_date is None : pub_date = time.time() self .pub_date = pub_date self .category = category def __repr__( self ): return '<Post %r>' % self .title def save( self ): db.session.add( self ) db.session.commit() |
如何使用查詢呢?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
>>> c = Category(name = 'Python' ) >>> c <Category 'Python' > >>> c.post_set <sqlalchemy.orm.dynamic.AppenderBaseQuery object at 0x0000000003B58F60 > >>> c.post_set. all () [] >>> p = Post(title = 'hello python' , body = 'python is cool' , category = c) >>> p.save() >>> c.post_set <sqlalchemy.orm.dynamic.AppenderBaseQuery object at 0x0000000003B73710 > >>> c.post_set. all () # 反向查詢 [<Post u 'hello python' >] >>> p <Post u 'hello python' > >>> p.category <Category u 'Python' > # 也可以使用category_id 字段來添加 >>> p = Post(title = 'hello flask' , body = 'flask is cool' , category_id = 1 ) >>> p.save() |
many to many (評論已經指出,這樣的做法無法關聯刪除,簡書沒有刪除線格式,多多對例子作廢,在此提示,以免被誤導)
對于多對多的關系,往往是定義一個兩個model的id的另外一張表,例如 Post 和 Tag之間是多對多,需要定義一個 Post_Tag的表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
post_tag = db.Table( 'post_tag' , db.Column( 'post_id' , db.Integer, db.ForeignKey( 'post.id' )), db.Column( 'tag_id' , db.Integer, db.ForeignKey( 'tag.id' )) ) class Post(db.Model): id = db.Column(db.Integer, primary_key = True ) # ... 省略 # 定義一個反向引用,tag可以通過 post_set查詢到 post的集合 tags = db.relationship( 'Tag' , secondary = post_tag, backref = db.backref( 'post_set' , lazy = 'dynamic' )) class Tag(db.Model): id = db.Column(db.Integer, primary_key = True ) content = db.Column(db.String( 10 ), unique = True ) # 定義反向查詢 posts = db.relationship( 'Post' , secondary = post_tag, backref = db.backref( 'tag_set' , lazy = 'dynamic' )) def __init__( self , content): self .content = content def save( self ): db.session.add( self ) db.session.commit() |
查詢:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
>>> tag_list = [] >>> tags = [ 'python' , 'flask' , 'ruby' , 'rails' ] >>> for tag in tags: t = Tag(tag) tag_list.append(t) >>> tag_list [<f_sqlalchemy.Tag object at 0x0000000003B7CF28 >, <f_sqlalchemy.Tag object at 0x0000000003B7CF98 >, <f_sqlalchemy.Tag object at 0x0000000003B7CEB8 >, <f_sqlalchemy.Tag object at 0x0000000003B7CE80 >] >>> p <Post u 'hello python' > >>> p.tags [] >>> p.tags = tag_list # 添加多對多的數據 >>> p.save() >>> p.tags [<f_sqlalchemy.Tag object at 0x0000000003B7CF28 >, <f_sqlalchemy.Tag object at 0x0000000003B7CF98 >, <f_sqlalchemy.Tag object at 0x0000000003B7CEB8 >, <f_sqlalchemy.Tag object at 0x0000000003B7CE80 >] >>> p.tag_set # 反向查詢 <sqlalchemy.orm.dynamic.AppenderBaseQuery object at 0x0000000003B7C080 > >>> p.tag_set. all () [<f_sqlalchemy.Tag object at 0x0000000003B7CF28 >, <f_sqlalchemy.Tag object at 0x0000000003B7CF98 >, <f_sqlalchemy.Tag object at 0x0000000003B7CEB8 >, <f_sqlalchemy.Tag object at 0x0000000003B7CE80 >] >>> t = Tag.query. all ()[ 1 ] >>> t <f_sqlalchemy.Tag object at 0x0000000003B7CF28 > >>> t.content u 'python' >>> t.posts [<Post u 'hello python' >] >>> t.post_set <sqlalchemy.orm.dynamic.AppenderBaseQuery object at 0x0000000003B7C358 > >>> t.post_set. all () [<Post u 'hello python' >] self one to one |
自身一對一也是常用的需求,比如無限分級欄目
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
class Category(db.Model): id = db.Column(db.Integer, primary_key = True ) name = db.Column(db.String( 50 )) # 父級 id pid = db.Column(db.Integer, db.ForeignKey( 'category.id' )) # 父欄目對象 pcategory = db.relationship( 'Category' , uselist = False , remote_side = [ id ], backref = db.backref( 'scategory' , uselist = False )) def __init__( self , name, pcategory = None ): self .name = name self .pcategory = pcategory def __repr__( self ): return '<Category %r>' % self .name def save( self ): db.session.add( self ) db.session.commit() |
查詢:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
>>> p = Category( 'Python' ) >>> p <Category 'Python' > >>> p.pid >>> p.pcategory # 查詢父欄目 >>> p.scategory # 查詢子欄目 >>> f = Category( 'Flask' , p) >>> f.save() >>> f <Category u 'Flask' > >>> f.pid 1L >>> f.pcategory # 查詢父欄目 <Category u 'Python' > >>> f.scategory # 查詢父欄目 >>> p.scategory # 查詢子欄目 <Category u 'Flask' > |
關于 flask-sqlalchemy 定義models的簡單應用就這么多,更多的技巧在于如何查詢。