"""Litefuse / Langfuse observability plugin for Hermes Agent.

Subscribes to plugin-hook events to emit one Litefuse trace per
``run_conversation`` turn with real per-event wall-clock timestamps.
All callbacks run in-process inside the agent's Python interpreter, so
state lives in a single ``_state`` dict guarded by a lock instead of
the external offset/usage files used by the legacy shell-hook
implementation.

Trace shape::

    Hermes Agent — Turn N             (root span, input=user_message)
    ├── api: <model> #1               (generation, usage_details, real ts)
    ├── tool: <name>                  (tool, input=args, output=result, duration)
    ├── api: <model> #2               (generation, usage_details)
    ├── tool: <name>
    └── api: <model> #3               (generation, usage_details — last)
                                      ↳ root span output = assistant_response

Hooks subscribed (signatures per Hermes plugin-hook reference)::

    on_session_start (session_id, model, platform)
    pre_llm_call     (session_id, user_message, conversation_history,
                      is_first_turn, model, platform)
    post_api_request (session_id, task_id, platform, model, provider,
                      base_url, api_mode, api_call_count, api_duration,
                      finish_reason, message_count, response_model,
                      usage, assistant_content_chars,
                      assistant_tool_call_count)
    pre_tool_call    (tool_name, args, task_id, session_id, tool_call_id)
    post_tool_call   (tool_name, args, result, task_id, duration_ms,
                      session_id, tool_call_id)
    post_llm_call    (session_id, user_message, assistant_response,
                      conversation_history, model, platform)
    on_session_end   (session_id, completed, interrupted, model, platform)

Credentials are read from ``os.environ`` first, with a fallback to
``~/.hermes/.env`` (already loaded by Hermes startup) — no separate env
file required.
"""

from __future__ import annotations

import json
import logging
import os
import threading
from pathlib import Path
from typing import Any, Dict, Optional

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

HERMES_HOME = Path(os.path.expanduser("~/.hermes"))
LOG_PATH = HERMES_HOME / "state" / "litefuse_plugin.log"
MAX_CHARS = int(os.environ.get("HERMES_LITEFUSE_MAX_CHARS", "1000000"))

# ---------------------------------------------------------------------------
# Logger
# ---------------------------------------------------------------------------

_file_handler_attached = False


def _attach_file_handler() -> None:
    """Mirror plugin log lines to ~/.hermes/state/litefuse_plugin.log.

    Idempotent — safe to call multiple times across hook firings.
    Hermes' default logging stays on the agent's own log path; this
    handler exists so the operator can ``tail`` a dedicated file when
    diagnosing the integration.
    """
    global _file_handler_attached
    if _file_handler_attached:
        return
    try:
        LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
        handler = logging.FileHandler(LOG_PATH)
        handler.setFormatter(
            logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
        )
        plugin_logger = logging.getLogger(__name__)
        plugin_logger.addHandler(handler)
        plugin_logger.setLevel(
            logging.DEBUG
            if os.environ.get("HERMES_LITEFUSE_DEBUG", "").lower() in ("1", "true", "yes")
            else logging.INFO
        )
        _file_handler_attached = True
    except Exception:
        # Never break the agent on log-setup failure.
        pass


# ---------------------------------------------------------------------------
# Langfuse client — lazy + thread-safe
# ---------------------------------------------------------------------------

_client_lock = threading.Lock()
_client: Optional[Any] = None
_client_init_failed = False


