Python MongoDB 聚合管道操作符及使用

MongoDB 聚合操作是 MongoDB 数据库中用于数据处理和分析的功能之一,它允许你对文档进行多步处理以生成所需的结果。聚合操作可以用于查询、转换、过滤和分析数据,它们通常用于替代传统的 SQL 查询。本文主要介绍Python中使用pymongo实现聚合操作,和聚合管道操作符及使用。

1、$match

筛选文档,类似于 SQL 的 WHERE 子句。可以使用 $match 来选择满足特定条件的文档。

from pymongo import MongoClient

# 连接到 MongoDB 数据库
client = MongoClient('mongodb://localhost:27017/')
db = client['mydb']  # 切换到数据库
orders_collection = db['orders']  # 获取集合

# 使用 $match 操作筛选文档
pipeline = [
    {
        '$match': {
            'amount': {'$gte': 1000},  # 大于等于 1000 的金额
            'date': {'$gte': '2023-01-01', '$lte': '2023-12-31'}  # 日期范围筛选
        }
    }
]

# 执行聚合操作
result = list(sales_collection.aggregate(pipeline))

# 打印筛选结果
for doc in result:
    print(doc)

2、$group

分组和汇总数据,类似于 SQL 的 GROUP BY 子句。你可以使用 $group 来对文档进行分组,并进行聚合操作,如计算总和、平均值、计数等。

from pymongo import MongoClient

# 连接到 MongoDB 数据库
client = MongoClient('mongodb://localhost:27017/')
db = client['mydb']  # 切换到数据库
orders_collection = db['orders']  # 获取集合

# 使用 $group 操作对文档分组和聚合
pipeline = [
    {
        '$group': {
            '_id': '$product',  # 根据产品名称分组
            'total_sales': {'$sum': '$amount'},  # 计算总销售额
            'average_sales': {'$avg': '$amount'}  # 计算平均销售额
        }
    }
]

# 执行聚合操作
result = list(orders_collection.aggregate(pipeline))

# 打印分组和聚合结果
for doc in result:
    print(doc)

3、$project

投影操作,类似于 SQL 的 SELECT 子句。可以使用 $project 来选择要返回的字段,并可以进行计算、重命名字段等操作。

from pymongo import MongoClient

# 连接到 MongoDB 数据库
client = MongoClient('mongodb://localhost:27017/')
db = client['mydb']  # 切换到数据库
orders_collection = db['orders']  # 获取集合

# 使用 $project 操作选择指定字段并计算新字段
pipeline = [
    {
        '$project': {
            'name': 1,  # 选择姓名字段
            'age': 1,   # 选择年龄字段
            'discount': {'$multiply': ['$amount', 0.1]}  # 计算折扣字段(销售额的 10%)
        }
    }
]

# 执行查询和投影操作
result = list(orders_collection.aggregate(pipeline))

# 打印查询结果
for doc in result:
    print(doc)

注意:$multiply 操作符用于在 MongoDB 聚合管道中执行乘法操作。它可以将一个或多个字段的值相乘,或将字段的值与常数相乘,并将结果作为新的字段添加到聚合文档中。

4、$sort

排序文档,类似于 SQL 的 ORDER BY 子句。可以使用 $sort 来指定文档的排序方式,升序或降序。

from pymongo import MongoClient

# 连接到 MongoDB 数据库
client = MongoClient('mongodb://localhost:27017/')
db = client['mydb']  # 切换到数据库
orders_collection = db['orders']  # 获取集合

# 使用 $sort 操作对文档进行降序排序
pipeline = [
    {
        '$sort': {
            'amount': -1  # 降序排序,使用 -1 表示降序,1 表示升序
        }
    }
]

# 执行聚合操作
result = list(orders_collection.aggregate(pipeline))

# 打印排序后的查询结果
for doc in result:
    print(doc)

5、$limit 和 $skip

限制结果集的大小,类似于 SQL 的 LIMIT 和 OFFSET 子句。$limit 用于限制返回的文档数量,而 $skip 用于跳过一定数量的文档。

from pymongo import MongoClient

# 连接到 MongoDB 数据库
client = MongoClient('mongodb://localhost:27017/')
db = client['mydb']  # 切换到数据库
orders_collection = db['orders']  # 获取集合

# 定义分页参数
page_number = 2  # 第二页
page_size = 5  # 每页显示 5 条记录

# 使用 $skip 和 $limit 进行分页查询
pipeline = [
    {
        '$skip': (page_number - 1) * page_size  # 计算要跳过的文档数量
    },
    {
        '$limit': page_size  # 限制每页返回的文档数量
    }
]

# 执行聚合操作
result = list(orders_collection.aggregate(pipeline))

# 打印分页结果
for doc in result:
    print(doc)

6、$unwind

展开数组字段,将数组字段的每个元素拆分成单独的文档。这对于处理包含数组的文档非常有用。

