Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make handling of EMME matrices less error-prone and more flexible #525

Merged
merged 13 commits into from
Sep 4, 2023
Merged
126 changes: 66 additions & 60 deletions Scripts/assignment/assignment_period.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import os
import numpy
import pandas
import copy

import utils.log as log
import parameters.assignment as param
Expand All @@ -13,22 +11,41 @@


class AssignmentPeriod(Period):
"""
EMME assignment period definition.

This typically represents an hour of the day, which may or may not
have a dedicated EMME scenario. In case it does not have its own
EMME scenario, assignment results are stored only in extra attributes.

Parameters
----------
name : str
Time period name (aht/pt/iht)
emme_scenario : int
EMME scenario linked to the time period
emme_context : assignment.emme_bindings.emme_project.EmmeProject
Emme project to connect to this assignment
emme_matrices : dict
key : str
Assignment class (car_work/transit_leisure/...)
value : dict
key : str
Matrix type (demand/time/cost/dist/...)
value : str
EMME matrix id
separate_emme_scenarios : bool (optional)
Whether separate scenarios have been created in EMME
for storing time-period specific network results.
"""
def __init__(self, name, emme_scenario, emme_context,
demand_mtx=param.emme_demand_mtx,
result_mtx=param.emme_result_mtx, save_matrices=False,
separate_emme_scenarios=False):
emme_matrices, separate_emme_scenarios=False):
self.name = name
self.emme_scenario = emme_context.modeller.emmebank.scenario(
emme_scenario)
self.emme_project = emme_context
self._separate_emme_scenarios = separate_emme_scenarios
if save_matrices:
self.demand_mtx = copy.deepcopy(demand_mtx)
self.result_mtx = copy.deepcopy(result_mtx)
else:
# Refer to the same matrices for all time periods
self.demand_mtx = demand_mtx
self.result_mtx = result_mtx
self.emme_matrices = emme_matrices
self.dist_unit_cost = param.dist_unit_cost

def extra(self, attr):
Expand Down Expand Up @@ -91,7 +108,7 @@ def assign(self, matrices, iteration):
if iteration=="init":
self._assign_pedestrians()
self._set_bike_vdfs()
self._assign_bikes(self.result_mtx["dist"]["bike"]["id"], "all")
self._assign_bikes(self.emme_matrices["bike"]["dist"], "all")
self._set_car_and_transit_vdfs()
if not self._separate_emme_scenarios:
self._calc_background_traffic()
Expand Down Expand Up @@ -123,7 +140,7 @@ def assign(self, matrices, iteration):
self._assign_transit()
elif iteration=="last":
self._set_bike_vdfs()
self._assign_bikes(self.result_mtx["dist"]["bike"]["id"], "all")
self._assign_bikes(self.emme_matrices["bike"]["dist"], "all")
self._set_car_and_transit_vdfs()
self._calc_background_traffic()
self._assign_cars(param.stopping_criteria_fine)
Expand Down Expand Up @@ -186,11 +203,7 @@ def calc_transit_cost(self, fares, peripheral_cost, mapping):
tc = "transit_work"
spec = TransitSpecification(
self._segment_results[tc], self.extra("hw"),
self.demand_mtx[tc]["id"],
self.result_mtx["time"][tc]["id"],
self.result_mtx["dist"][tc]["id"],
self.result_mtx["trip_part_"+tc],
count_zone_boardings=True)
self.emme_matrices[tc], count_zone_boardings=True)
is_in_transit_zone_attr = param.is_in_transit_zone_attr.replace(
"ui", "data")
for transit_zone in transit_zones:
Expand All @@ -205,7 +218,7 @@ def calc_transit_cost(self, fares, peripheral_cost, mapping):
self.emme_project.matrix_results(
spec.transit_result_spec, self.emme_scenario)
nr_visits = self._get_matrix(
"trip_part_transit_work", "board_cost")
tc, "actual_total_boarding_costs")
# If the number of visits is less than 1, there seems to
# be an easy way to avoid visiting this transit zone
has_visited[transit_zone] = (nr_visits > 0.99)
Expand Down Expand Up @@ -243,7 +256,7 @@ def calc_transit_cost(self, fares, peripheral_cost, mapping):
cost[l:u, :u] = peripheral_cost
cost[:u, l:u] = peripheral_cost.T
# Calculate distance-based cost from inv-distance
dist = self._get_matrix("dist", "transit_work")
dist = self._get_matrix(tc, "dist")
dist_cost = fares.start_fare + fares.dist_fare*dist
cost[cost>=maxfare] = dist_cost[cost>=maxfare]
# Reset boarding penalties
Expand Down Expand Up @@ -403,19 +416,15 @@ def _set_emmebank_matrices(self, matrices, is_last_iteration):
else:
self._set_matrix(mtx, matrices[mtx])

