Feat: artifact compilation pipeline + shared knowlege_compile engines#15697

Open
KevinHuSh wants to merge 18 commits into
infiniflow:mainfrom
KevinHuSh:feat/merge_structure_with_wiki
Conversation

@KevinHuSh
Collaborator

Summary

Builds out a four-phase artifact compilation pipeline (MAP → REDUCE → PLAN → REFINE) alongside the existing compile_structure_from_text extractor, and consolidates the plumbing both pipelines share into a small _common module so neither file carries the same code twice.

The artifact pipeline lives in rag/advanced_rag/knowlege_compile/artifact.py (renamed from wiki.py — both git mv and case-preserving content rename). It turns a KB's ingested chunks into a hyperlinked set of artifact pages, each backed by ES rows that retrievers can serve.

Pipeline phases

| Phase | Public entry | Scope | Output |
| --- | --- | --- | --- |
| MAP | artifact_map_from_chunks | per doc | one non-searchable artifact_map_extract row per source chunk |
| REDUCE | artifact_reduce_from_extracts | per KB | one artifact_reduce_result row with canonical entities/concepts |
| PLAN | artifact_plan_from_reduction | per KB | one artifact_compilation_plan row with pages[] |
| REFINE | artifact_refine_from_plan | per KB | many searchable artifact_page rows + non-searchable artifact_page_draft cache rows |

Entity / relation schemas and the prompt's Rules section come from parser_config["artifact_compilation"] (same YAML shape compile_structure_from_text accepts). source_chunk_id is always appended so chunk attribution survives whatever the user defines.

Shared engines in _common.py

  • build_chunk_batches + run_chunked_pipeline — generic chunked-LLM scaffold used by both compile_structure_from_text and artifact_map_from_chunks.
  • bulk_dedup_items — three-phase dedup (exact → embedding cosine → LLM disambiguation), used by REDUCE.
  • Utilities: stable_row_id, encode, tokenize_for_search, union_ordered, make_input_budget, ensure_llm_bundle, es_search / es_insert / es_delete / es_upsert_one, find_vec_field.
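The three-phase dedup described for bulk_dedup_items can be sketched roughly as follows. This is a minimal illustration, not the code in _common.py: the function name `dedup_sketch`, the `hi`/`lo` thresholds, the `same_entity` callback (standing in for the LLM disambiguation call), and the item shape are all assumptions made for the example.

```python
import math


def cosine(a, b):
    """Plain-Python cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0


class UnionFind:
    """Tiny union-find so merge decisions compose transitively."""

    def __init__(self, n):
        self.parent = list(range(n))

    def find(self, i):
        while self.parent[i] != i:
            self.parent[i] = self.parent[self.parent[i]]  # path halving
            i = self.parent[i]
        return i

    def union(self, a, b):
        ra, rb = self.find(a), self.find(b)
        if ra != rb:
            self.parent[max(ra, rb)] = min(ra, rb)


def dedup_sketch(items, vectors, same_entity, hi=0.95, lo=0.80):
    """Group items that refer to the same thing (hypothetical helper).

    Phase 1: exact match on a normalised (name, type) key.
    Phase 2: embedding cosine >= hi merges without asking anyone.
    Phase 3: lo <= cosine < hi is ambiguous, so `same_entity` decides.
    """
    uf = UnionFind(len(items))
    seen = {}
    for i, it in enumerate(items):
        key = (it["name"].strip().lower(), it["type"])
        if key in seen:
            uf.union(seen[key], i)
        else:
            seen[key] = i
    for i in range(len(items)):
        for j in range(i + 1, len(items)):
            if uf.find(i) == uf.find(j) or items[i]["type"] != items[j]["type"]:
                continue
            sim = cosine(vectors[i], vectors[j])
            if sim >= hi or (sim >= lo and same_entity(items[i], items[j])):
                uf.union(i, j)
    groups = {}
    for i, it in enumerate(items):
        groups.setdefault(uf.find(i), []).append(it["name"])
    return list(groups.values())
```

Routing only the ambiguous similarity band to the LLM keeps the number of disambiguation calls small; the real thresholds and merge bookkeeping in the PR may differ.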

REFINE / persistence

artifact_page rows carry artifact_slug_kwd, artifact_title_kwd, artifact_page_type_kwd, artifact_kb_id_kwd, artifact_doc_id_kwd (list of contributing docs), artifact_outlinks_kwd (slugs linked from this page), plus artifact_raw_md_kwd (the LLM's [[slug]] form, used by the merger) and content_with_weight (the rendered form with clickable [text](artifact/{kb_id}/{slug}) links). UPDATE pages are LLM-merged against the existing content with a 70 % shrink-check fallback.
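To make the two stored forms concrete, here is a small sketch of how a raw [[slug]] body might be rendered into the clickable form, how outlinks could be collected, and how a shrink check could guard a merge. The helper names, the title lookup, and the reading of "70 % shrink-check" as "reject a merged page shorter than 70 % of the existing one" are assumptions, not the PR's actual implementation.

```python
import re

# Matches the raw [[slug]] form the writer LLM is described as emitting.
_RAW_LINK = re.compile(r"\[\[([^\[\]]+)\]\]")


def render_links(raw_md, kb_id, titles):
    """Turn [[slug]] into [text](artifact/{kb_id}/{slug}).

    `titles` maps slug -> display text; unknown slugs fall back to the slug.
    """
    def _sub(match):
        slug = match.group(1).strip()
        text = titles.get(slug, slug)
        return f"[{text}](artifact/{kb_id}/{slug})"

    return _RAW_LINK.sub(_sub, raw_md)


def collect_outlinks(raw_md):
    """Ordered, de-duplicated slugs linked from a raw page body."""
    slugs = [m.strip() for m in _RAW_LINK.findall(raw_md)]
    return list(dict.fromkeys(slugs))


def accept_merge(existing, merged, min_ratio=0.7):
    """Assumed shrink check: keep the merge only if it did not lose too much text."""
    return len(merged) >= min_ratio * len(existing)
```

Keeping the raw form next to the rendered one means a later UPDATE can merge against [[slug]] markers without having to reverse the rendered links.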

Task-executor wiring

task_handler.py::TaskHandler._artifact_compilation chains the four phases under parser_config["toc_extraction"] for now. The call runs after chunk insertion so REFINE's chunk-by-id lookup resolves the source rows in ES. _artifact_load_chunks_by_id falls back to per-id docStoreConn.get() for whatever the batch search misses, with diagnostic warnings so future cross-backend filter quirks are debuggable.
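The batch-then-per-id fallback can be pictured with the sketch below. The store interface (`search_by_ids`, `get`) and the `FakeStore` class are hypothetical stand-ins invented for the example; the real code goes through docStoreConn and its own search helpers.

```python
import logging


class FakeStore:
    """In-memory stand-in for a doc store whose batch search can miss rows."""

    def __init__(self, rows, hidden_from_search=()):
        self._rows = {r["id"]: r for r in rows}
        self._hidden = set(hidden_from_search)

    def search_by_ids(self, ids):
        return [self._rows[i] for i in ids if i in self._rows and i not in self._hidden]

    def get(self, chunk_id):
        return self._rows.get(chunk_id)


def load_chunks_by_id(store, chunk_ids):
    """Batch search first, then fetch whatever the batch missed one id at a time."""
    found = {row["id"]: row for row in store.search_by_ids(chunk_ids)}
    missing = [cid for cid in chunk_ids if cid not in found]
    if missing:
        logging.warning("batch search missed %d of %d chunk ids", len(missing), len(chunk_ids))
    for cid in missing:
        row = store.get(cid)
        if row is None:
            logging.warning("chunk %s not found by per-id get either", cid)
            continue
        found[cid] = row
    # Preserve the caller's ordering and drop ids that never resolved.
    return [found[cid] for cid in chunk_ids if cid in found]
```

The warnings are the useful part: a non-zero "missed" count with successful per-id gets points at a filter quirk in the batch query rather than at missing data.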

Test plan

  • Naive parser with parser_config["toc_extraction"]=true runs end-to-end on a single doc; artifact_page rows are produced with non-empty source_chunk_ids and artifact_doc_id_kwd.
  • Ingesting a second doc into the same KB produces additional rows; entities common to both docs end up on a single artifact_page whose source_doc_ids lists both contributing docs.
  • Clicking an [text](artifact/{kb_id}/{slug}) link in a rendered page resolves to another artifact_page row in ES via (artifact_kb_id_kwd, artifact_slug_kwd).
  • compile_structure_from_text (list/set/hypergraph) still passes the existing extractor tests, i.e. the _common extraction didn't change behaviour.
  • No regressions to merge_compiled_structures (deliberately not migrated to bulk_dedup_items because the algorithm differs; left in place).

Caveats

  • Existing ES data persisted under the old wiki_* compile_kwd values is orphaned. _artifact_compilation deletes the five old kwds at the top of each run so the next invocation produces fresh, artifact-keyed data, but any front-end that intercepts the old link prefix needs the path migration too (wiki/{kb}/{slug} → artifact/{kb}/{slug}).
  • This PR is intentionally additive plus a rename; it does not touch compile_structure_from_text's public surface beyond having it consume the shared _common helpers.

🤖 Generated with Claude Code

KevinHuSh and others added 18 commits May 20, 2026 15:55
ASYNC109 flags async functions that accept a `timeout` parameter
(callers should compose with `asyncio.timeout`/`wait_for` instead).
Match the existing convention in agent/tools/base.py and rename
`FunctionToolSession.tool_call_async`'s `timeout` to `request_timeout`.
The sync `tool_call` wrapper keeps `timeout` (ASYNC109 only applies
to async defs) and forwards via keyword.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
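As an illustration of the ASYNC109 rename described in this commit, the sketch below shows an async function that takes `request_timeout` and a sync wrapper that keeps `timeout` and forwards it by keyword. The function bodies and the `_do_call` helper are hypothetical; only the parameter naming pattern comes from the commit message.

```python
import asyncio


async def _do_call(name):
    """Hypothetical stand-in for the real tool invocation."""
    await asyncio.sleep(0)
    return f"{name}:ok"


async def tool_call_async(name, request_timeout=10.0):
    # The async def avoids a parameter literally named `timeout`,
    # which is what ASYNC109 flags.
    return await asyncio.wait_for(_do_call(name), timeout=request_timeout)


def tool_call(name, timeout=10.0):
    # Sync wrapper: ASYNC109 does not apply here, so `timeout` stays
    # and is forwarded by keyword to the renamed parameter.
    return asyncio.run(tool_call_async(name, request_timeout=timeout))
```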
Adds a configurable structured-knowledge extraction pipeline driven by the
YAML schema delivered through document.parser_config (mirrors the
Hyper-Extract template convention: type, output, guideline, identifiers).

Key pieces in rag/prompts/generator.py:
- compile_structure_from_text(chunks, parser_config, chat_mdl, embd_mdl,
  doc_id, language, callback, max_workers) — the public entry point.
- Prompt builders for list/set (single-stage) and hypergraph (two-stage
  node-first then edge with known-entity context), patterned after
  hyperextract/utils/template_engine/parsers/guideline.py but without any
  langchain dependency. Multilingual str|list|dict values are localized
  inline, and observation_time is substituted into time rules.
- Per-batch sequential node→edge ordering; batches themselves run in
  parallel under an asyncio.Semaphore (max_workers, default 10) with the
  same cancel-on-error gather pattern used by run_toc_from_text.
- split_chunks packs chunk texts to (chat_mdl.max_length *
  INPUT_UTILIZATION - prompt_overhead) tokens per batch.
- Extraction goes through gen_json (no Pydantic / structured output
  required). Embeddings go through LLMBundle.encode via thread_pool_exec.
- Cross-chunk merging is intentionally not implemented; deferred to a
  later pipeline stage.
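The batch budget formula above can be illustrated with a greedy packer. This is a sketch under stated assumptions: `pack_chunks` is a hypothetical name, the `utilization` default of 0.5 is a placeholder for INPUT_UTILIZATION (whose real value is not given here), and whitespace splitting stands in for the model tokenizer.

```python
def pack_chunks(texts, max_length, prompt_overhead, utilization=0.5,
                count_tokens=lambda t: len(t.split())):
    """Greedily pack texts into batches that fit the per-batch token budget.

    budget = max_length * utilization - prompt_overhead
    """
    budget = int(max_length * utilization) - prompt_overhead
    if budget <= 0:
        raise ValueError("prompt overhead leaves no room for chunk text")
    batches, current, used = [], [], 0
    for text in texts:
        n = count_tokens(text)
        # Start a new batch when adding this text would exceed the budget.
        # An oversized single text still gets its own batch.
        if current and used + n > budget:
            batches.append(current)
            current, used = [], 0
        current.append(text)
        used += n
    if current:
        batches.append(current)
    return batches
```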

ES doc shape emitted per item:
  content_with_weight, compile_kwd (list|set|hypergraph),
  structure_kwd (entity|relation), doc_id, chunk_ids,
  content_ltks/content_sm_ltks (from payload.description),
  title_tks (concat of tokenized non-description fields),
  q_<dim>_vec, src_name_kwd/target_name_kwd (relations, when resolvable
  via identifiers.relation_members or default source/target fields),
  id = xxh64(content_with_weight + doc_id).

rag/svr/task_executor.py wires the new function in via a
knowledge_compilation() helper next to build_TOC(). The call site is
left commented out behind the existing toc_extraction flag — activation
will be wired through its own dedicated flag in a follow-up.
….flow.extractor

- Move compile_structure_from_text and the supporting _struct_* helpers
  from rag/prompts/generator.py into rag/flow/extractor/extractor.py so
  the structured-knowledge pipeline lives alongside its caller. The
  generator module now re-exports nothing new; existing run_toc_from_text /
  gen_json / split_chunks / INPUT_UTILIZATION are imported from there.

- Add merge_compiled_structures(docs, chat_mdl, embd_mdl, tenant_id, kb_id,
  similarity_threshold=0.9) which is meant to run on the docs returned by
  compile_structure_from_text before they are written to ES:
    Phase 1 (local dedup): group docs by (doc_id, compile_kwd,
      src_name_kwd?, target_name_kwd?), compute pairwise cosine similarity
      over q_<dim>_vec via sklearn.metrics.pairwise.cosine_similarity, and
      for each pair above the threshold ask the LLM via _struct_merge_pair
      whether they are the same logical entity/relation. On a duplicate
      verdict the surviving entry is rebuilt from the merged payload:
      chunk_ids are unioned, the description is re-embedded, src/target
      are forced back to the existing payload's values for relations, and
      the kept entry's id and identity kwds are preserved.
    Phase 2 (ES dedup): for each surviving doc, build the same filter
      condition and KNN-search ES via MatchDenseExpr (topn=1, similarity >=
      threshold). On a hit + duplicate verdict, update the existing ES doc
      by its old id via settings.docStoreConn.update; otherwise insert via
      settings.docStoreConn.insert. Returns
      {"inserted": N, "updated": M, "duplicates_dropped": K}.

- Merge prompts: MERGE_SYSTEM_PROMPT and MERGE_USER_PROMPT are kept
  verbatim; a small MERGE_DECISION_INSTRUCTION is appended so the LLM also
  emits {"duplicated": bool, "merged": <json|null>} via gen_json,
  preserving the user-supplied prompts untouched.

- task_executor.py: import compile_structure_from_text and the new
  merge_compiled_structures from rag.flow.extractor.extractor (no longer
  rag.prompts.generator). knowledge_compilation now reads the structure
  YAML from parser_config["knowledge_compilation"], runs the compile step,
  and then calls merge_compiled_structures. The previously dead toc-style
  branch is replaced by a dedicated kc_thread gated on
  parser_config["knowledge_compilation"] so the new pipeline is opt-in
  via its own flag.
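The rebuild rules for a duplicate verdict (union chunk_ids, keep the existing id, force src/target back for relations) might look like the sketch below. The doc shape with a `payload` dict and `source`/`target` keys is an assumption for illustration; re-embedding the merged description is left out.

```python
def apply_merge_verdict(existing, incoming, verdict):
    """Return the rebuilt surviving entry, or None when the pair is not a duplicate.

    `verdict` follows the {"duplicated": bool, "merged": <json|null>} shape.
    """
    if not verdict.get("duplicated"):
        return None
    merged = dict(verdict.get("merged") or existing["payload"])
    if existing.get("structure_kwd") == "relation":
        # Relations keep the endpoints of the entry that survives.
        for key in ("source", "target"):
            if key in existing["payload"]:
                merged[key] = existing["payload"][key]
    survivor = dict(existing)  # keeps id and identity kwds
    survivor["payload"] = merged
    # Ordered union of chunk ids, existing entry first.
    survivor["chunk_ids"] = list(dict.fromkeys(existing["chunk_ids"] + incoming["chunk_ids"]))
    return survivor
```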
…res into rag.advanced_rag.knowlege_compile.structure

The list/set/hypergraph compilation pipeline and its local-plus-ES
deduplicator no longer live alongside the workflow Extractor component
in rag/flow/extractor/extractor.py. They are now in a dedicated package:

  rag/advanced_rag/knowlege_compile/
    __init__.py      — re-exports the two public entry points
    structure.py     — all _struct_* helpers, MERGE_* prompts,
                       compile_structure_from_text, and
                       merge_compiled_structures

No behavior changes: helpers, prompts, and function signatures move
verbatim. Two existing call sites are updated:
  - rag/flow/extractor/extractor.py now imports the two entry points
    from rag.advanced_rag.knowlege_compile.structure; only the
    Extractor / ExtractorParam classes and run_toc_from_text remain in
    this file.
  - rag/svr/task_executor.py's knowledge_compilation helper imports
    from the new package as well.
Adds rag/advanced_rag/knowlege_compile/_common.py and migrates both
pipelines (compile_structure + the four-phase MRP pipeline) onto a small
set of shared utilities and engines:

- ID minting (stable_row_id), tokenize_for_search, ordered union,
  token-budget calculator (make_input_budget), defensive LLMBundle
  unwrap (ensure_llm_bundle), encode wrapper, ES-IO wrappers
  (es_search / es_insert / es_delete / es_upsert_one).
- build_chunk_batches + run_chunked_pipeline: shared chunked-LLM
  scaffold (filter empties + resume + pack via split_chunks + parallel
  asyncio.gather under a semaphore). Used by both MAP entry points;
  the per-batch LLM call shape stays in each pipeline via a callback.
- bulk_dedup_items: exact + embedding + LLM-disambiguation dedup with
  parameterised name/type keys, optional aggregate_extra callback.
  Replaces the artifact pipeline's _wiki_exact_dedup_entities /
  _wiki_exact_dedup_concepts / _wiki_embedding_dedup_entities /
  _wiki_resolve_ambiguous_entities / _wiki_apply_merges helpers.

structure.py and the (formerly) wiki pipeline both consume those
helpers; the duplicated plumbing they used to carry shrinks by ~300
lines across the two files.

Renames the wiki module to "artifact" with case-preserving edits
(wiki -> artifact, Wiki -> Artifact, WIKI -> ARTIFACT) across:

- rag/advanced_rag/knowlege_compile/wiki.py -> artifact.py (git mv;
  history follows the file)
- rag/advanced_rag/knowlege_compile/_common.py (docstrings/comments)
- rag/advanced_rag/knowlege_compile/__init__.py (re-exports)
- rag/svr/task_executor_refactor/task_handler.py (import + method
  name + log strings)

Public entry points: artifact_map_from_chunks,
artifact_reduce_from_extracts, artifact_plan_from_reduction,
artifact_refine_from_plan. Constants: ARTIFACT_*_COMPILE_KWD.
compile_kwd string values, keyword-field names (artifact_slug_kwd,
artifact_kb_id_kwd, ...), and the clickable-link prefix
"artifact/{kb_id}/{slug}" are renamed in step. Existing rows persisted
under the old "wiki_*" compile_kwd values are no longer reachable from
the new code path; the task-handler's artifact_compilation entry
deletes the old kwds at the top of each run so the next pipeline
invocation produces fresh, artifact-keyed data.

Additional fixes folded in:

- Per-batch positional chunk labels (C1, C2, ...) and a known-id
  scrubber on chunk bodies so the extraction LLM does not surface
  chunk-id hashes as entity names.
- Stronger claim-extraction rules in the MAP prompt, plus an
  entity/concept fallback for evidence so source_chunk_ids /
  source_doc_ids populate even when MAP produces no claims.
- Synthetic fallback evidence stubs marked with _synthetic so they
  carry chunk_ids forward without appearing in the writer prompt.
- Pages-spec dedup in REFINE so duplicate slugs from the planner do
  not multiply writer calls or bloat the "Available pages" list.
- DEFAULT_ARTIFACT_PLAN_TIMEOUT raised to 600s for reasoning models.
- _ensure_llm_bundle defensive unwrap at the entry of REFINE (and now
  helpful when the same misuse hits REDUCE/PLAN; the centralised
  encode keeps tuple-shaped misuse from crashing deep in the stack).
- artifact_compilation in task_handler.py moved to AFTER chunk
  insertion so REFINE's chunk-by-id lookup actually finds the source
  rows in the doc store.
- _wiki_load_chunks_by_id falls back to per-id docStoreConn.get for
  whatever the batch search misses, with diagnostic warnings so future
  cross-backend filter quirks are debuggable.
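The positional-label and id-scrubbing fix from the list above could be sketched like this. The function name, the `[C1]` rendering, and the chunk dict shape are assumptions; the idea is only that the prompt shows short per-batch labels and that known chunk-id strings are removed from bodies before the LLM sees them.

```python
def label_chunks(chunks):
    """Build prompt text with per-batch labels C1, C2, ... and a label -> id map."""
    known_ids = [c["id"] for c in chunks]
    label_to_id = {}
    parts = []
    for n, chunk in enumerate(chunks, start=1):
        label = f"C{n}"
        label_to_id[label] = chunk["id"]
        body = chunk["text"]
        for cid in known_ids:
            # Scrub any known chunk id that leaked into the text.
            body = body.replace(cid, "")
        parts.append(f"[{label}] {body.strip()}")
    return "\n".join(parts), label_to_id
```

The returned map lets the caller translate the labels the LLM cites back into real chunk ids after extraction.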

MAP entity & relation schemas plus rule sections are now driven by
parser_config (the same YAML shape compile_structure_from_text
accepts); source_chunk_id is always appended so chunk attribution
survives whatever the user defines.
@dosubot dosubot Bot added the size:XXL, 🌈 python, and 💞 feature labels Jun 5, 2026
@coderabbitai
Contributor

coderabbitai Bot commented Jun 5, 2026

Walkthrough

The PR introduces a complete structured knowledge extraction pipeline for automatically mining and deduplicating entities, lists, sets, and graph relationships from documents via LLM-driven extraction, embedding-based similarity matching, and optional disambiguation. This spans utility infrastructure, extraction engines, per-field integration, and KB-wide orchestration with robust error handling throughout.

Changes

Structured Knowledge Extraction Pipeline

| Layer | File(s) | Summary |
| --- | --- | --- |
| Common utilities and infrastructure | rag/advanced_rag/knowlege_compile/_common.py | Shared helpers for the pipeline: stable ID minting, async embedding, tokenization, prompt budgeting, LLMBundle validation, ES I/O wrappers (es_search, es_insert, es_delete, es_upsert_one), chunk batching with token packing (build_chunk_batches), async pipeline execution (run_chunked_pipeline), and three-phase bulk dedup combining exact key matching, embedding-based cosine similarity classification, and LLM-based pair disambiguation with union-find merging. |
| Package API re-exports | rag/advanced_rag/knowlege_compile/__init__.py | Exposes knowledge compilation entry points (compile_structure_from_text, merge_compiled_structures) and artifact pipeline functions (artifact_map_from_chunks, artifact_plan_from_reduction, artifact_reduce_from_extracts, artifact_refine_from_plan) alongside keyword constants. |
| Structured extraction and merge engine | rag/advanced_rag/knowlege_compile/structure.py | Core extraction module that builds localized prompts from parser_config, batches chunks, sequentially runs hypergraph node extraction then relation extraction per batch, embeds payloads, and generates ES-ready documents. compile_structure_from_text handles the extraction phase; merge_compiled_structures performs two-phase dedup: local in-memory dedup via cosine similarity + LLM merge decisions (with relation field preservation and source_id union), followed by ES-level dedup using KNN search and selective document updates. Includes robust error handling that falls back to insert/skip behaviors on extraction, embedding, or ES operation failures. |
| Per-field extractor integration | rag/flow/extractor/extractor.py | ExtractorParam gains a knowledge_compilation config field. Extractor._knowledge_compile(docs) constructs an embedding bundle, sorts docs by page/top, runs extraction and merge pipelines. Extractor._invoke routes field_name in ["set", "list", "graph"] through knowledge compilation with deterministic IDs and early return; other fields fall through to the existing per-item generation behavior. |
| Task handler artifact compilation | rag/svr/task_executor_refactor/task_handler.py | Integrates KB-wide artifact compilation: toc_thread is now initialized unconditionally; after chunk insertion, _artifact_compilation(...) is conditionally invoked (gated on naive parser and TOC extraction enabled) to run the 4-phase MAP→REDUCE→PLAN→REFINE artifact pipeline via imported helpers, with ES cleanup for prior compile markers and progress logging. Compilation failures are caught and logged without aborting the task. |
| Supporting improvements | api/db/services/dialog_service.py, rag/prompts/generator.py, rag/svr/task_executor.py | Chat empty response now emits an additional non-final intermediate result before the final yield; multi_queries_gen returns explicit {} fallback on exception; minor whitespace/logging cleanup. |

Sequence Diagram

sequenceDiagram
  participant TaskHandler
  participant Compile as compile_structure_from_text
  participant Extract as LLM<br/>Extraction
  participant Embed as Embedding
  participant MergeS as merge_compiled_structures
  participant ESDedup as ES Dedup
  participant ES as Elasticsearch
  
  TaskHandler->>Compile: sorted chunks + parser_config
  Compile->>Extract: per-batch node/relation extraction
  Extract->>Embed: aggregated payloads
  Embed->>Compile: vectors + tokens
  Compile->>MergeS: local dedup candidates
  MergeS->>MergeS: cosine similarity + LLM verdict
  MergeS->>ESDedup: surviving docs
  ESDedup->>ES: KNN search filtered by compile_kwd
  ES->>ESDedup: matching candidates
  ESDedup->>ES: update or insert with merged fields
  ESDedup->>TaskHandler: final artifact pages

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • infiniflow/ragflow#15154: Both PRs modify TaskHandler orchestration; this PR adds a new post-chunk artifact compilation step while the related PR introduces the overall TaskHandler refactoring structure.

Suggested labels

🌈 python, feature: knowledge-extraction

Suggested reviewers

  • yuzhichang
  • yingfeng

Poem

🐰 In documents deep, where entities sleep,
We gather and blend what knowledge we reap,
With vectors and logic, we settle the score—
No duplicate names shall confuse us no more! ✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 52.38% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title accurately reflects the main change: introducing an artifact compilation pipeline and consolidating shared knowledge compilation engines. |
| Description check | ✅ Passed | The description is comprehensive and well-structured, covering problem statement, pipeline design, shared utilities, task wiring, testing approach, and caveats, aligning with template requirements. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |


Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 4

🧹 Nitpick comments (2)
rag/advanced_rag/knowlege_compile/__init__.py (1)

17-43: ⚡ Quick win

Consider adding a module-level docstring.

This package exposes a public API for structured knowledge extraction and artifact compilation. A module-level docstring would help users understand the package's purpose and the relationship between the exported functions (e.g., the MAP→REDUCE→PLAN→REFINE pipeline flow).

📝 Suggested docstring
 #  limitations under the License.
 #
+"""Structured knowledge extraction and artifact compilation.
+
+This package provides two primary workflows:
+
+1. **Field-level extraction** (via ``compile_structure_from_text`` and
+   ``merge_compiled_structures``): Extract and deduplicate structured data
+   (lists, sets, hypergraphs) from document chunks.
+
+2. **KB-wide artifact compilation** (4-phase pipeline):
+   - MAP (``artifact_map_from_chunks``): Per-chunk entity/relation extraction
+   - REDUCE (``artifact_reduce_from_extracts``): KB-scoped deduplication
+   - PLAN (``artifact_plan_from_reduction``): Generate artifact pages outline
+   - REFINE (``artifact_refine_from_plan``): Produce searchable artifact pages
+"""
 
 from .structure import compile_structure_from_text, merge_compiled_structures
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rag/advanced_rag/knowlege_compile/__init__.py` around lines 17 - 43, Add a
clear module-level docstring at the top of this package explaining its purpose
(structured knowledge extraction and artifact compilation) and the typical
MAP→REDUCE→PLAN→REFINE pipeline flow; mention the primary API symbols to orient
users: compile_structure_from_text, merge_compiled_structures,
artifact_map_from_chunks, artifact_reduce_from_extracts,
artifact_plan_from_reduction, artifact_refine_from_plan and the ARTIFACT_*
compile keyword constants (ARTIFACT_MAP_COMPILE_KWD,
ARTIFACT_REDUCE_COMPILE_KWD, ARTIFACT_PLAN_COMPILE_KWD,
ARTIFACT_PAGE_COMPILE_KWD, ARTIFACT_DRAFT_COMPILE_KWD); keep it short (2–4
sentences) and place it as the first statement in the module so tools and IDEs
surface it.
api/db/services/dialog_service.py (1)

756-760: ⚡ Quick win

Add logging for the new empty_response streaming flow.

The empty_response path now emits an intermediate non-final yield (line 758) before the final yield. Per coding guidelines, new flows in **/*.py should include logging. Consider adding a debug or info log when entering this path to aid troubleshooting.

As per coding guidelines: "**/*.py: Add logging for new flows".

📝 Suggested logging addition
 if not knowledges and prompt_config.get("empty_response"):
     empty_res = prompt_config["empty_response"]
+    logging.debug("Emitting empty_response for query '%s': no knowledges retrieved", " ".join(questions))
     yield {"answer": empty_res, "reference": {}, "prompt": "", "audio_binary": None, "final": False}
     yield {"answer": empty_res, "reference": kbinfos, "prompt": "\n\n### Query:\n%s" % " ".join(questions), "audio_binary": tts(tts_mdl, empty_res), "final": True}
     return
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@api/db/services/dialog_service.py` around lines 756 - 760, Add a log entry
when the empty_response streaming branch is taken: inside the block that checks
"if not knowledges and prompt_config.get('empty_response')" (where you yield the
intermediate and final empty_responses and call tts(tts_mdl,...)), call the
module logger (e.g., logger.debug or logger.info) to record that the
empty_response flow was entered and include contextual data such as
prompt_config keys, questions, and kbinfos identifiers; place the log before the
first yield so the event is recorded for troubleshooting and follow existing
logging conventions used in dialog_service.py.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@rag/advanced_rag/knowlege_compile/structure.py`:
- Around line 551-589: The code calls _build_chunk_batches and
_run_chunked_pipeline in compile_structure but they are not imported, causing a
NameError; add imports for _build_chunk_batches and _run_chunked_pipeline from
the module that exports them (the same _common where encode, find_vec_field,
stable_row_id, tokenize_for_search, union_ordered are imported) so the functions
used in this file (references: _build_chunk_batches and _run_chunked_pipeline)
are available at runtime.

In `@rag/flow/extractor/extractor.py`:
- Around line 81-98: The async method _knowledge_compile incorrectly calls
merge_compiled_structures without awaiting it and also sends a misleading
callback message; change the callback text in _knowledge_compile to reflect
"Start knowledge compilation ..." and add await before
merge_compiled_structures(...) so the coroutine runs (ensure you keep existing
args: docs, self.chat_mdl, embedding_model, self._canvas.get_tenant_id(),
DocumentService.get_knowledgebase_id(self._canvas._doc_id)); verify
compile_structure_from_text is still awaited as-is.

In `@rag/svr/task_executor_refactor/task_handler.py`:
- Around line 575-581: The code is deleting the same five compile_kwd entries
twice; remove the duplicate set so each marker is deleted only once—either
delete the explicit five individual calls or remove the for-loop that repeats
them. Locate the block using settings.docStoreConn.delete and
search.index_name(ctx.tenant_id) (referencing ctx.kb_id and the kwd values
"artifact_map_extract", "artifact_reduce_result", "artifact_compilation_plan",
"artifact_page_draft", "artifact_page") and keep just a single deletion
implementation (preferably the concise for kwd in (...) loop) and remove the
other set.
- Around line 650-654: Inside the async method _artifact_compilation, don't call
asyncio.run(_run()) (which raises RuntimeError when the event loop is already
running); instead await the coroutine returned by _run() (i.e., pages = await
_run()), and keep the existing try/except around that await so the
logging.exception("artifact_compilation: pipeline failed for doc %s",
ctx.doc_id) path still runs on errors—update the call site in
_artifact_compilation to use await _run() and preserve error handling.
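The asyncio.run point above is easy to reproduce in isolation. The sketch below uses hypothetical `_run`, `broken`, and `fixed` coroutines to show that calling asyncio.run from inside a running event loop raises RuntimeError, while awaiting the coroutine directly works.

```python
import asyncio


async def _run():
    """Stand-in for the inner pipeline coroutine."""
    return ["page"]


async def broken():
    coro = _run()
    try:
        # We are already inside a running loop, so this raises RuntimeError.
        asyncio.run(coro)
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return str(exc)
    return None


async def fixed():
    # Inside an async function, just await the coroutine.
    return await _run()
```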

---

Nitpick comments:
In `@api/db/services/dialog_service.py`:
- Around line 756-760: Add a log entry when the empty_response streaming branch
is taken: inside the block that checks "if not knowledges and
prompt_config.get('empty_response')" (where you yield the intermediate and final
empty_responses and call tts(tts_mdl,...)), call the module logger (e.g.,
logger.debug or logger.info) to record that the empty_response flow was entered
and include contextual data such as prompt_config keys, questions, and kbinfos
identifiers; place the log before the first yield so the event is recorded for
troubleshooting and follow existing logging conventions used in
dialog_service.py.

In `@rag/advanced_rag/knowlege_compile/__init__.py`:
- Around line 17-43: Add a clear module-level docstring at the top of this
package explaining its purpose (structured knowledge extraction and artifact
compilation) and the typical MAP→REDUCE→PLAN→REFINE pipeline flow; mention the
primary API symbols to orient users: compile_structure_from_text,
merge_compiled_structures, artifact_map_from_chunks,
artifact_reduce_from_extracts, artifact_plan_from_reduction,
artifact_refine_from_plan and the ARTIFACT_* compile keyword constants
(ARTIFACT_MAP_COMPILE_KWD, ARTIFACT_REDUCE_COMPILE_KWD,
ARTIFACT_PLAN_COMPILE_KWD, ARTIFACT_PAGE_COMPILE_KWD,
ARTIFACT_DRAFT_COMPILE_KWD); keep it short (2–4 sentences) and place it as the
first statement in the module so tools and IDEs surface it.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 39e42760-70bc-4298-9095-0d0a88e66e57

📥 Commits

Reviewing files that changed from the base of the PR and between f78ef32 and 1aa3cee.

📒 Files selected for processing (9)
  • api/db/services/dialog_service.py
  • rag/advanced_rag/knowlege_compile/__init__.py
  • rag/advanced_rag/knowlege_compile/_common.py
  • rag/advanced_rag/knowlege_compile/artifact.py
  • rag/advanced_rag/knowlege_compile/structure.py
  • rag/flow/extractor/extractor.py
  • rag/prompts/generator.py
  • rag/svr/task_executor.py
  • rag/svr/task_executor_refactor/task_handler.py

Comment on lines +551 to +589
    packed_batches, _info = _build_chunk_batches(
        chunks,
        chat_mdl,
        prompt_overhead_tokens=prompt_overhead,
    )
    if not packed_batches:
        return []

    async def _process_one(batch: list[dict], bi: int, total: int) -> list[dict]:
        # The engine's semaphore already bounds concurrency.
        return await _struct_process_batch(
            packed=batch,
            batch_idx=bi,
            total=total,
            autotype=autotype,
            parser_config=parser_config,
            chat_mdl=chat_mdl,
            embd_mdl=embd_mdl,
            doc_id=doc_id,
            language=language,
            callback=callback,
            semaphore=None,
        )

    def _flatten(per_batch: list) -> list[dict]:
        out: list[dict] = []
        for br in (per_batch or []):
            if br:
                out.extend(br)
        return out

    return await _run_chunked_pipeline(
        packed_batches,
        process_batch=_process_one,
        aggregate=_flatten,
        max_workers=max_workers,
        callback=callback,
        log_prefix="compile_structure",
    )
Contributor


⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Missing imports will cause NameError at runtime.

The function calls _build_chunk_batches (line 551) and _run_chunked_pipeline (line 582), but these are not imported from _common. The current imports (lines 70-76) only include encode, find_vec_field, stable_row_id, tokenize_for_search, and union_ordered.

🐛 Proposed fix: Add missing imports
 from ._common import (
+    build_chunk_batches as _build_chunk_batches,
     encode as _encode,
     find_vec_field as _find_vec_field,
+    run_chunked_pipeline as _run_chunked_pipeline,
     stable_row_id as _stable_row_id,
     tokenize_for_search as _tokenize_for_search,
     union_ordered as _union_ordered,
 )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rag/advanced_rag/knowlege_compile/structure.py` around lines 551 - 589, The
code calls _build_chunk_batches and _run_chunked_pipeline in compile_structure
but they are not imported, causing a NameError; add imports for
_build_chunk_batches and _run_chunked_pipeline from the module that exports them
(the same _common where encode, find_vec_field, stable_row_id,
tokenize_for_search, union_ordered are imported) so the functions used in this
file (references: _build_chunk_batches and _run_chunked_pipeline) are available
at runtime.

Comment on lines +81 to +98
    async def _knowledge_compile(self, docs):
        embedding_model = LLMBundle(self._canvas.get_tenant_id(), LLMType.EMBEDDING,
                            max_retries=self._param.max_retries,
                            retry_interval=self._param.delay_after_error)
        self.callback(0.2,message="Start to generate table of content ...")
        docs = sorted(docs, key=lambda d:(
            d.get("page_num_int", 0)[0] if isinstance(d.get("page_num_int", 0), list) else d.get("page_num_int", 0),
            d.get("top_int", 0)[0] if isinstance(d.get("top_int", 0), list) else d.get("top_int", 0)
        ))
        docs = await compile_structure_from_text(docs,
                                                 self._param.knowledge_compilation,
                                                 self.chat_mdl, embedding_model,
                                                 self._canvas._doc_id)
        info = merge_compiled_structures(docs, self.chat_mdl,
                                         embedding_model,
                                         self._canvas.get_tenant_id(),
                                         DocumentService.get_knowledgebase_id(self._canvas._doc_id))
        return info

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Missing await on async function call causes merge to never execute.

merge_compiled_structures is an async function, but line 94 does not await it. This returns a coroutine object instead of executing the merge, meaning deduplication will silently be skipped.

Also, the callback message on line 85 says "table of content" but should reflect knowledge compilation.

🐛 Proposed fix
     async def _knowledge_compile(self, docs):
         embedding_model = LLMBundle(self._canvas.get_tenant_id(), LLMType.EMBEDDING,
                             max_retries=self._param.max_retries,
                             retry_interval=self._param.delay_after_error)
-        self.callback(0.2,message="Start to generate table of content ...")
+        self.callback(0.2, message="Start knowledge compilation...")
         docs = sorted(docs, key=lambda d:(
             d.get("page_num_int", 0)[0] if isinstance(d.get("page_num_int", 0), list) else d.get("page_num_int", 0),
             d.get("top_int", 0)[0] if isinstance(d.get("top_int", 0), list) else d.get("top_int", 0)
         ))
         docs = await compile_structure_from_text(docs, 
                     self._param.knowledge_compilation, 
                     self.chat_mdl, embedding_model, 
                     self._canvas._doc_id)
-        info = merge_compiled_structures(docs, self.chat_mdl, 
+        info = await merge_compiled_structures(docs, self.chat_mdl, 
                                   embedding_model, 
                                   self._canvas.get_tenant_id(), 
                                   DocumentService.get_knowledgebase_id(self._canvas._doc_id))
         return info
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rag/flow/extractor/extractor.py` around lines 81 - 98, The async method
_knowledge_compile incorrectly calls merge_compiled_structures without awaiting
it and also sends a misleading callback message; change the callback text in
_knowledge_compile to reflect "Start knowledge compilation ..." and add await
before merge_compiled_structures(...) so the coroutine runs (ensure you keep
existing args: docs, self.chat_mdl, embedding_model,
self._canvas.get_tenant_id(),
DocumentService.get_knowledgebase_id(self._canvas._doc_id)); verify
compile_structure_from_text is still awaited as-is.
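
The difference between calling and awaiting a coroutine function can be sketched as follows. `merge_demo` is a hypothetical stand-in for an async merge step and does not reproduce the signature or behavior of `merge_compiled_structures`.

```python
import asyncio


async def merge_demo(items):
    # Hypothetical async merge: drop duplicates while keeping order.
    await asyncio.sleep(0)
    return list(dict.fromkeys(items))


async def without_await(items):
    result = merge_demo(items)  # bug: only creates a coroutine object
    is_coroutine = asyncio.iscoroutine(result)
    result.close()  # close it so no "never awaited" warning is emitted
    return is_coroutine


async def with_await(items):
    return await merge_demo(items)  # the merge body actually runs


forgot = asyncio.run(without_await(["a", "b", "a"]))
merged = asyncio.run(with_await(["a", "b", "a"]))
print(forgot, merged)
```

Without the `await`, the caller receives a coroutine object and the merge body never executes.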

Comment on lines +575 to +581
        settings.docStoreConn.delete({"compile_kwd": "artifact_map_extract"}, search.index_name(ctx.tenant_id), ctx.kb_id)
        settings.docStoreConn.delete({"compile_kwd": "artifact_reduce_result"}, search.index_name(ctx.tenant_id), ctx.kb_id)
        settings.docStoreConn.delete({"compile_kwd": "artifact_compilation_plan"}, search.index_name(ctx.tenant_id), ctx.kb_id)
        settings.docStoreConn.delete({"compile_kwd": "artifact_page_draft"}, search.index_name(ctx.tenant_id), ctx.kb_id)
        settings.docStoreConn.delete({"compile_kwd": "artifact_page"}, search.index_name(ctx.tenant_id), ctx.kb_id)
        for kwd in ("artifact_map_extract", "artifact_reduce_result", "artifact_compilation_plan", "artifact_page_draft", "artifact_page"):
            settings.docStoreConn.delete({"compile_kwd": kwd}, search.index_name(ctx.tenant_id), ctx.kb_id)

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Duplicate ES deletions waste I/O.

Lines 575-579 delete records for 5 compile_kwd markers, then the loop on lines 580-581 immediately deletes the exact same 5 markers again. Remove one set of deletions.

♻️ Proposed fix: Remove duplicate deletion
-        settings.docStoreConn.delete({"compile_kwd": "artifact_map_extract"}, search.index_name(ctx.tenant_id), ctx.kb_id)
-        settings.docStoreConn.delete({"compile_kwd": "artifact_reduce_result"}, search.index_name(ctx.tenant_id), ctx.kb_id)
-        settings.docStoreConn.delete({"compile_kwd": "artifact_compilation_plan"}, search.index_name(ctx.tenant_id), ctx.kb_id)
-        settings.docStoreConn.delete({"compile_kwd": "artifact_page_draft"}, search.index_name(ctx.tenant_id), ctx.kb_id)
-        settings.docStoreConn.delete({"compile_kwd": "artifact_page"}, search.index_name(ctx.tenant_id), ctx.kb_id)
         for kwd in ("artifact_map_extract", "artifact_reduce_result", "artifact_compilation_plan", "artifact_page_draft", "artifact_page"):
             settings.docStoreConn.delete({"compile_kwd": kwd}, search.index_name(ctx.tenant_id), ctx.kb_id)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rag/svr/task_executor_refactor/task_handler.py` around lines 575 - 581, The
code is deleting the same five compile_kwd entries twice; remove the duplicate
set so each marker is deleted only once—either delete the explicit five
individual calls or remove the for-loop that repeats them. Locate the block
using settings.docStoreConn.delete and search.index_name(ctx.tenant_id)
(referencing ctx.kb_id and the kwd values "artifact_map_extract",
"artifact_reduce_result", "artifact_compilation_plan", "artifact_page_draft",
"artifact_page") and keep just a single deletion implementation (preferably the
concise for kwd in (...) loop) and remove the other set.
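
A minimal sketch of the single-pass cleanup, with a call counter to confirm each marker is deleted once. `FakeDocStore`, `ARTIFACT_MARKERS`, and `purge_artifacts` are hypothetical names for illustration; the real `settings.docStoreConn` is not modelled beyond the three-argument `delete` call shown in the diff.

```python
class FakeDocStore:
    """Hypothetical in-memory stand-in that records delete calls."""

    def __init__(self):
        self.calls = []

    def delete(self, condition, index_name, kb_id):
        self.calls.append((condition["compile_kwd"], index_name, kb_id))


ARTIFACT_MARKERS = (
    "artifact_map_extract",
    "artifact_reduce_result",
    "artifact_compilation_plan",
    "artifact_page_draft",
    "artifact_page",
)


def purge_artifacts(store, index_name, kb_id):
    # One delete per marker; no second pass over the same markers.
    for kwd in ARTIFACT_MARKERS:
        store.delete({"compile_kwd": kwd}, index_name, kb_id)


store = FakeDocStore()
purge_artifacts(store, "demo_index", "kb-1")
print(len(store.calls))
```

Keeping the markers in one tuple also gives later phases a single place to add or remove a `compile_kwd` value.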

Comment on lines +650 to +654
        try:
            pages = asyncio.run(_run())
        except Exception:
            logging.exception("artifact_compilation: pipeline failed for doc %s", ctx.doc_id)
            return None

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

asyncio.run() inside async function will raise RuntimeError.

_artifact_compilation is an async def method, which means it runs inside an event loop. Calling asyncio.run(_run()) from within a running event loop raises RuntimeError: asyncio.run() cannot be called from a running event loop. The _run() coroutine should be awaited directly.

🐛 Proposed fix
         try:
-            pages = asyncio.run(_run())
+            pages = await _run()
         except Exception:
             logging.exception("artifact_compilation: pipeline failed for doc %s", ctx.doc_id)
             return None
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rag/svr/task_executor_refactor/task_handler.py` around lines 650 - 654,
Inside the async method _artifact_compilation, don't call asyncio.run(_run())
(which raises RuntimeError when the event loop is already running); instead
await the coroutine returned by _run() (i.e., pages = await _run()), and keep
the existing try/except around that await so the
logging.exception("artifact_compilation: pipeline failed for doc %s",
ctx.doc_id) path still runs on errors—update the call site in
_artifact_compilation to use await _run() and preserve error handling.
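
The failure mode and the fix can be reproduced in isolation. This is a sketch only: `_run_demo`, `nested_run`, and `direct_await` are hypothetical stand-ins for the pipeline coroutine and its caller.

```python
import asyncio


async def _run_demo():
    # Hypothetical stand-in for the pipeline coroutine.
    await asyncio.sleep(0)
    return ["page-1"]


async def nested_run():
    coro = _run_demo()
    try:
        asyncio.run(coro)  # called while an event loop is already running
    except RuntimeError as exc:
        coro.close()  # the coroutine never started; close it explicitly
        return str(exc)
    return None


async def direct_await():
    # The fix: await the coroutine on the loop that is already running.
    return await _run_demo()


error_text = asyncio.run(nested_run())
pages = asyncio.run(direct_await())
print(error_text)
print(pages)
```

Because the original code wraps the call in `except Exception`, the `RuntimeError` would be logged and the method would return `None` on every invocation, so the pipeline would never produce pages.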

Labels

💞 feature Feature request, pull request that fulfills a new feature. 🌈 python Pull requests that update Python code size:XXL This PR changes 1000+ lines, ignoring generated files.
