Kaynağa Gözat

reqeust response emulation and transport service update

Enzo 1 ay önce
ebeveyn
işleme
550c80e7c1

+ 29 - 37
doc/explanation.txt

@@ -1,35 +1,3 @@
-Latest change as of 12/11/2024
-Currently following the diagram from Chin. 
-
-So, right now, transmitter file, even though it can act as a receiver, will act as a server application. So it will instantiate a transmission manager first thing first, separating 
-from application logic. So when the transmission manager is instantiated, it will also instantiate the necessary transport service. Please DO NOTE that at the moment, the code is 
-hardcoded to only work with 1 type of transport and a singular transport, which is Websocket. I have not design a system where it can cycle through multiple transport. But can be
-enhanced later on. Going back to transmission manager, once the transprot service, aka socket service in this case is instantiated, it will start listening to events. Events like
-whether or not is there a new or old client connected, and whether or not is there any new requests or messages or notifications. There are many events going on, but for the 
-transmission manager, it's only concern is whether or not if it's a new client or old client. So, it's only subscribing to these two types of events, and based on these events, 
-let's say it's a new client has been established, it will instantiate the necessary transmission instances needed to transmit and receive message.
-Here's how it works. A new client is connected, transmission manager will talk to connection/adapter manager to instantiate the necessary adapters according to the transport services
-that are available. A set of transmission instance, which is transmitter, receiver and request response instances will be returned so that the applciation can start sending and
-receiving messages through the aforementioned instances. And the adapters acquired from adapter manager will be attached alongside with the transmission instances. 
-Retransmission will be implemented at the transmission level.
-
-Please note that the current code is only tested with one on one, which means only 1 server and 1 client, and it's a one way communication at the moment. I have yet to test it with
-multiple clients, but the current test is working so far with the retransmission. To emulate internet disruption without manually shutting down both the server and the client, I 
-came up with an idea of writing another middle-man. or a proxy if you would, to simluate internet disruption by just shutting off the proxy, so that the socket on each side of the 
-party can pick up the event, and notify/broadcast the information to the parties concerned. That's where the retransmission can shine.
-
-
-Things to do:
-i) Still need to test it out with multiple clients
-ii) Try it out with http as well, need to allocate time to code http services to emulate bi-directional streaming
-iii) Haven't try to use request-response. To be enhanced.
-iv) Implement dual roles: Eg => A server can be both transmitter and receiver. <Current Level only transmitter>
-v) Mutliple adapters implmentation. <TBD> (Need to discuss the logic to cycle through the transport and metrics measurement)
-
-
-Final i) General Documentation and it's associated components.
-Final ii) Clean Code (Usually this is done after prototype is functionally acceptable and stress tested.)
-
 As of 14/11/2024 <Since there's no slot for meeting for the rest of the week>
 Things to consider:
 i) Multi Client test. <Based on chin, this is out of my scope, but will do anyways.>
@@ -51,8 +19,32 @@ i) Explore multiple traffic concept
 ii) Move transport service instantiation to adapterManager
 ExtraNotes: In some cases, servers can only be transmitting. Although this program allows for dual roles if there's a need for me.
 
-Note for 19/11/2024:
-i) Move the instantiation of transport service to adapterManager
-ii) Test multi client. Make sure they can be recognized. Will have to have multi proxies to be set up.
-iii) Also test if the server can also receives instead of just transmitting, and responds accordingly.
-iv) Modify the code to start using wrapped messages to allow message ordering
+As of 20/11/2024
+Here's what I did yesterday. Succeeded in wrapping all the messages to implement the retransmisison as well as message ordering mechanism. I also focused on getting the receiver to implement retransmission on their side as well. Didn't get to test the message ordering, even though I enabled it. 
+I assumed it worked for now. Although I didn't really observe the test to see if it really works, but it was tested previously, so, I would assume it works for now. But the thing 
+about the message ordering is that, if the previous message never arrives, the current message that arrives will always be held hostage. Just to take note. So, with that being said,
+today I will be focus in on the following:
+i) Move the instantiation of transport serivce to adapterManager Side. <DONE>
+ii) Code and test for server to respond this time round. Make sure the receiver have all the required responses. (Can also observe for the message ordering.)
+-The message ordering may need to be fixed for the request response. It is not ideal to hold the last message, because that would mean that the receiver has to 
+make another action just for the message that has been held hostage to be released. Special clause should be enforced to allow the completion of a request
+iii) Test multi client. 
+
+Things to do:
+- Connection Manager to manage different transport options. Default using websocket, but will also consider fail detection on each transport and decide on adapters swap
+- Also need to cater for browser environment. Right the now, the default behaviour is that one would assume to instantiate a socket server, instead of client. 
+Need to cater for those cases too.
+
+Target for week:
+i) R&D for multi channel data traversal.
+-That means utilizing multiple TCP ports or network cards or transport services.
+-To be Prep via documents for discussion
+ii) Functional Http Service options to be made available.
+-Default transport will be geared towards socket at the moment.
+iii) Code Adjustments and Cleaning
+-Make sure the file structure and folders are in orderi
+-Necessary comments
+iv) Documentation
+-A special Readme file to help understand the usage and what it does.
+-Also guide for future enhancements
+ 

