Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 48 additions & 6 deletions detection_rules/rule_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,20 @@ def filter(self, cb: Callable[[DeprecatedRule], bool]) -> "RuleCollection":
return filtered_collection


@dataclass
class RawDeprecatedCollection(BaseCollection[DictRule]):
"""Collection of loaded deprecated rules in raw dict form."""

id_map: dict[str, DictRule] = field(default_factory=dict) # type: ignore[reportUnknownVariableType]
file_map: dict[Path, DictRule] = field(default_factory=dict) # type: ignore[reportUnknownVariableType]
name_map: dict[str, DictRule] = field(default_factory=dict) # type: ignore[reportUnknownVariableType]
rules: list[DictRule] = field(default_factory=list) # type: ignore[reportUnknownVariableType]

def __contains__(self, rule: DictRule) -> bool:
"""Check if a rule is in the map by comparing IDs."""
return rule.id in self.id_map


class RawRuleCollection(BaseCollection[DictRule]):
"""Collection of rules in raw dict form."""

Expand All @@ -213,12 +227,16 @@ def __init__(self, rules: list[DictRule] | None = None, ext_patterns: list[str]
self.file_map: dict[Path, DictRule] = {}
self.name_map: dict[definitions.RuleName, DictRule] = {}
self.rules: list[DictRule] = []
self.deprecated: RawDeprecatedCollection = RawDeprecatedCollection()
self.errors: dict[Path, Exception] = {}
self.frozen = False

self._raw_load_cache: dict[Path, dict[str, Any]] = {}
for rule in rules or []:
self.add_rule(rule)
if self.is_deprecated_rule(rule):
self.add_deprecated_rule(rule)
else:
self.add_rule(rule)

def __contains__(self, rule: DictRule) -> bool:
"""Check if a rule is in the map by comparing IDs."""
Expand Down Expand Up @@ -260,11 +278,21 @@ def _get_paths(self, directory: Path, recursive: bool = True) -> list[Path]:
paths.extend(sorted(directory.rglob(pattern) if recursive else directory.glob(pattern)))
return paths

def _assert_new(self, rule: DictRule) -> None:
@staticmethod
def is_deprecated_rule(rule: DictRule) -> bool:
"""Return whether a raw rule dict represents a deprecated rule."""
return rule.metadata.get("maturity") == "deprecated"

def _assert_new(self, rule: DictRule, is_deprecated: bool = False) -> None:
"""Assert that a rule is new and can be added to the collection."""
id_map = self.id_map
file_map = self.file_map
name_map = self.name_map
if is_deprecated:
id_map = self.deprecated.id_map
file_map = self.deprecated.file_map
name_map = self.deprecated.name_map
else:
id_map = self.id_map
file_map = self.file_map
name_map = self.name_map

if self.frozen:
raise ValueError(f"Unable to add rule {rule.name} {rule.id} to a frozen collection")
Expand All @@ -288,10 +316,20 @@ def add_rule(self, rule: DictRule) -> None:
self.name_map[rule.name] = rule
self.rules.append(rule)

def add_deprecated_rule(self, rule: DictRule) -> None:
"""Add a deprecated rule to the collection."""
self._assert_new(rule, is_deprecated=True)
self.deprecated.id_map[rule.id] = rule
self.deprecated.name_map[rule.name] = rule
self.deprecated.rules.append(rule)

def load_dict(self, obj: dict[str, Any], path: Path | None = None) -> DictRule:
"""Load a rule from a dictionary."""
rule = DictRule(contents=obj, path=path)
self.add_rule(rule)
if self.is_deprecated_rule(rule):
self.add_deprecated_rule(rule)
else:
self.add_rule(rule)
return rule
Comment on lines 281 to 333

def load_file(self, path: Path) -> DictRule:
Expand All @@ -304,6 +342,10 @@ def load_file(self, path: Path) -> DictRule:
rule = self.__default.file_map[path]
self.add_rule(rule)
return rule
if self.__default and self is not self.__default and path in self.__default.deprecated.file_map:
rule = self.__default.deprecated.file_map[path]
self.add_deprecated_rule(rule)
return rule

obj = self._load_rule_file(path)
return self.load_dict(obj, path=path)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "detection_rules"
version = "1.6.53"
version = "1.6.54"
description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine."
readme = "README.md"
requires-python = ">=3.12"
Expand Down
Loading