123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421 |
- import { BehaviorSubject, Observable, Observer, Subject } from 'rxjs';
- import { createServer } from 'http';
- import { Server, Socket as SocketForConnectedClient } from 'socket.io';
- import { io, Socket as ClientSocket } from 'socket.io-client';
- import * as fs from 'fs'
- import { v4 as uuidv4 } from 'uuid'
- import { ConnectionState, Transport, TransportEvent, TransportMessage } from '../interface/connector.interface';
- import { ConnectedClientSocket, ConnectedServerSocket } from '../transport/websocket';
- import { EventMessage } from '../interface/transport.interface';
- import ConsoleLogger from './log.utils';
- const console: ConsoleLogger = new ConsoleLogger(`SocketUtils`, ['transport'])
- export function startSocketServer(port: number): Observable<SocketForConnectedClient> {
- return new Observable((observer) => {
- try {
- console.log({ message: `Socket Server ${port} Started....` })
- let httpServer = createServer();
- let socketServer = new Server(httpServer)
- // something wrong here
- socketServer.on('connection', (socket) => {
- observer.next(socket)
- })
- socketServer.engine.on("connection_error", (err) => {
- console.log({ message: `Socket Server ${port} Connection Error`, details: err.req })
- console.log({ message: `Socket Server ${port} Connection Error`, details: err.code })
- console.log({ message: `Socket Server ${port} Connection Error`, details: err.message })
- console.log({ message: `Socket Server ${port} Connection Error`, details: err.context })
- });
- // Start the HTTP server on 127.0.0.1 with the given port
- httpServer.listen(port, '0.0.0.0', () => {
- console.log({ message: `Socket server listening on ${port}` });
- });
- } catch (error) {
- observer.error(error);
- }
- })
- }
- export async function startClientSocketConnection(serverUrl: string): Promise<ClientSocket> {
- return new Promise((resolve, reject) => {
- try {
- // let clientSocket = io(serverUrl)
- let clientSocket: ClientSocket = io(serverUrl, {
- reconnection: true, // Enable automatic reconnections
- reconnectionAttempts: 1000, // Retry up to 10 times
- reconnectionDelay: 500, // Start with a 500ms delay
- reconnectionDelayMax: 10000, // Delay can grow to a max of 10 seconds
- randomizationFactor: 0.3,
- })
- resolve(clientSocket)
- }
- catch (error) {
- reject(error)
- }
- })
- }
- // After establishing connection to the server, set up the credentials, confirm whether or not if there's any credentials, if not ask for one from the server
- export function handleClientSocketConnection(socket: ClientSocket, serversConnected: ConnectedServerSocket[]): Observable<TransportEvent> {
- return new Observable((eventNotification: Observer<TransportEvent>) => {
- let buffer: any[] = []
- let receiverProfileInfo!: ConnectedServerSocket
- // Listen for a connection event
- socket.on('connect', () => {
- console.log({ message: `Connected to the server ${socket.id} ` })
- if (receiverProfileInfo?.id) {
- checkOwnClientInfo(receiverProfileInfo.id).then((profile: { id: string }) => {
- socket.emit('profile', {
- name: 'Old Client',
- data: profile
- })
- }).catch((error) => {
- socket.emit('profile', {
- name: 'New Client',
- data: null
- })
- })
- } else {
- socket.emit('profile', {
- name: 'New Client',
- data: null
- })
- }
- });
- // Listen for messages from the server. Generally here's the responses
- socket.on('message', (msg: any) => {
- // console.log(`Websocket Client Transport Receieve Msg`, msg)
- if (receiverProfileInfo) {
- // publish to event
- eventNotification.next({
- id: uuidv4(),
- event: 'New Message',
- data: {
- id: uuidv4(),
- dateCreated: new Date(),
- transport: Transport.Websocket,
- target: receiverProfileInfo.id,
- payload: msg
- } as TransportMessage
- })
- } else {
- // Do nothing. just store in local array first. Cannot process without information. but then again, don['t need information if acting as client
- // but for consistency sake, will impose the standard
- buffer.push(msg) // store locally for now
- }
- })
- socket.on('profile', (data: { name: string, message: any }) => {
- if (data.name == 'New Profile') {
- console.log({ message: `Assigned client Name: ${data.message.id}` })
- // Update websocket instance record
- receiverProfileInfo = {
- id: data.message.id,
- dateCreated: new Date(),
- socketInstance: socket,
- connectionState: new BehaviorSubject<ConnectionState>(`ONLINE`)
- }
- writeFile(data.message as ConnectedServerSocket, (data.message as ConnectedServerSocket).id).then(() => {
- // broadcast event to allow transmission manager to instantiate transmission components
- eventNotification.next({
- id: uuidv4(),
- event: `New Server`,
- data: {
- clientId: (data.message as ConnectedServerSocket).id,
- message: `New Websocket Channel ${(data.message as ConnectedServerSocket).id} established.`
- } as EventMessage
- })
- // broadcast event to allow retransmission to relase buffered messages
- eventNotification.next({
- id: uuidv4(),
- event: `Server Connected`,
- data: {
- clientId: (data.message as ConnectedServerSocket).id,
- message: `Server ${(data.message as ConnectedServerSocket).id} connected and ready to go.`
- } as EventMessage
- })
- }).catch((error) => { }) // do nothing at the moment.
- serversConnected.push(receiverProfileInfo)
- }
- if (data.name == 'Adjusted Profile') {
- console.log({ message: `Adjusted client Name: ${(data.message as ConnectedServerSocket).id}` })
- // Update websocket instance record
- let clientObj: ConnectedServerSocket | undefined = serversConnected.find(obj => obj.id === data.message.id)
- if (clientObj) {
- receiverProfileInfo.id = (data.message.id)
- clientObj.id = receiverProfileInfo.id
- clientObj.socketInstance = socket
- clientObj.connectionState.next('ONLINE')
- console.log({
- message: `Just to make sure they are pointed accurately: This should be ONLINE: ${receiverProfileInfo.connectionState.getValue()} !! Id match? ${receiverProfileInfo.id == clientObj.id ? true : false}`,
- })
- }
- writeFile(data.message as ConnectedServerSocket, (data.message as ConnectedServerSocket).id).then(() => {
- // broadcast event to allow retransmission to release buffer
- eventNotification.next({
- id: uuidv4(),
- event: 'Server Connected',
- data: {
- clientId: (data.message as ConnectedServerSocket).id,
- message: `Existing Websocket Channel ${(data.message as ConnectedServerSocket).id} re-established.`
- } as EventMessage
- })
- }).catch((error) => { }) // do nothing at the moment.
- }
- if (data.name == 'Error') {
- console.log({ message: `Server cannot find credentials`, details: data.message })
- // logic to request for new credentials
- setTimeout(() => {
- socket.emit('profile', {
- name: 'New Client',
- data: null
- })
- }, 2000)
- }
- })
- // Handle disconnection
- socket.on('disconnect', () => {
- console.log({ message: `Socket Server ${receiverProfileInfo.id} Disconnected` })
- if (receiverProfileInfo) {
- eventNotification.next({
- id: uuidv4(),
- event: `Server Disconnected`,
- data: {
- clientId: receiverProfileInfo.id,
- message: `Server for Channel ${receiverProfileInfo.id} disconnected.`
- } as EventMessage
- })
- receiverProfileInfo.connectionState.next(`OFFLINE`)
- }
- });
- })
- }
- // For SERVER Usage: set up socket listeners to start listening for different events
- export function handleNewSocketClient(socket: SocketForConnectedClient, connectedClientSocket: ConnectedClientSocket[]): Observable<TransportEvent> {
- return new Observable((event: Observer<TransportEvent>) => {
- console.log({ message: `Setting up listeners for socket:${socket.id}` })
- // returns the socket client instance
- // listen to receiver's initiotion first before assigning 'credentials'
- socket.on(`profile`, (message: { name: string, data: any }) => {
- if (message.name == 'New Client') {
- let clientInstance: ConnectedClientSocket = {
- id: uuidv4(), // client should only be assigned at this level. And is passed around for reference pointing
- dateCreated: new Date(),
- socketInstance: socket,
- connectionState: new BehaviorSubject<ConnectionState>(`OFFLINE`)
- }
- // send to receiver for reference
- socket.emit('profile', {
- name: `New Profile`, message: { id: clientInstance.id }
- })
- // publish first event notification
- event.next({
- id: uuidv4(),
- event: `New Client`,
- data: {
- clientId: clientInstance.id,
- message: `New Socket Client Connected. Adapter ID assigned: ${clientInstance.id}`,
- payload: clientInstance
- } as EventMessage
- })
- // Update connected clientInstance info to adapter
- connectedClientSocket.push(clientInstance)
- addClientToDB(clientInstance)
- startListening(socket, clientInstance, event)
- } else {
- // update first
- let clientInstance: ConnectedClientSocket | undefined
- if (connectedClientSocket.length > 0) {
- clientInstance = connectedClientSocket.find(obj => obj.id === message.data.id)
- handleFoundClient(clientInstance)
- } else {
- // for the case server itself got shit down or something
- checkIfClientExists(message.data.id).then((client: ConnectedClientSocket) => {
- clientInstance = client
- handleFoundClient(clientInstance)
- }).catch(error => {
- console.log({ message: `Promise Error`, details: error })
- })
- }
- function handleFoundClient(clientInstance: ConnectedClientSocket | undefined) {
- if (clientInstance) {
- console.log({ message: `Socket Client ${clientInstance.id} Found` })
- socket.emit('profile', { name: 'Adjusted Profile', message: { id: clientInstance.id } })
- // replace socket instance since the previous has been terminated
- clientInstance.socketInstance = socket
- // need to start listening again, because it's assigned a different socket instance this time round
- startListening(socket, clientInstance, event, true)
- } else {
- console.log({ message: `Profile Not Found` })
- socket.emit('profile', { name: 'Error', message: 'Receiver Profile Not found' })
- }
- }
- }
- })
- })
- }
- // Specifically to write receiver profile information
- export async function writeFile(data: ConnectedServerSocket, filename: string): Promise<boolean> {
- return new Promise((resolve, reject) => {
- // Write JSON data to a file
- fs.writeFile(`${filename}.json`, JSON.stringify(data, null, 2), (err) => {
- if (err) {
- console.log({ message: 'Error writing file', details: err })
- reject(false)
- } else {
- console.log({ message: 'File has been written', details: err })
- resolve(true)
- }
- });
- })
- }
- /* For Internal Usage only. Temporary serve as a way for server to keep track of clients. To be replaced in the future with better alternatives. */
- export function addClientToDB(entry: ConnectedClientSocket, filePath: string = 'clients.json'): void {
- try {
- let data: ConnectedClientSocket[] = [];
- // Check if the file exists and load existing data
- if (fs.existsSync(filePath)) {
- const fileContent = fs.readFileSync(filePath, 'utf-8');
- data = JSON.parse(fileContent);
- }
- // Append the new details to the array
- data.push({
- id: entry.id,
- dateCreated: entry.dateCreated,
- connectionState: null,
- socketInstance: null
- } as unknown as ConnectedClientSocket);
- // Write the updated array back to the file
- fs.writeFileSync(filePath, JSON.stringify(data, null, 2), 'utf-8');
- console.log({ message: `Entry added successfully.` })
- } catch (error) {
- console.log({ message: 'Error writing to file:', details: error })
- }
- }
- export async function checkIfClientExists(id: string, filePath: string = 'clients.json'): Promise<ConnectedClientSocket> {
- return new Promise((resolve, reject) => {
- try {
- // Check if the file exists
- if (!fs.existsSync(filePath)) {
- console.log({ message: "File does not exist." })
- reject('File does not exist');
- }
- // Read and parse the data
- const fileContent = fs.readFileSync(filePath, 'utf-8');
- const data: any[] = JSON.parse(fileContent);
- // Check if an details with the given id exists
- let obj = data.find(entry => entry.id === id);
- if (obj) {
- console.log({ message: "Client with ID ${id} exists." })
- } else {
- console.log({ message: `Client with ID ${id} does not exist.` })
- }
- resolve(obj);
- } catch (error) {
- reject(`Error reading the file`)
- }
- })
- }
- // Check if filename exists. Return profile information if there's any
- export async function checkOwnClientInfo(filename?: string): Promise<{ id: string }> {
- return new Promise((resolve, reject) => {
- // Check if the file exists
- if (fs.existsSync(`${filename}.json`)) {
- try {
- // Read the file contents
- const fileData = fs.readFileSync(`${filename}.json`, 'utf8');
- // If the file is empty, return an error
- if (fileData.trim() === "") {
- throw new Error("File is empty");
- }
- // Parse and return the data if present
- const jsonData = JSON.parse(fileData);
- resolve(jsonData)
- } catch (err) {
- // Handle parsing errors or other file-related errors
- console.log({ message: "Error reading or parsing file:", details: err })
- reject('');
- }
- } else {
- console.log({ message: "File does not exist" })
- reject('');
- }
- })
- }
- // this is for server usage only
- export function startListening(socket: SocketForConnectedClient, client: ConnectedClientSocket, eventListener: Observer<TransportEvent>, oldClient?: boolean): void {
- // notify it's associated retransmission to start releaseing buffer
- eventListener.next({
- id: uuidv4(),
- event: oldClient ? 'Client Re-connected' : `Client Connected`,
- data: {
- clientId: client.id,
- message: `Socket Client ${oldClient ? `Re-Connected` : `Connected`}. Adapter ID assigned: ${client.id}`,
- payload: client
- } as EventMessage
- })
- // Resume operation
- // some explanation here. For the case where the server reads from the DB, no need to terminate subject, since all instances would be destroyed alongside the server shut down. This case is specificd only when there's a need to read from local file
- if (!client.connectionState) {
- client.connectionState = new BehaviorSubject<ConnectionState>(`ONLINE`)
- } else {
- client.connectionState.next(`ONLINE`)
- }
- /* Generally, we don't need this unless in the case of being the receiver */
- socket.on('message', (message: any) => {
- console.log({ message: `Message from client ${client.id}`, details: message })
- eventListener.next({
- id: uuidv4(),
- event: 'New Message',
- data: {
- id: uuidv4(),
- dateCreated: new Date(),
- transport: Transport.Websocket,
- target: client.id, // this ref to be associated with the client/channel
- payload: message
- } as TransportMessage
- })
- })
- socket.on('disconnect', () => {
- eventListener.next({
- id: uuidv4(),
- event: 'Client Disconnected',
- data: {
- clientId: client.id,
- message: '',
- payload: {
- time: new Date()
- }
- } as EventMessage
- })
- eventListener.error(`Client ${client.id} disconnected. Terminating this observable event for this client socket...`)
- eventListener.complete()
- })
- }
|