forked from ismetacar/input
-
Notifications
You must be signed in to change notification settings - Fork 0
/
run.py
92 lines (70 loc) · 2.77 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import json
import os
from collections import deque
from datetime import timedelta, datetime
from flask import flash, session, request, url_for, redirect
from src import create_app, make_celery
from src.utils.errors import BlupointError
from src.utils.json_jelpers import parse_boolean
from src.utils.token import me_user, extract_token, refresh_token
iha_queue = deque([], 1500)
dha_queue = deque([], 1500)
aa_queue = deque([], 1500)
reuters_queue = deque([], 1500)
ap_queue = deque([], 1500)
hha_queue = deque([], 1500)
def config_settings():
config_file_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "confs/local.py"))
with open(config_file_path, 'r') as f:
settings = json.loads(f.read().split(' = ')[1], object_hook=parse_boolean)
f.close()
return settings
settings = config_settings()
app = create_app(settings)
celery = make_celery(app, settings)
@app.before_request
def before_request():
if request.endpoint == 'healtcheck':
token = session.get('token', None)
if not token:
flash("Please Login to see actions")
return redirect(url_for('login'))
decoded_token = extract_token(token)
token_expire_date = decoded_token['exp']
if (datetime.fromtimestamp(token_expire_date) - timedelta(minutes=5)).timestamp() < datetime.now().timestamp():
refresh_api_endpoint = settings['management_api'] + '/tokens/refresh'
refreshed_token = refresh_token(refresh_api_endpoint, token)
if not refreshed_token:
flash("session closed automatically because no action was taken")
return redirect(url_for('login'))
session['token'] = refreshed_token['token']
if request.endpoint not in ['login', 'static', 'logout']:
token = session.get('token', None)
if not token:
flash("Please Login to see actions")
return redirect(url_for('login'))
me_api_endpoint = settings['management_api'] + '/me'
user = me_user(me_api_endpoint, token)
if not user:
flash("Failed to retrieve user information")
return redirect(url_for('login'))
@app.errorhandler(Exception)
def handle_exceptions(e):
if isinstance(e, BlupointError):
error = {
'err_msg': e.err_msg or 'Internal error occurred',
'err_code': e.err_code or 'errors.internalError',
'context': e.context,
'reason': e.reason
}
flash(error['err_msg'], 'error')
return redirect(url_for('login'))
else:
error = {
'err_msg': str(e),
'err_code': "errors.internalError"
}
flash(error['err_msg'], 'error')
return redirect(url_for('index'))
if __name__ == '__main__':
app.run()