Procházet zdrojové kódy

Enhancements for message transmission to be universally implemented across different Env. In this update, specifically for UI.

Enzo před 1 měsícem
rodič
revize
32290cf257

+ 5 - 0
dist/config/config.json

@@ -0,0 +1,5 @@
+{
+    "connection": {
+        "transmitter": "http://localhost:3001/"
+    }
+}

+ 5 - 3
doc/explanation.txt

@@ -20,7 +20,7 @@ ii) Move transport service instantiation to adapterManager
 ExtraNotes: In some cases, servers can only be transmitting. Although this program allows for dual roles if there's a need for me.
 
 For 21/11/2024 (Thursday):
-i) Test multi client. 
+i) Test multi client.  <DONE>
 -This is probelmatic. Observation: One client, but the all the transmission stop, but it shouldn't be.
 ii) Do include in the list of discussion for the message ordering mechanism
 - Currently the way it is working is not favorable. In the mean time, do think up suggestion to improve the ordering.
@@ -29,6 +29,7 @@ iii) Start doing some R&D.
 -details down there
 iv) Also, enhancements to cater for UI side to use this mesasge transmission interface.  [MAJOR OVERHAUL]
 - That means when instantiating message transmission, the browser environment must be specified to use socket client instead of starting a server. 
+- Need to redo all the test again, (request-response emulation && multi Client Test)
 
 Things to do:
 - Connection Manager to manage different transport options. Default using websocket, but will also consider fail detection on each transport and decide on adapters swap
@@ -38,7 +39,7 @@ Need to cater for those cases too.
 Target for week:
 i) R&D for multi channel data traversal.
 -That means utilizing multiple TCP ports or network cards or transport services.
--To be Prep via documents for discussion
+-To be Prep via documents for discussion 
 ii) Functional Http Service options to be made available.
 -Default transport will be geared towards socket at the moment.
 iii) Code Adjustments and Cleaning
@@ -47,4 +48,5 @@ iii) Code Adjustments and Cleaning
 iv) Documentation
 -A special Readme file to help understand the usage and what it does.
 -Also guide for future enhancements
- 
+ 
+DONT  FORGET TO UPDATE SPICEWORK!!!!!!

+ 5 - 0
src/config/config.json

@@ -0,0 +1,5 @@
+{
+    "connection": {
+        "transmitter": "http://localhost:3001/"
+    }
+}

+ 1 - 1
src/connector/connector.base.ts

@@ -1,6 +1,6 @@
 import { BehaviorSubject, Observable, Observer, Subject, Subscriber, Unsubscribable } from "rxjs";
 import dotenv from 'dotenv';
