소스 검색

added buffer limit feature

Enzo 11 달 전
부모
커밋
470058a5f6
5개의 변경된 파일177개의 추가작업 그리고 15개의 파일을 삭제
  1. 1 1
      interfaces/general.interface.ts
  2. 3 1
      package.json
  3. 40 13
      services/buffer.service.ts
  4. 91 0
      test/grpcTest.ts
  5. 42 0
      test/simpleObsTest.ts

+ 1 - 1
interfaces/general.interface.ts

@@ -3,7 +3,7 @@
 import { Observable, Subject } from "rxjs"
 
 export interface ConnectionState {
-    status: 'BUFFER' | 'DIRECT_PUBLISH';
+    status: 'BUFFER' | 'DIRECT_PUBLISH' | 'LIMIT_EXCEEDED'
     reason?: string;
     payload?: any;
 }

+ 3 - 1
package.json

@@ -12,7 +12,9 @@
     "grpc1": "node test/grpc1.js",
     "grpc2": "node test/grpc2.js",
     "grpc3": "node test/grpc3.js",
-    "testing": "node test/test.js"
+    "testing": "node test/test.js",
+    "server": "node test/grpcTest.js",
+    "simpleObsTest": "node test/simpleObsTest.js"
   },
   "author": "",
   "license": "ISC",

+ 40 - 13
services/buffer.service.ts

