forked from Chandan8186/cs3103-grp-30
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
278 lines (228 loc) · 9.83 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
"""
This script runs the application using a development server.
It contains the definition of routes and views for the application.
"""
from flask import Flask, render_template, request, redirect, url_for, flash
from flask_login import LoginManager, login_user, logout_user, current_user
from flask_dance.contrib.google import make_google_blueprint, google
from flask_dance.contrib.azure import make_azure_blueprint, azure
from oauthlib.oauth2.rfc6749.errors import InvalidGrantError, TokenExpiredError
from parser import Parser
from image_link import Image_Count_Manager
from email_manager import Email_Manager
from login import LoginForm, User, SMTP_User, Google_User, Azure_User
import os
import keyring
import json
# Flask application object; every route in this module is registered on it.
app = Flask(__name__)
# NOTE(review): hard-coded secret key. Replace it with a random value loaded from
# configuration before running anywhere other than a local test setup.
app.secret_key = "testing"
# Please register an OAuth app at Google Cloud / Microsoft Azure to generate client_id and client_secret.
# Additionally, please ensure that you have given the app the necessary API permissions such as scopes,
# redirect URI, permitted emails if testing, etc. Note that Azure only allows localhost for http,
# and NOT 127.0.0.1.
# Google OAuth blueprint; redirect_to names the login_google view defined in this module.
google_bp = make_google_blueprint(
    client_id="",
    client_secret="",
    redirect_to="login_google",
    offline=True,
    scope=["https://www.googleapis.com/auth/userinfo.email", "https://www.googleapis.com/auth/gmail.send"]
)
# Azure OAuth blueprint; redirect_to names the login_azure view defined in this module.
azure_bp = make_azure_blueprint(
    client_id="",
    client_secret="",
    redirect_to="login_azure",
    scope=["https://graph.microsoft.com/.default"]
)
# Both OAuth blueprints are mounted under the /login URL prefix.
app.register_blueprint(google_bp, url_prefix="/login")
app.register_blueprint(azure_bp, url_prefix="/login")
# flask_login integration; the user loader is registered on this manager further down.
login_manager = LoginManager()
login_manager.init_app(app)
os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1" # Allow running on localhost without https
os.environ["OAUTHLIB_RELAX_TOKEN_SCOPE"] = "1" # Server may give more scopes than requested
# Module-level alias for the application's WSGI callable.
wsgi_app = app.wsgi_app
# Directory (relative to the working directory) where uploaded files are saved.
UPLOAD_FOLDER = 'uploads'
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
# Single shared instances used by all request handlers in this module.
image_count_manager = Image_Count_Manager()
email_manager = Email_Manager()
def store_secret(key_name, secret_value):
    """Save ``secret_value`` under ``key_name`` in the 'SmartMailerApp' keyring service."""
    service_name = 'SmartMailerApp'
    keyring.set_password(service_name, key_name, secret_value)
def retrieve_secret(key_name):
    """Look up ``key_name`` in the 'SmartMailerApp' keyring service.

    Returns whatever ``keyring.get_password`` yields for that entry.
    """
    service_name = 'SmartMailerApp'
    stored_value = keyring.get_password(service_name, key_name)
    return stored_value
def delete_secret(key_name):
    """Remove the entry stored under ``key_name`` from the 'SmartMailerApp' keyring service.

    Deleting an entry that is no longer present is treated as success, so a
    caller such as ``logout()`` is not aborted by an already-removed secret.
    """
    from keyring.errors import PasswordDeleteError

    try:
        keyring.delete_password('SmartMailerApp', key_name)
    except PasswordDeleteError:
        # keyring reports "nothing to delete" through this error. The goal
        # (no secret left under key_name) is already met, so carry on.
        pass
# Module-level Parser instance shared by the request handlers: upload_file() assigns it
# and preview_and_send() reads it. It stays None until a Parser has been built successfully.
parser = None
# Ensure the upload folder exists
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
@app.route('/')
def index():
    """Render the landing page for signed-in users; send everyone else to the login view."""
    if current_user.is_authenticated:
        return render_template('index.html')
    return redirect(url_for('login'))
