socketTest.txt 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. import { BehaviorSubject, buffer, concatMap, distinctUntilChanged, from, interval, Observable, Subject, Subscriber, take, takeUntil, takeWhile } from "rxjs";
  2. import { io, Socket } from "socket.io-client";
  3. import { prepareResponseMessages } from "../services/utility/prepareFISmessage";
  4. import { BaseMessage } from "../dependencies/logging/services/logging-service";
  5. import { rejects } from "assert";
  6. // Connect to the server
  7. const socket: Socket = io('http://localhost:3000');
  8. export let abstractStorage: WrappedMessage[] = []
  9. export let bufferReleaseSignal: Subject<void> = new Subject()
  10. export let sender: Subject<BaseMessage> = prepareResponseMessages(3000, 500)
  11. export let receiverConnectionState: BehaviorSubject<'OFFLINE' | 'ONLINE'> = new BehaviorSubject('OFFLINE')
  12. export let transmissionState: BehaviorSubject<'TRANSMITTING' | 'IDLE' | 'ARRAY EMPTY' | 'STORING DATA' | 'GETTING STORED DATA'> = new BehaviorSubject('ARRAY EMPTY')
  13. export let arrayToBeTransmitted: Subject<WrappedMessage[]> = new Subject()
  14. export let toBeWrapped: Subject<any> = new Subject()
  15. export let wrappedMessage: Subject<WrappedMessage> = new Subject()
  16. export let transportLayerMessages = new Subject<any>()
  17. // run this to active the release mechanism
  18. releaseSignalManager()
  19. // sender goes to toBeWrapped
  20. sender.subscribe(message => toBeWrapped.next(message))
  21. // toBeWrapped will wrap the message with timeReceived and push next to wrappedMesasge subject
  22. let currentMessageId: string | null
  23. toBeWrapped.subscribe(message => {
  24. wrappedMessage.next(wrapMessageWithTimeReceived(message, currentMessageId ? currentMessageId : null))
  25. currentMessageId = message.header.messageID
  26. })
  27. //simulate connection test
  28. // wrappedMessage will then be pushed to buffer
  29. wrappedMessage.pipe(buffer(bufferReleaseSignal)).subscribe((bufferedMessages: WrappedMessage[]) => {
  30. console.log(bufferedMessages.length + ' buffered messages')
  31. console.log(`Released buffered message: ${bufferedMessages.length} total messages. To Be sorted.`)
  32. // arrayToBeTransmitted.next(sortMessage(bufferedMessages))
  33. arrayToBeTransmitted.next(bufferedMessages.length > 0 ? sortMessage(bufferedMessages) : [])
  34. });
  35. arrayToBeTransmitted.subscribe(array => {
  36. if (array.length > 0) {
  37. /* Note: Latest update, no point checking the receiver's connection state, since, once the message is pass on, it will
  38. be flushed into the event queue to be executed at a later time, which the connnection state would be mutated by then. */
  39. // update transmission to indicate that this batch of array is being processed
  40. transmissionState.next('TRANSMITTING')
  41. from(array).pipe(
  42. concatMap((message: WrappedMessage) => {
  43. if (transmissionState.getValue() === 'TRANSMITTING') {
  44. console.log(message.timeReceived);
  45. return sendMessage(message).catch((error) => {
  46. return storeMessage(message).then((msgId) => {
  47. console.log(`Message (${msgId}) stored Successfully. {TransmissionState: ${transmissionState.getValue()}}`);
  48. }).catch((error) => {
  49. console.error(error);
  50. });
  51. });
  52. } else if (transmissionState.getValue() === 'STORING DATA') {
  53. return storeMessage(message).then((msgId) => {
  54. console.log(`Message (${msgId}) stored Successfully. {TransmissionState: ${transmissionState.getValue()}}`);
  55. }).catch((error) => {
  56. console.error(error);
  57. });
  58. } else if (receiverConnectionState.getValue() === 'OFFLINE') {
  59. transmissionState.next('STORING DATA'); // to be fired every message processing
  60. return storeMessage(message).then((msgId) => {
  61. console.log(`Message (${msgId}) stored Successfully. {TransmissionState: ${transmissionState.getValue()}}`);
  62. }).catch((error) => {
  63. console.error(error);
  64. });
  65. } else {
  66. return Promise.resolve(); // No async work, but need to return a resolved promise
  67. }
  68. })
  69. ).subscribe({
  70. error: err => console.error(err),
  71. complete: () => {
  72. // update transmission state to indicate this batch is completed
  73. console.log(`Processing buffered array completed. Changing transmission state to ARRAY EMPTY`);
  74. transmissionState.next('ARRAY EMPTY');
  75. if (receiverConnectionState.getValue() === 'ONLINE' && transmissionState.getValue() === 'ARRAY EMPTY') {
  76. setTimeout(() => {
  77. bufferReleaseSignal.next()
  78. }, 1000)
  79. }
  80. // Do nothing if the receiver connection is offline
  81. }
  82. });
  83. } else {
  84. // If I don't do setTimeout, then bufferrelasesignal will be overloaded
  85. setTimeout(() => {
  86. bufferReleaseSignal.next()
  87. }, 3000)
  88. }
  89. }
  90. )
  91. /* Utils */
  92. function releaseSignalManager() {
  93. receiverConnectionState.pipe(
  94. distinctUntilChanged()
  95. ).subscribe(clientState => {
  96. console.log(`Client is now ${clientState}`)
  97. if (clientState == 'OFFLINE') {
  98. console.log(`Current transmission state: ${transmissionState.getValue()}`)
  99. // just keep buffering
  100. }
  101. if (clientState == 'ONLINE') {
  102. console.log(`Current transmission state: ${transmissionState.getValue()}`)
  103. // get the stored messages to pump it back into the buffer to be ready to be processed immediately
  104. if (transmissionState.getValue() == 'ARRAY EMPTY') {
  105. getDataAndUpdateState()
  106. }
  107. if (transmissionState.getValue() == 'STORING DATA') {
  108. // have to wait for storing data to be completed before proceeding to the code above
  109. transmissionState.pipe(
  110. takeWhile(value => value == 'ARRAY EMPTY') //listen to this value and then destroy this observable
  111. ).subscribe({
  112. next: () => {
  113. getDataAndUpdateState()
  114. },
  115. error: err => console.error(err),
  116. complete: () => { }
  117. })
  118. }
  119. }
  120. })
  121. }
  122. function sortMessage(array: WrappedMessage[]): WrappedMessage[] {
  123. console.log(`Sorting ${array.length} messages....`)
  124. return array.sort((a, b) => {
  125. return new Date(a.timeReceived).getTime() - new Date(b.timeReceived).getTime();
  126. });
  127. }
  128. function wrapMessageWithTimeReceived(message: any, previousMessageID: string): any {
  129. // check if message has already a time received property if so no need to add anymore
  130. if (!message.timeReceived) {
  131. let WrappedMessage: WrappedMessage = {
  132. timeReceived: new Date(),
  133. payload: message as BaseMessage,
  134. previousMessageID: previousMessageID
  135. }
  136. return WrappedMessage
  137. } else {
  138. return message as WrappedMessage
  139. }
  140. }
  141. async function getStoredMessages(): Promise<WrappedMessage[]> {
  142. return new Promise((resolve, reject) => {
  143. let array = []
  144. setTimeout(() => {
  145. abstractStorage.forEach(message => {
  146. array.push(message)
  147. })
  148. abstractStorage = []
  149. }, 5000)
  150. resolve(array)
  151. })
  152. }
  153. // just an abstraction
  154. async function storeMessage(message: WrappedMessage): Promise<any> {
  155. return new Promise((resolve, reject) => {
  156. try {
  157. setTimeout(() => {
  158. console.log(`Storing ${message.payload.header.messageID}....`)
  159. abstractStorage.push(message)
  160. resolve(message.payload.header.messageID)
  161. }, 1000)
  162. }
  163. catch (error) {
  164. reject(error)
  165. }
  166. })
  167. }
  168. function getDataAndUpdateState() {
  169. transmissionState.next('GETTING STORED DATA')
  170. console.log(`Current transmission state: ${transmissionState.getValue()}`)
  171. getStoredMessages().then((storedMessages: WrappedMessage[]) => {
  172. if (storedMessages.length > 0) {
  173. console.log(`${storedMessages.length} STORED messages.`)
  174. from(storedMessages).subscribe({
  175. next: message => {
  176. wrappedMessage.next(message)
  177. },
  178. error: err => console.error(err),
  179. complete: () => {
  180. console.log(`Flushed back ${storedMessages.length} messages back in buffer`)
  181. transmissionState.next('ARRAY EMPTY')
  182. bufferReleaseSignal.next()
  183. }
  184. })
  185. } else {
  186. console.log(`${storedMessages.length} STORED messages.`)
  187. transmissionState.next('ARRAY EMPTY')
  188. bufferReleaseSignal.next()
  189. }
  190. }).catch((err) => {
  191. console.error(err)
  192. })
  193. }
  194. export interface WrappedMessage {
  195. timeReceived: any, // this property is for sender to sort
  196. payload: BaseMessage,
  197. previousMessageID?: string // this property is for receiver to sort
  198. }
  199. // Listen for a connection event
  200. socket.on('connect', () => {
  201. socket.emit('Hello from the client!')
  202. console.log('Connected to the server:', socket.id)
  203. receiverConnectionState.next('ONLINE')
  204. });
  205. // Listen for messages from the server
  206. socket.on('message', (msg: string) => {
  207. console.log('Message from server:', msg);
  208. })
  209. async function sendMessage(message: WrappedMessage): Promise<any> {
  210. return new Promise((resolve, reject) => {
  211. try {
  212. // extra precaution: According to chatgpt, if disconnected, then the payload will be loaded back in event queue whilst the socket will try to reestablish connection
  213. // https://socket.io/docs/v4/client-offline-behavior/
  214. socket.emit('message', message); // inherently an aysnc
  215. console.log(`SocketEmit() for message to event queue ${message.payload.header.messageID}
  216. current tranmission State: ${transmissionState.getValue()}
  217. current connection State: ${receiverConnectionState.getValue()}
  218. ${receiverConnectionState.getValue()=='OFFLINE'? 'Message in the event queue will be attempted again after connection is back' : 'Sent over'}`);
  219. resolve('')
  220. } catch (error) {
  221. console.error('Error emitting message:', error);
  222. wrappedMessage.next(message)
  223. reject(error)
  224. }``
  225. })
  226. }
  227. // Handle disconnection
  228. socket.on('disconnect', () => {
  229. console.log('Disconnected from the server');
  230. receiverConnectionState.next('OFFLINE')
  231. });