Jelajahi Sumber

additonal bug fixes for adapters and transmission

enzo 2 bulan lalu
induk
melakukan
d836c2b41a

+ 16 - 10
src/adapters/adapter.manager.ts

@@ -1,7 +1,7 @@
 import { filter, Observable, Observer, Subject, Subscription } from "rxjs"
 import { v4 as uuidv4 } from 'uuid'
 import ConsoleLogger from "../utils/log.utils"
-import { AdapterInterface, ClientObject, GeneralEvent, ReceiverAdapterInterface, TransmitterAdapterInterface, Transport, TransportServiceInterface } from "../interface/interface"
+import { AdapterInterface, ClientObject, GeneralEvent, ReceiverAdapterInterface, TransmissionInterface, TransmitterAdapterInterface, Transport, TransportServiceInterface } from "../interface/interface"
 import { TransmitterAdapter } from "./adapter.transmitter";
 import { ReceiverAdapter } from "./adapter.receiver";
 import { AdapterManagerBase } from "../base/adapter.manager.base";
@@ -12,7 +12,7 @@ export class AdapterManager extends AdapterManagerBase {
     constructor(event: Subject<GeneralEvent<any>>, browserEnv?: boolean) {
         super()
         this.browserEnv = browserEnv ?? false
-        this.console.log({ message: `Contructing self...` })
+        this.console.log({ message: `Contructing self... ${this.browserEnv ? `is receiving end` : `is not browser env`}` })
         this.event = event
 
         this.connectToExistingTransport(this.event)
@@ -21,17 +21,19 @@ export class AdapterManager extends AdapterManagerBase {
     public subscribeForAdapters(): Observable<GeneralEvent<AdapterInterface>> {
         return new Observable((observer: Observer<GeneralEvent<AdapterInterface>>) => {
             const subscription: Subscription = this.event.pipe(
-                filter(event => event.type === `Transport Event`),
-                filter(event => event.event === 'New Client' || event.event === `New Server`)
-            ).subscribe((event: GeneralEvent<ClientObject>) => {
-                let adapters: AdapterInterface[] = this.instantiateAdapterComponents(event.data)
+                // filter(event => event.type === `Transport Event`),
+                // filter(event => event.event === 'New Client' || event.event === `New Server`)
+                filter(event => event.type === `Transmission Event`),
+                filter(event => event.event === `New Transmission`),
+            ).subscribe((event: GeneralEvent<TransmissionInterface>) => {
+                let adapters: AdapterInterface[] = this.instantiateAdapterComponents(event.data.clientInfo)
                 if (adapters.length > 0) {
                     adapters.forEach((adapter: AdapterInterface) => {
                         observer.next({
                             id: uuidv4(),
                             type: `Adapter Event`,
                             event: `New Adapter`,
-                            transport: event.data.transport,
+                            transport: event.data.clientInfo.transport,
                             date: new Date(),
                             data: adapter
                         })
@@ -58,6 +60,7 @@ export class AdapterManager extends AdapterManagerBase {
                 this.updateTransportServicesRecord(transportService)
             })
         })
+        // to automatically connect to existing started transport services
         this.event.next({
             id: uuidv4(),
             type: 'Adapter Event',
@@ -81,9 +84,12 @@ export class AdapterManager extends AdapterManagerBase {
     private instantiateAdapterComponents(clientRef: ClientObject): AdapterInterface[] {
         let transportService = this.transportServiceArray.find(obj => obj.getInfo().transportServiceId === clientRef.transportServiceId)
         if (transportService) {
-            let transmitterAdapter: TransmitterAdapterInterface = new TransmitterAdapter(clientRef.clientId, transportService.getInfo().transport, transportService)
-            let receiverAdapter: ReceiverAdapterInterface = new ReceiverAdapter(clientRef.clientId, transportService.getInfo().transport, transportService)
-            return [transmitterAdapter, receiverAdapter]
+            this.console.log({ message: `Instantiating adapters for client ${clientRef.clientId}` });
+            const adapters: AdapterInterface[] = [
+                new TransmitterAdapter(clientRef.clientId, transportService.getInfo().transport, transportService),
+                new ReceiverAdapter(clientRef.clientId, transportService.getInfo().transport, transportService),
+            ];
+            return adapters;
         } else {
             this.console.error({ message: `Transport Service id ${clientRef.transportServiceId} not found. Unable to instantiate adapters.` })
             return []

+ 3 - 2
src/adapters/adapter.receiver.ts

@@ -14,10 +14,11 @@ export class ReceiverAdapter extends AdapterBase {
 
     constructor(adapterId: string, adapterType: Transport, transportService: TransportServiceInterface) {
         super()
-        
-        this.console = new ConsoleLogger(`${adapterType}ReceiverAdapter`, ['adapter'])
         this.setAdapterProfile(adapterId, adapterType, 'Receiver')
+        this.console = new ConsoleLogger(`${adapterType}ReceiverAdapter`, ['adapter'])
         this.transportService = transportService
+
+        this.console.log({ message: `Contructing ReceiverAdapter for client: ${adapterId}` })
     }
 
     subscribeForIncoming(): Observable<GeneralEvent<any>> {

+ 3 - 2
src/adapters/adapter.transmitter.ts

@@ -13,10 +13,11 @@ export class TransmitterAdapter extends AdapterBase {
 
     constructor(adapterId: string, adapterType: Transport, transportService: TransportServiceInterface) {
         super()
-        // logic here
+        this.setAdapterProfile(adapterId, adapterType, 'Transmitter')
         this.console = new ConsoleLogger(`${adapterType}TransmitterAdapter`, ['adapter'])
         this.transportService = transportService
-        this.setAdapterProfile(adapterId, adapterType, 'Transmitter')
+
+        this.console.log({ message: `Contructing TransmitterAdapter for client: ${adapterId}` })
     }
 
     emit(message: WrappedMessage): void {

+ 2 - 2
src/base/msg.transmission.manager.base.ts

@@ -1,6 +1,6 @@
 
 import { Observable, Observer, Subject, Unsubscribable } from 'rxjs';
-import { GeneralEvent, MessageTransmissionManagerInterface, TransmissionInterface } from '../interface/interface';
+import { ClientObject, GeneralEvent, MessageTransmissionManagerInterface, TransmissionInterface } from '../interface/interface';
 import { ActorInterface, ActorProfile } from '../interface/actor.sample';
 import { ActorBase } from './actor.base';
 import { AdapterManager } from '../adapters/adapter.manager';
@@ -18,7 +18,7 @@ export class MessageTransmissionManagerBase implements MessageTransmissionManage
         throw new Error('Method not implemented.');
     }
 
-    protected instantiateTransmissionComponents(clientId: string, event: Subject<GeneralEvent<any>>): TransmissionInterface {
+    protected instantiateTransmissionComponents(clientObj: ClientObject, event: Subject<GeneralEvent<any>>): TransmissionInterface {
         throw new Error(`Method not implemented`)
         /* Subscribe to adapterEvent, adapter will give adapter with the relevant client information. 
         It will first check array of transmission clinetID, so if new clientId is detected then a new 

+ 1 - 1
src/interface/interface.ts

@@ -90,7 +90,7 @@ export interface TransmissionInterface {
     transmitter: MessageTransmitterInterface,
     receiver: MessageReceiverInterface,
     requestResponse: MessageRequestResponseInterface,
-    event: Observable<GeneralEvent<TransmissionInterface>>
+    clientInfo: ClientObject
 }
 
 export type Transport = 'Websocket' | 'Http' | 'TCP' | undefined

+ 7 - 3
src/test/receiver.ts

@@ -21,13 +21,13 @@ class Supervisor {
 
     constructor() {
         this.event = new Subject()
-        this.transmissionManager = new MessageTransmissionManager(this.event, this.isClient)
         this.sortTransportFromEnv(this.transportSet)
         this.transportSet.forEach(transport => {
             this.setUpTransportService(transport, this.event, this.isClient)
         })
         this.tieInAdapterWithExistingTransportServices(this.event)
 
+        this.transmissionManager = new MessageTransmissionManager(this.event, this.isClient)
         this.transmissionManager.subscribeForTransmission().pipe(
             filter(event => event.type === `Transmission Event`),
             filter(event => event.event === `New Transmission`)
@@ -41,6 +41,11 @@ class Supervisor {
                 this.outgoingPipe.subscribe(message => transmission.transmitter.emit(message))
             })
         )
+
+        // testing
+        this.event.subscribe(event => {
+            this.console.log({ message: `Supervisor Event: ${event.type} && ${event.event}` })
+        })
     }
 
     // only called once for each connected clients.
@@ -150,8 +155,7 @@ class Supervisor {
         const subscription: Subscription = eventBus.pipe(
             filter(event => event.type === `Adapter Event`),
             filter(event => event.event === `Adapter Manager Started`)
-        ).subscribe((event: GeneralEvent<any>) => {
-            this.console.log({ message: `There's something going on here. ${event.event}` })
+        ).subscribe(event => {
             eventBus.next({
                 id: uuidv4(),
                 type: `General Event`,

+ 7 - 2
src/test/transmitter.ts

@@ -29,10 +29,15 @@ class Supervisor {
         // so need them adapters now. But supervisor shouldn't be concerned, only messageTransmissionManager and ConnectionManager
         this.messageProducer = new MessageProducer(this.clientIncomingMessage)
         this.transmissionManager = new MessageTransmissionManager(this.event)
-        this.startMessageTransmission(this.event)
+        this.startMessageTransmission()
+
+        // testing
+        this.event.subscribe(event => {
+            this.console.log({ message: `Supervisor Event: ${event.type} && ${event.event}` })
+        })
     }
 
-    private startMessageTransmission(transmissionEvent: Subject<GeneralEvent<TransmissionInterface>>): void {
+    private startMessageTransmission(): void {
         this.transmissionManager.subscribeForTransmission().pipe(
             filter(event => event.type === `Transmission Event`),
             filter(event => event.event == `New Transmission`)

+ 16 - 16
src/transmission/msg.transmission.manager.ts

@@ -5,11 +5,8 @@ import { v4 as uuidv4 } from 'uuid'
 import { MessageTransmissionRequestResponse } from "./msg.transmission.request-response";
 import { filter, Observable, Observer, Subject, Subscription, Unsubscribable } from "rxjs";
 import ConsoleLogger from "../utils/log.utils";
-import { TransmitterAdapter } from "../adapters/adapter.transmitter"
-import { ReceiverAdapter } from "../adapters/adapter.receiver"
 import { MessageTransmissionManagerBase } from "../base/msg.transmission.manager.base";
-import { ActorInterface, ActorProfile } from "../interface/actor.sample";
-import { GeneralEvent, TransmissionInterface, TransportServiceInterface } from "../interface/interface";
+import { AdapterInterface, ClientObject, GeneralEvent, TransmissionInterface } from "../interface/interface";
 
 export class MessageTransmissionManager extends MessageTransmissionManagerBase {
     private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionManager`, ['managers'])
@@ -17,11 +14,14 @@ export class MessageTransmissionManager extends MessageTransmissionManagerBase {
     constructor(event: Subject<GeneralEvent<any>>, browserEnv?: boolean) {
         super()
         if (browserEnv) this.browserEnv = browserEnv
-        this.console.log({ message: `Constructing self...` })
+        this.console.log({ message: `Contructing self... ${this.browserEnv ? `is receiving end` : `is not browser env`}` })
         this.event = event
         // Subscribe for adapterManager and it's relevent event
-        this.adapterManager = new AdapterManager(event)
-        this.adapterManager
+        this.adapterManager = new AdapterManager(event, this.browserEnv)
+        // this.adapterManager.subscribeForAdapters().subscribe(this.event)
+        this.adapterManager.subscribeForAdapters().subscribe((adapterEvent: GeneralEvent<AdapterInterface>) => {
+            this.event.next(adapterEvent)
+        })
     }
 
     public subscribeForTransmission(): Observable<GeneralEvent<TransmissionInterface>> {
@@ -29,12 +29,12 @@ export class MessageTransmissionManager extends MessageTransmissionManagerBase {
             const subscription: Subscription = this.event.pipe(
                 filter(event => event.type === `Transport Event`),
                 filter(event => event.event === `New Client` || event.event === `New Server`)
-            ).subscribe(event => {
+            ).subscribe((event: GeneralEvent<ClientObject>) => {
                 // get all adapters for all the connection
-                let transmission: TransmissionInterface | undefined = this.instantiateTransmissionComponents(event?.data?.clientId, this.event)
-                this.console.log({ message: `Passing this transmission<${transmission.clientId}> to global event bus.` })
+                let transmission: TransmissionInterface | undefined = this.instantiateTransmissionComponents(event.data, this.event)
+                this.console.log({ message: `Passing this transmission for client:${transmission.clientId} to global event bus.` })
                 if (transmission) {
-                    observer.next({
+                    this.event.next({
                         id: uuidv4(),
                         type: `Transmission Event`,
                         event: 'New Transmission',
@@ -51,16 +51,16 @@ export class MessageTransmissionManager extends MessageTransmissionManagerBase {
         })
     }
 
-    protected instantiateTransmissionComponents(clientId: string, eventRef: Subject<GeneralEvent<any>>): TransmissionInterface {
-        let receiverInstance: MessageTransmissionReceiver = new MessageTransmissionReceiver(clientId, eventRef)
-        let transmitterInstance: MessageTransmissionTransmitter = new MessageTransmissionTransmitter(clientId, eventRef)
+    protected instantiateTransmissionComponents(clientObj: ClientObject, eventRef: Subject<GeneralEvent<any>>): TransmissionInterface {
+        let receiverInstance: MessageTransmissionReceiver = new MessageTransmissionReceiver(clientObj.clientId, eventRef)
+        let transmitterInstance: MessageTransmissionTransmitter = new MessageTransmissionTransmitter(clientObj.clientId, eventRef)
         let requestResponseInstance: MessageTransmissionRequestResponse = new MessageTransmissionRequestResponse(transmitterInstance, receiverInstance, eventRef)
         let transmissionObj: TransmissionInterface = {
-            clientId: clientId,
+            clientId: clientObj.clientId,
             transmitter: transmitterInstance,
             receiver: receiverInstance,
             requestResponse: requestResponseInstance,
-            event: this.event
+            clientInfo: clientObj
         }
 
         return transmissionObj

+ 7 - 3
src/transmission/msg.transmission.receiver.ts

@@ -54,11 +54,15 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
             filter(event => event.event === `New Adapter`),
             map(event => {
                 return event.data
+            }),
+            filter((adapter: AdapterInterface) => adapter.role === `Receiver`),
+            map(adapter => {
+                return adapter as ReceiverAdapter
             })
-        ).subscribe((adapter: AdapterInterface) => {
-            if (adapter.role == `Receiver` && !this.adapters.some(adapterObj => adapterObj.adapterId === adapter.adapterId)) {
+        ).subscribe((adapter: ReceiverAdapter) => {
+            if (!this.adapters.some(adapterObj => adapterObj.adapterId === adapter.adapterId)) {
                 this.adapters.push(adapter)
-                this.currentAdapter = adapter as ReceiverAdapter
+                this.currentAdapter = adapter
                 this.currentAdapter.subscribeForIncoming().subscribe(this.incomingMessage)
             } else {
                 this.console.error({ message: `Adapter ID: ${adapter.adapterId} already existed.` })

+ 16 - 33
src/transmission/msg.transmission.transmitter.ts

@@ -1,6 +1,6 @@
 import { MessageTransmissionBase } from "../base/msg.transmission.base";
 import { v4 as uuidv4 } from 'uuid'
-import { BehaviorSubject, distinct, distinctUntilChanged, filter, map, Observable, Subject, Subscription } from "rxjs";
+import { BehaviorSubject, distinctUntilChanged, filter, map, Observable, Subject, Subscription } from "rxjs";
 import { RetransmissionService } from "../utils/retransmission.service";
 import { WrappedMessage } from "../utils/message.ordering";
 import ConsoleLogger from "../utils/log.utils";
@@ -25,11 +25,11 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
         this.messageToBeTransmitted = new Subject()
         this.messageToBeBuffered = new Subject()
         this.buffer = new RetransmissionService()
-        this.handleAdapters(this.event)
+        this.handleAdapters(this.event.asObservable())
         this.setupBuffer()
 
         // special case just for http in case of server/client disconnected, the unsent msg will be flushed back into messageToBeBuffered
-        // logic here
+       
     }
 
     public emit(message: FisMessage): void {
@@ -71,43 +71,26 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
         })
     }
 
-    private handleAdapters(adaptersEvent: Subject<GeneralEvent<any>>): void {
+    private handleAdapters(adaptersEvent: Observable<GeneralEvent<any>>): void {
+        // hardcode it to use the first adapter added for now. Haven't decide on the logic to dynamically switch them adapters
         adaptersEvent.pipe(
+            filter(event => event.type === `Adapter Event`),
             filter(event => event.event === `New Adapter`),
             map(event => { return event.data }),
+            filter((adapter: AdapterInterface) => adapter.role === `Transmitter`),
+            map(adapter => { return adapter as TransmitterAdapterInterface })
         ).subscribe({
-            next: (adapters: AdapterInterface[]) => {
-                adapters.forEach((adapter: AdapterInterface) => {
-                    if (adapter.role === `Transmitter`) {
-                        this.adapters.push(adapter as TransmitterAdapterInterface)
-                        adaptersEvent.next({
-                            id: uuidv4(),
-                            type: 'Transmission Event',
-                            event: `New Adapter`,
-                            date: new Date(),
-                            data: adapter,
-                            transport: adapter.transport
-                        })
-                    }
-                })
+            next: (adapter: TransmitterAdapterInterface) => {
+                this.adapters.push(adapter)
+                if (!this.currentAdapter) {
+                    this.currentAdapter = adapter as TransmitterAdapterInterface
+                    this.console.log({ message: `Setting Current adapter = ${this.currentAdapter.adapterId}` })
+                } else {
+                    this.console.log({ message: `Already have existing transmitting adapter. Current adapter = ${this.currentAdapter.adapterId}` })
+                }
             },
             error: error => this.console.error({ message: 'Observer Error', details: error })
         })
-
-        // listen to newly added adapters in transmission
-        adaptersEvent.pipe(
-            filter(event => event.type === `Transmission Event`),
-            filter(event => event.event === `New Adapter`),
-            map(event => {
-                return event.data
-            })
-        ).subscribe((adapter: AdapterInterface) => {
-            if (!this.currentAdapter) {
-                this.currentAdapter = adapter as TransmitterAdapterInterface
-            } else {
-                this.console.log({ message: `Already have existing transmitting adapter. Currently hardcode to use only 1` })
-            }
-        })
     }
 
     private uniqueHandlerToFlushUnsentMessages(event: Observable<GeneralEvent<any>>): void {

+ 2 - 10
src/utils/socket.utils.ts

@@ -227,11 +227,7 @@ export function handleNewSocketClient(transportServiceId: string, socket: Socket
                     type: 'Transport Event',
                     event: `New Client`,
                     date: new Date(),
-                    data: {
-                        clientId: clientInstance.clientId,
-                        message: `New Socket Client Connected. Adapter ID assigned: ${clientInstance.clientId}`,
-                        payload: clientInstance
-                    },
+                    data: clientInstance,
                     transport: 'Websocket'
                 })
                 // Update connected clientInstance info to adapter
@@ -280,11 +276,7 @@ export function startListening(socket: SocketForConnectedClient, client: Connect
         type: 'Transport Event',
         event: oldClient ? 'Client Re-connected' : `Client Connected`,
         date: new Date(),
-        data: {
-            clientId: client.clientId,
-            message: `Socket Client ${oldClient ? `Re-Connected` : `Connected`}. Adapter ID assigned: ${client.clientId}`,
-            payloclientId: client
-        },
+        data: client,
         transport: 'Websocket'
     })
     // Resume operation