1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- import { BehaviorSubject, Subject, buffer, bufferWhen, elementAt, filter, interval } from 'rxjs';
- import { DataPrepService } from '../services/dataprep.service';
- import { StorageLocation } from '../types/interface';
- import { Server } from 'socket.io'
- import { io } from 'socket.io-client';
- import { UtilityService } from '../services/utility.service';
- let msgData = new DataPrepService()
- let util = new UtilityService()
- util.checkMaxHeap()
- /* ---------------------- COMPLEX OPERATION ------------------------------ */
- let msgPayload: Subject<any> = new Subject();
- let consumerTrafficStatus: Subject<any> = new Subject()
- let bufferTrigger: BehaviorSubject<boolean> = new BehaviorSubject(false)
- let mongoStorage: StorageLocation = {
- type: `MongoDB`,
- url: `mongodb://192.168.100.59:27017/default`
- }
- msgData.loadObsData(mongoStorage, msgPayload)
- consumerTrafficStatus.subscribe((consumerHeapUsage) => {
- if (consumerHeapUsage >= 2) {
- // If consumerHeapUsage is over 2 %, it will trigger buffer
- bufferTrigger.next(true)
- } else {
- bufferTrigger.next(false)
- }
- })
- bufferTrigger.subscribe((element) => {
- if(element){
- console.log(`Heap Load Exceeded on client side. Buffering.....`)
- }
- })
- // Create a WebSocket server
- function createWebsocketServer() {
- const io = new Server({})
- io.on('connection', (socket) => {
- console.log(`Connected to Clients/Consumers`)
- // Subscribe to the subject when a client connects
- const subscription = msgPayload.pipe(
- buffer(bufferTrigger.pipe(filter(value => !value)))
- ).subscribe((element) => {
- console.log(`Emitting ${element.length} messages`)
- socket.emit(`payload`, element);
- });
- //Listen for messages from consumer/client
- socket.on(`message`, (message) => {
- console.log(`Received message from client: ${message}`)
- util.checkHeapSize()
- // Send a message back to the client
- socket.send(`${message} received!`);
- })
- // Listen for the socket to be closed
- socket.on('disconnect', () => {
- console.log('Client/Consumer disconnected');
- // Need to put next(true) trigger the buffer due to disconnection from the other side.
- bufferTrigger.next(true)
- subscription.unsubscribe();
- });
- })
- io.listen(8080)
- }
- // Create a new WebSocket client
- function connectWebSocket() {
- const socket = io('http://localhost:8081');
- socket.on('connect', () => {
- console.log(`Connected to Consumer'Server.`);
- });
- socket.on('trafficControl', (report: number) => {
- consumerTrafficStatus.next(report);
- });
- socket.on('disconnect', () => {
- console.log(`Disconnected from Consumer'Server`);
- // Attempt to reconnect every 3 seconds
- setTimeout(() => {
- console.log('Attempting to reconnect...');
- socket.connect();
- }, 3000);
- });
- }
- createWebsocketServer()
- connectWebSocket();
|