Coverage for src/ifunnel/models/MST.py: 100%
38 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-12 09:14 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-12 09:14 +0000
1import networkx as nx
2import numpy as np
3import pandas as pd
4from loguru import logger
5from sklearn.decomposition import PCA
def minimum_spanning_tree(dataset: pd.DataFrame) -> tuple[list, pd.DataFrame, float, float]:
    """Select the leaf assets of the correlation-distance minimum spanning tree.

    A complete graph is built over the columns of ``dataset`` with edge
    weight ``sqrt(2 * (1 - rho))``, where ``rho`` is the pairwise Spearman
    correlation. The minimum spanning tree (MST) of that graph is computed
    and the nodes with degree one (the leaves) are kept as the subset.

    Args:
        dataset: One column per asset; the Spearman correlation between
            columns is what drives the tree.

    Returns:
        A 4-tuple ``(subset, subset_df, corr_avg, pdi)``:

        * ``subset`` - labels of the columns that are leaves of the MST, in
          the order they appear in the correlation matrix.
        * ``subset_df`` - ``dataset`` restricted to those columns.
        * ``corr_avg`` - mean of all entries of the subset's Spearman
          correlation matrix (diagonal included).
        * ``pdi`` - diversification index computed from the PCA explained
          variance ratios of the subset's correlation matrix.
    """
    logger.debug("💡 Running MST method")

    corr = dataset.corr(method="spearman")
    labels = corr.index.tolist()

    # Correlation distance. The radicand is clipped at zero so that rounding
    # noise (rho marginally above 1) cannot turn a valid distance into NaN.
    distance = ((2 * (1 - corr)).clip(lower=0) ** 0.5).to_numpy()

    # Take each unordered pair exactly once from the strict upper triangle
    # (k=1 skips the diagonal, i.e. self-correlations), in row-major order.
    # Filtering is done on the weights only, so a label that happens to be 0
    # is never mistaken for a "zero value", and a genuine zero distance
    # between two perfectly correlated assets is kept as an edge.
    rows, cols = np.triu_indices(len(labels), k=1)
    weights = distance[rows, cols]
    defined = ~np.isnan(weights)  # undefined correlations yield no edge
    edges = [
        (labels[i], labels[j], weight)
        for i, j, weight in zip(
            rows[defined].tolist(),
            cols[defined].tolist(),
            weights[defined].tolist(),
        )
    ]

    # Every asset is a node, even if all of its correlations are undefined.
    graph = nx.Graph()
    graph.add_nodes_from(labels)
    graph.add_weighted_edges_from(edges)

    mst = nx.minimum_spanning_tree(graph)

    # Leaves of the tree: nodes with exactly one incident edge.
    subset = [node for node, degree in mst.degree() if degree == 1]

    # Keep only the leaf assets from the input data.
    subset_df = dataset.loc[:, dataset.columns.isin(subset)]

    # Average correlation within the subset.
    corr_subset = subset_df.corr(method="spearman")
    corr_avg = corr_subset.mean().mean()

    # PDI: 2 * sum(k * W_k) - 1, with W_k the k-th explained variance ratio.
    pca = PCA()
    pca.fit(corr_subset)
    ratios = pca.explained_variance_ratio_
    # NOTE(review): the range stops at n - 1, so the last component's term is
    # not added. Presumably negligible because the PCA is fitted on a square
    # matrix - confirm before changing the bound.
    weighted_sum = sum(
        rank * ratios[rank - 1] for rank in range(1, corr_subset.shape[1])
    )
    pdi = 2 * weighted_sum - 1

    return subset, subset_df, corr_avg, pdi