Skip to content

Commit

Permalink
revert kb_usage utility changes (#2356)
Browse files Browse the repository at this point in the history
  • Loading branch information
drf7 authored Jul 30, 2024
1 parent 64f870b commit 1926cb5
Show file tree
Hide file tree
Showing 22 changed files with 205 additions and 269 deletions.
4 changes: 0 additions & 4 deletions nucliadb/src/nucliadb/ingest/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,10 @@
start_indexing_utility,
start_nats_manager,
start_transaction_utility,
start_usage_utility,
stop_audit_utility,
stop_indexing_utility,
stop_nats_manager,
stop_transaction_utility,
stop_usage_utility,
)


Expand All @@ -64,13 +62,11 @@ async def initialize() -> list[Callable[[], Awaitable[None]]]:
await start_indexing_utility(SERVICE_NAME)

await start_audit_utility(SERVICE_NAME)
await start_usage_utility(SERVICE_NAME)

finalizers = [
stop_transaction_utility,
stop_indexing_utility,
stop_audit_utility,
stop_usage_utility,
teardown_cluster,
]

Expand Down
13 changes: 2 additions & 11 deletions nucliadb/src/nucliadb/ingest/consumer/auditing.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@
from nucliadb_utils import const
from nucliadb_utils.audit.audit import AuditStorage
from nucliadb_utils.cache.pubsub import PubSubDriver
from nucliadb_utils.nuclia_usage.protos.kb_usage_pb2 import KBSource, Service
from nucliadb_utils.nuclia_usage.protos.kb_usage_pb2 import Storage as KbUsageStorage
from nucliadb_utils.nuclia_usage.utils.kb_usage_report import KbUsageReportUtility
from nucliadb_utils.storages.storage import Storage

from . import metrics
Expand Down Expand Up @@ -61,12 +58,10 @@ def __init__(
self,
*,
audit: AuditStorage,
usage: KbUsageReportUtility,
pubsub: PubSubDriver,
check_delay: float = 5.0,
):
self.audit = audit
self.usage = usage
self.pubsub = pubsub
self.shard_manager = get_shard_manager()
self.task_handler = DelayedTaskHandler(check_delay)
Expand Down Expand Up @@ -123,12 +118,8 @@ async def process_kb(self, kbid: str) -> None:
total_fields += shard.fields
total_paragraphs += shard.paragraphs

self.usage.send_kb_usage(
service=Service.NUCLIA_DB, # type: ignore
account_id=None,
kb_id=kbid,
kb_source=KBSource.HOSTED, # type: ignore
storage=KbUsageStorage(paragraphs=total_paragraphs, fields=total_fields),
self.audit.report_fields_and_paragraphs(
kbid=kbid, paragraphs=total_paragraphs, fields=total_fields
)


Expand Down
17 changes: 4 additions & 13 deletions nucliadb/src/nucliadb/ingest/consumer/materializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@
from nucliadb_protos import writer_pb2
from nucliadb_utils import const
from nucliadb_utils.cache.pubsub import PubSubDriver
from nucliadb_utils.nuclia_usage.protos.kb_usage_pb2 import KBSource, Service
from nucliadb_utils.nuclia_usage.protos.kb_usage_pb2 import Storage as UsageStorage
from nucliadb_utils.storages.storage import Storage
from nucliadb_utils.utilities import get_usage_utility
from nucliadb_utils.utilities import get_audit

from .utils import DelayedTaskHandler

Expand Down Expand Up @@ -101,13 +99,6 @@ async def process(self, kbid: str) -> None:
await datamanagers.resources.set_number_of_resources(txn, kbid=kbid, value=value)
await txn.commit()

usage_utility = get_usage_utility()

if usage_utility:
usage_utility.send_kb_usage(
service=Service.NUCLIA_DB, # type: ignore
account_id=None,
kb_id=kbid,
kb_source=KBSource.HOSTED, # type: ignore
storage=UsageStorage(resources=value),
)
audit = get_audit()
if audit:
audit.report_resources(kbid=kbid, resources=value)
5 changes: 1 addition & 4 deletions nucliadb/src/nucliadb/ingest/consumer/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
get_nats_manager,
get_pubsub,
get_storage,
get_usage_utility,
)

