From 7258a8da2e32704c156c11ef611cc7961d085681 Mon Sep 17 00:00:00 2001 From: Alexander Smarus Date: Mon, 25 Dec 2023 14:39:24 +0200 Subject: [PATCH 1/2] Make _MultiHandle timeout timer non-repeatable CURL documentation (https://curl.se/libcurl/c/CURLMOPT_TIMERFUNCTION.html) explicitly says that the timer should be one-time. We basically have to follow CURL requests for setting, resetting and disarming such timers. Current logic eventually leaves a 1ms repeating timer forever, because CURL assumes it fires once, and may not ask us to remove it explicitly. Also, being used as request timeout trigger, this timer also has no sense to be repeated. --- .../URLSession/libcurl/MultiHandle.swift | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/Sources/FoundationNetworking/URLSession/libcurl/MultiHandle.swift b/Sources/FoundationNetworking/URLSession/libcurl/MultiHandle.swift index 19bda9723d..6196b2fa39 100644 --- a/Sources/FoundationNetworking/URLSession/libcurl/MultiHandle.swift +++ b/Sources/FoundationNetworking/URLSession/libcurl/MultiHandle.swift @@ -359,7 +359,7 @@ class _TimeoutSource { let delay = UInt64(max(1, milliseconds - 1)) let start = DispatchTime.now() + DispatchTimeInterval.milliseconds(Int(delay)) - rawSource.schedule(deadline: start, repeating: .milliseconds(Int(delay)), leeway: (milliseconds == 1) ? .microseconds(Int(1)) : .milliseconds(Int(1))) + rawSource.schedule(deadline: start, repeating: .never, leeway: (milliseconds == 1) ? .microseconds(Int(1)) : .milliseconds(Int(1))) rawSource.setEventHandler(handler: handler) rawSource.resume() } @@ -384,13 +384,12 @@ fileprivate extension URLSession._MultiHandle { timeoutSource = nil queue.async { self.timeoutTimerFired() } case .milliseconds(let milliseconds): - if (timeoutSource == nil) || timeoutSource!.milliseconds != milliseconds { - //TODO: Could simply change the existing timer by using DispatchSourceTimer again. - let block = DispatchWorkItem { [weak self] in - self?.timeoutTimerFired() - } - timeoutSource = _TimeoutSource(queue: queue, milliseconds: milliseconds, handler: block) + //TODO: Could simply change the existing timer by using DispatchSourceTimer again. + let block = DispatchWorkItem { [weak self] in + self?.timeoutTimerFired() } + // Note: Previous timer instance would cancel internal Dispatch timer in deinit + timeoutSource = _TimeoutSource(queue: queue, milliseconds: milliseconds, handler: block) } } enum _Timeout { From f9a54f3afa4ca4adfba88d6e6c48c0120c3022a8 Mon Sep 17 00:00:00 2001 From: Alexander Smarus Date: Mon, 25 Dec 2023 15:10:26 +0200 Subject: [PATCH 2/2] Cancel DispatchSource before closing socket (#4791) Extends socket lifetime enough to let DispatchSource cancel properly. Also prevents from creating new DispatchSources while other are in the middle of cancelling. Also includes tests (see #4854 for test details). --- .../URL.subproj/CFURLSessionInterface.c | 4 + .../URL.subproj/CFURLSessionInterface.h | 2 + .../URLSession/libcurl/MultiHandle.swift | 152 ++++++++++++++++-- Tests/Foundation/HTTPServer.swift | 54 +++++-- Tests/Foundation/Tests/TestURLSession.swift | 106 +++++++++++- 5 files changed, 290 insertions(+), 28 deletions(-) diff --git a/CoreFoundation/URL.subproj/CFURLSessionInterface.c b/CoreFoundation/URL.subproj/CFURLSessionInterface.c index 6226a3f5ce..6a30127b81 100644 --- a/CoreFoundation/URL.subproj/CFURLSessionInterface.c +++ b/CoreFoundation/URL.subproj/CFURLSessionInterface.c @@ -111,6 +111,10 @@ CFURLSessionEasyCode CFURLSession_easy_setopt_tc(CFURLSessionEasyHandle _Nonnull return MakeEasyCode(curl_easy_setopt(curl, option.value, a)); } +CFURLSessionEasyCode CFURLSession_easy_setopt_scl(CFURLSessionEasyHandle _Nonnull curl, CFURLSessionOption option, CFURLSessionCloseSocketCallback * _Nullable a) { + return MakeEasyCode(curl_easy_setopt(curl, option.value, a)); +} + CFURLSessionEasyCode CFURLSession_easy_getinfo_long(CFURLSessionEasyHandle _Nonnull curl, CFURLSessionInfo info, long *_Nonnull a) { return MakeEasyCode(curl_easy_getinfo(curl, info.value, a)); } diff --git a/CoreFoundation/URL.subproj/CFURLSessionInterface.h b/CoreFoundation/URL.subproj/CFURLSessionInterface.h index d760177041..3e219f8003 100644 --- a/CoreFoundation/URL.subproj/CFURLSessionInterface.h +++ b/CoreFoundation/URL.subproj/CFURLSessionInterface.h @@ -625,6 +625,8 @@ typedef int (CFURLSessionSeekCallback)(void *_Nullable userp, long long offset, CF_EXPORT CFURLSessionEasyCode CFURLSession_easy_setopt_seek(CFURLSessionEasyHandle _Nonnull curl, CFURLSessionOption option, CFURLSessionSeekCallback * _Nullable a); typedef int (CFURLSessionTransferInfoCallback)(void *_Nullable userp, long long dltotal, long long dlnow, long long ultotal, long long ulnow); CF_EXPORT CFURLSessionEasyCode CFURLSession_easy_setopt_tc(CFURLSessionEasyHandle _Nonnull curl, CFURLSessionOption option, CFURLSessionTransferInfoCallback * _Nullable a); +typedef int (CFURLSessionCloseSocketCallback)(void *_Nullable clientp, CFURLSession_socket_t item); +CF_EXPORT CFURLSessionEasyCode CFURLSession_easy_setopt_scl(CFURLSessionEasyHandle _Nonnull curl, CFURLSessionOption option, CFURLSessionCloseSocketCallback * _Nullable a); CF_EXPORT CFURLSessionEasyCode CFURLSession_easy_getinfo_long(CFURLSessionEasyHandle _Nonnull curl, CFURLSessionInfo info, long *_Nonnull a); CF_EXPORT CFURLSessionEasyCode CFURLSession_easy_getinfo_double(CFURLSessionEasyHandle _Nonnull curl, CFURLSessionInfo info, double *_Nonnull a); diff --git a/Sources/FoundationNetworking/URLSession/libcurl/MultiHandle.swift b/Sources/FoundationNetworking/URLSession/libcurl/MultiHandle.swift index 6196b2fa39..b58748dc9a 100644 --- a/Sources/FoundationNetworking/URLSession/libcurl/MultiHandle.swift +++ b/Sources/FoundationNetworking/URLSession/libcurl/MultiHandle.swift @@ -45,6 +45,7 @@ extension URLSession { let queue: DispatchQueue let group = DispatchGroup() fileprivate var easyHandles: [_EasyHandle] = [] + fileprivate var socketReferences: [CFURLSession_socket_t: _SocketReference] = [:] fileprivate var timeoutSource: _TimeoutSource? = nil private var reentrantInUpdateTimeoutTimer = false @@ -127,13 +128,14 @@ fileprivate extension URLSession._MultiHandle { if let opaque = socketSourcePtr { Unmanaged<_SocketSources>.fromOpaque(opaque).release() } + socketSources?.tearDown(handle: self, socket: socket, queue: queue) socketSources = nil } if let ss = socketSources { let handler = DispatchWorkItem { [weak self] in self?.performAction(for: socket) } - ss.createSources(with: action, socket: socket, queue: queue, handler: handler) + ss.createSources(with: action, handle: self, socket: socket, queue: queue, handler: handler) } return 0 } @@ -161,9 +163,104 @@ extension Collection where Element == _EasyHandle { } } +private extension URLSession._MultiHandle { + class _SocketReference { + let socket: CFURLSession_socket_t + var shouldClose: Bool + var workItem: DispatchWorkItem? + + init(socket: CFURLSession_socket_t) { + self.socket = socket + shouldClose = false + } + + deinit { + if shouldClose { + #if os(Windows) + closesocket(socket) + #else + close(socket) + #endif + } + } + } + + /// Creates and stores socket reference. Reentrancy is not supported. + /// Trying to begin operation for same socket twice would mean something + /// went horribly wrong, or our assumptions about CURL register/unregister + /// action flow are nor correct. + func beginOperation(for socket: CFURLSession_socket_t) -> _SocketReference { + let reference = _SocketReference(socket: socket) + precondition(socketReferences.updateValue(reference, forKey: socket) == nil, "Reentrancy is not supported for socket operations") + return reference + } + + /// Removes socket reference from the shared store. If there is work item scheduled, + /// executes it on the current thread. + func endOperation(for socketReference: _SocketReference) { + precondition(socketReferences.removeValue(forKey: socketReference.socket) != nil, "No operation associated with the socket") + if let workItem = socketReference.workItem, !workItem.isCancelled { + // CURL never asks for socket close without unregistering first, and + // we should cancel pending work when unregister action is requested. + precondition(!socketReference.shouldClose, "Socket close was scheduled, but there is some pending work left") + workItem.perform() + } + } + + /// Marks this reference to close socket on deinit. This allows us + /// to extend socket lifecycle by keeping the reference alive. + func scheduleClose(for socket: CFURLSession_socket_t) { + let reference = socketReferences[socket] ?? _SocketReference(socket: socket) + reference.shouldClose = true + } + + /// Schedules work to be performed when an operation ends for the socket, + /// or performs it immediately if there is no operation in progress. + /// + /// We're using this to postpone Dispatch Source creation when + /// previous Dispatch Source is not cancelled yet. + func schedule(_ workItem: DispatchWorkItem, for socket: CFURLSession_socket_t) { + guard let socketReference = socketReferences[socket] else { + workItem.perform() + return + } + // CURL never asks for register without pairing it with unregister later, + // and we're cancelling pending work item on unregister. + // But it is safe to just drop existing work item anyway, + // and replace it with the new one. + socketReference.workItem = workItem + } + + /// Cancels pending work for socket operation. Does nothing if + /// there is no operation in progress or no pending work item. + /// + /// CURL may become not interested in Dispatch Sources + /// we have planned to create. In this case we should just cancel + /// scheduled work. + func cancelWorkItem(for socket: CFURLSession_socket_t) { + guard let socketReference = socketReferences[socket] else { + return + } + socketReference.workItem?.cancel() + socketReference.workItem = nil + } + +} + internal extension URLSession._MultiHandle { /// Add an easy handle -- start its transfer. func add(_ handle: _EasyHandle) { + // Set CLOSESOCKETFUNCTION. Note that while the option belongs to easy_handle, + // the connection cache is managed by CURL multi_handle, and sockets can actually + // outlive easy_handle (even after curl_easy_cleanup call). That's why + // socket management lives in _MultiHandle. + try! CFURLSession_easy_setopt_ptr(handle.rawHandle, CFURLSessionOptionCLOSESOCKETDATA, UnsafeMutableRawPointer(Unmanaged.passUnretained(self).toOpaque())).asError() + try! CFURLSession_easy_setopt_scl(handle.rawHandle, CFURLSessionOptionCLOSESOCKETFUNCTION) { (clientp: UnsafeMutableRawPointer?, item: CFURLSession_socket_t) in + guard let handle = URLSession._MultiHandle.from(callbackUserData: clientp) else { fatalError() } + handle.scheduleClose(for: item) + return 0 + }.asError() + // If this is the first handle being added, we need to `kick` the // underlying multi handle by calling `timeoutTimerFired` as // described in @@ -448,25 +545,56 @@ fileprivate class _SocketSources { s.resume() } - func tearDown() { - if let s = readSource { - s.cancel() + func tearDown(handle: URLSession._MultiHandle, socket: CFURLSession_socket_t, queue: DispatchQueue) { + handle.cancelWorkItem(for: socket) // There could be pending register action which needs to be cancelled + + guard readSource != nil || writeSource != nil else { + // This means that we have posponed (and already abandoned) + // sources creation. + return } - readSource = nil - if let s = writeSource { - s.cancel() + + // Socket is guaranteed to not to be closed as long as we keeping + // the reference. + let socketReference = handle.beginOperation(for: socket) + let cancelHandlerGroup = DispatchGroup() + [readSource, writeSource].compactMap({ $0 }).forEach { source in + cancelHandlerGroup.enter() + source.setCancelHandler { + cancelHandlerGroup.leave() + } + source.cancel() + } + cancelHandlerGroup.notify(queue: queue) { + handle.endOperation(for: socketReference) } + + readSource = nil writeSource = nil } } extension _SocketSources { /// Create a read and/or write source as specified by the action. - func createSources(with action: URLSession._MultiHandle._SocketRegisterAction, socket: CFURLSession_socket_t, queue: DispatchQueue, handler: DispatchWorkItem) { - if action.needsReadSource { - createReadSource(socket: socket, queue: queue, handler: handler) + func createSources(with action: URLSession._MultiHandle._SocketRegisterAction, handle: URLSession._MultiHandle, socket: CFURLSession_socket_t, queue: DispatchQueue, handler: DispatchWorkItem) { + // CURL casually requests to unregister and register handlers for same + // socket in a row. There is (pretty low) chance of overlapping tear-down operation + // with "register" request. Bad things could happen if we create + // a new Dispatch Source while other is being cancelled for the same socket. + // We're using `_MultiHandle.schedule(_:for:)` here to postpone sources creation until + // pending operation is finished (if there is none, submitted work item is performed + // immediately). + // Also, CURL may request unregister even before we perform any postponed work, + // so we have to cancel such work in such case. See + let createSources = DispatchWorkItem { + if action.needsReadSource { + self.createReadSource(socket: socket, queue: queue, handler: handler) + } + if action.needsWriteSource { + self.createWriteSource(socket: socket, queue: queue, handler: handler) + } } - if action.needsWriteSource { - createWriteSource(socket: socket, queue: queue, handler: handler) + if action.needsReadSource || action.needsWriteSource { + handle.schedule(createSources, for: socket) } } } diff --git a/Tests/Foundation/HTTPServer.swift b/Tests/Foundation/HTTPServer.swift index 5af9fb9c52..0ab0f0dc71 100644 --- a/Tests/Foundation/HTTPServer.swift +++ b/Tests/Foundation/HTTPServer.swift @@ -99,7 +99,7 @@ class _TCPSocket: CustomStringConvertible { listening = false } - init(port: UInt16?) throws { + init(port: UInt16?, backlog: Int32) throws { listening = true self.port = 0 @@ -124,7 +124,7 @@ class _TCPSocket: CustomStringConvertible { try socketAddress.withMemoryRebound(to: sockaddr.self, capacity: MemoryLayout.size, { let addr = UnsafePointer($0) _ = try attempt("bind", valid: isZero, bind(_socket, addr, socklen_t(MemoryLayout.size))) - _ = try attempt("listen", valid: isZero, listen(_socket, SOMAXCONN)) + _ = try attempt("listen", valid: isZero, listen(_socket, backlog)) }) var actualSA = sockaddr_in() @@ -295,8 +295,8 @@ class _HTTPServer: CustomStringConvertible { let tcpSocket: _TCPSocket var port: UInt16 { tcpSocket.port } - init(port: UInt16?) throws { - tcpSocket = try _TCPSocket(port: port) + init(port: UInt16?, backlog: Int32 = SOMAXCONN) throws { + tcpSocket = try _TCPSocket(port: port, backlog: backlog) } init(socket: _TCPSocket) { @@ -1094,6 +1094,14 @@ enum InternalServerError : Error { case badHeaders } +extension LoopbackServerTest { + struct Options { + var serverBacklog: Int32 + var isAsynchronous: Bool + + static let `default` = Options(serverBacklog: SOMAXCONN, isAsynchronous: true) + } +} class LoopbackServerTest : XCTestCase { private static let staticSyncQ = DispatchQueue(label: "org.swift.TestFoundation.HTTPServer.StaticSyncQ") @@ -1101,8 +1109,17 @@ class LoopbackServerTest : XCTestCase { private static var _serverPort: Int = -1 private static var _serverActive = false private static var testServer: _HTTPServer? = nil - - + private static var _options: Options = .default + + static var options: Options { + get { + return staticSyncQ.sync { _options } + } + set { + staticSyncQ.sync { _options = newValue } + } + } + static var serverPort: Int { get { return staticSyncQ.sync { _serverPort } @@ -1119,12 +1136,20 @@ class LoopbackServerTest : XCTestCase { override class func setUp() { super.setUp() + Self.startServer() + } + override class func tearDown() { + Self.stopServer() + super.tearDown() + } + + static func startServer() { var _serverPort = 0 let dispatchGroup = DispatchGroup() func runServer() throws { - testServer = try _HTTPServer(port: nil) + testServer = try _HTTPServer(port: nil, backlog: options.serverBacklog) _serverPort = Int(testServer!.port) serverActive = true dispatchGroup.leave() @@ -1132,7 +1157,8 @@ class LoopbackServerTest : XCTestCase { while serverActive { do { let httpServer = try testServer!.listen() - globalDispatchQueue.async { + + func handleRequest() { let subServer = TestURLSessionServer(httpServer: httpServer) do { try subServer.readAndRespond() @@ -1140,6 +1166,12 @@ class LoopbackServerTest : XCTestCase { NSLog("readAndRespond: \(error)") } } + + if options.isAsynchronous { + globalDispatchQueue.async(execute: handleRequest) + } else { + handleRequest() + } } catch { if (serverActive) { // Ignore errors thrown on shutdown NSLog("httpServer: \(error)") @@ -1165,11 +1197,11 @@ class LoopbackServerTest : XCTestCase { fatalError("Timedout waiting for server to be ready") } serverPort = _serverPort + debugLog("Listening on \(serverPort)") } - - override class func tearDown() { + + static func stopServer() { serverActive = false try? testServer?.stop() - super.tearDown() } } diff --git a/Tests/Foundation/Tests/TestURLSession.swift b/Tests/Foundation/Tests/TestURLSession.swift index 8c04855589..19118ee80b 100644 --- a/Tests/Foundation/Tests/TestURLSession.swift +++ b/Tests/Foundation/Tests/TestURLSession.swift @@ -495,21 +495,104 @@ class TestURLSession: LoopbackServerTest { waitForExpectations(timeout: 30) } - func test_timeoutInterval() { + func test_httpTimeout() { let config = URLSessionConfiguration.default config.timeoutIntervalForRequest = 10 - let urlString = "http://127.0.0.1:-1/Peru" + let urlString = "http://127.0.0.1:\(TestURLSession.serverPort)/Peru" let session = URLSession(configuration: config, delegate: nil, delegateQueue: nil) let expect = expectation(description: "GET \(urlString): will timeout") - var req = URLRequest(url: URL(string: "http://127.0.0.1:-1/Peru")!) + var req = URLRequest(url: URL(string: urlString)!) + req.setValue("3", forHTTPHeaderField: "x-pause") req.timeoutInterval = 1 let task = session.dataTask(with: req) { (data, _, error) -> Void in defer { expect.fulfill() } - XCTAssertNotNil(error) + XCTAssertEqual((error as? URLError)?.code, .timedOut, "Task should fail with URLError.timedOut error") } task.resume() + waitForExpectations(timeout: 30) + } + + func test_connectTimeout() { + // Reconfigure http server for this specific scenario: + // a slow request keeps web server busy, while other + // request times out on connection attempt. + Self.stopServer() + Self.options = Options(serverBacklog: 1, isAsynchronous: false) + Self.startServer() + + let config = URLSessionConfiguration.default + let slowUrlString = "http://127.0.0.1:\(TestURLSession.serverPort)/Peru" + let fastUrlString = "http://127.0.0.1:\(TestURLSession.serverPort)/Italy" + let session = URLSession(configuration: config, delegate: nil, delegateQueue: nil) + let slowReqExpect = expectation(description: "GET \(slowUrlString): will complete") + let fastReqExpect = expectation(description: "GET \(fastUrlString): will timeout") + + var slowReq = URLRequest(url: URL(string: slowUrlString)!) + slowReq.setValue("3", forHTTPHeaderField: "x-pause") + + var fastReq = URLRequest(url: URL(string: fastUrlString)!) + fastReq.timeoutInterval = 1 + + let slowTask = session.dataTask(with: slowReq) { (data, _, error) -> Void in + slowReqExpect.fulfill() + } + let fastTask = session.dataTask(with: fastReq) { (data, _, error) -> Void in + defer { fastReqExpect.fulfill() } + XCTAssertEqual((error as? URLError)?.code, .timedOut, "Task should fail with URLError.timedOut error") + } + slowTask.resume() + Thread.sleep(forTimeInterval: 0.1) // Give slow task some time to start + fastTask.resume() waitForExpectations(timeout: 30) + + // Reconfigure http server back to default settings + Self.stopServer() + Self.options = .default + Self.startServer() + } + + func test_repeatedRequestsStress() throws { + // TODO: try disabling curl connection cache to force socket close early. Or create several url sessions (they have cleanup in deinit) + + let config = URLSessionConfiguration.default + let urlString = "http://127.0.0.1:\(TestURLSession.serverPort)/Peru" + let session = URLSession(configuration: config, delegate: nil, delegateQueue: nil) + let req = URLRequest(url: URL(string: urlString)!) + + var requestsLeft = 3000 + let expect = expectation(description: "\(requestsLeft) x GET \(urlString)") + + func doRequests(completion: @escaping () -> Void) { + // We only care about completion of one of the tasks, + // so we could move to next cycle. + // Some overlapping would happen and that's what we + // want actually to provoke issue with socket reuse + // on Windows. + let task = session.dataTask(with: req) { (_, _, _) -> Void in + } + task.resume() + let task2 = session.dataTask(with: req) { (_, _, _) -> Void in + } + task2.resume() + let task3 = session.dataTask(with: req) { (_, _, _) -> Void in + completion() + } + task3.resume() + } + + func checkCountAndRunNext() { + guard requestsLeft > 0 else { + expect.fulfill() + return + } + requestsLeft -= 1 + doRequests(completion: checkCountAndRunNext) + } + + checkCountAndRunNext() + + waitForExpectations(timeout: 30) } func test_httpRedirectionWithCode300() throws { @@ -2049,7 +2132,6 @@ class TestURLSession: LoopbackServerTest { ("test_taskTimeout", test_taskTimeout), ("test_verifyRequestHeaders", test_verifyRequestHeaders), ("test_verifyHttpAdditionalHeaders", test_verifyHttpAdditionalHeaders), - ("test_timeoutInterval", test_timeoutInterval), ("test_httpRedirectionWithCode300", test_httpRedirectionWithCode300), ("test_httpRedirectionWithCode301_302", test_httpRedirectionWithCode301_302), ("test_httpRedirectionWithCode303", test_httpRedirectionWithCode303), @@ -2098,6 +2180,7 @@ class TestURLSession: LoopbackServerTest { /* ⚠️ */ testExpectedToFail(test_noDoubleCallbackWhenCancellingAndProtocolFailsFast, "This test crashes nondeterministically: https://bugs.swift.org/browse/SR-11310")), /* ⚠️ */ ("test_cancelledTasksCannotBeResumed", testExpectedToFail(test_cancelledTasksCannotBeResumed, "Breaks on Ubuntu 18.04")), ] + #if NS_FOUNDATION_ALLOWS_TESTABLE_IMPORT if #available(macOS 12.0, *) { retVal.append(contentsOf: [ ("test_webSocket", asyncTest(test_webSocket)), @@ -2106,6 +2189,19 @@ class TestURLSession: LoopbackServerTest { ("test_webSocketSemiAbruptClose", asyncTest(test_webSocketSemiAbruptClose)), ]) } + #endif + #if os(Windows) + retVal.append(contentsOf: [ + ("test_httpTimeout", test_httpTimeout), + ("test_connectTimeout", test_connectTimeout), + ("test_repeatedRequestsStress", testExpectedToFail(test_repeatedRequestsStress, "Crashes with high probability")), + ]) + #else + retVal.append(contentsOf: [ + ("test_httpTimeout", test_httpTimeout), + ("test_connectTimeout", test_connectTimeout), + ]) + #endif return retVal }