Skip to content

Commit

Permalink
[db cach] node defs, config, and types
Browse files Browse the repository at this point in the history
  • Loading branch information
alangenfeld committed Oct 25, 2024
1 parent 561665a commit 592d0df
Show file tree
Hide file tree
Showing 15 changed files with 311 additions and 222 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ def ensure_valid_config(remote_job: RemoteJob, run_config: object) -> object:
if not validated_config.success:
raise UserFacingGraphQLError(
GrapheneRunConfigValidationInvalid.for_validation_errors(
remote_job, validated_config.errors
remote_job,
check.not_none(validated_config.errors),
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def resolve_is_run_config_valid(
pipeline_name=represented_pipeline.name,
errors=[
GraphenePipelineConfigValidationError.from_dagster_error(
represented_pipeline.config_schema_snapshot,
represented_pipeline.config_schema_snapshot.get_config_snap,
err,
)
for err in errors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,36 +391,42 @@ def stale_status_loader(self) -> StaleStatusLoader:
def asset_graph_differ(self) -> Optional[AssetGraphDiffer]:
return self._asset_graph_differ

def _job_selector(self) -> Optional[JobSelector]:
if len(self._asset_node_snap.job_names) < 1:
return None

return JobSelector(
location_name=self._repository_selector.location_name,
repository_name=self._repository_selector.repository_name,
job_name=self._asset_node_snap.job_names[0],
)

def get_remote_job(self, graphene_info: ResolveInfo) -> RemoteJob:
selector = self._job_selector()
if self._remote_job is None:
check.invariant(
len(self._asset_node_snap.job_names) >= 1,
"Asset must be part of at least one job",
)
selector = JobSelector(
location_name=self._repository_selector.location_name,
repository_name=self._repository_selector.repository_name,
job_name=self._asset_node_snap.job_names[0],
)
if selector is None:
check.failed("Asset must be part of a job")
self._remote_job = graphene_info.context.get_full_job(selector)
return self._remote_job

def get_node_definition_snap(
self,
graphene_info: ResolveInfo,
) -> Union[GraphDefSnap, OpDefSnap]:
if self._node_definition_snap is None and len(self._asset_node_snap.job_names) > 0:
) -> Optional[Union[GraphDefSnap, OpDefSnap]]:
selector = self._job_selector()
if selector is None:
return None

if self._node_definition_snap is None:
node_key = check.not_none(
self._asset_node_snap.node_definition_name
# nodes serialized using an older Dagster version may not have node_definition_name
or self._asset_node_snap.graph_name
or self._asset_node_snap.op_name
)
self._node_definition_snap = self.get_remote_job(graphene_info).get_node_def_snap(
node_key
)
# weird mypy bug causes mistyped _node_definition_snap
return check.not_none(self._node_definition_snap)
self._node_definition_snap = graphene_info.context.get_node_def(selector, node_key)

return self._node_definition_snap

def get_partition_keys(
self,
Expand Down Expand Up @@ -650,17 +656,19 @@ def resolve_assetObservations(
]

def resolve_configField(self, graphene_info: ResolveInfo) -> Optional[GrapheneConfigTypeField]:
if not self.is_executable:
selector = self._job_selector()
if selector is None:
return None
job = self.get_remote_job(graphene_info)
node_def_snap = self.get_node_definition_snap(graphene_info)
return (
GrapheneConfigTypeField(
config_schema_snapshot=job.config_schema_snapshot,
field_snap=node_def_snap.config_field_snap,
)
if node_def_snap.config_field_snap
else None
if node_def_snap is None or node_def_snap.config_field_snap is None:
return None

def _get_config_type(key: str):
return graphene_info.context.get_config_type(selector, key)

return GrapheneConfigTypeField(
get_config_type=_get_config_type,
field_snap=node_def_snap.config_field_snap,
)

def resolve_computeKind(self, _graphene_info: ResolveInfo) -> Optional[str]:
Expand Down Expand Up @@ -1158,6 +1166,9 @@ def resolve_op(
return None
job = self.get_remote_job(graphene_info)
node_def_snap = self.get_node_definition_snap(graphene_info)
if node_def_snap is None:
return None

if isinstance(node_def_snap, OpDefSnap):
return GrapheneSolidDefinition(job, node_def_snap.name)

Expand Down Expand Up @@ -1234,6 +1245,8 @@ def resolve_required_resources(
if not self.is_executable:
return []
node_def_snap = self.get_node_definition_snap(graphene_info)
if node_def_snap is None:
return []
all_unique_keys = self.get_required_resource_keys(graphene_info, node_def_snap)
return [GrapheneResourceRequirement(key) for key in all_unique_keys]

Expand All @@ -1244,16 +1257,26 @@ def resolve_type(
"GrapheneListDagsterType", "GrapheneNullableDagsterType", "GrapheneRegularDagsterType"
]
]:
if not self._asset_node_snap.is_materializable:
selector = self._job_selector()
node_def_snap = self.get_node_definition_snap(graphene_info)

if selector is None or node_def_snap is None:
return None
job = self.get_remote_job(graphene_info)

def _get_dagster_type(key: str):
return graphene_info.context.get_dagster_type(selector, key)

def _get_config_type(key: str):
return graphene_info.context.get_config_type(selector, key)

output_name = self._asset_node_snap.output_name
if output_name:
for output_def in self.get_node_definition_snap(graphene_info).output_def_snaps:
for output_def in node_def_snap.output_def_snaps:
if output_def.name == output_name:
return to_dagster_type(
job.job_snapshot,
output_def.dagster_type_key,
get_dagster_type=_get_dagster_type,
get_config_type=_get_config_type,
dagster_type_key=output_def.dagster_type_key,
)
return None

Expand Down
Loading

0 comments on commit 592d0df

Please sign in to comment.