// msg.transmission.transmitter.ts (6.7 KB)

  1. import { MessageTransmissionBase } from "../base/msg.transmission.base";
  2. import { v4 as uuidv4 } from 'uuid'
  3. import { BehaviorSubject, distinct, distinctUntilChanged, filter, map, Observable, Subject, Subscription } from "rxjs";
  4. import { RetransmissionService } from "../utils/retransmission.service";
  5. import { WrappedMessage } from "../utils/message.ordering";
  6. import ConsoleLogger from "../utils/log.utils";
  7. import { AdapterInterface, ConnectionState, FisMessage, GeneralEvent, MessageTransmitterInterface, TransmitterAdapterInterface, TransportMessage } from "../interface/interface";
  8. import { error } from "console";
  /* Takes in all the messages that need to be transported and divides them accordingly, so that the connector instances only have to transport them.
  Connectors (adapters) each have their own identifier. */
  /**
   * Transmitter side of a message transmission for one client.
   *
   * Outgoing messages passed to {@link emit} are pushed into `messageToBeBuffered`,
   * which is handed to a `RetransmissionService` together with the current
   * connection state. Messages released by that service are forwarded on
   * `messageToBeTransmitted` once a transmitter adapter has been selected.
   *
   * The connection state is derived from connect/disconnect events on the shared
   * event subject whose `data.clientId` matches this instance's client id.
   */
  export class MessageTransmissionTransmitter extends MessageTransmissionBase implements MessageTransmitterInterface {
    /** Event names that switch the connection state to 'OFFLINE'. */
    private static readonly OFFLINE_EVENTS: readonly string[] = ['Client Disconnected', 'Server Disconnected']
    /** Event names that switch the connection state to 'ONLINE'. */
    private static readonly ONLINE_EVENTS: readonly string[] = ['Client Re-connected', 'Client Connected', 'Server Connected']

    /** Latest known connection state; starts as 'OFFLINE' until a connect event arrives. */
    private connectionStateEvent: BehaviorSubject<ConnectionState> = new BehaviorSubject<ConnectionState>('OFFLINE')
    private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionTransmitter`, ['transmission'])
    /** Input of the retransmission service: everything that should (eventually) be sent. */
    private messageToBeBuffered!: Subject<FisMessage | WrappedMessage>
    /** Output side: messages released by the retransmission service while an adapter is set. */
    private messageToBeTransmitted!: Subject<WrappedMessage>
    private buffer!: RetransmissionService;
    /** The single adapter currently used for transmitting (only one is supported for now). */
    private currentAdapter!: TransmitterAdapterInterface

    /**
     * @param clientId identifier of the client this transmission belongs to; used to
     *                 filter connection-state and re-flush events
     * @param event    shared event subject carrying adapter and connection events
     */
    constructor(clientId: string, event: Subject<GeneralEvent<any>>) {
      super()
      this.console.log({ message: `Constructing Transmitter Transmission with ${clientId}` })
      // Store the id: the event filters below compare against this.clientId, which
      // was previously never assigned from the constructor argument.
      this.clientId = clientId
      this.event = event
      this.messageToBeTransmitted = new Subject()
      this.messageToBeBuffered = new Subject()
      this.buffer = new RetransmissionService()
      this.handleAdapters(this.event)
      this.setupBuffer()
      // special case just for http in case of server/client disconnected, the unsent msg will be flushed back into messageToBeBuffered
      // NOTE(review): uniqueHandlerToFlushUnsentMessages looks like the intended hook for this,
      // but it is not wired in here — confirm before enabling.
    }

    /**
     * Queues a message for transmission. The message always goes through the
     * retransmission buffer; the log line only reflects the current connection state.
     *
     * @param message the message to send
     */
    public emit(message: FisMessage): void {
      const action = this.connectionStateEvent.getValue() === 'ONLINE' ? `Transmitting message` : `Buffering message`
      this.console.log({ message: `${action}` })
      this.messageToBeBuffered.next(message)
    }

    /**
     * Wires the connection-state tracking and the retransmission buffer:
     * 1. maps connect/disconnect events for this client to 'ONLINE'/'OFFLINE',
     * 2. hands the buffer input and the connection state to the retransmission service,
     * 3. forwards released messages to `messageToBeTransmitted`, or pushes them back
     *    into the buffer input when no adapter is available yet.
     */
    private setupBuffer(): void {
      this.console.log({ message: `Setting up Retransmission Service...` })
      const { OFFLINE_EVENTS, ONLINE_EVENTS } = MessageTransmissionTransmitter

      this.event.pipe(
        // Check the event name first so unrelated events never reach the data access.
        filter(event => OFFLINE_EVENTS.includes(event.event) || ONLINE_EVENTS.includes(event.event)),
        // Optional chaining: an event without data must not throw inside the pipeline.
        filter(event => event.data?.clientId === this.clientId),
        map((event): ConnectionState => OFFLINE_EVENTS.includes(event.event) ? 'OFFLINE' : 'ONLINE'),
        distinctUntilChanged()
      ).subscribe((signal: ConnectionState) => {
        this.connectionStateEvent.next(signal)
        if (signal === 'OFFLINE') this.console.error({ message: `${this.clientId} disconnected` })
        if (signal === 'ONLINE') this.console.log({ message: `${this.clientId} connected` })
      })

      // NOTE(review): the meaning of the third argument (true) is defined by
      // RetransmissionService and is not visible from this file.
      this.buffer.implementRetransmission(this.messageToBeBuffered, this.connectionStateEvent.asObservable(), true)

      // Subscribe right away so that buffered messages are released as soon as the service emits them.
      this.buffer.returnSubjectForBufferedItems().subscribe((bufferedMessage: WrappedMessage) => {
        // The service hands back wrapped messages.
        this.console.log({ message: `Releasing ${bufferedMessage.thisMessageID}` });
        if (this.currentAdapter) {
          this.messageToBeTransmitted.next(bufferedMessage)
        } else {
          // No adapter yet: feed the message back into the buffer input so it is not lost.
          // NOTE(review): if the service releases synchronously while ONLINE, this could
          // loop until an adapter is set — verify against RetransmissionService.
          this.messageToBeBuffered.next(bufferedMessage)
          this.console.error({ message: `Adapter is not set. Please ensure adapters are ready.` })
        }
      })
    }

    /**
     * Listens for `New Adapter` events, registers every adapter whose role is
     * `Transmitter`, announces each of them as a `Transmission Event`, and adopts
     * the first announced transmitter adapter as the current one.
     *
     * @param adaptersEvent shared event subject on which adapters are published
     */
    private handleAdapters(adaptersEvent: Subject<GeneralEvent<any>>): void {
      adaptersEvent.pipe(
        filter(event => event.event === `New Adapter`),
        map(event => event.data),
        // The announcement emitted below is also a `New Adapter` event on this same
        // subject, but carries a single adapter. Only adapter lists are handled here;
        // without this guard the re-entrant event would hit forEach on a non-array.
        filter((data): data is AdapterInterface[] => Array.isArray(data)),
      ).subscribe({
        next: (adapters: AdapterInterface[]) => {
          for (const adapter of adapters) {
            if (adapter.role !== `Transmitter`) continue
            this.adapters.push(adapter as TransmitterAdapterInterface)
            adaptersEvent.next({
              id: uuidv4(),
              type: 'Transmission Event',
              event: `New Adapter`,
              date: new Date(),
              data: adapter,
              transport: adapter.transport
            })
          }
        },
        error: err => this.console.error({ message: 'Observer Error', details: err })
      })

      // Listen to adapters newly announced as transmission events.
      adaptersEvent.pipe(
        filter(event => event.type === `Transmission Event`),
        filter(event => event.event === `New Adapter`),
        map(event => event.data as AdapterInterface),
        // Only a transmitter adapter may become the current transmitting adapter.
        filter(adapter => adapter?.role === `Transmitter`),
      ).subscribe((adapter: AdapterInterface) => {
        if (!this.currentAdapter) {
          this.currentAdapter = adapter as TransmitterAdapterInterface
        } else {
          this.console.log({ message: `Already have existing transmitting adapter. Currently hardcode to use only 1` })
        }
      })
    }

    /**
     * Temporary logic: selects the first registered `Websocket` adapter as the
     * current adapter when none has been selected yet. Logs an error only when
     * no adapter is selected and no websocket adapter is registered.
     */
    private setUpAdapter(): void {
      // An adapter is already in use: nothing to select and nothing to report.
      if (this.currentAdapter) return

      const websocketAdapter = this.adapters.find(adapter => adapter.transport === `Websocket`)
      if (websocketAdapter) {
        this.currentAdapter = websocketAdapter as TransmitterAdapterInterface
      } else {
        this.console.error({ message: 'No websocket socket adapter avaiable' })
      }
    }

    /**
     * Pushes unsent messages carried by `Re-Flush` events for this client back
     * into the buffer input.
     *
     * @param event stream of general events to watch for `Re-Flush`
     */
    private uniqueHandlerToFlushUnsentMessages(event: Observable<GeneralEvent<any>>): void {
      event.pipe(
        filter(flushEvent => flushEvent.event === 'Re-Flush'),
        filter(flushEvent => flushEvent.data?.clientId === this.clientId),
      ).subscribe((flushEvent: GeneralEvent<any>) => {
        // The wrapped message sits inside the transport message carried by the event.
        const transportMessage = flushEvent.data.payload as TransportMessage | undefined
        const wrapped = transportMessage?.payload as WrappedMessage | undefined
        if (!wrapped) {
          this.console.error({ message: `Re-Flush event without a message payload. Nothing to buffer.` })
          return
        }
        const action = this.connectionStateEvent.getValue() === 'ONLINE' ? `Transmitting` : `Buffering`
        this.console.log({ message: `${action} ${wrapped.thisMessageID}` })
        this.messageToBeBuffered.next(wrapped)
      })
    }
  }