123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423 |
- import * as _ from 'lodash'
- import * as fs from 'fs'
- import mongoose, { Model, Schema } from 'mongoose';
- import { Observable, Subject, Subscription, from } from 'rxjs'
- import { ColorCode, GrpcMessage, MessageLog, ReportStatus, Status } from '../interfaces/general.interface'
- require('dotenv').config();
/**
 * Decides, from connection status reports, whether outgoing messages are
 * forwarded immediately, held in an in-memory buffer, or persisted to MongoDB.
 *
 * - GREEN:  stop streaming to Mongo and stop buffering, forward live messages,
 *           then replay what is held in the local buffer and in Mongo.
 * - YELLOW: stop forwarding and collect incoming messages in the local buffer.
 * - RED:    stop buffering, stream live messages to Mongo and move the local
 *           buffer into Mongo.
 *
 * The buffer array is only ever mutated in place (never reassigned), so every
 * subscription that captured a reference to it keeps writing to the live buffer.
 */
export class FisRetransmissionService {
  /** Connection string: `process.env.MONGO` with the database name appended. */
  private mongoUrl: string = process.env.MONGO + 'emergencyStorage';
  /** In-memory holding area used while the connection is degraded (YELLOW). */
  private readonly bufferedStorage: any[] = [];
  private mongoConnection: any;
  /** Mongoose model for stored messages; undefined until the connection opens. */
  private messageModel: any;
  /**
   * Buffer size at which a RED report is raised. If `MaxBufferLoad` is missing
   * or not numeric this is NaN, and the `>=` comparison in `checkBufferLimit`
   * is then never true (no limit is enforced).
   */
  private maximumBufferLength: number = parseInt(process.env.MaxBufferLoad as string, 10);

  constructor() {
    // Fire-and-forget: the retry loop handles its own errors and never rejects.
    void this.manageMongoConnection();
  }

  /**
   * Wires the message stream to the status reports.
   *
   * @param messageToBePublished stream of messages the caller wants to send
   * @param statusReport stream of connection status reports; this service also
   *        pushes a RED report into it when the local buffer reaches its limit
   * @returns subject on which messages that may actually be sent are emitted
   */
  public handleMessage(messageToBePublished: Subject<GrpcMessage>, statusReport: Subject<ReportStatus>): Subject<GrpcMessage> {
    const releaseMessageSubject: Subject<GrpcMessage> = new Subject();

    let messageReleaseSubscription: Subscription | null = null;
    let messageBufferSubscription: Subscription | null = null;
    let messageStreamToMongo: Subscription | null = null;

    // Each handler returns false as soon as one of its synchronous steps fails.
    const handleGreen = (): boolean => {
      messageStreamToMongo = this.deactivateMongoStreamSubscription(messageStreamToMongo);
      if (messageStreamToMongo) return false;

      messageBufferSubscription = this.deactivateBufferSubscription(messageBufferSubscription);
      if (messageBufferSubscription) return false;

      messageReleaseSubscription = this.activateReleaseSubscription(messageReleaseSubscription, messageToBePublished, releaseMessageSubject);
      if (!messageReleaseSubscription) return false;

      this.releaseMessageFromLocalBuffer(this.bufferedStorage)
        .then((resObs: Observable<GrpcMessage>) => {
          resObs.subscribe({
            next: (message) => releaseMessageSubject.next(message),
            error: (err) => console.error(err),
            complete: () => {
              console.log(`Reset buffer Storage count: ${this.bufferedStorage.length}. All messages have been released back into the stream.`);
            },
          });
        })
        // An empty buffer is reported as a rejection; print just its message.
        .catch((err) => console.error(err instanceof Error ? err.message : err));

      this.releaseMessageFromMongoStorage()
        .then((resObs: Subject<GrpcMessage>) => {
          resObs.subscribe({
            next: (message) => releaseMessageSubject.next(message),
            error: (err) => console.error(err),
            complete: () => console.log(`All Mongo data are transferred `),
          });
        })
        .catch((err) => console.error(err));

      return true;
    };

    const handleYellow = (): boolean => {
      messageBufferSubscription = this.activateBufferSubscription(this.bufferedStorage, messageBufferSubscription, messageToBePublished);
      if (!messageBufferSubscription) return false;

      messageReleaseSubscription = this.deactivateReleaseSubscription(messageReleaseSubscription);
      if (messageReleaseSubscription) return false;

      return true;
    };

    const handleRed = (): boolean => {
      messageStreamToMongo = this.activateMongoStreamSubscription(messageStreamToMongo, messageToBePublished);
      if (!messageStreamToMongo) return false;

      messageBufferSubscription = this.deactivateBufferSubscription(messageBufferSubscription);
      if (messageBufferSubscription) return false;

      this.transferBufferedMessageToMongoStorage(this.bufferedStorage, messageBufferSubscription)
        .then((remaining: GrpcMessage[]) => {
          if (remaining.length > 0) {
            console.error(`${remaining.length} buffered message(s) could not be transferred to Mongo and remain in the local buffer.`);
          }
        })
        .catch((err) => console.error(err));

      return true;
    };

    this.checkBufferLimit(messageToBePublished, statusReport);

    statusReport.subscribe((report: ReportStatus) => {
      let handled = true;

      if (report.code === ColorCode.GREEN) {
        console.log(`Connection status report && ${report.message ?? 'No Message'}`);
        handled = handleGreen();
      } else if (report.code === ColorCode.YELLOW) {
        if (report.payload) {
          // The report carries a message that could not be delivered: keep it.
          console.log(`Rebuffering ${report.payload.message?.appData?.msgId} into buffer...`);
          this.bufferedStorage.push(report.payload);
        }
        console.log(`Connection status report && ${report.message ?? 'No Message'}`);
        handled = handleYellow();
      } else if (report.code === ColorCode.RED) {
        console.log(`Connection status report: Server down. ${report.message} lol`);
        handled = handleRed();
      }

      if (!handled) {
        console.log(`Something Went Wrong in handling ${report.code} report.`);
      }
      if (!report.code) {
        console.log(`Unknown message...`);
      }
    });

    return releaseMessageSubject;
  }

