Skip to content

Commit

Permalink
move from manifest.chunks_index_id to cache/chunks_hash
Browse files Browse the repository at this point in the history
so we can use that even if we don't have the manifest loaded yet.
  • Loading branch information
ThomasWaldmann committed Sep 24, 2024
1 parent f3180bf commit 49c3bd6
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 37 deletions.
10 changes: 7 additions & 3 deletions src/borg/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -2101,10 +2101,14 @@ def valid_item(obj):
def finish(self):
if self.repair:
logger.info("Writing Manifest.")
# we may have deleted chunks!
self.manifest.chunks_index_id = None # invalidate all chunks index caches
# we may have deleted chunks, invalidate/remove the chunks index cache!
try:
self.repository.store_delete("cache/chunks") # delete the now invalid chunks cache
self.repository.store_delete("cache/chunks_hash")
except (Repository.ObjectNotFound, StoreObjectNotFound):
# TODO: ^ seems like RemoteRepository raises Repository.ObjectNotFound instead of StoreObjectNotFound
pass
try:
self.repository.store_delete("cache/chunks")
except (Repository.ObjectNotFound, StoreObjectNotFound):
# TODO: ^ seems like RemoteRepository raises Repository.ObjectNotFound instead of StoreObjectNotFound
pass
Expand Down
2 changes: 1 addition & 1 deletion src/borg/archiver/compact_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def save_chunk_index(self):
# as we put the wrong size in there, we need to clean up the size:
self.chunks[id] = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)
# now self.chunks is an uptodate ChunkIndex, usable for general borg usage!
write_chunkindex_to_repo_cache(self.repository, self.chunks, self.manifest, compact=True, clear=True)
write_chunkindex_to_repo_cache(self.repository, self.chunks, compact=True, clear=True, force_write=True)
self.chunks = None # nothing there (cleared!)

def analyze_archives(self) -> Tuple[Set, Set, int, int, int]:
Expand Down
55 changes: 36 additions & 19 deletions src/borg/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,18 @@ def memorize_file(self, hashed_path, path_hash, st, chunks):
)


def write_chunkindex_to_repo_cache(repository, chunks, manifest=None, compact=False, clear=False):
def load_chunks_hash(repository) -> bytes:
    """Return the chunks index hash stored in the repository at cache/chunks_hash.

    :param repository: object providing ``store_load(name)``.
    :return: the stored hash bytes, or ``b""`` if cache/chunks_hash does not exist.

    Callers compare the returned value against the xxh64 hash of the chunks index
    data, so the empty value returned for a missing object is treated as
    "no valid cached chunks index".
    """
    try:
        chunks_hash = repository.store_load("cache/chunks_hash")
    except (Repository.ObjectNotFound, StoreObjectNotFound):
        # TODO: ^ seems like RemoteRepository raises Repository.ObjectNotFound instead of
        # StoreObjectNotFound, so we catch both here (same as for the cache/chunks accesses).
        chunks_hash = b""
        logger.debug("cache/chunks_hash missing!")
    else:
        # only log the hash on the success path; keeps the try body minimal.
        logger.debug(f"cache/chunks_hash is '{bin_to_hex(chunks_hash)}'.")
    return chunks_hash


