Browse Source

need to be fixed

enzo 5 days ago
parent
commit
c0fc4307cd

+ 15 - 0
doc/explanation.txt

@@ -1,3 +1,18 @@
+2 Jan 2025:
+Things to do: 
+- Get the socket transwmission working first.
+- Sepearate out the logic for receiver Identification, make it modular 
+- Put actor concept aside. That one will be a separate project
+
+
+
+
+
+
+
+
+
+
 Concept as of 5/12/2024:
 So, when an application run, it will instantiate the transmission manager that will in turn instantiate adapter manager. The adapter manager will turn first check the 
 configuration file, whether a config json file or .env depending on the environment, and set up the necessary transport services in order to be able to create the 

+ 1 - 1
src/actor/transmission.actor.ts

@@ -1,7 +1,7 @@
 import { filter, map, Observable, Observer, Subject, Subscription, Unsubscribable } from "rxjs"
 import { ActorBase } from "../base/actor.base"
 import { FisMessage } from "../interface/interface"
-import { ActorInterface, ActorMessage, ActorProfile, TransmisisonProfile } from "../interface/actor.interface"
+import { ActorInterface, ActorMessage, ActorProfile, TransmisisonProfile } from "../interface/actor.sample"
 import { unsubscribe } from "diagnostics_channel"
 import ConsoleLogger from "../utils/log.utils"
 

+ 57 - 0
src/adapters/adapter.manager.ts

@@ -0,0 +1,57 @@
+import { filter, Observable, Observer, Subject } from "rxjs"
+import config from '../config/config.json';
+import { v4 as uuidv4 } from 'uuid'
+import ConsoleLogger from "../utils/log.utils"
+import { AdapterInterface, ClientObject, GeneralEvent, ReceiverAdapterInterface, TransmitterAdapterInterface, Transport, TransportService, TransportSet } from "../interface/interface"
+import { TransmitterAdapter } from "./adapter.transmitter";
+import { ReceiverAdapter } from "./adapter.receiver";
+import { AdapterManagerBase } from "../base/adapter.manager.base";
+
+/* Note: There will be a need to use the logic in place for socket utility. Especially for client identification
+Will think about that later. Because that mechanism needs to be made universal somehow. If let's say utilizing already
+existing transport, there are no logic in place to exchange information to identify connected clients. */
+
+export class AdapterManager extends AdapterManagerBase {
+    private console: ConsoleLogger = new ConsoleLogger(`AdapterManager`, ['managers'])
+
+    constructor(event: Subject<GeneralEvent<any>>, browserEnv?: boolean) {
+        super()
+        this.console.log({ message: `Contructing self...` })
+        this.event = event
+
+    }
+
+    public subscribeForAdapters(): Observable<AdapterInterface> {
+        return new Observable((observer: Observer<AdapterInterface>) => {
+            this.event.pipe(
+                // usually a new client is notified from the transport service.
+                filter(event => event.event === `New Client`)
+            ).subscribe({
+                next: (event: GeneralEvent<any>) => {
+                    this.handleTransportEvent(event, this.event, event.transport)                                              
+                }
+            })
+        })
+    }
+
+    private handleTransportEvent(event: GeneralEvent<ClientObject>, adapterEvent: Subject<GeneralEvent<any>>, transport: Transport): void {
+        let transportService: TransportService | undefined = this.transportServiceArray.find(obj => obj.getInfo() === transport)
+        if (transportService) {
+            let transmitterAdapter: TransmitterAdapterInterface = new TransmitterAdapter(event.data.clientId, event.transport, transportService)
+            let receiverAdapter: ReceiverAdapterInterface = new ReceiverAdapter(event.data.clientId, event.transport, transportService)
+            adapterEvent.next({
+                id: uuidv4(),
+                event: 'New Adapter',
+                type: 'Adapter Event',
+                date: new Date(),
+                data: [transmitterAdapter, receiverAdapter]
+            } as GeneralEvent<AdapterInterface[]>)
+        } else {
+            this.console.error({ message: `No ${transport} service is not properly instantiated....` })
+            throw new Error(`No ${transport} service is not properly instantiated....`)
+        }
+    }
+
+}
+
+

+ 9 - 9
src/connector/adapter.receiver.ts → src/adapters/adapter.receiver.ts

@@ -4,7 +4,7 @@ import { v4 as uuidv4 } from 'uuid'
 import ConsoleLogger from '../utils/log.utils';
 import { WrappedMessage } from '../utils/message.ordering';
 import { AdapterBase } from '../base/adapter.base';
