Let’s try to implement reactive extension in Smalltalk. For inspiration, we will use ReactiveX. In first part of this series, we will implement basic reactive object called Observable with one creation method.
Observable
Full description of Observable is in this post. In simple words, it is something what emits a sequence of values.
Basic creation methods
We will start with implementing creation methods, because at the begin we need have the way to create observable. Basic creation methods are: fromArray, just, range, empty, never. For their description with examples, see this post about creation functions of Observables.
Test method
At the begging we need test method as use cases for creating Observable. So, positive test for array creation method is:
testArray
"Test method to create observable from array."
| observable observer|
observable := Observable array: #( 'Nice' 5 'Ok' ).
observer := TestObserver new.
observable subscribe: observer.
self assert: (observer data size) equals: 3.
self assert: (observer dataAt: 1) equals: 'Nice'.
self assert: (observer dataAt: 2) equals: 5.
self assert: (observer dataAt: 3) equals: 'Ok'.
self assert: (observer isCompleted).
TestObserver
TestObserver is class, which will observe our Observable and saves emitted items, which cane asserted later. From definition of Observer in reactive programming, observer must implement three basic methods: onNext: item
, onComplete
and onError: error
.
Class definition
Emitted items We will be storing in collection and complete and error will be boolean flag. So, the definition of class is:
Object subclass: #TestObserver
instanceVariableNames: 'data isCompleted isError'
classVariableNames: ''
package: 'Rx-Tests'
Methods implementation
Implementing Observer methods are straightforward. Set a boolean flag if completed or error, add items into collection in the onNext method. Of course getters should be mplemented also, but they are not shown.
initialize
data := OrderedCollection new.
isCompleted := false.
onComplete
isCompleted := true.
onNext: item
data add: item
onError: exception
isError := true.
Naive implementation
Now, we have implemented a simple test with TestObserver, so as next step, we can start implementing Observable. In the begging, let’s start with some naive implementation based on Observer design pattern.
Class definition
We will need only one attribute, in which will be all data stored. That is mandatory to emit items multiple times, because subscribe method can be called many times.
Object subclass: #Observable
instanceVariableNames: 'data'
classVariableNames: ''
poolDictionaries: ''
category: 'Rx'
Class methods
Our main method for creating Observable with array as argument, stores array to data attribute as a collection.
"Class method"
array: array
"Create observable from array"
| observable |
observable := self new.
data: OrderedCollection new
array do: [ :item | observable data add: item ].
^ observable.
Instance methods
Now, we need to implement methods to notify observer about a change of our observable state. Again, implementation is straightforward.
notify: item to: observer
"Notify all observer about change in state."
observer onNext: item
notifyCompleteTo: observer
"Notify observer about completed observable."
observer onComplete
notifyError: error to: observer
"Notify observer about error."
observer onError: error
And as last method, which we need to implement is subscribing Observer to Observable.
subscribe: observer
"Subscribe observer to this observable."
data do: [ :item | self notify: item to: observer ].
self notifyCompleteTo: observer.
Now run the test … green. Nice, it works. But if we think about implementing some additional features of reactive programming (reactive functions, other Observable types) and when we look at the definition of Observer in Reactive Programming and also on ReactiveX implementation of Observable, we see that Reactive programming uses extended Observer design pattern with subscription (we can unsubscribe) desing pattern. And also implementing of a new type of Observable must costs only implementing one or two classes.
Our actual state of codes can be found in repository of RxSmalltalk under this commit.
Robust implementation
After implementing naive solution, we discovered that we need more robust implementation, which will be more in OOP style and give us flexible hierarchy to simply create new type of Observable. So, the hierarchy of classes shoudl be:
- Class Observable will be an abstract class for all Observables. With “abstract” method subscribe, which will return the subscription.
- Class Subscription will holds data and will know how to emit each item of data to Observers.
- Creating class ArrayObservable, will create and return ArraySubscription in subscribe observer method.
- Class ArraySubscription will holds data as a collection and will know how to iterate the collection of data and how to emit data.
- Class Observer will be abstract class and have a default implementation of common methods of Observer.
Sequence of calling methods will be:
- Observer.fromArray: array -> ArrayObservable
- ArrayObservable.subscribe: observer -> ArraySubscription
- ArraySubscription -> Observer.onSubscribe
- Observer.onSubscribe -> Subscription.request: n.
Observable
Observable must be changed to more abstract type. It will be our facade over Observable child classes to hide implementation hierarchy. So, it will only create instances of child classes of Observables through class methods (Observable creation methods). Implementation of this is simple, removing all previous code from Observable and implement only one method, which will create our ArrayObservable with passed array as arguments.
Class definition
Object subclass: #Observable
instanceVariableNames: ''
classVariableNames: ''
package: 'Rx'
Create method for array
"Class method"
array: array
"Create observable from array"
^ ArrayObservable newFromArray: array
ArrayObservable
After that we need to implement a subclass of Observable for arrays. It will be called ArrayObservable and it will be creating ArraySubscription for Observers. So, we need to store array into some instance attribute in the constructor and implement subscribe method, which will return ArraySubscription with stored array.
Class definition
Observable subclass: #ArrayObservable
instanceVariableNames: 'collection'
classVariableNames: ''
package: 'Rx'
Constructor
newFromArray: aCollection
"Create observable from collection."
| observable |
observable := self new.
observable collection: aCollection.
^ observable
Subscribe method
Subscribe method will call only observer’s onSubscribe with instance of ArraySubsciption.
subscribe: observer
"Subscribe observer to observable by subscription."
observer onSubscribe: (ArraySubscription newObserver: observer data: collection)
Subscription
Subscription is bridge between Observable and Observer, which is created by Observable and used by the Observer. So, it must hold the data (which will be emitted), observer reference (which is owner of subscription) and also it must provide the option to be cancelled during emitting.
Class definition
We will define abstract class for all subscriptions, which will implement common (simple) methods and child classes will now what is stored in data and how it must be emitted.
Object subclass: #Subscription
instanceVariableNames: 'observer data cancelled completed'
classVariableNames: ''
package: 'Rx'
Common methods
Implementing methods initialize
, cancel
and complete
methods are straightforward.
initialize
cancelled := false.
completed := false.
cancel
"Request observable to stop sending data."
cancelled := true.
complete
"Sent notification about completed observable."
completed := true.
observer onComplete
ArraySubscription
Now we will create a subclass of Subscription for arrays, which will iterate over data and emit each item from the array.
Class definition
Subscription subclass: #ArraySubscription
instanceVariableNames: 'actualIndex'
classVariableNames: ''
package: 'Rx'
Constructor
The constructor will only create instance with data and prepare everything for emitting data. Data will be emitted when Observer will request it.
newObserver: aObserver data: aData
"Create subscribtion from observer and data."
|subscription|
subscription := self new.
subscription observer: aObserver.
subscription data: aData.
subscription initialize.
^subscription .
initialize
super initialize.
actualIndex := 1
Request method
The main part of all Subscriptions is method request: count
, which will emit count items to the observer.
Method request should emit count items by calling onNext: item
of Observer, only if the subscription is not completed or cancelled and catch any exception during emitting and emit erro to observer throught onError: error
method. Implementation of the method is below:
request: count
"Request count items from publisher"
[ self requestIndexes: (actualIndex to: (self maxIndex: count)).
actualIndex >= data size
ifTrue: [ self complete ] ]
on: Exception
do: [ :exception | observer onError: exception ]
MaxIndex method return maximal index, which can be requested on data.
maxIndex: count
"Return maximal index for request."
| indexWithCount |
indexWithCount := actualIndex + count.
indexWithCount <= data size
ifTrue: [ ^ indexWithCount ]
ifFalse: [ ^ data size ]
Method requestIndexes will create a range of indexes and check if the item can be now emitted (if subscription is not cancelled or completed) and if can be, it will call onNext: item
of the Observer.
requestIndexes: range
range
do: [ :index |
(completed or: cancelled)
ifTrue: [ ^ self ]
ifFalse: [ observer onNext: (self requestItem: index) ]. ]
RequestItem is only wrapping of method collection method at: index
with storing actual index to attribute actualIndex.
requestItem: index
"Return item on index."
actualIndex := index.
^ data at: index
Observer
And as last class, which we need to implement is a common class for Observers. It will holds achieved subscription, provides default implementation of cancel
and onSubscription: subscription
method. And other methods like onNext: item
, onError: error
and onComplete
should be implemented by subclass.
Class definition
Object subclass: #Observer
instanceVariableNames: 'subscription'
classVariableNames: ''
package: 'Rx'
Default methods
cancel
"Cancel subscription."
subscription cancel.
subscription := nil
Method onSubscribe
will store subscription and will request some big number of items on subscription.
onSubscribe: aSubscription
"Invoked after calling subscribe to observable."
subscription := aSubscription.
self request: 100000.
request: count
"Request count of items from subscription."
subscription isNotNil ifTrue: [ subscription request: count ]
Conclusion
As you can see by running test, we have implemented working creation method of Observable for arrays, which will be also the base for another Observable types(from range, never, empty and so on). This robust implementation gives us a more flexible implementation and nice OOP hierarchy. In the next part, we will implement another basic Observables of Reactive programming.
Actual state of RxSmalltalk can be found in RxSmalltalk repository.`