Pipeline: Toxicity Guardrail (Hub Step) → LLM Model Runner

A unified serving graph that:

  1. Routes the user’s question through a toxicity guardrail hub step

  2. If safe → calls a ModelRunnerStep (LLM) and returns the answer

  3. If toxic → blocks the request with a clear rejection response
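The routing above can be sketched in plain Python to make the control flow concrete. This is a minimal, framework-free illustration, not the hub step's implementation. `keyword_toxicity_score` is a hypothetical stand-in for the guardrail's transformer classifier, and `fake_llm` is a hypothetical stub for the OpenAI call. Raising an exception on a blocked request mirrors how the deployed graph rejects the toxic input in the test at the end of this page.

```python
from typing import Any, Callable, Dict

# Hypothetical stand-in for the guardrail's classifier: 1.0 if any word
# is in a tiny blocklist, else 0.0. The real hub step uses a trained model.
BLOCKLIST = {"useless", "failure", "stupid"}


def keyword_toxicity_score(text: str) -> float:
    words = [w.strip(".,!?").lower() for w in text.split()]
    return 1.0 if any(w in BLOCKLIST for w in words) else 0.0


def fake_llm(question: str) -> str:
    # Hypothetical LLM stub used only for this sketch
    return f"(stub answer to: {question})"


class RequestBlocked(Exception):
    """Raised when the guardrail rejects a request."""


def run_pipeline(
    event: Dict[str, Any],
    score_fn: Callable[[str], float],
    llm_fn: Callable[[str], str],
    threshold: float = 0.5,
) -> Dict[str, Any]:
    question = event.get("question", "")
    # Step 1: guardrail. Block when score >= threshold.
    score = score_fn(question)
    if score >= threshold:
        raise RequestBlocked(f"Request blocked: toxicity score {score:.3f} >= {threshold}")
    # Step 2: model runner. Output is keyed by the endpoint name.
    runner_output = {"llm_model": {"answer": llm_fn(question)}}
    # Step 3: flatten to {"answer": ...} and respond.
    return next(iter(runner_output.values()))


print(run_pipeline({"question": "What is the capital of the USA?"}, keyword_toxicity_score, fake_llm))
try:
    run_pipeline({"question": "You're completely useless"}, keyword_toxicity_score, fake_llm)
except RequestBlocked as e:
    print(e)
```

Passing the scorer and the LLM as callables keeps the sketch testable without network access; in the real graph each of these is a separate step.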

Create or load the MLRun project that will hold the serving function and its secrets.

import mlrun
project = mlrun.get_or_create_project("hubstep-guardrail-toxicity", user_project=False, context="./", allow_cross_project=True)
> 2026-04-27 10:59:47,707 [info] Loading project from path: {"path":"./","project_name":"hubstep-guardrail-toxicity","user_project":false}
> 2026-04-27 11:00:02,102 [info] Project loaded successfully: {"path":"./","project_name":"hubstep-guardrail-toxicity","stored_in_db":true}

Load credentials from a local env file (cred.env in this example)

For example:

OPENAI_API_KEY="..."
OPENAI_BASE_URL="..."
OPENAI_MODEL="..."

from dotenv import load_dotenv

load_dotenv("cred.env", override=True)

Store the credentials as project secrets (see also Working with secrets).

import os
project.set_secrets(
    secrets={
        "OPENAI_API_KEY": os.getenv("OPENAI_API_KEY"),
        "OPENAI_BASE_URL": os.getenv("OPENAI_BASE_URL"),
        "OPENAI_MODEL":   os.getenv("OPENAI_MODEL"),
    },
)
project.save()
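If OPENAI_BASE_URL or OPENAI_MODEL is not defined in cred.env, os.getenv returns None for it, and the dict above passes that None to set_secrets. A small pre-check such as the hypothetical helper below (plain Python, no MLRun calls) fails early when the required key is missing and drops unset optional keys. LLMModel already handles their absence by falling back to default_model_name and the client's default base URL. This is a defensive sketch; how set_secrets treats None values is not covered here.

```python
import os
from typing import Dict, Mapping, Optional

REQUIRED_KEYS = ("OPENAI_API_KEY",)
OPTIONAL_KEYS = ("OPENAI_BASE_URL", "OPENAI_MODEL")


def build_openai_secrets(environ: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Hypothetical helper: collect OpenAI settings for project.set_secrets().

    Raises ValueError if a required key is missing or empty,
    and omits optional keys that are unset or empty.
    """
    env = os.environ if environ is None else environ
    missing = [k for k in REQUIRED_KEYS if not env.get(k)]
    if missing:
        raise ValueError(f"Missing required credentials: {', '.join(missing)}")
    return {k: env[k] for k in REQUIRED_KEYS + OPTIONAL_KEYS if env.get(k)}


# Usage (after load_dotenv): project.set_secrets(secrets=build_openai_secrets())
```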

Build the serving graph

LLMModel wraps an OpenAI-compatible API. At runtime it reads its credentials from environment variables, which are populated from the project secrets set above. format_answer is a plain function that flattens the ModelRunnerStep output dict ({"llm_model": {"answer": ...}}) into a simple {"answer": ..., "model": ...} response.

%%writefile serving_graph.py
from typing import Dict, Any
from mlrun.serving import Model

class LLMModel(Model):
    """OpenAI-compatible LLM. Credentials and model are read from env vars:
    OPENAI_API_KEY, OPENAI_BASE_URL (optional), OPENAI_MODEL (optional, falls back to default_model_name).
    """

    def __init__(self, default_model_name: str = "gpt-4o-mini", **kwargs):
        super().__init__(**kwargs)
        self.default_model_name = default_model_name

    def load(self):
        import openai, os
        self.model_name = os.environ.get("OPENAI_MODEL", self.default_model_name)
        client_kwargs = {"api_key": os.environ["OPENAI_API_KEY"]}
        base_url = os.environ.get("OPENAI_BASE_URL")
        if base_url:
            client_kwargs["base_url"] = base_url
        self._client = openai.OpenAI(**client_kwargs)

    def predict(self, body: Dict[str, Any]) -> Dict[str, Any]:
        question = body.get("question", "")
        response = self._client.chat.completions.create(
            model=self.model_name,
            messages=[
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user",   "content": question},
            ],
        )
        return {"answer": response.choices[0].message.content, "model": self.model_name}


def format_answer(event: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten ModelRunnerStep output: {"llm_model": {"answer": ...}} → {"answer": ...}"""
    if isinstance(event, dict):
        for _, model_output in event.items():
            if isinstance(model_output, dict):
                return model_output
    return event
Overwriting serving_graph.py
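Because format_answer is a plain function, it can be checked locally before deploying. The snippet below uses an equivalent copy (so it runs without MLRun installed) and feeds it a hand-written event shaped like the ModelRunnerStep output described above. The sample values are illustrative only.

```python
from typing import Any, Dict


def format_answer(event: Dict[str, Any]) -> Dict[str, Any]:
    """Equivalent copy of format_answer from serving_graph.py: return the first dict-valued entry."""
    if isinstance(event, dict):
        for model_output in event.values():
            if isinstance(model_output, dict):
                return model_output
    return event


# Illustrative runner output keyed by the endpoint name used in the graph ("llm_model")
runner_output = {"llm_model": {"answer": "Washington, D.C.", "model": "gpt-4o-mini"}}
print(format_answer(runner_output))
# Non-dict events pass through unchanged
print(format_answer("plain string"))
```

Note that the function returns only the first dict-valued entry, so if more models were added to the same runner, only one model's output would reach the caller.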

Wire the three-step async flow graph:

  1. toxicity_guardrail — loaded directly from hub://toxicity_guardrail; blocks requests with a toxicity score ≥ threshold

  2. llm_runner — a ModelRunnerStep that runs LLMModel against the OpenAI-compatible API

  3. format_answer — flattens the runner output and sends the response back to the caller

from mlrun.serving import ModelRunnerStep

fn_pipeline = project.set_function(
    name="toxicity-llm-pipeline",
    func="serving_graph.py",
    kind="serving",
    image="mlrun/mlrun",
    requirements=["transformers", "torch", "openai"],
)
# Credentials come from the project secrets set above (stored as Kubernetes secrets), so no set_envs() call is needed for them.

graph = fn_pipeline.set_topology("flow", engine="async")

graph.add_step(
    class_name="hub://toxicity_guardrail",
    name="toxicity_guardrail",
    threshold=0.5,
)

model_runner = ModelRunnerStep(name="llm_runner")
model_runner.add_model(endpoint_name="llm_model", model_class="LLMModel", execution_mechanism="naive")
graph.add_step(model_runner, after="toxicity_guardrail")

graph.add_step(name="format_answer", handler="format_answer", after="llm_runner").respond()

graph.plot(rankdir="LR")

Deploy the serving function. The build installs the packages listed in requirements (transformers, torch, openai).

addr = project.deploy_function(fn_pipeline)

Test the pipeline with a safe question and a toxic one. Safe requests pass through to the LLM and return an answer; toxic ones are blocked by the guardrail before reaching the model.

# --- Safe input: should return an LLM answer ---
print("=== Safe input ===")
result = fn_pipeline.invoke("/", {"question": "What is the capital of the USA?"})
print("Response:", result)

# --- Toxic input: should be blocked ---
print("\n=== Toxic input ===")
try:
    result = fn_pipeline.invoke("/", {"question": "You're completely useless and everything you do is a failure"})
    print("Response:", result)
except Exception as e:
    print(f"Blocked (expected): {e}")
=== Safe input ===
Response: {'answer': 'Washington, D.C. It’s a federal district, not part of any state.', 'model': 'gpt-5-nano-2025-08-07'}

=== Toxic input ===
Blocked (expected): bad function response 500: Exception caught in handler - "Request blocked: toxicity score 0.953 >= 0.5"
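When scripting checks against the deployed function, it can help to extract the score and threshold from the rejection message. The sketch below assumes the message format in the sample output above ("Request blocked: toxicity score <score> >= <threshold>"). That format comes from this one run and is not a documented contract of the hub step, so the hypothetical helper returns None when the text does not match.

```python
import re
from typing import Optional, Tuple

# Pattern inferred from the sample error text above; not a documented format.
BLOCK_PATTERN = re.compile(
    r"Request blocked: toxicity score (\d+(?:\.\d+)?) >= (\d+(?:\.\d+)?)"
)


def parse_block_reason(message: str) -> Optional[Tuple[float, float]]:
    """Hypothetical helper: return (score, threshold) from a guardrail rejection, else None."""
    match = BLOCK_PATTERN.search(message)
    if match is None:
        return None
    return float(match.group(1)), float(match.group(2))


err = 'bad function response 500: Exception caught in handler - "Request blocked: toxicity score 0.953 >= 0.5"'
print(parse_block_reason(err))  # (0.953, 0.5)
```

In the test cell, this could be called as parse_block_reason(str(e)) inside the except block to tell a guardrail rejection apart from other server errors.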