123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- // Test Observable //
- import mongoose, { Model } from "mongoose";
- import { Subject, from, map, of, interval } from "rxjs";
// Memory-usage snapshot taken once at module load; not read by the visible code.
const used = process.memoryUsage();

// Connection state shared by connectMongo() and streamMongoData().
let MongooseConnection: mongoose.Connection;
// connectMongo() sets this to 1 after a connection attempt succeeds and to 0 after one fails.
let connectionStatus = 0;

// Descriptor of the storage to read from; connectMongo() only reads `url`.
let mongoStorage: any = {
  type: 'MongoDB',
  url: 'mongodb://192.168.100.59:27017/fromEnzo',
};

// Subject intended to carry the documents pushed by streamMongoData().
let testSubject: Subject<any> = new Subject();

// Scratch state for the experiments below (`count` is only touched by commented-out code).
let count = 0;
let data: any[] = [];
- // Connect to designated storage destination
/**
 * Opens a Mongoose connection to the given storage and waits until it is usable.
 *
 * On success the module-level `MongooseConnection` holds the open connection and
 * `connectionStatus` is set to 1. On failure `connectionStatus` is reset to 0,
 * the error is logged, and a new attempt is made after a fixed delay. Retries
 * continue indefinitely, so the returned promise never rejects.
 *
 * @param storage - Storage descriptor; only its `url` is read here.
 * @returns The value of `connectionStatus` (1) once a connection is established.
 */
async function connectMongo(storage: Storage): Promise<number> {
  const retryDelayMs = 3000;

  for (;;) {
    try {
      console.log(`Connecting to ${storage.url}`);
      MongooseConnection = mongoose.createConnection(storage.url);
      // createConnection() returns immediately and reports connection failures
      // asynchronously, so the handshake has to be awaited for the catch block
      // (and therefore the retry) to ever be reached.
      await MongooseConnection.asPromise();
      connectionStatus = 1;
      return connectionStatus;
    } catch (error: unknown) {
      connectionStatus = 0;
      console.error('An error occurred while connecting to the database:', error);
      console.log(`Reconnecting...`);
      // Pause before the next attempt so an unreachable server is not hammered.
      await new Promise<void>((resolve) => setTimeout(resolve, retryDelayMs));
    }
  }
}
- // Acquire data from Mongo
/**
 * Connects to the storage and pushes `Message` documents into the given subject.
 *
 * The query is limited to 10 documents and is read through a cursor: every
 * document is forwarded with `next`, a cursor error with `error`, and the end
 * of the cursor with `complete`.
 *
 * @param storage - Storage descriptor handed to `connectMongo`.
 * @param subjectStream - Subject that receives the documents and the terminal notification.
 */
function streamMongoData(storage: Storage, subjectStream: Subject<any>): void {
  connectMongo(storage)
    .then(() => {
      const messageModel: Model<any> = MongooseConnection.model('Message', require('../types/message.schema'));
      const cursor = messageModel.find().limit(10).cursor();
      cursor.on('data', (doc: any) => subjectStream.next(doc));
      cursor.on('error', (error) => subjectStream.error(error));
      cursor.on('end', () => subjectStream.complete());
    })
    // Anything thrown while building the model or cursor would otherwise be an
    // unhandled rejection that subscribers never hear about.
    .catch((error: unknown) => subjectStream.error(error));
}
- /* --------------- TEST --------------- */
- // testSubject.subscribe({
- // next: (e) => {
- // setTimeout(() => {
- // count++
- // console.log(count + '. ' + e.appData.msgId) // No problem streaming all the elements from mongo streaming (2.9 milliion)
- // }, 10000) // But it doesn't wait 10 seconds
- // }
- // });
/**
 * Simulates a fire-and-forget task: picks a random whole number of seconds
 * between 1 and 10 (inclusive) and logs a message once that time has elapsed.
 * Returns immediately; nothing is reported back to the caller.
 *
 * @param element - Value to mention in the log message.
 */
function doThis(element: any) {
  const MIN_SECONDS = 1;
  const MAX_SECONDS = 10;
  const span = MAX_SECONDS - MIN_SECONDS + 1;
  const delaySeconds = MIN_SECONDS + Math.floor(Math.random() * span);

  const report = () => {
    // Variant for Mongo documents:
    // console.log(`Doing random task for 3 seconds for ${element.appData.msgId}`)
    console.log(`Doing random task for ${delaySeconds} seconds for "${element}"`);
  };

  setTimeout(report, delaySeconds * 1000);
}
/**
 * Simulates an asynchronous processing step: waits a random whole number of
 * seconds between 1 and 10 (inclusive), logs a message, and then hands the
 * element back unchanged.
 *
 * @param element - Value to "process"; it is mentioned in the log message.
 * @returns A promise that resolves with `element` once the delay has elapsed.
 */
function doThat<T>(element: T): Promise<T> {
  const minSeconds = 1;
  const maxSeconds = 10;
  const delaySeconds = Math.floor(Math.random() * (maxSeconds - minSeconds + 1)) + minSeconds;

  // The Promise constructor is needed to wrap the callback-style timer; the
  // function itself does not have to be `async` on top of that.
  return new Promise<T>((resolve) => {
    setTimeout(() => {
      // Variant for Mongo documents:
      // console.log(`Processing ${element.appData.msgId} for 3 seconds`)
      console.log(`Processing "${element}" for ${delaySeconds} seconds`);
      resolve(element);
    }, delaySeconds * 1000);
  });
}
- /* -------------- ACTION ------------------- */
- // streamMongoData(mongoStorage, testSubject)
- // testSubject.subscribe({
- // next(element) {
- // doThat(element).then((data) => {
- // doThis(data)
- // })
- // },
- // error(err) {
- // console.error('something wrong occurred: ' + err);
- // },
- // complete() {
- // console.log('done');
- // },
- // })
- /* --------- Just TESTING and understanding behaviour ---------- */
/* Explanation: The producer emits at a constant rate of one value per second. Emission continues
without interruption, while the callbacks for doThis() and doThat() are registered and invoked for each value.
If the callback is asynchronous, it is registered on the event loop to be executed, and it does not have to wait for the previous
tasks to complete before it can start its own tasks.
If the callback is synchronous, it is likewise registered on the event loop to be executed, and it does not wait for the previous
tasks to complete before it can start its own tasks. */
// Producer: an interval observable that emits once every 1000 ms.
let waitForOneSecond = interval(1000);

waitForOneSecond.subscribe((tick) => {
  console.log(`Broadcasting: ${tick}`);

  // Promise-based variant: run doThis only after doThat has resolved.
  // doThat(tick).then((result) => {
  //   doThis(result)
  // })

  // Timer-only variant: schedule doThis directly for every emitted value.
  doThis(tick);
});
/* Ignore this for now; I just wanted to see how it behaves if the observable is created from arrays. The
values were published too fast for me to derive any meaningful understanding from the data. Please refer to the
case study above. */
- // let obsArray = from([1, 2, 3, 4, 5])
- // obsArray.subscribe({
- // next(element) {
- // console.log(`Emmitting: ${element}`)
- // doThat(element).then((data) => {
- // doThis(data)
- // })
- // }
- // })
- // of(1, 2, 3, 4, 5).subscribe({
- // next(element) {
- // console.log(`Streaming: ${element}`)
- // doThat(element).then((data) => {
- // doThis(data)
- // })
- // }
- // })
|