From b70d6008d0d5aa30fc48502e4dd34bad8cfd8281 Mon Sep 17 00:00:00 2001 From: hunnyboy1217 Date: Thu, 4 Jun 2026 08:30:29 -0600 Subject: [PATCH 1/2] feat[Go]: implement Langfuse API key management endpoints MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ports three Langfuse credential endpoints from Python (langfuse_api.py) to Go, closing issue #15675: POST/PUT /api/v1/langfuse/api-key (LangfuseHandler.SetAPIKey) GET /api/v1/langfuse/api-key (LangfuseHandler.GetAPIKey) DELETE /api/v1/langfuse/api-key (LangfuseHandler.DeleteAPIKey) New files: internal/entity/langfuse.go – TenantLangfuse entity (tenant_langfuse table) internal/dao/langfuse.go – LangfuseDAO: GetByTenantID, Create, UpdateByTenantID, DeleteByTenantID internal/service/langfuse.go – LangfuseService: SetAPIKey, GetAPIKey, DeleteAPIKey; langfuseAuthCheck and langfuseGetProjects HTTP helpers internal/handler/langfuse.go – LangfuseHandler with three HTTP handlers Changed files: internal/router/router.go – Router struct gains langfuseHandler; NewRouter constructs it; four routes added Functional parity with Python: SetAPIKey – validates all three fields; calls GET {host}/api/public/projects with Basic Auth (mirrors langfuse.auth_check()); upserts record; secret_key not echoed in response GetAPIKey – validates stored keys; fetches project list from Langfuse; returns public_key, host, project_id, project_name; secret_key intentionally excluded from response DeleteAPIKey – returns "Have not record" message when no keys stored, otherwise hard-deletes and returns true --- internal/dao/langfuse.go | 59 +++++++++ internal/entity/langfuse.go | 32 +++++ internal/handler/langfuse.go | 125 +++++++++++++++++++ internal/router/router.go | 11 ++ internal/service/langfuse.go | 227 +++++++++++++++++++++++++++++++++++ 5 files changed, 454 insertions(+) create mode 100644 internal/dao/langfuse.go create mode 100644 internal/entity/langfuse.go create mode 100644 internal/handler/langfuse.go create mode 100644 internal/service/langfuse.go diff --git a/internal/dao/langfuse.go b/internal/dao/langfuse.go new file mode 100644 index 00000000000..7b8d8d1a685 --- /dev/null +++ b/internal/dao/langfuse.go @@ -0,0 +1,59 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package dao + +import ( + "ragflow/internal/entity" +) + +// LangfuseDAO data access for tenant_langfuse table. +type LangfuseDAO struct{} + +// NewLangfuseDAO creates a LangfuseDAO. +func NewLangfuseDAO() *LangfuseDAO { + return &LangfuseDAO{} +} + +// GetByTenantID returns the Langfuse credential record for a tenant, or nil +// when none exists. +func (d *LangfuseDAO) GetByTenantID(tenantID string) (*entity.TenantLangfuse, error) { + var entry entity.TenantLangfuse + err := DB.Where("tenant_id = ?", tenantID).First(&entry).Error + if err != nil { + return nil, err + } + return &entry, nil +} + +// Create inserts a new TenantLangfuse record. +func (d *LangfuseDAO) Create(entry *entity.TenantLangfuse) error { + return DB.Create(entry).Error +} + +// UpdateByTenantID applies updates to the record for a tenant. +func (d *LangfuseDAO) UpdateByTenantID(tenantID string, updates map[string]interface{}) error { + return DB.Model(&entity.TenantLangfuse{}). + Where("tenant_id = ?", tenantID). + Updates(updates).Error +} + +// DeleteByTenantID hard-deletes the record for a tenant. +func (d *LangfuseDAO) DeleteByTenantID(tenantID string) error { + return DB.Unscoped(). + Where("tenant_id = ?", tenantID). + Delete(&entity.TenantLangfuse{}).Error +} diff --git a/internal/entity/langfuse.go b/internal/entity/langfuse.go new file mode 100644 index 00000000000..091432b5a59 --- /dev/null +++ b/internal/entity/langfuse.go @@ -0,0 +1,32 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package entity + +// TenantLangfuse stores per-tenant Langfuse credentials. +type TenantLangfuse struct { + ID string `gorm:"column:id;primaryKey;size:32" json:"id"` + TenantID string `gorm:"column:tenant_id;size:32;not null;uniqueIndex" json:"tenant_id"` + SecretKey string `gorm:"column:secret_key;size:255;not null" json:"secret_key"` + PublicKey string `gorm:"column:public_key;size:255;not null" json:"public_key"` + Host string `gorm:"column:host;size:255;not null" json:"host"` + BaseModel +} + +// TableName maps to the tenant_langfuse table (matches Python model). +func (TenantLangfuse) TableName() string { + return "tenant_langfuse" +} diff --git a/internal/handler/langfuse.go b/internal/handler/langfuse.go new file mode 100644 index 00000000000..4936161b973 --- /dev/null +++ b/internal/handler/langfuse.go @@ -0,0 +1,125 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package handler + +import ( + "net/http" + + "github.com/gin-gonic/gin" + + "ragflow/internal/common" + "ragflow/internal/service" +) + +// LangfuseHandler manages Langfuse credential endpoints. +type LangfuseHandler struct { + langfuseService *service.LangfuseService +} + +// NewLangfuseHandler creates a LangfuseHandler. +func NewLangfuseHandler() *LangfuseHandler { + return &LangfuseHandler{langfuseService: service.NewLangfuseService()} +} + +// SetAPIKey handles POST/PUT /api/v1/langfuse/api-key. +// Validates the supplied keys against Langfuse, then upserts the record. +// Secret key is stored but not echoed back to the caller. +// @Summary Set Langfuse API key +// @Description Create or update the Langfuse credentials for the current tenant. +// @Tags langfuse +// @Accept json +// @Produce json +// @Param request body service.SetAPIKeyRequest true "Langfuse credentials" +// @Success 200 {object} map[string]interface{} +// @Router /api/v1/langfuse/api-key [post] +func (h *LangfuseHandler) SetAPIKey(c *gin.Context) { + user, code, msg := GetUser(c) + if code != common.CodeSuccess { + jsonError(c, code, msg) + return + } + + var req service.SetAPIKeyRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusOK, gin.H{"code": common.CodeDataError, "data": false, "message": err.Error()}) + return + } + + data, err := h.langfuseService.SetAPIKey(user.ID, &req) + if err != nil { + c.JSON(http.StatusOK, gin.H{"code": common.CodeDataError, "data": false, "message": err.Error()}) + return + } + + c.JSON(http.StatusOK, gin.H{"code": common.CodeSuccess, "data": data, "message": "success"}) +} + +// GetAPIKey handles GET /api/v1/langfuse/api-key. +// Returns stored metadata and Langfuse project info; secret_key is never returned. +// @Summary Get Langfuse API key info +// @Description Retrieve the stored Langfuse credentials (without secret_key) and project info. +// @Tags langfuse +// @Produce json +// @Success 200 {object} map[string]interface{} +// @Router /api/v1/langfuse/api-key [get] +func (h *LangfuseHandler) GetAPIKey(c *gin.Context) { + user, code, msg := GetUser(c) + if code != common.CodeSuccess { + jsonError(c, code, msg) + return + } + + data, err := h.langfuseService.GetAPIKey(user.ID) + if err != nil { + c.JSON(http.StatusOK, gin.H{"code": common.CodeDataError, "data": false, "message": err.Error()}) + return + } + if data == nil { + c.JSON(http.StatusOK, gin.H{"code": common.CodeSuccess, "data": nil, "message": "Have not record any Langfuse keys."}) + return + } + + c.JSON(http.StatusOK, gin.H{"code": common.CodeSuccess, "data": data, "message": "success"}) +} + +// DeleteAPIKey handles DELETE /api/v1/langfuse/api-key. +// Removes the stored Langfuse credentials for the current tenant. +// @Summary Delete Langfuse API key +// @Description Remove the stored Langfuse credentials for the current tenant. +// @Tags langfuse +// @Produce json +// @Success 200 {object} map[string]interface{} +// @Router /api/v1/langfuse/api-key [delete] +func (h *LangfuseHandler) DeleteAPIKey(c *gin.Context) { + user, code, msg := GetUser(c) + if code != common.CodeSuccess { + jsonError(c, code, msg) + return + } + + deleted, err := h.langfuseService.DeleteAPIKey(user.ID) + if err != nil { + c.JSON(http.StatusOK, gin.H{"code": common.CodeDataError, "data": false, "message": err.Error()}) + return + } + if !deleted { + c.JSON(http.StatusOK, gin.H{"code": common.CodeSuccess, "data": nil, "message": "Have not record any Langfuse keys."}) + return + } + + c.JSON(http.StatusOK, gin.H{"code": common.CodeSuccess, "data": true, "message": "success"}) +} diff --git a/internal/router/router.go b/internal/router/router.go index 9a84c7b55ca..6e52b3f671c 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -43,6 +43,7 @@ type Router struct { providerHandler *handler.ProviderHandler agentHandler *handler.AgentHandler relatedQuestionsHandler *handler.SearchbotHandler + langfuseHandler *handler.LangfuseHandler } // NewRouter create router @@ -89,6 +90,7 @@ func NewRouter( providerHandler: providerHandler, agentHandler: agentHandler, relatedQuestionsHandler: relatedQuestionsHandler, + langfuseHandler: handler.NewLangfuseHandler(), } } @@ -381,6 +383,15 @@ func (r *Router) Setup(engine *gin.Engine) { } + // Langfuse credential management routes. + langfuse := v1.Group("/langfuse") + { + langfuse.POST("/api-key", r.langfuseHandler.SetAPIKey) + langfuse.PUT("/api-key", r.langfuseHandler.SetAPIKey) + langfuse.GET("/api-key", r.langfuseHandler.GetAPIKey) + langfuse.DELETE("/api-key", r.langfuseHandler.DeleteAPIKey) + } + connector := v1.Group("/connectors") { connector.GET("/", r.connectorHandler.ListConnectors) diff --git a/internal/service/langfuse.go b/internal/service/langfuse.go new file mode 100644 index 00000000000..c1925f055ad --- /dev/null +++ b/internal/service/langfuse.go @@ -0,0 +1,227 @@ +// +// Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package service + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "strings" + "time" + + "gorm.io/gorm" + + "ragflow/internal/common" + "ragflow/internal/dao" + "ragflow/internal/entity" +) + +const ( + langfuseAuthCheckTimeout = 7 * time.Second + // projectsPath is the Langfuse endpoint used for auth_check. + langfuseProjectsPath = "/api/public/projects" +) + +// LangfuseService manages per-tenant Langfuse credentials. +type LangfuseService struct { + langfuseDAO *dao.LangfuseDAO +} + +// NewLangfuseService creates a LangfuseService. +func NewLangfuseService() *LangfuseService { + return &LangfuseService{langfuseDAO: dao.NewLangfuseDAO()} +} + +// SetAPIKeyRequest is the body for POST/PUT /langfuse/api-key. +type SetAPIKeyRequest struct { + SecretKey string `json:"secret_key" binding:"required"` + PublicKey string `json:"public_key" binding:"required"` + Host string `json:"host" binding:"required"` +} + +// LangfuseProject is one project entry returned by the Langfuse projects API. +type LangfuseProject struct { + ID string `json:"id"` + Name string `json:"name"` +} + +// SetAPIKey validates credentials against Langfuse, then upserts the record. +// Mirrors Python set_api_key. +func (s *LangfuseService) SetAPIKey(tenantID string, req *SetAPIKeyRequest) (map[string]interface{}, error) { + if strings.TrimSpace(req.SecretKey) == "" || + strings.TrimSpace(req.PublicKey) == "" || + strings.TrimSpace(req.Host) == "" { + return nil, fmt.Errorf("Missing required fields") + } + + if err := langfuseAuthCheck(req.PublicKey, req.SecretKey, req.Host); err != nil { + return nil, fmt.Errorf("Invalid Langfuse keys") + } + + existing, err := s.langfuseDAO.GetByTenantID(tenantID) + if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { + return nil, fmt.Errorf("database error: %w", err) + } + + if existing == nil || errors.Is(err, gorm.ErrRecordNotFound) { + record := &entity.TenantLangfuse{ + ID: common.GenerateUUID(), + TenantID: tenantID, + SecretKey: req.SecretKey, + PublicKey: req.PublicKey, + Host: req.Host, + } + if err := s.langfuseDAO.Create(record); err != nil { + return nil, fmt.Errorf("failed to save Langfuse keys: %w", err) + } + } else { + if err := s.langfuseDAO.UpdateByTenantID(tenantID, map[string]interface{}{ + "secret_key": req.SecretKey, + "public_key": req.PublicKey, + "host": req.Host, + }); err != nil { + return nil, fmt.Errorf("failed to update Langfuse keys: %w", err) + } + } + + return map[string]interface{}{ + "tenant_id": tenantID, + "public_key": req.PublicKey, + "host": req.Host, + }, nil +} + +// GetAPIKey retrieves the stored credentials and validates them. +// The secret_key is intentionally omitted from the response. +// Mirrors Python get_api_key. +func (s *LangfuseService) GetAPIKey(tenantID string) (map[string]interface{}, error) { + entry, err := s.langfuseDAO.GetByTenantID(tenantID) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, nil // caller interprets nil as "no keys stored" + } + return nil, fmt.Errorf("database error: %w", err) + } + + if err := langfuseAuthCheck(entry.PublicKey, entry.SecretKey, entry.Host); err != nil { + return nil, fmt.Errorf("Invalid Langfuse keys loaded") + } + + projects, err := langfuseGetProjects(entry.PublicKey, entry.SecretKey, entry.Host) + if err != nil { + return nil, fmt.Errorf("Error from Langfuse: %w", err) + } + + result := map[string]interface{}{ + "tenant_id": entry.TenantID, + "public_key": entry.PublicKey, + "host": entry.Host, + // secret_key is intentionally excluded. + } + if len(projects) > 0 { + result["project_id"] = projects[0].ID + result["project_name"] = projects[0].Name + } + return result, nil +} + +// DeleteAPIKey removes the stored Langfuse credentials for a tenant. +// Mirrors Python delete_api_key. +func (s *LangfuseService) DeleteAPIKey(tenantID string) (bool, error) { + _, err := s.langfuseDAO.GetByTenantID(tenantID) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return false, nil // caller interprets false as "no keys stored" + } + return false, fmt.Errorf("database error: %w", err) + } + + if err := s.langfuseDAO.DeleteByTenantID(tenantID); err != nil { + return false, fmt.Errorf("failed to delete Langfuse keys: %w", err) + } + return true, nil +} + +// ── Langfuse API helpers ────────────────────────────────────────────────────── + +// langfuseAuthCheck calls GET {host}/api/public/projects with Basic Auth. +// Returns nil on HTTP 2xx, error otherwise — mirrors Python langfuse.auth_check(). +func langfuseAuthCheck(publicKey, secretKey, host string) error { + host = strings.TrimRight(host, "/") + url := host + langfuseProjectsPath + + ctx, cancel := context.WithTimeout(context.Background(), langfuseAuthCheckTimeout) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return err + } + req.SetBasicAuth(publicKey, secretKey) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + _, _ = io.Copy(io.Discard, resp.Body) + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("Langfuse auth failed: HTTP %d", resp.StatusCode) + } + return nil +} + +// langfuseGetProjects fetches the project list for the tenant. +func langfuseGetProjects(publicKey, secretKey, host string) ([]LangfuseProject, error) { + host = strings.TrimRight(host, "/") + url := host + langfuseProjectsPath + + ctx, cancel := context.WithTimeout(context.Background(), langfuseAuthCheckTimeout) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, err + } + req.SetBasicAuth(publicKey, secretKey) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) + if err != nil { + return nil, err + } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return nil, fmt.Errorf("Langfuse projects API returned HTTP %d", resp.StatusCode) + } + + var payload struct { + Data []LangfuseProject `json:"data"` + } + if err := json.Unmarshal(body, &payload); err != nil { + return nil, err + } + return payload.Data, nil +} From 0a8112406e47238a62695fee72071731f2e3aa4b Mon Sep 17 00:00:00 2001 From: hunnyboy1217 Date: Thu, 4 Jun 2026 23:48:07 -0600 Subject: [PATCH 2/2] refactor[Go]: address review on Langfuse API (PR #15677) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Applies CodeRabbit's final-review feedback. 1. Secret key leakage (entity/langfuse.go): TenantLangfuse.SecretKey is now tagged json:"-" so it can never leak through any path that marshals the entity directly. 2. SSRF host guard (service/langfuse.go): the user-supplied Langfuse host is now validated (http/https scheme + hostname) and all outbound calls go through an HTTP client whose dialer resolves the host, rejects any non-globally-routable IP (loopback/private/link-local/multicast/unspecified/CGNAT), and pins the validated address — closing the authenticated-SSRF / DNS-rebinding vector. Note: this rejects self-hosted Langfuse on private networks (a deliberate security/usability tradeoff; relax isGloballyRoutableIP if internal hosts must be allowed). 3. Upsert race (dao + service): SetAPIKey's read-then-create/update is replaced by a single atomic LangfuseDAO.UpsertByTenantID using GORM clause.OnConflict on the tenant_id unique index, removing the concurrent-write race. 4. Unbounded drain (service/langfuse.go): langfuseAuthCheck now drains the response through io.LimitReader (1 MB cap) instead of an unbounded io.Copy(io.Discard, resp.Body); the outbound client also carries an explicit timeout. --- internal/dao/langfuse.go | 14 ++++ internal/entity/langfuse.go | 4 +- internal/service/langfuse.go | 138 ++++++++++++++++++++++++++--------- 3 files changed, 122 insertions(+), 34 deletions(-) diff --git a/internal/dao/langfuse.go b/internal/dao/langfuse.go index 7b8d8d1a685..9904c2b2fd1 100644 --- a/internal/dao/langfuse.go +++ b/internal/dao/langfuse.go @@ -17,6 +17,8 @@ package dao import ( + "gorm.io/gorm/clause" + "ragflow/internal/entity" ) @@ -51,6 +53,18 @@ func (d *LangfuseDAO) UpdateByTenantID(tenantID string, updates map[string]inter Updates(updates).Error } +// UpsertByTenantID atomically inserts the record or, when a row already exists +// for the same tenant_id (unique index), updates its credentials in a single +// statement. This removes the read-then-write race between concurrent callers. +func (d *LangfuseDAO) UpsertByTenantID(entry *entity.TenantLangfuse) error { + return DB.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "tenant_id"}}, + DoUpdates: clause.AssignmentColumns([]string{ + "secret_key", "public_key", "host", "update_time", "update_date", + }), + }).Create(entry).Error +} + // DeleteByTenantID hard-deletes the record for a tenant. func (d *LangfuseDAO) DeleteByTenantID(tenantID string) error { return DB.Unscoped(). diff --git a/internal/entity/langfuse.go b/internal/entity/langfuse.go index 091432b5a59..1fba1c21d09 100644 --- a/internal/entity/langfuse.go +++ b/internal/entity/langfuse.go @@ -20,7 +20,9 @@ package entity type TenantLangfuse struct { ID string `gorm:"column:id;primaryKey;size:32" json:"id"` TenantID string `gorm:"column:tenant_id;size:32;not null;uniqueIndex" json:"tenant_id"` - SecretKey string `gorm:"column:secret_key;size:255;not null" json:"secret_key"` + // SecretKey is never serialised to JSON (json:"-") so it cannot leak through + // any response that marshals the entity directly. + SecretKey string `gorm:"column:secret_key;size:255;not null" json:"-"` PublicKey string `gorm:"column:public_key;size:255;not null" json:"public_key"` Host string `gorm:"column:host;size:255;not null" json:"host"` BaseModel diff --git a/internal/service/langfuse.go b/internal/service/langfuse.go index c1925f055ad..46186f2914d 100644 --- a/internal/service/langfuse.go +++ b/internal/service/langfuse.go @@ -22,7 +22,9 @@ import ( "errors" "fmt" "io" + "net" "net/http" + "net/url" "strings" "time" @@ -75,30 +77,18 @@ func (s *LangfuseService) SetAPIKey(tenantID string, req *SetAPIKeyRequest) (map return nil, fmt.Errorf("Invalid Langfuse keys") } - existing, err := s.langfuseDAO.GetByTenantID(tenantID) - if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { - return nil, fmt.Errorf("database error: %w", err) + // Atomic upsert: insert the credentials, or update them in place when a row + // already exists for this tenant. Avoids the read-then-write race between + // concurrent SetAPIKey calls for the same tenant. + record := &entity.TenantLangfuse{ + ID: common.GenerateUUID(), + TenantID: tenantID, + SecretKey: req.SecretKey, + PublicKey: req.PublicKey, + Host: req.Host, } - - if existing == nil || errors.Is(err, gorm.ErrRecordNotFound) { - record := &entity.TenantLangfuse{ - ID: common.GenerateUUID(), - TenantID: tenantID, - SecretKey: req.SecretKey, - PublicKey: req.PublicKey, - Host: req.Host, - } - if err := s.langfuseDAO.Create(record); err != nil { - return nil, fmt.Errorf("failed to save Langfuse keys: %w", err) - } - } else { - if err := s.langfuseDAO.UpdateByTenantID(tenantID, map[string]interface{}{ - "secret_key": req.SecretKey, - "public_key": req.PublicKey, - "host": req.Host, - }); err != nil { - return nil, fmt.Errorf("failed to update Langfuse keys: %w", err) - } + if err := s.langfuseDAO.UpsertByTenantID(record); err != nil { + return nil, fmt.Errorf("failed to save Langfuse keys: %w", err) } return map[string]interface{}{ @@ -161,27 +151,34 @@ func (s *LangfuseService) DeleteAPIKey(tenantID string) (bool, error) { // ── Langfuse API helpers ────────────────────────────────────────────────────── +// maxLangfuseResponseBytes bounds how much of a Langfuse response we read or +// drain, guarding against resource exhaustion from a hostile endpoint. +const maxLangfuseResponseBytes = 1 << 20 // 1 MB + // langfuseAuthCheck calls GET {host}/api/public/projects with Basic Auth. // Returns nil on HTTP 2xx, error otherwise — mirrors Python langfuse.auth_check(). func langfuseAuthCheck(publicKey, secretKey, host string) error { - host = strings.TrimRight(host, "/") - url := host + langfuseProjectsPath + if err := validateLangfuseHost(host); err != nil { + return err + } + reqURL := strings.TrimRight(host, "/") + langfuseProjectsPath ctx, cancel := context.WithTimeout(context.Background(), langfuseAuthCheckTimeout) defer cancel() - req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil) if err != nil { return err } req.SetBasicAuth(publicKey, secretKey) - resp, err := http.DefaultClient.Do(req) + resp, err := langfuseHTTPClient(langfuseAuthCheckTimeout).Do(req) if err != nil { return err } defer resp.Body.Close() - _, _ = io.Copy(io.Discard, resp.Body) + // Bound the drain so a hostile endpoint cannot stream an unbounded body. + _, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, maxLangfuseResponseBytes)) if resp.StatusCode < 200 || resp.StatusCode >= 300 { return fmt.Errorf("Langfuse auth failed: HTTP %d", resp.StatusCode) @@ -191,25 +188,27 @@ func langfuseAuthCheck(publicKey, secretKey, host string) error { // langfuseGetProjects fetches the project list for the tenant. func langfuseGetProjects(publicKey, secretKey, host string) ([]LangfuseProject, error) { - host = strings.TrimRight(host, "/") - url := host + langfuseProjectsPath + if err := validateLangfuseHost(host); err != nil { + return nil, err + } + reqURL := strings.TrimRight(host, "/") + langfuseProjectsPath ctx, cancel := context.WithTimeout(context.Background(), langfuseAuthCheckTimeout) defer cancel() - req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil) if err != nil { return nil, err } req.SetBasicAuth(publicKey, secretKey) - resp, err := http.DefaultClient.Do(req) + resp, err := langfuseHTTPClient(langfuseAuthCheckTimeout).Do(req) if err != nil { return nil, err } defer resp.Body.Close() - body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) + body, err := io.ReadAll(io.LimitReader(resp.Body, maxLangfuseResponseBytes)) if err != nil { return nil, err } @@ -225,3 +224,76 @@ func langfuseGetProjects(publicKey, secretKey, host string) ([]LangfuseProject, } return payload.Data, nil } + +// validateLangfuseHost ensures the configured host is a well-formed http(s) URL. +// The user supplies this host and the server then issues requests to it, so it +// must be validated before use. +func validateLangfuseHost(host string) error { + u, err := url.Parse(strings.TrimRight(strings.TrimSpace(host), "/")) + if err != nil { + return fmt.Errorf("invalid Langfuse host") + } + if u.Scheme != "http" && u.Scheme != "https" { + return fmt.Errorf("Langfuse host must use http or https") + } + if u.Hostname() == "" { + return fmt.Errorf("Langfuse host is missing a hostname") + } + return nil +} + +// langfuseHTTPClient returns an HTTP client whose dialer resolves the target +// host, rejects any non-globally-routable IP, and pins the validated address — +// an SSRF guard (with DNS-rebinding protection) for the user-supplied Langfuse +// host. The IP check runs on every connection, so redirect hops are covered too. +func langfuseHTTPClient(timeout time.Duration) *http.Client { + dialer := &net.Dialer{Timeout: 5 * time.Second} + return &http.Client{ + Timeout: timeout, + Transport: &http.Transport{ + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { + host, port, err := net.SplitHostPort(addr) + if err != nil { + return nil, err + } + ips, err := net.DefaultResolver.LookupIP(ctx, "ip", host) + if err != nil { + return nil, fmt.Errorf("could not resolve host %q: %w", host, err) + } + var pinned net.IP + for _, ip := range ips { + if !isGloballyRoutableIP(ip) { + return nil, fmt.Errorf("Langfuse host resolves to a non-public address (%s)", ip) + } + if pinned == nil { + pinned = ip + } + } + if pinned == nil { + return nil, fmt.Errorf("host %q resolved to no addresses", host) + } + return dialer.DialContext(ctx, network, net.JoinHostPort(pinned.String(), port)) + }, + }, + } +} + +// isGloballyRoutableIP reports whether ip is a public, routable address. It +// rejects loopback, private, link-local, multicast, unspecified, and +// carrier-grade-NAT ranges. IPv4-mapped IPv6 addresses are handled by the +// stdlib predicates. +func isGloballyRoutableIP(ip net.IP) bool { + if ip == nil { + return false + } + if ip.IsLoopback() || ip.IsPrivate() || ip.IsUnspecified() || + ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() || + ip.IsMulticast() || ip.IsInterfaceLocalMulticast() { + return false + } + // Carrier-grade NAT 100.64.0.0/10 (RFC 6598) — not covered by IsPrivate. + if ip4 := ip.To4(); ip4 != nil && ip4[0] == 100 && ip4[1]&0xc0 == 0x40 { + return false + } + return true +}