1、安装Pony
1)使用pip安装Pony
pip install pony
2)安装数据库的驱动
若使用SQLite数据库,不需要安装数据库的驱动,否则需要安装数据库驱动如下:
PostgreSQL需要安装 psycopg 或 psycopg2cffi
MySQL需要安装 MySQL-python 或 PyMySQL
Oracle需要安装 cx_Oracle
2、定义数据库和表
使用Database()
创建数据库对象,使用bind()
绑定到数据库,定义表的实体,需要继承db.Entity
,使用sql_debug(True)
可以进行调试,查看Pony生成的sql语句,使用generate_mapping(create_tables=True)可
以建表及关系映射。
1)定义字段
使用attr_name = kind(type, *options)
来定义表的字段,kind
可以为Required
、Optional
、PrimaryKey
和Set
。
Required
和Optional
是最常用的两种属性,Required
表示必须存在的属性,Optional
表示可选的属性。PrimaryKey
是配置主键,映射到数据库中是primary key,每个实体必须包含一个。如没有显示的定义,pony会自动指定一个:id = PrimaryKey(int, auto=True)
,Set
代表集合,也叫做关系。实现to-many的关系。
type支持多种类型:
str
unicode
int
float
Decimal
datetime
date
time
timedelta
bool
buffer : 用于Python 2和3中的二进制数据
bytes :用于Python 3中的二进制数据
LongStr :用于大字符串
LongUnicode :用于大字符串
UUID
Json :用于映射到本机数据库JSON类型
IntArray :整数数组
StrArray :字符串数组
FloatArray :浮点数组
参考文档:https://docs.ponyorm.org/api_reference.html#attribute-options
from __future__ import absolute_import, print_function from decimal import Decimal from pony.orm import * #db = Database("sqlite", "demo.sqlite", create_db=True) db = Database()#Database("sqlite", "demo.sqlite", create_db=True) db.bind(provider='oracle', user='admin', password='admin', dsn='docker_oracle11') class Customer(db.Entity): id = PrimaryKey(int, auto=True) name = Required(str) email = Required(str, unique=True) orders = Set("OrderInfo") class OrderInfo(db.Entity): id = PrimaryKey(int, auto=True) total_price = Required(Decimal) customer = Required(Customer) items = Set("OrderItem") class Product(db.Entity): id = PrimaryKey(int, auto=True) name = Required(str) price = Required(Decimal) items = Set("OrderItem") class OrderItem(db.Entity): quantity = Required(int, default=1) order = Required(OrderInfo) product = Required(Product) PrimaryKey(order, product) #设置为debug模式 sql_debug(True) #表不存在则会建表 db.generate_mapping(create_tables=True)
2)复合主键和复合索引
#复合主键 class Example(db.Entity): a = Required(int) b = Required(str) c = Required(str) d = Required(str) PrimaryKey(a, b) composite_key(c, d) #composite_key(a, b)将被表示为UNIQUE ("a", "b")约束。 #复合索引 class Example(db.Entity): a = Required(str) b = Optional(int) composite_index(a, b) # or composite_index(a, 'b')
3、实体关联
1)一对多
class Order(db.Entity): items = Set("OrderItem") class OrderItem(db.Entity): order = Required(Order)
或
class Order(db.Entity): items = Set("OrderItem") class OrderItem(db.Entity): order = Optional(Order)
2)多对多
Pony 会自动生成中间表
class Product(db.Entity): tags = Set("Tag") class Tag(db.Entity): products = Set(Product)
3)一对一
必须定义为Optional
和Required
或者Optiona
和Optional
class Person(db.Entity): passport = Optional("Passport") class Passport(db.Entity): person = Required("Person")
4)自我关联
实体可以使用自引用关系与自身关联。这种关系可以有两种类型:对称和非对称。非对称关系由属于同一实体的两个属性定义。对称关系的具体特性是,实体只指定了一个关系属性,而该属性定义了关系的两边。这种关系可以是一对一,也可以是多对多。
class Person(db.Entity): name = Required(str) spouse = Optional("Person", reverse="spouse") # 对称 一对一 friends = Set("Person", reverse="friends") # 对称 多对多 manager = Optional("Person", reverse="employees") # 不对称的一面 employees = Set("Person", reverse="manager") # 非对称的另一面
5)两实体间的多重关系
当两个实体之间有多个关系时,Pony需要指定reverse
属性。
class User(db.Entity): tweets = Set("Tweet", reverse="author") favorites = Set("Tweet", reverse="favorited") class Tweet(db.Entity): author = Required(User, reverse="tweets") favorited = Set(User, reverse="favorites")
4、数据增删改查
所有创建的实例都属于当前的db_session()
。Pony自动跟踪创建或更新的对象,并在当前db_session()
结束时自动将其保存到数据库中。如果需要在离开db_session()
作用域之前保存新创建的对象,可以使用flush()
或commit()
函数来保存。
from __future__ import absolute_import, print_function from decimal import Decimal from pony.orm import * #db = Database("sqlite", "demo.sqlite", create_db=True) db = Database()#Database("sqlite", "demo.sqlite", create_db=True) db.bind(provider='oracle', user='admin', password='admin', dsn='docker_oracle11') class Customer(db.Entity): id = PrimaryKey(int, auto=True) name = Required(str) email = Required(str, unique=True) orders = Set("OrderInfo") class OrderInfo(db.Entity): id = PrimaryKey(int, auto=True) total_price = Required(Decimal) customer = Required(Customer) items = Set("OrderItem") class Product(db.Entity): id = PrimaryKey(int, auto=True) name = Required(str) price = Required(Decimal) items = Set("OrderItem") class OrderItem(db.Entity): quantity = Required(int, default=1) order = Required(OrderInfo) product = Required(Product) PrimaryKey(order, product) sql_debug(True) db.generate_mapping(create_tables=True) @db_session def populate_database(): c1 = Customer(name='John Smith', email='john@example.com') c2 = Customer(name='Matthew Reed', email='matthew@example.com') c3 = Customer(name='Chuan Qin', email='chuanqin@example.com') c4 = Customer(name='Rebecca Lawson', email='rebecca@example.com') c5 = Customer(name='Oliver Blakey', email='oliver@example.com') p1 = Product(name='Kindle Fire HD', price=Decimal('284.00')) p2 = Product(name='Apple iPad with Retina Display', price=Decimal('478.50')) p3 = Product(name='SanDisk Cruzer 16 GB USB Flash Drive', price=Decimal('9.99')) p4 = Product(name='Kingston DataTraveler 16GB USB 2.0', price=Decimal('9.98')) p5 = Product(name='Samsung 840 Series 120GB SATA III SSD', price=Decimal('98.95')) p6 = Product(name='Crucial m4 256GB SSD SATA 6Gb/s', price=Decimal('188.67')) o1 = OrderInfo(customer=c1, total_price=Decimal('292.00')) OrderItem(order=o1, product=p1) OrderItem(order=o1, product=p4, quantity=2) o2 = OrderInfo(customer=c1, total_price=Decimal('478.50')) OrderItem(order=o2, product=p2) o3 = OrderInfo(customer=c2, total_price=Decimal('680.50')) OrderItem(order=o3, product=p2) OrderItem(order=o3, product=p4, quantity=2) OrderItem(order=o3, product=p6) o4 = OrderInfo(customer=c3, total_price=Decimal('99.80')) OrderItem(order=o4, product=p4, quantity=10) o5 = OrderInfo(customer=c4, total_price=Decimal('722.00')) OrderItem(order=o5, product=p1) OrderItem(order=o5, product=p2) commit() #populate_database() #查询 @db_session def query(): print(select(p for p in Product if p.id==3)[:]) #多条数据会报错:pony.orm.core.MultipleObjectsFoundError: Multiple objects were found. Use Customer.select(...) to retrieve them #通过pk获取 print(Product[1].name) print(Customer.get(id=1).name) print(Customer.select(lambda c: c.id==1)[:][0].name) y = 1 print(Product.select_by_sql("SELECT * FROM Product WHERE id=$(y*2)")[0].name) @db_session def update(): c1 = Customer.get(id=1) c1.name="john" p1 = select(p for p in Product if p.id==3)[:][0] p1.name="cjavapy" c2 = Customer.get(id=2) #更新多个字段 #c.name="cjavapy" #c.email="cjavapy@gmail.com" c2.set(name="cjavapy",email="cjavapy@gmail.com") @db_session def delete(): p = Product[1] p.delete() # 或者: #delete(p for p in Product if p.id >=5) # 或: #使用bulk=True参数是一条delete语句批量删除,否则是先select满足条件数据,然后每次删除一个 Product.select(lambda p: p.id >=5).delete(bulk=True) query() update() delete()