
MLOps Hate Speech Project Documentation

Documentation of core classes and functions used in this project.

Data Preparation

load_and_prepare_dataset

Loads, merges, splits, and saves the hate speech dataset.

Parameters:

seed (int): Random seed for shuffling the dataset. Default: 42.
save_path (Optional[str]): Path to save the processed dataset; if None, uses the default path under data/processed. Default: None.

Returns:

None

Source code in src/mlops_hatespeech/data.py
def load_and_prepare_dataset(seed: int = 42, save_path: Optional[str] = None) -> None:
    """
    Loads, merges, splits, and saves the hate speech dataset.

    Args:
        seed (int): Random seed for shuffling the dataset.
        save_path (Optional[str]): Path to save the processed dataset.
            If None, uses the default path under data/processed.

    Returns:
        None
    """
    if save_path is None:
        save_path = DEFAULT_SAVE_PATH
    else:
        save_path = Path(save_path)

    # Load original dataset
    ds = load_dataset("thefrankhsu/hate_speech_twitter")

    # Concatenate train and test into one dataset
    combined = concatenate_datasets([ds["train"], ds["test"]])

    # Shuffle for good measure
    combined = combined.shuffle(seed=seed)

    # Split into train (70%), val (15%), test (15%)
    n = len(combined)
    train_size = int(0.7 * n)
    val_size = int(0.15 * n)

    train_ds = combined.select(range(0, train_size))
    val_ds = combined.select(range(train_size, train_size + val_size))
    test_ds = combined.select(range(train_size + val_size, n))

    # Combine into DatasetDict and save
    full_dataset = DatasetDict(
        {
            "train": train_ds,
            "validation": val_ds,
            "test": test_ds,
        }
    )

    full_dataset.save_to_disk(str(save_path))
    print(f"Dataset saved to {save_path}")

Model Training

get_config

Get the configuration from Hydra.

Source code in src/mlops_hatespeech/train.py
33
34
35
36
def get_config(overrides: Optional[List[str]]) -> DictConfig:
    """Get the configuration from Hydra."""
    with initialize(config_path="../..", job_name="train_app", version_base="1.1"):
        return compose(config_name="config", overrides=overrides or [])
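
Hydra overrides use dotted key paths into the composed config. A minimal sketch, assuming the package is importable as mlops_hatespeech:

from mlops_hatespeech.train import get_config

cfg = get_config(["hyperparameters.lr=3e-5", "hyperparameters.epochs=2"])
print(cfg.hyperparameters.lr)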

train_model

Runs the full training pipeline: loads the processed dataset, tokenizes it, fine-tunes the classifier, logs an ROC curve and a model artifact to Weights & Biases, and returns the fitted Trainer.

Parameters:

cfg (DictConfig): Composed Hydra configuration object. required

Returns:

Trainer: The trained Hugging Face Trainer.

Source code in src/mlops_hatespeech/train.py
def train_model(cfg: DictConfig) -> Trainer:
    """
    Load configuration using Hydra with optional overrides.

    Args:
        overrides (Optional[List[str]]): List of override strings

    Returns:
        DictConfig: Composed configuration object.
    """
    logger.info(f"Loading dataset from: {cfg.data_path}")
    ds = load_from_disk(cfg.data_path)

    idx2lbl = {
        0: "non-hate",
        1: "hate",
    }
    lbl2idx = {v: k for k, v in idx2lbl.items()}

    tokenizer = AutoTokenizer.from_pretrained(MODEL_STR)

    def tokenize_seqs(examples):
        texts = examples["tweet"]
        return tokenizer(texts, truncation=True, max_length=512)

    def is_valid(example):
        text = example["tweet"]
        return isinstance(text, str) and len(text.strip()) > 0

    ds = ds.filter(is_valid)
    ds = ds.map(tokenize_seqs, batched=True)
    ds = ds.rename_column("label", "labels")

    model = AutoModelForSequenceClassification.from_pretrained(
        MODEL_STR,
        num_labels=len(lbl2idx),
        id2label=idx2lbl,
        label2id=lbl2idx,
    )

    def compute_metrics(eval_preds):
        logits, labels = eval_preds.predictions, eval_preds.label_ids
        pred_labels = np.argmax(logits, axis=-1)

        f1 = f1_score(y_true=labels, y_pred=pred_labels, average="weighted")
        acc = accuracy_score(y_true=labels, y_pred=pred_labels)

        return {"f1": f1, "accuracy": acc}

    training_args = TrainingArguments(
        output_dir="./logs/run1",
        per_device_train_batch_size=cfg.hyperparameters.per_device_train_batch_size,
        per_gpu_eval_batch_size=cfg.hyperparameters.per_gpu_eval_batch_size,
        gradient_accumulation_steps=cfg.hyperparameters.gradient_accumulation_steps,
        learning_rate=cfg.hyperparameters.lr,
        weight_decay=cfg.hyperparameters.wd,
        num_train_epochs=cfg.hyperparameters.epochs,
        logging_strategy=cfg.hyperparameters.logging_strategy,
        logging_steps=cfg.hyperparameters.logging_steps,
        save_strategy=cfg.hyperparameters.save_strategy,
        eval_strategy=cfg.hyperparameters.eval_strategy,
        eval_steps=cfg.hyperparameters.eval_steps,
        save_total_limit=cfg.hyperparameters.save_total_limit,
        seed=cfg.hyperparameters.seed,
        data_seed=cfg.hyperparameters.seed,
        dataloader_num_workers=cfg.hyperparameters.dataloader_num_workers,
        load_best_model_at_end=cfg.hyperparameters.load_best_model_at_end,
        report_to=cfg.hyperparameters.report_to,
        use_cpu=cfg.hyperparameters.use_cpu,
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        compute_metrics=compute_metrics,
        train_dataset=ds["train"],
        eval_dataset=ds["validation"],
        tokenizer=tokenizer,
    )

    trainer.train()

    preds_output = trainer.predict(ds["validation"])
    logits = preds_output.predictions
    labels = preds_output.label_ids

    # Convert logits to class-1 probabilities before plotting the ROC curve
    probs = torch.softmax(torch.from_numpy(logits), dim=-1).numpy()

    RocCurveDisplay.from_predictions(
        labels,
        probs[:, 1],
        name="ROC Curve",
    )

    wandb.log({"roc_curve": wandb.Image(plt.gcf())})
    plt.close()

    metrics = trainer.evaluate()

    torch.save(model.state_dict(), "model.pth")
    artifact = wandb.Artifact(
        name="mlops_hatespeech_model",
        type="model",
        description="A model trained to detect hate speech in tweets.",
        metadata=metrics,
    )
    artifact.add_file("model.pth")
    wandb.log_artifact(artifact)

    return trainer
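
A sketch of calling train_model directly; it assumes an active W&B run (the train command below normally sets this up) and a composed config whose data_path points at an existing processed dataset:

import wandb

from mlops_hatespeech.train import get_config, train_model

cfg = get_config(["hyperparameters.epochs=1"])
wandb.init(project="mlops_hatespeech", mode="offline")  # offline run; no credentials needed
trainer = train_model(cfg)
wandb.finish()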

train

Entry point for training. Optionally override key hyperparameters.

Parameters:

lr (Optional[float]): Learning rate. Default: None.
wd (Optional[float]): Weight decay. Default: None.
epochs (Optional[int]): Number of training epochs. Default: None.
seed (Optional[int]): Random seed. Default: None.

Source code in src/mlops_hatespeech/train.py
@app.command()
def train(
    lr: Optional[float] = None, wd: Optional[float] = None, epochs: Optional[int] = None, seed: Optional[int] = None
) -> None:
    """
    Entry point for training. Optionally override key hyperparameters.

    Args:
        lr (Optional[float]): Learning rate.
        wd (Optional[float]): Weight decay.
        epochs (Optional[int]): Number of training epochs.
        seed (Optional[int]): Random seed.
    """
    overrides = []
    if lr is not None:
        overrides.append(f"hyperparameters.lr={lr}")
    if wd is not None:
        overrides.append(f"hyperparameters.wd={wd}")
    if epochs is not None:
        overrides.append(f"hyperparameters.epochs={epochs}")
    if seed is not None:
        overrides.append(f"hyperparameters.seed={seed}")

    cfg = get_config(overrides)

    # wandb.login(key=os.environ["WANDB_API_KEY"], relogin=True)

    wandb.init(
        project="mlops_hatespeech",
        config={
            "learning rate": cfg.hyperparameters.lr,
            "weight decay": cfg.hyperparameters.wd,
            "epochs": cfg.hyperparameters.epochs,
            "model": MODEL_STR,
        },
    )

    profiler = cProfile.Profile()
    profiler.enable()

    # Run the actual training
    trainer = train_model(cfg)

    profiler.disable()
    s = io.StringIO()
    ps = pstats.Stats(profiler, stream=s).sort_stats("cumtime")
    ps.print_stats()

    with open("reports/logs/train_profile.txt", "w") as f:
        f.write(s.getvalue())

    logger.info("Training is done.")

    wandb.finish()
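
A usage sketch driving the Typer command in-process via typer.testing; argument names follow the signature above, and W&B credentials are assumed to be configured:

from typer.testing import CliRunner

from mlops_hatespeech.train import app

# Equivalent to the CLI call: train --lr 3e-5 --epochs 3
runner = CliRunner()
result = runner.invoke(app, ["--lr", "3e-5", "--epochs", "3"])
print(result.exit_code)  # 0 on success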

Model Evaluation

find_latest_checkpoint

Finds the latest checkpoint folder in the given directory based on the highest checkpoint number.

Parameters:

run_dir (str): Path to the directory containing checkpoint folders. Default: 'logs/run1'.

Returns:

str: Path to the latest checkpoint folder.

Raises:

FileNotFoundError: If no checkpoints are found.

Source code in src/mlops_hatespeech/evaluate.py
def find_latest_checkpoint(run_dir: str = "logs/run1") -> str:
    """
    Finds the latest checkpoint folder in the given directory based on the highest checkpoint number.

    Args:
        run_dir (str): Path to the directory containing checkpoint folders.

    Returns:
        str: Path to the latest checkpoint folder.

    Raises:
        FileNotFoundError: If no checkpoints are found.
    """
    checkpoints = []
    pattern = re.compile(r"^checkpoint-(\d+)$")
    for name in os.listdir(run_dir):
        match = pattern.match(name)
        if match:
            checkpoints.append((int(match.group(1)), name))

    if not checkpoints:
        raise FileNotFoundError(f"No training checkpoints found in {run_dir}.")

    # Pick highest
    latest_checkpoint = max(checkpoints, key=lambda x: x[0])[1]
    return os.path.join(run_dir, latest_checkpoint)
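
A quick usage sketch, assuming logs/run1 was populated by a previous training run:

from mlops_hatespeech.evaluate import find_latest_checkpoint

# With logs/run1/checkpoint-500 and logs/run1/checkpoint-1000 on disk:
path = find_latest_checkpoint("logs/run1")
print(path)  # logs/run1/checkpoint-1000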

compute_metrics

Computes accuracy and weighted F1 score from model predictions and true labels.

Parameters:

eval_preds (Any): An object with 'predictions' (logits) and 'label_ids' (true labels). required

Returns:

Dict[str, float]: Dictionary with 'f1' and 'accuracy' scores.

Source code in src/mlops_hatespeech/evaluate.py
def compute_metrics(eval_preds: Any) -> Dict[str, float]:
    """
    Computes accuracy and weighted F1 score from model predictions and true labels.

    Args:
        eval_preds: An object with 'predictions' (logits) and 'label_ids' (true labels).

    Returns:
        Dict[str, float]: Dictionary with 'f1' and 'accuracy' scores.
    """
    logits, labels = eval_preds.predictions, eval_preds.label_ids
    pred_labels = np.argmax(logits, axis=-1)
    f1 = f1_score(y_true=labels, y_pred=pred_labels, average="weighted")
    acc = accuracy_score(y_true=labels, y_pred=pred_labels)
    return {
        "f1": f1,
        "accuracy": acc,
    }
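
A small worked example with dummy logits, using the EvalPrediction container from transformers for illustration (assuming the package is importable as mlops_hatespeech):

import numpy as np
from transformers import EvalPrediction

from mlops_hatespeech.evaluate import compute_metrics

logits = np.array([[2.0, -1.0], [0.5, 1.5], [3.0, 0.0]])  # predicted classes: 0, 1, 0
labels = np.array([0, 1, 1])

metrics = compute_metrics(EvalPrediction(predictions=logits, label_ids=labels))
print(metrics)  # accuracy = 2/3; weighted F1 also ~0.67 here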

main

Main function: Loads the latest checkpoint, prepares the dataset, runs evaluation, and uploads results as a JSON file to a GCS bucket.

Source code in src/mlops_hatespeech/evaluate.py
def main():
    """
    Main function: Loads the latest checkpoint, prepares the dataset,
    runs evaluation, and uploads results as a JSON file to a GCS bucket.
    """
    # Load highest checkpoint
    checkpoint_path = find_latest_checkpoint()

    # Load data
    ds = load_from_disk("data/processed")

    def is_valid(example):
        text = example["tweet"]
        return isinstance(text, str) and len(text.strip()) > 0

    ds = ds.filter(is_valid)

    # Load tokenizer
    tokenizer = AutoTokenizer.from_pretrained(MODEL_STR)

    # Tokenize the tweets
    def tokenize_seqs(examples):
        return tokenizer(examples["tweet"], truncation=True, max_length=512)

    ds = ds.map(tokenize_seqs, batched=True)
    ds = ds.rename_column("label", "labels")

    model = AutoModelForSequenceClassification.from_pretrained(checkpoint_path)

    training_args = TrainingArguments(
        output_dir="./logs/eval",
        per_device_eval_batch_size=128,
        no_cuda=True,
        report_to="none",
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        compute_metrics=compute_metrics,
        tokenizer=tokenizer,
    )

    # Run evaluation on the test dataset
    eval_result = trainer.evaluate(eval_dataset=ds["test"])
    print("Evaluated the thing")
    print(eval_result)

    # Save evaluation results as JSON
    os.makedirs("logs/eval", exist_ok=True)
    output_path = "logs/eval/gen_perf.json"
    with open(output_path, "w") as f:
        json.dump(eval_result, f, indent=2)

    # Upload results JSON file to GCS bucket
    client = storage.Client(project="mlops-hs-project")
    bucket = client.bucket(BUCKET_NAME)
    blob = bucket.blob("logs/eval/gen_perf.json")
    blob.upload_from_filename(output_path)

    print("Uploaded results to Bucket.")

Drift Detection

get_bert_embeddings

Generates mean-pooled BERT embeddings for a list of texts.

Parameters:

texts (List[str]): Input texts. required
tokenizer (PreTrainedTokenizer): Tokenizer for the model. required
model (PreTrainedModel): Pretrained BERT model. required
device (str): Device to run the model on. Default: 'cpu'.

Returns:

np.ndarray: 2D numpy array with shape (n_texts, hidden_size).

Source code in src/mlops_hatespeech/drift_detector.py
def get_bert_embeddings(texts: List[str], tokenizer: Any, model: Any, device: str = "cpu") -> np.ndarray:
    """
    Generates mean-pooled BERT embeddings for a list of texts.

    Args:
        texts (List[str]): Input texts.
        tokenizer (transformers.PreTrainedTokenizer): Tokenizer for the model.
        model (transformers.PreTrainedModel): Pretrained BERT model.
        device (str): Device to run the model on, default is "cpu".

    Returns:
        np.ndarray: 2D numpy array with shape (n_texts, hidden_size).
    """
    inputs = tokenizer(texts, return_tensors="pt", padding=True, truncation=True)
    inputs = {key: val.to(device) for key, val in inputs.items()}
    with torch.no_grad():
        outputs = model(**inputs)

    # Mean-pool token embeddings, ignoring padding positions via the attention mask
    attention_mask = inputs["attention_mask"]
    token_embeddings = outputs.last_hidden_state
    input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
    summed = torch.sum(token_embeddings * input_mask_expanded, 1)
    summed_mask = torch.clamp(input_mask_expanded.sum(1), min=1e-9)
    mean_pooled = summed / summed_mask

    return mean_pooled.cpu().numpy()
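
A minimal usage sketch; the model name is illustrative, and any BERT-style encoder that returns last_hidden_state works:

from transformers import AutoModel, AutoTokenizer

from mlops_hatespeech.drift_detector import get_bert_embeddings

tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
model = AutoModel.from_pretrained("bert-base-uncased")

embeddings = get_bert_embeddings(["a first tweet", "a second tweet"], tokenizer, model)
print(embeddings.shape)  # (2, 768) for bert-base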

download_predictions_from_gcs

Downloads prediction JSON files from a Google Cloud Storage bucket.

Each JSON is expected to contain a 'tweet' (under 'input_text') and a 'label' (under 'prediction'). These are collected into a DataFrame for further processing.

Parameters:

bucket_name (str): Name of the GCS bucket. required
prefix (str): Prefix path under which prediction JSON files are stored. required

Returns:

pd.DataFrame: DataFrame with columns 'tweet' and 'label' from all valid JSON files.

Source code in src/mlops_hatespeech/drift_detector.py
def download_predictions_from_gcs(bucket_name: str, prefix: str) -> pd.DataFrame:
    """
    Downloads prediction JSON files from a Google Cloud Storage bucket.

    Each JSON is expected to contain a 'tweet' (under 'input_text') and a 'label' (under 'prediction').
    These are collected into a DataFrame for further processing.

    Args:
        bucket_name (str): Name of the GCS bucket.
        prefix (str): Prefix path under which prediction JSON files are stored.

    Returns:
        pd.DataFrame: DataFrame with columns 'tweet' and 'label' from all valid JSON files.
    """
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blobs = list(bucket.list_blobs(prefix=prefix))

    rows = []
    for blob in blobs:
        if blob.name.endswith(".json"):
            content = blob.download_as_text()
            data = json.loads(content)
            rows.append({"tweet": str(data.get("input_text", "")), "label": data.get("prediction", "")})
    return pd.DataFrame(rows)
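
For reference, each prediction blob is expected to carry at least the two fields read above; an illustrative payload:

# Illustrative contents of a single prediction JSON in the bucket;
# extra fields are ignored by the parser above.
{
    "input_text": "some tweet text",
    "prediction": "non-hate",
}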

upload_report_to_gcs

Uploads a local file (e.g. an HTML report) to a specified location in a GCS bucket.

Parameters:

local_path (str): Path to the local file to be uploaded. required
bucket_name (str): Name of the target GCS bucket. required
destination_path (str): Path (including filename) in the bucket where the file should be stored. required

Returns:

None

Source code in src/mlops_hatespeech/drift_detector.py
def upload_report_to_gcs(local_path: str, bucket_name: str, destination_path: str) -> None:
    """
    Uploads a local file (e.g. an HTML report) to a specified location in a GCS bucket.

    Args:
        local_path (str): Path to the local file to be uploaded.
        bucket_name (str): Name of the target GCS bucket.
        destination_path (str): Path (including filename) in the bucket where the file should be stored.

    Returns:
        None
    """
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(destination_path)
    blob.upload_from_filename(local_path)
    print(f"Drift report uploaded to gs://{bucket_name}/{destination_path}")

main

Main routine for detecting embedding-based data drift on tweet inputs using BERT embeddings and uploading the Evidently report to GCS.

Source code in src/mlops_hatespeech/drift_detector.py
def main() -> None:
    """
    Main routine for detecting embedding-based data drift on tweet inputs
    using BERT embeddings and uploading the Evidently report to GCS.
    """
    # Load reference dataset
    dataset = load_from_disk("data/processed")
    reference_df = dataset["train"].to_pandas()[["tweet", "label"]].copy()
    reference_df["tweet"] = reference_df["tweet"].astype(str)

    # Load current predictions from GCS
    current_df = download_predictions_from_gcs(BUCKET_NAME, PREDICTIONS_PREFIX)
    if current_df.empty:
        print("No predictions found in the bucket.")
        return

    current_df["tweet"] = current_df["tweet"].astype(str)
    current_df["label"] = current_df["label"].astype(str).str.strip().str.lower().map({"non-hate": 0, "hate": 1})

    reference_texts = [str(t) for t in reference_df["tweet"].tolist()]
    current_texts = [str(t) for t in current_df["tweet"].tolist()]

    # Generate embeddings
    reference_embeddings = get_bert_embeddings(reference_texts, tokenizer, model)
    current_embeddings = get_bert_embeddings(current_texts, tokenizer, model)
    reference_embed_df = pd.DataFrame(
        reference_embeddings, columns=[f"dim_{i}" for i in range(reference_embeddings.shape[1])]
    )
    reference_embed_df["label"] = reference_df["label"].values

    current_embed_df = pd.DataFrame(
        current_embeddings, columns=[f"dim_{i}" for i in range(current_embeddings.shape[1])]
    )
    current_embed_df["label"] = current_df["label"].values
    reference_embed_df["tweet"] = reference_df["tweet"].values
    current_embed_df["tweet"] = current_df["tweet"].values

    reference_embed_df["embedding_mean"] = reference_embeddings.mean(axis=1)
    current_embed_df["embedding_mean"] = current_embeddings.mean(axis=1)

    # Only pick relevant columns
    reference_embed_df = reference_embed_df[["tweet", "label", "embedding_mean"]]
    current_embed_df = current_embed_df[["tweet", "label", "embedding_mean"]]

    # A drift score above 0.7 is categorized as data drift
    report = Report(metrics=[DataDriftPreset(threshold=0.7)])
    # Generate the report; `snapshot` avoids shadowing the built-in `eval`
    snapshot = report.run(reference_data=reference_embed_df, current_data=current_embed_df)

    with tempfile.NamedTemporaryFile(delete=False, suffix=".html") as tmp:
        snapshot.save_html(tmp.name)
        timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
        gcs_report_path = REPORT_OUTPUT_PATH.format(timestamp=timestamp)
        upload_report_to_gcs(tmp.name, BUCKET_NAME, gcs_report_path)