123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- import { filter, interval, map, Observable, Observer, Subject, take } from "rxjs";
- import { Bus, EventMessage, FisMessage, MessageTransmission, TransmissionMessage } from "../interface/transport.interface";
- import { v4 as uuidv4 } from 'uuid'
- import { MessageTransmissionManager } from "../transmission/msg.transmission.manager";
- import { error } from "console";
- import e from "express";
- import { TransportEvent } from "../interface/connector.interface";
- /* These are the purple fonts. They interact with the blue fonts to set up the credentials needed to establish the necessary roles.
- Assume the primary role is server: that means we need a transmitter and multiple connected receiver profiles. */
/**
 * Sample application layered on top of the message transmission manager.
 *
 * On construction it obtains a transmission instance from a new
 * `MessageTransmissionManager` and starts a one-second notification ticker that
 * is multicast through `generalNotification`.
 */
class Application {
  messageTransmissionManager: MessageTransmissionManager;
  transmissionInstance: MessageTransmission;
  /** Hot stream of periodically generated messages (one per second, see `generateNotification`). */
  generalNotification: Subject<FisMessage> = new Subject();

  constructor() {
    this.messageTransmissionManager = new MessageTransmissionManager();
    this.transmissionInstance = this.messageTransmissionManager.getTransmissionInstance();
    // The subject acts as the observer, so every generated notification is multicast to its subscribers.
    this.generateNotification().subscribe(this.generalNotification);
  }

  /**
   * Emulates request/response, for the case where this transmitter is acting as a receiver.
   *
   * NOTE(review): still a placeholder. The returned observable never emits,
   * errors or completes, and `message` is not used yet.
   *
   * @param message Request message (currently unused).
   * @returns An observable intended to carry the responses.
   */
  send(message: FisMessage): Observable<FisMessage> {
    return new Observable<FisMessage>(() => {
      // logic here
    });
  }

  /**
   * Transmission only: hands `message` to the transmitter, addressed to `adapterId`.
   *
   * @param message Payload to transmit.
   * @param adapterId Identifier of the target adapter.
   */
  emit(message: FisMessage, adapterId: string): void {
    this.transmissionInstance.transmitter.emit({
      adapterId: adapterId, // this should match the request ID??
      payload: message,
    });
  }

  /**
   * Receiving only: listens on the general bus.
   *
   * For every incoming transmission message this triggers `appProcess` for the
   * originating adapter and forwards the payload to the subscriber. Errors and
   * completion of the bus are propagated, and unsubscribing from the returned
   * observable also releases the underlying bus subscription.
   *
   * @returns A cold observable of the payloads received on the general bus.
   */
  subscribe(): Observable<FisMessage> {
    return new Observable<FisMessage>((observer) => {
      const busSubscription = this.transmissionInstance.receiver
        .getMessageBus(Bus.GeneralBus)
        .subscribe({
          next: (message: TransmissionMessage) => {
            this.appProcess(message.adapterId, message.payload);
            observer.next(message.payload);
          },
          error: (err: unknown) => observer.error(err),
          complete: () => observer.complete(),
        });
      // Teardown: without this the bus subscription would outlive the consumer.
      return () => busSubscription.unsubscribe();
    });
  }

  /**
   * Misspelled legacy name kept so existing callers keep working.
   *
   * @deprecated Use `subscribe()` instead.
   */
  susbcribe(): Observable<FisMessage> {
    return this.subscribe();
  }

  /**
   * No request needed, auto broadcast: whenever a 'New Adapter' transport event
   * is seen, every subsequent general notification is emitted to that adapter.
   *
   * NOTE(review): the per-adapter subscription to `generalNotification` is never
   * released; presumably it should be torn down when the adapter goes away —
   * confirm which transport event signals that.
   */
  subscribeForNewClientWhoWantsNotification(): void {
    this.transmissionInstance.event
      .pipe(filter((obj) => obj.event === 'New Adapter'))
      .subscribe((event: TransportEvent) => {
        const adapterId = (event.data as EventMessage).adapterId;
        this.generalNotification.subscribe((notification: FisMessage) => {
          this.emit(notification, adapterId);
        });
      });
  }

  /**
   * Produces the responses for a request and emits each one to `adapterId`.
   * Just assumes that the provider will provide 10 response messages
   * (followed by the closing message added by `generateMessage`).
   *
   * @param adapterId Adapter that receives the generated responses.
   * @param message Incoming request; not inspected at the moment.
   */
  appProcess(adapterId: string, message: FisMessage): void {
    this.generateMessage(10).subscribe({
      next: (response: FisMessage) => {
        this.emit(response, adapterId);
      },
      // Named `err` so it does not shadow the `error` imported from "console".
      error: (err: unknown) => console.error(err),
      complete: () => console.log(`All responses generated completed and passed into adapter: ${adapterId}`),
    });
  }

  /** Builds a message with a fresh UUID as `messageID`. */
  private buildMessage(messageName: string, data: string): FisMessage {
    return {
      header: {
        messageID: uuidv4(),
        messageName: messageName,
      },
      data: data,
    };
  }

  /**
   * Emits one 'ResponseMessage' per second, `amount` times, then a final
   * 'ResponseMessage' whose data is 'Complete', and completes.
   *
   * @param amount Number of data messages generated before the closing message.
   */
  private generateMessage(amount: number): Observable<FisMessage> {
    return new Observable<FisMessage>((subscriber) => {
      const ticker = interval(1000).pipe(
        take(amount), // Ensures only 'amount' messages are generated
        map(() => this.buildMessage('ResponseMessage', 'Data')),
      );
      const subscription = ticker.subscribe({
        next: (generated) => subscriber.next(generated),
        error: (err: unknown) => subscriber.error(err),
        complete: () => {
          // Closing marker so the receiver can tell that the sequence has ended.
          subscriber.next(this.buildMessage('ResponseMessage', 'Complete'));
          subscriber.complete();
        },
      });
      // Ensure cleanup on unsubscribe
      return () => subscription.unsubscribe();
    });
  }

  /**
   * Emits one message per second, indefinitely.
   *
   * `interval` never completes, so the former completion branch (which sent a
   * closing 'NotificationMessage') could never run and has been removed.
   *
   * NOTE(review): the ticks are named 'ResponseMessage'; presumably
   * 'NotificationMessage' was intended — confirm before changing, receivers may
   * match on the name.
   */
  private generateNotification(): Observable<FisMessage> {
    return interval(1000).pipe(
      map(() => this.buildMessage('ResponseMessage', 'Data')),
    );
  }
}
// Create the application when this module loads; its constructor obtains the transmission instance and starts the notification stream.
const application: Application = new Application();
|