=====================
Aggregation Framework
=====================

Tests for the MongoDB aggregation framework implementation.

$Id$

Setup
-----

  >>> import datetime
  >>> from pprint import pprint
  >>> from bson import ObjectId
  >>> import m01.fake
  >>> import m01.fake.testing

Create a test database and collection:

  >>> client = m01.fake.testing.getTestClient()
  >>> db = client['test_aggregation']
  >>> collection = db['orders']

Insert test data:

  >>> from datetime import datetime as dt
  >>> orders = [
  ...     {'_id': 1, 'customer': 'Alice', 'amount': 100, 'status': 'completed',
  ...      'date': dt(2024, 1, 15), 'items': ['apple', 'banana'],
  ...      'qty': 5, 'category': 'fruit'},
  ...     {'_id': 2, 'customer': 'Bob', 'amount': 200, 'status': 'pending',
  ...      'date': dt(2024, 1, 16), 'items': ['carrot'],
  ...      'qty': 3, 'category': 'vegetable'},
  ...     {'_id': 3, 'customer': 'Alice', 'amount': 150, 'status': 'completed',
  ...      'date': dt(2024, 1, 17), 'items': ['apple', 'carrot'],
  ...      'qty': 8, 'category': 'mixed'},
  ...     {'_id': 4, 'customer': 'Charlie', 'amount': 300, 'status': 'completed',
  ...      'date': dt(2024, 1, 18), 'items': ['banana'],
  ...      'qty': 2, 'category': 'fruit'},
  ...     {'_id': 5, 'customer': 'Bob', 'amount': 50, 'status': 'cancelled',
  ...      'date': dt(2024, 1, 19), 'items': ['apple', 'banana', 'carrot'],
  ...      'qty': 10, 'category': 'mixed'},
  ... ]
  >>> for order in orders:
  ...     _ = collection.insert_one(order)


$match Stage
------------

Filter documents:

  >>> result = list(collection.aggregate([
  ...     {'$match': {'status': 'completed'}}
  ... ]))
  >>> len(result)
  3
  >>> [r['customer'] for r in result]
  ['Alice', 'Alice', 'Charlie']

Match with comparison operators:

  >>> result = list(collection.aggregate([
  ...     {'$match': {'amount': {'$gte': 150}}}
  ... ]))
  >>> len(result)
  3
  >>> sorted([r['amount'] for r in result])
  [150, 200, 300]


$project Stage
--------------

Include specific fields:

  >>> result = list(collection.aggregate([
  ...     {'$project': {'customer': 1, 'amount': 1}}
  ... ]))
  >>> sorted(result[0].keys())
  ['_id', 'amount', 'customer']

Exclude _id:

  >>> result = list(collection.aggregate([
  ...     {'$project': {'_id': 0, 'customer': 1, 'amount': 1}}
  ... ]))
  >>> '_id' in result[0]
  False

Expression in projection:

  >>> result = list(collection.aggregate([
  ...     {'$project': {
  ...         'customer': 1,
  ...         'totalValue': {'$multiply': ['$amount', '$qty']}
  ...     }}
  ... ]))
  >>> result[0]['totalValue']
  500


$group Stage
------------

Group by field with count:

  >>> result = list(collection.aggregate([
  ...     {'$group': {
  ...         '_id': '$customer',
  ...         'count': {'$sum': 1}
  ...     }}
  ... ]))
  >>> len(result)
  3
  >>> sorted([(r['_id'], r['count']) for r in result])
  [('Alice', 2), ('Bob', 2), ('Charlie', 1)]

Group with multiple accumulators:

  >>> result = list(collection.aggregate([
  ...     {'$group': {
  ...         '_id': '$customer',
  ...         'totalAmount': {'$sum': '$amount'},
  ...         'avgAmount': {'$avg': '$amount'},
  ...         'minAmount': {'$min': '$amount'},
  ...         'maxAmount': {'$max': '$amount'}
  ...     }}
  ... ]))
  >>> alice = [r for r in result if r['_id'] == 'Alice'][0]
  >>> alice['totalAmount']
  250
  >>> alice['avgAmount']
  125.0
  >>> alice['minAmount']
  100
  >>> alice['maxAmount']
  150

