diff --git a/internal/handler/document.go b/internal/handler/document.go index 96e81c4005f..70789a03185 100644 --- a/internal/handler/document.go +++ b/internal/handler/document.go @@ -21,16 +21,17 @@ import ( "errors" "fmt" "mime" + "mime/multipart" "net/http" "path/filepath" - "ragflow/internal/common" - "ragflow/internal/entity" "strconv" "strings" "time" "github.com/gin-gonic/gin" + "ragflow/internal/common" + "ragflow/internal/entity" "ragflow/internal/service" ) @@ -55,12 +56,14 @@ type documentServiceIface interface { DeleteDocumentMetadata(docID string, keys []string) error DeleteDocumentAllMetadata(docID string) error GetDocumentMetadataByID(docID string) (map[string]interface{}, error) + BatchUpdateDocumentMetadata(datasetID string, req *service.BatchUpdateMetadataRequest) (*service.BatchUpdateMetadataResult, error) } // DocumentHandler document handler type DocumentHandler struct { documentService documentServiceIface datasetService *service.DatasetService + fileService *service.FileService } // NewDocumentHandler create document handler @@ -68,6 +71,7 @@ func NewDocumentHandler(documentService *service.DocumentService, datasetService return &DocumentHandler{ documentService: documentService, datasetService: datasetService, + fileService: service.NewFileService(), } } @@ -886,3 +890,191 @@ func (h *DocumentHandler) StopParseDocuments(c *gin.Context) { "data": result, }) } + +// UploadInfo uploads one or more files (or a URL) and returns file metadata. +// Mirrors Python POST /api/v1/documents/upload (upload_info). +// @Summary Upload document info +// @Description Upload files via multipart form or supply ?url=... to fetch remotely. +// @Tags documents +// @Accept multipart/form-data +// @Produce json +// @Param file formData file false "File(s) to upload" +// @Param url query string false "Remote URL to fetch" +// @Success 200 {object} map[string]interface{} +// @Router /api/v1/documents/upload [post] +func (h *DocumentHandler) UploadInfo(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + url := c.Query("url") + + // Distinguish "request isn't multipart" (benign — e.g. the ?url= path) from a + // malformed multipart body, which is a client error worth surfacing. + form, formErr := c.MultipartForm() + if formErr != nil && !errors.Is(formErr, http.ErrNotMultipart) { + c.JSON(http.StatusBadRequest, gin.H{ + "code": common.CodeArgumentError, + "data": false, + "message": "invalid multipart form: " + formErr.Error(), + }) + return + } + var files []*multipart.FileHeader + if form != nil && form.File != nil { + files = form.File["file"] + } + + if len(files) > 0 && url != "" { + c.JSON(http.StatusOK, gin.H{"code": common.CodeArgumentError, "data": false, + "message": "Provide either multipart file(s) or ?url=..., not both."}) + return + } + if len(files) == 0 && url == "" { + c.JSON(http.StatusOK, gin.H{"code": common.CodeArgumentError, "data": false, + "message": "Missing input: provide multipart file(s) or url"}) + return + } + + if url != "" { + // URL upload path — delegate to file service. + data, err := h.fileService.UploadFromURL(user.ID, url) + if err != nil { + c.JSON(http.StatusOK, gin.H{"code": common.CodeDataError, "data": false, "message": err.Error()}) + return + } + c.JSON(http.StatusOK, gin.H{"code": common.CodeSuccess, "data": data, "message": "success"}) + return + } + + // Multipart file(s). + uploaded, err := h.fileService.UploadFile(user.ID, "", files) + if err != nil { + c.JSON(http.StatusOK, gin.H{"code": common.CodeDataError, "data": false, "message": err.Error()}) + return + } + + if len(files) == 1 { + if len(uploaded) > 0 { + c.JSON(http.StatusOK, gin.H{"code": common.CodeSuccess, "data": uploaded[0], "message": "success"}) + } else { + c.JSON(http.StatusOK, gin.H{"code": common.CodeDataError, "data": false, "message": "upload failed"}) + } + return + } + c.JSON(http.StatusOK, gin.H{"code": common.CodeSuccess, "data": uploaded, "message": "success"}) +} + +// batchMetadataUpdate is the shared implementation for both bulk-metadata endpoints. +func (h *DocumentHandler) batchMetadataUpdate(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + datasetID := c.Param("dataset_id") + if datasetID == "" { + c.JSON(http.StatusOK, gin.H{"code": common.CodeDataError, "data": false, "message": "dataset_id is required"}) + return + } + + if !h.datasetService.Accessible(datasetID, user.ID) { + c.JSON(http.StatusOK, gin.H{"code": common.CodeDataError, "data": false, + "message": fmt.Sprintf("You don't own the dataset %s.", datasetID)}) + return + } + + var body struct { + Selector map[string]interface{} `json:"selector"` + Updates []map[string]interface{} `json:"updates"` + Deletes []map[string]interface{} `json:"deletes"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(http.StatusOK, gin.H{"code": common.CodeDataError, "data": false, "message": err.Error()}) + return + } + + // Validate updates. + updates := make([]service.MetadataUpdate, 0, len(body.Updates)) + for _, u := range body.Updates { + key, hasKey := u["key"].(string) + _, hasVal := u["value"] + if !hasKey || key == "" || !hasVal { + c.JSON(http.StatusOK, gin.H{"code": common.CodeDataError, "data": false, "message": "Each update requires key and value."}) + return + } + updates = append(updates, service.MetadataUpdate{Key: key, Value: u["value"]}) + } + + // Validate deletes. + deletes := make([]service.MetadataDelete, 0, len(body.Deletes)) + for _, d := range body.Deletes { + key, ok := d["key"].(string) + if !ok || key == "" { + c.JSON(http.StatusOK, gin.H{"code": common.CodeDataError, "data": false, "message": "Each delete requires key."}) + return + } + deletes = append(deletes, service.MetadataDelete{Key: key}) + } + + // Build selector. + selector := service.MetadataSelector{} + if body.Selector != nil { + if ids, ok := body.Selector["document_ids"].([]interface{}); ok { + for _, id := range ids { + if s, ok := id.(string); ok { + selector.DocumentIDs = append(selector.DocumentIDs, s) + } + } + } + if mc, ok := body.Selector["metadata_condition"].(map[string]interface{}); ok { + selector.MetadataCondition = mc + } + } + + req := &service.BatchUpdateMetadataRequest{ + Selector: selector, + Updates: updates, + Deletes: deletes, + } + + result, err := h.documentService.BatchUpdateDocumentMetadata(datasetID, req) + if err != nil { + c.JSON(http.StatusOK, gin.H{"code": common.CodeDataError, "data": false, "message": err.Error()}) + return + } + + c.JSON(http.StatusOK, gin.H{"code": common.CodeSuccess, "data": result, "message": "success"}) +} + +// UpdateDocumentMetadatas handles PATCH /api/v1/datasets/:dataset_id/documents/metadatas. +// Mirrors Python PATCH /datasets//documents/metadatas (update_metadata). +// @Summary Bulk update document metadata +// @Description Bulk-set or bulk-delete metadata fields on documents in a dataset. +// @Tags documents +// @Accept json +// @Produce json +// @Param dataset_id path string true "Dataset ID" +// @Success 200 {object} map[string]interface{} +// @Router /api/v1/datasets/{dataset_id}/documents/metadatas [patch] +func (h *DocumentHandler) UpdateDocumentMetadatas(c *gin.Context) { + h.batchMetadataUpdate(c) +} + +// MetadataBatchUpdate handles POST /api/v1/datasets/:dataset_id/metadata/update. +// Mirrors Python POST /datasets//metadata/update (metadata_batch_update). +// @Summary Batch update document metadata (POST variant) +// @Description Batch-apply updates and deletes to document metadata via selector. +// @Tags documents +// @Accept json +// @Produce json +// @Param dataset_id path string true "Dataset ID" +// @Success 200 {object} map[string]interface{} +// @Router /api/v1/datasets/{dataset_id}/metadata/update [post] +func (h *DocumentHandler) MetadataBatchUpdate(c *gin.Context) { + h.batchMetadataUpdate(c) +} + diff --git a/internal/router/router.go b/internal/router/router.go index a64c9f6e496..b5552d2b304 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -203,6 +203,7 @@ func (r *Router) Setup(engine *gin.Engine) { documents := v1.Group("/documents") { documents.POST("", r.documentHandler.CreateDocument) + documents.POST("/upload", r.documentHandler.UploadInfo) documents.GET("", r.documentHandler.ListDocuments) documents.GET("/:id", r.documentHandler.GetDocumentByID) documents.PUT("/:id", r.documentHandler.UpdateDocument) @@ -241,6 +242,8 @@ func (r *Router) Setup(engine *gin.Engine) { // Metadata Config datasets.GET("/:dataset_id/metadata/config", r.datasetsHandler.GetMetadataConfig) datasets.PUT("/:dataset_id/metadata/config", r.datasetsHandler.UpdateMetadataConfig) + datasets.POST("/:dataset_id/metadata/update", r.documentHandler.MetadataBatchUpdate) + datasets.PATCH("/:dataset_id/documents/metadatas", r.documentHandler.UpdateDocumentMetadatas) // Dataset documents datasets.GET("/:dataset_id/documents", r.documentHandler.ListDocuments) diff --git a/internal/service/document.go b/internal/service/document.go index 213c6edb39e..7dbbadd34a2 100644 --- a/internal/service/document.go +++ b/internal/service/document.go @@ -20,17 +20,19 @@ import ( "context" "encoding/json" "fmt" - "ragflow/internal/common" - "ragflow/internal/entity" - "ragflow/internal/storage" "regexp" "sort" "strings" "time" + "go.uber.org/zap" + "ragflow/internal/cache" + "ragflow/internal/common" "ragflow/internal/dao" "ragflow/internal/engine" + "ragflow/internal/entity" + "ragflow/internal/storage" "ragflow/internal/server" @@ -1146,3 +1148,148 @@ func isTimeString(s string) bool { matched, _ := regexp.MatchString(`^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}$`, s) return matched } + +// ── batch metadata update ───────────────────────────────────────────────────── + +// MetadataUpdate is one update item: set key to value. +type MetadataUpdate struct { + Key string `json:"key"` + Value interface{} `json:"value"` +} + +// MetadataDelete is one delete item: remove key. +type MetadataDelete struct { + Key string `json:"key"` +} + +// MetadataSelector selects which documents to target. +type MetadataSelector struct { + DocumentIDs []string `json:"document_ids"` + MetadataCondition map[string]interface{} `json:"metadata_condition"` +} + +// BatchUpdateMetadataRequest is the shared body for both bulk-metadata endpoints. +type BatchUpdateMetadataRequest struct { + Selector MetadataSelector `json:"selector"` + Updates []MetadataUpdate `json:"updates"` + Deletes []MetadataDelete `json:"deletes"` +} + +// BatchUpdateMetadataResult summarises the operation. +type BatchUpdateMetadataResult struct { + Updated int `json:"updated"` + MatchedDocs int `json:"matched_docs"` +} + +// BatchUpdateDocumentMetadata implements the shared logic for +// PATCH /datasets/:dataset_id/documents/metadatas and +// POST /datasets/:dataset_id/metadata/update. +// +// Steps (mirrors Python): +// 1. Validate document_ids belong to the dataset. +// 2. Apply metadata_condition filter if provided. +// 3. Set / delete metadata for each target document. +func (s *DocumentService) BatchUpdateDocumentMetadata(datasetID string, req *BatchUpdateMetadataRequest) (*BatchUpdateMetadataResult, error) { + // Resolve which document IDs to target. + targetDocIDs := make(map[string]struct{}) + + if len(req.Selector.DocumentIDs) > 0 { + // Validate that supplied IDs actually belong to this dataset. + allRows, err := s.documentDAO.GetAllDocIDsByKBIDs([]string{datasetID}) + if err != nil { + return nil, fmt.Errorf("failed to list dataset documents: %w", err) + } + kbDocIDSet := make(map[string]struct{}, len(allRows)) + for _, row := range allRows { + kbDocIDSet[row["id"]] = struct{}{} + } + var invalidIDs []string + for _, id := range req.Selector.DocumentIDs { + if _, ok := kbDocIDSet[id]; !ok { + invalidIDs = append(invalidIDs, id) + } + } + if len(invalidIDs) > 0 { + return nil, fmt.Errorf("these documents do not belong to dataset %s: %s", + datasetID, strings.Join(invalidIDs, ", ")) + } + for _, id := range req.Selector.DocumentIDs { + targetDocIDs[id] = struct{}{} + } + } + + // Apply metadata_condition filter. + if len(req.Selector.MetadataCondition) > 0 { + flattedMeta, err := s.metadataSvc.GetFlattedMetaByKBs([]string{datasetID}) + if err != nil { + return nil, fmt.Errorf("failed to get flattened metadata: %w", err) + } + + // ParseAndConvert mirrors Python convert_conditions: conditions arrive as + // {name, comparison_operator, value}, the operator is normalised, and the + // (possibly non-string) value is preserved. MetaFilter then matches against + // the common.MetaData returned by GetFlattedMetaByKBs. + filterInput := common.ParseAndConvert(req.Selector.MetadataCondition) + filteredIDs := common.MetaFilter(flattedMeta, filterInput) + + filteredSet := make(map[string]struct{}, len(filteredIDs)) + for _, id := range filteredIDs { + filteredSet[id] = struct{}{} + } + + if len(targetDocIDs) > 0 { + // Intersect with the document_ids restriction. + for id := range targetDocIDs { + if _, ok := filteredSet[id]; !ok { + delete(targetDocIDs, id) + } + } + } else { + targetDocIDs = filteredSet + } + + // Early-exit when conditions given but nothing matched. + rawConds, _ := req.Selector.MetadataCondition["conditions"] + if rawConds != nil && len(targetDocIDs) == 0 { + return &BatchUpdateMetadataResult{Updated: 0, MatchedDocs: 0}, nil + } + } + + ids := make([]string, 0, len(targetDocIDs)) + for id := range targetDocIDs { + ids = append(ids, id) + } + + // Apply updates and deletes per document. + updated := 0 + for _, docID := range ids { + if len(req.Updates) > 0 { + meta := make(map[string]interface{}, len(req.Updates)) + for _, u := range req.Updates { + meta[u.Key] = u.Value + } + if err := s.SetDocumentMetadata(docID, meta); err != nil { + common.Warn("BatchUpdateDocumentMetadata: set metadata failed", + zap.String("docID", docID), zap.Error(err)) + continue + } + } + if len(req.Deletes) > 0 { + keys := make([]string, 0, len(req.Deletes)) + for _, d := range req.Deletes { + keys = append(keys, d.Key) + } + if err := s.DeleteDocumentMetadata(docID, keys); err != nil { + common.Warn("BatchUpdateDocumentMetadata: delete metadata failed", + zap.String("docID", docID), zap.Error(err)) + // Only count a document as updated when every requested + // operation (updates AND deletes) succeeded. + continue + } + } + updated++ + } + + return &BatchUpdateMetadataResult{Updated: updated, MatchedDocs: len(ids)}, nil +} + diff --git a/internal/service/file.go b/internal/service/file.go index 872ae573631..d6774ea7ed1 100644 --- a/internal/service/file.go +++ b/internal/service/file.go @@ -21,18 +21,22 @@ import ( "fmt" "io" "mime/multipart" + "net" + "net/http" + "net/url" "os" "path/filepath" + "strings" + "time" + + "github.com/google/uuid" + "ragflow/internal/common" "ragflow/internal/dao" "ragflow/internal/engine" "ragflow/internal/entity" "ragflow/internal/storage" "ragflow/internal/utility" - "strings" - "time" - - "github.com/google/uuid" ) // FileService file service @@ -991,3 +995,202 @@ func (s *FileService) GetStorageAddress(fileID string) (*StorageAddress, error) Name: *doc.Location, }, nil } + +// maxRemoteFileSize bounds the body of a ?url= upload (100 MB). +const maxRemoteFileSize = 100 << 20 + +// UploadFromURL fetches a remote URL, saves the content to the tenant's root +// folder, and returns the file metadata map — mirroring Python +// FileService.upload_info(tenant_id, None, url). +// +// The remote fetch is SSRF-guarded (mirrors Python's assert_url_is_safe): the +// scheme must be http/https and every address the host resolves to must be +// globally routable; the validated IP is pinned for the actual connection — and +// re-validated on each redirect hop — to defeat DNS-rebinding. The HTTP client +// carries connect and overall timeouts, and the response body is bounded with +// truncation detection so an oversized file is rejected rather than silently +// clipped. +func (s *FileService) UploadFromURL(tenantID, rawURL string) (map[string]interface{}, error) { + if rawURL == "" { + return nil, fmt.Errorf("url is required") + } + parsed, err := url.Parse(rawURL) + if err != nil || (parsed.Scheme != "http" && parsed.Scheme != "https") || parsed.Hostname() == "" { + return nil, fmt.Errorf("invalid or unsafe URL") + } + + data, err := fetchRemoteFileSafely(rawURL, maxRemoteFileSize) + if err != nil { + return nil, err + } + + // Derive and sanitize the filename from the URL path. + filename := sanitizeFilename(filepath.Base(parsed.Path)) + + storageImpl := storage.GetStorageFactory().GetStorage() + if storageImpl == nil { + return nil, fmt.Errorf("storage not initialized") + } + + rootFolder, err := s.fileDAO.GetRootFolder(tenantID) + if err != nil { + return nil, fmt.Errorf("failed to get root folder: %w", err) + } + + location := filename + for storageImpl.ObjExist(rootFolder.ID, location) { + location += "_" + } + if err := storageImpl.Put(rootFolder.ID, location, data); err != nil { + return nil, fmt.Errorf("failed to store file: %w", err) + } + + uniqueName := s.getUniqueFilename(filename, rootFolder.ID) + fileType := utility.FilenameType(filename) + fileRecord := &entity.File{ + ID: s.generateUUID(), + ParentID: rootFolder.ID, + TenantID: tenantID, + CreatedBy: tenantID, + Name: uniqueName, + Location: &location, + Size: int64(len(data)), + Type: fileType, + } + if err := s.fileDAO.Insert(fileRecord); err != nil { + return nil, fmt.Errorf("failed to insert file record: %w", err) + } + return s.toFileResponse(fileRecord), nil +} + +// fetchRemoteFileSafely downloads rawURL with SSRF protection, connect/overall +// timeouts, and a hard size cap that rejects (rather than truncates) oversized +// bodies. +func fetchRemoteFileSafely(rawURL string, maxSize int64) ([]byte, error) { + dialer := &net.Dialer{Timeout: 10 * time.Second} + transport := &http.Transport{ + // DialContext resolves the host, rejects any non-public IP, and dials a + // validated, pinned address. Because it runs for every connection it + // also re-validates the target of each redirect hop. + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { + host, port, err := net.SplitHostPort(addr) + if err != nil { + return nil, err + } + ips, err := net.DefaultResolver.LookupIP(ctx, "ip", host) + if err != nil { + return nil, fmt.Errorf("could not resolve host %q: %w", host, err) + } + var pinned net.IP + for _, ip := range ips { + if !isPublicIP(ip) { + return nil, fmt.Errorf("URL resolves to a non-public address (%s)", ip) + } + if pinned == nil { + pinned = ip + } + } + if pinned == nil { + return nil, fmt.Errorf("host %q resolved to no addresses", host) + } + return dialer.DialContext(ctx, network, net.JoinHostPort(pinned.String(), port)) + }, + } + client := &http.Client{ + Transport: transport, + Timeout: 30 * time.Second, + CheckRedirect: func(req *http.Request, via []*http.Request) error { + if len(via) >= 5 { + return fmt.Errorf("stopped after too many redirects") + } + if req.URL.Scheme != "http" && req.URL.Scheme != "https" { + return fmt.Errorf("disallowed redirect scheme %q", req.URL.Scheme) + } + return nil + }, + } + + resp, err := client.Get(rawURL) // #nosec G107 — scheme validated, IP resolved/validated/pinned in DialContext + if err != nil { + return nil, fmt.Errorf("failed to fetch URL: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode >= 400 { + return nil, fmt.Errorf("remote URL returned HTTP %d", resp.StatusCode) + } + + // Read one byte past the cap so an oversized body is detected and rejected + // rather than silently truncated by io.LimitReader. + data, err := io.ReadAll(io.LimitReader(resp.Body, maxSize+1)) + if err != nil { + return nil, fmt.Errorf("failed to read remote content: %w", err) + } + if int64(len(data)) > maxSize { + return nil, fmt.Errorf("remote file exceeds the maximum allowed size of %d bytes", maxSize) + } + return data, nil +} + +// isPublicIP reports whether ip is a globally routable address. It mirrors the +// allowlist intent of Python's assert_url_is_safe (which requires ip.is_global) +// by rejecting loopback, private, link-local, multicast, unspecified, and +// carrier-grade NAT ranges. IPv4-mapped IPv6 addresses are handled by the +// stdlib predicates. +func isPublicIP(ip net.IP) bool { + if ip == nil { + return false + } + if ip.IsLoopback() || ip.IsPrivate() || ip.IsUnspecified() || + ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() || + ip.IsMulticast() || ip.IsInterfaceLocalMulticast() { + return false + } + // Carrier-grade NAT 100.64.0.0/10 (RFC 6598) — not covered by IsPrivate. + if ip4 := ip.To4(); ip4 != nil && ip4[0] == 100 && ip4[1]&0xc0 == 0x40 { + return false + } + return true +} + +// reservedDeviceNames are Windows reserved filenames that must never be used. +var reservedDeviceNames = map[string]bool{ + "CON": true, "PRN": true, "AUX": true, "NUL": true, + "COM1": true, "COM2": true, "COM3": true, "COM4": true, "COM5": true, + "COM6": true, "COM7": true, "COM8": true, "COM9": true, + "LPT1": true, "LPT2": true, "LPT3": true, "LPT4": true, "LPT5": true, + "LPT6": true, "LPT7": true, "LPT8": true, "LPT9": true, +} + +// sanitizeFilename produces a safe, filesystem-friendly filename from an +// arbitrary URL path segment: it strips directory components, replaces unsafe / +// control characters, rejects reserved names, bounds the length, and falls back +// to "download". +func sanitizeFilename(name string) string { + name = filepath.Base(name) + name = strings.TrimSpace(name) + + name = strings.Map(func(r rune) rune { + switch r { + case '/', '\\', ':', '*', '?', '"', '<', '>', '|', 0: + return '_' + } + if r < 0x20 { // control characters + return '_' + } + return r + }, name) + + // Strip leading/trailing dots and spaces to avoid hidden or reserved forms. + name = strings.Trim(name, ". ") + + if name == "" || name == "." || name == ".." { + return "download" + } + if stem := strings.SplitN(strings.ToUpper(name), ".", 2)[0]; reservedDeviceNames[stem] { + return "download" + } + if len(name) > 255 { + name = name[:255] + } + return name +}