From d2332bc40f79b8aee1db3bb93d328ab167ca0105 Mon Sep 17 00:00:00 2001 From: albertony <12441419+albertony@users.noreply.github.com> Date: Tue, 29 Mar 2022 10:26:51 +0200 Subject: [PATCH] Added support for Jottacloud cli token --- cli-token.html | 35 +++++++++ main.py | 198 ++++++++++++++++++++++++++++++++++++++----------- settings.py | 17 +++++ 3 files changed, 208 insertions(+), 42 deletions(-) create mode 100644 cli-token.html diff --git a/cli-token.html b/cli-token.html new file mode 100644 index 0000000..d71179b --- /dev/null +++ b/cli-token.html @@ -0,0 +1,35 @@ + + + + + + + {{longappname}} + + + + + + + + + + + +
+

{{appname}} for {{service}}

+ +

Type in the CLI token

+
+ + +
+
+ +
+
+ + \ No newline at end of file diff --git a/main.py b/main.py index 7e09a0a..2e6585f 100644 --- a/main.py +++ b/main.py @@ -60,6 +60,41 @@ def find_service(id): return service +def create_authtoken(provider_id, token): + # We store the ID if we get it back + if token.has_key("user_id"): + user_id = token["user_id"] + else: + user_id = "N/A" + + exp_secs = 1800 # 30 min guess + try: + exp_secs = int(token["expires_in"]) + except: + pass + + # Create a random password and encrypt the response + # This ensures that a hostile takeover will not get access + # to stored access and refresh tokens + password = password_generator.generate_pass() + cipher = simplecrypt.encrypt(password, json.dumps(token)) + + # Convert to text and prepare for storage + b64_cipher = base64.b64encode(cipher) + expires = datetime.datetime.utcnow() + datetime.timedelta(seconds=exp_secs) + + entry = None + keyid = None + + # Find a random un-used user ID, and store the encrypted data + while entry is None: + keyid = '%030x' % random.randrange(16 ** 32) + entry = dbmodel.insert_new_authtoken(keyid, user_id, b64_cipher, expires, provider_id) + + # Return the keyid and authid + return keyid, keyid + ':' + password + + class RedirectToLoginHandler(webapp2.RequestHandler): """Creates a state and redirects the user to the login page""" @@ -129,12 +164,16 @@ def get(self): if filtertype is None and n.has_key('hidden') and n['hidden']: continue - link = '/login?id=' + n['id'] - if self.request.get('token', None) is not None: - link += '&token=' + self.request.get('token') + link = '' + if service.has_key('cli-token') and service['cli-token']: + link = '/cli-token?id=' + n['id'] + else: + link = '/login?id=' + n['id'] + if self.request.get('token', None) is not None: + link += '&token=' + self.request.get('token') - if tokenversion is not None: - link += '&tokenversion=' + str(tokenversion) + if tokenversion is not None: + link += '&tokenversion=' + str(tokenversion) notes = '' if n.has_key('notes'): @@ -309,39 +348,105 @@ def get(self, service=None): logging.info('Returned refresh token for service %s', provider['id']) return - # We store the ID if we get it back - if resp.has_key("user_id"): - user_id = resp["user_id"] - else: - user_id = "N/A" + # Return the id and password to the user + keyid, authid = create_authtoken(provider['id'], resp) + + fetchtoken = statetoken.fetchtoken + + # If this was part of a polling request, signal completion + dbmodel.update_fetch_token(fetchtoken, authid) + + # Report results to the user + template_values = { + 'service': display, + 'appname': settings.APP_NAME, + 'longappname': settings.SERVICE_DISPLAYNAME, + 'authid': authid, + 'fetchtoken': fetchtoken + } + + template = JINJA_ENVIRONMENT.get_template('logged-in.html') + self.response.write(template.render(template_values)) + statetoken.delete() + + logging.info('Created new authid %s for service %s', keyid, provider['id']) + + except: + logging.exception('handler error for ' + display) + + template_values = { + 'service': display, + 'appname': settings.APP_NAME, + 'longappname': settings.SERVICE_DISPLAYNAME, + 'authid': 'Server error, close window and try again', + 'fetchtoken': '' + } + + template = JINJA_ENVIRONMENT.get_template('logged-in.html') + self.response.write(template.render(template_values)) + +class CliTokenHandler(webapp2.RequestHandler): + """Renders the cli-token.html page""" + + def get(self): + + provider, service = find_provider_and_service(self.request.get('id', None)) + + template_values = { + 'service': provider['display'], + 'appname': settings.APP_NAME, + 'longappname': settings.SERVICE_DISPLAYNAME, + 'id': provider['id'] + } + + template = JINJA_ENVIRONMENT.get_template('cli-token.html') + self.response.write(template.render(template_values)) + + +class CliTokenLoginHandler(webapp2.RequestHandler): + """Handler that processes cli-token login and redirects the user to the logged-in page""" + + def post(self): + display = 'Unknown' + error = 'Server error, close window and try again' + try: + id = self.request.POST.get('id') + provider, service = find_provider_and_service(id) + display = provider['display'] - exp_secs = 1800 # 30 min guess try: - exp_secs = int(resp["expires_in"]) + data = self.request.POST.get('token') + content = base64.urlsafe_b64decode(str(data) + '=' * (-len(data) % 4)) + resp = json.loads(content) except: - pass + error = 'Error: Invalid CLI token' + raise - # Create a random password and encrypt the response - # This ensures that a hostile takeover will not get access - # to stored access and refresh tokens - password = password_generator.generate_pass() - cipher = simplecrypt.encrypt(password, json.dumps(resp)) - - # Convert to text and prepare for storage - b64_cipher = base64.b64encode(cipher) - expires = datetime.datetime.utcnow() + datetime.timedelta(seconds=exp_secs) - fetchtoken = statetoken.fetchtoken + urlfetch.set_default_fetch_deadline(20) + url = service['auth-url'] + data = urllib.urlencode({ + 'client_id': service['client-id'], + 'grant_type': 'password', + 'scope': provider['scope'], + 'username': resp['username'], + 'password': resp['auth_token'] + }) + try: + req = urllib2.Request(url, data, {'Content-Type': 'application/x-www-form-urlencoded'}) + f = urllib2.urlopen(req) + content = f.read() + f.close() + except urllib2.HTTPError as err: + if err.code == 401: + # If trying to re-use a single-use cli token + error = 'Error: CLI token could not be authorized, create a new and try again' + raise err - entry = None - keyid = None + resp = json.loads(content) - # Find a random un-used user ID, and store the encrypted data - while entry is None: - keyid = '%030x' % random.randrange(16 ** 32) - entry = dbmodel.insert_new_authtoken(keyid, user_id, b64_cipher, expires, provider['id']) + keyid, authid = create_authtoken(id, resp) - # Return the id and password to the user - authid = keyid + ':' + password + fetchtoken = dbmodel.create_fetch_token(resp) # If this was part of a polling request, signal completion dbmodel.update_fetch_token(fetchtoken, authid) @@ -357,9 +462,8 @@ def get(self, service=None): template = JINJA_ENVIRONMENT.get_template('logged-in.html') self.response.write(template.render(template_values)) - statetoken.delete() - logging.info('Created new authid %s for service %s', keyid, provider['id']) + logging.info('Created new authid %s for service %s', keyid, id) except: logging.exception('handler error for ' + display) @@ -368,7 +472,7 @@ def get(self, service=None): 'service': display, 'appname': settings.APP_NAME, 'longappname': settings.SERVICE_DISPLAYNAME, - 'authid': 'Server error, close window and try again', + 'authid': error, 'fetchtoken': '' } @@ -559,11 +663,14 @@ def process(self, authid): url = service['auth-url'] request_params = { 'client_id': service['client-id'], - 'redirect_uri': service['redirect-uri'], - 'client_secret': service['client-secret'], 'grant_type': 'refresh_token', 'refresh_token': resp['refresh_token'] } + if service.has_key("client_secret"): + request_params['client_secret'] = service['client-secret'] + if service.has_key("redirect_uri"): + request_params['redirect_uri'] = service['redirect-uri'] + # Some services do not allow the state to be passed if service.has_key('no-redirect_uri-for-refresh-request') and service['no-redirect_uri-for-refresh-request']: del request_params['redirect_uri'] @@ -673,12 +780,17 @@ def handle_v2(self, inputfragment): logging.info('Cached response to: %s is invalid because it expires in %s', tokenhash, exp_secs) url = service['auth-url'] - data = urllib.urlencode({'client_id': service['client-id'], - 'redirect_uri': service['redirect-uri'], - 'client_secret': service['client-secret'], - 'grant_type': 'refresh_token', - 'refresh_token': refresh_token - }) + request_params = { + 'client_id': service['client-id'], + 'grant_type': 'refresh_token', + 'refresh_token': refresh_token + } + if service.has_key("client_secret"): + request_params['client_secret'] = service['client-secret'] + if service.has_key("redirect_uri"): + request_params['redirect_uri'] = service['redirect-uri'] + + data = urllib.urlencode(request_params) urlfetch.set_default_fetch_deadline(20) @@ -983,6 +1095,8 @@ def get(self): app = webapp2.WSGIApplication([ ('/logged-in', LoginHandler), ('/login', RedirectToLoginHandler), + ('/cli-token', CliTokenHandler), + ('/cli-token-login', CliTokenLoginHandler), ('/refresh', RefreshHandler), ('/fetch', FetchHandler), ('/token-state', TokenStateHandler), diff --git a/settings.py b/settings.py index d948ade..7252425 100644 --- a/settings.py +++ b/settings.py @@ -160,6 +160,8 @@ DROPBOX_AUTH_URL = 'https://api.dropboxapi.com/oauth2/token' DROPBOX_LOGIN_URL = 'https://www.dropbox.com/oauth2/authorize' +JOTTACLOUD_AUTH_URL = 'https://id.jottacloud.com/auth/realms/jottacloud/protocol/openid-connect/token' + LOOKUP = { 'wl': { 'display': 'Windows Live', @@ -214,6 +216,7 @@ 'auth-url': BOX_AUTH_URL, 'login-url': BOX_LOGIN_URL }, + 'dropbox': { 'display': 'Dropbox', 'client-id': DROPBOX_CLIENT_ID, @@ -225,6 +228,13 @@ 'no-state-for-token-request': True, # Dropbox is a little picky 'no-redirect_uri-for-refresh-request': True + }, + + 'jottacloud': { + 'display': 'Jottacloud', + 'client-id': "jottacli", + 'auth-url': JOTTACLOUD_AUTH_URL, + 'cli-token': True } } @@ -324,6 +334,13 @@ 'scope': 'files.content.write files.content.read files.metadata.read files.metadata.write', 'extraurl': 'token_access_type=offline', 'servicelink': 'https://dropbox.com' + }, + { + 'display': 'Jottacloud', + 'type': 'jottacloud', + 'id': 'jottacloud', + 'scope': 'openid offline_access', + 'servicelink': 'https://jottacloud.com' } ]