Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ All notable changes to this project will be documented in this file. From versio
- Shutdown should wait for in flight requests by @mkleczek in #4702
- Remove automatic transaction retries on `40001 (serialization_failure)` errors to prevent replication lag by @laurenceisla in #3673
- Fix unexpected results when embedding and filtering the same table more than once by @laurenceisla in #4075
- Stop reporting 503s errors unnecessarily while the schema cache is loading at startup by @mkleczek in #4880

### Changed

Expand Down
16 changes: 8 additions & 8 deletions src/PostgREST/Admin.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,20 @@ import qualified PostgREST.AppState as AppState
import qualified Network.Socket as NS
import Protolude

runAdmin :: AppState -> Maybe NS.Socket -> NS.Socket -> Warp.Settings -> IO ()
runAdmin appState maybeAdminSocket socketREST settings = do
runAdmin :: AppState -> Maybe NS.Socket -> IO (Maybe NS.Socket) -> Warp.Settings -> IO ()
runAdmin appState maybeAdminSocket getSocketREST settings = do
whenJust maybeAdminSocket $ \adminSocket -> do
address <- resolveSocketToAddress adminSocket
observer $ AdminStartObs address
void . forkIO $ Warp.runSettingsSocket settings adminSocket adminApp
where
adminApp = admin appState socketREST
adminApp = admin appState getSocketREST
observer = AppState.getObserver appState

-- | PostgREST admin application
admin :: AppState.AppState -> NS.Socket -> Wai.Application
admin appState socketREST req respond = do
isMainAppReachable <- isRight <$> reachMainApp socketREST
admin :: AppState.AppState -> IO (Maybe NS.Socket) -> Wai.Application
admin appState getSocketREST req respond = do
isMainAppReachable <- getSocketREST >>= maybe (pure False) (fmap isRight . reachMainApp)
isLoaded <- AppState.isLoaded appState
isPending <- AppState.isPending appState

