Преглед изворни кода

basic functional message transmission

enzo пре 2 недеља
родитељ
комит
9150a6e182

+ 3 - 3
src/adapters/adapter.manager.ts

@@ -22,7 +22,7 @@ export class AdapterManager extends AdapterManagerBase {
     public subscribeForAdapters(clientId: string, role: TransmissionRole): Observable<AdapterInterface> {
         return new Observable((adapters: Observer<AdapterInterface>) => {
             this.adapters.forEach((adapter: AdapterInterface) => {
-                if (adapter.getAdapterProfile(`clientId`) == clientId && adapter.getAdapterProfile(`role`) == role) {
+                if (adapter.getAdapterProfile(`clientId`) === clientId && adapter.getAdapterProfile(`role`) === role) {
                     adapters.next(adapter)
                 }
             })
@@ -108,8 +108,8 @@ export class AdapterManager extends AdapterManagerBase {
         if (transportService) {
             this.console.log({ message: `Instantiating ${transportType} adapters for client ${clientRef.clientId}` });
             const adapters: AdapterInterface[] = [
-                new TransmitterAdapter(clientRef.clientId, transportService.getInfo().transport, transportService),
-                new ReceiverAdapter(clientRef.clientId, transportService.getInfo().transport, transportService),
+                new TransmitterAdapter(clientRef.clientId, transportService.getInfo().transport, transportService, `Transmitter`),
+                new ReceiverAdapter(clientRef.clientId, transportService.getInfo().transport, transportService, `Receiver`),
             ];
             this.adapters.concat(adapters)
             return adapters;

+ 9 - 37
src/adapters/adapter.receiver.ts

@@ -4,29 +4,27 @@ import { v4 as uuidv4 } from 'uuid'
 import ConsoleLogger from '../utils/log.utils';
 import { WrappedMessage } from '../utils/message.ordering';
 import { AdapterBase } from '../base/adapter.base';
-import { ClientObject, ConnectionState, FisMessage, GeneralEvent, TransportType, TransportMessage, TransportServiceInterface } from '../interface/interface';
+import { ClientObject, ConnectionState, FisMessage, GeneralEvent, TransportType, TransportMessage, TransportServiceInterface, TransmissionRole } from '../interface/interface';
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
-So how?: */
+So how?: */ 
 export class ReceiverAdapter extends AdapterBase {
-    private console!: ConsoleLogger
 
-    constructor(clientId: string, adapterType: TransportType, transportService: TransportServiceInterface) {
-        super()
-        this.console = new ConsoleLogger(`${adapterType}ReceiverAdapter`, ['adapter'])
-        this.setAdapterProfile(clientId, adapterType, transportService, 'Receiver')
+    constructor(clientId: string, adapterType: TransportType, transportService: TransportServiceInterface, role: TransmissionRole) {
+        super(clientId, adapterType, transportService, role)
+        this.console = new ConsoleLogger(`${this.adapterProfile.transportType}ReceiverAdapter`, ['adapter'])
         this.setupConnectionState(transportService)
 
-        this.console.log({ message: `Contructing ReceiverAdapter for clientId: ${clientId}` })
+        this.console.log({ message: `Contructing ReceiverAdapter for clientId: ${this.adapterProfile.clientId}` })
     }
 
     subscribeForIncoming(): Observable<GeneralEvent<any>> {
-        this.console.log({ message: `Connector getting message bus for this connector: ${this.AdapterProfile.clientId}` })
+        this.console.log({ message: `Connector getting message bus for this connector: ${this.adapterProfile.clientId}` })
         return new Observable((observable: Observer<GeneralEvent<any>>) => {
-            const subscription: Subscription = this.AdapterProfile.transportService.subscribeForEvent().pipe(
+            const subscription: Subscription = this.adapterProfile.transportService.subscribeForEvent().pipe(
                 filter(event => event.type === `Transport Event`),
-                filter((message: GeneralEvent<TransportMessage>) => (message.data as TransportMessage).self === this.AdapterProfile.clientId ),
+                filter((message: GeneralEvent<TransportMessage>) => (message.data as TransportMessage).source === this.adapterProfile.clientId ),
                 filter((message: GeneralEvent<any>) => message.event === 'New Message'),
             ).subscribe((message: GeneralEvent<TransportMessage>) => {
                 this.console.log({ message: `Received ${(((message.data as TransportMessage).payload as WrappedMessage).payload as FisMessage).header.messageID} from ${((message.data as TransportMessage).target)}`, details: message })
@@ -49,32 +47,6 @@ export class ReceiverAdapter extends AdapterBase {
         });
     }
 
-    // generally not required but can be used for other indicative purposes
-    getConnectionState(): Observable<ConnectionState> {
-        return this.AdapterProfile.connectionState.asObservable()
-    }
-
-    // this is irrelevant at this point in time. Adapter will just listen regardless of whether there's connection or not
-    private setupConnectionState(transportService: TransportServiceInterface): void {
-        transportService.subscribeForEvent().pipe(
-            filter(event => event.type === `Transport Event`),
-            filter(event => (event.data as ClientObject).clientId === this.AdapterProfile.clientId),
-            map(event => {
-                if (event.event == 'Client Disconnected' || event.event == 'Server Disconnected') {
-                    return 'OFFLINE'
-                } else {
-                    return `ONLINE`
-                }
-            }),
-            distinctUntilChanged()
-        ).subscribe((signal: ConnectionState) => {
-            this.AdapterProfile.connectionState.next(signal)
-            if (signal == 'OFFLINE') this.console.error({ message: `${this.AdapterProfile.clientId} disconnected` })
-            if (signal == 'ONLINE') this.console.log({ message: `${this.AdapterProfile.clientId} connected and ready to go` })
-        })
-    }
-
-
 }
 
 

+ 13 - 38
src/adapters/adapter.transmitter.ts

@@ -3,59 +3,34 @@ import { BehaviorSubject, distinctUntilChanged, filter, map, Observable, Subject
 import { WrappedMessage } from '../utils/message.ordering';
 import ConsoleLogger from '../utils/log.utils';
 import { AdapterBase } from '../base/adapter.base';
-import { ClientObject, ConnectionState, FisMessage, TransportType, TransportMessage, TransportServiceInterface } from '../interface/interface';
-import { fileURLToPath } from 'url';
+import { ClientObject, ConnectionState, FisMessage, TransportType, TransportMessage, TransportServiceInterface, TransmissionRole } from '../interface/interface';
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 So how?: */
 export class TransmitterAdapter extends AdapterBase {
-    private console!: ConsoleLogger
 
-    constructor(clientId: string, adapterType: TransportType, transportService: TransportServiceInterface) {
-        super()
-        this.console = new ConsoleLogger(`${adapterType}TransmitterAdapter`, ['adapter'])
-        this.setAdapterProfile(clientId, adapterType, transportService, 'Transmitter')
-        this.setupConnectionState(transportService)
+    constructor(clientId: string, adapterType: TransportType, transportService: TransportServiceInterface, role: TransmissionRole) {
+        super(clientId, adapterType, transportService, role)
+        this.console = new ConsoleLogger(`${this.adapterProfile.transportType}TransmitterAdapter`, ['adapter'])
 
-
-        this.console.log({ message: `Contructing TransmitterAdapter for client: ${clientId}` })
+        this.console.log({ message: `Contructing TransmitterAdapter for client: ${this.adapterProfile.clientId}` })
     }
 
     emit(selfId: string, message: WrappedMessage): void {
         // logic here
-        this.console.log({ message: `Emitting: ${(message.payload as FisMessage).header.messageID} to ${this.AdapterProfile.clientId}` })
-        this.AdapterProfile.transportService.emit({
-            id: this.AdapterProfile.clientId,
-            self: selfId,
-            target: this.AdapterProfile.clientId,
+        // this.console.log({ message: `Emitting: ${(message.payload as FisMessage).header.messageID} to ${this.adapterProfile.clientId}` })
+        this.console.log({ message: `Emitting: ${message.thisMessageID} to ${this.adapterProfile.clientId}` })
+        this.adapterProfile.transportService.emit({
+            id: this.adapterProfile.clientId,
+            dateCreated: new Date(),
+            transport: this.adapterProfile.transportType,
+            target: this.adapterProfile.clientId,
+            source: selfId,
             payload: message
         } as TransportMessage)
     }
 
-    getConnectionState(): Observable<ConnectionState> {
-        return this.AdapterProfile.connectionState.asObservable()
-    }
-
-    private setupConnectionState(transportService: TransportServiceInterface): void {
-        transportService.subscribeForEvent().pipe(
-            filter(event => event.type === `Transport Event`),
-            filter(event => (event.data as ClientObject).clientId === this.AdapterProfile.clientId),
-            map(event => {
-                if (event.event == 'Client Disconnected' || event.event == 'Server Disconnected') {
-                    return 'OFFLINE'
-                } else {
-                    return `ONLINE`
-                }
-            }),
-            distinctUntilChanged()
-        ).subscribe((signal: ConnectionState) => {
-            this.AdapterProfile.connectionState.next(signal)
-            if (signal == 'OFFLINE') this.console.error({ message: `${this.AdapterProfile.clientId} disconnected` })
-            if (signal == 'ONLINE') this.console.log({ message: `${this.AdapterProfile.clientId} connected and ready to go` })
-        })
-    }
-
 }
 
 

+ 43 - 23
src/base/adapter.base.ts

@@ -1,46 +1,66 @@
-import { BehaviorSubject, Observable, Subject } from "rxjs";
+import { BehaviorSubject, distinctUntilChanged, filter, map, Observable, Subject } from "rxjs";
 import dotenv from 'dotenv';
-import { AdapterInterface, ConnectionState, TransmissionRole, TransportType, TransportServiceInterface, AdapterProfile } from "../interface/interface";
+import { AdapterInterface, ConnectionState, TransmissionRole, TransportType, TransportServiceInterface, AdapterProfile, ClientObject } from "../interface/interface";
+import ConsoleLogger from "../utils/log.utils";
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 So how?: */
 export class AdapterBase implements AdapterInterface {
-    protected AdapterProfile!: AdapterProfile
-    // protected clientId!: string
-    // protected transportService!: TransportServiceInterface
-    // protected role!: TransmissionRole;
-    // protected connectionState!: Subject<ConnectionState>
-    // protected transport: TransportType;
-
-    constructor() {
+    protected console!: ConsoleLogger
+    protected adapterProfile!: AdapterProfile
+
+    constructor(clientId: string, adapterType: TransportType, transportService: TransportServiceInterface, role: TransmissionRole) {
         //logic here
+        this.setAdapterProfile(clientId, adapterType, transportService, role)
+        this.setupConnectionState(this.adapterProfile.transportService)
     }
 
-    setAdapterProfile(clientId: string, adapterType: TransportType, transportService: TransportServiceInterface, role: TransmissionRole): void {
-        this.AdapterProfile.clientId = clientId
-        this.AdapterProfile.connectionState = new BehaviorSubject<ConnectionState>(`OFFLINE`)
-        this.AdapterProfile.role = role
-        this.AdapterProfile.transportType = adapterType
-        this.AdapterProfile.transportService = transportService
+    protected setAdapterProfile(clientId: string, adapterType: TransportType, transportService: TransportServiceInterface, role: TransmissionRole): void {
+        this.adapterProfile = {} as AdapterProfile
+        this.adapterProfile.clientId = clientId as string
+        this.adapterProfile.connectionState = new BehaviorSubject<ConnectionState>(`OFFLINE`)
+        this.adapterProfile.role = role
+        this.adapterProfile.transportType = adapterType
+        this.adapterProfile.transportService = transportService
     }
 
-    getAdapterProfile(type?: `clientId` | `role` | `transportId` | `transportType` | `connectionState`): string | BehaviorSubject<ConnectionState> | AdapterProfile | undefined {
+    getAdapterProfile(type?: `clientId` | `role` | `transportId` | `transportType` | `connectionState`): string | Observable<ConnectionState> | AdapterProfile | undefined {
         if (!type) {
-            return this.AdapterProfile
+            return this.adapterProfile
         } else if (type == `clientId`) {
-            return this.AdapterProfile.clientId
+            return this.adapterProfile.clientId
         } else if (type == `connectionState`) {
-            return this.AdapterProfile.connectionState
+            return this.adapterProfile.connectionState.asObservable()
         } else if (type == `role`) {
-            return this.AdapterProfile.role
+            return this.adapterProfile.role
         } else if (type == `transportType`) {
-            return this.AdapterProfile.transportType
+            return this.adapterProfile.transportType
         } else if (type == `transportId`) {
-            return this.AdapterProfile.transportService.getInfo().transportServiceId
+            return this.adapterProfile.transportService.getInfo().transportServiceId
         }
 
     }
+
+    // this is irrelevant at this point in time. Adapter will just listen regardless of whether there's connection or not
+    protected setupConnectionState(transportService: TransportServiceInterface): void {
+        transportService.subscribeForEvent().pipe(
+            filter(event => event.type === `Transport Event`),
+            filter(event => (event.data as ClientObject).clientId === this.adapterProfile.clientId),
+            map(event => {
+                if (event.event == 'Client Disconnected' || event.event == 'Server Disconnected') {
+                    return 'OFFLINE'
+                } else {
+                    return `ONLINE`
+                }
+            }),
+            distinctUntilChanged()
+        ).subscribe((signal: ConnectionState) => {
+            this.adapterProfile.connectionState.next(signal)
+            if (signal == 'OFFLINE') this.console.error({ message: `${this.adapterProfile.clientId} disconnected` })
+            if (signal == 'ONLINE') this.console.log({ message: `${this.adapterProfile.clientId} connected and ready to go` })
+        })
+    }
 }
 
 

+ 2 - 7
src/base/msg.transmission.base.ts

@@ -1,16 +1,11 @@
 import { Observable, Subject, Subscription } from 'rxjs';
-import { AdapterInterface,  GeneralEvent,  MessageTransmissionInterface } from '../interface/interface'
+import { AdapterInterface, GeneralEvent, MessageTransmissionInterface, TransmissionProfile } from '../interface/interface'
 
 export class MessageTransmissionBase implements MessageTransmissionInterface {
-    protected clientId!: string;
+    protected profile!: TransmissionProfile
     protected adapters: AdapterInterface[] = []
     constructor() {
         // logic here
     }
-
-    protected handleAdapters(event: Observable<GeneralEvent<any>>): void {
-        throw new Error(`Method not implemented...`)
-    }
-
 }
 

+ 5 - 7
src/base/msg.transmission.manager.base.ts

@@ -1,8 +1,6 @@
 
-import { Observable, Observer, Subject, Unsubscribable } from 'rxjs';
-import { ClientObject, GeneralEvent, MessageReceiverInterface, MessageRequestResponseInterface, MessageTransmissionInterface, MessageTransmissionManagerInterface, MessageTransmitterInterface } from '../interface/interface';
-import { ActorInterface, ActorProfile } from '../interface/actor.sample';
-import { ActorBase } from './actor.base';
+import { Subject } from 'rxjs';
+import { GeneralEvent, MessageReceiverInterface, MessageRequestResponseInterface, MessageTransmissionInterface, MessageTransmissionManagerInterface, MessageTransmitterInterface, TransmissionProfile } from '../interface/interface';
 import { AdapterManager } from '../adapters/adapter.manager';
 
 export class MessageTransmissionManagerBase implements MessageTransmissionManagerInterface {
@@ -15,15 +13,15 @@ export class MessageTransmissionManagerBase implements MessageTransmissionManage
         // logic here
     }
     
-    getTransmitter(receiverId: string): MessageTransmitterInterface {
+    getTransmitter(profile: TransmissionProfile): MessageTransmitterInterface {
         throw new Error(`Method not implemented`)
     }
 
-    getReceiver(transmitterid: string): MessageReceiverInterface {
+    getReceiver(profile: TransmissionProfile): MessageReceiverInterface {
         throw new Error(`Method not implemented`)
     }
 
-    getRequestResponse(clientId: string, transmitterInstance: MessageTransmitterInterface, receiverInstance: MessageReceiverInterface): MessageRequestResponseInterface {
+    getRequestResponse(profile: TransmissionProfile, transmitterInstance: MessageTransmitterInterface, receiverInstance: MessageReceiverInterface): MessageRequestResponseInterface {
         throw new Error(`Method not implemented`)
     }
 

+ 0 - 6
src/interface/actor.sample.ts

@@ -15,12 +15,6 @@ export interface ActorProfile<T = any> {
     data?: T
 }
 
-export interface TransmisisonProfile {
-    clientId: string,
-    transmitterService: MessageTransmissionTransmitter,
-    receiverService: MessageTransmissionReceiver
-}
-
 export interface ActorMessage<T = any> {
     actorProfile: ActorProfile,
     payload: T

+ 15 - 26
src/interface/interface.ts

@@ -1,4 +1,4 @@
-import { BehaviorSubject, Observable, Subject, Unsubscribable } from "rxjs"
+import { BehaviorSubject, Observable } from "rxjs"
 import { WrappedMessage } from "../utils/message.ordering"
 
 /* EVENT BUS */
@@ -15,12 +15,12 @@ export interface GeneralEvent<T> {
 
 /* MANAGEMENT */
 export interface MessageTransmissionManagerInterface {
-    getTransmitter(receiverId: string): MessageTransmissionInterface
-    getReceiver(transmitterid: string): MessageReceiverInterface
+    getTransmitter(profile: TransmissionProfile): MessageTransmissionInterface
+    getReceiver(profile: TransmissionProfile): MessageReceiverInterface
 }
 
 export interface AdapterManagerInterface {
-    subscribeForAdapters(selfId: string, receiverId: string, role: TransmissionRole): Observable<AdapterInterface>
+    subscribeForAdapters(receiverId: string, role: TransmissionRole): Observable<AdapterInterface>
 }
 
 
@@ -37,17 +37,16 @@ export interface MessageTransmitterInterface extends MessageTransmissionInterfac
 }
 
 export interface MessageRequestResponseInterface extends MessageTransmissionInterface {
-    send(message: any): Observable<any>
+    send(message: any): Observable<any> 
 }
 
 /* ADAPTER COMPONENTS */
 export interface AdapterInterface {
-    setAdapterProfile(clientId: string, adapterType: TransportType, transportService: TransportServiceInterface, role: TransmissionRole): void
-    getAdapterProfile(type?: `clientId` | `role` | `transportId` | `transportType` | `connectionState`): AdapterProfile | string | BehaviorSubject<ConnectionState> | undefined
+    getAdapterProfile(type?: `clientId` | `role` | `transportId` | `transportType` | `connectionState`): AdapterProfile | string | Observable<ConnectionState> | undefined
 }
 
 export interface TransmitterAdapterInterface extends AdapterInterface {
-    emit(message: WrappedMessage): void
+    emit(selfId: string, message: WrappedMessage): void
 }
 
 export interface ReceiverAdapterInterface extends AdapterInterface {
@@ -55,29 +54,14 @@ export interface ReceiverAdapterInterface extends AdapterInterface {
 }
 
 
-/* Utils */
-export interface TransmissionProfile {
-    id: string,
-    name: string,
-    dateCreated: Date
-}
-
-export interface TransmitterProfile extends TransmissionProfile {
-}
-
-export interface ReceiverProfile extends TransmissionProfile {
-}
-
-export interface RequestResponseProfile extends TransmissionProfile {
-}
-
+// /* Utils */
 export interface TransportMessage {
     id: string,
     dateCreated: Date,
     transport: TransportType,
-    self: string,
     target: string,
-    payload: any
+    source: string
+    payload: any,
 }
 
 export interface FisMessage {
@@ -134,4 +118,9 @@ export interface AdapterProfile {
     transportType: TransportType,
     transportService: TransportServiceInterface,
     connectionState: BehaviorSubject<ConnectionState>
+}
+
+export interface TransmissionProfile {
+    source: string
+    target: string
 }

+ 11 - 6
src/test/receiver.ts

@@ -4,10 +4,10 @@ import config from '../config/config.json';
 import { MessageTransmissionManager } from "../transmission/msg.transmission.manager";
 import { WrappedMessage } from "../utils/message.ordering";
 import ConsoleLogger from "../utils/log.utils";
-import { FisMessage, GeneralEvent, Transport, TransportMessage, TransportServiceInterface, TransportSet } from "../interface/interface";
+import { FisMessage, GeneralEvent, TransportType, TransportServiceInterface, TransportSet, TransmissionProfile } from "../interface/interface";
 import { WebsocketTransportService } from "../transport/websocket";
 import { HttpTransportService } from "../transport/http";
-import clientProfile from '../../clients/serverprofile.json'
+import clientProfile from '../../clients/clientprofile.json'
 import serverProfile from '../../clients/serverprofile.json'
 import { MessageTransmissionTransmitter } from "../transmission/msg.transmission.transmitter";
 import { MessageTransmissionReceiver } from "../transmission/msg.transmission.receiver";
@@ -23,8 +23,13 @@ class Supervisor {
     private outgoingPipe: Subject<any> = new Subject()
     private transportServiceArray: TransportServiceInterface[] = []
     private transportSet: TransportSet[] = []
+    private config: TransmissionProfile = {
+        target: clientProfile.clientId,
+        source: clientProfile.id
+    }
 
     constructor() {
+        this.console.log({message: `Self ${this.config.source} && Target: ${this.config.target}`})
         this.event = new Subject()
         // Start setting up existing transport based on .env file
         this.sortTransportFromEnv(this.transportSet)
@@ -39,9 +44,9 @@ class Supervisor {
     }
 
     private startMessageTransmission(): void {
-        let transmitter: MessageTransmissionTransmitter = this.transmissionManager.getTransmitter(clientProfile.clientId) as MessageTransmissionTransmitter
-        let receiver: MessageTransmissionReceiver = this.transmissionManager.getReceiver(clientProfile.clientId) as MessageTransmissionReceiver
-        let requestResponse: MessageTransmissionRequestResponse = this.transmissionManager.getRequestResponse(clientProfile.id, transmitter, receiver)
+        let transmitter: MessageTransmissionTransmitter = this.transmissionManager.getTransmitter(this.config) as MessageTransmissionTransmitter
+        let receiver: MessageTransmissionReceiver = this.transmissionManager.getReceiver(this.config) as MessageTransmissionReceiver
+        let requestResponse: MessageTransmissionRequestResponse = this.transmissionManager.getRequestResponse(this.config, transmitter, receiver)
 
         // emit Message only
         // this.emitMessage(transmitter, this.generateNotifcation())
@@ -123,7 +128,7 @@ class Supervisor {
         }
     }
 
-    private instantiateTransportService(transportType: Transport, event: Subject<GeneralEvent<any>>): TransportServiceInterface {
+    private instantiateTransportService(transportType: TransportType, event: Subject<GeneralEvent<any>>): TransportServiceInterface {
         if (transportType === 'Websocket') {
             return new WebsocketTransportService(event)
         }

+ 9 - 5
src/test/transmitter.ts

@@ -3,7 +3,7 @@ import { v4 as uuidv4 } from 'uuid'
 import { MessageTransmissionManager } from "../transmission/msg.transmission.manager";
 import { WrappedMessage } from "../utils/message.ordering";
 import ConsoleLogger from "../utils/log.utils";
-import { FisMessage, GeneralEvent, MessageTransmissionInterface, Transport, TransportMessage, TransportServiceInterface, TransportSet } from "../interface/interface";
+import { FisMessage, GeneralEvent, TransportType, TransportServiceInterface, TransportSet, TransmissionProfile } from "../interface/interface";
 import config from '../config/config.json';
 import { startSocketServer } from "../utils/socket.utils";
 import { WebsocketTransportService } from "../transport/websocket";
@@ -22,6 +22,10 @@ class Supervisor {
     private event!: Subject<GeneralEvent<any>>
     private transportSet: TransportSet[] = []
     private transportServiceArray: TransportServiceInterface[] = []
+    private config: TransmissionProfile = {
+        target: serverProfile.clientId,
+        source: serverProfile.id
+    }
 
     constructor() {
         this.event = new Subject<GeneralEvent<any>>()
@@ -39,9 +43,9 @@ class Supervisor {
     }
 
     private startMessageTransmission(): void {
-        let transmitter: MessageTransmissionTransmitter = this.transmissionManager.getTransmitter(serverProfile.clientId) as MessageTransmissionTransmitter
-        let receiver: MessageTransmissionReceiver = this.transmissionManager.getReceiver(serverProfile.clientId) as MessageTransmissionReceiver
-        let requestResponse: MessageTransmissionRequestResponse = this.transmissionManager.getRequestResponse(serverProfile.id, transmitter, receiver)
+        let transmitter: MessageTransmissionTransmitter = this.transmissionManager.getTransmitter(this.config) as MessageTransmissionTransmitter
+        let receiver: MessageTransmissionReceiver = this.transmissionManager.getReceiver(this.config) as MessageTransmissionReceiver
+        let requestResponse: MessageTransmissionRequestResponse = this.transmissionManager.getRequestResponse(this.config, transmitter, receiver)
 
         // emit Message only
         this.emitMessage(transmitter, this.messageProducer.getNotificationMessage())
@@ -127,7 +131,7 @@ class Supervisor {
         }
     }
 
-    private instantiateTransportService(transportType: Transport, event: Subject<GeneralEvent<any>>): TransportServiceInterface {
+    private instantiateTransportService(transportType: TransportType, event: Subject<GeneralEvent<any>>): TransportServiceInterface {
         if (transportType === 'Websocket') {
             return new WebsocketTransportService(event)
         }

+ 8 - 8
src/transmission/msg.transmission.manager.ts

@@ -3,10 +3,10 @@ import { MessageTransmissionReceiver } from "./msg.transmission.receiver";
 import { AdapterManager } from "../adapters/adapter.manager";
 import { v4 as uuidv4 } from 'uuid'
 import { MessageTransmissionRequestResponse } from "./msg.transmission.request-response";
-import { filter, Observable, Observer, Subject, Subscription, Unsubscribable } from "rxjs";
+import { Subject } from "rxjs";
 import ConsoleLogger from "../utils/log.utils";
+import { GeneralEvent, MessageReceiverInterface, MessageTransmitterInterface, TransmissionProfile } from "../interface/interface";
 import { MessageTransmissionManagerBase } from "../base/msg.transmission.manager.base";
-import { GeneralEvent, MessageReceiverInterface, MessageTransmitterInterface } from "../interface/interface";
 
 export class MessageTransmissionManager extends MessageTransmissionManagerBase {
     private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionManager`, ['managers'])
@@ -21,20 +21,20 @@ export class MessageTransmissionManager extends MessageTransmissionManagerBase {
     }
 
     // for now these fuctions ain't pure, cuz messageTransmissionManager needs to keep a record of these tranmission instances as well for safe keeping.
-    public getTransmitter(receiverId: string): MessageTransmitterInterface {
-        let transmitterInstance = new MessageTransmissionTransmitter(receiverId, this.adapterManager)
+    public getTransmitter(profile: TransmissionProfile): MessageTransmitterInterface {
+        let transmitterInstance = new MessageTransmissionTransmitter(profile, this.adapterManager)
         this.tranmissionRef.push(transmitterInstance)
         return transmitterInstance
     }
 
-    public getReceiver(transmitterid: string): MessageReceiverInterface {
-        let receiverInstance = new MessageTransmissionReceiver(transmitterid, this.adapterManager)
+    public getReceiver(profile: TransmissionProfile): MessageReceiverInterface {
+        let receiverInstance = new MessageTransmissionReceiver(profile, this.adapterManager)
         this.tranmissionRef.push(receiverInstance)
         return receiverInstance
     }
 
-    public getRequestResponse(clientId: string, transmitterInstance: MessageTransmitterInterface, receiverInstance: MessageReceiverInterface): MessageTransmissionRequestResponse {
-        return new MessageTransmissionRequestResponse(clientId, transmitterInstance, receiverInstance)
+    public getRequestResponse(profile: TransmissionProfile, transmitterInstance: MessageTransmitterInterface, receiverInstance: MessageReceiverInterface): MessageTransmissionRequestResponse {
+        return new MessageTransmissionRequestResponse(profile, transmitterInstance, receiverInstance)
     }
 
 }

+ 19 - 9
src/transmission/msg.transmission.receiver.ts

@@ -4,7 +4,7 @@ import { ReceiverAdapter } from '../adapters/adapter.receiver';
 import { checkMessage, WrappedMessage } from '../utils/message.ordering';
 import ConsoleLogger from '../utils/log.utils';
 import { MessageTransmissionBase } from '../base/msg.transmission.base';
-import { AdapterInterface, AdapterManagerInterface, ConnectionState, GeneralEvent, MessageReceiverInterface, ReceiverAdapterInterface, TransportMessage } from '../interface/interface';
+import { AdapterInterface, AdapterManagerInterface, ConnectionState, FisMessage, GeneralEvent, MessageReceiverInterface, ReceiverAdapterInterface, TransmissionProfile, TransportMessage } from '../interface/interface';
 import { error } from 'console';
 
 export class MessageTransmissionReceiver extends MessageTransmissionBase implements MessageReceiverInterface {
@@ -15,16 +15,16 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
     private incomingMessage: Subject<GeneralEvent<TransportMessage>> = new Subject()
     // private toBePassedOver: Subject<WrappedMessage> = new Subject()
 
-    constructor(clientId: string, adapterManager: AdapterManagerInterface) {
+    constructor(profile: TransmissionProfile, adapterManager: AdapterManagerInterface) {
         super()
-        this.clientId = clientId
-
+        this.profile = profile
+        this.console.log({ message: `Constructing Receiver Transmission for Receiving target: ${this.profile.target}` })
         this.initializeReceiverComponents(adapterManager)
     }
 
     public getReceivables(): Observable<GeneralEvent<TransportMessage>> {
         return new Observable((receivable: Observer<GeneralEvent<TransportMessage>>) => {
-            this.console.log({ message: `Transmission getting message bus for ${this.clientId}` })
+            this.console.log({ message: `Transmission streaming messages from ${this.profile.target}` })
             const subscription: Subscription = this.incomingMessage.pipe(
                 filter((event: GeneralEvent<any>) => event.event == 'New Message'),
             ).subscribe((event: GeneralEvent<TransportMessage>) => {
@@ -50,14 +50,24 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
 
     /* Assigned and update adapters record. Currently no logic to swtich adapters based on performance or whatever logic to be integrated in the future */
     private initializeReceiverComponents(adapterManager: AdapterManagerInterface): void {
-        adapterManager.subscribeForAdapters(this.clientId, `Receiver`).pipe(
+        adapterManager.subscribeForAdapters(this.profile.target, `Receiver`).pipe(
         ).subscribe((adapter: AdapterInterface) => {
-            this.console.log({ message: `Adding new ${adapter.transport} receiving adapter. Current adapter length: ${this.adapters.length}` })
             this.adapters.push(adapter)
+            this.console.log({ message: `Adding new ${adapter.getAdapterProfile(`transportType`)} receiving adapter. Current adapter length: ${this.adapters.length}` })
             if (!this.currentAdapter) {
-                this.console.log({ message: `Setting this ${adapter.transport} as current adapter.` })
+                this.console.log({ message: `Setting this ${adapter.getAdapterProfile(`role`)} as current adapter.` })
                 this.currentAdapter = adapter as ReceiverAdapterInterface
-                this.currentAdapter.connectionState.subscribe(this.connectionStateEvent)
+                this.currentAdapter.subscribeForIncoming().subscribe({
+                    next: (message: GeneralEvent<TransportMessage>) => {
+                        this.console.log({ message: `Received ${(((message.data as TransportMessage).payload as WrappedMessage).payload as FisMessage).header.messageID} from ${((message.data as TransportMessage).target)}`, details: message })
+                        this.incomingMessage.next(message)
+                    },
+                    error: error => {
+                        // Error handling. Idealling switching to other adapters
+                    }
+                })
+                let connectionState: Observable<ConnectionState> = this.currentAdapter.getAdapterProfile(`connectionState`) as Observable<ConnectionState>
+                connectionState.subscribe(this.connectionStateEvent)
             } else {
                 this.currentAdapter.subscribeForIncoming().subscribe({
                     next: (message: GeneralEvent<TransportMessage>) => this.incomingMessage.next(message),

+ 3 - 3
src/transmission/msg.transmission.request-response.ts

@@ -3,16 +3,16 @@ import { filter, map, Observable, Observer, Subject, Subscription, takeWhile } f
 import { v4 as uuidv4 } from 'uuid'
 import { MessageTransmissionReceiver } from "./msg.transmission.receiver";
 import { MessageTransmissionTransmitter } from "./msg.transmission.transmitter";
-import { AdapterInterface, FisMessage, GeneralEvent, MessageReceiverInterface, MessageRequestResponseInterface, MessageTransmitterInterface, TransportMessage } from "../interface/interface";
+import { AdapterInterface, FisMessage, GeneralEvent, MessageReceiverInterface, MessageRequestResponseInterface, MessageTransmitterInterface, TransmissionProfile, TransportMessage } from "../interface/interface";
 import { WrappedMessage } from "../utils/message.ordering";
 
 export class MessageTransmissionRequestResponse extends MessageTransmissionBase implements MessageRequestResponseInterface {
     transmitterInstance!: MessageTransmitterInterface;
     receiverInstance!: MessageReceiverInterface;
 
-    constructor(clientId: string, transmitterInstance: MessageTransmitterInterface, receiverInstance: MessageReceiverInterface) {
+    constructor(profile: TransmissionProfile, transmitterInstance: MessageTransmitterInterface, receiverInstance: MessageReceiverInterface) {
         super()
-        this.clientId = clientId
+        this.profile = profile
         this.transmitterInstance = transmitterInstance
         this.receiverInstance = receiverInstance
     }

+ 10 - 9
src/transmission/msg.transmission.transmitter.ts

@@ -4,7 +4,7 @@ import { BehaviorSubject, distinctUntilChanged, filter, map, Observable, Subject
 import { RetransmissionService } from "../utils/retransmission.service";
 import { WrappedMessage } from "../utils/message.ordering";
 import ConsoleLogger from "../utils/log.utils";
-import { AdapterInterface, AdapterManagerInterface, ConnectionState, FisMessage, GeneralEvent, MessageTransmitterInterface, TransmitterAdapterInterface, TransportMessage } from "../interface/interface";
+import { AdapterInterface, AdapterManagerInterface, ConnectionState, FisMessage, GeneralEvent, MessageTransmitterInterface, TransmissionProfile, TransmitterAdapterInterface, TransportMessage } from "../interface/interface";
 import { error } from "console";
 import { TransmitterAdapter } from "../adapters/adapter.transmitter";
 
@@ -17,10 +17,10 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
     private buffer!: RetransmissionService;
     private currentAdapter!: TransmitterAdapterInterface
 
-    constructor(clientId: string, adapterManager: AdapterManagerInterface) {
+    constructor(profile: TransmissionProfile, adapterManager: AdapterManagerInterface) {
         super()
-        this.console.log({ message: `Constructing Transmitter Transmission with ${clientId}` })
-        this.clientId = clientId
+        this.profile = profile
+        this.console.log({ message: `Constructing Transmitter Transmission for Receiving target: ${this.profile.target}` })
         this.messageToBeBuffered = new Subject()
         this.buffer = new RetransmissionService()
         this.initializeTransmitterComponents(adapterManager)
@@ -36,13 +36,14 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
     private initializeTransmitterComponents(adapterManager: AdapterManagerInterface): void {
         this.console.log({ message: `Setting up Retransmission Service...` })
         // Listen and update adapters
-        adapterManager.subscribeForAdapters(this.clientId, `Transmitter`).subscribe((adapter: AdapterInterface) => {
-            this.console.log({ message: `Adding new ${adapter.getAdapterProfile(`transportType`)} transmitting adapter. Current adapter length: ${this.adapters.length}` })
+        adapterManager.subscribeForAdapters(this.profile.target, `Transmitter`).subscribe((adapter: AdapterInterface) => {
             this.adapters.push(adapter)
+            this.console.log({ message: `Adding new ${adapter.getAdapterProfile(`transportType`)} transmitting adapter. Current adapter length: ${this.adapters.length}` })
             if (!this.currentAdapter) {
                 this.console.log({ message: `Setting this ${adapter.getAdapterProfile(`transportType`)} as current adapter.` })
                 this.currentAdapter = adapter as TransmitterAdapterInterface
-                this.currentAdapter.connectionState.subscribe(this.connectionStateEvent)
+                let connectionState: Observable<ConnectionState> = this.currentAdapter.getAdapterProfile('connectionState') as Observable<ConnectionState>
+                connectionState.subscribe(this.connectionStateEvent)
             }
         })
 
@@ -52,7 +53,7 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
             // need to work with wrapped messages
             this.console.log({ message: `Transmitting ${bufferedMessage.thisMessageID}` });
             if (this.currentAdapter) {
-                this.currentAdapter.emit(bufferedMessage)
+                this.currentAdapter.emit(this.profile.source, bufferedMessage)
             } else {
                 // just flush back the message inside the buffer, if the adapter is not ready or assigned.
                 this.messageToBeBuffered.next(bufferedMessage)
@@ -65,7 +66,7 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
     private uniqueHandlerToFlushUnsentMessages(event: Observable<GeneralEvent<any>>): void {
         event.pipe(
             filter(event => event.event == 'Re-Flush'),
-            filter(event => event.data.clientId == this.clientId),
+            filter(event => event.data.clientId == this.profile.target),
         ).subscribe((event: GeneralEvent<any>) => {
             this.console.log({ message: `${this.connectionStateEvent.getValue() == 'ONLINE' ? `Transmitting ${((event.data.payload as TransportMessage).payload as WrappedMessage).thisMessageID}` : `Buffering ${((event.data.payload as TransportMessage).payload as WrappedMessage).thisMessageID}`}` })
             this.messageToBeBuffered.next(((event.data.payload as TransportMessage).payload as WrappedMessage))

+ 1 - 1
src/utils/socket.utils.ts

@@ -81,7 +81,7 @@ export function handleClientSocketConnection(transportServiceId: string, socket:
             // Listen for messages from the server. Generally here's the responses
             socket.on('message', (msg: TransportMessage) => {
                 if (clientProfileInfo) {
-                    console.log({ message: `Websocket Client Transport Receive Msg` })
+                    console.log({ message: `Websocket Client Transport Receive Msg from ${msg.source}` })
                     // publish to event
                     eventNotification.next({
                         id: uuidv4(),

+ 2 - 2
src/utils/transport.utils.ts

@@ -1,5 +1,5 @@
 import { Subject } from "rxjs";
-import { GeneralEvent, Transport, TransportServiceInterface, TransportSet } from "../interface/interface";
+import { GeneralEvent, TransportType, TransportServiceInterface, TransportSet } from "../interface/interface";
 import { WebsocketTransportService } from "../transport/websocket";
 import { HttpTransportService } from "../transport/http";
 import ConsoleLogger from "./log.utils";
@@ -31,7 +31,7 @@ export function setUpTransportService(transportSet: TransportSet, event: Subject
     }
 }
 
-function instantiateTransportService(transportType: Transport, event: Subject<GeneralEvent<any>>): TransportServiceInterface {
+function instantiateTransportService(transportType: TransportType, event: Subject<GeneralEvent<any>>): TransportServiceInterface {
     if (transportType === 'Websocket') {
         return new WebsocketTransportService(event)
     }