def _set_matrix(self, mtx_label, matrix, result_type=None):
def _set_matrix(self, ass_class, matrix, matrix_type="demand"):
if numpy.isnan(matrix).any():
msg = ("NAs in demand matrix {}. ".format(mtx_label)
+ "Would cause infinite loop in Emme assignment.")
msg = ("NAs in demand matrix {} ".format(ass_class)
+ "would cause infinite loop in Emme assignment.")
log.error(msg)
raise ValueError(msg)
elif result_type is None:
self.emme_project.modeller.emmebank.matrix(
self.demand_mtx[mtx_label]["id"]).set_numpy_data(
matrix, scenario_id=self.emme_scenario.id)
else:
self.emme_project.modeller.emmebank.matrix(
self.result_mtx[result_type][mtx_label]["id"]).set_numpy_data(
self.emme_matrices[ass_class][matrix_type]).set_numpy_data(
matrix, scenario_id=self.emme_scenario.id)

def _get_matrices(self, mtx_type, is_last_iteration=False):
Expand All @@ -435,44 +444,45 @@ def _get_matrices(self, mtx_type, is_last_iteration=False):
Subtype (car_work/truck/inv_time/...) : numpy 2-d matrix
Matrix of the specified type
"""
last_iteration_classes = param.freight_classes + ("transit_leisure",)
last_iter_classes = param.freight_classes + ("transit_leisure",)
matrices = {}
for subtype in self.result_mtx[mtx_type]:
if is_last_iteration or subtype not in last_iteration_classes:
if mtx_type == "time" and subtype in param.assignment_modes:
mtx = self._extract_timecost_from_gcost(subtype)
elif mtx_type == "time" and subtype in param.transit_classes:
mtx = self._damp_travel_time(subtype)
for ass_class, mtx_types in self.emme_matrices.items():
if (mtx_type in mtx_types and
(is_last_iteration or ass_class not in last_iter_classes)):
if mtx_type == "time" and ass_class in param.assignment_modes:
mtx = self._extract_timecost_from_gcost(ass_class)
elif mtx_type == "time" and ass_class in param.transit_classes:
mtx = self._damp_travel_time(ass_class)
else:
mtx = self._get_matrix(mtx_type, subtype)
matrices[subtype] = mtx
mtx = self._get_matrix(ass_class, mtx_type)
matrices[ass_class] = mtx
if not is_last_iteration:
matrices["transit_leisure"] = matrices["transit_work"]
return matrices

def _get_matrix(self, assignment_result_type, subtype):
def _get_matrix(self, ass_class, matrix_type):
"""Get matrix with type pair (e.g., demand, car_work).

Parameters
----------
assignment_result_type : str
Type (demand/time/transit/...)
subtype : str
Subtype (car_work/truck/inv_time/...)
ass_class : str
Assignment class (car_work/transit_leisure/truck/...)
matrix_type : str
Type (demand/time/cost/...)

Return
------
numpy 2-d matrix
Matrix of the specified type
"""
emme_id = self.result_mtx[assignment_result_type][subtype]["id"]
emme_id = self.emme_matrices[ass_class][matrix_type]
return (self.emme_project.modeller.emmebank.matrix(emme_id)
.get_numpy_data(scenario_id=self.emme_scenario.id))

def _damp_travel_time(self, demand_type):
"""Reduce the impact from first waiting time on total travel time."""
travel_time = self._get_matrix("time", demand_type)
fw_time = self._get_matrix("trip_part_"+demand_type, "fw_time")
travel_time = self._get_matrix(demand_type, "time")
fw_time = self._get_matrix(demand_type, "actual_first_waiting_times")
wt_weight = param.waiting_time_perception_factor
return travel_time + wt_weight*((5./3.*fw_time)**0.8 - fw_time)

Expand All @@ -483,9 +493,9 @@ def _extract_timecost_from_gcost(self, ass_class):
To get travel time, monetary cost is removed from generalized cost.
"""
vot_inv = param.vot_inv[param.vot_classes[ass_class]]
gcost = self._get_matrix("gen_cost", ass_class)
cost = self._get_matrix("cost", ass_class)
dist = self._get_matrix("dist", ass_class)
gcost = self._get_matrix(ass_class, "gen_cost")
cost = self._get_matrix(ass_class, "cost")
dist = self._get_matrix(ass_class, "dist")
if ass_class in ("trailer_truck", "truck"):
# toll costs are not applied to freight
time = gcost - vot_inv*param.freight_dist_unit_cost[ass_class]*dist
Expand Down Expand Up @@ -552,24 +562,20 @@ def _calc_boarding_penalties(self, extra_penalty=0, is_last_iteration=False):
self.emme_scenario.publish_network(network)

