Compare commits

...

3 commits

9 changed files with 144 additions and 16 deletions

View file

@ -4,10 +4,13 @@
+ `.sqlalchemy` has been made a subpackage and split; `sqlalchemy_async` has been deprecated. Update your imports. + `.sqlalchemy` has been made a subpackage and split; `sqlalchemy_async` has been deprecated. Update your imports.
+ Add several new utilities to `.sqlalchemy`: `BitSelector`, `secret_column`, `a_relationship`, `SessionWrapper`, + Add several new utilities to `.sqlalchemy`: `BitSelector`, `secret_column`, `a_relationship`, `SessionWrapper`,
`wrap=` argument to SQLAlchemy. Also removed dead batteries. `wrap=` argument to SQLAlchemy. Also removed dead batteries
+ Add `.waiter` module. For now, non-functional. + Add `.waiter` module. For now, non-functional ~
+ Add `ArgConfigSource` to `.configparse` + Add `ArgConfigSource` to `.configparse`
+ Add Z85 (`z85encode()` `z85decode()`) encoding support
+ Add more strings to `.legal` module + Add more strings to `.legal` module
+ `.signing` module is now covered by tests
+ New decorator `dei_args()`. Now offensive naming is no more a worry!
## 0.5.3 ## 0.5.3

View file

@ -23,10 +23,12 @@ from .bits import count_ones, mask_shift, split_bits, join_bits, mod_ceil, mod_f
from .calendar import want_datetime, want_isodate, want_timestamp, age_and_days from .calendar import want_datetime, want_isodate, want_timestamp, age_and_days
from .configparse import MissingConfigError, MissingConfigWarning, ConfigOptions, ConfigParserConfigSource, ConfigSource, DictConfigSource, ConfigValue, EnvConfigSource from .configparse import MissingConfigError, MissingConfigWarning, ConfigOptions, ConfigParserConfigSource, ConfigSource, DictConfigSource, ConfigValue, EnvConfigSource
from .collections import TimedDict from .collections import TimedDict
from .dei import dei_args
from .functools import deprecated, not_implemented, timed_cache, none_pass, alru_cache from .functools import deprecated, not_implemented, timed_cache, none_pass, alru_cache
from .classtools import Wanted, Incomplete from .classtools import Wanted, Incomplete
from .itertools import makelist, kwargs_prefix, ltuple, rtuple, additem, addattr from .itertools import makelist, kwargs_prefix, ltuple, rtuple, additem, addattr
from .i18n import I18n, JsonI18n, TomlI18n from .i18n import I18n, JsonI18n, TomlI18n
from .signing import UserSigner
from .snowflake import Snowflake, SnowflakeGen from .snowflake import Snowflake, SnowflakeGen
from .lex import symbol_table, lex, ilex from .lex import symbol_table, lex, ilex
from .strtools import PrefixIdentifier from .strtools import PrefixIdentifier
@ -34,20 +36,21 @@ from .validators import matches
from .redact import redact_url_password from .redact import redact_url_password
from .http import WantsContentType from .http import WantsContentType
__version__ = "0.6.0-dev35" __version__ = "0.6.0"
__all__ = ( __all__ = (
'ConfigOptions', 'ConfigParserConfigSource', 'ConfigSource', 'ConfigValue', 'ConfigOptions', 'ConfigParserConfigSource', 'ConfigSource', 'ConfigValue',
'DictConfigSource', 'EnvConfigSource', 'I18n', 'Incomplete', 'JsonI18n', 'DictConfigSource', 'EnvConfigSource', 'I18n', 'Incomplete', 'JsonI18n',
'MissingConfigError', 'MissingConfigWarning', 'PrefixIdentifier', 'MissingConfigError', 'MissingConfigWarning', 'PrefixIdentifier',
'Siq', 'SiqCache', 'SiqGen', 'SiqType', 'Snowflake', 'SnowflakeGen', 'Siq', 'SiqCache', 'SiqGen', 'SiqType', 'Snowflake', 'SnowflakeGen',
'StringCase', 'TimedDict', 'TomlI18n', 'Wanted', 'WantsContentType', 'StringCase', 'TimedDict', 'TomlI18n', 'UserSigner', 'Wanted', 'WantsContentType',
'addattr', 'additem', 'age_and_days', 'alru_cache', 'b2048decode', 'b2048encode', 'addattr', 'additem', 'age_and_days', 'alru_cache', 'b2048decode', 'b2048encode',
'b32ldecode', 'b32lencode', 'b64encode', 'b64decode', 'cb32encode', 'b32ldecode', 'b32lencode', 'b64encode', 'b64decode', 'cb32encode',
'cb32decode', 'count_ones', 'deprecated', 'ilex', 'join_bits', 'cb32decode', 'count_ones', 'dei_args', 'deprecated', 'ilex', 'join_bits',
'jsonencode', 'kwargs_prefix', 'lex', 'ltuple', 'makelist', 'mask_shift', 'jsonencode', 'kwargs_prefix', 'lex', 'ltuple', 'makelist', 'mask_shift',
'matches', 'mod_ceil', 'mod_floor', 'none_pass', 'not_implemented', 'matches', 'mod_ceil', 'mod_floor', 'none_pass', 'not_implemented',
'redact_url_password', 'rtuple', 'split_bits', 'ssv_list', 'symbol_table', 'redact_url_password', 'rtuple', 'split_bits', 'ssv_list', 'symbol_table',
'timed_cache', 'twocolon_list', 'want_bytes', 'want_datetime', 'want_isodate', 'timed_cache', 'twocolon_list', 'want_bytes', 'want_datetime', 'want_isodate',
'want_str', 'want_timestamp', 'want_urlsafe', 'want_urlsafe_bytes' 'want_str', 'want_timestamp', 'want_urlsafe', 'want_urlsafe_bytes',
'z85encode', 'z85decode'
) )

