Procházet zdrojové kódy

tested latest change with establishing connection. Now testing on clieint side to send message over

Enzo před 2 měsíci
rodič
revize
91440f4992

+ 4 - 2
.env

@@ -1,2 +1,4 @@
-Transport = "Websocket, Http"
-PORT = 3000, 3001
+;Transport = "Websocket, Http"
+;PORT = 3000, 3001
+Transport = "Websocket"
+PORT = 3000

+ 2 - 1
src/connector/connector.base.ts

@@ -17,7 +17,8 @@ export class ConnectionAdapter implements ConnectionAdaptorBase {
     }
 
     getInfo(): AdapterProfile {
-        throw new Error("Method not implemented.");
+        // throw new Error("Method not implemented.");
+        return this.connectorProfile
     }
     subscribeConnectionState(): Observable<ConnectionState> {
         throw new Error("Method not implemented.");

+ 4 - 2
src/interface/transport.interface.ts

@@ -5,8 +5,7 @@ import { MessageTransmissionReceiver } from "../transmission/msg.transmission.re
 import { ConnectionAdapter } from "../connector/connector.base";
 
 export interface MessageTransmissionManager {
-    // what the hell is this here for
-    getTransmissionInstance(transportServices: TransportService[]): MessageTransmission
+    subscribe(): Observable<MessageTransmission>
 }
 
 export interface MessageTransmission {
@@ -23,6 +22,7 @@ export interface MessageTransmissionBase  {
     adaptorsArray: Array<ConnectionAdapter> 
 
     getInfo(): TransmissionProfile
+    setUpAdapter(adapterSet: AdapterSet[]): void
 }
 
 export interface MessageReceiver extends MessageTransmissionBase {
@@ -39,6 +39,8 @@ export interface MessageTransmitter extends MessageTransmissionBase {
 }
 
 export interface MessageRequestResponse extends MessageTransmissionBase {
+    incomingMessageBus: Subject<any>
+    outgoingMessageBus: Subject<any>
     transmitterInstance: MessageTransmissionTransmitter
     receiverInstance: MessageTransmissionReceiver
 

+ 17 - 2
src/test/receiver.ts

@@ -7,8 +7,9 @@ import { io, Socket } from "socket.io-client";
 import { handleClientSocketConnection } from "../utils/socket.utils";
 import { ConnectedServerSocket } from "../transport/websocket";
 import { v4 as uuidv4 } from 'uuid'
-import { Subject } from "rxjs";
-import { TransportEvent } from "../interface/connector.interface";
+import { interval, Subject } from "rxjs";
+import { TransportEvent, TransportMessage } from "../interface/connector.interface";
+import { FisMessage } from "../interface/transport.interface";
 
 class SocketClient {
     private socket: Socket;
@@ -21,11 +22,25 @@ class SocketClient {
         // use the existing socket handler
         handleClientSocketConnection(this.socket, this.connectedServerSocket).subscribe(this.event)
         this.startListening(this.event)
+        this.sendMessage()
     }
 
     private startListening(event: Subject<TransportEvent>): void {
         event.subscribe((event: TransportEvent) => console.log(event))
     }
+
+    private sendMessage(): void {
+        interval(3000).subscribe(interval => {
+            let message: FisMessage = {
+                header: {
+                    messageID: uuidv4(),
+                    messageName: 'RequestMessage'
+                }, 
+                data: 'Data'
+            }
+            this.socket.emit('message', message)
+        })
+    }
 }
 
 // Usage example:

+ 7 - 4
src/transmission/msg.transmission.base.ts

@@ -1,21 +1,20 @@
 
 import { filter, Observable, Observer, Subject, Subscription, takeWhile, Unsubscribable } from 'rxjs';
-import { AdapterProfile, AdaptorTransmissionRole } from '../interface/connector.interface';
+import { AdapterProfile, AdapterSet, AdaptorTransmissionRole, TransportEvent } from '../interface/connector.interface';
 import { Bus, FisMessage, MessageTransmissionBase as MessageTransmissionBaseInterface, TransmissionProfile } from '../interface/transport.interface'
 import { v4 as uuidv4 } from 'uuid'
 import { ConnectionAdapter } from '../connector/connector.base';
 
 export class MessageTransmissionBase implements MessageTransmissionBaseInterface {
+    event!: Observable<TransportEvent>
     transmissionProfile!: TransmissionProfile
     msgRepositoryService: any;
     transmissionRole!: AdaptorTransmissionRole;
     adaptorsArray: Array<ConnectionAdapter> = []
     transmissionService: any;
     adapterService!: ConnectionAdapter | undefined
-    incomingMessageBus!: Subject<any>;
-    outgoingMessageBus!: Subject<any>;
 
-    constructor() { 
+    constructor(event: Observable<TransportEvent>) { 
         // logic here
     }
 
@@ -23,4 +22,8 @@ export class MessageTransmissionBase implements MessageTransmissionBaseInterface
         return this.transmissionProfile
     }
 
+    setUpAdapter(adapterSet: AdapterSet[]): void {
+        throw new Error(`Method not implemented`)
+    }
+
 }

+ 12 - 11
src/transmission/msg.transmission.manager.ts

@@ -10,7 +10,7 @@ import { WebsocketTransportService } from "../transport/websocket";
 import { HttpTransportService } from "../transport/http";
 import { error } from "console";
 
-export class MessageTransmissionManager {
+export class MessageTransmissionManager implements MessageTransmissionManagerInterface{
     private transportServiceArray: TransportService[] = []
     private transportSet: Set<TransportSet> = new Set()
     transmission: MessageTransmission[] = []
@@ -48,9 +48,9 @@ export class MessageTransmissionManager {
                 }
 
                 // 1 set only
-                let transmitter: MessageTransmissionTransmitter = this.getTransmitter(clientId, adapterSet)
-                let receiver: MessageTransmissionReceiver = this.getReceiver(clientId, adapterSet)
-                let requestResponse: MessageTransmissionRequestResponse = this.getRequestResponse(transmitter, receiver)
+                let transmitter: MessageTransmissionTransmitter = this.getTransmitter(clientId, adapterSet, this.event.asObservable())
+                let receiver: MessageTransmissionReceiver = this.getReceiver(clientId, adapterSet, this.event.asObservable())
+                let requestResponse: MessageTransmissionRequestResponse = this.getRequestResponse(transmitter, receiver, this.event.asObservable())
                 let transmission: MessageTransmission = {
                     id: clientId,
                     transmitter: transmitter,
@@ -66,26 +66,26 @@ export class MessageTransmissionManager {
     }
 
 
-    private getTransmitter(transmissionId: string, adapterSets: AdapterSet[]): MessageTransmissionTransmitter {
+    private getTransmitter(transmissionId: string, adapterSets: AdapterSet[], event: Observable<TransportEvent>): MessageTransmissionTransmitter {
         let transmitterProfile: TransmitterProfile = {
             id: transmissionId,
             name: '', // for now make it empty. We will use the assigned uuid here
             dateCreated: new Date()
         }
-        return new MessageTransmissionTransmitter(transmitterProfile, adapterSets)
+        return new MessageTransmissionTransmitter(transmitterProfile, adapterSets, event)
     }
 
-    private getReceiver(transmissionId: string, adapterSets: AdapterSet[]): MessageTransmissionReceiver {
+    private getReceiver(transmissionId: string, adapterSets: AdapterSet[], event: Observable<TransportEvent>): MessageTransmissionReceiver {
         let receiverProfile: ReceiverProfile = {
             id: transmissionId,
             name: '', // for now make it empty. We will use the assigned uuid here
             dateCreated: new Date()
         }
-        return new MessageTransmissionReceiver(receiverProfile, adapterSets)
+        return new MessageTransmissionReceiver(receiverProfile, adapterSets, event)
     }
 
-    private getRequestResponse(transmitterInstance: MessageTransmissionTransmitter, receiverInstance: MessageTransmissionReceiver): MessageTransmissionRequestResponse {
-        return new MessageTransmissionRequestResponse(transmitterInstance, receiverInstance)
+    private getRequestResponse(transmitterInstance: MessageTransmissionTransmitter, receiverInstance: MessageTransmissionReceiver, event: Observable<TransportEvent>): MessageTransmissionRequestResponse {
+        return new MessageTransmissionRequestResponse(transmitterInstance, receiverInstance, event)
     }
 
 
@@ -130,4 +130,5 @@ export class MessageTransmissionManager {
 interface TransportSet {
     transport: Transport,
     port: number
-}
+}
+

+ 19 - 7
src/transmission/msg.transmission.receiver.ts

@@ -1,5 +1,5 @@
 import { filter, map, Observable, Observer, PartialObserver, Subject, Subscriber, Subscription, takeWhile, Unsubscribable } from 'rxjs';
-import { AdapterProfile, AdapterSet, AdaptorTransmissionRole, TransportEvent, TransportMessage } from '../interface/connector.interface';
+import { AdapterProfile, AdapterSet, AdaptorTransmissionRole, Transport, TransportEvent, TransportMessage } from '../interface/connector.interface';
 import { MessageTransmissionBase } from './msg.transmission.base';
 import { Bus, EventMessage, FisMessage, MessageReceiver as MessageReceiverInterface, ReceiverProfile, TransmissionMessage, TransmitterProfile } from '../interface/transport.interface'
 import { ConnectionAdapter } from '../connector/connector.base';
@@ -9,12 +9,12 @@ import { ConnectionManager } from '../connector/connector.manager';
 
 export class MessageTransmissionReceiver extends MessageTransmissionBase implements MessageReceiverInterface {
     receiverProfile!: ReceiverProfile;
-    private incomingTransportMessage: Subject<TransportMessage> = new Subject()
 
-    constructor(profile: ReceiverProfile, adapterSets: AdapterSet[]) {
-        super();
+    constructor(profile: ReceiverProfile, adapterSets: AdapterSet[], event: Observable<TransportEvent>) {
+        super(event);
 
         this.setReceiver(profile)
+        this.setUpAdapter(adapterSets)
     }
 
     setReceiver(receiverProfile: ReceiverProfile): void {
@@ -25,9 +25,11 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
         return new Observable((observable: Observer<TransmissionMessage>) => {
             // logic here
             if (bus == Bus.GeneralBus) {
-                const subscription: Subscription = this.incomingTransportMessage.pipe(
-                    filter((message: TransportMessage) => message.payload === `Incoming Message`),
-                    map(message => message.payload as TransportMessage)
+                // Need to merge all the adapters into one when the time comes 
+                // SAMPLE: This adapterArray.forEach(adapter => { ... })
+                const subscription: Subscription = (this.adapterService as ReceiverConnectionAdapter).getMessageBus(Bus.GeneralBus).pipe(
+                    filter((event: TransportEvent) => event.event == 'New Message'),
+                    map((event: TransportEvent) => event.data as TransportMessage)
                 ).subscribe((message: TransportMessage) => {
                     observable.next({
                         adapterId: message.target as string, // assign adapter id so that it knows who to reply back
@@ -42,4 +44,14 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
             }
         })
     }
+
+    setUpAdapter(adapterSets: AdapterSet[]): void {
+        if (adapterSets.length > 0) {
+            adapterSets.forEach((adapter: AdapterSet) => {
+                this.adaptorsArray.push(adapter.receiverAdapter)
+            })
+        }
+        // for now just hardcode to use 1 adapter type until connection manager is further enhacne to configure adapters on the fly
+        this.adapterService = this.adaptorsArray.find(obj => obj.getInfo().transportType === Transport.Websocket)
+    }
 }

+ 17 - 6
src/transmission/msg.transmission.request-response.ts

@@ -1,18 +1,19 @@
 import { MessageTransmissionBase } from "./msg.transmission.base";
-import { FisMessage, MessageReceiver, MessageRequestResponse as MessageRequestResponseInterface } from '../interface/transport.interface'
-import { filter, Observable, Observer, Subscription, takeWhile } from "rxjs";
+import { FisMessage, MessageRequestResponse as MessageRequestResponseInterface } from '../interface/transport.interface'
+import { filter, Observable, Observer, Subject, Subscription, takeWhile } from "rxjs";
 import { v4 as uuidv4 } from 'uuid'
-import { AdapterSet, AdaptorTransmissionRole, RequestResponseConnectionAdapter, TransportEvent } from "../interface/connector.interface";
+import { AdapterSet, Transport, TransportEvent } from "../interface/connector.interface";
 import { MessageTransmissionReceiver } from "./msg.transmission.receiver";
 import { MessageTransmissionTransmitter } from "./msg.transmission.transmitter";
-import { ConnectionManager } from "../connector/connector.manager";
 
 export class MessageTransmissionRequestResponse extends MessageTransmissionBase implements MessageRequestResponseInterface {
     transmitterInstance!: MessageTransmissionTransmitter;
     receiverInstance!: MessageTransmissionReceiver;
+    incomingMessageBus!: Subject<any>;
+    outgoingMessageBus!: Subject<any>;
 
-    constructor(transmitterInstance: MessageTransmissionTransmitter, receiverInstance: MessageTransmissionReceiver) {
-        super()
+    constructor(transmitterInstance: MessageTransmissionTransmitter, receiverInstance: MessageTransmissionReceiver, event: Observable<TransportEvent>) {
+        super(event)
         this.setTransmissionProfile(transmitterInstance, receiverInstance)
     }
 
@@ -45,4 +46,14 @@ export class MessageTransmissionRequestResponse extends MessageTransmissionBase
                 });
         });
     }
+
+    setUpAdapter(adapterSets: AdapterSet[]): void {
+        if (adapterSets.length > 0) {
+            adapterSets.forEach((adapter: AdapterSet) => {
+                this.adaptorsArray.push(adapter.requestResponsAdapter)
+            })
+        }
+        // for now just hardcode to use 1 adapter type until connection manager is further enhacne to configure adapters on the fly
+        this.adapterService = this.adaptorsArray.find(obj => obj.getInfo().transportType === Transport.Websocket)
+    }
 }

+ 5 - 4
src/transmission/msg.transmission.transmitter.ts

@@ -1,16 +1,17 @@
 import { MessageTransmissionBase } from "./msg.transmission.base";
 import { FisMessage, MessageTransmitter as MessageTransmitterInterface, TransmitterProfile } from '../interface/transport.interface'
-import { AdapterSet, Transport } from "../interface/connector.interface";
+import { AdapterSet, Transport, TransportEvent } from "../interface/connector.interface";
 import { v4 as uuidv4 } from 'uuid'
 import { TransmitterConnectionAdapter } from "../connector/connector.transmitter";
+import { Observable } from "rxjs";
 
 /* Take in all the messages that needs to be transported, and divide them accordingly. So the connector instances will do just that
 connectors or adapters will have their own identifier*/
 export class MessageTransmissionTransmitter extends MessageTransmissionBase implements MessageTransmitterInterface {
     transmitterProfile!: TransmitterProfile;
 
-    constructor(profile: TransmitterProfile, adapterSets: AdapterSet[]) {
-        super()
+    constructor(profile: TransmitterProfile, adapterSets: AdapterSet[], event: Observable<TransportEvent>) {
+        super(event)
         this.setTransmitter(profile)
         this.setUpAdapter(adapterSets)
 
@@ -25,7 +26,7 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
         (this.adapterService as TransmitterConnectionAdapter).emit(message)
     }
 
-    private setUpAdapter(adapterSets: AdapterSet[]) {
+    setUpAdapter(adapterSets: AdapterSet[]) {
         if (adapterSets.length > 0) {
             adapterSets.forEach((adapter: AdapterSet) => {
                 this.adaptorsArray.push(adapter.transmitterAdapter)

+ 2 - 3
src/transport/websocket.ts

@@ -39,6 +39,7 @@ export class WebsocketTransportService implements TransportService {
     public startClient(url: string): void {
         // logic here
         startClientSocketConnection(url).then((socket: ClientSocket) => {
+            let clientName = '' // initiate a check in local file. If no client name, then this is new 
             handleClientSocketConnection(socket, this.connectedServer).subscribe(this.transportEvent)
         }).catch((error) => {
             console.error(`WebsocketTransport ERROR:`, error)
@@ -48,11 +49,9 @@ export class WebsocketTransportService implements TransportService {
 
     // for transmission(Server Only, not applicable for client Socket)
     public emit(message: TransportMessage): void {
-        /* Just a rough idea, Because this service still needs to be direct the mesage to be emiteed based on the client that send it earlier.
-        For example, if it'd doing a request response, obviosuly, it needs to be identified whose respnses it belong to. */
         let clientObj: ConnectedClientSocket | undefined = this.connectedClientSocket.find(obj => obj.id == message.target)
         if (clientObj && clientObj.connectionState.getValue() == 'ONLINE') {
-            clientObj.socketInstance.emit(`message`, message)
+            clientObj.socketInstance.emit(`message`, message.payload)
         }
     }
 

+ 99 - 25
src/utils/socket.utils.ts

@@ -6,7 +6,8 @@ import * as fs from 'fs'
 import { v4 as uuidv4 } from 'uuid'
 import { ConnectionState, Transport, TransportEvent, TransportMessage } from '../interface/connector.interface';
 import { ConnectedClientSocket, ConnectedServerSocket } from '../transport/websocket';
-import { EventMessage } from '../interface/transport.interface';
+import { EventMessage, FisMessage } from '../interface/transport.interface';
+import { error } from 'console';
 
 export function startSocketServer(port: number): Observable<SocketForConnectedClient> {
     return new Observable((observer) => {
@@ -146,6 +147,7 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
                 // Update websocket instance record
                 let clientObj: ConnectedServerSocket | undefined = serversConnected.find(obj => obj.id === (data.message as ConnectedServerSocket).id)
                 if (clientObj) {
+                    receiverProfileInfo.connectionState = clientObj.connectionState
                     clientObj.socketInstance = receiverProfileInfo.socketInstance
                     clientObj.connectionState.next('ONLINE')
                 }
@@ -210,33 +212,50 @@ export function handleNewSocketClient(socket: SocketForConnectedClient, connecte
                 })
                 // Update connected clientInstance info to adapter
                 connectedClientSocket.push(clientInstance)
+                addClientToDB(clientInstance)
                 startListening(socket, clientInstance, event)
             } else {
                 // update first
-                let clientInstance: ConnectedClientSocket | undefined = connectedClientSocket.find(obj => obj.id === message.data.id)
-                if (clientInstance) {
-                    console.log(`Socket Client ${clientInstance.id} Found`)
-                    socket.emit('profile', { name: 'Adjusted Profile', message: { id: clientInstance.id } })
-                    // replace socket instance since the previous has been terminated
-                    clientInstance.socketInstance = socket
-                    // need to start listening again, because it's assigned a different socket instance this time round
-                    startListening(socket, clientInstance, event)
-                    event.next({
-                        id: uuidv4(),
-                        event: 'Client Reconnected',
-                        data: {
-                            clientId: clientInstance.id,
-                            message: `Client ${clientInstance.id} connection re-established`,
-                            payload: clientInstance
-                        } as EventMessage
-                    })
-                    // Resume operation
-                    if (clientInstance.connectionState.getValue() == 'OFFLINE') {
-                        clientInstance.connectionState.next(`ONLINE`)
-                    }
+                let clientInstance: ConnectedClientSocket | undefined
+                if (connectedClientSocket.length > 0) {
+                    clientInstance = connectedClientSocket.find(obj => obj.id === message.data.id)
+                    temp(clientInstance)
                 } else {
-                    console.log(`Profile Not Found`)
-                    socket.emit('profile', { name: 'Error', message: 'Receiver Profile Not found' })
+                    // for the case server itself got shit down or something
+                    checkIfClientExists(message.data.id).then((client: ConnectedClientSocket) => {
+                        clientInstance = client
+                        temp(clientInstance)
+                    }).catch(error => console.error(error))
+                }
+                function temp(clientInstance: ConnectedClientSocket | undefined) {
+                    if (clientInstance) {
+                        console.log(`Socket Client ${clientInstance.id} Found`)
+                        socket.emit('profile', { name: 'Adjusted Profile', message: { id: clientInstance.id } })
+                        // replace socket instance since the previous has been terminated
+                        clientInstance.socketInstance = socket
+                        // some explanation here. For the case where the server reads from the DB, no need to terminate subject, since all instances would be destroyed alongside the server shut down. This case is specificd only when there's a need to read from local file
+                        if (!clientInstance.connectionState) {
+                            clientInstance.connectionState = new BehaviorSubject<ConnectionState>(`ONLINE`)
+                        }
+                        // need to start listening again, because it's assigned a different socket instance this time round
+                        startListening(socket, clientInstance, event)
+                        event.next({
+                            id: uuidv4(),
+                            event: 'Client Reconnected',
+                            data: {
+                                clientId: clientInstance.id,
+                                message: `Client ${clientInstance.id} connection re-established`,
+                                payload: clientInstance
+                            } as EventMessage
+                        })
+                        // Resume operation
+                        if (clientInstance.connectionState.getValue() == 'OFFLINE') {
+                            clientInstance.connectionState.next(`ONLINE`)
+                        }
+                    } else {
+                        console.log(`Profile Not Found`)
+                        socket.emit('profile', { name: 'Error', message: 'Receiver Profile Not found' })
+                    }
                 }
             }
         })
@@ -260,6 +279,62 @@ export async function writeFile(data: ConnectedServerSocket, filename: string):
     })
 }
 
+/* For Internal Usage only. Temporary serve as a way for server to keep track of clients. To be replaced in the future with better alternatives. */
+function addClientToDB(entry: ConnectedClientSocket, filePath: string = 'clients.json'): void {
+    try {
+        let data: ConnectedClientSocket[] = [];
+
+        // Check if the file exists and load existing data
+        if (fs.existsSync(filePath)) {
+            const fileContent = fs.readFileSync(filePath, 'utf-8');
+            data = JSON.parse(fileContent);
+        }
+
+        // Append the new object to the array
+        data.push({
+            id: entry.id,
+            dateCreated: entry.dateCreated,
+            connectionState: null,
+            socketInstance: null
+        } as unknown as ConnectedClientSocket);
+
+        // Write the updated array back to the file
+        fs.writeFileSync(filePath, JSON.stringify(data, null, 2), 'utf-8');
+        console.log(`Entry added successfully.`);
+    } catch (error) {
+        console.error('Error writing to file:', error);
+    }
+}
+
+async function checkIfClientExists(id: string, filePath: string = 'clients.json'): Promise<ConnectedClientSocket> {
+    return new Promise((resolve, reject) => {
+        try {
+            // Check if the file exists
+            if (!fs.existsSync(filePath)) {
+                console.log("File does not exist.");
+                reject('File does not exist');
+            }
+
+            // Read and parse the data
+            const fileContent = fs.readFileSync(filePath, 'utf-8');
+            const data: any[] = JSON.parse(fileContent);
+
+            // Check if an object with the given id exists
+            let obj = data.find(entry => entry.id === id);
+
+            if (obj) {
+                console.log(`Client with ID ${id} exists.`);
+            } else {
+                console.log(`Client with ID ${id} does not exist.`);
+            }
+
+            resolve(obj);
+        } catch (error) {
+            console.error('Error reading the file:', error);
+            reject(`Error reading the file`)
+        }
+    })
+}
 
 
 // Check if filename exists. Return profile information if there's any
@@ -321,4 +396,3 @@ export function startListening(socket: SocketForConnectedClient, client: Connect
         eventListener.complete()
     })
 }
-