FIX Snowflake codecs, add sqlalchemy.parent_children(), additem(), release 0.3.0

This commit is contained in:
Yusur 2025-06-18 08:40:37 +02:00
parent 946973f732
commit e615cbb628
6 changed files with 58 additions and 22 deletions

View file

@ -6,11 +6,11 @@
- **Changed behavior** of `kwargs_prefix()` which now removes keys from original mapping by default
- Add SQLAlchemy auth loaders i.e. `sqlalchemy.require_auth_base()`, `flask_sqlalchemy`.
What auth loaders do is loading user token and signature into app
- Add `sqlalchemy.create_session()`
- `sqlalchemy`: add `parent_children()` and `create_session()`
- Implement `UserSigner()`
- Improve JSON handling in `flask_restx`
- Add base2048 (i.e. [BIP-39](https://github.com/bitcoin/bips/blob/master/bip-0039.mediawiki)) codec
- Add `split_bits()`, `join_bits()`, `ltuple()`, `rtuple()`, `ssv_list()`
- Add `split_bits()`, `join_bits()`, `ltuple()`, `rtuple()`, `ssv_list()`, `additem()`
- Add `markdown` extensions
- Add Snowflake manipulation utilities

View file

@ -17,16 +17,17 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
"""
from .iding import Siq, SiqCache, SiqType, SiqGen
from .codecs import StringCase, cb32encode, cb32decode, jsonencode, want_bytes, want_str, b2048encode, b2048decode, ssv_list
from .codecs import (StringCase, cb32encode, cb32decode, b32lencode, b32ldecode, b64encode, b64decode, b2048encode, b2048decode,
jsonencode, want_bytes, want_str, ssv_list)
from .bits import count_ones, mask_shift, split_bits, join_bits
from .configparse import MissingConfigError, MissingConfigWarning, ConfigOptions, ConfigParserConfigSource, ConfigSource, DictConfigSource, ConfigValue, EnvConfigSource
from .functools import deprecated, not_implemented
from .classtools import Wanted, Incomplete
from .itertools import makelist, kwargs_prefix, ltuple, rtuple
from .itertools import makelist, kwargs_prefix, ltuple, rtuple, additem
from .i18n import I18n, JsonI18n, TomlI18n
from .snowflake import Snowflake, SnowflakeGen
__version__ = "0.3.0-dev24"
__version__ = "0.3.0"
__all__ = (
'Siq', 'SiqCache', 'SiqType', 'SiqGen', 'StringCase',
@ -34,5 +35,5 @@ __all__ = (
'deprecated', 'not_implemented', 'Wanted', 'Incomplete', 'jsonencode', 'ltuple', 'rtuple',
'makelist', 'kwargs_prefix', 'I18n', 'JsonI18n', 'TomlI18n', 'cb32encode', 'cb32decode', 'count_ones', 'mask_shift',
'want_bytes', 'want_str', 'version', 'b2048encode', 'split_bits', 'join_bits', 'b2048decode',
'Snowflake', 'SnowflakeGen', 'ssv_list'
'Snowflake', 'SnowflakeGen', 'ssv_list', 'additem', 'b32lencode', 'b32ldecode', 'b64encode', 'b64decode'
)

View file

@ -22,7 +22,7 @@ try:
from warnings import deprecated
except ImportError:
# Python <=3.12 does not implement warnings.deprecated
def deprecated(message: str, /, *, category=DeprecationWarning):
def deprecated(message: str, /, *, category=DeprecationWarning, stacklevel:int=1):
"""
Backport of PEP 702 for Python <=3.12.
The stack_level stuff is not reimplemented on purpose because
@ -32,7 +32,7 @@ except ImportError:
@wraps(func)
def wrapper(*a, **ka):
if category is not None:
warnings.warn(message, category)
warnings.warn(message, category, stacklevel=stacklevel)
return func(*a, **ka)
func.__deprecated__ = True
wrapper.__deprecated__ = True

View file

@ -14,7 +14,8 @@ This software is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
'''
from typing import Any, Iterable, TypeVar
from typing import Any, Iterable, MutableMapping, TypeVar
import warnings
_T = TypeVar('_T')
@ -70,6 +71,18 @@ def kwargs_prefix(it: dict[str, Any], prefix: str, *, remove = True, keep_prefix
it.pop(k)
return ka
def additem(obj: MutableMapping, /, name: str = None):
"""
Syntax sugar for adding a function to a mapping, immediately.
"""
def decorator(func):
key = name or func.__name__
if key in obj:
warnings.warn(f'mapping does already have item {key!r}')
obj[key] = func
return func
return decorator
__all__ = ('makelist', 'kwargs_prefix', 'ltuple', 'rtuple')
__all__ = ('makelist', 'kwargs_prefix', 'ltuple', 'rtuple', 'additem')

View file

@ -121,29 +121,31 @@ class Snowflake(int):
def to_bytes(self, length: int = 14, byteorder = "big", *, signed: bool = False) -> bytes:
return super().to_bytes(length, byteorder, signed=signed)
@classmethod
def from_bytes(cls, b: bytes, byteorder = 'big', *, signed: bool = False) -> Snowflake:
if len(b) != 8:
warnings.warn('Snowflakes are exactly 8 bytes long', BytesWarning)
return super().from_bytes(b, byteorder, signed=signed)
def to_base64(self, length: int = 9, *, strip: bool = True) -> str:
return b64encode(self.to_bytes(length), strip=strip)
def to_cb32(self)-> str:
return cb32encode(self.to_bytes(9, 'big'))
return cb32encode(self.to_bytes(8, 'big'))
to_crockford = to_cb32
def to_hex(self) -> str:
return f'{self:x}'
def to_oct(self) -> str:
return f'{self:o}'
def to_b32l(self) -> str:
"""PSA Snowflake Base32 representations are padded to 10 bytes!"""
return b32lencode(self.to_bytes(10, 'big')).lstrip('a')
@classmethod
def from_bytes(cls, b: bytes, byteorder = 'big', *, signed: bool = False) -> Snowflake:
if len(b) not in (8, 10):
warnings.warn('Snowflakes are exactly 8 bytes long', BytesWarning)
return super().from_bytes(b, byteorder, signed=signed)
@classmethod
def from_b32l(cls, val: str) -> Snowflake:
if val.startswith('_'):
## support for negative Snowflakes
return -cls.from_b32l(val.lstrip('_'))
return Snowflake.from_bytes(b32ldecode(val.ljust(16, 'a'))[-8:])
return Snowflake.from_bytes(b32ldecode(val.rjust(16, 'a')))
@override
def __format__(self, opt: str, /) -> str:

View file

@ -18,10 +18,10 @@ from __future__ import annotations
from abc import ABCMeta, abstractmethod
from functools import wraps
from typing import Any, Callable, Iterable, Never, TypeVar
from typing import Callable, Iterable, Never, TypeVar
import warnings
from sqlalchemy import BigInteger, CheckConstraint, Date, Dialect, ForeignKey, LargeBinary, Column, MetaData, SmallInteger, String, create_engine, select, text
from sqlalchemy.orm import DeclarativeBase, Session, declarative_base as _declarative_base
from sqlalchemy.orm import DeclarativeBase, Session, declarative_base as _declarative_base, relationship
from .snowflake import SnowflakeGen
from .itertools import kwargs_prefix, makelist
@ -130,11 +130,11 @@ def declarative_base(domain_name: str, master_secret: bytes, metadata: dict | No
if 'info' not in metadata:
metadata['info'] = dict()
# snowflake metadata
snowflake_kwargs = kwargs_prefix(kwargs, 'snowflake_', remove=True)
snowflake_kwargs = kwargs_prefix(kwargs, 'snowflake_', remove=True, keep_prefix=True)
metadata['info'].update(
domain_name = domain_name,
secret_key = master_secret,
**{f'snowflake_{k}': v for k, v in snowflake_kwargs}
**snowflake_kwargs
)
Base = _declarative_base(metadata=MetaData(**metadata), **kwargs)
return Base
@ -193,6 +193,26 @@ def age_pair(*, nullable: bool = False, **ka) -> tuple[Column, Column]:
return (date_col, acc_col)
def parent_children(keyword: str, /, **kwargs):
"""
Self-referential one-to-many relationship pair.
Parent comes first, children come later.
keyword is used in back_populates column names: convention over
configuration. Naming it otherwise will BREAK your models.
Additional keyword arguments can be sourced with parent_ and child_ argument prefixes,
obviously.
"""
parent_kwargs = kwargs_prefix(kwargs, 'parent_')
child_kwargs = kwargs_prefix(kwargs, 'child_')
parent = Incomplete(relationship, Wanted(lambda o, n: o.__name__), back_populates=f'child_{keyword}s', **parent_kwargs)
child = Incomplete(relationship, Wanted(lambda o, n: o.__name__), back_populates=f'parent_{keyword}', **child_kwargs)
return parent, child
def want_column(cls: type[DeclarativeBase], col: Column[_T] | str) -> Column[_T]:
"""
Return a table's column given its name.