import { filter, interval, map, Observable, Observer, Subject, Subscription, take, takeWhile } from "rxjs";
import dotenv from 'dotenv';
import { Bus, FisMessage, MessageTransmission, TransmissionMessage } from "../interface/transport.interface";
import { v4 as uuidv4 } from 'uuid'
import { MessageTransmissionManager } from "../transmission/msg.transmission.manager";
import { TransportEvent } from "../interface/connector.interface";

/** Payload carried by the last message of every generated stream. */
const COMPLETE_DATA = `Complete`

/**
 * Connects a {@link MessageProducer} to every client transmission announced by
 * the {@link MessageTransmissionManager}.
 *
 * For each announced transmission the supervisor listens on the client's
 * general bus and relays producer output back through the client's transmitter.
 */
class Supervisor {
    /** Requests meant for the producer (see the review note in handleClientActivity). */
    private readonly clientIncomingMessage: Subject<FisMessage> = new Subject<FisMessage>()
    private readonly messageProducer: MessageProducer
    private readonly transmissionManager: MessageTransmissionManager
    // NOTE(review): the element type is assumed to be TransportEvent because that
    // import is otherwise unused in this file — confirm against the
    // MessageTransmissionManager constructor signature.
    private readonly event: Subject<TransportEvent> = new Subject<TransportEvent>()
    /** Every transmission received from the manager so far. */
    private readonly transmissionSets: MessageTransmission[] = []

    constructor() {
        // so need them adapters now. But supervisor shouldn't be concerned, only messageTransmissionManager and ConnectionManager
        this.messageProducer = new MessageProducer(this.clientIncomingMessage)
        this.transmissionManager = new MessageTransmissionManager(this.event)

        this.transmissionManager.subscribe().subscribe((transmissionSet: MessageTransmission) => {
            this.transmissionSets.push(transmissionSet)
            this.handleClientActivity(transmissionSet)
        })
    }

    /**
     * Starts relaying messages for one client. Only called once for each
     * connected client.
     *
     * @param messageTransmission receiver/transmitter pair of the client
     */
    private handleClientActivity(messageTransmission: MessageTransmission): void {
        // start listening to incoming messages from this client
        messageTransmission.receiver.getMessageBus(Bus.GeneralBus).subscribe((event: TransmissionMessage) => {
            console.log(event)
            const requestMessage: FisMessage = event.payload
            // NOTE(review): forwarding the request to the producer is disabled, so
            // the producer never sees client requests — confirm whether this is intended.
            // this.clientIncomingMessage.next(event.payload as FisMessage)
            this.messageProducer.getOutgoingMessages().pipe(
                filter(message => message.header.messageID === requestMessage.header.messageID),
                // The producer closes each request with a `Complete` message that
                // carries the request id. Stop listening once it has been forwarded
                // (inclusive) so that per-request subscriptions do not pile up on
                // the shared outgoing bus.
                takeWhile(message => message.data !== COMPLETE_DATA, true)
            ).subscribe(message => {
                messageTransmission.transmitter.emit(message)
            })
        })

        // to emulate general notification. Send every second
        // NOTE(review): this subscription is never torn down; SOURCE shows no
        // client-disconnect signal to hook the cleanup onto.
        this.messageProducer.getNotificationMessage().subscribe((message: FisMessage) => {
            messageTransmission.transmitter.emit(message)
        })
    }
}

/**
 * Emulated application: produces a notification every second and answers each
 * request arriving on the incoming bus with a burst of generated messages.
 */
class MessageProducer {
    private readonly generalNotification: Subject<FisMessage> = new Subject<FisMessage>()
    private readonly incomingMessageBus: Subject<FisMessage>
    private readonly outgoingMessageBus: Subject<FisMessage> = new Subject<FisMessage>()

    /**
     * @param incomingMessageBus subject on which requests are delivered
     */
    constructor(incomingMessageBus: Subject<FisMessage>) {
        console.log(`Contructing Application....`)
        this.incomingMessageBus = incomingMessageBus

        this.generateNotification().subscribe(this.generalNotification)
        this.handleIncomingRequests(this.incomingMessageBus, this.outgoingMessageBus)
    }

    /** @returns the stream of periodic notification messages */
    public getNotificationMessage(): Observable<FisMessage> {
        return this.generalNotification.asObservable()
    }

    /** @returns the stream of all messages produced in response to requests */
    public getOutgoingMessages(): Observable<FisMessage> {
        return this.outgoingMessageBus.asObservable()
    }

    /**
     * For every request, generates 10 messages onto the outgoing bus, followed
     * by the generator's own `Complete` message and then a `Complete` message
     * that carries the request's messageID.
     *
     * NOTE(review): the generated messages use fresh random ids, so a consumer
     * filtering on the request id only ever sees the final `Complete` message —
     * confirm whether they should reuse the request id.
     *
     * @param requests subject delivering incoming requests
     * @param outgoingMessageBus subject receiving the generated responses
     */
    private handleIncomingRequests(requests: Subject<FisMessage>, outgoingMessageBus: Subject<FisMessage>): void {
        requests.subscribe(request => {
            this.generateMessage(10).subscribe({
                next: message => outgoingMessageBus.next(message),
                error: error => console.error(error),
                complete: () => {
                    outgoingMessageBus.next(
                        this.buildMessage(`ResponseMessage`, COMPLETE_DATA, request.header.messageID)
                    )
                }
            })
        })
    }

    /**
     * Emits one `ResponseMessage` per second, `amount` times, then one extra
     * `ResponseMessage` whose data is `Complete`, and completes.
     *
     * @param amount number of data messages emitted before the closing message
     * @returns a cold observable; unsubscribing stops the underlying timer
     */
    private generateMessage(amount: number): Observable<FisMessage> {
        return new Observable<FisMessage>((response: Observer<FisMessage>) => {
            const subscription = interval(1000).pipe(
                take(amount), // Ensures only 'amount' messages are generated
                map(() => this.buildMessage('ResponseMessage', `Data`))
            ).subscribe({
                next: message => response.next(message),
                error: error => response.error(error),
                complete: () => {
                    response.next(this.buildMessage('ResponseMessage', COMPLETE_DATA))
                    response.complete()
                }
            })

            // Ensure cleanup on unsubscribe
            return () => subscription.unsubscribe()
        })
    }

    /**
     * Emits one `NotificationMessage` every second, indefinitely.
     *
     * `interval` never completes on its own, so no closing message is needed;
     * unsubscribing from the returned observable stops the timer.
     */
    private generateNotification(): Observable<FisMessage> {
        return interval(1000).pipe(
            map(() => this.buildMessage('NotificationMessage', `Data`))
        )
    }

    /**
     * Builds a message with the given name and data.
     *
     * @param messageName value for `header.messageName`
     * @param data value for `data`
     * @param messageID value for `header.messageID`; a fresh UUID v4 by default
     */
    private buildMessage(
        messageName: FisMessage['header']['messageName'],
        data: FisMessage['data'],
        messageID: FisMessage['header']['messageID'] = uuidv4()
    ): FisMessage {
        const message: FisMessage = {
            header: { messageID, messageName },
            data
        }
        return message
    }
}

const supervisor = new Supervisor()