Skip to content

Commit

Permalink
Optimise fetch call ids and support multiple flow ids as param (#25)
Browse files Browse the repository at this point in the history
* Initial commit

* Refactor to support min call quantity for flow-ids

* Move values to constants

* Update secrets.dvc

* Update version

---------

Co-authored-by: dhanashree.s <[email protected]>
  • Loading branch information
harshithere and d-shree authored Nov 15, 2023
1 parent 656826b commit cef89e6
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 26 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "skit-calls"
version = "0.2.46"
version = "0.2.47"
description = "Library to fetch calls from a given environment."
authors = ["ltbringer <[email protected]>"]
license = "GPL-3.0-only"
Expand Down
4 changes: 2 additions & 2 deletions secrets.dvc
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
outs:
- md5: e955074d27999737daa70b494ac9df32.dir
size: 5364
- md5: 371b502452d45963aab9f99e55aaa894.dir
size: 5495
nfiles: 5
path: secrets
102 changes: 81 additions & 21 deletions skit_calls/calls.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from skit_calls import constants as const
from skit_calls.data import mutators, query
from skit_calls.data.model import Turn

from skit_calls.utils import convert_str_to_int_list

def save_turns_in_memory(stream: Iterable[Dict[str, Any]]) -> pd.DataFrame:
    """
    Collect every item yielded by ``stream`` and load them into a DataFrame.

    :param stream: An iterable of dict records; it is fully consumed here.
    :type stream: Iterable[Dict[str, Any]]
    :return: A DataFrame built from the collected records.
    :rtype: pd.DataFrame
    """
    # Materialise first: the stream may be a one-shot generator.
    turns = [*stream]
    frame = pd.DataFrame(turns)
    return frame
Expand All @@ -24,6 +24,42 @@ def save_turns_on_disk(stream: Iterable[Dict[str, Any]]) -> str:
writer.writerow(turn)
return file_path

def get_call_ids_for_flow(
    flow_id,
    call_quantity,
    random_call_id_limit,
    start_date,
    end_date,
    org_ids,
    call_type,
    lang,
    min_duration,
    template_id,
    use_case,
    flow_name,
    ignore_callers,
    reported,
):
    """
    Log the requested limits, then fetch call ids via ``query.gen_random_call_ids``.

    Every argument is forwarded to ``query.gen_random_call_ids`` as a keyword
    argument; the renames applied on the way are noted below.

    :param flow_id: Forwarded as ``flow_id``.
    :param call_quantity: Forwarded as ``limit``.
    :param random_call_id_limit: Forwarded as ``random_id_limit``.
    :param start_date: Forwarded as ``start_date``.
    :param end_date: Forwarded as ``end_date``.
    :param org_ids: Forwarded as ``ids_``.
    :param call_type: Forwarded as ``call_type``.
    :param lang: Forwarded as ``lang``.
    :param min_duration: Forwarded as ``min_duration``.
    :param template_id: Forwarded as ``template_id``.
    :param use_case: Forwarded as ``use_case``.
    :param flow_name: Forwarded as ``flow_name``.
    :param ignore_callers: Forwarded as ``excluded_numbers``.
    :param reported: Forwarded as ``reported``.
    :return: Whatever ``query.gen_random_call_ids`` returns; it must support
        ``len()`` because its size is logged before returning.
    """
    for message in (
        f"Random id limit {random_call_id_limit}",
        f"Call quantity limit {call_quantity}",
        f"Flow ids {flow_id}",
    ):
        logger.info(message)

    # Keyword names follow the signature of query.gen_random_call_ids.
    query_kwargs = dict(
        start_date=start_date,
        end_date=end_date,
        ids_=org_ids,
        limit=call_quantity,
        call_type=call_type,
        lang=lang,
        min_duration=min_duration,
        template_id=template_id,
        use_case=use_case,
        flow_name=flow_name,
        excluded_numbers=ignore_callers,
        reported=reported,
        flow_id=flow_id,
        random_id_limit=random_call_id_limit,
    )
    sampled_ids = query.gen_random_call_ids(**query_kwargs)
    logger.info(f"Number of call Ids obtained is {len(sampled_ids)}")
    return sampled_ids


def sample(
start_date: str,
Expand All @@ -47,6 +83,7 @@ def sample(
batch_turns: int = const.TURNS_LIMIT,
delay: float = const.Q_DELAY,
timezone: str = const.DEFAULT_TIMEZONE,
flow_ids: Optional[List[str]] = [],
) -> Union[str, pd.DataFrame]:
"""
Sample calls.
Expand Down Expand Up @@ -101,29 +138,53 @@ def sample(
:param timezone: Timezone for the sampling, defaults to "Asia/Kolkata"
:type timezone: str, optional
:param flow_ids: A list of flow ids from which to retrieve the data
:type flow_ids: Optional[str]
:return: A directory path if save is set to "files" otherwise path to a file.
:rtype: str
"""
start_time = time.time()
random_call_ids = query.gen_random_call_ids(
start_date,
end_date,
ids_=org_ids,
limit=call_quantity,
call_type=call_type,
lang=lang,
min_duration=min_duration,
template_id=template_id,
use_case=use_case,
flow_name=flow_name,
excluded_numbers=ignore_callers,
reported=reported,
)
logger.info(f"Number of call Ids obtained is {len(random_call_ids)}")
end_time_first = time.time()
total_time = str(end_time_first - start_time)
logger.info(f"Time required to obtain call IDs {total_time} seconds")
logger.info(f"Flow ids: {flow_ids}")
flow_ids = convert_str_to_int_list(flow_ids)
random_id_limit = min(30*call_quantity, 75000)
all_call_ids = []
org_ids = convert_str_to_int_list(org_ids)
for flow_id in flow_ids:
flow_id_list = []
flow_id_list.append(flow_id)
random_call_ids = get_call_ids_for_flow(flow_id_list, const.MIN_ASSURED_CALL_QUANTITY,
const.MIN_RANDOM_CALL_ID_LIMIT, start_date,
end_date, org_ids, call_type, lang,
min_duration, template_id, use_case,
flow_name, ignore_callers, reported)
random_call_id_list_1= list(random_call_ids)
logger.info(f"Number of call ids for flow {flow_id}: {len(random_call_id_list_1)}")
all_call_ids += random_call_id_list_1

loop_end_time = time.time()
final_time = str(loop_end_time-start_time)
logger.info(f"Time to finish loop: {final_time}")

random_call_ids = get_call_ids_for_flow(flow_ids, call_quantity,
random_id_limit, start_date,
end_date, org_ids, call_type, lang,
min_duration, template_id, use_case,
flow_name, ignore_callers, reported)

end_time_1 = time.time()
final_time_1 = str(end_time_1-start_time)

random_call_id_list_2= list(random_call_ids)
all_call_ids += random_call_id_list_2

final_call_ids = tuple(set(all_call_ids))

logger.info(f"Number of call ids: {len(final_call_ids)}")

logger.info(f"Time to finish getting call ids: {final_time_1}")

random_call_data = query.gen_random_calls(
random_call_ids,
asr_provider=asr_provider,
Expand All @@ -136,15 +197,14 @@ def sample(
timezone=timezone,
)
end_time_second = time.time()
total_time_second_query = str(end_time_second - end_time_first)
total_time_second_query = str(end_time_second - end_time_1)
logger.info(f"Time required to obtain call data from queried IDs {total_time_second_query} seconds")
if on_disk:
return save_turns_on_disk(random_call_data)
df = save_turns_in_memory(random_call_data)
logger.info(f"Number of call with data obtained is {df.shape[0]}")
return df


def select(
call_ids: Optional[List[int]] = None,
org_ids: Optional[Set[int]] = None,
Expand Down
8 changes: 8 additions & 0 deletions skit_calls/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,13 @@ def build_sample_command(parser: argparse.ArgumentParser) -> None:
help="A comma separated list of states to keep turns from, and remove all else.",
default=[],
)
parser.add_argument(
"--flow-ids",
type=str,
nargs="*",
help="A comma separated list of flow ids to keep turns from, and remove all else.",
default=[],
)


def build_select_command(parser: argparse.ArgumentParser) -> None:
Expand Down Expand Up @@ -299,6 +306,7 @@ def random_sample_calls(args: argparse.Namespace) -> Union[str, pd.DataFrame]:
batch_turns=args.batch_turns,
delay=args.delay,
timezone=args.timezone,
flow_ids=args.flow_ids
)
logger.info(f"Finished in {time.time() - start:.2f} seconds")
return maybe_df
Expand Down
4 changes: 4 additions & 0 deletions skit_calls/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@
CALL_IDS = "call_ids"
USE_CASE = "use_case"
FLOW_NAME = "flow_name"
FLOW_ID = "flow_id"
RANDOM_ID_LIMT = "random_id_limit"
TEMPLATE_ID = "template_id"
LANG = "lang"
MIN_AUDIO_DURATION = "min_duration"
Expand All @@ -140,3 +142,5 @@
UCASE_INPUT = "INPUT"
UCASE_AUDIO = "AUDIO"
MARGIN = 0.1
MIN_ASSURED_CALL_QUANTITY = 25 # minimum assured number of calls per flow id
MIN_RANDOM_CALL_ID_LIMIT = 750 # An upper limit of MIN_ASSURED_CALL_QUANTITY * 30
10 changes: 9 additions & 1 deletion skit_calls/data/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,24 @@ def gen_random_call_ids(
lang: Optional[str] = None,
template_id: Optional[int] = None,
flow_name: Optional[str] = None,
flow_id: Optional[Set[str]] = [],
min_duration: Optional[float] = None,
excluded_numbers: Optional[Set[str]] = None,
retry_limit: int = 2,
random_id_limit: int = const.DEFAULT_CALL_QUANTITY,
):
excluded_numbers = set(excluded_numbers) or set()
ids_ = set(ids_) or set()
if not ids_ or template_id :
ids_ = None
elif ids_ and not template_id:
ids_= tuple(ids_)
excluded_numbers = excluded_numbers.union(const.DEFAULT_IGNORE_CALLERS_LIST)
reported_status = 0 if reported else None
call_filters = {
const.END_DATE: end_date,
const.START_DATE: start_date,
const.ID: tuple(ids_) if not template_id else (None,),
const.ID: ids_,
const.CALL_TYPE: tuple(call_type),
const.RESOLVED: reported_status,
const.LANG: lang,
Expand All @@ -56,6 +62,8 @@ def gen_random_call_ids(
const.FLOW_NAME: flow_name,
const.LIMIT: limit + const.MARGIN * limit,
const.TEMPLATE_ID: template_id,
const.FLOW_ID: flow_id,
const.RANDOM_ID_LIMT: random_id_limit
}

logger.debug(f"call_filters={pformat(call_filters)} | {limit=}")
Expand Down
9 changes: 8 additions & 1 deletion skit_calls/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,11 @@ def optimal_paging_params(total_count: int, page_size: int, delay: int) -> Tuple
init_call_quantity *= 10
delay += 0.05
page_size //= 1.8
return int(page_size), delay
return int(page_size), delay

def convert_str_to_int_list(str_values):
    """
    Convert CLI-style id arguments into a flat list of integers.

    Accepts the shapes an ``argparse`` option with ``nargs="*"`` can produce,
    as well as values that were already parsed:

    * ``["1,2,3"]`` or ``["[1, 2, 3]"]`` -- one comma separated token,
      optionally wrapped in square brackets.
    * ``["1", "2", "3"]`` -- several space separated tokens.
    * ``[1, 2, 3]`` -- elements that are already integers are kept as-is.
    * ``"1,2,3"`` -- a bare string is treated as a single token.

    Empty tokens (``""``, ``"[]"``, trailing commas) are skipped instead of
    raising.

    :param str_values: The raw values to convert; ``None`` or an empty
        collection yields an empty list.
    :return: The parsed integers, in the order they were encountered.
    :rtype: List[int]
    :raises ValueError: If a non-empty token is not a valid integer literal.
    """
    if not str_values:
        return []
    # A bare string would otherwise be iterated character by character.
    if isinstance(str_values, str):
        str_values = [str_values]

    int_list = []
    for item in str_values:
        if isinstance(item, int):
            int_list.append(item)
            continue
        # Strip whitespace first so " [1,2] " still loses its brackets.
        for token in str(item).strip().strip("[]").split(","):
            token = token.strip()
            if token:
                int_list.append(int(token))
    return int_list

0 comments on commit cef89e6

Please sign in to comment.