/*  Receiving component. As the name implies, it only deals with receiving data. Same concept as the
    transmitter: it subscribes for receiving adapters announced by the adapter manager and uses the
    ones that are available.
    Notes for future enhancements:
    i)  Logic to dynamically switch adapters, either based on their connection status or other factors.
    ii) Enabling the use of multiple adapters to increase bandwidth for data transmission. (More advanced) */
import { BehaviorSubject, filter, map, Observable, Observer, Subject, Subscription, Unsubscribable } from 'rxjs';
import { checkMessage, WrappedMessage } from '../utils/message.ordering';
import ConsoleLogger from '../utils/log.utils';
import { MessageTransmissionBase } from '../base/msg.transmission.base';
import { AdapterInterface, AdapterManagerInterface, ConnectionState, FisMessage, GeneralEvent, MessageReceiverInterface, ReceiverAdapterInterface, TransmissionConfig, TransmissionProfile, TransportMessage } from '../interface/interface';
import { checkRxType } from '../utils/general.utils';

/**
 * Receiving side of a transmission. Listens on the transmission event stream for receiver adapters
 * that belong to the configured target, makes the first one the current adapter, and republishes
 * the messages coming from it through {@link getReceivables}.
 *
 * NOTE(review): the generic type arguments in this class (e.g. `GeneralEvent<...>`,
 * `Observable<...>`) were reconstructed from how the values are used. Confirm them against the
 * declarations in `../interface/interface` and the base class.
 */
export class MessageTransmissionReceiver extends MessageTransmissionBase {
    /** Mirrors the connection state reported by the current adapter; starts as `OFFLINE`. */
    private connectionStateEvent: BehaviorSubject<ConnectionState> = new BehaviorSubject<ConnectionState>(`OFFLINE`)
    private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionReceiver`, ['transmission'])
    /** Every wrapped message seen so far is pushed here and handed to `checkMessage`. */
    private onHoldMessage: Subject<WrappedMessage> = new Subject()
    /** Adapter currently used for receiving; unset until the first matching adapter is announced. */
    private currentAdapter!: ReceiverAdapterInterface
    /** Raw events forwarded from the current adapter. */
    private incomingMessage: Subject<GeneralEvent<TransportMessage>> = new Subject()

    /**
     * @param config            transmission configuration; `config.target` identifies the client to receive from
     * @param transmissionEvent stream on which newly available adapters are announced
     */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- payload type of the event stream is not visible here
    constructor(config: TransmissionConfig, transmissionEvent: Observable<GeneralEvent<any>>) {
        super(config, transmissionEvent)
        this.setTransmissionProfile(`Receiver`, config)
        this.console.log({ message: `Constructing Receiver Transmission for Receiving target: ${this.transmissionProfile.config.target}` })
        this.initializeReceiverComponents(transmissionEvent)
    }

    /**
     * Subscribes an observer to the stream of received messages.
     *
     * @param param an Observer to be fed with received messages
     * @returns the subscription handle, or `null` when `param` is not recognised as an Observer
     */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- kept permissive for existing callers
    public subscribe(param: Observer<any> | Observable<any>): Unsubscribable | null {
        if (checkRxType(param) === `Observer`) {
            this.console.log({ message: `Is Observer` });
            return this.getReceivables().subscribe(param as Observer<FisMessage>)
        }
        // Only observers can be attached to a receiver; make the rejected call visible in the logs.
        this.console.log({ message: `Subscribe ignored: parameter is not an Observer` })
        return null
    }

    /**
     * Builds an observable of received messages. Each subscription taps `incomingMessage`, keeps
     * only `Adapter Event` / `New Message` events, and emits the inner payload of a message once
     * `checkMessage` has resolved for it.
     *
     * @returns observable emitting the unwrapped messages; unsubscribing detaches it from the incoming stream
     */
    public getReceivables(): Observable<FisMessage> {
        return new Observable((receivable: Observer<FisMessage>) => {
            this.console.log({ message: `Tranmission Subscription: Streaming incoming messages from ${this.transmissionProfile.config.target}` })
            const subscription: Subscription = this.incomingMessage.pipe(
                filter((event: GeneralEvent<TransportMessage>) => event.type === `Adapter Event` && event.event === 'New Message'),
            ).subscribe((event: GeneralEvent<TransportMessage>) => {
                const wrapped = (event.data as TransportMessage).payload as WrappedMessage
                // The message is published on the on-hold stream before it is checked.
                this.onHoldMessage.next(wrapped)
                checkMessage(wrapped, this.onHoldMessage).then(() => {
                    // Release the message only after checkMessage resolves
                    // (presumably once ordering is satisfied — confirm against message.ordering).
                    this.console.log({ message: `This one passes. Does have previousID. Case for message ordering` })
                    receivable.next(wrapped.payload as FisMessage);
                }).catch((error: unknown) => {
                    this.console.log({ message: `Observer Error`, details: error })
                })
            })

            // Clean up on unsubscription
            return () => {
                subscription.unsubscribe();
            };
        })
    }

    /*  Assign and update the adapter record. Currently there is no logic to switch adapters based on
        performance or any other criteria; that is to be integrated in the future. */
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- payload type of the event stream is not visible here
    private initializeReceiverComponents(transmissionEvent: Observable<GeneralEvent<any>>): void {
        transmissionEvent.pipe(
            filter(event => event.type === `Transmission Event`),
            filter(event => event.event === `New Adapter`),
            filter(event => (event.data as AdapterInterface).getAdapterProfile(`clientId`) === this.transmissionProfile.config.target),
            filter(event => (event.data as AdapterInterface).getAdapterProfile(`role`) === `Receiver`),
            map(event => { return event.data as AdapterInterface })
        ).subscribe((adapter: AdapterInterface) => {
            this.adapters.push(adapter)
            this.console.log({ message: `Adding new ${adapter.getAdapterProfile(`transportType`)} receiving adapter. Current adapter length: ${this.adapters.length}` })
            if (!this.currentAdapter) {
                this.console.log({ message: `Setting this ${adapter.getAdapterProfile(`id`)} as current adapter.` })
                this.activateReceiverAdapter(adapter as ReceiverAdapterInterface)
            } else {
                // The current adapter is already being listened to. Subscribing to it again for
                // every additional adapter would deliver each incoming message more than once, so
                // the new adapter is only recorded in `adapters` for future switching logic.
                this.console.log({ message: `Keeping ${adapter.getAdapterProfile(`id`)} as standby adapter.` })
            }
        })
    }

    /** Makes `adapter` the current adapter and forwards its incoming events and connection state. */
    private activateReceiverAdapter(adapter: ReceiverAdapterInterface): void {
        this.currentAdapter = adapter
        this.currentAdapter.subscribeForIncoming().subscribe({
            next: (message: GeneralEvent<TransportMessage>) => {
                // Optional chaining keeps an event without the expected payload shape from throwing
                // inside the log call and therefore never being forwarded.
                const transport = message.data as TransportMessage | undefined
                const inner = (transport?.payload as WrappedMessage | undefined)?.payload as FisMessage | undefined
                this.console.log({ message: `Received ${inner?.header?.messageID} from ${transport?.source}`, details: message })
                this.incomingMessage.next(message)
            },
            error: (error: unknown) => {
                // Error handling. Ideally this would switch to another adapter; for now make it visible.
                this.console.log({ message: `Adapter Error`, details: error })
            }
        })
        const connectionState: Observable<ConnectionState> = this.currentAdapter.getAdapterProfile(`connectionState`) as Observable<ConnectionState>
        connectionState.subscribe(this.connectionStateEvent)
    }
}