123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263 |
- import { filter, Observable, Observer, Subject, Subscription } from 'rxjs';
- import { TransportEvent, TransportMessage } from '../interface/connector.interface';
- import { MessageTransmissionBase } from './msg.transmission.base';
- import { Bus, MessageReceiver as MessageReceiverInterface, ReceiverProfile } from '../interface/transport.interface'
- import { v4 as uuidv4 } from 'uuid'
- import { ReceiverAdapter } from '../connector/adapter.receiver';
- import { checkMessage, WrappedMessage } from '../utils/message.ordering';
- import ConsoleLogger from '../utils/log.utils';
- import { Adapter } from '../connector/adapter.base';
/**
 * Receiving side of a message transmission.
 *
 * Wraps a single receiver adapter (`mainAdapter`) and exposes its
 * 'New Message' events as an observable. Each event is only released to the
 * consumer after `checkMessage` resolves for its payload.
 */
export class MessageTransmissionReceiver extends MessageTransmissionBase implements MessageReceiverInterface {
  private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionReceiver`, ['transmission'])
  /**
   * Every received payload is pushed here before its ordering check runs; the
   * same subject is handed to `checkMessage`.
   * NOTE(review): presumably `checkMessage` listens on it for the predecessor
   * of an out-of-order message — confirm in utils/message.ordering.
   */
  private onHoldMessage: Subject<WrappedMessage> = new Subject()
  // private toBePassedOver: Subject<WrappedMessage> = new Subject()
  receiverProfile!: ReceiverProfile;

  /**
   * @param profile Profile of the receiver this transmission belongs to.
   * @param adapter Adapter used as the source of incoming transport events.
   * @param event Transport event stream stored on the instance as `this.event`.
   */
  constructor(profile: ReceiverProfile, adapter: ReceiverAdapter, event: Observable<TransportEvent>) {
    super()
    this.event = event
    this.console.log({ message: `Constructing Receiver Transmission with ${profile.name}` })
    this.setReceiver(profile)
    this.setUpAdapter(adapter)
  }

  /** Stores the receiver profile on the instance. */
  setReceiver(receiverProfile: ReceiverProfile): void {
    this.receiverProfile = receiverProfile
  }

  /**
   * Builds a cold observable of incoming 'New Message' events for the given bus.
   *
   * Only `Bus.GeneralBus` is handled. For any other bus the returned observable
   * stays open and never emits (a log entry records the request).
   *
   * Errors raised by the adapter's bus are forwarded to the subscriber.
   * Completion of the adapter's bus is not forwarded, because ordering checks
   * may still be pending and completing would drop their events.
   *
   * @param bus Bus to listen on.
   * @returns Observable emitting each event once `checkMessage` has resolved for its payload.
   */
  getMessageBus(bus: Bus): Observable<TransportEvent> {
    this.console.log({ message: `Transmission getting message bus for ${this.receiverProfile.id}` })
    return new Observable((subscriber: Observer<TransportEvent>) => {
      if (bus !== Bus.GeneralBus) {
        // Explicit no-op branch: nothing to subscribe to, hence nothing to tear down.
        this.console.log({ message: `Transmission does not handle bus ${String(bus)}. No events will be emitted.` })
        return undefined
      }

      // Need to merge all the adapters into one when the time comes
      // SAMPLE: This adapterArray.forEach(adapter => { ... })
      const subscription: Subscription = (this.mainAdapter as ReceiverAdapter).getMessageBus(Bus.GeneralBus).pipe(
        filter((event: TransportEvent) => event.event === 'New Message'),
      ).subscribe({
        next: (event: TransportEvent) => {
          // event.data is a TransportMessage whose payload is the wrapped message.
          const wrapped = (event.data as TransportMessage).payload as WrappedMessage
          // Publish first, then check: the order of these two calls is kept as-is.
          this.onHoldMessage.next(wrapped)
          checkMessage(wrapped, this.onHoldMessage).then(() => {
            // Only release the event once the ordering check has resolved.
            this.console.log({ message: `This one passes. Does have previousID. Case for message ordering` })
            subscriber.next(event)
          }).catch((error) => {
            // A rejected check means the event is withheld; it is logged, not rethrown.
            this.console.log({ message: `Observer Error`, details: error })
          })
        },
        // Surface upstream failures to the consumer instead of leaving them unhandled.
        error: (error: unknown) => subscriber.error(error),
      })

      // Clean up on unsubscription
      return () => {
        subscription.unsubscribe()
      }
    })
  }

  /** Registers the adapter that `getMessageBus` reads from. */
  setUpAdapter(adapter: Adapter): void {
    this.mainAdapter = adapter
  }
}
|