Jelajahi Sumber

additional fixes on adapter events management

enzo 2 bulan lalu
induk
melakukan
369b34ecd1

+ 2 - 1
src/adapters/adapter.manager.ts

@@ -21,7 +21,8 @@ export class AdapterManager extends AdapterManagerBase {
     public subscribeForAdapters(): Observable<GeneralEvent<AdapterInterface>> {
         return new Observable((observer: Observer<GeneralEvent<AdapterInterface>>) => {
             const subscription: Subscription = this.event.pipe(
-                filter((event: GeneralEvent<ClientObject>) => event.event === 'New Client')
+                filter(event => event.type === `Transport Event`),
+                filter(event => event.event === 'New Client' || event.event === `New Server`)
             ).subscribe((event: GeneralEvent<ClientObject>) => {
                 let adapters: AdapterInterface[] = this.instantiateAdapterComponents(event.data)
                 if (adapters.length > 0) {

+ 17 - 11
src/test/receiver.ts

@@ -23,19 +23,24 @@ class Supervisor {
         this.event = new Subject()
         this.transmissionManager = new MessageTransmissionManager(this.event, this.isClient)
         this.sortTransportFromEnv(this.transportSet)
-        this.updateAdapterManagerForTransport(this.event)
         this.transportSet.forEach(transport => {
             this.setUpTransportService(transport, this.event, this.isClient)
         })
-
-        this.transmissionManager.subscribeForTransmission().subscribe((event: GeneralEvent<TransmissionInterface>) => {
-            let transmission: TransmissionInterface = event.data
-            this.console.log({ message: `Acquired transmission set for client ${transmission.clientId}` })
-            this.transmissionSets.push(transmission)
-
-            this.handleActivity(transmission)
-            this.outgoingPipe.subscribe(message => transmission.transmitter.emit(message))
-        })
+        this.tieInAdapterWithExistingTransportServices(this.event)
+
+        this.transmissionManager.subscribeForTransmission().pipe(
+            filter(event => event.type === `Transmission Event`),
+            filter(event => event.event === `New Transmission`)
+        ).subscribe(
+            ((event: GeneralEvent<TransmissionInterface>) => {
+                let transmission: TransmissionInterface = event.data
+                this.console.log({ message: `Acquired transmission set for client ${transmission.clientId}` })
+                this.transmissionSets.push(transmission)
+
+                this.handleActivity(transmission)
+                this.outgoingPipe.subscribe(message => transmission.transmitter.emit(message))
+            })
+        )
     }
 
     // only called once for each connected clients.
@@ -141,11 +146,12 @@ class Supervisor {
         this.console.log({ message: 'TransportSetList', details: this.transportSet })
     }
 
-    private updateAdapterManagerForTransport(eventBus: Subject<GeneralEvent<any>>): void {
+    private tieInAdapterWithExistingTransportServices(eventBus: Subject<GeneralEvent<any>>): void {
         const subscription: Subscription = eventBus.pipe(
             filter(event => event.type === `Adapter Event`),
             filter(event => event.event === `Adapter Manager Started`)
         ).subscribe((event: GeneralEvent<any>) => {
+            this.console.log({ message: `There's something going on here. ${event.event}` })
             eventBus.next({
                 id: uuidv4(),
                 type: `General Event`,

+ 2 - 0
src/test/transmitter.ts

@@ -34,11 +34,13 @@ class Supervisor {
 
     private startMessageTransmission(transmissionEvent: Subject<GeneralEvent<TransmissionInterface>>): void {
         this.transmissionManager.subscribeForTransmission().pipe(
+            filter(event => event.type === `Transmission Event`),
             filter(event => event.event == `New Transmission`)
         ).subscribe(event => {
             // update transmission record on every new transmission object instantiated
             this.transmissionSets.push(event.data as TransmissionInterface)
             // start message transmission for said transmission object instantiated
+            this.console.log({ message: `Received transmission object ${(event.data as TransmissionInterface).clientId}` })
             this.handleClientActivity(event.data as TransmissionInterface)
         })
     }

+ 10 - 3
src/transmission/msg.transmission.manager.ts

@@ -3,7 +3,7 @@ import { MessageTransmissionReceiver } from "./msg.transmission.receiver";
 import { AdapterManager } from "../adapters/adapter.manager";
 import { v4 as uuidv4 } from 'uuid'
 import { MessageTransmissionRequestResponse } from "./msg.transmission.request-response";
-import { filter, Observable, Observer, Subject, Unsubscribable } from "rxjs";
+import { filter, Observable, Observer, Subject, Subscription, Unsubscribable } from "rxjs";
 import ConsoleLogger from "../utils/log.utils";
 import { TransmitterAdapter } from "../adapters/adapter.transmitter"
 import { ReceiverAdapter } from "../adapters/adapter.receiver"
@@ -26,11 +26,13 @@ export class MessageTransmissionManager extends MessageTransmissionManagerBase {
 
     public subscribeForTransmission(): Observable<GeneralEvent<TransmissionInterface>> {
         return new Observable((observer: Observer<GeneralEvent<TransmissionInterface>>) => {
-            this.event.pipe(
-                filter(event => event.event == 'New Client')
+            const subscription: Subscription = this.event.pipe(
+                filter(event => event.type === `Transport Event`),
+                filter(event => event.event === `New Client` || event.event === `New Server`)
             ).subscribe(event => {
                 // get all adapters for all the connection
                 let transmission: TransmissionInterface | undefined = this.instantiateTransmissionComponents(event?.data?.clientId, this.event)
+                this.console.log({ message: `Passing this transmission<${transmission.clientId}> to global event bus.` })
                 if (transmission) {
                     observer.next({
                         id: uuidv4(),
@@ -41,6 +43,11 @@ export class MessageTransmissionManager extends MessageTransmissionManagerBase {
                     })
                 }
             })
+
+            // Clean up on unsubscription
+            return () => {
+                subscription.unsubscribe();
+            };
         })
     }
 

+ 4 - 11
src/transmission/msg.transmission.transmitter.ts

@@ -20,6 +20,7 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
     constructor(clientId: string, event: Subject<GeneralEvent<any>>) {
         super()
         this.console.log({ message: `Constructing Transmitter Transmission with ${clientId}` })
+        this.clientId = clientId
         this.event = event
         this.messageToBeTransmitted = new Subject()
         this.messageToBeBuffered = new Subject()
@@ -39,8 +40,9 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
     private setupBuffer(): void {
         this.console.log({ message: `Setting up Retransmission Service...` })
         this.event.pipe(
-            filter(event => event.data.clientId == this.clientId),
+            filter(event => event.type === `Transport Event`),
             filter(event => event.event == 'Client Disconnected' || event.event == 'Client Re-connected' || event.event == 'Client Connected' || event.event == 'Server Disconnected' || event.event == 'Server Connected'),
+            filter(event => event.data.clientId == this.clientId),
             map(event => {
                 if (event.event == 'Client Disconnected' || event.event == 'Server Disconnected') {
                     return 'OFFLINE'
@@ -52,7 +54,7 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
         ).subscribe((signal: ConnectionState) => {
             this.connectionStateEvent.next(signal)
             if (signal == 'OFFLINE') this.console.error({ message: `${this.clientId} disconnected` })
-            if (signal == 'ONLINE') this.console.log({ message: `${this.clientId} connected` })
+            if (signal == 'ONLINE') this.console.log({ message: `${this.clientId} connected and ready to go` })
         })
         this.buffer.implementRetransmission(this.messageToBeBuffered, this.connectionStateEvent.asObservable(), true)
         // automatically subscribe to allow released bffered messages to be released
@@ -108,15 +110,6 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
         })
     }
 
-    // temporary logic for now. 
-    private setUpAdapter(): void {
-        if (!this.currentAdapter && this.adapters.some(adapter => adapter.transport === `Websocket`)) {
-            this.currentAdapter = this.adapters.find(adapter => adapter.transport === `Websocket`) as TransmitterAdapterInterface
-        } else {
-            this.console.error({ message: 'No websocket socket adapter avaiable' })
-        }
-    }
-
     private uniqueHandlerToFlushUnsentMessages(event: Observable<GeneralEvent<any>>): void {
         event.pipe(
             filter(event => event.event == 'Re-Flush'),

+ 1 - 1
src/utils/general.utils.ts

@@ -66,7 +66,7 @@ export async function writeFile(data: any, filename: string): Promise<boolean> {
     return new Promise((resolve, reject) => {
         // Ensure the folder exists
         const folderPath = process.env.FolderPath as string
-        console.log({ message: folderPath })
+        console.log({ message: folderPath + filename })
         // const folderPath = path.join(__dirname, folder);
         if (!fs.existsSync(folderPath)) {
             fs.mkdirSync(folderPath, { recursive: true }); // Create folder if it doesn't exist

+ 6 - 15
src/utils/socket.utils.ts

@@ -7,7 +7,7 @@ import { v4 as uuidv4 } from 'uuid'
 import { ConnectedSocketClient, ConnectedSocketServer } from '../transport/websocket';
 import ConsoleLogger from './log.utils';
 import path from 'path';
-import { ConnectionState, GeneralEvent, TransportMessage } from '../interface/interface';
+import { ClientObject, ConnectionState, GeneralEvent, TransportMessage } from '../interface/interface';
 import { addClientToDB, checkIfClientExists, checkOwnClientInfo, writeFile } from './general.utils';
 const console: ConsoleLogger = new ConsoleLogger(`SocketUtils`, ['transport'])
 
@@ -81,7 +81,7 @@ export function handleClientSocketConnection(transportServiceId: string, socket:
                     })
                 })
             } else {
-                socket.emit('profile', { 
+                socket.emit('profile', {
                     name: 'New Client',
                     data: null
                 })
@@ -125,17 +125,14 @@ export function handleClientSocketConnection(transportServiceId: string, socket:
                     transport: 'Websocket',
                     transportServiceId: transportServiceId
                 }
-                writeFile(data.message as ConnectedSocketServer, (data.message as ConnectedSocketServer).clientId).then(() => {
+                writeFile(data.message as ConnectedSocketServer, receiverProfileInfo.clientId).then(() => {
                     // broadcast event to allow transmission manager to instantiate transmission components
                     eventNotification.next({
                         id: uuidv4(),
                         type: 'Transport Event',
                         event: `New Server`,
                         date: new Date(),
-                        data: {
-                            clientId: (data.message as ConnectedSocketServer).clientId,
-                            message: `New Websocket Channel ${(data.message as ConnectedSocketServer).clientId} established.`
-                        },
+                        data: receiverProfileInfo,
                         transport: 'Websocket'
                     })
                     // broadcast event to allow retransmission to relase buffered messages
@@ -144,10 +141,7 @@ export function handleClientSocketConnection(transportServiceId: string, socket:
                         type: 'Transport Event',
                         event: `Server Connected`,
                         date: new Date(),
-                        data: {
-                            clientId: (data.message as ConnectedSocketServer).clientId,
-                            message: `Server ${(data.message as ConnectedSocketServer).clientId} connected and ready to go.`
-                        },
+                        data: receiverProfileInfo,
                         transport: 'Websocket'
                     })
                 }).catch((error) => { }) // do nothing at the moment. 
@@ -169,10 +163,7 @@ export function handleClientSocketConnection(transportServiceId: string, socket:
                         type: `Transport Event`,
                         event: 'Server Connected',
                         date: new Date(),
-                        data: {
-                            clientId: (data.message as ConnectedSocketServer).clientId,
-                            message: `Existing Websocket Channel ${(data.message as ConnectedSocketServer).clientId} re-established.`
-                        },
+                        data: clientObj,
                         transport: 'Websocket'
                     })
                 }