# (C) 2020-2021 Sakuragasaki46. # See LICENSE for copying info. ''' A simple wiki-like note webapp. Pages are stored in SQLite/MySQL databases. Markdown is used for text formatting. Application is kept compact, with all its core in a single file. ''' #### IMPORTS #### from flask import ( Flask, Markup, abort, flash, g, jsonify, make_response, redirect, request, render_template, send_from_directory) from flask_login import LoginManager, login_user, logout_user, current_user, login_required from flask_wtf import CSRFProtect from werkzeug.security import generate_password_hash, check_password_hash from werkzeug.routing import BaseConverter from peewee import * from playhouse.db_url import connect as dbconnect import calendar, datetime, hashlib, html, importlib, json, markdown, os, random, \ re, sys, warnings from functools import lru_cache, partial from urllib.parse import quote from configparser import ConfigParser import i18n import gzip __version__ = '0.7-dev' #### CONSTANTS #### APP_BASE_DIR = os.path.dirname(__file__) FK = ForeignKeyField SLUG_RE = r'[a-z0-9]+(?:-[a-z0-9]+)*' ILINK_RE = r'\]\(/(p/\d+|' + SLUG_RE + ')/?\)' USERNAME_RE = r'[a-z0-9_-]{3,30}' PING_RE = r'(?' + match.group(1) + '' # XXX ugly monkeypatch to make spoilers prevail over blockquotes from markdown.blockprocessors import BlockQuoteProcessor BlockQuoteProcessor.RE = re.compile(r'(^|\n)[ ]{0,3}>(?!!)[ ]?(.*)') ### XXX it currently only detects spoilers that are not at the beginning of the line. To be fixed. class SpoilerExtension(markdown.extensions.Extension): def extendMarkdown(self, md, md_globals=None): md.inlinePatterns.register(markdown.inlinepatterns.SimpleTagInlineProcessor(r'()>!(.*?)!<', 'span class="spoiler"'), 'spoiler', 14) #class PingExtension(markdown.extensions.Extension): # def extendMarkdown(self, md): # pass #### DATABASE SCHEMA #### database_url = _getconf('database', 'url') if database_url: database = dbconnect(database_url) else: database = SqliteDatabase(_getconf("database", "directory") + '/data.sqlite') class BaseModel(Model): class Meta: database = database # Used for PagePolicy def _passphrase_hash(pp): pp_bin = pp.encode('utf-8') h = str(len(pp_bin)) + ':' + hashlib.sha256(pp_bin).hexdigest() return h class User(BaseModel): username = CharField(32, unique=True) email = CharField(256, null=True) password = CharField() join_date = DateTimeField(default=datetime.datetime.now) karma = IntegerField(default=1) privileges = BitField() is_admin = privileges.flag(1) # helpers for flask_login @property def is_anonymous(self): return False @property def is_active(self): return True @property def is_authenticated(self): return True class Page(BaseModel): url = CharField(64, unique=True, null=True) title = CharField(256, index=True) touched = DateTimeField(index=True) calendar = DateTimeField(index=True, null=True) owner = ForeignKeyField(User, null=True) flags = BitField() is_redirect = flags.flag(1) is_sync = flags.flag(2) is_math_enabled = flags.flag(4) is_locked = flags.flag(8) @property def latest(self): if self.revisions: return self.revisions.order_by(PageRevision.pub_date.desc())[0] def get_url(self): return '/' + self.url + '/' if self.url else '/p/{}/'.format(self.id) def short_desc(self): full_text = self.latest.text text = remove_tags(full_text, convert = not self.is_math_enabled and not _getconf('appearance', 'simple_remove_tags', False)) return text[:200] + ('\u2026' if len(text) > 200 else '') def change_tags(self, new_tags): old_tags = set(x.name for x in self.tags) new_tags = set(new_tags) PageTag.delete().where((PageTag.page == self) & (PageTag.name << (old_tags - new_tags))).execute() for tag in (new_tags - old_tags): PageTag.create(page=self, name=tag) def js_info(self): latest = self.latest return dict( id=self.id, url=self.url, title=self.title, is_redirect=self.is_redirect, touched=self.touched.timestamp(), is_editable=self.is_editable(), latest=dict( id=latest.id if latest else None, length=latest.length, pub_date=latest.pub_date.timestamp() if latest and latest.pub_date else None ), tags=[x.name for x in self.tags] ) @property def prop(self): return PagePropertyDict(self) def is_editable(self): return not self.is_locked def can_edit(self, user): if self.is_locked: return user.id == self.owner.id return True def is_owned_by(self, user): return user.id == self.owner.id class PageText(BaseModel): content = BlobField() flags = BitField() is_utf8 = flags.flag(1) is_gzipped = flags.flag(2) def get_content(self): c = self.content if self.is_gzipped: c = gzip.decompress(c) if self.is_utf8: return c.decode('utf-8') else: return c.decode('latin-1') @classmethod def create_content(cls, text, *, treshold=600, search_dup=True): c = text.encode('utf-8') use_gzip = len(c) > treshold if use_gzip and gzip: c = gzip.compress(c) if search_dup: item = cls.get_or_none((cls.content == c) & (cls.is_gzipped == use_gzip)) if item: return item return cls.create( content=c, is_utf8=True, is_gzipped=use_gzip ) class PageRevision(BaseModel): page = FK(Page, backref='revisions', index=True) user = ForeignKeyField(User, backref='contributions', null=True) comment = CharField(1024, default='') textref = FK(PageText) pub_date = DateTimeField(index=True) length = IntegerField() @property def text(self): return self.textref.get_content() def html(self, *, math=True): return md(self.text, math=self.page.is_math_enabled and math) def human_pub_date(self): delta = datetime.datetime.now() - self.pub_date T = partial(get_string, g.lang) if delta < datetime.timedelta(seconds=60): return T('just-now') elif delta < datetime.timedelta(seconds=3600): return T('n-minutes-ago').format(delta.seconds // 60) elif delta < datetime.timedelta(days=1): return T('n-hours-ago').format(delta.seconds // 3600) elif delta < datetime.timedelta(days=15): return T('n-days-ago').format(delta.days) else: return self.pub_date.strftime('%B %-d, %Y') class PageTag(BaseModel): page = FK(Page, backref='tags', index=True) name = CharField(64, index=True) class Meta: indexes = ( (('page', 'name'), True), ) def popularity(self): return PageTag.select().where(PageTag.name == self.name).count() class PageProperty(BaseModel): page = ForeignKeyField(Page, backref='page_meta', index=True) key = CharField(64) value = CharField(8000) class Meta: indexes = ( (('page', 'key'), True), ) # currently experimental class PagePropertyDict(object): def __init__(self, page): self._page = page def items(self): for kv in self._page.page_meta: yield kv.key, kv.value def __len__(self): return self._page.page_meta.count() def keys(self): for kv in self._page.page_meta: yield kv.key __iter__ = keys def __getitem__(self, key): try: return self._page.page_meta.get(PageProperty.key == key).value except PageProperty.DoesNotExist: raise KeyError(key) def get(self, key, default=None): try: return self._page.page_meta.get(PageProperty.key == key).value except PageProperty.DoesNotExist: return default def setdefault(self, key, default): try: return self._page.page_meta.get(PageProperty.key == key).value except PageProperty.DoesNotExist: self[key] = default return default def __setitem__(self, key, value): if key in self: pp = self._page.page_meta.get(PageProperty.key == key) pp.value = value pp.save() else: PageProperty.create(page=self._page, key=key, value=value) def __delitem__(self, key): PageProperty.delete().where((PageProperty.page == self._page) & (PageProperty.key == key)).execute() def __contains__(self, key): return PageProperty.select().where((PageProperty.page == self._page) & (PageProperty.key == key)).exists() # Link table for caching purposes. class PageLink(BaseModel): from_page = FK(Page, backref='forward_links') to_page = FK(Page, backref='back_links') class Meta: indexes = ( (('from_page', 'to_page'), True), ) @classmethod def parse_links(cls, from_page, text, erase=True): with database.atomic(): old_links = list(cls.select().where(cls.from_page == from_page)) for mo in re.finditer(ILINK_RE, text): try: pageurl = mo.group(1) if pageurl.startswith('p/'): to_page = Page[int(pageurl[2:])] else: to_page = Page.get(Page.url == pageurl) linkobj, created = PageLink.get_or_create( from_page = from_page, to_page = to_page) if linkobj in old_links: old_links.remove(linkobj) except Exception: continue if erase: for linkobj in old_links: linkobj.delete_instance() # The actual ULTIMATE method to refresh all links # To be called from a maintenance script only! @classmethod def refresh_all_links(cls): for p in Page.select(): cls.parse_links(p, p.latest.text) def init_db(): database.create_tables([ Page, PageText, PageRevision, PageTag, PageProperty, PageLink ]) #### WIKI SYNTAX #### def md(text, expand_magic=False, toc=True, math=True): if expand_magic: # DEPRECATED seeking for a better solution. warnings.warn('Magic words are no more supported.', DeprecationWarning) extensions = ['tables', 'footnotes', 'fenced_code', 'sane_lists'] extension_configs = {} if not _getconf('markdown', 'disable_custom_extensions'): extensions.append(StrikethroughExtension()) extensions.append(SpoilerExtension()) if toc: extensions.append('toc') if math and markdown_katex and ('$`' in text or '```math' in text): extensions.append('markdown_katex') extension_configs['markdown_katex'] = { 'no_inline_svg': True, 'insert_fonts_css': not _getconf('site', 'katex_url') } try: return markdown.Markdown(extensions=extensions, extension_configs=extension_configs).convert(text) except Exception as e: return '
There was an error during rendering: {e.__class__.__name__}: {e}
'.format(e=e) def remove_tags(text, convert=True, headings=True): if headings: text = re.sub(r'\#[^\n]*', '', text) if convert: text = md(text, toc=False, math=False) return re.sub(r'<.*?>', '', text) def is_username(s): return re.match('^' + USERNAME_RE + '$', s) #### I18N #### i18n.load_path.append(os.path.join(APP_BASE_DIR, 'i18n')) i18n.set('file_format', 'json') def get_string(loc, s): i18n.set('locale', loc) return i18n.t('salvi.' + s) #### APPLICATION CONFIG #### class SlugConverter(BaseConverter): regex = SLUG_RE def is_valid_url(url): return re.fullmatch(SLUG_RE, url) def is_url_available(url): return url not in forbidden_urls and not Page.select().where(Page.url == url).exists() forbidden_urls = [ 'about', 'accounts', 'ajax', 'backlinks', 'calendar', 'circles', 'create', 'easter', 'edit', 'embed', 'help', 'history', 'init-config', 'kt', 'manage', 'media', 'p', 'privacy', 'protect', 'search', 'static', 'stats', 'tags', 'terms', 'u', 'upload', 'upload-info' ] app = Flask(__name__) app.secret_key = b'\xf3\xa9?\xbee$L\xabA\xd3\r\xa2\x08\xf6\x00%0b\xa9\xfe\x11\x04\xa6\xd8=\xd3\xa2\x00\xb3\xd5;9' app.url_map.converters['slug'] = SlugConverter csrf = CSRFProtect(app) login_manager = LoginManager(app) login_manager.login_view = 'accounts_login' #### ROUTES #### @app.before_request def _before_request(): if request.args.get('uselang') is not None: lang = request.args['uselang'] else: for l in request.headers.get('accept-language', 'it,en').split(','): if ';' in l: l, _ = l.split(';') lang = l break else: lang = 'en' g.lang = lang @app.context_processor def _inject_variables(): return { 'T': partial(get_string, g.lang), 'app_name': _getconf('site', 'title'), 'strong': lambda x:Markup('{0}').format(x), 'app_version': __version__, 'math_version': markdown_katex.__version__ if markdown_katex else None, 'material_icons_url': _getconf('site', 'material_icons_url'), 'katex_url': _getconf('site', 'katex_url') } @login_manager.user_loader def _inject_user(userid): return User[userid] @app.template_filter() def linebreaks(text): text = html.escape(text) text = text.replace("\n\n", '').replace('\n', '
')
return Markup(text)
@app.route('/')
def homepage():
page_limit = _getconf("appearance","items_per_page",20,cast=int)
return render_template('home.jinja2', new_notes=Page.select()
.order_by(Page.touched.desc()).limit(page_limit))
@app.route('/robots.txt')
def robots():
return send_from_directory(APP_BASE_DIR, 'robots.txt')
@app.route('/favicon.ico')
def favicon():
return send_from_directory(APP_BASE_DIR, 'favicon.ico')
## error handlers ##
@app.errorhandler(404)
def error_404(body):
return render_template('notfound.jinja2'), 404
@app.errorhandler(403)
def error_403(body):
return render_template('forbidden.jinja2'), 403
@app.errorhandler(400)
def error_400(body):
return render_template('badrequest.jinja2'), 400
@app.errorhandler(500)
def error_500(body):
return render_template('internalservererror.jinja2'), 500
# Middle point during page editing.
def savepoint(form, is_preview=False, pageobj=None):
if is_preview:
preview = md(form['text'], math='enablemath' in form)
else:
preview = None
pl_js_info = dict()
pl_js_info['editing'] = dict(
original_text = None, # TODO
preview_text = form['text'],
page_id = pageobj.id if pageobj else None
)
return render_template(
'edit.jinja2',
pl_url=form['url'],
pl_title=form['title'],
pl_text=form['text'],
pl_tags=form['tags'],
pl_comment=form['comment'],
pl_enablemath='enablemath' in form,
pl_is_locked='lockpage' in form,
pl_owner_is_current_user=pageobj.is_owned_by(current_user) if pageobj else True,
preview=preview,
pl_js_info=pl_js_info,
pl_calendar=('usecalendar' in form) and form['calendar'],
pl_readonly=not pageobj.can_edit(current_user) if pageobj else False
)
@app.route('/create/', methods=['GET', 'POST'])
@login_required
def create():
if request.method == 'POST':
if request.form.get('preview'):
return savepoint(request.form, is_preview=True)
p_url = request.form['url'] or None
if p_url:
if not is_valid_url(p_url):
flash('Invalid URL. Valid URLs contain only letters, numbers and hyphens.')
return savepoint(request.form)
elif not is_url_available(p_url):
flash('This URL is not available.')
return savepoint(request.form)
p_tags = [x.strip().lower().replace(' ', '-').replace('_', '-').lstrip('#')
for x in request.form.get('tags', '').split(',') if x]
if any(not re.fullmatch(SLUG_RE, x) for x in p_tags):
flash('Invalid tags text. Tags contain only letters, numbers and hyphens, and are separated by comma.')
return savepoint(request.form)
try:
p = Page.create(
url=p_url,
title=request.form['title'],
is_redirect=False,
touched=datetime.datetime.now(),
owner_id=current_user.id,
calendar=datetime.date.fromisoformat(request.form["calendar"]) if 'usecalendar' in request.form else None,
is_math_enabled='enablemath' in request.form,
is_locked = 'lockpage' in request.form
)
p.change_tags(p_tags)
except IntegrityError as e:
flash('An error occurred while saving this revision: {e}'.format(e=e))
return savepoint(request.form)
pr = PageRevision.create(
page=p,
user_id=p.owner.id,
comment='',
textref=PageText.create_content(request.form['text']),
pub_date=datetime.datetime.now(),
length=len(request.form['text'])
)
PageLink.parse_links(p, request.form['text'])
return redirect(p.get_url())
return savepoint({
"url": request.args.get("url"),
"title": "",
"text": "",
"tags": "",
"comment": get_string(g.lang, "page-created")
})
@app.route('/edit/{0}