Angular Reactive Development: Difference between revisions
Tag: Reverted |
Tag: Reverted |
||
Line 711: | Line 711: | ||
const result$ = tenDivideByValueCatchError(source$) | const result$ = tenDivideByValueCatchError(source$) | ||
expectObservable(result$).toBe(expectedMarble, expectedValues) | expectObservable(result$).toBe(expectedMarble, expectedValues) | ||
}) | |||
}) | |||
</syntaxhighlight> | |||
===Testing with an concat with cold=== | |||
Struggled again to work off the examples on the net but getting through it has allowed me to understand the concepts. | |||
<syntaxhighlight lang="ts"> | |||
const concatHot = ( | |||
source1$: Observable<number>, | |||
source2$: Observable<number>, | |||
): Observable<number> => { | |||
return source1$.pipe(concat(source2$)) | |||
} | |||
</syntaxhighlight> | |||
And the test case | |||
<syntaxhighlight lang="ts"> | |||
it('should work with cold observables', () => { | |||
scheduler.run(({ cold, expectObservable }) => { | |||
const values1 = { a: 5, b: 2 } | |||
const source1$ = cold('-a---b-|', values1) | |||
const values2 = { c: 10, d: 0 } | |||
const source2$ = cold('-c---d-|', values2) | |||
const expectedMarbles = '-a---b--c---d-|' | |||
const expectedValues = { a: 5, b: 2, c: 10, d: 0 } | |||
const result$ = concatCold(source1$, source2$) | |||
expectObservable(result$).toBe(expectedMarbles, expectedValues) | |||
}) | |||
}) | |||
</syntaxhighlight> | |||
===Testing with an concat with hot=== | |||
I don't quite understand how we set the test up in real life. My guess it that the hot observable is mocking the timings of the release of the results. | |||
<syntaxhighlight lang="ts"> | |||
const concatHot = ( | |||
source1$: Observable<number>, | |||
source2$: Observable<number>, | |||
): Observable<number> => { | |||
return source1$.pipe(concat(source2$)) | |||
} | |||
</syntaxhighlight> | |||
Here is the test case which does make sense. | |||
<syntaxhighlight lang="ts"> | |||
/* | |||
When testing hot observables you can specify the subscription | |||
point using a caret ^, similar to how you specify subscriptions | |||
when utilizing the expectSubscriptions assertion. | |||
*/ | |||
it('should work with hot observables', () => { | |||
scheduler.run(({ hot, expectObservable }) => { | |||
const values1 = { a: 5, b: 2 } | |||
const source1$ = hot('---a--^-b-|', values1) | |||
const values2 = { c: 10, d: 0 } | |||
const source2$ = hot('-----c^----d-|', values2) | |||
const expectedMarbles = '--b--d-|' | |||
const expectedValues = { b: 2, d: 0 } | |||
const result$ = concatHot(source1$, source2$) | |||
expectObservable(result$).toBe(expectedMarbles, expectedValues) | |||
}) | }) | ||
}) | }) |
Revision as of 20:22, 12 December 2021
Introduction
Background
RxJs originally was developed by Microsoft in Rx.NET. It works on Java and other platforms. The definition is "RxJs is a library for composing asynchronous and event-based programs using observable sequences"
Other approaches
- Callbacks - difficult to use with nested async operations
- Promises - can only handle a single emission and is not cancellable
- Asyc/await - can only handle a single emission and is not cancellable
Why Rxjs
- One - technical for all data e.g. mouse clicks, api calls
- Compositional - with the operators easy to transform and combine
- Watchful - With its push model to notify subscribers
- Laxy - Only executes when subscribed to
- Handles Error - Built in
- Cancellable
Angular already use RxJs in
- Routing
- Reactive Forms
- HttpClient
Reactive Development
This is characterized by
- Quick to react to user - drrrrrr
- Resilient to failure -error checking all the time
- Reactive to state changes
Resources
The demos were at https://github.com/DeborahK/Angular-RxJS
Terms and Syntax
Observer
Observes the stream and responds to its notification
- Next item
- Error occurred
- Complete
const observer = {
next: apple => console.log(`Apple emitted ${apple}`),
error: err => console.log(`Error occurred ${err}`),
complete: ()=> console.log(`No more Apples`)
}
Obeservable Stream
Also call an Observable sequence, a stream and can have a stream of any type of data such as number, string, events, objects or other streams and of course http requests
const appleStream = new Observerable(appleObserver => {
appleObserver.next(`Applie 1`)
appleObserver.next(`Applie 2`)
appleObserver.complete()
})
Subscription
In order to start the stream we need to subscribe. We do this by passing an observer to the stream.
const sub = applestream.subscribe(observer)
It is unusual to see this in real code. In reality must developers would just pass it in
const sub = appleStream.subscribe(
apple => console.log(`Apple emitted ${apple}`),
err => console.log(`Error occurred ${err}`),
()=> console.log(`No more Apples`)
)
There are several ways to unsubscribe or stop the stream.
- Call Complete
- Use completing operators like take(1)
- Throw an error
- Call unsubscribe
Without stopping the stream memory leaks can occur
Creating an Observable
of
Creates an observable from a set of defined values. If emits each item and then calls complete.
const appleStream = of(`Apple 1`, `Apple 2`)
from
Same as of but create an item from each value in the array.
const appleStream = from([`Apple 1`, `Apple 2`])
Differences
The difference is more apparent between of and from when you consider passing [`Apple 1`, `Apple 2`] to the of function. This will only produce one item. To get the same result you would need to use the spread operator of(...apples) to get two results.
Operators
Introduction
There are lots of these. With pipe the result of the operator is passed to the next operator.
of(2,4,6)
.pipe(
map(item => item * 2),
tap(item => console.log(item)),
take(2)
).subscribe(console.log)
Marble Diagrams
These are presented back to front in my view. As the first item is shown on the left and the last item on the right. Having said that, here is an example.
Declarative vs Imperative
Bit of a sideline but this kept coming up in the course and needed to understand what they meant. So looking at https://ui.dev/imperative-vs-declarative-programming I got the following
I’m going to ask you a question. I want you to think of both an imperative response and a declarative response.
“I’m right next to Wal-Mart. How do I get to your house from here?”
Imperative response Go out of the north exit of the parking lot and take a left. Get on I-15 North until you get to the 12th street exit. Take a right off the exit like you’re going to Ikea. Go straight and take a right at the first light. Continue through the next light then take your next left. My house is #298.
A declarative response My address is 298 West Immutable Alley, Eden, Utah 84310
Right now this is sounding more like requirements, declarative, vs imperative, design. Still confused. But I think declarative means that we right functional code wrapping around the implementation. Just like the old days.
Going Reactive
Async Pipes
Good for reactive UI
- subscribes to observable
- returns each emitted value
- when a new item is emitted, component is marked to be checked for changes
- Unsubscribes when component is destroyed
Error Handling
Catch and Replace
This where you catch the error and replace the error with something else. In this case it is very stupid.
return this.http.get<Product[]>(this.productUrl)
.pipe(
catchError(err => {
console.error(err)
return of([
{id:1, productName: 'cart'},
{id:2, productName: 'hammer'}
])
})
We need to remember catchError is just an operator.
Catch and Rethrow
From the course this resulted propagating the error further us the stream.
return this.http.get<Product[]>(this.productUrl)
.pipe(
catchError(err => {
console.error(err)
return throwError(err)
})
So instead of throwError a function is used to do this.
private handleError(err: any): Observable<never> {
// in a real world app, we may send the server to some remote logging infrastructure
// instead of just logging it to the console
let errorMessage: string;
if (err.error instanceof ErrorEvent) {
// A client-side or network error occurred. Handle it accordingly.
errorMessage = `An error occurred: ${err.error.message}`;
} else {
// The backend returned an unsuccessful response code.
// The response body may contain clues as to what went wrong,
errorMessage = `Backend returned code ${err.status}: ${err.body.error}`;
}
console.error(err);
return throwError(errorMessage);
}
This is eventually passed on to the component.
this.products$ = this.productService.getProducts().pipe(
catchError(err => {
this.errorMessage = err;
return EMPTY;
})
);
Change Detection
Using Async Pipes means, no need to subscribe or unsubscribe and improves change detection. There are two types of change detection. Default (check always)
- every component is check when ANY change is detected
OnPush (mimimizes change detection) when
- @Input properties change
- Event emits
- A bound Observable emits (using the async pipe)
So we change change this in our app
@Component({
templateUrl: './product-list.component.html',
styleUrls: ['./product-list.component.css'],
changeDetection: ChangeDetectionStrategy.OnPush
})
However because we use the OnPush the component, although we change the errormessage in the catchError the UI is not updated.
Declarative Pattern
Previously the pattern was
// Service
getProducts(): Observable<Product[]> {
return this.http.get<Product[]>(this.productsUrl)
.pipe(
tap(data => console.log('Products: ', JSON.stringify(data))),
catchError(this.handleError)
);
}
// Component
this.products$ = this.productService.getProducts().pipe(
catchError(err => {
this.errorMessage = err;
return EMPTY;
})
);
We can change this to be more declarative
// Service
products$: Observable<Product[]> = this.http.get<Product[]>(this.productsUrl)
.pipe(
catchError(this.handleError)
);
// Components
products$ = this.productService.products$.pipe(
catchError(err => {
this.errorMessage = err;
return EMPTY;
})
);
Mapping Returned Data
Mapping a Return Array
As mentioned previous apple[] is not the same as apple. To make this work they used the map function on the array which I am unsure if this is the right approach but it works.
products$: Observable<Product[]> = this.http.get<Product[]>(this.productsUrl)
.pipe(
map(products =>
products.map(product => ({
...product,
price: product.price * 1.5,
searchKey: [product.productName]
}) as Product)
),
catchError(this.handleError)
);
So their recommendation is to leave the array as is and do
- Map emmitted array
- Map the array elements
- Transform each array element
Combining Streams
Introduction
We my want to do this to
- Map id to string
- Combine with other sources e.g. share price, reviews
- React to actions e.g. re-filter
- Simplify for template code
CombineLatest
This is a function
- Takes in set of stream
- Creates and output stream
At least one value must have been emitted from each stream and it completes when all streams are complete.
ForkJoin
This is a function Creates an output stream using the lastest data from the input streams
WithLatest
This is a function
- Takes in set of stream
- Creates and output stream
At least one value must have been emitted from each stream, it emits a value each time the source stream gets a value. It completes when all streams are complete.
An Example
For the lookup table here is the example. Combine the two API calls to provide the additional data. The key thing is to understand the timings of when the data is calculated. For this case we have the latest data.
productsWithCategory = combineLatest([
this.products$,
this.productCategoryService.productsCategories$,
]).pipe(
map(([products, categories]) =>
products.map(
(product) =>
({
...product,
price: product.price * 1.5,
category: categories.find((c) => product.categoryId === c.id).name,
searchKey: [product.productName],
} as Product)
)
),
catchError(this.handleError)
);
Reacting to Actions
Filtering
No surprises, we map the data and add an array.filter. Change the UI to use this new Observable.
productsSimpleFilter$ = this.productService.productsWithCategory.pipe(
map(products =>
products.filter(product =>
this.selectedCategoryId ?
product.categoryId === this.selectedCategoryId : true))
);
An Aside
In the course they created a combobox which held a list of categories which could be used for filtering. To give an example so I can copy here is the code.
For the observable
categories$ = this.productCategoryService.productsCategories$.pipe(
catchError(err => {
this.errorMessage = err;
return EMPTY;
})
);
For the template
<select class="form-control"
(change)="onSelected($event.target.value)">
<option value="0">- Display All -</option>
<option *ngFor="let category of categories$ | async"
[value]="category.id">{{ category.name }}</option>
</select>
And for the select
onSelected(value): void {
this.selectedCategoryId = +value;
}
Back to the rest
Creating Action Stream
Create an action stream
private categorySelectedSubject = new Subject<number>()
Expose the subjects Observable
categorySelectedAction$ = this.categorySelectedSubject.asObservable()
Combine with the stream
products$ = combineLatest([
this.productService.productsWithCategory,
this.categorySelectedAction$
]).pipe(
map(([products, selectedCategoryId]) =>
products.filter(product =>
selectedCategoryId
? product.categoryId === selectedCategoryId
: true
)
),
catchError((err) => {
this.errorMessage = err;
return EMPTY;
})
);
Emit value on change
Finally emit a value to cause the combineLatest to be re-evaluated
onSelected(value): void {
this.categorySelectedSubject.next(+value);
}
Initial Value
Subject does not set an initial value. To resolve this there are two options add a startWith operator to the subject.
products$ = combineLatest([
this.productService.productsWithCategory,
this.categorySelectedAction$.pipe(
startWith(0),
),
]).pipe(
...
);
Or use a behaviourSubject
private categorySelectedSubject = new BehaviorSubject<number>(0);
Action Stream Example 2
Problem Statement
This is another example with the Action stream. The problem we are trying to solve is that the use can select a product on the left. When this occurs we want the selection to be highlighted on the left and the product to be displayed on the right.
Create A Service
In the Product Service create
- A subject and action to support this
- An observable to run on change
- A function to fire the change which can be used by the components
private productSelectedSubject = new Subject<number>();
productSelectedAction$ = this.productSelectedSubject.asObservable();
selectedProduct$ = combineLatest([
this.productsWithCategory,
this.productSelectedAction$.pipe(
startWith(0),
),
]).pipe(
map(([products, selectedProductId]) => {
return products.find(product => product.id === selectedProductId);
})
);
onProductSelected(value: string): void {
this.productSelectedSubject.next(+value);
}
Change the UI
Change Detection Strategy , add the selected Product to the component and Fire the change on selection
@Component({
selector: 'pm-product-list',
templateUrl: './product-list-alt.component.html',
changeDetection: ChangeDetectionStrategy.OnPush
})
...
selectedProduct$ = this.productService.selectedProduct$;
onSelected(value: string): void {
this.productService.onProductSelected(value);
}
Change the template to use the selected product.
[ngClass]="{'active': product?.id === (selectedProduct$ |async)?.id}"
Managing Errors
When we use streams we change the change detection to OnPush. This means that even through error occurs the UI is no longer notified. To fix this we use the same action stream strategy
private errorSubject = new Subject<string>();
errorAction$ = this.errorSubject.asObservable();
...
catchError((err) => {
this.errorSubject.next(err);
return EMPTY;
})
On the UI we can now do this
<div class="alert alert-danger"
*ngIf="emerrrorAction$ | async as errorMessage">
{{ errorMessage }}
</div>
Merge And Scan Operators
Really like this for managing data. The merge, derrr, lets you merge to streams together and the scan is like an accumulator in c++. Must be time for a picture.
So
- create an action stream
- merge the streams
- scan (accumulate into one)
- create an add product function
private productInsertedSubject = new Subject<Product>();
productInsertedAction$ = this.productInsertedSubject.asObservable();
productsWithAdd$ = merge(
this.productsWithCategory$,
this.productInsertedAction$
).pipe(
scan( (acc: Product[], value: Product) => {
return [...acc, value];
})
);
addProduct(newProduct: Product) {
newProduct = newProduct || this.fakeProduct();
this.productInsertedSubject.next(newProduct);
}
Caching Data
To cache streams such as lookup tables or data which is not expected to change you can use shareReplay with a parameter of how many to replay. When retrieving an array 1 will suffice.
productsCategories$: Observable<ProductCategory[]> = this.http.get<ProductCategory[]>(this.productCategoriesUrl)
.pipe(
tap(data => console.log(`Category ${data}`)),
shareReplay(1),
catchError(this.handleError)
);
Higher Order Mapping Operators
Higher order Observables are of type Observable<Observable<Product>>. Higher-Order mapping operator are used with these and end in xxxMap(). This section will discuss
- concatMap - Executes each input before processing the next
- mergeMap - Executes inputs in parallel so results could be in a different order
- switchMap - Unsubscribes from the prior input and switches to the new one
Combining All the Streams
One use which I have already come across is when you have an item and you want the associated child records. I our case a product with n number of suppliers. To do this we can use mergeMap
And here is the 'almost finished code.
selectedProductSupplier$ = this.selectedProduct$.pipe(
filter(selectedProduct => Boolean(selectedProduct)), // Empty Selected Product
mergeMap(selectedProduct =>
from(selectedProduct.supplierIds).pipe(
mergeMap(supplierId => this.http.get<Supplier>(`${this.suppliersUrl}/${supplierId}`)),
toArray()
)));
A second bash demonstrates switchMap. We do this because the data is received each time the user clicks the detail so if they click four times quickly we only want the last result and we can forget about the rest
selectedProductSupplier$ = this.selectedProduct$.pipe(
filter(selectedProduct => Boolean(selectedProduct)), // Empty Selected Product
switchMap(selectedProduct =>
from(selectedProduct.supplierIds).pipe(
mergeMap(supplierId => this.http.get<Supplier>(`${this.suppliersUrl}/${supplierId}`)),
toArray()
)));
Ancillary Streams
We can create streams from streams to update the UI. For instance get the Title for the page.
productTitle$ = this.product$.pipe(
map((p: Product) => p ? `Product Detail for : ${p.productName}` : null)
);
Combining At Last!!!
We can now combine out streams so the template only uses one.
// Create a combined stream with the data used in the view
// Use filter to skip if the product is null
vm$ = combineLatest([
this.product$,
this.productSuppliers$,
this.pageTitle$
])
.pipe(
filter(([product]) => Boolean(product)),
map(([product, productSuppliers, pageTitle]) =>
({ product, productSuppliers, pageTitle }))
);
Some Terms
Cold Observable - Doesn't emit until subscribe to (Unicast) e.g. http.get Hot Observable - Does emit without subscribers (Multicast) e.g. Subject
Karma
This took a lifetime to set up and make me wonder if I am using the right tool. It comes with Angular to stayed with it. What took the time was setting up the console.log to work. Here is my karma.conf.js.
module.exports = function (config) {
config.set({
basePath: "",
frameworks: ["jasmine", "@angular-devkit/build-angular"],
plugins: [
require("karma-jasmine"),
require("karma-chrome-launcher"),
require("karma-jasmine-html-reporter"),
require("karma-coverage-istanbul-reporter"),
require("karma-coverage"),
require("@angular-devkit/build-angular/plugins/karma"),
],
client: {
clearContext: false, // leave Jasmine Spec Runner output visible in browser
},
browserConsoleLogOptions: {
terminal: true,
level: ""
},
jasmineHtmlReporter: {
suppressAll: true // removes the duplicated traces
},
coverageIstanbulReporter: {
dir: require("path").join(__dirname, "./coverage/debug-tests"),
reports: ["html", "lcovonly", "text-summary"],
fixWebpackSourcePaths: true,
},
coverageReporter: {
dir: require('path').join(__dirname, './coverage/bibble-park-app'),
subdir: '.',
reporters: [
{ type: 'html' },
{ type: 'text-summary' }
]
},
reporters: ["progress", "kjhtml"],
port: 9876,
colors: true,
logLevel: config.LOG_DEBUG,
autoWatch: true,
browsers: ["Chrome"],
singleRun: false,
restartOnFileChange: true,
});
};
Testing
Decided to look a bit more into how to test observables. My starting point is Kevin Kreuzer's Medium page here.
The RxJs team provides and approach where you define a scheduler and a RunHelper
interface RunHelpers {
cold: typeof TestScheduler.prototype.createColdObservable
hot: typeof TestScheduler.prototype.createHotObservable
flush: typeof TestScheduler.prototype.flush
time: typeof TestScheduler.prototype.createTime
expectObservable: typeof TestScheduler.prototype.expectObservable
expectSubscriptions: typeof TestScheduler.prototype.expectSubscriptions
animate: (marbles: string) => void
}
Let's define the code we are testing which is function which filters out odd numbers and multiplies the even ones by 10.
const evenTimesTen = (source$: Observable<number>): Observable<number> => {
return source$.pipe(
filter((n) => n % 2 === 0),
map((n) => n * 10),
)
}
Simple Test
Then a define the test case like any Jasmine case.
describe('ExampleRxjsTest', () => {
let scheduler: TestScheduler
beforeEach(() => {
scheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected)
})
})
it('should filter out odd numbers', () => {
scheduler.run((runhelpers: RunHelpers) => {...})
})
})
The author suggested that destructuring the interface implementation makes it a log easier to read. In this example we will define code and expectObservable.
/*
it('should filter out odd numbers', () => {
scheduler.run((runhelpers: RunHelpers) => {...})
})
*/
it('should filter out odd numbers', () => {
scheduler.run(({ cold, expectObservable }) => {
const values = { a: 1, b: 2, c: 3, d: 4, e: 5, f: 6 }
const source$ = cold('a-b-c-d-e-f|', values)
const expectedMarble = '--a---b---c|'
const expectedValues = { a: 20, b: 40, c: 60 }
const result$ = evenTimesTen(source$)
expectObservable(result$).toBe(expectedMarble, expectedValues)
})
})
Testing with an Error
I like this because I found my trusted divide by zero does not work with TS/Javascript. Much the same as above but this time we are testing for an error. Here is the function
const tenDivideByValue = (
source$: Observable<number>,
): Observable<number> | Observable<unknown> => {
return source$.pipe(
map((n) => {
const result = 10 / n
console.log(`Value we got was ${result}`)
if (!isFinite(result)) {
throw new Error('Divide by zero')
} else {
return result
}
}),
)
}
And here is the test case. The tricky part was understanding errorvalue meant an instance of Error.
it('should fail on divide by zero', () => {
const errorValue = new Error('Divide by zero')
scheduler.run(({ cold, animate, expectObservable }) => {
const values = { a: 5, b: 2, c: 10, d: 0 }
const source$ = cold('--a---b---c---d---|', values)
const expectedMarble = '--a---b---c---#'
animate(' ----r----r---r--')
const expectedValues = { a: 2, b: 5, c: 1 }
const result$ = tenDivideByValue(source$)
expectObservable(result$).toBe(expectedMarble, expectedValues, errorValue)
})
})
Testing with an catchError
Again a long struggle to get this working but here we go.
const tenDivideByValueCatchError = (
source$: Observable<number>,
): Observable<number> | Observable<unknown> => {
return source$.pipe(
map((n) => {
const result = 10 / n
console.log(`Value we got was ${result}`)
if (!isFinite(result)) {
throw new Error('Divide by zero')
} else {
return result
}
}),
catchError((error) => {
return of({})
}),
)
}
And here is the test case. The tricky part this time was understanding how to define on frame 4 emit a and complete.
it('should fail on divide by zero with catch', () => {
scheduler.run(({ cold, expectObservable }) => {
const values = { a: 5, b: 2, c: 10, d: 0 }
const source$ = cold('--a---b---c---d|', values)
const expectedMarble = '--a---b---c---(d|)'
const expectedValues = { a: 2, b: 5, c: 1, d: {} }
const result$ = tenDivideByValueCatchError(source$)
expectObservable(result$).toBe(expectedMarble, expectedValues)
})
})
Testing with an concat with cold
Struggled again to work off the examples on the net but getting through it has allowed me to understand the concepts.
const concatHot = (
source1$: Observable<number>,
source2$: Observable<number>,
): Observable<number> => {
return source1$.pipe(concat(source2$))
}
And the test case
it('should work with cold observables', () => {
scheduler.run(({ cold, expectObservable }) => {
const values1 = { a: 5, b: 2 }
const source1$ = cold('-a---b-|', values1)
const values2 = { c: 10, d: 0 }
const source2$ = cold('-c---d-|', values2)
const expectedMarbles = '-a---b--c---d-|'
const expectedValues = { a: 5, b: 2, c: 10, d: 0 }
const result$ = concatCold(source1$, source2$)
expectObservable(result$).toBe(expectedMarbles, expectedValues)
})
})
Testing with an concat with hot
I don't quite understand how we set the test up in real life. My guess it that the hot observable is mocking the timings of the release of the results.
const concatHot = (
source1$: Observable<number>,
source2$: Observable<number>,
): Observable<number> => {
return source1$.pipe(concat(source2$))
}
Here is the test case which does make sense.
/*
When testing hot observables you can specify the subscription
point using a caret ^, similar to how you specify subscriptions
when utilizing the expectSubscriptions assertion.
*/
it('should work with hot observables', () => {
scheduler.run(({ hot, expectObservable }) => {
const values1 = { a: 5, b: 2 }
const source1$ = hot('---a--^-b-|', values1)
const values2 = { c: 10, d: 0 }
const source2$ = hot('-----c^----d-|', values2)
const expectedMarbles = '--b--d-|'
const expectedValues = { b: 2, d: 0 }
const result$ = concatHot(source1$, source2$)
expectObservable(result$).toBe(expectedMarbles, expectedValues)
})
})
Marble Testing Anotation
You will see the source$ and expectedMarble have characters in them. This is a language that the library supports. Below is a list of some of these which can be found in detail at [1]
- '-' or '------': Equivalent to NEVER, or an observable that never emits or errors or completes.
- |: Equivalent to EMPTY, or an observable that never emits and completes immediately.
- #: Equivalent to throwError, or an observable that never emits and errors immediately.
- '--a--': An observable that waits 2 "frames", emits value a on frame 2 and then never completes.
- '--a--b--|': On frame 2 emit a, on frame 5 emit b, and on frame 8, complete.
- '--a--b--#': On frame 2 emit a, on frame 5 emit b, and on frame 8, error.
- '-a-^-b--|': In a hot observable, on frame -2 emit a, then on frame 2 emit b, and on frame 5, complete.
- '--(abc)-|': on frame 2 emit a, b, and c, then on frame 8, complete.
- '-----(a|)': on frame 5 emit a and complete.
- 'a 9ms b 9s c|': on frame 0 emit a, on frame 10 emit b, on frame 9,011 emit c, then on frame 9,012 complete.
- '--a 2.5m b': on frame 2 emit a, on frame 150,003 emit b and never complete.