Enzo 1 месяц назад
Родитель
Сommit
5dd33f64e3

+ 3 - 0
00d7df06-395d-4ccd-9473-dd26d58535d6.json

@@ -0,0 +1,3 @@
+{
+  "id": "00d7df06-395d-4ccd-9473-dd26d58535d6"
+}

+ 3 - 0
5975a712-9d61-4bc2-81af-2c05c772b59f.json

@@ -0,0 +1,3 @@
+{
+  "id": "5975a712-9d61-4bc2-81af-2c05c772b59f"
+}

+ 0 - 3
660c8fa3-cc72-4c7e-9855-b973a41735e2.json

@@ -1,3 +0,0 @@
-{
-  "id": "660c8fa3-cc72-4c7e-9855-b973a41735e2"
-}

+ 3 - 0
b2a18219-ee84-4872-bc64-e6f8a7afa6e4.json

@@ -0,0 +1,3 @@
+{
+  "id": "b2a18219-ee84-4872-bc64-e6f8a7afa6e4"
+}

+ 20 - 2
clients.json

@@ -1,7 +1,25 @@
 [
   {
-    "id": "660c8fa3-cc72-4c7e-9855-b973a41735e2",
-    "dateCreated": "2024-11-12T08:04:21.967Z",
+    "id": "5975a712-9d61-4bc2-81af-2c05c772b59f",
+    "dateCreated": "2024-11-14T02:53:13.306Z",
+    "connectionState": null,
+    "socketInstance": null
+  },
+  {
+    "id": "b2a18219-ee84-4872-bc64-e6f8a7afa6e4",
+    "dateCreated": "2024-11-14T02:55:35.822Z",
+    "connectionState": null,
+    "socketInstance": null
+  },
+  {
+    "id": "00d7df06-395d-4ccd-9473-dd26d58535d6",
+    "dateCreated": "2024-11-14T03:20:30.133Z",
+    "connectionState": null,
+    "socketInstance": null
+  },
+  {
+    "id": "d6ff43cc-6a52-497c-9af5-ca3b9e21be7b",
+    "dateCreated": "2024-11-14T03:22:33.808Z",
     "connectionState": null,
     "socketInstance": null
   }

+ 3 - 0
d6ff43cc-6a52-497c-9af5-ca3b9e21be7b.json

@@ -0,0 +1,3 @@
+{
+  "id": "d6ff43cc-6a52-497c-9af5-ca3b9e21be7b"
+}

+ 6 - 0
doc/explanation.txt

@@ -43,3 +43,9 @@ iii) Http Transport service
 -This one will take some time, because need to emulate it to mimic bidirectional streaming, so there's a need to emulate logical channel 
 iv) Think about requestresponse transmission and it's associated adapter's implementation
 -Do this when I have too much time to waste
+
+Discussion Points:
+i) Explore multiple traffic concept
+-To make use of different ports as well as multi ISP to stream data. Eg: External Netword card.
+-See how I can talk to underlying OS network configuration to make use of the network stream and delegate ports to it
+ii) Move transport service instantiation to adapterManager

+ 27 - 6
src/test/receiver.ts

@@ -7,7 +7,7 @@ import { io, Socket } from "socket.io-client";
 import { handleClientSocketConnection } from "../utils/socket.utils";
 import { ConnectedServerSocket } from "../transport/websocket";
 import { v4 as uuidv4 } from 'uuid'
-import { interval, Subject } from "rxjs";
+import { filter, interval, Observable, Observer, Subject } from "rxjs";
 import { TransportEvent, TransportMessage } from "../interface/connector.interface";
 import { FisMessage } from "../interface/transport.interface";
 
@@ -22,23 +22,44 @@ class SocketClient {
         // use the existing socket handler
         handleClientSocketConnection(this.socket, this.connectedServerSocket).subscribe(this.event)
         this.startListening(this.event)
-        this.sendMessage()
+        // interval(5000).subscribe(time => {
+        //     let requestId: string = uuidv4()
+        //     this.sendMessage(requestId).subscribe({
+        //         next: (message: FisMessage) => console.log(`Incoming Response`, message.header.messageID),
+        //         error: error => console.error(error),
+        //         complete: () => console.log(`Completed Response for ${requestId}`)
+        //     })
+        // })
     }
 
     private startListening(event: Subject<TransportEvent>): void {
         event.subscribe((event: TransportEvent) => console.log(event))
     }
 