View file

@ -27,6 +27,8 @@ logger = logging.getLogger(__name__)
class MissingType(object): class MissingType(object):
__slots__ = () __slots__ = ()
def __bool__(self):
return False
MISSING = MissingType() MISSING = MissingType()

View file

@ -225,6 +225,28 @@ def rb64decode(val: bytes | str) -> bytes:
val = want_urlsafe(val) val = want_urlsafe(val)
return base64.urlsafe_b64decode(val.rjust(mod_ceil(len(val), 4), 'A')) return base64.urlsafe_b64decode(val.rjust(mod_ceil(len(val), 4), 'A'))
B85_TO_Z85 = str.maketrans(
'0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz!#$%&()*+-;<=>?@^_`{|}~',
'0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ.-:+=^!/*?&<>()[]{}@%$#'
)
Z85_TO_B85 = str.maketrans(
'0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ.-:+=^!/*?&<>()[]{}@%$#',
'0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz!#$%&()*+-;<=>?@^_`{|}~'
)
if hasattr(base64, 'z85encode'):
# Python >=3.13
def z85encode(val: bytes) -> str:
return want_str(base64.z85encode(val))
z85decode = base64.z85decode
else:
# Python <=3.12
def z85encode(val: bytes) -> str:
return want_str(base64.b85encode(val)).translate(B85_TO_Z85)
def z85decode(val: bytes | str) -> bytes:
return base64.b85decode(want_str(val).translate(Z85_TO_B85))
def b2048encode(val: bytes) -> str: def b2048encode(val: bytes) -> str:
''' '''
Encode a bytestring using the BIP-39 wordlist. Encode a bytestring using the BIP-39 wordlist.
@ -345,5 +367,6 @@ class StringCase(enum.Enum):
__all__ = ( __all__ = (
'cb32encode', 'cb32decode', 'b32lencode', 'b32ldecode', 'b64encode', 'b64decode', 'jsonencode' 'cb32encode', 'cb32decode', 'b32lencode', 'b32ldecode', 'b64encode', 'b64decode', 'jsonencode'
'StringCase', 'want_bytes', 'want_str', 'jsondecode', 'ssv_list', 'twocolon_list', 'want_urlsafe', 'want_urlsafe_bytes' 'StringCase', 'want_bytes', 'want_str', 'jsondecode', 'ssv_list', 'twocolon_list', 'want_urlsafe', 'want_urlsafe_bytes',
'z85encode', 'z85decode'
) )

View file

@ -18,6 +18,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
from __future__ import annotations from __future__ import annotations
from functools import wraps
from typing import Callable
BRICKS = '@abcdefghijklmnopqrstuvwxyz+?-\'/' BRICKS = '@abcdefghijklmnopqrstuvwxyz+?-\'/'
@ -108,3 +110,28 @@ class Pronoun(int):
i += BRICKS.index(ch) << (5 * j) i += BRICKS.index(ch) << (5 * j)
return Pronoun(i) return Pronoun(i)
def dei_args(**renames):
"""
Allow for aliases in the keyword argument names, in form alias='real_name'.
DEI utility for those programmers who don't want to have to do with
potentially offensive variable naming.
Dear conservatives, this does not influence the ability to call the wrapped function
with the original parameter names.
"""
def decorator(func: Callable):
@wraps(func)
def wrapper(*args, **kwargs):
for alias_name, actual_name in renames.items():
if alias_name in kwargs:
val = kwargs.pop(alias_name)
kwargs[actual_name] = val
return func(*args, **kwargs)
return wrapper
return decorator

