msg.transmission.request-response.ts 3.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. /* This is a more specialized transmission component: it emulates a conventional request-response call, while the underlying
  2. mechanism is still message-based. As you will see, it takes the already-instantiated transmitter and receiver components
  3. and filters the received messages by the identifier of the original request. */
  4. import { MessageTransmissionBase } from "../base/msg.transmission.base";
  5. import { filter, Observable, Observer, Subscription, Unsubscribable } from "rxjs";
  6. import { FisMessage, MessageReceiverInterface, MessageRequestResponseInterface, MessageTransmitterInterface, TransmissionConfig, TransmissionProfile } from "../interface/interface";
  7. import ConsoleLogger from "../utils/log.utils";
  8. import { checkRxType } from "../utils/general.utils";
  /**
   * Request-response facade built on top of an existing transmitter/receiver pair.
   *
   * A request is emitted through the transmitter, and the receiver's stream is
   * filtered down to the messages whose `header.messageID` equals the request's
   * `header.messageID`. A matching message whose `data` is the string
   * `'Complete'` ends the response stream.
   */
  export class MessageTransmissionRequestResponse<T> implements MessageRequestResponseInterface<T> {
      private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionRequestResponse`, ['transmission'])
      // Nothing in this class assigns the profile, so it is modelled as optional
      // instead of being asserted as always present.
      private transmissionProfile?: TransmissionProfile
      protected transmitterInstance!: MessageTransmitterInterface<any>;
      protected receiverInstance!: MessageReceiverInterface<any>;

      /**
       * @param config Transmission configuration; only `config.target` is read here, for logging.
       * @param transmitterInstance Transmitter used to emit outgoing requests.
       * @param receiverInstance Receiver whose receivables are searched for matching responses.
       */
      constructor(config: TransmissionConfig, transmitterInstance: MessageTransmitterInterface<any>, receiverInstance: MessageReceiverInterface<any>) {
          this.console.log({ message: `Constructing Request Response Transmission for Receiving target: ${config.target}` })
          this.transmitterInstance = transmitterInstance
          this.receiverInstance = receiverInstance
      }

      /**
       * Returns the transmission profile, or one of its fields.
       *
       * @param type Optional field selector (`config`, `id` or `role`).
       * @returns The selected field, the whole profile when `type` is omitted, or
       * `undefined` when no profile has been set (instead of throwing).
       */
      getTransmissionProfile(type?: `config` | `id` | `role`): TransmissionProfile | TransmissionConfig | string | undefined {
          return type ? this.transmissionProfile?.[type] : this.transmissionProfile;
      }

      /**
       * Sends every message produced by the given Observable as a request and logs each response.
       *
       * NOTE: the original author flagged this method as still broken ("DO NOT USE").
       * Only the Observable input is handled; a plain Observer yields `null`.
       *
       * @param observer Source of request messages (Observable), or an Observer (unsupported).
       * @returns A handle that tears down the source subscription and every in-flight
       * request, or `null` when `observer` is not an Observable.
       */
      subscribe(observer: Observer<T> | Observable<T>): Unsubscribable | null {
          if (checkRxType(observer) !== `Observable`) {
              return null;
          }

          // Composite subscription: created up front so that requests triggered by a
          // synchronously-emitting source are tracked as well.
          const subscription = new Subscription();
          subscription.add(
              (observer as Observable<any>).subscribe(message => {
                  // Track the per-request subscription so unsubscribing the returned
                  // handle also cancels requests that are still waiting for replies.
                  subscription.add(
                      this.send(message).subscribe({
                          next: response => {
                              // Serialize explicitly; interpolating an object prints "[object Object]".
                              this.console.log({ message: `Response received: ${JSON.stringify(response)}` });
                          },
                          // send() reports failures through the error channel; handle them
                          // here so they do not surface as unhandled errors.
                          error: (error: unknown) => {
                              this.console.log({ message: `Request failed: ${error instanceof Error ? error.message : String(error)}` });
                          }
                      })
                  );
              })
          );
          return subscription; // Return the Subscription (Unsubscribable)
      }

      /**
       * Emits `message` and streams back the received messages that carry the same `header.messageID`.
       *
       * The returned Observable is cold: nothing is emitted until it is subscribed.
       *
       * @param message Request to emit through the transmitter.
       * @returns Observable of matching responses. It completes when a matching message
       * with `data === 'Complete'` arrives or the receiver stream completes, and errors
       * when either instance is missing, the receiver stream errors, or emitting throws.
       */
      send(message: FisMessage): Observable<FisMessage> {
          return new Observable((response: Observer<FisMessage>) => {
              if (!this.transmitterInstance || !this.receiverInstance) {
                  response.error(new Error('Transmitter or receiver instance is missing.'));
                  return;
              }

              const requestId = message.header.messageID;

              // Listen BEFORE emitting so a reply delivered synchronously (or very
              // quickly) by the transport cannot be missed.
              const subscription: Subscription = this.receiverInstance.getReceivables().pipe(
                  filter(event => event.header.messageID === requestId),
              ).subscribe({
                  next: (reply: FisMessage) => {
                      if (reply.data === 'Complete') {
                          response.complete()
                      } else {
                          response.next(reply)
                      }
                  },
                  // Propagate receiver failures/termination to the caller instead of
                  // leaving the response stream hanging forever.
                  error: (error: unknown) => response.error(error),
                  complete: () => response.complete()
              })

              try {
                  this.transmitterInstance.emit(message)
              } catch (error) {
                  // Do not leave the receiver subscription dangling if the request could not be sent.
                  subscription.unsubscribe();
                  response.error(error);
                  return;
              }

              // Clean up on unsubscription
              return () => {
                  subscription.unsubscribe();
              }
          });
      }
  }