12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879 |
- import { BehaviorSubject, filter, Observable, Subject } from "rxjs";
- import { Socket as ClientSocket } from 'socket.io-client'
- import { Socket as SocketForConnectedClient } from "socket.io"
- import { handleClientSocketConnection, handleNewSocketClient, startClientSocketConnection, startSocketServer } from "../utils/socket.utils";
- import { ClientObject, Transport, TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
- import { WrappedMessage } from "../utils/message.ordering";
- import { FisMessage } from "../interface/transport.interface";
- /* Just code in the context that this websocket service will be handling multiple UI clients. Can think about the server communication at a later time. */
- export class WebsocketTransportService implements TransportService {
- private info: Transport = Transport.Websocket
- private connectedServer: ConnectedServerSocket[] = [] // to allow the possibility of having to communicate with multiple servers as a client
- private connectedClientSocket: ConnectedClientSocket[] = [] // to keep track of the all the clients that are connected
- // private incomingMessage: Subject<TransportMessage> = new Subject() // this is only for client roles only atm
- private transportEvent!: Subject<TransportEvent>
- constructor(event: Subject<TransportEvent>) {
- console.log(`WebsocketTransportService: Constructing socket transport service....`)
- this.transportEvent = event
- // logic here
- }
- public startServer(port: number): void {
- // logic here
- startSocketServer(port).subscribe({
- next: (connectedClient: SocketForConnectedClient) => {
- handleNewSocketClient(connectedClient, this.connectedClientSocket).subscribe({
- next: event => this.transportEvent.next(event),
- error: error => console.error(error),
- complete: () => console.log(`Client ${connectedClient.id} disconnected...`)
- })
- },
- error: error => console.error(error),
- complete: () => console.log(`...`)
- })
- }
- public startClient(url: string): void {
- // logic here
- startClientSocketConnection(url).then((socket: ClientSocket) => {
- handleClientSocketConnection(socket, this.connectedServer).subscribe(this.transportEvent)
- }).catch((error) => {
- console.error(`WebsocketTransport ERROR:`, error)
- })
- }
- public emit(message: TransportMessage): void {
- console.log(`Transport Socket service level. Emitting: ${((message.payload as WrappedMessage).payload as FisMessage).header.messageID}`)
- let clientObj: ConnectedClientSocket | undefined = this.connectedClientSocket.find(obj => obj.id == message.target)
- let serverObj: ConnectedServerSocket | undefined = this.connectedServer.find(obj => obj.id === message.target)
- console.log(serverObj?.connectionState.getValue(), serverObj?.id)
- // for server usage
- if (clientObj && clientObj.connectionState.getValue() == 'ONLINE') {
- clientObj.socketInstance.emit(`message`, message.payload)
- }
- // for client usage
- if (serverObj && serverObj.connectionState.getValue() == 'ONLINE') {
- serverObj.socketInstance.emit(`message`, message.payload)
- }
- }
- // this returns the ref pointer for the TransportEvent instantiated at Supervisor. Socket will broadcast incoming messages as event
- public subscribe(): Observable<TransportEvent> {
- return this.transportEvent.asObservable()
- }
- public getInfo(): Transport {
- return this.info
- }
- }
- export interface ConnectedClientSocket extends ClientObject {
- socketInstance: SocketForConnectedClient
- }
- export interface ConnectedServerSocket extends ClientObject {
- socketInstance: ClientSocket
- }
|