LangChain ✕ MLRun Integration#

langchain_mlrun is a hub module that integrates LangChain with MLRun. With it, MLRun can orchestrate LangChain and LangGraph code, and you can trace and monitor both batch workflows and real-time deployments.


Main Components#

This is a brief overview of the components you can import from the langchain_mlrun module. For full docs, see the documentation page.

Settings#

The module uses Pydantic settings classes that can be configured programmatically or via environment variables. The main class is MLRunTracerSettings. It contains two sub-settings:

  • MLRunTracerClientSettings - Connection settings (stream path, container, endpoint info). Env prefix: "LC_MLRUN_TRACER_CLIENT_"

  • MLRunTracerMonitorSettings - Controls what/how runs are captured (filters, labels, debug mode). Env prefix: "LC_MLRUN_TRACER_MONITOR_"

For more information about each setting, see the class docstrings.

Example - via code configuration#

from langchain_mlrun import MLRunTracerSettings, MLRunTracerClientSettings, MLRunTracerMonitorSettings

settings = MLRunTracerSettings(
    client=MLRunTracerClientSettings(
        stream_path="my-project/model-endpoints/stream-v1",
        container="projects",
        model_endpoint_name="my_endpoint",
        model_endpoint_uid="abc123",
        serving_function="my_function",
    ),
    monitor=MLRunTracerMonitorSettings(
        label="production",
        root_run_only=True,  # Only monitor root runs, not child runs
        tags_filter=["important"],  # Only monitor runs with this tag
    ),
)

Example - environment variable configuration#

export LC_MLRUN_TRACER_CLIENT_STREAM_PATH="my-project/model-endpoints/stream-v1"
export LC_MLRUN_TRACER_CLIENT_CONTAINER="projects"
export LC_MLRUN_TRACER_MONITOR_LABEL="production"
export LC_MLRUN_TRACER_MONITOR_ROOT_RUN_ONLY="true"
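
The prefix convention can be illustrated without the module itself. The following is a minimal, stdlib-only sketch of how variables might be grouped by prefix into per-class field names. `collect_prefixed` is a hypothetical helper and is not part of langchain_mlrun, which relies on its Pydantic settings classes for the actual parsing and type conversion.

```python
def collect_prefixed(env: dict[str, str], prefix: str) -> dict[str, str]:
    """Return {field_name: raw_value} for every variable that starts with `prefix`."""
    return {
        name[len(prefix):].lower(): value
        for name, value in env.items()
        if name.startswith(prefix)
    }


# Sample environment mirroring the export lines above, plus one unrelated variable.
sample_env = {
    "LC_MLRUN_TRACER_CLIENT_STREAM_PATH": "my-project/model-endpoints/stream-v1",
    "LC_MLRUN_TRACER_CLIENT_CONTAINER": "projects",
    "LC_MLRUN_TRACER_MONITOR_LABEL": "production",
    "LC_MLRUN_TRACER_MONITOR_ROOT_RUN_ONLY": "true",
    "PATH": "/usr/bin",
}

client_fields = collect_prefixed(sample_env, "LC_MLRUN_TRACER_CLIENT_")
monitor_fields = collect_prefixed(sample_env, "LC_MLRUN_TRACER_MONITOR_")

print(client_fields)
print(monitor_fields)
```

In this sketch the values stay raw strings (for example `"true"`); converting them to typed fields is left to the settings classes.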

MLRun Tracer#

MLRunTracer is a LangChain-compatible tracer that converts LangChain Run objects into MLRun monitoring events and publishes them to the project's monitoring stream (V3IO in MLRun Enterprise, Kafka in MLRun CE).

Key points:

  • No inheritance required - use it directly without subclassing.

  • Fully customizable via settings - control filtering, summarization, and output format.

  • Custom summarizer support - pass your own run_summarizer_function via settings to customize how runs are converted to events.
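
To make the filtering idea concrete, here is a small sketch of the kind of decision that settings such as root_run_only and tags_filter describe. It works on plain dictionaries standing in for LangChain Run objects and is not the tracer's implementation. `should_monitor` is a hypothetical name, and the rule that a run passes when it carries at least one of the listed tags is an assumption.

```python
from typing import Any, Optional


def should_monitor(
    run: dict[str, Any],
    root_run_only: bool = False,
    tags_filter: Optional[list[str]] = None,
) -> bool:
    """Decide whether a run (a plain-dict stand-in) would be kept."""
    # A root run is one without a parent.
    if root_run_only and run.get("parent_run_id") is not None:
        return False
    # Assumption: the run passes if it has at least one of the requested tags.
    if tags_filter and not set(tags_filter) & set(run.get("tags", [])):
        return False
    return True


runs = [
    {"name": "agent", "parent_run_id": None, "tags": ["important"]},
    {"name": "llm", "parent_run_id": "r1", "tags": ["important"]},
    {"name": "chain", "parent_run_id": None, "tags": []},
]

kept = [
    run["name"]
    for run in runs
    if should_monitor(run, root_run_only=True, tags_filter=["important"])
]
print(kept)
```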

Monitoring Setup Utility Function#

setup_langchain_monitoring() is a utility function that creates the necessary MLRun infrastructure for LangChain monitoring. This is a temporary workaround until custom endpoint creation support is added to MLRun.

The function returns a dictionary of environment variables to configure auto-tracing. See how to use it in the tutorial section below.

LangChain Monitoring Application#

LangChainMonitoringApp is a base class (inheriting from MLRun’s ModelMonitoringApplicationBase) for building monitoring applications that process events from the MLRun Tracer.

It offers several built-in helper methods and metrics for analyzing LangChain runs:

  • Helper methods:

    • get_structured_runs() - Parse raw monitoring samples into structured run dictionaries with filtering options

    • iterate_structured_runs() - Iterate over all runs including nested child runs

  • Metric methods:

    • calculate_average_latency() - Average latency across root runs

    • calculate_success_rate() - Percentage of runs without errors

    • count_token_usage() - Total input/output tokens from LLM runs

    • count_run_names() - Count occurrences of each run name

The base app can be used as-is, but it is recommended to extend it with your own custom monitoring logic.
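
As a rough illustration of what these helpers and metrics compute, the sketch below reimplements them over plain dictionaries. The run shape (keys such as `latency`, `error`, `run_type`, `input_tokens`, `output_tokens`, `child_runs`) and all function names are hypothetical; the structures returned by the real get_structured_runs() may differ, so treat this as a conceptual sketch rather than the app's code.

```python
from collections import Counter
from typing import Any, Iterator

Run = dict[str, Any]  # hypothetical structured-run shape


def iterate_runs(runs: list[Run]) -> Iterator[Run]:
    """Yield every run depth-first, including nested child runs."""
    for run in runs:
        yield run
        yield from iterate_runs(run.get("child_runs", []))


def average_latency(root_runs: list[Run]) -> float:
    """Mean latency across root runs only."""
    if not root_runs:
        return 0.0
    return sum(run["latency"] for run in root_runs) / len(root_runs)


def success_rate(root_runs: list[Run]) -> float:
    """Percentage of root runs that finished without an error."""
    if not root_runs:
        return 0.0
    succeeded = sum(1 for run in root_runs if run.get("error") is None)
    return 100.0 * succeeded / len(root_runs)


def token_usage(root_runs: list[Run]) -> dict[str, int]:
    """Total input/output tokens over all LLM runs, at any nesting level."""
    totals = {"input_tokens": 0, "output_tokens": 0}
    for run in iterate_runs(root_runs):
        if run.get("run_type") == "llm":
            totals["input_tokens"] += run.get("input_tokens", 0)
            totals["output_tokens"] += run.get("output_tokens", 0)
    return totals


def run_name_counts(root_runs: list[Run]) -> Counter:
    """Count how often each run name occurs, children included."""
    return Counter(run["name"] for run in iterate_runs(root_runs))


sample_runs = [
    {"name": "agent", "run_type": "chain", "latency": 1.0, "error": None,
     "child_runs": [{"name": "ChatModel", "run_type": "llm", "latency": 0.5,
                     "error": None, "input_tokens": 10, "output_tokens": 5}]},
    {"name": "agent", "run_type": "chain", "latency": 3.0, "error": "timeout",
     "child_runs": [{"name": "ChatModel", "run_type": "llm", "latency": 2.5,
                     "error": "timeout", "input_tokens": 7, "output_tokens": 0}]},
]

print(average_latency(sample_runs))
print(success_rate(sample_runs))
print(token_usage(sample_runs))
print(run_name_counts(sample_runs))
```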


How to Apply MLRun?#

Auto Tracing#

Auto tracing instruments all LangChain code automatically. To enable it, set the LC_MLRUN_MONITORING_ENABLED environment variable and then import the module:

import os

import mlrun

os.environ["LC_MLRUN_MONITORING_ENABLED"] = "1"
# Set other LC_MLRUN_TRACER_* environment variables as needed...

# Import the module BEFORE any LangChain code
langchain_mlrun = mlrun.import_module("hub://langchain_mlrun")

# All LangChain/LangGraph code below will be automatically traced
chain.invoke(...)

Manual Tracing#

For more control, use the mlrun_monitoring() context manager to trace specific code blocks:

import mlrun

langchain_mlrun = mlrun.import_module("hub://langchain_mlrun")
mlrun_monitoring = langchain_mlrun.mlrun_monitoring
MLRunTracerSettings = langchain_mlrun.MLRunTracerSettings

# Optional: customize settings
settings = MLRunTracerSettings(...)

with mlrun_monitoring(settings=settings) as tracer:
    # Only LangChain code within this block will be traced
    result = chain.invoke({"topic": "MLRun"})

Tutorial#

In this tutorial we’ll show how to orchestrate LangChain-based code with MLRun using the langchain_mlrun hub module.

Prerequisites#

Install MLRun and the langchain_mlrun requirements.

!pip install mlrun langchain~=1.2 pydantic-settings~=2.12 kafka-python~=2.3

Local Development Setup (Optional)#

Skip this section if you’re running inside a Jupyter instance deployed in the MLRun cluster.

If you’re running this notebook from your local machine, follow these steps:

Step 1: Set Environment Variables#

In the cell below, uncomment the lines that apply to your environment, fill in the values, and run it to set the environment variables required for local development.

import os

# MLRun API endpoint:
# os.environ["MLRUN_DBPATH"] = "http://localhost:30070"

# Kafka Configuration:
# os.environ["KAFKA_BROKER"] = "<kafka-broker-address>"

# TDEngine Configuration:
# os.environ["TDENGINE_HOST"] = "<tdengine-host>"
# os.environ["TDENGINE_PORT"] = "<tdengine-port>"
# os.environ["TDENGINE_USER"] = "<tdengine-username>"
# os.environ["TDENGINE_PASSWORD"] = "<tdengine-password>"

# MinIO/S3 Configuration:
# os.environ["AWS_ACCESS_KEY_ID"] = "<your-minio-access-key>"
# os.environ["AWS_SECRET_ACCESS_KEY"] = "<your-minio-secret-key>"
# os.environ["AWS_ENDPOINT_URL_S3"] = "<s3-endpoint-url>"
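
Before continuing, it can help to confirm that nothing was left unset. The following is a small sketch of such a check. `missing_env_vars` is a hypothetical helper, and the assumption that all nine variables are needed applies to a local MLRun CE setup; adjust the list to your deployment.

```python
import os
from typing import Mapping, Optional

# Variable names taken from the cell above (assumed to be needed for local CE use).
REQUIRED_FOR_LOCAL_CE = [
    "MLRUN_DBPATH",
    "KAFKA_BROKER",
    "TDENGINE_HOST",
    "TDENGINE_PORT",
    "TDENGINE_USER",
    "TDENGINE_PASSWORD",
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "AWS_ENDPOINT_URL_S3",
]


def missing_env_vars(
    names: list[str], env: Optional[Mapping[str, str]] = None
) -> list[str]:
    """Return the names that are unset or empty in `env` (defaults to os.environ)."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


missing = missing_env_vars(REQUIRED_FOR_LOCAL_CE)
if missing:
    print(f"Missing environment variables: {', '.join(missing)}")
else:
    print("All local development variables are set.")
```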

Step 2: Set Up Port Forwarding#

Set up port-forwarding to access cluster services. Run these commands in separate terminal windows:

# MLRun API
kubectl port-forward -n mlrun svc/mlrun-api 30070:8080
# MinIO (S3-compatible storage)
kubectl port-forward -n mlrun svc/minio 9000:9000
# Kafka (for CE mode) - requires /etc/hosts entry: 127.0.0.1 kafka-stream
kubectl port-forward -n mlrun svc/kafka-stream 9092:9092
# TDEngine (for CE mode) - requires /etc/hosts entry: 127.0.0.1 tdengine-tsdb
kubectl port-forward -n mlrun svc/tdengine-tsdb 6041:6041
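
Once the port-forwards are running, a quick reachability check from Python can catch a forgotten terminal window. This is a minimal sketch using only the standard library. `is_port_open` is a hypothetical helper; the port numbers are the local ports from the commands above. It only verifies that something accepts TCP connections, not that the service behind it is healthy.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Local ports used by the port-forward commands above.
FORWARDED_PORTS = {
    "MLRun API": 30070,
    "MinIO": 9000,
    "Kafka": 9092,
    "TDEngine": 6041,
}

for service, port in FORWARDED_PORTS.items():
    status = "reachable" if is_port_open("127.0.0.1", port) else "not reachable"
    print(f"{service} (localhost:{port}): {status}")
```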

Create Project#

We’ll first create an MLRun project.

import os
import time
import datetime
import mlrun

print(f"MLRun version: {mlrun.__version__}")
print(f"CE Mode: {mlrun.mlconf.is_ce_mode()}")

project = mlrun.get_or_create_project("langchain-mlrun-tutorial")
print(f"Project: {project.name}")
MLRun version: 1.10.0
CE Mode: True
> 2026-02-03 21:43:18,053 [info] Loading project from path: {"path":"./","project_name":"langchain-mlrun-tutorial","user_project":false}
> 2026-02-03 21:43:18,141 [info] Project loaded successfully: {"path":"./","project_name":"langchain-mlrun-tutorial","stored_in_db":true}
Project: langchain-mlrun-tutorial

Enable Monitoring#

To use MLRun’s monitoring feature in our project we first need to set up the monitoring infrastructure.

  • MLRun CE: Uses Kafka for streaming (automatically detected)

  • MLRun Enterprise: Uses V3IO for streaming (automatically detected)

The cell below automatically detects your MLRun mode and sets up the appropriate streaming infrastructure.

# Create datastore profiles (based on CE or Enterprise):
if mlrun.mlconf.is_ce_mode():
    print("Setting up Kafka streaming for MLRun CE...")
    from mlrun.datastore.datastore_profile import DatastoreProfileKafkaStream, DatastoreProfileTDEngine
    
    stream_profile = DatastoreProfileKafkaStream(
        name="kafka-stream-profile",
        brokers=os.environ["KAFKA_BROKER"],
        topics=[],
    )
    tsdb_profile = DatastoreProfileTDEngine(
        name="tsdb-profile",
        user=os.environ["TDENGINE_USER"],
        password=os.environ["TDENGINE_PASSWORD"],
        host=os.environ["TDENGINE_HOST"],
        port=int(os.environ["TDENGINE_PORT"]),
    )
    project.register_datastore_profile(stream_profile)
    project.register_datastore_profile(tsdb_profile)
else:  # Enterprise
    print("Setting up V3IO streaming for MLRun Enterprise...")
    from mlrun.datastore import DatastoreProfileV3io
    
    stream_profile = DatastoreProfileV3io(name="v3io-ds", v3io_access_key=os.environ["V3IO_ACCESS_KEY"])
    tsdb_profile = stream_profile
    project.register_datastore_profile(stream_profile)

# Enable monitoring in our project:
project.set_model_monitoring_credentials(
    stream_profile_name=stream_profile.name,
    tsdb_profile_name=tsdb_profile.name,
)
project.enable_model_monitoring(
    base_period=1,
    wait_for_deployment=True,
)

print("Monitoring enabled successfully!")

Import langchain_mlrun#

Now we’ll import langchain_mlrun from the hub.

# Import the module from the hub:
langchain_mlrun = mlrun.import_module("hub://langchain_mlrun")

# Import the utility function and monitoring application from the module:
setup_langchain_monitoring = langchain_mlrun.setup_langchain_monitoring
LangChainMonitoringApp = langchain_mlrun.LangChainMonitoringApp

Create Monitorable Endpoint#

Endpoints are the entities being monitored by MLRun. We’ll use the setup_langchain_monitoring() utility function to create the model monitoring endpoint.

For MLRun CE mode, you must pass the kafka_stream_profile_name parameter with the name of the registered Kafka stream profile.

By default, the endpoint name will be "langchain_mlrun_endpoint" but you can change it by using the model_endpoint_name parameter.

# Pass kafka_stream_profile_name for CE mode (required)
env_vars = setup_langchain_monitoring(
    kafka_stream_profile_name=stream_profile.name if mlrun.mlconf.is_ce_mode() else None
)
Creating LangChain model endpoint

  [✓] Loading Project......................... Done (0.00s)
  [✓] Creating Model.......................... Done (0.31s)                            
  [✓] Creating Function....................... Done (0.04s)                                  
  [✓] Creating Model Endpoint................. Done (0.09s)                        

✨ Done! LangChain monitoring model endpoint created successfully.
You can now set the following environment variables to enable MLRun tracing in your LangChain code:

{
    "MLRUN_MONITORING_ENABLED": "1",
    "MLRUN_TRACER_CLIENT_PROJECT": "langchain-mlrun-tutorial",
    "MLRUN_TRACER_CLIENT_MODEL_ENDPOINT_NAME": "langchain_mlrun_endpoint",
    "MLRUN_TRACER_CLIENT_MODEL_ENDPOINT_UID": "d1d2b2686772441cacf687b45cd48ffa",
    "MLRUN_TRACER_CLIENT_SERVING_FUNCTION": "langchain_mlrun_function",
    "MLRUN_TRACER_CLIENT_KAFKA_STREAM_PROFILE_NAME": "kafka-stream-profile"
}

To customize the monitoring behavior, you can also set additional environment variables prefixed with 'MLRUN_TRACER_MONITOR_'. Refer to the MLRun tracer documentation for more details.

Setup Environment Variables for Auto Tracing#

We’ll use the environment variables returned from setup_langchain_monitoring to set up the environment for auto-tracing. Read the printed outputs for more information.

os.environ.update(env_vars)

Run langchain or langgraph Code#

Below are three functions, each using LLMs in a different way with langchain and langgraph:

  • run_simple_chain - Using langchain’s chains.

  • run_simple_agent - Using langchain’s create_agent function and tools.

  • run_langgraph_graph - Using pure langgraph.

Note: You don’t need to set OpenAI API credentials. If they are not set in the environment, a mock chat model is used instead. If you wish to use OpenAI models, make sure you pip install langchain-openai and set the OPENAI_API_KEY environment variable before continuing to the next cell.

Because the auto-tracing environment is set, any run will be automatically traced and monitored!

Feel free to adjust the code as you like.

Remember: To enable auto-tracing you do need to set the environment variables and import the langchain_mlrun module before any LangChain code. For batch jobs and realtime functions, make sure you set env vars in the MLRun function and add the import line langchain_mlrun = mlrun.import_module("hub://langchain_mlrun") at the top of your code.

import os
from typing import Literal, TypedDict, Annotated, Sequence, Any, Callable
from operator import add

from langchain_core.language_models import LanguageModelInput
from langchain_core.runnables import Runnable, RunnableLambda
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.language_models.fake_chat_models import FakeListChatModel, GenericFakeChatModel
from langchain.agents import create_agent
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.tools import tool, BaseTool

from langgraph.graph import StateGraph, START, END
from langchain_core.messages import BaseMessage


def _check_openai_credentials() -> bool:
    """
    Check if OpenAI API key is set in environment variables.

    :return: True if OPENAI_API_KEY is set, False otherwise.
    """
    return "OPENAI_API_KEY" in os.environ


# Import ChatOpenAI only if OpenAI credentials are available (meaning `langchain-openai` must be installed).
if _check_openai_credentials():
    from langchain_openai import ChatOpenAI

    
class _ToolEnabledFakeModel(GenericFakeChatModel):
    """
    A fake chat model that supports tool binding for running agent tracing tests.
    """

    def bind_tools(
        self,
        tools: Sequence[
            dict[str, Any] | type | Callable | BaseTool  # noqa: UP006
        ],
        *,
        tool_choice: str | None = None,
        **kwargs: Any,
    ) -> Runnable[LanguageModelInput, AIMessage]:
        return self


#: Tag value for testing tag filtering.
_dummy_tag = "dummy_tag"


def run_simple_chain() -> str:
    """
    Run a simple LangChain chain that gets a fact about a topic.
    """
    # Build a simple chain: prompt -> llm -> str output parser
    llm = ChatOpenAI(
        model="gpt-4o-mini",
        tags=[_dummy_tag]
    ) if _check_openai_credentials() else (
        FakeListChatModel(
            responses=[
                "MLRun is an open-source orchestrator for machine learning pipelines."
            ],
            tags=[_dummy_tag]
        )
    )
    prompt = ChatPromptTemplate.from_template("Tell me a short fact about {topic}")
    chain = prompt | llm | StrOutputParser()

    # Run the chain:
    response = chain.invoke({"topic": "MLRun"})
    return response


def run_simple_agent():
    """
    Run a simple LangChain agent that uses two tools to get weather and stock price.
    """
    # Define the tools:
    @tool
    def get_weather(city: str) -> str:
        """Get the current weather for a specific city."""
        return f"The weather in {city} is 22°C and sunny."

    @tool
    def get_stock_price(symbol: str) -> str:
        """Get the current stock price for a symbol."""
        return f"The stock price for {symbol} is $150.25."

    # Define the model:
    model = ChatOpenAI(
        model="gpt-4o-mini",
        tags=[_dummy_tag]
    ) if _check_openai_credentials() else (
        _ToolEnabledFakeModel(
            messages=iter(
                [
                    AIMessage(
                        content="",
                        tool_calls=[
                            {"name": "get_weather", "args": {"city": "London"}, "id": "call_abc123"},
                            {"name": "get_stock_price", "args": {"symbol": "AAPL"}, "id": "call_def456"}
                        ]
                    ),
                    AIMessage(content="The weather in London is 22°C and AAPL is trading at $150.25.")
                ]
            ),
            tags=[_dummy_tag]
        )
    )

    # Create the agent:
    agent = create_agent(
        model=model,
        tools=[get_weather, get_stock_price],
        system_prompt="You are a helpful assistant with access to tools."
    )

    # Run the agent:
    return agent.invoke({"messages": ["What is the weather in London and the stock price of AAPL?"]})


def run_langgraph_graph():
    """
    Run a LangGraph agent that uses reflection to correct its answer.
    """
    # Define the graph state:
    class AgentState(TypedDict):
        messages: Annotated[list[BaseMessage], add]
        attempts: int

    # Define the model:
    model = ChatOpenAI(model="gpt-4o-mini") if _check_openai_credentials() else (
        _ToolEnabledFakeModel(
            messages=iter(
                [
                    AIMessage(content="There are 2 'r's in Strawberry."),  # Mocking the failure
                    AIMessage(content="I stand corrected. S-t-r-a-w-b-e-r-r-y. There are 3 'r's."),  # Mocking the fix
                ]
            )
        )
    )

    # Define the graph nodes and router:
    def call_model(state: AgentState):
        response = model.invoke(state["messages"])
        return {"messages": [response], "attempts": state["attempts"] + 1}

    def reflect_node(state: AgentState):
        prompt = "Wait, count the 'r's again slowly, letter by letter. Are you sure?"
        return {"messages": [HumanMessage(content=prompt)]}

    def router(state: AgentState) -> Literal["reflect", END]:
        # Make sure there are 2 attempts at least for an answer:
        if state["attempts"] == 1:
            return "reflect"
        return END

    # Build the graph:
    builder = StateGraph(AgentState)
    builder.add_node("model", call_model)
    tagged_reflect_node = RunnableLambda(reflect_node).with_config(tags=[_dummy_tag])
    builder.add_node("reflect", tagged_reflect_node)
    builder.add_edge(START, "model")
    builder.add_conditional_edges("model", router)
    builder.add_edge("reflect", "model")
    graph = builder.compile()

    # Run the graph:
    return graph.invoke({"messages": [HumanMessage(content="How many 'r's in Strawberry?")], "attempts": 0})

Let’s create some traffic by running one of the functions in a loop to generate events. We record timestamps before and after the loop so that we can later run the monitoring application on the data we send.

# Run LangChain code and now it should be tracked and monitored in MLRun:
start_timestamp = datetime.datetime.now() - datetime.timedelta(minutes=1)
for i in range(20):
    run_simple_agent()
end_timestamp = datetime.datetime.now() + datetime.timedelta(minutes=5)
> 2026-02-04 00:05:52,553 [info] Project loaded successfully: {"project_name":"langchain-mlrun-tutorial"}

Note: Please wait a minute or two until the events are processed.

time.sleep(60)
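
The timestamps above pad the traffic loop on both sides, presumably so that every event falls inside the evaluation window even if it is processed with some delay. The sketch below isolates that calculation using fixed example times so the result is reproducible. `evaluation_window` is a hypothetical helper, and the one-minute and five-minute margins simply mirror the values used above.

```python
import datetime


def evaluation_window(
    first_event: datetime.datetime,
    last_event: datetime.datetime,
    lead: datetime.timedelta = datetime.timedelta(minutes=1),
    tail: datetime.timedelta = datetime.timedelta(minutes=5),
) -> tuple[str, str]:
    """Return ISO-formatted (start, end) bounds padded around the traffic."""
    return (first_event - lead).isoformat(), (last_event + tail).isoformat()


# Fixed example times; in the tutorial these would come from datetime.datetime.now().
first = datetime.datetime(2026, 2, 3, 21, 49, 0)
last = datetime.datetime(2026, 2, 3, 21, 49, 30)

start, end = evaluation_window(first, last)
print(start)  # 2026-02-03T21:48:00
print(end)    # 2026-02-03T21:54:30
```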

Test the LangChain Monitoring Application#

To test a monitoring application, we use the evaluate class method. We’ll run an evaluation on the data we just sent. It is a small local job and should run fast.

Keep an eye out for the metrics returned by the monitoring application.

LangChainMonitoringApp.evaluate(
    func_name="langchain-monitoring-app-test",
    func_path="langchain_mlrun.py",
    run_local=True,
    endpoints=[env_vars["LC_MLRUN_TRACER_CLIENT_MODEL_ENDPOINT_NAME"]],
    start=start_timestamp.isoformat(),
    end=end_timestamp.isoformat(),
)
> 2026-02-03 21:50:26,671 [info] Changing function name - adding `"-batch"` suffix: {"func_name":"langchain-monitoring-app-test-batch"}
> 2026-02-03 21:50:26,815 [warning] It is recommended to use k8s secret (specify secret_name), specifying aws_access_key/aws_secret_key directly is unsafe.
> 2026-02-03 21:50:26,829 [info] Storing function: {"db":"http://localhost:30070","name":"langchain-monitoring-app-test-batch--handler","uid":"f2c3c94681094915beb2c5c1ccc0dac8"}
> 2026-02-03 21:50:27,953 [warning] No data was found for any of the specified endpoints. No results were produced: {"application_name":"langchain-monitoring-app-test-batch","end":"2026-02-03T21:54:26.640556","endpoints":["langchain_mlrun_endpoint"],"start":"2026-02-03T21:48:24.904667"}
project:    langchain-mlrun-tutorial
uid:        ...c0dac8
iter:       0
start:      Feb 03 19:50:26
end:        NaT
state:      completed
kind:       run
name:       langchain-monitoring-app-test-batch--handler
labels:     kind=local, owner=Tomer_Weitzman, host=M-QXN63PHMF9
inputs:
parameters: endpoints=['langchain_mlrun_endpoint'], start=2026-02-03T21:48:24.904667, end=2026-02-03T21:54:26.640556, base_period=None, write_output=False, existing_data_handling=fail_on_overlap, stream_profile=None
results:

> to track results use the .show() or .logs() methods
> 2026-02-03 21:50:28,001 [info] Run execution finished: {"name":"langchain-monitoring-app-test-batch--handler","status":"completed"}
<mlrun.model.RunObject at 0x132cfaa50>

Deploy the Monitoring Application#

All that’s left to do now is to deploy our monitoring application!

# Deploy the monitoring app:
LangChainMonitoringApp.deploy(
    func_name="langchain-monitoring-app",
    func_path="langchain_mlrun.py",
    image="mlrun/mlrun",
    requirements=[
        "langchain",
        "pydantic-settings",
    ],
)

Once it is deployed, you can run events again and see the monitoring application in MLRun UI in action:

[Image: MLRun UI example]