Explorar o código

some fixes and adjustments on http utils

Dr-Swopt hai 4 semanas
pai
achega
b3460aa262

+ 2 - 2
.env

@@ -1,5 +1,5 @@
 ;Transport = "Websocket, Http"
 ;PORT = 3000, 3001
-Transport = "Websocket"
-# Transport = "Http"
+# Transport = "Websocket"
+Transport = "Http"
 PORT = 3000

+ 1 - 1
dist/config/config.json

@@ -1,5 +1,5 @@
 {
     "connection": {
-        "transmitter": "http://localhost:3001/"
+        "transmitter": "http://localhost:3000/"
     }
 }

+ 5 - 0
doc/explanation.txt

@@ -66,3 +66,8 @@ OF course, ample efforts to use request-response adapter will be allocated to fu
 be preserveed for consistencies.
 -So going forward, after the UML draft is done, continue with the http development, so that I can test out multiple use of transport services. Of course, I can
 probably also use 2 socket, but it just feels wrong.
+
+DevLog:
+As of 12/12/2024: 
+-Fixed the axios error by cutting out the timeout param, but it also expose other problems, in that the reconnection logic needs to be fixed, because it always
+reconnect under new profile?? have to check again.

+ 3 - 3
logSetting.json

@@ -1,12 +1,12 @@
 {
-    "base": true,
+    "base": false,
     "managers": true,
     "transmission": true,
     "adapter": true,
-    "transport": false,
+    "transport": true,
     "error": true,
     "util": true,
     "details": false,
-    "location": true,
+    "location": false,
     "retransmission": true
 }

+ 1 - 1
src/config/config.json

@@ -1,5 +1,5 @@
 {
     "connection": {
-        "transmitter": "http://localhost:3001/"
+        "transmitter": "http://localhost:3000/"
     }
 }

+ 190 - 4
src/test/proxy.ts

@@ -1,14 +1,16 @@
 import { Socket as ClientSocket, io } from 'socket.io-client'
 import { Server, Socket as SocketForConnectedClient } from "socket.io"
-import { Subject } from "rxjs";
-import { createServer } from "http";
+import { Observable, Subject } from "rxjs";
+import { createServer, request as httpRequest } from "http";
+import http, { IncomingMessage, ServerResponse } from 'http';
 
 let fromServer = new Subject<{ event: 'profile' | 'message', payload: any }>()
 let toServer = new Subject<{ event: 'profile' | 'message', payload: any }>()
 
-startSocketServer(3001)
+// startSocketServer(3001)
 // startSocketServer(3002)
-startClientSocketConnection('http://localhost:3000')
+// startClientSocketConnection('http://localhost:3000')
+startHttpServer(3001, 'http://localhost:3000/response')
 consoleLog()
 
 function consoleLog(): void {
@@ -82,3 +84,187 @@ function startClientSocketConnection(serverUrl: string): void {
     })
 }
 
