diff --git a/CHANGELOG.md b/CHANGELOG.md index 271b50c..6ddd7a5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,10 +4,13 @@ + `.sqlalchemy` has been made a subpackage and split; `sqlalchemy_async` has been deprecated. Update your imports. + Add several new utilities to `.sqlalchemy`: `BitSelector`, `secret_column`, `a_relationship`, `SessionWrapper`, - `wrap=` argument to SQLAlchemy. Also removed dead batteries. -+ Add `.waiter` module. For now, non-functional. + `wrap=` argument to SQLAlchemy. Also removed dead batteries ++ Add `.waiter` module. For now, non-functional ~ + Add `ArgConfigSource` to `.configparse` ++ Add Z85 (`z85encode()` `z85decode()`) encoding support + Add more strings to `.legal` module ++ `.signing` module is now covered by tests ++ New decorator `dei_args()`. Now offensive naming is no more a worry! ## 0.5.3 diff --git a/src/suou/__init__.py b/src/suou/__init__.py index 96db617..e47d233 100644 --- a/src/suou/__init__.py +++ b/src/suou/__init__.py @@ -23,10 +23,12 @@ from .bits import count_ones, mask_shift, split_bits, join_bits, mod_ceil, mod_f from .calendar import want_datetime, want_isodate, want_timestamp, age_and_days from .configparse import MissingConfigError, MissingConfigWarning, ConfigOptions, ConfigParserConfigSource, ConfigSource, DictConfigSource, ConfigValue, EnvConfigSource from .collections import TimedDict +from .dei import dei_args from .functools import deprecated, not_implemented, timed_cache, none_pass, alru_cache from .classtools import Wanted, Incomplete from .itertools import makelist, kwargs_prefix, ltuple, rtuple, additem, addattr from .i18n import I18n, JsonI18n, TomlI18n +from .signing import UserSigner from .snowflake import Snowflake, SnowflakeGen from .lex import symbol_table, lex, ilex from .strtools import PrefixIdentifier @@ -34,20 +36,21 @@ from .validators import matches from .redact import redact_url_password from .http import WantsContentType -__version__ = "0.6.0-dev35" +__version__ = "0.6.0" __all__ = ( 'ConfigOptions', 'ConfigParserConfigSource', 'ConfigSource', 'ConfigValue', 'DictConfigSource', 'EnvConfigSource', 'I18n', 'Incomplete', 'JsonI18n', 'MissingConfigError', 'MissingConfigWarning', 'PrefixIdentifier', 'Siq', 'SiqCache', 'SiqGen', 'SiqType', 'Snowflake', 'SnowflakeGen', - 'StringCase', 'TimedDict', 'TomlI18n', 'Wanted', 'WantsContentType', + 'StringCase', 'TimedDict', 'TomlI18n', 'UserSigner', 'Wanted', 'WantsContentType', 'addattr', 'additem', 'age_and_days', 'alru_cache', 'b2048decode', 'b2048encode', 'b32ldecode', 'b32lencode', 'b64encode', 'b64decode', 'cb32encode', - 'cb32decode', 'count_ones', 'deprecated', 'ilex', 'join_bits', + 'cb32decode', 'count_ones', 'dei_args', 'deprecated', 'ilex', 'join_bits', 'jsonencode', 'kwargs_prefix', 'lex', 'ltuple', 'makelist', 'mask_shift', 'matches', 'mod_ceil', 'mod_floor', 'none_pass', 'not_implemented', 'redact_url_password', 'rtuple', 'split_bits', 'ssv_list', 'symbol_table', 'timed_cache', 'twocolon_list', 'want_bytes', 'want_datetime', 'want_isodate', - 'want_str', 'want_timestamp', 'want_urlsafe', 'want_urlsafe_bytes' + 'want_str', 'want_timestamp', 'want_urlsafe', 'want_urlsafe_bytes', + 'z85encode', 'z85decode' ) diff --git a/src/suou/classtools.py b/src/suou/classtools.py index c58a123..458a498 100644 --- a/src/suou/classtools.py +++ b/src/suou/classtools.py @@ -27,6 +27,8 @@ logger = logging.getLogger(__name__) class MissingType(object): __slots__ = () + def __bool__(self): + return False MISSING = MissingType() diff --git a/src/suou/codecs.py b/src/suou/codecs.py index e5f94af..c617160 100644 --- a/src/suou/codecs.py +++ b/src/suou/codecs.py @@ -225,6 +225,28 @@ def rb64decode(val: bytes | str) -> bytes: val = want_urlsafe(val) return base64.urlsafe_b64decode(val.rjust(mod_ceil(len(val), 4), 'A')) + +B85_TO_Z85 = str.maketrans( + '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz!#$%&()*+-;<=>?@^_`{|}~', + '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ.-:+=^!/*?&<>()[]{}@%$#' +) +Z85_TO_B85 = str.maketrans( + '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ.-:+=^!/*?&<>()[]{}@%$#', + '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz!#$%&()*+-;<=>?@^_`{|}~' +) + +if hasattr(base64, 'z85encode'): + # Python >=3.13 + def z85encode(val: bytes) -> str: + return want_str(base64.z85encode(val)) + z85decode = base64.z85decode +else: + # Python <=3.12 + def z85encode(val: bytes) -> str: + return want_str(base64.b85encode(val)).translate(B85_TO_Z85) + def z85decode(val: bytes | str) -> bytes: + return base64.b85decode(want_str(val).translate(Z85_TO_B85)) + def b2048encode(val: bytes) -> str: ''' Encode a bytestring using the BIP-39 wordlist. @@ -345,5 +367,6 @@ class StringCase(enum.Enum): __all__ = ( 'cb32encode', 'cb32decode', 'b32lencode', 'b32ldecode', 'b64encode', 'b64decode', 'jsonencode' - 'StringCase', 'want_bytes', 'want_str', 'jsondecode', 'ssv_list', 'twocolon_list', 'want_urlsafe', 'want_urlsafe_bytes' + 'StringCase', 'want_bytes', 'want_str', 'jsondecode', 'ssv_list', 'twocolon_list', 'want_urlsafe', 'want_urlsafe_bytes', + 'z85encode', 'z85decode' ) \ No newline at end of file diff --git a/src/suou/dei.py b/src/suou/dei.py index c485216..0f7a7a0 100644 --- a/src/suou/dei.py +++ b/src/suou/dei.py @@ -18,6 +18,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. from __future__ import annotations +from functools import wraps +from typing import Callable BRICKS = '@abcdefghijklmnopqrstuvwxyz+?-\'/' @@ -108,3 +110,28 @@ class Pronoun(int): i += BRICKS.index(ch) << (5 * j) return Pronoun(i) + + +def dei_args(**renames): + """ + Allow for aliases in the keyword argument names, in form alias='real_name'. + + DEI utility for those programmers who don't want to have to do with + potentially offensive variable naming. + + Dear conservatives, this does not influence the ability to call the wrapped function + with the original parameter names. + """ + def decorator(func: Callable): + @wraps(func) + def wrapper(*args, **kwargs): + for alias_name, actual_name in renames.items(): + if alias_name in kwargs: + val = kwargs.pop(alias_name) + kwargs[actual_name] = val + + return func(*args, **kwargs) + return wrapper + return decorator + + diff --git a/src/suou/signing.py b/src/suou/signing.py index 6f2dcb9..c2011ce 100644 --- a/src/suou/signing.py +++ b/src/suou/signing.py @@ -15,30 +15,45 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. """ from abc import ABC -from base64 import b64decode from typing import Any, Callable, Sequence +import warnings from itsdangerous import TimestampSigner +from itsdangerous import Signer as _Signer +from itsdangerous.encoding import int_to_bytes as _int_to_bytes + +from suou.dei import dei_args from suou.itertools import rtuple from .functools import not_implemented -from .codecs import jsondecode, jsonencode, want_bytes, want_str +from .codecs import jsondecode, jsonencode, rb64decode, want_bytes, want_str, b64decode, b64encode from .iding import Siq +from .classtools import MISSING class UserSigner(TimestampSigner): """ itsdangerous.TimestampSigner() instanced from a user ID, with token generation and validation capabilities. """ user_id: int + @dei_args(primary_secret='master_secret') def __init__(self, master_secret: bytes, user_id: int, user_secret: bytes, **kwargs): super().__init__(master_secret + user_secret, salt=Siq(user_id).to_bytes(), **kwargs) self.user_id = user_id - def token(self) -> str: - return self.sign(Siq(self.user_id).to_base64()).decode('ascii') + def token(self, *, test_timestamp=MISSING) -> str: + payload = Siq(self.user_id).to_base64() + ## The following is not intended for general use + if test_timestamp is not MISSING: + warnings.warn('timestamp= parameter is intended for testing only!\n\x1b[31mDO NOT use it in production or you might get consequences\x1b[0m, just saying', UserWarning) + ts_payload = b64encode(_int_to_bytes(test_timestamp)) + payload = want_bytes(payload) + want_bytes(self.sep) + want_bytes(ts_payload) + return want_str(_Signer.sign(self, payload)) + ## END the following is not intended for general use + + return want_str(self.sign(payload)) @classmethod def split_token(cls, /, token: str | bytes) : a, b, c = want_str(token).rsplit('.', 2) - return b64decode(a), b, b64decode(c) + return b64decode(a), int.from_bytes(b64decode(b), 'big'), b64decode(c) def sign_object(self, obj: dict, /, *, encoder=jsonencode, **kwargs): """ Return a signed JSON payload of an object. @@ -54,7 +69,6 @@ class UserSigner(TimestampSigner): def split_signed(self, payload: str | bytes) -> Sequence[bytes]: return rtuple(want_bytes(payload).rsplit(b'.', 2), 3, b'') - class HasSigner(ABC): ''' Abstract base class for INTERNAL USE. diff --git a/src/suou/sqlalchemy/orm.py b/src/suou/sqlalchemy/orm.py index 06a876a..403e762 100644 --- a/src/suou/sqlalchemy/orm.py +++ b/src/suou/sqlalchemy/orm.py @@ -20,19 +20,24 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. from binascii import Incomplete import os -from typing import Any, Callable +from typing import Any, Callable, TypeVar import warnings from sqlalchemy import BigInteger, Boolean, CheckConstraint, Column, Date, ForeignKey, LargeBinary, MetaData, SmallInteger, String, text from sqlalchemy.orm import DeclarativeBase, InstrumentedAttribute, Relationship, declarative_base as _declarative_base, relationship from sqlalchemy.types import TypeEngine -from suou.classtools import Wanted +from sqlalchemy.ext.hybrid import Comparator +from suou.classtools import Wanted, Incomplete from suou.codecs import StringCase +from suou.dei import dei_args from suou.iding import Siq, SiqCache, SiqGen, SiqType from suou.itertools import kwargs_prefix from suou.snowflake import SnowflakeGen from suou.sqlalchemy import IdType +_T = TypeVar('_T') + + def want_column(cls: type[DeclarativeBase], col: Column[_T] | str) -> Column[_T]: """ Return a table's column given its name. @@ -117,6 +122,7 @@ def bool_column(value: bool = False, nullable: bool = False, **kwargs) -> Column return Column(Boolean, server_default=def_val, nullable=nullable, **kwargs) +@dei_args(primary_secret='master_secret') def declarative_base(domain_name: str, master_secret: bytes, metadata: dict | None = None, **kwargs) -> type[DeclarativeBase]: """ Drop-in replacement for sqlalchemy.orm.declarative_base() diff --git a/tests/test_codecs.py b/tests/test_codecs.py index 035a605..95eb10f 100644 --- a/tests/test_codecs.py +++ b/tests/test_codecs.py @@ -2,7 +2,7 @@ import binascii import unittest -from suou.codecs import b64encode, b64decode, want_urlsafe +from suou.codecs import b64encode, b64decode, want_urlsafe, z85decode B1 = b'N\xf0\xb4\xc3\x85\n\xf9\xb6\x9a\x0f\x82\xa6\x99G\x07#' B2 = b'\xbcXiF,@|{\xbe\xe3\x0cz\xa8\xcbQ\x82' @@ -49,3 +49,12 @@ class TestCodecs(unittest.TestCase): self.assertEqual('Disney-', want_urlsafe('Disney+')) self.assertEqual('spaziocosenza', want_urlsafe('spazio cosenza')) self.assertEqual('=======', want_urlsafe('=======')) + + def test_z85decode(self): + self.assertEqual(z85decode('pvLTdG:NT:NH+1ENmvGb'), B1) + self.assertEqual(z85decode('YJw(qei[PfZt/SFSln4&'), B2) + self.assertEqual(z85decode('>[>>)c=hgL?I8'), B3) + self.assertEqual(z85decode('2p3(-x*%TsE0-P/40[>}'), B4) + self.assertEqual(z85decode('%m&HH?#r'), B5) + self.assertEqual(z85decode('%m&HH?#uEvW8mO8}l(.5F#j@a2o%'), B5 + B1) + \ No newline at end of file diff --git a/tests/test_signing.py b/tests/test_signing.py new file mode 100644 index 0000000..e56c976 --- /dev/null +++ b/tests/test_signing.py @@ -0,0 +1,41 @@ + + + + +import time +import unittest + +from suou.codecs import want_bytes, b64decode, z85decode +from suou.iding import Siq +from suou.signing import UserSigner + + +class TestSigning(unittest.TestCase): + def setUp(self) -> None: + # use deterministic secrets in testing + self.signer = UserSigner( + z85decode('suou-test!'), # master secret + Siq(1907492221233425151961830768246784), # user id + b64decode('e7YXG4ob22mBCxoPvgewlAsfiZE2MFu50aP_gtnXW2v2') + ) + def tearDown(self) -> None: + ... + + def test_UserSigner_token(self): + # self coherence test + TIMESTAMP = 1757426896 + with self.assertWarns(UserWarning): + tok = self.signer.token(test_timestamp=TIMESTAMP) + self.assertIsInstance(tok, str) + self.assertEqual(tok, 'AF4L78gAAAAAAAAAAAAA.aMA00A.0au9HDfOJZv-gpudEevT6Squ8go') + + tok2 = self.signer.token() + tim = int(time.time()) + if tim != TIMESTAMP: + self.assertNotEqual(tok2, tok) + + tokp = UserSigner.split_token(tok) + self.assertEqual(tokp[0], z85decode('0a364:n=hu000000000')) + self.assertEqual(tokp[1], TIMESTAMP) + + \ No newline at end of file