@login_manager.user_loader
def load_user(user_id):
    """Resolve ``user_id`` to a user object by delegating to ``User.load``.

    Registered as the flask_login user loader.
    """
    # Note: flask_login runs this function on every page refresh to ensure security.
    loaded_user = User.load(user_id)
    return loaded_user
@app.route('/logout', methods=['GET'])
def logout():
    """Sign the current user out and return to the index page.

    For an authenticated user the secret stored under the user's id is
    deleted before the flask_login session is ended.
    """
    if current_user.is_authenticated:
        delete_secret(current_user.get_id())
        logout_user()
    return redirect(url_for('index'))
def _is_safe_redirect_target(target):
    """Return True only when ``target`` is a path on this site (no scheme, no host)."""
    from urllib.parse import urlparse

    if not target or not target.startswith('/'):
        return False
    # "//host" is a scheme-relative URL, and browsers treat "\" like "/",
    # so both forms can leave the site even though they start with "/".
    if target.startswith('//') or '\\' in target:
        return False
    # Control characters (tab, newline, ...) are dropped by browsers and could
    # be used to disguise one of the forms rejected above.
    if any(ch < ' ' for ch in target):
        return False
    parts = urlparse(target)
    return not parts.scheme and not parts.netloc


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the SMTP login form and sign the user in on a valid POST.

    Already-authenticated users are redirected to the index page. On a
    validated POST an ``SMTP_User`` is created from the submitted email and
    password, the password is stored (as JSON) in the keyring under the user's
    id, and the user is logged in with ``remember=True``.

    The optional ``next`` query parameter is honoured only when it is a
    path-only URL on this site; any other value is ignored and the user lands
    on the index page, so the parameter cannot be used as an open redirect.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))
    form = LoginForm(request.form)
    if request.method == 'POST' and form.validate():
        email = form.email.data
        password = form.password.data
        user = SMTP_User(email, password)
        user_data_json = json.dumps({'password': password})
        store_secret(user.get_id(), user_data_json)
        login_user(user, remember=True)
        # "next" comes straight from the query string, i.e. it is untrusted.
        next_url = request.args.get('next')
        if not _is_safe_redirect_target(next_url):
            next_url = None
        return redirect(next_url or url_for('index'))
    return render_template('login.html', form=form)
@app.route('/login_google', methods=['GET'])
def login_google():
    """Complete a Google OAuth sign-in and log the user in.

    Authenticated users go straight to the index page. Without Google
    authorization, or when the token is rejected or expired while fetching
    the account's email, the Google OAuth flow is (re)started. Otherwise an
    empty JSON object is stored in the keyring under the new user's id and
    the user is logged in with ``remember=True``.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))
    if not google.authorized:
        return redirect(url_for("google.login"))
    try:
        profile = google.get("/oauth2/v1/userinfo").json()
        email = profile["email"]
    except (InvalidGrantError, TokenExpiredError):
        return redirect(url_for("google.login"))
    else:
        google_user = Google_User(email)
        store_secret(google_user.get_id(), json.dumps({}))
        login_user(google_user, remember=True)
        return redirect(url_for('index'))
@app.route('/login_azure', methods=['GET'])
def login_azure():
    """Complete an Azure OAuth sign-in and log the user in.

    Authenticated users go straight to the index page. Without Azure
    authorization, or when the token is rejected or expired while fetching
    the account's ``userPrincipalName``, the Azure OAuth flow is (re)started.
    Otherwise an empty JSON object is stored in the keyring under the new
    user's id and the user is logged in with ``remember=True``.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))
    if not azure.authorized:
        return redirect(url_for("azure.login"))
    try:
        profile = azure.get("/v1.0/me").json()
        email = profile["userPrincipalName"]
    except (InvalidGrantError, TokenExpiredError):
        return redirect(url_for("azure.login"))
    else:
        azure_user = Azure_User(email)
        store_secret(azure_user.get_id(), json.dumps({}))
        login_user(azure_user, remember=True)
        return redirect(url_for('index'))