+
+function startHttpServer(port: number, clientUrl: string): void {
+    // Create an observable from the client connection
+    const clientObservable = startHttpClientConnection(clientUrl);
+
+    const server = http.createServer(async (req: IncomingMessage, res: ServerResponse) => {
+        const { method, url } = req;
+
+        if (method === 'POST' && url === '/profile') {
+            // Forward the /profile request to another server (e.g., localhost:3000)
+            let body = '';
+            req.on('data', chunk => (body += chunk));
+            req.on('end', () => {
+                const options = {
+                    hostname: 'localhost',
+                    port: 3000,
+                    path: '/profile',
+                    method: 'POST',
+                    headers: {
+                        'Content-Type': 'application/json',
+                        'Content-Length': Buffer.byteLength(body),
+                    },
+                };
+
+                const forwardReq = http.request(options, forwardRes => {
+                    let forwardBody = '';
+                    forwardRes.on('data', chunk => (forwardBody += chunk));
+                    forwardRes.on('end', () => {
+                        res.writeHead(forwardRes.statusCode || 200, { 'Content-Type': 'application/json' });
+                        res.end(forwardBody); // Send back the forwarded response
+                    });
+                });
+
+                forwardReq.on('error', error => {
+                    console.error('Error forwarding /profile request:', error);
+                    res.writeHead(500, { 'Content-Type': 'application/json' });
+                    res.end(JSON.stringify({ error: 'Failed to forward request' }));
+                });
+
+                forwardReq.write(body); // Send the original request body
+                forwardReq.end();
+            });
+        } else if (method === 'POST' && url === '/response') {
+            // Handle long-polling using the observable
+            res.writeHead(200, { 'Content-Type': 'application/json' });
+
+            const subscription = clientObservable.subscribe({
+                next: data => {
+                    res.write(JSON.stringify(data)); // Send data to the client
+                    res.end(); // Close the response for this poll
+                    subscription.unsubscribe(); // Unsubscribe after sending one response
+                },
+                error: error => {
+                    console.error('Error in observable:', error);
+                    res.write(JSON.stringify({ error: 'Error occurred' }));
+                    res.end();
+                },
+            });
+
+            // Cleanup subscription if client disconnects
+            req.on('close', () => {
+                subscription.unsubscribe();
+            });
+        } else {
+            // Default 404 handler
+            res.writeHead(404, { 'Content-Type': 'application/json' });
+            res.end(JSON.stringify({ error: 'Not Found' }));
+        }
+    });
+
+    server.listen(port, () => {
+        console.log(`Server is running on http://localhost:${port}`);
+    });
+};
+
+
+/**
+ * Starts a long-polling HTTP connection to the given URL and channels responses into an Observable.
+ * @param url - The URL to send the long-polling requests to.
+ * @returns Observable<any> - An observable emitting the response data for each poll.
+ */
+function startHttpClientConnection(url: string): Observable<any> {
+    return new Observable(subscriber => {
+        const poll = () => {
+            const options = new URL(url);
+
+            const req = http.request(
+                {
+                    hostname: options.hostname,
+                    port: options.port,
+                    path: options.pathname + options.search,
+                    method: 'GET',
+                },
+                res => {
+                    let body = '';
+
+                    // Accumulate data chunks
+                    res.on('data', chunk => {
+                        body += chunk;
+                    });
+
+                    // On response end, emit the data and start the next poll
+                    res.on('end', () => {
+                        try {
+                            const parsedData = JSON.parse(body);
+                            subscriber.next(parsedData);
+                            poll(); // Start the next poll
+                        } catch (error: any) {
+                            subscriber.error('Error parsing response: ' + error.message);
+                        }
+                    });
+                }
+            );
+
+            // Handle request errors
+            req.on('error', error => {
+                subscriber.error('Request failed: ' + error.message);
+            });
+
+            req.end(); // Send the request
+        };
+
+        poll(); // Start the polling loop
+
+        // Cleanup logic: teardown observable if unsubscribed
+        return () => {
+            console.log('Stopping long-polling connection.');
+        };
+    });
+}
+
+
+
+// Utility function to parse JSON body
+const parseBody = (req: IncomingMessage): Promise<any> => {
+    return new Promise((resolve, reject) => {
+        let body = '';
+        req.on('data', chunk => {
+            body += chunk;
+        });
+        req.on('end', () => {
+            try {
+                resolve(JSON.parse(body));
+            } catch (error) {
+                reject(error);
+            }
+        });
+        req.on('error', reject);
+    });
+};
+
+// Function to forward request to another server and get the response
+const forwardRequest = (data: any): Promise<any> => {
+    return new Promise((resolve, reject) => {
+        const options = {
+            hostname: 'localhost',
+            port: 3000,
+            path: '/profile',
+            method: 'POST',
+            headers: {
+                'Content-Type': 'application/json',
+                'Content-Length': Buffer.byteLength(JSON.stringify(data)),
+            },
+        };
+
+        const req = httpRequest(options, res => {
+            let responseBody = '';
+            res.on('data', chunk => {
+                responseBody += chunk;
+            });
+            res.on('end', () => {
+                try {
+                    resolve(JSON.parse(responseBody));
+                } catch (error) {
+                    reject(error);
+                }
+            });
+        });
+
+        req.on('error', reject);
+        req.write(JSON.stringify(data));
+        req.end();
+    });
+};

+ 0 - 2
src/test/receiver.ts

@@ -25,9 +25,7 @@ class Supervisor {
 
             this.handleActivity(transmissionSet)
             this.outgoingPipe.subscribe(message => transmissionSet.transmitter.emit(message))
-
         })
-
     }
 
     // only called once for each connected clients.

+ 5 - 3
src/transmission/msg.transmission.transmitter.ts

