msg.transmission.receiver.ts 3.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. import { filter, map, Observable, Observer, Subject, Subscription } from 'rxjs';
  2. import { v4 as uuidv4 } from 'uuid'
  3. import { ReceiverAdapter } from '../adapters/adapter.receiver';
  4. import { checkMessage, WrappedMessage } from '../utils/message.ordering';
  5. import ConsoleLogger from '../utils/log.utils';
  6. import { MessageTransmissionBase } from '../base/msg.transmission.base';
  7. import { AdapterInterface, GeneralEvent, MessageReceiverInterface, ReceiverAdapterInterface, TransportMessage } from '../interface/interface';
  8. import { AdapterBase } from '../base/adapter.base';
  9. import { IncomingMessage } from 'http';
  10. export class MessageTransmissionReceiver extends MessageTransmissionBase implements MessageReceiverInterface {
  11. private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionReceiver`, ['transmission'])
  12. private onHoldMessage: Subject<WrappedMessage> = new Subject()
  13. private currentAdapter!: ReceiverAdapter
  14. private incomingMessage: Subject<GeneralEvent<TransportMessage>> = new Subject()
  15. // private toBePassedOver: Subject<WrappedMessage> = new Subject()
  16. constructor(clientId: string, event: Subject<GeneralEvent<any>>) {
  17. super()
  18. this.clientId = clientId
  19. this.event = event
  20. this.handleAdapterEvent(this.event.asObservable())
  21. }
  22. getIncoming(): Observable<GeneralEvent<TransportMessage>> {
  23. this.console.log({ message: `Transmission getting message bus for ${this.clientId}` })
  24. return new Observable((observable: Observer<GeneralEvent<any>>) => {
  25. const subscription: Subscription = this.incomingMessage.pipe(
  26. filter((event: GeneralEvent<any>) => event.event == 'New Message'),
  27. ).subscribe((event: GeneralEvent<TransportMessage>) => {
  28. // console.log(event) // data is transportMessage instead of eventmessage
  29. this.onHoldMessage.next(((event.data as TransportMessage).payload as WrappedMessage))
  30. checkMessage(((event.data as TransportMessage).payload as WrappedMessage), this.onHoldMessage).then(() => {
  31. // only release the message before it exists
  32. this.console.log({ message: `This one passes. Does have previousID. Case for message ordering` })
  33. // console.log(((event.data as TransportMessage).payload as WrappedMessage))
  34. observable.next(event);
  35. }).catch((error) => {
  36. this.console.log({ message: `Observer Error`, details: error })
  37. })
  38. })
  39. // Clean up on unsubscription
  40. return () => {
  41. subscription.unsubscribe();
  42. };
  43. })
  44. }
  45. private handleAdapterEvent(adapterEvent: Observable<GeneralEvent<any>>): void {
  46. const subscription: Subscription = adapterEvent.pipe(
  47. filter(event => event.type === `Adapter Event`),
  48. filter(event => event.event === `New Adapter`),
  49. map(event => {
  50. return event.data
  51. }),
  52. filter((adapter: AdapterInterface) => adapter.role === `Receiver`),
  53. map(adapter => {
  54. return adapter as ReceiverAdapter
  55. })
  56. ).subscribe((adapter: ReceiverAdapter) => {
  57. if (!this.adapters.some(adapterObj => adapterObj.adapterId === adapter.adapterId)) {
  58. this.adapters.push(adapter)
  59. this.currentAdapter = adapter
  60. this.currentAdapter.subscribeForIncoming().subscribe(this.incomingMessage)
  61. } else {
  62. this.console.error({ message: `Adapter ID: ${adapter.adapterId} already existed.` })
  63. }
  64. })
  65. }
  66. }