msg.transmission.transmitter.ts 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  /* Transmitter component. As the name implies, it is created solely to transmit messages. It subscribes to the
  adapter manager to acquire adapters. Once adapters are acquired, it simply picks the one that is currently online and
  associates that connection status with the buffer service / offline retransmission to start sending buffered messages.
  Notes for future enhancements:
  i) Logic to dynamically switch adapters, either based on their connection status or other factors.
  ii) Enabling the use of multiple adapters to increase bandwidth for data transmission. (More advanced)
  */
  8. import { MessageTransmissionBase } from "../base/msg.transmission.base";
  9. import { v4 as uuidv4 } from 'uuid'
  10. import { BehaviorSubject, distinctUntilChanged, filter, map, Observable, Observer, Subject, Subscription, Unsubscribable } from "rxjs";
  11. import { RetransmissionService } from "../utils/retransmission.service";
  12. import { WrappedMessage } from "../utils/message.ordering";
  13. import ConsoleLogger from "../utils/log.utils";
  14. import { AdapterInterface, AdapterManagerInterface, ConnectionState, FisMessage, GeneralEvent, MessageTransmitterInterface, TransmissionConfig, TransmissionProfile, TransmitterAdapterInterface, TransportMessage } from "../interface/interface";
  15. import { checkRxType } from "../utils/general.utils";
  /**
   * Transmit-only side of a message transmission.
   *
   * Outgoing messages are pushed into `messageToBeBuffered`, which is handed to a
   * `RetransmissionService` together with the connection state of the current adapter.
   * Every message the service releases is sent through `currentAdapter`.
   *
   * Only the first matching adapter announced on `transmissionEvent` becomes `currentAdapter`;
   * later matches are appended to `this.adapters` but are not used for sending.
   *
   * @typeParam T - forwarded to `MessageTransmissionBase`, `GeneralEvent` and the adapter type.
   */
  export class MessageTransmissionTransmitter<T> extends MessageTransmissionBase<T> {
    // NOTE(review): never read or written anywhere in this class — confirm before removing.
    private internalObservable: Observable<GeneralEvent<T>> = new Observable()
    // Connection state handed to the retransmission service; starts as 'OFFLINE' until an adapter reports otherwise.
    private connectionStateEvent: BehaviorSubject<ConnectionState> = new BehaviorSubject<ConnectionState>('OFFLINE')
    private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionTransmitter`, ['transmission'])
    // Entry point of the buffer: everything passed to emit() (and everything flushed back) goes through here.
    private messageToBeBuffered!: Subject<FisMessage | WrappedMessage>
    private buffer!: RetransmissionService;
    // Adapter used for sending; stays unset until the first matching adapter is announced.
    private currentAdapter!: TransmitterAdapterInterface<any>

    /**
     * @param config - transmission configuration; `target` and `source` are read from the resulting profile.
     * @param transmissionEvent - event stream on which new adapters are announced.
     */
    constructor(config: TransmissionConfig, transmissionEvent: Observable<GeneralEvent<any>>) {
      super(config, transmissionEvent)
      this.setTransmissionProfile(`Transmitter`, config)
      this.console.log({ message: `Constructing Transmitter Transmission for Receiving target: ${this.transmissionProfile.config.target}` })
      this.messageToBeBuffered = new Subject()
      this.buffer = new RetransmissionService()
      this.initializeTransmitterComponents(transmissionEvent)
    }

    /**
     * Feeds every value emitted by an Observable into {@link emit}.
     *
     * @param observer - the source to read messages from. Only values that `checkRxType`
     *   classifies as `Observable` are supported.
     * @returns the subscription to the source, or `null` when the argument is not an Observable
     *   (in that case nothing is subscribed).
     */
    public subscribe(observer: Observer<any> | Observable<any>): Unsubscribable | null {
      if (checkRxType(observer) !== `Observable`) {
        return null;
      }
      this.console.log({ message: `Is Observable` });
      // The returned subscription lets the caller stop feeding messages into this transmitter.
      return (observer as Observable<any>).subscribe(message => {
        this.emit(message);
        this.console.log({ message: `Message ${message.header?.messageID ?? `Undefined`} being processed... ` });
      });
    }

    /**
     * Queues a message for transmission by pushing it into the buffer input.
     *
     * @param message - the message to send.
     */
    public emit(message: FisMessage): void {
      // this.console.log({ message: `${this.connectionStateEvent.getValue() == 'ONLINE' ? `Transmitting...` : `Buffering...`}` })
      this.messageToBeBuffered.next(message)
    }

    /**
     * Wires the three pieces together:
     *  1. listens for newly announced transmitter adapters for this transmission's target,
     *  2. hands the message input and the connection state to the retransmission service,
     *  3. sends every message the service releases through the current adapter.
     *
     * @param transmissionEvent - event stream on which new adapters are announced.
     */
    private initializeTransmitterComponents(transmissionEvent: Observable<GeneralEvent<AdapterInterface<any>>>): void {
      this.console.log({ message: `Setting up Retransmission Service...` })

      // Listen for new adapters: only `New Adapter` transmission events whose adapter targets
      // this transmission's client and has the `Transmitter` role are accepted.
      transmissionEvent.pipe(
        filter(event => event.type === `Transmission Event`),
        filter(event => event.event === `New Adapter`),
        filter(event => (event.data as AdapterInterface<any>).getAdapterProfile(`clientId`) === this.transmissionProfile.config.target),
        filter(event => (event.data as AdapterInterface<any>).getAdapterProfile(`role`) === `Transmitter`),
        map(event => event.data as AdapterInterface<any>)
      ).subscribe((adapter: AdapterInterface<any>) => {
        this.adapters.push(adapter)
        this.console.log({ message: `Adding new ${adapter.getAdapterProfile(`transportType`)} transmitting adapter. Current adapter length: ${this.adapters.length}` })
        if (!this.currentAdapter) {
          this.console.log({ message: `Setting this ${adapter.getAdapterProfile(`id`)} as current adapter.` })
          this.currentAdapter = adapter as TransmitterAdapterInterface<T>
          const connectionState = this.currentAdapter.getAdapterProfile('connectionState') as Observable<ConnectionState>
          // Forward values only. Passing the BehaviorSubject itself as the observer would also
          // forward `error`/`complete`, permanently closing the shared subject (and with it the
          // signal the retransmission service relies on) as soon as this adapter's stream ends.
          connectionState.subscribe({
            next: (state: ConnectionState) => this.connectionStateEvent.next(state),
            error: (err: unknown) => this.console.error({ message: `Connection state stream of the current adapter failed: ${err instanceof Error ? err.message : String(err)}` })
          })
        }
      })

      // NOTE(review): meaning of the third argument (`true`) is defined by RetransmissionService — confirm there.
      this.buffer.implementRetransmission(this.messageToBeBuffered, this.connectionStateEvent.asObservable(), true)

      // Subscribe right away so that buffered messages are transmitted as soon as the service releases them.
      this.buffer.returnSubjectForBufferedItems().subscribe((bufferedMessage: WrappedMessage) => {
        // The service hands back wrapped messages.
        this.console.log({ message: `Transmitting ${bufferedMessage.thisMessageID}` });
        if (this.currentAdapter) {
          this.currentAdapter.emit(this.transmissionProfile.config.source, bufferedMessage)
        } else {
          // No adapter assigned yet: push the message back into the buffer instead of dropping it.
          this.messageToBeBuffered.next(bufferedMessage)
          // Guarded lookup (same as in subscribe()) so that a payload without a header cannot make
          // this error-reporting path throw.
          this.console.error({ message: `Adapter is not set. Please ensure adapters are ready. Message ${(bufferedMessage.payload as FisMessage)?.header?.messageID ?? `Undefined`} is flushed back into buffer.` })
        }
      })
    }
  }