Преглед на файлове

some additional adjustments

enzo преди 1 месец
родител
ревизия
96893e8cb6

+ 1 - 2
src/adapters/adapter.manager.ts

@@ -31,7 +31,6 @@ export class AdapterManager extends AdapterManagerBase {
                             id: uuidv4(),
                             type: `Adapter Event`,
                             event: `New Adapter`,
-                            transport: event.data.clientInfo.transport,
                             date: new Date(),
                             data: adapter
                         })
@@ -56,7 +55,7 @@ export class AdapterManager extends AdapterManagerBase {
             transportServices.forEach((transportService: TransportServiceInterface) => {
                 this.console.log({ message: `updating transport in adapter ${transportService.getInfo().transportServiceId}` })
                 this.updateTransportServicesRecord(transportService)
-            })
+            })      
         })
         // to automatically connect to existing started transport services
         this.event.next({

+ 4 - 0
src/base/msg.transmission.base.ts

@@ -10,5 +10,9 @@ export class MessageTransmissionBase implements MessageTransmissionInterface {
         // logic here
     }
 
+    protected handleAdapters(event: Observable<GeneralEvent<any>>): void {
+        throw new Error(`Method not implemented...`)
+    }
+
 }
 

+ 0 - 11
src/interface/interface.ts

@@ -130,18 +130,7 @@ export interface ClientObject {
     transportServiceId: string 
 }
 
-
-export interface AdapterProfile {
-    id: string,
-    transportType: Transport,
-}
-
 export interface TransportSet {
     transport: Transport,
     port: number
-}
-export interface TransportProfileMessage {
-    clientId: string,
-    message?: string,
-    payload?: any
 }

+ 6 - 1
src/test/receiver.ts

@@ -88,7 +88,7 @@ class Supervisor {
     private request(request: FisMessage, messageTransmission: TransmissionInterface): Observable<any> {
         return new Observable((response: Observer<any>) => {
             messageTransmission.transmitter.emit(request)
-            this.generalBus.pipe(
+            const subscription: Subscription = this.generalBus.pipe(
                 filter(event => event.event == 'New Message'),
                 filter(event => (((event.data as TransportMessage)?.payload as WrappedMessage)?.payload as FisMessage)?.header.messageID === request.header.messageID),
                 map(event => (((event.data as TransportMessage)?.payload as WrappedMessage)?.payload as FisMessage))
@@ -99,6 +99,11 @@ class Supervisor {
                     response.next(message)
                 }
             })
+
+            // Clean up on unsubscription
+            return () => {
+                subscription.unsubscribe();
+            };
         })
     }
 

+ 1 - 1
src/test/transmitter.ts

@@ -63,7 +63,7 @@ class Supervisor {
         messageTransmission.receiver.getIncoming().subscribe((event: GeneralEvent<TransportMessage>) => {
             let requestMessage: FisMessage = ((event.data as TransportMessage).payload as WrappedMessage).payload as FisMessage
             this.console.log({ message: `General Bus ${requestMessage?.header?.messageID ?? 'Not a message'}`, details: event }) // receiving end
-            this.clientIncomingMessage.next(requestMessage)
+            // this.clientIncomingMessage.next(requestMessage)
             this.messageProducer.getOutgoingMessages().pipe(
                 filter(message => message.header.messageID === requestMessage.header.messageID)
             ).subscribe(message => {

+ 1 - 1
src/transmission/msg.transmission.manager.ts

@@ -55,7 +55,7 @@ export class MessageTransmissionManager extends MessageTransmissionManagerBase {
     protected instantiateTransmissionComponents(clientObj: ClientObject, eventRef: Subject<GeneralEvent<any>>): TransmissionInterface {
         let receiverInstance: MessageTransmissionReceiver = new MessageTransmissionReceiver(clientObj.clientId, eventRef)
         let transmitterInstance: MessageTransmissionTransmitter = new MessageTransmissionTransmitter(clientObj.clientId, eventRef)
-        let requestResponseInstance: MessageTransmissionRequestResponse = new MessageTransmissionRequestResponse(transmitterInstance, receiverInstance, eventRef)
+        let requestResponseInstance: MessageTransmissionRequestResponse = new MessageTransmissionRequestResponse(clientObj.clientId, transmitterInstance, receiverInstance, eventRef)
         let transmissionObj: TransmissionInterface = {
             clientId: clientObj.clientId,
             transmitter: transmitterInstance,

+ 2 - 2
src/transmission/msg.transmission.receiver.ts

@@ -21,7 +21,7 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
         this.handleAdapters(this.event.asObservable())
     }
 
-    getIncoming(): Observable<GeneralEvent<TransportMessage>> {
+    public getIncoming(): Observable<GeneralEvent<TransportMessage>> {
         this.console.log({ message: `Transmission getting message bus for ${this.clientId}` })
         return new Observable((observable: Observer<GeneralEvent<any>>) => {
             const subscription: Subscription = this.incomingMessage.pipe(
@@ -47,7 +47,7 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
     }
 
     /* Assigned and update adapters record. Currently no logic to swtich adapters based on performance or whatever logic to be integrated in the future */
-    private handleAdapters(adapterEvent: Observable<GeneralEvent<any>>): void {
+    protected handleAdapters(adapterEvent: Observable<GeneralEvent<any>>): void {
         adapterEvent.pipe(
             filter(event => event.type === `Adapter Event`),
             filter(event => event.event === `New Adapter`),

+ 34 - 13
src/transmission/msg.transmission.request-response.ts

@@ -1,33 +1,54 @@
 import { MessageTransmissionBase } from "../base/msg.transmission.base";
-import { filter, Observable, Observer, Subject, Subscription, takeWhile } from "rxjs";
+import { filter, map, Observable, Observer, Subject, Subscription, takeWhile } from "rxjs";
 import { v4 as uuidv4 } from 'uuid'
 import { MessageTransmissionReceiver } from "./msg.transmission.receiver";
 import { MessageTransmissionTransmitter } from "./msg.transmission.transmitter";
-import { AdapterInterface, FisMessage, GeneralEvent, MessageRequestResponseInterface } from "../interface/interface";
+import { AdapterInterface, FisMessage, GeneralEvent, MessageRequestResponseInterface, TransportMessage } from "../interface/interface";
+import { WrappedMessage } from "../utils/message.ordering";
 
 export class MessageTransmissionRequestResponse extends MessageTransmissionBase implements MessageRequestResponseInterface {
     transmitterInstance!: MessageTransmissionTransmitter;
     receiverInstance!: MessageTransmissionReceiver;
 
-    constructor(transmitterInstance: MessageTransmissionTransmitter, receiverInstance: MessageTransmissionReceiver, event: Subject<GeneralEvent<any>>) {
+    constructor(clientId: string, transmitterInstance: MessageTransmissionTransmitter, receiverInstance: MessageTransmissionReceiver, event: Subject<GeneralEvent<any>>) {
         super()
-        this.setTransmissionProfile(transmitterInstance, receiverInstance)
+        this.clientId = clientId
+        this.transmitterInstance = transmitterInstance
+        this.receiverInstance = receiverInstance
         this.event = event
     }
 
-    setTransmissionProfile(transmissionInfo: MessageTransmissionTransmitter, receiverInfo: MessageTransmissionReceiver): void {
-        this.transmitterInstance = transmissionInfo
-        this.receiverInstance = receiverInfo
-    }
-
-    // To be Enhanced. This is actually wrong at the moment
     send(message: FisMessage): Observable<FisMessage> {
         return new Observable((response: Observer<FisMessage>) => {
             // logic here
+            if (this.transmitterInstance && this.receiverInstance) {
+                this.transmitterInstance.emit(message)
+                const subscription: Subscription = this.receiverInstance.getIncoming().pipe(
+                    filter(event => event.event === `New Message`),
+                    filter(event => (((event.data as TransportMessage)?.payload as WrappedMessage)?.payload as FisMessage)?.header.messageID === message.header.messageID),
+                    map(event => {
+                        return (event.data as TransportMessage).payload as FisMessage
+                    })
+                ).subscribe({
+                    next: (message: FisMessage) => {
+                        if (message.data == 'Complete') {
+                            response.complete()
+                        } else {
+                            response.next(message)
+                        }
+                    },
+                    error: error => console.error(error)
+                })
+                // Clean up on unsubscription
+                return () => {
+                    subscription.unsubscribe();
+                }
+            } else {
+                response.error(new Error('Transmitter or receiver instance is missing.'));
+                return;
+            }
         });
     }
 
-    setUpAdapter(adapter: AdapterInterface): void {
-        // logic here
-    }
+
 }

+ 1 - 1
src/transmission/msg.transmission.transmitter.ts

@@ -69,7 +69,7 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
     }
 
     // hardcode it to use the first adapter added for now. Haven't decide on the logic to dynamically switch them adapters
-    private handleAdapters(adaptersEvent: Observable<GeneralEvent<any>>): void {
+    protected handleAdapters(adaptersEvent: Observable<GeneralEvent<any>>): void {
         adaptersEvent.pipe(
             filter(event => event.type === `Adapter Event`),
             filter(event => event.event === `New Adapter`),