Skip to content

Commit

Permalink
Rollover kbs with external index provider (#2396)
Browse files Browse the repository at this point in the history
  • Loading branch information
lferran authored Aug 16, 2024
1 parent e2e4d34 commit fb25dbd
Show file tree
Hide file tree
Showing 26 changed files with 690 additions and 193 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
"""

from nucliadb.common.cluster.rollover import rollover_kb_shards
from nucliadb.common.cluster.rollover import rollover_kb_index
from nucliadb.migrator.context import ExecutionContext


async def migrate(context: ExecutionContext) -> None: ...


async def migrate_kb(context: ExecutionContext, kbid: str) -> None:
await rollover_kb_shards(context, kbid)
await rollover_kb_index(context, kbid)
4 changes: 2 additions & 2 deletions nucliadb/src/migrations/0010_fix_corrupt_indexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

import logging

from nucliadb.common.cluster.rollover import rollover_kb_shards
from nucliadb.common.cluster.rollover import rollover_kb_index
from nucliadb.migrator.context import ExecutionContext

logger = logging.getLogger(__name__)
Expand All @@ -46,4 +46,4 @@ async def migrate(context: ExecutionContext) -> None: ...
async def migrate_kb(context: ExecutionContext, kbid: str) -> None:
if kbid in AFFECTED_KBS:
logger.info(f"Rolling over affected KB: {kbid}")
await rollover_kb_shards(context, kbid)
await rollover_kb_index(context, kbid)
4 changes: 2 additions & 2 deletions nucliadb/src/migrations/0012_rollover_shards.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import logging
import os

from nucliadb.common.cluster.rollover import rollover_kb_shards
from nucliadb.common.cluster.rollover import rollover_kb_index
from nucliadb.migrator.context import ExecutionContext

logger = logging.getLogger(__name__)
Expand All @@ -40,4 +40,4 @@ async def migrate(context: ExecutionContext) -> None: ...
async def migrate_kb(context: ExecutionContext, kbid: str) -> None:
if kbid in AFFECTED_KBS:
logger.info(f"Rolling over affected KB: {kbid}")
await rollover_kb_shards(context, kbid)
await rollover_kb_index(context, kbid)
4 changes: 2 additions & 2 deletions nucliadb/src/migrations/0014_rollover_shards.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import logging
import os

from nucliadb.common.cluster.rollover import rollover_kb_shards
from nucliadb.common.cluster.rollover import rollover_kb_index
from nucliadb.migrator.context import ExecutionContext

logger = logging.getLogger(__name__)
Expand All @@ -37,4 +37,4 @@ async def migrate(context: ExecutionContext) -> None: ...
async def migrate_kb(context: ExecutionContext, kbid: str) -> None:
if os.environ.get("RUNNING_ENVIRONMENT", os.environ.get("ENVIRONMENT")) == "stage":
logger.info(f"Rolling over affected KB: {kbid}")
await rollover_kb_shards(context, kbid)
await rollover_kb_index(context, kbid)
4 changes: 2 additions & 2 deletions nucliadb/src/migrations/0015_targeted_rollover.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import logging
import os

from nucliadb.common.cluster.rollover import rollover_kb_shards
from nucliadb.common.cluster.rollover import rollover_kb_index
from nucliadb.migrator.context import ExecutionContext

logger = logging.getLogger(__name__)
Expand All @@ -41,4 +41,4 @@ async def migrate(context: ExecutionContext) -> None: ...
async def migrate_kb(context: ExecutionContext, kbid: str) -> None:
if kbid in AFFECTED_KBS:
logger.info(f"Rolling over affected KB: {kbid}")
await rollover_kb_shards(context, kbid)
await rollover_kb_index(context, kbid)
4 changes: 2 additions & 2 deletions nucliadb/src/migrations/0016_upgrade_to_paragraphs_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import logging

from nucliadb.common import datamanagers
from nucliadb.common.cluster.rollover import rollover_kb_shards
from nucliadb.common.cluster.rollover import rollover_kb_index
from nucliadb.migrator.context import ExecutionContext
from nucliadb_protos.noderesources_pb2 import ShardCreated

Expand All @@ -43,7 +43,7 @@ async def migrate_kb(context: ExecutionContext, kbid: str) -> None:
try:
if await has_old_paragraphs_index(context, kbid):
logger.info("Rolling over affected KB", extra={"kbid": kbid})
await rollover_kb_shards(context, kbid)
await rollover_kb_index(context, kbid)
else:
logger.info(
"KB already has the latest version of the paragraphs index, skipping rollover",
Expand Down
4 changes: 2 additions & 2 deletions nucliadb/src/migrations/0019_upgrade_to_paragraphs_v3.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import logging

from nucliadb.common import datamanagers
from nucliadb.common.cluster.rollover import rollover_kb_shards
from nucliadb.common.cluster.rollover import rollover_kb_index
from nucliadb.migrator.context import ExecutionContext
from nucliadb_protos.noderesources_pb2 import ShardCreated

Expand All @@ -43,7 +43,7 @@ async def migrate_kb(context: ExecutionContext, kbid: str) -> None:
try:
if await has_old_paragraphs_index(context, kbid):
logger.info("Rolling over affected KB", extra={"kbid": kbid})
await rollover_kb_shards(context, kbid)
await rollover_kb_index(context, kbid)
else:
logger.info(
"KB already has the latest version of the paragraphs index, skipping rollover",
Expand Down
4 changes: 2 additions & 2 deletions nucliadb/src/migrations/0020_drain_nodes_from_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import logging

from nucliadb.common import datamanagers
from nucliadb.common.cluster.rollover import rollover_kb_shards
from nucliadb.common.cluster.rollover import rollover_kb_index
from nucliadb.common.cluster.settings import settings as cluster_settings
from nucliadb.migrator.context import ExecutionContext

Expand All @@ -56,7 +56,7 @@ async def migrate_kb(context: ExecutionContext, kbid: str) -> None:
return

logger.info("Rolling over affected KB", extra={"kbid": kbid})
await rollover_kb_shards(context, kbid, drain_nodes=drain_node_ids)
await rollover_kb_index(context, kbid, drain_nodes=drain_node_ids)


async def kb_has_shards_on_drain_nodes(kbid: str, drain_node_ids: list[str]) -> bool:
Expand Down
4 changes: 2 additions & 2 deletions nucliadb/src/migrations/0022_fix_paragraph_deletion_bug.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import logging
import os

from nucliadb.common.cluster.rollover import rollover_kb_shards
from nucliadb.common.cluster.rollover import rollover_kb_index
from nucliadb.migrator.context import ExecutionContext

logger = logging.getLogger(__name__)
Expand All @@ -40,4 +40,4 @@ async def migrate(context: ExecutionContext) -> None: ...

async def migrate_kb(context: ExecutionContext, kbid: str) -> None:
if os.environ.get("RUNNING_ENVIRONMENT", os.environ.get("ENVIRONMENT")) == "stage":
await rollover_kb_shards(context, kbid)
await rollover_kb_index(context, kbid)
Loading

0 comments on commit fb25dbd

Please sign in to comment.