def _get_client() -> Optional[Any]:
    """Return a singleton Langfuse client, or None if creds missing /
    SDK missing / init failed. Caches the failure state so we do not
    spam logs on every event firing.
    """
    global _client, _client_init_failed

    if _client is not None:
        return _client
    if _client_init_failed:
        return None

    with _client_lock:
        if _client is not None:
            return _client
        if _client_init_failed:
            return None

        pk = os.environ.get("LANGFUSE_PUBLIC_KEY")
        sk = os.environ.get("LANGFUSE_SECRET_KEY")
        host = (
            os.environ.get("LANGFUSE_BASE_URL")
            or os.environ.get("LANGFUSE_HOST")
            or "https://cloud.langfuse.com"
        )
        if not (pk and sk):
            logger.warning(
                "Litefuse credentials not set (LANGFUSE_PUBLIC_KEY / "
                "LANGFUSE_SECRET_KEY). Plugin will be inert. Put them "
                "in ~/.hermes/.env so Hermes loads them at startup."
            )
            _client_init_failed = True
            return None

        try:
            from langfuse import Langfuse
        except ImportError as exc:
            logger.error(
                "langfuse SDK not importable in the Hermes venv: %s. "
                "Install with: %s/bin/pip install 'langfuse>=4,<5'",
                exc, os.environ.get("VIRTUAL_ENV", "<hermes venv>"),
            )
            _client_init_failed = True
            return None

        try:
            _client = Langfuse(
                public_key=pk, secret_key=sk, host=host, tracing_enabled=True,
            )
            logger.info("Litefuse client ready (host=%s)", host)
            return _client
        except Exception as exc:
            logger.error("Langfuse client init failed: %s", exc)
            _client_init_failed = True
            return None


# ---------------------------------------------------------------------------
# Per-session state
# ---------------------------------------------------------------------------

# session_id -> {
#   "container": LangfuseSpan        # long-lived parent span for this turn
#                                    # (the "Hermes Agent — Turn N" you see
#                                    # at the trace top-level); all api,
#                                    # tool, and final-response spans nest
#                                    # underneath. Open at pre_llm_call,
#                                    # closed at post_llm_call.
#   "trace_id":  str                 # current turn's trace id
#   "tool_q":    deque[entry]        # FIFO of open tool spans for this turn
#   "api_seq":   int                 # api_call index, reset per turn
#   "tool_seq":  int                 # tool_call index, reset per turn
#   "turn_no":   int                 # 1-based turn counter, PERSISTS across
#                                    # turns in the same session (cleared
#                                    # only on on_session_finalize /
#                                    # on_session_reset).
#   "model":     str
#   "platform":  str
# }
_state: Dict[str, Dict[str, Any]] = {}
_state_lock = threading.Lock()

# Threading-local "current session" — set by ``pre_llm_call`` and read
# by ``pre_tool_call``. Necessary because in Hermes v0.12.0 the
# ``pre_tool_call`` invocation sites in run_agent.py (lines 9647 and
# 9998) pass only ``task_id=effective_task_id`` to
# ``get_pre_tool_call_block_message`` — both ``session_id`` and
# ``tool_call_id`` are empty. ``task_id`` is a fresh UUID4 per
# turn that does NOT equal ``session_id`` outside the CLI single-shot
# path, so we cannot use it as a fallback key.
#
# Concurrency model: each session/turn runs ``run_conversation`` on
# its own thread (or asyncio task in the gateway), and ``pre_llm_call``
# / ``pre_tool_call`` fire from the same thread, so threading.local
# yields the right session even when the gateway is serving multiple
# Feishu / Telegram / Slack users in parallel.
_current = threading.local()


def _session_key(kwargs: Dict[str, Any]) -> str:
    """Resolve the per-session state key for the active hook payload.

    Preference order:

    1. ``session_id`` from kwargs — populated for most events
       (``pre_llm_call``, ``post_llm_call``, ``on_session_start``,
       ``on_session_end``, ``post_api_request``, ``post_tool_call``).
    2. ``threading.local`` cache set by ``pre_llm_call`` — needed for
       ``pre_tool_call`` whose v0.12.0 invocation site omits
       ``session_id``.
    3. ``parent_session_id`` (subagent_stop).
    4. Falls back to ``task_id`` only as a last resort. For
       single-shot CLI invocations Hermes uses the session_id as
       task_id so the lookup still hits the right entry; for the
       gateway path this would land on a fresh entry and lose
       correlation — but the threading.local lookup above usually
       prevents this.
    """
    return (
        kwargs.get("session_id")
        or getattr(_current, "session_id", "")
        or kwargs.get("parent_session_id")
        or kwargs.get("task_id")
        or ""
    )


