|
@@ -7,58 +7,94 @@ import { io, Socket } from "socket.io-client";
|
|
|
import { handleClientSocketConnection } from "../utils/socket.utils";
|
|
|
import { ConnectedServerSocket } from "../transport/websocket";
|
|
|
import { v4 as uuidv4 } from 'uuid'
|
|
|
-import { filter, interval, Observable, Observer, Subject } from "rxjs";
|
|
|
-import { TransportEvent, TransportMessage } from "../interface/connector.interface";
|
|
|
-import { FisMessage } from "../interface/transport.interface";
|
|
|
+import { filter, interval, map, Observable, Observer, Subject, Subscription, takeWhile } from "rxjs";
|
|
|
+import { TransportEvent } from "../interface/connector.interface";
|
|
|
+import { EventMessage, FisMessage } from "../interface/transport.interface";
|
|
|
+import { checkMessage, WrappedMessage } from "../utils/message.ordering";
|
|
|
+import { RetransmissionService } from "../utils/retransmission.service";
|
|
|
|
|
|
class SocketClient {
|
|
|
- private socket: Socket;
|
|
|
+ private currentSocketId!: string
|
|
|
+ private socket!: Socket;
|
|
|
private connectedServerSocket: ConnectedServerSocket[] = []
|
|
|
+ private requestMessages: Subject<any> = new Subject()
|
|
|
private event: Subject<TransportEvent> = new Subject()
|
|
|
+ private retransmission: RetransmissionService = new RetransmissionService()
|
|
|
|
|
|
constructor(url: string) {
|
|
|
+ this.setUpClientServerConnection(url)
|
|
|
+ }
|
|
|
+
|
|
|
+ private setUpClientServerConnection(url: string) {
|
|
|
// Connect to the serve
|
|
|
this.socket = io(url);
|
|
|
// use the existing socket handler
|
|
|
handleClientSocketConnection(this.socket, this.connectedServerSocket).subscribe(this.event)
|
|
|
this.startListening(this.event)
|
|
|
- // interval(5000).subscribe(time => {
|
|
|
- // let requestId: string = uuidv4()
|
|
|
- // this.sendMessage(requestId).subscribe({
|
|
|
- // next: (message: FisMessage) => console.log(`Incoming Response`, message.header.messageID),
|
|
|
- // error: error => console.error(error),
|
|
|
- // complete: () => console.log(`Completed Response for ${requestId}`)
|
|
|
- // })
|
|
|
- // })
|
|
|
}
|
|
|
|
|
|
private startListening(event: Subject<TransportEvent>): void {
|
|
|
- event.subscribe((event: TransportEvent) => console.log(event))
|
|
|
+ event.subscribe((event: TransportEvent) => {
|
|
|
+ // console.log(event)
|
|
|
+ if (event.event == `New Server`) {
|
|
|
+ this.currentSocketId = (event.data as EventMessage).clientId
|
|
|
+
|
|
|
+ let currentClientSocket: ConnectedServerSocket | undefined = this.connectedServerSocket.find(obj => obj.id === this.currentSocketId)
|
|
|
+ if (currentClientSocket) {
|
|
|
+ // so retransmission is working as usual
|
|
|
+ this.retransmission.implementRetransmission(this.requestMessages, currentClientSocket.connectionState, true)
|
|
|
+ this.startGeneratingRequest(1000, this.requestMessages)
|
|
|
+ this.retransmission.returnSubjectForBufferedItems().subscribe((message: WrappedMessage) => {
|
|
|
+ this.sendMessage(message).subscribe({
|
|
|
+ next: message => console.log(message),
|
|
|
+ complete: () => console.log(`Request Completed for ${message.thisMessageID}`)
|
|
|
+ })
|
|
|
+ })
|
|
|
+ }
|
|
|
+ }
|
|
|
+ })
|
|
|
}
|
|
|
|
|
|
- private sendMessage(requestId: string): Observable<any> {
|
|
|
- return new Observable((response: Observer<FisMessage>) => {
|
|
|
+ private startGeneratingRequest(intervalDuration: number, requestsPipe: Subject<FisMessage>) {
|
|
|
+ interval(intervalDuration).subscribe(time => {
|
|
|
let message: FisMessage = {
|
|
|
header: {
|
|
|
- messageID: requestId,
|
|
|
+ messageID: uuidv4(),
|
|
|
messageName: 'RequestMessage'
|
|
|
},
|
|
|
data: 'Data'
|
|
|
}
|
|
|
+ requestsPipe.next(message)
|
|
|
+ })
|
|
|
+ }
|
|
|
+
|
|
|
+ private sendMessage(message: WrappedMessage): Observable<WrappedMessage> {
|
|
|
+ return new Observable((response: Observer<WrappedMessage>) => {
|
|
|
+ console.log(`Emitting: ${message.thisMessageID}`)
|
|
|
this.socket.emit('message', message)
|
|
|
+ let onHold = new Subject<WrappedMessage>()
|
|
|
+ let toBeReleased = new Subject<WrappedMessage>()
|
|
|
|
|
|
- this.event.pipe(
|
|
|
- filter(obj => obj.event == 'New Message'),
|
|
|
- filter(obj => (obj.data as FisMessage).header.messageID == message.header.messageID)
|
|
|
- ).subscribe({
|
|
|
- next: (event: TransportEvent) => {
|
|
|
- if ((event.data as FisMessage).data == 'Complete') {
|
|
|
- response.complete()
|
|
|
- }
|
|
|
+ let subscription: Subscription = toBeReleased.subscribe(message => {
|
|
|
+ if ((message.payload as FisMessage).data == 'Complete') {
|
|
|
+ onHold.complete()
|
|
|
+ toBeReleased.complete()
|
|
|
+ subscription.unsubscribe()
|
|
|
+ response.complete()
|
|
|
+ } else {
|
|
|
response.next(message)
|
|
|
- },
|
|
|
- error: error => response.error(error),
|
|
|
- complete: () => { }
|
|
|
+ }
|
|
|
+ })
|
|
|
+
|
|
|
+ this.event.pipe(
|
|
|
+ filter(event => event.event == 'New Message'),
|
|
|
+ filter(event => ((event.data as EventMessage).payload as WrappedMessage).thisMessageID === message.thisMessageID),
|
|
|
+ takeWhile(event => (((event.data as EventMessage).payload as WrappedMessage).payload as FisMessage).data == 'Complete'),
|
|
|
+ map(event => ((event.data as EventMessage).payload as WrappedMessage)),
|
|
|
+ ).subscribe((message: WrappedMessage) => {
|
|
|
+ checkMessage(message, onHold).then(() => {
|
|
|
+ toBeReleased.next(message)
|
|
|
+ })
|
|
|
})
|
|
|
})
|
|
|
}
|