@@ -47,13 +47,14 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
             distinctUntilChanged()
         ).subscribe((signal: ConnectionState) => {
             this.connectionStateEvent.next(signal)
-            if(signal == 'OFFLINE') this.console.error({message: `${this.transmitterProfile.id} disconnected`})
-            if(signal == 'ONLINE') this.console.log({message: `${this.transmitterProfile.id} connected`})
+            if (signal == 'OFFLINE') this.console.error({ message: `${this.transmitterProfile.id} disconnected` })
+            if (signal == 'ONLINE') this.console.log({ message: `${this.transmitterProfile.id} connected` })
         })
         this.retransmission.implementRetransmission(this.messageToBeTransmitted, this.connectionStateEvent.asObservable(), true)
         // automatically subscribe to allow released bffered messages to be released
         this.retransmission.returnSubjectForBufferedItems().subscribe((bufferedMessage: WrappedMessage) => {
             // need to work with wrapped messages
+            this.console.log({ message: `Releasing ${bufferedMessage.thisMessageID}` });
             (this.mainAdapter as TransmitterAdapter).emit(bufferedMessage)
         })
     }
@@ -63,7 +64,7 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
     }
 
     emit(message: FisMessage): void {
-        this.console.log({ message: `${this.connectionStateEvent.getValue() == 'ONLINE' ? `Transmitting ${message.header.messageID}` : `Buffering ${message.header.messageID}`}` })
+        this.console.log({ message: `${this.connectionStateEvent.getValue() == 'ONLINE' ? `Transmitting message` : `Buffering message`}` })
         this.messageToBeTransmitted.next(message)
     }
 
@@ -77,6 +78,7 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
             filter(event => event.event == 'Re-Flush'),
             filter(event => (event.data as EventMessage).clientId == this.transmitterProfile.id),
         ).subscribe((event: TransportEvent) => {
+            this.console.log({ message: `${this.connectionStateEvent.getValue() == 'ONLINE' ? `Transmitting ${(((event.data as EventMessage).payload as TransportMessage).payload as WrappedMessage).thisMessageID}` : `Buffering ${(((event.data as EventMessage).payload as TransportMessage).payload as WrappedMessage).thisMessageID}`}` })
             this.messageToBeTransmitted.next((((event.data as EventMessage).payload as TransportMessage).payload as WrappedMessage))
         })
     }

+ 35 - 29
src/transport/http.ts

@@ -8,9 +8,10 @@ import { WrappedMessage } from '../utils/message.ordering';
 import { error } from 'console';
 import axios, { AxiosError } from 'axios';
 import { EventMessage } from '../interface/transport.interface';
+import ConsoleLogger from '../utils/log.utils';
 
 export class HttpTransportService implements TransportService {
-    private retryLogicStarted: boolean = false
+    private console: ConsoleLogger = new ConsoleLogger(`HttpTransportService`, ['transport'])
     private baseUrl!: string;
     private info: Transport = Transport.Http
     private connectedHttpServer: ConnectedHttpServer[] = [] // to allow the possibility of having to communicate with multiple servers as a client
@@ -34,15 +35,14 @@ export class HttpTransportService implements TransportService {
         if (clientObj && clientObj.connectionState.getValue() == 'ONLINE') {
             clientObj.responseStream.next(message.payload as WrappedMessage)
         }
-        // for client usage
-        if (serverObj && serverObj.connectionState.getValue() == 'ONLINE') {
-            axios.post(`${this.baseUrl}message`, message.payload, {
-                headers: { 'Content-Type': 'application/json' },
-            })
-                .then((response) => {
-                    console.log('Response From Server:', response.status);
-                })
-                .catch((error: AxiosError) => {
+        // for client usage 
+        if (serverObj) {
+            if (serverObj.connectionState.getValue() == 'ONLINE') {
+                axios.post(`${this.baseUrl}message`, message.payload, {
+                    headers: { 'Content-Type': 'application/json' },
+                }).then((response) => {
+                    this.console.log({ message: `Response From Server: ${response.data}`, details: response.status });
+                }).catch((error: AxiosError) => {
                     console.error('HTTP emit error:', error.code);
                     this.transportEvent.next({
                         id: uuidv4(),
@@ -53,6 +53,17 @@ export class HttpTransportService implements TransportService {
                         } as EventMessage
                     } as TransportEvent)
                 });
+            } else {
+                this.console.error({ message: `Target Server is offline: Reflusing message ${(message.payload as WrappedMessage).thisMessageID}` });
+                this.transportEvent.next({
+                    id: uuidv4(),
+                    event: 'Re-Flush',
+                    data: {
+                        clientId: serverObj.id,
+                        payload: message
+                    } as EventMessage
+                } as TransportEvent)
+            }
         }
 
     }
@@ -70,34 +81,29 @@ export class HttpTransportService implements TransportService {
                     complete: () => (`Client ${client.id} disconnected...`)
                 })
             },
-            error: error => console.error(error),
-            complete: () => console.log(`...`)
+            error: error => this.console.error({ message: 'Observer Error', details: error }),
+            complete: () => this.console.log({ message: '...' })
         })
     }
 
     public startClient(url: string, receiverProfileInfo?: ConnectedHttpServer | undefined): void {
         initiateClientToServer(url, this.transportEvent, this.connectedHttpServer, receiverProfileInfo).then((connectedHttpServer: ConnectedHttpServer) => {
-            handleClientHttpConnection(url, connectedHttpServer).subscribe(this.transportEvent)
-            if (!this.retryLogicStarted) this.retryConnection(url) // standby reconnection logic
-        }).catch((error: AxiosError) => {
-            console.error(`HttpTransport ERROR:`, error.code)
+            handleClientHttpConnection(url, connectedHttpServer).subscribe({
+                next: event => this.transportEvent.next(event),
+                error: error => this.console.log({ message: `Observer Error`, details: error }),
+                complete: () => {
+                    this.console.log({ message: `Re-connecting with server` })
+                    this.startClient(url, connectedHttpServer)
+                }
+            })
+        }).catch((error: { error: AxiosError, objRef: ConnectedHttpServer | undefined }) => {
+            this.console.error({ message: `HttpTransport ERROR: <InitiateClientToServerFailure>`, details: error.error.code })
             setTimeout(() => {
-                console.info(`Reconnecting to server....`)
-                this.startClient(url)
-            }, 5000); // Retry with delay
+                this.startClient(url, error.objRef)
+            }, 3000); // Retry with delay
         })
     }
 