def _get_session(session_id: str) -> Dict[str, Any]:
    from collections import deque
    with _state_lock:
        s = _state.get(session_id)
        if s is None:
            s = {
                "container": None,
                "trace_id": None,
                "tool_q": deque(),
                "api_seq": 0,
                "tool_seq": 0,
                "turn_no": 0,
                "model": "",
                "platform": "",
            }
            _state[session_id] = s
        return s


def _drop_session(session_id: str) -> Optional[Dict[str, Any]]:
    with _state_lock:
        return _state.pop(session_id, None)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _truncate(text: Any) -> Any:
    if text is None:
        return None
    if not isinstance(text, str):
        return text
    if len(text) <= MAX_CHARS:
        return text
    return text[:MAX_CHARS] + f"\n…[truncated {len(text) - MAX_CHARS} chars]"


def _user_id() -> str:
    return (
        os.environ.get("LITEFUSE_USER_ID")
        or os.environ.get("USER")
        or "hermes-user"
    )


def _set_trace_attrs(
    root,
    *,
    trace_name: str,
    session_id: str,
    user_id: str,
    tags: list,
    input_obj: Any,
    metadata: dict,
    as_root: bool = True,
) -> None:
    """Set top-of-trace OTel attributes that Langfuse uses for the
    Trace row (vs the root observation). Mirrors what the shell-hook
    integration did via _set_trace_attrs but in-process.

    ``as_root`` controls whether AS_ROOT=True is set on this span. The
    long-lived container span passes ``True``; the quick bootstrap
    span passes ``False`` so Litefuse interprets the bootstrap's
    TRACE_* attrs as an UPDATE to the container's trace rather than
    creating a separate "bootstrap" sibling trace.
    """
    try:
        from langfuse import LangfuseOtelSpanAttributes as LFA
    except Exception as exc:
        logger.debug("LangfuseOtelSpanAttributes unavailable: %s", exc)
        return
    span = getattr(root, "_otel_span", None)
    if span is None or not hasattr(span, "set_attribute"):
        return
    try:
        if as_root:
            span.set_attribute(LFA.AS_ROOT, True)
        span.set_attribute(LFA.TRACE_NAME, trace_name)
        if session_id:
            span.set_attribute(LFA.TRACE_SESSION_ID, session_id)
        if user_id:
            span.set_attribute(LFA.TRACE_USER_ID, user_id)
        if tags:
            span.set_attribute(LFA.TRACE_TAGS, list(tags))
        if input_obj is not None:
            span.set_attribute(
                LFA.TRACE_INPUT,
                input_obj if isinstance(input_obj, str)
                else json.dumps(input_obj, default=str),
            )
        if metadata:
            span.set_attribute(
                LFA.TRACE_METADATA, json.dumps(metadata, default=str)
            )
    except Exception as exc:
        logger.debug("set_attribute failed: %s", exc)


def _usage_details(usage: Optional[dict]) -> Optional[dict]:
    """Map Hermes' usage dict to Anthropic-style keys so Litefuse cost
    mapping picks the right model price tier.
    """
    if not usage:
        return None
    out: Dict[str, int] = {}
    for k in (
        "input_tokens",
        "output_tokens",
        "cache_read_input_tokens",
        "cache_creation_input_tokens",
    ):
        v = usage.get(k)
        if isinstance(v, int):
            out[k] = v
    # Hermes' Anthropic adapter exposes cache fields as
    # cache_read_tokens / cache_write_tokens — translate.
    if "cache_read_input_tokens" not in out and isinstance(
        usage.get("cache_read_tokens"), int
    ):
        out["cache_read_input_tokens"] = usage["cache_read_tokens"]
    if "cache_creation_input_tokens" not in out and isinstance(
        usage.get("cache_write_tokens"), int
    ):
        out["cache_creation_input_tokens"] = usage["cache_write_tokens"]
    # OpenAI-style fallback.
    if "input_tokens" not in out and isinstance(
        usage.get("prompt_tokens"), int
    ):
        out["input_tokens"] = usage["prompt_tokens"]
    if "output_tokens" not in out and isinstance(
        usage.get("completion_tokens"), int
    ):
        out["output_tokens"] = usage["completion_tokens"]
    return out or None


