浏览代码

implemented retransmission for single server and single client test

Enzo 2 月之前
父节点
当前提交
08a8031528

+ 0 - 3
2d05fca6-e7aa-4eb1-900a-3da684c150be.json

@@ -1,3 +0,0 @@
-{
-  "id": "2d05fca6-e7aa-4eb1-900a-3da684c150be"
-}

+ 3 - 0
660c8fa3-cc72-4c7e-9855-b973a41735e2.json

@@ -0,0 +1,3 @@
+{
+  "id": "660c8fa3-cc72-4c7e-9855-b973a41735e2"
+}

+ 2 - 2
clients.json

@@ -1,7 +1,7 @@
 [
   {
-    "id": "2d05fca6-e7aa-4eb1-900a-3da684c150be",
-    "dateCreated": "2024-11-12T07:18:20.740Z",
+    "id": "660c8fa3-cc72-4c7e-9855-b973a41735e2",
+    "dateCreated": "2024-11-12T08:04:21.967Z",
     "connectionState": null,
     "socketInstance": null
   }

+ 5 - 3
src/interface/transport.interface.ts

@@ -3,6 +3,7 @@ import { AdapterProfile, AdapterSet, AdaptorTransmissionRole, ConnectionManager,
 import { MessageTransmissionTransmitter } from "../transmission/msg.transmission.transmitter";
 import { MessageTransmissionReceiver } from "../transmission/msg.transmission.receiver";
 import { ConnectionAdapter } from "../connector/connector.base";
+import { RetransmissionService } from "../utils/retransmission.service";
 
 export interface MessageTransmissionManager {
     subscribe(): Observable<MessageTransmission>
@@ -16,10 +17,10 @@ export interface MessageTransmission {
     event: Observable<TransportEvent>
 }
 
-export interface MessageTransmissionBase  {
+export interface MessageTransmissionBase {
     msgRepositoryService: any // like logging service and what not
     transmissionRole: AdaptorTransmissionRole
-    adaptorsArray: Array<ConnectionAdapter> 
+    adaptorsArray: Array<ConnectionAdapter>
 
     getInfo(): TransmissionProfile
     setUpAdapter(adapterSet: AdapterSet[]): void
@@ -27,13 +28,14 @@ export interface MessageTransmissionBase  {
 
 export interface MessageReceiver extends MessageTransmissionBase {
     receiverProfile: ReceiverProfile
-    
+
     getMessageBus(bus: Bus): Observable<any>
     setReceiver(receiverProfile: ReceiverProfile, role: AdaptorTransmissionRole, event: Subject<TransportEvent>): void
 }
 
 export interface MessageTransmitter extends MessageTransmissionBase {
     transmitterProfile: TransmitterProfile
+    retransmission: RetransmissionService
 
     setTransmitter(transmitterProfile: TransmitterProfile, role: AdaptorTransmissionRole, event: Subject<TransportEvent>): void
 }

+ 3 - 3
src/test/proxy.ts

@@ -8,11 +8,11 @@ let toServer = new Subject<{ event: 'profile' | 'message', payload: any }>()
 
 startSocketServer(3001)
 startClientSocketConnection('http://localhost:3000')
-
+consoleLog()
 
 function consoleLog(): void {
-    fromServer.subscribe(message => console.log(`From Server`, message.event))
-    toServer.subscribe(message => console.log(`To Server`, message.event))
+    fromServer.subscribe(message => console.log(`From Server`, message.event, message.payload?.header?.messageID ?? 'undefined message'))
+    toServer.subscribe(message => console.log(`To Server`, message.event, message.payload?.header?.messageID ?? 'undefined message'))
 }
 
 function startSocketServer(port: number): void {

+ 2 - 1
src/test/transmitter.ts

@@ -28,8 +28,9 @@ class Supervisor {
     private handleClientActivity(messageTransmission: MessageTransmission): void {
         // start listening to incoming messages from this client
         messageTransmission.receiver.getMessageBus(Bus.GeneralBus).subscribe((event: TransmissionMessage) => {
+            console.log(event)
             let requestMessage: FisMessage = event.payload
-            this.clientIncomingMessage.next(event.payload as FisMessage)
+            // this.clientIncomingMessage.next(event.payload as FisMessage)
             this.messageProducer.getOutgoingMessages().pipe(
                 filter(message => message.header.messageID === requestMessage.header.messageID)
             ).subscribe(message => {

+ 1 - 0
src/transmission/msg.transmission.manager.ts

@@ -141,6 +141,7 @@ export class MessageTransmissionManager implements MessageTransmissionManagerInt
             if(event.event == 'Client Reconnected') {
                 this.reconnectionHandler((event.data as EventMessage).clientId)
             } 
+            
             // can include more event handlers here
         })
     }

+ 31 - 3
src/transmission/msg.transmission.transmitter.ts

@@ -1,19 +1,46 @@
 import { MessageTransmissionBase } from "./msg.transmission.base";
 import { EventMessage, FisMessage, MessageTransmitter as MessageTransmitterInterface, TransmitterProfile } from '../interface/transport.interface'
-import { AdapterSet, Event, Transport, TransportEvent } from "../interface/connector.interface";
+import { AdapterSet, ConnectionState, Event, Transport, TransportEvent } from "../interface/connector.interface";
 import { v4 as uuidv4 } from 'uuid'
 import { TransmitterConnectionAdapter } from "../connector/connector.transmitter";
-import { filter, Observable } from "rxjs";
+import { BehaviorSubject, filter, map, Observable, Subject } from "rxjs";
+import { RetransmissionService } from "../utils/retransmission.service";
 
 /* Take in all the messages that needs to be transported, and divide them accordingly. So the connector instances will do just that
 connectors or adapters will have their own identifier*/
 export class MessageTransmissionTransmitter extends MessageTransmissionBase implements MessageTransmitterInterface {
+    private messageToBeTransmitted!: Subject<FisMessage>
     transmitterProfile!: TransmitterProfile;
+    retransmission!: RetransmissionService;
 
     constructor(profile: TransmitterProfile, adapterSets: AdapterSet[], event: Observable<TransportEvent>) {
         super(event)
+        this.event = event
+        this.messageToBeTransmitted = new Subject()
+        this.retransmission = new RetransmissionService()
         this.setTransmitter(profile)
         this.setUpAdapter(adapterSets)
+        this.setUpRetransmission()
+    }
+
+    // by the time this transmission set is instantiated, the connected client would've been online. Need ot manually signal retransmission to release buffer immediately
+    setUpRetransmission(): void {
+        let connectionStateEvent = new BehaviorSubject<'OFFLINE' | 'ONLINE'>('ONLINE')
+        this.event.pipe(
+            filter(event => event.event == 'Client Disconnected' || event.event == 'Client Reconnected'),
+            map(event => {
+                if (event.event == 'Client Disconnected') {
+                    return 'OFFLINE'
+                } else {
+                    return `ONLINE`
+                }
+            })
+        ).subscribe((signal: ConnectionState) => {
+            connectionStateEvent.next(signal)
+        })
+        this.retransmission.implementRetransmission(this.messageToBeTransmitted, connectionStateEvent)
+        // automatically subscribe to allow released bffered messages to be released
+        this.retransmission.returnSubjectForBufferedItems().subscribe(message => (this.adapterService as TransmitterConnectionAdapter).emit(message.payload))
     }
 
     setTransmitter(transmitterProfile: TransmitterProfile): void {
@@ -22,7 +49,8 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
 
     emit(message: FisMessage): void {
         // for now only use one adapter
-        (this.adapterService as TransmitterConnectionAdapter).emit(message)
+        this.messageToBeTransmitted.next(message)
+        // (this.adapterService as TransmitterConnectionAdapter).emit(message)
     }
 
     setUpAdapter(adapterSets: AdapterSet[]) {

+ 1 - 1
src/utils/retransmission.service.ts

@@ -110,7 +110,7 @@ export class RetransmissionService {
         this.receiverConnectionState.pipe(
             distinctUntilChanged()
         ).subscribe(clientState => {
-            console.log(`Client is now ${clientState}`)
+            console.log(`Client is now ${clientState}. ${(clientState === 'OFFLINE')? 'Buffering...' : ''}`)
             if (clientState == 'OFFLINE') {
                 console.log(`Current transmission state: ${this.transmissionState.getValue()}`)
                 // just keep buffering

+ 2 - 1
src/utils/socket.utils.ts

@@ -368,6 +368,7 @@ export async function checkOwnClientInfo(filename?: string): Promise<ConnectedSe
     })
 }
 
+// this is for server usage only
 export function startListening(socket: SocketForConnectedClient, client: ConnectedClientSocket, eventListener: Observer<TransportEvent>): void {
     /* Generally, we don't need this unless in the case of being the receiver */
     socket.on('message', (message: any) => {
@@ -387,7 +388,7 @@ export function startListening(socket: SocketForConnectedClient, client: Connect
     socket.on('disconnect', () => {
         eventListener.next({
             id: uuidv4(),
-            event: 'Server Disconnected',
+            event: 'Client Disconnected',
             data: {
                 clientID: client.id,
                 time: new Date()