Creating a Custom Combine Operator
I recently had an issue where I wanted to access the output of a publisher at the top of a Combine operator chain at a later stage in the chain. For example, if you have an Author object and you call a function to retrieve all Books written by the author, you might have an operator chain that looks like this:
struct Author {
let name: String
}
struct Book {
let name: String
let author: Author
}
func getBooksForAuthor(_ author: Author) -> AnyPublisher<[Book], BookLoadingError> {
...
return books
}
// Combine chain
loadAuthor() // AnyPublisher<Author, AuthorError>
.flatMap(getBooksForAuthor) // AnyPublisher<[Book], AuthorError>
This is okay but I’d like to return a tuple of the Author object along with its associated [Book] array. I’d like to get something similar to this:
AnyPublisher<(Author, [Book]), AuthorError>
There are a few ways to accomplish this but I decided to create a custom Combine operator. The relationship between Publisher, Subscriber, and Combine operator looks like this:

Using existing operators
You can compose existing operators within a function:
extension Publisher {
/// Usage:
/// loadSquad().mapKeepingUpstream(processPilots)
func mapKeepingUpstream<T, P>(_ transform: @escaping (Self.Output) -> P) ->
AnyPublisher<(T, P.Output), Self.Failure> where T == Self.Output, P: Publisher, Self.Failure == P.Failure
{
flatMap { tValue in // the value from the current publisher
transform(tValue) // the transformed publisher AnyPublisher<[JSON], Self.Failure>
.map { pOutput in (tValue, pOutput) } // AnyPublisher<(JSON, [JSON]), Self.Failure>
}.eraseToAnyPublisher()
}
}
You can also create a custom Combine operator.
Custom Combine operator components
To create a custom Combine operator you have the following components:
- A function that is defined as an extension of
Publisher. This function is called within the Combine chain. - A structure that adopts the
Publisherprotocol and also contains afunc receive<S>(subscriber: S)function which subscribes to the upstream publisher. - A structure that adopts the
Subscriberprotocol. The implementation of the requiredfunc receive(_ input: S.Input) -> Subscribers.Demandwill send events to the subscriber.

Publisher extension function
public extension Publisher {
/// - parameter transform: Closure that takes an input and creates an output of type T.
/// - returns: A publisher that outputs a tuple of the upstream elements and the result of the transform.
///
///
/*
Usage:
[1, 2, 3, 4, 5].publisher
.keepUpstreamAndMap(transform: { $0 * 2 })
(1, 2)
(2, 4)
(3, 6)
(4, 8)
(5, 10)
*/
func keepUpstreamAndMap<T>(transform: @escaping (Output) -> T) -> Publishers.KeepUpstreamOperator<Self, T>
{
Publishers.KeepUpstreamOperator(upstream: self, transform: transform)
}
}
- The
transformparameter is a closure that will accept the output of the upstream publisher (the publisher that thekeepUpstreamAndMapfunction is called on) and return some typeT. The extension function is generic over typeT. - The
KeepUpstreamOperatoris an object that adopts thePublisherprotocol.
Publisher struct
public extension Publishers {
/// A publisher that outputs a tuple of the output of the upstream publisher and the return value of the transform
struct KeepUpstreamOperator<Upstream: Publisher, T>: Publisher {
public typealias Output = (element: Upstream.Output, transformed: T)
public typealias Failure = Upstream.Failure
public let upstream: Upstream
public let transform: (Upstream.Output) -> T
public init(upstream: Upstream, transform: @escaping (Upstream.Output) -> T) {
self.upstream = upstream
self.transform = transform
}
public func receive<S>(subscriber: S) where S: Subscriber, S.Failure == Failure, S.Input == Output
{
upstream.subscribe(Inner(publisher: self, downstream: subscriber))
}
}
}
- This publisher stores the
transformclosure as well as theUpstreampublisher. - When a subscriber subscribes to the publisher the
receive<S>(subscriber: S)function is called. The function takes as input the subscriber. - The
Inneris an object that adopts theSubscriberprotocol. It is initialized with thetransformclosure and the subscriber. - To connect the upstream publisher to the
bridgeobject, thesubscribefunction is called on theupstreamfield of theKeepUpstreamOperator
Subscriber struct
private extension Publishers.KeepUpstreamOperator {
final class Inner<Downstream: Subscriber>: Subscriber
where Downstream.Input == Output, Downstream.Failure == Failure
{
private let downstream: Downstream
private let transform: (Upstream.Output) -> T
fileprivate init(
publisher: Publishers.KeepUpstreamOperator<Upstream, T>,
downstream: Downstream
) {
self.downstream = downstream
self.transform = publisher.transform
}
func receive(subscription: Subscription) {
downstream.receive(subscription: subscription)
}
func receive(_ input: Upstream.Output) -> Subscribers.Demand {
let transformed: T = transform(input)
return downstream.receive((element: input, transformed: transformed))
}
func receive(completion: Subscribers.Completion<Upstream.Failure>) {
downstream.receive(completion: completion)
}
}
}
We define this struct as an extension of the
KeepUpstreamOperator.
- Similar to how the
KeepUpstreamOperatorwas initialized with thetransformand the upstream publisher, this object is initialized with thetransformand the downstream subscriber. - The required functions on the
Subscriberprotocol are implemented. They allow for theKeepUpstreamOperatorBridgeto receive- A subscription when a downstream publisher subscribes to this Subscriber
- A value from the upstream publisher. The value is passed to the
transformclosure and sent to the downstream subscriber. - A completion event from the upstream publisher. This could be a completion or failure message.
- In each case the corresponding
receive(subscription:),receive(input:)andreceive(completion:)function on the downstream subscriber is called.
Usage
The following Combine method chain performs the following
- Obtain a publisher by calling the
loadSquad()function. This function returns either a JSON object or fails with anXWSImportError - Calls the
keepUpstreamAndMapfunction, passing in a reference to theprocessPilots(squad:)function. This returns a tuple of the upstream publisher created byloadSquad()and the output of the call to theprocessPilots(squad:)function. - Finally we map the tuple to the
buildUpdatedPilotsInfo(tuple:)function which returns a publisher ofUpdatedPilotsInfoobjects.
func processPilots(squad: JSON) -> [JSON] { ... }
func buildUpdatedPilotsInfo(_ tuple: (JSON, [JSON])) -> UpdatedPilotsInfo {
return UpdatedPilotsInfo(squad: tuple.0, pilots: tuple.1)
}
loadSquad() // AnyPublisher<JSON, XWSImportError>
.keepUpstreamAndMap(processPilots) // AnyPublisher<(JSON, [JSON]), XWSImportError>
.map(buildUpdatedPilotsInfo) // AnyPublisher<UpdatedPilotsInfo, XWSImportError>
Note that when the functions are passed to the Combine operators we don’t have to do the weak self dance since these functions are not contained within a class. If they were contained within a class we’d have to change our chain to avoid reference cycles like so:
loadSquad() // AnyPublisher<JSON, XWSImportError>
.keepUpstreamAndMap{ [weak self] squad in
guard let self = self else { return JSON.null }
return self.processPilots(squad: squad)
} // AnyPublisher<(JSON, [JSON]), XWSImportError>
.map{ [weak self] tuple in
guard let self = self else { return UpdatedPilotsInfo() }
return self.buildUpdatedPilotsInfo(tuple)
} // AnyPublisher<UpdatedPilotsInfo, XWSImportError>
For more information see self-weak-unowned