def write_chunkindex_to_repo_cache(repository, chunks, *, compact=False, clear=False, force_write=False):
cached_hash = load_chunks_hash(repository)
if compact:
# if we don't need the in-memory chunks index anymore:
chunks.compact() # vacuum the hash table
Expand All @@ -629,31 +640,36 @@ def write_chunkindex_to_repo_cache(repository, chunks, manifest=None, compact=Fa
if clear:
# if we don't need the in-memory chunks index anymore:
chunks.clear() # free memory, immediately
id = xxh64(data)
logger.debug(f"writing chunks index {bin_to_hex(id)} to cache/chunks in repo...")
update = manifest is None or id != manifest.chunks_index_id
if update:
new_hash = xxh64(data)
if force_write or new_hash != cached_hash:
# When an updated chunks index is stored into the cache, we also store its hash into the cache.
# When a client loads the chunks index from the cache, it has to compare the xxh64 hash of the
# loaded data against cache/chunks_hash in the repository. If they are the same, the cache
# is valid. If they differ, the cache is either corrupted or out of date and
# has to be discarded.
# When some functionality is DELETING chunks from the repository, it has to either
# update both cache/chunks and cache/chunks_hash (like borg compact does) or it has to
# invalidate cache/chunks_hash (e.g. delete it, so that it is read back as an empty hash),
# so that all clients will discard their chunks index caches.
logger.debug(f"caching chunks index {bin_to_hex(new_hash)} in repository...")
repository.store_store("cache/chunks", data)
if manifest and update:
logger.debug(f"updating manifest.chunks_index_id = {bin_to_hex(id)}.")
manifest.chunks_index_id = id
manifest.write()
return id
repository.store_store("cache/chunks_hash", new_hash)
return new_hash


def build_chunkindex_from_repo(repository, *, manifest=None, disable_caches=False):
def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immediately=True):
chunks = None
# first, try to load a pre-computed and centrally cached chunks index:
if manifest and not disable_caches:
wanted_ci_id = manifest.chunks_index_id
logger.debug(f"trying to get cached chunk index (id {bin_to_hex(wanted_ci_id or b'')}) from the repo...")
if not disable_caches:
wanted_hash = load_chunks_hash(repository)
logger.debug(f"trying to get cached chunk index (id {bin_to_hex(wanted_hash or b'')}) from the repo...")
try:
chunks_data = repository.store_load("cache/chunks")
except (Repository.ObjectNotFound, StoreObjectNotFound):
# TODO: ^ seems like RemoteRepository raises Repository.ObjectNotFound instead of StoreObjectNotFound
logger.debug("cache/chunks not found in the repository.")
else:
if xxh64(chunks_data) == manifest.chunks_index_id:
if xxh64(chunks_data) == wanted_hash:
logger.debug("cache/chunks is valid.")
with io.BytesIO(chunks_data) as f:
chunks = ChunkIndex.read(f)
Expand Down Expand Up @@ -688,8 +704,9 @@ def build_chunkindex_from_repo(repository, *, manifest=None, disable_caches=Fals
# Protocol overhead is neglected in this calculation.
speed = format_file_size(num_chunks * 34 / duration)
logger.debug(f"queried {num_chunks} chunk IDs in {duration} s ({num_requests} requests), ~{speed}/s")
# immediately update cache/chunks, so we only rarely have to do it the slow way:
write_chunkindex_to_repo_cache(repository, chunks, manifest, compact=False, clear=False)
if cache_immediately:
# immediately update cache/chunks, so we only rarely have to do it the slow way:
write_chunkindex_to_repo_cache(repository, chunks, compact=False, clear=False, force_write=True)
return chunks


Expand All @@ -704,7 +721,7 @@ def __init__(self):
@property
def chunks(self):
if self._chunks is None:
self._chunks = build_chunkindex_from_repo(self.repository, manifest=self.manifest)
self._chunks = build_chunkindex_from_repo(self.repository)
return self._chunks

def seen_chunk(self, id, size=None):
Expand Down Expand Up @@ -755,7 +772,7 @@ def add_chunk(

def _write_chunks_cache(self, chunks):
# this is called from .close, so we can clear/compact here:
write_chunkindex_to_repo_cache(self.repository, self._chunks, self.manifest, compact=True, clear=True)
write_chunkindex_to_repo_cache(self.repository, self._chunks, compact=True, clear=True)
self._chunks = None # nothing there (cleared!)


Expand Down
14 changes: 0 additions & 14 deletions src/borg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,17 +424,6 @@ def __init__(self, key, repository, item_keys=None, ro_cls=RepoObj):
self.repository = repository
self.item_keys = frozenset(item_keys) if item_keys is not None else ITEM_KEYS
self.timestamp = None
# when an updated chunks index has been stored into the cache, the manifest must
# be updated with its chunks_index_id.
# when a client is loading the chunks index from a cache, it has to compare its xxh64
# hash against the chunks_index_id in the manifest. if it is the same, the cache
# is valid. If it is different, the cache is either corrupted or out of date and
# has to be discarded.
# when some functionality is DELETING chunks from the repository, it has to either
# update the cache with a correct chunks index and the manifest with the corresponding
# chunks_index_id (like borg compact does) or it has to set chunks_index_id to an
# invalid value (like None), so that all clients will discard their chunks index caches.
self.chunks_index_id = None # xxh64(chunks_index)

@property
def id_str(self):
Expand Down Expand Up @@ -466,8 +455,6 @@ def load(cls, repository, operations, key=None, *, ro_cls=RepoObj):
manifest.item_keys = ITEM_KEYS
manifest.item_keys |= frozenset(m.config.get("item_keys", [])) # new location of item_keys since borg2
manifest.item_keys |= frozenset(m.get("item_keys", [])) # legacy: borg 1.x: item_keys not in config yet
manifest.chunks_index_id = m.config.get("chunks_index_id")
logger.debug(f"loaded manifest.chunks_index_id: {bin_to_hex(manifest.chunks_index_id or b'')}")
manifest.check_repository_compatibility(operations)
return manifest

Expand Down Expand Up @@ -512,7 +499,6 @@ def write(self):
assert all(len(name) <= 255 for name in self.archives.names())
assert len(self.item_keys) <= 100
self.config["item_keys"] = tuple(sorted(self.item_keys))
self.config["chunks_index_id"] = self.chunks_index_id
manifest_archives = self.archives.finish(self)
manifest = ManifestItem(
version=2, archives=manifest_archives, timestamp=self.timestamp, config=StableDict(self.config)
Expand Down

0 comments on commit 49c3bd6

Please sign in to comment.