diff --git a/RxSwift/Observables/Zip+Collection.swift b/RxSwift/Observables/Zip+Collection.swift index 7b831235d..637d02e3f 100644 --- a/RxSwift/Observables/Zip+Collection.swift +++ b/RxSwift/Observables/Zip+Collection.swift @@ -42,11 +42,30 @@ private final class ZipCollectionTypeSink typealias SourceElement = Collection.Element.Element + private struct Iterate: InvocableType { + let action: () -> Void + + func invoke() { + action() + } + } + + private enum CalloutAction { + case none + case disposeSubscription(SingleAssignmentDisposable) + case evaluate([SourceElement]) + case forwardCompletedAndDispose + case forwardErrorAndDispose(Swift.Error) + } + private let parent: Parent + // Serialize event processing while keeping observer and disposal callouts out of `lock`. + private let gate = AsyncLock() private let lock = RecursiveLock() // state + private var isStopped = false private var numberOfValues = 0 private var values: [Queue] private var isDone: [Bool] @@ -68,7 +87,22 @@ private final class ZipCollectionTypeSink, atIndex: Int) { - lock.lock(); defer { self.lock.unlock() } + gate.invoke(Iterate(action: { self.synchronized_on(event, atIndex: atIndex) })) + } + + private func synchronized_on(_ event: Event, atIndex: Int) { + let action = lock.performLocked { + nextAction(for: event, atIndex: atIndex) + } + + perform(action) + } + + private func nextAction(for event: Event, atIndex: Int) -> CalloutAction { + if isStopped || isDisposed { + return .none + } + switch event { case let .next(element): values[atIndex].enqueue(element) @@ -79,52 +113,93 @@ private final class ZipCollectionTypeSink Bool in + if isStopped || isDisposed { + return false + } + + isStopped = true + return true } + + if shouldTerminate { + forwardOn(.error(error)) + dispose() + } + } + + override func dispose() { + gate.dispose() + super.dispose() } func run() -> Disposable {