核心可观测性SDK插桩

Instrumentation

使用 Langfuse SDK 为应用做 instrumentation 主要有两种方式:

  • 使用我们针对常见 LLM 和 agent 库(如 OpenAI、LangChain 或 Vercel AI SDK)提供的 原生集成。它们会自动创建 observation 与 trace,并捕获 prompt、响应、用量与错误。
  • 使用 Langfuse SDK 手动 instrumentation。SDK 提供了 3 种创建 observation 的方式:

这些方式之间可以互操作。你可以在 context manager 中嵌套一个 decorator 创建的 observation,或将手动 span 与我们的原生集成混合使用。

自定义 instrumentation

使用 Langfuse SDK 通过下面的方式为应用做 instrumentation:

Context manager

context manager 让你可以创建一个新 span 并在其生命周期内将其设为 OTel context 中当前活跃的 observation。在该代码块内创建的所有新 observation 都会自动成为它的子节点。

start_as_current_observation() 是创建 observation 的主要方式,同时确保活跃的 OpenTelemetry context 得到更新。在 with 块内创建的子 observation 都会自动继承父节点。

通过设置 as_type 参数,observation 可以是不同的类型

from langfuse import get_client, propagate_attributes
 
langfuse = get_client()
 
with langfuse.start_as_current_observation(
    as_type="span",
    name="user-request-pipeline",
    input={"user_query": "Tell me a joke"},
) as root_span:
    with propagate_attributes(user_id="user_123", session_id="session_abc"):
        with langfuse.start_as_current_observation(
            as_type="generation",
            name="joke-generation",
            model="gpt-4o",
        ) as generation:
            generation.update(output="Why did the span cross the road?")
 
    root_span.update(output={"final_joke": "..."})

Observe wrapper

observe 装饰器是一种简单的方式,可以在不修改函数内部逻辑的前提下,自动捕获被包裹函数的输入、输出、耗时和错误。

使用 observe() 装饰一个函数,自动捕获其输入、输出、耗时和错误。

通过设置 as_type 参数,observation 可以是不同的类型

from langfuse import observe
 
@observe()
def my_data_processing_function(data, parameter):
    return {"processed_data": data, "status": "ok"}
 
@observe(name="llm-call", as_type="generation")
async def my_async_llm_call(prompt_text):
    return "LLM response"

捕获大体积输入/输出可能带来开销。可以在装饰器上禁用 IO 捕获(capture_input=Falsecapture_output=False),或通过 LANGFUSE_OBSERVE_DECORATOR_IO_CAPTURE_ENABLED 环境变量统一禁用。

手动 observation

你也可以手动创建 observation。这在以下情况下很有用:

  • 记录自包含的工作,或与主执行流并行进行但仍应属于同一条 trace 的工作(例如由请求触发的后台任务)。
  • 显式管理 observation 的生命周期,可能是因为它的开始和结束由不连续的事件决定。
  • 在 observation 与某个特定 context 块绑定之前就需要拿到它的引用。

当你需要手动控制而不改变活跃 context 时,使用 start_observation()

可以通过 as_type 参数指定要创建的 observation 类型

from langfuse import get_client
 
langfuse = get_client()
 
span = langfuse.start_observation(name="manual-span")
span.update(input="Data for side task")
child = span.start_observation(name="child-span", as_type="generation")
child.end()
span.end()
⚠️

如果你使用 start_observation(),你需要负责在返回的 observation 对象上调用 .end()。否则在 Litefuse 中会出现不完整或缺失的 observation。它们的 start_as_current_... 对应方法配合 with 语句会自动处理这一点。

关键特征:

  • 不切换 context:和它们的 start_as_current_... 对应方法不同,这些方法不会把新 observation 设为 OpenTelemetry context 中活跃的那个。先前活跃的 span(如果有)仍然是后续主流程操作的当前 context。
  • 父子关系start_observation() 创建的 observation 仍然会成为创建时刻 context 中活跃 span 的子节点。
  • 手动生命周期:这些 observation 不由 with 块管理,因此必须显式地通过调用 .end() 方法结束
  • 嵌套子节点
    • 后续通过全局 langfuse.start_as_current_observation()(或类似全局方法)创建的 observation 不会 成为这些 “manual” observation 的子节点。它们会以原本活跃的 span 作为父节点。
    • 要直接在 “manual” observation 下创建子节点,需要使用 该具体 observation 对象上的方法(例如 manual_span.start_as_current_observation(...))。

