websocket.ts 3.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. import { BehaviorSubject, filter, Observable, Subject } from "rxjs";
  2. import { Socket as ClientSocket } from 'socket.io-client'
  3. import { Socket as SocketForConnectedClient } from "socket.io"
  4. import { handleClientSocketConnection, handleNewSocketClient, startClientSocketConnection, startSocketServer } from "../utils/socket.utils";
  5. import { ClientObject, Transport, TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
  6. import { WrappedMessage } from "../utils/message.ordering";
  7. import { FisMessage } from "../interface/transport.interface";
  /* Written on the assumption that this websocket service handles multiple UI clients. Server-to-server communication can be designed at a later time. */
  /**
   * Socket.IO backed implementation of {@link TransportService}.
   *
   * The service can be used in two roles:
   *  - server role, via {@link WebsocketTransportService.startServer}: incoming
   *    sockets are handed to `handleNewSocketClient` together with
   *    `connectedClientSocket`;
   *  - client role, via {@link WebsocketTransportService.startClient}: the
   *    outgoing socket is handed to `handleClientSocketConnection` together with
   *    `connectedServer`.
   *
   * NOTE(review): nothing in this class adds entries to the two connection
   * lists, so the helpers in `socket.utils` presumably register connections in
   * the arrays they are given — confirm against those helpers.
   *
   * Every event produced by either role is pushed into the `Subject` supplied to
   * the constructor. That subject is owned by the caller, so this class only
   * ever calls `next` on it and never completes or errors it.
   */
  export class WebsocketTransportService implements TransportService {
    private readonly info: Transport = Transport.Websocket
    // Servers this instance talks to as a client (allows more than one server).
    private readonly connectedServer: ConnectedServerSocket[] = []
    // Clients connected to this instance while it acts as a server.
    private readonly connectedClientSocket: ConnectedClientSocket[] = []
    // Caller-owned event stream that every socket event is forwarded to.
    private readonly transportEvent: Subject<TransportEvent>

    /**
     * @param event Subject that receives every transport event produced by this service.
     */
    constructor(event: Subject<TransportEvent>) {
      console.log(`WebsocketTransportService: Constructing socket transport service....`)
      this.transportEvent = event
    }

    /**
     * Starts a socket server and forwards the events of every client that
     * connects to it.
     *
     * @param port Port passed to `startSocketServer`.
     */
    public startServer(port: number): void {
      startSocketServer(port).subscribe({
        next: (connectedClient: SocketForConnectedClient) => {
          // One inner subscription per client: its completion or failure must
          // not affect the shared event stream, so only `next` is forwarded.
          handleNewSocketClient(connectedClient, this.connectedClientSocket).subscribe({
            next: (event: TransportEvent) => this.transportEvent.next(event),
            error: (error: unknown) => console.error(error),
            complete: () => console.log(`Client ${connectedClient.id} disconnected...`)
          })
        },
        error: (error: unknown) => console.error(error),
        complete: () => console.log(`...`)
      })
    }

    /**
     * Connects to a socket server as a client and forwards the events of that
     * connection.
     *
     * @param url Address passed to `startClientSocketConnection`.
     */
    public startClient(url: string): void {
      startClientSocketConnection(url).then((socket: ClientSocket) => {
        // Do NOT subscribe the subject itself: a Subject used as an observer also
        // receives `error`/`complete`, which would terminate the shared event
        // stream for every other connection as soon as this one ends.
        handleClientSocketConnection(socket, this.connectedServer).subscribe({
          next: (event: TransportEvent) => this.transportEvent.next(event),
          error: (error: unknown) => console.error(`WebsocketTransport ERROR:`, error),
          complete: () => console.log(`Connection to server ${url} closed...`)
        })
      }).catch((error: unknown) => {
        console.error(`WebsocketTransport ERROR:`, error)
      })
    }

    /**
     * Sends `message.payload` as a `message` event to the connection whose id
     * equals `message.target`, provided that connection is currently `ONLINE`.
     * Both the client list and the server list are searched; if neither holds an
     * online match the message is dropped and a warning is logged.
     *
     * @param message Message to deliver; `target` selects the connection.
     */
    public emit(message: TransportMessage): void {
      // The id is only used for logging, so a payload without the expected
      // nesting must not prevent the message from being sent.
      const messageID = ((message.payload as WrappedMessage | undefined)?.payload as FisMessage | undefined)?.header?.messageID
      console.log(`Transport Socket service level. Emitting: ${messageID}`)

      const clientObj: ConnectedClientSocket | undefined = this.connectedClientSocket.find(obj => obj.id === message.target)
      const serverObj: ConnectedServerSocket | undefined = this.connectedServer.find(obj => obj.id === message.target)
      let delivered = false

      // for server usage
      if (clientObj && clientObj.connectionState.getValue() === 'ONLINE') {
        clientObj.socketInstance.emit(`message`, message.payload)
        delivered = true
      }
      // for client usage
      if (serverObj && serverObj.connectionState.getValue() === 'ONLINE') {
        serverObj.socketInstance.emit(`message`, message.payload)
        delivered = true
      }

      if (!delivered) {
        console.warn(`WebsocketTransportService: no ONLINE connection for target ${message.target}; message not sent`)
      }
    }

    /**
     * Exposes the event stream handed to the constructor as an observable.
     * Incoming socket messages are published on it as events.
     *
     * @returns Observable view of the shared transport event subject.
     */
    public subscribe(): Observable<TransportEvent> {
      return this.transportEvent.asObservable()
    }

    /**
     * @returns The transport kind of this service (`Transport.Websocket`).
     */
    public getInfo(): Transport {
      return this.info
    }
  }
  /**
   * A {@link ClientObject} paired with the socket of a client that is connected
   * to this process while it acts as a server.
   */
  export interface ConnectedClientSocket extends ClientObject {
    /** Server-side `socket.io` socket (`Socket` from "socket.io") for this client. */
    socketInstance: SocketForConnectedClient;
  }
  /**
   * A {@link ClientObject} paired with the socket this process uses to talk to a
   * server it is connected to as a client.
   */
  export interface ConnectedServerSocket extends ClientObject {
    /** Client-side socket (`Socket` from "socket.io-client") for this server. */
    socketInstance: ClientSocket;
  }