123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152 |
- // Test Observable //
- import mongoose, { Model } from "mongoose";
- import { Subject, from, map, of, interval, buffer, asyncScheduler, observeOn, takeUntil, delay, queueScheduler, bufferWhen } from "rxjs";
- const used = process.memoryUsage();
- let MongooseConnection: mongoose.Connection
- let connectionStatus = 0
- let mongoStorage: any = {
- type: `MongoDB`,
- url: `mongodb://192.168.100.59:27017/fromEnzo`
- }
- let testSubject: Subject<any> = new Subject()
- let count = 0
- // Connect to designated storage destination
- async function connectMongo(storage: Storage) {
- return new Promise((resolve, reject) => {
- try {
- console.log(`Connecting to ${storage.url}`)
- MongooseConnection = mongoose.createConnection(storage.url)
- connectionStatus = 1
- resolve(connectionStatus)
- }
- catch (error) {
- connectionStatus = 0
- console.error('An error occurred while connecting to the database:', error);
- setTimeout(() => {
- connectMongo(storage).then(() => {
- resolve(connectionStatus)
- })
- console.log(`Reconnecting...`)
- }, 3000);
- }
- })
- }
- // Acquire data from Mongo
- function streamMongoData(storage: Storage, subjectStream: Subject<any>) {
- connectMongo(storage).then(() => {
- let message: Model<any> = MongooseConnection.model('Message', require('../types/message.schema'))
- let stream = message.find().limit(10).cursor()
- stream.on('data', (data: any) => subjectStream.next(data));
- stream.on('error', (error) => subjectStream.error(error));
- stream.on('end', () => subjectStream.complete());
- })
- }
- /* --------------- Understanding the Concepts && Behaviour --------------- */
- // Callbacks to be used to emulate high traffic observables subscription to observe it's behaviour.
- // What happens if the data received is used to call other functions that may take a while to finish, observe what happens to the stream and event stack.
- async function handler1(element): Promise<any> {
- return new Promise((resolve, reject) => {
- const min = 1;
- const max = 5;
- const randomInt = Math.floor(Math.random() * (max - min + 1)) + min;
- setTimeout(() => {
- console.log(`Handler 1: Processing data ${element} for ${randomInt} seconds.`)
- resolve(element)
- }, randomInt * 1000)
- })
- }
- function handler2(element: any) {
- const min = 1;
- const max = 5;
- const randomInt = Math.floor(Math.random() * (max - min + 1)) + min;
- setTimeout(() => {
- console.log(`Handler 2: Processing data ${element} for ${randomInt} seconds.`)
- handlers(element)
- }, randomInt * 1000)
- }
- function handlers(element: any) {
- const min = 1;
- const max = 5;
- const randomInt = Math.floor(Math.random() * (max - min + 1)) + min;
- setTimeout(() => {
- console.log(`Callback Handlers: Processing data ${element} for ${randomInt} seconds.`)
- }, randomInt * 1000)
- }
- function printLog() {
- const t0 = performance.now()
- let i
- for (i = 0; i <= 6000000000; i++) {
- }
- const t1 = performance.now()
- const timeTakenInSeconds = (t1 - t0) / 1000;
- console.log(`Time taken: ${timeTakenInSeconds} seconds to run this printLog()`);
- }
- /* Explanation: So the producer will emit 1 data very 1 second indefinitely. For the consumer, when they subscribes to the producer, the data will be consumed.
- So when the producer emits data, the next method of the consumer is called, and the following tasks are registered into the call stack or event queue:
- 1.The console.log() statement logs the received data to the console. This task is synchronous and is executed immediately.
- 2.The handler1() function is called with the received data as an argument. This task is asynchronous and is added to the event queue.
- 3.The then() method of the Promise returned by handler1() is called with a callback function as an argument. This task is also asynchronous and is added to the event queue.
- 4.The printLog() function is called. This task is synchronous and is executed immediately.
- After all synchronous tasks in the call stack are completed, the asynchronous tasks in the event queue are executed one by one, starting with the handler1() function call
- and followed by the then() callback function call.*/
- let publishDataEverySecond = interval(1000)
- let control = interval(5000)
- function understandingOBS() {
- publishDataEverySecond.subscribe({
- next: element => {
- console.log(`Data received: ${element}`)
- handler1(element).then((data) => { // asynchronous
- handler2(data) // setTimeout will put the call into the call stack
- })
- printLog() // synchronous: this must complete before the next data to be received
- }
- })
- }
- /* Buffer */
- let bufferring = publishDataEverySecond.pipe(buffer(control)) // standard buffer
- let buffered = publishDataEverySecond.pipe( // using buffer when
- bufferWhen(() => interval(1000 + Math.random() * 4000))
- );
- function bufferOBS() {
- buffered.subscribe({
- next(element) {
- console.log(`Data received: ${element}`)
- handler1(element).then((data) => {
- handler2(data)
- })
- }
- })
- }
- /* Scheduler */
- let scheduler = publishDataEverySecond.pipe(observeOn(asyncScheduler)) //async scheduler
- let source$ = interval(1000, queueScheduler); // queue scheduler
- let result$ = source$.pipe(takeUntil(control))
- function scheduleOBS() {
- result$.subscribe({
- next: element => {
- console.log(`Scheduler: ${element}`)
- }
- })
- }
- understandingOBS()
|