Add data processing and sampling for fake news dataset

2025-03-26 17:49:39 +02:00
parent 97466edeae
commit 1dc796b59e
7 changed files with 737 additions and 187 deletions

View File

@@ -1,27 +1,205 @@
import random
import numpy as np
import pandas as pd
import spacy
import nltk
import matplotlib.pyplot as plt
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
from collections import Counter
from pandarallel import pandarallel
import multiprocessing
import os
import subprocess
import pyarrow.parquet as pq
import pyarrow as pa
from datetime import datetime
data_path = "./FNC/news_cleaned_2018_02_13.csv"
sample_path = "sampled_news"
SAMPLE_FRACTION = 0.001 # Use 0.001 for 0.1% of the dataset
# Print log messages with timestamp
def print_log(msg):
    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {msg}")
if not os.path.exists(data_path):
    print(f"❌ Error: File not found at {data_path}")
# Download NLTK stopwords
nltk.download('stopwords')
# Load spaCy model
print_log("📚 Loading spaCy model...")
try:
    nlp = spacy.load("en_core_web_sm", disable=["parser", "ner"])
except OSError:
    print_log("⬇️ Model not found. Downloading...")
    subprocess.run(["python", "-m", "spacy", "download", "en_core_web_sm"])
    nlp = spacy.load("en_core_web_sm")
print_log("📖 spaCy model loaded.")
# Paths
csv_path = "../data/news_cleaned_2018_02_13.csv"
parquet_path = "../data/news_cleaned_2018_02_13.parquet"
output_parquet = "../data/processed_fakenews.parquet"
output_csv = "../data/processed_fakenews.csv"
# Convert CSV to Parquet if needed
if os.path.exists(parquet_path):
    data_path = parquet_path
elif os.path.exists(csv_path):
    print_log("🔄 Converting CSV to Parquet...")
    chunksize = 100_000
    pqwriter = None
    for i, df in enumerate(pd.read_csv(csv_path, lineterminator="\n", on_bad_lines="skip", chunksize=chunksize, usecols=["id", "content", "type"])):
        table = pa.Table.from_pandas(df)
        # If it's the first chunk, create a new parquet writer
        if i == 0:
            pqwriter = pq.ParquetWriter(parquet_path, table.schema)
        pqwriter.write_table(table)
    if pqwriter:
        pqwriter.close()
    print_log("✅ Conversion complete.")
    data_path = parquet_path
else:
    print_log("❌ Error: No dataset found.")
    exit()
# Get the total row count from the Parquet metadata (data_path always points to the Parquet file at this point)
total_rows = pq.ParquetFile(data_path).metadata.num_rows
print(f"🔍 Dataset contains {total_rows:,} rows.")
# Stopwords & Stemmer
stop_words = set(stopwords.words("english"))
stemmer = PorterStemmer()
sample_size = int(total_rows * SAMPLE_FRACTION)
print(f"📉 Reducing dataset to {sample_size:,} rows...")
# Initialize parallel processing
# !WARNING: This will use all available CPU cores, might kill host machine
# Set progress_bar=True to see a progress bar
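# Note: each worker receives a slice of the Series being processed, so very large
# batches combined with many workers can exhaust memory; lowering nb_workers
# (e.g. to cpu_count() // 2) is a reasonable fallback on constrained machines.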
pandarallel.initialize(nb_workers=max(1, int(multiprocessing.cpu_count())), progress_bar=False)
# Draw a random sample directly from the Parquet file, batch by batch, to keep memory use bounded
print_log("🎲 Sampling rows...")
sampled_parts = [
    batch.to_pandas().sample(frac=SAMPLE_FRACTION)
    for batch in pq.ParquetFile(data_path).iter_batches(batch_size=100_000)
]
df_sample = pd.concat(sampled_parts, ignore_index=True)
df_sample.to_csv(f"{sample_path}.csv", index=False)
df_sample.to_parquet(f"{sample_path}.parquet", index=False)
print_log(f"✅ Sample saved to '{sample_path}.csv' and '{sample_path}.parquet'.")
batch_size = 100000
parquet_file = pq.ParquetFile(data_path)
processed_chunks = []
vocab_before = Counter()
vocab_after_stopwords = Counter()
vocab_after_stemming = Counter()
total_words_before = 0
total_words_after_stopwords = 0
total_words_after_stemming = 0
total_chars_after_stopwords = 0
total_chars_after_stemming = 0
# Process text in batches
print_log("🧮 Processing text in batches...")
batch_num = 0
for batch in parquet_file.iter_batches(batch_size):
    print_log(f"🔢 Processing batch {batch_num + 1}...")
    chunk = batch.to_pandas()
    chunk = chunk.dropna(subset=["content"]).astype({'content': 'string'})
    # Tokenize, remove stopwords, and apply stemming
    print_log("🪙 Tokenizing text...")
    chunk_tokens = chunk["content"].parallel_apply(lambda text: [word.lower() for word in text.split() if word.isalpha()])
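    # isalpha() drops tokens containing digits or punctuation (e.g. "2018" or "don't"),
    # so the word counts below cover purely alphabetic tokens only.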
    for tokens in chunk_tokens:
        vocab_before.update(tokens)
        total_words_before += len(tokens)
    print_log("🚫 Removing stopwords...")
    chunk_no_stopwords = chunk_tokens.parallel_apply(lambda tokens: [word for word in tokens if word not in stop_words])
    for tokens in chunk_no_stopwords:
        vocab_after_stopwords.update(tokens)
        total_words_after_stopwords += len(tokens)
        total_chars_after_stopwords += sum(len(word) for word in tokens)
    print_log("🌱 Applying stemming...")
    chunk_stemmed = chunk_no_stopwords.parallel_apply(lambda tokens: [stemmer.stem(word) for word in tokens])
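    # Porter stemming collapses inflected forms onto a common stem that is often not
    # a dictionary word (e.g. "studies" -> "studi"), which is what shrinks the vocabulary.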
    for tokens in chunk_stemmed:
        vocab_after_stemming.update(tokens)
        total_words_after_stemming += len(tokens)
        total_chars_after_stemming += sum(len(word) for word in tokens)
    # Join tokens back to text
    print_log("📝 Joining tokens back to text...")
    chunk["processed_text"] = chunk_stemmed.parallel_apply(lambda tokens: ' '.join(tokens))
    processed_chunks.append(chunk[["id", "processed_text", "type"]])
    batch_num += 1
# Save processed data
final_df = pd.concat(processed_chunks, ignore_index=True)
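# Note: concatenating every processed chunk keeps the whole processed dataset in
# memory before it is written; for the full corpus, writing each chunk with an
# incremental ParquetWriter (as in the conversion step above) would be lighter on RAM.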
final_df.to_parquet(output_parquet, index=False)
final_df.to_csv(output_csv, index=False)
print_log(f"💾 Processed data saved to '{output_parquet}' and '{output_csv}'")
# Print statistics
total_vocab_before = len(vocab_before)
total_vocab_after_stopwords = len(vocab_after_stopwords)
total_vocab_after_stemming = len(vocab_after_stemming)
total_stopword_reduction = (total_words_before - total_words_after_stopwords) / total_words_before * 100
print_log(f"📊 Total words (the raw number of all words in the text, including duplicates): {total_words_before:,}")
print(f"⏮️ Before stopword removal: {total_words_before:,}")
print(f"🔻 After stopword removal: {total_words_after_stopwords:,} (-{total_stopword_reduction:.2f}%)")
vocab_stemming_reduction = (total_vocab_after_stopwords - total_vocab_after_stemming) / total_vocab_after_stopwords * 100
print_log(f"🫆 Vocabulary (the number of distinct words in the text, ignoring duplicates):")
print(f"⏮️ Before stemming: {total_vocab_before:,}")
print(f"🔻 After stemming: {total_vocab_after_stemming:,} (-{vocab_stemming_reduction:.2f}%)")
avg_chars_after_stopwords = total_chars_after_stopwords / total_words_after_stopwords
avg_chars_after_stemming = total_chars_after_stemming / total_words_after_stemming
avg_chars_reduction = (avg_chars_after_stopwords - avg_chars_after_stemming) / avg_chars_after_stopwords * 100
print_log(f"📏 Avg. length of retained words:")
print(f"⏮️ After stopword removal: {avg_chars_after_stopwords:.2f}")
print(f"🔻 After stemming: {avg_chars_after_stemming:.2f} (-{avg_chars_reduction:.2f}%)")
# Get most frequent words before and after stopword removal & stemming
def get_most_frequent_words(vocab, top_n=10):
    return vocab.most_common(top_n)
top_words_before = get_most_frequent_words(vocab_before)
top_words_after_stopwords = get_most_frequent_words(vocab_after_stopwords)
top_words_after_stemming = get_most_frequent_words(vocab_after_stemming)
print_log("📌 Top 10 words:")
print("🔝 Before preprocessing:", top_words_before)
print("🔝 After stopword removal:", top_words_after_stopwords)
print("🔝 After stemming:", top_words_after_stemming)
def plot_word_frequencies(vocab_before, vocab_after_stopwords, vocab_after_stemming, top_n=10000):
    plt.figure(figsize=(12, 7))
    freq_before = [freq for _, freq in vocab_before.most_common(top_n)]
    freq_after_stopwords = [freq for _, freq in vocab_after_stopwords.most_common(top_n)]
    freq_after_stemming = [freq for _, freq in vocab_after_stemming.most_common(top_n)]
    plt.loglog(range(1, len(freq_before)+1), freq_before,
               label='Raw Text', color='royalblue', alpha=0.8, linewidth=2)
    plt.loglog(range(1, len(freq_after_stopwords)+1), freq_after_stopwords,
               label='After Stopword Removal', color='orange', alpha=0.8, linewidth=2)
    plt.loglog(range(1, len(freq_after_stemming)+1), freq_after_stemming,
               label='After Stemming', color='green', alpha=0.8, linewidth=2)
    # Add Zipf's law reference line
    zipf_x = np.array(range(1, top_n+1))
    zipf_y = freq_before[0] / zipf_x
    plt.plot(zipf_x, zipf_y, 'r--', label="Zipf's Law", alpha=0.5)
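    # Zipf's law predicts frequency ≈ C / rank; anchoring C at the most frequent
    # word's count gives the dashed reference line the empirical curves are compared against.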
    top_words = [word for word, _ in vocab_before.most_common(5)]
    for rank, word in enumerate(top_words, 1):
        freq = vocab_before[word]
        plt.annotate(word, xy=(rank, freq), xytext=(rank*1.5, freq*1.5),
                     arrowprops=dict(facecolor='black', shrink=0.05, width=1, headwidth=4),
                     fontsize=9, bbox=dict(boxstyle="round,pad=0.3", fc="white", ec="gray", lw=1))
    plt.title('Word Frequency Distribution (Log-Log Scale)', fontsize=14, pad=20)
    plt.xlabel('Word Rank (Log Scale)', fontsize=12)
    plt.ylabel('Frequency (Log Scale)', fontsize=12)
    plt.grid(True, which="both", ls="-", alpha=0.2)
    plt.legend(fontsize=11)
    plt.text(0.02, 0.02,
             "• Steep drop at left = Stopwords dominate\n"
             "• Flatter curve after processing = Better balance\n"
             "• Close to Zipf's line = Natural language pattern",
             transform=plt.gca().transAxes, fontsize=10,
             bbox=dict(boxstyle="round", fc="white", ec="gray", pad=0.4))
    plt.tight_layout()
    plt.show()
plot_word_frequencies(vocab_before, vocab_after_stopwords, vocab_after_stemming)

View File

@@ -1,182 +1,75 @@
import numpy as np
import random
import pandas as pd
import spacy
import nltk
import matplotlib.pyplot as plt
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
from collections import Counter
from pandarallel import pandarallel
import multiprocessing
import os
import subprocess
import pyarrow as pa
import pyarrow.parquet as pq
# Download NLTK stopwords
nltk.download('stopwords')
parquet_path = "../data/processed_fakenews.parquet"
csv_path = "../data/processed_fakenews.csv"
sample_path = "../data/sampled_fakenews"
SAMPLE_FRACTION = 0.1
RANDOM_SEED = 42 # For reproducibility
# Paths
csv_path = "sampled_news.csv"
parquet_path = "sampled_news_sm.parquet"
output_parquet = "processed_fakenews.parquet"
output_csv = "processed_fakenews.csv"
def get_sample_size(total_rows, log=False):
    sample_size = int(total_rows * SAMPLE_FRACTION)
    if log:
        print(f"📉 Reducing dataset from {total_rows:,} to {sample_size:,} rows...")
    return sample_size
# Convert CSV to Parquet if needed
def sample_dataframe(df, total_rows):
    sample_size = get_sample_size(total_rows=total_rows, log=True)
    return df.sample(n=sample_size, random_state=RANDOM_SEED)
# Try to load from Parquet first, fall back to CSV if not available
if os.path.exists(parquet_path):
    data_path = parquet_path
    print(f"🔍 Loading data from Parquet file at '{parquet_path}'")
    try:
        # Read metadata to get row count without loading entire file
        parquet_file = pq.ParquetFile(parquet_path)
        total_rows = parquet_file.metadata.num_rows
        print(f"🔍 Dataset contains {total_rows:,} rows.")
        # Read and sample the data
        df_sample = sample_dataframe(pd.read_parquet(parquet_path), total_rows)
    except Exception as e:
        print(f"❌ Error reading Parquet file: {e}")
        print("🔄 Falling back to CSV...")
        if not os.path.exists(csv_path):
            print(f"❌ Error: Neither Parquet nor CSV file found at {parquet_path} or {csv_path}")
            exit()
        # Get total rows from CSV (Unix-like systems only due to `wc`)
        total_rows = int(subprocess.check_output(["wc", "-l", csv_path]).split()[0]) - 1
        print(f"🔍 Dataset contains {total_rows:,} rows.")
        # Read and sample the data
        df_sample = sample_dataframe(
            pd.read_csv(csv_path, lineterminator="\n", on_bad_lines="skip"),
            total_rows
        )
elif os.path.exists(csv_path):
    print("🔄 Converting CSV to Parquet...")
    df = pd.read_csv(csv_path, lineterminator="\n", on_bad_lines="skip", usecols=["id", "content", "type"])
    df.to_parquet(parquet_path, index=False)
    print("✅ Conversion complete.")
    data_path = parquet_path
    print(f"🔍 Parquet file not found, loading from CSV at {csv_path}")
    # Get total rows from CSV (Unix-like systems only due to `wc`)
    total_rows = int(subprocess.check_output(["wc", "-l", csv_path]).split()[0]) - 1
    print(f"🔍 Dataset contains {total_rows:,} rows.")
    # Read and sample the data
    df_sample = sample_dataframe(
        pd.read_csv(csv_path, lineterminator="\n", on_bad_lines="skip"),
        total_rows
    )
else:
    print("❌ Error: No dataset found.")
    print(f"❌ Error: Neither Parquet nor CSV file found at {parquet_path} or {csv_path}")
    exit()
# Load spaCy model
print("📚 Loading spaCy model...")
try:
    nlp = spacy.load("en_core_web_sm", disable=["parser", "ner"])
except OSError:
    import subprocess
    print("⬇️ Model not found. Downloading...")
    subprocess.run(["python", "-m", "spacy", "download", "en_core_web_sm"])
    nlp = spacy.load("en_core_web_sm")
print("📖 spaCy model loaded.")
# Verify the sample size
print(f"✅ Sample contains {len(df_sample):,} rows (expected {get_sample_size(total_rows=total_rows):,} rows)")
# Stopwords & Stemmer
stop_words = set(stopwords.words("english"))
stemmer = PorterStemmer()
# Save the sample in both formats
df_sample.to_csv(f"{sample_path}.csv", index=False)
df_sample.to_parquet(f"{sample_path}.parquet", index=False)
# Initialize parallel processing
pandarallel.initialize(nb_workers=max(1, int(multiprocessing.cpu_count() / 2)), progress_bar=True)
batch_size = 100000
parquet_file = pq.ParquetFile(data_path)
processed_chunks = []
vocab_before = Counter()
vocab_after_stopwords = Counter()
vocab_after_stemming = Counter()
total_words_before = 0
total_words_after_stopwords = 0
total_words_after_stemming = 0
total_chars_after_stopwords = 0
total_chars_after_stemming = 0
print("🧮 Processing text in batches...")
batch_num = 0
for batch in parquet_file.iter_batches(batch_size):
    print(f"🔢 Processing batch {batch_num + 1}...")
    chunk = batch.to_pandas()
    chunk = chunk.dropna(subset=["content"]).astype({'content': 'string'})
    print("🪙 Tokenizing text...")
    chunk_tokens = chunk["content"].parallel_apply(lambda text: [word.lower() for word in text.split() if word.isalpha()])
    for tokens in chunk_tokens:
        vocab_before.update(tokens)
        total_words_before += len(tokens)
    print("🚫 Removing stopwords...")
    chunk_no_stopwords = chunk_tokens.parallel_apply(lambda tokens: [word for word in tokens if word not in stop_words])
    for tokens in chunk_no_stopwords:
        vocab_after_stopwords.update(tokens)
        total_words_after_stopwords += len(tokens)
        total_chars_after_stopwords += sum(len(word) for word in tokens)
    print("🌱 Applying stemming...")
    chunk_stemmed = chunk_no_stopwords.parallel_apply(lambda tokens: [stemmer.stem(word) for word in tokens])
    for tokens in chunk_stemmed:
        vocab_after_stemming.update(tokens)
        total_words_after_stemming += len(tokens)
        total_chars_after_stemming += sum(len(word) for word in tokens)
    print("📝 Joining tokens back to text...")
    chunk["processed_text"] = chunk_stemmed.parallel_apply(lambda tokens: ' '.join(tokens))
    processed_chunks.append(chunk[["id", "processed_text", "type"]])
    batch_num += 1
# Save processed data
final_df = pd.concat(processed_chunks, ignore_index=True)
final_df.to_parquet(output_parquet, index=False)
final_df.to_csv(output_csv, index=False)
print(f"💾 Processed data saved to '{output_parquet}' and '{output_csv}'")
total_vocab_before = len(vocab_before)
total_vocab_after_stopwords = len(vocab_after_stopwords)
total_vocab_after_stemming = len(vocab_after_stemming)
total_stopword_reduction = (total_words_before - total_words_after_stopwords) / total_words_before * 100
print(f"📊 Total words (the raw number of all words in the text, including duplicates): {total_words_before:,}")
print(f"⏮️ Before stopword removal: {total_words_before:,}")
print(f"🔻 After stopword removal: {total_words_after_stopwords:,} (-{total_stopword_reduction:.2f}%)")
vocab_stemming_reduction = (total_vocab_after_stopwords - total_vocab_after_stemming) / total_vocab_after_stopwords * 100
print(f"🫆 Vocabulary (the number of distinct words in the text, ignoring duplicates):")
print(f"⏮️ Before stemming: {total_vocab_before:,}")
print(f"🔻 After stemming: {total_vocab_after_stemming:,} (-{vocab_stemming_reduction:.2f}%)")
avg_chars_after_stopwords = total_chars_after_stopwords / total_words_after_stopwords
avg_chars_after_stemming = total_chars_after_stemming / total_words_after_stemming
avg_chars_reduction = (avg_chars_after_stopwords - avg_chars_after_stemming) / avg_chars_after_stopwords * 100
print(f"📏 Avg. length of retained words:")
print(f"⏮️ After stopword removal: {avg_chars_after_stopwords:.2f}")
print(f"🔻 After stemming: {avg_chars_after_stemming:.2f} (-{avg_chars_reduction:.2f}%)")
# Get most frequent words before and after stopword removal & stemming
def get_most_frequent_words(vocab, top_n=10):
    return vocab.most_common(top_n)
top_words_before = get_most_frequent_words(vocab_before)
top_words_after_stopwords = get_most_frequent_words(vocab_after_stopwords)
top_words_after_stemming = get_most_frequent_words(vocab_after_stemming)
print("📌 Top 10 words before preprocessing:", top_words_before)
print("📌 Top 10 words after stopword removal:", top_words_after_stopwords)
print("📌 Top 10 words after stemming:", top_words_after_stemming)
def plot_word_frequencies(vocab_before, vocab_after_stopwords, vocab_after_stemming, top_n=10000):
    plt.figure(figsize=(12, 7))
    freq_before = [freq for _, freq in vocab_before.most_common(top_n)]
    freq_after_stopwords = [freq for _, freq in vocab_after_stopwords.most_common(top_n)]
    freq_after_stemming = [freq for _, freq in vocab_after_stemming.most_common(top_n)]
    plt.loglog(range(1, len(freq_before)+1), freq_before,
               label='Raw Text', color='royalblue', alpha=0.8, linewidth=2)
    plt.loglog(range(1, len(freq_after_stopwords)+1), freq_after_stopwords,
               label='After Stopword Removal', color='orange', alpha=0.8, linewidth=2)
    plt.loglog(range(1, len(freq_after_stemming)+1), freq_after_stemming,
               label='After Stemming', color='green', alpha=0.8, linewidth=2)
    # Add Zipf's law reference line
    zipf_x = np.array(range(1, top_n+1))
    zipf_y = freq_before[0] / zipf_x
    plt.plot(zipf_x, zipf_y, 'r--', label="Zipf's Law", alpha=0.5)
    top_words = [word for word, _ in vocab_before.most_common(5)]
    for rank, word in enumerate(top_words, 1):
        freq = vocab_before[word]
        plt.annotate(word, xy=(rank, freq), xytext=(rank*1.5, freq*1.5),
                     arrowprops=dict(facecolor='black', shrink=0.05, width=1, headwidth=4),
                     fontsize=9, bbox=dict(boxstyle="round,pad=0.3", fc="white", ec="gray", lw=1))
    plt.title('Word Frequency Distribution (Log-Log Scale)', fontsize=14, pad=20)
    plt.xlabel('Word Rank (Log Scale)', fontsize=12)
    plt.ylabel('Frequency (Log Scale)', fontsize=12)
    plt.grid(True, which="both", ls="-", alpha=0.2)
    plt.legend(fontsize=11)
    plt.text(0.02, 0.02,
             "• Steep drop at left = Stopwords dominate\n"
             "• Flatter curve after processing = Better balance\n"
             "• Close to Zipf's line = Natural language pattern",
             transform=plt.gca().transAxes, fontsize=10,
             bbox=dict(boxstyle="round", fc="white", ec="gray", pad=0.4))
    plt.tight_layout()
    plt.show()
plot_word_frequencies(vocab_before, vocab_after_stopwords, vocab_after_stemming)
print(f"💾 Sample saved to '{sample_path}.csv' and '{sample_path}.parquet'.")

19
src/parquet_validator.py Normal file
View File

@@ -0,0 +1,19 @@
# Check whether a Parquet file can be opened and print some basic information about it.
import pyarrow.parquet as pq

def validate_parquet_file(file_path):
    try:
        parquet_file = pq.ParquetFile(file_path)
    except Exception as e:
        print(f"❌ The file '{file_path}' is not a valid Parquet file.")
        print(f"Error: {e}")
        return
    print(f"✅ The file '{file_path}' is a valid Parquet file.")
    print(f" - Column Names: {parquet_file.schema}")
    print(f" - File Metadata: {parquet_file.metadata}")
# Example usage:
validate_parquet_file("../data/processed_fakenews.parquet")
validate_parquet_file("../data/sampled_fakenews.parquet")