Configure the MLRun project:#
%config Completer.use_jedi = False
import mlrun
from mlrun import get_or_create_project
image = "mlrun/mlrun"
project_name = "langchain-example"
project = get_or_create_project(project_name, context="./", allow_cross_project=True)
> 2025-12-03 07:17:36,530 [info] Project loaded successfully: {"project_name":"langchain-example-10"}
Create the OpenAI secrets:#
# Create project secrets for the project
secrets = {
    "OPENAI_API_KEY": "",  # add your OpenAI API key here
    "OPENAI_BASE_URL": "",  # add your OpenAI base URL here, if needed
}
project.set_secrets(secrets=secrets, provider="kubernetes")
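The deployed function reads these values back at runtime with mlrun.get_secret_or_env. If you want a quick sanity check before deploying, the sketch below assumes the key is also exported in your local environment (project secrets are mounted only inside runtime pods):
# Hypothetical local check: get_secret_or_env falls back to environment variables
api_key = mlrun.get_secret_or_env("OPENAI_API_KEY")
assert api_key, "OPENAI_API_KEY is not available locally"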
Write your Python file:#
%%writefile langchain_model.py
# LangChain imports for agent initialization and use:
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_react_agent
from langchain_core.tools import tool
from langchain_core.prompts import PromptTemplate
from langchain_core.callbacks import UsageMetadataCallbackHandler
from langchain_core.tracers.langchain import wait_for_all_tracers

# MLRun imports:
import mlrun
from mlrun.serving import Model

# General imports:
from typing import Any

@tool
def calculator(expression: str) -> str:
    """
    Use this tool to evaluate a mathematical expression.
    It can handle addition, subtraction, multiplication, division, and exponents.
    Example: `calculator("2 + 2")` or `calculator('3**4')`
    """
    # eval() keeps the demo short; do not expose it to untrusted input in production.
    try:
        return str(eval(expression))
    except Exception as e:
        return f"Error evaluating expression: {e}"

class LangchainWrapper(Model):
    def __init__(
        self,
        *args,
        prompt_template: str,
        **kwargs,
    ) -> None:
        self.prompt_template = prompt_template
        super().__init__(*args, **kwargs)
        self.executor = None

    def load(self):
        if not self.executor:
            self.openai_cb = UsageMetadataCallbackHandler()
            model = ChatOpenAI(
                model="gpt-4o-mini",
                openai_api_key=mlrun.get_secret_or_env("OPENAI_API_KEY"),
                openai_api_base=mlrun.get_secret_or_env("OPENAI_BASE_URL"),
                temperature=0,
                model_kwargs={"stream_options": {"include_usage": True}},
            )
            # The prompt template follows the classic ReAct format, so build a ReAct
            # agent; it fills in {tools} and {tool_names} from the tool list.
            agent = create_react_agent(
                llm=model,
                tools=[calculator],
                prompt=PromptTemplate.from_template(self.prompt_template),
            )
            self.executor = AgentExecutor(
                agent=agent,
                tools=[calculator],
                handle_parsing_errors=True,
                return_intermediate_steps=True,
                max_iterations=100,
                verbose=True,
            )

    def predict(self, body: Any, **kwargs) -> Any:
        if not self.executor:
            raise RuntimeError("Model not loaded. Call load() before predict().")
        print(f"Invoking agent with {body}")
        result = self.executor.invoke(
            {"input": body},
            config={
                "callbacks": [self.openai_cb],
                "run_name": "my_run",
                "metadata": {"request_id": "123"},
            },
        )
        print(f"Extracting metrics {self.openai_cb.usage_metadata}")
        # Get usage from the callback after invoke
        tokens_dict = self.openai_cb.usage_metadata
        token_metrics = {}
        # Extract metrics for the model that was called
        if tokens_dict:
            token_metrics = self.extract_token_metrics(tokens_dict)
        wait_for_all_tracers()
        result_body = {"output": result["output"]}
        result_body.update(token_metrics)
        return {"outputs": result_body}

    @staticmethod
    def extract_token_metrics(usage_metadata: dict) -> dict:
        result = {
            "total_tokens": 0,
            "prompt_tokens": 0,
            "completion_tokens": 0,
            "successful_requests": 0,
            "total_cost_usd": 0.0,
        }
        if not usage_metadata:
            return result
        # Get first (and only) model's metrics
        metrics = next(iter(usage_metadata.values()))
        input_tokens = metrics.get("input_tokens", 0)
        output_tokens = metrics.get("output_tokens", 0)
        total = metrics.get("total_tokens", 0)
        # gpt-4o-mini pricing: $0.15 per 1M input, $0.60 per 1M output
        input_cost = input_tokens * (0.15 / 1_000_000)
        output_cost = output_tokens * (0.60 / 1_000_000)
        result["total_tokens"] = total
        result["prompt_tokens"] = input_tokens
        result["completion_tokens"] = output_tokens
        result["successful_requests"] = 1
        result["total_cost_usd"] = input_cost + output_cost
        return result
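Optionally, smoke-test the building blocks of this file locally before deploying (a minimal sketch; it assumes the requirements listed in the next step are installed in your notebook environment):
from langchain_model import calculator, LangchainWrapper

# The calculator tool evaluates plain arithmetic expressions:
print(calculator.invoke("18.75 * 3 * 1.15"))  # -> 64.6875

# Worked cost example for gpt-4o-mini: 1,000 input tokens and 200 output tokens
sample_usage = {"gpt-4o-mini": {"input_tokens": 1000, "output_tokens": 200, "total_tokens": 1200}}
print(LangchainWrapper.extract_token_metrics(sample_usage))
# total_cost_usd = 1000 * 0.15 / 1e6 + 200 * 0.60 / 1e6 ≈ 0.00027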
Import the module from the hub:#
module = mlrun.import_module("hub://agent_deployer")
agent = module.AgentDeployer(
    agent_name="langchain_agent",
    model_class_name="LangchainWrapper",
    function="langchain_model.py",
    result_path="outputs",
    output_schema=[
        "output",
        "total_tokens",
        "prompt_tokens",
        "completion_tokens",
        "successful_requests",
        "total_cost_usd",
    ],
    requirements=["langchain==0.3.7", "langchain-openai==0.2.5", "langchain-community==0.3.3"],
    set_model_monitoring=True,
    prompt_template="""
Answer the following questions as best you can.
You have access to the following tools:

{tools}

Use the following format:

Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question

Begin!

Question: {input}
Thought:{agent_scratchpad}
""",
)
> 2025-12-03 10:55:46,194 [info] Project loaded successfully: {"project_name":"langchain-example-10"}
> 2025-12-03 10:55:46,463 [info] Model monitoring credentials were set successfully. Please keep in mind that if you already had model monitoring functions / model monitoring infra / tracked model server deployed on your project, you will need to redeploy them. For redeploying the model monitoring infra, first disable it using `project.disable_model_monitoring()` and then enable it using `project.enable_model_monitoring()`.
details: MLRunConflictError("The following model-montioring infrastructure functions are already deployed, aborting: ['model-monitoring-controller', 'model-monitoring-writer']\nIf you want to redeploy the model-monitoring controller (maybe with different base-period), use update_model_monitoring_controller.If you want to redeploy all of model-monitoring infrastructure, call disable_model_monitoringbefore calling enable_model_monitoring again.")
func = agent.deploy_function(enable_tracking=True)
Invoke the deployed agent:#
func.invoke("./", {"question" : "If a pizza costs $18.75 and I want to buy 3, plus a 15% tip, what is the total cost?"})
Write your monitoring application:#
This step is optional. It lets you view detailed metrics about LLM token usage in the MLRun UI.
%%writefile monitoring_application.py
from typing import Union

import pandas as pd

import mlrun.model_monitoring.applications.context as mm_context
from mlrun.common.schemas.model_monitoring.constants import (
    ResultKindApp,
    ResultStatusApp,
)
from mlrun.model_monitoring.applications import (
    ModelMonitoringApplicationBase,
    ModelMonitoringApplicationResult,
    ModelMonitoringApplicationMetric,
)

class ModelMonitoringApplication(ModelMonitoringApplicationBase):
    name = "LLModelMonitoringApplication"

    def do_tracking(
        self,
        monitoring_context: mm_context.MonitoringApplicationContext,
    ) -> list[Union[ModelMonitoringApplicationResult, ModelMonitoringApplicationMetric]]:
        """Compute token-usage statistics over the sampled requests."""
        df = monitoring_context.sample_df
        if df.empty:
            monitoring_context.logger.warning(
                "Empty dataframe received, skipping tracking"
            )
            return []

        # Example of processing the dataframe and creating results
        results = []
        metrics = []
        # Calculate max, min, avg, and std for the token-usage columns
        for column in ["completion_tokens", "prompt_tokens", "total_tokens"]:
            if column in df.columns:
                stats = self._calculate_max_min_avg_std(df, column)
                results.append(
                    self._create_result(
                        name=f"{column}_stats",
                        value=stats["avg"],
                        kind=ResultKindApp.model_performance,
                        threshold=1000,  # example threshold
                    )
                )
                metrics.append(
                    self._create_metric(
                        name=f"{column}_max",
                        value=stats["max"],
                    )
                )
                metrics.append(
                    self._create_metric(
                        name=f"{column}_min",
                        value=stats["min"],
                    )
                )
                metrics.append(
                    self._create_metric(
                        name=f"{column}_std",
                        value=stats["std"],
                    )
                )
        return results + metrics

    @staticmethod
    def _calculate_max_min_avg_std(df: pd.DataFrame, column: str) -> dict:
        """
        Calculate max, min, avg, and std for a given column in the dataframe.
        """
        if column not in df.columns:
            raise ValueError(f"Column '{column}' does not exist in the dataframe.")
        return {
            "max": df[column].max(),
            "min": df[column].min(),
            "avg": df[column].mean(),
            "std": df[column].std(),
        }

    @staticmethod
    def _create_result(
        name: str,
        value: float,
        kind: ResultKindApp,
        threshold: float,
    ) -> ModelMonitoringApplicationResult:
        status = ResultStatusApp.no_detection
        if value > threshold:
            status = ResultStatusApp.detected
        return ModelMonitoringApplicationResult(
            name=name,
            value=value,
            kind=kind,
            status=status,
            extra_data={
                "threshold": threshold,
                "value": value,
            },
        )

    @staticmethod
    def _create_metric(
        name: str,
        value: float,
    ) -> ModelMonitoringApplicationMetric:
        return ModelMonitoringApplicationMetric(
            name=name,
            value=value,
        )
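The statistics helpers can be sanity-checked locally on a small DataFrame before deploying (a standalone sketch that does not require the monitoring infrastructure):
import pandas as pd
from monitoring_application import ModelMonitoringApplication

sample_df = pd.DataFrame(
    {
        "prompt_tokens": [80, 150, 200],
        "completion_tokens": [40, 100, 110],
        "total_tokens": [120, 250, 310],
    }
)
print(ModelMonitoringApplication._calculate_max_min_avg_std(sample_df, "total_tokens"))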
Deploy the monitoring application:#
llm_monitoring_app = project.set_model_monitoring_function(
    func="monitoring_application.py",
    application_class="ModelMonitoringApplication",
    name="llm-monitoring",
    image=image,
)
project.deploy_function(llm_monitoring_app)
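The monitoring application runs periodically on sampled traffic, so generate a few requests to the agent and then review the llm-monitoring results and metrics in the MLRun UI (the loop below reuses the deployed function from the earlier step):
questions = [
    "What is 15% of 230?",
    "Compute 2**10 - 24.",
    "If I split $97.50 between 5 people, how much does each person pay?",
]
for q in questions:
    func.invoke("./", {"question": q})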