from pymongo import MongoClient

# 连接到 MongoDB 数据库
client = MongoClient('mongodb://localhost:27017/')
db = client['mydb']  # 切换到数据库
orders_collection = db['orders']  # 获取集合

# 示例订单文档
orders_data = [
    {
        'order_id': 1,
        'items': ['item1', 'item2', 'item3']
    },
    {
        'order_id': 2,
        'items': ['item4', 'item5']
    }
]

# 插入示例订单数据
orders_collection.insert_many(orders_data)

# 使用 $unwind 操作符展开订单项数组
pipeline = [
    {
        '$unwind': '$items'  # 展开 items 数组字段
    }
]

# 执行聚合操作
result = list(orders_collection.aggregate(pipeline))

# 打印展开后的结果
for doc in result:
    print(doc)

7、$lookup

执行左外连接操作,将两个集合中的文档关联在一起。这允许在一个文档中访问另一个集合的数据。

from pymongo import MongoClient

# 连接到 MongoDB 数据库
client = MongoClient('mongodb://localhost:27017/')
db = client['mydb']  # 切换到数据库
orders_collection = db['orders']  # 获取订单集合
products_collection = db['products']  # 获取产品集合

# 示例订单文档
orders_data = [
    {
        'order_id': 1,
        'product_id': 101,
        'quantity': 3
    },
    {
        'order_id': 2,
        'product_id': 102,
        'quantity': 2
    },
    {
        'order_id': 3,
        'product_id': 101,
        'quantity': 1
    }
]

# 示例产品文档
products_data = [
    {
        'product_id': 101,
        'product_name': 'Product A',
        'price': 50
    },
    {
        'product_id': 102,
        'product_name': 'Product B',
        'price': 30
    }
]

# 插入示例数据到订单集合和产品集合
orders_collection.insert_many(orders_data)
products_collection.insert_many(products_data)

# 使用 $lookup 操作符连接订单和产品集合
pipeline = [
    {
        '$lookup': {
            'from': 'products',  # 目标集合名称
            'localField': 'product_id',  # 本地集合的连接字段
            'foreignField': 'product_id',  # 目标集合的连接字段
            'as': 'product_info'  # 结果别名
        }
    },
    {
        '$unwind': '$product_info'  # 展开连接后的结果数组
    }
]

# 执行聚合操作
result = list(orders_collection.aggregate(pipeline))

# 打印连接后的结果
for doc in result:
    print(doc)

8、$addFields 和 $set

添加新字段或修改现有字段的值。可以使用 $addFields 来添加新的计算字段,而 $set 用于更新现有字段的值。

from pymongo import MongoClient

# 连接到 MongoDB 数据库
client = MongoClient('mongodb://localhost:27017/')
db = client['mydb']  # 切换到数据库
students_collection = db['students']  # 获取学生集合

# 示例学生文档
students_data = [
    {
        'student_id': 1,
        'name': 'Alice',
        'math_score': 90,
        'english_score': 85
    },
    {
        'student_id': 2,
        'name': 'Bob',
        'math_score': 78,
        'english_score': 92
    }
]

# 插入示例数据到学生集合
students_collection.insert_many(students_data)

# 使用 $addFields 操作符添加 "total_score" 字段
pipeline = [
    {
        '$addFields': {
            'total_score': {'$sum': ['$math_score', '$english_score']}
        }
    }
]

# 执行聚合操作
result = list(students_collection.aggregate(pipeline))

# 打印结果
for doc in result:
    print(doc)

9、$out

将聚合操作的结果写入到新的集合中,可以用于创建新的集合以保存聚合后的数据。

from pymongo import MongoClient

# 连接到 MongoDB 数据库
client = MongoClient('mongodb://localhost:27017/')
db = client['mydb']  # 切换到数据库
orders_collection = db['orders']  # 获取订单集合

# 示例订单文档
orders_data = [
    {
        'order_id': 1,
        'customer_id': 101,
        'order_amount': 50
    },
    {
        'order_id': 2,
        'customer_id': 102,
        'order_amount': 30
    },
    {
        'order_id': 3,
        'customer_id': 101,
        'order_amount': 70
    }
]

# 插入示例数据到订单集合
orders_collection.insert_many(orders_data)

# 使用 $group 操作符计算每个客户的总订单金额
pipeline = [
    {
        '$group': {
            '_id': '$customer_id',
            'total_amount': {'$sum': '$order_amount'}
        }
    },
    {
        '$out': 'customer_total_orders'  # 将结果输出到名为 'customer_total_orders' 的集合中
    }
]

# 执行聚合操作
orders_collection.aggregate(pipeline)

# 查询新集合 'customer_total_orders' 中的数据
customer_total_orders = db['customer_total_orders']

# 打印新集合中的数据
for doc in customer_total_orders.find():
    print(doc)

