123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
import { BehaviorSubject, buffer, distinct, distinctUntilChanged, from, Observable, Subject, takeWhile } from "rxjs";
import { v4 as uuidV4 } from 'uuid';
import { sortMessageBasedOnDate, WrappedMessage } from "./message.ordering";
import ConsoleLogger from "./log.utils";
/**
 * Buffers outgoing messages and forwards them only while the receiver is ONLINE.
 *
 * Data flow (all subjects are private to this class):
 *   payload -> toBeWrapped -> wrappedMessageToBeBuffered -> buffer(bufferReleaseSignal)
 *           -> arrayToBeTransmitted -> messageToBeTransmitted (exposed via
 *              returnSubjectForBufferedItems) or back into the buffer when not ONLINE.
 *
 * Every call to implementRetransmission() subscribes to the internal subjects again,
 * and none of the subscriptions are ever torn down, so call it once per instance.
 */
export class RetransmissionService {
  /** Delay before the buffer is released again after a non-empty batch was processed. */
  private static readonly BATCH_RELEASE_DELAY_MS = 1000
  /** Delay before the buffer is polled again after an empty batch was released. */
  private static readonly IDLE_RELEASE_DELAY_MS = 3000

  private console: ConsoleLogger = new ConsoleLogger(`RetransmissionService`, ['retransmission'])
  // ID of the most recently wrapped message; null until the first message is wrapped.
  private currentMessageId: string | null = null
  private sortMessage: boolean = false
  private bufferReleaseSignal: Subject<void> = new Subject()
  private receiverConnectionState: BehaviorSubject<ConnectionState> = new BehaviorSubject<ConnectionState>('OFFLINE')
  private transmissionState: BehaviorSubject<TransmissionState> = new BehaviorSubject<TransmissionState>('ARRAY EMPTY')
  private arrayToBeTransmitted: Subject<WrappedMessage[]> = new Subject()
  private toBeWrapped: Subject<any> = new Subject()
  private wrappedMessageToBeBuffered: Subject<WrappedMessage> = new Subject()
  private messageToBeTransmitted: Subject<WrappedMessage> = new Subject()
  // Handle of the single pending delayed buffer release, if any.
  private pendingReleaseTimer: ReturnType<typeof setTimeout> | undefined

  // Interface
  /**
   * Wires the payload stream and the connection-state stream into the buffering pipeline.
   *
   * @param payloadToBeTransmitted Stream of messages to wrap, buffer and forward.
   * @param eventListener Stream of receiver connection states; consecutive duplicates are ignored.
   * @param wantMessageOrdering When truthy, sets the `sortMessage` flag and logs it.
   *   NOTE: released batches are currently passed through sortMessageBasedOnDate
   *   regardless of this flag.
   */
  public implementRetransmission(payloadToBeTransmitted: Observable<any>, eventListener: Observable<ConnectionState>, wantMessageOrdering?: boolean): void {
    if (wantMessageOrdering) {
      this.sortMessage = true
      this.console.log({ message: `Message ordering is set to ${this.sortMessage}` })
    }
    eventListener.pipe(distinctUntilChanged()).subscribe(event => this.receiverConnectionState.next(event))

    // Internal pipeline must be subscribed before the payload source starts emitting.
    this.startWrappingOperation()
    this.startBufferTransmisionProcess()
    this.linkEventListenerToBufferSignal()

    payloadToBeTransmitted.subscribe((message) => {
      this.toBeWrapped.next(message)
    })
  }

  /** Returns the stream of messages that were released while the receiver was ONLINE. */
  public returnSubjectForBufferedItems(): Observable<WrappedMessage> {
    return this.messageToBeTransmitted.asObservable()
  }

  /** Wraps each incoming payload and collects wrapped messages until the next release signal. */
  private startWrappingOperation(): void {
    this.toBeWrapped.subscribe(message => {
      this.wrappedMessageToBeBuffered.next(this.wrapMessageWithTimeReceived(message, this.currentMessageId))
    })

    // Everything pushed to wrappedMessageToBeBuffered is held until bufferReleaseSignal emits.
    this.wrappedMessageToBeBuffered.pipe(buffer(this.bufferReleaseSignal)).subscribe((bufferedMessages: WrappedMessage[]) => {
      this.console.log({ message: `${bufferedMessages.length > 0 ? `${bufferedMessages.length} buffered messages` : `No buffered messages at the moment`} ` })
      // Sorting is applied to every released batch (the sortMessage flag is not consulted here).
      this.arrayToBeTransmitted.next(sortMessageBasedOnDate(bufferedMessages))
    })
  }

