query.service.ts 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. import * as fs from 'fs'
  2. import { filter, isMatch } from 'lodash'
  3. import { Observable, Subject, interval, map, of } from 'rxjs'
  /**
   * Streams records from a JSON file and emits only those that match the
   * caller-supplied conditions.
   *
   * The stream is "hot": reading and the emission timer both start when
   * `query()` is called, not when the returned observable is subscribed to.
   * Callers should subscribe right after calling `query()`; the first record
   * is emitted one interval after the call.
   */
  export class queryService {
    /** Upper bound on the number of records streamed by a single query. */
    private static readonly MAX_RECORDS = 100
    /** Delay between two consecutive record emissions, in milliseconds. */
    private static readonly EMIT_INTERVAL_MS = 1000

    /**
     * Reads the JSON array stored at `storageAddress.address` and streams the
     * records that satisfy every entry of `conditions`.
     *
     * Each call builds its own pipeline, so queries can be issued repeatedly
     * or concurrently without interfering with one another.
     *
     * @param storageAddress Storage descriptor; only `address` is read here and
     *   it is used as a file-system path.
     * @param conditions Partial records to match against. A record is emitted
     *   when it matches all of them; with no conditions every record is emitted.
     * @returns An observable that emits the matching records and completes once
     *   the input has been exhausted (or errors if the file does not contain a
     *   JSON array).
     * @throws Synchronously, if the file cannot be read or is not valid JSON.
     */
    public query(storageAddress: Storage, ...conditions: Entries[]): Observable<any> {
      const records = this.loadObsData(storageAddress.address)
      return this.filterFromObs(records, ...conditions)
    }

    /**
     * Data preparation: loads the file synchronously and replays its records
     * one at a time on a fixed interval.
     *
     * @param location Path of the JSON file to read.
     * @returns A hot observable of the parsed records.
     */
    private loadObsData(location: string): Observable<any> {
      const raw = fs.readFileSync(location, 'utf-8')
      const parsed: unknown = JSON.parse(raw)
      const source = new Subject<any>()

      if (!Array.isArray(parsed)) {
        // A Subject replays its terminal notification to late subscribers, so
        // the caller still observes this error after query() has returned.
        source.error(new TypeError(`Expected a JSON array in ${location}`))
        return source.asObservable()
      }

      const records: unknown[] = parsed
      // Never run past the end of the data; indexing beyond it would emit
      // `undefined` placeholders instead of records.
      const limit = Math.min(records.length, queryService.MAX_RECORDS)
      if (limit === 0) {
        source.complete()
        return source.asObservable()
      }

      let index = 0
      const timer = setInterval(() => {
        source.next(records[index])
        index++
        if (index >= limit) {
          clearInterval(timer)
          source.complete()
        }
      }, queryService.EMIT_INTERVAL_MS)

      return source.asObservable()
    }

    /**
     * Search and filter: forwards the records of `source` that match all
     * `conditions`, and propagates completion and errors downstream.
     * Intended to be moved out into a separate library.
     *
     * @param source Stream of candidate records.
     * @param conditions Partial records; each one is compared with lodash
     *   `isMatch` (partial deep comparison).
     * @returns An observable of the matching records.
     */
    private filterFromObs(source: Observable<any>, ...conditions: Entries[]): Observable<any> {
      const result = new Subject<any>()
      source.subscribe({
        next: element => {
          // Compare against each condition object individually. Passing the
          // whole array to isMatch would compare array indices, not fields.
          if (conditions.every(condition => isMatch(element, condition))) {
            result.next(element)
          }
        },
        error: err => result.error(err),
        complete: () => result.complete()
      })
      return result.asObservable()
    }
  }
  /**
   * Entries that the client will use as query conditions. Every field is
   * optional, so a condition can name any subset of them.
   * Subject to be improved later on.
   *
   * NOTE(review): the field meanings are not defined in this file; the
   * descriptions implied by the names should be confirmed against the
   * stored record format.
   */
  export interface Entries {
    _id?: string;
    appLogLocId?: string;
    msgId?: string;
    /** Either a `Date` instance or a string representation. */
    msgLogDateTime?: Date | string;
    /** Either a `Date` instance or a string representation. */
    msgDateTime?: Date | string;
    msgTag?: string[];
    msgPayload?: string;
  }
  /**
   * Describes where the data to be queried lives.
   */
  export interface Storage {
    /** Kind of storage. NOTE(review): not read by the code in this file; confirm the expected values. */
    type: string;
    /** Location of the data; the query service passes it to the file system as a path. */
    address: string;
  }
  51. }