adapter.request.response.ts 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. import dotenv from 'dotenv';
  2. import { Bus, FisMessage } from "../interface/transport.interface";
  3. import { RequestResponseAdapter as RequestResponseAdapterInterface, TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
  4. import { filter, map, Observable, Observer, Subscription, takeWhile } from 'rxjs';
  5. import { WrappedMessage } from '../utils/message.ordering';
  6. import { Adapter } from './adapter.base';
  7. import { ReceiverAdapter } from './adapter.receiver';
  8. import { TransmitterAdapter } from './adapter.transmitter';
  9. dotenv.config();
  10. /* This transport manager instantiates the necessary transports to handle transmission to, and reception from, different receivers.
  11. So how?: */
  /**
   * Adapter that pairs a transmitter with a receiver to provide a
   * request/response exchange: a message is emitted through the transmitter and
   * the matching replies are read back from the receiver's general message bus.
   */
  export class RequestResponseAdapter extends Adapter implements RequestResponseAdapterInterface {
    private readonly transmitterAdapter: TransmitterAdapter;
    private readonly receiverAdapter: ReceiverAdapter;

    /**
     * @param transmitterAdapter adapter used to emit the outgoing request
     * @param receiverAdapter adapter whose general message bus carries the replies
     */
    constructor(transmitterAdapter: TransmitterAdapter, receiverAdapter: ReceiverAdapter) {
      super();
      this.transmitterAdapter = transmitterAdapter;
      this.receiverAdapter = receiverAdapter;
    }

    /**
     * Emits `message` through the transmitter and streams back the replies.
     *
     * The returned observable is cold: nothing is emitted or listened to until
     * it is subscribed. Each subscription forwards the payload of every
     * 'New Message' event on the general bus whose target equals this adapter's
     * profile id, and completes when a payload whose `data` is 'Complete'
     * arrives (that terminating payload itself is not forwarded).
     *
     * NOTE(review): replies are matched on the adapter id only, not on a
     * per-request identifier, so overlapping `send` calls on the same adapter
     * would each observe all of these messages — confirm whether callers rely
     * on requests being serialized.
     *
     * @param message the wrapped request to transmit
     * @returns an observable of reply payloads for this adapter
     */
    send(message: WrappedMessage): Observable<FisMessage> {
      return new Observable((response: Observer<FisMessage>) => {
        // Start listening BEFORE transmitting so that a reply delivered
        // synchronously (or very quickly) cannot slip past the subscription.
        const subscription: Subscription = this.receiverAdapter.getMessageBus(Bus.GeneralBus).pipe(
          filter((event: TransportEvent) => event.event === 'New Message'),
          // Keep only the messages addressed to this adapter.
          filter((event: TransportEvent) => (event.data as TransportMessage).target === this.adapterProfile.id),
          map((event: TransportEvent) => (event.data as TransportMessage).payload as FisMessage),
          // 'Complete' is the end-of-response marker: stop (and complete) without forwarding it.
          takeWhile((payload: FisMessage) => payload.data !== 'Complete'),
        ).subscribe({
          next: (payload: FisMessage) => response.next(payload),
          // Surface bus/operator failures to the caller instead of dropping them.
          error: (err: unknown) => response.error(err),
          complete: () => response.complete(),
        });

        try {
          this.transmitterAdapter.emit(message);
        } catch (err: unknown) {
          // The request never went out: release the listener and report the failure.
          subscription.unsubscribe();
          response.error(err);
        }

        // Clean up on unsubscription
        return () => {
          subscription.unsubscribe();
        };
      });
    }
  }