Procházet zdrojové kódy

weekly sync., but non working code

enzo před 3 týdny
rodič
revize
0c4b49db85

+ 0 - 0
dist/config/config.json → dist/src/config/config.json


+ 8 - 8
package.json

@@ -7,14 +7,14 @@
     "test": "echo \"Error: no test specified\" && exit 1",
     "build": "tsc",
     "watch": "tsc --watch",
-    "start": "node dist/index.js",
-    "proxy": "node dist/test/proxy.js",
-    "transmitter": "node dist/test/transmitter.js",
-    "receiver": "node dist/test/receiver.js",
-    "actor": "node dist/test/actor.js",
-    "dev": "node --inspect dist/test/transmitter.js",
-    "pmtransmitter": "pm2 start node dist/test/transmitter.js",
-    "pmreceiver": "pm2 start node dist/test/receiver.js"
+    "start": "node dist/src/index.js",
+    "proxy": "node dist/src/test/proxy.js",
+    "transmitter": "node dist/src/test/transmitter.js",
+    "receiver": "node dist/src/test/receiver.js",
+    "actor": "node dist/src/test/actor.js",
+    "dev": "node --inspect dist/src/test/transmitter.js",
+    "pmtransmitter": "pm2 start node dist/src/test/transmitter.js",
+    "pmreceiver": "pm2 start node dist/src/test/receiver.js"
   },
   "author": "",
   "license": "ISC",

+ 1 - 1
src/actor/transmission.actor.ts

