123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 |
- /* Transmitter component: as the name implies, it is created solely to transmit messages. It subscribes to the
- adapter manager to acquire adapters. Once adapters are acquired, it simply picks the one that is currently online and
- associates that connection status with the buffer service / offline retransmission to start sending buffered messages.
- Notes for future enhancements:
- i) Logic to dynamically switch adapters, either based on their connection status or other factors.
- ii) Enabling the use of multiple adapters to increase bandwidth for data transmission. (More advanced)
- */
- import { MessageTransmissionBase } from "../base/msg.transmission.base";
- import { v4 as uuidv4 } from 'uuid'
- import { BehaviorSubject, distinctUntilChanged, filter, map, Observable, Observer, Subject, Subscription, Unsubscribable } from "rxjs";
- import { RetransmissionService } from "../utils/retransmission.service";
- import { WrappedMessage } from "../utils/message.ordering";
- import ConsoleLogger from "../utils/log.utils";
- import { AdapterInterface, AdapterManagerInterface, ConnectionState, FisMessage, GeneralEvent, MessageTransmitterInterface, TransmissionConfig, TransmissionProfile, TransmitterAdapterInterface, TransportMessage } from "../interface/interface";
- import { checkRxType } from "../utils/general.utils";
/**
 * Outbound side of a message transmission.
 *
 * Every message handed to {@link emit} is pushed into a {@link RetransmissionService}
 * buffer. The buffer is driven by the connection state of the currently selected
 * adapter, and whatever the buffer releases is forwarded to that adapter.
 *
 * Adapters are discovered through the `transmissionEvent` stream passed to the
 * constructor. The first matching adapter becomes the current adapter and is never
 * replaced by this class.
 */
