msg.transmission.request-response.ts 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. import { MessageTransmissionBase } from "../base/msg.transmission.base";
  2. import { filter, map, Observable, Observer, Subject, Subscription, takeWhile } from "rxjs";
  3. import { v4 as uuidv4 } from 'uuid'
  4. import { MessageTransmissionReceiver } from "./msg.transmission.receiver";
  5. import { MessageTransmissionTransmitter } from "./msg.transmission.transmitter";
  6. import { AdapterInterface, FisMessage, GeneralEvent, MessageRequestResponseInterface, TransportMessage } from "../interface/interface";
  7. import { WrappedMessage } from "../utils/message.ordering";
  8. export class MessageTransmissionRequestResponse extends MessageTransmissionBase implements MessageRequestResponseInterface {
  9. transmitterInstance!: MessageTransmissionTransmitter;
  10. receiverInstance!: MessageTransmissionReceiver;
  11. constructor(clientId: string, transmitterInstance: MessageTransmissionTransmitter, receiverInstance: MessageTransmissionReceiver, event: Subject<GeneralEvent<any>>) {
  12. super()
  13. this.clientId = clientId
  14. this.transmitterInstance = transmitterInstance
  15. this.receiverInstance = receiverInstance
  16. this.event = event
  17. }
  18. send(message: FisMessage): Observable<FisMessage> {
  19. return new Observable((response: Observer<FisMessage>) => {
  20. // logic here
  21. if (this.transmitterInstance && this.receiverInstance) {
  22. this.transmitterInstance.emit(message)
  23. const subscription: Subscription = this.receiverInstance.getIncoming().pipe(
  24. filter(event => event.event === `New Message`),
  25. filter(event => (((event.data as TransportMessage)?.payload as WrappedMessage)?.payload as FisMessage)?.header.messageID === message.header.messageID),
  26. map(event => {
  27. return (event.data as TransportMessage).payload as FisMessage
  28. })
  29. ).subscribe({
  30. next: (message: FisMessage) => {
  31. if (message.data == 'Complete') {
  32. response.complete()
  33. } else {
  34. response.next(message)
  35. }
  36. },
  37. error: error => console.error(error)
  38. })
  39. // Clean up on unsubscription
  40. return () => {
  41. subscription.unsubscribe();
  42. }
  43. } else {
  44. response.error(new Error('Transmitter or receiver instance is missing.'));
  45. return;
  46. }
  47. });
  48. }
  49. }