+ 58 - 0
src/connector/connector.manager.ts

@@ -4,13 +4,22 @@ import { ReceiverConnectionAdapter } from './connector.receiver'
 import { RequestResponseConnectionAdapter } from './connector.request.response'
 import { v4 as uuidv4 } from 'uuid'
 import { Subject } from "rxjs"
+import { WebsocketTransportService } from "../transport/websocket"
+import { HttpTransportService } from "../transport/http"
 export class ConnectionManager implements ConnectionManagerInterface {
+    private transportServiceArray: TransportService[] = []
+    private transportSet: Set<TransportSet> = new Set()
     private adapterSet: AdapterSet[] = []
     private event!: Subject<TransportEvent>
 
     constructor(event: Subject<TransportEvent>) {
         this.event = event
         console.log(`Connection Manager: Contructing ConnectionManager....`)
+
+        this.sort(this.transportSet)
+        this.transportSet.forEach(set => {
+            this.setUpTransportService(set, event)
+        })
     }
 
     getAdapter(clientId: string, transportService: TransportService): AdapterSet {
@@ -35,6 +44,55 @@ export class ConnectionManager implements ConnectionManagerInterface {
         return adapterSet
     }
 
+    public getTransportArray(): TransportService[] {
+        return this.transportServiceArray
+    }
+
+    
+    // Server to be set up as well as acquiring client information if needed. Like in the case for grpc and socket. Http not requ`ired.
+    private setUpTransportService(transportSet: TransportSet, event: Subject<TransportEvent>): void {
+        this.instantiateTransportService(transportSet.transport, event).then((transportService: TransportService) => {
+            this.transportServiceArray.push(transportService)
+            if (transportService instanceof WebsocketTransportService) {
+                console.log(`Just Double Checking... this is websocket`)
+                transportService.startServer(transportSet.port);
+            } else if (transportService instanceof HttpTransportService) {
+                console.log(`Just Double Checking... this is http`)
+                transportService.startServer(transportSet.port);
+                // Additional Http-specific setup if needed.
+            }
+        }).catch((error) => { throw new Error(error) })
+    }
+
+    private async instantiateTransportService(transportType: Transport, event: Subject<TransportEvent>): Promise<TransportService> {
+        return new Promise((resolve, reject) => {
+            if (transportType == Transport.Websocket) {
+                resolve(new WebsocketTransportService(event))
+            }
+            else if (transportType == Transport.Http) {
+                resolve(new HttpTransportService(event))
+            } else {
+                reject(`No Transport Service is not properly instantiated`)
+            }
+        })
+    }
+
+    private sort(transportSet: Set<TransportSet>): void {
+        let transportList: string[] = process.env.Transport?.split(',') || []
+        let portList: number[] = (process.env.PORT?.split(',') || []).map(port => Number(port));
+        transportList.forEach((transport, index) => {
+            transportSet.add({ transport: transport, port: portList[index] } as unknown as TransportSet)
+        })
+    }
+
+
 
 
 }
+
+
+interface TransportSet {
+    transport: Transport,
+    port: number
+}
+

+ 2 - 2
src/connector/connector.receiver.ts

@@ -1,8 +1,8 @@
 import dotenv from 'dotenv';
 import { Bus, FisMessage } from "../interface/transport.interface";
 import { ConnectionAdapter } from "./connector.base";
-import { AdapterProfile, AdaptorTransmissionRole, ConnectionState, ReceiverConnectionAdapter as ReceiverConnectionAdapterInterface, Transport, TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
-import { BehaviorSubject, filter, map, Observable, Observer, Subject, Subscription, takeWhile } from 'rxjs';
+import { ReceiverConnectionAdapter as ReceiverConnectionAdapterInterface, Transport, TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
+import { filter, Observable, Observer, Subscription, takeWhile } from 'rxjs';
 import { v4 as uuidv4 } from 'uuid'
 
 dotenv.config();

+ 6 - 6
src/connector/connector.transmitter.ts

@@ -1,8 +1,8 @@
 import dotenv from 'dotenv';
-import { FisMessage, TransmissionMessage } from "../interface/transport.interface";
+import { FisMessage } from "../interface/transport.interface";
 import { ConnectionAdapter } from "./connector.base";
-import { AdapterProfile, AdaptorTransmissionRole, ConnectionState, TransmitterConnectionAdapter as TransmitterConnectionAdapterInterface, Transport, TransportEvent, TransportMessage, TransportService } from '../interface/connector.interface';
-import { BehaviorSubject, Observable, Observer, Subject } from 'rxjs';
+import { ConnectionState, TransmitterConnectionAdapter as TransmitterConnectionAdapterInterface, Transport, TransportMessage, TransportService } from '../interface/connector.interface';
+import { Subject } from 'rxjs';
 import { v4 as uuidv4 } from 'uuid'
 import { WrappedMessage } from '../utils/message.ordering';
 
@@ -10,7 +10,7 @@ dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 So how?: */
 export class TransmitterConnectionAdapter extends ConnectionAdapter implements TransmitterConnectionAdapterInterface {
-    connectionStateBus: Subject<ConnectionState> = new Subject() 
+    connectionStateBus: Subject<ConnectionState> = new Subject()
 
     constructor(adapterId: string, adapterType: Transport, transportService: TransportService) {
         super()
@@ -20,7 +20,7 @@ export class TransmitterConnectionAdapter extends ConnectionAdapter implements T
     }
 
     emit(message: FisMessage | WrappedMessage): void {
-       // logic here
+        // logic here
         this.connector.emit({
             id: this.connectorProfile.id,
             transport: this.connectorProfile.transportType,
@@ -35,7 +35,7 @@ export class TransmitterConnectionAdapter extends ConnectionAdapter implements T
             transportType: adapterType
         }
     }
-    
+
 }
 
 

+ 12 - 23
src/test/receiver.ts

@@ -35,7 +35,7 @@ class SocketClient {
 
     private startListening(event: Subject<TransportEvent>): void {
         event.subscribe((event: TransportEvent) => {
-            // console.log(event)
+            // console.log((event.data as EventMessage).payload ?? 'Not Fis Message')
             if (event.event == `New Server`) {
                 this.currentSocketId = (event.data as EventMessage).clientId
 
@@ -43,10 +43,10 @@ class SocketClient {
                 if (currentClientSocket) {
                     // so retransmission is working as usual
                     this.retransmission.implementRetransmission(this.requestMessages, currentClientSocket.connectionState, true)
-                    this.startGeneratingRequest(1000, this.requestMessages)
+                    this.startGeneratingRequest(10000, this.requestMessages)
                     this.retransmission.returnSubjectForBufferedItems().subscribe((message: WrappedMessage) => {
                         this.sendMessage(message).subscribe({
-                            next: message => console.log(message),
+                            next: response => console.log(`Receiving response for ${message.thisMessageID}`),
                             complete: () => console.log(`Request Completed for ${message.thisMessageID}`)
                         })
                     })
@@ -70,31 +70,20 @@ class SocketClient {
 
     private sendMessage(message: WrappedMessage): Observable<WrappedMessage> {
         return new Observable((response: Observer<WrappedMessage>) => {
-            console.log(`Emitting: ${message.thisMessageID}`)
+            console.log(`Emitting: ${(message.payload as FisMessage).header.messageID}`)
             this.socket.emit('message', message)
-            let onHold = new Subject<WrappedMessage>()
-            let toBeReleased = new Subject<WrappedMessage>()
 
-            let subscription: Subscription = toBeReleased.subscribe(message => {
-                if ((message.payload as FisMessage).data == 'Complete') {
-                    onHold.complete()
-                    toBeReleased.complete()
-                    subscription.unsubscribe()
-                    response.complete()
-                } else {
-                    response.next(message)
-                }
-            })
-
-            this.event.pipe(
+            let eventSubscription: Subscription = this.event.pipe(
                 filter(event => event.event == 'New Message'),
-                filter(event => ((event.data as EventMessage).payload as WrappedMessage).thisMessageID === message.thisMessageID),
-                takeWhile(event => (((event.data as EventMessage).payload as WrappedMessage).payload as FisMessage).data == 'Complete'),
+                filter(event => (((event.data as EventMessage).payload as WrappedMessage).payload as FisMessage).header.messageID === (message.payload as FisMessage).header.messageID),
+                // takeWhile(event => (((event.data as EventMessage).payload as WrappedMessage).payload as FisMessage).data != 'Complete'),
                 map(event => ((event.data as EventMessage).payload as WrappedMessage)),
             ).subscribe((message: WrappedMessage) => {
-                checkMessage(message, onHold).then(() => {
-                    toBeReleased.next(message)
-                })
+                response.next(message)
+                if ((message.payload as FisMessage).data == 'Complete') {
+                    eventSubscription.unsubscribe()
+                    response.complete()
+                }
             })
         })
     }

+ 7 - 7
src/test/transmitter.ts

@@ -1,5 +1,5 @@
 import { filter, interval, map, Observable, Observer, Subject, take } from "rxjs";
-import { Bus, FisMessage, MessageTransmission, TransmissionMessage } from "../interface/transport.interface";
+import { Bus, FisMessage, MessageTransmission } from "../interface/transport.interface";
 import { v4 as uuidv4 } from 'uuid'
 import { MessageTransmissionManager } from "../transmission/msg.transmission.manager";
 import { TransportEvent, TransportMessage } from "../interface/connector.interface";
@@ -28,7 +28,7 @@ class Supervisor {
     private handleClientActivity(messageTransmission: MessageTransmission): void {
         // start listening to incoming messages from this client
         messageTransmission.receiver.getMessageBus(Bus.GeneralBus).subscribe((event: TransportEvent) => {
-            console.log(event) 
+            // console.log(event) 
             let requestMessage: FisMessage = ((event.data as TransportMessage).payload as WrappedMessage).payload as FisMessage
             this.clientIncomingMessage.next(requestMessage)
             this.messageProducer.getOutgoingMessages().pipe(
@@ -71,8 +71,8 @@ class MessageProducer {
     // this is called no problem
     private handleIncomingRequests(requests: Observable<FisMessage>, outgoingMessageBus: Subject<FisMessage>): void {
         requests.subscribe((request: FisMessage) => {
-            // console.log(`Generating response for new request ${request.header.messageID}`)
-            this.generateMessage(10).subscribe({
+            console.log(`Generating response for new request ${request.header.messageID}`)
+            this.generateMessage(request.header.messageID, 10).subscribe({
                 next: message => outgoingMessageBus.next(message),
                 error: error => console.error(error),
                 complete: () => {
@@ -88,14 +88,14 @@ class MessageProducer {
         })
     }
 
-    private generateMessage(amount: number): Observable<FisMessage> {
+    private generateMessage(requestID: string, amount: number): Observable<FisMessage> {
         return new Observable((response: Observer<FisMessage>) => {
             const intervalMessageGeneration = interval(1000).pipe(
                 take(amount), // Ensures only 'amount' messages are generated
                 map(() => {
                     const message: FisMessage = {
                         header: {
-                            messageID: uuidv4(),
+                            messageID: requestID,
                             messageName: 'ResponseMessage'
                         },
                         data: `Data`
@@ -110,7 +110,7 @@ class MessageProducer {
                 complete: () => {
                     response.next({
                         header: {
-                            messageID: uuidv4(),
+                            messageID: requestID,
                             messageName: 'ResponseMessage'
                         },
                         data: `Complete`

+ 4 - 54
src/transmission/msg.transmission.manager.ts

@@ -3,15 +3,12 @@ import { MessageTransmissionReceiver } from "./msg.transmission.receiver";
 import { ConnectionManager } from "../connector/connector.manager";
 import { EventMessage, MessageTransmission, MessageTransmissionManager as MessageTransmissionManagerInterface, ReceiverProfile, TransmitterProfile } from "../interface/transport.interface";
 import { v4 as uuidv4 } from 'uuid'
-import { AdapterSet, Transport, TransportEvent, TransportService, Event } from "../interface/connector.interface";
+import { AdapterSet, TransportEvent, Event } from "../interface/connector.interface";
 import { MessageTransmissionRequestResponse } from "./msg.transmission.request-response";
 import { filter, Observable, Observer, Subject } from "rxjs";
-import { WebsocketTransportService } from "../transport/websocket";
-import { HttpTransportService } from "../transport/http";
 
 export class MessageTransmissionManager implements MessageTransmissionManagerInterface {
-    private transportServiceArray: TransportService[] = []
-    private transportSet: Set<TransportSet> = new Set()
+   
     transmission: MessageTransmission[] = []
     connectionManager!: ConnectionManager
     event!: Subject<TransportEvent>
@@ -22,12 +19,6 @@ export class MessageTransmissionManager implements MessageTransmissionManagerInt
         this.event = event
         this.connectionManager = new ConnectionManager(this.event)
 
-        this.sort(this.transportSet)
-        this.transportSet.forEach(set => {
-            this.setUpTransportService(set, event)
-        })
-
-        // temporarily off for now
         // this.event.subscribe(event => console.log(`event`, event))
 
         // note that if this server is down, all these instances of transmission and connector would be lost as well. SO cannot just simply find "instances" and reuse them. Must reinstantiate them again
@@ -50,8 +41,8 @@ export class MessageTransmissionManager implements MessageTransmissionManagerInt
 
     private instantiateComponents(clientId: string): MessageTransmission {
         let adapterSet: AdapterSet[] = []
-        if (this.transportServiceArray.length > 0) {
-            this.transportServiceArray.forEach(transport => {
+        if (this.connectionManager.getTransportArray().length > 0) {
+            this.connectionManager.getTransportArray().forEach(transport => {
                 adapterSet.push(this.connectionManager.getAdapter(clientId, transport))
             })
         } else {
@@ -97,42 +88,6 @@ export class MessageTransmissionManager implements MessageTransmissionManagerInt
     }
 
 
-    // Server to be set up as well as acquiring client information if needed. Like in the case for grpc and socket. Http not requ`ired.
-    private setUpTransportService(transportSet: TransportSet, event: Subject<TransportEvent>): void {
-        this.instantiateTransportService(transportSet.transport, event).then((transportService: TransportService) => {
-            this.transportServiceArray.push(transportService)
-            if (transportService instanceof WebsocketTransportService) {
-                console.log(`Just Double Checking... this is websocket`)
-                transportService.startServer(transportSet.port);
-            } else if (transportService instanceof HttpTransportService) {
-                console.log(`Just Double Checking... this is http`)
-                transportService.startServer(transportSet.port);
-                // Additional Http-specific setup if needed.
-            }
-        }).catch((error) => { throw new Error(error) })
-    }
-
-    private async instantiateTransportService(transportType: Transport, event: Subject<TransportEvent>): Promise<TransportService> {
-        return new Promise((resolve, reject) => {
-            if (transportType == Transport.Websocket) {
-                resolve(new WebsocketTransportService(event))
-            }
-            else if (transportType == Transport.Http) {
-                resolve(new HttpTransportService(event))
-            } else {
-                reject(`No Transport Service is not properly instantiated`)
-            }
-        })
-    }
-
-    private sort(transportSet: Set<TransportSet>): void {
-        let transportList: string[] = process.env.Transport?.split(',') || []
-        let portList: number[] = (process.env.PORT?.split(',') || []).map(port => Number(port));
-        transportList.forEach((transport, index) => {
-            transportSet.add({ transport: transport, port: portList[index] } as unknown as TransportSet)
-        })
-    }
-
 
     private handleEvent(eventName: Event, eventObs: Observable<TransportEvent>): void {
         eventObs.pipe(
@@ -158,8 +113,3 @@ export class MessageTransmissionManager implements MessageTransmissionManagerInt
 }
 
 
-interface TransportSet {
-    transport: Transport,
-    port: number
-}
-

+ 0 - 25
src/utils/socket.utils.ts

@@ -168,31 +168,6 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
             }
         })
 
-        /* https://socket.io/docs/v3/client-socket-instance/
-            this is no differnt from the 'connect' handler */
-        // socket.on(`reconnect`, () => {
-        //     console.log('Connected to the server:', socket.id)
-        //     if (clientName) {
-        //         checkOwnClientInfo(clientName).then((profile: ConnectedServerSocket) => {
-        //             receiverProfileInfo = profile
-        //             socket.emit('profile', {
-        //                 name: 'Old Client',
-        //                 data: profile
-        //             })
-        //         }).catch((error) => {
-        //             socket.emit('profile', {
-        //                 name: 'New Client',
-        //                 data: null
-        //             })
-        //         })
-        //     } else {
-        //         socket.emit('profile', {
-        //             name: 'New Client',
-        //             data: null
-        //         })
-        //     }
-        // })
-
         // Handle disconnection
         socket.on('disconnect', () => {
             console.log('Websocket Client disconnected from the server');