import { Subject, from, interval, take } from 'rxjs';
import { Message, MessageLog, ConnectionRequest } from '../interfaces/general.interface';
import { GrpcServiceMethod } from '../services/grpc.service.method';
import { readFileSync } from 'fs';
import { ServerClientManager } from '../services/server-client.service';
import mongoose from 'mongoose';

/*
 * Manual test harness: opens one GRPC server/client pair through ServerClientManager
 * and reports every 5 seconds how many messages have been collected in `array`.
 * The alternative scenarios further down are kept commented out; enable exactly one at a time.
 */

// The connection is only needed by the commented-out `MessageModel.create(...)` calls below,
// so a failed connection is logged instead of being left as an unhandled rejection.
mongoose.connect('mongodb://localhost:27017/grpc2')
    .catch((error: unknown) => console.error(`Failed to connect to MongoDB:`, error))

// Named `MessageModel` so it does not collide with the imported `Message` type.
const MessageModel = mongoose.model('Message', require('../models/message.schema'))

// Subject for bidirectional communication
const connectionService: ServerClientManager = new ServerClientManager()
const messagesJSON = readFileSync('payload.json', 'utf-8')
const parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial
const targetserver = 'localhost:3000'
const targetserver2 = 'localhost:3001' // only used by the commented-out "1 to Many" scenarios
const hostServer = 'localhost:3002'
const intervalToStreamOutGoingMessage = 1 // delay in ms between two messages emitted by stream()
const array: Message[] = [] // every message collected by the active scenario

/* Simple Test: 1 to 1 */
const connectionRequest: ConnectionRequest = {
    server: {
        name: 'G2',
        serverUrl: hostServer,
        connectionType: 'GRPC',
        messageToBePublishedFromApplication: new Subject()
    },
    client: {
        name: 'G0',
        targetServer: targetserver,
        connectionType: 'GRPC',
        messageToBeReceivedFromRemote: new Subject()
    }
}

connectionService.generateConnection(connectionRequest)

// 10000th message == 848438e1-da50-4d98-aa12-e44d6d6a1489
// let generateFakeMessagesToBePublished = stream().pipe(take(1000))
// let generateFakeMessagesToBePublished = from(parsedMessages).pipe(take(1000))
// generateFakeMessagesToBePublished.subscribe({
//     next: message => {
//         let payload: Message = {
//             id: hostServer,
//             message: message
//         }
//         connectionRequest.server.messageToBePublishedFromApplication.next(payload)
//     }
// })

// connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
//     next: response => {
//         // if((response.message as MessageLog).appData.msgId == `ebf94479-44fe-470d-827c-9f1389396d6a`){
//         //     console.log(`Received the 1000th message. Running the test. Initiating server restart....`)
//         //     connectionService.restartServerInDuration(10)
//         // }
//         console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
//         // MessageModel.create(response)
//         array.push(response)
//     },
//     error: error => console.error(error),
//     complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
// })


/* Complex Test: 1 to 1 */
// let connectionRequest: ConnectionRequest = {
//     server: {
//         name: 'g1',
//         serverUrl: hostServer,
//         connectionType: 'GRPC',
//         messageToBePublishedFromApplication: new Subject()
//     },
//     client: {
//         name: 'g2',
//         targetServer: targetserver,
//         connectionType: 'GRPC',
//         messageToBeReceivedFromRemote: new Subject()
//     }
// }

// connectionService.generateConnection(connectionRequest)

// setTimeout(() => {
//     let message = {
//         id: parsedMessages[10].appData.msgId,
//         message: parsedMessages[10] // Chosen on purpose: the 11th message has its msgPayload property changed to 'Query' to emulate a request
//     }
//     connectionRequest.server.messageToBePublishedFromApplication.next(message)
// }, 3000)
// setTimeout(() => {
//     let message = {
//         id: parsedMessages[11].appData.msgId,
//         message: parsedMessages[11] // Chosen on purpose: the 12th message has its msgPayload property changed to 'Query' to emulate a request
//     }
//     connectionRequest.server.messageToBePublishedFromApplication.next(message)
// }, 4000)