Group with $push and $addToSet:

  >>> result = list(collection.aggregate([
  ...     {'$group': {
  ...         '_id': '$status',
  ...         'customers': {'$addToSet': '$customer'},
  ...         'amounts': {'$push': '$amount'}
  ...     }}
  ... ]))
  >>> completed = [r for r in result if r['_id'] == 'completed'][0]
  >>> sorted(completed['customers'])
  ['Alice', 'Charlie']
  >>> sorted(completed['amounts'])
  [100, 150, 300]

Group with $first and $last:

  >>> result = list(collection.aggregate([
  ...     {'$sort': {'date': 1}},
  ...     {'$group': {
  ...         '_id': '$customer',
  ...         'firstOrder': {'$first': '$amount'},
  ...         'lastOrder': {'$last': '$amount'}
  ...     }}
  ... ]))
  >>> alice = [r for r in result if r['_id'] == 'Alice'][0]
  >>> alice['firstOrder']
  100
  >>> alice['lastOrder']
  150

Group all documents (null _id):

  >>> result = list(collection.aggregate([
  ...     {'$group': {
  ...         '_id': None,
  ...         'totalAmount': {'$sum': '$amount'},
  ...         'count': {'$sum': 1}
  ...     }}
  ... ]))
  >>> result[0]['totalAmount']
  800
  >>> result[0]['count']
  5


$sort Stage
-----------

Sort ascending:

  >>> result = list(collection.aggregate([
  ...     {'$sort': {'amount': 1}}
  ... ]))
  >>> [r['amount'] for r in result]
  [50, 100, 150, 200, 300]

Sort descending:

  >>> result = list(collection.aggregate([
  ...     {'$sort': {'amount': -1}}
  ... ]))
  >>> [r['amount'] for r in result]
  [300, 200, 150, 100, 50]

Sort by multiple fields:

  >>> result = list(collection.aggregate([
  ...     {'$sort': {'status': 1, 'amount': -1}}
  ... ]))
  >>> [(r['status'], r['amount']) for r in result]
  [('cancelled', 50), ('completed', 300), ('completed', 150), ('completed', 100), ('pending', 200)]


$limit and $skip Stages
-----------------------

Limit results:

  >>> result = list(collection.aggregate([
  ...     {'$sort': {'amount': -1}},
  ...     {'$limit': 3}
  ... ]))
  >>> [r['amount'] for r in result]
  [300, 200, 150]

Skip results:

  >>> result = list(collection.aggregate([
  ...     {'$sort': {'amount': -1}},
  ...     {'$skip': 2}
  ... ]))
  >>> [r['amount'] for r in result]
  [150, 100, 50]

Pagination (skip + limit):

  >>> result = list(collection.aggregate([
  ...     {'$sort': {'amount': -1}},
  ...     {'$skip': 1},
  ...     {'$limit': 2}
  ... ]))
  >>> [r['amount'] for r in result]
  [200, 150]


$count Stage
------------

Count documents:

  >>> result = list(collection.aggregate([
  ...     {'$match': {'status': 'completed'}},
  ...     {'$count': 'totalCompleted'}
  ... ]))
  >>> result[0]['totalCompleted']
  3


$unwind Stage
-------------

Unwind array field:

  >>> result = list(collection.aggregate([
  ...     {'$match': {'_id': 1}},
  ...     {'$unwind': '$items'}
  ... ]))
  >>> len(result)
  2
  >>> [r['items'] for r in result]
  ['apple', 'banana']

Unwind with index:

  >>> result = list(collection.aggregate([
  ...     {'$match': {'_id': 1}},
  ...     {'$unwind': {'path': '$items', 'includeArrayIndex': 'idx'}}
  ... ]))
  >>> [(r['items'], r['idx']) for r in result]
  [('apple', 0), ('banana', 1)]

