123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246 |
- import * as grpc from '@grpc/grpc-js';
- import { Subject, Subscription } from "rxjs";
- import { ReportStatus, ColorCode, Message, MessageLog, ConnectionAttribute, ConnectionRequest, GrpcConnectionType } from "../interfaces/general.interface";
- import { Status } from '@grpc/grpc-js/build/src/constants';
- import { v4 as uuidv4 } from 'uuid'
- import { message_proto } from './protos/server.proto'
- export class GrpcServiceMethod {
- public async create(request: ConnectionRequest, connectionAttribute: ConnectionAttribute): Promise<any> {
- // Assuming currently only one client
- this.createGrpcInstance(request.server.serverUrl, { instanceType: 'server' }, connectionAttribute)
- this.createGrpcInstance(request.client.targetServer, { instanceType: 'client' }, connectionAttribute)
- }
- private async generateAdditionalAttributes(connectionAttribute: ConnectionAttribute, clientInfo?: any, localInfo?: any) {
- if (clientInfo) {
- connectionAttribute.inComing.ChannelID = clientInfo.channelID
- connectionAttribute.inComing.PublisherID = clientInfo.publisherID
- connectionAttribute.inComing.SubscriberID = clientInfo.subscriberID
- // let report: any = {
- // message: 'Remote Server Communication Established',
- // channelID: clientInfo.channelID
- // }
- // connectionAttribute.connectionStatus.next(report)
- }
- if (localInfo) {
- connectionAttribute.outGoing.ChannelID = localInfo.channelID
- connectionAttribute.outGoing.PublisherID = localInfo.publisherID
- connectionAttribute.outGoing.SubscriberID = localInfo.subscriberID
- // let report: any = {
- // message: 'Local Server Communication Established',
- // channelID: localInfo.channelID
- // }
- // connectionAttribute.connectionStatus.next(report)
- }
- if (connectionAttribute.outGoing.ChannelID && connectionAttribute.inComing.ChannelID) {
- connectionAttribute.ConnectionID.local = connectionAttribute.outGoing.ChannelID + connectionAttribute.inComing.ChannelID
- connectionAttribute.ConnectionID.remote = connectionAttribute.inComing.ChannelID + connectionAttribute.outGoing.ChannelID
- let report: ReportStatus = {
- code: ColorCode.GREEN,
- message: `ConnectionID acquired. Informing Restranmission to release Messages...`,
- }
- connectionAttribute.connectionStatus.next(report)
- console.log(connectionAttribute)
- }
- }
- // To be migrated into a service in the immediate future
- private async createGrpcInstance(
- serverUrl: string,
- grpcType: GrpcConnectionType,
- connectionAttribute: ConnectionAttribute,
- ) {
- let statusControl: Subject<ReportStatus> = connectionAttribute.connectionStatus
- let consecutiveResolutions = 0;
- let lastResolutionTime = Date.now();
- let alreadyHealthCheck: boolean = false
- let yellowErrorEmission: boolean = false
- let redErrorEmission: boolean = false
- while (true) {
- try {
- let recreatePromise = new Promise((resolve) => {
- if (grpcType.instanceType == 'server') {
- this.createServerStreamingServer(serverUrl, connectionAttribute).then(() => {
- resolve('recreate')
- })
- }
- if (grpcType.instanceType == 'client') {
- this.createServerStreamingClient(serverUrl, alreadyHealthCheck, connectionAttribute).then(() => {
- resolve('recreate')
- })
- }
- })
- await recreatePromise
- // If connection resolves (indicating failure), increment the count
- consecutiveResolutions++;
- // console.log(`Reconnection Attempt: ${consecutiveResolutions}`)
- alreadyHealthCheck = true
- if (redErrorEmission == false) {
- redErrorEmission = true
- // console.error(`Connection failed ${consecutiveResolutions} times. Stopping connection attempts.`);
- let error: ReportStatus = {
- code: ColorCode.RED,
- message: 'Server is not responding. Proceed to buffer.',
- }
- statusControl.next(error)
- }
- // Comment it out if Client wishes to use YELLOW for memory buffer instead of persistent storage buffer
- // if (consecutiveResolutions < parseInt(process.env.ReconnectionAttempt as string) && yellowErrorEmission == false) {
- // yellowErrorEmission = true
- // let error: ReportStatus = {
- // code: ColorCode.YELLOW,
- // // message: `Reconnection Attempt: ${consecutiveResolutions}. Server has yet to respond`
- // message: `Attempting reconnection... Server has yet to respond`,
- // }
- // statusControl.next(error);
- // }
- } catch (error) {
- // Connection did not resolve, reset the count
- consecutiveResolutions = 0;
- console.error('Connection attempt failed:', error);
- }
- // Check for a pause of more than 3 seconds since the last resolution attempt
- const currentTime = Date.now();
- const timeSinceLastResolution = currentTime - lastResolutionTime;
- if (timeSinceLastResolution > 2000) {
- consecutiveResolutions = 0;
- yellowErrorEmission = false
- redErrorEmission = false
- alreadyHealthCheck = false
- }
- // Update the last resolution time
- lastResolutionTime = currentTime;
- await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second before the next attempt
- // timeout generate message to trigger this reconnection
- }
- }
- // Create Server Instance to stream all application Outgoing messages
- public async createServerStreamingServer(
- serverUrl: string,
- connectionAttribute: ConnectionAttribute
- ): Promise<any> { // '0.0.0.0:3001'
- return new Promise((resolve, reject) => {
- try {
- // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md
- let server: grpc.Server = new grpc.Server();
- server.addService(message_proto.Message.service, {
- HandleMessage: (call) => {
- // Assign channel uuid
- let clientInfo = JSON.parse(call.request.message)
- this.generateAdditionalAttributes(connectionAttribute, clientInfo)
- console.log(`Initializing stream. Opening Channel... Confirmation from ${call.request.id}`)
- if (connectionAttribute.outGoing.MessageToBePublished) {
- let subscription: Subscription = connectionAttribute.outGoing.MessageToBePublished.subscribe({
- next: (response: Message) => {
- console.log(`Sending ${(response.message as MessageLog).appData.msgId}`)
- let message = {
- id: response.id,
- message: JSON.stringify(response.message)
- }
- call.write(message)
- },
- error: err => {
- console.error(err)
- subscription.unsubscribe()
- resolve('')
- },
- complete: () => {
- console.log(`Stream response completed for ${call.request.id}`)
- subscription.unsubscribe()
- resolve('')
- }
- })
- }
- },
- Check: (_, callback) => {
- // for now it is just sending the status message over to tell the client it is alive
- // For simplicity, always return "SERVING" as status
- callback(null, { status: 'SERVING' });
- },
- });
- // Bind and start the server
- server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), () => {
- console.log(`gRPC server is running on ${serverUrl}`);
- server.start();
- });
- }
- catch (error) {
- resolve(error)
- }
- })
- }
- // Send a request over to the other server to open a channel for this server to emit/stream messages over
- public async createServerStreamingClient(
- server: string,
- alreadyHealthCheck: boolean,
- connectionAttribute: ConnectionAttribute
- ): Promise<string> {
- return new Promise(async (resolve, reject) => {
- const client = new message_proto.Message(server, grpc.credentials.createInsecure());
- // perform check to see if server is alive, if not terminate this grpc instant and create again
- let outGoingInfo: any = {
- channelID: uuidv4(),
- publisherID: uuidv4(),
- subscriberID: uuidv4()
- }
- this.generateAdditionalAttributes(connectionAttribute, {}, outGoingInfo)
- // connectionAttribute.ConnectionID = connectionAttribute.outGoing.ChannelID + (connectionAttribute.inComing.ChannelID ?? 'undefined')
- let call = client.HandleMessage({ id: server, message: JSON.stringify(outGoingInfo) })
- console.log(`Sending request to ${server} to open response channel...`)
- call.on('status', (status: Status) => {
- if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits
- console.log(`Message trasmission operation is successful`)
- // RPC completed successfully
- } if (status == grpc.status.UNAVAILABLE) {
- let report: ReportStatus = {
- code: ColorCode.RED,
- message: `Server doesn't seem to be alive. Error returned.`,
- }
- connectionAttribute.connectionStatus.next(report)
- resolve('No connection established. Server is not responding..')
- }
- });
- call.on('data', (data: any) => {
- let response: Message = {
- id: data.id,
- message: JSON.parse(data.message)
- }
- if (connectionAttribute.inComing.MessageToBeReceived) {
- connectionAttribute.inComing.MessageToBeReceived.next(response)
- }
- console.log((response.message as MessageLog).appData.msgId)
- });
- call.on('error', (err) => {
- resolve('')
- });
- })
- }
- // THis is no longer necesarry after the introduction of connection Attribute. But it is still useful for checking for the other side's health
- public async checkConnectionHealth(client: any, statusControl: Subject<ReportStatus>, alreadyHealthCheck: boolean): Promise<boolean> {
- return new Promise((resolve, reject) => {
- client.Check({}, (error, response) => {
- if (response) {
- console.log(`GRPC Health check status: ${response.status} Server Connected`);
- // Intepret the response status and implement code logic or handler
- resolve(response.status)
- } else {
- if (alreadyHealthCheck == false) console.error(`Health check failed: ${error}`);
- reject(false)
- }
- })
- })
- }
- }
|