1、$match
筛选文档,类似于 SQL 的 WHERE 子句。可以使用 $match 来选择满足特定条件的文档。
from pymongo import MongoClient
# 连接到 MongoDB 数据库
client = MongoClient('mongodb://localhost:27017/')
db = client['mydb'] # 切换到数据库
orders_collection = db['orders'] # 获取集合
# 使用 $match 操作筛选文档
pipeline = [
{
'$match': {
'amount': {'$gte': 1000}, # 大于等于 1000 的金额
'date': {'$gte': '2023-01-01', '$lte': '2023-12-31'} # 日期范围筛选
}
}
]
# 执行聚合操作
result = list(sales_collection.aggregate(pipeline))
# 打印筛选结果
for doc in result:
print(doc)
2、$group
分组和汇总数据,类似于 SQL 的 GROUP BY 子句。你可以使用 $group 来对文档进行分组,并进行聚合操作,如计算总和、平均值、计数等。
from pymongo import MongoClient
# 连接到 MongoDB 数据库
client = MongoClient('mongodb://localhost:27017/')
db = client['mydb'] # 切换到数据库
orders_collection = db['orders'] # 获取集合
# 使用 $group 操作对文档分组和聚合
pipeline = [
{
'$group': {
'_id': '$product', # 根据产品名称分组
'total_sales': {'$sum': '$amount'}, # 计算总销售额
'average_sales': {'$avg': '$amount'} # 计算平均销售额
}
}
]
# 执行聚合操作
result = list(orders_collection.aggregate(pipeline))
# 打印分组和聚合结果
for doc in result:
print(doc)
3、$project
投影操作,类似于 SQL 的 SELECT 子句。可以使用 $project 来选择要返回的字段,并可以进行计算、重命名字段等操作。
from pymongo import MongoClient
# 连接到 MongoDB 数据库
client = MongoClient('mongodb://localhost:27017/')
db = client['mydb'] # 切换到数据库
orders_collection = db['orders'] # 获取集合
# 使用 $project 操作选择指定字段并计算新字段
pipeline = [
{
'$project': {
'name': 1, # 选择姓名字段
'age': 1, # 选择年龄字段
'discount': {'$multiply': ['$amount', 0.1]} # 计算折扣字段(销售额的 10%)
}
}
]
# 执行查询和投影操作
result = list(orders_collection.aggregate(pipeline))
# 打印查询结果
for doc in result:
print(doc)
注意:$multiply
操作符用于在 MongoDB 聚合管道中执行乘法操作。它可以将一个或多个字段的值相乘,或将字段的值与常数相乘,并将结果作为新的字段添加到聚合文档中。
4、$sort
排序文档,类似于 SQL 的 ORDER BY 子句。可以使用 $sort 来指定文档的排序方式,升序或降序。
from pymongo import MongoClient
# 连接到 MongoDB 数据库
client = MongoClient('mongodb://localhost:27017/')
db = client['mydb'] # 切换到数据库
orders_collection = db['orders'] # 获取集合
# 使用 $sort 操作对文档进行降序排序
pipeline = [
{
'$sort': {
'amount': -1 # 降序排序,使用 -1 表示降序,1 表示升序
}
}
]
# 执行聚合操作
result = list(orders_collection.aggregate(pipeline))
# 打印排序后的查询结果
for doc in result:
print(doc)
5、$limit 和 $skip
限制结果集的大小,类似于 SQL 的 LIMIT 和 OFFSET 子句。$limit 用于限制返回的文档数量,而 $skip 用于跳过一定数量的文档。
from pymongo import MongoClient
# 连接到 MongoDB 数据库
client = MongoClient('mongodb://localhost:27017/')
db = client['mydb'] # 切换到数据库
orders_collection = db['orders'] # 获取集合
# 定义分页参数
page_number = 2 # 第二页
page_size = 5 # 每页显示 5 条记录
# 使用 $skip 和 $limit 进行分页查询
pipeline = [
{
'$skip': (page_number - 1) * page_size # 计算要跳过的文档数量
},
{
'$limit': page_size # 限制每页返回的文档数量
}
]
# 执行聚合操作
result = list(orders_collection.aggregate(pipeline))
# 打印分页结果
for doc in result:
print(doc)
6、$unwind
展开数组字段,将数组字段的每个元素拆分成单独的文档。这对于处理包含数组的文档非常有用。
from pymongo import MongoClient
# 连接到 MongoDB 数据库
client = MongoClient('mongodb://localhost:27017/')
db = client['mydb'] # 切换到数据库
orders_collection = db['orders'] # 获取集合
# 示例订单文档
orders_data = [
{
'order_id': 1,
'items': ['item1', 'item2', 'item3']
},
{
'order_id': 2,
'items': ['item4', 'item5']
}
]
# 插入示例订单数据
orders_collection.insert_many(orders_data)
# 使用 $unwind 操作符展开订单项数组
pipeline = [
{
'$unwind': '$items' # 展开 items 数组字段
}
]
# 执行聚合操作
result = list(orders_collection.aggregate(pipeline))
# 打印展开后的结果
for doc in result:
print(doc)
7、$lookup
执行左外连接操作,将两个集合中的文档关联在一起。这允许在一个文档中访问另一个集合的数据。
from pymongo import MongoClient
# 连接到 MongoDB 数据库
client = MongoClient('mongodb://localhost:27017/')
db = client['mydb'] # 切换到数据库
orders_collection = db['orders'] # 获取订单集合
products_collection = db['products'] # 获取产品集合
# 示例订单文档
orders_data = [
{
'order_id': 1,
'product_id': 101,
'quantity': 3
},
{
'order_id': 2,
'product_id': 102,
'quantity': 2
},
{
'order_id': 3,
'product_id': 101,
'quantity': 1
}
]
# 示例产品文档
products_data = [
{
'product_id': 101,
'product_name': 'Product A',
'price': 50
},
{
'product_id': 102,
'product_name': 'Product B',
'price': 30
}
]
# 插入示例数据到订单集合和产品集合
orders_collection.insert_many(orders_data)
products_collection.insert_many(products_data)
# 使用 $lookup 操作符连接订单和产品集合
pipeline = [
{
'$lookup': {
'from': 'products', # 目标集合名称
'localField': 'product_id', # 本地集合的连接字段
'foreignField': 'product_id', # 目标集合的连接字段
'as': 'product_info' # 结果别名
}
},
{
'$unwind': '$product_info' # 展开连接后的结果数组
}
]
# 执行聚合操作
result = list(orders_collection.aggregate(pipeline))
# 打印连接后的结果
for doc in result:
print(doc)
8、$addFields 和 $set
添加新字段或修改现有字段的值。可以使用 $addFields 来添加新的计算字段,而 $set 用于更新现有字段的值。
from pymongo import MongoClient
# 连接到 MongoDB 数据库
client = MongoClient('mongodb://localhost:27017/')
db = client['mydb'] # 切换到数据库
students_collection = db['students'] # 获取学生集合
# 示例学生文档
students_data = [
{
'student_id': 1,
'name': 'Alice',
'math_score': 90,
'english_score': 85
},
{
'student_id': 2,
'name': 'Bob',
'math_score': 78,
'english_score': 92
}
]
# 插入示例数据到学生集合
students_collection.insert_many(students_data)
# 使用 $addFields 操作符添加 "total_score" 字段
pipeline = [
{
'$addFields': {
'total_score': {'$sum': ['$math_score', '$english_score']}
}
}
]
# 执行聚合操作
result = list(students_collection.aggregate(pipeline))
# 打印结果
for doc in result:
print(doc)
9、$out
将聚合操作的结果写入到新的集合中,可以用于创建新的集合以保存聚合后的数据。
from pymongo import MongoClient
# 连接到 MongoDB 数据库
client = MongoClient('mongodb://localhost:27017/')
db = client['mydb'] # 切换到数据库
orders_collection = db['orders'] # 获取订单集合
# 示例订单文档
orders_data = [
{
'order_id': 1,
'customer_id': 101,
'order_amount': 50
},
{
'order_id': 2,
'customer_id': 102,
'order_amount': 30
},
{
'order_id': 3,
'customer_id': 101,
'order_amount': 70
}
]
# 插入示例数据到订单集合
orders_collection.insert_many(orders_data)
# 使用 $group 操作符计算每个客户的总订单金额
pipeline = [
{
'$group': {
'_id': '$customer_id',
'total_amount': {'$sum': '$order_amount'}
}
},
{
'$out': 'customer_total_orders' # 将结果输出到名为 'customer_total_orders' 的集合中
}
]
# 执行聚合操作
orders_collection.aggregate(pipeline)
# 查询新集合 'customer_total_orders' 中的数据
customer_total_orders = db['customer_total_orders']
# 打印新集合中的数据
for doc in customer_total_orders.find():
print(doc)
10、$redact
根据安全策略控制文档的访问,用于处理敏感数据。
from pymongo import MongoClient
# 连接到 MongoDB 数据库
client = MongoClient('mongodb://localhost:27017/')
db = client['mydb'] # 切换到数据库
employees_collection = db['employees'] # 获取员工集合
# 示例员工文档
employees_data = [
{
'name': 'Alice',
'salary': 60000,
'security_level': 'low'
},
{
'name': 'Bob',
'salary': 80000,
'security_level': 'medium'
},
{
'name': 'Charlie',
'salary': 100000,
'security_level': 'high'
}
]
# 插入示例数据到员工集合
employees_collection.insert_many(employees_data)
# 使用 $redact 操作符控制文档字段的可见性
pipeline = [
{
'$redact': {
'$cond': {
'if': {'$eq': ['$security_level', 'low']},
'then': '$$PRUNE', # 如果安全级别为 'low',则隐藏整个文档
'else': '$$DESCEND' # 否则继续显示文档
}
}
}
]
# 执行聚合操作
result = list(employees_collection.aggregate(pipeline))
# 打印经过 $redact 处理后的结果
for doc in result:
print(doc)
11、$bucket
将文档按照指定的条件分组到不同的桶中,类似于 SQL 中的分桶操作。
from pymongo import MongoClient
# 连接到 MongoDB 数据库
client = MongoClient('mongodb://localhost:27017/')
db = client['mydb'] # 切换到数据库
orders_collection = db['orders'] # 获取订单集合
# 示例订单文档
orders_data = [
{'order_id': 1, 'amount': 100},
{'order_id': 2, 'amount': 150},
{'order_id': 3, 'amount': 200},
{'order_id': 4, 'amount': 80},
{'order_id': 5, 'amount': 120},
{'order_id': 6, 'amount': 300},
]
# 插入示例数据到订单集合
orders_collection.insert_many(orders_data)
# 使用 $bucket 操作符对订单金额进行分桶
pipeline = [
{
'$bucket': {
'groupBy': '$amount',
'boundaries': [0, 100, 200, 300], # 定义桶的范围
'default': 'Other', # 当不在任何桶范围内时的默认桶
'output': {
'count': {'$sum': 1}, # 统计每个桶中的订单数量
'orders': {'$push': '$order_id'} # 收集每个桶中的订单号
}
}
}
]
# 执行聚合操作
result = list(orders_collection.aggregate(pipeline))
# 打印每个桶的统计信息
for bucket in result:
print(bucket)
12、$facet
用于执行多个聚合操作,并返回每个操作的结果。它允许在单个聚合查询中执行多个不相关的子聚合,然后将它们的结果组合在一起。
from pymongo import MongoClient
from datetime import datetime
# 连接到 MongoDB 数据库
client = MongoClient('mongodb://localhost:27017/')
db = client['mydb'] # 切换到数据库
orders_collection = db['orders'] # 获取订单集合
# 示例订单文档
orders_data = [
{'order_id': 1, 'amount': 100, 'date': datetime(2023, 1, 5), 'category': 'Electronics'},
{'order_id': 2, 'amount': 150, 'date': datetime(2023, 1, 10), 'category': 'Clothing'},
{'order_id': 3, 'amount': 200, 'date': datetime(2023, 2, 15), 'category': 'Electronics'},
{'order_id': 4, 'amount': 80, 'date': datetime(2023, 2, 20), 'category': 'Books'},
{'order_id': 5, 'amount': 120, 'date': datetime(2023, 3, 3), 'category': 'Clothing'},
{'order_id': 6, 'amount': 300, 'date': datetime(2023, 3, 8), 'category': 'Electronics'},
]
# 插入示例数据到订单集合
orders_collection.insert_many(orders_data)
# 使用 $facet 操作符执行多个聚合操作
pipeline = [
{
'$facet': {
'totalSalesByMonth': [
{
'$group': {
'_id': {'$month': '$date'},
'totalSales': {'$sum': '$amount'}
}
},
{
'$sort': {'_id': 1}
}
],
'orderCountByCategory': [
{
'$group': {
'_id': '$category',
'orderCount': {'$sum': 1}
}
}
],
'maxSalesByMonth': [
{
'$group': {
'_id': {'$month': '$date'},
'maxSales': {'$max': '$amount'}
}
},
{
'$sort': {'_id': 1}
}
]
}
}
]
# 执行聚合操作
result = list(orders_collection.aggregate(pipeline))
# 打印每个子聚合的结果
for facet_result in result:
print(facet_result)