Coverage for src/ifunnel/models/MST.py: 100%

38 statements  

coverage.py v7.9.2, created at 2025-07-12 09:14 +0000

import networkx as nx
import numpy as np
import pandas as pd
from loguru import logger
from sklearn.decomposition import PCA


def minimum_spanning_tree(dataset: pd.DataFrame) -> tuple[list, pd.DataFrame, float, float]:
    """Select the leaf assets of a minimum spanning tree built from Spearman correlation distances.

    Returns the selected asset names, their data, the average correlation within
    the subset, and the subset's portfolio diversification index (PDI).
    """
    logger.debug("💡 Running MST method")

    corr = dataset.corr(method="spearman")  # calculate the correlation
    distance_corr = (2 * (1 - corr)) ** 0.5  # calculate the distance
    mask = np.triu(np.ones_like(corr, dtype=bool))  # keep only the upper half of the matrix
    distance_corr = distance_corr * mask

    # use the distance matrix to create links
    links = distance_corr.stack().reset_index(level=1)
    links.columns = ["var2", "value"]
    links = links.reset_index()
    links = links.replace(0, np.nan)  # drop 0 values from the matrix
    links = links.dropna(how="any", axis=0)
    links.columns = ["var1", "var2", "value"]  # rename the columns
    links_filtered = links.loc[(links["var1"] != links["var2"])]  # filter out self-correlations

    # Create the graph
    created_graph = nx.Graph()
    for i in range(len(corr)):  # add nodes
        created_graph.add_node(corr.index[i])
    tuples = list(links_filtered.itertuples(index=False, name=None))  # add edges with weight
    created_graph.add_weighted_edges_from(tuples)

    # Create a MST from the full graph
    mst = nx.minimum_spanning_tree(created_graph)

    # Save the nodes with degree one
    degrees = [val for (node, val) in mst.degree()]
    df = pd.DataFrame(degrees, corr.index)
    df.columns = ["degree"]
    subset = df[df["degree"] == 1].index.tolist()

    # Create a new dataframe with only the assets from the subset
    subset_df = dataset.loc[:, dataset.columns.isin(subset)]

    # Calculate the average correlation of the subset
    corr_subset = subset_df.corr(method="spearman")
    corr_avg = corr_subset.mean().mean()

    # Calculate the PDI (portfolio diversification index) for the subset
    pca = PCA()
    pca.fit(corr_subset)
    value = 0
    for i in range(1, corr_subset.shape[1]):
        value = value + i * pca.explained_variance_ratio_[i - 1]
    pdi = 2 * value - 1

    return subset, subset_df, corr_avg, pdi
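
For reference, a minimal usage sketch (not part of the covered module). The import path is inferred from the file location src/ifunnel/models/MST.py, and the synthetic return data and asset names are illustrative assumptions only:

import numpy as np
import pandas as pd

from ifunnel.models.MST import minimum_spanning_tree  # assumed import path

# Hypothetical input: 250 periods of returns for 10 assets (names are made up).
rng = np.random.default_rng(42)
returns = pd.DataFrame(
    rng.normal(0.0, 0.01, size=(250, 10)),
    columns=[f"ASSET_{i}" for i in range(10)],
)

subset, subset_df, corr_avg, pdi = minimum_spanning_tree(returns)
print(subset)          # assets sitting on the leaves (degree-one nodes) of the MST
print(corr_avg, pdi)   # average Spearman correlation and diversification index of the subset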