10、$redact

根据安全策略控制文档的访问,用于处理敏感数据。

from pymongo import MongoClient

# 连接到 MongoDB 数据库
client = MongoClient('mongodb://localhost:27017/')
db = client['mydb']  # 切换到数据库
employees_collection = db['employees']  # 获取员工集合

# 示例员工文档
employees_data = [
    {
        'name': 'Alice',
        'salary': 60000,
        'security_level': 'low'
    },
    {
        'name': 'Bob',
        'salary': 80000,
        'security_level': 'medium'
    },
    {
        'name': 'Charlie',
        'salary': 100000,
        'security_level': 'high'
    }
]

# 插入示例数据到员工集合
employees_collection.insert_many(employees_data)

# 使用 $redact 操作符控制文档字段的可见性
pipeline = [
    {
        '$redact': {
            '$cond': {
                'if': {'$eq': ['$security_level', 'low']},
                'then': '$$PRUNE',  # 如果安全级别为 'low',则隐藏整个文档
                'else': '$$DESCEND'  # 否则继续显示文档
            }
        }
    }
]

# 执行聚合操作
result = list(employees_collection.aggregate(pipeline))

# 打印经过 $redact 处理后的结果
for doc in result:
    print(doc)

11、$bucket

将文档按照指定的条件分组到不同的桶中,类似于 SQL 中的分桶操作。

from pymongo import MongoClient

# 连接到 MongoDB 数据库
client = MongoClient('mongodb://localhost:27017/')
db = client['mydb']  # 切换到数据库
orders_collection = db['orders']  # 获取订单集合

# 示例订单文档
orders_data = [
    {'order_id': 1, 'amount': 100},
    {'order_id': 2, 'amount': 150},
    {'order_id': 3, 'amount': 200},
    {'order_id': 4, 'amount': 80},
    {'order_id': 5, 'amount': 120},
    {'order_id': 6, 'amount': 300},
]

# 插入示例数据到订单集合
orders_collection.insert_many(orders_data)

# 使用 $bucket 操作符对订单金额进行分桶
pipeline = [
    {
        '$bucket': {
            'groupBy': '$amount',
            'boundaries': [0, 100, 200, 300],  # 定义桶的范围
            'default': 'Other',  # 当不在任何桶范围内时的默认桶
            'output': {
                'count': {'$sum': 1},  # 统计每个桶中的订单数量
                'orders': {'$push': '$order_id'}  # 收集每个桶中的订单号
            }
        }
    }
]

# 执行聚合操作
result = list(orders_collection.aggregate(pipeline))

# 打印每个桶的统计信息
for bucket in result:
    print(bucket)

12、$facet

用于执行多个聚合操作,并返回每个操作的结果。它允许在单个聚合查询中执行多个不相关的子聚合,然后将它们的结果组合在一起。

from pymongo import MongoClient
from datetime import datetime

# 连接到 MongoDB 数据库
client = MongoClient('mongodb://localhost:27017/')
db = client['mydb']  # 切换到数据库
orders_collection = db['orders']  # 获取订单集合

# 示例订单文档
orders_data = [
    {'order_id': 1, 'amount': 100, 'date': datetime(2023, 1, 5), 'category': 'Electronics'},
    {'order_id': 2, 'amount': 150, 'date': datetime(2023, 1, 10), 'category': 'Clothing'},
    {'order_id': 3, 'amount': 200, 'date': datetime(2023, 2, 15), 'category': 'Electronics'},
    {'order_id': 4, 'amount': 80, 'date': datetime(2023, 2, 20), 'category': 'Books'},
    {'order_id': 5, 'amount': 120, 'date': datetime(2023, 3, 3), 'category': 'Clothing'},
    {'order_id': 6, 'amount': 300, 'date': datetime(2023, 3, 8), 'category': 'Electronics'},
]

# 插入示例数据到订单集合
orders_collection.insert_many(orders_data)

# 使用 $facet 操作符执行多个聚合操作
pipeline = [
    {
        '$facet': {
            'totalSalesByMonth': [
                {
                    '$group': {
                        '_id': {'$month': '$date'},
                        'totalSales': {'$sum': '$amount'}
                    }
                },
                {
                    '$sort': {'_id': 1}
                }
            ],
            'orderCountByCategory': [
                {
                    '$group': {
                        '_id': '$category',
                        'orderCount': {'$sum': 1}
                    }
                }
            ],
            'maxSalesByMonth': [
                {
                    '$group': {
                        '_id': {'$month': '$date'},
                        'maxSales': {'$max': '$amount'}
                    }
                },
                {
                    '$sort': {'_id': 1}
                }
            ]
        }
    }
]

# 执行聚合操作
result = list(orders_collection.aggregate(pipeline))

# 打印每个子聚合的结果
for facet_result in result:
    print(facet_result)

推荐阅读
cjavapy编程之路首页