Insight into Combine (Part I)

Yanbo Sha
8 min readMar 9, 2023

--

Photo by Sigmund on Unsplash

Apple unveiled the official Reactive framwork: Combine in WWDC²⁰¹⁹. There had been a lot of articles around most of which are the basic tutorials. Therefore, I’d like to talk about it in a more detailed way.

This is the first part of a three part series about diving into Combine framework. In this part, I’ll talk about the basic concepts and implement Empty, Just, Sequence, Map, CompactMap, TryMap, Fail.

you can check other parts by links: second part . final part .

I suppose you have known a little or a lot about the Combine framework before reading this article, if not, don’t worry. from now, I’ll take you to go through the Combine framework by implementing them.

As you know, like RxSwift for Swift or ReactiveCocoa for Objective-C, Combine is another similar reactive framework unveiled by Apple. It works well with SwiftUI and updates along with the iOS version.

Whether you’re new to it or have used it for a long time, do you know exactly how it works under the hood?

the Combine framework has 3 core concepts. those are Publisher, Operator, and Subscriber. The Publisher is the event source, responsible for emitting events to the downstreams (either Operator or Subscriber); the Operator takes charge of handling the events from the upstreams (either Publisher or Operator) and delivering them to the downstreams (Subscriber); the Subscriber is responsible for consuming these events from the upstream (either Publisher or Operator). Keep in mind these concepts and relationships. we will implement our own Combine framework based on these 3 concepts.

Before writing our own Publishers, I have to talk a little more about the Publisher. It’s a protocol in the Combine framework that can emit three types of events. they’re respectively value, finished completion, and failure completion. Also, this Protocol has two associated types: Output and Failure. Output is the type of value event mentioned above, and Failure is the type of failure completion event mentioned above ( the finished completion is an event with no parameter).

Empty

First, we can write an Empty Publisher as the starter. the Empty is the Publisher that emits a finished completion event immediately after being subscribed. So we define a struct called Empty, and let it conform to the Publisher and explicitly declare the Output and Failure.

struct Empty : Publisher {

typealias Failure = Never
typealias Output = Never

func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
subscriber.receive(completion: .finished)
}
}
/// sample code
let _ = Empty().sink { completion in
print(completion) /// finished
} receiveValue: { _ in }

Behind the scenes. When we call the sink method (it’ll be implemented below), a new Subscriber instance is created inside, holding the closure we provide, and passed to the Publisher’s receive method as the subscriber parameter. Then we emit the finished completion event to the subscriber by calling its receive method. this way, the closure we provide will be called with the parameter of the finished completion event.

Just

Take some time to understand the simplest Publisher above. After that, let’s take a look at another simple one: Just Publisher. Not like Empty, the Just Publisher will send one value event before sending the finished completion event.

struct Just<T> : Publisher {

typealias Failure = Never
typealias Output = T

private let value: T
init(_ value: T) {
self.value = value
}

func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
let _ = subscriber.receive(self.value)
subscriber.receive(completion: .finished)
}
}
/// sample code
let _ = Just(1).sink { completion in
print(completion) /// finished
} receiveValue: { x in
print(x) /// [1]
}

the Just Publisher’s initializer requires a value of any type which is used for the value event. so the difference with the Empty Publisher is that we publish the only value event by calling the receive method before sending the finished completion event.

Sequence

It’s just as simple as the Empty Publisher, isn’t it? Let’s review what we did, it’s not hard to identify that the Empty and the Just represent 0 and 1 respectively. Umm…, you know what we should do next. Yup, we should write a Publisher for Sequence.

in the Combine framework, we can easily create a Sequence Publisher by calling the publisher method on Array type. Let’s write our own one.

struct Sequence<T>: Publisher {

typealias Failure = Never
typealias Output = T

private let seq: [T]

init(_ seq: [T]) {
self.seq = seq
}

func receive<S>(subscriber: S) where S : Subscriber, Never == S.Failure, T == S.Input {
self.seq.forEach { item in
let _ = subscriber.receive(item)
}
subscriber.receive(completion: .finished)
}
}

extension Array {
var publisher: Sequence<Self.Element> {
Sequence(self)
}
}
/// sample code
let _ = [1,2,3,4].publisher.sink { x in
print(x) /// [1, 2, 3, 4]
}

As you can see, the Sequence Publisher needs an Array instance as the event source and iterates through it passing each item to the subscriber before sending the finished completion event. and also, we add a method extension to easily build a sequence publisher from the Array type. It’s worth noting that most of the handy method is implemented by creating and returning a relevant instance taking the self as the parameter, you will deeply understand this sentence by looking through this article.

Okay, we have written 3 Publishers: Empty(0), Just(1), and Sequence(0+). It’s a good point to introduce some Operators here before continuing to write other Publishers.

The Operator is like a pipeline. it’s a Publisher in nature taking other Publishers as its upstreams and taking the Subscribers as its downstreams. In between the pipeline, we can do some filter work, transformation, or anything we’d like to do.

Map

First, we introduce a frequently used operator called Map, it’s used to transform a group of values into another group with the same or different type. Talk is cheap, show me the code.

