Prechádzať zdrojové kódy

Updated and tested with request response and addtional comments

enzo 2 týždňov pred
rodič
commit
9126cccdfe

+ 0 - 34
src/actor/transmission.actor.ts

@@ -1,34 +0,0 @@
-import { filter, map, Observable, Observer, Subject, Subscription, Unsubscribable } from "rxjs"
-import { ActorBase } from "../base/actor.base"
-import { FisMessage } from "../interface/interface"
-import { ActorInterface, ActorMessage, ActorProfile, TransmisisonProfile } from "../interface/actor.sample"
-import { unsubscribe } from "diagnostics_channel"
-import ConsoleLogger from "../utils/log.utils"
-
-export class TransmissionActor extends ActorBase<ActorMessage<FisMessage>> {
-    private transmissionProfile!: TransmisisonProfile
-    private console: ConsoleLogger = new ConsoleLogger(`TransmissionActor`, [`base`])
-
-    constructor(actorParam: ActorProfile<TransmisisonProfile>) {
-        super()
-        this.setup(actorParam)
-    }
-
-    protected handleMessageTransmission(messageToBeTransmitted: Observable<ActorMessage<FisMessage>>): void {
-        messageToBeTransmitted.subscribe(message => {
-            this.transmissionProfile.transmitterService.emit(message.payload)
-        })
-    }
-
-    protected handleMessageReception(incomingBus: Subject<ActorMessage<FisMessage>>): void {
-        this.transmissionProfile.receiverService.getReceivables().pipe(
-            // filter()
-        )
-    }
-
-
-    protected setup(actorInfo: ActorProfile<TransmisisonProfile>): void {
-        this.actorProfile = actorInfo
-        this.transmissionProfile = actorInfo.data!
-    }
-}

+ 8 - 1
src/adapters/adapter.manager.ts

@@ -1,3 +1,9 @@
+/* Adapter Manager, whose responsibilities is to listen to transport event and instantiate all the adapers. 
+How it works: It will first connect and listen to any underlying transport protocol that is available as well as the one that will be instantiated dynamically perhaps at a later time.
+It then listens to the event there, particularly if a new server clients has been connected. It then creates the adapters based on that client and stores them in memory. It will also
+broadcast the aforementioned event, so that all those interested in the instantiated of a new adapter event can also has the reference for that adapter. For example, transmitter components
+wants to subscribe for transmitter adapter, so this manager will find through it's adapter's record and publish it to the transmitter, whilst also publish relevant event regarding the 
+instantiation of new adapters, so that the aforementioned transmitter components will have access to the adapter, and can begin it's transmission process.*/
 import { filter, map, Observable, Observer, Subject, Subscription } from "rxjs"
 import { v4 as uuidv4 } from 'uuid'
 import ConsoleLogger from "../utils/log.utils"
