Skip to content

Commit

Permalink
aggregate parser logic into single object
Browse files Browse the repository at this point in the history
  • Loading branch information
fumito-ito committed Mar 23, 2024
1 parent c7991ab commit 9c0c1e6
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 48 deletions.
49 changes: 1 addition & 48 deletions Sources/AnthropicSwiftSDK/Messages.swift
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ public struct Messages {
)
}

// swiftlint:disable:next cyclomatic_complexity
public func streamMessage(
_ messages: [Message],
model: Model = .claude_3_Opus,
Expand Down Expand Up @@ -158,52 +157,6 @@ public struct Messages {
throw AnthropicAPIError(fromHttpStatusCode: httpResponse.statusCode)
}

return AsyncThrowingStream.init { continuation in
let task = Task {
var currentEvent: StreamingEvent?
for try await line in data.lines {
do {
let lineType = try StreamingResponseParser.parse(line: line)
switch lineType {
case .empty:
break
case .event:
currentEvent = try StreamingEventLineParser.parse(eventLine: line)
case .data:
guard let currentEvent = currentEvent else {
break
}

switch currentEvent {
case .ping:
continuation.yield(try StreamingDataLineParser.parse(dataLine: line) as StreamingPingResponse)
case .messageStart:
continuation.yield(try StreamingDataLineParser.parse(dataLine: line) as StreamingMessageStartResponse)
case .messageDelta:
continuation.yield(try StreamingDataLineParser.parse(dataLine: line) as StreamingMessageDeltaResponse)
case .messageStop:
continuation.yield(try StreamingDataLineParser.parse(dataLine: line) as StreamingMessageStopResponse)
case .contentBlockStart:
continuation.yield(try StreamingDataLineParser.parse(dataLine: line) as StreamingContentBlockStartResponse)
case .contentBlockDelta:
continuation.yield(try StreamingDataLineParser.parse(dataLine: line) as StreamingContentBlockDeltaResponse)
case .contentBlockStop:
continuation.yield(try StreamingDataLineParser.parse(dataLine: line) as StreamingContentBlockStopResponse)
case .error:
let data = try StreamingDataLineParser.parse(dataLine: line) as StreamingErrorResponse
continuation.finish(throwing: data.error.type)
}
}
} catch let error {
continuation.finish(throwing: error)
}
}

continuation.finish()
}
continuation.onTermination = { @Sendable _ in
task.cancel()
}
}
return try await AnthropicStreamingParser.parse(stream: data.lines)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//
// AnthropicStreamingParser.swift
//
//
// Created by 伊藤史 on 2024/03/24.
//

import Foundation

public enum AnthropicStreamingParser {
public static func parse<T: AsyncSequence>(stream: T) async throws -> AsyncThrowingStream<StreamingResponse, Error> where T.Element == String {
return AsyncThrowingStream.init { continuation in
let task = Task {
var currentEvent: StreamingEvent?
for try await line in stream {
do {
let lineType = try StreamingResponseParser.parse(line: line)
switch lineType {
case .empty:
break
case .event:
currentEvent = try StreamingEventLineParser.parse(eventLine: line)
case .data:
guard let currentEvent = currentEvent else {
break
}

switch currentEvent {
case .ping:
continuation.yield(try StreamingDataLineParser.parse(dataLine: line) as StreamingPingResponse)
case .messageStart:
continuation.yield(try StreamingDataLineParser.parse(dataLine: line) as StreamingMessageStartResponse)
case .messageDelta:
continuation.yield(try StreamingDataLineParser.parse(dataLine: line) as StreamingMessageDeltaResponse)
case .messageStop:
continuation.yield(try StreamingDataLineParser.parse(dataLine: line) as StreamingMessageStopResponse)
case .contentBlockStart:
continuation.yield(try StreamingDataLineParser.parse(dataLine: line) as StreamingContentBlockStartResponse)
case .contentBlockDelta:
continuation.yield(try StreamingDataLineParser.parse(dataLine: line) as StreamingContentBlockDeltaResponse)
case .contentBlockStop:
continuation.yield(try StreamingDataLineParser.parse(dataLine: line) as StreamingContentBlockStopResponse)
case .error:
let data = try StreamingDataLineParser.parse(dataLine: line) as StreamingErrorResponse
continuation.finish(throwing: data.error.type)
}
}
} catch let error {
continuation.finish(throwing: error)
}
}

continuation.finish()
}
continuation.onTermination = { @Sendable _ in
task.cancel()
}
}
}
}

0 comments on commit 9c0c1e6

Please sign in to comment.