Unwind with preserveNullAndEmptyArrays:

  >>> _ = collection.insert_one({'_id': 99, 'customer': 'Test', 'items': []})
  >>> result = list(collection.aggregate([
  ...     {'$match': {'_id': 99}},
  ...     {'$unwind': {'path': '$items', 'preserveNullAndEmptyArrays': True}}
  ... ]))
  >>> len(result)
  1
  >>> _ = collection.delete_one({'_id': 99})


$addFields and $set Stages
--------------------------

Add new fields:

  >>> result = list(collection.aggregate([
  ...     {'$addFields': {
  ...         'discountedAmount': {'$multiply': ['$amount', 0.9]},
  ...         'processed': True
  ...     }},
  ...     {'$limit': 1}
  ... ]))
  >>> result[0]['discountedAmount']
  90.0
  >>> result[0]['processed']
  True

$set is alias for $addFields:

  >>> result = list(collection.aggregate([
  ...     {'$set': {'doubled': {'$multiply': ['$amount', 2]}}},
  ...     {'$limit': 1}
  ... ]))
  >>> result[0]['doubled']
  200


$unset Stage
------------

Remove fields:

  >>> result = list(collection.aggregate([
  ...     {'$unset': ['items', 'date']},
  ...     {'$limit': 1}
  ... ]))
  >>> 'items' in result[0]
  False
  >>> 'date' in result[0]
  False
  >>> 'customer' in result[0]
  True


$sortByCount Stage
------------------

Group, count, and sort:

  >>> result = list(collection.aggregate([
  ...     {'$sortByCount': '$category'}
  ... ]))
  >>> len(result)
  3
  >>> result[-1]['_id'], result[-1]['count']  # vegetable has lowest count
  ('vegetable', 1)
  >>> sorted(r['_id'] for r in result if r['count'] == 2)  # fruit and mixed have count 2
  ['fruit', 'mixed']


$sample Stage
-------------

Random sample:

  >>> result = list(collection.aggregate([
  ...     {'$sample': {'size': 2}}
  ... ]))
  >>> len(result)
  2


$replaceRoot Stage
------------------

Replace document with embedded document:

  >>> _ = collection.insert_one({
  ...     '_id': 100, 'metadata': {'source': 'web', 'version': 1}
  ... })
  >>> result = list(collection.aggregate([
  ...     {'$match': {'_id': 100}},
  ...     {'$replaceRoot': {'newRoot': '$metadata'}}
  ... ]))
  >>> result[0]
  {'source': 'web', 'version': 1}
  >>> _ = collection.delete_one({'_id': 100})


$lookup Stage
-------------

Create related collection:

  >>> customers = db['customers']
  >>> _ = customers.insert_one({'_id': 'Alice', 'tier': 'gold'})
  >>> _ = customers.insert_one({'_id': 'Bob', 'tier': 'silver'})
  >>> _ = customers.insert_one({'_id': 'Charlie', 'tier': 'bronze'})

Join collections:

  >>> result = list(collection.aggregate([
  ...     {'$match': {'_id': 1}},
  ...     {'$lookup': {
  ...         'from': 'customers',
  ...         'localField': 'customer',
  ...         'foreignField': '_id',
  ...         'as': 'customerInfo'
  ...     }}
  ... ]))
  >>> result[0]['customerInfo'][0]['tier']
  'gold'


$facet Stage
------------

Multiple pipelines:

  >>> result = list(collection.aggregate([
  ...     {'$facet': {
  ...         'byStatus': [
  ...             {'$group': {'_id': '$status', 'count': {'$sum': 1}}}
  ...         ],
  ...         'topAmount': [
  ...             {'$sort': {'amount': -1}},
  ...             {'$limit': 2},
  ...             {'$project': {'customer': 1, 'amount': 1}}
  ...         ]
  ...     }}
  ... ]))
  >>> len(result)
  1
  >>> len(result[0]['byStatus'])
  3
  >>> len(result[0]['topAmount'])
  2


