123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
- import { BehaviorSubject, Subject } from 'rxjs';
- import { ClientRequest, ConnectionAttribute, ConnectionRequest, ConnectionState, Message, ServerRequest, State } from '../interfaces/general.interface';
- import { GrpcServiceMethod } from './grpc.service.method';
- import { BufferService } from './buffer.service';
- import * as dotenv from 'dotenv'
- import * as _ from 'lodash'
- dotenv.config()
- export class ServerClientManager {
- private connectionAttributes: ConnectionAttribute[] = []
- private grpcService: GrpcServiceMethod = new GrpcServiceMethod()
- private defaultServerAttribute: ServerRequest = {
- name: 'Default - Server',
- serverUrl: "localhost:3000",
- connectionType: 'GRPC',
- messageToBePublishedFromApplication: new Subject<Message>()
- }
- private defaultClientAttribute: ClientRequest = {
- name: 'Default - Client',
- targetServer: "localhost:3001",
- connectionType: 'GRPC',
- messageToBeReceivedFromRemote: new Subject<Message>()
- }
- constructor() {
- }
- public async generateConnection(request: ConnectionRequest): Promise<any> {
- return new Promise(async (resolve, reject) => {
- let initialReport: ConnectionState
- let reportSubject: BehaviorSubject<ConnectionState>
- let retransmission: BufferService
- let errorString: string
- let originalRequest = _.cloneDeep(request)
- let database: string
- let response: any = { message: `Fail to complete connection generation` }
- let statusChain: State = 1
- let connectionAttribute: ConnectionAttribute
- if (statusChain == 1) {
- if (!request.server) {
- request.server = this.defaultServerAttribute
- }
- if (!request.client) {
- request.client = this.defaultClientAttribute
- }
- if (request.database) {
- database = request.database
- } else {
- database = request.server.name + request.client.name
- }
- /* Inject retransmission here */
- initialReport = { status: 'BUFFER' }
- reportSubject = new BehaviorSubject(initialReport)
- retransmission = new BufferService(request.server.messageToBePublishedFromApplication, reportSubject, database)
- }
- if (statusChain == 1) {
- // Connection Type checking
- if (request.server!.connectionType != request.client!.connectionType) {
- statusChain = -1
- errorString = "Connection Type DOES NOT MATCH!"
- } else {
- statusChain = 1
- }
- }
- if (statusChain == 1) {
- connectionAttribute = {
- ConnectionID: {
- local: request.server!.name + request.client!.name,
- remote: request.client!.name + request.server!.name
- },
- outGoing: {
- StreamID: request.server!.name,
- PublisherID: request.server!.name,
- SubscriberID: request.server!.name,
- serverUrl: request.server?.serverUrl,
- connectionState: `OFF`,
- MessageToBePublished: retransmission!.getMessages(),
- MessageToBeReceived: null
- },
- inComing: {
- StreamID: request.client!.name,
- PublisherID: request.client!.name,
- SubscriberID: request.client!.name,
- serverUrl: request.client?.targetServer,
- connectionState: `OFF`,
- MessageToBePublished: null,
- MessageToBeReceived: request.client!.messageToBeReceivedFromRemote
- },
- connectionStatus: reportSubject!
- }
- }
- if (statusChain == 1) {
- await this.checkConnectionAttribute(connectionAttribute!).then((res) => {
- if (res == true) {
- console.log(`Connection<${connectionAttribute.ConnectionID.local}> already exists `)
- }
- if (res == false) {
- this.connectionAttributes.push(connectionAttribute)
- console.log(`Connection ${connectionAttribute.ConnectionID.local} registered...`)
- }
- console.log(`There is now ${this.connectionAttributes.length} connection Attributes`)
- })
- }
- if (statusChain == 1) {
- // This is default connection`
- if (!request.client!.connectionType) {
- request.client!.connectionType = 'GRPC'
- }
- // For each connection type:
- if (request.client!.connectionType == 'GRPC') {
- this.grpcServerStatusHandler()
- this.grpcClientStatusHandler()
- this.grpcService.create(connectionAttribute!).then(() => {
- // logic here
- }).catch(() => {
- errorString = `Something wrong with gRPC methods`
- statusChain = -1
- })
- }
- }
- if (statusChain == 1) {
- response = {
- message: "Channel Response",
- requestedTo: originalRequest,
- data: connectionAttribute!
- }
- resolve(response);
- } else if (statusChain == -1) {
- response = {
- message: "Channel Response Error",
- requestedTo: originalRequest,
- data: errorString! // put error string here
- }
- resolve(response);
- }
- })
- }
- private async checkConnectionAttribute(connectionAttribute: ConnectionAttribute): Promise<boolean> {
- return new Promise((resolve) => {
- let result: boolean = this.connectionAttributes.some(connection =>
- connection.ConnectionID.local === connectionAttribute.ConnectionID.local
- );
- resolve(result);
- });
- }
- private grpcServerStatusHandler() {
- this.grpcService.getLocalServerStatus().subscribe({
- next: (notification: any) => {
- if (notification.connectionStatus === `ON`) {
- let connectionAttribute = this.connectionAttributes.find(connection => connection.ConnectionID.local == notification.connectionIDlocal)
- if (connectionAttribute) {
- connectionAttribute.outGoing.connectionState = 'ON'
- console.log(`Connection ${notification.connectionIDlocal} updated`)
- } else {
- console.log(`Connection ${notification.connectionIDlocal} attribute is not found.`)
- }
- }
- },
- error: err => console.error(err),
- complete: () => { }
- })
- }
- private grpcClientStatusHandler() {
- this.grpcService.getClientRequest().subscribe({
- next: (clientConnectionAttribute: ConnectionAttribute) => {
- console.log(`Received a request from ${clientConnectionAttribute.outGoing.serverUrl}`)
- let connectionAttribute = this.connectionAttributes.find(connection => connection.ConnectionID.remote == clientConnectionAttribute.ConnectionID.local)
- if (connectionAttribute) {
- console.log(`Connection ${clientConnectionAttribute.ConnectionID.local} updated`)
- connectionAttribute.inComing.connectionState = 'ON'
- } else {
- console.log(`Connection Attribut ${clientConnectionAttribute.inComing.PublisherID} is not found`)
- }
- },
- error: err => console.error(err),
- complete: () => { }
- })
- }
- }
|