# ---------------------------------------------------------------------------
# Hook callbacks — all accept **kwargs for forward compatibility
# ---------------------------------------------------------------------------


def _on_session_start(**kwargs: Any) -> None:
    _attach_file_handler()
    session_id = _session_key(kwargs)
    model = kwargs.get("model") or ""
    platform = kwargs.get("platform") or ""
    s = _get_session(session_id)
    s["model"] = model
    s["platform"] = platform
    logger.debug(
        "session_start session=%s model=%s platform=%s",
        session_id, model, platform,
    )


def _on_pre_llm_call(**kwargs: Any) -> None:
    """Bootstrap a Litefuse trace for the new turn.

    Strategy: emit a single bootstrap span with all trace-level
    attributes (session_id, user_id, tags, input, metadata) and END IT
    IMMEDIATELY so Litefuse receives the trace metadata at the start of
    the turn rather than at the end. Subsequent api / tool observations
    are emitted as root-level siblings via ``trace_context`` so the
    trace stays open across the whole turn while each child still
    appears in real time as it ends.

    This double-act fixes the "session/name/input empty until trace
    ends" issue caused by OTel attribute export deferring to span end.
    """
    _attach_file_handler()
    lf = _get_client()
    if lf is None:
        return

    session_id = _session_key(kwargs)
    user_message = kwargs.get("user_message") or ""
    model = kwargs.get("model") or ""
    platform = kwargs.get("platform") or ""
    is_first_turn = bool(kwargs.get("is_first_turn"))
    conversation_history = kwargs.get("conversation_history") or []

    s = _get_session(session_id)
    s["model"] = s["model"] or model
    s["platform"] = s["platform"] or platform
    s["turn_no"] = s.get("turn_no", 0) + 1
    s["api_seq"] = 0   # reset per-turn API counter
    s["tool_seq"] = 0  # reset per-turn tool counter

    trace_name = f"Hermes Agent — Turn {s['turn_no']}"
    user_input = {"role": "user", "content": _truncate(user_message)}
    # All Hermes-specific fields live under a single "hermes_agent"
    # key so Litefuse / Langfuse's UI groups them cleanly under that
    # namespace and the top-level metadata dict only contains Litefuse-
    # standard keys (session_id, user_id, etc. — set via OTel attrs
    # at the trace level, not as metadata).
    metadata = {
        "hermes_agent": {
            "turn_number": s["turn_no"],
            "platform": platform,
            "model": model,
            "is_first_turn": is_first_turn,
            "history_messages": len(conversation_history),
        }
    }

    tags = ["hermes-agent", "litefuse"] + ([f"model:{model}"] if model else [])

    try:
        # Container span — long-lived parent of every observation we
        # emit for this turn. Stays open until post_llm_call so api,
        # tool, and final-response spans nest underneath instead of
        # ending up as root-level siblings. Critical for two reasons:
        #
        # 1. Trace structure: Litefuse renders the container as the
        #    single "Hermes Agent — Turn N" parent in the timeline,
        #    with all children grouped underneath.
        # 2. Trace-header stability: root-level siblings cause
        #    Litefuse to repeatedly overwrite the Trace row's
        #    name/input from the latest sibling's name/input.
        #    Children of an open container don't trigger that update,
        #    so the trace header set by the bootstrap child below
        #    stays put until we end the container with the final
        #    output at end-of-turn.
        container = lf.start_observation(
            name=trace_name,
            as_type="span",
            input=user_input,
            metadata=metadata,
        )
        _set_trace_attrs(
            container,
            trace_name=trace_name,
            session_id=session_id,
            user_id=_user_id(),
            tags=tags,
            input_obj=user_input,
            metadata=metadata,
        )
        s["container"] = container
        s["trace_id"] = getattr(container, "trace_id", None)

        # Flush trace header (name / input / session_id / user_id /
        # tags) to Litefuse immediately, without waiting for the
        # container to end at post_llm_call.
        #
        # Why we need a separate bootstrap span:
        #   Langfuse exports OTel attrs only when a span ENDS. The
        #   container's TRACE_* attrs above won't reach Litefuse
        #   until end-of-turn (potentially minutes later). To get
        #   the trace header to appear in the Litefuse UI early, we
        #   create a tiny "bootstrap" child span, copy the same
        #   TRACE_* attrs onto it, end it immediately, and force-flush.
        #
        # Two pitfalls we navigated:
        #   1. ``as_type="event"`` in langfuse v4 creates a span
        #      with a FRESH trace_id instead of inheriting from its
        #      parent. That caused a spurious sibling "trace_init"
        #      trace. ``as_type="span"`` correctly inherits, so we
        #      use that.
        #   2. ``AS_ROOT=False`` on the bootstrap causes Litefuse to
        #      ignore its TRACE_* attrs because no AS_ROOT span has
        #      created the trace yet. We set AS_ROOT=True on both
        #      bootstrap and container. They share trace_id (verified)
        #      so this is "two AS_ROOT spans, same trace_id" — Litefuse
        #      treats the second one as an UPDATE to the first's
        #      trace, with the same TRACE_NAME/INPUT etc. there is
        #      nothing to churn.
        #
        # The bootstrap shows up as a brief sibling observation in the
        # timeline with the same name as the container; visually it's
        # a small extra row but the trace structure is correct.
        try:
            # Bootstrap doubles as a semantic "user message" observation:
            # (a) flushes the trace header to Litefuse immediately by
            # ending right away, (b) gives the timeline a clean opening
            # child that mirrors the closing "assistant response" child
            # emitted at post_llm_call.
            #
            # We create this as ``as_type="span"`` (not "event") because:
            #   - ``container.start_observation(as_type="event")`` in
            #     langfuse v4 assigns a *fresh* trace_id rather than
            #     inheriting from its parent, which would spawn a
            #     sibling "user message" trace (verified empirically).
            #   - ``container.create_event(...)`` does inherit trace_id
            #     correctly, but Litefuse doesn't propagate AS_ROOT /
            #     TRACE_* attributes from event-type observations to
            #     the trace row, so the trace header (name / session /
            #     input) ends up empty until the container ends.
            #
            # Instead we build the bootstrap as a span (which DOES
            # propagate TRACE_* on flush), then explicitly override
            # ``LFA.OBSERVATION_TYPE`` to "event" so Litefuse's UI
            # still renders it with the event icon / styling.
            bootstrap = container.start_observation(
                name="user message",
                as_type="span",
                input=user_input,
            )
            _set_trace_attrs(
                bootstrap,
                trace_name=trace_name,
                session_id=session_id,
                user_id=_user_id(),
                tags=tags,
                input_obj=user_input,
                metadata=metadata,
                as_root=True,
            )
            # Override observation_type so the Litefuse timeline
            # renders this span as an EVENT (point-in-time marker)
            # rather than a SPAN (duration bar).
            try:
                from langfuse import LangfuseOtelSpanAttributes as LFA
                otel_span = getattr(bootstrap, "_otel_span", None)
                if otel_span is not None and hasattr(otel_span, "set_attribute"):
                    otel_span.set_attribute(LFA.OBSERVATION_TYPE, "event")
            except Exception:
                pass
            bootstrap.end()
            try:
                lf.flush()  # push the bootstrap immediately
            except Exception:
                pass
        except Exception as exc:
            logger.debug("bootstrap event emit failed: %s", exc)
    except Exception as exc:
        logger.warning("container emit failed: %s", exc)
        return

    # Cache for pre_tool_call to recover the session_id that v0.12.0
    # forgets to forward.
    _current.session_id = session_id
    logger.debug(
        "pre_llm_call session=%s turn=%d trace=%s",
        session_id, s["turn_no"],
        (s["trace_id"] or "")[:8],
    )


