msg.transmission.receiver.ts 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. import { filter, Observable, Observer, Subject, Subscription } from 'rxjs';
  2. import { TransportEvent, TransportMessage } from '../interface/connector.interface';
  3. import { MessageTransmissionBase } from './msg.transmission.base';
  4. import { Bus, MessageReceiver as MessageReceiverInterface, ReceiverProfile } from '../interface/transport.interface'
  5. import { v4 as uuidv4 } from 'uuid'
  6. import { ReceiverAdapter } from '../connector/adapter.receiver';
  7. import { checkMessage, WrappedMessage } from '../utils/message.ordering';
  8. import ConsoleLogger from '../utils/log.utils';
  9. import { Adapter } from '../connector/adapter.base';
  10. export class MessageTransmissionReceiver extends MessageTransmissionBase implements MessageReceiverInterface {
  11. private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionReceiver`, ['transmission'])
  12. private onHoldMessage: Subject<WrappedMessage> = new Subject()
  13. // private toBePassedOver: Subject<WrappedMessage> = new Subject()
  14. receiverProfile!: ReceiverProfile;
  15. constructor(profile: ReceiverProfile, adapter: ReceiverAdapter, event: Observable<TransportEvent>) {
  16. super()
  17. this.event = event
  18. this.console.log({ message: `Constructing Receiver Transmission with ${profile.name}` })
  19. this.setReceiver(profile)
  20. this.setUpAdapter(adapter)
  21. }
  22. setReceiver(receiverProfile: ReceiverProfile): void {
  23. this.receiverProfile = receiverProfile
  24. }
  25. getMessageBus(bus: Bus): Observable<TransportEvent> {
  26. this.console.log({ message: `Transmission getting message bus for ${this.receiverProfile.id}` })
  27. return new Observable((observable: Observer<TransportEvent>) => {
  28. // logic here
  29. if (bus == Bus.GeneralBus) {
  30. // Need to merge all the adapters into one when the time comes
  31. // SAMPLE: This adapterArray.forEach(adapter => { ... })
  32. const subscription: Subscription = (this.mainAdapter as ReceiverAdapter).getMessageBus(Bus.GeneralBus).pipe(
  33. filter((event: TransportEvent) => event.event == 'New Message'),
  34. ).subscribe((event: TransportEvent) => {
  35. // console.log(event) // data is transportMessage instead of eventmessage
  36. this.onHoldMessage.next(((event.data as TransportMessage).payload as WrappedMessage))
  37. checkMessage(((event.data as TransportMessage).payload as WrappedMessage), this.onHoldMessage).then(() => {
  38. // only release the message before it exists
  39. this.console.log({ message: `This one passes. Does have previousID. Case for message ordering` })
  40. // console.log(((event.data as TransportMessage).payload as WrappedMessage))
  41. observable.next(event);
  42. }).catch((error) => {
  43. this.console.log({ message: `Observer Error`, details: error })
  44. })
  45. });
  46. // Clean up on unsubscription
  47. return () => {
  48. subscription.unsubscribe();
  49. };
  50. }
  51. })
  52. }
  53. setUpAdapter(adapter: Adapter): void {
  54. this.mainAdapter = adapter
  55. }
  56. }