$bucket Stage
-------------

Bucket by amount:

  >>> result = list(collection.aggregate([
  ...     {'$bucket': {
  ...         'groupBy': '$amount',
  ...         'boundaries': [0, 100, 200, 500],
  ...         'default': 'other',
  ...         'output': {
  ...             'count': {'$sum': 1},
  ...             'total': {'$sum': '$amount'}
  ...         }
  ...     }}
  ... ]))
  >>> sorted([(r['_id'], r['count']) for r in result])
  [(0, 1), (100, 2), (200, 2)]


Expression Operators
====================

Arithmetic Operators
--------------------

  >>> result = list(collection.aggregate([
  ...     {'$project': {
  ...         'add': {'$add': ['$amount', 10]},
  ...         'subtract': {'$subtract': ['$amount', 10]},
  ...         'multiply': {'$multiply': ['$amount', 2]},
  ...         'divide': {'$divide': ['$amount', 2]},
  ...         'mod': {'$mod': ['$amount', 30]}
  ...     }},
  ...     {'$match': {'_id': 1}}
  ... ]))
  >>> result[0]['add']
  110
  >>> result[0]['subtract']
  90
  >>> result[0]['multiply']
  200
  >>> result[0]['divide']
  50.0
  >>> result[0]['mod']
  10


Comparison Operators
--------------------

  >>> result = list(collection.aggregate([
  ...     {'$project': {
  ...         'isExpensive': {'$gt': ['$amount', 100]},
  ...         'isCheap': {'$lt': ['$amount', 100]},
  ...         'isExact': {'$eq': ['$amount', 100]}
  ...     }},
  ...     {'$match': {'_id': 1}}
  ... ]))
  >>> result[0]['isExpensive']
  False
  >>> result[0]['isCheap']
  False
  >>> result[0]['isExact']
  True


Boolean Operators
-----------------

  >>> result = list(collection.aggregate([
  ...     {'$project': {
  ...         'bothTrue': {'$and': [True, True]},
  ...         'eitherTrue': {'$or': [True, False]},
  ...         'notTrue': {'$not': [True]}
  ...     }},
  ...     {'$limit': 1}
  ... ]))
  >>> result[0]['bothTrue']
  True
  >>> result[0]['eitherTrue']
  True
  >>> result[0]['notTrue']
  False


Conditional Operators
---------------------

$cond:

  >>> result = list(collection.aggregate([
  ...     {'$project': {
  ...         'priceCategory': {
  ...             '$cond': {
  ...                 'if': {'$gte': ['$amount', 200]},
  ...                 'then': 'expensive',
  ...                 'else': 'affordable'
  ...             }
  ...         }
  ...     }}
  ... ]))
  >>> sorted(set(r['priceCategory'] for r in result))
  ['affordable', 'expensive']

$ifNull:

  >>> result = list(collection.aggregate([
  ...     {'$project': {
  ...         'discount': {'$ifNull': ['$discount', 0]}
  ...     }},
  ...     {'$limit': 1}
  ... ]))
  >>> result[0]['discount']
  0

$switch:

  >>> result = list(collection.aggregate([
  ...     {'$project': {
  ...         'statusLabel': {
  ...             '$switch': {
  ...                 'branches': [
  ...                     {'case': {'$eq': ['$status', 'completed']}, 'then': 'Done'},
  ...                     {'case': {'$eq': ['$status', 'pending']}, 'then': 'Waiting'}
  ...                 ],
  ...                 'default': 'Unknown'
  ...             }
  ...         }
  ...     }}
  ... ]))
  >>> sorted(set(r['statusLabel'] for r in result))
  ['Done', 'Unknown', 'Waiting']


