Rx javascript: Difference between revisions
(12 intermediate revisions by the same user not shown) | |||
Line 1: | Line 1: | ||
=Introduction= | =Introduction= | ||
==Resources== | |||
Operators by Categories http://reactivex.io/documentation/operators.html#categorized | |||
==Packages== | ==Packages== | ||
The course used the following tools | The course used the following tools | ||
Line 86: | Line 89: | ||
} | } | ||
</syntaxhighlight> | </syntaxhighlight> | ||
=Basic Example= | =Basic Example= | ||
==From with Arrow Approach== | ==From with Arrow Approach== | ||
Line 146: | Line 150: | ||
</syntaxhighlight> | </syntaxhighlight> | ||
==Unsubscribing== | |||
We can easily unsubscribe and stop events by calling the unsubscribe function | |||
<syntaxhighlight lang="ts"> | |||
let subscription = | |||
load("./movies.json") | |||
.subscribe( | |||
renderMovies, | |||
e => console.log("error ", e), | |||
() => console.log("complete")) | |||
subscription.unsubscribe() | |||
</syntaxhighlight> | |||
From within the component we can of course use ngOnDestroy to do this too. | |||
<syntaxhighlight lang="ts"> | |||
ngOnDestroy() { | |||
if(this.eventbusSub) { | |||
this.eventbusSub.unsubscribe() | |||
} | |||
} | |||
</syntaxhighlight> | |||
==Operators== | ==Operators== | ||
RxJs provides many operators which can be chained together to manipulate the data the observer will see. Below is a simple example of this using map and filter. | RxJs provides many operators which can be chained together to manipulate the data the observer will see. Below is a simple example of this using map and filter. | ||
Line 215: | Line 240: | ||
</body> | </body> | ||
</html> | </html> | ||
</syntaxhighlight> | |||
==Working with Remote Data== | |||
===Traditional Approach=== | |||
Here we have one approach to retrieving data from the server. | |||
<syntaxhighlight lang="ts"> | |||
import { fromEvent } from 'rxjs'; | |||
import { map,delay } from 'rxjs/operators'; | |||
let output = document.getElementById("output") | |||
let button = document.getElementById("button") | |||
let click = fromEvent(button, "click").pipe() | |||
function load(url : string) { | |||
let xhr = new XMLHttpRequest(); | |||
console.log("doing load") | |||
console.log("URL is ", url) | |||
xhr.addEventListener("load", () => { | |||
let movies = JSON.parse(xhr.responseText) | |||
movies.forEach(element => { | |||
let div = document.createElement("div") | |||
div.innerText = element.title | |||
output.appendChild(div) | |||
}); | |||
}) | |||
xhr.open("GET",url) | |||
xhr.send(); | |||
} | |||
click.subscribe( | |||
e => load("./movies.json"), | |||
e => console.error(`We had an error ${e}`), | |||
() => console.log("Complete") | |||
); | |||
</syntaxhighlight> | |||
===Issues=== | |||
Looking at the code on of the main issues is the processing of the code is embedded in the receiving of the code. | |||
<syntaxhighlight lang="ts"> | |||
let movies = JSON.parse(xhr.responseText) | |||
movies.forEach(element => { | |||
let div = document.createElement("div") | |||
div.innerText = element.title | |||
output.appendChild(div) | |||
}); | |||
</syntaxhighlight> | |||
===Solution=== | |||
We can separate the concerns to render the movies into a separate function. I think RxJs was in the middle of rework as flatmap no longer existed and the imports were different | |||
The imports | |||
<syntaxhighlight lang="ts"> | |||
import { Observable, fromEvent } from "rxjs"; | |||
import { mergeMap} from "rxjs/operators"; | |||
</syntaxhighlight> | |||
The rendering | |||
<syntaxhighlight lang="ts"> | |||
function renderMovies(movies : string[]) | |||
{ | |||
movies.forEach(element => { | |||
let div = document.createElement("div") | |||
div.innerText = element.title | |||
output.appendChild(div) | |||
}); | |||
} | |||
</syntaxhighlight> | |||
Change the receive to use an observer. Once the data is received, call next, and because we only have one receive we can call complete too. | |||
<syntaxhighlight lang="ts"> | |||
function load(url: string) { | |||
return Observable.create((observer) => { | |||
let xhr = new XMLHttpRequest(); | |||
xhr.addEventListener("load", () => { | |||
let data = JSON.parse(xhr.responseText); | |||
observer.next(data) | |||
observer.complete() | |||
}); | |||
xhr.open("GET", url); | |||
xhr.send(); | |||
}); | |||
} | |||
</syntaxhighlight> | |||
Now we need to subscribe to the load on the button. | |||
<syntaxhighlight lang="ts"> | |||
click.pipe( | |||
mergeMap(e => load("./movies.json"))) | |||
.subscribe( | |||
renderMovies, | |||
e => console.log("error ", e), | |||
() => console.log("complete")) | |||
</syntaxhighlight> | |||
===Retrying=== | |||
With the rxJs library it is possible to implement sophisticated logic for retrying. One of the operators you can attach is retryWhen which lets you specify a function to call. | |||
<syntaxhighlight lang="ts"> | |||
function retryStrategy({attempts = 2, delayInMilliseconds = 1000}) { | |||
return function(errors) { | |||
return errors.pipe( | |||
scan((acc: number, value: void) => { | |||
console.log(acc, value) | |||
return acc + 1 | |||
},0), | |||
delay(delayInMilliseconds), | |||
takeWhile( acc => acc < attempts)) | |||
} | |||
} | |||
function load(url: string) { | |||
return Observable.create((observer) => { | |||
let xhr = new XMLHttpRequest() | |||
xhr.addEventListener("load", () => { | |||
if(xhr.status === 200) { | |||
let data = JSON.parse(xhr.responseText); | |||
observer.next(data) | |||
observer.complete() | |||
} else { | |||
observer.error(xhr.statusText) | |||
} | |||
}) | |||
xhr.open("GET", url); | |||
xhr.send(); | |||
}).pipe( | |||
retryWhen(retryStrategy({attempts: 4, delayInMilliseconds:1000})) | |||
) | |||
} | |||
</syntaxhighlight> | |||
===Working with Promises=== | |||
Easily done with from. | |||
<syntaxhighlight lang="ts"> | |||
import { from } from 'rxjs'; | |||
... | |||
function loadWithFetch(url: string) { | |||
return from(fetch(url).then(r => r.json())) | |||
} | |||
click.pipe(mergeMap(e => loadWithFetch("./movies.json"))) | |||
.subscribe( | |||
renderMovies, | |||
e => console.log("error ", e), | |||
() => console.log("complete")) | |||
</syntaxhighlight> | |||
Note the promise is executed but we can stop this by wrapping the call in a defer. | |||
<syntaxhighlight lang="ts"> | |||
function loadWithFetch(url: string) { | |||
return defer(() => { | |||
return from(fetch(url).then(r => r.json())) | |||
}) | |||
} | |||
</syntaxhighlight> | |||
One lets add some retrying as with the XHR request. | |||
<syntaxhighlight lang="ts"> | |||
function loadWithFetch(url: string) { | |||
return defer(() => { | |||
return from(fetch(url) | |||
.then( | |||
r => { | |||
if(r.status === 200) { | |||
r.json() | |||
} else { | |||
return Promise.reject(r) | |||
} | |||
})) | |||
}).pipe( | |||
retryWhen(retryStrategy({attempts: 4, delayInMilliseconds:1000})) | |||
) | |||
} | |||
</syntaxhighlight> | </syntaxhighlight> |
Latest revision as of 03:23, 4 September 2020
Introduction
Resources
Operators by Categories http://reactivex.io/documentation/operators.html#categorized
Packages
The course used the following tools
"dependencies": {
"rxjs": "^6.6.2"
},
"devDependencies": {
"source-map-loader": "^1.1.0",
"ts-loader": "^8.0.3",
"typescript": "^4.0.2",
"typings": "^2.1.1",
"webpack": "^4.44.1",
"webpack-cli": "^3.3.12",
"webpack-dev-server": "^3.11.0"
}
Typescript Configuration
I used the following tsconfig.json.
{
"compilerOptions": {
"target": "ES5",
"module": "commonjs",
"sourceMap": true
}
}
Webpack Configuration
Specify
- the entry point
- the output file
- add script tag to index.html
- Provide module loaders, which consists of a test and which loader to use
- Provide how Webpack should resolve modules
module.exports = {
entry: "./main",
output: {filename: "app.js"},
module: {
rules: [
{
test: /\.js$/,
enforce: 'pre',
use: ['source-map-loader'],
},
{
test: /\.ts$/,
use: "ts-loader",
exclude: "/node_modules",
}
]
},
resolve: {
extensions: [
".ts",
".js"
],
}
}
And the main.ts
alert("ok")
And in the Index.html
<html>
<head>
<title>RxJs</title>
</head>
<body>
<div>hello</div>
<script src="app.js"></script>
</body>
</html>
Scripts Configuration
In the package.json we can not set our scripts for npm
"scripts": {
"start": "webpack-dev-server --watch --inline",
"postinstall": "typings install"
}
Basic Example
From with Arrow Approach
import { from as ObservableFrom } from 'rxjs';
let numbers = [1,2,3,40]
let values = ObservableFrom(numbers)
source.subscribe(
value => console.log(`"Value received was ${value}`),
e => console.error(`We had an error ${e}`),
() => console.log("Complete")
);
From with Class Approach
Quite liked the explanation Anyway your responsibility is to provide a class which implements next, error and complete. You do not need to implement all but it is shown here
import { from as ObservableFrom } from 'rxjs';
let numbers = [1,2,3,40]
let source = ObservableFrom(numbers)
class MyObserverable
{
next(value) {
console.log(`"Value received was ${value}`)
}
error(e) {
console.error(`We had an error ${e}`)
}
complete() {
console.log("Complete")
}
}
source.subscribe(new MyObserverable)
Create
This is much the same thing but we provide the function which takes an observer.
import { from as ObservableFrom } from 'rxjs';
let numbers = [1,2,3,40]
let source = Observable.create(observer => {
for(let n of numbers) {
observer.next(n)
}
});
source.subscribe(
value => console.log(`"Value received was ${value}`),
e => console.error(`We had an error ${e}`),
() => console.log("Complete")
);
Unsubscribing
We can easily unsubscribe and stop events by calling the unsubscribe function
let subscription =
load("./movies.json")
.subscribe(
renderMovies,
e => console.log("error ", e),
() => console.log("complete"))
subscription.unsubscribe()
From within the component we can of course use ngOnDestroy to do this too.
ngOnDestroy() {
if(this.eventbusSub) {
this.eventbusSub.unsubscribe()
}
}
Operators
RxJs provides many operators which can be chained together to manipulate the data the observer will see. Below is a simple example of this using map and filter.
let source = Observable.create(observer => {
for(let n of numbers) {
observer.next(n)
}
})
.map(n => n * 2)
.filter(n => n > 4);
Importing
Size of the downloads can be considerable if left to the default. Although there are now tree shaking tools at build time we can help. Here is the recommended way for the example above using map and filter.
import {Observable} from "rxjs/Observable"
import { map,filter } from "rxjs/operators";
More Examples
Mouse Events (or Any)
You can capture events using the same technics as above. For the mouse use the fromEvent on the Observer. In this example I have filtered the event to just have the x,y co-ordinates sent to the source.
import { fromEvent } from 'rxjs';
import { map,delay } from 'rxjs/operators';
let circle = document.getElementById("circle")
let source = fromEvent(document, "mousemove").pipe(
map(
(e: MouseEvent) => {
return {
x: e.clientX,
y: e.clientY
}
}),
delay(300))
function onNext(value) {
circle.style.left = value.x
circle.style.top = value.y
}
source.subscribe(
onNext,
e => console.error(`We had an error ${e}`),
() => console.log("Complete")
);
And the modified html
<html>
<head>
<title>RxJs</title>
<style>
#circle {
width: 20px;
height: 20px;
border-radius: 50%;
background-color: red;
position: absolute;
}
</style>
</head>
<body>
<div id="circle"></div>
<script src="app.js"></script>
</body>
</html>
Working with Remote Data
Traditional Approach
Here we have one approach to retrieving data from the server.
import { fromEvent } from 'rxjs';
import { map,delay } from 'rxjs/operators';
let output = document.getElementById("output")
let button = document.getElementById("button")
let click = fromEvent(button, "click").pipe()
function load(url : string) {
let xhr = new XMLHttpRequest();
console.log("doing load")
console.log("URL is ", url)
xhr.addEventListener("load", () => {
let movies = JSON.parse(xhr.responseText)
movies.forEach(element => {
let div = document.createElement("div")
div.innerText = element.title
output.appendChild(div)
});
})
xhr.open("GET",url)
xhr.send();
}
click.subscribe(
e => load("./movies.json"),
e => console.error(`We had an error ${e}`),
() => console.log("Complete")
);
Issues
Looking at the code on of the main issues is the processing of the code is embedded in the receiving of the code.
let movies = JSON.parse(xhr.responseText)
movies.forEach(element => {
let div = document.createElement("div")
div.innerText = element.title
output.appendChild(div)
});
Solution
We can separate the concerns to render the movies into a separate function. I think RxJs was in the middle of rework as flatmap no longer existed and the imports were different
The imports
import { Observable, fromEvent } from "rxjs";
import { mergeMap} from "rxjs/operators";
The rendering
function renderMovies(movies : string[])
{
movies.forEach(element => {
let div = document.createElement("div")
div.innerText = element.title
output.appendChild(div)
});
}
Change the receive to use an observer. Once the data is received, call next, and because we only have one receive we can call complete too.
function load(url: string) {
return Observable.create((observer) => {
let xhr = new XMLHttpRequest();
xhr.addEventListener("load", () => {
let data = JSON.parse(xhr.responseText);
observer.next(data)
observer.complete()
});
xhr.open("GET", url);
xhr.send();
});
}
Now we need to subscribe to the load on the button.
click.pipe(
mergeMap(e => load("./movies.json")))
.subscribe(
renderMovies,
e => console.log("error ", e),
() => console.log("complete"))
Retrying
With the rxJs library it is possible to implement sophisticated logic for retrying. One of the operators you can attach is retryWhen which lets you specify a function to call.
function retryStrategy({attempts = 2, delayInMilliseconds = 1000}) {
return function(errors) {
return errors.pipe(
scan((acc: number, value: void) => {
console.log(acc, value)
return acc + 1
},0),
delay(delayInMilliseconds),
takeWhile( acc => acc < attempts))
}
}
function load(url: string) {
return Observable.create((observer) => {
let xhr = new XMLHttpRequest()
xhr.addEventListener("load", () => {
if(xhr.status === 200) {
let data = JSON.parse(xhr.responseText);
observer.next(data)
observer.complete()
} else {
observer.error(xhr.statusText)
}
})
xhr.open("GET", url);
xhr.send();
}).pipe(
retryWhen(retryStrategy({attempts: 4, delayInMilliseconds:1000}))
)
}
Working with Promises
Easily done with from.
import { from } from 'rxjs';
...
function loadWithFetch(url: string) {
return from(fetch(url).then(r => r.json()))
}
click.pipe(mergeMap(e => loadWithFetch("./movies.json")))
.subscribe(
renderMovies,
e => console.log("error ", e),
() => console.log("complete"))
Note the promise is executed but we can stop this by wrapping the call in a defer.
function loadWithFetch(url: string) {
return defer(() => {
return from(fetch(url).then(r => r.json()))
})
}
One lets add some retrying as with the XHR request.
function loadWithFetch(url: string) {
return defer(() => {
return from(fetch(url)
.then(
r => {
if(r.status === 200) {
r.json()
} else {
return Promise.reject(r)
}
}))
}).pipe(
retryWhen(retryStrategy({attempts: 4, delayInMilliseconds:1000}))
)
}