#!/usr/bin/env python3
"""
Claude Code -> Langfuse hook (Litefuse build, v3-compatible)

Fixes vs the upstream langfuse_hook.py from langfuse.com/integrations/other/claude-code:

  0. The upstream script imports `propagate_attributes` which does NOT exist
     in langfuse-python >= 3.x. The bare `except Exception: sys.exit(0)` at
     module load made it silently no-op. This rewrite uses the v3 OTel layer
     directly via the configured Langfuse tracer.
  1. Each assistant message becomes its own generation span. Tool spans are
     emitted as temporal siblings under the Turn span, in the order they ran.
     (Upstream collapsed everything into one fake "Claude Response" span.)
  2. start_time / end_time on every span come from the JSONL row timestamps,
     not the wall-clock at hook-execution time. The Langfuse timeline now
     reflects the true duration of LLM calls and tool executions.
"""

import copy
import getpass
import hashlib
import json
import os
import socket
import sys
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple

try:
    from langfuse import Langfuse, LangfuseOtelSpanAttributes as LFA
    from opentelemetry import trace as otel_trace_api
except Exception:
    sys.exit(0)

STATE_DIR = Path.home() / ".claude" / "state"
LOG_FILE = STATE_DIR / "litefuse_hook.log"
STATE_FILE = STATE_DIR / "litefuse_state.json"
LOCK_FILE = STATE_DIR / "litefuse_state.lock"

DEBUG = os.environ.get("CC_LANGFUSE_DEBUG", "").lower() == "true"
MAX_CHARS = int(os.environ.get("CC_LANGFUSE_MAX_CHARS", "1000000"))


def _log(level: str, message: str) -> None:
    """Append one ``"<local time> [<level>] <message>"`` line to LOG_FILE.

    Never raises: a logging failure must not break the hook.
    """
    try:
        STATE_DIR.mkdir(parents=True, exist_ok=True)
        stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        line = f"{stamp} [{level}] {message}\n"
        with LOG_FILE.open("a", encoding="utf-8") as fh:
            fh.write(line)
    except Exception:
        pass


def debug(msg: str) -> None:
    """Log ``msg`` at DEBUG level; a no-op unless DEBUG is enabled."""
    if not DEBUG:
        return
    _log("DEBUG", msg)


def info(msg: str) -> None:
    """Log ``msg`` at INFO level."""
    _log(level="INFO", message=msg)


def warn(msg: str) -> None:
    """Log ``msg`` at WARN level."""
    _log(level="WARN", message=msg)


def error(msg: str) -> None:
    """Log ``msg`` at ERROR level."""
    _log(level="ERROR", message=msg)


