import { Observable, Observer, Subject } from 'rxjs'; import { createServer } from 'http'; import { Server, Socket as SocketForConnectedClient } from 'socket.io'; import { io, Socket as ClientSocket } from 'socket.io-client'; import * as fs from 'fs' import { ReceiverProfile, TransportEventNotification, TransportMessage } from '../interface/ITransport.interface'; import { v4 as uuidv4 } from 'uuid' export function startSocketServer(port: number): Observable { return new Observable((observer) => { try { let httpServer = createServer(); let socketServer = new Server(httpServer) // something wrong here socketServer.on('connection', (socket) => { observer.next(socket) }) socketServer.engine.on("connection_error", (err) => { console.log(err.req); // the request object console.log(err.code); // the error code, for example 1 console.log(err.message); // the error message, for example "Session ID unknown" console.log(err.context); // some additional error context }); // Start the socketServer httpServer.listen(port) } catch (error) { observer.error(error) } }) } export async function startClientSocketConnection(serverUrl: string): Promise { return new Promise((resolve, reject) => { try { let clientSocket = io(serverUrl) // let clientSocket = io(serverUrl, { // reconnection: true, // Enable automatic reconnections // reconnectionAttempts: 100, // Retry up to 10 times // reconnectionDelay: 500, // Start with a 500ms delay // reconnectionDelayMax: 10000, // Delay can grow to a max of 10 seconds // randomizationFactor: 0.3, // }) // Listen for a connection event clientSocket.on('connect', () => { console.log('Connected to the server:', clientSocket.id) resolve(clientSocket) }); } catch (error) { reject(error) } }) } // Specifically to write receiver profile information export async function writeFile(data: ReceiverProfile, filename: string): Promise { return new Promise((resolve, reject) => { // Write JSON data to a file fs.writeFile(`${filename}.json`, JSON.stringify(data, null, 2), (err) => { if (err) { console.error('Error writing file', err); reject(false) } else { console.log('File has been written'); resolve(true) } }); }) } // After establishing connection to the server, set up the credentials, confirm whether or not if there's any credentials, if not ask for one from the server export function handleClientSocketConnection(socket: ClientSocket, incomingMessage: Subject): Observable { return new Observable((eventNotification: Observer) => { let buffer: any[] = [] let receiverProfileInfo!: ReceiverProfile checkOwnClientInfo('client1').then((profile: ReceiverProfile) => { receiverProfileInfo = profile socket.emit('profile', { name: 'Old Client', data: profile }) }).catch((error) => { socket.emit('profile', { name: 'New Client', data: null }) }) // Listen for messages from the server. Generally here's the responses socket.on('message', (msg: any) => { console.log(`Websocket Client Transport Receieve Msg`, msg.id) if (receiverProfileInfo) { eventNotification.next({ event: 'New Message', description: 'Received new message', transportType: 'WEBSOCKET', data: msg }) incomingMessage.next({ id: msg.header.MessageID, receiverID: receiverProfileInfo.uuid, payload: msg, event: 'New Message' }) } else { // Do nothing. just store in local array first. Cannot process without information. but then again, don['t need information if acting as client // but for consistency sake, will impose the standard buffer.push(msg) // store locally for now } }) socket.on('profile', (data: { name: string, message: any }) => { // console.log(data) if (data.name == 'New Profile') { console.log(`Assigned client Name: ${(data.message as ReceiverProfile).name}`) receiverProfileInfo = data.message as ReceiverProfile writeFile(data.message as ReceiverProfile, (data.message as ReceiverProfile).name).then(() => [ // broadcast event to allow retransmission to release buffer eventNotification.next({ event: 'Connection', description: 'Profile acquired || updated and stored', transportType: 'WEBSOCKET', }) ]).catch((error) => { }) // do nothing at the moment. } if (data.name == 'Adjusted Profile') { console.log(`Assigned client Name: ${(data.message as ReceiverProfile).name}`) receiverProfileInfo = data.message as ReceiverProfile writeFile(data.message as ReceiverProfile, (data.message as ReceiverProfile).name).then(() => [ // broadcast event to allow retransmission to release buffer eventNotification.next({ event: 'Connection', description: 'Profile acquired || updated and stored', transportType: 'WEBSOCKET', }) ]).catch((error) => { }) // do nothing at the moment. } if (data.name == 'Error') { console.log(`Server cannot find credentials`, data.message) // logic to request for new credentials } }) // Handle disconnection socket.on('disconnect', () => { console.log('Websocket Client disconnected from the server'); if (receiverProfileInfo) { eventNotification.next({ event: 'Disconnection', description: 'Disconnected from the server', transportType: 'WEBSOCKET' }) } }); }) } // Check if filename exists. Return profile information if there's any export async function checkOwnClientInfo(filename: string): Promise { return new Promise((resolve, reject) => { // Check if the file exists if (fs.existsSync(`${filename}.json`)) { try { // Read the file contents const fileData = fs.readFileSync(`${filename}.json`, 'utf8'); // If the file is empty, return an error if (fileData.trim() === "") { throw new Error("File is empty"); } // Parse and return the data if present const jsonData = JSON.parse(fileData); resolve(jsonData) } catch (err) { // Handle parsing errors or other file-related errors console.error("Error reading or parsing file:", err); reject(''); } } else { console.error("File does not exist"); reject(''); } }) } // For SERVER Usage: set up socket listeners to start listening for different events export function handleNewSocketClient(socket: SocketForConnectedClient, eventNotification: Subject, socketReceiverProfile: ReceiverProfile[]): void { console.log(`Setting up listeners for socket:${socket.id}`) // returns the socket client instance // listen to receiver's initiotion first before assigning 'credentials' socket.on(`profile`, (message: { name: string, data: ReceiverProfile }) => { if (message.name == 'New Client') { let receiverProfile: ReceiverProfile = { uuid: uuidv4(), name: `Client${uuidv4()}`, dateCreated: new Date(), transportType: `WEBSOCKET`, eventNotification: new Subject(), instance: socket } // publish first event notification eventNotification.next({ event: 'Connection', description: 'New Client Connected', transportType: 'WEBSOCKET', data: { receiverID: receiverProfile.uuid, receiverName: receiverProfile.name, date: new Date(), payload: receiverProfile } }) // send to receiver for reference socket.emit('profile', { name: `New Profile`, message: { uuid: receiverProfile.uuid, name: receiverProfile.name, dateCreated: receiverProfile.dateCreated, transportType: `WEBSOCKET`, eventNotification: null, instance: null // have to put null, otherwise circular reference maximum stack error } }) socketReceiverProfile.push(receiverProfile) startListening(socket, receiverProfile) } else { // update first let receiverProfile: ReceiverProfile | undefined = socketReceiverProfile.find(obj => obj.uuid === message.data.uuid) if (receiverProfile) { receiverProfile.instance = socket socket.emit('profile', { name: 'Adjusted Profile', message: receiverProfile }) // need to start listening again, because it's assigned a different socket instance this time round startListening(socket, receiverProfile) } else { socket.emit('profile', { name: 'Error', message: 'Receiver Profile Not found' }) } } }) } export function startListening(socket: SocketForConnectedClient, receiverProfile: ReceiverProfile): void { /* Generally, we don't need this unless in the case of being the receiver */ socket.on('message', (message: any) => { // here }) socket.on('request', (request: any) => { // here : Let's say there's a subcsription request here receiverProfile.eventNotification.next({ event: 'New Message', description: 'Incoming request', transportType: 'WEBSOCKET', data: { receiverID: receiverProfile.uuid, receiverName: receiverProfile.name, date: new Date(), payload: request } }) }) socket.on('notification', (notification: any) => { // logic here }) socket.on('disconnect', () => { receiverProfile.eventNotification.next( { event: 'Disconnection', description: `Existing Client ${socket.id} disonnected`, transportType: `WEBSOCKET`, data: { receiverID: receiverProfile.uuid, receiverName: receiverProfile.name, date: new Date(), } } ) }) }