瀏覽代碼

include proxy to simulate offline disconnection without shutting down server and client

Enzo 2 月之前
父節點
當前提交
74c98782bd

+ 3 - 0
2d05fca6-e7aa-4eb1-900a-3da684c150be.json

@@ -0,0 +1,3 @@
+{
+  "id": "2d05fca6-e7aa-4eb1-900a-3da684c150be"
+}

+ 8 - 0
clients.json

@@ -0,0 +1,8 @@
+[
+  {
+    "id": "2d05fca6-e7aa-4eb1-900a-3da684c150be",
+    "dateCreated": "2024-11-12T07:18:20.740Z",
+    "connectionState": null,
+    "socketInstance": null
+  }
+]

+ 1 - 0
package.json

@@ -7,6 +7,7 @@
     "test": "echo \"Error: no test specified\" && exit 1",
     "build": "tsc",
     "start": "node dist/index.js",
+    "proxy": "node dist/test/proxy.js",
     "transmitter": "node dist/test/transmitter.js",
     "receiver": "node dist/test/receiver.js"
   },

+ 3 - 1
src/interface/connector.interface.ts

@@ -71,10 +71,12 @@ export interface TransportMessage {
 
 export interface TransportEvent {
     id: string,
-    event: 'Server Started' | 'New Client' | 'Client Disconnected' | 'Client Reconnected' | `Server Disconnected` | 'New Message' | `Notification` | `New Server` | `New Transport` | 'New Adapter',
+    event: Event,
     data: any
 }
 
+export type Event = 'Server Started' | 'New Client' | 'Client Disconnected' | 'Client Reconnected' | `Server Disconnected` | 'New Message' | `Notification` | `New Server` | `New Transport` | 'New Adapter'
+
 export interface TransportService {
     getInfo(): Transport
     emit(message: TransportMessage): void

+ 83 - 0
src/test/proxy.ts

@@ -0,0 +1,83 @@
+import { Socket as ClientSocket, io } from 'socket.io-client'
+import { Server, Socket as SocketForConnectedClient } from "socket.io"
+import { Subject } from "rxjs";
+import { createServer } from "http";
+
+let fromServer = new Subject<{ event: 'profile' | 'message', payload: any }>()
+let toServer = new Subject<{ event: 'profile' | 'message', payload: any }>()
+
+startSocketServer(3001)
+startClientSocketConnection('http://localhost:3000')
+
+
+function consoleLog(): void {
+    fromServer.subscribe(message => console.log(`From Server`, message.event))
+    toServer.subscribe(message => console.log(`To Server`, message.event))
+}
+
+function startSocketServer(port: number): void {
+    console.log(`Socket Server ${port} Started....`)
+    let httpServer = createServer();
+    let socketServer = new Server(httpServer)
+
+    socketServer.on('connection', (socket: SocketForConnectedClient) => {
+        socket.on(`profile`, (msg) => {
+            toServer.next({ event: 'profile', payload: msg })
+        })
+
+        socket.on('message', (msg) => {
+            toServer.next({ event: 'message', payload: msg })
+        })
+
+        socket.on(`disconnect`, () => {
+            console.log(`I shouldn't be here lol`)
+        })
+
+        fromServer.subscribe(message => {
+            socketServer.emit(message.event, message.payload)
+        })
+    })
+
+    socketServer.engine.on("connection_error", (err) => {
+        console.log(err.req);      // the request object
+        console.log(err.code);     // the error code, for example 1
+        console.log(err.message);  // the error message, for example "Session ID unknown"
+        console.log(err.context);  // some additional error context
+    });
+
+    // Start the HTTP server on 127.0.0.1 with the given port
+    httpServer.listen(port, '0.0.0.0', () => {
+        console.log(`Socket server listening on ${port}`);
+    });
+
+}
+
+function startClientSocketConnection(serverUrl: string): void {
+    // let clientSocket = io(serverUrl)
+    let clientSocket: ClientSocket = io(serverUrl, {
+        reconnection: true,              // Enable automatic reconnections
+        reconnectionAttempts: 1000,       // Retry up to 10 times
+        reconnectionDelay: 500,          // Start with a 500ms delay
+        reconnectionDelayMax: 10000,     // Delay can grow to a max of 10 seconds
+        randomizationFactor: 0.3,
+    })
+
+    toServer.subscribe(message => { clientSocket.emit(message.event, message.payload) })
+
+    clientSocket.on(`connect`, () => {
+
+    })
+
+    clientSocket.on(`profile`, (msg) => {
+        fromServer.next({ event: 'profile', payload: msg })
+    })
+
+    clientSocket.on(`message`, (msg) => {
+        fromServer.next({ event: 'message', payload: msg })
+    })
+
+    clientSocket.on(`disconnect`, () => {
+
+    })
+}
+

+ 3 - 1
src/test/receiver.ts

@@ -44,6 +44,8 @@ class SocketClient {
 }
 
 // Usage example:
-const client = new SocketClient("http://localhost:3000");
+const client = new SocketClient("http://localhost:3001");
+// const client = new SocketClient("http://127.0.0.1:3000");
+// const client = new SocketClient("http://192.168.100.96:3000");
 
 // Send a message

+ 1 - 1
src/test/transmitter.ts

@@ -129,7 +129,7 @@ class MessageProducer {
                     const message: FisMessage = {
                         header: {
                             messageID: uuidv4(),
-                            messageName: 'ResponseMessage'
+                            messageName: 'NotificationMessage'
                         },
                         data: `Data`
                     };

+ 57 - 28
src/transmission/msg.transmission.manager.ts

@@ -1,16 +1,15 @@
 import { MessageTransmissionTransmitter } from "./msg.transmission.transmitter";
 import { MessageTransmissionReceiver } from "./msg.transmission.receiver";
 import { ConnectionManager } from "../connector/connector.manager";
-import { EventMessage, MessageTransmission, MessageTransmissionManager as MessageTransmissionManagerInterface, ReceiverProfile, TransmissionProfile, TransmitterProfile } from "../interface/transport.interface";
+import { EventMessage, MessageTransmission, MessageTransmissionManager as MessageTransmissionManagerInterface, ReceiverProfile, TransmitterProfile } from "../interface/transport.interface";
 import { v4 as uuidv4 } from 'uuid'
-import { AdapterSet, AdaptorTransmissionRole, RequestResponseConnectionAdapter, Transport, TransportEvent, TransportService } from "../interface/connector.interface";
+import { AdapterSet, Transport, TransportEvent, TransportService, Event } from "../interface/connector.interface";
 import { MessageTransmissionRequestResponse } from "./msg.transmission.request-response";
 import { filter, Observable, Observer, Subject } from "rxjs";
 import { WebsocketTransportService } from "../transport/websocket";
 import { HttpTransportService } from "../transport/http";
-import { error } from "console";
 
-export class MessageTransmissionManager implements MessageTransmissionManagerInterface{
+export class MessageTransmissionManager implements MessageTransmissionManagerInterface {
     private transportServiceArray: TransportService[] = []
     private transportSet: Set<TransportSet> = new Set()
     transmission: MessageTransmission[] = []
@@ -27,6 +26,12 @@ export class MessageTransmissionManager implements MessageTransmissionManagerInt
         this.transportSet.forEach(set => {
             this.setUpTransportService(set, event)
         })
+
+        this.event.subscribe(event => console.log(`event`, event))
+
+        // note that if this server is down, all these instances of transmission and connector would be lost as well. SO cannot just simply find "instances" and reuse them. Must reinstantiate them again
+        this.handleEvent(`Client Disconnected` as Event, event)
+        this.handleEvent('Client Reconnected' as Event, event)
     }
 
     /* so there will be some changes here. will nto be assigning just one, but all of them dynamically to pour into this boy
@@ -37,34 +42,36 @@ export class MessageTransmissionManager implements MessageTransmissionManagerInt
                 filter(event => event.event == 'New Client')
             ).subscribe(event => {
                 // get all adapters for all the connection
-                let adapterSet: AdapterSet[] = []
-                let clientId: string = (event.data as EventMessage).clientId
-                if (this.transportServiceArray.length > 0) {
-                    this.transportServiceArray.forEach(transport => {
-                        adapterSet.push(this.connectionManager.getAdapter(clientId, transport))
-                    })
-                } else {
-                    observer.error('No Transport is instantiated.... ERROR!!!!')
-                }
-
-                // 1 set only
-                let transmitter: MessageTransmissionTransmitter = this.getTransmitter(clientId, adapterSet, this.event.asObservable())
-                let receiver: MessageTransmissionReceiver = this.getReceiver(clientId, adapterSet, this.event.asObservable())
-                let requestResponse: MessageTransmissionRequestResponse = this.getRequestResponse(transmitter, receiver, this.event.asObservable())
-                let transmission: MessageTransmission = {
-                    id: clientId,
-                    transmitter: transmitter,
-                    receiver: receiver,
-                    requestResponse: requestResponse,
-                    event: this.event.asObservable()
-                }
-                this.transmission.push(transmission)
-
-                observer.next(transmission)
+                observer.next(this.instantiateComponents((event.data as EventMessage).clientId))
             })
         })
     }
 
+    private instantiateComponents(clientId: string): MessageTransmission {
+        let adapterSet: AdapterSet[] = []
+        if (this.transportServiceArray.length > 0) {
+            this.transportServiceArray.forEach(transport => {
+                adapterSet.push(this.connectionManager.getAdapter(clientId, transport))
+            })
+        } else {
+            throw new Error(`Transmission Manager: No transport is Instantiated`)
+        }
+
+        // 1 set only
+        let transmitter: MessageTransmissionTransmitter = this.getTransmitter(clientId, adapterSet, this.event.asObservable())
+        let receiver: MessageTransmissionReceiver = this.getReceiver(clientId, adapterSet, this.event.asObservable())
+        let requestResponse: MessageTransmissionRequestResponse = this.getRequestResponse(transmitter, receiver, this.event.asObservable())
+        let transmission: MessageTransmission = {
+            id: clientId,
+            transmitter: transmitter,
+            receiver: receiver,
+            requestResponse: requestResponse,
+            event: this.event.asObservable()
+        }
+        this.transmission.push(transmission)
+        return transmission
+    }
+
 
     private getTransmitter(transmissionId: string, adapterSets: AdapterSet[], event: Observable<TransportEvent>): MessageTransmissionTransmitter {
         let transmitterProfile: TransmitterProfile = {
@@ -124,6 +131,28 @@ export class MessageTransmissionManager implements MessageTransmissionManagerInt
             transportSet.add({ transport: transport, port: portList[index] } as unknown as TransportSet)
         })
     }
+
+
+    private handleEvent(eventName: Event, eventObs: Observable<TransportEvent>): void {
+        eventObs.pipe(
+            filter((event: TransportEvent) => event.event === eventName)
+        ).subscribe(event => {
+            // assuming this is reconnection case
+            if(event.event == 'Client Reconnected') {
+                this.reconnectionHandler((event.data as EventMessage).clientId)
+            } 
+            // can include more event handlers here
+        })
+    }
+
+    private reconnectionHandler(clientId: string): void {
+        let transmissionObj: MessageTransmission | undefined = Array.from(this.transmission).find(obj => obj.id === clientId)
+        if (!transmissionObj) {
+            let transmission: MessageTransmission = this.instantiateComponents(clientId)
+            this.transmission.push(transmission)
+        }
+    }
+    
 }
 
 

+ 4 - 4
src/transmission/msg.transmission.transmitter.ts

@@ -1,9 +1,9 @@
 import { MessageTransmissionBase } from "./msg.transmission.base";
-import { FisMessage, MessageTransmitter as MessageTransmitterInterface, TransmitterProfile } from '../interface/transport.interface'
-import { AdapterSet, Transport, TransportEvent } from "../interface/connector.interface";
+import { EventMessage, FisMessage, MessageTransmitter as MessageTransmitterInterface, TransmitterProfile } from '../interface/transport.interface'
+import { AdapterSet, Event, Transport, TransportEvent } from "../interface/connector.interface";
 import { v4 as uuidv4 } from 'uuid'
 import { TransmitterConnectionAdapter } from "../connector/connector.transmitter";
-import { Observable } from "rxjs";
+import { filter, Observable } from "rxjs";
 
 /* Take in all the messages that needs to be transported, and divide them accordingly. So the connector instances will do just that
 connectors or adapters will have their own identifier*/
@@ -14,7 +14,6 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
         super(event)
         this.setTransmitter(profile)
         this.setUpAdapter(adapterSets)
-
     }
 
     setTransmitter(transmitterProfile: TransmitterProfile): void {
@@ -35,4 +34,5 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
         // for now just hardcode to use 1 adapter type until connection manager is further enhacne to configure adapters on the fly
         this.adapterService = this.adaptorsArray.find(obj => obj.getInfo().transportType === Transport.Websocket)
     }
+
 }

+ 7 - 6
src/utils/socket.utils.ts

@@ -7,7 +7,6 @@ import { v4 as uuidv4 } from 'uuid'
 import { ConnectionState, Transport, TransportEvent, TransportMessage } from '../interface/connector.interface';
 import { ConnectedClientSocket, ConnectedServerSocket } from '../transport/websocket';
 import { EventMessage, FisMessage } from '../interface/transport.interface';
-import { error } from 'console';
 
 export function startSocketServer(port: number): Observable<SocketForConnectedClient> {
     return new Observable((observer) => {
@@ -28,10 +27,12 @@ export function startSocketServer(port: number): Observable<SocketForConnectedCl
                 console.log(err.context);  // some additional error context
             });
 
-            // Start the socketServer
-            httpServer.listen(port)
+            // Start the HTTP server on 127.0.0.1 with the given port
+            httpServer.listen(port, '0.0.0.0', () => {
+                console.log(`Socket server listening on ${port}`);
+            });
         } catch (error) {
-            observer.error(error)
+            observer.error(error);
         }
     })
 }
@@ -280,7 +281,7 @@ export async function writeFile(data: ConnectedServerSocket, filename: string):
 }
 
 /* For Internal Usage only. Temporary serve as a way for server to keep track of clients. To be replaced in the future with better alternatives. */
-function addClientToDB(entry: ConnectedClientSocket, filePath: string = 'clients.json'): void {
+export function addClientToDB(entry: ConnectedClientSocket, filePath: string = 'clients.json'): void {
     try {
         let data: ConnectedClientSocket[] = [];
 
@@ -306,7 +307,7 @@ function addClientToDB(entry: ConnectedClientSocket, filePath: string = 'clients
     }
 }
 
-async function checkIfClientExists(id: string, filePath: string = 'clients.json'): Promise<ConnectedClientSocket> {
+export async function checkIfClientExists(id: string, filePath: string = 'clients.json'): Promise<ConnectedClientSocket> {
     return new Promise((resolve, reject) => {
         try {
             // Check if the file exists