123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- import { filter, interval, map, Observable, Observer, Subject, Subscription, take } from "rxjs";
- import dotenv from 'dotenv';
- import { Bus, FisMessage, MessageTransmission, TransmissionMessage } from "../interface/transport.interface";
- import { v4 as uuidv4 } from 'uuid'
- import { MessageTransmissionManager } from "../transmission/msg.transmission.manager";
- import { TransportEvent } from "../interface/connector.interface";
/**
 * Connects every client announced by the `MessageTransmissionManager` to the
 * `MessageProducer`: client requests are forwarded to the producer, and the
 * producer's responses and notifications are emitted back to the client.
 */
class Supervisor {
    /** Requests received from clients; handed to the `MessageProducer` as its input bus. */
    private readonly clientIncomingMessage: Subject<FisMessage> = new Subject()
    private readonly messageProducer: MessageProducer
    private readonly transmissionManager: MessageTransmissionManager
    /** Transport event stream passed to the transmission manager. */
    private readonly event: Subject<TransportEvent> = new Subject()
    /** Every transmission set received from the transmission manager so far. */
    private readonly transmissionSets: MessageTransmission[] = []

    constructor() {
        // so need them adapters now. But supervisor shouldn't be concerned, only messageTransmissionManager and ConnectionManager
        this.messageProducer = new MessageProducer(this.clientIncomingMessage)
        this.transmissionManager = new MessageTransmissionManager(this.event)
        this.transmissionManager.subscribe().subscribe((transmissionSet: MessageTransmission) => {
            this.transmissionSets.push(transmissionSet)
            this.handleClientActivity(transmissionSet)
        })
    }

    /**
     * Starts serving one client. Invoked once for each transmission set
     * emitted by the transmission manager.
     *
     * All subscriptions opened for the client are collected in a single
     * `Subscription` and released together when the client's general bus
     * completes or errors.
     *
     * @param messageTransmission receiver/transmitter pair of the client
     */
    private handleClientActivity(messageTransmission: MessageTransmission): void {
        const clientSubscriptions = new Subscription()

        // start listening to incoming messages from this client
        clientSubscriptions.add(
            messageTransmission.receiver.getMessageBus(Bus.GeneralBus).subscribe({
                next: (event: TransmissionMessage) => {
                    console.log(event)
                    const requestMessage: FisMessage = event.payload

                    // Listen for the responses correlated to this request. The producer
                    // ends a response with a message whose data is 'Complete', which is
                    // the signal to release this per-request subscription.
                    const responseSubscription: Subscription = this.messageProducer.getOutgoingMessages().pipe(
                        filter(message => message.header.messageID === requestMessage.header.messageID)
                    ).subscribe(message => {
                        messageTransmission.transmitter.emit(message)
                        if (message.data === 'Complete') {
                            responseSubscription.unsubscribe()
                        }
                    })
                    clientSubscriptions.add(responseSubscription)

                    // Forward the request only after the response listener is attached,
                    // so no response can be missed.
                    this.clientIncomingMessage.next(requestMessage)
                },
                error: (error: unknown) => {
                    console.error(error)
                    clientSubscriptions.unsubscribe()
                },
                complete: () => clientSubscriptions.unsubscribe()
            })
        )

        // to emulate general notification. Send every second
        clientSubscriptions.add(
            this.messageProducer.getNotificationMessage().subscribe((message: FisMessage) => {
                messageTransmission.transmitter.emit(message)
            })
        )
    }
}
/**
 * Demo message source.
 *
 * - Publishes a `NotificationMessage` every second on the notification stream.
 * - For every request received on the incoming bus, publishes a burst of
 *   `ResponseMessage`s on the outgoing bus, closed by a message whose data is
 *   `Complete`. Every message of the burst carries the request's `messageID`,
 *   so consumers can match responses to the request by filtering on that id.
 */
class MessageProducer {
    private readonly generalNotification: Subject<FisMessage> = new Subject()
    private readonly incomingMessageBus: Subject<FisMessage>
    private readonly outgoingMessageBus: Subject<FisMessage> = new Subject()

    /**
     * @param incomingMessageBus bus on which client requests arrive
     */
    constructor(incomingMessageBus: Subject<any>) {
        console.log(`Contructing Application....`)
        this.incomingMessageBus = incomingMessageBus
        this.generateNotifcation().subscribe(this.generalNotification)
        this.handleIncomingRequests(this.incomingMessageBus, this.outgoingMessageBus)
    }

    /** @returns the stream of notification messages (one per second). */
    public getNotificationMessage(): Observable<FisMessage> {
        return this.generalNotification.asObservable()
    }

    /** @returns the stream of response messages produced for all requests. */
    public getOutgoingMessages(): Observable<FisMessage> {
        return this.outgoingMessageBus.asObservable()
    }

    /**
     * Answers each request with ten data messages followed by one `Complete`
     * message, all stamped with the request's `messageID`.
     *
     * @param requests bus delivering the requests
     * @param outgoingMessageBus bus receiving the generated responses
     */
    private handleIncomingRequests(requests: Subject<FisMessage>, outgoingMessageBus: Subject<FisMessage>): void {
        requests.subscribe(request => {
            // The generator already closes the burst with a 'Complete' message, so
            // nothing extra is published when it completes.
            this.generateMessage(10, request.header.messageID).subscribe({
                next: message => outgoingMessageBus.next(message),
                error: error => console.error(error)
            })
        })
    }

    /**
     * Emits one `ResponseMessage` per second, `amount` times, then a final
     * `ResponseMessage` whose data is `Complete`, and completes.
     *
     * @param amount number of data messages emitted before the final one
     * @param messageID id stamped on every emitted message; when omitted each
     *                  message gets its own freshly generated UUID
     */
    private generateMessage(amount: number, messageID?: FisMessage['header']['messageID']): Observable<FisMessage> {
        return new Observable((response: Observer<FisMessage>) => {
            const subscription = interval(1000).pipe(
                take(amount), // Ensures only 'amount' messages are generated
                map(() => this.createMessage('ResponseMessage', 'Data', messageID))
            ).subscribe({
                next: message => response.next(message),
                error: error => response.error(error),
                complete: () => {
                    response.next(this.createMessage('ResponseMessage', 'Complete', messageID))
                    response.complete()
                }
            })
            // Ensure cleanup on unsubscribe
            return () => subscription.unsubscribe()
        })
    }

    /**
     * Emits a `NotificationMessage` with a fresh UUID every second. The
     * underlying interval never completes, so the stream runs until the
     * subscriber unsubscribes.
     */
    private generateNotifcation(): Observable<FisMessage> {
        return interval(1000).pipe(
            map(() => this.createMessage('NotificationMessage', 'Data'))
        )
    }

    /** Builds a message; a new UUID is generated when no `messageID` is supplied. */
    private createMessage(
        messageName: FisMessage['header']['messageName'],
        data: FisMessage['data'],
        messageID: FisMessage['header']['messageID'] = uuidv4()
    ): FisMessage {
        return {
            header: { messageID, messageName },
            data
        }
    }
}
// Application entry point: constructing the Supervisor starts the message
// producer and begins listening for client transmissions. The binding is
// never reassigned, hence `const`.
const supervisor = new Supervisor()
|