Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SYNSD-1233] Adding in a try/catch for HTTP 409 conflicts on entity creation #1131

Draft
wants to merge 6 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 43 additions & 3 deletions synapseclient/api/entity_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@
<https://rest-docs.synapse.org/rest/#org.sagebionetworks.repo.web.controller.EntityController>
"""

import asyncio
import json
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

from async_lru import alru_cache

from synapseclient.core.exceptions import SynapseHTTPError
from synapseclient.core.retry import RETRYABLE_CONNECTION_EXCEPTIONS_NO_READ_ISSUES

if TYPE_CHECKING:
from synapseclient import Synapse

Expand Down Expand Up @@ -36,9 +40,45 @@ async def post_entity(
params = {}
if generated_by:
params["generatedBy"] = generated_by
return await client.rest_post_async(
uri="/entity", body=json.dumps(request), params=params
)

try:
return await client.rest_post_async(
uri="/entity",
body=json.dumps(request),
params=params,
retry_policy={
"retry_exceptions": RETRYABLE_CONNECTION_EXCEPTIONS_NO_READ_ISSUES
},
)
except SynapseHTTPError:
if "name" in request and "parentId" in request:
loop = asyncio.get_event_loop()
entity_id = await loop.run_in_executor(
None,
lambda: Synapse.get_client(synapse_client=synapse_client).findEntityId(
name=request["name"],
parent=request["parentId"],
),
)
if not entity_id:
raise

client.logger.warning(
"This is a temporary exception to recieving an HTTP 409 conflict error - Retrieving the state of the object in Synapse. If an error is not printed after this, assume the operation was successful (SYNSD-1233)."
)
existing_instance = await get_entity(
entity_id, synapse_client=synapse_client
)
# Loop over the request params and if any of them do not match the existing instance, raise the error
for key, value in request.items():
if key not in existing_instance or existing_instance[key] != value:
client.logger.error(
f"Failed temporary HTTP 409 logic check because {key} not in instance in Synapse or value does not match: [Existing: {existing_instance[key]}, Expected: {value}]."
)
raise
return existing_instance
else:
raise


async def put_entity(
Expand Down
1 change: 1 addition & 0 deletions synapseclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6299,6 +6299,7 @@ async def _rest_call_async(
Returns:
JSON encoding of response
"""
self.logger.debug(f"Sending {method} request to {uri}")
uri, headers = self._build_uri_and_headers(
uri, endpoint=endpoint, headers=headers
)
Expand Down
15 changes: 15 additions & 0 deletions synapseclient/core/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,21 @@
"SSLZeroReturnError",
]

RETRYABLE_CONNECTION_EXCEPTIONS_NO_READ_ISSUES = [
"ChunkedEncodingError",
"ConnectionError",
"ConnectionResetError",
"Timeout",
"timeout",
# HTTPX Specific connection exceptions:
"RemoteProtocolError",
"TimeoutException",
"ConnectError",
"ConnectTimeout",
# SSL Specific exceptions:
"SSLZeroReturnError",
]

DEBUG_EXCEPTION = "calling %s resulted in an Exception"


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
import os
import uuid
from typing import Callable
from unittest.mock import patch
from unittest.mock import AsyncMock, patch

import httpcore
import pytest
from httpcore._backends.anyio import AnyIOStream
from pytest_mock import MockerFixture

from synapseclient import Synapse
from synapseclient.core import utils
Expand Down Expand Up @@ -82,6 +85,82 @@ async def test_store_in_project(self, project_model: Project, file: File) -> Non
assert file.file_handle.key is not None
assert file.file_handle.external_url is None

async def test_store_in_project_with_read_error(
self,
project_model: Project,
file: File,
mocker: MockerFixture,
) -> None:
# GIVEN a file
file.name = str(uuid.uuid4())

# AND spy/mocks to simulate a read error
spy_tls_stream = mocker.spy(httpcore._backends.anyio.AnyIOStream, "start_tls")
logger_warning_spy = mocker.spy(self.syn.logger, "warning")
original_read = httpcore._backends.anyio.AnyIOStream.read
call_count = 0

async def new_read(*args, **kwargs) -> bytes:
nonlocal call_count
async_network_stream: httpcore.AsyncNetworkStream = (
spy_tls_stream.spy_return
)
call_count = call_count + 1
# This is very brittle, however, I do not see data to determine what the
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👀 Does that mean if we add functions called in the async stream that we would have edit this call_count?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, if more or less calls occur during the store_async function we would need to adjust this call count.

This specific way of replacing the read isn't working on GH actions. Though I did confirm the logic is working on my local machine. This needs to be fixed, but more digging into this is needed. I can get the hostname/IP addresses used, but being this "far" down in the libraries and working directly in httpcore, I lose a lot of context like request headers and the uri.

# read is. In this case the 7th call is going to be the HTTP POST to create
# the entity.
if call_count == 7:
return b""
return await original_read(async_network_stream, *args, **kwargs)

# WHEN I store the file
with patch(
"httpcore._backends.anyio.AnyIOStream.read",
new_callable=AsyncMock,
side_effect=new_read,
) as mock_read:
mock_read.return_value = b""
file_copy_object = await file.store_async(
parent=project_model, synapse_client=self.syn
)
self.schedule_for_cleanup(file.id)

# THEN I expect the file to be stored
assert file.id is not None
assert file_copy_object.id is not None
assert file_copy_object == file
assert file.parent_id == project_model.id
assert file.content_type == CONTENT_TYPE
assert file.version_comment == VERSION_COMMENT
assert file.version_label is not None
assert file.version_number == 1
assert file.created_by is not None
assert file.created_on is not None
assert file.modified_by is not None
assert file.modified_on is not None
assert file.data_file_handle_id is not None
assert file.file_handle is not None
assert file.file_handle.id is not None
assert file.file_handle.etag is not None
assert file.file_handle.created_by is not None
assert file.file_handle.created_on is not None
assert file.file_handle.modified_on is not None
assert file.file_handle.concrete_type is not None
assert file.file_handle.content_type is not None
assert file.file_handle.content_md5 is not None
assert file.file_handle.file_name is not None
assert file.file_handle.storage_location_id is not None
assert file.file_handle.content_size is not None
assert file.file_handle.status is not None
assert file.file_handle.bucket_name is not None
assert file.file_handle.key is not None
assert file.file_handle.external_url is None

# AND the temporary warning is present:
logger_warning_spy.assert_called_once_with(
"This is a temporary exception to recieving an HTTP 409 conflict error - Retrieving the state of the object in Synapse. If an error is not printed after this, assume the operation was successful (SYNSD-1233)."
)

async def test_activity_store_then_delete(
self, project_model: Project, file: File
) -> None:
Expand Down
Loading