diff --git a/internal/dao/agent_session.go b/internal/dao/agent_session.go new file mode 100644 index 00000000000..e7be38808f7 --- /dev/null +++ b/internal/dao/agent_session.go @@ -0,0 +1,111 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package dao + +import ( + "ragflow/internal/entity" +) + +// AgentSessionDAO data access for agent sessions (conversation table, dialog_id = canvas id). +type AgentSessionDAO struct{} + +// NewAgentSessionDAO creates an AgentSessionDAO. +func NewAgentSessionDAO() *AgentSessionDAO { + return &AgentSessionDAO{} +} + +// GetByID returns a single agent session by conversation ID. +func (d *AgentSessionDAO) GetByID(id string) (*entity.AgentSession, error) { + var s entity.AgentSession + err := DB.Where("id = ?", id).First(&s).Error + return &s, err +} + +// ListByAgentID returns all sessions for a given agent (canvas) ID, ordered newest first. +func (d *AgentSessionDAO) ListByAgentID(agentID string) ([]*entity.AgentSession, error) { + var sessions []*entity.AgentSession + err := DB.Where("dialog_id = ?", agentID). + Order("create_time DESC"). + Find(&sessions).Error + return sessions, err +} + +// ListByAgentIDPaged returns paginated sessions for a given agent ID. +func (d *AgentSessionDAO) ListByAgentIDPaged( + agentID string, + page, pageSize int, + orderby string, + desc bool, + sessionID, userID, keywords string, +) ([]*entity.AgentSession, int64, error) { + base := DB.Model(&entity.AgentSession{}).Where("dialog_id = ?", agentID) + + if sessionID != "" { + base = base.Where("id = ?", sessionID) + } + if userID != "" { + base = base.Where("user_id = ?", userID) + } + if keywords != "" { + base = base.Where("name LIKE ?", "%"+keywords+"%") + } + + var total int64 + if err := base.Count(&total).Error; err != nil { + return nil, 0, err + } + + order := orderby + if order == "" { + order = "update_time" + } + if desc { + order += " DESC" + } else { + order += " ASC" + } + + query := base.Order(order) + if page > 0 && pageSize > 0 { + query = query.Offset((page - 1) * pageSize).Limit(pageSize) + } + + var sessions []*entity.AgentSession + err := query.Find(&sessions).Error + return sessions, total, err +} + +// DeleteByID hard-deletes a session by ID. +func (d *AgentSessionDAO) DeleteByID(id string) error { + return DB.Unscoped().Where("id = ?", id).Delete(&entity.AgentSession{}).Error +} + +// GetByIDAndAgentID returns a session only if it belongs to the given agent. +func (d *AgentSessionDAO) GetByIDAndAgentID(sessionID, agentID string) (*entity.AgentSession, error) { + var s entity.AgentSession + err := DB.Where("id = ? AND dialog_id = ?", sessionID, agentID).First(&s).Error + return &s, err +} + +// BelongsToAgent checks whether a session belongs to a specific agent (canvas owner). +func (d *AgentSessionDAO) BelongsToAgent(sessionID, agentID string) (bool, error) { + var count int64 + err := DB.Model(&entity.AgentSession{}). + Where("id = ? AND dialog_id = ?", sessionID, agentID). + Count(&count).Error + return count > 0, err +} diff --git a/internal/entity/canvas.go b/internal/entity/canvas.go index c9c93108564..0ca563a21d0 100644 --- a/internal/entity/canvas.go +++ b/internal/entity/canvas.go @@ -16,6 +16,28 @@ package entity +import "encoding/json" + +// AgentSession maps to the conversation table for agent-owned sessions. +// It extends the basic ChatSession fields with agent-specific columns. +type AgentSession struct { + ID string `gorm:"column:id;primaryKey;size:32" json:"id"` + DialogID string `gorm:"column:dialog_id;size:32;not null;index" json:"agent_id"` + Name *string `gorm:"column:name;size:255" json:"name,omitempty"` + Message json.RawMessage `gorm:"column:message;type:longtext" json:"message,omitempty"` + Reference json.RawMessage `gorm:"column:reference;type:longtext" json:"reference"` + UserID *string `gorm:"column:user_id;size:255" json:"user_id,omitempty"` + ExpUserID *string `gorm:"column:exp_user_id;size:255" json:"exp_user_id,omitempty"` + Source *string `gorm:"column:source;size:64" json:"source,omitempty"` + VersionTitle *string `gorm:"column:version_title;size:255" json:"version_title,omitempty"` + BaseModel +} + +// TableName maps AgentSession to the conversation table. +func (AgentSession) TableName() string { + return "conversation" +} + // UserCanvas user canvas model type UserCanvas struct { ID string `gorm:"column:id;primaryKey;size:32" json:"id"` diff --git a/internal/handler/agent.go b/internal/handler/agent.go index db35ba192bc..c914e6f5227 100644 --- a/internal/handler/agent.go +++ b/internal/handler/agent.go @@ -21,6 +21,7 @@ import ( "fmt" "mime/multipart" "net/http" + "net/url" "strconv" "strings" @@ -31,6 +32,7 @@ import ( "ragflow/internal/common" "ragflow/internal/entity" "ragflow/internal/service" + "ragflow/internal/utility" ) // AgentHandler agent handler @@ -361,6 +363,15 @@ type updateAgentTagsRequest struct { Tags interface{} `json:"tags"` } +// UpdateAgentTags updates the tag set on an agent canvas. +// @Summary Update Agent Tags +// @Description Replace the tags on an agent canvas (owner / team access required). +// @Tags agents +// @Accept json +// @Produce json +// @Param agent_id path string true "Agent ID" +// @Success 200 {object} map[string]interface{} +// @Router /api/v1/agents/{agent_id}/tags [put] func (h *AgentHandler) UpdateAgentTags(c *gin.Context) { user, errorCode, errorMessage := GetUser(c) if errorCode != common.CodeSuccess { @@ -394,3 +405,360 @@ func (h *AgentHandler) UpdateAgentTags(c *gin.Context) { "message": "success", }) } + +// GetAgent returns a single agent canvas with ownership validation. +// @Summary Get Agent +// @Description Retrieve a single agent canvas by ID (access check included). +// @Tags agents +// @Produce json +// @Param agent_id path string true "Agent ID" +// @Success 200 {object} map[string]interface{} +// @Router /api/v1/agents/{agent_id} [get] +func (h *AgentHandler) GetAgent(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + agentID := c.Param("agent_id") + if agentID == "" { + c.JSON(http.StatusOK, gin.H{"code": common.CodeArgumentError, "data": nil, "message": "agent_id is required"}) + return + } + + canvas, err := h.agentService.GetAgent(user.ID, agentID) + if err != nil { + c.JSON(http.StatusOK, gin.H{"code": common.CodeOperatingError, "data": false, "message": err.Error()}) + return + } + + c.JSON(http.StatusOK, gin.H{"code": common.CodeSuccess, "data": canvas, "message": ""}) +} + +// ListAgentSessions lists sessions for an agent canvas. +// @Summary List Agent Sessions +// @Description Return paginated sessions for an agent canvas. +// @Tags agents +// @Produce json +// @Param agent_id path string true "Agent ID" +// @Param page query int false "Page number (default 1)" +// @Param page_size query int false "Items per page (default 30)" +// @Param orderby query string false "Order-by field (default: update_time)" +// @Param desc query bool false "Descending order (default: true)" +// @Param id query string false "Filter by session ID" +// @Param user_id query string false "Filter by user ID" +// @Param keywords query string false "Search keyword" +// @Success 200 {object} map[string]interface{} +// @Router /api/v1/agents/{agent_id}/sessions [get] +func (h *AgentHandler) ListAgentSessions(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + agentID := c.Param("agent_id") + if agentID == "" { + c.JSON(http.StatusOK, gin.H{"code": common.CodeArgumentError, "data": nil, "message": "agent_id is required"}) + return + } + + ok, err := h.agentService.CheckCanvasAccess(user.ID, agentID) + if err != nil || !ok { + c.JSON(http.StatusOK, gin.H{"code": common.CodeOperatingError, "data": false, "message": "Make sure you have permission to access the agent."}) + return + } + + page := 1 + if v := c.Query("page"); v != "" { + if p, err := strconv.Atoi(v); err == nil && p > 0 { + page = p + } + } + pageSize := 30 + if v := c.Query("page_size"); v != "" { + if ps, err := strconv.Atoi(v); err == nil && ps > 0 { + if ps > 100 { + ps = 100 + } + pageSize = ps + } + } + desc := true + if v := c.Query("desc"); v != "" { + desc = strings.ToLower(v) != "false" + } + + params := service.ListAgentSessionsParams{ + SessionID: c.Query("id"), + UserID: c.Query("user_id"), + Page: page, + PageSize: pageSize, + Orderby: c.DefaultQuery("orderby", "update_time"), + Desc: desc, + Keywords: c.Query("keywords"), + } + + sessions, total, err := h.agentService.ListAgentSessions(agentID, params) + if err != nil { + c.JSON(http.StatusOK, gin.H{"code": common.CodeServerError, "data": nil, "message": err.Error()}) + return + } + + c.JSON(http.StatusOK, gin.H{ + "code": common.CodeSuccess, + "data": sessions, + "total": total, + "message": "success", + }) +} + +// GetAgentSession returns a single session by ID. +// @Summary Get Agent Session +// @Description Retrieve a single agent session by session ID. +// @Tags agents +// @Produce json +// @Param agent_id path string true "Agent ID" +// @Param session_id path string true "Session ID" +// @Success 200 {object} map[string]interface{} +// @Router /api/v1/agents/{agent_id}/sessions/{session_id} [get] +func (h *AgentHandler) GetAgentSession(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + agentID := c.Param("agent_id") + sessionID := c.Param("session_id") + if agentID == "" || sessionID == "" { + c.JSON(http.StatusOK, gin.H{"code": common.CodeArgumentError, "data": nil, "message": "agent_id and session_id are required"}) + return + } + + ok, err := h.agentService.CheckCanvasAccess(user.ID, agentID) + if err != nil || !ok { + c.JSON(http.StatusOK, gin.H{"code": common.CodeOperatingError, "data": false, "message": "Make sure you have permission to access the agent."}) + return + } + + session, err := h.agentService.GetAgentSession(agentID, sessionID) + if err != nil { + c.JSON(http.StatusOK, gin.H{"code": common.CodeNotFound, "data": false, "message": "Session not found!"}) + return + } + + c.JSON(http.StatusOK, gin.H{"code": common.CodeSuccess, "data": session, "message": ""}) +} + +// DeleteAgentSessionItem deletes a single session by ID. +// @Summary Delete Agent Session +// @Description Delete a single agent session by session ID. +// @Tags agents +// @Produce json +// @Param agent_id path string true "Agent ID" +// @Param session_id path string true "Session ID" +// @Success 200 {object} map[string]interface{} +// @Router /api/v1/agents/{agent_id}/sessions/{session_id} [delete] +func (h *AgentHandler) DeleteAgentSessionItem(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + agentID := c.Param("agent_id") + sessionID := c.Param("session_id") + if agentID == "" || sessionID == "" { + c.JSON(http.StatusOK, gin.H{"code": common.CodeArgumentError, "data": nil, "message": "agent_id and session_id are required"}) + return + } + + ok, err := h.agentService.CheckCanvasAccess(user.ID, agentID) + if err != nil || !ok { + c.JSON(http.StatusOK, gin.H{"code": common.CodeOperatingError, "data": false, "message": "Make sure you have permission to access the agent."}) + return + } + + if err := h.agentService.DeleteAgentSessionByID(agentID, sessionID); err != nil { + c.JSON(http.StatusOK, gin.H{"code": common.CodeNotFound, "data": false, "message": err.Error()}) + return + } + + c.JSON(http.StatusOK, gin.H{"code": common.CodeSuccess, "data": true, "message": ""}) +} + +// DeleteAgentSessions bulk-deletes sessions for an agent. +// @Summary Bulk Delete Agent Sessions +// @Description Delete multiple sessions by IDs, or all sessions when delete_all=true. +// @Tags agents +// @Accept json +// @Produce json +// @Param agent_id path string true "Agent ID" +// @Param body body object false "Session IDs or delete_all flag" +// @Success 200 {object} map[string]interface{} +// @Router /api/v1/agents/{agent_id}/sessions [delete] +func (h *AgentHandler) DeleteAgentSessions(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + agentID := c.Param("agent_id") + if agentID == "" { + c.JSON(http.StatusOK, gin.H{"code": common.CodeArgumentError, "data": nil, "message": "agent_id is required"}) + return + } + + // Must be canvas owner for bulk delete (mirrors Python _require_canvas_owner check). + ok, err := h.agentService.CheckCanvasOwner(user.ID, agentID) + if err != nil || !ok { + c.JSON(http.StatusOK, gin.H{"code": common.CodeOperatingError, "data": false, "message": fmt.Sprintf("You don't own the agent %s", agentID)}) + return + } + + var body struct { + IDs []string `json:"ids"` + DeleteAll bool `json:"delete_all"` + } + if err := c.ShouldBindJSON(&body); err != nil { + // Empty / missing body is OK — means no-op. + c.JSON(http.StatusOK, gin.H{"code": common.CodeSuccess, "data": true, "message": ""}) + return + } + + if len(body.IDs) == 0 && !body.DeleteAll { + c.JSON(http.StatusOK, gin.H{"code": common.CodeSuccess, "data": true, "message": ""}) + return + } + + result, err := h.agentService.DeleteAgentSessions(agentID, body.IDs, body.DeleteAll) + if err != nil { + c.JSON(http.StatusOK, gin.H{"code": common.CodeServerError, "data": false, "message": err.Error()}) + return + } + + if len(result.Errors) > 0 && result.SuccessCount == 0 { + c.JSON(http.StatusOK, gin.H{"code": common.CodeOperatingError, "data": false, "message": strings.Join(result.Errors, "; ")}) + return + } + + c.JSON(http.StatusOK, gin.H{"code": common.CodeSuccess, "data": true, "message": ""}) +} + +// GetAgentLogs retrieves execution logs for a session message from Redis. +// @Summary Get Agent Logs +// @Description Retrieve agent execution logs for a specific message ID from Redis. +// @Tags agents +// @Produce json +// @Param agent_id path string true "Agent ID" +// @Param message_id path string true "Message ID" +// @Success 200 {object} map[string]interface{} +// @Router /api/v1/agents/{agent_id}/logs/{message_id} [get] +func (h *AgentHandler) GetAgentLogs(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + agentID := c.Param("agent_id") + messageID := c.Param("message_id") + if agentID == "" || messageID == "" { + c.JSON(http.StatusOK, gin.H{"code": common.CodeArgumentError, "data": nil, "message": "agent_id and message_id are required"}) + return + } + + ok, err := h.agentService.CheckCanvasAccess(user.ID, agentID) + if err != nil || !ok { + c.JSON(http.StatusOK, gin.H{"code": common.CodeOperatingError, "data": false, "message": "Make sure you have permission to access the agent."}) + return + } + + logs, err := h.agentService.GetAgentLogs(agentID, messageID) + if err != nil { + common.Warn("get agent logs failed", zap.String("agent_id", agentID), zap.String("message_id", messageID), zap.Error(err)) + c.JSON(http.StatusOK, gin.H{"code": common.CodeServerError, "data": false, "message": err.Error()}) + return + } + + c.JSON(http.StatusOK, gin.H{"code": common.CodeSuccess, "data": logs, "message": ""}) +} + +// DownloadAgentFile downloads a file blob associated with an agent. +// @Summary Download Agent File +// @Description Download a file by its ID. The file must belong to the caller's tenant. +// @Tags agents +// @Produce application/octet-stream +// @Param id query string true "File ID" +// @Success 200 {file} binary +// @Router /api/v1/agents/download [get] +func (h *AgentHandler) DownloadAgentFile(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + fileID := c.Query("id") + if fileID == "" { + c.JSON(http.StatusOK, gin.H{"code": common.CodeArgumentError, "data": nil, "message": "id is required"}) + return + } + + blob, fileName, err := h.agentService.DownloadAgentFile(user.ID, fileID) + if err != nil { + c.JSON(http.StatusOK, gin.H{"code": common.CodeOperatingError, "data": false, "message": err.Error()}) + return + } + + ext := utility.GetFileExtension(fileName) + contentType := utility.GetContentType(ext, "") + if contentType == "" { + contentType = "application/octet-stream" + } + if utility.ShouldForceAttachment(ext, contentType) { + c.Header("X-Content-Type-Options", "nosniff") + c.Header("Content-Disposition", "attachment; filename*=UTF-8''"+url.QueryEscape(fileName)) + } + c.Data(http.StatusOK, contentType, blob) +} + +// DownloadAttachment streams an attachment file to the caller. +// @Summary Download Attachment +// @Description Download an attachment by storage object name. The caller's tenant ID is used as the bucket. +// @Tags agents +// @Produce application/octet-stream +// @Param attachment_id path string true "Attachment object name" +// @Param ext query string false "File extension hint for content-type (default: markdown)" +// @Success 200 {file} binary +// @Router /api/v1/agents/attachments/{attachment_id}/download [get] +func (h *AgentHandler) DownloadAttachment(c *gin.Context) { + user, errorCode, errorMessage := GetUser(c) + if errorCode != common.CodeSuccess { + jsonError(c, errorCode, errorMessage) + return + } + + attachmentID := c.Param("attachment_id") + if attachmentID == "" { + c.JSON(http.StatusOK, gin.H{"code": common.CodeArgumentError, "data": nil, "message": "attachment_id is required"}) + return + } + + ext := c.DefaultQuery("ext", "markdown") + + blob, err := h.agentService.DownloadAttachment(user.ID, attachmentID) + if err != nil { + c.JSON(http.StatusOK, gin.H{"code": common.CodeServerError, "data": false, "message": err.Error()}) + return + } + + contentType := utility.GetContentType(ext, "") + if contentType == "" { + contentType = "application/octet-stream" + } + c.Data(http.StatusOK, contentType, blob) +} diff --git a/internal/router/router.go b/internal/router/router.go index 765a8a2aa79..7ca5f4d0551 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -366,11 +366,21 @@ func (r *Router) Setup(engine *gin.Engine) { agents := v1.Group("/agents") { agents.GET("", r.agentHandler.ListAgents) + // Static-path routes must be registered before parameterised routes + // so gin's radix tree resolves them with priority. agents.GET("/templates", r.agentHandler.ListTemplates) + agents.GET("/download", r.agentHandler.DownloadAgentFile) + agents.GET("/attachments/:attachment_id/download", r.agentHandler.DownloadAttachment) + agents.GET("/:agent_id", r.agentHandler.GetAgent) agents.GET("/:agent_id/versions", r.agentHandler.ListAgentVersions) agents.GET("/:agent_id/versions/:version_id", r.agentHandler.GetAgentVersion) agents.POST("/:agent_id/upload", r.agentHandler.UploadAgentFile) agents.PUT("/:agent_id/tags", r.agentHandler.UpdateAgentTags) + agents.GET("/:agent_id/sessions", r.agentHandler.ListAgentSessions) + agents.GET("/:agent_id/sessions/:session_id", r.agentHandler.GetAgentSession) + agents.DELETE("/:agent_id/sessions/:session_id", r.agentHandler.DeleteAgentSessionItem) + agents.DELETE("/:agent_id/sessions", r.agentHandler.DeleteAgentSessions) + agents.GET("/:agent_id/logs/:message_id", r.agentHandler.GetAgentLogs) } connector := v1.Group("/connectors") diff --git a/internal/service/agent.go b/internal/service/agent.go index 3eb284c7d73..3d5a0d5b70c 100644 --- a/internal/service/agent.go +++ b/internal/service/agent.go @@ -17,12 +17,18 @@ package service import ( + "encoding/json" + "errors" "fmt" "strings" + "gorm.io/gorm" + + "ragflow/internal/cache" "ragflow/internal/common" "ragflow/internal/dao" "ragflow/internal/entity" + "ragflow/internal/storage" ) const ( @@ -36,6 +42,8 @@ type AgentService struct { userTenantDAO *dao.UserTenantDAO userCanvasVersionDAO *dao.UserCanvasVersionDAO canvasTemplateDAO *dao.CanvasTemplateDAO + agentSessionDAO *dao.AgentSessionDAO + fileDAO *dao.FileDAO } // NewAgentService create agent service @@ -45,6 +53,8 @@ func NewAgentService() *AgentService { userTenantDAO: dao.NewUserTenantDAO(), userCanvasVersionDAO: dao.NewUserCanvasVersionDAO(), canvasTemplateDAO: dao.NewCanvasTemplateDAO(), + agentSessionDAO: dao.NewAgentSessionDAO(), + fileDAO: dao.NewFileDAO(), } } @@ -274,3 +284,191 @@ func (s *AgentService) GetVersion(canvasID, versionID string) (*entity.UserCanva } return version, nil } + +// GetAgent returns a single agent canvas after verifying the caller has access. +func (s *AgentService) GetAgent(userID, agentID string) (*entity.UserCanvas, error) { + canvas, err := s.canvasDAO.GetByID(agentID) + if err != nil { + return nil, err + } + // Verify access: owner always OK; team permission requires joined-tenant check. + if canvas.UserID != userID { + if canvas.Permission != string(entity.TenantPermissionTeam) { + return nil, fmt.Errorf("canvas not found") + } + tenantIDs, err := s.userTenantDAO.GetTenantIDsByUserID(userID) + if err != nil { + return nil, err + } + found := false + for _, tid := range tenantIDs { + if canvas.UserID == tid { + found = true + break + } + } + if !found { + return nil, fmt.Errorf("canvas not found") + } + } + return canvas, nil +} + +// CheckCanvasOwner returns true only if userID is the direct owner of the canvas. +// Used for operations that require ownership (not just access). +func (s *AgentService) CheckCanvasOwner(userID, canvasID string) (bool, error) { + canvas, err := s.canvasDAO.GetByID(canvasID) + if err != nil { + return false, err + } + return canvas.UserID == userID, nil +} + +// ListAgentSessionsParams bundles optional filter parameters. +type ListAgentSessionsParams struct { + SessionID string + UserID string + Page int + PageSize int + Orderby string + Desc bool + Keywords string +} + +// ListAgentSessions returns paginated sessions for an agent. +func (s *AgentService) ListAgentSessions(agentID string, p ListAgentSessionsParams) ([]*entity.AgentSession, int64, error) { + return s.agentSessionDAO.ListByAgentIDPaged( + agentID, + p.Page, + p.PageSize, + p.Orderby, + p.Desc, + p.SessionID, + p.UserID, + p.Keywords, + ) +} + +// GetAgentSession returns a single session, verifying it belongs to the given agent. +func (s *AgentService) GetAgentSession(agentID, sessionID string) (*entity.AgentSession, error) { + session, err := s.agentSessionDAO.GetByIDAndAgentID(sessionID, agentID) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, fmt.Errorf("session not found") + } + return nil, err + } + return session, nil +} + +// DeleteAgentSessionByID deletes a single session after verifying it belongs to the agent. +func (s *AgentService) DeleteAgentSessionByID(agentID, sessionID string) error { + belongs, err := s.agentSessionDAO.BelongsToAgent(sessionID, agentID) + if err != nil { + return err + } + if !belongs { + return fmt.Errorf("session not found") + } + return s.agentSessionDAO.DeleteByID(sessionID) +} + +// DeleteAgentSessionsResult captures the outcome of a bulk delete. +type DeleteAgentSessionsResult struct { + SuccessCount int + Errors []string +} + +// DeleteAgentSessions bulk-deletes sessions belonging to the agent. +// If ids is empty and deleteAll is true, all sessions for the agent are deleted. +func (s *AgentService) DeleteAgentSessions(agentID string, ids []string, deleteAll bool) (*DeleteAgentSessionsResult, error) { + if len(ids) == 0 && deleteAll { + all, err := s.agentSessionDAO.ListByAgentID(agentID) + if err != nil { + return nil, err + } + ids = make([]string, len(all)) + for i, sess := range all { + ids[i] = sess.ID + } + } + + res := &DeleteAgentSessionsResult{} + seen := make(map[string]bool, len(ids)) + for _, id := range ids { + if seen[id] { + res.Errors = append(res.Errors, fmt.Sprintf("duplicate session id: %s", id)) + continue + } + seen[id] = true + + belongs, err := s.agentSessionDAO.BelongsToAgent(id, agentID) + if err != nil || !belongs { + res.Errors = append(res.Errors, fmt.Sprintf("the agent doesn't own the session %s", id)) + continue + } + if err := s.agentSessionDAO.DeleteByID(id); err != nil { + res.Errors = append(res.Errors, fmt.Sprintf("failed to delete session %s: %v", id, err)) + continue + } + res.SuccessCount++ + } + return res, nil +} + +// GetAgentLogs retrieves execution logs for a given message from Redis. +// Key format mirrors Python: "{agent_id}-{message_id}-logs". +func (s *AgentService) GetAgentLogs(agentID, messageID string) (interface{}, error) { + redisClient := cache.Get() + if redisClient == nil { + return map[string]interface{}{}, nil + } + key := fmt.Sprintf("%s-%s-logs", agentID, messageID) + raw, err := redisClient.Get(key) + if err != nil || raw == "" { + return map[string]interface{}{}, nil + } + var payload interface{} + if err := json.Unmarshal([]byte(raw), &payload); err != nil { + return map[string]interface{}{}, nil + } + return payload, nil +} + +// DownloadAgentFile retrieves the raw bytes for a file owned by the given tenant. +// Returns the blob and the file name (for content-type inference). +func (s *AgentService) DownloadAgentFile(tenantID, fileID string) ([]byte, string, error) { + file, err := s.fileDAO.GetByID(fileID) + if err != nil { + return nil, "", fmt.Errorf("file not found") + } + if file.TenantID != tenantID { + return nil, "", fmt.Errorf("file not found") + } + if file.Location == nil || *file.Location == "" { + return nil, "", fmt.Errorf("file has no storage location") + } + storageImpl := storage.GetStorageFactory().GetStorage() + if storageImpl == nil { + return nil, "", fmt.Errorf("storage not initialized") + } + blob, err := storageImpl.Get(file.ParentID, *file.Location) + if err != nil { + return nil, "", err + } + return blob, file.Name, nil +} + +// DownloadAttachment retrieves raw bytes for an attachment stored under the tenant's bucket. +// attachmentID is the object name (storage key), tenantID is the bucket. +func (s *AgentService) DownloadAttachment(tenantID, attachmentID string) ([]byte, error) { + storageImpl := storage.GetStorageFactory().GetStorage() + if storageImpl == nil { + return nil, fmt.Errorf("storage not initialized") + } + blob, err := storageImpl.Get(tenantID, attachmentID) + if err != nil { + return nil, err + } + return blob, nil +}