Configure the MLRun project:#

%config Completer.use_jedi = False

import mlrun
from mlrun import get_or_create_project

image = "mlrun/mlrun"
project_name = "langchain-example"
project = get_or_create_project(project_name, context="./", allow_cross_project=True)
> 2025-12-03 07:17:36,530 [info] Project loaded successfully: {"project_name":"langchain-example-10"}

Create the OpenAI secrets:#

# Create the project secrets
secrets = {
    "OPENAI_API_KEY": "",  # add your OpenAI API key here
    "OPENAI_BASE_URL": "",  # add your OpenAI base URL here if needed
}
project.set_secrets(secrets=secrets, provider="kubernetes")
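
At serving time, the wrapper below reads these values back with mlrun.get_secret_or_env. As an optional sanity check, you can emulate that lookup in the notebook; note that outside the serving pod get_secret_or_env falls back to environment variables:

# Optional check: outside the serving pod this falls back to environment variables,
# so an empty result usually means the key was neither filled in above nor exported locally.
print("OPENAI_API_KEY configured:", bool(mlrun.get_secret_or_env("OPENAI_API_KEY")))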

Write your Python file:#

%%writefile langchain_model.py

# LangChain imports for agent initialization and use:
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_core.tools import tool
from langchain_core.prompts import PromptTemplate
from langchain_core.callbacks import UsageMetadataCallbackHandler
from langchain_core.tracers.langchain import wait_for_all_tracers


# mlrun imports:
import mlrun
from mlrun.serving import Model

# General imports:
from typing import Any
import asyncio

@tool
def calculator(expression: str) -> str:
    """
    Use this tool to evaluate a mathematical expression.
    It can handle addition, multiplication, subtraction, division, and exponents.
    Example: `calculator("2 + 2")` or `calculator('3**4')`
    """
    try:
        return str(eval(expression))
    except Exception as e:
        return f"Error evaluating expression: {e}"



class LangchainWrapper(Model):

    def __init__(
            self,
            *args,
            prompt_template: str,
            **kwargs
    ) -> None:
        self.prompt_template = prompt_template
        super().__init__(**kwargs)
        self.executor = None

    def load(self):
        if not self.executor:
            self.openai_cb = UsageMetadataCallbackHandler()
            model = ChatOpenAI(
                model="gpt-4o-mini",
                openai_api_key=mlrun.get_secret_or_env("OPENAI_API_KEY"),
                openai_api_base=mlrun.get_secret_or_env("OPENAI_BASE_URL"),
                temperature=0,
                model_kwargs={"stream_options": {"include_usage": True}},
            )
            agent = create_tool_calling_agent(
                llm=model,
                tools=[calculator],
                prompt=PromptTemplate.from_template(self.prompt_template),
            )
            self.executor = AgentExecutor(
                agent=agent,
                tools=[calculator],
                handle_parsing_errors=True,
                return_intermediate_steps=True,
                max_iterations=100,
                verbose=True,
            )


    def predict(self, body: Any, **kwargs) -> Any:
        if not self.executor:
            raise RuntimeError("Model not loaded. Call load() before predict().")

        print(f"Invoking Agent with {body}")

        result = self.executor.invoke(
            {"input": body},
            config={
                "callbacks": [self.openai_cb],
                "run_name": "my_run",
                "metadata": {"request_id": "123"},
            },
        )

        print(f"Extracting metrics {self.openai_cb.usage_metadata}")

        # Get usage from the callback after invoke
        tokens_dict = self.openai_cb.usage_metadata
        token_metrics = {}
        # Extract the metrics reported by the callback (stays empty if nothing was recorded)
        if tokens_dict:
            token_metrics = self.extract_token_metrics(tokens_dict)

        wait_for_all_tracers()

        result_body = {"output": result["output"]}
        result_body.update(token_metrics)

        return {"outputs": result_body}


    @staticmethod
    def extract_token_metrics(usage_metadata: dict) -> dict:
        result = {
            "total_tokens": 0,
            "prompt_tokens": 0,
            "completion_tokens": 0,
            "successful_requests": 0,
            "total_cost_usd": 0.0,
        }
        if not usage_metadata:
            return result

        # Get first (and only) model's metrics
        metrics = next(iter(usage_metadata.values()))

        input_tokens = metrics.get('input_tokens', 0)
        output_tokens = metrics.get('output_tokens', 0)
        total = metrics.get('total_tokens', 0)

        # gpt-4o-mini pricing: $0.15 per 1M input, $0.60 per 1M output
        input_cost = input_tokens * (0.15 / 1_000_000)
        output_cost = output_tokens * (0.60 / 1_000_000)

        result["total_tokens"] = total
        result["prompt_tokens"] = input_tokens
        result["completion_tokens"] = output_tokens
        result["successful_requests"] = 1
        result["total_cost_usd"] = input_cost + output_cost
        return result
            
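
Before deploying, you can optionally sanity-check the tool locally. This is an illustrative sketch that assumes LangChain is installed in the notebook environment; it exercises only the calculator tool, not the full wrapper:

from langchain_model import calculator

# A tool created with @tool is invoked with a single string input
print(calculator.invoke("(18.75 * 3) * 1.15"))  # expected: 64.6875 (up to float rounding)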

Import the module from the hub:#

module = mlrun.import_module("hub://agent_deployer")

agent = module.AgentDeployer(
            agent_name="langchain_agent",
            model_class_name="LangchainWrapper",
            function="langchain_model.py",
            result_path="outputs",
            output_schema=["output" ,"total_tokens", "prompt_tokens", "completion_tokens", "successful_requests", "total_cost_usd"],
            requirements=["langchain==0.3.7", "langchain-openai==0.2.5", "langchain-community==0.3.3"],
            set_model_monitoring=True,
            prompt_template= """
            Answer the following questions as best you can.
            You have access to the following tools:
            {tools}
            Use the following format:
            Question: the input question you must answer
            Thought: you should always think about what to do
            Action: the action to take, should be one of [{tool_names}]
            Action Input: the input to the action
            Observation: the result of the action
            ... (this Thought/Action/Action Input/Observation can repeat N times)
            Thought: I now know the final answer
            Final Answer: the final answer to the original input question

            Begin!
            Question: {input}
            Thought:{agent_scratchpad}
            """,
)
> 2025-12-03 10:55:46,194 [info] Project loaded successfully: {"project_name":"langchain-example-10"}
> 2025-12-03 10:55:46,463 [info] Model monitoring credentials were set successfully. Please keep in mind that if you already had model monitoring functions / model monitoring infra / tracked model server deployed on your project, you will need to redeploy them. For redeploying the model monitoring infra, first disable it using `project.disable_model_monitoring()` and then enable it using `project.enable_model_monitoring()`.
details: MLRunConflictError("The following model-montioring infrastructure functions are already deployed, aborting: ['model-monitoring-controller', 'model-monitoring-writer']\nIf you want to redeploy the model-monitoring controller (maybe with different base-period), use update_model_monitoring_controller.If you want to redeploy all of model-monitoring infrastructure, call disable_model_monitoringbefore calling enable_model_monitoring again.")
func = agent.deploy_function(enable_tracking=True)

Invoke the deployed agent:#

func.invoke("./", {"question" : "If a pizza costs $18.75 and I want to buy 3, plus a 15% tip, what is the total cost?"})

Write your monitoring application:#

This part is optional. It allows you to see detailed monitoring metrics about the LLM usage in the MLRun UI.

%%writefile monitoring_application.py


from typing import Any, Union
import pandas as pd
import mlrun.model_monitoring.applications.context as mm_context
from mlrun.common.schemas.model_monitoring.constants import (
    ResultKindApp,
    ResultStatusApp,
)
from mlrun.model_monitoring.applications import (
    ModelMonitoringApplicationBase,
    ModelMonitoringApplicationResult,
    ModelMonitoringApplicationMetric,
)


class ModelMonitoringApplication(ModelMonitoringApplicationBase):
    name = "LLModelMonitoringApplication"

    def do_tracking(
        self,
        monitoring_context: mm_context.MonitoringApplicationContext,
    ) -> list[Union[ModelMonitoringApplicationResult, ModelMonitoringApplicationMetric]]:
        """Compute token-usage statistics over the sampled monitoring window."""
        df = monitoring_context.sample_df
        if df.empty:
            monitoring_context.logger.warning(
                "Empty dataframe received, skipping tracking"
            )
            return []
        # Process the dataframe into results and metrics
        results = []
        metrics = []
        # Calculate max, min, avg, and std for each token-usage column
        for column in ["completion_tokens", "prompt_tokens", "total_tokens"]:
            if column in df.columns:
                stats = self._calculate_max_min_avg_std(df, column)
                results.append(
                    self._create_result(
                        name=f"{column}_stats",
                        value=stats["avg"],
                        kind=ResultKindApp.model_performance,
                        threshold=1000,  # Example threshold
                    )
                )
                metrics.append(
                    self._create_metric(
                        name=f"{column}_max",
                        value=stats["max"],
                    )
                )
                metrics.append(
                    self._create_metric(
                        name=f"{column}_min",
                        value=stats["min"],
                    )
                )
                metrics.append(
                    self._create_metric(
                        name=f"{column}_std",
                        value=stats["std"],
                    )
                )
        return results + metrics


    @staticmethod
    def _calculate_max_min_avg_std(df: pd.DataFrame, column: str) -> dict:
        """
        Calculate max, min, avg, and std for a given column in the dataframe.
        """
        if column not in df.columns:
            raise ValueError(f"Column '{column}' does not exist in the dataframe.")

        return {
            "max": df[column].max(),
            "min": df[column].min(),
            "avg": df[column].mean(),
            "std": df[column].std(),
        }

    @staticmethod
    def _create_result(
        name: str,
        value: float,
        kind: ResultKindApp,
        threshold: float,
    ) -> ModelMonitoringApplicationResult:
        status = ResultStatusApp.no_detection
        if value > threshold:
            status = ResultStatusApp.detected
        return ModelMonitoringApplicationResult(
            name=name,
            value=value,
            kind=kind,
            status=status,
            extra_data={
                "threshold": threshold,
                "value": value,
            },
        )

    @staticmethod
    def _create_metric(
        name: str,
        value: float,
    ) -> ModelMonitoringApplicationMetric:
        return ModelMonitoringApplicationMetric(
            name=name,
            value=value,
        )

Deploy the monitoring application:#

llm_monitoring_app = project.set_model_monitoring_function(
    func="monitoring_application.py",
    application_class="ModelMonitoringApplication",
    name="llm-monitoring",
    image=image,
)

project.deploy_function(llm_monitoring_app)
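
The monitoring application only produces results once tracked traffic arrives and the monitoring controller runs its next processing cycle. A small illustrative sketch, reusing func from the deployment step above, sends a few questions so the token-usage statistics have samples to aggregate:

# Send a handful of requests so the monitoring window has token-usage samples.
# Results appear in the UI only after the next monitoring controller cycle.
questions = [
    "What is 12 * 7?",
    "What is 2**10 divided by 4?",
    "Add a 15% tip to a $249.99 bill.",
]
for q in questions:
    func.invoke("./", {"question": q})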