import * as grpc from '@grpc/grpc-js';
import { Subject, Subscription } from "rxjs";
import { ReportStatus, ColorCode, Message, MessageLog, ConnectionAttribute, ConnectionRequest, GrpcConnectionType } from "../interfaces/general.interface";
import { Status } from '@grpc/grpc-js/build/src/constants';
import { v4 as uuidv4 } from 'uuid'
const message_proto = require('./protos/server.proto')

/** Shape of the messages exchanged over the `HandleMessage` stream: an id plus a JSON-encoded payload. */
interface WireMessage {
    id: string;
    message: string;
}

/** A pause longer than this (ms) between two loop iterations resets the retry bookkeeping. */
const RETRY_STREAK_RESET_MS = 2000;

/** Delay (ms) between two consecutive (re)creation attempts. */
const RETRY_DELAY_MS = 1000;

/**
 * Creates and supervises the gRPC server-streaming endpoints of a connection:
 * a local server that streams outgoing application messages, and a client that
 * asks the remote server to open a stream towards this process.
 */
export class GrpcServiceMethod {

    /**
     * Starts one supervised server instance and one supervised client instance.
     *
     * Both supervisors loop forever, so they are deliberately not awaited; this
     * method returns as soon as they have been started.
     *
     * @param request Carries the local server URL (`server.serverUrl`) and the remote target (`client.targetServer`).
     * @param connectionAttribute Shared connection state, mutated by both instances.
     */
    public async create(request: ConnectionRequest, connectionAttribute: ConnectionAttribute): Promise<void> {
        // Assuming currently only one client
        void this.createGrpcInstance(request.server.serverUrl, { instanceType: 'server' }, connectionAttribute)
        void this.createGrpcInstance(request.client.targetServer, { instanceType: 'client' }, connectionAttribute)
    }

    /**
     * Copies channel/publisher/subscriber identifiers into `connectionAttribute` and,
     * once both the incoming and outgoing channel IDs are known, derives the local and
     * remote connection IDs and emits a GREEN status report.
     *
     * @param connectionAttribute Connection state to update in place.
     * @param clientInfo Identifiers announced by the remote side; when truthy they overwrite `inComing`.
     * @param localInfo Identifiers generated locally; when truthy they overwrite `outGoing`.
     */
    private async generateAdditionalAttributes(connectionAttribute: ConnectionAttribute, clientInfo?: any, localInfo?: any): Promise<void> {
        if (clientInfo) {
            connectionAttribute.inComing.ChannelID = clientInfo.channelID
            connectionAttribute.inComing.PublisherID = clientInfo.publisherID
            connectionAttribute.inComing.SubscriberID = clientInfo.subscriberID
        }
        if (localInfo) {
            connectionAttribute.outGoing.ChannelID = localInfo.channelID
            connectionAttribute.outGoing.PublisherID = localInfo.publisherID
            connectionAttribute.outGoing.SubscriberID = localInfo.subscriberID
        }
        if (connectionAttribute.outGoing.ChannelID && connectionAttribute.inComing.ChannelID) {
            // Each side concatenates its own channel ID first, so local and remote IDs mirror each other.
            connectionAttribute.ConnectionID.local = connectionAttribute.outGoing.ChannelID + connectionAttribute.inComing.ChannelID
            connectionAttribute.ConnectionID.remote = connectionAttribute.inComing.ChannelID + connectionAttribute.outGoing.ChannelID
            let report: ReportStatus = {
                code: ColorCode.GREEN,
                message: `ConnectionID: ${connectionAttribute.ConnectionID.local} && ${connectionAttribute.ConnectionID.remote} `,
            }
            connectionAttribute.connectionStatus.next(report)
            console.log(connectionAttribute)
        }
    }

    /**
     * Reads the retry budget from `process.env.ReconnectionAttempt`.
     *
     * @returns The parsed number, or `Infinity` when the variable is missing or not
     * numeric, so that an unset variable means "never escalate to RED" instead of
     * producing `NaN` comparisons that suppress every status report.
     */
    private reconnectionAttemptLimit(): number {
        const parsed = parseInt(process.env.ReconnectionAttempt ?? '', 10);
        return Number.isNaN(parsed) ? Number.POSITIVE_INFINITY : parsed;
    }

    /**
     * Supervisor loop: keeps (re)creating a server or client instance forever.
     *
     * The instance factories resolve when the instance has failed and needs to be
     * recreated, so every resolution counts as one failed attempt. The first failure
     * of a streak emits a YELLOW report; reaching the retry budget emits a RED report.
     * The loop keeps retrying after RED. A pause of more than
     * {@link RETRY_STREAK_RESET_MS} between iterations (i.e. an instance that stayed
     * up for a while) resets the streak.
     *
     * To be migrated into a service in the immediate future.
     *
     * @param serverUrl Address to bind (server) or to dial (client).
     * @param grpcType Selects which kind of instance to supervise.
     * @param connectionAttribute Shared connection state handed to the instance.
     */
    private async createGrpcInstance(
        serverUrl: string,
        grpcType: GrpcConnectionType,
        connectionAttribute: ConnectionAttribute,
    ): Promise<void> {
        const statusControl = connectionAttribute.connectionStatus
        const instanceType = grpcType.instanceType
        if (instanceType != 'server' && instanceType != 'client') {
            // Nothing to supervise; previously this case waited forever on a promise that could never resolve.
            console.error(`Unknown gRPC instance type: ${String(instanceType)}`);
            return;
        }

        let consecutiveResolutions = 0;
        let lastResolutionTime = Date.now();
        let alreadyHealthCheck = false
        let yellowErrorEmission = false
        let redErrorEmission = false

        while (true) {
            try {
                if (instanceType == 'server') {
                    await this.createServerStreamingServer(serverUrl, connectionAttribute)
                } else {
                    await this.createServerStreamingClient(serverUrl, alreadyHealthCheck, connectionAttribute)
                }
                // The factory resolved, which indicates failure: count it.
                consecutiveResolutions++;
                alreadyHealthCheck = true

                const attemptLimit = this.reconnectionAttemptLimit();
                // Budget exhausted: report RED once per streak (retries continue regardless).
                if (consecutiveResolutions >= attemptLimit && !redErrorEmission) {
                    redErrorEmission = true
                    console.error(`Connection failed ${consecutiveResolutions} times. Stopping connection attempts.`);
                    let error: ReportStatus = {
                        code: ColorCode.RED,
                        message: 'Initiate Doomsday protocol....',
                    }
                    statusControl.next(error)
                }
                // Still within budget: report YELLOW once per streak.
                if (consecutiveResolutions < attemptLimit && !yellowErrorEmission) {
                    yellowErrorEmission = true
                    let error: ReportStatus = {
                        code: ColorCode.YELLOW,
                        message: `Attempting reconnection... Server has yet to respond`,
                    }
                    statusControl.next(error);
                }
            } catch (error) {
                // The factory rejected instead of resolving: reset the count.
                consecutiveResolutions = 0;
                console.error('Connection attempt failed:', error);
            }

            // Check for a pause of more than 2 seconds since the last resolution attempt
            const currentTime = Date.now();
            const timeSinceLastResolution = currentTime - lastResolutionTime;
            if (timeSinceLastResolution > RETRY_STREAK_RESET_MS) {
                consecutiveResolutions = 0;
                yellowErrorEmission = false
                redErrorEmission = false
                alreadyHealthCheck = false
            }
            // Update the last resolution time
            lastResolutionTime = currentTime;

            // Wait before the next attempt
            await new Promise(resolve => setTimeout(resolve, RETRY_DELAY_MS));
        }
    }

