diff --git a/gcsfs/core.py b/gcsfs/core.py
index 390a3b7b..20bcda6d 100644
--- a/gcsfs/core.py
+++ b/gcsfs/core.py
@@ -27,11 +27,9 @@
 
 logger = logging.getLogger("gcsfs")
 
-
 if "GCSFS_DEBUG" in os.environ:
     setup_logging(logger=logger, level=os.getenv("GCSFS_DEBUG"))
 
-
 # client created 2018-01-16
 ACLs = {
     "authenticatedread",
@@ -50,9 +48,9 @@
 }
 
 DEFAULT_PROJECT = os.getenv("GCSFS_DEFAULT_PROJECT", "")
-GCS_MIN_BLOCK_SIZE = 2**18
-GCS_MAX_BLOCK_SIZE = 2**28
-DEFAULT_BLOCK_SIZE = 5 * 2**20
+GCS_MIN_BLOCK_SIZE = 2 ** 18
+GCS_MAX_BLOCK_SIZE = 2 ** 28
+DEFAULT_BLOCK_SIZE = 5 * 2 ** 20
 
 SUPPORTED_FIXED_KEY_METADATA = {
     "content_encoding": "contentEncoding",
@@ -118,7 +116,7 @@ def _chunks(lst, n):
     Implementation based on https://stackoverflow.com/a/312464.
     """
     for i in range(0, len(lst), n):
-        yield lst[i : i + n]
+        yield lst[i: i + n]
 
 
 def _coalesce_generation(*args):
@@ -351,9 +349,9 @@ def _strip_protocol(cls, path):
         protos = (cls.protocol,) if isinstance(cls.protocol, str) else cls.protocol
         for protocol in protos:
             if path.startswith(protocol + "://"):
-                path = path[len(protocol) + 3 :]
+                path = path[len(protocol) + 3:]
             elif path.startswith(protocol + "::"):
-                path = path[len(protocol) + 2 :]
+                path = path[len(protocol) + 2:]
         # use of root_marker to make minimum required path, e.g., "/"
         return path or cls.root_marker
 
@@ -406,7 +404,6 @@ async def _request(
             data=data,
             timeout=self.requests_timeout,
         ) as r:
-            status = r.status
             headers = r.headers
             info = r.request_info  # for debug only
             contents = await r.read()
@@ -507,7 +504,7 @@ async def _get_object(self, path):
                 maxResults=1 if not generation else None,
                 versions="true" if generation else None,
             )
-            for item in resp.get("items", []):
+            for item in self._filter_ghost_items(resp.get("items", [])):
                 if item["name"] == key and (
                     not generation or item.get("generation") == generation
                 ):
@@ -559,6 +556,23 @@ async def _list_objects(self, path, prefix="", versions=False):
             self.dircache[path] = out
         return out
 
+    @staticmethod
+    def _filter_ghost_items(items):
+        if not items:
+            items = []
+
+        filtered_items = []
+
+        for item in items:
+            if item.get("kind", "") == "storage#object" \
+                    and item.get("size", "") == "0" \
+                    and item.get("crc32c", "") == "AAAAAA==":
+                # This is a ghost item, skip it
+                continue
+            filtered_items.append(item)
+
+        return filtered_items
+
     async def _do_list_objects(
         self, path, max_results=None, delimiter="/", prefix="", versions=False
     ):
@@ -581,7 +595,7 @@ async def _do_list_objects(
         )
 
         prefixes.extend(page.get("prefixes", []))
-        items.extend(page.get("items", []))
+        items.extend(self._filter_ghost_items(page.get("items", [])))
        next_page_token = page.get("nextPageToken", None)
 
         while next_page_token is not None:
@@ -599,7 +613,7 @@ async def _do_list_objects(
             assert page["kind"] == "storage#objects"
 
             prefixes.extend(page.get("prefixes", []))
-            items.extend(page.get("items", []))
+            items.extend(self._filter_ghost_items(page.get("items", [])))
             next_page_token = page.get("nextPageToken", None)
 
         items = [self._process_object(bucket, i) for i in items]
@@ -612,7 +626,7 @@ async def _list_buckets(self):
             page = await self._call("GET", "b", project=self.project, json_out=True)
 
             assert page["kind"] == "storage#buckets"
-            items.extend(page.get("items", []))
+            items.extend(self._filter_ghost_items(page.get("items", [])))
             next_page_token = page.get("nextPageToken", None)
 
             while next_page_token is not None:
@@ -625,7 +639,7 @@ async def _list_buckets(self):
                 )
 
                 assert page["kind"] == "storage#buckets"
-                items.extend(page.get("items", []))
+                items.extend(self._filter_ghost_items(page.get("items", [])))
                 next_page_token = page.get("nextPageToken", None)
 
             buckets = [
@@ -1025,7 +1039,7 @@ async def _rm_files(self, paths):
                 f"{self._location}/batch/storage/v1",
                 headers={
                     "Content-Type": 'multipart/mixed; boundary="=========='
-                    '=====7330845974216740156=="'
+                                    '=====7330845974216740156=="'
                 },
                 data=body + "\n--===============7330845974216740156==--",
             )
@@ -1058,7 +1072,7 @@ async def _rm(self, path, recursive=False, maxdepth=None, batchsize=20):
         exs = await asyncio.gather(
             *(
                 [
-                    self._rm_files(files[i : i + batchsize])
+                    self._rm_files(files[i: i + batchsize])
                     for i in range(0, len(files), batchsize)
                 ]
             ),
@@ -1074,8 +1088,8 @@ async def _rm(self, path, recursive=False, maxdepth=None, batchsize=20):
             ex
             for ex in exs
             if ex is not None
-            and "No such object" not in str(ex)
-            and not isinstance(ex, FileNotFoundError)
+               and "No such object" not in str(ex)
+               and not isinstance(ex, FileNotFoundError)
         ]
         if exs:
             raise exs[0]
@@ -1090,21 +1104,21 @@ async def _pipe_file(
         metadata=None,
         consistency=None,
         content_type="application/octet-stream",
-        chunksize=50 * 2**20,
+        chunksize=50 * 2 ** 20,
     ):
         # enforce blocksize should be a multiple of 2**18
         consistency = consistency or self.consistency
         bucket, key, generation = self.split_path(path)
         size = len(data)
         out = None
-        if size < 5 * 2**20:
+        if size < 5 * 2 ** 20:
             location = await simple_upload(
                 self, bucket, key, data, metadata, consistency, content_type
             )
         else:
             location = await initiate_upload(self, bucket, key, content_type, metadata)
             for offset in range(0, len(data), chunksize):
-                bit = data[offset : offset + chunksize]
+                bit = data[offset: offset + chunksize]
                 out = await upload_chunk(
                     self, location, bit, offset, size, content_type
                 )
@@ -1123,7 +1137,7 @@ async def _put_file(
         metadata=None,
         consistency=None,
         content_type="application/octet-stream",
-        chunksize=50 * 2**20,
+        chunksize=50 * 2 ** 20,
         callback=None,
         **kwargs,
     ):
@@ -1141,7 +1155,7 @@ async def _put_file(
             f0.seek(0)
             callback.set_size(size)
 
-            if size < 5 * 2**20:
+            if size < 5 * 2 ** 20:
                 await simple_upload(
                     self,
                     bucket,
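
For context on the rule this patch applies: a listing entry is treated as a "ghost" when it is a `storage#object` whose reported `size` is `"0"` and whose `crc32c` is `"AAAAAA=="` (the base64-encoded CRC32C of empty content), i.e. a zero-byte folder placeholder. A minimal standalone sketch of that rule follows; the `filter_ghost_items` helper name and the sample listing entries are illustrative only and not part of the patch:

```python
GHOST_CRC32C = "AAAAAA=="  # base64 CRC32C of empty content


def filter_ghost_items(items):
    """Drop zero-byte placeholder objects from a GCS listing page (illustrative)."""
    return [
        item
        for item in items or []
        if not (
            item.get("kind", "") == "storage#object"
            and item.get("size", "") == "0"
            and item.get("crc32c", "") == GHOST_CRC32C
        )
    ]


# Made-up listing entries: one real object, one ghost placeholder.
sample = [
    {"kind": "storage#object", "name": "data/part-0.csv", "size": "1024", "crc32c": "yZRlqg=="},
    {"kind": "storage#object", "name": "data/", "size": "0", "crc32c": "AAAAAA=="},
]
print([item["name"] for item in filter_ghost_items(sample)])
# -> ['data/part-0.csv']
```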