msg.transmission.receiver.ts 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. import { filter, Observable, Observer, Subject, Subscription } from 'rxjs';
  2. import { v4 as uuidv4 } from 'uuid'
  3. import { ReceiverAdapter } from '../connector/adapter.receiver';
  4. import { checkMessage, WrappedMessage } from '../utils/message.ordering';
  5. import ConsoleLogger from '../utils/log.utils';
  6. import { MessageTransmissionBase } from '../base/msg.transmission.base';
  7. import { MessageReceiverInterface, ReceiverAdapterInterface, TransportMessage } from '../interface/interface';
  /**
   * Receiving side of the message transmission layer.
   *
   * Exposes the 'New Message' events of the current receiver adapter as an
   * observable stream, forwarding each event only after `checkMessage` has
   * resolved for its wrapped payload.
   *
   * NOTE(review): `EventObject`, `TransportEvent` and `AdapterEvent` are used
   * below but are not part of the import block visible in this file — confirm
   * where they are expected to come from.
   */
  export class MessageTransmissionReceiver extends MessageTransmissionBase implements MessageReceiverInterface {
    private readonly console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionReceiver`, ['transmission'])
    /** Every incoming wrapped message is pushed here and the subject is handed to `checkMessage`. */
    private readonly onHoldMessage = new Subject<WrappedMessage>()
    /**
     * Adapter whose events feed `getIncoming()`. Nothing in this class assigns
     * it yet (see `handleAdapterEvent`), so it may still be undefined at runtime
     * despite the definite-assignment assertion.
     */
    private currentAdapter!: ReceiverAdapterInterface
    // private toBePassedOver: Subject<WrappedMessage> = new Subject()

    /**
     * @param clientId Identifier stored on the instance and used in log output.
     * @param eventObj Event container; its `adapterEvent` stream is passed to
     *                 `handleAdapterEvent`.
     */
    constructor(clientId: string, eventObj: EventObject) {
      super()
      this.clientId = clientId
      this.eventObj = eventObj
      this.handleAdapterEvent(this.eventObj.adapterEvent.asObservable())
    }

    /**
     * Builds a cold observable of incoming transport events.
     *
     * On each subscription the current adapter is subscribed to, events other
     * than 'New Message' are dropped, and every remaining event is emitted once
     * `checkMessage` resolves for its payload. If `checkMessage` rejects, the
     * error is logged and that event is not emitted.
     *
     * @returns Observable emitting the accepted `TransportEvent`s. It errors
     *          immediately if no receiver adapter has been assigned.
     */
    getIncoming(): Observable<TransportEvent> {
      this.console.log({ message: `Transmission getting message bus for ${this.clientId}` })
      return new Observable((observer: Observer<TransportEvent>) => {
        // Guard: `currentAdapter` is only asserted, never assigned in this class.
        // Surface a descriptive error instead of an opaque TypeError.
        if (!this.currentAdapter) {
          observer.error(new Error(`MessageTransmissionReceiver: no receiver adapter assigned for ${this.clientId}`))
          return undefined
        }

        // Need to merge all the adapters into one when the time comes
        // SAMPLE: This adapterArray.forEach(adapter => { ... })
        const subscription: Subscription = this.currentAdapter.subscribe().pipe(
          filter((event: TransportEvent) => event.event === 'New Message'),
        ).subscribe((event: TransportEvent) => {
          // event.data is a TransportMessage here; its payload is the wrapped message.
          const wrapped = (event.data as TransportMessage).payload as WrappedMessage
          this.onHoldMessage.next(wrapped)
          // checkMessage lives in ../utils/message.ordering; presumably it resolves
          // once the message may be released in order — confirm against that module.
          checkMessage(wrapped, this.onHoldMessage).then(() => {
            this.console.log({ message: `This one passes. Does have previousID. Case for message ordering` })
            observer.next(event);
          }).catch((error) => {
            this.console.log({ message: `Observer Error`, details: error })
          })
        });

        // Clean up on unsubscription
        return () => {
          subscription.unsubscribe();
        };
      })
    }

    /**
     * Placeholder for reacting to adapter events; currently does nothing.
     *
     * @param adapterEvent Stream of adapter events (unused for now).
     */
    private handleAdapterEvent(adapterEvent: Observable<AdapterEvent>): void {
    }
  }