def _specify(self):
self._car_spec = CarSpecification(
self.extra, self.demand_mtx, self.result_mtx)
self._car_spec = CarSpecification(self.extra, self.emme_matrices)
self._transit_specs = {tc: TransitSpecification(
self._segment_results[tc], self.extra("hw"),
self.demand_mtx[tc]["id"],
self.result_mtx["time"][tc]["id"],
self.result_mtx["dist"][tc]["id"],
self.result_mtx["trip_part_"+tc])
self.emme_matrices[tc])
for tc in param.transit_classes}
self.bike_spec = {
"type": "STANDARD_TRAFFIC_ASSIGNMENT",
"classes": [
{
"mode": param.main_mode,
"demand": self.demand_mtx["bike"]["id"],
"demand": self.emme_matrices["bike"]["demand"],
"results": {
"od_travel_times": {
"shortest_paths": self.result_mtx["time"]["bike"]["id"],
"shortest_paths": self.emme_matrices["bike"]["time"],
},
"link_volumes": None, # This is defined later
},
Expand All @@ -592,7 +598,7 @@ def _specify(self):
self.walk_spec = {
"type": "STANDARD_TRANSIT_ASSIGNMENT",
"modes": param.aux_modes,
"demand": self.demand_mtx["bike"]["id"],
"demand": self.emme_matrices["bike"]["demand"],
"waiting_time": {
"headway_fraction": 0.01,
"effective_headways": "hdw",
Expand All @@ -606,7 +612,7 @@ def _specify(self):
"perception_factor": 1,
},
"od_results": {
"transit_times": self.result_mtx["time"]["walk"]["id"],
"transit_times": self.emme_matrices["walk"]["time"],
},
"strategy_analysis": {
"sub_path_combination_operator": "+",
Expand All @@ -622,7 +628,7 @@ def _specify(self):
},
},
"results": {
"od_values": self.result_mtx["dist"]["walk"]["id"],
"od_values": self.emme_matrices["walk"]["dist"],
},
},
}
Expand Down
30 changes: 24 additions & 6 deletions Scripts/assignment/datatypes/car.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,47 @@


class Car:
def __init__(self, ass_class, extra, demand_mtx, result_mtx,
"""Car assignment class definition.

Parameters
----------
ass_class : str
Assignment class (car_work/car_leisure/van/truck/trailer_truck)
extra : assignment_period.AssignmentPeriod.extra()
Function for generating extra attribute name
for specific assignment period
emme_matrices : dict
key : str
Impedance type (time/cost/dist/...)
value : str
Emme matrix id
link_costs : str
Link attribute where link cost is found
value_of_time_inv : float (optional)
Inversed value of time [min/eur], default is param.vot_inv
"""
def __init__(self, ass_class, extra, emme_matrices,
link_costs, value_of_time_inv=None):
od_travel_times = result_mtx["gen_cost"][ass_class]["id"]
if value_of_time_inv is None:
value_of_time_inv = param.vot_inv[param.vot_classes[ass_class]]
self.spec = {
"mode": param.assignment_modes[ass_class],
"demand": demand_mtx[ass_class]["id"],
"demand": emme_matrices["demand"],
"generalized_cost": {
"link_costs": link_costs,
"perception_factor": value_of_time_inv,
},
"results": {
"link_volumes": extra(ass_class),
"od_travel_times": {
"shortest_paths": od_travel_times
"shortest_paths": emme_matrices["gen_cost"]
}
},
"path_analyses": []
}
self.add_analysis("length", result_mtx["dist"][ass_class]["id"])
self.add_analysis("length", emme_matrices["dist"])
if ass_class not in ("trailer_truck", "truck"):
self.add_analysis(extra("toll_cost"), result_mtx["cost"][ass_class]["id"])
self.add_analysis(extra("toll_cost"), emme_matrices["cost"])

def add_analysis (self, link_component, od_values):
analysis = PathAnalysis(link_component, od_values)
Expand Down
Loading