更复杂嵌套的示例:

from langfuse import get_client
 
langfuse = get_client()
 
# This outer span establishes an active context.
with langfuse.start_as_current_observation(as_type="span", name="main-operation") as main_operation_span:
    # 'main_operation_span' is the current active context.
 
    # 1. Create a "manual" span using langfuse.start_observation().
    #    - It becomes a child of 'main_operation_span'.
    #    - Crucially, 'main_operation_span' REMAINS the active context.
    #    - 'manual_side_task' does NOT become the active context.
    manual_side_task = langfuse.start_observation(name="manual-side-task")
    manual_side_task.update(input="Data for side task")
 
    # 2. Start another operation that DOES become the active context.
    #    This will be a child of 'main_operation_span', NOT 'manual_side_task',
    #    because 'manual_side_task' did not alter the active context.
    with langfuse.start_as_current_observation(as_type="span", name="core-step-within-main") as core_step_span:
        # 'core_step_span' is now the active context.
        # 'manual_side_task' is still open but not active in the global context.
        core_step_span.update(input="Data for core step")
        # ... perform core step logic ...
        core_step_span.update(output="Core step finished")
    # 'core_step_span' ends. 'main_operation_span' is the active context again.
 
    # 3. Complete and end the manual side task.
    # This could happen at any point after its creation, even after 'core_step_span'.
    manual_side_task.update(output="Side task completed")
    manual_side_task.end() # Manual end is crucial for 'manual_side_task'
 
    main_operation_span.update(output="Main operation finished")
# 'main_operation_span' ends automatically here.
 
# Expected trace structure in Langfuse:
# - main-operation
#   |- manual-side-task
#   |- core-step-within-main
#     (Note: 'core-step-within-main' is a sibling to 'manual-side-task', both children of 'main-operation')

嵌套 observation

Langfuse SDK 的方法会自动处理 observation 的嵌套。

Observe 装饰器

如果你使用 observe wrapper,函数调用层级会被自动捕获并反映在 trace 中。

from langfuse import observe
 
@observe
def my_data_processing_function(data, parameter):
    # ... processing logic ...
    return {"processed_data": data, "status": "ok"}
 
 
@observe
def main_function(data, parameter):
    return my_data_processing_function(data, parameter)

Context Manager

如果你使用 context manager,嵌套由 OpenTelemetry 的上下文传播自动处理。当你通过 start_as_current_observation() 创建新 observation 时,它会成为创建时 context 中活跃 observation 的子节点。

from langfuse import get_client
 
langfuse = get_client()
 
with langfuse.start_as_current_observation(as_type="span", name="outer-process") as outer_span:
    # outer_span is active
 
    with langfuse.start_as_current_observation(as_type="generation", name="llm-step-1") as gen1:
        # gen1 is active, child of outer_span
        gen1.update(output="LLM 1 output")
 
    with outer_span.start_as_current_observation(name="intermediate-step") as mid_span:
        # mid_span is active, also a child of outer_span
        # This demonstrates using the yielded span object to create children
 
        with mid_span.start_as_current_observation(as_type="generation", name="llm-step-2") as gen2:
            # gen2 is active, child of mid_span
            gen2.update(output="LLM 2 output")
 
        mid_span.update(output="Intermediate processing done")
 
    outer_span.update(output="Outer process finished")

手动 observation

如果你正在手动创建 observation,可以使用父 LangfuseSpanLangfuseGeneration 对象上的方法来创建子节点。这些子节点 不会 成为当前 context,除非使用其 _as_current_ 变体(参见 context manager)。

from langfuse import get_client
 
langfuse = get_client()
 
parent = langfuse.start_observation(name="manual-parent")
 
child_span = parent.start_observation(name="manual-child-span")
# ... work ...
child_span.end()
 
child_gen = parent.start_observation(name="manual-child-generation", as_type="generation")
# ... work ...
child_gen.end()
 
parent.end()

更新 observation

随着代码执行,你可以为 observation 更新新的信息。

from langfuse import get_client
 
langfuse = get_client()
 
with langfuse.start_as_current_observation(as_type="generation", name="llm-call", model="gpt-5-mini") as gen:
    gen.update(input={"prompt": "Why is the sky blue?"})
    
    # ... make LLM call ...
    response_text = "Rayleigh scattering..."
    
    gen.update(
        output=response_text,
        usage_details={"input_tokens": 5, "output_tokens": 50},
        metadata={"confidence": 0.9}
    )
 
