Selaa lähdekoodia

Massive enhancments and socket fixes

Dr-Swopt 1 kuukausi sitten
vanhempi
commit
e7b6b26a40

+ 1 - 1
dist/config/config.json

@@ -1,5 +1,5 @@
 {
     "connection": {
-        "transmitter": "http://localhost:3000/"
+        "transmitter": "http://localhost:3001/"
     }
 }

+ 13 - 2
doc/explanation.txt

@@ -5,7 +5,8 @@ i) Explore multiple traffic concept
 ii) Move transport service instantiation to adapterManager
 ExtraNotes: In some cases, servers can only be transmitting. Although this program allows for dual roles if there's a need for me.
 
-For 25/11/2024 Monday
+For 2/12/2024 Monday (Updated!!!!)
+URGENT: Fix the default socket transmission first. SOmething wrong with retransmission for both side when either side reconnected. Play with proxy for now.
 i) Do include in the list of discussion for the message ordering mechanism
 - Currently the way it is working is not favorable. In the mean time, do think up suggestion to improve the ordering.
 - Just to acquire boss's input. Doesn't have to challenge him or anything
@@ -34,4 +35,14 @@ iv) Documentation
 -A special Readme file to help understand the usage and what it does.
 -Also guide for future enhancements
  
-DONT  FORGET TO UPDATE SPICEWORK!!!!!!
+DONT  FORGET TO UPDATE SPICEWORK!!!!!!
+
+Optiomal but cool things to do:
+i) A global logging service that can be configured and toggled <DONE>
+- instead a console.log, create a logging features in utility. It's like nestjs version but but my own. It will just read from logSetting.json configurations
+
+
+Things to be discuss later: (as of 2/12/2024)
+i) If a service crashes and comes back up, does client(UI sender) still send requests?
+-Since the service loses all it's memory and have no clue who were connected and what it was doing unless there's a state machine.
+-Current paradigm only solves for internet disruption, it doesn't assume either of the sender and receiver crashes.

+ 12 - 0
logSetting.json

@@ -0,0 +1,12 @@
+{
+    "base": true,
+    "managers": true,
+    "transmission": true,
+    "adapter": true,
+    "transport": false,
+    "error": true,
+    "util": true,
+    "details": false,
+    "location": true,
+    "retransmission": true
+}

+ 159 - 0
package-lock.json

@@ -10,11 +10,13 @@
       "license": "ISC",
       "dependencies": {
         "axios": "^1.7.7",
+        "chalk": "^4.1.0",
         "dotenv": "^16.4.5",
         "express": "^4.21.1",
         "rxjs": "^7.8.1",
         "socket.io": "^4.8.0",
         "socket.io-client": "^4.8.0",
+        "source-map-support": "^0.5.21",
         "uuid": "^10.0.0"
       },
       "devDependencies": {
@@ -163,6 +165,21 @@
         "node": ">= 0.6"
       }
     },
+    "node_modules/ansi-styles": {
+      "version": "4.3.0",
+      "resolved": "https://registry.npmjs.org/ansi-styles/-/ansi-styles-4.3.0.tgz",
+      "integrity": "sha512-zbB9rCJAT1rbjiVDb2hqKFHNYLxgtk8NURxZ3IZwD3F6NtxbXZQCnnSi1Lkx+IDohdPlFp222wVALIheZJQSEg==",
+      "license": "MIT",
+      "dependencies": {
+        "color-convert": "^2.0.1"
+      },
+      "engines": {
+        "node": ">=8"
+      },
+      "funding": {
+        "url": "https://github.com/chalk/ansi-styles?sponsor=1"
+      }
+    },
     "node_modules/array-flatten": {
       "version": "1.1.1",
       "resolved": "https://registry.npmjs.org/array-flatten/-/array-flatten-1.1.1.tgz",
@@ -234,6 +251,12 @@
       "integrity": "sha512-Tpp60P6IUJDTuOq/5Z8cdskzJujfwqfOTkrwIwj7IRISpnkJnT6SyJ4PCPnGMoFjC9ddhal5KVIYtAt97ix05A==",
       "license": "MIT"
     },
+    "node_modules/buffer-from": {
+      "version": "1.1.2",
+      "resolved": "https://registry.npmjs.org/buffer-from/-/buffer-from-1.1.2.tgz",
+      "integrity": "sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ==",
+      "license": "MIT"
+    },
     "node_modules/bytes": {
       "version": "3.1.2",
       "resolved": "https://registry.npmjs.org/bytes/-/bytes-3.1.2.tgz",
@@ -262,6 +285,40 @@
         "url": "https://github.com/sponsors/ljharb"
       }
     },
+    "node_modules/chalk": {
+      "version": "4.1.0",
+      "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.0.tgz",
+      "integrity": "sha512-qwx12AxXe2Q5xQ43Ac//I6v5aXTipYrSESdOgzrN+9XjgEpyjpKuvSGaN4qE93f7TQTlerQQ8S+EQ0EyDoVL1A==",
+      "license": "MIT",
+      "dependencies": {
+        "ansi-styles": "^4.1.0",
+        "supports-color": "^7.1.0"
+      },
+      "engines": {
+        "node": ">=10"
+      },
+      "funding": {
+        "url": "https://github.com/chalk/chalk?sponsor=1"
+      }
+    },
+    "node_modules/color-convert": {
+      "version": "2.0.1",
+      "resolved": "https://registry.npmjs.org/color-convert/-/color-convert-2.0.1.tgz",
+      "integrity": "sha512-RRECPsj7iu/xb5oKYcsFHSppFNnsj/52OVTRKb4zP5onXwVF3zVmmToNcOfGC+CRDpfK/U584fMg38ZHCaElKQ==",
+      "license": "MIT",
+      "dependencies": {
+        "color-name": "~1.1.4"
+      },
+      "engines": {
+        "node": ">=7.0.0"
+      }
+    },
+    "node_modules/color-name": {
+      "version": "1.1.4",
+      "resolved": "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz",
+      "integrity": "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==",
+      "license": "MIT"
+    },
     "node_modules/combined-stream": {
       "version": "1.0.8",
       "resolved": "https://registry.npmjs.org/combined-stream/-/combined-stream-1.0.8.tgz",
@@ -682,6 +739,15 @@
         "url": "https://github.com/sponsors/ljharb"
       }
     },
+    "node_modules/has-flag": {
+      "version": "4.0.0",
+      "resolved": "https://registry.npmjs.org/has-flag/-/has-flag-4.0.0.tgz",
+      "integrity": "sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ==",
+      "license": "MIT",
+      "engines": {
+        "node": ">=8"
+      }
+    },
     "node_modules/has-property-descriptors": {
       "version": "1.0.2",
       "resolved": "https://registry.npmjs.org/has-property-descriptors/-/has-property-descriptors-1.0.2.tgz",
@@ -1149,6 +1215,25 @@
         "node": ">=10.0.0"
       }
     },
+    "node_modules/source-map": {
+      "version": "0.6.1",
+      "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.6.1.tgz",
+      "integrity": "sha512-UjgapumWlbMhkBgzT7Ykc5YXUT46F0iKu8SGXq0bcwP5dz/h0Plj6enJqjz1Zbq2l5WaqYnrVbwWOWMyF3F47g==",
+      "license": "BSD-3-Clause",
+      "engines": {
+        "node": ">=0.10.0"
+      }
+    },
+    "node_modules/source-map-support": {
+      "version": "0.5.21",
+      "resolved": "https://registry.npmjs.org/source-map-support/-/source-map-support-0.5.21.tgz",
+      "integrity": "sha512-uBHU3L3czsIyYXKX88fdrGovxdSCoTGDRZ6SYXtSRxLZUzHg5P/66Ht6uoUlHu9EZod+inXhKo3qQgwXUT/y1w==",
+      "license": "MIT",
+      "dependencies": {
+        "buffer-from": "^1.0.0",
+        "source-map": "^0.6.0"
+      }
+    },
     "node_modules/statuses": {
       "version": "2.0.1",
       "resolved": "https://registry.npmjs.org/statuses/-/statuses-2.0.1.tgz",
@@ -1158,6 +1243,18 @@
         "node": ">= 0.8"
       }
     },
+    "node_modules/supports-color": {
+      "version": "7.2.0",
+      "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-7.2.0.tgz",
+      "integrity": "sha512-qpCAvRl9stuOHveKsn7HncJRvv501qIacKzQlO/+Lwxc9+0q2wLyv4Dfvt80/DPn2pqOBsJdDiogXGR9+OvwRw==",
+      "license": "MIT",
+      "dependencies": {
+        "has-flag": "^4.0.0"
+      },
+      "engines": {
+        "node": ">=8"
+      }
+    },
     "node_modules/toidentifier": {
       "version": "1.0.1",
       "resolved": "https://registry.npmjs.org/toidentifier/-/toidentifier-1.0.1.tgz",
@@ -1405,6 +1502,14 @@
         "negotiator": "0.6.3"
       }
     },
+    "ansi-styles": {
+      "version": "4.3.0",
+      "resolved": "https://registry.npmjs.org/ansi-styles/-/ansi-styles-4.3.0.tgz",
+      "integrity": "sha512-zbB9rCJAT1rbjiVDb2hqKFHNYLxgtk8NURxZ3IZwD3F6NtxbXZQCnnSi1Lkx+IDohdPlFp222wVALIheZJQSEg==",
+      "requires": {
+        "color-convert": "^2.0.1"
+      }
+    },
     "array-flatten": {
       "version": "1.1.1",
       "resolved": "https://registry.npmjs.org/array-flatten/-/array-flatten-1.1.1.tgz",
@@ -1464,6 +1569,11 @@
         }
       }
     },
+    "buffer-from": {
+      "version": "1.1.2",
+      "resolved": "https://registry.npmjs.org/buffer-from/-/buffer-from-1.1.2.tgz",
+      "integrity": "sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ=="
+    },
     "bytes": {
       "version": "3.1.2",
       "resolved": "https://registry.npmjs.org/bytes/-/bytes-3.1.2.tgz",
@@ -1481,6 +1591,28 @@
         "set-function-length": "^1.2.1"
       }
     },
+    "chalk": {
+      "version": "4.1.0",
+      "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.0.tgz",
+      "integrity": "sha512-qwx12AxXe2Q5xQ43Ac//I6v5aXTipYrSESdOgzrN+9XjgEpyjpKuvSGaN4qE93f7TQTlerQQ8S+EQ0EyDoVL1A==",
+      "requires": {
+        "ansi-styles": "^4.1.0",
+        "supports-color": "^7.1.0"
+      }
+    },
+    "color-convert": {
+      "version": "2.0.1",
+      "resolved": "https://registry.npmjs.org/color-convert/-/color-convert-2.0.1.tgz",
+      "integrity": "sha512-RRECPsj7iu/xb5oKYcsFHSppFNnsj/52OVTRKb4zP5onXwVF3zVmmToNcOfGC+CRDpfK/U584fMg38ZHCaElKQ==",
+      "requires": {
+        "color-name": "~1.1.4"
+      }
+    },
+    "color-name": {
+      "version": "1.1.4",
+      "resolved": "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz",
+      "integrity": "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA=="
+    },
     "combined-stream": {
       "version": "1.0.8",
       "resolved": "https://registry.npmjs.org/combined-stream/-/combined-stream-1.0.8.tgz",
@@ -1763,6 +1895,11 @@
         "get-intrinsic": "^1.1.3"
       }
     },
