/* Transmitter components, as the name implies, solely created to transmit messages only. Will subscribe for adapters from adapter manager to acquire adapters. Once adaptesr are required, it will just pick the one that is currently online, and assciate that connection status with the buffer service / offline retransmission to start sending buffered messages. Note for enhancements in the future; i) Logic to dynamically switch adapters, either based on their connection status or other factors ii) Enabling the use of mutli adapters usage to increase bandwith for data transmission. (More Advanced) */ import { MessageTransmissionBase } from "../base/msg.transmission.base"; import { v4 as uuidv4 } from 'uuid' import { BehaviorSubject, distinctUntilChanged, filter, map, Observable, Observer, Subject, Subscription, Unsubscribable } from "rxjs"; import { RetransmissionService } from "../utils/retransmission.service"; import { WrappedMessage } from "../utils/message.ordering"; import ConsoleLogger from "../utils/log.utils"; import { AdapterInterface, AdapterManagerInterface, ConnectionState, FisMessage, GeneralEvent, MessageTransmitterInterface, TransmissionConfig, TransmissionProfile, TransmitterAdapterInterface, TransportMessage } from "../interface/interface"; import { checkRxType } from "../utils/general.utils"; export class MessageTransmissionTransmitter extends MessageTransmissionBase { private internalObservable: Observable> = new Observable() private connectionStateEvent: BehaviorSubject = new BehaviorSubject('OFFLINE') private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionTransmitter`, ['transmission']) private messageToBeBuffered!: Subject private buffer!: RetransmissionService; private currentAdapter!: TransmitterAdapterInterface constructor(config: TransmissionConfig, transmissionEvent: Observable>) { super(config, transmissionEvent) this.setTransmissionProfile(`Transmitter`, config) this.console.log({ message: `Constructing Transmitter Transmission for Receiving target: ${this.transmissionProfile.config.target}` }) this.messageToBeBuffered = new Subject() this.buffer = new RetransmissionService() this.initializeTransmitterComponents(transmissionEvent) } public subscribe(observer: Observer | Observable): Unsubscribable | null { if (checkRxType(observer) === `Observable`) { this.console.log({ message: `Is Observable` }); // Create a new Subscription to manage unsubscription const subscription = (observer as Observable).subscribe(message => { this.emit(message); this.console.log({ message: `Message ${message.header?.messageID ?? `Undefined`} being processed... ` }); }); return subscription; // Return the Subscription (Unsubscribable) } else { return null; } } public emit(message: FisMessage): void { // this.console.log({ message: `${this.connectionStateEvent.getValue() == 'ONLINE' ? `Transmitting...` : `Buffering...`}` }) this.messageToBeBuffered.next(message) } /* After setting up, will listen specifically to the connection state of this particular remote client. So that, the buffer signal can be established to allow the buffer to do their thing. */ private initializeTransmitterComponents(transmissionEvent: Observable>>): void { this.console.log({ message: `Setting up Retransmission Service...` }) // Listen and update adapters transmissionEvent.pipe( filter(event => event.type === `Transmission Event`), filter(event => event.event === `New Adapter`), filter(event => (event.data as AdapterInterface).getAdapterProfile(`clientId`) === this.transmissionProfile.config.target), filter(event => (event.data as AdapterInterface).getAdapterProfile(`role`) === `Transmitter`), map(event => { return event.data as AdapterInterface }) ).subscribe((adapter: AdapterInterface) => { this.adapters.push(adapter) this.console.log({ message: `Adding new ${adapter.getAdapterProfile(`transportType`)} transmitting adapter. Current adapter length: ${this.adapters.length}` }) if (!this.currentAdapter) { this.console.log({ message: `Setting this ${adapter.getAdapterProfile(`id`)} as current adapter.` }) this.currentAdapter = adapter as TransmitterAdapterInterface let connectionState: Observable = this.currentAdapter.getAdapterProfile('connectionState') as Observable connectionState.subscribe(this.connectionStateEvent) } }) this.buffer.implementRetransmission(this.messageToBeBuffered, this.connectionStateEvent.asObservable(), true) // automatically subscribe to allow released bffered messages to be released this.buffer.returnSubjectForBufferedItems().subscribe((bufferedMessage: WrappedMessage) => { // need to work with wrapped messages this.console.log({ message: `Transmitting ${bufferedMessage.thisMessageID}` }); if (this.currentAdapter) { this.currentAdapter.emit(this.transmissionProfile.config.source, bufferedMessage) } else { // just flush back the message inside the buffer, if the adapter is not ready or assigned. this.messageToBeBuffered.next(bufferedMessage) this.console.error({ message: `Adapter is not set. Please ensure adapters are ready. Message ${(bufferedMessage.payload as FisMessage).header.messageID} is flushed back into buffer.` }) } }) } }