Using RxSwift to Observe Changes to items within an array
We’d like to get updates from the queue manager when it is sent the following events:
- An item was added to the queue
- An item was removed from the queue
I’d also like to get notified when the state of the item in the queue changes in response to:
item.Done
is set tofalse
item.DayOfTheWeek
is set to a day other than the current day.
Do we also need to monitor changes to the position of items in the queue in response to reordering items? If not we go and fetch a new data source after updating one or more items.
Adding an item to the queue
When an item is added to the queue, a new queue position is generated and the item.queuePosition
is updated with the new queue position. The item is then saved to CoreData and appended to the QueueManager.items
array.
Maybe it is overkill to store an in-memory representation in the
QueueManager.items
array. We could accomplish the same objective by using aitems: Observable<[Items]>
stream which is what populates the UI.
Monitoring changes
To monitor changes we could use Signal<T>
or RxSwift. Let’s use RxSwift for now. RxSwift has Observable<T>
objects that wrap the value to be observed. An Observable<Int>
wraps an Int and processes any events of type Int
that are emitted on the stream, while a Observable<[Item]>
wraps an [Item]
and processes any events of type [Item]
that are emitted on the stream. An Observer
object listens to the stream and proceses events that any events that it encounters. The Observable relationship with an Observer is started when a subscribe()
is called on the Observable.
There may be cases where an observer will encounter events that occurred before the observer subscribed to the stream (Observable)
To model a stream of data we can use marble diagrams:
--a---b-c---d---X---|->
a, b, c, d are events
X is an error event
| is the 'completed' signal
---> is the timeline
To model an Observable<Int>
or Observable<[Item]>
we’d have the following
Observable<Int> : --2---3-------4-----5-|->
Observable<[Item]> : --[items1]------[items2]-|->
When items are added or removed from the queue, we would have
Observable<[Item]> : --[items_add_1]------[items_remove_1]--[items_add_2]---|->
Add variable (BehaviorSubject) to the queue manager and an observable which will observe the subject
The
Variable<T>
type is just aBehaviorSubject
which is anObservable<T>
. TheVariable<T>
is just a convenient way to access the value within theObservable<T>
. I use it to represent target of write operations while anObservable<T>
is used to represent a target of read operations.
class QueueManager {
var queueItemSubject: Variable<[Item]>
}
class QueueViewModel {
var queueItemsObservable: Observable<[Item]>
}
When items are added or removed from the queue, we would have the following written to the QueueManager.queueItemSubject
and read from the QueueViewModel.queueItemsObservable
queueItemSubject : --[items_add_1]------[items_remove_1]--------[items_add_2]------------|->
queueItemsObservable: ---------------[items]---------------[items]-------------[items]------|->
- Subscribe to the
queueItemSubject
within each client. UseasObservable()
to convert the subject into an observable. - To write items into the
QueueManager.queueItemSubject
just set thevalue
property of thequeueItemSubject
.
// Who is responsible for creating QueueManager?
class QueueManager {
private let _queueItemSubject: BehaviorSubject<[Item]>([])
private let _dataService : DataAccessProvider
private let _queueOperationSubject: PublishSubject<QueueOperation>()
init(dataService: DataAccessProvider) {
self.operation = _queueOperationSubject.asObserver()
self.items = _queueOperationSubject.asObservable()
.map(onNext: { [weak self] self?.fetchItems() })
}
private func fetchItems() -> Observable<[Item]> {
return _dataService.fetchQueuedItems()
}
public let items : Observable<[Item]>
public let operation : AnyObserver<QueueOperation>
}
enum QueueOperation {
case add(Item)
case delete(Item)
}
// VC or WF/Coordinator/Test Harness creates the view model
class QueueViewModel {
var queueItemsObservable: Observable<[Item]>
let queueManager: QueueManager
private let _queueItemOperationSubject: PublishSubject<QueueOperation>()
private let disposeBag : DisposeBag
init(queueManager: QueueManager) {
self.disposeBag = DisposeBag()
self.queueManager = queueManager
self.queueItemsObservable = queueManager.items
self._queueItemOperationSubject.asObservable()
.subscribe(queueManager.operation)
.disposed(by: disposeBag)
}
func addItem(item: Item) {
_queueItemOperationSubject.accept(.add(item))
}
func deleteItem(item: Item) {
_queueItemOperationSubject.accept(.delete(item))
}
}
A common practice is to keep your subjects
private
and provide a computed property to return them asObservable<T>
Once the call to subscribeToQueue()
completes we will have a subscription to the queued items. Any time the QueueManager.queueItemSubject.value
changes, we should see an update to the QueueViewModel.items
property. If there already exists items in the QueueManager.queueItemSubject.value
(via a call to fetchItems()
, we will see these items in the OueueViewModel.items
property.
The type of subject dictates whether the stream data is available upon subscription. For
Variable<T>
orBehaviorSubject
subjects, the data is immediately available when subscribed and the subscribed observer will process the stream immediately. If the subject is aPublishSubject
data is only available when the value of thePublishSubject
changes, no data is available on subscription.
So far we have
QueueManager.fetchItems() loads initial [items]
QueueManager.items : --[items]------[items_remove_1]--[items_add_2]------------|->
QueueViewModel.items : -----------[items]--[items_remove_1]--[items_add_2]---|->
The queue manager and view model are now kept in sync.
Monitoring property changes on items in the queue
The sequences for item.Done
and item.DayOfTheWeek
changes
item.Done Observable<Bool> : --T---F-------T-----F-|->
item.DayOfTheWeek Observable<Int> : ----2----3------4----5-|->
class Item {
private var doneSubject: Variable<Bool>
private var dayOfTheWeekSubject: Variable<Int>
var doneSubjectObservable: Observable<Bool> = {
return doneSubject.asObservable
}
var dayOfTheWeekSubjectObservable: Observable<Int> = {
return dayOfTheWeekSubject.asObservable
}
}
The QueueManager
is interested in knowing when the Done
or DayOfTheWeek
values change on items within the queue. The subscription to the done and day of the week observable is created for each item in the items array whenever the queueItemSubject.value
changes. This can happen for the following reasons
- The
queueItemSubject.value
is set directly to a new array ofItem
objects - An item is added to the array via a call to
add()
- An item is removed from the array via a call to
remove()
func add(item: Item) {
self.queueItemSubject.value.append(item)
item.queuePosition = self.queueItemSubject.count
// Add subscriptions
}
func remove(item: Item) {
self.queueItemSubject.value
.index(of: item)
.map {
self.queueItemSubject.value.remove(at: $0)
}
// Add subscriptions
}
Now we just need to figure out how to subscribe to the changes.
func addSubscriptions(item: Item) {
item.doneSubjectObservable.subscribe { done in
print(done)
if (!done) { self.remove(item) }
}
item.dayOfTheWeekSubjectObservable.subscribe { dayOfTheWeek in
print(dayOfTheWeek)
if (dayOfTheWeek != self.currentDay) { self.remove(item) }
}
}
Now if the done
or dayOfTheWeek
properties change on an item, the closure passed to subscribe()
will execute, which will remove the item from the queue.
The item no longer exists in the queue but we’ll keep observing changes on the item since we haven’t removed the subscription
Adding a single item to the queue then updating the done to false should remove the item from the queue and cancel the subscription.
add(itemA) : O
subscribe() : -O
queue : --[itemA]-------------
item.Done : ---------T--------F-X-|->
remove(itemA): -------------------O
For the DayOfTheWeek
case, if the value changes and the new value is not equal to the current day then remove the item from the queue and cancel the subscription
queueManager.currentDay = 2
add(itemA) : O
subscribe() : -O
queue : --[itemA]-------------
item : ---------2--------3-X-|->
.DayOfTheWeek
remove(itemA): -------------------O
Cancelling the subscription
Usually the object responsible for the subject will issue an error
or completed
event to the subscriber which will dispose the subscription. For our case, we’d like the subscription to dispose itself if we read an invalid value within the sequence.
Throwing an error in map on the observable
One approach is to throw an error inside of map()
within the subscription
enum QueueError: Error {
case DoneNotTrue
case DayOfTheWeekNotCurrentDay
}
func addSubscriptions(item: Item) {
item.doneSubjectObservable
.map{ done in
if (!done) {
self.remove(item)
}
}
.catch { error in
print(error)
}
.subscribe { done in
print(done)
}
item.dayOfTheWeekSubjectObservable
.map{ dayOfTheWeek in
print(dayOfTheWeek)
if (dayOfTheWeek != self.currentDay) { self.remove(item) }
}
.catch{ error in
print(error)
}
.subscribe { done in
print(done)
}
}
Using a helper function
This helps to keep your subscriptions clean. The helper functions will return a new observable that will either be a single value within a .just
observable or an error value within a .error
observable
private func checkDone(done: Bool) -> Observable<Bool> {
if done {
return .just(done)
} else {
return .error(DoneNotTrue)
}
}
private func checkDayOfTheWeek(day: Int) -> Observable<Int> {
if (dayOfTheWeek != self.currentDay) {
return .just(day)
} else {
return .error(DayOfTheWeekNotCurrentDay)
}
}
We can remove the throw
from the map and rely on the return value from the helper functions to determine whether or not to cancel the subscription. The subscription is cancelled if a .error
observable is encountered in the subscribe closure as a result of the helper function call.
func addSubscriptions(item: Item) {
item.doneSubjectObservable
.map{ done in
return checkDone(done)
}
.subscribe(
onCompleted: {
print("Done")
},
onError: {
switch error{
case .DoneNotTrue:
print("Done is false, cancelling subscription")
self.remove(item)
}
})
item.dayOfTheWeekSubjectObservable
.map{ dayOfTheWeek in
return checkDayOfTheWeek(dayOfTheWeek)
}
.subscribe(
onCompleted: {
print("Done")
},
onError: {
switch error{
case .DayOfTheWeekNotCurrentDay:
print("DayOfTheWeek not equal to current day, cancelling subscription")
self.remove(item)
}
})
}
and the updated add & remove
func add(item: Item) {
self.queueItemSubject.value.append(item)
item.queuePosition = self.queueItemSubject.count
addSubscriptions(item: item)
}
func remove(item: Item) {
self.queueItemSubject.value
.index(of: item)
.map {
self.queueItemSubject.value.remove(at: $0)
}
}
No need to remove the subscription in
remove()
since the subscription will be disposed in the error handler of the subscription which is called beforeremove()
is called.
Dispose the subscription within remove()
You can just dispose the subscription in the remove()
function.
func remove(item: Item) {
self.queueItemSubject.value
.index(of: item)
.map {
self.queueItemSubject.value.remove(at: $0)
item.doneSubjectObservable.dispose()
item.dayOfTheWeekObservable.dispose()
}
}
Here we call the dispose()
on the item observables. This will cancel the subscription without error and terminate the stream.
add(itemA) : O
subscribe() : -O
queue : --[itemA]-------------
item.Done : ---------T--------F-|->
remove(itemA): -------------------O
queueManager.currentDay = 2
add(itemA) : O
subscribe() : -O
queue : --[itemA]-------------
item : ---------2--------3-|->
.DayOfTheWeek
remove(itemA): -------------------O
Now we can clean up the subscription logic by removing the map and adding logic to subscribe to validate the incoming data. First, update the helper functions to call remove()
when validation fails. The functions also need to accept an Item
and return Void
.
private func validateDone(done: Bool, item: Item) {
print(done)
if !done {
self.remove(item)
}
}
private func validateDayOfTheWeek(day: Int, item: Item) {
print(dayOfTheWeek)
if (dayOfTheWeek != self.currentDay) {
self.remove(item)
}
}
Update the subscription logic
func addSubscriptions(item: Item) {
item.doneSubjectObservable
.subscribe(
onNext: { done in
validateDone(done, item)
},
onCompleted: {
print("Done")
},
onError: {
switch error{
case .DoneNotTrue:
print("Done is false, cancelling subscription")
self.remove(item)
}
})
item.dayOfTheWeekSubjectObservable
.subscribe(
onNext: { dayOfTheWeek in
validateDayOfTheWeek(dayOfTheWeek)
},
onCompleted: {
print("Done")
},
onError: {
switch error{
case .DayOfTheWeekNotCurrentDay:
print("DayOfTheWeek not equal to current day, cancelling subscription")
self.remove(item)
}
})
}