msg.transmission.receiver.ts 5.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. import { BehaviorSubject, filter, map, Observable, Observer, Subject, Subscription } from 'rxjs';
  2. import { v4 as uuidv4 } from 'uuid'
  3. import { ReceiverAdapter } from '../adapters/adapter.receiver';
  4. import { checkMessage, WrappedMessage } from '../utils/message.ordering';
  5. import ConsoleLogger from '../utils/log.utils';
  6. import { MessageTransmissionBase } from '../base/msg.transmission.base';
  7. import { AdapterInterface, AdapterManagerInterface, ConnectionState, FisMessage, GeneralEvent, MessageReceiverInterface, ReceiverAdapterInterface, TransmissionProfile, TransportMessage } from '../interface/interface';
  8. import { error } from 'console';
  /**
   * Receiving half of a message transmission.
   *
   * Receiver adapters published by the adapter manager for `profile.target` are
   * collected in `this.adapters`; everything they deliver is funnelled into the
   * internal `incomingMessage` subject. Consumers read the stream through
   * `getReceivables()`, which only emits 'New Message' events that have been
   * accepted by `checkMessage` (imported from utils/message.ordering).
   */
  export class MessageTransmissionReceiver extends MessageTransmissionBase implements MessageReceiverInterface {
    // Mirrors the connection state reported by the current adapter; starts as `OFFLINE`.
    private connectionStateEvent: BehaviorSubject<ConnectionState> = new BehaviorSubject<ConnectionState>(`OFFLINE`)
    private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionReceiver`, ['transmission'])
    // Every incoming wrapped message is published here and the subject is handed to checkMessage.
    private onHoldMessage: Subject<WrappedMessage> = new Subject()
    // The first adapter delivered by the adapter manager; never replaced in this class.
    private currentAdapter!: ReceiverAdapterInterface
    // Single funnel for events coming from all subscribed adapters.
    private incomingMessage: Subject<GeneralEvent<TransportMessage>> = new Subject()
    // private toBePassedOver: Subject<WrappedMessage> = new Subject()

    /**
     * @param profile Transmission profile; stored on the instance, its `target` selects the adapters.
     * @param adapterManager Source of receiver adapters for the profile's target.
     */
    constructor(profile: TransmissionProfile, adapterManager: AdapterManagerInterface) {
      super()
      this.profile = profile
      this.console.log({ message: `Constructing Receiver Transmission for Receiving target: ${this.profile.target}` })
      this.initializeReceiverComponents(adapterManager)
    }

    /**
     * Stream of 'New Message' events for this transmission.
     *
     * Each subscriber gets its own subscription to the internal incoming stream.
     * An event is emitted only after `checkMessage` resolves for its wrapped
     * payload; when `checkMessage` rejects, the failure is logged and the event
     * is not emitted. The returned observable is not completed or errored by
     * this method.
     *
     * @returns Observable of accepted transport message events.
     */
    public getReceivables(): Observable<GeneralEvent<TransportMessage>> {
      return new Observable((receivable: Observer<GeneralEvent<TransportMessage>>) => {
        this.console.log({ message: `Transmission streaming messages from ${this.profile.target}` })
        const subscription: Subscription = this.incomingMessage.pipe(
          filter((event: GeneralEvent<any>) => event.event === 'New Message'),
        ).subscribe((event: GeneralEvent<TransportMessage>) => {
          // event.data is a TransportMessage whose payload is the WrappedMessage.
          const wrapped = (event.data as TransportMessage).payload as WrappedMessage
          // Publish on the on-hold subject first, then let checkMessage decide.
          // NOTE(review): the exact ordering contract lives in checkMessage — confirm there.
          this.onHoldMessage.next(wrapped)
          checkMessage(wrapped, this.onHoldMessage).then(() => {
            this.console.log({ message: `This one passes. Does have previousID. Case for message ordering` })
            receivable.next(event)
          }).catch((error) => {
            this.console.log({ message: `Observer Error`, details: error })
          })
        })
        // Clean up on unsubscription
        return () => {
          subscription.unsubscribe()
        }
      })
    }

    /**
     * Registers adapters as the adapter manager publishes them and starts
     * forwarding their incoming messages. The first adapter becomes the current
     * adapter and drives `connectionStateEvent`. Later adapters are listened to
     * as well but do not replace the current one: there is currently no logic to
     * switch adapters based on performance or any other criteria.
     */
    private initializeReceiverComponents(adapterManager: AdapterManagerInterface): void {
      adapterManager.subscribeForAdapters(this.profile.target, `Receiver`).subscribe((adapter: AdapterInterface) => {
        this.adapters.push(adapter)
        this.console.log({ message: `Adding new ${adapter.getAdapterProfile(`transportType`)} receiving adapter. Current adapter length: ${this.adapters.length}` })
        const receiverAdapter = adapter as ReceiverAdapterInterface
        if (!this.currentAdapter) {
          this.console.log({ message: `Setting this ${adapter.getAdapterProfile(`role`)} as current adapter.` })
          this.currentAdapter = receiverAdapter
          this.forwardAdapterMessages(receiverAdapter)
          const connectionState = this.currentAdapter.getAdapterProfile(`connectionState`) as Observable<ConnectionState>
          connectionState.subscribe(this.connectionStateEvent)
        } else {
          // Listen to the newly added adapter itself. Subscribing to the current
          // adapter a second time would forward each of its messages twice and
          // leave the new adapter unheard.
          this.forwardAdapterMessages(receiverAdapter)
        }
      })
    }

    /** Pipes everything the given adapter receives into `incomingMessage`, logging each event and any stream error. */
    private forwardAdapterMessages(adapter: ReceiverAdapterInterface): void {
      adapter.subscribeForIncoming().subscribe({
        next: (message: GeneralEvent<TransportMessage>) => {
          // Optional chaining: an event with an unexpected shape must not throw
          // here, otherwise it would never reach incomingMessage.
          const transport = message.data as TransportMessage | undefined
          const fisMessage = (transport?.payload as WrappedMessage | undefined)?.payload as FisMessage | undefined
          this.console.log({ message: `Received ${fisMessage?.header?.messageID} from ${transport?.target}`, details: message })
          this.incomingMessage.next(message)
        },
        error: (err) => {
          // Surface the failure instead of swallowing it. Ideally this is where
          // the transmission would switch to another adapter.
          this.console.log({ message: `Receiver adapter stream error`, details: err })
        }
      })
    }
  }