Skip to content

Commit

Permalink
Rollover for texts3 (#2486)
Browse files Browse the repository at this point in the history
  • Loading branch information
javitonino authored Sep 24, 2024
1 parent e00b8fa commit 135ae0c
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@
"""

from nucliadb.common.cluster.rollover import rollover_kb_index
from nucliadb.migrator.context import ExecutionContext


async def migrate(context: ExecutionContext) -> None: ...


async def migrate_kb(context: ExecutionContext, kbid: str) -> None:
await rollover_kb_index(context, kbid)
"""
We only need 1 rollover migration defined at a time; otherwise, we will
possibly run many for a kb when we only ever need to run one
"""
20 changes: 10 additions & 10 deletions nucliadb/src/migrations/0010_fix_corrupt_indexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,23 @@

import logging

from nucliadb.common.cluster.rollover import rollover_kb_index
from nucliadb.migrator.context import ExecutionContext

logger = logging.getLogger(__name__)

AFFECTED_KBS = [
"1efc5a33-bc5a-490c-8b47-b190beee212d",
"f11d6eb9-da5e-4519-ac3d-e304bfa5c354",
"096d9070-f7be-40c8-a24c-19c89072e3ff",
"848f01bc-341a-4346-b473-6b11b76b26eb",
]
# AFFECTED_KBS = [
# "1efc5a33-bc5a-490c-8b47-b190beee212d",
# "f11d6eb9-da5e-4519-ac3d-e304bfa5c354",
# "096d9070-f7be-40c8-a24c-19c89072e3ff",
# "848f01bc-341a-4346-b473-6b11b76b26eb",
# ]


async def migrate(context: ExecutionContext) -> None: ...


async def migrate_kb(context: ExecutionContext, kbid: str) -> None:
if kbid in AFFECTED_KBS:
logger.info(f"Rolling over affected KB: {kbid}")
await rollover_kb_index(context, kbid)
"""
We only need 1 rollover migration defined at a time; otherwise, we will
possibly run many for a kb when we only ever need to run one
"""
11 changes: 5 additions & 6 deletions nucliadb/src/migrations/0012_rollover_shards.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,20 @@
"""

import logging
import os

from nucliadb.common.cluster.rollover import rollover_kb_index
from nucliadb.migrator.context import ExecutionContext

logger = logging.getLogger(__name__)


AFFECTED_KBS = [kbid.strip() for kbid in os.environ.get("ROLLOVER_KBS", "").split(",") if kbid.strip()]
# AFFECTED_KBS = [kbid.strip() for kbid in os.environ.get("ROLLOVER_KBS", "").split(",") if kbid.strip()]


async def migrate(context: ExecutionContext) -> None: ...


async def migrate_kb(context: ExecutionContext, kbid: str) -> None:
if kbid in AFFECTED_KBS:
logger.info(f"Rolling over affected KB: {kbid}")
await rollover_kb_index(context, kbid)
"""
We only need 1 rollover migration defined at a time; otherwise, we will
possibly run many for a kb when we only ever need to run one
"""
9 changes: 4 additions & 5 deletions nucliadb/src/migrations/0014_rollover_shards.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
"""

import logging
import os

from nucliadb.common.cluster.rollover import rollover_kb_index
from nucliadb.migrator.context import ExecutionContext

logger = logging.getLogger(__name__)
Expand All @@ -35,6 +33,7 @@ async def migrate(context: ExecutionContext) -> None: ...


async def migrate_kb(context: ExecutionContext, kbid: str) -> None:
if os.environ.get("RUNNING_ENVIRONMENT", os.environ.get("ENVIRONMENT")) == "stage":
logger.info(f"Rolling over affected KB: {kbid}")
await rollover_kb_index(context, kbid)
"""
We only need 1 rollover migration defined at a time; otherwise, we will
possibly run many for a kb when we only ever need to run one
"""
11 changes: 5 additions & 6 deletions nucliadb/src/migrations/0015_targeted_rollover.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,20 @@
"""

import logging
import os

from nucliadb.common.cluster.rollover import rollover_kb_index
from nucliadb.migrator.context import ExecutionContext

logger = logging.getLogger(__name__)


AFFECTED_KBS = [kbid.strip() for kbid in os.environ.get("ROLLOVER_KBS", "").split(",") if kbid.strip()]
# AFFECTED_KBS = [kbid.strip() for kbid in os.environ.get("ROLLOVER_KBS", "").split(",") if kbid.strip()]


async def migrate(context: ExecutionContext) -> None: ...


async def migrate_kb(context: ExecutionContext, kbid: str) -> None:
if kbid in AFFECTED_KBS:
logger.info(f"Rolling over affected KB: {kbid}")
await rollover_kb_index(context, kbid)
"""
We only need 1 rollover migration defined at a time; otherwise, we will
possibly run many for a kb when we only ever need to run one
"""
49 changes: 25 additions & 24 deletions nucliadb/src/migrations/0016_upgrade_to_paragraphs_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@

import logging

from nucliadb.common import datamanagers
from nucliadb.common.cluster.rollover import rollover_kb_index
from nucliadb.migrator.context import ExecutionContext
from nucliadb_protos.noderesources_pb2 import ShardCreated

logger = logging.getLogger(__name__)

Expand All @@ -40,26 +37,30 @@ async def migrate(context: ExecutionContext) -> None: ...


async def migrate_kb(context: ExecutionContext, kbid: str) -> None:
try:
if await has_old_paragraphs_index(context, kbid):
logger.info("Rolling over affected KB", extra={"kbid": kbid})
await rollover_kb_index(context, kbid)
else:
logger.info(
"KB already has the latest version of the paragraphs index, skipping rollover",
extra={"kbid": kbid},
)
except ShardsObjectNotFound:
logger.warning("KB not found, skipping rollover", extra={"kbid": kbid})
"""
We only need 1 rollover migration defined at a time; otherwise, we will
possibly run many for a kb when we only ever need to run one
"""
# try:
# if await has_old_paragraphs_index(context, kbid):
# logger.info("Rolling over affected KB", extra={"kbid": kbid})
# await rollover_kb_index(context, kbid)
# else:
# logger.info(
# "KB already has the latest version of the paragraphs index, skipping rollover",
# extra={"kbid": kbid},
# )
# except ShardsObjectNotFound:
# logger.warning("KB not found, skipping rollover", extra={"kbid": kbid})


async def has_old_paragraphs_index(context: ExecutionContext, kbid: str) -> bool:
async with context.kv_driver.transaction(read_only=True) as txn:
shards_object = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid, for_update=False)
if not shards_object:
raise ShardsObjectNotFound()
for shard in shards_object.shards:
for replica in shard.replicas:
if replica.shard.paragraph_service != ShardCreated.ParagraphService.PARAGRAPH_V2:
return True
return False
# async def has_old_paragraphs_index(context: ExecutionContext, kbid: str) -> bool:
# async with context.kv_driver.transaction(read_only=True) as txn:
# shards_object = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid, for_update=False)
# if not shards_object:
# raise ShardsObjectNotFound()
# for shard in shards_object.shards:
# for replica in shard.replicas:
# if replica.shard.paragraph_service != ShardCreated.ParagraphService.PARAGRAPH_V2:
# return True
# return False
49 changes: 25 additions & 24 deletions nucliadb/src/migrations/0019_upgrade_to_paragraphs_v3.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@

