iter_items: decouple item iteration and content data chunks preloading
It needs to be possible to iterate over all items in an archive,
produce some output (e.g. whether an item is included or excluded), and then
preload content data chunks only for the included items.
ThomasWaldmann committed Dec 23, 2024
1 parent 2108616 commit 694fa93
Showing 4 changed files with 55 additions and 35 deletions.
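The commit message describes the intended calling pattern: decide and report inclusion per item first, and only then queue content data chunks for download. A minimal sketch of that pattern against the new API (the `matcher` and `report` names are illustrative, not part of this commit; the real callers in the diffs below pass the filter into iter_items() and preload for every yielded item):

    for item in archive.iter_items():
        included = matcher.match(item.path)   # decide inclusion per item ...
        report(item.path, included)           # ... and do some output about it
        if included:
            # queue the item's content data chunks for download (hardlink-aware)
            archive.preload_item_chunks(item, optimize_hardlinks=True)
            # note: every preloaded chunk must later be fetched (see the warning in the diff below)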
82 changes: 50 additions & 32 deletions src/borg/archive.py
@@ -262,42 +262,55 @@ class DownloadPipeline:
     def __init__(self, repository, repo_objs):
         self.repository = repository
         self.repo_objs = repo_objs
+        self.hlids_preloaded = None

-    def unpack_many(self, ids, *, filter=None, preload=False):
+    def unpack_many(self, ids, *, filter=None):
         """
         Return iterator of items.

-        *ids* is a chunk ID list of an item stream. *filter* is a callable
-        to decide whether an item will be yielded. *preload* preloads the data chunks of every yielded item.
-
-        Warning: if *preload* is True then all data chunks of every yielded item have to be retrieved,
-        otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
+        *ids* is a chunk ID list of an item content data stream.
+        *filter* is an optional callable to decide whether an item will be yielded, default: yield all items.
         """
-        hlids_preloaded = set()
+        self.hlids_preloaded = set()
         unpacker = msgpack.Unpacker(use_list=False)
         for data in self.fetch_many(ids, ro_type=ROBJ_ARCHIVE_STREAM):
             unpacker.feed(data)
             for _item in unpacker:
                 item = Item(internal_dict=_item)
-                if "chunks" in item:
-                    item.chunks = [ChunkListEntry(*e) for e in item.chunks]
-                if "chunks_healthy" in item:
-                    item.chunks_healthy = [ChunkListEntry(*e) for e in item.chunks_healthy]
-                if filter and not filter(item):
-                    continue
-                if preload and "chunks" in item:
-                    hlid = item.get("hlid", None)
-                    if hlid is None:
-                        preload_chunks = True
-                    elif hlid in hlids_preloaded:
-                        preload_chunks = False
-                    else:
-                        # not having the hardlink's chunks already preloaded for other hardlink to same inode
-                        preload_chunks = True
-                        hlids_preloaded.add(hlid)
-                    if preload_chunks:
-                        self.repository.preload([c.id for c in item.chunks])
-                yield item
+                if filter is None or filter(item):
+                    if "chunks" in item:
+                        item.chunks = [ChunkListEntry(*e) for e in item.chunks]
+                    if "chunks_healthy" in item:
+                        item.chunks_healthy = [ChunkListEntry(*e) for e in item.chunks_healthy]
+                    yield item
+
+    def preload_item_chunks(self, item, optimize_hardlinks=False):
+        """
+        Preloads the content data chunks of an item (if any).
+
+        optimize_hardlinks can be set to True if item chunks only need to be preloaded for
+        1st hardlink, but not for any further hardlink to same inode / with same hlid.
+
+        Returns True if chunks were preloaded.
+
+        Warning: if data chunks are preloaded then all data chunks have to be retrieved,
+        otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
+        """
+        preload_chunks = False
+        if "chunks" in item:
+            if optimize_hardlinks:
+                hlid = item.get("hlid", None)
+                if hlid is None:
+                    preload_chunks = True
+                elif hlid in self.hlids_preloaded:
+                    preload_chunks = False
+                else:
+                    # not having the hardlink's chunks already preloaded for other hardlink to same inode
+                    preload_chunks = True
+                    self.hlids_preloaded.add(hlid)
+            else:
+                preload_chunks = True
+            if preload_chunks:
+                self.repository.preload([c.id for c in item.chunks])
+        return preload_chunks

     def fetch_many(self, ids, is_preloaded=False, ro_type=None):
         assert ro_type is not None

(Codecov / codecov/patch check annotation: added line src/borg/archive.py#L310 was not covered by tests.)
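The warning in the new docstring is about RemoteRepository's preload machinery: preloading queues chunks for retrieval, and every queued chunk must actually be fetched afterwards, otherwise the responses pile up. The old iter_items() comment, removed in the next hunk, stated the same requirement via fetch_many(is_preloaded=True). A hedged sketch of one possible consumer (the `pipeline` and `write_data` names and the ro_type value are illustrative, not part of this commit):

    # sketch: for an item yielded by unpack_many(), preload its chunks,
    # then make sure every preloaded chunk is retrieved
    if pipeline.preload_item_chunks(item, optimize_hardlinks=True):
        ids = [c.id for c in item.chunks]
        for data in pipeline.fetch_many(ids, is_preloaded=True, ro_type=ROBJ_FILE_STREAM):
            write_data(data)  # nothing is left queued, so RemoteRepository does not accumulate chunks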
@@ -605,12 +618,17 @@ def __repr__(self):
     def item_filter(self, item, filter=None):
         return filter(item) if filter else True

-    def iter_items(self, filter=None, preload=False):
-        # note: when calling this with preload=True, later fetch_many() must be called with
-        # is_preloaded=True or the RemoteRepository code will leak memory!
-        yield from self.pipeline.unpack_many(
-            self.metadata.items, preload=preload, filter=lambda item: self.item_filter(item, filter)
-        )
+    def iter_items(self, filter=None):
+        yield from self.pipeline.unpack_many(self.metadata.items, filter=lambda item: self.item_filter(item, filter))
+
+    def preload_item_chunks(self, item, optimize_hardlinks=False):
+        """
+        Preloads item content data chunks from the repository.
+
+        Warning: if data chunks are preloaded then all data chunks have to be retrieved,
+        otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
+        """
+        return self.pipeline.preload_item_chunks(item, optimize_hardlinks=optimize_hardlinks)

     def add_item(self, item, show_progress=True, stats=None):
         if show_progress and self.show_progress:
3 changes: 2 additions & 1 deletion src/borg/archiver/extract_cmd.py
@@ -56,7 +56,8 @@ def do_extract(self, args, repository, manifest, archive):
         else:
             pi = None

-        for item in archive.iter_items(filter, preload=True):
+        for item in archive.iter_items(filter):
+            archive.preload_item_chunks(item, optimize_hardlinks=True)
             orig_path = item.path
             if strip_components:
                 item.path = os.sep.join(orig_path.split(os.sep)[strip_components:])
3 changes: 2 additions & 1 deletion src/borg/archiver/tar_cmds.py
@@ -226,7 +226,8 @@ def item_to_paxheaders(format, item):
             ph["BORG.item.meta"] = meta_text
             return ph

-        for item in archive.iter_items(filter, preload=True):
+        for item in archive.iter_items(filter):
+            archive.preload_item_chunks(item, optimize_hardlinks=True)
             orig_path = item.path
             if strip_components:
                 item.path = os.sep.join(orig_path.split(os.sep)[strip_components:])
2 changes: 1 addition & 1 deletion src/borg/cache.py
@@ -455,7 +455,7 @@ def _build_files_cache(self):
         )
         files_cache_logger.debug("FILES-CACHE-BUILD: starting...")
         archive = Archive(self.manifest, prev_archive.id)
-        for item in archive.iter_items(preload=False):
+        for item in archive.iter_items():
             # only put regular files' infos into the files cache:
             if stat.S_ISREG(item.mode):
                 path_hash = self.key.id_hash(safe_encode(item.path))
