import { MessageTransmissionBase } from "../base/msg.transmission.base";
import { filter, map, Observable, Observer, Subject, Subscription, takeWhile } from "rxjs";
import { v4 as uuidv4 } from 'uuid'
import { MessageTransmissionReceiver } from "./msg.transmission.receiver";
import { MessageTransmissionTransmitter } from "./msg.transmission.transmitter";
import { AdapterInterface, FisMessage, GeneralEvent, MessageRequestResponseInterface, TransportMessage } from "../interface/interface";
import { WrappedMessage } from "../utils/message.ordering";

/**
 * Request/response helper built on a transmitter and a receiver.
 *
 * `send` emits a message through the transmitter and exposes, as an
 * Observable, the incoming messages whose `header.messageID` equals the
 * request's `header.messageID`.
 */
export class MessageTransmissionRequestResponse extends MessageTransmissionBase implements MessageRequestResponseInterface {
    transmitterInstance!: MessageTransmissionTransmitter;
    receiverInstance!: MessageTransmissionReceiver;

    /**
     * @param clientId Identifier stored on this instance.
     * @param transmitterInstance Transmitter used to emit outgoing requests.
     * @param receiverInstance Receiver whose incoming stream is searched for replies.
     * @param event Event subject stored on this instance.
     */
    constructor(
        clientId: string,
        transmitterInstance: MessageTransmissionTransmitter,
        receiverInstance: MessageTransmissionReceiver,
        // NOTE(review): the type argument is reconstructed as GeneralEvent<any>;
        // confirm it against the `event` member declared on MessageTransmissionBase.
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        event: Subject<GeneralEvent<any>>
    ) {
        super()
        this.clientId = clientId
        this.transmitterInstance = transmitterInstance
        this.receiverInstance = receiverInstance
        this.event = event
    }

    /**
     * Sends `message` and streams back the replies correlated to it.
     *
     * The returned Observable is lazy: nothing is emitted or subscribed until
     * a consumer subscribes. It completes when a reply whose `data` is the
     * string `'Complete'` arrives, and errors if the transmitter/receiver is
     * missing or the incoming stream itself errors.
     *
     * @param message Request to transmit; its `header.messageID` is the correlation key.
     * @returns Observable of correlated reply messages.
     */
    send(message: FisMessage): Observable<FisMessage> {
        return new Observable<FisMessage>((response: Observer<FisMessage>) => {
            if (!this.transmitterInstance || !this.receiverInstance) {
                response.error(new Error('Transmitter or receiver instance is missing.'));
                return;
            }

            const requestId = message.header.messageID;

            // Single place that digs the reply out of an incoming event, so the
            // correlation check and the emitted value always refer to the same object.
            // NOTE(review): assumes TransportMessage.payload is a WrappedMessage whose
            // own payload is the FisMessage — confirm against the interface definitions.
            const unwrapReply = (data: unknown): FisMessage | undefined => {
                const transport = data as TransportMessage | undefined;
                const wrapped = transport?.payload as WrappedMessage | undefined;
                return wrapped?.payload as FisMessage | undefined;
            };

            // Subscribe BEFORE emitting so a reply that arrives synchronously
            // (or very quickly on a hot stream) cannot be missed.
            const subscription: Subscription = this.receiverInstance.getIncoming().pipe(
                filter(event => event.event === `New Message`),
                map(event => unwrapReply(event.data)),
                filter((reply): reply is FisMessage =>
                    reply !== undefined && reply.header?.messageID === requestId),
            ).subscribe({
                next: (reply: FisMessage) => {
                    if (reply.data === 'Complete') {
                        response.complete()
                    } else {
                        response.next(reply)
                    }
                },
                // Propagate upstream failures instead of only logging them;
                // otherwise the caller would wait forever for a reply.
                error: (err: unknown) => response.error(err)
            })

            try {
                this.transmitterInstance.emit(message)
            } catch (err: unknown) {
                // The teardown below is never returned on this path, so release
                // the reply subscription here before reporting the failure.
                subscription.unsubscribe();
                response.error(err);
                return;
            }

            // Clean up on unsubscription
            return () => {
                subscription.unsubscribe();
            }
        });
    }
}