forked from openlabs/trytond-nereid
-
Notifications
You must be signed in to change notification settings - Fork 0
/
party.py
834 lines (687 loc) · 27.1 KB
/
party.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import random
import string
import urllib
try:
import hashlib
except ImportError:
hashlib = None
import sha
import pytz
from wtforms import Form, TextField, IntegerField, SelectField, validators, \
PasswordField
from wtfrecaptcha.fields import RecaptchaField
from werkzeug import redirect, abort
from nereid import request, url_for, render_template, login_required, flash, \
jsonify
from nereid.globals import session, current_app
from nereid.signals import registration
from nereid.templating import render_email
from trytond.model import ModelView, ModelSQL, fields
from trytond.pool import Pool
from trytond.pyson import Eval, Bool, Not
from trytond.transaction import Transaction
from trytond.config import CONFIG
from trytond.tools import get_smtp_server
from .i18n import _, get_translations
__all__ = ['Address', 'Party', 'NereidUser',
'ContactMechanism', 'Permission', 'UserPermission']
class RegistrationForm(Form):
"Simple Registration form"
def _get_translations(self):
"""
Provide alternate translations factory.
"""
return get_translations()
name = TextField(_('Name'), [validators.Required(),])
email = TextField(_('e-mail'), [validators.Required(), validators.Email()])
password = PasswordField(_('New Password'), [
validators.Required(),
validators.EqualTo('confirm', message=_('Passwords must match'))])
confirm = PasswordField(_('Confirm Password'))
if 're_captcha_public' in CONFIG.options:
captcha = RecaptchaField(
public_key=CONFIG.options['re_captcha_public'],
private_key=CONFIG.options['re_captcha_private'],
secure=True
)
class AddressForm(Form):
"""
A form resembling the party.address
"""
def _get_translations(self):
"""
Provide alternate translations factory.
"""
return get_translations()
name = TextField(_('Name'), [validators.Required(),])
street = TextField(_('Street'), [validators.Required(),])
streetbis = TextField(_('Street (Bis)'))
zip = TextField(_('Post Code'), [validators.Required(),])
city = TextField(_('City'), [validators.Required(),])
country = SelectField(_('Country'), [validators.Required(),], coerce=int)
subdivision = IntegerField(_('State/County'), [validators.Required()])
email = TextField(_('Email'))
phone = TextField(_('Phone'))
class NewPasswordForm(Form):
"""
Form to set a new password
"""
def _get_translations(self):
"""
Provide alternate translations factory.
"""
return get_translations()
password = PasswordField(_('New Password'), [
validators.Required(),
validators.EqualTo('confirm', message=_('Passwords must match'))])
confirm = PasswordField(_('Repeat Password'))
class ChangePasswordForm(NewPasswordForm):
"""
Form to change the password
"""
def _get_translations(self):
"""
Provide alternate translations factory.
"""
return get_translations()
old_password = PasswordField(_('Old Password'), [validators.Required()])
STATES = {
'readonly': Not(Bool(Eval('active'))),
}
class Address(ModelSQL, ModelView):
"""Party Address"""
__name__ = 'party.address'
registration_form = RegistrationForm
email = fields.Char('Email')
phone = fields.Char('Phone')
@classmethod
@login_required
def edit_address(cls, address=None):
"""
Create/Edit an Address
POST will create a new address or update and existing address depending
on the value of address.
GET will return a new address/existing address edit form
:param address: ID of the address
"""
form = AddressForm(request.form, name=request.nereid_user.display_name)
countries = [
(c.id, c.name) for c in request.nereid_website.countries
]
form.country.choices = countries
if address not in (a.id for a in request.nereid_user.party.addresses):
address = None
if request.method == 'POST' and form.validate():
if address is not None:
cls.write([cls(address)], {
'name': form.name.data,
'street': form.street.data,
'streetbis': form.streetbis.data,
'zip': form.zip.data,
'city': form.city.data,
'country': form.country.data,
'subdivision': form.subdivision.data,
'email': form.email.data,
'phone': form.phone.data,
})
else:
cls.create([{
'name': form.name.data,
'street': form.street.data,
'streetbis': form.streetbis.data,
'zip': form.zip.data,
'city': form.city.data,
'country': form.country.data,
'subdivision': form.subdivision.data,
'party': request.nereid_user.party.id,
'email': form.email.data,
'phone': form.phone.data,
}])
return redirect(url_for('party.address.view_address'))
elif request.method == 'GET' and address:
# Its an edit of existing address, prefill data
address = cls(address)
form = AddressForm(
name=address.name,
street=address.street,
streetbis=address.streetbis,
zip=address.zip,
city=address.city,
country=address.country and address.country.id,
subdivision=address.subdivision and address.subdivision.id,
email=address.email,
phone=address.phone
)
form.country.choices = countries
return render_template('address-edit.jinja', form=form, address=address)
@classmethod
@login_required
def view_address(cls):
"View the addresses of user"
return render_template('address.jinja')
class Party(ModelSQL, ModelView):
"Party"
__name__ = 'party.party'
nereid_users = fields.One2Many('nereid.user', 'party', 'Nereid Users')
class ProfileForm(Form):
"""User Profile Form"""
display_name = TextField('Display Name', [validators.Required(),],
description="Your display name"
)
timezone = SelectField('Timezone',
choices = [(tz, tz) for tz in pytz.common_timezones],
coerce=unicode, description="Your timezone"
)
email = TextField('Email', [validators.Required(), validators.Email()],
description="Your Login Email. This Cannot be edited."
)
class NereidUser(ModelSQL, ModelView):
"""
Nereid Users
"""
__name__ = "nereid.user"
_rec_name = 'display_name'
party = fields.Many2One('party.party', 'Party', required=True,
ondelete='CASCADE', select=1)
display_name = fields.Char('Display Name', required=True)
#: The email of the user is also the login name/username of the user
email = fields.Char("e-Mail", select=1)
#: The password is the user password + the salt, which is
#: then hashed together
password = fields.Sha('Password')
#: The salt which was used to make the hash is separately
#: stored. Needed for
salt = fields.Char('Salt', size=8)
#: A unique activation code required to match the user's request
#: for activation of the account.
activation_code = fields.Char('Unique Activation Code')
# The company of the website(s) to which the user is affiliated. This
# allows websites of the same company to share authentication/users. It
# does not make business or technical sense to have website of multiple
# companies share the authentication.
#
# .. versionchanged:: 0.3
# Company is mandatory
company = fields.Many2One('company.company', 'Company', required=True)
timezone = fields.Selection(
[(x, x) for x in pytz.common_timezones], 'Timezone', translate=False
)
permissions = fields.Many2Many('nereid.permission-nereid.user',
'nereid_user', 'permission', 'Permissions')
def get_permissions(self):
"""
Returns all the permissions as a list of names
"""
# TODO: Cache this value for each user to avoid hitting the database
# everytime.
return frozenset([p.value for p in self.permissions])
def has_permissions(self, permissions):
"""Check if the user has required permissions for access
:param permissions: A set/frozenset of permission values/keywords
:return: True/False
"""
if not isinstance(permissions, (set, frozenset)):
permissions = frozenset(permissions)
current_user_permissions = self.get_permissions()
if permissions.issubset(current_user_permissions):
return True
return False
@staticmethod
def default_timezone():
return "UTC"
@staticmethod
def default_company():
return Transaction().context.get('company') or False
@classmethod
def __setup__(cls):
super(NereidUser, cls).__setup__()
cls._sql_constraints += [
('unique_email_company', 'UNIQUE(email, company)',
'Email must be unique in a company'),
]
def _activate(self, activation_code):
"""
Activate the User account
.. note::
This method will raise an assertion error if the activation_code is
not valid.
:param activation_code: The activation code used
:return: True if the activation code was correct
"""
assert self.activation_code == activation_code, \
'Invalid Activation Code'
return self.write([self], {'activation_code': None})
@staticmethod
def get_registration_form():
"""
Returns a registration form for use in the site
.. tip::
Configuration of re_captcha
Remember to forward X-Real-IP in the case of Proxy servers
"""
# Add re_captcha if the configuration has such an option
if 're_captcha_public' in CONFIG.options:
registration_form = RegistrationForm(
request.form, captcha={'ip_address': request.remote_addr}
)
else:
registration_form = RegistrationForm(request.form)
return registration_form
@classmethod
def registration(cls):
"""
Invokes registration of an user
"""
Party = Pool().get('party.party')
registration_form = cls.get_registration_form()
if request.method == 'POST' and registration_form.validate():
existing = cls.search([
('email', '=', request.form['email']),
('company', '=', request.nereid_website.company.id),
])
if existing:
flash(_('A registration already exists with this email. '
'Please contact customer care')
)
else:
party = Party(name=registration_form.name.data)
party.save()
nereid_user = cls(**{
'party': party.id,
'display_name': registration_form.name.data,
'email': registration_form.email.data,
'password': registration_form.password.data,
'company': request.nereid_website.company.id,
})
nereid_user.save()
nereid_user.create_act_code()
registration.send(nereid_user)
nereid_user.send_activation_email()
flash(
_('Registration Complete. Check your email for activation')
)
return redirect(
request.args.get('next', url_for('nereid.website.home'))
)
return render_template('registration.jinja', form=registration_form)
def send_activation_email(self):
"""
Send an activation email to the user
:param nereid_user: The browse record of the user
"""
email_message = render_email(
CONFIG['smtp_from'], self.email, _('Account Activation'),
text_template = 'emails/activation-text.jinja',
html_template = 'emails/activation-html.jinja',
nereid_user = self
)
server = get_smtp_server()
server.sendmail(
CONFIG['smtp_from'], [self.email], email_message.as_string()
)
server.quit()
@classmethod
@login_required
def change_password(cls):
"""
Changes the password
.. tip::
On changing the password, the user is logged out and the login page
is thrown at the user
"""
form = ChangePasswordForm(request.form)
if request.method == 'POST' and form.validate():
if request.nereid_user.match_password(form.old_password.data):
cls.write(
[request.nereid_user],
{'password': form.password.data}
)
flash(
_('Your password has been successfully changed! '
'Please login again')
)
session.pop('user')
return redirect(url_for('nereid.website.login'))
else:
flash(_("The current password you entered is invalid"))
return render_template(
'change-password.jinja', change_password_form=form
)
@classmethod
@login_required
def new_password(cls):
"""Create a new password
.. tip::
Unlike change password this does not demand the old password.
And hence this method will check in the session for a parameter
called allow_new_password which has to be True. This acts as a
security against attempts to POST to this method and changing
password.
The allow_new_password flag is popped on successful saving
This is intended to be used when a user requests for a password reset.
"""
form = NewPasswordForm(request.form)
if request.method == 'POST' and form.validate():
if not session.get('allow_new_password', False):
current_app.logger.debug('New password not allowed in session')
abort(403)
cls.write(
[request.nereid_user],
{'password': form.password.data}
)
session.pop('allow_new_password')
flash(_('Your password has been successfully changed! '
'Please login again')
)
session.pop('user')
return redirect(url_for('nereid.website.login'))
return render_template('new-password.jinja', password_form=form)
def activate(self, activation_code):
"""A web request handler for activation
:param activation_code: A 12 character activation code indicates reset
while 16 character activation code indicates a new registration
"""
try:
self._activate(activation_code)
except AssertionError:
flash(_('Invalid Activation Code'))
else:
# Log the user in since the activation code is correct
session['user'] = self.id
# Redirect the user to the correct location according to the type
# of activation code.
if len(activation_code) == 12:
session['allow_new_password'] = True
return redirect(url_for('nereid.user.new_password'))
elif len(activation_code) == 16:
flash(_('Your account has been activated'))
return redirect(url_for('nereid.website.home'))
return redirect(url_for('nereid.website.login'))
def create_act_code(self, code_type="new"):
"""Create activation code
A 12 character activation code indicates reset while 16
character activation code indicates a new registration
:param user_id: ID of the User
:param code_type: "new" for new activation code
"reset" for resetting existing account
"""
assert code_type in ("new", "reset")
length = 16 if code_type == "new" else 12
act_code = ''.join(
random.sample(string.letters + string.digits, length)
)
return self.write([self], {'activation_code': act_code})
@classmethod
def reset_account(cls):
"""
Reset the password for the user.
.. tip::
This does NOT reset the password, but just creates an activation
code and sends the link to the email of the user. If the user uses
the link, he can change his password.
"""
if request.method == 'POST':
user_ids = cls.search([
('email', '=', request.form['email']),
('company', '=', request.nereid_website.company.id),
])
if not user_ids:
flash(_('Invalid email address'))
return render_template('reset-password.jinja')
nereid_user, = user_ids
nereid_user.create_act_code("reset")
nereid_user.send_reset_email()
flash(_('An email has been sent to your account for resetting'
' your credentials'))
return redirect(url_for('nereid.website.login'))
return render_template('reset-password.jinja')
def send_reset_email(self):
"""
Send an account reset email to the user
:param nereid_user: The browse record of the user
"""
email_message = render_email(
CONFIG['smtp_from'], self.email, _('Account Password Reset'),
text_template = 'emails/reset-text.jinja',
html_template = 'emails/reset-html.jinja',
nereid_user = self
)
server = get_smtp_server()
server.sendmail(
CONFIG['smtp_from'], [self.email], email_message.as_string()
)
server.quit()
def match_password(self, password):
"""
Checks if 'password' is the same as the current users password.
:param password: The password of the user (string or unicode)
:return: True or False
"""
password += self.salt or ''
if isinstance(password, unicode):
password = password.encode('utf-8')
if hashlib:
digest = hashlib.sha1(password).hexdigest()
else:
digest = sha.new(password).hexdigest()
return (digest == self.password)
@classmethod
def authenticate(cls, email, password):
"""Assert credentials and if correct return the
browse record of the user
:param email: email of the user
:param password: password of the user
:return:
Browse Record: Successful Login
None: User cannot be found or wrong password
False: Account is inactive
"""
users = cls.search([
('email', '=', request.form['email']),
('company', '=', request.nereid_website.company.id),
])
if not users:
current_app.logger.debug("No user with email %s" % email)
return None
if len(users) > 1:
current_app.logger.debug('%s has too many accounts' % email)
return None
user, = users
if user.activation_code and len(user.activation_code) == 16:
# A new account with activation pending
current_app.logger.debug('%s not activated' % email)
flash(_("Your account has not been activated yet!"))
return False # False so to avoid `invalid credentials` flash
if user.match_password(password):
# Reset any reset activation code that might be there since its a
# successful login with the old password
if user.activation_code:
cls.write([user], {'activation_code': None})
return user
return None
@staticmethod
def _convert_values(values):
"""
A helper method which looks if the password is specified in the values.
If it is, then the salt is also made and added
:param values: A dictionary of field: value pairs
"""
if 'password' in values and values['password']:
values['salt'] = ''.join(random.sample(
string.ascii_letters + string.digits, 8))
values['password'] += values['salt']
return values
@classmethod
def create(cls, vlist):
"""
Create, but add salt before saving
:param vlist: List of dictionary of Values
"""
vlist = [cls._convert_values(vals.copy()) for vals in vlist]
return super(NereidUser, cls).create(vlist)
@classmethod
def write(cls, nereid_users, values):
"""
Update salt before saving
"""
return super(NereidUser, cls).write(
nereid_users, cls._convert_values(values)
)
@staticmethod
def get_gravatar_url(email, **kwargs):
"""
Return a gravatar url for the given email
:param email: e-mail of the user
:param https: To get a secure URL
:param default: The default image to return if there is no profile pic
For example a unisex avatar
:param size: The size for the image
"""
if kwargs.get('https', request.scheme == 'https'):
url = 'https://secure.gravatar.com/avatar/%s?'
else:
url = 'http://www.gravatar.com/avatar/%s?'
url = url % hashlib.md5(email.lower()).hexdigest()
params = []
default = kwargs.get('default', None)
if default:
params.append(('d', default))
size = kwargs.get('size', None)
if size:
params.append(('s', str(size)))
return url + urllib.urlencode(params)
def get_profile_picture(self, **kwargs):
"""
Return the url to the profile picture of the user.
The default implementation fetches the profile image of the user from
gravatar using :meth:`get_gravatar_url`
"""
return self.get_gravatar_url(self.email, **kwargs)
@staticmethod
def aslocaltime(naive_date, local_tz_name=None):
"""
Returns a localized time using `pytz.astimezone` method.
:param naive_date: a naive datetime (datetime with no timezone
information), which is assumed to be the UTC time.
:param local_tz_name: The timezone in which the date has to be returned
:type local_tz_name: string
:return: A datetime object with local time
"""
utc_date = pytz.utc.localize(naive_date)
if not local_tz_name:
return utc_date
local_tz = pytz.timezone(local_tz_name)
if local_tz == pytz.utc:
return utc_date
return utc_date.astimezone(local_tz)
def as_user_local_time(self, naive_date):
"""
Returns a date localized in the user's timezone.
:param naive_date: a naive datetime (datetime with no timezone
information), which is assumed to be the UTC time.
"""
return self.aslocaltime(naive_date, self.timezone)
@classmethod
@login_required
def profile(cls):
"""
User profile
"""
user_form = ProfileForm(request.form, obj=request.nereid_user)
if request.method == 'POST' and user_form.validate():
cls.write(
[request.nereid_user], {
'display_name': user_form.display_name.data,
'timezone': user_form.timezone.data,
}
)
flash('Your profile has been updated.')
return redirect(
request.args.get('next', url_for('nereid.user.profile'))
)
return render_template(
'profile.jinja', user_form=user_form, active_type_name="general"
)
class ContactMechanismForm(Form):
type = SelectField('Type', [validators.Required()])
value = TextField('Value', [validators.Required()])
comment = TextField('Comment')
class ContactMechanism(ModelSQL, ModelView):
"""
Allow modification of contact mechanisms
"""
__name__ = "party.contact_mechanism"
def get_form(self):
"""
Returns the contact mechanism form
"""
from trytond.modules.party import contact_mechanism
form = ContactMechanismForm(request.form)
form.type.choices = contact_mechanism._TYPES
return form
@login_required
def add(self):
"""
Adds a contact mechanism to the party's contact mechanisms
"""
form = self.get_form()
if form.validate():
self.create({
'party': request.nereid_user.party.id,
'type': form.type.data,
'value': form.value.data,
'comment': form.comment.data,
})
if request.is_xhr:
return jsonify({'success': True})
return redirect(request.referrer)
if request.is_xhr:
return jsonify({'success': False})
else:
for field, messages in form.errors:
flash("<br>".join(messages), "Field %s" % field)
return redirect(request.referrer)
@login_required
def remove(self):
"""
:param record_id: Delete the contat mechanism with the given ID
"""
record_id = request.form.get('record_id', type=int)
if not record_id:
abort(404)
record = self.browse(record_id)
if not record:
abort(404)
if record.party == request.nereid_user.party:
self.delete(record_id)
else:
abort(403)
if request.is_xhr:
return jsonify({
'success': True
})
return redirect(request.referrer)
class Permission(ModelSQL, ModelView):
"Nereid Permissions"
__name__ = 'nereid.permission'
name = fields.Char('Name', required=True, select=True)
value = fields.Char('Value', required=True, select=True)
nereid_users = fields.Many2Many('nereid.permission-nereid.user',
'permission', 'nereid_user', 'Nereid Users'
)
@classmethod
def __setup__(cls):
super(Permission, cls).__setup__()
cls._sql_constraints += [
('unique_value', 'UNIQUE(value)',
'Permissions must be unique by value'),
]
class UserPermission(ModelSQL):
"Nereid User Permissions"
__name__ = 'nereid.permission-nereid.user'
permission = fields.Many2One('nereid.permission', 'Permission',
ondelete='CASCADE', select=True, required=True)
nereid_user = fields.Many2One('nereid.user', 'User',
ondelete='CASCADE', select=True, required=True)