ソースを参照

general request response fix

enzo 1 ヶ月 前
コミット
b23c530022

+ 2 - 1
src/adapters/adapter.receiver.ts

@@ -24,7 +24,8 @@ export class ReceiverAdapter extends AdapterBase {
     subscribeForIncoming(): Observable<GeneralEvent<any>> {
         this.console.log({ message: `Connector getting message bus for this connector: ${this.adapterId}` })
         return new Observable((observable: Observer<GeneralEvent<any>>) => {
-            const subscription: Subscription = this.transportService.subscribeForTransportEvent().pipe(
+            const subscription: Subscription = this.transportService.subscribeForEvent().pipe(
+                filter(event => event.type === `Transport Event`), 
                 filter((message: GeneralEvent<any>) => message.event === 'New Message'),
                 // take message only specific for this adapter. Although that itself wouldn't be necessary, considerng everything goes through transportEvent. I guess it's for better management
                 filter((message: GeneralEvent<TransportMessage>) => (message.data as TransportMessage).target == this.adapterId),

+ 1 - 1
src/interface/interface.ts

@@ -110,7 +110,7 @@ export type ConnectionState = 'ONLINE' | 'OFFLINE'
 export interface TransportServiceInterface {
     getInfo(): TransportServiceProfile
     emit(message: TransportMessage): void
-    subscribeForTransportEvent(): Observable<GeneralEvent<any>> //all messages and whatever event will go through this, easier to implemnet across different transport protocol
+    subscribeForEvent(): Observable<GeneralEvent<any>> //all messages and whatever event will go through this, easier to implemnet across different transport protocol
 }
 
 export interface TransportServiceProfile {

+ 19 - 18
src/test/receiver.ts

@@ -28,6 +28,14 @@ class Supervisor {
         this.tieInAdapterWithExistingTransportServices(this.event)
 
         this.transmissionManager = new MessageTransmissionManager(this.event, this.isClient)
+        this.startMessageTransmission()
+        // testing
+        // this.event.subscribe(event => {
+        //     this.console.log({ message: `Supervisor Event: ${event.type} && ${event.event}` })
+        // })
+    }
+
+    private startMessageTransmission(): void {
         this.transmissionManager.subscribeForTransmission().pipe(
             filter(event => event.type === `Transmission Event`),
             filter(event => event.event === `New Transmission`)
@@ -43,28 +51,21 @@ class Supervisor {
             this.console.log({ message: `Acquired transmission set for client ${transmission.clientId}` })
             this.transmissionSets.push(transmission)
 
-            this.startMessageTransmission(transmission)
-            this.outgoingPipe.subscribe((message: FisMessage) => {
-                transmission.transmitter.emit(message)
-            })
+            this.handleClientActivity(transmission)
         })
-
-
-        // testing
-        // this.event.subscribe(event => {
-        //     this.console.log({ message: `Supervisor Event: ${event.type} && ${event.event}` })
-        // })
     }
-
     // only called once for each connected clients.
-    private startMessageTransmission(messageTransmission: TransmissionInterface): void {
-        this.console.log({ message: `is this transmission even starte?` })
+    private handleClientActivity(messageTransmission: TransmissionInterface): void {
         // start listening to incoming messages from this client
         messageTransmission.receiver.getIncoming().subscribe((event: GeneralEvent<any>) => {
             this.console.log({ message: `General Bus ${event.event} ${(((event.data as TransportMessage)?.payload as WrappedMessage)?.payload as FisMessage)?.header?.messageID ?? 'Not Message'}`, details: event })
             this.generalBus.next(event)
         })
 
+        this.outgoingPipe.subscribe((message: FisMessage) => {
+            messageTransmission.transmitter.emit(message)
+        })
+
         let request: FisMessage = {
             header: {
                 messageID: uuidv4(),
@@ -73,12 +74,12 @@ class Supervisor {
             data: 'Data'
         }
 
-        // this.request(request, messageTransmission).subscribe({
-        //     next: res => this.console.log({ message: `received ${res.header.messageID}`, details: res }),
-        //     complete: () => this.console.log({ message: `Responses Completed for request: ${request.header.messageID}` })
-        // })
+        this.request(request, messageTransmission).subscribe({
+            next: res => this.console.log({ message: `received ${res.header.messageID}`, details: res }),
+            complete: () => this.console.log({ message: `Responses Completed for request: ${request.header.messageID}` })
+        })
 
-        this.startGeneratingRequest(1000, this.outgoingPipe)
+        // this.startGeneratingRequest(1000, this.outgoingPipe)
     }
 
     private request(request: FisMessage, messageTransmission: TransmissionInterface): Observable<any> {

+ 13 - 6
src/test/transmitter.ts

@@ -41,12 +41,19 @@ class Supervisor {
         this.transmissionManager.subscribeForTransmission().pipe(
             filter(event => event.type === `Transmission Event`),
             filter(event => event.event == `New Transmission`)
-        ).subscribe(event => {
+        ).subscribe((event: GeneralEvent<TransmissionInterface>) => {
+            this.event.next({
+                id: uuidv4(),
+                type: `General Event`,
+                event: `New Transmission`,
+                date: new Date(),
+                data: event.data
+            })
+            let transmission: TransmissionInterface = event.data
+            this.console.log({ message: `Acquired transmission set for client ${transmission.clientId}` })
             // update transmission record on every new transmission object instantiated
-            this.transmissionSets.push(event.data as TransmissionInterface)
-            // start message transmission for said transmission object instantiated
-            this.console.log({ message: `Received transmission object ${(event.data as TransmissionInterface).clientId}` })
-            this.handleClientActivity(event.data as TransmissionInterface)
+            this.transmissionSets.push(transmission)
+            this.handleClientActivity(transmission)
         })
     }
 
@@ -56,7 +63,7 @@ class Supervisor {
         messageTransmission.receiver.getIncoming().subscribe((event: GeneralEvent<TransportMessage>) => {
             let requestMessage: FisMessage = ((event.data as TransportMessage).payload as WrappedMessage).payload as FisMessage
             this.console.log({ message: `General Bus ${requestMessage?.header?.messageID ?? 'Not a message'}`, details: event }) // receiving end
-            // this.clientIncomingMessage.next(requestMessage)
+            this.clientIncomingMessage.next(requestMessage)
             this.messageProducer.getOutgoingMessages().pipe(
                 filter(message => message.header.messageID === requestMessage.header.messageID)
             ).subscribe(message => {

+ 1 - 1
src/transmission/msg.transmission.manager.ts

@@ -32,7 +32,7 @@ export class MessageTransmissionManager extends MessageTransmissionManagerBase {
             ).subscribe((event: GeneralEvent<ClientObject>) => {
                 // get all adapters for all the connection
                 let transmission: TransmissionInterface | undefined = this.instantiateTransmissionComponents(event.data, this.event)
-                this.console.log({ message: `Passing this transmission for client:${transmission.clientId} to global event bus.` })
+                this.console.log({ message: `Acknowledged new client: ${transmission.clientId}. Instantiated Transmission Components.` })
                 if (transmission) {
                     observer.next({
                         id: uuidv4(),

+ 18 - 20
src/transmission/msg.transmission.receiver.ts

@@ -5,13 +5,11 @@ import { checkMessage, WrappedMessage } from '../utils/message.ordering';
 import ConsoleLogger from '../utils/log.utils';
 import { MessageTransmissionBase } from '../base/msg.transmission.base';
 import { AdapterInterface, GeneralEvent, MessageReceiverInterface, ReceiverAdapterInterface, TransportMessage } from '../interface/interface';
-import { AdapterBase } from '../base/adapter.base';
-import { IncomingMessage } from 'http';
 
 export class MessageTransmissionReceiver extends MessageTransmissionBase implements MessageReceiverInterface {
     private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionReceiver`, ['transmission'])
     private onHoldMessage: Subject<WrappedMessage> = new Subject()
-    private currentAdapter!: ReceiverAdapter
+    private currentAdapter!: ReceiverAdapterInterface
     private incomingMessage: Subject<GeneralEvent<TransportMessage>> = new Subject()
     // private toBePassedOver: Subject<WrappedMessage> = new Subject()
 
@@ -20,7 +18,7 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
         this.clientId = clientId
         this.event = event
 
-        this.handleAdapterEvent(this.event.asObservable())
+        this.handleAdapters(this.event.asObservable())
     }
 
     getIncoming(): Observable<GeneralEvent<TransportMessage>> {
@@ -48,25 +46,25 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
         })
     }
 
-    private handleAdapterEvent(adapterEvent: Observable<GeneralEvent<any>>): void {
-        const subscription: Subscription = adapterEvent.pipe(
+    private handleAdapters(adapterEvent: Observable<GeneralEvent<any>>): void {
+        adapterEvent.pipe(
             filter(event => event.type === `Adapter Event`),
             filter(event => event.event === `New Adapter`),
-            map(event => {
-                return event.data
-            }),
+            map(event => { return event.data as AdapterInterface }),
             filter((adapter: AdapterInterface) => adapter.role === `Receiver`),
-            map(adapter => {
-                return adapter as ReceiverAdapter
-            })
-        ).subscribe((adapter: ReceiverAdapter) => {
-            if (!this.adapters.some(adapterObj => adapterObj.adapterId === adapter.adapterId)) {
-                this.adapters.push(adapter)
-                this.currentAdapter = adapter
-                this.currentAdapter.subscribeForIncoming().subscribe(this.incomingMessage)
-            } else {
-                this.console.error({ message: `Adapter ID: ${adapter.adapterId} already existed.` })
-            }
+            map(adapter => { return adapter as ReceiverAdapter })
+        ).subscribe({
+            next: (adapter: ReceiverAdapterInterface) => {
+                if (!this.adapters.some(adapterObj => adapterObj.adapterId === adapter.adapterId)) {
+                    this.adapters.push(adapter)
+                    this.currentAdapter = adapter
+                    this.console.log({ message: `Setting Current adapter = ${this.currentAdapter.adapterId}` })
+                    this.currentAdapter.subscribeForIncoming().subscribe(this.incomingMessage)
+                } else {
+                    this.console.error({ message: `Adapter ID: ${adapter.adapterId} already existed.` })
+                }
+            },
+            error: error => this.console.error({ message: 'Observer Error', details: error })
         })
     }
 

+ 4 - 4
src/transmission/msg.transmission.transmitter.ts

@@ -28,7 +28,7 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
     }
 
     public emit(message: FisMessage): void {
-        this.console.log({ message: `${this.connectionStateEvent.getValue() == 'ONLINE' ? `Transmitting message` : `Buffering message`}` })
+        // this.console.log({ message: `${this.connectionStateEvent.getValue() == 'ONLINE' ? `Transmitting...` : `Buffering...`}` })
         this.messageToBeBuffered.next(message)
     }
 
@@ -55,12 +55,12 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
         // automatically subscribe to allow released bffered messages to be released
         this.buffer.returnSubjectForBufferedItems().subscribe((bufferedMessage: WrappedMessage) => {
             // need to work with wrapped messages
-            this.console.log({ message: `Releasing ${bufferedMessage.thisMessageID}` });
+            this.console.log({ message: `Transmitting ${bufferedMessage.thisMessageID}` });
             if (this.currentAdapter) {
                 this.currentAdapter.emit(bufferedMessage)
             } else {
                 this.messageToBeBuffered.next(bufferedMessage)
-                this.console.error({ message: `Adapter is not set. Please ensure adapters are ready.` })
+                this.console.error({ message: `Adapter is not set. Please ensure adapters are ready. Message ${(bufferedMessage.payload as FisMessage).header.messageID} is flushed back into buffer.` })
             }
         })
     }
@@ -70,7 +70,7 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
         adaptersEvent.pipe(
             filter(event => event.type === `Adapter Event`),
             filter(event => event.event === `New Adapter`),
-            map(event => { return event.data }),
+            map(event => { return event.data as AdapterInterface }),
             filter((adapter: AdapterInterface) => adapter.role === `Transmitter`),
             map(adapter => { return adapter as TransmitterAdapterInterface })
         ).subscribe({

+ 1 - 1
src/transport/http.ts

@@ -26,7 +26,7 @@ export class HttpTransportService implements TransportServiceInterface {
         this.transportEvent = event
     }
 
-    public subscribeForTransportEvent(): Observable<GeneralEvent<any>> {
+    public subscribeForEvent(): Observable<GeneralEvent<any>> {
         throw new Error('Method not implemented.');
     }
 

+ 6 - 6
src/transport/websocket.ts

@@ -14,7 +14,7 @@ export class WebsocketTransportService implements TransportServiceInterface {
     private connectedSocketServer: ConnectedSocketServer[] = [] // to allow the possibility of having to communicate with multiple servers as a client
     private connectedClientSocket: ConnectedSocketClient[] = [] // to keep track of the all the clients that are connected
     // private incomingMessage: Subject<TransportMessage> = new Subject() // this is only for client roles only atm
-    private transportEvent!: Subject<GeneralEvent<any>>
+    private event!: Subject<GeneralEvent<any>>
 
     constructor(event: Subject<GeneralEvent<any>>) {
         this.console.log({ message: `WebsocketTransportService: Constructing socket transport service....` })
@@ -23,14 +23,14 @@ export class WebsocketTransportService implements TransportServiceInterface {
             transport: 'Websocket'
         }
         // logic here
-        this.transportEvent = event
+        this.event = event
     }
 
     public startServer(port: number): void {
         startSocketServer(port).subscribe({
             next: (connectedClient: SocketForConnectedClient) => {
                 handleNewSocketClient(this.info.transportServiceId, connectedClient, this.connectedClientSocket).subscribe({
-                    next: event => this.transportEvent.next(event),
+                    next: event => this.event.next(event),
                     error: error => this.console.error({ message: `Observer Error: ${error}`, details: error }),
                     complete: () => this.console.log({ message: `Client ${connectedClient.id} disconnected...` })
                 })
@@ -43,7 +43,7 @@ export class WebsocketTransportService implements TransportServiceInterface {
     public startClient(url: string): void {
         // logic here
         startClientSocketConnection(url).then((socket: SocketForConnectedServer) => {
-            handleClientSocketConnection(this.info.transportServiceId, socket, this.connectedSocketServer).subscribe(this.transportEvent)
+            handleClientSocketConnection(this.info.transportServiceId, socket, this.connectedSocketServer).subscribe(this.event)
         }).catch((error) => {
             this.console.log({ message: `Observer Error`, details: error })
         })
@@ -65,8 +65,8 @@ export class WebsocketTransportService implements TransportServiceInterface {
         }
     }
 
-    public subscribeForTransportEvent(): Observable<GeneralEvent<any>> {
-        return this.transportEvent.asObservable()
+    public subscribeForEvent(): Observable<GeneralEvent<any>> {
+        return this.event.asObservable()
     }
 
     public getInfo(): TransportServiceProfile {

+ 2 - 2
src/utils/socket.utils.ts

@@ -148,7 +148,7 @@ export function handleClientSocketConnection(transportServiceId: string, socket:
                 serversConnected.push(receiverProfileInfo)
             }
             if (data.name == 'Adjusted Profile') {
-                console.log({ message: `Adjusted client Name: ${(data.message as ConnectedSocketServer).clientId}` })
+                console.log({ message: `Adjusted client Name: ${data.message.id}` })
                 // Update websocket instance record
                 let clientObj: ConnectedSocketServer | undefined = serversConnected.find(obj => obj.clientId === data.message.id)
                 if (clientObj) {
@@ -204,7 +204,7 @@ export function handleClientSocketConnection(transportServiceId: string, socket:
 // For SERVER Usage: set up socket listeners to start listening for different events
 export function handleNewSocketClient(transportServiceId: string, socket: SocketForConnectedClient, connectedClientSocket: ConnectedSocketClient[]): Observable<GeneralEvent<any>> {
     return new Observable((event: Observer<GeneralEvent<any>>) => {
-        console.log({ message: `Setting up listeners for socket:${socket.id}` })
+        console.log({ message: `New sopcket client connected. Setting up listeners for socket:${socket.id}` })
         // returns the socket client instance 
         // listen to receiver's initiotion first before assigning 'credentials'
         socket.on(`profile`, (message: { name: string, data: any }) => {