def _on_post_api_request(**kwargs: Any) -> None:
    """Emit one generation observation per API call inside the turn."""
    _attach_file_handler()
    lf = _get_client()
    if lf is None:
        return

    session_id = _session_key(kwargs)
    s = _get_session(session_id)
    container = s.get("container")
    if container is None:
        logger.debug(
            "post_api_request session=%s but no open container — skipping",
            session_id,
        )
        return

    s["api_seq"] += 1
    seq = s["api_seq"]
    model = kwargs.get("response_model") or kwargs.get("model") or s.get("model")
    finish_reason = kwargs.get("finish_reason")
    tool_call_count = kwargs.get("assistant_tool_call_count") or 0
    content_chars = kwargs.get("assistant_content_chars") or 0
    api_duration = kwargs.get("api_duration")

    summary = (
        f"finish={finish_reason} • duration={api_duration:.2f}s • "
        f"tool_calls={tool_call_count} • content_chars={content_chars}"
        if isinstance(api_duration, (int, float))
        else f"finish={finish_reason} • tool_calls={tool_call_count} • "
             f"content_chars={content_chars}"
    )

    try:
        gen = container.start_observation(
            name=f"api: {model} #{seq}",
            as_type="generation",
            input={
                "model": model,
                "message_count": kwargs.get("message_count"),
                "api_mode": kwargs.get("api_mode"),
                "provider": kwargs.get("provider"),
            },
            output=summary,
            model=model,
            usage_details=_usage_details(kwargs.get("usage")),
            metadata={
                "hermes_agent": {
                    "provider": kwargs.get("provider"),
                    "base_url": kwargs.get("base_url"),
                    "api_mode": kwargs.get("api_mode"),
                    "api_call_count": kwargs.get("api_call_count"),
                    "api_duration": api_duration,
                    "finish_reason": finish_reason,
                    "message_count": kwargs.get("message_count"),
                    "assistant_content_chars": content_chars,
                    "assistant_tool_call_count": tool_call_count,
                    "step_index": seq,
                }
            },
        )
        gen.end()
    except Exception as exc:
        logger.warning("api generation emit failed: %s", exc)