Expand All @@ -44,8 +44,8 @@ admin appState socketREST req respond = do
respond $ Wai.responseLBS (if isMainAppReachable then HTTP.status200 else HTTP.status500) [] mempty
["ready"] ->
let
status | not isMainAppReachable = HTTP.status500
| isPending = HTTP.status503
status | isPending = HTTP.status503
| not isMainAppReachable = HTTP.status500
Comment thread
steve-chavez marked this conversation as resolved.
| isLoaded = HTTP.status200
| otherwise = HTTP.status500
in
Expand Down
52 changes: 29 additions & 23 deletions src/PostgREST/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import System.IO.Error (ioeGetErrorType)
import Control.Monad.Except (liftEither)
import Control.Monad.Extra (whenJust)
import Data.Either.Combinators (mapLeft, whenLeft)
import Data.IORef (atomicWriteIORef, newIORef,
readIORef)
import Data.String (IsString (..))
import Network.Wai.Handler.Warp (defaultSettings, setHost,
setOnException, setPort,
Expand Down Expand Up @@ -68,7 +70,7 @@ import qualified Data.List as L
import Data.Streaming.Network (bindPortTCP)
import qualified Data.Text as T
import qualified Network.HTTP.Types as HTTP
import qualified Network.HTTP.Types.Header as HTTP (hVary)
import qualified Network.HTTP.Types.Header as HTTP
import qualified Network.Socket as NS
import PostgREST.Unix (createAndBindDomainSocket)
import Protolude hiding (Handler)
Expand All @@ -77,22 +79,30 @@ run :: AppState -> IO ()
run appState = do
conf <- AppState.getConfig appState

AppState.schemaCacheLoader appState -- Loads the initial SchemaCache
(mainSocket, adminSocket) <- initSockets conf
mainSocketRef <- newIORef Nothing
adminSocket <- initAdminServerSocket conf

let closeSockets = do
whenJust adminSocket NS.close
NS.close mainSocket
readIORef mainSocketRef >>= foldMap NS.close
Unix.installSignalHandlers observer closeSockets (AppState.schemaCacheLoader appState) (AppState.readInDbConfig False appState)

Admin.runAdmin appState adminSocket (readIORef mainSocketRef) (serverSettings conf)

Listener.runListener appState

Admin.runAdmin appState adminSocket mainSocket (serverSettings conf)
-- Kick off and wait for the initial SchemaCache load before creating the
-- main API socket.
AppState.schemaCacheLoader appState
AppState.waitForSchemaCacheInit appState

mainSocket <- initServerSocket conf
atomicWriteIORef mainSocketRef $ Just mainSocket
Comment thread
steve-chavez marked this conversation as resolved.

let app = postgrest appState (AppState.schemaCacheLoader appState)

do
address <- resolveSocketToAddress mainSocket
observer $ AppServerAddressObs address
address <- resolveSocketToAddress mainSocket
observer $ AppServerAddressObs address

Warp.runSettingsSocket (serverSettings conf & setOnException onWarpException) mainSocket app
where
Expand Down Expand Up @@ -255,20 +265,16 @@ addRetryHint delay response = do
isServiceUnavailable :: Wai.Response -> Bool
isServiceUnavailable response = Wai.responseStatus response == HTTP.status503

type AppSockets = (NS.Socket, Maybe NS.Socket)
initServerSocket :: AppConfig -> IO NS.Socket
initServerSocket AppConfig{..} = case configServerUnixSocket of
-- I'm not using `streaming-commons`' bindPath function here because it's not defined for Windows,
-- but we need to have runtime error if we try to use it in Windows, not compile time error
Just path -> createAndBindDomainSocket path configServerUnixSocketMode
Nothing -> bindPortTCP configServerPort (fromString $ T.unpack configServerHost)

initSockets :: AppConfig -> IO AppSockets
initSockets AppConfig{..} = do
sock <- case configServerUnixSocket of
-- I'm not using `streaming-commons`' bindPath function here because it's not defined for Windows,
-- but we need to have runtime error if we try to use it in Windows, not compile time error
Just path -> createAndBindDomainSocket path configServerUnixSocketMode
Nothing -> bindPortTCP configServerPort (fromString $ T.unpack configServerHost)

adminSock <- case configAdminServerPort of
Just adminPort -> do
adminSock <- bindPortTCP adminPort (fromString $ T.unpack configAdminServerHost)
pure $ Just adminSock
Nothing -> pure Nothing
initAdminServerSocket :: AppConfig -> IO (Maybe NS.Socket)
initAdminServerSocket AppConfig{..} =
traverse (`bindPortTCP` adminHost) configAdminServerPort
where
adminHost = fromString $ T.unpack configAdminServerHost

pure (sock, adminSock)
23 changes: 17 additions & 6 deletions src/PostgREST/AppState.hs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ module PostgREST.AppState
, getObserver
, isLoaded
, isPending
, waitForSchemaCacheInit
) where

import qualified Data.ByteString.Char8 as BS
Expand All @@ -53,6 +54,9 @@ import Data.IORef (IORef, atomicWriteIORef, newIORef,
readIORef)
import Data.Time.Clock (UTCTime, getCurrentTime)

import Control.Concurrent.STM (TMVar, newEmptyTMVarIO,
putTMVar, readTMVar,
tryReadTMVar, tryTakeTMVar)
import PostgREST.Auth.JwtCache (JwtCacheState, update)
import PostgREST.Config (AppConfig (..),
readAppConfig,
Expand Down Expand Up @@ -102,9 +106,11 @@ data AppState = AppState
}

