Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
2a9f84e
feat: add async support for PostgreSQL janitor and loader
tboy1337 Mar 13, 2026
526ee6c
test: enhance async tests for Database Janitor and Loader
tboy1337 Mar 13, 2026
2258c81
fix: improve error handling for missing optional dependencies in asyn…
tboy1337 Mar 13, 2026
5c92310
fix: resolve double plugin registration, add asyncio_mode and default…
tboy1337 Mar 13, 2026
ea71a07
Revert "fix: resolve double plugin registration, add asyncio_mode and…
tboy1337 Mar 13, 2026
990496a
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 13, 2026
fe8c553
fix: enhance async fixture and retry tests
tboy1337 Mar 13, 2026
1384100
Merge branch 'async' of https://github.com/tboy1337/pytest-postgresql…
tboy1337 Mar 13, 2026
f16583a
refactor: update connection handling in AsyncDatabaseJanitor
tboy1337 Mar 13, 2026
38a5ae4
fix: add space in SQL query for connection termination
tboy1337 Mar 13, 2026
bdca11e
refactor: improve PostgreSQLExecutor and AsyncDatabaseJanitor handling
tboy1337 Mar 14, 2026
2267c00
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 14, 2026
717e81f
revert: remove Windows-specific changes deferred to PR-1182
tboy1337 Mar 14, 2026
685cb57
fix: resolve merge conflicts by keeping HEAD (exclude Windows changes…
tboy1337 Mar 14, 2026
83c7bae
refactor: enhance SQL query handling in DatabaseJanitor and AsyncData…
tboy1337 Mar 14, 2026
e9911ea
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 14, 2026
c23b364
fix: improve error handling in postgresql_async fixture
tboy1337 Mar 14, 2026
17d37ea
Merge branch 'async' of https://github.com/tboy1337/pytest-postgresql…
tboy1337 Mar 14, 2026
bfbff75
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 14, 2026
e2d10f2
fix: enhance postgresql_async fixture to provide synchronous stub whe…
tboy1337 Mar 14, 2026
24ec05d
Resolve merge conflict in factories/client.py: drop redundant in-body…
tboy1337 Mar 14, 2026
b40ac9d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 14, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions newsfragments/1235.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added async PostgreSQL fixture support via ``postgresql_async`` factory and ``AsyncDatabaseJanitor``.
Added configurable fixture ``scope`` parameter to ``postgresql``, ``postgresql_async``, ``postgresql_proc``, and ``postgresql_noproc`` factories (defaults preserved: ``"function"`` for client fixtures, ``"session"`` for process fixtures).
Added optional ``async`` extra (``pip install pytest-postgresql[async]``) providing ``pytest-asyncio`` and ``aiofiles`` dependencies.
6 changes: 6 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ dependencies = [
]
requires-python = ">= 3.10"

[project.optional-dependencies]
async = [
"pytest-asyncio >= 0.21",
"aiofiles >= 23.0"
]

[project.urls]
"Source" = "https://github.com/dbfixtures/pytest-postgresql"
"Bug Tracker" = "https://github.com/dbfixtures/pytest-postgresql/issues"
Expand Down
4 changes: 2 additions & 2 deletions pytest_postgresql/factories/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
# along with pytest-postgresql. If not, see <http://www.gnu.org/licenses/>.
"""Fixture factories for postgresql fixtures."""

from pytest_postgresql.factories.client import postgresql
from pytest_postgresql.factories.client import postgresql, postgresql_async
from pytest_postgresql.factories.noprocess import postgresql_noproc
from pytest_postgresql.factories.process import PortType, postgresql_proc

__all__ = ("postgresql_proc", "postgresql_noproc", "postgresql", "PortType")
__all__ = ("postgresql_proc", "postgresql_noproc", "postgresql", "postgresql_async", "PortType")
81 changes: 76 additions & 5 deletions pytest_postgresql/factories/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,44 @@
# along with pytest-postgresql. If not, see <http://www.gnu.org/licenses/>.
"""Fixture factory for postgresql client."""

from typing import Callable, Iterator
from typing import AsyncIterator, Callable, Iterator

import psycopg
import pytest
from psycopg import Connection
from psycopg import AsyncConnection, Connection
from pytest import FixtureRequest

try:
import pytest_asyncio
except ImportError:
pytest_asyncio = None # type: ignore[assignment]

from pytest_postgresql.config import get_config
from pytest_postgresql.executor import PostgreSQLExecutor
from pytest_postgresql.executor_noop import NoopExecutor
from pytest_postgresql.janitor import DatabaseJanitor
from pytest_postgresql.janitor import AsyncDatabaseJanitor, DatabaseJanitor
from pytest_postgresql.types import FixtureScopeT


def postgresql(
process_fixture_name: str,
dbname: str | None = None,
isolation_level: "psycopg.IsolationLevel | None" = None,
scope: FixtureScopeT = "function",
) -> Callable[[FixtureRequest], Iterator[Connection]]:
"""Return connection fixture factory for PostgreSQL.

:param process_fixture_name: name of the process fixture
:param dbname: database name
:param isolation_level: optional postgresql isolation level
defaults to server's default
:param scope: fixture scope; by default "function" which is recommended.
:returns: function which makes a connection to postgresql
"""

@pytest.fixture
@pytest.fixture(scope=scope)
def postgresql_factory(request: FixtureRequest) -> Iterator[Connection]:
"""Fixture factory for PostgreSQL.
"""Fixture connection factory for PostgreSQL.

:param request: fixture request object
:returns: postgresql client
Expand Down Expand Up @@ -85,3 +93,66 @@ def postgresql_factory(request: FixtureRequest) -> Iterator[Connection]:
db_connection.close()

return postgresql_factory


def postgresql_async(
process_fixture_name: str,
dbname: str | None = None,
isolation_level: "psycopg.IsolationLevel | None" = None,
scope: FixtureScopeT = "function",
) -> Callable[[FixtureRequest], AsyncIterator[AsyncConnection]]:
"""Return async connection fixture factory for PostgreSQL.

:param process_fixture_name: name of the process fixture
:param dbname: database name
:param isolation_level: optional postgresql isolation level
defaults to server's default
:param scope: fixture scope; by default "function" which is recommended.
:returns: function which makes an async connection to postgresql
"""
if pytest_asyncio is None:
raise ImportError(
"pytest-asyncio is required for async fixtures. Install it with: pip install pytest-postgresql[async]"
)

@pytest_asyncio.fixture(scope=scope, loop_scope=scope)
async def postgresql_async_factory(request: FixtureRequest) -> AsyncIterator[AsyncConnection]:
Comment thread
coderabbitai[bot] marked this conversation as resolved.
"""Async connection fixture factory for PostgreSQL.

:param request: fixture request object
:returns: postgresql async client
"""
proc_fixture: PostgreSQLExecutor | NoopExecutor = request.getfixturevalue(process_fixture_name)
config = get_config(request)

pg_host = proc_fixture.host
pg_port = proc_fixture.port
pg_user = proc_fixture.user
pg_password = proc_fixture.password
pg_options = proc_fixture.options
pg_db = dbname or proc_fixture.dbname
janitor = AsyncDatabaseJanitor(
user=pg_user,
host=pg_host,
port=pg_port,
dbname=pg_db,
template_dbname=proc_fixture.template_dbname,
version=proc_fixture.version,
password=pg_password,
isolation_level=isolation_level,
)
if config.drop_test_database:
await janitor.drop()
async with janitor:
db_connection: AsyncConnection = await AsyncConnection.connect(
dbname=pg_db,
user=pg_user,
password=pg_password,
host=pg_host,
port=pg_port,
options=pg_options,
)
yield db_connection
await db_connection.close()

return postgresql_async_factory
5 changes: 4 additions & 1 deletion pytest_postgresql/factories/noprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from pytest_postgresql.config import get_config
from pytest_postgresql.executor_noop import NoopExecutor
from pytest_postgresql.janitor import DatabaseJanitor
from pytest_postgresql.types import FixtureScopeT


def xdistify_dbname(dbname: str) -> str:
Expand All @@ -46,6 +47,7 @@ def postgresql_noproc(
options: str = "",
load: list[Callable | str | Path] | None = None,
depends_on: str | None = None,
scope: FixtureScopeT = "session",
) -> Callable[[FixtureRequest], Iterator[NoopExecutor]]:
"""Postgresql noprocess factory.

Expand All @@ -57,10 +59,11 @@ def postgresql_noproc(
:param options: Postgresql connection options
:param load: List of functions used to initialize database's template.
:param depends_on: Optional name of the fixture to depend on.
:param scope: fixture scope; by default "session" which is recommended.
:returns: function which makes a postgresql process
"""

@pytest.fixture(scope="session")
@pytest.fixture(scope=scope)
def postgresql_noproc_fixture(request: FixtureRequest) -> Iterator[NoopExecutor]:
"""Noop Process fixture for PostgreSQL.

Expand Down
5 changes: 4 additions & 1 deletion pytest_postgresql/factories/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from pytest_postgresql.exceptions import ExecutableMissingException
from pytest_postgresql.executor import PostgreSQLExecutor
from pytest_postgresql.janitor import DatabaseJanitor
from pytest_postgresql.types import FixtureScopeT

PortType = port_for.PortType # mypy requires explicit export

Expand Down Expand Up @@ -81,6 +82,7 @@ def postgresql_proc(
unixsocketdir: str | None = None,
postgres_options: str | None = None,
load: list[Callable | str | Path] | None = None,
scope: FixtureScopeT = "session",
) -> Callable[[FixtureRequest, TempPathFactory], Iterator[PostgreSQLExecutor]]:
"""Postgresql process factory.

Expand All @@ -101,10 +103,11 @@ def postgresql_proc(
:param unixsocketdir: directory to create postgresql's unixsockets
:param postgres_options: Postgres executable options for use by pg_ctl
:param load: List of functions used to initialize database's template.
:param scope: fixture scope; by default "session" which is recommended.
:returns: function which makes a postgresql process
"""

@pytest.fixture(scope="session")
@pytest.fixture(scope=scope)
def postgresql_proc_fixture(
request: FixtureRequest, tmp_path_factory: TempPathFactory
) -> Iterator[PostgreSQLExecutor]:
Expand Down
161 changes: 156 additions & 5 deletions pytest_postgresql/janitor.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
"""Database Janitor."""

from contextlib import contextmanager
import inspect
from contextlib import asynccontextmanager, contextmanager
from pathlib import Path
from types import TracebackType
from typing import Callable, Iterator, Type, TypeVar
from typing import AsyncIterator, Callable, Iterator, Type, TypeVar

import psycopg
from packaging.version import parse
from psycopg import Connection, Cursor
from psycopg import AsyncCursor, Connection, Cursor

from pytest_postgresql.loader import build_loader
from pytest_postgresql.retry import retry
from pytest_postgresql.loader import build_loader, build_loader_async
from pytest_postgresql.retry import retry, retry_async

Version = type(parse("1"))


DatabaseJanitorType = TypeVar("DatabaseJanitorType", bound="DatabaseJanitor")
AsyncDatabaseJanitorType = TypeVar("AsyncDatabaseJanitorType", bound="AsyncDatabaseJanitor")


class DatabaseJanitor:
Expand Down Expand Up @@ -164,3 +166,152 @@ def __exit__(
) -> None:
"""Exit from Database janitor context cleaning after itself."""
self.drop()


class AsyncDatabaseJanitor:
"""Manage database state asynchronously for specific tasks."""

def __init__(
self,
*,
user: str,
host: str,
port: str | int,
version: str | float | Version, # type: ignore[valid-type]
dbname: str,
template_dbname: str | None = None,
as_template: bool = False,
password: str | None = None,
isolation_level: "psycopg.IsolationLevel | None" = None,
connection_timeout: int = 60,
) -> None:
"""Initialize async janitor.

:param user: postgresql username
:param host: postgresql host
:param port: postgresql port
:param dbname: database name
:param template_dbname: template database name to clone from
:param as_template: whether to mark the database as a template
:param version: postgresql version number
:param password: optional postgresql password
:param isolation_level: optional postgresql isolation level
defaults to server's default
:param connection_timeout: how long to retry connection before
raising a TimeoutError
"""
self.user = user
self.password = password
self.host = host
self.port = port
self.dbname = dbname
self.template_dbname = template_dbname
self.as_template = as_template
self._connection_timeout = connection_timeout
self.isolation_level = isolation_level
if not isinstance(version, Version):
self.version = parse(str(version))
else:
self.version = version

async def init(self) -> None:
"""Create database in postgresql."""
async with self.cursor() as cur:
if self.template_dbname:
# And make sure no-one is left connected to the template database.
# Otherwise, Creating database from template will fail
await self._terminate_connection(cur, self.template_dbname)
query = f'CREATE DATABASE "{self.dbname}" TEMPLATE "{self.template_dbname}"'
else:
query = f'CREATE DATABASE "{self.dbname}"'

if self.as_template:
query += " IS_TEMPLATE = true"

await cur.execute(f"{query};")
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

def is_template(self) -> bool:
"""Determine whether the AsyncDatabaseJanitor maintains template or database."""
return self.as_template

async def drop(self) -> None:
"""Drop database in postgresql."""
# We cannot drop the database while there are connections to it, so we
# terminate all connections first while not allowing new connections.
async with self.cursor() as cur:
await self._dont_datallowconn(cur, self.dbname)
await self._terminate_connection(cur, self.dbname)
if self.as_template:
await cur.execute(f'ALTER DATABASE "{self.dbname}" with is_template false;')
await cur.execute(f'DROP DATABASE IF EXISTS "{self.dbname}";')

@staticmethod
async def _dont_datallowconn(cur: AsyncCursor, dbname: str) -> None: # type: ignore[type-arg]
await cur.execute(f'ALTER DATABASE "{dbname}" with allow_connections false;')

@staticmethod
async def _terminate_connection(cur: AsyncCursor, dbname: str) -> None: # type: ignore[type-arg]
await cur.execute(
"SELECT pg_terminate_backend(pg_stat_activity.pid)"
"FROM pg_stat_activity "
"WHERE pg_stat_activity.datname = %s;",
(dbname,),
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

async def load(self, load: Callable | str | Path) -> None:
"""Load data into a database.

Expects:

* a Path to sql file, that'll be loaded
* an import path to import callable
* a callable that expects: host, port, user, dbname and password arguments.

"""
_loader = build_loader_async(load)
result = _loader(
host=self.host,
port=self.port,
user=self.user,
dbname=self.dbname,
password=self.password,
)
if inspect.isawaitable(result):
await result

@asynccontextmanager
async def cursor(self, dbname: str = "postgres") -> AsyncIterator[AsyncCursor]: # type: ignore[type-arg]
"""Return postgresql async cursor."""

async def connect() -> psycopg.AsyncConnection:
return await psycopg.AsyncConnection.connect(
dbname=dbname,
user=self.user,
password=self.password,
host=self.host,
port=self.port,
)

conn = await retry_async(connect, timeout=self._connection_timeout, possible_exception=psycopg.OperationalError)
try:
conn.isolation_level = self.isolation_level
conn.autocommit = True
# We must not run a transaction since we create a database.
async with conn.cursor() as cur:
yield cur
finally:
await conn.close()

async def __aenter__(self: AsyncDatabaseJanitorType) -> AsyncDatabaseJanitorType:
"""Initialize Async Database Janitor."""
await self.init()
return self

async def __aexit__(
self: AsyncDatabaseJanitorType,
exc_type: Type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
"""Exit from Async Database Janitor context cleaning after itself."""
await self.drop()
Loading