-import { AdapterProfile, AdaptorTransmissionRole, ConnectionAdaptorBase, ConnectionState, Transport, TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
+import { AdapterProfile, AdaptorTransmissionRole, ConnectionAdaptorBase, ConnectionState, Transport, TransportEvent,  TransportService } from "../interface/connector.interface";
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers

+ 13 - 6
src/connector/connector.manager.ts

@@ -6,19 +6,21 @@ import { v4 as uuidv4 } from 'uuid'
 import { Subject } from "rxjs"
 import { WebsocketTransportService } from "../transport/websocket"
 import { HttpTransportService } from "../transport/http"
-export class ConnectionManager implements ConnectionManagerInterface {
+import config from '../config/config.json';
+
+console.log(config);export class ConnectionManager implements ConnectionManagerInterface {
     private transportServiceArray: TransportService[] = []
     private transportSet: Set<TransportSet> = new Set()
     private adapterSet: AdapterSet[] = []
     private event!: Subject<TransportEvent>
 
-    constructor(event: Subject<TransportEvent>) {
+    constructor(event: Subject<TransportEvent>, browserEnv?: boolean | undefined) {
         this.event = event
         console.log(`Connection Manager: Contructing ConnectionManager....`)
 
         this.sort(this.transportSet)
         this.transportSet.forEach(set => {
-            this.setUpTransportService(set, event)
+            this.setUpTransportService(set, event, browserEnv)
         })
     }
 
@@ -48,14 +50,19 @@ export class ConnectionManager implements ConnectionManagerInterface {
         return this.transportServiceArray
     }
 
-    
+
     // Server to be set up as well as acquiring client information if needed. Like in the case for grpc and socket. Http not requ`ired.
-    private setUpTransportService(transportSet: TransportSet, event: Subject<TransportEvent>): void {
+    private setUpTransportService(transportSet: TransportSet, event: Subject<TransportEvent>, browserEnv?: boolean): void {
         this.instantiateTransportService(transportSet.transport, event).then((transportService: TransportService) => {
             this.transportServiceArray.push(transportService)
             if (transportService instanceof WebsocketTransportService) {
                 console.log(`Just Double Checking... this is websocket`)
-                transportService.startServer(transportSet.port);
+                if(browserEnv) {
+                    // please note this is subject to change depending on the UI environemnt. Angular has their own built in function to read json file based on Swopt-UI
+                    transportService.startClient(config.connection.transmitter) 
+                } else {
+                    transportService.startServer(transportSet.port);
+                }
             } else if (transportService instanceof HttpTransportService) {
                 console.log(`Just Double Checking... this is http`)
                 transportService.startServer(transportSet.port);

+ 44 - 78
src/test/receiver.ts

@@ -1,98 +1,64 @@
-/*  This is to emulate another remote process also using socket io to connect.
-TEST: to see if it performs the necessary self check to identify itself, as well as 
-receiving all the notification and response messages */
-
-// Import the necessary modules
-import { io, Socket } from "socket.io-client";
-import { handleClientSocketConnection } from "../utils/socket.utils";
-import { ConnectedServerSocket } from "../transport/websocket";
+import { filter, map, Observable, Observer, Subject } from "rxjs";
+import { Bus, FisMessage, MessageTransmission } from "../interface/transport.interface";
 import { v4 as uuidv4 } from 'uuid'
-import { filter, interval, map, Observable, Observer, Subject, Subscription, takeWhile } from "rxjs";
-import { TransportEvent } from "../interface/connector.interface";
-import { EventMessage, FisMessage } from "../interface/transport.interface";
-import { checkMessage, WrappedMessage } from "../utils/message.ordering";
-import { RetransmissionService } from "../utils/retransmission.service";
+import { MessageTransmissionManager } from "../transmission/msg.transmission.manager";
+import { TransportEvent, TransportMessage } from "../interface/connector.interface";
+import { WrappedMessage } from "../utils/message.ordering";
 
-class SocketClient {
-    private currentSocketId!: string
-    private socket!: Socket;
-    private connectedServerSocket: ConnectedServerSocket[] = []
-    private requestMessages: Subject<any> = new Subject()
+class Supervisor {
+    private isClient: boolean = true
+    private transmissionManager!: MessageTransmissionManager
     private event: Subject<TransportEvent> = new Subject()
-    private retransmission: RetransmissionService = new RetransmissionService()
+    private transmissionSets: MessageTransmission[] = []
 
-    constructor(url: string) {
-        this.setUpClientServerConnection(url)
-    }
+    constructor() {
+        this.transmissionManager = new MessageTransmissionManager(this.event, this.isClient)
 
-    private setUpClientServerConnection(url: string) {
-        // Connect to the serve
-        this.socket = io(url);
-        // use the existing socket handler
-        handleClientSocketConnection(this.socket, this.connectedServerSocket).subscribe(this.event)
-        this.startListening(this.event)
-    }
+        this.transmissionManager.subscribe().subscribe((transmissionSet: MessageTransmission) => {
+            this.transmissionSets.push(transmissionSet)
 
-    private startListening(event: Subject<TransportEvent>): void {
-        event.subscribe((event: TransportEvent) => {
-            console.log('Event', (((event.data as EventMessage)?.payload as WrappedMessage)?.payload as FisMessage)?.header.messageID ?? 'Not Fis Message')
-            if (event.event == `New Server`) {
-                this.currentSocketId = (event.data as EventMessage).clientId
-
-                let currentClientSocket: ConnectedServerSocket | undefined = this.connectedServerSocket.find(obj => obj.id === this.currentSocketId)
-                if (currentClientSocket) {
-                    // so retransmission is working as usual
-                    this.retransmission.implementRetransmission(this.requestMessages, currentClientSocket.connectionState, true)
-                    // this.startGeneratingRequest(10000, this.requestMessages)
-                    this.retransmission.returnSubjectForBufferedItems().subscribe((message: WrappedMessage) => {
-                        this.sendMessage(message).subscribe({
-                            next: response => console.log(`Receiving response for ${message.thisMessageID}`),
-                            complete: () => console.log(`Request Completed for ${message.thisMessageID}`)
-                        })
-                    })
-                }
-            }
+            this.handleActivity(transmissionSet)
         })
     }
 
-    private startGeneratingRequest(intervalDuration: number, requestsPipe: Subject<FisMessage>) {
-        interval(intervalDuration).subscribe(time => {
-            let message: FisMessage = {
-                header: {
-                    messageID: uuidv4(),
-                    messageName: 'RequestMessage'
-                },
-                data: 'Data'
-            }
-            requestsPipe.next(message)
+    // only called once for each connected clients.
+    private handleActivity(messageTransmission: MessageTransmission): void {
+        // start listening to incoming messages from this client
+        messageTransmission.receiver.getMessageBus(Bus.GeneralBus).subscribe((event: TransportEvent) => {
+            console.log(`General Bus`, event)
         })
-    }
 
-    private sendMessage(message: WrappedMessage): Observable<WrappedMessage> {
-        return new Observable((response: Observer<WrappedMessage>) => {
-            console.log(`Emitting: ${(message.payload as FisMessage).header.messageID}`)
-            this.socket.emit('message', message)
+        let request: FisMessage = {
+            header: {
+                messageID: uuidv4(),
+                messageName: 'RequestMessage'
+            },
+            data: 'Data'
+        }
+
+        // this.request(request, messageTransmission).subscribe({
+        //     next: res => console.log(res),
+        //     complete: () => console.log(`Responses Completed for request: ${request.header.messageID}`)
+        // })
+    }
 
-            let eventSubscription: Subscription = this.event.pipe(
+    private request(request: FisMessage, messageTransmission: MessageTransmission): Observable<any> {
+        return new Observable((response: Observer<any>) => {
+            messageTransmission.transmitter.emit(request)
+            messageTransmission.receiver.getMessageBus(Bus.GeneralBus).pipe(
                 filter(event => event.event == 'New Message'),
-                filter(event => (((event.data as EventMessage).payload as WrappedMessage).payload as FisMessage).header.messageID === (message.payload as FisMessage).header.messageID),
-                // takeWhile(event => (((event.data as EventMessage).payload as WrappedMessage).payload as FisMessage).data != 'Complete'),
-                map(event => ((event.data as EventMessage).payload as WrappedMessage)),
-            ).subscribe((message: WrappedMessage) => {
-                response.next(message)
-                if ((message.payload as FisMessage).data == 'Complete') {
-                    eventSubscription.unsubscribe()
+                filter(event => (((event.data as TransportMessage)?.payload as WrappedMessage)?.payload as FisMessage)?.header.messageID === request.header.messageID),
+                map(event => (((event.data as TransportMessage)?.payload as WrappedMessage)?.payload as FisMessage))
+            ).subscribe(message => {
+                if (message.data == 'Complete') {
                     response.complete()
+                } else {
+                    response.next(message)
                 }
             })
         })
     }
-}
 
-// Usage example:
-const client = new SocketClient("http://localhost:3001");
-// const client = new SocketClient("http://localhost:3002");
-// const client = new SocketClient("http://127.0.0.1:3000");
-// const client = new SocketClient("http://192.168.100.96:3000");
+}
 
-// Send a message
+let supervisor = new Supervisor()

+ 98 - 0
src/test/receiver.txt

@@ -0,0 +1,98 @@
+/*  This is to emulate another remote process also using socket io to connect.
+TEST: to see if it performs the necessary self check to identify itself, as well as 
+receiving all the notification and response messages */
+
+// Import the necessary modules
+import { io, Socket } from "socket.io-client";
+import { handleClientSocketConnection } from "../utils/socket.utils";
+import { ConnectedServerSocket } from "../transport/websocket";
+import { v4 as uuidv4 } from 'uuid'
+import { filter, interval, map, Observable, Observer, Subject, Subscription, takeWhile } from "rxjs";
+import { TransportEvent } from "../interface/connector.interface";
+import { EventMessage, FisMessage } from "../interface/transport.interface";
+import { checkMessage, WrappedMessage } from "../utils/message.ordering";
+import { RetransmissionService } from "../utils/retransmission.service";
+
+class SocketClient {
+    private currentSocketId!: string
+    private socket!: Socket;
+    private connectedServerSocket: ConnectedServerSocket[] = []
+    private requestMessages: Subject<any> = new Subject()
+    private event: Subject<TransportEvent> = new Subject()
+    private retransmission: RetransmissionService = new RetransmissionService()
+
+    constructor(url: string) {
+        this.setUpClientServerConnection(url)
+    }
+
+    private setUpClientServerConnection(url: string) {
+        // Connect to the serve
+        this.socket = io(url);
+        // use the existing socket handler
+        handleClientSocketConnection(this.socket, this.connectedServerSocket).subscribe(this.event)
+        this.startListening(this.event)
+    }
+
+    private startListening(event: Subject<TransportEvent>): void {
+        event.subscribe((event: TransportEvent) => {
+            console.log('Event', (((event.data as EventMessage)?.payload as WrappedMessage)?.payload as FisMessage)?.header.messageID ?? 'Not Fis Message')
+            if (event.event == `New Server`) {
+                this.currentSocketId = (event.data as EventMessage).clientId
+
+                let currentClientSocket: ConnectedServerSocket | undefined = this.connectedServerSocket.find(obj => obj.id === this.currentSocketId)
+                if (currentClientSocket) {
+                    // so retransmission is working as usual
+                    this.retransmission.implementRetransmission(this.requestMessages, currentClientSocket.connectionState, true)
+                    // this.startGeneratingRequest(10000, this.requestMessages)
+                    this.retransmission.returnSubjectForBufferedItems().subscribe((message: WrappedMessage) => {
+                        this.sendMessage(message).subscribe({
+                            next: response => console.log(`Receiving response for ${message.thisMessageID}`),
+                            complete: () => console.log(`Request Completed for ${message.thisMessageID}`)
+                        })
+                    })
+                }
+            }
+        })
+    }
+
+    private startGeneratingRequest(intervalDuration: number, requestsPipe: Subject<FisMessage>) {
+        interval(intervalDuration).subscribe(time => {
+            let message: FisMessage = {
+                header: {
+                    messageID: uuidv4(),
+                    messageName: 'RequestMessage'
+                },
+                data: 'Data'
+            }
+            requestsPipe.next(message)
+        })
+    }
+
+    private sendMessage(message: WrappedMessage): Observable<WrappedMessage> {
+        return new Observable((response: Observer<WrappedMessage>) => {
+            console.log(`Emitting: ${(message.payload as FisMessage).header.messageID}`)
+            this.socket.emit('message', message)
+
+            let eventSubscription: Subscription = this.event.pipe(
+                filter(event => event.event == 'New Message'),
+                filter(event => (((event.data as EventMessage).payload as WrappedMessage).payload as FisMessage).header.messageID === (message.payload as FisMessage).header.messageID),
+                // takeWhile(event => (((event.data as EventMessage).payload as WrappedMessage).payload as FisMessage).data != 'Complete'),
+                map(event => ((event.data as EventMessage).payload as WrappedMessage)),
+            ).subscribe((message: WrappedMessage) => {
+                response.next(message)
+                if ((message.payload as FisMessage).data == 'Complete') {
+                    eventSubscription.unsubscribe()
+                    response.complete()
+                }
+            })
+        })
+    }
+}
+
+// Usage example:
+const client = new SocketClient("http://localhost:3001");
+// const client = new SocketClient("http://localhost:3002");
+// const client = new SocketClient("http://127.0.0.1:3000");
+// const client = new SocketClient("http://192.168.100.96:3000");
+
+// Send a message

+ 11 - 9
src/transmission/msg.transmission.manager.ts

@@ -8,16 +8,17 @@ import { MessageTransmissionRequestResponse } from "./msg.transmission.request-r
 import { filter, Observable, Observer, Subject } from "rxjs";
 
 export class MessageTransmissionManager implements MessageTransmissionManagerInterface {
-   
+    private browserEnv!: boolean
     transmission: MessageTransmission[] = []
     connectionManager!: ConnectionManager
     event!: Subject<TransportEvent>
 
-    constructor(event: Subject<TransportEvent>) {
+    constructor(event: Subject<TransportEvent>, browserEnv?: boolean) {
+        if (browserEnv) this.browserEnv = browserEnv
         // logic here
         console.log(`TransmissionManager: Contructing Transmission Manager...`)
         this.event = event
-        this.connectionManager = new ConnectionManager(this.event)
+        this.connectionManager = new ConnectionManager(this.event, browserEnv)
 
         // this.event.subscribe(event => console.log(`event`, event))
 
@@ -30,8 +31,9 @@ export class MessageTransmissionManager implements MessageTransmissionManagerInt
     Transmitter only have to call this once. */
     subscribe(): Observable<MessageTransmission> {
         return new Observable((observer: Observer<MessageTransmission>) => {
+            const targetEvent: Event = this.browserEnv ? 'New Server' : 'New Client';
             this.event.pipe(
-                filter(event => event.event == 'New Client')
+                filter(event => event.event == targetEvent)
             ).subscribe(event => {
                 // get all adapters for all the connection
                 observer.next(this.instantiateComponents((event.data as EventMessage).clientId))
@@ -40,7 +42,7 @@ export class MessageTransmissionManager implements MessageTransmissionManagerInt
     }
 
     private instantiateComponents(clientId: string): MessageTransmission {
-        console.log(`Instantiating new transmission set for another CLient`)
+        console.log(`Instantiating new transmission set for another ${this.browserEnv ? 'Server' : 'Client'}`)
         let adapterSet: AdapterSet[] = []
         if (this.connectionManager.getTransportArray().length > 0) {
             this.connectionManager.getTransportArray().forEach(transport => {
@@ -95,10 +97,10 @@ export class MessageTransmissionManager implements MessageTransmissionManagerInt
             filter((event: TransportEvent) => event.event === eventName)
         ).subscribe(event => {
             // assuming this is reconnection case
-            if(event.event == 'Client Reconnected') {
+            if (event.event == 'Client Reconnected') {
                 this.reconnectionHandler((event.data as EventMessage).clientId)
-            } 
-            
+            }
+
             // can include more event handlers here
         })
     }
@@ -110,7 +112,7 @@ export class MessageTransmissionManager implements MessageTransmissionManagerInt
             this.transmission.push(transmission)
         }
     }
-    
+
 }
 
 

+ 6 - 3
src/transport/websocket.ts

@@ -3,8 +3,6 @@ import { Socket as ClientSocket } from 'socket.io-client'
 import { Socket as SocketForConnectedClient } from "socket.io"
 import { handleClientSocketConnection, handleNewSocketClient, startClientSocketConnection, startSocketServer } from "../utils/socket.utils";
 import { ClientObject, ConnectionState, Info, Transport, TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
-import { error } from "console";
-import { subscribe } from "diagnostics_channel";
 
 /* Just code in the context that this websocket service will be handling multiple UI clients. Can think about the server communication at a later time. */
 export class WebsocketTransportService implements TransportService {
@@ -47,12 +45,17 @@ export class WebsocketTransportService implements TransportService {
     }
 
 
-    // for transmission(Server Only, not applicable for client Socket)
     public emit(message: TransportMessage): void {
         let clientObj: ConnectedClientSocket | undefined = this.connectedClientSocket.find(obj => obj.id == message.target)
+        let serverObj: ConnectedServerSocket | undefined = this.connectedServer.find(obj => obj.id === message.target)
+        // for server usage
         if (clientObj && clientObj.connectionState.getValue() == 'ONLINE') {
             clientObj.socketInstance.emit(`message`, message.payload)
         }
+        // for client usage
+        if (serverObj && serverObj.connectionState.getValue() == 'ONLINE') {
+            serverObj.socketInstance.emit(`message`, message.payload)
+        }
     }
 
     // this returns the ref pointer for the TransportEvent instantiated at Supervisor. Socket will broadcast incoming messages as event

+ 6 - 3
src/utils/socket.utils.ts

@@ -96,9 +96,12 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
                     id: uuidv4(),
                     event: 'New Message',
                     data: {
-                        clientId: receiverProfileInfo.id,
+                        id: uuidv4(),
+                        dateCreated: new Date(),
+                        transport: Transport.Websocket,
+                        target: receiverProfileInfo.id,
                         payload: msg
-                    } as EventMessage
+                    } as TransportMessage
                 })
             } else {
                 // Do nothing. just store in local array first. Cannot process without information. but then again, don['t need information if acting as client
@@ -170,7 +173,7 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
 
         // Handle disconnection
         socket.on('disconnect', () => {
-            console.log('Websocket Client disconnected from the server');
+            console.error(`Socket Server ${receiverProfileInfo.id} Disconnected`)
             if (receiverProfileInfo) {
                 eventNotification.next({
                     id: uuidv4(),

+ 1 - 1
tsconfig.json

@@ -36,7 +36,7 @@
     // "resolvePackageJsonExports": true,                /* Use the package.json 'exports' field when resolving package imports. */
     // "resolvePackageJsonImports": true,                /* Use the package.json 'imports' field when resolving imports. */
     // "customConditions": [],                           /* Conditions to set in addition to the resolver-specific defaults when resolving imports. */
-    // "resolveJsonModule": true,                        /* Enable importing .json files. */
+    "resolveJsonModule": true,                        /* Enable importing .json files. */
     // "allowArbitraryExtensions": true,                 /* Enable importing files with any extension, provided a declaration file is present. */
     // "noResolve": true,                                /* Disallow 'import's, 'require's or '<reference>'s from expanding the number of files TypeScript should add to a project. */
     /* JavaScript Support */