1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 |
- import { filter, map, Observable, Observer, Subject, Subscription } from 'rxjs';
- import { v4 as uuidv4 } from 'uuid'
- import { ReceiverAdapter } from '../adapters/adapter.receiver';
- import { checkMessage, WrappedMessage } from '../utils/message.ordering';
- import ConsoleLogger from '../utils/log.utils';
- import { MessageTransmissionBase } from '../base/msg.transmission.base';
- import { AdapterInterface, GeneralEvent, MessageReceiverInterface, ReceiverAdapterInterface, TransportMessage } from '../interface/interface';
/**
 * Receiving side of the transmission layer for a single client.
 *
 * Listens on the shared event subject for newly announced adapters whose role
 * is `Receiver`, registers each one once (keyed by `adapterId`), and funnels
 * the events produced by every registered adapter into one internal bus
 * (`incomingMessage`). Consumers obtain the filtered, ordering-checked stream
 * via {@link getIncoming}.
 */
export class MessageTransmissionReceiver extends MessageTransmissionBase implements MessageReceiverInterface {
    private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionReceiver`, ['transmission'])
    /** Messages seen so far; handed to `checkMessage` alongside each new message. */
    private onHoldMessage: Subject<WrappedMessage> = new Subject()
    /** The most recently registered receiver adapter. */
    private currentAdapter!: ReceiverAdapterInterface
    /** Internal bus that merges the incoming events of all registered adapters. */
    private incomingMessage: Subject<GeneralEvent<TransportMessage>> = new Subject()
    // private toBePassedOver: Subject<WrappedMessage> = new Subject()

    /**
     * @param clientId Identifier of the client this receiver serves.
     * @param event Shared event subject on which adapter announcements arrive.
     */
    constructor(clientId: string, event: Subject<GeneralEvent<any>>) {
        super()
        this.clientId = clientId
        this.event = event
        this.handleAdapters(this.event.asObservable())
    }

    /**
     * Builds a stream of `New Message` events taken from the internal bus.
     *
     * Each message payload is first pushed onto `onHoldMessage` and then passed
     * to `checkMessage`; the event is emitted to the subscriber only once that
     * promise resolves. If it rejects, the error is logged and the event is
     * not emitted.
     *
     * @returns A cold observable; every subscription attaches its own listener
     * to the internal bus and detaches it again on unsubscription.
     */
    getIncoming(): Observable<GeneralEvent<TransportMessage>> {
        this.console.log({ message: `Transmission getting message bus for ${this.clientId}` })
        return new Observable((observer: Observer<GeneralEvent<any>>) => {
            const subscription: Subscription = this.incomingMessage.pipe(
                filter((event: GeneralEvent<any>) => event.event === 'New Message'),
            ).subscribe((event: GeneralEvent<TransportMessage>) => {
                // event.data is a TransportMessage whose payload carries the wrapped message.
                const wrapped = (event.data as TransportMessage).payload as WrappedMessage
                this.onHoldMessage.next(wrapped)
                checkMessage(wrapped, this.onHoldMessage).then(() => {
                    // Release the event only after the ordering check has passed.
                    this.console.log({ message: `This one passes. Does have previousID. Case for message ordering` })
                    observer.next(event)
                }).catch((error) => {
                    // Reported at error level, matching the adapter pipeline's handler.
                    this.console.error({ message: `Observer Error`, details: error })
                })
            })
            // Clean up on unsubscription
            return () => {
                subscription.unsubscribe()
            }
        })
    }

    /**
     * Watches the event stream for `New Adapter` announcements with role
     * `Receiver` and wires each previously unseen adapter into the internal bus.
     *
     * @param adapterEvent Stream of general events that may contain adapter announcements.
     */
    private handleAdapters(adapterEvent: Observable<GeneralEvent<any>>): void {
        adapterEvent.pipe(
            filter(event => event.type === `Adapter Event`),
            filter(event => event.event === `New Adapter`),
            map(event => event.data as AdapterInterface),
            filter((adapter: AdapterInterface) => adapter.role === `Receiver`),
            map(adapter => adapter as ReceiverAdapter)
        ).subscribe({
            next: (adapter: ReceiverAdapterInterface) => {
                const alreadyRegistered = this.adapters.some(adapterObj => adapterObj.adapterId === adapter.adapterId)
                if (alreadyRegistered) {
                    this.console.error({ message: `Adapter ID: ${adapter.adapterId} already existed.` })
                    return
                }
                this.adapters.push(adapter)
                this.currentAdapter = adapter
                this.console.log({ message: `Setting Current adapter = ${this.currentAdapter.adapterId}` })
                // Forward only `next`. Passing the Subject itself as the observer would
                // also forward `complete`/`error`, so one adapter stream ending would
                // permanently close the shared bus for every other adapter and consumer.
                adapter.subscribeForIncoming().subscribe({
                    next: message => this.incomingMessage.next(message),
                    error: error => this.console.error({ message: 'Observer Error', details: error })
                })
            },
            error: error => this.console.error({ message: 'Observer Error', details: error })
        })
    }
}
|