buffer.service.ts 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. // buffer.service.ts
  2. import { BehaviorSubject, Observable, Subject, from, map, switchMap } from 'rxjs';
  3. import mongoose, { Connection, Model, Document } from 'mongoose';
  4. import { ConnectionState, Message, MessageLog } from '../interfaces/general.interface';
  5. import { resolve } from 'path';
  /**
   * Buffers application messages while the connection is in the `BUFFER` state and
   * re-emits them on the same message stream once the state becomes `DIRECT_PUBLISH`.
   *
   * Until the MongoDB model is ready, messages are held in an in-memory array. Once the
   * model exists they are persisted through it; the in-memory array then only serves as
   * a fallback for messages whose database write failed.
   */
  export class BufferService {
    /** Stream of application messages; released messages are re-emitted on this same subject. */
    private readonly messageStream: Subject<Message>;
    /** Current connection state; drives buffering (`BUFFER`) and release (`DIRECT_PUBLISH`). */
    private readonly connectionState: BehaviorSubject<ConnectionState>;
    /** In-memory buffer used before the model is ready and as a fallback on write failure. */
    private messageBuffer: Message[] = [];
    /** Mongoose model for buffered messages; `undefined` until the database connection is set up. */
    private messageModel: Model<Message> | undefined;
    /** Base MongoDB URL taken from the `MONGO` environment variable; the db name is appended verbatim. */
    private readonly dbUrl: string | undefined = process.env.MONGO;

    /**
     * @param messageFromApp Subject carrying the application's messages.
     * @param connectionStateSubject Subject carrying the connection state.
     * @param dbName Database name appended to the `MONGO` base URL.
     */
    constructor(
      messageFromApp: Subject<Message>,
      connectionStateSubject: BehaviorSubject<ConnectionState>,
      dbName: string
    ) {
      this.messageStream = messageFromApp;
      this.connectionState = connectionStateSubject;

      // Subscribe first: messages arriving before MongoDB is ready land in the local array.
      this.setupSubscriptions();

      this.initializeDatabaseConnection(dbName)
        .then((connection: Connection) => {
          // Loaded lazily via require so the schema module is only evaluated once a connection exists.
          const grpcMessageSchema = require('../models/message.schema');
          this.messageModel = connection.model<Message>('Message', grpcMessageSchema);
          // Move everything collected so far from the local array into MongoDB.
          return this.transferLocalBufferToMongoDB();
        })
        .catch((error: unknown) => {
          // Without a database the service keeps working on the in-memory buffer only.
          console.error('Database initialization failed:', error);
        });
    }

    /** @returns The message stream, including messages re-emitted from the buffer. */
    public getMessages(): Observable<Message> {
      return this.messageStream;
    }

    /** @returns The connection state subject this service listens to. */
    public getStateObservable(): BehaviorSubject<ConnectionState> {
      return this.connectionState;
    }

    /** Wires the message stream and the connection state to their handlers. */
    private setupSubscriptions(): void {
      this.messageStream.subscribe({
        next: (message: Message) => this.handleIncomingMessage(message),
        error: (err) => console.error('Error in messageToBePublished subject:', err),
        complete: () => console.log('messageToBePublished subscription completed')
      });
      this.connectionState.subscribe({
        next: (state: ConnectionState) => this.handleConnectionStateChanges(state),
        error: (err) => console.error('Error in connectionState subject:', err),
        complete: () => console.log('connectionState subscription completed')
      });
    }

    /**
     * Opens a dedicated mongoose connection to `${MONGO}${dbName}`.
     *
     * @param dbName Database name appended to the base URL.
     * @returns The mongoose connection.
     * @throws Error when `MONGO` is not set; otherwise rethrows whatever mongoose throws.
     */
    private async initializeDatabaseConnection(dbName: string): Promise<Connection> {
      if (!this.dbUrl) {
        // Previously this produced the URL "undefined<dbName>" and an obscure driver error.
        throw new Error('MONGO environment variable is not set; cannot connect to MongoDB');
      }
      const uri = `${this.dbUrl}${dbName}`;
      const printableUri = BufferService.redactCredentials(uri);
      try {
        console.log(printableUri);
        // NOTE(review): on Mongoose versions where Connection is not thenable this await
        // does not wait for the connection to open (`.asPromise()` would be needed) —
        // confirm against the installed version.
        const connection: Connection = await mongoose.createConnection(uri);
        console.log(`Connected to ${printableUri}`);
        return connection;
      } catch (error) {
        console.error('Error connecting to MongoDB:', error);
        throw error;
      }
    }

    /** Buffers the message when the current state is `BUFFER`; otherwise does nothing. */
    private handleIncomingMessage(message: Message): void {
      const { status } = this.connectionState.getValue();
      if (status === 'BUFFER') {
        // bufferMessage handles its own failures, so it is safe to fire and forget.
        void this.bufferMessage(message);
      }
      // 'DIRECT_PUBLISH': nothing to do here; subscribers of the stream receive the message directly.
    }

    /**
     * Reacts to a connection state change: buffers the state's payload on `BUFFER`
     * (when it is a message rather than a string) and releases the buffer on `DIRECT_PUBLISH`.
     */
    private handleConnectionStateChanges(state: ConnectionState): void {
      console.log(state.status);
      if (state.status === 'BUFFER') {
        if (state.payload && typeof state.payload !== 'string') {
          // Buffer the message carried by the state change immediately.
          void this.bufferMessage(state.payload);
        }
      } else if (state.status === 'DIRECT_PUBLISH') {
        // Catch here so a failed release never surfaces as an unhandled rejection.
        this.releaseBufferedMessages(this.messageStream).catch((error: unknown) => {
          console.error('Error releasing buffered messages:', error);
        });
      }
    }

    /**
     * Stores one message: in MongoDB when the model is ready, otherwise in the local array.
     * A failed MongoDB write falls back to the local array so the message is not lost.
     * Never rejects.
     */
    private async bufferMessage(message: Message): Promise<void> {
      const model = this.messageModel;
      const msgId = BufferService.messageId(message);

      if (!model) {
        this.messageBuffer.push(message);
        console.log(`pushing ${msgId} into local array buffer.... There is now ${this.messageBuffer.length} messages`);
        return;
      }

      try {
        await model.create(message);
      } catch (error) {
        console.error('Error saving message to MongoDB:', error);
        // Keep the message in memory; it is published with the next release.
        this.messageBuffer.push(message);
        return;
      }

      try {
        const count = await model.countDocuments({});
        console.log(`Message${msgId} saved to MongoDB buffer. There is ${count} messages in datatbase at the moment.`);
      } catch (error) {
        // The message itself was saved; only the informational count failed.
        console.error('Error counting messages in MongoDB buffer:', error);
      }
    }

    /**
     * Publishes every buffered message on `messageFromBuffer`: first the local array,
     * then (when the model is ready) the documents stored in MongoDB.
     *
     * @param messageFromBuffer Subject the buffered messages are emitted on.
     * @returns `true` once everything was released.
     * @throws Rethrows errors raised while reading from MongoDB.
     */
    private async releaseBufferedMessages(messageFromBuffer: Subject<Message>): Promise<boolean> {
      const model = this.messageModel;
      if (!model || this.messageBuffer.length > 0) {
        this.releaseLocalBuffer(messageFromBuffer);
      }
      if (model) {
        await this.releaseMongoBuffer(model, messageFromBuffer);
      }
      return true;
    }

    /** Emits and clears the local array. */
    private releaseLocalBuffer(target: Subject<Message>): void {
      // Detach the array before emitting: anything buffered again while subscribers run
      // goes into a fresh array instead of being wiped after the loop.
      const pending = this.messageBuffer;
      this.messageBuffer = [];
      console.log(`Releasing buffer Message: currently there is ${pending.length} messages to be released`);
      for (const message of pending) {
        target.next(message);
      }
    }

    /**
     * Emits the documents stored in MongoDB one by one and deletes each after it was emitted.
     * Deleting per document (instead of wiping the collection afterwards) keeps messages
     * that were stored while the release was running.
     */
    private async releaseMongoBuffer(model: Model<Message>, target: Subject<Message>): Promise<void> {
      try {
        const count = await model.countDocuments({});
        console.log(`There is ${count} messages in datatbase buffer at the moment. Releasing them....`);
      } catch (error) {
        console.error('Error counting messages in MongoDB buffer:', error);
      }

      let deleteFailed = false;
      const cursor = model.find().cursor();
      try {
        for (let doc = await cursor.next(); doc != null; doc = await cursor.next()) {
          target.next(doc);
          try {
            await model.deleteOne({ _id: doc._id });
          } catch (err) {
            // The document stays in MongoDB and will be emitted again on the next release.
            deleteFailed = true;
            console.error('Error deleting data:', err);
          }
        }
      } catch (error) {
        console.error('Error streaming messages from MongoDB:', error);
        throw error;
      }

      if (!deleteFailed) {
        console.log('Data in Mongo deleted successfully.');
      }
    }

    /**
     * Moves the local array into MongoDB once the model is available. Writes run one
     * after another to keep the original order; messages whose write fails are put back
     * into the local array. Never rejects.
     */
    private async transferLocalBufferToMongoDB(): Promise<void> {
      console.log(`Releasing buffer Message: currently there is ${this.messageBuffer.length}. Transferring to database...`);
      const model = this.messageModel;
      if (!model) {
        return;
      }

      const pending = this.messageBuffer;
      this.messageBuffer = [];
      const failed: Message[] = [];
      for (const message of pending) {
        try {
          await model.create(message);
        } catch (error) {
          console.error('Error transferring message to MongoDB:', error);
          failed.push(message);
        }
      }
      if (failed.length > 0) {
        // Failed messages are older than anything added meanwhile, so they go first.
        this.messageBuffer = [...failed, ...this.messageBuffer];
      }
    }

    /** Reads the message id for log output without throwing when the payload has no `appData`. */
    private static messageId(message: Message) {
      return (message.message as MessageLog | undefined)?.appData?.msgId;
    }

    /** Masks a `user:password@` section of a connection URL so it can be logged. */
    private static redactCredentials(uri: string): string {
      return uri.replace(/\/\/[^/@]*@/, '//***@');
    }
  }