Bladeren bron

socket fixes

Dr-Swopt 1 maand geleden
bovenliggende
commit
d6a465642b

+ 2 - 2
.env

@@ -1,5 +1,5 @@
 ;Transport = "Websocket, Http"
 ;PORT = 3000, 3001
-Transport = "Websocket"
-# Transport = "Http"
+# Transport = "Websocket"
+Transport = "Http"
 PORT = 3000

+ 1 - 1
src/connector/connector.manager.ts

@@ -27,7 +27,7 @@ export class ConnectionManager implements ConnectionManagerInterface {
     async getAdapter(clientId: string): Promise<AdapterSet> {
         return new Promise((resolve, reject) => {
             console.log(`Instantiating an adapter set....`)
-            let transportType: Transport = Transport.Websocket // as default  for now
+            let transportType: Transport = process.env.Transport as unknown as Transport // as default  for now
             let adapterId: string = clientId
             let transportService: TransportService | undefined = this.transportServiceArray.find(obj => obj.getInfo() == transportType)
             if (transportService) {

+ 1 - 1
src/interface/connector.interface.ts

@@ -77,7 +77,7 @@ export interface TransportEvent {
     data: any
 }
 
-export type Event = 'Server Started' | 'New Client' | 'Client Disconnected' | 'Client Reconnected' | `Server Disconnected` | 'New Message' | `Notification` | `New Server` | `Server Reconnected` | `New Transport` | 'New Adapter'
+export type Event = 'Server Started' | 'New Client' | 'Client Connected' |'Client Disconnected' | `Server Disconnected` | 'New Message' | `Notification` | `New Server` | `Server Connected` | `New Transport` | 'New Adapter'
 
 export interface TransportService {
     getInfo(): Transport

+ 20 - 1
src/test/receiver.ts

@@ -1,4 +1,4 @@
-import { filter, map, Observable, Observer, Subject } from "rxjs";
+import { filter, interval, map, Observable, Observer, Subject } from "rxjs";
 import { Bus, FisMessage, MessageTransmission } from "../interface/transport.interface";
 import { v4 as uuidv4 } from 'uuid'
 import { MessageTransmissionManager } from "../transmission/msg.transmission.manager";
@@ -10,6 +10,7 @@ class Supervisor {
     private transmissionManager!: MessageTransmissionManager
     private event: Subject<TransportEvent> = new Subject()
     private transmissionSets: MessageTransmission[] = []
+    private outgoingPipe: Subject<any> = new Subject()
 
     constructor() {
         this.transmissionManager = new MessageTransmissionManager(this.event, this.isClient)
@@ -18,7 +19,9 @@ class Supervisor {
             this.transmissionSets.push(transmissionSet)
 
             this.handleActivity(transmissionSet)
+            this.outgoingPipe.subscribe(message => transmissionSet.transmitter.emit(message))
         })
+
     }
 
     // only called once for each connected clients.
@@ -40,6 +43,8 @@ class Supervisor {
         //     next: res => console.log(res),
         //     complete: () => console.log(`Responses Completed for request: ${request.header.messageID}`)
         // })
+
+        this.startGeneratingRequest(1000, this.outgoingPipe)
     }
 
     private request(request: FisMessage, messageTransmission: MessageTransmission): Observable<any> {
@@ -59,6 +64,20 @@ class Supervisor {
         })
     }
 
+    
+    private startGeneratingRequest(intervalDuration: number, requestsPipe: Subject<FisMessage>) {
+        interval(intervalDuration).subscribe(time => {
+            let message: FisMessage = {
+                header: {
+                    messageID: uuidv4(),
+                    messageName: 'RequestMessage'
+                },
+                data: 'Data'
+            }
+            requestsPipe.next(message)
+        })
+    }
+
 }
 
 let supervisor = new Supervisor()

+ 3 - 2
src/test/transmitter.ts

@@ -28,9 +28,9 @@ class Supervisor {
     private handleClientActivity(messageTransmission: MessageTransmission): void {
         // start listening to incoming messages from this client
         messageTransmission.receiver.getMessageBus(Bus.GeneralBus).subscribe((event: TransportEvent) => {
-            // console.log(event) 
+            console.log(event) // receiving end
             let requestMessage: FisMessage = ((event.data as TransportMessage).payload as WrappedMessage).payload as FisMessage
-            this.clientIncomingMessage.next(requestMessage)
+            // this.clientIncomingMessage.next(requestMessage)
             this.messageProducer.getOutgoingMessages().pipe(
                 filter(message => message.header.messageID === requestMessage.header.messageID)
             ).subscribe(message => {
@@ -42,6 +42,7 @@ class Supervisor {
         this.messageProducer.getNotificationMessage().subscribe((message: FisMessage) => {
             messageTransmission.transmitter.emit(message)
         })
+
     }
 
 }

+ 1 - 1
src/transmission/msg.transmission.manager.ts

@@ -103,7 +103,7 @@ export class MessageTransmissionManager implements MessageTransmissionManagerInt
             filter((event: TransportEvent) => event.event === eventName)
         ).subscribe(event => {
             // assuming this is reconnection case
-            if (event.event == 'Client Reconnected') {
+            if (event.event == 'Client Connected') {
                 this.reconnectionHandler((event.data as EventMessage).clientId)
             }
 

+ 1 - 0
src/transmission/msg.transmission.receiver.ts

@@ -39,6 +39,7 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
                     checkMessage(((event.data as TransportMessage).payload as WrappedMessage), this.onHoldMessage).then(() => {
                         // only release the message before it exists
                         console.log(`This one passes. Does have previousID. Case for message ordering`) 
+                        // console.log(((event.data as TransportMessage).payload as WrappedMessage))
                         observable.next(event);
                     }).catch((error) => console.error(error))
                 });

+ 5 - 4
src/transmission/msg.transmission.transmitter.ts

@@ -3,7 +3,7 @@ import { EventMessage, FisMessage, MessageTransmitter as MessageTransmitterInter
 import { AdapterSet, ConnectionAdaptorBase, ConnectionState, Event, Transport, TransportEvent, TransportMessage } from "../interface/connector.interface";
 import { v4 as uuidv4 } from 'uuid'
 import { TransmitterConnectionAdapter } from "../connector/connector.transmitter";
-import { BehaviorSubject, filter, map, Observable, Subject } from "rxjs";
+import { BehaviorSubject, distinct, filter, map, Observable, Subject } from "rxjs";
 import { RetransmissionService } from "../utils/retransmission.service";
 import { WrappedMessage } from "../utils/message.ordering";
 import { ConnectionAdapter } from "../connector/connector.base";
@@ -29,15 +29,16 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
     setUpRetransmission(): void {
         let connectionStateEvent = new BehaviorSubject<'OFFLINE' | 'ONLINE'>('ONLINE')
         this.event.pipe(
-            filter(event => event.event == 'Client Disconnected' || event.event == 'Client Reconnected'),
+            filter(event => event.event == 'New Client' || event.event == 'Client Disconnected' || event.event == 'Client Connected' || event.event == 'Server Disconnected' || event.event == 'Server Connected' || event.event == 'New Server'),
             filter(event => (event.data as EventMessage).clientId == this.transmitterProfile.id),
             map(event => {
-                if (event.event == 'Client Disconnected') {
+                if (event.event == 'Client Disconnected' || event.event == 'Server Disconnected') {
                     return 'OFFLINE'
                 } else {
                     return `ONLINE`
                 }
-            })
+            }),
+            distinct()
         ).subscribe((signal: ConnectionState) => {
             connectionStateEvent.next(signal)
         })

+ 3 - 3
src/transport/http.ts

@@ -29,14 +29,14 @@ export class HttpTransportService implements TransportService {
 
         // for server usage
         if (clientObj && clientObj.connectionState.getValue() == 'ONLINE') {
-            clientObj.instance.emit(`message`, message.payload)
+            clientObj.responseStream.next(message.payload as WrappedMessage)
         }
         // for client usage
         if (serverObj && serverObj.connectionState.getValue() == 'ONLINE') {
-            fetch(`${this.baseUrl}/message`, {
+            fetch(`${this.baseUrl}message`, {
                 method: 'POST',
                 headers: { 'Content-Type': 'application/json' },
-                body: JSON.stringify(message),
+                body: JSON.stringify(message.payload),
             }).catch((error) => console.error('HTTP emit error:', error));
         }
 

+ 82 - 36
src/utils/http.utils.ts

@@ -17,11 +17,6 @@ export function startHttpServer(port: number): Observable<ConnectedHttpClient> {
         // Middleware to parse JSON requests
         app.use(express.json());
 
-        // Handling a GET request
-        app.get('/', (req, res) => {
-            res.send('Hello, World!');
-        });
-
         app.listen(port, () => {
             console.log(`Server running at http://localhost:${port}`);
         });
@@ -36,10 +31,6 @@ export function startHttpServer(port: number): Observable<ConnectedHttpClient> {
 }
 
 export async function initiateClientToServer(url: string, event: Subject<TransportEvent>, connectedHttpServers: ConnectedHttpServer[], browserEnv?: boolean): Promise<ConnectedHttpServer> {
-    /* Here's what needs to be done. Set up profile first before attempting to long poll.
-    Essentially, this is setting to receive responses from the server. Need to have additional checkign 
-    to see if hte server's connection status. With regards to sending request, well just utilize
-    the fetch method as written in the service. */
     return new Promise((resolve, reject) => {
         let clientName!: string
         let receiverProfileInfo!: ConnectedHttpServer
@@ -56,7 +47,7 @@ export async function initiateClientToServer(url: string, event: Subject<Transpo
                             writeFile(data.message as ConnectedHttpServer, (data.message as ConnectedHttpServer).id).then(() => {
                                 event.next({
                                     id: uuidv4(),
-                                    event: 'Server Reconnected',
+                                    event: 'Server Connected',
                                     data: {
                                         clientId: (data.message as ConnectedHttpServer).id,
                                         message: `Existing Http Channel ${(data.message as ConnectedHttpServer).id} re-established.`
@@ -97,12 +88,18 @@ export async function initiateClientToServer(url: string, event: Subject<Transpo
 
 export function handleClientHttpConnection(url: string, server: ConnectedHttpServer): Observable<TransportEvent> {
     return new Observable((observer: Observer<TransportEvent>) => {
-        // Recursive function to handle long polling
-        const longPoll = async () => {
+        server.connectionState.next(`ONLINE`);
+
+        const longPoll = async (retryCount = 0) => {
             try {
-                const response = await fetch(url); // Make the HTTP request to the server
+                const controller = new AbortController();
+                const timeout = setTimeout(() => controller.abort(), 10000); // 10s timeout
+
+                const response = await fetch(url + `/poll`, { signal: controller.signal });
+                clearTimeout(timeout);
+
                 if (response.ok) {
-                    const data = await response.json() as WrappedMessage;
+                    const data = (await response.json()) as WrappedMessage;
                     observer.next({
                         id: uuidv4(),
                         event: 'New Message',
@@ -111,34 +108,69 @@ export function handleClientHttpConnection(url: string, server: ConnectedHttpSer
                             dateCreated: new Date(),
                             transport: Transport.Http,
                             target: server.id,
-                            payload: data
-                        } as TransportMessage
-                    }); // Emit the received message to the Observable
+                            payload: data,
+                        } as TransportMessage,
+                    });
                 } else if (response.status === 204) {
-                    // No Content (keep polling for more updates)
                     console.log('No new messages from the server.');
                 } else {
                     throw new Error(`Unexpected response status: ${response.status}`);
                 }
-            } catch (error) {
-                observer.error(error); // Notify observer of any errors
-                return; // Stop polling on errors
+
+                retryCount = 0; // Reset retry count on success
+            } catch (error: unknown) {
+                // Handle unknown errors
+                let errorMessage: string;
+
+                if (error instanceof Error) {
+                    // Error is of type Error
+                    errorMessage = error.message;
+                    console.error(`Polling error: ${errorMessage}`);
+                } else if (typeof error === 'string') {
+                    // Error is a string
+                    errorMessage = error;
+                    console.error(`Polling error: ${errorMessage}`);
+                } else {
+                    // Fallback for unknown types
+                    errorMessage = 'An unknown error occurred during polling.';
+                    console.error(errorMessage);
+                }
+
+                retryCount++;
+                if (retryCount > 10) {
+                    console.error('Max retry attempts reached. Stopping polling.');
+                    observer.error(new Error(errorMessage)); // Notify observer of final failure
+                    return;
+                }
+
+                console.log(`Retrying (${retryCount})...`);
+                await new Promise((resolve) => setTimeout(resolve, retryCount * 1000)); // Exponential backoff
             }
 
-            // Continue polling after processing the response
-            setTimeout(longPoll, 0); // Optionally add a delay to avoid overwhelming the server
+            setTimeout(() => longPoll(retryCount), 0);
         };
 
-        // Start the long polling
         longPoll();
 
-        // Cleanup logic when the observable is unsubscribed
         return () => {
             console.log('Unsubscribed from the long-polling channel.');
+            observer.next({
+                id: uuidv4(),
+                event: 'Server Disconnected',
+                data: {
+                    clientId: server.id,
+                    message: '',
+                    payload: {
+                        time: new Date()
+                    }
+                } as EventMessage
+            })
+            server.connectionState.next(`OFFLINE`);
         };
     });
 }
 
+
 async function updateProfileAndPublishEvent(clientName: string | undefined, receiverProfileInfo: ConnectedHttpServer, profile: { name: string, message: any }, event: Subject<TransportEvent>, connectedHttpServers: ConnectedHttpServer[]): Promise<ConnectedHttpServer> {
     return new Promise((resolve, reject) => {
         console.log(`Assigned client Name: ${(profile.message as ConnectedHttpServer).id}`)
@@ -249,14 +281,14 @@ function handleProfile(app: Express, data: { name: `Old Client` | `New Client`,
                 startListeningAndStreaming(app, clientInstance, event)
                 event.next({
                     id: uuidv4(),
-                    event: 'Client Reconnected',
+                    event: 'Client Connected',
                     data: {
                         clientId: clientInstance.id,
                         message: `Client ${clientInstance.id} connection re-established`,
                         payload: clientInstance
                     } as EventMessage
                 })
-                
+
             } else {
                 console.log(`Profile Not Found`)
                 res.json({ name: 'Error', message: 'Receiver Profile Not found' })
@@ -326,12 +358,6 @@ export function addClientToDB(entry: ConnectedHttpClient, filePath: string = 'cl
 
 // this is for server usage only
 export function startListeningAndStreaming(app: Express, client: ConnectedHttpClient, eventListener: Observer<TransportEvent>): void {
-    // sample
-    app.post('/data', (req, res) => {
-        const { name, age } = req.body;
-        res.json({ message: `Received data: ${name}, ${age}` });
-    });
-
     /* Generally, we don't need this unless in the case of being the receiver */
     app.post('/message', (req, res) => {
         eventListener.next({
@@ -350,7 +376,17 @@ export function startListeningAndStreaming(app: Express, client: ConnectedHttpCl
 
     app.get('/poll', (req, res) => {
         console.log('Client connected for long polling.');
-        client.connectionState.next(`ONLINE`)
+        eventListener.next({
+            id: uuidv4(),
+            event: 'Client Connected',
+            data: {
+                clientId: client.id,
+                message: '',
+                payload: {
+                    time: new Date()
+                }
+            } as EventMessage
+        })
 
         // Subscribe to the data stream
         const subscription: Subscription = client.responseStream.asObservable().subscribe({
@@ -371,8 +407,18 @@ export function startListeningAndStreaming(app: Express, client: ConnectedHttpCl
 
         // Handle client disconnection
         res.on('close', () => {
-            client.connectionState.next('OFFLINE')
-            console.log('Client disconnected.');
+            console.error(`Http Client ${client.id} disconnected`);
+            eventListener.next({
+                id: uuidv4(),
+                event: 'Client Disconnected',
+                data: {
+                    clientId: client.id,
+                    message: '',
+                    payload: {
+                        time: new Date()
+                    }
+                } as EventMessage
+            })
             subscription.unsubscribe(); // Ensure cleanup
         });
     });

+ 19 - 17
src/utils/socket.utils.ts

@@ -132,7 +132,7 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
                     id: (data.message as ConnectedServerSocket).id,
                     dateCreated: new Date(),
                     socketInstance: socket,
-                    connectionState: new BehaviorSubject<ConnectionState>(`ONLINE`)
+                    connectionState: new BehaviorSubject<ConnectionState>(`OFFLINE`)
                 }
                 serversConnected.push(receiverProfileInfo)
             }
@@ -143,7 +143,7 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
                     // broadcast event to allow retransmission to release buffer
                     eventNotification.next({
                         id: uuidv4(),
-                        event: 'Server Reconnected',
+                        event: 'Server Connected',
                         data: {
                             clientId: (data.message as ConnectedServerSocket).id,
                             message: `Existing Websocket Channel ${(data.message as ConnectedServerSocket).id} re-established.`
@@ -200,7 +200,7 @@ export function handleNewSocketClient(socket: SocketForConnectedClient, connecte
                     id: uuidv4(), // client should only be assigned at this level. And is passed around for reference pointing
                     dateCreated: new Date(),
                     socketInstance: socket,
-                    connectionState: new BehaviorSubject<ConnectionState>(`ONLINE`)
+                    connectionState: new BehaviorSubject<ConnectionState>(`OFFLINE`)
                 }
                 // send to receiver for reference
                 socket.emit('profile', {
@@ -241,23 +241,11 @@ export function handleNewSocketClient(socket: SocketForConnectedClient, connecte
                         clientInstance.socketInstance = socket
                         // some explanation here. For the case where the server reads from the DB, no need to terminate subject, since all instances would be destroyed alongside the server shut down. This case is specificd only when there's a need to read from local file
                         if (!clientInstance.connectionState) {
-                            clientInstance.connectionState = new BehaviorSubject<ConnectionState>(`ONLINE`)
+                            clientInstance.connectionState = new BehaviorSubject<ConnectionState>(`OFFLINE`)
                         }
                         // need to start listening again, because it's assigned a different socket instance this time round
                         startListening(socket, clientInstance, event)
-                        event.next({
-                            id: uuidv4(),
-                            event: 'Client Reconnected',
-                            data: {
-                                clientId: clientInstance.id,
-                                message: `Client ${clientInstance.id} connection re-established`,
-                                payload: clientInstance
-                            } as EventMessage
-                        })
-                        // Resume operation
-                        if (clientInstance.connectionState.getValue() == 'OFFLINE') {
-                            clientInstance.connectionState.next(`ONLINE`)
-                        }
+
                     } else {
                         console.log(`Profile Not Found`)
                         socket.emit('profile', { name: 'Error', message: 'Receiver Profile Not found' })
@@ -375,6 +363,20 @@ export async function checkOwnClientInfo(filename?: string): Promise<ConnectedSe
 
 // this is for server usage only
 export function startListening(socket: SocketForConnectedClient, client: ConnectedClientSocket, eventListener: Observer<TransportEvent>): void {
+    // notify it's associated retransmission to start releaseing buffer
+    eventListener.next({
+        id: uuidv4(),
+        event: `Client Connected`,
+        data: {
+            clientId: client.id,
+            message: `Socket Client Connected. Adapter ID assigned: ${client.id}`,
+            payload: client
+        } as EventMessage
+    })
+    // Resume operation
+    if (client.connectionState.getValue() == 'OFFLINE') {
+        client.connectionState.next(`ONLINE`)
+    }
     /* Generally, we don't need this unless in the case of being the receiver */
     socket.on('message', (message: any) => {
         eventListener.next({