Using RxSwift to Observe Changes to Items Within an Array

We’d like to get updates from the queue manager when it is sent the following events:

  • An item was added to the queue
  • An item was removed from the queue

I’d also like to get notified when the state of the item in the queue changes in response to:

  • item.Done is set to false
  • item.DayOfTheWeek is set to a day other than the current day.

Do we also need to monitor changes to the position of items in the queue when items are reordered? If not, we can simply fetch a fresh data source after updating one or more items.

Adding an item to the queue

When an item is added to the queue, a new queue position is generated and the item.queuePosition is updated with the new queue position. The item is then saved to CoreData and appended to the QueueManager.items array.

Maybe it is overkill to store an in-memory representation in the QueueManager.items array. We could accomplish the same objective with an items: Observable<[Item]> stream, which is what populates the UI.

Monitoring changes

To monitor changes we could use Signal<T> or RxSwift. Let’s use RxSwift for now. RxSwift has Observable<T> objects that represent a stream of values of type T. An Observable<Int> emits events carrying an Int, while an Observable<[Item]> emits events carrying an [Item]. An Observer listens to the stream and processes any events that it encounters. The relationship between an Observable and an Observer starts when subscribe() is called on the Observable.
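As a minimal sketch of that relationship, the snippet below builds an Observable<Int> with Observable.of and attaches an observer through subscribe(onNext:onCompleted:). The names numbers and received are hypothetical and only serve the illustration.

```swift
import RxSwift

// Collects everything the observer sees.
var received: [Int] = []

// An Observable<Int> that emits 2, 3, 4, 5 and then completes.
let numbers: Observable<Int> = Observable.of(2, 3, 4, 5)

// subscribe() attaches the observer; no events are delivered before this call.
let subscription = numbers.subscribe(
  onNext: { value in received.append(value) },
  onCompleted: { print("completed") }
)

// The sequence has already completed, so this is only tidy-up.
subscription.dispose()
```

Because Observable.of delivers its elements synchronously on subscription here, received holds all four values as soon as subscribe() returns.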

There may be cases where an observer receives events that occurred before it subscribed to the stream (Observable), for example when the source replays its latest value to new subscribers.
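One way to see this behaviour is with a ReplaySubject, which buffers recent events for late subscribers. This is only an illustration under the assumption that a replaying source is in play; the names history and seen are hypothetical.

```swift
import RxSwift

// Keeps the last two events for anyone who subscribes later.
let history = ReplaySubject<String>.create(bufferSize: 2)

history.onNext("a")
history.onNext("b")
history.onNext("c")

var seen: [String] = []

// The late subscriber first receives the buffered "b" and "c".
let late = history.subscribe(onNext: { seen.append($0) })

// Events emitted after subscription arrive as usual.
history.onNext("d")

late.dispose()
```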

To model a stream of data we can use marble diagrams:

--a---b-c---d---X---|->

a, b, c, d are events
X is an error event
| is the 'completed' signal
---> is the timeline
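The legend above can be tried out with a PublishSubject. Note that a real sequence ends at the first terminal event, so once the error (X) has been sent, a later completed (|) or next event is ignored. StreamError, stream and log are hypothetical names used only for this sketch.

```swift
import RxSwift

enum StreamError: Error { case failed }

var log: [String] = []
let stream = PublishSubject<String>()

let subscription = stream.subscribe(
  onNext: { log.append($0) },
  onError: { _ in log.append("X") },
  onCompleted: { log.append("|") }
)

// --a---b-c---d---
for event in ["a", "b", "c", "d"] {
  stream.onNext(event)
}

// ---X : the error terminates the sequence.
stream.onError(StreamError.failed)

// Both of these are ignored because the sequence has already terminated.
stream.onCompleted()
stream.onNext("e")

subscription.dispose()
```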

To model an Observable<Int> or Observable<[Item]> we’d have the following:

Observable<Int>    : --2---3-------4-----5-|->
Observable<[Item]> : --[items1]------[items2]-|->

When items are added or removed from the queue, we would have

Observable<[Item]> : --[items_add_1]------[items_remove_1]--[items_add_2]---|->

Add variable (BehaviorSubject) to the queue manager and an observable which will observe the subject

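The Variable<T> type is a thin wrapper around a BehaviorSubject<T>; calling asObservable() on it yields an Observable<T>. Variable<T> is just a convenient way to read and write the current value through its value property. I use it to represent the target of write operations, while an Observable<T> represents the target of read operations. (Variable<T> is deprecated in later RxSwift releases; BehaviorRelay is the suggested replacement.)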

class QueueManager {
  var queueItemSubject: Variable<[Item]>
}

class QueueViewModel {
  var queueItemsObservable: Observable<[Item]>
}

When items are added or removed from the queue, we would have the following written to the QueueManager.queueItemSubject and read from the QueueViewModel.queueItemsObservable:

queueItemSubject    : --[items_add_1]------[items_remove_1]--------[items_add_2]------------|->
queueItemsObservable: ---------------[items]---------------[items]-------------[items]------|->

  1. Subscribe to the queueItemSubject within each client. Use asObservable() to convert the subject into an observable.
  2. To write items into the QueueManager.queueItemSubject, just set the value property of the queueItemSubject.

// Who is responsible for creating QueueManager?

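class QueueManager {
  private let _queueItemSubject = BehaviorSubject<[Item]>(value: [])
  private let _dataService: DataAccessProvider
  private let _queueOperationSubject = PublishSubject<QueueOperation>()

  // Read side: emits the freshly fetched queue after every operation.
  public let items: Observable<[Item]>
  // Write side: clients push add/delete operations here.
  public let operation: AnyObserver<QueueOperation>

  init(dataService: DataAccessProvider) {
    self._dataService = dataService
    self.operation = _queueOperationSubject.asObserver()

    // Capture dataService instead of self: self cannot be captured by a
    // closure before every stored property has been initialized.
    // flatMapLatest flattens the Observable<[Item]> returned by each fetch.
    self.items = _queueOperationSubject
      .flatMapLatest { _ in dataService.fetchQueuedItems() }
  }

  private func fetchItems() -> Observable<[Item]> {
    return _dataService.fetchQueuedItems()
  }
}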

enum QueueOperation {
  case add(Item)
  case delete(Item)
}

// VC or WF/Coordinator/Test Harness creates the view model
class QueueViewModel {
  let queueItemsObservable: Observable<[Item]>
  let queueManager: QueueManager
  private let _queueItemOperationSubject = PublishSubject<QueueOperation>()
  private let disposeBag = DisposeBag()

  init(queueManager: QueueManager) {
    self.queueManager = queueManager
    self.queueItemsObservable = queueManager.items

    // Forward every operation from the view model to the queue manager.
    self._queueItemOperationSubject
      .subscribe(queueManager.operation)
      .disposed(by: disposeBag)
  }

  func addItem(item: Item) {
    // PublishSubject takes onNext(_:); accept(_:) belongs to the relay types.
    _queueItemOperationSubject.onNext(.add(item))
  }

  func deleteItem(item: Item) {
    _queueItemOperationSubject.onNext(.delete(item))
  }
}

A common practice is to keep your subjects private and provide a computed property that returns them as Observable<T>.
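A minimal sketch of that practice, using a hypothetical Counter class (not part of the queue code): the BehaviorSubject stays private so only the class can write to it, while callers get a read-only Observable<Int>.

```swift
import RxSwift

final class Counter {
  // Write side: only Counter itself can push values.
  private let countSubject = BehaviorSubject<Int>(value: 0)

  // Read side: callers can subscribe but cannot call onNext.
  var count: Observable<Int> {
    return countSubject.asObservable()
  }

  func increment() {
    // value() throws once the subject has terminated; fall back to 0 then.
    let current = (try? countSubject.value()) ?? 0
    countSubject.onNext(current + 1)
  }
}

let counter = Counter()
var counts: [Int] = []

let subscription = counter.count.subscribe(onNext: { counts.append($0) })
counter.increment()
counter.increment()
subscription.dispose()
```

The subscriber first receives the initial 0 and then one value per increment() call.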

Once the call to subscribeToQueue() completes, we will have a subscription to the queued items. Any time QueueManager.queueItemSubject.value changes, we should see an update to the QueueViewModel.items property. If items already exist in QueueManager.queueItemSubject.value (via a call to fetchItems()), we will see these items in the QueueViewModel.items property as well.

The type of subject dictates whether stream data is available upon subscription. For Variable<T> or BehaviorSubject subjects, the latest value is replayed immediately on subscription, so the subscribed observer processes it right away. A PublishSubject holds no current value: it delivers only events emitted after the subscription starts, so no data is available on subscription.
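The difference can be sketched side by side. The subject and array names below are hypothetical; both subjects receive the same values, and only the BehaviorSubject hands its latest value to the new subscriber.

```swift
import RxSwift

let behavior = BehaviorSubject<Int>(value: 1)
let publish = PublishSubject<Int>()

// Nobody is subscribed yet, so the PublishSubject simply drops this event.
publish.onNext(1)

var fromBehavior: [Int] = []
var fromPublish: [Int] = []

let first = behavior.subscribe(onNext: { fromBehavior.append($0) })
let second = publish.subscribe(onNext: { fromPublish.append($0) })

// Events emitted after subscription reach both observers.
behavior.onNext(2)
publish.onNext(2)

first.dispose()
second.dispose()
```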

So far we have

QueueManager.fetchItems() loads initial [items]
QueueManager.items    : --[items]------[items_remove_1]--[items_add_2]------------|->
QueueViewModel.items  : -----------[items]--[items_remove_1]--[items_add_2]---|->

The queue manager and view model are now kept in sync.

Monitoring property changes on items in the queue

The sequences for item.Done and item.DayOfTheWeek changes

item.Done Observable<Bool>           : --T---F-------T-----F-|->
item.DayOfTheWeek Observable<Int>    : ----2----3------4----5-|->
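
class Item {
  private let doneSubject: Variable<Bool>
  private let dayOfTheWeekSubject: Variable<Int>

  init(done: Bool, dayOfTheWeek: Int) {
    doneSubject = Variable(done)
    dayOfTheWeekSubject = Variable(dayOfTheWeek)
  }

  // Computed properties expose the subjects as read-only observables.
  var doneSubjectObservable: Observable<Bool> {
    return doneSubject.asObservable()
  }

  var dayOfTheWeekSubjectObservable: Observable<Int> {
    return dayOfTheWeekSubject.asObservable()
  }
}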

The QueueManager is interested in knowing when the Done or DayOfTheWeek values change on items within the queue. Subscriptions to the done and day-of-the-week observables are created for each item in the items array whenever the queueItemSubject.value changes. This can happen for the following reasons:

  • The queueItemSubject.value is set directly to a new array of Item objects
  • An item is added to the array via a call to add()
  • An item is removed from the array via a call to remove()

func add(item: Item) {
  self.queueItemSubject.value.append(item)
  item.queuePosition = self.queueItemSubject.value.count
  // Add subscriptions
}

func remove(item: Item) {
  // firstIndex(of:) requires Item to be Equatable.
  if let index = self.queueItemSubject.value.firstIndex(of: item) {
    self.queueItemSubject.value.remove(at: index)
  }
  // Remove subscriptions
}

Now we just need to figure out how to subscribe to the changes.

func addSubscriptions(item: Item) {
  // The returned Disposable is discarded, so nothing cancels the subscription yet.
  _ = item.doneSubjectObservable.subscribe(onNext: { done in
    print(done)
    if !done { self.remove(item: item) }
  })

  _ = item.dayOfTheWeekSubjectObservable.subscribe(onNext: { dayOfTheWeek in
    print(dayOfTheWeek)
    if dayOfTheWeek != self.currentDay { self.remove(item: item) }
  })
}

Now if the done or dayOfTheWeek properties change on an item, the closure passed to subscribe() will execute, which will remove the item from the queue.

The item no longer exists in the queue, but we’ll keep observing changes on the item since we haven’t removed the subscription.
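The lingering subscription can be demonstrated with a simplified stand-in for the queue. Everything here is hypothetical (a string array instead of Item objects, a BehaviorSubject<Bool> instead of item.Done); the point is only that callbacks keep arriving after the removal until dispose() is called.

```swift
import RxSwift

let done = BehaviorSubject<Bool>(value: true)
var queue = ["itemA"]
var callbacks = 0

let subscription = done.subscribe(onNext: { isDone in
  callbacks += 1
  if !isDone {
    queue.removeAll { $0 == "itemA" }
  }
})

done.onNext(false) // removes itemA from the queue
done.onNext(true)  // still delivered: the subscription is alive

subscription.dispose()
done.onNext(false) // not delivered: the subscription has been disposed
```

The observer is called three times (the initial value plus two changes), even though the item left the queue on the second call.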

Adding a single item to the queue and then setting done to false should remove the item from the queue and cancel the subscription.

add(itemA)   : O
subscribe()  : -O
queue        : --[itemA]-------------
item.Done    : ---------T--------F-X-|->
remove(itemA): -------------------O

For the DayOfTheWeek case, if the value changes and the new value is not equal to the current day, then remove the item from the queue and cancel the subscription.

queueManager.currentDay = 2

add(itemA)   : O
subscribe()  : -O
queue        : --[itemA]-------------
item         : ---------2--------3-X-|->
.DayOfTheWeek 
remove(itemA): -------------------O

Cancelling the subscription

Usually the object responsible for the subject will issue an error or completed event to the subscriber which will dispose the subscription. For our case, we’d like the subscription to dispose itself if we read an invalid value within the sequence.

Throwing an error in map on the observable

One approach is to throw an error inside of map() within the subscription:

enum QueueError: Error {
  case DoneNotTrue
  case DayOfTheWeekNotCurrentDay
}

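func addSubscriptions(item: Item) {
  _ = item.doneSubjectObservable
    .map { done -> Bool in
      // Throwing inside map ends the sequence with an error event.
      if !done { throw QueueError.DoneNotTrue }
      return done
    }
    .subscribe(
      onNext: { done in print(done) },
      onError: { error in
        print(error)
        self.remove(item: item)
      })

  _ = item.dayOfTheWeekSubjectObservable
    .map { dayOfTheWeek -> Int in
      if dayOfTheWeek != self.currentDay { throw QueueError.DayOfTheWeekNotCurrentDay }
      return dayOfTheWeek
    }
    .subscribe(
      onNext: { dayOfTheWeek in print(dayOfTheWeek) },
      onError: { error in
        print(error)
        self.remove(item: item)
      })
}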
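
To check how a throwing map behaves in isolation, here is a small sketch with a PublishSubject<Bool> standing in for item.Done. ValidationError and events are hypothetical names. Once the closure throws, the observer gets a single error event and nothing afterwards.

```swift
import RxSwift

enum ValidationError: Error { case notDone }

let done = PublishSubject<Bool>()
var events: [String] = []

let subscription = done
  .map { value -> Bool in
    // A thrown error is converted into an error event on the stream.
    if !value { throw ValidationError.notDone }
    return value
  }
  .subscribe(
    onNext: { events.append("next(\($0))") },
    onError: { _ in events.append("error") }
  )

done.onNext(true)
done.onNext(false) // throws inside map, terminating the subscription
done.onNext(true)  // never reaches the observer

subscription.dispose()
```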

Using a helper function

This helps to keep your subscriptions clean. The helper functions will return a new observable that will either be a single value within a .just observable or an error value within a .error observable

private func checkDone(done: Bool) -> Observable<Bool> {
  if done {
    return .just(done)
  } else {
    return .error(QueueError.DoneNotTrue)
  }
}

private func checkDayOfTheWeek(day: Int) -> Observable<Int> {
  // Only the current day is valid; any other day ends the subscription.
  if day == self.currentDay {
    return .just(day)
  } else {
    return .error(QueueError.DayOfTheWeekNotCurrentDay)
  }
}

We can remove the throw from the map and rely on the return value of the helper functions to decide whether to cancel the subscription. Because the helpers return an Observable, they are applied with flatMap rather than map, so that an error from the inner observable propagates to the subscriber. The subscription is cancelled when a .error observable returned by a helper reaches the onError closure.

func addSubscriptions(item: Item) {
  _ = item.doneSubjectObservable
    .flatMap { done in
      return self.checkDone(done: done)
    }
    .subscribe(
      // onError must come before onCompleted in subscribe's parameter list.
      onError: { error in
        if case QueueError.DoneNotTrue = error {
          print("Done is false, cancelling subscription")
          self.remove(item: item)
        }
      },
      onCompleted: {
        print("Done")
      })

  _ = item.dayOfTheWeekSubjectObservable
    .flatMap { dayOfTheWeek in
      return self.checkDayOfTheWeek(day: dayOfTheWeek)
    }
    .subscribe(
      onError: { error in
        if case QueueError.DayOfTheWeekNotCurrentDay = error {
          print("DayOfTheWeek not equal to current day, cancelling subscription")
          self.remove(item: item)
        }
      },
      onCompleted: {
        print("Done")
      })
}
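
The helper-plus-flatMap pattern can be exercised on its own with a hypothetical checkPositive helper. This is a sketch of the technique rather than the queue code: a .just result passes through, while a .error result terminates the outer subscription.

```swift
import RxSwift

enum CheckError: Error { case notPositive }

// Returns a one-element observable for valid input, an error observable otherwise.
func checkPositive(_ n: Int) -> Observable<Int> {
  if n > 0 {
    return .just(n)
  } else {
    return .error(CheckError.notPositive)
  }
}

let input = PublishSubject<Int>()
var passed: [Int] = []
var failed = false

let subscription = input
  .flatMap { checkPositive($0) } // flattens the inner observable into the stream
  .subscribe(
    onNext: { passed.append($0) },
    onError: { _ in failed = true }
  )

input.onNext(1)  // passes validation
input.onNext(-1) // inner error terminates the whole subscription
input.onNext(2)  // ignored

subscription.dispose()
```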

And the updated add() and remove():

func add(item: Item) {
  self.queueItemSubject.value.append(item)
  item.queuePosition = self.queueItemSubject.value.count
  addSubscriptions(item: item)
}

func remove(item: Item) {
  // firstIndex(of:) requires Item to be Equatable.
  if let index = self.queueItemSubject.value.firstIndex(of: item) {
    self.queueItemSubject.value.remove(at: index)
  }
}

There is no need to remove the subscription in remove(): the error event that triggers the onError handler (and therefore remove()) also terminates the sequence, and RxSwift disposes the subscription once a sequence has terminated.

Dispose the subscription within remove()

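Alternatively, dispose the subscription in the remove() function.

// Assumes addSubscriptions(item:) stored the item's subscriptions in a per-item
// DisposeBag, held in a property such as itemBags: [ObjectIdentifier: DisposeBag].
func remove(item: Item) {
  if let index = self.queueItemSubject.value.firstIndex(of: item) {
    self.queueItemSubject.value.remove(at: index)
    // Releasing the bag disposes every subscription it holds.
    self.itemBags[ObjectIdentifier(item)] = nil
  }
}

Here we dispose the Disposable objects returned by subscribe() rather than the observables themselves, since an Observable has no dispose() method. This cancels the subscriptions without an error; the item's underlying streams are unaffected and simply lose this observer.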
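
A minimal sketch of per-item disposal, assuming one DisposeBag per tracked object keyed by ObjectIdentifier. Job, bags, ticks and track are hypothetical names for the illustration; a DisposeBag disposes its contents when it is deallocated, so dropping the dictionary entry is enough.

```swift
import RxSwift

final class Job {}

var bags: [ObjectIdentifier: DisposeBag] = [:]
let ticks = PublishSubject<Int>()
var count = 0

// Subscribes on behalf of a job and parks the subscription in the job's own bag.
func track(_ job: Job) {
  let bag = DisposeBag()
  ticks
    .subscribe(onNext: { _ in count += 1 })
    .disposed(by: bag)
  bags[ObjectIdentifier(job)] = bag
}

let job = Job()
track(job)

ticks.onNext(1) // counted

// Removing the only reference deallocates the bag and disposes the subscription.
bags[ObjectIdentifier(job)] = nil

ticks.onNext(2) // not counted
```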

add(itemA)   : O
subscribe()  : -O
queue        : --[itemA]-------------
item.Done    : ---------T--------F-|->
remove(itemA): -------------------O

queueManager.currentDay = 2

add(itemA)   : O
subscribe()  : -O
queue        : --[itemA]-------------
item         : ---------2--------3-|->
.DayOfTheWeek 
remove(itemA): -------------------O

Now we can clean up the subscription logic by removing the map and adding logic to subscribe to validate the incoming data. First, update the helper functions to call remove() when validation fails. The functions also need to accept an Item and return Void.

private func validateDone(done: Bool, item: Item) {
  print(done)

  if !done {
    self.remove(item: item)
  }
}

private func validateDayOfTheWeek(day: Int, item: Item) {
  print(day)

  if day != self.currentDay {
    self.remove(item: item)
  }
}

Update the subscription logic:

func addSubscriptions(item: Item) {
  // itemBags is an assumed [ObjectIdentifier: DisposeBag] property on the
  // queue manager; remove() clears the entry to dispose both subscriptions.
  let bag = DisposeBag()
  self.itemBags[ObjectIdentifier(item)] = bag

  item.doneSubjectObservable
    .subscribe(
      onNext: { done in
        self.validateDone(done: done, item: item)
      },
      onError: { error in
        // Validation no longer raises errors; this only sees upstream failures.
        print(error)
      },
      onCompleted: {
        print("Done")
      })
    .disposed(by: bag)

  item.dayOfTheWeekSubjectObservable
    .subscribe(
      onNext: { dayOfTheWeek in
        self.validateDayOfTheWeek(day: dayOfTheWeek, item: item)
      },
      onError: { error in
        print(error)
      },
      onCompleted: {
        print("Done")
      })
    .disposed(by: bag)
}