Skip to content

Commit

Permalink
Correcting OTEL context propogation
Browse files Browse the repository at this point in the history
  • Loading branch information
BryanFauble committed Dec 1, 2023
1 parent 6b5b9af commit 86a474e
Show file tree
Hide file tree
Showing 11 changed files with 469 additions and 247 deletions.
9 changes: 8 additions & 1 deletion synapseclient/api/annotations.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,23 @@

from typing import TYPE_CHECKING, Optional
from synapseclient import Synapse
from opentelemetry import context

if TYPE_CHECKING:
from synapseclient.models import Annotations


def set_annotations(
annotations: "Annotations", synapse_client: Optional[Synapse] = None
annotations: "Annotations",
synapse_client: Optional[Synapse] = None,
opentelemetry_context: Optional[context.Context] = None,
):
"""Call to synapse and set the annotations for the given input.
:param annotations: The annotations to set. This is expected to have the id, etag, and annotations filled in.
:param synapse_client: If not passed in or None this will use the last client from the `.login()` method.
:param opentelemetry_context: OpenTelemetry context to propogate to this function to use for tracing. Used
cases where concurrent operations need to be linked to parent spans.
:return: _description_
"""
annotations_dict = asdict(annotations)
Expand All @@ -34,4 +40,5 @@ def set_annotations(
return Synapse.get_client(synapse_client=synapse_client).restPUT(
f"/entity/{annotations.id}/annotations2",
body=json.dumps(filtered_dict),
opentelemetry_context=opentelemetry_context,
)
262 changes: 152 additions & 110 deletions synapseclient/client.py

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions synapseclient/models/annotations.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from dataclasses import dataclass
from typing import Dict, List, Optional, Union
from synapseclient.api import set_annotations
from opentelemetry import trace
from opentelemetry import trace, context

from synapseclient import Synapse

Expand Down Expand Up @@ -63,10 +63,13 @@ async def store(
print(f"Storing annotations for id: {self.id}, etag: {self.etag}")
with tracer.start_as_current_span(f"Annotation_store: {self.id}"):
loop = asyncio.get_event_loop()
current_context = context.get_current()
result = await loop.run_in_executor(
None,
lambda: set_annotations(
annotations=self, synapse_client=synapse_client
annotations=self,
synapse_client=synapse_client,
opentelemetry_context=current_context,
),
)
print(f"annotations store for {self.id} complete")
Expand Down
14 changes: 10 additions & 4 deletions synapseclient/models/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class File:
"""The date this entity was created."""

modified_on: Optional[str] = None
""""The date this entity was last modified."""
"""The date this entity was last modified."""

created_by: Optional[str] = None
"""The ID of the user that created this entity."""
Expand Down Expand Up @@ -125,7 +125,9 @@ async def store(
:param synapse_client: If not passed in or None this will use the last client from the `.login()` method.
:return: The file object.
"""
with tracer.start_as_current_span(f"File_Store: {self.path}"):
with tracer.start_as_current_span(
f"File_Store: {self.path if self.path else self.id}"
):
# TODO - We need to add in some validation before the store to verify we have enough
# information to store the data

Expand All @@ -138,10 +140,11 @@ async def store(
parent=parent.id if parent else self.parent_id,
)
# TODO: Propogating OTEL context is not working in this case
current_context = context.get_current()
entity = await loop.run_in_executor(
None,
lambda: Synapse.get_client(synapse_client=synapse_client).store(
obj=synapse_file, opentelemetry_context=context.get_current()
obj=synapse_file, opentelemetry_context=current_context
),
)

Expand Down Expand Up @@ -179,13 +182,14 @@ async def get(
"""
with tracer.start_as_current_span(f"File_Get: {self.id}"):
loop = asyncio.get_event_loop()
# TODO: Propogating OTEL context is not working in this case
current_context = context.get_current()
entity = await loop.run_in_executor(
None,
lambda: Synapse.get_client(synapse_client=synapse_client).get(
entity=self.id,
downloadFile=download_file,
downloadLocation=download_location,
opentelemetry_context=current_context,
),
)

Expand All @@ -199,9 +203,11 @@ async def delete(self, synapse_client: Optional[Synapse] = None) -> None:
"""
with tracer.start_as_current_span(f"File_Delete: {self.id}"):
loop = asyncio.get_event_loop()
current_context = context.get_current()
await loop.run_in_executor(
None,
lambda: Synapse.get_client(synapse_client=synapse_client).delete(
obj=self.id,
opentelemetry_context=current_context,
),
)
10 changes: 7 additions & 3 deletions synapseclient/models/folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,11 @@ async def store(
synapse_folder = Synapse_Folder(
self.name, parent=parent.id if parent else self.parent_id
)
# TODO: Propogating OTEL context is not working in this case
current_context = context.get_current()
entity = await loop.run_in_executor(
None,
lambda: Synapse.get_client(synapse_client=synapse_client).store(
obj=synapse_folder, opentelemetry_context=context.get_current()
obj=synapse_folder, opentelemetry_context=current_context
),
)

Expand Down Expand Up @@ -175,11 +175,12 @@ async def get(
"""
with tracer.start_as_current_span(f"Folder_Get: {self.id}"):
loop = asyncio.get_event_loop()
# TODO: Propogating OTEL context is not working in this case
current_context = context.get_current()
entity = await loop.run_in_executor(
None,
lambda: Synapse.get_client(synapse_client=synapse_client).get(
entity=self.id,
opentelemetry_context=current_context,
),
)

Expand All @@ -194,6 +195,7 @@ async def get(
).getChildren(
parent=self.id,
includeTypes=["folder", "file"],
opentelemetry_context=current_context,
),
)

Expand Down Expand Up @@ -230,9 +232,11 @@ async def delete(self, synapse_client: Optional[Synapse] = None):
"""
with tracer.start_as_current_span(f"Folder_Delete: {self.id}"):
loop = asyncio.get_event_loop()
current_context = context.get_current()
await loop.run_in_executor(
None,
lambda: Synapse.get_client(synapse_client=synapse_client).delete(
obj=self.id,
opentelemetry_context=current_context,
),
)
10 changes: 7 additions & 3 deletions synapseclient/models/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ async def store(self, synapse_client: Optional[Synapse] = None):
# Call synapse
loop = asyncio.get_event_loop()
synapse_project = Synapse_Project(self.name)
# TODO: Propogating OTEL context is not working in this case
current_context = context.get_current()
entity = await loop.run_in_executor(
None,
lambda: Synapse.get_client(synapse_client=synapse_client).store(
obj=synapse_project, opentelemetry_context=context.get_current()
obj=synapse_project, opentelemetry_context=current_context
),
)
self.convert_from_api_parameters(
Expand Down Expand Up @@ -160,11 +160,12 @@ async def get(
"""
with tracer.start_as_current_span(f"Project_Get: {self.id}"):
loop = asyncio.get_event_loop()
# TODO: Propogating OTEL context is not working in this case
current_context = context.get_current()
entity = await loop.run_in_executor(
None,
lambda: Synapse.get_client(synapse_client=synapse_client).get(
entity=self.id,
opentelemetry_context=current_context,
),
)

Expand All @@ -179,6 +180,7 @@ async def get(
).getChildren(
parent=self.id,
includeTypes=["folder", "file"],
opentelemetry_context=current_context,
),
)

Expand Down Expand Up @@ -215,9 +217,11 @@ async def delete(self, synapse_client: Optional[Synapse] = None) -> None:
"""
with tracer.start_as_current_span(f"Project_Delete: {self.id}"):
loop = asyncio.get_event_loop()
current_context = context.get_current()
await loop.run_in_executor(
None,
lambda: Synapse.get_client(synapse_client=synapse_client).delete(
obj=self.id,
opentelemetry_context=current_context,
),
)
32 changes: 19 additions & 13 deletions synapseclient/models/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
delete_rows,
)
from synapseclient.models import Annotations, AnnotationsValue
from opentelemetry import trace
from opentelemetry import trace, context


tracer = trace.get_tracer("synapseclient")
Expand Down Expand Up @@ -312,11 +312,13 @@ async def store(self, synapse_client: Optional[Synapse] = None):

# Call synapse
loop = asyncio.get_event_loop()
# TODO: Propogating OTEL context is not working in this case
current_context = context.get_current()
entity = await loop.run_in_executor(
None,
lambda: Synapse.get_client(synapse_client=synapse_client).createColumn(
name=self.name, columnType=self.column_type
name=self.name,
columnType=self.column_type,
opentelemetry_context=current_context,
),
)
print(entity)
Expand Down Expand Up @@ -433,11 +435,11 @@ async def store_rows_from_csv(
with tracer.start_as_current_span(f"Store_rows_by_csv: {csv_path}"):
synapse_table = Synapse_Table(schema=self.id, values=csv_path)
loop = asyncio.get_event_loop()
# TODO: Propogating OTEL context is not working in this case
current_context = context.get_current()
entity = await loop.run_in_executor(
None,
lambda: Synapse.get_client(synapse_client=synapse_client).store(
obj=synapse_table
obj=synapse_table, opentelemetry_context=current_context
),
)
print(entity)
Expand All @@ -452,13 +454,14 @@ async def delete_rows(
for row in rows:
rows_to_delete.append([row.row_id, row.version_number])
loop = asyncio.get_event_loop()
# TODO: Propogating OTEL context is not working in this case
current_context = context.get_current()
await loop.run_in_executor(
None,
lambda: delete_rows(
syn=Synapse.get_client(synapse_client=synapse_client),
table_id=self.id,
row_id_vers_list=rows_to_delete,
opentelemetry_context=current_context,
),
)

Expand Down Expand Up @@ -497,11 +500,11 @@ async def store_schema(self, synapse_client: Optional[Synapse] = None) -> "Table
)

loop = asyncio.get_event_loop()
# TODO: Propogating OTEL context is not working in this case
current_context = context.get_current()
entity = await loop.run_in_executor(
None,
lambda: Synapse.get_client(synapse_client=synapse_client).store(
obj=synapse_schema
obj=synapse_schema, opentelemetry_context=current_context
),
)

Expand Down Expand Up @@ -547,11 +550,11 @@ async def get(self, synapse_client: Optional[Synapse] = None) -> "Table":
# TODO: How do we want to support retriving the table? Do we want to support by name, and parent?
with tracer.start_as_current_span(f"Table_Get: {self.name}"):
loop = asyncio.get_event_loop()
# TODO: Propogating OTEL context is not working in this case
current_context = context.get_current()
entity = await loop.run_in_executor(
None,
lambda: Synapse.get_client(synapse_client=synapse_client).get(
entity=self.id
entity=self.id, opentelemetry_context=current_context
),
)
self.convert_from_api_parameters(synapse_table=entity, set_annotations=True)
Expand All @@ -566,10 +569,11 @@ async def delete(self, synapse_client: Optional[Synapse] = None) -> None:
"""
with tracer.start_as_current_span(f"Table_Delete: {self.name}"):
loop = asyncio.get_event_loop()
current_context = context.get_current()
await loop.run_in_executor(
None,
lambda: Synapse.get_client(synapse_client=synapse_client).delete(
obj=self.id
obj=self.id, opentelemetry_context=current_context
),
)

Expand All @@ -582,13 +586,15 @@ async def query(
) -> Union[Synapse_CsvFileTable, Synaspe_TableQueryResult]:
with tracer.start_as_current_span("Table_query"):
loop = asyncio.get_event_loop()
# TODO: Propogating OTEL context is not working in this case
current_context = context.get_current()

# TODO: Future Idea - We stream back a CSV, and let those reading this to handle the CSV however they want
results = await loop.run_in_executor(
None,
lambda: Synapse.get_client(synapse_client=synapse_client).tableQuery(
query=query, **result_format.convert_into_api_parameters()
query=query,
**result_format.convert_into_api_parameters(),
opentelemetry_context=current_context,
),
)
print(results)
Expand Down
26 changes: 18 additions & 8 deletions synapseclient/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,10 @@
from synapseclient.core.models.dict_object import DictObject
from .entity import Entity, entity_type_to_class
from synapseclient.core.constants import concrete_types
from opentelemetry import trace


tracer = trace.get_tracer("synapseclient")

aggregate_pattern = re.compile(r"(count|max|min|avg|sum)\((.+)\)")

Expand Down Expand Up @@ -745,20 +749,26 @@ def _delete_rows(syn, schema, row_id_vers_list):


def delete_rows(
syn, table_id: str, row_id_vers_list: typing.List[typing.Tuple[int, int]]
syn,
table_id: str,
row_id_vers_list: typing.List[typing.Tuple[int, int]],
opentelemetry_context=None,
):
"""
Deletes rows from a synapse table
:param syn: an instance of py:class:`synapseclient.client.Synapse`
:param row_id_vers_list: an iterable containing tuples with format: (row_id, row_version)
"""
delete_row_csv_filepath = _create_row_delete_csv(
row_id_vers_iterable=row_id_vers_list
)
try:
syn._uploadCsv(filepath=delete_row_csv_filepath, schema=table_id)
finally:
os.remove(delete_row_csv_filepath)
with tracer.start_as_current_span(
"Synapse::delete_rows", context=opentelemetry_context
):
delete_row_csv_filepath = _create_row_delete_csv(
row_id_vers_iterable=row_id_vers_list
)
try:
syn._uploadCsv(filepath=delete_row_csv_filepath, schema=table_id)
finally:
os.remove(delete_row_csv_filepath)


class SchemaBase(Entity, metaclass=abc.ABCMeta):
Expand Down
Loading

0 comments on commit 86a474e

Please sign in to comment.