Browse Source

to be enhanced further

enzo 1 week ago
parent
commit
e4f0018ca2

+ 2 - 2
.env

@@ -1,6 +1,6 @@
 ;Transport = "Websocket, Http"
 ;PORT = 3000, 3001
-# Transport = "Websocket"
-Transport = "Http"
+Transport = "Websocket"
+# Transport = "Http"
 PORT = 3000
 FolderPath = "E:/Task/Fis-MessageTransmission/clients/"

+ 3 - 2
package.json

@@ -10,10 +10,11 @@
     "start": "node dist/index.js",
     "proxy": "node dist/test/proxy.js",
     "transmitter": "node dist/test/transmitter.js",
+    "receiver": "node dist/test/receiver.js",
+    "actor": "node dist/test/actor.js",
     "dev": "node --inspect dist/test/transmitter.js",
     "pmtransmitter": "pm2 start node dist/test/transmitter.js",
-    "pmreceiver": "pm2 start node dist/test/receiver.js",
-    "receiver": "node dist/test/receiver.js"
+    "pmreceiver": "pm2 start node dist/test/receiver.js"
   },
   "author": "",
   "license": "ISC",

+ 36 - 0
src/actor/transmission.actor.ts

@@ -0,0 +1,36 @@
+import { filter, map, Observable, Observer, Subject, Subscription, Unsubscribable } from "rxjs"
+import { ActorBase } from "../base/actor.base"
+import { FisMessage } from "../interface/interface"
+import { ActorInterface, ActorMessage, ActorProfile, TransmisisonProfile } from "../interface/actor.interface"
+import { unsubscribe } from "diagnostics_channel"
+import ConsoleLogger from "../utils/log.utils"
+
+export class TransmissionActor extends ActorBase<ActorMessage<FisMessage>> {
+    private transmissionProfile!: TransmisisonProfile
+    private console: ConsoleLogger = new ConsoleLogger(`TransmissionActor`, [`base`])
+
+    constructor(actorParam: ActorProfile<TransmisisonProfile>) {
+        super()
+        this.setup(actorParam)
+        this.handleMessageTransmission(this.outgoingBus.asObservable())
+        this.handleMessageReception(this.incomingBus)
+    }
+
+    protected handleMessageTransmission(messageToBeTransmitted: Observable<ActorMessage<FisMessage>>): void {
+        messageToBeTransmitted.subscribe(message => {
+            this.transmissionProfile.transmitterService.emit(message.payload)
+        })
+    }
+
+    protected handleMessageReception(incomingBus: Subject<ActorMessage<FisMessage>>): void {
+        this.transmissionProfile.receiverService.getIncoming().pipe(
+            // filter()
+        )
+    }
+
+
+    protected setup(actorInfo: ActorProfile<TransmisisonProfile>): void {
+        this.actorProfile = actorInfo
+        this.transmissionProfile = actorInfo.data!
+    }
+}

+ 66 - 0
src/base/actor.base.ts

@@ -0,0 +1,66 @@
+import { filter, Observable, Observer, Subject, Unsubscribable } from "rxjs";
+import { ActorInterface, ActorMessage, ActorProfile } from "../interface/actor.interface";
+
+export class ActorBase<T> implements ActorInterface<T> {
+    protected actorProfile!: ActorProfile
+    protected incomingBus!: Subject<ActorMessage<T>>
+    protected outgoingBus!: Subject<ActorMessage<T>>
+    protected subscribers: ActorProfile[] = []
+
+    /* so if one party call this to their respective target, they are essentially exposing their incomingBus so that both can pour messages through that channel.
+     */
+    public subscribe(subscriberProfile: ActorProfile, observer: Observer<ActorMessage<T>>): Unsubscribable {
+        try {
+            // Listen and update subscribers info.
+            this.subscribers.push(subscriberProfile);
+
+            // Filter out all other irrelevant messages and subscribe.
+            const subscription = this.outgoingBus.pipe(
+                filter((message: ActorMessage<T>) => message.actorProfile.actorId === subscriberProfile.actorId)
+            ).subscribe({
+                next: (message: ActorMessage<T>) => {
+                    observer.next(message);
+                },
+                error: (err) => {
+                    observer.error(err);
+                },
+            })
+
+            // Return an Unsubscribable that cleans up both the subscription and subscriber info.
+            return {
+                unsubscribe: () => {
+                    subscription.unsubscribe();
+                    this.subscribers = this.subscribers.filter(
+                        (sub) => sub.actorId !== subscriberProfile.actorId
+                    );
+                }
+            };
+        } catch (error) {
+            observer.error(`Error during subscription: ${error}`);
+            return {
+                unsubscribe: () => {
+                    // No-op in case of error
+                }
+            };
+        }
+    }
+
+
+
+    protected setup(param: ActorProfile): void {
+        throw new Error(`Method not implemented...`)
+    }
+
+    protected handleIncoming(message: Subject<T>): void {
+        throw new Error(`Method not implemented...`)
+    }
+
+    protected handleOutgoing(message: Observable<T>): void {
+        throw new Error(`Method not implemented...`)
+    }
+
+    protected checkSubscription(actorProfile: ActorProfile): boolean {
+        return this.subscribers.some(actor => actor.actorId === actorProfile.actorId)
+    }
+
+}

+ 27 - 0
src/base/adapter.base.ts

