# (C) 2020-2021 Sakuragasaki46.
# See LICENSE for copying info.
'''
A simple wiki-like note webapp.
Pages are stored in SQLite databases.
Markdown is used for text formatting.
Application is kept compact, with all its core in a single file.
Extensions are supported (?), kept in extensions/ folder.
'''
#### IMPORTS ####
from flask import (
Flask, Markup, abort, flash, g, jsonify, make_response, redirect, request,
render_template, send_from_directory)
from werkzeug.routing import BaseConverter
from peewee import *
import (
csv, datetime, hashlib, html, importlib, json, markdown, os, random, re,
sys, uuid, warnings)
from functools import lru_cache, partial
from urllib.parse import quote
from configparser import ConfigParser
try:
import gzip
except ImportError:
gzip = None
try:
from slugify import slugify
except ImportError:
slugify = None
try:
import markdown_strikethrough
except Exception:
markdown_strikethrough = None
__version__ = '0.4'
#### CONSTANTS ####
APP_BASE_DIR = os.path.dirname(__file__)
FK = ForeignKeyField
SLUG_RE = r'[a-z0-9]+(?:-[a-z0-9]+)*'
MAGIC_RE = r'\{\{\s*(' + SLUG_RE + ')\s*:\s*(.*?)\s*\}\}'
REDIRECT_RE = r'\{\{\s*redirect\s*:\s*(\d+)\s*\}\}'
upload_types = {'jpeg': 1, 'jpg': 1, 'png': 2}
upload_types_rev = {1: 'jpg', 2: 'png'}
UPLOAD_DIR = APP_BASE_DIR + '/media'
DATABASE_DIR = APP_BASE_DIR + "/database"
#### GENERAL CONFIG ####
DEFAULT_CONF = {
('site', 'title'): 'Salvi',
('config', 'media_dir'): APP_BASE_DIR + '/media',
('config', 'database_dir'): APP_BASE_DIR + "/database",
}
_cfp = ConfigParser()
if _cfp.read([APP_BASE_DIR + '/site.conf']):
@lru_cache(maxsize=50)
def _getconf(k1, k2, fallback=None, cast=None):
if fallback is None:
fallback = DEFAULT_CONF.get((k1, k2))
v = _cfp.get(k1, k2, fallback=fallback)
if cast in (int, float, str):
try:
v = cast(v)
except ValueError:
v = fallback
return v
else:
def _getconf(k1, k2, fallback=None, cast=None):
if fallback is None:
fallback = DEFAULT_CONF.get((k1, k2))
return fallback
#### misc. helpers ####
def _makelist(l):
if isinstance(l, (str, bytes, bytearray)):
return [l]
elif hasattr(l, '__iter__'):
return list(l)
elif l:
return [l]
else:
return []
#### DATABASE SCHEMA ####
database = SqliteDatabase(_getconf("config", "database_dir") + '/data.sqlite')
class BaseModel(Model):
class Meta:
database = database
# Used for PagePolicy
def _passphrase_hash(pp):
pp_bin = pp.encode('utf-8')
h = str(len(pp_bin)) + ':' + hashlib.sha256(pp_bin).hexdigest()
return h
class Page(BaseModel):
url = CharField(64, unique=True, null=True)
title = CharField(256, index=True)
touched = DateTimeField(index=True)
flags = BitField()
is_redirect = flags.flag(1)
is_sync = flags.flag(2)
@property
def latest(self):
if self.revisions:
return self.revisions.order_by(PageRevision.pub_date.desc())[0]
def get_url(self):
return '/' + self.url + '/' if self.url else '/p/{}/'.format(self.id)
def short_desc(self):
text = remove_tags(self.latest.text)
return text[:200] + ('\u2026' if len(text) > 200 else '')
def change_tags(self, new_tags):
old_tags = set(x.name for x in self.tags)
new_tags = set(new_tags)
PageTag.delete().where((PageTag.page == self) &
(PageTag.name << (old_tags - new_tags))).execute()
for tag in (new_tags - old_tags):
PageTag.create(page=self, name=tag)
def js_info(self):
latest = self.latest
return dict(
id=self.id,
url=self.url,
title=self.title,
is_redirect=self.is_redirect,
touched=self.touched.timestamp(),
is_editable=self.is_editable(),
latest=dict(
id=latest.id if latest else None,
length=latest.length,
pub_date=latest.pub_date.timestamp() if latest and latest.pub_date else None
),
tags=[x.name for x in self.tags]
)
@property
def prop(self):
return PagePropertyDict(self)
def unlock(self, perm, pp, sec):
## XX complete later!
policies = self.policies.where(PagePolicy.type << _makelist(perm))
if not policies.exists():
return True
for policy in policies:
if policy.verify(pp, sec):
return True
return False
def is_locked(self, perm):
policies = self.policies.where(PagePolicy.type << _makelist(perm))
return policies.exists()
def is_classified(self):
return self.is_locked(POLICY_CLASSIFY)
def is_editable(self):
return not self.is_locked(POLICY_EDIT)
class PageText(BaseModel):
content = BlobField()
flags = BitField()
is_utf8 = flags.flag(1)
is_gzipped = flags.flag(2)
def get_content(self):
c = self.content
if self.is_gzipped:
c = gzip.decompress(c)
if self.is_utf8:
return c.decode('utf-8')
else:
return c.decode('latin-1')
@classmethod
def create_content(cls, text, treshold=600, search_dup=True):
c = text.encode('utf-8')
use_gzip = len(c) > treshold
if use_gzip and gzip:
c = gzip.compress(c)
if search_dup:
item = cls.get_or_none((cls.content == c) & (cls.is_gzipped == use_gzip))
if item:
return item
return cls.create(
content=c,
is_utf8=True,
is_gzipped=use_gzip
)
class PageRevision(BaseModel):
page = FK(Page, backref='revisions', index=True)
user_id = IntegerField(default=0)
comment = CharField(1024, default='')
textref = FK(PageText)
pub_date = DateTimeField(index=True)
length = IntegerField()
@property
def text(self):
return self.textref.get_content()
def html(self):
return md(self.text)
def human_pub_date(self):
delta = datetime.datetime.now() - self.pub_date
T = partial(get_string, g.lang)
if delta < datetime.timedelta(seconds=60):
return T('just-now')
elif delta < datetime.timedelta(seconds=3600):
return T('n-minutes-ago').format(delta.seconds // 60)
elif delta < datetime.timedelta(days=1):
return T('n-hours-ago').format(delta.seconds // 3600)
elif delta < datetime.timedelta(days=15):
return T('n-days-ago').format(delta.days)
else:
return self.pub_date.strftime('%B %-d, %Y')
class PageTag(BaseModel):
page = FK(Page, backref='tags', index=True)
name = CharField(64, index=True)
class Meta:
indexes = (
(('page', 'name'), True),
)
def popularity(self):
return PageTag.select().where(PageTag.name == self.name).count()
class PageProperty(BaseModel):
page = ForeignKeyField(Page, backref='page_meta', index=True)
key = CharField(64)
value = CharField(8000)
class Meta:
indexes = (
(('page', 'key'), True),
)
# currently experimental
class PagePropertyDict(object):
def __init__(self, page):
self._page = page
def items(self):
for kv in self._page.page_meta:
yield kv.key, kv.value
def __len__(self):
return self._page.page_meta.count()
def keys(self):
for kv in self._page.page_meta:
yield kv.key
__iter__ = keys
def __getitem__(self, key):
try:
return self._page.page_meta.get(PageProperty.key == key).value
except PageProperty.DoesNotExist:
raise KeyError(key)
def get(self, key, default=None):
try:
return self._page.page_meta.get(PageProperty.key == key).value
except PageProperty.DoesNotExist:
return default
def setdefault(self, key, default):
try:
return self._page.page_meta.get(PageProperty.key == key).value
except PageProperty.DoesNotExist:
self[key] = default
return default
def __setitem__(self, key, value):
if key in self:
pp = self._page.page_meta.get(PageProperty.key == key)
pp.value = value
pp.save()
else:
PageProperty.create(page=self._page, key=key, value=value)
def __delitem__(self, key):
PageProperty.delete().where((PageProperty.page == self._page) &
(PageProperty.key == key)).execute()
def __contains__(self, key):
return PageProperty.select().where((PageProperty.page == self._page) &
(PageProperty.key == key)).exists()
# Store keys for PagePolicy.
# Experimental.
class PagePolicyKey(BaseModel):
passphrase = CharField()
sec_code = IntegerField()
class Meta:
indexes = (
(('passphrase','sec_code'), True),
)
@classmethod
def create_from_plain(cls, pp, sec):
PagePolicyKey.create(passphrase=_passphrase_hash(pp), sec_code=sec)
def verify(self, pp, sec):
h = _passphrase_hash(pp)
return self.passphrase == h and self.sec_code == sec
POLICY_ADMIN = 1
POLICY_READ = 2
POLICY_EDIT = 3
POLICY_META = 4
POLICY_CLASSIFY = 5
# Manage policies for pages (e.g., reading or editing).
# Experimental.
class PagePolicy(BaseModel):
page = FK(Page, backref='policies', index=True, null=True)
type = IntegerField()
key = FK(PagePolicyKey, backref='applied_to')
sitewide = IntegerField(default=0)
class Meta:
indexes = (
(('page', 'key'), True),
)
class Upload(BaseModel):
name = CharField(256)
url_name = CharField(256, null=True)
filetype = SmallIntegerField()
filesize = IntegerField()
upload_date = DateTimeField(index=True)
md5 = CharField(32, index=True)
@property
def filepath(self):
return '{0}/{1}/{2}{3}.{4}'.format(self.md5[:1], self.md5[:2], self.id,
'-' + self.url_name if self.url_name else '', upload_types_rev[self.filetype])
@property
def url(self):
return '/media/' + self.filepath
def get_content(self, check=True):
with open(os.path.join(UPLOAD_DIR, self.filepath)) as f:
content = f.read()
if check:
if len(content) != self.filesize:
raise AssertionError('file is corrupted')
if hashlib.md5(content).hexdigest() != self.md5:
raise AssertionError('file is corrupted')
return content
@classmethod
def create_content(cls, name, ext, content):
ext = ext.lstrip('.')
if ext not in upload_types:
raise ValueError('invalid file type')
filetype = upload_types[ext]
name = name[:256]
if slugify:
url_name = slugify(name)[:256]
else:
url_name = None
filemd5 = hashlib.md5(content).hexdigest()
basepath = os.path.join(UPLOAD_DIR, filemd5[:1], filemd5[:2])
if not os.path.exists(basepath):
os.makedirs(basepath)
obj = cls.create(
name=name,
url_name=url_name,
filetype=filetype,
filesize=len(content),
upload_date=datetime.datetime.now(),
md5=filemd5
)
try:
with open(os.path.join(basepath, '{0}{1}.{2}'.format(obj.id,
'-' + url_name if url_name else '', upload_types_rev[filetype]
)), 'wb') as f:
f.write(content)
except OSError:
cls.delete_by_id(obj.id)
raise
return obj
def init_db():
database.create_tables([Page, PageText, PageRevision, PageTag, PageProperty, PagePolicyKey, PagePolicy, Upload])
#### WIKI SYNTAX ####
magic_word_filters = {}
def _replace_magic_word(match):
name = match.group(1)
if name not in magic_word_filters:
return match.group()
f = magic_word_filters[name]
try:
return f(*(x.strip() for x in match.group(2).split('|')))
except Exception:
return ''
def expand_magic_words(text):
'''
Replace the special markups in double curly brackets.
Unknown keywords are not replaced. Valid keywords with invalid arguments are replaced with nothing.
'''
return re.sub(MAGIC_RE, _replace_magic_word, text)
def md(text, expand_magic=False, toc=True):
if expand_magic:
# DEPRECATED seeking for a better solution.
warnings.warn('Magic words are no more supported.', DeprecationWarning)
text = expand_magic_words(text)
extensions = ['tables', 'footnotes', 'fenced_code', 'sane_lists']
if markdown_strikethrough:
extensions.append("markdown_strikethrough.extension")
if toc:
extensions.append('toc')
return markdown.Markdown(extensions=extensions).convert(text)
def remove_tags(text, convert=True, headings=True):
if headings:
text = re.sub(r'\#[^\n]*', '', text)
if convert:
text = md(text, expand_magic=False, toc=False)
return re.sub(r'<.*?>|\{\{.*?\}\}', '', text)
### Magic words (deprecated!) ###
def expand_backto(pageid):
p = Page[pageid]
return '*« Main article: [{}]({}).*'.format(html.escape(p.title), p.get_url())
magic_word_filters['backto'] = expand_backto
def expand_upload(id, *opt):
try:
upload = Upload[id]
except Upload.DoesNotExist:
return ''
if opt:
desc = opt[-1]
else:
desc = None
classname = 'fig-right'
return '{3}
{3}
').replace('\n', '
')
return Markup(text)
@app.route('/')
def homepage():
page_limit = _getconf("appearance","items_per_page",20,cast=int)
return render_template('home.html', new_notes=Page.select()
.order_by(Page.touched.desc()).limit(page_limit),
gallery=make_gallery((x, '') for x in Upload.select().order_by(Upload.upload_date.desc()).limit(3)))
@app.route('/robots.txt')
def robots():
return send_from_directory(APP_BASE_DIR, 'robots.txt')
@app.route('/favicon.ico')
def favicon():
return send_from_directory(APP_BASE_DIR, 'favicon.ico')
## error handlers ##
@app.errorhandler(404)
def error_404(body):
return render_template('notfound.html'), 404
@app.errorhandler(403)
def error_403(body):
return render_template('forbidden.html'), 403
@app.errorhandler(500)
def error_400(body):
return render_template('badrequest.html'), 400
# Middle point during page editing.
def savepoint(form, is_preview=False):
if is_preview:
preview = md(form['text'])
else:
preview = None
pl_js_info = dict()
pl_js_info['editing'] = dict(
original_text = None, # TODO
preview_text = form['text'],
)
return render_template('edit.html', pl_url=form['url'], pl_title=form['title'], pl_text=form['text'], pl_tags=form['tags'], preview=preview, pl_js_info=pl_js_info)
@app.route('/create/', methods=['GET', 'POST'])
def create():
if request.method == 'POST':
if request.form.get('preview'):
return savepoint(request.form, is_preview=True)
p_url = request.form['url'] or None
if p_url:
if not is_valid_url(p_url):
flash('Invalid URL. Valid URLs contain only letters, numbers and hyphens.')
return savepoint(request.form)
elif not is_url_available(p_url):
flash('This URL is not available.')
return savepoint(request.form)
p_tags = [x.strip().lower().replace(' ', '-').replace('_', '-').lstrip('#')
for x in request.form.get('tags', '').split(',') if x]
if any(not re.fullmatch(SLUG_RE, x) for x in p_tags):
flash('Invalid tags text. Tags contain only letters, numbers and hyphens, and are separated by comma.')
return savepoint(request.form)
try:
p = Page.create(
url=p_url,
title=request.form['title'],
is_redirect=False,
touched=datetime.datetime.now(),
)
p.change_tags(p_tags)
except IntegrityError as e:
flash('An error occurred while saving this revision: {e}'.format(e=e))
return savepoint(request.form)
pr = PageRevision.create(
page=p,
user_id=0,
comment='',
textref=PageText.create_content(request.form['text']),
pub_date=datetime.datetime.now(),
length=len(request.form['text'])
)
return redirect(p.get_url())
return render_template('edit.html', pl_url=request.args.get('url'))
@app.route('/edit/{0}