//////////////////////////////////////////////////////////////////////////// // // Copyright 2020 Realm Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // //////////////////////////////////////////////////////////////////////////// import Combine import Realm import Realm.Private // MARK: - Identifiable /// A protocol which defines a default identity for Realm Objects /// /// Declaring your Object subclass as conforming to this protocol will supply /// a default implementation for `Identifiable`'s `id` which works for Realm /// Objects: /// /// // Automatically conforms to `Identifiable` /// class MyObjectType: Object, ObjectKeyIdentifiable { /// // ... /// } /// /// You can also manually conform to `Identifiable` if you wish, but note that /// using the object's memory address does *not* work for managed objects. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) public protocol ObjectKeyIdentifiable: Identifiable { /// The stable identity of the entity associated with `self`. var id: UInt64 { get } } /// :nodoc: @available(*, deprecated, renamed: "ObjectKeyIdentifiable") @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) public typealias ObjectKeyIdentifable = ObjectKeyIdentifiable @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) extension ObjectKeyIdentifiable where Self: ObjectBase { /// A stable identifier for this object. For managed Realm objects, this /// value will be the same for all object instances which refer to the same /// object (i.e. for which `Object.isSameObject(as:)` returns true). public var id: UInt64 { RLMObjectBaseGetCombineId(self) } } /// :nodoc: @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) extension ObjectKeyIdentifiable where Self: ProjectionObservable { /// A stable identifier for this projection. public var id: UInt64 { RLMObjectBaseGetCombineId(rootObject) } } // MARK: - Combine /// A type which can be passed to `valuePublisher()` or `changesetPublisher()`. @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) public protocol RealmSubscribable { /// :nodoc: func _observe<S>(_ keyPaths: [String]?, on queue: DispatchQueue?, _ subscriber: S) -> NotificationToken where S: Subscriber, S.Input == Self /// :nodoc: func _observe<S>(_ keyPaths: [String]?, _ subscriber: S) -> NotificationToken where S: Subscriber, S.Input == Void } @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) extension Publisher { /// Freezes all Realm objects and collections emitted by the upstream publisher /// /// Freezing a Realm object makes it no longer live-update when writes are /// made to the Realm and makes it safe to pass freely between threads /// without using `.threadSafeReference()`. /// /// ``` /// // Get a publisher for a Results /// let cancellable = myResults.publisher /// // Convert to frozen Results /// .freeze() /// // Unlike live objects, frozen objects can be sent to a concurrent queue /// .receive(on: DispatchQueue.global()) /// .sink { frozenResults in /// // Do something with the frozen Results /// } /// ``` /// /// - returns: A publisher that publishes frozen copies of the objects which the upstream publisher publishes. public func freeze<T>() -> Publishers.Map<Self, T> where Output: ThreadConfined, T == Output { return map { $0.freeze() } } /// Freezes all Realm object changesets emitted by the upstream publisher. /// /// Freezing a Realm object changeset makes the included object reference /// no longer live-update when writes are made to the Realm and makes it /// safe to pass freely between threads without using /// `.threadSafeReference()`. It also guarantees that the frozen object /// contained in the changeset will always match the property changes, which /// is not always the case when using thread-safe references. /// /// ``` /// // Get a changeset publisher for an object /// let cancellable = changesetPublisher(object) /// // Convert to frozen changesets /// .freeze() /// // Unlike live objects, frozen objects can be sent to a concurrent queue /// .receive(on: DispatchQueue.global()) /// .sink { changeset in /// // Do something with the frozen changeset /// } /// ``` /// /// - returns: A publisher that publishes frozen copies of the changesets /// which the upstream publisher publishes. public func freeze<T: Object>() -> Publishers.Map<Self, ObjectChange<T>> where Output == ObjectChange<T> { return map { if case .change(let object, let properties) = $0 { return .change(object.freeze(), properties) } return $0 } } /// Freezes all Realm collection changesets from the upstream publisher. /// /// Freezing a Realm collection changeset makes the included collection /// reference no longer live-update when writes are made to the Realm and /// makes it safe to pass freely between threads without using /// `.threadSafeReference()`. It also guarantees that the frozen collection /// contained in the changeset will always match the change information, /// which is not always the case when using thread-safe references. /// /// ``` /// // Get a changeset publisher for a collection /// let cancellable = myList.changesetPublisher /// // Convert to frozen changesets /// .freeze() /// // Unlike live objects, frozen objects can be sent to a concurrent queue /// .receive(on: DispatchQueue.global()) /// .sink { changeset in /// // Do something with the frozen changeset /// } /// ``` /// /// - returns: A publisher that publishes frozen copies of the changesets /// which the upstream publisher publishes. public func freeze<T: RealmCollection>() -> Publishers.Map<Self, RealmCollectionChange<T>> where Output == RealmCollectionChange<T> { return map { switch $0 { case .initial(let collection): return .initial(collection.freeze()) case .update(let collection, deletions: let deletions, insertions: let insertions, modifications: let modifications): return .update(collection.freeze(), deletions: deletions, insertions: insertions, modifications: modifications) case .error(let error): return .error(error) } } } /// Freezes all Realm sectioned results changesets from the upstream publisher. /// /// Freezing a Realm sectioned results changeset makes the included sectioned results /// reference no longer live-update when writes are made to the Realm and /// makes it safe to pass freely between threads without using /// `.threadSafeReference()`. It also guarantees that the frozen sectioned results /// contained in the changeset will always match the change information, /// which is not always the case when using thread-safe references. /// /// ``` /// // Get a changeset publisher for the sectioned results /// let cancellable = mySectionedResults.changesetPublisher /// // Convert to frozen changesets /// .freeze() /// // Unlike live objects, frozen objects can be sent to a concurrent queue /// .receive(on: DispatchQueue.global()) /// .sink { changeset in /// // Do something with the frozen changeset /// } /// ``` /// /// - returns: A publisher that publishes frozen copies of the changesets /// which the upstream publisher publishes. public func freeze<T: RealmSectionedResult>() -> Publishers.Map<Self, SectionedResultsChange<T>> where Output == SectionedResultsChange<T> { return map { switch $0 { case .initial(let collection): return .initial(collection.freeze()) case .update(let collection, deletions: let deletions, insertions: let insertions, modifications: let modifications, sectionsToInsert: let sectionsToInsert, sectionsToDelete: let sectionsToDelete): return .update(collection.freeze(), deletions: deletions, insertions: insertions, modifications: modifications, sectionsToInsert: sectionsToInsert, sectionsToDelete: sectionsToDelete) } } } /// Freezes all Realm collection changesets from the upstream publisher. /// /// Freezing a Realm collection changeset makes the included collection /// reference no longer live-update when writes are made to the Realm and /// makes it safe to pass freely between threads without using /// `.threadSafeReference()`. It also guarantees that the frozen collection /// contained in the changeset will always match the change information, /// which is not always the case when using thread-safe references. /// /// ``` /// // Get a changeset publisher for a collection /// let cancellable = myMap.changesetPublisher /// // Convert to frozen changesets /// .freeze() /// // Unlike live objects, frozen objects can be sent to a concurrent queue /// .receive(on: DispatchQueue.global()) /// .sink { changeset in /// // Do something with the frozen changeset /// } /// ``` /// /// - returns: A publisher that publishes frozen copies of the changesets /// which the upstream publisher publishes. public func freeze<T: RealmKeyedCollection>() -> Publishers.Map<Self, RealmMapChange<T>> where Output == RealmMapChange<T> { return map { switch $0 { case .initial(let collection): return .initial(collection.freeze()) case .update(let collection, deletions: let deletions, insertions: let insertions, modifications: let modifications): return .update(collection.freeze(), deletions: deletions, insertions: insertions, modifications: modifications) case .error(let error): return .error(error) } } } /// Freezes all Realm projection changesets emitted by the upstream publisher. /// /// Freezing a Realm projection changeset makes the included projection reference /// no longer live-update when writes are made to the Realm and makes it /// safe to pass freely between threads without using /// `.threadSafeReference()`. It also guarantees that the frozen projection /// contained in the changeset will always match the property changes, which /// is not always the case when using thread-safe references. /// /// ``` /// // Get a changeset publisher for an projection /// let cancellable = changesetPublisher(projection) /// // Convert to frozen changesets /// .freeze() /// // Unlike live projections, frozen projections can be sent to a concurrent queue /// .receive(on: DispatchQueue.global()) /// .sink { changeset in /// // Do something with the frozen changeset /// } /// ``` /// /// - returns: A publisher that publishes frozen copies of the changesets /// which the upstream publisher publishes. public func freeze<T: ProjectionObservable>() -> Publishers.Map<Self, ObjectChange<T>> where Output == ObjectChange<T>, T: ThreadConfined { return map { if case .change(let projection, let properties) = $0 { return .change(projection.freeze(), properties) } return $0 } } } @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) extension Publisher where Output: ThreadConfined { /// Enables passing thread-confined objects to a different dispatch queue. /// /// Each call to `receive(on:)` on a publisher which emits Realm /// thread-confined objects must be proceeded by a call to /// `.threadSafeReference()`.The returned publisher handles the required /// logic to pass the thread-confined object to the new queue. Only serial /// dispatch queues are supported and using other schedulers will result in /// a fatal error. /// /// For example, to subscribe on a background thread, do some work there, /// then pass the object to the main thread you can do: /// /// let cancellable = publisher(myObject) /// .subscribe(on: DispatchQueue(label: "background queue") /// .print() /// .threadSafeReference() /// .receive(on: DispatchQueue.main) /// .sink { object in /// // Do things with the object on the main thread /// } /// /// Calling this function on a publisher which emits frozen or unmanaged /// objects is unneccesary but is allowed. /// /// - returns: A publisher that supports `receive(on:)` for thread-confined objects. public func threadSafeReference() -> RealmPublishers.MakeThreadSafe<Self> { RealmPublishers.MakeThreadSafe(self) } } @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) extension Publisher { /// Enables passing object changesets to a different dispatch queue. /// /// Each call to `receive(on:)` on a publisher which emits Realm /// thread-confined objects must be proceeded by a call to /// `.threadSafeReference()`. The returned publisher handles the required /// logic to pass the thread-confined object to the new queue. Only serial /// dispatch queues are supported and using other schedulers will result in /// a fatal error. /// /// For example, to subscribe on a background thread, do some work there, /// then pass the object changeset to the main thread you can do: /// /// let cancellable = changesetPublisher(myObject) /// .subscribe(on: DispatchQueue(label: "background queue") /// .print() /// .threadSafeReference() /// .receive(on: DispatchQueue.main) /// .sink { objectChange in /// // Do things with the object on the main thread /// } /// /// - returns: A publisher that supports `receive(on:)` for thread-confined objects. public func threadSafeReference<T: Object>() -> RealmPublishers.MakeThreadSafeObjectChangeset<Self, T> where Output == ObjectChange<T> { RealmPublishers.MakeThreadSafeObjectChangeset(self) } /// Enables passing projection changesets to a different dispatch queue. /// /// Each call to `receive(on:)` on a publisher which emits Realm /// thread-confined projection must be proceeded by a call to /// `.threadSafeReference()`. The returned publisher handles the required /// logic to pass the thread-confined projection to the new queue. Only serial /// dispatch queues are supported and using other schedulers will result in /// a fatal error. /// /// For example, to subscribe on a background thread, do some work there, /// then pass the projection changeset to the main thread you can do: /// /// let cancellable = changesetPublisher(myProjection) /// .subscribe(on: DispatchQueue(label: "background queue") /// .print() /// .threadSafeReference() /// .receive(on: DispatchQueue.main) /// .sink { projectionChange in /// // Do things with the projection on the main thread /// } /// /// - returns: A publisher that supports `receive(on:)` for thread-confined objects. public func threadSafeReference<T: ProjectionObservable>() -> RealmPublishers.MakeThreadSafeObjectChangeset<Self, T> where Output == ObjectChange<T>, T: ThreadConfined { RealmPublishers.MakeThreadSafeObjectChangeset(self) } /// Enables passing Realm collection changesets to a different dispatch queue. /// /// Each call to `receive(on:)` on a publisher which emits Realm /// thread-confined objects must be proceeded by a call to /// `.threadSafeReference()`. The returned publisher handles the required /// logic to pass the thread-confined object to the new queue. Only serial /// dispatch queues are supported and using other schedulers will result in /// a fatal error. /// /// For example, to subscribe on a background thread, do some work there, /// then pass the collection changeset to the main thread you can do: /// /// let cancellable = myCollection.changesetPublisher /// .subscribe(on: DispatchQueue(label: "background queue") /// .print() /// .threadSafeReference() /// .receive(on: DispatchQueue.main) /// .sink { collectionChange in /// // Do things with the collection on the main thread /// } /// /// - returns: A publisher that supports `receive(on:)` for thread-confined objects. public func threadSafeReference<T: RealmCollection>() -> RealmPublishers.MakeThreadSafeCollectionChangeset<Self, T> where Output == RealmCollectionChange<T> { RealmPublishers.MakeThreadSafeCollectionChangeset(self) } /// Enables passing Realm collection changesets to a different dispatch queue. /// /// Each call to `receive(on:)` on a publisher which emits Realm /// thread-confined objects must be proceeded by a call to /// `.threadSafeReference()`. The returned publisher handles the required /// logic to pass the thread-confined object to the new queue. Only serial /// dispatch queues are supported and using other schedulers will result in /// a fatal error. /// /// For example, to subscribe on a background thread, do some work there, /// then pass the collection changeset to the main thread you can do: /// /// let cancellable = myCollection.changesetPublisher /// .subscribe(on: DispatchQueue(label: "background queue") /// .print() /// .threadSafeReference() /// .receive(on: DispatchQueue.main) /// .sink { collectionChange in /// // Do things with the collection on the main thread /// } /// /// - returns: A publisher that supports `receive(on:)` for thread-confined objects. public func threadSafeReference<T: RealmKeyedCollection>() -> RealmPublishers.MakeThreadSafeKeyedCollectionChangeset<Self, T> where Output == RealmMapChange<T> { RealmPublishers.MakeThreadSafeKeyedCollectionChangeset(self) } /// Enables passing Realm sectioned results changesets to a different dispatch queue. /// /// Each call to `receive(on:)` on a publisher which emits Realm /// thread-confined objects must be proceeded by a call to /// `.threadSafeReference()`. The returned publisher handles the required /// logic to pass the thread-confined object to the new queue. Only serial /// dispatch queues are supported and using other schedulers will result in /// a fatal error. /// /// For example, to subscribe on a background thread, do some work there, /// then pass the collection changeset to the main thread you can do: /// /// let cancellable = mySectionedResults.changesetPublisher /// .subscribe(on: DispatchQueue(label: "background queue") /// .print() /// .threadSafeReference() /// .receive(on: DispatchQueue.main) /// .sink { sectionedResultsChange in /// // Do things with the sectioned results on the main thread /// } /// /// - returns: A publisher that supports `receive(on:)` for thread-confined objects. public func threadSafeReference<T: RealmSectionedResult>() -> RealmPublishers.MakeThreadSafeSectionedResultsChangeset<Self, T> where Output == SectionedResultsChange<T> { RealmPublishers.MakeThreadSafeSectionedResultsChangeset(self) } } @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) extension RealmCollection where Self: RealmSubscribable { /// A publisher that emits Void each time the collection changes. /// /// Despite the name, this actually emits *after* the collection has changed. public var objectWillChange: RealmPublishers.WillChange<Self> { RealmPublishers.WillChange(self) } /// :nodoc: @available(*, deprecated, renamed: "collectionPublisher") public var publisher: RealmPublishers.Value<Self> { RealmPublishers.Value(self) } /// A publisher that emits the collection each time the collection changes. public var collectionPublisher: RealmPublishers.Value<Self> { RealmPublishers.Value(self) } /// A publisher that emits the collection each time the collection changes on the given property keyPaths. public func collectionPublisher(keyPaths: [String]?) -> RealmPublishers.Value<Self> { return RealmPublishers.Value(self, keyPaths: keyPaths) } /// A publisher that emits a collection changeset each time the collection changes. public var changesetPublisher: RealmPublishers.CollectionChangeset<Self> { RealmPublishers.CollectionChangeset(self) } /// A publisher that emits a collection changeset each time the collection changes on the given property keyPaths. public func changesetPublisher(keyPaths: [String]?) -> RealmPublishers.CollectionChangeset<Self> { return RealmPublishers.CollectionChangeset(self, keyPaths: keyPaths) } } @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) extension RealmKeyedCollection where Self: RealmSubscribable { /// A publisher that emits Void each time the collection changes. /// /// Despite the name, this actually emits *after* the collection has changed. public var objectWillChange: RealmPublishers.WillChange<Self> { RealmPublishers.WillChange(self) } /// :nodoc: @available(*, deprecated, renamed: "collectionPublisher") public var publisher: RealmPublishers.Value<Self> { RealmPublishers.Value(self) } /// A publisher that emits the collection each time the collection changes. public var collectionPublisher: RealmPublishers.Value<Self> { RealmPublishers.Value(self) } /// A publisher that emits the collection each time the collection changes on the given property keyPaths. public func collectionPublisher(keyPaths: [String]?) -> RealmPublishers.Value<Self> { return RealmPublishers.Value(self, keyPaths: keyPaths) } /// A publisher that emits a collection changeset each time the collection changes. public var changesetPublisher: RealmPublishers.MapChangeset<Self> { RealmPublishers.MapChangeset(self) } /// A publisher that emits a collection changeset each time the collection changes on the given property keyPaths. public func changesetPublisher(keyPaths: [String]?) -> RealmPublishers.MapChangeset<Self> { return RealmPublishers.MapChangeset(self, keyPaths: keyPaths) } } /// Creates a publisher that emits the object each time the object changes. /// /// - precondition: The object must be a managed object which has not been invalidated. /// - parameter object: A managed object to observe. /// - parameter keyPaths: The publisher emits changes on these property keyPaths. If `nil` the publisher emits changes for every property. /// - returns: A publisher that emits the object each time it changes. @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) public func valuePublisher<T: Object>(_ object: T, keyPaths: [String]? = nil) -> RealmPublishers.Value<T> { RealmPublishers.Value<T>(object, keyPaths: keyPaths) } /// Creates a publisher that emits the collection each time the collection changes. /// /// - precondition: The collection must be a managed collection which has not been invalidated. /// - parameter object: A managed collection to observe. /// - parameter keyPaths: The publisher emits changes on these property keyPaths. If `nil` the publisher emits changes for every property. /// - returns: A publisher that emits the collection each time it changes. @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) public func valuePublisher<T: RealmCollection>(_ collection: T, keyPaths: [String]? = nil) -> RealmPublishers.Value<T> { RealmPublishers.Value<T>(collection, keyPaths: keyPaths) } /// Creates a publisher that emits the object each time the object changes. /// /// - precondition: The object must be a managed object which has not been invalidated. /// - parameter object: A managed object to observe. /// - parameter keyPaths: The publisher emits changes on these property keyPaths. If `nil` the publisher emits changes for every property. /// - returns: A publisher that emits the object each time it changes. @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) public func valuePublisher<T: ProjectionObservable>(_ projection: T, keyPaths: [String]? = nil) -> RealmPublishers.Value<T> { RealmPublishers.Value<T>(projection, keyPaths: keyPaths) } /// Creates a publisher that emits an object changeset each time the object changes. /// /// - precondition: The object must be a managed object which has not been invalidated. /// - parameter object: A managed object to observe. /// - parameter keyPaths: The publisher emits changes on these property keyPaths. If `nil` the publisher emits changes for every property. /// - returns: A publisher that emits an object changeset each time the object changes. @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) public func changesetPublisher<T: Object>(_ object: T, keyPaths: [String]? = nil) -> RealmPublishers.ObjectChangeset<T> { precondition(object.realm != nil, "Only managed objects can be published") precondition(!object.isInvalidated, "Object is invalidated or deleted") return RealmPublishers.ObjectChangeset<T> { queue, fn in object.observe(keyPaths: keyPaths, on: queue, fn) } } /// Creates a publisher that emits an object changeset each time the object changes. /// /// - precondition: The object must be a projection. /// - parameter projection: A projection of Realm Object to observe. /// - parameter keyPaths: The publisher emits changes on these property keyPaths. If `nil` the publisher emits changes for every property. /// - returns: A publisher that emits an object changeset each time the projection changes. @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) public func changesetPublisher<T: ProjectionObservable>(_ projection: T, keyPaths: [String]? = nil) -> RealmPublishers.ObjectChangeset<T> { precondition(projection.realm != nil, "Only managed objects can be published") precondition(!projection.isInvalidated, "Object is invalidated or deleted") return RealmPublishers.ObjectChangeset<T> { queue, fn in projection.observe(keyPaths: keyPaths ?? [], on: queue, fn) } } /// Creates a publisher that emits a collection changeset each time the collection changes. /// /// - precondition: The collection must be a managed collection which has not been invalidated. /// - parameter object: A managed collection to observe. /// - parameter keyPaths: The publisher emits changes on these property keyPaths. If `nil` the publisher emits changes for every property. /// - returns: A publisher that emits a collection changeset each time the collection changes. @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) public func changesetPublisher<T: RealmCollection>(_ collection: T, keyPaths: [String]? = nil) -> RealmPublishers.CollectionChangeset<T> { RealmPublishers.CollectionChangeset<T>(collection, keyPaths: keyPaths) } // MARK: - Realm @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) extension Realm { /// A publisher that emits Void each time the object changes. /// /// Despite the name, this actually emits *after* the collection has changed. public var objectWillChange: RealmPublishers.RealmWillChange { return RealmPublishers.RealmWillChange(self) } } // MARK: - Object @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) extension Object: ObservableObject { /// A publisher that emits Void each time the object changes. /// /// Despite the name, this actually emits *after* the object has changed. public var objectWillChange: RealmPublishers.WillChange<Object> { return RealmPublishers.WillChange(self) } } @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) extension EmbeddedObject: ObservableObject { /// A publisher that emits Void each time the object changes. /// /// Despite the name, this actually emits *after* the embedded object has changed. public var objectWillChange: RealmPublishers.WillChange<EmbeddedObject> { return RealmPublishers.WillChange(self) } } @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) extension ObjectBase: RealmSubscribable { /// :nodoc: public func _observe<S: Subscriber>(_ keyPaths: [String]?, on queue: DispatchQueue?, _ subscriber: S) -> NotificationToken where S.Input: ObjectBase { return _observe(keyPaths: keyPaths, on: queue) { (object: S.Input?) in if let object = object { _ = subscriber.receive(object) } else { subscriber.receive(completion: .finished) } } } /// :nodoc: public func _observe<S>(_ keyPaths: [String]?, _ subscriber: S) -> NotificationToken where S: Subscriber, S.Input == Void { return _observe(keyPaths: keyPaths, { _ = subscriber.receive() }) } } // MARK: - List @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) extension List: ObservableObject, RealmSubscribable { /// A publisher that emits Void each time the collection changes. /// /// Despite the name, this actually emits *after* the collection has changed. public var objectWillChange: RealmPublishers.WillChange<List> { RealmPublishers.WillChange(self) } } // MARK: - MutableSet @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) extension MutableSet: ObservableObject, RealmSubscribable { /// A publisher that emits Void each time the collection changes. /// /// Despite the name, this actually emits *after* the collection has changed. public var objectWillChange: RealmPublishers.WillChange<MutableSet> { RealmPublishers.WillChange(self) } } // MARK: - Map @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) extension Map: ObservableObject, RealmSubscribable { /// A publisher that emits Void each time the collection changes. /// /// Despite the name, this actually emits *after* the collection has changed. public var objectWillChange: RealmPublishers.WillChange<Map> { RealmPublishers.WillChange(self) } } // MARK: - LinkingObjects @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) extension LinkingObjects: RealmSubscribable { /// A publisher that emits Void each time the collection changes. /// /// Despite the name, this actually emits *after* the collection has changed. public var objectWillChange: RealmPublishers.WillChange<LinkingObjects> { RealmPublishers.WillChange(self) } } // MARK: - Results @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) extension Results: RealmSubscribable { /// A publisher that emits Void each time the collection changes. /// /// Despite the name, this actually emits *after* the collection has changed. public var objectWillChange: RealmPublishers.WillChange<Results> { RealmPublishers.WillChange(self) } } // MARK: - Sectioned Results @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) extension SectionedResults: RealmSubscribable { /// :nodoc: public func _observe<S>(_ keyPaths: [String]? = nil, on queue: DispatchQueue? = nil, _ subscriber: S) -> NotificationToken where S: Subscriber, S.Input == Self { return observe(keyPaths: keyPaths, on: queue) { change in switch change { case .initial(let collection): _ = subscriber.receive(collection) case .update(let collection, deletions: _, insertions: _, modifications: _, sectionsToInsert: _, sectionsToDelete: _): _ = subscriber.receive(collection) } } } /// :nodoc: public func _observe<S: Subscriber>(_ keyPaths: [String]? = nil, _ subscriber: S) -> NotificationToken where S.Input == Void { return observe(keyPaths: keyPaths, on: nil) { _ in _ = subscriber.receive() } } /// A publisher that emits Void each time the sectioned results collection changes. /// /// Despite the name, this actually emits *after* the sectioned results collection has changed. public var objectWillChange: RealmPublishers.WillChange<SectionedResults> { RealmPublishers.WillChange(self) } /// A publisher that emits the sectioned results collection each time the sectioned results collection changes. public var collectionPublisher: RealmPublishers.Value<Self> { RealmPublishers.Value(self) } /// A publisher that emits the sectioned results collection each time the sectioned results collection changes on the given property keyPaths. public func collectionPublisher(keyPaths: [String]?) -> RealmPublishers.Value<Self> { return RealmPublishers.Value(self, keyPaths: keyPaths) } /// A publisher that emits a sectioned results collection changeset each time the sectioned results collection changes. public var changesetPublisher: RealmPublishers.SectionedResultsChangeset<Self> { RealmPublishers.SectionedResultsChangeset(self) } /// A publisher that emits a sectioned results collection changeset each time the sectioned results collection changes on the given property keyPaths. public func changesetPublisher(keyPaths: [String]?) -> RealmPublishers.SectionedResultsChangeset<Self> { return RealmPublishers.SectionedResultsChangeset(self, keyPaths: keyPaths) } } @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) extension ResultsSection: RealmSubscribable { /// :nodoc: public func _observe<S>(_ keyPaths: [String]? = nil, on queue: DispatchQueue? = nil, _ subscriber: S) -> NotificationToken where S: Subscriber, S.Input == Self { return observe(keyPaths: keyPaths, on: queue) { change in switch change { case .initial(let collection): _ = subscriber.receive(collection) case .update(let collection, deletions: _, insertions: _, modifications: _, sectionsToInsert: _, sectionsToDelete: _): _ = subscriber.receive(collection) } } } /// :nodoc: public func _observe<S: Subscriber>(_ keyPaths: [String]? = nil, _ subscriber: S) -> NotificationToken where S.Input == Void { return observe(keyPaths: keyPaths, on: nil) { _ in _ = subscriber.receive() } } /// A publisher that emits Void each time the results section collection changes. /// /// Despite the name, this actually emits *after* the results section collection has changed. public var objectWillChange: RealmPublishers.WillChange<ResultsSection> { RealmPublishers.WillChange(self) } /// A publisher that emits the results section collection each time the results section collection changes. public var collectionPublisher: RealmPublishers.Value<Self> { RealmPublishers.Value(self) } /// A publisher that emits the results section collection each time the results section collection changes on the given property keyPaths. public func collectionPublisher(keyPaths: [String]?) -> RealmPublishers.Value<Self> { return RealmPublishers.Value(self, keyPaths: keyPaths) } /// A publisher that emits a results section collection changeset each time the results section collection changes. public var changesetPublisher: RealmPublishers.SectionChangeset<Self> { RealmPublishers.SectionChangeset(self) } /// A publisher that emits a results section collection changeset each time the results section collection changes on the given property keyPaths. public func changesetPublisher(keyPaths: [String]?) -> RealmPublishers.SectionChangeset<Self> { return RealmPublishers.SectionChangeset(self, keyPaths: keyPaths) } } // MARK: RealmCollection @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) extension RealmCollectionImpl { /// :nodoc: public func _observe<S>(_ keyPaths: [String]? = nil, on queue: DispatchQueue? = nil, _ subscriber: S) -> NotificationToken where S: Subscriber, S.Input == Self { var col: Self? return collection.addNotificationBlock({ collection, _, _ in if col == nil, let collection = collection { col = self.collection === collection ? self : Self(collection: collection) } if let col = col { _ = subscriber.receive(col) } }, keyPaths: keyPaths, queue: queue) } /// :nodoc: public func _observe<S: Subscriber>(_ keyPaths: [String]? = nil, _ subscriber: S) -> NotificationToken where S.Input == Void { collection.addNotificationBlock({ _, _, _ in _ = subscriber.receive() }, keyPaths: keyPaths, queue: nil) } } @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) extension AnyRealmCollection: RealmSubscribable {} // MARK: RealmKeyedCollection @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) extension RealmKeyedCollection { /// :nodoc: public func _observe<S>(_ keyPaths: [String]?, on queue: DispatchQueue? = nil, _ subscriber: S) -> NotificationToken where S: Subscriber, S.Input == Self { // FIXME: we could skip some pointless work in converting the changeset to the Swift type here return observe(keyPaths: keyPaths, on: queue) { change in switch change { case .initial(let collection): _ = subscriber.receive(collection) case .update(let collection, deletions: _, insertions: _, modifications: _): _ = subscriber.receive(collection) case .error(let error): fatalError("Unexpected error \(error)") } } } /// :nodoc: public func _observe<S: Subscriber>(_ subscriber: S) -> NotificationToken where S.Input == Void { return observe(keyPaths: nil, on: nil) { _ in _ = subscriber.receive() } } /// :nodoc: public func _observe<S: Subscriber>(_ keyPaths: [String]? = nil, _ subscriber: S) -> NotificationToken where S.Input == Void { return observe(keyPaths: keyPaths, on: nil) { _ in _ = subscriber.receive() } } } // MARK: Subscriptions /// A subscription which wraps a Realm notification. @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) @frozen public struct ObservationSubscription: Subscription { private var token: NotificationToken internal init(token: NotificationToken) { self.token = token } /// A unique identifier for identifying publisher streams. public var combineIdentifier: CombineIdentifier { return CombineIdentifier(token) } /// This function is not implemented. /// /// Realm publishers do not support backpressure and so this function does nothing. public func request(_ demand: Subscribers.Demand) { } /// Stop emitting values on this subscription. public func cancel() { token.invalidate() } } /// A subscription which wraps a Realm AsyncOpenTask. @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) @frozen public struct AsyncOpenSubscription: Subscription { private let task: Realm.AsyncOpenTask internal init(task: Realm.AsyncOpenTask, callbackQueue: DispatchQueue, onProgressNotificationCallback: ((SyncSession.Progress) -> Void)?) { self.task = task if let onProgressNotificationCallback = onProgressNotificationCallback { self.task.addProgressNotification(queue: callbackQueue, block: onProgressNotificationCallback) } } /// A unique identifier for identifying publisher streams. public var combineIdentifier: CombineIdentifier { return CombineIdentifier(task.rlmTask) } /// This function is not implemented. /// /// Realm publishers do not support backpressure and so this function does nothing. public func request(_ demand: Subscribers.Demand) { } /// Stop emitting values on this subscription. public func cancel() { task.cancel() } } // MARK: Publishers /// Combine publishers for Realm types. /// /// You normally should not create any of these types directly, and should /// instead use the extension methods which create them. @available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) public enum RealmPublishers { static private func realm<S: Scheduler>(_ config: RLMRealmConfiguration, _ scheduler: S) -> Realm? { try? Realm(RLMRealm(configuration: config, queue: scheduler as? DispatchQueue)) } static private func realm<S: Scheduler>(_ sourceRealm: Realm, _ scheduler: S) -> Realm? { return realm(sourceRealm.rlmRealm.configuration, scheduler) } /// A publisher which emits an asynchronously opened Realm. @frozen public struct AsyncOpenPublisher: Publisher { /// This publisher can fail if there is an error opening the Realm. public typealias Failure = Error /// This publisher emits an opened Realm. public typealias Output = Realm private let configuration: Realm.Configuration private let callbackQueue: DispatchQueue private let onProgressNotificationCallback: ((SyncSession.Progress) -> Void)? internal init(configuration: Realm.Configuration, callbackQueue: DispatchQueue = .main, onProgressNotificationCallback: ((SyncSession.Progress) -> Void)? = nil) { self.configuration = configuration self.callbackQueue = callbackQueue self.onProgressNotificationCallback = onProgressNotificationCallback } /// Triggers an event when there is a notification on the async open progress. /// /// This should be called directly after invoking the publisher. /// /// - Parameter onProgressNotificationCallback: Callback which will be invoked when there is an update on progress. /// - Returns: A publisher that emits an asynchronously opened Realm. public func onProgressNotification(_ onProgressNotificationCallback: @escaping ((SyncSession.Progress) -> Void)) -> Self { Self(configuration: configuration, callbackQueue: callbackQueue, onProgressNotificationCallback: onProgressNotificationCallback) } /// :nodoc: public func receive<S>(subscriber: S) where S: Subscriber, S.Failure == Failure, Output == S.Input { subscriber.receive(subscription: AsyncOpenSubscription(task: Realm.AsyncOpenTask(rlmTask: RLMRealm.asyncOpen(with: configuration.rlmConfiguration, callbackQueue: callbackQueue, callback: { rlmRealm, error in if let realm = rlmRealm.flatMap(Realm.init) { _ = subscriber.receive(realm) subscriber.receive(completion: .finished) } else { subscriber.receive(completion: .failure(error ?? Realm.Error.callFailed)) } })), callbackQueue: callbackQueue, onProgressNotificationCallback: onProgressNotificationCallback)) } /// Specifies the scheduler on which to perform the async open task. /// /// - parameter scheduler: The serial dispatch queue to receive values on. /// - returns: A publisher which delivers values to the given scheduler. public func receive<S: Scheduler>(on scheduler: S) -> Self { guard let queue = scheduler as? DispatchQueue else { fatalError("Cannot subscribe on scheduler \(scheduler): only serial dispatch queues are currently implemented.") } return Self(configuration: configuration, callbackQueue: queue, onProgressNotificationCallback: onProgressNotificationCallback) } } /// A publisher which emits Void each time the Realm is refreshed. /// /// Despite the name, this actually emits *after* the Realm is refreshed. @frozen public struct RealmWillChange: Publisher { /// This publisher cannot fail. public typealias Failure = Never /// This publisher emits Void. public typealias Output = Void private let realm: Realm internal init(_ realm: Realm) { self.realm = realm } /// Captures the `NotificationToken` produced by observing a Realm Collection. /// /// This allows you to do notification skipping when performing a `Realm.write(withoutNotifying:)`. You should use this call if you /// require to write to the Realm database and ignore this specific observation chain. /// The `NotificationToken` will be saved on the specified `KeyPath`from the observation block set up in `receive(subscriber:)`. /// /// - Parameters: /// - object: The object which the `NotificationToken` is written to. /// - keyPath: The KeyPath which the `NotificationToken` is written to. /// - Returns: A `RealmWillChangeWithToken` Publisher. public func saveToken<T>(on object: T, for keyPath: WritableKeyPath<T, NotificationToken?>) -> RealmWillChangeWithToken<T> { return RealmWillChangeWithToken<T>(realm, object, keyPath) } /// :nodoc: public func receive<S>(subscriber: S) where S: Subscriber, S.Failure == Never, Output == S.Input { let token = self.realm.observe { _, _ in _ = subscriber.receive() } subscriber.receive(subscription: ObservationSubscription(token: token)) } } /// :nodoc: public class RealmWillChangeWithToken<T>: Publisher { /// This publisher cannot fail. public typealias Failure = Never /// This publisher emits Void. public typealias Output = Void internal typealias TokenParent = T internal typealias TokenKeyPath = WritableKeyPath<T, NotificationToken?> private let realm: Realm private var tokenParent: TokenParent private var tokenKeyPath: TokenKeyPath internal init(_ realm: Realm, _ tokenParent: TokenParent, _ tokenKeyPath: TokenKeyPath) { self.realm = realm self.tokenParent = tokenParent self.tokenKeyPath = tokenKeyPath } /// :nodoc: public func receive<S>(subscriber: S) where S: Subscriber, S.Failure == Never, Output == S.Input { let token = self.realm.observe { _, _ in _ = subscriber.receive() } tokenParent[keyPath: tokenKeyPath] = token subscriber.receive(subscription: ObservationSubscription(token: token)) } } /// A publisher which emits Void each time the object is mutated. /// /// Despite the name, this actually emits *after* the collection has changed. @frozen public struct WillChange<Collection: RealmSubscribable>: Publisher where Collection: ThreadConfined { /// This publisher cannot fail. public typealias Failure = Never /// This publisher emits Void. public typealias Output = Void private let collection: Collection internal init(_ collection: Collection) { self.collection = collection } /// Captures the `NotificationToken` produced by observing a Realm Collection. /// /// This allows you to do notification skipping when performing a `Realm.write(withoutNotifying:)`. You should use this call if you /// require to write to the Realm database and ignore this specific observation chain. /// The `NotificationToken` will be saved on the specified `KeyPath`from the observation block set up in `receive(subscriber:)`. /// /// - Parameters: /// - object: The object which the `NotificationToken` is written to. /// - keyPath: The KeyPath which the `NotificationToken` is written to. /// - Returns: A `WillChangeWithToken` Publisher. public func saveToken<T>(on object: T, at keyPath: WritableKeyPath<T, NotificationToken?>) -> WillChangeWithToken<Collection, T> { return WillChangeWithToken<Collection, T>(collection, object, keyPath) } /// :nodoc: public func receive<S>(subscriber: S) where S: Subscriber, S.Failure == Never, Output == S.Input { let token = self.collection._observe(nil, subscriber) subscriber.receive(subscription: ObservationSubscription(token: token)) } } /// A publisher which emits Void each time the object is mutated. /// /// Despite the name, this actually emits *after* the collection has changed. public class WillChangeWithToken<Collection: RealmSubscribable, T>: Publisher where Collection: ThreadConfined { /// This publisher cannot fail. public typealias Failure = Never /// This publisher emits Void. public typealias Output = Void internal typealias TokenParent = T internal typealias TokenKeyPath = WritableKeyPath<T, NotificationToken?> private let object: Collection private var tokenParent: TokenParent private var tokenKeyPath: TokenKeyPath internal init(_ object: Collection, _ tokenParent: TokenParent, _ tokenKeyPath: TokenKeyPath) { self.object = object self.tokenParent = tokenParent self.tokenKeyPath = tokenKeyPath } /// :nodoc: public func receive<S>(subscriber: S) where S: Subscriber, S.Failure == Never, Output == S.Input { let token = self.object._observe(nil, subscriber) tokenParent[keyPath: tokenKeyPath] = token subscriber.receive(subscription: ObservationSubscription(token: token)) } } /// A publisher which emits an object or collection each time that object is mutated. @frozen public struct Value<Subscribable: RealmSubscribable>: Publisher where Subscribable: ThreadConfined { /// This publisher cannot actually fail and will change to Never in the future. public typealias Failure = Error /// This publisher emits the object or collection which it is publishing. public typealias Output = Subscribable private let subscribable: Subscribable private let keyPaths: [String]? private let queue: DispatchQueue? internal init(_ subscribable: Subscribable, keyPaths: [String]? = nil, queue: DispatchQueue? = nil) { precondition(subscribable.realm != nil, "Only managed objects can be published") self.subscribable = subscribable self.keyPaths = keyPaths self.queue = queue } /// Captures the `NotificationToken` produced by observing a Realm Collection. /// /// This allows you to do notification skipping when performing a `Realm.write(withoutNotifying:)`. You should use this call if you /// require to write to the Realm database and ignore this specific observation chain. /// The `NotificationToken` will be saved on the specified `KeyPath`from the observation block set up in `receive(subscriber:)`. /// /// - Parameters: /// - object: The object which the `NotificationToken` is written to. /// - keyPath: The KeyPath which the `NotificationToken` is written to. /// - Returns: A `ValueWithToken` Publisher. public func saveToken<T>(on object: T, at keyPath: WritableKeyPath<T, NotificationToken?>) -> ValueWithToken<Subscribable, T> { return ValueWithToken<Subscribable, T>(subscribable, queue, object, keyPath) } /// :nodoc: public func receive<S>(subscriber: S) where S: Subscriber, S.Failure == Failure, Output == S.Input { subscriber.receive(subscription: ObservationSubscription(token: self.subscribable._observe(keyPaths, on: queue, subscriber))) } /// Specifies the scheduler on which to perform subscribe, cancel, and request operations. /// /// For Realm Publishers, this determines which queue the underlying /// change notifications are sent to. If `receive(on:)` is not used /// subsequently, it also will determine which queue elements received /// from the publisher are evaluated on. Currently only serial dispatch /// queues are supported, and the `options:` parameter is not /// supported. /// /// - parameter scheduler: The serial dispatch queue to perform the subscription on. /// - returns: A publisher which subscribes on the given scheduler. public func subscribe<S: Scheduler>(on scheduler: S) -> Value<Subscribable> { guard let queue = scheduler as? DispatchQueue else { fatalError("Cannot subscribe on scheduler \(scheduler): only serial dispatch queues are currently implemented.") } return Value(subscribable, keyPaths: keyPaths, queue: queue) } /// Specifies the scheduler on which to perform downstream operations. /// /// This differs from `subscribe(on:)` in how it is integrated with the /// autorefresh cycle. When using `subscribe(on:)`, the subscription is /// performed on the target scheduler and the publisher will emit the /// collection during the refresh. When using `receive(on:)`, the /// collection is then converted to a `ThreadSafeReference` and /// delivered to the target scheduler with no integration into the /// autorefresh cycle, meaning it may arrive some time after the /// refresh occurs. /// /// When in doubt, you probably want `subscribe(on:)`. /// /// - parameter scheduler: The serial dispatch queue to receive values on. /// - returns: A publisher which delivers values to the given scheduler. public func receive<S: Scheduler>(on scheduler: S) -> RealmPublishers.Handover<Self, S> { return Handover(self, scheduler, self.subscribable.realm!) } } /// A publisher which emits an object or collection each time that object is mutated. public class ValueWithToken<Subscribable: RealmSubscribable, T>: Publisher where Subscribable: ThreadConfined { /// This publisher cannot actually fail and will change to Never in the future. public typealias Failure = Error /// This publisher emits the object or collection which it is publishing. public typealias Output = Subscribable internal typealias TokenParent = T internal typealias TokenKeyPath = WritableKeyPath<T, NotificationToken?> private let object: Subscribable private let queue: DispatchQueue? private var tokenParent: TokenParent private var tokenKeyPath: TokenKeyPath internal init(_ object: Subscribable, _ queue: DispatchQueue? = nil, _ tokenParent: TokenParent, _ tokenKeyPath: TokenKeyPath) { precondition(object.realm != nil, "Only managed objects can be published") self.object = object self.queue = queue self.tokenParent = tokenParent self.tokenKeyPath = tokenKeyPath } /// :nodoc: public func receive<S>(subscriber: S) where S: Subscriber, S.Failure == Failure, Output == S.Input { let token = self.object._observe(nil, on: queue, subscriber) tokenParent[keyPath: tokenKeyPath] = token subscriber.receive(subscription: ObservationSubscription(token: token)) } /// Specifies the scheduler on which to perform subscribe, cancel, and request operations. /// /// For Realm Publishers, this determines which queue the underlying /// change notifications are sent to. If `receive(on:)` is not used /// subsequently, it also will determine which queue elements received /// from the publisher are evaluated on. Currently only serial dispatch /// queues are supported, and the `options:` parameter is not /// supported. /// /// - parameter scheduler: The serial dispatch queue to perform the subscription on. /// - returns: A publisher which subscribes on the given scheduler. public func subscribe<S: Scheduler>(on scheduler: S) -> ValueWithToken<Subscribable, T> { guard let queue = scheduler as? DispatchQueue else { fatalError("Cannot subscribe on scheduler \(scheduler): only serial dispatch queues are currently implemented.") } return ValueWithToken(object, queue, tokenParent, tokenKeyPath) } /// Specifies the scheduler on which to perform downstream operations. /// /// This differs from `subscribe(on:)` in how it is integrated with the /// autorefresh cycle. When using `subscribe(on:)`, the subscription is /// performed on the target scheduler and the publisher will emit the /// collection during the refresh. When using `receive(on:)`, the /// collection is then converted to a `ThreadSafeReference` and /// delivered to the target scheduler with no integration into the /// autorefresh cycle, meaning it may arrive some time after the /// refresh occurs. /// /// When in doubt, you probably want `subscribe(on:)`. /// /// - parameter scheduler: The serial dispatch queue to receive values on. /// - returns: A publisher which delivers values to the given scheduler. public func receive<S: Scheduler>(on scheduler: S) -> Handover<ValueWithToken, S> { return Handover(self, scheduler, self.object.realm!) } } /// A helper publisher used to support `receive(on:)` on Realm publishers. @frozen public struct Handover<Upstream: Publisher, S: Scheduler>: Publisher where Upstream.Output: ThreadConfined { /// :nodoc: public typealias Failure = Upstream.Failure /// :nodoc: public typealias Output = Upstream.Output private let config: RLMRealmConfiguration private let upstream: Upstream private let scheduler: S internal init(_ upstream: Upstream, _ scheduler: S, _ realm: Realm) { self.config = realm.rlmRealm.configuration self.upstream = upstream self.scheduler = scheduler } /// :nodoc: public func receive<Sub>(subscriber: Sub) where Sub: Subscriber, Sub.Failure == Failure, Output == Sub.Input { let scheduler = self.scheduler let config = self.config self.upstream .map { ThreadSafeReference(to: $0) } .receive(on: scheduler) .compactMap { realm(config, scheduler)?.resolve($0) } .receive(subscriber: subscriber) } } /// A publisher which makes `receive(on:)` work for streams of thread-confined objects /// /// Create using .threadSafeReference() @frozen public struct MakeThreadSafe<Upstream: Publisher>: Publisher where Upstream.Output: ThreadConfined { /// :nodoc: public typealias Failure = Upstream.Failure /// :nodoc: public typealias Output = Upstream.Output private let upstream: Upstream internal init(_ upstream: Upstream) { self.upstream = upstream } /// :nodoc: public func receive<S>(subscriber: S) where S: Subscriber, S.Failure == Failure, Output == S.Input { self.upstream.receive(subscriber: subscriber) } /// Specifies the scheduler on which to receive elements from the publisher. /// /// This publisher converts each value emitted by the upstream /// publisher to a `ThreadSafeReference`, passes it to the target /// scheduler, and then converts back to the original type. /// /// - parameter scheduler: The serial dispatch queue to receive values on. /// - returns: A publisher which delivers values to the given scheduler. public func receive<S: Scheduler>(on scheduler: S) -> DeferredHandover<Upstream, S> { DeferredHandover(self.upstream, scheduler) } } /// A publisher which delivers thread-confined values to a serial dispatch queue. /// /// Create using `.threadSafeReference().receive(on: queue)` on a publisher /// that emits thread-confined objects. @frozen public struct DeferredHandover<Upstream: Publisher, S: Scheduler>: Publisher where Upstream.Output: ThreadConfined { /// :nodoc: public typealias Failure = Upstream.Failure /// :nodoc: public typealias Output = Upstream.Output private let upstream: Upstream private let scheduler: S internal init(_ upstream: Upstream, _ scheduler: S) { self.upstream = upstream self.scheduler = scheduler } private enum Handover { case object(_ object: Output) case tsr(_ tsr: ThreadSafeReference<Output>, config: RLMRealmConfiguration) } /// :nodoc: public func receive<Sub>(subscriber: Sub) where Sub: Subscriber, Sub.Failure == Failure, Output == Sub.Input { let scheduler = self.scheduler self.upstream .map { (obj: Output) -> Handover in guard let realm = obj.realm, !realm.isFrozen else { return .object(obj) } return .tsr(ThreadSafeReference(to: obj), config: realm.rlmRealm.configuration) } .receive(on: scheduler) .compactMap { (handover: Handover) -> Output? in switch handover { case .object(let obj): return obj case .tsr(let tsr, let config): return realm(config, scheduler)?.resolve(tsr) } } .receive(subscriber: subscriber) } } /// A publisher which emits ObjectChange<T> each time the observed object is modified /// /// `receive(on:)` and `subscribe(on:)` can be called directly on this /// publisher, and calling `.threadSafeReference()` is only required if /// there is an intermediate transform. If `subscribe(on:)` is used, it /// should always be the first operation in the pipeline. /// /// Create this publisher using the `objectChangeset()` function. @frozen public struct ObjectChangeset<O: ThreadConfined>: Publisher { /// This publisher emits a ObjectChange<T> indicating which object and /// which properties of that object have changed each time a Realm is /// refreshed after a write transaction which modifies the observed /// object. public typealias Output = ObjectChange<O> /// This publisher reports error via the `.error` case of ObjectChange. public typealias Failure = Never @usableFromInline internal typealias Observe = (_ queue: DispatchQueue?, @escaping (Output) -> Void) -> NotificationToken private let observe: Observe private let queue: DispatchQueue? internal init(_ observe: @escaping Observe, queue: DispatchQueue? = nil) { self.observe = observe self.queue = queue } /// Captures the `NotificationToken` produced by observing a Realm Collection. /// /// This allows you to do notification skipping when performing a `Realm.write(withoutNotifying:)`. You should use this call if you /// require to write to the Realm database and ignore this specific observation chain. /// The `NotificationToken` will be saved on the specified `KeyPath`from the observation block set up in `receive(subscriber:)`. /// /// - Parameters: /// - object: The object which the `NotificationToken` is written to. /// - keyPath: The KeyPath which the `NotificationToken` is written to. /// - Returns: A `ObjectChangesetWithToken` Publisher. public func saveToken<T>(on tokenParent: T, at keyPath: WritableKeyPath<T, NotificationToken?>) -> ObjectChangesetWithToken<O, T> { return ObjectChangesetWithToken<O, T>(observe, queue, tokenParent, keyPath) } /// :nodoc: public func receive<S>(subscriber: S) where S: Subscriber, S.Failure == Never, Output == S.Input { let token = observe(self.queue) { change in switch change { case .change(let o, let properties): _ = subscriber.receive(.change(o, properties)) case .error(let error): _ = subscriber.receive(.error(error)) case .deleted: subscriber.receive(completion: .finished) } } subscriber.receive(subscription: ObservationSubscription(token: token)) } /// Specifies the scheduler on which to perform subscribe, cancel, and request operations. /// /// For Realm Publishers, this determines which queue the underlying /// change notifications are sent to. If `receive(on:)` is not used /// subsequently, it also will determine which queue elements received /// from the publisher are evaluated on. Currently only serial dispatch /// queues are supported, and the `options:` parameter is not /// supported. /// /// - parameter scheduler: The serial dispatch queue to perform the subscription on. /// - returns: A publisher which subscribes on the given scheduler. public func subscribe<S: Scheduler>(on scheduler: S) -> ObjectChangeset<O> { guard let queue = scheduler as? DispatchQueue else { fatalError("Cannot subscribe on scheduler \(scheduler): only serial dispatch queues are currently implemented.") } return ObjectChangeset(observe, queue: queue) } /// Specifies the scheduler on which to perform downstream operations. /// /// This differs from `subscribe(on:)` in how it is integrated with the /// autorefresh cycle. When using `subscribe(on:)`, the subscription is /// performed on the target scheduler and the publisher will emit the /// collection during the refresh. When using `receive(on:)`, the /// collection is then converted to a `ThreadSafeReference` and /// delivered to the target scheduler with no integration into the /// autorefresh cycle, meaning it may arrive some time after the /// refresh occurs. /// /// When in doubt, you probably want `subscribe(on:)` /// /// - parameter scheduler: The serial dispatch queue to receive values on. /// - returns: A publisher which delivers values to the given scheduler. public func receive<S: Scheduler>(on scheduler: S) -> DeferredHandoverObjectChangeset<Self, O, S> { DeferredHandoverObjectChangeset(self, scheduler) } } /// A publisher which emits ObjectChange<T> each time the observed object is modified /// /// `receive(on:)` and `subscribe(on:)` can be called directly on this /// publisher, and calling `.threadSafeReference()` is only required if /// there is an intermediate transform. If `subscribe(on:)` is used, it /// should always be the first operation in the pipeline. /// /// Create this publisher using the `objectChangeset()` function. public class ObjectChangesetWithToken<O: Object, T>: Publisher { /// This publisher emits a ObjectChange<T> indicating which object and /// which properties of that object have changed each time a Realm is /// refreshed after a write transaction which modifies the observed /// object. public typealias Output = ObjectChange<O> /// This publisher reports error via the `.error` case of ObjectChange. public typealias Failure = Never internal typealias TokenParent = T internal typealias TokenKeyPath = WritableKeyPath<T, NotificationToken?> private var tokenParent: TokenParent private var tokenKeyPath: TokenKeyPath @usableFromInline internal typealias Observe = (_ queue: DispatchQueue?, @escaping (Output) -> Void) -> NotificationToken private let observe: Observe private let queue: DispatchQueue? internal init(_ observe: @escaping Observe, _ queue: DispatchQueue? = nil, _ tokenParent: TokenParent, _ tokenKeyPath: TokenKeyPath) { self.observe = observe self.queue = queue self.tokenParent = tokenParent self.tokenKeyPath = tokenKeyPath } /// :nodoc: public func receive<S>(subscriber: S) where S: Subscriber, S.Failure == Never, Output == S.Input { let token = observe(self.queue) { change in switch change { case .change(let o, let properties): _ = subscriber.receive(.change(o, properties)) case .error(let error): _ = subscriber.receive(.error(error)) case .deleted: subscriber.receive(completion: .finished) } } tokenParent[keyPath: tokenKeyPath] = token subscriber.receive(subscription: ObservationSubscription(token: token)) } /// Specifies the scheduler on which to perform subscribe, cancel, and request operations. /// /// For Realm Publishers, this determines which queue the underlying /// change notifications are sent to. If `receive(on:)` is not used /// subsequently, it also will determine which queue elements received /// from the publisher are evaluated on. Currently only serial dispatch /// queues are supported, and the `options:` parameter is not /// supported. /// /// - parameter scheduler: The serial dispatch queue to perform the subscription on. /// - returns: A publisher which subscribes on the given scheduler. public func subscribe<S: Scheduler>(on scheduler: S) -> ObjectChangesetWithToken<O, T> { guard let queue = scheduler as? DispatchQueue else { fatalError("Cannot subscribe on scheduler \(scheduler): only serial dispatch queues are currently implemented.") } return ObjectChangesetWithToken(observe, queue, tokenParent, tokenKeyPath) } /// Specifies the scheduler on which to perform downstream operations. /// /// This differs from `subscribe(on:)` in how it is integrated with the /// autorefresh cycle. When using `subscribe(on:)`, the subscription is /// performed on the target scheduler and the publisher will emit the /// collection during the refresh. When using `receive(on:)`, the /// collection is then converted to a `ThreadSafeReference` and /// delivered to the target scheduler with no integration into the /// autorefresh cycle, meaning it may arrive some time after the /// refresh occurs. /// /// When in doubt, you probably want `subscribe(on:)` /// /// - parameter scheduler: The serial dispatch queue to receive values on. /// - returns: A publisher which delivers values to the given scheduler. public func receive<S: Scheduler>(on scheduler: S) -> DeferredHandoverObjectChangeset<ObjectChangesetWithToken, T, S> { DeferredHandoverObjectChangeset(self, scheduler) } } /// A helper publisher created by calling `.threadSafeReference()` on a publisher which emits thread-confined values. @frozen public struct MakeThreadSafeObjectChangeset<Upstream: Publisher, T: ThreadConfined>: Publisher where Upstream.Output == ObjectChange<T> { /// :nodoc: public typealias Failure = Upstream.Failure /// :nodoc: public typealias Output = Upstream.Output private let upstream: Upstream internal init(_ upstream: Upstream) { self.upstream = upstream } /// :nodoc: public func receive<S>(subscriber: S) where S: Subscriber, S.Failure == Failure, Output == S.Input { self.upstream.receive(subscriber: subscriber) } /// Specifies the scheduler to deliver object changesets to. /// /// This differs from `subscribe(on:)` in how it is integrated with the /// autorefresh cycle. When using `subscribe(on:)`, the subscription is /// performed on the target scheduler and the publisher will emit the /// collection during the refresh. When using `receive(on:)`, the /// collection is then converted to a `ThreadSafeReference` and /// delivered to the target scheduler with no integration into the /// autorefresh cycle, meaning it may arrive some time after the /// refresh occurs. /// /// When in doubt, you probably want `subscribe(on:)`. /// /// - parameter scheduler: The serial dispatch queue to receive values on. /// - returns: A publisher which delivers values to the given scheduler. public func receive<S: Scheduler>(on scheduler: S) -> DeferredHandoverObjectChangeset<Upstream, T, S> { DeferredHandoverObjectChangeset(self.upstream, scheduler) } } /// A publisher which delivers thread-confined object changesets to a serial dispatch queue. /// /// Create using `.threadSafeReference().receive(on: queue)` on a publisher /// that emits `ObjectChange`. @frozen public struct DeferredHandoverObjectChangeset<Upstream: Publisher, T: ThreadConfined, S: Scheduler>: Publisher where Upstream.Output == ObjectChange<T> { /// :nodoc: public typealias Failure = Upstream.Failure /// :nodoc: public typealias Output = Upstream.Output private let upstream: Upstream private let scheduler: S internal init(_ upstream: Upstream, _ scheduler: S) { self.upstream = upstream self.scheduler = scheduler } private enum Handover { // .error and .change containing a frozen object can be delivered // without any handover case passthrough(_ change: ObjectChange<T>) // .change containing a live object need to be wrapped in a TSR. // We also hold a reference to a pinned Realm to ensure that the // source version remains pinned and we can deliver the object at // the same version as the change information. case tsr(_ pin: RLMPinnedRealm, _ tsr: ThreadSafeReference<T>, _ properties: [PropertyChange]) } /// :nodoc: public func receive<Sub>(subscriber: Sub) where Sub: Subscriber, Sub.Failure == Failure, Output == Sub.Input { let scheduler = self.scheduler self.upstream .map { (change: Output) -> Handover in guard case .change(let obj, let properties) = change else { return .passthrough(change) } guard let realm = obj.realm, !realm.isFrozen else { return .passthrough(change) } return .tsr(RLMPinnedRealm(realm: realm.rlmRealm), ThreadSafeReference(to: obj), properties) } .receive(on: scheduler) .compactMap { (handover: Handover) -> Output? in switch handover { case .passthrough(let change): return change case .tsr(let pin, let tsr, let properties): defer { pin.unpin() } if let resolved = realm(pin.configuration, scheduler)?.resolve(tsr) { return .change(resolved, properties) } return nil } } .receive(subscriber: subscriber) } } /// A publisher which emits RealmCollectionChange<T> each time the observed object is modified /// /// `receive(on:)` and `subscribe(on:)` can be called directly on this /// publisher, and calling `.threadSafeReference()` is only required if /// there is an intermediate transform. If `subscribe(on:)` is used, it /// should always be the first operation in the pipeline. /// /// Create this publisher using the `changesetPublisher` property on RealmCollection. @frozen public struct CollectionChangeset<Collection: RealmCollection>: Publisher { public typealias Output = RealmCollectionChange<Collection> /// This publisher reports error via the `.error` case of RealmCollectionChange. public typealias Failure = Never private let collection: Collection private let keyPaths: [String]? private let queue: DispatchQueue? internal init(_ collection: Collection, keyPaths: [String]? = nil, queue: DispatchQueue? = nil) { precondition(collection.realm != nil, "Only managed collections can be published") self.collection = collection self.keyPaths = keyPaths self.queue = queue } /// Captures the `NotificationToken` produced by observing a Realm Collection. /// /// This allows you to do notification skipping when performing a `Realm.write(withoutNotifying:)`. You should use this call if you /// require to write to the Realm database and ignore this specific observation chain. /// The `NotificationToken` will be saved on the specified `KeyPath`from the observation block set up in `receive(subscriber:)`. /// /// - Parameters: /// - object: The object which the `NotificationToken` is written to. /// - keyPath: The KeyPath which the `NotificationToken` is written to. /// - Returns: A `CollectionChangesetWithToken` Publisher. public func saveToken<T>(on object: T, at keyPath: WritableKeyPath<T, NotificationToken?>) -> CollectionChangesetWithToken<Collection, T> { return CollectionChangesetWithToken<Collection, T>(collection, queue, object, keyPath) } /// :nodoc: public func receive<S>(subscriber: S) where S: Subscriber, S.Failure == Never, Output == S.Input { let token = self.collection.observe(keyPaths: self.keyPaths, on: self.queue) { change in _ = subscriber.receive(change) } subscriber.receive(subscription: ObservationSubscription(token: token)) } /// Specifies the scheduler on which to perform subscribe, cancel, and request operations. /// /// For Realm Publishers, this determines which queue the underlying /// change notifications are sent to. If `receive(on:)` is not used /// subsequently, it also will determine which queue elements received /// from the publisher are evaluated on. Currently only serial dispatch /// queues are supported, and the `options:` parameter is not /// supported. /// /// - parameter scheduler: The serial dispatch queue to perform the subscription on. /// - returns: A publisher which subscribes on the given scheduler. public func subscribe<S: Scheduler>(on scheduler: S) -> CollectionChangeset<Collection> { guard let queue = scheduler as? DispatchQueue else { fatalError("Cannot subscribe on scheduler \(scheduler): only serial dispatch queues are currently implemented.") } return CollectionChangeset(collection, keyPaths: self.keyPaths, queue: queue) } /// Specifies the scheduler on which to perform downstream operations. /// /// This differs from `subscribe(on:)` in how it is integrated with the /// autorefresh cycle. When using `subscribe(on:)`, the subscription is /// performed on the target scheduler and the publisher will emit the /// collection during the refresh. When using `receive(on:)`, the /// collection is then converted to a `ThreadSafeReference` and /// delivered to the target scheduler with no integration into the /// autorefresh cycle, meaning it may arrive some time after the /// refresh occurs. /// /// When in doubt, you probably want `subscribe(on:)` /// /// - parameter scheduler: The serial dispatch queue to receive values on. /// - returns: A publisher which delivers values to the given scheduler. public func receive<S: Scheduler>(on scheduler: S) -> DeferredHandoverCollectionChangeset<Self, Collection, S> { DeferredHandoverCollectionChangeset(self, scheduler) } } /// A publisher which emits RealmMapChange<Key, Value> each time the observed object is modified /// /// `receive(on:)` and `subscribe(on:)` can be called directly on this /// publisher, and calling `.threadSafeReference()` is only required if /// there is an intermediate transform. If `subscribe(on:)` is used, it /// should always be the first operation in the pipeline. /// /// Create this publisher using the `changesetPublisher` property on RealmCollection. @frozen public struct MapChangeset<Collection: RealmKeyedCollection>: Publisher { public typealias Output = RealmMapChange<Collection> /// This publisher reports error via the `.error` case of RealmMapChange. public typealias Failure = Never private let collection: Collection private let keyPaths: [String]? private let queue: DispatchQueue? internal init(_ collection: Collection, keyPaths: [String]? = nil, queue: DispatchQueue? = nil) { precondition(collection.realm != nil, "Only managed collections can be published") self.collection = collection self.keyPaths = keyPaths self.queue = queue } /// Captures the `NotificationToken` produced by observing a Realm Collection. /// /// This allows you to do notification skipping when performing a `Realm.write(withoutNotifying:)`. You should use this call if you /// require to write to the Realm database and ignore this specific observation chain. /// The `NotificationToken` will be saved on the specified `KeyPath`from the observation block set up in `receive(subscriber:)`. /// /// - Parameters: /// - object: The object which the `NotificationToken` is written to. /// - keyPath: The KeyPath which the `NotificationToken` is written to. /// - Returns: A `CollectionChangesetWithToken` Publisher. public func saveToken<T>(on object: T, at keyPath: WritableKeyPath<T, NotificationToken?>) -> MapChangesetWithToken<Collection, T> { return MapChangesetWithToken<Collection, T>(collection, queue, object, keyPath) } /// :nodoc: public func receive<S>(subscriber: S) where S: Subscriber, S.Failure == Never, Output == S.Input { let token = self.collection.observe(keyPaths: self.keyPaths, on: self.queue) { change in _ = subscriber.receive(change) } subscriber.receive(subscription: ObservationSubscription(token: token)) } /// Specifies the scheduler on which to perform subscribe, cancel, and request operations. /// /// For Realm Publishers, this determines which queue the underlying /// change notifications are sent to. If `receive(on:)` is not used /// subsequently, it also will determine which queue elements received /// from the publisher are evaluated on. Currently only serial dispatch /// queues are supported, and the `options:` parameter is not /// supported. /// /// - parameter scheduler: The serial dispatch queue to perform the subscription on. /// - returns: A publisher which subscribes on the given scheduler. public func subscribe<S: Scheduler>(on scheduler: S) -> MapChangeset<Collection> { guard let queue = scheduler as? DispatchQueue else { fatalError("Cannot subscribe on scheduler \(scheduler): only serial dispatch queues are currently implemented.") } return MapChangeset(collection, keyPaths: self.keyPaths, queue: queue) } /// Specifies the scheduler on which to perform downstream operations. /// /// This differs from `subscribe(on:)` in how it is integrated with the /// autorefresh cycle. When using `subscribe(on:)`, the subscription is /// performed on the target scheduler and the publisher will emit the /// collection during the refresh. When using `receive(on:)`, the /// collection is then converted to a `ThreadSafeReference` and /// delivered to the target scheduler with no integration into the /// autorefresh cycle, meaning it may arrive some time after the /// refresh occurs. /// /// When in doubt, you probably want `subscribe(on:)` /// /// - parameter scheduler: The serial dispatch queue to receive values on. /// - returns: A publisher which delivers values to the given scheduler. public func receive<S: Scheduler>(on scheduler: S) -> DeferredHandoverKeyedCollectionChangeset<Self, Collection, S> { DeferredHandoverKeyedCollectionChangeset(self, scheduler) } } /// A publisher which emits SectionedResultsChange<Collection> each time the observed object is modified /// /// `receive(on:)` and `subscribe(on:)` can be called directly on this /// publisher, and calling `.threadSafeReference()` is only required if /// there is an intermediate transform. If `subscribe(on:)` is used, it /// should always be the first operation in the pipeline. /// /// Create this publisher using the `changesetPublisher` property on RealmSectionedResult. @frozen public struct SectionedResultsChangeset<Collection: RealmSectionedResult>: Publisher { public typealias Output = SectionedResultsChange<Collection> /// This publisher reports error via the `.error` case of SectionedResultsChange. public typealias Failure = Never private let collection: Collection private let keyPaths: [String]? private let queue: DispatchQueue? internal init(_ collection: Collection, keyPaths: [String]? = nil, queue: DispatchQueue? = nil) { precondition(collection.realm != nil, "Only managed collections can be published") self.collection = collection self.keyPaths = keyPaths self.queue = queue } /// Captures the `NotificationToken` produced by observing the collection. /// /// This allows you to do notification skipping when performing a `Realm.write(withoutNotifying:)`. You should use this call if you /// require to write to the Realm database and ignore this specific observation chain. /// The `NotificationToken` will be saved on the specified `KeyPath`from the observation block set up in `receive(subscriber:)`. /// /// - Parameters: /// - object: The object which the `NotificationToken` is written to. /// - keyPath: The KeyPath which the `NotificationToken` is written to. /// - Returns: A `SectionedResultsChangesetWithToken` Publisher. public func saveToken<T>(on object: T, at keyPath: WritableKeyPath<T, NotificationToken?>) -> SectionedResultsChangesetWithToken<Collection, T> { return SectionedResultsChangesetWithToken<Collection, T>(collection, queue, object, keyPath) } /// :nodoc: public func receive<S>(subscriber: S) where S: Subscriber, S.Failure == Never, Output == S.Input { let token = self.collection.observe(keyPaths: self.keyPaths, on: self.queue) { change in _ = subscriber.receive(change) } subscriber.receive(subscription: ObservationSubscription(token: token)) } /// Specifies the scheduler on which to perform subscribe, cancel, and request operations. /// /// For Realm Publishers, this determines which queue the underlying /// change notifications are sent to. If `receive(on:)` is not used /// subsequently, it also will determine which queue elements received /// from the publisher are evaluated on. Currently only serial dispatch /// queues are supported, and the `options:` parameter is not /// supported. /// /// - parameter scheduler: The serial dispatch queue to perform the subscription on. /// - returns: A publisher which subscribes on the given scheduler. public func subscribe<S: Scheduler>(on scheduler: S) -> SectionedResultsChangeset<Collection> { guard let queue = scheduler as? DispatchQueue else { fatalError("Cannot subscribe on scheduler \(scheduler): only serial dispatch queues are currently implemented.") } return SectionedResultsChangeset(collection, keyPaths: self.keyPaths, queue: queue) } /// Specifies the scheduler on which to perform downstream operations. /// /// This differs from `subscribe(on:)` in how it is integrated with the /// autorefresh cycle. When using `subscribe(on:)`, the subscription is /// performed on the target scheduler and the publisher will emit the /// collection during the refresh. When using `receive(on:)`, the /// collection is then converted to a `ThreadSafeReference` and /// delivered to the target scheduler with no integration into the /// autorefresh cycle, meaning it may arrive some time after the /// refresh occurs. /// /// When in doubt, you probably want `subscribe(on:)` /// /// - parameter scheduler: The serial dispatch queue to receive values on. /// - returns: A publisher which delivers values to the given scheduler. public func receive<S: Scheduler>(on scheduler: S) -> DeferredHandoverSectionedResultsChangeset<Self, Collection, S> { DeferredHandoverSectionedResultsChangeset(self, scheduler) } } /// A publisher which emits SectionedResultsChange<Collection> each time the observed object is modified /// /// `receive(on:)` and `subscribe(on:)` can be called directly on this /// publisher, and calling `.threadSafeReference()` is only required if /// there is an intermediate transform. If `subscribe(on:)` is used, it /// should always be the first operation in the pipeline. /// /// Create this publisher using the `changesetPublisher` property on RealmSectionedResult. @frozen public struct SectionChangeset<Collection: RealmSectionedResult>: Publisher { public typealias Output = SectionedResultsChange<Collection> /// This publisher reports error via the `.error` case of SectionedResultsChange. public typealias Failure = Never private let collection: Collection private let keyPaths: [String]? private let queue: DispatchQueue? internal init(_ collection: Collection, keyPaths: [String]? = nil, queue: DispatchQueue? = nil) { precondition(collection.realm != nil, "Only managed collections can be published") self.collection = collection self.keyPaths = keyPaths self.queue = queue } /// Captures the `NotificationToken` produced by observing a the collection. /// /// This allows you to do notification skipping when performing a `Realm.write(withoutNotifying:)`. You should use this call if you /// require to write to the Realm database and ignore this specific observation chain. /// The `NotificationToken` will be saved on the specified `KeyPath`from the observation block set up in `receive(subscriber:)`. /// /// - Parameters: /// - object: The object which the `NotificationToken` is written to. /// - keyPath: The KeyPath which the `NotificationToken` is written to. /// - Returns: A `SectionedResultsChangesetWithToken` Publisher. public func saveToken<T>(on object: T, at keyPath: WritableKeyPath<T, NotificationToken?>) -> SectionedResultsChangesetWithToken<Collection, T> { return SectionedResultsChangesetWithToken<Collection, T>(collection, queue, object, keyPath) } /// :nodoc: public func receive<S>(subscriber: S) where S: Subscriber, S.Failure == Never, Output == S.Input { let token = self.collection.observe(keyPaths: self.keyPaths, on: self.queue) { change in _ = subscriber.receive(change) } subscriber.receive(subscription: ObservationSubscription(token: token)) } /// Specifies the scheduler on which to perform subscribe, cancel, and request operations. /// /// For Realm Publishers, this determines which queue the underlying /// change notifications are sent to. If `receive(on:)` is not used /// subsequently, it also will determine which queue elements received /// from the publisher are evaluated on. Currently only serial dispatch /// queues are supported, and the `options:` parameter is not /// supported. /// /// - parameter scheduler: The serial dispatch queue to perform the subscription on. /// - returns: A publisher which subscribes on the given scheduler. public func subscribe<S: Scheduler>(on scheduler: S) -> SectionedResultsChangeset<Collection> { guard let queue = scheduler as? DispatchQueue else { fatalError("Cannot subscribe on scheduler \(scheduler): only serial dispatch queues are currently implemented.") } return SectionedResultsChangeset(collection, keyPaths: self.keyPaths, queue: queue) } /// Specifies the scheduler on which to perform downstream operations. /// /// This differs from `subscribe(on:)` in how it is integrated with the /// autorefresh cycle. When using `subscribe(on:)`, the subscription is /// performed on the target scheduler and the publisher will emit the /// collection during the refresh. When using `receive(on:)`, the /// collection is then converted to a `ThreadSafeReference` and /// delivered to the target scheduler with no integration into the /// autorefresh cycle, meaning it may arrive some time after the /// refresh occurs. /// /// When in doubt, you probably want `subscribe(on:)` /// /// - parameter scheduler: The serial dispatch queue to receive values on. /// - returns: A publisher which delivers values to the given scheduler. public func receive<S: Scheduler>(on scheduler: S) -> DeferredHandoverSectionedResultsChangeset<Self, Collection, S> { DeferredHandoverSectionedResultsChangeset(self, scheduler) } } /// A publisher which emits RealmCollectionChange<T> each time the observed object is modified /// /// `receive(on:)` and `subscribe(on:)` can be called directly on this /// publisher, and calling `.threadSafeReference()` is only required if /// there is an intermediate transform. If `subscribe(on:)` is used, it /// should always be the first operation in the pipeline. /// /// Create this publisher using the `changesetPublisher` property on RealmCollection. public class CollectionChangesetWithToken<Collection: RealmCollection, T>: Publisher { public typealias Output = RealmCollectionChange<Collection> /// This publisher reports error via the `.error` case of RealmCollectionChange. public typealias Failure = Never internal typealias TokenParent = T internal typealias TokenKeyPath = WritableKeyPath<T, NotificationToken?> private var tokenParent: TokenParent private var tokenKeyPath: TokenKeyPath private let collection: Collection private let queue: DispatchQueue? internal init(_ collection: Collection, _ queue: DispatchQueue? = nil, _ tokenParent: TokenParent, _ tokenKeyPath: TokenKeyPath) { precondition(collection.realm != nil, "Only managed collections can be published") self.collection = collection self.queue = queue self.tokenParent = tokenParent self.tokenKeyPath = tokenKeyPath } /// :nodoc: public func receive<S>(subscriber: S) where S: Subscriber, S.Failure == Never, Output == S.Input { let token = self.collection.observe(on: self.queue) { change in _ = subscriber.receive(change) } tokenParent[keyPath: tokenKeyPath] = token subscriber.receive(subscription: ObservationSubscription(token: token)) } /// Specifies the scheduler on which to perform subscribe, cancel, and request operations. /// /// For Realm Publishers, this determines which queue the underlying /// change notifications are sent to. If `receive(on:)` is not used /// subsequently, it also will determine which queue elements received /// from the publisher are evaluated on. Currently only serial dispatch /// queues are supported, and the `options:` parameter is not /// supported. /// /// - parameter scheduler: The serial dispatch queue to perform the subscription on. /// - returns: A publisher which subscribes on the given scheduler. public func subscribe<S: Scheduler>(on scheduler: S) -> CollectionChangesetWithToken<Collection, T> { guard let queue = scheduler as? DispatchQueue else { fatalError("Cannot subscribe on scheduler \(scheduler): only serial dispatch queues are currently implemented.") } return CollectionChangesetWithToken(collection, queue, tokenParent, tokenKeyPath) } /// Specifies the scheduler on which to perform downstream operations. /// /// This differs from `subscribe(on:)` in how it is integrated with the /// autorefresh cycle. When using `subscribe(on:)`, the subscription is /// performed on the target scheduler and the publisher will emit the /// collection during the refresh. When using `receive(on:)`, the /// collection is then converted to a `ThreadSafeReference` and /// delivered to the target scheduler with no integration into the /// autorefresh cycle, meaning it may arrive some time after the /// refresh occurs. /// /// When in doubt, you probably want `subscribe(on:)` /// /// - parameter scheduler: The serial dispatch queue to receive values on. /// - returns: A publisher which delivers values to the given scheduler. public func receive<S: Scheduler>(on scheduler: S) -> DeferredHandoverCollectionChangeset<CollectionChangesetWithToken, Collection, S> { DeferredHandoverCollectionChangeset(self, scheduler) } } /// A publisher which emits SectionedResultsChange<T> each time the observed object is modified /// /// `receive(on:)` and `subscribe(on:)` can be called directly on this /// publisher, and calling `.threadSafeReference()` is only required if /// there is an intermediate transform. If `subscribe(on:)` is used, it /// should always be the first operation in the pipeline. /// /// Create this publisher using the `changesetPublisher` property on RealmSectionedResult. public class SectionedResultsChangesetWithToken<Collection: RealmSectionedResult, T>: Publisher { public typealias Output = SectionedResultsChange<Collection> /// This publisher reports error via the `.error` case of RealmCollectionChange. public typealias Failure = Never internal typealias TokenParent = T internal typealias TokenKeyPath = WritableKeyPath<T, NotificationToken?> private var tokenParent: TokenParent private var tokenKeyPath: TokenKeyPath private let collection: Collection private let queue: DispatchQueue? internal init(_ collection: Collection, _ queue: DispatchQueue? = nil, _ tokenParent: TokenParent, _ tokenKeyPath: TokenKeyPath) { precondition(collection.realm != nil, "Only managed collections can be published") self.collection = collection self.queue = queue self.tokenParent = tokenParent self.tokenKeyPath = tokenKeyPath } /// :nodoc: public func receive<S>(subscriber: S) where S: Subscriber, S.Failure == Never, Output == S.Input { let token = self.collection.observe(on: self.queue) { change in _ = subscriber.receive(change) } tokenParent[keyPath: tokenKeyPath] = token subscriber.receive(subscription: ObservationSubscription(token: token)) } /// Specifies the scheduler on which to perform subscribe, cancel, and request operations. /// /// For Realm Publishers, this determines which queue the underlying /// change notifications are sent to. If `receive(on:)` is not used /// subsequently, it also will determine which queue elements received /// from the publisher are evaluated on. Currently only serial dispatch /// queues are supported, and the `options:` parameter is not /// supported. /// /// - parameter scheduler: The serial dispatch queue to perform the subscription on. /// - returns: A publisher which subscribes on the given scheduler. public func subscribe<S: Scheduler>(on scheduler: S) -> SectionedResultsChangesetWithToken<Collection, T> { guard let queue = scheduler as? DispatchQueue else { fatalError("Cannot subscribe on scheduler \(scheduler): only serial dispatch queues are currently implemented.") } return SectionedResultsChangesetWithToken(collection, queue, tokenParent, tokenKeyPath) } /// Specifies the scheduler on which to perform downstream operations. /// /// This differs from `subscribe(on:)` in how it is integrated with the /// autorefresh cycle. When using `subscribe(on:)`, the subscription is /// performed on the target scheduler and the publisher will emit the /// collection during the refresh. When using `receive(on:)`, the /// collection is then converted to a `ThreadSafeReference` and /// delivered to the target scheduler with no integration into the /// autorefresh cycle, meaning it may arrive some time after the /// refresh occurs. /// /// When in doubt, you probably want `subscribe(on:)` /// /// - parameter scheduler: The serial dispatch queue to receive values on. /// - returns: A publisher which delivers values to the given scheduler. public func receive<S: Scheduler>(on scheduler: S) -> DeferredHandoverSectionedResultsChangeset<SectionedResultsChangesetWithToken, Collection, S> { DeferredHandoverSectionedResultsChangeset(self, scheduler) } } /// A publisher which emits SectionedResultsChange<T> each time the observed object is modified /// /// `receive(on:)` and `subscribe(on:)` can be called directly on this /// publisher, and calling `.threadSafeReference()` is only required if /// there is an intermediate transform. If `subscribe(on:)` is used, it /// should always be the first operation in the pipeline. /// /// Create this publisher using the `changesetPublisher` property on RealmSectionedResult. public class SectionChangesetWithToken<Collection: RealmSectionedResult, T>: Publisher { public typealias Output = SectionedResultsChange<Collection> /// This publisher reports error via the `.error` case of RealmCollectionChange. public typealias Failure = Never internal typealias TokenParent = T internal typealias TokenKeyPath = WritableKeyPath<T, NotificationToken?> private var tokenParent: TokenParent private var tokenKeyPath: TokenKeyPath private let collection: Collection private let queue: DispatchQueue? internal init(_ collection: Collection, _ queue: DispatchQueue? = nil, _ tokenParent: TokenParent, _ tokenKeyPath: TokenKeyPath) { precondition(collection.realm != nil, "Only managed collections can be published") self.collection = collection self.queue = queue self.tokenParent = tokenParent self.tokenKeyPath = tokenKeyPath } /// :nodoc: public func receive<S>(subscriber: S) where S: Subscriber, S.Failure == Never, Output == S.Input { let token = self.collection.observe(on: self.queue) { change in _ = subscriber.receive(change) } tokenParent[keyPath: tokenKeyPath] = token subscriber.receive(subscription: ObservationSubscription(token: token)) } /// Specifies the scheduler on which to perform subscribe, cancel, and request operations. /// /// For Realm Publishers, this determines which queue the underlying /// change notifications are sent to. If `receive(on:)` is not used /// subsequently, it also will determine which queue elements received /// from the publisher are evaluated on. Currently only serial dispatch /// queues are supported, and the `options:` parameter is not /// supported. /// /// - parameter scheduler: The serial dispatch queue to perform the subscription on. /// - returns: A publisher which subscribes on the given scheduler. public func subscribe<S: Scheduler>(on scheduler: S) -> SectionedResultsChangesetWithToken<Collection, T> { guard let queue = scheduler as? DispatchQueue else { fatalError("Cannot subscribe on scheduler \(scheduler): only serial dispatch queues are currently implemented.") } return SectionedResultsChangesetWithToken(collection, queue, tokenParent, tokenKeyPath) } /// Specifies the scheduler on which to perform downstream operations. /// /// This differs from `subscribe(on:)` in how it is integrated with the /// autorefresh cycle. When using `subscribe(on:)`, the subscription is /// performed on the target scheduler and the publisher will emit the /// collection during the refresh. When using `receive(on:)`, the /// collection is then converted to a `ThreadSafeReference` and /// delivered to the target scheduler with no integration into the /// autorefresh cycle, meaning it may arrive some time after the /// refresh occurs. /// /// When in doubt, you probably want `subscribe(on:)` /// /// - parameter scheduler: The serial dispatch queue to receive values on. /// - returns: A publisher which delivers values to the given scheduler. public func receive<S: Scheduler>(on scheduler: S) -> DeferredHandoverSectionChangeset<SectionChangesetWithToken, Collection, S> { DeferredHandoverSectionChangeset(self, scheduler) } } /// A publisher which emits RealmMapChange<T> each time the observed object is modified /// /// `receive(on:)` and `subscribe(on:)` can be called directly on this /// publisher, and calling `.threadSafeReference()` is only required if /// there is an intermediate transform. If `subscribe(on:)` is used, it /// should always be the first operation in the pipeline. /// /// Create this publisher using the `changesetPublisher` property on RealmCollection. public class MapChangesetWithToken<Collection: RealmKeyedCollection, T>: Publisher { public typealias Output = RealmMapChange<Collection> /// This publisher reports error via the `.error` case of RealmCollectionChange. public typealias Failure = Never internal typealias TokenParent = T internal typealias TokenKeyPath = WritableKeyPath<T, NotificationToken?> private var tokenParent: TokenParent private var tokenKeyPath: TokenKeyPath private let collection: Collection private let queue: DispatchQueue? internal init(_ collection: Collection, _ queue: DispatchQueue? = nil, _ tokenParent: TokenParent, _ tokenKeyPath: TokenKeyPath) { precondition(collection.realm != nil, "Only managed collections can be published") self.collection = collection self.queue = queue self.tokenParent = tokenParent self.tokenKeyPath = tokenKeyPath } /// :nodoc: public func receive<S>(subscriber: S) where S: Subscriber, S.Failure == Never, Output == S.Input { let token = self.collection.observe(on: self.queue) { change in _ = subscriber.receive(change) } tokenParent[keyPath: tokenKeyPath] = token subscriber.receive(subscription: ObservationSubscription(token: token)) } /// Specifies the scheduler on which to perform subscribe, cancel, and request operations. /// /// For Realm Publishers, this determines which queue the underlying /// change notifications are sent to. If `receive(on:)` is not used /// subsequently, it also will determine which queue elements received /// from the publisher are evaluated on. Currently only serial dispatch /// queues are supported, and the `options:` parameter is not /// supported. /// /// - parameter scheduler: The serial dispatch queue to perform the subscription on. /// - returns: A publisher which subscribes on the given scheduler. public func subscribe<S: Scheduler>(on scheduler: S) -> MapChangesetWithToken<Collection, T> { guard let queue = scheduler as? DispatchQueue else { fatalError("Cannot subscribe on scheduler \(scheduler): only serial dispatch queues are currently implemented.") } return MapChangesetWithToken(collection, queue, tokenParent, tokenKeyPath) } /// Specifies the scheduler on which to perform downstream operations. /// /// This differs from `subscribe(on:)` in how it is integrated with the /// autorefresh cycle. When using `subscribe(on:)`, the subscription is /// performed on the target scheduler and the publisher will emit the /// collection during the refresh. When using `receive(on:)`, the /// collection is then converted to a `ThreadSafeReference` and /// delivered to the target scheduler with no integration into the /// autorefresh cycle, meaning it may arrive some time after the /// refresh occurs. /// /// When in doubt, you probably want `subscribe(on:)` /// /// - parameter scheduler: The serial dispatch queue to receive values on. /// - returns: A publisher which delivers values to the given scheduler. public func receive<S: Scheduler>(on scheduler: S) -> DeferredHandoverKeyedCollectionChangeset<MapChangesetWithToken, Collection, S> { DeferredHandoverKeyedCollectionChangeset(self, scheduler) } } /// A helper publisher created by calling `.threadSafeReference()` on a /// publisher which emits `RealmCollectionChange`. @frozen public struct MakeThreadSafeCollectionChangeset<Upstream: Publisher, T: RealmCollection>: Publisher where Upstream.Output == RealmCollectionChange<T> { /// :nodoc: public typealias Failure = Upstream.Failure /// :nodoc: public typealias Output = Upstream.Output private let upstream: Upstream internal init(_ upstream: Upstream) { self.upstream = upstream } /// :nodoc: public func receive<S>(subscriber: S) where S: Subscriber, S.Failure == Failure, Output == S.Input { self.upstream.receive(subscriber: subscriber) } /// Specifies the scheduler on which to receive elements from the publisher. /// /// This publisher converts each value emitted by the upstream /// publisher to a `ThreadSafeReference`, passes it to the target /// scheduler, and then converts back to the original type. /// /// - parameter scheduler: The serial dispatch queue to receive values on. /// - returns: A publisher which delivers values to the given scheduler. public func receive<S: Scheduler>(on scheduler: S) -> DeferredHandoverCollectionChangeset<Upstream, T, S> { DeferredHandoverCollectionChangeset(self.upstream, scheduler) } } /// A helper publisher created by calling `.threadSafeReference()` on a /// publisher which emits `RealmMapChange`. @frozen public struct MakeThreadSafeKeyedCollectionChangeset<Upstream: Publisher, T: RealmKeyedCollection>: Publisher where Upstream.Output == RealmMapChange<T> { /// :nodoc: public typealias Failure = Upstream.Failure /// :nodoc: public typealias Output = Upstream.Output private let upstream: Upstream internal init(_ upstream: Upstream) { self.upstream = upstream } /// :nodoc: public func receive<S>(subscriber: S) where S: Subscriber, S.Failure == Failure, Output == S.Input { self.upstream.receive(subscriber: subscriber) } /// Specifies the scheduler on which to receive elements from the publisher. /// /// This publisher converts each value emitted by the upstream /// publisher to a `ThreadSafeReference`, passes it to the target /// scheduler, and then converts back to the original type. /// /// - parameter scheduler: The serial dispatch queue to receive values on. /// - returns: A publisher which delivers values to the given scheduler. public func receive<S: Scheduler>(on scheduler: S) -> DeferredHandoverKeyedCollectionChangeset<Upstream, T, S> { DeferredHandoverKeyedCollectionChangeset(self.upstream, scheduler) } } /// A helper publisher created by calling `.threadSafeReference()` on a /// publisher which emits `SectionedResultsChange`. @frozen public struct MakeThreadSafeSectionedResultsChangeset<Upstream: Publisher, T: RealmSectionedResult>: Publisher where Upstream.Output == SectionedResultsChange<T> { /// :nodoc: public typealias Failure = Upstream.Failure /// :nodoc: public typealias Output = Upstream.Output private let upstream: Upstream internal init(_ upstream: Upstream) { self.upstream = upstream } /// :nodoc: public func receive<S>(subscriber: S) where S: Subscriber, S.Failure == Failure, Output == S.Input { self.upstream.receive(subscriber: subscriber) } /// Specifies the scheduler on which to receive elements from the publisher. /// /// This publisher converts each value emitted by the upstream /// publisher to a `ThreadSafeReference`, passes it to the target /// scheduler, and then converts back to the original type. /// /// - parameter scheduler: The serial dispatch queue to receive values on. /// - returns: A publisher which delivers values to the given scheduler. public func receive<S: Scheduler>(on scheduler: S) -> DeferredHandoverSectionedResultsChangeset<Upstream, T, S> { DeferredHandoverSectionedResultsChangeset(self.upstream, scheduler) } } /// A publisher which delivers thread-confined collection changesets to a /// serial dispatch queue. /// /// Create using `.threadSafeReference().receive(on: queue)` on a publisher /// that emits `RealmCollectionChange`. @frozen public struct DeferredHandoverCollectionChangeset<Upstream: Publisher, T: RealmCollection, S: Scheduler>: Publisher where Upstream.Output == RealmCollectionChange<T> { /// :nodoc: public typealias Failure = Upstream.Failure /// :nodoc: public typealias Output = Upstream.Output private let upstream: Upstream private let scheduler: S internal init(_ upstream: Upstream, _ scheduler: S) { self.upstream = upstream self.scheduler = scheduler } private enum Handover { // A collection change which does not contain a live object and so // can be delivered directly case passthrough(_ change: RealmCollectionChange<T>) // The initial and update notifications for live collections need // to wrap the collection in a thread-safe reference and hold onto // a pinned Realm to ensure that the version which the change // information is for stays pinned until it's delivered. case initial(_ pin: RLMPinnedRealm, _ tsr: ThreadSafeReference<T>) case update(_ pin: RLMPinnedRealm, _ tsr: ThreadSafeReference<T>, deletions: [Int], insertions: [Int], modifications: [Int]) } /// :nodoc: public func receive<Sub>(subscriber: Sub) where Sub: Subscriber, Sub.Failure == Failure, Output == Sub.Input { let scheduler = self.scheduler self.upstream .map { (change: Output) -> Handover in switch change { case .initial(let collection): guard let realm = collection.realm, !realm.isFrozen else { return .passthrough(change) } return .initial(RLMPinnedRealm(realm: realm.rlmRealm), ThreadSafeReference(to: collection)) case .update(let collection, deletions: let deletions, insertions: let insertions, modifications: let modifications): guard let realm = collection.realm, !realm.isFrozen else { return .passthrough(change) } return .update(RLMPinnedRealm(realm: realm.rlmRealm), ThreadSafeReference(to: collection), deletions: deletions, insertions: insertions, modifications: modifications) case .error: return .passthrough(change) } } .receive(on: scheduler) .compactMap { (handover: Handover) -> Output? in switch handover { case .passthrough(let change): return change case .initial(let pin, let tsr): defer { pin.unpin() } if let resolved = realm(pin.configuration, scheduler)?.resolve(tsr) { return .initial(resolved) } return nil case .update(let pin, let tsr, deletions: let deletions, insertions: let insertions, modifications: let modifications): defer { pin.unpin() } if let resolved = realm(pin.configuration, scheduler)?.resolve(tsr) { return .update(resolved, deletions: deletions, insertions: insertions, modifications: modifications) } return nil } } .receive(subscriber: subscriber) } } /// A publisher which delivers thread-confined `Map` changesets to a /// serial dispatch queue. /// /// Create using `.threadSafeReference().receive(on: queue)` on a publisher /// that emits `RealmMapChange`. @frozen public struct DeferredHandoverKeyedCollectionChangeset<Upstream: Publisher, T: RealmKeyedCollection, S: Scheduler>: Publisher where Upstream.Output == RealmMapChange<T> { /// :nodoc: public typealias Failure = Upstream.Failure /// :nodoc: public typealias Output = Upstream.Output private let upstream: Upstream private let scheduler: S internal init(_ upstream: Upstream, _ scheduler: S) { self.upstream = upstream self.scheduler = scheduler } private enum Handover { // A collection change which does not contain a live object and so // can be delivered directly case passthrough(_ change: RealmMapChange<T>) // The initial and update notifications for live collections need // to wrap the collection in a thread-safe reference and hold onto // a pinned Realm to ensure that the version which the change // information is for stays pinned until it's delivered. case initial(_ pin: RLMPinnedRealm, _ tsr: ThreadSafeReference<T>) case update(_ pin: RLMPinnedRealm, _ tsr: ThreadSafeReference<T>, deletions: [T.Key], insertions: [T.Key], modifications: [T.Key]) } /// :nodoc: public func receive<Sub>(subscriber: Sub) where Sub: Subscriber, Sub.Failure == Failure, Output == Sub.Input { let scheduler = self.scheduler self.upstream .map { (change: Output) -> Handover in switch change { case .initial(let collection): guard let realm = collection.realm, !realm.isFrozen else { return .passthrough(change) } return .initial(RLMPinnedRealm(realm: realm.rlmRealm), ThreadSafeReference(to: collection)) case .update(let collection, deletions: let deletions, insertions: let insertions, modifications: let modifications): guard let realm = collection.realm, !realm.isFrozen else { return .passthrough(change) } return .update(RLMPinnedRealm(realm: realm.rlmRealm), ThreadSafeReference(to: collection), deletions: deletions, insertions: insertions, modifications: modifications) case .error: return .passthrough(change) } } .receive(on: scheduler) .compactMap { (handover: Handover) -> Output? in switch handover { case .passthrough(let change): return change case .initial(let pin, let tsr): defer { pin.unpin() } if let resolved = realm(pin.configuration, scheduler)?.resolve(tsr) { return .initial(resolved) } return nil case .update(let pin, let tsr, deletions: let deletions, insertions: let insertions, modifications: let modifications): defer { pin.unpin() } if let resolved = realm(pin.configuration, scheduler)?.resolve(tsr) { return .update(resolved, deletions: deletions, insertions: insertions, modifications: modifications) } return nil } } .receive(subscriber: subscriber) } } private enum SectionedHandover<T: RealmSectionedResult, S: Scheduler> { // A collection change which does not contain a live object and so // can be delivered directly case passthrough(_ change: SectionedResultsChange<T>) // The initial and update notifications for live collections need // to wrap the collection in a thread-safe reference and hold onto // a pinned Realm to ensure that the version which the change // information is for stays pinned until it's delivered. case initial(_ pin: RLMPinnedRealm, _ tsr: ThreadSafeReference<T>) case update(_ pin: RLMPinnedRealm, _ tsr: ThreadSafeReference<T>, deletions: [IndexPath], insertions: [IndexPath], modifications: [IndexPath], sectionsToInsert: IndexSet, sectionsToDelete: IndexSet) init(_ change: SectionedResultsChange<T>) { switch change { case .initial(let collection): guard let realm = collection.realm, !realm.isFrozen else { self = .passthrough(change) return } self = .initial(RLMPinnedRealm(realm: realm.rlmRealm), ThreadSafeReference(to: collection)) case .update(let collection, deletions: let deletions, insertions: let insertions, modifications: let modifications, sectionsToInsert: let sectionsToInsert, sectionsToDelete: let sectionsToDelete): guard let realm = collection.realm, !realm.isFrozen else { self = .passthrough(change) return } self = .update(RLMPinnedRealm(realm: realm.rlmRealm), ThreadSafeReference(to: collection), deletions: deletions, insertions: insertions, modifications: modifications, sectionsToInsert: sectionsToInsert, sectionsToDelete: sectionsToDelete) } } func resolve(_ scheduler: S) -> SectionedResultsChange<T>? { switch self { case .passthrough(let change): return change case .initial(let pin, let tsr): defer { pin.unpin() } if let resolved = realm(pin.configuration, scheduler)?.resolve(tsr) { return .initial(resolved) } return nil case .update(let pin, let tsr, deletions: let deletions, insertions: let insertions, modifications: let modifications, sectionsToInsert: let sectionsToInsert, sectionsToDelete: let sectionsToDelete): defer { pin.unpin() } if let resolved = realm(pin.configuration, scheduler)?.resolve(tsr) { return .update(resolved, deletions: deletions, insertions: insertions, modifications: modifications, sectionsToInsert: sectionsToInsert, sectionsToDelete: sectionsToDelete) } return nil } } } // swiftlint:disable type_name /// A publisher which delivers thread-confined collection changesets to a /// serial dispatch queue. /// /// Create using `.threadSafeReference().receive(on: queue)` on a publisher /// that emits `RealmCollectionChange`. @frozen public struct DeferredHandoverSectionedResultsChangeset<Upstream: Publisher, T: RealmSectionedResult, S: Scheduler>: Publisher where Upstream.Output == SectionedResultsChange<T> { /// :nodoc: public typealias Failure = Upstream.Failure /// :nodoc: public typealias Output = Upstream.Output private let upstream: Upstream private let scheduler: S internal init(_ upstream: Upstream, _ scheduler: S) { self.upstream = upstream self.scheduler = scheduler } /// :nodoc: public func receive<Sub>(subscriber: Sub) where Sub: Subscriber, Sub.Failure == Failure, Output == Sub.Input { let scheduler = self.scheduler self.upstream .map { SectionedHandover<T, S>($0) } .receive(on: scheduler) .compactMap { $0.resolve(scheduler) } .receive(subscriber: subscriber) } } // swiftlint:enable type_name /// A publisher which delivers thread-confined collection changesets to a /// serial dispatch queue. /// /// Create using `.threadSafeReference().receive(on: queue)` on a publisher /// that emits `SectionedResultsChange`. @frozen public struct DeferredHandoverSectionChangeset<Upstream: Publisher, T: RealmSectionedResult, S: Scheduler>: Publisher where Upstream.Output == SectionedResultsChange<T> { /// :nodoc: public typealias Failure = Upstream.Failure /// :nodoc: public typealias Output = Upstream.Output private let upstream: Upstream private let scheduler: S internal init(_ upstream: Upstream, _ scheduler: S) { self.upstream = upstream self.scheduler = scheduler } /// :nodoc: public func receive<Sub>(subscriber: Sub) where Sub: Subscriber, Sub.Failure == Failure, Output == Sub.Input { let scheduler = self.scheduler self.upstream .map { SectionedHandover<T, S>($0) } .receive(on: scheduler) .compactMap { $0.resolve(scheduler) } .receive(subscriber: subscriber) } } }