@@ -0,0 +1,27 @@
+import { Observable, Subject } from "rxjs";
+import dotenv from 'dotenv';
+import { AdapterInterface, TransmissionRole, Transport, TransportService } from "../interface/interface";
+
+dotenv.config();
+/* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
+So how?: */
+export class AdapterBase implements AdapterInterface {
+    adapterID!: string
+    role!: TransmissionRole
+    transport!: Transport
+
+    protected transportService!: TransportService
+
+    constructor() {
+        //logic here
+    }
+
+    setAdapterProfile(id: string, adapterType: Transport, role: TransmissionRole): void {
+        this.adapterID = id
+        this.transport = adapterType
+        this.role = role
+    }
+
+}
+
+

+ 23 - 0
src/base/adapter.manager.base.ts

@@ -0,0 +1,23 @@
+import { Observer, Unsubscribable } from "rxjs"
+import { AdapterInterface, AdapterManagerInterface, TransportService, TransportSet } from "../interface/interface"
+import { ActorInterface, ActorProfile } from "../interface/actor.interface"
+import { ActorBase } from "../base/actor.base"
+
+export class AdapterManagerBase<T> extends ActorBase<T> implements AdapterManagerInterface<T> {
+    protected transportServiceArray: TransportService[] = []
+    protected transportSet: TransportSet[] = []
+    protected adapters: AdapterInterface[] = []
+
+    constructor() {
+        super()
+    }
+
+    public subscribe(actorProfile: ActorProfile, observer: Partial<Observer<T>>, subscribable?: ActorInterface<T>): Unsubscribable {
+        throw new Error("Method not implemented.");
+    }
+
+
+}
+
+
+

+ 0 - 0
src/transmission/msg.transmission.base.ts → src/base/msg.transmission.base.ts


+ 30 - 0
src/base/msg.transmission.manager.base.ts

@@ -0,0 +1,30 @@
+
+import { Observable, Observer, Subject, Unsubscribable } from 'rxjs';
+import { MessageTransmissionManagerInterface, Transmission } from '../interface/interface';
+import { ActorInterface, ActorProfile } from '../interface/actor.interface';
+import { ActorBase } from './actor.base';
+import { AdapterManager } from '../connector/adapter.manager';
+
+export class MessageTransmissionManagerBase<T> extends ActorBase<T> implements MessageTransmissionManagerInterface<T> {
+    protected browserEnv!: boolean
+    protected transmissionSet: Transmission[] = []
+    protected adapterManager!: AdapterManager
+
+    constructor() {
+        super()
+    }
+
+    public subscribe(actorProfile: ActorProfile, observer: Partial<Observer<T>>, subscribable?: ActorInterface<T>): Unsubscribable {
+        throw new Error("Method not implemented.");
+    }
+
+    protected instantiateTransmissionComponents(clientId: string): Transmission {
+        throw new Error(`Method not implemented`)
+        /* Subscribe to adapterEvent, adapter will give adapter with the relevant client information. 
+        It will first check array of transmission clinetID, so if new clientId is detected then a new 
+        transmission set will be instantiated. Then an observable instance for each transmission components
+        will be attacked to each of the transmission componetns, so that the respective transmission 
+        can handle more or less of the adapters instantiated by the adapter manager based on the new
+        adapters or it ohter cases, terminating of adaptesr as well. */
+    }
+}

+ 0 - 17
src/connector/adapter.base.ts

@@ -1,17 +0,0 @@
-import { Observable, Subject } from "rxjs";
-import dotenv from 'dotenv';
-import { AdapterInterface, AdapterProfile, TransportService } from "../interface/interface";
-
-dotenv.config();
-/* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
-So how?: */
-export class Adapter implements AdapterInterface {
-    protected transportService!: TransportService
-
-    constructor() {
-        //logic here
-    }
-
-}
-
-

+ 40 - 47
src/connector/adapter.manager.ts

@@ -1,62 +1,60 @@
-import { AdapterManager as AdapterManagerInterface, TransportService, TransportEvent, Transport } from "../interface/connector.interface"
-import { TransmitterAdapter } from './adapter.transmitter'
-import { ReceiverAdapter } from './adapter.receiver'
-import { v4 as uuidv4 } from 'uuid'
-import { Observable, Subject } from "rxjs"
+import { filter, Observable, Observer, Subject } from "rxjs"
 import { WebsocketTransportService } from "../transport/websocket"
 import { HttpTransportService } from "../transport/http"
 import config from '../config/config.json';
+import { v4 as uuidv4 } from 'uuid'
 import ConsoleLogger from "../utils/log.utils"
-import { RequestResponseAdapter } from "./adapter.request.response"
-=import { AdapterEvent } from "../interface/interface"
+import { AdapterEvent, EventObject, ReceiverAdapterInterface, TransmitterAdapterInterface, Transport, TransportEvent, TransportService, TransportSet } from "../interface/interface"
+import { AdapterManagerBase } from "./adapter.manager.base"
+import { TransmitterAdapter } from "./adapter.transmitter";
+import { ReceiverAdapter } from "./adapter.receiver";
+
+/* Note: There will be a need to use the logic in place for socket utility. Especially for client identification
+Will think about that later. Because that mechanism needs to be made universal somehow. If let's say utilizing already
+existing transport, there are no logic in place to exchange information to identify connected clients. */
 
