Browse Source

revamp interfaces and class

enzo 2 weeks ago
parent
commit
692bb59540

+ 4 - 1
doc/explanation.txt

@@ -82,4 +82,7 @@ Target for the day:  Get the HTTP Services ready. at least the channel version,
 As of 16/12/2024: (Monday)
 -SO tomorrow I have to present something concrete. Will first finish up the UML diagram, and then proceed to hash out whatever errors there is for HTTP servcies.
 -If I manage to hash out all the remaining problems. Then proceed to do request response. Test with same app first, before developing for web UI version. it use fetch
-instead of axios, so there's going to be some tweaking. 
+instead of axios, so there's going to be some tweaking. 
+
+
+Calm down, don't be stupid, it's just a game. Focus on what's I'm supposed to do today. 

+ 3 - 13
src/connector/adapter.base.ts

@@ -1,27 +1,17 @@
 import { Observable, Subject } from "rxjs";
 import dotenv from 'dotenv';
-import { AdaptorTransmissionRole, AdaptorBase, ConnectionState, Transport, TransportEvent, TransportService, AdapterProfile } from "../interface/connector.interface";
+import { AdapterInterface, AdapterProfile, TransportService } from "../interface/interface";
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 So how?: */
-export class Adapter implements AdaptorBase {
-    event!: Subject<TransportEvent>
-    connector!: TransportService;
-    adapterProfile!: AdapterProfile;
+export class Adapter implements AdapterInterface {
+    protected transportService!: TransportService
 
     constructor() {
         //logic here
     }
 
-    getInfo(): AdapterProfile {
-        // throw new Error("Method not implemented.");
-        return this.adapterProfile
-    }
-   
-    setAdapterProfile(id: string, transportType: Transport): void {
-        throw new Error("Method not implemented.");
-    }
 }
 
 

+ 5 - 2
src/connector/adapter.manager.ts

@@ -2,13 +2,13 @@ import { AdapterManager as AdapterManagerInterface, TransportService, TransportE
 import { TransmitterAdapter } from './adapter.transmitter'
 import { ReceiverAdapter } from './adapter.receiver'
 import { v4 as uuidv4 } from 'uuid'
-import { Subject } from "rxjs"
+import { Observable, Subject } from "rxjs"
 import { WebsocketTransportService } from "../transport/websocket"
 import { HttpTransportService } from "../transport/http"
 import config from '../config/config.json';
 import ConsoleLogger from "../utils/log.utils"
 import { RequestResponseAdapter } from "./adapter.request.response"
-import { AdapterSet } from "../interface/general.interface"
+=import { AdapterEvent } from "../interface/interface"
 
 export class AdapterManager implements AdapterManagerInterface {
     private console: ConsoleLogger = new ConsoleLogger(`AdapterManager`, ['managers'])
@@ -27,6 +27,9 @@ export class AdapterManager implements AdapterManagerInterface {
         })
     }
 
+    subscribe(): Observable<AdapterEvent> {
+        throw new Error(`Method not implemented`)
+    }
     /* This one change to subscribe since I want it to return an array of adapters instead of just a set of them, becauise there could
     mulitple adapters that the manager will have to work with */
     public getAdapter(clientId: string): AdapterSet | null {

+ 0 - 90
src/interface/connector.interface.ts

@@ -1,90 +0,0 @@
-import { BehaviorSubject, Observable, Subject } from "rxjs"
-import { Bus, FisMessage, ReceiverProfile, TransmissionProfile, TransmitterProfile } from "./transport.interface"
-import { WrappedMessage } from "../utils/message.ordering"
-import { AdapterSet } from "./general.interface"
-
-export type TYPE = {
-    adapterProfile: AdapterProfile,
-    transmitterProfile: TransmitterProfile,
-    receiverProfile: ReceiverProfile,
-    requestResponseProfile: TransmissionProfile
-}
-
-export interface AdapterProfile {
-    id: string,
-    transportType: Transport,
-}
-
-
-export interface AdapterManager {
-    getAdapter(clientId: string, transportService: TransportService): AdapterSet | null
-}
-
-export interface AdaptorBase {
-    connector: TransportService // this one will refer to the actual tranpsort service like websocket and so on
-    adapterProfile: AdapterProfile
-    event: Subject<TransportEvent>
-
-    getInfo(): AdapterProfile
-    setAdapterProfile(id: string, transportType: Transport): void
-}
-
-
-export interface TransmitterAdapter extends AdaptorBase {
-    emit(message: WrappedMessage): void
-}
-
-export interface ReceiverAdapter extends AdaptorBase {
-    getMessageBus(bus: Bus): Observable<any>
-}
-
-export interface RequestResponseAdapter extends AdaptorBase {
-    send(message: WrappedMessage): Observable<FisMessage>
-}
-
-export type ConnectionState = 'ONLINE' | 'OFFLINE'
-export enum AdaptorTransmissionRole {
-    Transmitter,
-    Receiver,
-    RequestResponse
-}
-
-export enum Transport {
-    Websocket = `Websocket`,
-    Grpc = `Grpc`,
-    Http = `Http`,
-    TCP = 'TCP'
-}
-
-// TO be used for transmission at the trasport level
-export interface TransportMessage {
-    id: string,
-    dateCreated: Date,
-    transport: Transport,
-    target?: string,
-    payload: any
-}
-
-export interface TransportEvent {
-    id: string,
-    event: Event,
-    data: any
-}
-
-export type Event = 'Server Started' | 'New Client' | 'Client Connected' | 'Client Re-connected' | 'Client Disconnected' | `Server Disconnected` | 'New Message' | `Notification` | `New Server` | `Server Connected` | `New Transport` | 'New Adapter' | 'Re-Flush'
-
-export interface TransportService {
-    getInfo(): Transport
-    emit(message: TransportMessage): void
-    subscribe(): Observable<TransportEvent> //all messages and whatever event will go through this, easier to implemnet across different transport protocol
-}
-
-export interface Info {
-    transport: Transport
-}
-
-export interface ClientObject {
-    id: string,
-    dateCreated: Date,
-    connectionState: BehaviorSubject<ConnectionState>
-}

+ 0 - 11
src/interface/general.interface.ts

@@ -1,11 +0,0 @@
-import { ReceiverAdapter } from "../connector/adapter.receiver";
-import { RequestResponseAdapter } from "../connector/adapter.request.response";
-import { TransmitterAdapter } from "../connector/adapter.transmitter";
-
-export interface AdapterSet {
-    id: string,
-    dateCreated: Date,
-    transmitterAdapter: TransmitterAdapter,
-    receiverAdapter: ReceiverAdapter,
-    requestResponsAdapter: RequestResponseAdapter
-}

+ 37 - 62
src/interface/interface.ts

@@ -1,13 +1,14 @@
 import { BehaviorSubject, Observable, Subject } from "rxjs"
 import { RetransmissionService } from "../utils/retransmission.service"
 import { WrappedMessage } from "../utils/message.ordering"
+import { Adapter } from "../connector/adapter.base"
 
 /* EVENT BUS */
 export interface GeneralEvent {
     id: string,
-    event: EventType,
+    type: EventType,
+    event: EventMessage,
     date: Date
-    message?: string
 }
 
 export interface TransmissionEvent extends GeneralEvent {
@@ -15,7 +16,7 @@ export interface TransmissionEvent extends GeneralEvent {
 }
 
 export interface AdapterEvent extends GeneralEvent {
-    adapters: AdapterBase | AdapterBase[]
+    adapters: Adapter[]
 }
 
 export interface TransportEvent extends GeneralEvent {
@@ -23,72 +24,44 @@ export interface TransportEvent extends GeneralEvent {
 }
 
 /* MANAGEMENT */
-export interface MessageTransmissionManager {
+export interface MessageTransmissionManagerInterface {
     subscribe(): Observable<TransmissionEvent>
 }
 
-export interface AdapterManager {
+export interface AdapterManagerInterface {
     subscribe(): Observable<AdapterEvent>
 }
 
 
 /* TRANSMISSION COMPONENTS */
-export interface MessageTransmissionBase {
-    adapters: AdapterBase[]
-    mainAdapter: AdapterBase
-
-    getInfo(): TransmissionProfile
-    setUpAdapter(adapter: AdapterBase): void
+export interface MessageTransmissionInterface {
 }
 
-export interface MessageReceiver extends MessageTransmissionBase {
-    receiverProfile: ReceiverProfile
-
-    getMessageBus(bus: Bus): Observable<any>
-    setReceiver(receiverProfile: ReceiverProfile, event: Observable<TransportEvent>): void
+export interface MessageReceiverInterface extends MessageTransmissionInterface {
+    subscribe(): Observable<any>
 }
 
-export interface MessageTransmitter extends MessageTransmissionBase {
-    transmitterProfile: TransmitterProfile
-    retransmission: RetransmissionService
-
-    setTransmitter(transmitterProfile: TransmitterProfile, event: Observable<TransportEvent>): void
+export interface MessageTransmitterInterface extends MessageTransmissionInterface {
+    emit(message: any): void
 }
 
-export interface MessageRequestResponse extends MessageTransmissionBase {
-    transmitterInstance: MessageTransmitter
-    receiverInstance: MessageReceiver
-
-    setTransmissionProfile(transmissionInfo: MessageTransmitter, receiverInfo: MessageReceiver): void
+export interface MessageRequestResponseInterface extends MessageTransmissionInterface {
+    send(message: any): Observable<any>
 }
 
-
-export interface AdapterProfile {
-    id: string,
-    transportType: Transport,
+/* ADAPTER COMPONENTS */
+export interface AdapterInterface {
 }
 
-
-
-
-export interface AdapterBase {
-    connector: TransportService // this one will refer to the actual tranpsort service like websocket and so on
-    adapterProfile: AdapterProfile
-    event: Subject<TransportEvent>
-
-    getInfo(): AdapterProfile
-    setAdapterProfile(id: string, transportType: Transport): void
-}
-
-
-export interface TransmitterAdapter extends AdapterBase {
+export interface TransmitterAdapterInterface extends AdapterInterface {
     emit(message: WrappedMessage): void
 }
 
-export interface ReceiverAdapter extends AdapterBase {
-    getMessageBus(bus: Bus): Observable<any>
+export interface ReceiverAdapterInterface extends AdapterInterface {
+    subscribe(): Observable<any>
 }
 
+
 /* Utils */
 export interface TransmissionProfile {
     id: string,
@@ -103,17 +76,8 @@ export interface ReceiverProfile extends TransmissionProfile {
 }
 
 export interface RequestResponseProfile extends TransmissionProfile {
-
-}
-
-export enum Bus {
-    GeneralBus,
-    ResponseMessageBus,
-    ErrorMessageBus,
-    NotificationMessageBus
 }
 
-
 export interface TransportMessage {
     id: string,
     dateCreated: Date,
@@ -130,13 +94,12 @@ export interface FisMessage {
     data: any
 }
 
-
 export interface Transmission {
     clientId: string,
-    transmitter: MessageTransmitter,
-    receiver: MessageReceiver,
-    requestResponse: MessageRequestResponse,
-    event: Observable<TransportEvent>
+    transmitter: MessageTransmitterInterface,
+    receiver: MessageReceiverInterface,
+    requestResponse: MessageRequestResponseInterface,
+    event: Observable<GeneralEvent>
 }
 
 export type Transport = 'Websocket' | 'Http' | 'TCP'
@@ -148,8 +111,8 @@ export enum AdapterTransmissionRole {
 }
 
 
-
-export type EventType = 'Server Started' | 'New Client' | 'Client Connected' | 'Client Re-connected' | 'Client Disconnected' | `Server Disconnected` | 'New Message' | `Notification` | `New Server` | `Server Connected` | `New Transport` | 'New Adapter' | 'Re-Flush'
+export type EventType = `General Event` | 'Transport Event' | 'Transmission Event' | 'Adapter Event'
+export type EventMessage = 'Server Started' | 'New Client' | 'Client Connected' | 'Client Re-connected' | 'Client Disconnected' | `Server Disconnected` | 'New Message' | `Notification` | `New Server` | `Server Connected` | `New Transport` | 'New Adapter' | 'Re-Flush' | 'New Transport' | 'New Transmission'
 export type ConnectionState = 'ONLINE' | 'OFFLINE'
 export interface TransportService {
     getInfo(): Transport
@@ -166,3 +129,15 @@ export interface ClientObject {
     dateCreated: Date,
     connectionState: BehaviorSubject<ConnectionState>
 }
+
+
+export interface AdapterProfile {
+    id: string,
+    transportType: Transport,
+}
+
+export interface EventObject {
+    globalEvent: Observable<GeneralEvent>,
+    transportEvent: Observable<TransportEvent>,
+    adapterEvent: Observable<AdapterEvent>
+}

+ 0 - 84
src/interface/transport.interface.ts

@@ -1,84 +0,0 @@
-import { Observable, Subject } from "rxjs";
-import { AdaptorTransmissionRole, TransportEvent } from "./connector.interface";
-import { MessageTransmissionTransmitter } from "../transmission/msg.transmission.transmitter";
-import { MessageTransmissionReceiver } from "../transmission/msg.transmission.receiver";
-import { RetransmissionService } from "../utils/retransmission.service";
-import { Adapter } from "../connector/adapter.base";
-export interface MessageTransmissionManager {
-    subscribe(): Observable<MessageTransmissionSet>
-    getEvent(): Observable<TransportEvent>
-}
-
-export interface MessageTransmissionSet {
-    id: string,
-    transmitter: MessageTransmissionTransmitter,
-    receiver: MessageTransmissionReceiver,
-    requestResponse: MessageRequestResponse,
-    event: Observable<TransportEvent>
-}
-
-export interface MessageTransmissionBase {
-    transmissionRole: AdaptorTransmissionRole
-    mainAdapter: Adapter
-
-    getInfo(): TransmissionProfile
-    setUpAdapter(adapter: Adapter): void
-}
-
-export interface MessageReceiver extends MessageTransmissionBase {
-    receiverProfile: ReceiverProfile
-
-    getMessageBus(bus: Bus): Observable<any>
-    setReceiver(receiverProfile: ReceiverProfile, event: Observable<TransportEvent>): void
-}
-
-export interface MessageTransmitter extends MessageTransmissionBase {
-    transmitterProfile: TransmitterProfile
-    retransmission: RetransmissionService
-
-    setTransmitter(transmitterProfile: TransmitterProfile, event: Observable<TransportEvent>): void
-}
-
-export interface MessageRequestResponse extends MessageTransmissionBase {
-    transmitterInstance: MessageTransmissionTransmitter
-    receiverInstance: MessageTransmissionReceiver
-
-    setTransmissionProfile(transmissionInfo: MessageTransmissionTransmitter, receiverInfo: MessageTransmissionReceiver): void
-}
-
-export interface FisMessage {
-    header: {
-        messageID: string,
-        messageName: `NotificationMessage` | `ResponseMessage` | `RequestMessage`
-    },
-    data: any
-}
-
-export interface TransmissionProfile {
-    id: string,
-    name: string,
-    dateCreated: Date
-}
-
-export interface TransmitterProfile extends TransmissionProfile {
-}
-
-export interface ReceiverProfile extends TransmissionProfile {
-}
-
-export interface RequestResponseProfile extends TransmissionProfile {
-
-}
-
-export enum Bus {
-    GeneralBus,
-    ResponseMessageBus,
-    ErrorMessageBus,
-    NotificationMessageBus
-}
-
-export interface EventMessage {
-    clientId: string,
-    message: string,
-    payload?: any
-}

+ 6 - 7
src/test/receiver.ts

@@ -1,10 +1,9 @@
 import { filter, interval, map, Observable, Observer, Subject } from "rxjs";
-import { Bus, FisMessage, MessageTransmissionSet } from "../interface/transport.interface";
 import { v4 as uuidv4 } from 'uuid'
 import { MessageTransmissionManager } from "../transmission/msg.transmission.manager";
-import { TransportEvent, TransportMessage } from "../interface/connector.interface";
 import { WrappedMessage } from "../utils/message.ordering";
 import ConsoleLogger from "../utils/log.utils";
+import { FisMessage, Transmission, TransportEvent, TransportMessage } from "../interface/interface";
 
 class Supervisor {
     private generalBus: Subject<TransportEvent> = new Subject()
@@ -12,14 +11,14 @@ class Supervisor {
     private isClient: boolean = true
     private transmissionManager!: MessageTransmissionManager
     private event: Observable<TransportEvent>
-    private transmissionSets: MessageTransmissionSet[] = []
+    private transmissionSets: Transmission[] = []
     private outgoingPipe: Subject<any> = new Subject()
 
     constructor() {
         this.transmissionManager = new MessageTransmissionManager(this.isClient)
         this.event = this.transmissionManager.getEvent()
 
-        this.transmissionManager.subscribe().subscribe((transmissionSet: MessageTransmissionSet) => {
+        this.transmissionManager.subscribe().subscribe((transmissionSet: Transmission) => {
             this.console.log({ message: `Acquired transmission set for client` })
             this.transmissionSets.push(transmissionSet)
 
@@ -29,9 +28,9 @@ class Supervisor {
     }
 
     // only called once for each connected clients.
-    private handleActivity(messageTransmission: MessageTransmissionSet): void {
+    private handleActivity(messageTransmission: Transmission): void {
         // start listening to incoming messages from this client
-        messageTransmission.receiver.getMessageBus(Bus.GeneralBus).subscribe((event: TransportEvent) => {
+        messageTransmission.receiver.subscribe().subscribe((event: TransportEvent) => {
             this.console.log({ message: `General Bus ${event.event} ${(((event.data as TransportMessage)?.payload as WrappedMessage)?.payload as FisMessage)?.header?.messageID ?? 'Not Message'}`, details: event })
             this.generalBus.next(event)
         })
@@ -52,7 +51,7 @@ class Supervisor {
         // this.startGeneratingRequest(1000, this.outgoingPipe)
     }
 
-    private request(request: FisMessage, messageTransmission: MessageTransmissionSet): Observable<any> {
+    private request(request: FisMessage, messageTransmission: Transmission): Observable<any> {
         return new Observable((response: Observer<any>) => {
             messageTransmission.transmitter.emit(request)
             this.generalBus.pipe(

+ 5 - 6
src/test/transmitter.ts

@@ -1,17 +1,16 @@
 import { filter, interval, map, Observable, Observer, Subject, take } from "rxjs";
-import { Bus, EventMessage, FisMessage, MessageTransmissionSet } from "../interface/transport.interface";
 import { v4 as uuidv4 } from 'uuid'
 import { MessageTransmissionManager } from "../transmission/msg.transmission.manager";
-import { TransportEvent, TransportMessage } from "../interface/connector.interface";
 import { WrappedMessage } from "../utils/message.ordering";
 import ConsoleLogger from "../utils/log.utils";
+import { FisMessage, Transmission, TransportEvent, TransportMessage } from "../interface/interface";
 class Supervisor {
     private console = new ConsoleLogger('Supervisor', ['base'])
     private clientIncomingMessage: Subject<FisMessage> = new Subject()
     private messageProducer!: MessageProducer
     private transmissionManager!: MessageTransmissionManager
     private event!: Observable<TransportEvent>
-    private transmissionSets: MessageTransmissionSet[] = []
+    private transmissionSets: Transmission[] = []
 
     constructor() {
         // so need them adapters now. But supervisor shouldn't be concerned, only messageTransmissionManager and ConnectionManager
@@ -19,7 +18,7 @@ class Supervisor {
         this.transmissionManager = new MessageTransmissionManager()
         this.event = this.transmissionManager.getEvent()
 
-        this.transmissionManager.subscribe().subscribe((transmissionSet: MessageTransmissionSet) => {
+        this.transmissionManager.subscribe().subscribe((transmissionSet: Transmission) => {
             this.transmissionSets.push(transmissionSet)
 
             this.handleClientActivity(transmissionSet)
@@ -27,9 +26,9 @@ class Supervisor {
     }
 
     // only called once for each connected clients.
-    private handleClientActivity(messageTransmission: MessageTransmissionSet): void {
+    private handleClientActivity(messageTransmission: Transmission): void {
         // start listening to incoming messages from this client
-        messageTransmission.receiver.getMessageBus(Bus.GeneralBus).subscribe((event: TransportEvent) => {
+        messageTransmission.receiver.subscribe().subscribe((event: TransportEvent) => {
             let requestMessage: FisMessage = ((event.data as TransportMessage).payload as WrappedMessage).payload as FisMessage
             this.console.log({ message: `General Bus ${requestMessage?.header?.messageID ?? 'Not a message'}`, details: event }) // receiving end
             this.clientIncomingMessage.next(requestMessage)

+ 6 - 18
src/transmission/msg.transmission.base.ts

@@ -1,26 +1,14 @@
-
 import { Observable } from 'rxjs';
-import { AdaptorTransmissionRole, TransportEvent } from '../interface/connector.interface';
-import { MessageTransmissionBase as MessageTransmissionBaseInterface, TransmissionProfile } from '../interface/transport.interface'
-import { v4 as uuidv4 } from 'uuid'
-import { Adapter } from '../connector/adapter.base';
+import { AdapterEvent, AdapterInterface, EventObject, MessageTransmissionInterface } from '../interface/interface'
 
-export class MessageTransmissionBase implements MessageTransmissionBaseInterface {
-    event!: Observable<TransportEvent>
-    transmissionRole!: AdaptorTransmissionRole;
-    adaptorsArray: Array<Adapter> = []
-    mainAdapter!: Adapter;
+export class MessageTransmissionBase implements MessageTransmissionInterface {
+    protected clientId!: string;
+    protected adapters: AdapterInterface[] = []
+    protected eventObj!: EventObject
 
     constructor() {
         // logic here
     }
 
-    getInfo(): TransmissionProfile {
-        throw new Error(`Method not implemented`)
-    }
-
-    setUpAdapter(adapter: Adapter): void {
-        throw new Error(`Method not implemented`)
-    }
+}
 
-}

+ 38 - 0
src/transmission/msg.transmission.manager.base.ts

@@ -0,0 +1,38 @@
+
+import { Observable, Subject } from 'rxjs';
+import { AdapterEvent, GeneralEvent, MessageTransmissionManagerInterface, Transmission, TransmissionEvent, TransportEvent } from '../interface/interface';
+
+export class MessageTransmissionManagerBase implements MessageTransmissionManagerInterface {
+    private globalEvent!: Subject<GeneralEvent>
+    private transportEvent!: Subject<TransportEvent>
+    private adapterEvent!: Subject<AdapterEvent>
+    protected eventObj = {
+        globalEvent: this.globalEvent,
+        transportEvent: this.transportEvent,
+        adapterEvent: this.adapterEvent
+    }
+
+    constructor() {
+        // logic here
+    }
+
+    public subscribe(): Observable<TransmissionEvent> {
+        throw new Error(`Method not implemented`)
+        /* Public interface to be used by 'client' to start transmitting and receiving. 
+        It will listen to adapterEvent, if adapter is not associated with any existing clientID that are connected
+        to the releveant transport service, than it will just pump in to the adapterEvent as ususal. But if it's a
+        new lcient, then a new Transmission is instantiated particularly with that client. Then an adapteverEvent
+        will also be attached to the transmission as well as toher relevant information so that a set of adapters
+        can be instantiated to be used accordingly. */
+    }
+
+    protected instantiateTransmissionComponents(clientId: string): Transmission {
+        throw new Error(`Method not implemented`)
+        /* Subscribe to adapterEvent, adapter will give adapter with the relevant client information. 
+        It will first check array of transmission clinetID, so if new clientId is detected then a new 
+        transmission set will be instantiated. Then an observable instance for each transmission components
+        will be attacked to each of the transmission componetns, so that the respective transmission 
+        can handle more or less of the adapters instantiated by the adapter manager based on the new
+        adapters or it ohter cases, terminating of adaptesr as well. */
+    }
+}

+ 53 - 89
src/transmission/msg.transmission.manager.ts

@@ -1,125 +1,89 @@
 import { MessageTransmissionTransmitter } from "./msg.transmission.transmitter";
 import { MessageTransmissionReceiver } from "./msg.transmission.receiver";
 import { AdapterManager } from "../connector/adapter.manager";
-import { EventMessage, MessageTransmissionSet, MessageTransmissionManager as MessageTransmissionManagerInterface, ReceiverProfile, TransmitterProfile } from "../interface/transport.interface";
 import { v4 as uuidv4 } from 'uuid'
-import { TransportEvent, Event } from "../interface/connector.interface";
 import { MessageTransmissionRequestResponse } from "./msg.transmission.request-response";
 import { filter, Observable, Observer, Subject } from "rxjs";
 import ConsoleLogger from "../utils/log.utils";
 import { TransmitterAdapter } from "../connector/adapter.transmitter";
 import { ReceiverAdapter } from "../connector/adapter.receiver";
-import { AdapterSet } from "../interface/general.interface";
+import { AdapterEvent, AdapterInterface, EventMessage, EventObject, GeneralEvent, Transmission, TransmissionEvent, TransportEvent, TransportService } from '../interface/interface'
+import { MessageTransmissionManagerBase } from "./msg.transmission.manager.base";
 
-export class MessageTransmissionManager implements MessageTransmissionManagerInterface {
+export class MessageTransmissionManager extends MessageTransmissionManagerBase {
     private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionManager`, ['managers'])
     private browserEnv!: boolean
-    transmission: MessageTransmissionSet[] = []
-    connectionManager!: AdapterManager
-    event!: Subject<TransportEvent>
+    private transmissionSet: Transmission[] = []
+    private adapterManager!: AdapterManager
 
-    constructor(browserEnv?: boolean) {
-        if (browserEnv) this.browserEnv = browserEnv
-        // logic here
+    constructor(transportRef: Observable<TransportService>, browserEnv?: boolean) {
+        super()
         this.console.log({ message: `Constructing self...` })
-        this.event = new Subject()
-        this.connectionManager = new AdapterManager(this.event, browserEnv)
-
-        // this.event.subscribe(event => this.console.log({ message: 'event', details: event }))
+        this.eventObj.globalEvent = new Subject()
+        this.eventObj.transportEvent = new Subject()
+        transportRef.subscribe((transport: TransportService) => {
+            this.eventObj.transportEvent.next({
+                id: uuidv4(),
+                type: `Transport Event`,
+                event: `New Transport`,
+                date: new Date(),
+                data: transport
+            } as TransportEvent)
+        })
+        if (browserEnv) this.browserEnv = browserEnv
+        // Subscribe for adapterManager and it's relevent event
+        this.adapterManager = new AdapterManager(this.eventObj, browserEnv)
+        this.adapterManager.subscribe().subscribe(this.eventObj.adapterEvent)
 
-        // note that if this server is down, all these instances of transmission and connector would be lost as well. SO cannot just simply find "instances" and reuse them. Must reinstantiate them again
-        this.handleEvent('Client Re-connected' as Event, this.event)
     }
 
-    /* so there will be some changes here. will nto be assigning just one, but all of them dynamically to pour into this boy
-    Transmitter only have to call this once. */
-    subscribe(): Observable<MessageTransmissionSet> {
-        return new Observable((observer: Observer<MessageTransmissionSet>) => {
-            const targetEvent: Event = this.browserEnv ? 'New Server' : 'New Client';
-            this.event.pipe(
+    public subscribe(): Observable<TransmissionEvent> {
+        return new Observable((observer: Observer<TransmissionEvent>) => {
+            const targetEvent: EventMessage = this.browserEnv ? 'New Server' : 'New Client';
+            this.eventObj.transportEvent.pipe(
                 filter(event => event.event == targetEvent)
             ).subscribe(event => {
                 // get all adapters for all the connection
-                let messageTransmissionSet: MessageTransmissionSet | undefined = this.instantiateComponents((event.data as EventMessage).clientId)
-                if (messageTransmissionSet) {
-                    observer.next(messageTransmissionSet)
+                let transmission: Transmission | undefined = this.instantiateTransmissionComponents(event?.data?.clientId)
+                if (transmission) {
+                    observer.next({
+                        id: uuidv4(),
+                        type: `Transmission Event`,
+                        event: 'New Transmission',
+                        date: new Date(),
+                        transmission: transmission
+                    })
                 }
             })
         })
     }
 
-    getEvent(): Observable<TransportEvent> {
-        return this.event.asObservable()
-    }
-
-    private instantiateComponents(clientId: string): MessageTransmissionSet | undefined {
-        this.console.log({ message: `Instantiating new transmission set for  ${this.browserEnv ? 'Server' : 'Client'}: ${clientId}` })
-        let adapterSet: AdapterSet | null = this.connectionManager.getAdapter(clientId)
-        if (adapterSet) {
-            let transmitter: MessageTransmissionTransmitter = this.getTransmitter(clientId, adapterSet.transmitterAdapter, this.event.asObservable())
-            let receiver: MessageTransmissionReceiver = this.getReceiver(clientId, adapterSet.receiverAdapter, this.event.asObservable())
-            let requestResponse: MessageTransmissionRequestResponse = this.getRequestResponse(transmitter, receiver, this.event.asObservable())
-            let transmission: MessageTransmissionSet = {
-                id: clientId,
-                transmitter: transmitter,
-                receiver: receiver,
-                requestResponse: requestResponse,
-                event: this.event.asObservable()
-            }
-            this.transmission.push(transmission)
-            return transmission
-        } else {
-            this.console.error({ message: 'No Adapter Set' })
-            return undefined
+    private instantiateTransmissionComponents(clientId: string): Transmission {
+        let receiverInstance: MessageTransmissionReceiver = this.getReceiver(clientId, this.eventObj)
+        let transmitterInstance: MessageTransmissionTransmitter = this.getTransmitter(clientId, this.eventObj)
+        let requestResponseInstance: MessageTransmissionRequestResponse = this.getRequestResponse(clientId, this.eventObj, transmitterInstance, receiverInstance)
+        let transmissionObj: Transmission = {
+            clientId: clientId,
+            transmitter: transmitterInstance,
+            receiver: receiverInstance,
+            requestResponse: requestResponseInstance,
+            event: this.eventObj.globalEvent
         }
-    }
 
-    private getTransmitter(transmissionId: string, adapter: TransmitterAdapter, event: Observable<TransportEvent>): MessageTransmissionTransmitter {
-        let transmitterProfile: TransmitterProfile = {
-            id: transmissionId,
-            name: `${adapter.getInfo().transportType} Transmitter Adapter`,
-            dateCreated: new Date()
-        }
-        return new MessageTransmissionTransmitter(transmitterProfile, adapter, event)
+        return transmissionObj
     }
 
-    private getReceiver(transmissionId: string, adapter: ReceiverAdapter, event: Observable<TransportEvent>): MessageTransmissionReceiver {
-        let receiverProfile: ReceiverProfile = {
-            id: transmissionId,
-            name: `${adapter.getInfo().transportType} Receiver Adapter`,
-            dateCreated: new Date()
-        }
-        return new MessageTransmissionReceiver(receiverProfile, adapter, event)
+    private getReceiver(clientId: string, eventObj: EventObject): MessageTransmissionReceiver {
+        throw new Error(`Method not defined`)
     }
-
-    private getRequestResponse(transmitterInstance: MessageTransmissionTransmitter, receiverInstance: MessageTransmissionReceiver, event: Observable<TransportEvent>): MessageTransmissionRequestResponse {
-        return new MessageTransmissionRequestResponse(transmitterInstance, receiverInstance, event)
+    private getTransmitter(clientId: string, eventObj: EventObject): MessageTransmissionTransmitter {
+        throw new Error(`Method not defined`)
     }
-
-    private handleEvent(eventName: Event, eventObs: Observable<TransportEvent>): void {
-        eventObs.pipe(
-            filter((event: TransportEvent) => event.event === eventName)
-        ).subscribe(event => {
-            // assuming this is reconnection case
-            this.reconnectionHandler((event.data as EventMessage).clientId)
-            // can include more event handlers here
-        })
+    private getRequestResponse(clientId: string, eventObj: EventObject, transmitter: MessageTransmissionTransmitter, receiver: MessageTransmissionReceiver): MessageTransmissionRequestResponse {
+        throw new Error(`Method not defined`)
     }
 
-    private reconnectionHandler(clientId: string): void {
-        this.console.log({ message: `TransmissionManager: A reconnection occured. Client: ${clientId}` })
-        let transmissionObj: MessageTransmissionSet | undefined = Array.from(this.transmission).find(obj => obj.id === clientId)
-        if (!transmissionObj) {
-            let transmissionSet: MessageTransmissionSet | undefined = this.instantiateComponents(clientId)
-            if (transmissionSet) {
-                this.transmission.push(transmissionSet)
-            } else {
-                this.console.error({ message: `Cannot find client transmission obj : ${clientId}` })
-            }
-        } {
-            this.console.log({ message: `Transmission Object for ${clientId} Found` })
-        }
-    }
+
 }
 
 

+ 29 - 42
src/transmission/msg.transmission.receiver.ts

@@ -1,63 +1,50 @@
 import { filter, Observable, Observer, Subject, Subscription } from 'rxjs';
-import { TransportEvent, TransportMessage } from '../interface/connector.interface';
-import { MessageTransmissionBase } from './msg.transmission.base';
-import { Bus, MessageReceiver as MessageReceiverInterface, ReceiverProfile } from '../interface/transport.interface'
 import { v4 as uuidv4 } from 'uuid'
 import { ReceiverAdapter } from '../connector/adapter.receiver';
 import { checkMessage, WrappedMessage } from '../utils/message.ordering';
 import ConsoleLogger from '../utils/log.utils';
-import { Adapter } from '../connector/adapter.base';
+import { MessageTransmissionBase } from './msg.transmission.base';
+import { AdapterEvent, AdapterInterface, MessageReceiverInterface, ReceiverAdapterInterface, TransportEvent, TransportMessage } from '../interface/interface';
 
 export class MessageTransmissionReceiver extends MessageTransmissionBase implements MessageReceiverInterface {
     private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionReceiver`, ['transmission'])
     private onHoldMessage: Subject<WrappedMessage> = new Subject()
+    private currentAdapter!: ReceiverAdapterInterface
     // private toBePassedOver: Subject<WrappedMessage> = new Subject()
-    receiverProfile!: ReceiverProfile;
 
-    constructor(profile: ReceiverProfile, adapter: ReceiverAdapter, event: Observable<TransportEvent>) {
+    constructor(clientId: string, adapterEvent: Observable<AdapterEvent>) {
         super()
-        this.event = event
-        this.console.log({ message: `Constructing Receiver Transmission with ${profile.name}` })
-
-        this.setReceiver(profile)
-        this.setUpAdapter(adapter)
+        this.clientId = clientId
+        this.adapterEvent = adapterEvent
     }
 
-    setReceiver(receiverProfile: ReceiverProfile): void {
-        this.receiverProfile = receiverProfile
-    }
-
-    getMessageBus(bus: Bus): Observable<TransportEvent> {
-        this.console.log({ message: `Transmission getting message bus for ${this.receiverProfile.id}` })
+    subscribe(): Observable<TransportEvent> {
+        this.console.log({ message: `Transmission getting message bus for ${this.clientId}` })
         return new Observable((observable: Observer<TransportEvent>) => {
             // logic here
-            if (bus == Bus.GeneralBus) {
-                // Need to merge all the adapters into one when the time comes 
-                // SAMPLE: This adapterArray.forEach(adapter => { ... })
-                const subscription: Subscription = (this.mainAdapter as ReceiverAdapter).getMessageBus(Bus.GeneralBus).pipe(
-                    filter((event: TransportEvent) => event.event == 'New Message'),
-                ).subscribe((event: TransportEvent) => {
-                    // console.log(event) // data is transportMessage instead of eventmessage
-                    this.onHoldMessage.next(((event.data as TransportMessage).payload as WrappedMessage))
-                    checkMessage(((event.data as TransportMessage).payload as WrappedMessage), this.onHoldMessage).then(() => {
-                        // only release the message before it exists
-                        this.console.log({ message: `This one passes. Does have previousID. Case for message ordering` })
-                        // console.log(((event.data as TransportMessage).payload as WrappedMessage))
-                        observable.next(event);
-                    }).catch((error) => {
-                        this.console.log({ message: `Observer Error`, details: error })
-                    })
-                });
+            // Need to merge all the adapters into one when the time comes 
+            // SAMPLE: This adapterArray.forEach(adapter => { ... })
+            const subscription: Subscription = this.currentAdapter.subscribe().pipe(
+                filter((event: TransportEvent) => event.event == 'New Message'),
+            ).subscribe((event: TransportEvent) => {
+                // console.log(event) // data is transportMessage instead of eventmessage
+                this.onHoldMessage.next(((event.data as TransportMessage).payload as WrappedMessage))
+                checkMessage(((event.data as TransportMessage).payload as WrappedMessage), this.onHoldMessage).then(() => {
+                    // only release the message before it exists
+                    this.console.log({ message: `This one passes. Does have previousID. Case for message ordering` })
+                    // console.log(((event.data as TransportMessage).payload as WrappedMessage))
+                    observable.next(event);
+                }).catch((error) => {
+                    this.console.log({ message: `Observer Error`, details: error })
+                })
+            });
+
+            // Clean up on unsubscription
+            return () => {
+                subscription.unsubscribe();
+            };
 
-                // Clean up on unsubscription
-                return () => {
-                    subscription.unsubscribe();
-                };
-            }
         })
     }
 
-    setUpAdapter(adapter: Adapter): void {
-        this.mainAdapter = adapter
-    }
 }

+ 50 - 30
src/transmission/msg.transmission.transmitter.ts

@@ -1,6 +1,4 @@
 import { MessageTransmissionBase } from "./msg.transmission.base";
-import { EventMessage, FisMessage, MessageTransmitter as MessageTransmitterInterface, TransmitterProfile } from '../interface/transport.interface'
-import { ConnectionState, TransportEvent, TransportMessage } from "../interface/connector.interface";
 import { v4 as uuidv4 } from 'uuid'
 import { BehaviorSubject, distinct, distinctUntilChanged, filter, map, Observable, Subject } from "rxjs";
 import { RetransmissionService } from "../utils/retransmission.service";
@@ -8,6 +6,7 @@ import { WrappedMessage } from "../utils/message.ordering";
 import ConsoleLogger from "../utils/log.utils";
 import { Adapter } from "../connector/adapter.base";
 import { TransmitterAdapter } from "../connector/adapter.transmitter";
+import { AdapterEvent, AdapterInterface, ConnectionState, EventObject, FisMessage, MessageTransmitterInterface, TransportEvent, TransportMessage } from "../interface/interface";
 
 /* Take in all the messages that needs to be transported, and divide them accordingly. So the connector instances will do just that
 connectors or adapters will have their own identifier*/
@@ -15,27 +14,31 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
     private connectionStateEvent: BehaviorSubject<ConnectionState> = new BehaviorSubject<ConnectionState>('OFFLINE')
     private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionTransmitter`, ['transmission'])
     private messageToBeTransmitted!: Subject<FisMessage | WrappedMessage>
-    transmitterProfile!: TransmitterProfile;
-    retransmission!: RetransmissionService;
+    private buffer!: RetransmissionService;
+    private currentAdapter!: TransmitterAdapter
 
-    constructor(profile: TransmitterProfile, adapter: TransmitterAdapter, event: Observable<TransportEvent>) {
+    constructor(clientId: string, eventObj: EventObject) {
         super()
-        this.console.log({ message: `Constructing Transmitter Transmission with ${profile.name}` })
-        this.event = event
+        this.console.log({ message: `Constructing Transmitter Transmission with ${clientId}` })
+        this.eventObj = eventObj
         this.messageToBeTransmitted = new Subject()
-        this.retransmission = new RetransmissionService()
-        this.setTransmitter(profile)
-        this.setUpAdapter(adapter)
+        this.buffer = new RetransmissionService()
         this.setupBuffer()
+        this.handleAdapters(eventObj.adapterEvent)
 
         // special case just for http in case of server/client disconnected, the unsent msg will be flushed back into messageToBeTransmitted
-        this.uniqueHandlerToFlushUnsentMessages(event)
+        // logic here
     }
 
-    setupBuffer(): void {
+    public emit(message: FisMessage): void {
+        this.console.log({ message: `${this.connectionStateEvent.getValue() == 'ONLINE' ? `Transmitting message` : `Buffering message`}` })
+        this.messageToBeTransmitted.next(message)
+    }
+
+    private setupBuffer(): void {
         this.console.log({ message: `Setting up Retransmission Service...` })
-        this.event.pipe(
-            filter(event => (event.data as EventMessage).clientId == this.transmitterProfile.id),
+        this.eventObj.transportEvent.pipe(
+            filter(event => event.data.clientId == this.clientId),
             filter(event => event.event == 'Client Disconnected' || event.event == 'Client Re-connected' || event.event == 'Client Connected' || event.event == 'Server Disconnected' || event.event == 'Server Connected'),
             map(event => {
                 if (event.event == 'Client Disconnected' || event.event == 'Server Disconnected') {
@@ -47,39 +50,56 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
             distinctUntilChanged()
         ).subscribe((signal: ConnectionState) => {
             this.connectionStateEvent.next(signal)
-            if (signal == 'OFFLINE') this.console.error({ message: `${this.transmitterProfile.id} disconnected` })
-            if (signal == 'ONLINE') this.console.log({ message: `${this.transmitterProfile.id} connected` })
+            if (signal == 'OFFLINE') this.console.error({ message: `${this.clientId} disconnected` })
+            if (signal == 'ONLINE') this.console.log({ message: `${this.clientId} connected` })
         })
-        this.retransmission.implementRetransmission(this.messageToBeTransmitted, this.connectionStateEvent.asObservable(), true)
+        this.buffer.implementRetransmission(this.messageToBeTransmitted, this.connectionStateEvent.asObservable(), true)
         // automatically subscribe to allow released bffered messages to be released
-        this.retransmission.returnSubjectForBufferedItems().subscribe((bufferedMessage: WrappedMessage) => {
+        this.buffer.returnSubjectForBufferedItems().subscribe((bufferedMessage: WrappedMessage) => {
             // need to work with wrapped messages
             this.console.log({ message: `Releasing ${bufferedMessage.thisMessageID}` });
-            (this.mainAdapter as TransmitterAdapter).emit(bufferedMessage)
+            (this.currentAdapter as TransmitterAdapter).emit(bufferedMessage)
         })
     }
 
-    setTransmitter(transmitterProfile: TransmitterProfile): void {
-        this.transmitterProfile = transmitterProfile
+    private handleAdapters(adaptersEvent: Observable<AdapterEvent>): void {
+        if (Array.isArray(event.adapters)) {
+            if (event.adapters.length > 0) {
+                event.adapters.forEach(adapter => {
+                    this.adapters.push(adapter)
+                })
+            }
+        } else {
+            this.adapters.push(event.adapters)
+        }
     }
 
-    emit(message: FisMessage): void {
-        this.console.log({ message: `${this.connectionStateEvent.getValue() == 'ONLINE' ? `Transmitting message` : `Buffering message`}` })
-        this.messageToBeTransmitted.next(message)
+    private handleNewAdapters(adaptersEvent: Observable<AdapterEvent>): void {
+        adaptersEvent.pipe(
+            filter(event => event.event === `New Adapter`),
+            map(event => { return event.adapters }),
+        ).subscribe({
+            next: (adapters: ) => {
+
+            }
+        })
+    }
+
+    private handleAdaptersTermination(adaptersEvent: Observable<AdapterEvent>): void {
+        adaptersEvent.subscribe
     }
 
-    setUpAdapter(adapter: Adapter): void {
-        // for now just hardcode to use 1 adapter type until connection manager is further enhacne to configure adapters on the fly
-        this.mainAdapter = adapter
+    private setUpAdapter(adapter: TransmitterAdapter): void {
+        this.currentAdapter = adapter
     }
 
     private uniqueHandlerToFlushUnsentMessages(event: Observable<TransportEvent>): void {
         event.pipe(
             filter(event => event.event == 'Re-Flush'),
-            filter(event => (event.data as EventMessage).clientId == this.transmitterProfile.id),
+            filter(event => event.data.clientId == this.clientId),
         ).subscribe((event: TransportEvent) => {
-            this.console.log({ message: `${this.connectionStateEvent.getValue() == 'ONLINE' ? `Transmitting ${(((event.data as EventMessage).payload as TransportMessage).payload as WrappedMessage).thisMessageID}` : `Buffering ${(((event.data as EventMessage).payload as TransportMessage).payload as WrappedMessage).thisMessageID}`}` })
-            this.messageToBeTransmitted.next((((event.data as EventMessage).payload as TransportMessage).payload as WrappedMessage))
+            this.console.log({ message: `${this.connectionStateEvent.getValue() == 'ONLINE' ? `Transmitting ${((event.data.payload as TransportMessage).payload as WrappedMessage).thisMessageID}` : `Buffering ${((event.data.payload as TransportMessage).payload as WrappedMessage).thisMessageID}`}` })
+            this.messageToBeTransmitted.next(((event.data.payload as TransportMessage).payload as WrappedMessage))
         })
     }