@@ -21,7 +21,7 @@ export class TransmissionActor extends ActorBase<ActorMessage<FisMessage>> {
     }
 
     protected handleMessageReception(incomingBus: Subject<ActorMessage<FisMessage>>): void {
-        this.transmissionProfile.receiverService.getIncoming().pipe(
+        this.transmissionProfile.receiverService.getReceivables().pipe(
             // filter()
         )
     }

+ 61 - 36
src/adapters/adapter.manager.ts

@@ -1,7 +1,7 @@
-import { filter, Observable, Observer, Subject, Subscription } from "rxjs"
+import { filter, map, Observable, Observer, Subject, Subscription } from "rxjs"
 import { v4 as uuidv4 } from 'uuid'
 import ConsoleLogger from "../utils/log.utils"
-import { AdapterInterface, ClientObject, GeneralEvent, ReceiverAdapterInterface, TransmissionInterface, TransmitterAdapterInterface, Transport, TransportServiceInterface } from "../interface/interface"
+import { AdapterInterface, ClientObject, GeneralEvent, TransmissionRole, TransportType, TransportServiceInterface } from "../interface/interface"
 import { TransmitterAdapter } from "./adapter.transmitter";
 import { ReceiverAdapter } from "./adapter.receiver";
 import { AdapterManagerBase } from "../base/adapter.manager.base";
@@ -9,45 +9,71 @@ import { AdapterManagerBase } from "../base/adapter.manager.base";
 export class AdapterManager extends AdapterManagerBase {
     private console: ConsoleLogger = new ConsoleLogger(`AdapterManager`, ['managers'])
 
-    constructor(event: Subject<GeneralEvent<any>>, browserEnv?: boolean) {
+    constructor(generalEvent: Subject<GeneralEvent<any>>, browserEnv?: boolean) {
         super()
         this.browserEnv = browserEnv ?? false
         this.console.log({ message: `Contructing self... ${this.browserEnv ? `is receiving end` : `is not browser env`}` })
-        this.event = event
+        this.event = new Subject<GeneralEvent<any>>()
 
-        this.connectToExistingTransport(this.event)
+        this.connectToExistingTransport(generalEvent)
+        this.initializeAdapterInstantiation(generalEvent, this.event)
     }
 
-    public subscribeForAdapters(): Observable<GeneralEvent<AdapterInterface>> {
-        return new Observable((observer: Observer<GeneralEvent<AdapterInterface>>) => {
-            const subscription: Subscription = this.event.pipe(
-                filter(event => event.type === `General Event`),
-                filter(event => event.event === `New Transmission`),
-            ).subscribe((event: GeneralEvent<TransmissionInterface>) => {
-                let adapters: AdapterInterface[] = this.instantiateAdapterComponents(event.data.clientInfo)
-                if (adapters.length > 0) {
-                    adapters.forEach((adapter: AdapterInterface) => {
-                        observer.next({
-                            id: uuidv4(),
-                            type: `Adapter Event`,
-                            event: `New Adapter`,
-                            date: new Date(),
-                            data: adapter
-                        })
-                    })
-                } else {
-                    // handler for no adapters instantiated for this new client
+    public subscribeForAdapters(clientId: string, role: TransmissionRole): Observable<AdapterInterface> {
+        return new Observable((adapters: Observer<AdapterInterface>) => {
+            this.adapters.forEach((adapter: AdapterInterface) => {
+                if (adapter.getAdapterProfile(`clientId`) == clientId && adapter.getAdapterProfile(`role`) == role) {
+                    adapters.next(adapter)
                 }
             })
 
+            // According to chatgpt, any new adpaters that I insert into the array therafter will not be picked up, so need to manually listen to adapter evnet
+            const subscription: Subscription = this.event.pipe(
+                filter(event => event.type === `Adapter Event`),
+                filter(event => event.event === `New Adapter`),
+                filter(event => (event.data as AdapterInterface).getAdapterProfile(`role`) === role),
+                filter(event => (event.data as AdapterInterface).getAdapterProfile(`clientId`) === clientId),
+                map(event => { return (event.data as AdapterInterface) })
+            ).subscribe(adapter => {
+                adapters.next(adapter)
+            })
+
             return () => {
                 subscription.unsubscribe()
             }
         })
     }
 
-    private connectToExistingTransport(event: Subject<GeneralEvent<any>>): void {
-        this.event.pipe(
+    private initializeAdapterInstantiation(generalEvent: Subject<GeneralEvent<any>>, transportEvent: Subject<GeneralEvent<any>>): void {
+        generalEvent.pipe(
+            filter(event => event.type === `Transport Event`),
+            map(event => { return event as GeneralEvent<any> })
+        ).subscribe(transportEvent)
+
+        transportEvent.pipe(
+            filter(event => event.type === `Transport Event`),
+        ).subscribe(event => {
+            if (event.event == `New Client` || event.event == `New Server`) {
+                let adapters: AdapterInterface[] = this.instantiateAdapterComponents(event.data as ClientObject, event.transport, event.transportServiceId as string)
+                adapters.forEach(adapter => {
+                    transportEvent.next({
+                        id: uuidv4(),
+                        type: `Adapter Event`,
+                        event: `New Adapter`,
+                        date: new Date(),
+                        data: adapter,
+                        transport: event.transport,
+                        transportServiceId: event.transportServiceId
+                    } as GeneralEvent<AdapterInterface>)
+                })
+            } else if (event.event == `Client Disconnected` || event.event == `Server Disconnected`) {
+                // logic here ... pass...
+            }
+        })
+    }
+
+    private connectToExistingTransport(generalEvent: Subject<GeneralEvent<any>>): void {
+        generalEvent.pipe(
             filter(event => event.type === `General Event`),
             filter(event => event.event === `Available Transport` || event.event === `New Transport`)
         ).subscribe((event: GeneralEvent<TransportServiceInterface[]>) => {
@@ -55,18 +81,16 @@ export class AdapterManager extends AdapterManagerBase {
             transportServices.forEach((transportService: TransportServiceInterface) => {
                 this.console.log({ message: `updating transport in adapter ${transportService.getInfo().transportServiceId}` })
                 this.updateTransportServicesRecord(transportService)
-            })      
+            })
         })
         // to automatically connect to existing started transport services
-        this.event.next({
+        generalEvent.next({
             id: uuidv4(),
             type: 'Adapter Event',
             event: `Adapter Manager Started`,
             date: new Date(),
-            data: {
-                event: event
-            }
-        })
+            data: null
+        } as GeneralEvent<any>)
     }
 
     private updateTransportServicesRecord(transportService: TransportServiceInterface): void {
@@ -79,17 +103,18 @@ export class AdapterManager extends AdapterManagerBase {
     }
     /* At the moment, all adapters are instantiated based on which transport the client is connected. For multi type adapters instantiation logic for one client, this 
     will be for future enhancements. */
-    private instantiateAdapterComponents(clientRef: ClientObject): AdapterInterface[] {
-        let transportService = this.transportServiceArray.find(obj => obj.getInfo().transportServiceId === clientRef.transportServiceId)
+    private instantiateAdapterComponents(clientRef: ClientObject, transportType: TransportType, transportServiceId: string): AdapterInterface[] {
+        let transportService = this.transportServiceArray.find(obj => obj.getInfo().transportServiceId === transportServiceId)
         if (transportService) {
-            this.console.log({ message: `Instantiating adapters for client ${clientRef.clientId}` });
+            this.console.log({ message: `Instantiating ${transportType} adapters for client ${clientRef.clientId}` });
             const adapters: AdapterInterface[] = [
                 new TransmitterAdapter(clientRef.clientId, transportService.getInfo().transport, transportService),
                 new ReceiverAdapter(clientRef.clientId, transportService.getInfo().transport, transportService),
             ];
+            this.adapters.concat(adapters)
             return adapters;
         } else {
-            this.console.error({ message: `Transport Service id ${clientRef.transportServiceId} not found. Unable to instantiate adapters.` })
+            this.console.error({ message: `Transport Service id ${transportServiceId} not found. Unable to instantiate adapters.` })
             return []
         }
     }

+ 39 - 11
src/adapters/adapter.receiver.ts

@@ -1,10 +1,10 @@
 import dotenv from 'dotenv';
-import { filter, Observable, Observer, Subscription, takeWhile } from 'rxjs';
+import { BehaviorSubject, distinctUntilChanged, filter, map, Observable, Observer, Subscription, takeWhile } from 'rxjs';
 import { v4 as uuidv4 } from 'uuid'
 import ConsoleLogger from '../utils/log.utils';
 import { WrappedMessage } from '../utils/message.ordering';
 import { AdapterBase } from '../base/adapter.base';
-import { FisMessage, GeneralEvent, Transport, TransportMessage, TransportServiceInterface } from '../interface/interface';
+import { ClientObject, ConnectionState, FisMessage, GeneralEvent, TransportType, TransportMessage, TransportServiceInterface } from '../interface/interface';
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
@@ -12,22 +12,22 @@ So how?: */
 export class ReceiverAdapter extends AdapterBase {
     private console!: ConsoleLogger
 
-    constructor(adapterId: string, adapterType: Transport, transportService: TransportServiceInterface) {
+    constructor(clientId: string, adapterType: TransportType, transportService: TransportServiceInterface) {
         super()
-        this.setAdapterProfile(adapterId, adapterType, 'Receiver')
         this.console = new ConsoleLogger(`${adapterType}ReceiverAdapter`, ['adapter'])
-        this.transportService = transportService
+        this.setAdapterProfile(clientId, adapterType, transportService, 'Receiver')
+        this.setupConnectionState(transportService)
 
-        this.console.log({ message: `Contructing ReceiverAdapter for client: ${adapterId}` })
+        this.console.log({ message: `Contructing ReceiverAdapter for clientId: ${clientId}` })
     }
 
     subscribeForIncoming(): Observable<GeneralEvent<any>> {
-        this.console.log({ message: `Connector getting message bus for this connector: ${this.adapterId}` })
+        this.console.log({ message: `Connector getting message bus for this connector: ${this.AdapterProfile.clientId}` })
         return new Observable((observable: Observer<GeneralEvent<any>>) => {
-            const subscription: Subscription = this.transportService.subscribeForEvent().pipe(
-                filter(event => event.type === `Transport Event`), 
+            const subscription: Subscription = this.AdapterProfile.transportService.subscribeForEvent().pipe(
+                filter(event => event.type === `Transport Event`),
+                filter((message: GeneralEvent<TransportMessage>) => (message.data as TransportMessage).self === this.AdapterProfile.clientId ),
                 filter((message: GeneralEvent<any>) => message.event === 'New Message'),
-                filter((message: GeneralEvent<TransportMessage>) => (message.data as TransportMessage).target == this.adapterId),
             ).subscribe((message: GeneralEvent<TransportMessage>) => {
                 this.console.log({ message: `Received ${(((message.data as TransportMessage).payload as WrappedMessage).payload as FisMessage).header.messageID} from ${((message.data as TransportMessage).target)}`, details: message })
                 observable.next({
@@ -35,7 +35,9 @@ export class ReceiverAdapter extends AdapterBase {
                     type: `Adapter Event`,
                     event: `New Message`,
                     date: new Date(),
-                    data: message.data
+                    data: message.data,
+                    transport: undefined,
+                    transportServiceId: null
                 } as GeneralEvent<TransportMessage>);
             });
 
@@ -47,6 +49,32 @@ export class ReceiverAdapter extends AdapterBase {
         });
     }
 
+    // generally not required but can be used for other indicative purposes
+    getConnectionState(): Observable<ConnectionState> {
+        return this.AdapterProfile.connectionState.asObservable()
+    }
+
+    // this is irrelevant at this point in time. Adapter will just listen regardless of whether there's connection or not
+    private setupConnectionState(transportService: TransportServiceInterface): void {
+        transportService.subscribeForEvent().pipe(
+            filter(event => event.type === `Transport Event`),
+            filter(event => (event.data as ClientObject).clientId === this.AdapterProfile.clientId),
+            map(event => {
+                if (event.event == 'Client Disconnected' || event.event == 'Server Disconnected') {
+                    return 'OFFLINE'
+                } else {
+                    return `ONLINE`
+                }
+            }),
+            distinctUntilChanged()
+        ).subscribe((signal: ConnectionState) => {
+            this.AdapterProfile.connectionState.next(signal)
+            if (signal == 'OFFLINE') this.console.error({ message: `${this.AdapterProfile.clientId} disconnected` })
+            if (signal == 'ONLINE') this.console.log({ message: `${this.AdapterProfile.clientId} connected and ready to go` })
+        })
+    }
+
+
 }
 
 

+ 35 - 16
src/adapters/adapter.transmitter.ts

@@ -1,9 +1,10 @@
 import dotenv from 'dotenv';
-import { Subject } from 'rxjs';
+import { BehaviorSubject, distinctUntilChanged, filter, map, Observable, Subject } from 'rxjs';
 import { WrappedMessage } from '../utils/message.ordering';
 import ConsoleLogger from '../utils/log.utils';
 import { AdapterBase } from '../base/adapter.base';
-import { FisMessage, TransmissionRole, Transport, TransportMessage, TransportServiceInterface } from '../interface/interface';
+import { ClientObject, ConnectionState, FisMessage, TransportType, TransportMessage, TransportServiceInterface } from '../interface/interface';
+import { fileURLToPath } from 'url';
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
@@ -11,30 +12,48 @@ So how?: */
 export class TransmitterAdapter extends AdapterBase {
     private console!: ConsoleLogger
 
-    constructor(adapterId: string, adapterType: Transport, transportService: TransportServiceInterface) {
+    constructor(clientId: string, adapterType: TransportType, transportService: TransportServiceInterface) {
         super()
-        this.setAdapterProfile(adapterId, adapterType, 'Transmitter')
         this.console = new ConsoleLogger(`${adapterType}TransmitterAdapter`, ['adapter'])
-        this.transportService = transportService
+        this.setAdapterProfile(clientId, adapterType, transportService, 'Transmitter')
+        this.setupConnectionState(transportService)
 
-        this.console.log({ message: `Contructing TransmitterAdapter for client: ${adapterId}` })
+
+        this.console.log({ message: `Contructing TransmitterAdapter for client: ${clientId}` })
     }
 
-    emit(message: WrappedMessage): void {
+    emit(selfId: string, message: WrappedMessage): void {
         // logic here
-        this.console.log({ message: `Emitting: ${(message.payload as FisMessage).header.messageID} to ${this.adapterId}` })
-        this.transportService.emit({
-            id: this.adapterId,
-            transport: this.transport,
-            target: this.adapterId, // this should be directed to the channel/client established. Make sure this is right
+        this.console.log({ message: `Emitting: ${(message.payload as FisMessage).header.messageID} to ${this.AdapterProfile.clientId}` })
+        this.AdapterProfile.transportService.emit({
+            id: this.AdapterProfile.clientId,
+            self: selfId,
+            target: this.AdapterProfile.clientId,
             payload: message
         } as TransportMessage)
     }
 
-    setAdapterProfile(id: string, adapterType: Transport, role: TransmissionRole): void {
-        this.adapterId = id
-        this.transport = adapterType
-        this.role = role
+    getConnectionState(): Observable<ConnectionState> {
+        return this.AdapterProfile.connectionState.asObservable()
+    }
+
+    private setupConnectionState(transportService: TransportServiceInterface): void {
+        transportService.subscribeForEvent().pipe(
+            filter(event => event.type === `Transport Event`),
+            filter(event => (event.data as ClientObject).clientId === this.AdapterProfile.clientId),
+            map(event => {
+                if (event.event == 'Client Disconnected' || event.event == 'Server Disconnected') {
+                    return 'OFFLINE'
+                } else {
+                    return `ONLINE`
+                }
+            }),
+            distinctUntilChanged()
+        ).subscribe((signal: ConnectionState) => {
+            this.AdapterProfile.connectionState.next(signal)
+            if (signal == 'OFFLINE') this.console.error({ message: `${this.AdapterProfile.clientId} disconnected` })
+            if (signal == 'ONLINE') this.console.log({ message: `${this.AdapterProfile.clientId} connected and ready to go` })
+        })
     }
 
 }

+ 30 - 10
src/base/adapter.base.ts

@@ -1,26 +1,46 @@
-import { Observable, Subject } from "rxjs";
+import { BehaviorSubject, Observable, Subject } from "rxjs";
 import dotenv from 'dotenv';
-import { AdapterInterface, TransmissionRole, Transport, TransportServiceInterface } from "../interface/interface";
+import { AdapterInterface, ConnectionState, TransmissionRole, TransportType, TransportServiceInterface, AdapterProfile } from "../interface/interface";
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 So how?: */
 export class AdapterBase implements AdapterInterface {
-    protected transportService!: TransportServiceInterface
-    adapterId!: string;
-    role!: TransmissionRole;
-    transport: Transport;
+    protected AdapterProfile!: AdapterProfile
+    // protected clientId!: string
+    // protected transportService!: TransportServiceInterface
+    // protected role!: TransmissionRole;
+    // protected connectionState!: Subject<ConnectionState>
+    // protected transport: TransportType;
 
     constructor() {
         //logic here
     }
 
-    setAdapterProfile(id: string, adapterType: Transport, role: TransmissionRole): void {
-        this.adapterId = id
-        this.transport = adapterType
-        this.role = role
+    setAdapterProfile(clientId: string, adapterType: TransportType, transportService: TransportServiceInterface, role: TransmissionRole): void {
+        this.AdapterProfile.clientId = clientId
+        this.AdapterProfile.connectionState = new BehaviorSubject<ConnectionState>(`OFFLINE`)
+        this.AdapterProfile.role = role
+        this.AdapterProfile.transportType = adapterType
+        this.AdapterProfile.transportService = transportService
     }
 
+    getAdapterProfile(type?: `clientId` | `role` | `transportId` | `transportType` | `connectionState`): string | BehaviorSubject<ConnectionState> | AdapterProfile | undefined {
+        if (!type) {
+            return this.AdapterProfile
+        } else if (type == `clientId`) {
+            return this.AdapterProfile.clientId
+        } else if (type == `connectionState`) {
+            return this.AdapterProfile.connectionState
+        } else if (type == `role`) {
+            return this.AdapterProfile.role
+        } else if (type == `transportType`) {
+            return this.AdapterProfile.transportType
+        } else if (type == `transportId`) {
+            return this.AdapterProfile.transportService.getInfo().transportServiceId
+        }
+
+    }
 }
 
 

+ 2 - 2
src/base/adapter.manager.base.ts

@@ -1,5 +1,5 @@
 import { Observable, Subject } from "rxjs"
-import { AdapterInterface, AdapterManagerInterface, GeneralEvent, TransportServiceInterface } from "../interface/interface"
+import { AdapterInterface, AdapterManagerInterface, GeneralEvent, TransmissionRole, TransportServiceInterface } from "../interface/interface"
 
 export class AdapterManagerBase implements AdapterManagerInterface {
     protected browserEnv!: boolean
@@ -11,7 +11,7 @@ export class AdapterManagerBase implements AdapterManagerInterface {
         // logic here
     }
 
-    subscribeForAdapters(): Observable<GeneralEvent<any>> {
+    subscribeForAdapters(selfId: string, receiverId: string, role: TransmissionRole): Observable<AdapterInterface> {
         throw new Error("Method not implemented.")
     }
 

+ 1 - 3
src/base/msg.transmission.base.ts

@@ -1,11 +1,9 @@
-import { Observable, Subject } from 'rxjs';
+import { Observable, Subject, Subscription } from 'rxjs';
 import { AdapterInterface,  GeneralEvent,  MessageTransmissionInterface } from '../interface/interface'
 
 export class MessageTransmissionBase implements MessageTransmissionInterface {
     protected clientId!: string;
     protected adapters: AdapterInterface[] = []
-    protected event!: Subject<GeneralEvent<any>>
-
     constructor() {
         // logic here
     }

+ 11 - 11
src/base/msg.transmission.manager.base.ts

@@ -1,30 +1,30 @@
 
 import { Observable, Observer, Subject, Unsubscribable } from 'rxjs';
-import { ClientObject, GeneralEvent, MessageTransmissionManagerInterface, TransmissionInterface } from '../interface/interface';
+import { ClientObject, GeneralEvent, MessageReceiverInterface, MessageRequestResponseInterface, MessageTransmissionInterface, MessageTransmissionManagerInterface, MessageTransmitterInterface } from '../interface/interface';
 import { ActorInterface, ActorProfile } from '../interface/actor.sample';
 import { ActorBase } from './actor.base';
 import { AdapterManager } from '../adapters/adapter.manager';
 
 export class MessageTransmissionManagerBase implements MessageTransmissionManagerInterface {
     protected browserEnv!: boolean
-    protected transmissionSet: TransmissionInterface[] = []
+    protected tranmissionRef: MessageTransmissionInterface[] = []
     protected adapterManager!: AdapterManager
     protected event!: Subject<GeneralEvent<any>>
 
     constructor() {
         // logic here
     }
-    subscribeForTransmission(): Observable<GeneralEvent<any>> {
-        throw new Error('Method not implemented.');
+    
+    getTransmitter(receiverId: string): MessageTransmitterInterface {
+        throw new Error(`Method not implemented`)
     }
 
-    protected instantiateTransmissionComponents(clientObj: ClientObject, event: Subject<GeneralEvent<any>>): TransmissionInterface {
+    getReceiver(transmitterid: string): MessageReceiverInterface {
         throw new Error(`Method not implemented`)
-        /* Subscribe to adapterEvent, adapter will give adapter with the relevant client information. 
-        It will first check array of transmission clinetID, so if new clientId is detected then a new 
-        transmission set will be instantiated. Then an observable instance for each transmission components
-        will be attacked to each of the transmission componetns, so that the respective transmission 
-        can handle more or less of the adapters instantiated by the adapter manager based on the new
-        adapters or it ohter cases, terminating of adaptesr as well. */
     }
+
+    getRequestResponse(clientId: string, transmitterInstance: MessageTransmitterInterface, receiverInstance: MessageReceiverInterface): MessageRequestResponseInterface {
+        throw new Error(`Method not implemented`)
+    }
+
 }

+ 0 - 0
src/interface/game.ts


+ 25 - 24
src/interface/interface.ts

@@ -8,16 +8,19 @@ export interface GeneralEvent<T> {
     event: EventMessage,
     date: Date,
     data: T
-    transport?: Transport,
+    transport: TransportType,
+    transportServiceId: string | null
 }
 
+
 /* MANAGEMENT */
 export interface MessageTransmissionManagerInterface {
-    subscribeForTransmission(): Observable<GeneralEvent<any>>
+    getTransmitter(receiverId: string): MessageTransmissionInterface
+    getReceiver(transmitterid: string): MessageReceiverInterface
 }
 
 export interface AdapterManagerInterface {
-    subscribeForAdapters(): Observable<GeneralEvent<any>>
+    subscribeForAdapters(selfId: string, receiverId: string, role: TransmissionRole): Observable<AdapterInterface>
 }
 
 
@@ -26,7 +29,7 @@ export interface MessageTransmissionInterface {
 }
 
 export interface MessageReceiverInterface extends MessageTransmissionInterface {
-    getIncoming(): Observable<any>
+    getReceivables(): Observable<any>
 }
 
 export interface MessageTransmitterInterface extends MessageTransmissionInterface {
@@ -39,9 +42,8 @@ export interface MessageRequestResponseInterface extends MessageTransmissionInte
 
 /* ADAPTER COMPONENTS */
 export interface AdapterInterface {
-    adapterId: string,
-    role: TransmissionRole,
-    transport: Transport
+    setAdapterProfile(clientId: string, adapterType: TransportType, transportService: TransportServiceInterface, role: TransmissionRole): void
+    getAdapterProfile(type?: `clientId` | `role` | `transportId` | `transportType` | `connectionState`): AdapterProfile | string | BehaviorSubject<ConnectionState> | undefined
 }
 
 export interface TransmitterAdapterInterface extends AdapterInterface {
@@ -72,8 +74,9 @@ export interface RequestResponseProfile extends TransmissionProfile {
 export interface TransportMessage {
     id: string,
     dateCreated: Date,
-    transport: Transport,
-    target?: string,
+    transport: TransportType,
+    self: string,
+    target: string,
     payload: any
 }
 
@@ -85,15 +88,8 @@ export interface FisMessage {
     data: any
 }
 
-export interface TransmissionInterface {
-    clientId: string,
-    transmitter: MessageTransmitterInterface,
-    receiver: MessageReceiverInterface,
-    requestResponse: MessageRequestResponseInterface,
-    clientInfo: ClientObject
-}
 
-export type Transport = 'Websocket' | 'Http' | 'TCP' | undefined
+export type TransportType = 'Websocket' | 'Http' | 'TCP' | undefined
 
 export enum AdapterTransmissionRole {
     Transmitter,
@@ -104,7 +100,7 @@ export enum AdapterTransmissionRole {
 
 export type EventType = `General Event` | 'Transport Event' | 'Transmission Event' | 'Adapter Event'
 export type TransmissionRole = `Transmitter` | 'Receiver' | 'RequestResponse'
-export type EventMessage = 'Server Started' | 'New Client' | 'Client Connected' | 'Client Re-connected' | 'Client Disconnected' | `Server Disconnected` | 'New Message' | `Notification` | `New Server` | `Server Connected` | `New Transport` | 'New Adapter' | 'Re-Flush' | 'New Transport' | 'New Transmission' | 'Adapter Manager Started' | 'Available Transport'
+export type EventMessage = 'Server Started' | 'New Client' | 'Client Connected' | 'Client Re-connected' | 'Client Disconnected' | `Server Disconnected` | 'New Message' | `Notification` | `New Server` | `Server Connected` | `New Transport` | 'New Adapter' | 'Re-Flush' | 'New Transport' | 'New Transmission' | 'Adapter Manager Started' | 'Available Transport' | 'Pushing Adapter'
 export type ConnectionState = 'ONLINE' | 'OFFLINE'
 
 export interface TransportServiceInterface {
@@ -115,22 +111,27 @@ export interface TransportServiceInterface {
 
 export interface TransportServiceProfile {
     transportServiceId: string,
-    transport: Transport
+    transport: TransportType
 }
 
 export interface Info {
-    transport: Transport
+    transport: TransportType
 }
 
 export interface ClientObject {
     clientId: string,
     dateCreated: Date,
-    connectionState: BehaviorSubject<ConnectionState>,
-    transport: Transport,
-    transportServiceId: string 
 }
 
 export interface TransportSet {
-    transport: Transport,
+    transport: TransportType,
     port: number
+}
+
+export interface AdapterProfile {
+    clientId: string,
+    role: TransmissionRole,
+    transportType: TransportType,
+    transportService: TransportServiceInterface,
+    connectionState: BehaviorSubject<ConnectionState>
 }

+ 6 - 6
src/test/proxy.ts

@@ -6,8 +6,8 @@ import express, { Response } from 'express';
 import { Express } from 'express';
 import { postAxiosRequest } from '../utils/http.utils';
 import axios from 'axios';
-let fromServer = new Subject<{ event: 'profile' | 'message', payload: any }>()
-let toServer = new Subject<{ event: 'profile' | 'message', payload: any }>()
+let fromServer = new Subject<{ event: 'handshaking' | 'message', payload: any }>()
+let toServer = new Subject<{ event: 'handshaking' | 'message', payload: any }>()
 
 startSocketServer(3001)
 // startSocketServer(3002)
@@ -28,8 +28,8 @@ function startSocketServer(port: number): void {
     let socketServer = new Server(httpServer)
 
     socketServer.on('connection', (socket: SocketForConnectedClient) => {
-        socket.on(`profile`, (msg) => {
-            toServer.next({ event: 'profile', payload: msg })
+        socket.on(`handshaking`, (msg) => {
+            toServer.next({ event: 'handshaking', payload: msg })
         })
 
         socket.on('message', (msg) => {
@@ -76,8 +76,8 @@ function startClientSocketConnection(serverUrl: string): void {
 
     })
 
-    clientSocket.on(`profile`, (msg) => {
-        fromServer.next({ event: 'profile', payload: msg })
+    clientSocket.on(`handshaking`, (msg) => {
+        fromServer.next({ event: 'handshaking', payload: msg })
     })
 
     clientSocket.on(`message`, (msg) => {

+ 84 - 71
src/test/receiver.ts

@@ -4,17 +4,22 @@ import config from '../config/config.json';
 import { MessageTransmissionManager } from "../transmission/msg.transmission.manager";
 import { WrappedMessage } from "../utils/message.ordering";
 import ConsoleLogger from "../utils/log.utils";
-import { FisMessage, GeneralEvent, TransmissionInterface, Transport, TransportMessage, TransportServiceInterface, TransportSet } from "../interface/interface";
+import { FisMessage, GeneralEvent, Transport, TransportMessage, TransportServiceInterface, TransportSet } from "../interface/interface";
 import { WebsocketTransportService } from "../transport/websocket";
 import { HttpTransportService } from "../transport/http";
+import clientProfile from '../../clients/serverprofile.json'
+import serverProfile from '../../clients/serverprofile.json'
+import { MessageTransmissionTransmitter } from "../transmission/msg.transmission.transmitter";
+import { MessageTransmissionReceiver } from "../transmission/msg.transmission.receiver";
+import { MessageTransmissionRequestResponse } from "../transmission/msg.transmission.request-response";
 
+/* This is Emulating UI perspective. Using just browser environment, and not using any dotenv. */
 class Supervisor {
     private generalBus: Subject<GeneralEvent<any>> = new Subject()
     private console = new ConsoleLogger('Supervisor', ['base'])
     private isClient: boolean = true
     private transmissionManager!: MessageTransmissionManager
     private event: Subject<GeneralEvent<any>>
-    private transmissionSets: TransmissionInterface[] = []
     private outgoingPipe: Subject<any> = new Subject()
     private transportServiceArray: TransportServiceInterface[] = []
     private transportSet: TransportSet[] = []
@@ -34,71 +39,56 @@ class Supervisor {
     }
 
     private startMessageTransmission(): void {
-        // every new remote client connected, a new transmission object will be instantiated to allow message transmission
-        this.transmissionManager.subscribeForTransmission().pipe(
-            filter(event => event.type === `Transmission Event`),
-            filter(event => event.event === `New Transmission`)
-        ).subscribe((event: GeneralEvent<TransmissionInterface>) => {
-            // broadcast to indicate a transmission object is ready, this signal will be received by adapter manager to instantiate the releavnt adapters to be used
-            this.event.next({
-                id: uuidv4(),
-                type: `General Event`,
-                event: `New Transmission`,
-                date: new Date(),
-                data: event.data
-            })
-            let transmission: TransmissionInterface = event.data
-            this.console.log({ message: `Acquired transmission set for client ${transmission.clientId}` })
-            // updating transmission records. No logic for keeping track of client state, as that will be handled exclusively by tranmistter transmission at the moment
-            this.transmissionSets.push(transmission)
+        let transmitter: MessageTransmissionTransmitter = this.transmissionManager.getTransmitter(clientProfile.clientId) as MessageTransmissionTransmitter
+        let receiver: MessageTransmissionReceiver = this.transmissionManager.getReceiver(clientProfile.clientId) as MessageTransmissionReceiver
+        let requestResponse: MessageTransmissionRequestResponse = this.transmissionManager.getRequestResponse(clientProfile.id, transmitter, receiver)
 
-            this.handleClientActivity(transmission)
-        })
-    }
-
-    private handleClientActivity(messageTransmission: TransmissionInterface): void {
-        // start listening to incoming messages from this client
-        messageTransmission.receiver.getIncoming().subscribe((event: GeneralEvent<any>) => {
-            this.console.log({ message: `General Bus ${event.event} ${(((event.data as TransportMessage)?.payload as WrappedMessage)?.payload as FisMessage)?.header?.messageID ?? 'Not Message'}`, details: event })
-            this.generalBus.next(event)
-        })
+        // emit Message only
+        // this.emitMessage(transmitter, this.generateNotifcation())
 
-        // for all the responses or messages to be emitted on the provider perspective
-        this.outgoingPipe.subscribe((message: FisMessage) => {
-            messageTransmission.transmitter.emit(message)
-        })
+        // receive Message only
+        this.streamMessage(receiver)
 
-        // test sample to simulate request response, but not using request response transmission
-        let request: FisMessage = {
-            header: {
-                messageID: uuidv4(),
-                messageName: 'RequestMessage'
-            },
-            data: 'Data'
-        }
-        // this.request(request, messageTransmission).subscribe({
-        //     next: res => this.console.log({ message: `received ${res.header.messageID}`, details: res }),
-        //     complete: () => this.console.log({ message: `Responses Completed for request: ${request.header.messageID}` })
+        // request-response emulation
+        // this.sendMessage(requestResponse, { header: { messageID: `123`, messageName: `RequestMessage` }, data: `Test Data` } as FisMessage).subscribe({
+        //     next: message => this.console.log({ message: `Received response message for request ${(message as FisMessage).header.messageID}` }),
+        //     error: error => this.console.error({ message: `Something went wrong`, details: error }),
+        //     complete: () => this.console.log({ message: `Request completed` })
         // })
+    }
 
-        // test sample to stream new messages every second
-        // this.startGeneratingRequest(1000, this.outgoingPipe)
+
+    private emitMessage(transmitter: MessageTransmissionTransmitter, source: Subject<any> | Observable<any>): void {
+        source.subscribe(message => transmitter.emit(message))
     }
 
-    private request(request: FisMessage, messageTransmission: TransmissionInterface): Observable<any> {
+    private sendMessage(requestResponse: MessageTransmissionRequestResponse, message: any): Observable<any> {
         return new Observable((response: Observer<any>) => {
-            messageTransmission.transmitter.emit(request)
-            const subscription: Subscription = this.generalBus.pipe(
-                filter(event => event.event == 'New Message'),
-                filter(event => (((event.data as TransportMessage)?.payload as WrappedMessage)?.payload as FisMessage)?.header.messageID === request.header.messageID),
-                map(event => (((event.data as TransportMessage)?.payload as WrappedMessage)?.payload as FisMessage))
-            ).subscribe(message => {
-                if (message.data == 'Complete') {
+            requestResponse.send(message).subscribe({
+                next: respMessage => {
+                    response.next(respMessage)
+                },
+                error: error => {
+                    this.console.error({ message: error })
+                    response.error(error)
+                },
+                complete: () => {
+                    this.console.log({ message: `Response Completed for message: ${message.id ?? 'undefined message'}` })
                     response.complete()
-                } else {
-                    response.next(message)
                 }
             })
+        })
+    }
+
+    private streamMessage(receiverInstance: MessageTransmissionReceiver): Observable<any> {
+        return new Observable((response: Observer<any>) => {
+            const subscription: Subscription = receiverInstance.getReceivables().subscribe({
+                next: message => {
+                    response.next(message)
+                },
+                error: error => response.error(error),
+                complete: () => response.complete()
+            })
 
             // Clean up on unsubscription
             return () => {
@@ -107,20 +97,6 @@ class Supervisor {
         })
     }
 
-
-    private startGeneratingRequest(intervalDuration: number, requestsPipe: Subject<FisMessage>) {
-        interval(intervalDuration).subscribe(time => {
-            let message: FisMessage = {
-                header: {
-                    messageID: uuidv4(),
-                    messageName: 'NotificationMessage'
-                },
-                data: 'Data'
-            }
-            requestsPipe.next(message)
-        })
-    }
-
     private setUpTransportService(transportSet: TransportSet, event: Subject<GeneralEvent<any>>, isClient?: boolean): void {
         try {
             let transportService: TransportServiceInterface = this.instantiateTransportService(transportSet.transport, event)
@@ -179,11 +155,48 @@ class Supervisor {
                 type: `General Event`,
                 event: `Available Transport`,
                 date: new Date(),
-                data: this.transportServiceArray
+                data: this.transportServiceArray,
+                transport: undefined,
+                transportServiceId: null
             })
         })
     }
 
+    private generateNotifcation(): Observable<FisMessage> {
+        return new Observable((response: Observer<FisMessage>) => {
+            const intervalMessageGeneration = interval(1000).pipe(
+                map(() => {
+                    const message: FisMessage = {
+                        header: {
+                            messageID: uuidv4(),
+                            messageName: 'NotificationMessage'
+                        },
+                        data: `Data`
+                    };
+                    return message;
+                })
+            );
+
+            const subscription = intervalMessageGeneration.subscribe({
+                next: message => response.next(message),
+                error: error => response.error(error),
+                complete: () => {
+                    response.next({
+                        header: {
+                            messageID: uuidv4(),
+                            messageName: 'NotificationMessage'
+                        },
+                        data: `Complete`
+                    });
+                    response.complete();
+                }
+            });
+
+            // Ensure cleanup on unsubscribe
+            return () => subscription.unsubscribe();
+        });
+    }
+
 }
 
 let supervisor = new Supervisor()

+ 62 - 37
src/test/transmitter.ts

@@ -3,18 +3,23 @@ import { v4 as uuidv4 } from 'uuid'
 import { MessageTransmissionManager } from "../transmission/msg.transmission.manager";
 import { WrappedMessage } from "../utils/message.ordering";
 import ConsoleLogger from "../utils/log.utils";
-import { FisMessage, GeneralEvent, TransmissionInterface, Transport, TransportMessage, TransportServiceInterface, TransportSet } from "../interface/interface";
+import { FisMessage, GeneralEvent, MessageTransmissionInterface, Transport, TransportMessage, TransportServiceInterface, TransportSet } from "../interface/interface";
 import config from '../config/config.json';
 import { startSocketServer } from "../utils/socket.utils";
 import { WebsocketTransportService } from "../transport/websocket";
 import { HttpTransportService } from "../transport/http";
+import clientProfile from '../../clients/serverprofile.json'
+import serverProfile from '../../clients/serverprofile.json'
+import { MessageTransmissionReceiver } from "../transmission/msg.transmission.receiver";
+import { MessageTransmissionTransmitter } from "../transmission/msg.transmission.transmitter";
+import { MessageTransmissionRequestResponse } from "../transmission/msg.transmission.request-response";
+import { error } from "console";
 class Supervisor {
     private console = new ConsoleLogger('Supervisor', ['base'])
     private clientIncomingMessage: Subject<FisMessage> = new Subject()
     private messageProducer!: MessageProducer
     private transmissionManager!: MessageTransmissionManager
     private event!: Subject<GeneralEvent<any>>
-    private transmissionSets: TransmissionInterface[] = []
     private transportSet: TransportSet[] = []
     private transportServiceArray: TransportServiceInterface[] = []
 
@@ -31,49 +36,67 @@ class Supervisor {
         this.transmissionManager = new MessageTransmissionManager(this.event)
         this.startMessageTransmission()
 
-        // testing
-        // this.event.subscribe(event => {
-        //     this.console.log({ message: `Supervisor Event: ${event.type} && ${event.event}` })
-        // })
     }
 
     private startMessageTransmission(): void {
-        this.transmissionManager.subscribeForTransmission().pipe(
-            filter(event => event.type === `Transmission Event`),
-            filter(event => event.event == `New Transmission`)
-        ).subscribe((event: GeneralEvent<TransmissionInterface>) => {
-            this.event.next({
-                id: uuidv4(),
-                type: `General Event`,
-                event: `New Transmission`,
-                date: new Date(),
-                data: event.data
-            })
-            let transmission: TransmissionInterface = event.data
-            this.console.log({ message: `Acquired transmission set for client ${transmission.clientId}` })
-            // update transmission record on every new transmission object instantiated
-            this.transmissionSets.push(transmission)
-            this.handleClientActivity(transmission)
+        let transmitter: MessageTransmissionTransmitter = this.transmissionManager.getTransmitter(serverProfile.clientId) as MessageTransmissionTransmitter
+        let receiver: MessageTransmissionReceiver = this.transmissionManager.getReceiver(serverProfile.clientId) as MessageTransmissionReceiver
+        let requestResponse: MessageTransmissionRequestResponse = this.transmissionManager.getRequestResponse(serverProfile.id, transmitter, receiver)
+
+        // emit Message only
+        this.emitMessage(transmitter, this.messageProducer.getNotificationMessage())
+        // this.emitMessage(transmitter, this.messageProducer.getOutgoingMessages())
+
+        // receive Message only
+        this.streamMessage(receiver).subscribe(message => {
+            this.console.log({ message: message.header.messageID })
+            // this.clientIncomingMessage.next(message)
+        })
+
+        // request-response emulation
+        this.sendMessage(requestResponse, { header: { messageID: `123`, messageName: `RequestMessage` }, data: `Test Data` } as FisMessage).subscribe({
+            next: message => this.console.log({ message: `Received response message for request ${(message as FisMessage).header.messageID}` }),
+            error: error => this.console.error({ message: `Something went wrong`, details: error }),
+            complete: () => this.console.log({ message: `Request completed` })
         })
     }
 
-    // only called once for each connected clients.
-    private handleClientActivity(messageTransmission: TransmissionInterface): void {
-        // start listening to incoming messages from this client
-        messageTransmission.receiver.getIncoming().subscribe((event: GeneralEvent<TransportMessage>) => {
-            let requestMessage: FisMessage = ((event.data as TransportMessage).payload as WrappedMessage).payload as FisMessage
-            this.console.log({ message: `General Bus ${requestMessage?.header?.messageID ?? 'Not a message'}`, details: event }) // receiving end
-            // this.clientIncomingMessage.next(requestMessage)
-            this.messageProducer.getOutgoingMessages().pipe(
-                filter(message => message.header.messageID === requestMessage.header.messageID)
-            ).subscribe(message => {
-                messageTransmission.transmitter.emit(message)
+    private emitMessage(transmitter: MessageTransmissionTransmitter, source: Subject<any> | Observable<any>): void {
+        source.subscribe(message => transmitter.emit(message))
+    }
+
+    private sendMessage(requestResponse: MessageTransmissionRequestResponse, message: any): Observable<any> {
+        return new Observable((response: Observer<any>) => {
+            requestResponse.send(message).subscribe({
+                next: respMessage => {
+                    response.next(respMessage)
+                },
+                error: error => {
+                    this.console.error({ message: error })
+                    response.error(error)
+                },
+                complete: () => {
+                    this.console.log({ message: `Response Completed for message: ${message.id ?? 'undefined message'}` })
+                    response.complete()
+                }
             })
         })
+    }
+
+    private streamMessage(receiverInstance: MessageTransmissionReceiver): Observable<any> {
+        return new Observable((response: Observer<any>) => {
+            const subscription: Subscription = receiverInstance.getReceivables().subscribe({
+                next: message => {
+                    response.next(message)
+                },
+                error: error => response.error(error),
+                complete: () => response.complete()
+            })
 
-        // to emulate general notification. Send every second
-        this.messageProducer.getNotificationMessage().subscribe((message: FisMessage) => {
-            messageTransmission.transmitter.emit(message)
+            // Clean up on unsubscription
+            return () => {
+                subscription.unsubscribe();
+            };
         })
     }
 
@@ -134,7 +157,9 @@ class Supervisor {
                 type: `General Event`,
                 event: `Available Transport`,
                 date: new Date(),
-                data: this.transportServiceArray
+                data: this.transportServiceArray,
+                transport: undefined,
+                transportServiceId: null
             })
         })
     }

+ 14 - 44
src/transmission/msg.transmission.manager.ts

@@ -6,7 +6,7 @@ import { MessageTransmissionRequestResponse } from "./msg.transmission.request-r
 import { filter, Observable, Observer, Subject, Subscription, Unsubscribable } from "rxjs";
 import ConsoleLogger from "../utils/log.utils";
 import { MessageTransmissionManagerBase } from "../base/msg.transmission.manager.base";
-import { AdapterInterface, ClientObject, GeneralEvent, TransmissionInterface } from "../interface/interface";
+import { GeneralEvent, MessageReceiverInterface, MessageTransmitterInterface } from "../interface/interface";
 
 export class MessageTransmissionManager extends MessageTransmissionManagerBase {
     private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionManager`, ['managers'])
@@ -16,55 +16,25 @@ export class MessageTransmissionManager extends MessageTransmissionManagerBase {
         if (browserEnv) this.browserEnv = browserEnv
         this.console.log({ message: `Contructing self... ${this.browserEnv ? `is receiving end` : `is not browser env`}` })
         this.event = event
-        // Instantiate and subscribe for adapterEvent
+        // Instantiate adapter manager
         this.adapterManager = new AdapterManager(event, this.browserEnv)
-        this.adapterManager.subscribeForAdapters().subscribe((adapterEvent: GeneralEvent<AdapterInterface>) => {
-            this.event.next(adapterEvent)
-        })
     }
 
-    public subscribeForTransmission(): Observable<GeneralEvent<TransmissionInterface>> {
-        return new Observable((observer: Observer<GeneralEvent<TransmissionInterface>>) => {
-            const subscription: Subscription = this.event.pipe(
-                filter(event => event.type === `Transport Event`),
-                filter(event => event.event === `New Client` || event.event === `New Server`)
-            ).subscribe((event: GeneralEvent<ClientObject>) => {
-                // get all adapters for all the connection
-                let transmission: TransmissionInterface | undefined = this.instantiateTransmissionComponents(event.data, this.event)
-                this.console.log({ message: `Acknowledged new client: ${transmission.clientId}. Instantiated Transmission Components.` })
-                if (transmission) {
-                    // reason for this, is because the subscribe for adapter Events is not pushed into the global event
-                    // This is needed also for relevant supervisitory components to be notified that the ransmission for the particular client is ready to go
-                    observer.next({
-                        id: uuidv4(),
-                        type: `Transmission Event`,
-                        event: 'New Transmission',
-                        date: new Date(),
-                        data: transmission
-                    })
-                }
-            })
-
-            // Clean up on unsubscription
-            return () => {
-                subscription.unsubscribe();
-            };
-        })
+    // for now these fuctions ain't pure, cuz messageTransmissionManager needs to keep a record of these tranmission instances as well for safe keeping.
+    public getTransmitter(receiverId: string): MessageTransmitterInterface {
+        let transmitterInstance = new MessageTransmissionTransmitter(receiverId, this.adapterManager)
+        this.tranmissionRef.push(transmitterInstance)
+        return transmitterInstance
     }
 
-    protected instantiateTransmissionComponents(clientObj: ClientObject, eventRef: Subject<GeneralEvent<any>>): TransmissionInterface {
-        let receiverInstance: MessageTransmissionReceiver = new MessageTransmissionReceiver(clientObj.clientId, eventRef)
-        let transmitterInstance: MessageTransmissionTransmitter = new MessageTransmissionTransmitter(clientObj.clientId, eventRef)
-        let requestResponseInstance: MessageTransmissionRequestResponse = new MessageTransmissionRequestResponse(clientObj.clientId, transmitterInstance, receiverInstance, eventRef)
-        let transmissionObj: TransmissionInterface = {
-            clientId: clientObj.clientId,
-            transmitter: transmitterInstance,
-            receiver: receiverInstance,
-            requestResponse: requestResponseInstance,
-            clientInfo: clientObj
-        }
+    public getReceiver(transmitterid: string): MessageReceiverInterface {
+        let receiverInstance = new MessageTransmissionReceiver(transmitterid, this.adapterManager)
+        this.tranmissionRef.push(receiverInstance)
+        return receiverInstance
+    }
 
-        return transmissionObj
+    public getRequestResponse(clientId: string, transmitterInstance: MessageTransmitterInterface, receiverInstance: MessageReceiverInterface): MessageTransmissionRequestResponse {
+        return new MessageTransmissionRequestResponse(clientId, transmitterInstance, receiverInstance)
     }
 
 }

+ 28 - 28
src/transmission/msg.transmission.receiver.ts

@@ -1,29 +1,30 @@
-import { filter, map, Observable, Observer, Subject, Subscription } from 'rxjs';
+import { BehaviorSubject, filter, map, Observable, Observer, Subject, Subscription } from 'rxjs';
 import { v4 as uuidv4 } from 'uuid'
 import { ReceiverAdapter } from '../adapters/adapter.receiver';
 import { checkMessage, WrappedMessage } from '../utils/message.ordering';
 import ConsoleLogger from '../utils/log.utils';
 import { MessageTransmissionBase } from '../base/msg.transmission.base';
-import { AdapterInterface, GeneralEvent, MessageReceiverInterface, ReceiverAdapterInterface, TransportMessage } from '../interface/interface';
+import { AdapterInterface, AdapterManagerInterface, ConnectionState, GeneralEvent, MessageReceiverInterface, ReceiverAdapterInterface, TransportMessage } from '../interface/interface';
+import { error } from 'console';
 
 export class MessageTransmissionReceiver extends MessageTransmissionBase implements MessageReceiverInterface {
+    private connectionStateEvent: BehaviorSubject<ConnectionState> = new BehaviorSubject<ConnectionState>(`OFFLINE`)
     private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionReceiver`, ['transmission'])
     private onHoldMessage: Subject<WrappedMessage> = new Subject()
     private currentAdapter!: ReceiverAdapterInterface
     private incomingMessage: Subject<GeneralEvent<TransportMessage>> = new Subject()
     // private toBePassedOver: Subject<WrappedMessage> = new Subject()
 
-    constructor(clientId: string, event: Subject<GeneralEvent<any>>) {
+    constructor(clientId: string, adapterManager: AdapterManagerInterface) {
         super()
         this.clientId = clientId
-        this.event = event
 
-        this.handleAdapters(this.event.asObservable())
+        this.initializeReceiverComponents(adapterManager)
     }
 
-    public getIncoming(): Observable<GeneralEvent<TransportMessage>> {
-        this.console.log({ message: `Transmission getting message bus for ${this.clientId}` })
-        return new Observable((observable: Observer<GeneralEvent<any>>) => {
+    public getReceivables(): Observable<GeneralEvent<TransportMessage>> {
+        return new Observable((receivable: Observer<GeneralEvent<TransportMessage>>) => {
+            this.console.log({ message: `Transmission getting message bus for ${this.clientId}` })
             const subscription: Subscription = this.incomingMessage.pipe(
                 filter((event: GeneralEvent<any>) => event.event == 'New Message'),
             ).subscribe((event: GeneralEvent<TransportMessage>) => {
@@ -33,7 +34,7 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
                     // only release the message before it exists
                     this.console.log({ message: `This one passes. Does have previousID. Case for message ordering` })
                     // console.log(((event.data as TransportMessage).payload as WrappedMessage))
-                    observable.next(event);
+                    receivable.next(event);
                 }).catch((error) => {
                     this.console.log({ message: `Observer Error`, details: error })
                 })
@@ -44,28 +45,27 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
                 subscription.unsubscribe();
             };
         })
+
     }
 
     /* Assigned and update adapters record. Currently no logic to swtich adapters based on performance or whatever logic to be integrated in the future */
-    protected handleAdapters(adapterEvent: Observable<GeneralEvent<any>>): void {
-        adapterEvent.pipe(
-            filter(event => event.type === `Adapter Event`),
-            filter(event => event.event === `New Adapter`),
-            map(event => { return event.data as AdapterInterface }),
-            filter((adapter: AdapterInterface) => adapter.role === `Receiver`),
-            map(adapter => { return adapter as ReceiverAdapter })
-        ).subscribe({
-            next: (adapter: ReceiverAdapterInterface) => {
-                if (!this.adapters.some(adapterObj => adapterObj.adapterId === adapter.adapterId)) {
-                    this.adapters.push(adapter)
-                    this.currentAdapter = adapter
-                    this.console.log({ message: `Setting Current adapter = ${this.currentAdapter.adapterId}` })
-                    this.currentAdapter.subscribeForIncoming().subscribe(this.incomingMessage)
-                } else {
-                    this.console.error({ message: `Adapter ID: ${adapter.adapterId} already existed.` })
-                }
-            },
-            error: error => this.console.error({ message: 'Observer Error', details: error })
+    private initializeReceiverComponents(adapterManager: AdapterManagerInterface): void {
+        adapterManager.subscribeForAdapters(this.clientId, `Receiver`).pipe(
+        ).subscribe((adapter: AdapterInterface) => {
+            this.console.log({ message: `Adding new ${adapter.transport} receiving adapter. Current adapter length: ${this.adapters.length}` })
+            this.adapters.push(adapter)
+            if (!this.currentAdapter) {
+                this.console.log({ message: `Setting this ${adapter.transport} as current adapter.` })
+                this.currentAdapter = adapter as ReceiverAdapterInterface
+                this.currentAdapter.connectionState.subscribe(this.connectionStateEvent)
+            } else {
+                this.currentAdapter.subscribeForIncoming().subscribe({
+                    next: (message: GeneralEvent<TransportMessage>) => this.incomingMessage.next(message),
+                    error: error => {
+                        // Error handling. Idealling switching to other adapters
+                    }
+                })
+            }
         })
     }
 

+ 5 - 6
src/transmission/msg.transmission.request-response.ts

@@ -3,19 +3,18 @@ import { filter, map, Observable, Observer, Subject, Subscription, takeWhile } f
 import { v4 as uuidv4 } from 'uuid'
 import { MessageTransmissionReceiver } from "./msg.transmission.receiver";
 import { MessageTransmissionTransmitter } from "./msg.transmission.transmitter";
-import { AdapterInterface, FisMessage, GeneralEvent, MessageRequestResponseInterface, TransportMessage } from "../interface/interface";
+import { AdapterInterface, FisMessage, GeneralEvent, MessageReceiverInterface, MessageRequestResponseInterface, MessageTransmitterInterface, TransportMessage } from "../interface/interface";
 import { WrappedMessage } from "../utils/message.ordering";
 
 export class MessageTransmissionRequestResponse extends MessageTransmissionBase implements MessageRequestResponseInterface {
-    transmitterInstance!: MessageTransmissionTransmitter;
-    receiverInstance!: MessageTransmissionReceiver;
+    transmitterInstance!: MessageTransmitterInterface;
+    receiverInstance!: MessageReceiverInterface;
 
-    constructor(clientId: string, transmitterInstance: MessageTransmissionTransmitter, receiverInstance: MessageTransmissionReceiver, event: Subject<GeneralEvent<any>>) {
+    constructor(clientId: string, transmitterInstance: MessageTransmitterInterface, receiverInstance: MessageReceiverInterface) {
         super()
         this.clientId = clientId
         this.transmitterInstance = transmitterInstance
         this.receiverInstance = receiverInstance
-        this.event = event
     }
 
     send(message: FisMessage): Observable<FisMessage> {
@@ -23,7 +22,7 @@ export class MessageTransmissionRequestResponse extends MessageTransmissionBase
             // logic here
             if (this.transmitterInstance && this.receiverInstance) {
                 this.transmitterInstance.emit(message)
-                const subscription: Subscription = this.receiverInstance.getIncoming().pipe(
+                const subscription: Subscription = this.receiverInstance.getReceivables().pipe(
                     filter(event => event.event === `New Message`),
                     filter(event => (((event.data as TransportMessage)?.payload as WrappedMessage)?.payload as FisMessage)?.header.messageID === message.header.messageID),
                     map(event => {

+ 15 - 44
src/transmission/msg.transmission.transmitter.ts

@@ -4,8 +4,9 @@ import { BehaviorSubject, distinctUntilChanged, filter, map, Observable, Subject
 import { RetransmissionService } from "../utils/retransmission.service";
 import { WrappedMessage } from "../utils/message.ordering";
 import ConsoleLogger from "../utils/log.utils";
-import { AdapterInterface, ConnectionState, FisMessage, GeneralEvent, MessageTransmitterInterface, TransmitterAdapterInterface, TransportMessage } from "../interface/interface";
+import { AdapterInterface, AdapterManagerInterface, ConnectionState, FisMessage, GeneralEvent, MessageTransmitterInterface, TransmitterAdapterInterface, TransportMessage } from "../interface/interface";
 import { error } from "console";
+import { TransmitterAdapter } from "../adapters/adapter.transmitter";
 
 /* Take in all the messages that needs to be transported, and divide them accordingly. So the connector instances will do just that
 connectors or adapters will have their own identifier*/
@@ -16,15 +17,13 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
     private buffer!: RetransmissionService;
     private currentAdapter!: TransmitterAdapterInterface
 
-    constructor(clientId: string, event: Subject<GeneralEvent<any>>) {
+    constructor(clientId: string, adapterManager: AdapterManagerInterface) {
         super()
         this.console.log({ message: `Constructing Transmitter Transmission with ${clientId}` })
         this.clientId = clientId
-        this.event = event
         this.messageToBeBuffered = new Subject()
         this.buffer = new RetransmissionService()
-        this.handleAdapters(this.event.asObservable())
-        this.setupBuffer()
+        this.initializeTransmitterComponents(adapterManager)
     }
 
     public emit(message: FisMessage): void {
@@ -34,25 +33,19 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
 
     /* After setting up, will listen specifically to the connection state of this particular remote client. So that, the buffer signal can be
     established to allow the buffer to do their thing. */
-    private setupBuffer(): void {
+    private initializeTransmitterComponents(adapterManager: AdapterManagerInterface): void {
         this.console.log({ message: `Setting up Retransmission Service...` })
-        this.event.pipe(
-            filter(event => event.type === `Transport Event`),
-            filter(event => event.event == 'Client Disconnected' || event.event == 'Client Re-connected' || event.event == 'Client Connected' || event.event == 'Server Disconnected' || event.event == 'Server Connected'),
-            filter(event => event.data.clientId == this.clientId),
-            map(event => {
-                if (event.event == 'Client Disconnected' || event.event == 'Server Disconnected') {
-                    return 'OFFLINE'
-                } else {
-                    return `ONLINE`
-                }
-            }),
-            distinctUntilChanged()
-        ).subscribe((signal: ConnectionState) => {
-            this.connectionStateEvent.next(signal)
-            if (signal == 'OFFLINE') this.console.error({ message: `${this.clientId} disconnected` })
-            if (signal == 'ONLINE') this.console.log({ message: `${this.clientId} connected and ready to go` })
+        // Listen and update adapters
+        adapterManager.subscribeForAdapters(this.clientId, `Transmitter`).subscribe((adapter: AdapterInterface) => {
+            this.console.log({ message: `Adding new ${adapter.getAdapterProfile(`transportType`)} transmitting adapter. Current adapter length: ${this.adapters.length}` })
+            this.adapters.push(adapter)
+            if (!this.currentAdapter) {
+                this.console.log({ message: `Setting this ${adapter.getAdapterProfile(`transportType`)} as current adapter.` })
+                this.currentAdapter = adapter as TransmitterAdapterInterface
+                this.currentAdapter.connectionState.subscribe(this.connectionStateEvent)
+            }
         })
+
         this.buffer.implementRetransmission(this.messageToBeBuffered, this.connectionStateEvent.asObservable(), true)
         // automatically subscribe to allow released bffered messages to be released
         this.buffer.returnSubjectForBufferedItems().subscribe((bufferedMessage: WrappedMessage) => {
@@ -68,28 +61,6 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
         })
     }
 
-    // hardcode it to use the first adapter added for now. Haven't decide on the logic to dynamically switch them adapters
-    protected handleAdapters(adaptersEvent: Observable<GeneralEvent<any>>): void {
-        adaptersEvent.pipe(
-            filter(event => event.type === `Adapter Event`),
-            filter(event => event.event === `New Adapter`),
-            map(event => { return event.data as AdapterInterface }),
-            filter((adapter: AdapterInterface) => adapter.role === `Transmitter`),
-            map(adapter => { return adapter as TransmitterAdapterInterface })
-        ).subscribe({
-            next: (adapter: TransmitterAdapterInterface) => {
-                this.adapters.push(adapter)
-                if (!this.currentAdapter) {
-                    this.currentAdapter = adapter as TransmitterAdapterInterface
-                    this.console.log({ message: `Setting Current adapter = ${this.currentAdapter.adapterId}` })
-                } else {
-                    this.console.log({ message: `Already have existing transmitting adapter. Current adapter = ${this.currentAdapter.adapterId}` })
-                }
-            },
-            error: error => this.console.error({ message: 'Observer Error', details: error })
-        })
-    }
-
     // this is for http only. Please ignore for now.
     private uniqueHandlerToFlushUnsentMessages(event: Observable<GeneralEvent<any>>): void {
         event.pipe(

+ 7 - 5
src/transport/websocket.ts

@@ -5,7 +5,7 @@ import { Socket as SocketForConnectedClient } from "socket.io"
 import { handleClientSocketConnection, handleNewSocketClient, startClientSocketConnection, startSocketServer } from "../utils/socket.utils";
 import { WrappedMessage } from "../utils/message.ordering";
 import ConsoleLogger from "../utils/log.utils";
-import { ClientObject, FisMessage, GeneralEvent, Transport, TransportMessage, TransportServiceInterface, TransportServiceProfile } from "../interface/interface";
+import { ClientObject, FisMessage, GeneralEvent, TransportMessage, TransportServiceInterface, TransportServiceProfile } from "../interface/interface";
 
 /* Just code in the context that this websocket service will be handling multiple UI clients. Can think about the server communication at a later time. */
 export class WebsocketTransportService implements TransportServiceInterface {
@@ -54,12 +54,12 @@ export class WebsocketTransportService implements TransportServiceInterface {
         let clientObj: ConnectedSocketClient | undefined = this.connectedClientSocket.find(obj => obj.clientId === message.target)
         let serverObj: ConnectedSocketServer | undefined = this.connectedSocketServer.find(obj => obj.clientId === message.target)
         // for server usage
-        if (clientObj && clientObj.connectionState.getValue() == 'ONLINE') {
-            clientObj.socketInstance.emit(`message`, message.payload)
+        if (clientObj) {
+            clientObj.socketInstance.emit(`message`, message)
         }
         // for client usage
-        if (serverObj && serverObj.connectionState.getValue() == 'ONLINE') {
-            serverObj.socketInstance.emit(`message`, message.payload)
+        if (serverObj) {
+            serverObj.socketInstance.emit(`message`, message)
         }
     }
 
@@ -80,3 +80,5 @@ export interface ConnectedSocketClient extends ClientObject {
 export interface ConnectedSocketServer extends ClientObject {
     socketInstance: SocketForConnectedServer
 }
+
+

+ 9 - 9
src/utils/general.utils.ts

@@ -1,9 +1,10 @@
 import * as fs from 'fs'
 import path from 'path';
 import ConsoleLogger from './log.utils';
+import { ClientObject, TransportServiceInterface } from '../interface/interface';
 const console: ConsoleLogger = new ConsoleLogger(`GeneralUtils`, ['util'])
 // Check if filename exists. Return profile information if there's any
-export async function checkOwnClientInfo(filename?: string): Promise<{ id: string }> {
+export async function checkOwnClientInfo(filename?: string): Promise<{ id: string, clientId: string }> {
     return new Promise((resolve, reject) => {
         // Check if the file exists
         let filePath = process.env.FolderPath as string
@@ -18,7 +19,7 @@ export async function checkOwnClientInfo(filename?: string): Promise<{ id: strin
                 }
 
                 // Parse and return the data if present
-                const jsonData = JSON.parse(fileData);
+                const jsonData: { id: string, clientId: string } = JSON.parse(fileData);
                 resolve(jsonData)
 
             } catch (err) {
@@ -51,7 +52,7 @@ export async function checkIfClientExists(id: string): Promise<any> {
             let obj = data.find(entry => entry.id === id);
 
             if (obj) {
-                console.log({ message: "Client with ID ${id} exists." })
+                console.log({ message: `Client with ID ${id} exists.` })
             } else {
                 console.log({ message: `Client with ID ${id} does not exist.` })
             }
@@ -66,7 +67,7 @@ export async function writeFile(data: any, filename: string): Promise<boolean> {
     return new Promise((resolve, reject) => {
         // Ensure the folder exists
         const folderPath = process.env.FolderPath as string
-        console.log({ message: folderPath + filename })
+        console.log({ message: folderPath + (data.clientId ?? `undefined client Id`) })
         // const folderPath = path.join(__dirname, folder);
         if (!fs.existsSync(folderPath)) {
             fs.mkdirSync(folderPath, { recursive: true }); // Create folder if it doesn't exist
@@ -91,10 +92,10 @@ export async function writeFile(data: any, filename: string): Promise<boolean> {
 }
 
 /* For Internal Usage only. Temporary serve as a way for server to keep track of clients. To be replaced in the future with better alternatives. */
-export function addClientToDB(entry: any): void {
+export function addClientToDB(entry: any, filename: string): void {
     try {
         let data: any[] = [];
-        let filePath = process.env.FolderPath as string + 'clients.json'
+        let filePath = process.env.FolderPath as string + `${filename}.json`
         // Check if the file exists and load existing data
         if (fs.existsSync(filePath)) {
             const fileContent = fs.readFileSync(filePath, 'utf-8');
@@ -105,8 +106,6 @@ export function addClientToDB(entry: any): void {
         data.push({
             id: entry.clientId,
             dateCreated: entry.dateCreated,
-            connectionState: null,
-            socketInstance: null
         });
 
         // Write the updated array back to the file
@@ -115,4 +114,5 @@ export function addClientToDB(entry: any): void {
     } catch (error) {
         console.log({ message: 'Error writing to file:', details: error })
     }
-}
+}
+

+ 140 - 166
src/utils/socket.utils.ts

@@ -1,4 +1,4 @@
-import { BehaviorSubject, Observable, Observer, Subject } from 'rxjs';
+import { Observable, Observer } from 'rxjs';
 import { createServer } from 'http';
 import { Server, Socket as SocketForConnectedClient } from 'socket.io';
 import { io, Socket as SocketForConnectedServer } from 'socket.io-client';
@@ -7,8 +7,8 @@ import { v4 as uuidv4 } from 'uuid'
 import { ConnectedSocketClient, ConnectedSocketServer } from '../transport/websocket';
 import ConsoleLogger from './log.utils';
 import path from 'path';
-import { ClientObject, ConnectionState, GeneralEvent, TransportMessage } from '../interface/interface';
-import { addClientToDB, checkIfClientExists, checkOwnClientInfo, writeFile } from './general.utils';
+import { ClientObject, GeneralEvent, TransportMessage } from '../interface/interface';
+import { addClientToDB, checkIfClientExists, checkOwnClientInfo } from './general.utils';
 const console: ConsoleLogger = new ConsoleLogger(`SocketUtils`, ['transport'])
 
 export function startSocketServer(port: number): Observable<SocketForConnectedClient> {
@@ -63,69 +63,71 @@ export async function startClientSocketConnection(serverUrl: string): Promise<So
 export function handleClientSocketConnection(transportServiceId: string, socket: SocketForConnectedServer, serversConnected: ConnectedSocketServer[]): Observable<GeneralEvent<any>> {
     return new Observable((eventNotification: Observer<GeneralEvent<any>>) => {
         let buffer: any[] = []
-        let receiverProfileInfo!: ConnectedSocketServer
+        let selfId!: string
+        let clientProfileInfo!: ConnectedSocketServer // this information refers to the server itself
 
         // Listen for a connection event
         socket.on('connect', () => {
             console.log({ message: `Connected to the server ${socket.id} ` })
-            if (receiverProfileInfo?.clientId) {
-                checkOwnClientInfo(receiverProfileInfo.clientId).then((profile: { id: string }) => {
-                    socket.emit('profile', {
-                        name: 'Old Client',
-                        data: profile
-                    })
-                }).catch((error) => {
-                    socket.emit('profile', {
-                        name: 'New Client',
-                        data: null
-                    })
-                })
-            } else {
-                socket.emit('profile', {
-                    name: 'New Client',
-                    data: null
+            checkOwnClientInfo(`clientprofile`).then((profile: { id: string, clientId: string }) => {
+                socket.emit('handshaking', {
+                    id: profile.id,
                 })
-            }
-        });
+            }).catch((error) => {
+                console.error({ message: `Client profile is not defined!`, details: error })
+            })
 
-        // Listen for messages from the server. Generally here's the responses
-        socket.on('message', (msg: any) => {
-            // console.log(`Websocket Client Transport Receieve Msg`, msg)
-            if (receiverProfileInfo) {
-                // publish to event
-                eventNotification.next({
-                    id: uuidv4(),
-                    type: `Transport Event`,
-                    event: 'New Message',
-                    date: new Date(),
-                    data: {
+
+            // Listen for messages from the server. Generally here's the responses
+            socket.on('message', (msg: TransportMessage) => {
+                if (clientProfileInfo) {
+                    console.log({ message: `Websocket Client Transport Receive Msg` })
+                    // publish to event
+                    eventNotification.next({
                         id: uuidv4(),
-                        dateCreated: new Date(),
+                        type: `Transport Event`,
+                        event: 'New Message',
+                        date: new Date(),
+                        data: msg,
                         transport: `Websocket`,
-                        target: receiverProfileInfo.clientId,
-                        payload: msg
-                    } as TransportMessage
-                })
-            } else {
-                // Do nothing. just store in local array first. Cannot process without information. but then again, don['t need information if acting as client
-                // but for consistency sake, will impose the standard 
-                buffer.push(msg) // store locally for now
-            }
-        })
-
-        socket.on('profile', (data: { name: string, message: any }) => {
-            if (data.name == 'New Profile') {
-                console.log({ message: `Assigned client Name: ${data.message.id}` })
-                // Update websocket instance record
-                receiverProfileInfo = {
-                    clientId: data.message.id,
-                    dateCreated: new Date(),
-                    socketInstance: socket,
-                    connectionState: new BehaviorSubject<ConnectionState>(`ONLINE`),
-                    transport: 'Websocket',
-                    transportServiceId: transportServiceId
+                        transportServiceId: transportServiceId
+                    } as GeneralEvent<TransportMessage>)
+                } else {
+                    // Do nothing. just store in local array first. Cannot process without information. but then again, don['t need information if acting as client
+                    // but for consistency sake, will impose the standard 
+                    buffer.push(msg) // store locally for now
                 }
-                writeFile(data.message as ConnectedSocketServer, receiverProfileInfo.clientId).then(() => {
+            })
+
+            socket.on('handshaking', (data: { id: string, message?: any }) => {
+                // check if this is previously connected or not by checking if the clientId has been previously assigned
+                if (serversConnected.some(obj => obj.clientId === data.id)) {
+                    console.log({ message: data.message ?? `No message from server...` })
+                    // Update websocket instance record
+                    let clientObj: ConnectedSocketServer | undefined = serversConnected.find(obj => obj.clientId === data.id)
+                    if (clientObj) {
+                        console.log({
+                            message: `Just to make sure they are pointed accurately !! Id match? ${clientProfileInfo.clientId == clientObj.clientId ? true : false} && compare ${clientObj.clientId}`,
+                        })
+                        // broadcast event to allow retransmission to release buffer
+                        eventNotification.next({
+                            id: uuidv4(),
+                            type: `Transport Event`,
+                            event: 'Server Connected',
+                            date: new Date(),
+                            data: clientObj,
+                            transport: 'Websocket',
+                            transportServiceId: transportServiceId
+                        } as GeneralEvent<ClientObject>)
+                    }
+                } else {
+                    console.log({ message: `Server acknowledged for new client connection for ${data.id}` })
+                    clientProfileInfo = {
+                        clientId: data.id,
+                        dateCreated: new Date(),
+                        socketInstance: socket
+                    }
+                    addClientToDB(clientProfileInfo as ConnectedSocketServer, 'servers')
                     /* Note that there are two separate events, because transmission must first be set up before releasing buffer. */
                     // broadcast event to allow transmission manager to instantiate transmission components
                     eventNotification.next({
@@ -133,115 +135,63 @@ export function handleClientSocketConnection(transportServiceId: string, socket:
                         type: 'Transport Event',
                         event: `New Server`,
                         date: new Date(),
-                        data: receiverProfileInfo,
-                        transport: 'Websocket'
-                    })
+                        data: clientProfileInfo,
+                        transport: 'Websocket',
+                        transportServiceId: transportServiceId
+                    } as GeneralEvent<ClientObject>)
                     // broadcast event to allow retransmission to relase buffered messages
                     eventNotification.next({
                         id: uuidv4(),
                         type: 'Transport Event',
                         event: `Server Connected`,
                         date: new Date(),
-                        data: receiverProfileInfo,
-                        transport: 'Websocket'
-                    })
-                }).catch((error) => { }) // do nothing at the moment. 
-                serversConnected.push(receiverProfileInfo)
-            }
-            if (data.name == 'Adjusted Profile') {
-                console.log({ message: `Adjusted client Name: ${data.message.id}` })
-                // Update websocket instance record
-                let clientObj: ConnectedSocketServer | undefined = serversConnected.find(obj => obj.clientId === data.message.id)
-                if (clientObj) {
-                    clientObj.socketInstance = socket
-                    clientObj.connectionState.next('ONLINE')
-                    console.log({
-                        message: `Just to make sure they are pointed accurately: This should be ONLINE: ${receiverProfileInfo.connectionState.getValue()} !! Id match? ${receiverProfileInfo.clientId == clientObj.clientId ? true : false} && compare ${clientObj.clientId}`,
-                    })
-                    // broadcast event to allow retransmission to release buffer
+                        data: clientProfileInfo,
+                        transport: 'Websocket',
+                        transportServiceId: transportServiceId
+                    } as GeneralEvent<ClientObject>)
+                    serversConnected.push(clientProfileInfo)
+                }
+            })
+
+            // Handle disconnection
+            socket.on('disconnect', () => {
+                console.log({ message: `Socket Server ${clientProfileInfo.clientId} Disconnected` })
+                if (clientProfileInfo) {
                     eventNotification.next({
                         id: uuidv4(),
                         type: `Transport Event`,
-                        event: 'Server Connected',
+                        event: `Server Disconnected`,
                         date: new Date(),
-                        data: clientObj,
-                        transport: 'Websocket'
-                    })
+                        data: clientProfileInfo,
+                        transport: `Websocket`,
+                        transportServiceId: transportServiceId
+                    } as GeneralEvent<ClientObject>)
                 }
-            }
-            if (data.name == 'Error') {
-                console.log({ message: `Server cannot find credentials`, details: data.message })
-                // logic to request for new credentials
-                setTimeout(() => {
-                    // for now, if clent cannot be recognize, then we will just proceed as a new client
-                    socket.emit('profile', {
-                        name: 'New Client',
-                        data: null
-                    })
-                }, 2000)
-            }
+            });
         })
-
-        // Handle disconnection
-        socket.on('disconnect', () => {
-            console.log({ message: `Socket Server ${receiverProfileInfo.clientId} Disconnected` })
-            if (receiverProfileInfo) {
-                eventNotification.next({
-                    id: uuidv4(),
-                    type: `Transport Event`,
-                    event: `Server Disconnected`,
-                    date: new Date(),
-                    data: receiverProfileInfo,
-                    transport: `Websocket`
-                })
-                receiverProfileInfo.connectionState.next(`OFFLINE`)
-            }
-        });
     })
 }
 
 // For SERVER Usage: set up socket listeners to start listening for different events
 export function handleNewSocketClient(transportServiceId: string, socket: SocketForConnectedClient, connectedClientSocket: ConnectedSocketClient[]): Observable<GeneralEvent<any>> {
     return new Observable((event: Observer<GeneralEvent<any>>) => {
+        let selfId!: string
+        checkOwnClientInfo(`serverprofile`).then((profile: { id: string }) => {
+            selfId = profile.id
+        })
         console.log({ message: `Socket client connected. Setting up listeners for socket:${socket.id}` })
         // returns the socket client instance 
-        // listen to receiver's initiotion first before assigning 'credentials'
-        socket.on(`profile`, (message: { name: string, data: any }) => {
-            if (message.name == 'New Client') {
-                let clientInstance: ConnectedSocketClient = {
-                    clientId: uuidv4(), // client should only be assigned at this level. And is passed around for reference pointing
-                    dateCreated: new Date(),
-                    socketInstance: socket,
-                    connectionState: new BehaviorSubject<ConnectionState>(`OFFLINE`),
-                    transport: 'Websocket',
-                    transportServiceId: transportServiceId
-                }
-                // send to receiver for reference
-                socket.emit('profile', {
-                    name: `New Profile`, message: { id: clientInstance.clientId }
-                })
-                // publish first event notification
-                event.next({
-                    id: uuidv4(),
-                    type: 'Transport Event',
-                    event: `New Client`,
-                    date: new Date(),
-                    data: clientInstance,
-                    transport: 'Websocket'
-                })
-                // Update connected clientInstance info to adapter
-                connectedClientSocket.push(clientInstance)
-                addClientToDB(clientInstance)
-                startListening(socket, clientInstance, event)
-            } else {
+        socket.on(`handshaking`, (data: { id: string, message?: any }) => {
+            // if it's a previously connected client
+            if (connectedClientSocket.some(obj => obj.clientId == data.id)) {
                 // update first
                 let clientInstance: ConnectedSocketClient | undefined
                 if (connectedClientSocket.length > 0) {
-                    clientInstance = connectedClientSocket.find(obj => obj.clientId === message.data.id)
+                    clientInstance = connectedClientSocket.find(obj => obj.clientId === data.id)
                     handleFoundClient(clientInstance)
                 } else {
-                    // for the case server itself got shit down or something
-                    checkIfClientExists(message.data.id).then((client: ConnectedSocketClient) => {
+                    // for the case server itself got shut down or something. This one will check at persisted local storage
+                    checkIfClientExists(data.id).then((client: ConnectedSocketClient) => {
                         clientInstance = client
                         handleFoundClient(clientInstance)
                     }).catch(error => {
@@ -251,57 +201,80 @@ export function handleNewSocketClient(transportServiceId: string, socket: Socket
                 function handleFoundClient(clientInstance: ConnectedSocketClient | undefined) {
                     if (clientInstance) {
                         console.log({ message: `Socket Client ${clientInstance.clientId} Found. This is a previously connected client.` })
-                        socket.emit('profile', { name: 'Adjusted Profile', message: { id: clientInstance.clientId } })
+                        // publish own id to client
+                        if (selfId) {
+                            socket.emit('handshaking', { id: selfId })
+                        } else {
+                            console.error({ message: 'Self Id not established' })
+                        }
                         // replace socket instance since the previous has been terminated
                         clientInstance.socketInstance = socket
                         // need to start listening again, because it's assigned a different socket instance this time round
-                        startListening(socket, clientInstance, event, true)
+                        startListening(socket, clientInstance, event, transportServiceId, true)
                     } else {
                         console.log({ message: `Profile Not Found` })
-                        socket.emit('profile', { name: 'Error', message: 'Receiver Profile Not found' })
+                        socket.emit('handshaking', { id: selfId, message: 'Receiver Profile Not found' })
                     }
                 }
+            } else {
+                console.log({ message: `New Client Connected. ClientID: ${data.id}` })
+                let clientInstance: ConnectedSocketClient = {
+                    clientId: data.id,
+                    dateCreated: new Date(),
+                    socketInstance: socket,
+                }
+                // send to receiver for reference
+                if (selfId) {
+                    socket.emit('handshaking', { id: selfId })
+                } else {
+                    console.error({ message: 'Self Id not established' })
+                }
+                // publish first event notification
+                event.next({
+                    id: uuidv4(),
+                    type: 'Transport Event',
+                    event: `New Client`,
+                    date: new Date(),
+                    data: clientInstance,
+                    transport: 'Websocket',
+                    transportServiceId: transportServiceId
+                } as GeneralEvent<ClientObject>)
+                // Update connected clientInstance info to adapter
+                connectedClientSocket.push(clientInstance)
+                addClientToDB(clientInstance, `clients`)
+                startListening(socket, clientInstance, event, transportServiceId)
             }
+
         })
     })
 }
 
 
 // this is for server usage only
-export function startListening(socket: SocketForConnectedClient, client: ConnectedSocketClient, eventListener: Observer<GeneralEvent<any>>, oldClient?: boolean): void {
+export function startListening(socket: SocketForConnectedClient, client: ConnectedSocketClient, eventListener: Observer<GeneralEvent<any>>, transportServiceid: string, oldClient?: boolean): void {
     // notify it's associated retransmission to start releaseing buffer
     eventListener.next({
         id: uuidv4(),
         type: 'Transport Event',
-        event: oldClient ? 'Client Re-connected' : `Client Connected`,
+        event: oldClient ? `Client Re-connected` : `Client Connected`,
         date: new Date(),
         data: client,
-        transport: 'Websocket'
-    })
-    // Resume operation
-    // some explanation here. For the case where the server reads from the DB, no need to terminate subject, since all instances would be destroyed alongside the server shut down. This case is specificd only when there's a need to read from local file
-    if (!client.connectionState) {
-        client.connectionState = new BehaviorSubject<ConnectionState>(`ONLINE`)
-    } else {
-        client.connectionState.next(`ONLINE`)
-    }
+        transport: 'Websocket',
+        transportServiceid: transportServiceid
+    } as unknown as GeneralEvent<ClientObject>)
 
     /* Generally, we don't need this unless in the case of being the receiver */
-    socket.on('message', (message: any) => {
+    socket.on('message', (message: TransportMessage) => {
         console.log({ message: `Message from client ${client.clientId}`, details: message })
         eventListener.next({
             id: uuidv4(),
             type: 'Transport Event',
             event: 'New Message',
             date: new Date(),
-            data: {
-                id: uuidv4(),
-                dateCreated: new Date(),
-                transport: `Websocket`,
-                target: client.clientId, // this ref to be associated with the client/channel
-                payload: message
-            } as TransportMessage
-        })
+            data: message,
+            transport: `Websocket`,
+            transportServiceId: transportServiceid
+        } as GeneralEvent<TransportMessage>)
     })
 
     socket.on('disconnect', () => {
@@ -311,8 +284,9 @@ export function startListening(socket: SocketForConnectedClient, client: Connect
             event: 'Client Disconnected',
             date: new Date(),
             data: client,
-            transport: 'Websocket'
-        })
+            transport: 'Websocket',
+            transportServiceId: transportServiceid
+        } as GeneralEvent<ClientObject>)
         eventListener.error(`Client ${client.clientId} disconnected. Terminating this observable event for this client socket...`)
         eventListener.complete()
     })