-import { AdapterEvent, FisMessage, Transport, TransportEvent, TransportMessage, TransportService } from '../interface/interface';
+import { FisMessage, GeneralEvent, Transport, TransportMessage, TransportServiceInterface } from '../interface/interface';
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
@@ -12,7 +12,7 @@ So how?: */
 export class ReceiverAdapter extends AdapterBase {
     private console!: ConsoleLogger
 
-    constructor(adapterId: string, adapterType: Transport, transportService: TransportService) {
+    constructor(adapterId: string, adapterType: Transport, transportService: TransportServiceInterface) {
         super()
         
         this.console = new ConsoleLogger(`${adapterType}ReceiverAdapter`, ['adapter'])
@@ -20,22 +20,22 @@ export class ReceiverAdapter extends AdapterBase {
         this.transportService = transportService
     }
 
-    subscribe(): Observable<AdapterEvent> {
+    subscribeForIncoming(): Observable<GeneralEvent<any>> {
         this.console.log({ message: `Connector getting message bus for this connector: ${this.adapterID}` })
-        return new Observable((observable: Observer<AdapterEvent>) => {
+        return new Observable((observable: Observer<GeneralEvent<any>>) => {
             const subscription: Subscription = this.transportService.subscribe().pipe(
-                filter((message: TransportEvent) => message.event === 'New Message'),
+                filter((message: GeneralEvent<any>) => message.event === 'New Message'),
                 // take message only specific for this adapter. Although that itself wouldn't be necessary, considerng everything goes through transportEvent. I guess it's for better management
-                filter((message: TransportEvent) => (message.data as TransportMessage).target == this.adapterID),
-            ).subscribe((message: TransportEvent) => {
+                filter((message: GeneralEvent<TransportMessage>) => (message.data as TransportMessage).target == this.adapterID),
+            ).subscribe((message: GeneralEvent<TransportMessage>) => {
                 this.console.log({ message: `Received ${(((message.data as TransportMessage).payload as WrappedMessage).payload as FisMessage).header.messageID} from ${((message.data as TransportMessage).target)}`, details: message })
                 observable.next({
                     id: uuidv4(),
                     type: `Adapter Event`,
                     event: `New Message`,
                     date: new Date(),
-
-                } as AdapterEvent);
+                    data: message.data
+                } as GeneralEvent<TransportMessage>);
             });
 
             // Clean up on unsubscription

+ 2 - 2
src/connector/adapter.transmitter.ts → src/adapters/adapter.transmitter.ts

@@ -3,7 +3,7 @@ import { Subject } from 'rxjs';
 import { WrappedMessage } from '../utils/message.ordering';
 import ConsoleLogger from '../utils/log.utils';
 import { AdapterBase } from '../base/adapter.base';
-import { TransmissionRole, Transport, TransportMessage, TransportService } from '../interface/interface';
+import { TransmissionRole, Transport, TransportMessage, TransportServiceInterface } from '../interface/interface';
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
@@ -11,7 +11,7 @@ So how?: */
 export class TransmitterAdapter extends AdapterBase {
     private console!: ConsoleLogger
 
-    constructor(adapterId: string, adapterType: Transport, transportService: TransportService) {
+    constructor(adapterId: string, adapterType: Transport, transportService: TransportServiceInterface) {
         super()
         // logic here
         this.console = new ConsoleLogger(`${adapterType}TransmitterAdapter`, ['adapter'])

+ 1 - 1
src/base/actor.base.ts

@@ -1,5 +1,5 @@
 import { filter, Observable, Observer, Subject, Unsubscribable } from "rxjs";
-import { ActorInterface, ActorMessage, ActorProfile } from "../interface/actor.interface";
+import { ActorInterface, ActorMessage, ActorProfile } from "../interface/actor.sample";
 
 export class ActorBase<T> implements ActorInterface<T> {
     protected actorProfile!: ActorProfile

+ 7 - 10
src/base/adapter.manager.base.ts

@@ -1,22 +1,19 @@
-import { Observer, Unsubscribable } from "rxjs"
-import { AdapterInterface, AdapterManagerInterface, TransportService, TransportSet } from "../interface/interface"
-import { ActorInterface, ActorProfile } from "../interface/actor.interface"
-import { ActorBase } from "../base/actor.base"
+import { Observable, Subject } from "rxjs"
+import { AdapterInterface, AdapterManagerInterface, GeneralEvent, TransportService } from "../interface/interface"
 
-export class AdapterManagerBase<T> extends ActorBase<T> implements AdapterManagerInterface<T> {
+export class AdapterManagerBase implements AdapterManagerInterface {
+    protected event!: Subject<GeneralEvent<any>>
     protected transportServiceArray: TransportService[] = []
-    protected transportSet: TransportSet[] = []
     protected adapters: AdapterInterface[] = []
 
     constructor() {
-        super()
+        // logic here
     }
 
-    public subscribe(actorProfile: ActorProfile, observer: Partial<Observer<T>>, subscribable?: ActorInterface<T>): Unsubscribable {
-        throw new Error("Method not implemented.");
+    subscribeForAdapters(): Observable<AdapterInterface> {
+        throw new Error("Method not implemented.")
     }
 
-
 }
 
 

+ 9 - 10
src/base/msg.transmission.manager.base.ts

@@ -1,24 +1,23 @@
 
 import { Observable, Observer, Subject, Unsubscribable } from 'rxjs';
-import { MessageTransmissionManagerInterface, Transmission } from '../interface/interface';
-import { ActorInterface, ActorProfile } from '../interface/actor.interface';
+import { MessageTransmissionManagerInterface, TransmissionInterface } from '../interface/interface';
+import { ActorInterface, ActorProfile } from '../interface/actor.sample';
 import { ActorBase } from './actor.base';
-import { AdapterManager } from '../connector/adapter.manager';
+import { AdapterManager } from '../adapters/adapter.manager';
 
-export class MessageTransmissionManagerBase<T> extends ActorBase<T> implements MessageTransmissionManagerInterface<T> {
+export class MessageTransmissionManagerBase implements MessageTransmissionManagerInterface {
     protected browserEnv!: boolean
-    protected transmissionSet: Transmission[] = []
+    protected transmissionSet: TransmissionInterface[] = []
     protected adapterManager!: AdapterManager
 
     constructor() {
-        super()
+        // logic here
     }
-
-    public subscribe(actorProfile: ActorProfile, observer: Partial<Observer<T>>, subscribable?: ActorInterface<T>): Unsubscribable {
-        throw new Error("Method not implemented.");
+    subscribeForTransmission(): Observable<TransmissionInterface> {
+        throw new Error('Method not implemented.');
     }
 
-    protected instantiateTransmissionComponents(clientId: string): Transmission {
+    protected instantiateTransmissionComponents(clientId: string): TransmissionInterface {
         throw new Error(`Method not implemented`)
         /* Subscribe to adapterEvent, adapter will give adapter with the relevant client information. 
         It will first check array of transmission clinetID, so if new clientId is detected then a new 

+ 0 - 110
src/connector/adapter.manager.ts

@@ -1,110 +0,0 @@
-import { filter, Observable, Observer, Subject } from "rxjs"
-import { WebsocketTransportService } from "../transport/websocket"
-import { HttpTransportService } from "../transport/http"
-import config from '../config/config.json';
-import { v4 as uuidv4 } from 'uuid'
-import ConsoleLogger from "../utils/log.utils"
-import { AdapterEvent, EventObject, ReceiverAdapterInterface, TransmitterAdapterInterface, Transport, TransportEvent, TransportService, TransportSet } from "../interface/interface"
-import { AdapterManagerBase } from "./adapter.manager.base"
-import { TransmitterAdapter } from "./adapter.transmitter";
-import { ReceiverAdapter } from "./adapter.receiver";
-
-/* Note: There will be a need to use the logic in place for socket utility. Especially for client identification
-Will think about that later. Because that mechanism needs to be made universal somehow. If let's say utilizing already
-existing transport, there are no logic in place to exchange information to identify connected clients. */
-
-export class AdapterManager extends AdapterManagerBase {
-    private console: ConsoleLogger = new ConsoleLogger(`AdapterManager`, ['managers'])
-
-    constructor(eventObj: EventObject, browserEnv?: boolean) {
-        super()
-        this.eventObj = eventObj
-        this.console.log({ message: `Contructing self...` })
-
-        this.sortTransportFromEnv(this.transportSet)
-        // for now set up these transportEvent
-        this.transportSet.forEach(set => {
-            this.setUpTransportService(set, this.eventObj.transportEvent, browserEnv)
-        })
-    }
-
-    public subscribe(): Observable<AdapterEvent> {
-        return new Observable((event: Observer<AdapterEvent>) => {
-            this.eventObj.transportEvent.pipe(
-                filter(event => event.event === `New Client`)
-            ).subscribe({
-                next: (event: TransportEvent) => {
-                    this.handleTransportEvent(event, this.eventObj.adapterEvent, event.transport)
-                }
-            })
-        })
-    }
-
-    private handleTransportEvent(event: TransportEvent, adapterEvent: Subject<AdapterEvent>, transport: Transport): void {
-        let transportService: TransportService | undefined = this.transportServiceArray.find(obj => obj.getInfo() === transport)
-        if (transportService) {
-            let transmitterAdapter: TransmitterAdapterInterface = new TransmitterAdapter(event.data?.clientId, event.transport, transportService)
-            let receiverAdapter: ReceiverAdapterInterface = new ReceiverAdapter(event.data?.clientId, event.transport, transportService)
-            adapterEvent.next({
-                id: uuidv4(),
-                event: 'New Adapter',
-                type: 'Adapter Event',
-                date: new Date(),
-                adapters: [transmitterAdapter, receiverAdapter]
-            } as AdapterEvent)
-        } else {
-            this.console.error({ message: `No ${transport} service is not properly instantiated....` })
-            throw new Error(`No ${transport} service is not properly instantiated....`)
-        }
-    }
-
-    // Server to be set up as well as acquiring client information if needed. Like in the case for grpc and socket. Http not requ`ired.
-    private setUpTransportService(transportSet: TransportSet, event: Subject<TransportEvent>, isClient?: boolean): void {
-        try {
-            let transportService: TransportService = this.instantiateTransportService(transportSet.transport, event)
-            this.transportServiceArray.push(transportService)
-            if (transportService instanceof WebsocketTransportService) {
-                this.console.log({ message: `Just Double Checking... this is websocket` })
-                if (isClient) {
-                    // please note this is subject to change depending on the UI environemnt. Angular has their own built in function to read json file based on Swopt-UI
-                    transportService.startClient(config.connection.transmitter)
-                } else {
-                    transportService.startServer(transportSet.port);
-                }
-            } else if (transportService instanceof HttpTransportService) {
-                this.console.log({ message: `Just Double Checking... this is http` })
-                // Additional Http-specific setup if needed.
-                if (isClient) {
-                    transportService.startClient(config.connection.transmitter)
-                } else {
-                    transportService.startServer(transportSet.port)
-                }
-            }
-        } catch (error) {
-            this.console.error({ message: 'Fail to set transport. Error in setting up transport', details: error })
-        }
-    }
-
-    private instantiateTransportService(transportType: Transport, event: Subject<TransportEvent>): TransportService {
-        if (transportType === 'Websocket') {
-            return new WebsocketTransportService(event)
-        }
-        else if (transportType === 'Http') {
-            return new HttpTransportService(event)
-        } else {
-            throw new Error(`No Transport Service Instantiated`)
-        }
-    }
-
-    private sortTransportFromEnv(transportSet: TransportSet[]): void {
-        let transportList: string[] = process.env.Transport?.split(',') || []
-        let portList: number[] = (process.env.PORT?.split(',') || []).map(port => Number(port));
-        transportList.forEach((transport, index) => {
-            transportSet.push({ transport: transport, port: portList[index] } as unknown as TransportSet)
-        })
-        this.console.log({ message: 'TransportSetList', details: this.transportSet })
-    }
-
-}
-
-

+ 0 - 0
src/interface/actor.interface.ts → src/interface/actor.sample.ts


+ 0 - 0
src/interface/game.ts


+ 15 - 16
src/interface/interface.ts

@@ -1,24 +1,23 @@
 import { BehaviorSubject, Observable, Subject, Unsubscribable } from "rxjs"
-import { RetransmissionService } from "../utils/retransmission.service"
 import { WrappedMessage } from "../utils/message.ordering"
-import { ActorInterface, ActorProfile } from "./actor.interface"
 
 /* EVENT BUS */
-export interface GeneralEvent {
+export interface GeneralEvent<T> {
     id: string,
     type: EventType,
     event: EventMessage,
     date: Date,
-    transmission?: Transmission,
-    adapters?: AdapterInterface[],
-    message?: TransportMessage
+    data: T
+    transport?: Transport,
 }
 
 /* MANAGEMENT */
-export interface MessageTransmissionManagerInterface<T> extends ActorInterface<T> {
+export interface MessageTransmissionManagerInterface {
+    subscribeForTransmission(): Observable<TransmissionInterface>
 }
 
-export interface AdapterManagerInterface<T> extends ActorInterface<T> {
+export interface AdapterManagerInterface {
+    subscribeForAdapters(): Observable<AdapterInterface>
 }
 
 
@@ -49,7 +48,7 @@ export interface TransmitterAdapterInterface extends AdapterInterface {
 }
 
 export interface ReceiverAdapterInterface extends AdapterInterface {
-    subscribe(): Observable<any>
+    subscribeForIncoming(): Observable<any>
 }
 
 
@@ -85,15 +84,15 @@ export interface FisMessage {
     data: any
 }
 
-export interface Transmission {
+export interface TransmissionInterface {
     clientId: string,
     transmitter: MessageTransmitterInterface,
     receiver: MessageReceiverInterface,
     requestResponse: MessageRequestResponseInterface,
-    event: Observable<GeneralEvent>
+    event: Observable<GeneralEvent<TransmissionInterface>>
 }
 
-export type Transport = 'Websocket' | 'Http' | 'TCP'
+export type Transport = 'Websocket' | 'Http' | 'TCP' | undefined
 
 export enum AdapterTransmissionRole {
     Transmitter,
@@ -106,10 +105,11 @@ export type EventType = `General Event` | 'Transport Event' | 'Transmission Even
 export type TransmissionRole = `Transmitter` | 'Receiver' | 'RequestResponse'
 export type EventMessage = 'Server Started' | 'New Client' | 'Client Connected' | 'Client Re-connected' | 'Client Disconnected' | `Server Disconnected` | 'New Message' | `Notification` | `New Server` | `Server Connected` | `New Transport` | 'New Adapter' | 'Re-Flush' | 'New Transport' | 'New Transmission'
 export type ConnectionState = 'ONLINE' | 'OFFLINE'
-export interface TransportService {
+
+export interface TransportServiceInterface {
     getInfo(): Transport
     emit(message: TransportMessage): void
-    subscribe(): Observable<GeneralEvent> //all messages and whatever event will go through this, easier to implemnet across different transport protocol
+    subscribeForTransportEvent(): Observable<GeneralEvent<any>> //all messages and whatever event will go through this, easier to implemnet across different transport protocol
 }
 
 export interface Info {
@@ -117,7 +117,7 @@ export interface Info {
 }
 
 export interface ClientObject {
-    id: string,
+    clientId: string,
     dateCreated: Date,
     connectionState: BehaviorSubject<ConnectionState>
 }
@@ -132,7 +132,6 @@ export interface TransportSet {
     transport: Transport,
     port: number
 }
-
 export interface TransportProfileMessage {
     clientId: string,
     message?: string,

+ 1 - 1
src/test/actor.ts

@@ -1,6 +1,6 @@
 import { interval, map, Subject } from "rxjs";
 import { ActorBase } from "../base/actor.base"
-import { ActorInterface, ActorMessage, ActorProfile } from "../interface/actor.interface";
+import { ActorInterface, ActorMessage, ActorProfile } from "../interface/actor.sample";
 import { FisMessage } from "../interface/interface";
 
 class TestA extends ActorBase<{ message: string }> {

+ 8 - 6
src/test/transmitter.ts

@@ -3,14 +3,15 @@ import { v4 as uuidv4 } from 'uuid'
 import { MessageTransmissionManager } from "../transmission/msg.transmission.manager";
 import { WrappedMessage } from "../utils/message.ordering";
 import ConsoleLogger from "../utils/log.utils";
-import { FisMessage, Transmission, TransportEvent, TransportMessage } from "../interface/interface";
+import { FisMessage, GeneralEvent, TransmissionInterface, TransportMessage } from "../interface/interface";
+import config from '../config/config.json';
 class Supervisor {
     private console = new ConsoleLogger('Supervisor', ['base'])
     private clientIncomingMessage: Subject<FisMessage> = new Subject()
     private messageProducer!: MessageProducer
     private transmissionManager!: MessageTransmissionManager
-    private event!: Observable<TransportEvent>
-    private transmissionSets: Transmission[] = []
+    private event!: Observable<GeneralEvent<any>>
+    private transmissionSets: TransmissionInterface[] = []
 
     constructor() {
         // so need them adapters now. But supervisor shouldn't be concerned, only messageTransmissionManager and ConnectionManager
@@ -18,17 +19,18 @@ class Supervisor {
         this.transmissionManager = new MessageTransmissionManager()
         this.event = this.transmissionManager.getEvent()
 
-        this.transmissionManager.subscribe().subscribe((transmissionSet: Transmission) => {
+        this.transmissionManager.subscribeForTransmission().subscribe((transmissionSet: TransmissionInterface) => {
             this.transmissionSets.push(transmissionSet)
 
             this.handleClientActivity(transmissionSet)
         })
+
     }
 
     // only called once for each connected clients.
-    private handleClientActivity(messageTransmission: Transmission): void {
+    private handleClientActivity(messageTransmission: TransmissionInterface): void {
         // start listening to incoming messages from this client
-        messageTransmission.receiver.subscribe().subscribe((event: TransportEvent) => {
+        messageTransmission.receiver.subscribe().subscribe((event: GeneralEvent<TransportMessage>) => {
             let requestMessage: FisMessage = ((event.data as TransportMessage).payload as WrappedMessage).payload as FisMessage
             this.console.log({ message: `General Bus ${requestMessage?.header?.messageID ?? 'Not a message'}`, details: event }) // receiving end
             this.clientIncomingMessage.next(requestMessage)

+ 6 - 6
src/transmission/msg.transmission.manager.ts

@@ -1,17 +1,17 @@
 import { MessageTransmissionTransmitter } from "./msg.transmission.transmitter";
 import { MessageTransmissionReceiver } from "./msg.transmission.receiver";
-import { AdapterManager } from "../connector/adapter.manager";
+import { AdapterManager } from "../adapters/adapter.manager";
 import { v4 as uuidv4 } from 'uuid'
 import { MessageTransmissionRequestResponse } from "./msg.transmission.request-response";
 import { filter, Observable, Observer, Subject, Unsubscribable } from "rxjs";
 import ConsoleLogger from "../utils/log.utils";
-import { TransmitterAdapter } from "../connector/adapter.transmitter"
-import { ReceiverAdapter } from "../connector/adapter.receiver"
+import { TransmitterAdapter } from "../adapters/adapter.transmitter"
+import { ReceiverAdapter } from "../adapters/adapter.receiver"
 import { MessageTransmissionManagerBase } from "../base/msg.transmission.manager.base";
-import { ActorInterface, ActorProfile } from "../interface/actor.interface";
+import { ActorInterface, ActorProfile } from "../interface/actor.sample";
 import { TransportService } from "../interface/interface";
 
-export class MessageTransmissionManager<GeneralEvent> extends MessageTransmissionManagerBase<GeneralEvent> {
+export class MessageTransmissionManager extends MessageTransmissionManagerBase {
     private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionManager`, ['managers'])
 
     constructor(transportRef: Observable<TransportService>, browserEnv?: boolean) {
@@ -29,7 +29,7 @@ export class MessageTransmissionManager<GeneralEvent> extends MessageTransmissio
         if (browserEnv) this.browserEnv = browserEnv
         // Subscribe for adapterManager and it's relevent event
         this.adapterManager = new AdapterManager()
-        this.adapterManager.subscribe(this.actorProfile, this.incomingBus, this.outgoingBus.asObservable)
+        this.adapterManager
     }
 
     public subscribe(actorProfile: ActorProfile, observer: Partial<Observer<GeneralEvent>>, subscribable?: ActorInterface<GeneralEvent>): Unsubscribable {

+ 10 - 10
src/transmission/msg.transmission.receiver.ts

@@ -1,10 +1,10 @@
 import { filter, Observable, Observer, Subject, Subscription } from 'rxjs';
 import { v4 as uuidv4 } from 'uuid'
-import { ReceiverAdapter } from '../connector/adapter.receiver';
+import { ReceiverAdapter } from '../adapters/adapter.receiver';
 import { checkMessage, WrappedMessage } from '../utils/message.ordering';
 import ConsoleLogger from '../utils/log.utils';
 import { MessageTransmissionBase } from '../base/msg.transmission.base';
-import { MessageReceiverInterface, ReceiverAdapterInterface, TransportMessage } from '../interface/interface';
+import { GeneralEvent, MessageReceiverInterface, ReceiverAdapterInterface, TransportMessage } from '../interface/interface';
 
 export class MessageTransmissionReceiver extends MessageTransmissionBase implements MessageReceiverInterface {
     private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionReceiver`, ['transmission'])
@@ -12,23 +12,23 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
     private currentAdapter!: ReceiverAdapterInterface
     // private toBePassedOver: Subject<WrappedMessage> = new Subject()
 
-    constructor(clientId: string, eventObj: EventObject) {
+    constructor(clientId: string, event:  Subject<GeneralEvent<any>>) {
         super()
         this.clientId = clientId
-        this.eventObj = eventObj
+        this.eventObj = event
 
         this.handleAdapterEvent(this.eventObj.adapterEvent.asObservable())
     }
 
-    getIncoming(): Observable<TransportEvent> {
+    getIncoming(): Observable<GeneralEvent<TransportMessage>> {
         this.console.log({ message: `Transmission getting message bus for ${this.clientId}` })
-        return new Observable((observable: Observer<TransportEvent>) => {
+        return new Observable((observable: Observer<GeneralEvent<any>>) => {
             // logic here
             // Need to merge all the adapters into one when the time comes 
             // SAMPLE: This adapterArray.forEach(adapter => { ... })
-            const subscription: Subscription = this.currentAdapter.subscribe().pipe(
-                filter((event: TransportEvent) => event.event == 'New Message'),
-            ).subscribe((event: TransportEvent) => {
+            const subscription: Subscription = this.currentAdapter.subscribeForIncoming().pipe(
+                filter((event: GeneralEvent<any>) => event.event == 'New Message'),
+            ).subscribe((event: GeneralEvent<TransportMessage>) => {
                 // console.log(event) // data is transportMessage instead of eventmessage
                 this.onHoldMessage.next(((event.data as TransportMessage).payload as WrappedMessage))
                 checkMessage(((event.data as TransportMessage).payload as WrappedMessage), this.onHoldMessage).then(() => {
@@ -49,7 +49,7 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
         })
     }
 
-    private handleAdapterEvent(adapterEvent: Observable<AdapterEvent>): void {
+    private handleAdapterEvent(adapterEvent: Observable<GeneralEvent<any>>): void {
 
     }
 

+ 1 - 1
src/transmission/msg.transmission.transmitter.ts

@@ -4,7 +4,7 @@ import { BehaviorSubject, distinct, distinctUntilChanged, filter, map, Observabl
 import { RetransmissionService } from "../utils/retransmission.service";
 import { WrappedMessage } from "../utils/message.ordering";
 import ConsoleLogger from "../utils/log.utils";
-import { TransmitterAdapter } from "../connector/adapter.transmitter";
+import { TransmitterAdapter } from "../adapters/adapter.transmitter";
 import { AdapterEvent, AdapterInterface, ConnectionState, EventObject, FisMessage, MessageTransmitterInterface, TransmitterAdapterInterface, TransportEvent, TransportMessage } from "../interface/interface";
 import { error } from "console";
 

+ 22 - 23
src/transport/http.ts

@@ -1,35 +1,38 @@
 import { Express } from 'express';
 import { filter, Observable, Subject, Subscription, take } from "rxjs";
-import { ClientObject, Transport, TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
 import { v4 as uuidv4 } from 'uuid'
 import config from '../config/config.json';
 import { handleClientHttpConnection, handleHttpClient, initiateClientToServer, startHttpServer } from "../utils/http.utils";
 import { WrappedMessage } from '../utils/message.ordering';
 import { error } from 'console';
 import axios, { AxiosError } from 'axios';
-import { EventMessage } from '../interface/transport.interface';
 import ConsoleLogger from '../utils/log.utils';
+import { ClientObject, GeneralEvent, Transport, TransportMessage, TransportServiceInterface } from '../interface/interface';
 
-export class HttpTransportService implements TransportService {
+export class HttpTransportService implements TransportServiceInterface {
     private console: ConsoleLogger = new ConsoleLogger(`HttpTransportService`, ['transport'])
     private baseUrl!: string;
-    private info: Transport = Transport.Http
+    private info: Transport = `Http`
     private connectedHttpServer: ConnectedHttpServer[] = [] // to allow the possibility of having to communicate with multiple servers as a client
     private connectedHttpClients: ConnectedHttpClient[] = [] // to keep track of the all the clients that are connected
-    transportEvent!: Subject<TransportEvent>
+    transportEvent!: Subject<GeneralEvent<any>>
 
-    constructor(event: Subject<TransportEvent>) {
+    constructor(event: Subject<GeneralEvent<any>>) {
         this.baseUrl = config.connection.transmitter
         this.transportEvent = event
     }
 
+    subscribeForTransportEvent(): Observable<GeneralEvent<any>> {
+        throw new Error('Method not implemented.');
+    }
+
     public getInfo(): Transport {
         return this.info
     }
 
     public emit(message: TransportMessage): void {
-        let clientObj: ConnectedHttpClient | undefined = this.connectedHttpClients.find(obj => obj.id == message.target)
-        let serverObj: ConnectedHttpServer | undefined = this.connectedHttpServer.find(obj => obj.id === message.target)
+        let clientObj: ConnectedHttpClient | undefined = this.connectedHttpClients.find(obj => obj.clientId == message.target)
+        let serverObj: ConnectedHttpServer | undefined = this.connectedHttpServer.find(obj => obj.clientId === message.target)
 
         // for server usage
         if (clientObj && clientObj.connectionState.getValue() == 'ONLINE') {
@@ -46,39 +49,35 @@ export class HttpTransportService implements TransportService {
                     console.error('HTTP emit error:', error.code);
                     this.transportEvent.next({
                         id: uuidv4(),
+                        type: 'Transport Event',
                         event: 'Re-Flush',
-                        data: {
-                            clientId: serverObj.id,
-                            payload: message
-                        } as EventMessage
-                    } as TransportEvent)
+                        date: new Date(),
+                        data: message,
+                        transport: 'Http'
+                    } as GeneralEvent<TransportMessage>)
                 });
             } else {
                 this.console.error({ message: `Target Server is offline: Reflusing message ${(message.payload as WrappedMessage).thisMessageID}` });
                 this.transportEvent.next({
                     id: uuidv4(),
+                    type: 'Transport Event',
                     event: 'Re-Flush',
-                    data: {
-                        clientId: serverObj.id,
-                        payload: message
-                    } as EventMessage
-                } as TransportEvent)
+                    date: new Date(),
+                    data: message,
+                    transport: 'Http'
+                } as GeneralEvent<TransportMessage>)
             }
         }
 
     }
 
-    public subscribe(): Observable<TransportEvent> {
-        return this.transportEvent.asObservable();
-    }
-
     public startServer(port: number): void {
         startHttpServer(port).subscribe({
             next: (client: ConnectedHttpClient) => {
                 handleHttpClient(client, this.connectedHttpClients).subscribe({
                     next: event => this.transportEvent.next(event),
                     error: error => console.error(error),
-                    complete: () => (`Client ${client.id} disconnected...`)
+                    complete: () => (`Client ${client.clientId} disconnected...`)
                 })
             },
             error: error => this.console.error({ message: 'Observer Error', details: error }),

+ 8 - 10
src/transport/websocket.ts

@@ -2,21 +2,20 @@ import { Observable, Subject } from "rxjs";
 import { Socket as SocketForConnectedServer } from 'socket.io-client'
 import { Socket as SocketForConnectedClient } from "socket.io"
 import { handleClientSocketConnection, handleNewSocketClient, startClientSocketConnection, startSocketServer } from "../utils/socket.utils";
-import { ClientObject, Transport, TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
 import { WrappedMessage } from "../utils/message.ordering";
-import { FisMessage } from "../interface/transport.interface";
 import ConsoleLogger from "../utils/log.utils";
+import { ClientObject, FisMessage, GeneralEvent, Transport, TransportMessage, TransportServiceInterface } from "../interface/interface";
 
 /* Just code in the context that this websocket service will be handling multiple UI clients. Can think about the server communication at a later time. */
-export class WebsocketTransportService implements TransportService {
+export class WebsocketTransportService implements TransportServiceInterface {
     private console: ConsoleLogger = new ConsoleLogger(`WebsocketTransportService`, ['transport'])
-    private info: Transport = Transport.Websocket
+    private info: Transport = `Websocket`
     private connectedSocketServer: ConnectedSocketServer[] = [] // to allow the possibility of having to communicate with multiple servers as a client
     private connectedClientSocket: ConnectedSocketClient[] = [] // to keep track of the all the clients that are connected
     // private incomingMessage: Subject<TransportMessage> = new Subject() // this is only for client roles only atm
-    private transportEvent!: Subject<TransportEvent>
+    private transportEvent!: Subject<GeneralEvent<any>>
 
-    constructor(event: Subject<TransportEvent>) {
+    constructor(event: Subject<GeneralEvent<any>>) {
         this.console.log({ message: `WebsocketTransportService: Constructing socket transport service....` })
         this.transportEvent = event
         // logic here
@@ -49,8 +48,8 @@ export class WebsocketTransportService implements TransportService {
 
     public emit(message: TransportMessage): void {
         this.console.log({ message: `Emitting: ${((message.payload as WrappedMessage).payload as FisMessage).header.messageID} to ${message.target}`, details: message })
-        let clientObj: ConnectedSocketClient | undefined = this.connectedClientSocket.find(obj => obj.id == message.target)
-        let serverObj: ConnectedSocketServer | undefined = this.connectedSocketServer.find(obj => obj.id === message.target)
+        let clientObj: ConnectedSocketClient | undefined = this.connectedClientSocket.find(obj => obj.clientId === message.target)
+        let serverObj: ConnectedSocketServer | undefined = this.connectedSocketServer.find(obj => obj.clientId === message.target)
         // this.console.log({ message: `${serverObj?.connectionState.getValue(), serverObj?.id}` })
         // for server usage
         if (clientObj && clientObj.connectionState.getValue() == 'ONLINE') {
@@ -62,8 +61,7 @@ export class WebsocketTransportService implements TransportService {
         }
     }
 
-    // this returns the ref pointer for the TransportEvent instantiated at Supervisor. Socket will broadcast incoming messages as event
-    public subscribe(): Observable<TransportEvent> {
+    public subscribeForTransportEvent(): Observable<GeneralEvent<any>> {
         return this.transportEvent.asObservable()
     }
 

+ 52 - 0
src/utils/transport.utils.ts

@@ -0,0 +1,52 @@
+import { Subject } from "rxjs";
+import { GeneralEvent, Transport, TransportServiceInterface, TransportSet } from "../interface/interface";
+import { WebsocketTransportService } from "../transport/websocket";
+import { HttpTransportService } from "../transport/http";
+import ConsoleLogger from "./log.utils";
+const console: ConsoleLogger = new ConsoleLogger(`TransportUtils`, ['transport'])
+// Server to be set up as well as acquiring client information if needed. Like in the case for grpc and socket. Http not requ`ired.
+export function setUpTransportService(transportSet: TransportSet, event: Subject<GeneralEvent<any>>, transportServiceArray: TransportServiceInterface[], url: string, isClient?: boolean): void {
+    try {
+        let transportService: TransportServiceInterface = instantiateTransportService(transportSet.transport, event)
+        transportServiceArray.push(transportService)
+        if (transportService instanceof WebsocketTransportService) {
+            console.log({ message: `Just Double Checking... this is websocket` })
+            if (isClient) {
+                // please note this is subject to change depending on the UI environemnt. Angular has their own built in function to read json file based on Swopt-UI
+                transportService.startClient(url)
+            } else {
+                transportService.startServer(transportSet.port);
+            }
+        } else if (transportService instanceof HttpTransportService) {
+            console.log({ message: `Just Double Checking... this is http` })
+            // Additional Http-specific setup if needed.
+            if (isClient) {
+                transportService.startClient(url)
+            } else {
+                transportService.startServer(transportSet.port)
+            }
+        }
+    } catch (error) {
+        console.error({ message: 'Fail to set transport. Error in setting up transport', details: error })
+    }
+}
+
+function instantiateTransportService(transportType: Transport, event: Subject<GeneralEvent<any>>): TransportServiceInterface {
+    if (transportType === 'Websocket') {
+        return new WebsocketTransportService(event)
+    }
+    else if (transportType === 'Http') {
+        return new HttpTransportService(event)
+    } else {
+        throw new Error(`No Transport Service Instantiated`)
+    }
+}
+
+export function sortTransportFromEnv(transportSet: TransportSet[]): void {
+    let transportList: string[] = process.env.Transport?.split(',') || []
+    let portList: number[] = (process.env.PORT?.split(',') || []).map(port => Number(port));
+    transportList.forEach((transport, index) => {
+        transportSet.push({ transport: transport, port: portList[index] } as unknown as TransportSet)
+    })
+    console.log({ message: 'TransportSetList', details: transportSet })
+}