  /**
   * Wraps a raw payload with a receive timestamp, a fresh ID and the previous message's ID.
   * A value that already carries a truthy `timeReceived` is returned unchanged and does not
   * advance `currentMessageId`.
   *
   * @param message Raw payload, or an already wrapped message being re-buffered.
   * @param previousMessageID ID of the message wrapped before this one, or null.
   * @returns The wrapped message.
   */
  private wrapMessageWithTimeReceived(message: any, previousMessageID: string | null): WrappedMessage {
    // Optional chaining so a null/undefined payload is wrapped instead of throwing.
    if (message?.timeReceived) {
      return message as WrappedMessage
    }
    const wrapped: WrappedMessage = {
      timeReceived: new Date(),
      payload: message,
      thisMessageID: uuidV4(),
      previousMessageID: previousMessageID
    }
    this.currentMessageId = wrapped.thisMessageID as string
    return wrapped
  }

  /**
   * Processes every released batch: forwards each message when the receiver is ONLINE,
   * otherwise pushes it back into the buffer, then schedules the next release.
   */
  private startBufferTransmisionProcess(): void {
    this.console.log({ message: `StartBufferTransmissionProcess` })
    this.arrayToBeTransmitted.subscribe(array => {
      if (array.length === 0) {
        // Nothing buffered: poll again later. Releasing without a delay would loop endlessly.
        if (this.receiverConnectionState.getValue() === 'ONLINE') {
          this.scheduleBufferRelease(RetransmissionService.IDLE_RELEASE_DELAY_MS)
        }
        return
      }

      this.transmissionState.next('TRANSMITTING')
      from(array).subscribe({
        next: (message: WrappedMessage) => {
          if (this.receiverConnectionState.getValue() === 'ONLINE') {
            this.messageToBeTransmitted.next(message)
          } else {
            // Not ONLINE: flush the message back into the buffer so it is never dropped.
            this.wrappedMessageToBeBuffered.next(message)
          }
        },
        error: err => console.error(err),
        complete: () => {
          // This batch is done; allow the next release.
          this.transmissionState.next('ARRAY EMPTY')
          if (this.receiverConnectionState.getValue() === 'ONLINE') {
            this.scheduleBufferRelease(RetransmissionService.BATCH_RELEASE_DELAY_MS)
          }
          // When not ONLINE the next release is triggered by the ONLINE transition.
        }
      })
    })
  }

  /**
   * Schedules one delayed buffer release, replacing any release that is still pending.
   * Keeping a single timer prevents parallel polling chains from piling up when the
   * receiver drops and reconnects while a timer is outstanding.
   *
   * @param delayMs Milliseconds to wait before emitting the release signal.
   */
  private scheduleBufferRelease(delayMs: number): void {
    if (this.pendingReleaseTimer !== undefined) {
      clearTimeout(this.pendingReleaseTimer)
    }
    this.pendingReleaseTimer = setTimeout(() => {
      this.pendingReleaseTimer = undefined
      // The receiver may have gone away while waiting; the ONLINE transition releases later.
      if (this.receiverConnectionState.getValue() === 'ONLINE') {
        this.bufferReleaseSignal.next()
      }
    }, delayMs)
  }

  /** Releases the buffer immediately when the receiver turns ONLINE and no batch is in flight. */
  private linkEventListenerToBufferSignal(): void {
    this.receiverConnectionState.pipe(
      distinctUntilChanged()
    ).subscribe(clientState => {
      this.console.log({ message: `Client is now ${clientState}. ${(clientState === 'OFFLINE') ? 'Buffering Mode Active...' : 'Releasing Buffered Messages...'}` })
      if (clientState === 'OFFLINE') {
        this.console.log({ message: `Current transmission state: ${this.transmissionState.getValue()}` })
        // Just keep buffering.
      }
      if (clientState === 'ONLINE') {
        this.console.log({ message: `Current transmission state: ${this.transmissionState.getValue()}` })
        if (this.transmissionState.getValue() === 'ARRAY EMPTY') {
          this.bufferReleaseSignal.next()
        }
      }
    })
  }
}
/** Receiver reachability as tracked by RetransmissionService. */
type ConnectionState =
  | 'ONLINE'
  | 'OFFLINE'

/**
 * Progress of the batch currently being processed by RetransmissionService.
 * The class code visible in this file only ever sets 'TRANSMITTING' and 'ARRAY EMPTY'.
 */
type TransmissionState =
  | 'TRANSMITTING'
  | 'IDLE'
  | 'ARRAY EMPTY'
  | 'STORING DATA'
  | 'GETTING STORED DATA'
|