Browse Source

name changes and code cleaning

Dr-Swopt 1 tháng trước cách đây
mục cha
commit
533b8af6e2

+ 34 - 19
doc/explanation.txt

@@ -1,20 +1,25 @@
-Discussion Points: (Do these first)
-i) Explore multiple traffic concept
--To make use of different ports as well as multi ISP to stream data. Eg: External Netword card.
--See how I can talk to underlying OS network configuration to make use of the network stream and delegate ports to it
-ii) Move transport service instantiation to adapterManager
-ExtraNotes: In some cases, servers can only be transmitting. Although this program allows for dual roles if there's a need for me.
-
-For 2/12/2024 Monday (Updated!!!!)
-URGENT: Fix the default socket transmission first. SOmething wrong with retransmission for both side when either side reconnected. Play with proxy for now.
-i) Do include in the list of discussion for the message ordering mechanism
-- Currently the way it is working is not favorable. In the mean time, do think up suggestion to improve the ordering.
-- Just to acquire boss's input. Doesn't have to challenge him or anything
-ii) Http Service Prep
--Need to emulate logical channel somehow.
-iii) TCP Service Prep
--Try this out, so that I can proceed with multi port and metric performance
--Adapter Manager need to be intelligent enough to instantiate necessary components. So there will be a need to also check if a port is busy or in used.
+Concept as of 5/12/2024:
+So, when an application run, it will instantiate the transmission manager that will in turn instantiate adapter manager. The adapter manager will turn first check the 
+configuration file, whether a config json file or .env depending on the environment, and set up the necessary transport services in order to be able to create the 
+necessary adapter instance, but not right away. It will have the information, but not create them right away, unless there are already clients connected. The adapter
+manager would also cater for the situation that it would also connect with existing transport that are active. Of course, the details for such mechanism are yet to be
+discussed further. Until then, current context will only assume that this adapter will instantiate the necessary components. Now the scenario is such way that when 
+there a client receiver that wishes to connect, it will first perform then necessary handshake protocol, in this socket case, which is to specify  whether or not
+the client is a new clietn or a previously connected client. Then the tranpsort services that was instantiated earlier on by the adapter will first assign the necessary
+details to the connected clients so that the information of the client is first established and kept in record so to speak. Same things goes for previius client, Until
+the transport service can establish their respective identiites, no message transmission will be carried out, and in the case of old clients, the transmitted messages
+will still be in the buffer. So, once the identities of the clients receiver has been established, the transport service will then in turn publish a transport event,
+and then transmission manager who had been subscribed to that event bus will pick up the signal so to speak, and instantiate the necessary transmission components.
+So what happens here also, is that during that procress, it will also liase with adapter manager to instantiate and return available adapters, so that that it can
+included in part of the transmission set. So once, the "package" of transmsision set is ready, which by now consists of a set of transmission and it's respective 
+adaptesr, along side with the buffer mechanism in place, the application or the producer or client if you would can start using these transmitter and receiver 
+transmission components to start sending and receiving messages without having to worry about the underlying transport system. 
+Things to note, the logic to decide which adapter to use and how many adapters will be placed in transmitter tramsmission itself. Naturally, there would be 
+presumably eitehr a performnace metrics measurement of it's kind, whether self-written or an existing library to be consulted so to speak, before deciding 
+on the usage of the available adapters. The transmission components will also have the ability to take and throw away their adapters, depending on the signal
+given by transmission manager. Essentially, transmission manager will keep itself up to date via the aforementioned event bus, and notify and return additional
+adapters if there are any. Adapter manager will basically keep an out or instantiate more circumstantially, and will update transmission manager via the event bus.
+
 
 
 Things to do:
@@ -29,7 +34,7 @@ i) R&D for multi channel data traversal.
 ii) Functional Http Service options to be made available.
 -Default transport will be geared towards socket at the moment.
 iii) Code Adjustments and Cleaning
--Make sure the file structure and folders are in orderi
+-Make sure the file structure and folders are in order
 -Necessary comments
 iv) Documentation
 -A special Readme file to help understand the usage and what it does.
@@ -45,4 +50,14 @@ i) A global logging service that can be configured and toggled <DONE>
 Things to be discuss later: (as of 2/12/2024)
 i) If a service crashes and comes back up, does client(UI sender) still send requests?
 -Since the service loses all it's memory and have no clue who were connected and what it was doing unless there's a state machine.
--Current paradigm only solves for internet disruption, it doesn't assume either of the sender and receiver crashes.
+-Current paradigm only solves for internet disruption, it doesn't assume either of the sender and receiver crashes.
+
+Discussion points as of 5th December:
+i) Transmission Components can be using multiple adapters for transmission operatores. There would be utilities to measure performances as such
+to be included in the near future.
+ii) Actor system: May or may not expose methods circumstantially. So it all depends. But utlize interfaces for services and have them become functional so thatit
+it can be reusedd again
+iii) Learn how to use uml to better represents my ideas and absracting a concept so to speak.
+Dont' forget about depedencies injection
+
+

+ 3 - 16
src/connector/adapter.base.ts

