Browse Source

integrated Subsribable to adapter components

enzo 17 hours ago
parent
commit
e831cb8e84

+ 13 - 3
src/adapters/adapter.receiver.ts

@@ -1,10 +1,11 @@
 import dotenv from 'dotenv';
-import { filter, Observable, Observer, Subscription } from 'rxjs';
+import { filter, Observable, Observer, Subscription, Unsubscribable } from 'rxjs';
 import { v4 as uuidv4 } from 'uuid'
 import ConsoleLogger from '../utils/log.utils';
 import { WrappedMessage } from '../utils/message.ordering';
 import { AdapterBase } from '../base/adapter.base';
 import { FisMessage, GeneralEvent, TransportType, TransportMessage, TransportServiceInterface, TransmissionRole } from '../interface/interface';
+import { checkRxType } from '../utils/general.utils';
 
 dotenv.config();
 
@@ -17,13 +18,22 @@ export class ReceiverAdapter<T> extends AdapterBase<T> {
         this.console.log({ message: `Contructing ReceiverAdapter for clientId: ${this.adapterProfile.clientId}` })
     }
 
-    subscribeForIncoming(): Observable<GeneralEvent<any>> {
+    public subscribe(param: Observer<T> | Observable<T>): Unsubscribable | null {
+        if (checkRxType(param) === `Observer`) {
+            this.console.log({ message: `Is Observer` });
+            return this.subscribeForIncoming().subscribe(param as Observer<any>)
+        } else {
+            return null
+        }
+    }
+
+    public subscribeForIncoming(): Observable<GeneralEvent<any>> {
         this.console.log({ message: `Getting message bus for this connector: ${this.adapterProfile.clientId}` })
         return new Observable((observable: Observer<GeneralEvent<any>>) => {
             const subscription: Subscription = this.adapterProfile.transportService.subscribeForEvent().pipe(
                 filter(event => event.type === `Transport Event`),
                 filter((message: GeneralEvent<TransportMessage>) => (message.data as TransportMessage).source === this.adapterProfile.clientId),
-                filter((message: GeneralEvent<any>) => message.event === 'New Message'),
+                filter((message: GeneralEvent<TransportMessage>) => message.event === 'New Message'),
             ).subscribe((message: GeneralEvent<TransportMessage>) => {
                 this.console.log({ message: `Received ${(((message.data as TransportMessage).payload as WrappedMessage).payload as FisMessage).header.messageID} from ${((message.data as TransportMessage).source)}`, details: message })
                 observable.next({

+ 17 - 3
src/adapters/adapter.transmitter.ts

@@ -1,9 +1,10 @@
 import dotenv from 'dotenv';
-import { BehaviorSubject, distinctUntilChanged, filter, map, Observable, Subject } from 'rxjs';
+import { BehaviorSubject, distinctUntilChanged, filter, map, Observable, Observer, Subject, Unsubscribable } from 'rxjs';
 import { WrappedMessage } from '../utils/message.ordering';
 import ConsoleLogger from '../utils/log.utils';
 import { AdapterBase } from '../base/adapter.base';
 import { ClientObject, ConnectionState, FisMessage, TransportType, TransportMessage, TransportServiceInterface, TransmissionRole } from '../interface/interface';
+import { checkRxType } from '../utils/general.utils';
 
 dotenv.config();
 export class TransmitterAdapter<T> extends AdapterBase<T> {
@@ -15,9 +16,22 @@ export class TransmitterAdapter<T> extends AdapterBase<T> {
         this.console.log({ message: `Contructing TransmitterAdapter for client: ${this.adapterProfile.clientId}` })
     }
 
-    emit(selfId: string, message: WrappedMessage): void {
+    public subscribe(param: Observer<T> | Observable<T>): Unsubscribable | null {
+        if (checkRxType(param) === `Observable`) {
+            this.console.log({ message: `Is Observable` });
+            // Create a new Subscription to manage unsubscription
+            const subscription = (param as Observable<any>).subscribe((message: { selfId: string, payload: WrappedMessage }) => {
+                this.emit(message.selfId, message.payload);
+            });
+
+            return subscription; // Return the Subscription (Unsubscribable)
+        } else {
+            return null;
+        }
+    }
+
+    public emit(selfId: string, message: WrappedMessage): void {
         // logic here
-        // this.console.log({ message: `Emitting: ${(message.payload as FisMessage).header.messageID} to ${this.adapterProfile.clientId}` })
         this.console.log({ message: `Emitting: ${message.thisMessageID} to ${this.adapterProfile.clientId}` })
         this.adapterProfile.transportService.emit({
             id: this.adapterProfile.clientId,

+ 1 - 1
src/base/adapter.base.ts

@@ -15,7 +15,7 @@ export class AdapterBase<T> implements AdapterInterface<T> {
         this.setupConnectionState(this.adapterProfile.transportService)
     }
 
-    subscribe(observer: Observer<T>): Unsubscribable {
+    public subscribe(param: Observer<T> | Observable<T>): Unsubscribable | null {
         throw new Error("Method not implemented.");
     }
 

+ 1 - 1
src/interface/interface.ts

@@ -47,7 +47,7 @@ export interface MessageRequestResponseInterface<T> extends MessageTransmissionI
 
 /* ADAPTER COMPONENTS */
 export interface AdapterInterface<T> {
-    subscribe(observer: Observer<T>): Unsubscribable
+    subscribe(param: Observer<T> | Observable<T>): Unsubscribable | null
     getAdapterProfile(type?: `id` | `clientId` | `role` | `transportId` | `transportType` | `connectionState`): AdapterProfile | string | Observable<ConnectionState> | undefined
 }
 

+ 9 - 5
src/test/receiver.ts

@@ -47,6 +47,7 @@ class Supervisor {
         this.messageProducer = new MessageProducer(this.clientIncomingMessage)
         this.transmissionManager = new MessageTransmissionManager(this.event, this.isClient)
         this.startMessageTransmission()
+        // just for debugging only. Will comment it out later
         this.clientIncomingMessage.subscribe(message => this.console.log({ message: `Received ${message.header.messageID ?? `Undefined`}` }))
 
     }
@@ -54,12 +55,15 @@ class Supervisor {
     // Testing here. Just comment out or uncomment out the parts you want to test
     private startMessageTransmission(): void {
         /* Actor Version <All operations are synchronized for now>  [Can set to async at a later time]*/
+        let transmitter: MessageTransmissionTransmitter<any> = this.transmissionManager.getTransmissionObject(this.config, `Transmitter`) as MessageTransmissionTransmitter<any>
+        let transmitterSubscription = transmitter.subscribe(this.messageProducer.getNotificationMessage())
         let receiver: MessageTransmissionReceiver<any> = this.transmissionManager.getTransmissionObject(this.config, `Receiver`) as MessageTransmissionReceiver<any>
-        let receiverSubscription = receiver.subscribe(this.clientIncomingMessage as Observer<any>) as Unsubscribable
-        setTimeout(() => {
-            receiverSubscription!.unsubscribe()
-            this.console.log({ message: `Stop receiving messag after 10 seconds` })
-        }, 20000)
+        // let receiverSubscription = receiver.subscribe(this.clientIncomingMessage as Observer<any>) as Unsubscribable
+        // testing to stop operation by unsubscribing
+        // setTimeout(() => {
+        //     receiverSubscription!.unsubscribe()
+        //     this.console.log({ message: `Stop receiving messag after 10 seconds` })
+        // }, 20000)
 
         /* Primitive Version */
         // let transmitter: MessageTransmissionTransmitter<any> = this.transmissionManager.getTransmitter(this.config) as MessageTransmissionTransmitter<any>

+ 9 - 6
src/test/transmitter.ts

@@ -7,7 +7,7 @@ listening to it.
 Note: However, that doesn't mean that if there's no tranport protocol available, it will not work. The trannsmission manager and adapter
 manager will just subscribe and listen to the global event for any new transport protocal and then perform the necessary subscription to 
 it. */
-import { filter, interval, map, Observable, Observer, Subject, Subscription, take } from "rxjs";
+import { filter, interval, map, Observable, Observer, Subject, Subscription, take, Unsubscribable } from "rxjs";
 import { v4 as uuidv4 } from 'uuid'
 import { MessageTransmissionManager } from "../transmission/msg.transmission.manager";
 import ConsoleLogger from "../utils/log.utils";
@@ -49,11 +49,14 @@ class Supervisor {
     private startMessageTransmission(): void {
         /* Actor Version <All operations are synchronized for now>  [Can set to async at a later time]*/
         let transmitter: MessageTransmissionTransmitter<any> = this.transmissionManager.getTransmissionObject(this.config, `Transmitter`) as MessageTransmissionTransmitter<any>
-        let transmitterSubscription = transmitter.subscribe(this.messageProducer.getNotificationMessage())
-        setTimeout(() => {
-            transmitterSubscription!.unsubscribe()
-            this.console.log({ message: `Stop transmitting messag after 10 seconds` })
-        }, 20000)
+        // let transmitterSubscription = transmitter.subscribe(this.messageProducer.getNotificationMessage())
+        let receiver: MessageTransmissionReceiver<any> = this.transmissionManager.getTransmissionObject(this.config, `Receiver`) as MessageTransmissionReceiver<any>
+        let receiverSubscription = receiver.subscribe(this.clientIncomingMessage as Observer<any>) as Unsubscribable
+        // testing to stop operation by unsubscribing
+        // setTimeout(() => {
+        //     transmitterSubscription!.unsubscribe()
+        //     this.console.log({ message: `Stop transmitting messag after 10 seconds` })
+        // }, 20000)
 
         /* Primitive Version */
         // let transmitter: MessageTransmissionTransmitter<any> = this.transmissionManager.getTransmitter(this.config) as MessageTransmissionTransmitter<any>

+ 1 - 2
src/transmission/msg.transmission.manager.ts

@@ -37,7 +37,6 @@ export class MessageTransmissionManager extends MessageTransmissionManagerBase<a
         }
     }
 
-
     public getTransmitter(config: TransmissionConfig): MessageTransmitterInterface<any> {
         let transmitterInstance = new MessageTransmissionTransmitter(config, this.transmissionEvent)
         this.transmissionObjects.push(transmitterInstance)
@@ -51,7 +50,7 @@ export class MessageTransmissionManager extends MessageTransmissionManagerBase<a
     }
 
     public getRequestResponse(config: TransmissionConfig, transmitterInstance: MessageTransmitterInterface<any>, receiverInstance: MessageReceiverInterface<any>): MessageTransmissionRequestResponse<any> {
-        return new MessageTransmissionRequestResponse(config, transmitterInstance, receiverInstance)
+        return new MessageTransmissionRequestResponse(config, this.transmissionEvent, transmitterInstance, receiverInstance)
     }
 
     // Subscribe for new adapters from Adapter Manager

+ 18 - 10
src/transmission/msg.transmission.receiver.ts

@@ -25,6 +25,11 @@ export class MessageTransmissionReceiver<T> extends MessageTransmissionBase<T> {
         this.setTransmissionProfile(`Receiver`, config)
         this.console.log({ message: `Constructing Receiver Transmission for Receiving target: ${this.transmissionProfile.config.target}` })
         this.initializeReceiverComponents(transmissionEvent)
+
+        // Just for testing only. Will comment this out later.
+        this.incomingMessage.subscribe(message => {
+            this.console.log({ message: `Received ${(((message.data as TransportMessage).payload as WrappedMessage).payload as FisMessage).header.messageID} from ${((message.data as TransportMessage).source)}`, details: message })
+        })
     }
 
     public subscribe(param: Observer<any> | Observable<any>): Unsubscribable | null {
@@ -46,7 +51,7 @@ export class MessageTransmissionReceiver<T> extends MessageTransmissionBase<T> {
                 this.onHoldMessage.next(((event.data as TransportMessage).payload as WrappedMessage))
                 checkMessage(((event.data as TransportMessage).payload as WrappedMessage), this.onHoldMessage).then(() => {
                     // only release the message before it exists
-                    this.console.log({ message: `This one passes. Does have previousID. Case for message ordering` })
+                    // this.console.log({ message: `This one passes. Does have previousID. Case for message ordering` })
                     receivable.next(((event.data as TransportMessage).payload as WrappedMessage).payload as FisMessage);
                 }).catch((error) => {
                     this.console.log({ message: `Observer Error`, details: error })
@@ -75,15 +80,18 @@ export class MessageTransmissionReceiver<T> extends MessageTransmissionBase<T> {
             if (!this.currentAdapter) {
                 this.console.log({ message: `Setting this ${adapter.getAdapterProfile(`id`)} as current adapter.` })
                 this.currentAdapter = adapter as ReceiverAdapterInterface<any>
-                this.currentAdapter.subscribeForIncoming().subscribe({
-                    next: (message: GeneralEvent<TransportMessage>) => {
-                        this.console.log({ message: `Received ${(((message.data as TransportMessage).payload as WrappedMessage).payload as FisMessage).header.messageID} from ${((message.data as TransportMessage).source)}`, details: message })
-                        this.incomingMessage.next(message)
-                    },
-                    error: error => {
-                        // Error handling. Idealling switching to other adapters
-                    }
-                })
+                /* New version for subscribable */
+                this.currentAdapter.subscribe(this.incomingMessage)
+
+                // this.currentAdapter.subscribeForIncoming().subscribe({
+                //     next: (message: GeneralEvent<TransportMessage>) => {
+                //         this.console.log({ message: `Received ${(((message.data as TransportMessage).payload as WrappedMessage).payload as FisMessage).header.messageID} from ${((message.data as TransportMessage).source)}`, details: message })
+                //         this.incomingMessage.next(message)
+                //     },
+                //     error: error => {
+                //         // Error handling. Idealling switching to other adapters
+                //     }
+                // })
                 let connectionState: Observable<ConnectionState> = this.currentAdapter.getAdapterProfile(`connectionState`) as Observable<ConnectionState>
                 connectionState.subscribe(this.connectionStateEvent)
             } else {

+ 5 - 9
src/transmission/msg.transmission.request-response.ts

@@ -2,27 +2,23 @@
 mechanism is still using message. Here as you will see, it is basically taking the already instantiated transmitter and receiver components
 and basically just filtering the responses based on whatever identifier it needs for the orignal request. */
 import { MessageTransmissionBase } from "../base/msg.transmission.base";
-import { filter, Observable, Observer, Subscription, Unsubscribable } from "rxjs";
-import { FisMessage, MessageReceiverInterface, MessageRequestResponseInterface, MessageTransmitterInterface, TransmissionConfig, TransmissionProfile } from "../interface/interface";
+import { filter, Observable, Observer, Subject, Subscription, Unsubscribable } from "rxjs";
+import { FisMessage, GeneralEvent, MessageReceiverInterface, MessageRequestResponseInterface, MessageTransmitterInterface, TransmissionConfig, TransmissionProfile } from "../interface/interface";
 import ConsoleLogger from "../utils/log.utils";
 import { checkRxType } from "../utils/general.utils";
 
-export class MessageTransmissionRequestResponse<T> implements MessageRequestResponseInterface<T> {
+export class MessageTransmissionRequestResponse<T> extends MessageTransmissionBase<T> {
     private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionRequestResponse`, ['transmission'])
-    private transmissionProfile!: TransmissionProfile
     protected transmitterInstance!: MessageTransmitterInterface<any>;
     protected receiverInstance!: MessageReceiverInterface<any>;
 
-    constructor(config: TransmissionConfig, transmitterInstance: MessageTransmitterInterface<any>, receiverInstance: MessageReceiverInterface<any>) {
+    constructor(config: TransmissionConfig, event: Subject<GeneralEvent<any>>, transmitterInstance: MessageTransmitterInterface<any>, receiverInstance: MessageReceiverInterface<any>) {
+        super(config, event)
         this.console.log({ message: `Constructing Request Response Transmission for Receiving target: ${config.target}` })
         this.transmitterInstance = transmitterInstance
         this.receiverInstance = receiverInstance
     }
 
-    getTransmissionProfile(type?: `config` | `id` | `role`): TransmissionProfile | TransmissionConfig | string | undefined {
-        return type ? this.transmissionProfile[type] : this.transmissionProfile;
-    }
-
     // this is still broken at this point in time. DO NOT USE!!!!
     subscribe(observer: Observer<T> | Observable<T>): Unsubscribable | null {
         if (checkRxType(observer) === `Observable`) {

+ 22 - 14
src/transmission/msg.transmission.transmitter.ts

@@ -17,7 +17,6 @@ import { checkRxType } from "../utils/general.utils";
 
 
 export class MessageTransmissionTransmitter<T> extends MessageTransmissionBase<T> {
-    private internalObservable: Observable<GeneralEvent<T>> = new Observable()
     private connectionStateEvent: BehaviorSubject<ConnectionState> = new BehaviorSubject<ConnectionState>('OFFLINE')
     private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionTransmitter`, ['transmission'])
     private messageToBeBuffered!: Subject<FisMessage | WrappedMessage>
@@ -39,7 +38,7 @@ export class MessageTransmissionTransmitter<T> extends MessageTransmissionBase<T
             // Create a new Subscription to manage unsubscription
             const subscription = (observer as Observable<any>).subscribe(message => {
                 this.emit(message);
-                this.console.log({ message: `Message ${message.header?.messageID ?? `Undefined`} being processed... ` });
+                this.console.log({ message: `Message ${message.header?.messageID ?? `Undefined`} being ${this.connectionStateEvent.getValue() === `ONLINE` ? `processed...` : `buffered`}... ` });
             });
 
             return subscription; // Return the Subscription (Unsubscribable)
@@ -66,28 +65,37 @@ export class MessageTransmissionTransmitter<T> extends MessageTransmissionBase<T
             map(event => { return event.data as AdapterInterface<any> })
         ).subscribe((adapter: AdapterInterface<any>) => {
             this.adapters.push(adapter)
+            // Setting the first adapter as default for now
             this.console.log({ message: `Adding new ${adapter.getAdapterProfile(`transportType`)} transmitting adapter. Current adapter length: ${this.adapters.length}` })
             if (!this.currentAdapter) {
                 this.console.log({ message: `Setting this ${adapter.getAdapterProfile(`id`)} as current adapter.` })
                 this.currentAdapter = adapter as TransmitterAdapterInterface<T>
                 let connectionState: Observable<ConnectionState> = this.currentAdapter.getAdapterProfile('connectionState') as Observable<ConnectionState>
                 connectionState.subscribe(this.connectionStateEvent)
+                // new Version, adhering to actor subscribable paradigm
+                this.currentAdapter.subscribe(this.buffer.returnSubjectForBufferedItems().pipe(map(message => {
+                    // have to do this because of the subsribe method
+                    return {
+                        selfId: this.transmissionProfile.config.source,
+                        payload: message
+                    }
+                })))
             }
         })
 
         this.buffer.implementRetransmission(this.messageToBeBuffered, this.connectionStateEvent.asObservable(), true)
-        // automatically subscribe to allow released bffered messages to be released
-        this.buffer.returnSubjectForBufferedItems().subscribe((bufferedMessage: WrappedMessage) => {
-            // need to work with wrapped messages
-            this.console.log({ message: `Transmitting ${bufferedMessage.thisMessageID}` });
-            if (this.currentAdapter) {
-                this.currentAdapter.emit(this.transmissionProfile.config.source, bufferedMessage)
-            } else {
-                // just flush back the message inside the buffer, if the adapter is not ready or assigned.
-                this.messageToBeBuffered.next(bufferedMessage)
-                this.console.error({ message: `Adapter is not set. Please ensure adapters are ready. Message ${(bufferedMessage.payload as FisMessage).header.messageID} is flushed back into buffer.` })
-            }
-        })
+        /* Previous Version: automatically subscribe to allow released bffered messages to be released */
+        // this.buffer.returnSubjectForBufferedItems().subscribe((bufferedMessage: WrappedMessage) => {
+        //     // need to work with wrapped messages
+        //     this.console.log({ message: `Transmitting ${bufferedMessage.thisMessageID}` });
+        //     if (this.currentAdapter) {
+        //         this.currentAdapter.emit(this.transmissionProfile.config.source, bufferedMessage)
+        //     } else {
+        //         // just flush back the message inside the buffer, if the adapter is not ready or assigned.
+        //         this.messageToBeBuffered.next(bufferedMessage)
+        //         this.console.error({ message: `Adapter is not set. Please ensure adapters are ready. Message ${(bufferedMessage.payload as FisMessage).header.messageID} is flushed back into buffer.` })
+        //     }
+        // })
     }
 
 }

+ 1 - 1
src/utils/general.utils.ts

@@ -136,7 +136,7 @@ function isRxSubject(value: any): value is Subject<any> {
 }
 
 export function checkRxType(value: any): 'Subject' | 'Observable' | 'Observer' | 'Neither' {
-    if (isRxSubject(value)) return 'Observer'
+    if (isRxSubject(value)) return 'Observer' // for now returns observer, because subject passes as Observable as well. Can modify at later date
     if (isRxObservable(value)) return 'Observable'
     if (isRxObserver(value)) return 'Observer'
     return 'Neither';