Skip to content

Commit

Permalink
Update delete objects xml escape.
Browse files Browse the repository at this point in the history
  • Loading branch information
liyanzhang505 committed Nov 8, 2019
1 parent 708db2f commit f392404
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 17 deletions.
21 changes: 21 additions & 0 deletions oss2/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -803,3 +803,24 @@ def to_str(pos):
return str(pos)

return to_str(start) + '-' + to_str(last)

def xml_escape(key):
if key is None:
return None

new_key = ""
escape_keys = {
'\r' : '
',
'\'' :'"',
'&' : '&',
'<' : '&lt;',
'>' : '&gt;'
}

for i in range(0, len(key)):
if escape_keys.get(key[i]) is not None:
new_key += escape_keys.get(key[i])
else:
new_key += key[i]

return new_key
37 changes: 20 additions & 17 deletions oss2/xml_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -711,34 +711,37 @@ def to_complete_upload_request(parts):

return _node_to_string(root)


def to_batch_delete_objects_request(keys, quiet):
root_node = ElementTree.Element('Delete')

_add_text_child(root_node, 'Quiet', str(quiet).lower())
xml_body = '''<?xml version="1.0" encoding="UTF-8"?>'''
xml_body += "<Delete>"
xml_body += "<Quiet>" + str(quiet).lower() + "</Quiet>"

for key in keys:
object_node = ElementTree.SubElement(root_node, 'Object')
_add_text_child(object_node, 'Key', key)

return _node_to_string(root_node)
new_key = utils.xml_escape(to_unicode(key))
object_body = "<Object>" + "<Key>" + new_key + "</Key>" + "</Object>"
xml_body += object_body

def to_batch_delete_objects_version_request(objectVersions, quiet):
xml_body += "</Delete>"

root_node = ElementTree.Element('Delete')
return xml_body

_add_text_child(root_node, 'Quiet', str(quiet).lower())
def to_batch_delete_objects_version_request(objectVersions, quiet):
xml_body = '''<?xml version="1.0" encoding="UTF-8"?>'''
xml_body += "<Delete>"
xml_body += "<Quiet>" + str(quiet).lower() + "</Quiet>"

objectVersionList = objectVersions.object_version_list

for ver in objectVersionList:
object_node = ElementTree.SubElement(root_node, 'Object')
_add_text_child(object_node, 'Key', ver.key)
if ver.versionid != '':
_add_text_child(object_node, 'VersionId', ver.versionid)
new_key = utils.xml_escape(to_unicode(ver.key))
object_body = "<Object>"
object_body += "<Key>" + new_key + "</Key>"
object_body += "<VersionId>" + ver.versionid + "</VersionId>"
object_body += "</Object>"
xml_body += object_body

return _node_to_string(root_node)
xml_body += "</Delete>"

return xml_body

def to_put_bucket_config(bucket_config):
root = ElementTree.Element('CreateBucketConfiguration')
Expand Down
11 changes: 11 additions & 0 deletions tests/test_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,17 @@ def test_batch_delete_objects(self):
for object in object_list:
self.assertTrue(not self.bucket.object_exists(object))

def test_batch_delete_objects_specificalChars(self):
object_name = "测试中文python-bswodnvsttpqvnzwsgifetwe\n\n\t@#$!\t<>!@#$%^&*()-="
for i in range(1, 32):
object_name += chr(i)

result = self.bucket.put_object(object_name, "123")

key_list = [object_name]
self.bucket.batch_delete_objects(key_list)
self.assertFalse(self.bucket.object_exists(object_name))

def test_batch_delete_objects_empty(self):
try:
self.bucket.batch_delete_objects([])
Expand Down
49 changes: 49 additions & 0 deletions tests/test_object_versioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,55 @@ def test_delete_object_versions_with_invalid_arguments(self):
self.assertRaises(oss2.exceptions.ClientError, bucket.delete_object_versions, None)
self.assertRaises(oss2.exceptions.ClientError, bucket.delete_object_versions, [])

def test_batch_delete_verions_with_specifical_chars(self):

from oss2.models import BucketVersioningConfig
from oss2.models import BatchDeleteObjectVersion
from oss2.models import BatchDeleteObjectVersionList

auth = oss2.Auth(OSS_ID, OSS_SECRET)
bucket_name = OSS_BUCKET + "-test-batch-delete-versions-spec-chars"
bucket = oss2.Bucket(auth, self.endpoint, bucket_name)

bucket.create_bucket(oss2.BUCKET_ACL_PRIVATE)

wait_meta_sync()

config = BucketVersioningConfig()
config.status = 'Enabled'

result = bucket.put_bucket_versioning(config)

wait_meta_sync()

result = bucket.get_bucket_info()

object_name = "测试中文python-bswodnvsttpqvnzwsgifetwe\n\n\t@#$!\t<>!@#$%^&*()-="
for i in range(1, 32):
object_name += chr(i)

# put version 1
result = bucket.put_object(object_name, "123")
self.assertEqual(int(result.status)/100, 2)
self.assertTrue(result.versionid != "")
versionid1 = result.versionid

version_list = BatchDeleteObjectVersionList()
version_list.append(BatchDeleteObjectVersion(key=object_name, versionid=versionid1))

self.assertTrue(version_list.len(), 1)

result = bucket.delete_object_versions(version_list)

self.assertTrue(len(result.delete_versions) == 1)
self.assertTrue(result.delete_versions[0].versionid == versionid1)

self.assertFalse(bucket.object_exists(object_name))

try:
bucket.delete_bucket()
except:
self.assertFalse(True, "should not get a exception")

if __name__ == '__main__':
unittest.main()

0 comments on commit f392404

Please sign in to comment.