// transmitter.ts
  1. import { filter, interval, map, Observable, Observer, Subject, take } from "rxjs";
  2. import { Bus, EventMessage, FisMessage, MessageTransmission, TransmissionMessage } from "../interface/transport.interface";
  3. import { v4 as uuidv4 } from 'uuid'
  4. import { MessageTransmissionManager } from "../transmission/msg.transmission.manager";
  5. import { error } from "console";
  6. import e from "express";
  7. import { TransportEvent } from "../interface/connector.interface";
  /* These are the purple-font components. They interact with the blue-font components to set up the credentials needed to establish the necessary roles.
  This assumes the primary role is server, which means we need a transmitter profile and multiple connected receiver profiles. */
  /**
   * Demo application built on top of the message transmission layer.
   *
   * On construction it obtains a transmission instance from a
   * `MessageTransmissionManager` and starts a feed that produces one message
   * per second and pushes it into the `generalNotification` subject.
   */
  class Application {
    /** Delay, in milliseconds, between two generated messages. */
    private static readonly MESSAGE_INTERVAL_MS = 1000
    /** Number of data responses produced for every message handled by `appProcess`. */
    private static readonly RESPONSE_COUNT = 10

    messageTransmissionManager: MessageTransmissionManager
    transmissionInstance: MessageTransmission
    /** Subject that re-publishes every generated notification to all of its subscribers. */
    generalNotification: Subject<FisMessage> = new Subject()

    constructor() {
      this.messageTransmissionManager = new MessageTransmissionManager()
      this.transmissionInstance = this.messageTransmissionManager.getTransmissionInstance()
      // The feed never completes, so this subscription lives as long as the instance.
      this.generateNotification().subscribe(this.generalNotification)
    }

    /**
     * Request/response emulation, for the case where this transmitter acts as a receiver.
     *
     * Not implemented yet: the returned observable never emits, errors or completes.
     *
     * @param message Request message (currently unused).
     * @returns An observable intended to carry the responses to `message`.
     */
    send(message: FisMessage): Observable<FisMessage> {
      return new Observable<FisMessage>(() => {
        // TODO: request/response logic goes here
      })
    }

    /**
     * Transmission only: hands `message` to the transmitter, addressed to one adapter.
     *
     * @param message Message to send as the payload.
     * @param adapterId Identifier of the adapter the message is addressed to.
     */
    emit(message: FisMessage, adapterId: string): void {
      this.transmissionInstance.transmitter.emit({
        adapterId, // NOTE(review): the original author was unsure whether this should match the request ID
        payload: message
      })
    }

    /**
     * Receiving only: listens on the general bus and runs `appProcess` for every
     * incoming transmission message.
     *
     * The returned observable does not emit values itself. It forwards errors and
     * completion of the bus, and unsubscribing from it releases the bus subscription.
     * (The method name keeps its original spelling so existing callers keep working.)
     *
     * @returns An observable whose subscription lifetime controls the bus listener.
     */
    susbcribe(): Observable<FisMessage> {
      return new Observable<FisMessage>((observer) => {
        const busSubscription = this.transmissionInstance.receiver
          .getMessageBus(Bus.GeneralBus)
          .subscribe({
            next: (message: TransmissionMessage) => {
              this.appProcess(message.adapterId, message.payload)
            },
            error: (err: unknown) => observer.error(err),
            complete: () => observer.complete()
          })
        // Without this teardown the bus listener would outlive its subscriber.
        return () => busSubscription.unsubscribe()
      })
    }

    /**
     * Auto broadcast, no request needed: for every 'New Adapter' transport event,
     * starts forwarding all general notifications to the adapter named in the event.
     *
     * NOTE(review): each event adds a subscription to `generalNotification` that is
     * never released, and calling this method more than once duplicates the forwarding.
     */
    subscribeForNewClientWhoWantsNotification(): void {
      this.transmissionInstance.event.pipe(
        filter(obj => obj.event === 'New Adapter')
      ).subscribe((event: TransportEvent) => {
        const adapterId = (event.data as EventMessage).adapterId
        this.generalNotification.subscribe((notification: FisMessage) => {
          this.emit(notification, adapterId)
        })
      })
    }

    /**
     * Simulates a provider answering a request: generates a fixed number of
     * response messages followed by a final 'Complete' message, and emits each
     * of them to the given adapter.
     *
     * @param adapterId Identifier of the adapter that receives the responses.
     * @param message Incoming message (currently unused by the simulation).
     */
    appProcess(adapterId: string, message: FisMessage): void {
      this.generateMessage(Application.RESPONSE_COUNT).subscribe({
        next: (response: FisMessage) => this.emit(response, adapterId),
        error: (err: unknown) => console.error(err),
        complete: () => console.log(`All responses generated completed and passed into adapter: ${adapterId}`)
      })
    }

    /**
     * Produces `amount` 'Data' messages, one per interval tick, then one extra
     * message whose data is 'Complete', and then completes.
     *
     * @param amount Number of 'Data' messages to generate before the final one.
     * @returns A cold observable; unsubscribing stops the underlying interval.
     */
    private generateMessage(amount: number): Observable<FisMessage> {
      return new Observable<FisMessage>((response) => {
        const subscription = interval(Application.MESSAGE_INTERVAL_MS).pipe(
          take(amount), // Ensures only 'amount' messages are generated
          map(() => this.buildMessage('ResponseMessage', 'Data'))
        ).subscribe({
          next: (message) => response.next(message),
          error: (err: unknown) => response.error(err),
          complete: () => {
            // Signal the end of the batch with a dedicated message before completing.
            response.next(this.buildMessage('ResponseMessage', 'Complete'))
            response.complete()
          }
        })
        // Ensure cleanup on unsubscribe
        return () => subscription.unsubscribe()
      })
    }

    /**
     * Produces one 'Data' message per interval tick, indefinitely.
     *
     * The source interval never completes, so no completion message is ever sent.
     * NOTE(review): the emitted name is 'ResponseMessage' although this is the
     * notification feed — confirm whether 'NotificationMessage' was intended.
     *
     * @returns A cold observable; unsubscribing stops the underlying interval.
     */
    private generateNotification(): Observable<FisMessage> {
      return interval(Application.MESSAGE_INTERVAL_MS).pipe(
        map(() => this.buildMessage('ResponseMessage', 'Data'))
      )
    }

    /**
     * Builds a message with a freshly generated UUID as its message ID.
     *
     * @param messageName Value for the header's `messageName`.
     * @param data Value for the message's `data` field.
     * @returns The assembled message.
     */
    private buildMessage(messageName: string, data: string): FisMessage {
      return {
        header: {
          messageID: uuidv4(),
          messageName
        },
        data
      }
    }
  }
  // Create the application when this module loads; its constructor starts the notification feed.
  const application = new Application()