/*
 * This transmission component is a special case: it emulates a conventional
 * request/response call, although the underlying mechanism is still message
 * based. It takes already-instantiated transmitter and receiver components,
 * emits the request through the transmitter, and filters the receiver's
 * stream down to the messages that carry the identifier of the original
 * request.
 */
import { MessageTransmissionBase } from "../base/msg.transmission.base";
import { filter, Observable, Observer, Subscription, Unsubscribable } from "rxjs";
import { FisMessage, MessageReceiverInterface, MessageRequestResponseInterface, MessageTransmitterInterface, TransmissionConfig, TransmissionProfile } from "../interface/interface";
import ConsoleLogger from "../utils/log.utils";
import { checkRxType } from "../utils/general.utils";

/**
 * Request/response facade built on top of an existing transmitter/receiver pair.
 */
export class MessageTransmissionRequestResponse implements MessageRequestResponseInterface {
    private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionRequestResponse`, ['transmission'])
    // NOTE(review): nothing in this class ever assigns the profile, so it is
    // modelled as optional instead of being asserted as always present.
    private transmissionProfile?: TransmissionProfile
    protected transmitterInstance: MessageTransmitterInterface;
    protected receiverInstance: MessageReceiverInterface;

    /**
     * @param config Transmission configuration; only `config.target` is read here (for logging).
     * @param transmitterInstance Transmitter used to emit outgoing requests.
     * @param receiverInstance Receiver whose stream is filtered for matching responses.
     */
    constructor(config: TransmissionConfig, transmitterInstance: MessageTransmitterInterface, receiverInstance: MessageReceiverInterface) {
        this.console.log({ message: `Constructing Request Response Transmission for Receiving target: ${config.target}` })
        this.transmitterInstance = transmitterInstance
        this.receiverInstance = receiverInstance
    }

    /**
     * Returns the whole transmission profile, or one of its fields when `type` is given.
     *
     * @param type Optional field selector (`config`, `id` or `role`).
     * @returns The requested value, or `undefined` when no profile has been set.
     */
    getTransmissionProfile(type?: `config` | `id` | `role`): TransmissionProfile | TransmissionConfig | string | undefined {
        // Optional chaining: the profile may be unset, and indexing `undefined` would throw.
        return type ? this.transmissionProfile?.[type] : this.transmissionProfile;
    }

    /**
     * Sends every message produced by the given Observable as a request and logs each response.
     *
     * WARNING (from the original author): this is still broken at this point in time. DO NOT USE!
     *
     * @param observer Source of outgoing messages. Only values that `checkRxType`
     *                 reports as `Observable` are handled.
     * @returns A handle that cancels the source subscription and every in-flight
     *          request, or `null` when the argument is not reported as an Observable.
     */
    subscribe(observer: Observer<FisMessage> | Observable<FisMessage>): Unsubscribable | null {
        if (checkRxType(observer) !== `Observable`) {
            return null;
        }

        // Parent subscription: the source subscription and every per-request
        // subscription are attached to it so one unsubscribe() releases them all.
        const subscription = new Subscription();
        subscription.add(
            (observer as Observable<FisMessage>).subscribe((message: FisMessage) => {
                subscription.add(
                    this.send(message).subscribe({
                        next: (response: FisMessage) => {
                            // Log the ID: interpolating the object itself prints "[object Object]".
                            this.console.log({ message: `Response received: ${response.header.messageID}` });
                        },
                        error: (error: unknown) => {
                            this.console.log({ message: `Request ${message.header.messageID} failed: ${String(error)}` });
                        },
                    })
                );
            })
        );
        return subscription;
    }

    /**
     * Emits `message` through the transmitter and streams back every received
     * message whose `header.messageID` equals the request's.
     *
     * The returned Observable is cold: nothing is emitted until it is subscribed.
     * It completes when a matching message with `data === 'Complete'` arrives or
     * when the receiver stream completes, and errors when the receiver stream
     * errors, when emitting throws, or when either instance is missing.
     *
     * @param message Request to emit; its `header.messageID` correlates the responses.
     * @returns Observable of the matching response messages.
     */
    send(message: FisMessage): Observable<FisMessage> {
        return new Observable<FisMessage>((response: Observer<FisMessage>) => {
            if (!this.transmitterInstance || !this.receiverInstance) {
                response.error(new Error('Transmitter or receiver instance is missing.'));
                return;
            }

            // Listen BEFORE emitting so a reply delivered synchronously is not missed.
            const subscription: Subscription = this.receiverInstance.getReceivables().pipe(
                filter(event => event.header.messageID === message.header.messageID),
            ).subscribe({
                next: (reply: FisMessage) => {
                    if (reply.data === 'Complete') {
                        response.complete()
                    } else {
                        response.next(reply)
                    }
                },
                // Forward failure and completion so the caller is never left waiting.
                error: (error: unknown) => response.error(error),
                complete: () => response.complete(),
            })

            try {
                this.transmitterInstance.emit(message)
            } catch (error) {
                // Do not leave the receiver subscription dangling if emitting fails.
                subscription.unsubscribe();
                response.error(error);
                return;
            }

            // Clean up on unsubscription
            return () => {
                subscription.unsubscribe();
            }
        });
    }
}