Coverage for src/antarctic/pandas_field.py: 100%

34 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-07 13:57 +0000

"""Provide a custom Pandas field type for MongoEngine.

It allows storing pandas DataFrames in MongoDB by converting them to and from
parquet-formatted byte streams. This enables efficient storage and retrieval
of pandas data structures within MongoDB documents.
"""

7 

8from __future__ import annotations 

9 

10from io import BytesIO 

11from typing import Any 

12 

13import pandas as pd 

14import pyarrow as pa 

15import pyarrow.parquet as pq 

16from mongoengine.base import BaseField 

17 

18 

def _read(value: bytes, columns: list[str] | None = None) -> pd.DataFrame:
    """Reconstruct a pandas DataFrame from its parquet byte representation.

    Args:
        value: Parquet-format binary blob previously produced by ``_write``
        columns: Subset of column names to load; ``None`` loads every column

    Returns:
        pd.DataFrame: DataFrame decoded from the byte stream

    """
    buffer = BytesIO(value)
    try:
        parquet_table = pq.read_table(buffer, columns=columns)
    finally:
        buffer.close()
    return parquet_table.to_pandas()

33 

34 

def _write(value: pd.DataFrame, compression: str = "zstd") -> bytes:
    """Convert a Pandas DataFrame into a compressed parquet byte-stream.

    The byte-stream encodes in its metadata the structure of the Pandas object,
    including column names, data types, and index information.

    Args:
        value: The DataFrame to convert
        compression: Compression algorithm to use (default: "zstd")

    Returns:
        bytes: Binary representation of the DataFrame in parquet format

    Raises:
        TypeError: If ``value`` is not a pandas DataFrame

    """
    # Fail fast with a clear error: previously a non-DataFrame input fell
    # through the isinstance check and crashed with a confusing NameError,
    # because ``table`` was only assigned inside the conditional branch.
    if not isinstance(value, pd.DataFrame):
        raise TypeError(f"Expected a pandas DataFrame, got {type(value)}.")

    table = pa.Table.from_pandas(value)

    with BytesIO() as buffer:
        pq.write_table(table, buffer, compression=compression)
        return buffer.getvalue()

55 

56 

class PandasField(BaseField):
    """MongoEngine field that stores pandas DataFrames as parquet blobs.

    Transparently converts DataFrames to compressed parquet bytes on
    assignment and decodes them back to DataFrames on attribute access,
    enabling efficient tabular storage inside MongoDB documents.
    """

    def __init__(self, compression: str = "zstd", **kwargs: Any) -> None:
        """Create the field.

        Args:
            compression: Parquet compression codec to use (default: "zstd")
            **kwargs: Forwarded unchanged to the parent ``BaseField``

        """
        super().__init__(**kwargs)
        self.compression = compression

    def __set__(self, instance: Any, value: pd.DataFrame | bytes | None) -> None:
        """Store *value*, serializing DataFrames to parquet bytes first.

        A DataFrame is converted to a parquet byte stream; bytes are stored
        untouched; ``None`` clears the field.

        Args:
            instance: The document instance owning this field
            value: DataFrame, pre-serialized bytes, or None

        Raises:
            AssertionError: If *value* is neither DataFrame, bytes, nor None

        """
        if isinstance(value, pd.DataFrame):
            # Serialize to the binary form MongoDB can hold.
            value = _write(value, compression=self.compression)
        elif value is not None and not isinstance(value, bytes):
            raise AssertionError(f"Type of value {type(value)} not supported. Expected DataFrame or bytes.")
        super().__set__(instance, value)

    def __get__(self, instance: Any, owner: type) -> pd.DataFrame | None:
        """Return the stored blob decoded as a DataFrame, or None if unset.

        Args:
            instance: The document instance
            owner: The owning document class

        Returns:
            Optional[pd.DataFrame]: Decoded DataFrame, or None when no data

        """
        raw = super().__get__(instance, owner)
        return None if raw is None else _read(raw)