Creating a Custom Combine Operator

I recently wanted to use the output of the publisher at the top of a Combine operator chain at a later stage in the chain. For example, if you have an Author object and you call a function to retrieve all Books written by that author, you might have an operator chain that looks like this:

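import Combine

struct Author {
  let name: String
}

struct Book {
  let name: String
  let author: Author
}

// flatMap requires the inner publisher's Failure to match the upstream Failure (AuthorError)
func getBooksForAuthor(_ author: Author) -> AnyPublisher<[Book], AuthorError> {
  ...  // load the author's books and return them as a publisher
}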

// Combine chain
loadAuthor()    // AnyPublisher<Author, AuthorError>
  .flatMap(getBooksForAuthor) // AnyPublisher<[Book], AuthorError>

This works, but after flatMap only the [Book] array flows downstream and the Author is lost. I’d like to get a tuple of the Author object along with its associated [Book] array, something like this:

AnyPublisher<(Author, [Book]), AuthorError>

There are a few ways to accomplish this, but I decided to create a custom Combine operator. The relationship between a publisher, a subscriber, and a Combine operator looks like this:

[Diagram: the relationship between Publisher, Subscriber, and a Combine operator]
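To make that handshake concrete, here is a minimal sketch of a hand-written subscriber attached to a sequence publisher. LoggingSubscriber is a hypothetical name, not part of Combine, and the sketch assumes a platform where Combine is available. It records the order of the calls a publisher makes on its subscriber: the subscription first, then each value, then the completion.

```swift
import Combine

// Hypothetical subscriber that records every call Combine makes on it.
final class LoggingSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private(set) var log: [String] = []

    func receive(subscription: Subscription) {
        log.append("subscription")
        subscription.request(.unlimited)   // ask the publisher for every value
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        log.append("value \(input)")
        return .none                       // no extra demand beyond .unlimited
    }

    func receive(completion: Subscribers.Completion<Never>) {
        log.append("completion")
    }
}

let subscriber = LoggingSubscriber()
[1, 2, 3].publisher.subscribe(subscriber)
print(subscriber.log)
// ["subscription", "value 1", "value 2", "value 3", "completion"]
```

An operator sits in the middle of this exchange: it is a subscriber to the publisher above it and a publisher to the subscriber below it.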

Using existing operators

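You can compose existing operators within a function. Here the transform returns a publisher, so flatMap subscribes to it and map pairs each of its outputs with the upstream value: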

extension Publisher {
    /// Usage:
    /// loadAuthor().mapKeepingUpstream(getBooksForAuthor)   // outputs (Author, [Book])
    func mapKeepingUpstream<P: Publisher>(_ transform: @escaping (Output) -> P) ->
        AnyPublisher<(Output, P.Output), Failure> where P.Failure == Failure
    {
        flatMap { value in   // the value emitted by the upstream publisher
            transform(value)  // the publisher created from that value
                .map { transformed in (value, transformed) }   // pair the upstream value with each transformed output
        }.eraseToAnyPublisher()
    }
}
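A minimal sketch of how mapKeepingUpstream could be exercised with the Author and Book types from the start of this post. It repeats the extension so the block is self-contained. The in-memory loadAuthor() and getBooksForAuthor(_:) are hypothetical stand-ins built from Just, and AuthorError is a placeholder error type; real implementations would load data asynchronously.

```swift
import Combine

struct Author { let name: String }
struct Book { let name: String; let author: Author }
enum AuthorError: Error { case notFound }   // placeholder error type

extension Publisher {
    func mapKeepingUpstream<P: Publisher>(_ transform: @escaping (Output) -> P) ->
        AnyPublisher<(Output, P.Output), Failure> where P.Failure == Failure
    {
        flatMap { value in
            transform(value).map { transformed in (value, transformed) }
        }.eraseToAnyPublisher()
    }
}

// Hypothetical stand-ins that emit a single value synchronously.
func loadAuthor() -> AnyPublisher<Author, AuthorError> {
    Just(Author(name: "Jane Doe"))
        .setFailureType(to: AuthorError.self)
        .eraseToAnyPublisher()
}

func getBooksForAuthor(_ author: Author) -> AnyPublisher<[Book], AuthorError> {
    Just([Book(name: "First Book", author: author)])
        .setFailureType(to: AuthorError.self)
        .eraseToAnyPublisher()
}

var results: [(Author, [Book])] = []
let cancellable = loadAuthor()
    .mapKeepingUpstream(getBooksForAuthor)   // outputs (Author, [Book])
    .sink(receiveCompletion: { _ in }, receiveValue: { results.append($0) })
```

Because the transform returns a publisher, this approach also fits when the second step is itself asynchronous.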

You can also create a custom Combine operator.

Custom Combine operator components

To create a custom Combine operator, you need the following components:

  • A function defined in an extension of Publisher. This is the function you call within the Combine chain.
  • A structure that adopts the Publisher protocol and implements the required func receive<S>(subscriber: S) method, which subscribes to the upstream publisher.
  • A class that adopts the Subscriber protocol. Its implementation of the required func receive(_ input: Input) -> Subscribers.Demand method sends values to the downstream subscriber.

[Diagram: the components of a custom Combine operator]
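As a rough sketch of how the three components fit together, here is a deliberately simpler operator: a hand-rolled map. The names simpleMap, Publishers.SimpleMap, and Inner are hypothetical and chosen for illustration only; the keepUpstreamAndMap operator in the following sections has the same shape but emits a tuple.

```swift
import Combine

// 1. Publisher extension: the function you chain onto a publisher.
extension Publisher {
    func simpleMap<T>(_ transform: @escaping (Output) -> T) -> Publishers.SimpleMap<Self, T> {
        Publishers.SimpleMap(upstream: self, transform: transform)
    }
}

// 2. Publisher struct: stores the upstream publisher and the transform.
extension Publishers {
    struct SimpleMap<Upstream: Publisher, T>: Publisher {
        typealias Output = T
        typealias Failure = Upstream.Failure

        let upstream: Upstream
        let transform: (Upstream.Output) -> T

        func receive<S: Subscriber>(subscriber: S) where S.Input == T, S.Failure == Failure {
            // Put our own subscriber between the upstream publisher and the downstream subscriber.
            upstream.subscribe(Inner(downstream: subscriber, transform: transform))
        }
    }
}

// 3. Subscriber class: receives upstream events and forwards them downstream.
extension Publishers.SimpleMap {
    final class Inner<Downstream: Subscriber>: Subscriber
    where Downstream.Input == T, Downstream.Failure == Upstream.Failure {
        typealias Input = Upstream.Output
        typealias Failure = Upstream.Failure

        private let downstream: Downstream
        private let transform: (Upstream.Output) -> T

        init(downstream: Downstream, transform: @escaping (Upstream.Output) -> T) {
            self.downstream = downstream
            self.transform = transform
        }

        func receive(subscription: Subscription) {
            downstream.receive(subscription: subscription)   // downstream demand goes straight upstream
        }

        func receive(_ input: Upstream.Output) -> Subscribers.Demand {
            downstream.receive(transform(input))              // transform, then forward
        }

        func receive(completion: Subscribers.Completion<Upstream.Failure>) {
            downstream.receive(completion: completion)
        }
    }
}

var values: [String] = []
let cancellable = [1, 2, 3].publisher
    .simpleMap { "#\($0)" }
    .sink { values.append($0) }
```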

Publisher extension function

public extension Publisher {
    /// - parameter transform: Closure that takes an upstream element and creates a value of type T.
    /// - returns: A publisher that outputs a tuple of each upstream element and the result of the transform.
    /*
     Usage:
        [1, 2, 3, 4, 5].publisher
            .keepUpstreamAndMap(transform: { $0 * 2 })

     Output:
        (element: 1, transformed: 2)
        (element: 2, transformed: 4)
        (element: 3, transformed: 6)
        (element: 4, transformed: 8)
        (element: 5, transformed: 10)
     */
    func keepUpstreamAndMap<T>(transform: @escaping (Output) -> T) -> Publishers.KeepUpstreamOperator<Self, T>
    {
        Publishers.KeepUpstreamOperator(upstream: self, transform: transform)
    }
}
  • The transform parameter is a closure that accepts the output of the upstream publisher (the publisher that keepUpstreamAndMap is called on) and returns some type T. The extension function is generic over T.
  • KeepUpstreamOperator is a struct that adopts the Publisher protocol. The extension function simply creates one, passing self as the upstream publisher.
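For comparison, here is a sketch of the same pairing written with the built-in map operator; pairWithMap is a hypothetical name. Assuming keepUpstreamAndMap behaves as described above, both should emit the same labeled tuples for the [1, 2, 3, 4, 5] example. The custom operator packages the pattern as its own named publisher type.

```swift
import Combine

extension Publisher {
    // Hypothetical helper: pair each upstream value with the transform's result using map.
    func pairWithMap<T>(_ transform: @escaping (Output) -> T)
        -> Publishers.Map<Self, (element: Output, transformed: T)>
    {
        map { (element: $0, transformed: transform($0)) }
    }
}

var pairs: [(element: Int, transformed: Int)] = []
let cancellable = [1, 2, 3, 4, 5].publisher
    .pairWithMap { $0 * 2 }
    .sink { pairs.append($0) }
```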

Publisher struct

public extension Publishers {
    /// A publisher that outputs a tuple of the output of the upstream publisher and the return value of the transform
    struct KeepUpstreamOperator<Upstream: Publisher, T>: Publisher {
        public typealias Output = (element: Upstream.Output, transformed: T)
        public typealias Failure = Upstream.Failure

        public let upstream: Upstream
        public let transform: (Upstream.Output) -> T
        
        public init(upstream: Upstream, transform: @escaping (Upstream.Output) -> T) {
            self.upstream = upstream
            self.transform = transform
        }

        public func receive<S>(subscriber: S) where S: Subscriber, S.Failure == Failure, S.Input == Output
        {
            upstream.subscribe(Inner(publisher: self, downstream: subscriber))
        }
    }
}
  • This publisher stores the transform closure as well as the Upstream publisher.
  • When a subscriber subscribes to the publisher, Combine calls receive<S>(subscriber: S) with that subscriber.
  • Inner is a class that adopts the Subscriber protocol. It is initialized with the operator (from which it copies the transform closure) and the downstream subscriber.
  • To connect the upstream publisher to Inner, receive(subscriber:) calls subscribe(_:) on the operator's upstream property.

Subscriber class

private extension Publishers.KeepUpstreamOperator {
    final class Inner<Downstream: Subscriber>: Subscriber
    where Downstream.Input == Output, Downstream.Failure == Failure
    {
        private let downstream: Downstream
        private let transform: (Upstream.Output) -> T
        
        fileprivate init(
            publisher: Publishers.KeepUpstreamOperator<Upstream, T>,
            downstream: Downstream
        ) {
            self.downstream = downstream
            self.transform = publisher.transform
        }

        func receive(subscription: Subscription) {
            downstream.receive(subscription: subscription)
        }

        func receive(_ input: Upstream.Output) -> Subscribers.Demand {
            let transformed: T = transform(input)
            return downstream.receive((element: input, transformed: transformed))
        }

        func receive(completion: Subscribers.Completion<Upstream.Failure>) {
            downstream.receive(completion: completion)
        }
    }
}

We define this class in a private extension of KeepUpstreamOperator, so Inner is nested inside the operator and can use its Upstream and T generic parameters.

  • Similar to how KeepUpstreamOperator was initialized with the upstream publisher and the transform, Inner is initialized with the operator (to get the transform) and the downstream subscriber.
  • Inner implements the required functions of the Subscriber protocol. They allow Inner to receive:
    • A subscription from the upstream publisher, delivered once Inner has subscribed to it.
    • A value from the upstream publisher. The value is passed to the transform closure, and the original value and the result are sent downstream together as a tuple.
    • A completion event from the upstream publisher, either .finished or .failure.
  • In each case Inner calls the corresponding receive(subscription:), receive(_:), or receive(completion:) function on the downstream subscriber. Because the upstream subscription is handed straight to the downstream subscriber, demand requested downstream flows directly to the upstream publisher.

Usage

The following Combine chain does three things:

  1. Obtain a publisher by calling the loadSquad() function. The publisher either emits a JSON object or fails with an XWSImportError.
  2. Call the keepUpstreamAndMap function, passing in a reference to the processPilots(squad:) function. The resulting publisher emits a tuple of the JSON value emitted by loadSquad() and the output of processPilots(squad:).
  3. Finally, map each tuple with the buildUpdatedPilotsInfo(_:) function, producing a publisher of UpdatedPilotsInfo objects.

func processPilots(squad: JSON) -> [JSON] { ... }
// The parameter uses the operator's labeled Output tuple: (element:, transformed:)
func buildUpdatedPilotsInfo(_ tuple: (element: JSON, transformed: [JSON])) -> UpdatedPilotsInfo {
  return UpdatedPilotsInfo(squad: tuple.element, pilots: tuple.transformed)
}

loadSquad()    // AnyPublisher<JSON, XWSImportError>
  .keepUpstreamAndMap(transform: processPilots)  // AnyPublisher<(element: JSON, transformed: [JSON]), XWSImportError>
  .map(buildUpdatedPilotsInfo)    // AnyPublisher<UpdatedPilotsInfo, XWSImportError>

Note that because these are free functions rather than methods of a class, we can pass them to the Combine operators directly without doing the weak self dance. If they were instance methods, passing processPilots would capture self strongly, and if self also stored the resulting AnyCancellable we’d have a reference cycle. To avoid that, capture self weakly in closures like so:

loadSquad()    // AnyPublisher<JSON, XWSImportError>
  .keepUpstreamAndMap { [weak self] squad -> [JSON] in
    guard let self = self else { return [] }   // no pilots if self has been deallocated
    return self.processPilots(squad: squad)
  } // AnyPublisher<(element: JSON, transformed: [JSON]), XWSImportError>
  .map { [weak self] tuple -> UpdatedPilotsInfo in
    guard let self = self else { return UpdatedPilotsInfo() }
    return self.buildUpdatedPilotsInfo(tuple)
  } // AnyPublisher<UpdatedPilotsInfo, XWSImportError>
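A self-contained sketch of the same pattern, assuming a hypothetical PilotProcessor class that owns both a PassthroughSubject and the AnyCancellable for its subscription, with String standing in for JSON. Because the closures capture self weakly, releasing the last strong reference deallocates the instance.

```swift
import Combine

// Hypothetical class: it owns both the subject and the AnyCancellable, so
// strongly capturing self in the closures below would create a reference cycle.
final class PilotProcessor {
    let squads = PassthroughSubject<String, Never>()   // String stands in for JSON
    private(set) var processed: [String] = []
    private var cancellable: AnyCancellable?

    init() {
        cancellable = squads
            .map { [weak self] squad -> String in
                guard let self = self else { return "" }   // fallback if self is gone
                return self.process(squad)
            }
            .sink { [weak self] value in
                self?.processed.append(value)
            }
    }

    func process(_ squad: String) -> String {
        squad.uppercased()
    }
}

var processor: PilotProcessor? = PilotProcessor()
weak var weakProcessor: PilotProcessor? = processor
processor?.squads.send("rebels")
let processedValues = processor?.processed ?? []
processor = nil   // with only weak captures, this releases the last strong reference
```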

For more information, see self-weak-unowned.