socket.service.ts 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. import { BehaviorSubject, catchError, Observable, of, Subject, Subscription, tap, timeout } from "rxjs"
  2. import { RetransmissionService } from "../../services/retransmission.service"
  3. import { BaseMessage } from "../../dependencies/logging/services/logging-service"
  4. import { v4 as uuidV4 } from 'uuid';
  5. const express = require('express');
  6. const http = require('http');
  7. // const { Server } = require('socket.io');
  8. import { Server } from 'socket.io'
  9. /* This is only for demonstration purposes. Because the actual nestjs socket implementation may differ. */
  10. export class SocketService {
  11. private connectedClients: ClientInfo[] = []
  12. private announcements: Subject<any> = new Subject()
  13. private app = express();
  14. private server = http.createServer(this.app);
  15. private io = new Server(this.server);
  16. private responseFromApp: Subject<BaseMessage | any>
  17. private incomingRequest: Subject<BaseMessage> = new Subject()
  18. constructor(response: Subject<BaseMessage>) {
  19. this.responseFromApp = response
  20. this.announcements.subscribe(announcement => {
  21. console.log(`Server Announcement: ${announcement}`)
  22. })
  23. }
  24. public getIncomingRequest(): Observable<BaseMessage> {
  25. return this.incomingRequest.asObservable()
  26. }
  27. public async setUpConnection() {
  28. this.io.on('connection', (socket) => {
  29. this.announcements.next('a client is connected:' + socket.id);
  30. let clientInfo: ClientInfo | null
  31. socket.on('connect', (msg) => {
  32. // this is reserved....
  33. });
  34. socket.on('notification', (msg) => {
  35. console.log(msg)
  36. if (msg.agenda == 'newClient') {
  37. clientInfo = {
  38. id: socket.id,
  39. clientName: uuidV4(),
  40. connectedAt: new Date(),
  41. clientConnectionState: new BehaviorSubject<'ONLINE' | 'OFFLINE'>('ONLINE'),
  42. requests: [],
  43. buffer: new RetransmissionService(),
  44. responseObs: new Subject<any>()
  45. }
  46. this.connectedClients.push(clientInfo);
  47. // Send data over for client to persist
  48. socket.emit('notification', {
  49. notification: 'Your credentials',
  50. createdAt: new Date(),
  51. socketInfo: clientInfo
  52. })
  53. // this is the supposed responses to be pushed to this socket client
  54. clientInfo.buffer.retransmission(clientInfo.responseObs, clientInfo.clientConnectionState)
  55. let subscription = clientInfo.buffer.returnBufferedMessages().subscribe(output => {
  56. // console.log(output)
  57. if (clientInfo.clientConnectionState.getValue() === 'ONLINE') {
  58. socket.emit('response', output)
  59. } else {
  60. subscription.unsubscribe()
  61. }
  62. })
  63. }
  64. if (msg.agenda == 'existingClient') {
  65. // check if client exists
  66. let clientObj: ClientInfo = this.connectedClients.find(obj => obj.clientName === msg.data.clientName)
  67. if (clientObj) {
  68. clientInfo = clientObj
  69. console.log('Existing client found')
  70. // but also update socketId
  71. clientInfo.id = socket.id
  72. // Send data over for client to persist
  73. socket.emit('notification', {
  74. notification: 'Your updated credentials',
  75. connectedAt: new Date(),
  76. updatedId: socket.id
  77. })
  78. // resume operation Release them buffer
  79. /* local client isOnline need not be mutated, since this is a new connection. However the previous intance of client Connection State
  80. inside the retransmission needs to be updated to release the buffered values.*/
  81. function releaseBufferedItems(clientInfo: ClientInfo) {
  82. let subscription: Subscription = clientInfo.buffer.returnBufferedMessages().pipe(
  83. tap(message => {
  84. if (clientInfo.clientConnectionState.getValue() === 'OFFLINE') {
  85. clientInfo.responseObs.next(message)
  86. }
  87. }),
  88. timeout(10000), // Unsubscribe if no value is emitted within 10 seconds
  89. catchError(err => {
  90. if (err.name === 'TimeoutError') {
  91. console.log('TimeoutError: No value emitted within 10 seconds.');
  92. if (clientInfo.clientConnectionState.getValue() === 'ONLINE') {
  93. releaseBufferedItems(clientInfo); // Call the function if it's still online
  94. } else {
  95. subscription.unsubscribe()
  96. }
  97. }
  98. return of();
  99. })
  100. )
  101. .subscribe({
  102. next: output => {
  103. socket.emit('response', output)
  104. },
  105. error: err => console.error(err),
  106. complete: () => { }
  107. })
  108. }
  109. releaseBufferedItems(clientInfo)
  110. //signal to release buffered items
  111. clientObj.clientConnectionState.next('ONLINE')
  112. } else {
  113. console.log(this.connectedClients)
  114. console.log(`Existing Client is not found`)
  115. }
  116. }
  117. })
  118. // Listen for messages from the client
  119. socket.on('request', (request: BaseMessage) => {
  120. if (clientInfo) {
  121. this.announcements.next(`Received Message: ${request.header.messageID} from ${clientInfo.clientName}`);
  122. // clientInfo.requests.push({ message: request, completed: false })
  123. this.incomingRequest.next(request)
  124. this.processRequest(request).subscribe({
  125. next: message => {
  126. // console.log(message.header.messageName) // it does receive
  127. clientInfo.responseObs.next(message)
  128. },
  129. error: err => console.error(err),
  130. complete: () => { }
  131. })
  132. } else {
  133. console.log(`Client is still not defined. Please have this client set up the credentials`)
  134. socket.emit('notification', {
  135. notification: 'Failed Request',
  136. data: request,
  137. message: 'Client Credentials is not properply set up! Cannot process requests at the moment.'
  138. })
  139. }
  140. });
  141. // Handle disconnection
  142. socket.on('disconnect', () => {
  143. if (clientInfo) {
  144. clientInfo.clientConnectionState.next('OFFLINE') // signal to start buffering\
  145. this.announcements.next(`Client ${clientInfo.id} disconnected`);
  146. // this.deleteClientById(socket.id)
  147. }
  148. });
  149. });
  150. this.io.engine.on("connection_error", (err) => {
  151. console.log(err.req); // the request object
  152. console.log(err.code); // the error code, for example 1
  153. console.log(err.message); // the error message, for example "Session ID unknown"
  154. console.log(err.context); // some additional error context
  155. });
  156. // Start the server
  157. const PORT = process.env.PORT || 3000;
  158. this.server.listen(PORT, () => {
  159. console.log(`Server listening on port ${PORT}`);
  160. });
  161. }
  162. // Utils
  163. // Function to delete an item by its id (mutating the array)
  164. private deleteClientById(id) {
  165. const index = this.connectedClients.findIndex(item => item.id === id);
  166. if (index !== -1) {
  167. this.connectedClients.splice(index, 1);
  168. }
  169. }
  170. private processRequest(request: BaseMessage): Observable<any> {
  171. return new Observable((observer) => {
  172. this.responseFromApp.subscribe(message => {
  173. // console.log(message)
  174. if (message.header.messageID === request.header.messageID) {
  175. if (!message.complete) {
  176. observer.next(message)
  177. } else {
  178. observer.next(message)
  179. observer.complete()
  180. }
  181. }
  182. })
  183. })
  184. }
  185. }
  186. export interface ClientInfo {
  187. id: string,
  188. clientName: string,
  189. connectedAt: Date,
  190. clientConnectionState: BehaviorSubject<'ONLINE' | 'OFFLINE'>,
  191. requests: { message: any, completed: boolean }[],
  192. buffer: RetransmissionService,
  193. responseObs: Subject<any>
  194. }