1、$match
筛选文档,类似于 SQL 的 WHERE 子句。可以使用 $match 来选择满足特定条件的文档。
from pymongo import MongoClient # 连接到 MongoDB 数据库 client = MongoClient('mongodb://localhost:27017/') db = client['mydb'] # 切换到数据库 orders_collection = db['orders'] # 获取集合 # 使用 $match 操作筛选文档 pipeline = [ { '$match': { 'amount': {'$gte': 1000}, # 大于等于 1000 的金额 'date': {'$gte': '2023-01-01', '$lte': '2023-12-31'} # 日期范围筛选 } } ] # 执行聚合操作 result = list(sales_collection.aggregate(pipeline)) # 打印筛选结果 for doc in result: print(doc)
2、$group
分组和汇总数据,类似于 SQL 的 GROUP BY 子句。你可以使用 $group 来对文档进行分组,并进行聚合操作,如计算总和、平均值、计数等。
from pymongo import MongoClient # 连接到 MongoDB 数据库 client = MongoClient('mongodb://localhost:27017/') db = client['mydb'] # 切换到数据库 orders_collection = db['orders'] # 获取集合 # 使用 $group 操作对文档分组和聚合 pipeline = [ { '$group': { '_id': '$product', # 根据产品名称分组 'total_sales': {'$sum': '$amount'}, # 计算总销售额 'average_sales': {'$avg': '$amount'} # 计算平均销售额 } } ] # 执行聚合操作 result = list(orders_collection.aggregate(pipeline)) # 打印分组和聚合结果 for doc in result: print(doc)
3、$project
投影操作,类似于 SQL 的 SELECT 子句。可以使用 $project 来选择要返回的字段,并可以进行计算、重命名字段等操作。
from pymongo import MongoClient # 连接到 MongoDB 数据库 client = MongoClient('mongodb://localhost:27017/') db = client['mydb'] # 切换到数据库 orders_collection = db['orders'] # 获取集合 # 使用 $project 操作选择指定字段并计算新字段 pipeline = [ { '$project': { 'name': 1, # 选择姓名字段 'age': 1, # 选择年龄字段 'discount': {'$multiply': ['$amount', 0.1]} # 计算折扣字段(销售额的 10%) } } ] # 执行查询和投影操作 result = list(orders_collection.aggregate(pipeline)) # 打印查询结果 for doc in result: print(doc)
注意:$multiply
操作符用于在 MongoDB 聚合管道中执行乘法操作。它可以将一个或多个字段的值相乘,或将字段的值与常数相乘,并将结果作为新的字段添加到聚合文档中。
4、$sort
排序文档,类似于 SQL 的 ORDER BY 子句。可以使用 $sort 来指定文档的排序方式,升序或降序。
from pymongo import MongoClient # 连接到 MongoDB 数据库 client = MongoClient('mongodb://localhost:27017/') db = client['mydb'] # 切换到数据库 orders_collection = db['orders'] # 获取集合 # 使用 $sort 操作对文档进行降序排序 pipeline = [ { '$sort': { 'amount': -1 # 降序排序,使用 -1 表示降序,1 表示升序 } } ] # 执行聚合操作 result = list(orders_collection.aggregate(pipeline)) # 打印排序后的查询结果 for doc in result: print(doc)
5、$limit 和 $skip
限制结果集的大小,类似于 SQL 的 LIMIT 和 OFFSET 子句。$limit 用于限制返回的文档数量,而 $skip 用于跳过一定数量的文档。
from pymongo import MongoClient # 连接到 MongoDB 数据库 client = MongoClient('mongodb://localhost:27017/') db = client['mydb'] # 切换到数据库 orders_collection = db['orders'] # 获取集合 # 定义分页参数 page_number = 2 # 第二页 page_size = 5 # 每页显示 5 条记录 # 使用 $skip 和 $limit 进行分页查询 pipeline = [ { '$skip': (page_number - 1) * page_size # 计算要跳过的文档数量 }, { '$limit': page_size # 限制每页返回的文档数量 } ] # 执行聚合操作 result = list(orders_collection.aggregate(pipeline)) # 打印分页结果 for doc in result: print(doc)
6、$unwind
展开数组字段,将数组字段的每个元素拆分成单独的文档。这对于处理包含数组的文档非常有用。
from pymongo import MongoClient # 连接到 MongoDB 数据库 client = MongoClient('mongodb://localhost:27017/') db = client['mydb'] # 切换到数据库 orders_collection = db['orders'] # 获取集合 # 示例订单文档 orders_data = [ { 'order_id': 1, 'items': ['item1', 'item2', 'item3'] }, { 'order_id': 2, 'items': ['item4', 'item5'] } ] # 插入示例订单数据 orders_collection.insert_many(orders_data) # 使用 $unwind 操作符展开订单项数组 pipeline = [ { '$unwind': '$items' # 展开 items 数组字段 } ] # 执行聚合操作 result = list(orders_collection.aggregate(pipeline)) # 打印展开后的结果 for doc in result: print(doc)
7、$lookup
执行左外连接操作,将两个集合中的文档关联在一起。这允许在一个文档中访问另一个集合的数据。
from pymongo import MongoClient # 连接到 MongoDB 数据库 client = MongoClient('mongodb://localhost:27017/') db = client['mydb'] # 切换到数据库 orders_collection = db['orders'] # 获取订单集合 products_collection = db['products'] # 获取产品集合 # 示例订单文档 orders_data = [ { 'order_id': 1, 'product_id': 101, 'quantity': 3 }, { 'order_id': 2, 'product_id': 102, 'quantity': 2 }, { 'order_id': 3, 'product_id': 101, 'quantity': 1 } ] # 示例产品文档 products_data = [ { 'product_id': 101, 'product_name': 'Product A', 'price': 50 }, { 'product_id': 102, 'product_name': 'Product B', 'price': 30 } ] # 插入示例数据到订单集合和产品集合 orders_collection.insert_many(orders_data) products_collection.insert_many(products_data) # 使用 $lookup 操作符连接订单和产品集合 pipeline = [ { '$lookup': { 'from': 'products', # 目标集合名称 'localField': 'product_id', # 本地集合的连接字段 'foreignField': 'product_id', # 目标集合的连接字段 'as': 'product_info' # 结果别名 } }, { '$unwind': '$product_info' # 展开连接后的结果数组 } ] # 执行聚合操作 result = list(orders_collection.aggregate(pipeline)) # 打印连接后的结果 for doc in result: print(doc)
8、$addFields 和 $set
添加新字段或修改现有字段的值。可以使用 $addFields 来添加新的计算字段,而 $set 用于更新现有字段的值。
from pymongo import MongoClient # 连接到 MongoDB 数据库 client = MongoClient('mongodb://localhost:27017/') db = client['mydb'] # 切换到数据库 students_collection = db['students'] # 获取学生集合 # 示例学生文档 students_data = [ { 'student_id': 1, 'name': 'Alice', 'math_score': 90, 'english_score': 85 }, { 'student_id': 2, 'name': 'Bob', 'math_score': 78, 'english_score': 92 } ] # 插入示例数据到学生集合 students_collection.insert_many(students_data) # 使用 $addFields 操作符添加 "total_score" 字段 pipeline = [ { '$addFields': { 'total_score': {'$sum': ['$math_score', '$english_score']} } } ] # 执行聚合操作 result = list(students_collection.aggregate(pipeline)) # 打印结果 for doc in result: print(doc)
9、$out
将聚合操作的结果写入到新的集合中,可以用于创建新的集合以保存聚合后的数据。
from pymongo import MongoClient # 连接到 MongoDB 数据库 client = MongoClient('mongodb://localhost:27017/') db = client['mydb'] # 切换到数据库 orders_collection = db['orders'] # 获取订单集合 # 示例订单文档 orders_data = [ { 'order_id': 1, 'customer_id': 101, 'order_amount': 50 }, { 'order_id': 2, 'customer_id': 102, 'order_amount': 30 }, { 'order_id': 3, 'customer_id': 101, 'order_amount': 70 } ] # 插入示例数据到订单集合 orders_collection.insert_many(orders_data) # 使用 $group 操作符计算每个客户的总订单金额 pipeline = [ { '$group': { '_id': '$customer_id', 'total_amount': {'$sum': '$order_amount'} } }, { '$out': 'customer_total_orders' # 将结果输出到名为 'customer_total_orders' 的集合中 } ] # 执行聚合操作 orders_collection.aggregate(pipeline) # 查询新集合 'customer_total_orders' 中的数据 customer_total_orders = db['customer_total_orders'] # 打印新集合中的数据 for doc in customer_total_orders.find(): print(doc)
10、$redact
根据安全策略控制文档的访问,用于处理敏感数据。
from pymongo import MongoClient # 连接到 MongoDB 数据库 client = MongoClient('mongodb://localhost:27017/') db = client['mydb'] # 切换到数据库 employees_collection = db['employees'] # 获取员工集合 # 示例员工文档 employees_data = [ { 'name': 'Alice', 'salary': 60000, 'security_level': 'low' }, { 'name': 'Bob', 'salary': 80000, 'security_level': 'medium' }, { 'name': 'Charlie', 'salary': 100000, 'security_level': 'high' } ] # 插入示例数据到员工集合 employees_collection.insert_many(employees_data) # 使用 $redact 操作符控制文档字段的可见性 pipeline = [ { '$redact': { '$cond': { 'if': {'$eq': ['$security_level', 'low']}, 'then': '$$PRUNE', # 如果安全级别为 'low',则隐藏整个文档 'else': '$$DESCEND' # 否则继续显示文档 } } } ] # 执行聚合操作 result = list(employees_collection.aggregate(pipeline)) # 打印经过 $redact 处理后的结果 for doc in result: print(doc)
11、$bucket
将文档按照指定的条件分组到不同的桶中,类似于 SQL 中的分桶操作。
from pymongo import MongoClient # 连接到 MongoDB 数据库 client = MongoClient('mongodb://localhost:27017/') db = client['mydb'] # 切换到数据库 orders_collection = db['orders'] # 获取订单集合 # 示例订单文档 orders_data = [ {'order_id': 1, 'amount': 100}, {'order_id': 2, 'amount': 150}, {'order_id': 3, 'amount': 200}, {'order_id': 4, 'amount': 80}, {'order_id': 5, 'amount': 120}, {'order_id': 6, 'amount': 300}, ] # 插入示例数据到订单集合 orders_collection.insert_many(orders_data) # 使用 $bucket 操作符对订单金额进行分桶 pipeline = [ { '$bucket': { 'groupBy': '$amount', 'boundaries': [0, 100, 200, 300], # 定义桶的范围 'default': 'Other', # 当不在任何桶范围内时的默认桶 'output': { 'count': {'$sum': 1}, # 统计每个桶中的订单数量 'orders': {'$push': '$order_id'} # 收集每个桶中的订单号 } } } ] # 执行聚合操作 result = list(orders_collection.aggregate(pipeline)) # 打印每个桶的统计信息 for bucket in result: print(bucket)
12、$facet
用于执行多个聚合操作,并返回每个操作的结果。它允许在单个聚合查询中执行多个不相关的子聚合,然后将它们的结果组合在一起。
from pymongo import MongoClient from datetime import datetime # 连接到 MongoDB 数据库 client = MongoClient('mongodb://localhost:27017/') db = client['mydb'] # 切换到数据库 orders_collection = db['orders'] # 获取订单集合 # 示例订单文档 orders_data = [ {'order_id': 1, 'amount': 100, 'date': datetime(2023, 1, 5), 'category': 'Electronics'}, {'order_id': 2, 'amount': 150, 'date': datetime(2023, 1, 10), 'category': 'Clothing'}, {'order_id': 3, 'amount': 200, 'date': datetime(2023, 2, 15), 'category': 'Electronics'}, {'order_id': 4, 'amount': 80, 'date': datetime(2023, 2, 20), 'category': 'Books'}, {'order_id': 5, 'amount': 120, 'date': datetime(2023, 3, 3), 'category': 'Clothing'}, {'order_id': 6, 'amount': 300, 'date': datetime(2023, 3, 8), 'category': 'Electronics'}, ] # 插入示例数据到订单集合 orders_collection.insert_many(orders_data) # 使用 $facet 操作符执行多个聚合操作 pipeline = [ { '$facet': { 'totalSalesByMonth': [ { '$group': { '_id': {'$month': '$date'}, 'totalSales': {'$sum': '$amount'} } }, { '$sort': {'_id': 1} } ], 'orderCountByCategory': [ { '$group': { '_id': '$category', 'orderCount': {'$sum': 1} } } ], 'maxSalesByMonth': [ { '$group': { '_id': {'$month': '$date'}, 'maxSales': {'$max': '$amount'} } }, { '$sort': {'_id': 1} } ] } } ] # 执行聚合操作 result = list(orders_collection.aggregate(pipeline)) # 打印每个子聚合的结果 for facet_result in result: print(facet_result)