123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- import { BehaviorSubject, Observable, Subject } from "rxjs"
- import { RetransmissionService } from "../../services/retransmission.service"
- import { BaseMessage } from "../../dependencies/logging/services/logging-service"
- import { v4 as uuidV4 } from 'uuid';
- import { Socket } from "socket.io-client";
- const express = require('express');
- const http = require('http');
- const { Server } = require('socket.io');
- export class SocketService {
- private connectedClients: ClientInfo[] = []
- private announcements: Subject<any> = new Subject()
- private app = express();
- private server = http.createServer(this.app);
- private io = new Server(this.server);
- private responseFromApp: Subject<BaseMessage>
- private incomingRequest: Subject<BaseMessage> = new Subject()
- constructor(response: Subject<BaseMessage>) {
- this.responseFromApp = response
- this.announcements.subscribe(announcement => {
- console.log(`Server Announcement: ${announcement}`)
- })
- }
- public getIncomingRequest(): Observable<BaseMessage> {
- return this.incomingRequest.asObservable()
- }
- public async setUpConnection() {
- this.io.on('connection', (socket) => {
- this.announcements.next('a client is connected:' + socket.id);
- let clientInfo: ClientInfo | null
- socket.on('connect', (msg) => {
-
- });
- socket.on('notification', (msg) => {
- console.log(msg)
- clientInfo = this.handleNotification(msg, socket, clientInfo)
- })
-
- socket.on('request', (request: BaseMessage) => {
- if (clientInfo) {
- this.announcements.next(`Received Message: ${request.header.messageID} from ${clientInfo.clientName}`);
-
- this.incomingRequest.next(request)
- this.processRequest(request).subscribe({
- next: message => {
-
- clientInfo.responseObs.next(message)
- },
- error: err => console.error(err),
- complete: () => { }
- })
- } else {
- console.log(`Client is still not defined. Please have this client set up the credentials`)
- socket.emit('notification', {
- notification: 'Failed Request',
- data: request,
- message: 'Client Credentials is not properply set up! Cannot process requests at the moment.'
- })
- }
- });
-
- socket.on('disconnect', () => {
- if (clientInfo) {
- clientInfo.clientConnectionState.next('OFFLINE')
- this.announcements.next(`Client ${clientInfo.id} disconnected`);
-
- }
- });
- });
- this.io.engine.on("connection_error", (err) => {
- console.log(err.req);
- console.log(err.code);
- console.log(err.message);
- console.log(err.context);
- });
-
- const PORT = process.env.PORT || 3000;
- this.server.listen(PORT, () => {
- console.log(`Server listening on port ${PORT}`);
- });
- }
-
-
- private deleteClientById(id) {
- const index = this.connectedClients.findIndex(item => item.id === id);
- if (index !== -1) {
- this.connectedClients.splice(index, 1);
- }
- }
- private processRequest(request: BaseMessage): Observable<BaseMessage> {
- return new Observable((observer) => {
- this.responseFromApp.subscribe(message => {
-
- if (message.header.messageID === request.header.messageID && message.header.messageName != 'Complete') {
- observer.next(message)
- }
- if (message.header.messageID === request.header.messageID && message.header.messageName == 'Complete') {
- observer.next(message)
-
- observer.complete()
- }
- })
- })
- }
- private handleNotification(msg: any, socket: Socket, clientInfo: ClientInfo | null) {
- if (msg.agenda == 'newClient') {
- clientInfo = {
- id: socket.id,
- clientName: uuidV4(),
- connectedAt: new Date(),
- clientConnectionState: new BehaviorSubject<'ONLINE' | 'OFFLINE'>('ONLINE'),
- requests: [],
- buffer: new RetransmissionService(),
- responseObs: new Subject<BaseMessage>()
- }
- this.connectedClients.push(clientInfo);
-
- socket.emit('notification', {
- notification: 'Your credentials',
- createdAt: new Date(),
- socketInfo: clientInfo
- })
-
- clientInfo.buffer.retransmission(clientInfo.responseObs, clientInfo.clientConnectionState).subscribe(output => {
-
- socket.emit('response', output)
- })
- }
- if (msg.agenda == 'existingClient') {
-
- let clientObj = this.connectedClients.find(obj => obj.clientName === msg.data.clientName)
- if (clientObj) {
- clientInfo = clientObj
- console.log('Existing client found')
-
- clientObj.id = socket.id
-
- socket.emit('notification', {
- notification: 'Your updated credentials',
- connectedAt: new Date(),
- socketInfo: clientInfo
- })
- socket.emit('notification', `Hello from server. You have been assigned ${socket.id}`);
-
- clientObj.clientConnectionState.next('ONLINE')
- } else {
- console.log(this.connectedClients)
- console.log(`Existing Client is not found`)
- }
- }
- return clientInfo
- }
- }
- export interface ClientInfo {
- id: string,
- clientName: string,
- connectedAt: Date,
- clientConnectionState: BehaviorSubject<'ONLINE' | 'OFFLINE'>,
- requests: { message: any, completed: boolean }[],
- buffer: RetransmissionService,
- responseObs: Subject<BaseMessage>
- }
|