123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315 |
- import * as grpc from '@grpc/grpc-js';
- import { Subject, Subscription } from "rxjs";
- import { ReportStatus, ColorCode, Message, MessageLog } from "../interfaces/general.interface";
- import { Status } from '@grpc/grpc-js/build/src/constants';
- import { error } from 'console';
- const message_proto = require('./protos/server.proto')
- export class GrpcServiceMethod {
-
- public async createServerStreamingServer(
- serverUrl: string,
- grpcServerConnection: any,
- messageToBeStream: Subject<Message>
- ): Promise<any> {
- return new Promise((resolve, reject) => {
- try {
-
- let server: grpc.Server = new grpc.Server();
-
-
- server.addService(message_proto.Message.service, {
- HandleMessage: (call) => {
-
- console.log(`Intializing stream. Opening Channel. Confirmation from ${call.getPeer()}`)
- let subscription: Subscription = messageToBeStream.subscribe({
- next: (response: Message) => {
- console.log(`Sending ${(response.message as MessageLog).appData.msgId}`)
- let message = {
- id: response.id,
- message: JSON.stringify(response.message)
- }
- call.write(message)
- },
- error: err => {
- console.error(err)
- subscription.unsubscribe()
- resolve('')
- },
- complete: () => {
- console.log(`Stream response completed for ${call.request.id}`)
- subscription.unsubscribe()
- resolve('')
-
- }
- })
- },
- Check: (_, callback) => {
-
-
-
- callback(null, { status: 'SERVING' });
- },
- });
-
- server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), () => {
- console.log(`gRPC server is running on ${serverUrl}`);
- server.start();
- });
- grpcServerConnection[serverUrl] = server
- }
- catch (error) {
- resolve(error)
- }
- })
- }
-
- public async createServerStreamingClient(
- server: string,
- alreadyHealthCheck: boolean,
- statusControl: Subject<ReportStatus>,
- incomingMessage: Subject<Message>
- ): Promise<string> {
- return new Promise(async (resolve, reject) => {
- const client = new message_proto.Message(server, grpc.credentials.createInsecure());
-
- this.checkConnectionHealth(client, statusControl, alreadyHealthCheck).catch((error) => {
- resolve('')
- })
-
- let call = client.HandleMessage({ id: `0000`, message: `Intiate Main Stream Channel Response` })
- console.log(`Sending request to open response channel...`)
- call.on('status', (status: Status) => {
- if (status == grpc.status.OK) {
- console.log(`Message trasmission operation is successful`)
-
- } if (status == grpc.status.UNAVAILABLE) {
- let report = {
- code: ColorCode.YELLOW,
- message: `Server doesn't seem to be alive. Error returned.`,
- from: `Server Streaming Client Instance`
- }
- statusControl.next(report)
- resolve('No connection established. Server is not responding..')
- }
- });
- call.on('data', (data: any) => {
-
- let response: Message = {
- id: data.id,
- message: JSON.parse(data.message)
- }
- incomingMessage.next(response)
- console.log((response.message as MessageLog).appData.msgId)
- });
- call.on('error', (err) => {
- resolve('')
- });
-
-
-
-
- })
- }
-
- public async createGrpcBidirectionalServer(
- serverUrl: string,
- messageToBeStream: Subject<any>,
- statusControl: Subject<ReportStatus>,
- grpcServerConnection: any,
- ): Promise<any> {
- return new Promise((resolve, reject) => {
- try {
-
- let server: grpc.Server = new grpc.Server();
-
-
- server.addService(message_proto.Message.service, {
- sendMessageStream: (call) => {
- console.log(`Client connected from: ${call.getPeer()}`);
- let report: ReportStatus = {
- code: ColorCode.GREEN,
- message: `Client connected!!`,
- from: `Bidirectional Instance`
- }
- statusControl.next(report)
-
- let subscription: Subscription = messageToBeStream.subscribe({
- next: (payload: any) => {
- let noConnection = call.cancelled
- if (noConnection === true) {
- let report: ReportStatus = {
- code: ColorCode.YELLOW,
- message: `Client is not alive.....`,
- payload: payload,
- from: `Bidirectional Instance`
- }
- statusControl.next(report)
- subscription.unsubscribe()
- } else {
- console.log(`Sending ${payload.appData.msgId}`)
- let message: string = JSON.stringify(payload)
- call.write({ message })
- }
- },
- error: err => console.error(err),
- complete: () => { }
- })
- call.on('data', (data: any) => {
-
- let payload = JSON.parse(data.message)
- console.log(`Received Message from Client: ${payload.appData?.msgId}`);
-
-
-
-
-
-
-
-
-
- });
- call.on('end', () => {
- console.log('Client stream ended');
-
- });
- call.on('error', (err) => {
-
-
-
- });
- call.on('close', () => {
- console.log('Unknown cause for diconnectivity');
-
- });
- },
- Check: (_, callback) => {
-
-
-
- callback(null, { status: 'SERVING' });
- },
- });
-
- server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), () => {
- console.log(`gRPC server is running on ${serverUrl}`);
- server.start();
- });
- grpcServerConnection[serverUrl] = server
- }
- catch (error) {
- resolve(error)
- }
- })
- }
- public async createBidirectionalStreamingClient(
- server: string,
- alreadyHealthCheck: boolean,
- messageToBeTransmitted: Subject<any>,
- statusControl: Subject<ReportStatus>,
- incomingResponse: Subject<Message>
- ): Promise<string> {
- let subscription: any
- let unsubscribed: boolean = false
- return new Promise(async (resolve, reject) => {
- const client = new message_proto.Message(server, grpc.credentials.createInsecure());
- const call = client.sendMessageStream();
- this.checkConnectionHealth(client, statusControl, alreadyHealthCheck)
- call.on('status', (status: Status) => {
-
-
-
-
-
- });
-
-
- subscription = messageToBeTransmitted.subscribe({
- next: (payload: any) => {
- if (!unsubscribed) {
- console.log(`Sending ${payload.appData.msgId}`)
- let message: string = JSON.stringify(payload)
- call.write({ message })
- }
- },
- error: err => console.error(err),
- complete: () => { }
- });
- call.on('data', (data: any) => {
- let message = JSON.parse(data.message)
- console.log(`Received message from Server: ${message.msgId ?? message.appData?.msgId ?? `Invalid`}`);
- });
- call.on('error', (err) => {
-
- if (!unsubscribed && subscription) {
- subscription.unsubscribe();
- unsubscribed = true;
- }
- resolve('Server Error');
- });
- call.on('end', () => {
- if (!unsubscribed && subscription) {
- subscription.unsubscribe();
- unsubscribed = true;
- }
- resolve('Server Error');
- });
- })
- }
-
- public async checkConnectionHealth(client: any, statusControl: Subject<ReportStatus>, alreadyHealthCheck: boolean): Promise<boolean> {
- return new Promise((resolve, reject) => {
- client.Check({}, (error, response) => {
- if (response) {
- console.log(`GRPC Health check status: ${response.status} Server Connected`);
- let report: ReportStatus = {
- code: ColorCode.GREEN,
- message: `Good to go!!!`,
- from: `GRPC health check`
- }
- statusControl.next(report)
- } else {
- if (alreadyHealthCheck == false) console.error(`Health check failed: ${error}`);
- resolve(false)
- }
- })
- })
- }
- }
|