Skip to content

Commit

Permalink
Validate request body for proper base64 encoding
Browse files Browse the repository at this point in the history
  • Loading branch information
selectiveduplicate committed Sep 26, 2024
1 parent e1c2f56 commit d536edb
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 3 deletions.
28 changes: 25 additions & 3 deletions moesif_aws_lambda/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@
import json
import os
from pprint import pprint
import base64

import random
import math
import binascii
import re

try:
from urllib import urlencode
Expand Down Expand Up @@ -169,6 +170,24 @@ def build_uri(self, event, payload_format_version_1_0):
uri = uri + '?' + event['rawQueryString']
return uri

def is_base64_str(self, data):
"""Checks if `data` is a valid base64-encoded string."""
if not isinstance(data, str):
return False
if len(data) % 4 != 0:
return False

b64_regex = re.compile("^[A-Za-z0-9+/]+={0,2}$")

if (not b64_regex.fullmatch(data)):
return False

try:
_ = base64.b64decode(data)
return True
except binascii.Error:
return False

def base64_body(cls, data):
"""Function to transfer body into base64 encoded"""
body = base64.b64encode(str(data).encode("utf-8"))
Expand All @@ -193,9 +212,11 @@ def process_body(self, body_wrapper):

body = None
transfer_encoding = None

try:
if body_wrapper.get('isBase64Encoded', False):
body = body_wrapper.get('body')
if body_wrapper.get('isBase64Encoded', False) and self.is_base64_str(
body_wrapper.get('body')
):
transfer_encoding = 'base64'
else:
if isinstance(body_wrapper['body'], str):
Expand All @@ -205,6 +226,7 @@ def process_body(self, body_wrapper):
transfer_encoding = 'json'
except Exception as e:
return self.base64_body(body_wrapper['body'])

return body, transfer_encoding

def before(self, event, context):
Expand Down
44 changes: 44 additions & 0 deletions moesif_aws_lambda/tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import json
import unittest
from .middleware import MoesifLogger

moesif_options = {
"LOG_BODY": True,
}


@MoesifLogger(moesif_options)
def lambda_handler(event, context):

return {
"statusCode": 200,
"isBase64Encoded": False,
"body": json.dumps(
{"msg": "Hello from Lambda!", "req_body": event.get("body")}
),
"headers": {"Content-Type": "application/json"},
}


class TestStrIsBase64(unittest.TestCase):
def test_valid_base64_encoded_str(self):
"""
Tests that `is_str_base64` returns `True` for a valid base64-encoded
string.
"""
valid_base64 = "eyJmb28iOiJiYXIifQ=="
moesif = MoesifLogger(moesif_options)
self.assertTrue(moesif.is_base64_str(self=moesif, data=valid_base64))

def test_invalid_base64_encoded_str(self):
"""
Tests that `is_str_base64` returns `False` for an invalid base64-encoded
string.
"""
invalid_base64 = json.dumps({"foo": "bar"})
moesif = MoesifLogger(moesif_options)
self.assertFalse(moesif.is_base64_str(self=moesif, data=invalid_base64))


if __name__ == "__main__":
unittest.main()

0 comments on commit d536edb

Please sign in to comment.