diff --git a/CHANGELOG.md b/CHANGELOG.md index 575c86f68d..f526d35a03 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ All notable changes to this project will be documented in this file. From versio - Shutdown should wait for in flight requests by @mkleczek in #4702 - Remove automatic transaction retries on `40001 (serialization_failure)` errors to prevent replication lag by @laurenceisla in #3673 - Fix unexpected results when embedding and filtering the same table more than once by @laurenceisla in #4075 +- Stop reporting 503 errors unnecessarily while the schema cache is loading at startup by @mkleczek in #4880 ### Changed diff --git a/src/PostgREST/Admin.hs b/src/PostgREST/Admin.hs index 99733a6995..f8501417be 100644 --- a/src/PostgREST/Admin.hs +++ b/src/PostgREST/Admin.hs @@ -22,20 +22,20 @@ import qualified PostgREST.AppState as AppState import qualified Network.Socket as NS import Protolude -runAdmin :: AppState -> Maybe NS.Socket -> NS.Socket -> Warp.Settings -> IO () -runAdmin appState maybeAdminSocket socketREST settings = do +runAdmin :: AppState -> Maybe NS.Socket -> IO (Maybe NS.Socket) -> Warp.Settings -> IO () +runAdmin appState maybeAdminSocket getSocketREST settings = do whenJust maybeAdminSocket $ \adminSocket -> do address <- resolveSocketToAddress adminSocket observer $ AdminStartObs address void . forkIO $ Warp.runSettingsSocket settings adminSocket adminApp where - adminApp = admin appState socketREST + adminApp = admin appState getSocketREST observer = AppState.getObserver appState -- | PostgREST admin application -admin :: AppState.AppState -> NS.Socket -> Wai.Application -admin appState socketREST req respond = do - isMainAppReachable <- isRight <$> reachMainApp socketREST +admin :: AppState.AppState -> IO (Maybe NS.Socket) -> Wai.Application +admin appState getSocketREST req respond = do + isMainAppReachable <- getSocketREST >>= maybe (pure False) (fmap isRight . 
reachMainApp) isLoaded <- AppState.isLoaded appState isPending <- AppState.isPending appState @@ -44,8 +44,8 @@ admin appState socketREST req respond = do respond $ Wai.responseLBS (if isMainAppReachable then HTTP.status200 else HTTP.status500) [] mempty ["ready"] -> let - status | not isMainAppReachable = HTTP.status500 - | isPending = HTTP.status503 + status | isPending = HTTP.status503 + | not isMainAppReachable = HTTP.status500 | isLoaded = HTTP.status200 | otherwise = HTTP.status500 in diff --git a/src/PostgREST/App.hs b/src/PostgREST/App.hs index 25f7f03a23..aa73354781 100644 --- a/src/PostgREST/App.hs +++ b/src/PostgREST/App.hs @@ -25,6 +25,8 @@ import System.IO.Error (ioeGetErrorType) import Control.Monad.Except (liftEither) import Control.Monad.Extra (whenJust) import Data.Either.Combinators (mapLeft, whenLeft) +import Data.IORef (atomicWriteIORef, newIORef, + readIORef) import Data.String (IsString (..)) import Network.Wai.Handler.Warp (defaultSettings, setHost, setOnException, setPort, @@ -63,11 +65,10 @@ import PostgREST.Version (docsVersion, prettyVersion) import qualified Data.ByteString.Char8 as BS import qualified Data.List as L -import Data.Streaming.Network (bindPortTCP, - bindRandomPortTCP) +import Data.Streaming.Network (bindPortTCP) import qualified Data.Text as T import qualified Network.HTTP.Types as HTTP -import qualified Network.HTTP.Types.Header as HTTP (hVary) +import qualified Network.HTTP.Types.Header as HTTP import qualified Network.Socket as NS import PostgREST.Unix (createAndBindDomainSocket) import Protolude hiding (Handler) @@ -76,22 +77,30 @@ run :: AppState -> IO () run appState = do conf <- AppState.getConfig appState - AppState.schemaCacheLoader appState -- Loads the initial SchemaCache - (mainSocket, adminSocket) <- initSockets conf + mainSocketRef <- newIORef Nothing + adminSocket <- initAdminServerSocket conf + let closeSockets = do whenJust adminSocket NS.close - NS.close mainSocket + readIORef mainSocketRef >>= foldMap 
NS.close Unix.installSignalHandlers observer closeSockets (AppState.schemaCacheLoader appState) (AppState.readInDbConfig False appState) + Admin.runAdmin appState adminSocket (readIORef mainSocketRef) (serverSettings conf) + Listener.runListener appState - Admin.runAdmin appState adminSocket mainSocket (serverSettings conf) + -- Kick off and wait for the initial SchemaCache load before creating the + -- main API socket. + AppState.schemaCacheLoader appState + AppState.waitForSchemaCacheInit appState + + mainSocket <- initServerSocket conf + atomicWriteIORef mainSocketRef $ Just mainSocket let app = postgrest appState (AppState.schemaCacheLoader appState) - do - address <- resolveSocketToAddress mainSocket - observer $ AppServerAddressObs address + address <- resolveSocketToAddress mainSocket + observer $ AppServerAddressObs address Warp.runSettingsSocket (serverSettings conf & setOnException onWarpException) mainSocket app where @@ -258,38 +267,16 @@ addRetryHint delay response = do isServiceUnavailable :: Wai.Response -> Bool isServiceUnavailable response = Wai.responseStatus response == HTTP.status503 -type AppSockets = (NS.Socket, Maybe NS.Socket) - -initSockets :: AppConfig -> IO AppSockets -initSockets AppConfig{..} = do - let - cfg'usp = configServerUnixSocket - cfg'uspm = configServerUnixSocketMode - cfg'host = configServerHost - cfg'port = configServerPort - cfg'adminHost = configAdminServerHost - cfg'adminPort = configAdminServerPort - - sock <- case cfg'usp of - -- I'm not using `streaming-commons`' bindPath function here because it's not defined for Windows, - -- but we need to have runtime error if we try to use it in Windows, not compile time error - Just path -> createAndBindDomainSocket path cfg'uspm - Nothing -> do - (_, sock) <- - if cfg'port /= 0 - then do - sock <- bindPortTCP cfg'port (fromString $ T.unpack cfg'host) - pure (cfg'port, sock) - else do - -- explicitly bind to a random port, returning bound port number - (num, sock) <- 
bindRandomPortTCP (fromString $ T.unpack cfg'host) - pure (num, sock) - pure sock - - adminSock <- case cfg'adminPort of - Just adminPort -> do - adminSock <- bindPortTCP adminPort (fromString $ T.unpack cfg'adminHost) - pure $ Just adminSock - Nothing -> pure Nothing - - pure (sock, adminSock) +initServerSocket :: AppConfig -> IO NS.Socket +initServerSocket AppConfig{..} = case configServerUnixSocket of + -- I'm not using `streaming-commons`' bindPath function here because it's not defined for Windows, + -- but we need to have runtime error if we try to use it in Windows, not compile time error + Just path -> createAndBindDomainSocket path configServerUnixSocketMode + Nothing -> bindPortTCP configServerPort (fromString $ T.unpack configServerHost) + +initAdminServerSocket :: AppConfig -> IO (Maybe NS.Socket) +initAdminServerSocket AppConfig{..} = + traverse (`bindPortTCP` adminHost) configAdminServerPort + where + adminHost = fromString $ T.unpack configAdminServerHost + diff --git a/src/PostgREST/AppState.hs b/src/PostgREST/AppState.hs index fdaac1a908..f975ea4134 100644 --- a/src/PostgREST/AppState.hs +++ b/src/PostgREST/AppState.hs @@ -27,6 +27,7 @@ module PostgREST.AppState , getObserver , isLoaded , isPending + , waitForSchemaCacheInit ) where import qualified Data.ByteString.Char8 as BS @@ -53,6 +54,9 @@ import Data.IORef (IORef, atomicWriteIORef, newIORef, readIORef) import Data.Time.Clock (UTCTime, getCurrentTime) +import Control.Concurrent.STM (TMVar, newEmptyTMVarIO, + putTMVar, readTMVar, + tryReadTMVar, tryTakeTMVar) import PostgREST.Auth.JwtCache (JwtCacheState, update) import PostgREST.Config (AppConfig (..), readAppConfig, @@ -102,9 +106,11 @@ data AppState = AppState } -- | Schema cache status. --- Empty means pending and full means loaded. +-- Empty means initial loading on startup, False means pending and True means loaded. 
+-- "Initial" state is needed so that we can delay listening on the application socket +-- until after the initial schema cache query. newtype SchemaCacheStatus = SchemaCacheStatus - { getSCStatusMVar :: MVar () + { getSCStatusTMVar :: TMVar Bool } init :: AppConfig -> IO AppState @@ -380,16 +386,21 @@ retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThrea oneSecondInUs = 1000000 -- one second in microseconds newSchemaCacheStatus :: IO SchemaCacheStatus -newSchemaCacheStatus = SchemaCacheStatus <$> newEmptyMVar +newSchemaCacheStatus = SchemaCacheStatus <$> newEmptyTMVarIO markSchemaCachePending :: AppState -> IO () -markSchemaCachePending = void . tryTakeMVar . getSCStatusMVar . stateSCacheStatus +markSchemaCachePending = atomically . liftA2 (*>) tryTakeTMVar (`putTMVar` False) . getSCStatusTMVar . stateSCacheStatus markSchemaCacheLoaded :: AppState -> IO () -markSchemaCacheLoaded = void . (`tryPutMVar` ()) . getSCStatusMVar . stateSCacheStatus +markSchemaCacheLoaded = atomically . liftA2 (*>) tryTakeTMVar (`putTMVar` True) . getSCStatusTMVar . stateSCacheStatus isSchemaCacheLoaded :: AppState -> IO Bool -isSchemaCacheLoaded = fmap not . isEmptyMVar . getSCStatusMVar . stateSCacheStatus +isSchemaCacheLoaded = atomically . (pure . fromMaybe False <=< tryReadTMVar) . getSCStatusTMVar . stateSCacheStatus + +-- | Wait for initial schema cache load to either finish or retry +-- | We wait until the status TMVar (getSCStatusTMVar) is not empty. +waitForSchemaCacheInit :: AppState -> IO () +waitForSchemaCacheInit = atomically . void . readTMVar . getSCStatusTMVar . stateSCacheStatus -- | Reads the in-db config and reads the config file again -- | We don't retry reading the in-db config after it fails immediately, because it could have user errors. We just report the error and continue. 
diff --git a/src/PostgREST/SchemaCache.hs b/src/PostgREST/SchemaCache.hs index 997d16ab23..c85dfe878e 100644 --- a/src/PostgREST/SchemaCache.hs +++ b/src/PostgREST/SchemaCache.hs @@ -157,6 +157,9 @@ maxDbTablesForFuzzySearch = 500 querySchemaCache :: AppConfig -> SQL.Transaction SchemaCache querySchemaCache conf@AppConfig{..} = do SQL.sql "set local schema ''" -- This voids the search path. The following queries need this for getting the fully qualified name(schema.name) of every db object + _ <- + let sleepCall = SQL.Statement "select pg_sleep($1 / 1000.0)" (param HE.int4) HD.noResult True in + for_ configInternalSCQuerySleep (`SQL.statement` sleepCall) -- only used for testing tabs <- sqlTimedStmt gucTbls conf allTables keyDeps <- sqlTimedStmt gucKDeps conf allViewsKeyDependencies m2oRels <- sqlTimedStmt gucRels mempty allM2OandO2ORels @@ -167,9 +170,6 @@ querySchemaCache conf@AppConfig{..} = do tzones <- if configDbTimezoneEnabled then sqlTimedStmt gucTzones mempty timezones else pure S.empty - _ <- - let sleepCall = SQL.Statement "select pg_sleep($1 / 1000.0)" (param HE.int4) HD.noResult True in - for_ configInternalSCQuerySleep (`SQL.statement` sleepCall) -- only used for testing qsTime <- if isLogDebug diff --git a/test/io/test_io.py b/test/io/test_io.py index aa3e923dcd..a506992587 100644 --- a/test/io/test_io.py +++ b/test/io/test_io.py @@ -6,6 +6,7 @@ import subprocess import time import pytest +import requests from config import CONFIGSDIR, FIXTURES, SECRET from util import ( @@ -1090,7 +1091,7 @@ def test_empty_schema_cache_log_contains_jwt_role(defaultenv): env = { **defaultenv, - "PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP": "1000", + "PGRST_DB_SCHEMAS": "non_existent_schema_aaaa", "PGRST_JWT_SECRET": SECRET, } headers = jwtauthheader({"role": "postgrest_test_author"}, SECRET) @@ -1543,14 +1544,19 @@ def test_log_postgrest_host_and_port(host, defaultenv): with run( env=defaultenv, host=host, port=port, no_startup_stdout=False ) as postgrest: - output = 
postgrest.read_stdout(nlines=10) + output = postgrest.read_stdout(nlines=11) + # Cannot assume a particular log entry order + # Listening on a socket happens after schema querying + # but is concurrent with the schema loading process + # and might happen before or after writing of the + # "Schema cache loaded" log entry if is_unix: - re.match(r'API server listening on "/tmp/.*\.sock"', output[2]) + match_log(output, [r".*API server listening on .*/tmp/.*\.sock"]) elif is_ipv6(host): - assert f"API server listening on [{host}]:{port}" in output[2] + match_log(output, [r".*API server listening on \[.+]:\d+"]) else: # IPv4 - assert f"API server listening on {host}:{port}" in output[2] + match_log(output, [r".*API server listening on .+:\d+"]) def test_succeed_w_role_having_superuser_settings(defaultenv): @@ -1898,17 +1904,24 @@ def test_pgrst_log_503_client_error_to_stderr(defaultenv): assert any(log_message in line for line in output) -def test_log_error_when_empty_schema_cache_on_startup_to_stderr(defaultenv): - "Should log the 503 error message when there is an empty schema cache on startup" +def test_log_error_when_schema_cache_load_error_on_startup_to_stderr(defaultenv): + "Should log the 503 error message when there is an error loading schema cache on startup" env = { **defaultenv, - "PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP": "300", + "PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP": "1000", + "PGRST_DB_SCHEMAS": "non_existent_schema_aaaa", } with run(env=env, wait_for=None) as postgrest: postgrest.wait_until_scache_starts_loading() + # First call should fail with connection refused + with pytest.raises(requests.ConnectionError): + postgrest.session.get("/projects") + + # Next call should return 503 + time.sleep(1) response = postgrest.session.get("/projects") assert response.status_code == 503 @@ -1920,7 +1933,7 @@ def test_log_error_when_empty_schema_cache_on_startup_to_stderr(defaultenv): def test_no_double_schema_cache_reload_on_empty_schema(defaultenv): - "Should 
only load the schema cache once on a 503 error when there's an empty schema cache on startup" + "Should only load the schema cache once when there's an empty schema cache on startup" env = { **defaultenv, @@ -1930,12 +1943,15 @@ def test_no_double_schema_cache_reload_on_empty_schema(defaultenv): with run(env=env, port=freeport(), wait_for=None) as postgrest: postgrest.wait_until_scache_starts_loading() - response = postgrest.session.get("/projects") - assert response.status_code == 503 + with pytest.raises(requests.ConnectionError): + postgrest.session.get("/projects") # Should wait enough time to load the schema cache twice to guarantee that the test is valid time.sleep(1) + response = postgrest.session.get("/projects") + assert response.status_code == 200 + response = postgrest.admin.get("/metrics") assert response.status_code == 200 assert 'pgrst_schema_cache_loads_total{status="SUCCESS"} 1.0' in response.text @@ -2017,7 +2033,7 @@ def test_schema_cache_error_observation(defaultenv): output = postgrest.read_stdout(nlines=9) assert ( "Failed to load the schema cache using db-schemas=public and db-extra-search-path=x" - in output[7] + in output[6] )