Compare commits

..

5 commits

19 changed files with 433 additions and 37 deletions

1
.gitignore vendored
View file

@ -24,3 +24,4 @@ dist/
.err
.vscode
/run.sh
ROADMAP.md

View file

@ -2,15 +2,26 @@
## 0.4.0
+ Added `ValueProperty`, abstract superclass for `ConfigProperty`.
+ [BREAKING] Changed the behavior of `makelist()`: it now also works as a decorator, converting the decorated function's return value to a list (revertible with `wrap=False`).
+ New module `lex` with functions `symbol_table()` and `lex()`, making tokenization more approachable.
+ Added `dorks` module and `flask.harden()`.
+ Added `sqlalchemy.bool_column()`, making boolean flag columns painless to declare.
+ Introduced `rb64encode()` and `rb64decode()` to deal with Base64 padding issues.
+ Added `addattr()`, `PrefixIdentifier()`, `mod_floor()`, `mod_ceil()`.
+ First version to have unit tests!
## 0.3.7
- Fixed a bug in `b64decode()` padding handling which made the function inconsistent and non-injective. Now, a leading `'A'` is NEVER stripped.
## 0.3.6
- Fixed `ConfigValue` behavior with multiple sources. It used to iterate through all the sources, possibly overwriting; now, iteration stops at the first non-missing value.
## 0.3.5
- Fixed cb32 handling. Leading zeros in SIQs are now stripped, and `.from_cb32()` was implemented.
## 0.3.4

View file

@ -37,9 +37,7 @@ sqlalchemy = [
]
flask = [
"Flask>=2.0.0",
"Flask-RestX",
"Quart",
"Quart-Schema"
"Flask-RestX"
]
flask_sqlalchemy = [
"Flask-SqlAlchemy",
@ -50,6 +48,21 @@ peewee = [
markdown = [
"markdown>=3.0.0"
]
quart = [
"Flask>=2.0.0",
"Quart",
"Quart-Schema",
"uvloop; os_name=='posix'"
]
full = [
"sakuragasaki46-suou[sqlalchemy]",
"sakuragasaki46-suou[flask]",
"sakuragasaki46-suou[quart]",
"sakuragasaki46-suou[peewee]",
"sakuragasaki46-suou[markdown]"
]
[tool.setuptools.dynamic]
version = { attr = "suou.__version__" }

View file

@ -18,22 +18,27 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
from .iding import Siq, SiqCache, SiqType, SiqGen
from .codecs import (StringCase, cb32encode, cb32decode, b32lencode, b32ldecode, b64encode, b64decode, b2048encode, b2048decode,
jsonencode, want_bytes, want_str, ssv_list)
from .bits import count_ones, mask_shift, split_bits, join_bits
jsonencode, want_bytes, want_str, ssv_list, want_urlsafe)
from .bits import count_ones, mask_shift, split_bits, join_bits, mod_ceil, mod_floor
from .configparse import MissingConfigError, MissingConfigWarning, ConfigOptions, ConfigParserConfigSource, ConfigSource, DictConfigSource, ConfigValue, EnvConfigSource
from .functools import deprecated, not_implemented
from .classtools import Wanted, Incomplete
from .itertools import makelist, kwargs_prefix, ltuple, rtuple, additem
from .i18n import I18n, JsonI18n, TomlI18n
from .snowflake import Snowflake, SnowflakeGen
from .lex import symbol_table, lex, ilex
__version__ = "0.4.0-dev27"
__version__ = "0.4.0-dev28"
__all__ = (
'Siq', 'SiqCache', 'SiqType', 'SiqGen', 'StringCase',
'MissingConfigError', 'MissingConfigWarning', 'ConfigOptions', 'ConfigParserConfigSource', 'ConfigSource', 'ConfigValue', 'EnvConfigSource', 'DictConfigSource',
'deprecated', 'not_implemented', 'Wanted', 'Incomplete', 'jsonencode', 'ltuple', 'rtuple',
'makelist', 'kwargs_prefix', 'I18n', 'JsonI18n', 'TomlI18n', 'cb32encode', 'cb32decode', 'count_ones', 'mask_shift',
'want_bytes', 'want_str', 'version', 'b2048encode', 'split_bits', 'join_bits', 'b2048decode',
'Snowflake', 'SnowflakeGen', 'ssv_list', 'additem', 'b32lencode', 'b32ldecode', 'b64encode', 'b64decode'
'ConfigOptions', 'ConfigParserConfigSource', 'ConfigSource', 'ConfigValue',
'DictConfigSource', 'EnvConfigSource', 'I18n', 'Incomplete', 'JsonI18n',
'MissingConfigError', 'MissingConfigWarning', 'Siq', 'SiqCache', 'SiqGen',
'SiqType', 'Snowflake', 'SnowflakeGen', 'StringCase', 'TomlI18n', 'Wanted',
'additem', 'b2048decode', 'b2048encode', 'b32ldecode', 'b32lencode',
'b64encode', 'b64decode', 'cb32encode', 'cb32decode', 'count_ones',
'deprecated', 'ilex', 'join_bits', 'jsonencode', 'kwargs_prefix', 'lex',
'ltuple', 'makelist', 'mask_shift', 'mod_ceil', 'mod_floor',
'not_implemented', 'rtuple', 'split_bits', 'ssv_list', 'symbol_table',
'want_bytes', 'want_str', 'want_urlsafe'
)

View file

@ -1,5 +1,5 @@
'''
Utilities for working with bits
Utilities for working with bits & handy arithmetic helpers
---
@ -93,5 +93,19 @@ def join_bits(l: list[int], nbits: int) -> bytes:
return ou
## arithmetics because yes
__all__ = ('count_ones', 'mask_shift', 'split_bits', 'join_bits')
def mod_floor(x: int, y: int) -> int:
    """
    Greatest integer not greater than x and divisible by y.
    """
    return x - x % y

def mod_ceil(x: int, y: int) -> int:
    """
    Smallest integer not less than x and divisible by y.
    """
    return x + (y - x % y) % y

__all__ = ('count_ones', 'mask_shift', 'split_bits', 'join_bits', 'mod_floor', 'mod_ceil')
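
A quick illustration of the two new helpers (the numbers are arbitrary):

# Illustrative values for mod_floor()/mod_ceil(): round down/up to a multiple of y.
from suou.bits import mod_floor, mod_ceil

assert mod_floor(10, 4) == 8     # largest multiple of 4 that is <= 10
assert mod_ceil(10, 4) == 12     # smallest multiple of 4 that is >= 10
assert mod_floor(12, 4) == 12    # multiples of y are returned unchanged
assert mod_ceil(12, 4) == 12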

View file

@ -22,7 +22,7 @@ import math
import re
from typing import Any, Callable
from .bits import split_bits, join_bits
from .bits import mod_ceil, split_bits, join_bits
from .functools import deprecated
# yes, I know ItsDangerous implements that as well, but remember
@ -49,6 +49,25 @@ def want_str(s: str | bytes, encoding: str = "utf-8", errors: str = "strict") ->
s = s.decode(encoding, errors)
return s
BASE64_TO_URLSAFE = str.maketrans('+/', '-_', ' ')
def want_urlsafe(s: str | bytes) -> str:
    """
    Force a Base64 string into its urlsafe representation.
    Behavior is unchecked and undefined for anything other than Base64 strings.
    Used by b64decode() and rb64decode().
    """
    return want_str(s).translate(BASE64_TO_URLSAFE)

def want_urlsafe_bytes(s: str | bytes) -> bytes:
    """
    Shorthand for want_bytes(want_urlsafe(s)).
    """
    return want_bytes(want_urlsafe(s))
B32_TO_CROCKFORD = str.maketrans(
'ABCDEFGHIJKLMNOPQRSTUVWXYZ234567',
'0123456789ABCDEFGHJKMNPQRSTVWXYZ',
@ -59,6 +78,7 @@ CROCKFORD_TO_B32 = str.maketrans(
'ABCDEFGHIJKLMNOPQRSTUVWXYZ234567',
'=')
BIP39_WORD_LIST = """
abandon ability able about above absent absorb abstract absurd abuse access accident account accuse achieve acid acoustic acquire across act action
actor actress actual adapt add addict address adjust admit adult advance advice aerobic affair afford afraid again age agent agree ahead aim air airport
@ -178,16 +198,31 @@ def b32ldecode(val: bytes | str) -> bytes:
def b64encode(val: bytes, *, strip: bool = True) -> str:
'''
Wrapper around base64.urlsafe_b64encode() which also strips trailing '=' and leading 'A'.
Wrapper around base64.urlsafe_b64encode() which also strips trailing '='.
'''
b = want_str(base64.urlsafe_b64encode(val))
return b.lstrip('A').rstrip('=') if strip else b
return b.rstrip('=') if strip else b
def b64decode(val: bytes | str) -> bytes:
'''
Wrapper around base64.urlsafe_b64decode() which deals with padding.
'''
return base64.urlsafe_b64decode(want_bytes(val).replace(b'/', b'_').replace(b'+', b'-') + b'=' * ((4 - len(val) % 4) % 4))
val = want_urlsafe(val)
return base64.urlsafe_b64decode(val.ljust(mod_ceil(len(val), 4), '='))
def rb64encode(val: bytes, *, strip: bool = True) -> str:
    '''
    Wrapper around base64.urlsafe_b64encode() which pads the input with null bytes (b'\\0') at the start. Leading 'A's are stripped from the result.
    '''
    b = want_str(base64.urlsafe_b64encode(val.rjust(mod_ceil(len(val), 3), b'\0')))
    return b.lstrip('A') if strip else b

def rb64decode(val: bytes | str) -> bytes:
    '''
    Wrapper around base64.urlsafe_b64decode() which deals with padding; inverse of rb64encode(), restoring the leading 'A's before decoding.
    '''
    val = want_urlsafe(val)
    return base64.urlsafe_b64decode(val.rjust(mod_ceil(len(val), 4), 'A'))
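
A rough illustration of how the two encoder families and want_urlsafe() behave; the byte values are chosen for illustration only:

# Illustrative sketch: b64* strips/restores trailing '=', while rb64* works from the
# right, mapping leading zero bytes to leading 'A' characters that get stripped instead.
from suou.codecs import b64encode, b64decode, rb64encode, rb64decode, want_urlsafe

assert want_urlsafe('a+b/c d') == 'a-b_cd'      # '+/' become '-_', spaces are dropped

assert b64encode(b'\x00\x00\x01') == 'AAAB'     # left-aligned, only '=' is stripped
assert b64decode('AAAB') == b'\x00\x00\x01'

assert rb64encode(b'\x00\x00\x01') == 'B'       # right-aligned, leading 'A' stripped
assert rb64decode('B') == b'\x00\x00\x01'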
def b2048encode(val: bytes) -> str:
'''

28
src/suou/dorks.py Normal file
View file

@ -0,0 +1,28 @@
"""
Web app hardening and penetration-testing (PT) utilities.
---
Copyright (c) 2025 Sakuragasaki46.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
See LICENSE for the specific language governing permissions and
limitations under the License.
This software is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
"""
SENSITIVE_ENDPOINTS = """
/.git
/.gitignore
/node_modules
/wp-admin
/wp-login.php
/.ht
/package.json
/package-lock.json
/composer.
""".split()

View file

@ -14,8 +14,6 @@ This software is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
"""
class MissingConfigError(LookupError):
"""
Config variable not found.
@ -31,3 +29,18 @@ class MissingConfigWarning(MissingConfigError, Warning):
A required config property is missing, and the application is assuming a default value.
"""
pass
class LexError(SyntaxError):
"""
Illegal character or sequence found in the token stream.
"""
class InconsistencyError(RuntimeError):
"""
The program is in a state it is not supposed to be in.
"""
__all__ = (
'MissingConfigError', 'MissingConfigWarning', 'LexError', 'InconsistencyError'
)

View file

@ -15,9 +15,10 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
"""
from typing import Any
from flask import Flask, current_app, g, request
from flask import Flask, abort, current_app, g, request
from .i18n import I18n
from .configparse import ConfigOptions
from .dorks import SENSITIVE_ENDPOINTS
def add_context_from_config(app: Flask, config: ConfigOptions) -> Flask:
@ -66,6 +67,21 @@ def get_flask_conf(key: str, default = None, *, app: Flask | None = None) -> Any
app = current_app
return app.config.get(key, default)
__all__ = ('add_context_from_config', 'add_i18n', 'get_flask_conf')
## XXX UNTESTED!
def harden(app: Flask):
    """
    Make common "dork" endpoints unavailable.
    """
    i = 1
    for ep in SENSITIVE_ENDPOINTS:
        @app.route(f'{ep}<path:rest>', endpoint=f'unavailable_{i}')
        def unavailable(rest):
            abort(403)
        i += 1
    return app
# Optional dependency: do not import into __init__.py
__all__ = ('add_context_from_config', 'add_i18n', 'get_flask_conf', 'harden')
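
A rough usage sketch for harden(): it registers one blocking route per entry in SENSITIVE_ENDPOINTS. Since the diff itself flags the function as untested, the status code here is only what the code aims for:

# Illustrative sketch: register the blocking routes on a throwaway app.
from flask import Flask
from suou.flask import harden

app = harden(Flask(__name__))

with app.test_client() as client:
    # '/.gitignore' falls under the '/.git<path:rest>' rule registered by harden()
    print(client.get('/.gitignore').status_code)   # expected: 403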

View file

@ -74,5 +74,5 @@ class Api(_Api):
super().__init__(*a, **ka)
self.representations['application/json'] = output_json
# Optional dependency: do not import into __init__.py
__all__ = ('Api',)

View file

@ -76,5 +76,5 @@ def require_auth(cls: type[DeclarativeBase], db: SQLAlchemy) -> Callable[Any, Ca
return auth_required
# Optional dependency: do not import into __init__.py
__all__ = ('require_auth', )

View file

@ -14,20 +14,28 @@ This software is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
'''
from typing import Any, Iterable, MutableMapping, TypeVar
from functools import wraps
from typing import Any, Callable, Iterable, MutableMapping, TypeVar
import warnings
from suou.classtools import MISSING
_T = TypeVar('_T')
def makelist(l: Any) -> list:
def makelist(l: Any, *, wrap: bool = True) -> list | Callable[..., list]:
'''
Make a list out of an iterable or a single value.
NEW in 0.4.0: now also accepts a callable, so it can be used to decorate generators and turn their return values into lists.
Pass wrap=False to get the callable itself wrapped in a list instead.
'''
if callable(l) and wrap:
return wraps(l)(lambda *a, **k: makelist(l(*a, **k), wrap=False))
if isinstance(l, (str, bytes, bytearray)):
return [l]
elif isinstance(l, Iterable):
return list(l)
elif l in (None, NotImplemented, Ellipsis):
elif l in (None, NotImplemented, Ellipsis, MISSING):
return []
else:
return [l]
@ -83,6 +91,18 @@ def additem(obj: MutableMapping, /, name: str = None):
return func
return decorator
def addattr(obj: Any, /, name: str = None):
"""
Same as additem() but setting as attribute instead.
"""
def decorator(func):
key = name or func.__name__
if hasattr(obj, key):
warnings.warn(f'object already has attribute {key!r}')
setattr(obj, key, func)
return func
return decorator
__all__ = ('makelist', 'kwargs_prefix', 'ltuple', 'rtuple', 'additem')
__all__ = ('makelist', 'kwargs_prefix', 'ltuple', 'rtuple', 'additem', 'addattr')
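
A rough sketch of the new decorator form of makelist() and of addattr(); the names fruits, Box and greet are made up for illustration:

# Illustrative sketch of makelist() as a decorator and of addattr().
from suou.itertools import makelist, addattr

@makelist
def fruits():
    yield 'apple'
    yield 'pear'

assert fruits() == ['apple', 'pear']               # generator output collected into a list
assert makelist(fruits, wrap=False) == [fruits]    # wrap=False: the callable itself, listed

class Box:
    pass

@addattr(Box)
def greet(self):
    return 'hello'

assert Box().greet() == 'hello'                    # greet() attached to Box as a method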

86
src/suou/lex.py Normal file
View file

@ -0,0 +1,86 @@
"""
Utilities for tokenization of text.
---
"""
from re import Match
from dataclasses import dataclass
import re
from typing import Any, Callable, Iterable
from .exceptions import InconsistencyError, LexError
from .itertools import makelist
@dataclass
class TokenSym:
pattern: str
label: str
cast: Callable[[str], Any] | None = None
discard: bool = False
# convenience methods below
def match(self, s: str, index: int = 0) -> Match[str] | None:
return re.compile(self.pattern, 0).match(s, index)
@makelist
def symbol_table(*args: Iterable[tuple | TokenSym], whitespace: str | None = None):
"""
Make a symbol table from a list of tuples.
Tokens are in form (pattern, label[, cast]) where:
- [] means optional
- pattern is a regular expression (r-string syntax advised)
- label is a constant string
- cast is a function
Need to strip whitespace? Pass the whitespace= keyword parameter.
"""
for arg in args:
if isinstance(arg, TokenSym):
pass
elif isinstance(arg, tuple):
arg = TokenSym(*arg)
else:
raise TypeError(f'invalid type {arg.__class__.__name__!r}')
yield arg
if whitespace:
yield TokenSym('[' + re.escape(whitespace) + ']+', '', discard=True)
symbol_table: Callable[..., list]
def ilex(text: str, table: Iterable[TokenSym], *, whitespace = False):
"""
Tokenize a text into (label, value) pairs, given a token table (iterable of TokenSym).
ilex() returns a generator; lex() returns a list.
table must be a result from symbol_table().
"""
i = 0
while i < len(text):
mo = None
for sym in table:
if mo := re.compile(sym.pattern).match(text, i):
if not sym.discard:
mtext = mo.group(0)
if callable(sym.cast):
mtext = sym.cast(mtext)
yield (sym.label, mtext)
elif whitespace:
yield (None, mo.group(0))
break
if mo is None:
raise LexError(f'illegal character near {text[i:i+5]!r}')
if i == mo.end(0):
raise InconsistencyError
i = mo.end(0)
lex: Callable[..., list] = makelist(ilex)
__all__ = ('symbol_table', 'lex', 'ilex')
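
A rough sketch of building a symbol table and tokenizing a string with it; the patterns and labels of this tiny arithmetic grammar are made up for illustration:

# Illustrative sketch of symbol_table() + lex().
from suou.lex import symbol_table, lex

table = symbol_table(
    (r'\d+', 'NUM', int),    # (pattern, label, optional cast)
    (r'[-+*/]', 'OP'),
    whitespace=' ',          # runs of spaces are matched and discarded
)

assert lex('12 + 3', table) == [('NUM', 12), ('OP', '+'), ('NUM', 3)]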

View file

@ -117,6 +117,6 @@ class SiqField(Field):
def python_value(self, value: bytes) -> Siq:
return Siq.from_bytes(value)
# Optional dependency: do not import into __init__.py
__all__ = ('connect_reconnect', 'RegexCharField', 'SiqField')

View file

@ -20,7 +20,7 @@ from abc import ABCMeta, abstractmethod
from functools import wraps
from typing import Callable, Iterable, Never, TypeVar
import warnings
from sqlalchemy import BigInteger, CheckConstraint, Date, Dialect, ForeignKey, LargeBinary, Column, MetaData, SmallInteger, String, create_engine, select, text
from sqlalchemy import BigInteger, Boolean, CheckConstraint, Date, Dialect, ForeignKey, LargeBinary, Column, MetaData, SmallInteger, String, create_engine, select, text
from sqlalchemy.orm import DeclarativeBase, Session, declarative_base as _declarative_base, relationship
from .snowflake import SnowflakeGen
@ -120,7 +120,17 @@ def match_column(length: int, regex: str, /, case: StringCase = StringCase.AS_IS
constraint_name=constraint_name or f'{x.__tablename__}_{n}_valid')), *args, **kwargs)
def declarative_base(domain_name: str, master_secret: bytes, metadata: dict | None = None, **kwargs):
def bool_column(value: bool = False, nullable: bool = False, **kwargs):
"""
Column for a single boolean value.
NEW in 0.4.0
"""
def_val = text('true') if value else text('false')
return Column(Boolean, server_default=def_val, nullable=nullable, **kwargs)
def declarative_base(domain_name: str, master_secret: bytes, metadata: dict | None = None, **kwargs) -> DeclarativeBase:
"""
Drop-in replacement for sqlalchemy.orm.declarative_base()
taking into account requirements for SIQ generation (i.e. domain name).
@ -295,7 +305,7 @@ def require_auth_base(cls: type[DeclarativeBase], *, src: AuthSrc, column: str |
return wrapper
return decorator
# Optional dependency: do not import into __init__.py
__all__ = (
'IdType', 'id_column', 'entity_base', 'declarative_base', 'token_signer', 'match_column', 'match_constraint',
'author_pair', 'age_pair', 'require_auth_base', 'want_column'
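
A rough sketch of bool_column() in a model; the model and the plain declarative_base() are placeholders (suou's own declarative_base(domain_name, master_secret) would serve the same role):

# Illustrative sketch: boolean flag columns with server-side defaults.
from sqlalchemy import Column, Integer
from sqlalchemy.orm import declarative_base
from suou.sqlalchemy import bool_column

Base = declarative_base()

class Account(Base):
    __tablename__ = 'account'
    id = Column(Integer, primary_key=True)
    is_active = bool_column(True)    # server_default=text('true'), NOT NULL
    is_admin = bool_column()         # server_default=text('false'), NOT NULL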

50
src/suou/strtools.py Normal file
View file

@ -0,0 +1,50 @@
"""
Utilities for string manipulation.
Why `strtools`? Why not `string`? I just~ happen to not like it
---
Copyright (c) 2025 Sakuragasaki46.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
See LICENSE for the specific language governing permissions and
limitations under the License.
This software is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
"""
from typing import Callable, Iterable
from pydantic import validate_call
from .itertools import makelist
class PrefixIdentifier:
_prefix: str
def __init__(self, prefix: str | None, validators: Iterable[Callable[[str], bool]] | Callable[[str], bool] | None = None):
prefix = '' if prefix is None else prefix
if not isinstance(prefix, str):
raise TypeError
validators = makelist(validators, wrap=False)
for validator in validators:
if not validator(prefix):
raise ValueError('invalid prefix')
self._prefix = prefix
@validate_call()
def __getattr__(self, key: str):
return f'{self._prefix}{key}'
@validate_call()
def __getitem__(self, key: str) -> str:
return f'{self._prefix}{key}'
def __str__(self):
return f'{self._prefix}'
__all__ = ('PrefixIdentifier',)
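
A rough usage sketch for PrefixIdentifier; the prefix and validator are chosen only as an example:

# Illustrative sketch: a prefix applied via attribute or item access.
from suou.strtools import PrefixIdentifier

tables = PrefixIdentifier('app_', str.isidentifier)   # the validator is run on the prefix
assert tables.user == 'app_user'
assert tables['user_settings'] == 'app_user_settings'
assert str(tables) == 'app_'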

View file

@ -1,5 +1,5 @@
"""
Utilities for marshmallow, a schema-agnostic serializer/deserializer.
Miscellaneous validator closures.
---

54
tests/test_codecs.py Normal file
View file

@ -0,0 +1,54 @@
import binascii
import unittest
from suou.codecs import b64encode, b64decode, want_urlsafe
B1 = b'N\xf0\xb4\xc3\x85\n\xf9\xb6\x9a\x0f\x82\xa6\x99G\x07#'
B2 = b'\xbcXiF,@|{\xbe\xe3\x0cz\xa8\xcbQ\x82'
B3 = b"\xe9\x18)\xcb'\xc2\x96\xae\xde\x86"
B4 = B1[-2:] + B2[:-2]
B5 = b'\xff\xf8\xa7\x8a\xdf\xff'
class TestCodecs(unittest.TestCase):
def setUp(self) -> None:
...
def tearDown(self) -> None:
...
#def runTest(self):
# self.test_b64encode()
# self.test_b64decode()
def test_b64encode(self):
self.assertEqual(b64encode(B1), 'TvC0w4UK-baaD4KmmUcHIw')
self.assertEqual(b64encode(B2), 'vFhpRixAfHu-4wx6qMtRgg')
self.assertEqual(b64encode(B3), '6RgpyyfClq7ehg')
self.assertEqual(b64encode(B4), 'ByO8WGlGLEB8e77jDHqoyw')
self.assertEqual(b64encode(B5), '__init__')
self.assertEqual(b64encode(B1[:4]), 'TvC0ww')
self.assertEqual(b64encode(b'\0' + B1[:4]), 'AE7wtMM')
self.assertEqual(b64encode(b'\0\0\0\0\0' + B1[:4]), 'AAAAAABO8LTD')
self.assertEqual(b64encode(b'\xff'), '_w')
self.assertEqual(b64encode(b''), '')
def test_b64decode(self):
self.assertEqual(b64decode('TvC0w4UK-baaD4KmmUcHIw'), B1)
self.assertEqual(b64decode('vFhpRixAfHu-4wx6qMtRgg'), B2)
self.assertEqual(b64decode('6RgpyyfClq7ehg'), B3)
self.assertEqual(b64decode('ByO8WGlGLEB8e77jDHqoyw'), B4)
self.assertEqual(b64decode('__init__'), B5)
self.assertEqual(b64decode('TvC0ww'), B1[:4])
self.assertEqual(b64decode('AE7wtMM'), b'\0' + B1[:4])
self.assertEqual(b64decode('AAAAAABO8LTD'), b'\0\0\0\0\0' + B1[:4])
self.assertEqual(b64decode('_w'), b'\xff')
self.assertEqual(b64decode(''), b'')
self.assertRaises(binascii.Error, b64decode, 'C')
def test_want_urlsafe(self):
self.assertEqual('__init__', want_urlsafe('//init_/'))
self.assertEqual('Disney-', want_urlsafe('Disney+'))
self.assertEqual('spaziocosenza', want_urlsafe('spazio cosenza'))
self.assertEqual('=======', want_urlsafe('======='))

40
tests/test_strtools.py Normal file
View file

@ -0,0 +1,40 @@
import unittest
from suou.strtools import PrefixIdentifier
from pydantic import ValidationError
class TestStrtools(unittest.TestCase):
def setUp(self) -> None:
...
def tearDown(self) -> None:
...
def test_PrefixIdentifier_empty(self):
pi = PrefixIdentifier(None)
self.assertEqual(pi.hello, 'hello')
self.assertEqual(pi['with spaces'], 'with spaces')
self.assertEqual(pi['\x1b\x00'], '\x1b\0')
self.assertEqual(pi.same_thing, pi['same_thing'])
with self.assertRaises(ValidationError):
pi[0]
self.assertEqual(f'{PrefixIdentifier(None)}', f'{PrefixIdentifier("")}')
def test_PrefixIdentifier_get_nostr(self):
with self.assertRaises(TypeError):
pi = PrefixIdentifier(1)
pi.hello
with self.assertRaises(TypeError):
PrefixIdentifier([99182])
with self.assertRaises(TypeError):
PrefixIdentifier(b'alpha_')