def _on_pre_tool_call(**kwargs: Any) -> None:
    """Open a tool observation. Will be closed by post_tool_call."""
    _attach_file_handler()
    lf = _get_client()
    if lf is None:
        return

    session_id = _session_key(kwargs)
    s = _get_session(session_id)
    container = s.get("container")
    if container is None:
        return

    tool_name = kwargs.get("tool_name") or "tool"
    args = kwargs.get("args")
    s["tool_seq"] += 1
    seq = s["tool_seq"]

    try:
        tool_span = container.start_observation(
            name=f"tool: {tool_name} (#{seq})",
            as_type="tool",
            input={
                "tool_name": tool_name,
                "arguments": args,
            },
            metadata={
                "hermes_agent": {
                    "tool_name": tool_name,
                    "task_id": kwargs.get("task_id"),
                    "step_index": seq,
                    # Hermes v0.12.0 leaves tool_call_id empty at
                    # pre_tool_call; gets filled at post_tool_call
                    # and we merge it back onto the same span there.
                }
            },
        )
        s["tool_q"].append({"span": tool_span, "tool_name": tool_name, "seq": seq})
    except Exception as exc:
        logger.warning("pre_tool_call emit failed: %s", exc)


def _on_post_tool_call(**kwargs: Any) -> None:
    """Close the tool observation opened by pre_tool_call."""
    _attach_file_handler()
    session_id = _session_key(kwargs)
    s = _get_session(session_id)
    queue = s["tool_q"]
    tool_name = kwargs.get("tool_name") or "tool"

    # Prefer popping a same-tool-name entry off the queue (in case
    # parallel calls landed out of FIFO order); otherwise just take the
    # oldest. Serial-tool turns always hit the popleft branch.
    entry = None
    if queue and queue[0]["tool_name"] == tool_name:
        entry = queue.popleft()
    else:
        for i, e in enumerate(queue):
            if e["tool_name"] == tool_name:
                entry = e
                del queue[i]
                break
    if entry is None:
        logger.debug(
            "post_tool_call session=%s tool=%s — no open pre_tool_call span "
            "(queue=%s); emitting self-contained tool obs",
            session_id, tool_name, [e["tool_name"] for e in queue],
        )
        container = s.get("container")
        if container is None:
            return
        s["tool_seq"] += 1
        seq = s["tool_seq"]
        try:
            tool_span = container.start_observation(
                name=f"tool: {tool_name} (#{seq})",
                as_type="tool",
                input={"tool_name": tool_name, "arguments": kwargs.get("args")},
                metadata={
                    "hermes_agent": {
                        "step_index": seq,
                        "no_matching_pre": True,
                    }
                },
            )
        except Exception as exc:
            logger.warning("post_tool_call fallback emit failed: %s", exc)
            return
    else:
        tool_span = entry["span"]

    result = kwargs.get("result")
    duration_ms = kwargs.get("duration_ms")
    is_error = _looks_like_error(result)

    try:
        tool_span.update(
            output=_truncate(result if isinstance(result, str) else json.dumps(result, default=str)),
            level="ERROR" if is_error else None,
            metadata={
                "hermes_agent": {
                    "duration_ms": duration_ms,
                    "is_error": is_error,
                    "tool_call_id": kwargs.get("tool_call_id"),
                }
            },
        )
        tool_span.end()
    except Exception as exc:
        logger.warning("post_tool_call end failed: %s", exc)


