import { BehaviorSubject, Subject, buffer, bufferWhen, elementAt, filter, interval } from 'rxjs'; import { DataPrepService } from '../services/dataprep.service'; import { StorageLocation } from '../types/interface'; import { Server } from 'socket.io' import { io } from 'socket.io-client'; import { UtilityService } from '../services/utility.service'; let msgData = new DataPrepService() let util = new UtilityService() util.checkMaxHeap() /* ---------------------- COMPLEX OPERATION ------------------------------ */ let msgPayload: Subject = new Subject(); let consumerTrafficStatus: Subject = new Subject() let bufferTrigger: BehaviorSubject = new BehaviorSubject(false) let mongoStorage: StorageLocation = { type: `MongoDB`, url: `mongodb://192.168.100.59:27017/default` } msgData.loadObsData(mongoStorage, msgPayload) consumerTrafficStatus.subscribe((consumerHeapUsage) => { if (consumerHeapUsage >= 2) { // If consumerHeapUsage is over 2 %, it will trigger buffer bufferTrigger.next(true) } else { bufferTrigger.next(false) } }) bufferTrigger.subscribe((element) => { if(element){ console.log(`Heap Load Exceeded on client side. Buffering.....`) } }) // Create a WebSocket server function createWebsocketServer() { const io = new Server({}) io.on('connection', (socket) => { console.log(`Connected to Clients/Consumers`) // Subscribe to the subject when a client connects const subscription = msgPayload.pipe( buffer(bufferTrigger.pipe(filter(value => !value))) ).subscribe((element) => { console.log(`Emitting ${element.length} messages`) socket.emit(`payload`, element); }); //Listen for messages from consumer/client socket.on(`message`, (message) => { console.log(`Received message from client: ${message}`) util.checkHeapSize() // Send a message back to the client socket.send(`${message} received!`); }) // Listen for the socket to be closed socket.on('disconnect', () => { console.log('Client/Consumer disconnected'); // Need to put next(true) trigger the buffer due to disconnection from the other side. bufferTrigger.next(true) subscription.unsubscribe(); }); }) io.listen(8080) } // Create a new WebSocket client function connectWebSocket() { const socket = io('http://localhost:8081'); socket.on('connect', () => { console.log(`Connected to Consumer'Server.`); }); socket.on('trafficControl', (report: number) => { consumerTrafficStatus.next(report); }); socket.on('disconnect', () => { console.log(`Disconnected from Consumer'Server`); // Attempt to reconnect every 3 seconds setTimeout(() => { console.log('Attempting to reconnect...'); socket.connect(); }, 3000); }); } createWebsocketServer() connectWebSocket();