12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182 |
- import { BehaviorSubject, filter, map, Observable, Observer, Subject, Subscription } from 'rxjs';
- import { v4 as uuidv4 } from 'uuid'
- import { ReceiverAdapter } from '../adapters/adapter.receiver';
- import { checkMessage, WrappedMessage } from '../utils/message.ordering';
- import ConsoleLogger from '../utils/log.utils';
- import { MessageTransmissionBase } from '../base/msg.transmission.base';
- import { AdapterInterface, AdapterManagerInterface, ConnectionState, FisMessage, GeneralEvent, MessageReceiverInterface, ReceiverAdapterInterface, TransmissionProfile, TransportMessage } from '../interface/interface';
- import { error } from 'console';
/**
 * Receiving half of a transmission.
 *
 * Collects the receiver adapters that the adapter manager publishes for
 * `profile.target`, funnels everything those adapters deliver into a single
 * internal stream, and exposes the 'New Message' events of that stream through
 * {@link MessageTransmissionReceiver.getReceivables} once `checkMessage` has
 * resolved for them.
 */
export class MessageTransmissionReceiver extends MessageTransmissionBase implements MessageReceiverInterface {
    /** Mirrors the connection state reported by the current adapter; starts as `OFFLINE`. */
    private connectionStateEvent: BehaviorSubject<ConnectionState> = new BehaviorSubject<ConnectionState>(`OFFLINE`)
    private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionReceiver`, ['transmission'])
    /** Every wrapped message seen by a `getReceivables` subscriber is announced here and handed to `checkMessage`. */
    private onHoldMessage: Subject<WrappedMessage> = new Subject()
    /** First adapter delivered by the adapter manager; undefined until one arrives. */
    private currentAdapter?: ReceiverAdapterInterface
    /** Merged stream of events coming from all attached adapters. */
    private incomingMessage: Subject<GeneralEvent<TransportMessage>> = new Subject()

    /**
     * @param profile Transmission profile; `profile.target` selects which adapters are requested.
     * @param adapterManager Source of receiver adapters for this transmission.
     */
    constructor(profile: TransmissionProfile, adapterManager: AdapterManagerInterface) {
        super()
        this.profile = profile
        this.console.log({ message: `Constructing Receiver Transmission for Receiving target: ${this.profile.target}` })
        this.initializeReceiverComponents(adapterManager)
    }

    /**
     * Streams incoming 'New Message' events.
     *
     * Each event's wrapped payload is first pushed onto `onHoldMessage` and then
     * passed to `checkMessage`; the event is emitted only after that promise
     * resolves (presumably once ordering constraints are satisfied - confirm
     * against `checkMessage`). A rejection is logged and the event is dropped.
     *
     * @returns A cold observable; each subscription attaches its own listener to
     * the internal incoming stream and detaches it on unsubscribe.
     */
    public getReceivables(): Observable<GeneralEvent<TransportMessage>> {
        return new Observable((receivable: Observer<GeneralEvent<TransportMessage>>) => {
            this.console.log({ message: `Transmission streaming messages from ${this.profile.target}` })
            const subscription: Subscription = this.incomingMessage.pipe(
                filter((event: GeneralEvent<any>) => event.event === 'New Message'),
            ).subscribe((event: GeneralEvent<TransportMessage>) => {
                // event.data is the TransportMessage; its payload is the ordering wrapper.
                const wrapped = (event.data as TransportMessage).payload as WrappedMessage
                // Keep this order: announce the message first, then ask checkMessage about it.
                this.onHoldMessage.next(wrapped)
                checkMessage(wrapped, this.onHoldMessage).then(() => {
                    this.console.log({ message: `This one passes. Does have previousID. Case for message ordering` })
                    receivable.next(event)
                }).catch((reason) => {
                    this.console.log({ message: `Observer Error`, details: reason })
                })
            })
            // Clean up on unsubscription
            return () => {
                subscription.unsubscribe()
            }
        })
    }

    /**
     * Records every adapter published by the adapter manager and relays its
     * incoming events. The first adapter becomes `currentAdapter` and drives
     * `connectionStateEvent`. There is currently no logic to switch the current
     * adapter based on performance or any other criterion.
     */
    private initializeReceiverComponents(adapterManager: AdapterManagerInterface): void {
        adapterManager.subscribeForAdapters(this.profile.target, `Receiver`).subscribe((adapter: AdapterInterface) => {
            this.adapters.push(adapter)
            this.console.log({ message: `Adding new ${adapter.getAdapterProfile(`transportType`)} receiving adapter. Current adapter length: ${this.adapters.length}` })

            const receiverAdapter = adapter as ReceiverAdapterInterface
            if (!this.currentAdapter) {
                this.console.log({ message: `Setting this ${adapter.getAdapterProfile(`role`)} as current adapter.` })
                this.currentAdapter = receiverAdapter
                // The subject is passed as the observer, so completion/error of the
                // adapter's state stream propagates to connectionStateEvent as well.
                const connectionState = receiverAdapter.getAdapterProfile(`connectionState`) as Observable<ConnectionState>
                connectionState.subscribe(this.connectionStateEvent)
            }
            // Listen to the adapter that was just added. Re-subscribing to the
            // current adapter here would duplicate its messages and leave the
            // new adapter unheard.
            this.relayIncomingFrom(receiverAdapter)
        })
    }

    /** Forwards everything `adapter` receives into `incomingMessage`, logging each event and any stream error. */
    private relayIncomingFrom(adapter: ReceiverAdapterInterface): void {
        adapter.subscribeForIncoming().subscribe({
            next: (message: GeneralEvent<TransportMessage>) => {
                // Not every event is guaranteed to carry a wrapped FisMessage, so the
                // lookup for the log line must not throw on a missing level.
                const transport = message.data as TransportMessage | undefined
                const wrapped = transport?.payload as WrappedMessage | undefined
                const fisMessage = wrapped?.payload as FisMessage | undefined
                this.console.log({
                    message: `Received ${fisMessage?.header?.messageID ?? `<unknown messageID>`} from ${transport?.target ?? `<unknown target>`}`,
                    details: message
                })
                this.incomingMessage.next(message)
            },
            error: (err) => {
                // Ideally this would trigger a switch to another adapter; for now make the failure visible.
                this.console.log({ message: `Incoming stream error on ${adapter.getAdapterProfile(`transportType`)} adapter`, details: err })
            }
        })
    }
}
|