  /**
   * Watches the message stream and raises a RED report whenever a message
   * arrives while the local buffer is at or above `maximumBufferLength`.
   */
  private checkBufferLimit(message: Subject<GrpcMessage>, statusReport: Subject<ReportStatus>) {
    message.subscribe(() => {
      if (this.bufferedStorage.length < this.maximumBufferLength) return;

      console.log(`Buffer length exceeds limit imposed!!!`);
      const report: ReportStatus = {
        code: ColorCode.RED,
        message: `Buffer is exceeding limit. Initiate storage transfer to designated database.`,
        from: `Error Handling Service`,
      };
      statusReport.next(report);
    });
  }

  /**
   * Starts forwarding live messages to `releaseMessageSubject` unless a
   * forwarding subscription already exists.
   *
   * @returns the active forwarding subscription (existing or newly created)
   */
  private activateReleaseSubscription(messageReleaseSubscription: Subscription | null, messageToBePublished: Subject<GrpcMessage>, releaseMessageSubject: Subject<GrpcMessage>): Subscription | null {
    if (messageReleaseSubscription) {
      console.log(`Subscription message release is already active.`);
      return messageReleaseSubscription;
    }

    const subscription = messageToBePublished.subscribe({
      next: (message: GrpcMessage) => {
        console.log(`Releasing ${(message.message as MessageLog).appData.msgId}...`);
        releaseMessageSubject.next(message);
      },
      error: (err) => console.error(err),
      complete: () => { },
    });
    console.log(`Subscription message release activated.`);
    return subscription;
  }

  /**
   * Stops forwarding live messages.
   *
   * @returns always `null` (the subscription is gone or never existed)
   */
  private deactivateReleaseSubscription(messageReleaseSubscription: Subscription | null): Subscription | null {
    if (!messageReleaseSubscription) {
      console.log(`Subscription message release is already deactivated.`);
      return null;
    }

    messageReleaseSubscription.unsubscribe();
    console.log(`Subscription message release deactivated.`);
    return null;
  }

  /**
   * Starts appending live messages to `bufferStorage` unless a buffering
   * subscription already exists.
   *
   * @returns the active buffering subscription (existing or newly created)
   */
  private activateBufferSubscription(bufferStorage: GrpcMessage[], messageBufferSubscription: Subscription | null, messageToBePublished: Subject<GrpcMessage>): Subscription | null {
    if (messageBufferSubscription) {
      console.log(`Subscription message buffer is already active.`);
      return messageBufferSubscription;
    }

    const subscription = messageToBePublished.subscribe({
      next: (message: any) => {
        console.log(`Buffering ${(message.message as MessageLog).appData.msgId}... Local array length: ${bufferStorage.length}`);
        bufferStorage.push(message);
      },
      error: (err) => console.error(err),
      complete: () => { },
    });
    console.log(`Subscription message buffer activated.`);
    return subscription;
  }

