Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update delete objects xml escape. #213

Open
wants to merge 1 commit into
base: dev-1908
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions oss2/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -803,3 +803,25 @@ def to_str(pos):
return str(pos)

return to_str(start) + '-' + to_str(last)

def xml_escape(key):
if key is None:
return None

new_key = ""
escape_keys = {
'\r' : '
',
'\'' :''',
'"' : '"',
'&' : '&',
'<' : '&lt;',
'>' : '&gt;'
}

for i in range(0, len(key)):
if escape_keys.get(key[i]) is not None:
new_key += escape_keys.get(key[i])
else:
new_key += key[i]

return new_key
37 changes: 20 additions & 17 deletions oss2/xml_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -711,34 +711,37 @@ def to_complete_upload_request(parts):

return _node_to_string(root)


def to_batch_delete_objects_request(keys, quiet):
root_node = ElementTree.Element('Delete')

_add_text_child(root_node, 'Quiet', str(quiet).lower())
xml_body = '''<?xml version="1.0" encoding="UTF-8"?>'''
xml_body += "<Delete>"
xml_body += "<Quiet>" + str(quiet).lower() + "</Quiet>"

for key in keys:
object_node = ElementTree.SubElement(root_node, 'Object')
_add_text_child(object_node, 'Key', key)

return _node_to_string(root_node)
new_key = utils.xml_escape(to_unicode(key))
object_body = "<Object>" + "<Key>" + new_key + "</Key>" + "</Object>"
xml_body += object_body

def to_batch_delete_objects_version_request(objectVersions, quiet):
xml_body += "</Delete>"

root_node = ElementTree.Element('Delete')
return xml_body

_add_text_child(root_node, 'Quiet', str(quiet).lower())
def to_batch_delete_objects_version_request(objectVersions, quiet):
xml_body = '''<?xml version="1.0" encoding="UTF-8"?>'''
xml_body += "<Delete>"
xml_body += "<Quiet>" + str(quiet).lower() + "</Quiet>"

objectVersionList = objectVersions.object_version_list

for ver in objectVersionList:
object_node = ElementTree.SubElement(root_node, 'Object')
_add_text_child(object_node, 'Key', ver.key)
if ver.versionid != '':
_add_text_child(object_node, 'VersionId', ver.versionid)
new_key = utils.xml_escape(to_unicode(ver.key))
object_body = "<Object>"
object_body += "<Key>" + new_key + "</Key>"
object_body += "<VersionId>" + ver.versionid + "</VersionId>"
object_body += "</Object>"
xml_body += object_body

return _node_to_string(root_node)
xml_body += "</Delete>"

return xml_body

def to_put_bucket_config(bucket_config):
root = ElementTree.Element('CreateBucketConfiguration')
Expand Down
11 changes: 11 additions & 0 deletions tests/test_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,17 @@ def test_batch_delete_objects(self):
for object in object_list:
self.assertTrue(not self.bucket.object_exists(object))

def test_batch_delete_objects_specificalChars(self):
object_name = "测试中文python-bswodnvsttpqvnzwsgifetwe\"'\n\n\t@#$!\t<>!@#$%^&*()-="
for i in range(1, 32):
object_name += chr(i)

result = self.bucket.put_object(object_name, "123")

key_list = [object_name]
self.bucket.batch_delete_objects(key_list)
self.assertFalse(self.bucket.object_exists(object_name))

def test_batch_delete_objects_empty(self):
try:
self.bucket.batch_delete_objects([])
Expand Down
49 changes: 49 additions & 0 deletions tests/test_object_versioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,55 @@ def test_delete_object_versions_with_invalid_arguments(self):
self.assertRaises(oss2.exceptions.ClientError, bucket.delete_object_versions, None)
self.assertRaises(oss2.exceptions.ClientError, bucket.delete_object_versions, [])

def test_batch_delete_verions_with_specifical_chars(self):

from oss2.models import BucketVersioningConfig
from oss2.models import BatchDeleteObjectVersion
from oss2.models import BatchDeleteObjectVersionList

auth = oss2.Auth(OSS_ID, OSS_SECRET)
bucket_name = OSS_BUCKET + "-test-batch-delete-versions-spec-chars"
bucket = oss2.Bucket(auth, self.endpoint, bucket_name)

bucket.create_bucket(oss2.BUCKET_ACL_PRIVATE)

wait_meta_sync()

config = BucketVersioningConfig()
config.status = 'Enabled'

result = bucket.put_bucket_versioning(config)

wait_meta_sync()

result = bucket.get_bucket_info()

object_name = "测试中文python-bswodnvsttpqvnzwsgifetwe\"'\n\n\t@#$!\t<>!@#$%^&*()-="
for i in range(1, 32):
object_name += chr(i)

# put version 1
result = bucket.put_object(object_name, "123")
self.assertEqual(int(result.status)/100, 2)
self.assertTrue(result.versionid != "")
versionid1 = result.versionid

version_list = BatchDeleteObjectVersionList()
version_list.append(BatchDeleteObjectVersion(key=object_name, versionid=versionid1))

self.assertTrue(version_list.len(), 1)

result = bucket.delete_object_versions(version_list)

self.assertTrue(len(result.delete_versions) == 1)
self.assertTrue(result.delete_versions[0].versionid == versionid1)

self.assertFalse(bucket.object_exists(object_name))

try:
bucket.delete_bucket()
except:
self.assertFalse(True, "should not get a exception")

if __name__ == '__main__':
unittest.main()