import { BehaviorSubject, catchError, Observable, of, Subject, Subscription, tap, timeout } from "rxjs"
import { RetransmissionService } from "../../services/retransmission.service"
import { BaseMessage } from "../../dependencies/logging/services/logging-service"
import { v4 as uuidV4 } from 'uuid';
const express = require('express');
const http = require('http');
// const { Server } = require('socket.io');
import { Server } from 'socket.io'

/* This is only for demonstration purposes. Because the actual nestjs socket implementation may differ. */

/**
 * Demonstration Socket.IO server that relays application responses to socket
 * clients and keeps a per-client record (`ClientInfo`) so that a client can
 * re-identify itself after reconnecting.
 *
 * Flow visible in this file:
 *  - a client sends a `notification` with agenda `newClient` or `existingClient`
 *    to register / re-register itself;
 *  - a registered client sends `request` messages, which are published on
 *    `getIncomingRequest()`;
 *  - responses arriving on the `response` subject passed to the constructor are
 *    matched to the request by `header.messageID` and pushed into the client's
 *    `responseObs`, which is handed to the client's `RetransmissionService`.
 */
export class SocketService {
    /** Every client that has registered with agenda `newClient`. Entries are never removed in the visible code. */
    private connectedClients: ClientInfo[] = []
    /** Human-readable server events; logged to the console by the constructor. */
    private announcements: Subject<string> = new Subject<string>()
    private app = express();
    private server = http.createServer(this.app);
    private io = new Server(this.server);
    /**
     * Stream of responses produced by the application.
     * The payload type is not visible from this file, hence `any`; the code only
     * relies on `header.messageID` and `complete`.
     */
    private responseFromApp: Subject<any>
    /** Requests received from registered socket clients. */
    private incomingRequest: Subject<BaseMessage> = new Subject<BaseMessage>()

    /**
     * @param response Subject on which the application publishes responses.
     */
    constructor(response: Subject<any>) {
        this.responseFromApp = response
        this.announcements.subscribe((announcement) => {
            console.log(`Server Announcement: ${announcement}`)
        })
    }

    /**
     * @returns A read-only view of the requests received from registered clients.
     */
    public getIncomingRequest(): Observable<BaseMessage> {
        return this.incomingRequest.asObservable()
    }

    /**
     * Registers the Socket.IO event handlers and starts the HTTP server on
     * `process.env.PORT` (falling back to 3000).
     *
     * The method is `async` but awaits nothing; it resolves as soon as the
     * handlers are registered and `listen` has been called.
     */
    public async setUpConnection(): Promise<void> {
        this.io.on('connection', (socket) => {
            this.announcements.next('a client is connected:' + socket.id);
            // Stays null until this socket registers via a 'notification' message.
            let clientInfo: ClientInfo | null = null

            // NOTE(review): 'connect' is a reserved event name in Socket.IO; confirm
            // whether this listener is ever invoked on the server side.
            socket.on('connect', (msg) => {
                // this is reserved....
            });

            socket.on('notification', (msg) => {
                console.log(msg)
                // `msg` comes from the client and may be missing or malformed, so it
                // is only ever read through optional chaining.
                const agenda = msg?.agenda

                if (agenda === 'newClient') {
                    const newClient: ClientInfo = {
                        id: socket.id,
                        clientName: uuidV4(),
                        connectedAt: new Date(),
                        clientConnectionState: new BehaviorSubject<'ONLINE' | 'OFFLINE'>('ONLINE'),
                        requests: [],
                        buffer: new RetransmissionService(),
                        responseObs: new Subject<any>()
                    }
                    clientInfo = newClient
                    this.connectedClients.push(newClient);

                    // Send data over for client to persist
                    // NOTE(review): `socketInfo` carries the whole ClientInfo, including
                    // RxJS subjects and the RetransmissionService instance. Confirm that
                    // clients only need the plain fields (id, clientName, connectedAt).
                    socket.emit('notification', {
                        notification: 'Your credentials',
                        createdAt: new Date(),
                        socketInfo: newClient
                    })

                    // this is the supposed responses to be pushed to this socket client
                    newClient.buffer.retransmission(newClient.responseObs, newClient.clientConnectionState)
                    const subscription: Subscription = newClient.buffer.returnBufferedMessages().subscribe((output) => {
                        // console.log(output)
                        if (newClient.clientConnectionState.getValue() === 'ONLINE') {
                            socket.emit('response', output)
                        } else {
                            // The client went offline: stop feeding this (now dead) socket.
                            subscription.unsubscribe()
                        }
                    })
                }

                if (agenda === 'existingClient') {
                    // check if client exists
                    const requestedName = msg?.data?.clientName
                    const clientObj = this.connectedClients.find((obj) => obj.clientName === requestedName)
                    if (clientObj) {
                        clientInfo = clientObj
                        console.log('Existing client found')
                        // but also update socketId
                        clientObj.id = socket.id

                        // Send data over for client to persist
                        socket.emit('notification', {
                            notification: 'Your updated credentials',
                            connectedAt: new Date(),
                            updatedId: socket.id
                        })

                        // Resume operation: release the buffered messages.
                        /* The local client record need not be replaced, since this is a new connection.
                           However, the previous instance of the client connection state inside the
                           retransmission service needs to be updated to release the buffered values. */
                        const releaseBufferedItems = (client: ClientInfo): void => {
                            const subscription: Subscription = client.buffer.returnBufferedMessages().pipe(
                                tap((message) => {
                                    // If the client dropped again while draining, hand the message
                                    // back to responseObs (presumably so it is buffered again —
                                    // depends on RetransmissionService, not visible here).
                                    if (client.clientConnectionState.getValue() === 'OFFLINE') {
                                        client.responseObs.next(message)
                                    }
                                }),
                                timeout(10000), // Unsubscribe if no value is emitted within 10 seconds
                                catchError((err) => {
                                    if (err.name === 'TimeoutError') {
                                        console.log('TimeoutError: No value emitted within 10 seconds.');
                                        if (client.clientConnectionState.getValue() === 'ONLINE') {
                                            releaseBufferedItems(client); // Call the function if it's still online
                                        } else {
                                            subscription.unsubscribe()
                                        }
                                    }
                                    // Swallow the error and let this subscription complete.
                                    return of();
                                })
                            ).subscribe({
                                next: (output) => {
                                    socket.emit('response', output)
                                },
                                error: (err) => console.error(err),
                                complete: () => { }
                            })
                        }
                        releaseBufferedItems(clientObj)
                        // signal to release buffered items
                        clientObj.clientConnectionState.next('ONLINE')
                    } else {
                        console.log(this.connectedClients)
                        console.log(`Existing Client is not found`)
                    }
                }
            })

            // Listen for messages from the client
            socket.on('request', (request: BaseMessage) => {
                // Capture in a const so the nested callback below sees a non-null client.
                const client = clientInfo
                if (client) {
                    this.announcements.next(`Received Message: ${request.header.messageID} from ${client.clientName}`);
                    // clientInfo.requests.push({ message: request, completed: false })
                    this.incomingRequest.next(request)
                    this.processRequest(request).subscribe({
                        next: (message) => {
                            // console.log(message.header.messageName) // it does receive
                            client.responseObs.next(message)
                        },
                        error: (err) => console.error(err),
                        complete: () => { }
                    })
                } else {
                    console.log(`Client is still not defined. Please have this client set up the credentials`)
                    socket.emit('notification', {
                        notification: 'Failed Request',
                        data: request,
                        message: 'Client Credentials is not properply set up! Cannot process requests at the moment.'
                    })
                }
            });

            // Handle disconnection
            socket.on('disconnect', () => {
                if (clientInfo) {
                    clientInfo.clientConnectionState.next('OFFLINE') // signal to start buffering
                    this.announcements.next(`Client ${clientInfo.id} disconnected`);
                    // this.deleteClientById(socket.id)
                }
            });
        });

        this.io.engine.on("connection_error", (err) => {
            console.log(err.req);      // the request object
            console.log(err.code);     // the error code, for example 1
            console.log(err.message);  // the error message, for example "Session ID unknown"
            console.log(err.context);  // some additional error context
        });

        // Start the server
        const PORT = process.env.PORT || 3000;
        this.server.listen(PORT, () => {
            console.log(`Server listening on port ${PORT}`);
        });
    }