  /**
   * Stops appending live messages to the local buffer.
   *
   * @returns always `null` (the subscription is gone or never existed)
   */
  private deactivateBufferSubscription(messageBufferSubscription: Subscription | null): Subscription | null {
    if (!messageBufferSubscription) {
      console.log(`Subscription message buffer is already deactivated.`);
      return null;
    }

    messageBufferSubscription.unsubscribe();
    console.log(`Subscription message buffer deactivated.`);
    return null;
  }

  /**
   * Starts persisting live messages to Mongo unless a streaming subscription
   * already exists.
   *
   * @returns the active streaming subscription (existing or newly created)
   */
  private activateMongoStreamSubscription(messageStreamToMongo: Subscription | null, messageToBePublished: Subject<GrpcMessage>): Subscription | null {
    if (messageStreamToMongo) {
      console.log(`Subscription message streaming to Mongo is already active.`);
      return messageStreamToMongo;
    }

    const subscription = messageToBePublished.subscribe({
      next: (message: any) => {
        console.log(`Saving ${(message.message as MessageLog).appData.msgId}...`);
        // The save is not awaited, so the rejection must be handled here to
        // avoid an unhandled promise rejection.
        this.saveToMongo(message).catch((err) => console.error(err));
      },
      error: (err) => console.error(err),
      complete: () => { },
    });
    console.log(`Subscription message streaming to Mongo activated.`);
    return subscription;
  }

  /**
   * Stops persisting live messages to Mongo.
   *
   * @returns always `null` (the subscription is gone or never existed)
   */
  private deactivateMongoStreamSubscription(messageStreamToMongo: Subscription | null): Subscription | null {
    if (!messageStreamToMongo) {
      console.log(`Subscription message streaming to Mongo is already deactivated.`);
      return null;
    }

    messageStreamToMongo.unsubscribe();
    console.log(`Subscription message streaming to Mongo deactivated.`);
    return null;
  }

  /**
   * Persists one message through the message model.
   *
   * @returns `true` once the document has been created
   * @throws if the model is not available yet or the insert fails
   */
  private async saveToMongo(message: GrpcMessage): Promise<boolean> {
    if (!this.messageModel) {
      throw new Error(`Message model is not ready; cannot save to ${this.mongoUrl}`);
    }

    try {
      await this.messageModel.create(message);
      console.log(`Saved MessageID ${(message.message as MessageLog).appData.msgId} into ${this.mongoUrl}`);
      return true;
    } catch (err) {
      console.log(`MongoSaveError: ${err instanceof Error ? err.message : String(err)}`);
      throw err;
    }
  }

  /**
   * Moves every message currently in `bufferedMessage` into Mongo.
   *
   * The array is drained in place before the saves start, the saves run
   * concurrently, and the promise settles only after all of them have
   * finished. Messages whose save failed are put back at the front of the
   * array instead of being discarded.
   *
   * @param bufferedMessage buffer to drain (the caller passes `this.bufferedStorage`)
   * @param messageBufferSubscription only gates the summary log line
   * @returns the same array, holding whatever could not be saved
   */
  private async transferBufferedMessageToMongoStorage(bufferedMessage: GrpcMessage[], messageBufferSubscription: Subscription | null): Promise<GrpcMessage[]> {
    if (!this.messageModel) {
      // Nothing can be saved yet; keep the messages where they are.
      console.error(`Message model is not ready; ${bufferedMessage.length} message(s) stay in the local buffer.`);
      return bufferedMessage;
    }

    const pending = bufferedMessage.splice(0);
    const outcomes = await Promise.all(
      pending.map((message) =>
        this.saveToMongo(message)
          .then((): GrpcMessage | null => {
            console.log(`Message ${(message.message as MessageLog).appData.msgId} saved successfully...`);
            return null;
          })
          .catch((err): GrpcMessage | null => {
            console.error(err);
            return message;
          })
      )
    );

    const failed = outcomes.filter((outcome): outcome is GrpcMessage => outcome !== null);
    if (failed.length > 0) {
      bufferedMessage.unshift(...failed);
    }

    // NOTE(review): the caller passes the subscription after deactivating it,
    // so this condition is false on that path — confirm the intent.
    if (messageBufferSubscription) {
      console.log(`All ${pending.length} buffered messages have been sent for transfer to ${this.mongoUrl}. Current length: ${bufferedMessage.length}`);
    }
    return bufferedMessage;
  }