-- | Schema cache status.
-- Empty means pending and full means loaded.
-- Empty means initial loading on startup, False means pending and True means loaded.
Comment thread
steve-chavez marked this conversation as resolved.
-- "Initial" state is needed so that we can wait with application socket listening
-- until after initial schema cache querying.
newtype SchemaCacheStatus = SchemaCacheStatus
{ getSCStatusMVar :: MVar ()
{ getSCStatusTMVar :: TMVar Bool
}

init :: AppConfig -> IO AppState
Expand Down Expand Up @@ -380,16 +386,21 @@ retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThrea
oneSecondInUs = 1000000 -- one second in microseconds

newSchemaCacheStatus :: IO SchemaCacheStatus
newSchemaCacheStatus = SchemaCacheStatus <$> newEmptyMVar
newSchemaCacheStatus = SchemaCacheStatus <$> newEmptyTMVarIO

markSchemaCachePending :: AppState -> IO ()
markSchemaCachePending = void . tryTakeMVar . getSCStatusMVar . stateSCacheStatus
markSchemaCachePending = atomically . liftA2 (*>) tryTakeTMVar (`putTMVar` False) . getSCStatusTMVar . stateSCacheStatus

markSchemaCacheLoaded :: AppState -> IO ()
markSchemaCacheLoaded = void . (`tryPutMVar` ()) . getSCStatusMVar . stateSCacheStatus
markSchemaCacheLoaded = atomically . liftA2 (*>) tryTakeTMVar (`putTMVar` True) . getSCStatusTMVar . stateSCacheStatus

isSchemaCacheLoaded :: AppState -> IO Bool
isSchemaCacheLoaded = fmap not . isEmptyMVar . getSCStatusMVar . stateSCacheStatus
isSchemaCacheLoaded = atomically . (pure . fromMaybe False <=< tryReadTMVar) . getSCStatusTMVar . stateSCacheStatus

-- | Wait for initial schema cache load to either finish or retry
-- | We wait until scStatusTMVar is not empty.
waitForSchemaCacheInit :: AppState -> IO ()
waitForSchemaCacheInit = atomically . void . readTMVar . getSCStatusTMVar . stateSCacheStatus

-- | Reads the in-db config and reads the config file again
-- | We don't retry reading the in-db config after it fails immediately, because it could have user errors. We just report the error and continue.
Expand Down
6 changes: 3 additions & 3 deletions src/PostgREST/SchemaCache.hs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ maxDbTablesForFuzzySearch = 500
querySchemaCache :: AppConfig -> SQL.Transaction (SchemaCache, Maybe QueryTimings)
querySchemaCache conf@AppConfig{..} = do
SQL.sql "set local schema ''" -- This voids the search path. The following queries need this for getting the fully qualified name(schema.name) of every db object
_ <-
let sleepCall = SQL.Statement "select pg_sleep($1 / 1000.0)" (param HE.int4) HD.noResult True in
for_ configInternalSCQuerySleep (`SQL.statement` sleepCall) -- only used for testing
tabs <- sqlTimedStmt gucTbls conf allTables
keyDeps <- sqlTimedStmt gucKDeps conf allViewsKeyDependencies
m2oRels <- sqlTimedStmt gucRels mempty allM2OandO2ORels
Expand All @@ -166,9 +169,6 @@ querySchemaCache conf@AppConfig{..} = do
tzones <- if configDbTimezoneEnabled
then sqlTimedStmt gucTzones mempty timezones
else pure S.empty
_ <-
let sleepCall = SQL.Statement "select pg_sleep($1 / 1000.0)" (param HE.int4) HD.noResult True in
for_ configInternalSCQuerySleep (`SQL.statement` sleepCall) -- only used for testing
Comment on lines 159 to -171
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this moved here? Change looks like it should be done independently. I remember we had some tests relying on the fact this sleep was after all the schema cache queries.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is needed - otherwise test_log_error_when_schema_cache_load_error_on_startup_to_stderr fails. The reason is that this test sets two env variables: PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP=1000 and PGRST_DB_SCHEMAS=non_existent_schema_aaaa (ie. we want to fail schema cache query but after waiting 1s.
If waiting is after queries then it fails immediately.

Copy link
Copy Markdown
Member

@steve-chavez steve-chavez May 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I remember some tests depend on this sleep being last, so if we change it those tests would be ineffective and misleading. We should do a git blame for finding those tests and tune them (or remove) if necessary.

Or as an alternative, introduce another sleep before the schema cache queries and leave this one in place.

Copy link
Copy Markdown
Collaborator Author

@mkleczek mkleczek May 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... I've searched through the tests for usage of this variable and cannot see anything that would require it after queries. It would only be relevant for tests that don't want to wait in case of errors in queries - but we don't have such tests, I think.

Copy link
Copy Markdown
Member

@steve-chavez steve-chavez May 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now, the original test was modified on 92ac7e5. This leaves us with this current test:

postgrest/test/io/test_io.py

Lines 1336 to 1367 in ed0d9dc

def test_schema_cache_concurrent_notifications(slow_schema_cache_env):
"schema cache should be up-to-date whenever a notification is sent while another reload is in progress, see https://github.com/PostgREST/postgrest/issues/2791"
internal_sleep = (
int(slow_schema_cache_env["PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP"]) / 1000
)
with run(env=slow_schema_cache_env, wait_for_readiness=False) as postgrest:
time.sleep(2 * internal_sleep + 0.1) # wait for readiness manually
# first request, create a function and set a schema cache reload in progress
response = postgrest.session.post("/rpc/create_function")
assert response.text == ""
assert response.status_code == 204
time.sleep(
internal_sleep / 2
) # wait to be inside the schema cache reload process
# second request, change the same function and do another schema cache reload
response = postgrest.session.post("/rpc/migrate_function")
assert response.text == ""
assert response.status_code == 204
time.sleep(
2 * internal_sleep
) # wait enough time to get the final schema cache state
# confirm the schema cache is up-to-date and the 2nd reload wasn't lost
response = postgrest.session.get("/rpc/mult_them?c=3&d=4")
assert response.text == "12"
assert response.status_code == 200

Which IIRC it's kind of tricky because it involves concurrent notifications.

@mkleczek Can you ensure this test is still effective? I would also be fine if it's modified to not depend on this sleep if possible (if done should be another PR).

Copy link
Copy Markdown
Collaborator Author

@mkleczek mkleczek May 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember that the order did matter otherwise I couldn't reproduce the failure as I did on #2810. Changing the order of this sleep would render that test ineffective.

I would like to understand how it is possible. The signature of schema loading function where sleep happens is:

querySchemaCache :: AppConfig -> SQL.Transaction SchemaCache

So it is an atomic transaction returning schema cache or error. There is no way for any interim state to be observable. So there is no way the order of operations would matter for any code outside of this transaction.
The only reason why sleep was moved was to make sure that waiting happens always - regardless of whether it returns failure or success.

I wouldn't introduce any other complexity without first understanding throughly the need for it.

Copy link
Copy Markdown
Member

@steve-chavez steve-chavez May 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mkleczek Mind you that #2810 was before debouncing was introduced. We want to prove the test on 4bcb2d7#diff-d0a535c2686bd7fcd98f5fa5baca2b9dc84e59c6a2c0f350a0728ebd3e2017a7R949-R986 fails.

Sleeping at the end:

  • 1st Migration happens, NOTIFY is done, cache is reloaded + schema cache query sleeps at the end
  • 2nd Migration happens while the 1st cache query is sleeping, NOTIFY is done, no cache reload happens, 1st schema cache query finishes sleeping and the tx ends, change is lost
  • Test fails

Sleeping before:

  • 1st Migration happens, NOTIFY is done, cache is reloaded + schema cache query sleeps at the beginning
  • 2nd Migration happens while the 1st cache query is sleeping, NOTIFY is done, no cache reload happens but 1st cache query is finished sleeping then proceeds to run the schema cache query and the tx ends, change is captured
  • Test succeeds

Did my best to refresh my memory and explain, hopefully that will do. If this is not sufficient I'd suggest doing a git checkout on 4bcb2d7 and see how the test succeeds (we want to prove a fail there) by moving the sleep before.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh... got it.

I would say we need to think about some totally different way to test it.

The problem with the approach we have today is that what we really are testing is PostgreSQL transaction isolation:
We optimistically assume second migration happens after all the queries but before querySchemaCache transaction completes. But what would happen if the second migration happened somewhere between the queries in querySchemaCache? The test would succeed and yet some second migration changes would be lost.

The only proper way to test that no notifications are lost is to verify that two schema reloads happened. Checking final schema cache state is not enough I'm afraid.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But what would happen if the second migration happened somewhere between the queries in querySchemaCache? The test would succeed and yet some second migration changes would be lost.

Correct, what I did on #2810 was the minimal sufficient to prove the failure and fix.

The only proper way to test that no notifications are lost is to verify that two schema reloads happened. Checking final schema cache state is not enough I'm afraid.

We could add the pgrst_schema_cache_loads_total = 2 metric for extra assurance. Although that wouldn't prove the postgres migration was applied correctly.

Any other ideas?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something along the lines of: #4962


qsTime <-
if isLogDebug
Expand Down
40 changes: 28 additions & 12 deletions test/io/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import subprocess
import time
import pytest
import requests

from config import CONFIGSDIR, FIXTURES, SECRET
from util import (
Expand Down Expand Up @@ -1090,7 +1091,7 @@ def test_empty_schema_cache_log_contains_jwt_role(defaultenv):

env = {
**defaultenv,
"PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP": "1000",
"PGRST_DB_SCHEMAS": "non_existent_schema_aaaa",
"PGRST_JWT_SECRET": SECRET,
}
headers = jwtauthheader({"role": "postgrest_test_author"}, SECRET)
Expand Down Expand Up @@ -1543,14 +1544,19 @@ def test_log_postgrest_host_and_port(host, defaultenv):
with run(
env=defaultenv, host=host, port=port, no_startup_stdout=False
) as postgrest:
output = postgrest.read_stdout(nlines=10)
output = postgrest.read_stdout(nlines=11)

# Cannot assume a particular log entry order
# Listening on a socket happens after schema querying
# but is concurrent to the schema loading process
# and migh happen before or after writing of the
# "Schema cache loaded" log entry
if is_unix:
re.match(r'API server listening on "/tmp/.*\.sock"', output[2])
match_log(output, [r".*API server listening on .*/tmp/.*\.sock"])
elif is_ipv6(host):
assert f"API server listening on [{host}]:{port}" in output[2]
match_log(output, [r".*API server listening on \[.+]:\d+"])
else: # IPv4
assert f"API server listening on {host}:{port}" in output[2]
match_log(output, [r".*API server listening on .+:\d+"])


def test_succeed_w_role_having_superuser_settings(defaultenv):
Expand Down Expand Up @@ -1898,17 +1904,24 @@ def test_pgrst_log_503_client_error_to_stderr(defaultenv):
assert any(log_message in line for line in output)


def test_log_error_when_empty_schema_cache_on_startup_to_stderr(defaultenv):
"Should log the 503 error message when there is an empty schema cache on startup"
def test_log_error_when_schema_cache_load_error_on_startup_to_stderr(defaultenv):
"Should log the 503 error message when there is an error loading schema cache on startup"

env = {
**defaultenv,
"PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP": "300",
"PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP": "1000",
"PGRST_DB_SCHEMAS": "non_existent_schema_aaaa",
}

with run(env=env, wait_for=None) as postgrest:
postgrest.wait_until_scache_starts_loading()

# First call should fail with connection refused
with pytest.raises(requests.ConnectionError):
postgrest.session.get("/projects")

# Next call should return 503
time.sleep(1)
Comment thread
steve-chavez marked this conversation as resolved.
response = postgrest.session.get("/projects")
assert response.status_code == 503

Expand All @@ -1920,7 +1933,7 @@ def test_log_error_when_empty_schema_cache_on_startup_to_stderr(defaultenv):


def test_no_double_schema_cache_reload_on_empty_schema(defaultenv):
"Should only load the schema cache once on a 503 error when there's an empty schema cache on startup"
"Should only load the schema cache once when there's an empty schema cache on startup"

env = {
**defaultenv,
Expand All @@ -1930,12 +1943,15 @@ def test_no_double_schema_cache_reload_on_empty_schema(defaultenv):
with run(env=env, port=freeport(), wait_for=None) as postgrest:
postgrest.wait_until_scache_starts_loading()

response = postgrest.session.get("/projects")
assert response.status_code == 503
with pytest.raises(requests.ConnectionError):
postgrest.session.get("/projects")
Comment thread
wolfgangwalther marked this conversation as resolved.

# Should wait enough time to load the schema cache twice to guarantee that the test is valid
time.sleep(1)

response = postgrest.session.get("/projects")
assert response.status_code == 200

Comment thread
steve-chavez marked this conversation as resolved.
response = postgrest.admin.get("/metrics")
assert response.status_code == 200
assert 'pgrst_schema_cache_loads_total{status="SUCCESS"} 1.0' in response.text
Expand Down Expand Up @@ -2017,7 +2033,7 @@ def test_schema_cache_error_observation(defaultenv):
output = postgrest.read_stdout(nlines=9)
assert (
"Failed to load the schema cache using db-schemas=public and db-extra-search-path=x"
in output[7]
in output[6]
)


Expand Down