import { filter, interval, map, Observable, Observer, Subject, take } from "rxjs"; import { Bus, EventMessage, FisMessage, MessageTransmission, TransmissionMessage } from "../interface/transport.interface"; import { v4 as uuidv4 } from 'uuid' import { MessageTransmissionManager } from "../transmission/msg.transmission.manager"; import { error } from "console"; import e from "express"; import { TransportEvent } from "../interface/connector.interface"; /* These are the purple fonts. Gonna interact with blue fonts to set up the credentials to establish the necessary roles. Assuming the primary role is server. That means we will need transmitter and multiple receiver profiles that are connected. */ class Application { messageTransmissionManager: MessageTransmissionManager transmissionInstance!: MessageTransmission generalNotification: Subject = new Subject() constructor() { this.messageTransmissionManager = new MessageTransmissionManager() this.transmissionInstance = this.messageTransmissionManager.getTransmissionInstance() this.generateNotifcation().subscribe(this.generalNotification) } // Emulating request response. For the case where this transmitter is acting as a receiver send(message: FisMessage): Observable { return new Observable((response) => { // logic here }) } // Transmission only emit(message: FisMessage, adapterId: string): void { this.transmissionInstance.transmitter.emit({ adapterId: adapterId, // this should mqatch the request ID?? payload: message }) } // Receiving only susbcribe(): Observable { return new Observable((observer: Observer) => { this.transmissionInstance.receiver.getMessageBus(Bus.GeneralBus).subscribe((message: TransmissionMessage) => { // logic here this.appProcess(message.adapterId, message.payload) }) }) } // no request needed, auto broadcast subscribeForNewClientWhoWantsNotification(): void { this.transmissionInstance.event.pipe( filter(obj => obj.event == 'New Adapter') ).subscribe((event: TransportEvent) => { this.generalNotification.subscribe((message: FisMessage) => { this.emit(message, (event.data as EventMessage).adapterId) }) }) } // just assume that the provide will provide 10 responses messages appProcess(adapterId: string, message: FisMessage): void { this.generateMessage(10).subscribe({ next: (message: FisMessage) => { this.emit(message, adapterId) }, error: error => console.error(error), complete: () => console.log(`All responses generated completed and passed into adapter: ${adapterId}`) }) } private generateMessage(amount: number): Observable { return new Observable((response: Observer) => { const intervalMessageGeneration = interval(1000).pipe( take(amount), // Ensures only 'amount' messages are generated map(() => { const message: FisMessage = { header: { messageID: uuidv4(), messageName: 'ResponseMessage' }, data: `Data` }; return message; }) ); const subscription = intervalMessageGeneration.subscribe({ next: message => response.next(message), error: error => response.error(error), complete: () => { response.next({ header: { messageID: uuidv4(), messageName: 'ResponseMessage' }, data: `Complete` }); response.complete(); } }); // Ensure cleanup on unsubscribe return () => subscription.unsubscribe(); }); } private generateNotifcation(): Observable { return new Observable((response: Observer) => { const intervalMessageGeneration = interval(1000).pipe( map(() => { const message: FisMessage = { header: { messageID: uuidv4(), messageName: 'ResponseMessage' }, data: `Data` }; return message; }) ); const subscription = intervalMessageGeneration.subscribe({ next: message => response.next(message), error: error => response.error(error), complete: () => { response.next({ header: { messageID: uuidv4(), messageName: 'NotificationMessage' }, data: `Complete` }); response.complete(); } }); // Ensure cleanup on unsubscribe return () => subscription.unsubscribe(); }); } } const application = new Application()