Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions kmock/_internal/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,11 +305,21 @@ async def __call__(self, raw_request: aiohttp.web.BaseRequest) -> aiohttp.web.St
# Mind that both the user-provided callbacks and our own code can fail.
except Exception as e:
# Outside of context managers, let the caller (e.g. a web server) deal with the failure.
if not self._errors:
# Exception: KubernetesError should always be rendered properly with correct status codes.
try:
# Import here to avoid circular dependency (k8s imports apps), ideally the error
# classes would be defined in their own module.
from kmock._internal.k8s import KubernetesError
is_k8s_error = isinstance(e, KubernetesError)
except ImportError:
is_k8s_error = False

if not self._errors and not is_k8s_error:
raise

# Inside the context managers, accumulate the errors and re-raise at exiting.
self._errors[-1].append(e)
if self._errors:
self._errors[-1].append(e)

# If the connection already got some traffic, reuse that stream. If not, respond anew.
if raw_response is None or not raw_response.prepared:
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ test = [
"pytest-asyncio",
"pytest-mock",
"pytest-timeout",
"kubernetes",
]
lint = [
{include-group = "test"},
Expand Down
107 changes: 107 additions & 0 deletions tests/dynamicclient/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import pytest
from kubernetes import client
from kubernetes.dynamic import DynamicClient
from kmock import KubernetesEmulator
from typing import Optional
import threading
import asyncio
import time

from aiohttp import web


# === HTTP.CLIENT MONKEY PATCH ===
# # Monkey-patch HTTPResponse to log response body
# # Useful for debugging HTTP traffic in tests
# import http.client as http_client
# _original_read = http_client.HTTPResponse.read
#
# def _logged_read(self, amt=None):
# data = _original_read(self, amt)
# if data:
# print(f"RESPONSE: {data.decode('utf-8').replace("\n", " ")}")
# return data
#
# http_client.HTTPResponse.read = _logged_read

class KMockServer:
def __init__(self, handler, host: str = '127.0.0.1', port: int = 8080):
self.host = host
self.port = port
self.server_thread: Optional[threading.Thread] = None
self.loop: Optional[asyncio.AbstractEventLoop] = None
self.runner: Optional[web.AppRunner] = None
self.handler = handler

def start(self):
"""Start kmock server in background thread"""

def run_server():
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)

app = web.Application()
app.router.add_route('*', '/{path:.*}', self.handler)

self.runner = web.AppRunner(app)
self.loop.run_until_complete(self.runner.setup())
site = web.TCPSite(self.runner, self.host, self.port)
self.loop.run_until_complete(site.start())
self.loop.run_forever()

self.server_thread = threading.Thread(target=run_server, daemon=True)
self.server_thread.start()
time.sleep(1) # Wait for server to be ready

def stop(self):
"""Stop kmock server"""
if self.loop and self.runner:
# Schedule cleanup and stop in the event loop
future = asyncio.run_coroutine_threadsafe(self.runner.cleanup(), self.loop)
# Wait for cleanup to complete
try:
future.result(timeout=5)
except Exception:
pass
# Stop the event loop
self.loop.call_soon_threadsafe(self.loop.stop)
# Wait for thread to finish
if self.server_thread:
self.server_thread.join(timeout=5)


@pytest.fixture(scope="function")
def kmock_handler():
handler = KubernetesEmulator()
handler.resources['apps/v1/deployments'] = {
'kind': 'Deployment',
'namespaced': True,
}
return handler

@pytest.fixture(scope="function")
def kmock_server(kmock_handler):
server = KMockServer(kmock_handler)
server.start()
yield f"http://{server.host}:{server.port}"
server.stop()


@pytest.fixture(scope="function")
def k8s_client(kmock_server):
print(f"\n{'='*80}")
print(f"Creating k8s_client for kmock_server: {kmock_server}")
print(f"{'='*80}\n")

configuration = client.Configuration()
configuration.host = kmock_server
configuration.verify_ssl = False
configuration.debug = False # Enable debug mode

api_client = client.ApiClient(configuration)

# Create DynamicClient with cache disabled to force fresh discovery
print("Creating DynamicClient (cache disabled)...")
dyn_client = DynamicClient(api_client, cache_file=None)

return dyn_client
113 changes: 113 additions & 0 deletions tests/dynamicclient/test_dynamicclient.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import pytest
import logging
from kubernetes.dynamic.exceptions import ResourceNotFoundError, NotFoundError, ConflictError

logger = logging.getLogger(__name__)


def test_resource_not_found_error(k8s_client):
with pytest.raises(ResourceNotFoundError):
result = k8s_client.resources.get(api_version='v1', kind='PodWithATypo200')
raise AssertionError(f"Expected ResourceNotFoundError but got result: {result}")
with pytest.raises(ResourceNotFoundError):
result = k8s_client.resources.get(api_version='randomApiVersion/v100', kind='randomKind3')
raise AssertionError(f"Expected ResourceNotFoundError but got result: {result}")


def test_crud(k8s_client):
v1_deployment = k8s_client.resources.get(api_version='apps/v1', kind='Deployment')

# Step 1: Verify resource doesn't exist
with pytest.raises(NotFoundError):
v1_deployment.get(name='crud-test-deployment', namespace='test-namespace')

# Step 2: Create deployment
deployment = {
"apiVersion": "apps/v1",
"kind": "Deployment",
"metadata": {
"name": "crud-test-deployment",
"namespace": "test-namespace",
"labels": {
"original-label": "original-value"
}
},
"spec": {
"replicas": 1,
"selector": {
"matchLabels": {
"app": "crud-test"
}
},
"template": {
"metadata": {
"labels": {
"app": "crud-test"
}
},
"spec": {
"containers": [{
"name": "nginx",
"image": "nginx:1.14.2"
}]
}
}
}
}

v1_deployment.create(body=deployment, namespace='test-namespace')

# Step 3: Try to create the same deployment again
with pytest.raises(ConflictError):
v1_deployment.create(body=deployment, namespace='test-namespace')

# Step 4: Get and verify creation worked
retrieved = v1_deployment.get(name='crud-test-deployment', namespace='test-namespace')
assert retrieved.metadata.name == 'crud-test-deployment'
assert retrieved.metadata.labels['original-label'] == 'original-value'
assert retrieved.spec.replicas == 1
assert retrieved.spec.template.spec.containers[0].image == 'nginx:1.14.2'

# Step 5: Patch (update) deployment
patch = {
"metadata": {
"labels": {
"new-label": "new-value",
"original-label": "updated-value"
}
},
"spec": {
"replicas": 3,
"template": {
"spec": {
"containers": [{
"name": "nginx",
"image": "nginx:1.21.0"
}]
}
}
}
}

patched = v1_deployment.patch(body=patch, name='crud-test-deployment', namespace='test-namespace')

# Step 6: Get and verify update worked
updated = v1_deployment.get(name='crud-test-deployment', namespace='test-namespace')

# Verify existing fields not in patch are preserved
assert updated.spec.selector.matchLabels['app'] == 'crud-test'

# Verify new field was added
assert updated.metadata.labels['new-label'] == 'new-value'

# Verify existing fields in patch were updated
assert updated.metadata.labels['original-label'] == 'updated-value'
assert updated.spec.replicas == 3
assert updated.spec.template.spec.containers[0].image == 'nginx:1.21.0'

# Step 7: Delete deployment
v1_deployment.delete(name='crud-test-deployment', namespace='test-namespace')

# Step 8: Verify deletion
with pytest.raises(NotFoundError):
v1_deployment.get(name='crud-test-deployment', namespace='test-namespace')