bug fixes in sqlalchemy and classtools, 0.2.3

This commit is contained in:
Yusur 2025-05-27 16:04:09 +02:00
parent 4ad23d73f8
commit dfa4309216
4 changed files with 33 additions and 12 deletions

View file

@ -1,5 +1,9 @@
# Changelog
## 0.2.3
- Bug fixes in `classtools` and `sqlalchemy`
## 0.2.1
- Add `codecs.jsonencode`

View file

@ -25,7 +25,7 @@ from .classtools import Wanted, Incomplete
from .itertools import makelist, kwargs_prefix
from .i18n import I18n, JsonI18n, TomlI18n
__version__ = "0.2.2"
__version__ = "0.2.3"
__all__ = (
'Siq', 'SiqCache', 'SiqType', 'SiqGen', 'StringCase',

View file

@ -40,8 +40,13 @@ class Wanted(Generic[_T]):
return name
elif isinstance(self._target, str):
return getattr(owner, self._target)
else:
elif callable(self._target):
try:
return self._target(owner, name)
except TypeError:
return self._target(owner)
else:
raise TypeError(f'Wanted() requires a str, callable, or None, got {self._target.__class__.__name__!r}')
class Incomplete(Generic[_T]):
"""
@ -56,7 +61,12 @@ class Incomplete(Generic[_T]):
_obj = Callable[Any, _T]
_args: Iterable
_kwargs: dict
def __init__(self, obj: Callable[Any, _T], *args, **kwargs):
def __init__(self, obj: Callable[Any, _T] | Wanted, *args, **kwargs):
if isinstance(obj, Wanted):
self._obj = lambda x: x
self._args = (obj, )
self._kwargs = {}
else:
self._obj = obj
self._args = args
self._kwargs = kwargs
@ -69,7 +79,7 @@ class Incomplete(Generic[_T]):
args.append(arg)
kwargs = dict()
for ak, av in self._kwargs.items():
if isinstance(arg, Wanted):
if isinstance(av, Wanted):
kwargs[ak] = av(owner, name)
else:
kwargs[ak] = av

View file

@ -81,7 +81,7 @@ def match_column(length: int, regex: str, /, case: StringCase = StringCase.AS_IS
if case != StringCase.AS_IS: # TODO
warnings.warn('case arg is currently not working', FutureWarning)
return Incomplete(Column, String(length), Wanted(lambda x, n: match_constraint(n, regex, #dialect=x.metadata.engine.dialect.name,
constraint_name=constraint_name or f'{x.__tablename__}_{n}_valid'), *args, **kwargs))
constraint_name=constraint_name or f'{x.__tablename__}_{n}_valid')), *args, **kwargs)
def declarative_base(domain_name: str, master_secret: bytes, metadata: dict | None = None, **kwargs):
@ -109,15 +109,22 @@ def token_signer(id_attr: Column | str, secret_attr: Column | str) -> Incomplete
Requires a master secret (taken from Base.metadata), a user id (visible in the token)
and a user secret.
"""
if isinstance(id_attr, Column):
id_val = Wanted(id_attr.key)
if isinstance(id_attr, Incomplete):
raise TypeError('attempt to pass an uninstanced column. Pass the column name as a string instead.')
elif isinstance(id_attr, Column):
id_val = id_attr
elif isinstance(id_attr, str):
id_val = Wanted(id_attr)
if isinstance(secret_attr, Column):
secret_val = Wanted(secret_attr.key)
secret_val = secret_attr
elif isinstance(secret_attr, str):
secret_val = Wanted(secret_attr)
return Incomplete(UserSigner, Wanted(lambda x, n: x.metadata.info['secret_key']), id_val, secret_val)
def token_signer_factory(owner: DeclarativeBase, name: str):
def my_signer(self):
return UserSigner(owner.metadata.info['secret_key'], id_val.__get__(self, owner), secret_val.__get__(self, owner))
my_signer.__name__ = name
return my_signer
return Incomplete(Wanted(token_signer_factory))
def author_pair(fk_name: str, *, id_type: type = IdType, sig_type: type | None = None, nullable: bool = False, sig_length: int | None = 2048, **ka) -> tuple[Column, Column]:
"""