import { BehaviorSubject, Observable, Observer, Subject } from 'rxjs'; import { createServer } from 'http'; import { Server, Socket as SocketForConnectedClient } from 'socket.io'; import { io, Socket as ClientSocket } from 'socket.io-client'; import * as fs from 'fs' import { v4 as uuidv4 } from 'uuid' import { ConnectionState, Transport, TransportEvent, TransportMessage } from '../interface/connector.interface'; import { ConnectedClientSocket, ConnectedServerSocket } from '../transport/websocket'; import { EventMessage } from '../interface/transport.interface'; import ConsoleLogger from './log.utils'; const console: ConsoleLogger = new ConsoleLogger(`SocketUtils`, ['transport']) export function startSocketServer(port: number): Observable { return new Observable((observer) => { try { console.log({ message: `Socket Server ${port} Started....` }) let httpServer = createServer(); let socketServer = new Server(httpServer) // something wrong here socketServer.on('connection', (socket) => { observer.next(socket) }) socketServer.engine.on("connection_error", (err) => { console.log({ message: `Socket Server ${port} Connection Error`, details: err.req }) console.log({ message: `Socket Server ${port} Connection Error`, details: err.code }) console.log({ message: `Socket Server ${port} Connection Error`, details: err.message }) console.log({ message: `Socket Server ${port} Connection Error`, details: err.context }) }); // Start the HTTP server on 127.0.0.1 with the given port httpServer.listen(port, '0.0.0.0', () => { console.log({ message: `Socket server listening on ${port}` }); }); } catch (error) { observer.error(error); } }) } export async function startClientSocketConnection(serverUrl: string): Promise { return new Promise((resolve, reject) => { try { // let clientSocket = io(serverUrl) let clientSocket: ClientSocket = io(serverUrl, { reconnection: true, // Enable automatic reconnections reconnectionAttempts: 1000, // Retry up to 10 times reconnectionDelay: 500, // Start with a 500ms delay reconnectionDelayMax: 10000, // Delay can grow to a max of 10 seconds randomizationFactor: 0.3, }) resolve(clientSocket) } catch (error) { reject(error) } }) } // After establishing connection to the server, set up the credentials, confirm whether or not if there's any credentials, if not ask for one from the server export function handleClientSocketConnection(socket: ClientSocket, serversConnected: ConnectedServerSocket[]): Observable { return new Observable((eventNotification: Observer) => { let buffer: any[] = [] let receiverProfileInfo!: ConnectedServerSocket // Listen for a connection event socket.on('connect', () => { console.log({ message: `Connected to the server ${socket.id} ` }) if (receiverProfileInfo?.id) { checkOwnClientInfo(receiverProfileInfo.id).then((profile: { id: string }) => { socket.emit('profile', { name: 'Old Client', data: profile }) }).catch((error) => { socket.emit('profile', { name: 'New Client', data: null }) }) } else { socket.emit('profile', { name: 'New Client', data: null }) } }); // Listen for messages from the server. Generally here's the responses socket.on('message', (msg: any) => { // console.log(`Websocket Client Transport Receieve Msg`, msg) if (receiverProfileInfo) { // publish to event eventNotification.next({ id: uuidv4(), event: 'New Message', data: { id: uuidv4(), dateCreated: new Date(), transport: Transport.Websocket, target: receiverProfileInfo.id, payload: msg } as TransportMessage }) } else { // Do nothing. just store in local array first. Cannot process without information. but then again, don['t need information if acting as client // but for consistency sake, will impose the standard buffer.push(msg) // store locally for now } }) socket.on('profile', (data: { name: string, message: any }) => { if (data.name == 'New Profile') { console.log({ message: `Assigned client Name: ${data.message.id}` }) // Update websocket instance record receiverProfileInfo = { id: data.message.id, dateCreated: new Date(), socketInstance: socket, connectionState: new BehaviorSubject(`ONLINE`) } writeFile(data.message as ConnectedServerSocket, (data.message as ConnectedServerSocket).id).then(() => { // broadcast event to allow transmission manager to instantiate transmission components eventNotification.next({ id: uuidv4(), event: `New Server`, data: { clientId: (data.message as ConnectedServerSocket).id, message: `New Websocket Channel ${(data.message as ConnectedServerSocket).id} established.` } as EventMessage }) // broadcast event to allow retransmission to relase buffered messages eventNotification.next({ id: uuidv4(), event: `Server Connected`, data: { clientId: (data.message as ConnectedServerSocket).id, message: `Server ${(data.message as ConnectedServerSocket).id} connected and ready to go.` } as EventMessage }) }).catch((error) => { }) // do nothing at the moment. serversConnected.push(receiverProfileInfo) } if (data.name == 'Adjusted Profile') { console.log({ message: `Adjusted client Name: ${(data.message as ConnectedServerSocket).id}` }) // Update websocket instance record let clientObj: ConnectedServerSocket | undefined = serversConnected.find(obj => obj.id === data.message.id) if (clientObj) { receiverProfileInfo.id = (data.message.id) clientObj.id = receiverProfileInfo.id clientObj.socketInstance = socket clientObj.connectionState.next('ONLINE') console.log({ message: `Just to make sure they are pointed accurately: This should be ONLINE: ${receiverProfileInfo.connectionState.getValue()} !! Id match? ${receiverProfileInfo.id == clientObj.id ? true : false}`, }) } writeFile(data.message as ConnectedServerSocket, (data.message as ConnectedServerSocket).id).then(() => { // broadcast event to allow retransmission to release buffer eventNotification.next({ id: uuidv4(), event: 'Server Connected', data: { clientId: (data.message as ConnectedServerSocket).id, message: `Existing Websocket Channel ${(data.message as ConnectedServerSocket).id} re-established.` } as EventMessage }) }).catch((error) => { }) // do nothing at the moment. } if (data.name == 'Error') { console.log({ message: `Server cannot find credentials`, details: data.message }) // logic to request for new credentials setTimeout(() => { socket.emit('profile', { name: 'New Client', data: null }) }, 2000) } }) // Handle disconnection socket.on('disconnect', () => { console.log({ message: `Socket Server ${receiverProfileInfo.id} Disconnected` }) if (receiverProfileInfo) { eventNotification.next({ id: uuidv4(), event: `Server Disconnected`, data: { clientId: receiverProfileInfo.id, message: `Server for Channel ${receiverProfileInfo.id} disconnected.` } as EventMessage }) receiverProfileInfo.connectionState.next(`OFFLINE`) } }); }) } // For SERVER Usage: set up socket listeners to start listening for different events export function handleNewSocketClient(socket: SocketForConnectedClient, connectedClientSocket: ConnectedClientSocket[]): Observable { return new Observable((event: Observer) => { console.log({ message: `Setting up listeners for socket:${socket.id}` }) // returns the socket client instance // listen to receiver's initiotion first before assigning 'credentials' socket.on(`profile`, (message: { name: string, data: any }) => { if (message.name == 'New Client') { let clientInstance: ConnectedClientSocket = { id: uuidv4(), // client should only be assigned at this level. And is passed around for reference pointing dateCreated: new Date(), socketInstance: socket, connectionState: new BehaviorSubject(`OFFLINE`) } // send to receiver for reference socket.emit('profile', { name: `New Profile`, message: { id: clientInstance.id } }) // publish first event notification event.next({ id: uuidv4(), event: `New Client`, data: { clientId: clientInstance.id, message: `New Socket Client Connected. Adapter ID assigned: ${clientInstance.id}`, payload: clientInstance } as EventMessage }) // Update connected clientInstance info to adapter connectedClientSocket.push(clientInstance) addClientToDB(clientInstance) startListening(socket, clientInstance, event) } else { // update first let clientInstance: ConnectedClientSocket | undefined if (connectedClientSocket.length > 0) { clientInstance = connectedClientSocket.find(obj => obj.id === message.data.id) handleFoundClient(clientInstance) } else { // for the case server itself got shit down or something checkIfClientExists(message.data.id).then((client: ConnectedClientSocket) => { clientInstance = client handleFoundClient(clientInstance) }).catch(error => { console.log({ message: `Promise Error`, details: error }) }) } function handleFoundClient(clientInstance: ConnectedClientSocket | undefined) { if (clientInstance) { console.log({ message: `Socket Client ${clientInstance.id} Found` }) socket.emit('profile', { name: 'Adjusted Profile', message: { id: clientInstance.id } }) // replace socket instance since the previous has been terminated clientInstance.socketInstance = socket // need to start listening again, because it's assigned a different socket instance this time round startListening(socket, clientInstance, event, true) } else { console.log({ message: `Profile Not Found` }) socket.emit('profile', { name: 'Error', message: 'Receiver Profile Not found' }) } } } }) }) } // Specifically to write receiver profile information export async function writeFile(data: ConnectedServerSocket, filename: string): Promise { return new Promise((resolve, reject) => { // Write JSON data to a file fs.writeFile(`${filename}.json`, JSON.stringify(data, null, 2), (err) => { if (err) { console.log({ message: 'Error writing file', details: err }) reject(false) } else { console.log({ message: 'File has been written', details: err }) resolve(true) } }); }) } /* For Internal Usage only. Temporary serve as a way for server to keep track of clients. To be replaced in the future with better alternatives. */ export function addClientToDB(entry: ConnectedClientSocket, filePath: string = 'clients.json'): void { try { let data: ConnectedClientSocket[] = []; // Check if the file exists and load existing data if (fs.existsSync(filePath)) { const fileContent = fs.readFileSync(filePath, 'utf-8'); data = JSON.parse(fileContent); } // Append the new details to the array data.push({ id: entry.id, dateCreated: entry.dateCreated, connectionState: null, socketInstance: null } as unknown as ConnectedClientSocket); // Write the updated array back to the file fs.writeFileSync(filePath, JSON.stringify(data, null, 2), 'utf-8'); console.log({ message: `Entry added successfully.` }) } catch (error) { console.log({ message: 'Error writing to file:', details: error }) } } export async function checkIfClientExists(id: string, filePath: string = 'clients.json'): Promise { return new Promise((resolve, reject) => { try { // Check if the file exists if (!fs.existsSync(filePath)) { console.log({ message: "File does not exist." }) reject('File does not exist'); } // Read and parse the data const fileContent = fs.readFileSync(filePath, 'utf-8'); const data: any[] = JSON.parse(fileContent); // Check if an details with the given id exists let obj = data.find(entry => entry.id === id); if (obj) { console.log({ message: "Client with ID ${id} exists." }) } else { console.log({ message: `Client with ID ${id} does not exist.` }) } resolve(obj); } catch (error) { reject(`Error reading the file`) } }) } // Check if filename exists. Return profile information if there's any export async function checkOwnClientInfo(filename?: string): Promise<{ id: string }> { return new Promise((resolve, reject) => { // Check if the file exists if (fs.existsSync(`${filename}.json`)) { try { // Read the file contents const fileData = fs.readFileSync(`${filename}.json`, 'utf8'); // If the file is empty, return an error if (fileData.trim() === "") { throw new Error("File is empty"); } // Parse and return the data if present const jsonData = JSON.parse(fileData); resolve(jsonData) } catch (err) { // Handle parsing errors or other file-related errors console.log({ message: "Error reading or parsing file:", details: err }) reject(''); } } else { console.log({ message: "File does not exist" }) reject(''); } }) } // this is for server usage only export function startListening(socket: SocketForConnectedClient, client: ConnectedClientSocket, eventListener: Observer, oldClient?: boolean): void { // notify it's associated retransmission to start releaseing buffer eventListener.next({ id: uuidv4(), event: oldClient ? 'Client Re-connected' : `Client Connected`, data: { clientId: client.id, message: `Socket Client ${oldClient ? `Re-Connected` : `Connected`}. Adapter ID assigned: ${client.id}`, payload: client } as EventMessage }) // Resume operation // some explanation here. For the case where the server reads from the DB, no need to terminate subject, since all instances would be destroyed alongside the server shut down. This case is specificd only when there's a need to read from local file if (!client.connectionState) { client.connectionState = new BehaviorSubject(`ONLINE`) } else { client.connectionState.next(`ONLINE`) } /* Generally, we don't need this unless in the case of being the receiver */ socket.on('message', (message: any) => { console.log({ message: `Message from client ${client.id}`, details: message }) eventListener.next({ id: uuidv4(), event: 'New Message', data: { id: uuidv4(), dateCreated: new Date(), transport: Transport.Websocket, target: client.id, // this ref to be associated with the client/channel payload: message } as TransportMessage }) }) socket.on('disconnect', () => { eventListener.next({ id: uuidv4(), event: 'Client Disconnected', data: { clientId: client.id, message: '', payload: { time: new Date() } } as EventMessage }) eventListener.error(`Client ${client.id} disconnected. Terminating this observable event for this client socket...`) eventListener.complete() }) }