Instrumentation
使用 Langfuse SDK 为应用做 instrumentation 主要有两种方式:
- 使用我们针对常见 LLM 和 agent 库(如 OpenAI、LangChain 或 Vercel AI SDK)提供的 原生集成。它们会自动创建 observation 与 trace,并捕获 prompt、响应、用量与错误。
- 使用 Langfuse SDK 手动 instrumentation。SDK 提供了 3 种创建 observation 的方式:
这些方式之间可以互操作。你可以在 context manager 中嵌套一个 decorator 创建的 observation,或将手动 span 与我们的原生集成混合使用。
自定义 instrumentation
使用 Langfuse SDK 通过下面的方式为应用做 instrumentation:
Context manager
context manager 让你可以创建一个新 span 并在其生命周期内将其设为 OTel context 中当前活跃的 observation。在该代码块内创建的所有新 observation 都会自动成为它的子节点。
start_as_current_observation() 是创建 observation 的主要方式,同时确保活跃的 OpenTelemetry context 得到更新。在 with 块内创建的子 observation 都会自动继承父节点。
通过设置 as_type 参数,observation 可以是不同的类型。
from langfuse import get_client, propagate_attributes
langfuse = get_client()
with langfuse.start_as_current_observation(
as_type="span",
name="user-request-pipeline",
input={"user_query": "Tell me a joke"},
) as root_span:
with propagate_attributes(user_id="user_123", session_id="session_abc"):
with langfuse.start_as_current_observation(
as_type="generation",
name="joke-generation",
model="gpt-4o",
) as generation:
generation.update(output="Why did the span cross the road?")
root_span.update(output={"final_joke": "..."})Observe wrapper
observe 装饰器是一种简单的方式,可以在不修改函数内部逻辑的前提下,自动捕获被包裹函数的输入、输出、耗时和错误。
使用 observe() 装饰一个函数,自动捕获其输入、输出、耗时和错误。
通过设置 as_type 参数,observation 可以是不同的类型。
from langfuse import observe
@observe()
def my_data_processing_function(data, parameter):
return {"processed_data": data, "status": "ok"}
@observe(name="llm-call", as_type="generation")
async def my_async_llm_call(prompt_text):
return "LLM response"捕获大体积输入/输出可能带来开销。可以在装饰器上禁用 IO 捕获(capture_input=False、capture_output=False),或通过 LANGFUSE_OBSERVE_DECORATOR_IO_CAPTURE_ENABLED 环境变量统一禁用。
手动 observation
你也可以手动创建 observation。这在以下情况下很有用:
- 记录自包含的工作,或与主执行流并行进行但仍应属于同一条 trace 的工作(例如由请求触发的后台任务)。
- 显式管理 observation 的生命周期,可能是因为它的开始和结束由不连续的事件决定。
- 在 observation 与某个特定 context 块绑定之前就需要拿到它的引用。
当你需要手动控制而不改变活跃 context 时,使用 start_observation()。
可以通过 as_type 参数指定要创建的 observation 类型。
from langfuse import get_client
langfuse = get_client()
span = langfuse.start_observation(name="manual-span")
span.update(input="Data for side task")
child = span.start_observation(name="child-span", as_type="generation")
child.end()
span.end()如果你使用 start_observation(),你需要负责在返回的 observation 对象上调用 .end()。否则在 Litefuse 中会出现不完整或缺失的 observation。它们的 start_as_current_... 对应方法配合 with 语句会自动处理这一点。
关键特征:
- 不切换 context:和它们的
start_as_current_...对应方法不同,这些方法不会把新 observation 设为 OpenTelemetry context 中活跃的那个。先前活跃的 span(如果有)仍然是后续主流程操作的当前 context。 - 父子关系:
start_observation()创建的 observation 仍然会成为创建时刻 context 中活跃 span 的子节点。 - 手动生命周期:这些 observation 不由
with块管理,因此必须显式地通过调用.end()方法结束。 - 嵌套子节点:
- 后续通过全局
langfuse.start_as_current_observation()(或类似全局方法)创建的 observation 不会 成为这些 “manual” observation 的子节点。它们会以原本活跃的 span 作为父节点。 - 要直接在 “manual” observation 下创建子节点,需要使用 该具体 observation 对象上的方法(例如
manual_span.start_as_current_observation(...))。
- 后续通过全局
更复杂嵌套的示例:
from langfuse import get_client
langfuse = get_client()
# This outer span establishes an active context.
with langfuse.start_as_current_observation(as_type="span", name="main-operation") as main_operation_span:
# 'main_operation_span' is the current active context.
# 1. Create a "manual" span using langfuse.start_observation().
# - It becomes a child of 'main_operation_span'.
# - Crucially, 'main_operation_span' REMAINS the active context.
# - 'manual_side_task' does NOT become the active context.
manual_side_task = langfuse.start_observation(name="manual-side-task")
manual_side_task.update(input="Data for side task")
# 2. Start another operation that DOES become the active context.
# This will be a child of 'main_operation_span', NOT 'manual_side_task',
# because 'manual_side_task' did not alter the active context.
with langfuse.start_as_current_observation(as_type="span", name="core-step-within-main") as core_step_span:
# 'core_step_span' is now the active context.
# 'manual_side_task' is still open but not active in the global context.
core_step_span.update(input="Data for core step")
# ... perform core step logic ...
core_step_span.update(output="Core step finished")
# 'core_step_span' ends. 'main_operation_span' is the active context again.
# 3. Complete and end the manual side task.
# This could happen at any point after its creation, even after 'core_step_span'.
manual_side_task.update(output="Side task completed")
manual_side_task.end() # Manual end is crucial for 'manual_side_task'
main_operation_span.update(output="Main operation finished")
# 'main_operation_span' ends automatically here.
# Expected trace structure in Langfuse:
# - main-operation
# |- manual-side-task
# |- core-step-within-main
# (Note: 'core-step-within-main' is a sibling to 'manual-side-task', both children of 'main-operation')嵌套 observation
Langfuse SDK 的方法会自动处理 observation 的嵌套。
Observe 装饰器
如果你使用 observe wrapper,函数调用层级会被自动捕获并反映在 trace 中。
from langfuse import observe
@observe
def my_data_processing_function(data, parameter):
# ... processing logic ...
return {"processed_data": data, "status": "ok"}
@observe
def main_function(data, parameter):
return my_data_processing_function(data, parameter)Context Manager
如果你使用 context manager,嵌套由 OpenTelemetry 的上下文传播自动处理。当你通过 start_as_current_observation() 创建新 observation 时,它会成为创建时 context 中活跃 observation 的子节点。
from langfuse import get_client
langfuse = get_client()
with langfuse.start_as_current_observation(as_type="span", name="outer-process") as outer_span:
# outer_span is active
with langfuse.start_as_current_observation(as_type="generation", name="llm-step-1") as gen1:
# gen1 is active, child of outer_span
gen1.update(output="LLM 1 output")
with outer_span.start_as_current_observation(name="intermediate-step") as mid_span:
# mid_span is active, also a child of outer_span
# This demonstrates using the yielded span object to create children
with mid_span.start_as_current_observation(as_type="generation", name="llm-step-2") as gen2:
# gen2 is active, child of mid_span
gen2.update(output="LLM 2 output")
mid_span.update(output="Intermediate processing done")
outer_span.update(output="Outer process finished")手动 observation
如果你正在手动创建 observation,可以使用父 LangfuseSpan 或 LangfuseGeneration 对象上的方法来创建子节点。这些子节点 不会 成为当前 context,除非使用其 _as_current_ 变体(参见 context manager)。
from langfuse import get_client
langfuse = get_client()
parent = langfuse.start_observation(name="manual-parent")
child_span = parent.start_observation(name="manual-child-span")
# ... work ...
child_span.end()
child_gen = parent.start_observation(name="manual-child-generation", as_type="generation")
# ... work ...
child_gen.end()
parent.end()更新 observation
随着代码执行,你可以为 observation 更新新的信息。
- 对于通过 context manager 创建或赋值给变量的 observation:使用对象上的
.update()方法。 - 要在不持有引用的情况下更新 当前活跃的 observation:使用
langfuse.update_current_span()或langfuse.update_current_generation()。
from langfuse import get_client
langfuse = get_client()
with langfuse.start_as_current_observation(as_type="generation", name="llm-call", model="gpt-5-mini") as gen:
gen.update(input={"prompt": "Why is the sky blue?"})
# ... make LLM call ...
response_text = "Rayleigh scattering..."
gen.update(
output=response_text,
usage_details={"input_tokens": 5, "output_tokens": 50},
metadata={"confidence": 0.9}
)
# Alternatively, update the current observation in context:
with langfuse.start_as_current_observation(as_type="span", name="data-processing"):
# ... some processing ...
langfuse.update_current_span(metadata={"step1_complete": True})
# ... more processing ...
langfuse.update_current_span(output={"result": "final_data"})为 observation 添加属性
你可以为 observation 添加属性,帮助你更好地理解应用并在 Litefuse 中关联 observation:
要更新 trace 的输入和输出,请参见 trace 级输入/输出。
使用 propagate_attributes() 为 observation 添加属性。
from langfuse import get_client, propagate_attributes
langfuse = get_client()
with langfuse.start_as_current_observation(as_type="span", name="user-workflow"):
with propagate_attributes(
user_id="user_123",
session_id="session_abc",
metadata={"experiment": "variant_a"},
version="1.0",
trace_name="user-workflow",
):
with langfuse.start_as_current_observation(as_type="generation", name="llm-call"):
pass使用 @observe() 装饰器时:
from langfuse import observe, propagate_attributes
@observe()
def my_llm_pipeline(user_id: str, session_id: str):
# Propagate early in the trace
with propagate_attributes(
user_id=user_id,
session_id=session_id,
metadata={"pipeline": "main"}
):
# All nested @observe functions inherit these attributes
result = call_llm()
return result
@observe()
def call_llm():
# This automatically has user_id, session_id, metadata from parent
pass- Values must be strings ≤200 characters
- Metadata keys: Alphanumeric characters only (no whitespace or special characters)
- Call early in your trace to ensure all observations are covered. This way you make sure that all Metrics in Litefuse are accurate.
- Invalid values are dropped with a warning
跨服务传播
对于跨多个服务的分布式 tracing,使用 as_baggage 参数(详情参见 OpenTelemetry 文档)通过 HTTP header 传播属性。
from langfuse import get_client, propagate_attributes
import requests
langfuse = get_client()
with langfuse.start_as_current_observation(as_type="span", name="api-request"):
with propagate_attributes(
user_id="user_123",
session_id="session_abc",
as_baggage=True,
):
requests.get("https://service-b.example.com/api")安全提示:当启用 baggage 传播时,属性会被加到所有对外的 HTTP header 上。请仅对分布式 tracing 所需的非敏感值使用它。
更新 trace
默认情况下,trace 的输入/输出会镜像你在 根 observation(trace 中第一个 observation)上设置的内容。如果出于 LLM-as-a-Judge、AB 测试或 UI 清晰度的需要,你可以自定义 trace 级别的信息。
Litefuse 中的 LLM-as-a-Judge 工作流可能依赖 trace 级别的输入/输出。如果你的评估载荷与根 observation 不同,请显式设置它们,而不要依赖默认行为。
默认行为
from langfuse import get_client
langfuse = get_client()
# Using the context manager
with langfuse.start_as_current_observation(
as_type="span",
name="user-request",
input={"query": "What is the capital of France?"} # This becomes the trace input
) as root_span:
with langfuse.start_as_current_observation(
as_type="generation",
name="llm-call",
model="gpt-4o",
input={"messages": [{"role": "user", "content": "What is the capital of France?"}]}
) as gen:
response = "Paris is the capital of France."
gen.update(output=response)
# LLM generation input/output are separate from trace input/output
root_span.update(output={"answer": "Paris"}) # This becomes the trace output覆盖默认行为
如果你需要让 trace 的输入/输出与根 observation 不同,请使用 observation.set_trace_io() 或 langfuse.set_current_trace_io():
set_trace_io() 和 set_current_trace_io() 已被弃用,仅为兼容依赖 trace 级输入/输出的旧版 LLM-as-a-judge 评估器而保留。新代码请直接在根 observation 上设置输入/输出。详见 Python v3 → v4 迁移指南。
from langfuse import get_client
langfuse = get_client()
with langfuse.start_as_current_observation(as_type="span", name="complex-pipeline") as root_span:
# Root span has its own input/output
root_span.update(input="Step 1 data", output="Step 1 result")
# But trace should have different input/output (e.g., for LLM-as-a-judge)
root_span.set_trace_io(
input={"original_query": "User's actual question"},
output={"final_answer": "Complete response", "confidence": 0.95}
)
# Now trace input/output are independent of root span input/output
# Using the observe decorator
@observe()
def process_user_query(user_question: str):
# LLM processing...
answer = call_llm(user_question)
# Explicitly set trace input/output for evaluation features
langfuse.set_current_trace_io(
input={"question": user_question},
output={"answer": answer}
)
return answerTrace 与 observation ID
Litefuse 遵循 W3C Trace Context 标准:
- trace ID 是 32 字符的小写十六进制字符串(16 字节)
- observation ID 是 16 字符的小写十六进制字符串(8 字节)
你不能设置任意的 observation ID,但可以生成确定性的 trace ID 以便与外部系统关联。
更多关于跨服务关联 trace 的信息,请参见 Trace ID 与分布式 tracing。
使用 create_trace_id() 生成 trace ID。如果提供了 seed,则 ID 是确定性的。使用相同的 seed 会得到相同的 ID。这对于将外部 ID 与 Litefuse trace 关联非常有用。
from langfuse import get_client, Langfuse
langfuse = get_client()
external_request_id = "req_12345"
deterministic_trace_id = langfuse.create_trace_id(seed=external_request_id)使用 get_current_trace_id() 获取当前 trace ID,使用 get_current_observation_id 获取当前 observation ID。
你也可以通过 observation.trace_id 和 observation.id 直接从 LangfuseSpan 或 LangfuseGeneration 对象上访问 trace ID 和 observation ID。
from langfuse import get_client, Langfuse
langfuse = get_client()
with langfuse.start_as_current_observation(as_type="span", name="my-op") as current_op:
trace_id = langfuse.get_current_trace_id()
observation_id = langfuse.get_current_observation_id()
print(trace_id, observation_id)关联到已有 trace
当与已经有 trace ID 的上游服务集成时,请提供 W3C trace context,让 Litefuse span 加入到已有的 trace 树中,而不是另起一棵。
使用 trace_context 参数设置自定义 trace context 信息。
from langfuse import get_client
langfuse = get_client()
existing_trace_id = "abcdef1234567890abcdef1234567890"
existing_parent_span_id = "fedcba0987654321"
with langfuse.start_as_current_observation(
as_type="span",
name="process-downstream-task",
trace_context={
"trace_id": existing_trace_id,
"parent_span_id": existing_parent_span_id,
},
):
pass客户端生命周期与 flush
由于 Langfuse SDK 是异步的,它会在后台缓冲 span。在短生命周期进程(脚本、serverless 函数、worker)中,请始终调用 flush() 或 shutdown() 以避免数据丢失。
手动触发将所有缓冲的 observation(span、generation、score、媒体元数据)发送到 Litefuse API。这在短脚本中或退出应用前确保数据持久化非常有用。
from langfuse import get_client
langfuse = get_client()
# ... create traces and observations ...
langfuse.flush() # Ensures all pending data is sentflush() 方法会阻塞,直到队列中的数据被相应的后台线程处理完毕。
优雅关闭 Langfuse 客户端。包括:
- flush 所有缓冲的数据(类似
flush())。 - 等待后台线程(数据写入和媒体上传)完成当前任务并终止。
在应用退出前调用 shutdown() 至关重要,能避免数据丢失并确保资源被干净释放。SDK 会自动注册 atexit 钩子,在程序正常退出时调用 shutdown(),但在以下场景下推荐手动调用:
- 长期运行的守护进程或服务在收到关闭信号时。
atexit可能不可靠的场景(例如某些 serverless 环境或被强制终止时)。
from langfuse import get_client
langfuse = get_client()
# ... application logic ...
# Before exiting:
langfuse.shutdown()