// retransmission.service.ts (6.6 KB)

  1. import { BehaviorSubject, buffer, distinct, distinctUntilChanged, from, Observable, Subject, takeWhile } from "rxjs";
  2. import { v4 as uuidV4 } from 'uuid';
  3. import { sortMessageBasedOnDate, WrappedMessage } from "./message.ordering";
  4. import ConsoleLogger from "./log.utils";
  5. export class RetransmissionService {
  6. private console: ConsoleLogger = new ConsoleLogger(`RetransmissionService`, ['retransmission'])
  7. private currentMessageId!: string | null
  8. private sortMessage: boolean = false
  9. private bufferReleaseSignal: Subject<void> = new Subject()
  10. private receiverConnectionState: BehaviorSubject<ConnectionState> = new BehaviorSubject<ConnectionState>('OFFLINE')
  11. private transmissionState: BehaviorSubject<TransmissionState> = new BehaviorSubject<TransmissionState>('ARRAY EMPTY')
  12. private arrayToBeTransmitted: Subject<WrappedMessage[]> = new Subject()
  13. private toBeWrapped: Subject<any> = new Subject()
  14. private wrappedMessageToBeBuffered: Subject<WrappedMessage> = new Subject()
  15. private messageToBeTransmitted: Subject<WrappedMessage> = new Subject()
  16. // Interface
  17. public implementRetransmission(payloadToBeTransmitted: Observable<any>, eventListener: Observable<ConnectionState>, wantMessageOrdering?: boolean) {
  18. if (wantMessageOrdering) {
  19. this.sortMessage = true
  20. this.console.log({ message: `Message ordering is set to ${this.sortMessage}` })
  21. }
  22. eventListener.pipe(distinctUntilChanged()).subscribe(event => this.receiverConnectionState.next(event))
  23. this.startWrappingOperation()
  24. this.startBufferTransmisionProcess()
  25. this.linkEventListenerToBufferSignal()
  26. payloadToBeTransmitted.subscribe((message) => {
  27. this.toBeWrapped.next(message)
  28. })
  29. }
  30. public returnSubjectForBufferedItems(): Observable<WrappedMessage> {
  31. return this.messageToBeTransmitted.asObservable()
  32. }
  33. private startWrappingOperation() {
  34. this.toBeWrapped.subscribe(message => {
  35. this.wrappedMessageToBeBuffered.next(this.wrapMessageWithTimeReceived(message, this.currentMessageId ? this.currentMessageId : null))
  36. })
  37. // wrappedMessageToBeBuffered will then be pushed to buffer
  38. this.wrappedMessageToBeBuffered.pipe(buffer(this.bufferReleaseSignal)).subscribe((bufferedMessages: WrappedMessage[]) => {
  39. this.console.log({ message: `${bufferedMessages.length > 0 ? `${bufferedMessages.length} buffered messages` : `No buffered messages at the moment`} ` })
  40. // console.log(`Released buffered message: ${bufferedMessages.length} total messages. To Be sorted.`)
  41. this.arrayToBeTransmitted.next(sortMessageBasedOnDate(bufferedMessages))
  42. // this.arrayToBeTransmitted.next((this.sortMessage && bufferedMessages.length > 0) ? sortMessageBasedOnDate(bufferedMessages) : bufferedMessages)
  43. });
  44. }
  45. private wrapMessageWithTimeReceived(message: any, previousMessageID: string | null): WrappedMessage {
  46. // check if message has already a time received property if so no need to add anymore
  47. if (!message.timeReceived) {
  48. let WrappedMessage: WrappedMessage = {
  49. timeReceived: new Date(),
  50. payload: message,
  51. thisMessageID: uuidV4(),
  52. previousMessageID: previousMessageID
  53. }
  54. // console.log(`Current`, WrappedMessage.thisMessageID, 'Previous for this message:', WrappedMessage.previousMessageID)
  55. this.currentMessageId = WrappedMessage.thisMessageID as string
  56. // console.log(`Updating: `, this.currentMessageId)
  57. return WrappedMessage
  58. } else {
  59. return message as WrappedMessage
  60. }
  61. }
  62. private startBufferTransmisionProcess() {
  63. this.console.log({ message: `StartBufferTransmissionProcess` })
  64. this.arrayToBeTransmitted.subscribe(array => {
  65. if (array.length > 0) {
  66. this.transmissionState.next('TRANSMITTING')
  67. from(array).subscribe({
  68. next: (message: WrappedMessage) => {
  69. if (this.receiverConnectionState.getValue() == 'OFFLINE') {
  70. // buffer this message. Flush it back to buffer
  71. this.wrappedMessageToBeBuffered.next(message)
  72. }
  73. if (this.receiverConnectionState.getValue() == 'ONLINE') {
  74. this.messageToBeTransmitted.next(message)
  75. }
  76. },
  77. error: err => console.error(err),
  78. complete: () => {
  79. // update transmission state to indicate this batch is completed
  80. this.transmissionState.next('ARRAY EMPTY');
  81. if (this.receiverConnectionState.getValue() === 'ONLINE' && this.transmissionState.getValue() === 'ARRAY EMPTY') {
  82. setTimeout(() => {
  83. this.bufferReleaseSignal.next()
  84. }, 1000)
  85. }
  86. // Do nothing if the receiver connection is offline
  87. }
  88. });
  89. } else {
  90. // If I don't do setTimeout, then bufferrelasesignal will be overloaded
  91. if (this.receiverConnectionState.getValue() === 'ONLINE') {
  92. setTimeout(() => {
  93. this.bufferReleaseSignal.next()
  94. }, 3000)
  95. }
  96. }
  97. }
  98. )
  99. }
  100. private linkEventListenerToBufferSignal() {
  101. this.receiverConnectionState.pipe(
  102. distinctUntilChanged()
  103. ).subscribe(clientState => {
  104. this.console.log({ message: `Client is now ${clientState}. ${(clientState === 'OFFLINE') ? 'Buffering Mode Active...' : 'Releasing Buffered Messages...'}` })
  105. if (clientState == 'OFFLINE') {
  106. this.console.log({ message: `Current transmission state: ${this.transmissionState.getValue()}` })
  107. // just keep buffering
  108. }
  109. if (clientState == 'ONLINE') {
  110. this.console.log({ message: `Current transmission state: ${this.transmissionState.getValue()}` })
  111. // get the stored messages to pump it back into the buffer to be ready to be processed immediately
  112. if (this.transmissionState.getValue() == 'ARRAY EMPTY') {
  113. this.bufferReleaseSignal.next()
  114. }
  115. }
  116. })
  117. }
  118. }
  119. type ConnectionState = 'ONLINE' | 'OFFLINE'
  120. type TransmissionState = 'TRANSMITTING' | 'IDLE' | 'ARRAY EMPTY' | 'STORING DATA' | 'GETTING STORED DATA'