def _on_post_llm_call(**kwargs: Any) -> None:
    """End-of-turn boundary. Emits the final 'assistant response' root
    sibling and also updates the trace-level output attribute so the
    Trace view in Litefuse shows the final answer.
    """
    _attach_file_handler()
    session_id = _session_key(kwargs)
    s = _get_session(session_id)
    container = s.get("container")
    if container is None:
        return

    assistant_response = kwargs.get("assistant_response") or ""
    lf = _get_client()

    # Defensively close any tool spans that are still open.
    while s["tool_q"]:
        entry = s["tool_q"].popleft()
        try:
            entry["span"].update(level="ERROR", metadata={"leaked": True})
            entry["span"].end()
        except Exception:
            pass

    # Emit final assistant response as a child of the container so
    # users can see the assistant's final text inline with the other
    # turn steps. The container itself also carries the response as
    # its own output (set below) — that's what feeds the trace header.
    try:
        final = container.start_observation(
            name="assistant response",
            as_type="generation",
            output=_truncate(assistant_response),
            model=s.get("model"),
            metadata={
                "hermes_agent": {
                    "step_index": s["api_seq"] + s["tool_seq"] + 1,
                }
            },
        )
        final.end()
    except Exception as exc:
        logger.warning("final response span emit failed: %s", exc)

    # Close the container: set its output and the trace-level
    # TRACE_OUTPUT attribute so Litefuse's Trace row shows the final
    # answer. Trace name / input / session / user / tags were already
    # set on the container at pre_llm_call (and flushed early by the
    # bootstrap event child); they don't need re-asserting here
    # because the container hasn't been overwritten by any sibling.
    try:
        container.update(output=_truncate(assistant_response))
        otel_span = getattr(container, "_otel_span", None)
        if otel_span is not None and hasattr(otel_span, "set_attribute"):
            try:
                from langfuse import LangfuseOtelSpanAttributes as LFA
                otel_span.set_attribute(
                    LFA.TRACE_OUTPUT, _truncate(assistant_response)
                )
            except Exception:
                pass
        container.end()
    except Exception as exc:
        logger.warning("container.end failed: %s", exc)

    # Per-turn cleanup: clear container so the NEXT turn's
    # pre_llm_call starts a fresh trace, but PRESERVE turn_no / model
    # / platform so subsequent turns get incrementing turn numbers
    # and consistent metadata.
    s["container"] = None
    s["trace_id"] = None
    s["tool_q"].clear()
    try:
        del _current.session_id
    except AttributeError:
        pass

    logger.info(
        "turn complete session=%s turn=%d api_calls=%d tool_calls=%d",
        session_id, s["turn_no"], s["api_seq"], s["tool_seq"],
    )

    if lf is not None:
        try:
            lf.flush()
        except Exception as exc:
            logger.debug("flush failed: %s", exc)


