Source code for stream.models.cbc

from datetime import datetime

import community as community_louvain
import networkx as nx
import numpy as np
import pandas as pd
from loguru import logger
from sklearn.preprocessing import OneHotEncoder

from ..preprocessor import c_tf_idf, extract_tfidf_topics
from ..utils.cbc_utils import (DocumentCoherence,
                               get_top_tfidf_words_per_document)
from ..utils.check_dataset_steps import check_dataset_steps
from ..utils.dataset import TMDataset
from .abstract_helper_models.base import BaseModel, TrainingStatus

# Model identifier: reported by ``CBC.get_info``, passed to
# ``check_dataset_steps`` and used in the training log message.
MODEL_NAME = "CBC"
# Timestamp taken once at import time. In the code visible here it is only
# referenced by the disabled file-logging sink on the next line.
time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
# Disabled: would add a per-run log file named after the model and timestamp.
# logger.add(f"{MODEL_NAME}_{time}.log", backtrace=True, diagnose=True)


class CBC(BaseModel):
    """
    Coherence-based clustering (CBC) topic model.

    Documents are linked in a weighted graph whose edge weights are pairwise
    coherence scores, the graph is partitioned with the Louvain algorithm and
    the documents of every community are merged into one. This is repeated on
    the merged documents until the number of clusters is acceptable. Topics
    are finally extracted with ``c_tf_idf`` / ``extract_tfidf_topics``.
    """

    def __init__(self):
        """
        Initialize an untrained CBC model.

        The training status starts as ``NOT_STARTED`` and the number of
        topics stays ``None`` until :meth:`fit` has run.
        """
        # NOTE(review): BaseModel.__init__ is not invoked here; confirm that
        # the base class does not require initialisation.
        self._status = TrainingStatus.NOT_STARTED
        self.n_topics = None

    def get_info(self):
        """
        Get information about the model.

        Returns
        -------
        dict
            Dictionary with the model name, the number of topics
            (``None`` before training) and the name of the training status.
        """
        info = {
            "model_name": MODEL_NAME,
            "num_topics": self.n_topics,
            "trained": self._status.name,
        }
        return info

    def _create_coherence_graph(self):
        """
        Build a weighted graph from ``self.coherence_scores``.

        Returns
        -------
        networkx.Graph
            Graph with an edge for every (row label, column label) pair whose
            coherence score is strictly positive; the score is stored as the
            edge ``weight``. Documents without any positive score do not
            appear in the graph at all.
        """
        graph = nx.Graph()
        scores = self.coherence_scores
        for i in scores.index:
            for j in scores.columns:
                score = scores.at[i, j]
                # Only strictly positive coherence creates an edge; raise
                # this threshold to obtain a sparser graph.
                if score > 0:
                    graph.add_edge(i, j, weight=score)
        return graph

    def cluster_documents(self):
        """
        Cluster documents based on ``self.coherence_scores``.

        Returns
        -------
        dict
            A dictionary mapping cluster labels to lists of document indices
            (the node labels of the coherence graph).

        Raises
        ------
        RuntimeError
            If building the graph or partitioning it fails; the original
            exception is chained as the cause.
        """
        try:
            logger.info("--- Creating document cluster ---")
            graph = self._create_coherence_graph()
            partition = community_louvain.best_partition(graph, weight="weight")
            clusters = {}
            for node, cluster_id in partition.items():
                clusters.setdefault(cluster_id, []).append(node)
        except Exception as e:
            raise RuntimeError(f"Error in clustering: {e}") from e

        return clusters

    def combine_documents(self, documents, clusters):
        """
        Combine the documents of each cluster into a single text.

        Parameters
        ----------
        documents : DataFrame
            DataFrame of documents with a ``text`` column. Rows are looked up
            by position (``iloc``).
        clusters : dict
            Dictionary mapping cluster ids to lists of row positions.

        Returns
        -------
        DataFrame
            One row per cluster with the columns ``cluster_id`` and
            ``combined_text`` (member texts joined by a single space).
        """
        combined_docs = []
        for cluster_id, doc_indices in clusters.items():
            combined_text = " ".join(
                documents.iloc[idx]["text"] for idx in doc_indices
            )
            combined_docs.append(
                {"cluster_id": cluster_id, "combined_text": combined_text}
            )
        return pd.DataFrame(combined_docs)

    def prepare_data(
        self,
        dataset,
    ):
        """
        Prepare the dataset for clustering.

        Stores ``dataset.dataframe`` on the model (the same object, not a
        copy) and adds a ``tfidf_top_words`` column computed from its
        ``text`` column.

        Parameters
        ----------
        dataset : TMDataset
            Dataset containing the documents.
        """
        self.dataframe = dataset.dataframe
        self.dataframe["tfidf_top_words"] = get_top_tfidf_words_per_document(
            self.dataframe["text"]
        )

    def fit(
        self,
        dataset: TMDataset = None,
        max_topics: int = 20,
        max_iterations: int = 20,
    ):
        """
        Cluster documents on coherence scores until the number of clusters
        is within the accepted range, then extract topics.

        Each pass computes document coherence, partitions the coherence
        graph and merges the documents of every cluster. The loop stops when
        between 2 and ``max_topics`` clusters were formed, when fewer than 2
        clusters were formed, or after ``max_iterations`` passes. Documents
        that never receive a cluster are assigned to topic 0 and every other
        label is shifted up by one; their positions are stored in
        ``self.dropped_indices``.

        Parameters
        ----------
        dataset : TMDataset, optional
            Dataset containing the documents.
        max_topics : int, optional
            Maximum acceptable number of clusters.
        max_iterations : int, optional
            Maximum number of clustering passes.

        Raises
        ------
        AssertionError
            If the dataset is not an instance of TMDataset.
        """
        self.max_topics = max_topics

        # Raised explicitly instead of via ``assert`` so the check is not
        # stripped when Python runs with ``-O``.
        if not isinstance(dataset, TMDataset):
            raise AssertionError("The dataset must be an instance of TMDataset.")

        check_dataset_steps(dataset, logger, MODEL_NAME)
        self.prepare_data(
            dataset,
        )

        iteration = 0
        current_documents = self.dataframe.copy()
        # document_indices[k] is the flat list of original row positions that
        # have been merged into the k-th document of ``current_documents``.
        document_indices = [[i] for i in range(len(current_documents))]
        self._status = TrainingStatus.INITIALIZED

        try:
            logger.info(f"--- Training {MODEL_NAME} topic model ---")
            self._status = TrainingStatus.RUNNING
            while True:
                logger.info(f"Iteration: {iteration}")

                # Coherence scores for the current set of documents.
                self.coherence_scores = DocumentCoherence(
                    current_documents, column="tfidf_top_words"
                ).calculate_document_coherence()

                clusters = self.cluster_documents()
                num_clusters = len(clusters)
                logger.info(
                    f"Iteration {iteration}: {num_clusters} clusters formed."
                )

                # Merge every cluster into one document. The merged text is
                # exposed under "text" so the next pass can treat the result
                # like the original documents.
                combined_documents = self.combine_documents(
                    current_documents, clusters
                )
                current_documents = combined_documents.rename(
                    columns={"combined_text": "text"}
                )
                iteration += 1

                # Keep the mapping flat: one list of original positions per
                # merged document, in the same order as the merged rows.
                document_indices = [
                    [
                        original
                        for idx in cluster_members
                        for original in document_indices[idx]
                    ]
                    for cluster_members in clusters.values()
                ]

                if 2 <= num_clusters <= self.max_topics:
                    break
                if num_clusters < 2:
                    logger.warning(
                        "Too few clusters formed. "
                        "Consider changing parameters or input data."
                    )
                    break
                # Stop after ``max_iterations`` passes to prevent an
                # infinite loop.
                if iteration >= max_iterations:
                    logger.warning(
                        "Maximum iterations reached. Stopping clustering process."
                    )
                    break

                # Another pass follows: the merged documents need their own
                # top TF-IDF words, which the coherence computation reads.
                current_documents["tfidf_top_words"] = (
                    get_top_tfidf_words_per_document(current_documents["text"])
                )

        except Exception as e:
            logger.error(f"Error in training: {e}")
            self._status = TrainingStatus.FAILED
            raise
        except KeyboardInterrupt:
            logger.error("Training interrupted.")
            self._status = TrainingStatus.INTERRUPTED
            raise

        labels = {
            index: cluster_label
            for cluster_label, members in enumerate(document_indices)
            for index in members
        }

        self.dataframe["predictions"] = self.dataframe.index.map(labels)
        self.labels = np.array(self.dataframe["predictions"])

        # Documents that never entered the coherence graph have no label.
        # ``pd.isna`` is used because it also handles object-dtype arrays.
        unassigned = pd.isna(self.labels)
        if unassigned.any():
            self.dropped_indices = np.where(unassigned)[0]
            # Missing labels become -1, then everything is shifted by one so
            # that topic 0 collects the unassigned documents. Casting to int
            # keeps the label type identical to the case without gaps.
            self.labels = np.where(unassigned, -1, self.labels).astype(int) + 1
            self.dataframe["predictions"] = self.labels
            logger.info("--- replaced NaN values with 0 in topics ---")
            logger.info(
                "--- indices of original NaN values stored in self.dropped_indices ---"
            )

        # Counted after the NaN handling so the result does not depend on how
        # the installed NumPy version treats NaN in ``np.unique``.
        self.n_topics = len(np.unique(self.labels))

        docs_per_topic = self.dataframe.groupby(["predictions"], as_index=False).agg(
            {"text": " ".join}
        )
        logger.info("--- Extract topics ---")
        tfidf, count = c_tf_idf(
            docs_per_topic["text"].values, m=len(self.dataframe)
        )
        self.topic_dict = extract_tfidf_topics(tfidf, count, docs_per_topic, n=10)

        # ``sparse`` was renamed to ``sparse_output`` in scikit-learn 1.2 and
        # removed in 1.4; fall back to the old keyword on older releases.
        try:
            one_hot_encoder = OneHotEncoder(sparse_output=False)
        except TypeError:
            one_hot_encoder = OneHotEncoder(sparse=False)
        predictions_one_hot = one_hot_encoder.fit_transform(
            self.dataframe[["predictions"]]
        )

        self.beta = tfidf
        self.theta = predictions_one_hot
        logger.info("--- Training completed successfully. ---")
        self._status = TrainingStatus.SUCCEEDED

    def predict(self, texts):
        """
        Predict topics for new documents.

        Parameters
        ----------
        texts : list of str
            List of texts to predict topics for.

        Returns
        -------
        list of int
            List of predicted topic labels.

        Raises
        ------
        RuntimeError
            If the model has not been trained successfully.
        NotImplementedError
            If the attributes needed for prediction are missing. Nothing in
            this class sets ``embedding_model_name``, ``reducer`` or
            ``clustering_model``, so prediction only works when they are
            provided from elsewhere (for example by a subclass).
        """
        if self._status != TrainingStatus.SUCCEEDED:
            raise RuntimeError("Model has not been trained yet or failed.")

        # Fail with a clear message instead of an opaque AttributeError.
        required = (
            "encode_documents",
            "embedding_model_name",
            "reducer",
            "clustering_model",
        )
        missing = [name for name in required if not hasattr(self, name)]
        if missing:
            raise NotImplementedError(
                f"{MODEL_NAME} cannot predict for new documents; "
                f"missing attributes: {', '.join(missing)}"
            )

        embeddings = self.encode_documents(
            texts, encoder_model=self.embedding_model_name, use_average=True
        )
        reduced_embeddings = self.reducer.transform(embeddings)
        labels = self.clustering_model.predict(reduced_embeddings)
        return labels