@@ -7,10 +13,10 @@ import { ReceiverAdapter } from "./adapter.receiver";
 import { AdapterManagerBase } from "../base/adapter.manager.base";
 
 export class AdapterManager extends AdapterManagerBase {
-    private console: ConsoleLogger = new ConsoleLogger(`AdapterManager`, ['managers'])
 
     constructor(generalEvent: Subject<GeneralEvent<any>>, browserEnv?: boolean) {
         super()
+        this.console = new ConsoleLogger(`AdapterManager`, ['managers'])
         this.browserEnv = browserEnv ?? false
         this.console.log({ message: `Contructing self... ${this.browserEnv ? `is receiving end` : `is not browser env`}` })
         this.event = new Subject<GeneralEvent<any>>()
@@ -73,6 +79,7 @@ export class AdapterManager extends AdapterManagerBase {
     }
 
     private connectToExistingTransport(generalEvent: Subject<GeneralEvent<any>>): void {
+        // start listening to global event specially for new transports or existing transport services
         generalEvent.pipe(
             filter(event => event.type === `General Event`),
             filter(event => event.event === `Available Transport` || event.event === `New Transport`)

+ 5 - 7
src/adapters/adapter.receiver.ts

@@ -1,30 +1,28 @@
 import dotenv from 'dotenv';
-import { BehaviorSubject, distinctUntilChanged, filter, map, Observable, Observer, Subscription, takeWhile } from 'rxjs';
+import { filter, Observable, Observer, Subscription } from 'rxjs';
 import { v4 as uuidv4 } from 'uuid'
 import ConsoleLogger from '../utils/log.utils';
 import { WrappedMessage } from '../utils/message.ordering';
 import { AdapterBase } from '../base/adapter.base';
-import { ClientObject, ConnectionState, FisMessage, GeneralEvent, TransportType, TransportMessage, TransportServiceInterface, TransmissionRole } from '../interface/interface';
+import { FisMessage, GeneralEvent, TransportType, TransportMessage, TransportServiceInterface, TransmissionRole } from '../interface/interface';
 
 dotenv.config();
-/* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
-So how?: */ 
+
 export class ReceiverAdapter extends AdapterBase {
 
     constructor(clientId: string, adapterType: TransportType, transportService: TransportServiceInterface, role: TransmissionRole) {
         super(clientId, adapterType, transportService, role)
         this.console = new ConsoleLogger(`${this.adapterProfile.transportType}ReceiverAdapter`, ['adapter'])
-        this.setupConnectionState(transportService)
 
         this.console.log({ message: `Contructing ReceiverAdapter for clientId: ${this.adapterProfile.clientId}` })
     }
 
     subscribeForIncoming(): Observable<GeneralEvent<any>> {
-        this.console.log({ message: `Connector getting message bus for this connector: ${this.adapterProfile.clientId}` })
+        this.console.log({ message: `Getting message bus for this connector: ${this.adapterProfile.clientId}` })
         return new Observable((observable: Observer<GeneralEvent<any>>) => {
             const subscription: Subscription = this.adapterProfile.transportService.subscribeForEvent().pipe(
                 filter(event => event.type === `Transport Event`),
-                filter((message: GeneralEvent<TransportMessage>) => (message.data as TransportMessage).source === this.adapterProfile.clientId ),
+                filter((message: GeneralEvent<TransportMessage>) => (message.data as TransportMessage).source === this.adapterProfile.clientId),
                 filter((message: GeneralEvent<any>) => message.event === 'New Message'),
             ).subscribe((message: GeneralEvent<TransportMessage>) => {
                 this.console.log({ message: `Received ${(((message.data as TransportMessage).payload as WrappedMessage).payload as FisMessage).header.messageID} from ${((message.data as TransportMessage).source)}`, details: message })

+ 0 - 2
src/adapters/adapter.transmitter.ts

@@ -6,8 +6,6 @@ import { AdapterBase } from '../base/adapter.base';
 import { ClientObject, ConnectionState, FisMessage, TransportType, TransportMessage, TransportServiceInterface, TransmissionRole } from '../interface/interface';
 
 dotenv.config();
-/* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
-So how?: */
 export class TransmitterAdapter extends AdapterBase {
 
     constructor(clientId: string, adapterType: TransportType, transportService: TransportServiceInterface, role: TransmissionRole) {

+ 5 - 4
src/base/adapter.base.ts

@@ -1,9 +1,8 @@
 import { BehaviorSubject, distinctUntilChanged, filter, map, Observable, Subject } from "rxjs";
-import dotenv from 'dotenv';
 import { AdapterInterface, ConnectionState, TransmissionRole, TransportType, TransportServiceInterface, AdapterProfile, ClientObject } from "../interface/interface";
 import ConsoleLogger from "../utils/log.utils";
+import { v4 as uuidv4 } from 'uuid'
 
-dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 So how?: */
 export class AdapterBase implements AdapterInterface {
@@ -18,6 +17,7 @@ export class AdapterBase implements AdapterInterface {
 
     protected setAdapterProfile(clientId: string, adapterType: TransportType, transportService: TransportServiceInterface, role: TransmissionRole): void {
         this.adapterProfile = {} as AdapterProfile
+        this.adapterProfile.id = uuidv4()
         this.adapterProfile.clientId = clientId as string
         this.adapterProfile.connectionState = new BehaviorSubject<ConnectionState>(`OFFLINE`)
         this.adapterProfile.role = role
@@ -25,9 +25,11 @@ export class AdapterBase implements AdapterInterface {
         this.adapterProfile.transportService = transportService
     }
 
-    getAdapterProfile(type?: `clientId` | `role` | `transportId` | `transportType` | `connectionState`): string | Observable<ConnectionState> | AdapterProfile | undefined {
+    getAdapterProfile(type?: `id` | `clientId` | `role` | `transportId` | `transportType` | `connectionState`): string | Observable<ConnectionState> | AdapterProfile | undefined {
         if (!type) {
             return this.adapterProfile
+        } else if (type == `id`) {
+            return this.adapterProfile.id
         } else if (type == `clientId`) {
             return this.adapterProfile.clientId
         } else if (type == `connectionState`) {
@@ -42,7 +44,6 @@ export class AdapterBase implements AdapterInterface {
 
     }
 
-    // this is irrelevant at this point in time. Adapter will just listen regardless of whether there's connection or not
     protected setupConnectionState(transportService: TransportServiceInterface): void {
         transportService.subscribeForEvent().pipe(
             filter(event => event.type === `Transport Event`),

+ 3 - 1
src/base/adapter.manager.base.ts

@@ -1,7 +1,9 @@
 import { Observable, Subject } from "rxjs"
 import { AdapterInterface, AdapterManagerInterface, GeneralEvent, TransmissionRole, TransportServiceInterface } from "../interface/interface"
+import ConsoleLogger from "../utils/log.utils"
 
 export class AdapterManagerBase implements AdapterManagerInterface {
+    protected console!: ConsoleLogger 
     protected browserEnv!: boolean
     protected event!: Subject<GeneralEvent<any>>
     protected transportServiceArray: TransportServiceInterface[] = []
@@ -11,7 +13,7 @@ export class AdapterManagerBase implements AdapterManagerInterface {
         // logic here
     }
 
-    subscribeForAdapters(selfId: string, receiverId: string, role: TransmissionRole): Observable<AdapterInterface> {
+    subscribeForAdapters(receiverId: string, role: TransmissionRole): Observable<AdapterInterface> {
         throw new Error("Method not implemented.")
     }
 

+ 3 - 1
src/base/msg.transmission.manager.base.ts

@@ -2,10 +2,12 @@
 import { Subject } from 'rxjs';
 import { GeneralEvent, MessageReceiverInterface, MessageRequestResponseInterface, MessageTransmissionInterface, MessageTransmissionManagerInterface, MessageTransmitterInterface, TransmissionProfile } from '../interface/interface';
 import { AdapterManager } from '../adapters/adapter.manager';
+import ConsoleLogger from '../utils/log.utils';
 
 export class MessageTransmissionManagerBase implements MessageTransmissionManagerInterface {
+    protected console!: ConsoleLogger
     protected browserEnv!: boolean
-    protected tranmissionRef: MessageTransmissionInterface[] = []
+    protected transmissionRef: MessageTransmissionInterface[] = []
     protected adapterManager!: AdapterManager
     protected event!: Subject<GeneralEvent<any>>
 

+ 2 - 1
src/interface/interface.ts

@@ -42,7 +42,7 @@ export interface MessageRequestResponseInterface extends MessageTransmissionInte
 
 /* ADAPTER COMPONENTS */
 export interface AdapterInterface {
-    getAdapterProfile(type?: `clientId` | `role` | `transportId` | `transportType` | `connectionState`): AdapterProfile | string | Observable<ConnectionState> | undefined
+    getAdapterProfile(type?: `id` | `clientId` | `role` | `transportId` | `transportType` | `connectionState`): AdapterProfile | string | Observable<ConnectionState> | undefined
 }
 
 export interface TransmitterAdapterInterface extends AdapterInterface {
@@ -113,6 +113,7 @@ export interface TransportSet {
 }
 
 export interface AdapterProfile {
+    id: string, 
     clientId: string,
     role: TransmissionRole,
     transportType: TransportType,

+ 122 - 0
src/test/messageProducer.ts

@@ -0,0 +1,122 @@
+/* This is the message producer class. Just a standard class to generate mock up data to emulate real time messages on the go.
+This is currently being used only in test cases for generating fake data to test the transmisison. Can configure the param to test
+the transmission speed and so on. */
+import { interval, map, Observable, Observer, Subject, take } from "rxjs"
+import { FisMessage } from "../interface/interface"
+import { v4 as uuidv4 } from 'uuid'
+import ConsoleLogger from "../utils/log.utils"
+
+//  This class is to emulate a message source that will be tranmitted to the target. Can be used to test request response as well
+export class MessageProducer {
+    private console = new ConsoleLogger('Message Producer', ['base'])
+    private generalNotification: Subject<FisMessage> = new Subject()
+    private incomingMessageBus!: Subject<FisMessage>
+    private outgoingMessageBus: Subject<FisMessage> = new Subject()
+
+    constructor(incomingMessageBus: Subject<FisMessage>) {
+        this.console.log({ message: `Constructing Message Producer` })
+        this.incomingMessageBus = incomingMessageBus
+
+        this.generateNotifcation().subscribe(this.generalNotification)
+        this.handleIncomingRequests(this.incomingMessageBus.asObservable(), this.outgoingMessageBus)
+    }
+
+    public getNotificationMessage(): Observable<FisMessage> {
+        return this.generalNotification.asObservable()
+    }
+
+    public getOutgoingMessages(): Observable<FisMessage> {
+        return this.outgoingMessageBus.asObservable()
+    }
+
+    // this is called no problem
+    private handleIncomingRequests(requests: Observable<FisMessage>, outgoingMessageBus: Subject<FisMessage>, amount?: number): void {
+        requests.subscribe((request: FisMessage) => {
+            this.console.log({ message: `Generating response for new request ${request.header.messageID}` })
+            this.generateMessage(request.header.messageID, amount ?? 10).subscribe({
+                next: message => outgoingMessageBus.next(message),
+                error: error => this.console.log({ message: 'observer Error', details: error }),
+                complete: () => {
+                    outgoingMessageBus.next({
+                        header: {
+                            messageID: request.header.messageID,
+                            messageName: `ResponseMessage`
+                        },
+                        data: `Complete`
+                    } as FisMessage)
+                }
+            })
+        })
+    }
+
+    private generateMessage(requestID: string, amount: number): Observable<FisMessage> {
+        return new Observable((response: Observer<FisMessage>) => {
+            const intervalMessageGeneration = interval(1000).pipe(
+                take(amount), // Ensures only 'amount' messages are generated
+                map(() => {
+                    const message: FisMessage = {
+                        header: {
+                            messageID: requestID,
+                            messageName: 'ResponseMessage'
+                        },
+                        data: `Data`
+                    };
+                    return message;
+                })
+            );
+
+            const subscription = intervalMessageGeneration.subscribe({
+                next: message => response.next(message),
+                error: error => response.error(error),
+                complete: () => {
+                    response.next({
+                        header: {
+                            messageID: requestID,
+                            messageName: 'ResponseMessage'
+                        },
+                        data: `Complete`
+                    });
+                    response.complete();
+                }
+            });
+
+            // Ensure cleanup on unsubscribe
+            return () => subscription.unsubscribe();
+        });
+    }
+
+    private generateNotifcation(): Observable<FisMessage> {
+        return new Observable((response: Observer<FisMessage>) => {
+            const intervalMessageGeneration = interval(1000).pipe(
+                map(() => {
+                    const message: FisMessage = {
+                        header: {
+                            messageID: uuidv4(),
+                            messageName: 'NotificationMessage'
+                        },
+                        data: `Data`
+                    };
+                    return message;
+                })
+            );
+
+            const subscription = intervalMessageGeneration.subscribe({
+                next: message => response.next(message),
+                error: error => response.error(error),
+                complete: () => {
+                    response.next({
+                        header: {
+                            messageID: uuidv4(),
+                            messageName: 'NotificationMessage'
+                        },
+                        data: `Complete`
+                    });
+                    response.complete();
+                }
+            });
+
+            // Ensure cleanup on unsubscribe
+            return () => subscription.unsubscribe();
+        });
+    }
+}

+ 3 - 147
src/test/proxy.ts

@@ -1,20 +1,16 @@
+/* This proxy is specifically developed for simulating disconection between the two party. The reason for this is that, I want
+to create a case where both Sender and Receiver are BOTH ALIVE, as in not server down, and it's only brief network disruption to
+ensure that the buffer system aka Offline Retransmission works just fine on BOTH SIDE!!!! */
 import { Socket as ClientSocket, io } from 'socket.io-client'
 import { Server, Socket as SocketForConnectedClient } from "socket.io"
 import { Observable, Observer, Subject } from "rxjs";
 import { createServer, request as httpRequest } from "http";
-import express, { Response } from 'express';
-import { Express } from 'express';
-import { postAxiosRequest } from '../utils/http.utils';
-import axios from 'axios';
 let fromServer = new Subject<{ event: 'handshaking' | 'message', payload: any }>()
 let toServer = new Subject<{ event: 'handshaking' | 'message', payload: any }>()
 
 startSocketServer(3001)
 // startSocketServer(3002)
 startClientSocketConnection('http://localhost:3000')
-// startHttpServer(3001).then((app: Express) => {
-//     operateHttpServer(app, 'http://localhost:3000/')
-// })
 consoleLog()
 
 function consoleLog(): void {
@@ -89,143 +85,3 @@ function startClientSocketConnection(serverUrl: string): void {
     })
 }
 
-async function startHttpServer(port: number): Promise<Express> {
-    return new Promise((resolve, reject) => {
-        let app: Express = express();
-        // Middleware to parse JSON requests
-        app.use(express.json());
-        app.listen(port, () => {
-            console.log({ message: `Server running at http://localhost:${port}` });
-        });
-        resolve(app)
-    })
-}
-
-function operateHttpServer(app: Express, url: string): void {
-    app.post('/profile', (req, res) => {
-        postAxiosRequest(url + `profile`, req.body).then((response) => {
-            res.json(response)
-        }).catch((error) => {
-            console.log(error)
-            res.json(error)
-        })
-    })
-
-    app.post('/message', (req, res) => {
-        postAxiosRequest(url + `message`, req.body).then((response) => {
-            console.log(response)
-            res.json(response)
-        }).catch((error) => {
-            console.log(error)
-            res.json(error)
-        })
-    })
-
-    app.get('/poll', (req, res) => {
-        console.log('Client connected for long polling.');
-        // Flag to track if the response has been sent
-        let responseSent = false;
-        // Subscribe to the data stream
-        const subscription = handleClientHttpConnection(url).subscribe({
-            next: (message: any) => {
-                if (!responseSent) {
-                    console.log(`Sending data to client: ${JSON.stringify(message)}`);
-                    res.json(message); // Send the data to the client
-                    responseSent = true; // Mark response as sent
-                    subscription.unsubscribe(); // Unsubscribe to close this request
-                }
-            },
-            error: (err: any) => {
-                if (!responseSent) {
-                    console.error('Error in data stream:');
-                    res.status(500).send('Internal Server Error');
-                    responseSent = true; // Mark response as sent
-                }
-                subscription.unsubscribe(); // Ensure cleanup
-            },
-            complete: () => {
-                if (!responseSent) {
-                    console.log('Data stream completed.');
-                    res.status(204).send(); // No Content
-                    responseSent = true; // Mark response as sent
-                }
-                subscription.unsubscribe(); // Ensure cleanup
-            },
-        });
-
-        // Timeout if no data is emitted within a specified duration
-        const timeout = setTimeout(() => {
-            if (!responseSent) {
-                console.log({ message: 'No data emitted. Sending timeout response.' });
-                res.status(204).send(); // No Content
-                responseSent = true; // Mark response as sent
-                subscription.unsubscribe(); // Ensure cleanup
-            }
-        }, 15000); // 15 seconds timeout (adjust as needed)
-
-        // Handle client disconnection
-        res.on('close', () => {
-            if (!responseSent) {
-                console.error(`Http Client disconnected`);
-
-                subscription.unsubscribe(); // Ensure cleanup
-            }
-            clearTimeout(timeout); // Clear timeout to avoid unnecessary execution
-        });
-    });
-}
-
-
-// For client usage
-export function handleClientHttpConnection(url: string): Observable<any> {
-    return new Observable((eventNotification: Observer<any>) => {
-        let active: boolean = true; // Flag to control polling lifecycle
-
-        const longPoll = async () => {
-            while (active) {
-                try {
-                    // Axios request with timeout
-                    const response = await axios.get(`${url}poll`); // removing the timeout temporarily. 
-                    if (response.status === 200) {
-                        const data = response.data;
-                        eventNotification.next(data)
-                    } else if (response.status === 204) {
-                        console.log('No new messages from the server.');
-                    } else {
-                        throw new Error(`Unexpected response status: ${response.status}`);
-                    }
-                } catch (error: unknown) {
-                    console.error(`Unknown Error.`) // culprit is here
-                    // Error handling with server disconnect notification
-                    let errorMessage: string;
-
-                    if (axios.isAxiosError(error)) {
-                        if (error.response) {
-                            errorMessage = `Server returned status ${error.response.status}: ${error.response.statusText}`;
-                        } else if (error.code === 'ECONNABORTED') {
-                            errorMessage = 'Request timed out.';
-                        } else {
-                            errorMessage = error.message || 'An Axios error occurred.';
-                        }
-                    } else if (error instanceof Error) {
-                        errorMessage = error.message;
-                    } else {
-                        errorMessage = 'An unknown error occurred during polling.';
-                    }
-
-                    console.error(`Polling error: ${errorMessage}`);
-                    // observer.error(new Error(errorMessage)); // Notify subscribers of the error
-                    break; // Stop polling on error
-                }
-            }
-        };
-
-        longPoll();
-
-        // Cleanup logic for unsubscribing
-        return () => {
-            console.log({ message: 'Unsubscribed from the long-polling channel.' });
-            eventNotification.complete(); // Notify completion
-        };
-    });
-}

+ 39 - 108
src/test/receiver.ts

@@ -1,58 +1,70 @@
-import { filter, interval, map, Observable, Observer, Subject, Subscription } from "rxjs";
+/* Same as Test 1 in transmitter.ts. This one will reflect the UI environment, in that it will use is browser env.
+For some cases, it is important, like websocket or grpc, or even http, because the way the transport service set up
+is very different. Need to declare. If not, be default, transmission manager and adapter manager will assume they are
+from the backned environment, and thus will attempt tp use dotenv configuration.
+
+Generally the layout is more or less the same, with the exception of having to declare the isClient boolean to true.
+The rest is pretty much the same. There is no message producer as in transmitter class, because I was too lazy to
+separate them out. This receiver file was written after, but the concept is the same. Both transmitter and receiver
+can assume dual roles. So it doesn't really matter.
+
+Please note: For further testing, can just copy and paste from the transmitter.ts, with some tweaks on getting
+profile information as well as setting the isClient boolean.*/
+import { filter, interval, map, Observable, Observer, Subject, Subscription, take } from "rxjs";
 import { v4 as uuidv4 } from 'uuid'
-import config from '../config/config.json';
 import { MessageTransmissionManager } from "../transmission/msg.transmission.manager";
-import { WrappedMessage } from "../utils/message.ordering";
 import ConsoleLogger from "../utils/log.utils";
-import { FisMessage, GeneralEvent, TransportType, TransportServiceInterface, TransportSet, TransmissionProfile } from "../interface/interface";
-import { WebsocketTransportService } from "../transport/websocket";
-import { HttpTransportService } from "../transport/http";
+import { FisMessage, GeneralEvent, TransportType, TransportServiceInterface, TransportSet, TransmissionProfile, TransportMessage } from "../interface/interface";
+import config from '../config/config.json';
 import clientProfile from '../../clients/clientprofile.json'
-import serverProfile from '../../clients/serverprofile.json'
-import { MessageTransmissionTransmitter } from "../transmission/msg.transmission.transmitter";
 import { MessageTransmissionReceiver } from "../transmission/msg.transmission.receiver";
+import { MessageTransmissionTransmitter } from "../transmission/msg.transmission.transmitter";
 import { MessageTransmissionRequestResponse } from "../transmission/msg.transmission.request-response";
-
-/* This is Emulating UI perspective. Using just browser environment, and not using any dotenv. */
+import { setUpTransportService, sortTransportFromEnv } from "../utils/transport.utils";
+import { MessageProducer } from "./messageProducer";
 class Supervisor {
-    private generalBus: Subject<GeneralEvent<any>> = new Subject()
     private console = new ConsoleLogger('Supervisor', ['base'])
     private isClient: boolean = true
+    private clientIncomingMessage: Subject<FisMessage> = new Subject()
+    private messageProducer!: MessageProducer
     private transmissionManager!: MessageTransmissionManager
-    private event: Subject<GeneralEvent<any>>
-    private outgoingPipe: Subject<any> = new Subject()
-    private transportServiceArray: TransportServiceInterface[] = []
+    private event!: Subject<GeneralEvent<any>>
     private transportSet: TransportSet[] = []
+    private transportServiceArray: TransportServiceInterface[] = []
     private config: TransmissionProfile = {
         target: clientProfile.clientId,
         source: clientProfile.id
     }
 
     constructor() {
-        this.console.log({message: `Self ${this.config.source} && Target: ${this.config.target}`})
-        this.event = new Subject()
-        // Start setting up existing transport based on .env file
-        this.sortTransportFromEnv(this.transportSet)
-        this.transportSet.forEach(transport => {
-            this.setUpTransportService(transport, this.event, this.isClient)
+        this.event = new Subject<GeneralEvent<any>>()
+        // set up all the transportServices
+        sortTransportFromEnv(this.transportSet)
+        this.transportSet.forEach(set => {
+            setUpTransportService(set, this.event, this.transportServiceArray, config.connection.transmitter)
         })
-        // once adapter manager is instantiated, it will attempt to connect to existing started transport
         this.tieInAdapterWithExistingTransportServices(this.event)
-
+        this.messageProducer = new MessageProducer(this.clientIncomingMessage)
         this.transmissionManager = new MessageTransmissionManager(this.event, this.isClient)
         this.startMessageTransmission()
+
     }
 
+    // Testing here. Just comment out or uncomment out the parts you want to test
     private startMessageTransmission(): void {
         let transmitter: MessageTransmissionTransmitter = this.transmissionManager.getTransmitter(this.config) as MessageTransmissionTransmitter
         let receiver: MessageTransmissionReceiver = this.transmissionManager.getReceiver(this.config) as MessageTransmissionReceiver
         let requestResponse: MessageTransmissionRequestResponse = this.transmissionManager.getRequestResponse(this.config, transmitter, receiver)
 
         // emit Message only
-        // this.emitMessage(transmitter, this.generateNotifcation())
+        this.emitMessage(transmitter, this.messageProducer.getNotificationMessage())
+        // this.emitMessage(transmitter, this.messageProducer.getOutgoingMessages())
 
         // receive Message only
-        this.streamMessage(receiver)
+        this.streamMessage(receiver).subscribe((message: FisMessage) => {
+            this.console.log({ message: `Received message ${message.header.messageID}` })
+            this.clientIncomingMessage.next(message)
+        })
 
         // request-response emulation
         // this.sendMessage(requestResponse, { header: { messageID: `123`, messageName: `RequestMessage` }, data: `Test Data` } as FisMessage).subscribe({
@@ -62,7 +74,6 @@ class Supervisor {
         // })
     }
 
-
     private emitMessage(transmitter: MessageTransmissionTransmitter, source: Subject<any> | Observable<any>): void {
         source.subscribe(message => transmitter.emit(message))
     }
@@ -85,6 +96,7 @@ class Supervisor {
         })
     }
 
+    // Just for clarification, this is incoming not outgoing stream. Apologies in advanced for the lack of a better term.
     private streamMessage(receiverInstance: MessageTransmissionReceiver): Observable<any> {
         return new Observable((response: Observer<any>) => {
             const subscription: Subscription = receiverInstance.getReceivables().subscribe({
@@ -102,54 +114,9 @@ class Supervisor {
         })
     }
 
-    private setUpTransportService(transportSet: TransportSet, event: Subject<GeneralEvent<any>>, isClient?: boolean): void {
-        try {
-            let transportService: TransportServiceInterface = this.instantiateTransportService(transportSet.transport, event)
-            this.transportServiceArray.push(transportService)
-            if (transportService instanceof WebsocketTransportService) {
-                this.console.log({ message: `Just Double Checking... this is websocket` })
-                if (isClient) {
-                    // please note this is subject to change depending on the UI environemnt. Angular has their own built in function to read json file based on Swopt-UI
-                    transportService.startClient(config.connection.transmitter)
-                } else {
-                    transportService.startServer(transportSet.port);
-                }
-            } else if (transportService instanceof HttpTransportService) {
-                this.console.log({ message: `Just Double Checking... this is http` })
-                // Additional Http-specific setup if needed.
-                if (isClient) {
-                    transportService.startClient(config.connection.transmitter)
-                } else {
-                    transportService.startServer(transportSet.port)
-                }
-            }
-        } catch (error) {
-            this.console.error({ message: 'Fail to set transport. Error in setting up transport', details: error })
-        }
-    }
 
-    private instantiateTransportService(transportType: TransportType, event: Subject<GeneralEvent<any>>): TransportServiceInterface {
-        if (transportType === 'Websocket') {
-            return new WebsocketTransportService(event)
-        }
-        else if (transportType === 'Http') {
-            return new HttpTransportService(event)
-        } else {
-            throw new Error(`No Transport Service Instantiated`)
-        }
-    }
-
-    // just to re-arrange the list of transport servicce from env. Of course, varying modules will have varying ways to start up their transport respectively. TO adapter later
-    private sortTransportFromEnv(transportSet: TransportSet[]): void {
-        let transportList: string[] = process.env.Transport?.split(',') || []
-        let portList: number[] = (process.env.PORT?.split(',') || []).map(port => Number(port));
-        transportList.forEach((transport, index) => {
-            transportSet.push({ transport: transport, port: portList[index] } as unknown as TransportSet)
-        })
-        this.console.log({ message: 'TransportSetList', details: this.transportSet })
-    }
-
-    // A method to broadcast avaible tranpsort to adapterManager the available transport to be connected after the adapter manager has been instantiated
+    // this one is set up specifically for the case if there's already existing transport services ready, before the instantation of adapter manager.
+    // this is so that the adapter can get all the existing available transport whilst also listening to new ones in the future
     private tieInAdapterWithExistingTransportServices(eventBus: Subject<GeneralEvent<any>>): void {
         const subscription: Subscription = eventBus.pipe(
             filter(event => event.type === `Adapter Event`),
@@ -166,42 +133,6 @@ class Supervisor {
             })
         })
     }
-
-    private generateNotifcation(): Observable<FisMessage> {
-        return new Observable((response: Observer<FisMessage>) => {
-            const intervalMessageGeneration = interval(1000).pipe(
-                map(() => {
-                    const message: FisMessage = {
-                        header: {
-                            messageID: uuidv4(),
-                            messageName: 'NotificationMessage'
-                        },
-                        data: `Data`
-                    };
-                    return message;
-                })
-            );
-
-            const subscription = intervalMessageGeneration.subscribe({
-                next: message => response.next(message),
-                error: error => response.error(error),
-                complete: () => {
-                    response.next({
-                        header: {
-                            messageID: uuidv4(),
-                            messageName: 'NotificationMessage'
-                        },
-                        data: `Complete`
-                    });
-                    response.complete();
-                }
-            });
-
-            // Ensure cleanup on unsubscribe
-            return () => subscription.unsubscribe();
-        });
-    }
-
 }
 
 let supervisor = new Supervisor()

+ 75 - 178
src/test/transmitter.ts

@@ -1,19 +1,23 @@
+/* Test 1 Scenario:
+So for testing, there's a few things that needs to be set up for this 1 on 1 transmission scenario to play out. First thing first, 
+the 'clients' that are using this transmission is presumed. We don't know who, so we are going to pretend that they are there. Hence 
+the message producer and this supervisor class that I have coded is going to fill in that supposed role. Additionally, transport services
+should be set up beforehand, to ensure that there are transport protocol ready, so that this Message Transmission components can start
+listening to it. 
+Note: However, that doesn't mean that if there's no tranport protocol available, it will not work. The trannsmission manager and adapter
+manager will just subscribe and listen to the global event for any new transport protocal and then perform the necessary subscription to 
+it. */
 import { filter, interval, map, Observable, Observer, Subject, Subscription, take } from "rxjs";
 import { v4 as uuidv4 } from 'uuid'
 import { MessageTransmissionManager } from "../transmission/msg.transmission.manager";
-import { WrappedMessage } from "../utils/message.ordering";
 import ConsoleLogger from "../utils/log.utils";
-import { FisMessage, GeneralEvent, TransportType, TransportServiceInterface, TransportSet, TransmissionProfile } from "../interface/interface";
-import config from '../config/config.json';
-import { startSocketServer } from "../utils/socket.utils";
-import { WebsocketTransportService } from "../transport/websocket";
-import { HttpTransportService } from "../transport/http";
-import clientProfile from '../../clients/serverprofile.json'
+import { FisMessage, GeneralEvent, TransportType, TransportServiceInterface, TransportSet, TransmissionProfile, TransportMessage } from "../interface/interface";
 import serverProfile from '../../clients/serverprofile.json'
 import { MessageTransmissionReceiver } from "../transmission/msg.transmission.receiver";
 import { MessageTransmissionTransmitter } from "../transmission/msg.transmission.transmitter";
 import { MessageTransmissionRequestResponse } from "../transmission/msg.transmission.request-response";
-import { error } from "console";
+import { setUpTransportService, sortTransportFromEnv } from "../utils/transport.utils";
+import { MessageProducer } from "./messageProducer";
 class Supervisor {
     private console = new ConsoleLogger('Supervisor', ['base'])
     private clientIncomingMessage: Subject<FisMessage> = new Subject()
@@ -30,18 +34,18 @@ class Supervisor {
     constructor() {
         this.event = new Subject<GeneralEvent<any>>()
         // set up all the transportServices
-        this.sortTransportFromEnv(this.transportSet)
+        sortTransportFromEnv(this.transportSet)
         this.transportSet.forEach(set => {
-            this.setUpTransportService(set, this.event)
+            setUpTransportService(set, this.event, this.transportServiceArray)
         })
         this.tieInAdapterWithExistingTransportServices(this.event)
-        // so need them adapters now. But supervisor shouldn't be concerned, only messageTransmissionManager and ConnectionManager
         this.messageProducer = new MessageProducer(this.clientIncomingMessage)
         this.transmissionManager = new MessageTransmissionManager(this.event)
         this.startMessageTransmission()
 
     }
 
+    // Testing here. Just comment out or uncomment out the parts you want to test
     private startMessageTransmission(): void {
         let transmitter: MessageTransmissionTransmitter = this.transmissionManager.getTransmitter(this.config) as MessageTransmissionTransmitter
         let receiver: MessageTransmissionReceiver = this.transmissionManager.getReceiver(this.config) as MessageTransmissionReceiver
@@ -52,17 +56,17 @@ class Supervisor {
         // this.emitMessage(transmitter, this.messageProducer.getOutgoingMessages())
 
         // receive Message only
-        this.streamMessage(receiver).subscribe(message => {
-            this.console.log({ message: message.header.messageID })
-            // this.clientIncomingMessage.next(message)
+        this.streamMessage(receiver).subscribe((message: FisMessage) => {
+            this.console.log({ message: `Received message ${message.header.messageID}` })
+            this.clientIncomingMessage.next(message)
         })
 
         // request-response emulation
-        this.sendMessage(requestResponse, { header: { messageID: `123`, messageName: `RequestMessage` }, data: `Test Data` } as FisMessage).subscribe({
-            next: message => this.console.log({ message: `Received response message for request ${(message as FisMessage).header.messageID}` }),
-            error: error => this.console.error({ message: `Something went wrong`, details: error }),
-            complete: () => this.console.log({ message: `Request completed` })
-        })
+        // this.sendMessage(requestResponse, { header: { messageID: `123`, messageName: `RequestMessage` }, data: `Test Data` } as FisMessage).subscribe({
+        //     next: message => this.console.log({ message: `Received response message for request ${(message as FisMessage).header.messageID}` }),
+        //     error: error => this.console.error({ message: `Something went wrong`, details: error }),
+        //     complete: () => this.console.log({ message: `Request completed` })
+        // })
     }
 
     private emitMessage(transmitter: MessageTransmissionTransmitter, source: Subject<any> | Observable<any>): void {
@@ -87,6 +91,7 @@ class Supervisor {
         })
     }
 
+    // Just for clarification, this is incoming not outgoing stream. Apologies in advanced for the lack of a better term.
     private streamMessage(receiverInstance: MessageTransmissionReceiver): Observable<any> {
         return new Observable((response: Observer<any>) => {
             const subscription: Subscription = receiverInstance.getReceivables().subscribe({
@@ -105,52 +110,57 @@ class Supervisor {
     }
 
     // Server to be set up as well as acquiring client information if needed. Like in the case for grpc and socket. Http not requ`ired.
-    private setUpTransportService(transportSet: TransportSet, event: Subject<GeneralEvent<any>>, isClient?: boolean): void {
-        try {
-            let transportService: TransportServiceInterface = this.instantiateTransportService(transportSet.transport, event)
-            this.transportServiceArray.push(transportService)
-            if (transportService instanceof WebsocketTransportService) {
-                this.console.log({ message: `Just Double Checking... this is websocket` })
-                if (isClient) {
-                    // please note this is subject to change depending on the UI environemnt. Angular has their own built in function to read json file based on Swopt-UI
-                    transportService.startClient(config.connection.transmitter)
-                } else {
-                    transportService.startServer(transportSet.port);
-                }
-            } else if (transportService instanceof HttpTransportService) {
-                this.console.log({ message: `Just Double Checking... this is http` })
-                // Additional Http-specific setup if needed.
-                if (isClient) {
-                    transportService.startClient(config.connection.transmitter)
-                } else {
-                    transportService.startServer(transportSet.port)
-                }
-            }
-        } catch (error) {
-            this.console.error({ message: 'Fail to set transport. Error in setting up transport', details: error })
-        }
-    }
-
-    private instantiateTransportService(transportType: TransportType, event: Subject<GeneralEvent<any>>): TransportServiceInterface {
-        if (transportType === 'Websocket') {
-            return new WebsocketTransportService(event)
-        }
-        else if (transportType === 'Http') {
-            return new HttpTransportService(event)
-        } else {
-            throw new Error(`No Transport Service Instantiated`)
-        }
-    }
-
-    private sortTransportFromEnv(transportSet: TransportSet[]): void {
-        let transportList: string[] = process.env.Transport?.split(',') || []
-        let portList: number[] = (process.env.PORT?.split(',') || []).map(port => Number(port));
-        transportList.forEach((transport, index) => {
-            transportSet.push({ transport: transport, port: portList[index] } as unknown as TransportSet)
-        })
-        this.console.log({ message: 'TransportSetList', details: this.transportSet })
-    }
-
+    // private setUpTransportService(transportSet: TransportSet, event: Subject<GeneralEvent<any>>, isClient?: boolean): void {
+    //     try {
+    //         let transportService: TransportServiceInterface = this.instantiateTransportService(transportSet.transport, event)
+    //         this.transportServiceArray.push(transportService)
+    //         if (transportService instanceof WebsocketTransportService) {
+    //             this.console.log({ message: `Just Double Checking... this is websocket` })
+    //             if (isClient) {
+    //                 // please note this is subject to change depending on the UI environemnt. Angular has their own built in function to read json file based on Swopt-UI
+    //                 transportService.startClient(config.connection.transmitter)
+    //             } else {
+    //                 transportService.startServer(transportSet.port);
+    //             }
+    //         } else if (transportService instanceof HttpTransportService) {
+    //             this.console.log({ message: `Just Double Checking... this is http` })
+    //             // Additional Http-specific setup if needed.
+    //             if (isClient) {
+    //                 transportService.startClient(config.connection.transmitter)
+    //             } else {
+    //                 transportService.startServer(transportSet.port)
+    //             }
+    //         }
+    //         // additional transport protocol here...
+    //     } catch (error) {
+    //         this.console.error({ message: 'Fail to set transport. Error in setting up transport', details: error })
+    //     }
+    // }
+
+    // private instantiateTransportService(transportType: TransportType, event: Subject<GeneralEvent<any>>): TransportServiceInterface {
+    //     if (transportType === 'Websocket') {
+    //         return new WebsocketTransportService(event)
+    //     }
+    //     else if (transportType === 'Http') {
+    //         return new HttpTransportService(event)
+    //     } else {
+    //         throw new Error(`No Transport Service Instantiated`)
+    //     }
+    //     // additional transport protocol here...
+    // }
+
+    // // get the transpor information from dotenv file. Plase note this is just testing only. Real production may have their own way of defining their transport protocl. Please adapte accordingly.
+    // private sortTransportFromEnv(transportSet: TransportSet[]): void {
+    //     let transportList: string[] = process.env.Transport?.split(',') || []
+    //     let portList: number[] = (process.env.PORT?.split(',') || []).map(port => Number(port));
+    //     transportList.forEach((transport, index) => {
+    //         transportSet.push({ transport: transport, port: portList[index] } as unknown as TransportSet)
+    //     })
+    //     this.console.log({ message: 'TransportSetList', details: this.transportSet })
+    // }
+
+    // this one is set up specifically for the case if there's already existing transport services ready, before the instantation of adapter manager.
+    // this is so that the adapter can get all the existing available transport whilst also listening to new ones in the future
     private tieInAdapterWithExistingTransportServices(eventBus: Subject<GeneralEvent<any>>): void {
         const subscription: Subscription = eventBus.pipe(
             filter(event => event.type === `Adapter Event`),
@@ -170,118 +180,5 @@ class Supervisor {
 }
 
 
-class MessageProducer {
-    private console = new ConsoleLogger('Message Producer', ['base'])
-    private generalNotification: Subject<FisMessage> = new Subject()
-    private incomingMessageBus!: Subject<FisMessage>
-    private outgoingMessageBus: Subject<FisMessage> = new Subject()
-
-    constructor(incomingMessageBus: Subject<FisMessage>) {
-        this.console.log({ message: `Constructing Message Producer` })
-        this.incomingMessageBus = incomingMessageBus
-
-        this.generateNotifcation().subscribe(this.generalNotification)
-        this.handleIncomingRequests(this.incomingMessageBus.asObservable(), this.outgoingMessageBus)
-    }
-
-    public getNotificationMessage(): Observable<FisMessage> {
-        return this.generalNotification.asObservable()
-    }
-
-    public getOutgoingMessages(): Observable<FisMessage> {
-        return this.outgoingMessageBus.asObservable()
-    }
-
-    // this is called no problem
-    private handleIncomingRequests(requests: Observable<FisMessage>, outgoingMessageBus: Subject<FisMessage>): void {
-        requests.subscribe((request: FisMessage) => {
-            this.console.log({ message: `Generating response for new request ${request.header.messageID}` })
-            this.generateMessage(request.header.messageID, 10).subscribe({
-                next: message => outgoingMessageBus.next(message),
-                error: error => this.console.log({ message: 'observer Error', details: error }),
-                complete: () => {
-                    outgoingMessageBus.next({
-                        header: {
-                            messageID: request.header.messageID,
-                            messageName: `ResponseMessage`
-                        },
-                        data: `Complete`
-                    } as FisMessage)
-                }
-            })
-        })
-    }
-
-    private generateMessage(requestID: string, amount: number): Observable<FisMessage> {
-        return new Observable((response: Observer<FisMessage>) => {
-            const intervalMessageGeneration = interval(1000).pipe(
-                take(amount), // Ensures only 'amount' messages are generated
-                map(() => {
-                    const message: FisMessage = {
-                        header: {
-                            messageID: requestID,
-                            messageName: 'ResponseMessage'
-                        },
-                        data: `Data`
-                    };
-                    return message;
-                })
-            );
-
-            const subscription = intervalMessageGeneration.subscribe({
-                next: message => response.next(message),
-                error: error => response.error(error),
-                complete: () => {
-                    response.next({
-                        header: {
-                            messageID: requestID,
-                            messageName: 'ResponseMessage'
-                        },
-                        data: `Complete`
-                    });
-                    response.complete();
-                }
-            });
-
-            // Ensure cleanup on unsubscribe
-            return () => subscription.unsubscribe();
-        });
-    }
-
-    private generateNotifcation(): Observable<FisMessage> {
-        return new Observable((response: Observer<FisMessage>) => {
-            const intervalMessageGeneration = interval(1000).pipe(
-                map(() => {
-                    const message: FisMessage = {
-                        header: {
-                            messageID: uuidv4(),
-                            messageName: 'NotificationMessage'
-                        },
-                        data: `Data`
-                    };
-                    return message;
-                })
-            );
-
-            const subscription = intervalMessageGeneration.subscribe({
-                next: message => response.next(message),
-                error: error => response.error(error),
-                complete: () => {
-                    response.next({
-                        header: {
-                            messageID: uuidv4(),
-                            messageName: 'NotificationMessage'
-                        },
-                        data: `Complete`
-                    });
-                    response.complete();
-                }
-            });
-
-            // Ensure cleanup on unsubscribe
-            return () => subscription.unsubscribe();
-        });
-    }
-}
 
 let supervisor = new Supervisor()

+ 5 - 3
src/transmission/msg.transmission.manager.ts

@@ -1,3 +1,5 @@
+/* General Note: Mesasge transmission manager instantiate transmission component, but it itself is not concerned with the state of the 
+client. So, whether or not if the client is online, or what-ever transport protocol they're using is none of this manager's concern. */
 import { MessageTransmissionTransmitter } from "./msg.transmission.transmitter";
 import { MessageTransmissionReceiver } from "./msg.transmission.receiver";
 import { AdapterManager } from "../adapters/adapter.manager";
@@ -9,10 +11,10 @@ import { GeneralEvent, MessageReceiverInterface, MessageTransmitterInterface, Tr
 import { MessageTransmissionManagerBase } from "../base/msg.transmission.manager.base";
 
 export class MessageTransmissionManager extends MessageTransmissionManagerBase {
-    private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionManager`, ['managers'])
 
     constructor(event: Subject<GeneralEvent<any>>, browserEnv?: boolean) {
         super()
+        this.console = new ConsoleLogger(`MessageTransmissionManager`, ['managers'])
         if (browserEnv) this.browserEnv = browserEnv
         this.console.log({ message: `Contructing self... ${this.browserEnv ? `is receiving end` : `is not browser env`}` })
         this.event = event
@@ -23,13 +25,13 @@ export class MessageTransmissionManager extends MessageTransmissionManagerBase {
     // for now these fuctions ain't pure, cuz messageTransmissionManager needs to keep a record of these tranmission instances as well for safe keeping.
     public getTransmitter(profile: TransmissionProfile): MessageTransmitterInterface {
         let transmitterInstance = new MessageTransmissionTransmitter(profile, this.adapterManager)
-        this.tranmissionRef.push(transmitterInstance)
+        this.transmissionRef.push(transmitterInstance)
         return transmitterInstance
     }
 
     public getReceiver(profile: TransmissionProfile): MessageReceiverInterface {
         let receiverInstance = new MessageTransmissionReceiver(profile, this.adapterManager)
-        this.tranmissionRef.push(receiverInstance)
+        this.transmissionRef.push(receiverInstance)
         return receiverInstance
     }
 

+ 15 - 12
src/transmission/msg.transmission.receiver.ts

@@ -1,11 +1,16 @@
-import { BehaviorSubject, filter, map, Observable, Observer, Subject, Subscription } from 'rxjs';
-import { v4 as uuidv4 } from 'uuid'
-import { ReceiverAdapter } from '../adapters/adapter.receiver';
+/* Receiving Component, as the name implies only deals with receving data. Same concept as the transmitter, it will subscribe for 
+receving adapters from adapter manager, and use the ones that are available.
+
+Note for enhancements in the future;
+i) Logic to dynamically switch adapters, either based on their connection status or other factors
+ii) Enabling the use of mutli adapters usage to increase bandwith for data transmission. (More Advanced)
+
+ */
+import { BehaviorSubject, filter, Observable, Observer, Subject, Subscription } from 'rxjs';
 import { checkMessage, WrappedMessage } from '../utils/message.ordering';
 import ConsoleLogger from '../utils/log.utils';
 import { MessageTransmissionBase } from '../base/msg.transmission.base';
 import { AdapterInterface, AdapterManagerInterface, ConnectionState, FisMessage, GeneralEvent, MessageReceiverInterface, ReceiverAdapterInterface, TransmissionProfile, TransportMessage } from '../interface/interface';
-import { error } from 'console';
 
 export class MessageTransmissionReceiver extends MessageTransmissionBase implements MessageReceiverInterface {
     private connectionStateEvent: BehaviorSubject<ConnectionState> = new BehaviorSubject<ConnectionState>(`OFFLINE`)
@@ -13,7 +18,6 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
     private onHoldMessage: Subject<WrappedMessage> = new Subject()
     private currentAdapter!: ReceiverAdapterInterface
     private incomingMessage: Subject<GeneralEvent<TransportMessage>> = new Subject()
-    // private toBePassedOver: Subject<WrappedMessage> = new Subject()
 
     constructor(profile: TransmissionProfile, adapterManager: AdapterManagerInterface) {
         super()
@@ -22,19 +26,18 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
         this.initializeReceiverComponents(adapterManager)
     }
 
-    public getReceivables(): Observable<GeneralEvent<TransportMessage>> {
-        return new Observable((receivable: Observer<GeneralEvent<TransportMessage>>) => {
-            this.console.log({ message: `Transmission streaming messages from ${this.profile.target}` })
+    public getReceivables(): Observable<FisMessage> {
+        return new Observable((receivable: Observer<FisMessage>) => {
+            this.console.log({ message: `Tranmission Subscription: Streaming incoming messages from ${this.profile.target}` })
             const subscription: Subscription = this.incomingMessage.pipe(
+                filter((event: GeneralEvent<any>) => event.type == `Adapter Event`),
                 filter((event: GeneralEvent<any>) => event.event == 'New Message'),
             ).subscribe((event: GeneralEvent<TransportMessage>) => {
-                // console.log(event) // data is transportMessage instead of eventmessage
                 this.onHoldMessage.next(((event.data as TransportMessage).payload as WrappedMessage))
                 checkMessage(((event.data as TransportMessage).payload as WrappedMessage), this.onHoldMessage).then(() => {
                     // only release the message before it exists
                     this.console.log({ message: `This one passes. Does have previousID. Case for message ordering` })
-                    // console.log(((event.data as TransportMessage).payload as WrappedMessage))
-                    receivable.next(event);
+                    receivable.next(((event.data as TransportMessage).payload as WrappedMessage).payload as FisMessage);
                 }).catch((error) => {
                     this.console.log({ message: `Observer Error`, details: error })
                 })
@@ -55,7 +58,7 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
             this.adapters.push(adapter)
             this.console.log({ message: `Adding new ${adapter.getAdapterProfile(`transportType`)} receiving adapter. Current adapter length: ${this.adapters.length}` })
             if (!this.currentAdapter) {
-                this.console.log({ message: `Setting this ${adapter.getAdapterProfile(`role`)} as current adapter.` })
+                this.console.log({ message: `Setting this ${adapter.getAdapterProfile(`id`)} as current adapter.` })
                 this.currentAdapter = adapter as ReceiverAdapterInterface
                 this.currentAdapter.subscribeForIncoming().subscribe({
                     next: (message: GeneralEvent<TransportMessage>) => {

+ 11 - 13
src/transmission/msg.transmission.request-response.ts

@@ -1,18 +1,20 @@
+/* This is more unique transmission component, because it's supposed to emulate a conventional request response call. But the underlying
+mechanism is still using message. Here as you will see, it is basically taking the already instantiated transmitter and receiver components
+and basically just filtering the responses based on whatever identifier it needs for the orignal request. */
 import { MessageTransmissionBase } from "../base/msg.transmission.base";
-import { filter, map, Observable, Observer, Subject, Subscription, takeWhile } from "rxjs";
-import { v4 as uuidv4 } from 'uuid'
-import { MessageTransmissionReceiver } from "./msg.transmission.receiver";
-import { MessageTransmissionTransmitter } from "./msg.transmission.transmitter";
-import { AdapterInterface, FisMessage, GeneralEvent, MessageReceiverInterface, MessageRequestResponseInterface, MessageTransmitterInterface, TransmissionProfile, TransportMessage } from "../interface/interface";
-import { WrappedMessage } from "../utils/message.ordering";
+import { filter, Observable, Observer, Subscription } from "rxjs";
+import { FisMessage, MessageReceiverInterface, MessageRequestResponseInterface, MessageTransmitterInterface, TransmissionProfile } from "../interface/interface";
+import ConsoleLogger from "../utils/log.utils";
 
 export class MessageTransmissionRequestResponse extends MessageTransmissionBase implements MessageRequestResponseInterface {
-    transmitterInstance!: MessageTransmitterInterface;
-    receiverInstance!: MessageReceiverInterface;
+    private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionRequestResponse`, ['transmission'])
+    protected transmitterInstance!: MessageTransmitterInterface;
+    protected receiverInstance!: MessageReceiverInterface;
 
     constructor(profile: TransmissionProfile, transmitterInstance: MessageTransmitterInterface, receiverInstance: MessageReceiverInterface) {
         super()
         this.profile = profile
+        this.console.log({ message: `Constructing Request Response Transmission for Receiving target: ${this.profile.target}` })
         this.transmitterInstance = transmitterInstance
         this.receiverInstance = receiverInstance
     }
@@ -23,11 +25,7 @@ export class MessageTransmissionRequestResponse extends MessageTransmissionBase
             if (this.transmitterInstance && this.receiverInstance) {
                 this.transmitterInstance.emit(message)
                 const subscription: Subscription = this.receiverInstance.getReceivables().pipe(
-                    filter(event => event.event === `New Message`),
-                    filter(event => (((event.data as TransportMessage)?.payload as WrappedMessage)?.payload as FisMessage)?.header.messageID === message.header.messageID),
-                    map(event => {
-                        return (event.data as TransportMessage).payload as FisMessage
-                    })
+                    filter(event => event.header.messageID === message.header.messageID),
                 ).subscribe({
                     next: (message: FisMessage) => {
                         if (message.data == 'Complete') {

+ 10 - 14
src/transmission/msg.transmission.transmitter.ts

@@ -1,3 +1,11 @@
+/* Transmitter components, as the name implies, solely created to transmit messages only. Will subscribe for adapters from
+adapter manager to acquire adapters. Once adaptesr are required, it will just pick the one that is currently online, and 
+assciate that connection status with the buffer service / offline retransmission to start sending buffered messages.
+
+Note for enhancements in the future;
+i) Logic to dynamically switch adapters, either based on their connection status or other factors
+ii) Enabling the use of mutli adapters usage to increase bandwith for data transmission. (More Advanced)
+*/
 import { MessageTransmissionBase } from "../base/msg.transmission.base";
 import { v4 as uuidv4 } from 'uuid'
 import { BehaviorSubject, distinctUntilChanged, filter, map, Observable, Subject, Subscription } from "rxjs";
@@ -8,8 +16,7 @@ import { AdapterInterface, AdapterManagerInterface, ConnectionState, FisMessage,
 import { error } from "console";
 import { TransmitterAdapter } from "../adapters/adapter.transmitter";
 
-/* Take in all the messages that needs to be transported, and divide them accordingly. So the connector instances will do just that
-connectors or adapters will have their own identifier*/
+
 export class MessageTransmissionTransmitter extends MessageTransmissionBase implements MessageTransmitterInterface {
     private connectionStateEvent: BehaviorSubject<ConnectionState> = new BehaviorSubject<ConnectionState>('OFFLINE')
     private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionTransmitter`, ['transmission'])
@@ -40,7 +47,7 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
             this.adapters.push(adapter)
             this.console.log({ message: `Adding new ${adapter.getAdapterProfile(`transportType`)} transmitting adapter. Current adapter length: ${this.adapters.length}` })
             if (!this.currentAdapter) {
-                this.console.log({ message: `Setting this ${adapter.getAdapterProfile(`transportType`)} as current adapter.` })
+                this.console.log({ message: `Setting this ${adapter.getAdapterProfile(`id`)} as current adapter.` })
                 this.currentAdapter = adapter as TransmitterAdapterInterface
                 let connectionState: Observable<ConnectionState> = this.currentAdapter.getAdapterProfile('connectionState') as Observable<ConnectionState>
                 connectionState.subscribe(this.connectionStateEvent)
@@ -62,15 +69,4 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
         })
     }
 
-    // this is for http only. Please ignore for now.
-    private uniqueHandlerToFlushUnsentMessages(event: Observable<GeneralEvent<any>>): void {
-        event.pipe(
-            filter(event => event.event == 'Re-Flush'),
-            filter(event => event.data.clientId == this.profile.target),
-        ).subscribe((event: GeneralEvent<any>) => {
-            this.console.log({ message: `${this.connectionStateEvent.getValue() == 'ONLINE' ? `Transmitting ${((event.data.payload as TransportMessage).payload as WrappedMessage).thisMessageID}` : `Buffering ${((event.data.payload as TransportMessage).payload as WrappedMessage).thisMessageID}`}` })
-            this.messageToBeBuffered.next(((event.data.payload as TransportMessage).payload as WrappedMessage))
-        })
-    }
-
 }

+ 4 - 68
src/transport/http.ts

@@ -2,12 +2,11 @@ import { Express } from 'express';
 import { filter, Observable, Subject, Subscription, take } from "rxjs";
 import { v4 as uuidv4 } from 'uuid'
 import config from '../config/config.json';
-import { handleClientHttpConnection, handleHttpClient, initiateClientToServer, startHttpServer } from "../utils/http.utils";
 import { WrappedMessage } from '../utils/message.ordering';
 import { error } from 'console';
 import axios, { AxiosError } from 'axios';
 import ConsoleLogger from '../utils/log.utils';
-import { ClientObject, GeneralEvent, Transport, TransportMessage, TransportServiceInterface, TransportServiceProfile } from '../interface/interface';
+import { ClientObject, GeneralEvent, TransportMessage, TransportServiceInterface, TransportServiceProfile } from '../interface/interface';
 
 export class HttpTransportService implements TransportServiceInterface {
     private console: ConsoleLogger = new ConsoleLogger(`HttpTransportService`, ['transport'])
@@ -35,81 +34,18 @@ export class HttpTransportService implements TransportServiceInterface {
     }
 
     public emit(message: TransportMessage): void {
-        let clientObj: ConnectedHttpClient | undefined = this.connectedHttpClients.find(obj => obj.clientId == message.target)
-        let serverObj: ConnectedHttpServer | undefined = this.connectedHttpServer.find(obj => obj.clientId === message.target)
-
-        // for server usage
-        if (clientObj && clientObj.connectionState.getValue() == 'ONLINE') {
-            clientObj.responseStream.next(message.payload as WrappedMessage)
-        }
-        // for client usage 
-        if (serverObj) {
-            if (serverObj.connectionState.getValue() == 'ONLINE') {
-                axios.post(`${this.baseUrl}message`, message.payload, {
-                    headers: { 'Content-Type': 'application/json' },
-                }).then((response) => {
-                    this.console.log({ message: `Response From Server: ${response.data}`, details: response.status });
-                }).catch((error: AxiosError) => {
-                    console.error('HTTP emit error:', error.code);
-                    this.transportEvent.next({
-                        id: uuidv4(),
-                        type: 'Transport Event',
-                        event: 'Re-Flush',
-                        date: new Date(),
-                        data: message,
-                        transport: 'Http'
-                    } as GeneralEvent<TransportMessage>)
-                });
-            } else {
-                this.console.error({ message: `Target Server is offline: Reflusing message ${(message.payload as WrappedMessage).thisMessageID}` });
-                this.transportEvent.next({
-                    id: uuidv4(),
-                    type: 'Transport Event',
-                    event: 'Re-Flush',
-                    date: new Date(),
-                    data: message,
-                    transport: 'Http'
-                } as GeneralEvent<TransportMessage>)
-            }
-        }
-
+        // code here
     }
 
     public startServer(port: number): void {
-        startHttpServer(port).subscribe({
-            next: (client: ConnectedHttpClient) => {
-                handleHttpClient(this.info.transportServiceId, client, this.connectedHttpClients).subscribe({
-                    next: event => this.transportEvent.next(event),
-                    error: error => console.error(error),
-                    complete: () => (`Client ${client.clientId} disconnected...`)
-                })
-            },
-            error: error => this.console.error({ message: 'Observer Error', details: error }),
-            complete: () => this.console.log({ message: '...' })
-        })
+        // code here
     }
 
     public startClient(url: string, receiverProfileInfo?: ConnectedHttpServer | undefined): void {
-        initiateClientToServer(this.info.transportServiceId, url, this.transportEvent, this.connectedHttpServer, receiverProfileInfo).then((connectedHttpServer: ConnectedHttpServer) => {
-            handleClientHttpConnection(url, connectedHttpServer).subscribe({
-                next: event => this.transportEvent.next(event),
-                error: error => this.console.log({ message: `Observer Error`, details: error }),
-                complete: () => {
-                    this.console.log({ message: `Re-connecting with server` })
-                    this.startClient(url, connectedHttpServer)
-                }
-            })
-        }).catch((error: { error: AxiosError, objRef: ConnectedHttpServer | undefined }) => {
-            this.console.error({ message: `HttpTransport ERROR: <InitiateClientToServerFailure>`, details: error.error.code })
-            setTimeout(() => {
-                this.startClient(url, error.objRef)
-            }, 3000); // Retry with delay
-        })
+        // code hereX
     }
 
 }
-
-
 export interface ConnectedHttpClient extends ClientObject {
     instance: Express
     responseStream: Subject<WrappedMessage>

+ 2 - 3
src/transport/websocket.ts

@@ -24,7 +24,7 @@ export class WebsocketTransportService implements TransportServiceInterface {
         // logic here
         this.event = event
     }
-
+    
     public startServer(port: number): void {
         startSocketServer(port).subscribe({
             next: (connectedClient: SocketForConnectedClient) => {
@@ -40,7 +40,6 @@ export class WebsocketTransportService implements TransportServiceInterface {
     }
 
     public startClient(url: string): void {
-        // logic here
         startClientSocketConnection(url).then((socket: SocketForConnectedServer) => {
             handleClientSocketConnection(this.info.transportServiceId, socket, this.connectedSocketServer).subscribe(this.event)
         }).catch((error) => {
@@ -50,7 +49,7 @@ export class WebsocketTransportService implements TransportServiceInterface {
 
 
     public emit(message: TransportMessage): void {
-        this.console.log({ message: `Emitting: ${((message.payload as WrappedMessage).payload as FisMessage).header.messageID} to ${message.target}`, details: message })
+        this.console.log({ message: `Emitting: ${(message.payload as WrappedMessage).thisMessageID} to ${message.target}`, details: message })
         let clientObj: ConnectedSocketClient | undefined = this.connectedClientSocket.find(obj => obj.clientId === message.target)
         let serverObj: ConnectedSocketServer | undefined = this.connectedSocketServer.find(obj => obj.clientId === message.target)
         // for server usage

+ 1 - 558
src/utils/http.utils.ts

@@ -1,558 +1 @@
-import express, { Response } from 'express';
-import * as fs from 'fs'
-import { Express } from 'express';
-import { v4 as uuidv4 } from 'uuid'
-import { ConnectedHttpClient, ConnectedHttpServer } from "../transport/http";
-import { BehaviorSubject, Observable, Observer, Subject, Subscription } from "rxjs";
-import { WrappedMessage } from './message.ordering';
-import axios, { AxiosError, AxiosResponse } from 'axios';
-import ConsoleLogger from './log.utils';
-import path from 'path';
-import { ConnectionState, FisMessage, GeneralEvent, TransportMessage } from '../interface/interface';
-const console: ConsoleLogger = new ConsoleLogger(`HttpUtils`, ['transport'])
-
-export function startHttpServer(port: number): Observable<ConnectedHttpClient> {
-    return new Observable((observer: Observer<ConnectedHttpClient>) => {
-        let app: Express = express();
-
-        // Middleware to parse JSON requests
-        app.use(express.json());
-
-        app.listen(port, () => {
-            console.log({ message: `Server running at http://localhost:${port}` });
-        });
-
-
-    })
-}
-
-export async function initiateClientToServer(transportServiceId: string, url: string, event: Subject<GeneralEvent<any>>, connectedHttpServers: ConnectedHttpServer[], receiverProfileInfo: ConnectedHttpServer | undefined, browserEnv?: boolean,): Promise<ConnectedHttpServer> {
-    return new Promise((resolve, reject) => {
-        if (browserEnv) {
-            // logic here for using browser fetch
-        } else { // axios methods
-            if (receiverProfileInfo) {
-                console.log({ message: `Is Old profile, reconnecting with server` })
-                checkOwnClientInfo(receiverProfileInfo.clientId).then((profile: ConnectedHttpServer) => {
-                    receiverProfileInfo!.clientId = profile.clientId
-                    // console.log({ message: 'jsonfile', details: profile })
-                    postAxiosRequest(url + 'profile', { name: 'Old Client', message: profile }).then((profileInfo: { name: string, message: { id: string } }) => {
-                        console.log({ message: `Acknowledged as previous client. Id: ${profileInfo.message.id}` })
-                        event.next({
-                            id: uuidv4(),
-                            type: `Transport Event`,
-                            event: 'Server Connected',
-                            date: new Date(),
-                            data: {
-                                clientId: profileInfo.message.id,
-                                message: `Existing Http Channel ${profileInfo.message.id} re-established.`
-                            },
-                            transport: `Http`
-                        })
-                        // Update Http instance record
-                        let clientObj: ConnectedHttpServer | undefined = connectedHttpServers.find(obj => obj.clientId === profileInfo.message.id)
-                        console.log({ message: 'ClientObj', details: clientObj })
-                        console.log({ message: 'ReceiverProfile', details: receiverProfileInfo })
-                        if (clientObj) {
-                            clientObj.connectionState.next('ONLINE')
-                            console.log({ message: receiverProfileInfo.connectionState.getValue() })
-                            resolve(clientObj)
-                        }
-
-                    }).catch((error: AxiosError) => {
-                        reject({ error: error, objRef: receiverProfileInfo })
-                    })
-                }).catch((error) => {
-                    console.error(error)
-                    postAxiosRequest(url + 'profile', { name: 'New Client', data: null }).then((profileInfo: { name: string, message: any }) => {
-                        updateProfileAndPublishEvent(transportServiceId, (receiverProfileInfo as ConnectedHttpServer), profileInfo, event, connectedHttpServers).then((receiverProfileInfo) => {
-                            resolve(receiverProfileInfo)
-                        })
-                    }).catch((error) => {
-                        reject({ error: error, objRef: receiverProfileInfo })
-                    })
-                    reject({ error: error, objRef: receiverProfileInfo })
-                })
-            } else {
-                console.log({ message: `Is New profile, Connecting with server` })
-                postAxiosRequest(url + 'profile', { name: 'New Client', data: null }).then((profileInfo: { name: string, message: any }) => {
-                    updateProfileAndPublishEvent(transportServiceId, receiverProfileInfo, profileInfo, event, connectedHttpServers).then((receiverProfileInfo) => {
-                        resolve(receiverProfileInfo)
-                    })
-                }).catch((error) => {
-                    reject({ error: error, objRef: receiverProfileInfo })
-                })
-            }
-        }
-    })
-}
-
-// For client usage
-export function handleClientHttpConnection(url: string, server: ConnectedHttpServer): Observable<GeneralEvent<any>> {
-    return new Observable((eventNotification: Observer<GeneralEvent<any>>) => {
-        console.log({ message: `Long Poll Attempt for ${server.clientId}` })
-        server.connectionState.next('ONLINE');
-        let active: boolean = true; // Flag to control polling lifecycle
-
-        const longPoll = async () => {
-            while (active) {
-                try {
-                    // Axios request with timeout
-                    const response = await axios.get(`${url}poll`); // removing the timeout temporarily. 
-                    // const response = await axios.get(`${url}response`, {
-                    //     timeout: 3000, // 10s timeout this one will trigger error. That's why it keeps on throwing error
-                    // });
-
-                    if (response.status === 200) {
-                        const data = response.data;
-                        console.log({ message: 'Long Poll Response', details: data })
-                        eventNotification.next({
-                            id: uuidv4(),
-                            type: `Transport Event`,
-                            event: 'New Message',
-                            date: new Date(),
-                            data: {
-                                id: uuidv4(),
-                                dateCreated: new Date(),
-                                transport: `Http`,
-                                target: server.clientId,
-                                payload: data,
-                            },
-                            transport: `Http`
-                        });
-                    } else if (response.status === 204) {
-                        console.log({ message: 'No new messages from the server.' });
-                    } else {
-                        console.error({ message: `Unexpected response status: ${response.status}` })
-                        handleServerConnectionError(active, eventNotification, server)
-                        throw new Error(`Unexpected response status: ${response.status}`);
-                    }
-                } catch (error: unknown) {
-                    console.error({ message: `Unknown Error.`, details: error }) // culprit is here
-                    handleServerConnectionError(active, eventNotification, server)
-                    // Error handling with server disconnect notification
-                    let errorMessage: string;
-
-                    if (axios.isAxiosError(error)) {
-                        if (error.response) {
-                            errorMessage = `Server returned status ${error.response.status}: ${error.response.statusText}`;
-                        } else if (error.code === 'ECONNABORTED') {
-                            errorMessage = 'Request timed out.';
-                        } else {
-                            errorMessage = error.message || 'An Axios error occurred.';
-                        }
-                    } else if (error instanceof Error) {
-                        errorMessage = error.message;
-                    } else {
-                        errorMessage = 'An unknown error occurred during polling.';
-                    }
-
-                    console.error({ message: `Polling error: ${errorMessage}` });
-                    // observer.error(new Error(errorMessage)); // Notify subscribers of the error
-                    break; // Stop polling on error
-                }
-            }
-        };
-
-        longPoll();
-
-        // Cleanup logic for unsubscribing
-        return () => {
-            console.log({ message: 'Unsubscribed from the long-polling channel.' });
-            eventNotification.complete(); // Notify completion
-        };
-    });
-}
-
-function handleServerConnectionError(active: boolean, observer: Observer<GeneralEvent<any>>, server: ConnectedHttpServer): void {
-    server.connectionState.next('OFFLINE');
-    console.log({ message: 'Server lost connection' });
-    active = false; // Stop polling
-    observer.next({
-        id: uuidv4(),
-        type: `Transport Event`,
-        event: 'Server Disconnected',
-        date: new Date(),
-        data: {
-            clientId: server.clientId,
-            message: '',
-            payload: {
-                objRef: server
-            },
-        },
-        transport: `Http`
-    });
-    observer.complete()
-}
-
-async function updateProfileAndPublishEvent(transportServiceId: string, receiverProfileInfo: ConnectedHttpServer | undefined, profile: { name: string, message: any }, event: Subject<GeneralEvent<any>>, connectedHttpServers: ConnectedHttpServer[]): Promise<ConnectedHttpServer> {
-    return new Promise((resolve, reject) => {
-        console.log({ message: `Assigned client Name: ${(profile.message as ConnectedHttpServer).clientId}` })
-        receiverProfileInfo = profile.message as ConnectedHttpServer
-        writeFile(profile.message as ConnectedHttpServer, (profile.message as ConnectedHttpServer).clientId).then(() => {
-            event.next({
-                id: uuidv4(),
-                type: `Transport Event`,
-                event: `New Server`,
-                date: new Date(),
-                data: {
-                    clientId: (profile.message as ConnectedHttpServer).clientId,
-                    message: `New Http Channel ${(profile.message as ConnectedHttpServer).clientId} established.`
-                },
-                transport: `Http`
-            })
-            // broadcast event to allow retransmission to relase buffered messages
-            event.next({
-                id: uuidv4(),
-                type: `Transport Event`,
-                event: `Server Connected`,
-                date: new Date(),
-                data: {
-                    clientId: (profile.message as ConnectedHttpServer).clientId,
-                    message: `Server ${(profile.message as ConnectedHttpServer).clientId} connected and ready to go.`
-                },
-                transport: `Http`
-            })
-        }).catch((error) => {
-            reject(error)
-        })
-
-        // Update http instance record
-        receiverProfileInfo = {
-            clientId: (profile.message as ConnectedHttpServer).clientId,
-            dateCreated: new Date(),
-            connectionState: new BehaviorSubject<ConnectionState>(`ONLINE`),
-            transport: `Http`,
-            transportServiceId: transportServiceId
-        }
-        connectedHttpServers.push(receiverProfileInfo)
-        resolve(receiverProfileInfo)
-    })
-}
-
-export async function postAxiosRequest(url: string, data: any): Promise<any> {
-    return new Promise(async (resolve, reject) => {
-        try {
-            const response: AxiosResponse<any> = await axios.post(url, data);
-            console.log({ message: 'Response', details: response.data });
-            resolve(response.data)
-        } catch (error) {
-            if (axios.isAxiosError(error)) {
-                console.error({ message: 'Axios Error:', details: error.code });
-            } else {
-                console.error({ message: 'Unexpected Error:', details: error });
-            }
-            reject(error)
-        }
-    })
-}
-
-
-export function handleHttpClient(transportServiceId: string, clientInfo: ConnectedHttpClient, connectedClientHttp: ConnectedHttpClient[]): Observable<GeneralEvent<any>> {
-    return new Observable((event: Observer<GeneralEvent<any>>) => {
-        clientInfo.instance.post('/profile', (req, res) => {
-            // Client will declare this first before attempting to poll for response channel
-            handleProfile(transportServiceId, clientInfo.instance, req.body, res, event, connectedClientHttp)
-        });
-    })
-}
-
-function handleProfile(transportServiceId: string, app: Express, data: { name: `Old Client` | `New Client`, message: any }, res: Response, event: Observer<GeneralEvent<any>>, connectedClientHttp: ConnectedHttpClient[]): void {
-    if (data.name == `New Client`) {
-        let clientInstance: ConnectedHttpClient = {
-            clientId: uuidv4(), // client should only be assigned at this level. And is passed around for reference pointing
-            dateCreated: new Date(),
-            instance: app,
-            connectionState: new BehaviorSubject<ConnectionState>(`OFFLINE`), // for now it's offline because it needs to establish the long polling first
-            responseStream: new Subject<WrappedMessage>(),
-            transportServiceId: transportServiceId,
-            transport: `Http`
-        }
-
-        // send to receiver for reference
-        res.json({
-            name: `New Profile`, message: { id: clientInstance.clientId }
-        })
-        // publish first event notification
-        event.next({
-            id: uuidv4(),
-            type: `Transport Event`,
-            event: `New Client`,
-            date: new Date(),
-            data: {
-                clientId: clientInstance.clientId,
-                message: `New Http Client Connected. Adapter ID assigned: ${clientInstance.clientId}`,
-                payload: clientInstance
-            },
-            transport: `Http`
-        })
-        // Update connected clientInstance info to adapter
-        connectedClientHttp.push(clientInstance)
-        addClientToDB(clientInstance)
-        startListeningAndStreaming(app, clientInstance, event)
-    } else if (data.name == 'Old Client') {
-        console.log({ message: `Is old client`, details: data })
-        // update first
-        let clientInstance: ConnectedHttpClient | undefined
-        if (connectedClientHttp.length > 0) {
-            clientInstance = connectedClientHttp.find(obj => obj.clientId === data.message.id)
-            handleFoundClient(clientInstance)
-        } else {
-            // for the case server itself got shit down or something
-            checkIfClientExists(data.message.id).then((client: ConnectedHttpClient) => {
-                clientInstance = client
-                handleFoundClient(clientInstance)
-            }).catch(error => console.error(error))
-        }
-        function handleFoundClient(clientInstance: ConnectedHttpClient | undefined): void {
-            if (clientInstance) {
-                console.log({ message: `Http Client ${clientInstance. clientId} Found` })
-                res.json({ name: 'Adjusted Profile', message: { id: clientInstance.clientId } })
-                // replace socket instance since the previous has been terminated
-                clientInstance.instance = app
-                // some explanation here. For the case where the server reads from the DB, no need to terminate subject, since all instances would be destroyed alongside the server shut down. This case is specificd only when there's a need to read from local file
-                if (!clientInstance.connectionState) {
-                    clientInstance.connectionState = new BehaviorSubject<ConnectionState>(`OFFLINE`)
-                }
-                // need to start listening again, because it's assigned a different socket instance this time round
-                startListeningAndStreaming(app, clientInstance, event, true)
-                event.next({
-                    id: uuidv4(),
-                    type: `Transport Event`,
-                    event: 'Client Connected',
-                    date: new Date(),
-                    data: {
-                        clientId: clientInstance.clientId,
-                        message: `Client ${clientInstance.clientId} connection re-established`,
-                        payload: clientInstance
-                    },
-                    transport: `Http`
-                })
-
-            } else {
-                console.log({ message: `Profile Not Found` })
-                res.json({ name: 'Error', message: 'Receiver Profile Not found' })
-            }
-        }
-    }
-}
-
-export async function checkIfClientExists(id: string): Promise<ConnectedHttpClient> {
-    return new Promise((resolve, reject) => {
-        try {
-            // Check if the file exists
-            let filePath = process.env.FolderPath as string + 'clients.json'
-            if (!fs.existsSync(filePath)) {
-                console.log({ message: `File does not exist.` })
-                reject('File does not exist');
-            }
-
-            // Read and parse the data
-            const fileContent = fs.readFileSync(filePath, 'utf-8');
-            const data: any[] = JSON.parse(fileContent);
-
-            // Check if an details with the given id exists
-            let obj = data.find(entry => entry.id === id);
-
-            if (obj) {
-                console.log({ message: `Client with ID ${id} exists.` })
-            } else {
-                console.log({ message: `Client with ID ${id} does not exist.` })
-            }
-            resolve(obj);
-        } catch (error) {
-            reject(`Error reading the file`)
-        }
-    })
-}
-
-/* For Internal Usage only. Temporary serve as a way for server to keep track of clients. To be replaced in the future with better alternatives. */
-export function addClientToDB(entry: ConnectedHttpClient): void {
-    try {
-        let data: ConnectedHttpClient[] = [];
-        let filePath = process.env.FolderPath as string + 'clients.json'
-        // Check if the file exists and load existing data
-        if (fs.existsSync(filePath)) {
-            const fileContent = fs.readFileSync(filePath, 'utf-8');
-            data = JSON.parse(fileContent);
-        }
-
-        // Append the new object to the array
-        data.push({
-            id: entry.clientId,
-            dateCreated: entry.dateCreated,
-            connectionState: null,
-            instance: null
-        } as unknown as ConnectedHttpClient);
-
-        // Write the updated array back to the file
-        fs.writeFileSync(filePath, JSON.stringify(data, null, 2), 'utf-8');
-        console.log({ message: `Entry added successfully.` });
-    } catch (error) {
-        console.error({ message: 'Error writing to file:', details: error });
-    }
-}
-
-// this is for server usage only
-export function startListeningAndStreaming(app: Express, client: ConnectedHttpClient, eventListener: Observer<GeneralEvent<any>>, oldClient?: boolean): void {
-    /* Generally, we don't need this unless in the case of being the receiver */
-    app.post('/message', (req, res) => {
-        eventListener.next({
-            id: uuidv4(),
-            type: `Transport Event`,
-            event: 'New Message',
-            date: new Date(),
-            data: {
-                id: uuidv4(),
-                dateCreated: new Date(),
-                transport: `Http`,
-                target: client.clientId, // this ref to be associated with the client/channel
-                payload: req.body
-            } as TransportMessage,
-            transport: `Http`
-        })
-        res.json(`Received ${((req.body as WrappedMessage)?.payload as FisMessage)?.header?.messageID ?? `Undefined`}`)
-    })
-
-    app.get('/poll', (req, res) => {
-        console.log({ message: 'Client connected for long polling.' });
-        client.connectionState.next('ONLINE');
-
-        // notify it's associated retransmission to start releaseing buffer
-        eventListener.next({
-            id: uuidv4(),
-            type: `Transport Event`,
-            event: oldClient ? 'Client Re-connected' : `Client Connected`,
-            date: new Date(),
-            data: {
-                clientId: client.clientId,
-                message: `Socket Client ${oldClient ? `Re-Connected` : `Connected`}. Adapter ID assigned: ${client.clientId}`,
-                payload: client
-            },
-            transport: `Http`
-        })
-
-        // Flag to track if the response has been sent
-        let responseSent = false;
-
-        // Subscribe to the data stream
-        const subscription = client.responseStream.asObservable().subscribe({
-            next: (message: WrappedMessage) => {
-                if (!responseSent) {
-                    console.log({ message: `Sending data ${message.thisMessageID} to client ${client.clientId}}` });
-                    res.json(message); // Send the data to the client
-                    responseSent = true; // Mark response as sent
-                    subscription.unsubscribe(); // Unsubscribe to close this request
-                }
-            },
-            error: (err) => {
-                if (!responseSent) {
-                    console.error({ message: 'Error in data stream:', details: err });
-                    res.status(500).send('Internal Server Error');
-                    responseSent = true; // Mark response as sent
-                }
-                subscription.unsubscribe(); // Ensure cleanup
-            },
-            complete: () => {
-                if (!responseSent) {
-                    console.log({ message: 'Data stream completed.' });
-                    res.status(204).send(); // No Content
-                    responseSent = true; // Mark response as sent
-                }
-                subscription.unsubscribe(); // Ensure cleanup
-            },
-        });
-
-        // Timeout if no data is emitted within a specified duration
-        const timeout = setTimeout(() => {
-            if (!responseSent) {
-                console.log({ message: 'No data emitted. Sending timeout response.' });
-                res.status(204).send(); // No Content
-                responseSent = true; // Mark response as sent
-                subscription.unsubscribe(); // Ensure cleanup
-            }
-        }, 15000); // 15 seconds timeout (adjust as needed)
-
-        // Handle client disconnection
-        res.on('close', () => {
-            if (!responseSent) {
-                console.error({ message: `Http Client ${client.clientId} disconnected` });
-                eventListener.next({
-                    id: uuidv4(),
-                    type: `Transport Event`,
-                    event: 'Client Disconnected',
-                    date: new Date(),
-                    data: {
-                        clientId: client.clientId,
-                    },
-                    transport: `Http`
-                })
-                client.connectionState.next(`OFFLINE`)
-                subscription.unsubscribe(); // Ensure cleanup
-            }
-            clearTimeout(timeout); // Clear timeout to avoid unnecessary execution
-        });
-    });
-
-}
-
-
-// Check if filename exists. Return profile information if there's any
-export async function checkOwnClientInfo(filename?: string): Promise<ConnectedHttpServer> {
-    return new Promise((resolve, reject) => {
-        // Check if the file exists
-        let filePath = process.env.FolderPath as string
-        if (fs.existsSync(filePath + `${filename}.json`)) {
-            try {
-                // Read the file contents
-                const fileData = fs.readFileSync(`${filename}.json`, 'utf8');
-
-                // If the file is empty, return an error
-                if (fileData.trim() === "") {
-                    throw new Error("File is empty");
-                }
-
-                // Parse and return the data if present
-                const jsonData = JSON.parse(fileData);
-                resolve(jsonData)
-
-            } catch (err) {
-                // Handle parsing errors or other file-related errors
-                let errMsg: string = ("Error reading or parsing file: " + err)
-                reject(errMsg);
-            }
-        } else {
-            reject("File does not exist");
-        }
-    })
-}
-
-// Specifically to write receiver profile information
-async function writeFile(data: ConnectedHttpServer, filename: string): Promise<boolean> {
-    return new Promise((resolve, reject) => {
-        // Ensure the folder exists
-        const folderPath = process.env.FolderPath as string
-        console.log({ message: folderPath })
-        // const folderPath = path.join(__dirname, folder);
-        if (!fs.existsSync(folderPath)) {
-            fs.mkdirSync(folderPath, { recursive: true }); // Create folder if it doesn't exist
-        } else {
-            console.log({ message: 'Folder already exist' })
-        }
-
-        // Construct the full file path (include the folder)
-        const filePath = path.join(folderPath, `${filename}.json`);
-
-        // Write JSON data to a file
-        fs.writeFile(filePath, JSON.stringify(data, null, 2), (err) => {
-            if (err) {
-                console.log({ message: 'Error writing file', details: err });
-                reject(false);
-            } else {
-                console.log({ message: 'File has been written', details: filePath });
-                resolve(true);
-            }
-        });
-    });
-}
-
+// code here.

+ 2 - 3
src/utils/socket.utils.ts

@@ -2,11 +2,9 @@ import { Observable, Observer } from 'rxjs';
 import { createServer } from 'http';
 import { Server, Socket as SocketForConnectedClient } from 'socket.io';
 import { io, Socket as SocketForConnectedServer } from 'socket.io-client';
-import * as fs from 'fs'
 import { v4 as uuidv4 } from 'uuid'
 import { ConnectedSocketClient, ConnectedSocketServer } from '../transport/websocket';
 import ConsoleLogger from './log.utils';
-import path from 'path';
 import { ClientObject, GeneralEvent, TransportMessage } from '../interface/interface';
 import { addClientToDB, checkIfClientExists, checkOwnClientInfo } from './general.utils';
 const console: ConsoleLogger = new ConsoleLogger(`SocketUtils`, ['transport'])
@@ -51,6 +49,7 @@ export async function startClientSocketConnection(serverUrl: string): Promise<So
                 reconnectionDelayMax: 10000,     // Delay can grow to a max of 10 seconds
                 randomizationFactor: 0.3,
             })
+            console.log({ message: `Is this called properly?` })
             resolve(clientSocket)
         }
         catch (error) {
@@ -128,6 +127,7 @@ export function handleClientSocketConnection(transportServiceId: string, socket:
                         socketInstance: socket
                     }
                     addClientToDB(clientProfileInfo as ConnectedSocketServer, 'servers')
+                    serversConnected.push(clientProfileInfo)
                     /* Note that there are two separate events, because transmission must first be set up before releasing buffer. */
                     // broadcast event to allow transmission manager to instantiate transmission components
                     eventNotification.next({
@@ -149,7 +149,6 @@ export function handleClientSocketConnection(transportServiceId: string, socket:
                         transport: 'Websocket',
                         transportServiceId: transportServiceId
                     } as GeneralEvent<ClientObject>)
-                    serversConnected.push(clientProfileInfo)
                 }
             })
 

+ 7 - 6
src/utils/transport.utils.ts

@@ -5,22 +5,22 @@ import { HttpTransportService } from "../transport/http";
 import ConsoleLogger from "./log.utils";
 const console: ConsoleLogger = new ConsoleLogger(`TransportUtils`, ['transport'])
 // Server to be set up as well as acquiring client information if needed. Like in the case for grpc and socket. Http not requ`ired.
-export function setUpTransportService(transportSet: TransportSet, event: Subject<GeneralEvent<any>>, transportServiceArray: TransportServiceInterface[], url: string, isClient?: boolean): void {
+export function setUpTransportService(transportSet: TransportSet, event: Subject<GeneralEvent<any>>, transportServiceArray: TransportServiceInterface[], url?: string): void {
     try {
         let transportService: TransportServiceInterface = instantiateTransportService(transportSet.transport, event)
         transportServiceArray.push(transportService)
         if (transportService instanceof WebsocketTransportService) {
-            console.log({ message: `Just Double Checking... this is websocket` })
-            if (isClient) {
-                // please note this is subject to change depending on the UI environemnt. Angular has their own built in function to read json file based on Swopt-UI
+            console.log({ message: `${url}` })
+            console.log({ message: `Just Double Checking... this is websocket ${url ? `client` : `server`}` })
+            if (url) {
                 transportService.startClient(url)
             } else {
                 transportService.startServer(transportSet.port);
             }
         } else if (transportService instanceof HttpTransportService) {
-            console.log({ message: `Just Double Checking... this is http` })
+            console.log({ message: `Just Double Checking... this is http ${url ? `client` : `server`}` })
             // Additional Http-specific setup if needed.
-            if (isClient) {
+            if (url) {
                 transportService.startClient(url)
             } else {
                 transportService.startServer(transportSet.port)
@@ -50,3 +50,4 @@ export function sortTransportFromEnv(transportSet: TransportSet[]): void {
     })
     console.log({ message: 'TransportSetList', details: transportSet })
 }
+