msg.transmission.receiver.ts 3.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. import { filter, map, Observable, Observer, Subject, Subscription } from 'rxjs';
  2. import { v4 as uuidv4 } from 'uuid'
  3. import { ReceiverAdapter } from '../adapters/adapter.receiver';
  4. import { checkMessage, WrappedMessage } from '../utils/message.ordering';
  5. import ConsoleLogger from '../utils/log.utils';
  6. import { MessageTransmissionBase } from '../base/msg.transmission.base';
  7. import { AdapterInterface, GeneralEvent, MessageReceiverInterface, ReceiverAdapterInterface, TransportMessage } from '../interface/interface';
  /**
   * Receiving side of a message transmission.
   *
   * Listens on the shared event bus for newly announced adapters whose role is
   * `Receiver`, funnels everything those adapters deliver into one internal
   * subject, and exposes the `New Message` events to consumers through
   * {@link MessageTransmissionReceiver.getIncoming}.
   */
  export class MessageTransmissionReceiver extends MessageTransmissionBase implements MessageReceiverInterface {
    private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionReceiver`, ['transmission'])
    /** Every payload seen by `getIncoming` is published here before it is checked. */
    private onHoldMessage: Subject<WrappedMessage> = new Subject()
    /** Most recently registered receiver adapter. */
    private currentAdapter!: ReceiverAdapterInterface
    /** Merged stream of events coming from all registered receiver adapters. */
    private incomingMessage: Subject<GeneralEvent<TransportMessage>> = new Subject()

    /**
     * @param clientId Identifier of the client this receiver belongs to.
     * @param event Shared event bus; adapter announcements are read from it.
     */
    constructor(clientId: string, event: Subject<GeneralEvent<any>>) {
      super()
      this.clientId = clientId
      this.event = event
      this.handleAdapters(this.event.asObservable())
    }

    /**
     * Returns a stream of incoming `New Message` events.
     *
     * For each such event the wrapped payload is first pushed to `onHoldMessage`
     * and then handed to `checkMessage`; the event is emitted to the subscriber
     * only once that promise resolves. When it rejects, the event is not emitted
     * and the rejection is logged.
     *
     * @returns An observable of transport-message events. Unsubscribing tears
     * down the underlying subscription to the internal message subject.
     */
    getIncoming(): Observable<GeneralEvent<TransportMessage>> {
      this.console.log({ message: `Transmission getting message bus for ${this.clientId}` })
      return new Observable((observer: Observer<GeneralEvent<any>>) => {
        const subscription: Subscription = this.incomingMessage.pipe(
          filter((event: GeneralEvent<any>) => event.event === 'New Message'),
        ).subscribe((event: GeneralEvent<TransportMessage>) => {
          // event.data is the TransportMessage; its payload is the WrappedMessage.
          const wrapped = (event.data as TransportMessage).payload as WrappedMessage
          this.onHoldMessage.next(wrapped)
          // NOTE(review): checkMessage presumably resolves once the message may be
          // released in order (e.g. its predecessor has been seen) - confirm in
          // utils/message.ordering.
          checkMessage(wrapped, this.onHoldMessage).then(() => {
            this.console.log({ message: `This one passes. Does have previousID. Case for message ordering` })
            observer.next(event)
          }).catch((error) => {
            this.console.log({ message: `Observer Error`, details: error })
          })
        })
        // Clean up on unsubscription
        return () => {
          subscription.unsubscribe()
        }
      })
    }

    /**
     * Watches the event bus for `New Adapter` announcements of receiver adapters
     * and wires each previously unseen adapter into `incomingMessage`.
     *
     * @param adapterEvent Observable view of the shared event bus.
     */
    private handleAdapters(adapterEvent: Observable<GeneralEvent<any>>): void {
      adapterEvent.pipe(
        filter(event => event.type === `Adapter Event`),
        filter(event => event.event === `New Adapter`),
        map(event => event.data as AdapterInterface),
        filter((adapter: AdapterInterface) => adapter.role === `Receiver`),
        map(adapter => adapter as ReceiverAdapter)
      ).subscribe({
        next: (adapter: ReceiverAdapterInterface) => {
          const alreadyRegistered = this.adapters.some(adapterObj => adapterObj.adapterId === adapter.adapterId)
          if (alreadyRegistered) {
            this.console.error({ message: `Adapter ID: ${adapter.adapterId} already existed.` })
            return
          }
          this.adapters.push(adapter)
          this.currentAdapter = adapter
          this.console.log({ message: `Setting Current adapter = ${this.currentAdapter.adapterId}` })
          // Forward only `next` into the shared subject. Passing the subject itself
          // as the observer would also forward this adapter's complete/error,
          // which closes the subject for every other adapter and every consumer.
          this.currentAdapter.subscribeForIncoming().subscribe({
            next: incoming => this.incomingMessage.next(incoming),
            error: error => this.console.error({ message: `Adapter ${adapter.adapterId} incoming stream error`, details: error }),
            complete: () => this.console.log({ message: `Adapter ${adapter.adapterId} incoming stream completed` })
          })
        },
        error: error => this.console.error({ message: 'Observer Error', details: error })
      })
    }
  }