Date Operators
--------------

  >>> result = list(collection.aggregate([
  ...     {'$project': {
  ...         'year': {'$year': '$date'},
  ...         'month': {'$month': '$date'},
  ...         'day': {'$dayOfMonth': '$date'}
  ...     }},
  ...     {'$match': {'_id': 1}}
  ... ]))
  >>> result[0]['year']
  2024
  >>> result[0]['month']
  1
  >>> result[0]['day']
  15

$dateToString:

  >>> result = list(collection.aggregate([
  ...     {'$project': {
  ...         'dateStr': {
  ...             '$dateToString': {
  ...                 'date': '$date',
  ...                 'format': '%Y-%m-%d'
  ...             }
  ...         }
  ...     }},
  ...     {'$match': {'_id': 1}}
  ... ]))
  >>> result[0]['dateStr']
  '2024-01-15'


String Operators
----------------

  >>> result = list(collection.aggregate([
  ...     {'$project': {
  ...         'upper': {'$toUpper': '$customer'},
  ...         'lower': {'$toLower': '$customer'},
  ...         'concat': {'$concat': ['$customer', '-', '$status']}
  ...     }},
  ...     {'$match': {'_id': 1}}
  ... ]))
  >>> result[0]['upper']
  'ALICE'
  >>> result[0]['lower']
  'alice'
  >>> result[0]['concat']
  'Alice-completed'


Array Operators
---------------

  >>> result = list(collection.aggregate([
  ...     {'$project': {
  ...         'itemCount': {'$size': '$items'},
  ...         'firstItem': {'$arrayElemAt': ['$items', 0]},
  ...         'lastItem': {'$arrayElemAt': ['$items', -1]}
  ...     }},
  ...     {'$match': {'_id': 1}}
  ... ]))
  >>> result[0]['itemCount']
  2
  >>> result[0]['firstItem']
  'apple'
  >>> result[0]['lastItem']
  'banana'

$filter:

  >>> result = list(collection.aggregate([
  ...     {'$project': {
  ...         'apples': {
  ...             '$filter': {
  ...                 'input': '$items',
  ...                 'as': 'item',
  ...                 'cond': {'$eq': ['$$item', 'apple']}
  ...             }
  ...         }
  ...     }},
  ...     {'$match': {'_id': 1}}
  ... ]))
  >>> result[0]['apples']
  ['apple']

$map:

  >>> result = list(collection.aggregate([
  ...     {'$project': {
  ...         'upperItems': {
  ...             '$map': {
  ...                 'input': '$items',
  ...                 'as': 'item',
  ...                 'in': {'$toUpper': '$$item'}
  ...             }
  ...         }
  ...     }},
  ...     {'$match': {'_id': 1}}
  ... ]))
  >>> result[0]['upperItems']
  ['APPLE', 'BANANA']


Complex Pipeline Example
========================

Multi-stage pipeline for sales analysis:

  >>> result = list(collection.aggregate([
  ...     # Filter completed orders
  ...     {'$match': {'status': 'completed'}},
  ...     # Calculate order value
  ...     {'$addFields': {
  ...         'orderValue': {'$multiply': ['$amount', '$qty']}
  ...     }},
  ...     # Group by customer
  ...     {'$group': {
  ...         '_id': '$customer',
  ...         'totalOrders': {'$sum': 1},
  ...         'totalValue': {'$sum': '$orderValue'},
  ...         'avgValue': {'$avg': '$orderValue'}
  ...     }},
  ...     # Sort by total value
  ...     {'$sort': {'totalValue': -1}},
  ...     # Format output
  ...     {'$project': {
  ...         'customer': '$_id',
  ...         'totalOrders': 1,
  ...         'totalValue': 1,
  ...         'avgValue': {'$round': ['$avgValue', 0]}
  ...     }}
  ... ]))
  >>> len(result)
  2
  >>> result[0]['customer']
  'Alice'


Cleanup
-------

  >>> db.drop_collection('orders')
  >>> db.drop_collection('customers')
