msg.transmission.request-response.ts 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. /* This is more unique transmission component, because it's supposed to emulate a conventional request response call. But the underlying
  2. mechanism is still using message. Here as you will see, it is basically taking the already instantiated transmitter and receiver components
  3. and basically just filtering the responses based on whatever identifier it needs for the orignal request. */
  4. import { MessageTransmissionBase } from "../base/msg.transmission.base";
  5. import { filter, Observable, Observer, Subscription } from "rxjs";
  6. import { FisMessage, MessageReceiverInterface, MessageRequestResponseInterface, MessageTransmitterInterface, TransmissionProfile } from "../interface/interface";
  7. import ConsoleLogger from "../utils/log.utils";
  8. export class MessageTransmissionRequestResponse extends MessageTransmissionBase implements MessageRequestResponseInterface {
  9. private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionRequestResponse`, ['transmission'])
  10. protected transmitterInstance!: MessageTransmitterInterface;
  11. protected receiverInstance!: MessageReceiverInterface;
  12. constructor(profile: TransmissionProfile, transmitterInstance: MessageTransmitterInterface, receiverInstance: MessageReceiverInterface) {
  13. super()
  14. this.profile = profile
  15. this.console.log({ message: `Constructing Request Response Transmission for Receiving target: ${this.profile.target}` })
  16. this.transmitterInstance = transmitterInstance
  17. this.receiverInstance = receiverInstance
  18. }
  19. send(message: FisMessage): Observable<FisMessage> {
  20. return new Observable((response: Observer<FisMessage>) => {
  21. // logic here
  22. if (this.transmitterInstance && this.receiverInstance) {
  23. this.transmitterInstance.emit(message)
  24. const subscription: Subscription = this.receiverInstance.getReceivables().pipe(
  25. filter(event => event.header.messageID === message.header.messageID),
  26. ).subscribe({
  27. next: (message: FisMessage) => {
  28. if (message.data == 'Complete') {
  29. response.complete()
  30. } else {
  31. response.next(message)
  32. }
  33. },
  34. error: error => console.error(error)
  35. })
  36. // Clean up on unsubscription
  37. return () => {
  38. subscription.unsubscribe();
  39. }
  40. } else {
  41. response.error(new Error('Transmitter or receiver instance is missing.'));
  42. return;
  43. }
  44. });
  45. }
  46. }