# ---------------- File locking + state ----------------
class FileLock:
    """Best-effort exclusive advisory lock on ``path`` using POSIX ``flock``.

    ``__enter__`` polls for the lock for up to ``timeout_s`` seconds. If the
    lock cannot be taken in time, or ``fcntl`` is unavailable (e.g. on
    Windows), the ``with`` body still runs without the lock — the hook must
    never hang — and ``acquired`` stays False so callers can tell.
    """

    def __init__(self, path: Path, timeout_s: float = 2.0):
        self.path = path
        self.timeout_s = timeout_s
        # True only while the flock is actually held.
        self.acquired = False
        self._fh = None

    def __enter__(self):
        # Create the lock file's own directory (not a fixed global one) so
        # the lock works wherever ``path`` points.
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self._fh = open(self.path, "a+", encoding="utf-8")
        try:
            import fcntl

            # Monotonic clock: a wall-clock jump must not shorten or
            # stretch the wait.
            deadline = time.monotonic() + self.timeout_s
            while True:
                try:
                    fcntl.flock(self._fh.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
                    self.acquired = True
                    break
                except BlockingIOError:
                    if time.monotonic() > deadline:
                        debug(
                            f"FileLock: timed out after {self.timeout_s}s on "
                            f"{self.path}; proceeding without the lock"
                        )
                        break
                    time.sleep(0.05)
        except Exception as e:
            # No fcntl, or flock unsupported on this filesystem: best effort.
            debug(f"FileLock: locking unavailable for {self.path}: {e}")
        return self

    def __exit__(self, exc_type, exc, tb):
        fh, self._fh = self._fh, None
        if fh is None:
            return
        if self.acquired:
            try:
                import fcntl

                fcntl.flock(fh.fileno(), fcntl.LOCK_UN)
            except Exception:
                pass
            self.acquired = False
        try:
            fh.close()
        except Exception:
            pass


def load_state(path: Optional[Path] = None) -> Dict[str, Any]:
    """Load the hook's persisted state dict.

    Args:
        path: state file to read; defaults to STATE_FILE.

    Returns:
        The decoded JSON object, or ``{}`` when the file is missing,
        unreadable, not valid JSON, or holds a non-object JSON value.
        Callers index the result by string keys, so a list or number must
        not leak through. Unexpected read failures and corruption are
        logged instead of being silently swallowed.
    """
    target = STATE_FILE if path is None else path
    try:
        raw = target.read_text(encoding="utf-8")
    except FileNotFoundError:
        return {}
    except Exception as e:
        warn(f"load_state: cannot read {target}: {e}")
        return {}
    try:
        data = json.loads(raw)
    except (ValueError, RecursionError) as e:
        warn(f"load_state: {target} is not valid JSON, starting fresh: {e}")
        return {}
    return data if isinstance(data, dict) else {}


def save_state(state: Dict[str, Any], path: Optional[Path] = None) -> None:
    """Atomically write ``state`` as pretty-printed JSON.

    The data is written to a sibling ``.tmp`` file and then ``os.replace``-d
    over the target, so a reader never sees a half-written file.

    Args:
        state: the full state dict.
        path: destination file; defaults to STATE_FILE.

    Never raises. A failure is logged at WARN level, because losing state
    makes the next run re-read and possibly re-emit old turns. Any leftover
    ``.tmp`` file is removed.
    """
    target = STATE_FILE if path is None else path
    tmp = target.with_suffix(".tmp")
    try:
        target.parent.mkdir(parents=True, exist_ok=True)
        tmp.write_text(json.dumps(state, indent=2, sort_keys=True), encoding="utf-8")
        os.replace(tmp, target)
    except Exception as e:
        warn(f"save_state failed for {target}: {e}")
        try:
            tmp.unlink()
        except OSError:
            pass


def state_key(session_id: str, transcript_path: str) -> str:
    """Return the per-(session, transcript) key used in the state dict.

    The key is the SHA-256 hex digest of ``"<session_id>::<transcript_path>"``.
    """
    key_material = f"{session_id}::{transcript_path}".encode("utf-8")
    digest = hashlib.sha256(key_material)
    return digest.hexdigest()


# ---------------- Hook payload ----------------
def read_hook_payload(stream: Any = None) -> Dict[str, Any]:
    """Read and decode the JSON hook payload.

    Args:
        stream: text stream to read from; defaults to ``sys.stdin``.

    Returns:
        The decoded payload, or ``{}`` when the input is empty, unreadable,
        not valid JSON, or not a JSON object. Callers call ``.get`` on the
        result, so a list or scalar payload must not leak through.
    """
    source = sys.stdin if stream is None else stream
    try:
        data = source.read()
    except Exception as e:
        debug(f"read_hook_payload: cannot read payload: {e}")
        return {}
    if not data or not data.strip():
        return {}
    try:
        payload = json.loads(data)
    except (ValueError, RecursionError) as e:
        debug(f"read_hook_payload: payload is not valid JSON: {e}")
        return {}
    return payload if isinstance(payload, dict) else {}


def extract_session_and_transcript(payload: Dict[str, Any]) -> Tuple[Optional[str], Optional[Path]]:
    """Pull the session id and resolved transcript path out of a hook payload.

    Accepts camelCase (``sessionId``/``transcriptPath``), snake_case
    (``session_id``/``transcript_path``), and nested
    (``session.id``/``transcript.path``) spellings, in that priority order.
    A nested key holding null or a non-object value is ignored instead of
    raising AttributeError.

    Returns:
        ``(session_id, transcript_path)``. Either element is None when it is
        absent. The path is ``expanduser().resolve()``-d, and becomes None if
        that fails.
    """
    if not isinstance(payload, dict):
        return None, None

    def _nested(outer: str, inner: str) -> Any:
        obj = payload.get(outer)
        return obj.get(inner) if isinstance(obj, dict) else None

    session_id = (
        payload.get("sessionId")
        or payload.get("session_id")
        or _nested("session", "id")
    )
    transcript = (
        payload.get("transcriptPath")
        or payload.get("transcript_path")
        or _nested("transcript", "path")
    )
    transcript_path: Optional[Path] = None
    if transcript:
        try:
            transcript_path = Path(transcript).expanduser().resolve()
        except Exception:
            transcript_path = None
    return session_id, transcript_path


# ---------------- Transcript helpers ----------------
def parse_ts(s: Any) -> Optional[datetime]:
    """Parse an ISO-8601 timestamp string into a timezone-aware datetime.

    A trailing ``Z``/``z`` is accepted as UTC. Strings without an offset are
    treated as UTC, the same assumption ``_ns`` makes, so every returned
    value is aware and naive/aware comparisons cannot raise TypeError.

    Returns None for non-strings, empty strings, and unparseable input.
    """
    if not isinstance(s, str) or not s:
        return None
    text = s.strip()
    if text.endswith(("Z", "z")):
        text = text[:-1] + "+00:00"
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def get_row_ts(row: Dict[str, Any]) -> Optional[datetime]:
    """Return the parsed ``timestamp`` field of a transcript row, or None.

    A JSONL line can decode to any JSON value. A non-dict row therefore
    yields None instead of raising AttributeError.
    """
    if not isinstance(row, dict):
        return None
    return parse_ts(row.get("timestamp"))


def get_content(msg: Dict[str, Any]) -> Any:
    """Return a row's ``content``.

    When ``message`` is a dict, its ``content`` is returned; otherwise the
    row's own top-level ``content`` is used. Non-dict input yields None.
    """
    if not isinstance(msg, dict):
        return None
    inner = msg.get("message")
    holder = inner if isinstance(inner, dict) else msg
    return holder.get("content")


def get_role(msg: Dict[str, Any]) -> Optional[str]:
    """Return ``"user"`` or ``"assistant"`` for a transcript row, else None.

    The row's top-level ``type`` takes precedence; otherwise ``message.role``
    is consulted. Non-dict input yields None instead of raising.
    """
    if not isinstance(msg, dict):
        return None
    roles = ("user", "assistant")
    row_type = msg.get("type")
    if row_type in roles:
        return row_type
    inner = msg.get("message")
    if isinstance(inner, dict):
        role = inner.get("role")
        if role in roles:
            return role
    return None


def is_tool_result(msg: Dict[str, Any]) -> bool:
    """True iff ``msg`` is a user-role row whose content list holds at least
    one ``tool_result`` block."""
    return get_role(msg) == "user" and bool(iter_tool_results(get_content(msg)))


def iter_tool_results(content: Any) -> List[Dict[str, Any]]:
    """Return the ``tool_result`` dict blocks of a content list.

    Returns [] for anything that is not a list.
    """
    if not isinstance(content, list):
        return []
    return list(
        filter(
            lambda block: isinstance(block, dict) and block.get("type") == "tool_result",
            content,
        )
    )


def iter_tool_uses(content: Any) -> List[Dict[str, Any]]:
    """Return the ``tool_use`` dict blocks of a content list.

    Returns [] for anything that is not a list.
    """
    if not isinstance(content, list):
        return []
    found: List[Dict[str, Any]] = []
    for block in content:
        if isinstance(block, dict) and block.get("type") == "tool_use":
            found.append(block)
    return found


def extract_text(content: Any) -> str:
    """Flatten message content to plain text.

    A string is returned as-is. For a list, bare strings and the ``text`` of
    ``{"type": "text"}`` blocks are joined with newlines, skipping empty
    pieces. Anything else yields "".
    """
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return ""
    pieces: List[str] = []
    for item in content:
        if isinstance(item, str):
            piece = item
        elif isinstance(item, dict) and item.get("type") == "text":
            piece = item.get("text", "")
        else:
            continue
        if piece:
            pieces.append(piece)
    return "\n".join(pieces)


def truncate_text(s: Any, max_chars: Optional[int] = None) -> Tuple[str, Dict[str, Any]]:
    """Cap ``s`` to ``max_chars`` characters and describe what was kept.

    Args:
        s: text to cap. None becomes "". Non-strings are JSON-encoded first
            (``ensure_ascii=False``, ``default=str``).
        max_chars: character limit. Defaults to MAX_CHARS, read at call time.
            Negative values are treated as 0.

    Returns:
        ``(text, meta)``. ``meta`` always has ``truncated`` and
        ``orig_len``. When truncated, it also carries ``kept_len`` and the
        ``sha256`` of the full original text.
    """
    limit = MAX_CHARS if max_chars is None else max_chars
    limit = max(0, int(limit))
    if s is None:
        return "", {"truncated": False, "orig_len": 0}
    if not isinstance(s, str):
        s = json.dumps(s, ensure_ascii=False, default=str)
    orig_len = len(s)
    if orig_len <= limit:
        return s, {"truncated": False, "orig_len": orig_len}
    head = s[:limit]
    return head, {
        "truncated": True,
        "orig_len": orig_len,
        "kept_len": len(head),
        # surrogatepass: json.loads yields lone surrogates for "\udXXX"
        # escapes, and plain UTF-8 encoding would raise on them. Valid
        # strings hash exactly as before.
        "sha256": hashlib.sha256(s.encode("utf-8", errors="surrogatepass")).hexdigest(),
    }


def get_model(msg: Dict[str, Any]) -> str:
    """Return ``message.model`` of a row, falling back to ``"claude"``."""
    inner = msg.get("message")
    model = inner.get("model") if isinstance(inner, dict) else None
    return model or "claude"


def get_message_id(msg: Dict[str, Any]) -> Optional[str]:
    """Return ``message.id`` when it is a non-empty string, else None."""
    inner = msg.get("message")
    candidate = inner.get("id") if isinstance(inner, dict) else None
    if isinstance(candidate, str) and candidate:
        return candidate
    return None


def get_msg_field(row: Dict[str, Any], key: str, default: Any = None) -> Any:
    """Return ``row["message"][key]``, or ``default`` when either level is
    missing or not a dict."""
    if not isinstance(row, dict):
        return default
    inner = row.get("message")
    return inner.get(key, default) if isinstance(inner, dict) else default


def get_usage(row: Dict[str, Any]) -> Dict[str, Any]:
    """Return the Anthropic ``message.usage`` block of a row, or {} when it
    is absent or not a dict."""
    inner = row.get("message") if isinstance(row, dict) else None
    usage = inner.get("usage") if isinstance(inner, dict) else None
    if isinstance(usage, dict):
        return usage
    return {}


def extract_usage_details(row: Dict[str, Any]) -> Dict[str, int]:
    """Return the four canonical Anthropic token counters present on ``row``.

    Keys keep Anthropic's native names (input_tokens, output_tokens,
    cache_read_input_tokens, cache_creation_input_tokens) so Langfuse model
    price configs match them. Only integer values are kept.

    Deliberately excluded: the ephemeral 1h/5m cache-creation breakdown,
    which is already summed into cache_creation_input_tokens, and
    server_tool_use, which is a per-request count, not tokens. Including
    them would double-count when Litefuse sums ``total`` over all
    usage_details entries (observed: totalTokens=545549 vs a real ~281K).
    """
    usage = get_usage(row)
    counters = (
        "input_tokens",
        "output_tokens",
        "cache_read_input_tokens",
        "cache_creation_input_tokens",
    )
    return {name: usage[name] for name in counters if isinstance(usage.get(name), int)}


def is_api_error(row: Dict[str, Any]) -> bool:
    """True iff ``row`` is a dict with a truthy ``isApiErrorMessage``,
    ``apiErrorStatus`` or ``error`` field."""
    if isinstance(row, dict):
        return any(bool(row.get(flag)) for flag in ("isApiErrorMessage", "apiErrorStatus", "error"))
    return False


def extract_tool_use_result_summary(tur: Any) -> Dict[str, Any]:
    """Summarise a transcript ``toolUseResult`` dict as a flat dict of small values.

    The shape of ``toolUseResult`` varies by tool. Selected scalar keys are
    copied verbatim. ``command`` is capped at 300 characters. ``stdout`` and
    ``stderr`` are reduced to their lengths, plus ``has_stderr`` when stderr
    is non-empty. The raw payload is deliberately not duplicated here.
    Non-dict input yields {}.
    """
    if not isinstance(tur, dict):
        return {}
    summary: Dict[str, Any] = {}

    def _copy(keys: Tuple[str, ...]) -> None:
        summary.update((k, tur[k]) for k in keys if k in tur)

    # Bash-style execution flags.
    _copy(("interrupted", "isImage", "noOutputExpected"))
    command = tur.get("command")
    if isinstance(command, str):
        summary["command"] = command[:300]
    stdout = tur.get("stdout")
    if isinstance(stdout, str):
        summary["stdout_len"] = len(stdout)
    stderr = tur.get("stderr")
    if isinstance(stderr, str):
        summary["stderr_len"] = len(stderr)
        if stderr:
            summary["has_stderr"] = True
    # HTTP fetch, pagination, subagent, background-task and item-count
    # fields, in that order.
    _copy((
        "code", "codeText", "bytes",
        "appliedLimit", "appliedOffset",
        "agentId", "agentType",
        "backgroundTaskId", "assistantAutoBackgrounded",
        "itemCount",
    ))
    return summary


def summarize_user_content(content: Any) -> Dict[str, Any]:
    """Count text and image blocks in a user content list.

    Image media types are listed without copying any base64 data. Returns
    {} for non-lists and for lists with no text or image blocks.
    ``image_media_types`` is None when no image declared a string media
    type.
    """
    if not isinstance(content, list):
        return {}
    dict_blocks = [b for b in content if isinstance(b, dict)]
    n_text = sum(1 for b in dict_blocks if b.get("type") == "text")
    images = [b for b in dict_blocks if b.get("type") == "image"]
    if n_text == 0 and not images:
        return {}
    media: List[str] = []
    for img in images:
        media_type = (img.get("source") or {}).get("media_type")
        if isinstance(media_type, str):
            media.append(media_type)
    return {
        "text_blocks": n_text,
        "image_blocks": len(images),
        "image_media_types": media or None,
    }


def normalize_tool_result_content(content: Any) -> Any:
    """Make ``tool_result.content`` safe to store on a span.

    A string is capped with truncate_text(). In a block list:
      - text blocks keep their capped text;
      - image blocks keep ``source.type`` and ``media_type``, with the base64
        payload replaced by its length (``data_len_b64``);
      - other dict blocks shrink to ``{"type": ...}``, or are kept whole
        when they have no type;
      - non-dict items pass through.
    Anything else is returned unchanged.
    """
    if isinstance(content, str):
        return truncate_text(content)[0]
    if not isinstance(content, list):
        return content

    def _normalize_block(block: Any) -> Any:
        if not isinstance(block, dict):
            return block
        kind = block.get("type")
        if kind == "text":
            return {"type": "text", "text": truncate_text(block.get("text", ""))[0]}
        if kind == "image":
            source = block.get("source") or {}
            payload = source.get("data", "")
            return {
                "type": "image",
                "source": {
                    "type": source.get("type"),
                    "media_type": source.get("media_type"),
                    "data_len_b64": len(payload) if isinstance(payload, str) else None,
                },
            }
        return {"type": kind} if kind else block

    return [_normalize_block(b) for b in content]


# ---------------- Incremental reader ----------------
@dataclass
class SessionState:
    """Per-(session, transcript) read cursor persisted between hook runs."""

    # Byte offset into the transcript JSONL that the next read starts from.
    offset: int = field(default=0)
    # Decoded text carried between reads (an incomplete trailing row) and
    # prepended to the next chunk by read_new_jsonl.
    buffer: str = field(default="")
    # Turn counter kept for the session; presumably maintained by the caller
    # when it emits turns (not updated in this section).
    turn_count: int = field(default=0)
    # UUIDs of user messages whose turn has already been emitted as a
    # Litefuse trace. The Stop hook can fire while a turn is still in
    # progress (e.g. between tool_use and tool_result) and then not fire
    # again, stranding the remaining rows past the byte offset. To recover,
    # the same byte range is re-read on the next firing, and this list is
    # used to skip turns that were already emitted. It grows by one short
    # UUID per turn and is not pruned here.
    emitted_user_uuids: List[str] = field(default_factory=list)


def load_session_state(global_state: Dict[str, Any], key: str) -> SessionState:
    """Build a SessionState from ``global_state[key]``, tolerating bad data.

    The state file is external input: hand-edited, written by an older
    version, or partially corrupted. Therefore:
      - a missing or non-dict entry yields a fresh SessionState;
      - ``offset``/``turn_count`` that are not int-convertible become 0, and
        negative values are clamped to 0 (a negative offset would make
        every later ``seek`` fail);
      - a non-string ``buffer`` becomes "" (``str(None)`` would inject the
        literal text "None" in front of the next row);
      - only string entries of ``emitted_user_uuids`` are kept.
    """
    raw = global_state.get(key) if isinstance(global_state, dict) else None
    entry: Dict[str, Any] = raw if isinstance(raw, dict) else {}

    def _non_negative_int(name: str) -> int:
        try:
            return max(0, int(entry.get(name, 0)))
        except (TypeError, ValueError, OverflowError):
            return 0

    raw_emitted = entry.get("emitted_user_uuids")
    emitted = (
        [u for u in raw_emitted if isinstance(u, str)]
        if isinstance(raw_emitted, list)
        else []
    )
    buffer = entry.get("buffer", "")
    return SessionState(
        offset=_non_negative_int("offset"),
        buffer=buffer if isinstance(buffer, str) else "",
        turn_count=_non_negative_int("turn_count"),
        emitted_user_uuids=emitted,
    )


def write_session_state(global_state: Dict[str, Any], key: str, ss: SessionState) -> None:
    """Store ``ss`` into ``global_state[key]`` together with an ``updated``
    UTC ISO-8601 timestamp. The previous entry is replaced wholesale."""
    record: Dict[str, Any] = {
        name: getattr(ss, name)
        for name in ("offset", "buffer", "turn_count", "emitted_user_uuids")
    }
    record["updated"] = datetime.now(timezone.utc).isoformat()
    global_state[key] = record


def _is_turn_complete(turn: "Turn") -> bool:
    """Return True iff the turn's last assistant row contains a ``text``
    block — Claude Code's signature "Final response" marker.

    Every well-formed Claude Code turn ends with a text response: the
    model emits one final `text` content block summarizing what it did.
    Rows in the middle of a turn are tool_use, thinking, or
    intermediate text — but the LAST row is always text.

    A weaker heuristic — "all tool_use blocks have matching
    tool_results" — was incorrect: tool_results arriving doesn't mean
    the agent is done responding, just that the current tool finished.
    The agent might be in the middle of a tool loop and about to
    dispatch another tool_use. That heuristic caused Turn 28 to emit
    a partial trace ending at `Tool call: Bash (#15)`, missing all
    the subsequent assistant rows.

    Used to defer in-progress turns by ``main()``: if the Stop hook
    fires while the agent is still working (e.g., between tool
    dispatch boundaries), the turn isn't complete yet. We rewind the
    offset so the next Stop firing re-reads the same bytes and
    re-evaluates — by then the closing text row has usually landed.
    """
    if not turn.assistant_rows:
        return False
    last = turn.assistant_rows[-1]
    content = last.msg.get("message", {}).get("content")
    if not isinstance(content, list):
        # Plain-string content is rare on assistant rows; treat as
        # complete since we have nothing better to check.
        return True
    block_types = [
        b.get("type") for b in content if isinstance(b, dict)
    ]
    return "text" in block_types


def read_new_jsonl(transcript_path: Path, ss: "SessionState") -> Tuple[List[Dict[str, Any]], "SessionState"]:
    """Read the complete JSONL rows appended to ``transcript_path`` since ``ss.offset``.

    Only bytes up to and including the last newline are consumed. A
    trailing partial row that is still being written stays on disk and is
    re-read next time. Splitting in bytes, before decoding, matters: a
    multi-byte UTF-8 character split across two invocations would otherwise
    be decoded as U+FFFD and stay corrupted after the offset advanced past
    it.

    ``ss.buffer`` (text left over by earlier versions of this reader) is
    still prepended to the next chunk, then cleared once a complete row has
    been consumed.

    If ``ss.offset`` is negative or past the end of the file (file truncated
    or replaced), reading restarts at 0 instead of returning nothing forever.

    Returns:
        ``(rows, ss)``. ``rows`` holds only JSON objects; blank lines,
        unparseable lines and non-object JSON values are skipped. ``ss`` is
        updated in place and also returned. A missing file or a read error
        returns ``([], ss)``.
    """
    if not transcript_path.exists():
        return [], ss
    try:
        with open(transcript_path, "rb") as f:
            size = os.fstat(f.fileno()).st_size
            if ss.offset < 0 or ss.offset > size:
                ss.offset = 0
                ss.buffer = ""
            f.seek(ss.offset)
            chunk = f.read()
    except Exception as e:
        debug(f"read_new_jsonl failed: {e}")
        return [], ss

    cut = chunk.rfind(b"\n")
    if cut < 0:
        # No complete row yet (or nothing new at all).
        return [], ss
    complete = chunk[:cut + 1]
    text = ss.buffer + complete.decode("utf-8", errors="replace")
    ss.buffer = ""
    ss.offset += len(complete)

    rows: List[Dict[str, Any]] = []
    for line in text.split("\n"):
        line = line.strip()
        if not line:
            continue
        try:
            obj = json.loads(line)
        except (ValueError, RecursionError):
            continue
        if isinstance(obj, dict):
            rows.append(obj)
    return rows, ss


# ---------------- Turn assembly (timestamp-aware) ----------------
@dataclass
class AssistantRow:
    """One assistant-role JSONL row.

    Claude Code splits a single Anthropic API response across several rows,
    one content block per row (thinking, text, tool_use, ...). Each row
    becomes its own generation observation; rows are not merged by
    message.id.
    """

    # The raw transcript row.
    msg: Dict[str, Any]
    # Parsed row timestamp, if any.
    ts: Optional[datetime] = field(default=None)
    # True iff this is the LAST row carrying its message.id. Every row of a
    # message repeats the same ``usage`` block, so usage is attached only to
    # this row to avoid N-way double-counting of the trace's totalTokens.
    is_last_in_message: bool = field(default=False)


@dataclass
class ToolResultEntry:
    """One ``tool_result`` block (indexed elsewhere by its tool_use_id)."""

    # Raw ``content`` of the tool_result block.
    content: Any
    # Timestamp of the user row that carried the result.
    timestamp: Optional[datetime] = field(default=None)
    # The block's ``is_error`` flag.
    is_error: bool = field(default=False)
    # The full user row that contained this tool_result block.
    source_row: Optional[Dict[str, Any]] = field(default=None)


@dataclass
class Turn:
    """One user prompt plus everything recorded until the next user prompt."""

    # The user row that opened the turn and its parsed timestamp.
    user_msg: Dict[str, Any]
    user_ts: Optional[datetime]
    # Assistant rows in transcript order.
    assistant_rows: List[AssistantRow] = field(default_factory=list)
    # tool_result entries keyed by tool_use_id.
    tool_results_by_id: Dict[str, ToolResultEntry] = field(default_factory=dict)
    # System rows (API errors, retries, hook errors, throttling) seen while
    # this turn was being assembled. They fit neither the user/assistant
    # pair nor tool results, but are useful when debugging.
    system_rows: List[Dict[str, Any]] = field(default_factory=list)


def build_turns(rows: List[Dict[str, Any]]) -> List[Turn]:
    """Group transcript rows into turns.

    Each turn is one user prompt plus the assistant rows, tool results and
    system rows that follow it. The rules:
      * rows that are not JSON objects are skipped (a JSONL line can decode
        to any JSON value, and every helper below calls ``.get``);
      * rows whose ``uuid`` was already seen in this batch are skipped;
      * ``system`` rows are collected for the turn being assembled, except
        ``stop_hook_summary`` rows, which are dropped;
      * user rows carrying ``tool_result`` blocks are indexed by
        ``tool_use_id`` and do not start a new turn;
      * any other user row closes the current turn and opens a new one;
      * assistant rows seen before the first user row are ignored;
      * a turn is emitted only if it has at least one assistant row.

    Returns the turns in transcript order.
    """
    turns: List[Turn] = []
    current_user: Optional[Dict[str, Any]] = None
    current_user_ts: Optional[datetime] = None
    assistant_rows: List[AssistantRow] = []
    tool_results: Dict[str, ToolResultEntry] = {}
    system_rows: List[Dict[str, Any]] = []
    seen_row_uuids: Set[str] = set()

    def flush() -> bool:
        """Emit the in-progress turn. Returns True iff a turn was emitted."""
        nonlocal current_user, current_user_ts, assistant_rows, tool_results, system_rows
        if current_user is None or not assistant_rows:
            return False
        # Mark the LAST assistant row of each message.id as the carrier of
        # usage_details. Every row of a message has the same full `usage`
        # block, so attaching usage to all of them would N-way-inflate
        # totals. Rows without a message.id (rare fallback) are each marked
        # true.
        last_by_mid: Dict[str, AssistantRow] = {}
        for ar in assistant_rows:
            mid = get_message_id(ar.msg)
            if not mid:
                ar.is_last_in_message = True
                continue
            if mid in last_by_mid:
                last_by_mid[mid].is_last_in_message = False
            last_by_mid[mid] = ar
            ar.is_last_in_message = True
        turns.append(Turn(
            user_msg=current_user,
            user_ts=current_user_ts,
            assistant_rows=list(assistant_rows),
            tool_results_by_id=dict(tool_results),
            system_rows=list(system_rows),
        ))
        return True

    for row in rows:
        # Non-object JSON lines would crash get_row_ts/get_role below.
        if not isinstance(row, dict):
            continue

        # ---- Global dedup by row uuid ----
        # The transcript sometimes rewrites the same row verbatim (session
        # resume, replay). Skip exact duplicates so they don't double-emit.
        ru = row.get("uuid")
        if isinstance(ru, str):
            if ru in seen_row_uuids:
                continue
            seen_row_uuids.add(ru)

        ts = get_row_ts(row)
        rt = row.get("type")

        # Capture system rows (API errors, retries, hook errors, throttling)
        # and attach them to whatever turn is currently being assembled.
        # Skip `stop_hook_summary`: it only records the Stop hook's own
        # execution (including this script's stdout/stderr) and adds noise
        # to every trace without any actionable signal.
        if rt == "system":
            if row.get("subtype") == "stop_hook_summary":
                continue
            system_rows.append(row)
            continue

        if is_tool_result(row):
            for tr in iter_tool_results(get_content(row)):
                tid = tr.get("tool_use_id")
                if tid:
                    tool_results[str(tid)] = ToolResultEntry(
                        content=tr.get("content"),
                        timestamp=ts,
                        is_error=bool(tr.get("is_error")),
                        source_row=row,
                    )
            continue

        role = get_role(row)
        if role == "user":
            flushed = flush()
            current_user = row
            current_user_ts = ts
            assistant_rows = []
            tool_results = {}
            # Only clear system_rows if a turn was actually emitted. If
            # flush() emitted nothing — no user row seen yet in this batch,
            # or the previous user row got no assistant reply — the system
            # rows collected so far (e.g. API errors/retries at the head of
            # this batch, before its first user row) are carried into the
            # new turn instead of being dropped. stop_hook_summary rows
            # never get here; they are skipped above.
            if flushed:
                system_rows = []
            continue

        if role == "assistant":
            if current_user is None:
                continue
            # Each JSONL row becomes its own AssistantRow and its own
            # observation. Rows are not merged by message.id, for row-level
            # fidelity.
            assistant_rows.append(AssistantRow(msg=row, ts=ts))
            continue

    flush()
    return turns


# ---------------- Langfuse emit (OTel layer, explicit timestamps) ----------------
def _ns(ts: Optional[datetime]) -> Optional[int]:
    """datetime -> OTel-style nanoseconds since epoch."""
    if ts is None:
        return None
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)
    return int(ts.timestamp() * 1_000_000_000)


