From 8df4d181b6b81f75488928e795f2b7e732e88dac Mon Sep 17 00:00:00 2001 From: Alexander Smarus Date: Mon, 25 Dec 2023 15:10:26 +0200 Subject: [PATCH] Cancel DispatchSource before closing socket (#4791) Extends socket lifetime enough to let DispatchSource cancel properly. Also prevents from creating new DispatchSources while other are in the middle of cancelling. Also includes tests (see #4854 for test details). --- .../URL.subproj/CFURLSessionInterface.c | 4 + .../URL.subproj/CFURLSessionInterface.h | 2 + .../URLSession/libcurl/MultiHandle.swift | 153 ++++++++++++++++-- Tests/Foundation/HTTPServer.swift | 54 +++++-- Tests/Foundation/Tests/TestURLSession.swift | 106 +++++++++++- 5 files changed, 291 insertions(+), 28 deletions(-) diff --git a/CoreFoundation/URL.subproj/CFURLSessionInterface.c b/CoreFoundation/URL.subproj/CFURLSessionInterface.c index 6226a3f5cea..6a30127b812 100644 --- a/CoreFoundation/URL.subproj/CFURLSessionInterface.c +++ b/CoreFoundation/URL.subproj/CFURLSessionInterface.c @@ -111,6 +111,10 @@ CFURLSessionEasyCode CFURLSession_easy_setopt_tc(CFURLSessionEasyHandle _Nonnull return MakeEasyCode(curl_easy_setopt(curl, option.value, a)); } +CFURLSessionEasyCode CFURLSession_easy_setopt_scl(CFURLSessionEasyHandle _Nonnull curl, CFURLSessionOption option, CFURLSessionCloseSocketCallback * _Nullable a) { + return MakeEasyCode(curl_easy_setopt(curl, option.value, a)); +} + CFURLSessionEasyCode CFURLSession_easy_getinfo_long(CFURLSessionEasyHandle _Nonnull curl, CFURLSessionInfo info, long *_Nonnull a) { return MakeEasyCode(curl_easy_getinfo(curl, info.value, a)); } diff --git a/CoreFoundation/URL.subproj/CFURLSessionInterface.h b/CoreFoundation/URL.subproj/CFURLSessionInterface.h index d7601770411..3e219f8003f 100644 --- a/CoreFoundation/URL.subproj/CFURLSessionInterface.h +++ b/CoreFoundation/URL.subproj/CFURLSessionInterface.h @@ -625,6 +625,8 @@ typedef int (CFURLSessionSeekCallback)(void *_Nullable userp, long long offset, CF_EXPORT CFURLSessionEasyCode CFURLSession_easy_setopt_seek(CFURLSessionEasyHandle _Nonnull curl, CFURLSessionOption option, CFURLSessionSeekCallback * _Nullable a); typedef int (CFURLSessionTransferInfoCallback)(void *_Nullable userp, long long dltotal, long long dlnow, long long ultotal, long long ulnow); CF_EXPORT CFURLSessionEasyCode CFURLSession_easy_setopt_tc(CFURLSessionEasyHandle _Nonnull curl, CFURLSessionOption option, CFURLSessionTransferInfoCallback * _Nullable a); +typedef int (CFURLSessionCloseSocketCallback)(void *_Nullable clientp, CFURLSession_socket_t item); +CF_EXPORT CFURLSessionEasyCode CFURLSession_easy_setopt_scl(CFURLSessionEasyHandle _Nonnull curl, CFURLSessionOption option, CFURLSessionCloseSocketCallback * _Nullable a); CF_EXPORT CFURLSessionEasyCode CFURLSession_easy_getinfo_long(CFURLSessionEasyHandle _Nonnull curl, CFURLSessionInfo info, long *_Nonnull a); CF_EXPORT CFURLSessionEasyCode CFURLSession_easy_getinfo_double(CFURLSessionEasyHandle _Nonnull curl, CFURLSessionInfo info, double *_Nonnull a); diff --git a/Sources/FoundationNetworking/URLSession/libcurl/MultiHandle.swift b/Sources/FoundationNetworking/URLSession/libcurl/MultiHandle.swift index 6196b2fa39e..c480e50a021 100644 --- a/Sources/FoundationNetworking/URLSession/libcurl/MultiHandle.swift +++ b/Sources/FoundationNetworking/URLSession/libcurl/MultiHandle.swift @@ -45,6 +45,7 @@ extension URLSession { let queue: DispatchQueue let group = DispatchGroup() fileprivate var easyHandles: [_EasyHandle] = [] + fileprivate var socketReferences: [CFURLSession_socket_t: _SocketReference] = [:] fileprivate var timeoutSource: _TimeoutSource? = nil private var reentrantInUpdateTimeoutTimer = false @@ -127,13 +128,14 @@ fileprivate extension URLSession._MultiHandle { if let opaque = socketSourcePtr { Unmanaged<_SocketSources>.fromOpaque(opaque).release() } + socketSources?.tearDown(handle: self, socket: socket, queue: queue) socketSources = nil } if let ss = socketSources { let handler = DispatchWorkItem { [weak self] in self?.performAction(for: socket) } - ss.createSources(with: action, socket: socket, queue: queue, handler: handler) + ss.createSources(with: action, handle: self, socket: socket, queue: queue, handler: handler) } return 0 } @@ -161,9 +163,104 @@ extension Collection where Element == _EasyHandle { } } +private extension URLSession._MultiHandle { + class _SocketReference { + let socket: CFURLSession_socket_t + var shouldClose: Bool + var workItem: DispatchWorkItem? + + init(socket: CFURLSession_socket_t) { + self.socket = socket + shouldClose = false + } + + deinit { + if shouldClose { + #if os(Windows) + closesocket(socket) + #else + close(socket) + #endif + } + } + } + + /// Creates and stores socket reference. Reentrancy is not supported. + /// Trying to begin operation for same socket twice would mean something + /// went horribly wrong, or our assumptions about CURL register/unregister + /// action flow are nor correct. + func beginOperation(for socket: CFURLSession_socket_t) -> _SocketReference { + let reference = _SocketReference(socket: socket) + precondition(socketReferences.updateValue(reference, forKey: socket) == nil, "Reentrancy is not supported for socket operations") + return reference + } + + /// Removes socket reference from the shared store. If there is work item scheduled, + /// executes it on the current thread. + func endOperation(for socketReference: _SocketReference) { + precondition(socketReferences.removeValue(forKey: socketReference.socket) != nil, "No operation associated with the socket") + if let workItem = socketReference.workItem, !workItem.isCancelled { + // CURL never asks for socket close without unregistering first, and + // we should cancel pending work when unregister action is requested. + precondition(!socketReference.shouldClose, "Socket close was scheduled, but there is some pending work left") + workItem.perform() + } + } + + /// Marks this reference to close socket on deinit. This allows us + /// to extend socket lifecycle by keeping the reference alive. + func scheduleClose(for socket: CFURLSession_socket_t) { + let reference = socketReferences[socket] ?? _SocketReference(socket: socket) + reference.shouldClose = true + } + + /// Schedules work to be performed when an operation ends for the socket, + /// or performs it immediately if there is no operation in progress. + /// + /// We're using this to postpone Dispatch Source creation when + /// previous Dispatch Source is not cancelled yet. + func schedule(_ workItem: DispatchWorkItem, for socket: CFURLSession_socket_t) { + guard let socketReference = socketReferences[socket] else { + workItem.perform() + return + } + // CURL never asks for register without pairing it with unregister later, + // and we're cancelling pending work item on unregister. + // But it is safe to just drop existing work item anyway, + // and replace it with the new one. + socketReference.workItem = workItem + } + + /// Cancels pending work for socket operation. Does nothing if + /// there is no operation in progress or no pending work item. + /// + /// CURL may become not interested in Dispatch Sources + /// we have planned to create. In this case we should just cancel + /// scheduled work. + func cancelWorkItem(for socket: CFURLSession_socket_t) { + guard let socketReference = socketReferences[socket] else { + return + } + socketReference.workItem?.cancel() + socketReference.workItem = nil + } + +} + internal extension URLSession._MultiHandle { /// Add an easy handle -- start its transfer. func add(_ handle: _EasyHandle) { + // Set CLOSESOCKETFUNCTION. Note that while the option belongs to easy_handle, + // the connection cache is managed by CURL multi_handle, and sockets can actually + // outlive easy_handle (even after curl_easy_cleanup call). That's why + // socket management lives in _MultiHandle. + try! CFURLSession_easy_setopt_ptr(handle.rawHandle, CFURLSessionOptionCLOSESOCKETDATA, UnsafeMutableRawPointer(Unmanaged.passUnretained(self).toOpaque())).asError() + try! CFURLSession_easy_setopt_scl(handle.rawHandle, CFURLSessionOptionCLOSESOCKETFUNCTION) { (clientp: UnsafeMutableRawPointer?, item: CFURLSession_socket_t) in + guard let handle = URLSession._MultiHandle.from(callbackUserData: clientp) else { fatalError() } + handle.scheduleClose(for: item) + return 0 + }.asError() + // If this is the first handle being added, we need to `kick` the // underlying multi handle by calling `timeoutTimerFired` as // described in @@ -426,6 +523,7 @@ fileprivate class _SocketSources { func createReadSource(socket: CFURLSession_socket_t, queue: DispatchQueue, handler: DispatchWorkItem) { guard readSource == nil else { return } + #if os(Windows) let s = DispatchSource.makeReadSource(handle: HANDLE(bitPattern: Int(socket))!, queue: queue) #else @@ -448,25 +546,56 @@ fileprivate class _SocketSources { s.resume() } - func tearDown() { - if let s = readSource { - s.cancel() + func tearDown(handle: URLSession._MultiHandle, socket: CFURLSession_socket_t, queue: DispatchQueue) { + handle.cancelWorkItem(for: socket) // There could be pending register action which needs to be cancelled + + guard readSource != nil, writeSource != nil else { + // This means that we have posponed (and already abandoned) + // sources creation. + return } - readSource = nil - if let s = writeSource { - s.cancel() + + // Socket is guaranteed to not to be closed as long as we keeping + // the reference. + let socketReference = handle.beginOperation(for: socket) + let cancelHandlerGroup = DispatchGroup() + [readSource, writeSource].compactMap({ $0 }).forEach { source in + cancelHandlerGroup.enter() + source.setCancelHandler { + cancelHandlerGroup.leave() + } + source.cancel() + } + cancelHandlerGroup.notify(queue: queue) { + handle.endOperation(for: socketReference) } + + readSource = nil writeSource = nil } } extension _SocketSources { /// Create a read and/or write source as specified by the action. - func createSources(with action: URLSession._MultiHandle._SocketRegisterAction, socket: CFURLSession_socket_t, queue: DispatchQueue, handler: DispatchWorkItem) { - if action.needsReadSource { - createReadSource(socket: socket, queue: queue, handler: handler) + func createSources(with action: URLSession._MultiHandle._SocketRegisterAction, handle: URLSession._MultiHandle, socket: CFURLSession_socket_t, queue: DispatchQueue, handler: DispatchWorkItem) { + // CURL casually requests to unregister and register handlers for same + // socket in a row. There is (pretty low) chance of overlapping tear-down operation + // with "register" request. Bad things could happen if we create + // a new Dispatch Source while other is being cancelled for the same socket. + // We're using `_MultiHandle.schedule(_:for:)` here to postpone sources creation until + // pending operation is finished (if there is none, submitted work item is performed + // immediately). + // Also, CURL may request unregister even before we perform any postponed work, + // so we have to cancel such work in such case. See + let createSources = DispatchWorkItem { + if action.needsReadSource { + self.createReadSource(socket: socket, queue: queue, handler: handler) + } + if action.needsWriteSource { + self.createWriteSource(socket: socket, queue: queue, handler: handler) + } } - if action.needsWriteSource { - createWriteSource(socket: socket, queue: queue, handler: handler) + if (action.needsReadSource || action.needsWriteSource) { + handle.schedule(createSources, for: socket) } } } diff --git a/Tests/Foundation/HTTPServer.swift b/Tests/Foundation/HTTPServer.swift index 5af9fb9c526..0ab0f0dc71e 100644 --- a/Tests/Foundation/HTTPServer.swift +++ b/Tests/Foundation/HTTPServer.swift @@ -99,7 +99,7 @@ class _TCPSocket: CustomStringConvertible { listening = false } - init(port: UInt16?) throws { + init(port: UInt16?, backlog: Int32) throws { listening = true self.port = 0 @@ -124,7 +124,7 @@ class _TCPSocket: CustomStringConvertible { try socketAddress.withMemoryRebound(to: sockaddr.self, capacity: MemoryLayout.size, { let addr = UnsafePointer($0) _ = try attempt("bind", valid: isZero, bind(_socket, addr, socklen_t(MemoryLayout.size))) - _ = try attempt("listen", valid: isZero, listen(_socket, SOMAXCONN)) + _ = try attempt("listen", valid: isZero, listen(_socket, backlog)) }) var actualSA = sockaddr_in() @@ -295,8 +295,8 @@ class _HTTPServer: CustomStringConvertible { let tcpSocket: _TCPSocket var port: UInt16 { tcpSocket.port } - init(port: UInt16?) throws { - tcpSocket = try _TCPSocket(port: port) + init(port: UInt16?, backlog: Int32 = SOMAXCONN) throws { + tcpSocket = try _TCPSocket(port: port, backlog: backlog) } init(socket: _TCPSocket) { @@ -1094,6 +1094,14 @@ enum InternalServerError : Error { case badHeaders } +extension LoopbackServerTest { + struct Options { + var serverBacklog: Int32 + var isAsynchronous: Bool + + static let `default` = Options(serverBacklog: SOMAXCONN, isAsynchronous: true) + } +} class LoopbackServerTest : XCTestCase { private static let staticSyncQ = DispatchQueue(label: "org.swift.TestFoundation.HTTPServer.StaticSyncQ") @@ -1101,8 +1109,17 @@ class LoopbackServerTest : XCTestCase { private static var _serverPort: Int = -1 private static var _serverActive = false private static var testServer: _HTTPServer? = nil - - + private static var _options: Options = .default + + static var options: Options { + get { + return staticSyncQ.sync { _options } + } + set { + staticSyncQ.sync { _options = newValue } + } + } + static var serverPort: Int { get { return staticSyncQ.sync { _serverPort } @@ -1119,12 +1136,20 @@ class LoopbackServerTest : XCTestCase { override class func setUp() { super.setUp() + Self.startServer() + } + override class func tearDown() { + Self.stopServer() + super.tearDown() + } + + static func startServer() { var _serverPort = 0 let dispatchGroup = DispatchGroup() func runServer() throws { - testServer = try _HTTPServer(port: nil) + testServer = try _HTTPServer(port: nil, backlog: options.serverBacklog) _serverPort = Int(testServer!.port) serverActive = true dispatchGroup.leave() @@ -1132,7 +1157,8 @@ class LoopbackServerTest : XCTestCase { while serverActive { do { let httpServer = try testServer!.listen() - globalDispatchQueue.async { + + func handleRequest() { let subServer = TestURLSessionServer(httpServer: httpServer) do { try subServer.readAndRespond() @@ -1140,6 +1166,12 @@ class LoopbackServerTest : XCTestCase { NSLog("readAndRespond: \(error)") } } + + if options.isAsynchronous { + globalDispatchQueue.async(execute: handleRequest) + } else { + handleRequest() + } } catch { if (serverActive) { // Ignore errors thrown on shutdown NSLog("httpServer: \(error)") @@ -1165,11 +1197,11 @@ class LoopbackServerTest : XCTestCase { fatalError("Timedout waiting for server to be ready") } serverPort = _serverPort + debugLog("Listening on \(serverPort)") } - - override class func tearDown() { + + static func stopServer() { serverActive = false try? testServer?.stop() - super.tearDown() } } diff --git a/Tests/Foundation/Tests/TestURLSession.swift b/Tests/Foundation/Tests/TestURLSession.swift index 8c048555892..19118ee80b1 100644 --- a/Tests/Foundation/Tests/TestURLSession.swift +++ b/Tests/Foundation/Tests/TestURLSession.swift @@ -495,21 +495,104 @@ class TestURLSession: LoopbackServerTest { waitForExpectations(timeout: 30) } - func test_timeoutInterval() { + func test_httpTimeout() { let config = URLSessionConfiguration.default config.timeoutIntervalForRequest = 10 - let urlString = "http://127.0.0.1:-1/Peru" + let urlString = "http://127.0.0.1:\(TestURLSession.serverPort)/Peru" let session = URLSession(configuration: config, delegate: nil, delegateQueue: nil) let expect = expectation(description: "GET \(urlString): will timeout") - var req = URLRequest(url: URL(string: "http://127.0.0.1:-1/Peru")!) + var req = URLRequest(url: URL(string: urlString)!) + req.setValue("3", forHTTPHeaderField: "x-pause") req.timeoutInterval = 1 let task = session.dataTask(with: req) { (data, _, error) -> Void in defer { expect.fulfill() } - XCTAssertNotNil(error) + XCTAssertEqual((error as? URLError)?.code, .timedOut, "Task should fail with URLError.timedOut error") } task.resume() + waitForExpectations(timeout: 30) + } + + func test_connectTimeout() { + // Reconfigure http server for this specific scenario: + // a slow request keeps web server busy, while other + // request times out on connection attempt. + Self.stopServer() + Self.options = Options(serverBacklog: 1, isAsynchronous: false) + Self.startServer() + + let config = URLSessionConfiguration.default + let slowUrlString = "http://127.0.0.1:\(TestURLSession.serverPort)/Peru" + let fastUrlString = "http://127.0.0.1:\(TestURLSession.serverPort)/Italy" + let session = URLSession(configuration: config, delegate: nil, delegateQueue: nil) + let slowReqExpect = expectation(description: "GET \(slowUrlString): will complete") + let fastReqExpect = expectation(description: "GET \(fastUrlString): will timeout") + + var slowReq = URLRequest(url: URL(string: slowUrlString)!) + slowReq.setValue("3", forHTTPHeaderField: "x-pause") + + var fastReq = URLRequest(url: URL(string: fastUrlString)!) + fastReq.timeoutInterval = 1 + + let slowTask = session.dataTask(with: slowReq) { (data, _, error) -> Void in + slowReqExpect.fulfill() + } + let fastTask = session.dataTask(with: fastReq) { (data, _, error) -> Void in + defer { fastReqExpect.fulfill() } + XCTAssertEqual((error as? URLError)?.code, .timedOut, "Task should fail with URLError.timedOut error") + } + slowTask.resume() + Thread.sleep(forTimeInterval: 0.1) // Give slow task some time to start + fastTask.resume() waitForExpectations(timeout: 30) + + // Reconfigure http server back to default settings + Self.stopServer() + Self.options = .default + Self.startServer() + } + + func test_repeatedRequestsStress() throws { + // TODO: try disabling curl connection cache to force socket close early. Or create several url sessions (they have cleanup in deinit) + + let config = URLSessionConfiguration.default + let urlString = "http://127.0.0.1:\(TestURLSession.serverPort)/Peru" + let session = URLSession(configuration: config, delegate: nil, delegateQueue: nil) + let req = URLRequest(url: URL(string: urlString)!) + + var requestsLeft = 3000 + let expect = expectation(description: "\(requestsLeft) x GET \(urlString)") + + func doRequests(completion: @escaping () -> Void) { + // We only care about completion of one of the tasks, + // so we could move to next cycle. + // Some overlapping would happen and that's what we + // want actually to provoke issue with socket reuse + // on Windows. + let task = session.dataTask(with: req) { (_, _, _) -> Void in + } + task.resume() + let task2 = session.dataTask(with: req) { (_, _, _) -> Void in + } + task2.resume() + let task3 = session.dataTask(with: req) { (_, _, _) -> Void in + completion() + } + task3.resume() + } + + func checkCountAndRunNext() { + guard requestsLeft > 0 else { + expect.fulfill() + return + } + requestsLeft -= 1 + doRequests(completion: checkCountAndRunNext) + } + + checkCountAndRunNext() + + waitForExpectations(timeout: 30) } func test_httpRedirectionWithCode300() throws { @@ -2049,7 +2132,6 @@ class TestURLSession: LoopbackServerTest { ("test_taskTimeout", test_taskTimeout), ("test_verifyRequestHeaders", test_verifyRequestHeaders), ("test_verifyHttpAdditionalHeaders", test_verifyHttpAdditionalHeaders), - ("test_timeoutInterval", test_timeoutInterval), ("test_httpRedirectionWithCode300", test_httpRedirectionWithCode300), ("test_httpRedirectionWithCode301_302", test_httpRedirectionWithCode301_302), ("test_httpRedirectionWithCode303", test_httpRedirectionWithCode303), @@ -2098,6 +2180,7 @@ class TestURLSession: LoopbackServerTest { /* ⚠️ */ testExpectedToFail(test_noDoubleCallbackWhenCancellingAndProtocolFailsFast, "This test crashes nondeterministically: https://bugs.swift.org/browse/SR-11310")), /* ⚠️ */ ("test_cancelledTasksCannotBeResumed", testExpectedToFail(test_cancelledTasksCannotBeResumed, "Breaks on Ubuntu 18.04")), ] + #if NS_FOUNDATION_ALLOWS_TESTABLE_IMPORT if #available(macOS 12.0, *) { retVal.append(contentsOf: [ ("test_webSocket", asyncTest(test_webSocket)), @@ -2106,6 +2189,19 @@ class TestURLSession: LoopbackServerTest { ("test_webSocketSemiAbruptClose", asyncTest(test_webSocketSemiAbruptClose)), ]) } + #endif + #if os(Windows) + retVal.append(contentsOf: [ + ("test_httpTimeout", test_httpTimeout), + ("test_connectTimeout", test_connectTimeout), + ("test_repeatedRequestsStress", testExpectedToFail(test_repeatedRequestsStress, "Crashes with high probability")), + ]) + #else + retVal.append(contentsOf: [ + ("test_httpTimeout", test_httpTimeout), + ("test_connectTimeout", test_connectTimeout), + ]) + #endif return retVal }