After implementing base Observable, we can implement other basic Observable types like Empty, Throw, Never and so on.

We can create Observable from array through creation static function. And now we want to implement other static creation methods like from Range, empty, never, throw. For more information about these functions look at this post about creation methods of Observable.

Tests

Again we will start with implementing test, which are defined by definition of creation method. Some of the tests are intuitive and straightforward.

When we start creating many tests for different types of Observables, Is good to create some method, which will test Observable in a positive way with an expected collection of values.

assertObservable: observable contains: data
	| testObserver | 

	testObserver := TestObserver new. 

	observable subscribe: testObserver. 

	self assert: testObserver isCompleted equals: true.
	self assert: testObserver isError equals: false.
	self assert: testObserver data size equals: data size.
	self assertIterables: testObserver data equals: data

Now we can simply test Observable by calling:

self assertObservable: observable contains: { item item2 }.

All tests which we will implement has the same form:

  1. Create Observable.
  2. Subscribe to Observable.
  3. Assert TestObserver with expected values.

Just

Just method only creates Observable from n items.

testJust
	| observable item|
	item := 1. 

	observable := Observable just: item.
	
	self assertObservable: observable contains: { item }.

Range

Creates Observable from interval defined by two numbers.

testRange
	| observable | 
	observable := Observable range: 1 to: 5.
	
	self assertObservable: observable contains: (1 to: 5).

Empty

Creates Observable with zero data.

testEmpty
	| observable |
	observable := Observable empty.

	observable subscribe: observer.
	
	self assert: (observer isCompleted) equals: true.
	self assert: (observer isError) equals: false. 
	self assert: (observer data size) equals: 0.

Never

Creates Observable, which will never stop emitting zero items.

testNever
	| observable |
	observable := Observable never.

	observable subscribe: observer.
	
	self assert: (observer isCompleted) equals: false.
	self assert: (observer isError) equals: false.
	self assert: (observer data size) equals: 0.

Raise

Creates Observable which only emits exception.

testRaise
	| observable exception | 

	exception := TestError new.
	observable := Observable raise: exception.
	observable subscribe: observer. 

	self assert: observer isCompleted equals: false.
	self assert: observer isError equals: true.
	self assert: observer data size equals: 0.
	self assert: observer error equals: exception

Implementation

In first part we defined the basic structure of Observable and in this post we will continue implementing from this structure.

Adding another type of Observable is simple. It can described with these steps:

  1. Define creation function with an appropriate name, in which will be created type of Observable.
  2. Create a new subclass of Observable for this creation method, which will contain only create subscription for this type of Observable.
  3. Create a new subscription for this type of Observable, which will call observer.onNext: item and observe.onComplete when is needed.

Range

We will begin with the simpliest method. Range method is nothing else than fromArray with another arguments.

range: from to: to
	^self array:(from to: to).

Just

Creation method just creates Observable from given items. We will implement only creation from one item.

Creation method

just: item
	^ JustObservable newItem: item.

Observable subclass

Observable subclass: #JustObservable
	instanceVariableNames: 'item'
	classVariableNames: ''
	package: 'Rx'
subscribe: observer
	observer onSubscribe: (JustSubscription newObserver: observer item: item)

Subscription subclass

Subscription only holds item as data and then it will emit it in request method for any count.

Subscription subclass: #JustSubscription
	instanceVariableNames: ''
	classVariableNames: ''
	package: 'Rx'
newObserver: aObserver item: aItem
	|subscription|
	subscription := self new.
	subscription observer: aObserver.
	subscription data: aItem.
	subscription initialize.
	^subscription .
request: count
	((count > 0 and: completed not) and: cancelled not)
		ifTrue: [ observer onNext: data.
			self complete ]

Empty

Empty Observable will emit no items and only emit complete after subscription.

Creation method

empty
	^ EmptyObservable new.

Observable subclass

Observable subclass: #EmptyObservable
	instanceVariableNames: ''
	classVariableNames: ''
	package: 'Rx'
subscribe: observer
	observer onSubscribe: (EmptySubscription newObserver: observer)

Subscription subclass

Implementing of empty subscription is straightforward, call observer.onComplete when is observer call request.

newObserver: aObserver
	|subscription|
	subscription := self new.
	subscription observer: aObserver.
	subscription initialize.
	^subscription .
request: count
	"Request count items from publisher, but emit zero items."

	((count >= 0 and: completed not) and: cancelled not)
		ifTrue: [ self complete ]

Never

Never Observable will emit no item or onComplete or onError ever.

Creation method

never
	^ NeverObservable new.

Observable subclass

Observable subclass: #NeverObservable
	instanceVariableNames: ''
	classVariableNames: ''
	package: 'Rx'
subscribe: observer
	observer onSubscribe: (NeverSubscription newObserver: observer)

Subscription subclass

Never subscription will do nothing in method request, so we do not need to override it.

Subscription subclass: #NeverSubscription
	instanceVariableNames: ''
	classVariableNames: ''
	package: 'Rx'
newObserver: aObserver
	"Creates never subscription."
	
	|subscription|
	subscription := self new.
	subscription observer: aObserver.
	subscription initialize.
	^subscription.

Raise

Raise Observable will holds error and emit it when observer subscribe to it.

Creation method

raise: error
	^ RaiseObservable newError: error

Observable subclass

Observable subclass: #RaiseObservable
	instanceVariableNames: 'error'
	classVariableNames: ''
	package: 'Rx'
subscribe: observer
	observer onSubscribe: (RaiseSubscription newObserver: observer error: error)

Subscription subclass

Implementation is again simple, call only observer.onError with saved error.

newObserver: aObserver error: error
	|subscription|
	subscription := self new.
	subscription observer: aObserver.
	subscription data: error.
	subscription initialize.
	^subscription .
accessing
request: count
	cancelled not
		ifTrue: [ observer onError: data ]