Kaynağa Gözat

exchanging info between server and clients >>INCOMPLETE>>

Enzo 1 yıl önce
ebeveyn
işleme
42d3111a9c

+ 4 - 7
interfaces/general.interface.ts

@@ -35,13 +35,13 @@ export interface Message {
 }
 
 export type State = -1 | 0 | 1 // For status chain effect
-
+export type ConnectionStatus = `ON` | `OFF`
 
 export interface ConnectionAttribute {
     ConnectionID: ConnectionID,
     outGoing: StreamAttribute,
     inComing: StreamAttribute,
-    connectionStatus: Subject<ConnectionState>
+    connectionStatus: Subject<ConnectionState> | null
 }
 export interface StreamAttribute {
     StreamID?: string,
@@ -49,6 +49,8 @@ export interface StreamAttribute {
     SubscriberID?: string,
     PublisherInstance?: any,
     SubscriberInstance?: any,
+    serverUrl?: string,
+    connectionState?: ConnectionStatus,
     MessageToBePublished: Observable<Message> | null
     MessageToBeReceived: Subject<Message> | null
 }
@@ -77,8 +79,3 @@ export interface ConnectionID {
     remote: string
 }
 
-export interface OutGoingInfo {
-    StreamID: string,
-    PublisherID: string,
-    SubscriberID: string
-}

+ 75 - 54
services/grpc.service.method.ts

@@ -1,56 +1,62 @@
 import * as grpc from '@grpc/grpc-js';
 import { Subject, Subscription } from "rxjs";
-import { Message, ConnectionAttribute, ConnectionRequest, GrpcConnectionType, ConnectionState, MessageLog, State, OutGoingInfo } from "../interfaces/general.interface";
+import { Message, ConnectionAttribute, GrpcConnectionType, ConnectionState, MessageLog, ConnectionStatus } from "../interfaces/general.interface";
 import { Status } from '@grpc/grpc-js/build/src/constants';
 import { message_proto } from './protos/server.proto'
-import { ServerWritableStreamImpl } from '@grpc/grpc-js/build/src/server-call';
+import * as _ from 'lodash'
 export class GrpcServiceMethod {
-    // Prefilled connection attribute and pass in grpc service method for reference
-    // Isolate connection attribute referencing issue to server-client service
     private server: grpc.Server | any
     private messageToBeSendOver: Message | any
-    private clientInfo: any[] = []
-    // private callRequestsFromRemote: ServerWritableStreamImpl<any, ResponseType>[] = []
+    private clientRequest: Subject<ConnectionAttribute> = new Subject()
+    private localServerStatus: Subject<any> = new Subject()
 
-    public async create(request: ConnectionRequest, connectionAttribute: ConnectionAttribute, outGoingInfo: OutGoingInfo): Promise<any> {
-        // Assuming currently only one client
-        // this.createGrpcInstance(request.server.serverUrl, { instanceType: 'server' }, connectionAttribute, outGoingInfo)
-        // this.createGrpcInstance(request.client.targetServer, { instanceType: 'client' }, connectionAttribute, outGoingInfo)
+    // keep track of all the incoming channel request from the servers
+    public getClientRequest() {
+        return this.clientRequest
+    }
+
+    // keep track of all the incoming channel request from the servers
+    public getLocalServerStatus() {
+        return this.localServerStatus
+    }
+
+    public async create(connectionAttribute: ConnectionAttribute): Promise<any> {
+        return new Promise((resolve, reject) => {
+            this.createGrpcInstance({ instanceType: 'server' }, connectionAttribute)
+            this.createGrpcInstance({ instanceType: 'client' }, connectionAttribute)
+            resolve('Just putting it here for now....')
+        })
     }
 
     private async generateAdditionalAttributes(connectionAttribute: ConnectionAttribute, clientInfo?: any, localInfo?: any) {
-        if (clientInfo) {
-            connectionAttribute.inComing.StreamID = clientInfo.StreamID
-            connectionAttribute.inComing.PublisherID = clientInfo.PublisherID
-            connectionAttribute.inComing.SubscriberID = clientInfo.SubscriberID
-        }
-        if (localInfo) {
-            connectionAttribute.outGoing.StreamID = localInfo.StreamID
-            connectionAttribute.outGoing.PublisherID = localInfo.PublisherID
-            connectionAttribute.outGoing.SubscriberID = localInfo.SubscriberID
-        }
-        if (connectionAttribute.outGoing.StreamID && connectionAttribute.inComing.StreamID) {
-            connectionAttribute.ConnectionID.local = connectionAttribute.outGoing.StreamID + connectionAttribute.inComing.StreamID
-            connectionAttribute.ConnectionID.remote = connectionAttribute.inComing.StreamID + connectionAttribute.outGoing.StreamID
-        }
+        // if (clientInfo) {
+        //     connectionAttribute.inComing.StreamID = clientInfo.StreamID
+        //     connectionAttribute.inComing.PublisherID = clientInfo.PublisherID
+        //     connectionAttribute.inComing.SubscriberID = clientInfo.SubscriberID
+        // }
+        // if (localInfo) {
+        //     connectionAttribute.outGoing.StreamID = localInfo.StreamID
+        //     connectionAttribute.outGoing.PublisherID = localInfo.PublisherID
+        //     connectionAttribute.outGoing.SubscriberID = localInfo.SubscriberID
+        // }
+        // if (connectionAttribute.outGoing.StreamID && connectionAttribute.inComing.StreamID) {
+        //     connectionAttribute.ConnectionID.local = connectionAttribute.outGoing.StreamID + connectionAttribute.inComing.StreamID
+        //     connectionAttribute.ConnectionID.remote = connectionAttribute.inComing.StreamID + connectionAttribute.outGoing.StreamID
+        // }
     }
 
-    private async createGrpcInstance(
-        serverUrl: string,
-        grpcType: GrpcConnectionType,
-        connectionAttribute: ConnectionAttribute,
-        outGoingInfo: OutGoingInfo
-    ) {
+    private async createGrpcInstance(grpcType: GrpcConnectionType, connectionAttribute: ConnectionAttribute) {
+        // Reconnection Logic here
         while (true) {
             try {
                 let recreatePromise = new Promise((resolve) => {
                     if (grpcType.instanceType == 'server') {
-                        this.createServerStreamingServer(serverUrl, connectionAttribute).then(() => {
+                        this.createServerStreamingServer(connectionAttribute).then(() => {
                             resolve('recreate')
                         })
                     }
                     if (grpcType.instanceType == 'client') {
-                        this.createServerStreamingClient(serverUrl, connectionAttribute, outGoingInfo).then(() => {
+                        this.createServerStreamingClient(connectionAttribute).then(() => {
                             resolve('recreate')
                         })
                     }
@@ -65,23 +71,24 @@ export class GrpcServiceMethod {
     }
 
     // Create Server Instance to stream all application Outgoing messages
-    public async createServerStreamingServer(
-        serverUrl: string,
-        connectionAttribute: ConnectionAttribute
-    ): Promise<any> { // '0.0.0.0:3001'
+    public async createServerStreamingServer(connectionAttribute: ConnectionAttribute): Promise<any> { // '0.0.0.0:3001'
         return new Promise((resolve, reject) => {
             try {
                 if (!this.server) {
                     this.server = new grpc.Server()
+                    this.localServerStatus.next({
+                        connectionStatus: 'ON',
+                        serverName: connectionAttribute.outGoing.PublisherID,
+                        message: `${connectionAttribute.outGoing.serverUrl} started.`
+                    })
                 } else {
-                    console.log(`Grpc server alrady started.`) // this kept calling, that means this function is resolving on it's own, prompting the reconnection logic
+                    console.log(`Grpc server alrady started.`)
                 }
 
                 this.server.addService(message_proto.Message.service, {
                     HandleMessage: (call) => {
-                        let clientInfo: OutGoingInfo = JSON.parse(call.request.message)
-                        // console.log(clientInfo)
-                        this.generateAdditionalAttributes(connectionAttribute, clientInfo)
+                        let clientInfo: ConnectionAttribute = JSON.parse(call.request.message)
+                        this.clientRequest.next(clientInfo)
 
                         console.log(`Initializing stream. Opening Channel... Confirmation from ${call.request.id}`)
 
@@ -106,11 +113,10 @@ export class GrpcServiceMethod {
                                     resolve('')
                                 }
                             })
-                            console.log(connectionAttribute)
                             let report: ConnectionState = {
                                 status: 'DIRECT_PUBLISH'
                             }
-                            connectionAttribute.connectionStatus.next(report)
+                            connectionAttribute.connectionStatus!.next(report)
                         }
                     },
                     Check: (_, callback) => {
@@ -120,8 +126,8 @@ export class GrpcServiceMethod {
                     },
                 });
                 // Bind and start the server
-                this.server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), () => {
-                    console.log(`gRPC server is running on ${serverUrl}`);
+                this.server.bindAsync(connectionAttribute.outGoing.serverUrl, grpc.ServerCredentials.createInsecure(), () => {
+                    console.log(`gRPC server is running on ${connectionAttribute.outGoing.serverUrl}`);
                     this.server.start();
                 });
             }
@@ -132,17 +138,32 @@ export class GrpcServiceMethod {
     }
 
     // Send a request over to the other server to open a channel for this server to emit/stream messages over
-    public async createServerStreamingClient(
-        server: string,
-        connectionAttribute: ConnectionAttribute,
-        outGoingInfo: OutGoingInfo
-    ): Promise<string> {
+    public async createServerStreamingClient(connectionAttribute: ConnectionAttribute): Promise<string> {
         return new Promise(async (resolve, reject) => {
-            const client = new message_proto.Message(server, grpc.credentials.createInsecure());
-            this.generateAdditionalAttributes(connectionAttribute, {}, outGoingInfo)
+            const client = new message_proto.Message(connectionAttribute.inComing.serverUrl, grpc.credentials.createInsecure());
+            let localInfo: ConnectionAttribute = { // need to make a new copy where it doesn't reference the subjects, otherwise circular ref error
+                ConnectionID: connectionAttribute.ConnectionID,
+                outGoing: {
+                    StreamID: connectionAttribute.outGoing.StreamID,
+                    PublisherID: connectionAttribute.outGoing.PublisherID,
+                    SubscriberID: connectionAttribute.outGoing.SubscriberID,
+                    serverUrl: connectionAttribute.outGoing.serverUrl,
+                    MessageToBePublished: null,
+                    MessageToBeReceived: null
+                },
+                inComing: {
+                    StreamID: connectionAttribute.inComing.StreamID,
+                    PublisherID: connectionAttribute.inComing.PublisherID,
+                    SubscriberID: connectionAttribute.inComing.SubscriberID,
+                    serverUrl: connectionAttribute.inComing.serverUrl,
+                    MessageToBePublished: null,
+                    MessageToBeReceived: null
+                },
+                connectionStatus: null
+            }
+            let call = client.HandleMessage({ id: connectionAttribute.inComing.serverUrl, message: JSON.stringify(localInfo) })
 
-            let call = client.HandleMessage({ id: server, message: JSON.stringify(outGoingInfo) })
-            console.log(`Sending request to ${server} to open response channel...`)
+            console.log(`Sending request to ${connectionAttribute.inComing.serverUrl} to open response channel...`)
 
             call.on('status', (status: Status) => {
                 if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits
@@ -154,7 +175,7 @@ export class GrpcServiceMethod {
                         reason: `Server doesn't seem to be alive. Error returned.`,
                         payload: this.messageToBeSendOver ?? `There's no message at the moment...`
                     }
-                    connectionAttribute.connectionStatus.next(report)
+                    connectionAttribute.connectionStatus!.next(report)
                     resolve('No connection established. Server is not responding..')
                 }
             });

+ 49 - 20
services/server-client.service.ts

@@ -1,5 +1,5 @@
 import { BehaviorSubject, Subject } from 'rxjs';
-import { ClientRequest, ConnectionAttribute, ConnectionRequest, ConnectionState, Message, OutGoingInfo, ServerRequest, State } from '../interfaces/general.interface';
+import { ClientRequest, ConnectionAttribute, ConnectionRequest, ConnectionState, Message, ServerRequest, State } from '../interfaces/general.interface';
 import { GrpcServiceMethod } from './grpc.service.method';
 import { BufferService } from './buffer.service';
 import * as dotenv from 'dotenv'
@@ -23,17 +23,38 @@ export class ServerClientManager {
     }
 
     constructor() {
-        // logic here
+        this.grpcService.getLocalServerStatus().subscribe({
+            next: (notification: any) => {
+                console.log(`Received a update from Grpc Server methods`)
+                if (notification.connectionStatus === `ON`) {
+                    let connectionAttribute = this.connectionAttributes.find(connection => connection.outGoing.PublisherID == notification.serverName)
+                    connectionAttribute!.outGoing.connectionState = 'ON'
+                    console.log(notification.message)
+                }
+            },
+            error: err => console.error(err),
+            complete: () => { }
+        })
+        this.grpcService.getClientRequest().subscribe({
+            next: (clientConnectionAttribute: ConnectionAttribute) => {
+                console.log(`Received a request from ${clientConnectionAttribute.outGoing.serverUrl}`)
+                let connectionAttribute = this.connectionAttributes.find(connection => connection.ConnectionID.remote == clientConnectionAttribute.ConnectionID.local)
+                connectionAttribute!.inComing.connectionState = 'ON' 
+                // console.log(clientConnectionAttribute)
+                console.log(`Updated!!! look down...`)
+                console.log(this.connectionAttributes.find(connection => connection.ConnectionID.remote == clientConnectionAttribute.ConnectionID.local))
+            },
+            error: err => console.error(err),
+            complete: () => { }
+        })
     }
 
-
     public async generateConnection(request: ConnectionRequest): Promise<any> {
         return new Promise(async (resolve, reject) => {
             let initialReport: ConnectionState
             let reportSubject: BehaviorSubject<ConnectionState>
             let retransmission: BufferService
             let errorString: string
-            // let originalRequest = JSON.parse(JSON.stringify(request))
             let originalRequest = _.cloneDeep(request)
             let database: string
             let response: any = { message: `Fail to complete connection generation` }
@@ -61,12 +82,12 @@ export class ServerClientManager {
             if (statusChain == 1) {
                 // Connection Type checking
                 if (request.server!.connectionType != request.client!.connectionType) {
-                    console.log(`Connection Type DOES NOT MATCH!`)
+                    // console.log(`Connection Type DOES NOT MATCH!`)
                     statusChain = -1
-                    errorString ="Connection Type DOES NOT MATCH!"
+                    errorString = "Connection Type DOES NOT MATCH!"
                 } else {
                     statusChain = 1
-                }  
+                }
             }
 
             if (statusChain == 1) {
@@ -79,6 +100,8 @@ export class ServerClientManager {
                         StreamID: request.server!.name,
                         PublisherID: request.server!.name,
                         SubscriberID: request.server!.name,
+                        serverUrl: request.server?.serverUrl,
+                        connectionState: `OFF`,
                         MessageToBePublished: retransmission!.getMessages(),
                         MessageToBeReceived: null
                     },
@@ -86,12 +109,13 @@ export class ServerClientManager {
                         StreamID: request.client!.name,
                         PublisherID: request.client!.name,
                         SubscriberID: request.client!.name,
+                        serverUrl: request.client?.targetServer,
+                        connectionState: `OFF`,
                         MessageToBePublished: null,
                         MessageToBeReceived: request.client!.messageToBeReceivedFromRemote
                     },
                     connectionStatus: reportSubject!
                 }
-                statusChain = 1
             }
 
             if (statusChain == 1) {
@@ -105,7 +129,22 @@ export class ServerClientManager {
                     }
                     console.log(`There is now ${this.connectionAttributes.length} connection Attributes`)
                 })
-                statusChain = 1
+            }
+
+            if (statusChain == 1) {
+                // This is default connection`
+                if (!request.client!.connectionType) {
+                    request.client!.connectionType = 'GRPC'
+                }
+                // For each connection type:
+                if (request.client!.connectionType == 'GRPC') {
+                    this.grpcService.create(connectionAttribute!).then(() => {
+                        // logic here
+                    }).catch(() => {
+                        errorString = `Something wrong with gRPC methods`
+                        statusChain = -1
+                    })
+                }
             }
 
             if (statusChain == 1) {
@@ -123,16 +162,6 @@ export class ServerClientManager {
                 }
                 resolve(response);
             }
-            if (statusChain == 1) {
-                // This is default connection`
-                if (!request.client!.connectionType) {
-                    request.client!.connectionType = 'GRPC'
-                }
-                // For each connection type:
-                if (request.client!.connectionType == 'GRPC') {
-                    // this.grpcService.create(request, connectionAttribute, this.outGoingInfo)
-                }
-            }
         })
     }
 
@@ -141,7 +170,7 @@ export class ServerClientManager {
             let result: boolean = this.connectionAttributes.some(connection =>
                 connection.ConnectionID.local === connectionAttribute.ConnectionID.local
             );
-            console.log(`Checking ${connectionAttribute.ConnectionID.local} and returns ${result}`);
+            // console.log(`Checking ${connectionAttribute.ConnectionID.local} and returns ${result}`);
             resolve(result);
         });
     }

+ 8 - 8
test/grpc1.ts

@@ -6,8 +6,8 @@ import { ServerClientManager } from '../services/server-client.service';
 import mongoose from 'mongoose';
 
 // Connect to MongoDB
-mongoose.connect('mongodb://localhost:27017/grpc1')
-const Message = mongoose.model('Message', require('../models/message.schema'))
+// mongoose.connect('mongodb://localhost:27017/grpc1')
+// const Message = mongoose.model('Message', require('../models/message.schema'))
 // Subject for bidirectional communication
 const connectionService: ServerClientManager = new ServerClientManager()
 const messagesJSON: any = readFileSync('payload.json')
@@ -134,15 +134,15 @@ let intervalToStreamOutGoingMessage: number = 1
 
 
 /* Simple Test: 1 to Many */
-let connectionRequest: ConnectionRequest = {
+let connectionRequest1: ConnectionRequest = {
   server: {
-    name: 'g1',
+    name: 'G0',
     serverUrl: hostServer,
     connectionType: 'GRPC',
     messageToBePublishedFromApplication: new Subject<Message>()
   },
   client: {
-    name: 'g2',
+    name: 'G1',
     targetServer: targetserver,
     connectionType: 'GRPC',
     messageToBeReceivedFromRemote: new Subject<Message>()
@@ -150,13 +150,13 @@ let connectionRequest: ConnectionRequest = {
 }
 let connectionRequest2: ConnectionRequest = {
   server: {
-    name: 'g1',
+    name: 'G0',
     serverUrl: hostServer,
     connectionType: 'GRPC',
     messageToBePublishedFromApplication: new Subject<Message>()
   },
   client: {
-    name: 'g3',
+    name: 'G2',
     targetServer: targetserver2,
     connectionType: 'GRPC',
     messageToBeReceivedFromRemote: new Subject<Message>()
@@ -164,7 +164,7 @@ let connectionRequest2: ConnectionRequest = {
 }
 
 
-connectionService.generateConnection(connectionRequest)
+connectionService.generateConnection(connectionRequest1)
 connectionService.generateConnection(connectionRequest2)
 
 // let generateFakeMessagesToBePublished = from(parsedMessages).pipe(take(50))

+ 2 - 2
test/grpc2.ts

@@ -21,13 +21,13 @@ let array: Message[] = []
 /* Simple Test: 1 to 1 */
 let connectionRequest: ConnectionRequest = {
   server: {
-    name: 'g2',
+    name: 'G1',
     serverUrl: hostServer,
     connectionType: 'GRPC',
     messageToBePublishedFromApplication: new Subject<Message>()
   },
   client: {
-    name: 'g1',
+    name: 'G0',
     targetServer: targetserver,
     connectionType: 'GRPC',
     messageToBeReceivedFromRemote: new Subject<Message>()

+ 2 - 2
test/grpc3.ts

@@ -21,13 +21,13 @@ let array: Message[] = []
 /* Simple Test: 1 to 1 */
 let connectionRequest: ConnectionRequest = {
   server: {
-    name: 'g3',
+    name: 'G2',
     serverUrl: hostServer,
     connectionType: 'GRPC',
     messageToBePublishedFromApplication: new Subject<Message>()
   },
   client: {
-    name: 'g1',
+    name: 'G0',
     targetServer: targetserver,
     connectionType: 'GRPC',
     messageToBeReceivedFromRemote: new Subject<Message>()

+ 2 - 2
test/test2.ts

@@ -42,9 +42,9 @@ let connectionRequest2: ConnectionRequest = {
 
 // Client 1 request connection
 serverClientManager.generateConnection(connectionRequest1).then((response) => {
-  console.log(response)
+  // console.log(response)
   serverClientManager.generateConnection(connectionRequest2).then((response) => {
-    console.log(response)
+    // console.log(response)
     serverClientManager.generateConnection(connectionRequest1);
   })
 })