diff --git a/internal/dao/file2document.go b/internal/dao/file2document.go index f3b030abc9f..f707eb1d3b1 100644 --- a/internal/dao/file2document.go +++ b/internal/dao/file2document.go @@ -82,3 +82,8 @@ func (dao *File2DocumentDAO) GetByDocumentID(docID string) ([]*entity.File2Docum func (dao *File2DocumentDAO) DeleteByDocumentID(docID string) error { return DB.Unscoped().Where("document_id = ?", docID).Delete(&entity.File2Document{}).Error } + +// Create inserts a new file2document mapping record. +func (dao *File2DocumentDAO) Create(mapping *entity.File2Document) error { + return DB.Create(mapping).Error +} diff --git a/internal/handler/file.go b/internal/handler/file.go index 8c83e3b1f6c..b040254547c 100644 --- a/internal/handler/file.go +++ b/internal/handler/file.go @@ -17,6 +17,8 @@ package handler import ( + "errors" + "fmt" "net/http" "net/url" "ragflow/internal/common" @@ -32,15 +34,17 @@ import ( // FileHandler file handler type FileHandler struct { - fileService *service.FileService - userService *service.UserService + fileService *service.FileService + userService *service.UserService + file2DocumentService *service.File2DocumentService } // NewFileHandler create file handler func NewFileHandler(fileService *service.FileService, userService *service.UserService) *FileHandler { return &FileHandler{ - fileService: fileService, - userService: userService, + fileService: fileService, + userService: userService, + file2DocumentService: service.NewFile2DocumentService(), } } @@ -552,3 +556,69 @@ func (h *FileHandler) Download(c *gin.Context) { // Send file data c.Data(http.StatusOK, contentType, blob) } + +// LinkToDatasets links files (or folder trees) to one or more datasets. +// Mirrors Python POST /api/v1/files/link-to-datasets (convert). +// @Summary Link files to datasets +// @Description Associate files with target knowledge-base datasets, re-indexing +// as needed. Folder inputs are expanded to their innermost files. +// The heavy DB work runs in a goroutine; the endpoint returns immediately. +// @Tags file +// @Accept json +// @Produce json +// @Param request body service.LinkToDatasetsRequest true "file_ids and kb_ids" +// @Success 200 {object} map[string]interface{} +// @Router /api/v1/files/link-to-datasets [post] +func (h *FileHandler) LinkToDatasets(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + var req service.LinkToDatasetsRequest + // Tolerate bind errors: a malformed or empty body simply leaves the fields + // empty, which the validate_request-style check below reports as missing + // arguments — matching Python's @validate_request behaviour and code. + _ = c.ShouldBindJSON(&req) + + // Mirror Python @validate_request("file_ids", "kb_ids"): missing arguments + // return ARGUMENT_ERROR (101) with data=null and the aggregated message. + var missing []string + if len(req.FileIDs) == 0 { + missing = append(missing, "file_ids") + } + if len(req.KbIDs) == 0 { + missing = append(missing, "kb_ids") + } + if len(missing) > 0 { + jsonError(c, common.CodeArgumentError, fmt.Sprintf("required argument are missing: %s; ", strings.Join(missing, ","))) + return + } + + if err := h.file2DocumentService.LinkToDatasets(user.ID, &req); err != nil { + jsonError(c, linkToDatasetsErrorCode(err), err.Error()) + return + } + + c.JSON(http.StatusOK, gin.H{ + "code": common.CodeSuccess, + "data": true, + "message": "success", + }) +} + +// linkToDatasetsErrorCode maps File2DocumentService sentinel errors to +// Python-compatible response codes. File/dataset-not-found and no-authorization +// use DATA_ERROR (102), matching Python's get_data_error_result in convert(); +// any other (internal) error is reported as a server error. +func linkToDatasetsErrorCode(err error) common.ErrorCode { + switch { + case errors.Is(err, service.ErrLinkFileNotFound), + errors.Is(err, service.ErrLinkDatasetNotFound), + errors.Is(err, service.ErrLinkNoAuthorization): + return common.CodeDataError + default: + return common.CodeServerError + } +} diff --git a/internal/router/router.go b/internal/router/router.go index a64c9f6e496..025ae6b5f33 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -269,6 +269,7 @@ func (r *Router) Setup(engine *gin.Engine) { file.GET("", r.fileHandler.ListFiles) file.DELETE("", r.fileHandler.DeleteFiles) file.POST("/move", r.fileHandler.MoveFiles) + file.POST("/link-to-datasets", r.fileHandler.LinkToDatasets) file.GET("/:id/ancestors", r.fileHandler.GetFileAncestors) file.GET("/:id/parent", r.fileHandler.GetParentFolder) file.GET("/:id", r.fileHandler.Download) diff --git a/internal/service/document.go b/internal/service/document.go index 213c6edb39e..a8cb4cfa3d1 100644 --- a/internal/service/document.go +++ b/internal/service/document.go @@ -263,6 +263,41 @@ func (s *DocumentService) deleteDocumentFull(docID string) error { return nil } +// RemoveDocumentKeepFile removes a document's chunks/metadata and the document +// row, decrementing the KB counters (doc_num/chunk_num/token_num), WITHOUT +// deleting the underlying file record, its storage blob, or its file2document +// mappings. Mirrors Python DocumentService.remove_document — the caller is +// responsible for cleaning up the file2document mappings separately. +func (s *DocumentService) RemoveDocumentKeepFile(docID string) error { + doc, kb, err := s.resolveDocAndKB(docID) + if err != nil { + return err + } + if _, delErr := s.taskDAO.DeleteByDocIDs([]string{docID}); delErr != nil { + common.Logger.Warn(fmt.Sprintf("RemoveDocumentKeepFile: failed to delete tasks for %s: %v", docID, delErr)) + } + s.deleteDocEngineData(docID, kb.TenantID, doc.KbID) + return s.deleteDocRecordWithCounters(doc, kb.ID) +} + +// InsertDocument creates a document row and increments the owning KB's doc_num +// counter in a single transaction. Mirrors Python DocumentService.insert, which +// updates dataset/document counters on insert. The document's ID and timestamps +// are populated by the caller / model hooks before insertion. +func (s *DocumentService) InsertDocument(doc *entity.Document) error { + return dao.DB.Transaction(func(tx *gorm.DB) error { + if err := tx.Create(doc).Error; err != nil { + return fmt.Errorf("failed to create document: %w", err) + } + if err := tx.Model(&entity.Knowledgebase{}). + Where("id = ?", doc.KbID). + Update("doc_num", gorm.Expr("doc_num + 1")).Error; err != nil { + return fmt.Errorf("failed to increment doc_num for KB %s: %w", doc.KbID, err) + } + return nil + }) +} + // resolveDocAndKB loads the document and its knowledgebase, returning both or // an error. func (s *DocumentService) resolveDocAndKB(docID string) (*entity.Document, *entity.Knowledgebase, error) { diff --git a/internal/service/file2document.go b/internal/service/file2document.go new file mode 100644 index 00000000000..2f490a4e56e --- /dev/null +++ b/internal/service/file2document.go @@ -0,0 +1,336 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package service + +import ( + "errors" + "path/filepath" + "strings" + + "go.uber.org/zap" + + "ragflow/internal/common" + "ragflow/internal/dao" + "ragflow/internal/entity" +) + +// Sentinel errors returned by File2DocumentService. Handlers map these to +// Python-compatible response codes/messages. Returning sentinels (instead of +// wrapped DAO/runtime errors) prevents internal DB details from leaking through +// the API response path. +var ( + // ErrLinkFileNotFound mirrors Python "File not found!". + ErrLinkFileNotFound = errors.New("File not found!") + // ErrLinkDatasetNotFound mirrors Python "Can't find this dataset!". + ErrLinkDatasetNotFound = errors.New("Can't find this dataset!") + // ErrLinkNoAuthorization mirrors Python "No authorization.". + ErrLinkNoAuthorization = errors.New("No authorization.") + // ErrLinkInternal is a generic, safe-to-expose internal failure. + ErrLinkInternal = errors.New("Internal server error.") +) + +// File2DocumentService handles linking files to datasets. +type File2DocumentService struct { + fileDAO *dao.FileDAO + file2DocumentDAO *dao.File2DocumentDAO + kbDAO *dao.KnowledgebaseDAO + userTenantDAO *dao.UserTenantDAO + documentSvc *DocumentService +} + +// NewFile2DocumentService creates a File2DocumentService. +func NewFile2DocumentService() *File2DocumentService { + return &File2DocumentService{ + fileDAO: dao.NewFileDAO(), + file2DocumentDAO: dao.NewFile2DocumentDAO(), + kbDAO: dao.NewKnowledgebaseDAO(), + userTenantDAO: dao.NewUserTenantDAO(), + documentSvc: NewDocumentService(), + } +} + +// LinkToDatasetsRequest is the body for POST /files/link-to-datasets. +type LinkToDatasetsRequest struct { + FileIDs []string `json:"file_ids"` + KbIDs []string `json:"kb_ids"` +} + +// LinkToDatasets validates inputs, expands folders, checks permissions, and +// schedules convertFiles in a goroutine — mirroring Python convert(). +// Returns immediately (fire-and-forget for the heavy DB work). +// +// On validation failure it returns a sentinel error (see ErrLink* above) so the +// handler can map it to a Python-compatible response without leaking internals. +func (s *File2DocumentService) LinkToDatasets(userID string, req *LinkToDatasetsRequest) error { + // ── 1. Validate files exist ─────────────────────────────────────────────── + files, err := s.fileDAO.GetByIDs(req.FileIDs) + if err != nil { + common.Warn("LinkToDatasets: GetByIDs failed", zap.Error(err)) + return ErrLinkInternal + } + filesSet := make(map[string]*entity.File, len(files)) + for _, f := range files { + filesSet[f.ID] = f + } + for _, id := range req.FileIDs { + if filesSet[id] == nil { + return ErrLinkFileNotFound + } + } + + // ── 2. Validate KBs exist ──────────────────────────────────────────────── + kbMap := make(map[string]*entity.Knowledgebase, len(req.KbIDs)) + for _, kbID := range req.KbIDs { + kb, err := s.kbDAO.GetByID(kbID) + if err != nil || kb == nil { + return ErrLinkDatasetNotFound + } + kbMap[kbID] = kb + } + + // ── 3. Expand folders to leaf files, then deduplicate ───────────────────── + // Mixed folder + direct file inputs (or overlapping folders) can yield the + // same leaf file more than once; dedupe so each file is converted exactly + // once. + expanded := make([]string, 0, len(req.FileIDs)) + for _, id := range req.FileIDs { + file := filesSet[id] + if file.Type == FileTypeFolder { + inner, err := s.getAllInnermostFileIDs(id) + if err != nil { + common.Warn("LinkToDatasets: folder expansion failed", zap.String("fileID", id), zap.Error(err)) + return ErrLinkInternal + } + expanded = append(expanded, inner...) + } else { + expanded = append(expanded, id) + } + } + allFileIDs := dedupeStrings(expanded) + + // ── 4. Validate expanded file permissions ───────────────────────────────── + for _, id := range allFileIDs { + file, err := s.fileDAO.GetByID(id) + if err != nil || file == nil { + return ErrLinkFileNotFound + } + if !s.checkFileTeamPermission(file, userID) { + return ErrLinkNoAuthorization + } + } + + // ── 5. Validate KB permissions ──────────────────────────────────────────── + for _, kb := range kbMap { + if !s.checkKBTeamPermission(kb, userID) { + return ErrLinkNoAuthorization + } + } + + // ── 6. Run conversion in background (fire-and-forget) ──────────────────── + kbIDs := req.KbIDs + go func() { + if err := s.convertFiles(allFileIDs, kbIDs, userID); err != nil { + common.Warn("file2document.convertFiles failed", + zap.Strings("file_ids", allFileIDs), + zap.Strings("kb_ids", kbIDs), + zap.Error(err)) + } + }() + + return nil +} + +// convertFiles mirrors Python _convert_files: for each file, remove existing +// documents (routing through DocumentService so KB counters are updated), drop +// the file2document mappings, then create a new document in each target KB and +// a fresh mapping. +func (s *File2DocumentService) convertFiles(fileIDs, kbIDs []string, userID string) error { + for _, fileID := range fileIDs { + // Remove existing documents linked to this file. Routing through + // DocumentService.RemoveDocumentKeepFile ensures KB doc_num/chunk_num/ + // token_num counters are decremented (mirrors Python remove_document) + // while preserving the file record itself for re-linking. + mappings, err := s.file2DocumentDAO.GetByFileID(fileID) + if err != nil { + common.Warn("convertFiles: GetByFileID failed", zap.String("fileID", fileID), zap.Error(err)) + } + for _, m := range mappings { + if m.DocumentID == nil { + continue + } + if err := s.documentSvc.RemoveDocumentKeepFile(*m.DocumentID); err != nil { + common.Warn("convertFiles: RemoveDocumentKeepFile failed", + zap.String("docID", *m.DocumentID), zap.Error(err)) + } + } + // Drop the file2document mappings for this file (mirrors Python + // File2DocumentService.delete_by_file_id, done once per file). + if err := s.file2DocumentDAO.DeleteByFileID(fileID); err != nil { + common.Warn("convertFiles: DeleteByFileID failed", zap.String("fileID", fileID), zap.Error(err)) + } + + // Reload the source file. + file, err := s.fileDAO.GetByID(fileID) + if err != nil || file == nil { + continue + } + + // Create a document + mapping in each target KB. + for _, kbID := range kbIDs { + kb, err := s.kbDAO.GetByID(kbID) + if err != nil || kb == nil { + continue + } + + parserID := getParser(file.Type, file.Name, kb.ParserID) + suffix := strings.TrimPrefix(filepath.Ext(file.Name), ".") + doc := &entity.Document{ + ID: common.GenerateUUID(), + KbID: kb.ID, + ParserID: parserID, + ParserConfig: kb.ParserConfig, + CreatedBy: userID, + Type: file.Type, + Name: &file.Name, + Suffix: suffix, + Size: file.Size, + } + if file.Location != nil { + doc.Location = file.Location + } + if kb.PipelineID != nil { + doc.PipelineID = kb.PipelineID + } + + // InsertDocument creates the row and increments KB doc_num in one + // transaction, so a failed insert never leaves a stale counter. + if err := s.documentSvc.InsertDocument(doc); err != nil { + common.Warn("convertFiles: InsertDocument failed", + zap.String("kbID", kbID), zap.String("fileID", fileID), zap.Error(err)) + continue + } + + mapping := &entity.File2Document{ + ID: common.GenerateUUID(), + FileID: &fileID, + DocumentID: &doc.ID, + } + if err := s.file2DocumentDAO.Create(mapping); err != nil { + common.Warn("convertFiles: Create file2document mapping failed", + zap.String("fileID", fileID), zap.String("docID", doc.ID), zap.Error(err)) + } + } + } + return nil +} + +// getAllInnermostFileIDs recursively collects all non-folder file IDs under a folder. +// Mirrors Python FileService.get_all_innermost_file_ids. +func (s *File2DocumentService) getAllInnermostFileIDs(folderID string) ([]string, error) { + children, err := s.fileDAO.ListByParentID(folderID) + if err != nil { + return nil, err + } + var ids []string + for _, child := range children { + if child.Type == FileTypeFolder { + sub, err := s.getAllInnermostFileIDs(child.ID) + if err != nil { + return nil, err + } + ids = append(ids, sub...) + } else { + ids = append(ids, child.ID) + } + } + return ids, nil +} + +// checkFileTeamPermission mirrors Python check_file_team_permission: +// true when file.TenantID == userID or user is in the file tenant's team. +func (s *File2DocumentService) checkFileTeamPermission(file *entity.File, userID string) bool { + if file.TenantID == userID { + return true + } + tenants, err := s.userTenantDAO.GetByUserID(userID) + if err != nil { + return false + } + for _, t := range tenants { + if t.TenantID == file.TenantID { + return true + } + } + return false +} + +// checkKBTeamPermission mirrors Python check_kb_team_permission: +// true when kb.TenantID == userID or user is in the KB tenant's team. +func (s *File2DocumentService) checkKBTeamPermission(kb *entity.Knowledgebase, userID string) bool { + if kb.TenantID == userID { + return true + } + tenants, err := s.userTenantDAO.GetByUserID(userID) + if err != nil { + return false + } + for _, t := range tenants { + if t.TenantID == kb.TenantID { + return true + } + } + return false +} + +// getParser maps (fileType, fileName, kbParserID) → a parser ID. +// Mirrors Python FileService.get_parser — falls back to the KB's parser. +func getParser(fileType, fileName, kbParserID string) string { + ext := strings.ToLower(strings.TrimPrefix(filepath.Ext(fileName), ".")) + switch ext { + case "pdf": + return "pdf" + case "doc", "docx": + return "naive" + case "ppt", "pptx": + return "presentation" + case "xls", "xlsx": + return "table" + case "txt", "md": + return "naive" + case "png", "jpg", "jpeg", "gif", "bmp", "webp": + return "picture" + } + if kbParserID != "" { + return kbParserID + } + return "naive" +} + +// dedupeStrings returns the input slice with duplicates removed, preserving the +// first-seen order. +func dedupeStrings(in []string) []string { + seen := make(map[string]struct{}, len(in)) + out := make([]string, 0, len(in)) + for _, v := range in { + if _, ok := seen[v]; ok { + continue + } + seen[v] = struct{}{} + out = append(out, v) + } + return out +}