Apple unveiled its official reactive framework, Combine, at WWDC 2019. Plenty of articles have been written about it since, but most of them are basic tutorials, so I’d like to take a more detailed look.
This is the second part of a three-part series that dives into the Combine framework. In this part, I’ll implement Filter, Contains, Reduce, MapError, ReplaceError, and Catch. You can find the other parts via these links: first part, final part.
Back to operators: let’s introduce a few relatively simple ones.
When using Combine, we sometimes want to inspect what flows through a pipeline, not just the final result received by the subscriber. For this, the Combine framework offers the Print operator, which we can append to whichever operator we want to inspect. Let’s look at an implementation:
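struct Print<UpStream> : Publisher where UpStream : Publisher {
    typealias Output = UpStream.Output
    typealias Failure = UpStream.Failure
    let upstream: UpStream

    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        // Simplified for illustration: the AnyCancellable returned by sink is
        // discarded right away, so this relies on the upstream emitting synchronously.
        let _ = upstream.sink { completion in
            // Log the completion (finished or failure) before forwarding it.
            Swift.print("received \(completion)")
            subscriber.receive(completion: completion)
        } receiveValue: { x in
            // Log each value before forwarding it.
            Swift.print("received value: (\(x))")
            let _ = subscriber.receive(x)
        }
    }
}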
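/// sample code
/// Uses the Print struct above directly; calling .print() would use Combine's built-in operator instead.
let _ = Print(upstream: [1, 2, 3, 4].publisher).sink { completion in
    /// received finished
} receiveValue: { x in
    /// received value: (1)
    /// received value: (2)
    /// received value: (3)
    /// received value: (4)
}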
As the code shows, it prints each event before forwarding it downstream.
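For comparison, here is a minimal sketch using the operator that ships with Combine, print(_:to:), which accepts an optional prefix and an optional output stream. The LogCollector class is a hypothetical helper introduced only for this example, so that the log can be inspected instead of going to the console.

```swift
import Combine

// Hypothetical helper: collects everything the print operator writes.
final class LogCollector: TextOutputStream {
    private(set) var text = ""
    func write(_ string: String) { text += string }
}

let log = LogCollector()
var values: [Int] = []

let cancellable = [1, 2, 3, 4].publisher
    .print("numbers", to: log)   // logs each event, then forwards it unchanged
    .sink { values.append($0) }

// The subscriber still receives every value; the log holds the trace.
Swift.print(values)
Swift.print(log.text)
```

The values reach the subscriber untouched, which is the point of the operator: it observes the pipeline without changing it.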
Several other operators mirror the methods on the Swift standard library’s Sequence, such as Filter, Contains, and Reduce. Let’s write them one by one.
Filter
The Filter operator passes along only the events for which the provided closure returns true and drops the rest. The code makes this clearer than words:
struct Filter<UpStream> : Publisher where UpStream : Publisher {
    typealias Output = UpStream.Output
    typealias Failure = UpStream.Failure
    let upstream: UpStream
    let filterBlock: (UpStream.Output) -> Bool

    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        let _ = upstream.sink { completion in
            subscriber.receive(completion: completion)
        } receiveValue: { x in
            // Forward only the values that pass the predicate.
            if filterBlock(x) {
                let _ = subscriber.receive(x)
            }
        }
    }
}

extension Publisher {
    func filter(_ isIncluded: @escaping (Output) -> Bool) -> Filter<Self> {
        Filter(upstream: self, filterBlock: isIncluded)
    }
}

/// sample code
let _ = [1, 2, 3, 4].publisher.filter { x in
    x % 2 == 0
}.sink { x in
    print(x) /// [2, 4]
}
There’s also a similar operator called TryFilter, whose closure can throw. I won’t write it here; try implementing it on your own.
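Before writing your own, it may help to see how the built-in tryFilter behaves. This is only a usage sketch of Combine’s operator, not a solution to the exercise; the NegativeValue error is a made-up type for the example.

```swift
import Combine

struct NegativeValue: Error {}   // hypothetical error for this example

var received: [Int] = []
var failed = false

let cancellable = [2, 4, -1, 6].publisher
    .tryFilter { x in
        // Throwing ends the stream with a failure.
        if x < 0 { throw NegativeValue() }
        return x % 2 == 0
    }
    .sink(receiveCompletion: { completion in
        if case .failure = completion { failed = true }
    }, receiveValue: { received.append($0) })

print(received, failed)
```

Note that 6 never arrives: once the closure throws, the pipeline terminates with a failure, so your implementation needs to forward that failure and stop.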
Contains
The Contains operator checks whether the event source emits a given value. It sends true as soon as a value matches; otherwise it sends false once the upstream finishes without any match.
struct Contains<UpStream> : Publisher where UpStream : Publisher, UpStream.Output : Equatable {
    typealias Output = Bool
    typealias Failure = UpStream.Failure
    let upstream: UpStream
    let value: UpStream.Output

    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        var found = false // set once a match has been reported
        let _ = upstream.sink { completion in
            guard !found else { return } // true and .finished were already sent
            if case .finished = completion {
                let _ = subscriber.receive(false) // no value matched
            }
            subscriber.receive(completion: completion)
        } receiveValue: { x in
            if !found && x == value {
                found = true
                let _ = subscriber.receive(true)
                subscriber.receive(completion: .finished)
            }
        }
    }
}

extension Publisher {
    func contains(_ output: Self.Output) -> Contains<Self> where Output : Equatable {
        Contains(upstream: self, value: output)
    }
}

/// sample code
let _ = [1, 2, 3, 4].publisher.contains(3).sink { x in
    print(x) /// [true]
}
There’s also a variant of Contains that accepts a predicate instead of an exact value. Try it out!
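As a reference point for that exercise, here is a short sketch of the built-in predicate variant, contains(where:). The two pipelines and the variable names are only illustrative.

```swift
import Combine

var matches: [Bool] = []

// A value greater than 3 exists, so this emits true.
let first = [1, 2, 3, 4].publisher
    .contains(where: { $0 > 3 })
    .sink { matches.append($0) }

// No value is greater than 10, so this emits false once the upstream finishes.
let second = [1, 2, 3, 4].publisher
    .contains(where: { $0 > 10 })
    .sink { matches.append($0) }

print(matches)
```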
Reduce
The Reduce operator produces a single accumulated value by applying the closure to every event. We have to keep the running result and update it each time the closure is called.
struct Reduce<Output, Upstream> : Publisher where Upstream : Publisher {
    typealias Failure = Upstream.Failure
    let upstream: Upstream
    let reduceBlock: (Output, Upstream.Output) -> Output
    let initialValue: Output

    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        var result = initialValue // running accumulated value
        let _ = upstream.sink { completion in
            // Emit the accumulated value only on normal completion; forward failures as-is.
            if case .finished = completion {
                let _ = subscriber.receive(result)
            }
            subscriber.receive(completion: completion)
        } receiveValue: { x in
            result = reduceBlock(result, x)
        }
    }
}

extension Publisher {
    func reduce<T>(_ initialResult: T, _ nextPartialResult: @escaping (T, Output) -> T) -> Reduce<T, Self> {
        Reduce(upstream: self, reduceBlock: nextPartialResult, initialValue: initialResult)
    }
}

/// sample code
let _ = [1, 2, 3, 4].publisher.reduce(0) { result, x in
    result + x
}.sink { x in
    print(x) /// [10]
}
This operator makes slightly more complex use of generics: we have to work out which generic type in the extension method corresponds to which one in the struct. Here, T in the extension becomes Output in the struct, and Self becomes Upstream.
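The generic mapping is easier to see when the accumulated type differs from the upstream’s element type. The sketch below uses Combine’s built-in reduce to fold Int values into a String, so the result type is String while the upstream’s Output is Int. The variable names are only illustrative.

```swift
import Combine

var joined: [String] = []

let cancellable = [1, 2, 3, 4].publisher
    // The initial value "" fixes the accumulated type to String,
    // while each incoming element is still an Int.
    .reduce("") { text, number in text + String(number) }
    .sink { joined.append($0) }

print(joined)
```

Only one value comes out, and it arrives when the upstream finishes.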
Having written these three operators, let’s move on to three operators that deal with errors: MapError, ReplaceError, and Catch.
MapError
The MapError operator converts an error from the upstream into a different one.
struct MapError<UpStream, Failure> : Publisher where UpStream : Publisher, Failure: Error {
    typealias Output = UpStream.Output
    let upstream: UpStream
    let errorTransform: (UpStream.Failure) -> Failure

    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        let _ = upstream.sink { completion in
            switch completion {
            case .finished:
                subscriber.receive(completion: .finished)
            case .failure(let error):
                // Convert the upstream error before passing it downstream.
                subscriber.receive(completion: .failure(errorTransform(error)))
            }
        } receiveValue: { x in
            let _ = subscriber.receive(x)
        }
    }
}

extension Publisher {
    func _mapError<E>(_ transform: @escaping (Self.Failure) -> E) -> MapError<Self, E> where E : Error {
        MapError(upstream: self, errorTransform: transform)
    }
}

/// sample code
struct ErrA : Error {}
struct ErrB : Error {}
let _ = [1,2,3,4].publisher.tryMap { _ in
    throw ErrA()
}._mapError { _ in
    ErrB()
}.sink { completion in
    print(completion) /// failure(ErrB())
} receiveValue: { _ in }
As with the Map operator, we need a closure that lets the developer decide which error to return in place of the upstream’s. We use a switch statement to match the failure case and pass its error to the closure to get the new error back.
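A common use for this is translating a low-level error into a domain-specific one. Here is a minimal sketch with Combine’s built-in mapError; NetworkError and AppError are hypothetical types made up for the example, and Fail is the built-in publisher that fails immediately.

```swift
import Combine

struct NetworkError: Error {}        // hypothetical upstream error
enum AppError: Error, Equatable {    // hypothetical domain error
    case offline
}

var outcome: AppError?

let cancellable = Fail<Int, NetworkError>(error: NetworkError())
    .mapError { _ in AppError.offline }   // Failure changes from NetworkError to AppError
    .sink(receiveCompletion: { completion in
        if case .failure(let error) = completion { outcome = error }
    }, receiveValue: { _ in })

print(String(describing: outcome))
```

Values are untouched by this operator; only the Failure type of the pipeline changes.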
ReplaceError
The ReplaceError operator differs from MapError: it replaces the failure event delivered by the upstream with a value event and then finishes. As a result, subscribers never receive a failure event.
struct ReplaceError<UpStream> : Publisher where UpStream : Publisher {
    typealias Output = UpStream.Output
    typealias Failure = Never // failures are replaced, so none can reach the subscriber
    let upstream: UpStream
    let value: Output

    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        let _ = upstream.sink { completion in
            if case .failure(_) = completion {
                // Substitute the replacement value for the error.
                let _ = subscriber.receive(value)
            }
            // Either way, the downstream sees a normal completion.
            subscriber.receive(completion: .finished)
        } receiveValue: { x in
            let _ = subscriber.receive(x)
        }
    }
}

extension Publisher {
    func replaceError(with output: Self.Output) -> ReplaceError<Self> {
        ReplaceError(upstream: self, value: output)
    }
}

/// sample code
struct Err : Error {}
let _ = [1,2,3,4].publisher.tryMap { _ in
    throw Err()
}.replaceError(with: 5).sink { x in
    print(x) /// [5]
}
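One detail worth spelling out: the replacement value is the last thing the subscriber sees, because the upstream has already terminated. The sketch below uses the built-in replaceError(with:) on a pipeline that fails partway through; ParseError and the input strings are made up for the example.

```swift
import Combine

struct ParseError: Error {}   // hypothetical error for this example

var received: [Int] = []
var finished = false

let cancellable = ["1", "2", "x", "4"].publisher
    .tryMap { text -> Int in
        guard let number = Int(text) else { throw ParseError() }
        return number
    }
    .replaceError(with: -1)   // Failure becomes Never from here on
    .sink(receiveCompletion: { _ in finished = true },
          receiveValue: { received.append($0) })

print(received, finished)
```

The "4" is never parsed: the error at "x" ends the upstream, -1 is delivered in its place, and the pipeline finishes normally.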
Catch
MapError replaces a failure event with another failure event; ReplaceError replaces a failure event with a value event; the Catch operator, which we’ll discuss next, replaces a failure event with a Publisher. Sounds weird? Not really: when a failure event occurs, this operator makes the downstream switch over to the new Publisher returned by the catch handler. It’s like try-catch: imagine that the try clause and the catch clause each represent a Publisher. Whenever an error is thrown while running the try clause, control is handed over to the catch clause.
struct Catch<UpStream, NewPublisher> : Publisher where UpStream : Publisher, NewPublisher : Publisher, UpStream.Output == NewPublisher.Output {
    typealias Output = UpStream.Output
    typealias Failure = NewPublisher.Failure // after a failure, only the new publisher can fail
    let upstream: UpStream
    let catchBlock: (UpStream.Failure) -> NewPublisher

    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        let _ = upstream.sink { completion in
            if case .failure(let error) = completion {
                // Switch over to the publisher produced by the handler.
                let _ = catchBlock(error).sink { newCompletion in
                    subscriber.receive(completion: newCompletion)
                } receiveValue: { newX in
                    let _ = subscriber.receive(newX)
                }
            } else {
                subscriber.receive(completion: .finished)
            }
        } receiveValue: { x in
            let _ = subscriber.receive(x)
        }
    }
}

extension Publisher {
    func `catch`<P>(_ handler: @escaping (Self.Failure) -> P) -> Catch<Self, P> where P : Publisher, Self.Output == P.Output {
        Catch(upstream: self, catchBlock: handler)
    }
}

/// sample code
struct Err : Error {}
let _ = [1,2,3,4].publisher.tryMap { x in
    if x == 3 {
        throw Err()
    }
    return x
}.catch { error in
    [5,6,7,8].publisher
}.sink { x in
    print(x) /// [1,2,5,6,7,8]
}
The core logic lives in the failure branch of the if case statement: when a failure event occurs, we subscribe to another Publisher and forward its events downstream.
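The handler also receives the error itself, so the fallback can depend on what went wrong. Here is a small sketch with Combine’s built-in catch and a Just fallback; ParseError and the fallback values -1 and -2 are arbitrary choices for the example.

```swift
import Combine

struct ParseError: Error {}   // hypothetical error for this example

var received: [Int] = []

let cancellable = ["1", "2", "x", "4"].publisher
    .tryMap { text -> Int in
        guard let number = Int(text) else { throw ParseError() }
        return number
    }
    .catch { error in
        // Pick the fallback value based on the error that occurred.
        Just(error is ParseError ? -1 : -2)
    }
    .sink { received.append($0) }

print(received)
```

Because Just cannot fail, the pipeline’s Failure type becomes Never after the catch, which is why the value-only form of sink compiles here.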