LangChain ✕ MLRun Integration#
langchain_mlrun is a hub module that implements LangChain integration with MLRun. Using the module allows MLRun to orchestrate LangChain and LangGraph code, enabling tracing and monitoring batch workflows and realtime deployments.
Main Components#
This is a short brief of the components available to import from the langchain_mlrun module. For full docs, see the documentation page.
Settings#
The module uses Pydantic settings classes that can be configured programmatically or via environment variables. The main class is MLRunTracerSettings. It contains two sub-settings:
MLRunTracerClientSettings- Connection settings (stream path, container, endpoint info). Env prefix:"LC_MLRUN_TRACER_CLIENT_"MLRunTracerMonitorSettings- Controls what/how runs are captured (filters, labels, debug mode). Env prefix:"LC_MLRUN_TRACER_MONITOR_"
For more information about each setting, see the class docstrings.
Example - via code configuration#
from langchain_mlrun import MLRunTracerSettings, MLRunTracerClientSettings, MLRunTracerMonitorSettings
settings = MLRunTracerSettings(
client=MLRunTracerClientSettings(
stream_path="my-project/model-endpoints/stream-v1",
container="projects",
model_endpoint_name="my_endpoint",
model_endpoint_uid="abc123",
serving_function="my_function",
),
monitor=MLRunTracerMonitorSettings(
label="production",
root_run_only=True, # Only monitor root runs, not child runs
tags_filter=["important"], # Only monitor runs with this tag
),
)
Example - environment variable configuration#
export LC_MLRUN_TRACER_CLIENT_STREAM_PATH="my-project/model-endpoints/stream-v1"
export LC_MLRUN_TRACER_CLIENT_CONTAINER="projects"
export LC_MLRUN_TRACER_MONITOR_LABEL="production"
export LC_MLRUN_TRACER_MONITOR_ROOT_RUN_ONLY="true"
MLRun Tracer#
MLRunTracer is a LangChain-compatible tracer that converts LangChain Run objects into MLRun monitoring events and publishes them to a V3IO stream.
Key points:
No inheritance required - use it directly without subclassing.
Fully customizable via settings - control filtering, summarization, and output format.
Custom summarizer support - pass your own
run_summarizer_functionvia settings to customize how runs are converted to events.
Monitoring Setup Utility Function#
setup_langchain_monitoring() is a utility function that creates the necessary MLRun infrastructure for LangChain monitoring. This is a temporary workaround until custom endpoint creation support is added to MLRun.
The function returns a dictionary of environment variables to configure auto-tracing. See how to use it in the tutorial section below.
LangChain Monitoring Application#
LangChainMonitoringApp is a base class (inheriting from MLRun’s ModelMonitoringApplicationBase) for building monitoring applications that process events from the MLRun Tracer.
It offers several built-in helper methods and metrics for analyzing LangChain runs:
Helper methods:
get_structured_runs()- Parse raw monitoring samples into structured run dictionaries with filtering optionsiterate_structured_runs()- Iterate over all runs including nested child runs
Metric methods:
calculate_average_latency()- Average latency across root runscalculate_success_rate()- Percentage of runs without errorscount_token_usage()- Total input/output tokens from LLM runscount_run_names()- Count occurrences of each run name
The base app can be used as-is, but it is recommended to extend it with your own custom monitoring logic.
How to Apply MLRun?#
Auto Tracing#
Auto tracing automatically instruments all LangChain code by setting the LC_MLRUN_MONITORING_ENABLED environment variable and importing the module:
import os
os.environ["LC_MLRUN_MONITORING_ENABLED"] = "1"
# Set other LC_MLRUN_TRACER_* environment variables as needed...
# Import the module BEFORE any LangChain code
langchain_mlrun = mlrun.import_module("hub://langchain_mlrun")
# All LangChain/LangGraph code below will be automatically traced
chain.invoke(...)
Manual Tracing#
For more control, use the mlrun_monitoring() context manager to trace specific code blocks:
langchain_mlrun = mlrun.import_module("hub://langchain_mlrun")
mlrun_monitoring = langchain_mlrun.mlrun_monitoring
MLRunTracerSettings = langchain_mlrun.MLRunTracerSettings
# Optional: customize settings
settings = MLRunTracerSettings(...)
with mlrun_monitoring(settings=settings) as tracer:
# Only LangChain code within this block will be traced
result = chain.invoke({"topic": "MLRun"})
Tutorial#
In this tutorial we’ll show how to orchestrate LangChain based code with MLRun using the langchain_mlrun hub module.
Prerequisites#
Install MLRun and the langchain_mlrun requirements.
!pip install mlrun langchain~=1.2 pydantic-settings~=2.12 kafka-python~=2.3
Local Development Setup (Optional)#
Skip this section if you’re running inside a Jupyter instance deployed in the MLRun cluster.
If you’re running this notebook from your local machine, follow these steps:
Step 1: Set Environment Variables#
Run the cell below to set up all required environment variables for local development.
import os
# MLRun API endpoint:
# os.environ["MLRUN_DBPATH"] = "http://localhost:30070"
# Kafka Configuration:
# os.environ["KAFKA_BROKER"] = "<kafka-broker-address>"
# TDEngine Configuration:
# os.environ["TDENGINE_HOST"] = "<tdengine-host>"
# os.environ["TDENGINE_PORT"] = "<tdengine-port>"
# os.environ["TDENGINE_USER"] = "<tdengine-username>"
# os.environ["TDENGINE_PASSWORD"] = "<tdengine-password>"
# MinIO/S3 Configuration:
# os.environ["AWS_ACCESS_KEY_ID"] = "<your-minio-access-key>"
# os.environ["AWS_SECRET_ACCESS_KEY"] = "<your-minio-secret-key>"
# os.environ["AWS_ENDPOINT_URL_S3"] = "<s3-endpoint-url>"
Step 2: Set Up Port Forwarding#
Set up port-forwarding to access cluster services. Run these commands in separate terminal windows:
# MLRun API
kubectl port-forward -n mlrun svc/mlrun-api 30070:8080
# MinIO (S3-compatible storage)
kubectl port-forward -n mlrun svc/minio 9000:9000
# Kafka (for CE mode) - requires /etc/hosts entry: 127.0.0.1 kafka-stream
kubectl port-forward -n mlrun svc/kafka-stream 9092:9092
# TDEngine (for CE mode) - requires /etc/hosts entry: 127.0.0.1 tdengine-tsdb
kubectl port-forward -n mlrun svc/tdengine-tsdb 6041:6041
Create Project#
We’ll first create an MLRun project
import time
import datetime
import mlrun
print(f"MLRun version: {mlrun.__version__}")
print(f"CE Mode: {mlrun.mlconf.is_ce_mode()}")
project = mlrun.get_or_create_project("langchain-mlrun-tutorial")
print(f"Project: {project.name}")
MLRun version: 1.10.0
CE Mode: True
> 2026-02-03 21:43:18,053 [info] Loading project from path: {"path":"./","project_name":"langchain-mlrun-tutorial","user_project":false}
> 2026-02-03 21:43:18,141 [info] Project loaded successfully: {"path":"./","project_name":"langchain-mlrun-tutorial","stored_in_db":true}
Project: langchain-mlrun-tutorial
Enable Monitoring#
To use MLRun’s monitoring feature in our project we first need to set up the monitoring infrastructure.
MLRun CE: Uses Kafka for streaming (automatically detected)
MLRun Enterprise: Uses V3IO for streaming (automatically detected)
The cell below automatically detects your MLRun mode and sets up the appropriate streaming infrastructure.
# Create datastore profiles (based on CE or Enterprise):
if mlrun.mlconf.is_ce_mode():
print("Setting up Kafka streaming for MLRun CE...")
from mlrun.datastore.datastore_profile import DatastoreProfileKafkaStream, DatastoreProfileTDEngine
stream_profile = DatastoreProfileKafkaStream(
name="kafka-stream-profile",
brokers=os.environ["KAFKA_BROKER"],
topics=[],
)
tsdb_profile = DatastoreProfileTDEngine(
name="tsdb-profile",
user=os.environ["TDENGINE_USER"],
password=os.environ["TDENGINE_PASSWORD"],
host=os.environ["TDENGINE_HOST"],
port=int(os.environ["TDENGINE_PORT"]),
)
project.register_datastore_profile(stream_profile)
project.register_datastore_profile(tsdb_profile)
else: # Enterprise
print("Setting up V3IO streaming for MLRun Enterprise...")
from mlrun.datastore import DatastoreProfileV3io
stream_profile = DatastoreProfileV3io(name="v3io-ds", v3io_access_key=os.environ["V3IO_ACCESS_KEY"])
tsdb_profile = stream_profile
project.register_datastore_profile(stream_profile)
# Enable monitoring in our project:
project.set_model_monitoring_credentials(
stream_profile_name=stream_profile.name,
tsdb_profile_name=tsdb_profile.name,
)
project.enable_model_monitoring(
base_period=1,
wait_for_deployment=True,
)
print("Monitoring enabled successfully!")
Import langchain_mlrun#
Now we’ll import langchain_mlrun from the hub.
# Import the module from the hub:
langchain_mlrun = mlrun.import_module("hub://langchain_mlrun")
# Import the utility function and monitoring application from the module:
setup_langchain_monitoring = langchain_mlrun.setup_langchain_monitoring
LangChainMonitoringApp = langchain_mlrun.LangChainMonitoringApp
Create Monitorable Endpoint#
Endpoints are the entities being monitored by MLRun. We’ll use the setup_langchain_monitoring() utility function to create the model monitoring endpoint.
For MLRun CE mode, you must pass the kafka_stream_profile_name parameter with the name of the registered Kafka stream profile.
By default, the endpoint name will be "langchain_mlrun_endpoint" but you can change it by using the model_endpoint_name parameter.
# Pass kafka_stream_profile_name for CE mode (required)
env_vars = setup_langchain_monitoring(
kafka_stream_profile_name=stream_profile.name if mlrun.mlconf.is_ce_mode() else None
)
Creating LangChain model endpoint
[✓] Loading Project......................... Done (0.00s)
[✓] Creating Model.......................... Done (0.31s)
[✓] Creating Function....................... Done (0.04s)
[✓] Creating Model Endpoint................. Done (0.09s)
✨ Done! LangChain monitoring model endpoint created successfully.
You can now set the following environment variables to enable MLRun tracing in your LangChain code:
{
"MLRUN_MONITORING_ENABLED": "1",
"MLRUN_TRACER_CLIENT_PROJECT": "langchain-mlrun-tutorial",
"MLRUN_TRACER_CLIENT_MODEL_ENDPOINT_NAME": "langchain_mlrun_endpoint",
"MLRUN_TRACER_CLIENT_MODEL_ENDPOINT_UID": "d1d2b2686772441cacf687b45cd48ffa",
"MLRUN_TRACER_CLIENT_SERVING_FUNCTION": "langchain_mlrun_function",
"MLRUN_TRACER_CLIENT_KAFKA_STREAM_PROFILE_NAME": "kafka-stream-profile"
}
To customize the monitoring behavior, you can also set additional environment variables prefixed with 'MLRUN_TRACER_MONITOR_'. Refer to the MLRun tracer documentation for more details.
Setup Environment Variables for Auto Tracing#
We’ll use the environment variables returned from setup_langchain_monitoring to setup the environment for auto-tracing. Read the printed outputs for more information.
os.environ.update(env_vars)
Run langchain or langgraph Code#
Here we have 3 functions, each using different method utilizing LLMs with langchain and langgraph:
run_simple_chain- Usinglangchain’s chains.run_simple_agent- Usinglangchain’screate_agentfunction andtools.run_langgraph_graph- Using purelanggraph.
Notice: You don’t need to set OpenAI API credentials, there is a mock
ChatModelthat will replace it if the credentials are not set in the environment. If you wish to use OpenAI models, make sure youpip install langchain_openaiand set theOPENAI_API_KEYenvironment variable before continue to the next cell.
Because the auto-tracing environment is set, any run will be automatically traced and monitored!
Feel free to adjust the code as you like.
Remember: To enable auto-tracing you do need to set the environment variables and import the
langchain_mlrunmodule before any LangChain code. For batch jobs and realtime functions, make sure you set env vars in the MLRun function and add the import linelangchain_mlrun = mlrun.import_module("hub://langchain_mlrun")at the top of your code.
import os
from typing import Literal, TypedDict, Annotated, Sequence, Any, Callable
from operator import add
from langchain_core.language_models import LanguageModelInput
from langchain_core.runnables import Runnable, RunnableLambda
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.language_models.fake_chat_models import FakeListChatModel, GenericFakeChatModel
from langchain.agents import create_agent
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.tools import tool, BaseTool
from langgraph.graph import StateGraph, START, END
from langchain_core.messages import BaseMessage
def _check_openai_credentials() -> bool:
"""
Check if OpenAI API key is set in environment variables.
:return: True if OPENAI_API_KEY is set, False otherwise.
"""
return "OPENAI_API_KEY" in os.environ
# Import ChatOpenAI only if OpenAI credentials are available (meaning `langchain-openai` must be installed).
if _check_openai_credentials():
from langchain_openai import ChatOpenAI
class _ToolEnabledFakeModel(GenericFakeChatModel):
"""
A fake chat model that supports tool binding for running agent tracing tests.
"""
def bind_tools(
self,
tools: Sequence[
dict[str, Any] | type | Callable | BaseTool # noqa: UP006
],
*,
tool_choice: str | None = None,
**kwargs: Any,
) -> Runnable[LanguageModelInput, AIMessage]:
return self
#: Tag value for testing tag filtering.
_dummy_tag = "dummy_tag"
def run_simple_chain() -> str:
"""
Run a simple LangChain chain that gets a fact about a topic.
"""
# Build a simple chain: prompt -> llm -> str output parser
llm = ChatOpenAI(
model="gpt-4o-mini",
tags=[_dummy_tag]
) if _check_openai_credentials() else (
FakeListChatModel(
responses=[
"MLRun is an open-source orchestrator for machine learning pipelines."
],
tags=[_dummy_tag]
)
)
prompt = ChatPromptTemplate.from_template("Tell me a short fact about {topic}")
chain = prompt | llm | StrOutputParser()
# Run the chain:
response = chain.invoke({"topic": "MLRun"})
return response
def run_simple_agent():
"""
Run a simple LangChain agent that uses two tools to get weather and stock price.
"""
# Define the tools:
@tool
def get_weather(city: str) -> str:
"""Get the current weather for a specific city."""
return f"The weather in {city} is 22°C and sunny."
@tool
def get_stock_price(symbol: str) -> str:
"""Get the current stock price for a symbol."""
return f"The stock price for {symbol} is $150.25."
# Define the model:
model = ChatOpenAI(
model="gpt-4o-mini",
tags=[_dummy_tag]
) if _check_openai_credentials() else (
_ToolEnabledFakeModel(
messages=iter(
[
AIMessage(
content="",
tool_calls=[
{"name": "get_weather", "args": {"city": "London"}, "id": "call_abc123"},
{"name": "get_stock_price", "args": {"symbol": "AAPL"}, "id": "call_def456"}
]
),
AIMessage(content="The weather in London is 22°C and AAPL is trading at $150.25.")
]
),
tags=[_dummy_tag]
)
)
# Create the agent:
agent = create_agent(
model=model,
tools=[get_weather, get_stock_price],
system_prompt="You are a helpful assistant with access to tools."
)
# Run the agent:
return agent.invoke({"messages": ["What is the weather in London and the stock price of AAPL?"]})
def run_langgraph_graph():
"""
Run a LangGraph agent that uses reflection to correct its answer.
"""
# Define the graph state:
class AgentState(TypedDict):
messages: Annotated[list[BaseMessage], add]
attempts: int
# Define the model:
model = ChatOpenAI(model="gpt-4o-mini") if _check_openai_credentials() else (
_ToolEnabledFakeModel(
messages=iter(
[
AIMessage(content="There are 2 'r's in Strawberry."), # Mocking the failure
AIMessage(content="I stand corrected. S-t-r-a-w-b-e-r-r-y. There are 3 'r's."), # Mocking the fix
]
)
)
)
# Define the graph nodes and router:
def call_model(state: AgentState):
response = model.invoke(state["messages"])
return {"messages": [response], "attempts": state["attempts"] + 1}
def reflect_node(state: AgentState):
prompt = "Wait, count the 'r's again slowly, letter by letter. Are you sure?"
return {"messages": [HumanMessage(content=prompt)]}
def router(state: AgentState) -> Literal["reflect", END]:
# Make sure there are 2 attempts at least for an answer:
if state["attempts"] == 1:
return "reflect"
return END
# Build the graph:
builder = StateGraph(AgentState)
builder.add_node("model", call_model)
tagged_reflect_node = RunnableLambda(reflect_node).with_config(tags=[_dummy_tag])
builder.add_node("reflect", tagged_reflect_node)
builder.add_edge(START, "model")
builder.add_conditional_edges("model", router)
builder.add_edge("reflect", "model")
graph = builder.compile()
# Run the graph:
return graph.invoke({"messages": [HumanMessage(content="How many 'r's in Strawberry?")], "attempts": 0})
Let’s create some traffic, we’ll run whatever function you want in a loop to get some events. We take timestamps in order to use them later to run the monitoring application on the data we’ll send.
# Run LangChain code and now it should be tracked and monitored in MLRun:
start_timestamp = datetime.datetime.now() - datetime.timedelta(minutes=1)
for i in range(20):
run_simple_agent()
end_timestamp = datetime.datetime.now() + datetime.timedelta(minutes=5)
> 2026-02-04 00:05:52,553 [info] Project loaded successfully: {"project_name":"langchain-mlrun-tutorial"}
Note: Please wait a minute or two until the events are processed.
time.sleep(60)
Test the LangChain Monitoring Application#
To test a monitoring application, we use the evaluate class method. We’ll run an evaluation on the data we just sent. It is a small local job and should run fast.
Keep an eye for the returned metrics from the monitoring application.
LangChainMonitoringApp.evaluate(
func_name="langchain-monitoring-app-test",
func_path="langchain_mlrun.py",
run_local=True,
endpoints=[env_vars["LC_MLRUN_TRACER_CLIENT_MODEL_ENDPOINT_NAME"]],
start=start_timestamp.isoformat(),
end=end_timestamp.isoformat(),
)
> 2026-02-03 21:50:26,671 [info] Changing function name - adding `"-batch"` suffix: {"func_name":"langchain-monitoring-app-test-batch"}
> 2026-02-03 21:50:26,815 [warning] It is recommended to use k8s secret (specify secret_name), specifying aws_access_key/aws_secret_key directly is unsafe.
> 2026-02-03 21:50:26,829 [info] Storing function: {"db":"http://localhost:30070","name":"langchain-monitoring-app-test-batch--handler","uid":"f2c3c94681094915beb2c5c1ccc0dac8"}
> 2026-02-03 21:50:27,953 [warning] No data was found for any of the specified endpoints. No results were produced: {"application_name":"langchain-monitoring-app-test-batch","end":"2026-02-03T21:54:26.640556","endpoints":["langchain_mlrun_endpoint"],"start":"2026-02-03T21:48:24.904667"}
| project | uid | iter | start | end | state | kind | name | labels | inputs | parameters | results |
|---|---|---|---|---|---|---|---|---|---|---|---|
| langchain-mlrun-tutorial | ...c0dac8 |
0 | Feb 03 19:50:26 | NaT | completed | run | langchain-monitoring-app-test-batch--handler | kind=local owner=Tomer_Weitzman host=M-QXN63PHMF9 |
endpoints=['langchain_mlrun_endpoint'] start=2026-02-03T21:48:24.904667 end=2026-02-03T21:54:26.640556 base_period=None write_output=False existing_data_handling=fail_on_overlap stream_profile=None |
> 2026-02-03 21:50:28,001 [info] Run execution finished: {"name":"langchain-monitoring-app-test-batch--handler","status":"completed"}
<mlrun.model.RunObject at 0x132cfaa50>
Deploy the Monitoring Application#
All that’s left to do now is to deploy our monitoring application!
# Deploy the monitoring app:
LangChainMonitoringApp.deploy(
func_name="langchain-monitoring-app",
func_path="langchain_mlrun.py",
image="mlrun/mlrun",
requirements=[
"langchain",
"pydantic-settings",
],
)
Once it is deployed, you can run events again and see the monitoring application in MLRun UI in action:
