import { Subject, take } from 'rxjs';
import { GrpcServiceMethod } from '../services/grpc.service.method';
import { readFileSync } from 'fs';
// NOTE(review): ConnectionAttribute and MessageLog were used below without being imported.
// They are assumed to be exported from general.interface next to the other types — confirm.
import { ConnectionAttribute, ConnectionRequest, Message, MessageLog } from '../interfaces/general.interface';
import { ServerClientManager } from '../services/server-client.service';

/**
 * Manual trial script for the connection service.
 *
 * What it does, in order:
 *  1. Loads the fake messages from `payload.json`.
 *  2. Builds a connection request with one server (`hostServer`) and one client
 *     (`targetserver`), both of connection type 'GRPC'.
 *  3. Subscribes to everything the client receives from the remote side: a message whose
 *     `msgPayload` is 'Query' is answered with a short stream of fake messages; anything
 *     else is collected in `array`.
 *  4. Publishes two messages from the payload after 3 s and 4 s, and logs how many
 *     messages were collected after 10 s and 20 s.
 */

// Subject for bidirectional communication
const connectionService: ServerClientManager = new ServerClientManager(new GrpcServiceMethod());

// Read as text so JSON.parse receives a string instead of a Buffer.
const messagesJSON: string = readFileSync('payload.json', 'utf-8');
const parsedMessages: any[] = JSON.parse(messagesJSON); // load the fake messages generated for this trial

const targetserver: string = 'localhost:3001';
const targetserver2: string = 'localhost:3002'; // currently unused
const hostServer: string = 'localhost:3000';
const array: any[] = []; // Used for testing: collects every non-query message received

const connectionRequest: ConnectionRequest = {
  server: {
    name: 'grpc1',
    serverUrl: hostServer,
    connectionType: 'GRPC',
    messageToBePublishedfromApplication: new Subject(),
  },
  client: [
    {
      name: 'grpc2',
      targetServer: targetserver,
      connectionType: 'GRPC',
      messageToBeReceivedFromRemote: new Subject(),
    },
  ],
};

// Sample connection attributes. Nothing in this script reads them yet.
// NOTE(review): the original was a half-written sketch mixing type syntax into the literal;
// the optional `Name` fields are left unset and `connectionStatus` is given a fresh Subject.
// Verify the shape against the ConnectionAttribute interface.
const client: ConnectionAttribute[] = [
  {
    name: 'con1',
    ConnectionID: 'aaa123-xxx123',
    outGoing: {
      ChannelID: 'aaa123',
      PublisherID: 'bbb123',
      SubscriberID: 'ccc123',
    },
    inComing: {
      ChannelID: 'xxx123',
      PublisherID: 'yyy123',
      SubscriberID: 'zzz123',
    },
    connectionStatus: new Subject(),
  },
  {
    name: 'con2',
    ConnectionID: 'aaa123xxx-xxx123xx',
    outGoing: {
      ChannelID: 'aaa123xxx',
      PublisherID: 'bbb123',
      SubscriberID: 'ccc123xxx',
    },
    inComing: {
      ChannelID: 'xxx123xx',
      PublisherID: 'yyy123xxx',
      SubscriberID: 'zzz123xxx',
    },
    connectionStatus: new Subject(),
  },
];

