Insight into Combine (Part III)

Yanbo Sha
10 min read · Mar 9, 2023

Photo by Ezra Jeffrey-Comeau on Unsplash

Apple unveiled its official reactive framework, Combine, at WWDC 2019. Many articles have been written about it since, but most of them are basic tutorials, so I’d like to cover it in more detail.

This is the final part of a three-part series that dives into the Combine framework. In this part, I’ll implement PassthroughSubject, CurrentValueSubject, Future, Published, Sink, Assign, MakeConnectable, Autoconnect, Multicast, and TimerPublisher.

You can find the other parts through these links: first part, second part.

Okay, we’ve written a couple of Publishers, but did you notice that they all have something in common? Each of them is triggered every time it is subscribed to, which means they are not in a one-to-many relationship with their subscribers. We write the core logic in the receive method, which is called once per subscription, so we can’t emit a single event to multiple subscribers at once. We also can’t control these Publishers from the outside, for example to decide when an event is sent.

To meet these needs, Combine provides another protocol called Subject, which refines the Publisher protocol and adds methods for sending events. The Combine framework offers two built-in concrete types: PassthroughSubject and CurrentValueSubject.

Both PassthroughSubject and CurrentValueSubject can maintain multiple subscribers, which means they are in a one-to-many relationship. Developers can call the send method on them to send events manually, which is useful when refactoring legacy code in a reactive way. The main difference between them is that CurrentValueSubject has an initial value and updates it whenever send is called, which ensures that a subscriber is delivered the latest value immediately after it subscribes.
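
Before reimplementing them, it may help to see the difference using Apple’s own types. The snippet below is a small sketch that imports the real Combine framework (so these are Apple’s subjects, not the versions written in this article); the variable names are only illustrative.

```swift
import Combine

// A PassthroughSubject only forwards values sent after a subscriber is attached.
let passthrough = PassthroughSubject<Int, Never>()
var passthroughValues: [Int] = []
passthrough.send(0)                       // no subscriber yet, so this value is dropped
let first = passthrough.sink { passthroughValues.append($0) }
passthrough.send(1)

// A CurrentValueSubject replays its latest value to each new subscriber.
let current = CurrentValueSubject<Int, Never>(0)
var currentValues: [Int] = []
current.send(1)                           // replaces the stored value 0
let second = current.sink { currentValues.append($0) }
current.send(2)

print(passthroughValues) // [1]
print(currentValues)     // [1, 2]
```

The CurrentValueSubject subscriber sees 1 right away because that was the latest value at subscription time, while the PassthroughSubject subscriber never sees the 0 that was sent before it subscribed.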

PassthroughSubject

Let’s write the PassthroughSubject first.

class PassthroughSubject<Output, Failure: Error> : Subject {

    typealias SendValue = ((Output)->Void)
    typealias SendCompletion = ((Subscribers.Completion<Failure>)->Void)

    private var sendValues = [SendValue]()
    private var sendCompletions = [SendCompletion]()

    func send(_ value: Output) {
        sendValues.forEach { $0(value) }
    }

    func send(completion: Subscribers.Completion<Failure>) {
        sendCompletions.forEach { $0(completion) }
    }

    func send(subscription: Subscription) {}

    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        sendValues.append { let _ = subscriber.receive($0) }
        sendCompletions.append { subscriber.receive(completion: $0) }
    }
}
/// sample code
let passthru = PassthroughSubject<Int, Never>()
let c1 = passthru.sink { print($0) }
let c2 = passthru.sink { print($0) }
let c3 = passthru.sink { print($0) }
passthru.send(1) /// [1,1,1]
passthru.send(2) /// [2,2,2]

It doesn’t look like the publishers we wrote earlier, right? Instead of sending events inside the receive method, we keep two arrays of closures, one for delivering values and one for delivering completions, and let the caller decide when they run by calling the corresponding send methods.

You might wonder why we store the subscriber’s methods in closures instead of holding the subscriber itself. The answer becomes obvious once you implement it yourself: the subscriber parameter of the receive method has a generic type that is scoped to that method, so it can’t be used as the type of a stored property outside of it. The workaround is to wrap the calls in closures, which capture the subscriber and can be stored with a non-generic type. As a result, we have to update stored properties inside the non-mutating receive method, which would be a compilation error if the Subject were a struct. This is why we define the Subject as a class rather than a struct.
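
The closure trick can be shown without Combine at all. The following is a minimal sketch under my own assumptions: Receiver, Printer, and Broadcaster are hypothetical names that stand in for Subscriber, a concrete subscriber, and a Subject.

```swift
// Hypothetical minimal protocol, standing in for Combine's Subscriber.
protocol Receiver {
    associatedtype Input
    func receive(_ input: Input)
}

// A concrete receiver that reports what it gets through a logging closure.
struct Printer: Receiver {
    let name: String
    let log: (String) -> Void
    func receive(_ input: Int) { log("\(name): \(input)") }
}

final class Broadcaster<Output> {
    // `R` below is only visible inside `attach`, so a stored property of
    // type `[R]` cannot be declared. A closure has a concrete, non-generic type.
    private var deliver: [(Output) -> Void] = []

    func attach<R: Receiver>(_ receiver: R) where R.Input == Output {
        // The closure captures `receiver` and hides its concrete type.
        deliver.append { receiver.receive($0) }
    }

    func send(_ value: Output) {
        deliver.forEach { $0(value) }
    }
}

var lines: [String] = []
let broadcaster = Broadcaster<Int>()
broadcaster.attach(Printer(name: "a") { lines.append($0) })
broadcaster.attach(Printer(name: "b") { lines.append($0) })
broadcaster.send(7)
print(lines) // ["a: 7", "b: 7"]
```

Because attach appends to a stored array, Broadcaster has to be a class (or attach would have to be mutating), which mirrors the reason the Subject above is a class.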

CurrentValueSubject

The next Subject is CurrentValueSubject.

class CurrentValueSubject<Output, Failure: Error> : Subject {

    let initialValue: Output
    /// The latest value, replayed to every new subscriber.
    private(set) var value: Output

    typealias SendValue = ((Output)->Void)
    typealias SendCompletion = ((Subscribers.Completion<Failure>)->Void)

    private var sendValues = [SendValue]()
    private var sendCompletions = [SendCompletion]()

    init(_ initialValue: Output) {
        self.initialValue = initialValue
        self.value = initialValue
    }

    func send(_ value: Output) {
        /// Store the value first so it is up to date when subscribers are notified.
        self.value = value
        sendValues.forEach { $0(value) }
    }

    func send(completion: Subscribers.Completion<Failure>) {
        sendCompletions.forEach { $0(completion) }
    }

    func send(subscription: Subscription) {}

    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        sendValues.append { let _ = subscriber.receive($0) }
        sendCompletions.append { subscriber.receive(completion: $0) }
        /// Deliver the latest value to the new subscriber right away.
        let _ = subscriber.receive(value)
    }
}
/// sample code
let currentVal = CurrentValueSubject<Int, Never>(1)
let c1 = currentVal.sink { print($0) } /// [1]
let c2 = currentVal.sink { print($0) } /// [1]
let c3 = currentVal.sink { print($0) } /// [1]
currentVal.send(2) /// [2,2,2]
currentVal.send(3) /// [3,3,3]

There isn’t much more to explain here, since comparing it with PassthroughSubject tells most of the story. One thing is still worth noting: inside the receive method we deliver the stored latest value to the new subscriber right away, which is how a subscriber gets the latest value immediately after subscribing.

Future

Combine has another Publisher called Future. It takes a closure as its parameter, and inside that closure developers call the promise to deliver the result. It’s useful for wrapping an asynchronous task that produces exactly one result, such as fetching data from a local database or loading data from a remote server, into a Publisher.

struct Future<Output, Failure: Error> : Publisher {

    typealias Promise = (Result<Output, Failure>)->Void
    let block: (@escaping Promise) -> Void

    init(_ block: @escaping (@escaping Promise)->Void) {
        self.block = block
    }

    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {

        let promise: Promise = { result in
            switch result {
            case .success(let data):
                let _ = subscriber.receive(data)
                subscriber.receive(completion: .finished)
            case .failure(let error):
                subscriber.receive(completion: .failure(error))
            }
        }
        block(promise)
    }
}
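/// sample code (toModel and someError are placeholders)
let cancellable = Future { promise in
    AF.request("https://domain/path/to").response { response in
        if let model = toModel(response) {
            promise(.success(model))
        } else {
            promise(.failure(someError))
        }
    }
}.sink { completion in
    print(completion) /// finished, or failure(someError)
} receiveValue: { x in
    print(x) /// [model]
}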

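In this Publisher, we use the Result enum to represent both outcomes, which keeps the API concise. Note that this simplified version runs the closure again for every subscriber, whereas Combine’s own Future runs it once, as soon as the Future is created, and replays the result to every subscriber.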
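
For comparison, here is a small sketch of how Apple’s own Future behaves, using the real Combine framework rather than the struct above. The counter and variable names are my own; the point is only to show that the closure runs once, at creation time, and that each subscriber still receives the result.

```swift
import Combine

var runCount = 0

// With Apple's Future, the closure runs immediately when the Future is created.
let future = Future<Int, Never> { promise in
    runCount += 1
    promise(.success(42))
}
let runsBeforeSubscribing = runCount

// Each subscriber gets the stored result; the closure is not run again.
var received: [Int] = []
let a = future.sink { received.append($0) }
let b = future.sink { received.append($0) }

print(runsBeforeSubscribing) // 1
print(runCount)              // 1
print(received)              // [42, 42]
```

If the work should be deferred until someone subscribes, Combine’s Deferred publisher can wrap the Future.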

@Published

Okay, let’s talk about a property wrapper called @Published. Most of the time, we use @Published on the properties of an ObservableObject. Conceptually, @Published holds a publisher that behaves like a CurrentValueSubject and emits an event each time the property changes, and ObservableObject exposes a single publisher, objectWillChange, that aggregates the events of all its @Published properties. This means we can subscribe to the ObservableObject’s publisher instead of subscribing to the publisher of each @Published property. SwiftUI’s StateObject, ObservedObject, and EnvironmentObject all subscribe to this aggregated publisher and update the UI automatically.
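
As a rough illustration of the aggregated publisher described above, the following sketch uses Apple’s Combine directly. Profile and its two properties are made-up names for the example.

```swift
import Combine

// A hypothetical model with two @Published properties.
final class Profile: ObservableObject {
    @Published var name = "Yanbo Sha"
    @Published var age = 30
}

let profile = Profile()
var changeCount = 0

// One subscription observes changes to every @Published property.
let subscription = profile.objectWillChange.sink { _ in changeCount += 1 }

profile.name = "Rock"   // first change
profile.age = 31        // second change
print(changeCount)      // 2
```

Note that objectWillChange carries no value; it only signals that some property is about to change.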

Let’s get back on track. Here is the @Published implementation:

@propertyWrapper
struct Published<T> {

    let currentValue: CurrentValueSubject<T, Never>

    init(wrappedValue: T) {
        currentValue = CurrentValueSubject(wrappedValue)
    }

    var wrappedValue: T {
        get { currentValue.value }
        set { currentValue.send(newValue) }
    }

    var projectedValue: CurrentValueSubject<T, Never> { currentValue }
}
/// sample code
struct Demo {
    @Published var name = "Yanbo Sha"
}
var demo = Demo()
let cancellable = demo.$name.sink { completion in
    print(completion)
} receiveValue: { x in
    print(x) /// ["Yanbo Sha", "Rock", "iOS Developer"]
}
demo.name = "Rock"
demo.name = "iOS Developer"

Because it holds a CurrentValueSubject rather than a PassthroughSubject, we receive the initial value, “Yanbo Sha”, as soon as the sink method is called.

We’ve written lots of Publishers and Operators, and in every sample we used the sink method to subscribe to the upstream. Now it’s time to look at Subscribers in detail.

The Combine framework features two built-in Subscribers, Sink and Assign, both of which conform to the Subscriber protocol.

Sink

Let’s look at the most familiar Subscriber, Sink:

struct Sink<Input, Failure: Error> : Subscriber {

    let receiveCompletion: (Subscribers.Completion<Failure>)->Void
    let receiveBlock: (Input)->Void

    /// Stored once so the identifier stays the same for the lifetime of the subscriber.
    let combineIdentifier = CombineIdentifier()

    func receive(subscription: Subscription) {
        subscription.request(.unlimited)
    }

    func receive(_ input: Self.Input) -> Subscribers.Demand {
        receiveBlock(input)
        return .unlimited
    }

    func receive(completion: Subscribers.Completion<Failure>) {
        receiveCompletion(completion)
    }
}

extension Publisher {
    func sink(receiveCompletion: @escaping (Subscribers.Completion<Self.Failure>)->Void, receive: @escaping (Self.Output)->Void) {
        self.receive(subscriber: Sink<Self.Output, Self.Failure>(receiveCompletion: receiveCompletion, receiveBlock: receive))
    }
}
/// sample code
let _ = [1, 2, 3, 4].publisher.sink { completion in
    print(completion) /// finished
} receive: { x in
    print(x) /// [1,2,3,4]
}

Let’s walk through what happens when the sink method is called. First, we create a Sink instance that holds the two closures passed in, one for value events and one for the completion event. Second, we manually call the receive(subscriber:) method on the Publisher. This is why, when writing Publishers and Operators, we put our logic in receive(subscriber:): it is called as soon as the subscription occurs. Third, the Sink’s receive methods call the corresponding closures.
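
The order of these callbacks can be observed with a hand-written subscriber attached to one of Apple’s publishers. This is only a sketch using the real Combine framework; LoggingSubscriber is a hypothetical name, and it records labels instead of running closures.

```swift
import Combine

// A hypothetical subscriber that records each callback it receives.
final class LoggingSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private(set) var log: [String] = []

    func receive(subscription: Subscription) {
        log.append("subscription")
        subscription.request(.unlimited)   // ask for every value, as Sink does
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        log.append("value \(input)")
        return .none                        // no extra demand; it is already unlimited
    }

    func receive(completion: Subscribers.Completion<Never>) {
        log.append("completion")
    }
}

let logger = LoggingSubscriber()
[1, 2].publisher.subscribe(logger)
print(logger.log)
// ["subscription", "value 1", "value 2", "completion"]
```

Nothing is delivered until the subscriber requests demand in receive(subscription:), which is the step Sink performs with .unlimited.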

Assign

Most of the time, Sink is enough. But when we want to update a property of an object by subscribing to a Publisher, there is a more convenient Subscriber called Assign, which updates the property identified by a KeyPath.

class Assign<Input, Failure: Error, Root> : Subscriber {

    var instance: Root
    let keyPath: ReferenceWritableKeyPath<Root, Input>

    /// Stored once so the identifier stays the same for the lifetime of the subscriber.
    let combineIdentifier = CombineIdentifier()

    init(instance: Root, keyPath: ReferenceWritableKeyPath<Root, Input>) {
        self.instance = instance
        self.keyPath = keyPath
    }

    func receive(_ input: Input) -> Subscribers.Demand {
        instance[keyPath: keyPath] = input
        return .unlimited
    }

    func receive(completion: Subscribers.Completion<Failure>) {}

    func receive(subscription: Subscription) {
        subscription.request(.unlimited)
    }
}

extension Publisher {
    func assign<Root>(to keyPath: ReferenceWritableKeyPath<Root, Self.Output>, on root: Root) {
        self.receive(subscriber: Assign<Self.Output, Self.Failure, Root>(instance: root, keyPath: keyPath))
    }
}
/// sample code
class Person {
    @Published var name = "Yanbo Sha"
}
class View {
    var text = ""
}
let view = View()
let person = Person()
let cancellable = person.$name.assign(to: \.text, on: view)
person.name = "Rock"
print(view.text) /// "Rock"

Instead of holding two closures, this Subscriber takes an instance and one of its key paths as parameters. When it receives a new value, it uses the key-path subscript to write that value to the property.
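
The key-path subscript used inside Assign can be tried on its own, without any Combine types. In the sketch below, Label and the write helper are hypothetical names introduced only for the example.

```swift
// A hypothetical reference type with two writable properties.
final class Label {
    var text = ""
    var tag = 0
}

// Writes `value` to whichever property `keyPath` points at.
// ReferenceWritableKeyPath guarantees the write goes through a reference,
// so `root` does not need to be declared `inout`.
func write<Root, Value>(_ value: Value,
                        to keyPath: ReferenceWritableKeyPath<Root, Value>,
                        on root: Root) {
    root[keyPath: keyPath] = value
}

let label = Label()
write("Rock", to: \.text, on: label)
write(7, to: \.tag, on: label)
print(label.text, label.tag) // Rock 7
```

The same generic function handles both a String and an Int property, which is what lets a single Assign type work with any property.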

So far, we have written most of the Publishers, Operators, and Subscribers, and I think that after reading this article you can implement the rest of Combine’s types on your own.

There are still three Publishers I’d like to cover here because they are a little more complex: MakeConnectable, Autoconnect, and Multicast.

Before writing them, we need to introduce a protocol called ConnectablePublisher. It refines the Publisher protocol with one extra method, connect(). A ConnectablePublisher is like a valve on a pipeline: calling connect() opens the valve, and the Cancellable returned from connect() closes it.

MakeConnectable

Let’s start with the MakeConnectable:

class MakeConnectable<UpStream> : ConnectablePublisher where UpStream : Publisher {

    typealias Output = UpStream.Output
    typealias Failure = UpStream.Failure

    var connected = false

    var subBlock: (()->Void)?
    var cancellable: AnyCancellable?

    let upstream: UpStream

    init(_ upstream: UpStream) {
        self.upstream = upstream
    }

    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        subBlock = {
            self.cancellable = self.upstream.sink { completion in
                subscriber.receive(completion: completion)
            } receiveValue: { x in
                let _ = subscriber.receive(x)
            }
        }
    }

    func connect() -> Cancellable {
        if !connected {
            connected = true
            subBlock?()
        }
        return cancellable ?? AnyCancellable {}
    }
}

extension Publisher {
    func makeConnectable() -> MakeConnectable<Self> {
        MakeConnectable(self)
    }
}
/// sample code
let passthru = PassthroughSubject<Int, Never>()
let connectable = passthru.makeConnectable()
let cancellable1 = connectable.sink { x in
    print(x) /// [3,4]
}
passthru.send(1)
passthru.send(2)
let cancellable2 = connectable.connect()
passthru.send(3)
passthru.send(4)

It’s a bit longer, but not too hard to understand. Because it acts as a valve, we need a flag that indicates whether it is open, so we define a Bool property named connected. When connect() is called, we set the flag to true and run the stored closure, which establishes the subscription to the upstream and “opens the valve”. The returned cancellable lets the caller decide when to tear the subscription down, which “closes the valve”. Note that this simplified version keeps only the most recent subscriber, because each call to receive overwrites the stored closure.
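
The valve behaviour, including closing it again, can be checked against Apple’s implementation. This sketch uses the real Combine framework, and the variable names are my own.

```swift
import Combine

let source = PassthroughSubject<Int, Never>()
let connectable = source.makeConnectable()

var received: [Int] = []
let subscription = connectable.sink { received.append($0) }

source.send(1)                          // valve closed: not delivered
let connection = connectable.connect()  // open the valve
source.send(2)                          // delivered
connection.cancel()                     // close the valve
source.send(3)                          // not delivered

print(received) // [2]
```

Only the value sent between connect() and cancel() reaches the subscriber.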

Autoconnect

The Autoconnect Publisher automatically “opens the valve” as soon as a subscriber subscribes. It may sound useless, but it’s handy when the upstream is a ConnectablePublisher and we don’t need manual control over the connection.

class Autoconnect<UpStream> : Publisher where UpStream : ConnectablePublisher {

    let upstream: UpStream

    init(_ upstream: UpStream) {
        self.upstream = upstream
    }

    /// Keeps the subscription to the upstream alive; a discarded AnyCancellable cancels itself.
    var subscription: AnyCancellable?
    /// Keeps the connection alive.
    var cancellable: Cancellable?

    typealias Output = UpStream.Output
    typealias Failure = UpStream.Failure

    func receive<S>(subscriber: S) where S : Subscriber, UpStream.Failure == S.Failure, UpStream.Output == S.Input {

        subscription = upstream.sink { completion in
            subscriber.receive(completion: completion)
        } receiveValue: { x in
            let _ = subscriber.receive(x)
        }
        cancellable = upstream.connect()
    }
}

extension Publisher {
    func autoconnect() -> Autoconnect<Self> where Self : ConnectablePublisher {
        Autoconnect(self)
    }
}
/// sample code
let cancellable = [1,2,3,4].publisher.makeConnectable().autoconnect().sink { x in
    print(x) /// [1,2,3,4]
}

It’s a lot simpler than MakeConnectable, eh? We just need to call the connect() method inside the receive method.
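
To make the contrast concrete, the sketch below compares a manual connection with autoconnect() using Apple’s Combine (not the classes above). The variable names are illustrative.

```swift
import Combine

// Manual: nothing flows until connect() is called.
var manual: [Int] = []
let connectable = [1, 2, 3].publisher.makeConnectable()
let manualSubscription = connectable.sink { manual.append($0) }
let manualBeforeConnect = manual
let connection = connectable.connect()

// Automatic: subscribing is enough to open the valve.
var automatic: [Int] = []
let autoSubscription = [1, 2, 3].publisher
    .makeConnectable()
    .autoconnect()
    .sink { automatic.append($0) }

print(manualBeforeConnect) // []
print(manual)              // [1, 2, 3]
print(automatic)           // [1, 2, 3]
```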

Multicast

The last one is Multicast. It’s useful when we want to route a normal Publisher through a Subject, so that a single upstream subscription can deliver each event to multiple subscribers at once.

class Multicast<UpStream, SubjectType> : ConnectablePublisher where UpStream : Publisher, SubjectType : Subject, UpStream.Failure == SubjectType.Failure, UpStream.Output == SubjectType.Output {

    typealias Output = UpStream.Output
    typealias Failure = UpStream.Failure

    var connected = false
    var cancellable: Cancellable?

    let upstream: UpStream
    let subject: SubjectType

    init(upstream: UpStream, createSubject: () -> SubjectType) {
        self.upstream = upstream
        self.subject = createSubject()
    }

    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        self.subject.subscribe(subscriber)
    }

    func connect() -> Cancellable {
        if !connected {
            connected = true

            cancellable = upstream.sink {
                self.subject.send(completion: $0)
            } receiveValue: {
                self.subject.send($0)
            }
            return cancellable ?? AnyCancellable {}
        }
        return AnyCancellable {}
    }
}

extension Publisher {
    func multicast<S>(_ createSubject: @escaping () -> S) -> Multicast<Self, S> where S : Subject, Self.Failure == S.Failure, Self.Output == S.Output {
        Multicast(upstream: self, createSubject: createSubject)
    }
}
/// sample code
let multi = [1,2,3,4].publisher.multicast { PassthroughSubject<Int, Never>() }
let c1 = multi.sink { x in
    print("sink1 \(x)") /// [1,2,3,4]
}
let c2 = multi.sink { x in
    print("sink2 \(x)") /// [1,2,3,4]
}
let c3 = multi.sink { x in
    print("sink3 \(x)") /// [1,2,3,4]
}
let cancellable = multi.connect()
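
The benefit of multicasting is that the upstream is subscribed to only once, no matter how many downstream subscribers there are. The following sketch checks this with Apple’s Combine; the counter and variable names are my own additions.

```swift
import Combine

var upstreamSubscriptions = 0

// Count how many times the upstream itself is subscribed to.
let upstream = [1, 2].publisher
    .handleEvents(receiveSubscription: { _ in upstreamSubscriptions += 1 })

let shared = upstream.multicast { PassthroughSubject<Int, Never>() }

var first: [Int] = []
var second: [Int] = []
let c1 = shared.sink { first.append($0) }
let c2 = shared.sink { second.append($0) }
let connection = shared.connect()

print(upstreamSubscriptions) // 1
print(first, second)         // [1, 2] [1, 2]
```

Without multicast, each sink would subscribe to the upstream separately and the counter would end at 2.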

TimerPublisher

So far, we have covered most of the built-in Publishers, Operators, and Subscribers in the Combine framework. Other Apple frameworks, such as Foundation, also ship some out-of-the-box Publishers. Here I implement a TimerPublisher as an example; you can explore and implement more built-in Publishers on your own.

class TimerPublisher : ConnectablePublisher {

    typealias Failure = Never
    typealias Output = Date

    private var connected = false

    let timeInterval: TimeInterval
    let runloop: RunLoop
    let mode: RunLoop.Mode

    private var sendValue: ((Output)->Void)?
    private var sendCompletion: ((Subscribers.Completion<Failure>)->Void)?

    init(timeInterval: TimeInterval, runloop: RunLoop = .main, mode: RunLoop.Mode = .common) {
        self.timeInterval = timeInterval
        self.runloop = runloop
        self.mode = mode
    }

    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {

        sendValue = {
            let _ = subscriber.receive($0)
        }
        sendCompletion = {
            subscriber.receive(completion: $0)
        }
    }

    func connect() -> Cancellable {

        if !connected {
            connected = true

            let timer = Timer(timeInterval: self.timeInterval, repeats: true) { timer in
                self.sendValue?(Date())
            }
            self.runloop.add(timer, forMode: self.mode)
            return AnyCancellable {
                timer.invalidate()
                self.sendCompletion?(.finished)
            }
        }

        return AnyCancellable {}
    }
}
/// sample code
let timer = TimerPublisher(timeInterval: 1)
let subscription = timer.sink { date in
    print(date)
}
let cancellable = timer.connect()
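
For reference, Foundation’s built-in counterpart is created with Timer.publish(every:on:in:). The sketch below uses that real API together with autoconnect(); the intervals and variable names are arbitrary choices, and the run loop is spun manually only because this is a standalone script.

```swift
import Combine
import Foundation

var ticks: [Date] = []
let subscription = Timer.publish(every: 0.1, on: .main, in: .common)
    .autoconnect()
    .sink { ticks.append($0) }

// Keep the main run loop alive briefly so the timer can fire.
RunLoop.main.run(until: Date().addingTimeInterval(0.55))
subscription.cancel()                  // stops the timer
let countAfterCancel = ticks.count

// Run a little longer to confirm that nothing arrives after cancellation.
RunLoop.main.run(until: Date().addingTimeInterval(0.25))
print(ticks.count == countAfterCancel) // true
```

In an app the main run loop is already running, so the two RunLoop.main.run calls are not needed there.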

Okay, this article is coming to an end. I hope you got something out of it. If you spot a mistake, feel free to leave a comment and correct me. I’ll keep publishing high-quality articles about iOS development, so follow me to stay up to date.

Written by Yanbo Sha

iOS Programmer with 10 years of experience, interested in the latest technique