import { BehaviorSubject, buffer, distinct, distinctUntilChanged, from, Observable, Subject, takeWhile } from "rxjs"; import { v4 as uuidV4 } from 'uuid'; import { sortMessageBasedOnDate, WrappedMessage } from "./message.ordering"; import ConsoleLogger from "./log.utils"; export class RetransmissionService { private console: ConsoleLogger = new ConsoleLogger(`RetransmissionService`, ['retransmission']) private currentMessageId!: string | null private sortMessage: boolean = false private bufferReleaseSignal: Subject = new Subject() private receiverConnectionState: BehaviorSubject = new BehaviorSubject('OFFLINE') private transmissionState: BehaviorSubject = new BehaviorSubject('ARRAY EMPTY') private arrayToBeTransmitted: Subject = new Subject() private toBeWrapped: Subject = new Subject() private wrappedMessageToBeBuffered: Subject = new Subject() private messageToBeTransmitted: Subject = new Subject() // Interface public implementRetransmission(payloadToBeTransmitted: Observable, eventListener: Observable, wantMessageOrdering?: boolean) { if (wantMessageOrdering) { this.sortMessage = true this.console.log({ message: `Message ordering is set to ${this.sortMessage}` }) } eventListener.pipe(distinctUntilChanged()).subscribe(event => this.receiverConnectionState.next(event)) this.startWrappingOperation() this.startBufferTransmisionProcess() this.linkEventListenerToBufferSignal() payloadToBeTransmitted.subscribe((message) => { this.toBeWrapped.next(message) }) } public returnSubjectForBufferedItems(): Observable { return this.messageToBeTransmitted.asObservable() } private startWrappingOperation() { this.toBeWrapped.subscribe(message => { this.wrappedMessageToBeBuffered.next(this.wrapMessageWithTimeReceived(message, this.currentMessageId ? this.currentMessageId : null)) }) // wrappedMessageToBeBuffered will then be pushed to buffer this.wrappedMessageToBeBuffered.pipe(buffer(this.bufferReleaseSignal)).subscribe((bufferedMessages: WrappedMessage[]) => { this.console.log({ message: `${bufferedMessages.length > 0 ? `${bufferedMessages.length} buffered messages` : `No buffered messages at the moment`} ` }) // console.log(`Released buffered message: ${bufferedMessages.length} total messages. To Be sorted.`) this.arrayToBeTransmitted.next(sortMessageBasedOnDate(bufferedMessages)) // this.arrayToBeTransmitted.next((this.sortMessage && bufferedMessages.length > 0) ? sortMessageBasedOnDate(bufferedMessages) : bufferedMessages) }); } private wrapMessageWithTimeReceived(message: any, previousMessageID: string | null): WrappedMessage { // check if message has already a time received property if so no need to add anymore if (!message.timeReceived) { let WrappedMessage: WrappedMessage = { timeReceived: new Date(), payload: message, thisMessageID: uuidV4(), previousMessageID: previousMessageID } // console.log(`Current`, WrappedMessage.thisMessageID, 'Previous for this message:', WrappedMessage.previousMessageID) this.currentMessageId = WrappedMessage.thisMessageID as string // console.log(`Updating: `, this.currentMessageId) return WrappedMessage } else { return message as WrappedMessage } } private startBufferTransmisionProcess() { this.console.log({ message: `StartBufferTransmissionProcess` }) this.arrayToBeTransmitted.subscribe(array => { if (array.length > 0) { this.transmissionState.next('TRANSMITTING') from(array).subscribe({ next: (message: WrappedMessage) => { if (this.receiverConnectionState.getValue() == 'OFFLINE') { // buffer this message. Flush it back to buffer this.wrappedMessageToBeBuffered.next(message) } if (this.receiverConnectionState.getValue() == 'ONLINE') { this.messageToBeTransmitted.next(message) } }, error: err => console.error(err), complete: () => { // update transmission state to indicate this batch is completed this.transmissionState.next('ARRAY EMPTY'); if (this.receiverConnectionState.getValue() === 'ONLINE' && this.transmissionState.getValue() === 'ARRAY EMPTY') { setTimeout(() => { this.bufferReleaseSignal.next() }, 1000) } // Do nothing if the receiver connection is offline } }); } else { // If I don't do setTimeout, then bufferrelasesignal will be overloaded if (this.receiverConnectionState.getValue() === 'ONLINE') { setTimeout(() => { this.bufferReleaseSignal.next() }, 3000) } } } ) } private linkEventListenerToBufferSignal() { this.receiverConnectionState.pipe( distinctUntilChanged() ).subscribe(clientState => { this.console.log({ message: `Client is now ${clientState}. ${(clientState === 'OFFLINE') ? 'Buffering Mode Active...' : 'Releasing Buffered Messages...'}` }) if (clientState == 'OFFLINE') { this.console.log({ message: `Current transmission state: ${this.transmissionState.getValue()}` }) // just keep buffering } if (clientState == 'ONLINE') { this.console.log({ message: `Current transmission state: ${this.transmissionState.getValue()}` }) // get the stored messages to pump it back into the buffer to be ready to be processed immediately if (this.transmissionState.getValue() == 'ARRAY EMPTY') { this.bufferReleaseSignal.next() } } }) } } type ConnectionState = 'ONLINE' | 'OFFLINE' type TransmissionState = 'TRANSMITTING' | 'IDLE' | 'ARRAY EMPTY' | 'STORING DATA' | 'GETTING STORED DATA'