  /**
   * Drains `bufferedStorage` and returns its former content as an observable.
   *
   * The array is emptied in place immediately, so messages buffered while the
   * returned observable is being consumed are neither replayed twice nor wiped.
   *
   * @throws if the buffer is empty
   */
  private async releaseMessageFromLocalBuffer(bufferedStorage: GrpcMessage[]): Promise<Observable<GrpcMessage>> {
    const count = bufferedStorage.length;
    if (count === 0) {
      throw new Error(`There is no data in stored in local instance`);
    }

    console.log(`Releasing data from local buffer instance. There ${count === 1 ? 'is' : 'are'} ${count} message${count === 1 ? '' : 's'}...`);
    return from(bufferedStorage.splice(0));
  }

  /**
   * Starts streaming every stored message out of Mongo.
   *
   * @returns a subject that emits the stored messages; a failure to start the
   *          extraction is delivered through the subject's error channel
   */
  private async releaseMessageFromMongoStorage(): Promise<Subject<GrpcMessage>> {
    const dataSubject: Subject<GrpcMessage> = new Subject();
    this.extractAllMessages(dataSubject).catch((err) => dataSubject.error(err));
    return dataSubject;
  }

  /**
   * Makes one connection attempt.
   *
   * @returns `true` once the connection is open and the model is registered,
   *          `false` if an error is emitted before the connection opens (the
   *          failed connection is closed in that case)
   */
  private connectToMongoDatabase(): Promise<boolean> {
    return new Promise((resolve) => {
      console.log(this.mongoUrl);
      const connection: any = mongoose.createConnection(this.mongoUrl);
      this.mongoConnection = connection;
      let settled = false;

      connection.on('error', (error: unknown) => {
        console.error('Connection error:', error);
        if (settled) return; // errors after the attempt was decided are only logged
        settled = true;
        // Release the failed connection so retries do not pile up open handles.
        Promise.resolve()
          .then(() => connection.close())
          .catch((closeError) => console.error('Error closing failed connection:', closeError));
        resolve(false);
      });

      connection.once('open', () => {
        if (settled) return;
        settled = true;
        console.log(`Connected to ${process.env.MONGO}`);
        this.messageModel = connection.model('Message', require('../models/message.schema'));
        resolve(true);
      });
    });
  }

  /**
   * Retries `connectToMongoDatabase` once per second until an attempt succeeds.
   *
   * @returns `true` once connected; never rejects
   */
  private async manageMongoConnection(): Promise<boolean> {
    while (true) {
      try {
        if (await this.connectToMongoDatabase()) {
          return true;
        }
      } catch (error) {
        console.log(`Something Wrong occured. Please check at manageMongoConnection`);
      }
      await new Promise((resolve) => setTimeout(resolve, 1000));
    }
  }

  /**
   * Streams every stored document into `subjectArgs`, completes the subject
   * when the cursor ends, and then deletes all documents of the model.
   *
   * If the cursor reports an error, the subject is errored and nothing is
   * deleted. If the model is not available, only a log line is written and the
   * subject is left untouched.
   */
  public async extractAllMessages(subjectArgs: Subject<GrpcMessage>): Promise<void> {
    if (!this.messageModel) {
      console.log(`Error: Message Model is ${this.messageModel}!! Please set up the mongoose connectino properly!`);
      return;
    }

    const eventStream = this.messageModel.find().lean().cursor();
    let failed = false;

    eventStream.on('data', (message: GrpcMessage) => {
      subjectArgs.next(message);
    });

    eventStream.on('error', (err: unknown) => {
      failed = true;
      console.error('Error reading data from Mongo:', err);
      subjectArgs.error(err);
    });

    eventStream.on('end', async () => {
      if (failed) return; // never delete data that was not fully streamed out

      subjectArgs.complete();

      try {
        await this.messageModel.deleteMany({});
        console.log('Data in Mongo deleted successfully.');
      } catch (err) {
        console.error('Error deleting data:', err);
      }
    });
  }
}
|