123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100 |
- /* Receiving Component, as the name implies only deals with receving data. Same concept as the transmitter, it will subscribe for
- receving adapters from adapter manager, and use the ones that are available.
- Note for enhancements in the future;
- i) Logic to dynamically switch adapters, either based on their connection status or other factors
- ii) Enabling the use of mutli adapters usage to increase bandwith for data transmission. (More Advanced)
- */
- import { BehaviorSubject, filter, map, Observable, Observer, Subject, Subscription, Unsubscribable } from 'rxjs';
- import { checkMessage, WrappedMessage } from '../utils/message.ordering';
- import ConsoleLogger from '../utils/log.utils';
- import { MessageTransmissionBase } from '../base/msg.transmission.base';
- import { AdapterInterface, AdapterManagerInterface, ConnectionState, FisMessage, GeneralEvent, MessageReceiverInterface, ReceiverAdapterInterface, TransmissionConfig, TransmissionProfile, TransportMessage } from '../interface/interface';
- import { checkRxType } from '../utils/general.utils';
- export class MessageTransmissionReceiver<T> extends MessageTransmissionBase<T> {
- private connectionStateEvent: BehaviorSubject<ConnectionState> = new BehaviorSubject<ConnectionState>(`OFFLINE`)
- private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionReceiver`, ['transmission'])
- private onHoldMessage: Subject<WrappedMessage> = new Subject()
- private currentAdapter!: ReceiverAdapterInterface<any>
- private incomingMessage: Subject<GeneralEvent<TransportMessage>> = new Subject()
- constructor(config: TransmissionConfig, transmissionEvent: Observable<GeneralEvent<any>>) {
- super(config, transmissionEvent)
- this.setTransmissionProfile(`Receiver`, config)
- this.console.log({ message: `Constructing Receiver Transmission for Receiving target: ${this.transmissionProfile.config.target}` })
- this.initializeReceiverComponents(transmissionEvent)
- }
- public subscribe(param: Observer<any> | Observable<any>): Unsubscribable | null {
- if (checkRxType(param) === `Observer`) {
- this.console.log({ message: `Is Observer` });
- return this.getReceivables().subscribe(param as Observer<any>)
- } else {
- return null
- }
- }
- public getReceivables(): Observable<FisMessage> {
- return new Observable((receivable: Observer<FisMessage>) => {
- this.console.log({ message: `Tranmission Subscription: Streaming incoming messages from ${this.transmissionProfile.config.target}` })
- const subscription: Subscription = this.incomingMessage.pipe(
- filter((event: GeneralEvent<any>) => event.type == `Adapter Event`),
- filter((event: GeneralEvent<any>) => event.event == 'New Message'),
- ).subscribe((event: GeneralEvent<TransportMessage>) => {
- this.onHoldMessage.next(((event.data as TransportMessage).payload as WrappedMessage))
- checkMessage(((event.data as TransportMessage).payload as WrappedMessage), this.onHoldMessage).then(() => {
- // only release the message before it exists
- this.console.log({ message: `This one passes. Does have previousID. Case for message ordering` })
- receivable.next(((event.data as TransportMessage).payload as WrappedMessage).payload as FisMessage);
- }).catch((error) => {
- this.console.log({ message: `Observer Error`, details: error })
- })
- })
- // Clean up on unsubscription
- return () => {
- subscription.unsubscribe();
- };
- })
- }
- /* Assigned and update adapters record. Currently no logic to swtich adapters based on performance or whatever logic to be integrated in the future */
- private initializeReceiverComponents(transmissionEvent: Observable<GeneralEvent<AdapterInterface<any>>>): void {
- transmissionEvent.pipe(
- filter(event => event.type === `Transmission Event`),
- filter(event => event.event === `New Adapter`),
- filter(event => (event.data as AdapterInterface<any>).getAdapterProfile(`clientId`) === this.transmissionProfile.config.target),
- filter(event => (event.data as AdapterInterface<any>).getAdapterProfile(`role`) === `Receiver`),
- map(event => { return event.data as AdapterInterface<any> })
- ).subscribe((adapter: AdapterInterface<any>) => {
- this.adapters.push(adapter)
- this.console.log({ message: `Adding new ${adapter.getAdapterProfile(`transportType`)} receiving adapter. Current adapter length: ${this.adapters.length}` })
- if (!this.currentAdapter) {
- this.console.log({ message: `Setting this ${adapter.getAdapterProfile(`id`)} as current adapter.` })
- this.currentAdapter = adapter as ReceiverAdapterInterface<any>
- this.currentAdapter.subscribeForIncoming().subscribe({
- next: (message: GeneralEvent<TransportMessage>) => {
- this.console.log({ message: `Received ${(((message.data as TransportMessage).payload as WrappedMessage).payload as FisMessage).header.messageID} from ${((message.data as TransportMessage).source)}`, details: message })
- this.incomingMessage.next(message)
- },
- error: error => {
- // Error handling. Idealling switching to other adapters
- }
- })
- let connectionState: Observable<ConnectionState> = this.currentAdapter.getAdapterProfile(`connectionState`) as Observable<ConnectionState>
- connectionState.subscribe(this.connectionStateEvent)
- } else {
- this.currentAdapter.subscribeForIncoming().subscribe({
- next: (message: GeneralEvent<TransportMessage>) => this.incomingMessage.next(message),
- error: error => {
- // Error handling. Idealling switching to other adapters
- }
- })
- }
- })
- }
- }
|