import { filter, Observable, Observer, Subject, Subscription } from 'rxjs'; import { TransportEvent, TransportMessage } from '../interface/connector.interface'; import { MessageTransmissionBase } from './msg.transmission.base'; import { Bus, MessageReceiver as MessageReceiverInterface, ReceiverProfile } from '../interface/transport.interface' import { v4 as uuidv4 } from 'uuid' import { ReceiverAdapter } from '../connector/adapter.receiver'; import { checkMessage, WrappedMessage } from '../utils/message.ordering'; import ConsoleLogger from '../utils/log.utils'; import { Adapter } from '../connector/adapter.base'; export class MessageTransmissionReceiver extends MessageTransmissionBase implements MessageReceiverInterface { private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionReceiver`, ['transmission']) private onHoldMessage: Subject = new Subject() // private toBePassedOver: Subject = new Subject() receiverProfile!: ReceiverProfile; constructor(profile: ReceiverProfile, adapter: ReceiverAdapter, event: Observable) { super() this.event = event this.console.log({ message: `Constructing Receiver Transmission with ${profile.name}` }) this.setReceiver(profile) this.setUpAdapter(adapter) } setReceiver(receiverProfile: ReceiverProfile): void { this.receiverProfile = receiverProfile } getMessageBus(bus: Bus): Observable { this.console.log({ message: `Transmission getting message bus for ${this.receiverProfile.id}` }) return new Observable((observable: Observer) => { // logic here if (bus == Bus.GeneralBus) { // Need to merge all the adapters into one when the time comes // SAMPLE: This adapterArray.forEach(adapter => { ... }) const subscription: Subscription = (this.mainAdapter as ReceiverAdapter).getMessageBus(Bus.GeneralBus).pipe( filter((event: TransportEvent) => event.event == 'New Message'), ).subscribe((event: TransportEvent) => { // console.log(event) // data is transportMessage instead of eventmessage this.onHoldMessage.next(((event.data as TransportMessage).payload as WrappedMessage)) checkMessage(((event.data as TransportMessage).payload as WrappedMessage), this.onHoldMessage).then(() => { // only release the message before it exists this.console.log({ message: `This one passes. Does have previousID. Case for message ordering` }) // console.log(((event.data as TransportMessage).payload as WrappedMessage)) observable.next(event); }).catch((error) => { this.console.log({ message: `Observer Error`, details: error }) }) }); // Clean up on unsubscription return () => { subscription.unsubscribe(); }; } }) } setUpAdapter(adapter: Adapter): void { this.mainAdapter = adapter } }