// Handler for the incoming Messages from the other side.
// connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
//     next: request => {
//         // Application logic comes here. This is where the sorting takes place: deciding whom the message belongs to and what it is
//         if ((request.message as MessageLog).appData.msgPayload == 'Query') {
//             generateFakeStreamResponse(request).subscribe({
//                 next: (responseMessage: Message) => {
//                     console.log(`Received ${(request.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
//                     connectionRequest.server.messageToBePublishedFromApplication.next(responseMessage)
//                 },
//                 error: error => console.error(error),
//                 complete: () => {
//                     console.log(`Stream request for ${request.id} is queued.`) // should be indefinite
//                 }
//             })
//         } else {
//             array.push(request)
//             console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
//         }
//     },
//     error: error => console.error(error),
//     complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
// })


/* Simple Test: 1 to Many */
// let connectionRequest: ConnectionRequest = {
//     server: {
//         name: 'g1',
//         serverUrl: hostServer,
//         connectionType: 'GRPC',
//         messageToBePublishedFromApplication: new Subject()
//     },
//     client: {
//         name: 'g2',
//         targetServer: targetserver,
//         connectionType: 'GRPC',
//         messageToBeReceivedFromRemote: new Subject()
//     }
// }

// let connectionRequest2: ConnectionRequest = {
//     server: {
//         name: 'g1',
//         serverUrl: hostServer,
//         connectionType: 'GRPC',
//         messageToBePublishedFromApplication: new Subject()
//     },
//     client: {
//         name: 'g3',
//         targetServer: targetserver2,
//         connectionType: 'GRPC',
//         messageToBeReceivedFromRemote: new Subject()
//     }
// }

// connectionService.generateConnection(connectionRequest)
// connectionService.generateConnection(connectionRequest2)

// let generateFakeMessagesToBePublished = stream().pipe(take(10))
// generateFakeMessagesToBePublished.subscribe({
//     next: message => {
//         let payload: Message = {
//             id: hostServer,
//             message: message
//         }
//         connectionRequest.server.messageToBePublishedFromApplication.next(payload)
//     }
// })

// connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
//     next: request => {
//         console.log(`Received ${(request.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
//         array.push(request)
//     },
//     error: error => console.error(error),
//     complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
// })

// connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
//     next: request => {
//         console.log(`Received ${(request.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
//         array.push(request)
//     },
//     error: error => console.error(error),
//     complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
// })

// connectionRequest2.client.messageToBeReceivedFromRemote.subscribe({
//     next: request => {
//         array.push(request)
//     },
//     error: error => console.error(error),
//     complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
// })


/* Complex Test: 1 to Many */
// let connectionRequest: ConnectionRequest = {
//     server: {
//         name: 'g1',
//         serverUrl: hostServer,
//         connectionType: 'GRPC',
//         messageToBePublishedFromApplication: new Subject()
//     },
//     client: {
//         name: 'g2',
//         targetServer: targetserver,
//         connectionType: 'GRPC',
//         messageToBeReceivedFromRemote: new Subject()
//     }
// }

// let connectionRequest2: ConnectionRequest = {
//     server: {
//         name: 'g1',
//         serverUrl: hostServer,
//         connectionType: 'GRPC',
//         messageToBePublishedFromApplication: new Subject()
//     },
//     client: {
//         name: 'g3',
//         targetServer: targetserver2,
//         connectionType: 'GRPC',
//         messageToBeReceivedFromRemote: new Subject()
//     }
// }

// connectionService.generateConnection(connectionRequest)
// connectionService.generateConnection(connectionRequest2)

// setTimeout(() => {
//     let message = {
//         id: parsedMessages[10].appData.msgId,
//         message: parsedMessages[10] // Chosen on purpose: the 11th message has its msgPayload property changed to 'Query' to emulate a request
//     }
//     connectionRequest.server.messageToBePublishedFromApplication.next(message)
//     connectionRequest2.server.messageToBePublishedFromApplication.next(message)
// }, 3000)
// setTimeout(() => {
//     let message = {
//         id: parsedMessages[11].appData.msgId,
//         message: parsedMessages[11] // Chosen on purpose: the 12th message has its msgPayload property changed to 'Query' to emulate a request
//     }
//     connectionRequest.server.messageToBePublishedFromApplication.next(message)
//     connectionRequest2.server.messageToBePublishedFromApplication.next(message)
// }, 4000)

// connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
//     next: request => {
//         // Application logic comes here. This is where the sorting takes place: deciding whom the message belongs to and what it is
//         if ((request.message as MessageLog).appData.msgPayload == 'Query') {
//             generateFakeStreamResponse(request).subscribe({
//                 next: (responseMessage: Message) => {
//                     connectionRequest.server.messageToBePublishedFromApplication.next(responseMessage)
//                 },
//                 error: error => console.error(error),
//                 complete: () => {
//                     console.log(`Stream request for ${request.id} is queued.`) // should be indefinite
//                 }
//             })
//         } else {
//             console.log(`Received ${(request.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
//             array.push(request)
//             console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
//         }
//     },
//     error: error => console.error(error),
//     complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
// })

// connectionRequest2.client.messageToBeReceivedFromRemote.subscribe({
//     next: request => {
//         // Application logic comes here. This is where the sorting takes place: deciding whom the message belongs to and what it is
//         if ((request.message as MessageLog).appData.msgPayload == 'Query') {
//             generateFakeStreamResponse(request).subscribe({
//                 next: (responseMessage: Message) => {
//                     connectionRequest2.server.messageToBePublishedFromApplication.next(responseMessage)
//                 },
//                 error: error => console.error(error),
//                 complete: () => {
//                     console.log(`Stream request for ${request.id} is queued.`) // should be indefinite
//                 }
//             })
//         } else {
//             console.log(`Received ${(request.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
//             array.push(request)
//             console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
//         }
//     },
//     error: error => console.error(error),
//     complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
// })


/**
 * Publishes the fake messages loaded from `payload.json` one by one through a Subject.
 *
 * One message is emitted every `intervalToStreamOutGoingMessage` milliseconds, starting
 * with the first timer tick, so a caller that subscribes synchronously after this call
 * receives every message. The Subject completes once the limit has been reached.
 *
 * The timer is only cleared when the limit is reached; it keeps ticking even if all
 * subscribers have already unsubscribed (e.g. after `take(n)`).
 *
 * @param maxMessages Upper bound on the number of messages to emit. The effective limit
 *   is additionally capped by the number of loaded messages, so no `undefined` entries
 *   are emitted when the payload file holds fewer messages.
 * @returns A Subject emitting the raw parsed messages.
 */
function stream(maxMessages = 1000): Subject<any> {
    const result = new Subject<any>()
    const messages: any[] = parsedMessages
    const limit = Math.min(maxMessages, messages.length)
    let count = 0
    const intervalId = setInterval(() => {
        if (count < limit) {
            result.next(messages[count])
            count++
        }
        // Checked separately so that an empty payload still completes on the first tick.
        if (count >= limit) {
            clearInterval(intervalId)
            result.complete()
        }
    }, intervalToStreamOutGoingMessage)
    return result
}

/**
 * Emulates a streamed response to a request: takes the first 7 messages from `stream()`
 * and re-publishes each of them tagged with the id of the request.
 *
 * The returned Subject is not completed by this function; when the inner stream
 * finishes, only a log line is written.
 *
 * @param request The incoming request; only its `id` property is read here.
 * @returns A Subject emitting `{ id, message }` objects addressed to the caller.
 */
function generateFakeStreamResponse(request: any): Subject<Message> {
    const res = new Subject<Message>()
    stream().pipe(take(7)).subscribe({
        next: element => {
            const message: Message = {
                id: request.id, // Caller's id
                message: element
            }
            res.next(message)
        },
        error: error => console.error(error),
        complete: () => console.log(`Stream response for ${request.id} has been prepared.`)
    })
    return res
}

/* Checking the values by the end of the test */
interval(5000).subscribe(() => {
    console.log(`All received data: ${array.length}`);
});