salvi/extensions/instagram.py

173 lines
5.7 KiB
Python

from flask import Blueprint, render_template
from peewee import *
import instagram_private_api, json, os, sys, random, codecs
database = SqliteDatabase('instagram.sqlite')
class BaseModel(Model):
class Meta:
database = database
class InstagramProfile(BaseModel):
p_id = IntegerField()
p_username = CharField(30)
p_full_name = CharField(30)
p_biography = CharField(150)
posts_count = IntegerField()
followers_count = IntegerField()
following_count = IntegerField()
flags = BitField()
pub_date = DateTimeField()
is_verified = flags.flag(1)
is_private = flags.flag(2)
class InstagramMedia(BaseModel):
user = IntegerField()
pub_date = DateTimeField()
media_url = TextField()
description = CharField(2200)
def init_db():
database.create_tables([InstagramProfile, InstagramMedia])
def bytes_to_json(python_object):
if isinstance(python_object, bytes):
return {'__class__': 'bytes',
'__value__': codecs.encode(python_object, 'base64').decode()}
raise TypeError(repr(python_object) + ' is not JSON serializable')
def bytes_from_json(json_object):
if '__class__' in json_object and json_object['__class__'] == 'bytes':
return codecs.decode(json_object['__value__'].encode(), 'base64')
return json_object
SETTINGS_PATH = 'ig_api_settings'
def load_settings(username):
with open(os.path.join(SETTINGS_PATH, username + '.json')) as f:
settings = json.load(f, object_hook=bytes_from_json)
return settings
def save_settings(username, settings):
with open(os.path.join(SETTINGS_PATH, username + '.json'), 'w') as f:
json.dump(settings, f, default=bytes_to_json)
CLIENTS = []
def load_clients():
try:
with open(os.path.join(SETTINGS_PATH, 'config.txt')) as f:
conf = f.read()
except OSError:
print('Config file not found.')
return
for up in conf.split('\n'):
try:
up = up.split('#')[0].strip()
if not up:
continue
username, password = up.split(':')
try:
settings = load_settings(username)
except Exception:
settings = None
try:
if settings:
device_id = settings.get('device_id')
api = instagram_private_api.Client(
username, password,
settings=settings
)
else:
api = instagram_private_api.Client(
username, password,
on_login=lambda x: save_settings(username, x.settings)
)
except (instagram_private_api.ClientCookieExpiredError,
instagram_private_api.ClientLoginRequiredError) as e:
api = instagram_private_api.Client(
username, password,
device_id=device_id,
on_login=lambda x: save_settings(username, x.settings)
)
CLIENTS.append(api)
except Exception:
sys.excepthook(*sys.exc_info())
continue
def make_request(method_name, *args, **kwargs):
exc = None
usable_clients = list(range(len(CLIENTS)))
while usable_clients:
ci = random.choice(usable_clients)
client = CLIENTS[ci]
usable_clients.remove(ci)
try:
method = getattr(client, method_name)
except AttributeError:
raise ValueError('client has no method called {!r}'.format(method_name))
if not callable(method):
raise ValueError('client has no method called {!r}'.format(method_name))
try:
return method(*args, **kwargs)
except Exception as e:
exc = e
if exc:
raise exc
else:
raise RuntimeError('no active clients')
N_FORCE = 0
N_FALLBACK_CACHE = 1
N_PREFER_CACHE = 2
N_OFFLINE = 3
def choose_method(online, offline, network):
if network == N_FORCE:
return online()
elif network == N_FALLBACK_CACHE:
try:
return online()
except Exception:
return offline()
elif network == N_PREFER_CACHE:
try:
return offline()
except Exception:
return online()
elif network == N_OFFLINE:
return offline()
def get_profile_info(username_or_id, network=N_FALLBACK_CACHE):
if isinstance(username_or_id, str):
username, userid = username_or_id, None
elif isinstance(username_or_id, int):
username, userid = None, username_or_id
else:
raise TypeError('invalid username or id')
def online():
if userid:
data = make_request('user_info', userid)
else:
data = make_request('username_info', username)
return InstagramProfile.create(
p_id = data['user']['pk'],
p_username = data['user']['username'],
p_full_name = data['user']['full_name'],
p_biography = data['user']['biography'],
posts_count = data['user']['media_count'],
followers_count = data['user']['follower_count'],
following_count = data['user']['following_count'],
is_verified = data['user']['is_verified'],
is_private = data['user']['is_private'],
pub_date = datetime.datetime.now()
)
def offline():
if userid:
q = InstagramProfile.select().where(InstagramProfile.p_id == userid)
else:
q = InstagramProfile.select().where(InstagramProfile.p_username == username)
return q.order_by(InstagramProfile.pub_date.desc())[0]
return choose_method(online, offline, network)
load_clients()