-export class AdapterManager implements AdapterManagerInterface {
+export class AdapterManager extends AdapterManagerBase {
     private console: ConsoleLogger = new ConsoleLogger(`AdapterManager`, ['managers'])
-    private transportServiceArray: TransportService[] = []
-    private transportSet: TransportSet[] = []
-    private adapterSet: AdapterSet[] = []
-    private event!: Subject<TransportEvent>
 
-    constructor(event: Subject<TransportEvent>, browserEnv?: boolean | undefined) {
-        this.event = event
+    constructor(eventObj: EventObject, browserEnv?: boolean) {
+        super()
+        this.eventObj = eventObj
         this.console.log({ message: `Contructing self...` })
 
         this.sortTransportFromEnv(this.transportSet)
+        // for now set up these transportEvent
         this.transportSet.forEach(set => {
-            this.setUpTransportService(set, event, browserEnv)
+            this.setUpTransportService(set, this.eventObj.transportEvent, browserEnv)
         })
     }
 
-    subscribe(): Observable<AdapterEvent> {
-        throw new Error(`Method not implemented`)
+    public subscribe(): Observable<AdapterEvent> {
+        return new Observable((event: Observer<AdapterEvent>) => {
+            this.eventObj.transportEvent.pipe(
+                filter(event => event.event === `New Client`)
+            ).subscribe({
+                next: (event: TransportEvent) => {
+                    this.handleTransportEvent(event, this.eventObj.adapterEvent, event.transport)
+                }
+            })
+        })
     }
-    /* This one change to subscribe since I want it to return an array of adapters instead of just a set of them, becauise there could
-    mulitple adapters that the manager will have to work with */
-    public getAdapter(clientId: string): AdapterSet | null {
-        this.console.log({ message: `Instantiating an adapter set....` })
-        let transportType: Transport = process.env.Transport as unknown as Transport // as default  for now
-        let adapterId: string = clientId
-        let transportService: TransportService | undefined = this.transportServiceArray.find(obj => obj.getInfo() == transportType)
-        if (transportService) {
-            let transmitterAdapter: TransmitterAdapter = new TransmitterAdapter(adapterId, transportType, transportService)
-            let receiverAdapter: ReceiverAdapter = new ReceiverAdapter(adapterId, transportType, transportService)
-            let adapterSet: AdapterSet = {
-                id: adapterId,
-                dateCreated: new Date(),
-                transmitterAdapter: transmitterAdapter,
-                receiverAdapter: receiverAdapter,
-                requestResponsAdapter: new RequestResponseAdapter(transmitterAdapter, receiverAdapter)
-            }
-            this.adapterSet.push(adapterSet)
 
-            this.event.next({
+    private handleTransportEvent(event: TransportEvent, adapterEvent: Subject<AdapterEvent>, transport: Transport): void {
+        let transportService: TransportService | undefined = this.transportServiceArray.find(obj => obj.getInfo() === transport)
+        if (transportService) {
+            let transmitterAdapter: TransmitterAdapterInterface = new TransmitterAdapter(event.data?.clientId, event.transport, transportService)
+            let receiverAdapter: ReceiverAdapterInterface = new ReceiverAdapter(event.data?.clientId, event.transport, transportService)
+            adapterEvent.next({
                 id: uuidv4(),
                 event: 'New Adapter',
-                data: adapterId
-            } as TransportEvent)
-            return adapterSet
+                type: 'Adapter Event',
+                date: new Date(),
+                adapters: [transmitterAdapter, receiverAdapter]
+            } as AdapterEvent)
         } else {
-            return null
+            this.console.error({ message: `No ${transport} service is not properly instantiated....` })
+            throw new Error(`No ${transport} service is not properly instantiated....`)
         }
     }
 
@@ -88,10 +86,10 @@ export class AdapterManager implements AdapterManagerInterface {
     }
 
     private instantiateTransportService(transportType: Transport, event: Subject<TransportEvent>): TransportService {
-        if (transportType == Transport.Websocket) {
+        if (transportType === 'Websocket') {
             return new WebsocketTransportService(event)
         }
-        else if (transportType == Transport.Http) {
+        else if (transportType === 'Http') {
             return new HttpTransportService(event)
         } else {
             throw new Error(`No Transport Service Instantiated`)
@@ -110,8 +108,3 @@ export class AdapterManager implements AdapterManagerInterface {
 }
 
 
-interface TransportSet {
-    transport: Transport,
-    port: number
-}
-

+ 30 - 42
src/connector/adapter.receiver.ts

@@ -1,60 +1,48 @@
 import dotenv from 'dotenv';
-import { Bus, EventMessage, FisMessage } from "../interface/transport.interface";
-import { ReceiverAdapter as ReceiverAdapterInterface, Transport, TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
 import { filter, Observable, Observer, Subscription, takeWhile } from 'rxjs';
 import { v4 as uuidv4 } from 'uuid'
-import { Adapter } from './adapter.base';
 import ConsoleLogger from '../utils/log.utils';
 import { WrappedMessage } from '../utils/message.ordering';
+import { AdapterBase } from '../base/adapter.base';
+import { AdapterEvent, FisMessage, Transport, TransportEvent, TransportMessage, TransportService } from '../interface/interface';
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 So how?: */
-export class ReceiverAdapter extends Adapter implements ReceiverAdapterInterface {
+export class ReceiverAdapter extends AdapterBase {
     private console!: ConsoleLogger
 
-    constructor(adapterId: string, transportType: Transport, transportService: TransportService) {
+    constructor(adapterId: string, adapterType: Transport, transportService: TransportService) {
         super()
-        this.console = new ConsoleLogger(`${transportType}ReceiverAdapter`, ['adapter'])
-        this.connector = transportService
-        this.setAdapterProfile(adapterId, transportType)
-        this.console.log({ message: `Just testing to see if receiverAdapter is instantiated properly ${this.adapterProfile, this.connector ? 'TransportService Instantiated' : 'Trnasport Service not instantiated'}` })
+        
+        this.console = new ConsoleLogger(`${adapterType}ReceiverAdapter`, ['adapter'])
+        this.setAdapterProfile(adapterId, adapterType, 'Receiver')
+        this.transportService = transportService
     }
 
-    setAdapterProfile(id: string, transportType: Transport): void {
-        this.adapterProfile = {
-            id: id,
-            transportType: transportType
-        }
-    }
+    subscribe(): Observable<AdapterEvent> {
+        this.console.log({ message: `Connector getting message bus for this connector: ${this.adapterID}` })
+        return new Observable((observable: Observer<AdapterEvent>) => {
+            const subscription: Subscription = this.transportService.subscribe().pipe(
+                filter((message: TransportEvent) => message.event === 'New Message'),
+                // take message only specific for this adapter. Although that itself wouldn't be necessary, considerng everything goes through transportEvent. I guess it's for better management
+                filter((message: TransportEvent) => (message.data as TransportMessage).target == this.adapterID),
+            ).subscribe((message: TransportEvent) => {
+                this.console.log({ message: `Received ${(((message.data as TransportMessage).payload as WrappedMessage).payload as FisMessage).header.messageID} from ${((message.data as TransportMessage).target)}`, details: message })
+                observable.next({
+                    id: uuidv4(),
+                    type: `Adapter Event`,
+                    event: `New Message`,
+                    date: new Date(),
+
+                } as AdapterEvent);
+            });
+
+            // Clean up on unsubscription
+            return () => {
+                subscription.unsubscribe();
+            };
 
-    getMessageBus(bus: Bus): Observable<TransportEvent> {
-        this.console.log({ message: `Connector getting message bus for this connector: ${this.adapterProfile.id}` })
-        return new Observable((observable: Observer<TransportEvent>) => {
-            if (bus == Bus.GeneralBus) {
-                const subscription: Subscription = this.connector.subscribe().pipe(
-                    filter((message: TransportEvent) => message.event === 'New Message'),
-                    // take message only specific for this adapter. Although that itself wouldn't be necessary, considerng everything goes through transportEvent. I guess it's for better management
-                    filter((message: TransportEvent) => (message.data as TransportMessage).target == this.adapterProfile.id),
-                ).subscribe((message: TransportEvent) => {
-                    this.console.log({ message: `Received ${(((message.data as TransportMessage).payload as WrappedMessage).payload as FisMessage).header.messageID} from ${((message.data as TransportMessage).target)}`, details: message })
-                    observable.next(message);
-                });
-
-                // Clean up on unsubscription
-                return () => {
-                    subscription.unsubscribe();
-                };
-            }
-            if (bus == Bus.ResponseMessageBus) {
-                /// logic here
-            }
-            if (bus == Bus.NotificationMessageBus) {
-                /// logic here
-            }
-            if (bus == Bus.ErrorMessageBus) {
-                /// logic here
-            }
         });
     }
 

+ 0 - 53
src/connector/adapter.request.response.ts

@@ -1,53 +0,0 @@
- import dotenv from 'dotenv';
-import { Bus, FisMessage } from "../interface/transport.interface";
-import { RequestResponseAdapter as RequestResponseAdapterInterface, TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
-import { filter, map, Observable, Observer, Subscription, takeWhile } from 'rxjs';
-import { WrappedMessage } from '../utils/message.ordering';
-import { Adapter } from './adapter.base';
-import { ReceiverAdapter } from './adapter.receiver';
-import { TransmitterAdapter } from './adapter.transmitter';
-
-dotenv.config();
-/* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
-So how?: */
-export class RequestResponseAdapter extends Adapter implements RequestResponseAdapterInterface {
-    private transmitterAdapter!: TransmitterAdapter
-    private receiverAdapter!: ReceiverAdapter
-
-    constructor( transmitterAdapter: TransmitterAdapter, receiverAdapter: ReceiverAdapter) {
-        super()
-        // logic here
-        this.transmitterAdapter = transmitterAdapter
-        this.receiverAdapter = receiverAdapter
-    }
-
-    // Make use of the adapters ref passed in
-    send(message: WrappedMessage): Observable<FisMessage> {
-        return new Observable((response: Observer<FisMessage>) => {
-            // logic here
-            this.transmitterAdapter.emit(message)
-            const subscription: Subscription = this.receiverAdapter.getMessageBus(Bus.GeneralBus).pipe(
-                filter((message: TransportEvent) => message.event === 'New Message'),
-                // take message only specific for this adapter. Although that itself wouldn't be necessary, considerng everything goes through transportEvent. I guess it's for better management
-                filter((message: TransportEvent) => (message.data as TransportMessage).target == this.adapterProfile.id),
-                takeWhile((message: TransportEvent) => {
-                    const shouldTake = ((message.data as TransportMessage).payload as FisMessage).data !== 'Complete';
-                    if (!shouldTake) {
-                        response.complete();  // Ensure the observer is completed
-                    }
-                    return shouldTake;
-                }),
-                map(message => (message.data as TransportMessage).payload as FisMessage)
-            ).subscribe((message: FisMessage) => {
-                response.next(message);
-            });
-
-            // Clean up on unsubscription
-            return () => {
-                subscription.unsubscribe();
-            };
-        })
-    }
-
-}
-

+ 14 - 16
src/connector/adapter.transmitter.ts

@@ -1,41 +1,39 @@
 import dotenv from 'dotenv';
-import { FisMessage } from "../interface/transport.interface";
-import { Adapter } from "./adapter.base";
-import { ConnectionState, TransmitterAdapter as TransmitterAdapterInterface, Transport, TransportMessage, TransportService } from '../interface/connector.interface';
 import { Subject } from 'rxjs';
 import { WrappedMessage } from '../utils/message.ordering';
 import ConsoleLogger from '../utils/log.utils';
+import { AdapterBase } from '../base/adapter.base';
+import { TransmissionRole, Transport, TransportMessage, TransportService } from '../interface/interface';
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 So how?: */
-export class TransmitterAdapter extends Adapter implements TransmitterAdapterInterface {
+export class TransmitterAdapter extends AdapterBase {
     private console!: ConsoleLogger
 
     constructor(adapterId: string, adapterType: Transport, transportService: TransportService) {
         super()
         // logic here
         this.console = new ConsoleLogger(`${adapterType}TransmitterAdapter`, ['adapter'])
-        this.connector = transportService
-        this.setAdapterProfile(adapterId, adapterType)
+        this.transportService = transportService
+        this.setAdapterProfile(adapterId, adapterType, 'Transmitter')
     }
 
     emit(message: WrappedMessage): void {
         // logic here
-        this.console.log({ message: `Emitting: ${message.thisMessageID} to ${this.adapterProfile.id}` })
-        this.connector.emit({
-            id: this.adapterProfile.id,
-            transport: this.adapterProfile.transportType,
-            target: this.adapterProfile.id, // this should be directed to the channel/client established. Make sure this is right
+        this.console.log({ message: `Emitting: ${message.thisMessageID} to ${this.adapterID}` })
+        this.transportService.emit({
+            id: this.adapterID,
+            transport: this.transport,
+            target: this.adapterID, // this should be directed to the channel/client established. Make sure this is right
             payload: message
         } as TransportMessage)
     }
 
-    setAdapterProfile(id: string, adapterType: Transport): void {
-        this.adapterProfile = {
-            id: id,
-            transportType: adapterType
-        }
+    setAdapterProfile(id: string, adapterType: Transport, role: TransmissionRole): void {
+        this.adapterID = id
+        this.transport = adapterType
+        this.role = role
     }
 
 }

+ 27 - 0
src/interface/actor.interface.ts

@@ -0,0 +1,27 @@
+
+import { Observable, Observer, Subject, Subscription, Unsubscribable } from "rxjs"
+import { MessageTransmissionReceiver } from "../transmission/msg.transmission.receiver"
+import { MessageTransmissionTransmitter } from "../transmission/msg.transmission.transmitter"
+
+
+export interface ActorInterface<MessageType> {
+    subscribe(subscriberInfo: ActorProfile<any>, observer: Observer<ActorMessage<MessageType>>): Unsubscribable
+}
+
+export interface ActorProfile<T = any> {
+    actorId: string,
+    actorName: string,
+    dateCreated?: Date,
+    data?: T
+}
+
+export interface TransmisisonProfile {
+    clientId: string,
+    transmitterService: MessageTransmissionTransmitter,
+    receiverService: MessageTransmissionReceiver
+}
+
+export interface ActorMessage<T = any> {
+    actorProfile: ActorProfile,
+    payload: T
+}

+ 22 - 25
src/interface/interface.ts

@@ -1,35 +1,24 @@
-import { BehaviorSubject, Observable, Subject } from "rxjs"
+import { BehaviorSubject, Observable, Subject, Unsubscribable } from "rxjs"
 import { RetransmissionService } from "../utils/retransmission.service"
 import { WrappedMessage } from "../utils/message.ordering"
-import { Adapter } from "../connector/adapter.base"
+import { ActorInterface, ActorProfile } from "./actor.interface"
 
 /* EVENT BUS */
 export interface GeneralEvent {
     id: string,
     type: EventType,
     event: EventMessage,
-    date: Date
-}
-
-export interface TransmissionEvent extends GeneralEvent {
-    transmission: Transmission
-}
-
-export interface AdapterEvent extends GeneralEvent {
-    adapters: Adapter[]
-}
-
-export interface TransportEvent extends GeneralEvent {
-    data: any
+    date: Date,
+    transmission?: Transmission,
+    adapters?: AdapterInterface[],
+    message?: TransportMessage
 }
 
 /* MANAGEMENT */
-export interface MessageTransmissionManagerInterface {
-    subscribe(): Observable<TransmissionEvent>
+export interface MessageTransmissionManagerInterface<T> extends ActorInterface<T> {
 }
 
-export interface AdapterManagerInterface {
-    subscribe(): Observable<AdapterEvent>
+export interface AdapterManagerInterface<T> extends ActorInterface<T> {
 }
 
 
@@ -38,7 +27,7 @@ export interface MessageTransmissionInterface {
 }
 
 export interface MessageReceiverInterface extends MessageTransmissionInterface {
-    subscribe(): Observable<any>
+    getIncoming(): Observable<any>
 }
 
 export interface MessageTransmitterInterface extends MessageTransmissionInterface {
@@ -51,6 +40,8 @@ export interface MessageRequestResponseInterface extends MessageTransmissionInte
 
 /* ADAPTER COMPONENTS */
 export interface AdapterInterface {
+    role: TransmissionRole
+    transport: Transport
 }
 
 export interface TransmitterAdapterInterface extends AdapterInterface {
@@ -112,12 +103,13 @@ export enum AdapterTransmissionRole {
 
 
 export type EventType = `General Event` | 'Transport Event' | 'Transmission Event' | 'Adapter Event'
+export type TransmissionRole = `Transmitter` | 'Receiver' | 'RequestResponse'
 export type EventMessage = 'Server Started' | 'New Client' | 'Client Connected' | 'Client Re-connected' | 'Client Disconnected' | `Server Disconnected` | 'New Message' | `Notification` | `New Server` | `Server Connected` | `New Transport` | 'New Adapter' | 'Re-Flush' | 'New Transport' | 'New Transmission'
 export type ConnectionState = 'ONLINE' | 'OFFLINE'
 export interface TransportService {
     getInfo(): Transport
     emit(message: TransportMessage): void
-    subscribe(): Observable<TransportEvent> //all messages and whatever event will go through this, easier to implemnet across different transport protocol
+    subscribe(): Observable<GeneralEvent> //all messages and whatever event will go through this, easier to implemnet across different transport protocol
 }
 
 export interface Info {
@@ -136,8 +128,13 @@ export interface AdapterProfile {
     transportType: Transport,
 }
 
-export interface EventObject {
-    globalEvent: Observable<GeneralEvent>,
-    transportEvent: Observable<TransportEvent>,
-    adapterEvent: Observable<AdapterEvent>
+export interface TransportSet {
+    transport: Transport,
+    port: number
+}
+
+export interface TransportProfileMessage {
+    clientId: string,
+    message?: string,
+    payload?: any
 }

+ 63 - 0
src/test/actor.ts

@@ -0,0 +1,63 @@
+import { interval, map, Subject } from "rxjs";
+import { ActorBase } from "../base/actor.base"
+import { ActorInterface, ActorMessage, ActorProfile } from "../interface/actor.interface";
+import { FisMessage } from "../interface/interface";
+
+class TestA extends ActorBase<{ message: string }> {
+    // instantiate new actor
+    constructor() {
+        super();
+
+    }
+
+}
+
+
+class TestB extends ActorBase<{ message: string }> {
+    constructor() {
+        super();
+
+    }
+
+
+}
+
+
+function publishMessage<T>(actorProfile: ActorProfile, output: Subject<ActorMessage<T>>) {
+    let count = 0
+    interval(1000).pipe(
+        map(time => {
+            count++
+            return {
+                actorProfile: actorProfile,
+                payload: {
+                    message: `Message${count} from TEST A`
+                }
+            } as ActorMessage<T>
+        })
+    ).subscribe(output)
+}
+
+function doThese<T>(actorProfile: ActorProfile, input: Subject<ActorMessage<T>>, output: Subject<ActorMessage<T>>, actor: ActorInterface<T>, subscribers: ActorProfile[]) {
+    console.log(`${actorProfile.actorName} initiating ${actor ? `subscription to designated target` : `...`}`)
+    // start logging to see if the correct messages are in
+    // input = new Subject()
+    input.subscribe(message => {
+        console.log(`${actorProfile.actorName} Incoming Bus`, message)
+    })
+    // output = new Subject()
+    output.subscribe(message => {
+        // console.log(`${actorProfile.actorName} Outgoing Bus`, message)
+    })
+
+    // connect to target and see what happens
+    let unsubscribable = actor.subscribe(actorProfile, input)
+    // subscribers.find(obj =>  obj.actorId === )
+
+    // start publishing messages 
+    publishMessage<T>(actorProfile, output)
+}
+
+let testA = new TestA()
+let testB = new TestB()
+

+ 0 - 38
src/transmission/msg.transmission.manager.base.ts

@@ -1,38 +0,0 @@
-
-import { Observable, Subject } from 'rxjs';
-import { AdapterEvent, GeneralEvent, MessageTransmissionManagerInterface, Transmission, TransmissionEvent, TransportEvent } from '../interface/interface';
-
-export class MessageTransmissionManagerBase implements MessageTransmissionManagerInterface {
-    private globalEvent!: Subject<GeneralEvent>
-    private transportEvent!: Subject<TransportEvent>
-    private adapterEvent!: Subject<AdapterEvent>
-    protected eventObj = {
-        globalEvent: this.globalEvent,
-        transportEvent: this.transportEvent,
-        adapterEvent: this.adapterEvent
-    }
-
-    constructor() {
-        // logic here
-    }
-
-    public subscribe(): Observable<TransmissionEvent> {
-        throw new Error(`Method not implemented`)
-        /* Public interface to be used by 'client' to start transmitting and receiving. 
-        It will listen to adapterEvent, if adapter is not associated with any existing clientID that are connected
-        to the releveant transport service, than it will just pump in to the adapterEvent as ususal. But if it's a
-        new lcient, then a new Transmission is instantiated particularly with that client. Then an adapteverEvent
-        will also be attached to the transmission as well as toher relevant information so that a set of adapters
-        can be instantiated to be used accordingly. */
-    }
-
-    protected instantiateTransmissionComponents(clientId: string): Transmission {
-        throw new Error(`Method not implemented`)
-        /* Subscribe to adapterEvent, adapter will give adapter with the relevant client information. 
-        It will first check array of transmission clinetID, so if new clientId is detected then a new 
-        transmission set will be instantiated. Then an observable instance for each transmission components
-        will be attacked to each of the transmission componetns, so that the respective transmission 
-        can handle more or less of the adapters instantiated by the adapter manager based on the new
-        adapters or it ohter cases, terminating of adaptesr as well. */
-    }
-}

+ 28 - 35
src/transmission/msg.transmission.manager.ts

@@ -3,62 +3,55 @@ import { MessageTransmissionReceiver } from "./msg.transmission.receiver";
 import { AdapterManager } from "../connector/adapter.manager";
 import { v4 as uuidv4 } from 'uuid'
 import { MessageTransmissionRequestResponse } from "./msg.transmission.request-response";
-import { filter, Observable, Observer, Subject } from "rxjs";
+import { filter, Observable, Observer, Subject, Unsubscribable } from "rxjs";
 import ConsoleLogger from "../utils/log.utils";
-import { TransmitterAdapter } from "../connector/adapter.transmitter";
-import { ReceiverAdapter } from "../connector/adapter.receiver";
-import { AdapterEvent, AdapterInterface, EventMessage, EventObject, GeneralEvent, Transmission, TransmissionEvent, TransportEvent, TransportService } from '../interface/interface'
-import { MessageTransmissionManagerBase } from "./msg.transmission.manager.base";
+import { TransmitterAdapter } from "../connector/adapter.transmitter"
+import { ReceiverAdapter } from "../connector/adapter.receiver"
+import { MessageTransmissionManagerBase } from "../base/msg.transmission.manager.base";
+import { ActorInterface, ActorProfile } from "../interface/actor.interface";
+import { TransportService } from "../interface/interface";
 
-export class MessageTransmissionManager extends MessageTransmissionManagerBase {
+export class MessageTransmissionManager<GeneralEvent> extends MessageTransmissionManagerBase<GeneralEvent> {
     private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionManager`, ['managers'])
-    private browserEnv!: boolean
-    private transmissionSet: Transmission[] = []
-    private adapterManager!: AdapterManager
 
     constructor(transportRef: Observable<TransportService>, browserEnv?: boolean) {
         super()
         this.console.log({ message: `Constructing self...` })
-        this.eventObj.globalEvent = new Subject()
-        this.eventObj.transportEvent = new Subject()
         transportRef.subscribe((transport: TransportService) => {
-            this.eventObj.transportEvent.next({
+            this.outgoingBus.next({
                 id: uuidv4(),
                 type: `Transport Event`,
                 event: `New Transport`,
                 date: new Date(),
                 data: transport
-            } as TransportEvent)
+            })
         })
         if (browserEnv) this.browserEnv = browserEnv
         // Subscribe for adapterManager and it's relevent event
-        this.adapterManager = new AdapterManager(this.eventObj, browserEnv)
-        this.adapterManager.subscribe().subscribe(this.eventObj.adapterEvent)
-
+        this.adapterManager = new AdapterManager()
+        this.adapterManager.subscribe(this.actorProfile, this.incomingBus, this.outgoingBus.asObservable)
     }
 
-    public subscribe(): Observable<TransmissionEvent> {
-        return new Observable((observer: Observer<TransmissionEvent>) => {
-            const targetEvent: EventMessage = this.browserEnv ? 'New Server' : 'New Client';
-            this.eventObj.transportEvent.pipe(
-                filter(event => event.event == targetEvent)
-            ).subscribe(event => {
-                // get all adapters for all the connection
-                let transmission: Transmission | undefined = this.instantiateTransmissionComponents(event?.data?.clientId)
-                if (transmission) {
-                    observer.next({
-                        id: uuidv4(),
-                        type: `Transmission Event`,
-                        event: 'New Transmission',
-                        date: new Date(),
-                        transmission: transmission
-                    })
-                }
-            })
+    public subscribe(actorProfile: ActorProfile, observer: Partial<Observer<GeneralEvent>>, subscribable?: ActorInterface<GeneralEvent>): Unsubscribable {
+        const targetEvent: EventMessage = this.browserEnv ? 'New Server' : 'New Client';
+        this.eventObj.transportEvent.pipe(
+            filter(event => event.event == targetEvent)
+        ).subscribe(event => {
+            // get all adapters for all the connection
+            let transmission: Transmission | undefined = this.instantiateTransmissionComponents(event?.data?.clientId)
+            if (transmission) {
+                observer.next({
+                    id: uuidv4(),
+                    type: `Transmission Event`,
+                    event: 'New Transmission',
+                    date: new Date(),
+                    transmission: transmission
+                })
+            }
         })
     }
 
-    private instantiateTransmissionComponents(clientId: string): Transmission {
+    protected instantiateTransmissionComponents(clientId: string): Transmission {
         let receiverInstance: MessageTransmissionReceiver = this.getReceiver(clientId, this.eventObj)
         let transmitterInstance: MessageTransmissionTransmitter = this.getTransmitter(clientId, this.eventObj)
         let requestResponseInstance: MessageTransmissionRequestResponse = this.getRequestResponse(clientId, this.eventObj, transmitterInstance, receiverInstance)

+ 11 - 5
src/transmission/msg.transmission.receiver.ts

@@ -3,8 +3,8 @@ import { v4 as uuidv4 } from 'uuid'
 import { ReceiverAdapter } from '../connector/adapter.receiver';
 import { checkMessage, WrappedMessage } from '../utils/message.ordering';
 import ConsoleLogger from '../utils/log.utils';
-import { MessageTransmissionBase } from './msg.transmission.base';
-import { AdapterEvent, AdapterInterface, MessageReceiverInterface, ReceiverAdapterInterface, TransportEvent, TransportMessage } from '../interface/interface';
+import { MessageTransmissionBase } from '../base/msg.transmission.base';
+import { MessageReceiverInterface, ReceiverAdapterInterface, TransportMessage } from '../interface/interface';
 
 export class MessageTransmissionReceiver extends MessageTransmissionBase implements MessageReceiverInterface {
     private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionReceiver`, ['transmission'])
@@ -12,13 +12,15 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
     private currentAdapter!: ReceiverAdapterInterface
     // private toBePassedOver: Subject<WrappedMessage> = new Subject()
 
-    constructor(clientId: string, adapterEvent: Observable<AdapterEvent>) {
+    constructor(clientId: string, eventObj: EventObject) {
         super()
         this.clientId = clientId
-        this.adapterEvent = adapterEvent
+        this.eventObj = eventObj
+
+        this.handleAdapterEvent(this.eventObj.adapterEvent.asObservable())
     }
 
-    subscribe(): Observable<TransportEvent> {
+    getIncoming(): Observable<TransportEvent> {
         this.console.log({ message: `Transmission getting message bus for ${this.clientId}` })
         return new Observable((observable: Observer<TransportEvent>) => {
             // logic here
@@ -47,4 +49,8 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
         })
     }
 
+    private handleAdapterEvent(adapterEvent: Observable<AdapterEvent>): void {
+
+    }
+
 }

+ 2 - 2
src/transmission/msg.transmission.request-response.ts

@@ -1,11 +1,11 @@
-import { MessageTransmissionBase } from "./msg.transmission.base";
+import { MessageTransmissionBase } from "../base/msg.transmission.base";
 import { FisMessage, MessageRequestResponse as MessageRequestResponseInterface } from '../interface/transport.interface'
 import { filter, Observable, Observer, Subject, Subscription, takeWhile } from "rxjs";
 import { v4 as uuidv4 } from 'uuid'
 import { TransportEvent } from "../interface/connector.interface";
 import { MessageTransmissionReceiver } from "./msg.transmission.receiver";
 import { MessageTransmissionTransmitter } from "./msg.transmission.transmitter";
-import { Adapter } from "../connector/adapter.base";
+import { Adapter } from "../base/adapter.base";
 
 export class MessageTransmissionRequestResponse extends MessageTransmissionBase implements MessageRequestResponseInterface {
     transmitterInstance!: MessageTransmissionTransmitter;

+ 29 - 22
src/transmission/msg.transmission.transmitter.ts

@@ -1,12 +1,12 @@
-import { MessageTransmissionBase } from "./msg.transmission.base";
+import { MessageTransmissionBase } from "../base/msg.transmission.base";
 import { v4 as uuidv4 } from 'uuid'
-import { BehaviorSubject, distinct, distinctUntilChanged, filter, map, Observable, Subject } from "rxjs";
+import { BehaviorSubject, distinct, distinctUntilChanged, filter, map, Observable, Subject, Subscription } from "rxjs";
 import { RetransmissionService } from "../utils/retransmission.service";
 import { WrappedMessage } from "../utils/message.ordering";
 import ConsoleLogger from "../utils/log.utils";
-import { Adapter } from "../connector/adapter.base";
 import { TransmitterAdapter } from "../connector/adapter.transmitter";
-import { AdapterEvent, AdapterInterface, ConnectionState, EventObject, FisMessage, MessageTransmitterInterface, TransportEvent, TransportMessage } from "../interface/interface";
+import { AdapterEvent, AdapterInterface, ConnectionState, EventObject, FisMessage, MessageTransmitterInterface, TransmitterAdapterInterface, TransportEvent, TransportMessage } from "../interface/interface";
+import { error } from "console";
 
 /* Take in all the messages that needs to be transported, and divide them accordingly. So the connector instances will do just that
 connectors or adapters will have their own identifier*/
@@ -15,7 +15,7 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
     private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionTransmitter`, ['transmission'])
     private messageToBeTransmitted!: Subject<FisMessage | WrappedMessage>
     private buffer!: RetransmissionService;
-    private currentAdapter!: TransmitterAdapter
+    private currentAdapter!: TransmitterAdapterInterface
 
     constructor(clientId: string, eventObj: EventObject) {
         super()
@@ -23,8 +23,8 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
         this.eventObj = eventObj
         this.messageToBeTransmitted = new Subject()
         this.buffer = new RetransmissionService()
-        this.setupBuffer()
         this.handleAdapters(eventObj.adapterEvent)
+        this.setupBuffer()
 
         // special case just for http in case of server/client disconnected, the unsent msg will be flushed back into messageToBeTransmitted
         // logic here
@@ -58,20 +58,17 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
         this.buffer.returnSubjectForBufferedItems().subscribe((bufferedMessage: WrappedMessage) => {
             // need to work with wrapped messages
             this.console.log({ message: `Releasing ${bufferedMessage.thisMessageID}` });
-            (this.currentAdapter as TransmitterAdapter).emit(bufferedMessage)
+            if (this.currentAdapter) {
+                this.currentAdapter.emit(bufferedMessage)
+            } else {
+                this.messageToBeTransmitted.next(bufferedMessage)
+                this.console.error({ message: `Adapter is not set. Please ensure adapters are ready.` })
+            }
         })
     }
 
     private handleAdapters(adaptersEvent: Observable<AdapterEvent>): void {
-        if (Array.isArray(event.adapters)) {
-            if (event.adapters.length > 0) {
-                event.adapters.forEach(adapter => {
-                    this.adapters.push(adapter)
-                })
-            }
-        } else {
-            this.adapters.push(event.adapters)
-        }
+        this.handleNewAdapters(adaptersEvent)
     }
 
     private handleNewAdapters(adaptersEvent: Observable<AdapterEvent>): void {
@@ -79,18 +76,28 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
             filter(event => event.event === `New Adapter`),
             map(event => { return event.adapters }),
         ).subscribe({
-            next: (adapters: ) => {
-
-            }
+            next: (adapters: AdapterInterface[]) => {
+                adapters.forEach((adapter: AdapterInterface) => {
+                    if (adapter.role === `Transmitter`) {
+                        this.adapters.push(adapter as TransmitterAdapterInterface)
+                    }
+                })
+                this.setUpAdapter()
+            },
+            error: error => this.console.error({ message: 'Observer Error', details: error })
         })
     }
 
     private handleAdaptersTermination(adaptersEvent: Observable<AdapterEvent>): void {
-        adaptersEvent.subscribe
     }
 
-    private setUpAdapter(adapter: TransmitterAdapter): void {
-        this.currentAdapter = adapter
+    // temporary logic for now. 
+    private setUpAdapter(): void {
+        if (!this.currentAdapter && this.adapters.some(adapter => adapter.transport === `Websocket`)) {
+            this.currentAdapter = this.adapters.find(adapter => adapter.transport === `Websocket`) as TransmitterAdapterInterface
+        } else {
+            this.console.error({ message: 'No websocket socket adapter avaiable' })
+        }
     }
 
     private uniqueHandlerToFlushUnsentMessages(event: Observable<TransportEvent>): void {