add tests to .signing + z85 encoding support
This commit is contained in:
parent
3d6d44e4a1
commit
bfc6cb8e85
7 changed files with 104 additions and 13 deletions
|
|
@ -4,10 +4,12 @@
|
|||
|
||||
+ `.sqlalchemy` has been made a subpackage and split; `sqlalchemy_async` has been deprecated. Update your imports.
|
||||
+ Add several new utilities to `.sqlalchemy`: `BitSelector`, `secret_column`, `a_relationship`, `SessionWrapper`,
|
||||
`wrap=` argument to SQLAlchemy. Also removed dead batteries.
|
||||
+ Add `.waiter` module. For now, non-functional.
|
||||
`wrap=` argument to SQLAlchemy. Also removed dead batteries
|
||||
+ Add `.waiter` module. For now, non-functional
|
||||
+ Add `ArgConfigSource` to `.configparse`
|
||||
+ Add Z85 (`z85encode()` `z85decode()`) encoding support
|
||||
+ Add more strings to `.legal` module
|
||||
+ `.signing` module is now covered by tests
|
||||
|
||||
## 0.5.3
|
||||
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@ from .functools import deprecated, not_implemented, timed_cache, none_pass, alru
|
|||
from .classtools import Wanted, Incomplete
|
||||
from .itertools import makelist, kwargs_prefix, ltuple, rtuple, additem, addattr
|
||||
from .i18n import I18n, JsonI18n, TomlI18n
|
||||
from .signing import UserSigner
|
||||
from .snowflake import Snowflake, SnowflakeGen
|
||||
from .lex import symbol_table, lex, ilex
|
||||
from .strtools import PrefixIdentifier
|
||||
|
|
@ -34,14 +35,14 @@ from .validators import matches
|
|||
from .redact import redact_url_password
|
||||
from .http import WantsContentType
|
||||
|
||||
__version__ = "0.6.0-dev35"
|
||||
__version__ = "0.6.0-dev36"
|
||||
|
||||
__all__ = (
|
||||
'ConfigOptions', 'ConfigParserConfigSource', 'ConfigSource', 'ConfigValue',
|
||||
'DictConfigSource', 'EnvConfigSource', 'I18n', 'Incomplete', 'JsonI18n',
|
||||
'MissingConfigError', 'MissingConfigWarning', 'PrefixIdentifier',
|
||||
'Siq', 'SiqCache', 'SiqGen', 'SiqType', 'Snowflake', 'SnowflakeGen',
|
||||
'StringCase', 'TimedDict', 'TomlI18n', 'Wanted', 'WantsContentType',
|
||||
'StringCase', 'TimedDict', 'TomlI18n', 'UserSigner', 'Wanted', 'WantsContentType',
|
||||
'addattr', 'additem', 'age_and_days', 'alru_cache', 'b2048decode', 'b2048encode',
|
||||
'b32ldecode', 'b32lencode', 'b64encode', 'b64decode', 'cb32encode',
|
||||
'cb32decode', 'count_ones', 'deprecated', 'ilex', 'join_bits',
|
||||
|
|
@ -49,5 +50,6 @@ __all__ = (
|
|||
'matches', 'mod_ceil', 'mod_floor', 'none_pass', 'not_implemented',
|
||||
'redact_url_password', 'rtuple', 'split_bits', 'ssv_list', 'symbol_table',
|
||||
'timed_cache', 'twocolon_list', 'want_bytes', 'want_datetime', 'want_isodate',
|
||||
'want_str', 'want_timestamp', 'want_urlsafe', 'want_urlsafe_bytes'
|
||||
'want_str', 'want_timestamp', 'want_urlsafe', 'want_urlsafe_bytes',
|
||||
'z85encode', 'z85decode'
|
||||
)
|
||||
|
|
|
|||
|
|
@ -27,6 +27,8 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
class MissingType(object):
|
||||
__slots__ = ()
|
||||
def __bool__(self):
|
||||
return False
|
||||
|
||||
MISSING = MissingType()
|
||||
|
||||
|
|
|
|||
|
|
@ -225,6 +225,28 @@ def rb64decode(val: bytes | str) -> bytes:
|
|||
val = want_urlsafe(val)
|
||||
return base64.urlsafe_b64decode(val.rjust(mod_ceil(len(val), 4), 'A'))
|
||||
|
||||
|
||||
B85_TO_Z85 = str.maketrans(
|
||||
'0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz!#$%&()*+-;<=>?@^_`{|}~',
|
||||
'0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ.-:+=^!/*?&<>()[]{}@%$#'
|
||||
)
|
||||
Z85_TO_B85 = str.maketrans(
|
||||
'0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ.-:+=^!/*?&<>()[]{}@%$#',
|
||||
'0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz!#$%&()*+-;<=>?@^_`{|}~'
|
||||
)
|
||||
|
||||
if hasattr(base64, 'z85encode'):
|
||||
# Python >=3.13
|
||||
def z85encode(val: bytes) -> str:
|
||||
return want_str(base64.z85encode(val))
|
||||
z85decode = base64.z85decode
|
||||
else:
|
||||
# Python <=3.12
|
||||
def z85encode(val: bytes) -> str:
|
||||
return want_str(base64.b85encode(val)).translate(B85_TO_Z85)
|
||||
def z85decode(val: bytes | str) -> bytes:
|
||||
return base64.b85decode(want_str(val).translate(Z85_TO_B85))
|
||||
|
||||
def b2048encode(val: bytes) -> str:
|
||||
'''
|
||||
Encode a bytestring using the BIP-39 wordlist.
|
||||
|
|
@ -345,5 +367,6 @@ class StringCase(enum.Enum):
|
|||
|
||||
__all__ = (
|
||||
'cb32encode', 'cb32decode', 'b32lencode', 'b32ldecode', 'b64encode', 'b64decode', 'jsonencode'
|
||||
'StringCase', 'want_bytes', 'want_str', 'jsondecode', 'ssv_list', 'twocolon_list', 'want_urlsafe', 'want_urlsafe_bytes'
|
||||
'StringCase', 'want_bytes', 'want_str', 'jsondecode', 'ssv_list', 'twocolon_list', 'want_urlsafe', 'want_urlsafe_bytes',
|
||||
'z85encode', 'z85decode'
|
||||
)
|
||||
|
|
@ -15,15 +15,19 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|||
"""
|
||||
|
||||
from abc import ABC
|
||||
from base64 import b64decode
|
||||
from typing import Any, Callable, Sequence
|
||||
import warnings
|
||||
from itsdangerous import TimestampSigner
|
||||
|
||||
from itsdangerous import Signer as _Signer
|
||||
from itsdangerous.encoding import int_to_bytes as _int_to_bytes
|
||||
|
||||
from suou.itertools import rtuple
|
||||
|
||||
from .functools import not_implemented
|
||||
from .codecs import jsondecode, jsonencode, want_bytes, want_str
|
||||
from .codecs import jsondecode, jsonencode, rb64decode, want_bytes, want_str, b64decode, b64encode
|
||||
from .iding import Siq
|
||||
from .classtools import MISSING
|
||||
|
||||
class UserSigner(TimestampSigner):
|
||||
"""
|
||||
|
|
@ -33,12 +37,21 @@ class UserSigner(TimestampSigner):
|
|||
def __init__(self, master_secret: bytes, user_id: int, user_secret: bytes, **kwargs):
|
||||
super().__init__(master_secret + user_secret, salt=Siq(user_id).to_bytes(), **kwargs)
|
||||
self.user_id = user_id
|
||||
def token(self) -> str:
|
||||
return self.sign(Siq(self.user_id).to_base64()).decode('ascii')
|
||||
def token(self, *, test_timestamp=MISSING) -> str:
|
||||
payload = Siq(self.user_id).to_base64()
|
||||
## The following is not intended for general use
|
||||
if test_timestamp is not MISSING:
|
||||
warnings.warn('timestamp= parameter is intended for testing only!\n\x1b[31mDO NOT use it in production or you might get consequences\x1b[0m, just saying', UserWarning)
|
||||
ts_payload = b64encode(_int_to_bytes(test_timestamp))
|
||||
payload = want_bytes(payload) + want_bytes(self.sep) + want_bytes(ts_payload)
|
||||
return want_str(_Signer.sign(self, payload))
|
||||
## END the following is not intended for general use
|
||||
|
||||
return want_str(self.sign(payload))
|
||||
@classmethod
|
||||
def split_token(cls, /, token: str | bytes) :
|
||||
a, b, c = want_str(token).rsplit('.', 2)
|
||||
return b64decode(a), b, b64decode(c)
|
||||
return b64decode(a), int.from_bytes(b64decode(b), 'big'), b64decode(c)
|
||||
def sign_object(self, obj: dict, /, *, encoder=jsonencode, **kwargs):
|
||||
"""
|
||||
Return a signed JSON payload of an object.
|
||||
|
|
@ -54,7 +67,6 @@ class UserSigner(TimestampSigner):
|
|||
def split_signed(self, payload: str | bytes) -> Sequence[bytes]:
|
||||
return rtuple(want_bytes(payload).rsplit(b'.', 2), 3, b'')
|
||||
|
||||
|
||||
class HasSigner(ABC):
|
||||
'''
|
||||
Abstract base class for INTERNAL USE.
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
import binascii
|
||||
import unittest
|
||||
from suou.codecs import b64encode, b64decode, want_urlsafe
|
||||
from suou.codecs import b64encode, b64decode, want_urlsafe, z85decode
|
||||
|
||||
B1 = b'N\xf0\xb4\xc3\x85\n\xf9\xb6\x9a\x0f\x82\xa6\x99G\x07#'
|
||||
B2 = b'\xbcXiF,@|{\xbe\xe3\x0cz\xa8\xcbQ\x82'
|
||||
|
|
@ -49,3 +49,12 @@ class TestCodecs(unittest.TestCase):
|
|||
self.assertEqual('Disney-', want_urlsafe('Disney+'))
|
||||
self.assertEqual('spaziocosenza', want_urlsafe('spazio cosenza'))
|
||||
self.assertEqual('=======', want_urlsafe('======='))
|
||||
|
||||
def test_z85decode(self):
|
||||
self.assertEqual(z85decode('pvLTdG:NT:NH+1ENmvGb'), B1)
|
||||
self.assertEqual(z85decode('YJw(qei[PfZt/SFSln4&'), B2)
|
||||
self.assertEqual(z85decode('>[>>)c=hgL?I8'), B3)
|
||||
self.assertEqual(z85decode('2p3(-x*%TsE0-P/40[>}'), B4)
|
||||
self.assertEqual(z85decode('%m&HH?#r'), B5)
|
||||
self.assertEqual(z85decode('%m&HH?#uEvW8mO8}l(.5F#j@a2o%'), B5 + B1)
|
||||
|
||||
41
tests/test_signing.py
Normal file
41
tests/test_signing.py
Normal file
|
|
@ -0,0 +1,41 @@
|
|||
|
||||
|
||||
|
||||
|
||||
import time
|
||||
import unittest
|
||||
|
||||
from suou.codecs import want_bytes, b64decode, z85decode
|
||||
from suou.iding import Siq
|
||||
from suou.signing import UserSigner
|
||||
|
||||
|
||||
class TestSigning(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
# use deterministic secrets in testing
|
||||
self.signer = UserSigner(
|
||||
z85decode('suou-test!'), # master secret
|
||||
Siq(1907492221233425151961830768246784), # user id
|
||||
b64decode('e7YXG4ob22mBCxoPvgewlAsfiZE2MFu50aP_gtnXW2v2')
|
||||
)
|
||||
def tearDown(self) -> None:
|
||||
...
|
||||
|
||||
def test_UserSigner_token(self):
|
||||
# self coherence test
|
||||
TIMESTAMP = 1757426896
|
||||
with self.assertWarns(UserWarning):
|
||||
tok = self.signer.token(test_timestamp=TIMESTAMP)
|
||||
self.assertIsInstance(tok, str)
|
||||
self.assertEqual(tok, 'AF4L78gAAAAAAAAAAAAA.aMA00A.0au9HDfOJZv-gpudEevT6Squ8go')
|
||||
|
||||
tok2 = self.signer.token()
|
||||
tim = int(time.time())
|
||||
if tim != TIMESTAMP:
|
||||
self.assertNotEqual(tok2, tok)
|
||||
|
||||
tokp = UserSigner.split_token(tok)
|
||||
self.assertEqual(tokp[0], z85decode('0a364:n=hu000000000'))
|
||||
self.assertEqual(tokp[1], TIMESTAMP)
|
||||
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue