Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file. From versio

## Unreleased

### Fixed

- Fix race condition in pool_available metric causing negative values during network instability by @mkleczek in #4622

## [14.11] - 2026-05-04

### Fixed
Expand Down
3 changes: 3 additions & 0 deletions nix/tools/tests.nix
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
, hostPlatform
, jq
, lib
, nginx
, postgrest
, python3
, runtimeShell
Expand Down Expand Up @@ -94,6 +95,7 @@ let
args = [ "ARG_LEFTOVERS([pytest arguments])" ];
workingDir = "/";
withEnv = postgrest.env;
withPath = [ nginx ];
}
''
${cabal-install}/bin/cabal v2-build ${devCabalOptions} exe:postgrest
Expand Down Expand Up @@ -156,6 +158,7 @@ let
redirectTixFiles = false;
withEnv = postgrest.env;
withTmpDir = true;
withPath = [ nginx ];
}
(
# required for `hpc markup` in CI; glibcLocales is not available e.g. on Darwin
Expand Down
1 change: 1 addition & 0 deletions postgrest.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ library
, stm-hamt >= 1.2 && < 2
, focus >= 1.0 && < 2
, some >= 1.0.4.1 && < 2
, uuid >= 1.3 && < 2
-- -fno-spec-constr may help keep compile time memory use in check,
-- see https://gitlab.haskell.org/ghc/ghc/issues/16017#note_219304
-- -optP-Wno-nonportable-include-path
Expand Down
64 changes: 53 additions & 11 deletions src/PostgREST/Metrics.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ Description : Metrics based on the Observation module. See Observation.hs.
-}
module PostgREST.Metrics
( init
, ConnTrack
, ConnStats (..)
, MetricsState (..)
, connectionCounts
, observationMetrics
, metricsToText
) where
Expand All @@ -17,12 +20,18 @@ import Prometheus

import PostgREST.Observation

import Protolude
import Control.Arrow ((&&&))
import Data.Bitraversable (bisequenceA)
import Data.Tuple.Extra (both)
import Data.UUID (UUID)
import qualified Focus
import Protolude
import qualified StmHamt.SizedHamt as SH

