diff --git a/.github/workflows/test-api.yml b/.github/workflows/test-api.yml new file mode 100644 index 00000000..c2f7f5c5 --- /dev/null +++ b/.github/workflows/test-api.yml @@ -0,0 +1,60 @@ +# Copyright 2026 LiveKit, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Test API + +permissions: + contents: read + +on: + workflow_dispatch: + push: + branches: [main] + pull_request: + branches: [main] + +jobs: + failover: + runs-on: ubuntu-latest + services: + mock-server: + image: livekit/test-server:latest + ports: + - 9999:9999 + - 10000:10000 + - 10001:10001 + - 10002:10002 + steps: + - uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6 + + - name: Set up Opus + run: | + sudo apt-get update + sudo apt-get install -y libsoxr-dev libopus-dev libopusfile-dev + + - name: Set up Go + uses: actions/setup-go@4a3601121dd01d1626a1e23e37211e3254c1c06c # v6 + with: + go-version: 1.26.0 + + - name: Wait for mock server + run: | + for i in $(seq 1 30); do + curl -sf http://127.0.0.1:9999/settings/regions >/dev/null && exit 0 + sleep 1 + done + echo "mock server did not become ready" && exit 1 + + - name: Run API tests + run: go test -v -count=1 -run '^TestAPI_' . diff --git a/agent_client.go b/agent_client.go index 10fc9de1..076d663a 100644 --- a/agent_client.go +++ b/agent_client.go @@ -29,7 +29,7 @@ func NewAgentClient(url string, apiKey string, apiSecret string, opts ...AgentCl } c := &AgentClient{ authBase: authBase{apiKey, apiSecret}, - httpClient: &http.Client{}, + httpClient: newAPIHTTPClient(), } for _, opt := range opts { opt(c) diff --git a/agent_dispatch_client.go b/agent_dispatch_client.go index 71a724bd..f84b44d6 100644 --- a/agent_dispatch_client.go +++ b/agent_dispatch_client.go @@ -16,7 +16,6 @@ package lksdk import ( "context" - "net/http" "github.com/twitchtv/twirp" @@ -31,7 +30,7 @@ type AgentDispatchClient struct { func NewAgentDispatchServiceClient(url string, apiKey string, secretKey string, opts ...twirp.ClientOption) *AgentDispatchClient { url = signalling.ToHttpURL(url) - client := livekit.NewAgentDispatchServiceProtobufClient(url, &http.Client{}, opts...) + client := livekit.NewAgentDispatchServiceProtobufClient(url, newAPIHTTPClient(), opts...) return &AgentDispatchClient{ agentDispatchService: client, diff --git a/agent_simulation_client.go b/agent_simulation_client.go index 52552858..b2851468 100644 --- a/agent_simulation_client.go +++ b/agent_simulation_client.go @@ -16,7 +16,6 @@ package lksdk import ( "context" - "net/http" "github.com/twitchtv/twirp" @@ -29,7 +28,7 @@ type AgentSimulationClient struct { } func NewAgentSimulationClient(url string, apiKey string, apiSecret string, opts ...twirp.ClientOption) *AgentSimulationClient { - client := livekit.NewAgentSimulationProtobufClient(url, &http.Client{}, opts...) + client := livekit.NewAgentSimulationProtobufClient(url, newAPIHTTPClient(), opts...) return &AgentSimulationClient{ simulationClient: client, authBase: authBase{apiKey, apiSecret}, diff --git a/connectorclient.go b/connectorclient.go index a67f3c68..c025627c 100644 --- a/connectorclient.go +++ b/connectorclient.go @@ -16,7 +16,6 @@ package lksdk import ( "context" - "net/http" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/utils/xtwirp" @@ -32,7 +31,7 @@ type ConnectorClient struct { func NewConnectorClient(url string, apiKey string, secretKey string, opts ...twirp.ClientOption) *ConnectorClient { opts = append(opts, xtwirp.DefaultClientOptions()...) url = signalling.ToHttpURL(url) - client := livekit.NewConnectorProtobufClient(url, &http.Client{}, opts...) + client := livekit.NewConnectorProtobufClient(url, newAPIHTTPClient(), opts...) return &ConnectorClient{ connector: client, authBase: authBase{ diff --git a/egressclient.go b/egressclient.go index 60d37f94..42173e61 100644 --- a/egressclient.go +++ b/egressclient.go @@ -16,7 +16,6 @@ package lksdk import ( "context" - "net/http" "github.com/twitchtv/twirp" @@ -33,7 +32,7 @@ type EgressClient struct { func NewEgressClient(url string, apiKey string, secretKey string, opts ...twirp.ClientOption) *EgressClient { opts = append(opts, xtwirp.DefaultClientOptions()...) url = signalling.ToHttpURL(url) - client := livekit.NewEgressProtobufClient(url, &http.Client{}, opts...) + client := livekit.NewEgressProtobufClient(url, newAPIHTTPClient(), opts...) return &EgressClient{ egressClient: client, authBase: authBase{ diff --git a/failover.go b/failover.go new file mode 100644 index 00000000..bf08ee8f --- /dev/null +++ b/failover.go @@ -0,0 +1,247 @@ +// Copyright 2026 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lksdk + +import ( + "bytes" + "context" + "io" + "net/http" + "net/url" + "strings" + "time" + + "github.com/livekit/protocol/livekit" + + "github.com/livekit/server-sdk-go/v2/signalling" +) + +// Total attempts (the original request plus fallback regions) and the base +// retry backoff are fixed, not user-configurable, so retries can't be tuned to +// values that could overwhelm the server. +const ( + failoverMaxAttempts = 3 + failoverBackoffBase = 200 * time.Millisecond +) + +// failoverConfig is the resolved per-request region-failover configuration. The +// public API exposes only the enabled toggle (default true); force and +// backoffBase are internal test-only knobs. +type failoverConfig struct { + enabled bool + // force bypasses the cloud-host check. Internal testing only. + force bool + // backoffBase overrides the retry backoff base. Internal testing only. + backoffBase time.Duration +} + +// attempts returns the total request attempts for a host; 1 means no failover. +// Failover only engages when enabled and the host is a LiveKit Cloud domain. +// force bypasses the cloud-host check and is for internal testing only. +func (c failoverConfig) attempts(hostname string) int { + if c.enabled && (c.force || isCloud(hostname)) { + return failoverMaxAttempts + } + return 1 +} + +type failoverEnabledKey struct{} +type failoverForceKey struct{} + +// WithFailover returns a context that enables or disables region failover for +// API requests made with it (enabled by default). Failover only engages for +// LiveKit Cloud hosts. Pass the returned context to any service client method. +func WithFailover(ctx context.Context, enabled bool) context.Context { + return context.WithValue(ctx, failoverEnabledKey{}, enabled) +} + +// withFailoverForce returns a context that forces failover on regardless of +// host and overrides the retry backoff. Internal, test-only. +func withFailoverForce(ctx context.Context, backoff time.Duration) context.Context { + return context.WithValue(ctx, failoverForceKey{}, backoff) +} + +func failoverConfigFromContext(ctx context.Context) failoverConfig { + cfg := failoverConfig{enabled: true, backoffBase: failoverBackoffBase} + if enabled, ok := ctx.Value(failoverEnabledKey{}).(bool); ok { + cfg.enabled = enabled + } + if backoff, ok := ctx.Value(failoverForceKey{}).(time.Duration); ok { + cfg.force = true + cfg.backoffBase = backoff + } + return cfg +} + +// newAPIHTTPClient returns the *http.Client used by every API service client. +// It wraps the default transport with region-failover retries. +func newAPIHTTPClient() *http.Client { + return &http.Client{ + Transport: &failoverTransport{base: http.DefaultTransport, regions: sharedAPIRegions}, + } +} + +// failoverTransport orchestrates region failover for a single API request. On +// a retryable failure (any transport error or HTTP 5xx) it discovers +// alternative regions via /settings/regions and replays the request — body and +// headers intact — against the next region, with exponential backoff. 4xx +// responses are returned immediately. +type failoverTransport struct { + base http.RoundTripper + regions *regionCache +} + +func (t *failoverTransport) RoundTrip(req *http.Request) (*http.Response, error) { + cfg := failoverConfigFromContext(req.Context()) + maxAttempts := cfg.attempts(req.URL.Hostname()) + + // Buffer the body once so it can be replayed against each region. + var body []byte + if req.Body != nil { + b, err := io.ReadAll(req.Body) + _ = req.Body.Close() + if err != nil { + return nil, err + } + body = b + } + + originalScheme, originalHost := req.URL.Scheme, req.URL.Host + attempted := map[string]struct{}{strings.ToLower(originalHost): {}} + + var ( + settings *livekit.RegionSettings + fetchedRegions bool + resp *http.Response + err error + ) + scheme, host := originalScheme, originalHost + + for attempt := 0; attempt < maxAttempts; attempt++ { + if ctxErr := req.Context().Err(); ctxErr != nil { + return nil, ctxErr + } + + r := req.Clone(req.Context()) + r.URL.Scheme = scheme + r.URL.Host = host + r.Host = host + if body != nil { + buf := body + r.Body = io.NopCloser(bytes.NewReader(buf)) + r.ContentLength = int64(len(buf)) + r.GetBody = func() (io.ReadCloser, error) { return io.NopCloser(bytes.NewReader(buf)), nil } + } + + resp, err = t.base.RoundTrip(r) + if !isRetryable(resp, err) { + return resp, err + } + if attempt == maxAttempts-1 { + break // out of attempts; surface the last result + } + + // discover regions lazily; honor the request scheme (so it works against + // an http mock) and forward the caller's headers to the discovery fetch. + if !fetchedRegions { + discoveryURL := url.URL{Scheme: originalScheme, Host: originalHost, Path: "/settings/regions"} + settings, _ = t.regions.get(originalHost, discoveryURL.String(), req.Header, 0) + fetchedRegions = true + } + nextScheme, nextHost, ok := nextRegion(settings, attempted) + if !ok { + break // no untried region left + } + + status := 0 + if resp != nil { + status = resp.StatusCode + } + logger.Warnw("livekit API request failed, retrying with fallback url", err, + "failedUrl", scheme+"://"+host, "fallbackUrl", nextScheme+"://"+nextHost, + "attempt", attempt+1, "maxAttempts", maxAttempts, "status", status) + + drainResponse(resp) + if !sleepCtx(req.Context(), cfg.backoffBase<= 500 +} + +// nextRegion returns the first region whose host has not yet been attempted. +func nextRegion(settings *livekit.RegionSettings, attempted map[string]struct{}) (scheme, host string, ok bool) { + if settings == nil { + return "", "", false + } + for _, region := range settings.Regions { + u, err := url.Parse(signalling.ToHttpURL(region.Url)) + if err != nil || u.Host == "" { + continue + } + if _, seen := attempted[strings.ToLower(u.Host)]; seen { + continue + } + return u.Scheme, u.Host, true + } + return "", "", false +} + +func drainResponse(resp *http.Response) { + if resp != nil && resp.Body != nil { + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + } +} + +func sleepCtx(ctx context.Context, d time.Duration) bool { + if d <= 0 { + return ctx.Err() == nil + } + timer := time.NewTimer(d) + defer timer.Stop() + select { + case <-ctx.Done(): + return false + case <-timer.C: + return true + } +} + +// sharedAPIRegions is the process-wide region cache used by the API failover +// path. The RTC signaling path has its own regionURLProvider over a separate +// cache, but both share the same regionCache discovery/fetch/TTL logic. +var sharedAPIRegions = newRegionCache() + +// TwirpRegionError indicates the /settings/regions endpoint returned a non-200 +// status while attempting region failover. +type TwirpRegionError struct { + StatusCode int +} + +func (e *TwirpRegionError) Error() string { + return "failed to fetch region settings: status " + http.StatusText(e.StatusCode) +} diff --git a/failover_apitest_test.go b/failover_apitest_test.go new file mode 100644 index 00000000..4752e5f3 --- /dev/null +++ b/failover_apitest_test.go @@ -0,0 +1,149 @@ +// Copyright 2026 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// API failover tests that exercise the public server SDK against the shared +// mock LiveKit API server (livekit/livekit cmd/test-server). Point them at a +// running instance with LK_TEST_SERVER_URL (default http://127.0.0.1:9999); +// they skip when no server is reachable. In CI the server is booted as a Docker +// container. +// +// See cmd/test-server/README.md for the X-Lk-Mock-* control protocol. Mock +// directives are passed to the SDK via twirp request headers, which the +// failover transport forwards to the discovery fetch and every retry. These +// tests are in-package so they can use the internal test-force hook (the public +// API only exposes WithFailover(ctx, bool), which is cloud-gated). +package lksdk + +import ( + "context" + "net/http" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/twitchtv/twirp" + + "github.com/livekit/protocol/livekit" +) + +const ( + hdrFailRegions = "X-Lk-Mock-Fail-Regions" + hdrFailMode = "X-Lk-Mock-Fail-Mode" + hdrFailStatus = "X-Lk-Mock-Fail-Status" + hdrRegionsStat = "X-Lk-Mock-Regions-Status" +) + +func testServerURL(t *testing.T) string { + url := os.Getenv("LK_TEST_SERVER_URL") + if url == "" { + url = "http://127.0.0.1:9999" + } + resp, err := http.Get(url + "/settings/regions") + if err != nil { + t.Skipf("mock test server not reachable at %s (set LK_TEST_SERVER_URL): %v", url, err) + } + _ = resp.Body.Close() + return url +} + +// failoverCtx returns a context that forces failover on (the mock is not a +// cloud host) with a tiny backoff, carrying the given X-Lk-Mock-* directives as +// twirp request headers. force/backoff are internal, test-only knobs. +func failoverCtx(t *testing.T, directives map[string]string) context.Context { + ctx := withFailoverForce(context.Background(), time.Millisecond) + if len(directives) > 0 { + h := make(http.Header) + for k, v := range directives { + h.Set(k, v) + } + var err error + ctx, err = twirp.WithHTTPRequestHeaders(ctx, h) + require.NoError(t, err) + } + return ctx +} + +func TestAPI_Healthy(t *testing.T) { + client := NewRoomServiceClient(testServerURL(t), "devkey", "secret") + room, err := client.CreateRoom(failoverCtx(t, nil), &livekit.CreateRoomRequest{Name: "api-test"}) + require.NoError(t, err) + require.Equal(t, "api-test", room.Name, "the mock echoes the request name") + require.NotEmpty(t, room.Sid) +} + +func TestAPI_PrimaryUnavailable(t *testing.T) { + client := NewRoomServiceClient(testServerURL(t), "devkey", "secret") + ctx := failoverCtx(t, map[string]string{hdrFailRegions: "0"}) + _, err := client.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: "api-test"}) + require.NoError(t, err, "should fail over to a healthy region") +} + +func TestAPI_TwoRegionsUnavailable(t *testing.T) { + client := NewRoomServiceClient(testServerURL(t), "devkey", "secret") + ctx := failoverCtx(t, map[string]string{hdrFailRegions: "0,1"}) + _, err := client.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: "api-test"}) + require.NoError(t, err, "should fail over to the third region") +} + +func TestAPI_AllUnavailable(t *testing.T) { + client := NewRoomServiceClient(testServerURL(t), "devkey", "secret") + ctx := failoverCtx(t, map[string]string{hdrFailRegions: "0,1,2,3"}) + _, err := client.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: "api-test"}) + require.Error(t, err) +} + +func TestAPI_ClientErrorNotRetried(t *testing.T) { + client := NewRoomServiceClient(testServerURL(t), "devkey", "secret") + ctx := failoverCtx(t, map[string]string{hdrFailRegions: "0", hdrFailStatus: "400"}) + _, err := client.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: "api-test"}) + require.Error(t, err) + var terr twirp.Error + require.ErrorAs(t, err, &terr) + require.Equal(t, twirp.InvalidArgument, terr.Code(), "a 4xx must surface as a typed error, not fail over") +} + +func TestAPI_TransportError(t *testing.T) { + client := NewRoomServiceClient(testServerURL(t), "devkey", "secret") + ctx := failoverCtx(t, map[string]string{hdrFailRegions: "0", hdrFailMode: "drop"}) + _, err := client.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: "api-test"}) + require.NoError(t, err, "a dropped connection should fail over to a healthy region") +} + +func TestAPI_RegionDiscoveryUnreachable(t *testing.T) { + client := NewRoomServiceClient(testServerURL(t), "devkey", "secret") + ctx := failoverCtx(t, map[string]string{hdrFailRegions: "0", hdrRegionsStat: "500"}) + _, err := client.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: "api-test"}) + require.Error(t, err, "no fallback hosts means the original error is surfaced") +} + +func TestAPI_FailoverNotCloudHost(t *testing.T) { + client := NewRoomServiceClient(testServerURL(t), "devkey", "secret") + // Enabled (the default) but not forced; 127.0.0.1 is not a cloud host, so + // failover must not engage. + h := make(http.Header) + h.Set(hdrFailRegions, "0") + ctx, err := twirp.WithHTTPRequestHeaders(context.Background(), h) + require.NoError(t, err) + _, err = client.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: "api-test"}) + require.Error(t, err) +} + +func TestAPI_FailoverDisabled(t *testing.T) { + client := NewRoomServiceClient(testServerURL(t), "devkey", "secret") + // Forced on, but explicitly disabled via WithFailover. + ctx := WithFailover(failoverCtx(t, map[string]string{hdrFailRegions: "0"}), false) + _, err := client.CreateRoom(ctx, &livekit.CreateRoomRequest{Name: "api-test"}) + require.Error(t, err) +} diff --git a/failover_internal_test.go b/failover_internal_test.go new file mode 100644 index 00000000..9ebc4136 --- /dev/null +++ b/failover_internal_test.go @@ -0,0 +1,42 @@ +// Copyright 2026 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lksdk + +import "testing" + +func TestFailoverAttempts(t *testing.T) { + cases := []struct { + cfg failoverConfig + host string + want int + }{ + // Enabled (the default): only *.livekit.cloud project domains fail over. + {failoverConfig{enabled: true}, "myproject.livekit.cloud", failoverMaxAttempts}, + {failoverConfig{enabled: true}, "myproject.region.livekit.cloud", failoverMaxAttempts}, + {failoverConfig{enabled: true}, "myproject.livekit.io", 1}, + {failoverConfig{enabled: true}, "example.com", 1}, + {failoverConfig{enabled: true}, "127.0.0.1", 1}, + {failoverConfig{enabled: true}, "notlivekit.cloud", 1}, + // force bypasses the cloud-host check; disabled never fails over. + {failoverConfig{enabled: true, force: true}, "127.0.0.1", failoverMaxAttempts}, + {failoverConfig{enabled: false, force: true}, "myproject.livekit.cloud", 1}, + {failoverConfig{enabled: false}, "myproject.livekit.cloud", 1}, + } + for _, c := range cases { + if got := c.cfg.attempts(c.host); got != c.want { + t.Errorf("attempts(cfg=%+v, host=%q) = %v, want %v", c.cfg, c.host, got, c.want) + } + } +} diff --git a/ingressclient.go b/ingressclient.go index da3e4a0e..3e000846 100644 --- a/ingressclient.go +++ b/ingressclient.go @@ -16,7 +16,6 @@ package lksdk import ( "context" - "net/http" "github.com/twitchtv/twirp" @@ -33,7 +32,7 @@ type IngressClient struct { func NewIngressClient(url string, apiKey string, secretKey string, opts ...twirp.ClientOption) *IngressClient { opts = append(opts, xtwirp.DefaultClientOptions()...) url = signalling.ToHttpURL(url) - client := livekit.NewIngressProtobufClient(url, &http.Client{}, opts...) + client := livekit.NewIngressProtobufClient(url, newAPIHTTPClient(), opts...) return &IngressClient{ ingressClient: client, authBase: authBase{ diff --git a/phonenumberclient.go b/phonenumberclient.go index ba9c0824..4f2b332b 100644 --- a/phonenumberclient.go +++ b/phonenumberclient.go @@ -16,7 +16,6 @@ package lksdk import ( "context" - "net/http" "time" "github.com/twitchtv/twirp" @@ -35,7 +34,7 @@ type PhoneNumberClient struct { func NewPhoneNumberClient(url string, apiKey string, secretKey string, opts ...twirp.ClientOption) *PhoneNumberClient { opts = append(opts, xtwirp.DefaultClientOptions()...) return &PhoneNumberClient{ - phoneNumberClient: livekit.NewPhoneNumberServiceProtobufClient(signalling.ToHttpURL(url), &http.Client{}, opts...), + phoneNumberClient: livekit.NewPhoneNumberServiceProtobufClient(signalling.ToHttpURL(url), newAPIHTTPClient(), opts...), authBase: authBase{ apiKey: apiKey, apiSecret: secretKey, diff --git a/regionurlprovider.go b/regionurlprovider.go index dbdeeae9..08d0ba7f 100644 --- a/regionurlprovider.go +++ b/regionurlprovider.go @@ -18,6 +18,9 @@ import ( const ( regionHostnameProviderSettingsCacheTime = 3 * time.Second + // regionDiscoveryTimeout bounds a single /settings/regions fetch so a slow + // or unreachable endpoint doesn't stall the failover path. + regionDiscoveryTimeout = 2 * time.Second ) // regionSettingsURL builds the LiveKit Cloud region-discovery URL for a hostname. @@ -25,133 +28,159 @@ var regionSettingsURL = func(cloudHostname string) string { return "https://" + cloudHostname + "/settings/regions" } +// regionURLProvider supplies LiveKit Cloud region lists to the RTC signaling +// reconnect path. It is a thin token/https adapter over the shared regionCache, +// adding a default cache TTL and support for server-pushed region lists. type regionURLProvider struct { - hostnameSettingsCache map[string]*hostnameSettingsCacheItem // hostname -> regionSettings - - mutex sync.RWMutex - httpClient *http.Client -} - -type hostnameSettingsCacheItem struct { - regionSettings *livekit.RegionSettings - updatedAt time.Time - cacheTTL time.Duration // from Cache-Control max-age; falls back to the default + cache *regionCache } func newRegionURLProvider() *regionURLProvider { - return ®ionURLProvider{ - hostnameSettingsCache: make(map[string]*hostnameSettingsCacheItem), - httpClient: &http.Client{ - Timeout: 5 * time.Second, - }, - } + return ®ionURLProvider{cache: newRegionCache()} } func (r *regionURLProvider) RefreshRegionSettings(cloudHostname, token string) error { - r.mutex.RLock() - hostnameSettings := r.hostnameSettingsCache[cloudHostname] - r.mutex.RUnlock() - - if hostnameSettings != nil { - ttl := hostnameSettings.cacheTTL - if ttl <= 0 { - ttl = regionHostnameProviderSettingsCacheTime - } - if time.Since(hostnameSettings.updatedAt) < ttl { - return nil - } - } + _, err := r.RegionSettings(cloudHostname, token) + return err +} - settingsURL := regionSettingsURL(cloudHostname) - req, err := http.NewRequest("GET", settingsURL, nil) +// RegionSettings returns the cached region list for a hostname, refreshing it if +// stale. The returned list is owned by the cache and must not be mutated; the +// caller is responsible for tracking its own per-failover attempt state. +func (r *regionURLProvider) RegionSettings(cloudHostname, token string) (*livekit.RegionSettings, error) { + headers := http.Header{"Authorization": []string{"Bearer " + token}} + settings, err := r.cache.get(cloudHostname, regionSettingsURL(cloudHostname), headers, regionHostnameProviderSettingsCacheTime) if err != nil { - return errors.New("refreshRegionSettings failed to create request: " + err.Error()) + return nil, err } - req.Header = http.Header{ - "Authorization": []string{"Bearer " + token}, + if settings == nil { + return nil, errors.New("no regions available") } + if len(settings.Regions) == 0 { + logger.Warnw("no regions returned", nil, "cloudHostname", cloudHostname) + } + return settings, nil +} - resp, err := r.httpClient.Do(req) - if err != nil { - return err +// SetServerReportedRegions stores a region list pushed by the server (e.g. on a +// reconnect LeaveRequest), overriding the cached /settings/regions list and +// resetting the cache TTL so the next failover uses it without re-fetching. +func (r *regionURLProvider) SetServerReportedRegions(cloudHostname string, regions *livekit.RegionSettings) { + r.cache.set(cloudHostname, regions, regionHostnameProviderSettingsCacheTime) +} + +// regionCache fetches and caches the LiveKit Cloud region list per host. It is +// shared by the API failover path (which honors the request scheme and forwards +// the caller's headers) and the RTC signaling path (which fetches over https +// with a bearer token). The caller supplies the discovery URL so each path +// keeps its own URL scheme and test seams. +type regionCache struct { + mu sync.Mutex + client *http.Client + cache map[string]*regionCacheEntry +} + +type regionCacheEntry struct { + settings *livekit.RegionSettings + fetchedAt time.Time + ttl time.Duration +} + +func newRegionCache() *regionCache { + return ®ionCache{ + client: &http.Client{Timeout: regionDiscoveryTimeout}, + cache: make(map[string]*regionCacheEntry), } - defer resp.Body.Close() +} - if resp.StatusCode != http.StatusOK { - return errors.New("refreshRegionSettings failed to fetch region settings. http status: " + resp.Status) +// get returns the cached region list for key, fetching discoveryURL if the +// cache is stale. The server's Cache-Control max-age sets the TTL; when absent, +// defaultTTL is used (0 means "do not cache"). Best-effort: on a fetch failure +// it serves a stale cached list when available, otherwise returns the error. +func (c *regionCache) get(key, discoveryURL string, headers http.Header, defaultTTL time.Duration) (*livekit.RegionSettings, error) { + key = strings.ToLower(key) + + c.mu.Lock() + if entry := c.cache[key]; entry != nil && time.Since(entry.fetchedAt) < entry.ttl { + defer c.mu.Unlock() + return entry.settings, nil } + c.mu.Unlock() - respBody, err := io.ReadAll(resp.Body) + settings, ttl, err := c.fetch(discoveryURL, headers) if err != nil { - return errors.New("refreshRegionSettings failed to read response body: " + err.Error()) - } - regions := &livekit.RegionSettings{} - if err := protojson.Unmarshal(respBody, regions); err != nil { - return errors.New("refreshRegionSettings failed to decode region settings: " + err.Error()) + c.mu.Lock() + defer c.mu.Unlock() + if entry := c.cache[key]; entry != nil { + return entry.settings, nil // serve stale on failure + } + return nil, err } - ttl := regionHostnameProviderSettingsCacheTime - if maxAge := parseRegionSettingsMaxAge(resp.Header.Get("Cache-Control")); maxAge > 0 { - ttl = maxAge + if ttl <= 0 { + ttl = defaultTTL } - - r.mutex.Lock() - item := &hostnameSettingsCacheItem{ - regionSettings: regions, - updatedAt: time.Now(), - cacheTTL: ttl, + if ttl > 0 { + c.mu.Lock() + c.cache[key] = ®ionCacheEntry{settings: settings, fetchedAt: time.Now(), ttl: ttl} + c.mu.Unlock() } - r.hostnameSettingsCache[cloudHostname] = item - r.mutex.Unlock() + return settings, nil +} - if len(item.regionSettings.Regions) == 0 { - logger.Warnw("no regions returned", nil, "cloudHostname", cloudHostname) +// set stores a region list pushed out-of-band (e.g. by the server on reconnect), +// overriding any cached list and keeping the existing TTL, or defaultTTL. +func (c *regionCache) set(key string, settings *livekit.RegionSettings, defaultTTL time.Duration) { + if settings == nil { + return } - - return nil + key = strings.ToLower(key) + c.mu.Lock() + defer c.mu.Unlock() + ttl := defaultTTL + if existing := c.cache[key]; existing != nil && existing.ttl > 0 { + ttl = existing.ttl + } + c.cache[key] = ®ionCacheEntry{settings: settings, fetchedAt: time.Now(), ttl: ttl} } -// RegionSettings returns the cached region list for a hostname, refreshing it if -// stale. The returned list is owned by the cache and must not be mutated; the -// caller is responsible for tracking its own per-failover attempt state. -func (r *regionURLProvider) RegionSettings(cloudHostname, token string) (*livekit.RegionSettings, error) { - if err := r.RefreshRegionSettings(cloudHostname, token); err != nil { - r.mutex.RLock() - cached := r.hostnameSettingsCache[cloudHostname] - r.mutex.RUnlock() - if cached == nil { - return nil, err +func (c *regionCache) fetch(discoveryURL string, headers http.Header) (*livekit.RegionSettings, time.Duration, error) { + req, err := http.NewRequest(http.MethodGet, discoveryURL, nil) + if err != nil { + return nil, 0, err + } + // Forward the caller's headers (Authorization and any custom headers), + // minus body-specific ones, so a validly-signed token reaches the discovery + // endpoint and test directives propagate. + for k, vv := range headers { + switch http.CanonicalHeaderKey(k) { + case "Content-Type", "Content-Length": + continue + } + for _, v := range vv { + req.Header.Add(k, v) } - // a cached list exists; fall through and use it } - r.mutex.RLock() - defer r.mutex.RUnlock() - item := r.hostnameSettingsCache[cloudHostname] - if item == nil { - return nil, errors.New("no regions available") + resp, err := c.client.Do(req) + if err != nil { + return nil, 0, err } - return item.regionSettings, nil -} + defer drainResponse(resp) -// SetServerReportedRegions stores a region list pushed by the server (e.g. on a -// reconnect LeaveRequest), overriding the cached /settings/regions list and -// resetting the cache TTL so the next failover uses it without re-fetching. -func (r *regionURLProvider) SetServerReportedRegions(cloudHostname string, regions *livekit.RegionSettings) { - if regions == nil { - return + if resp.StatusCode != http.StatusOK { + return nil, 0, &TwirpRegionError{StatusCode: resp.StatusCode} } - r.mutex.Lock() - defer r.mutex.Unlock() - ttl := regionHostnameProviderSettingsCacheTime - if existing := r.hostnameSettingsCache[cloudHostname]; existing != nil && existing.cacheTTL > 0 { - ttl = existing.cacheTTL + b, err := io.ReadAll(resp.Body) + if err != nil { + return nil, 0, err } - r.hostnameSettingsCache[cloudHostname] = &hostnameSettingsCacheItem{ - regionSettings: regions, - updatedAt: time.Now(), - cacheTTL: ttl, + settings := &livekit.RegionSettings{} + if err := protojson.Unmarshal(b, settings); err != nil { + return nil, 0, err } + ttl := parseRegionSettingsMaxAge(resp.Header.Get("Cache-Control")) + return settings, ttl, nil } // parseRegionSettingsMaxAge extracts the max-age (seconds) from a Cache-Control @@ -182,7 +211,8 @@ func parseCloudURL(serverURL string) (string, error) { return parsedURL.Hostname(), nil } -// isCloud reports whether the hostname belongs to LiveKit Cloud. +// isCloud reports whether the hostname belongs to a LiveKit Cloud project +// (a *.livekit.cloud subdomain). var isCloud = func(hostname string) bool { - return strings.HasSuffix(hostname, "livekit.cloud") || strings.HasSuffix(hostname, "livekit.io") + return strings.HasSuffix(hostname, ".livekit.cloud") } diff --git a/regionurlprovider_test.go b/regionurlprovider_test.go index 97a0290b..64e18e73 100644 --- a/regionurlprovider_test.go +++ b/regionurlprovider_test.go @@ -80,7 +80,7 @@ func (rs *regionSettingsServer) setRegions(regions *livekit.RegionSettings) { func newTestProvider(rs *regionSettingsServer) *regionURLProvider { p := newRegionURLProvider() // trust the httptest TLS certificate - p.httpClient = rs.server.Client() + p.cache.client = rs.server.Client() return p } @@ -139,9 +139,9 @@ func TestRegionURLProvider_RefreshRepopulates(t *testing.T) { require.Equal(t, "wss://a.example.com", settings.GetRegions()[0].Url) // expire the cache and serve a different region list - p.mutex.Lock() - p.hostnameSettingsCache[rs.hostname].updatedAt = time.Now().Add(-2 * regionHostnameProviderSettingsCacheTime) - p.mutex.Unlock() + p.cache.mu.Lock() + p.cache.cache[strings.ToLower(rs.hostname)].fetchedAt = time.Now().Add(-2 * regionHostnameProviderSettingsCacheTime) + p.cache.mu.Unlock() rs.setRegions(&livekit.RegionSettings{ Regions: []*livekit.RegionInfo{{Region: "b", Url: "wss://b.example.com"}}, }) diff --git a/roomclient.go b/roomclient.go index 86e89844..adfad779 100644 --- a/roomclient.go +++ b/roomclient.go @@ -16,7 +16,6 @@ package lksdk import ( "context" - "net/http" "github.com/google/uuid" "github.com/twitchtv/twirp" @@ -35,7 +34,7 @@ type RoomServiceClient struct { func NewRoomServiceClient(url string, apiKey string, secretKey string, opts ...twirp.ClientOption) *RoomServiceClient { opts = append(opts, xtwirp.DefaultClientOptions()...) url = signalling.ToHttpURL(url) - client := livekit.NewRoomServiceProtobufClient(url, &http.Client{}, opts...) + client := livekit.NewRoomServiceProtobufClient(url, newAPIHTTPClient(), opts...) return &RoomServiceClient{ roomService: client, authBase: authBase{ diff --git a/sipclient.go b/sipclient.go index 1fa0faed..acded63f 100644 --- a/sipclient.go +++ b/sipclient.go @@ -16,7 +16,6 @@ package lksdk import ( "context" - "net/http" "time" "github.com/twitchtv/twirp" @@ -38,7 +37,7 @@ type SIPClient struct { func NewSIPClient(url string, apiKey string, secretKey string, opts ...twirp.ClientOption) *SIPClient { opts = append(opts, xtwirp.DefaultClientOptions()...) return &SIPClient{ - sipClient: livekit.NewSIPProtobufClient(signalling.ToHttpURL(url), &http.Client{}, opts...), + sipClient: livekit.NewSIPProtobufClient(signalling.ToHttpURL(url), newAPIHTTPClient(), opts...), authBase: authBase{ apiKey: apiKey, apiSecret: secretKey,