Sfoglia il codice sorgente

additional fixes in transmisison and adapter code

enzo 2 mesi fa
parent
commit
a2cf8747ab

+ 2 - 1
src/adapters/adapter.manager.ts

@@ -48,11 +48,12 @@ export class AdapterManager extends AdapterManagerBase {
 
     private connectToExistingTransport(event: Subject<GeneralEvent<any>>): void {
         this.event.pipe(
-            filter(event => event.type === `Transport Event`),
+            filter(event => event.type === `General Event`),
             filter(event => event.event === `Available Transport`)
         ).subscribe((event: GeneralEvent<TransportServiceInterface[]>) => {
             let transportServices: TransportServiceInterface[] = event.data
             transportServices.forEach((transportService: TransportServiceInterface) => {
+                this.console.log({ message: `updating transport in adapter ${transportService.getInfo().transportServiceId}` })
                 this.updateTransportServicesRecord(transportService)
             })
         })

+ 2 - 2
src/adapters/adapter.receiver.ts

@@ -21,12 +21,12 @@ export class ReceiverAdapter extends AdapterBase {
     }
 
     subscribeForIncoming(): Observable<GeneralEvent<any>> {
-        this.console.log({ message: `Connector getting message bus for this connector: ${this.adapterID}` })
+        this.console.log({ message: `Connector getting message bus for this connector: ${this.adapterId}` })
         return new Observable((observable: Observer<GeneralEvent<any>>) => {
             const subscription: Subscription = this.transportService.subscribeForTransportEvent().pipe(
                 filter((message: GeneralEvent<any>) => message.event === 'New Message'),
                 // take message only specific for this adapter. Although that itself wouldn't be necessary, considerng everything goes through transportEvent. I guess it's for better management
-                filter((message: GeneralEvent<TransportMessage>) => (message.data as TransportMessage).target == this.adapterID),
+                filter((message: GeneralEvent<TransportMessage>) => (message.data as TransportMessage).target == this.adapterId),
             ).subscribe((message: GeneralEvent<TransportMessage>) => {
                 this.console.log({ message: `Received ${(((message.data as TransportMessage).payload as WrappedMessage).payload as FisMessage).header.messageID} from ${((message.data as TransportMessage).target)}`, details: message })
                 observable.next({

+ 4 - 4
src/adapters/adapter.transmitter.ts

@@ -21,17 +21,17 @@ export class TransmitterAdapter extends AdapterBase {
 
     emit(message: WrappedMessage): void {
         // logic here
-        this.console.log({ message: `Emitting: ${message.thisMessageID} to ${this.adapterID}` })
+        this.console.log({ message: `Emitting: ${message.thisMessageID} to ${this.adapterId}` })
         this.transportService.emit({
-            id: this.adapterID,
+            id: this.adapterId,
             transport: this.transport,
-            target: this.adapterID, // this should be directed to the channel/client established. Make sure this is right
+            target: this.adapterId, // this should be directed to the channel/client established. Make sure this is right
             payload: message
         } as TransportMessage)
     }
 
     setAdapterProfile(id: string, adapterType: Transport, role: TransmissionRole): void {
-        this.adapterID = id
+        this.adapterId = id
         this.transport = adapterType
         this.role = role
     }

+ 4 - 5
src/base/adapter.base.ts

@@ -6,18 +6,17 @@ dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 So how?: */
 export class AdapterBase implements AdapterInterface {
-    adapterID!: string
-    role!: TransmissionRole
-    transport!: Transport
-
     protected transportService!: TransportServiceInterface
+    adapterId!: string;
+    role!: TransmissionRole;
+    transport: Transport;
 
     constructor() {
         //logic here
     }
 
     setAdapterProfile(id: string, adapterType: Transport, role: TransmissionRole): void {
-        this.adapterID = id
+        this.adapterId = id
         this.transport = adapterType
         this.role = role
     }

+ 1 - 1
src/base/msg.transmission.manager.base.ts

@@ -18,7 +18,7 @@ export class MessageTransmissionManagerBase implements MessageTransmissionManage
         throw new Error('Method not implemented.');
     }
 
-    protected instantiateTransmissionComponents(clientId: string): TransmissionInterface {
+    protected instantiateTransmissionComponents(clientId: string, event: Subject<GeneralEvent<any>>): TransmissionInterface {
         throw new Error(`Method not implemented`)
         /* Subscribe to adapterEvent, adapter will give adapter with the relevant client information. 
         It will first check array of transmission clinetID, so if new clientId is detected then a new 

+ 2 - 1
src/interface/interface.ts

@@ -39,7 +39,8 @@ export interface MessageRequestResponseInterface extends MessageTransmissionInte
 
 /* ADAPTER COMPONENTS */
 export interface AdapterInterface {
-    role: TransmissionRole
+    adapterId: string,
+    role: TransmissionRole,
     transport: Transport
 }
 

+ 1 - 1
src/test/proxy.ts

@@ -11,7 +11,7 @@ let toServer = new Subject<{ event: 'profile' | 'message', payload: any }>()
 
 startSocketServer(3001)
 // startSocketServer(3002)
-// startClientSocketConnection('http://localhost:3000')
+startClientSocketConnection('http://localhost:3000')
 // startHttpServer(3001).then((app: Express) => {
 //     operateHttpServer(app, 'http://localhost:3000/')
 // })

+ 1 - 1
src/test/receiver.ts

@@ -59,7 +59,7 @@ class Supervisor {
         //     complete: () => this.console.log({ message: `Responses Completed for request: ${request.header.messageID}` })
         // })
 
-        // this.startGeneratingRequest(1000, this.outgoingPipe)
+        this.startGeneratingRequest(1000, this.outgoingPipe)
     }
 
     private request(request: FisMessage, messageTransmission: TransmissionInterface): Observable<any> {

+ 21 - 5
src/test/transmitter.ts

@@ -1,4 +1,4 @@
-import { filter, interval, map, Observable, Observer, Subject, take } from "rxjs";
+import { filter, interval, map, Observable, Observer, Subject, Subscription, take } from "rxjs";
 import { v4 as uuidv4 } from 'uuid'
 import { MessageTransmissionManager } from "../transmission/msg.transmission.manager";
 import { WrappedMessage } from "../utils/message.ordering";
@@ -25,6 +25,7 @@ class Supervisor {
         this.transportSet.forEach(set => {
             this.setUpTransportService(set, this.event)
         })
+        this.tieInAdapterWithExistingTransportServices(this.event)
         // so need them adapters now. But supervisor shouldn't be concerned, only messageTransmissionManager and ConnectionManager
         this.messageProducer = new MessageProducer(this.clientIncomingMessage)
         this.transmissionManager = new MessageTransmissionManager(this.event)
@@ -48,7 +49,7 @@ class Supervisor {
         messageTransmission.receiver.getIncoming().subscribe((event: GeneralEvent<TransportMessage>) => {
             let requestMessage: FisMessage = ((event.data as TransportMessage).payload as WrappedMessage).payload as FisMessage
             this.console.log({ message: `General Bus ${requestMessage?.header?.messageID ?? 'Not a message'}`, details: event }) // receiving end
-            this.clientIncomingMessage.next(requestMessage)
+            // this.clientIncomingMessage.next(requestMessage)
             this.messageProducer.getOutgoingMessages().pipe(
                 filter(message => message.header.messageID === requestMessage.header.messageID)
             ).subscribe(message => {
@@ -57,9 +58,9 @@ class Supervisor {
         })
 
         // to emulate general notification. Send every second
-        this.messageProducer.getNotificationMessage().subscribe((message: FisMessage) => {
-            messageTransmission.transmitter.emit(message)
-        })
+        // this.messageProducer.getNotificationMessage().subscribe((message: FisMessage) => {
+        //     messageTransmission.transmitter.emit(message)
+        // })
     }
 
     // Server to be set up as well as acquiring client information if needed. Like in the case for grpc and socket. Http not requ`ired.
@@ -108,6 +109,21 @@ class Supervisor {
         })
         this.console.log({ message: 'TransportSetList', details: this.transportSet })
     }
+
+    private tieInAdapterWithExistingTransportServices(eventBus: Subject<GeneralEvent<any>>): void {
+        const subscription: Subscription = eventBus.pipe(
+            filter(event => event.type === `Adapter Event`),
+            filter(event => event.event === `Adapter Manager Started`)
+        ).subscribe(event => {
+            eventBus.next({
+                id: uuidv4(),
+                type: `General Event`,
+                event: `Available Transport`,
+                date: new Date(),
+                data: this.transportServiceArray
+            })
+        })
+    }
 }
 
 

+ 5 - 18
src/transmission/msg.transmission.manager.ts

@@ -19,8 +19,6 @@ export class MessageTransmissionManager extends MessageTransmissionManagerBase {
         if (browserEnv) this.browserEnv = browserEnv
         this.console.log({ message: `Constructing self...` })
         this.event = event
-        this.event.subscribe((transport: GeneralEvent<TransportServiceInterface>) => {
-        })
         // Subscribe for adapterManager and it's relevent event
         this.adapterManager = new AdapterManager(event)
         this.adapterManager
@@ -32,7 +30,7 @@ export class MessageTransmissionManager extends MessageTransmissionManagerBase {
                 filter(event => event.event == 'New Client')
             ).subscribe(event => {
                 // get all adapters for all the connection
-                let transmission: TransmissionInterface | undefined = this.instantiateTransmissionComponents(event?.data?.clientId)
+                let transmission: TransmissionInterface | undefined = this.instantiateTransmissionComponents(event?.data?.clientId, this.event)
                 if (transmission) {
                     observer.next({
                         id: uuidv4(),
@@ -46,10 +44,10 @@ export class MessageTransmissionManager extends MessageTransmissionManagerBase {
         })
     }
 
-    protected instantiateTransmissionComponents(clientId: string): TransmissionInterface {
-        let receiverInstance: MessageTransmissionReceiver = this.getReceiver(clientId, this.event)
-        let transmitterInstance: MessageTransmissionTransmitter = this.getTransmitter(clientId, this.event)
-        let requestResponseInstance: MessageTransmissionRequestResponse = this.getRequestResponse(clientId, this.event, transmitterInstance, receiverInstance)
+    protected instantiateTransmissionComponents(clientId: string, eventRef: Subject<GeneralEvent<any>>): TransmissionInterface {
+        let receiverInstance: MessageTransmissionReceiver = new MessageTransmissionReceiver(clientId, eventRef)
+        let transmitterInstance: MessageTransmissionTransmitter = new MessageTransmissionTransmitter(clientId, eventRef)
+        let requestResponseInstance: MessageTransmissionRequestResponse = new MessageTransmissionRequestResponse(transmitterInstance, receiverInstance, eventRef)
         let transmissionObj: TransmissionInterface = {
             clientId: clientId,
             transmitter: transmitterInstance,
@@ -61,17 +59,6 @@ export class MessageTransmissionManager extends MessageTransmissionManagerBase {
         return transmissionObj
     }
 
-    private getReceiver(clientId: string, eventObj: Subject<GeneralEvent<any>>): MessageTransmissionReceiver {
-        throw new Error(`Method not defined`)
-    }
-    private getTransmitter(clientId: string, eventObj: Subject<GeneralEvent<any>>): MessageTransmissionTransmitter {
-        throw new Error(`Method not defined`)
-    }
-    private getRequestResponse(clientId: string, eventObj: Subject<GeneralEvent<any>>, transmitter: MessageTransmissionTransmitter, receiver: MessageTransmissionReceiver): MessageTransmissionRequestResponse {
-        throw new Error(`Method not defined`)
-    }
-
-
 }
 
 

+ 24 - 11
src/transmission/msg.transmission.receiver.ts

@@ -1,18 +1,21 @@
-import { filter, Observable, Observer, Subject, Subscription } from 'rxjs';
+import { filter, map, Observable, Observer, Subject, Subscription } from 'rxjs';
 import { v4 as uuidv4 } from 'uuid'
 import { ReceiverAdapter } from '../adapters/adapter.receiver';
 import { checkMessage, WrappedMessage } from '../utils/message.ordering';
 import ConsoleLogger from '../utils/log.utils';
 import { MessageTransmissionBase } from '../base/msg.transmission.base';
-import { GeneralEvent, MessageReceiverInterface, ReceiverAdapterInterface, TransportMessage } from '../interface/interface';
+import { AdapterInterface, GeneralEvent, MessageReceiverInterface, ReceiverAdapterInterface, TransportMessage } from '../interface/interface';
+import { AdapterBase } from '../base/adapter.base';
+import { IncomingMessage } from 'http';
 
 export class MessageTransmissionReceiver extends MessageTransmissionBase implements MessageReceiverInterface {
     private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionReceiver`, ['transmission'])
     private onHoldMessage: Subject<WrappedMessage> = new Subject()
-    private currentAdapter!: ReceiverAdapterInterface
+    private currentAdapter!: ReceiverAdapter
+    private incomingMessage: Subject<GeneralEvent<TransportMessage>> = new Subject()
     // private toBePassedOver: Subject<WrappedMessage> = new Subject()
 
-    constructor(clientId: string, event:  Subject<GeneralEvent<any>>) {
+    constructor(clientId: string, event: Subject<GeneralEvent<any>>) {
         super()
         this.clientId = clientId
         this.event = event
@@ -23,10 +26,7 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
     getIncoming(): Observable<GeneralEvent<TransportMessage>> {
         this.console.log({ message: `Transmission getting message bus for ${this.clientId}` })
         return new Observable((observable: Observer<GeneralEvent<any>>) => {
-            // logic here
-            // Need to merge all the adapters into one when the time comes 
-            // SAMPLE: This adapterArray.forEach(adapter => { ... })
-            const subscription: Subscription = this.currentAdapter.subscribeForIncoming().pipe(
+            const subscription: Subscription = this.incomingMessage.pipe(
                 filter((event: GeneralEvent<any>) => event.event == 'New Message'),
             ).subscribe((event: GeneralEvent<TransportMessage>) => {
                 // console.log(event) // data is transportMessage instead of eventmessage
@@ -39,18 +39,31 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
                 }).catch((error) => {
                     this.console.log({ message: `Observer Error`, details: error })
                 })
-            });
+            })
 
             // Clean up on unsubscription
             return () => {
                 subscription.unsubscribe();
             };
-
         })
     }
 
     private handleAdapterEvent(adapterEvent: Observable<GeneralEvent<any>>): void {
-
+        const subscription: Subscription = adapterEvent.pipe(
+            filter(event => event.type === `Adapter Event`),
+            filter(event => event.event === `New Adapter`),
+            map(event => {
+                return event.data
+            })
+        ).subscribe((adapter: AdapterInterface) => {
+            if (adapter.role == `Receiver` && !this.adapters.some(adapterObj => adapterObj.adapterId === adapter.adapterId)) {
+                this.adapters.push(adapter)
+                this.currentAdapter = adapter as ReceiverAdapter
+                this.currentAdapter.subscribeForIncoming().subscribe(this.incomingMessage)
+            } else {
+                this.console.error({ message: `Adapter ID: ${adapter.adapterId} already existed.` })
+            }
+        })
     }
 
 }

+ 33 - 15
src/transmission/msg.transmission.transmitter.ts

@@ -12,7 +12,8 @@ connectors or adapters will have their own identifier*/
 export class MessageTransmissionTransmitter extends MessageTransmissionBase implements MessageTransmitterInterface {
     private connectionStateEvent: BehaviorSubject<ConnectionState> = new BehaviorSubject<ConnectionState>('OFFLINE')
     private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionTransmitter`, ['transmission'])
-    private messageToBeTransmitted!: Subject<FisMessage | WrappedMessage>
+    private messageToBeBuffered!: Subject<FisMessage | WrappedMessage>
+    private messageToBeTransmitted!: Subject<WrappedMessage>
     private buffer!: RetransmissionService;
     private currentAdapter!: TransmitterAdapterInterface
 
@@ -21,17 +22,18 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
         this.console.log({ message: `Constructing Transmitter Transmission with ${clientId}` })
         this.event = event
         this.messageToBeTransmitted = new Subject()
+        this.messageToBeBuffered = new Subject()
         this.buffer = new RetransmissionService()
         this.handleAdapters(this.event)
         this.setupBuffer()
 
-        // special case just for http in case of server/client disconnected, the unsent msg will be flushed back into messageToBeTransmitted
+        // special case just for http in case of server/client disconnected, the unsent msg will be flushed back into messageToBeBuffered
         // logic here
     }
 
     public emit(message: FisMessage): void {
         this.console.log({ message: `${this.connectionStateEvent.getValue() == 'ONLINE' ? `Transmitting message` : `Buffering message`}` })
-        this.messageToBeTransmitted.next(message)
+        this.messageToBeBuffered.next(message)
     }
 
     private setupBuffer(): void {
@@ -52,25 +54,22 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
             if (signal == 'OFFLINE') this.console.error({ message: `${this.clientId} disconnected` })
             if (signal == 'ONLINE') this.console.log({ message: `${this.clientId} connected` })
         })
-        this.buffer.implementRetransmission(this.messageToBeTransmitted, this.connectionStateEvent.asObservable(), true)
+        this.buffer.implementRetransmission(this.messageToBeBuffered, this.connectionStateEvent.asObservable(), true)
         // automatically subscribe to allow released bffered messages to be released
         this.buffer.returnSubjectForBufferedItems().subscribe((bufferedMessage: WrappedMessage) => {
             // need to work with wrapped messages
             this.console.log({ message: `Releasing ${bufferedMessage.thisMessageID}` });
             if (this.currentAdapter) {
-                this.currentAdapter.emit(bufferedMessage)
-            } else {
+                // this.currentAdapter.emit(bufferedMessage)
                 this.messageToBeTransmitted.next(bufferedMessage)
+            } else {
+                this.messageToBeBuffered.next(bufferedMessage)
                 this.console.error({ message: `Adapter is not set. Please ensure adapters are ready.` })
             }
         })
     }
 
-    private handleAdapters(adaptersEvent: Observable<GeneralEvent<any>>): void {
-        this.handleNewAdapters(adaptersEvent)
-    }
-
-    private handleNewAdapters(adaptersEvent: Observable<GeneralEvent<any>>): void {
+    private handleAdapters(adaptersEvent: Subject<GeneralEvent<any>>): void {
         adaptersEvent.pipe(
             filter(event => event.event === `New Adapter`),
             map(event => { return event.data }),
@@ -79,15 +78,34 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
                 adapters.forEach((adapter: AdapterInterface) => {
                     if (adapter.role === `Transmitter`) {
                         this.adapters.push(adapter as TransmitterAdapterInterface)
+                        adaptersEvent.next({
+                            id: uuidv4(),
+                            type: 'Transmission Event',
+                            event: `New Adapter`,
+                            date: new Date(),
+                            data: adapter,
+                            transport: adapter.transport
+                        })
                     }
                 })
-                this.setUpAdapter()
             },
             error: error => this.console.error({ message: 'Observer Error', details: error })
         })
-    }
 
-    private handleAdaptersTermination(adaptersEvent: Observable<GeneralEvent<any>>): void {
+        // listen to newly added adapters in transmission
+        adaptersEvent.pipe(
+            filter(event => event.type === `Transmission Event`),
+            filter(event => event.event === `New Adapter`),
+            map(event => {
+                return event.data
+            })
+        ).subscribe((adapter: AdapterInterface) => {
+            if (!this.currentAdapter) {
+                this.currentAdapter = adapter as TransmitterAdapterInterface
+            } else {
+                this.console.log({ message: `Already have existing transmitting adapter. Currently hardcode to use only 1` })
+            }
+        })
     }
 
     // temporary logic for now. 
@@ -105,7 +123,7 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
             filter(event => event.data.clientId == this.clientId),
         ).subscribe((event: GeneralEvent<any>) => {
             this.console.log({ message: `${this.connectionStateEvent.getValue() == 'ONLINE' ? `Transmitting ${((event.data.payload as TransportMessage).payload as WrappedMessage).thisMessageID}` : `Buffering ${((event.data.payload as TransportMessage).payload as WrappedMessage).thisMessageID}`}` })
-            this.messageToBeTransmitted.next(((event.data.payload as TransportMessage).payload as WrappedMessage))
+            this.messageToBeBuffered.next(((event.data.payload as TransportMessage).payload as WrappedMessage))
         })
     }
 

+ 2 - 1
src/utils/general.utils.ts

@@ -1,6 +1,7 @@
 import * as fs from 'fs'
 import path from 'path';
-
+import ConsoleLogger from './log.utils';
+const console: ConsoleLogger = new ConsoleLogger(`GeneralUtils`, ['base'])
 // Check if filename exists. Return profile information if there's any
 export async function checkOwnClientInfo(filename?: string): Promise<{ id: string }> {
     return new Promise((resolve, reject) => {