struct Map<T, UpStream>: Publisher where UpStream: Publisher {

typealias Output = T
typealias Failure = UpStream.Failure

let upstream: UpStream
let transform: (UpStream.Output) -> T

func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {

let _ = upstream.sink { completion in
subscriber.receive(completion: completion)
} receiveValue: { x in
let _ = subscriber.receive(transform(x))
}
}
}

extension Publisher {
func map<T>(_ transform: @escaping (Self.Output) -> T) -> some Publisher {
Map(upstream: self, transform: transform)
}
}
/// sample code
let _ = [1, 2, 3, 4].publisher.map { x in
String(x * 10)
}.sink { x in
print(x) /// ["10", "20", "30", "40"]
}

Aha, It’s a little bit complex. Don’t worry, let’s break them down and explain them one by one. Firstly, as we said above, we can use a map operator to transform events into other ones. so it’s easy to understand why the Map’s initializer requires a closure that its parameter type and returning type are different generic types. Secondly, as an operator, we should subscribe the upstream upon we’re subscribed by others to make the “pipelines” work (we can use multiple operators between the original upstream and ultimate downstream). instead of sending the events to the downstream directly, we pass through the result of transform closure for each event. in this way. all of the events going through this pipeline will change to the other ones.

Have you still remember what I said? most of the handy method is implemented by creating and returning a relevant instance taking the self as the parameter. in this operator, we provide an extension method, so that we don’t have to write code like:

Map([1,2,3,4].publisher) { 
x in String(x * 10)
}

we can use it in a more functional way:

[1, 2, 3, 4].publisher.map { x in
String(x * 10)
}

Another point worth noting is the returning type of extension method. we use opaque type instead of concrete one because the caller doesn’t care about what exactly the publisher type is. for this reason, using opaque type makes the returning type more concise. otherwise, we have to write a long type name (nested generic types).

CompactMap

Like the Map operator, there’s a similar operator called CompactMap. it’s a little bit different from the Map operator because it will ditch some events whose result of calling the transform closure is nil. we can use this operator to filter out events we don’t follow. Let’s check the code below:

struct CompactMap<Upstream, Output> : Publisher where Upstream : Publisher {

typealias Failure = Upstream.Failure

let upstream: Upstream
let transform: (Upstream.Output) -> Output?

func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
let _ = upstream.sink { completion in
subscriber.receive(completion: completion)
} receiveValue: { x in
if let x = transform(x) {
let _ = subscriber.receive(x)
}
}
}
}

extension Publisher {
func compactMap<T>(_ transform: @escaping (Self.Output) -> T?) -> CompactMap<Self, T> {
CompactMap(upstream: self, transform: transform)
}
}
/// sample code
let _ = [1, 2, 3, 4].publisher.compactMap { x in
if x == 3 {
return nil
}
return String( x * 10 )
}.sink { x in
print(x) /// ["10", "20", "40"]
}

As we said, its returning type of closure is Optional, and we send the event only when the result is not nil.

TryMap

Like Map and CompactMap, there’s still another similar operator called TryMap. this operator can interrupt the pipeline by sending the failure completion event when an error is thrown. Take a look at its implementation below:

struct TryMap<UpStream, Output> : Publisher where UpStream: Publisher {

typealias Failure = Error

let upstream: UpStream
let transform: (UpStream.Output) throws -> Output

func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {

let _ = upstream.sink { completion in
switch completion {
case .finished:
subscriber.receive(completion: .finished)
case .failure(let error as Error):
subscriber.receive(completion: .failure(error))
}
} receiveValue: { x in
do {
let _ = try subscriber.receive(transform(x))
} catch let err {
subscriber.receive(completion: .failure(err))
}
}

}
}

extension Publisher {
func _tryMap<T>(_ transform: @escaping (Self.Output) throws -> T) -> TryMap<Self, T> {
TryMap(upstream: self) { x in
try transform(x)
}
}
}
/// sample code
struct Err: Error {}
let _ = [1, 2, 3, 4].publisher._tryMap { x in
if x == 3 {
throw Err()
}
return String( x * 10 )
}.sink { completion in
print(completion) /// failure(Err())
} receiveValue: { x in
print(x) /// ["10", "20"]
}

As the receive method wrote, it try-catches the calling of each closure and sends the failure when an error occurs, ending the pipeline earlier than expected.

OK, Map, CompactMap, TryMap. I think it must take you some time to understand them. and you have possibly lost patience in reading through this article. So as a way to make you relax, let’s get back to the Publisher and let me introduce the last simple Publisher called Fail. as its name said, this Publisher only emits a failure immediately after being subscribed.

struct Fail : Publisher {

typealias Output = Never
typealias Failure = Error

let error: Error

func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
subscriber.receive(completion: .failure(error))
}
}
/// sample code
struct Err : Error {}
let _ = Fail(error: Err()).sink { completion in
print(completion) /// failure(Err())
} receiveValue: { _ in }

I think I don’t need to explain it anymore, you can understand well as you’re here.

--

--

Yanbo Sha
Yanbo Sha

Written by Yanbo Sha

iOS Programmer with 10 years of experience, interested in the latest technique

No responses yet