1、安装Pony
1)使用pip安装Pony
pip install pony
2)安装数据库的驱动
若使用SQLite数据库,不需要安装数据库的驱动,否则需要安装数据库驱动如下:
PostgreSQL需要安装 psycopg 或 psycopg2cffi
MySQL需要安装 MySQL-python 或 PyMySQL
Oracle需要安装 cx_Oracle
2、定义数据库和表
使用Database()
创建数据库对象,使用bind()
绑定到数据库,定义表的实体,需要继承db.Entity
,使用sql_debug(True)
可以进行调试,查看Pony生成的sql语句,使用generate_mapping(create_tables=True)可
以建表及关系映射。
1)定义字段
使用attr_name = kind(type, *options)
来定义表的字段,kind
可以为Required
、Optional
、PrimaryKey
和Set
。
Required
和Optional
是最常用的两种属性,Required
表示必须存在的属性,Optional
表示可选的属性。PrimaryKey
是配置主键,映射到数据库中是primary key,每个实体必须包含一个。如没有显示的定义,pony会自动指定一个:id = PrimaryKey(int, auto=True)
,Set
代表集合,也叫做关系。实现to-many的关系。
type支持多种类型:
str
unicode
int
float
Decimal
datetime
date
time
timedelta
bool
buffer : 用于Python 2和3中的二进制数据
bytes :用于Python 3中的二进制数据
LongStr :用于大字符串
LongUnicode :用于大字符串
UUID
Json :用于映射到本机数据库JSON类型
IntArray :整数数组
StrArray :字符串数组
FloatArray :浮点数组
参考文档:https://docs.ponyorm.org/api_reference.html#attribute-options
from __future__ import absolute_import, print_function
from decimal import Decimal
from pony.orm import *
#db = Database("sqlite", "demo.sqlite", create_db=True)
db = Database()#Database("sqlite", "demo.sqlite", create_db=True)
db.bind(provider='oracle', user='admin', password='admin', dsn='docker_oracle11')
class Customer(db.Entity):
id = PrimaryKey(int, auto=True)
name = Required(str)
email = Required(str, unique=True)
orders = Set("OrderInfo")
class OrderInfo(db.Entity):
id = PrimaryKey(int, auto=True)
total_price = Required(Decimal)
customer = Required(Customer)
items = Set("OrderItem")
class Product(db.Entity):
id = PrimaryKey(int, auto=True)
name = Required(str)
price = Required(Decimal)
items = Set("OrderItem")
class OrderItem(db.Entity):
quantity = Required(int, default=1)
order = Required(OrderInfo)
product = Required(Product)
PrimaryKey(order, product)
#设置为debug模式
sql_debug(True)
#表不存在则会建表
db.generate_mapping(create_tables=True)
2)复合主键和复合索引
#复合主键
class Example(db.Entity):
a = Required(int)
b = Required(str)
c = Required(str)
d = Required(str)
PrimaryKey(a, b)
composite_key(c, d)
#composite_key(a, b)将被表示为UNIQUE ("a", "b")约束。
#复合索引
class Example(db.Entity):
a = Required(str)
b = Optional(int)
composite_index(a, b)
# or composite_index(a, 'b')
3、实体关联
1)一对多
class Order(db.Entity):
items = Set("OrderItem")
class OrderItem(db.Entity):
order = Required(Order)
或
class Order(db.Entity):
items = Set("OrderItem")
class OrderItem(db.Entity):
order = Optional(Order)
2)多对多
Pony 会自动生成中间表
class Product(db.Entity):
tags = Set("Tag")
class Tag(db.Entity):
products = Set(Product)
3)一对一
必须定义为Optional
和Required
或者Optiona
和Optional
class Person(db.Entity):
passport = Optional("Passport")
class Passport(db.Entity):
person = Required("Person")
4)自我关联
实体可以使用自引用关系与自身关联。这种关系可以有两种类型:对称和非对称。非对称关系由属于同一实体的两个属性定义。对称关系的具体特性是,实体只指定了一个关系属性,而该属性定义了关系的两边。这种关系可以是一对一,也可以是多对多。
class Person(db.Entity):
name = Required(str)
spouse = Optional("Person", reverse="spouse") # 对称 一对一
friends = Set("Person", reverse="friends") # 对称 多对多
manager = Optional("Person", reverse="employees") # 不对称的一面
employees = Set("Person", reverse="manager") # 非对称的另一面
5)两实体间的多重关系
当两个实体之间有多个关系时,Pony需要指定reverse
属性。
class User(db.Entity):
tweets = Set("Tweet", reverse="author")
favorites = Set("Tweet", reverse="favorited")
class Tweet(db.Entity):
author = Required(User, reverse="tweets")
favorited = Set(User, reverse="favorites")
4、数据增删改查
所有创建的实例都属于当前的db_session()
。Pony自动跟踪创建或更新的对象,并在当前db_session()
结束时自动将其保存到数据库中。如果需要在离开db_session()
作用域之前保存新创建的对象,可以使用flush()
或commit()
函数来保存。
from __future__ import absolute_import, print_function
from decimal import Decimal
from pony.orm import *
#db = Database("sqlite", "demo.sqlite", create_db=True)
db = Database()#Database("sqlite", "demo.sqlite", create_db=True)
db.bind(provider='oracle', user='admin', password='admin', dsn='docker_oracle11')
class Customer(db.Entity):
id = PrimaryKey(int, auto=True)
name = Required(str)
email = Required(str, unique=True)
orders = Set("OrderInfo")
class OrderInfo(db.Entity):
id = PrimaryKey(int, auto=True)
total_price = Required(Decimal)
customer = Required(Customer)
items = Set("OrderItem")
class Product(db.Entity):
id = PrimaryKey(int, auto=True)
name = Required(str)
price = Required(Decimal)
items = Set("OrderItem")
class OrderItem(db.Entity):
quantity = Required(int, default=1)
order = Required(OrderInfo)
product = Required(Product)
PrimaryKey(order, product)
sql_debug(True)
db.generate_mapping(create_tables=True)
@db_session
def populate_database():
c1 = Customer(name='John Smith', email='john@example.com')
c2 = Customer(name='Matthew Reed', email='matthew@example.com')
c3 = Customer(name='Chuan Qin', email='chuanqin@example.com')
c4 = Customer(name='Rebecca Lawson', email='rebecca@example.com')
c5 = Customer(name='Oliver Blakey', email='oliver@example.com')
p1 = Product(name='Kindle Fire HD', price=Decimal('284.00'))
p2 = Product(name='Apple iPad with Retina Display', price=Decimal('478.50'))
p3 = Product(name='SanDisk Cruzer 16 GB USB Flash Drive', price=Decimal('9.99'))
p4 = Product(name='Kingston DataTraveler 16GB USB 2.0', price=Decimal('9.98'))
p5 = Product(name='Samsung 840 Series 120GB SATA III SSD', price=Decimal('98.95'))
p6 = Product(name='Crucial m4 256GB SSD SATA 6Gb/s', price=Decimal('188.67'))
o1 = OrderInfo(customer=c1, total_price=Decimal('292.00'))
OrderItem(order=o1, product=p1)
OrderItem(order=o1, product=p4, quantity=2)
o2 = OrderInfo(customer=c1, total_price=Decimal('478.50'))
OrderItem(order=o2, product=p2)
o3 = OrderInfo(customer=c2, total_price=Decimal('680.50'))
OrderItem(order=o3, product=p2)
OrderItem(order=o3, product=p4, quantity=2)
OrderItem(order=o3, product=p6)
o4 = OrderInfo(customer=c3, total_price=Decimal('99.80'))
OrderItem(order=o4, product=p4, quantity=10)
o5 = OrderInfo(customer=c4, total_price=Decimal('722.00'))
OrderItem(order=o5, product=p1)
OrderItem(order=o5, product=p2)
commit()
#populate_database()
#查询
@db_session
def query():
print(select(p for p in Product if p.id==3)[:])
#多条数据会报错:pony.orm.core.MultipleObjectsFoundError: Multiple objects were found. Use Customer.select(...) to retrieve them
#通过pk获取
print(Product[1].name)
print(Customer.get(id=1).name)
print(Customer.select(lambda c: c.id==1)[:][0].name)
y = 1
print(Product.select_by_sql("SELECT * FROM Product WHERE id=$(y*2)")[0].name)
@db_session
def update():
c1 = Customer.get(id=1)
c1.name="john"
p1 = select(p for p in Product if p.id==3)[:][0]
p1.name="cjavapy"
c2 = Customer.get(id=2)
#更新多个字段
#c.name="cjavapy"
#c.email="cjavapy@gmail.com"
c2.set(name="cjavapy",email="cjavapy@gmail.com")
@db_session
def delete():
p = Product[1]
p.delete()
# 或者:
#delete(p for p in Product if p.id >=5)
# 或:
#使用bulk=True参数是一条delete语句批量删除,否则是先select满足条件数据,然后每次删除一个
Product.select(lambda p: p.id >=5).delete(bulk=True)
query()
update()
delete()