diff --git a/kmock/_internal/apps.py b/kmock/_internal/apps.py index 6c9f394..95e6f4a 100644 --- a/kmock/_internal/apps.py +++ b/kmock/_internal/apps.py @@ -305,11 +305,21 @@ async def __call__(self, raw_request: aiohttp.web.BaseRequest) -> aiohttp.web.St # Mind that both the user-provided callbacks and our own code can fail. except Exception as e: # Outside of context managers, let the caller (e.g. a web server) deal with the failure. - if not self._errors: + # Exception: KubernetesError should always be rendered properly with correct status codes. + try: + # Import here to avoid circular dependency (k8s imports apps), ideally the error + # classes would be defined in their own module. + from kmock._internal.k8s import KubernetesError + is_k8s_error = isinstance(e, KubernetesError) + except ImportError: + is_k8s_error = False + + if not self._errors and not is_k8s_error: raise # Inside the context managers, accumulate the errors and re-raise at exiting. - self._errors[-1].append(e) + if self._errors: + self._errors[-1].append(e) # If the connection already got some traffic, reuse that stream. If not, respond anew. if raw_response is None or not raw_response.prepared: diff --git a/pyproject.toml b/pyproject.toml index c4b7d32..d0de5d2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -72,6 +72,7 @@ test = [ "pytest-asyncio", "pytest-mock", "pytest-timeout", + "kubernetes", ] lint = [ {include-group = "test"}, diff --git a/tests/dynamicclient/conftest.py b/tests/dynamicclient/conftest.py new file mode 100644 index 0000000..aa7b780 --- /dev/null +++ b/tests/dynamicclient/conftest.py @@ -0,0 +1,107 @@ +import pytest +from kubernetes import client +from kubernetes.dynamic import DynamicClient +from kmock import KubernetesEmulator +from typing import Optional +import threading +import asyncio +import time + +from aiohttp import web + + +# === HTTP.CLIENT MONKEY PATCH === +# # Monkey-patch HTTPResponse to log response body +# # Useful for debugging HTTP traffic in tests +# import http.client as http_client +# _original_read = http_client.HTTPResponse.read +# +# def _logged_read(self, amt=None): +# data = _original_read(self, amt) +# if data: +# print(f"RESPONSE: {data.decode('utf-8').replace("\n", " ")}") +# return data +# +# http_client.HTTPResponse.read = _logged_read + +class KMockServer: + def __init__(self, handler, host: str = '127.0.0.1', port: int = 8080): + self.host = host + self.port = port + self.server_thread: Optional[threading.Thread] = None + self.loop: Optional[asyncio.AbstractEventLoop] = None + self.runner: Optional[web.AppRunner] = None + self.handler = handler + + def start(self): + """Start kmock server in background thread""" + + def run_server(): + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + app = web.Application() + app.router.add_route('*', '/{path:.*}', self.handler) + + self.runner = web.AppRunner(app) + self.loop.run_until_complete(self.runner.setup()) + site = web.TCPSite(self.runner, self.host, self.port) + self.loop.run_until_complete(site.start()) + self.loop.run_forever() + + self.server_thread = threading.Thread(target=run_server, daemon=True) + self.server_thread.start() + time.sleep(1) # Wait for server to be ready + + def stop(self): + """Stop kmock server""" + if self.loop and self.runner: + # Schedule cleanup and stop in the event loop + future = asyncio.run_coroutine_threadsafe(self.runner.cleanup(), self.loop) + # Wait for cleanup to complete + try: + future.result(timeout=5) + except Exception: + pass + # Stop the event loop + self.loop.call_soon_threadsafe(self.loop.stop) + # Wait for thread to finish + if self.server_thread: + self.server_thread.join(timeout=5) + + +@pytest.fixture(scope="function") +def kmock_handler(): + handler = KubernetesEmulator() + handler.resources['apps/v1/deployments'] = { + 'kind': 'Deployment', + 'namespaced': True, + } + return handler + +@pytest.fixture(scope="function") +def kmock_server(kmock_handler): + server = KMockServer(kmock_handler) + server.start() + yield f"http://{server.host}:{server.port}" + server.stop() + + +@pytest.fixture(scope="function") +def k8s_client(kmock_server): + print(f"\n{'='*80}") + print(f"Creating k8s_client for kmock_server: {kmock_server}") + print(f"{'='*80}\n") + + configuration = client.Configuration() + configuration.host = kmock_server + configuration.verify_ssl = False + configuration.debug = False # Enable debug mode + + api_client = client.ApiClient(configuration) + + # Create DynamicClient with cache disabled to force fresh discovery + print("Creating DynamicClient (cache disabled)...") + dyn_client = DynamicClient(api_client, cache_file=None) + + return dyn_client diff --git a/tests/dynamicclient/test_dynamicclient.py b/tests/dynamicclient/test_dynamicclient.py new file mode 100644 index 0000000..8d2a023 --- /dev/null +++ b/tests/dynamicclient/test_dynamicclient.py @@ -0,0 +1,113 @@ +import pytest +import logging +from kubernetes.dynamic.exceptions import ResourceNotFoundError, NotFoundError, ConflictError + +logger = logging.getLogger(__name__) + + +def test_resource_not_found_error(k8s_client): + with pytest.raises(ResourceNotFoundError): + result = k8s_client.resources.get(api_version='v1', kind='PodWithATypo200') + raise AssertionError(f"Expected ResourceNotFoundError but got result: {result}") + with pytest.raises(ResourceNotFoundError): + result = k8s_client.resources.get(api_version='randomApiVersion/v100', kind='randomKind3') + raise AssertionError(f"Expected ResourceNotFoundError but got result: {result}") + + +def test_crud(k8s_client): + v1_deployment = k8s_client.resources.get(api_version='apps/v1', kind='Deployment') + + # Step 1: Verify resource doesn't exist + with pytest.raises(NotFoundError): + v1_deployment.get(name='crud-test-deployment', namespace='test-namespace') + + # Step 2: Create deployment + deployment = { + "apiVersion": "apps/v1", + "kind": "Deployment", + "metadata": { + "name": "crud-test-deployment", + "namespace": "test-namespace", + "labels": { + "original-label": "original-value" + } + }, + "spec": { + "replicas": 1, + "selector": { + "matchLabels": { + "app": "crud-test" + } + }, + "template": { + "metadata": { + "labels": { + "app": "crud-test" + } + }, + "spec": { + "containers": [{ + "name": "nginx", + "image": "nginx:1.14.2" + }] + } + } + } + } + + v1_deployment.create(body=deployment, namespace='test-namespace') + + # Step 3: Try to create the same deployment again + with pytest.raises(ConflictError): + v1_deployment.create(body=deployment, namespace='test-namespace') + + # Step 4: Get and verify creation worked + retrieved = v1_deployment.get(name='crud-test-deployment', namespace='test-namespace') + assert retrieved.metadata.name == 'crud-test-deployment' + assert retrieved.metadata.labels['original-label'] == 'original-value' + assert retrieved.spec.replicas == 1 + assert retrieved.spec.template.spec.containers[0].image == 'nginx:1.14.2' + + # Step 5: Patch (update) deployment + patch = { + "metadata": { + "labels": { + "new-label": "new-value", + "original-label": "updated-value" + } + }, + "spec": { + "replicas": 3, + "template": { + "spec": { + "containers": [{ + "name": "nginx", + "image": "nginx:1.21.0" + }] + } + } + } + } + + patched = v1_deployment.patch(body=patch, name='crud-test-deployment', namespace='test-namespace') + + # Step 6: Get and verify update worked + updated = v1_deployment.get(name='crud-test-deployment', namespace='test-namespace') + + # Verify existing fields not in patch are preserved + assert updated.spec.selector.matchLabels['app'] == 'crud-test' + + # Verify new field was added + assert updated.metadata.labels['new-label'] == 'new-value' + + # Verify existing fields in patch were updated + assert updated.metadata.labels['original-label'] == 'updated-value' + assert updated.spec.replicas == 3 + assert updated.spec.template.spec.containers[0].image == 'nginx:1.21.0' + + # Step 7: Delete deployment + v1_deployment.delete(name='crud-test-deployment', namespace='test-namespace') + + # Step 8: Verify deletion + with pytest.raises(NotFoundError): + v1_deployment.get(name='crud-test-deployment', namespace='test-namespace')