View file

@ -15,30 +15,45 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
""" """
from abc import ABC from abc import ABC
from base64 import b64decode
from typing import Any, Callable, Sequence from typing import Any, Callable, Sequence
import warnings
from itsdangerous import TimestampSigner from itsdangerous import TimestampSigner
from itsdangerous import Signer as _Signer
from itsdangerous.encoding import int_to_bytes as _int_to_bytes
from suou.dei import dei_args
from suou.itertools import rtuple from suou.itertools import rtuple
from .functools import not_implemented from .functools import not_implemented
from .codecs import jsondecode, jsonencode, want_bytes, want_str from .codecs import jsondecode, jsonencode, rb64decode, want_bytes, want_str, b64decode, b64encode
from .iding import Siq from .iding import Siq
from .classtools import MISSING
class UserSigner(TimestampSigner): class UserSigner(TimestampSigner):
""" """
itsdangerous.TimestampSigner() instanced from a user ID, with token generation and validation capabilities. itsdangerous.TimestampSigner() instanced from a user ID, with token generation and validation capabilities.
""" """
user_id: int user_id: int
@dei_args(primary_secret='master_secret')
def __init__(self, master_secret: bytes, user_id: int, user_secret: bytes, **kwargs): def __init__(self, master_secret: bytes, user_id: int, user_secret: bytes, **kwargs):
super().__init__(master_secret + user_secret, salt=Siq(user_id).to_bytes(), **kwargs) super().__init__(master_secret + user_secret, salt=Siq(user_id).to_bytes(), **kwargs)
self.user_id = user_id self.user_id = user_id
def token(self) -> str: def token(self, *, test_timestamp=MISSING) -> str:
return self.sign(Siq(self.user_id).to_base64()).decode('ascii') payload = Siq(self.user_id).to_base64()
## The following is not intended for general use
if test_timestamp is not MISSING:
warnings.warn('timestamp= parameter is intended for testing only!\n\x1b[31mDO NOT use it in production or you might get consequences\x1b[0m, just saying', UserWarning)
ts_payload = b64encode(_int_to_bytes(test_timestamp))
payload = want_bytes(payload) + want_bytes(self.sep) + want_bytes(ts_payload)
return want_str(_Signer.sign(self, payload))
## END the following is not intended for general use
return want_str(self.sign(payload))
@classmethod @classmethod
def split_token(cls, /, token: str | bytes) : def split_token(cls, /, token: str | bytes) :
a, b, c = want_str(token).rsplit('.', 2) a, b, c = want_str(token).rsplit('.', 2)
return b64decode(a), b, b64decode(c) return b64decode(a), int.from_bytes(b64decode(b), 'big'), b64decode(c)
def sign_object(self, obj: dict, /, *, encoder=jsonencode, **kwargs): def sign_object(self, obj: dict, /, *, encoder=jsonencode, **kwargs):
""" """
Return a signed JSON payload of an object. Return a signed JSON payload of an object.
@ -54,7 +69,6 @@ class UserSigner(TimestampSigner):
def split_signed(self, payload: str | bytes) -> Sequence[bytes]: def split_signed(self, payload: str | bytes) -> Sequence[bytes]:
return rtuple(want_bytes(payload).rsplit(b'.', 2), 3, b'') return rtuple(want_bytes(payload).rsplit(b'.', 2), 3, b'')
class HasSigner(ABC): class HasSigner(ABC):
''' '''
Abstract base class for INTERNAL USE. Abstract base class for INTERNAL USE.

View file

