item content chunks preloading related changes #8592

Merged: 2 commits, merged on Dec 24, 2024.
src/borg/archive.py: 82 changes (50 additions, 32 deletions)

@@ -262,42 +262,55 @@
     def __init__(self, repository, repo_objs):
         self.repository = repository
         self.repo_objs = repo_objs
+        self.hlids_preloaded = None

-    def unpack_many(self, ids, *, filter=None, preload=False):
+    def unpack_many(self, ids, *, filter=None):
         """
         Return iterator of items.

-        *ids* is a chunk ID list of an item stream. *filter* is a callable
-        to decide whether an item will be yielded. *preload* preloads the data chunks of every yielded item.
-
-        Warning: if *preload* is True then all data chunks of every yielded item have to be retrieved,
-        otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
+        *ids* is a chunk ID list of an item content data stream.
+        *filter* is an optional callable to decide whether an item will be yielded, default: yield all items.
         """
-        hlids_preloaded = set()
+        self.hlids_preloaded = set()
         unpacker = msgpack.Unpacker(use_list=False)
         for data in self.fetch_many(ids, ro_type=ROBJ_ARCHIVE_STREAM):
             unpacker.feed(data)
             for _item in unpacker:
                 item = Item(internal_dict=_item)
-                if "chunks" in item:
-                    item.chunks = [ChunkListEntry(*e) for e in item.chunks]
-                if "chunks_healthy" in item:
-                    item.chunks_healthy = [ChunkListEntry(*e) for e in item.chunks_healthy]
-                if filter and not filter(item):
-                    continue
-                if preload and "chunks" in item:
-                    hlid = item.get("hlid", None)
-                    if hlid is None:
-                        preload_chunks = True
-                    elif hlid in hlids_preloaded:
-                        preload_chunks = False
-                    else:
-                        # not having the hardlink's chunks already preloaded for other hardlink to same inode
-                        preload_chunks = True
-                        hlids_preloaded.add(hlid)
-                    if preload_chunks:
-                        self.repository.preload([c.id for c in item.chunks])
-                yield item
+                if filter is None or filter(item):
+                    if "chunks" in item:
+                        item.chunks = [ChunkListEntry(*e) for e in item.chunks]
+                    if "chunks_healthy" in item:
+                        item.chunks_healthy = [ChunkListEntry(*e) for e in item.chunks_healthy]
+                    yield item

+    def preload_item_chunks(self, item, optimize_hardlinks=False):
+        """
+        Preloads the content data chunks of an item (if any).
+        optimize_hardlinks can be set to True if item chunks only need to be preloaded for
+        1st hardlink, but not for any further hardlink to same inode / with same hlid.
+        Returns True if chunks were preloaded.
+
+        Warning: if data chunks are preloaded then all data chunks have to be retrieved,
+        otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
+        """
+        preload_chunks = False
+        if "chunks" in item:
+            if optimize_hardlinks:
+                hlid = item.get("hlid", None)
+                if hlid is None:
+                    preload_chunks = True
+                elif hlid in self.hlids_preloaded:
+                    preload_chunks = False
+                else:
+                    # not having the hardlink's chunks already preloaded for other hardlink to same inode
+                    preload_chunks = True
+                    self.hlids_preloaded.add(hlid)
+            else:
+                preload_chunks = True
+        if preload_chunks:
+            self.repository.preload([c.id for c in item.chunks])
+        return preload_chunks

[Codecov / codecov/patch warning on src/borg/archive.py line 310: added line was not covered by tests.]

     def fetch_many(self, ids, is_preloaded=False, ro_type=None):
         assert ro_type is not None
@@ -605,12 +618,17 @@
     def item_filter(self, item, filter=None):
         return filter(item) if filter else True

-    def iter_items(self, filter=None, preload=False):
-        # note: when calling this with preload=True, later fetch_many() must be called with
-        # is_preloaded=True or the RemoteRepository code will leak memory!
-        yield from self.pipeline.unpack_many(
-            self.metadata.items, preload=preload, filter=lambda item: self.item_filter(item, filter)
-        )
+    def iter_items(self, filter=None):
+        yield from self.pipeline.unpack_many(self.metadata.items, filter=lambda item: self.item_filter(item, filter))

+    def preload_item_chunks(self, item, optimize_hardlinks=False):
+        """
+        Preloads item content data chunks from the repository.
+
+        Warning: if data chunks are preloaded then all data chunks have to be retrieved,
+        otherwise preloaded chunks will accumulate in RemoteRepository and create a memory leak.
+        """
+        return self.pipeline.preload_item_chunks(item, optimize_hardlinks=optimize_hardlinks)

     def add_item(self, item, show_progress=True, stats=None):
         if show_progress and self.show_progress:
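Taken together, these two hunks split the old iter_items(..., preload=True) contract into two explicit calls: iter_items() only decodes items, and preload_item_chunks() queues the content chunk fetches. A minimal sketch of how a caller is expected to combine them with fetch_many(), assuming a borg Archive instance and the ROBJ_FILE_STREAM constant from borg.constants; iter_item_contents is an illustrative helper, not part of this PR:

from borg.constants import ROBJ_FILE_STREAM  # ro_type for file content chunks (assumed import path)

def iter_item_contents(archive, filter=None):
    # Illustrative only, not borg code: yield (item, data) pairs.
    for item in archive.iter_items(filter):
        # Queue this item's content chunks for background fetching,
        # once per hardlinked inode; True iff chunks were preloaded.
        preloaded = archive.preload_item_chunks(item, optimize_hardlinks=True)
        if "chunks" in item:
            ids = [chunk.id for chunk in item.chunks]
            # is_preloaded must agree with the preload decision above, otherwise
            # preloaded responses pile up in RemoteRepository (memory leak).
            for data in archive.pipeline.fetch_many(ids, is_preloaded=preloaded, ro_type=ROBJ_FILE_STREAM):
                yield item, data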
src/borg/archiver/extract_cmd.py: 3 changes (2 additions, 1 deletion)

@@ -56,7 +56,8 @@ def do_extract(self, args, repository, manifest, archive):
         else:
             pi = None

-        for item in archive.iter_items(filter, preload=True):
+        for item in archive.iter_items(filter):
+            archive.preload_item_chunks(item, optimize_hardlinks=True)
             orig_path = item.path
             if strip_components:
                 item.path = os.sep.join(orig_path.split(os.sep)[strip_components:])
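With optimize_hardlinks=True, only the first item carrying a given hlid triggers a preload; further hardlinks to the same inode are skipped, since extract re-creates them as hard links to the first extracted path and never reads their chunks. A standalone sketch of just that dedup decision (a reimplementation for illustration, not the borg code itself):

def should_preload(hlid, seen_hlids):
    # Decide whether an item's content chunks should be preloaded.
    if hlid is None:  # not hardlinked: always preload
        return True
    if hlid in seen_hlids:  # further hardlink to same inode: already done
        return False
    seen_hlids.add(hlid)  # first hardlink: preload once
    return True

seen = set()
assert should_preload(None, seen) is True
assert should_preload(b"hlid-1", seen) is True   # 1st link preloads
assert should_preload(b"hlid-1", seen) is False  # 2nd link skips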
src/borg/archiver/tar_cmds.py: 3 changes (2 additions, 1 deletion)

@@ -226,7 +226,8 @@ def item_to_paxheaders(format, item):
             ph["BORG.item.meta"] = meta_text
             return ph

-        for item in archive.iter_items(filter, preload=True):
+        for item in archive.iter_items(filter):
+            archive.preload_item_chunks(item, optimize_hardlinks=True)
             orig_path = item.path
             if strip_components:
                 item.path = os.sep.join(orig_path.split(os.sep)[strip_components:])
src/borg/cache.py: 2 changes (1 addition, 1 deletion)

@@ -455,7 +455,7 @@ def _build_files_cache(self):
         )
         files_cache_logger.debug("FILES-CACHE-BUILD: starting...")
         archive = Archive(self.manifest, prev_archive.id)
-        for item in archive.iter_items(preload=False):
+        for item in archive.iter_items():
             # only put regular files' infos into the files cache:
             if stat.S_ISREG(item.mode):
                 path_hash = self.key.id_hash(safe_encode(item.path))
src/borg/remote.py: 29 changes (17 additions, 12 deletions)

@@ -780,6 +780,8 @@ def call_many(self, cmd, calls, wait=True, is_preloaded=False, async_wait=True):
         if not calls and cmd != "async_responses":
             return

+        assert not is_preloaded or cmd == "get", "is_preloaded is only supported for 'get'"
+
         def send_buffer():
             if self.to_send:
                 try:

@@ -846,6 +848,9 @@ def handle_error(unpacked):
         maximum_to_send = 0 if wait else self.upload_buffer_size_limit
         send_buffer()  # Try to send data, as some cases (async_response) will never try to send data otherwise.
         while wait or calls:
+            logger.debug(
+                f"call_many: calls: {len(calls)} waiting_for: {len(waiting_for)} responses: {len(self.responses)}"
+            )
             if self.shutdown_time and time.monotonic() > self.shutdown_time:
                 # we are shutting this RemoteRepository down already, make sure we do not waste
                 # a lot of time in case a lot of async stuff is coming in or remote is gone or slow.

@@ -946,18 +951,18 @@
                 and len(waiting_for) < MAX_INFLIGHT
             ):
                 if calls:
-                    if is_preloaded:
-                        assert cmd == "get", "is_preload is only supported for 'get'"
-                        if calls[0]["id"] in self.chunkid_to_msgids:
-                            waiting_for.append(pop_preload_msgid(calls.pop(0)["id"]))
-                    else:
-                        args = calls.pop(0)
-                        if cmd == "get" and args["id"] in self.chunkid_to_msgids:
-                            waiting_for.append(pop_preload_msgid(args["id"]))
-                        else:
-                            self.msgid += 1
-                            waiting_for.append(self.msgid)
-                            self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args}))
+                    args = calls[0]
+                    if cmd == "get" and args["id"] in self.chunkid_to_msgids:
+                        # we have a get command and have already sent a request for this chunkid when
+                        # doing preloading, so we know the msgid of the response we are waiting for:
+                        waiting_for.append(pop_preload_msgid(args["id"]))
+                        del calls[0]
+                    elif not is_preloaded:
+                        # make and send a request (already done if we are using preloading)
+                        self.msgid += 1
+                        waiting_for.append(self.msgid)
+                        del calls[0]
+                        self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args}))
                 if not self.to_send and self.preload_ids:
                     chunk_id = self.preload_ids.pop(0)
                     args = {"id": chunk_id}
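The reworked branch keeps one invariant: a get for a chunk that preload() already requested must wait on the existing msgid rather than send a duplicate request, and when call_many() itself drains preloaded calls (is_preloaded=True) it must not re-send either. A toy model of that chunk-id-to-msgid bookkeeping (standalone and simplified, not the real RemoteRepository):

from collections import defaultdict

class PreloadBook:
    # Toy model: preload() sends a get request early and records
    # chunk_id -> msgid; a later get() for the same chunk reuses that
    # msgid instead of requesting again, else the early response leaks.
    def __init__(self):
        self.msgid = 0
        self.chunkid_to_msgids = defaultdict(list)

    def preload(self, chunk_id):
        self.msgid += 1  # a request with this msgid goes on the wire now
        self.chunkid_to_msgids[chunk_id].append(self.msgid)
        return self.msgid

    def get(self, chunk_id):
        # Returns (msgid to wait for, whether a new request must be sent).
        msgids = self.chunkid_to_msgids.get(chunk_id)
        if msgids:
            msgid = msgids.pop(0)  # like pop_preload_msgid() in remote.py
            if not msgids:
                del self.chunkid_to_msgids[chunk_id]
            return msgid, False
        self.msgid += 1
        return self.msgid, True

book = PreloadBook()
m1 = book.preload(b"chunk-A")
m2, send = book.get(b"chunk-A")
assert (m2, send) == (m1, False)  # preloaded: reuse msgid, no new request
m3, send = book.get(b"chunk-B")
assert send is True               # not preloaded: send a fresh request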