Source code for stream.utils.dataset

import json
import os
import pickle
import re

import numpy as np
import pandas as pd
from torch.utils.data import DataLoader, Dataset, random_split

from ..preprocessor import TextPreprocessor


[docs]class TMDataset(Dataset): def __init__(self, name=None, language="en"): """ Initialize the TMDataset. Parameters ---------- name : str, optional Name of the dataset. """ super().__init__() self.dataset_registry = [ "20NewsGroup", "M10", "Spotify", "Spotify_most_popular", "Poliblogs", "Reuters", "BBC_News", "DBLP", "DBPedia_IT", "Europarl_IT", ] self.name = name self.dataframe = None self.embeddings = None self.texts = None self.labels = None self.language = language self.preprocessing_steps = self.default_preprocessing_steps() def default_preprocessing_steps(self): return { "remove_stopwords": False, "lowercase": True, "remove_punctuation": False, "remove_numbers": False, "lemmatize": False, "stem": False, "expand_contractions": True, "remove_html_tags": True, "remove_special_chars": True, "remove_accents": False, "custom_stopwords": set(), "detokenize": True, }
[docs] def load_model_preprocessing_steps(self, model_type, filepath=None): """ Load the default preprocessing steps from a JSON file. Parameters ---------- filepath : str The path to the JSON file containing the default preprocessing steps. Returns ------- dict The default preprocessing steps. """ if filepath is None: # Determine the absolute path based on the current file's location current_dir = os.path.dirname(__file__) filepath = os.path.join( current_dir, "..", "preprocessor", "default_preprocessing_steps.json" ) filepath = os.path.abspath(filepath) with open(filepath, "r") as file: all_steps = json.load(file) return all_steps.get(model_type, {})
[docs] def fetch_dataset(self, name, dataset_path=None): """ Fetch a dataset by name. Parameters ---------- name : str Name of the dataset to fetch. dataset_path : str, optional Path to the dataset directory. """ self.name = name if dataset_path is None: dataset_path = self.get_package_dataset_path(name) if os.path.exists(dataset_path): self.load_custom_dataset_from_folder(dataset_path) else: raise ValueError(f"Dataset path {dataset_path} does not exist.") self._load_data_to_dataframe() self.info = self.get_info(dataset_path)
def _load_data_to_dataframe(self): """ Load data into a pandas DataFrame. """ self.dataframe = pd.DataFrame( { "tokens": self.get_corpus(), "labels": self.get_labels(), } ) self.dataframe["text"] = [" ".join(words) for words in self.dataframe["tokens"]] self.texts = self.dataframe["text"].tolist() self.labels = self.dataframe["labels"].tolist()
[docs] def get_package_dataset_path(self, name): """ Get the path to the package dataset. Parameters ---------- name : str Name of the dataset. Returns ------- str Path to the dataset. """ script_dir = os.path.dirname(os.path.abspath(__file__)) my_package_dir = os.path.dirname(script_dir) dataset_path = os.path.join( my_package_dir, "preprocessed_datasets", name) return dataset_path
[docs] def has_embeddings(self, embedding_model_name, path=None, file_name=None): """ Check if embeddings are available for the dataset. Parameters ---------- embedding_model_name : str Name of the embedding model used. path : str, optional Path where embeddings are expected to be saved. file_name : str, optional File name for the embeddings. Returns ------- bool True if embeddings are available, False otherwise. """ if path is None: path = self.get_package_embeddings_path(self.name) embeddings_file = ( os.path.join(path, file_name) if file_name else os.path.join( path, f"{self.name}_embeddings_{embedding_model_name}.pkl" ) ) return os.path.exists(embeddings_file)
[docs] def save_embeddings( self, embeddings, embedding_model_name, path=None, file_name=None ): """ Save embeddings for the dataset. Parameters ---------- embeddings : np.ndarray Embeddings to save. embedding_model_name : str Name of the embedding model used. path : str, optional Path to save the embeddings. file_name : str, optional File name for the embeddings. """ if path is None: path = self.get_package_embeddings_path(self.name) embeddings_file = ( os.path.join(path, file_name) if file_name else os.path.join( path, f"{self.name}_embeddings_{embedding_model_name}.pkl" ) ) with open(embeddings_file, "wb") as file: pickle.dump(embeddings, file)
[docs] def get_embeddings(self, embedding_model_name, path=None, file_name=None): """ Get embeddings for the dataset. Parameters ---------- embedding_model_name : str Name of the embedding model to use. path : str, optional Path to save the embeddings. file_name : str, optional File name for the embeddings. Returns ------- np.ndarray Embeddings for the dataset. """ if not self.has_embeddings(embedding_model_name, path, file_name): raise ValueError( "Embeddings are not available. Run the encoding process first or load embeddings." ) # print("--- Loading pre-computed document embeddings ---") if self.embeddings is None: if path is None: path = self.get_package_embeddings_path(self.name) embeddings_file = ( os.path.join(path, file_name) if file_name else os.path.join( path, f"{self.name}_embeddings_{embedding_model_name}.pkl" ) ) with open(embeddings_file, "rb") as file: self.embeddings = pickle.load(file) return self.embeddings
[docs] def get_package_embeddings_path(self, name): """ Get the path to the package embeddings. Parameters ---------- name : str Name of the dataset. Returns ------- str Path to the embeddings. """ script_dir = os.path.dirname(os.path.abspath(__file__)) my_package_dir = os.path.dirname(script_dir) dataset_path = os.path.join( my_package_dir, "pre_embedded_datasets", name) return dataset_path
[docs] def create_load_save_dataset( self, data, dataset_name, save_dir, doc_column=None, label_column=None, **kwargs, ): """ Create, load, and save a dataset. Parameters ---------- data : pd.DataFrame or list The data to create the dataset from. dataset_name : str Name of the dataset. save_dir : str Directory to save the dataset. doc_column : str, optional Column name for documents if data is a DataFrame. label_column : str, optional Column name for labels if data is a DataFrame. **kwargs : dict Additional columns and their values to include in the dataset. Returns ------- Preprocessing The preprocessed dataset. """ if isinstance(data, pd.DataFrame): if doc_column is None: raise ValueError( "doc_column must be specified for DataFrame input") documents = [ self.clean_text(str(row[doc_column])) for _, row in data.iterrows() ] labels = ( data[label_column].tolist() if label_column else [ None] * len(documents) ) elif isinstance(data, list): documents = [self.clean_text(doc) for doc in data] labels = [None] * len(documents) else: raise TypeError( "data must be a pandas DataFrame or a list of documents") # Initialize preprocessor with kwargs preprocessor = TextPreprocessor(**kwargs) preprocessed_documents = preprocessor.preprocess_documents(documents) self.texts = preprocessed_documents self.labels = labels # Add additional columns from kwargs to the DataFrame additional_columns = { key: value for key, value in kwargs.items() if key != "preprocessor" } additional_columns.update({"text": self.texts, "labels": self.labels}) self.dataframe = pd.DataFrame(additional_columns) # Save the dataset to Parquet format parquet_path = os.path.join(save_dir, f"{dataset_name}.parquet") self.dataframe.to_parquet(parquet_path) # Save dataset information dataset_info = { "name": dataset_name, "language": self.language, "preprocessing_steps": { k: v for k, v in preprocessor.__dict__.items() if k not in ["stop_words", "language", "contractions_dict"] }, } info_path = os.path.join(save_dir, f"{dataset_name}_info.pkl") with open(info_path, "wb") as info_file: pickle.dump(dataset_info, info_file) return preprocessor
[docs] def preprocess(self, model_type=None, custom_stopwords=None, **preprocessing_steps): """ Preprocess the dataset. Parameters ---------- language : str, optional The language to use for preprocessing (default is "english"). remove_stopwords : bool, optional Whether to remove stopwords (default is False). lowercase : bool, optional Whether to convert text to lowercase (default is True). remove_punctuation : bool, optional Whether to remove punctuation (default is True). remove_numbers : bool, optional Whether to remove numbers (default is True). lemmatize : bool, optional Whether to lemmatize words (default is False). stem : bool, optional Whether to stem words (default is False). expand_contractions : bool, optional Whether to expand contractions (default is False). remove_html_tags : bool, optional Whether to remove HTML tags (default is False). remove_special_chars : bool, optional Whether to remove special characters (default is False). remove_accents : bool, optional Whether to remove accents (default is False). custom_stopwords : list of str, optional List of custom stopwords to remove (default is an empty list). detokenize : bool, optional Whether to detokenize the text after processing (default is True). Returns ------- None This method modifies the object's texts and dataframe attributes in place. Notes ----- This function applies a series of preprocessing steps to the text data stored in the object's `texts` attribute. The preprocessed text is then stored back into the `texts` attribute and updated in the `dataframe["text"]` column. """ if model_type: preprocessing_steps = self.load_model_preprocessing_steps( model_type) previous_steps = self.preprocessing_steps # Filter out steps that have already been applied filtered_steps = { key: ( False if key in previous_steps and previous_steps[key] == value else value ) for key, value in preprocessing_steps.items() } if custom_stopwords: filtered_steps["remove_stopwords"] = True filtered_steps["custom_stopwords"] = list(set(custom_stopwords)) else: filtered_steps["custom_stopwords"] = [] # Only preprocess if there are steps that need to be applied if filtered_steps: try: preprocessor = TextPreprocessor( language=self.language, **preprocessing_steps, ) self.texts = preprocessor.preprocess_documents(self.texts) self.dataframe["text"] = self.texts self.dataframe["tokens"] = self.dataframe["text"].apply( lambda x: x.split() ) self.info.update( { "preprocessing_steps": { k: v for k, v in preprocessor.__dict__.items() if k != "stopwords" } } ) except Exception as e: raise RuntimeError( f"Error in dataset preprocessing: {e}") from e self.update_preprocessing_steps(**filtered_steps)
[docs] def update_preprocessing_steps(self, **preprocessing_steps): """ Update preprocessing steps to True if they were previously False. Parameters ---------- preprocessing_steps : dict Key-value pairs of preprocessing steps to update. """ for step, value in preprocessing_steps.items(): if ( value is True and step in self.preprocessing_steps and not self.preprocessing_steps[step] ): self.preprocessing_steps[step] = True elif value is True and step not in self.preprocessing_steps: self.preprocessing_steps[step] = True
[docs] def get_info(self, dataset_path=None): """ Load and return the dataset information. Parameters ---------- name : str Name of the dataset. save_dir : str Directory where the dataset is saved. Returns ------- dict Dictionary containing the dataset information. """ if dataset_path is None: dataset_path = self.get_package_dataset_path(self.name) elif os.path.exists(dataset_path): pass else: raise ValueError(f"Dataset path {dataset_path} does not exist.") info_path = os.path.join(dataset_path, f"{self.name}_info.pkl") if not os.path.exists(info_path): raise FileNotFoundError( f"Dataset info file {info_path} does not exist.") with open(info_path, "rb") as info_file: dataset_info = pickle.load(info_file) return dataset_info
[docs] @staticmethod def clean_text(text): """ Clean the input text. Parameters ---------- text : str Input text to clean. Returns ------- str Cleaned text. """ text = text.replace("\n", " ").replace("\r", " ").replace("\\", "") text = re.sub(r"[{}[\]-]", "", text) text = text.encode("utf-8", "replace").decode("utf-8") return text
def __len__(self): """ Get the number of samples in the dataset. Returns ------- int Number of samples. """ return len(self.texts) def __getitem__(self, idx): """ Get a sample by index. Parameters ---------- idx : int Index of the sample. Returns ------- dict Sample at the given index. """ item = {"text": self.texts[idx]} if self.labels[idx] is not None: item["label"] = self.labels[idx] if self.embeddings is not None: item["embedding"] = self.embeddings[idx] return item
[docs] def get_data_loader( self, batch_size=32, shuffle=True, num_workers=0, pin_memory=False ): """ Get a data loader for the dataset. Parameters ---------- batch_size : int, optional Number of samples per batch, by default 32. shuffle : bool, optional Whether to shuffle the data, by default True. num_workers : int, optional Number of subprocesses to use for data loading, by default 0. pin_memory : bool, optional If True, the data loader will copy tensors into CUDA pinned memory, by default False. Returns ------- DataLoader Data loader for the dataset. """ return DataLoader( self, batch_size=batch_size, shuffle=shuffle, num_workers=num_workers, pin_memory=pin_memory, )
[docs] def load_custom_dataset_from_folder(self, dataset_path): """ Load a custom dataset from a folder. Parameters ---------- dataset_path : str Path to the dataset folder. """ parquet_path = os.path.join(dataset_path, f"{self.name}.parquet") if os.path.exists(parquet_path): self.load_dataset_from_parquet(parquet_path) else: documents_path = os.path.join(dataset_path, "corpus.txt") labels_path = os.path.join(dataset_path, "labels.txt") with open(documents_path, encoding="utf-8") as f: documents = f.readlines() with open(labels_path, encoding="utf-8") as f: labels = f.readlines() self.dataframe = pd.DataFrame( { "text": [doc.strip() for doc in documents], "labels": [label.strip() for label in labels], } ) self.dataframe["tokens"] = self.dataframe["text"].apply( lambda x: x.split()) self.texts = self.dataframe["text"].tolist() self.labels = self.dataframe["labels"].tolist()
[docs] def get_corpus(self): """ Get the corpus (tokens) from the dataframe. Returns ------- list of list of str Corpus tokens. """ return self.dataframe["tokens"].tolist()
[docs] def get_vocabulary(self): """ Get the vocabulary from the dataframe. Returns ------- list of str Vocabulary. """ # Flatten the list of lists and convert to set for unique words all_tokens = [ token for sublist in self.dataframe["tokens"].tolist() for token in sublist ] return list(set(all_tokens))
[docs] def get_labels(self): """ Get the labels from the dataframe. Returns ------- list of str Labels. """ return self.dataframe["labels"].tolist()
[docs] def split_dataset(self, train_ratio=0.8, val_ratio=0.1, test_ratio=0.1, seed=None): """ Split the dataset into train, validation, and test sets. Parameters ---------- train_ratio : float, optional Ratio of the training set, by default 0.8. val_ratio : float, optional Ratio of the validation set, by default 0.1. test_ratio : float, optional Ratio of the test set, by default 0.1. seed : int, optional Random seed for shuffling, by default None. Returns ------- tuple of Dataset Train, validation, and test datasets. """ total_size = len(self) if train_ratio < 0 or val_ratio < 0 or test_ratio < 0: raise ValueError("Train, val and test ratios must be positive") if train_ratio == 0 and val_ratio == 0 and test_ratio == 0: raise ValueError("Train, val and test ratios cannot all be 0") if train_ratio > 1 or val_ratio > 1 or test_ratio > 1: raise ValueError( "Train, val and test ratios must be less than or equal to 1" ) if train_ratio + val_ratio + test_ratio != 1.0: raise ValueError("Train, validation and test ratios must sum to 1") train_size = int(train_ratio * total_size) val_size = int(val_ratio * total_size) test_size = total_size - train_size - val_size if seed is not None: np.random.seed(seed) train_dataset, val_dataset, test_dataset = random_split( self, [train_size, val_size, test_size] ) return train_dataset, val_dataset, test_dataset
[docs] def get_data_loaders( self, train_ratio=0.8, val_ratio=0.1, test_ratio=0.1, batch_size=32, shuffle=True, num_workers=0, pin_memory=False, seed=None, ): """ Get data loaders for train, validation, and test sets. Parameters ---------- train_ratio : float, optional Ratio of the training set, by default 0.8. val_ratio : float, optional Ratio of the validation set, by default 0.1. test_ratio : float, optional Ratio of the test set, by default 0.1. batch_size : int, optional Number of samples per batch, by default 32. shuffle : bool, optional Whether to shuffle the data, by default True. num_workers : int, optional Number of subprocesses to use for data loading, by default 0. pin_memory : bool, optional If True, the data loader will copy tensors into CUDA pinned memory, by default False. seed : int, optional Random seed for shuffling, by default None. Returns ------- tuple of DataLoader Data loaders for train, validation, and test sets. """ train_dataset, val_dataset, test_dataset = self.split_dataset( train_ratio, val_ratio, test_ratio, seed ) train_loader = DataLoader( train_dataset, batch_size=batch_size, shuffle=shuffle, num_workers=num_workers, pin_memory=pin_memory, ) val_loader = DataLoader( val_dataset, batch_size=batch_size, shuffle=shuffle, num_workers=num_workers, pin_memory=pin_memory, ) test_loader = DataLoader( test_dataset, batch_size=batch_size, shuffle=shuffle, num_workers=num_workers, pin_memory=pin_memory, ) return train_loader, val_loader, test_loader
def _save_to_parquet(self, save_dir, dataset_name): """ Save the dataset to a Parquet file. Parameters ---------- save_dir : str Directory to save the dataset. dataset_name : str Name of the dataset. """ save_path = os.path.join(save_dir, f"{dataset_name}.parquet") self.dataframe.to_parquet(save_path, index=False)
[docs] def load_dataset_from_parquet(self, load_path): """ Load a dataset from a Parquet file. Parameters ---------- load_path : str Path to the Parquet file. """ if not os.path.exists(load_path): raise FileNotFoundError(f"File {load_path} does not exist.") self.dataframe = pd.read_parquet(load_path) self.dataframe["tokens"] = self.dataframe["text"].apply( lambda x: x.split()) self.texts = self.dataframe["text"].tolist() self.labels = self.dataframe["labels"].tolist()