# Handles the upload form: the csv_file and body_file fields are read from the request,
# and the result is rendered with upload.html (view counts) or preview.html (email preview).
def _upload_destination(filename):
    """Return the path inside the upload folder for a client-supplied ``filename``.

    Only the last path component is kept so that a name such as
    ``../../main.py`` cannot escape the upload folder. Returns ``None`` when
    nothing usable remains.
    """
    # Treat "\" as a separator as well, so Windows-style paths are reduced
    # to their final component on every platform.
    name = os.path.basename(filename.replace('\\', '/'))
    if name in ('', '.', '..'):
        return None
    return os.path.join(app.config['UPLOAD_FOLDER'], name)


@app.route('/preview', methods=['POST'])
def upload_file():
    """Save the uploaded CSV and body files, build the Parser, and render the requested view.

    Flow:
      * unauthenticated users are redirected to the login view;
      * missing, unnamed, or unusably named files flash a message and return
        to the index page;
      * both files are saved into the upload folder and the module-level
        ``parser`` is replaced by a ``Parser`` built from them (a failure
        flashes the exception text and returns to the index page);
      * an optional ``department_search`` form value other than "all" must be
        present in ``parser.departments``;
      * ``view-counts`` renders ``upload.html`` with all prepared emails and
        the report; ``preview-emails`` renders ``preview.html`` with the first
        prepared email; any other submission flashes a message and returns to
        the index page instead of producing an empty response.
    """
    global parser
    if not current_user.is_authenticated:
        return redirect(url_for('login'))
    if 'csv_file' not in request.files or 'body_file' not in request.files:
        flash('Missing file(s)')
        return redirect(url_for('index'))
    csv_file = request.files['csv_file']
    body_file = request.files['body_file']
    if not csv_file.filename or not body_file.filename:
        flash('No selected file(s)')
        return redirect(url_for('index'))
    # Filenames are chosen by the client, so sanitise them before touching the disk.
    csvpath = _upload_destination(csv_file.filename)
    bodypath = _upload_destination(body_file.filename)
    if csvpath is None or bodypath is None:
        flash('Invalid file name(s)')
        return redirect(url_for('index'))
    # NOTE(review): if both uploads share a name, the body file overwrites the CSV.
    csv_file.save(csvpath)
    body_file.save(bodypath)
    # Use the Parser class to prepare the emails; the instance is kept in the
    # module-level variable so preview_and_send() can reuse it.
    try:
        parser = Parser(csvpath, bodypath)
    except Exception as e:
        flash(f"{e}")
        return redirect(url_for('index'))
    department_input = request.form.get('department_search')
    if department_input and department_input != "all" and department_input not in parser.departments:
        flash(f'There is no user in the given department: "{department_input}"')
        return redirect(url_for('index'))
    department = department_input if department_input else "all"
    try:
        if "view-counts" in request.form:
            emails = parser.prepare_all_emails(department, attach_transparent_images=False)
            parser.update_report_data(emails)
            report = parser.prepare_report()
            hashes = [email['hash'] for email in emails]
            image_count_manager.update_unique_id_list(hashes)
            return render_template('upload.html', emails=emails, headers=parser.headers,
                                   report=report, is_send=False)
        if "preview-emails" in request.form:
            email = parser.prepare_first_email()
            placeholders = ', '.join('#' + header + '#' for header in parser.headers)
            if not email_manager.allow_next_batch():
                return redirect(url_for('index'))
            return render_template('preview.html',
                                   department=department,
                                   subject=email['subject'],
                                   body=email['body'],
                                   placeholders=placeholders)
    except Exception as e:
        flash(f'An error occurred: {str(e)}')
        return redirect(url_for('index'))
    # Neither action was submitted: a view must still return a response.
    flash('Unknown action requested.')
    return redirect(url_for('index'))