+    "has-flag": {
+      "version": "4.0.0",
+      "resolved": "https://registry.npmjs.org/has-flag/-/has-flag-4.0.0.tgz",
+      "integrity": "sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ=="
+    },
     "has-property-descriptors": {
       "version": "1.0.2",
       "resolved": "https://registry.npmjs.org/has-property-descriptors/-/has-property-descriptors-1.0.2.tgz",
@@ -2071,11 +2208,33 @@
         "debug": "~4.3.1"
       }
     },
+    "source-map": {
+      "version": "0.6.1",
+      "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.6.1.tgz",
+      "integrity": "sha512-UjgapumWlbMhkBgzT7Ykc5YXUT46F0iKu8SGXq0bcwP5dz/h0Plj6enJqjz1Zbq2l5WaqYnrVbwWOWMyF3F47g=="
+    },
+    "source-map-support": {
+      "version": "0.5.21",
+      "resolved": "https://registry.npmjs.org/source-map-support/-/source-map-support-0.5.21.tgz",
+      "integrity": "sha512-uBHU3L3czsIyYXKX88fdrGovxdSCoTGDRZ6SYXtSRxLZUzHg5P/66Ht6uoUlHu9EZod+inXhKo3qQgwXUT/y1w==",
+      "requires": {
+        "buffer-from": "^1.0.0",
+        "source-map": "^0.6.0"
+      }
+    },
     "statuses": {
       "version": "2.0.1",
       "resolved": "https://registry.npmjs.org/statuses/-/statuses-2.0.1.tgz",
       "integrity": "sha512-RwNA9Z/7PrK06rYLIzFMlaF+l73iwpzsqRIFgbMLbTcLD6cOao82TaWefPXQvB2fOC4AjuYSEndS7N/mTCbkdQ=="
     },
+    "supports-color": {
+      "version": "7.2.0",
+      "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-7.2.0.tgz",
+      "integrity": "sha512-qpCAvRl9stuOHveKsn7HncJRvv501qIacKzQlO/+Lwxc9+0q2wLyv4Dfvt80/DPn2pqOBsJdDiogXGR9+OvwRw==",
+      "requires": {
+        "has-flag": "^4.0.0"
+      }
+    },
     "toidentifier": {
       "version": "1.0.1",
       "resolved": "https://registry.npmjs.org/toidentifier/-/toidentifier-1.0.1.tgz",

+ 2 - 0
package.json

@@ -22,11 +22,13 @@
   },
   "dependencies": {
     "axios": "^1.7.7",
+    "chalk": "^4.1.0",
     "dotenv": "^16.4.5",
     "express": "^4.21.1",
     "rxjs": "^7.8.1",
     "socket.io": "^4.8.0",
     "socket.io-client": "^4.8.0",
+    "source-map-support": "^0.5.21",
     "uuid": "^10.0.0"
   }
 }

+ 1 - 1
src/config/config.json

@@ -1,5 +1,5 @@
 {
     "connection": {
-        "transmitter": "http://localhost:3000/"
+        "transmitter": "http://localhost:3001/"
     }
 }

+ 3 - 3
src/connector/connector.base.ts → src/connector/adapter.base.ts

@@ -1,11 +1,11 @@
-import { BehaviorSubject, Observable, Observer, Subject, Subscriber, Unsubscribable } from "rxjs";
+import { Observable, Subject } from "rxjs";
 import dotenv from 'dotenv';
