import { Subject, take } from 'rxjs';
import { Message, MessageLog, ConnectionRequest } from '../interfaces/general.interface';
import { GrpcServiceMethod } from '../services/grpc.service.method';
import { readFileSync } from 'fs';
import { ServerClientManager } from '../services/server-client.service';

/**
 * Trial script: builds a ConnectionRequest with a GRPC server side and a GRPC
 * client side, hands it to ServerClientManager.generateConnection, and answers
 * every incoming message whose payload is 'Query' with a short stream of fake
 * messages read from payload.json. All other incoming messages are collected
 * in `array` for inspection.
 */

// Number of fake messages sent back for a single 'Query' request.
const FAKE_RESPONSE_SIZE = 7
// Delay between two consecutive fake messages, in milliseconds.
const STREAM_INTERVAL_MS = 500
// Upper bound on messages emitted by stream() when the caller gives no limit.
const DEFAULT_STREAM_LIMIT = 1000

// Subject for bidirectional communication
const connectionService: ServerClientManager = new ServerClientManager(new GrpcServiceMethod())

// Load the fake messages generated for this trial. Reading with an explicit
// encoding yields a string, so JSON.parse does not rely on Buffer coercion.
const messagesJSON: string = readFileSync('payload.json', 'utf-8')
const parsedPayload: unknown = JSON.parse(messagesJSON)
if (!Array.isArray(parsedPayload)) {
  // stream() indexes into this value and uses its length, so fail early and loudly.
  throw new Error('payload.json must contain a JSON array of messages')
}
const parsedMessages: any[] = parsedPayload

// NOTE(review): 'localhost:300' looks like a truncated port (3000/3001?) — confirm before relying on it.
const targetserver: string = 'localhost:300'
// let targetserver2: string = 'localhost:3002'
const hostServer: string = 'localhost:3002'
const array: any[] = [] // Used for testing

// The two message channels are created up front so the handler below can
// publish responses without reaching back through the request object.
const messageToBePublishedfromApplication = new Subject<Message>()
const messageToBeReceivedFromRemote = new Subject<Message>()

const request: ConnectionRequest = {
  server: {
    serverUrl: hostServer,
    connectionType: 'GRPC',
    messageToBePublishedfromApplication: messageToBePublishedfromApplication
  },
  client: {
    targetServer: targetserver,
    connectionType: 'GRPC',
    messageToBeReceivedFromRemote: messageToBeReceivedFromRemote
  },
}

// Handler for the incoming messages from the other side.
messageToBeReceivedFromRemote.subscribe({
  next: (incoming: Message) => {
    // Application logic comes here. This is where the assortment takes place:
    // deciding whose message it is and what kind of message it is.
    const incomingLog = incoming.message as MessageLog
    if (incomingLog.appData.msgPayload === 'Query') {
      generateFakeStreamResponse(incoming).subscribe({
        next: (responseMessage) => {
          // console.log(`Processing request:${incoming.id}....`)
          // Publish on the application's outgoing subject. The handler parameter
          // used to be named `request`, shadowing the ConnectionRequest, so
          // `request.server` was looked up on the incoming message instead.
          messageToBePublishedfromApplication.next(responseMessage)
        },
        error: error => console.error(error),
        complete: () => {
          console.log(`Stream request for ${incoming.id} is queued.`) // should be indefinite
        }
      })
    } else {
      array.push(incoming)
      console.log(`Received messages from the other side: ${incomingLog.appData.msgId}`)
    }
  },
  error: error => console.error(error),
  complete: () => console.log('Response for incoming generated. \nBut this will never stop, and should not either.')
})

connectionService.generateConnection(request)

// grpcService.createGrpcInstance(hostServer, { instanceType: 'server', serviceMethod: 'server streaming' }, messageToBeReleased)
// grpcService.createGrpcInstance(targetserver, { instanceType: 'client', serviceMethod: 'server streaming' })

// setTimeout(() => {
//   let message = {
//     id: parsedMessages[10].appData.msgId,
//     message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
//   }
//   applicationOutgoingResponse.next(message)
// }, 3000)

// setTimeout(() => {
//   let message = {
//     id: parsedMessages[11].appData.msgId,
//     message: parsedMessages[11] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
//   }
//   applicationOutgoingResponse.next(message)
// }, 4000)

// setTimeout(() => {
//   console.log(`All received data: ${array.length}`)
// }, 10000)

// setTimeout(() => {
//   console.log(`All received data: ${array.length}`)
// }, 20000)

/**
 * Publishes the fake messages from payload.json one by one on a Subject,
 * one message every STREAM_INTERVAL_MS milliseconds, then completes it.
 *
 * @param maxCount Maximum number of messages to emit. Defaults to 1000. The
 *   effective limit never exceeds the number of loaded messages, so no
 *   `undefined` entries are emitted when the payload is shorter.
 * @returns A Subject that emits the messages and then completes. When there is
 *   nothing to emit it is completed before being returned, and no timer is started.
 */
function stream(maxCount: number = DEFAULT_STREAM_LIMIT): Subject<any> {
  const result = new Subject<any>()
  const messages: any[] = parsedMessages
  const limit = Math.min(maxCount, messages.length)

  // Written as a negated `>` so a NaN limit also ends the stream instead of looping forever.
  if (!(limit > 0)) {
    result.complete()
    return result
  }

  let count = 0
  const intervalId = setInterval(() => {
    result.next(messages[count])
    count++
    if (count >= limit) {
      clearInterval(intervalId)
      result.complete()
    }
  }, STREAM_INTERVAL_MS)
  return result
}

/**
 * Builds the fake streamed response for one incoming request: up to
 * FAKE_RESPONSE_SIZE messages from stream(), each wrapped with the caller's id.
 *
 * NOTE(review): the returned Subject is never completed or errored here, so a
 * subscriber's `complete` callback does not fire — confirm this is intended.
 *
 * @param request The incoming message being answered; only its `id` is read.
 * @returns A Subject emitting `{ id, message }` objects.
 */
function generateFakeStreamResponse(request: any): Subject<any> {
  const res = new Subject<any>()
  // Ask stream() for only as many messages as needed so its timer stops once
  // the response is done; take() still enforces the response size at this level.
  stream(FAKE_RESPONSE_SIZE).pipe(take(FAKE_RESPONSE_SIZE)).subscribe({
    next: element => {
      const message = {
        id: request.id, // Caller's
        message: element
      }
      res.next(message)
    },
    error: error => console.error(error),
    complete: () => console.log(`Stream response for ${request.id} has been prepared.`)
  })
  return res
}