From 135ae0c21cebabe1abb24af76521d75a182b30b9 Mon Sep 17 00:00:00 2001 From: Javier Torres Date: Tue, 24 Sep 2024 12:59:50 +0200 Subject: [PATCH] Rollover for texts3 (#2486) --- .../0009_upgrade_relations_and_texts_to_v2.py | 6 ++- .../migrations/0010_fix_corrupt_indexes.py | 20 ++++---- .../src/migrations/0012_rollover_shards.py | 11 ++--- .../src/migrations/0014_rollover_shards.py | 9 ++-- .../src/migrations/0015_targeted_rollover.py | 11 ++--- .../0016_upgrade_to_paragraphs_v2.py | 49 ++++++++++--------- .../0019_upgrade_to_paragraphs_v3.py | 49 ++++++++++--------- .../0022_fix_paragraph_deletion_bug.py | 8 +-- .../src/migrations/0027_rollover_texts3.py | 34 +++++++++++++ 9 files changed, 116 insertions(+), 81 deletions(-) create mode 100644 nucliadb/src/migrations/0027_rollover_texts3.py diff --git a/nucliadb/src/migrations/0009_upgrade_relations_and_texts_to_v2.py b/nucliadb/src/migrations/0009_upgrade_relations_and_texts_to_v2.py index d5cc870ebd..4bbddbc8f6 100644 --- a/nucliadb/src/migrations/0009_upgrade_relations_and_texts_to_v2.py +++ b/nucliadb/src/migrations/0009_upgrade_relations_and_texts_to_v2.py @@ -26,7 +26,6 @@ """ -from nucliadb.common.cluster.rollover import rollover_kb_index from nucliadb.migrator.context import ExecutionContext @@ -34,4 +33,7 @@ async def migrate(context: ExecutionContext) -> None: ... 
async def migrate_kb(context: ExecutionContext, kbid: str) -> None: - await rollover_kb_index(context, kbid) + """ + We only need 1 rollover migration defined at a time; otherwise, we will + possibly run many for a kb when we only ever need to run one + """ diff --git a/nucliadb/src/migrations/0010_fix_corrupt_indexes.py b/nucliadb/src/migrations/0010_fix_corrupt_indexes.py index 9522759366..18aa622b07 100644 --- a/nucliadb/src/migrations/0010_fix_corrupt_indexes.py +++ b/nucliadb/src/migrations/0010_fix_corrupt_indexes.py @@ -27,23 +27,23 @@ import logging -from nucliadb.common.cluster.rollover import rollover_kb_index from nucliadb.migrator.context import ExecutionContext logger = logging.getLogger(__name__) -AFFECTED_KBS = [ - "1efc5a33-bc5a-490c-8b47-b190beee212d", - "f11d6eb9-da5e-4519-ac3d-e304bfa5c354", - "096d9070-f7be-40c8-a24c-19c89072e3ff", - "848f01bc-341a-4346-b473-6b11b76b26eb", -] +# AFFECTED_KBS = [ +# "1efc5a33-bc5a-490c-8b47-b190beee212d", +# "f11d6eb9-da5e-4519-ac3d-e304bfa5c354", +# "096d9070-f7be-40c8-a24c-19c89072e3ff", +# "848f01bc-341a-4346-b473-6b11b76b26eb", +# ] async def migrate(context: ExecutionContext) -> None: ... 
async def migrate_kb(context: ExecutionContext, kbid: str) -> None: - if kbid in AFFECTED_KBS: - logger.info(f"Rolling over affected KB: {kbid}") - await rollover_kb_index(context, kbid) + """ + We only need 1 rollover migration defined at a time; otherwise, we will + possibly run many for a kb when we only ever need to run one + """ diff --git a/nucliadb/src/migrations/0012_rollover_shards.py b/nucliadb/src/migrations/0012_rollover_shards.py index 9481a99482..33709caf2d 100644 --- a/nucliadb/src/migrations/0012_rollover_shards.py +++ b/nucliadb/src/migrations/0012_rollover_shards.py @@ -23,21 +23,20 @@ """ import logging -import os -from nucliadb.common.cluster.rollover import rollover_kb_index from nucliadb.migrator.context import ExecutionContext logger = logging.getLogger(__name__) -AFFECTED_KBS = [kbid.strip() for kbid in os.environ.get("ROLLOVER_KBS", "").split(",") if kbid.strip()] +# AFFECTED_KBS = [kbid.strip() for kbid in os.environ.get("ROLLOVER_KBS", "").split(",") if kbid.strip()] async def migrate(context: ExecutionContext) -> None: ... async def migrate_kb(context: ExecutionContext, kbid: str) -> None: - if kbid in AFFECTED_KBS: - logger.info(f"Rolling over affected KB: {kbid}") - await rollover_kb_index(context, kbid) + """ + We only need 1 rollover migration defined at a time; otherwise, we will + possibly run many for a kb when we only ever need to run one + """ diff --git a/nucliadb/src/migrations/0014_rollover_shards.py b/nucliadb/src/migrations/0014_rollover_shards.py index d3435798e1..a2e43b31da 100644 --- a/nucliadb/src/migrations/0014_rollover_shards.py +++ b/nucliadb/src/migrations/0014_rollover_shards.py @@ -23,9 +23,7 @@ """ import logging -import os -from nucliadb.common.cluster.rollover import rollover_kb_index from nucliadb.migrator.context import ExecutionContext logger = logging.getLogger(__name__) @@ -35,6 +33,7 @@ async def migrate(context: ExecutionContext) -> None: ... 
async def migrate_kb(context: ExecutionContext, kbid: str) -> None: - if os.environ.get("RUNNING_ENVIRONMENT", os.environ.get("ENVIRONMENT")) == "stage": - logger.info(f"Rolling over affected KB: {kbid}") - await rollover_kb_index(context, kbid) + """ + We only need 1 rollover migration defined at a time; otherwise, we will + possibly run many for a kb when we only ever need to run one + """ diff --git a/nucliadb/src/migrations/0015_targeted_rollover.py b/nucliadb/src/migrations/0015_targeted_rollover.py index f309ee3178..5cb1757b05 100644 --- a/nucliadb/src/migrations/0015_targeted_rollover.py +++ b/nucliadb/src/migrations/0015_targeted_rollover.py @@ -24,21 +24,20 @@ """ import logging -import os -from nucliadb.common.cluster.rollover import rollover_kb_index from nucliadb.migrator.context import ExecutionContext logger = logging.getLogger(__name__) -AFFECTED_KBS = [kbid.strip() for kbid in os.environ.get("ROLLOVER_KBS", "").split(",") if kbid.strip()] +# AFFECTED_KBS = [kbid.strip() for kbid in os.environ.get("ROLLOVER_KBS", "").split(",") if kbid.strip()] async def migrate(context: ExecutionContext) -> None: ... 
async def migrate_kb(context: ExecutionContext, kbid: str) -> None: - if kbid in AFFECTED_KBS: - logger.info(f"Rolling over affected KB: {kbid}") - await rollover_kb_index(context, kbid) + """ + We only need 1 rollover migration defined at a time; otherwise, we will + possibly run many for a kb when we only ever need to run one + """ diff --git a/nucliadb/src/migrations/0016_upgrade_to_paragraphs_v2.py b/nucliadb/src/migrations/0016_upgrade_to_paragraphs_v2.py index c9ccc03df9..c1e6aaa8e8 100644 --- a/nucliadb/src/migrations/0016_upgrade_to_paragraphs_v2.py +++ b/nucliadb/src/migrations/0016_upgrade_to_paragraphs_v2.py @@ -25,10 +25,7 @@ import logging -from nucliadb.common import datamanagers -from nucliadb.common.cluster.rollover import rollover_kb_index from nucliadb.migrator.context import ExecutionContext -from nucliadb_protos.noderesources_pb2 import ShardCreated logger = logging.getLogger(__name__) @@ -40,26 +37,30 @@ async def migrate(context: ExecutionContext) -> None: ... async def migrate_kb(context: ExecutionContext, kbid: str) -> None: - try: - if await has_old_paragraphs_index(context, kbid): - logger.info("Rolling over affected KB", extra={"kbid": kbid}) - await rollover_kb_index(context, kbid) - else: - logger.info( - "KB already has the latest version of the paragraphs index, skipping rollover", - extra={"kbid": kbid}, - ) - except ShardsObjectNotFound: - logger.warning("KB not found, skipping rollover", extra={"kbid": kbid}) + """ + We only need 1 rollover migration defined at a time; otherwise, we will + possibly run many for a kb when we only ever need to run one + """ + # try: + # if await has_old_paragraphs_index(context, kbid): + # logger.info("Rolling over affected KB", extra={"kbid": kbid}) + # await rollover_kb_index(context, kbid) + # else: + # logger.info( + # "KB already has the latest version of the paragraphs index, skipping rollover", + # extra={"kbid": kbid}, + # ) + # except ShardsObjectNotFound: + # logger.warning("KB not found, 
skipping rollover", extra={"kbid": kbid}) -async def has_old_paragraphs_index(context: ExecutionContext, kbid: str) -> bool: - async with context.kv_driver.transaction(read_only=True) as txn: - shards_object = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid, for_update=False) - if not shards_object: - raise ShardsObjectNotFound() - for shard in shards_object.shards: - for replica in shard.replicas: - if replica.shard.paragraph_service != ShardCreated.ParagraphService.PARAGRAPH_V2: - return True - return False +# async def has_old_paragraphs_index(context: ExecutionContext, kbid: str) -> bool: +# async with context.kv_driver.transaction(read_only=True) as txn: +# shards_object = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid, for_update=False) +# if not shards_object: +# raise ShardsObjectNotFound() +# for shard in shards_object.shards: +# for replica in shard.replicas: +# if replica.shard.paragraph_service != ShardCreated.ParagraphService.PARAGRAPH_V2: +# return True +# return False diff --git a/nucliadb/src/migrations/0019_upgrade_to_paragraphs_v3.py b/nucliadb/src/migrations/0019_upgrade_to_paragraphs_v3.py index d51d9507df..5bd1d795fc 100644 --- a/nucliadb/src/migrations/0019_upgrade_to_paragraphs_v3.py +++ b/nucliadb/src/migrations/0019_upgrade_to_paragraphs_v3.py @@ -25,10 +25,7 @@ import logging -from nucliadb.common import datamanagers -from nucliadb.common.cluster.rollover import rollover_kb_index from nucliadb.migrator.context import ExecutionContext -from nucliadb_protos.noderesources_pb2 import ShardCreated logger = logging.getLogger(__name__) @@ -40,26 +37,30 @@ async def migrate(context: ExecutionContext) -> None: ... 
async def migrate_kb(context: ExecutionContext, kbid: str) -> None: - try: - if await has_old_paragraphs_index(context, kbid): - logger.info("Rolling over affected KB", extra={"kbid": kbid}) - await rollover_kb_index(context, kbid) - else: - logger.info( - "KB already has the latest version of the paragraphs index, skipping rollover", - extra={"kbid": kbid}, - ) - except ShardsObjectNotFound: - logger.warning("KB not found, skipping rollover", extra={"kbid": kbid}) + """ + We only need 1 rollover migration defined at a time; otherwise, we will + possibly run many for a kb when we only ever need to run one + """ + # try: + # if await has_old_paragraphs_index(context, kbid): + # logger.info("Rolling over affected KB", extra={"kbid": kbid}) + # await rollover_kb_index(context, kbid) + # else: + # logger.info( + # "KB already has the latest version of the paragraphs index, skipping rollover", + # extra={"kbid": kbid}, + # ) + # except ShardsObjectNotFound: + # logger.warning("KB not found, skipping rollover", extra={"kbid": kbid}) -async def has_old_paragraphs_index(context: ExecutionContext, kbid: str) -> bool: - async with context.kv_driver.transaction(read_only=True) as txn: - shards_object = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid) - if not shards_object: - raise ShardsObjectNotFound() - for shard in shards_object.shards: - for replica in shard.replicas: - if replica.shard.paragraph_service != ShardCreated.ParagraphService.PARAGRAPH_V3: - return True - return False +# async def has_old_paragraphs_index(context: ExecutionContext, kbid: str) -> bool: +# async with context.kv_driver.transaction(read_only=True) as txn: +# shards_object = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid) +# if not shards_object: +# raise ShardsObjectNotFound() +# for shard in shards_object.shards: +# for replica in shard.replicas: +# if replica.shard.paragraph_service != ShardCreated.ParagraphService.PARAGRAPH_V3: +# return True +# return False diff --git 
a/nucliadb/src/migrations/0022_fix_paragraph_deletion_bug.py b/nucliadb/src/migrations/0022_fix_paragraph_deletion_bug.py index 1f97d45c1a..19a63b90f9 100644 --- a/nucliadb/src/migrations/0022_fix_paragraph_deletion_bug.py +++ b/nucliadb/src/migrations/0022_fix_paragraph_deletion_bug.py @@ -27,9 +27,7 @@ """ import logging -import os -from nucliadb.common.cluster.rollover import rollover_kb_index from nucliadb.migrator.context import ExecutionContext logger = logging.getLogger(__name__) @@ -39,5 +37,7 @@ async def migrate(context: ExecutionContext) -> None: ... async def migrate_kb(context: ExecutionContext, kbid: str) -> None: - if os.environ.get("RUNNING_ENVIRONMENT", os.environ.get("ENVIRONMENT")) == "stage": - await rollover_kb_index(context, kbid) + """ + We only need 1 rollover migration defined at a time; otherwise, we will + possibly run many for a kb when we only ever need to run one + """ diff --git a/nucliadb/src/migrations/0027_rollover_texts3.py b/nucliadb/src/migrations/0027_rollover_texts3.py new file mode 100644 index 0000000000..45ced8a18c --- /dev/null +++ b/nucliadb/src/migrations/0027_rollover_texts3.py @@ -0,0 +1,34 @@ +# Copyright (C) 2021 Bosutech XXI S.L. +# +# nucliadb is offered under the AGPL v3.0 and as commercial software. +# For commercial licensing, contact us at info@nuclia.com. +# +# AGPL: +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. 
+# + +"""Migration #27 + +Rollover for nucliadb_texts3 +""" + +from nucliadb.common.cluster.rollover import rollover_kb_index +from nucliadb.migrator.context import ExecutionContext + + +async def migrate(context: ExecutionContext) -> None: ... + + +async def migrate_kb(context: ExecutionContext, kbid: str) -> None: + await rollover_kb_index(context, kbid)