Pipeline: Toxicity Guardrail (Hub Step) → LLM Model Runner
A unified serving graph that:
Routes the user’s question through a toxicity guardrail hub step
If safe → calls a ModelRunnerStep (LLM) and returns the answer
If toxic → blocks the request with a clear rejection response
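The routing described above can be sketched in plain Python, independent of MLRun. This only illustrates the control flow: `handle_question`, `RequestBlocked`, `keyword_check`, and `fake_llm` are hypothetical stand-ins, not part of MLRun or of the hub step, and the keyword test is a placeholder for a real toxicity classifier.

```python
from typing import Callable, Dict


class RequestBlocked(Exception):
    """Raised when the guardrail rejects a request (hypothetical name)."""


def handle_question(
    event: Dict[str, str],
    is_toxic: Callable[[str], bool],
    llm: Callable[[str], str],
) -> Dict[str, str]:
    """Run the guardrail first; call the LLM only if the question is safe."""
    question = event.get("question", "")
    if is_toxic(question):
        # Toxic: stop here, the LLM is never called.
        raise RequestBlocked("Request blocked by toxicity guardrail")
    # Safe: forward the question to the model and return its answer.
    return {"answer": llm(question)}


# Stand-ins for illustration only; a real deployment uses a classifier and an LLM.
def keyword_check(text: str) -> bool:
    return "useless" in text.lower()


def fake_llm(question: str) -> str:
    return f"(answer to: {question})"


print(handle_question({"question": "What is 2 + 2?"}, keyword_check, fake_llm))
try:
    handle_question({"question": "You are useless"}, keyword_check, fake_llm)
except RequestBlocked as exc:
    print("Blocked:", exc)
```

Raising an exception for the toxic branch mirrors how the block surfaces in the test run at the end of this page, where the rejected request comes back as an error rather than as a normal response.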
Create or load the MLRun project that will hold the serving function and its secrets.
import mlrun
project = mlrun.get_or_create_project("hubstep-guardrail-toxicity", user_project=False, context="./", allow_cross_project=True)
> 2026-04-27 10:59:47,707 [info] Loading project from path: {"path":"./","project_name":"hubstep-guardrail-toxicity","user_project":false}
> 2026-04-27 11:00:02,102 [info] Project loaded successfully: {"path":"./","project_name":"hubstep-guardrail-toxicity","stored_in_db":true}
Load credentials from a local .env file (cred.env in this example)
For example:
OPENAI_API_KEY="..."
OPENAI_BASE_URL="..."
OPENAI_MODEL="..."
from dotenv import load_dotenv
load_dotenv("cred.env", override=True)
Store the credentials as project secrets (see also: working with secrets).
import os
project.set_secrets(
    secrets={
        "OPENAI_API_KEY": os.getenv("OPENAI_API_KEY"),
        "OPENAI_BASE_URL": os.getenv("OPENAI_BASE_URL"),
        "OPENAI_MODEL": os.getenv("OPENAI_MODEL"),
    },
)
project.save()
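`os.getenv` returns `None` for a variable that is not set, so a typo in `cred.env` may only surface later, when the model tries to read the key. A small pre-check before `project.set_secrets` can catch that earlier. This is a sketch: `collect_secrets` is a hypothetical helper, and treating `OPENAI_BASE_URL` and `OPENAI_MODEL` as optional is an assumption based on the docstring of the model class below.

```python
from typing import Dict, Iterable, Mapping


def collect_secrets(
    env: Mapping[str, str],
    required: Iterable[str] = ("OPENAI_API_KEY",),
    optional: Iterable[str] = ("OPENAI_BASE_URL", "OPENAI_MODEL"),
) -> Dict[str, str]:
    """Return only the variables that are set; fail fast on missing required ones."""
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise KeyError(f"Missing required credentials: {', '.join(missing)}")
    names = list(required) + list(optional)
    # Skip unset or empty optional values instead of storing None.
    return {name: env[name] for name in names if env.get(name)}


# Example with an explicit mapping; in the notebook, os.environ would be passed instead.
example_env = {"OPENAI_API_KEY": "sk-example", "OPENAI_MODEL": "gpt-4o-mini"}
print(collect_secrets(example_env))
```

The result could then be passed along as `project.set_secrets(secrets=collect_secrets(os.environ))`.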
Build the serving graph
LLMModel wraps an OpenAI-compatible API and reads its credentials from the project secrets set above.
format_answer is a plain function that flattens the ModelRunnerStep output dict
({"llm_model": {"answer": ...}}) into a simple {"answer": ...} response.
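To make the shape change concrete, here is a standalone sketch of the same flattening applied to a hand-written event. The sample answer text is invented for illustration, and `flatten_runner_output` is a hypothetical name that mirrors the logic of `format_answer` in the next cell; it assumes the runner output holds one entry per model, keyed by endpoint name.

```python
from typing import Any, Dict


def flatten_runner_output(event: Dict[str, Any]) -> Dict[str, Any]:
    """Return the first nested dict found in the event, or the event unchanged."""
    if isinstance(event, dict):
        for model_output in event.values():
            if isinstance(model_output, dict):
                return model_output
    return event


runner_output = {"llm_model": {"answer": "Paris", "model": "gpt-4o-mini"}}
print(flatten_runner_output(runner_output))
# An event without a nested dict passes through untouched.
print(flatten_runner_output({"answer": "already flat"}))
```

Note that with more than one model in the runner, this logic returns only the first nested result, so it fits the single-model graph used here.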
%%writefile serving_graph.py
from typing import Dict, Any

from mlrun.serving import Model


class LLMModel(Model):
    """OpenAI-compatible LLM. Credentials and model are read from env vars:
    OPENAI_API_KEY, OPENAI_BASE_URL (optional), OPENAI_MODEL (optional, falls back to default_model_name).
    """

    def __init__(self, default_model_name: str = "gpt-4o-mini", **kwargs):
        super().__init__(**kwargs)
        self.default_model_name = default_model_name

    def load(self):
        import openai, os

        self.model_name = os.environ.get("OPENAI_MODEL", self.default_model_name)
        client_kwargs = {"api_key": os.environ["OPENAI_API_KEY"]}
        base_url = os.environ.get("OPENAI_BASE_URL")
        if base_url:
            client_kwargs["base_url"] = base_url
        self._client = openai.OpenAI(**client_kwargs)

    def predict(self, body: Dict[str, Any]) -> Dict[str, Any]:
        question = body.get("question", "")
        response = self._client.chat.completions.create(
            model=self.model_name,
            messages=[
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": question},
            ],
        )
        return {"answer": response.choices[0].message.content, "model": self.model_name}


def format_answer(event: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten ModelRunnerStep output: {"llm_model": {"answer": ...}} → {"answer": ...}"""
    if isinstance(event, dict):
        for _, model_output in event.items():
            if isinstance(model_output, dict):
                return model_output
    return event
Overwriting serving_graph.py
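The credential resolution in `load()` can be checked locally without calling the API. The sketch below extracts that logic into a pure function. `resolve_client_config` is a hypothetical helper, not part of MLRun or the OpenAI SDK, and it takes the environment as an argument so that it can be exercised with a plain dict.

```python
from typing import Dict, Mapping, Tuple


def resolve_client_config(
    env: Mapping[str, str], default_model_name: str = "gpt-4o-mini"
) -> Tuple[str, Dict[str, str]]:
    """Mirror LLMModel.load(): pick the model name and build the client kwargs."""
    model_name = env.get("OPENAI_MODEL", default_model_name)
    client_kwargs = {"api_key": env["OPENAI_API_KEY"]}  # KeyError if the key is missing
    base_url = env.get("OPENAI_BASE_URL")
    if base_url:
        # Only override the endpoint when a non-empty base URL is configured.
        client_kwargs["base_url"] = base_url
    return model_name, client_kwargs


model, kwargs = resolve_client_config({"OPENAI_API_KEY": "sk-example"})
print(model, sorted(kwargs))
```

With only the API key set, the model name falls back to the default and no `base_url` is passed, which leaves the endpoint choice to the client library.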
Wire the three-step async flow graph:
toxicity_guardrail: loaded directly from hub://toxicity_guardrail; blocks requests with a toxicity score ≥ threshold
llm_runner: a ModelRunnerStep that runs LLMModel against the OpenAI-compatible API
format_answer: flattens the runner output and sends the response back to the caller
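The blocking rule in the first step is a comparison against `threshold`: a request is rejected when its score is greater than or equal to the threshold, so a score exactly at the threshold is blocked too. A minimal sketch, assuming the score is a float between 0 and 1. `check_toxicity` is a hypothetical function, `ValueError` is an arbitrary choice of exception type, and the message format is copied from the error shown in the test output at the end of this page rather than from the hub step's source.

```python
def check_toxicity(score: float, threshold: float = 0.5) -> None:
    """Raise if the score reaches the threshold; otherwise let the request pass."""
    if score >= threshold:
        raise ValueError(
            f"Request blocked: toxicity score {score:.3f} >= {threshold}"
        )


check_toxicity(0.12)  # below the threshold: no exception
for score in (0.5, 0.953):
    try:
        check_toxicity(score)
    except ValueError as exc:
        print(exc)
```

Lowering `threshold` makes the guardrail stricter, and raising it lets more borderline input through to the LLM.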
from mlrun.serving import ModelRunnerStep

fn_pipeline = project.set_function(
    name="toxicity-llm-pipeline",
    func="serving_graph.py",
    kind="serving",
    image="mlrun/mlrun",
    requirements=["transformers", "torch", "openai"],
)
# Credentials come from the Kubernetes secrets set above; no set_envs() needed for them.

graph = fn_pipeline.set_topology("flow", engine="async")
graph.add_step(
    class_name="hub://toxicity_guardrail",
    name="toxicity_guardrail",
    threshold=0.5,
)

model_runner = ModelRunnerStep(name="llm_runner")
model_runner.add_model(endpoint_name="llm_model", model_class="LLMModel", execution_mechanism="naive")
graph.add_step(model_runner, after="toxicity_guardrail")
graph.add_step(name="format_answer", handler="format_answer", after="llm_runner").respond()

graph.plot(rankdir="LR")
Deploy the serving function with the required packages (transformers, torch, openai).
addr = project.deploy_function(fn_pipeline)
Test the pipeline with a safe question and a toxic one. Safe requests pass through to the LLM and return an answer; toxic ones are blocked by the guardrail before reaching the model.
# --- Safe input: should return an LLM answer ---
print("=== Safe input ===")
result = fn_pipeline.invoke("/", {"question": "What is the capital of the USA?"})
print("Response:", result)
# --- Toxic input: should be blocked ---
print("\n=== Toxic input ===")
try:
    result = fn_pipeline.invoke(
        "/", {"question": "You're completely useless and everything you do is a failure"}
    )
    print("Response:", result)
except Exception as e:
    print(f"Blocked (expected): {e}")
=== Safe input ===
Response: {'answer': 'Washington, D.C. It’s a federal district, not part of any state.', 'model': 'gpt-5-nano-2025-08-07'}
=== Toxic input ===
Blocked (expected): bad function response 500: Exception caught in handler - "Request blocked: toxicity score 0.953 >= 0.5"
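In a client, it can be convenient to turn the raised error into a structured result instead of printing it. The sketch below wraps any callable that behaves like `fn_pipeline.invoke`. The `ask` helper and the returned dict layout are hypothetical, and the regular expression assumes the error text keeps the `toxicity score <value> >= <threshold>` form seen in the output above, which may change between versions of the hub step.

```python
import re
from typing import Any, Callable, Dict

BLOCK_PATTERN = re.compile(r"toxicity score ([0-9.]+) >= ([0-9.]+)")


def ask(invoke: Callable[[str, Dict[str, Any]], Any], question: str) -> Dict[str, Any]:
    """Call the pipeline and report either the answer or the block details."""
    try:
        return {"blocked": False, "response": invoke("/", {"question": question})}
    except Exception as exc:  # broad on purpose: the concrete exception type is not assumed
        match = BLOCK_PATTERN.search(str(exc))
        if match is None:
            raise  # not a guardrail block, so surface the original error
        score, threshold = (float(group) for group in match.groups())
        return {"blocked": True, "score": score, "threshold": threshold}


# Stand-in for fn_pipeline.invoke, used only to exercise the wrapper offline.
def fake_invoke(path: str, body: Dict[str, Any]) -> Dict[str, Any]:
    if "useless" in body["question"]:
        raise RuntimeError(
            'bad function response 500: "Request blocked: toxicity score 0.953 >= 0.5"'
        )
    return {"answer": "Washington, D.C."}


print(ask(fake_invoke, "What is the capital of the USA?"))
print(ask(fake_invoke, "You're completely useless"))
```

Errors that do not match the pattern are re-raised, so genuine failures such as timeouts or missing credentials are not mistaken for a guardrail block.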