@@ -8,9 +8,7 @@ So how?: */
 export class Adapter implements AdaptorBase {
     event!: Subject<TransportEvent>
     connector!: TransportService;
-    connectorProfile!: AdapterProfile;
-    connectionStateBus!: Subject<ConnectionState>;
-    adaptorTransmissionRole!: AdaptorTransmissionRole;
+    adapterProfile!: AdapterProfile;
 
     constructor() {
         //logic here
@@ -18,20 +16,9 @@ export class Adapter implements AdaptorBase {
 
     getInfo(): AdapterProfile {
         // throw new Error("Method not implemented.");
-        return this.connectorProfile
-    }
-    subscribeConnectionState(): Observable<ConnectionState> {
-        throw new Error("Method not implemented.");
-    }
-    publishConnectionState(): void {
-        throw new Error("Method not implemented.");
-    }
-    connect(): void {
-        throw new Error("Method not implemented.");
-    }
-    disconnect(): void {
-        throw new Error("Method not implemented.");
+        return this.adapterProfile
     }
+   
     setAdapterProfile(id: string, transportType: Transport): void {
         throw new Error("Method not implemented.");
     }

+ 7 - 10
src/connector/adapter.manager.ts

@@ -13,7 +13,7 @@ import { AdapterSet } from "../interface/general.interface"
 export class AdapterManager implements AdapterManagerInterface {
     private console: ConsoleLogger = new ConsoleLogger(`AdapterManager`, ['managers'])
     private transportServiceArray: TransportService[] = []
-    private transportSet: Set<TransportSet> = new Set()
+    private transportSet: TransportSet[] = []
     private adapterSet: AdapterSet[] = []
     private event!: Subject<TransportEvent>
 
@@ -21,13 +21,15 @@ export class AdapterManager implements AdapterManagerInterface {
         this.event = event
         this.console.log({ message: `Contructing self...` })
 
-        this.sort(this.transportSet)
+        this.sortTransportFromEnv(this.transportSet)
         this.transportSet.forEach(set => {
             this.setUpTransportService(set, event, browserEnv)
         })
     }
 
-    getAdapter(clientId: string): AdapterSet | null {
+    /* This one change to subscribe since I want it to return an array of adapters instead of just a set of them, becauise there could
+    mulitple adapters that the manager will have to work with */
+    public getAdapter(clientId: string): AdapterSet | null {
         this.console.log({ message: `Instantiating an adapter set....` })
         let transportType: Transport = process.env.Transport as unknown as Transport // as default  for now
         let adapterId: string = clientId
@@ -55,11 +57,6 @@ export class AdapterManager implements AdapterManagerInterface {
         }
     }
 
-    public getTransportArray(): TransportService[] {
-        return this.transportServiceArray
-    }
-
-
     // Server to be set up as well as acquiring client information if needed. Like in the case for grpc and socket. Http not requ`ired.
     private setUpTransportService(transportSet: TransportSet, event: Subject<TransportEvent>, isClient?: boolean): void {
         this.instantiateTransportService(transportSet.transport, event).then((transportService: TransportService) => {
@@ -100,11 +97,11 @@ export class AdapterManager implements AdapterManagerInterface {
         })
     }
 
-    private sort(transportSet: Set<TransportSet>): void {
+    private sortTransportFromEnv(transportSet: TransportSet[]): void {
         let transportList: string[] = process.env.Transport?.split(',') || []
         let portList: number[] = (process.env.PORT?.split(',') || []).map(port => Number(port));
         transportList.forEach((transport, index) => {
-            transportSet.add({ transport: transport, port: portList[index] } as unknown as TransportSet)
+            transportSet.push({ transport: transport, port: portList[index] } as unknown as TransportSet)
         })
         this.console.log({ message: 'TransportSetList', details: this.transportSet })
     }

+ 4 - 4
src/connector/adapter.receiver.ts

@@ -18,24 +18,24 @@ export class ReceiverAdapter extends Adapter implements ReceiverAdapterInterface
         this.console = new ConsoleLogger(`${transportType}ReceiverAdapter`, ['adapter'])
         this.connector = transportService
         this.setAdapterProfile(adapterId, transportType)
-        this.console.log({ message: `Just testing to see if receiverAdapter is instantiated properly ${this.connectorProfile, this.connector ? 'TransportService Instantiated' : 'Trnasport Service not instantiated'}` })
+        this.console.log({ message: `Just testing to see if receiverAdapter is instantiated properly ${this.adapterProfile, this.connector ? 'TransportService Instantiated' : 'Trnasport Service not instantiated'}` })
     }
 
     setAdapterProfile(id: string, transportType: Transport): void {
-        this.connectorProfile = {
+        this.adapterProfile = {
             id: id,
             transportType: transportType
         }
     }
 
     getMessageBus(bus: Bus): Observable<TransportEvent> {
-        this.console.log({ message: `Connector getting message bus for this connector: ${this.connectorProfile.id}` })
+        this.console.log({ message: `Connector getting message bus for this connector: ${this.adapterProfile.id}` })
         return new Observable((observable: Observer<TransportEvent>) => {
             if (bus == Bus.GeneralBus) {
                 const subscription: Subscription = this.connector.subscribe().pipe(
                     filter((message: TransportEvent) => message.event === 'New Message'),
                     // take message only specific for this adapter. Although that itself wouldn't be necessary, considerng everything goes through transportEvent. I guess it's for better management
-                    filter((message: TransportEvent) => (message.data as TransportMessage).target == this.connectorProfile.id),
+                    filter((message: TransportEvent) => (message.data as TransportMessage).target == this.adapterProfile.id),
                 ).subscribe((message: TransportEvent) => {
                     this.console.log({ message: `Received ${(((message.data as TransportMessage).payload as WrappedMessage).payload as FisMessage).header.messageID} from ${((message.data as TransportMessage).target)}`, details: message })
                     observable.next(message);

+ 2 - 12
src/connector/adapter.request.response.ts

@@ -1,5 +1,5 @@
  import dotenv from 'dotenv';
-import { Bus, FisMessage, TransmissionMessage } from "../interface/transport.interface";
+import { Bus, FisMessage } from "../interface/transport.interface";
 import { RequestResponseAdapter as RequestResponseAdapterInterface, TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
 import { filter, map, Observable, Observer, Subscription, takeWhile } from 'rxjs';
 import { WrappedMessage } from '../utils/message.ordering';
@@ -29,7 +29,7 @@ export class RequestResponseAdapter extends Adapter implements RequestResponseAd
             const subscription: Subscription = this.receiverAdapter.getMessageBus(Bus.GeneralBus).pipe(
                 filter((message: TransportEvent) => message.event === 'New Message'),
                 // take message only specific for this adapter. Although that itself wouldn't be necessary, considerng everything goes through transportEvent. I guess it's for better management
-                filter((message: TransportEvent) => (message.data as TransportMessage).target == this.connectorProfile.id),
+                filter((message: TransportEvent) => (message.data as TransportMessage).target == this.adapterProfile.id),
                 takeWhile((message: TransportEvent) => {
                     const shouldTake = ((message.data as TransportMessage).payload as FisMessage).data !== 'Complete';
                     if (!shouldTake) {
@@ -49,15 +49,5 @@ export class RequestResponseAdapter extends Adapter implements RequestResponseAd
         })
     }
 
-
-    /* Extended from both transmitter and receiver */
-    getMessageBus(bus: Bus): Observable<TransportEvent> {
-        throw new Error('Method not implemented.');
-    }
-
-    emit(message: WrappedMessage): void {
-        throw new Error('Method not implemented.');
-    }
-
 }
 

+ 5 - 6
src/connector/adapter.transmitter.ts

@@ -11,7 +11,6 @@ dotenv.config();
 So how?: */
 export class TransmitterAdapter extends Adapter implements TransmitterAdapterInterface {
     private console!: ConsoleLogger
-    connectionStateBus: Subject<ConnectionState> = new Subject()
 
     constructor(adapterId: string, adapterType: Transport, transportService: TransportService) {
         super()
@@ -23,17 +22,17 @@ export class TransmitterAdapter extends Adapter implements TransmitterAdapterInt
 
     emit(message: WrappedMessage): void {
         // logic here
-        this.console.log({ message: `Emitting: ${message.thisMessageID} to ${this.connectorProfile.id}` })
+        this.console.log({ message: `Emitting: ${message.thisMessageID} to ${this.adapterProfile.id}` })
         this.connector.emit({
-            id: this.connectorProfile.id,
-            transport: this.connectorProfile.transportType,
-            target: this.connectorProfile.id, // this should be directed to the channel/client established. Make sure this is right
+            id: this.adapterProfile.id,
+            transport: this.adapterProfile.transportType,
+            target: this.adapterProfile.id, // this should be directed to the channel/client established. Make sure this is right
             payload: message
         } as TransportMessage)
     }
 
     setAdapterProfile(id: string, adapterType: Transport): void {
-        this.connectorProfile = {
+        this.adapterProfile = {
             id: id,
             transportType: adapterType
         }

+ 2 - 8
src/interface/connector.interface.ts

@@ -22,15 +22,9 @@ export interface AdapterManager {
 
 export interface AdaptorBase {
     connector: TransportService // this one will refer to the actual tranpsort service like websocket and so on
-    connectorProfile: AdapterProfile
-    connectionStateBus: Subject<ConnectionState>
-    adaptorTransmissionRole: AdaptorTransmissionRole
+    adapterProfile: AdapterProfile
     event: Subject<TransportEvent>
 
-    subscribeConnectionState(): Observable<ConnectionState>
-    publishConnectionState(): void
-    connect(): void
-    disconnect(): void
     getInfo(): AdapterProfile
     setAdapterProfile(id: string, transportType: Transport): void
 }
@@ -44,7 +38,7 @@ export interface ReceiverAdapter extends AdaptorBase {
     getMessageBus(bus: Bus): Observable<any>
 }
 
-export interface RequestResponseAdapter extends TransmitterAdapter, ReceiverAdapter {
+export interface RequestResponseAdapter extends AdaptorBase {
     send(message: WrappedMessage): Observable<FisMessage>
 }
 

+ 4 - 9
src/interface/transport.interface.ts

@@ -5,11 +5,11 @@ import { MessageTransmissionReceiver } from "../transmission/msg.transmission.re
 import { RetransmissionService } from "../utils/retransmission.service";
 import { Adapter } from "../connector/adapter.base";
 export interface MessageTransmissionManager {
-    subscribe(): Observable<MessageTransmission>
+    subscribe(): Observable<MessageTransmissionSet>
     getEvent(): Observable<TransportEvent>
 }
 
-export interface MessageTransmission {
+export interface MessageTransmissionSet {
     id: string,
     transmitter: MessageTransmissionTransmitter,
     receiver: MessageTransmissionReceiver,
@@ -18,7 +18,6 @@ export interface MessageTransmission {
 }
 
 export interface MessageTransmissionBase {
-    msgRepositoryService: any // like logging service and what not
     transmissionRole: AdaptorTransmissionRole
     mainAdapter: Adapter
 
@@ -30,14 +29,14 @@ export interface MessageReceiver extends MessageTransmissionBase {
     receiverProfile: ReceiverProfile
 
     getMessageBus(bus: Bus): Observable<any>
-    setReceiver(receiverProfile: ReceiverProfile, role: AdaptorTransmissionRole, event: Subject<TransportEvent>): void
+    setReceiver(receiverProfile: ReceiverProfile, event: Observable<TransportEvent>): void
 }
 
 export interface MessageTransmitter extends MessageTransmissionBase {
     transmitterProfile: TransmitterProfile
     retransmission: RetransmissionService
 
-    setTransmitter(transmitterProfile: TransmitterProfile, role: AdaptorTransmissionRole, event: Subject<TransportEvent>): void
+    setTransmitter(transmitterProfile: TransmitterProfile, event: Observable<TransportEvent>): void
 }
 
 export interface MessageRequestResponse extends MessageTransmissionBase {
@@ -72,10 +71,6 @@ export interface ReceiverProfile extends TransmissionProfile {
 export interface RequestResponseProfile extends TransmissionProfile {
 
 }
-export interface TransmissionMessage {
-    adapterId: string,
-    payload: FisMessage
-}
 
 export enum Bus {
     GeneralBus,

+ 5 - 5
src/test/receiver.ts

@@ -1,5 +1,5 @@
 import { filter, interval, map, Observable, Observer, Subject } from "rxjs";
-import { Bus, FisMessage, MessageTransmission } from "../interface/transport.interface";
+import { Bus, FisMessage, MessageTransmissionSet } from "../interface/transport.interface";
 import { v4 as uuidv4 } from 'uuid'
 import { MessageTransmissionManager } from "../transmission/msg.transmission.manager";
 import { TransportEvent, TransportMessage } from "../interface/connector.interface";
@@ -12,14 +12,14 @@ class Supervisor {
     private isClient: boolean = true
     private transmissionManager!: MessageTransmissionManager
     private event: Observable<TransportEvent>
-    private transmissionSets: MessageTransmission[] = []
+    private transmissionSets: MessageTransmissionSet[] = []
     private outgoingPipe: Subject<any> = new Subject()
 
     constructor() {
         this.transmissionManager = new MessageTransmissionManager(this.isClient)
         this.event = this.transmissionManager.getEvent()
 
-        this.transmissionManager.subscribe().subscribe((transmissionSet: MessageTransmission) => {
+        this.transmissionManager.subscribe().subscribe((transmissionSet: MessageTransmissionSet) => {
             this.console.log({ message: `Acquired transmission set for client` })
             this.transmissionSets.push(transmissionSet)
 
@@ -31,7 +31,7 @@ class Supervisor {
     }
 
     // only called once for each connected clients.
-    private handleActivity(messageTransmission: MessageTransmission): void {
+    private handleActivity(messageTransmission: MessageTransmissionSet): void {
         // start listening to incoming messages from this client
         messageTransmission.receiver.getMessageBus(Bus.GeneralBus).subscribe((event: TransportEvent) => {
             this.console.log({ message: `General Bus ${event.event}`, details: event })
@@ -54,7 +54,7 @@ class Supervisor {
         this.startGeneratingRequest(1000, this.outgoingPipe)
     }
 
-    private request(request: FisMessage, messageTransmission: MessageTransmission): Observable<any> {
+    private request(request: FisMessage, messageTransmission: MessageTransmissionSet): Observable<any> {
         return new Observable((response: Observer<any>) => {
             messageTransmission.transmitter.emit(request)
             this.generalBus.pipe(

+ 4 - 4
src/test/transmitter.ts

@@ -1,5 +1,5 @@
 import { filter, interval, map, Observable, Observer, Subject, take } from "rxjs";
-import { Bus, EventMessage, FisMessage, MessageTransmission } from "../interface/transport.interface";
+import { Bus, EventMessage, FisMessage, MessageTransmissionSet } from "../interface/transport.interface";
 import { v4 as uuidv4 } from 'uuid'
 import { MessageTransmissionManager } from "../transmission/msg.transmission.manager";
 import { TransportEvent, TransportMessage } from "../interface/connector.interface";
@@ -11,7 +11,7 @@ class Supervisor {
     private messageProducer!: MessageProducer
     private transmissionManager!: MessageTransmissionManager
     private event!: Observable<TransportEvent>
-    private transmissionSets: MessageTransmission[] = []
+    private transmissionSets: MessageTransmissionSet[] = []
 
     constructor() {
         // so need them adapters now. But supervisor shouldn't be concerned, only messageTransmissionManager and ConnectionManager
@@ -19,7 +19,7 @@ class Supervisor {
         this.transmissionManager = new MessageTransmissionManager()
         this.event = this.transmissionManager.getEvent()
 
-        this.transmissionManager.subscribe().subscribe((transmissionSet: MessageTransmission) => {
+        this.transmissionManager.subscribe().subscribe((transmissionSet: MessageTransmissionSet) => {
             this.transmissionSets.push(transmissionSet)
 
             this.handleClientActivity(transmissionSet)
@@ -27,7 +27,7 @@ class Supervisor {
     }
 
     // only called once for each connected clients.
-    private handleClientActivity(messageTransmission: MessageTransmission): void {
+    private handleClientActivity(messageTransmission: MessageTransmissionSet): void {
         // start listening to incoming messages from this client
         messageTransmission.receiver.getMessageBus(Bus.GeneralBus).subscribe((event: TransportEvent) => {
             this.console.log({ message: `General Bus`, details: event }) // receiving end

+ 0 - 2
src/transmission/msg.transmission.base.ts

@@ -7,10 +7,8 @@ import { Adapter } from '../connector/adapter.base';
 
 export class MessageTransmissionBase implements MessageTransmissionBaseInterface {
     event!: Observable<TransportEvent>
-    msgRepositoryService: any;
     transmissionRole!: AdaptorTransmissionRole;
     adaptorsArray: Array<Adapter> = []
-    transmissionService: any;
     mainAdapter!: Adapter;
 
     constructor() {

+ 25 - 27
src/transmission/msg.transmission.manager.ts

@@ -1,7 +1,7 @@
 import { MessageTransmissionTransmitter } from "./msg.transmission.transmitter";
 import { MessageTransmissionReceiver } from "./msg.transmission.receiver";
 import { AdapterManager } from "../connector/adapter.manager";
-import { EventMessage, MessageTransmission, MessageTransmissionManager as MessageTransmissionManagerInterface, ReceiverProfile, TransmitterProfile } from "../interface/transport.interface";
+import { EventMessage, MessageTransmissionSet, MessageTransmissionManager as MessageTransmissionManagerInterface, ReceiverProfile, TransmitterProfile } from "../interface/transport.interface";
 import { v4 as uuidv4 } from 'uuid'
 import { TransportEvent, Event } from "../interface/connector.interface";
 import { MessageTransmissionRequestResponse } from "./msg.transmission.request-response";
@@ -14,7 +14,7 @@ import { AdapterSet } from "../interface/general.interface";
 export class MessageTransmissionManager implements MessageTransmissionManagerInterface {
     private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionManager`, ['managers'])
     private browserEnv!: boolean
-    transmission: MessageTransmission[] = []
+    transmission: MessageTransmissionSet[] = []
     connectionManager!: AdapterManager
     event!: Subject<TransportEvent>
 
@@ -33,14 +33,14 @@ export class MessageTransmissionManager implements MessageTransmissionManagerInt
 
     /* so there will be some changes here. will nto be assigning just one, but all of them dynamically to pour into this boy
     Transmitter only have to call this once. */
-    subscribe(): Observable<MessageTransmission> {
-        return new Observable((observer: Observer<MessageTransmission>) => {
+    subscribe(): Observable<MessageTransmissionSet> {
+        return new Observable((observer: Observer<MessageTransmissionSet>) => {
             const targetEvent: Event = this.browserEnv ? 'New Server' : 'New Client';
             this.event.pipe(
                 filter(event => event.event == targetEvent)
             ).subscribe(event => {
                 // get all adapters for all the connection
-                let messageTransmissionSet: MessageTransmission | undefined = this.instantiateComponents((event.data as EventMessage).clientId)
+                let messageTransmissionSet: MessageTransmissionSet | undefined = this.instantiateComponents((event.data as EventMessage).clientId)
                 if (messageTransmissionSet) {
                     observer.next(messageTransmissionSet)
                 }
@@ -52,27 +52,25 @@ export class MessageTransmissionManager implements MessageTransmissionManagerInt
         return this.event.asObservable()
     }
 
-    private instantiateComponents(clientId: string): MessageTransmission | undefined {
+    private instantiateComponents(clientId: string): MessageTransmissionSet | undefined {
         this.console.log({ message: `Instantiating new transmission set for  ${this.browserEnv ? 'Server' : 'Client'}: ${clientId}` })
-        if (this.connectionManager.getTransportArray().length > 0) {
-            let adapterSet: AdapterSet | null = this.connectionManager.getAdapter(clientId)
-            if (adapterSet) {
-                let transmitter: MessageTransmissionTransmitter = this.getTransmitter(clientId, adapterSet.transmitterAdapter, this.event.asObservable())
-                let receiver: MessageTransmissionReceiver = this.getReceiver(clientId, adapterSet.receiverAdapter, this.event.asObservable())
-                let requestResponse: MessageTransmissionRequestResponse = this.getRequestResponse(transmitter, receiver, this.event.asObservable())
-                let transmission: MessageTransmission = {
-                    id: clientId,
-                    transmitter: transmitter,
-                    receiver: receiver,
-                    requestResponse: requestResponse,
-                    event: this.event.asObservable()
-                }
-                this.transmission.push(transmission)
-                return transmission
-            } else {
-                this.console.error({ message: 'No Adapter Set' })
-                return undefined
+        let adapterSet: AdapterSet | null = this.connectionManager.getAdapter(clientId)
+        if (adapterSet) {
+            let transmitter: MessageTransmissionTransmitter = this.getTransmitter(clientId, adapterSet.transmitterAdapter, this.event.asObservable())
+            let receiver: MessageTransmissionReceiver = this.getReceiver(clientId, adapterSet.receiverAdapter, this.event.asObservable())
+            let requestResponse: MessageTransmissionRequestResponse = this.getRequestResponse(transmitter, receiver, this.event.asObservable())
+            let transmission: MessageTransmissionSet = {
+                id: clientId,
+                transmitter: transmitter,
+                receiver: receiver,
+                requestResponse: requestResponse,
+                event: this.event.asObservable()
             }
+            this.transmission.push(transmission)
+            return transmission
+        } else {
+            this.console.error({ message: 'No Adapter Set' })
+            return undefined
         }
     }
 
@@ -110,16 +108,16 @@ export class MessageTransmissionManager implements MessageTransmissionManagerInt
 
     private reconnectionHandler(clientId: string): void {
         this.console.log({ message: `TransmissionManager: A reconnection occured. Client: ${clientId}` })
-        let transmissionObj: MessageTransmission | undefined = Array.from(this.transmission).find(obj => obj.id === clientId)
+        let transmissionObj: MessageTransmissionSet | undefined = Array.from(this.transmission).find(obj => obj.id === clientId)
         if (!transmissionObj) {
-            let transmissionSet: MessageTransmission | undefined = this.instantiateComponents(clientId)
+            let transmissionSet: MessageTransmissionSet | undefined = this.instantiateComponents(clientId)
             if (transmissionSet) {
                 this.transmission.push(transmissionSet)
             } else {
                 this.console.error({ message: `Cannot find client transmission obj : ${clientId}` })
             }
         } {
-            this.console.log({ message: `Transmission Object for ${clientId} Found`})
+            this.console.log({ message: `Transmission Object for ${clientId} Found` })
         }
     }
 }

+ 2 - 2
src/transmission/msg.transmission.transmitter.ts

@@ -26,13 +26,13 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
         this.retransmission = new RetransmissionService()
         this.setTransmitter(profile)
         this.setUpAdapter(adapter)
-        this.setUpRetransmission()
+        this.setupBuffer()
 
         // special case just for http in case of server/client disconnected, the unsent msg will be flushed back into messageToBeTransmitted
         this.uniqueHandlerToFlushUnsentMessages(event)
     }
 
-    setUpRetransmission(): void {
+    setupBuffer(): void {
         this.console.log({ message: `Setting up Retransmission Service...` })
         this.event.pipe(
             filter(event => (event.data as EventMessage).clientId == this.transmitterProfile.id),

+ 10 - 10
src/transport/websocket.ts

@@ -1,5 +1,5 @@
 import { Observable, Subject } from "rxjs";
-import { Socket as ClientSocket } from 'socket.io-client'
+import { Socket as SocketForConnectedServer } from 'socket.io-client'
 import { Socket as SocketForConnectedClient } from "socket.io"
 import { handleClientSocketConnection, handleNewSocketClient, startClientSocketConnection, startSocketServer } from "../utils/socket.utils";
 import { ClientObject, Transport, TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
@@ -11,8 +11,8 @@ import ConsoleLogger from "../utils/log.utils";
 export class WebsocketTransportService implements TransportService {
     private console: ConsoleLogger = new ConsoleLogger(`WebsocketTransportService`, ['transport'])
     private info: Transport = Transport.Websocket
-    private connectedServer: ConnectedServerSocket[] = [] // to allow the possibility of having to communicate with multiple servers as a client
-    private connectedClientSocket: ConnectedClientSocket[] = [] // to keep track of the all the clients that are connected
+    private connectedSocketServer: ConnectedSocketServer[] = [] // to allow the possibility of having to communicate with multiple servers as a client
+    private connectedClientSocket: ConnectedSocketClient[] = [] // to keep track of the all the clients that are connected
     // private incomingMessage: Subject<TransportMessage> = new Subject() // this is only for client roles only atm
     private transportEvent!: Subject<TransportEvent>
 
@@ -39,8 +39,8 @@ export class WebsocketTransportService implements TransportService {
 
     public startClient(url: string): void {
         // logic here
-        startClientSocketConnection(url).then((socket: ClientSocket) => {
-            handleClientSocketConnection(socket, this.connectedServer).subscribe(this.transportEvent)
+        startClientSocketConnection(url).then((socket: SocketForConnectedServer) => {
+            handleClientSocketConnection(socket, this.connectedSocketServer).subscribe(this.transportEvent)
         }).catch((error) => {
             this.console.log({ message: `Observer Error`, details: error })
         })
@@ -49,8 +49,8 @@ export class WebsocketTransportService implements TransportService {
 
     public emit(message: TransportMessage): void {
         this.console.log({ message: `Emitting: ${((message.payload as WrappedMessage).payload as FisMessage).header.messageID} to ${message.target}`, details: message })
-        let clientObj: ConnectedClientSocket | undefined = this.connectedClientSocket.find(obj => obj.id == message.target)
-        let serverObj: ConnectedServerSocket | undefined = this.connectedServer.find(obj => obj.id === message.target)
+        let clientObj: ConnectedSocketClient | undefined = this.connectedClientSocket.find(obj => obj.id == message.target)
+        let serverObj: ConnectedSocketServer | undefined = this.connectedSocketServer.find(obj => obj.id === message.target)
         // this.console.log({ message: `${serverObj?.connectionState.getValue(), serverObj?.id}` })
         // for server usage
         if (clientObj && clientObj.connectionState.getValue() == 'ONLINE') {
@@ -72,10 +72,10 @@ export class WebsocketTransportService implements TransportService {
     }
 }
 
-export interface ConnectedClientSocket extends ClientObject {
+export interface ConnectedSocketClient extends ClientObject {
     socketInstance: SocketForConnectedClient
 }
 
-export interface ConnectedServerSocket extends ClientObject {
-    socketInstance: ClientSocket
+export interface ConnectedSocketServer extends ClientObject {
+    socketInstance: SocketForConnectedServer
 }

+ 27 - 27
src/utils/socket.utils.ts

@@ -1,11 +1,11 @@
 import { BehaviorSubject, Observable, Observer, Subject } from 'rxjs';
 import { createServer } from 'http';
 import { Server, Socket as SocketForConnectedClient } from 'socket.io';
-import { io, Socket as ClientSocket } from 'socket.io-client';
+import { io, Socket as SocketForConnectedServer } from 'socket.io-client';
 import * as fs from 'fs'
 import { v4 as uuidv4 } from 'uuid'
 import { ConnectionState, Transport, TransportEvent, TransportMessage } from '../interface/connector.interface';
-import { ConnectedClientSocket, ConnectedServerSocket } from '../transport/websocket';
+import { ConnectedSocketClient, ConnectedSocketServer } from '../transport/websocket';
 import { EventMessage } from '../interface/transport.interface';
 import ConsoleLogger from './log.utils';
 const console: ConsoleLogger = new ConsoleLogger(`SocketUtils`, ['transport'])
@@ -39,11 +39,11 @@ export function startSocketServer(port: number): Observable<SocketForConnectedCl
     })
 }
 
-export async function startClientSocketConnection(serverUrl: string): Promise<ClientSocket> {
+export async function startClientSocketConnection(serverUrl: string): Promise<SocketForConnectedServer> {
     return new Promise((resolve, reject) => {
         try {
             // let clientSocket = io(serverUrl)
-            let clientSocket: ClientSocket = io(serverUrl, {
+            let clientSocket: SocketForConnectedServer = io(serverUrl, {
                 reconnection: true,              // Enable automatic reconnections
                 reconnectionAttempts: 1000,       // Retry up to 10 times
                 reconnectionDelay: 500,          // Start with a 500ms delay
@@ -59,10 +59,10 @@ export async function startClientSocketConnection(serverUrl: string): Promise<Cl
 }
 
 // After establishing connection to the server, set up the credentials, confirm whether or not if there's any credentials, if not ask for one from the server
-export function handleClientSocketConnection(socket: ClientSocket, serversConnected: ConnectedServerSocket[]): Observable<TransportEvent> {
+export function handleClientSocketConnection(socket: SocketForConnectedServer, serversConnected: ConnectedSocketServer[]): Observable<TransportEvent> {
     return new Observable((eventNotification: Observer<TransportEvent>) => {
         let buffer: any[] = []
-        let receiverProfileInfo!: ConnectedServerSocket
+        let receiverProfileInfo!: ConnectedSocketServer
 
         // Listen for a connection event
         socket.on('connect', () => {
@@ -120,14 +120,14 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
                     socketInstance: socket,
                     connectionState: new BehaviorSubject<ConnectionState>(`ONLINE`)
                 }
-                writeFile(data.message as ConnectedServerSocket, (data.message as ConnectedServerSocket).id).then(() => {
+                writeFile(data.message as ConnectedSocketServer, (data.message as ConnectedSocketServer).id).then(() => {
                     // broadcast event to allow transmission manager to instantiate transmission components
                     eventNotification.next({
                         id: uuidv4(),
                         event: `New Server`,
                         data: {
-                            clientId: (data.message as ConnectedServerSocket).id,
-                            message: `New Websocket Channel ${(data.message as ConnectedServerSocket).id} established.`
+                            clientId: (data.message as ConnectedSocketServer).id,
+                            message: `New Websocket Channel ${(data.message as ConnectedSocketServer).id} established.`
                         } as EventMessage
                     })
                     // broadcast event to allow retransmission to relase buffered messages
@@ -135,17 +135,17 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
                         id: uuidv4(),
                         event: `Server Connected`,
                         data: {
-                            clientId: (data.message as ConnectedServerSocket).id,
-                            message: `Server ${(data.message as ConnectedServerSocket).id} connected and ready to go.`
+                            clientId: (data.message as ConnectedSocketServer).id,
+                            message: `Server ${(data.message as ConnectedSocketServer).id} connected and ready to go.`
                         } as EventMessage
                     })
                 }).catch((error) => { }) // do nothing at the moment. 
                 serversConnected.push(receiverProfileInfo)
             }
             if (data.name == 'Adjusted Profile') {
-                console.log({ message: `Adjusted client Name: ${(data.message as ConnectedServerSocket).id}` })
+                console.log({ message: `Adjusted client Name: ${(data.message as ConnectedSocketServer).id}` })
                 // Update websocket instance record
-                let clientObj: ConnectedServerSocket | undefined = serversConnected.find(obj => obj.id === data.message.id)
+                let clientObj: ConnectedSocketServer | undefined = serversConnected.find(obj => obj.id === data.message.id)
                 if (clientObj) {
                     receiverProfileInfo.id = (data.message.id)
 
@@ -156,14 +156,14 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
                         message: `Just to make sure they are pointed accurately: This should be ONLINE: ${receiverProfileInfo.connectionState.getValue()} !! Id match? ${receiverProfileInfo.id == clientObj.id ? true : false}`,
                     })
                 }
-                writeFile(data.message as ConnectedServerSocket, (data.message as ConnectedServerSocket).id).then(() => {
+                writeFile(data.message as ConnectedSocketServer, (data.message as ConnectedSocketServer).id).then(() => {
                     // broadcast event to allow retransmission to release buffer
                     eventNotification.next({
                         id: uuidv4(),
                         event: 'Server Connected',
                         data: {
-                            clientId: (data.message as ConnectedServerSocket).id,
-                            message: `Existing Websocket Channel ${(data.message as ConnectedServerSocket).id} re-established.`
+                            clientId: (data.message as ConnectedSocketServer).id,
+                            message: `Existing Websocket Channel ${(data.message as ConnectedSocketServer).id} re-established.`
                         } as EventMessage
                     })
                 }).catch((error) => { }) // do nothing at the moment. 
@@ -199,14 +199,14 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
 }
 
 // For SERVER Usage: set up socket listeners to start listening for different events
-export function handleNewSocketClient(socket: SocketForConnectedClient, connectedClientSocket: ConnectedClientSocket[]): Observable<TransportEvent> {
+export function handleNewSocketClient(socket: SocketForConnectedClient, connectedClientSocket: ConnectedSocketClient[]): Observable<TransportEvent> {
     return new Observable((event: Observer<TransportEvent>) => {
         console.log({ message: `Setting up listeners for socket:${socket.id}` })
         // returns the socket client instance 
         // listen to receiver's initiotion first before assigning 'credentials'
         socket.on(`profile`, (message: { name: string, data: any }) => {
             if (message.name == 'New Client') {
-                let clientInstance: ConnectedClientSocket = {
+                let clientInstance: ConnectedSocketClient = {
                     id: uuidv4(), // client should only be assigned at this level. And is passed around for reference pointing
                     dateCreated: new Date(),
                     socketInstance: socket,
@@ -232,20 +232,20 @@ export function handleNewSocketClient(socket: SocketForConnectedClient, connecte
                 startListening(socket, clientInstance, event)
             } else {
                 // update first
-                let clientInstance: ConnectedClientSocket | undefined
+                let clientInstance: ConnectedSocketClient | undefined
                 if (connectedClientSocket.length > 0) {
                     clientInstance = connectedClientSocket.find(obj => obj.id === message.data.id)
                     handleFoundClient(clientInstance)
                 } else {
                     // for the case server itself got shit down or something
-                    checkIfClientExists(message.data.id).then((client: ConnectedClientSocket) => {
+                    checkIfClientExists(message.data.id).then((client: ConnectedSocketClient) => {
                         clientInstance = client
                         handleFoundClient(clientInstance)
                     }).catch(error => {
                         console.log({ message: `Promise Error`, details: error })
                     })
                 }
-                function handleFoundClient(clientInstance: ConnectedClientSocket | undefined) {
+                function handleFoundClient(clientInstance: ConnectedSocketClient | undefined) {
                     if (clientInstance) {
                         console.log({ message: `Socket Client ${clientInstance.id} Found` })
                         socket.emit('profile', { name: 'Adjusted Profile', message: { id: clientInstance.id } })
@@ -266,7 +266,7 @@ export function handleNewSocketClient(socket: SocketForConnectedClient, connecte
 
 
 // Specifically to write receiver profile information
-export async function writeFile(data: ConnectedServerSocket, filename: string): Promise<boolean> {
+export async function writeFile(data: ConnectedSocketServer, filename: string): Promise<boolean> {
     return new Promise((resolve, reject) => {
         // Write JSON data to a file
         fs.writeFile(`${filename}.json`, JSON.stringify(data, null, 2), (err) => {
@@ -282,9 +282,9 @@ export async function writeFile(data: ConnectedServerSocket, filename: string):
 }
 
 /* For Internal Usage only. Temporary serve as a way for server to keep track of clients. To be replaced in the future with better alternatives. */
-export function addClientToDB(entry: ConnectedClientSocket, filePath: string = 'clients.json'): void {
+export function addClientToDB(entry: ConnectedSocketClient, filePath: string = 'clients.json'): void {
     try {
-        let data: ConnectedClientSocket[] = [];
+        let data: ConnectedSocketClient[] = [];
 
         // Check if the file exists and load existing data
         if (fs.existsSync(filePath)) {
@@ -298,7 +298,7 @@ export function addClientToDB(entry: ConnectedClientSocket, filePath: string = '
             dateCreated: entry.dateCreated,
             connectionState: null,
             socketInstance: null
-        } as unknown as ConnectedClientSocket);
+        } as unknown as ConnectedSocketClient);
 
         // Write the updated array back to the file
         fs.writeFileSync(filePath, JSON.stringify(data, null, 2), 'utf-8');
@@ -308,7 +308,7 @@ export function addClientToDB(entry: ConnectedClientSocket, filePath: string = '
     }
 }
 
-export async function checkIfClientExists(id: string, filePath: string = 'clients.json'): Promise<ConnectedClientSocket> {
+export async function checkIfClientExists(id: string, filePath: string = 'clients.json'): Promise<ConnectedSocketClient> {
     return new Promise((resolve, reject) => {
         try {
             // Check if the file exists
@@ -368,7 +368,7 @@ export async function checkOwnClientInfo(filename?: string): Promise<{ id: strin
 }
 
 // this is for server usage only
-export function startListening(socket: SocketForConnectedClient, client: ConnectedClientSocket, eventListener: Observer<TransportEvent>, oldClient?: boolean): void {
+export function startListening(socket: SocketForConnectedClient, client: ConnectedSocketClient, eventListener: Observer<TransportEvent>, oldClient?: boolean): void {
     // notify it's associated retransmission to start releaseing buffer
     eventListener.next({
         id: uuidv4(),