data MetricsState =
MetricsState {
poolTimeouts :: Counter,
poolAvailable :: Gauge,
connTrack :: ConnTrack,
poolWaiting :: Gauge,
poolMaxSize :: Gauge,
schemaCacheLoads :: Vector Label1 Counter,
Expand All @@ -36,7 +45,7 @@ init :: Int -> IO MetricsState
init configDbPoolSize = do
metricState <- MetricsState <$>
register (counter (Info "pgrst_db_pool_timeouts_total" "The total number of pool connection timeouts")) <*>
register (gauge (Info "pgrst_db_pool_available" "Available connections in the pool")) <*>
register (Metric ((identity &&& dbPoolAvailable) <$> connectionTracker)) <*>
register (gauge (Info "pgrst_db_pool_waiting" "Requests waiting to acquire a pool connection")) <*>
register (gauge (Info "pgrst_db_pool_max" "Max pool connections")) <*>
register (vector "status" $ counter (Info "pgrst_schema_cache_loads_total" "The total number of times the schema cache was loaded")) <*>
Expand All @@ -46,20 +55,28 @@ init configDbPoolSize = do
register (counter (Info "pgrst_jwt_cache_evictions_total" "The total number of JWT cache evictions"))
setGauge (poolMaxSize metricState) (fromIntegral configDbPoolSize)
pure metricState
where
dbPoolAvailable = (pure . noLabelsGroup (Info "pgrst_db_pool_available" "Available connections in the pool") GaugeType . calcAvailable <$>) . connectionCounts
where
calcAvailable = liftA2 (-) connected inUse
toSample name labels = Sample name labels . encodeUtf8 . show
noLabelsGroup info sampleType = SampleGroup info sampleType . pure . toSample (metricName info) mempty

-- Only some observations are used as metrics
observationMetrics :: MetricsState -> ObservationHandler
observationMetrics MetricsState{..} obs = case obs of
(PoolAcqTimeoutObs _) -> do
incCounter poolTimeouts
(HasqlPoolObs (SQL.ConnectionObservation _ status)) -> case status of
SQL.ReadyForUseConnectionStatus -> do
incGauge poolAvailable
SQL.InUseConnectionStatus -> do
decGauge poolAvailable
SQL.TerminatedConnectionStatus _ -> do
decGauge poolAvailable
SQL.ConnectingConnectionStatus -> pure ()
-- Handle pool observations with connection tracking.
-- This is necessary because it is not possible to accurately maintain
-- open/in-use connection counts statelessly, based only on pool observation events.
-- The reason is that hasql-pool emits TerminatedConnectionStatus both for
-- connections that were successfully established and for those that failed while connecting.
-- When receiving TerminatedConnectionStatus we have to find out whether
-- we can decrement the established connection count. To do that we have to track
-- established connections.
(HasqlPoolObs sqlObs) -> trackConnections connTrack sqlObs
PoolRequest ->
incGauge poolWaiting
PoolRequestFullfilled ->
Expand All @@ -77,3 +94,28 @@ observationMetrics MetricsState{..} obs = case obs of

metricsToText :: IO LBS.ByteString
metricsToText = exportMetricsAsText

-- | Point-in-time connection counts, as produced by 'connectionCounts'.
data ConnStats = ConnStats
  { connected :: Int -- ^ number of connection ids in the "connected" set (includes the ones in use)
  , inUse     :: Int -- ^ number of connection ids in the "in use" set
  }
  deriving (Eq, Show)

-- | Mutable connection-tracking state: two STM sets of pool connection ids,
-- updated by 'trackConnections' and measured by 'connectionCounts'.
data ConnTrack = ConnTrack
  { connTrackConnected :: SH.SizedHamt UUID -- ^ ids of connections reported as ready for use and not yet terminated
  , connTrackInUse     :: SH.SizedHamt UUID -- ^ ids of connections reported as in use and not yet returned or terminated
  }

-- | Create a fresh tracker with both connection-id sets empty.
connectionTracker :: IO ConnTrack
connectionTracker = do
  connectedSet <- SH.newIO
  inUseSet <- SH.newIO
  pure ConnTrack
    { connTrackConnected = connectedSet
    , connTrackInUse = inUseSet
    }

-- | Update the tracker from a single pool observation.
--
-- Each status is applied in one STM transaction:
--
-- * ready for use: the id is added to the connected set and removed from the in-use set
-- * in use: the id is added to the in-use set
-- * terminated: the id is removed from both sets (a no-op for ids that were never added)
--
-- Any other status leaves the tracker untouched.
trackConnections :: ConnTrack -> SQL.Observation -> IO ()
trackConnections tracker (SQL.ConnectionObservation connId status) =
  case status of
    SQL.InUseConnectionStatus ->
      atomically $ add inUseSet
    SQL.ReadyForUseConnectionStatus ->
      atomically $ do
        add connectedSet
        remove inUseSet
    SQL.TerminatedConnectionStatus _ ->
      atomically $ do
        remove connectedSet
        remove inUseSet
    _ ->
      pure ()
  where
    connectedSet = connTrackConnected tracker
    inUseSet = connTrackInUse tracker
    -- the sets are keyed by the connection id itself
    add = SH.insert identity connId
    remove = SH.focus Focus.delete identity connId

-- | Read the sizes of both tracked sets in a single STM transaction,
-- so the two counts always belong to the same snapshot.
connectionCounts :: ConnTrack -> IO ConnStats
connectionCounts tracker = atomically $ do
  (connectedCount, inUseCount) <- bisequenceA (both SH.size trackedSets)
  pure (ConnStats connectedCount inUseCount)
  where
    trackedSets = (connTrackConnected tracker, connTrackInUse tracker)
1 change: 1 addition & 0 deletions test/io/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
FIXTURES = yaml.load(
(BASEDIR / "fixtures/fixtures.yaml").read_text(), Loader=yaml.Loader
)
NGINX_BIN = shutil.which("nginx")
POSTGREST_BIN = shutil.which("postgrest")
SECRET = "reallyreallyreallyreallyverysafe"

Expand Down
13 changes: 13 additions & 0 deletions test/io/nginx/nginx.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Template for an nginx instance acting as a unix-socket proxy in front of PostgreSQL.
# the PG* variables are replaced by preprocessing, not done by nginx itself
# NOTE: keep literal dollar signs out of comments, the preprocessing step would
# try to substitute them as well.

# stay in the foreground instead of forking into the background
daemon off;
pid ./nginx.pid;

# no connection-processing tuning; the section is kept, but empty
events {}

# stream = raw (layer 4) proxying, no http involved
stream {
    server {
        # socket on which clients reach the proxy
        listen unix:$PGPROXYHOST/.s.PGSQL.5432;
        # max time without any read/write on a proxied connection before nginx closes it
        proxy_timeout $PGPROXY_TIMEOUT;
        # upstream: the actual PostgreSQL socket
        proxy_pass unix:$PGHOST/.s.PGSQL.5432;
    }
}
63 changes: 60 additions & 3 deletions test/io/postgrest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@

import contextlib
import dataclasses
import enum
import os
import pathlib
import socket
import subprocess
import tempfile
import time
import string
import urllib.parse

import requests
import requests_unixsocket

from config import POSTGREST_BIN, hpctixfile
from config import POSTGREST_BIN, NGINX_BIN, hpctixfile


def sleep_until_postgrest_scache_reload():
Expand All @@ -35,6 +37,13 @@ class PostgrestTimedOut(Exception):
"Connecting to PostgREST endpoint timed out."


class Admin(str, enum.Enum):
    """Admin endpoint to wait for before yielding a PostgREST process.

    The member chosen decides which admin path is polled for a 200 status
    before the process is handed to the test.
    """

    # wait on <adminurl>/live
    live = "live"
    # wait on <adminurl>/ready
    ready = "ready"


class PostgrestSession(requests_unixsocket.Session):
"HTTP client session directed at a PostgREST endpoint."

Expand Down Expand Up @@ -86,7 +95,7 @@ def run(
env=None,
port=None,
host=None,
wait_for_readiness=True,
wait_for=Admin.ready,
wait_max_seconds=1,
no_pool_connection_available=False,
no_startup_stdout=True,
Expand Down Expand Up @@ -138,8 +147,10 @@ def run(
process.stdin.write(stdin or b"")
process.stdin.close()

if wait_for_readiness:
if wait_for == Admin.ready:
wait_until_status_code(adminurl + "/ready", wait_max_seconds, 200)
elif wait_for == Admin.live:
wait_until_status_code(adminurl + "/live", wait_max_seconds, 200)

if no_startup_stdout:
process.stdout.read()
Expand All @@ -165,6 +176,52 @@ def run(
process.wait()


@contextlib.contextmanager
def run_pgproxy(env=None, proxy_timeout="1s", wait_max_seconds=5):
    """Run nginx as a unix socket proxy for PostgreSQL and expose PGPROXYHOST.

    Parameters:
        env: environment used both for the nginx process and for filling in the
            config template (it must provide PGHOST); defaults to os.environ.
        proxy_timeout: value substituted for PGPROXY_TIMEOUT in the template.
        wait_max_seconds: how long to wait for nginx to create its listening
            socket before giving up.

    Yields:
        The directory (as str) holding the proxy's `.s.PGSQL.5432` socket,
        usable as PGHOST.

    Raises:
        RuntimeError: if nginx is not installed, exits during startup, or does
            not create the socket within wait_max_seconds.
    """
    if NGINX_BIN is None:
        raise RuntimeError("nginx executable not found on PATH")

    env = dict(os.environ if env is None else env)

    with tempfile.TemporaryDirectory() as tmpdir:
        # build a <tmpdir>/conf/ so `nginx -p` picks the config automatically
        tmpdir = pathlib.Path(tmpdir)
        conf_dir = tmpdir / "conf"
        conf_dir.mkdir(parents=True)

        nginx_env = dict(env)
        nginx_env["PGPROXYHOST"] = str(tmpdir)
        nginx_env["PGPROXY_TIMEOUT"] = proxy_timeout

        # locate the template next to this module so the current working
        # directory does not matter
        source_conf = pathlib.Path(__file__).resolve().parent / "nginx" / "nginx.conf"
        out_conf = conf_dir / "nginx.conf"
        out_conf.write_text(
            string.Template(source_conf.read_text()).substitute(nginx_env)
        )

        # must match the `listen` directive of the template
        socket_path = tmpdir / ".s.PGSQL.5432"

        # send nginx's stderr to a file instead of a pipe: nobody drains a pipe
        # while the tests run, and a full pipe would block nginx
        stderr_path = tmpdir / "nginx.stderr"
        with stderr_path.open("w") as stderr_file:
            process = subprocess.Popen(
                [NGINX_BIN, "-p", str(tmpdir), "-e", "stderr"],
                stdout=subprocess.DEVNULL,
                stderr=stderr_file,
                env=nginx_env,
            )

        try:
            # a single poll() right after Popen cannot detect startup failures;
            # wait until the socket exists or nginx has exited
            deadline = time.monotonic() + wait_max_seconds
            while not socket_path.exists():
                if process.poll() is not None:
                    stderr_output = stderr_path.read_text()
                    raise RuntimeError(
                        f"{NGINX_BIN} exited with {process.returncode}: {stderr_output}"
                    )
                if time.monotonic() > deadline:
                    raise RuntimeError(
                        f"{NGINX_BIN} did not create {socket_path} within {wait_max_seconds}s"
                    )
                time.sleep(0.01)

            yield str(tmpdir)
        finally:
            # terminate() is a no-op when nginx has already exited
            process.terminate()
            try:
                process.wait(timeout=1)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()


def freeport(used_ports=None):
"Find an unused free port on localhost."
while True:
Expand Down
2 changes: 1 addition & 1 deletion test/io/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ def test_fail_with_invalid_password(defaultenv):
"Connecting with an invalid password should fail without retries."
uri = f'postgresql://?dbname={defaultenv["PGDATABASE"]}&host={defaultenv["PGHOST"]}&user=some_protected_user&password=invalid_pass'
env = {**defaultenv, "PGRST_DB_URI": uri}
with run(env=env, wait_for_readiness=False) as postgrest:
with run(env=env, wait_for=None) as postgrest:
exitCode = wait_until_exit(postgrest)
assert exitCode == 1

Expand Down
36 changes: 28 additions & 8 deletions test/io/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
match_log,
)
from postgrest import (
Admin,
freeport,
is_ipv6,
reset_statement_timeout,
run,
run_pgproxy,
set_statement_timeout,
sleep_until_postgrest_config_reload,
sleep_until_postgrest_full_reload,
Expand Down Expand Up @@ -1078,7 +1080,7 @@ def test_schema_cache_concurrent_notifications(slow_schema_cache_env):
int(slow_schema_cache_env["PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP"]) / 1000
)

with run(env=slow_schema_cache_env, wait_for_readiness=False) as postgrest:
with run(env=slow_schema_cache_env, wait_for=None) as postgrest:
time.sleep(2 * internal_sleep + 0.1) # wait for readiness manually

# first request, create a function and set a schema cache reload in progress
Expand Down Expand Up @@ -1269,7 +1271,7 @@ def test_fail_with_invalid_dbname_and_automatic_recovery_disabled(defaultenv):
"PGRST_DB_POOL_AUTOMATIC_RECOVERY": "false",
}

with run(env=env, wait_for_readiness=False) as postgrest:
with run(env=env, wait_for=None) as postgrest:
exitCode = wait_until_exit(postgrest)
assert exitCode == 1

Expand Down Expand Up @@ -1558,7 +1560,7 @@ def test_log_error_when_empty_schema_cache_on_startup_to_stderr(defaultenv):
"PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP": "300",
}

with run(env=env, wait_for_readiness=False) as postgrest:
with run(env=env, wait_for=None) as postgrest:
postgrest.wait_until_scache_starts_loading()

response = postgrest.session.get("/projects")
Expand All @@ -1579,7 +1581,7 @@ def test_no_double_schema_cache_reload_on_empty_schema(defaultenv):
"PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP": "300",
}

