rxjsbuffer.sample.ts 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. import { fa } from "@faker-js/faker"
  2. import { Subject, interval, Observable, buffer, switchMap, bufferToggle, map, scan, startWith, BehaviorSubject, tap } from "rxjs"
  3. import { prepareResponseMessages } from "../services/utility/prepareFISmessage"
  4. let toggle = interval(5000)
  5. const syncSource = interval(500) // sync
  6. let asyncSource = new Subject<any>() //async
  7. syncSource.subscribe(e => {
  8. asyncSource.next(e)
  9. })
  10. /* So, output is the final and syncSource is the input. So the first trigger should triggle buffer.
  11. so initiailly, source should push to output, when toggle is triggered, source should be unsubscribed,
  12. and buffered should be instantiated immediately to keep receiving the messages.
  13. When the next trigger comes in (reconnection), than connect source to output again, whilst aslo
  14. releasing buffered into output as well. */
  15. // assuming the initial state is online, so source will stream straight to output, but the first toggle will be offline
  16. /* How buffer works: It doesn't have a normal. It will subscribe to another observable that will act as a signal to release
  17. It's default state would be that it will always buffer, until the subscribed observable emits a signal to relase the buffer,
  18. and then it will continue to buffer. There's no way to adjust buffer to stream normally. */
  19. function bufferTest1() {
  20. const intervalEvents = interval(1000);
  21. const buffered = intervalEvents.pipe(buffer(toggle));
  22. buffered.subscribe(x => console.log(x));
  23. }
  24. // VERSION 2 <with the help of chatGPT>
  25. // Toggle between buffering and normal emitting
  26. function bufferTest2() {
  27. const toggleBuffer$ = toggle.pipe(
  28. // Track the toggle state: true means buffering, false means normal
  29. scan((isBuffering) => !isBuffering, false),
  30. // Use the state to switch between buffer mode and normal mode
  31. switchMap(isBuffering => {
  32. // check for notif messatge and mutate open and close and isbuffering
  33. // open.next(blablabla) or close.next(blablabla)
  34. if (isBuffering) {
  35. console.log(isBuffering)
  36. // Start buffering: open on toggle, close on next toggle
  37. return asyncSource.pipe(
  38. // bufferToggle(toggle, () => toggle)
  39. bufferToggle(toggle.pipe(startWith(0)), () => toggle))
  40. // bufferToggle(open, () => { if (true) { return closing } })
  41. } else {
  42. console.log(isBuffering)
  43. // Emit values normally
  44. return asyncSource;
  45. }
  46. })
  47. );
  48. // Subscribe to the toggled stream
  49. toggleBuffer$.subscribe(values => {
  50. console.log('Values:', values);
  51. });
  52. }
  53. // bufferTest2()
  54. // bufferTest1()
  55. let notificationSubject = new BehaviorSubject<string>('online') // true as in online false as in offline
  56. let keep = new Subject<any>()
  57. let release = new Subject<any>()
  58. // bufferTest3().subscribe(values => {
  59. // console.log(`Values: ${values}`)
  60. // })
  61. // emulateNotification(`offline`, 3000)
  62. // emulateNotification(`online`, 7000)
  63. // emulateNotification(`offline`, 10000)
  64. // emulateNotification(`online`, 15000)
  65. function bufferTest3(): Observable<any> {
  66. const messageStream = notificationSubject.pipe(
  67. map(notif => {
  68. if (notif == 'offline') {
  69. return true
  70. } else {
  71. return false
  72. }
  73. }),
  74. switchMap(notif => {
  75. if (notif) {
  76. // Start buffering: open on toggle, close on next toggle
  77. return asyncSource.pipe(
  78. bufferToggle(keep.pipe(startWith(0)), () => release))
  79. } else {
  80. // Emit values normally
  81. return asyncSource;
  82. }
  83. })
  84. )
  85. notificationSubject.subscribe(notif => {
  86. // logic here
  87. if (notif == 'online') {
  88. console.log(`received notification: ${notif}, releasing...`)
  89. release.next('release')
  90. }
  91. if (notif == 'offline') {
  92. console.log(`received notification: ${notif}, buffering...`)
  93. keep.next('keep')
  94. }
  95. })
  96. return messageStream
  97. }
  98. function emulateNotification(status: string, delay: number) {
  99. setTimeout(() => {
  100. notificationSubject.next(status)
  101. }, delay)
  102. }
  103. // working version
  104. function bufferTest4(): Observable<any> {
  105. // Track buffering state
  106. const bufferingState$ = notificationSubject.pipe(
  107. scan((isBuffering, notif) => notif === 'offline' ? true : false, false),
  108. tap(isBuffering => {
  109. console.log(`Buffering state changed: ${isBuffering}`);
  110. })
  111. );
  112. // Message stream based on buffering state
  113. const messageStream = bufferingState$.pipe(
  114. switchMap(isBuffering => {
  115. if (isBuffering) {
  116. // Start buffering: open on toggle, close on next toggle
  117. return asyncSource.pipe(
  118. // bufferToggle(toggle.pipe(startWith(0)), () => release)
  119. bufferToggle(keep.pipe(startWith(0)), () => release)
  120. );
  121. } else {
  122. // Emit values normally
  123. return asyncSource;
  124. }
  125. })
  126. );
  127. notificationSubject.subscribe(notif => {
  128. console.log(`Received notification: ${notif}`);
  129. if (notif === 'online') {
  130. release.next(true); // Release buffered values
  131. }
  132. if (notif === 'offline') {
  133. keep.next(true); // Start buffering
  134. }
  135. });
  136. return messageStream;
  137. }
  138. let messages = prepareResponseMessages(1000000, 1)
  139. function bufferTest5() {
  140. const clicks = interval(300000);
  141. const buffered = messages.pipe(buffer(clicks));
  142. let count = 0
  143. buffered.subscribe({
  144. next: x => {
  145. console.log(`${count++} && Buffer Length: ${x.length}`)
  146. },
  147. error: err => console.error(err),
  148. complete: () => { }
  149. })
  150. }
  151. bufferTest5()