Rx javascript


Introduction

Resources

Operators by Categories http://reactivex.io/documentation/operators.html#categorized

Packages

The course used the following tools

 "dependencies": {
    "rxjs": "^6.6.2"
  },
  "devDependencies": {
    "source-map-loader": "^1.1.0",
    "ts-loader": "^8.0.3",
    "typescript": "^4.0.2",
    "typings": "^2.1.1",
    "webpack": "^4.44.1",
    "webpack-cli": "^3.3.12",
    "webpack-dev-server": "^3.11.0"
  }

Typescript Configuration

I used the following tsconfig.json.

{
    "compilerOptions": {
        "target": "ES5",
        "module": "commonjs",
        "sourceMap": true
    }
}

Webpack Configuration

The Webpack configuration needs to specify:

  • the entry point
  • the output file, which is referenced from a script tag in index.html
  • the module loaders, each consisting of a test and the loader to use
  • how Webpack should resolve modules

module.exports = {
    entry: "./main",
    output: {filename: "app.js"},
    module: {
        rules: [
            {
                test: /\.js$/,
                enforce: 'pre',
                use: ['source-map-loader'],
            },            
            {
                test: /\.ts$/,
                use: "ts-loader",
                exclude: /node_modules/,
            }
        ]
    },
    resolve: {
        extensions: [
            ".ts",
            ".js"
        ],
    }
}

And the main.ts

alert("ok")

And the index.html

<html>
	<head>
		<title>RxJs</title>
	</head>
	<body>
		<div>hello</div>
		<script src="app.js"></script>
	</body>
</html>

Scripts Configuration

In the package.json we can now set our scripts for npm

"scripts": {
    "start": "webpack-dev-server --watch --inline",
    "postinstall": "typings install" 
}

Basic Example

From with Arrow Approach

import { from as ObservableFrom } from 'rxjs';

let numbers = [1,2,3,40]
let source = ObservableFrom(numbers)

// The three callbacks are, in order: next, error and complete
source.subscribe(
   value => console.log(`Value received was ${value}`),
   e => console.error(`We had an error ${e}`),
   () => console.log("Complete") 
);

From with Class Approach

With this approach your responsibility is to provide an object which implements next, error and complete. You do not need to implement all three, but all are shown here.

import { from as ObservableFrom } from 'rxjs';

let numbers = [1,2,3,40]
let source = ObservableFrom(numbers)

// An observer class; any of the three methods may be omitted
class MyObserver
{
    next(value) {
        console.log(`Value received was ${value}`)
    }

    error(e) {
        console.error(`We had an error ${e}`)
    }

    complete() {
        console.log("Complete")
    }
}

source.subscribe(new MyObserver())
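
Since not every handler is required, a partial observer can also be passed as a plain object. The following is a minimal sketch using the same numbers as above; the received array is not part of the original example and is only there to make the result visible.

```typescript
import { from } from 'rxjs';

// Collect values so the effect of the partial observer is visible.
const received: number[] = [];

// Only next is supplied; error and complete are simply omitted.
from([1, 2, 3, 40]).subscribe({
    next: value => received.push(value)
});

console.log(received);
```

Errors raised by the source are not handled by such an observer, so omitting error is only reasonable when the source cannot fail.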

Create

This is much the same thing but we provide the function which takes an observer.

import { Observable } from 'rxjs';

let numbers = [1,2,3,40]
let source = Observable.create(observer => {
    for(let n of numbers) {
        observer.next(n)
    }
    // Without this call the "Complete" callback below never fires
    observer.complete()
});

source.subscribe(
   value => console.log(`Value received was ${value}`),
   e => console.error(`We had an error ${e}`),
   () => console.log("Complete") 
);
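
Observable.create is marked as deprecated in later RxJS 6 releases in favour of the Observable constructor. As a sketch of the same idea with the constructor, assuming the same numbers array (the log array is only there for illustration):

```typescript
import { Observable } from 'rxjs';

const numbers = [1, 2, 3, 40];

// The function passed to the constructor runs once for every subscriber.
const source = new Observable<number>(subscriber => {
    for (const n of numbers) {
        subscriber.next(n);
    }
    // Signal that no more values will follow.
    subscriber.complete();
});

const log: string[] = [];
source.subscribe({
    next: value => log.push(`Value received was ${value}`),
    error: e => log.push(`We had an error ${e}`),
    complete: () => log.push('Complete')
});

console.log(log);
```

The constructor also gives a typed observable (Observable<number> here), whereas create returns an untyped result.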

Unsubscribing

The subscribe call returns a subscription. We can stop receiving events by calling its unsubscribe method.

let subscription = 
        load("./movies.json")
        .subscribe(
            renderMovies,
            e => console.log("error ", e),
            () => console.log("complete"))

subscription.unsubscribe()

From within an Angular component we can of course use ngOnDestroy to do this too.

ngOnDestroy() {
   if(this.eventbusSub) {
      this.eventbusSub.unsubscribe()
   }
}
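
What unsubscribe actually stops depends on the teardown logic the observable provides. The following is a sketch only, assuming a hypothetical ticker observable built on setInterval; the running flag is there purely to show when the teardown runs.

```typescript
import { Observable } from 'rxjs';

let running = false;

// Hypothetical ticker: emits a counter every 100 ms until torn down.
const ticker = new Observable<number>(subscriber => {
    let count = 0;
    running = true;
    const id = setInterval(() => subscriber.next(count++), 100);

    // The returned function is the teardown; it runs on unsubscribe.
    return () => {
        clearInterval(id);
        running = false;
    };
});

const subscription = ticker.subscribe(n => console.log(n));

// Stop listening; this triggers the teardown above and clears the timer.
subscription.unsubscribe();

console.log(running, subscription.closed);
```

Without the teardown the timer would keep firing after unsubscribe, even though nothing is listening any more.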

Operators

RxJs provides many operators which can be chained together to manipulate the data the observer will see. In RxJS 6 the operators are passed to pipe rather than called as methods on the observable. Below is a simple example of this using map and filter.

let source = Observable.create(observer => {
    for(let n of numbers) {
        observer.next(n)
    }
}).pipe(
    map((n: number) => n * 2),
    filter((n: number) => n > 4));
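
To make the effect of the chain concrete, here is a small sketch that collects the output for the numbers used earlier. It uses from and pipe from RxJS 6; the results array is only there for illustration.

```typescript
import { from } from 'rxjs';
import { map, filter } from 'rxjs/operators';

const results: number[] = [];

from([1, 2, 3, 40]).pipe(
    map(n => n * 2),      // 2, 4, 6, 80
    filter(n => n > 4)    // 6, 80
).subscribe(n => results.push(n));

console.log(results); // [6, 80]
```

Each operator returns a new observable, so the order matters: filtering before mapping would let only 40 through and produce 80.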

Importing

The size of the bundle can be considerable if the whole library is pulled in. Tree shaking tools help at build time, but we can also help by importing only what we use. With RxJS 6, this is the recommended way for the example above using map and filter.

import { Observable } from "rxjs";
import { map, filter } from "rxjs/operators";

More Examples

Mouse Events (or Any)

You can capture events using the same techniques as above. For the mouse, use the fromEvent creation function. In this example I have mapped each event to just its x,y co-ordinates before it reaches the subscriber, and delayed it by 300 ms.

import { fromEvent } from 'rxjs';
import { map,delay } from 'rxjs/operators';

let circle = document.getElementById("circle")

let source = fromEvent(document, "mousemove").pipe(
    map(
        (e: MouseEvent) => {
        return {
            x: e.clientX,
            y: e.clientY
        }
    }),
    delay(300))

function onNext(value) {
    // CSS lengths need a unit, so append px
    circle.style.left = `${value.x}px`
    circle.style.top = `${value.y}px`
}

source.subscribe(
    onNext,
    e => console.error(`We had an error ${e}`),
    () => console.log("Complete") 
 );

And the modified html

<html>
	<head>
		<title>RxJs</title>
		<style>
			#circle {
				width: 20px;
				height: 20px;
				border-radius: 50%;
				background-color: red;
				position: absolute;
			}
		</style>
	</head>
	<body>
		<div id="circle"></div>
		<script src="app.js"></script>
	</body>
</html>

Working with Remote Data

Traditional Approach

Here we have one approach to retrieving data from the server.

import { fromEvent } from 'rxjs';

let output = document.getElementById("output")
let button = document.getElementById("button")

let click = fromEvent(button, "click")

function load(url : string) {

    let xhr = new XMLHttpRequest();

    console.log("doing load")

    console.log("URL is ", url)

    xhr.addEventListener("load", () => {
        let movies = JSON.parse(xhr.responseText)
        movies.forEach(element => {
            let div = document.createElement("div")
            div.innerText = element.title
            output.appendChild(div)
        });
    })
    xhr.open("GET",url)
    xhr.send();
}

click.subscribe(
    e => load("./movies.json"),
    e => console.error(`We had an error ${e}`),
    () => console.log("Complete") 
 );

Issues

Looking at the code, one of the main issues is that the processing of the data is embedded in the code that receives it.

        let movies = JSON.parse(xhr.responseText)
        movies.forEach(element => {
            let div = document.createElement("div")
            div.innerText = element.title
            output.appendChild(div)
        });

Solution

We can separate the concerns by moving the rendering of the movies into its own function. RxJs appears to have been reworked since the course was recorded: the course used flatMap, for which mergeMap is used here, and the imports are different.

The imports

import { Observable, fromEvent } from "rxjs";
import { mergeMap} from "rxjs/operators";

The rendering

function renderMovies(movies : { title: string }[])
{
    movies.forEach(element => {
        let div = document.createElement("div")
        div.innerText = element.title
        output.appendChild(div)
    });
}

Change load to return an observable instead of rendering directly. Once the data is received, call next on the observer, and because a request only produces one response we can call complete straight afterwards.

function load(url: string) {
  return Observable.create((observer) => {
    let xhr = new XMLHttpRequest();

    xhr.addEventListener("load", () => {
      let data = JSON.parse(xhr.responseText);
      observer.next(data)
      observer.complete()
    });

    xhr.open("GET", url);
    xhr.send();
  });
}

Now we need to subscribe to the load on the button.

click.pipe(
    mergeMap(e => load("./movies.json")))
        .subscribe(
            renderMovies,
            e => console.log("error ", e),
            () => console.log("complete"))

Retrying

With the RxJs library it is possible to implement sophisticated logic for retrying. One of the operators you can attach is retryWhen, which takes a function that receives the stream of errors and returns an observable that decides when to retry.

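import { retryWhen, scan, delay, takeWhile } from "rxjs/operators";

function retryStrategy({attempts = 2, delayInMilliseconds = 1000}) {
    // errors is the stream of errors raised by the source
    return function(errors) {
        return errors.pipe(
            // count the errors seen so far
            scan((acc: number, value) => {
                console.log(acc, value)
                return acc + 1
            }, 0),
            // wait before each retry
            delay(delayInMilliseconds),
            // stop retrying once the count reaches attempts
            takeWhile((acc: number) => acc < attempts))
    }
}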

function load(url: string) {
    return Observable.create((observer) => {
        let xhr = new XMLHttpRequest()
        xhr.addEventListener("load", () => {
            if(xhr.status === 200) {
                let data = JSON.parse(xhr.responseText);
                observer.next(data)
                observer.complete()
            } else {
                observer.error(xhr.statusText)
            }
        })
  
        xhr.open("GET", url);
        xhr.send();
    }).pipe(
        retryWhen(retryStrategy({attempts: 4, delayInMilliseconds:1000}))
    )
}
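
To see how many retries the strategy allows, the scan and takeWhile part can be run on its own against a stream of stand-in errors. This is a sketch only: fakeErrors and allowed are hypothetical names, and the delay is left out so that the result is available synchronously.

```typescript
import { of } from 'rxjs';
import { scan, takeWhile } from 'rxjs/operators';

const attempts = 4;
const allowed: number[] = [];

// Stand-in for the errors stream that retryWhen passes to the strategy.
const fakeErrors = of('e1', 'e2', 'e3', 'e4', 'e5');

fakeErrors.pipe(
    // Running count of errors: 1, 2, 3, ...
    scan((acc: number, _err: string) => acc + 1, 0),
    // Completes as soon as the count reaches attempts.
    takeWhile(acc => acc < attempts)
).subscribe(count => allowed.push(count));

console.log(allowed); // [1, 2, 3]
```

So attempts: 4 means one initial request plus three retries. According to the RxJS documentation, when the observable returned by the strategy completes, retryWhen completes the result as well, so once the retries run out the subscriber sees "complete" rather than the last error.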

Working with Promises

Easily done with from.

import { from } from 'rxjs';
...

function loadWithFetch(url: string) {
    return from(fetch(url).then(r => r.json()))
}

click.pipe(mergeMap(e => loadWithFetch("./movies.json")))
        .subscribe(
            renderMovies,
            e => console.log("error ", e),
            () => console.log("complete"))

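Note that the fetch is started as soon as loadWithFetch is called, even if nothing ever subscribes. We can make it lazy by wrapping the call in defer, which is also imported from rxjs.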

function loadWithFetch(url: string) {
    return defer(() => {
        return from(fetch(url).then(r => r.json()))
    })
}
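
The difference between the two versions can be shown without a network call. The following is a sketch that assumes a hypothetical fakeFetch standing in for fetch; it simply counts how often it is invoked.

```typescript
import { defer, from } from 'rxjs';

let calls = 0;

// Hypothetical stand-in for fetch: counts how often it is invoked.
function fakeFetch(url: string): Promise<string> {
    calls++;
    return Promise.resolve(`data from ${url}`);
}

// Eager: fakeFetch runs right here, before anyone subscribes.
const eager = from(fakeFetch('./movies.json'));
const callsAfterEager = calls;       // 1

// Lazy: the factory is stored, fakeFetch has not run again yet.
const lazy = defer(() => from(fakeFetch('./movies.json')));
const callsAfterDefer = calls;       // still 1

// The factory runs once per subscription.
lazy.subscribe(data => console.log(data));
const callsAfterSubscribe = calls;   // 2
```

Because the factory runs once per subscription, resubscribing issues a fresh request, which is what retrying needs; a bare from(promise) would only replay the result of the one promise that was already created.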

Now let's add some retrying, as with the XHR request.

function loadWithFetch(url: string) {
    return defer(() => {
        return from(fetch(url)
        .then(
            r => {
                if(r.status === 200) {
                    // return the parsed body so it reaches the subscriber
                    return r.json()
                } else {
                    return Promise.reject(r)
                }
            }))
    }).pipe(
        retryWhen(retryStrategy({attempts: 4, delayInMilliseconds:1000}))
    )
}