with run(env=env, port=freeport(), wait_for_readiness=False) as postgrest:
with run(env=env, port=freeport(), wait_for=None) as postgrest:
postgrest.wait_until_scache_starts_loading()

response = postgrest.session.get("/projects")
Expand Down Expand Up @@ -1661,7 +1663,7 @@ def test_schema_cache_error_observation(defaultenv):
"PGRST_DB_EXTRA_SEARCH_PATH": "x",
}

with run(env=env, no_startup_stdout=False, wait_for_readiness=False) as postgrest:
with run(env=env, no_startup_stdout=False, wait_for=None) as postgrest:
# TODO: postgrest should exit here, instead it keeps retrying
# exitCode = wait_until_exit(postgrest)
# assert exitCode == 1
Expand All @@ -1682,7 +1684,7 @@ def test_log_listener_connection_errors(defaultenv):
"PGRST_DB_CHANNEL_ENABLED": "true",
}

with run(env=env, no_startup_stdout=False, wait_for_readiness=False) as postgrest:
with run(env=env, no_startup_stdout=False, wait_for=None) as postgrest:
output = postgrest.read_stdout(nlines=5)
assert any(
'Failed listening for database notifications on the "pgrst" channel. could not translate host name "no_host" to address:'
Expand All @@ -1699,7 +1701,7 @@ def test_log_listener_connection_start(defaultenv):
"PGRST_DB_CHANNEL_ENABLED": "true",
}

