Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 101 additions & 26 deletions RxSwift/Observables/Zip+Collection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,30 @@ private final class ZipCollectionTypeSink<Collection: Swift.Collection, Observer
typealias Parent = ZipCollectionType<Collection, Result>
typealias SourceElement = Collection.Element.Element

private struct Iterate: InvocableType {
let action: () -> Void

func invoke() {
action()
}
}

private enum CalloutAction {
case none
case disposeSubscription(SingleAssignmentDisposable)
case evaluate([SourceElement])
case forwardCompletedAndDispose
case forwardErrorAndDispose(Swift.Error)
}

private let parent: Parent

// Serialize event processing while keeping observer and disposal callouts out of `lock`.
private let gate = AsyncLock<Iterate>()
private let lock = RecursiveLock()

// state
private var isStopped = false
private var numberOfValues = 0
private var values: [Queue<SourceElement>]
private var isDone: [Bool]
Expand All @@ -68,7 +87,22 @@ private final class ZipCollectionTypeSink<Collection: Swift.Collection, Observer
}

func on(_ event: Event<SourceElement>, atIndex: Int) {
lock.lock(); defer { self.lock.unlock() }
gate.invoke(Iterate(action: { self.synchronized_on(event, atIndex: atIndex) }))
}

private func synchronized_on(_ event: Event<SourceElement>, atIndex: Int) {
let action = lock.performLocked {
nextAction(for: event, atIndex: atIndex)
}

perform(action)
}

private func nextAction(for event: Event<SourceElement>, atIndex: Int) -> CalloutAction {
if isStopped || isDisposed {
return .none
}

switch event {
case let .next(element):
values[atIndex].enqueue(element)
Expand All @@ -79,52 +113,93 @@ private final class ZipCollectionTypeSink<Collection: Swift.Collection, Observer

if numberOfValues < parent.count {
if numberOfDone == parent.count - 1 {
forwardOn(.completed)
dispose()
isStopped = true
return .forwardCompletedAndDispose
}
return
return .none
}

do {
var arguments = [SourceElement]()
arguments.reserveCapacity(parent.count)
var arguments = [SourceElement]()
arguments.reserveCapacity(parent.count)

// recalculate number of values
numberOfValues = 0
// recalculate number of values
numberOfValues = 0

for i in 0 ..< values.count {
arguments.append(values[i].dequeue()!)
if !values[i].isEmpty {
numberOfValues += 1
}
for i in 0 ..< values.count {
arguments.append(values[i].dequeue()!)
if !values[i].isEmpty {
numberOfValues += 1
}

let result = try parent.resultSelector(arguments)
forwardOn(.next(result))
} catch {
forwardOn(.error(error))
dispose()
}

return .evaluate(arguments)

case let .error(error):
forwardOn(.error(error))
dispose()
isStopped = true
return .forwardErrorAndDispose(error)

case .completed:
if isDone[atIndex] {
return
return .none
}

isDone[atIndex] = true
numberOfDone += 1

if numberOfDone == parent.count {
forwardOn(.completed)
dispose()
isStopped = true
return .forwardCompletedAndDispose
} else {
subscriptions[atIndex].dispose()
return .disposeSubscription(subscriptions[atIndex])
}
}
}

private func perform(_ action: CalloutAction) {
switch action {
case .none:
return
case let .disposeSubscription(subscription):
subscription.dispose()
case let .evaluate(arguments):
if isDisposed {
return
}

do {
let result = try parent.resultSelector(arguments)
forwardOn(.next(result))
} catch {
forwardErrorAndDispose(error)
}
case .forwardCompletedAndDispose:
forwardOn(.completed)
dispose()
case let .forwardErrorAndDispose(error):
forwardOn(.error(error))
dispose()
}
}

private func forwardErrorAndDispose(_ error: Swift.Error) {
let shouldTerminate = lock.performLocked { () -> Bool in
if isStopped || isDisposed {
return false
}

isStopped = true
return true
}

if shouldTerminate {
forwardOn(.error(error))
dispose()
}
}

override func dispose() {
gate.dispose()
super.dispose()
}

func run() -> Disposable {
Expand Down
Loading