def _on_session_end(**kwargs: Any) -> None:
    """Per-turn end (NOT per-session end despite the name — Hermes
    fires this at the end of every ``run_conversation`` call). We do
    almost nothing here: ``post_llm_call`` already closed the trace.
    The defensive cleanup only catches interrupted/error turns where
    ``post_llm_call`` did not fire.
    """
    _attach_file_handler()
    session_id = _session_key(kwargs)
    with _state_lock:
        s = _state.get(session_id)
    if s is None or s.get("container") is None:
        return  # post_llm_call already closed cleanly

    # Interrupted or errored turn — close any leftover tool spans
    # and the container itself.
    for entry in list(s.get("tool_q") or ()):
        try:
            entry["span"].update(level="ERROR", metadata={"leaked": True})
            entry["span"].end()
        except Exception:
            pass
    s["tool_q"].clear()

    container = s.get("container")
    if container is not None:
        try:
            container.update(
                level="ERROR" if kwargs.get("interrupted") else None,
                metadata={
                    "hermes_agent": {
                        "interrupted": kwargs.get("interrupted"),
                        "completed": kwargs.get("completed"),
                    }
                },
            )
            container.end()
        except Exception:
            pass
        s["container"] = None
    s["trace_id"] = None
    try:
        del _current.session_id
    except AttributeError:
        pass

    lf = _get_client()
    if lf is not None:
        try:
            lf.flush()
        except Exception:
            pass


def _on_session_reset(**kwargs: Any) -> None:
    """Hermes drops the session and starts a new one (``/new`` slash
    command, ``/clear`` etc.). Reset turn counter.
    """
    _attach_file_handler()
    session_id = _session_key(kwargs)
    _drop_session(session_id)


def _on_session_finalize(**kwargs: Any) -> None:
    """Hermes is tearing down the session entirely (gateway shutdown,
    explicit finalization). Drop all in-memory state for this session.
    """
    _attach_file_handler()
    session_id = _session_key(kwargs)
    _drop_session(session_id)


def _looks_like_error(result: Any) -> bool:
    if not result:
        return False
    s = result if isinstance(result, str) else json.dumps(result, default=str)
    head = s.lstrip()[:200].lower()
    return (
        head.startswith("error")
        or '"error"' in head
        or '"success": false' in head
        or '"success":false' in head
    )


# ---------------------------------------------------------------------------
# Plugin entry point
# ---------------------------------------------------------------------------


def register(ctx) -> None:
    """Register all hooks with the Hermes plugin manager."""
    _attach_file_handler()
    ctx.register_hook("on_session_start", _on_session_start)
    ctx.register_hook("pre_llm_call", _on_pre_llm_call)
    ctx.register_hook("post_api_request", _on_post_api_request)
    ctx.register_hook("pre_tool_call", _on_pre_tool_call)
    ctx.register_hook("post_tool_call", _on_post_tool_call)
    ctx.register_hook("post_llm_call", _on_post_llm_call)
    ctx.register_hook("on_session_end", _on_session_end)
    ctx.register_hook("on_session_reset", _on_session_reset)
    ctx.register_hook("on_session_finalize", _on_session_finalize)
    logger.info("litefuse plugin registered (9 hooks)")