    // Utils

    /**
     * Removes the client whose current socket id equals `id` from
     * `connectedClients`, mutating the array in place. Does nothing when no
     * client matches. Currently only referenced from commented-out code.
     *
     * @param id Socket id of the client to remove.
     */
    private deleteClientById(id: string): void {
        const index = this.connectedClients.findIndex((item) => item.id === id);
        if (index !== -1) {
            this.connectedClients.splice(index, 1);
        }
    }

    /**
     * Builds a stream of the application responses that belong to `request`.
     *
     * Responses are matched on `header.messageID`. Every matching response is
     * emitted; the stream completes right after emitting the first matching
     * response whose `complete` flag is truthy.
     *
     * The subscription to `responseFromApp` is released when the returned
     * observable completes, errors, or is unsubscribed, so finished requests do
     * not keep listening to the shared response subject.
     *
     * @param request The request whose responses should be collected.
     * @returns Observable of the responses for `request`.
     */
    private processRequest(request: BaseMessage): Observable<any> {
        return new Observable<any>((observer) => {
            const upstream = this.responseFromApp.subscribe({
                next: (message) => {
                    if (message?.header?.messageID !== request.header.messageID) {
                        return
                    }
                    observer.next(message)
                    if (message.complete) {
                        observer.complete()
                    }
                },
                error: (err) => observer.error(err),
                complete: () => observer.complete()
            })
            // Teardown: runs on completion, error, or consumer unsubscribe.
            return () => upstream.unsubscribe()
        })
    }
}

/**
 * Server-side record kept for each registered socket client.
 */
export interface ClientInfo {
    /** Socket id of the client's current connection; overwritten when the client re-registers. */
    id: string,
    /** UUID assigned at first registration; the client presents it to re-identify itself. */
    clientName: string,
    /** Time at which the client first registered. */
    connectedAt: Date,
    /** Set to 'OFFLINE' on disconnect and back to 'ONLINE' when the client re-registers. */
    clientConnectionState: BehaviorSubject<'ONLINE' | 'OFFLINE'>,
    /** Initialised empty; nothing in this file writes to it (the only push is commented out). */
    requests: { message: any, completed: boolean }[],
    /** Retransmission service fed with `responseObs` and `clientConnectionState`. */
    buffer: RetransmissionService,
    /** Responses destined for this client, before they pass through `buffer`. */
    responseObs: Subject<any>
}