Source code for stream.utils.cbc_utils
import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix
from sklearn.feature_extraction.text import TfidfVectorizer
from tqdm import tqdm
def get_top_tfidf_words_per_document(corpus, n=10):
    """
    Get the top TF-IDF words per document in a corpus.

    A TF-IDF model with English stop-word removal is fitted on ``corpus``.
    For every document the terms with the highest weights are returned in
    descending order of weight. Only terms that are actually stored with a
    positive weight for a document are ranked, so a document with fewer than
    ``n`` such terms yields a shorter list instead of being padded with
    zero-weight vocabulary words that never occur in it.

    Args:
        corpus (list): List of documents.
        n (int, optional): Maximum number of top words to retrieve per
            document (default is 10).

    Returns:
        list: A list of lists containing up to ``n`` top TF-IDF words for
            each document in the corpus, highest weight first.
    """
    vectorizer = TfidfVectorizer(stop_words="english")
    tfidf = vectorizer.fit_transform(corpus).tocsr()
    feature_names = vectorizer.get_feature_names_out()

    top_words_per_document = []
    # Walk the CSR row boundaries directly: each row's stored entries are the
    # terms present in that document, so nothing has to be densified.
    for start, stop in zip(tfidf.indptr[:-1], tfidf.indptr[1:]):
        term_ids = tfidf.indices[start:stop]
        weights = tfidf.data[start:stop]

        # Guard against explicitly stored zeros so that only terms carrying
        # real weight can be reported as "top" words.
        present = weights > 0
        term_ids = term_ids[present]
        weights = weights[present]

        ranked = np.argsort(weights)[::-1][:n]
        top_words_per_document.append(
            [feature_names[i] for i in term_ids[ranked]]
        )
    return top_words_per_document
class DocumentCoherence:
    """
    Score how coherent pairs of documents are, judged by their top words.

    Every document is reduced to the set of its top words (minus stopwords).
    Word/word association is measured with Normalized Pointwise Mutual
    Information (NPMI) computed from document-level co-occurrence counts, and
    two documents are scored by averaging the NPMI values of the words they
    have in common.

    Attributes:
        documents (DataFrame): DataFrame containing documents and their top words.
        column (str): Name of the DataFrame column holding the top words of each document.
        stopwords (set): Words that are ignored throughout the analysis.
        word_index (dict): Maps every remaining unique word to a column index.
        doc_word_matrix (csr_matrix): Sparse document x word occurrence matrix
            (entry is 1 when the word is among the document's top words).
    """

    def __init__(self, documents, column="tfidf_top_words", stopwords=None):
        """
        Store the inputs and build the word index and document-word matrix.

        Parameters:
            documents (DataFrame): DataFrame containing documents and their top words.
            column (str): Name of the column that holds the top words of each document.
            stopwords (list, optional): Words to exclude from the analysis.
        """
        self.documents = documents
        self.column = column
        # Any falsy value (None, empty list, ...) means "no stopwords".
        self.stopwords = set(stopwords or ())
        self.word_index = self._create_word_index()
        self.doc_word_matrix = self._create_doc_word_matrix()

    def _create_word_index(self):
        """Return a dict mapping each unique non-stopword to a column index."""
        vocabulary = set().union(*self.documents[self.column])
        vocabulary.difference_update(self.stopwords)
        return {word: position for position, word in enumerate(vocabulary)}

    def _create_doc_word_matrix(self):
        """Build the sparse binary document x word occurrence matrix."""
        print("--- create doc-word-matrix ---")
        lookup = self.word_index
        row_ids = []
        col_ids = []
        for doc_pos, words in enumerate(self.documents[self.column]):
            # De-duplicate per document so every (document, word) cell is
            # written at most once.
            kept = [
                lookup[word]
                for word in set(words) - self.stopwords
                if word in lookup
            ]
            row_ids.extend([doc_pos] * len(kept))
            col_ids.extend(kept)
        values = [1] * len(row_ids)
        matrix_shape = (len(self.documents), len(lookup))
        return csr_matrix((values, (row_ids, col_ids)), shape=matrix_shape)

    def _calculate_co_occurrences(self):
        """Return the word x word matrix counting documents that contain both words."""
        occurrences = self.doc_word_matrix
        return occurrences.T.dot(occurrences)

    def _calculate_npmi(self, co_occurrences, n_documents):
        """
        Compute the dense word x word NPMI matrix.

        Parameters:
            co_occurrences (sparse matrix): Word x word document co-occurrence counts.
            n_documents (int): Number of documents used to turn counts into probabilities.

        Returns:
            numpy.ndarray: NPMI value for every pair of words.
        """
        eps = 1e-12  # keeps the logarithms and the division finite
        marginal = np.asarray(
            self.doc_word_matrix.sum(axis=0) / n_documents
        ).ravel()
        # The element-wise operations below need a dense array.
        joint = co_occurrences.toarray() / n_documents
        smoothed_joint = joint + eps
        expected = np.outer(marginal, marginal) + eps
        pmi = np.log(smoothed_joint / expected)
        return pmi / -np.log(smoothed_joint)

    def calculate_document_coherence(self):
        """
        Calculate document coherence scores based on Normalized Pointwise Mutual Information (NPMI).

        The score of a pair of documents is the mean of the NPMI values over
        all pairs of words the two documents share (each shared word paired
        with itself included). Pairs without any shared word, and the
        diagonal, are left as NaN.

        Returns:
            pd.DataFrame: A DataFrame containing coherence scores between each
                pair of documents, with the documents' index as both row and
                column labels.
        """
        n_documents = self.doc_word_matrix.shape[0]
        npmi_matrix = self._calculate_npmi(
            self._calculate_co_occurrences(), n_documents
        )

        # Word-column ids present in each document, computed once up front.
        words_by_doc = [
            set(self.doc_word_matrix.getrow(doc).nonzero()[1])
            for doc in range(n_documents)
        ]

        scores = np.full((n_documents, n_documents), np.nan)
        for i in tqdm(range(n_documents)):
            # Only the upper triangle is evaluated; the result is mirrored.
            for j in range(i + 1, n_documents):
                shared = words_by_doc[i] & words_by_doc[j]
                if not shared:
                    continue
                shared_ids = list(shared)
                shared_rows = npmi_matrix[shared_ids]
                pair_score = np.nanmean(shared_rows[:, shared_ids])
                scores[i, j] = pair_score
                scores[j, i] = pair_score

        return pd.DataFrame(
            scores, index=self.documents.index, columns=self.documents.index
        )