import logging

from nucliadb.common import datamanagers
from nucliadb.common.cluster.rollover import rollover_kb_index
from nucliadb.migrator.context import ExecutionContext
from nucliadb_protos.noderesources_pb2 import ShardCreated

logger = logging.getLogger(__name__)

Expand All @@ -40,26 +37,30 @@ async def migrate(context: ExecutionContext) -> None: ...


async def migrate_kb(context: ExecutionContext, kbid: str) -> None:
try:
if await has_old_paragraphs_index(context, kbid):
logger.info("Rolling over affected KB", extra={"kbid": kbid})
await rollover_kb_index(context, kbid)
else:
logger.info(
"KB already has the latest version of the paragraphs index, skipping rollover",
extra={"kbid": kbid},
)
except ShardsObjectNotFound:
logger.warning("KB not found, skipping rollover", extra={"kbid": kbid})
"""
We only need 1 rollover migration defined at a time; otherwise, we will
possibly run many for a kb when we only ever need to run one
"""
# try:
# if await has_old_paragraphs_index(context, kbid):
# logger.info("Rolling over affected KB", extra={"kbid": kbid})
# await rollover_kb_index(context, kbid)
# else:
# logger.info(
# "KB already has the latest version of the paragraphs index, skipping rollover",
# extra={"kbid": kbid},
# )
# except ShardsObjectNotFound:
# logger.warning("KB not found, skipping rollover", extra={"kbid": kbid})


async def has_old_paragraphs_index(context: ExecutionContext, kbid: str) -> bool:
async with context.kv_driver.transaction(read_only=True) as txn:
shards_object = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid)
if not shards_object:
raise ShardsObjectNotFound()
for shard in shards_object.shards:
for replica in shard.replicas:
if replica.shard.paragraph_service != ShardCreated.ParagraphService.PARAGRAPH_V3:
return True
return False
# async def has_old_paragraphs_index(context: ExecutionContext, kbid: str) -> bool:
# async with context.kv_driver.transaction(read_only=True) as txn:
# shards_object = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid)
# if not shards_object:
# raise ShardsObjectNotFound()
# for shard in shards_object.shards:
# for replica in shard.replicas:
# if replica.shard.paragraph_service != ShardCreated.ParagraphService.PARAGRAPH_V3:
# return True
# return False
8 changes: 4 additions & 4 deletions nucliadb/src/migrations/0022_fix_paragraph_deletion_bug.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@
"""

import logging
import os

from nucliadb.common.cluster.rollover import rollover_kb_index
from nucliadb.migrator.context import ExecutionContext

logger = logging.getLogger(__name__)
Expand All @@ -39,5 +37,7 @@ async def migrate(context: ExecutionContext) -> None: ...


async def migrate_kb(context: ExecutionContext, kbid: str) -> None:
if os.environ.get("RUNNING_ENVIRONMENT", os.environ.get("ENVIRONMENT")) == "stage":
await rollover_kb_index(context, kbid)
"""
We only need 1 rollover migration defined at a time; otherwise, we will
possibly run many for a kb when we only ever need to run one
"""
34 changes: 34 additions & 0 deletions nucliadb/src/migrations/0027_rollover_texts3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright (C) 2021 Bosutech XXI S.L.
#
# nucliadb is offered under the AGPL v3.0 and as commercial software.
# For commercial licensing, contact us at [email protected].
#
# AGPL:
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

"""Migration #27
Rollover for nucliadb_texts3
"""

from nucliadb.common.cluster.rollover import rollover_kb_index
from nucliadb.migrator.context import ExecutionContext


async def migrate(context: ExecutionContext) -> None: ...


async def migrate_kb(context: ExecutionContext, kbid: str) -> None:
await rollover_kb_index(context, kbid)

0 comments on commit 135ae0c

Please sign in to comment.