Quellcode durchsuchen

some fixes. Although there' still quite a lot of problems overall. Need to test again thoroughly to make sure socket is still working as expected.

Dr-Swopt vor 1 Monat
Ursprung
Commit
24d32624dd

+ 2 - 2
.env

@@ -1,5 +1,5 @@
 ;Transport = "Websocket, Http"
 ;PORT = 3000, 3001
-# Transport = "Websocket"
-Transport = "Http"
+Transport = "Websocket"
+# Transport = "Http"
 PORT = 3000

+ 3 - 2
src/connector/connector.request.response.ts

@@ -5,6 +5,7 @@ import { RequestResponseConnectionAdapter as RequestResponseConnectionAdapterInt
 import { TransmitterConnectionAdapter } from './connector.transmitter';
 import { ReceiverConnectionAdapter } from './connector.receiver';
 import { filter, map, Observable, Observer, Subscription, takeWhile } from 'rxjs';
+import { WrappedMessage } from '../utils/message.ordering';
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
@@ -21,7 +22,7 @@ export class RequestResponseConnectionAdapter extends ConnectionAdapter implemen
     }
 
     // Make use of the adapters ref passed in
-    send(message: FisMessage): Observable<FisMessage> {
+    send(message: WrappedMessage): Observable<FisMessage> {
         return new Observable((response: Observer<FisMessage>) => {
             // logic here
             this.transmitterAdapter.emit(message)
@@ -54,7 +55,7 @@ export class RequestResponseConnectionAdapter extends ConnectionAdapter implemen
         throw new Error('Method not implemented.');
     }
 
-    emit(message: FisMessage): void {
+    emit(message: WrappedMessage): void {
         throw new Error('Method not implemented.');
     }
 

+ 2 - 1
src/connector/connector.transmitter.ts

@@ -19,8 +19,9 @@ export class TransmitterConnectionAdapter extends ConnectionAdapter implements T
         this.setAdapterProfile(adapterId, adapterType)
     }
 
-    emit(message: FisMessage | WrappedMessage): void {
+    emit(message: WrappedMessage): void {
         // logic here
+        console.log(`Adapter Transmission Level. Emitting: `, (message.payload as FisMessage).header.messageID)
         this.connector.emit({
             id: this.connectorProfile.id,
             transport: this.connectorProfile.transportType,

+ 3 - 3
src/interface/connector.interface.ts

@@ -37,7 +37,7 @@ export interface ConnectionAdaptorBase {
 
 
 export interface TransmitterConnectionAdapter extends ConnectionAdaptorBase {
-    emit(message: FisMessage | WrappedMessage): void
+    emit(message: WrappedMessage): void
 }
 
 export interface ReceiverConnectionAdapter extends ConnectionAdaptorBase {
@@ -45,7 +45,7 @@ export interface ReceiverConnectionAdapter extends ConnectionAdaptorBase {
 }
 
 export interface RequestResponseConnectionAdapter extends TransmitterConnectionAdapter, ReceiverConnectionAdapter {
-    send(message: FisMessage): Observable<FisMessage>
+    send(message: WrappedMessage): Observable<FisMessage>
 }
 
 export type ConnectionState = 'ONLINE' | 'OFFLINE'
@@ -77,7 +77,7 @@ export interface TransportEvent {
     data: any
 }
 
-export type Event = 'Server Started' | 'New Client' | 'Client Connected' |'Client Disconnected' | `Server Disconnected` | 'New Message' | `Notification` | `New Server` | `Server Connected` | `New Transport` | 'New Adapter'
+export type Event = 'Server Started' | 'New Client' | 'Client Connected' |'Client Disconnected' | `Server Disconnected` | 'New Message' | `Notification` | `New Server` | `Server Connected` | `New Transport` | 'New Adapter' | 'Re-Flush'
 
 export interface TransportService {
     getInfo(): Transport

+ 1 - 1
src/test/receiver.ts

@@ -70,7 +70,7 @@ class Supervisor {
             let message: FisMessage = {
                 header: {
                     messageID: uuidv4(),
-                    messageName: 'RequestMessage'
+                    messageName: 'NotificationMessage'
                 },
                 data: 'Data'
             }

+ 4 - 5
src/test/transmitter.ts

@@ -28,7 +28,7 @@ class Supervisor {
     private handleClientActivity(messageTransmission: MessageTransmission): void {
         // start listening to incoming messages from this client
         messageTransmission.receiver.getMessageBus(Bus.GeneralBus).subscribe((event: TransportEvent) => {
-            console.log(event) // receiving end
+            console.log(`General Bus`, event) // receiving end
             let requestMessage: FisMessage = ((event.data as TransportMessage).payload as WrappedMessage).payload as FisMessage
             // this.clientIncomingMessage.next(requestMessage)
             this.messageProducer.getOutgoingMessages().pipe(
@@ -39,10 +39,9 @@ class Supervisor {
         })
 
         // to emulate general notification. Send every second
-        this.messageProducer.getNotificationMessage().subscribe((message: FisMessage) => {
-            messageTransmission.transmitter.emit(message)
-        })
-
+        // this.messageProducer.getNotificationMessage().subscribe((message: FisMessage) => {
+        //     messageTransmission.transmitter.emit(message)
+        // })
     }
 
 }

+ 16 - 4
src/transmission/msg.transmission.transmitter.ts

@@ -11,7 +11,7 @@ import { ConnectionAdapter } from "../connector/connector.base";
 /* Take in all the messages that needs to be transported, and divide them accordingly. So the connector instances will do just that
 connectors or adapters will have their own identifier*/
 export class MessageTransmissionTransmitter extends MessageTransmissionBase implements MessageTransmitterInterface {
-    private messageToBeTransmitted!: Subject<FisMessage>
+    private messageToBeTransmitted!: Subject<FisMessage | WrappedMessage>
     transmitterProfile!: TransmitterProfile;
     retransmission!: RetransmissionService;
 
@@ -23,8 +23,11 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
         this.setTransmitter(profile)
         this.setUpAdapter(adapter)
         this.setUpRetransmission()
+
+        // special case just for http in case of server/client disconnected, the unsent msg will be flushed back into messageToBeTransmitted
+        this.uniqueHandlerToFlushUnsentMessages(event)
     }
-    
+
     // by the time this transmission set is instantiated, the connected client would've been online. Need ot manually signal retransmission to release buffer immediately
     setUpRetransmission(): void {
         let connectionStateEvent = new BehaviorSubject<'OFFLINE' | 'ONLINE'>('ONLINE')
@@ -49,13 +52,13 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
             (this.mainAdapter as TransmitterConnectionAdapter).emit(message)
         })
     }
-    
+
     setTransmitter(transmitterProfile: TransmitterProfile): void {
         this.transmitterProfile = transmitterProfile
     }
 
     emit(message: FisMessage): void {
-        // for now only use one adapter
+        console.log(`Transmission Transmitter: `, message.header.messageID)
         this.messageToBeTransmitted.next(message)
     }
 
@@ -64,4 +67,13 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
         this.mainAdapter = adapter
     }
 
+    private uniqueHandlerToFlushUnsentMessages(event: Observable<TransportEvent>): void {
+        event.pipe(
+            filter(event => event.event == 'Re-Flush'),
+            filter(event => (event.data as EventMessage).clientId == this.transmitterProfile.id),
+        ).subscribe((event: TransportEvent) => {
+            this.messageToBeTransmitted.next((((event.data as EventMessage).payload as TransportMessage).payload as WrappedMessage))
+        })
+    }
+
 }

+ 40 - 8
src/transport/http.ts

@@ -1,13 +1,16 @@
 import { Express } from 'express';
-import { Observable, Subject } from "rxjs";
+import { filter, Observable, Subject, Subscription, take } from "rxjs";
 import { ClientObject, Transport, TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
 import { v4 as uuidv4 } from 'uuid'
 import config from '../config/config.json';
 import { handleClientHttpConnection, handleHttpClient, initiateClientToServer, startHttpServer } from "../utils/http.utils";
 import { WrappedMessage } from '../utils/message.ordering';
 import { error } from 'console';
+import axios, { AxiosError } from 'axios';
+import { EventMessage } from '../interface/transport.interface';
 
 export class HttpTransportService implements TransportService {
+    private retryLogicStarted: boolean = false
     private baseUrl!: string;
     private info: Transport = Transport.Http
     private connectedHttpServer: ConnectedHttpServer[] = [] // to allow the possibility of having to communicate with multiple servers as a client
@@ -33,11 +36,23 @@ export class HttpTransportService implements TransportService {
         }
         // for client usage
         if (serverObj && serverObj.connectionState.getValue() == 'ONLINE') {
-            fetch(`${this.baseUrl}message`, {
-                method: 'POST',
+            axios.post(`${this.baseUrl}message`, message.payload, {
                 headers: { 'Content-Type': 'application/json' },
-                body: JSON.stringify(message.payload),
-            }).catch((error) => console.error('HTTP emit error:', error));
+            })
+                .then((response) => {
+                    console.log('Response From Server:', response.status);
+                })
+                .catch((error: AxiosError) => {
+                    console.error('HTTP emit error:', error.code);
+                    this.transportEvent.next({
+                        id: uuidv4(),
+                        event: 'Re-Flush',
+                        data: {
+                            clientId: serverObj.id,
+                            payload: message
+                        } as EventMessage
+                    } as TransportEvent)
+                });
         }
 
     }
@@ -60,10 +75,27 @@ export class HttpTransportService implements TransportService {
         })
     }
 
-    public startClient(url: string): void {
-        initiateClientToServer(url, this.transportEvent, this.connectedHttpServer).then((connectedHttpServer: ConnectedHttpServer) => {
+    public startClient(url: string, receiverProfileInfo?: ConnectedHttpServer | undefined): void {
+        initiateClientToServer(url, this.transportEvent, this.connectedHttpServer, receiverProfileInfo).then((connectedHttpServer: ConnectedHttpServer) => {
             handleClientHttpConnection(url, connectedHttpServer).subscribe(this.transportEvent)
-        }).catch((error) => console.error(`HttpTransport ERROR:`, error))
+            if (!this.retryLogicStarted) this.retryConnection(url) // standby reconnection logic
+        }).catch((error: AxiosError) => {
+            console.error(`HttpTransport ERROR:`, error.code)
+            setTimeout(() => {
+                console.info(`Reconnecting to server....`)
+                this.startClient(url)
+            }, 5000); // Retry with delay
+        })
+    }
+
+    private retryConnection(url: string): void {
+        this.transportEvent.pipe(
+            filter(event => event.event === 'Server Disconnected'),
+            take(1) // Automatically unsubscribe after one emission
+        ).subscribe((event: TransportEvent) => {
+            console.log(`Server activity detected: Re-connect-ing to server ${(event.data as EventMessage).clientId}...`)
+            this.startClient(url, this.connectedHttpServer.find(obj => obj.id === (event.data as EventMessage).clientId)); // Restart the client
+        });
     }
 
 }

+ 4 - 1
src/transport/websocket.ts

@@ -3,6 +3,8 @@ import { Socket as ClientSocket } from 'socket.io-client'
 import { Socket as SocketForConnectedClient } from "socket.io"
 import { handleClientSocketConnection, handleNewSocketClient, startClientSocketConnection, startSocketServer } from "../utils/socket.utils";
 import { ClientObject, Transport, TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
+import { WrappedMessage } from "../utils/message.ordering";
+import { FisMessage } from "../interface/transport.interface";
 
 /* Just code in the context that this websocket service will be handling multiple UI clients. Can think about the server communication at a later time. */
 export class WebsocketTransportService implements TransportService {
@@ -36,7 +38,6 @@ export class WebsocketTransportService implements TransportService {
     public startClient(url: string): void {
         // logic here
         startClientSocketConnection(url).then((socket: ClientSocket) => {
-            let clientName = '' // initiate a check in local file. If no client name, then this is new 
             handleClientSocketConnection(socket, this.connectedServer).subscribe(this.transportEvent)
         }).catch((error) => {
             console.error(`WebsocketTransport ERROR:`, error)
@@ -45,8 +46,10 @@ export class WebsocketTransportService implements TransportService {
 
 
     public emit(message: TransportMessage): void {
+        console.log(`Transport Socket service level. Emitting: ${((message.payload as WrappedMessage).payload as FisMessage).header.messageID}`)
         let clientObj: ConnectedClientSocket | undefined = this.connectedClientSocket.find(obj => obj.id == message.target)
         let serverObj: ConnectedServerSocket | undefined = this.connectedServer.find(obj => obj.id === message.target)
+        console.log(serverObj?.connectionState.getValue(), serverObj?.id)
         // for server usage
         if (clientObj && clientObj.connectionState.getValue() == 'ONLINE') {
             clientObj.socketInstance.emit(`message`, message.payload)

+ 148 - 123
src/utils/http.utils.ts

@@ -7,7 +7,7 @@ import { BehaviorSubject, Observable, Observer, Subject, Subscription } from "rx
 import { ConnectionState, Transport, TransportEvent, TransportMessage } from '../interface/connector.interface';
 import { EventMessage, FisMessage } from '../interface/transport.interface';
 import { WrappedMessage } from './message.ordering';
-import axios, { AxiosResponse } from 'axios';
+import axios, { AxiosError, AxiosResponse } from 'axios';
 import { error } from 'console';
 
 export function startHttpServer(port: number): Observable<ConnectedHttpClient> {
@@ -30,21 +30,21 @@ export function startHttpServer(port: number): Observable<ConnectedHttpClient> {
     })
 }
 
-export async function initiateClientToServer(url: string, event: Subject<TransportEvent>, connectedHttpServers: ConnectedHttpServer[], browserEnv?: boolean): Promise<ConnectedHttpServer> {
+export async function initiateClientToServer(url: string, event: Subject<TransportEvent>, connectedHttpServers: ConnectedHttpServer[], receiverProfileInfo: ConnectedHttpServer | undefined, browserEnv?: boolean,): Promise<ConnectedHttpServer> {
     return new Promise((resolve, reject) => {
-        let clientName!: string
-        let receiverProfileInfo!: ConnectedHttpServer
         if (browserEnv) {
             // logic here for using browser fetch
         } else { // axios methods
-            if (clientName) {
-                checkOwnClientInfo(clientName).then((profile: ConnectedHttpServer) => {
-                    receiverProfileInfo = profile
-                    postAxiosRequest(url + '/profile', { name: 'Old Client', data: profile }).then((profileInfo: any) => {
-                        writeFile(profileInfo.message, (profileInfo.message as ConnectedHttpServer).id).then((data: any) => {
-                            console.log(`Assigned client Name: ${(data.message as ConnectedHttpServer).id}`)
+            if (receiverProfileInfo) {
+                console.log(`Is Old profile, reconnecting with server`)
+                checkOwnClientInfo(receiverProfileInfo.id).then((profile: ConnectedHttpServer) => {
+                    receiverProfileInfo!.id = profile.id
+                    console.log(`jsonfile.`, profile)
+                    postAxiosRequest(url + '/profile', { name: 'Old Client', data: profile }).then((profileInfo: { name: string, message: { id: string } }) => {
+                        writeFile(profileInfo.message).then((data: any) => {
+                            console.log(`Assigned new client Id: ${(data.message as ConnectedHttpServer).id}`)
                             receiverProfileInfo = data.message as ConnectedHttpServer
-                            writeFile(data.message as ConnectedHttpServer, (data.message as ConnectedHttpServer).id).then(() => {
+                            writeFile(data.message).then(() => {
                                 event.next({
                                     id: uuidv4(),
                                     event: 'Server Connected',
@@ -62,10 +62,13 @@ export async function initiateClientToServer(url: string, event: Subject<Transpo
                                 resolve(clientObj)
                             }
                         })
+                    }).catch((error: AxiosError) => {
+                        reject(error)
                     })
                 }).catch((error) => {
-                    postAxiosRequest(url + '/profile', { name: 'New Client', data: null }).then((profileInfo: any) => {
-                        updateProfileAndPublishEvent(clientName, receiverProfileInfo, profileInfo, event, connectedHttpServers).then((receiverProfileInfo) => {
+                    console.error(error)
+                    postAxiosRequest(url + '/profile', { name: 'New Client', data: null }).then((profileInfo: { name: string, message: any }) => {
+                        updateProfileAndPublishEvent((receiverProfileInfo as ConnectedHttpServer), profileInfo, event, connectedHttpServers).then((receiverProfileInfo) => {
                             resolve(receiverProfileInfo)
                         })
                     }).catch((error) => {
@@ -74,8 +77,8 @@ export async function initiateClientToServer(url: string, event: Subject<Transpo
                     reject(error)
                 })
             } else {
-                postAxiosRequest(url + '/profile', { name: 'New Client', data: null }).then((profileInfo: any) => {
-                    updateProfileAndPublishEvent(clientName, receiverProfileInfo, profileInfo, event, connectedHttpServers).then((receiverProfileInfo) => {
+                postAxiosRequest(url + '/profile', { name: 'New Client', data: null }).then((profileInfo: { name: string, message: any }) => {
+                    updateProfileAndPublishEvent(receiverProfileInfo, profileInfo, event, connectedHttpServers).then((receiverProfileInfo) => {
                         resolve(receiverProfileInfo)
                     })
                 }).catch((error) => {
@@ -86,97 +89,102 @@ export async function initiateClientToServer(url: string, event: Subject<Transpo
     })
 }
 
+
 export function handleClientHttpConnection(url: string, server: ConnectedHttpServer): Observable<TransportEvent> {
     return new Observable((observer: Observer<TransportEvent>) => {
-        server.connectionState.next(`ONLINE`);
-
-        const longPoll = async (retryCount = 0) => {
-            try {
-                const controller = new AbortController();
-                const timeout = setTimeout(() => controller.abort(), 10000); // 10s timeout
-
-                const response = await fetch(url + `/poll`, { signal: controller.signal });
-                clearTimeout(timeout);
-
-                if (response.ok) {
-                    const data = (await response.json()) as WrappedMessage;
-                    observer.next({
-                        id: uuidv4(),
-                        event: 'New Message',
-                        data: {
-                            id: uuidv4(),
-                            dateCreated: new Date(),
-                            transport: Transport.Http,
-                            target: server.id,
-                            payload: data,
-                        } as TransportMessage,
+        server.connectionState.next('ONLINE');
+        let active: boolean = true; // Flag to control polling lifecycle
+
+        const longPoll = async () => {
+            while (active) {
+                try {
+                    // Axios request with timeout
+                    const response = await axios.get(`${url}/poll`, {
+                        timeout: 10000, // 10s timeout
                     });
-                } else if (response.status === 204) {
-                    console.log('No new messages from the server.');
-                } else {
-                    throw new Error(`Unexpected response status: ${response.status}`);
-                }
 
-                retryCount = 0; // Reset retry count on success
-            } catch (error: unknown) {
-                // Handle unknown errors
-                let errorMessage: string;
+                    if (response.status === 200) {
+                        const data = response.data as WrappedMessage;
+                        observer.next({
+                            id: uuidv4(),
+                            event: 'New Message',
+                            data: {
+                                id: uuidv4(),
+                                dateCreated: new Date(),
+                                transport: Transport.Http,
+                                target: server.id,
+                                payload: data,
+                            } as TransportMessage,
+                        });
+                    } else if (response.status === 204) {
+                        console.log('No new messages from the server.');
+                    } else {
+                        handleServerConnectionError(active, observer, server)
+                        throw new Error(`Unexpected response status: ${response.status}`);
+                    }
+                } catch (error: unknown) {
+                    handleServerConnectionError(active, observer, server)
+                    // Error handling with server disconnect notification
+                    let errorMessage: string;
+
+                    if (axios.isAxiosError(error)) {
+                        if (error.response) {
+                            errorMessage = `Server returned status ${error.response.status}: ${error.response.statusText}`;
+                        } else if (error.code === 'ECONNABORTED') {
+                            errorMessage = 'Request timed out.';
+                        } else {
+                            errorMessage = error.message || 'An Axios error occurred.';
+                        }
+                    } else if (error instanceof Error) {
+                        errorMessage = error.message;
+                    } else {
+                        errorMessage = 'An unknown error occurred during polling.';
+                    }
 
-                if (error instanceof Error) {
-                    // Error is of type Error
-                    errorMessage = error.message;
-                    console.error(`Polling error: ${errorMessage}`);
-                } else if (typeof error === 'string') {
-                    // Error is a string
-                    errorMessage = error;
                     console.error(`Polling error: ${errorMessage}`);
-                } else {
-                    // Fallback for unknown types
-                    errorMessage = 'An unknown error occurred during polling.';
-                    console.error(errorMessage);
+                    // observer.error(new Error(errorMessage)); // Notify subscribers of the error
+                    break; // Stop polling on error
                 }
-
-                retryCount++;
-                if (retryCount > 10) {
-                    console.error('Max retry attempts reached. Stopping polling.');
-                    observer.error(new Error(errorMessage)); // Notify observer of final failure
-                    return;
-                }
-
-                console.log(`Retrying (${retryCount})...`);
-                await new Promise((resolve) => setTimeout(resolve, retryCount * 1000)); // Exponential backoff
             }
-
-            setTimeout(() => longPoll(retryCount), 0);
         };
 
         longPoll();
 
+
+
+        // Cleanup logic for unsubscribing
         return () => {
             console.log('Unsubscribed from the long-polling channel.');
-            observer.next({
-                id: uuidv4(),
-                event: 'Server Disconnected',
-                data: {
-                    clientId: server.id,
-                    message: '',
-                    payload: {
-                        time: new Date()
-                    }
-                } as EventMessage
-            })
-            server.connectionState.next(`OFFLINE`);
+            observer.complete(); // Notify completion
         };
     });
 }
 
+function handleServerConnectionError(active: boolean, observer: Observer<TransportEvent>, server: ConnectedHttpServer): void {
+    console.log('Server lost connection');
+    active = false; // Stop polling
+    observer.next({
+        id: uuidv4(),
+        event: 'Server Disconnected',
+        data: {
+            clientId: server.id,
+            message: '',
+            payload: {
+                time: new Date(),
+            },
+        } as EventMessage,
+    });
+    server.connectionState.next('OFFLINE');
+}
+
+
 
-async function updateProfileAndPublishEvent(clientName: string | undefined, receiverProfileInfo: ConnectedHttpServer, profile: { name: string, message: any }, event: Subject<TransportEvent>, connectedHttpServers: ConnectedHttpServer[]): Promise<ConnectedHttpServer> {
+
+async function updateProfileAndPublishEvent(receiverProfileInfo: ConnectedHttpServer | undefined, profile: { name: string, message: any }, event: Subject<TransportEvent>, connectedHttpServers: ConnectedHttpServer[]): Promise<ConnectedHttpServer> {
     return new Promise((resolve, reject) => {
         console.log(`Assigned client Name: ${(profile.message as ConnectedHttpServer).id}`)
         receiverProfileInfo = profile.message as ConnectedHttpServer
-        writeFile(profile.message, (profile.message as ConnectedHttpServer).id).then(() => {
-            clientName = receiverProfileInfo.id
+        writeFile(profile.message).then(() => {
             event.next({
                 id: uuidv4(),
                 event: `New Server`,
@@ -208,7 +216,7 @@ async function postAxiosRequest(url: string, data: any): Promise<any> {
             resolve(response.data)
         } catch (error) {
             if (axios.isAxiosError(error)) {
-                console.error('Axios Error:', error.message);
+                console.error('Axios Error:', error.code);
             } else {
                 console.error('Unexpected Error:', error);
             }
@@ -217,6 +225,7 @@ async function postAxiosRequest(url: string, data: any): Promise<any> {
     })
 }
 
+
 export function handleHttpClient(clientInfo: ConnectedHttpClient, connectedClientHttp: ConnectedHttpClient[]): Observable<TransportEvent> {
     return new Observable((event: Observer<TransportEvent>) => {
         clientInfo.instance.post('/profile', (req, res) => {
@@ -376,50 +385,67 @@ export function startListeningAndStreaming(app: Express, client: ConnectedHttpCl
 
     app.get('/poll', (req, res) => {
         console.log('Client connected for long polling.');
-        eventListener.next({
-            id: uuidv4(),
-            event: 'Client Connected',
-            data: {
-                clientId: client.id,
-                message: '',
-                payload: {
-                    time: new Date()
-                }
-            } as EventMessage
-        })
+        client.connectionState.next('ONLINE');
+
+        // Flag to track if the response has been sent
+        let responseSent = false;
 
         // Subscribe to the data stream
-        const subscription: Subscription = client.responseStream.asObservable().subscribe({
+        const subscription = client.responseStream.asObservable().subscribe({
             next: (message: WrappedMessage) => {
-                console.log(`Sending data to client: ${message}`);
-                res.json({ message }); // Send the data to the client
-                subscription.unsubscribe(); // End the current request
+                if (!responseSent) {
+                    console.log(`Sending data to client: ${JSON.stringify(message)}`);
+                    res.json({ message }); // Send the data to the client
+                    responseSent = true; // Mark response as sent
+                    subscription.unsubscribe(); // Unsubscribe to close this request
+                }
             },
             error: (err) => {
-                console.error('Error in data stream:', err);
-                res.status(500).send('Internal Server Error');
+                if (!responseSent) {
+                    console.error('Error in data stream:', err);
+                    res.status(500).send('Internal Server Error');
+                    responseSent = true; // Mark response as sent
+                }
+                subscription.unsubscribe(); // Ensure cleanup
             },
             complete: () => {
-                console.log('Data stream completed.');
-                res.status(204).send(); // No Content
+                if (!responseSent) {
+                    console.log('Data stream completed.');
+                    res.status(204).send(); // No Content
+                    responseSent = true; // Mark response as sent
+                }
+                subscription.unsubscribe(); // Ensure cleanup
             },
         });
 
+        // Timeout if no data is emitted within a specified duration
+        const timeout = setTimeout(() => {
+            if (!responseSent) {
+                console.log('No data emitted. Sending timeout response.');
+                res.status(204).send(); // No Content
+                responseSent = true; // Mark response as sent
+                subscription.unsubscribe(); // Ensure cleanup
+            }
+        }, 15000); // 15 seconds timeout (adjust as needed)
+
         // Handle client disconnection
         res.on('close', () => {
-            console.error(`Http Client ${client.id} disconnected`);
-            eventListener.next({
-                id: uuidv4(),
-                event: 'Client Disconnected',
-                data: {
-                    clientId: client.id,
-                    message: '',
-                    payload: {
-                        time: new Date()
-                    }
-                } as EventMessage
-            })
-            subscription.unsubscribe(); // Ensure cleanup
+            if (!responseSent) {
+                console.error(`Http Client ${client.id} disconnected`);
+                eventListener.next({
+                    id: uuidv4(),
+                    event: 'Client Disconnected',
+                    data: {
+                        clientId: client.id,
+                        payload: {
+                            time: new Date()
+                        }
+                    } as EventMessage
+                } as TransportEvent)
+                client.connectionState.next(`OFFLINE`)
+                subscription.unsubscribe(); // Ensure cleanup
+            }
+            clearTimeout(timeout); // Clear timeout to avoid unnecessary execution
         });
     });
 
@@ -446,21 +472,20 @@ export async function checkOwnClientInfo(filename?: string): Promise<ConnectedHt
 
             } catch (err) {
                 // Handle parsing errors or other file-related errors
-                console.error("Error reading or parsing file:", err);
-                reject('');
+                let errMsg: string = ("Error reading or parsing file: " + err)
+                reject(errMsg);
             }
         } else {
-            console.error("File does not exist");
-            reject('');
+            reject("File does not exist");
         }
     })
 }
 
 // Specifically to write receiver profile information
-export async function writeFile(data: ConnectedHttpServer, filename: string): Promise<boolean> {
+export async function writeFile(data: { id: string }): Promise<boolean> {
     return new Promise((resolve, reject) => {
         // Write JSON data to a file
-        fs.writeFile(`${filename}.json`, JSON.stringify(data, null, 2), (err) => {
+        fs.writeFile(`${data.id}.json`, JSON.stringify(data, null, 2), (err) => {
             if (err) {
                 console.error('Error writing file', err);
                 reject(false)

+ 21 - 23
src/utils/socket.utils.ts

@@ -59,16 +59,14 @@ export async function startClientSocketConnection(serverUrl: string): Promise<Cl
 // After establishing connection to the server, set up the credentials, confirm whether or not if there's any credentials, if not ask for one from the server
 export function handleClientSocketConnection(socket: ClientSocket, serversConnected: ConnectedServerSocket[]): Observable<TransportEvent> {
     return new Observable((eventNotification: Observer<TransportEvent>) => {
-        let clientName!: string
         let buffer: any[] = []
         let receiverProfileInfo!: ConnectedServerSocket
 
         // Listen for a connection event
         socket.on('connect', () => {
             console.log('Connected to the server:', socket.id)
-            if (clientName) {
-                checkOwnClientInfo(clientName).then((profile: ConnectedServerSocket) => {
-                    receiverProfileInfo = profile
+            if (receiverProfileInfo?.id) {
+                checkOwnClientInfo(receiverProfileInfo.id).then((profile: { id: string }) => {
                     socket.emit('profile', {
                         name: 'Old Client',
                         data: profile
@@ -113,10 +111,15 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
         socket.on('profile', (data: { name: string, message: any }) => {
             console.log(data)
             if (data.name == 'New Profile') {
-                console.log(`Assigned client Name: ${(data.message as ConnectedServerSocket).id}`)
-                receiverProfileInfo = data.message as ConnectedServerSocket
+                console.log(`Assigned client Name: ${data.message.id}`)
+                // Update websocket instance record
+                receiverProfileInfo = {
+                    id: data.message.id,
+                    dateCreated: new Date(),
+                    socketInstance: socket,
+                    connectionState: new BehaviorSubject<ConnectionState>(`ONLINE`)
+                }
                 writeFile(data.message as ConnectedServerSocket, (data.message as ConnectedServerSocket).id).then(() => {
-                    clientName = receiverProfileInfo.id
                     // broadcast event to allow retransmission to release buffer
                     eventNotification.next({
                         id: uuidv4(),
@@ -127,18 +130,20 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
                         } as EventMessage
                     })
                 }).catch((error) => { }) // do nothing at the moment. 
-                // Update websocket instance record
-                receiverProfileInfo = {
-                    id: (data.message as ConnectedServerSocket).id,
-                    dateCreated: new Date(),
-                    socketInstance: socket,
-                    connectionState: new BehaviorSubject<ConnectionState>(`OFFLINE`)
-                }
                 serversConnected.push(receiverProfileInfo)
             }
             if (data.name == 'Adjusted Profile') {
                 console.log(`Assigned client Name: ${(data.message as ConnectedServerSocket).id}`)
-                receiverProfileInfo = data.message as ConnectedServerSocket
+                // Update websocket instance record
+                let clientObj: ConnectedServerSocket | undefined = serversConnected.find(obj => obj.id === data.message.id)
+                if (clientObj) {
+                    receiverProfileInfo.id = (data.message.id)
+
+                    clientObj.id = receiverProfileInfo.id
+                    clientObj.socketInstance = socket
+                    clientObj.connectionState.next('ONLINE')
+                    console.log(`Just to make sure they are pointed accurately:`, `This should be ONLINE: ${receiverProfileInfo.connectionState.getValue()}`, `Id match? ${receiverProfileInfo.id == clientObj.id ? true : false}`)
+                }
                 writeFile(data.message as ConnectedServerSocket, (data.message as ConnectedServerSocket).id).then(() => {
                     // broadcast event to allow retransmission to release buffer
                     eventNotification.next({
@@ -150,13 +155,6 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
                         } as EventMessage
                     })
                 }).catch((error) => { }) // do nothing at the moment. 
-                // Update websocket instance record
-                let clientObj: ConnectedServerSocket | undefined = serversConnected.find(obj => obj.id === (data.message as ConnectedServerSocket).id)
-                if (clientObj) {
-                    receiverProfileInfo.connectionState = clientObj.connectionState
-                    clientObj.socketInstance = receiverProfileInfo.socketInstance
-                    clientObj.connectionState.next('ONLINE')
-                }
             }
             if (data.name == 'Error') {
                 console.log(`Server cannot find credentials`, data.message)
@@ -332,7 +330,7 @@ export async function checkIfClientExists(id: string, filePath: string = 'client
 
 
 // Check if filename exists. Return profile information if there's any
-export async function checkOwnClientInfo(filename?: string): Promise<ConnectedServerSocket> {
+export async function checkOwnClientInfo(filename?: string): Promise<{ id: string }> {
     return new Promise((resolve, reject) => {
         // Check if the file exists
         if (fs.existsSync(`${filename}.json`)) {