# Alternatively, update the current observation in context:
with langfuse.start_as_current_observation(as_type="span", name="data-processing"):
    # ... some processing ...
    langfuse.update_current_span(metadata={"step1_complete": True})
    # ... more processing ...
    langfuse.update_current_span(output={"result": "final_data"})

为 observation 添加属性

你可以为 observation 添加属性,帮助你更好地理解应用并在 Litefuse 中关联 observation:

要更新 trace 的输入和输出,请参见 trace 级输入/输出

使用 propagate_attributes() 为 observation 添加属性。

from langfuse import get_client, propagate_attributes
 
langfuse = get_client()
 
with langfuse.start_as_current_observation(as_type="span", name="user-workflow"):
    with propagate_attributes(
        user_id="user_123",
        session_id="session_abc",
        metadata={"experiment": "variant_a"},
        version="1.0",
        trace_name="user-workflow",
    ):
        with langfuse.start_as_current_observation(as_type="generation", name="llm-call"):
            pass

使用 @observe() 装饰器时:

from langfuse import observe, propagate_attributes
 
@observe()
def my_llm_pipeline(user_id: str, session_id: str):
    # Propagate early in the trace
    with propagate_attributes(
        user_id=user_id,
        session_id=session_id,
        metadata={"pipeline": "main"}
    ):
        # All nested @observe functions inherit these attributes
        result = call_llm()
        return result
 
@observe()
def call_llm():
    # This automatically has user_id, session_id, metadata from parent
    pass
Note on Attribute Propagation
We use Attribute Propagation to propagate specific attributes (userId, sessionId, version, tags, metadata, traceName) across all observations in an execution context. We will use all observations with these attributes to calculate attribute-level metrics. Please consider the following when using Attribute Propagation:
  • Values must be strings ≤200 characters
  • Metadata keys: Alphanumeric characters only (no whitespace or special characters)
  • Call early in your trace to ensure all observations are covered. This way you make sure that all Metrics in Litefuse are accurate.
  • Invalid values are dropped with a warning

跨服务传播

对于跨多个服务的分布式 tracing,使用 as_baggage 参数(详情参见 OpenTelemetry 文档)通过 HTTP header 传播属性。

from langfuse import get_client, propagate_attributes
import requests
 
langfuse = get_client()
 
with langfuse.start_as_current_observation(as_type="span", name="api-request"):
    with propagate_attributes(
        user_id="user_123",
        session_id="session_abc",
        as_baggage=True,
    ):
        requests.get("https://service-b.example.com/api")
⚠️

安全提示:当启用 baggage 传播时,属性会被加到所有对外的 HTTP header 上。请仅对分布式 tracing 所需的非敏感值使用它。

更新 trace

默认情况下,trace 的输入/输出会镜像你在 根 observation(trace 中第一个 observation)上设置的内容。如果出于 LLM-as-a-Judge、AB 测试或 UI 清晰度的需要,你可以自定义 trace 级别的信息。

Litefuse 中的 LLM-as-a-Judge 工作流可能依赖 trace 级别的输入/输出。如果你的评估载荷与根 observation 不同,请显式设置它们,而不要依赖默认行为。

默认行为

from langfuse import get_client
 
langfuse = get_client()
 
# Using the context manager
with langfuse.start_as_current_observation(
    as_type="span",
    name="user-request",
    input={"query": "What is the capital of France?"}  # This becomes the trace input
) as root_span:
 
    with langfuse.start_as_current_observation(
        as_type="generation",
        name="llm-call",
        model="gpt-4o",
        input={"messages": [{"role": "user", "content": "What is the capital of France?"}]}
    ) as gen:
        response = "Paris is the capital of France."
        gen.update(output=response)
        # LLM generation input/output are separate from trace input/output
 
    root_span.update(output={"answer": "Paris"})  # This becomes the trace output

覆盖默认行为

如果你需要让 trace 的输入/输出与根 observation 不同,请使用 observation.set_trace_io()langfuse.set_current_trace_io()

⚠️

set_trace_io()set_current_trace_io() 已被弃用,仅为兼容依赖 trace 级输入/输出的旧版 LLM-as-a-judge 评估器而保留。新代码请直接在根 observation 上设置输入/输出。详见 Python v3 → v4 迁移指南