@@ -7,14 +7,21 @@ export class BufferService {
   private bufferIdentifier!: string
   private messageStream: Subject<Message>;
   private connectionState: BehaviorSubject<ConnectionState>;
-  private messageBuffer: Message[] = [];
+  private messageBuffered: Message[] = [];
   private messageModel!: Model<Message>
   private readonly dbUrl!: string
+  private bufferLimit!: number
   constructor(
     messageFromApp: Subject<Message>,
     connectionStateSubject: BehaviorSubject<ConnectionState>,
-    dbName: string
+    dbName: string,
+    bufferLimit?: number
   ) {
+    if (bufferLimit) {
+      this.bufferLimit = bufferLimit
+    } else {
+      this.bufferLimit = 10000 // default buffer limit
+    }
     this.bufferIdentifier = dbName
     this.messageStream = messageFromApp;
     this.connectionState = connectionStateSubject;
@@ -80,6 +87,9 @@ export class BufferService {
   }
 
   private handleIncomingMessage(message: any): void {
+    if (this.connectionState.getValue().status === `LIMIT_EXCEEDED`) {
+      // do nothing... Let handleConnectionStateChanges deal with this state
+    }
     if (this.connectionState.getValue().status === "BUFFER") {
       this.bufferMessage(message);
     }
@@ -93,6 +103,16 @@ export class BufferService {
 
   private handleConnectionStateChanges(state: ConnectionState): void {
     console.log(`${this.bufferIdentifier}: ${this.connectionState.getValue().status}`);
+    if (state.status === `LIMIT_EXCEEDED`) {
+      console.log(`Limit exceed. Clearing buffered messages...`)
+      let message: Message = {
+        id: `test`,
+        message: `Limit exceed. Please take care. Buffer Service Out!`
+      }
+      this.messageStream.next(message)
+      // this.messageStream.unsubscribe() //destroy existing subscription
+      this.messageBuffered = []
+    }
     if (state.status === "BUFFER") {
       if (state.payload && typeof state.payload !== "string") {
         this.bufferMessage(state.payload); // Buffer the last message immediately
@@ -111,7 +131,6 @@ export class BufferService {
         await this.messageModel.create(message);
         this.messageModel.countDocuments({}).then((count) => {
           console.log(`Message${(message.message as MessageLog).appData.msgId} saved to MongoDB buffer. There is ${count} messages in datatbase under ${this.bufferIdentifier} at the moment.`);
-
           // if connection status okay 
           // if(this.connectionState.getValue().status == "DIRECT_PUBLISH")
           // {
@@ -125,9 +144,17 @@ export class BufferService {
         // Implement retry logic or additional error handling here 
       }
     } else {
-      this.messageBuffer.push(message);
-      console.log(this.messageBuffer) // Fallback to local buffer if model is not defined
-      console.log(`pushing into local array buffer under ${this.bufferIdentifier}.... There is now ${this.messageBuffer.length} messages`);
+      if (this.bufferLimit > this.messageBuffered.length) {
+        this.messageBuffered.push(message);
+        console.log(this.messageBuffered) // Fallback to local buffer if model is not defined
+        console.log(`pushing into local array buffer under ${this.bufferIdentifier}.... There is now ${this.messageBuffered.length} messages`);
+      } else {
+        let reportState: ConnectionState = {
+          status: `LIMIT_EXCEEDED`,
+          reason: `${this.bufferLimit} Limit exceeded. Buffer Service will be terminated...`
+        }
+        this.connectionState.next(reportState)
+      }
     }
   }
 
@@ -215,12 +242,12 @@ export class BufferService {
       }
       if (!this.messageModel) {
         // If MongoDB model is not defined, use the local buffer
-        console.log(`Releasing buffer Message under ${this.bufferIdentifier}: currently there is ${this.messageBuffer.length} messages to be released`);
-        this.messageBuffer.forEach((message) =>
+        console.log(`Releasing buffer Message under ${this.bufferIdentifier}: currently there is ${this.messageBuffered.length} messages to be released`);
+        this.messageBuffered.forEach((message) =>
           this.messageStream.next(message)
         );
-        this.messageBuffer.length = 0; // Clear the local buffer after transferring
-        if (this.messageBuffer.length < 1) {
+        this.messageBuffered.length = 0; // Clear the local buffer after transferring
+        if (this.messageBuffered.length < 1) {
           resolve(true);
         } else {
           reject(`Somehow the array is not emptied. This should not happen`);
@@ -231,9 +258,9 @@ export class BufferService {
 
   private async transferLocalBufferToMongoDB(): Promise<void> {
     return new Promise((resolve, reject) => {
-      console.log(`Transferring local array buffered Message under ${this.bufferIdentifier}: currently there is ${this.messageBuffer.length}. Transferring to database...`);
+      console.log(`Transferring local array buffered Message under ${this.bufferIdentifier}: currently there is ${this.messageBuffered.length}. Transferring to database...`);
       if (this.messageModel) {
-        let locallyBufferedMessage: Observable<Message> = from(this.messageBuffer);
+        let locallyBufferedMessage: Observable<Message> = from(this.messageBuffered);
         locallyBufferedMessage.subscribe({
           next: async (message: Message) => {
             try {
@@ -250,7 +277,7 @@ export class BufferService {
             if (this.messageModel) {
               this.messageModel.countDocuments({}).then((count) => {
                 console.log(`Local buffered message transfer completed. There is a total of ${count} messages in database ${this.bufferIdentifier} at the moment.`)
-                this.messageBuffer = [] // Clear local buffer after transferring
+                this.messageBuffered = [] // Clear local buffer after transferring
               });
             }
           },

+ 91 - 0
test/grpcTest.ts

@@ -0,0 +1,91 @@
+import { Subject, Subscription, from, interval, take } from 'rxjs';
+import * as grpc from '@grpc/grpc-js';
+import { readFileSync } from 'fs';
+import { message_proto } from '../services/protos/server.proto';
+import { Status } from '@grpc/grpc-js/build/src/constants';
+import assert = require('assert');
+
+const messagesJSON: any = readFileSync('payload.json')
+let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial 
+let hostServer: string = 'localhost:3000'
+let targetServer: string = 'localhost:3000'
+let array: any[] = [] // Used for testing    
+let intervalToStreamOutGoingMessage: number = 15
+
+/* Checking the values by the end of the test */
+interval(5000).subscribe(() => {
+    console.log(`All received data: ${array.length}`);
+});
+
+async function createServerStreamingServer(): Promise<any> { // '0.0.0.0:3001'
+    return new Promise((resolve, reject) => {
+        let server = new grpc.Server()
+        server.addService(message_proto.MessageService.service, {
+            HandleMessage: (call) => { 
+                console.log(call.request)
+                let response = {
+                    id: `1`,
+                    message: 'Hi, I received your request. Thanks for choosing FIS enterprise'
+                }
+                call.write(response)
+            }
+        })
+        server.bindAsync(hostServer, grpc.ServerCredentials.createInsecure(), (err, port) => {
+            assert.ifError(err);
+            server.start()
+            resolve(`gRPC server is running on ${hostServer}`)
+        })
+    })
+}
+
+async function createServerStreamingClient() {
+    const client = new message_proto.MessageService(targetServer, grpc.credentials.createInsecure());
+    console.log(`Sending request to ${targetServer} to open response channel...`)
+
+    let call = client.HandleMessage({ id: '1', message: `Testing` })
+
+    call.on('status', (status: Status) => {
+        if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits
+            console.log(`Message trasmission operation is successful`)
+        }
+        if (status == grpc.status.UNAVAILABLE) {
+            console.log(`Request Failed`)
+        }
+    });
+
+    call.on('data', (data: any) => {
+        console.log(data)
+    });
+
+    call.on('error', (err) => {
+        console.error(err)
+    });
+
+    call.on('end', () => {
+        console.log(`DONE`)
+    })
+}
+
+createServerStreamingServer().then((res) => {
+    console.log(res)
+    createServerStreamingClient()
+})
+
+function queryBooks() {
+    // const queryBooksRequest = new QueryBooksRequest();
+    // queryBooksRequest.setAuthorPrefix("Geor");
+    // const client = grpc.client(BookService.QueryBooks, {
+    //   host: host,
+    // });
+    // client.onHeaders((headers: grpc.Metadata) => {
+    //   console.log("queryBooks.onHeaders", headers);
+    // });
+    // client.onMessage((message: Book) => {
+    //   console.log("queryBooks.onMessage", message.toObject());
+    // });
+    // client.onEnd((code: grpc.Code, msg: string, trailers: grpc.Metadata) => {
+    //   console.log("queryBooks.onEnd", code, msg, trailers);
+    // });
+    // client.start();
+    // client.send(queryBooksRequest);
+  }

+ 42 - 0
test/simpleObsTest.ts

@@ -0,0 +1,42 @@
+import { BehaviorSubject, Observable, Subject, interval } from "rxjs";
+import { BufferService } from "../services/buffer.service";
+import { ConnectionState } from "../interfaces/general.interface";
+
+// let obs1 = new Subject<any>
+let obs1 = new Subject<any>;
+interval(1000).subscribe(
+    {
+        next:(msg)=>{
+            obs1.next(msg)
+        }
+    }
+);
+
+let initialReport = { status: 'DIRECT_PUBLISH' }
+let reportSubject: BehaviorSubject<any> = new BehaviorSubject(initialReport)
+let retransmission = new BufferService(obs1, reportSubject, 'mongo')
+
+let result: Observable<any> = retransmission.getMessages()
+result.subscribe(e => console.log(e))
+
+let publishBUFFER: ConnectionState = {
+    status: `BUFFER`
+}
+let publishDIRECT_PUBLISH: ConnectionState = {
+    status: `DIRECT_PUBLISH`
+}
+
+setTimeout(()=>{
+    reportSubject.next(publishBUFFER);
+},5000) 
+
+setTimeout(()=>{
+    reportSubject.next(publishDIRECT_PUBLISH);
+},10000)
+
+// let initialReport = { status: 'BUFFER' }
+// let reportSubject: BehaviorSubject<any> = new BehaviorSubject(initialReport)
+// let retransmission = new BufferService(obs1, reportSubject, 'mongo', 5)
+
+// retransmission.getMessages().subscribe(e => console.log(e))
+