Преглед на файлове

Code cleaning && Additonal Comments

enzo преди 1 месец
родител
ревизия
307d1a0b97

+ 0 - 15
doc/explanation.txt

@@ -1,14 +1,3 @@
-2 Jan 2025:
-Things to do: 
-- Get the socket transwmission working first.
-- Sepearate out the logic for receiver Identification, make it modular 
-- Put actor concept aside. That one will be a separate project
-
-
-
-
-
-
 Concept as of 5/12/2024:
 So, when an application run, it will instantiate the transmission manager that will in turn instantiate adapter manager. The adapter manager will turn first check the 
 configuration file, whether a config json file or .env depending on the environment, and set up the necessary transport services in order to be able to create the 
@@ -31,8 +20,6 @@ on the usage of the available adapters. The transmission components will also ha
 given by transmission manager. Essentially, transmission manager will keep itself up to date via the aforementioned event bus, and notify and return additional
 adapters if there are any. Adapter manager will basically keep an out or instantiate more circumstantially, and will update transmission manager via the event bus.
 
-
-
 Things to do:
 - Connection Manager to manage different transport options. Default using websocket, but will also consider fail detection on each transport and decide on adapters swap
 - Also need to cater for browser environment. Right the now, the default behaviour is that one would assume to instantiate a socket server, instead of client. 
@@ -95,5 +82,3 @@ As of 16/12/2024: (Monday)
 -If I manage to hash out all the remaining problems. Then proceed to do request response. Test with same app first, before developing for web UI version. it use fetch
 instead of axios, so there's going to be some tweaking. 
 
-
-Calm down, don't be stupid, it's just a game. Focus on what's I'm supposed to do today. 

+ 3 - 5
src/adapters/adapter.manager.ts