from langfuse import get_client
 
langfuse = get_client()
 
with langfuse.start_as_current_observation(as_type="span", name="complex-pipeline") as root_span:
    # Root span has its own input/output
    root_span.update(input="Step 1 data", output="Step 1 result")
 
    # But trace should have different input/output (e.g., for LLM-as-a-judge)
    root_span.set_trace_io(
        input={"original_query": "User's actual question"},
        output={"final_answer": "Complete response", "confidence": 0.95}
    )
 
    # Now trace input/output are independent of root span input/output
 
# Using the observe decorator
@observe()
def process_user_query(user_question: str):
    # LLM processing...
    answer = call_llm(user_question)
 
    # Explicitly set trace input/output for evaluation features
    langfuse.set_current_trace_io(
        input={"question": user_question},
        output={"answer": answer}
    )
 
    return answer

Trace 与 observation ID

Litefuse 遵循 W3C Trace Context 标准

  • trace ID 是 32 字符的小写十六进制字符串(16 字节)
  • observation ID 是 16 字符的小写十六进制字符串(8 字节)

你不能设置任意的 observation ID,但可以生成确定性的 trace ID 以便与外部系统关联。

更多关于跨服务关联 trace 的信息,请参见 Trace ID 与分布式 tracing

使用 create_trace_id() 生成 trace ID。如果提供了 seed,则 ID 是确定性的。使用相同的 seed 会得到相同的 ID。这对于将外部 ID 与 Litefuse trace 关联非常有用。

from langfuse import get_client, Langfuse
langfuse = get_client()
 
external_request_id = "req_12345"
deterministic_trace_id = langfuse.create_trace_id(seed=external_request_id)

使用 get_current_trace_id() 获取当前 trace ID,使用 get_current_observation_id 获取当前 observation ID。

你也可以通过 observation.trace_idobservation.id 直接从 LangfuseSpan 或 LangfuseGeneration 对象上访问 trace ID 和 observation ID。

from langfuse import get_client, Langfuse
langfuse = get_client()
 
with langfuse.start_as_current_observation(as_type="span", name="my-op") as current_op:
    trace_id = langfuse.get_current_trace_id()
    observation_id = langfuse.get_current_observation_id()
    print(trace_id, observation_id)

关联到已有 trace

当与已经有 trace ID 的上游服务集成时,请提供 W3C trace context,让 Litefuse span 加入到已有的 trace 树中,而不是另起一棵。

使用 trace_context 参数设置自定义 trace context 信息。

from langfuse import get_client
 
langfuse = get_client()
 
existing_trace_id = "abcdef1234567890abcdef1234567890"
existing_parent_span_id = "fedcba0987654321"
 
with langfuse.start_as_current_observation(
    as_type="span",
    name="process-downstream-task",
    trace_context={
        "trace_id": existing_trace_id,
        "parent_span_id": existing_parent_span_id,
    },
):
    pass

客户端生命周期与 flush

由于 Langfuse SDK 是异步的,它会在后台缓冲 span。在短生命周期进程(脚本、serverless 函数、worker)中,请始终调用 flush()shutdown() 以避免数据丢失。

flush()

手动触发将所有缓冲的 observation(span、generation、score、媒体元数据)发送到 Litefuse API。这在短脚本中或退出应用前确保数据持久化非常有用。

from langfuse import get_client
 
langfuse = get_client()
# ... create traces and observations ...
langfuse.flush() # Ensures all pending data is sent

flush() 方法会阻塞,直到队列中的数据被相应的后台线程处理完毕。

shutdown()

优雅关闭 Langfuse 客户端。包括:

  1. flush 所有缓冲的数据(类似 flush())。
  2. 等待后台线程(数据写入和媒体上传)完成当前任务并终止。

在应用退出前调用 shutdown() 至关重要,能避免数据丢失并确保资源被干净释放。SDK 会自动注册 atexit 钩子,在程序正常退出时调用 shutdown(),但在以下场景下推荐手动调用:

  • 长期运行的守护进程或服务在收到关闭信号时。
  • atexit 可能不可靠的场景(例如某些 serverless 环境或被强制终止时)。
from langfuse import get_client
 
langfuse = get_client()
# ... application logic ...
 
# Before exiting:
langfuse.shutdown()
这个页面对你有帮助吗?