Coverage for src/antarctic/pandas_field.py: 100%
34 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-07 13:57 +0000
1"""Provide a custom Pandas field type for MongoEngine.
3It allows storing pandas DataFrames in MongoDB by converting them to and from
4parquet-formatted byte streams. This enables efficient storage and retrieval
5of pandas data structures within MongoDB documents.
6"""
8from __future__ import annotations
10from io import BytesIO
11from typing import Any
13import pandas as pd
14import pyarrow as pa
15import pyarrow.parquet as pq
16from mongoengine.base import BaseField
def _read(value: bytes, columns: list[str] | None = None) -> pd.DataFrame:
    """Deserialize a parquet byte string back into a pandas DataFrame.

    Args:
        value: Parquet-encoded binary blob previously produced for storage
        columns: Subset of column names to materialize (``None`` loads all)

    Returns:
        pd.DataFrame: The DataFrame reconstructed from the parquet payload

    """
    buffer = BytesIO(value)
    try:
        # pyarrow restores column names, dtypes and the index from the
        # parquet metadata embedded at write time.
        return pq.read_table(buffer, columns=columns).to_pandas()
    finally:
        buffer.close()
35def _write(value: pd.DataFrame, compression: str = "zstd") -> bytes:
36 """Convert a Pandas DataFrame into a compressed parquet byte-stream.
38 The byte-stream encodes in its metadata the structure of the Pandas object,
39 including column names, data types, and index information.
41 Args:
42 value: The DataFrame to convert
43 compression: Compression algorithm to use (default: "zstd")
45 Returns:
46 bytes: Binary representation of the DataFrame in parquet format
48 """
49 if isinstance(value, pd.DataFrame):
50 table = pa.Table.from_pandas(value)
52 with BytesIO() as buffer:
53 pq.write_table(table, buffer, compression=compression)
54 return buffer.getvalue()
class PandasField(BaseField):
    """Custom MongoEngine field type for storing pandas DataFrames.

    Transparently converts between pandas DataFrames and the binary parquet
    representation persisted in MongoDB, so documents can hold tabular data
    efficiently.
    """

    def __init__(self, compression: str = "zstd", **kwargs: Any) -> None:
        """Initialize a PandasField.

        Args:
            compression: Compression algorithm used when writing parquet
                (default: "zstd")
            **kwargs: Additional arguments forwarded to ``BaseField``

        """
        super().__init__(**kwargs)
        self.compression = compression

    def __set__(self, instance: Any, value: pd.DataFrame | bytes | None) -> None:
        """Convert and set the value for this field.

        A DataFrame is serialized to a parquet byte stream; raw ``bytes``
        and ``None`` are stored unchanged.

        Args:
            instance: The document instance
            value: The value to set (DataFrame, bytes, or None)

        Raises:
            AssertionError: If the value is neither a DataFrame, bytes, nor None

        """
        if isinstance(value, pd.DataFrame):
            # Serialize to the binary parquet representation for storage.
            value = _write(value, compression=self.compression)
        elif not (value is None or isinstance(value, bytes)):
            raise AssertionError(f"Type of value {type(value)} not supported. Expected DataFrame or bytes.")
        super().__set__(instance, value)

    def __get__(self, instance: Any, owner: type) -> pd.DataFrame | None:
        """Retrieve the stored bytes and rebuild the DataFrame.

        Args:
            instance: The document instance
            owner: The document class

        Returns:
            Optional[pd.DataFrame]: The retrieved DataFrame or None if no data

        """
        raw = super().__get__(instance, owner)
        return None if raw is None else _read(raw)