-import { AdapterProfile, AdaptorTransmissionRole, ConnectionAdaptorBase, ConnectionState, Transport, TransportEvent,  TransportService } from "../interface/connector.interface";
+import { AdaptorTransmissionRole, AdaptorBase, ConnectionState, Transport, TransportEvent, TransportService, AdapterProfile } from "../interface/connector.interface";
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 So how?: */
-export class ConnectionAdapter implements ConnectionAdaptorBase {
+export class Adapter implements AdaptorBase {
     event!: Subject<TransportEvent>
     connector!: TransportService;
     connectorProfile!: AdapterProfile;

+ 41 - 37
src/connector/connector.manager.ts → src/connector/adapter.manager.ts

@@ -1,14 +1,17 @@
-import { ConnectionManager as ConnectionManagerInterface, AdapterSet, TransportService, TransportEvent, Transport } from "../interface/connector.interface"
-import { TransmitterConnectionAdapter } from './connector.transmitter'
-import { ReceiverConnectionAdapter } from './connector.receiver'
-import { RequestResponseConnectionAdapter } from './connector.request.response'
+import { AdapterManager as AdapterManagerInterface, TransportService, TransportEvent, Transport } from "../interface/connector.interface"
+import { TransmitterAdapter } from './adapter.transmitter'
+import { ReceiverAdapter } from './adapter.receiver'
 import { v4 as uuidv4 } from 'uuid'
 import { Subject } from "rxjs"
 import { WebsocketTransportService } from "../transport/websocket"
 import { HttpTransportService } from "../transport/http"
 import config from '../config/config.json';
+import ConsoleLogger from "../utils/log.utils"
+import { RequestResponseAdapter } from "./adapter.request.response"
+import { AdapterSet } from "../interface/general.interface"
 
-export class ConnectionManager implements ConnectionManagerInterface {
+export class AdapterManager implements AdapterManagerInterface {
+    private console: ConsoleLogger = new ConsoleLogger(`AdapterManager`, ['managers'])
     private transportServiceArray: TransportService[] = []
     private transportSet: Set<TransportSet> = new Set()
     private adapterSet: AdapterSet[] = []
@@ -16,7 +19,7 @@ export class ConnectionManager implements ConnectionManagerInterface {
 
     constructor(event: Subject<TransportEvent>, browserEnv?: boolean | undefined) {
         this.event = event
-        console.log(`Connection Manager: Contructing ConnectionManager....`)
+        this.console.log({ message: `Contructing self...` })
 
         this.sort(this.transportSet)
         this.transportSet.forEach(set => {
@@ -24,34 +27,32 @@ export class ConnectionManager implements ConnectionManagerInterface {
         })
     }
 
-    async getAdapter(clientId: string): Promise<AdapterSet> {
-        return new Promise((resolve, reject) => {
-            console.log(`Instantiating an adapter set....`)
-            let transportType: Transport = process.env.Transport as unknown as Transport // as default  for now
-            let adapterId: string = clientId
-            let transportService: TransportService | undefined = this.transportServiceArray.find(obj => obj.getInfo() == transportType)
-            if (transportService) {
-                let transmitterAdapter: TransmitterConnectionAdapter = new TransmitterConnectionAdapter(adapterId, transportType, transportService)
-                let receiverAdapter: ReceiverConnectionAdapter = new ReceiverConnectionAdapter(adapterId, transportType, transportService)
-                let adapterSet: AdapterSet = {
-                    id: adapterId,
-                    dateCreated: new Date(),
-                    transmitterAdapter: transmitterAdapter,
-                    receiverAdapter: receiverAdapter,
-                    requestResponsAdapter: new RequestResponseConnectionAdapter(transmitterAdapter, receiverAdapter)
-                }
-                this.adapterSet.push(adapterSet)
-
-                this.event.next({
-                    id: uuidv4(),
-                    event: 'New Adapter',
-                    data: adapterId
-                } as TransportEvent)
-                resolve(adapterSet)
-            } else {
-                reject(`Connection Manager: No Transport Service Instantiated!`)
+    getAdapter(clientId: string): AdapterSet | null {
+        this.console.log({ message: `Instantiating an adapter set....` })
+        let transportType: Transport = process.env.Transport as unknown as Transport // as default  for now
+        let adapterId: string = clientId
+        let transportService: TransportService | undefined = this.transportServiceArray.find(obj => obj.getInfo() == transportType)
+        if (transportService) {
+            let transmitterAdapter: TransmitterAdapter = new TransmitterAdapter(adapterId, transportType, transportService)
+            let receiverAdapter: ReceiverAdapter = new ReceiverAdapter(adapterId, transportType, transportService)
+            let adapterSet: AdapterSet = {
+                id: adapterId,
+                dateCreated: new Date(),
+                transmitterAdapter: transmitterAdapter,
+                receiverAdapter: receiverAdapter,
+                requestResponsAdapter: new RequestResponseAdapter(transmitterAdapter, receiverAdapter)
             }
-        })
+            this.adapterSet.push(adapterSet)
+
+            this.event.next({
+                id: uuidv4(),
+                event: 'New Adapter',
+                data: adapterId
+            } as TransportEvent)
+            return adapterSet
+        } else {
+            return null
+        }
     }
 
     public getTransportArray(): TransportService[] {
@@ -64,7 +65,7 @@ export class ConnectionManager implements ConnectionManagerInterface {
         this.instantiateTransportService(transportSet.transport, event).then((transportService: TransportService) => {
             this.transportServiceArray.push(transportService)
             if (transportService instanceof WebsocketTransportService) {
-                console.log(`Just Double Checking... this is websocket`)
+                this.console.log({ message: `Just Double Checking... this is websocket` })
                 if (isClient) {
                     // please note this is subject to change depending on the UI environemnt. Angular has their own built in function to read json file based on Swopt-UI
                     transportService.startClient(config.connection.transmitter)
@@ -72,7 +73,7 @@ export class ConnectionManager implements ConnectionManagerInterface {
                     transportService.startServer(transportSet.port);
                 }
             } else if (transportService instanceof HttpTransportService) {
-                console.log(`Just Double Checking... this is http`)
+                this.console.log({ message: `Just Double Checking... this is http` })
                 // Additional Http-specific setup if needed.
                 if (isClient) {
                     transportService.startClient(`http://localhost:3000`)
@@ -80,7 +81,10 @@ export class ConnectionManager implements ConnectionManagerInterface {
                     transportService.startServer(transportSet.port)
                 }
             }
-        }).catch((error) => { throw new Error(error) })
+        }).catch((error) => {
+            this.console.log({ message: `Promise Error`, details: error })
+            throw new Error(error)
+        })
     }
 
     private async instantiateTransportService(transportType: Transport, event: Subject<TransportEvent>): Promise<TransportService> {
@@ -102,7 +106,7 @@ export class ConnectionManager implements ConnectionManagerInterface {
         transportList.forEach((transport, index) => {
             transportSet.add({ transport: transport, port: portList[index] } as unknown as TransportSet)
         })
-        console.log(this.transportSet)
+        this.console.log({ message: 'TransportSetList', details: this.transportSet })
     }
 
 }

+ 12 - 12
src/connector/connector.receiver.ts → src/connector/adapter.receiver.ts

@@ -1,18 +1,24 @@
 import dotenv from 'dotenv';
-import { Bus, FisMessage } from "../interface/transport.interface";
-import { ConnectionAdapter } from "./connector.base";
-import { ReceiverConnectionAdapter as ReceiverConnectionAdapterInterface, Transport, TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
+import { Bus, EventMessage, FisMessage } from "../interface/transport.interface";
+import { ReceiverAdapter as ReceiverAdapterInterface, Transport, TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
 import { filter, Observable, Observer, Subscription, takeWhile } from 'rxjs';
 import { v4 as uuidv4 } from 'uuid'
+import { Adapter } from './adapter.base';
+import ConsoleLogger from '../utils/log.utils';
+import { WrappedMessage } from '../utils/message.ordering';
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 So how?: */
-export class ReceiverConnectionAdapter extends ConnectionAdapter implements ReceiverConnectionAdapterInterface {
+export class ReceiverAdapter extends Adapter implements ReceiverAdapterInterface {
+    private console!: ConsoleLogger
+
     constructor(adapterId: string, transportType: Transport, transportService: TransportService) {
         super()
+        this.console = new ConsoleLogger(`${transportType}ReceiverAdapter`, ['adapter'])
         this.connector = transportService
         this.setAdapterProfile(adapterId, transportType)
+        this.console.log({ message: `Just testing to see if receiverAdapter is instantiated properly ${this.connectorProfile, this.connector ? 'TransportService Instantiated' : 'Trnasport Service not instantiated'}` })
     }
 
     setAdapterProfile(id: string, transportType: Transport): void {
@@ -23,21 +29,15 @@ export class ReceiverConnectionAdapter extends ConnectionAdapter implements Rece
     }
 
     getMessageBus(bus: Bus): Observable<TransportEvent> {
-        console.log(`Connector getting message bus`)
+        this.console.log({ message: `Connector getting message bus for this connector: ${this.connectorProfile.id}` })
         return new Observable((observable: Observer<TransportEvent>) => {
             if (bus == Bus.GeneralBus) {
                 const subscription: Subscription = this.connector.subscribe().pipe(
                     filter((message: TransportEvent) => message.event === 'New Message'),
                     // take message only specific for this adapter. Although that itself wouldn't be necessary, considerng everything goes through transportEvent. I guess it's for better management
                     filter((message: TransportEvent) => (message.data as TransportMessage).target == this.connectorProfile.id),
-                    takeWhile((message: TransportEvent) => {
-                        const shouldTake = ((message.data as TransportMessage).payload as FisMessage).data !== 'Complete';
-                        if (!shouldTake) {
-                            observable.complete();  // Ensure the observer is completed
-                        }
-                        return shouldTake;
-                    }),
                 ).subscribe((message: TransportEvent) => {
+                    this.console.log({ message: `Received ${(((message.data as TransportMessage).payload as WrappedMessage).payload as FisMessage).header.messageID} from ${((message.data as TransportMessage).target)}`, details: message })
                     observable.next(message);
                 });
 

+ 8 - 8
src/connector/connector.request.response.ts → src/connector/adapter.request.response.ts

@@ -1,20 +1,20 @@
  import dotenv from 'dotenv';
 import { Bus, FisMessage, TransmissionMessage } from "../interface/transport.interface";
-import { ConnectionAdapter } from "./connector.base";
-import { RequestResponseConnectionAdapter as RequestResponseConnectionAdapterInterface, TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
-import { TransmitterConnectionAdapter } from './connector.transmitter';
-import { ReceiverConnectionAdapter } from './connector.receiver';
+import { RequestResponseAdapter as RequestResponseAdapterInterface, TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
 import { filter, map, Observable, Observer, Subscription, takeWhile } from 'rxjs';
 import { WrappedMessage } from '../utils/message.ordering';
+import { Adapter } from './adapter.base';
+import { ReceiverAdapter } from './adapter.receiver';
+import { TransmitterAdapter } from './adapter.transmitter';
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 So how?: */
-export class RequestResponseConnectionAdapter extends ConnectionAdapter implements RequestResponseConnectionAdapterInterface {
-    private transmitterAdapter!: TransmitterConnectionAdapter
-    private receiverAdapter!: ReceiverConnectionAdapter
+export class RequestResponseAdapter extends Adapter implements RequestResponseAdapterInterface {
+    private transmitterAdapter!: TransmitterAdapter
+    private receiverAdapter!: ReceiverAdapter
 
-    constructor( transmitterAdapter: TransmitterConnectionAdapter, receiverAdapter: ReceiverConnectionAdapter) {
+    constructor( transmitterAdapter: TransmitterAdapter, receiverAdapter: ReceiverAdapter) {
         super()
         // logic here
         this.transmitterAdapter = transmitterAdapter

+ 7 - 5
src/connector/connector.transmitter.ts → src/connector/adapter.transmitter.ts

@@ -1,27 +1,29 @@
 import dotenv from 'dotenv';
 import { FisMessage } from "../interface/transport.interface";
-import { ConnectionAdapter } from "./connector.base";
-import { ConnectionState, TransmitterConnectionAdapter as TransmitterConnectionAdapterInterface, Transport, TransportMessage, TransportService } from '../interface/connector.interface';
+import { Adapter } from "./adapter.base";
+import { ConnectionState, TransmitterAdapter as TransmitterAdapterInterface, Transport, TransportMessage, TransportService } from '../interface/connector.interface';
 import { Subject } from 'rxjs';
-import { v4 as uuidv4 } from 'uuid'
 import { WrappedMessage } from '../utils/message.ordering';
+import ConsoleLogger from '../utils/log.utils';
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 So how?: */
-export class TransmitterConnectionAdapter extends ConnectionAdapter implements TransmitterConnectionAdapterInterface {
+export class TransmitterAdapter extends Adapter implements TransmitterAdapterInterface {
+    private console!: ConsoleLogger
     connectionStateBus: Subject<ConnectionState> = new Subject()
 
     constructor(adapterId: string, adapterType: Transport, transportService: TransportService) {
         super()
         // logic here
+        this.console = new ConsoleLogger(`${adapterType}TransmitterAdapter`, ['adapter'])
         this.connector = transportService
         this.setAdapterProfile(adapterId, adapterType)
     }
 
     emit(message: WrappedMessage): void {
         // logic here
-        console.log(`Adapter Transmission Level. Emitting: `, (message.payload as FisMessage).header.messageID)
+        this.console.log({ message: `Emitting: ${message.thisMessageID} to ${this.connectorProfile.id}` })
         this.connector.emit({
             id: this.connectorProfile.id,
             transport: this.connectorProfile.transportType,

+ 9 - 19
src/interface/connector.interface.ts

@@ -1,7 +1,7 @@
 import { BehaviorSubject, Observable, Subject } from "rxjs"
-import { Bus, FisMessage, ReceiverProfile, TransmissionMessage, TransmissionProfile, TransmitterProfile } from "./transport.interface"
+import { Bus, FisMessage, ReceiverProfile, TransmissionProfile, TransmitterProfile } from "./transport.interface"
 import { WrappedMessage } from "../utils/message.ordering"
-
+import { AdapterSet } from "./general.interface"
 
 export type TYPE = {
     adapterProfile: AdapterProfile,
@@ -16,11 +16,11 @@ export interface AdapterProfile {
 }
 
 
-export interface ConnectionManager {
-    getAdapter(clientId: string, transportService: TransportService): Promise<AdapterSet>
+export interface AdapterManager {
+    getAdapter(clientId: string, transportService: TransportService): AdapterSet | null
 }
 
-export interface ConnectionAdaptorBase {
+export interface AdaptorBase {
     connector: TransportService // this one will refer to the actual tranpsort service like websocket and so on
     connectorProfile: AdapterProfile
     connectionStateBus: Subject<ConnectionState>
@@ -36,15 +36,15 @@ export interface ConnectionAdaptorBase {
 }
 
 
-export interface TransmitterConnectionAdapter extends ConnectionAdaptorBase {
+export interface TransmitterAdapter extends AdaptorBase {
     emit(message: WrappedMessage): void
 }
 
-export interface ReceiverConnectionAdapter extends ConnectionAdaptorBase {
+export interface ReceiverAdapter extends AdaptorBase {
     getMessageBus(bus: Bus): Observable<any>
 }
 
-export interface RequestResponseConnectionAdapter extends TransmitterConnectionAdapter, ReceiverConnectionAdapter {
+export interface RequestResponseAdapter extends TransmitterAdapter, ReceiverAdapter {
     send(message: WrappedMessage): Observable<FisMessage>
 }
 
@@ -77,7 +77,7 @@ export interface TransportEvent {
     data: any
 }
 
-export type Event = 'Server Started' | 'New Client' | 'Client Connected' |'Client Disconnected' | `Server Disconnected` | 'New Message' | `Notification` | `New Server` | `Server Connected` | `New Transport` | 'New Adapter' | 'Re-Flush'
+export type Event = 'Server Started' | 'New Client' | 'Client Connected' | 'Client Re-connected' | 'Client Disconnected' | `Server Disconnected` | 'New Message' | `Notification` | `New Server` | `Server Connected` | `New Transport` | 'New Adapter' | 'Re-Flush'
 
 export interface TransportService {
     getInfo(): Transport
@@ -85,20 +85,10 @@ export interface TransportService {
     subscribe(): Observable<TransportEvent> //all messages and whatever event will go through this, easier to implemnet across different transport protocol
 }
 
-
 export interface Info {
     transport: Transport
 }
 
-
-export interface AdapterSet {
-    id: string,
-    dateCreated: Date,
-    transmitterAdapter: TransmitterConnectionAdapter,
-    receiverAdapter: ReceiverConnectionAdapter,
-    requestResponsAdapter: RequestResponseConnectionAdapter
-}
-
 export interface ClientObject {
     id: string,
     dateCreated: Date,

+ 11 - 0
src/interface/general.interface.ts

@@ -0,0 +1,11 @@
+import { ReceiverAdapter } from "../connector/adapter.receiver";
+import { RequestResponseAdapter } from "../connector/adapter.request.response";
+import { TransmitterAdapter } from "../connector/adapter.transmitter";
+
+export interface AdapterSet {
+    id: string,
+    dateCreated: Date,
+    transmitterAdapter: TransmitterAdapter,
+    receiverAdapter: ReceiverAdapter,
+    requestResponsAdapter: RequestResponseAdapter
+}

+ 5 - 5
src/interface/transport.interface.ts

@@ -1,12 +1,12 @@
 import { Observable, Subject } from "rxjs";
-import { AdapterProfile, AdapterSet, AdaptorTransmissionRole, ConnectionAdaptorBase, ConnectionManager, RequestResponseConnectionAdapter, TransportEvent, TransportService } from "./connector.interface";
+import { AdaptorTransmissionRole, TransportEvent } from "./connector.interface";
 import { MessageTransmissionTransmitter } from "../transmission/msg.transmission.transmitter";
 import { MessageTransmissionReceiver } from "../transmission/msg.transmission.receiver";
-import { ConnectionAdapter } from "../connector/connector.base";
 import { RetransmissionService } from "../utils/retransmission.service";
-
+import { Adapter } from "../connector/adapter.base";
 export interface MessageTransmissionManager {
     subscribe(): Observable<MessageTransmission>
+    getEvent(): Observable<TransportEvent>
 }
 
 export interface MessageTransmission {
@@ -20,10 +20,10 @@ export interface MessageTransmission {
 export interface MessageTransmissionBase {
     msgRepositoryService: any // like logging service and what not
     transmissionRole: AdaptorTransmissionRole
-    mainAdapter: ConnectionAdapter
+    mainAdapter: Adapter
 
     getInfo(): TransmissionProfile
-    setUpAdapter(adapter: ConnectionAdapter): void
+    setUpAdapter(adapter: Adapter): void
 }
 
 export interface MessageReceiver extends MessageTransmissionBase {

+ 14 - 7
src/test/receiver.ts

@@ -4,22 +4,28 @@ import { v4 as uuidv4 } from 'uuid'
 import { MessageTransmissionManager } from "../transmission/msg.transmission.manager";
 import { TransportEvent, TransportMessage } from "../interface/connector.interface";
 import { WrappedMessage } from "../utils/message.ordering";
+import ConsoleLogger from "../utils/log.utils";
 
 class Supervisor {
+    private generalBus: Subject<TransportEvent> = new Subject()
+    private console = new ConsoleLogger('Supervisor', ['base'])
     private isClient: boolean = true
     private transmissionManager!: MessageTransmissionManager
-    private event: Subject<TransportEvent> = new Subject()
+    private event: Observable<TransportEvent>
     private transmissionSets: MessageTransmission[] = []
     private outgoingPipe: Subject<any> = new Subject()
 
     constructor() {
-        this.transmissionManager = new MessageTransmissionManager(this.event, this.isClient)
+        this.transmissionManager = new MessageTransmissionManager(this.isClient)
+        this.event = this.transmissionManager.getEvent()
 
         this.transmissionManager.subscribe().subscribe((transmissionSet: MessageTransmission) => {
+            this.console.log({ message: `Acquired transmission set for client` })
             this.transmissionSets.push(transmissionSet)
 
             this.handleActivity(transmissionSet)
             this.outgoingPipe.subscribe(message => transmissionSet.transmitter.emit(message))
+
         })
 
     }
@@ -28,7 +34,8 @@ class Supervisor {
     private handleActivity(messageTransmission: MessageTransmission): void {
         // start listening to incoming messages from this client
         messageTransmission.receiver.getMessageBus(Bus.GeneralBus).subscribe((event: TransportEvent) => {
-            console.log(`General Bus`, event)
+            this.console.log({ message: `General Bus ${event.event}`, details: event })
+            this.generalBus.next(event)
         })
 
         let request: FisMessage = {
@@ -40,8 +47,8 @@ class Supervisor {
         }
 
         // this.request(request, messageTransmission).subscribe({
-        //     next: res => console.log(res),
-        //     complete: () => console.log(`Responses Completed for request: ${request.header.messageID}`)
+        //     next: res => this.console.log({ message: `received ${res.header.messageID}`, details: res }),
+        //     complete: () => this.console.log({ message: `Responses Completed for request: ${request.header.messageID}` })
         // })
 
         this.startGeneratingRequest(1000, this.outgoingPipe)
@@ -50,7 +57,7 @@ class Supervisor {
     private request(request: FisMessage, messageTransmission: MessageTransmission): Observable<any> {
         return new Observable((response: Observer<any>) => {
             messageTransmission.transmitter.emit(request)
-            messageTransmission.receiver.getMessageBus(Bus.GeneralBus).pipe(
+            this.generalBus.pipe(
                 filter(event => event.event == 'New Message'),
                 filter(event => (((event.data as TransportMessage)?.payload as WrappedMessage)?.payload as FisMessage)?.header.messageID === request.header.messageID),
                 map(event => (((event.data as TransportMessage)?.payload as WrappedMessage)?.payload as FisMessage))
@@ -64,7 +71,7 @@ class Supervisor {
         })
     }
 
-    
+
     private startGeneratingRequest(intervalDuration: number, requestsPipe: Subject<FisMessage>) {
         interval(intervalDuration).subscribe(time => {
             let message: FisMessage = {

+ 11 - 9
src/test/transmitter.ts

@@ -1,21 +1,23 @@
 import { filter, interval, map, Observable, Observer, Subject, take } from "rxjs";
-import { Bus, FisMessage, MessageTransmission } from "../interface/transport.interface";
+import { Bus, EventMessage, FisMessage, MessageTransmission } from "../interface/transport.interface";
 import { v4 as uuidv4 } from 'uuid'
 import { MessageTransmissionManager } from "../transmission/msg.transmission.manager";
 import { TransportEvent, TransportMessage } from "../interface/connector.interface";
 import { WrappedMessage } from "../utils/message.ordering";
-
+import ConsoleLogger from "../utils/log.utils";
 class Supervisor {
+    private console = new ConsoleLogger('Supervisor', ['base'])
     private clientIncomingMessage: Subject<FisMessage> = new Subject()
     private messageProducer!: MessageProducer
     private transmissionManager!: MessageTransmissionManager
-    private event: Subject<TransportEvent> = new Subject()
+    private event!: Observable<TransportEvent>
     private transmissionSets: MessageTransmission[] = []
 
     constructor() {
         // so need them adapters now. But supervisor shouldn't be concerned, only messageTransmissionManager and ConnectionManager
         this.messageProducer = new MessageProducer(this.clientIncomingMessage)
-        this.transmissionManager = new MessageTransmissionManager(this.event)
+        this.transmissionManager = new MessageTransmissionManager()
+        this.event = this.transmissionManager.getEvent()
 
         this.transmissionManager.subscribe().subscribe((transmissionSet: MessageTransmission) => {
             this.transmissionSets.push(transmissionSet)
@@ -28,7 +30,7 @@ class Supervisor {
     private handleClientActivity(messageTransmission: MessageTransmission): void {
         // start listening to incoming messages from this client
         messageTransmission.receiver.getMessageBus(Bus.GeneralBus).subscribe((event: TransportEvent) => {
-            console.log(`General Bus`, event) // receiving end
+            this.console.log({ message: `General Bus`, details: event }) // receiving end
             let requestMessage: FisMessage = ((event.data as TransportMessage).payload as WrappedMessage).payload as FisMessage
             // this.clientIncomingMessage.next(requestMessage)
             this.messageProducer.getOutgoingMessages().pipe(
@@ -43,17 +45,17 @@ class Supervisor {
         //     messageTransmission.transmitter.emit(message)
         // })
     }
-
 }
 
 
 class MessageProducer {
+    private console = new ConsoleLogger('Message Producer', ['base'])
     private generalNotification: Subject<FisMessage> = new Subject()
     private incomingMessageBus!: Subject<FisMessage>
     private outgoingMessageBus: Subject<FisMessage> = new Subject()
 
     constructor(incomingMessageBus: Subject<FisMessage>) {
-        console.log(`Contructing Application....`)
+        this.console.log({ message: `Constructing Message Producer`})
         this.incomingMessageBus = incomingMessageBus
 
         this.generateNotifcation().subscribe(this.generalNotification)
@@ -71,10 +73,10 @@ class MessageProducer {
     // this is called no problem
     private handleIncomingRequests(requests: Observable<FisMessage>, outgoingMessageBus: Subject<FisMessage>): void {
         requests.subscribe((request: FisMessage) => {
-            console.log(`Generating response for new request ${request.header.messageID}`)
+            this.console.log({ message: `Generating response for new request ${request.header.messageID}`})
             this.generateMessage(request.header.messageID, 10).subscribe({
                 next: message => outgoingMessageBus.next(message),
-                error: error => console.error(error),
+                error: error => this.console.log({ message: 'observer Error', details: error }),
                 complete: () => {
                     outgoingMessageBus.next({
                         header: {

+ 8 - 8
src/transmission/msg.transmission.base.ts

@@ -1,19 +1,19 @@
 
-import { filter, Observable, Observer, Subject, Subscription, takeWhile, Unsubscribable } from 'rxjs';
-import { AdapterProfile, AdapterSet, AdaptorTransmissionRole, TransportEvent } from '../interface/connector.interface';
-import { Bus, FisMessage, MessageTransmissionBase as MessageTransmissionBaseInterface, TransmissionProfile } from '../interface/transport.interface'
+import { Observable } from 'rxjs';
+import { AdaptorTransmissionRole, TransportEvent } from '../interface/connector.interface';
+import { MessageTransmissionBase as MessageTransmissionBaseInterface, TransmissionProfile } from '../interface/transport.interface'
 import { v4 as uuidv4 } from 'uuid'
-import { ConnectionAdapter } from '../connector/connector.base';
+import { Adapter } from '../connector/adapter.base';
 
 export class MessageTransmissionBase implements MessageTransmissionBaseInterface {
     event!: Observable<TransportEvent>
     msgRepositoryService: any;
     transmissionRole!: AdaptorTransmissionRole;
-    adaptorsArray: Array<ConnectionAdapter> = []
+    adaptorsArray: Array<Adapter> = []
     transmissionService: any;
-    mainAdapter!: ConnectionAdapter;
+    mainAdapter!: Adapter;
 
-    constructor(event: Observable<TransportEvent>) {
+    constructor() {
         // logic here
     }
 
@@ -21,7 +21,7 @@ export class MessageTransmissionBase implements MessageTransmissionBaseInterface
         throw new Error(`Method not implemented`)
     }
 
-    setUpAdapter(adapter: ConnectionAdapter): void {
+    setUpAdapter(adapter: Adapter): void {
         throw new Error(`Method not implemented`)
     }
 

+ 56 - 56
src/transmission/msg.transmission.manager.ts

@@ -1,33 +1,34 @@
 import { MessageTransmissionTransmitter } from "./msg.transmission.transmitter";
 import { MessageTransmissionReceiver } from "./msg.transmission.receiver";
-import { ConnectionManager } from "../connector/connector.manager";
+import { AdapterManager } from "../connector/adapter.manager";
 import { EventMessage, MessageTransmission, MessageTransmissionManager as MessageTransmissionManagerInterface, ReceiverProfile, TransmitterProfile } from "../interface/transport.interface";
 import { v4 as uuidv4 } from 'uuid'
-import { AdapterSet, TransportEvent, Event } from "../interface/connector.interface";
+import { TransportEvent, Event } from "../interface/connector.interface";
 import { MessageTransmissionRequestResponse } from "./msg.transmission.request-response";
 import { filter, Observable, Observer, Subject } from "rxjs";
-import { error } from "console";
-import { TransmitterConnectionAdapter } from "../connector/connector.transmitter";
-import { ReceiverConnectionAdapter } from "../connector/connector.receiver";
+import ConsoleLogger from "../utils/log.utils";
+import { TransmitterAdapter } from "../connector/adapter.transmitter";
+import { ReceiverAdapter } from "../connector/adapter.receiver";
+import { AdapterSet } from "../interface/general.interface";
 
 export class MessageTransmissionManager implements MessageTransmissionManagerInterface {
+    private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionManager`, ['managers'])
     private browserEnv!: boolean
     transmission: MessageTransmission[] = []
-    connectionManager!: ConnectionManager
+    connectionManager!: AdapterManager
     event!: Subject<TransportEvent>
 
-    constructor(event: Subject<TransportEvent>, browserEnv?: boolean) {
+    constructor(browserEnv?: boolean) {
         if (browserEnv) this.browserEnv = browserEnv
         // logic here
-        console.log(`TransmissionManager: Contructing Transmission Manager...`)
-        this.event = event
-        this.connectionManager = new ConnectionManager(this.event, browserEnv)
+        this.console.log({ message: `Constructing self...` })
+        this.event = new Subject()
+        this.connectionManager = new AdapterManager(this.event, browserEnv)
 
-        // this.event.subscribe(event => console.log(`event`, event))
+        // this.event.subscribe(event => this.console.log({ message: 'event', details: event }))
 
         // note that if this server is down, all these instances of transmission and connector would be lost as well. SO cannot just simply find "instances" and reuse them. Must reinstantiate them again
-        this.handleEvent(`Client Disconnected` as Event, event)
-        this.handleEvent('Client Reconnected' as Event, event)
+        this.handleEvent('Client Re-connected' as Event, this.event)
     }
 
     /* so there will be some changes here. will nto be assigning just one, but all of them dynamically to pour into this boy
@@ -39,54 +40,55 @@ export class MessageTransmissionManager implements MessageTransmissionManagerInt
                 filter(event => event.event == targetEvent)
             ).subscribe(event => {
                 // get all adapters for all the connection
-                this.instantiateComponents((event.data as EventMessage).clientId).then((messageTransmission: MessageTransmission) => {
-                    observer.next(messageTransmission)
-                }).catch(error => {
-                    console.error(error)
-                })
+                let messageTransmissionSet: MessageTransmission | undefined = this.instantiateComponents((event.data as EventMessage).clientId)
+                if (messageTransmissionSet) {
+                    observer.next(messageTransmissionSet)
+                }
             })
         })
     }
 
-    private async instantiateComponents(clientId: string): Promise<MessageTransmission> {
-        return new Promise(async (resolve, reject) => {
-            console.log(`Instantiating new transmission set for another ${this.browserEnv ? 'Server' : 'Client'}`)
-            if (this.connectionManager.getTransportArray().length > 0) {
-                await this.connectionManager.getAdapter(clientId).then((adapterSet: AdapterSet) => {
-                    // 1 set only
-                    let transmitter: MessageTransmissionTransmitter = this.getTransmitter(clientId, adapterSet.transmitterAdapter, this.event.asObservable())
-                    let receiver: MessageTransmissionReceiver = this.getReceiver(clientId, adapterSet.receiverAdapter, this.event.asObservable())
-                    let requestResponse: MessageTransmissionRequestResponse = this.getRequestResponse(transmitter, receiver, this.event.asObservable())
-                    let transmission: MessageTransmission = {
-                        id: clientId,
-                        transmitter: transmitter,
-                        receiver: receiver,
-                        requestResponse: requestResponse,
-                        event: this.event.asObservable()
-                    }
-                    this.transmission.push(transmission)
-                    resolve(transmission)
-                }).catch((error) => console.error(error))
+    getEvent(): Observable<TransportEvent> {
+        return this.event.asObservable()
+    }
+
+    private instantiateComponents(clientId: string): MessageTransmission | undefined {
+        this.console.log({ message: `Instantiating new transmission set for  ${this.browserEnv ? 'Server' : 'Client'}: ${clientId}` })
+        if (this.connectionManager.getTransportArray().length > 0) {
+            let adapterSet: AdapterSet | null = this.connectionManager.getAdapter(clientId)
+            if (adapterSet) {
+                let transmitter: MessageTransmissionTransmitter = this.getTransmitter(clientId, adapterSet.transmitterAdapter, this.event.asObservable())
+                let receiver: MessageTransmissionReceiver = this.getReceiver(clientId, adapterSet.receiverAdapter, this.event.asObservable())
+                let requestResponse: MessageTransmissionRequestResponse = this.getRequestResponse(transmitter, receiver, this.event.asObservable())
+                let transmission: MessageTransmission = {
+                    id: clientId,
+                    transmitter: transmitter,
+                    receiver: receiver,
+                    requestResponse: requestResponse,
+                    event: this.event.asObservable()
+                }
+                this.transmission.push(transmission)
+                return transmission
             } else {
-                reject(`Transmission Manager: No transport is Instantiated`)
+                this.console.error({ message: 'No Adapter Set' })
+                return undefined
             }
-        })
+        }
     }
 
-
-    private getTransmitter(transmissionId: string, adapter: TransmitterConnectionAdapter, event: Observable<TransportEvent>): MessageTransmissionTransmitter {
+    private getTransmitter(transmissionId: string, adapter: TransmitterAdapter, event: Observable<TransportEvent>): MessageTransmissionTransmitter {
         let transmitterProfile: TransmitterProfile = {
             id: transmissionId,
-            name: '', // for now make it empty. We will use the assigned uuid here
+            name: `${adapter.getInfo().transportType} Transmitter Adapter`,
             dateCreated: new Date()
         }
         return new MessageTransmissionTransmitter(transmitterProfile, adapter, event)
     }
 
-    private getReceiver(transmissionId: string, adapter: ReceiverConnectionAdapter, event: Observable<TransportEvent>): MessageTransmissionReceiver {
+    private getReceiver(transmissionId: string, adapter: ReceiverAdapter, event: Observable<TransportEvent>): MessageTransmissionReceiver {
         let receiverProfile: ReceiverProfile = {
             id: transmissionId,
-            name: '', // for now make it empty. We will use the assigned uuid here
+            name: `${adapter.getInfo().transportType} Receiver Adapter`,
             dateCreated: new Date()
         }
         return new MessageTransmissionReceiver(receiverProfile, adapter, event)
@@ -96,32 +98,30 @@ export class MessageTransmissionManager implements MessageTransmissionManagerInt
         return new MessageTransmissionRequestResponse(transmitterInstance, receiverInstance, event)
     }
 
-
-
     private handleEvent(eventName: Event, eventObs: Observable<TransportEvent>): void {
         eventObs.pipe(
             filter((event: TransportEvent) => event.event === eventName)
         ).subscribe(event => {
             // assuming this is reconnection case
-            if (event.event == 'Client Connected') {
-                this.reconnectionHandler((event.data as EventMessage).clientId)
-            }
-
+            this.reconnectionHandler((event.data as EventMessage).clientId)
             // can include more event handlers here
         })
     }
 
     private reconnectionHandler(clientId: string): void {
+        this.console.log({ message: `TransmissionManager: A reconnection occured. Client: ${clientId}` })
         let transmissionObj: MessageTransmission | undefined = Array.from(this.transmission).find(obj => obj.id === clientId)
         if (!transmissionObj) {
-            this.instantiateComponents(clientId).then((messageTransmission: MessageTransmission) => {
-                this.transmission.push(messageTransmission)
-            }).catch((error) => {
-                console.error(error)
-            })
+            let transmissionSet: MessageTransmission | undefined = this.instantiateComponents(clientId)
+            if (transmissionSet) {
+                this.transmission.push(transmissionSet)
+            } else {
+                this.console.error({ message: `Cannot find client transmission obj : ${clientId}` })
+            }
+        } {
+            this.console.log({ message: `Transmission Object for ${clientId} Found`})
         }
     }
-
 }
 
 

+ 17 - 12
src/transmission/msg.transmission.receiver.ts

@@ -1,20 +1,23 @@
 import { filter, Observable, Observer, Subject, Subscription } from 'rxjs';
-import { AdapterSet, Transport, TransportEvent, TransportMessage } from '../interface/connector.interface';
+import { TransportEvent, TransportMessage } from '../interface/connector.interface';
 import { MessageTransmissionBase } from './msg.transmission.base';
-import { Bus, EventMessage, MessageReceiver as MessageReceiverInterface, ReceiverProfile } from '../interface/transport.interface'
+import { Bus, MessageReceiver as MessageReceiverInterface, ReceiverProfile } from '../interface/transport.interface'
 import { v4 as uuidv4 } from 'uuid'
-import { ReceiverConnectionAdapter } from '../connector/connector.receiver';
+import { ReceiverAdapter } from '../connector/adapter.receiver';
 import { checkMessage, WrappedMessage } from '../utils/message.ordering';
-import { TransmitterConnectionAdapter } from '../connector/connector.transmitter';
-import { ConnectionAdapter } from '../connector/connector.base';
+import ConsoleLogger from '../utils/log.utils';
+import { Adapter } from '../connector/adapter.base';
 
 export class MessageTransmissionReceiver extends MessageTransmissionBase implements MessageReceiverInterface {
+    private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionReceiver`, ['transmission'])
     private onHoldMessage: Subject<WrappedMessage> = new Subject()
     // private toBePassedOver: Subject<WrappedMessage> = new Subject()
     receiverProfile!: ReceiverProfile;
 
-    constructor(profile: ReceiverProfile, adapter: ReceiverConnectionAdapter, event: Observable<TransportEvent>) {
-        super(event);
+    constructor(profile: ReceiverProfile, adapter: ReceiverAdapter, event: Observable<TransportEvent>) {
+        super()
+        this.event = event
+        this.console.log({ message: `Constructing Receiver Transmission with ${profile.name}` })
 
         this.setReceiver(profile)
         this.setUpAdapter(adapter)
@@ -25,23 +28,25 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
     }
 
     getMessageBus(bus: Bus): Observable<TransportEvent> {
-        console.log(`Transmission getting message bus`)
+        this.console.log({ message: `Transmission getting message bus for ${this.receiverProfile.id}` })
         return new Observable((observable: Observer<TransportEvent>) => {
             // logic here
             if (bus == Bus.GeneralBus) {
                 // Need to merge all the adapters into one when the time comes 
                 // SAMPLE: This adapterArray.forEach(adapter => { ... })
-                const subscription: Subscription = (this.mainAdapter as ReceiverConnectionAdapter).getMessageBus(Bus.GeneralBus).pipe( 
+                const subscription: Subscription = (this.mainAdapter as ReceiverAdapter).getMessageBus(Bus.GeneralBus).pipe(
                     filter((event: TransportEvent) => event.event == 'New Message'),
                 ).subscribe((event: TransportEvent) => {
                     // console.log(event) // data is transportMessage instead of eventmessage
                     this.onHoldMessage.next(((event.data as TransportMessage).payload as WrappedMessage))
                     checkMessage(((event.data as TransportMessage).payload as WrappedMessage), this.onHoldMessage).then(() => {
                         // only release the message before it exists
-                        console.log(`This one passes. Does have previousID. Case for message ordering`) 
+                        this.console.log({ message: `This one passes. Does have previousID. Case for message ordering` })
                         // console.log(((event.data as TransportMessage).payload as WrappedMessage))
                         observable.next(event);
-                    }).catch((error) => console.error(error))
+                    }).catch((error) => {
+                        this.console.log({ message: `Observer Error`, details: error })
+                    })
                 });
 
                 // Clean up on unsubscription
@@ -52,7 +57,7 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
         })
     }
 
-    setUpAdapter(adapter: ConnectionAdapter): void {
+    setUpAdapter(adapter: Adapter): void {
         this.mainAdapter = adapter
     }
 }

+ 4 - 3
src/transmission/msg.transmission.request-response.ts

@@ -5,7 +5,7 @@ import { v4 as uuidv4 } from 'uuid'
 import { TransportEvent } from "../interface/connector.interface";
 import { MessageTransmissionReceiver } from "./msg.transmission.receiver";
 import { MessageTransmissionTransmitter } from "./msg.transmission.transmitter";
-import { ConnectionAdapter } from "../connector/connector.base";
+import { Adapter } from "../connector/adapter.base";
 
 export class MessageTransmissionRequestResponse extends MessageTransmissionBase implements MessageRequestResponseInterface {
     transmitterInstance!: MessageTransmissionTransmitter;
@@ -14,8 +14,9 @@ export class MessageTransmissionRequestResponse extends MessageTransmissionBase
     outgoingMessageBus!: Subject<any>;
 
     constructor(transmitterInstance: MessageTransmissionTransmitter, receiverInstance: MessageTransmissionReceiver, event: Observable<TransportEvent>) {
-        super(event)
+        super()
         this.setTransmissionProfile(transmitterInstance, receiverInstance)
+        this.event = event
     }
 
     setTransmissionProfile(transmissionInfo: MessageTransmissionTransmitter, receiverInfo: MessageTransmissionReceiver): void {
@@ -48,7 +49,7 @@ export class MessageTransmissionRequestResponse extends MessageTransmissionBase
         });
     }
 
-    setUpAdapter(adapter: ConnectionAdapter): void {
+    setUpAdapter(adapter: Adapter): void {
         this.mainAdapter = adapter
     }
 }

+ 20 - 15
src/transmission/msg.transmission.transmitter.ts

@@ -1,22 +1,26 @@
 import { MessageTransmissionBase } from "./msg.transmission.base";
 import { EventMessage, FisMessage, MessageTransmitter as MessageTransmitterInterface, TransmitterProfile } from '../interface/transport.interface'
-import { AdapterSet, ConnectionAdaptorBase, ConnectionState, Event, Transport, TransportEvent, TransportMessage } from "../interface/connector.interface";
+import { ConnectionState, TransportEvent, TransportMessage } from "../interface/connector.interface";
 import { v4 as uuidv4 } from 'uuid'
-import { TransmitterConnectionAdapter } from "../connector/connector.transmitter";
-import { BehaviorSubject, distinct, filter, map, Observable, Subject } from "rxjs";
+import { BehaviorSubject, distinct, distinctUntilChanged, filter, map, Observable, Subject } from "rxjs";
 import { RetransmissionService } from "../utils/retransmission.service";
 import { WrappedMessage } from "../utils/message.ordering";
-import { ConnectionAdapter } from "../connector/connector.base";
+import ConsoleLogger from "../utils/log.utils";
+import { Adapter } from "../connector/adapter.base";
+import { TransmitterAdapter } from "../connector/adapter.transmitter";
 
 /* Take in all the messages that needs to be transported, and divide them accordingly. So the connector instances will do just that
 connectors or adapters will have their own identifier*/
 export class MessageTransmissionTransmitter extends MessageTransmissionBase implements MessageTransmitterInterface {
+    private connectionStateEvent: BehaviorSubject<ConnectionState> = new BehaviorSubject<ConnectionState>('OFFLINE')
+    private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionTransmitter`, ['transmission'])
     private messageToBeTransmitted!: Subject<FisMessage | WrappedMessage>
     transmitterProfile!: TransmitterProfile;
     retransmission!: RetransmissionService;
 
-    constructor(profile: TransmitterProfile, adapter: TransmitterConnectionAdapter, event: Observable<TransportEvent>) {
-        super(event)
+    constructor(profile: TransmitterProfile, adapter: TransmitterAdapter, event: Observable<TransportEvent>) {
+        super()
+        this.console.log({ message: `Constructing Transmitter Transmission with ${profile.name}` })
         this.event = event
         this.messageToBeTransmitted = new Subject()
         this.retransmission = new RetransmissionService()
@@ -28,12 +32,11 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
         this.uniqueHandlerToFlushUnsentMessages(event)
     }
 
-    // by the time this transmission set is instantiated, the connected client would've been online. Need ot manually signal retransmission to release buffer immediately
     setUpRetransmission(): void {
-        let connectionStateEvent = new BehaviorSubject<'OFFLINE' | 'ONLINE'>('ONLINE')
+        this.console.log({ message: `Setting up Retransmission Service...` })
         this.event.pipe(
-            filter(event => event.event == 'New Client' || event.event == 'Client Disconnected' || event.event == 'Client Connected' || event.event == 'Server Disconnected' || event.event == 'Server Connected' || event.event == 'New Server'),
             filter(event => (event.data as EventMessage).clientId == this.transmitterProfile.id),
+            filter(event => event.event == 'Client Disconnected' || event.event == 'Client Re-connected' || event.event == 'Client Connected' || event.event == 'Server Disconnected' || event.event == 'Server Connected'),
             map(event => {
                 if (event.event == 'Client Disconnected' || event.event == 'Server Disconnected') {
                     return 'OFFLINE'
@@ -41,15 +44,17 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
                     return `ONLINE`
                 }
             }),
-            distinct()
+            distinctUntilChanged()
         ).subscribe((signal: ConnectionState) => {
-            connectionStateEvent.next(signal)
+            this.connectionStateEvent.next(signal)
+            if(signal == 'OFFLINE') this.console.error({message: `${this.transmitterProfile.id} disconnected`})
+            if(signal == 'ONLINE') this.console.log({message: `${this.transmitterProfile.id} connected`})
         })
-        this.retransmission.implementRetransmission(this.messageToBeTransmitted, connectionStateEvent, true)
+        this.retransmission.implementRetransmission(this.messageToBeTransmitted, this.connectionStateEvent.asObservable(), true)
         // automatically subscribe to allow released bffered messages to be released
         this.retransmission.returnSubjectForBufferedItems().subscribe((message: WrappedMessage) => {
             // need to work with wrapped messages
-            (this.mainAdapter as TransmitterConnectionAdapter).emit(message)
+            (this.mainAdapter as TransmitterAdapter).emit(message)
         })
     }
 
@@ -58,11 +63,11 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
     }
 
     emit(message: FisMessage): void {
-        console.log(`Transmission Transmitter: `, message.header.messageID)
+        this.console.log({ message: `${this.connectionStateEvent.getValue() == 'ONLINE' ? `Transmitting ${message.header.messageID}` : `Buffering ${message.header.messageID}`}` })
         this.messageToBeTransmitted.next(message)
     }
 
-    setUpAdapter(adapter: ConnectionAdapter): void {
+    setUpAdapter(adapter: Adapter): void {
         // for now just hardcode to use 1 adapter type until connection manager is further enhacne to configure adapters on the fly
         this.mainAdapter = adapter
     }

+ 11 - 9
src/transport/websocket.ts

@@ -1,13 +1,15 @@
-import { BehaviorSubject, filter, Observable, Subject } from "rxjs";
+import { Observable, Subject } from "rxjs";
 import { Socket as ClientSocket } from 'socket.io-client'
 import { Socket as SocketForConnectedClient } from "socket.io"
 import { handleClientSocketConnection, handleNewSocketClient, startClientSocketConnection, startSocketServer } from "../utils/socket.utils";
 import { ClientObject, Transport, TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
 import { WrappedMessage } from "../utils/message.ordering";
 import { FisMessage } from "../interface/transport.interface";
+import ConsoleLogger from "../utils/log.utils";
 
 /* Just code in the context that this websocket service will be handling multiple UI clients. Can think about the server communication at a later time. */
 export class WebsocketTransportService implements TransportService {
+    private console: ConsoleLogger = new ConsoleLogger(`WebsocketTransportService`, ['transport'])
     private info: Transport = Transport.Websocket
     private connectedServer: ConnectedServerSocket[] = [] // to allow the possibility of having to communicate with multiple servers as a client
     private connectedClientSocket: ConnectedClientSocket[] = [] // to keep track of the all the clients that are connected
@@ -15,7 +17,7 @@ export class WebsocketTransportService implements TransportService {
     private transportEvent!: Subject<TransportEvent>
 
     constructor(event: Subject<TransportEvent>) {
-        console.log(`WebsocketTransportService: Constructing socket transport service....`)
+        this.console.log({ message: `WebsocketTransportService: Constructing socket transport service....` })
         this.transportEvent = event
         // logic here
     }
@@ -26,12 +28,12 @@ export class WebsocketTransportService implements TransportService {
             next: (connectedClient: SocketForConnectedClient) => {
                 handleNewSocketClient(connectedClient, this.connectedClientSocket).subscribe({
                     next: event => this.transportEvent.next(event),
-                    error: error => console.error(error),
-                    complete: () => console.log(`Client ${connectedClient.id} disconnected...`)
+                    error: error => this.console.error({ message: `Observer Error: ${error}`, details: error }),
+                    complete: () => this.console.log({ message: `Client ${connectedClient.id} disconnected...` })
                 })
             },
-            error: error => console.error(error),
-            complete: () => console.log(`...`)
+            error: error => this.console.error({ message: `Observer Error`, details: error }),
+            complete: () => this.console.log({ message: `Not sure what this is for...` })
         })
     }
 
@@ -40,16 +42,16 @@ export class WebsocketTransportService implements TransportService {
         startClientSocketConnection(url).then((socket: ClientSocket) => {
             handleClientSocketConnection(socket, this.connectedServer).subscribe(this.transportEvent)
         }).catch((error) => {
-            console.error(`WebsocketTransport ERROR:`, error)
+            this.console.log({ message: `Observer Error`, details: error })
         })
     }
 
 
     public emit(message: TransportMessage): void {
-        console.log(`Transport Socket service level. Emitting: ${((message.payload as WrappedMessage).payload as FisMessage).header.messageID}`)
+        this.console.log({ message: `Emitting: ${((message.payload as WrappedMessage).payload as FisMessage).header.messageID} to ${message.target}`, details: message })
         let clientObj: ConnectedClientSocket | undefined = this.connectedClientSocket.find(obj => obj.id == message.target)
         let serverObj: ConnectedServerSocket | undefined = this.connectedServer.find(obj => obj.id === message.target)
-        console.log(serverObj?.connectionState.getValue(), serverObj?.id)
+        // this.console.log({ message: `${serverObj?.connectionState.getValue(), serverObj?.id}` })
         // for server usage
         if (clientObj && clientObj.connectionState.getValue() == 'ONLINE') {
             clientObj.socketInstance.emit(`message`, message.payload)

+ 192 - 0
src/utils/log.utils.ts

@@ -0,0 +1,192 @@
+import fs from "fs";
+import "source-map-support/register";
+// const chalk = require('chalk');
+import chalk from 'chalk'
+
+// const logColors: Record<string, (text: string) => string> = {
+//     base: chalk.bgRgb(69, 64, 74),
+//     managers: chalk.bgRgb(128, 20, 217),
+//     transmission: chalk.bgRgb(0, 106, 255),
+//     adapter: chalk.bgRgb(51, 130, 68),
+//     transport: chalk.bgRgb(173, 9, 0),
+//     error: chalk.rgb(212, 32, 0),
+//     util: chalk.rgb(200, 204, 177),
+//     details: chalk.rgb(255, 255, 97),
+//     location: chalk.rgb(241, 112, 255),
+//     retransmission: chalk.bgRgb(186, 87, 0)
+// };
+
+function applyColor(rgb: [number, number, number], isBackground: boolean = false) {
+    const [r, g, b] = rgb;
+    return isBackground ? chalk.bgRgb(r, g, b) : chalk.rgb(r, g, b);
+}
+
+const logColors: Record<string, [number, number, number]> = {
+    base: [69, 64, 74],
+    managers: [128, 20, 217],
+    transmission: [0, 106, 255],
+    adapter: [51, 130, 68],
+    transport: [173, 9, 0],
+    error: [212, 32, 0],
+    util: [200, 204, 177],
+    details: [255, 255, 97],
+    retransmission: [186, 87, 0],
+};
+
+
+class ConsoleLogger {
+    private categoryPath: string[] = []
+    private settings: Record<string, any>;
+    private className!: string
+    constructor(className: string, categoryPath: string[]) {
+        this.className = className
+        let configPath = "./logSetting.json"
+        this.settings = this.loadSettings(configPath);
+        this.categoryPath = categoryPath
+    }
+
+    private loadSettings(configPath: string): Record<string, any> {
+        try {
+            const config = fs.readFileSync(configPath, "utf-8");
+            return JSON.parse(config);
+        } catch (error) {
+            console.error("Failed to load log settings:", error);
+            return {};
+        }
+    }
+
+    private isCategoryEnabled(categoryPath: string[]): boolean {
+        let currentLevel = this.settings;
+
+        for (const part of categoryPath) {
+            if (currentLevel[part] === undefined) {
+                return false; // Category or subcategory does not exist
+            }
+            if (typeof currentLevel[part] === "boolean") {
+                return currentLevel[part];
+            }
+            currentLevel = currentLevel[part];
+        }
+
+        return false;
+    }
+
+    log(message: { message: string, details?: any }): void {
+        if (!this.isCategoryEnabled(this.categoryPath)) {
+            return; // Skip logging if the category is disabled
+        }
+
+        const category = this.categoryPath.join(" -> ").toUpperCase();
+        const location = this.getLogLocation();
+
+        const primaryCategory = this.categoryPath[0];
+        const rgb = logColors[primaryCategory] || [255, 255, 255]; // Default to white
+        const categoryStyle = applyColor(rgb, true); // Use bgRgb for category
+        const locationStyle = applyColor(rgb); // Use rgb for location
+
+        const formattedCategory = categoryStyle(`[${category}]`);
+        const formattedClassName = categoryStyle(`${this.className}`);
+        const formattedLocation = locationStyle(`${location}`);
+
+        const formattedMessage = `${formattedClassName} ${formattedLocation}: ${message.message}`;
+        console.log(formattedMessage, message.details ? applyColor([255, 255, 97])(message.details) : '');
+
+        if (message.details && this.isCategoryEnabled(["details"])) {
+            console.log(applyColor([255, 255, 97])('Details: '), message.details);
+        }
+    }
+
+
+
+    error(message: { message: string, details?: any }): void {
+        if (!this.isCategoryEnabled(this.categoryPath)) {
+            return; // Skip logging if the category is disabled
+        }
+
+        const category = this.categoryPath.join(" -> ").toUpperCase();
+        const location = this.getLogLocation();
+
+        const primaryCategory = this.categoryPath[0];
+        const rgb = logColors[primaryCategory] || [255, 255, 255]; // Default to white
+        const categoryStyle = applyColor(rgb, true); // Use bgRgb for category
+        const locationStyle = applyColor(rgb); // Use rgb for location
+        const messageStyle = applyColor([224, 0, 0])
+        const formattedCategory = categoryStyle(`[${category}]`);
+        const formattedClassName = categoryStyle(`${this.className}`);
+        const formattedLocation = locationStyle(`${location}`);
+        const formattedErrorMessage = messageStyle(`${message.message}`)
+
+        const formattedMessage = `${formattedClassName} ${formattedLocation}: ${formattedErrorMessage}`;
+        console.log(formattedMessage, message.details ? applyColor([224, 0, 0])(message.details) : '');
+
+        if (message.details && this.isCategoryEnabled(["details"])) {
+            console.log(applyColor([224, 0, 0])('Details: '), message.details);
+        }
+    }
+
+
+
+    reloadSettings(configPath: string): void {
+        this.settings = this.loadSettings(configPath);
+    }
+
+    private getLogLocation(): string {
+        if (!this.isCategoryEnabled(["location"])) {
+            return ""; // Don't display location if the category is disabled
+        }
+
+        const error = new Error();
+        // Captures the current stack trace
+        Error.captureStackTrace(error, this.getLogLocation);
+
+        const stack = error.stack?.split("\n") || [];
+        const callerLine = stack[2]; // Adjust index to get the correct caller line (this may vary based on environment)
+
+        // Extract only line and column numbers using regex
+        const match = callerLine?.match(/:(\d+):(\d+)\)/);
+        if (match) {
+            const [, line, column] = match;
+            // return `line ${line}, column ${column}`;
+            return `at line ${line}`;
+        }
+
+        return "at unknown location";
+    }
+}
+
+export default ConsoleLogger;
+
+
+// Extract file name and line number using regex
+// const match = callerLine?.match(/\((.*):(\d+):(\d+)\)/);
+// if (match) {
+//     const [_, filePath, line, column] = match;
+//     return `${filePath}:${line}:${column}`;
+// }
+
+// log(message: { message: string, details?: any }): void {
+//     if (!this.isCategoryEnabled(this.categoryPath)) {
+//         return; // Skip logging if the category is disabled
+//     }
+//     const category = this.categoryPath.join(" -> ").toUpperCase();
+//     const location = this.getLogLocation();
+
+//     // Map the primary category to a color
+//     const primaryCategory = this.categoryPath[0];
+//     const categoryStyle = logColors[primaryCategory] || ((text: string) => text); // Default to no style if not found
+//     const formattedCategory = categoryStyle(`[${category}]`); // Apply color to category part
+
+//     // Apply the same color to the className
+//     const formattedClassName = categoryStyle(`${this.className}`);
+
+//     // Format the message
+//     const formattedLocation = logColors.location(`${location}`); // Apply color to location part
+//     const formattedMessage = `${formattedClassName} ${formattedLocation}: ${message.message}`;
+
+//     // Log based on whether it's an error or regular log
+//     console.log(formattedMessage, message.details ? logColors.details(message.details) : '');
+//     // Log details if enabled
+//     if (message.details && this.isCategoryEnabled(["details"])) {
+//         console.log(logColors.details('Details: '), message.details);
+//     }
+// }

+ 1 - 1
src/utils/message.ordering.ts

@@ -1,7 +1,7 @@
 import { Subject, takeWhile } from "rxjs";
 
 export function sortMessageBasedOnDate(array: WrappedMessage[]): WrappedMessage[] {
-    console.log(`Sorting ${array.length} messages....`)
+    // console.log(`Sorting ${array.length} messages....`)
     return array.sort((a, b) => {
         return new Date(a.timeReceived).getTime() - new Date(b.timeReceived).getTime();
     });

+ 11 - 9
src/utils/retransmission.service.ts

@@ -1,8 +1,10 @@
-import { BehaviorSubject, buffer, distinctUntilChanged, from, Observable, Subject, takeWhile } from "rxjs";
+import { BehaviorSubject, buffer, distinct, distinctUntilChanged, from, Observable, Subject, takeWhile } from "rxjs";
 import { v4 as uuidV4 } from 'uuid';
 import { sortMessageBasedOnDate, WrappedMessage } from "./message.ordering";
+import ConsoleLogger from "./log.utils";
 
 export class RetransmissionService {
+    private console: ConsoleLogger = new ConsoleLogger(`RetransmissionService`, ['retransmission'])
     private currentMessageId!: string | null
     private sortMessage: boolean = false
     private bufferReleaseSignal: Subject<void> = new Subject()
@@ -14,12 +16,12 @@ export class RetransmissionService {
     private messageToBeTransmitted: Subject<WrappedMessage> = new Subject()
 
     // Interface
-    public implementRetransmission(payloadToBeTransmitted: Observable<any>, eventListener: Observable<any>, wantMessageOrdering?: boolean) {
+    public implementRetransmission(payloadToBeTransmitted: Observable<any>, eventListener: Observable<ConnectionState>, wantMessageOrdering?: boolean) {
         if (wantMessageOrdering) {
             this.sortMessage = true
-            console.log(`Message ordering is set to ${this.sortMessage}`)
+            this.console.log({ message: `Message ordering is set to ${this.sortMessage}` })
         }
-        eventListener.subscribe(event => this.receiverConnectionState.next(event))
+        eventListener.pipe(distinctUntilChanged()).subscribe(event => this.receiverConnectionState.next(event))
 
         this.startWrappingOperation()
         this.startBufferTransmisionProcess()
@@ -41,7 +43,7 @@ export class RetransmissionService {
 
         // wrappedMessageToBeBuffered will then be pushed to buffer
         this.wrappedMessageToBeBuffered.pipe(buffer(this.bufferReleaseSignal)).subscribe((bufferedMessages: WrappedMessage[]) => {
-            console.log(bufferedMessages.length + ' buffered messages')
+            this.console.log({ message: `${bufferedMessages.length > 0 ? `${bufferedMessages.length} buffered messages` : `No buffered messages at the moment`} ` })
             // console.log(`Released buffered message: ${bufferedMessages.length} total messages. To Be sorted.`)
             this.arrayToBeTransmitted.next(sortMessageBasedOnDate(bufferedMessages))
             // this.arrayToBeTransmitted.next((this.sortMessage && bufferedMessages.length > 0) ? sortMessageBasedOnDate(bufferedMessages) : bufferedMessages)
@@ -67,7 +69,7 @@ export class RetransmissionService {
     }
 
     private startBufferTransmisionProcess() {
-        console.log(`StartBufferTransmissionProcess`)
+        this.console.log({ message: `StartBufferTransmissionProcess` })
         this.arrayToBeTransmitted.subscribe(array => {
             if (array.length > 0) {
                 this.transmissionState.next('TRANSMITTING')
@@ -110,13 +112,13 @@ export class RetransmissionService {
         this.receiverConnectionState.pipe(
             distinctUntilChanged()
         ).subscribe(clientState => {
-            console.log(`Client is now ${clientState}. ${(clientState === 'OFFLINE')? 'Buffering...' : ''}`)
+            this.console.log({ message: `Client is now ${clientState}. ${(clientState === 'OFFLINE') ? 'Buffering Mode Active...' : 'Releasing Buffered Messages...'}` })
             if (clientState == 'OFFLINE') {
-                console.log(`Current transmission state: ${this.transmissionState.getValue()}`)
+                this.console.log({ message: `Current transmission state: ${this.transmissionState.getValue()}` })
                 // just keep buffering
             }
             if (clientState == 'ONLINE') {
-                console.log(`Current transmission state: ${this.transmissionState.getValue()}`)
+                this.console.log({ message: `Current transmission state: ${this.transmissionState.getValue()}` })
                 // get the stored messages to pump it back into the buffer to be ready to be processed immediately
                 if (this.transmissionState.getValue() == 'ARRAY EMPTY') {
                     this.bufferReleaseSignal.next()

+ 53 - 40
src/utils/socket.utils.ts

@@ -7,11 +7,13 @@ import { v4 as uuidv4 } from 'uuid'
 import { ConnectionState, Transport, TransportEvent, TransportMessage } from '../interface/connector.interface';
 import { ConnectedClientSocket, ConnectedServerSocket } from '../transport/websocket';
 import { EventMessage } from '../interface/transport.interface';
+import ConsoleLogger from './log.utils';
+const console: ConsoleLogger = new ConsoleLogger(`SocketUtils`, ['transport'])
 
 export function startSocketServer(port: number): Observable<SocketForConnectedClient> {
     return new Observable((observer) => {
         try {
-            console.log(`Socket Server ${port} Started....`)
+            console.log({ message: `Socket Server ${port} Started....` })
             let httpServer = createServer();
             let socketServer = new Server(httpServer)
 
@@ -21,15 +23,15 @@ export function startSocketServer(port: number): Observable<SocketForConnectedCl
             })
 
             socketServer.engine.on("connection_error", (err) => {
-                console.log(err.req);      // the request object
-                console.log(err.code);     // the error code, for example 1
-                console.log(err.message);  // the error message, for example "Session ID unknown"
-                console.log(err.context);  // some additional error context
+                console.log({ message: `Socket Server ${port} Connection Error`, details: err.req })
+                console.log({ message: `Socket Server ${port} Connection Error`, details: err.code })
+                console.log({ message: `Socket Server ${port} Connection Error`, details: err.message })
+                console.log({ message: `Socket Server ${port} Connection Error`, details: err.context })
             });
 
             // Start the HTTP server on 127.0.0.1 with the given port
             httpServer.listen(port, '0.0.0.0', () => {
-                console.log(`Socket server listening on ${port}`);
+                console.log({ message: `Socket server listening on ${port}` });
             });
         } catch (error) {
             observer.error(error);
@@ -64,7 +66,7 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
 
         // Listen for a connection event
         socket.on('connect', () => {
-            console.log('Connected to the server:', socket.id)
+            console.log({ message: `Connected to the server ${socket.id} ` })
             if (receiverProfileInfo?.id) {
                 checkOwnClientInfo(receiverProfileInfo.id).then((profile: { id: string }) => {
                     socket.emit('profile', {
@@ -109,9 +111,8 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
         })
 
         socket.on('profile', (data: { name: string, message: any }) => {
-            console.log(data)
             if (data.name == 'New Profile') {
-                console.log(`Assigned client Name: ${data.message.id}`)
+                console.log({ message: `Assigned client Name: ${data.message.id}` })
                 // Update websocket instance record
                 receiverProfileInfo = {
                     id: data.message.id,
@@ -120,7 +121,7 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
                     connectionState: new BehaviorSubject<ConnectionState>(`ONLINE`)
                 }
                 writeFile(data.message as ConnectedServerSocket, (data.message as ConnectedServerSocket).id).then(() => {
-                    // broadcast event to allow retransmission to release buffer
+                    // broadcast event to allow transmission manager to instantiate transmission components
                     eventNotification.next({
                         id: uuidv4(),
                         event: `New Server`,
@@ -129,11 +130,20 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
                             message: `New Websocket Channel ${(data.message as ConnectedServerSocket).id} established.`
                         } as EventMessage
                     })
+                    // broadcast event to allow retransmission to relase buffered messages
+                    eventNotification.next({
+                        id: uuidv4(),
+                        event: `Server Connected`,
+                        data: {
+                            clientId: (data.message as ConnectedServerSocket).id,
+                            message: `Server ${(data.message as ConnectedServerSocket).id} connected and ready to go.`
+                        } as EventMessage
+                    })
                 }).catch((error) => { }) // do nothing at the moment. 
                 serversConnected.push(receiverProfileInfo)
             }
             if (data.name == 'Adjusted Profile') {
-                console.log(`Assigned client Name: ${(data.message as ConnectedServerSocket).id}`)
+                console.log({ message: `Adjusted client Name: ${(data.message as ConnectedServerSocket).id}` })
                 // Update websocket instance record
                 let clientObj: ConnectedServerSocket | undefined = serversConnected.find(obj => obj.id === data.message.id)
                 if (clientObj) {
@@ -142,7 +152,9 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
                     clientObj.id = receiverProfileInfo.id
                     clientObj.socketInstance = socket
                     clientObj.connectionState.next('ONLINE')
-                    console.log(`Just to make sure they are pointed accurately:`, `This should be ONLINE: ${receiverProfileInfo.connectionState.getValue()}`, `Id match? ${receiverProfileInfo.id == clientObj.id ? true : false}`)
+                    console.log({
+                        message: `Just to make sure they are pointed accurately: This should be ONLINE: ${receiverProfileInfo.connectionState.getValue()} !! Id match? ${receiverProfileInfo.id == clientObj.id ? true : false}`,
+                    })
                 }
                 writeFile(data.message as ConnectedServerSocket, (data.message as ConnectedServerSocket).id).then(() => {
                     // broadcast event to allow retransmission to release buffer
@@ -157,7 +169,7 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
                 }).catch((error) => { }) // do nothing at the moment. 
             }
             if (data.name == 'Error') {
-                console.log(`Server cannot find credentials`, data.message)
+                console.log({ message: `Server cannot find credentials`, details: data.message })
                 // logic to request for new credentials
                 setTimeout(() => {
                     socket.emit('profile', {
@@ -170,7 +182,7 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
 
         // Handle disconnection
         socket.on('disconnect', () => {
-            console.error(`Socket Server ${receiverProfileInfo.id} Disconnected`)
+            console.log({ message: `Socket Server ${receiverProfileInfo.id} Disconnected` })
             if (receiverProfileInfo) {
                 eventNotification.next({
                     id: uuidv4(),
@@ -189,7 +201,7 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
 // For SERVER Usage: set up socket listeners to start listening for different events
 export function handleNewSocketClient(socket: SocketForConnectedClient, connectedClientSocket: ConnectedClientSocket[]): Observable<TransportEvent> {
     return new Observable((event: Observer<TransportEvent>) => {
-        console.log(`Setting up listeners for socket:${socket.id}`)
+        console.log({ message: `Setting up listeners for socket:${socket.id}` })
         // returns the socket client instance 
         // listen to receiver's initiotion first before assigning 'credentials'
         socket.on(`profile`, (message: { name: string, data: any }) => {
@@ -229,23 +241,21 @@ export function handleNewSocketClient(socket: SocketForConnectedClient, connecte
                     checkIfClientExists(message.data.id).then((client: ConnectedClientSocket) => {
                         clientInstance = client
                         handleFoundClient(clientInstance)
-                    }).catch(error => console.error(error))
+                    }).catch(error => {
+                        console.log({ message: `Promise Error`, details: error })
+                    })
                 }
                 function handleFoundClient(clientInstance: ConnectedClientSocket | undefined) {
                     if (clientInstance) {
-                        console.log(`Socket Client ${clientInstance.id} Found`)
+                        console.log({ message: `Socket Client ${clientInstance.id} Found` })
                         socket.emit('profile', { name: 'Adjusted Profile', message: { id: clientInstance.id } })
                         // replace socket instance since the previous has been terminated
                         clientInstance.socketInstance = socket
-                        // some explanation here. For the case where the server reads from the DB, no need to terminate subject, since all instances would be destroyed alongside the server shut down. This case is specificd only when there's a need to read from local file
-                        if (!clientInstance.connectionState) {
-                            clientInstance.connectionState = new BehaviorSubject<ConnectionState>(`OFFLINE`)
-                        }
                         // need to start listening again, because it's assigned a different socket instance this time round
-                        startListening(socket, clientInstance, event)
+                        startListening(socket, clientInstance, event, true)
 
                     } else {
-                        console.log(`Profile Not Found`)
+                        console.log({ message: `Profile Not Found` })
                         socket.emit('profile', { name: 'Error', message: 'Receiver Profile Not found' })
                     }
                 }
@@ -261,10 +271,10 @@ export async function writeFile(data: ConnectedServerSocket, filename: string):
         // Write JSON data to a file
         fs.writeFile(`${filename}.json`, JSON.stringify(data, null, 2), (err) => {
             if (err) {
-                console.error('Error writing file', err);
+                console.log({ message: 'Error writing file', details: err })
                 reject(false)
             } else {
-                console.log('File has been written');
+                console.log({ message: 'File has been written', details: err })
                 resolve(true)
             }
         });
@@ -282,7 +292,7 @@ export function addClientToDB(entry: ConnectedClientSocket, filePath: string = '
             data = JSON.parse(fileContent);
         }
 
-        // Append the new object to the array
+        // Append the new details to the array
         data.push({
             id: entry.id,
             dateCreated: entry.dateCreated,
@@ -292,9 +302,9 @@ export function addClientToDB(entry: ConnectedClientSocket, filePath: string = '
 
         // Write the updated array back to the file
         fs.writeFileSync(filePath, JSON.stringify(data, null, 2), 'utf-8');
-        console.log(`Entry added successfully.`);
+        console.log({ message: `Entry added successfully.` })
     } catch (error) {
-        console.error('Error writing to file:', error);
+        console.log({ message: 'Error writing to file:', details: error })
     }
 }
 
@@ -303,7 +313,7 @@ export async function checkIfClientExists(id: string, filePath: string = 'client
         try {
             // Check if the file exists
             if (!fs.existsSync(filePath)) {
-                console.log("File does not exist.");
+                console.log({ message: "File does not exist." })
                 reject('File does not exist');
             }
 
@@ -311,18 +321,16 @@ export async function checkIfClientExists(id: string, filePath: string = 'client
             const fileContent = fs.readFileSync(filePath, 'utf-8');
             const data: any[] = JSON.parse(fileContent);
 
-            // Check if an object with the given id exists
+            // Check if an details with the given id exists
             let obj = data.find(entry => entry.id === id);
 
             if (obj) {
-                console.log(`Client with ID ${id} exists.`);
+                console.log({ message: "Client with ID ${id} exists." })
             } else {
-                console.log(`Client with ID ${id} does not exist.`);
+                console.log({ message: `Client with ID ${id} does not exist.` })
             }
-
             resolve(obj);
         } catch (error) {
-            console.error('Error reading the file:', error);
             reject(`Error reading the file`)
         }
     })
@@ -349,34 +357,39 @@ export async function checkOwnClientInfo(filename?: string): Promise<{ id: strin
 
             } catch (err) {
                 // Handle parsing errors or other file-related errors
-                console.error("Error reading or parsing file:", err);
+                console.log({ message: "Error reading or parsing file:", details: err })
                 reject('');
             }
         } else {
-            console.error("File does not exist");
+            console.log({ message: "File does not exist" })
             reject('');
         }
     })
 }
 
 // this is for server usage only
-export function startListening(socket: SocketForConnectedClient, client: ConnectedClientSocket, eventListener: Observer<TransportEvent>): void {
+export function startListening(socket: SocketForConnectedClient, client: ConnectedClientSocket, eventListener: Observer<TransportEvent>, oldClient?: boolean): void {
     // notify it's associated retransmission to start releaseing buffer
     eventListener.next({
         id: uuidv4(),
-        event: `Client Connected`,
+        event: oldClient ? 'Client Re-connected' : `Client Connected`,
         data: {
             clientId: client.id,
-            message: `Socket Client Connected. Adapter ID assigned: ${client.id}`,
+            message: `Socket Client ${oldClient ? `Re-Connected` : `Connected`}. Adapter ID assigned: ${client.id}`,
             payload: client
         } as EventMessage
     })
     // Resume operation
-    if (client.connectionState.getValue() == 'OFFLINE') {
+    // some explanation here. For the case where the server reads from the DB, no need to terminate subject, since all instances would be destroyed alongside the server shut down. This case is specificd only when there's a need to read from local file
+    if (!client.connectionState) {
+        client.connectionState = new BehaviorSubject<ConnectionState>(`ONLINE`)
+    } else {
         client.connectionState.next(`ONLINE`)
     }
+
     /* Generally, we don't need this unless in the case of being the receiver */
     socket.on('message', (message: any) => {
+        console.log({ message: `Message from client ${client.id}`, details: message })
         eventListener.next({
             id: uuidv4(),
             event: 'New Message',

+ 3 - 3
tsconfig.json

@@ -47,8 +47,8 @@
     // "declaration": true,                              /* Generate .d.ts files from TypeScript and JavaScript files in your project. */
     // "declarationMap": true,                           /* Create sourcemaps for d.ts files. */
     // "emitDeclarationOnly": true,                      /* Only output d.ts files and not JavaScript files. */
-    // "sourceMap": true,                                /* Create source map files for emitted JavaScript files. */
-    // "inlineSourceMap": true,                          /* Include sourcemap files inside the emitted JavaScript. */
+    "sourceMap": true,                                /* Create source map files for emitted JavaScript files. */
+    "inlineSourceMap": false,                          /* Include sourcemap files inside the emitted JavaScript. */
     // "outFile": "./",                                  /* Specify a file that bundles all outputs into one JavaScript file. If 'declaration' is true, also designates a file that bundles all .d.ts output. */
     "outDir": "./dist", /* Specify an output folder for all emitted files. */
     // "removeComments": true,                           /* Disable emitting comments. */
@@ -58,7 +58,7 @@
     // "downlevelIteration": true,                       /* Emit more compliant, but verbose and less performant JavaScript for iteration. */
     // "sourceRoot": "",                                 /* Specify the root path for debuggers to find the reference source code. */
     // "mapRoot": "",                                    /* Specify the location where debugger should locate map files instead of generated locations. */
-    // "inlineSources": true,                            /* Include source code in the sourcemaps inside the emitted JavaScript. */
+    "inlineSources": true,                            /* Include source code in the sourcemaps inside the emitted JavaScript. */
     // "emitBOM": true,                                  /* Emit a UTF-8 Byte Order Mark (BOM) in the beginning of output files. */
     // "newLine": "crlf",                                /* Set the newline character for emitting files. */
     // "stripInternal": true,                            /* Disable emitting declarations that have '@internal' in their JSDoc comments. */