from .auditing import IndexAuditHandler, ResourceWritesAuditHandler
Expand Down Expand Up @@ -154,13 +153,11 @@ async def start_ingest_processed_consumer(
async def start_auditor() -> Callable[[], Awaitable[None]]:
audit = get_audit()
assert audit is not None
usage = get_usage_utility()
assert usage is not None

pubsub = await get_pubsub()
assert pubsub is not None, "Pubsub is not configured"
storage = await get_storage(service_name=SERVICE_NAME)
index_auditor = IndexAuditHandler(audit=audit, usage=usage, pubsub=pubsub)
index_auditor = IndexAuditHandler(audit=audit, pubsub=pubsub)
resource_writes_auditor = ResourceWritesAuditHandler(storage=storage, audit=audit, pubsub=pubsub)

await index_auditor.initialize()
Expand Down
16 changes: 4 additions & 12 deletions nucliadb/src/nucliadb/ingest/orm/knowledgebox.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,13 @@
from nucliadb_protos.utils_pb2 import ReleaseChannel
from nucliadb_utils import const
from nucliadb_utils.aiopynecone.exceptions import PineconeAPIError
from nucliadb_utils.nuclia_usage.protos.kb_usage_pb2 import KBSource, Service
from nucliadb_utils.nuclia_usage.protos.kb_usage_pb2 import Storage as KbUsageStorage
from nucliadb_utils.settings import running_settings
from nucliadb_utils.storages.storage import Storage
from nucliadb_utils.utilities import (
get_audit,
get_endecryptor,
get_pinecone,
get_storage,
get_usage_utility,
has_feature,
)

Expand Down Expand Up @@ -329,15 +327,9 @@ async def delete(cls, driver: Driver, kbid: str):
if kb_config is not None:
await cls._maybe_delete_external_indexes(kbid, kb_config.external_index_provider)

usage_utility = get_usage_utility()
if usage_utility is not None:
usage_utility.send_kb_usage(
service=Service.NUCLIA_DB, # type: ignore
account_id=None,
kb_id=kbid,
kb_source=KBSource.HOSTED, # type: ignore
storage=KbUsageStorage(paragraphs=0, fields=0, resources=0),
)
audit = get_audit()
if audit is not None:
audit.delete_kb(kbid=kbid)

return kbid

Expand Down
28 changes: 1 addition & 27 deletions nucliadb/src/nucliadb/search/api/v1/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,7 @@
from nucliadb_models.security import RequestSecurity
from nucliadb_utils.authentication import requires
from nucliadb_utils.exceptions import LimitsExceededError
from nucliadb_utils.nuclia_usage.protos.kb_usage_pb2 import (
ClientType as ClientTypeKbUsage,
)
from nucliadb_utils.nuclia_usage.protos.kb_usage_pb2 import (
KBSource,
Search,
SearchType,
Service,
)
from nucliadb_utils.utilities import get_audit, get_usage_utility
from nucliadb_utils.utilities import get_audit

SEARCH_EXAMPLES = {
"filtering_by_icon": Example(
Expand Down Expand Up @@ -436,7 +427,6 @@ async def search(
with_status: Optional[ResourceProcessingStatus] = None,
) -> tuple[KnowledgeboxSearchResults, bool]:
audit = get_audit()
usage = get_usage_utility()
start_time = time()

item.min_score = min_score_from_payload(item.min_score)
Expand Down Expand Up @@ -503,22 +493,6 @@ async def search(
time() - start_time,
len(search_results.resources),
)
if usage is not None and do_audit:
usage.send_kb_usage(
service=Service.NUCLIA_DB, # type: ignore
account_id=None,
kb_id=kbid,
kb_source=KBSource.HOSTED, # type: ignore
# TODO unify AuditRequest client type and Nuclia Usage client type
searches=[
Search(
client=ClientTypeKbUsage.Value(x_ndb_client.name), # type: ignore
type=SearchType.SEARCH,
tokens=2000,
num_searches=1,
)
],
)

if item.debug:
search_results.nodes = debug_nodes_info(queried_nodes)
Expand Down
32 changes: 6 additions & 26 deletions nucliadb/src/nucliadb/search/api/v1/suggest.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,7 @@
SuggestOptions,
)
from nucliadb_utils.authentication import requires
from nucliadb_utils.nuclia_usage.protos.kb_usage_pb2 import (
ClientType as ClientTypeKbUsage,
)
from nucliadb_utils.nuclia_usage.protos.kb_usage_pb2 import (
KBSource,
Search,
SearchType,
Service,
)
from nucliadb_utils.utilities import get_usage_utility
from nucliadb_utils.utilities import get_audit


@api.get(
Expand Down Expand Up @@ -135,7 +126,7 @@ async def suggest(
debug: bool,
highlight: bool,
) -> KnowledgeboxSuggestResults:
usage = get_usage_utility()
audit = get_audit()
# We need the nodes/shards that are connected to the KB
# We need to query all nodes
pb_query = suggest_query_to_pb(
Expand Down Expand Up @@ -166,21 +157,10 @@ async def suggest(
if debug and queried_shards:
search_results.shards = queried_shards

if usage is not None:
usage.send_kb_usage(
service=Service.NUCLIA_DB, # type: ignore
account_id=None,
kb_id=kbid,
kb_source=KBSource.HOSTED, # type: ignore
# TODO unify AuditRequest client type and Nuclia Usage client type
searches=[
Search(
client=ClientTypeKbUsage.Value(x_ndb_client.name), # type: ignore
type=SearchType.SUGGEST,
tokens=0,
num_searches=1,
)
],
if audit is not None:
audit.suggest(
kbid=kbid,
client_type=x_ndb_client.to_proto(),
)

return search_results
4 changes: 0 additions & 4 deletions nucliadb/src/nucliadb/search/lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@
finalize_utilities,
get_utility,
start_audit_utility,
start_usage_utility,
stop_audit_utility,
stop_usage_utility,
)


Expand All @@ -50,7 +48,6 @@ async def lifespan(app: FastAPI):
await setup_cluster()

await start_audit_utility(SERVICE_NAME)
await start_usage_utility(SERVICE_NAME)

yield

Expand All @@ -62,6 +59,5 @@ async def lifespan(app: FastAPI):

await finalize_utilities()
await stop_audit_utility()
await stop_usage_utility()
await teardown_cluster()
await clean_telemetry(SERVICE_NAME)
29 changes: 1 addition & 28 deletions nucliadb/src/nucliadb/search/search/find.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,7 @@
NucliaDBClientType,
SearchOptions,
)
from nucliadb_utils.nuclia_usage.protos.kb_usage_pb2 import (
ClientType as ClientTypeKbUsage,
)
from nucliadb_utils.nuclia_usage.protos.kb_usage_pb2 import (
KBSource,
Search,
SearchType,
Service,
)
from nucliadb_utils.utilities import get_audit, get_usage_utility
from nucliadb_utils.utilities import get_audit

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -99,7 +90,6 @@ async def _index_node_retrieval(
metrics: RAGMetrics = RAGMetrics(),
) -> tuple[KnowledgeboxFindResults, bool, QueryParser]:
audit = get_audit()
usage = get_usage_utility()
start_time = time()

item.min_score = min_score_from_payload(item.min_score)
Expand Down Expand Up @@ -170,23 +160,6 @@ async def _index_node_retrieval(
len(search_results.resources),
)

if usage is not None:
usage.send_kb_usage(
service=Service.NUCLIA_DB, # type: ignore
account_id=None,
kb_id=kbid,
kb_source=KBSource.HOSTED, # type: ignore
# TODO unify AuditRequest client type and Nuclia Usage client type
searches=[
Search(
client=ClientTypeKbUsage.Value(x_ndb_client.name), # type: ignore
type=SearchType.SEARCH,
tokens=2000,
num_searches=1,
)
],
)

if item.debug:
search_results.nodes = debug_nodes_info(queried_nodes)

Expand Down
14 changes: 0 additions & 14 deletions nucliadb/tests/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,20 +530,6 @@ async def stream_audit(natsd: str, mocker):
await audit.finalize()


@pytest.fixture(scope="function")
async def stream_kbusage_util(natsd: str):
from nucliadb_utils.nuclia_usage.utils.kb_usage_report import KbUsageReportUtility
from nucliadb_utils.settings import usage_settings

audit = KbUsageReportUtility(
usage_settings.usage_jetstream_subject, # type: ignore
[natsd],
)
await audit.initialize()
yield audit
await audit.finalize()


@pytest.fixture(scope="function")
def predict_mock() -> Mock: # type: ignore
predict = get_utility(Utility.PREDICT)
Expand Down
27 changes: 12 additions & 15 deletions nucliadb/tests/ingest/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@
from nucliadb_protos.writer_pb2 import BrokerMessage
from nucliadb_utils import const
from nucliadb_utils.audit.basic import BasicAuditStorage
from nucliadb_utils.audit.stream import StreamAuditStorage
from nucliadb_utils.cache.nats import NatsPubsub
from nucliadb_utils.indexing import IndexingUtility
from nucliadb_utils.nuclia_usage.utils.kb_usage_report import KbUsageReportUtility
from nucliadb_utils.settings import indexing_settings, transaction_settings
from nucliadb_utils.storages.settings import settings as storage_settings
from nucliadb_utils.storages.storage import Storage
Expand Down Expand Up @@ -203,21 +203,18 @@ async def audit():


@pytest.fixture(scope="function")
async def usage(natsd):
from nucliadb_utils.settings import usage_settings
from nucliadb_utils.utilities import Utility, set_utility

report_util = KbUsageReportUtility(
nats_servers=[natsd], nats_subject=usage_settings.usage_jetstream_subject
)
await report_util.initialize()
report_util.nats_connection_manager.js.publish = AsyncMock(
side_effect=report_util.nats_connection_manager.js.publish
async def stream_audit(natsd: str):
from nucliadb_utils.settings import audit_settings

audit = StreamAuditStorage(
[natsd],
audit_settings.audit_jetstream_target, # type: ignore
audit_settings.audit_partitions,
audit_settings.audit_hash_seed,
)
set_utility(Utility.USAGE, report_util)
yield report_util
await report_util.finalize()
clean_utility(Utility.USAGE)
await audit.initialize()
yield audit
await audit.finalize()


@pytest.fixture(scope="function")
Expand Down
Loading

0 comments on commit 1926cb5

Please sign in to comment.