transmitter.ts 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. import { filter, interval, map, Observable, Observer, Subject, Subscription, take } from "rxjs";
  2. import dotenv from 'dotenv';
  3. import { Bus, FisMessage, MessageTransmission, TransmissionMessage } from "../interface/transport.interface";
  4. import { v4 as uuidv4 } from 'uuid'
  5. import { MessageTransmissionManager } from "../transmission/msg.transmission.manager";
  6. import { TransportEvent } from "../interface/connector.interface";
  /**
   * Connects every client transmission reported by the
   * MessageTransmissionManager to the demo MessageProducer.
   *
   * For each client it relays the producer's periodic notifications and, for
   * each message the client sends on the general bus, the responses whose
   * `header.messageID` matches that request.
   */
  class Supervisor {
    /** Requests received from clients; handed to the MessageProducer at construction. */
    private readonly clientIncomingMessage: Subject<FisMessage> = new Subject()
    private readonly messageProducer: MessageProducer
    private readonly transmissionManager: MessageTransmissionManager
    /** Transport event stream passed to the MessageTransmissionManager. */
    private readonly event: Subject<TransportEvent> = new Subject()
    /** Every transmission set received so far. */
    private readonly transmissionSets: MessageTransmission[] = []

    constructor() {
      // Adapter concerns belong to the transmission/connection managers; the
      // supervisor only reacts to the transmission sets they hand over.
      this.messageProducer = new MessageProducer(this.clientIncomingMessage)
      this.transmissionManager = new MessageTransmissionManager(this.event)
      this.transmissionManager.subscribe().subscribe((transmissionSet: MessageTransmission) => {
        this.transmissionSets.push(transmissionSet)
        this.handleClientActivity(transmissionSet)
      })
    }

    /**
     * Starts relaying traffic for one client transmission. Per the original
     * author's note this is invoked once per connected client.
     *
     * @param messageTransmission receiver/transmitter pair of the client
     */
    private handleClientActivity(messageTransmission: MessageTransmission): void {
      // Listen to incoming messages from this client.
      messageTransmission.receiver.getMessageBus(Bus.GeneralBus).subscribe((event: TransmissionMessage) => {
        console.log(event)
        const requestMessage: FisMessage = event.payload
        const requestID = requestMessage?.header?.messageID
        if (requestID == null) {
          // Without an ID the responses cannot be correlated; skip instead of
          // leaving a subscription behind that can never match.
          console.error(`Ignoring client message without header.messageID`)
          return
        }

        // Subscribe BEFORE forwarding the request so no response is missed.
        // The outgoing bus is a plain Subject (no synchronous replay), so
        // `responses` is always initialised before the callback first runs.
        const responses = this.messageProducer.getOutgoingMessages().pipe(
          filter(message => message.header.messageID === requestID)
        ).subscribe(message => {
          messageTransmission.transmitter.emit(message)
          // `Complete` is the producer's terminal marker for a request; drop
          // the per-request subscription so they do not pile up forever.
          if (message.data === `Complete`) {
            responses.unsubscribe()
          }
        })

        // Hand the request to the producer; previously this was never done, so
        // no response was ever generated.
        this.clientIncomingMessage.next(requestMessage)
      })

      // Emulates a general notification: the producer emits one every second.
      this.messageProducer.getNotificationMessage().subscribe((message: FisMessage) => {
        messageTransmission.transmitter.emit(message)
      })
    }
  }
  /**
   * Demo message source.
   *
   * Publishes a notification message every second and, for every request
   * pushed onto the incoming bus, a timed burst of response messages on the
   * outgoing bus.
   */
  class MessageProducer {
    private readonly generalNotification: Subject<FisMessage> = new Subject()
    private readonly incomingMessageBus: Subject<FisMessage>
    private readonly outgoingMessageBus: Subject<FisMessage> = new Subject()

    /**
     * @param incomingMessageBus subject on which client requests are pushed
     */
    constructor(incomingMessageBus: Subject<any>) {
      console.log(`Contructing Application....`)
      this.incomingMessageBus = incomingMessageBus
      this.generateNotification().subscribe(this.generalNotification)
      this.handleIncomingRequests(this.incomingMessageBus, this.outgoingMessageBus)
    }

    /** Stream of the notifications generated once per second. */
    public getNotificationMessage(): Observable<FisMessage> {
      return this.generalNotification.asObservable()
    }

    /** Stream of every response message generated for any request. */
    public getOutgoingMessages(): Observable<FisMessage> {
      return this.outgoingMessageBus.asObservable()
    }

    /**
     * For each request, publishes ten `Data` responses followed by a single
     * `Complete` response, all carrying the request's `messageID` so that
     * consumers can correlate them with the request.
     *
     * @param requests incoming request stream
     * @param outgoingMessageBus subject the responses are published on
     */
    private handleIncomingRequests(requests: Subject<FisMessage>, outgoingMessageBus: Subject<FisMessage>): void {
      requests.subscribe(request => {
        this.generateMessage(10, request.header.messageID).subscribe({
          next: message => outgoingMessageBus.next(message),
          error: error => console.error(error)
          // No extra message on completion: generateMessage already emits the
          // terminal `Complete` message itself.
        })
      })
    }

    /**
     * Emits `amount` `Data` messages, one per second, then one `Complete`
     * message immediately after the last one, then completes.
     *
     * @param amount number of `Data` messages to emit
     * @param messageID ID stamped on every emitted message; when omitted each
     *   message gets its own fresh UUID
     * @returns cold observable; unsubscribing stops the underlying timer
     */
    private generateMessage(amount: number, messageID?: FisMessage['header']['messageID']): Observable<FisMessage> {
      const buildMessage = (data: string): FisMessage => ({
        header: {
          messageID: messageID ?? uuidv4(),
          messageName: 'ResponseMessage'
        },
        data
      })

      return new Observable((response: Observer<FisMessage>) => {
        const subscription = interval(1000).pipe(
          take(amount), // Ensures only 'amount' messages are generated
          map(() => buildMessage(`Data`))
        ).subscribe({
          next: message => response.next(message),
          error: error => response.error(error),
          complete: () => {
            response.next(buildMessage(`Complete`))
            response.complete()
          }
        })
        // Ensure cleanup on unsubscribe
        return () => subscription.unsubscribe()
      })
    }

    /**
     * Emits a `NotificationMessage` with a fresh UUID every second. The
     * stream is built on `interval`, which never completes on its own.
     */
    private generateNotification(): Observable<FisMessage> {
      return interval(1000).pipe(
        map((): FisMessage => ({
          header: {
            messageID: uuidv4(),
            messageName: 'NotificationMessage'
          },
          data: `Data`
        }))
      )
    }
  }
  // Instantiating the supervisor starts the producer and begins listening for client transmissions.
  const supervisor = new Supervisor()