
Domain Skills — API

Domain skills cover specific enterprise capabilities — agentic_rag (with its four provider families: graph, enterprise search, web, and reranking), weather, and database. Decision rationale lives in the ADR index; operational guidance lives in the skill pages.

Agentic RAG

mirai_shared_skills.agentic_rag

Agentic RAG package — multi-source retrieval orchestrator.

DEFAULT_CITATION_SAFETY_BUFFER_TOKENS module-attribute

DEFAULT_CITATION_SAFETY_BUFFER_TOKENS = 64

DEFAULT_TOKEN_BUDGET module-attribute

DEFAULT_TOKEN_BUDGET = 8000

QWEN3_LONG_CONTEXT_TOKENS module-attribute

QWEN3_LONG_CONTEXT_TOKENS: int = 32768

SourceName module-attribute

SourceName = Literal['neo4j', 'azure-search', 'web']

__all__ module-attribute

__all__ = [
    "DEFAULT_CITATION_SAFETY_BUFFER_TOKENS",
    "DEFAULT_TOKEN_BUDGET",
    "QWEN3_LONG_CONTEXT_TOKENS",
    "AgenticRAGSkill",
    "AzureSearchConfig",
    "AzureSearchProvider",
    "BrowserWebSearchProvider",
    "CohereRerankProvider",
    "Neo4jConnection",
    "Neo4jGraphProvider",
    "Neo4jUnavailableError",
    "NoOpRerankerProvider",
    "Qwen3RerankerProvider",
    "RAGContextChunk",
    "RerankerConfig",
    "RerankerProvider",
    "RetrievalQuery",
    "SourceMetadata",
    "SourceName",
    "WebSearchProvider",
    "estimate_citation_tokens",
    "truncate_chunks_to_budget",
]

AgenticRAGSkill

AgenticRAGSkill(
    *,
    neo4j: Neo4jGraphProvider | None = None,
    azure: AzureSearchProvider | None = None,
    web: WebSearchProvider | None = None,
    reranker: RerankerProvider | None = None,
    token_budget: int = DEFAULT_TOKEN_BUDGET,
    citation_buffer_tokens: int = DEFAULT_CITATION_SAFETY_BUFFER_TOKENS,
)

Bases: BaseSkill

Multi-source retrieval orchestrator (Graph + Hybrid Vector + Web).

Source code in mirai_shared_skills/agentic_rag/skill.py
def __init__(
    self,
    *,
    neo4j: Neo4jGraphProvider | None = None,
    azure: AzureSearchProvider | None = None,
    web: WebSearchProvider | None = None,
    reranker: RerankerProvider | None = None,
    token_budget: int = DEFAULT_TOKEN_BUDGET,
    citation_buffer_tokens: int = DEFAULT_CITATION_SAFETY_BUFFER_TOKENS,
) -> None:
    self._neo4j = neo4j
    self._azure = azure
    self._web = web or BrowserWebSearchProvider()
    self._reranker: RerankerProvider = reranker or NoOpRerankerProvider()
    self._token_budget = token_budget
    self._citation_buffer = citation_buffer_tokens
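
A minimal construction sketch, assuming a local Neo4j instance and Azure AI Search credentials; every endpoint, key, and value below is illustrative rather than taken from this reference:

import asyncio

from mirai_shared_skills.agentic_rag import (
    AgenticRAGSkill,
    AzureSearchConfig,
    AzureSearchProvider,
    Neo4jConnection,
    Neo4jGraphProvider,
)


async def main() -> None:
    # Illustrative credentials; substitute your own deployment values.
    skill = AgenticRAGSkill(
        neo4j=Neo4jGraphProvider(
            Neo4jConnection(uri="bolt://localhost:7687", user="neo4j", password="secret")
        ),
        azure=AzureSearchProvider(
            AzureSearchConfig(
                endpoint="https://example.search.windows.net",
                index_name="docs",
                api_key="<azure-key>",
            )
        ),
        # web and reranker fall back to BrowserWebSearchProvider and NoOpRerankerProvider.
    )
    try:
        tools = skill.get_tools()
        print([type(tool).__name__ for tool in tools])
    finally:
        await skill.aclose()


asyncio.run(main())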

description property

description: str

instructions property

instructions: str

name property

name: str

aclose async

aclose() -> None
Source code in mirai_shared_skills/agentic_rag/skill.py
async def aclose(self) -> None:
    if self._neo4j is not None:
        await self._neo4j.aclose()
    if self._azure is not None:
        await self._azure.aclose()

get_tools

get_tools() -> list[Tool]
Source code in mirai_shared_skills/agentic_rag/skill.py
def get_tools(self) -> list[Tool]:
    async def graph_retrieval(
        query: str,
        top_k: int = 5,
        cypher_template: str | None = None,
        parameters: dict[str, Any] | None = None,
    ) -> str:
        """Execute a Cypher template against Neo4j and return chunks."""
        return await self._wrap(
            query,
            self._graph_retrieval_impl(query, top_k, cypher_template, parameters),
            "neo4j_error",
            top_k=top_k,
        )

    async def graph_schema() -> str:
        """Return the labels and relationships present in the graph."""
        return await self._graph_schema_impl()

    async def verify_graph_plugins() -> str:
        """Check whether APOC and GDS plugins are active in the graph."""
        return await self._verify_plugins_impl()

    async def list_cypher_templates() -> str:
        """Return the names + bodies of available Cypher templates."""
        return json.dumps({"templates": TEMPLATES}, indent=2)

    async def enterprise_search(
        query: str,
        top_k: int = 5,
        odata_filter: str | None = None,
    ) -> str:
        """Hybrid search against Azure AI Search with optional OData filter."""
        return await self._wrap(
            query,
            self._enterprise_search_impl(query, top_k, odata_filter),
            "azure_error",
            top_k=top_k,
        )

    async def web_intelligence(query: str, top_k: int = 5) -> str:
        """Fan out to the configured web search provider for live context."""
        return await self._wrap(
            query,
            self._web_intelligence_impl(query, top_k),
            "web_error",
            top_k=top_k,
        )

    async def parallel_retrieval(query: str, top_k: int = 5) -> str:
        """Run every configured source in parallel and merge the chunks."""
        return await self._parallel_retrieval_impl(query, top_k)

    return [
        tool_from_function(async_fn=graph_retrieval),
        tool_from_function(async_fn=graph_schema),
        tool_from_function(async_fn=verify_graph_plugins),
        tool_from_function(async_fn=list_cypher_templates),
        tool_from_function(async_fn=enterprise_search),
        tool_from_function(async_fn=web_intelligence),
        tool_from_function(async_fn=parallel_retrieval),
    ]

AzureSearchConfig dataclass

AzureSearchConfig(
    endpoint: str,
    index_name: str,
    api_key: str,
    api_version: str = DEFAULT_API_VERSION,
    semantic_configuration: str | None = None,
    vector_fields: tuple[str, ...] = DEFAULT_VECTOR_FIELDS,
)

Connection params for an Azure AI Search index.

api_key instance-attribute

api_key: str

api_version class-attribute instance-attribute

api_version: str = DEFAULT_API_VERSION

endpoint instance-attribute

endpoint: str

index_name instance-attribute

index_name: str

semantic_configuration class-attribute instance-attribute

semantic_configuration: str | None = None

vector_fields class-attribute instance-attribute

vector_fields: tuple[str, ...] = DEFAULT_VECTOR_FIELDS

AzureSearchProvider

AzureSearchProvider(
    config: AzureSearchConfig,
    *,
    client: AsyncClient | None = None,
    embedder: EmbeddingFn | None = None,
    timeout_seconds: float = DEFAULT_TIMEOUT_SECONDS,
)

Async hybrid-search wrapper over the Azure AI Search REST API.

Source code in mirai_shared_skills/agentic_rag/providers/azure.py
def __init__(
    self,
    config: AzureSearchConfig,
    *,
    client: httpx.AsyncClient | None = None,
    embedder: EmbeddingFn | None = None,
    timeout_seconds: float = DEFAULT_TIMEOUT_SECONDS,
) -> None:
    self._config = config
    self._client = client
    self._owns_client = client is None
    self._embedder = embedder
    self._timeout_seconds = timeout_seconds

aclose async

aclose() -> None
Source code in mirai_shared_skills/agentic_rag/providers/azure.py
async def aclose(self) -> None:
    if self._owns_client and self._client is not None:
        await self._client.aclose()
        self._client = None

search async

search(
    query: str,
    *,
    top_k: int = 5,
    filters: dict[str, Any] | None = None,
) -> list[RAGContextChunk]

Issue a hybrid search and project the response into context chunks.

Source code in mirai_shared_skills/agentic_rag/providers/azure.py
async def search(
    self,
    query: str,
    *,
    top_k: int = 5,
    filters: dict[str, Any] | None = None,
) -> list[RAGContextChunk]:
    """Issue a hybrid search and project the response into context chunks."""
    client = await self._ensure_client()
    body: dict[str, Any] = {
        "search": query,
        "top": top_k,
        "queryType": "semantic" if self._config.semantic_configuration else "simple",
    }
    if self._config.semantic_configuration:
        body["semanticConfiguration"] = self._config.semantic_configuration
    if filters and isinstance(filters.get("odata"), str):
        body["filter"] = filters["odata"]
    vector = await self._embed(query)
    if vector is not None:
        body["vectorQueries"] = [
            {
                "kind": "vector",
                "vector": list(vector),
                "k": top_k,
                "fields": ",".join(self._config.vector_fields),
            }
        ]
    path = f"/indexes/{self._config.index_name}/docs/search"
    params = {"api-version": self._config.api_version}
    response = await client.post(path, params=params, json=body)
    response.raise_for_status()
    payload = response.json()
    chunks: list[RAGContextChunk] = []
    for entry in payload.get("value", [])[:top_k]:
        identifier = str(entry.get("id") or entry.get("@search.documentId") or "unknown")
        text = str(entry.get("content") or entry.get("text") or "")
        if not text:
            continue
        score = entry.get("@search.rerankerScore") or entry.get("@search.score")
        chunks.append(
            RAGContextChunk(
                text=text,
                metadata=SourceMetadata(
                    source="azure-search",
                    identifier=identifier,
                    score=float(score) if score is not None else None,
                    extra={
                        k: v
                        for k, v in entry.items()
                        if k not in {"content", "text"} and not k.startswith("@search.")
                    },
                ),
            )
        )
    return chunks
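
A hedged usage sketch of the provider on its own; the endpoint, index, and OData filter are illustrative:

import asyncio

from mirai_shared_skills.agentic_rag import AzureSearchConfig, AzureSearchProvider


async def demo() -> None:
    provider = AzureSearchProvider(
        AzureSearchConfig(
            endpoint="https://example.search.windows.net",
            index_name="contracts",
            api_key="<azure-key>",
            semantic_configuration="default",
        )
    )
    try:
        # The "odata" key is the only filter entry the provider forwards.
        chunks = await provider.search(
            "termination clauses",
            top_k=3,
            filters={"odata": "department eq 'legal'"},
        )
        for chunk in chunks:
            print(chunk.metadata.identifier, chunk.metadata.score)
    finally:
        await provider.aclose()


asyncio.run(demo())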

BrowserWebSearchProvider

BrowserWebSearchProvider(
    *,
    browser: AgentBrowserSkill | None = None,
    url_builder: UrlBuilder = default_duckduckgo_url,
)

Bases: WebSearchProvider

Default WebSearchProvider that delegates to AgentBrowserSkill.

The url_builder callable controls which search engine is queried, so the same provider works with DuckDuckGo, Bing, Brave, or any private enterprise search portal.
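
For example, a custom builder targeting an internal portal might look like this; the host is hypothetical:

from urllib.parse import quote_plus

from mirai_shared_skills.agentic_rag import BrowserWebSearchProvider


def intranet_url(query: str) -> str:
    # Hypothetical enterprise search portal; swap in your own host.
    return f"https://search.intranet.example.com/?q={quote_plus(query)}"


web = BrowserWebSearchProvider(url_builder=intranet_url)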

Source code in mirai_shared_skills/agentic_rag/providers/web.py
def __init__(
    self,
    *,
    browser: AgentBrowserSkill | None = None,
    url_builder: UrlBuilder = default_duckduckgo_url,
) -> None:
    self._browser = browser or AgentBrowserSkill()
    self._url_builder = url_builder
    self._cached_browse: Callable[..., Awaitable[str]] | None = None

search async

search(
    query: str, *, top_k: int = 5
) -> list[RAGContextChunk]
Source code in mirai_shared_skills/agentic_rag/providers/web.py
async def search(self, query: str, *, top_k: int = 5) -> list[RAGContextChunk]:
    url = self._url_builder(query)
    browse = self._browse_callable()
    raw = await browse(url=url)
    payload = json.loads(raw)
    if "error" in payload:
        return []
    text: str = payload.get("text", "")
    if not text:
        return []
    # Split the body into paragraph-ish chunks and keep the first top_k.
    candidates = [block.strip() for block in text.split("\n\n") if block.strip()]
    if not candidates:
        candidates = [text]
    chunks: list[RAGContextChunk] = []
    for index, snippet in enumerate(candidates[:top_k]):
        chunks.append(
            RAGContextChunk(
                text=snippet,
                metadata=SourceMetadata(
                    source="web",
                    identifier=f"{payload.get('final_url') or url}#chunk-{index}",
                    score=None,
                    extra={
                        "search_url": url,
                        "status_code": payload.get("status_code"),
                        "final_url": payload.get("final_url"),
                    },
                ),
            )
        )
    return chunks

CohereRerankProvider

CohereRerankProvider(
    api_key: str,
    *,
    model: str = DEFAULT_MODEL,
    endpoint: str = DEFAULT_ENDPOINT,
    config: RerankerConfig | None = None,
    client: AsyncClient | None = None,
    timeout_seconds: float = DEFAULT_RERANK_TIMEOUT_SECONDS,
)

Bases: RerankerProvider

Cohere Rerank over the public /v2/rerank REST API (rerank-v3.5 by default).

Requires a CO_API_KEY. The provider is async-first and reuses a single httpx.AsyncClient for connection pooling. Pass an existing client to share pools with surrounding application code.
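
A brief instantiation sketch; reading the key from a CO_API_KEY environment variable and the threshold value are assumptions for illustration:

import os

from mirai_shared_skills.agentic_rag import CohereRerankProvider, RerankerConfig

# Illustrative configuration: keep the 8 best chunks and drop weak matches.
reranker = CohereRerankProvider(
    api_key=os.environ["CO_API_KEY"],
    config=RerankerConfig(top_k=8, score_threshold=0.3),
)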

Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
def __init__(
    self,
    api_key: str,
    *,
    model: str = DEFAULT_MODEL,
    endpoint: str = DEFAULT_ENDPOINT,
    config: RerankerConfig | None = None,
    client: httpx.AsyncClient | None = None,
    timeout_seconds: float = DEFAULT_RERANK_TIMEOUT_SECONDS,
) -> None:
    self._api_key = api_key
    self._model = model
    self._endpoint = endpoint.rstrip("/")
    self._config = config or RerankerConfig()
    self._client = client
    self._owns_client = client is None
    self._timeout_seconds = timeout_seconds

DEFAULT_ENDPOINT class-attribute instance-attribute

DEFAULT_ENDPOINT = 'https://api.cohere.com'

DEFAULT_MODEL class-attribute instance-attribute

DEFAULT_MODEL = 'rerank-v3.5'

aclose async

aclose() -> None
Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
async def aclose(self) -> None:
    if self._owns_client and self._client is not None:
        await self._client.aclose()
        self._client = None

rerank async

rerank(
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]
Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
async def rerank(
    self,
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]:
    if not chunks:
        return []
    limit = top_k if top_k is not None else self._config.top_k
    client = await self._ensure_client()
    response = await client.post(
        "/v2/rerank",
        json={
            "model": self._model,
            "query": query,
            "documents": [chunk.text for chunk in chunks],
            "top_n": limit,
            "max_tokens_per_doc": self._config.max_context_tokens,
        },
    )
    response.raise_for_status()
    payload = response.json()
    scored: list[RAGContextChunk] = []
    for entry in payload.get("results", []):
        index = int(entry.get("index", -1))
        if index < 0 or index >= len(chunks):
            continue
        score = float(entry.get("relevance_score", 0.0))
        scored.append(_stamp_score(chunks[index], score))
    return _apply_score_filters(
        scored,
        threshold=self._config.score_threshold,
        top_k=limit,
    )

Neo4jConnection dataclass

Neo4jConnection(
    uri: str,
    user: str,
    password: str,
    database: str | None = None,
)

Connection params for the Neo4j async driver.

database class-attribute instance-attribute

database: str | None = None

password instance-attribute

password: str

uri instance-attribute

uri: str

user instance-attribute

user: str

Neo4jGraphProvider

Neo4jGraphProvider(
    connection: Neo4jConnection | None = None,
    *,
    driver: _AsyncDriver | None = None,
)

Async Neo4j wrapper exposing schema introspection plus parameterised queries.

Test seams: callers can inject driver directly so unit tests bypass the real driver factory. Otherwise the driver is created lazily on first use and reused (singleton) for the lifetime of the provider instance.
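
A short usage sketch against a local instance; the connection values, the Document label, and the Cypher itself are assumptions for illustration:

import asyncio

from mirai_shared_skills.agentic_rag import Neo4jConnection, Neo4jGraphProvider


async def demo() -> None:
    provider = Neo4jGraphProvider(
        Neo4jConnection(uri="bolt://localhost:7687", user="neo4j", password="secret")
    )
    try:
        print(await provider.describe_schema())
        # Rows are projected into RAGContextChunks; `id` and `text` keys are preferred.
        chunks = await provider.execute(
            "MATCH (d:Document) RETURN d.id AS id, d.text AS text LIMIT $limit",
            {"limit": 5},
            top_k=5,
        )
        print(len(chunks))
    finally:
        await provider.aclose()


asyncio.run(demo())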

Source code in mirai_shared_skills/agentic_rag/providers/neo4j_graph.py
def __init__(
    self,
    connection: Neo4jConnection | None = None,
    *,
    driver: _AsyncDriver | None = None,
) -> None:
    if connection is None and driver is None:
        raise ValueError("Neo4jGraphProvider requires either `connection` or `driver`.")
    self._connection = connection
    self._driver: _AsyncDriver | None = driver

aclose async

aclose() -> None
Source code in mirai_shared_skills/agentic_rag/providers/neo4j_graph.py
async def aclose(self) -> None:
    if self._driver is not None:
        await self._driver.close()
        self._driver = None

describe_schema async

describe_schema() -> dict[str, list[str]]

Return labels and relationship types known to the graph.

Source code in mirai_shared_skills/agentic_rag/providers/neo4j_graph.py
async def describe_schema(self) -> dict[str, list[str]]:
    """Return labels and relationship types known to the graph."""
    async with self._session() as session:
        label_rows = await session.run("CALL db.labels()")
        labels = [row.data().get("label", "") async for row in label_rows]
        rel_rows = await session.run("CALL db.relationshipTypes()")
        relationships = [row.data().get("relationshipType", "") async for row in rel_rows]
    return {
        "labels": [label for label in labels if label],
        "relationships": [rel for rel in relationships if rel],
    }

execute async

execute(
    cypher: str,
    parameters: Mapping[str, Any] | None = None,
    *,
    top_k: int = 5,
) -> list[RAGContextChunk]

Run a Cypher statement and project the rows into RAGContextChunks.

Source code in mirai_shared_skills/agentic_rag/providers/neo4j_graph.py
async def execute(
    self,
    cypher: str,
    parameters: Mapping[str, Any] | None = None,
    *,
    top_k: int = 5,
) -> list[RAGContextChunk]:
    """Run a Cypher statement and project the rows into `RAGContextChunk`s."""
    chunks: list[RAGContextChunk] = []
    async with self._session() as session:
        result = await session.run(cypher, parameters or {})
        count = 0
        async for record in result:
            if count >= top_k:
                break
            row = record.data()
            identifier = str(
                row.get("id") or row.get("identifier") or f"row-{count}",
            )
            text = str(row.get("text") or row)
            chunks.append(
                RAGContextChunk(
                    text=text,
                    metadata=SourceMetadata(
                        source="neo4j",
                        identifier=identifier,
                        score=None,
                        extra={"row": row},
                    ),
                )
            )
            count += 1
    return chunks

verify_plugins async

verify_plugins() -> dict[str, Any]

Probe the graph for APOC and GDS availability.

Returns a status dict shaped like::

{"apoc": True, "gds": False, "detail": {"apoc": "...", "gds": "..."}}

The probe runs lightweight introspection calls (apoc.help('apoc') and gds.list()) and never raises — when a procedure is missing the corresponding flag is set to False and the failure reason is captured in detail.

Source code in mirai_shared_skills/agentic_rag/providers/neo4j_graph.py
async def verify_plugins(self) -> dict[str, Any]:
    """Probe the graph for APOC and GDS availability.

    Returns a status dict shaped like::

        {"apoc": True, "gds": False, "detail": {"apoc": "...", "gds": "..."}}

    The probe runs lightweight introspection calls (`apoc.help('apoc')`
    and `gds.list()`) and never raises — when a procedure is missing the
    corresponding flag is set to `False` and the failure reason is
    captured in `detail`.
    """
    status: dict[str, Any] = {"apoc": False, "gds": False, "detail": {}}
    async with self._session() as session:
        for plugin, probe in (
            ("apoc", "CALL apoc.help('apoc') YIELD name RETURN count(name) AS n"),
            ("gds", "CALL gds.list() YIELD name RETURN count(name) AS n"),
        ):
            try:
                result = await session.run(probe)
                count = 0
                async for record in result:
                    count = int(record.data().get("n", 0) or 0)
                    break
                status[plugin] = count > 0
                if count == 0:
                    status["detail"][plugin] = "no procedures registered"
            except Exception as exc:  # noqa: BLE001 — translate driver errors
                status[plugin] = False
                status["detail"][plugin] = str(exc)
    return status

Neo4jUnavailableError

Bases: RuntimeError

Raised when the optional neo4j dependency is missing.

NoOpRerankerProvider

Bases: RerankerProvider

Passthrough reranker — preserves original ordering and trims to top_k.

rerank async

rerank(
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]
Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
async def rerank(
    self,
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]:
    limit = top_k if top_k is not None else len(chunks)
    return list(chunks[:limit])

Qwen3RerankerProvider

Qwen3RerankerProvider(
    endpoint: str,
    *,
    api_key: str | None = None,
    model: str = DEFAULT_MODEL,
    config: RerankerConfig | None = None,
    client: AsyncClient | None = None,
    timeout_seconds: float = DEFAULT_RERANK_TIMEOUT_SECONDS,
)

Bases: RerankerProvider

Qwen3-Reranker-4B served via an OpenAI-compatible inference endpoint.

The provider POSTs {model, query, documents, top_n} to {endpoint}/v1/rerank and accepts both {results: [{index, relevance_score}]} and {data: [...]} envelopes. vLLM, TGI, and Together AI all expose this shape for cross-encoder reranker models.
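
A sketch of pointing the provider at a self-hosted server; the localhost URL is illustrative and assumes a backend exposing the /v1/rerank route described above:

from mirai_shared_skills.agentic_rag import Qwen3RerankerProvider, RerankerConfig

# Illustrative endpoint: any OpenAI-compatible server exposing /v1/rerank.
reranker = Qwen3RerankerProvider(
    endpoint="http://localhost:8000",
    config=RerankerConfig(top_k=5, score_threshold=0.2),
)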

Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
def __init__(
    self,
    endpoint: str,
    *,
    api_key: str | None = None,
    model: str = DEFAULT_MODEL,
    config: RerankerConfig | None = None,
    client: httpx.AsyncClient | None = None,
    timeout_seconds: float = DEFAULT_RERANK_TIMEOUT_SECONDS,
) -> None:
    self._endpoint = endpoint.rstrip("/")
    self._api_key = api_key
    self._model = model
    self._config = config or RerankerConfig()
    self._client = client
    self._owns_client = client is None
    self._timeout_seconds = timeout_seconds

DEFAULT_MODEL class-attribute instance-attribute

DEFAULT_MODEL = 'Qwen/Qwen3-Reranker-4B'

aclose async

aclose() -> None
Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
async def aclose(self) -> None:
    if self._owns_client and self._client is not None:
        await self._client.aclose()
        self._client = None

rerank async

rerank(
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]
Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
async def rerank(
    self,
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]:
    if not chunks:
        return []
    limit = top_k if top_k is not None else self._config.top_k
    client = await self._ensure_client()
    response = await client.post(
        "/v1/rerank",
        json={
            "model": self._model,
            "query": query,
            "documents": [chunk.text for chunk in chunks],
            "top_n": limit,
            "max_tokens": self._config.max_context_tokens,
        },
    )
    response.raise_for_status()
    payload = response.json()
    scored: list[RAGContextChunk] = []
    # Accept both `results` (Cohere-style) and `data` (vLLM-style) envelopes
    # so the same provider talks to multiple inference backends without
    # bespoke client code per host.
    rows: Iterable[dict[str, Any]] = payload.get("results") or payload.get("data") or []
    for entry in rows:
        index = int(entry.get("index", -1))
        if index < 0 or index >= len(chunks):
            continue
        raw_score = entry.get("relevance_score")
        if raw_score is None:
            raw_score = entry.get("score", 0.0)
        score = float(raw_score if raw_score is not None else 0.0)
        scored.append(_stamp_score(chunks[index], score))
    return _apply_score_filters(
        scored,
        threshold=self._config.score_threshold,
        top_k=limit,
    )

RAGContextChunk

Bases: BaseModel

A single unit of context returned by any retrieval provider.

metadata class-attribute instance-attribute

metadata: SourceMetadata = Field(
    description="Provenance metadata for citation."
)

model_config class-attribute instance-attribute

model_config = ConfigDict(extra='forbid')

text class-attribute instance-attribute

text: str = Field(
    min_length=1,
    description="Plain-text content for the LLM to read.",
)

RerankerConfig dataclass

RerankerConfig(
    top_k: int = 5,
    max_context_tokens: int = QWEN3_LONG_CONTEXT_TOKENS,
    score_threshold: float | None = None,
    extra_headers: dict[str, str] = dict(),
)

Common reranker configuration knobs shared by every backend.

Attributes:

  • top_k (int): Maximum number of chunks the reranker should return.

  • max_context_tokens (int): Per-pair input token budget. Defaults to the 32k window supported by Qwen3-Reranker-4B; Cohere Rerank also operates against long-form documents and respects this hint.

  • score_threshold (float | None): Optional minimum score; chunks below it are dropped.

  • extra_headers (dict[str, str]): Extra headers forwarded with every request.

extra_headers class-attribute instance-attribute

extra_headers: dict[str, str] = field(default_factory=dict)

max_context_tokens class-attribute instance-attribute

max_context_tokens: int = QWEN3_LONG_CONTEXT_TOKENS

score_threshold class-attribute instance-attribute

score_threshold: float | None = None

top_k class-attribute instance-attribute

top_k: int = 5

RerankerProvider

Bases: ABC

Abstract base for any reranker implementation.

rerank abstractmethod async

rerank(
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]

Return chunks reordered by relevance to query, truncated to top_k.

Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
@abstractmethod
async def rerank(
    self,
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]:
    """Return chunks reordered by relevance to `query`, truncated to `top_k`."""

RetrievalQuery

Bases: BaseModel

Caller-supplied query envelope sent to the retrieval providers.

filters class-attribute instance-attribute

filters: dict[str, Any] | None = Field(
    default=None,
    description="Provider-specific filter map (e.g. OData filter for Azure).",
)

model_config class-attribute instance-attribute

model_config = ConfigDict(
    extra="forbid", str_strip_whitespace=True
)

query class-attribute instance-attribute

query: str = Field(
    min_length=1,
    description="Natural-language query string.",
)

top_k class-attribute instance-attribute

top_k: int = Field(
    default=5,
    ge=1,
    le=50,
    description="Maximum chunks to return.",
)

SourceMetadata

Bases: BaseModel

Provenance attached to every retrieved chunk so the LLM can cite sources.

extra class-attribute instance-attribute

extra: dict[str, Any] = Field(
    default_factory=dict,
    description="Free-form provider extras (URL, label, embedding, etc.).",
)

identifier class-attribute instance-attribute

identifier: str = Field(
    min_length=1,
    description="Stable ID of the underlying record.",
)

model_config class-attribute instance-attribute

model_config = ConfigDict(extra='forbid')

score class-attribute instance-attribute

score: float | None = Field(
    default=None,
    description="Provider-reported relevance score, when available.",
)

source class-attribute instance-attribute

source: SourceName = Field(
    description="Provider identifier emitting this chunk."
)

WebSearchProvider

Bases: ABC

Abstract base for any web search backend.

search abstractmethod async

search(
    query: str, *, top_k: int = 5
) -> list[RAGContextChunk]

Return chunks containing live web text for the given query.

Source code in mirai_shared_skills/agentic_rag/providers/web.py
@abstractmethod
async def search(self, query: str, *, top_k: int = 5) -> list[RAGContextChunk]:
    """Return chunks containing live web text for the given query."""

estimate_citation_tokens

estimate_citation_tokens(
    chunks: Sequence[RAGContextChunk],
) -> int

Return the approximate token cost of citing every chunk in chunks.

Each citation reserves space for the source label, the identifier (often a URL), and the JSON-serialised metadata extras the LLM may surface back to the user. Token counts are estimated as chars / 4 to stay tokeniser-free.
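
A tiny worked sketch of the chars / 4 heuristic; the chunk contents are illustrative:

from mirai_shared_skills.agentic_rag import (
    RAGContextChunk,
    SourceMetadata,
    estimate_citation_tokens,
)

chunk = RAGContextChunk(
    text="Article 3(2) extends the regulation to controllers outside the Union.",
    metadata=SourceMetadata(
        source="azure-search",
        identifier="doc-42",
        extra={"url": "https://example.com/article-3"},
    ),
)
# Per-chunk cost is roughly len(source) + len(identifier) + len(json.dumps(extra))
# characters; the total across all chunks is divided by 4 chars per token.
print(estimate_citation_tokens([chunk]))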

Source code in mirai_shared_skills/agentic_rag/skill.py
def estimate_citation_tokens(chunks: Sequence[RAGContextChunk]) -> int:
    """Return the approximate token cost of citing every chunk in `chunks`.

    Each citation reserves space for the source label, the identifier (often a
    URL), and the JSON-serialised metadata extras the LLM may surface back to
    the user. Token counts are estimated as `chars / 4` to stay tokeniser-free.
    """
    total_chars = 0
    for chunk in chunks:
        meta = chunk.metadata
        total_chars += len(meta.source) + len(meta.identifier)
        # Cheap upper bound on the cost of round-tripping `extra` through JSON.
        if meta.extra:
            try:
                total_chars += len(json.dumps(meta.extra, default=str))
            except (TypeError, ValueError):
                # Fallback when extras contain non-JSON-able values; charge a
                # conservative flat cost so the budget still caps growth.
                total_chars += sum(len(str(v)) for v in meta.extra.values())
    return total_chars // APPROX_CHARS_PER_TOKEN

truncate_chunks_to_budget

truncate_chunks_to_budget(
    chunks: Sequence[RAGContextChunk],
    token_budget: int = DEFAULT_TOKEN_BUDGET,
    *,
    citation_buffer_tokens: int = DEFAULT_CITATION_SAFETY_BUFFER_TOKENS,
) -> list[RAGContextChunk]

Citation-aware greedy truncation.

Chunks are fitted into a budget reduced by the projected citation overhead:

chunk_budget = token_budget - (estimated_citation_tokens + citation_buffer)

The citation estimate is derived from the source label, identifier, and metadata extras of the candidate chunks. The safety buffer accommodates structural overhead (commas, JSON braces, surrounding prose). When the resulting budget is non-positive the function returns an empty list.

Parameters:

  • chunks (Sequence[RAGContextChunk], required): Candidate chunks in priority order. The first chunk is always kept verbatim if any budget remains.

  • token_budget (int, default DEFAULT_TOKEN_BUDGET): Total tokens the synthesis prompt can spend on retrieval.

  • citation_buffer_tokens (int, default DEFAULT_CITATION_SAFETY_BUFFER_TOKENS): Constant token allowance added to the dynamic citation estimate. Defaults to 64 tokens.

Returns:

  list[RAGContextChunk]: A list of chunks whose total textual char count fits within the derived character budget. The trailing chunk may be partially truncated; in that case its metadata.extra["truncated"] is set.
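
A small sketch of the budgeting arithmetic; the chunk sizes and the 2,000-token budget are illustrative:

from mirai_shared_skills.agentic_rag import (
    RAGContextChunk,
    SourceMetadata,
    truncate_chunks_to_budget,
)

chunks = [
    RAGContextChunk(
        text="A" * 2_000,  # roughly 500 tokens at 4 chars per token
        metadata=SourceMetadata(source="web", identifier=f"page-{i}"),
    )
    for i in range(10)
]

# The citation estimate plus the 64-token buffer is subtracted from the budget
# first; the remainder (times 4) becomes the character budget for chunk text,
# so only the leading chunks survive and the last kept chunk may be cut short.
kept = truncate_chunks_to_budget(chunks, token_budget=2_000)
print(len(kept), kept[-1].metadata.extra.get("truncated"))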

Source code in mirai_shared_skills/agentic_rag/skill.py
def truncate_chunks_to_budget(
    chunks: Sequence[RAGContextChunk],
    token_budget: int = DEFAULT_TOKEN_BUDGET,
    *,
    citation_buffer_tokens: int = DEFAULT_CITATION_SAFETY_BUFFER_TOKENS,
) -> list[RAGContextChunk]:
    """Citation-aware greedy truncation.

    Chunks are fitted into a budget reduced by the projected citation overhead:

        chunk_budget = token_budget - (estimated_citation_tokens + citation_buffer)

    The citation estimate is derived from the source label, identifier, and
    metadata extras of the candidate chunks. The safety buffer accommodates
    structural overhead (commas, JSON braces, surrounding prose). When the
    resulting budget is non-positive the function returns an empty list.

    Args:
        chunks: Candidate chunks in priority order. The first chunk is always
            kept verbatim if any budget remains.
        token_budget: Total tokens the synthesis prompt can spend on retrieval.
        citation_buffer_tokens: Constant token allowance added to the dynamic
            citation estimate. Defaults to 64 tokens.

    Returns:
        A list of chunks whose total textual char count fits within the
        derived character budget. The trailing chunk may be partially
        truncated; in that case its `metadata.extra["truncated"]` is set.
    """
    if not chunks:
        return []

    citation_overhead = estimate_citation_tokens(chunks) + citation_buffer_tokens
    chunk_token_budget = token_budget - citation_overhead
    if chunk_token_budget <= 0:
        return []

    char_budget = chunk_token_budget * APPROX_CHARS_PER_TOKEN
    kept: list[RAGContextChunk] = []
    used = 0
    for chunk in chunks:
        cost = len(chunk.text)
        if used + cost <= char_budget:
            kept.append(chunk)
            used += cost
            continue
        remaining = char_budget - used
        if remaining <= 0:
            break
        truncated_text = chunk.text[:remaining]
        kept.append(
            RAGContextChunk(
                text=truncated_text,
                metadata=chunk.metadata.model_copy(
                    update={"extra": {**chunk.metadata.extra, "truncated": True}},
                ),
            )
        )
        break
    return kept

eval

Evaluation suite for the AgenticRAGSkill — judge models, metrics, dataset.

DEFAULT_GEMINI_MODEL module-attribute

DEFAULT_GEMINI_MODEL: str = 'gemini-1.5-flash'

DEFAULT_GPT4O_MINI_MODEL module-attribute

DEFAULT_GPT4O_MINI_MODEL: str = 'gpt-4o-mini'

GOLDEN_DATASET_PATH module-attribute

GOLDEN_DATASET_PATH = (
    parent / "tests" / "data" / "rag_golden_set.json"
)

__all__ module-attribute

__all__ = [
    "DEFAULT_GEMINI_MODEL",
    "DEFAULT_GPT4O_MINI_MODEL",
    "DeepEvalJudgeAdapter",
    "EvaluationReport",
    "GOLDEN_DATASET_PATH",
    "GPT4oMiniJudge",
    "GeminiFlashJudge",
    "GoldenCase",
    "JudgeLLM",
    "MetricName",
    "MetricScore",
    "MockJudge",
    "evaluate_dataset",
    "load_golden_dataset",
]

DeepEvalJudgeAdapter

DeepEvalJudgeAdapter(judge: JudgeLLM)

Adapter that exposes a JudgeLLM as DeepEval's DeepEvalBaseLLM.

DeepEval is imported lazily: the adapter is only usable as a DeepEval model when the optional [eval] extra (deepeval) is installed, but the class itself remains constructible without DeepEval so type-checking stays clean.

Source code in mirai_shared_skills/agentic_rag/eval/judge.py
def __init__(self, judge: JudgeLLM) -> None:
    self._judge = judge
__getattr__
__getattr__(item: str) -> Any
Source code in mirai_shared_skills/agentic_rag/eval/judge.py
def __getattr__(self, item: str) -> Any:
    # DeepEval introspects auxiliary methods (batch_generate, etc.); fall
    # back to no-ops where reasonable so the adapter stays drop-in.
    if item == "should_use_native_model":
        return lambda: False
    raise AttributeError(item)
a_generate async
a_generate(prompt: str, *args: Any, **kwargs: Any) -> str
Source code in mirai_shared_skills/agentic_rag/eval/judge.py
async def a_generate(self, prompt: str, *args: Any, **kwargs: Any) -> str:
    return await self._judge.a_generate(prompt)
generate
generate(prompt: str, *args: Any, **kwargs: Any) -> str
Source code in mirai_shared_skills/agentic_rag/eval/judge.py
def generate(self, prompt: str, *args: Any, **kwargs: Any) -> str:
    return self._judge.generate(prompt)
get_model_name
get_model_name() -> str
Source code in mirai_shared_skills/agentic_rag/eval/judge.py
def get_model_name(self) -> str:
    return self._judge.name
load_model
load_model() -> Any
Source code in mirai_shared_skills/agentic_rag/eval/judge.py
def load_model(self) -> Any:  # pragma: no cover — DeepEval lifecycle hook
    return self._judge

EvaluationReport dataclass

EvaluationReport(
    cases: int,
    averages: dict[MetricName, float],
    per_case: list[MetricScore] = list(),
)

Aggregated scores for a full evaluation run.

averages instance-attribute
averages: dict[MetricName, float]
cases instance-attribute
cases: int
per_case class-attribute instance-attribute
per_case: list[MetricScore] = field(default_factory=list)
above
above(threshold: float) -> dict[MetricName, bool]

Return whether each metric average is at or above threshold.

Source code in mirai_shared_skills/agentic_rag/eval/metrics.py
def above(self, threshold: float) -> dict[MetricName, bool]:
    """Return whether each metric average is at or above `threshold`."""
    return {metric: avg >= threshold for metric, avg in self.averages.items()}

GPT4oMiniJudge

GPT4oMiniJudge(
    api_key: str,
    *,
    model: str = DEFAULT_GPT4O_MINI_MODEL,
    endpoint: str = DEFAULT_ENDPOINT,
    client: AsyncClient | None = None,
    timeout_seconds: float = DEFAULT_JUDGE_TIMEOUT_SECONDS,
)

Bases: _HttpJudge

OpenAI gpt-4o-mini judge over the chat completions API.

Source code in mirai_shared_skills/agentic_rag/eval/judge.py
def __init__(
    self,
    api_key: str,
    *,
    model: str = DEFAULT_GPT4O_MINI_MODEL,
    endpoint: str = DEFAULT_ENDPOINT,
    client: httpx.AsyncClient | None = None,
    timeout_seconds: float = DEFAULT_JUDGE_TIMEOUT_SECONDS,
) -> None:
    super().__init__(
        client=client,
        timeout_seconds=timeout_seconds,
        identifier=f"openai::{model}",
    )
    self._api_key = api_key
    self._model = model
    self._endpoint = endpoint.rstrip("/")
DEFAULT_ENDPOINT class-attribute instance-attribute
DEFAULT_ENDPOINT = 'https://api.openai.com'
a_generate async
a_generate(prompt: str) -> str
Source code in mirai_shared_skills/agentic_rag/eval/judge.py
async def a_generate(self, prompt: str) -> str:
    client = await self._ensure_client(
        self._endpoint,
        {
            "Authorization": f"Bearer {self._api_key}",
            "Content-Type": "application/json",
        },
    )
    body = {
        "model": self._model,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.0,
        "response_format": {"type": "json_object"},
    }
    response = await client.post("/v1/chat/completions", json=body)
    response.raise_for_status()
    payload = response.json()
    choices = payload.get("choices") or []
    if not choices:
        return ""
    message = choices[0].get("message", {})
    return str(message.get("content", ""))

GeminiFlashJudge

GeminiFlashJudge(
    api_key: str,
    *,
    model: str = DEFAULT_GEMINI_MODEL,
    endpoint: str = DEFAULT_ENDPOINT,
    client: AsyncClient | None = None,
    timeout_seconds: float = DEFAULT_JUDGE_TIMEOUT_SECONDS,
)

Bases: _HttpJudge

Gemini 1.5 Flash judge over the Generative Language REST API.

Source code in mirai_shared_skills/agentic_rag/eval/judge.py
def __init__(
    self,
    api_key: str,
    *,
    model: str = DEFAULT_GEMINI_MODEL,
    endpoint: str = DEFAULT_ENDPOINT,
    client: httpx.AsyncClient | None = None,
    timeout_seconds: float = DEFAULT_JUDGE_TIMEOUT_SECONDS,
) -> None:
    super().__init__(
        client=client,
        timeout_seconds=timeout_seconds,
        identifier=f"gemini::{model}",
    )
    self._api_key = api_key
    self._model = model
    self._endpoint = endpoint.rstrip("/")
DEFAULT_ENDPOINT class-attribute instance-attribute
DEFAULT_ENDPOINT = (
    "https://generativelanguage.googleapis.com"
)
a_generate async
a_generate(prompt: str) -> str
Source code in mirai_shared_skills/agentic_rag/eval/judge.py
async def a_generate(self, prompt: str) -> str:
    client = await self._ensure_client(self._endpoint, {"Content-Type": "application/json"})
    path = f"/v1beta/models/{self._model}:generateContent"
    params = {"key": self._api_key}
    body = {
        "contents": [{"role": "user", "parts": [{"text": prompt}]}],
        "generationConfig": {
            "temperature": 0.0,
            "responseMimeType": "application/json",
        },
    }
    response = await client.post(path, params=params, json=body)
    response.raise_for_status()
    payload = response.json()
    candidates = payload.get("candidates") or []
    if not candidates:
        return ""
    parts = candidates[0].get("content", {}).get("parts") or []
    return "".join(part.get("text", "") for part in parts)

GoldenCase

Bases: BaseModel

A single ground-truth tuple used for offline evaluation.

context class-attribute instance-attribute
context: list[str] = Field(
    min_length=1,
    description="Retrieval chunks that should ground the response.",
)
expected_answer class-attribute instance-attribute
expected_answer: str = Field(
    min_length=1,
    description="The synthesised answer the agent should produce.",
)
expected_keywords class-attribute instance-attribute
expected_keywords: list[str] = Field(
    default_factory=list,
    description="Optional canonical phrases the answer is expected to contain.",
)
id class-attribute instance-attribute
id: str = Field(
    min_length=1,
    description="Stable identifier for the case.",
)
model_config class-attribute instance-attribute
model_config = ConfigDict(
    extra="forbid", str_strip_whitespace=True
)
query class-attribute instance-attribute
query: str = Field(
    min_length=1,
    description="User-style question to answer.",
)

JudgeLLM

Bases: ABC

Abstract judge with a single async generate entry point.

name abstractmethod property
name: str

Human-readable identifier used by DeepEval for telemetry.

a_generate abstractmethod async
a_generate(prompt: str) -> str

Return the judge's textual response for prompt.

Source code in mirai_shared_skills/agentic_rag/eval/judge.py
@abstractmethod
async def a_generate(self, prompt: str) -> str:
    """Return the judge's textual response for `prompt`."""
generate
generate(prompt: str) -> str

Sync wrapper around a_generate for DeepEval interoperability.

Source code in mirai_shared_skills/agentic_rag/eval/judge.py
def generate(self, prompt: str) -> str:
    """Sync wrapper around `a_generate` for DeepEval interoperability."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop running in this thread; safe to drive the coroutine here.
        return asyncio.run(self.a_generate(prompt))
    # When called from within a running loop (e.g. DeepEval's async metric
    # path) the caller should use `a_generate` instead.
    raise RuntimeError("Use a_generate() inside a running event loop.")

MetricName

Bases: StrEnum

ANSWER_RELEVANCY class-attribute instance-attribute
ANSWER_RELEVANCY = 'answer_relevancy'
CONTEXTUAL_PRECISION class-attribute instance-attribute
CONTEXTUAL_PRECISION = 'contextual_precision'
FAITHFULNESS class-attribute instance-attribute
FAITHFULNESS = 'faithfulness'

MetricScore dataclass

MetricScore(
    case_id: str,
    metric: MetricName,
    score: float,
    reason: str,
)

A single metric outcome for a single golden case.

case_id instance-attribute
case_id: str
metric instance-attribute
metric: MetricName
reason instance-attribute
reason: str
score instance-attribute
score: float

MockJudge

MockJudge(
    responder: Callable[[str], str],
    *,
    identifier: str = "mock-judge",
)

Bases: JudgeLLM

Scripted judge — returns the result of responder(prompt) verbatim.

Used by hermetic unit tests to verify the eval pipeline wiring without issuing real LLM calls.

Source code in mirai_shared_skills/agentic_rag/eval/judge.py
def __init__(
    self,
    responder: Callable[[str], str],
    *,
    identifier: str = "mock-judge",
) -> None:
    self._responder = responder
    self._identifier = identifier
    self.calls: list[str] = []
calls instance-attribute
calls: list[str] = []
name property
name: str
a_generate async
a_generate(prompt: str) -> str
Source code in mirai_shared_skills/agentic_rag/eval/judge.py
async def a_generate(self, prompt: str) -> str:
    self.calls.append(prompt)
    return self._responder(prompt)

evaluate_dataset async

evaluate_dataset(
    cases: list[GoldenCase],
    candidate_answers: dict[str, str],
    judge: JudgeLLM,
) -> EvaluationReport

Score every case across the three metrics using judge.

Parameters:

  • cases (list[GoldenCase], required): Golden ground-truth cases.

  • candidate_answers (dict[str, str], required): Mapping of case.id -> answer produced by the agent. Cases without a candidate answer score as 0.0.

  • judge (JudgeLLM, required): The JudgeLLM to drive scoring.
Source code in mirai_shared_skills/agentic_rag/eval/metrics.py
async def evaluate_dataset(
    cases: list[GoldenCase],
    candidate_answers: dict[str, str],
    judge: JudgeLLM,
) -> EvaluationReport:
    """Score every case across the three metrics using `judge`.

    Args:
        cases: Golden ground-truth cases.
        candidate_answers: Mapping of `case.id -> answer` produced by the
            agent. Cases without a candidate answer score as 0.0.
        judge: The `JudgeLLM` to drive scoring.
    """
    per_case: list[MetricScore] = []
    bucket: dict[MetricName, list[float]] = {m: [] for m in MetricName}
    for case in cases:
        answer = candidate_answers.get(case.id, "")
        case_scores = await _score_case(case, answer, judge)
        per_case.extend(case_scores)
        for ms in case_scores:
            bucket[ms.metric].append(ms.score)
    averages = {
        metric: (sum(values) / len(values)) if values else 0.0 for metric, values in bucket.items()
    }
    return EvaluationReport(cases=len(cases), averages=averages, per_case=per_case)
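
An end-to-end hermetic sketch wiring the pieces together with MockJudge; echoing expected_answer as the candidate is purely illustrative, and the scripted JSON matches the {"score", "reason"} envelope the metrics parse:

import asyncio

from mirai_shared_skills.agentic_rag.eval import (
    MockJudge,
    evaluate_dataset,
    load_golden_dataset,
)


async def run_eval() -> None:
    cases = load_golden_dataset()
    # Candidate answers normally come from the agent under test; echoing the
    # expected answers here just exercises the pipeline wiring.
    answers = {case.id: case.expected_answer for case in cases}
    judge = MockJudge(lambda prompt: '{"score": 1.0, "reason": "scripted"}')
    report = await evaluate_dataset(cases, answers, judge)
    print(report.averages)
    print(report.above(0.8))


asyncio.run(run_eval())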

load_golden_dataset

load_golden_dataset(
    path: Path | str | None = None,
) -> list[GoldenCase]

Load and validate the JSON-encoded golden dataset.

Parameters:

  • path (Path | str | None, default None): Optional override; defaults to tests/data/rag_golden_set.json relative to the repository root.

Returns:

  list[GoldenCase]: A validated list of GoldenCase instances.

Source code in mirai_shared_skills/agentic_rag/eval/dataset.py
def load_golden_dataset(path: Path | str | None = None) -> list[GoldenCase]:
    """Load and validate the JSON-encoded golden dataset.

    Args:
        path: Optional override; defaults to `tests/data/rag_golden_set.json`
            relative to the repository root.

    Returns:
        A validated list of `GoldenCase` instances.
    """
    target = Path(path) if path is not None else GOLDEN_DATASET_PATH
    with target.open("r", encoding="utf-8") as fh:
        payload = json.load(fh)
    return [GoldenCase.model_validate(entry) for entry in payload]
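
One dataset entry might look like the following; this is a hedged sketch inferred from the GoldenCase fields, not a copy of the shipped rag_golden_set.json:

# rag_golden_set.json is a JSON array; each object must validate as a GoldenCase.
example_entry = {
    "id": "example-case-1",
    "query": "When does the regulation apply to companies outside the EU?",
    "context": [
        "Article 3(2) extends the regulation to controllers outside the Union "
        "when they offer goods or services to data subjects in the Union.",
    ],
    "expected_answer": "It applies when a non-EU company targets or monitors EU residents.",
    "expected_keywords": ["Article 3", "extraterritorial"],
}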

dataset

Golden dataset loader for the AgenticRAG evaluation suite.

GOLDEN_DATASET_PATH module-attribute
GOLDEN_DATASET_PATH = (
    parent / "tests" / "data" / "rag_golden_set.json"
)
__all__ module-attribute
__all__ = [
    "GOLDEN_DATASET_PATH",
    "GoldenCase",
    "load_golden_dataset",
]
GoldenCase

Bases: BaseModel

A single ground-truth tuple used for offline evaluation.

context class-attribute instance-attribute
context: list[str] = Field(
    min_length=1,
    description="Retrieval chunks that should ground the response.",
)
expected_answer class-attribute instance-attribute
expected_answer: str = Field(
    min_length=1,
    description="The synthesised answer the agent should produce.",
)
expected_keywords class-attribute instance-attribute
expected_keywords: list[str] = Field(
    default_factory=list,
    description="Optional canonical phrases the answer is expected to contain.",
)
id class-attribute instance-attribute
id: str = Field(
    min_length=1,
    description="Stable identifier for the case.",
)
model_config class-attribute instance-attribute
model_config = ConfigDict(
    extra="forbid", str_strip_whitespace=True
)
query class-attribute instance-attribute
query: str = Field(
    min_length=1,
    description="User-style question to answer.",
)
load_golden_dataset
load_golden_dataset(
    path: Path | str | None = None,
) -> list[GoldenCase]

Load and validate the JSON-encoded golden dataset.

Parameters:

  • path (Path | str | None, default None): Optional override; defaults to tests/data/rag_golden_set.json relative to the repository root.

Returns:

  list[GoldenCase]: A validated list of GoldenCase instances.

Source code in mirai_shared_skills/agentic_rag/eval/dataset.py
def load_golden_dataset(path: Path | str | None = None) -> list[GoldenCase]:
    """Load and validate the JSON-encoded golden dataset.

    Args:
        path: Optional override; defaults to `tests/data/rag_golden_set.json`
            relative to the repository root.

    Returns:
        A validated list of `GoldenCase` instances.
    """
    target = Path(path) if path is not None else GOLDEN_DATASET_PATH
    with target.open("r", encoding="utf-8") as fh:
        payload = json.load(fh)
    return [GoldenCase.model_validate(entry) for entry in payload]

judge

Cost-efficient judge models for the AgenticRAG evaluation suite.

The evaluation pipeline scores retrievals + answers using small, fast models (Gemini 1.5 Flash, GPT-4o-mini) so high-frequency CI runs stay economically viable while remaining well-aligned with human judgement on RAG metrics.

Three concrete JudgeLLM implementations are shipped:

  • MockJudge: scripted responses for hermetic unit tests.
  • GeminiFlashJudge: Google gemini-1.5-flash via the Generative Language API.
  • GPT4oMiniJudge: OpenAI gpt-4o-mini via the chat completions API.

A DeepEvalJudgeAdapter bridges any JudgeLLM to DeepEval's DeepEvalBaseLLM interface so the same judge powers both our lightweight deterministic eval runner and DeepEval's full metric suite.
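
A hedged sketch of handing a judge to DeepEval through the adapter; reading the key from a GEMINI_API_KEY environment variable is an assumption about your configuration, and no DeepEval-specific calls are shown:

import os

from mirai_shared_skills.agentic_rag.eval import DeepEvalJudgeAdapter, GeminiFlashJudge

# GEMINI_API_KEY is an assumed variable name; use your own secret store.
judge = GeminiFlashJudge(api_key=os.environ["GEMINI_API_KEY"])
deepeval_model = DeepEvalJudgeAdapter(judge)
# Pass `deepeval_model` wherever DeepEval expects a DeepEvalBaseLLM instance.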

DEFAULT_GEMINI_MODEL module-attribute
DEFAULT_GEMINI_MODEL: str = 'gemini-1.5-flash'
DEFAULT_GPT4O_MINI_MODEL module-attribute
DEFAULT_GPT4O_MINI_MODEL: str = 'gpt-4o-mini'
DEFAULT_JUDGE_TIMEOUT_SECONDS module-attribute
DEFAULT_JUDGE_TIMEOUT_SECONDS: float = 30.0
JudgeFactory module-attribute
JudgeFactory = Callable[[], Awaitable[JudgeLLM]]
__all__ module-attribute
__all__ = [
    "DEFAULT_GEMINI_MODEL",
    "DEFAULT_GPT4O_MINI_MODEL",
    "DEFAULT_JUDGE_TIMEOUT_SECONDS",
    "DeepEvalJudgeAdapter",
    "GPT4oMiniJudge",
    "GeminiFlashJudge",
    "JudgeFactory",
    "JudgeLLM",
    "MockJudge",
]
DeepEvalJudgeAdapter
DeepEvalJudgeAdapter(judge: JudgeLLM)

Adapter that exposes a JudgeLLM as DeepEval's DeepEvalBaseLLM.

DeepEval is imported lazily: the adapter is only usable as a DeepEval model when the optional [eval] extra (deepeval) is installed, but the class itself remains constructible without DeepEval so type-checking stays clean.

Source code in mirai_shared_skills/agentic_rag/eval/judge.py
def __init__(self, judge: JudgeLLM) -> None:
    self._judge = judge
__getattr__
__getattr__(item: str) -> Any
Source code in mirai_shared_skills/agentic_rag/eval/judge.py
def __getattr__(self, item: str) -> Any:
    # DeepEval introspects auxiliary methods (batch_generate, etc.); fall
    # back to no-ops where reasonable so the adapter stays drop-in.
    if item == "should_use_native_model":
        return lambda: False
    raise AttributeError(item)
a_generate async
a_generate(prompt: str, *args: Any, **kwargs: Any) -> str
Source code in mirai_shared_skills/agentic_rag/eval/judge.py
async def a_generate(self, prompt: str, *args: Any, **kwargs: Any) -> str:
    return await self._judge.a_generate(prompt)
generate
generate(prompt: str, *args: Any, **kwargs: Any) -> str
Source code in mirai_shared_skills/agentic_rag/eval/judge.py
def generate(self, prompt: str, *args: Any, **kwargs: Any) -> str:
    return self._judge.generate(prompt)
get_model_name
get_model_name() -> str
Source code in mirai_shared_skills/agentic_rag/eval/judge.py
def get_model_name(self) -> str:
    return self._judge.name
load_model
load_model() -> Any
Source code in mirai_shared_skills/agentic_rag/eval/judge.py
def load_model(self) -> Any:  # pragma: no cover — DeepEval lifecycle hook
    return self._judge
GPT4oMiniJudge
GPT4oMiniJudge(
    api_key: str,
    *,
    model: str = DEFAULT_GPT4O_MINI_MODEL,
    endpoint: str = DEFAULT_ENDPOINT,
    client: AsyncClient | None = None,
    timeout_seconds: float = DEFAULT_JUDGE_TIMEOUT_SECONDS,
)

Bases: _HttpJudge

OpenAI gpt-4o-mini judge over the chat completions API.

Source code in mirai_shared_skills/agentic_rag/eval/judge.py
def __init__(
    self,
    api_key: str,
    *,
    model: str = DEFAULT_GPT4O_MINI_MODEL,
    endpoint: str = DEFAULT_ENDPOINT,
    client: httpx.AsyncClient | None = None,
    timeout_seconds: float = DEFAULT_JUDGE_TIMEOUT_SECONDS,
) -> None:
    super().__init__(
        client=client,
        timeout_seconds=timeout_seconds,
        identifier=f"openai::{model}",
    )
    self._api_key = api_key
    self._model = model
    self._endpoint = endpoint.rstrip("/")
DEFAULT_ENDPOINT class-attribute instance-attribute
DEFAULT_ENDPOINT = 'https://api.openai.com'
a_generate async
a_generate(prompt: str) -> str
Source code in mirai_shared_skills/agentic_rag/eval/judge.py
async def a_generate(self, prompt: str) -> str:
    client = await self._ensure_client(
        self._endpoint,
        {
            "Authorization": f"Bearer {self._api_key}",
            "Content-Type": "application/json",
        },
    )
    body = {
        "model": self._model,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.0,
        "response_format": {"type": "json_object"},
    }
    response = await client.post("/v1/chat/completions", json=body)
    response.raise_for_status()
    payload = response.json()
    choices = payload.get("choices") or []
    if not choices:
        return ""
    message = choices[0].get("message", {})
    return str(message.get("content", ""))
GeminiFlashJudge
GeminiFlashJudge(
    api_key: str,
    *,
    model: str = DEFAULT_GEMINI_MODEL,
    endpoint: str = DEFAULT_ENDPOINT,
    client: AsyncClient | None = None,
    timeout_seconds: float = DEFAULT_JUDGE_TIMEOUT_SECONDS,
)

Bases: _HttpJudge

Gemini 1.5 Flash judge over the Generative Language REST API.

Source code in mirai_shared_skills/agentic_rag/eval/judge.py
def __init__(
    self,
    api_key: str,
    *,
    model: str = DEFAULT_GEMINI_MODEL,
    endpoint: str = DEFAULT_ENDPOINT,
    client: httpx.AsyncClient | None = None,
    timeout_seconds: float = DEFAULT_JUDGE_TIMEOUT_SECONDS,
) -> None:
    super().__init__(
        client=client,
        timeout_seconds=timeout_seconds,
        identifier=f"gemini::{model}",
    )
    self._api_key = api_key
    self._model = model
    self._endpoint = endpoint.rstrip("/")
DEFAULT_ENDPOINT class-attribute instance-attribute
DEFAULT_ENDPOINT = (
    "https://generativelanguage.googleapis.com"
)
a_generate async
a_generate(prompt: str) -> str
Source code in mirai_shared_skills/agentic_rag/eval/judge.py
async def a_generate(self, prompt: str) -> str:
    client = await self._ensure_client(self._endpoint, {"Content-Type": "application/json"})
    path = f"/v1beta/models/{self._model}:generateContent"
    params = {"key": self._api_key}
    body = {
        "contents": [{"role": "user", "parts": [{"text": prompt}]}],
        "generationConfig": {
            "temperature": 0.0,
            "responseMimeType": "application/json",
        },
    }
    response = await client.post(path, params=params, json=body)
    response.raise_for_status()
    payload = response.json()
    candidates = payload.get("candidates") or []
    if not candidates:
        return ""
    parts = candidates[0].get("content", {}).get("parts") or []
    return "".join(part.get("text", "") for part in parts)
JudgeLLM

Bases: ABC

Abstract judge with a single async generate entry point.

name abstractmethod property
name: str

Human-readable identifier used by DeepEval for telemetry.

a_generate abstractmethod async
a_generate(prompt: str) -> str

Return the judge's textual response for prompt.

Source code in mirai_shared_skills/agentic_rag/eval/judge.py
@abstractmethod
async def a_generate(self, prompt: str) -> str:
    """Return the judge's textual response for `prompt`."""
generate
generate(prompt: str) -> str

Sync wrapper around a_generate for DeepEval interoperability.

Source code in mirai_shared_skills/agentic_rag/eval/judge.py
def generate(self, prompt: str) -> str:
    """Sync wrapper around `a_generate` for DeepEval interoperability."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop running in this thread; safe to drive the coroutine here.
        return asyncio.run(self.a_generate(prompt))
    # When called from within a running loop (e.g. DeepEval's async metric
    # path) the caller should use `a_generate` instead.
    raise RuntimeError("Use a_generate() inside a running event loop.")
MockJudge
MockJudge(
    responder: Callable[[str], str],
    *,
    identifier: str = "mock-judge",
)

Bases: JudgeLLM

Scripted judge — returns the result of responder(prompt) verbatim.

Used by hermetic unit tests to verify the eval pipeline wiring without issuing real LLM calls.

Source code in mirai_shared_skills/agentic_rag/eval/judge.py
def __init__(
    self,
    responder: Callable[[str], str],
    *,
    identifier: str = "mock-judge",
) -> None:
    self._responder = responder
    self._identifier = identifier
    self.calls: list[str] = []
calls instance-attribute
calls: list[str] = []
name property
name: str
a_generate async
a_generate(prompt: str) -> str
Source code in mirai_shared_skills/agentic_rag/eval/judge.py
async def a_generate(self, prompt: str) -> str:
    self.calls.append(prompt)
    return self._responder(prompt)
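
A hermetic usage sketch: the responder always returns the strict JSON envelope the metrics expect, and calls keeps every prompt for later assertions.

import asyncio
import json

from mirai_shared_skills.agentic_rag.eval.judge import MockJudge

# Scripted judge: always answer with the {"score": ..., "reason": ...} envelope.
judge = MockJudge(lambda prompt: json.dumps({"score": 0.9, "reason": "grounded"}))

reply = asyncio.run(judge.a_generate("Is every claim supported by the context?"))
print(reply)        # {"score": 0.9, "reason": "grounded"}
print(judge.calls)  # prompts the judge has seen, in order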

metrics

LLM-as-a-judge metrics for the AgenticRAG evaluation suite.

Each metric is graded by a small, cost-efficient JudgeLLM (Gemini Flash or GPT-4o-mini by default) so high-frequency CI runs stay economically viable. The judge is instructed to return a strict JSON envelope {"score": float, "reason": str} which we parse defensively.

Three metrics ship out of the box:

  • Faithfulness — does every claim in the answer appear in the context?
  • Answer Relevancy — does the answer address the user's question?
  • Contextual Precision — does the retrieved context cover the answer?

When DeepEval is installed, the same JudgeLLM instance is reusable through DeepEvalJudgeAdapter for the full DeepEval metric suite; see mirai_shared_skills.agentic_rag.eval.judge.DeepEvalJudgeAdapter.

__all__ module-attribute
__all__ = [
    "EvaluationReport",
    "MetricName",
    "MetricScore",
    "evaluate_dataset",
]
EvaluationReport dataclass
EvaluationReport(
    cases: int,
    averages: dict[MetricName, float],
    per_case: list[MetricScore] = list(),
)

Aggregated scores for a full evaluation run.

averages instance-attribute
averages: dict[MetricName, float]
cases instance-attribute
cases: int
per_case class-attribute instance-attribute
per_case: list[MetricScore] = field(default_factory=list)
above
above(threshold: float) -> dict[MetricName, bool]

Return whether each metric average is at or above threshold.

Source code in mirai_shared_skills/agentic_rag/eval/metrics.py
def above(self, threshold: float) -> dict[MetricName, bool]:
    """Return whether each metric average is at or above `threshold`."""
    return {metric: avg >= threshold for metric, avg in self.averages.items()}
MetricName

Bases: StrEnum

ANSWER_RELEVANCY class-attribute instance-attribute
ANSWER_RELEVANCY = 'answer_relevancy'
CONTEXTUAL_PRECISION class-attribute instance-attribute
CONTEXTUAL_PRECISION = 'contextual_precision'
FAITHFULNESS class-attribute instance-attribute
FAITHFULNESS = 'faithfulness'
MetricScore dataclass
MetricScore(
    case_id: str,
    metric: MetricName,
    score: float,
    reason: str,
)

A single metric outcome for a single golden case.

case_id instance-attribute
case_id: str
metric instance-attribute
metric: MetricName
reason instance-attribute
reason: str
score instance-attribute
score: float
evaluate_dataset async
evaluate_dataset(
    cases: list[GoldenCase],
    candidate_answers: dict[str, str],
    judge: JudgeLLM,
) -> EvaluationReport

Score every case across the three metrics using judge.

Parameters:

  • cases (list[GoldenCase], required): Golden ground-truth cases.
  • candidate_answers (dict[str, str], required): Mapping of case.id -> answer produced by the agent. Cases without a candidate answer score as 0.0.
  • judge (JudgeLLM, required): The JudgeLLM to drive scoring.

Source code in mirai_shared_skills/agentic_rag/eval/metrics.py
async def evaluate_dataset(
    cases: list[GoldenCase],
    candidate_answers: dict[str, str],
    judge: JudgeLLM,
) -> EvaluationReport:
    """Score every case across the three metrics using `judge`.

    Args:
        cases: Golden ground-truth cases.
        candidate_answers: Mapping of `case.id -> answer` produced by the
            agent. Cases without a candidate answer score as 0.0.
        judge: The `JudgeLLM` to drive scoring.
    """
    per_case: list[MetricScore] = []
    bucket: dict[MetricName, list[float]] = {m: [] for m in MetricName}
    for case in cases:
        answer = candidate_answers.get(case.id, "")
        case_scores = await _score_case(case, answer, judge)
        per_case.extend(case_scores)
        for ms in case_scores:
            bucket[ms.metric].append(ms.score)
    averages = {
        metric: (sum(values) / len(values)) if values else 0.0 for metric, values in bucket.items()
    }
    return EvaluationReport(cases=len(cases), averages=averages, per_case=per_case)
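
A minimal end-to-end sketch. build_golden_cases is a hypothetical helper standing in for however your project assembles its list[GoldenCase]; the only field this sketch relies on is case.id, exactly as the source above does.

import asyncio
import json

from mirai_shared_skills.agentic_rag.eval.judge import MockJudge
from mirai_shared_skills.agentic_rag.eval.metrics import evaluate_dataset

judge = MockJudge(lambda prompt: json.dumps({"score": 1.0, "reason": "ok"}))

cases = build_golden_cases()  # hypothetical helper returning list[GoldenCase]
answers = {case.id: "candidate answer text" for case in cases}

report = asyncio.run(evaluate_dataset(cases, answers, judge))
print(report.cases, report.averages)
print(report.above(0.8))  # e.g. {MetricName.FAITHFULNESS: True, ...}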

models

Strict Pydantic v2 schemas shared by every retrieval provider.

SourceName module-attribute

SourceName = Literal['neo4j', 'azure-search', 'web']

__all__ module-attribute

__all__ = [
    "RAGContextChunk",
    "RetrievalQuery",
    "SourceMetadata",
    "SourceName",
]

RAGContextChunk

Bases: BaseModel

A single unit of context returned by any retrieval provider.

metadata class-attribute instance-attribute
metadata: SourceMetadata = Field(
    description="Provenance metadata for citation."
)
model_config class-attribute instance-attribute
model_config = ConfigDict(extra='forbid')
text class-attribute instance-attribute
text: str = Field(
    min_length=1,
    description="Plain-text content for the LLM to read.",
)

RetrievalQuery

Bases: BaseModel

Caller-supplied query envelope sent to the retrieval providers.

filters class-attribute instance-attribute
filters: dict[str, Any] | None = Field(
    default=None,
    description="Provider-specific filter map (e.g. OData filter for Azure).",
)
model_config class-attribute instance-attribute
model_config = ConfigDict(
    extra="forbid", str_strip_whitespace=True
)
query class-attribute instance-attribute
query: str = Field(
    min_length=1,
    description="Natural-language query string.",
)
top_k class-attribute instance-attribute
top_k: int = Field(
    default=5,
    ge=1,
    le=50,
    description="Maximum chunks to return.",
)

SourceMetadata

Bases: BaseModel

Provenance attached to every retrieved chunk so the LLM can cite sources.

extra class-attribute instance-attribute
extra: dict[str, Any] = Field(
    default_factory=dict,
    description="Free-form provider extras (URL, label, embedding, etc.).",
)
identifier class-attribute instance-attribute
identifier: str = Field(
    min_length=1,
    description="Stable ID of the underlying record.",
)
model_config class-attribute instance-attribute
model_config = ConfigDict(extra='forbid')
score class-attribute instance-attribute
score: float | None = Field(
    default=None,
    description="Provider-reported relevance score, when available.",
)
source class-attribute instance-attribute
source: SourceName = Field(
    description="Provider identifier emitting this chunk."
)
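
A small sketch showing how the three schemas fit together; the identifier, score, and extras are placeholder values.

from mirai_shared_skills.agentic_rag.models import (
    RAGContextChunk,
    RetrievalQuery,
    SourceMetadata,
)

query = RetrievalQuery(query="How do we rotate the signing keys?", top_k=3)

chunk = RAGContextChunk(
    text="Signing keys rotate every 90 days via the ops runbook.",
    metadata=SourceMetadata(
        source="azure-search",  # must be one of the SourceName literals
        identifier="runbook-42",
        score=0.87,
        extra={"url": "https://example.internal/runbook-42"},
    ),
)

# extra="forbid" on both models means unknown fields raise a ValidationError
# instead of being silently accepted.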

providers

Retrieval providers consumed by the AgenticRAGSkill.

__all__ module-attribute

__all__ = [
    "AzureSearchProvider",
    "BrowserWebSearchProvider",
    "CohereRerankProvider",
    "Neo4jGraphProvider",
    "NoOpRerankerProvider",
    "Qwen3RerankerProvider",
    "RerankerConfig",
    "RerankerProvider",
    "WebSearchProvider",
]

AzureSearchProvider

AzureSearchProvider(
    config: AzureSearchConfig,
    *,
    client: AsyncClient | None = None,
    embedder: EmbeddingFn | None = None,
    timeout_seconds: float = DEFAULT_TIMEOUT_SECONDS,
)

Async hybrid-search wrapper over the Azure AI Search REST API.

Source code in mirai_shared_skills/agentic_rag/providers/azure.py
def __init__(
    self,
    config: AzureSearchConfig,
    *,
    client: httpx.AsyncClient | None = None,
    embedder: EmbeddingFn | None = None,
    timeout_seconds: float = DEFAULT_TIMEOUT_SECONDS,
) -> None:
    self._config = config
    self._client = client
    self._owns_client = client is None
    self._embedder = embedder
    self._timeout_seconds = timeout_seconds
aclose async
aclose() -> None
Source code in mirai_shared_skills/agentic_rag/providers/azure.py
async def aclose(self) -> None:
    if self._owns_client and self._client is not None:
        await self._client.aclose()
        self._client = None
search async
search(
    query: str,
    *,
    top_k: int = 5,
    filters: dict[str, Any] | None = None,
) -> list[RAGContextChunk]

Issue a hybrid search and project the response into context chunks.

Source code in mirai_shared_skills/agentic_rag/providers/azure.py
async def search(
    self,
    query: str,
    *,
    top_k: int = 5,
    filters: dict[str, Any] | None = None,
) -> list[RAGContextChunk]:
    """Issue a hybrid search and project the response into context chunks."""
    client = await self._ensure_client()
    body: dict[str, Any] = {
        "search": query,
        "top": top_k,
        "queryType": "semantic" if self._config.semantic_configuration else "simple",
    }
    if self._config.semantic_configuration:
        body["semanticConfiguration"] = self._config.semantic_configuration
    if filters and isinstance(filters.get("odata"), str):
        body["filter"] = filters["odata"]
    vector = await self._embed(query)
    if vector is not None:
        body["vectorQueries"] = [
            {
                "kind": "vector",
                "vector": list(vector),
                "k": top_k,
                "fields": ",".join(self._config.vector_fields),
            }
        ]
    path = f"/indexes/{self._config.index_name}/docs/search"
    params = {"api-version": self._config.api_version}
    response = await client.post(path, params=params, json=body)
    response.raise_for_status()
    payload = response.json()
    chunks: list[RAGContextChunk] = []
    for entry in payload.get("value", [])[:top_k]:
        identifier = str(entry.get("id") or entry.get("@search.documentId") or "unknown")
        text = str(entry.get("content") or entry.get("text") or "")
        if not text:
            continue
        score = entry.get("@search.rerankerScore") or entry.get("@search.score")
        chunks.append(
            RAGContextChunk(
                text=text,
                metadata=SourceMetadata(
                    source="azure-search",
                    identifier=identifier,
                    score=float(score) if score is not None else None,
                    extra={
                        k: v
                        for k, v in entry.items()
                        if k not in {"content", "text"} and not k.startswith("@search.")
                    },
                ),
            )
        )
    return chunks
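
A wiring sketch with a stubbed embedder so nothing calls a real embedding model; the endpoint, index name, key, and semantic configuration name are placeholders.

import asyncio

from mirai_shared_skills.agentic_rag.providers.azure import (
    AzureSearchConfig,
    AzureSearchProvider,
)


async def fake_embedder(text: str) -> list[float]:
    # Stand-in for a real embedding call; the dimensionality is illustrative.
    return [0.0] * 1536


config = AzureSearchConfig(
    endpoint="https://my-search.search.windows.net",  # placeholder
    index_name="enterprise-docs",                     # placeholder
    api_key="<azure-search-api-key>",                 # placeholder
    semantic_configuration="default",
)


async def main() -> None:
    provider = AzureSearchProvider(config, embedder=fake_embedder)
    try:
        chunks = await provider.search("onboarding checklist", top_k=3)
        for chunk in chunks:
            print(chunk.metadata.identifier, chunk.metadata.score)
    finally:
        await provider.aclose()


asyncio.run(main())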

BrowserWebSearchProvider

BrowserWebSearchProvider(
    *,
    browser: AgentBrowserSkill | None = None,
    url_builder: UrlBuilder = default_duckduckgo_url,
)

Bases: WebSearchProvider

Default WebSearchProvider that delegates to AgentBrowserSkill.

The url_builder callable controls which search engine is queried, so the same provider works with DuckDuckGo, Bing, Brave, or any private enterprise search portal.

Source code in mirai_shared_skills/agentic_rag/providers/web.py
def __init__(
    self,
    *,
    browser: AgentBrowserSkill | None = None,
    url_builder: UrlBuilder = default_duckduckgo_url,
) -> None:
    self._browser = browser or AgentBrowserSkill()
    self._url_builder = url_builder
    self._cached_browse: Callable[..., Awaitable[str]] | None = None
search async
search(
    query: str, *, top_k: int = 5
) -> list[RAGContextChunk]
Source code in mirai_shared_skills/agentic_rag/providers/web.py
async def search(self, query: str, *, top_k: int = 5) -> list[RAGContextChunk]:
    url = self._url_builder(query)
    browse = self._browse_callable()
    raw = await browse(url=url)
    payload = json.loads(raw)
    if "error" in payload:
        return []
    text: str = payload.get("text", "")
    if not text:
        return []
    # Split the body into paragraph-ish chunks and keep the top_k strongest.
    candidates = [block.strip() for block in text.split("\n\n") if block.strip()]
    if not candidates:
        candidates = [text]
    chunks: list[RAGContextChunk] = []
    for index, snippet in enumerate(candidates[:top_k]):
        chunks.append(
            RAGContextChunk(
                text=snippet,
                metadata=SourceMetadata(
                    source="web",
                    identifier=f"{payload.get('final_url') or url}#chunk-{index}",
                    score=None,
                    extra={
                        "search_url": url,
                        "status_code": payload.get("status_code"),
                        "final_url": payload.get("final_url"),
                    },
                ),
            )
        )
    return chunks
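
A sketch of swapping the search engine by supplying a different UrlBuilder; the Bing URL shape is an assumption used only for illustration, since any callable from query string to URL satisfies the protocol.

from urllib.parse import quote_plus

from mirai_shared_skills.agentic_rag.providers.web import BrowserWebSearchProvider


def bing_url(query: str) -> str:
    # Assumed URL shape; substitute your own engine or intranet portal.
    return f"https://www.bing.com/search?q={quote_plus(query)}"


provider = BrowserWebSearchProvider(url_builder=bing_url)
# chunks = await provider.search("latest TLS deprecation timeline", top_k=3)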

CohereRerankProvider

CohereRerankProvider(
    api_key: str,
    *,
    model: str = DEFAULT_MODEL,
    endpoint: str = DEFAULT_ENDPOINT,
    config: RerankerConfig | None = None,
    client: AsyncClient | None = None,
    timeout_seconds: float = DEFAULT_RERANK_TIMEOUT_SECONDS,
)

Bases: RerankerProvider

Cohere Rerank v3.5 over the public REST API.

Requires a CO_API_KEY. The provider is async-first and reuses a single httpx.AsyncClient for connection pooling. Pass an existing client to share pools with surrounding application code.

Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
def __init__(
    self,
    api_key: str,
    *,
    model: str = DEFAULT_MODEL,
    endpoint: str = DEFAULT_ENDPOINT,
    config: RerankerConfig | None = None,
    client: httpx.AsyncClient | None = None,
    timeout_seconds: float = DEFAULT_RERANK_TIMEOUT_SECONDS,
) -> None:
    self._api_key = api_key
    self._model = model
    self._endpoint = endpoint.rstrip("/")
    self._config = config or RerankerConfig()
    self._client = client
    self._owns_client = client is None
    self._timeout_seconds = timeout_seconds
DEFAULT_ENDPOINT class-attribute instance-attribute
DEFAULT_ENDPOINT = 'https://api.cohere.com'
DEFAULT_MODEL class-attribute instance-attribute
DEFAULT_MODEL = 'rerank-v3.5'
aclose async
aclose() -> None
Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
async def aclose(self) -> None:
    if self._owns_client and self._client is not None:
        await self._client.aclose()
        self._client = None
rerank async
rerank(
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]
Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
async def rerank(
    self,
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]:
    if not chunks:
        return []
    limit = top_k if top_k is not None else self._config.top_k
    client = await self._ensure_client()
    response = await client.post(
        "/v2/rerank",
        json={
            "model": self._model,
            "query": query,
            "documents": [chunk.text for chunk in chunks],
            "top_n": limit,
            "max_tokens_per_doc": self._config.max_context_tokens,
        },
    )
    response.raise_for_status()
    payload = response.json()
    scored: list[RAGContextChunk] = []
    for entry in payload.get("results", []):
        index = int(entry.get("index", -1))
        if index < 0 or index >= len(chunks):
            continue
        score = float(entry.get("relevance_score", 0.0))
        scored.append(_stamp_score(chunks[index], score))
    return _apply_score_filters(
        scored,
        threshold=self._config.score_threshold,
        top_k=limit,
    )
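
A usage sketch; the CO_API_KEY environment variable is shown only as a conventional source for the key, and the candidate chunks are toy values.

import asyncio
import os

from mirai_shared_skills.agentic_rag.models import RAGContextChunk, SourceMetadata
from mirai_shared_skills.agentic_rag.providers.reranker import (
    CohereRerankProvider,
    RerankerConfig,
)

candidates = [
    RAGContextChunk(
        text="Signing keys rotate every 90 days.",
        metadata=SourceMetadata(source="azure-search", identifier="doc-1"),
    ),
    RAGContextChunk(
        text="The cafeteria menu changes weekly.",
        metadata=SourceMetadata(source="azure-search", identifier="doc-2"),
    ),
]


async def main() -> None:
    provider = CohereRerankProvider(
        api_key=os.environ["CO_API_KEY"],
        config=RerankerConfig(top_k=2, score_threshold=0.2),
    )
    try:
        reranked = await provider.rerank("rotate signing keys", candidates)
        for chunk in reranked:
            print(chunk.metadata.score, chunk.text)
    finally:
        await provider.aclose()


asyncio.run(main())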

Neo4jGraphProvider

Neo4jGraphProvider(
    connection: Neo4jConnection | None = None,
    *,
    driver: _AsyncDriver | None = None,
)

Async Neo4j wrapper exposing schema introspection plus parameterised queries.

Test seams: callers can inject driver directly so unit tests bypass the real driver factory. Otherwise the driver is created lazily on first use and reused (singleton) for the lifetime of the provider instance.

Source code in mirai_shared_skills/agentic_rag/providers/neo4j_graph.py
def __init__(
    self,
    connection: Neo4jConnection | None = None,
    *,
    driver: _AsyncDriver | None = None,
) -> None:
    if connection is None and driver is None:
        raise ValueError("Neo4jGraphProvider requires either `connection` or `driver`.")
    self._connection = connection
    self._driver: _AsyncDriver | None = driver
aclose async
aclose() -> None
Source code in mirai_shared_skills/agentic_rag/providers/neo4j_graph.py
async def aclose(self) -> None:
    if self._driver is not None:
        await self._driver.close()
        self._driver = None
describe_schema async
describe_schema() -> dict[str, list[str]]

Return labels and relationship types known to the graph.

Source code in mirai_shared_skills/agentic_rag/providers/neo4j_graph.py
async def describe_schema(self) -> dict[str, list[str]]:
    """Return labels and relationship types known to the graph."""
    async with self._session() as session:
        label_rows = await session.run("CALL db.labels()")
        labels = [row.data().get("label", "") async for row in label_rows]
        rel_rows = await session.run("CALL db.relationshipTypes()")
        relationships = [row.data().get("relationshipType", "") async for row in rel_rows]
    return {
        "labels": [label for label in labels if label],
        "relationships": [rel for rel in relationships if rel],
    }
execute async
execute(
    cypher: str,
    parameters: Mapping[str, Any] | None = None,
    *,
    top_k: int = 5,
) -> list[RAGContextChunk]

Run a Cypher statement and project the rows into RAGContextChunks.

Source code in mirai_shared_skills/agentic_rag/providers/neo4j_graph.py
async def execute(
    self,
    cypher: str,
    parameters: Mapping[str, Any] | None = None,
    *,
    top_k: int = 5,
) -> list[RAGContextChunk]:
    """Run a Cypher statement and project the rows into `RAGContextChunk`s."""
    chunks: list[RAGContextChunk] = []
    async with self._session() as session:
        result = await session.run(cypher, parameters or {})
        count = 0
        async for record in result:
            if count >= top_k:
                break
            row = record.data()
            identifier = str(
                row.get("id") or row.get("identifier") or f"row-{count}",
            )
            text = str(row.get("text") or row)
            chunks.append(
                RAGContextChunk(
                    text=text,
                    metadata=SourceMetadata(
                        source="neo4j",
                        identifier=identifier,
                        score=None,
                        extra={"row": row},
                    ),
                )
            )
            count += 1
    return chunks
verify_plugins async
verify_plugins() -> dict[str, Any]

Probe the graph for APOC and GDS availability.

Returns a status dict shaped like:

{"apoc": True, "gds": False, "detail": {"apoc": "...", "gds": "..."}}

The probe runs lightweight introspection calls (apoc.help('apoc') and gds.list()) and never raises — when a procedure is missing the corresponding flag is set to False and the failure reason is captured in detail.

Source code in mirai_shared_skills/agentic_rag/providers/neo4j_graph.py
async def verify_plugins(self) -> dict[str, Any]:
    """Probe the graph for APOC and GDS availability.

    Returns a status dict shaped like::

        {"apoc": True, "gds": False, "detail": {"apoc": "...", "gds": "..."}}

    The probe runs lightweight introspection calls (`apoc.help('apoc')`
    and `gds.list()`) and never raises — when a procedure is missing the
    corresponding flag is set to `False` and the failure reason is
    captured in `detail`.
    """
    status: dict[str, Any] = {"apoc": False, "gds": False, "detail": {}}
    async with self._session() as session:
        for plugin, probe in (
            ("apoc", "CALL apoc.help('apoc') YIELD name RETURN count(name) AS n"),
            ("gds", "CALL gds.list() YIELD name RETURN count(name) AS n"),
        ):
            try:
                result = await session.run(probe)
                count = 0
                async for record in result:
                    count = int(record.data().get("n", 0) or 0)
                    break
                status[plugin] = count > 0
                if count == 0:
                    status["detail"][plugin] = "no procedures registered"
            except Exception as exc:  # noqa: BLE001 — translate driver errors
                status[plugin] = False
                status["detail"][plugin] = str(exc)
    return status
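
A wiring sketch against a local Neo4j instance; the URI, credentials, and Cypher are placeholders.

import asyncio

from mirai_shared_skills.agentic_rag.providers.neo4j_graph import (
    Neo4jConnection,
    Neo4jGraphProvider,
)


async def main() -> None:
    provider = Neo4jGraphProvider(
        Neo4jConnection(uri="bolt://localhost:7687", user="neo4j", password="<password>")
    )
    try:
        print(await provider.describe_schema())  # {"labels": [...], "relationships": [...]}
        print(await provider.verify_plugins())   # {"apoc": ..., "gds": ..., "detail": {...}}
        chunks = await provider.execute(
            "MATCH (d:Document) RETURN d.id AS id, d.text AS text LIMIT $limit",
            {"limit": 5},
            top_k=5,
        )
        for chunk in chunks:
            print(chunk.metadata.identifier)
    finally:
        await provider.aclose()


asyncio.run(main())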

NoOpRerankerProvider

Bases: RerankerProvider

Passthrough reranker — preserves original ordering and trims to top_k.

rerank async
rerank(
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]
Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
async def rerank(
    self,
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]:
    limit = top_k if top_k is not None else len(chunks)
    return list(chunks[:limit])

Qwen3RerankerProvider

Qwen3RerankerProvider(
    endpoint: str,
    *,
    api_key: str | None = None,
    model: str = DEFAULT_MODEL,
    config: RerankerConfig | None = None,
    client: AsyncClient | None = None,
    timeout_seconds: float = DEFAULT_RERANK_TIMEOUT_SECONDS,
)

Bases: RerankerProvider

Qwen3-Reranker-4B served via an OpenAI-compatible inference endpoint.

The provider POSTs {query, documents, top_n} to endpoint and expects a {results: [{index, score}]} envelope. vLLM, TGI, and Together AI all expose this shape for cross-encoder reranker models.

Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
def __init__(
    self,
    endpoint: str,
    *,
    api_key: str | None = None,
    model: str = DEFAULT_MODEL,
    config: RerankerConfig | None = None,
    client: httpx.AsyncClient | None = None,
    timeout_seconds: float = DEFAULT_RERANK_TIMEOUT_SECONDS,
) -> None:
    self._endpoint = endpoint.rstrip("/")
    self._api_key = api_key
    self._model = model
    self._config = config or RerankerConfig()
    self._client = client
    self._owns_client = client is None
    self._timeout_seconds = timeout_seconds
DEFAULT_MODEL class-attribute instance-attribute
DEFAULT_MODEL = 'Qwen/Qwen3-Reranker-4B'
aclose async
aclose() -> None
Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
async def aclose(self) -> None:
    if self._owns_client and self._client is not None:
        await self._client.aclose()
        self._client = None
rerank async
rerank(
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]
Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
async def rerank(
    self,
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]:
    if not chunks:
        return []
    limit = top_k if top_k is not None else self._config.top_k
    client = await self._ensure_client()
    response = await client.post(
        "/v1/rerank",
        json={
            "model": self._model,
            "query": query,
            "documents": [chunk.text for chunk in chunks],
            "top_n": limit,
            "max_tokens": self._config.max_context_tokens,
        },
    )
    response.raise_for_status()
    payload = response.json()
    scored: list[RAGContextChunk] = []
    # Accept both `results` (Cohere-style) and `data` (vLLM-style) envelopes
    # so the same provider talks to multiple inference backends without
    # bespoke client code per host.
    rows: Iterable[dict[str, Any]] = payload.get("results") or payload.get("data") or []
    for entry in rows:
        index = int(entry.get("index", -1))
        if index < 0 or index >= len(chunks):
            continue
        raw_score = entry.get("relevance_score")
        if raw_score is None:
            raw_score = entry.get("score", 0.0)
        score = float(raw_score if raw_score is not None else 0.0)
        scored.append(_stamp_score(chunks[index], score))
    return _apply_score_filters(
        scored,
        threshold=self._config.score_threshold,
        top_k=limit,
    )
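
A sketch for a self-hosted deployment; the endpoint URL is a placeholder for wherever your vLLM or TGI reranker is served.

from mirai_shared_skills.agentic_rag.providers.reranker import (
    Qwen3RerankerProvider,
    RerankerConfig,
)

provider = Qwen3RerankerProvider(
    endpoint="http://reranker.internal:8000",  # placeholder inference host
    api_key=None,                              # or a bearer token if the host requires one
    config=RerankerConfig(top_k=5, score_threshold=0.1),
)
# reranked = await provider.rerank(query, chunks)
# await provider.aclose()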

RerankerConfig dataclass

RerankerConfig(
    top_k: int = 5,
    max_context_tokens: int = QWEN3_LONG_CONTEXT_TOKENS,
    score_threshold: float | None = None,
    extra_headers: dict[str, str] = dict(),
)

Common reranker configuration knobs shared by every backend.

Attributes:

  • top_k (int): Maximum number of chunks the reranker should return.
  • max_context_tokens (int): Per-pair input token budget. Defaults to the 32k window supported by Qwen3-Reranker-4B; Cohere Rerank v3.5 also operates against long-form documents and respects this hint.
  • score_threshold (float | None): Optional minimum score; chunks below it are dropped.
  • extra_headers (dict[str, str]): Extra headers forwarded with every request.

extra_headers class-attribute instance-attribute
extra_headers: dict[str, str] = field(default_factory=dict)
max_context_tokens class-attribute instance-attribute
max_context_tokens: int = QWEN3_LONG_CONTEXT_TOKENS
score_threshold class-attribute instance-attribute
score_threshold: float | None = None
top_k class-attribute instance-attribute
top_k: int = 5

RerankerProvider

Bases: ABC

Abstract base for any reranker implementation.

rerank abstractmethod async
rerank(
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]

Return chunks reordered by relevance to query, truncated to top_k.

Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
@abstractmethod
async def rerank(
    self,
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]:
    """Return chunks reordered by relevance to `query`, truncated to `top_k`."""

WebSearchProvider

Bases: ABC

Abstract base for any web search backend.

search abstractmethod async
search(
    query: str, *, top_k: int = 5
) -> list[RAGContextChunk]

Return chunks containing live web text for the given query.

Source code in mirai_shared_skills/agentic_rag/providers/web.py
@abstractmethod
async def search(self, query: str, *, top_k: int = 5) -> list[RAGContextChunk]:
    """Return chunks containing live web text for the given query."""

azure

Azure AI Search provider — hybrid retrieval over the REST API.

The provider issues hybrid (vector + BM25) queries with the Semantic Ranker enabled when semantic_configuration is provided. Filters are passed through verbatim as OData expressions.

A pluggable embedding callable produces vectors for the user query; tests can inject a stub callable to avoid hitting the real embedding model.

DEFAULT_API_VERSION module-attribute
DEFAULT_API_VERSION = '2024-07-01'
DEFAULT_TIMEOUT_SECONDS module-attribute
DEFAULT_TIMEOUT_SECONDS = 15.0
DEFAULT_VECTOR_FIELDS module-attribute
DEFAULT_VECTOR_FIELDS = ('contentVector',)
EmbeddingFn module-attribute
EmbeddingFn = Callable[[str], Awaitable[Sequence[float]]]
__all__ module-attribute
__all__ = [
    "AzureSearchConfig",
    "AzureSearchProvider",
    "EmbeddingFn",
]
AzureSearchConfig dataclass
AzureSearchConfig(
    endpoint: str,
    index_name: str,
    api_key: str,
    api_version: str = DEFAULT_API_VERSION,
    semantic_configuration: str | None = None,
    vector_fields: tuple[str, ...] = DEFAULT_VECTOR_FIELDS,
)

Connection params for an Azure AI Search index.

api_key instance-attribute
api_key: str
api_version class-attribute instance-attribute
api_version: str = DEFAULT_API_VERSION
endpoint instance-attribute
endpoint: str
index_name instance-attribute
index_name: str
semantic_configuration class-attribute instance-attribute
semantic_configuration: str | None = None
vector_fields class-attribute instance-attribute
vector_fields: tuple[str, ...] = DEFAULT_VECTOR_FIELDS
AzureSearchProvider
AzureSearchProvider(
    config: AzureSearchConfig,
    *,
    client: AsyncClient | None = None,
    embedder: EmbeddingFn | None = None,
    timeout_seconds: float = DEFAULT_TIMEOUT_SECONDS,
)

Async hybrid-search wrapper over the Azure AI Search REST API.

Source code in mirai_shared_skills/agentic_rag/providers/azure.py
def __init__(
    self,
    config: AzureSearchConfig,
    *,
    client: httpx.AsyncClient | None = None,
    embedder: EmbeddingFn | None = None,
    timeout_seconds: float = DEFAULT_TIMEOUT_SECONDS,
) -> None:
    self._config = config
    self._client = client
    self._owns_client = client is None
    self._embedder = embedder
    self._timeout_seconds = timeout_seconds
aclose async
aclose() -> None
Source code in mirai_shared_skills/agentic_rag/providers/azure.py
async def aclose(self) -> None:
    if self._owns_client and self._client is not None:
        await self._client.aclose()
        self._client = None
search async
search(
    query: str,
    *,
    top_k: int = 5,
    filters: dict[str, Any] | None = None,
) -> list[RAGContextChunk]

Issue a hybrid search and project the response into context chunks.

Source code in mirai_shared_skills/agentic_rag/providers/azure.py
async def search(
    self,
    query: str,
    *,
    top_k: int = 5,
    filters: dict[str, Any] | None = None,
) -> list[RAGContextChunk]:
    """Issue a hybrid search and project the response into context chunks."""
    client = await self._ensure_client()
    body: dict[str, Any] = {
        "search": query,
        "top": top_k,
        "queryType": "semantic" if self._config.semantic_configuration else "simple",
    }
    if self._config.semantic_configuration:
        body["semanticConfiguration"] = self._config.semantic_configuration
    if filters and isinstance(filters.get("odata"), str):
        body["filter"] = filters["odata"]
    vector = await self._embed(query)
    if vector is not None:
        body["vectorQueries"] = [
            {
                "kind": "vector",
                "vector": list(vector),
                "k": top_k,
                "fields": ",".join(self._config.vector_fields),
            }
        ]
    path = f"/indexes/{self._config.index_name}/docs/search"
    params = {"api-version": self._config.api_version}
    response = await client.post(path, params=params, json=body)
    response.raise_for_status()
    payload = response.json()
    chunks: list[RAGContextChunk] = []
    for entry in payload.get("value", [])[:top_k]:
        identifier = str(entry.get("id") or entry.get("@search.documentId") or "unknown")
        text = str(entry.get("content") or entry.get("text") or "")
        if not text:
            continue
        score = entry.get("@search.rerankerScore") or entry.get("@search.score")
        chunks.append(
            RAGContextChunk(
                text=text,
                metadata=SourceMetadata(
                    source="azure-search",
                    identifier=identifier,
                    score=float(score) if score is not None else None,
                    extra={
                        k: v
                        for k, v in entry.items()
                        if k not in {"content", "text"} and not k.startswith("@search.")
                    },
                ),
            )
        )
    return chunks

neo4j_graph

Neo4j graph retrieval provider.

neo4j is an optional dependency. The driver is imported lazily so the catalog can be loaded without the Neo4j package installed; attempting to execute a query without the driver raises Neo4jUnavailableError, which the skill surfaces as a structured error chunk instead of failing at import time.

__all__ module-attribute
__all__ = [
    "Neo4jConnection",
    "Neo4jGraphProvider",
    "Neo4jUnavailableError",
]
Neo4jConnection dataclass
Neo4jConnection(
    uri: str,
    user: str,
    password: str,
    database: str | None = None,
)

Connection params for the Neo4j async driver.

database class-attribute instance-attribute
database: str | None = None
password instance-attribute
password: str
uri instance-attribute
uri: str
user instance-attribute
user: str
Neo4jGraphProvider
Neo4jGraphProvider(
    connection: Neo4jConnection | None = None,
    *,
    driver: _AsyncDriver | None = None,
)

Async Neo4j wrapper exposing schema introspection plus parameterised queries.

Test seams: callers can inject driver directly so unit tests bypass the real driver factory. Otherwise the driver is created lazily on first use and reused (singleton) for the lifetime of the provider instance.

Source code in mirai_shared_skills/agentic_rag/providers/neo4j_graph.py
def __init__(
    self,
    connection: Neo4jConnection | None = None,
    *,
    driver: _AsyncDriver | None = None,
) -> None:
    if connection is None and driver is None:
        raise ValueError("Neo4jGraphProvider requires either `connection` or `driver`.")
    self._connection = connection
    self._driver: _AsyncDriver | None = driver
aclose async
aclose() -> None
Source code in mirai_shared_skills/agentic_rag/providers/neo4j_graph.py
async def aclose(self) -> None:
    if self._driver is not None:
        await self._driver.close()
        self._driver = None
describe_schema async
describe_schema() -> dict[str, list[str]]

Return labels and relationship types known to the graph.

Source code in mirai_shared_skills/agentic_rag/providers/neo4j_graph.py
async def describe_schema(self) -> dict[str, list[str]]:
    """Return labels and relationship types known to the graph."""
    async with self._session() as session:
        label_rows = await session.run("CALL db.labels()")
        labels = [row.data().get("label", "") async for row in label_rows]
        rel_rows = await session.run("CALL db.relationshipTypes()")
        relationships = [row.data().get("relationshipType", "") async for row in rel_rows]
    return {
        "labels": [label for label in labels if label],
        "relationships": [rel for rel in relationships if rel],
    }
execute async
execute(
    cypher: str,
    parameters: Mapping[str, Any] | None = None,
    *,
    top_k: int = 5,
) -> list[RAGContextChunk]

Run a Cypher statement and project the rows into RAGContextChunks.

Source code in mirai_shared_skills/agentic_rag/providers/neo4j_graph.py
async def execute(
    self,
    cypher: str,
    parameters: Mapping[str, Any] | None = None,
    *,
    top_k: int = 5,
) -> list[RAGContextChunk]:
    """Run a Cypher statement and project the rows into `RAGContextChunk`s."""
    chunks: list[RAGContextChunk] = []
    async with self._session() as session:
        result = await session.run(cypher, parameters or {})
        count = 0
        async for record in result:
            if count >= top_k:
                break
            row = record.data()
            identifier = str(
                row.get("id") or row.get("identifier") or f"row-{count}",
            )
            text = str(row.get("text") or row)
            chunks.append(
                RAGContextChunk(
                    text=text,
                    metadata=SourceMetadata(
                        source="neo4j",
                        identifier=identifier,
                        score=None,
                        extra={"row": row},
                    ),
                )
            )
            count += 1
    return chunks
verify_plugins async
verify_plugins() -> dict[str, Any]

Probe the graph for APOC and GDS availability.

Returns a status dict shaped like:

{"apoc": True, "gds": False, "detail": {"apoc": "...", "gds": "..."}}

The probe runs lightweight introspection calls (apoc.help('apoc') and gds.list()) and never raises — when a procedure is missing the corresponding flag is set to False and the failure reason is captured in detail.

Source code in mirai_shared_skills/agentic_rag/providers/neo4j_graph.py
async def verify_plugins(self) -> dict[str, Any]:
    """Probe the graph for APOC and GDS availability.

    Returns a status dict shaped like::

        {"apoc": True, "gds": False, "detail": {"apoc": "...", "gds": "..."}}

    The probe runs lightweight introspection calls (`apoc.help('apoc')`
    and `gds.list()`) and never raises — when a procedure is missing the
    corresponding flag is set to `False` and the failure reason is
    captured in `detail`.
    """
    status: dict[str, Any] = {"apoc": False, "gds": False, "detail": {}}
    async with self._session() as session:
        for plugin, probe in (
            ("apoc", "CALL apoc.help('apoc') YIELD name RETURN count(name) AS n"),
            ("gds", "CALL gds.list() YIELD name RETURN count(name) AS n"),
        ):
            try:
                result = await session.run(probe)
                count = 0
                async for record in result:
                    count = int(record.data().get("n", 0) or 0)
                    break
                status[plugin] = count > 0
                if count == 0:
                    status["detail"][plugin] = "no procedures registered"
            except Exception as exc:  # noqa: BLE001 — translate driver errors
                status[plugin] = False
                status["detail"][plugin] = str(exc)
    return status
Neo4jUnavailableError

Bases: RuntimeError

Raised when the optional neo4j dependency is missing.

reranker

Reranker providers — high-precision second-stage scoring for retrieved chunks.

Two production-grade implementations are shipped, each behind a unified RerankerProvider interface:

  • CohereRerankProvider — Cohere Rerank v3.5 over the public REST API.
  • Qwen3RerankerProvider — open-source SOTA Qwen3-Reranker-4B exposed via an OpenAI-compatible inference server (vLLM, TGI, or any compatible host).

Both providers talk over httpx so unit tests stay fully hermetic with respx. A NoOpRerankerProvider is provided as a passthrough default so the skill can opt out without conditional logic at the call site.

DEFAULT_RERANK_TIMEOUT_SECONDS module-attribute
DEFAULT_RERANK_TIMEOUT_SECONDS: float = 30.0
QWEN3_LONG_CONTEXT_TOKENS module-attribute
QWEN3_LONG_CONTEXT_TOKENS: int = 32768
__all__ module-attribute
__all__ = [
    "DEFAULT_RERANK_TIMEOUT_SECONDS",
    "QWEN3_LONG_CONTEXT_TOKENS",
    "CohereRerankProvider",
    "NoOpRerankerProvider",
    "Qwen3RerankerProvider",
    "RerankerConfig",
    "RerankerProvider",
]
CohereRerankProvider
CohereRerankProvider(
    api_key: str,
    *,
    model: str = DEFAULT_MODEL,
    endpoint: str = DEFAULT_ENDPOINT,
    config: RerankerConfig | None = None,
    client: AsyncClient | None = None,
    timeout_seconds: float = DEFAULT_RERANK_TIMEOUT_SECONDS,
)

Bases: RerankerProvider

Cohere Rerank v3.5 over the public REST API.

Requires a CO_API_KEY. The provider is async-first and reuses a single httpx.AsyncClient for connection pooling. Pass an existing client to share pools with surrounding application code.

Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
def __init__(
    self,
    api_key: str,
    *,
    model: str = DEFAULT_MODEL,
    endpoint: str = DEFAULT_ENDPOINT,
    config: RerankerConfig | None = None,
    client: httpx.AsyncClient | None = None,
    timeout_seconds: float = DEFAULT_RERANK_TIMEOUT_SECONDS,
) -> None:
    self._api_key = api_key
    self._model = model
    self._endpoint = endpoint.rstrip("/")
    self._config = config or RerankerConfig()
    self._client = client
    self._owns_client = client is None
    self._timeout_seconds = timeout_seconds
DEFAULT_ENDPOINT class-attribute instance-attribute
DEFAULT_ENDPOINT = 'https://api.cohere.com'
DEFAULT_MODEL class-attribute instance-attribute
DEFAULT_MODEL = 'rerank-v3.5'
aclose async
aclose() -> None
Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
async def aclose(self) -> None:
    if self._owns_client and self._client is not None:
        await self._client.aclose()
        self._client = None
rerank async
rerank(
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]
Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
async def rerank(
    self,
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]:
    if not chunks:
        return []
    limit = top_k if top_k is not None else self._config.top_k
    client = await self._ensure_client()
    response = await client.post(
        "/v2/rerank",
        json={
            "model": self._model,
            "query": query,
            "documents": [chunk.text for chunk in chunks],
            "top_n": limit,
            "max_tokens_per_doc": self._config.max_context_tokens,
        },
    )
    response.raise_for_status()
    payload = response.json()
    scored: list[RAGContextChunk] = []
    for entry in payload.get("results", []):
        index = int(entry.get("index", -1))
        if index < 0 or index >= len(chunks):
            continue
        score = float(entry.get("relevance_score", 0.0))
        scored.append(_stamp_score(chunks[index], score))
    return _apply_score_filters(
        scored,
        threshold=self._config.score_threshold,
        top_k=limit,
    )
NoOpRerankerProvider

Bases: RerankerProvider

Passthrough reranker — preserves original ordering and trims to top_k.

rerank async
rerank(
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]
Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
async def rerank(
    self,
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]:
    limit = top_k if top_k is not None else len(chunks)
    return list(chunks[:limit])
Qwen3RerankerProvider
Qwen3RerankerProvider(
    endpoint: str,
    *,
    api_key: str | None = None,
    model: str = DEFAULT_MODEL,
    config: RerankerConfig | None = None,
    client: AsyncClient | None = None,
    timeout_seconds: float = DEFAULT_RERANK_TIMEOUT_SECONDS,
)

Bases: RerankerProvider

Qwen3-Reranker-4B served via an OpenAI-compatible inference endpoint.

The provider POSTs {query, documents, top_n} to endpoint and expects a {results: [{index, score}]} envelope. vLLM, TGI, and Together AI all expose this shape for cross-encoder reranker models.

Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
def __init__(
    self,
    endpoint: str,
    *,
    api_key: str | None = None,
    model: str = DEFAULT_MODEL,
    config: RerankerConfig | None = None,
    client: httpx.AsyncClient | None = None,
    timeout_seconds: float = DEFAULT_RERANK_TIMEOUT_SECONDS,
) -> None:
    self._endpoint = endpoint.rstrip("/")
    self._api_key = api_key
    self._model = model
    self._config = config or RerankerConfig()
    self._client = client
    self._owns_client = client is None
    self._timeout_seconds = timeout_seconds
DEFAULT_MODEL class-attribute instance-attribute
DEFAULT_MODEL = 'Qwen/Qwen3-Reranker-4B'
aclose async
aclose() -> None
Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
async def aclose(self) -> None:
    if self._owns_client and self._client is not None:
        await self._client.aclose()
        self._client = None
rerank async
rerank(
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]
Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
async def rerank(
    self,
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]:
    if not chunks:
        return []
    limit = top_k if top_k is not None else self._config.top_k
    client = await self._ensure_client()
    response = await client.post(
        "/v1/rerank",
        json={
            "model": self._model,
            "query": query,
            "documents": [chunk.text for chunk in chunks],
            "top_n": limit,
            "max_tokens": self._config.max_context_tokens,
        },
    )
    response.raise_for_status()
    payload = response.json()
    scored: list[RAGContextChunk] = []
    # Accept both `results` (Cohere-style) and `data` (vLLM-style) envelopes
    # so the same provider talks to multiple inference backends without
    # bespoke client code per host.
    rows: Iterable[dict[str, Any]] = payload.get("results") or payload.get("data") or []
    for entry in rows:
        index = int(entry.get("index", -1))
        if index < 0 or index >= len(chunks):
            continue
        raw_score = entry.get("relevance_score")
        if raw_score is None:
            raw_score = entry.get("score", 0.0)
        score = float(raw_score if raw_score is not None else 0.0)
        scored.append(_stamp_score(chunks[index], score))
    return _apply_score_filters(
        scored,
        threshold=self._config.score_threshold,
        top_k=limit,
    )
RerankerConfig dataclass
RerankerConfig(
    top_k: int = 5,
    max_context_tokens: int = QWEN3_LONG_CONTEXT_TOKENS,
    score_threshold: float | None = None,
    extra_headers: dict[str, str] = dict(),
)

Common reranker configuration knobs shared by every backend.

Attributes:

  • top_k (int): Maximum number of chunks the reranker should return.
  • max_context_tokens (int): Per-pair input token budget. Defaults to the 32k window supported by Qwen3-Reranker-4B; Cohere Rerank v3.5 also operates against long-form documents and respects this hint.
  • score_threshold (float | None): Optional minimum score; chunks below it are dropped.
  • extra_headers (dict[str, str]): Extra headers forwarded with every request.

extra_headers class-attribute instance-attribute
extra_headers: dict[str, str] = field(default_factory=dict)
max_context_tokens class-attribute instance-attribute
max_context_tokens: int = QWEN3_LONG_CONTEXT_TOKENS
score_threshold class-attribute instance-attribute
score_threshold: float | None = None
top_k class-attribute instance-attribute
top_k: int = 5
RerankerProvider

Bases: ABC

Abstract base for any reranker implementation.

rerank abstractmethod async
rerank(
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]

Return chunks reordered by relevance to query, truncated to top_k.

Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
@abstractmethod
async def rerank(
    self,
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]:
    """Return chunks reordered by relevance to `query`, truncated to `top_k`."""

web

Web search provider abstraction.

The agentic RAG skill needs to fall back to live web data when internal sources are insufficient. This module exposes:

  • WebSearchProvider: an abstract interface (just search(query, top_k)).
  • BrowserWebSearchProvider: a default implementation that wraps the existing AgentBrowserSkill plus a configurable URL builder so any search engine (Bing, Brave, Tavily, Perplexity, custom intranet) can be plugged in by changing the URL template.
UrlBuilder module-attribute
UrlBuilder = Callable[[str], str]
__all__ module-attribute
__all__ = [
    "BrowserWebSearchProvider",
    "UrlBuilder",
    "WebSearchProvider",
    "default_duckduckgo_url",
]
BrowserWebSearchProvider
BrowserWebSearchProvider(
    *,
    browser: AgentBrowserSkill | None = None,
    url_builder: UrlBuilder = default_duckduckgo_url,
)

Bases: WebSearchProvider

Default WebSearchProvider that delegates to AgentBrowserSkill.

The url_builder callable controls which search engine is queried, so the same provider works with DuckDuckGo, Bing, Brave, or any private enterprise search portal.

Source code in mirai_shared_skills/agentic_rag/providers/web.py
def __init__(
    self,
    *,
    browser: AgentBrowserSkill | None = None,
    url_builder: UrlBuilder = default_duckduckgo_url,
) -> None:
    self._browser = browser or AgentBrowserSkill()
    self._url_builder = url_builder
    self._cached_browse: Callable[..., Awaitable[str]] | None = None
search async
search(
    query: str, *, top_k: int = 5
) -> list[RAGContextChunk]
Source code in mirai_shared_skills/agentic_rag/providers/web.py
async def search(self, query: str, *, top_k: int = 5) -> list[RAGContextChunk]:
    url = self._url_builder(query)
    browse = self._browse_callable()
    raw = await browse(url=url)
    payload = json.loads(raw)
    if "error" in payload:
        return []
    text: str = payload.get("text", "")
    if not text:
        return []
    # Split the body into paragraph-ish chunks and keep the top_k strongest.
    candidates = [block.strip() for block in text.split("\n\n") if block.strip()]
    if not candidates:
        candidates = [text]
    chunks: list[RAGContextChunk] = []
    for index, snippet in enumerate(candidates[:top_k]):
        chunks.append(
            RAGContextChunk(
                text=snippet,
                metadata=SourceMetadata(
                    source="web",
                    identifier=f"{payload.get('final_url') or url}#chunk-{index}",
                    score=None,
                    extra={
                        "search_url": url,
                        "status_code": payload.get("status_code"),
                        "final_url": payload.get("final_url"),
                    },
                ),
            )
        )
    return chunks
WebSearchProvider

Bases: ABC

Abstract base for any web search backend.

search abstractmethod async
search(
    query: str, *, top_k: int = 5
) -> list[RAGContextChunk]

Return chunks containing live web text for the given query.

Source code in mirai_shared_skills/agentic_rag/providers/web.py
@abstractmethod
async def search(self, query: str, *, top_k: int = 5) -> list[RAGContextChunk]:
    """Return chunks containing live web text for the given query."""
default_duckduckgo_url
default_duckduckgo_url(query: str) -> str

Return the DuckDuckGo HTML endpoint for query. Used as the safe default.

Source code in mirai_shared_skills/agentic_rag/providers/web.py
def default_duckduckgo_url(query: str) -> str:
    """Return the DuckDuckGo HTML endpoint for `query`. Used as the safe default."""
    from urllib.parse import quote_plus

    return f"https://duckduckgo.com/html/?q={quote_plus(query)}"

skill

AgenticRAGSkill — multi-source orchestrator for graph, vector, and web retrieval.

APPROX_CHARS_PER_TOKEN module-attribute

APPROX_CHARS_PER_TOKEN = 4

ChunkEnvelope module-attribute

ChunkEnvelope = dict[str, Any]

DEFAULT_CITATION_SAFETY_BUFFER_TOKENS module-attribute

DEFAULT_CITATION_SAFETY_BUFFER_TOKENS = 64

DEFAULT_TOKEN_BUDGET module-attribute

DEFAULT_TOKEN_BUDGET = 8000

__all__ module-attribute

__all__ = [
    "AgenticRAGSkill",
    "DEFAULT_CITATION_SAFETY_BUFFER_TOKENS",
    "DEFAULT_TOKEN_BUDGET",
    "estimate_citation_tokens",
    "truncate_chunks_to_budget",
]

AgenticRAGSkill

AgenticRAGSkill(
    *,
    neo4j: Neo4jGraphProvider | None = None,
    azure: AzureSearchProvider | None = None,
    web: WebSearchProvider | None = None,
    reranker: RerankerProvider | None = None,
    token_budget: int = DEFAULT_TOKEN_BUDGET,
    citation_buffer_tokens: int = DEFAULT_CITATION_SAFETY_BUFFER_TOKENS,
)

Bases: BaseSkill

Multi-source retrieval orchestrator (Graph + Hybrid Vector + Web).

Source code in mirai_shared_skills/agentic_rag/skill.py
def __init__(
    self,
    *,
    neo4j: Neo4jGraphProvider | None = None,
    azure: AzureSearchProvider | None = None,
    web: WebSearchProvider | None = None,
    reranker: RerankerProvider | None = None,
    token_budget: int = DEFAULT_TOKEN_BUDGET,
    citation_buffer_tokens: int = DEFAULT_CITATION_SAFETY_BUFFER_TOKENS,
) -> None:
    self._neo4j = neo4j
    self._azure = azure
    self._web = web or BrowserWebSearchProvider()
    self._reranker: RerankerProvider = reranker or NoOpRerankerProvider()
    self._token_budget = token_budget
    self._citation_buffer = citation_buffer_tokens
description property
description: str
instructions property
instructions: str
name property
name: str
aclose async
aclose() -> None
Source code in mirai_shared_skills/agentic_rag/skill.py
async def aclose(self) -> None:
    if self._neo4j is not None:
        await self._neo4j.aclose()
    if self._azure is not None:
        await self._azure.aclose()
get_tools
get_tools() -> list[Tool]
Source code in mirai_shared_skills/agentic_rag/skill.py
def get_tools(self) -> list[Tool]:
    async def graph_retrieval(
        query: str,
        top_k: int = 5,
        cypher_template: str | None = None,
        parameters: dict[str, Any] | None = None,
    ) -> str:
        """Execute a Cypher template against Neo4j and return chunks."""
        return await self._wrap(
            query,
            self._graph_retrieval_impl(query, top_k, cypher_template, parameters),
            "neo4j_error",
            top_k=top_k,
        )

    async def graph_schema() -> str:
        """Return the labels and relationships present in the graph."""
        return await self._graph_schema_impl()

    async def verify_graph_plugins() -> str:
        """Check whether APOC and GDS plugins are active in the graph."""
        return await self._verify_plugins_impl()

    async def list_cypher_templates() -> str:
        """Return the names + bodies of available Cypher templates."""
        return json.dumps({"templates": TEMPLATES}, indent=2)

    async def enterprise_search(
        query: str,
        top_k: int = 5,
        odata_filter: str | None = None,
    ) -> str:
        """Hybrid search against Azure AI Search with optional OData filter."""
        return await self._wrap(
            query,
            self._enterprise_search_impl(query, top_k, odata_filter),
            "azure_error",
            top_k=top_k,
        )

    async def web_intelligence(query: str, top_k: int = 5) -> str:
        """Fan out to the configured web search provider for live context."""
        return await self._wrap(
            query,
            self._web_intelligence_impl(query, top_k),
            "web_error",
            top_k=top_k,
        )

    async def parallel_retrieval(query: str, top_k: int = 5) -> str:
        """Run every configured source in parallel and merge the chunks."""
        return await self._parallel_retrieval_impl(query, top_k)

    return [
        tool_from_function(async_fn=graph_retrieval),
        tool_from_function(async_fn=graph_schema),
        tool_from_function(async_fn=verify_graph_plugins),
        tool_from_function(async_fn=list_cypher_templates),
        tool_from_function(async_fn=enterprise_search),
        tool_from_function(async_fn=web_intelligence),
        tool_from_function(async_fn=parallel_retrieval),
    ]
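
A composition sketch pulling the pieces together; the Azure configuration values and the Cohere key source are placeholders, and providers left out are simply not wired (web search falls back to BrowserWebSearchProvider, as in __init__ above).

import asyncio
import os

from mirai_shared_skills.agentic_rag import (
    AgenticRAGSkill,
    AzureSearchConfig,
    AzureSearchProvider,
    CohereRerankProvider,
)


async def main() -> None:
    azure = AzureSearchProvider(
        AzureSearchConfig(
            endpoint="https://my-search.search.windows.net",  # placeholder
            index_name="enterprise-docs",                     # placeholder
            api_key="<azure-search-api-key>",                 # placeholder
        )
    )
    skill = AgenticRAGSkill(
        azure=azure,
        reranker=CohereRerankProvider(api_key=os.environ["CO_API_KEY"]),
        token_budget=6000,
    )
    try:
        tools = skill.get_tools()
        print(len(tools))  # seven tools, as registered in get_tools above
    finally:
        await skill.aclose()


asyncio.run(main())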

estimate_citation_tokens

estimate_citation_tokens(
    chunks: Sequence[RAGContextChunk],
) -> int

Return the approximate token cost of citing every chunk in chunks.

Each citation reserves space for the source label, the identifier (often a URL), and the JSON-serialised metadata extras the LLM may surface back to the user. Token counts are estimated as chars / 4 to stay tokeniser-free.

Source code in mirai_shared_skills/agentic_rag/skill.py
def estimate_citation_tokens(chunks: Sequence[RAGContextChunk]) -> int:
    """Return the approximate token cost of citing every chunk in `chunks`.

    Each citation reserves space for the source label, the identifier (often a
    URL), and the JSON-serialised metadata extras the LLM may surface back to
    the user. Token counts are estimated as `chars / 4` to stay tokeniser-free.
    """
    total_chars = 0
    for chunk in chunks:
        meta = chunk.metadata
        total_chars += len(meta.source) + len(meta.identifier)
        # Cheap upper bound on the cost of round-tripping `extra` through JSON.
        if meta.extra:
            try:
                total_chars += len(json.dumps(meta.extra, default=str))
            except (TypeError, ValueError):
                # Fallback when extras contain non-JSON-able values; charge a
                # conservative flat cost so the budget still caps growth.
                total_chars += sum(len(str(v)) for v in meta.extra.values())
    return total_chars // APPROX_CHARS_PER_TOKEN
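
A quick arithmetic check of the chars / 4 heuristic with a single chunk; the identifier and extras are placeholder values.

from mirai_shared_skills.agentic_rag import estimate_citation_tokens
from mirai_shared_skills.agentic_rag.models import RAGContextChunk, SourceMetadata

chunk = RAGContextChunk(
    text="the text itself is not part of the citation estimate",
    metadata=SourceMetadata(
        source="web",
        identifier="https://example.com/page#chunk-0",
        extra={"status_code": 200},
    ),
)

# len("web") + len(identifier) + len(json.dumps(extra)), divided by 4.
print(estimate_citation_tokens([chunk]))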

truncate_chunks_to_budget

truncate_chunks_to_budget(
    chunks: Sequence[RAGContextChunk],
    token_budget: int = DEFAULT_TOKEN_BUDGET,
    *,
    citation_buffer_tokens: int = DEFAULT_CITATION_SAFETY_BUFFER_TOKENS,
) -> list[RAGContextChunk]

Citation-aware greedy truncation.

Chunks are fitted into a budget reduced by the projected citation overhead:

chunk_budget = token_budget - (estimated_citation_tokens + citation_buffer)

The citation estimate is derived from the source label, identifier, and metadata extras of the candidate chunks. The safety buffer accommodates structural overhead (commas, JSON braces, surrounding prose). When the resulting budget is non-positive the function returns an empty list.

Parameters:

  • chunks (Sequence[RAGContextChunk], required): Candidate chunks in priority order. The first chunk is always kept verbatim if any budget remains.
  • token_budget (int, default DEFAULT_TOKEN_BUDGET): Total tokens the synthesis prompt can spend on retrieval.
  • citation_buffer_tokens (int, default DEFAULT_CITATION_SAFETY_BUFFER_TOKENS): Constant token allowance added to the dynamic citation estimate. Defaults to 64 tokens.

Returns:

Type Description
list[RAGContextChunk]

A list of chunks whose total textual char count fits within the

list[RAGContextChunk]

derived character budget. The trailing chunk may be partially

list[RAGContextChunk]

truncated; in that case its metadata.extra["truncated"] is set.

Source code in mirai_shared_skills/agentic_rag/skill.py
def truncate_chunks_to_budget(
    chunks: Sequence[RAGContextChunk],
    token_budget: int = DEFAULT_TOKEN_BUDGET,
    *,
    citation_buffer_tokens: int = DEFAULT_CITATION_SAFETY_BUFFER_TOKENS,
) -> list[RAGContextChunk]:
    """Citation-aware greedy truncation.

    Chunks are fitted into a budget reduced by the projected citation overhead:

        chunk_budget = token_budget - (estimated_citation_tokens + citation_buffer)

    The citation estimate is derived from the source label, identifier, and
    metadata extras of the candidate chunks. The safety buffer accommodates
    structural overhead (commas, JSON braces, surrounding prose). When the
    resulting budget is non-positive the function returns an empty list.

    Args:
        chunks: Candidate chunks in priority order. The first chunk is always
            kept verbatim if any budget remains.
        token_budget: Total tokens the synthesis prompt can spend on retrieval.
        citation_buffer_tokens: Constant token allowance added to the dynamic
            citation estimate. Defaults to 64 tokens.

    Returns:
        A list of chunks whose total textual char count fits within the
        derived character budget. The trailing chunk may be partially
        truncated; in that case its `metadata.extra["truncated"]` is set.
    """
    if not chunks:
        return []

    citation_overhead = estimate_citation_tokens(chunks) + citation_buffer_tokens
    chunk_token_budget = token_budget - citation_overhead
    if chunk_token_budget <= 0:
        return []

    char_budget = chunk_token_budget * APPROX_CHARS_PER_TOKEN
    kept: list[RAGContextChunk] = []
    used = 0
    for chunk in chunks:
        cost = len(chunk.text)
        if used + cost <= char_budget:
            kept.append(chunk)
            used += cost
            continue
        remaining = char_budget - used
        if remaining <= 0:
            break
        truncated_text = chunk.text[:remaining]
        kept.append(
            RAGContextChunk(
                text=truncated_text,
                metadata=chunk.metadata.model_copy(
                    update={"extra": {**chunk.metadata.extra, "truncated": True}},
                ),
            )
        )
        break
    return kept
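
A usage sketch that reuses chunks shaped like the estimate_citation_tokens example above; the 1,000-token budget is arbitrary:

from mirai_shared_skills.agentic_rag import truncate_chunks_to_budget

kept = truncate_chunks_to_budget(chunks, token_budget=1000)
for chunk in kept:
    if chunk.metadata.extra.get("truncated"):
        print(f"{chunk.metadata.identifier} was cut to fit the character budget")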

Models

mirai_shared_skills.agentic_rag.models

Strict Pydantic v2 schemas shared by every retrieval provider.

SourceName module-attribute

SourceName = Literal['neo4j', 'azure-search', 'web']

__all__ module-attribute

__all__ = [
    "RAGContextChunk",
    "RetrievalQuery",
    "SourceMetadata",
    "SourceName",
]

RAGContextChunk

Bases: BaseModel

A single unit of context returned by any retrieval provider.

metadata class-attribute instance-attribute

metadata: SourceMetadata = Field(
    description="Provenance metadata for citation."
)

model_config class-attribute instance-attribute

model_config = ConfigDict(extra='forbid')

text class-attribute instance-attribute

text: str = Field(
    min_length=1,
    description="Plain-text content for the LLM to read.",
)

RetrievalQuery

Bases: BaseModel

Caller-supplied query envelope sent to the retrieval providers.

filters class-attribute instance-attribute

filters: dict[str, Any] | None = Field(
    default=None,
    description="Provider-specific filter map (e.g. OData filter for Azure).",
)

model_config class-attribute instance-attribute

model_config = ConfigDict(
    extra="forbid", str_strip_whitespace=True
)

query class-attribute instance-attribute

query: str = Field(
    min_length=1,
    description="Natural-language query string.",
)

top_k class-attribute instance-attribute

top_k: int = Field(
    default=5,
    ge=1,
    le=50,
    description="Maximum chunks to return.",
)

SourceMetadata

Bases: BaseModel

Provenance attached to every retrieved chunk so the LLM can cite sources.

extra class-attribute instance-attribute

extra: dict[str, Any] = Field(
    default_factory=dict,
    description="Free-form provider extras (URL, label, embedding, etc.).",
)

identifier class-attribute instance-attribute

identifier: str = Field(
    min_length=1,
    description="Stable ID of the underlying record.",
)

model_config class-attribute instance-attribute

model_config = ConfigDict(extra='forbid')

score class-attribute instance-attribute

score: float | None = Field(
    default=None,
    description="Provider-reported relevance score, when available.",
)

source class-attribute instance-attribute

source: SourceName = Field(
    description="Provider identifier emitting this chunk."
)
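
A construction sketch showing the strict schemas; the field values are illustrative:

from mirai_shared_skills.agentic_rag.models import (
    RAGContextChunk,
    RetrievalQuery,
    SourceMetadata,
)

query = RetrievalQuery(
    query="outage postmortems for the billing service",
    top_k=10,
    filters={"odata": "category eq 'postmortem'"},
)

chunk = RAGContextChunk(
    text="The 2024-03-02 billing outage was caused by a stale feature flag.",
    metadata=SourceMetadata(
        source="neo4j",
        identifier="incident-1842",
        score=0.92,
        extra={"label": "Incident"},
    ),
)

# extra="forbid" means unknown fields raise a ValidationError instead of being ignored.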

Providers

mirai_shared_skills.agentic_rag.providers

Retrieval providers consumed by the AgenticRAGSkill.

__all__ module-attribute

__all__ = [
    "AzureSearchProvider",
    "BrowserWebSearchProvider",
    "CohereRerankProvider",
    "Neo4jGraphProvider",
    "NoOpRerankerProvider",
    "Qwen3RerankerProvider",
    "RerankerConfig",
    "RerankerProvider",
    "WebSearchProvider",
]

AzureSearchProvider

AzureSearchProvider(
    config: AzureSearchConfig,
    *,
    client: AsyncClient | None = None,
    embedder: EmbeddingFn | None = None,
    timeout_seconds: float = DEFAULT_TIMEOUT_SECONDS,
)

Async hybrid-search wrapper over the Azure AI Search REST API.

Source code in mirai_shared_skills/agentic_rag/providers/azure.py
def __init__(
    self,
    config: AzureSearchConfig,
    *,
    client: httpx.AsyncClient | None = None,
    embedder: EmbeddingFn | None = None,
    timeout_seconds: float = DEFAULT_TIMEOUT_SECONDS,
) -> None:
    self._config = config
    self._client = client
    self._owns_client = client is None
    self._embedder = embedder
    self._timeout_seconds = timeout_seconds

aclose async

aclose() -> None
Source code in mirai_shared_skills/agentic_rag/providers/azure.py
async def aclose(self) -> None:
    if self._owns_client and self._client is not None:
        await self._client.aclose()
        self._client = None

search async

search(
    query: str,
    *,
    top_k: int = 5,
    filters: dict[str, Any] | None = None,
) -> list[RAGContextChunk]

Issue a hybrid search and project the response into context chunks.

Source code in mirai_shared_skills/agentic_rag/providers/azure.py
async def search(
    self,
    query: str,
    *,
    top_k: int = 5,
    filters: dict[str, Any] | None = None,
) -> list[RAGContextChunk]:
    """Issue a hybrid search and project the response into context chunks."""
    client = await self._ensure_client()
    body: dict[str, Any] = {
        "search": query,
        "top": top_k,
        "queryType": "semantic" if self._config.semantic_configuration else "simple",
    }
    if self._config.semantic_configuration:
        body["semanticConfiguration"] = self._config.semantic_configuration
    if filters and isinstance(filters.get("odata"), str):
        body["filter"] = filters["odata"]
    vector = await self._embed(query)
    if vector is not None:
        body["vectorQueries"] = [
            {
                "kind": "vector",
                "vector": list(vector),
                "k": top_k,
                "fields": ",".join(self._config.vector_fields),
            }
        ]
    path = f"/indexes/{self._config.index_name}/docs/search"
    params = {"api-version": self._config.api_version}
    response = await client.post(path, params=params, json=body)
    response.raise_for_status()
    payload = response.json()
    chunks: list[RAGContextChunk] = []
    for entry in payload.get("value", [])[:top_k]:
        identifier = str(entry.get("id") or entry.get("@search.documentId") or "unknown")
        text = str(entry.get("content") or entry.get("text") or "")
        if not text:
            continue
        score = entry.get("@search.rerankerScore") or entry.get("@search.score")
        chunks.append(
            RAGContextChunk(
                text=text,
                metadata=SourceMetadata(
                    source="azure-search",
                    identifier=identifier,
                    score=float(score) if score is not None else None,
                    extra={
                        k: v
                        for k, v in entry.items()
                        if k not in {"content", "text"} and not k.startswith("@search.")
                    },
                ),
            )
        )
    return chunks
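
A usage sketch with a stub embedder; the endpoint and key values are placeholders and the calls must run inside an async context:

from mirai_shared_skills.agentic_rag.providers.azure import (
    AzureSearchConfig,
    AzureSearchProvider,
)


async def stub_embedder(text: str) -> list[float]:
    # Stand-in EmbeddingFn; production code would call the real embedding model.
    return [0.0] * 1536


async def run() -> None:
    provider = AzureSearchProvider(
        AzureSearchConfig(
            endpoint="https://<search>.search.windows.net",
            index_name="enterprise-docs",
            api_key="<api-key>",
            semantic_configuration="default",
        ),
        embedder=stub_embedder,
    )
    try:
        chunks = await provider.search(
            "data retention policy",
            top_k=5,
            filters={"odata": "department eq 'legal'"},
        )
    finally:
        await provider.aclose()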

BrowserWebSearchProvider

BrowserWebSearchProvider(
    *,
    browser: AgentBrowserSkill | None = None,
    url_builder: UrlBuilder = default_duckduckgo_url,
)

Bases: WebSearchProvider

Default WebSearchProvider that delegates to AgentBrowserSkill.

The url_builder callable controls which search engine is queried, so the same provider works with DuckDuckGo, Bing, Brave, or any private enterprise search portal.

Source code in mirai_shared_skills/agentic_rag/providers/web.py
def __init__(
    self,
    *,
    browser: AgentBrowserSkill | None = None,
    url_builder: UrlBuilder = default_duckduckgo_url,
) -> None:
    self._browser = browser or AgentBrowserSkill()
    self._url_builder = url_builder
    self._cached_browse: Callable[..., Awaitable[str]] | None = None

search async

search(
    query: str, *, top_k: int = 5
) -> list[RAGContextChunk]
Source code in mirai_shared_skills/agentic_rag/providers/web.py
async def search(self, query: str, *, top_k: int = 5) -> list[RAGContextChunk]:
    url = self._url_builder(query)
    browse = self._browse_callable()
    raw = await browse(url=url)
    payload = json.loads(raw)
    if "error" in payload:
        return []
    text: str = payload.get("text", "")
    if not text:
        return []
    # Split the body into paragraph-ish chunks and keep the top_k strongest.
    candidates = [block.strip() for block in text.split("\n\n") if block.strip()]
    if not candidates:
        candidates = [text]
    chunks: list[RAGContextChunk] = []
    for index, snippet in enumerate(candidates[:top_k]):
        chunks.append(
            RAGContextChunk(
                text=snippet,
                metadata=SourceMetadata(
                    source="web",
                    identifier=f"{payload.get('final_url') or url}#chunk-{index}",
                    score=None,
                    extra={
                        "search_url": url,
                        "status_code": payload.get("status_code"),
                        "final_url": payload.get("final_url"),
                    },
                ),
            )
        )
    return chunks
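
A sketch of swapping the search engine by supplying a different UrlBuilder; the Bing URL below is illustrative:

from urllib.parse import quote_plus

from mirai_shared_skills.agentic_rag.providers.web import BrowserWebSearchProvider


def bing_url(query: str) -> str:
    return f"https://www.bing.com/search?q={quote_plus(query)}"


web = BrowserWebSearchProvider(url_builder=bing_url)
# Inside an async context:
#     chunks = await web.search("Kubernetes 1.30 release notes", top_k=3)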

CohereRerankProvider

CohereRerankProvider(
    api_key: str,
    *,
    model: str = DEFAULT_MODEL,
    endpoint: str = DEFAULT_ENDPOINT,
    config: RerankerConfig | None = None,
    client: AsyncClient | None = None,
    timeout_seconds: float = DEFAULT_RERANK_TIMEOUT_SECONDS,
)

Bases: RerankerProvider

Cohere Rerank v4 over the public REST API.

Requires a CO_API_KEY. The provider is async-first and reuses a single httpx.AsyncClient for connection pooling. Pass an existing client to share pools with surrounding application code.

Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
def __init__(
    self,
    api_key: str,
    *,
    model: str = DEFAULT_MODEL,
    endpoint: str = DEFAULT_ENDPOINT,
    config: RerankerConfig | None = None,
    client: httpx.AsyncClient | None = None,
    timeout_seconds: float = DEFAULT_RERANK_TIMEOUT_SECONDS,
) -> None:
    self._api_key = api_key
    self._model = model
    self._endpoint = endpoint.rstrip("/")
    self._config = config or RerankerConfig()
    self._client = client
    self._owns_client = client is None
    self._timeout_seconds = timeout_seconds

DEFAULT_ENDPOINT class-attribute instance-attribute

DEFAULT_ENDPOINT = 'https://api.cohere.com'

DEFAULT_MODEL class-attribute instance-attribute

DEFAULT_MODEL = 'rerank-v3.5'

aclose async

aclose() -> None
Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
async def aclose(self) -> None:
    if self._owns_client and self._client is not None:
        await self._client.aclose()
        self._client = None

rerank async

rerank(
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]
Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
async def rerank(
    self,
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]:
    if not chunks:
        return []
    limit = top_k if top_k is not None else self._config.top_k
    client = await self._ensure_client()
    response = await client.post(
        "/v2/rerank",
        json={
            "model": self._model,
            "query": query,
            "documents": [chunk.text for chunk in chunks],
            "top_n": limit,
            "max_tokens_per_doc": self._config.max_context_tokens,
        },
    )
    response.raise_for_status()
    payload = response.json()
    scored: list[RAGContextChunk] = []
    for entry in payload.get("results", []):
        index = int(entry.get("index", -1))
        if index < 0 or index >= len(chunks):
            continue
        score = float(entry.get("relevance_score", 0.0))
        scored.append(_stamp_score(chunks[index], score))
    return _apply_score_filters(
        scored,
        threshold=self._config.score_threshold,
        top_k=limit,
    )
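
A usage sketch; CO_API_KEY is read from the environment and the chunks list is assumed to come from one of the retrieval providers above:

import os

from mirai_shared_skills.agentic_rag.providers.reranker import (
    CohereRerankProvider,
    RerankerConfig,
)

reranker = CohereRerankProvider(
    api_key=os.environ["CO_API_KEY"],
    config=RerankerConfig(top_k=3, score_threshold=0.3),
)
# Inside an async context:
#     best = await reranker.rerank("billing outage root cause", chunks, top_k=3)
#     await reranker.aclose()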

Neo4jGraphProvider

Neo4jGraphProvider(
    connection: Neo4jConnection | None = None,
    *,
    driver: _AsyncDriver | None = None,
)

Async Neo4j wrapper exposing schema introspection plus parameterised queries.

Test seams: callers can inject driver directly so unit tests bypass the real driver factory. Otherwise the driver is created lazily on first use and reused (singleton) for the lifetime of the provider instance.

Source code in mirai_shared_skills/agentic_rag/providers/neo4j_graph.py
def __init__(
    self,
    connection: Neo4jConnection | None = None,
    *,
    driver: _AsyncDriver | None = None,
) -> None:
    if connection is None and driver is None:
        raise ValueError("Neo4jGraphProvider requires either `connection` or `driver`.")
    self._connection = connection
    self._driver: _AsyncDriver | None = driver

aclose async

aclose() -> None
Source code in mirai_shared_skills/agentic_rag/providers/neo4j_graph.py
async def aclose(self) -> None:
    if self._driver is not None:
        await self._driver.close()
        self._driver = None

describe_schema async

describe_schema() -> dict[str, list[str]]

Return labels and relationship types known to the graph.

Source code in mirai_shared_skills/agentic_rag/providers/neo4j_graph.py
async def describe_schema(self) -> dict[str, list[str]]:
    """Return labels and relationship types known to the graph."""
    async with self._session() as session:
        label_rows = await session.run("CALL db.labels()")
        labels = [row.data().get("label", "") async for row in label_rows]
        rel_rows = await session.run("CALL db.relationshipTypes()")
        relationships = [row.data().get("relationshipType", "") async for row in rel_rows]
    return {
        "labels": [label for label in labels if label],
        "relationships": [rel for rel in relationships if rel],
    }

execute async

execute(
    cypher: str,
    parameters: Mapping[str, Any] | None = None,
    *,
    top_k: int = 5,
) -> list[RAGContextChunk]

Run a Cypher statement and project the rows into RAGContextChunks.

Source code in mirai_shared_skills/agentic_rag/providers/neo4j_graph.py
async def execute(
    self,
    cypher: str,
    parameters: Mapping[str, Any] | None = None,
    *,
    top_k: int = 5,
) -> list[RAGContextChunk]:
    """Run a Cypher statement and project the rows into `RAGContextChunk`s."""
    chunks: list[RAGContextChunk] = []
    async with self._session() as session:
        result = await session.run(cypher, parameters or {})
        count = 0
        async for record in result:
            if count >= top_k:
                break
            row = record.data()
            identifier = str(
                row.get("id") or row.get("identifier") or f"row-{count}",
            )
            text = str(row.get("text") or row)
            chunks.append(
                RAGContextChunk(
                    text=text,
                    metadata=SourceMetadata(
                        source="neo4j",
                        identifier=identifier,
                        score=None,
                        extra={"row": row},
                    ),
                )
            )
            count += 1
    return chunks

verify_plugins async

verify_plugins() -> dict[str, Any]

Probe the graph for APOC and GDS availability.

Returns a status dict shaped like::

{"apoc": True, "gds": False, "detail": {"apoc": "...", "gds": "..."}}

The probe runs lightweight introspection calls (apoc.help('apoc') and gds.list()) and never raises — when a procedure is missing the corresponding flag is set to False and the failure reason is captured in detail.

Source code in mirai_shared_skills/agentic_rag/providers/neo4j_graph.py
async def verify_plugins(self) -> dict[str, Any]:
    """Probe the graph for APOC and GDS availability.

    Returns a status dict shaped like::

        {"apoc": True, "gds": False, "detail": {"apoc": "...", "gds": "..."}}

    The probe runs lightweight introspection calls (`apoc.help('apoc')`
    and `gds.list()`) and never raises — when a procedure is missing the
    corresponding flag is set to `False` and the failure reason is
    captured in `detail`.
    """
    status: dict[str, Any] = {"apoc": False, "gds": False, "detail": {}}
    async with self._session() as session:
        for plugin, probe in (
            ("apoc", "CALL apoc.help('apoc') YIELD name RETURN count(name) AS n"),
            ("gds", "CALL gds.list() YIELD name RETURN count(name) AS n"),
        ):
            try:
                result = await session.run(probe)
                count = 0
                async for record in result:
                    count = int(record.data().get("n", 0) or 0)
                    break
                status[plugin] = count > 0
                if count == 0:
                    status["detail"][plugin] = "no procedures registered"
            except Exception as exc:  # noqa: BLE001 — translate driver errors
                status[plugin] = False
                status["detail"][plugin] = str(exc)
    return status
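
A usage sketch; the connection values and Cypher statement are placeholders, and the calls must run inside an async context:

from mirai_shared_skills.agentic_rag.providers.neo4j_graph import (
    Neo4jConnection,
    Neo4jGraphProvider,
)


async def run() -> None:
    graph = Neo4jGraphProvider(
        Neo4jConnection(uri="bolt://localhost:7687", user="neo4j", password="<password>")
    )
    try:
        schema = await graph.describe_schema()   # {"labels": [...], "relationships": [...]}
        plugins = await graph.verify_plugins()   # {"apoc": bool, "gds": bool, "detail": {...}}
        chunks = await graph.execute(
            "MATCH (i:Incident) RETURN i.id AS id, i.summary AS text LIMIT $limit",
            {"limit": 5},
            top_k=5,
        )
    finally:
        await graph.aclose()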

NoOpRerankerProvider

Bases: RerankerProvider

Passthrough reranker — preserves original ordering and trims to top_k.

rerank async

rerank(
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]
Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
async def rerank(
    self,
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]:
    limit = top_k if top_k is not None else len(chunks)
    return list(chunks[:limit])

Qwen3RerankerProvider

Qwen3RerankerProvider(
    endpoint: str,
    *,
    api_key: str | None = None,
    model: str = DEFAULT_MODEL,
    config: RerankerConfig | None = None,
    client: AsyncClient | None = None,
    timeout_seconds: float = DEFAULT_RERANK_TIMEOUT_SECONDS,
)

Bases: RerankerProvider

Qwen3-Reranker-4B served via an OpenAI-compatible inference endpoint.

The provider POSTs {query, documents, top_n} to endpoint and expects a {results: [{index, score}]} envelope. vLLM, TGI, and Together AI all expose this shape for cross-encoder reranker models.

Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
def __init__(
    self,
    endpoint: str,
    *,
    api_key: str | None = None,
    model: str = DEFAULT_MODEL,
    config: RerankerConfig | None = None,
    client: httpx.AsyncClient | None = None,
    timeout_seconds: float = DEFAULT_RERANK_TIMEOUT_SECONDS,
) -> None:
    self._endpoint = endpoint.rstrip("/")
    self._api_key = api_key
    self._model = model
    self._config = config or RerankerConfig()
    self._client = client
    self._owns_client = client is None
    self._timeout_seconds = timeout_seconds

DEFAULT_MODEL class-attribute instance-attribute

DEFAULT_MODEL = 'Qwen/Qwen3-Reranker-4B'

aclose async

aclose() -> None
Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
async def aclose(self) -> None:
    if self._owns_client and self._client is not None:
        await self._client.aclose()
        self._client = None

rerank async

rerank(
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]
Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
async def rerank(
    self,
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]:
    if not chunks:
        return []
    limit = top_k if top_k is not None else self._config.top_k
    client = await self._ensure_client()
    response = await client.post(
        "/v1/rerank",
        json={
            "model": self._model,
            "query": query,
            "documents": [chunk.text for chunk in chunks],
            "top_n": limit,
            "max_tokens": self._config.max_context_tokens,
        },
    )
    response.raise_for_status()
    payload = response.json()
    scored: list[RAGContextChunk] = []
    # Accept both `results` (Cohere-style) and `data` (vLLM-style) envelopes
    # so the same provider talks to multiple inference backends without
    # bespoke client code per host.
    rows: Iterable[dict[str, Any]] = payload.get("results") or payload.get("data") or []
    for entry in rows:
        index = int(entry.get("index", -1))
        if index < 0 or index >= len(chunks):
            continue
        raw_score = entry.get("relevance_score")
        if raw_score is None:
            raw_score = entry.get("score", 0.0)
        score = float(raw_score if raw_score is not None else 0.0)
        scored.append(_stamp_score(chunks[index], score))
    return _apply_score_filters(
        scored,
        threshold=self._config.score_threshold,
        top_k=limit,
    )
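
A usage sketch against a local OpenAI-compatible server; the endpoint and the chunks list are assumptions:

from mirai_shared_skills.agentic_rag.providers.reranker import (
    Qwen3RerankerProvider,
    RerankerConfig,
)

reranker = Qwen3RerankerProvider(
    endpoint="http://localhost:8000",  # e.g. a vLLM server hosting Qwen/Qwen3-Reranker-4B
    config=RerankerConfig(top_k=5, score_threshold=0.2),
)
# Inside an async context:
#     reranked = await reranker.rerank("data retention policy", chunks)
#     await reranker.aclose()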

RerankerConfig dataclass

RerankerConfig(
    top_k: int = 5,
    max_context_tokens: int = QWEN3_LONG_CONTEXT_TOKENS,
    score_threshold: float | None = None,
    extra_headers: dict[str, str] = dict(),
)

Common reranker configuration knobs shared by every backend.

Attributes:

    top_k (int):
        Maximum number of chunks the reranker should return.

    max_context_tokens (int):
        Per-pair input token budget. Defaults to the 32k window supported by Qwen3-Reranker-4B; Cohere Rerank v4 also operates against long-form documents and respects this hint.

    score_threshold (float | None):
        Optional minimum score; chunks below it are dropped.

    extra_headers (dict[str, str]):
        Extra headers forwarded with every request.

extra_headers class-attribute instance-attribute

extra_headers: dict[str, str] = field(default_factory=dict)

max_context_tokens class-attribute instance-attribute

max_context_tokens: int = QWEN3_LONG_CONTEXT_TOKENS

score_threshold class-attribute instance-attribute

score_threshold: float | None = None

top_k class-attribute instance-attribute

top_k: int = 5

RerankerProvider

Bases: ABC

Abstract base for any reranker implementation.

rerank abstractmethod async

rerank(
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]

Return chunks reordered by relevance to query, truncated to top_k.

Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
@abstractmethod
async def rerank(
    self,
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]:
    """Return chunks reordered by relevance to `query`, truncated to `top_k`."""

WebSearchProvider

Bases: ABC

Abstract base for any web search backend.

search abstractmethod async

search(
    query: str, *, top_k: int = 5
) -> list[RAGContextChunk]

Return chunks containing live web text for the given query.

Source code in mirai_shared_skills/agentic_rag/providers/web.py
@abstractmethod
async def search(self, query: str, *, top_k: int = 5) -> list[RAGContextChunk]:
    """Return chunks containing live web text for the given query."""

azure

Azure AI Search provider — hybrid retrieval over the REST API.

The provider issues hybrid (vector + BM25) queries with the Semantic Ranker enabled when semantic_configuration is provided. Filters are passed through verbatim as OData expressions.

A pluggable embedding callable produces vectors for the user query; tests can inject a stub callable to avoid hitting the real embedding model.

DEFAULT_API_VERSION module-attribute

DEFAULT_API_VERSION = '2024-07-01'

DEFAULT_TIMEOUT_SECONDS module-attribute

DEFAULT_TIMEOUT_SECONDS = 15.0

DEFAULT_VECTOR_FIELDS module-attribute

DEFAULT_VECTOR_FIELDS = ('contentVector',)

EmbeddingFn module-attribute

EmbeddingFn = Callable[[str], Awaitable[Sequence[float]]]

__all__ module-attribute

__all__ = [
    "AzureSearchConfig",
    "AzureSearchProvider",
    "EmbeddingFn",
]

AzureSearchConfig dataclass

AzureSearchConfig(
    endpoint: str,
    index_name: str,
    api_key: str,
    api_version: str = DEFAULT_API_VERSION,
    semantic_configuration: str | None = None,
    vector_fields: tuple[str, ...] = DEFAULT_VECTOR_FIELDS,
)

Connection params for an Azure AI Search index.

api_key instance-attribute
api_key: str
api_version class-attribute instance-attribute
api_version: str = DEFAULT_API_VERSION
endpoint instance-attribute
endpoint: str
index_name instance-attribute
index_name: str
semantic_configuration class-attribute instance-attribute
semantic_configuration: str | None = None
vector_fields class-attribute instance-attribute
vector_fields: tuple[str, ...] = DEFAULT_VECTOR_FIELDS

AzureSearchProvider

AzureSearchProvider(
    config: AzureSearchConfig,
    *,
    client: AsyncClient | None = None,
    embedder: EmbeddingFn | None = None,
    timeout_seconds: float = DEFAULT_TIMEOUT_SECONDS,
)

Async hybrid-search wrapper over the Azure AI Search REST API.

Source code in mirai_shared_skills/agentic_rag/providers/azure.py
def __init__(
    self,
    config: AzureSearchConfig,
    *,
    client: httpx.AsyncClient | None = None,
    embedder: EmbeddingFn | None = None,
    timeout_seconds: float = DEFAULT_TIMEOUT_SECONDS,
) -> None:
    self._config = config
    self._client = client
    self._owns_client = client is None
    self._embedder = embedder
    self._timeout_seconds = timeout_seconds
aclose async
aclose() -> None
Source code in mirai_shared_skills/agentic_rag/providers/azure.py
async def aclose(self) -> None:
    if self._owns_client and self._client is not None:
        await self._client.aclose()
        self._client = None
search async
search(
    query: str,
    *,
    top_k: int = 5,
    filters: dict[str, Any] | None = None,
) -> list[RAGContextChunk]

Issue a hybrid search and project the response into context chunks.

Source code in mirai_shared_skills/agentic_rag/providers/azure.py
async def search(
    self,
    query: str,
    *,
    top_k: int = 5,
    filters: dict[str, Any] | None = None,
) -> list[RAGContextChunk]:
    """Issue a hybrid search and project the response into context chunks."""
    client = await self._ensure_client()
    body: dict[str, Any] = {
        "search": query,
        "top": top_k,
        "queryType": "semantic" if self._config.semantic_configuration else "simple",
    }
    if self._config.semantic_configuration:
        body["semanticConfiguration"] = self._config.semantic_configuration
    if filters and isinstance(filters.get("odata"), str):
        body["filter"] = filters["odata"]
    vector = await self._embed(query)
    if vector is not None:
        body["vectorQueries"] = [
            {
                "kind": "vector",
                "vector": list(vector),
                "k": top_k,
                "fields": ",".join(self._config.vector_fields),
            }
        ]
    path = f"/indexes/{self._config.index_name}/docs/search"
    params = {"api-version": self._config.api_version}
    response = await client.post(path, params=params, json=body)
    response.raise_for_status()
    payload = response.json()
    chunks: list[RAGContextChunk] = []
    for entry in payload.get("value", [])[:top_k]:
        identifier = str(entry.get("id") or entry.get("@search.documentId") or "unknown")
        text = str(entry.get("content") or entry.get("text") or "")
        if not text:
            continue
        score = entry.get("@search.rerankerScore") or entry.get("@search.score")
        chunks.append(
            RAGContextChunk(
                text=text,
                metadata=SourceMetadata(
                    source="azure-search",
                    identifier=identifier,
                    score=float(score) if score is not None else None,
                    extra={
                        k: v
                        for k, v in entry.items()
                        if k not in {"content", "text"} and not k.startswith("@search.")
                    },
                ),
            )
        )
    return chunks

neo4j_graph

Neo4j graph retrieval provider.

neo4j is an optional dependency. The driver is imported lazily so the catalog can be loaded without the Neo4j package installed; attempting to execute a query without the driver raises Neo4jUnavailableError, which the skill layer surfaces as a structured error payload rather than an opaque import failure.

__all__ module-attribute

__all__ = [
    "Neo4jConnection",
    "Neo4jGraphProvider",
    "Neo4jUnavailableError",
]

Neo4jConnection dataclass

Neo4jConnection(
    uri: str,
    user: str,
    password: str,
    database: str | None = None,
)

Connection params for the Neo4j async driver.

database class-attribute instance-attribute
database: str | None = None
password instance-attribute
password: str
uri instance-attribute
uri: str
user instance-attribute
user: str

Neo4jGraphProvider

Neo4jGraphProvider(
    connection: Neo4jConnection | None = None,
    *,
    driver: _AsyncDriver | None = None,
)

Async Neo4j wrapper exposing schema introspection plus parameterised queries.

Test seams: callers can inject driver directly so unit tests bypass the real driver factory. Otherwise the driver is created lazily on first use and reused (singleton) for the lifetime of the provider instance.

Source code in mirai_shared_skills/agentic_rag/providers/neo4j_graph.py
def __init__(
    self,
    connection: Neo4jConnection | None = None,
    *,
    driver: _AsyncDriver | None = None,
) -> None:
    if connection is None and driver is None:
        raise ValueError("Neo4jGraphProvider requires either `connection` or `driver`.")
    self._connection = connection
    self._driver: _AsyncDriver | None = driver
aclose async
aclose() -> None
Source code in mirai_shared_skills/agentic_rag/providers/neo4j_graph.py
async def aclose(self) -> None:
    if self._driver is not None:
        await self._driver.close()
        self._driver = None
describe_schema async
describe_schema() -> dict[str, list[str]]

Return labels and relationship types known to the graph.

Source code in mirai_shared_skills/agentic_rag/providers/neo4j_graph.py
async def describe_schema(self) -> dict[str, list[str]]:
    """Return labels and relationship types known to the graph."""
    async with self._session() as session:
        label_rows = await session.run("CALL db.labels()")
        labels = [row.data().get("label", "") async for row in label_rows]
        rel_rows = await session.run("CALL db.relationshipTypes()")
        relationships = [row.data().get("relationshipType", "") async for row in rel_rows]
    return {
        "labels": [label for label in labels if label],
        "relationships": [rel for rel in relationships if rel],
    }
execute async
execute(
    cypher: str,
    parameters: Mapping[str, Any] | None = None,
    *,
    top_k: int = 5,
) -> list[RAGContextChunk]

Run a Cypher statement and project the rows into RAGContextChunks.

Source code in mirai_shared_skills/agentic_rag/providers/neo4j_graph.py
async def execute(
    self,
    cypher: str,
    parameters: Mapping[str, Any] | None = None,
    *,
    top_k: int = 5,
) -> list[RAGContextChunk]:
    """Run a Cypher statement and project the rows into `RAGContextChunk`s."""
    chunks: list[RAGContextChunk] = []
    async with self._session() as session:
        result = await session.run(cypher, parameters or {})
        count = 0
        async for record in result:
            if count >= top_k:
                break
            row = record.data()
            identifier = str(
                row.get("id") or row.get("identifier") or f"row-{count}",
            )
            text = str(row.get("text") or row)
            chunks.append(
                RAGContextChunk(
                    text=text,
                    metadata=SourceMetadata(
                        source="neo4j",
                        identifier=identifier,
                        score=None,
                        extra={"row": row},
                    ),
                )
            )
            count += 1
    return chunks
verify_plugins async
verify_plugins() -> dict[str, Any]

Probe the graph for APOC and GDS availability.

Returns a status dict shaped like::

{"apoc": True, "gds": False, "detail": {"apoc": "...", "gds": "..."}}

The probe runs lightweight introspection calls (apoc.help('apoc') and gds.list()) and never raises — when a procedure is missing the corresponding flag is set to False and the failure reason is captured in detail.

Source code in mirai_shared_skills/agentic_rag/providers/neo4j_graph.py
async def verify_plugins(self) -> dict[str, Any]:
    """Probe the graph for APOC and GDS availability.

    Returns a status dict shaped like::

        {"apoc": True, "gds": False, "detail": {"apoc": "...", "gds": "..."}}

    The probe runs lightweight introspection calls (`apoc.help('apoc')`
    and `gds.list()`) and never raises — when a procedure is missing the
    corresponding flag is set to `False` and the failure reason is
    captured in `detail`.
    """
    status: dict[str, Any] = {"apoc": False, "gds": False, "detail": {}}
    async with self._session() as session:
        for plugin, probe in (
            ("apoc", "CALL apoc.help('apoc') YIELD name RETURN count(name) AS n"),
            ("gds", "CALL gds.list() YIELD name RETURN count(name) AS n"),
        ):
            try:
                result = await session.run(probe)
                count = 0
                async for record in result:
                    count = int(record.data().get("n", 0) or 0)
                    break
                status[plugin] = count > 0
                if count == 0:
                    status["detail"][plugin] = "no procedures registered"
            except Exception as exc:  # noqa: BLE001 — translate driver errors
                status[plugin] = False
                status["detail"][plugin] = str(exc)
    return status

Neo4jUnavailableError

Bases: RuntimeError

Raised when the optional neo4j dependency is missing.

reranker

Reranker providers — high-precision second-stage scoring for retrieved chunks.

Two production-grade implementations are shipped, each behind a unified RerankerProvider interface:

  • CohereRerankProvider — Cohere Rerank v4 over the public REST API.
  • Qwen3RerankerProvider — open-source SOTA Qwen3-Reranker-4B exposed via an OpenAI-compatible inference server (vLLM, TGI, or any compatible host).

Both providers talk over httpx so unit tests stay fully hermetic with respx. A NoOpRerankerProvider is provided as a passthrough default so the skill can opt out without conditional logic at the call site.

DEFAULT_RERANK_TIMEOUT_SECONDS module-attribute

DEFAULT_RERANK_TIMEOUT_SECONDS: float = 30.0

QWEN3_LONG_CONTEXT_TOKENS module-attribute

QWEN3_LONG_CONTEXT_TOKENS: int = 32768

__all__ module-attribute

__all__ = [
    "DEFAULT_RERANK_TIMEOUT_SECONDS",
    "QWEN3_LONG_CONTEXT_TOKENS",
    "CohereRerankProvider",
    "NoOpRerankerProvider",
    "Qwen3RerankerProvider",
    "RerankerConfig",
    "RerankerProvider",
]

CohereRerankProvider

CohereRerankProvider(
    api_key: str,
    *,
    model: str = DEFAULT_MODEL,
    endpoint: str = DEFAULT_ENDPOINT,
    config: RerankerConfig | None = None,
    client: AsyncClient | None = None,
    timeout_seconds: float = DEFAULT_RERANK_TIMEOUT_SECONDS,
)

Bases: RerankerProvider

Cohere Rerank v4 over the public REST API.

Requires a CO_API_KEY. The provider is async-first and reuses a single httpx.AsyncClient for connection pooling. Pass an existing client to share pools with surrounding application code.

Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
def __init__(
    self,
    api_key: str,
    *,
    model: str = DEFAULT_MODEL,
    endpoint: str = DEFAULT_ENDPOINT,
    config: RerankerConfig | None = None,
    client: httpx.AsyncClient | None = None,
    timeout_seconds: float = DEFAULT_RERANK_TIMEOUT_SECONDS,
) -> None:
    self._api_key = api_key
    self._model = model
    self._endpoint = endpoint.rstrip("/")
    self._config = config or RerankerConfig()
    self._client = client
    self._owns_client = client is None
    self._timeout_seconds = timeout_seconds
DEFAULT_ENDPOINT class-attribute instance-attribute
DEFAULT_ENDPOINT = 'https://api.cohere.com'
DEFAULT_MODEL class-attribute instance-attribute
DEFAULT_MODEL = 'rerank-v3.5'
aclose async
aclose() -> None
Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
async def aclose(self) -> None:
    if self._owns_client and self._client is not None:
        await self._client.aclose()
        self._client = None
rerank async
rerank(
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]
Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
async def rerank(
    self,
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]:
    if not chunks:
        return []
    limit = top_k if top_k is not None else self._config.top_k
    client = await self._ensure_client()
    response = await client.post(
        "/v2/rerank",
        json={
            "model": self._model,
            "query": query,
            "documents": [chunk.text for chunk in chunks],
            "top_n": limit,
            "max_tokens_per_doc": self._config.max_context_tokens,
        },
    )
    response.raise_for_status()
    payload = response.json()
    scored: list[RAGContextChunk] = []
    for entry in payload.get("results", []):
        index = int(entry.get("index", -1))
        if index < 0 or index >= len(chunks):
            continue
        score = float(entry.get("relevance_score", 0.0))
        scored.append(_stamp_score(chunks[index], score))
    return _apply_score_filters(
        scored,
        threshold=self._config.score_threshold,
        top_k=limit,
    )

NoOpRerankerProvider

Bases: RerankerProvider

Passthrough reranker — preserves original ordering and trims to top_k.

rerank async
rerank(
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]
Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
async def rerank(
    self,
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]:
    limit = top_k if top_k is not None else len(chunks)
    return list(chunks[:limit])

Qwen3RerankerProvider

Qwen3RerankerProvider(
    endpoint: str,
    *,
    api_key: str | None = None,
    model: str = DEFAULT_MODEL,
    config: RerankerConfig | None = None,
    client: AsyncClient | None = None,
    timeout_seconds: float = DEFAULT_RERANK_TIMEOUT_SECONDS,
)

Bases: RerankerProvider

Qwen3-Reranker-4B served via an OpenAI-compatible inference endpoint.

The provider POSTs {query, documents, top_n} to endpoint and expects a {results: [{index, score}]} envelope. vLLM, TGI, and Together AI all expose this shape for cross-encoder reranker models.

Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
def __init__(
    self,
    endpoint: str,
    *,
    api_key: str | None = None,
    model: str = DEFAULT_MODEL,
    config: RerankerConfig | None = None,
    client: httpx.AsyncClient | None = None,
    timeout_seconds: float = DEFAULT_RERANK_TIMEOUT_SECONDS,
) -> None:
    self._endpoint = endpoint.rstrip("/")
    self._api_key = api_key
    self._model = model
    self._config = config or RerankerConfig()
    self._client = client
    self._owns_client = client is None
    self._timeout_seconds = timeout_seconds
DEFAULT_MODEL class-attribute instance-attribute
DEFAULT_MODEL = 'Qwen/Qwen3-Reranker-4B'
aclose async
aclose() -> None
Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
async def aclose(self) -> None:
    if self._owns_client and self._client is not None:
        await self._client.aclose()
        self._client = None
rerank async
rerank(
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]
Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
async def rerank(
    self,
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]:
    if not chunks:
        return []
    limit = top_k if top_k is not None else self._config.top_k
    client = await self._ensure_client()
    response = await client.post(
        "/v1/rerank",
        json={
            "model": self._model,
            "query": query,
            "documents": [chunk.text for chunk in chunks],
            "top_n": limit,
            "max_tokens": self._config.max_context_tokens,
        },
    )
    response.raise_for_status()
    payload = response.json()
    scored: list[RAGContextChunk] = []
    # Accept both `results` (Cohere-style) and `data` (vLLM-style) envelopes
    # so the same provider talks to multiple inference backends without
    # bespoke client code per host.
    rows: Iterable[dict[str, Any]] = payload.get("results") or payload.get("data") or []
    for entry in rows:
        index = int(entry.get("index", -1))
        if index < 0 or index >= len(chunks):
            continue
        raw_score = entry.get("relevance_score")
        if raw_score is None:
            raw_score = entry.get("score", 0.0)
        score = float(raw_score if raw_score is not None else 0.0)
        scored.append(_stamp_score(chunks[index], score))
    return _apply_score_filters(
        scored,
        threshold=self._config.score_threshold,
        top_k=limit,
    )

RerankerConfig dataclass

RerankerConfig(
    top_k: int = 5,
    max_context_tokens: int = QWEN3_LONG_CONTEXT_TOKENS,
    score_threshold: float | None = None,
    extra_headers: dict[str, str] = dict(),
)

Common reranker configuration knobs shared by every backend.

Attributes:

    top_k (int):
        Maximum number of chunks the reranker should return.

    max_context_tokens (int):
        Per-pair input token budget. Defaults to the 32k window supported by Qwen3-Reranker-4B; Cohere Rerank v4 also operates against long-form documents and respects this hint.

    score_threshold (float | None):
        Optional minimum score; chunks below it are dropped.

    extra_headers (dict[str, str]):
        Extra headers forwarded with every request.

extra_headers class-attribute instance-attribute
extra_headers: dict[str, str] = field(default_factory=dict)
max_context_tokens class-attribute instance-attribute
max_context_tokens: int = QWEN3_LONG_CONTEXT_TOKENS
score_threshold class-attribute instance-attribute
score_threshold: float | None = None
top_k class-attribute instance-attribute
top_k: int = 5

RerankerProvider

Bases: ABC

Abstract base for any reranker implementation.

rerank abstractmethod async
rerank(
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]

Return chunks reordered by relevance to query, truncated to top_k.

Source code in mirai_shared_skills/agentic_rag/providers/reranker.py
@abstractmethod
async def rerank(
    self,
    query: str,
    chunks: Sequence[RAGContextChunk],
    *,
    top_k: int | None = None,
) -> list[RAGContextChunk]:
    """Return chunks reordered by relevance to `query`, truncated to `top_k`."""

web

Web search provider abstraction.

The agentic RAG skill needs to fall back to live web data when internal sources are insufficient. This module exposes:

  • WebSearchProvider: an abstract interface (just search(query, top_k)).
  • BrowserWebSearchProvider: a default implementation that wraps the existing AgentBrowserSkill plus a configurable URL builder so any search engine (Bing, Brave, Tavily, Perplexity, custom intranet) can be plugged in by changing the URL template.

UrlBuilder module-attribute

UrlBuilder = Callable[[str], str]

__all__ module-attribute

__all__ = [
    "BrowserWebSearchProvider",
    "UrlBuilder",
    "WebSearchProvider",
    "default_duckduckgo_url",
]

BrowserWebSearchProvider

BrowserWebSearchProvider(
    *,
    browser: AgentBrowserSkill | None = None,
    url_builder: UrlBuilder = default_duckduckgo_url,
)

Bases: WebSearchProvider

Default WebSearchProvider that delegates to AgentBrowserSkill.

The url_builder callable controls which search engine is queried, so the same provider works with DuckDuckGo, Bing, Brave, or any private enterprise search portal.

Source code in mirai_shared_skills/agentic_rag/providers/web.py
def __init__(
    self,
    *,
    browser: AgentBrowserSkill | None = None,
    url_builder: UrlBuilder = default_duckduckgo_url,
) -> None:
    self._browser = browser or AgentBrowserSkill()
    self._url_builder = url_builder
    self._cached_browse: Callable[..., Awaitable[str]] | None = None
search async
search(
    query: str, *, top_k: int = 5
) -> list[RAGContextChunk]
Source code in mirai_shared_skills/agentic_rag/providers/web.py
async def search(self, query: str, *, top_k: int = 5) -> list[RAGContextChunk]:
    url = self._url_builder(query)
    browse = self._browse_callable()
    raw = await browse(url=url)
    payload = json.loads(raw)
    if "error" in payload:
        return []
    text: str = payload.get("text", "")
    if not text:
        return []
    # Split the body into paragraph-ish chunks and keep the top_k strongest.
    candidates = [block.strip() for block in text.split("\n\n") if block.strip()]
    if not candidates:
        candidates = [text]
    chunks: list[RAGContextChunk] = []
    for index, snippet in enumerate(candidates[:top_k]):
        chunks.append(
            RAGContextChunk(
                text=snippet,
                metadata=SourceMetadata(
                    source="web",
                    identifier=f"{payload.get('final_url') or url}#chunk-{index}",
                    score=None,
                    extra={
                        "search_url": url,
                        "status_code": payload.get("status_code"),
                        "final_url": payload.get("final_url"),
                    },
                ),
            )
        )
    return chunks

WebSearchProvider

Bases: ABC

Abstract base for any web search backend.

search abstractmethod async
search(
    query: str, *, top_k: int = 5
) -> list[RAGContextChunk]

Return chunks containing live web text for the given query.

Source code in mirai_shared_skills/agentic_rag/providers/web.py
@abstractmethod
async def search(self, query: str, *, top_k: int = 5) -> list[RAGContextChunk]:
    """Return chunks containing live web text for the given query."""

default_duckduckgo_url

default_duckduckgo_url(query: str) -> str

Return the DuckDuckGo HTML endpoint for query. Used as the safe default.

Source code in mirai_shared_skills/agentic_rag/providers/web.py
def default_duckduckgo_url(query: str) -> str:
    """Return the DuckDuckGo HTML endpoint for `query`. Used as the safe default."""
    from urllib.parse import quote_plus

    return f"https://duckduckgo.com/html/?q={quote_plus(query)}"

Weather

mirai_shared_skills.weather

Weather domain — a Standard (read-only) shared skill.

Fetches forecasts for a location through a public HTTP endpoint (the current implementation returns a deterministic mock forecast). The skill is inherently safe and does not require downstream SecureSkill wrapping.

WeatherSkill

Bases: BaseSkill

Standard skill that retrieves real-time weather information.

description property

description: str

instructions property

instructions: str

name property

name: str

get_tools

get_tools() -> list[Tool]
Source code in mirai_shared_skills/weather.py
def get_tools(self) -> list[Tool]:
    async def get_current_weather(location: str) -> str:
        """Return a deterministic mock forecast for the given location."""
        return f"The current weather in {location} is 22 C and sunny."

    return [tool_from_function(async_fn=get_current_weather)]

Database

mirai_shared_skills.database

Raw database operations — comprehensive, including destructive tooling.

RawDatabaseSkill is intentionally written without security guards. The whole catalog is consumed by downstream clients that wrap raw skills with a SecureSkill policy mapping. The catalog therefore exposes the full surface area of database tooling, including destructive operations like db_drop_table, without arbitrarily limiting utility during the development phase.

RawDatabaseSkill

Bases: BaseSkill

Raw skill exposing comprehensive read and state-mutating database tools.

description property

description: str

instructions property

instructions: str

name property

name: str

get_tools

get_tools() -> list[Tool]
Source code in mirai_shared_skills/database.py
def get_tools(self) -> list[Tool]:
    async def db_select_records(table: str, where: dict[str, Any] | None = None) -> str:
        """Return a JSON-encoded list of rows that match `where`."""
        return json.dumps(
            {"table": table, "where": where or {}, "rows": []},
            indent=2,
        )

    async def db_update_record(table: str, primary_key: str, values: dict[str, Any]) -> str:
        """Update a single row keyed by `primary_key`."""
        return json.dumps(
            {
                "table": table,
                "primary_key": primary_key,
                "values": values,
                "updated": True,
            },
            indent=2,
        )

    async def db_insert_record(table: str, values: dict[str, Any]) -> str:
        """Insert a new row into `table`."""
        return json.dumps({"table": table, "values": values, "inserted": True}, indent=2)

    async def db_delete_record(table: str, primary_key: str) -> str:
        """Delete the row keyed by `primary_key` from `table`."""
        return json.dumps(
            {"table": table, "primary_key": primary_key, "deleted": True},
            indent=2,
        )

    async def db_drop_table(table: str) -> str:
        """Drop `table` entirely. Destructive; downstream policy must gate this."""
        return json.dumps({"table": table, "dropped": True}, indent=2)

    return [
        tool_from_function(async_fn=db_select_records),
        tool_from_function(async_fn=db_update_record),
        tool_from_function(async_fn=db_insert_record),
        tool_from_function(async_fn=db_delete_record),
        tool_from_function(async_fn=db_drop_table),
    ]
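
How a downstream client might narrow the raw surface before applying its policy is sketched below; the allow-list and the assumption that each Tool exposes a name attribute are hypothetical, and the real SecureSkill policy mapping is not shown here:

from mirai_shared_skills.database import RawDatabaseSkill

# Hypothetical read-only allow-list; the real policy lives in the downstream SecureSkill mapping.
READ_ONLY = {"db_select_records"}

skill = RawDatabaseSkill()
tools = skill.get_tools()
# Assumes each Tool carries the wrapped function's name; adjust to the actual Tool API.
safe_tools = [tool for tool in tools if getattr(tool, "name", "") in READ_ONLY]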