with run(env=env, no_startup_stdout=False, wait_for_readiness=True) as postgrest:
with run(env=env, no_startup_stdout=False, wait_for=Admin.ready) as postgrest:
output = postgrest.read_stdout(nlines=10)
# Check for the listener start message containing host and port
# Do not check if pg version is displayed properly as it is tricky to test it
Expand Down Expand Up @@ -1727,7 +1729,7 @@ def test_db_pre_config_with_pg_reserved_words(defaultenv):
"PGRST_DB_PRE_CONFIG": "select", # no "select" function in our fixtures, fail gracefully at startup
}

with run(env=env, no_startup_stdout=False, wait_for_readiness=False) as postgrest:
with run(env=env, no_startup_stdout=False, wait_for=None) as postgrest:
output = postgrest.read_stdout(nlines=8)
assert any(
'Failed to query database settings for the config parameters.{"code":"42883","details":null,"hint":"No function matches the given name and argument types. You might need to add explicit type casts.","message":"function select() does not exist"}'
Expand Down Expand Up @@ -1808,3 +1810,21 @@ def test_server_timing_transaction_duration(defaultenv, metapostgrest):
]

assert 2000 <= response_dur < 3000


def test_positive_pool_metric(defaultenv):
    "When a network failure is caused on the pg connection, pgrst_db_pool_available never goes negative"

    # a 1ms proxy timeout makes nginx drop the pg connections almost immediately
    with run_pgproxy(defaultenv, proxy_timeout="1ms") as pgproxyhost:
        env = {**defaultenv, "PGHOST": pgproxyhost}

        # only liveness is awaited: readiness is not awaited while connections keep dropping
        with run(env=env, wait_for=Admin.live) as postgrest:
            response = postgrest.admin.get("/metrics", timeout=1)
            assert response.status_code == 200

            match = re.search(
                r"pgrst_db_pool_available (-?\d+(?:\.\d+)?)", response.text
            )
            # fail with a readable message instead of AttributeError on None
            assert (
                match is not None
            ), "pgrst_db_pool_available not found in /metrics output"

            pool_available = float(match.group(1))
            assert pool_available >= 0
Loading
Loading