Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/country_workspace/admin/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def clean_record(self) -> dict:
@admin.register(Transformer)
class TransformerAdmin(BaseModelAdmin):
readonly_fields = ("created_at", "last_modified", "created_by")
list_display = ("name", "office", "created_by", "created_at", "last_modified")
list_display = ("name", "engine", "office", "created_by", "created_at", "last_modified")
list_filter = (
("office", AutoCompleteFilter),
("created_by", AutoCompleteFilter),
Expand All @@ -61,6 +61,7 @@ def get_fields(self, request: HttpRequest, obj: Transformer | None = None) -> tu
"name",
"description",
"office",
"engine",
)
tail_fields = (
"created_by",
Expand Down Expand Up @@ -113,6 +114,7 @@ def edit_and_verify(self, request: HttpRequest, pk: str) -> HttpResponse:
name=obj.name,
description=obj.description,
office=obj.office,
engine=obj.engine,
value_transformations=code,
)
try:
Expand Down
20 changes: 20 additions & 0 deletions src/country_workspace/migrations/0043_transformer_engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("country_workspace", "0042_alter_transformer_value_transformations"),
]

operations = [
migrations.AddField(
model_name="transformer",
name="engine",
field=models.CharField(
choices=[("JAVASCRIPT", "JavaScript"), ("STEFICON", "Steficon Python")],
default="JAVASCRIPT",
help_text="Formula engine used to transform records.",
max_length=20,
),
),
]
43 changes: 38 additions & 5 deletions src/country_workspace/models/transformer.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
import logging
from typing import Any
from django.conf import settings
from django.core.exceptions import ValidationError
from django.db import models
from django.utils.translation import gettext_lazy as _

from country_workspace.models.base import BaseModel
from country_workspace.utils.js_executor import JavaScriptExecutor
from country_workspace.validators.mapping import ValueTransformationRulesValidator
from country_workspace.utils.steficon_executor import SteficonExecutor
from country_workspace.validators.mapping import SteficonTransformationRulesValidator, ValueTransformationRulesValidator

logger = logging.getLogger(__name__)


class Transformer(BaseModel):
class Engine(models.TextChoices):
JAVASCRIPT = "JAVASCRIPT", _("JavaScript")
STEFICON = "STEFICON", _("Steficon Python")

name = models.CharField(max_length=255)
description = models.CharField(max_length=255, blank=True)
office = models.ForeignKey(
Expand All @@ -20,13 +26,19 @@ class Transformer(BaseModel):
related_name="transformers",
help_text=_("Business Area (Office) this transformer belongs to"),
)
engine = models.CharField(
max_length=20,
choices=Engine.choices,
default=Engine.JAVASCRIPT,
help_text=_("Formula engine used to transform records."),
)
value_transformations = models.TextField(
blank=True,
default="",
validators=[ValueTransformationRulesValidator()],
help_text=_(
"Value transformation rules (JavaScript). "
"Example: `function transform(record) { record['sex'] = 'Male'; return record; }`"
"Value transformation formula."
" JavaScript example: `function transform(record) { record['sex'] = 'Male'; return record; }`."
" Steficon example: `result.value = context['record']; result.value['sex'] = 'MALE'`."
),
)
created_at = models.DateTimeField(auto_now_add=True)
Expand All @@ -42,13 +54,34 @@ class Meta:
def __str__(self) -> str:
return self.name

def clean(self) -> None:
super().clean()
code = self.value_transformations or ""
if not code.strip():
return

if self.engine == self.Engine.STEFICON:
try:
SteficonTransformationRulesValidator()(code)
except ValidationError as exc:
raise ValidationError({"value_transformations": exc.messages}) from exc
return

try:
ValueTransformationRulesValidator()(code)
except ValidationError as exc:
raise ValidationError({"value_transformations": exc.messages}) from exc

def apply(self, data: dict[str, Any]) -> dict[str, Any]:
"""Apply value transformations to the data dictionary."""
if not self.value_transformations:
return data

try:
executor = JavaScriptExecutor(data, self.value_transformations)
if self.engine == self.Engine.STEFICON:
executor = SteficonExecutor(data, self.value_transformations)
else:
executor = JavaScriptExecutor(data, self.value_transformations)
return executor.execute()
except Exception:
logger.exception("Error applying value transformations")
Expand Down
93 changes: 93 additions & 0 deletions src/country_workspace/utils/steficon_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from __future__ import annotations

from builtins import __build_class__ # noqa: A004
import datetime
from dataclasses import dataclass, field
from decimal import Decimal
import traceback
from typing import Any


class SteficonValidationError(Exception):
pass


@dataclass
class SteficonResult:
value: Any = 0
extra: dict[str, Any] = field(default_factory=dict)


class SteficonExecutor:
"""Execute Steficon-style Python formula code.

Formula receives:
- context: dict with "record"
- result: object exposing "value" and "extra"
"""

FORBIDDEN_TOKENS = (
"__import__",
"import ",
"\nimport ",
" from ",
" delete",
" save",
" eval",
" exec",
)

def __init__(self, data: dict[str, Any], code: str) -> None:
self.data = data
self.code = code

def execute(self) -> dict[str, Any]:
result = SteficonResult(value=dict(self.data))
context = {"record": dict(self.data)}
gl = self._globals()
locals_: dict[str, Any] = {
"context": context,
"result": result,
}
exec(self.code, gl, locals_) # noqa: S102

if isinstance(result.value, dict):
return result.value
if isinstance(context.get("record"), dict):
return context["record"]
raise SteficonValidationError("Steficon formula must produce a dict in result.value or context['record']")

@classmethod
def validate(cls, code: str) -> None:
if any(token in code for token in cls.FORBIDDEN_TOKENS):
raise SteficonValidationError("Steficon formula contains forbidden statements")
try:
compile(code, "<steficon_formula>", mode="exec")
except SyntaxError as exc:
tb = traceback.format_exc(limit=-1)
message = tb.split('<steficon_formula>", ')[-1]
raise SteficonValidationError(message) from exc

@staticmethod
def _globals() -> dict[str, Any]:
return {
"__builtins__": {
"__build_class__": __build_class__,
"__name__": __name__,
"date": datetime.date,
"datetime": datetime.datetime,
"timedelta": datetime.timedelta,
"Decimal": Decimal,
"complex": complex,
"dict": dict,
"float": float,
"frozenset": frozenset,
"int": int,
"list": list,
"memoryview": memoryview,
"range": range,
"set": set,
"str": str,
"tuple": tuple,
}
}
17 changes: 17 additions & 0 deletions src/country_workspace/validators/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from django.utils.translation import gettext_lazy as _

from country_workspace.utils.js_executor import JavaScriptExecutor
from country_workspace.utils.steficon_executor import SteficonExecutor, SteficonValidationError


@deconstructible
Expand Down Expand Up @@ -48,3 +49,19 @@ def __call__(self, value: str) -> None:
_("Invalid JavaScript code. Must contain a function definition."),
code="invalid_js",
)


@deconstructible
class SteficonTransformationRulesValidator:
"""Validate Steficon-style Python formula code."""

def __call__(self, value: str) -> None:
if not value.strip():
return
try:
SteficonExecutor.validate(value)
except SteficonValidationError as exc:
raise ValidationError(
_("Invalid Steficon formula. %(error)s") % {"error": str(exc)},
code="invalid_steficon",
) from exc
Loading
Loading