    /**
     * Creates the server instance that streams all outgoing application messages.
     *
     * The returned promise resolves (it never rejects) when the server has to be
     * recreated: the outgoing message stream errored or completed, the address could
     * not be bound, or setting up the server threw. The server is shut down before
     * resolving so that the address is free for the next attempt.
     *
     * @param serverUrl Address to bind, e.g. '0.0.0.0:3001'.
     * @param connectionAttribute Supplies `outGoing.MessageToBePublished` and receives the remote channel info.
     * @returns A promise resolving with `''` or with the error that stopped the server.
     */
    public async createServerStreamingServer(
        serverUrl: string,
        connectionAttribute: ConnectionAttribute
    ): Promise<any> {
        return new Promise((resolve) => {
            // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md
            const server: grpc.Server = new grpc.Server();
            let settled = false;
            // Resolve at most once, releasing the listening socket first so a re-created server can bind again.
            const settle = (value: unknown) => {
                if (settled) return;
                settled = true;
                server.forceShutdown();
                resolve(value);
            };

            try {
                server.addService(message_proto.Message.service, {
                    HandleMessage: (call: grpc.ServerWritableStream<WireMessage, WireMessage>) => {
                        // The request payload carries the remote side's channel identifiers.
                        let clientInfo: unknown;
                        try {
                            clientInfo = JSON.parse(call.request.message)
                        } catch (parseError) {
                            console.error(`Rejecting stream request from ${call.request.id}: malformed channel info`, parseError)
                            call.end()
                            return
                        }
                        this.generateAdditionalAttributes(connectionAttribute, clientInfo)
                            .catch(err => console.error(err))
                        console.log(`Initializing stream. Opening Channel... Confirmation from ${call.request.id}`)

                        const outgoing = connectionAttribute.outGoing.MessageToBePublished
                        if (!outgoing) return

                        // RxJS releases the subscription by itself on error/complete, so the observer
                        // must not touch `subscription` (it may not be assigned yet if the source
                        // terminates synchronously during subscribe()).
                        const subscription: Subscription = outgoing.subscribe({
                            next: (response: Message) => {
                                console.log(`Sending ${(response.message as MessageLog).appData.msgId}`)
                                let message = {
                                    id: response.id,
                                    message: JSON.stringify(response.message)
                                }
                                call.write(message)
                            },
                            error: err => {
                                console.error(err)
                                call.end()
                                settle('')
                            },
                            complete: () => {
                                console.log(`Stream response completed for ${call.request.id}`)
                                call.end()
                                settle('')
                            }
                        })

                        // Stop feeding a stream the remote side has abandoned; the server itself stays up.
                        call.on('cancelled', () => subscription.unsubscribe())
                        call.on('error', (streamError: Error) => {
                            console.error(streamError)
                            subscription.unsubscribe()
                        })
                    },

                    Check: (_: grpc.ServerUnaryCall<unknown, unknown>, callback: grpc.sendUnaryData<{ status: string }>) => {
                        // for now it is just sending the status message over to tell the client it is alive
                        // For simplicity, always return "SERVING" as status
                        callback(null, { status: 'SERVING' });
                    },
                });

                // Bind and start the server
                server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), (bindError: Error | null) => {
                    if (bindError) {
                        // e.g. address already in use: hand control back so the caller can retry.
                        console.error(`gRPC server failed to bind ${serverUrl}:`, bindError);
                        settle(bindError);
                        return;
                    }
                    console.log(`gRPC server is running on ${serverUrl}`);
                    server.start();
                });
            } catch (error) {
                settle(error)
            }
        })
    }

    /**
     * Sends a request to the other server to open a channel through which that
     * server streams messages to this process.
     *
     * The returned promise resolves when the stream has failed and the client should
     * be recreated; the underlying client is closed at that point. It rejects only if
     * constructing the client or opening the call throws synchronously.
     *
     * @param server Address of the remote server.
     * @param alreadyHealthCheck `true` on retries of a failure streak; suppresses repeated failure logs and reports.
     * @param connectionAttribute Receives the generated outgoing identifiers and the incoming messages.
     * @returns A promise resolving with a (possibly empty) string describing why the stream ended.
     */
    public async createServerStreamingClient(
        server: string,
        alreadyHealthCheck: boolean,
        connectionAttribute: ConnectionAttribute
    ): Promise<any> {
        return new Promise((resolve) => {
            const client = new message_proto.Message(server, grpc.credentials.createInsecure());
            let settled = false;
            // Resolve at most once and release the channel; every retry builds a fresh client.
            const settle = (value: string) => {
                if (settled) return;
                settled = true;
                client.close();
                resolve(value);
            };

            // Probe the server. The probe result is advisory (it reports GREEN on success);
            // a failed stream call below is what triggers re-creation. Only an unexpected
            // rejection of the probe itself ends this instance.
            this.checkConnectionHealth(client, connectionAttribute.connectionStatus, alreadyHealthCheck).catch(() => {
                settle('')
            })

            let outGoingInfo = {
                channelID: uuidv4(),
                publisherID: uuidv4(),
                subscriberID: uuidv4()
            }
            // NOTE: the empty object is truthy, so this call also resets the inComing
            // identifiers to undefined until the remote side announces itself again.
            this.generateAdditionalAttributes(connectionAttribute, {}, outGoingInfo)
                .catch(err => console.error(err))

            let call = client.HandleMessage({ id: server, message: JSON.stringify(outGoingInfo) })
            console.log(`Sending request to ${server} to open response channel...`)

            // The 'status' event delivers a StatusObject; the numeric code lives in `.code`.
            call.on('status', (status: grpc.StatusObject) => {
                const code: Status = status.code
                if (code === grpc.status.OK) {
                    console.log(`Message trasmission operation is successful`)
                    // RPC completed successfully
                }
                if (code === grpc.status.UNAVAILABLE) {
                    // Report only on the first failure of a streak: the supervising loop owns
                    // the escalation to RED and must not be overridden by repeated YELLOWs.
                    if (!alreadyHealthCheck) {
                        let report = {
                            code: ColorCode.YELLOW,
                            message: `Server doesn't seem to be alive. Error returned.`,
                            from: `Server Streaming Client Instance`
                        }
                        connectionAttribute.connectionStatus.next(report)
                    }
                    settle('No connection established. Server is not responding..')
                }
            });

            call.on('data', (data: WireMessage) => {
                let payload: any
                try {
                    payload = JSON.parse(data.message)
                } catch (parseError) {
                    console.error(`Discarding malformed message ${data.id}:`, parseError)
                    return
                }
                let response: Message = {
                    id: data.id,
                    message: payload
                }
                if (connectionAttribute.inComing.MessageToBeReceived) {
                    connectionAttribute.inComing.MessageToBeReceived.next(response)
                }
                console.log((response.message as MessageLog).appData.msgId)
            });

            call.on('error', (err: Error) => {
                if (!alreadyHealthCheck) console.error(err)
                settle('')
            });
        })
    }

    /**
     * Calls the remote `Check` RPC once and reports the outcome.
     *
     * On a response, logs it and emits a GREEN report on `statusControl`. On failure,
     * logs the error unless `alreadyHealthCheck` is set (to avoid repeating the same
     * log on every retry).
     *
     * @param client gRPC client exposing a `Check(request, callback)` method.
     * @param statusControl Subject that receives the GREEN report on success.
     * @param alreadyHealthCheck When `true`, a failed check is not logged.
     * @returns A promise resolving to `true` when the server responded, `false` otherwise.
     */
    public async checkConnectionHealth(client: any, statusControl: Subject<ReportStatus>, alreadyHealthCheck: boolean): Promise<boolean> {
        return new Promise((resolve) => {
            client.Check({}, (error: grpc.ServiceError | null, response?: { status: string }) => {
                if (response) {
                    console.log(`GRPC Health check status: ${response.status} Server Connected`);
                    let report: ReportStatus = {
                        code: ColorCode.GREEN,
                        message: `Good to go!!!`,
                    }
                    statusControl.next(report)
                    resolve(true)
                } else {
                    if (alreadyHealthCheck == false) console.error(`Health check failed: ${error}`);
                    resolve(false)
                }
            })
        })
    }
}