def _json_dump(value: Any) -> str:
    try:
        return json.dumps(value, ensure_ascii=False, default=str)
    except Exception:
        return str(value)


def _set_attr(span, key: str, value: Any, *, json_encode: bool = False) -> None:
    if value is None:
        return
    try:
        if json_encode and not isinstance(value, str):
            span.set_attribute(key, _json_dump(value))
        elif isinstance(value, (str, int, float, bool)):
            span.set_attribute(key, value)
        elif isinstance(value, (list, tuple)) and all(isinstance(x, str) for x in value):
            span.set_attribute(key, list(value))
        else:
            span.set_attribute(key, _json_dump(value))
    except Exception as e:
        debug(f"set_attribute {key} failed: {e}")


def _flatten_metadata(span, meta: Dict[str, Any], prefix: str = LFA.OBSERVATION_METADATA) -> None:
    """Write ``meta`` onto ``span`` as one attribute per top-level key.

    Key ``k`` becomes attribute ``f"{prefix}.{k}"``. ``str`` and ``int``
    values (bools included, since they are ints) are stored as-is. Every
    other value, nested dicts included, is JSON-encoded into that single
    attribute rather than recursively flattened. ``None`` values are
    skipped. A non-dict ``meta`` is JSON-encoded under ``prefix`` itself.

    Why only one level: Litefuse's UI parses each metadata attribute value
    at display time. A JSON-encoded dict such as
    ``metadata.claude_code = '{"uuid":"abc","turn_number":26}'`` renders as
    a collapsible tree. Dot-flattened keys (``metadata.claude_code.uuid``,
    ...) would instead render as flat sibling rows with the prefix repeated,
    defeating the ``claude_code`` namespace.

    This mirrors ``_flatten_and_serialize_metadata`` in
    ``langfuse-python==4.5.x``. Later v4 releases recurse, which produces the
    undesirable flat sibling rows.
    """
    if isinstance(meta, dict):
        for name, val in meta.items():
            if val is None:
                continue
            _set_attr(span, f"{prefix}.{name}", val, json_encode=not isinstance(val, (str, int)))
    else:
        _set_attr(span, prefix, meta, json_encode=True)


