Creating a Custom Combine Operator
I recently had an issue where I wanted to access the output of a publisher at the top of a Combine operator chain at a later stage in the chain. For example, if you have an Author object and you call a function to retrieve all Books written by the author, you might have an operator chain that looks like this:
struct Author {
    let name: String
}
struct Book {
    let name: String
    let author: Author
}
func getBooksForAuthor(_ author: Author) -> AnyPublisher<[Book], BookLoadingError> {
    ...
    return books
}
// Combine chain
loadAuthor() // AnyPublisher<Author, AuthorError>
    .flatMap(getBooksForAuthor) // emits [Book]; flatMap requires matching failure types, so BookLoadingError must first be mapped to AuthorError (e.g. with mapError)
This is okay, but I’d like to return a tuple of the Author object along with its associated [Book] array. I’d like to get something similar to this:
AnyPublisher<(Author, [Book]), AuthorError>
There are a few ways to accomplish this, but I decided to create a custom Combine operator. An operator sits between a Publisher and a Subscriber: it subscribes to the upstream publisher and republishes values to the downstream subscriber.
Using existing operators
You can compose existing operators within a function:
extension Publisher {
    /// Usage (the function passed in must itself return a publisher):
    /// loadSquad().mapKeepingUpstream(processPilots)
    func mapKeepingUpstream<T, P>(_ transform: @escaping (Self.Output) -> P) ->
        AnyPublisher<(T, P.Output), Self.Failure> where T == Self.Output, P: Publisher, Self.Failure == P.Failure
    {
        flatMap { tValue in // the value from the current (upstream) publisher
            transform(tValue) // the publisher returned by the transform
                .map { pOutput in (tValue, pOutput) } // pair the upstream value with each transformed value
        }.eraseToAnyPublisher()
    }
}
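As a rough illustration of the composed approach applied to the Author and Book example from the introduction, here is a minimal sketch. The names `pairingWithUpstream` and `booksWritten(by:)` are hypothetical stand-ins (not from the original code), and the publishers use `Just` with a `Never` failure type so the snippet runs on its own.

```swift
import Combine

struct Author { let name: String }
struct Book { let name: String; let author: Author }

extension Publisher {
    // Hypothetical restatement of the flatMap + map composition,
    // included so this snippet compiles without the rest of the article.
    func pairingWithUpstream<P: Publisher>(
        _ transform: @escaping (Output) -> P
    ) -> AnyPublisher<(Output, P.Output), Failure> where P.Failure == Failure {
        flatMap { upstreamValue in
            // Keep the upstream value alongside every value the inner publisher emits.
            transform(upstreamValue).map { (upstreamValue, $0) }
        }
        .eraseToAnyPublisher()
    }
}

// Hypothetical stand-in for a real book lookup.
func booksWritten(by author: Author) -> AnyPublisher<[Book], Never> {
    Just([Book(name: "Sample Title", author: author)]).eraseToAnyPublisher()
}

var received: [(Author, [Book])] = []
let cancellable = Just(Author(name: "Sample Author"))
    .pairingWithUpstream(booksWritten(by:))
    .sink { received.append($0) }
```

Because `Just` delivers synchronously, `received` holds a single (Author, [Book]) pair as soon as `sink` returns.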
You can also create a custom Combine operator.
Custom Combine operator components
To create a custom Combine operator you need the following components:
- A function defined in an extension of Publisher. This function is called within the Combine chain.
- A structure that adopts the Publisher protocol and implements func receive<S>(subscriber: S), which subscribes to the upstream publisher.
- A type that adopts the Subscriber protocol (a class in this example). Its implementation of the required func receive(_ input: Input) -> Subscribers.Demand sends events to the downstream subscriber.
Publisher extension function
public extension Publisher {
    /// - parameter transform: Closure that takes an input and creates an output of type T.
    /// - returns: A publisher that outputs a tuple of the upstream elements and the result of the transform.
    /*
     Usage:
     [1, 2, 3, 4, 5].publisher
         .keepUpstreamAndMap(transform: { $0 * 2 })
     (1, 2)
     (2, 4)
     (3, 6)
     (4, 8)
     (5, 10)
     */
    func keepUpstreamAndMap<T>(transform: @escaping (Output) -> T) -> Publishers.KeepUpstreamOperator<Self, T>
    {
        Publishers.KeepUpstreamOperator(upstream: self, transform: transform)
    }
}
- The transform parameter is a closure that accepts the output of the upstream publisher (the publisher that keepUpstreamAndMap is called on) and returns some type T. The extension function is generic over T.
- KeepUpstreamOperator is a type that adopts the Publisher protocol.
Publisher struct
public extension Publishers {
    /// A publisher that outputs a tuple of the output of the upstream publisher and the return value of the transform
    struct KeepUpstreamOperator<Upstream: Publisher, T>: Publisher {
        public typealias Output = (element: Upstream.Output, transformed: T)
        public typealias Failure = Upstream.Failure
        public let upstream: Upstream
        public let transform: (Upstream.Output) -> T
        public init(upstream: Upstream, transform: @escaping (Upstream.Output) -> T) {
            self.upstream = upstream
            self.transform = transform
        }
        public func receive<S>(subscriber: S) where S: Subscriber, S.Failure == Failure, S.Input == Output
        {
            upstream.subscribe(Inner(publisher: self, downstream: subscriber))
        }
    }
}
- This publisher stores the transform closure as well as the upstream publisher.
- When a subscriber subscribes to the publisher, the receive<S>(subscriber: S) function is called with that subscriber as its argument.
- Inner is a type that adopts the Subscriber protocol. It is initialized with the publisher (from which it copies the transform closure) and the downstream subscriber.
- To connect the upstream publisher to the Inner object, the subscribe function is called on the upstream field of the KeepUpstreamOperator.
Subscriber class
private extension Publishers.KeepUpstreamOperator {
    final class Inner<Downstream: Subscriber>: Subscriber
        where Downstream.Input == Output, Downstream.Failure == Failure
    {
        private let downstream: Downstream
        private let transform: (Upstream.Output) -> T
        fileprivate init(
            publisher: Publishers.KeepUpstreamOperator<Upstream, T>,
            downstream: Downstream
        ) {
            self.downstream = downstream
            self.transform = publisher.transform
        }
        func receive(subscription: Subscription) {
            downstream.receive(subscription: subscription)
        }
        func receive(_ input: Upstream.Output) -> Subscribers.Demand {
            let transformed: T = transform(input)
            return downstream.receive((element: input, transformed: transformed))
        }
        func receive(completion: Subscribers.Completion<Upstream.Failure>) {
            downstream.receive(completion: completion)
        }
    }
}
We define this class in an extension of KeepUpstreamOperator.
- Similar to how KeepUpstreamOperator was initialized with the transform and the upstream publisher, this object is initialized with the transform (read from the publisher) and the downstream subscriber.
- The required functions of the Subscriber protocol are implemented. They allow Inner to receive:
  - A subscription from the upstream publisher once Inner has subscribed to it. It is handed on to the downstream subscriber, so demand requests go straight to the upstream.
  - A value from the upstream publisher. The value is passed to the transform closure, and the value and the result are sent to the downstream subscriber as a tuple.
  - A completion event from the upstream publisher. This could be a finished or a failure message.
- In each case the corresponding receive(subscription:), receive(_:) or receive(completion:) function on the downstream subscriber is called.
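The three pieces described above (extension function, publisher, and inner subscriber) can be exercised together in one condensed sketch. The names `PairedWithTransform` and `pairWithTransform` are hypothetical and chosen only to keep the snippet self-contained; it follows the same forwarding pattern rather than reproducing the code above line for line.

```swift
import Combine

// Hypothetical, condensed operator: a publisher plus its inner subscriber.
struct PairedWithTransform<Upstream: Publisher, T>: Publisher {
    typealias Output = (element: Upstream.Output, transformed: T)
    typealias Failure = Upstream.Failure

    let upstream: Upstream
    let transform: (Upstream.Output) -> T

    func receive<S: Subscriber>(subscriber: S)
    where S.Input == Output, S.Failure == Failure {
        // Subscribe the inner object to the upstream; it forwards to `subscriber`.
        upstream.subscribe(Inner(downstream: subscriber, transform: transform))
    }

    private final class Inner<Downstream: Subscriber>: Subscriber
    where Downstream.Input == (element: Upstream.Output, transformed: T),
          Downstream.Failure == Upstream.Failure {
        let downstream: Downstream
        let transform: (Upstream.Output) -> T

        init(downstream: Downstream, transform: @escaping (Upstream.Output) -> T) {
            self.downstream = downstream
            self.transform = transform
        }

        // Hand the upstream subscription to the downstream so it controls demand.
        func receive(subscription: Subscription) {
            downstream.receive(subscription: subscription)
        }

        // Pair each upstream value with its transformed value.
        func receive(_ input: Upstream.Output) -> Subscribers.Demand {
            downstream.receive((element: input, transformed: transform(input)))
        }

        // Forward finished/failure events unchanged.
        func receive(completion: Subscribers.Completion<Upstream.Failure>) {
            downstream.receive(completion: completion)
        }
    }
}

extension Publisher {
    func pairWithTransform<T>(_ transform: @escaping (Output) -> T) -> PairedWithTransform<Self, T> {
        PairedWithTransform(upstream: self, transform: transform)
    }
}

var pairs: [(Int, Int)] = []
var finished = false
let cancellable = [1, 2, 3].publisher
    .pairWithTransform { $0 * 2 }
    .sink(receiveCompletion: { _ in finished = true },
          receiveValue: { pairs.append(($0.element, $0.transformed)) })
// pairs now holds (1, 2), (2, 4), (3, 6) and finished is true.
```

Passing the subscription through untouched keeps the operator transparent to backpressure: whatever demand the downstream requests is exactly what the upstream sees.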
Usage
The following Combine method chain performs these steps:
- Obtains a publisher by calling the loadSquad() function. This publisher either emits a JSON object or fails with an XWSImportError.
- Calls the keepUpstreamAndMap function, passing in a reference to the processPilots(squad:) function. The resulting publisher emits a tuple of the value emitted by the loadSquad() publisher and the output of the call to processPilots(squad:).
- Finally, maps the tuple through the buildUpdatedPilotsInfo(_:) function, which yields a publisher of UpdatedPilotsInfo objects.
func processPilots(squad: JSON) -> [JSON] { ... }
func buildUpdatedPilotsInfo(_ tuple: (JSON, [JSON])) -> UpdatedPilotsInfo {
    return UpdatedPilotsInfo(squad: tuple.0, pilots: tuple.1)
}
loadSquad() // AnyPublisher<JSON, XWSImportError>
    .keepUpstreamAndMap(transform: processPilots) // AnyPublisher<(JSON, [JSON]), XWSImportError>
    .map(buildUpdatedPilotsInfo) // AnyPublisher<UpdatedPilotsInfo, XWSImportError>
Note that when the functions are passed to the Combine operators we don’t have to do the weak self dance since these functions are not contained within a class. If they were contained within a class we’d have to change our chain to avoid reference cycles like so:
loadSquad() // AnyPublisher<JSON, XWSImportError>
    .keepUpstreamAndMap { [weak self] squad -> [JSON] in
        // processPilots returns [JSON], so the fallback must be an array too
        guard let self = self else { return [] }
        return self.processPilots(squad: squad)
    } // AnyPublisher<(JSON, [JSON]), XWSImportError>
    .map { [weak self] tuple -> UpdatedPilotsInfo in
        guard let self = self else { return UpdatedPilotsInfo() }
        return self.buildUpdatedPilotsInfo(tuple)
    } // AnyPublisher<UpdatedPilotsInfo, XWSImportError>
For more information see self-weak-unowned
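To make the reference-cycle concern concrete, here is a small sketch using only built-in operators. The class `PilotCounter` and its members are hypothetical and exist purely to illustrate the capture pattern: the object owns its cancellable, so closures that captured self strongly would keep it alive, whereas the weak captures let it deallocate.

```swift
import Combine

// Hypothetical class, used only to illustrate the [weak self] capture pattern.
final class PilotCounter {
    let pilots = PassthroughSubject<[String], Never>()
    private(set) var lastCount = 0
    private var cancellable: AnyCancellable?
    private let onDeinit: () -> Void

    init(onDeinit: @escaping () -> Void) {
        self.onDeinit = onDeinit
    }

    func start() {
        // self owns `cancellable`, which owns the closures below.
        // Capturing self weakly avoids a self -> cancellable -> closure -> self cycle.
        cancellable = pilots
            .map { [weak self] names -> Int in
                guard let self = self else { return 0 }
                return self.count(names)
            }
            .sink { [weak self] count in
                self?.lastCount = count
            }
    }

    private func count(_ names: [String]) -> Int { names.count }

    deinit { onDeinit() }
}

var deallocated = false
var counter: PilotCounter? = PilotCounter(onDeinit: { deallocated = true })
counter?.start()
counter?.pilots.send(["pilot-1", "pilot-2"])
let countWhileAlive = counter?.lastCount
counter = nil // no cycle, so deinit runs and sets `deallocated`
```

Replacing either `[weak self]` with a strong capture would create the cycle described above, and the deinit closure would no longer run when `counter` is set to nil.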