-    private retryConnection(url: string): void {
-        this.transportEvent.pipe(
-            filter(event => event.event === 'Server Disconnected'),
-            take(1) // Automatically unsubscribe after one emission
-        ).subscribe((event: TransportEvent) => {
-            console.log(`Server activity detected: Re-connect-ing to server ${(event.data as EventMessage).clientId}...`)
-            this.startClient(url, this.connectedHttpServer.find(obj => obj.id === (event.data as EventMessage).clientId)); // Restart the client
-        });
-    }
-
 }
 
 

+ 93 - 72
src/utils/http.utils.ts

@@ -8,7 +8,8 @@ import { ConnectionState, Transport, TransportEvent, TransportMessage } from '..
 import { EventMessage, FisMessage } from '../interface/transport.interface';
 import { WrappedMessage } from './message.ordering';
 import axios, { AxiosError, AxiosResponse } from 'axios';
-import { error } from 'console';
+import ConsoleLogger from './log.utils';
+const console: ConsoleLogger = new ConsoleLogger(`HttpUtils`, ['transport'])
 
 export function startHttpServer(port: number): Observable<ConnectedHttpClient> {
     return new Observable((observer: Observer<ConnectedHttpClient>) => {
@@ -18,7 +19,7 @@ export function startHttpServer(port: number): Observable<ConnectedHttpClient> {
         app.use(express.json());
 
         app.listen(port, () => {
-            console.log(`Server running at http://localhost:${port}`);
+            console.log({ message: `Server running at http://localhost:${port}` });
         });
 
         observer.next({
@@ -36,34 +37,32 @@ export async function initiateClientToServer(url: string, event: Subject<Transpo
             // logic here for using browser fetch
         } else { // axios methods
             if (receiverProfileInfo) {
-                console.log(`Is Old profile, reconnecting with server`)
+                console.log({ message: `Is Old profile, reconnecting with server` })
                 checkOwnClientInfo(receiverProfileInfo.id).then((profile: ConnectedHttpServer) => {
                     receiverProfileInfo!.id = profile.id
-                    console.log(`jsonfile.`, profile)
-                    postAxiosRequest(url + '/profile', { name: 'Old Client', data: profile }).then((profileInfo: { name: string, message: { id: string } }) => {
-                        writeFile(profileInfo.message).then((data: any) => {
-                            console.log(`Assigned new client Id: ${(data.message as ConnectedHttpServer).id}`)
-                            receiverProfileInfo = data.message as ConnectedHttpServer
-                            writeFile(data.message).then(() => {
-                                event.next({
-                                    id: uuidv4(),
-                                    event: 'Server Connected',
-                                    data: {
-                                        clientId: (data.message as ConnectedHttpServer).id,
-                                        message: `Existing Http Channel ${(data.message as ConnectedHttpServer).id} re-established.`
-                                    } as EventMessage
-                                })
-                            })
-                            // Update Http instance record
-                            let clientObj: ConnectedHttpServer | undefined = connectedHttpServers.find(obj => obj.id === (data.message as ConnectedHttpServer).id)
-                            if (clientObj) {
-                                receiverProfileInfo.connectionState = clientObj.connectionState
-                                clientObj.connectionState.next('ONLINE')
-                                resolve(clientObj)
-                            }
+                    // console.log({ message: 'jsonfile', details: profile })
+                    postAxiosRequest(url + '/profile', { name: 'Old Client', message: profile }).then((profileInfo: { name: string, message: { id: string } }) => {
+                        console.log({ message: `Acknowledged as previous client. Id: ${profileInfo.message.id}` })
+                        event.next({
+                            id: uuidv4(),
+                            event: 'Server Connected',
+                            data: {
+                                clientId: profileInfo.message.id,
+                                message: `Existing Http Channel ${profileInfo.message.id} re-established.`
+                            } as EventMessage
                         })
+                        // Update Http instance record
+                        let clientObj: ConnectedHttpServer | undefined = connectedHttpServers.find(obj => obj.id === profileInfo.message.id)
+                        console.log({ message: 'ClientObj', details: clientObj })
+                        console.log({ message: 'ReceiverProfile', details: receiverProfileInfo })
+                        if (clientObj) {
+                            clientObj.connectionState.next('ONLINE')
+                            console.log({ message: receiverProfileInfo.connectionState.getValue() })
+                            resolve(clientObj)
+                        }
+
                     }).catch((error: AxiosError) => {
-                        reject(error)
+                        reject({ error: error, objRef: receiverProfileInfo })
                     })
                 }).catch((error) => {
                     console.error(error)
@@ -72,9 +71,9 @@ export async function initiateClientToServer(url: string, event: Subject<Transpo
                             resolve(receiverProfileInfo)
                         })
                     }).catch((error) => {
-                        reject(error)
+                        reject({ error: error, objRef: receiverProfileInfo })
                     })
-                    reject(error)
+                    reject({ error: error, objRef: receiverProfileInfo })
                 })
             } else {
                 postAxiosRequest(url + '/profile', { name: 'New Client', data: null }).then((profileInfo: { name: string, message: any }) => {
@@ -82,16 +81,16 @@ export async function initiateClientToServer(url: string, event: Subject<Transpo
                         resolve(receiverProfileInfo)
                     })
                 }).catch((error) => {
-                    reject(error)
+                    reject({ error: error, objRef: receiverProfileInfo })
                 })
             }
         }
     })
 }
 
-
+// For client usage
 export function handleClientHttpConnection(url: string, server: ConnectedHttpServer): Observable<TransportEvent> {
-    return new Observable((observer: Observer<TransportEvent>) => {
+    return new Observable((eventNotification: Observer<TransportEvent>) => {
         server.connectionState.next('ONLINE');
         let active: boolean = true; // Flag to control polling lifecycle
 
@@ -99,13 +98,15 @@ export function handleClientHttpConnection(url: string, server: ConnectedHttpSer
             while (active) {
                 try {
                     // Axios request with timeout
+                    // const response = await axios.get(`${url}/poll`); // removing the timeout temporarily. 
                     const response = await axios.get(`${url}/poll`, {
-                        timeout: 10000, // 10s timeout
+                        timeout: 3000, // 10s timeout this one will trigger error. That's why it keeps on throwing error
                     });
 
                     if (response.status === 200) {
-                        const data = response.data as WrappedMessage;
-                        observer.next({
+                        const data = response.data;
+                        console.log({ message: 'Long Poll Response', details: data })
+                        eventNotification.next({
                             id: uuidv4(),
                             event: 'New Message',
                             data: {
@@ -113,17 +114,19 @@ export function handleClientHttpConnection(url: string, server: ConnectedHttpSer
                                 dateCreated: new Date(),
                                 transport: Transport.Http,
                                 target: server.id,
-                                payload: data,
+                                payload: data.message,
                             } as TransportMessage,
                         });
                     } else if (response.status === 204) {
-                        console.log('No new messages from the server.');
+                        console.log({ message: 'No new messages from the server.' });
                     } else {
-                        handleServerConnectionError(active, observer, server)
+                        console.error({ message: `Unexpected response status: ${response.status}` })
+                        handleServerConnectionError(active, eventNotification, server)
                         throw new Error(`Unexpected response status: ${response.status}`);
                     }
                 } catch (error: unknown) {
-                    handleServerConnectionError(active, observer, server)
+                    console.error({ message: `Unknown Error.`, details: error }) // culprit is here
+                    handleServerConnectionError(active, eventNotification, server)
                     // Error handling with server disconnect notification
                     let errorMessage: string;
 
@@ -141,7 +144,7 @@ export function handleClientHttpConnection(url: string, server: ConnectedHttpSer
                         errorMessage = 'An unknown error occurred during polling.';
                     }
 
-                    console.error(`Polling error: ${errorMessage}`);
+                    console.error({ message: `Polling error: ${errorMessage}` });
                     // observer.error(new Error(errorMessage)); // Notify subscribers of the error
                     break; // Stop polling on error
                 }
@@ -150,18 +153,17 @@ export function handleClientHttpConnection(url: string, server: ConnectedHttpSer
 
         longPoll();
 
-
-
         // Cleanup logic for unsubscribing
         return () => {
-            console.log('Unsubscribed from the long-polling channel.');
-            observer.complete(); // Notify completion
+            console.log({ message: 'Unsubscribed from the long-polling channel.' });
+            eventNotification.complete(); // Notify completion
         };
     });
 }
 
 function handleServerConnectionError(active: boolean, observer: Observer<TransportEvent>, server: ConnectedHttpServer): void {
-    console.log('Server lost connection');
+    server.connectionState.next('OFFLINE');
+    console.log({ message: 'Server lost connection' });
     active = false; // Stop polling
     observer.next({
         id: uuidv4(),
@@ -171,18 +173,16 @@ function handleServerConnectionError(active: boolean, observer: Observer<Transpo
             message: '',
             payload: {
                 time: new Date(),
+                objRef: server
             },
         } as EventMessage,
     });
-    server.connectionState.next('OFFLINE');
+    observer.complete()
 }
 
-
-
-
 async function updateProfileAndPublishEvent(receiverProfileInfo: ConnectedHttpServer | undefined, profile: { name: string, message: any }, event: Subject<TransportEvent>, connectedHttpServers: ConnectedHttpServer[]): Promise<ConnectedHttpServer> {
     return new Promise((resolve, reject) => {
-        console.log(`Assigned client Name: ${(profile.message as ConnectedHttpServer).id}`)
+        console.log({ message: `Assigned client Name: ${(profile.message as ConnectedHttpServer).id}` })
         receiverProfileInfo = profile.message as ConnectedHttpServer
         writeFile(profile.message).then(() => {
             event.next({
@@ -193,6 +193,15 @@ async function updateProfileAndPublishEvent(receiverProfileInfo: ConnectedHttpSe
                     message: `New Http Channel ${(profile.message as ConnectedHttpServer).id} established.`
                 } as EventMessage
             })
+            // broadcast event to allow retransmission to relase buffered messages
+            event.next({
+                id: uuidv4(),
+                event: `Server Connected`,
+                data: {
+                    clientId: (profile.message as ConnectedHttpServer).id,
+                    message: `Server ${(profile.message as ConnectedHttpServer).id} connected and ready to go.`
+                } as EventMessage
+            })
         }).catch((error) => {
             reject(error)
         })
@@ -212,13 +221,13 @@ async function postAxiosRequest(url: string, data: any): Promise<any> {
     return new Promise(async (resolve, reject) => {
         try {
             const response: AxiosResponse<any> = await axios.post(url, data);
-            console.log('Response:', response.data);
+            console.log({ message: 'Response', details: response.data });
             resolve(response.data)
         } catch (error) {
             if (axios.isAxiosError(error)) {
-                console.error('Axios Error:', error.code);
+                console.error({ message: 'Axios Error:', details: error.code });
             } else {
-                console.error('Unexpected Error:', error);
+                console.error({ message: 'Unexpected Error:', details: error });
             }
             reject(error)
         }
@@ -263,7 +272,8 @@ function handleProfile(app: Express, data: { name: `Old Client` | `New Client`,
         connectedClientHttp.push(clientInstance)
         addClientToDB(clientInstance)
         startListeningAndStreaming(app, clientInstance, event)
-    } else {
+    } else if (data.name == 'Old Client') {
+        console.log({ message: `Is old client`, details: data })
         // update first
         let clientInstance: ConnectedHttpClient | undefined
         if (connectedClientHttp.length > 0) {
@@ -278,7 +288,7 @@ function handleProfile(app: Express, data: { name: `Old Client` | `New Client`,
         }
         function handleFoundClient(clientInstance: ConnectedHttpClient | undefined): void {
             if (clientInstance) {
-                console.log(`Http Client ${clientInstance.id} Found`)
+                console.log({ message: `Http Client ${clientInstance.id} Found` })
                 res.json({ name: 'Adjusted Profile', message: { id: clientInstance.id } })
                 // replace socket instance since the previous has been terminated
                 clientInstance.instance = app
@@ -287,7 +297,7 @@ function handleProfile(app: Express, data: { name: `Old Client` | `New Client`,
                     clientInstance.connectionState = new BehaviorSubject<ConnectionState>(`OFFLINE`)
                 }
                 // need to start listening again, because it's assigned a different socket instance this time round
-                startListeningAndStreaming(app, clientInstance, event)
+                startListeningAndStreaming(app, clientInstance, event, true)
                 event.next({
                     id: uuidv4(),
                     event: 'Client Connected',
@@ -299,7 +309,7 @@ function handleProfile(app: Express, data: { name: `Old Client` | `New Client`,
                 })
 
             } else {
-                console.log(`Profile Not Found`)
+                console.log({ message: `Profile Not Found` })
                 res.json({ name: 'Error', message: 'Receiver Profile Not found' })
             }
         }
@@ -313,7 +323,7 @@ export async function checkIfClientExists(id: string, filePath: string = 'client
         try {
             // Check if the file exists
             if (!fs.existsSync(filePath)) {
-                console.log("File does not exist.");
+                console.log({ message: "File does not exist." });
                 reject('File does not exist');
             }
 
@@ -325,14 +335,14 @@ export async function checkIfClientExists(id: string, filePath: string = 'client
             let obj = data.find(entry => entry.id === id);
 
             if (obj) {
-                console.log(`Client with ID ${id} exists.`);
+                console.log({ message: `Client with ID ${id} exists.` });
             } else {
-                console.log(`Client with ID ${id} does not exist.`);
+                console.log({ message: `Client with ID ${id} does not exist.` });
             }
 
             resolve(obj);
         } catch (error) {
-            console.error('Error reading the file:', error);
+            console.error({ message: 'Error reading the file:', details: error });
             reject(`Error reading the file`)
         }
     })
@@ -359,14 +369,14 @@ export function addClientToDB(entry: ConnectedHttpClient, filePath: string = 'cl
 
         // Write the updated array back to the file
         fs.writeFileSync(filePath, JSON.stringify(data, null, 2), 'utf-8');
-        console.log(`Entry added successfully.`);
+        console.log({ message: `Entry added successfully.` });
     } catch (error) {
-        console.error('Error writing to file:', error);
+        console.error({ message: 'Error writing to file:', details: error });
     }
 }
 
 // this is for server usage only
-export function startListeningAndStreaming(app: Express, client: ConnectedHttpClient, eventListener: Observer<TransportEvent>): void {
+export function startListeningAndStreaming(app: Express, client: ConnectedHttpClient, eventListener: Observer<TransportEvent>, oldClient?: boolean): void {
     /* Generally, we don't need this unless in the case of being the receiver */
     app.post('/message', (req, res) => {
         eventListener.next({
@@ -380,13 +390,24 @@ export function startListeningAndStreaming(app: Express, client: ConnectedHttpCl
                 payload: req.body
             } as TransportMessage
         })
-        res.json(`Received ${(req.body as FisMessage)?.header?.messageID ?? `Undefined`}`)
+        res.json(`Received ${((req.body as WrappedMessage)?.payload as FisMessage)?.header?.messageID ?? `Undefined`}`)
     })
 
     app.get('/poll', (req, res) => {
-        console.log('Client connected for long polling.');
+        console.log({ message: 'Client connected for long polling.' });
         client.connectionState.next('ONLINE');
 
+        // notify it's associated retransmission to start releaseing buffer
+        eventListener.next({
+            id: uuidv4(),
+            event: oldClient ? 'Client Re-connected' : `Client Connected`,
+            data: {
+                clientId: client.id,
+                message: `Socket Client ${oldClient ? `Re-Connected` : `Connected`}. Adapter ID assigned: ${client.id}`,
+                payload: client
+            } as EventMessage
+        })
+
         // Flag to track if the response has been sent
         let responseSent = false;
 
@@ -394,7 +415,7 @@ export function startListeningAndStreaming(app: Express, client: ConnectedHttpCl
         const subscription = client.responseStream.asObservable().subscribe({
             next: (message: WrappedMessage) => {
                 if (!responseSent) {
-                    console.log(`Sending data to client: ${JSON.stringify(message)}`);
+                    console.log({ message: `Sending data to client: ${JSON.stringify(message)}` });
                     res.json({ message }); // Send the data to the client
                     responseSent = true; // Mark response as sent
                     subscription.unsubscribe(); // Unsubscribe to close this request
@@ -402,7 +423,7 @@ export function startListeningAndStreaming(app: Express, client: ConnectedHttpCl
             },
             error: (err) => {
                 if (!responseSent) {
-                    console.error('Error in data stream:', err);
+                    console.error({ message: 'Error in data stream:', details: err });
                     res.status(500).send('Internal Server Error');
                     responseSent = true; // Mark response as sent
                 }
@@ -410,7 +431,7 @@ export function startListeningAndStreaming(app: Express, client: ConnectedHttpCl
             },
             complete: () => {
                 if (!responseSent) {
-                    console.log('Data stream completed.');
+                    console.log({ message: 'Data stream completed.' });
                     res.status(204).send(); // No Content
                     responseSent = true; // Mark response as sent
                 }
@@ -421,7 +442,7 @@ export function startListeningAndStreaming(app: Express, client: ConnectedHttpCl
         // Timeout if no data is emitted within a specified duration
         const timeout = setTimeout(() => {
             if (!responseSent) {
-                console.log('No data emitted. Sending timeout response.');
+                console.log({ message: 'No data emitted. Sending timeout response.' });
                 res.status(204).send(); // No Content
                 responseSent = true; // Mark response as sent
                 subscription.unsubscribe(); // Ensure cleanup
@@ -431,7 +452,7 @@ export function startListeningAndStreaming(app: Express, client: ConnectedHttpCl
         // Handle client disconnection
         res.on('close', () => {
             if (!responseSent) {
-                console.error(`Http Client ${client.id} disconnected`);
+                console.error({ message: `Http Client ${client.id} disconnected` });
                 eventListener.next({
                     id: uuidv4(),
                     event: 'Client Disconnected',
@@ -487,10 +508,10 @@ export async function writeFile(data: { id: string }): Promise<boolean> {
         // Write JSON data to a file
         fs.writeFile(`${data.id}.json`, JSON.stringify(data, null, 2), (err) => {
             if (err) {
-                console.error('Error writing file', err);
+                console.error({ message: 'Error writing file', details: err });
                 reject(false)
             } else {
-                console.log('File has been written');
+                console.log({ message: 'File has been written' });
                 resolve(true)
             }
         });

+ 2 - 7
src/utils/socket.utils.ts

@@ -147,16 +147,11 @@ export function handleClientSocketConnection(socket: SocketForConnectedServer, s
                 // Update websocket instance record
                 let clientObj: ConnectedSocketServer | undefined = serversConnected.find(obj => obj.id === data.message.id)
                 if (clientObj) {
-                    receiverProfileInfo.id = (data.message.id)
-
-                    clientObj.id = receiverProfileInfo.id
                     clientObj.socketInstance = socket
                     clientObj.connectionState.next('ONLINE')
                     console.log({
-                        message: `Just to make sure they are pointed accurately: This should be ONLINE: ${receiverProfileInfo.connectionState.getValue()} !! Id match? ${receiverProfileInfo.id == clientObj.id ? true : false}`,
+                        message: `Just to make sure they are pointed accurately: This should be ONLINE: ${receiverProfileInfo.connectionState.getValue()} !! Id match? ${receiverProfileInfo.id == clientObj.id ? true : false} && compare ${clientObj.id}`,
                     })
-                }
-                writeFile(data.message as ConnectedSocketServer, (data.message as ConnectedSocketServer).id).then(() => {
                     // broadcast event to allow retransmission to release buffer
                     eventNotification.next({
                         id: uuidv4(),
@@ -166,7 +161,7 @@ export function handleClientSocketConnection(socket: SocketForConnectedServer, s
                             message: `Existing Websocket Channel ${(data.message as ConnectedSocketServer).id} re-established.`
                         } as EventMessage
                     })
-                }).catch((error) => { }) // do nothing at the moment. 
+                }
             }
             if (data.name == 'Error') {
                 console.log({ message: `Server cannot find credentials`, details: data.message })