From 2255ee04ff2cf8c8dadf5c3051b2a1e285866e76 Mon Sep 17 00:00:00 2001 From: NachoSoto Date: Fri, 1 Jul 2022 15:38:34 -0700 Subject: [PATCH] [skip ci] WIP --- Sources/Action.swift | 3 +- Sources/Atomic.swift | 10 ++- Sources/Bag.swift | 4 +- Sources/Disposable.swift | 23 +++--- Sources/Event.swift | 20 ++--- Sources/EventLogger.swift | 33 ++++---- Sources/Flatten.swift | 43 +++++----- Sources/FoundationExtensions.swift | 2 +- Sources/Lifetime.swift | 4 +- Sources/Observers/AttemptMap.swift | 12 +-- Sources/Observers/Collect.swift | 16 ++-- Sources/Observers/CollectEvery.swift | 2 +- Sources/Observers/CombinePrevious.swift | 10 +-- Sources/Observers/CompactMap.swift | 12 +-- Sources/Observers/Debounce.swift | 2 +- Sources/Observers/Delay.swift | 2 +- Sources/Observers/Dematerialize.swift | 10 +-- Sources/Observers/DematerializeResults.swift | 10 +-- Sources/Observers/Filter.swift | 12 +-- Sources/Observers/LazyMap.swift | 4 +- Sources/Observers/Map.swift | 12 +-- Sources/Observers/MapError.swift | 12 +-- Sources/Observers/Materialize.swift | 10 +-- Sources/Observers/MaterializeAsResult.swift | 10 +-- Sources/Observers/Observer.swift | 10 ++- Sources/Observers/Reduce.swift | 10 +-- Sources/Observers/ScanMap.swift | 12 +-- Sources/Observers/SkipFirst.swift | 10 +-- Sources/Observers/SkipRepeats.swift | 10 +-- Sources/Observers/SkipWhile.swift | 10 +-- Sources/Observers/TakeFirst.swift | 10 +-- Sources/Observers/TakeLast.swift | 10 +-- Sources/Observers/TakeUntil.swift | 12 +-- Sources/Observers/TakeWhile.swift | 12 +-- Sources/Observers/Throttle.swift | 2 +- Sources/Observers/UnaryAsyncOperator.swift | 12 ++- Sources/Observers/UniqueValues.swift | 10 +-- Sources/Optional.swift | 2 +- Sources/Property.swift | 4 +- Sources/Scheduler.swift | 6 +- Sources/Signal.Observer.swift | 10 ++- Sources/Signal.swift | 82 ++++++++++---------- Sources/SignalProducer.swift | 2 +- 43 files changed, 266 insertions(+), 248 deletions(-) diff --git a/Sources/Action.swift b/Sources/Action.swift index b7c4076a3..3f542e3e7 100644 --- a/Sources/Action.swift +++ b/Sources/Action.swift @@ -116,7 +116,8 @@ public final class Action { let isEnabled = MutableProperty(actionState.value.isEnabled) self.isEnabled = Property(capturing: isEnabled) - func modifyActionState(_ action: (inout ActionState) throws -> Result) rethrows -> Result { + @Sendable + func modifyActionState(_ action: @Sendable (inout ActionState) throws -> Result) rethrows -> Result { return try actionState.begin { storage in let oldState = storage.value defer { diff --git a/Sources/Atomic.swift b/Sources/Atomic.swift index 96fd6775b..6778e35e8 100644 --- a/Sources/Atomic.swift +++ b/Sources/Atomic.swift @@ -14,7 +14,7 @@ import MachO /// A simple, generic lock-free finite state machine. /// /// - warning: `deinitialize` must be called to dispose of the consumed memory. -internal struct UnsafeAtomicState where State.RawValue == Int32 { +internal struct UnsafeAtomicState: Sendable where State.RawValue == Int32 { internal typealias Transition = (expected: State, next: State) #if os(macOS) || os(iOS) || os(tvOS) || os(watchOS) private let value: UnsafeMutablePointer @@ -29,6 +29,7 @@ internal struct UnsafeAtomicState where State.RawValue } /// Deinitialize the finite state machine. + @Sendable internal func deinitialize() { value.deinitialize(count: 1) value.deallocate() @@ -104,7 +105,8 @@ internal struct UnsafeAtomicState where State.RawValue /// `Lock` exposes `os_unfair_lock` on supported platforms, with pthread mutex as the /// fallback. -internal class Lock: LockProtocol { +// TODO: unckecked? subclass? +internal class Lock: LockProtocol, @unchecked Sendable { #if os(macOS) || os(iOS) || os(tvOS) || os(watchOS) @available(iOS 10.0, *) @available(macOS 10.12, *) @@ -212,7 +214,7 @@ internal class Lock: LockProtocol { func `try`() -> Bool { fatalError() } } -internal protocol LockProtocol { +internal protocol LockProtocol: Sendable { static func make() -> Self func lock() @@ -229,7 +231,7 @@ internal struct NoLock: LockProtocol { } /// An atomic variable. -public final class Atomic { +public final class Atomic: @unchecked Sendable { private let lock: Lock private var _value: Value diff --git a/Sources/Bag.swift b/Sources/Bag.swift index e38b92b04..f2e9a5079 100644 --- a/Sources/Bag.swift +++ b/Sources/Bag.swift @@ -10,7 +10,7 @@ public struct Bag { /// A uniquely identifying token for removing a value that was inserted into a /// Bag. - public struct Token { + public struct Token: Sendable { fileprivate let value: UInt64 } @@ -97,3 +97,5 @@ extension Bag: RandomAccessCollection { } } } + +extension Bag: Sendable where Element: Sendable {} diff --git a/Sources/Disposable.swift b/Sources/Disposable.swift index 4930c83cf..0d1569bc0 100644 --- a/Sources/Disposable.swift +++ b/Sources/Disposable.swift @@ -8,7 +8,7 @@ /// Represents something that can be “disposed”, usually associated with freeing /// resources or canceling work. -public protocol Disposable: AnyObject { +public protocol Disposable: AnyObject, Sendable { /// Whether this disposable has been disposed already. var isDisposed: Bool { get } @@ -16,6 +16,7 @@ public protocol Disposable: AnyObject { /// been disposed of, it does nothing. /// /// - note: Implementations must issue a memory barrier. + @Sendable func dispose() } @@ -58,22 +59,22 @@ internal final class _SimpleDisposable: Disposable { /// A disposable that has already been disposed. internal final class NopDisposable: Disposable { static let shared = NopDisposable() - var isDisposed = true + let isDisposed = true func dispose() {} private init() {} } /// A type-erased disposable that forwards operations to an underlying disposable. public final class AnyDisposable: Disposable { - private final class ActionDisposable: Disposable { + private final class ActionDisposable: Disposable, @unchecked Sendable { let state: UnsafeAtomicState - var action: (() -> Void)? + var action: (@Sendable () -> Void)? var isDisposed: Bool { return state.is(.disposed) } - init(_ action: (() -> Void)?) { + init(_ action: (@Sendable () -> Void)?) { self.state = UnsafeAtomicState(.active) self.action = action } @@ -100,7 +101,7 @@ public final class AnyDisposable: Disposable { /// /// - parameters: /// - action: A closure to run when calling `dispose()`. - public init(_ action: @escaping () -> Void) { + public init(_ action: @escaping @Sendable () -> Void) { base = ActionDisposable(action) } @@ -123,7 +124,7 @@ public final class AnyDisposable: Disposable { } /// A disposable that will dispose of any number of other disposables. -public final class CompositeDisposable: Disposable { +public final class CompositeDisposable: Disposable, @unchecked Sendable { private let disposables: Atomic?> private var state: UnsafeAtomicState @@ -203,7 +204,7 @@ public final class CompositeDisposable: Disposable { /// composite has been disposed of, `disposable` has been disposed of, or /// `disposable` is `nil`. @discardableResult - public func add(_ action: @escaping () -> Void) -> Disposable? { + public func add(_ action: @escaping @Sendable () -> Void) -> Disposable? { return add(AnyDisposable(action)) } @@ -246,7 +247,7 @@ public final class CompositeDisposable: Disposable { /// - returns: An instance of `DisposableHandle` that can be used to opaquely /// remove the disposable later (if desired). @discardableResult - public static func += (lhs: CompositeDisposable, rhs: @escaping () -> Void) -> Disposable? { + public static func += (lhs: CompositeDisposable, rhs: @escaping @Sendable () -> Void) -> Disposable? { return lhs.add(rhs) } } @@ -325,7 +326,7 @@ extension ScopedDisposable where Inner == CompositeDisposable { /// - returns: An instance of `DisposableHandle` that can be used to opaquely /// remove the disposable later (if desired). @discardableResult - public static func += (lhs: ScopedDisposable, rhs: @escaping () -> Void) -> Disposable? { + public static func += (lhs: ScopedDisposable, rhs: @escaping @Sendable () -> Void) -> Disposable? { return lhs.inner.add(rhs) } } @@ -334,7 +335,7 @@ extension ScopedDisposable where Inner == CompositeDisposable { /// wrapped disposable to be replaced. public final class SerialDisposable: Disposable { private let _inner: Atomic - private var state: UnsafeAtomicState + private let state: UnsafeAtomicState public var isDisposed: Bool { return state.is(.disposed) diff --git a/Sources/Event.swift b/Sources/Event.swift index c8137938f..bc93a5eec 100644 --- a/Sources/Event.swift +++ b/Sources/Event.swift @@ -206,27 +206,27 @@ extension Signal.Event: EventProtocol { // This operator performs side effect upon interruption. extension Signal.Event { - internal typealias Transformation = (ReactiveSwift.Observer, Lifetime) -> ReactiveSwift.Observer + internal typealias Transformation = (any ReactiveSwift.Observer, Lifetime) -> (any ReactiveSwift.Observer) - internal static func filter(_ isIncluded: @escaping (Value) -> Bool) -> Transformation { + internal static func filter(_ isIncluded: @escaping @Sendable (Value) -> Bool) -> Transformation { return { downstream, _ in Operators.Filter(downstream: downstream, predicate: isIncluded) } } - internal static func compactMap(_ transform: @escaping (Value) -> U?) -> Transformation { + internal static func compactMap(_ transform: @escaping @Sendable (Value) -> U?) -> Transformation { return { downstream, _ in Operators.CompactMap(downstream: downstream, transform: transform) } } - internal static func map(_ transform: @escaping (Value) -> U) -> Transformation { + internal static func map(_ transform: @escaping @Sendable (Value) -> U) -> Transformation { return { downstream, _ in Operators.Map(downstream: downstream, transform: transform) } } - internal static func mapError(_ transform: @escaping (Error) -> E) -> Transformation { + internal static func mapError(_ transform: @escaping @Sendable (Error) -> E) -> Transformation { return { downstream, _ in Operators.MapError(downstream: downstream, transform: transform) } @@ -244,13 +244,13 @@ extension Signal.Event { } } - internal static func attemptMap(_ transform: @escaping (Value) -> Result) -> Transformation { + internal static func attemptMap(_ transform: @escaping @Sendable (Value) -> Result) -> Transformation { return { downstream, _ in Operators.AttemptMap(downstream: downstream, transform: transform) } } - internal static func attempt(_ action: @escaping (Value) -> Result<(), Error>) -> Transformation { + internal static func attempt(_ action: @escaping @Sendable (Value) -> Result<(), Error>) -> Transformation { return attemptMap { value -> Result in return action(value).map { _ in value } } @@ -285,13 +285,13 @@ extension Signal.Event { } } - internal static func take(while shouldContinue: @escaping (Value) -> Bool) -> Transformation { + internal static func take(while shouldContinue: @escaping @Sendable (Value) -> Bool) -> Transformation { return { downstream, _ in Operators.TakeWhile(downstream: downstream, shouldContinue: shouldContinue) } } - internal static func take(until shouldContinue: @escaping (Value) -> Bool) -> Transformation { + internal static func take(until shouldContinue: @escaping @Sendable (Value) -> Bool) -> Transformation { return { downstream, _ in Operators.TakeUntil(downstream: downstream, shouldContinue: shouldContinue) } @@ -397,7 +397,7 @@ extension Signal.Event { return scan(into: initialResult) { $0 = nextPartialResult($0, $1) } } - internal static func scanMap(into initialState: State, _ next: @escaping (inout State, Value) -> U) -> Transformation { + internal static func scanMap(into initialState: State, _ next: @escaping @Sendable (inout State, Value) -> U) -> Transformation { return { downstream, _ in Operators.ScanMap(downstream: downstream, initial: initialState, next: next) } diff --git a/Sources/EventLogger.swift b/Sources/EventLogger.swift index 16713d13b..4147c163d 100644 --- a/Sources/EventLogger.swift +++ b/Sources/EventLogger.swift @@ -31,7 +31,7 @@ public func defaultEventLog(identifier: String, event: String, fileName: String, /// A type that represents an event logging function. /// Signature is: -/// - identifier +/// - identifier /// - event /// - fileName /// - functionName @@ -52,14 +52,14 @@ fileprivate struct LogContext { let functionName: String let lineNumber: Int let logger: EventLogger - - func log(_ event: Event) -> ((T) -> Void)? { + + func log(_ event: Event) -> (@Sendable (T) -> Void)? { return event.logIfNeeded(events: self.events) { event in self.logger(self.identifier, event, self.fileName, self.functionName, self.lineNumber) } } - - func log(_ event: Event) -> (() -> Void)? { + + func log(_ event: Event) -> (@Sendable () -> Void)? { return event.logIfNeededNoArg(events: self.events) { event in self.logger(self.identifier, event, self.fileName, self.functionName, self.lineNumber) } @@ -67,7 +67,7 @@ fileprivate struct LogContext { } extension Signal { - /// Logs all events that the receiver sends. By default, it will print to + /// Logs all events that the receiver sends. By default, it will print to /// the standard output. /// /// - parameters: @@ -80,14 +80,21 @@ extension Signal { /// - logger: Logger that logs the events. /// /// - returns: Signal that, when observed, logs the fired events. - public func logEvents(identifier: String = "", events: Set = Set(LoggingEvent.Signal.allCases), fileName: String = #file, functionName: String = #function, lineNumber: Int = #line, logger: @escaping EventLogger = defaultEventLog) -> Signal { + public func logEvents( + identifier: String = "", + events: Set = Set(LoggingEvent.Signal.allCases), + fileName: String = #file, + functionName: String = #function, + lineNumber: Int = #line, + logger: @escaping EventLogger = defaultEventLog + ) -> Signal { let logContext = LogContext(events: events, identifier: identifier, fileName: fileName, functionName: functionName, lineNumber: lineNumber, logger: logger) - + return self.on( failed: logContext.log(.failed), completed: logContext.log(.completed), @@ -100,7 +107,7 @@ extension Signal { } extension SignalProducer { - /// Logs all events that the receiver sends. By default, it will print to + /// Logs all events that the receiver sends. By default, it will print to /// the standard output. /// /// - parameters: @@ -149,14 +156,14 @@ private extension LoggingEventProtocol { // Due to differences in the type checker, this method cannot // overload the generic `logIfNeeded`, or otherwise it would lead to // infinite recursion with Swift 4.0.x. - func logIfNeededNoArg(events: Set, logger: @escaping (String) -> Void) -> (() -> Void)? { + func logIfNeededNoArg(events: Set, logger: @escaping (String) -> Void) -> (@Sendable () -> Void)? { return (self.logIfNeeded(events: events, logger: logger) as ((()) -> Void)?) .map { closure in - { closure(()) } + { @Sendable in closure(()) } } } - - func logIfNeeded(events: Set, logger: @escaping (String) -> Void) -> ((T) -> Void)? { + + func logIfNeeded(events: Set, logger: @escaping (String) -> Void) -> (@Sendable (T) -> Void)? { guard events.contains(self) else { return nil } diff --git a/Sources/Flatten.swift b/Sources/Flatten.swift index 658d790c3..d24c00a05 100644 --- a/Sources/Flatten.swift +++ b/Sources/Flatten.swift @@ -299,14 +299,14 @@ extension SignalProducer where Value: SignalProducerConvertible, Value.Error == extension Signal where Value: Sequence { /// Flattens the `sequence` value sent by `signal`. public func flatten() -> Signal { - return self.flatMap(.merge, SignalProducer.init) + return self.flatMap(.merge) { SignalProducer($0) } } } extension SignalProducer where Value: Sequence { /// Flattens the `sequence` value sent by `signal`. public func flatten() -> SignalProducer { - return self.flatMap(.merge, SignalProducer.init) + return self.flatMap(.merge) { SignalProducer($0) } } } @@ -322,6 +322,7 @@ extension Signal where Value: SignalProducerConvertible, Error == Value.Error { fileprivate func observeConcurrent(_ observer: Signal.Observer, _ limit: UInt, _ lifetime: Lifetime) -> Disposable? { let state = Atomic(ConcurrentFlattenState(limit: limit)) + @Sendable func startNextIfNeeded() { while let producer = state.modify({ $0.dequeue() }) { let producerState = UnsafeAtomicState(.starting) @@ -763,7 +764,7 @@ extension Signal where Value: SignalProducerConvertible, Error == Value.Error { let disposableHandle = relayDisposable.add(innerDisposable) var isWinningSignal = false - innerSignal.observe { event in + innerSignal.observe { @Sendable event in if !isWinningSignal { isWinningSignal = state.modify { state in guard !state.isActivated else { @@ -965,7 +966,7 @@ extension Signal { /// - strategy: Strategy used when flattening signals. /// - transform: A closure that takes a value emitted by `self` and /// returns a signal producer with transformed value. - public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping (Value) -> SignalProducer) -> Signal{ + public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping @Sendable (Value) -> SignalProducer) -> Signal{ return map(transform).flatten(strategy) } @@ -980,7 +981,7 @@ extension Signal { /// - strategy: Strategy used when flattening signals. /// - transform: A closure that takes a value emitted by `self` and /// returns a signal producer with transformed value. - public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping (Value) -> Inner) -> Signal where Inner.Error == Error { + public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping @Sendable (Value) -> Inner) -> Signal where Inner.Error == Error { return flatMap(strategy) { transform($0).producer } } @@ -995,7 +996,7 @@ extension Signal { /// - strategy: Strategy used when flattening signals. /// - transform: A closure that takes a value emitted by `self` and /// returns a signal producer with transformed value. - public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping (Value) -> SignalProducer) -> Signal { + public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping @Sendable (Value) -> SignalProducer) -> Signal { return map(transform).flatten(strategy) } @@ -1010,7 +1011,7 @@ extension Signal { /// - strategy: Strategy used when flattening signals. /// - transform: A closure that takes a value emitted by `self` and /// returns a signal producer with transformed value. - public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping (Value) -> Inner) -> Signal where Inner.Error == Never { + public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping @Sendable (Value) -> Inner) -> Signal where Inner.Error == Never { return flatMap(strategy) { transform($0).producer } } } @@ -1027,7 +1028,7 @@ extension Signal where Error == Never { /// - strategy: Strategy used when flattening signals. /// - transform: A closure that takes a value emitted by `self` and /// returns a signal producer with transformed value. - public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping (Value) -> SignalProducer) -> Signal { + public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping @Sendable (Value) -> SignalProducer) -> Signal { return map(transform).flatten(strategy) } @@ -1042,7 +1043,7 @@ extension Signal where Error == Never { /// - strategy: Strategy used when flattening signals. /// - transform: A closure that takes a value emitted by `self` and /// returns a signal producer with transformed value. - public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping (Value) -> Inner) -> Signal { + public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping @Sendable (Value) -> Inner) -> Signal { return flatMap(strategy) { transform($0).producer } } @@ -1054,7 +1055,7 @@ extension Signal where Error == Never { /// - strategy: Strategy used when flattening signals. /// - transform: A closure that takes a value emitted by `self` and /// returns a signal producer with transformed value. - public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping (Value) -> SignalProducer) -> Signal { + public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping @Sendable (Value) -> SignalProducer) -> Signal { return map(transform).flatten(strategy) } @@ -1066,7 +1067,7 @@ extension Signal where Error == Never { /// - strategy: Strategy used when flattening signals. /// - transform: A closure that takes a value emitted by `self` and /// returns a signal producer with transformed value. - public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping (Value) -> Inner) -> Signal where Inner.Error == Never { + public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping @Sendable (Value) -> Inner) -> Signal where Inner.Error == Never { return flatMap(strategy) { transform($0).producer } } } @@ -1083,7 +1084,7 @@ extension SignalProducer { /// - strategy: Strategy used when flattening signals. /// - transform: A closure that takes a value emitted by `self` and /// returns a signal producer with transformed value. - public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping (Value) -> SignalProducer) -> SignalProducer { + public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping @Sendable (Value) -> SignalProducer) -> SignalProducer { return map(transform).flatten(strategy) } @@ -1098,7 +1099,7 @@ extension SignalProducer { /// - strategy: Strategy used when flattening signals. /// - transform: A closure that takes a value emitted by `self` and /// returns a signal producer with transformed value. - public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping (Value) -> Inner) -> SignalProducer where Inner.Error == Error { + public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping @Sendable (Value) -> Inner) -> SignalProducer where Inner.Error == Error { return flatMap(strategy) { transform($0).producer } } @@ -1113,7 +1114,7 @@ extension SignalProducer { /// - strategy: Strategy used when flattening signals. /// - transform: A closure that takes a value emitted by `self` and /// returns a signal producer with transformed value. - public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping (Value) -> SignalProducer) -> SignalProducer { + public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping @Sendable (Value) -> SignalProducer) -> SignalProducer { return map(transform).flatten(strategy) } @@ -1128,7 +1129,7 @@ extension SignalProducer { /// - strategy: Strategy used when flattening signals. /// - transform: A closure that takes a value emitted by `self` and /// returns a signal producer with transformed value. - public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping (Value) -> Inner) -> SignalProducer where Inner.Error == Never { + public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping @Sendable (Value) -> Inner) -> SignalProducer where Inner.Error == Never { return flatMap(strategy) { transform($0).producer } } } @@ -1142,7 +1143,7 @@ extension SignalProducer where Error == Never { /// - strategy: Strategy used when flattening signals. /// - transform: A closure that takes a value emitted by `self` and /// returns a signal producer with transformed value. - public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping (Value) -> SignalProducer) -> SignalProducer { + public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping @Sendable (Value) -> SignalProducer) -> SignalProducer { return map(transform).flatten(strategy) } @@ -1154,7 +1155,7 @@ extension SignalProducer where Error == Never { /// - strategy: Strategy used when flattening signals. /// - transform: A closure that takes a value emitted by `self` and /// returns a signal producer with transformed value. - public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping (Value) -> Inner) -> SignalProducer where Inner.Error == Error { + public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping @Sendable (Value) -> Inner) -> SignalProducer where Inner.Error == Error { return flatMap(strategy) { transform($0).producer } } @@ -1169,7 +1170,7 @@ extension SignalProducer where Error == Never { /// - strategy: Strategy used when flattening signals. /// - transform: A closure that takes a value emitted by `self` and /// returns a signal producer with transformed value. - public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping (Value) -> SignalProducer) -> SignalProducer { + public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping @Sendable (Value) -> SignalProducer) -> SignalProducer { return map(transform).flatten(strategy) } @@ -1184,7 +1185,7 @@ extension SignalProducer where Error == Never { /// - strategy: Strategy used when flattening signals. /// - transform: A closure that takes a value emitted by `self` and /// returns a signal producer with transformed value. - public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping (Value) -> Inner) -> SignalProducer { + public func flatMap(_ strategy: FlattenStrategy, _ transform: @escaping @Sendable (Value) -> Inner) -> SignalProducer { return flatMap(strategy) { transform($0).producer } } } @@ -1196,7 +1197,7 @@ extension Signal { /// - parameters: /// - transform: A closure that accepts emitted error and returns a signal /// producer with a different type of error. - public func flatMapError(_ transform: @escaping (Error) -> SignalProducer) -> Signal { + public func flatMapError(_ transform: @escaping @Sendable (Error) -> SignalProducer) -> Signal { return Signal { observer, lifetime in lifetime += self.observeFlatMapError(transform, observer, SerialDisposable()) } @@ -1208,7 +1209,7 @@ extension Signal { /// - parameters: /// - transform: A closure that accepts emitted error and returns a signal /// producer with a different type of error. - public func flatMapError(_ transform: @escaping (Error) -> Inner) -> Signal where Inner.Value == Value { + public func flatMapError(_ transform: @escaping @Sendable (Error) -> Inner) -> Signal where Inner.Value == Value { return flatMapError { transform($0).producer } } diff --git a/Sources/FoundationExtensions.swift b/Sources/FoundationExtensions.swift index d0273fa31..a47292ff0 100644 --- a/Sources/FoundationExtensions.swift +++ b/Sources/FoundationExtensions.swift @@ -74,7 +74,7 @@ extension Reactive where Base: URLSession { } } - lifetime.observeEnded(task.cancel) + lifetime.observeEnded { @Sendable in task.cancel() } task.resume() } } diff --git a/Sources/Lifetime.swift b/Sources/Lifetime.swift index 4af0c0a29..8ee2d89a1 100644 --- a/Sources/Lifetime.swift +++ b/Sources/Lifetime.swift @@ -2,7 +2,7 @@ import Foundation /// Represents the lifetime of an object, and provides a hook to observe when /// the object deinitializes. -public final class Lifetime { +public final class Lifetime: Sendable { private let disposables: CompositeDisposable /// A signal that sends a `completed` event when the lifetime ends. @@ -49,7 +49,7 @@ public final class Lifetime { /// - returns: A disposable that detaches `action` from the lifetime, or `nil` /// if `lifetime` has already ended. @discardableResult - public func observeEnded(_ action: @escaping () -> Void) -> Disposable? { + public func observeEnded(_ action: @escaping @Sendable () -> Void) -> Disposable? { return disposables += action } diff --git a/Sources/Observers/AttemptMap.swift b/Sources/Observers/AttemptMap.swift index 922a3c819..446385e20 100644 --- a/Sources/Observers/AttemptMap.swift +++ b/Sources/Observers/AttemptMap.swift @@ -1,14 +1,14 @@ extension Operators { - internal final class AttemptMap: Observer { - let downstream: Observer - let transform: (InputValue) -> Result + internal final class AttemptMap: Observer { + let downstream: any Observer + let transform: @Sendable (InputValue) -> Result - init(downstream: Observer, transform: @escaping (InputValue) -> Result) { + init(downstream: some Observer, transform: @escaping @Sendable (InputValue) -> Result) { self.downstream = downstream self.transform = transform } - override func receive(_ value: InputValue) { + func receive(_ value: InputValue) { switch transform(value) { case let .success(value): downstream.receive(value) @@ -17,7 +17,7 @@ extension Operators { } } - override func terminate(_ termination: Termination) { + func terminate(_ termination: Termination) { downstream.terminate(termination) } } diff --git a/Sources/Observers/Collect.swift b/Sources/Observers/Collect.swift index 7704c1c1e..cb9177d0b 100644 --- a/Sources/Observers/Collect.swift +++ b/Sources/Observers/Collect.swift @@ -1,12 +1,12 @@ extension Operators { - internal final class Collect: Observer { - let downstream: Observer<[Value], Error> - let modify: (_ collected: inout [Value], _ latest: Value) -> [Value]? + internal final class Collect: Observer, @unchecked Sendable { + let downstream: any Observer<[Value], Error> + let modify: @Sendable (_ collected: inout [Value], _ latest: Value) -> [Value]? private var values: [Value] = [] private var hasReceivedValues = false - convenience init(downstream: Observer<[Value], Error>, shouldEmit: @escaping (_ collected: [Value], _ latest: Value) -> Bool) { + convenience init(downstream: some Observer<[Value], Error>, shouldEmit: @escaping (_ collected: [Value], _ latest: Value) -> Bool) { self.init(downstream: downstream, modify: { collected, latest in if shouldEmit(collected, latest) { defer { collected = [latest] } @@ -18,7 +18,7 @@ extension Operators { }) } - convenience init(downstream: Observer<[Value], Error>, shouldEmit: @escaping (_ collected: [Value]) -> Bool) { + convenience init(downstream: some Observer<[Value], Error>, shouldEmit: @escaping (_ collected: [Value]) -> Bool) { self.init(downstream: downstream, modify: { collected, latest in collected.append(latest) @@ -31,12 +31,12 @@ extension Operators { }) } - private init(downstream: Observer<[Value], Error>, modify: @escaping (_ collected: inout [Value], _ latest: Value) -> [Value]?) { + private init(downstream: some Observer<[Value], Error>, modify: @escaping @Sendable (_ collected: inout [Value], _ latest: Value) -> [Value]?) { self.downstream = downstream self.modify = modify } - override func receive(_ value: Value) { + func receive(_ value: Value) { if let outgoing = modify(&values, value) { downstream.receive(outgoing) } @@ -46,7 +46,7 @@ extension Operators { } } - override func terminate(_ termination: Termination) { + func terminate(_ termination: Termination) { if case .completed = termination { if !values.isEmpty { downstream.receive(values) diff --git a/Sources/Observers/CollectEvery.swift b/Sources/Observers/CollectEvery.swift index 91d61ea8c..4515cf53b 100644 --- a/Sources/Observers/CollectEvery.swift +++ b/Sources/Observers/CollectEvery.swift @@ -10,7 +10,7 @@ extension Operators { private let timerDisposable = SerialDisposable() init( - downstream: Observer<[Value], Error>, + downstream: some Observer<[Value], Error>, downstreamLifetime: Lifetime, target: DateScheduler, interval: DispatchTimeInterval, diff --git a/Sources/Observers/CombinePrevious.swift b/Sources/Observers/CombinePrevious.swift index 70fa09cb5..e5fc3679b 100644 --- a/Sources/Observers/CombinePrevious.swift +++ b/Sources/Observers/CombinePrevious.swift @@ -1,14 +1,14 @@ extension Operators { - internal final class CombinePrevious: Observer { - let downstream: Observer<(Value, Value), Error> + internal final class CombinePrevious: Observer, @unchecked Sendable { + let downstream: any Observer<(Value, Value), Error> var previous: Value? - init(downstream: Observer<(Value, Value), Error>, initial: Value?) { + init(downstream: some Observer<(Value, Value), Error>, initial: Value?) { self.downstream = downstream self.previous = initial } - override func receive(_ value: Value) { + func receive(_ value: Value) { if let previous = previous { downstream.receive((previous, value)) } @@ -16,7 +16,7 @@ extension Operators { previous = value } - override func terminate(_ termination: Termination) { + func terminate(_ termination: Termination) { downstream.terminate(termination) } } diff --git a/Sources/Observers/CompactMap.swift b/Sources/Observers/CompactMap.swift index d51c448e4..99c162cad 100644 --- a/Sources/Observers/CompactMap.swift +++ b/Sources/Observers/CompactMap.swift @@ -1,20 +1,20 @@ extension Operators { - internal final class CompactMap: Observer { - let downstream: Observer - let transform: (InputValue) -> OutputValue? + internal final class CompactMap: Observer { + let downstream: any Observer + let transform: @Sendable (InputValue) -> OutputValue? - init(downstream: Observer, transform: @escaping (InputValue) -> OutputValue?) { + init(downstream: some Observer, transform: @escaping @Sendable (InputValue) -> OutputValue?) { self.downstream = downstream self.transform = transform } - override func receive(_ value: InputValue) { + func receive(_ value: InputValue) { if let output = transform(value) { downstream.receive(output) } } - override func terminate(_ termination: Termination) { + func terminate(_ termination: Termination) { downstream.terminate(termination) } } diff --git a/Sources/Observers/Debounce.swift b/Sources/Observers/Debounce.swift index bfe0ef2a3..05b244aed 100644 --- a/Sources/Observers/Debounce.swift +++ b/Sources/Observers/Debounce.swift @@ -10,7 +10,7 @@ extension Operators { private let schedulerDisposable = SerialDisposable() init( - downstream: Observer, + downstream: some Observer, downstreamLifetime: Lifetime, target: DateScheduler, interval: TimeInterval, diff --git a/Sources/Observers/Delay.swift b/Sources/Observers/Delay.swift index 1c4eb42c0..82ed0f808 100644 --- a/Sources/Observers/Delay.swift +++ b/Sources/Observers/Delay.swift @@ -6,7 +6,7 @@ extension Operators { let targetWithClock: DateScheduler init( - downstream: Observer, + downstream: some Observer, downstreamLifetime: Lifetime, target: DateScheduler, interval: TimeInterval diff --git a/Sources/Observers/Dematerialize.swift b/Sources/Observers/Dematerialize.swift index 6c8c15e33..a9f33fee8 100644 --- a/Sources/Observers/Dematerialize.swift +++ b/Sources/Observers/Dematerialize.swift @@ -1,12 +1,12 @@ extension Operators { - internal final class Dematerialize: Observer where Event: EventProtocol { - let downstream: Observer + internal final class Dematerialize: Observer where Event: EventProtocol { + let downstream: any Observer - init(downstream: Observer) { + init(downstream: some Observer) { self.downstream = downstream } - override func receive(_ event: Event) { + func receive(_ event: Event) { switch event.event { case let .value(value): downstream.receive(value) @@ -19,7 +19,7 @@ extension Operators { } } - override func terminate(_ termination: Termination) { + func terminate(_ termination: Termination) { switch termination { case .completed: downstream.terminate(.completed) diff --git a/Sources/Observers/DematerializeResults.swift b/Sources/Observers/DematerializeResults.swift index 986582b99..2af721460 100644 --- a/Sources/Observers/DematerializeResults.swift +++ b/Sources/Observers/DematerializeResults.swift @@ -1,12 +1,12 @@ extension Operators { - internal final class DematerializeResults: Observer where Result: ResultProtocol { - let downstream: Observer + internal final class DematerializeResults: Observer where Result: ResultProtocol { + let downstream: any Observer - init(downstream: Observer) { + init(downstream: some Observer) { self.downstream = downstream } - override func receive(_ value: Result) { + func receive(_ value: Result) { switch value.result { case let .success(value): downstream.receive(value) @@ -15,7 +15,7 @@ extension Operators { } } - override func terminate(_ termination: Termination) { + func terminate(_ termination: Termination) { switch termination { case .completed: downstream.terminate(.completed) diff --git a/Sources/Observers/Filter.swift b/Sources/Observers/Filter.swift index 060164d73..89cad6942 100644 --- a/Sources/Observers/Filter.swift +++ b/Sources/Observers/Filter.swift @@ -1,20 +1,20 @@ extension Operators { - internal final class Filter: Observer { - let downstream: Observer - let predicate: (Value) -> Bool + internal final class Filter: Observer { + let downstream: any Observer + let predicate: @Sendable (Value) -> Bool - init(downstream: Observer, predicate: @escaping (Value) -> Bool) { + init(downstream: some Observer, predicate: @escaping @Sendable (Value) -> Bool) { self.downstream = downstream self.predicate = predicate } - override func receive(_ value: Value) { + func receive(_ value: Value) { if predicate(value) { downstream.receive(value) } } - override func terminate(_ termination: Termination) { + func terminate(_ termination: Termination) { downstream.terminate(termination) } } diff --git a/Sources/Observers/LazyMap.swift b/Sources/Observers/LazyMap.swift index 5cab84648..c1f624b78 100644 --- a/Sources/Observers/LazyMap.swift +++ b/Sources/Observers/LazyMap.swift @@ -2,10 +2,10 @@ extension Operators { internal final class LazyMap: UnaryAsyncOperator { let transform: (Value) -> NewValue let box = Atomic(nil) - let valueDisposable = SerialDisposable() + let valueDisposable = SerialDisposable() init( - downstream: Observer, + downstream: some Observer, downstreamLifetime: Lifetime, target: Scheduler, transform: @escaping (Value) -> NewValue diff --git a/Sources/Observers/Map.swift b/Sources/Observers/Map.swift index 58ac26173..2d63c84f1 100644 --- a/Sources/Observers/Map.swift +++ b/Sources/Observers/Map.swift @@ -1,18 +1,18 @@ extension Operators { - internal final class Map: Observer { - let downstream: Observer - let transform: (InputValue) -> OutputValue + internal final class Map: Observer { + let downstream: any Observer + let transform: @Sendable (InputValue) -> OutputValue - init(downstream: Observer, transform: @escaping (InputValue) -> OutputValue) { + init(downstream: some Observer, transform: @escaping @Sendable (InputValue) -> OutputValue) { self.downstream = downstream self.transform = transform } - override func receive(_ value: InputValue) { + func receive(_ value: InputValue) { downstream.receive(transform(value)) } - override func terminate(_ termination: Termination) { + func terminate(_ termination: Termination) { downstream.terminate(termination) } } diff --git a/Sources/Observers/MapError.swift b/Sources/Observers/MapError.swift index c8dc9a95b..5c9952a21 100644 --- a/Sources/Observers/MapError.swift +++ b/Sources/Observers/MapError.swift @@ -1,18 +1,18 @@ extension Operators { - internal final class MapError: Observer { - let downstream: Observer - let transform: (InputError) -> OutputError + internal final class MapError: Observer { + let downstream: any Observer + let transform: @Sendable (InputError) -> OutputError - init(downstream: Observer, transform: @escaping (InputError) -> OutputError) { + init(downstream: some Observer, transform: @escaping @Sendable (InputError) -> OutputError) { self.downstream = downstream self.transform = transform } - override func receive(_ value: Value) { + func receive(_ value: Value) { downstream.receive(value) } - override func terminate(_ termination: Termination) { + func terminate(_ termination: Termination) { switch termination { case .completed: downstream.terminate(.completed) diff --git a/Sources/Observers/Materialize.swift b/Sources/Observers/Materialize.swift index 03eff9a0b..12f710f91 100644 --- a/Sources/Observers/Materialize.swift +++ b/Sources/Observers/Materialize.swift @@ -1,16 +1,16 @@ extension Operators { - internal final class Materialize: Observer { - let downstream: Observer.Event, Never> + internal final class Materialize: Observer { + let downstream: any Observer.Event, Never> - init(downstream: Observer.Event, Never>) { + init(downstream: some Observer.Event, Never>) { self.downstream = downstream } - override func receive(_ value: Value) { + func receive(_ value: Value) { downstream.receive(.value(value)) } - override func terminate(_ termination: Termination) { + func terminate(_ termination: Termination) { downstream.receive(Signal.Event(termination)) switch termination { diff --git a/Sources/Observers/MaterializeAsResult.swift b/Sources/Observers/MaterializeAsResult.swift index 27ae14ce6..14d8b16b7 100644 --- a/Sources/Observers/MaterializeAsResult.swift +++ b/Sources/Observers/MaterializeAsResult.swift @@ -1,16 +1,16 @@ extension Operators { - internal final class MaterializeAsResult: Observer { - let downstream: Observer, Never> + internal final class MaterializeAsResult: Observer { + let downstream: any Observer, Never> - init(downstream: Observer, Never>) { + init(downstream: some Observer, Never>) { self.downstream = downstream } - override func receive(_ value: Value) { + func receive(_ value: Value) { downstream.receive(.success(value)) } - override func terminate(_ termination: Termination) { + func terminate(_ termination: Termination) { switch termination { case .completed: downstream.terminate(.completed) diff --git a/Sources/Observers/Observer.swift b/Sources/Observers/Observer.swift index 449e1dbcc..37a5064c4 100644 --- a/Sources/Observers/Observer.swift +++ b/Sources/Observers/Observer.swift @@ -1,8 +1,9 @@ -open class Observer { - public init() {} +public protocol Observer: Sendable { + associatedtype Value + associatedtype Error: Swift.Error - open func receive(_ value: Value) { fatalError() } - open func terminate(_ termination: Termination) { fatalError() } + func receive(_ value: Value) + func terminate(_ termination: Termination) } extension Observer { @@ -14,6 +15,7 @@ extension Observer { process(event) } + @Sendable fileprivate func process(_ event: Signal.Event) { switch event { case let .value(value): diff --git a/Sources/Observers/Reduce.swift b/Sources/Observers/Reduce.swift index 3c66dfdff..626ab9655 100644 --- a/Sources/Observers/Reduce.swift +++ b/Sources/Observers/Reduce.swift @@ -1,20 +1,20 @@ extension Operators { - internal final class Reduce: Observer { - let downstream: Observer + internal final class Reduce: Observer, @unchecked Sendable { + let downstream: any Observer let nextPartialResult: (inout Result, Value) -> Void var accumulator: Result - init(downstream: Observer, initial: Result, nextPartialResult: @escaping (inout Result, Value) -> Void) { + init(downstream: some Observer, initial: Result, nextPartialResult: @escaping (inout Result, Value) -> Void) { self.downstream = downstream self.accumulator = initial self.nextPartialResult = nextPartialResult } - override func receive(_ value: Value) { + func receive(_ value: Value) { nextPartialResult(&accumulator, value) } - override func terminate(_ termination: Termination) { + func terminate(_ termination: Termination) { if case .completed = termination { downstream.receive(accumulator) } diff --git a/Sources/Observers/ScanMap.swift b/Sources/Observers/ScanMap.swift index c0f4e6da1..adb0a6c79 100644 --- a/Sources/Observers/ScanMap.swift +++ b/Sources/Observers/ScanMap.swift @@ -1,21 +1,21 @@ extension Operators { - internal final class ScanMap: Observer { - let downstream: Observer - let next: (inout State, Value) -> Result + internal final class ScanMap: Observer, @unchecked Sendable { + let downstream: any Observer + let next: @Sendable (inout State, Value) -> Result var accumulator: State - init(downstream: Observer, initial: State, next: @escaping (inout State, Value) -> Result) { + init(downstream: some Observer, initial: State, next: @escaping @Sendable (inout State, Value) -> Result) { self.downstream = downstream self.accumulator = initial self.next = next } - override func receive(_ value: Value) { + func receive(_ value: Value) { let result = next(&accumulator, value) downstream.receive(result) } - override func terminate(_ termination: Termination) { + func terminate(_ termination: Termination) { downstream.terminate(termination) } } diff --git a/Sources/Observers/SkipFirst.swift b/Sources/Observers/SkipFirst.swift index 63c335375..da611bb1e 100644 --- a/Sources/Observers/SkipFirst.swift +++ b/Sources/Observers/SkipFirst.swift @@ -1,17 +1,17 @@ extension Operators { - internal final class SkipFirst: Observer { - let downstream: Observer + internal final class SkipFirst: Observer, @unchecked Sendable { + let downstream: any Observer let count: Int var skipped: Int = 0 - init(downstream: Observer, count: Int) { + init(downstream: some Observer, count: Int) { precondition(count >= 1) self.downstream = downstream self.count = count } - override func receive(_ value: Value) { + func receive(_ value: Value) { if skipped < count { skipped += 1 } else { @@ -19,7 +19,7 @@ extension Operators { } } - override func terminate(_ termination: Termination) { + func terminate(_ termination: Termination) { downstream.terminate(termination) } } diff --git a/Sources/Observers/SkipRepeats.swift b/Sources/Observers/SkipRepeats.swift index 0aba82357..0f9cb3fcb 100644 --- a/Sources/Observers/SkipRepeats.swift +++ b/Sources/Observers/SkipRepeats.swift @@ -1,16 +1,16 @@ extension Operators { - internal final class SkipRepeats: Observer { - let downstream: Observer + internal final class SkipRepeats: Observer, @unchecked Sendable { + let downstream: any Observer let isEquivalent: (Value, Value) -> Bool var previous: Value? = nil - init(downstream: Observer, isEquivalent: @escaping (Value, Value) -> Bool) { + init(downstream: some Observer, isEquivalent: @escaping (Value, Value) -> Bool) { self.downstream = downstream self.isEquivalent = isEquivalent } - override func receive(_ value: Value) { + func receive(_ value: Value) { let isRepeating = previous.map { isEquivalent($0, value) } ?? false previous = value @@ -19,7 +19,7 @@ extension Operators { } } - override func terminate(_ termination: Termination) { + func terminate(_ termination: Termination) { downstream.terminate(termination) } } diff --git a/Sources/Observers/SkipWhile.swift b/Sources/Observers/SkipWhile.swift index e928ffefe..ba7780f82 100644 --- a/Sources/Observers/SkipWhile.swift +++ b/Sources/Observers/SkipWhile.swift @@ -1,15 +1,15 @@ extension Operators { - internal final class SkipWhile: Observer { - let downstream: Observer + internal final class SkipWhile: Observer, @unchecked Sendable { + let downstream: any Observer let shouldContinueToSkip: (Value) -> Bool var isSkipping = true - init(downstream: Observer, shouldContinueToSkip: @escaping (Value) -> Bool) { + init(downstream: some Observer, shouldContinueToSkip: @escaping (Value) -> Bool) { self.downstream = downstream self.shouldContinueToSkip = shouldContinueToSkip } - override func receive(_ value: Value) { + func receive(_ value: Value) { isSkipping = isSkipping && shouldContinueToSkip(value) if !isSkipping { @@ -17,7 +17,7 @@ extension Operators { } } - override func terminate(_ termination: Termination) { + func terminate(_ termination: Termination) { downstream.terminate(termination) } } diff --git a/Sources/Observers/TakeFirst.swift b/Sources/Observers/TakeFirst.swift index a39075940..40d23ef73 100644 --- a/Sources/Observers/TakeFirst.swift +++ b/Sources/Observers/TakeFirst.swift @@ -1,17 +1,17 @@ extension Operators { - internal final class TakeFirst: Observer { - let downstream: Observer + internal final class TakeFirst: Observer, @unchecked Sendable { + let downstream: any Observer let count: Int var taken: Int = 0 - init(downstream: Observer, count: Int) { + init(downstream: some Observer, count: Int) { precondition(count >= 1) self.downstream = downstream self.count = count } - override func receive(_ value: Value) { + func receive(_ value: Value) { if taken < count { taken += 1 downstream.receive(value) @@ -22,7 +22,7 @@ extension Operators { } } - override func terminate(_ termination: Termination) { + func terminate(_ termination: Termination) { downstream.terminate(termination) } } diff --git a/Sources/Observers/TakeLast.swift b/Sources/Observers/TakeLast.swift index 8e26170cb..757292fdc 100644 --- a/Sources/Observers/TakeLast.swift +++ b/Sources/Observers/TakeLast.swift @@ -1,10 +1,10 @@ extension Operators { - internal final class TakeLast: Observer { - let downstream: Observer + internal final class TakeLast: Observer, @unchecked Sendable { + let downstream: any Observer let count: Int var buffer: [Value] = [] - init(downstream: Observer, count: Int) { + init(downstream: some Observer, count: Int) { precondition(count >= 1) self.downstream = downstream @@ -13,7 +13,7 @@ extension Operators { buffer.reserveCapacity(count) } - override func receive(_ value: Value) { + func receive(_ value: Value) { // To avoid exceeding the reserved capacity of the buffer, // we remove then add. Remove elements until we have room to // add one more. @@ -24,7 +24,7 @@ extension Operators { buffer.append(value) } - override func terminate(_ termination: Termination) { + func terminate(_ termination: Termination) { if case .completed = termination { buffer.forEach(downstream.receive) buffer = [] diff --git a/Sources/Observers/TakeUntil.swift b/Sources/Observers/TakeUntil.swift index 4507c1a54..e67c98d32 100644 --- a/Sources/Observers/TakeUntil.swift +++ b/Sources/Observers/TakeUntil.swift @@ -1,14 +1,14 @@ extension Operators { - internal final class TakeUntil: Observer { - let downstream: Observer - let shouldContinue: (Value) -> Bool + internal final class TakeUntil: Observer { + let downstream: any Observer + let shouldContinue: @Sendable (Value) -> Bool - init(downstream: Observer, shouldContinue: @escaping (Value) -> Bool) { + init(downstream: some Observer, shouldContinue: @escaping @Sendable (Value) -> Bool) { self.downstream = downstream self.shouldContinue = shouldContinue } - override func receive(_ value: Value) { + func receive(_ value: Value) { downstream.receive(value) if !shouldContinue(value) { @@ -16,7 +16,7 @@ extension Operators { } } - override func terminate(_ termination: Termination) { + func terminate(_ termination: Termination) { downstream.terminate(termination) } } diff --git a/Sources/Observers/TakeWhile.swift b/Sources/Observers/TakeWhile.swift index 20262f2cf..11acd1337 100644 --- a/Sources/Observers/TakeWhile.swift +++ b/Sources/Observers/TakeWhile.swift @@ -1,14 +1,14 @@ extension Operators { - internal final class TakeWhile: Observer { - let downstream: Observer - let shouldContinue: (Value) -> Bool + internal final class TakeWhile: Observer, @unchecked Sendable { + let downstream: any Observer + let shouldContinue: @Sendable (Value) -> Bool - init(downstream: Observer, shouldContinue: @escaping (Value) -> Bool) { + init(downstream: some Observer, shouldContinue: @Sendable @escaping (Value) -> Bool) { self.downstream = downstream self.shouldContinue = shouldContinue } - override func receive(_ value: Value) { + func receive(_ value: Value) { if !shouldContinue(value) { downstream.terminate(.completed) } else { @@ -16,7 +16,7 @@ extension Operators { } } - override func terminate(_ termination: Termination) { + func terminate(_ termination: Termination) { downstream.terminate(termination) } } diff --git a/Sources/Observers/Throttle.swift b/Sources/Observers/Throttle.swift index a70f4bfdf..4d004fbc5 100644 --- a/Sources/Observers/Throttle.swift +++ b/Sources/Observers/Throttle.swift @@ -9,7 +9,7 @@ extension Operators { private let schedulerDisposable = SerialDisposable() init( - downstream: Observer, + downstream: some Observer, downstreamLifetime: Lifetime, target: DateScheduler, interval: TimeInterval diff --git a/Sources/Observers/UnaryAsyncOperator.swift b/Sources/Observers/UnaryAsyncOperator.swift index 6821e5f22..8ca32ffde 100644 --- a/Sources/Observers/UnaryAsyncOperator.swift +++ b/Sources/Observers/UnaryAsyncOperator.swift @@ -1,4 +1,4 @@ -internal class UnaryAsyncOperator: Observer { +internal class UnaryAsyncOperator: Observer, @unchecked Sendable { let downstreamLifetime: Lifetime let target: Scheduler @@ -9,11 +9,11 @@ internal class UnaryAsyncOperator: var isActive: Bool { state.is(.active) } // Direct access is discouraged for subclasses by keeping this private. - private let downstream: Observer + private let downstream: any Observer private let state: UnsafeAtomicState public init( - downstream: Observer, + downstream: some Observer, downstreamLifetime: Lifetime, target: Scheduler ) { @@ -22,8 +22,6 @@ internal class UnaryAsyncOperator: self.target = target self.state = UnsafeAtomicState(.active) - super.init() - downstreamLifetime.observeEnded { if self.state.tryTransition(from: .active, to: .terminated) { target.schedule { @@ -37,7 +35,7 @@ internal class UnaryAsyncOperator: state.deinitialize() } - open override func receive(_ value: InputValue) { fatalError() } + open func receive(_ value: InputValue) { fatalError() } /// Send a value to the downstream without any implicit scheduling on `target`. /// @@ -59,7 +57,7 @@ internal class UnaryAsyncOperator: } } - open override func terminate(_ termination: Termination) { + open func terminate(_ termination: Termination) { // The atomic transition here must happen **after** we hop onto the target scheduler. This is to preserve the timing // behaviour observed in previous versions of ReactiveSwift. diff --git a/Sources/Observers/UniqueValues.swift b/Sources/Observers/UniqueValues.swift index 48045587d..1e72c4117 100644 --- a/Sources/Observers/UniqueValues.swift +++ b/Sources/Observers/UniqueValues.swift @@ -1,16 +1,16 @@ extension Operators { - internal final class UniqueValues: Observer { - let downstream: Observer + internal final class UniqueValues: Observer, @unchecked Sendable { + let downstream: any Observer let extract: (Value) -> Identity var seenIdentities: Set = [] - init(downstream: Observer, extract: @escaping (Value) -> Identity) { + init(downstream: some Observer, extract: @escaping (Value) -> Identity) { self.downstream = downstream self.extract = extract } - override func receive(_ value: Value) { + func receive(_ value: Value) { let identity = extract(value) let (inserted, _) = seenIdentities.insert(identity) @@ -19,7 +19,7 @@ extension Operators { } } - override func terminate(_ termination: Termination) { + func terminate(_ termination: Termination) { downstream.terminate(termination) } } diff --git a/Sources/Optional.swift b/Sources/Optional.swift index d30ba0fcf..f7f9155e6 100644 --- a/Sources/Optional.swift +++ b/Sources/Optional.swift @@ -30,7 +30,7 @@ extension Optional: OptionalProtocol { extension Signal { /// Turns each value into an Optional. internal func optionalize() -> Signal { - return map(Optional.init) + return map { @Sendable in Optional($0) } } } diff --git a/Sources/Property.swift b/Sources/Property.swift index 5708aebfe..1d347b8ce 100644 --- a/Sources/Property.swift +++ b/Sources/Property.swift @@ -12,7 +12,7 @@ import Glibc /// /// Only classes can conform to this protocol, because having a signal /// for changes over time implies the origin must have a unique identity. -public protocol PropertyProtocol: AnyObject, BindingSource { +public protocol PropertyProtocol: AnyObject, BindingSource { /// The current value of the property. var value: Value { get } @@ -36,7 +36,7 @@ public protocol PropertyProtocol: AnyObject, BindingSource { } /// Represents an observable property that can be mutated directly. -public protocol MutablePropertyProtocol: PropertyProtocol, BindingTargetProvider { +public protocol MutablePropertyProtocol: PropertyProtocol, BindingTargetProvider { /// The current value of the property. var value: Value { get set } diff --git a/Sources/Scheduler.swift b/Sources/Scheduler.swift index e92d8b7dc..7c2e93677 100644 --- a/Sources/Scheduler.swift +++ b/Sources/Scheduler.swift @@ -14,7 +14,7 @@ import Foundation #endif /// Represents a serial queue of work items. -public protocol Scheduler: AnyObject { +public protocol Scheduler: AnyObject, Sendable { /// Enqueues an action on the scheduler. /// /// When the work is executed depends on the scheduler in use. @@ -205,7 +205,7 @@ private final class DispatchSourceTimerWrapper: Hashable { } /// A scheduler backed by a serial GCD queue. -public final class QueueScheduler: DateScheduler { +public final class QueueScheduler: DateScheduler, @unchecked Sendable { /// A singleton `QueueScheduler` that always targets the main thread's GCD /// queue. /// @@ -389,7 +389,7 @@ public final class QueueScheduler: DateScheduler { } /// A scheduler that implements virtualized time, for use in testing. -public final class TestScheduler: DateScheduler { +public final class TestScheduler: DateScheduler, @unchecked Sendable { private final class ScheduledAction { let date: Date let action: () -> Void diff --git a/Sources/Signal.Observer.swift b/Sources/Signal.Observer.swift index 0b21479c8..b98807331 100644 --- a/Sources/Signal.Observer.swift +++ b/Sources/Signal.Observer.swift @@ -9,8 +9,8 @@ extension Signal { /// An Observer is a simple wrapper around a function which can receive Events /// (typically from a Signal). - public final class Observer: ReactiveSwift.Observer { - public typealias Action = (Event) -> Void + public final class Observer: NSObject, ReactiveSwift.Observer, Sendable { + public typealias Action = @Sendable (Event) -> Void private let _send: Action /// Whether the observer should send an `interrupted` event as it deinitializes. @@ -91,11 +91,11 @@ extension Signal { } } - public override func receive(_ value: Value) { + public func receive(_ value: Value) { send(value: value) } - public override func terminate(_ termination: Termination) { + public func terminate(_ termination: Termination) { switch termination { case let .failed(error): send(error: error) @@ -128,11 +128,13 @@ extension Signal { } /// Puts a `completed` event into `self`. + @Sendable public func sendCompleted() { _send(.completed) } /// Puts an `interrupted` event into `self`. + @Sendable public func sendInterrupted() { _send(.interrupted) } diff --git a/Sources/Signal.swift b/Sources/Signal.swift index 660a625c0..e9c5564df 100644 --- a/Sources/Signal.swift +++ b/Sources/Signal.swift @@ -17,7 +17,7 @@ import Dispatch /// A Signal is kept alive until either of the following happens: /// 1. its input observer receives a terminating event; or /// 2. it has no active observers, and is not being retained. -public final class Signal { +public final class Signal: Sendable { /// The `Signal` core which manages the event stream. /// /// A `Signal` is the externally retained shell of the `Signal` core. The separation @@ -43,14 +43,9 @@ public final class Signal { /// | Other observers | | Other observers | /// ------------------- ------------------- /// ``` - private let core: CoreBase + private let core: any SignalCoreBase - private class CoreBase { - func observe(_ observer: Observer) -> Disposable? { fatalError() } - func signalDidDeinitialize() { fatalError() } - } - - private final class Core: CoreBase { + private final class Core: SignalCoreBase, @unchecked Sendable { /// The disposable associated with the signal. /// /// Disposing of `disposable` is assumed to remove the generator @@ -67,21 +62,20 @@ public final class Signal { /// Used to ensure that events are serialized during delivery to observers. private let sendLock: SendLock - fileprivate init(_ generator: (Observer, Lifetime) -> Void) { + fileprivate init(_ generator: @Sendable (Observer, Lifetime) -> Void) { state = .alive(Bag(), hasDeinitialized: false) stateLock = Lock.make() sendLock = SendLock.make() disposable = CompositeDisposable() - super.init() - // The generator observer retains the `Signal` core. generator(Observer(action: self.send, interruptsOnDeinit: true), Lifetime(disposable)) } @_specialize(kind: partial, where SendLock == Lock) @_specialize(kind: partial, where SendLock == NoLock) + @Sendable private func send(_ event: Event) { if event.isTerminating { // Recursive events are disallowed for `value` events, but are permitted @@ -160,7 +154,7 @@ public final class Signal { /// /// - returns: A `Disposable` which can be used to disconnect the observer, /// or `nil` if the signal has already terminated. - fileprivate override func observe(_ observer: Observer) -> Disposable? { + fileprivate func observe(_ observer: Observer) -> Disposable? { var token: Bag.Token? stateLock.lock() @@ -277,7 +271,7 @@ public final class Signal { } /// Acknowledge the deinitialization of the `Signal`. - fileprivate override func signalDidDeinitialize() { + fileprivate func signalDidDeinitialize() { stateLock.lock() // Mark the `Signal` has now deinitialized. @@ -294,7 +288,7 @@ public final class Signal { } } - private init(_ core: CoreBase) { + private init(_ core: any SignalCoreBase) { self.core = core } @@ -311,7 +305,7 @@ public final class Signal { /// - parameters: /// - generator: A closure that accepts an implicitly created observer /// that will act as an event emitter for the signal. - public convenience init(_ generator: (Observer, Lifetime) -> Void) { + public convenience init(_ generator: @Sendable (Observer, Lifetime) -> Void) { self.init(Core(generator)) } @@ -332,7 +326,7 @@ public final class Signal { /// - parameters: /// - generator: A closure that accepts an implicitly created observer /// that will act as an event emitter for the signal. - public static func unserialized(_ generator: (Observer, Lifetime) -> Void) -> Signal { + public static func unserialized(_ generator: @Sendable (Observer, Lifetime) -> Void) -> Signal { self.init(Core(generator)) } @@ -357,12 +351,12 @@ public final class Signal { /// - parameters: /// - generator: A closure that accepts an implicitly created observer /// that will act as an event emitter for the signal. - public static func reentrantUnserialized(_ generator: (Observer, Lifetime) -> Void) -> Signal { + public static func reentrantUnserialized(_ generator: @Sendable (Observer, Lifetime) -> Void) -> Signal { self.init(Core { innerObserver, lifetime in var eventQueue: [Event] = [] var isInLoop = false - let wrappedObserver = Observer { outerEvent in + let wrappedObserver = Observer { @Sendable outerEvent in if !isInLoop { isInLoop = true innerObserver.send(outerEvent) @@ -407,7 +401,7 @@ public final class Signal { /// /// The Swift compiler has also an optimization for enums with payloads that are /// all reference counted, and at most one no-payload case. - private enum State { + private enum State: Sendable { // `TerminationKind` is constantly pointer-size large to keep `Signal.Core` // allocation size independent of the actual `Value` and `Error` types. enum TerminationKind { @@ -455,6 +449,14 @@ public final class Signal { } } +private protocol SignalCoreBase: Sendable { + associatedtype Value + associatedtype Error: Swift.Error + + func observe(_ observer: Signal.Observer) -> Disposable? + func signalDidDeinitialize() +} + extension Signal { /// A Signal that never sends any events to its observers. public static var never: Signal { @@ -522,7 +524,7 @@ extension Signal { public static func unserializedPipe(disposable: Disposable? = nil) -> (output: Signal, input: Observer) { var observer: Observer! - let signal = unserialized { innerObserver, lifetime in + let signal = unserialized { @Sendable innerObserver, lifetime in observer = innerObserver lifetime += disposable } @@ -566,7 +568,7 @@ extension Signal { } } -public protocol SignalProtocol: AnyObject { +public protocol SignalProtocol: AnyObject { /// The type of values being sent by `self`. associatedtype Value @@ -705,7 +707,7 @@ extension Signal { /// returns a new value. /// /// - returns: A signal that will send new values. - public func map(_ transform: @escaping (Value) -> U) -> Signal { + public func map(_ transform: @escaping @Sendable (Value) -> U) -> Signal { return flatMapEvent(Signal.Event.map(transform)) } @@ -736,7 +738,7 @@ extension Signal { /// a new type of error object. /// /// - returns: A signal that will send new type of errors. - public func mapError(_ transform: @escaping (Error) -> F) -> Signal { + public func mapError(_ transform: @escaping @Sendable (Error) -> F) -> Signal { return flatMapEvent(Signal.Event.mapError(transform)) } @@ -766,7 +768,7 @@ extension Signal { /// included in the returned `Signal`. /// /// - returns: A signal that forwards the values passing the given closure. - public func filter(_ isIncluded: @escaping (Value) -> Bool) -> Signal { + public func filter(_ isIncluded: @escaping @Sendable (Value) -> Bool) -> Signal { return flatMapEvent(Signal.Event.filter(isIncluded)) } @@ -776,7 +778,7 @@ extension Signal { /// returns a new optional value. /// /// - returns: A signal that will send new values, that are non `nil` after the transformation. - public func compactMap(_ transform: @escaping (Value) -> U?) -> Signal { + public func compactMap(_ transform: @escaping @Sendable (Value) -> U?) -> Signal { return flatMapEvent(Signal.Event.compactMap(transform)) } @@ -787,7 +789,7 @@ extension Signal { /// /// - returns: A signal that will send new values, that are non `nil` after the transformation. @available(*, deprecated, renamed: "compactMap") - public func filterMap(_ transform: @escaping (Value) -> U?) -> Signal { + public func filterMap(_ transform: @escaping @Sendable (Value) -> U?) -> Signal { return flatMapEvent(Signal.Event.compactMap(transform)) } } @@ -1091,13 +1093,13 @@ extension Signal { /// /// - returns: A signal with attached side-effects for given event cases. public func on( - event: ((Event) -> Void)? = nil, - failed: ((Error) -> Void)? = nil, - completed: (() -> Void)? = nil, - interrupted: (() -> Void)? = nil, - terminated: (() -> Void)? = nil, - disposed: (() -> Void)? = nil, - value: ((Value) -> Void)? = nil + event: (@Sendable (Event) -> Void)? = nil, + failed: (@Sendable (Error) -> Void)? = nil, + completed: (@Sendable () -> Void)? = nil, + interrupted: (@Sendable () -> Void)? = nil, + terminated: (@Sendable () -> Void)? = nil, + disposed: (@Sendable () -> Void)? = nil, + value: (@Sendable (Value) -> Void)? = nil ) -> Signal { return Signal.unserialized { observer, lifetime in if let action = disposed { @@ -1486,7 +1488,7 @@ extension Signal { /// next call of `next`. /// /// - returns: A producer that sends the output that is computed from the accumuation. - public func scanMap(_ initialState: State, _ next: @escaping (State, Value) -> (State, U)) -> Signal { + public func scanMap(_ initialState: State, _ next: @escaping @Sendable (State, Value) -> (State, U)) -> Signal { return flatMapEvent(Signal.Event.scanMap(initialState, next)) } @@ -1500,7 +1502,7 @@ extension Signal { /// next call of `next`. /// /// - returns: A producer that sends the output that is computed from the accumuation. - public func scanMap(into initialState: State, _ next: @escaping (inout State, Value) -> U) -> Signal { + public func scanMap(into initialState: State, _ next: @escaping @Sendable (inout State, Value) -> U) -> Signal { return flatMapEvent(Signal.Event.scanMap(into: initialState, next)) } } @@ -1527,7 +1529,7 @@ extension Signal { /// - isEquivalent: A closure to determine whether two values are equivalent. /// /// - returns: A signal which conditionally forwards values from `self`. - public func skipRepeats(_ isEquivalent: @escaping (Value, Value) -> Bool) -> Signal { + public func skipRepeats(_ isEquivalent: @escaping @Sendable (Value, Value) -> Bool) -> Signal { return flatMapEvent(Signal.Event.skipRepeats(isEquivalent)) } @@ -1539,7 +1541,7 @@ extension Signal { /// - shouldContinue: A closure to determine whether the skipping should continue. /// /// - returns: A signal which conditionally forwards values from `self`. - public func skip(while shouldContinue: @escaping (Value) -> Bool) -> Signal { + public func skip(while shouldContinue: @escaping @Sendable (Value) -> Bool) -> Signal { return flatMapEvent(Signal.Event.skip(while: shouldContinue)) } @@ -2352,7 +2354,7 @@ extension Signal where Value == Bool { /// /// - returns: A signal that emits the logical NOT results. public func negate() -> Signal { - return self.map(!) + return self.map { @Sendable in !$0 } } /// Create a signal that computes a logical AND between the latest values of `self` @@ -2428,7 +2430,7 @@ extension Signal { /// /// - returns: A signal which forwards the values from `self` until the given action /// fails. - public func attempt(_ action: @escaping (Value) -> Result<(), Error>) -> Signal { + public func attempt(_ action: @escaping @Sendable (Value) -> Result<(), Error>) -> Signal { return flatMapEvent(Signal.Event.attempt(action)) } @@ -2441,7 +2443,7 @@ extension Signal { /// error. /// /// - returns: A signal which forwards the transformed values. - public func attemptMap(_ transform: @escaping (Value) -> Result) -> Signal { + public func attemptMap(_ transform: @escaping @Sendable (Value) -> Result) -> Signal { return flatMapEvent(Signal.Event.attemptMap(transform)) } } diff --git a/Sources/SignalProducer.swift b/Sources/SignalProducer.swift index 36672f01f..699203442 100644 --- a/Sources/SignalProducer.swift +++ b/Sources/SignalProducer.swift @@ -551,7 +551,7 @@ public protocol SignalProducerConvertible { } /// A protocol for constraining associated types to `SignalProducer`. -public protocol SignalProducerProtocol { +public protocol SignalProducerProtocol { /// The type of values being sent by `self`. associatedtype Value