@ -20,19 +20,24 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
from binascii import Incomplete from binascii import Incomplete
import os import os
from typing import Any, Callable from typing import Any, Callable, TypeVar
import warnings import warnings
from sqlalchemy import BigInteger, Boolean, CheckConstraint, Column, Date, ForeignKey, LargeBinary, MetaData, SmallInteger, String, text from sqlalchemy import BigInteger, Boolean, CheckConstraint, Column, Date, ForeignKey, LargeBinary, MetaData, SmallInteger, String, text
from sqlalchemy.orm import DeclarativeBase, InstrumentedAttribute, Relationship, declarative_base as _declarative_base, relationship from sqlalchemy.orm import DeclarativeBase, InstrumentedAttribute, Relationship, declarative_base as _declarative_base, relationship
from sqlalchemy.types import TypeEngine from sqlalchemy.types import TypeEngine
from suou.classtools import Wanted from sqlalchemy.ext.hybrid import Comparator
from suou.classtools import Wanted, Incomplete
from suou.codecs import StringCase from suou.codecs import StringCase
from suou.dei import dei_args
from suou.iding import Siq, SiqCache, SiqGen, SiqType from suou.iding import Siq, SiqCache, SiqGen, SiqType
from suou.itertools import kwargs_prefix from suou.itertools import kwargs_prefix
from suou.snowflake import SnowflakeGen from suou.snowflake import SnowflakeGen
from suou.sqlalchemy import IdType from suou.sqlalchemy import IdType
_T = TypeVar('_T')
def want_column(cls: type[DeclarativeBase], col: Column[_T] | str) -> Column[_T]: def want_column(cls: type[DeclarativeBase], col: Column[_T] | str) -> Column[_T]:
""" """
Return a table's column given its name. Return a table's column given its name.
@ -117,6 +122,7 @@ def bool_column(value: bool = False, nullable: bool = False, **kwargs) -> Column
return Column(Boolean, server_default=def_val, nullable=nullable, **kwargs) return Column(Boolean, server_default=def_val, nullable=nullable, **kwargs)
@dei_args(primary_secret='master_secret')
def declarative_base(domain_name: str, master_secret: bytes, metadata: dict | None = None, **kwargs) -> type[DeclarativeBase]: def declarative_base(domain_name: str, master_secret: bytes, metadata: dict | None = None, **kwargs) -> type[DeclarativeBase]:
""" """
Drop-in replacement for sqlalchemy.orm.declarative_base() Drop-in replacement for sqlalchemy.orm.declarative_base()

View file

@ -2,7 +2,7 @@
import binascii import binascii
import unittest import unittest
from suou.codecs import b64encode, b64decode, want_urlsafe from suou.codecs import b64encode, b64decode, want_urlsafe, z85decode
B1 = b'N\xf0\xb4\xc3\x85\n\xf9\xb6\x9a\x0f\x82\xa6\x99G\x07#' B1 = b'N\xf0\xb4\xc3\x85\n\xf9\xb6\x9a\x0f\x82\xa6\x99G\x07#'
B2 = b'\xbcXiF,@|{\xbe\xe3\x0cz\xa8\xcbQ\x82' B2 = b'\xbcXiF,@|{\xbe\xe3\x0cz\xa8\xcbQ\x82'
@ -49,3 +49,12 @@ class TestCodecs(unittest.TestCase):
self.assertEqual('Disney-', want_urlsafe('Disney+')) self.assertEqual('Disney-', want_urlsafe('Disney+'))
self.assertEqual('spaziocosenza', want_urlsafe('spazio cosenza')) self.assertEqual('spaziocosenza', want_urlsafe('spazio cosenza'))
self.assertEqual('=======', want_urlsafe('=======')) self.assertEqual('=======', want_urlsafe('======='))
def test_z85decode(self):
self.assertEqual(z85decode('pvLTdG:NT:NH+1ENmvGb'), B1)
self.assertEqual(z85decode('YJw(qei[PfZt/SFSln4&'), B2)
self.assertEqual(z85decode('>[>>)c=hgL?I8'), B3)
self.assertEqual(z85decode('2p3(-x*%TsE0-P/40[>}'), B4)
self.assertEqual(z85decode('%m&HH?#r'), B5)
self.assertEqual(z85decode('%m&HH?#uEvW8mO8}l(.5F#j@a2o%'), B5 + B1)

41
tests/test_signing.py Normal file
View file

@ -0,0 +1,41 @@
import time
import unittest
from suou.codecs import want_bytes, b64decode, z85decode
from suou.iding import Siq
from suou.signing import UserSigner
class TestSigning(unittest.TestCase):
def setUp(self) -> None:
# use deterministic secrets in testing
self.signer = UserSigner(
z85decode('suou-test!'), # master secret
Siq(1907492221233425151961830768246784), # user id
b64decode('e7YXG4ob22mBCxoPvgewlAsfiZE2MFu50aP_gtnXW2v2')
)
def tearDown(self) -> None:
...
def test_UserSigner_token(self):
# self coherence test
TIMESTAMP = 1757426896
with self.assertWarns(UserWarning):
tok = self.signer.token(test_timestamp=TIMESTAMP)
self.assertIsInstance(tok, str)
self.assertEqual(tok, 'AF4L78gAAAAAAAAAAAAA.aMA00A.0au9HDfOJZv-gpudEevT6Squ8go')
tok2 = self.signer.token()
tim = int(time.time())
if tim != TIMESTAMP:
self.assertNotEqual(tok2, tok)
tokp = UserSigner.split_token(tok)
self.assertEqual(tokp[0], z85decode('0a364:n=hu000000000'))
self.assertEqual(tokp[1], TIMESTAMP)