xefyl/xefyl/models.py

265 lines
7.9 KiB
Python

from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import BigInteger, Boolean, CheckConstraint, Column, DateTime, Float, ForeignKey, Integer, SmallInteger, String, Table, UniqueConstraint, func, insert, select, delete, text
from sqlalchemy.orm import declarative_base, relationship
from .hashing import Dhash
from .interutils import show_image
## constants
IMG_EXTENSIONS = {
'jpeg': 1, 'jpg': 1, 'png': 2, 'webp': 3,
'gif': 4, 'svg': 5, 'heic': 6
}
IMG_EXTENSIONS_REV = {
1: 'jpg', 2: 'png', 3: 'webp', 4: 'gif', 5: 'svg', 6: 'heic',
101: 'mp4'
}
VIDEO_EXTENSIONS = {
'mp4': 101
}
IMG_TYPE_CAMERA = 1
IMG_TYPE_DOWNLOAD = 2
IMG_TYPE_EDIT = 3
IMG_TYPE_SCREENSHOT = 4
IMG_FORMAT_JPEG = 1
IMG_FORMAT_PNG = 2
IMG_FORMAT_WEBP = 3
IMG_FORMAT_GIF = 4
IMG_FORMAT_SVG = 5
## END constants
## NOTE HEIC capabilities require pillow_heif
Base = declarative_base()
db = SQLAlchemy(model_class=Base)
class User(Base):
__tablename__ = "user"
id = Column(BigInteger, primary_key=True)
username = Column(String(30), CheckConstraint("username = lower(username) and username ~ '^[a-z0-9_-]+$'", name="user_valid_username"), unique=True)
display_name = Column(String(64), nullable=True)
passhash = Column(String(256), nullable=True)
joined_at = Column(String(256), nullable=True)
collections = relationship("ImageCollection", back_populates="owner")
class Image(Base):
__tablename__ = "image"
id = Column(BigInteger, primary_key=True)
type = Column(SmallInteger, index=True)
format = Column(SmallInteger) # e.g. .jpg, .png
name = Column(String(256), index=True)
hash = Column(String(256), index=True)
date = Column(DateTime, index=True)
size = Column(BigInteger)
# XXX Unique Indexes this length do not work with MySQL.
# Use PostgreSQL or SQLite instead.
pathname = Column(String(8192), unique=True)
tags = relationship(
"ImageTag",
back_populates="image"
)
in_collections = relationship(
"ImageCollection",
secondary="imageincollection",
back_populates="images"
)
@property
def hash_obj(self):
return Dhash(self.hash)
def url(self):
return f"/0/{self.hash_obj.diff}/{self.name}.{IMG_EXTENSIONS_REV[self.format]}"
def filename(self):
return f'{self.name}.{IMG_EXTENSIONS_REV[self.format]}'
def thumbnail_16x16_url(self):
return f'/thumb/16x16/{self.hash}.png'
@property
def category_text(self):
# deprecated
return 'Uncategorized'
def get_tags(self):
return [t.name for t in self.tags]
# TODO methods: set_tags, add_tags, replace_tags, show_i
def set_tags(self, replace: bool, tags):
with get_db().Session() as sess:
old_s = set(self.get_tags())
new_s = set(tags)
added_s = new_s - old_s
remo_s = old_s - new_s
for t in new_s:
sess.execute(insert(ImageTag).values(
image = self,
name = t,
))
if replace:
sess.execute(delete(ImageTag).where(ImageTag.image == self, ImageTag.name.in_(remo_s)))
sess.commit()
def show_i(self):
show_image(self.pathname)
class ImageTag(Base):
__tablename__ = "imagetag"
id = Column(BigInteger, primary_key = True)
image_id = Column(BigInteger, ForeignKey("image.id"))
name = Column(String(64))
image = relationship("Image", back_populates="tags")
__table_args__ = (
UniqueConstraint("image_id", "name"),
)
...
class ImageCollection(Base):
__tablename__ = "imagecollection"
id = Column(BigInteger, primary_key = True)
name = Column(String(128), nullable=False)
owner_id = Column(BigInteger, ForeignKey("user.id"), nullable=True)
description = Column(String(4096), nullable=True)
owner = relationship("User", back_populates="collections")
images = relationship("Image", back_populates="in_collections", secondary="imageincollection")
ImageInCollection = Table(
"imageincollection",
Base.metadata,
Column("image_id", BigInteger, ForeignKey("image.id"), primary_key=True),
Column("collection_id", BigInteger, ForeignKey("imagecollection.id"), primary_key=True),
Column("since", DateTime, server_default=func.current_timestamp(), index=True),
)
## TODO maybe merge it with Image?
class Video(Base):
__tablename__ = "video"
id = Column(BigInteger, primary_key = True)
type = Column(SmallInteger, index=True)
format = Column(SmallInteger) # e.g. .mp4
name = Column(String(256), index=True)
duration = Column(Float, index=True) # in seconds
date = Column(DateTime, index=True)
size = Column(BigInteger)
# XXX Unique Indexes this length do not work with MySQL.
# Use PostgreSQL or SQLite instead.
pathname = Column(String(8192), unique=True)
# helper methods
def url(self):
return '/video/{}.{}'.format(self.name, IMG_EXTENSIONS_REV[self.format] )
def filename(self):
return '{}.{}'.format(self.name, IMG_EXTENSIONS_REV[self.format])
def duration_s(self):
if self.duration > 3600.:
return "{0}:{1:02}:{2:02}".format(
int(self.duration // 3600),
int(self.duration // 60 % 60),
int(self.duration % 60)
)
else:
return "{0:02}:{1:02}".format(
int(self.duration // 60),
int(self.duration % 60)
)
class Page(Base):
__tablename__ = "page"
id = Column(BigInteger, primary_key=True)
url = Column(String(4096), unique=True, nullable=False)
title = Column(String(1024), index=True, nullable=False)
description = Column(String(1024), nullable=True)
# renamed from pub_date
created_at = Column(DateTime, server_default=func.current_timestamp(), index=True, nullable=False)
is_robots = Column(Boolean, server_default=text('false'))
class Href(Base):
__tablename__ = 'href'
id = Column(BigInteger, primary_key=True)
page_id = Column(BigInteger, ForeignKey('page.id'), nullable=True, unique=True)
url = Column(String(4096), unique=True, nullable=False)
@classmethod
def find(self, name: str):
hr = db.session.execute(
select(Href).where(Href.content == name)
).scalar()
if hr is None:
hr = db.session.execute(insert(Href).values(
url = name,
page_id = None
).returning(Href)).scalar()
return hr
class PageLink(Base):
__tablename__ = 'pagelink'
id = Column(BigInteger,primary_key=True)
from_page_id= Column(BigInteger, ForeignKey('page.id'), nullable=False)
to_page_id = Column(BigInteger, ForeignKey('page.id'), nullable=False)
rank = Column(Integer, server_default=text('1'), nullable=False)
__table_args__ = (
UniqueConstraint('from_page_id', 'to_page_id'),
)
class Word(Base):
__tablename__ = 'word'
id = Column(BigInteger, primary_key=True)
content = Column(String(256), unique=True, nullable=False)
parent_id = Column(BigInteger, ForeignKey('word.id'), nullable=True)
@classmethod
def find(self, name: str):
wo = db.session.execute(
select(Word).where(Word.content == name)
).scalar()
if wo is None:
wo = db.session.execute(insert(Word).values(
content = name,
parent_id = None
).returning(Word)).scalar()
return wo
class AnchorWord(Base):
__tablename__ = "anchorword"
id = Column(BigInteger,primary_key=True)
word_id = Column(BigInteger, ForeignKey('word.id'), index = True, nullable=False)
from_page_id = Column(BigInteger, ForeignKey('page.id'))
to_page_id = Column(BigInteger, ForeignKey('href.id'))
count = Column(Integer, server_default=text('1'))
...
## END MODELS
if __name__ == '__main__':
from . import CSI