def _inline(target: Dict[str, Any], prefix: str, value: Any) -> None:
    """Flatten a nested dict into the parent metadata under prefixed keys.

    Example: _inline(md, "user_text", {truncated: False, orig_len: 42})
    adds md["user_text_truncated"] = False, md["user_text_orig_len"] = 42.
    """
    if not isinstance(value, dict):
        return
    for k, v in value.items():
        if v is None:
            continue
        target[f"{prefix}_{k}"] = v


def _strip_none(d: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of ``d`` with ``None`` values dropped. Use on the
    ``cc_meta`` dicts before they go into ``{"claude_code": ...}`` so
    fields that genuinely weren't in the source JSONL (e.g.
    ``apiErrorStatus`` on a normal assistant row) don't materialize as
    ``null`` placeholders in the Litefuse metadata pane.
    """
    return {k: v for k, v in d.items() if v is not None}


def _build_message_inputs(rows: List[AssistantRow],
                          tool_results: Dict[str, ToolResultEntry],
                          user_text: str) -> Dict[str, Any]:
    """Compute, for each Anthropic message.id in ``rows``, the input that
    drove the LLM call producing that message.

    All rows of one message share the same input, since they are parts of one
    API call. Message #1's input is the user's prompt. Message #N's input
    (N>1) is the tool_result content answering message #(N-1)'s tool_use
    blocks, or a placeholder when there is none. Rows without a message.id
    are ignored.

    Tool-result content goes through normalize_tool_result_content() before
    truncate_text(), so embedded images appear as a size placeholder rather
    than up to MAX_CHARS of base64.
    """
    # Group rows by message.id once, in first-seen order. Rescanning every
    # row per message and testing membership in a list was quadratic.
    message_order: List[str] = []
    rows_by_mid: Dict[str, List[AssistantRow]] = {}
    for ar in rows:
        mid = get_message_id(ar.msg)
        if not mid:
            continue
        if mid not in rows_by_mid:
            rows_by_mid[mid] = []
            message_order.append(mid)
        rows_by_mid[mid].append(ar)

    inputs: Dict[str, Any] = {}
    for idx, mid in enumerate(message_order):
        if idx == 0:
            inputs[mid] = {"role": "user", "content": user_text}
            continue
        items: List[Dict[str, Any]] = []
        for prev_ar in rows_by_mid[message_order[idx - 1]]:
            for tu in iter_tool_uses(get_content(prev_ar.msg)):
                tid = str(tu.get("id") or "")
                tr = tool_results.get(tid)
                if tr is None:
                    continue
                out_text, _ = truncate_text(normalize_tool_result_content(tr.content))
                items.append({"type": "tool_result", "tool_use_id": tid, "content": out_text})
        inputs[mid] = (
            {"role": "user", "content": items} if items
            else {"role": "user", "content": "(no tool results from previous message)"}
        )
    return inputs


def normalize_assistant_content(content: Any) -> Any:
    """Mirror an Anthropic assistant content array for use as generation output.

    Keeps text, tool_use and thinking blocks, so the output is not empty when
    the model emitted only tool_use blocks without a text preamble. Every
    string is capped via truncate_text(). That includes the string leaves of
    a tool_use ``input`` object: previously only a bare-string input was
    capped, so a dict input holding a very long string passed through
    unbounded. Thinking blocks keep only their text. Other dict blocks shrink
    to ``{"type": ...}``, or are kept whole when they have no type. Non-dict
    items pass through.

    A string is capped and returned. Any other non-list value is returned
    unchanged.
    """
    if isinstance(content, str):
        text, _ = truncate_text(content)
        return text
    if not isinstance(content, list):
        return content

    def _clip(value: Any) -> Any:
        # Recursively cap string leaves. Containers are rebuilt, so the
        # transcript row's own objects are not aliased into the output.
        if isinstance(value, str):
            return truncate_text(value)[0]
        if isinstance(value, dict):
            return {k: _clip(v) for k, v in value.items()}
        if isinstance(value, list):
            return [_clip(v) for v in value]
        return value

    out: List[Any] = []
    for block in content:
        if not isinstance(block, dict):
            out.append(block)
            continue
        btype = block.get("type")
        if btype == "text":
            text, _ = truncate_text(block.get("text", ""))
            out.append({"type": "text", "text": text})
        elif btype == "tool_use":
            out.append({
                "type": "tool_use",
                "id": block.get("id"),
                "name": block.get("name"),
                "input": _clip(block.get("input")),
            })
        elif btype == "thinking":
            thinking_text, _ = truncate_text(block.get("thinking", ""))
            out.append({"type": "thinking", "thinking": thinking_text})
        else:
            out.append({"type": btype} if btype else block)
    return out


def _end_span(span, ts: Optional[datetime]) -> None:
    """End ``span``, at ``ts`` when given, otherwise without an explicit
    end_time. Errors are only debug-logged."""
    try:
        end_ns = _ns(ts)
        if end_ns is None:
            span.end()
        else:
            span.end(end_time=end_ns)
    except Exception as e:
        debug(f"span.end failed: {e}")


def emit_turn(langfuse: Langfuse,
              session_id: str,
              turn_num: int,
              turn: Turn,
              transcript_path: Path) -> None:
    """Emit one Claude Code turn as a Langfuse trace through the OTel tracer.

    The root span is named ``Claude Code - Turn {turn_num}``. It carries the
    trace-level attributes: session id, user id, tags, release,
    input/output and ``claude_code.*`` metadata. Beneath it, in order:

      * a ``user message`` event at the user row's timestamp (if known),
      * one event per system row (API errors, retries, hook results, ...),
      * one generation per assistant row, each followed by one ``tool``
        span per ``tool_use`` block in that row.

    All start/end times come from JSONL row timestamps, not the wall clock
    at hook time. The root span is ended in a ``finally`` block, so it is
    exported even when building an attribute or child observation raises.
    The exception itself still propagates to the caller.

    Args:
        langfuse: Initialised client. Only its private ``_otel_tracer``
            attribute is used; nothing is emitted if it is missing.
        session_id: Claude Code session id, used as the trace session id.
        turn_num: Turn number embedded in the trace/span names.
        turn: Parsed turn (user row, assistant rows, tool results, system
            rows). Nothing is emitted if it has no assistant rows.
        transcript_path: Transcript file, recorded in trace metadata.
    """
    user_msg: Dict[str, Any] = turn.user_msg or {}
    user_text_raw = extract_text(get_content(turn.user_msg))
    user_text, user_text_meta = truncate_text(user_text_raw)

    rows = turn.assistant_rows
    if not rows:
        return

    # Final text = the first text block of the LAST text-bearing row in the
    # turn (what the user sees as the assistant's natural-language answer).
    final_text_raw = ""
    for ar in reversed(rows):
        c = get_content(ar.msg)
        if isinstance(c, list):
            for b in c:
                if isinstance(b, dict) and b.get("type") == "text":
                    final_text_raw = b.get("text", "")
                    break
            if final_text_raw:
                break
    final_text, _ = truncate_text(final_text_raw)

    # ---- Turn timing ----
    # The root span runs from the user row to the latest of the last
    # assistant row and any tool result.
    turn_start = turn.user_ts or rows[0].ts
    turn_end = rows[-1].ts
    for tr in turn.tool_results_by_id.values():
        if tr.timestamp and (turn_end is None or tr.timestamp > turn_end):
            turn_end = tr.timestamp

    tracer = getattr(langfuse, "_otel_tracer", None)
    if tracer is None:
        debug("langfuse._otel_tracer unavailable; skipping emit")
        return

    # ---- Environmental context (cwd, git branch, version, etc.) ----
    # These fields are duplicated on every transcript row; sample from the
    # user row, falling back to the first assistant row (`rows` is non-empty
    # at this point).
    ctx_row = turn.user_msg or rows[0].msg or {}
    cwd = ctx_row.get("cwd")
    git_branch = ctx_row.get("gitBranch")
    cc_version = ctx_row.get("version")
    entrypoint = ctx_row.get("entrypoint")
    user_type = ctx_row.get("userType")
    is_sidechain = bool(ctx_row.get("isSidechain"))
    permission_mode = ctx_row.get("permissionMode")
    user_prompt_id = user_msg.get("promptId")

    # Aggregate token usage at trace level. Each row of a multi-row message
    # has the same `usage` block, so we sum only `is_last_in_message` rows
    # to avoid double-counting.
    turn_usage: Dict[str, int] = {}
    for ar in rows:
        if not ar.is_last_in_message:
            continue
        for k, v in extract_usage_details(ar.msg).items():
            turn_usage[k] = turn_usage.get(k, 0) + v

    distinct_models = sorted({m for m in (get_model(ar.msg) for ar in rows) if m})

    # Per-message inputs (shared across rows of the same message).
    message_inputs = _build_message_inputs(rows, turn.tool_results_by_id, user_text)

    trace_tags = ["claude-code", "litefuse"]
    if is_sidechain:
        trace_tags.append("sidechain")
    if distinct_models:
        trace_tags.extend([f"model:{m}" for m in distinct_models])

    # Trace-level identity for filtering/grouping in the UI.
    # userId: env override > unix login > hostname > fallback.
    user_id = (os.environ.get("LITEFUSE_USER_ID")
               or os.environ.get("CC_LANGFUSE_USER_ID"))
    if not user_id:
        try:
            user_id = getpass.getuser()
        except Exception:
            try:
                user_id = socket.gethostname()
            except Exception:
                user_id = "claude-code"

    # ---- Turn (root) span ----
    turn_span = tracer.start_span(
        name=f"Claude Code - Turn {turn_num}",
        start_time=_ns(turn_start),
    )
    # Everything after start_span runs under try/finally, so the root span
    # is always ended (and therefore exported). Otherwise an exception while
    # populating its attributes would leak it unended.
    try:
        _set_attr(turn_span, LFA.AS_ROOT, True)
        _set_attr(turn_span, LFA.OBSERVATION_TYPE, "span")
        _set_attr(turn_span, LFA.TRACE_NAME, f"Claude Code - Turn {turn_num}")
        _set_attr(turn_span, LFA.TRACE_SESSION_ID, session_id)
        _set_attr(turn_span, LFA.TRACE_USER_ID, user_id)
        _set_attr(turn_span, LFA.TRACE_TAGS, trace_tags)
        if cc_version:
            _set_attr(turn_span, LFA.RELEASE, str(cc_version))
        # Note: trace.externalId is a first-class Litefuse column but v3 SDK
        # has no OTel attribute for it. Setting it requires generating the
        # trace_id deterministically from a seed via Langfuse.create_trace_id(),
        # which is a bigger refactor. The user_row_uuid in trace metadata serves
        # the same cross-reference need for now.
        _set_attr(turn_span, LFA.TRACE_INPUT, {"role": "user", "content": user_text}, json_encode=True)
        _set_attr(turn_span, LFA.OBSERVATION_INPUT, {"role": "user", "content": user_text}, json_encode=True)
        # User message content composition (text vs image blocks)
        user_content_summary = summarize_user_content(get_content(user_msg))

        # All Claude-Code-specific fields go under a single ``claude_code``
        # namespace so the trace's flat metadata stays organized: Litefuse-
        # standard fields stay at the top level and everything Claude-Code-
        # specific (JSONL row UUIDs, prompt id, git branch, permission mode,
        # truncation summaries, etc.) hangs off ``claude_code.*``.
        cc_meta: Dict[str, Any] = {
            # Derived
            "source": "claude-code",
            "turn_number": turn_num,
            "transcript_path": str(transcript_path),
            "models_used": distinct_models,
            # Verbatim JSONL fields from the user row (camelCase preserved)
            "sessionId": session_id,
            "uuid": user_msg.get("uuid"),
            "parentUuid": user_msg.get("parentUuid"),
            "promptId": user_prompt_id,
            "isMeta": user_msg.get("isMeta"),
            "origin": user_msg.get("origin"),
            "cwd": cwd,
            "gitBranch": git_branch,
            "version": cc_version,
            "entrypoint": entrypoint,
            "userType": user_type,
            "isSidechain": is_sidechain,
            "permissionMode": permission_mode,
        }
        # Flatten the nested truncation/summary dicts into prefixed scalar
        # keys (instead of one JSON-encoded dict per attribute).
        _inline(cc_meta, "user_text", user_text_meta)
        _inline(cc_meta, "user_content", user_content_summary)
        _flatten_metadata(turn_span, {"claude_code": _strip_none(cc_meta)})

        with otel_trace_api.use_span(turn_span, end_on_exit=False):
            # ---- "user message" event ----
            # Opening child observation that mirrors the closing
            # "Final response" generation. Carries the user's prompt as
            # input. Type=event so Litefuse renders it as a point-in-time
            # marker at the top of the timeline rather than a duration bar.
            # The turn_span already carries every TRACE_* attribute, so this
            # event is purely structural — no AS_ROOT, no trace-level
            # attributes, just observation-level input + metadata.
            if turn.user_ts is not None:
                um_ev = tracer.start_span(
                    name="user message",
                    start_time=_ns(turn.user_ts),
                )
                _set_attr(um_ev, LFA.OBSERVATION_TYPE, "event")
                _set_attr(
                    um_ev, LFA.OBSERVATION_INPUT,
                    {"role": "user", "content": user_text}, json_encode=True,
                )
                um_meta_inner: Dict[str, Any] = {
                    "uuid": user_msg.get("uuid"),
                    "parentUuid": user_msg.get("parentUuid"),
                    "promptId": user_prompt_id,
                }
                _inline(um_meta_inner, "user_content", user_content_summary)
                _inline(um_meta_inner, "user_text", user_text_meta)
                _flatten_metadata(um_ev, {"claude_code": _strip_none(um_meta_inner)})
                _end_span(um_ev, turn.user_ts)  # instantaneous

            # ---- System events ----
            # Capture API errors, retry attempts, rate limits and hook
            # failures as Langfuse `event`-type observations. Without these
            # the trace has no explanation when a generation came back empty
            # or a tool stalled. Event-type observations are excluded from
            # graph-view nodes per Langfuse docs, so they show in the
            # timeline only.
            for sr in turn.system_rows:
                sr_ts = get_row_ts(sr)
                if sr_ts is None:
                    continue
                subtype = sr.get("subtype") or "system"
                lvl = (sr.get("level") or "").upper()
                # Promote retries / errors into a recognizable name
                name = f"System: {subtype}"
                if sr.get("error") or sr.get("apiErrorStatus") or lvl == "ERROR":
                    name = f"System error: {subtype}"
                elif sr.get("retryAttempt") is not None:
                    name = f"System retry #{sr.get('retryAttempt')}: {subtype}"

                ev_span = tracer.start_span(name=name, start_time=_ns(sr_ts))
                _set_attr(ev_span, LFA.OBSERVATION_TYPE, "event")
                # Bubble error level up so they're filterable
                if (sr.get("error") or sr.get("apiErrorStatus")
                        or sr.get("isApiErrorMessage") or lvl == "ERROR"):
                    _set_attr(ev_span, LFA.OBSERVATION_LEVEL, "ERROR")
                    msg = str(sr.get("error") or sr.get("apiErrorStatus") or subtype)
                    _set_attr(ev_span, LFA.OBSERVATION_STATUS_MESSAGE, msg[:500])
                elif lvl in ("WARN", "WARNING"):
                    _set_attr(ev_span, LFA.OBSERVATION_LEVEL, "WARNING")
                _flatten_metadata(ev_span, {"claude_code": _strip_none({
                    # Verbatim JSONL fields from the system row (camelCase preserved)
                    "subtype": subtype,
                    "level": sr.get("level"),
                    "cause": sr.get("cause"),
                    "error": sr.get("error"),
                    "uuid": sr.get("uuid"),
                    "parentUuid": sr.get("parentUuid"),
                    "stopReason": sr.get("stopReason"),
                    "toolUseID": sr.get("toolUseID"),
                    "hookCount": sr.get("hookCount"),
                    "hookErrors": sr.get("hookErrors"),
                    "hookInfos": sr.get("hookInfos"),
                    "retryAttempt": sr.get("retryAttempt"),
                    "maxRetries": sr.get("maxRetries"),
                    "retryInMs": sr.get("retryInMs"),
                    "preventedContinuation": sr.get("preventedContinuation"),
                    "hasOutput": sr.get("hasOutput"),
                })})
                _end_span(ev_span, sr_ts)  # instantaneous

            # Track the "previous activity end" timestamp so each generation
            # gets a real duration via `start = prev_end + 1ms`. Avoids
            # 0-duration spans AND keeps a 1ms gap from sibling tool spans
            # (which would otherwise produce X-pattern edges in the graph view).
            prev_end_ts: Optional[datetime] = turn.user_ts

            for i, ar in enumerate(rows):
                step_idx = i + 1
                model = get_model(ar.msg)
                content = get_content(ar.msg)
                blocks_in_row = content if isinstance(content, list) else []
                block_types = [b.get("type") for b in blocks_in_row if isinstance(b, dict)]
                # Materialise: tool_uses is iterated twice below (span name,
                # then one tool span each). A generator would be exhausted by
                # the first pass and silently drop every tool span.
                tool_uses = list(iter_tool_uses(content))
                gen_output_content = normalize_assistant_content(content)
                gen_text_raw = extract_text(content)
                _, gen_text_meta = truncate_text(gen_text_raw)
                mid = get_message_id(ar.msg)

                # Naming convention (each row = one observation):
                #   - row with tool_use block(s): "Decision to call tool: X (#N)"
                #   - row with text block: "Final response (#N)" if last in
                #     turn, otherwise "Text response (#N)"
                #   - row with thinking block: "Thinking (#N)"
                #   - tool span (below): "Tool call: <name> (#N)"
                # Names avoid embedding sibling span names (no `→`) and the
                # (#N) suffix keeps each obs unique → graph view stays linear.
                tool_names = [tu.get("name") or "unknown" for tu in tool_uses]
                is_last_in_turn = (i == len(rows) - 1)
                if tool_names:
                    word = "tool" if len(tool_names) == 1 else "tools"
                    gen_span_name = f"Decision to call {word}: {', '.join(tool_names)} (#{step_idx})"
                elif "text" in block_types:
                    label = "Final response" if is_last_in_turn else "Text response"
                    gen_span_name = f"{label} (#{step_idx})"
                elif "thinking" in block_types:
                    gen_span_name = f"Thinking (#{step_idx})"
                else:
                    gen_span_name = f"Claude Response (#{step_idx})"

                # ---- Timing ----
                # The generation runs from (previous activity end + 1ms) to
                # this row's timestamp. If that start would land after ar.ts
                # (rows written less than 1ms apart), fall back to a
                # zero-duration span at ar.ts.
                if prev_end_ts is not None:
                    candidate_start = prev_end_ts + timedelta(milliseconds=1)
                    if ar.ts is not None and candidate_start > ar.ts:
                        gen_start = ar.ts
                    else:
                        gen_start = candidate_start
                else:
                    gen_start = ar.ts

                gen_span = tracer.start_span(
                    name=gen_span_name,
                    start_time=_ns(gen_start),
                )
                _set_attr(gen_span, LFA.OBSERVATION_TYPE, "generation")
                _set_attr(gen_span, LFA.OBSERVATION_MODEL, model)

                # Input: shared by all rows of the same message (one LLM
                # API call produced them all, so they all "saw" the same
                # context). Looked up by message.id.
                gen_input = message_inputs.get(mid)
                if gen_input is not None:
                    _set_attr(gen_span, LFA.OBSERVATION_INPUT, gen_input, json_encode=True)

                # Output: just THIS row's content block(s). Don't merge with
                # sibling rows of the same message — the user explicitly
                # wants per-row fidelity.
                _set_attr(gen_span, LFA.OBSERVATION_OUTPUT,
                          {"role": "assistant", "content": gen_output_content},
                          json_encode=True)

                # Token usage: ONLY on the last row of each Anthropic message,
                # since every row of a multi-row message has the same usage
                # block — putting it on all rows would N-way-inflate totals.
                if ar.is_last_in_message:
                    usage_details = extract_usage_details(ar.msg)
                    if usage_details:
                        _set_attr(gen_span, LFA.OBSERVATION_USAGE_DETAILS,
                                  usage_details, json_encode=True)

                # Error level if the API returned an error (rate limit etc.)
                if is_api_error(ar.msg):
                    _set_attr(gen_span, LFA.OBSERVATION_LEVEL, "ERROR")
                    err_msg = str(ar.msg.get("error") or ar.msg.get("apiErrorStatus") or "api error")
                    _set_attr(gen_span, LFA.OBSERVATION_STATUS_MESSAGE, err_msg[:500])

                # Anthropic raw usage block has non-counter fields (service_tier,
                # speed, iterations) that don't belong in usage_details; surface
                # them in metadata instead.
                raw_usage = get_usage(ar.msg)
                gen_cc_meta: Dict[str, Any] = {
                    # Derived (0-based; span names use the 1-based "#N")
                    "step_index": i,
                    "is_last_in_message": ar.is_last_in_message,
                    # Verbatim JSONL fields from the assistant row (camelCase preserved)
                    "uuid": ar.msg.get("uuid"),
                    "parentUuid": ar.msg.get("parentUuid"),
                    "requestId": ar.msg.get("requestId"),
                    "apiErrorStatus": ar.msg.get("apiErrorStatus"),
                    "isApiErrorMessage": ar.msg.get("isApiErrorMessage"),
                    "error": ar.msg.get("error"),
                    # Verbatim fields from assistant row's `message` block
                    # (Anthropic-side; already snake_case).
                    "id": mid,
                    "stop_reason": get_msg_field(ar.msg, "stop_reason"),
                    "stop_sequence": get_msg_field(ar.msg, "stop_sequence"),
                    "stop_details": get_msg_field(ar.msg, "stop_details"),
                    "diagnostics": get_msg_field(ar.msg, "diagnostics"),
                    "context_management": get_msg_field(ar.msg, "context_management"),
                    "container": get_msg_field(ar.msg, "container"),
                    # Verbatim fields from `message.usage` (Anthropic-side).
                    "service_tier": raw_usage.get("service_tier"),
                    "speed": raw_usage.get("speed"),
                    "inference_geo": raw_usage.get("inference_geo"),
                    "iterations": raw_usage.get("iterations"),
                    "cache_creation": raw_usage.get("cache_creation"),
                    "server_tool_use": raw_usage.get("server_tool_use"),
                }
                # Truncation meta only when the text was actually truncated —
                # writing {truncated: False, orig_len: N} on every gen adds
                # noise. Flattened to match user_text / input / output style.
                if (gen_text_raw and isinstance(gen_text_meta, dict)
                        and gen_text_meta.get("truncated")):
                    _inline(gen_cc_meta, "assistant_text", gen_text_meta)
                _flatten_metadata(gen_span, {"claude_code": _strip_none(gen_cc_meta)})
                gen_end = ar.ts
                _end_span(gen_span, gen_end)

                # Update prev_end for the next iteration. If this row has no
                # tool_use blocks, the next obs follows directly after gen.end.
                prev_end_ts = gen_end

                # ---- Tool spans for tool_use blocks in THIS row ----
                # Each tool_use in this row gets its own tool span. Start at
                # ar.ts + 1ms (avoids boundary touching with the gen's end
                # → keeps Litefuse's graph view linear).
                tool_start = (ar.ts + timedelta(milliseconds=1)) if ar.ts else None
                latest_tool_end: Optional[datetime] = None
                for tu in tool_uses:
                    tid = str(tu.get("id") or "")
                    tr = turn.tool_results_by_id.get(tid)
                    # User row carrying the tool_result ({} when unavailable).
                    src_row = (tr.source_row if tr else None) or {}

                    in_obj = tu.get("input")
                    if isinstance(in_obj, str):
                        in_obj, in_meta = truncate_text(in_obj)
                    else:
                        in_meta = None

                    out_obj: Any = None
                    out_meta: Optional[Dict[str, Any]] = None
                    if tr is not None:
                        # Normalize tool_result.content: it can be a string,
                        # or a list mixing text/image blocks. The normalizer
                        # strips base64 image data (kept as size placeholder)
                        # so the trace stays small.
                        normalized = normalize_tool_result_content(tr.content)
                        out_obj, out_meta = truncate_text(normalized)

                    tool_end = tr.timestamp if (tr and tr.timestamp) else tool_start
                    # A result stamped in the same millisecond as its assistant
                    # row would end 1ms before tool_start; clamp so the span
                    # never has a negative duration.
                    if (tool_start is not None and tool_end is not None
                            and tool_end < tool_start):
                        tool_end = tool_start

                    tool_span = tracer.start_span(
                        name=f"Tool call: {tu.get('name') or 'unknown'} (#{step_idx})",
                        start_time=_ns(tool_start),
                    )
                    _set_attr(tool_span, LFA.OBSERVATION_TYPE, "tool")
                    _set_attr(tool_span, LFA.OBSERVATION_INPUT, in_obj, json_encode=True)
                    _set_attr(tool_span, LFA.OBSERVATION_OUTPUT, out_obj, json_encode=True)

                    # Surface tool-side errors as ERROR level so Litefuse
                    # can filter/aggregate them. Same 500-char cap as the other
                    # status messages; default=str keeps an odd payload from
                    # aborting the rest of the turn.
                    if tr is not None and tr.is_error:
                        _set_attr(tool_span, LFA.OBSERVATION_LEVEL, "ERROR")
                        if isinstance(tr.content, str):
                            err_preview = tr.content
                        else:
                            err_preview = json.dumps(tr.content, ensure_ascii=False, default=str)
                        _set_attr(tool_span, LFA.OBSERVATION_STATUS_MESSAGE, err_preview[:500])

                    # toolUseResult on the user row has tool-type-specific
                    # execution details. The shape varies: Bash has stdout/
                    # stderr/exit, WebFetch has code/bytes, Task has agentId/
                    # agentType, BashOutput has backgroundTaskId, etc. The
                    # helper pulls a flat summary of any field that's present.
                    tur_summary = extract_tool_use_result_summary(src_row.get("toolUseResult"))

                    tool_cc_meta: Dict[str, Any] = {
                        # Derived (0-based; span names use the 1-based "#N")
                        "step_index": i,
                        # Verbatim from the tool_use block (in assistant content)
                        "name": tu.get("name") or "unknown",
                        "tool_use_id": tid,
                        # Verbatim from the tool_result block (already snake_case)
                        "is_error": tr.is_error if tr else None,
                        # Verbatim from the user row carrying the tool_result
                        "uuid": src_row.get("uuid"),
                        "parentUuid": src_row.get("parentUuid"),
                        "sourceToolAssistantUUID": src_row.get("sourceToolAssistantUUID"),
                        "toolUseResult": tur_summary or None,
                    }
                    # Flatten the truncation meta dicts into prefixed keys.
                    _inline(tool_cc_meta, "input", in_meta)
                    _inline(tool_cc_meta, "output", out_meta)
                    _flatten_metadata(tool_span, {"claude_code": _strip_none(tool_cc_meta)})
                    _end_span(tool_span, tool_end)

                    if tool_end is not None and (latest_tool_end is None
                                                 or tool_end > latest_tool_end):
                        latest_tool_end = tool_end

                # If this row had tool_use blocks, the next observation
                # follows AFTER the latest tool finished. Otherwise stays
                # at this row's end.
                if latest_tool_end is not None:
                    prev_end_ts = latest_tool_end
    finally:
        # Finalize Turn outputs and close the root span, even when building
        # any of the observations above raised.
        _set_attr(turn_span, LFA.OBSERVATION_OUTPUT,
                  {"role": "assistant", "content": final_text}, json_encode=True)
        _set_attr(turn_span, LFA.TRACE_OUTPUT,
                  {"role": "assistant", "content": final_text}, json_encode=True)
        _end_span(turn_span, turn_end)


# ---------------- Main ----------------
def main() -> int:
    """Hook entry point: emit newly completed turns of the session to Langfuse.

    Reads the hook payload (session id + transcript path) and parses the
    JSONL rows appended since the last firing. It groups them into turns
    and emits one trace per turn via ``emit_turn``. Per-session progress
    (read offset, turn counter, emitted user-row UUIDs) is loaded and saved
    while holding ``FileLock(LOCK_FILE)``.

    The last turn is deferred (the offset is rewound) when
    ``_is_turn_complete`` says it is still in progress. Earlier turns of
    the same batch are de-duplicated by user-row UUID on the next firing.

    Always returns 0 so the tracing hook never fails the user's Claude Code
    session. Failures are written to the log file at ERROR level.

    Environment:
        TRACE_TO_LANGFUSE: must be "true" (case-insensitive) to enable.
        LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY: required credentials.
        LANGFUSE_BASE_URL (preferred) or LANGFUSE_HOST: server URL;
            defaults to https://cloud.langfuse.com.
    """
    started = time.perf_counter()
    debug("Hook started")

    if os.environ.get("TRACE_TO_LANGFUSE", "").lower() != "true":
        return 0

    public_key = os.environ.get("LANGFUSE_PUBLIC_KEY")
    secret_key = os.environ.get("LANGFUSE_SECRET_KEY")
    # We always pass host= explicitly, which overrides the SDK's own
    # LANGFUSE_HOST lookup. Honour that documented variable as a fallback so
    # self-hosted users' traces don't silently go to the cloud default.
    host = (os.environ.get("LANGFUSE_BASE_URL")
            or os.environ.get("LANGFUSE_HOST")
            or "https://cloud.langfuse.com")

    if not public_key or not secret_key:
        return 0

    payload = read_hook_payload()
    session_id, transcript_path = extract_session_and_transcript(payload)

    if not session_id or not transcript_path:
        debug("Missing session_id or transcript_path; exiting.")
        return 0

    if not transcript_path.exists():
        debug(f"Transcript path does not exist: {transcript_path}")
        return 0

    langfuse: Optional[Langfuse] = None
    try:
        langfuse = Langfuse(public_key=public_key, secret_key=secret_key, host=host)
    except Exception as e:
        # Logged unconditionally: a broken client means every trace is lost.
        _log("ERROR", f"Langfuse client init failed: {e}")
        return 0

    emitted = 0
    deferred = 0
    failed = 0
    try:
        with FileLock(LOCK_FILE):
            state = load_state()
            key = state_key(session_id, str(transcript_path))
            ss = load_session_state(state, key)

            # Where this firing started reading; restored if the last turn
            # is still in progress so it is re-read in full next time.
            old_offset = ss.offset

            msgs, ss = read_new_jsonl(transcript_path, ss)
            turns = build_turns(msgs) if msgs else []
            if not turns:
                write_session_state(state, key, ss)
                save_state(state)
                return 0

            emitted_set = set(ss.emitted_user_uuids)
            last_idx = len(turns) - 1
            for i, t in enumerate(turns):
                uuid = (t.user_msg or {}).get("uuid")
                # Dedup: skip a turn whose user_msg.uuid we've already
                # emitted in a previous firing (possible when we rewind
                # the offset to recover a deferred turn).
                if uuid and uuid in emitted_set:
                    continue

                # Defer ONLY the last turn (the agent's in-progress work).
                # Earlier turns are followed by another user message in the
                # JSONL, so they're complete even if their last assistant row
                # is a tool_use (crash/interrupt) — we still want that trace.
                if i == last_idx and not _is_turn_complete(t):
                    deferred += 1
                    ss.offset = old_offset
                    break

                # The turn number is consumed even if emission fails, so
                # numbering keeps matching the session's turn order.
                emitted += 1
                turn_num = ss.turn_count + emitted
                try:
                    emit_turn(langfuse, session_id, turn_num, t, transcript_path)
                except Exception as e:
                    failed += 1
                    _log("ERROR", f"emit_turn failed for turn {turn_num} "
                                  f"(session={session_id}): {type(e).__name__}: {e}")
                else:
                    if uuid:
                        emitted_set.add(uuid)

            ss.turn_count += emitted
            ss.emitted_user_uuids = list(emitted_set)
            write_session_state(state, key, ss)
            save_state(state)

        try:
            langfuse.flush()
        except Exception:
            pass  # best-effort; shutdown() in the finally block runs anyway

        elapsed = time.perf_counter() - started
        summary = f"Processed {emitted} turns"
        if failed:
            summary += f" ({failed} failed)"
        if deferred:
            summary += f", deferred {deferred} (in-progress)"
        info(f"{summary} in {elapsed:.2f}s (session={session_id})")
        return 0

    except Exception as e:
        _log("ERROR", f"Unexpected failure: {type(e).__name__}: {e}")
        return 0

    finally:
        if langfuse is not None:
            try:
                langfuse.shutdown()
            except Exception:
                pass


if __name__ == "__main__":
    # Script entry: main()'s return value becomes the process exit status.
    raise SystemExit(main())
