Guide: Mistral SDK Integration (Python)

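This guide provides step-by-step examples of integrating Litefuse with the Mistral AI SDK (v1) in Python. By following them, you will learn how to log and trace interactions with Mistral's language models, which improves the transparency, debuggability, and performance monitoring of your AI-driven applications.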

ℹ️

Note: Litefuse also integrates natively with LangChain, LlamaIndex, LiteLLM, and other frameworks. If you use any of them, all calls to Mistral models are instrumented right away.

Overview

In this notebook, we explore several use cases for integrating Litefuse with the Mistral AI SDK, including:

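  • Basic LLM calls: Learn how to wrap standard Mistral model interactions with Litefuse's @observe decorator for complete logging.
  • Chained function calls: See how to manage and observe complex workflows in which several model interactions are chained together to produce a final result.
  • Async and streaming support: Learn how to use Litefuse with Mistral models in asynchronous and streaming scenarios so that real-time and concurrent interactions remain fully traceable.
  • Function calling: Understand how to implement and observe external tool integrations with Mistral, letting the model call custom functions and APIs.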

For more detailed guidance on the Mistral SDK or the Litefuse @observe decorator, see the Mistral SDK repo and the Litefuse documentation.

Setup

%pip install mistralai langfuse
import os
 
# Get keys for your project from the project settings page: https://litefuse.cloud
os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-..." 
os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-..." 
os.environ["LANGFUSE_BASE_URL"] = "https://litefuse.cloud"
 
# Your Mistral key
os.environ["MISTRAL_API_KEY"] = "..."
from mistralai import Mistral
 
# Initialize Mistral client
mistral_client = Mistral(api_key=os.environ["MISTRAL_API_KEY"])
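Before creating the clients, it can help to fail fast when a key is missing. The following is a minimal sketch that assumes the four variable names used in the setup cell above; `missing_env_vars` is a hypothetical helper written for illustration and is not part of either SDK.

```python
import os

# Variable names taken from the setup cell above.
REQUIRED_VARS = [
    "LANGFUSE_PUBLIC_KEY",
    "LANGFUSE_SECRET_KEY",
    "LANGFUSE_BASE_URL",
    "MISTRAL_API_KEY",
]

def missing_env_vars(required=REQUIRED_VARS, env=None):
    """Return the names in `required` that are unset or empty in `env`."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]

# Example with an explicit mapping instead of the real environment
example_env = {"LANGFUSE_PUBLIC_KEY": "pk-lf-...", "MISTRAL_API_KEY": ""}
print(missing_env_vars(env=example_env))
```

Calling `missing_env_vars()` with no arguments checks the real process environment, so it could be run right after the `os.environ` assignments.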

Examples

Completion

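We integrate the Mistral AI SDK with Litefuse using the @observe decorator, which is central to logging and tracing interactions with large language models (LLMs). The @observe(as_type="generation") decorator is designed specifically for LLM interactions and captures inputs, outputs, and model parameters. The resulting mistral_completion method can then be used across your project.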

from langfuse import observe, get_client
langfuse = get_client()
 
# Function to handle Mistral completion calls, wrapped with @observe to log the LLM interaction
@observe(as_type="generation")
def mistral_completion(**kwargs):
  # Clone kwargs to avoid modifying the original input
  kwargs_clone = kwargs.copy()
 
  # Extract relevant parameters from kwargs
  input = kwargs_clone.pop('messages', None)
  model = kwargs_clone.pop('model', None)
  min_tokens = kwargs_clone.pop('min_tokens', None)
  max_tokens = kwargs_clone.pop('max_tokens', None)
  temperature = kwargs_clone.pop('temperature', None)
  top_p = kwargs_clone.pop('top_p', None)
 
  # Filter and prepare model parameters for logging
  model_parameters = {
        "maxTokens": max_tokens,
        "minTokens": min_tokens,
        "temperature": temperature,
        "top_p": top_p
    }
  model_parameters = {k: v for k, v in model_parameters.items() if v is not None}
 
  # Log the input and model parameters before calling the LLM
  langfuse.update_current_generation(
      input=input,
      model=model,
      model_parameters=model_parameters,
      metadata=kwargs_clone,
 
  )
 
  # Call the Mistral model to generate a response
  res = mistral_client.chat.complete(**kwargs)
 
  # Log the usage details and output content after the LLM call
  langfuse.update_current_generation(
      usage_details={
          "input": res.usage.prompt_tokens,
          "output": res.usage.completion_tokens
      },
      output=res.choices[0].message.content
  )
 
  # Return the model's response object
  return res
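The parameter handling at the top of mistral_completion can be looked at in isolation. The sketch below reproduces that extraction as a standalone function so its effect is easy to inspect; `split_completion_kwargs` is a hypothetical name introduced here, and the function makes no SDK calls.

```python
def split_completion_kwargs(kwargs):
    """Split chat-completion kwargs into (input, model, model_parameters, metadata)."""
    remaining = dict(kwargs)  # copy, so the caller's dict is left untouched
    messages = remaining.pop("messages", None)
    model = remaining.pop("model", None)
    candidates = {
        "maxTokens": remaining.pop("max_tokens", None),
        "minTokens": remaining.pop("min_tokens", None),
        "temperature": remaining.pop("temperature", None),
        "top_p": remaining.pop("top_p", None),
    }
    # Keep only the parameters the caller actually set
    model_parameters = {k: v for k, v in candidates.items() if v is not None}
    return messages, model, model_parameters, remaining

call_kwargs = {
    "model": "mistral-small-latest",
    "messages": [{"role": "user", "content": "Hi"}],
    "temperature": 0.4,
    "tool_choice": "any",
}
messages, model, params, metadata = split_completion_kwargs(call_kwargs)
print(params)    # {'temperature': 0.4}
print(metadata)  # {'tool_choice': 'any'}
```

Anything that is not a recognized model parameter (here `tool_choice`) ends up in the metadata dictionary, which matches what the wrapper passes as `metadata=kwargs_clone`.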

Optionally, other functions (API handlers, retrieval functions, and so on) can be decorated as well.

Simple example

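In the following example, we also decorate the top-level function find_best_painter_from. It calls mistral_completion, which is decorated with @observe(as_type="generation"). This hierarchical setup helps trace more complex applications that involve multiple LLM calls as well as other non-LLM methods decorated with @observe.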

You can use langfuse.update_current_generation or langfuse.update_current_trace to add extra details such as input, output, and model parameters.

@observe()
def find_best_painter_from(country="France"):
  response = mistral_completion(
      model="mistral-small-latest",
      max_tokens=1024,
      temperature=0.4,
      messages=[
        {
            "content": "Who is the best painter from {country}? Answer in one short sentence.".format(country=country),
            "role": "user",
        },
      ]
    )
  return response.choices[0].message.content
 
find_best_painter_from()

Example trace in Litefuse: https://litefuse.cloud/project/cloramnkj0002jz088vzn1ja4/traces/836a9585-cfcc-47f7-881f-85ebdd9f601b

Chained completions

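This example shows how to chain multiple LLM calls using the @observe decorator. The first call identifies the best painter from a given country, and the second call uses that painter's name to find their most famous painting. Both interactions are logged by Litefuse because each goes through the wrapped mistral_completion method created above, so the chained requests stay traceable end to end.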

@observe()
def find_best_painting_from(country="France"):
  response = mistral_completion(
      model="mistral-small-latest",
      max_tokens=1024,
      temperature=0.1,
      messages=[
        {
            "content": "Who is the best painter from {country}? Only provide the name.".format(country=country),
            "role": "user",
        },
      ]
    )
  painter_name = response.choices[0].message.content
  return mistral_completion(
      model="mistral-small-latest",
      max_tokens=1024,
      messages=[
        {
            "content": "What is the most famous painting of {painter_name}? Answer in one short sentence.".format(painter_name=painter_name),
            "role": "user",
        },
      ]
    )
 
find_best_painting_from("Germany")

Example trace in Litefuse: https://litefuse.cloud/project/cloramnkj0002jz088vzn1ja4/traces/a3360c6f-24ad-455c-aae7-eb9d5c6f5dac

Streamed completion

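The following example shows how to handle streaming responses from Mistral models with the @observe(as_type="generation") decorator. The process is similar to the Completion example above, with the added step of handling the streamed data in real time.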

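As in the previous example, we wrap the streaming function with the @observe decorator to capture the input, model parameters, and usage details. In addition, the function processes the streamed output incrementally: it yields each chunk as it arrives, accumulates the text, and updates the Litefuse generation with the full output and usage once the final chunk is received.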

# Wrap streaming function with decorator
@observe(as_type="generation")
def stream_mistral_completion(**kwargs):
    kwargs_clone = kwargs.copy()
    input = kwargs_clone.pop('messages', None)
    model = kwargs_clone.pop('model', None)
    min_tokens = kwargs_clone.pop('min_tokens', None)
    max_tokens = kwargs_clone.pop('max_tokens', None)
    temperature = kwargs_clone.pop('temperature', None)
    top_p = kwargs_clone.pop('top_p', None)
 
    model_parameters = {
        "maxTokens": max_tokens,
        "minTokens": min_tokens,
        "temperature": temperature,
        "top_p": top_p
    }
    model_parameters = {k: v for k, v in model_parameters.items() if v is not None}
 
    langfuse.update_current_generation(
        input=input,
        model=model,
        model_parameters=model_parameters,
        metadata=kwargs_clone,
    )
 
    res = mistral_client.chat.stream(**kwargs)
    final_response = ""
    for chunk in res:
        # delta.content may be None on some chunks; fall back to an empty string
        content = chunk.data.choices[0].delta.content or ""
        final_response += content
        yield content
 
        if chunk.data.choices[0].finish_reason == "stop":
            langfuse.update_current_generation(
                usage_details={
                    "input": chunk.data.usage.prompt_tokens,
                    "output": chunk.data.usage.completion_tokens
                },
                output=final_response
            )
            break
 
# Use stream_mistral_completion as you'd usually use the SDK
@observe()
def stream_find_best_five_painter_from(country="France"):
    response_chunks = stream_mistral_completion(
        model="mistral-small-latest",
        max_tokens=1024,
        messages=[
            {
                "content": "Who are the five best painters from {country}? Answer in one short sentence.".format(country=country),
                "role": "user",
            },
        ]
    )
    final_response = ""
    for chunk in response_chunks:
        final_response += chunk
        # You can also do something with each chunk here if needed
        print(chunk)
 
    return final_response
 
stream_find_best_five_painter_from("Spain")

Example trace in Litefuse: https://litefuse.cloud/project/cloramnkj0002jz088vzn1ja4/traces/75a2a4fe-088d-4134-9797-ba9c21be01b2
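The streaming wrapper above records usage only when the finish reason is "stop". A variant that reacts to any finish reason (for example a response cut off by max_tokens) can be tried offline with stub objects. This is a sketch under assumptions: `fake_chunk` and `collect_stream` are hypothetical helpers, and the stub only mimics the attributes the wrapper reads (`data.choices[0].delta.content`, `finish_reason`, `data.usage`).

```python
from types import SimpleNamespace

def fake_chunk(content, finish_reason=None, usage=None):
    """Build an object shaped like the chunks read in the streaming wrapper."""
    choice = SimpleNamespace(
        delta=SimpleNamespace(content=content),
        finish_reason=finish_reason,
    )
    return SimpleNamespace(data=SimpleNamespace(choices=[choice], usage=usage))

def collect_stream(chunks):
    """Concatenate chunk text; return (text, usage) once a finish reason is seen."""
    text, usage = "", None
    for chunk in chunks:
        choice = chunk.data.choices[0]
        text += choice.delta.content or ""    # tolerate chunks without text
        if choice.finish_reason is not None:  # "stop", "length", ...
            usage = chunk.data.usage
            break
    return text, usage

stream = [
    fake_chunk("Pablo "),
    fake_chunk(None),
    fake_chunk("Picasso", "stop", SimpleNamespace(prompt_tokens=12, completion_tokens=3)),
]
text, usage = collect_stream(stream)
print(text, usage.prompt_tokens, usage.completion_tokens)  # Pablo Picasso 12 3
```

In the real wrapper, the point where `usage` is captured is where `langfuse.update_current_generation` would be called.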

Async completion

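This example shows the @observe decorator in an asynchronous context. It wraps an async function that interacts with a Mistral model, ensuring that both the request and the response are logged by Litefuse. Async functions allow non-blocking LLM calls, which suits applications that need concurrency while keeping full observability of each interaction.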

# Wrap async function with decorator
@observe(as_type="generation")
async def async_mistral_completion(**kwargs):
  kwargs_clone = kwargs.copy()
  input = kwargs_clone.pop('messages', None)
  model = kwargs_clone.pop('model', None)
  min_tokens = kwargs_clone.pop('min_tokens', None)
  max_tokens = kwargs_clone.pop('max_tokens', None)
  temperature = kwargs_clone.pop('temperature', None)
  top_p = kwargs_clone.pop('top_p', None)
 
  model_parameters = {
        "maxTokens": max_tokens,
        "minTokens": min_tokens,
        "temperature": temperature,
        "top_p": top_p
    }
  model_parameters = {k: v for k, v in model_parameters.items() if v is not None}
 
  langfuse.update_current_generation(
      input=input,
      model=model,
      model_parameters=model_parameters,
      metadata=kwargs_clone,
 
  )
 
  res = await mistral_client.chat.complete_async(**kwargs)
 
  langfuse.update_current_generation(
      usage_details={
          "input": res.usage.prompt_tokens,
          "output": res.usage.completion_tokens
      },
      output=res.choices[0].message.content
  )
 
  return res
 
@observe()
async def async_find_best_musician_from(country="France"):
  response = await async_mistral_completion(
      model="mistral-small-latest",
      max_tokens=1024,
      messages=[
        {
            "content": "Who is the best musician from {country}? Answer in one short sentence.".format(country=country),
            "role": "user",
        },
      ]
    )
  return response
 
await async_find_best_musician_from("Spain")

Example trace in Litefuse: https://litefuse.cloud/project/cloramnkj0002jz088vzn1ja4/traces/1f7d91ce-45dd-41bf-8e6f-1875086ed32f
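The concurrency benefit shows up when several requests run at once. The sketch below uses `asyncio.gather` with a stand-in coroutine so that it runs without API keys; `fake_async_completion` and `ask_many` are hypothetical names, and in practice the stand-in would be replaced by the decorated async_mistral_completion.

```python
import asyncio

async def fake_async_completion(country):
    """Stand-in for async_mistral_completion; sleeps instead of calling the API."""
    await asyncio.sleep(0.01)
    return f"best musician from {country}"

async def ask_many(countries):
    # Start all requests at once and wait for every result.
    # gather() returns results in the same order as its inputs.
    return await asyncio.gather(*(fake_async_completion(c) for c in countries))

# In a script use asyncio.run(); in a notebook, write `await ask_many([...])` instead
results = asyncio.run(ask_many(["Spain", "Italy"]))
print(results)  # ['best musician from Spain', 'best musician from Italy']
```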

Async streaming

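This example shows the @observe decorator in an asynchronous streaming context. It wraps an async function that streams a response from a Mistral model, yielding each chunk as it arrives and logging the assembled output when the stream finishes.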

import asyncio
 
# Wrap async streaming function with decorator
@observe(as_type="generation")
async def async_stream_mistral_completion(**kwargs):
    kwargs_clone = kwargs.copy()
    input = kwargs_clone.pop('messages', None)
    model = kwargs_clone.pop('model', None)
    min_tokens = kwargs_clone.pop('min_tokens', None)
    max_tokens = kwargs_clone.pop('max_tokens', None)
    temperature = kwargs_clone.pop('temperature', None)
    top_p = kwargs_clone.pop('top_p', None)
 
    model_parameters = {
        "maxTokens": max_tokens,
        "minTokens": min_tokens,
        "temperature": temperature,
        "top_p": top_p
    }
    model_parameters = {k: v for k, v in model_parameters.items() if v is not None}
 
    langfuse.update_current_generation(
        input=input,
        model=model,
        model_parameters=model_parameters,
        metadata=kwargs_clone,
    )
 
    res = await mistral_client.chat.stream_async(**kwargs)
    final_response = ""
    async for chunk in res:
        # delta.content may be None on some chunks; fall back to an empty string
        content = chunk.data.choices[0].delta.content or ""
        final_response += content
        yield content
 
        if chunk.data.choices[0].finish_reason == "stop":
            langfuse.update_current_generation(
                usage_details={
                    "input": chunk.data.usage.prompt_tokens,
                    "output": chunk.data.usage.completion_tokens
                },
                output=final_response
            )
            break
 
@observe()
async def async_stream_find_best_five_musician_from(country="France"):
    response_chunks = async_stream_mistral_completion(
        model="mistral-small-latest",
        max_tokens=1024,
        messages=[
            {
                "content": "Who are the five best musicians from {country}? Answer in one short sentence.".format(country=country),
                "role": "user",
            },
        ]
    )
    final_response = ""
    async for chunk in response_chunks:
        final_response += chunk
        # You can also do something with each chunk here if needed
        print(chunk)
 
    return final_response
 
# Run the async function
await async_stream_find_best_five_musician_from("Spain")

Example trace in Litefuse: https://litefuse.cloud/project/cloramnkj0002jz088vzn1ja4/traces/36608110-f6cf-4566-a080-7c18777e2dbf
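One detail of the code above is easy to miss: async_stream_mistral_completion is an async generator, so it is called without `await` and consumed with `async for`. The sketch below shows the same pattern with a stand-in generator and runs it from a regular script via `asyncio.run`; `fake_token_stream` and `collect` are hypothetical names.

```python
import asyncio

async def fake_token_stream():
    """Stand-in for async_stream_mistral_completion; yields text pieces."""
    for piece in ["Paco ", "de ", "Lucia"]:
        await asyncio.sleep(0)  # hand control back to the event loop
        yield piece

async def collect(stream):
    parts = []
    async for piece in stream:  # iterate; the generator itself is not awaited
        parts.append(piece)
    return "".join(parts)

# Calling fake_token_stream() returns an async iterator immediately
final_text = asyncio.run(collect(fake_token_stream()))
print(final_text)  # Paco de Lucia
```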

Tool calling

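The following snippet introduces Mistral's function calling capability. You define custom functions that retrieve specific data, such as payment status and payment date, based on a transaction ID, then register these functions with the Mistral model so it can call them while handling a query. For a more in-depth look at function calling with Mistral, see the official Mistral documentation.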

import pandas as pd
import json
import functools
 
 
# Sample payment transaction data
data = {
    'transaction_id': ['T1001', 'T1002', 'T1003', 'T1004', 'T1005'],
    'customer_id': ['C001', 'C002', 'C003', 'C002', 'C001'],
    'payment_amount': [125.50, 89.99, 120.00, 54.30, 210.20],
    'payment_date': ['2021-10-05', '2021-10-06', '2021-10-07', '2021-10-05', '2021-10-08'],
    'payment_status': ['Paid', 'Unpaid', 'Paid', 'Paid', 'Pending']
}
 
# Create a DataFrame from the data
df = pd.DataFrame(data)
 
# Function to retrieve payment status given a transaction ID
def retrieve_payment_status(df: pd.DataFrame, transaction_id: str) -> str:
    if transaction_id in df.transaction_id.values:
        # Return the payment status as a JSON string
        return json.dumps({'status': df[df.transaction_id == transaction_id].payment_status.item()})
    return json.dumps({'error': 'transaction id not found.'})
 
# Function to retrieve payment date given a transaction ID
def retrieve_payment_date(df: pd.DataFrame, transaction_id: str) -> str:
    if transaction_id in df.transaction_id.values:
        # Return the payment date as a JSON string
        return json.dumps({'date': df[df.transaction_id == transaction_id].payment_date.item()})
    return json.dumps({'error': 'transaction id not found.'})
 
# Define tools for the Mistral model with JSON schemas
tools = [
  {
      "type": "function",
      "function": {
          "name": "retrieve_payment_status",
          "description": "Get payment status of a transaction",
          "parameters": {
              "type": "object",
              "properties": {
                  "transaction_id": {
                      "type": "string",
                      "description": "The transaction id.",
                  }
              },
              "required": ["transaction_id"],
          },
      },
  },
  {
      "type": "function",
      "function": {
          "name": "retrieve_payment_date",
          "description": "Get payment date of a transaction",
          "parameters": {
              "type": "object",
              "properties": {
                  "transaction_id": {
                      "type": "string",
                      "description": "The transaction id.",
                  }
              },
              "required": ["transaction_id"],
          },
      },
  }
]
 
# Map each tool name to the Python function that implements it, with df bound in advance
names_to_functions = {
  'retrieve_payment_status': functools.partial(retrieve_payment_status, df=df),
  'retrieve_payment_date': functools.partial(retrieve_payment_date, df=df)
}
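A name-to-function mapping like the one above is typically used by looking up the tool the model chose and passing it the decoded arguments. The sketch below shows one defensive way to do that, using a small dictionary in place of the DataFrame so it runs on its own. All names here (`TOY_STATUS`, `toy_retrieve_payment_status`, `toy_registry`, `dispatch_tool_call`) are hypothetical and chosen so they do not clash with the definitions above.

```python
import functools
import json

# Toy lookup table standing in for the DataFrame
TOY_STATUS = {"T1001": "Paid", "T1002": "Unpaid"}

def toy_retrieve_payment_status(table, transaction_id):
    if transaction_id in table:
        return json.dumps({"status": table[transaction_id]})
    return json.dumps({"error": "transaction id not found."})

toy_registry = {
    "toy_status": functools.partial(toy_retrieve_payment_status, TOY_STATUS),
}

def dispatch_tool_call(name, arguments_json, registry=toy_registry):
    """Run the named tool with JSON-encoded arguments; always return a JSON string."""
    func = registry.get(name)
    if func is None:
        return json.dumps({"error": f"unknown tool: {name}"})
    try:
        params = json.loads(arguments_json)
        return func(**params)
    except (json.JSONDecodeError, TypeError) as exc:
        # Malformed JSON, or arguments that do not match the function signature
        return json.dumps({"error": str(exc)})

print(dispatch_tool_call("toy_status", '{"transaction_id": "T1002"}'))
print(dispatch_tool_call("refund_payment", "{}"))
```

Returning an error payload instead of raising lets the error be sent back to the model as the tool result. Note that catching `TypeError` this broadly would also hide a `TypeError` raised inside the tool itself, which may or may not be what you want.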

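The tool_calling_check_transaction_status function shows how to use Mistral's function calling capability. The function's result is then incorporated into the LLM's answer, and the whole exchange is logged and traced by Litefuse. Because both model calls go through the wrapped mistral_completion function, every step, from tool selection to final output, is captured.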

@observe()
def tool_calling_check_transaction_status(id="T1001"):
 
  # Construct the initial user query message
  messages = [{"role": "user", "content": "What's the status of my transaction {id}?".format(id=id)}]
 
  # Use the Langfuse-decorated Mistral completion function to generate a tool-assisted response
  response = mistral_completion(
      model = "mistral-small-latest",
      messages = messages,
      max_tokens=512,
      temperature=0.1,
      tools = tools,
      tool_choice = "any",
  )
 
 
  messages.append(response.choices[0].message)
 
  # Extract the tool call details from the model's response
  tool_call = response.choices[0].message.tool_calls[0]
  function_name = tool_call.function.name
  function_params = json.loads(tool_call.function.arguments)
 
  # Execute the selected function with the extracted parameters
  function_result = names_to_functions[function_name](**function_params)
 
  messages.append({"role":"tool", "name":function_name, "content":function_result, "tool_call_id":tool_call.id})
 
  # Call the Langfuse-wrapped Mistral completion function again to generate a final response using the tool's result
  response = mistral_completion(
      model = "mistral-small-latest",
      max_tokens=1024,
      temperature=0.5,
      messages = messages
  )
 
  return response.choices[0].message.content
 
tool_calling_check_transaction_status("T1005")

Example trace in Litefuse: https://litefuse.cloud/project/cloramnkj0002jz088vzn1ja4/traces/e986408a-f96b-40dc-8278-5d0eb0286f82

Interoperability with the Python SDK

You can use this integration together with the Litefuse SDKs to add additional attributes to the observation.

The @observe() decorator provides a convenient way to automatically wrap your instrumented code and add additional attributes to the observation.

from langfuse import observe, propagate_attributes, get_client
 
langfuse = get_client()
 
@observe()
def my_llm_pipeline(input):
    # Add additional attributes (user_id, session_id, metadata, version, tags) to all spans created within this execution scope
    with propagate_attributes(
        user_id="user_123",
        session_id="session_abc",
        tags=["agent", "my-observation"],
        metadata={"email": "user@litefuse.ai"},
        version="1.0.0"
    ):
 
        # YOUR APPLICATION CODE HERE
        result = call_llm(input)
 
        return result
 
# Run the function
my_llm_pipeline("Hi")

Learn more about using the Decorator in the Langfuse SDK instrumentation docs.

Troubleshooting

No observations appearing

First, enable debug mode in the Python SDK:

export LANGFUSE_DEBUG="True"

Then run your application and check the debug logs:

  • OTel observations appear in the logs: Your application is instrumented correctly but observations are not reaching Litefuse. To resolve this:
    1. Call langfuse.flush() at the end of your application to ensure all observations are exported.
    2. Verify that you are using the correct API keys and base URL.
  • No OTel spans in the logs: Your application is not instrumented correctly. Make sure the instrumentation runs before your application code.

Unwanted observations in Litefuse

The Langfuse SDK is based on OpenTelemetry. Other libraries in your application may emit OTel spans that are not relevant to you. These still count toward your billable units, so you should filter them out. See Unwanted spans in Litefuse for details.

Missing attributes

Some attributes may be stored in the metadata object of the observation rather than being mapped to the Litefuse data model. If a mapping or integration does not work as expected, please raise an issue on GitHub.

Next Steps

Once you have instrumented your code, you can manage, evaluate and debug your application.
