1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253 |
- import dotenv from 'dotenv';
- import { Bus, FisMessage } from "../interface/transport.interface";
- import { RequestResponseAdapter as RequestResponseAdapterInterface, TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
- import { filter, map, Observable, Observer, Subscription, takeWhile } from 'rxjs';
- import { WrappedMessage } from '../utils/message.ordering';
- import { Adapter } from './adapter.base';
- import { ReceiverAdapter } from './adapter.receiver';
- import { TransmitterAdapter } from './adapter.transmitter';
// Run dotenv's configuration step once, as a side effect of importing this module.
- dotenv.config();
- /* This transport manager will instantiate the necessary transports to deal with transmission to, and receiving from, different receivers.
- So how?: */
/**
 * Adapter that pairs a transmitter adapter with a receiver adapter to offer
 * request/response semantics: a message is emitted through the transmitter,
 * and the replies addressed to this adapter are streamed back from the
 * receiver's general message bus.
 */
export class RequestResponseAdapter extends Adapter implements RequestResponseAdapterInterface {
  private readonly transmitterAdapter: TransmitterAdapter;
  private readonly receiverAdapter: ReceiverAdapter;

  /**
   * @param transmitterAdapter Adapter used to emit outgoing messages.
   * @param receiverAdapter Adapter whose general bus is observed for replies.
   */
  constructor(transmitterAdapter: TransmitterAdapter, receiverAdapter: ReceiverAdapter) {
    super();
    this.transmitterAdapter = transmitterAdapter;
    this.receiverAdapter = receiverAdapter;
  }

  /**
   * Emits `message` through the transmitter adapter and streams back the
   * payload of every 'New Message' event on the receiver's general bus whose
   * target matches this adapter's profile id.
   *
   * The returned observable is cold: nothing is sent until it is subscribed.
   * It completes when a payload whose `data` is 'Complete' arrives (that
   * payload itself is not delivered) or when the underlying bus completes.
   * Errors from the bus, or thrown synchronously by the transmitter, are
   * forwarded to the subscriber.
   *
   * @param message Wrapped message to transmit.
   * @returns Observable of the reply payloads addressed to this adapter.
   */
  send(message: WrappedMessage): Observable<FisMessage> {
    return new Observable((response: Observer<FisMessage>) => {
      // Start listening BEFORE emitting, so a reply that arrives synchronously
      // (or very quickly on a hot bus) cannot be missed.
      const subscription: Subscription = this.receiverAdapter.getMessageBus(Bus.GeneralBus).pipe(
        filter((event: TransportEvent) => event.event === 'New Message'),
        // Keep only the messages addressed to this adapter.
        filter((event: TransportEvent) => (event.data as TransportMessage).target === this.adapterProfile.id),
        map((event: TransportEvent) => (event.data as TransportMessage).payload as FisMessage),
        // The 'Complete' marker ends the stream and is not forwarded.
        takeWhile((payload: FisMessage) => payload.data !== 'Complete'),
      ).subscribe({
        next: (payload: FisMessage) => response.next(payload),
        error: (err: unknown) => response.error(err),
        complete: () => response.complete(),
      });

      try {
        this.transmitterAdapter.emit(message);
      } catch (err: unknown) {
        // Do not leave the bus subscription dangling if the send itself fails.
        subscription.unsubscribe();
        response.error(err);
      }

      // Clean up on unsubscription.
      return () => {
        subscription.unsubscribe();
      };
    });
  }
}
|