Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions RxSwift/Observables/Multicast.swift
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,12 @@ private final class ConnectableObservableAdapter<Subject: SubjectType>:
}
}

enum RefCountSinkRunResult {
case alreadyDisposed
case nothingToDo
case connectionToCreate(SingleAssignmentDisposable)
}

private final class RefCountSink<ConnectableSource: ConnectableObservableType, Observer: ObserverType>:
Sink<Observer>,
ObserverType where ConnectableSource.Element == Observer.Element
Expand All @@ -264,19 +270,34 @@ private final class RefCountSink<ConnectableSource: ConnectableObservableType, O

func run() -> Disposable {
let subscription = parent.source.subscribe(self)
parent.lock.lock(); defer { self.parent.lock.unlock() }

let runResult = parent.lock.withLock { () -> RefCountSinkRunResult in
connectionIdSnapshot = parent.connectionId

connectionIdSnapshot = parent.connectionId
if isDisposed {
return .alreadyDisposed
}

if isDisposed {
return Disposables.create()
if parent.count == 0 {
parent.count = 1
let disposable = SingleAssignmentDisposable()
parent.connectableSubscription = disposable
return .connectionToCreate(disposable)
} else {
parent.count += 1
return .nothingToDo
}
}

if parent.count == 0 {
parent.count = 1
parent.connectableSubscription = parent.source.connect()
} else {
parent.count += 1
switch runResult {
case .nothingToDo:
break
case .alreadyDisposed:
return Disposables.create()
case .connectionToCreate(let singleAssignmentDisposable):
singleAssignmentDisposable.setDisposable(
parent.source.connect()
)
}

return Disposables.create {
Expand Down
Loading