-    private sendMessage(): void {
-        interval(3000).subscribe(interval => {
+    private sendMessage(requestId: string): Observable<any> {
+        return new Observable((response: Observer<FisMessage>) => {
             let message: FisMessage = {
                 header: {
-                    messageID: uuidv4(),
+                    messageID: requestId,
                     messageName: 'RequestMessage'
-                }, 
+                },
                 data: 'Data'
             }
             this.socket.emit('message', message)
+
+            this.event.pipe(
+                filter(obj => obj.event == 'New Message'),
+                filter(obj => (obj.data as FisMessage).header.messageID == message.header.messageID)
+            ).subscribe({
+                next: (event: TransportEvent) => {
+                    if ((event.data as FisMessage).data == 'Complete') {
+                        response.complete()
+                    }
+                    response.next(message)
+                },
+                error: error => response.error(error),
+                complete: () => { }
+            })
         })
     }
 }

+ 10 - 9
src/test/transmitter.ts

@@ -2,7 +2,7 @@ import { filter, interval, map, Observable, Observer, Subject, take } from "rxjs
 import { Bus, FisMessage, MessageTransmission, TransmissionMessage } from "../interface/transport.interface";
 import { v4 as uuidv4 } from 'uuid'
 import { MessageTransmissionManager } from "../transmission/msg.transmission.manager";
-import { TransportEvent } from "../interface/connector.interface";
+import { TransportEvent, TransportMessage } from "../interface/connector.interface";
 
 class Supervisor {
     private clientIncomingMessage: Subject<FisMessage> = new Subject()
@@ -26,10 +26,10 @@ class Supervisor {
     // only called once for each connected clients.
     private handleClientActivity(messageTransmission: MessageTransmission): void {
         // start listening to incoming messages from this client
-        messageTransmission.receiver.getMessageBus(Bus.GeneralBus).subscribe((event: TransmissionMessage) => {
+        messageTransmission.receiver.getMessageBus(Bus.GeneralBus).subscribe((event: TransportEvent) => {
             console.log(event)
-            let requestMessage: FisMessage = event.payload
-            // this.clientIncomingMessage.next(event.payload as FisMessage)
+            let requestMessage: FisMessage = (event.data as TransportMessage).payload as FisMessage
+            this.clientIncomingMessage.next(requestMessage)
             this.messageProducer.getOutgoingMessages().pipe(
                 filter(message => message.header.messageID === requestMessage.header.messageID)
             ).subscribe(message => {
@@ -47,17 +47,16 @@ class Supervisor {
 
 
 class MessageProducer {
-    private transmissionInstance!: MessageTransmission
     private generalNotification: Subject<FisMessage> = new Subject()
     private incomingMessageBus!: Subject<FisMessage>
     private outgoingMessageBus: Subject<FisMessage> = new Subject()
 
-    constructor(incomingMessageBus: Subject<any>) {
+    constructor(incomingMessageBus: Subject<FisMessage>) {
         console.log(`Contructing Application....`)
         this.incomingMessageBus = incomingMessageBus
 
         this.generateNotifcation().subscribe(this.generalNotification)
-        this.handleIncomingRequests(this.incomingMessageBus, this.outgoingMessageBus)
+        this.handleIncomingRequests(this.incomingMessageBus.asObservable(), this.outgoingMessageBus)
     }
 
     public getNotificationMessage(): Observable<FisMessage> {
@@ -68,8 +67,10 @@ class MessageProducer {
         return this.outgoingMessageBus.asObservable()
     }
 
-    private handleIncomingRequests(requests: Subject<FisMessage>, outgoingMessageBus: Subject<FisMessage>): void {
-        requests.subscribe(request => {
+    // this is called no problem
+    private handleIncomingRequests(requests: Observable<FisMessage>, outgoingMessageBus: Subject<FisMessage>): void {
+        requests.subscribe((request: FisMessage) => {
+            console.log(`Generating response for new request ${request.header.messageID}`)
             this.generateMessage(10).subscribe({
                 next: message => outgoingMessageBus.next(message),
                 error: error => console.error(error),

+ 5 - 8
src/transmission/msg.transmission.receiver.ts

@@ -21,20 +21,17 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
         this.receiverProfile = receiverProfile
     }
 
-    getMessageBus(bus: Bus): Observable<TransmissionMessage> {
-        return new Observable((observable: Observer<TransmissionMessage>) => {
+    getMessageBus(bus: Bus): Observable<TransportEvent> {
+        return new Observable((observable: Observer<TransportEvent>) => {
             // logic here
             if (bus == Bus.GeneralBus) {
                 // Need to merge all the adapters into one when the time comes 
                 // SAMPLE: This adapterArray.forEach(adapter => { ... })
                 const subscription: Subscription = (this.adapterService as ReceiverConnectionAdapter).getMessageBus(Bus.GeneralBus).pipe(
                     filter((event: TransportEvent) => event.event == 'New Message'),
-                    map((event: TransportEvent) => event.data as TransportMessage)
-                ).subscribe((message: TransportMessage) => {
-                    observable.next({
-                        adapterId: message.target as string, // assign adapter id so that it knows who to reply back
-                        payload: message.payload
-                    });
+                    // map((event: TransportEvent) => event.data as TransportMessage)
+                ).subscribe((event: TransportEvent) => {
+                    observable.next(event);
                 });
 
                 // Clean up on unsubscription

+ 1 - 1
src/utils/socket.utils.ts

@@ -89,7 +89,7 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
 
         // Listen for messages from the server. Generally here's the responses
         socket.on('message', (msg: any) => {
-            console.log(`Websocket Client Transport Receieve Msg`, msg.id)
+            // console.log(`Websocket Client Transport Receieve Msg`, msg)
             if (receiverProfileInfo) {
                 // publish to event
                 eventNotification.next({