export class MessageTransmissionTransmitter<T> extends MessageTransmissionBase<T> {
    /** Not referenced anywhere else inside this class. */
    private internalObservable: Observable<GeneralEvent<T>> = new Observable()
    /** Connection state handed to the retransmission buffer; starts as 'OFFLINE' until an adapter reports otherwise. */
    private connectionStateEvent: BehaviorSubject<ConnectionState> = new BehaviorSubject<ConnectionState>('OFFLINE')
    private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionTransmitter`, ['transmission'])
    /** Input side of the retransmission buffer. */
    private readonly messageToBeBuffered: Subject<FisMessage | WrappedMessage>
    private readonly buffer: RetransmissionService
    /** Adapter used for sending; stays undefined until the first matching adapter is announced. */
    private currentAdapter?: TransmitterAdapterInterface<any>

    /**
     * @param config Transmission configuration; `target` is used to match adapters and
     *               `source` is passed to the adapter on every send.
     * @param transmissionEvent Event stream on which new adapters are announced.
     */
    constructor(config: TransmissionConfig, transmissionEvent: Observable<GeneralEvent<any>>) {
        super(config, transmissionEvent)
        this.setTransmissionProfile(`Transmitter`, config)
        this.console.log({ message: `Constructing Transmitter Transmission for Receiving target: ${this.transmissionProfile.config.target}` })
        this.messageToBeBuffered = new Subject()
        this.buffer = new RetransmissionService()
        this.initializeTransmitterComponents(transmissionEvent)
    }

    /**
     * Connects a message source to this transmitter.
     *
     * @param observer Despite the name, only an Observable is acted upon: each value it
     *                 emits is passed to {@link emit}.
     * @returns The subscription to the source when `checkRxType` classifies it as an
     *          `Observable`; `null` for anything else.
     */
    public subscribe(observer: Observer<any> | Observable<any>): Unsubscribable | null {
        if (checkRxType(observer) !== `Observable`) {
            return null
        }
        this.console.log({ message: `Is Observable` })
        // The returned Subscription lets the caller stop feeding messages into the transmitter.
        return (observer as Observable<any>).subscribe(message => {
            this.emit(message)
            this.console.log({ message: `Message ${message.header?.messageID ?? `Undefined`} being processed... ` })
        })
    }

    /**
     * Queues a message for transmission by pushing it into the retransmission buffer.
     *
     * @param message Message to transmit.
     */
    public emit(message: FisMessage): void {
        // this.console.log({ message: `${this.connectionStateEvent.getValue() == 'ONLINE' ? `Transmitting...` : `Buffering...`}` })
        this.messageToBeBuffered.next(message)
    }

    /**
     * Wires the three moving parts together: adapter discovery, the retransmission
     * buffer, and the forwarding of released messages to the current adapter.
     *
     * @param transmissionEvent Event stream on which new adapters are announced.
     */
    private initializeTransmitterComponents(transmissionEvent: Observable<GeneralEvent<AdapterInterface<any>>>): void {
        this.console.log({ message: `Setting up Retransmission Service...` })

        // Only transmitter adapters announced for this transmission's target are of interest.
        transmissionEvent.pipe(
            filter(event => event.type === `Transmission Event`),
            filter(event => event.event === `New Adapter`),
            filter(event => (event.data as AdapterInterface<any>).getAdapterProfile(`clientId`) === this.transmissionProfile.config.target),
            filter(event => (event.data as AdapterInterface<any>).getAdapterProfile(`role`) === `Transmitter`),
            map(event => event.data as AdapterInterface<any>)
        ).subscribe((adapter: AdapterInterface<any>) => {
            this.adapters.push(adapter)
            this.console.log({ message: `Adding new ${adapter.getAdapterProfile(`transportType`)} transmitting adapter. Current adapter length: ${this.adapters.length}` })
            if (!this.currentAdapter) {
                this.adoptAsCurrentAdapter(adapter)
            }
        })

        this.buffer.implementRetransmission(this.messageToBeBuffered, this.connectionStateEvent.asObservable(), true)

        // Forward everything the buffer releases to the current adapter.
        this.buffer.returnSubjectForBufferedItems().subscribe((bufferedMessage: WrappedMessage) => {
            this.console.log({ message: `Transmitting ${bufferedMessage.thisMessageID}` })
            const adapter = this.currentAdapter
            if (adapter) {
                adapter.emit(this.transmissionProfile.config.source, bufferedMessage)
            } else {
                // No adapter assigned yet: push the message back into the buffer input instead of dropping it.
                this.messageToBeBuffered.next(bufferedMessage)
                this.console.error({ message: `Adapter is not set. Please ensure adapters are ready. Message ${(bufferedMessage.payload as FisMessage).header.messageID} is flushed back into buffer.` })
            }
        })
    }

    /**
     * Makes `adapter` the current adapter and mirrors its connection state into
     * `connectionStateEvent`, which drives the retransmission buffer.
     *
     * @param adapter Newly announced adapter to use for sending.
     */
    private adoptAsCurrentAdapter(adapter: AdapterInterface<any>): void {
        const adapterId = adapter.getAdapterProfile(`id`)
        this.console.log({ message: `Setting this ${adapterId} as current adapter.` })
        const selected = adapter as TransmitterAdapterInterface<T>
        this.currentAdapter = selected

        const connectionState = selected.getAdapterProfile('connectionState') as Observable<ConnectionState>
        // Forward values only. Subscribing the BehaviorSubject itself would also forward the
        // adapter stream's error/complete notifications and permanently terminate the subject,
        // leaving the buffer without any further connection-state updates.
        connectionState.subscribe({
            next: (state: ConnectionState) => this.connectionStateEvent.next(state),
            error: (err: unknown) => {
                const reason = err instanceof Error ? err.message : String(err)
                this.console.error({ message: `Connection state stream of adapter ${adapterId} failed: ${reason}. Treating connection as OFFLINE.` })
                // Fall back to OFFLINE so outgoing messages are buffered.
                this.connectionStateEvent.next('OFFLINE')
            }
            // NOTE(review): when the adapter's state stream completes, the last reported state is kept — confirm this is the desired behaviour.
        })
    }
}
|