Selaa lähdekoodia

transmission && adapter bug fix

enzo 1 kuukausi sitten
vanhempi
commit
bccdd0fcdb

+ 1 - 1
src/adapters/adapter.manager.ts

@@ -23,7 +23,7 @@ export class AdapterManager extends AdapterManagerBase {
             const subscription: Subscription = this.event.pipe(
                 // filter(event => event.type === `Transport Event`),
                 // filter(event => event.event === 'New Client' || event.event === `New Server`)
-                filter(event => event.type === `Transmission Event`),
+                filter(event => event.type === `General Event`),
                 filter(event => event.event === `New Transmission`),
             ).subscribe((event: GeneralEvent<TransmissionInterface>) => {
                 let adapters: AdapterInterface[] = this.instantiateAdapterComponents(event.data.clientInfo)

+ 2 - 2
src/adapters/adapter.transmitter.ts

@@ -3,7 +3,7 @@ import { Subject } from 'rxjs';
 import { WrappedMessage } from '../utils/message.ordering';
 import ConsoleLogger from '../utils/log.utils';
 import { AdapterBase } from '../base/adapter.base';
-import { TransmissionRole, Transport, TransportMessage, TransportServiceInterface } from '../interface/interface';
+import { FisMessage, TransmissionRole, Transport, TransportMessage, TransportServiceInterface } from '../interface/interface';
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
@@ -22,7 +22,7 @@ export class TransmitterAdapter extends AdapterBase {
 
     emit(message: WrappedMessage): void {
         // logic here
-        this.console.log({ message: `Emitting: ${message.thisMessageID} to ${this.adapterId}` })
+        this.console.log({ message: `Emitting: ${(message.payload as FisMessage).header.messageID} to ${this.adapterId}` })
         this.transportService.emit({
             id: this.adapterId,
             transport: this.transport,

+ 22 - 13
src/test/receiver.ts

@@ -31,25 +31,34 @@ class Supervisor {
         this.transmissionManager.subscribeForTransmission().pipe(
             filter(event => event.type === `Transmission Event`),
             filter(event => event.event === `New Transmission`)
-        ).subscribe(
-            ((event: GeneralEvent<TransmissionInterface>) => {
-                let transmission: TransmissionInterface = event.data
-                this.console.log({ message: `Acquired transmission set for client ${transmission.clientId}` })
-                this.transmissionSets.push(transmission)
-
-                this.handleActivity(transmission)
-                this.outgoingPipe.subscribe(message => transmission.transmitter.emit(message))
+        ).subscribe((event: GeneralEvent<TransmissionInterface>) => {
+            this.event.next({
+                id: uuidv4(),
+                type: `General Event`,
+                event: `New Transmission`,
+                date: new Date(),
+                data: event.data
             })
-        )
+            let transmission: TransmissionInterface = event.data
+            this.console.log({ message: `Acquired transmission set for client ${transmission.clientId}` })
+            this.transmissionSets.push(transmission)
 
-        // testing
-        this.event.subscribe(event => {
-            this.console.log({ message: `Supervisor Event: ${event.type} && ${event.event}` })
+            this.startMessageTransmission(transmission)
+            this.outgoingPipe.subscribe((message: FisMessage) => {
+                transmission.transmitter.emit(message)
+            })
         })
+
+
+        // testing
+        // this.event.subscribe(event => {
+        //     this.console.log({ message: `Supervisor Event: ${event.type} && ${event.event}` })
+        // })
     }
 
     // only called once for each connected clients.
-    private handleActivity(messageTransmission: TransmissionInterface): void {
+    private startMessageTransmission(messageTransmission: TransmissionInterface): void {
+        this.console.log({ message: `is this transmission even starte?` })
         // start listening to incoming messages from this client
         messageTransmission.receiver.getIncoming().subscribe((event: GeneralEvent<any>) => {
             this.console.log({ message: `General Bus ${event.event} ${(((event.data as TransportMessage)?.payload as WrappedMessage)?.payload as FisMessage)?.header?.messageID ?? 'Not Message'}`, details: event })

+ 3 - 3
src/test/transmitter.ts

@@ -32,9 +32,9 @@ class Supervisor {
         this.startMessageTransmission()
 
         // testing
-        this.event.subscribe(event => {
-            this.console.log({ message: `Supervisor Event: ${event.type} && ${event.event}` })
-        })
+        // this.event.subscribe(event => {
+        //     this.console.log({ message: `Supervisor Event: ${event.type} && ${event.event}` })
+        // })
     }
 
     private startMessageTransmission(): void {

+ 1 - 1
src/transmission/msg.transmission.manager.ts

@@ -34,7 +34,7 @@ export class MessageTransmissionManager extends MessageTransmissionManagerBase {
                 let transmission: TransmissionInterface | undefined = this.instantiateTransmissionComponents(event.data, this.event)
                 this.console.log({ message: `Passing this transmission for client:${transmission.clientId} to global event bus.` })
                 if (transmission) {
-                    this.event.next({
+                    observer.next({
                         id: uuidv4(),
                         type: `Transmission Event`,
                         event: 'New Transmission',

+ 1 - 7
src/transmission/msg.transmission.transmitter.ts

@@ -13,7 +13,6 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
     private connectionStateEvent: BehaviorSubject<ConnectionState> = new BehaviorSubject<ConnectionState>('OFFLINE')
     private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionTransmitter`, ['transmission'])
     private messageToBeBuffered!: Subject<FisMessage | WrappedMessage>
-    private messageToBeTransmitted!: Subject<WrappedMessage>
     private buffer!: RetransmissionService;
     private currentAdapter!: TransmitterAdapterInterface
 
@@ -22,14 +21,10 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
         this.console.log({ message: `Constructing Transmitter Transmission with ${clientId}` })
         this.clientId = clientId
         this.event = event
-        this.messageToBeTransmitted = new Subject()
         this.messageToBeBuffered = new Subject()
         this.buffer = new RetransmissionService()
         this.handleAdapters(this.event.asObservable())
         this.setupBuffer()
-
-        // special case just for http in case of server/client disconnected, the unsent msg will be flushed back into messageToBeBuffered
-       
     }
 
     public emit(message: FisMessage): void {
@@ -62,8 +57,7 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
             // need to work with wrapped messages
             this.console.log({ message: `Releasing ${bufferedMessage.thisMessageID}` });
             if (this.currentAdapter) {
-                // this.currentAdapter.emit(bufferedMessage)
-                this.messageToBeTransmitted.next(bufferedMessage)
+                this.currentAdapter.emit(bufferedMessage)
             } else {
                 this.messageToBeBuffered.next(bufferedMessage)
                 this.console.error({ message: `Adapter is not set. Please ensure adapters are ready.` })

+ 1 - 1
src/utils/general.utils.ts

@@ -1,7 +1,7 @@
 import * as fs from 'fs'
 import path from 'path';
 import ConsoleLogger from './log.utils';
-const console: ConsoleLogger = new ConsoleLogger(`GeneralUtils`, ['base'])
+const console: ConsoleLogger = new ConsoleLogger(`GeneralUtils`, ['util'])
 // Check if filename exists. Return profile information if there's any
 export async function checkOwnClientInfo(filename?: string): Promise<{ id: string }> {
     return new Promise((resolve, reject) => {

+ 2 - 36
src/utils/log.utils.ts

@@ -86,9 +86,9 @@ class ConsoleLogger {
 
         const formattedCategory = categoryStyle(`[${category}]`);
         const formattedClassName = categoryStyle(`${this.className}`);
-        const formattedLocation = locationStyle(`${location}`);
+        const formattedLocation = locationStyle(` ${location}`);
 
-        const formattedMessage = `${formattedClassName} ${formattedLocation}: ${message.message}`;
+        const formattedMessage = `${formattedClassName}${formattedLocation}: ${message.message}`;
         console.log(formattedMessage, message.details ? applyColor([255, 255, 97])(message.details) : '');
 
         if (message.details && this.isCategoryEnabled(["details"])) {
@@ -156,37 +156,3 @@ class ConsoleLogger {
 
 export default ConsoleLogger;
 
-
-// Extract file name and line number using regex
-// const match = callerLine?.match(/\((.*):(\d+):(\d+)\)/);
-// if (match) {
-//     const [_, filePath, line, column] = match;
-//     return `${filePath}:${line}:${column}`;
-// }
-
-// log(message: { message: string, details?: any }): void {
-//     if (!this.isCategoryEnabled(this.categoryPath)) {
-//         return; // Skip logging if the category is disabled
-//     }
-//     const category = this.categoryPath.join(" -> ").toUpperCase();
-//     const location = this.getLogLocation();
-
-//     // Map the primary category to a color
-//     const primaryCategory = this.categoryPath[0];
-//     const categoryStyle = logColors[primaryCategory] || ((text: string) => text); // Default to no style if not found
-//     const formattedCategory = categoryStyle(`[${category}]`); // Apply color to category part
-
-//     // Apply the same color to the className
-//     const formattedClassName = categoryStyle(`${this.className}`);
-
-//     // Format the message
-//     const formattedLocation = logColors.location(`${location}`); // Apply color to location part
-//     const formattedMessage = `${formattedClassName} ${formattedLocation}: ${message.message}`;
-
-//     // Log based on whether it's an error or regular log
-//     console.log(formattedMessage, message.details ? logColors.details(message.details) : '');
-//     // Log details if enabled
-//     if (message.details && this.isCategoryEnabled(["details"])) {
-//         console.log(logColors.details('Details: '), message.details);
-//     }
-// }