import dotenv from 'dotenv'; import { Bus, FisMessage } from "../interface/transport.interface"; import { RequestResponseAdapter as RequestResponseAdapterInterface, TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface"; import { filter, map, Observable, Observer, Subscription, takeWhile } from 'rxjs'; import { WrappedMessage } from '../utils/message.ordering'; import { Adapter } from './adapter.base'; import { ReceiverAdapter } from './adapter.receiver'; import { TransmitterAdapter } from './adapter.transmitter'; dotenv.config(); /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers So how?: */ export class RequestResponseAdapter extends Adapter implements RequestResponseAdapterInterface { private transmitterAdapter!: TransmitterAdapter private receiverAdapter!: ReceiverAdapter constructor( transmitterAdapter: TransmitterAdapter, receiverAdapter: ReceiverAdapter) { super() // logic here this.transmitterAdapter = transmitterAdapter this.receiverAdapter = receiverAdapter } // Make use of the adapters ref passed in send(message: WrappedMessage): Observable { return new Observable((response: Observer) => { // logic here this.transmitterAdapter.emit(message) const subscription: Subscription = this.receiverAdapter.getMessageBus(Bus.GeneralBus).pipe( filter((message: TransportEvent) => message.event === 'New Message'), // take message only specific for this adapter. Although that itself wouldn't be necessary, considerng everything goes through transportEvent. I guess it's for better management filter((message: TransportEvent) => (message.data as TransportMessage).target == this.adapterProfile.id), takeWhile((message: TransportEvent) => { const shouldTake = ((message.data as TransportMessage).payload as FisMessage).data !== 'Complete'; if (!shouldTake) { response.complete(); // Ensure the observer is completed } return shouldTake; }), map(message => (message.data as TransportMessage).payload as FisMessage) ).subscribe((message: FisMessage) => { response.next(message); }); // Clean up on unsubscription return () => { subscription.unsubscribe(); }; }) } }