@@ -21,8 +21,6 @@ export class AdapterManager extends AdapterManagerBase {
     public subscribeForAdapters(): Observable<GeneralEvent<AdapterInterface>> {
         return new Observable((observer: Observer<GeneralEvent<AdapterInterface>>) => {
             const subscription: Subscription = this.event.pipe(
-                // filter(event => event.type === `Transport Event`),
-                // filter(event => event.event === 'New Client' || event.event === `New Server`)
                 filter(event => event.type === `General Event`),
                 filter(event => event.event === `New Transmission`),
             ).subscribe((event: GeneralEvent<TransmissionInterface>) => {
@@ -52,7 +50,7 @@ export class AdapterManager extends AdapterManagerBase {
     private connectToExistingTransport(event: Subject<GeneralEvent<any>>): void {
         this.event.pipe(
             filter(event => event.type === `General Event`),
-            filter(event => event.event === `Available Transport`)
+            filter(event => event.event === `Available Transport` || event.event === `New Transport`)
         ).subscribe((event: GeneralEvent<TransportServiceInterface[]>) => {
             let transportServices: TransportServiceInterface[] = event.data
             transportServices.forEach((transportService: TransportServiceInterface) => {
@@ -80,7 +78,8 @@ export class AdapterManager extends AdapterManagerBase {
             this.console.log({ message: `Transport Service Id: ${transportService.getInfo().transportServiceId} has been added to adapter's record.` })
         }
     }
-
+    /* At the moment, all adapters are instantiated based on which transport the client is connected. For multi type adapters instantiation logic for one client, this 
+    will be for future enhancements. */
     private instantiateAdapterComponents(clientRef: ClientObject): AdapterInterface[] {
         let transportService = this.transportServiceArray.find(obj => obj.getInfo().transportServiceId === clientRef.transportServiceId)
         if (transportService) {
@@ -94,7 +93,6 @@ export class AdapterManager extends AdapterManagerBase {
             this.console.error({ message: `Transport Service id ${clientRef.transportServiceId} not found. Unable to instantiate adapters.` })
             return []
         }
-
     }
 }
 

+ 0 - 1
src/adapters/adapter.receiver.ts

@@ -27,7 +27,6 @@ export class ReceiverAdapter extends AdapterBase {
             const subscription: Subscription = this.transportService.subscribeForEvent().pipe(
                 filter(event => event.type === `Transport Event`), 
                 filter((message: GeneralEvent<any>) => message.event === 'New Message'),
-                // take message only specific for this adapter. Although that itself wouldn't be necessary, considerng everything goes through transportEvent. I guess it's for better management
                 filter((message: GeneralEvent<TransportMessage>) => (message.data as TransportMessage).target == this.adapterId),
             ).subscribe((message: GeneralEvent<TransportMessage>) => {
                 this.console.log({ message: `Received ${(((message.data as TransportMessage).payload as WrappedMessage).payload as FisMessage).header.messageID} from ${((message.data as TransportMessage).target)}`, details: message })

+ 15 - 12
src/test/receiver.ts

@@ -21,25 +21,25 @@ class Supervisor {
 
     constructor() {
         this.event = new Subject()
+        // Start setting up existing transport based on .env file
         this.sortTransportFromEnv(this.transportSet)
         this.transportSet.forEach(transport => {
             this.setUpTransportService(transport, this.event, this.isClient)
         })
+        // once adapter manager is instantiated, it will attempt to connect to existing started transport
         this.tieInAdapterWithExistingTransportServices(this.event)
 
         this.transmissionManager = new MessageTransmissionManager(this.event, this.isClient)
         this.startMessageTransmission()
-        // testing
-        // this.event.subscribe(event => {
-        //     this.console.log({ message: `Supervisor Event: ${event.type} && ${event.event}` })
-        // })
     }
 
     private startMessageTransmission(): void {
+        // every new remote client connected, a new transmission object will be instantiated to allow message transmission
         this.transmissionManager.subscribeForTransmission().pipe(
             filter(event => event.type === `Transmission Event`),
             filter(event => event.event === `New Transmission`)
         ).subscribe((event: GeneralEvent<TransmissionInterface>) => {
+            // broadcast to indicate a transmission object is ready, this signal will be received by adapter manager to instantiate the releavnt adapters to be used
             this.event.next({
                 id: uuidv4(),
                 type: `General Event`,
@@ -49,12 +49,13 @@ class Supervisor {
             })
             let transmission: TransmissionInterface = event.data
             this.console.log({ message: `Acquired transmission set for client ${transmission.clientId}` })
+            // updating transmission records. No logic for keeping track of client state, as that will be handled exclusively by tranmistter transmission at the moment
             this.transmissionSets.push(transmission)
 
             this.handleClientActivity(transmission)
         })
     }
-    // only called once for each connected clients.
+
     private handleClientActivity(messageTransmission: TransmissionInterface): void {
         // start listening to incoming messages from this client
         messageTransmission.receiver.getIncoming().subscribe((event: GeneralEvent<any>) => {
@@ -62,10 +63,12 @@ class Supervisor {
             this.generalBus.next(event)
         })
 
+        // for all the responses or messages to be emitted on the provider perspective
         this.outgoingPipe.subscribe((message: FisMessage) => {
             messageTransmission.transmitter.emit(message)
         })
 
+        // test sample to simulate request response, but not using request response transmission
         let request: FisMessage = {
             header: {
                 messageID: uuidv4(),
@@ -73,12 +76,12 @@ class Supervisor {
             },
             data: 'Data'
         }
+        // this.request(request, messageTransmission).subscribe({
+        //     next: res => this.console.log({ message: `received ${res.header.messageID}`, details: res }),
+        //     complete: () => this.console.log({ message: `Responses Completed for request: ${request.header.messageID}` })
+        // })
 
-        this.request(request, messageTransmission).subscribe({
-            next: res => this.console.log({ message: `received ${res.header.messageID}`, details: res }),
-            complete: () => this.console.log({ message: `Responses Completed for request: ${request.header.messageID}` })
-        })
-
+        // test sample to stream new messages every second
         // this.startGeneratingRequest(1000, this.outgoingPipe)
     }
 
@@ -113,8 +116,6 @@ class Supervisor {
         })
     }
 
-
-    // Server to be set up as well as acquiring client information if needed. Like in the case for grpc and socket. Http not requ`ired.
     private setUpTransportService(transportSet: TransportSet, event: Subject<GeneralEvent<any>>, isClient?: boolean): void {
         try {
             let transportService: TransportServiceInterface = this.instantiateTransportService(transportSet.transport, event)
@@ -152,6 +153,7 @@ class Supervisor {
         }
     }
 
+    // just to re-arrange the list of transport servicce from env. Of course, varying modules will have varying ways to start up their transport respectively. TO adapter later
     private sortTransportFromEnv(transportSet: TransportSet[]): void {
         let transportList: string[] = process.env.Transport?.split(',') || []
         let portList: number[] = (process.env.PORT?.split(',') || []).map(port => Number(port));
@@ -161,6 +163,7 @@ class Supervisor {
         this.console.log({ message: 'TransportSetList', details: this.transportSet })
     }
 
+    // A method to broadcast avaible tranpsort to adapterManager the available transport to be connected after the adapter manager has been instantiated
     private tieInAdapterWithExistingTransportServices(eventBus: Subject<GeneralEvent<any>>): void {
         const subscription: Subscription = eventBus.pipe(
             filter(event => event.type === `Adapter Event`),

+ 3 - 3
src/test/transmitter.ts

@@ -72,9 +72,9 @@ class Supervisor {
         })
 
         // to emulate general notification. Send every second
-        // this.messageProducer.getNotificationMessage().subscribe((message: FisMessage) => {
-        //     messageTransmission.transmitter.emit(message)
-        // })
+        this.messageProducer.getNotificationMessage().subscribe((message: FisMessage) => {
+            messageTransmission.transmitter.emit(message)
+        })
     }
 
     // Server to be set up as well as acquiring client information if needed. Like in the case for grpc and socket. Http not requ`ired.

+ 3 - 2
src/transmission/msg.transmission.manager.ts

@@ -16,9 +16,8 @@ export class MessageTransmissionManager extends MessageTransmissionManagerBase {
         if (browserEnv) this.browserEnv = browserEnv
         this.console.log({ message: `Contructing self... ${this.browserEnv ? `is receiving end` : `is not browser env`}` })
         this.event = event
-        // Subscribe for adapterManager and it's relevent event
+        // Instantiate and subscribe for adapterEvent
         this.adapterManager = new AdapterManager(event, this.browserEnv)
-        // this.adapterManager.subscribeForAdapters().subscribe(this.event)
         this.adapterManager.subscribeForAdapters().subscribe((adapterEvent: GeneralEvent<AdapterInterface>) => {
             this.event.next(adapterEvent)
         })
@@ -34,6 +33,8 @@ export class MessageTransmissionManager extends MessageTransmissionManagerBase {
                 let transmission: TransmissionInterface | undefined = this.instantiateTransmissionComponents(event.data, this.event)
                 this.console.log({ message: `Acknowledged new client: ${transmission.clientId}. Instantiated Transmission Components.` })
                 if (transmission) {
+                    // reason for this, is because the subscribe for adapter Events is not pushed into the global event
+                    // This is needed also for relevant supervisitory components to be notified that the ransmission for the particular client is ready to go
                     observer.next({
                         id: uuidv4(),
                         type: `Transmission Event`,

+ 1 - 0
src/transmission/msg.transmission.receiver.ts

@@ -46,6 +46,7 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
         })
     }
 
+    /* Assigned and update adapters record. Currently no logic to swtich adapters based on performance or whatever logic to be integrated in the future */
     private handleAdapters(adapterEvent: Observable<GeneralEvent<any>>): void {
         adapterEvent.pipe(
             filter(event => event.type === `Adapter Event`),

+ 5 - 1
src/transmission/msg.transmission.transmitter.ts

@@ -32,6 +32,8 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
         this.messageToBeBuffered.next(message)
     }
 
+    /* After setting up, will listen specifically to the connection state of this particular remote client. So that, the buffer signal can be
+    established to allow the buffer to do their thing. */
     private setupBuffer(): void {
         this.console.log({ message: `Setting up Retransmission Service...` })
         this.event.pipe(
@@ -59,14 +61,15 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
             if (this.currentAdapter) {
                 this.currentAdapter.emit(bufferedMessage)
             } else {
+                // just flush back the message inside the buffer, if the adapter is not ready or assigned.
                 this.messageToBeBuffered.next(bufferedMessage)
                 this.console.error({ message: `Adapter is not set. Please ensure adapters are ready. Message ${(bufferedMessage.payload as FisMessage).header.messageID} is flushed back into buffer.` })
             }
         })
     }
 
+    // hardcode it to use the first adapter added for now. Haven't decide on the logic to dynamically switch them adapters
     private handleAdapters(adaptersEvent: Observable<GeneralEvent<any>>): void {
-        // hardcode it to use the first adapter added for now. Haven't decide on the logic to dynamically switch them adapters
         adaptersEvent.pipe(
             filter(event => event.type === `Adapter Event`),
             filter(event => event.event === `New Adapter`),
@@ -87,6 +90,7 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
         })
     }
 
+    // this is for http only. Please ignore for now.
     private uniqueHandlerToFlushUnsentMessages(event: Observable<GeneralEvent<any>>): void {
         event.pipe(
             filter(event => event.event == 'Re-Flush'),

+ 1 - 2
src/transport/websocket.ts

@@ -13,7 +13,6 @@ export class WebsocketTransportService implements TransportServiceInterface {
     private info!: TransportServiceProfile
     private connectedSocketServer: ConnectedSocketServer[] = [] // to allow the possibility of having to communicate with multiple servers as a client
     private connectedClientSocket: ConnectedSocketClient[] = [] // to keep track of the all the clients that are connected
-    // private incomingMessage: Subject<TransportMessage> = new Subject() // this is only for client roles only atm
     private event!: Subject<GeneralEvent<any>>
 
     constructor(event: Subject<GeneralEvent<any>>) {
@@ -54,7 +53,6 @@ export class WebsocketTransportService implements TransportServiceInterface {
         this.console.log({ message: `Emitting: ${((message.payload as WrappedMessage).payload as FisMessage).header.messageID} to ${message.target}`, details: message })
         let clientObj: ConnectedSocketClient | undefined = this.connectedClientSocket.find(obj => obj.clientId === message.target)
         let serverObj: ConnectedSocketServer | undefined = this.connectedSocketServer.find(obj => obj.clientId === message.target)
-        // this.console.log({ message: `${serverObj?.connectionState.getValue(), serverObj?.id}` })
         // for server usage
         if (clientObj && clientObj.connectionState.getValue() == 'ONLINE') {
             clientObj.socketInstance.emit(`message`, message.payload)
@@ -65,6 +63,7 @@ export class WebsocketTransportService implements TransportServiceInterface {
         }
     }
 
+    // this is for incoming messages. ALthough it's generally the same bus as the global event bus. 
     public subscribeForEvent(): Observable<GeneralEvent<any>> {
         return this.event.asObservable()
     }

+ 6 - 13
src/utils/socket.utils.ts

@@ -126,6 +126,7 @@ export function handleClientSocketConnection(transportServiceId: string, socket:
                     transportServiceId: transportServiceId
                 }
                 writeFile(data.message as ConnectedSocketServer, receiverProfileInfo.clientId).then(() => {
+                    /* Note that there are two separate events, because transmission must first be set up before releasing buffer. */
                     // broadcast event to allow transmission manager to instantiate transmission components
                     eventNotification.next({
                         id: uuidv4(),
@@ -172,6 +173,7 @@ export function handleClientSocketConnection(transportServiceId: string, socket:
                 console.log({ message: `Server cannot find credentials`, details: data.message })
                 // logic to request for new credentials
                 setTimeout(() => {
+                    // for now, if clent cannot be recognize, then we will just proceed as a new client
                     socket.emit('profile', {
                         name: 'New Client',
                         data: null
@@ -189,10 +191,7 @@ export function handleClientSocketConnection(transportServiceId: string, socket:
                     type: `Transport Event`,
                     event: `Server Disconnected`,
                     date: new Date(),
-                    data: {
-                        clientId: receiverProfileInfo.clientId,
-                        message: `Server for Channel ${receiverProfileInfo.clientId} disconnected.`
-                    },
+                    data: receiverProfileInfo,
                     transport: `Websocket`
                 })
                 receiverProfileInfo.connectionState.next(`OFFLINE`)
@@ -204,7 +203,7 @@ export function handleClientSocketConnection(transportServiceId: string, socket:
 // For SERVER Usage: set up socket listeners to start listening for different events
 export function handleNewSocketClient(transportServiceId: string, socket: SocketForConnectedClient, connectedClientSocket: ConnectedSocketClient[]): Observable<GeneralEvent<any>> {
     return new Observable((event: Observer<GeneralEvent<any>>) => {
-        console.log({ message: `New sopcket client connected. Setting up listeners for socket:${socket.id}` })
+        console.log({ message: `Socket client connected. Setting up listeners for socket:${socket.id}` })
         // returns the socket client instance 
         // listen to receiver's initiotion first before assigning 'credentials'
         socket.on(`profile`, (message: { name: string, data: any }) => {
@@ -251,7 +250,7 @@ export function handleNewSocketClient(transportServiceId: string, socket: Socket
                 }
                 function handleFoundClient(clientInstance: ConnectedSocketClient | undefined) {
                     if (clientInstance) {
-                        console.log({ message: `Socket Client ${clientInstance.clientId} Found` })
+                        console.log({ message: `Socket Client ${clientInstance.clientId} Found. This is a previously connected client.` })
                         socket.emit('profile', { name: 'Adjusted Profile', message: { id: clientInstance.clientId } })
                         // replace socket instance since the previous has been terminated
                         clientInstance.socketInstance = socket
@@ -311,13 +310,7 @@ export function startListening(socket: SocketForConnectedClient, client: Connect
             type: `Transport Event`,
             event: 'Client Disconnected',
             date: new Date(),
-            data: {
-                clientId: client.clientId,
-                message: '',
-                payload: {
-                    time: new Date()
-                }
-            },
+            data: client,
             transport: 'Websocket'
         })
         eventListener.error(`Client ${client.clientId} disconnected. Terminating this observable event for this client socket...`)