import { filter, map, Observable, Observer, Subject, Subscription } from 'rxjs'; import { v4 as uuidv4 } from 'uuid' import { ReceiverAdapter } from '../adapters/adapter.receiver'; import { checkMessage, WrappedMessage } from '../utils/message.ordering'; import ConsoleLogger from '../utils/log.utils'; import { MessageTransmissionBase } from '../base/msg.transmission.base'; import { AdapterInterface, GeneralEvent, MessageReceiverInterface, ReceiverAdapterInterface, TransportMessage } from '../interface/interface'; import { AdapterBase } from '../base/adapter.base'; import { IncomingMessage } from 'http'; export class MessageTransmissionReceiver extends MessageTransmissionBase implements MessageReceiverInterface { private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionReceiver`, ['transmission']) private onHoldMessage: Subject = new Subject() private currentAdapter!: ReceiverAdapter private incomingMessage: Subject> = new Subject() // private toBePassedOver: Subject = new Subject() constructor(clientId: string, event: Subject>) { super() this.clientId = clientId this.event = event this.handleAdapterEvent(this.event.asObservable()) } getIncoming(): Observable> { this.console.log({ message: `Transmission getting message bus for ${this.clientId}` }) return new Observable((observable: Observer>) => { const subscription: Subscription = this.incomingMessage.pipe( filter((event: GeneralEvent) => event.event == 'New Message'), ).subscribe((event: GeneralEvent) => { // console.log(event) // data is transportMessage instead of eventmessage this.onHoldMessage.next(((event.data as TransportMessage).payload as WrappedMessage)) checkMessage(((event.data as TransportMessage).payload as WrappedMessage), this.onHoldMessage).then(() => { // only release the message before it exists this.console.log({ message: `This one passes. Does have previousID. Case for message ordering` }) // console.log(((event.data as TransportMessage).payload as WrappedMessage)) observable.next(event); }).catch((error) => { this.console.log({ message: `Observer Error`, details: error }) }) }) // Clean up on unsubscription return () => { subscription.unsubscribe(); }; }) } private handleAdapterEvent(adapterEvent: Observable>): void { const subscription: Subscription = adapterEvent.pipe( filter(event => event.type === `Adapter Event`), filter(event => event.event === `New Adapter`), map(event => { return event.data }), filter((adapter: AdapterInterface) => adapter.role === `Receiver`), map(adapter => { return adapter as ReceiverAdapter }) ).subscribe((adapter: ReceiverAdapter) => { if (!this.adapters.some(adapterObj => adapterObj.adapterId === adapter.adapterId)) { this.adapters.push(adapter) this.currentAdapter = adapter this.currentAdapter.subscribeForIncoming().subscribe(this.incomingMessage) } else { this.console.error({ message: `Adapter ID: ${adapter.adapterId} already existed.` }) } }) } }