# Sends the prepared emails once the user has confirmed the preview.
@app.route('/upload', methods=['POST'])
def preview_and_send():
    """Send the emails prepared by the shared ``parser`` and show the sent-emails page.

    Flow:
      * unauthenticated users are redirected to the login view;
      * a ``go-back`` submission returns to the index page;
      * while ``email_manager`` is still sending, a note is flashed and the
        sent-emails page is shown instead of starting a new batch;
      * if no Parser has been built yet (nothing was uploaded), a message is
        flashed and the user returns to the index page;
      * otherwise all emails for the submitted ``department`` are prepared,
        the report data is updated and stored on ``email_manager``, and the
        emails are handed to ``email_manager.send_emails``.

    Any exception raised along the way is flashed and the user returns to the
    index page.
    """
    if not current_user.is_authenticated:
        return redirect(url_for('login'))
    try:
        if "go-back" in request.form:
            return redirect(url_for('index'))
        if email_manager.is_sending():
            flash("Note: The current batch of emails are still being sent. Previewing emails being sent instead.")
            return redirect(url_for('sent_emails'))
        if parser is None:
            # /upload was posted before any files went through /preview.
            flash('No uploaded files were found. Please upload the files again.')
            return redirect(url_for('index'))
        emails = parser.prepare_all_emails(request.form.get('department'))
        parser.update_report_data(emails)
        email_manager.store_header_and_report(parser.headers, parser.prepare_report())
        email_manager.send_emails(current_user._get_current_object(), emails)
        return redirect(url_for('sent_emails'))
    except Exception as e:
        flash(f'An error occurred: {str(e)}')
        return redirect(url_for('index'))
@app.get('/sent_emails')
def sent_emails():
    """Show the emails held by ``email_manager`` together with their report.

    Like the other data-bearing views, this page requires an authenticated
    user; anonymous visitors are redirected to the login view. When
    ``email_manager.headers`` is empty a message is flashed and the user
    returns to the index page. Otherwise the emails' hashes are passed to
    ``image_count_manager.update_unique_id_list`` and ``upload.html`` is
    rendered with ``is_send=True``.
    """
    if not current_user.is_authenticated:
        return redirect(url_for('login'))
    if not email_manager.headers:
        flash("No recently sent emails were found.")
        return redirect(url_for('index'))
    hashes = [email['hash'] for email in email_manager.emails]
    image_count_manager.update_unique_id_list(hashes)
    return render_template('upload.html', emails=email_manager.emails, headers=email_manager.headers,
                           report=email_manager.report, is_send=True)
@app.get("/cancel_sending_emails")
def cancel_sending_emails():
    """Cancel the batch handled by ``email_manager`` and show the sent-emails page.

    Only authenticated users may cancel; anonymous visitors are redirected to
    the login view, matching the other views that act on the email batch.
    """
    if not current_user.is_authenticated:
        return redirect(url_for('login'))
    email_manager.cancel()
    return redirect(url_for('sent_emails'))
@app.get("/update_send_status")
def update_send_status():
    """Return the current ``email_manager.results`` as the response body."""
    # NOTE(review): no authentication check here, unlike the page views - confirm
    # whether the send status should be readable by anonymous clients.
    send_results = email_manager.results
    return send_results
@app.get("/update_count")
def update_count():
    """Return the result of ``image_count_manager.get_image_counts()`` as the response body."""
    # NOTE(review): no authentication check here, unlike the page views - confirm
    # whether the counts should be readable by anonymous clients.
    image_counts = image_count_manager.get_image_counts()
    return image_counts
# Optional page, kept in case it is needed.
@app.route('/about')
def about():
    """Render the static ``about.html`` template."""
    about_page = render_template('about.html')
    return about_page
if __name__ == '__main__':
    # Disabled alternative launch that reads the host and port from the environment;
    # it could be used to make sure the URI is localhost:
    #
    #     HOST = os.environ.get('SERVER_HOST', 'localhost')
    #     try:
    #         PORT = int(os.environ.get('SERVER_PORT', '5000'))
    #     except ValueError:
    #         PORT = 5000
    #     app.run(HOST, PORT, debug=True)
    #
    # Start the Flask development server with debug mode enabled.
    app.run(debug=True)