diff --git a/CHANGELOG.md b/CHANGELOG.md index 0559d71e..a3de0deb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # CHANGELOG +## 119.0.0 + +* Reimplement `InsensitiveSet` using standard python dicts and a more complete implementation of `MutableSet` & `Sequence`. Subtle (but hopefully unimportant) behaviour changes should be expected. +* `InsensitiveDict.keys()` now returns an `InsensitiveSet`. + ## 118.0.2 * Fix small bug in counting length of text messages with non-GSM and extended GSM characters diff --git a/notifications_utils/insensitive_dict.py b/notifications_utils/insensitive_dict.py index 8702f970..a7d1d6e3 100644 --- a/notifications_utils/insensitive_dict.py +++ b/notifications_utils/insensitive_dict.py @@ -1,6 +1,9 @@ +from abc import ABCMeta, abstractmethod +from collections.abc import Iterable, Iterator, MutableSet, Sequence, Set from functools import lru_cache - -from ordered_set import OrderedSet +from itertools import chain, islice +from types import NotImplementedType +from typing import Self, TypeVar, overload class InsensitiveDict(dict): @@ -32,7 +35,7 @@ def from_keys(cls, keys): return cls({key: key for key in keys}, overwrite_duplicates=False) def keys(self): - return OrderedSet(super().keys()) + return InsensitiveSet(self) def __getitem__(self, key): return super().__getitem__(self.make_key(key)) @@ -54,55 +57,346 @@ def as_dict_with_keys(self, keys): @staticmethod @lru_cache(maxsize=1_024, typed=False) # Corresponds to 1,000 column limit when reading Excel files - def make_key(original_key): + def make_key(original_key: str) -> str: if original_key is None: return None return original_key.translate(InsensitiveDict.KEY_TRANSLATION_TABLE).lower() -class InsensitiveSet(OrderedSet): - """ - `InsensitiveSet` behaves like a normal set, except: - - it is ordered - - it normalises case, whitespace, hypens and underscores in items +T = TypeVar("T") - In other words: - InsensitiveSet(['FIRST_NAME']) == InsensitiveSet(['first name']) - 
>>> True - Note that the provided case and spacing is preserved for presentation, so: - InsensitiveSet(['FIRST-name'])[0] - >>> 'FIRST-name' - """ +class AbstractInsensitiveSet(MutableSet[T], Sequence[T], metaclass=ABCMeta): + __slots__ = ("_inner",) + _inner: dict[T, T] - def __init__(self, iterable=None, /): - return super().__init__(InsensitiveDict.from_keys(iterable or ()).values()) + @staticmethod + @abstractmethod + def make_key(original_key: T) -> T: + return original_key - def __contains__(self, key): - return key in InsensitiveDict.from_keys(self) + def __init__(self, it: Iterable[T] | None = None, /): + self._inner = {} + self._add_inner_pairs((self.make_key(item), item) for item in (it or ())) + + @classmethod + def _from_inner_pairs(cls, inner_pairs: Iterable[tuple[T, T]]) -> Self: + new_set = cls() + new_set._add_inner_pairs(inner_pairs) + return new_set + + def _add_inner_pairs(self, inner_pairs: Iterable[tuple[T, T]]): + """ + Like a dict.update(...) for self._inner, but prioritising earlier values + """ + for k, v in inner_pairs: + if k not in self._inner: + self._inner[k] = v + + # Set[T] + + def __contains__(self, item) -> bool: + return self.make_key(item) in self._inner + + def __iter__(self) -> Iterator[T]: + return iter(self._inner.values()) + + def __len__(self) -> int: + return len(self._inner) + + # MutableSet[T] + + def add(self, item: T): + key = self.make_key(item) + if key not in self._inner: + self._inner[key] = item + + def discard(self, item: T): + self._inner.pop( + self.make_key(item), None + ) # faster than possibly raising then ignoring exception if not present + + # Sequence[T] + + @overload + def __getitem__(self, index: int) -> T: ... + + @overload + def __getitem__(self, index: slice) -> Self: ... 
+ + def __getitem__(self, index): # noqa: C901 is bunk + length = len(self._inner) + + if isinstance(index, slice): + start, stop, step = index.start, index.stop, index.step + + if step is None: + step = 1 + if start is None: + start = 0 if step > 0 else length + if stop is None: + stop = length if step > 0 else -(length + 1) + + if start < 0: + start += length + if stop < 0: + stop += length + + if step < 0: + it = reversed(self._inner.items()) + start = max(length - (start + 1), 0) + stop = max(length - (stop + 1), 0) + else: + it = iter(self._inner.items()) + start = max(start, 0) + stop = max(stop, 0) + + return type(self)._from_inner_pairs(islice(it, start, stop, abs(step))) + + elif isinstance(index, int): + if index < 0: + index = index + length + if index < 0: + raise IndexError + elif index >= length: + raise IndexError + + if index > length // 2: + # faster to iterate to it backwards + return next(islice(reversed(self._inner.values()), length - (index + 1), length - index)) + + return next(islice(self._inner.values(), index, index + 1)) + + else: + raise TypeError + + # make __eq__ work with Iterables to match OrderedSet behaviour + + def __eq__(self, other) -> bool: + if not isinstance(other, Iterable): + return False + + if not isinstance(other, Set): + return tuple(self._inner.keys()) == tuple(self.make_key(item) for item in other) + + return super().__eq__(other) + + # Accelerate Sequence[T] + + def __reversed__(self) -> Iterator[T]: + return reversed(self._inner.values()) + + def index(self, item: T, start: int | None = 0, stop: int | None = None) -> int: + key = self.make_key(item) + for i, candidate in enumerate(islice(self._inner.keys(), start, stop), start or 0): + if candidate == key: + return i + + raise KeyError + + # Accelerate Set[T] + + def __le__(self, other: Set) -> bool | NotImplementedType: + if not isinstance(other, Set): + return NotImplemented + + other_set = other if type(self) is type(other) else type(self)(other) # type comparison 
deliberately strict + + return self._inner.keys() <= other_set._inner.keys() + + def __ge__(self, other: Set) -> bool | NotImplementedType: + if not isinstance(other, Set): + return NotImplemented + + other_set = other if type(self) is type(other) else type(self)(other) # type comparison deliberately strict + + return self._inner.keys() >= other_set._inner.keys() + + def __and__(self, other: Iterable) -> Self | NotImplementedType: + if not isinstance(other, Iterable): + return NotImplemented + + other_set = other if type(self) is type(other) else type(self)(other) # type comparison deliberately strict + + return type(self)._from_inner_pairs((k, v) for k, v in self._inner.items() if k in other_set._inner) + + def __rand__(self, other: Iterable) -> Self | NotImplementedType: + if not isinstance(other, Iterable): + return NotImplemented - def __eq__(self, other): - return not self ^ other + if type(self) is type(other): # type comparison deliberately strict + return type(self)._from_inner_pairs((k, v) for k, v in other._inner.items() if k in self._inner) - def __le__(self, other): - return self.issubset(other) + # ensure the un-normalised values come from `other` (the LHS operand of a reflected op) + return type(self)(item for item in other if item in self) - def __lt__(self, other): - return self <= other and not self == other + def __or__(self, other: Iterable) -> Self | NotImplementedType: + if not isinstance(other, Iterable): + return NotImplemented - def __sub__(self, other): - return self.difference(other) + new_set = type(self)._from_inner_pairs(self._inner.items()) - def index(self, key): - return InsensitiveDict.from_keys(self).keys().index(InsensitiveDict.make_key(key)) + if type(self) is type(other): # type comparison deliberately strict + new_set._add_inner_pairs(other._inner.items()) + else: + for value in other: + new_set.add(value) - def issubset(self, other): - return all(key in self.__class__(other) for key in self) + return new_set - def intersection(self, other): - other_as_same_type_as_self
= self.__class__(other) - both = self | other_as_same_type_as_self - return self.__class__(item for item in both if item in self and item in other_as_same_type_as_self) + def __ror__(self, other: Iterable) -> Self | NotImplementedType: + if not isinstance(other, Iterable): + return NotImplemented - def difference(self, other): - return self.__class__(item for item in self if item not in self.__class__(other)) + if type(self) is type(other): # type comparison deliberately strict + new_set = type(self)._from_inner_pairs(other._inner.items()) + else: + new_set = type(self)(other) + + new_set._add_inner_pairs(self._inner.items()) + + return new_set + + def isdisjoint(self, other: Iterable) -> bool: + if not isinstance(other, Iterable): # not an operator method, so raise like set.isdisjoint does + raise TypeError(f"{type(other).__name__!r} object is not iterable") + + if type(self) is type(other): # type comparison deliberately strict + return self._inner.keys().isdisjoint(other._inner.keys()) + + return not any(item in self for item in other) + + def __sub__(self, other: Iterable) -> Self | NotImplementedType: + if not isinstance(other, Iterable): + return NotImplemented + + other_set = other if type(self) is type(other) else type(self)(other) # type comparison deliberately strict + + return type(self)._from_inner_pairs((k, v) for k, v in self._inner.items() if k not in other_set._inner) + + def __rsub__(self, other: Iterable) -> Self | NotImplementedType: + if not isinstance(other, Iterable): + return NotImplemented + + if type(self) is type(other): # type comparison deliberately strict + return type(self)._from_inner_pairs((k, v) for k, v in other._inner.items() if k not in self._inner) + + return type(self)(item for item in other if item not in self) + + def __xor__(self, other: Iterable) -> Self | NotImplementedType: + if not isinstance(other, Iterable): + return NotImplemented + + other_set = other if type(self) is type(other) else type(self)(other) # type comparison deliberately strict + + return type(self)._from_inner_pairs( + chain( + ((k, v)
for k, v in self._inner.items() if k not in other_set._inner), + ((k, v) for k, v in other_set._inner.items() if k not in self._inner), + ) + ) + + def __rxor__(self, other: Iterable) -> Self | NotImplementedType: + if not isinstance(other, Iterable): + return NotImplemented + + other_set = other if type(self) is type(other) else type(self)(other) # type comparison deliberately strict + + return type(self)._from_inner_pairs( + chain( + ((k, v) for k, v in other_set._inner.items() if k not in self._inner), + ((k, v) for k, v in self._inner.items() if k not in other_set._inner), + ) + ) + + # Accelerate MutableSet[T] + + def remove(self, item: T): + del self._inner[self.make_key(item)] + + def pop(self): + try: + return self._inner.pop(next(reversed(self._inner))) + except StopIteration as e: + raise KeyError from e + + def clear(self): + self._inner.clear() + + def __iand__(self, other: Iterable) -> Self | NotImplementedType: + if not isinstance(other, Iterable): + return NotImplemented + + other_set = other if type(self) is type(other) else type(self)(other) # type comparison deliberately strict + + for k in tuple(self._inner): # must take copy of keys so we can modify underlying dict during iteration + if k not in other_set._inner: + del self._inner[k] + + return self + + def __ixor__(self, other: Iterable) -> Self | NotImplementedType: + if not isinstance(other, Iterable): + return NotImplemented + + other_set = other if type(self) is type(other) else type(self)(other) # type comparison deliberately strict + + intersection = self & other_set + remainder = other_set - intersection # fresh set, so the caller's `other` is never mutated + self -= intersection + self |= remainder + + return self + + def __isub__(self, other: Iterable) -> Self | NotImplementedType: + if not isinstance(other, Iterable): + return NotImplemented + + if type(self) is type(other): # type comparison deliberately strict + for k in tuple(other._inner): # copy the keys because `other` may be `self` + self._inner.pop(k, None) + return self # fast path is complete, skip the generic per-item fallback + return super().__isub__(other) # type: ignore[arg-type] + + def __ior__(self,
other: Iterable) -> Self | NotImplementedType: + if not isinstance(other, Iterable): + return NotImplemented + + if type(self) is type(other): # type comparison deliberately strict + self._add_inner_pairs(other._inner.items()) + + return super().__ior__(other) # type: ignore[arg-type] + + # only included because old InsensitiveSet implemented them (note builtins.set accepts *others) + + def issubset(self, other: Set) -> bool: + return self <= other + + def issuperset(self, other: Set) -> bool: + return self >= other + + def intersection(self, other: Iterable) -> Self: + return self & other + + def difference(self, other: Iterable) -> Self: + return self - other + + def union(self, other: Iterable) -> Self: + return self | other + + def symmetric_difference(self, other: Iterable) -> Self: + return self ^ other + + # generally helpful + + def __str__(self) -> str: + return f"{self.__class__.__name__}({list(self._inner.values())})" + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({list(self._inner.values())!r})" + + +class InsensitiveSet(AbstractInsensitiveSet[str]): + @staticmethod + def make_key(original_key: str) -> str: + return InsensitiveDict.make_key(original_key) diff --git a/notifications_utils/version.py b/notifications_utils/version.py index 3254a490..b43abee5 100644 --- a/notifications_utils/version.py +++ b/notifications_utils/version.py @@ -5,4 +5,4 @@ # - `make version-minor` for new features # - `make version-patch` for bug fixes -__version__ = "118.0.2" # 75ca400e566fa067cf59d21e92107e23 +__version__ = "119.0.0" # ec7671431faa48dda6ca81ab384771ec diff --git a/tests/test_insensitive_dict.py b/tests/test_insensitive_dict.py index e9658b94..7b6ff05b 100644 --- a/tests/test_insensitive_dict.py +++ b/tests/test_insensitive_dict.py @@ -204,8 +204,281 @@ def test_insensitive_set_difference(): assert foobar - barbaz == {"foo"} -def test_insensitive_set_symetric_difference(): +def test_insensitive_set_symmetric_difference(): foobar = 
InsensitiveSet(("foo", "bar", "FOO", "BAR")) barbaz = {"Bar", "B A Z"} assert foobar.symmetric_difference(barbaz) == {"foo", "B A Z"} assert foobar ^ barbaz == {"foo", "B A Z"} + + +def test_insensitive_set_pop(): + foobar = InsensitiveSet(("foo", "bar", "FOO", " BAR ", "baz")) + assert foobar.pop() == "baz" + assert tuple(foobar) == ("foo", "bar") + assert foobar.pop() == "bar" + assert tuple(foobar) == ("foo",) + assert foobar.pop() == "foo" + assert not foobar + + with pytest.raises(KeyError): + foobar.pop() + + +def test_insensitive_set_or_iterable(): + assert tuple(InsensitiveSet(f" {i}" for i in range(8)) | (f"{i} " for i in range(18, 4, -1))) == ( + " 0", + " 1", + " 2", + " 3", + " 4", + " 5", + " 6", + " 7", + "18 ", + "17 ", + "16 ", + "15 ", + "14 ", + "13 ", + "12 ", + "11 ", + "10 ", + "9 ", + "8 ", + ) + + +def test_insensitive_set_ror_iterable(): + assert tuple((f"{i} " for i in range(18, 4, -1)) | InsensitiveSet(f" {i}" for i in range(8))) == ( + "18 ", + "17 ", + "16 ", + "15 ", + "14 ", + "13 ", + "12 ", + "11 ", + "10 ", + "9 ", + "8 ", + "7 ", + "6 ", + "5 ", + " 0", + " 1", + " 2", + " 3", + " 4", + ) + + +def test_insensitive_set_and_iterable(): + assert tuple(InsensitiveSet(f" {i}" for i in range(8)) & (f"{i} " for i in range(18, 4, -1))) == ( + " 5", + " 6", + " 7", + ) + + +def test_insensitive_set_rand_iterable(): + assert tuple((f"{i} " for i in range(18, 4, -1)) & InsensitiveSet(f" {i}" for i in range(8))) == ( + "7 ", + "6 ", + "5 ", + ) + + +def test_insensitive_set_xor_iterable(): + assert tuple(InsensitiveSet(f" {i}" for i in range(8)) ^ (f"{i} " for i in range(18, 4, -1))) == ( + " 0", + " 1", + " 2", + " 3", + " 4", + "18 ", + "17 ", + "16 ", + "15 ", + "14 ", + "13 ", + "12 ", + "11 ", + "10 ", + "9 ", + "8 ", + ) + + +def test_insensitive_set_rxor_iterable(): + assert tuple((f"{i} " for i in range(18, 4, -1)) ^ InsensitiveSet(f" {i}" for i in range(8))) == ( + "18 ", + "17 ", + "16 ", + "15 ", + "14 ", + "13 ", + "12 ", + "11 ", 
+ "10 ", + "9 ", + "8 ", + " 0", + " 1", + " 2", + " 3", + " 4", + ) + + +def test_insensitive_set_sub_iterable(): + assert tuple(InsensitiveSet(f" {i}" for i in range(8)) - (f"{i} " for i in range(18, 4, -1))) == ( + " 0", + " 1", + " 2", + " 3", + " 4", + ) + + +def test_insensitive_set_rsub_iterable(): + assert tuple((f"{i} " for i in range(18, 4, -1)) - InsensitiveSet(f" {i}" for i in range(8))) == ( + "18 ", + "17 ", + "16 ", + "15 ", + "14 ", + "13 ", + "12 ", + "11 ", + "10 ", + "9 ", + "8 ", + ) + + +def test_insensitive_set_iand_iterable(): + s = InsensitiveSet(f" {i}" for i in range(8)) + s &= (f"{i} " for i in range(18, 4, -1)) + + assert tuple(s) == ( + " 5", + " 6", + " 7", + ) + + +def test_insensitive_set_ior_iterable(): + s = InsensitiveSet(f" {i}" for i in range(8)) + s |= (f"{i} " for i in range(18, 4, -1)) + + assert tuple(s) == ( + " 0", + " 1", + " 2", + " 3", + " 4", + " 5", + " 6", + " 7", + "18 ", + "17 ", + "16 ", + "15 ", + "14 ", + "13 ", + "12 ", + "11 ", + "10 ", + "9 ", + "8 ", + ) + + +def test_insensitive_set_ixor_iterable(): + s = InsensitiveSet(f" {i}" for i in range(8)) + s ^= (f"{i} " for i in range(18, 4, -1)) + + assert tuple(s) == ( + " 0", + " 1", + " 2", + " 3", + " 4", + "18 ", + "17 ", + "16 ", + "15 ", + "14 ", + "13 ", + "12 ", + "11 ", + "10 ", + "9 ", + "8 ", + ) + + +def test_insensitive_set_isub_iterable(): + s = InsensitiveSet(f" {i}" for i in range(8)) + s -= (f"{i} " for i in range(18, 4, -1)) + + assert tuple(s) == ( + " 0", + " 1", + " 2", + " 3", + " 4", + ) + + +def test_insensitive_set_invalid_inequality(): + with pytest.raises(TypeError): + InsensitiveSet() <= 1 # noqa: B015 + + with pytest.raises(TypeError): + InsensitiveSet() >= 1 # noqa: B015 + + +def test_insensitive_set_eq_set(): + assert {f" {i}" for i in range(8)} == InsensitiveSet(f"{i} " for i in range(7, -1, -1)) + assert InsensitiveSet(f"{i} " for i in range(7, -1, -1)) == {f" {i}" for i in range(8)} + + assert {f" {i}" for i in range(8)} != 
InsensitiveSet(f"{i} " for i in range(8, -1, -1)) + assert InsensitiveSet(f"{i} " for i in range(8, -1, -1)) != {f" {i}" for i in range(8)} + + +def test_insensitive_set_eq_insensitive_set_not_order_sensitive(): + assert InsensitiveSet(f" {i}" for i in range(8)) == InsensitiveSet(f"{i} " for i in range(7, -1, -1)) + assert InsensitiveSet(f" {i}" for i in range(8)) == InsensitiveSet(f"{i} " for i in range(8)) + assert InsensitiveSet(f" {i}" for i in range(8)) != InsensitiveSet(f"{i} " for i in range(7)) + + +def test_insensitive_set_eq_iterable_order_sensitive(): + assert InsensitiveSet(f" {i}" for i in range(8)) != (f"{i} " for i in range(7, -1, -1)) + assert InsensitiveSet(f" {i}" for i in range(8)) == (f"{i} " for i in range(8)) + + +def test_insensitive_set_getitem_positive_int(): + insensitive_set = InsensitiveSet(f" {i}" for i in range(8)) + for i in range(8): + assert insensitive_set[i] == f" {i}" + + +def test_insensitive_set_getitem_negative_int(): + insensitive_set = InsensitiveSet(f" {i}" for i in range(8)) + for i, j in zip(range(8), range(-8, 0), strict=True): + assert insensitive_set[j] == f" {i}" + + +@pytest.mark.parametrize("start", tuple(range(-5, 4)) + (None,)) +@pytest.mark.parametrize("stop", tuple(range(-5, 4)) + (None,)) +@pytest.mark.parametrize("step", (-1, 1, None)) +def test_insensitive_set_getitem_slices(start, stop, step): + tup = tuple(f" {i}" for i in range(4)) + iset = InsensitiveSet(tup) + + iset_ret = iset[start:stop:step] + tup_ret = tup[start:stop:step] + + assert isinstance(iset_ret, InsensitiveSet) + assert tuple(iset_ret) == tuple(tup_ret)