diff --git a/Scripts/assignment/abstract_assignment.py b/Scripts/assignment/abstract_assignment.py index 6c59c166..e17ceb00 100644 --- a/Scripts/assignment/abstract_assignment.py +++ b/Scripts/assignment/abstract_assignment.py @@ -1,20 +1,25 @@ +from __future__ import annotations from abc import ABCMeta, abstractmethod +from typing import Any, Dict, List, Union class AssignmentModel: __metaclass__ = ABCMeta + @property @abstractmethod - def mapping(self): + def mapping(self) -> Dict[int,int]: """Dictionary of zone numbers and corresponding indices.""" pass + @property @abstractmethod - def zone_numbers(self): + def zone_numbers(self) -> List[int]: pass + @property @abstractmethod - def nr_zones(self): + def nr_zones(self) -> int: pass @abstractmethod @@ -41,5 +46,5 @@ class Period: __metaclass__ = ABCMeta @abstractmethod - def assign(self): + def assign(self, matrices: Dict[Any, Any], iteration: Union[int, str]) -> Dict[Any, Any]: pass diff --git a/Scripts/assignment/assignment_period.py b/Scripts/assignment/assignment_period.py index 91b92730..72370f6c 100644 --- a/Scripts/assignment/assignment_period.py +++ b/Scripts/assignment/assignment_period.py @@ -1,6 +1,8 @@ -import numpy +from __future__ import annotations +import numpy # type: ignore import pandas +from typing import TYPE_CHECKING, Any, Dict, Optional, Union import utils.log as log import parameters.assignment as param import parameters.zone as zone_param @@ -8,6 +10,10 @@ from assignment.datatypes.transit import TransitSpecification from assignment.datatypes.path_analysis import PathAnalysis from assignment.abstract_assignment import Period +if TYPE_CHECKING: + from assignment.emme_bindings.emme_project import EmmeProject + from assignment.datatypes.transit_fare import TransitFareZoneSpecification + from emme_context.modeller.emmebank import Scenario # type: ignore class AssignmentPeriod(Period): @@ -31,24 +37,28 @@ class AssignmentPeriod(Period): Assignment class (car_work/transit_leisure/...) value : dict key : str - Matrix type (demand/time/cost/dist/...) + Matrix type (demand/time/c + + t/dist/...) value : str EMME matrix id separate_emme_scenarios : bool (optional) Whether separate scenarios have been created in EMME for storing time-period specific network results. """ - def __init__(self, name, emme_scenario, emme_context, - emme_matrices, separate_emme_scenarios=False): + def __init__(self, name: str, emme_scenario: int, + emme_context: EmmeProject, + emme_matrices: Dict[str, Dict[str, Any]], + separate_emme_scenarios: bool = False): self.name = name - self.emme_scenario = emme_context.modeller.emmebank.scenario( + self.emme_scenario: Scenario = emme_context.modeller.emmebank.scenario( emme_scenario) self.emme_project = emme_context self._separate_emme_scenarios = separate_emme_scenarios self.emme_matrices = emme_matrices self.dist_unit_cost = param.dist_unit_cost - def extra(self, attr): + def extra(self, attr: str) -> str: """Add prefix "@" and time-period suffix. Parameters @@ -63,7 +73,7 @@ def extra(self, attr): """ return "@{}_{}".format(attr, self.name) - def prepare(self, segment_results): + def prepare(self, segment_results: Dict[str,Dict[str,str]]): """Prepare network for assignment. Calculate road toll cost, set boarding penalties, @@ -86,7 +96,7 @@ def prepare(self, segment_results): self._calc_background_traffic() self._specify() - def assign(self, matrices, iteration): + def assign(self, matrices: dict, iteration: Union[int,str]) -> Dict: """Assign cars, bikes and transit for one time period. Get travel impedance matrices for one time period from assignment. @@ -173,7 +183,10 @@ def assign(self, matrices, iteration): mtxs["cost"][ass_cl] += self.dist_unit_cost * mtxs["dist"][ass_cl] return mtxs - def calc_transit_cost(self, fares, peripheral_cost, mapping): + def calc_transit_cost(self, + fares: TransitFareZoneSpecification, + peripheral_cost: numpy.ndarray, + mapping: dict): """Calculate transit zone cost matrix. Perform multiple transit assignments. @@ -385,7 +398,9 @@ def _set_bike_vdfs(self): link.modes -= {main_mode} self.emme_scenario.publish_network(network) - def _set_emmebank_matrices(self, matrices, is_last_iteration): + def _set_emmebank_matrices(self, + matrices: Dict[str,numpy.ndarray], + is_last_iteration: bool): """Set matrices in emmebank. Bike matrices are added together, so that only one matrix is to be @@ -416,7 +431,10 @@ def _set_emmebank_matrices(self, matrices, is_last_iteration): else: self._set_matrix(mtx, matrices[mtx]) - def _set_matrix(self, ass_class, matrix, matrix_type="demand"): + def _set_matrix(self, + ass_class: str, + matrix: numpy.ndarray, + matrix_type: Optional[str] = "demand"): if numpy.isnan(matrix).any(): msg = ("NAs in demand matrix {} ".format(ass_class) + "would cause infinite loop in Emme assignment.") @@ -427,7 +445,9 @@ def _set_matrix(self, ass_class, matrix, matrix_type="demand"): self.emme_matrices[ass_class][matrix_type]).set_numpy_data( matrix, scenario_id=self.emme_scenario.id) - def _get_matrices(self, mtx_type, is_last_iteration=False): + def _get_matrices(self, + mtx_type: str, + is_last_iteration: bool=False) -> Dict[str,numpy.ndarray]: """Get all matrices of specified type. Parameters @@ -460,7 +480,9 @@ def _get_matrices(self, mtx_type, is_last_iteration=False): matrices["transit_leisure"] = matrices["transit_work"] return matrices - def _get_matrix(self, ass_class, matrix_type): + def _get_matrix(self, + ass_class: str, + matrix_type: str) -> numpy.ndarray: """Get matrix with type pair (e.g., demand, car_work). Parameters @@ -479,14 +501,14 @@ def _get_matrix(self, ass_class, matrix_type): return (self.emme_project.modeller.emmebank.matrix(emme_id) .get_numpy_data(scenario_id=self.emme_scenario.id)) - def _damp_travel_time(self, demand_type): + def _damp_travel_time(self, demand_type: str): """Reduce the impact from first waiting time on total travel time.""" travel_time = self._get_matrix(demand_type, "time") fw_time = self._get_matrix(demand_type, "actual_first_waiting_times") wt_weight = param.waiting_time_perception_factor return travel_time + wt_weight*((5./3.*fw_time)**0.8 - fw_time) - def _extract_timecost_from_gcost(self, ass_class): + def _extract_timecost_from_gcost(self, ass_class: str): """Remove monetary cost from generalized cost. Traffic assignment produces a generalized cost matrix. @@ -504,7 +526,7 @@ def _extract_timecost_from_gcost(self, ass_class): self._set_matrix(ass_class, time, "time") return time - def _calc_background_traffic(self, include_trucks=False): + def _calc_background_traffic(self, include_trucks: bool=False): """Calculate background traffic (buses).""" network = self.emme_scenario.get_network() # emme api has name "data3" for ul3 @@ -541,7 +563,9 @@ def _calc_road_cost(self): link[self.extra("total_cost")] = toll_cost + dist_cost self.emme_scenario.publish_network(network) - def _calc_boarding_penalties(self, extra_penalty=0, is_last_iteration=False): + def _calc_boarding_penalties(self, + extra_penalty: int = 0, + is_last_iteration: bool = False): """Calculate boarding penalties for transit assignment.""" # Definition of line specific boarding penalties network = self.emme_scenario.get_network() @@ -557,8 +581,8 @@ def _calc_boarding_penalties(self, extra_penalty=0, is_last_iteration=False): except KeyError: missing_penalties.add(line.mode.id) if missing_penalties: - missing_penalties = ", ".join(missing_penalties) - log.warn("No boarding penalty found for transit modes " + missing_penalties) + missing_penalties_str: str = ", ".join(missing_penalties) + log.warn("No boarding penalty found for transit modes " + missing_penalties_str) self.emme_scenario.publish_network(network) def _specify(self): @@ -633,7 +657,9 @@ def _specify(self): }, } - def _assign_cars(self, stopping_criteria, lightweight=False): + def _assign_cars(self, + stopping_criteria: Dict[str, Union[int, float]], + lightweight: bool=False): """Perform car_work traffic assignment for one scenario.""" log.info("Car assignment started...") car_spec = self._car_spec.spec(lightweight) @@ -655,8 +681,10 @@ def _assign_cars(self, stopping_criteria, lightweight=False): if assign_report["stopping_criterion"] == "MAX_ITERATIONS": log.warn("Car assignment not fully converged.") - def _assign_bikes(self, length_mat_id, length_for_links): - """Perform bike traffic assignment for one scenario.""" + def _assign_bikes(self, + length_mat_id: Union[float, int, str], + length_for_links: str): + """Perform bike traffic assignment for one scenario.???TYPES""" scen = self.emme_scenario spec = self.bike_spec spec["classes"][0]["results"]["link_volumes"] = self.extra("bike") diff --git a/Scripts/assignment/datatypes/car.py b/Scripts/assignment/datatypes/car.py index 78aa4691..d9ef2f90 100644 --- a/Scripts/assignment/datatypes/car.py +++ b/Scripts/assignment/datatypes/car.py @@ -1,5 +1,8 @@ +from __future__ import annotations import parameters.assignment as param from assignment.datatypes.path_analysis import PathAnalysis +from collections.abc import Callable +from typing import Any, Dict, Optional, Union class Car: @@ -22,11 +25,15 @@ class Car: value_of_time_inv : float (optional) Inversed value of time [min/eur], default is param.vot_inv """ - def __init__(self, ass_class, extra, emme_matrices, - link_costs, value_of_time_inv=None): + def __init__(self, + ass_class: str, + extra: Callable, + emme_matrices: Dict[str, Union[str, Dict[str, str]]], + link_costs: str, + value_of_time_inv: Optional[float]=None): if value_of_time_inv is None: value_of_time_inv = param.vot_inv[param.vot_classes[ass_class]] - self.spec = { + self.spec: Dict[str, Any] = { "mode": param.assignment_modes[ass_class], "demand": emme_matrices["demand"], "generalized_cost": { @@ -45,6 +52,8 @@ def __init__(self, ass_class, extra, emme_matrices, if ass_class not in ("trailer_truck", "truck"): self.add_analysis(extra("toll_cost"), emme_matrices["cost"]) - def add_analysis (self, link_component, od_values): + def add_analysis (self, + link_component: str, + od_values: Union[int, str]): analysis = PathAnalysis(link_component, od_values) self.spec["path_analyses"].append(analysis.spec) diff --git a/Scripts/assignment/datatypes/car_specification.py b/Scripts/assignment/datatypes/car_specification.py index c3708d35..aba895f6 100644 --- a/Scripts/assignment/datatypes/car_specification.py +++ b/Scripts/assignment/datatypes/car_specification.py @@ -1,5 +1,8 @@ +from __future__ import annotations +from typing import Any, Dict, List, Union import parameters.assignment as param from assignment.datatypes.car import Car +from collections.abc import Callable class CarSpecification: """ @@ -19,7 +22,9 @@ class CarSpecification: value : str Emme matrix id """ - def __init__(self, extra, emme_matrices): + def __init__(self, + extra: Callable, + emme_matrices: Dict[str, Union[str, Dict[str, str]]]): self._modes = {} self._freight_modes = list(param.freight_dist_unit_cost) for mode in param.assignment_modes: @@ -31,7 +36,7 @@ def __init__(self, extra, emme_matrices): else: kwargs = {"link_costs": extra("total_cost")} self._modes[mode] = Car(mode, extra, emme_matrices[mode], **kwargs) - self._spec = { + self._spec: Dict[str, Any] = { "type": "SOLA_TRAFFIC_ASSIGNMENT", "background_traffic": { "link_component": param.background_traffic_attr, @@ -41,7 +46,7 @@ def __init__(self, extra, emme_matrices): "stopping_criteria": None, # This is defined later } - def spec(self, lightweight=False): + def spec(self, lightweight: bool = False) -> Dict[str, Any]: self._spec["classes"] = [self._modes[mode].spec for mode in self._modes if not lightweight or mode not in self._freight_modes] return self._spec diff --git a/Scripts/assignment/datatypes/journey_level.py b/Scripts/assignment/datatypes/journey_level.py index 09a95fde..defb2a8c 100644 --- a/Scripts/assignment/datatypes/journey_level.py +++ b/Scripts/assignment/datatypes/journey_level.py @@ -1,8 +1,13 @@ +from __future__ import annotations +from typing import Any, Dict import parameters.assignment as param class JourneyLevel: - def __init__(self, headway_attribute, boarded, count_zone_boardings=False): + def __init__(self, + headway_attribute: str, + boarded: bool, + count_zone_boardings: bool = False): # Definition of transition rules: all modes are allowed transitions = [] for mode in param.transit_modes: @@ -10,7 +15,7 @@ def __init__(self, headway_attribute, boarded, count_zone_boardings=False): "mode": mode, "next_journey_level": 1 }) - self.spec = { + self.spec: Dict[str, Any] = { "transition_rules": transitions, "boarding_time": None, "boarding_cost": dict.fromkeys([ diff --git a/Scripts/assignment/datatypes/path_analysis.py b/Scripts/assignment/datatypes/path_analysis.py index 22169244..f67862f4 100644 --- a/Scripts/assignment/datatypes/path_analysis.py +++ b/Scripts/assignment/datatypes/path_analysis.py @@ -1,5 +1,8 @@ +from typing import Optional, Union + + class PathAnalysis: - def __init__(self, link_component, od_values=None): + def __init__(self, link_component:str, od_values:Optional[Union[str,int]]=None): self.spec = { "link_component": link_component, "operator": "+", diff --git a/Scripts/assignment/datatypes/transit.py b/Scripts/assignment/datatypes/transit.py index be464a76..79942011 100644 --- a/Scripts/assignment/datatypes/transit.py +++ b/Scripts/assignment/datatypes/transit.py @@ -1,3 +1,5 @@ +from __future__ import annotations +from typing import Any, Dict, Union import parameters.assignment as param from assignment.datatypes.journey_level import JourneyLevel @@ -28,14 +30,17 @@ class TransitSpecification: count_zone_boardings : bool (optional) Whether assignment is performed only to count fare zone boardings """ - def __init__(self, segment_results, headway_attribute, - emme_matrices, count_zone_boardings=False): + def __init__(self, + segment_results: Dict[str,str], + headway_attribute: str, + emme_matrices: Dict[str, Union[str, Dict[str, str]]], + count_zone_boardings: bool = False): no_penalty = dict.fromkeys(["at_nodes", "on_lines", "on_segments"]) no_penalty["global"] = { "penalty": 0, "perception_factor": 1, } - self.transit_spec = { + self.transit_spec: Dict[str, Any] = { "type": "EXTENDED_TRANSIT_ASSIGNMENT", "modes": param.transit_assignment_modes, "demand": emme_matrices["demand"], diff --git a/Scripts/assignment/datatypes/transit_fare.py b/Scripts/assignment/datatypes/transit_fare.py index 5a0daf57..f350c971 100644 --- a/Scripts/assignment/datatypes/transit_fare.py +++ b/Scripts/assignment/datatypes/transit_fare.py @@ -1,5 +1,10 @@ +from __future__ import annotations +from typing import Any, Dict, Set, Union +import pandas + + class TransitFareZoneSpecification: - def __init__(self, fare_table): + def __init__(self, fare_table: pandas.DataFrame): """Transit fare zone specification. Parameters @@ -7,14 +12,14 @@ def __init__(self, fare_table): fare_table : pandas.DataFrame Table of transit zone combination fares """ - self.zone_fares = fare_table["fare"].to_dict() + self.zone_fares: Dict = fare_table["fare"].to_dict() try: - self.exclusive = fare_table["exclusive"].dropna().to_dict() + self.exclusive: Dict = fare_table["exclusive"].dropna().to_dict() except KeyError: self.exclusive = {} - self.dist_fare = self.zone_fares.pop("dist") - self.start_fare = self.zone_fares.pop("start") + self.dist_fare: float = self.zone_fares.pop("dist") + self.start_fare: float = self.zone_fares.pop("start") @property - def transit_fare_zones(self): + def transit_fare_zones(self) -> Set[str]: return {char for char in ''.join(self.zone_fares)} diff --git a/Scripts/assignment/departure_time.py b/Scripts/assignment/departure_time.py index b77afbde..e6b48a68 100644 --- a/Scripts/assignment/departure_time.py +++ b/Scripts/assignment/departure_time.py @@ -1,4 +1,8 @@ -import numpy +from __future__ import annotations +from typing import Any, Callable, Dict, List, Optional, Tuple, Union, cast +import numpy # type: ignore +from datatypes.demand import Demand +from datatypes.tour import Tour import utils.log as log import parameters.departure_time as param @@ -15,14 +19,16 @@ class DepartureTimeModel: time_periods : list of str (optional) Time period names, default is aht, pt, iht """ - def __init__(self, nr_zones, time_periods=list(param.backup_demand_share)): + def __init__(self, + nr_zones: int, + time_periods: List[str]=list(param.backup_demand_share)): self.nr_zones = nr_zones self.time_periods = time_periods - self.demand = None - self.old_car_demand = 0 + self.demand: Optional[Union[int,Dict[str,Dict[str,numpy.ndarray]]]] = None + self.old_car_demand: Union[int,numpy.ndarray] = 0 self.init_demand() - def init_demand(self): + def init_demand(self) -> Dict[str,float]: """Initialize/reset demand for all time periods. Includes all transport classes, each being set to zero. @@ -39,11 +45,13 @@ def init_demand(self): """ # Calculate gaps try: + self.demand = cast(Dict[str,Dict[str,numpy.ndarray]], self.demand) #type checker hint car_demand = self.demand[self.time_periods[0]]["car_work"] except TypeError: car_demand = 0 max_gap = numpy.abs(car_demand - self.old_car_demand).max() try: + self.old_car_demand = cast(numpy.ndarray, self.old_car_demand) #type checker hint old_sum = self.old_car_demand.sum() relative_gap = abs((car_demand.sum()-old_sum) / old_sum) except AttributeError: @@ -58,7 +66,7 @@ def init_demand(self): return {"rel_gap": relative_gap, "max_gap": max_gap} - def add_demand(self, demand): + def add_demand(self, demand: Union[Demand, Tour]): """Add demand matrix for whole day. Parameters @@ -66,6 +74,7 @@ def add_demand(self, demand): demand : Demand or Tour Travel demand matrix or number of travellers """ + demand.purpose.name = cast(str,demand.purpose.name) #type checker hint if demand.mode != "walk" and not demand.is_car_passenger: if demand.mode in param.divided_classes: ass_class = "{}_{}".format( @@ -73,23 +82,30 @@ def add_demand(self, demand): else: ass_class = demand.mode if len(demand.position) == 2: - share = param.demand_share[demand.purpose.name][demand.mode] + position2 = cast(Tuple[int,int], demand.position) #type checker hint + share: Dict[str, Any] = param.demand_share[demand.purpose.name][demand.mode] for time_period in self.time_periods: self._add_2d_demand( share[time_period], ass_class, time_period, - demand.matrix, demand.position) + demand.matrix, position2) elif len(demand.position) == 3: for time_period in self.time_periods: self._add_3d_demand(demand, ass_class, time_period) else: raise IndexError("Tuple position has wrong dimensions.") - def _add_2d_demand(self, demand_share, ass_class, time_period, mtx, mtx_pos): - """Slice demand, include transpose and add for one time period.""" + def _add_2d_demand(self, + demand_share: Any, + ass_class: str, + time_period: str, + mtx: numpy.ndarray, + mtx_pos: Tuple[int, int]): + """Slice demand, include transpose and add for one time period. ???types""" r_0 = mtx_pos[0] c_0 = mtx_pos[1] r_n = r_0 + mtx.shape[0] c_n = c_0 + mtx.shape[1] + self.demand = cast(Dict[str, Dict[str, Any]], self.demand) #type checker help large_mtx = self.demand[time_period][ass_class] try: large_mtx[r_0:r_n, c_0:c_n] += demand_share[0] * mtx @@ -101,13 +117,18 @@ def _add_2d_demand(self, demand_share, ass_class, time_period, mtx, mtx_pos): log.warn("{} {} matrix not matching {} demand shares. Resorted to backup demand shares.".format( mtx.shape, ass_class, len(demand_share[0]))) - def _add_3d_demand(self, demand, ass_class, time_period): + def _add_3d_demand(self, + demand: Union[Demand, Tour], + ass_class: str, + time_period: str): """Add three-way demand.""" + demand_position = cast(Tuple[int,int,int],demand.position) #type checker hint + demand.purpose.name = cast(str,demand.purpose.name) #type checker hint mtx = demand.matrix tp = time_period - o = demand.position[0] - d1 = demand.position[1] - d2 = demand.position[2] + o = demand_position[0] + d1 = demand_position[1] + d2 = demand_position[2] share = param.demand_share[demand.purpose.name][demand.mode][tp] if demand.dest is not None: # For agent simulation @@ -118,7 +139,7 @@ def _add_3d_demand(self, demand, ass_class, time_period): self._add_2d_demand(share[0], ass_class, tp, mtx, (d1, d2)) self._add_2d_demand(share[1], ass_class, tp, colsum, (d2, o)) - def add_vans(self, time_period, nr_zones): + def add_vans(self, time_period: str, nr_zones: int): """Add vans as a share of private car trips for one time period. Parameters @@ -128,8 +149,9 @@ def add_vans(self, time_period, nr_zones): nr_zones : int Number of zones in model area (metropolitan + peripheral) """ + demand = cast(Dict[str, Dict[str, numpy.ndarray]],self.demand) n = nr_zones - mtx = self.demand[time_period] + mtx = demand[time_period] car_demand = (mtx["car_work"][0:n, 0:n] + mtx["car_leisure"][0:n, 0:n]) share = param.demand_share["freight"]["van"][time_period] self._add_2d_demand(share, "van", time_period, car_demand, (0, 0)) diff --git a/Scripts/assignment/emme_assignment.py b/Scripts/assignment/emme_assignment.py index 1314dfdb..ec663627 100644 --- a/Scripts/assignment/emme_assignment.py +++ b/Scripts/assignment/emme_assignment.py @@ -1,3 +1,5 @@ +from __future__ import annotations +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, cast import pandas from math import log10 @@ -7,6 +9,12 @@ import parameters.zone as zone_param from assignment.abstract_assignment import AssignmentModel from assignment.assignment_period import AssignmentPeriod +if TYPE_CHECKING: + from assignment.emme_bindings.emme_project import EmmeProject + from assignment.datatypes.transit_fare import TransitFareZoneSpecification + from datahandling.resultdata import ResultsData + from inro.emme.database.scenario import Scenario # type: ignore + from inro.emme.network.Network import Network # type: ignore class EmmeAssignmentModel(AssignmentModel): @@ -34,9 +42,13 @@ class EmmeAssignmentModel(AssignmentModel): 300 matrix ids will be reserved, starting from first_matrix_id. Default is 100(-399). """ - def __init__(self, emme_context, first_scenario_id, - separate_emme_scenarios=False, save_matrices=False, - time_periods=param.time_periods, first_matrix_id=100): + def __init__(self, + emme_context: EmmeProject, + first_scenario_id: int, + separate_emme_scenarios: bool=False, + save_matrices: bool=False, + time_periods: List[str]=param.time_periods, + first_matrix_id: int=100): self.separate_emme_scenarios = separate_emme_scenarios self.save_matrices = save_matrices self.time_periods = time_periods @@ -45,7 +57,8 @@ def __init__(self, emme_context, first_scenario_id, self.mod_scenario = self.emme_project.modeller.emmebank.scenario( first_scenario_id) - def prepare_network(self, car_dist_unit_cost=None): + def prepare_network(self, + car_dist_unit_cost: Optional[float]=None): """Create matrices, extra attributes and calc background variables. Parameters @@ -96,7 +109,9 @@ def prepare_network(self, car_dist_unit_cost=None): self.emme_project.modeller.emmebank.create_function( idx, param.volume_delay_funcs[idx]) - def init_assign(self, demand): + def init_assign(self, + demand: Dict[str,List[numpy.ndarray]]): + """??? types""" ap0 = self.assignment_periods[0] ap0.assign(demand, iteration="init") if self.save_matrices: @@ -107,12 +122,12 @@ def init_assign(self, demand): self._copy_matrix("dist", "walk", ap0, ap) @property - def zone_numbers(self): - """List of all zone numbers.""" + def zone_numbers(self) -> List[int]: + """List of all zone numbers. ???types""" return self.mod_scenario.zone_numbers @property - def mapping(self): + def mapping(self) -> Dict[int, int]: """dict: Dictionary of zone numbers and corresponding indices.""" mapping = {} for idx, zone in enumerate(self.zone_numbers): @@ -120,11 +135,11 @@ def mapping(self): return mapping @property - def nr_zones(self): + def nr_zones(self) -> int: """int: Number of zones in assignment model.""" return len(self.zone_numbers) - def aggregate_results(self, resultdata): + def aggregate_results(self, resultdata: ResultsData): """Aggregate results to 24h and print vehicle kms. Parameters @@ -234,7 +249,10 @@ def aggregate_results(self, resultdata): resultdata.print_data(dists, "transit_kms.txt", "dist") resultdata.print_data(times, "transit_kms.txt", "time") - def calc_transit_cost(self, fares, peripheral_cost, default_cost=None): + def calc_transit_cost(self, + fares: TransitFareZoneSpecification, + peripheral_cost: numpy.ndarray, + default_cost: numpy.ndarray = None): """Calculate transit zone cost matrix. Perform multiple transit assignments. @@ -268,14 +286,18 @@ def calc_transit_cost(self, fares, peripheral_cost, default_cost=None): if not self.save_matrices: break - def _copy_matrix(self, mtx_type, ass_class, ass_period_1, ass_period_2): + def _copy_matrix(self, + mtx_type: str, + ass_class: str, + ass_period_1: AssignmentPeriod, + ass_period_2: AssignmentPeriod): from_mtx = ass_period_1.emme_matrices[ass_class][mtx_type] to_mtx = ass_period_2.emme_matrices[ass_class][mtx_type] description = f"{mtx_type}_{ass_class}_{ass_period_2.name}" self.emme_project.copy_matrix( from_mtx, to_mtx, description, description) - def _extra(self, attr): + def _extra(self, attr: str) -> str: """Add prefix "@" and suffix "_vrk". Parameters @@ -291,7 +313,7 @@ def _extra(self, attr): return "@{}_{}".format(attr, "vrk") def _add_bus_stops(self): - network = self.mod_scenario.get_network() + network: Network = self.mod_scenario.get_network() for line in network.transit_lines(): if line.mode.id in param.stop_codes: stop_codes = param.stop_codes[line.mode.id] @@ -372,7 +394,9 @@ def _create_matrices(self, time_period, id_hundred, id_ten): emme_matrices[ass_class] = matrix_ids return emme_matrices - def _create_attributes(self, scenario, extra): + def _create_attributes(self, + scenario: Any, + extra: Callable[[str], str]) -> Dict[str,Dict[str,str]]: """Create extra attributes needed in assignment. Parameters @@ -386,13 +410,14 @@ def _create_attributes(self, scenario, extra): # Create link attributes ass_classes = list(param.emme_matrices) + ["bus"] ass_classes.remove("walk") + if TYPE_CHECKING: scenario = cast(Scenario, scenario) for ass_class in ass_classes: self.emme_project.create_extra_attribute( "LINK", extra(ass_class), ass_class + " volume", overwrite=True, scenario=scenario) - for attr in ("total_cost", "toll_cost", "car_time", "aux_transit"): + for attr_s in ("total_cost", "toll_cost", "car_time", "aux_transit"): #attr_s tp make difference for type checker self.emme_project.create_extra_attribute( - "LINK", extra(attr), attr, + "LINK", extra(attr_s), attr_s, overwrite=True, scenario=scenario) # Create node and transit segment attributes attr = param.segment_results @@ -424,7 +449,7 @@ def _create_attributes(self, scenario, extra): scenario)) return seg_results - def calc_noise(self): + def calc_noise(self) -> pandas.Series: """Calculate noise according to Road Traffic Noise Nordic 1996. Returns @@ -492,7 +517,7 @@ def calc_noise(self): noise_areas[area] += 0.001 * zone_width * link.length return noise_areas - def _link_24h(self, attr): + def _link_24h(self, attr: str): """ Sums and expands link volumes to 24h. @@ -521,7 +546,7 @@ def _link_24h(self, attr): log.info("Link attribute {} aggregated to 24h (scenario {})".format( extra, self.day_scenario.id)) - def _node_24h(self, transit_class, attr): + def _node_24h(self, transit_class: str, attr: str): """ Sums and expands node attributes to 24h. @@ -553,7 +578,7 @@ def _node_24h(self, transit_class, attr): log.info("Node attribute {} aggregated to 24h (scenario {})".format( extra, self.day_scenario.id)) - def _transit_segment_24h(self, transit_class, attr): + def _transit_segment_24h(self, transit_class: str, attr: str): """ Sums and expands transit attributes to 24h. diff --git a/Scripts/assignment/emme_bindings/emme_project.py b/Scripts/assignment/emme_bindings/emme_project.py index 5cef5850..f5f59f22 100644 --- a/Scripts/assignment/emme_bindings/emme_project.py +++ b/Scripts/assignment/emme_bindings/emme_project.py @@ -1,8 +1,13 @@ +from __future__ import annotations import os +from typing import Any, Optional, cast, TYPE_CHECKING import utils.log as log import logging -import inro.emme.desktop.app as _app -import inro.modeller as _m +import inro.emme.desktop.app as _app # type: ignore +import inro.modeller as _m # type: ignore +if TYPE_CHECKING: + #The following one is likely in different location + from inro.modeller import ContentManager # type: ignore class EmmeProject: @@ -18,8 +23,11 @@ class EmmeProject: emmebank_path : str (optional) Path to emmebank file (if EMME project is not initialized) """ - def __init__(self, project_path, emmebank_path=None): + def __init__(self, + project_path: str, + emmebank_path: Optional[str] = None): log.info("Starting Emme...") + if TYPE_CHECKING: self.cm: Optional[ContentManager] = None #type checker hint emme_desktop = _app.start_dedicated( project=project_path, visible=False, user_initials="HSL") if emmebank_path is not None: @@ -59,10 +67,12 @@ def __init__(self, project_path, emmebank_path=None): self.create_extra_attribute = self.modeller.tool( "inro.emme.data.extra_attribute.create_extra_attribute") - def write(self, message): + def write(self, message: str): """Write to logbook.""" # _m.logbook_write(message) + try: + if TYPE_CHECKING: self.cm = cast(ContentManager, self.cm) self.cm.__exit__(None, None, None) except AttributeError: pass diff --git a/Scripts/assignment/emme_bindings/mock_project.py b/Scripts/assignment/emme_bindings/mock_project.py index aaae5f17..b10731a4 100644 --- a/Scripts/assignment/emme_bindings/mock_project.py +++ b/Scripts/assignment/emme_bindings/mock_project.py @@ -1,4 +1,6 @@ -import numpy +from __future__ import annotations +from typing import Dict, Iterable, List, Optional, Union +import numpy # type: ignore from collections import namedtuple import copy import os @@ -21,8 +23,13 @@ class MockProject: def __init__(self): self.modeller = Modeller(EmmeBank()) - def copy_scenario(self, from_scenario, scenario_id, scenario_title, - overwrite=False, copy_paths=True, copy_strategies=True): + def copy_scenario(self, + from_scenario, + scenario_id: int, + scenario_title: str, + overwrite: bool = False, + copy_paths: bool = True, + copy_strategies: bool = True): if overwrite: try: self.modeller.emmebank.delete_scenario(scenario_id) @@ -57,8 +64,12 @@ def import_scenario(self, scenario_dir, scenario_id, scenario_title): self.import_extra_attributes( os.path.join(scenario_dir, file_name), scenario=scenario) - def create_matrix(self, matrix_id, matrix_name, matrix_description, - default_value=0, overwrite=False): + def create_matrix(self, + matrix_id: int, + matrix_name, + matrix_description, + default_value=0, + overwrite=False): try: mtx = self.modeller.emmebank.create_matrix( matrix_id, default_value) @@ -69,11 +80,13 @@ def create_matrix(self, matrix_id, matrix_name, matrix_description, mtx.name = matrix_name mtx.description = matrix_description - def create_extra_attribute(self, extra_attribute_type, - extra_attribute_name, - extra_attribute_description, - extra_attribute_default_value=0.0, - overwrite=False, scenario=None): + def create_extra_attribute(self, + extra_attribute_type: str, + extra_attribute_name: str, + extra_attribute_description: str, + extra_attribute_default_value: float = 0.0, + overwrite: bool = False, + scenario: Optional[Scenario] = None): try: scenario.create_extra_attribute( extra_attribute_type, extra_attribute_name, @@ -349,14 +362,14 @@ def __init__(self): self._matrices = {} self._functions = {} - def scenario(self, idx): + def scenario(self, idx: int): if idx in self._scenarios: return self._scenarios[idx] def scenarios(self): return iter(self._scenarios.values()) - def create_scenario(self, idx): + def create_scenario(self, idx: int): if idx in self._scenarios: raise ExistenceError("Scenario already exists: {}".format(idx)) else: @@ -364,7 +377,7 @@ def create_scenario(self, idx): self._scenarios[idx] = scenario return scenario - def copy_scenario(self, source_id, destination_id): + def copy_scenario(self, source_id: int, destination_id: int): if self.scenario(source_id) is None: raise ExistenceError("Scenario does not exist: {}".format( source_id)) @@ -374,14 +387,14 @@ def copy_scenario(self, source_id, destination_id): copy.deepcopy(self.scenario(source_id).get_network())) return dest - def delete_scenario(self, idx): + def delete_scenario(self, idx: int): del self._scenarios[idx] - def matrix(self, idx): + def matrix(self, idx: int): if idx in self._matrices: return self._matrices[idx] - def create_matrix(self, idx, default_value=0.0): + def create_matrix(self, idx: int, default_value=0.0): if idx in self._matrices: raise ExistenceError("Matrix already exists: {}".format(idx)) else: @@ -390,14 +403,14 @@ def create_matrix(self, idx, default_value=0.0): self._matrices[idx] = matrix return matrix - def function(self, idx): + def function(self, idx: int): if idx in self._functions: return self._functions[idx] def functions(self): return iter(self._functions.values()) - def create_function(self, idx, expression): + def create_function(self, idx: int, expression: Dict[str,str]): if idx in self._functions: raise ExistenceError("Function already exists: {}".format(idx)) else: @@ -405,7 +418,7 @@ def create_function(self, idx, expression): self._functions[idx] = func return func - def delete_function(self, idx): + def delete_function(self, idx: int): try: del self._functions[idx] except KeyError: @@ -413,7 +426,7 @@ def delete_function(self, idx): class Scenario: - def __init__(self, idx): + def __init__(self, idx: int): self.id = str(idx) self.number = int(idx) self.title = "" @@ -423,13 +436,16 @@ def __init__(self, idx): def zone_numbers(self): return sorted(self._network._centroids) - def extra_attribute(self, idx): + def extra_attribute(self, idx: int): network = self.get_network() for attr_type in network._extra_attr: if idx in network._extra_attr[attr_type]: return network._extra_attr[attr_type][idx] - def create_extra_attribute(self, attr_type, idx, default_value=0.0): + def create_extra_attribute(self, + attr_type, + idx: int, + default_value: float=0.0): network = self.get_network() if idx in network._extra_attr[attr_type]: raise ExistenceError("Extra attribute already exists: {}".format( @@ -464,7 +480,7 @@ def initialize(self, value=0.0): class Matrix: - def __init__(self, idx, dim, default_value): + def __init__(self, idx: int, dim: int, default_value: Union[int, float]): self.id = idx self._data = numpy.full((dim, dim), default_value, dtype=float) self._name = "" @@ -494,15 +510,15 @@ def description(self, matrix_description): else: self._description = matrix_description - def get_numpy_data(self, scenario_id=None): + def get_numpy_data(self, scenario_id: Optional[int]=None): return self._data - def set_numpy_data(self, data, scenario_id=None): + def set_numpy_data(self, data, scenario_id: Optional[int]=None): self._data[:,:] = data class Function: - def __init__(self, idx, expression): + def __init__(self, idx: int, expression: Dict[str,str]): self.id = idx self.expression = expression @@ -525,35 +541,35 @@ def __init__(self): } self._extra_attr = {attr_type: {} for attr_type in self._objects} - def mode(self, idx): + def mode(self, idx: int) -> 'Mode': if idx in self._modes: return self._modes[idx] - def modes(self): + def modes(self) -> Iterable: return iter(self._modes.values()) - def create_mode(self, mode_type, idx): + def create_mode(self, mode_type: str, idx: str) -> 'Mode': if not isinstance(idx, str) or len(idx) != 1: raise Exception("Invalid mode ID: " + idx) mode = Mode(idx, mode_type) self._modes[idx] = mode return mode - def node(self, idx): + def node(self, idx: int) -> 'Node': idx = int(idx) if idx in self._nodes: return self._nodes[idx] - def nodes(self): + def nodes(self) -> Iterable: return iter(self._nodes.values()) - def centroids(self): + def centroids(self) -> Iterable: return iter(self._centroids.values()) - def regular_nodes(self): + def regular_nodes(self) -> Iterable: return iter(self._regular_nodes.values()) - def create_node(self, idx, is_centroid): + def create_node(self, idx: int, is_centroid: bool) -> 'Node': idx = int(idx) node = Node(self, idx, is_centroid) self._nodes[idx] = node @@ -563,15 +579,15 @@ def create_node(self, idx, is_centroid): self._regular_nodes[idx] = node return node - def link(self, i_node_id, j_node_id): + def link(self, i_node_id: int, j_node_id: int) -> 'Link': idx = "{}-{}".format(i_node_id, j_node_id) if idx in self._links: return self._links[idx] - def links(self): + def links(self) -> Iterable: return iter(self._links.values()) - def create_link(self, i_node_id, j_node_id, modes): + def create_link(self, i_node_id: int, j_node_id: int, modes: str) -> 'Link': modes = [self.mode(str(mode)) for mode in modes] link = Link( self, self._nodes[int(i_node_id)], self._nodes[int(j_node_id)], @@ -579,29 +595,29 @@ def create_link(self, i_node_id, j_node_id, modes): self._links["{}-{}".format(i_node_id, j_node_id)] = link return link - def transit_vehicle(self, idx): + def transit_vehicle(self, idx: int) -> 'TransitVehicle': if idx in self._vehicles: return self._vehicles[idx] - def transit_vehicles(self): + def transit_vehicles(self) -> Iterable: return iter(self._vehicles.values()) - def create_transit_vehicle(self, idx, mode_id): + def create_transit_vehicle(self, idx: int, mode_id: str) -> 'TransitVehicle': vehicle = TransitVehicle(idx, self.mode(mode_id)) self._vehicles[idx] = vehicle return vehicle - def transit_line(self, idx): + def transit_line(self, idx) -> 'TransitLine': if idx in self._lines: return self._lines[idx] - def transit_lines(self): + def transit_lines(self) -> Iterable['TransitLine']: return iter(self._lines.values()) - def transit_segments(self): + def transit_segments(self) -> Iterable: return iter(self._segments) - def create_transit_line(self, idx, transit_vehicle_id, itinerary): + def create_transit_line(self, idx: str, transit_vehicle_id: int, itinerary: List[List[str]]) -> 'TransitLine': line = TransitLine(self, idx, transit_vehicle_id) self._lines[idx] = line for i in range(len(itinerary) - 1): @@ -614,31 +630,31 @@ def create_transit_line(self, idx, transit_vehicle_id, itinerary): class Mode: - def __init__(self, idx, mode_type): + def __init__(self, idx: str, mode_type: str): self.id = idx self.type = mode_type self.description = "" - def __str__(self): + def __str__(self) -> str: return self.id class TransitVehicle: - def __init__(self, idx, mode): + def __init__(self, idx: int, mode: str): self.number = idx self.mode = mode self.description = "" @property - def id(self): + def id(self) -> str: return str(self.number) - def __str__(self): + def __str__(self) -> str: return self.id class NetworkObject: - def __init__(self, network, extra_attr): + def __init__(self, network: Network, extra_attr: Dict[str, Dict[str, Union['Link', 'Node', 'TransitLine', 'TransitSegment']]]): self.network = network self._extra_attr = {idx: extra_attr[idx].default_value for idx in extra_attr} @@ -669,7 +685,10 @@ def __str__(self): class Node(NetworkObject): - def __init__(self, network, idx, is_centroid=False): + def __init__(self, + network: Network, + idx: int, + is_centroid: bool = False): NetworkObject.__init__(self, network, network._extra_attr["NODE"]) self.is_centroid = is_centroid self.number = idx @@ -683,7 +702,11 @@ def id(self): class Link(NetworkObject): - def __init__(self, network, i_node, j_node, modes): + def __init__(self, + network: Network, + i_node: Node, + j_node: Node, + modes): NetworkObject.__init__(self, network, network._extra_attr["LINK"]) self.i_node = i_node self.j_node = j_node @@ -697,22 +720,22 @@ def __init__(self, network, i_node, j_node, modes): self._segments = [] @property - def id(self): + def id(self) -> str: return "{}-{}".format(self.i_node, self.j_node) @property - def reverse_link(self): + def reverse_link(self) -> Optional[Link]: try: return self.network.link(self.j_node, self.i_node) except KeyError: return None - def segments(self): + def segments(self) -> Iterable: return iter(self._segments) class TransitLine(NetworkObject): - def __init__(self, network, idx, vehicle): + def __init__(self, network: Network, idx: str, vehicle: TransitVehicle): NetworkObject.__init__( self, network, network._extra_attr["TRANSIT_LINE"]) self.id = idx @@ -721,34 +744,34 @@ def __init__(self, network, idx, vehicle): self._segments = [] @property - def id(self): + def id(self) -> str: return self._id @id.setter - def id(self, idx): + def id(self, idx: str): self._id = idx @property - def vehicle(self): + def vehicle(self) -> TransitVehicle: return self._vehicle @vehicle.setter - def vehicle(self, vehicle_id): + def vehicle(self, vehicle_id: int): self._vehicle = self.network._vehicles[vehicle_id] @property - def mode(self): + def mode(self) -> Mode: return self.vehicle.mode - def segment(self, idx): + def segment(self, idx) -> 'TransitSegment': return self._segments[idx] - def segments(self): + def segments(self) -> Iterable: return iter(self._segments) class TransitSegment(NetworkObject): - def __init__(self, network, line, link): + def __init__(self, network: Network, line: TransitLine, link: Link): NetworkObject.__init__( self, network, network._extra_attr["TRANSIT_SEGMENT"]) self.line = line @@ -757,15 +780,15 @@ def __init__(self, network, line, link): self.dwell_time = 0.01 @property - def id(self): + def id(self) -> str: return "{}-{}".format(self.line, self.link) @property - def i_node(self): + def i_node(self) -> Node: return self.link.i_node @property - def j_node(self): + def j_node(self) -> Node: return self.link.j_node diff --git a/Scripts/assignment/mock_assignment.py b/Scripts/assignment/mock_assignment.py index a1dbac5c..12014333 100644 --- a/Scripts/assignment/mock_assignment.py +++ b/Scripts/assignment/mock_assignment.py @@ -1,4 +1,9 @@ +from __future__ import annotations +from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union +import numpy # type: ignore import pandas +if TYPE_CHECKING: + from datahandling.matrixdata import MatrixData import utils.log as log @@ -8,7 +13,7 @@ class MockAssignmentModel(AssignmentModel): - def __init__(self, matrices, time_periods=param.time_periods): + def __init__(self, matrices: MatrixData, time_periods: List[str]=param.time_periods): self.matrices = matrices log.info("Reading matrices from " + str(self.matrices.path)) self.time_periods = time_periods @@ -16,7 +21,7 @@ def __init__(self, matrices, time_periods=param.time_periods): for tp in time_periods] @property - def zone_numbers(self): + def zone_numbers(self) -> numpy.array: """Numpy array of all zone numbers.""" with self.matrices.open("time", "aht") as mtx: zone_numbers = mtx.zone_numbers @@ -30,7 +35,7 @@ def mapping(self): return mapping @property - def nr_zones(self): + def nr_zones(self) -> int: """int: Number of zones in assignment model.""" return len(self.zone_numbers) @@ -43,7 +48,7 @@ def aggregate_results(self, resultdata): def calc_noise(self): return pandas.Series(0, zone_param.area_aggregation) - def prepare_network(self, car_dist_unit_cost=None): + def prepare_network(self, car_dist_unit_cost: Optional[float]=None): pass def init_assign(self, demand): @@ -51,7 +56,7 @@ def init_assign(self, demand): class MockPeriod(Period): - def __init__(self, name, matrices): + def __init__(self, name: str, matrices: MatrixData): self.name = name self.matrices = matrices @@ -62,7 +67,9 @@ def zone_numbers(self): zone_numbers = mtx.zone_numbers return zone_numbers - def assign(self, matrices, iteration=None): + def assign(self, + matrices: Dict[str, numpy.ndarray], + iteration: Optional[Union[int,str]]=None) -> Dict[str,Dict[str,numpy.ndarray]]: """Assign cars, bikes and transit for one time period. Get travel impedance matrices for one time period from assignment. @@ -93,7 +100,7 @@ def assign(self, matrices, iteration=None): * mtxs["dist"][ass_cl]) return mtxs - def _get_matrices(self, mtx_type): + def _get_matrices(self, mtx_type: str) -> Dict[str, numpy.ndarray]: """Get all matrices of specified type. Parameters diff --git a/Scripts/datahandling/matrixdata.py b/Scripts/datahandling/matrixdata.py index 7ecaeb01..afc0b1b6 100644 --- a/Scripts/datahandling/matrixdata.py +++ b/Scripts/datahandling/matrixdata.py @@ -1,8 +1,12 @@ +from __future__ import annotations import os -import openmatrix as omx -import numpy +from typing import TYPE_CHECKING, Optional +import openmatrix as omx # type: ignore +import numpy # type: ignore import pandas from contextlib import contextmanager +if TYPE_CHECKING: + from datahandling.zonedata import BaseZoneData import utils.log as log from utils.read_csv_file import read_csv_file @@ -12,22 +16,26 @@ class MatrixData: - def __init__(self, path): + def __init__(self, path: str): self.path = path if not os.path.exists(self.path): os.makedirs(self.path) @contextmanager - def open(self, mtx_type, time_period, zone_numbers=None, m='r'): + def open(self, + mtx_type: str, + time_period: str, + zone_numbers: Optional[numpy.ndarray] = None, + m: str = 'r'): file_name = os.path.join(self.path, mtx_type+'_'+time_period+".omx") mtxfile = MatrixFile(omx.open_file(file_name, m), zone_numbers) yield mtxfile mtxfile.close() - def get_external(self, transport_mode): + def get_external(self, transport_mode: str): return read_csv_file(self.path, "external_"+transport_mode+".txt") - def peripheral_transit_cost(self, zonedata): + def peripheral_transit_cost(self, zonedata: BaseZoneData): filename = "transit_cost_peripheral.txt" try: aggr_mtx = read_csv_file(self.path, filename) @@ -52,7 +60,7 @@ def peripheral_transit_cost(self, zonedata): class MatrixFile: - def __init__(self, omx_file, zone_numbers): + def __init__(self, omx_file: omx.File, zone_numbers: numpy.ndarray): self._file = omx_file self.missing_zones = [] if zone_numbers is None: @@ -95,7 +103,7 @@ def __init__(self, omx_file, zone_numbers): def close(self): self._file.close() - def __getitem__(self, mode): + def __getitem__(self, mode: str): mtx = numpy.array(self._file[mode]) nr_zones = len(self.zone_numbers) dim = (nr_zones, nr_zones) diff --git a/Scripts/datahandling/resultdata.py b/Scripts/datahandling/resultdata.py index 73109e38..a7b25189 100644 --- a/Scripts/datahandling/resultdata.py +++ b/Scripts/datahandling/resultdata.py @@ -1,4 +1,6 @@ +from __future__ import annotations import os +from typing import Any, Dict import pandas try: from openpyxl import Workbook, load_workbook @@ -11,13 +13,13 @@ class ResultsData: """ Saves all result data to same folder. """ - def __init__(self, results_directory_path): + def __init__(self, results_directory_path: str): if not os.path.exists(results_directory_path): os.makedirs(results_directory_path) self.path = results_directory_path - self._line_buffer = {} - self._df_buffer = {} - self._xlsx_buffer = {} + self._line_buffer: Dict[str, Any] = {} + self._df_buffer: Dict[str, Any] = {} + self._xlsx_buffer: Dict[str, Any] = {} def flush(self): """Save to files and empty buffers.""" @@ -34,7 +36,7 @@ def flush(self): os.path.join(self.path, "{}.xlsx".format(filename))) self._xlsx_buffer = {} - def print_data(self, data, filename, colname): + def print_data(self, data: pandas.Series, filename: str, colname: str): """Save data to DataFrame buffer (printed to text file when flushing). Parameters @@ -54,7 +56,7 @@ def print_data(self, data, filename, colname): df.index.union(data.index), copy=False) self._df_buffer[filename][colname] = data - def print_line(self, line, filename): + def print_line(self, line: str, filename: str): """Write text to line in file (closed when flushing). Parameters @@ -72,7 +74,7 @@ def print_line(self, line, filename): self._line_buffer[filename] = buffer buffer.write(line + "\n") - def print_matrix(self, data, filename, sheetname): + def print_matrix(self, data: pandas.DataFrame, filename: str, sheetname: str): """Save 2-d matrix data to buffer (printed to file when flushing). Saves matrix both in Excel format and as list in text file. diff --git a/Scripts/datahandling/zonedata.py b/Scripts/datahandling/zonedata.py index c1025c92..848e481c 100644 --- a/Scripts/datahandling/zonedata.py +++ b/Scripts/datahandling/zonedata.py @@ -1,4 +1,6 @@ -import numpy +from __future__ import annotations +from typing import Any, Dict, List, Tuple, Union +import numpy # type: ignore import pandas import parameters.zone as param @@ -10,8 +12,8 @@ class ZoneData: - def __init__(self, data_dir, zone_numbers): - self._values = {} + def __init__(self, data_dir: str, zone_numbers: numpy.array): + self._values: Dict[str,Any]= {} self.share = ShareChecker(self) all_zone_numbers = numpy.array(zone_numbers) self.all_zone_numbers = all_zone_numbers @@ -114,7 +116,7 @@ def dummy(self, division_type, name, bounds=slice(None)): def __getitem__(self, key): return self._values[key] - def __setitem__(self, key, data): + def __setitem__(self, key: str, data: Any): try: if not numpy.isfinite(data).all(): for (i, val) in data.iteritems(): @@ -144,7 +146,8 @@ def __setitem__(self, key, data): raise ValueError(msg) self._values[key] = data - def zone_index(self, zone_number): + def zone_index(self, + zone_number: int) -> int: """Get index of given zone number. Parameters @@ -159,7 +162,7 @@ def zone_index(self, zone_number): """ return self.zones[zone_number].index - def get_freight_data(self): + def get_freight_data(self) -> pandas.DataFrame: """Get zone data for freight traffic calculation. Returns @@ -177,15 +180,15 @@ def get_freight_data(self): data = {k: self._values[k] for k in freight_variables} return pandas.DataFrame(data) - def get_data(self, key, bounds, generation=False): + def get_data(self, key: str, bounds: slice, generation: bool=False) -> Union[pandas.Series, numpy.ndarray]: """Get data of correct shape for zones included in purpose. Parameters ---------- key : str Key describing the data (e.g., "population") - bounds : tuple - Two integers that describe the lower and upper bounds of purpose + bounds : slice + Slice that describes the lower and upper bounds of purpose generation : bool, optional If set to True, returns data only for zones in purpose, otherwise returns data for all zones @@ -197,12 +200,12 @@ def get_data(self, key, bounds, generation=False): try: val = self._values[key] except KeyError as err: - key = key.split('_') - if key[1] in ("own", "other"): + keyl: List[str] = key.split('_') + if keyl[1] in ("own", "other"): # If parameter is only for own municipality or for all # municipalities except own, array is multiplied by # bool matrix - return (self[key[1]] * self._values[key[0]].values)[bounds, :] + return (self[keyl[1]] * self._values[keyl[0]].values)[bounds, :] else: raise KeyError(err) if val.ndim == 1: # If not a compound (i.e., matrix) @@ -215,7 +218,7 @@ def get_data(self, key, bounds, generation=False): class BaseZoneData(ZoneData): - def __init__(self, data_dir, zone_numbers): + def __init__(self, data_dir: str, zone_numbers: numpy.array): ZoneData.__init__(self, data_dir, zone_numbers) cardata = read_csv_file(data_dir, ".car", self.zone_numbers) self["car_density"] = cardata["cardens"] diff --git a/Scripts/datatypes/demand.py b/Scripts/datatypes/demand.py index 42493bc5..861f64bc 100644 --- a/Scripts/datatypes/demand.py +++ b/Scripts/datatypes/demand.py @@ -1,3 +1,9 @@ +from __future__ import annotations +from typing import TYPE_CHECKING, Optional + +import numpy # type: ignore +if TYPE_CHECKING: + from datatypes.purpose import Purpose import parameters.car as param @@ -5,7 +11,11 @@ class Demand: # Only used for sister class Tour is_car_passenger = False - def __init__(self, purpose, mode, matrix, origin=None): + def __init__(self, + purpose: Purpose, + mode: str, + matrix: numpy.ndarray, + origin: Optional[int]=None): """Demand matrix for whole day Parameters diff --git a/Scripts/datatypes/histogram.py b/Scripts/datatypes/histogram.py index 53fc9fdd..ad4f366d 100644 --- a/Scripts/datatypes/histogram.py +++ b/Scripts/datatypes/histogram.py @@ -1,5 +1,5 @@ import pandas -import numpy +import numpy # type: ignore from parameters.zone import tour_length_intervals as intervals diff --git a/Scripts/datatypes/person.py b/Scripts/datatypes/person.py index fce8990a..c6e76b2c 100644 --- a/Scripts/datatypes/person.py +++ b/Scripts/datatypes/person.py @@ -1,7 +1,14 @@ -import numpy +from __future__ import annotations +from typing import Dict, List, Tuple +import numpy # type: ignore import random +from datatypes.purpose import TourPurpose from datatypes.tour import Tour +from datatypes.zone import Zone +from models.car_use import CarUseModel +from models.linear import IncomeModel +from models.tour_combinations import TourCombinationModel class Person: @@ -18,6 +25,7 @@ class Person: Model used to create tours car_use_model : models.logit.CarUseModel Model used to decide if car user + income_model : models.linear.IncomeModel """ id_counter = 0 @@ -27,15 +35,19 @@ class Person: zone_attr = ["number", "area", "municipality"] attr = person_attr + zone_attr - def __init__(self, zone, age_group, - generation_model, car_use_model, income_model): + def __init__(self, + zone: Zone, + age_group: Tuple[int,int], + generation_model: TourCombinationModel, + car_use_model: CarUseModel, + income_model: IncomeModel): self.id = Person.id_counter Person.id_counter += 1 self.zone = zone self.age = random.randint(age_group[0], age_group[1]) self.age_group = "age_" + str(age_group[0]) + "-" + str(age_group[1]) self.sex = random.random() < 0.5 - self.tours = [] + self.tours: List[Tour] = [] self.generation_model = generation_model self._cm = car_use_model self._im = income_model @@ -63,7 +75,7 @@ def calc_income(self): self.income = numpy.exp(log_income) @property - def gender(self): + def gender(self) -> str: """Returns the person's gender. Returns @@ -73,7 +85,9 @@ def gender(self): """ return "female" if self.sex == Person.FEMALE else "male" - def add_tours(self, purposes, tour_probs): + def add_tours(self, + purposes: Dict[str,TourPurpose], + tour_probs: Dict[str,numpy.ndarray]): """Initilize tour list and add new tours. Parameters @@ -125,7 +139,7 @@ def add_tours(self, purposes, tour_probs): non_home_tour = Tour(purposes["oo"], tour, self.id) self.tours.append(non_home_tour) - def __str__(self): + def __str__(self) -> str: """ Return person attributes as string. Returns diff --git a/Scripts/datatypes/purpose.py b/Scripts/datatypes/purpose.py index d8e0b8f1..b8073cd0 100644 --- a/Scripts/datatypes/purpose.py +++ b/Scripts/datatypes/purpose.py @@ -1,5 +1,9 @@ -import numpy +from __future__ import annotations +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, cast +import numpy # type: ignore import pandas +from datahandling.resultdata import ResultsData +from datahandling.zonedata import ZoneData import parameters.zone as param from parameters.destination_choice import secondary_destination_threshold @@ -30,12 +34,17 @@ class Purpose: Data used for all demand calculations """ - def __init__(self, specification, zone_data, resultdata=None): + def __init__(self, + specification: Dict[str,Optional[str]], + zone_data: ZoneData, + resultdata: Optional[ResultsData]=None): self.name = specification["name"] self.orig = specification["orig"] self.dest = specification["dest"] self.area = specification["area"] - self.sources = [] + self.name = cast(str, self.name) #type checker help + self.area = cast(str, self.area) #type checker help + self.sources: List[Any] = [] zone_numbers = zone_data.all_zone_numbers zone_intervals = param.purpose_areas[self.area] self.bounds = slice(*zone_numbers.searchsorted( @@ -47,9 +56,9 @@ def __init__(self, specification, zone_data, resultdata=None): self.zone_data = zone_data self.resultdata = resultdata self.model = None - self.modes = [] - self.generated_tours = {} - self.attracted_tours = {} + self.modes: List[str] = [] + self.generated_tours: Dict[str, numpy.array] = {} + self.attracted_tours: Dict[str, numpy.array] = {} @property def zone_numbers(self): @@ -101,7 +110,7 @@ def __init__(self, specification, zone_data, resultdata): else: self.gen_model = generation.GenerationModel(self, resultdata) if self.name == "sop": - self.model = logit.OriginModel(zone_data, self, resultdata) + self.model: Union[logit.OriginModel, logit.DestModeModel, logit.ModeDestModel] = logit.OriginModel(zone_data, self, resultdata) elif self.name == "so": self.model = logit.DestModeModel(zone_data, self, resultdata) else: diff --git a/Scripts/datatypes/tour.py b/Scripts/datatypes/tour.py index 572ed7ca..eea3a9d6 100644 --- a/Scripts/datatypes/tour.py +++ b/Scripts/datatypes/tour.py @@ -1,5 +1,11 @@ -import numpy +from __future__ import annotations +from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union, cast +import numpy # type: ignore import random +if TYPE_CHECKING: + from datatypes.purpose import TourPurpose + from datatypes.zone import Zone +from models.logit import ModeDestModel import parameters.car as param import parameters.zone as zone_param @@ -16,6 +22,8 @@ class Tour: Travel purpose (hw/hs/ho/...) origin : Zone or Tour Origin zone number or origin tour (if non-home tour) + person_id: + id of Person """ # Expansion factor used on demand in departure time model matrix = numpy.array([[1 / zone_param.agent_demand_fraction]]) @@ -23,10 +31,14 @@ class Tour: "total_access", "sustainable_access", "cost", "gen_cost"] - def __init__(self, purpose, origin, person_id): + def __init__(self, + purpose: TourPurpose, + origin: Union[Zone,'Tour'], + person_id: int): self.person_id = person_id self.purpose = purpose self.purpose_name = purpose.name + self.purpose.name = cast(str, self.purpose.name) #type checker help self.orig = origin try: self.sec_dest_prob = purpose.sec_dest_purpose.gen_model.param[purpose.name] @@ -45,7 +57,7 @@ def mode(self): return self.purpose.modes[self._mode_idx] @property - def is_car_passenger(self): + def is_car_passenger(self) -> bool: return self.mode == "car" and self._is_car_passenger @property @@ -62,7 +74,7 @@ def orig(self, origin): self._non_home_position = () @property - def dest(self): + def dest(self) -> Optional[int]: if len(self.position) > 1: return self.purpose.zone_data.zone_numbers[self.position[1]] else: @@ -76,8 +88,9 @@ def dest(self, destination): ) @property - def sec_dest(self): + def sec_dest(self) -> Optional[int]: if len(self.position) > 2: + self.position = cast(Tuple[int,int,int], self.position) #help for the type checker return self.purpose.zone_data.zone_numbers[self.position[2]] else: return None @@ -91,7 +104,7 @@ def sec_dest(self, destination): ) @property - def position(self): + def position(self) -> Union[Tuple[int,int], Tuple[int,int,int]]: """Index position in matrix where to insert the demand. Returns @@ -111,7 +124,7 @@ def position(self, position): else: self._non_home_position = position[1:] - def choose_mode(self, is_car_user): + def choose_mode(self, is_car_user: bool): """Choose tour travel mode. Assumes tour purpose model has already calculated probability matrices. @@ -121,6 +134,7 @@ def choose_mode(self, is_car_user): is_car_user : bool Whether the person is car user or not """ + self.purpose.model = cast(ModeDestModel, self.purpose.model) #type checker help probs, accessibility = self.purpose.model.calc_individual_mode_prob( is_car_user, self.position[0]) self._mode_idx = numpy.searchsorted(probs.cumsum(), self._mode_draw) @@ -131,7 +145,7 @@ def choose_mode(self, is_car_user): def sustainable_access(self): return -self.purpose.sustainable_access[self.orig] - def choose_destination(self, sec_dest_tours): + def choose_destination(self, sec_dest_tours: Dict[str, Dict[int, Dict[int, List['Tour']]]]): """Choose primary destination for the tour. Assumes tour purpose model has already calculated probability matrices. @@ -145,6 +159,7 @@ def choose_destination(self, sec_dest_tours): """ orig_idx = self.position[0] orig_rel_idx = orig_idx - self.purpose.bounds.start + self.purpose.model = cast(ModeDestModel, self.purpose.model) #type checker help dest_idx = numpy.searchsorted( self.purpose.model.cumul_dest_prob[self.mode][:, orig_rel_idx], self._dest_draw) @@ -170,7 +185,7 @@ def choose_destination(self, sec_dest_tours): dest_idx =- bounds.start sec_dest_tours[self.mode][orig_rel_idx][dest_idx].append(self) - def choose_secondary_destination(self, cumulative_probs): + def choose_secondary_destination(self, cumulative_probs: numpy.ndarray): """Choose secondary destination for the tour. Parameters @@ -183,7 +198,7 @@ def choose_secondary_destination(self, cumulative_probs): self.position = (self.position[0], self.position[1], dest_idx) self.purpose.sec_dest_purpose.attracted_tours[self.mode][dest_idx] += 1 - def calc_cost(self, impedance): + def calc_cost(self, impedance: Dict[str,Dict[str,Dict[str,numpy.ndarray]]]): """Construct cost and time components from tour dest choice. Parameters @@ -195,15 +210,19 @@ def calc_cost(self, impedance): """ time = self._get_cost(impedance, "time") self.cost = self._get_cost(impedance, "cost") + self.purpose_name = cast(str, self.purpose_name) #type checker help vot = 1 / vot_inv[assignment_classes[self.purpose_name]] self.gen_cost = self.cost + time * vot - def _get_cost(self, impedance, mtx_type): + def _get_cost(self, + impedance: Dict[str,Dict[str,Dict[str,numpy.ndarray]]], + mtx_type: str) -> Union[int,float]: """Get cost and time components from tour dest choice.""" + self.purpose.name = cast(str, self.purpose.name) #type checker help demand_type = assignment_classes[self.purpose.name] ass_class = ("{}_{}".format(self.mode, demand_type) if self.mode in divided_classes else self.mode) - cost = 0 + cost: float = 0.0 try: if demand_type == "work": departure_imp = impedance["aht"][mtx_type][ass_class] @@ -217,6 +236,7 @@ def _get_cost(self, impedance, mtx_type): cost += departure_imp[self.position[0], self.position[1]] # check if tour has secondary destination and add accordingly if len(self.position) > 2: + self.position = cast(Tuple[int,int,int], self.position) #type checker help cost += sec_dest_imp[self.position[1], self.position[2]] cost += return_imp[self.position[2], self.position[0]] else: @@ -226,13 +246,14 @@ def _get_cost(self, impedance, mtx_type): # KeyErrors are produced when trying to access matrix pass # scale transit costs from month to day + self.purpose.area = cast(str, self.purpose.area) #type checker help if self.mode == "transit" and mtx_type == "cost": i = self.purpose.sub_intervals.searchsorted( self.position[0], side="right") cost /= transit_trips_per_month[self.purpose.area][demand_type][i] return cost - def __str__(self): + def __str__(self) -> str: """ Return tour attributes as string. Returns diff --git a/Scripts/datatypes/zone.py b/Scripts/datatypes/zone.py index 60538e3b..da295442 100644 --- a/Scripts/datatypes/zone.py +++ b/Scripts/datatypes/zone.py @@ -5,7 +5,7 @@ class Zone: counter = 0 - def __init__(self, number): + def __init__(self, number: int): self.number = number self.index = Zone.counter Zone.counter += 1 diff --git a/Scripts/demand/external.py b/Scripts/demand/external.py index f3d1b207..c510610d 100644 --- a/Scripts/demand/external.py +++ b/Scripts/demand/external.py @@ -1,5 +1,10 @@ +from __future__ import annotations +from typing import TYPE_CHECKING import pandas -import numpy +import numpy # type: ignore +if TYPE_CHECKING: + from datahandling.matrixdata import MatrixData + from datahandling.zonedata import ZoneData from datatypes.demand import Demand from datatypes.purpose import Purpose @@ -19,7 +24,10 @@ class ExternalModel: Zone numbers from assignment model """ - def __init__(self, base_demand, zone_data, zone_numbers): + def __init__(self, + base_demand: MatrixData, + zone_data: ZoneData, + zone_numbers: numpy.array): self.base_demand = base_demand self.all_zone_numbers = zone_numbers self.growth = zone_data.externalgrowth @@ -31,7 +39,7 @@ def __init__(self, base_demand, zone_data, zone_numbers): } self.purpose = Purpose(spec, zone_data) - def calc_external(self, mode, internal_trips): + def calc_external(self, mode: str, internal_trips: pandas.Series) -> Demand: """Calculate external traffic. Parameters diff --git a/Scripts/demand/freight.py b/Scripts/demand/freight.py index 32b7cd8a..35a172ef 100644 --- a/Scripts/demand/freight.py +++ b/Scripts/demand/freight.py @@ -1,5 +1,10 @@ -import numpy +from __future__ import annotations +from typing import TYPE_CHECKING +import numpy # type: ignore import pandas +if TYPE_CHECKING: + from datahandling.matrixdata import MatrixData + from datahandling.zonedata import ZoneData import parameters.tour_generation as param from utils.freight import fratar, calibrate @@ -20,7 +25,10 @@ class FreightModel: Base demand matrices """ - def __init__(self, zone_data_base, zone_data_forecast, base_demand): + def __init__(self, + zone_data_base: ZoneData, + zone_data_forecast: ZoneData, + base_demand: MatrixData): self.zdata_b = zone_data_base self.zdata_f = zone_data_forecast self.base_demand = base_demand @@ -32,7 +40,7 @@ def __init__(self, zone_data_base, zone_data_forecast, base_demand): } self.purpose = Purpose(spec, zone_data_base) - def calc_freight_traffic(self, mode): + def calc_freight_traffic(self, mode: str) -> Demand: """Calculate freight traffic matrix. Parameters @@ -47,8 +55,8 @@ def calc_freight_traffic(self, mode): """ zone_data_base = self.zdata_b.get_freight_data() zone_data_forecast = self.zdata_f.get_freight_data() - production_base = self._generate_trips(zone_data_base, mode) - production_forecast = self._generate_trips(zone_data_forecast, mode) + production_base: numpy.ndarray = self._generate_trips(zone_data_base, mode) + production_forecast: numpy.ndarray = self._generate_trips(zone_data_forecast, mode) zone_numbers = self.zdata_b.zone_numbers with self.base_demand.open("freight", "vrk", list(zone_numbers)) as mtx: # Remove zero values @@ -98,6 +106,8 @@ def calc_freight_traffic(self, mode): demand.loc[self.zdata_f.trailers_prohibited] = 0 return Demand(self.purpose, mode, demand.values) - def _generate_trips(self, zone_data, mode): + def _generate_trips(self, + zone_data: pandas.DataFrame, + mode: str) -> numpy.ndarray: b = pandas.Series(param.tour_generation[mode]) return (b * zone_data).sum(1) + 0.001 diff --git a/Scripts/demand/trips.py b/Scripts/demand/trips.py index 68c9da9e..dab28ed1 100644 --- a/Scripts/demand/trips.py +++ b/Scripts/demand/trips.py @@ -1,11 +1,18 @@ -import numpy +from __future__ import annotations +from typing import TYPE_CHECKING, Dict, Tuple, Union, cast +import numpy # type: ignore import pandas +if TYPE_CHECKING: + from datahandling.resultdata import ResultsData + from datahandling.zonedata import ZoneData + from datatypes.purpose import Purpose +from datatypes.person import Person import utils.log as log import parameters.zone as param from datatypes.purpose import TourPurpose, SecDestPurpose from models import car_use, linear, tour_combinations -from datatypes.person import Person + class DemandModel: @@ -21,11 +28,14 @@ class DemandModel: Whether the model is used for agent-based simulation """ - def __init__(self, zone_data, resultdata, is_agent_model=False): + def __init__(self, + zone_data: ZoneData, + resultdata: ResultsData, + is_agent_model: bool=False): self.resultdata = resultdata self.zone_data = zone_data self.tour_purposes = [] - self.purpose_dict = {} + self.purpose_dict: Dict[str,Purpose] = {} for purpose_spec in param.tour_purposes: args = (purpose_spec, zone_data, resultdata) purpose = (SecDestPurpose(*args) if "sec_dest" in purpose_spec @@ -38,7 +48,8 @@ def __init__(self, zone_data, resultdata, is_agent_model=False): for source in purpose_spec["source"]: purpose.sources.append(self.purpose_dict[source]) if "sec_dest" in purpose_spec: - self.purpose_dict[source].sec_dest_purpose = purpose + tour_purpose = cast(TourPurpose, self.purpose_dict[source]) #type checker hint + tour_purpose.sec_dest_purpose = purpose bounds = param.purpose_areas["metropolitan"] self.bounds = slice(*zone_data.all_zone_numbers.searchsorted( [bounds[0], bounds[-1]])) @@ -158,7 +169,7 @@ def generate_tours(self): self.resultdata.print_matrix( result_data, "tour_combinations", "tour_combinations") - def generate_tour_probs(self): + def generate_tour_probs(self) -> Dict[Tuple[int,int], numpy.ndarray]: """Generate matrices of cumulative tour combination probabilities. Used in agent-based simulation. @@ -177,7 +188,7 @@ def generate_tour_probs(self): for is_car_user in (False, True)] return probs - def _get_probs(self, age, is_car_user): + def _get_probs(self, age: str, is_car_user: bool) -> pandas.DataFrame: probs = self.tour_generation_model.calc_prob( age, is_car_user, self.bounds) return pandas.DataFrame(probs).to_numpy().cumsum(axis=1) diff --git a/Scripts/helmet.py b/Scripts/helmet.py index 503c983b..0fe48fcf 100644 --- a/Scripts/helmet.py +++ b/Scripts/helmet.py @@ -19,11 +19,11 @@ def main(args): else: raise ArgumentTypeError( "Iteration number {} not valid".format(args.iterations)) - base_zonedata_path = os.path.join(args.baseline_data_path, "2018_zonedata") - base_matrices_path = os.path.join(args.baseline_data_path, "base_matrices") - forecast_zonedata_path = args.forecast_data_path - results_path = args.results_path - emme_project_path = args.emme_path + base_zonedata_path: str = os.path.join(args.baseline_data_path, "2018_zonedata") + base_matrices_path: str = os.path.join(args.baseline_data_path, "base_matrices") + forecast_zonedata_path: str = args.forecast_data_path + results_path: str = args.results_path + emme_project_path: str = args.emme_path log_extra = { "status": { "name": args.scenario_name, diff --git a/Scripts/helmet_validate_inputfiles.py b/Scripts/helmet_validate_inputfiles.py index d138175c..65482947 100644 --- a/Scripts/helmet_validate_inputfiles.py +++ b/Scripts/helmet_validate_inputfiles.py @@ -1,6 +1,7 @@ from argparse import ArgumentParser import os import sys +from typing import List, Union import utils.config import utils.log as log @@ -14,9 +15,9 @@ def main(args): base_zonedata_path = os.path.join(args.baseline_data_path, "2018_zonedata") base_matrices_path = os.path.join(args.baseline_data_path, "base_matrices") - emme_paths = args.emme_paths - first_scenario_ids = args.first_scenario_ids - forecast_zonedata_paths = args.forecast_data_paths + emme_paths: Union[str,List[str]] = args.emme_paths + first_scenario_ids: Union[int,List[int]] = args.first_scenario_ids + forecast_zonedata_paths: Union[str,List[str]] = args.forecast_data_paths if not emme_paths: msg = "Missing required argument 'emme-paths'." @@ -72,7 +73,7 @@ def main(args): emp_path) log.error(msg) raise ValueError(msg) - import inro.emme.desktop.app as _app + import inro.emme.desktop.app as _app # type: ignore app = _app.start_dedicated( project=emp_path, visible=False, user_initials="HSL") scen = app.data_explorer().active_database().core_emmebank.scenario( diff --git a/Scripts/models/car_use.py b/Scripts/models/car_use.py index 4f7c4a84..37c5e00f 100644 --- a/Scripts/models/car_use.py +++ b/Scripts/models/car_use.py @@ -1,5 +1,10 @@ -import numpy +from __future__ import annotations +from typing import TYPE_CHECKING, List, Optional, Tuple +import numpy # type: ignore import pandas +if TYPE_CHECKING: + from datahandling.resultdata import ResultsData + from datahandling.zonedata import ZoneData from models.logit import LogitModel from parameters.car import car_usage @@ -15,7 +20,7 @@ class CarUseModel(LogitModel): Data used for all demand calculations bounds : slice Zone bounds - age_groups : tuple + age_groups : list tuple int Age intervals @@ -23,7 +28,11 @@ class CarUseModel(LogitModel): Writer object to result directory """ - def __init__(self, zone_data, bounds, age_groups, resultdata): + def __init__(self, + zone_data: ZoneData, + bounds: slice, + age_groups: List[Tuple[int,int]], + resultdata: ResultsData): self.resultdata = resultdata self.zone_data = zone_data self.bounds = bounds @@ -46,7 +55,7 @@ def _check(self, dummy): raise AttributeError( "Car use dummy name {} not valid".format(age_interval)) - def calc_basic_prob(self): + def calc_basic_prob(self) -> numpy.ndarray: """Calculate car user probabilities without individual dummies. Returns @@ -63,7 +72,7 @@ def calc_basic_prob(self): prob = self.exps / (self.exps+1) return prob - def calc_prob(self): + def calc_prob(self) -> pandas.Series: """Calculate car user probabilities with individual dummies included. Returns @@ -74,10 +83,10 @@ def calc_prob(self): prob = self.calc_basic_prob() no_dummy_share = 1 dummy_prob = 0 - b = self.param + b = self.param # ???types for i in b["individual_dummy"]: try: - dummy_share = self.zone_data.get_data( + dummy_share:numpy.ndarray = self.zone_data.get_data( "share_"+i, self.bounds, generation=True) except TypeError: # If the dummy is for a compound segment (age + gender) @@ -96,7 +105,10 @@ def calc_prob(self): self.print_results(prob) return prob - def calc_individual_prob(self, age_group, gender, zone=None): + def calc_individual_prob(self, + age_group: str, + gender: str, + zone: Optional[int] = None): """Calculate car user probability with individual dummies included. Uses results from previously run `calc_basic_prob()`. @@ -129,7 +141,9 @@ def calc_individual_prob(self, age_group, gender, zone=None): prob = exp / (exp+1) return prob - def print_results(self, prob, population_7_99=None): + def print_results(self, + prob: pandas.Series, + population_7_99: Optional[pandas.Series]=None): """ Print results, mainly for calibration purposes""" # Print car user share by zone self.resultdata.print_data(prob, "car_use.txt", "car_use") diff --git a/Scripts/models/linear.py b/Scripts/models/linear.py index c376b466..7d4c49c9 100644 --- a/Scripts/models/linear.py +++ b/Scripts/models/linear.py @@ -1,4 +1,4 @@ -import numpy +import numpy # type: ignore import pandas import math diff --git a/Scripts/models/logit.py b/Scripts/models/logit.py index 5f3ff16e..803b858a 100644 --- a/Scripts/models/logit.py +++ b/Scripts/models/logit.py @@ -1,6 +1,12 @@ -import numpy +from __future__ import annotations +from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, cast +import numpy # type: ignore import pandas import math +if TYPE_CHECKING: + from datahandling.resultdata import ResultsData + from datahandling.zonedata import ZoneData + from datatypes.purpose import TourPurpose from parameters.destination_choice import destination_choice, distance_boundary from parameters.mode_choice import mode_choice @@ -21,16 +27,20 @@ class LogitModel: Writer object to result directory """ - def __init__(self, zone_data, purpose, resultdata): + def __init__(self, + zone_data: ZoneData, + purpose: TourPurpose, + resultdata: ResultsData): self.resultdata = resultdata self.purpose = purpose self.bounds = purpose.bounds self.sub_bounds = purpose.sub_bounds self.zone_data = zone_data - self.dest_exps = {} - self.mode_exps = {} - self.dest_choice_param = destination_choice[purpose.name] - self.mode_choice_param = mode_choice[purpose.name] + self.dest_exps: Dict[str, numpy.array] = {} + self.mode_exps: Dict[str, numpy.array] = {} + purpose.name = cast(str, purpose.name) #type checker help + self.dest_choice_param: Dict[str, Dict[str, Any]] = destination_choice[purpose.name] + self.mode_choice_param: Optional[Dict[str, Dict[str, Any]]] = mode_choice[purpose.name] def _calc_mode_util(self, impedance): expsum = numpy.zeros_like( @@ -51,7 +61,7 @@ def _calc_mode_util(self, impedance): def _calc_dest_util(self, mode, impedance): b = self.dest_choice_param[mode] - utility = numpy.zeros_like(next(iter(impedance.values()))) + utility: numpy.array = numpy.zeros_like(next(iter(impedance.values()))) self._add_zone_util(utility, b["attraction"]) self._add_impedance(utility, impedance, b["impedance"]) self.dest_exps[mode] = numpy.exp(utility) @@ -337,7 +347,9 @@ def calc_individual_prob(self, mod_mode, dummy): mode_expsum += self.mode_exps[mode] return self._calc_prob(mode_expsum) - def calc_individual_mode_prob(self, is_car_user, zone): + def calc_individual_mode_prob(self, + is_car_user: bool, + zone: int) -> Tuple[numpy.array, float]: """Calculate individual choice probabilities with individual dummies. Calculate mode choice probabilities for individual @@ -365,6 +377,7 @@ def calc_individual_mode_prob(self, is_car_user, zone): modes = self.purpose.modes for mode in modes: mode_exps[mode] = self.mode_exps[mode][zone] + self.mode_choice_param = cast(Dict[str, Dict[str, Any]], self.mode_choice_param) #type checker help b = self.mode_choice_param[mode]["individual_dummy"] if is_car_user and "car_users" in b: try: @@ -388,6 +401,7 @@ def calc_individual_mode_prob(self, is_car_user, zone): # Separate sub-region parameters i = self.purpose.sub_intervals.searchsorted(zone, side="right") money_utility = 1 / b[i] + self.mode_choice_param = cast(Dict[str, Dict[str, Any]], self.mode_choice_param) #type checker help money_utility /= self.mode_choice_param["car"]["log"]["logsum"] accessibility = -money_utility * logsum return probs, accessibility diff --git a/Scripts/models/tour_combinations.py b/Scripts/models/tour_combinations.py index 437234f6..182b0ac4 100644 --- a/Scripts/models/tour_combinations.py +++ b/Scripts/models/tour_combinations.py @@ -1,4 +1,4 @@ -import numpy +import numpy # type: ignore import parameters.tour_generation as param diff --git a/Scripts/modelsystem.py b/Scripts/modelsystem.py index 83c9577e..a238c1f0 100644 --- a/Scripts/modelsystem.py +++ b/Scripts/modelsystem.py @@ -1,10 +1,14 @@ import threading import multiprocessing import os -import numpy +from typing import Any, Callable, Dict, List, Set, Union, cast +import numpy # type: ignore import pandas import random from collections import defaultdict +from assignment.abstract_assignment import AssignmentModel +from assignment.emme_assignment import EmmeAssignmentModel +from assignment.mock_assignment import MockAssignmentModel import utils.log as log from utils.zone_interval import ArrayAggregator @@ -45,11 +49,16 @@ class ModelSystem: Name of scenario, used for results subfolder """ - def __init__(self, zone_data_path, base_zone_data_path, base_matrices_path, - results_path, assignment_model, name): - self.ass_model = assignment_model - self.zone_numbers = self.ass_model.zone_numbers - self.travel_modes = {} # Dict instead of set, to preserve order + def __init__(self, + zone_data_path: str, + base_zone_data_path: str, + base_matrices_path: str, + results_path: str, + assignment_model: AssignmentModel, + name: str): + self.ass_model = cast(Union[MockAssignmentModel,EmmeAssignmentModel], assignment_model) #type checker hint + self.zone_numbers: numpy.array = self.ass_model.zone_numbers + self.travel_modes: Dict[str, bool] = {} # Dict instead of set, to preserve order # Input data self.zdata_base = BaseZoneData( @@ -74,7 +83,7 @@ def __init__(self, zone_data_path, base_zone_data_path, base_matrices_path, bounds = slice(0, self.zdata_forecast.nr_zones) self.cdm = CarDensityModel( self.zdata_base, self.zdata_forecast, bounds, self.resultdata) - self.mode_share = [] + self.mode_share: List[Dict[str,Any]] = [] self.convergence = pandas.DataFrame() self.trucks = self.fm.calc_freight_traffic("truck") self.trailer_trucks = self.fm.calc_freight_traffic("trailer_truck") @@ -143,7 +152,9 @@ def _add_internal_demand(self, previous_iter_impedance, is_last_iteration): log.info("Demand calculation completed") # possibly merge with init - def assign_base_demand(self, use_fixed_transit_cost=False, is_end_assignment=False): + def assign_base_demand(self, + use_fixed_transit_cost: bool = False, + is_end_assignment: bool = False) -> Dict[str, Dict[str, numpy.ndarray]]: """Assign base demand to network (before first iteration). Parameters @@ -195,6 +206,7 @@ def assign_base_demand(self, use_fixed_transit_cost=False, is_end_assignment=Fal for ap in self.ass_model.assignment_periods: tp = ap.name log.info("Assigning period {}...".format(tp)) + self.dtm.demand = cast(Dict[str, Any], self.dtm.demand) #type check hint with demand.open("demand", tp, self.ass_model.zone_numbers) as mtx: for ass_class in param.transport_classes: self.dtm.demand[tp][ass_class] = mtx[ass_class] diff --git a/Scripts/mypy_tests.txt b/Scripts/mypy_tests.txt new file mode 100644 index 00000000..71b9881f --- /dev/null +++ b/Scripts/mypy_tests.txt @@ -0,0 +1,2 @@ +python -m mypy helmet_validate_inputfiles.py +python -m mypy helmet.py \ No newline at end of file diff --git a/Scripts/parameters/assignment.py b/Scripts/parameters/assignment.py index 7830e5b0..2a25cbd1 100644 --- a/Scripts/parameters/assignment.py +++ b/Scripts/parameters/assignment.py @@ -1,6 +1,7 @@ ### ASSIGNMENT PARAMETERS ### from collections import namedtuple +from typing import Dict, List, Union RoadClass = namedtuple( "RoadClass", ( @@ -361,7 +362,7 @@ } ### ASSIGNMENT REFERENCES ### -time_periods = ("aht", "pt", "iht") +time_periods: List[str] = ["aht", "pt", "iht"] transport_classes = ( "car_work", "car_leisure", diff --git a/Scripts/parameters/car.py b/Scripts/parameters/car.py index 3b07fdc3..cb29680e 100644 --- a/Scripts/parameters/car.py +++ b/Scripts/parameters/car.py @@ -2,6 +2,9 @@ # Driver share of car tours # Inverse of car occupancy +from typing import Any, Dict, Tuple, Union + + car_driver_share = { "hw": 0.928309883, "hc": 0, @@ -15,7 +18,7 @@ "hop": 0.699605826, "oop": 0.784781268, } -car_usage = { +car_usage: Dict[str,Any] = { "constant": -11.2296, "generation": {}, "log": { diff --git a/Scripts/parameters/departure_time.py b/Scripts/parameters/departure_time.py index bfbce6a0..e756699d 100644 --- a/Scripts/parameters/departure_time.py +++ b/Scripts/parameters/departure_time.py @@ -1,7 +1,10 @@ ### DEPARTURE TIME PARAMETERS ### # Demand shares for different time periods -demand_share = { +from typing import Any, Dict + + +demand_share: Dict[str,Dict[str,Any]] = { "hw": { "car": { "aht": (0.288820549293814, 0.00164983305999384), diff --git a/Scripts/parameters/destination_choice.py b/Scripts/parameters/destination_choice.py index 1ef9eb3a..e03c98d8 100644 --- a/Scripts/parameters/destination_choice.py +++ b/Scripts/parameters/destination_choice.py @@ -1,10 +1,11 @@ -import numpy +from typing import Any, Dict +import numpy # type: ignore ### DESTINATION CHOICE PARAMETERS ### # Destination choice (calibrated 02.10.2020) -destination_choice = { +destination_choice: Dict[str, Dict[str, Dict[str, Any]]] = { "hw": { "car": { "attraction": { diff --git a/Scripts/parameters/impedance_transformation.py b/Scripts/parameters/impedance_transformation.py index 89a3b20a..4bcc2933 100644 --- a/Scripts/parameters/impedance_transformation.py +++ b/Scripts/parameters/impedance_transformation.py @@ -1,6 +1,9 @@ ### IMPEDANCE TRANSFORMATION PARAMETERS ### -transit_trips_per_month = { +from typing import Dict, Tuple, Union + + +transit_trips_per_month: Dict[str,Dict[str,Union[Tuple[float],Tuple[float,float]]]] = { "metropolitan": { "work": (60.0, 44.0), "leisure": (30.0, 30.0), diff --git a/Scripts/parameters/mode_choice.py b/Scripts/parameters/mode_choice.py index 9e6ec0b7..f9b1b0c5 100644 --- a/Scripts/parameters/mode_choice.py +++ b/Scripts/parameters/mode_choice.py @@ -1,7 +1,10 @@ ### MODE CHOICE PARAMETERS ### # Mode choice (calibrated 02.10.2020) -mode_choice = { +from typing import Any, Dict, Optional + + +mode_choice: Dict[str, Optional[Dict[str, Dict[str, Any]]]] = { "hw": { "car": { "constant": (0.830938747727 * (0 + 0.278), diff --git a/Scripts/parameters/zone.py b/Scripts/parameters/zone.py index 783f58f7..ee2e643d 100644 --- a/Scripts/parameters/zone.py +++ b/Scripts/parameters/zone.py @@ -1,4 +1,7 @@ # Share of demand that will be simulated in agent model +from typing import Any, Dict, List, Tuple, Union + + agent_demand_fraction = 1.0 # Seed number for population attributes: @@ -7,17 +10,17 @@ population_draw = 31 # Age groups in zone data -age_groups = ( +age_groups: List[Tuple[int, int]] = [ #changed to list for type checker (7, 17), (18, 29), (30, 49), (50, 64), (65, 99), - ) +] ### DEMAND MODEL REFERENCES ### -tour_purposes = ( +tour_purposes: List[Dict[str, Any]] = [ #changed to list for type checker { "name": "hw", "orig": "home", @@ -101,11 +104,11 @@ "source": ("sop",), "area": "all", }, -) +] # Tour purpose zone intervals # Some demand models have separate sub-region parameters, # hence need sub-intervals defined. -purpose_areas = { +purpose_areas: Dict[str, Union[Tuple[int,int],Tuple[int,int,int]]] = { "metropolitan": (0, 6000, 16000), "peripheral": (16000, 31000), "all": (0, 6000, 31000), diff --git a/Scripts/emme_tests/test_assignment.py b/Scripts/tests/emme_only/test_assignment.py similarity index 88% rename from Scripts/emme_tests/test_assignment.py rename to Scripts/tests/emme_only/test_assignment.py index 6be90d5d..aab0cd22 100644 --- a/Scripts/emme_tests/test_assignment.py +++ b/Scripts/tests/emme_only/test_assignment.py @@ -3,15 +3,20 @@ import os import logging +import utils.log as log import numpy import assignment.emme_assignment as ass from datahandling.zonedata import ZoneData from datahandling.matrixdata import MatrixData from datahandling.resultdata import ResultsData -from assignment.emme_bindings.emme_project import EmmeProject -import inro.emme.desktop.app as _app -import inro.emme.database.emmebank as _eb +try: + from assignment.emme_bindings.emme_project import EmmeProject + import inro.emme.desktop.app as _app + import inro.emme.database.emmebank as _eb + emme_available = True +except ImportError: + emme_available = False class EmmeAssignmentTest: @@ -26,6 +31,7 @@ def __init__(self): project_dir = os.path.join( os.path.dirname(os.path.realpath('__file__')), "tests", "test_data", "Results") + log.info(str(project_dir)) project_name = "test_assignment" db_dir = os.path.join(project_dir, project_name, "Database") try: @@ -88,13 +94,13 @@ def test_assignment(self): travel_cost[ap.name]["time"]["transit_uncongested"] = travel_cost[ap.name]["time"]["transit_work"] resultdata = ResultsData(os.path.join( os.path.dirname(os.path.realpath(__file__)), - "tests", "test_data", "Results", "assignment")) + "..","test_data", "Results", "assignment")) self.ass_model.aggregate_results(resultdata) self.ass_model.calc_noise() resultdata.flush() costs_files = MatrixData(os.path.join( os.path.dirname(os.path.realpath(__file__)), - "tests", "test_data", "Results", "assignment", "Matrices")) + "..","test_data", "Results", "assignment", "Matrices")) for time_period in travel_cost: for mtx_type in travel_cost[time_period]: zone_numbers = self.ass_model.zone_numbers @@ -105,11 +111,11 @@ def test_assignment(self): def test_transit_cost(self): zdata = ZoneData(os.path.join( - os.path.dirname(os.path.realpath(__file__)), "tests", "test_data", + os.path.dirname(os.path.realpath(__file__)), "..", "test_data", "Scenario_input_data", "2030_test"), self.ass_model.zone_numbers) peripheral_cost = numpy.ones((1, 10)) self.ass_model.calc_transit_cost(zdata.transit_zone, peripheral_cost) - -em = EmmeAssignmentTest() -em.test_assignment() +if emme_available: + em = EmmeAssignmentTest() + em.test_assignment() diff --git a/Scripts/transform/impedance_transformer.py b/Scripts/transform/impedance_transformer.py index 3304edc2..8eb81ed3 100644 --- a/Scripts/transform/impedance_transformer.py +++ b/Scripts/transform/impedance_transformer.py @@ -1,5 +1,5 @@ from collections import defaultdict -import numpy +import numpy # type: ignore import parameters.impedance_transformation as param from parameters.assignment import assignment_classes diff --git a/Scripts/utils/freight.py b/Scripts/utils/freight.py index 90ddd22a..f0722031 100644 --- a/Scripts/utils/freight.py +++ b/Scripts/utils/freight.py @@ -1,4 +1,4 @@ -import numpy +import numpy # type: ignore import parameters.tour_generation as param diff --git a/Scripts/utils/read_csv_file.py b/Scripts/utils/read_csv_file.py index 3b1e4162..4905f068 100644 --- a/Scripts/utils/read_csv_file.py +++ b/Scripts/utils/read_csv_file.py @@ -1,13 +1,18 @@ from decimal import DivisionByZero from itertools import groupby import os +from typing import Optional import pandas -import numpy +import numpy # type: ignore import utils.log as log -def read_csv_file(data_dir, file_end, zone_numbers=None, dtype=None, squeeze=False): +def read_csv_file(data_dir: str, + file_end: str, + zone_numbers: Optional[numpy.ndarray] = None, + dtype: Optional[numpy.dtype] = None, + squeeze: bool=False) -> pandas.DataFrame: """Read (zone) data from space-separated file. Parameters @@ -42,11 +47,11 @@ def read_csv_file(data_dir, file_end, zone_numbers=None, dtype=None, squeeze=Fal msg = "No {} file found in folder {}".format(file_end, data_dir) # This error should not be logged, as it is sometimes excepted raise NameError(msg) - header = None if squeeze else "infer" - data = pandas.read_csv( + header: Optional[str] = None if squeeze else "infer" + data: pandas.DataFrame = pandas.read_csv( path, delim_whitespace=True, squeeze=squeeze, keep_default_na=False, na_values="", comment='#', header=header) - if data.index.is_numeric() and data.index.hasnans: + if data.index.is_numeric() and data.index.hasnans: # type: ignore msg = "Row with only spaces or tabs in file {}".format(path) log.error(msg) raise IndexError(msg) diff --git a/Scripts/utils/zone_interval.py b/Scripts/utils/zone_interval.py index b90250b8..ab82d533 100644 --- a/Scripts/utils/zone_interval.py +++ b/Scripts/utils/zone_interval.py @@ -1,6 +1,6 @@ -import numpy +import numpy # type: ignore import pandas -from shapely.geometry import Point, Polygon +from shapely.geometry import Point, Polygon # type: ignore import parameters.zone as param import utils.log as log