// Handler for the incoming messages from the other side.
// The callback parameter is deliberately not called `client`, so it does not shadow the
// module-level `client` array above.
connectionRequest.client.forEach((remoteClient) => {
  remoteClient.messageToBeReceivedFromRemote.subscribe({
    next: request => {
      // Application logic comes here. This is where incoming messages are sorted:
      // deciding who each message belongs to and what kind of message it is.
      if ((request.message as MessageLog).appData.msgPayload === 'Query') {
        generateFakeStreamResponse(request).subscribe({
          next: (responseMessage: Message) => {
            // console.log(`Processing request:${request.id}....`)
            connectionRequest.server.messageToBePublishedfromApplication.next(responseMessage);
          },
          error: error => console.error(error),
          complete: () => {
            // The response Subject is never completed by generateFakeStreamResponse,
            // so this is not expected to run: the stream should be indefinite.
            console.log(`Stream request for ${request.id} is queued.`);
          },
        });
      } else {
        array.push(request);
        console.log(`Received messages from the other side: ${(request.message as MessageLog).appData.msgId}`);
      }
    },
    error: error => console.error(error),
    complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`),
  });
});

connectionService.generateConnection(connectionRequest);

/* Simple Test */
// let generateFakeMessagesToBePublished = stream().pipe(take(10))
// generateFakeMessagesToBePublished.subscribe({
//     next: message => {
//         let payload: Message = {
//             id: hostServer,
//             message: message
//         }
//         connectionRequest.server.messageToBePublishedfromApplication.next(payload)
//     }
// })

/* Complex Test: Expected outcome: both sides must have received 14 messages by the end.
   Disconnecting has not been tried yet. */

// The 11th and 12th messages (indices 10 and 11) were chosen on purpose: their msgPayload
// property was changed to a query in payload.json so that they emulate a request.
setTimeout(() => publishParsedMessage(10), 3000);
setTimeout(() => publishParsedMessage(11), 4000);

setTimeout(() => {
  console.log(`All received data: ${array.length}`);
}, 10000);

setTimeout(() => {
  console.log(`All received data: ${array.length}`);
}, 20000);

/**
 * Publishes the fake message at `index` of the loaded payload through the server's outgoing
 * Subject, using the message's own `appData.msgId` as the id.
 *
 * Logs an error and publishes nothing when the payload has no entry at that index, instead
 * of failing with a TypeError on the missing entry.
 *
 * @param index Position of the message inside `parsedMessages`.
 */
function publishParsedMessage(index: number): void {
  const payload = parsedMessages[index];
  if (payload === undefined) {
    console.error(`payload.json has no message at index ${index}; nothing was published.`);
    return;
  }
  const message = {
    id: payload.appData.msgId,
    message: payload,
  };
  connectionRequest.server.messageToBePublishedfromApplication.next(message);
}

/**
 * Publishes the loaded fake messages as a Subject, one message every 500 ms.
 *
 * Emission stops, and the Subject completes, after 1000 messages or when the loaded
 * messages run out, whichever comes first, so no `undefined` entries are emitted past
 * the end of the data.
 *
 * NOTE(review): the timer is not tied to subscribers; it keeps ticking until the limit is
 * reached even after a consumer such as `take(7)` has unsubscribed.
 *
 * @returns A Subject that emits the fake messages in order. Because it is a Subject,
 *          values emitted before a subscription is made are not replayed.
 */
function stream(): Subject<any> {
  const result = new Subject<any>();
  const messages: any[] = parsedMessages;
  const limit = Math.min(messages.length, 1000);
  let count = 0;
  const intervalId = setInterval(() => {
    if (count < limit) {
      result.next(messages[count]);
      count++;
    }
    if (count >= limit) {
      clearInterval(intervalId);
      result.complete();
    }
  }, 500);
  return result;
}

/**
 * Builds a fake streamed response for an incoming query: takes the first 7 messages from
 * `stream()` and re-emits each one wrapped with the id of the request being answered.
 *
 * The returned Subject is never completed here; only the inner 7-message stream completes,
 * which is reported with a log line.
 *
 * @param request The incoming request; only its `id` is read.
 * @returns A Subject emitting `{ id, message }` responses addressed to the caller.
 */
function generateFakeStreamResponse(request: any): Subject<Message> {
  const res = new Subject<Message>();
  stream().pipe(take(7)).subscribe({
    next: element => {
      const message = {
        id: request.id, // Caller's
        message: element,
      };
      res.next(message);
    },
    error: error => console.error(error),
    complete: () => console.log(`Stream response for ${request.id} has been prepared.`),
  });
  return res;
}