Coverage for src/antarctic/pandas_field.py: 100%

35 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-01-27 10:29 +0000

1"""Provide a custom Pandas field type for MongoEngine. 

2 

3It allows storing pandas DataFrames in MongoDB by converting them to and from 

4parquet-formatted byte streams. This enables efficient storage and retrieval 

5of pandas data structures within MongoDB documents. 

6""" 

7 

8from __future__ import annotations 

9 

10from io import BytesIO 

11from typing import Any 

12 

13import pandas as pd 

14import pyarrow as pa 

15import pyarrow.parquet as pq 

16from mongoengine.base import BaseField 

17 

18 

def _read(value: bytes, columns: list[str] | None = None) -> pd.DataFrame:
    """Rebuild a DataFrame from parquet-encoded bytes.

    The bytes are wrapped in an in-memory stream, parsed into a pyarrow
    table and then converted to pandas.

    Args:
        value: Parquet-encoded bytes representing a DataFrame
        columns: Names of the columns to load; every column is loaded when None

    Returns:
        pd.DataFrame: The DataFrame decoded from the parquet bytes

    Examples:
        >>> import pandas as pd
        >>> df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
        >>> data = _write(df)
        >>> result = _read(data)
        >>> result["a"].tolist()
        [1, 2, 3]

        Restrict the load to a subset of the columns:

        >>> result = _read(data, columns=["b"])
        >>> list(result.columns)
        ['b']

    """
    stream = BytesIO(value)
    try:
        # Parse and convert while the stream is still open.
        return pq.read_table(stream, columns=columns).to_pandas()
    finally:
        stream.close()

47 

48 

49def _write(value: pd.DataFrame, compression: str = "zstd") -> bytes: 

50 """Convert a Pandas DataFrame into a compressed parquet byte-stream. 

51 

52 The byte-stream encodes in its metadata the structure of the Pandas object, 

53 including column names, data types, and index information. 

54 

55 Args: 

56 value: The DataFrame to convert 

57 compression: Compression algorithm to use (default: "zstd") 

58 

59 Returns: 

60 bytes: Binary representation of the DataFrame in parquet format 

61 

62 Examples: 

63 >>> import pandas as pd 

64 >>> df = pd.DataFrame({"x": [1.0, 2.0], "y": [3.0, 4.0]}) 

65 >>> data = _write(df) 

66 >>> isinstance(data, bytes) 

67 True 

68 >>> data[:4] # Parquet magic bytes 

69 b'PAR1' 

70 

71 """ 

72 if isinstance(value, pd.DataFrame): 

73 table = pa.Table.from_pandas(value) 

74 

75 with BytesIO() as buffer: 

76 pq.write_table(table, buffer, compression=compression) 

77 return buffer.getvalue() 

78 

79 

class PandasField(BaseField):  # type: ignore[misc]
    """Custom MongoEngine field type for storing pandas DataFrames.

    This field handles the conversion between pandas DataFrames and binary data
    that can be stored in MongoDB. It uses parquet format for efficient storage
    and retrieval of tabular data.

    Assigning a DataFrame serializes it with ``_write``; reading the attribute
    on a document instance deserializes the stored bytes with ``_read``.
    """

    def __init__(self, compression: str = "zstd", **kwargs: Any) -> None:
        """Initialize a PandasField.

        Args:
            compression: Compression algorithm to use for parquet storage (default: "zstd")
            **kwargs: Additional arguments passed to the parent BaseField

        """
        super().__init__(**kwargs)
        self.compression = compression

    def __set__(self, instance: Any, value: pd.DataFrame | bytes | None) -> None:
        """Convert and set the value for this field.

        If the value is a DataFrame, it's converted to a parquet byte stream.
        If it's already bytes (or None), it's stored as-is.

        Args:
            instance: The document instance
            value: The value to set (DataFrame, bytes, or None)

        Raises:
            TypeError: If the value is neither a DataFrame, bytes, nor None

        """
        if isinstance(value, pd.DataFrame):
            # Convert DataFrame to binary format for storage
            value = _write(value, compression=self.compression)
        elif value is not None and not isinstance(value, bytes):
            msg = f"Type of value {type(value)} not supported. Expected DataFrame or bytes."
            raise TypeError(msg)

        # At this point value is either parquet bytes or None.
        super().__set__(instance, value)

    def __get__(self, instance: Any, owner: type) -> pd.DataFrame | PandasField | None:
        """Retrieve and convert the stored value back to a DataFrame.

        Args:
            instance: The document instance, or None when the attribute is
                accessed on the document class itself
            owner: The document class

        Returns:
            The retrieved DataFrame, or None if no data is stored. For
            class-level access (``instance`` is None) the parent descriptor's
            result is returned unchanged instead of being decoded.

        """
        if instance is None:
            # Accessed on the class, not on a document: there is no stored
            # parquet payload to decode. MongoEngine's BaseField returns the
            # field object here; feeding that into _read would raise.
            return super().__get__(instance, owner)

        data = super().__get__(instance, owner)
        if data is None:
            return None

        return _read(data)