socket.utils.ts 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421
  1. import { BehaviorSubject, Observable, Observer, Subject } from 'rxjs';
  2. import { createServer } from 'http';
  3. import { Server, Socket as SocketForConnectedClient } from 'socket.io';
  4. import { io, Socket as ClientSocket } from 'socket.io-client';
  5. import * as fs from 'fs'
  6. import { v4 as uuidv4 } from 'uuid'
  7. import { ConnectionState, Transport, TransportEvent, TransportMessage } from '../interface/connector.interface';
  8. import { ConnectedClientSocket, ConnectedServerSocket } from '../transport/websocket';
  9. import { EventMessage } from '../interface/transport.interface';
  10. import ConsoleLogger from './log.utils';
  // File-local logger. NOTE: this binding shadows the global `console` for every function in this module.
  const console = new ConsoleLogger('SocketUtils', ['transport']);
  /**
   * Starts a Socket.IO server on `port` and streams every accepted client socket.
   *
   * The underlying HTTP server binds to `0.0.0.0` (not loopback only). The server is
   * created lazily, once per subscription.
   *
   * @param port TCP port the HTTP server listens on.
   * @returns An Observable that emits one socket per incoming connection. It errors when
   *   the server cannot be created or when the HTTP server reports an error
   *   (for example the port is already in use). Unsubscribing closes the server.
   */
  export function startSocketServer(port: number): Observable<SocketForConnectedClient> {
    return new Observable<SocketForConnectedClient>((observer) => {
      let socketServer: Server | undefined
      try {
        console.log({ message: `Socket Server ${port} Started....` })
        const httpServer = createServer()
        socketServer = new Server(httpServer)

        socketServer.on('connection', (socket) => {
          observer.next(socket)
        })

        // Handshake-level failures: log every diagnostic field the engine provides.
        socketServer.engine.on('connection_error', (err) => {
          console.log({ message: `Socket Server ${port} Connection Error`, details: err.req })
          console.log({ message: `Socket Server ${port} Connection Error`, details: err.code })
          console.log({ message: `Socket Server ${port} Connection Error`, details: err.message })
          console.log({ message: `Socket Server ${port} Connection Error`, details: err.context })
        })

        // listen() reports failures (e.g. EADDRINUSE) asynchronously through the 'error'
        // event, which the surrounding try/catch can never see. Without this handler the
        // process would die with an unhandled 'error' event.
        httpServer.on('error', (error) => {
          console.log({ message: `Socket Server ${port} Error`, details: error })
          observer.error(error)
        })

        // Listen on every IPv4 interface with the given port.
        httpServer.listen(port, '0.0.0.0', () => {
          console.log({ message: `Socket server listening on ${port}` })
        })
      } catch (error) {
        observer.error(error)
      }

      // Teardown: release the port and drop connected clients when the subscription ends.
      return () => {
        void socketServer?.close()
      }
    })
  }
  /**
   * Creates a Socket.IO client socket for `serverUrl` with automatic reconnection enabled.
   *
   * The promise resolves as soon as the socket object exists. It does NOT wait for the
   * connection to be established, so callers can still attach their own `connect`
   * listener afterwards.
   *
   * @param serverUrl URL of the Socket.IO server.
   * @returns The newly created client socket.
   * @throws Rejects with whatever `io()` throws while constructing the socket.
   */
  export async function startClientSocketConnection(serverUrl: string): Promise<ClientSocket> {
    // An async function already converts a synchronous throw into a rejection,
    // so no explicit Promise constructor is needed.
    return io(serverUrl, {
      reconnection: true, // Enable automatic reconnections
      reconnectionAttempts: 1000, // Give up after 1000 failed attempts
      reconnectionDelay: 500, // Start with a 500ms delay
      reconnectionDelayMax: 10000, // Delay can grow to a max of 10 seconds
      randomizationFactor: 0.3,
    })
  }
  /**
   * CLIENT usage: runs the identity handshake on a client socket and exposes what
   * happens on it as a stream of transport events.
   *
   * Handshake, all carried over the `profile` event:
   * - on `connect`: announce `Old Client` (with the profile read back through
   *   `checkOwnClientInfo`) when an id is already known, otherwise `New Client`;
   * - `New Profile`: adopt the id, add the record to `serversConnected`, persist it with
   *   `writeFile`, then emit `New Server` followed by `Server Connected`;
   * - `Adjusted Profile`: mark the matching `serversConnected` entry ONLINE on this
   *   socket, persist, then emit `Server Connected`;
   * - `Error`: request a brand-new profile after 2 seconds.
   *
   * Incoming `message` events are emitted as `New Message`. Messages that arrive before
   * a profile is known are held back and released right after `Server Connected`.
   * A `disconnect` emits `Server Disconnected` and switches the connection state to OFFLINE.
   *
   * @param socket Client socket to attach the listeners to.
   * @param serversConnected Shared list of known servers; mutated in place.
   * @returns Observable of transport events. Unsubscribing removes the listeners
   *   registered here but leaves the socket itself open.
   */
  export function handleClientSocketConnection(socket: ClientSocket, serversConnected: ConnectedServerSocket[]): Observable<TransportEvent> {
    return new Observable((eventNotification: Observer<TransportEvent>) => {
      // Messages received before the server assigned us an identity.
      const pendingMessages: unknown[] = []
      let receiverProfileInfo: ConnectedServerSocket | undefined
      let retryTimer: ReturnType<typeof setTimeout> | undefined

      const requestNewProfile = (): void => {
        socket.emit('profile', {
          name: 'New Client',
          data: null
        })
      }

      const publishEvent = (event: TransportEvent['event'], clientId: string, message: string): void => {
        eventNotification.next({
          id: uuidv4(),
          event,
          data: { clientId, message } as EventMessage
        })
      }

      const publishMessage = (target: string, payload: unknown): void => {
        eventNotification.next({
          id: uuidv4(),
          event: 'New Message',
          data: {
            id: uuidv4(),
            dateCreated: new Date(),
            transport: Transport.Websocket,
            target,
            payload
          } as TransportMessage
        })
      }

      // Persist the server-issued profile, then run `onSaved`. Failures are logged
      // instead of being silently discarded; no event is emitted in that case.
      const persistProfile = (profile: ConnectedServerSocket, onSaved: () => void): void => {
        writeFile(profile, profile.id).then(onSaved).catch((error) => {
          console.log({ message: `Failed to persist profile ${profile.id} or to notify listeners`, details: error })
        })
      }

      const onConnect = (): void => {
        console.log({ message: `Connected to the server ${socket.id} ` })
        const knownId = receiverProfileInfo?.id
        if (!knownId) {
          requestNewProfile()
          return
        }
        // Reconnection: present the stored identity; fall back to a fresh one if it cannot be read.
        checkOwnClientInfo(knownId).then((profile: { id: string }) => {
          socket.emit('profile', {
            name: 'Old Client',
            data: profile
          })
        }).catch(() => {
          requestNewProfile()
        })
      }

      // Listen for messages from the server. Generally these are the responses.
      const onMessage = (msg: unknown): void => {
        if (receiverProfileInfo) {
          publishMessage(receiverProfileInfo.id, msg)
        } else {
          // Cannot attribute the message to a channel yet; keep it until a profile exists.
          pendingMessages.push(msg)
        }
      }

      const onProfile = (data: { name: string, message: any }): void => {
        switch (data.name) {
          case 'New Profile': {
            const assignedId: string = data.message.id
            console.log({ message: `Assigned client Name: ${assignedId}` })
            // Update websocket instance record
            const profile: ConnectedServerSocket = {
              id: assignedId,
              dateCreated: new Date(),
              socketInstance: socket,
              connectionState: new BehaviorSubject<ConnectionState>(`ONLINE`)
            }
            receiverProfileInfo = profile
            serversConnected.push(profile)
            persistProfile(data.message as ConnectedServerSocket, () => {
              // lets the transmission manager instantiate its components
              publishEvent(`New Server`, assignedId, `New Websocket Channel ${assignedId} established.`)
              // lets retransmission release buffered messages
              publishEvent(`Server Connected`, assignedId, `Server ${assignedId} connected and ready to go.`)
              // Release anything that arrived before the identity was known.
              for (const msg of pendingMessages.splice(0)) {
                publishMessage(assignedId, msg)
              }
            })
            break
          }
          case 'Adjusted Profile': {
            const adjustedId: string = data.message.id
            console.log({ message: `Adjusted client Name: ${adjustedId}` })
            // Update websocket instance record
            const clientObj = serversConnected.find(obj => obj.id === adjustedId)
            if (clientObj) {
              // Normally the local profile already exists (the server only answers this to an
              // 'Old Client' announcement); adopt the shared record if it does not.
              const current = receiverProfileInfo ?? clientObj
              receiverProfileInfo = current
              current.id = adjustedId
              clientObj.id = current.id
              clientObj.socketInstance = socket
              clientObj.connectionState.next('ONLINE')
              console.log({
                message: `Just to make sure they are pointed accurately: This should be ONLINE: ${current.connectionState.getValue()} !! Id match? ${current.id === clientObj.id}`,
              })
            }
            persistProfile(data.message as ConnectedServerSocket, () => {
              // lets retransmission release its buffer
              publishEvent('Server Connected', adjustedId, `Existing Websocket Channel ${adjustedId} re-established.`)
            })
            break
          }
          case 'Error': {
            console.log({ message: `Server cannot find credentials`, details: data.message })
            // The server does not know our identity: ask for new credentials shortly.
            retryTimer = setTimeout(requestNewProfile, 2000)
            break
          }
          default:
            break
        }
      }

      // Handle disconnection
      const onDisconnect = (): void => {
        const profile = receiverProfileInfo
        // The socket can drop before any profile was assigned, so the id may be absent.
        console.log({ message: `Socket Server ${profile?.id ?? 'unknown'} Disconnected` })
        if (profile) {
          publishEvent(`Server Disconnected`, profile.id, `Server for Channel ${profile.id} disconnected.`)
          profile.connectionState.next(`OFFLINE`)
        }
      }

      socket.on('connect', onConnect)
      socket.on('message', onMessage)
      socket.on('profile', onProfile)
      socket.on('disconnect', onDisconnect)

      // Teardown: detach only what was attached here; socket ownership stays with the caller.
      return () => {
        if (retryTimer !== undefined) {
          clearTimeout(retryTimer)
        }
        socket.off('connect', onConnect)
        socket.off('message', onMessage)
        socket.off('profile', onProfile)
        socket.off('disconnect', onDisconnect)
      }
    })
  }
  /**
   * SERVER usage: waits for a connected socket to announce itself on the `profile`
   * event and either registers it as a new client or resumes a known one.
   *
   * - `New Client`: a fresh id is generated and sent back as `New Profile`; the client is
   *   added to `connectedClientSocket`, stored with `addClientToDB`, and `startListening`
   *   is attached.
   * - anything else: the announced id is looked up in `connectedClientSocket` first and
   *   then in the persisted registry (`checkIfClientExists`). A match is answered with
   *   `Adjusted Profile` and resumed on this socket; otherwise `Error` is sent back so
   *   the client can request a new profile.
   *
   * @param socket Socket of the client that just connected.
   * @param connectedClientSocket Shared in-memory list of clients; mutated in place.
   * @returns Observable of transport events for this socket.
   */
  export function handleNewSocketClient(socket: SocketForConnectedClient, connectedClientSocket: ConnectedClientSocket[]): Observable<TransportEvent> {
    return new Observable((event: Observer<TransportEvent>) => {
      console.log({ message: `Setting up listeners for socket:${socket.id}` })

      const registerNewClient = (): void => {
        const clientInstance: ConnectedClientSocket = {
          id: uuidv4(), // client should only be assigned at this level. And is passed around for reference pointing
          dateCreated: new Date(),
          socketInstance: socket,
          connectionState: new BehaviorSubject<ConnectionState>(`OFFLINE`)
        }
        // send to receiver for reference
        socket.emit('profile', {
          name: `New Profile`, message: { id: clientInstance.id }
        })
        // publish first event notification
        event.next({
          id: uuidv4(),
          event: `New Client`,
          data: {
            clientId: clientInstance.id,
            message: `New Socket Client Connected. Adapter ID assigned: ${clientInstance.id}`,
            payload: clientInstance
          } as EventMessage
        })
        // Update connected clientInstance info to adapter
        connectedClientSocket.push(clientInstance)
        addClientToDB(clientInstance)
        startListening(socket, clientInstance, event)
      }

      // Resume a known client on this socket, or tell the peer its identity is unknown.
      const resumeOrReject = (clientInstance: ConnectedClientSocket | undefined): void => {
        if (!clientInstance) {
          console.log({ message: `Profile Not Found` })
          socket.emit('profile', { name: 'Error', message: 'Receiver Profile Not found' })
          return
        }
        console.log({ message: `Socket Client ${clientInstance.id} Found` })
        socket.emit('profile', { name: 'Adjusted Profile', message: { id: clientInstance.id } })
        // replace socket instance since the previous has been terminated
        clientInstance.socketInstance = socket
        // need to start listening again, because it's assigned a different socket instance this time round
        startListening(socket, clientInstance, event, true)
      }

      // listen to the receiver's initiation first before assigning 'credentials'
      socket.on(`profile`, (message: { name: string, data: any }) => {
        if (message.name === 'New Client') {
          registerNewClient()
          return
        }

        // `data` is null when the peer has no stored identity; never dereference it blindly.
        const claimedId: unknown = message.data?.id
        const inMemory = connectedClientSocket.find(obj => obj.id === claimedId)
        if (inMemory) {
          resumeOrReject(inMemory)
          return
        }
        if (typeof claimedId !== 'string') {
          resumeOrReject(undefined)
          return
        }

        // Not in memory (e.g. the server was restarted): fall back to the persisted registry.
        // This happens even when other clients are already connected.
        // NOTE(review): a client restored from the registry is not added to
        // `connectedClientSocket` (unchanged from before) — confirm whether the adapter expects it there.
        checkIfClientExists(claimedId)
          .catch((error) => {
            // A failed lookup is treated as "not found" so the peer always gets an answer.
            console.log({ message: `Promise Error`, details: error })
            return undefined
          })
          .then(resumeOrReject)
          .catch((error) => {
            console.log({ message: `Promise Error`, details: error })
          })
      })
    })
  }
  /**
   * Persists receiver profile information as pretty-printed JSON in `<filename>.json`.
   *
   * @param data Profile to serialise.
   * @param filename Target path WITHOUT the `.json` extension.
   * @returns `true` once the file has been written.
   * @throws Rejects with the underlying error (serialisation or file-system failure)
   *   after logging it, so callers can see why persisting failed.
   */
  export async function writeFile(data: ConnectedServerSocket, filename: string): Promise<boolean> {
    const target = `${filename}.json`
    try {
      await fs.promises.writeFile(target, JSON.stringify(data, null, 2))
    } catch (err) {
      console.log({ message: 'Error writing file', details: err })
      throw err
    }
    console.log({ message: 'File has been written', details: target })
    return true
  }
  /**
   * For internal usage only. Temporary way for the server to keep track of clients
   * in a JSON file; to be replaced with a better alternative in the future.
   *
   * Appends `{ id, dateCreated, connectionState: null, socketInstance: null }` to the JSON
   * array stored in `filePath`. Live objects (socket, connection state) are deliberately
   * stored as `null`. A missing or empty file is treated as an empty list.
   *
   * Never throws: any failure (unreadable file, invalid JSON, content that is not an
   * array, write error) is logged and the file is left untouched.
   *
   * @param entry Client whose id and creation date are recorded.
   * @param filePath Registry file, `clients.json` by default.
   */
  export function addClientToDB(entry: ConnectedClientSocket, filePath: string = 'clients.json'): void {
    try {
      let records: unknown[] = []
      // Load existing data, if any.
      if (fs.existsSync(filePath)) {
        const fileContent = fs.readFileSync(filePath, 'utf-8')
        // An empty file is not valid JSON; treat it as "no records yet" rather than failing forever.
        if (fileContent.trim() !== '') {
          const parsed: unknown = JSON.parse(fileContent)
          if (!Array.isArray(parsed)) {
            throw new Error(`${filePath} does not contain a JSON array`)
          }
          records = parsed
        }
      }
      // Append the new details to the array
      records.push({
        id: entry.id,
        dateCreated: entry.dateCreated,
        connectionState: null,
        socketInstance: null
      })
      // Write the updated array back to the file
      fs.writeFileSync(filePath, JSON.stringify(records, null, 2), 'utf-8')
      console.log({ message: `Entry added successfully.` })
    } catch (error) {
      console.log({ message: 'Error writing to file:', details: error })
    }
  }
  /**
   * Looks up a client by id in the JSON registry file.
   *
   * @param id Client id to search for.
   * @param filePath Registry file, `clients.json` by default.
   * @returns The stored entry with that id. NOTE: resolves with `undefined` when the file
   *   is readable but contains no such id — callers must check the result.
   * @throws Rejects with the string `'File does not exist'` when `filePath` is missing, or
   *   `'Error reading the file'` when it cannot be read, is not valid JSON, or does not
   *   hold an array.
   */
  export async function checkIfClientExists(id: string, filePath: string = 'clients.json'): Promise<ConnectedClientSocket> {
    if (!fs.existsSync(filePath)) {
      console.log({ message: "File does not exist." })
      // Stop here; there is nothing to read.
      return Promise.reject('File does not exist')
    }

    let records: unknown
    try {
      records = JSON.parse(fs.readFileSync(filePath, 'utf-8'))
    } catch (error) {
      return Promise.reject(`Error reading the file`)
    }
    if (!Array.isArray(records)) {
      return Promise.reject(`Error reading the file`)
    }

    // Check whether an entry with the given id exists
    const match = records.find(entry => entry?.id === id)
    if (match) {
      console.log({ message: `Client with ID ${id} exists.` })
    } else {
      console.log({ message: `Client with ID ${id} does not exist.` })
    }
    return match
  }
  /**
   * Reads this client's own stored profile from `<filename>.json`.
   *
   * @param filename Profile path WITHOUT the `.json` extension.
   * @returns The parsed profile; it is guaranteed to be an object with a string `id`.
   * @throws Rejects with an `Error` when no filename is given, the file does not exist,
   *   is empty, is not valid JSON, or does not contain a string `id`.
   */
  export async function checkOwnClientInfo(filename?: string): Promise<{ id: string }> {
    // Without a name we would end up probing a literal "undefined.json".
    if (!filename) {
      console.log({ message: "File does not exist" })
      throw new Error('No profile filename supplied')
    }

    const profilePath = `${filename}.json`
    if (!fs.existsSync(profilePath)) {
      console.log({ message: "File does not exist" })
      throw new Error(`Profile file ${profilePath} does not exist`)
    }

    try {
      const fileData = fs.readFileSync(profilePath, 'utf8')
      if (fileData.trim() === "") {
        throw new Error("File is empty")
      }
      const jsonData: unknown = JSON.parse(fileData)
      // Validate the shape instead of trusting whatever is on disk.
      if (typeof jsonData !== 'object' || jsonData === null || typeof (jsonData as { id?: unknown }).id !== 'string') {
        throw new Error('Profile file does not contain a string id')
      }
      return jsonData as { id: string }
    } catch (err) {
      // Handle parsing errors or other file-related errors
      console.log({ message: "Error reading or parsing file:", details: err })
      throw err
    }
  }
  /**
   * SERVER usage only: announces that `client` is reachable on `socket`, marks it
   * ONLINE, and forwards the socket's traffic to `eventListener`.
   *
   * - Immediately emits `Client Connected` (or `Client Re-connected` when `oldClient` is set).
   * - Every `message` event is emitted as `New Message` targeted at `client.id`.
   * - On `disconnect`: emits `Client Disconnected`, marks the client OFFLINE, then
   *   terminates `eventListener` with an error describing the disconnection.
   *
   * @param socket Server-side socket of the connected client.
   * @param client Client record associated with the socket; its `connectionState` is updated.
   * @param eventListener Observer that receives the events.
   * @param oldClient Set when a previously known client is resuming on a new socket.
   */
  export function startListening(socket: SocketForConnectedClient, client: ConnectedClientSocket, eventListener: Observer<TransportEvent>, oldClient?: boolean): void {
    // notify the associated retransmission to start releasing its buffer
    eventListener.next({
      id: uuidv4(),
      event: oldClient ? 'Client Re-connected' : `Client Connected`,
      data: {
        clientId: client.id,
        message: `Socket Client ${oldClient ? `Re-Connected` : `Connected`}. Adapter ID assigned: ${client.id}`,
        payload: client
      } as EventMessage
    })

    // Resume operation. A record restored from the local file has no connection-state
    // subject yet (it is persisted as null), so one is created here; otherwise reuse it.
    if (!client.connectionState) {
      client.connectionState = new BehaviorSubject<ConnectionState>(`ONLINE`)
    } else {
      client.connectionState.next(`ONLINE`)
    }

    /* Generally, we don't need this unless in the case of being the receiver */
    socket.on('message', (message: any) => {
      console.log({ message: `Message from client ${client.id}`, details: message })
      eventListener.next({
        id: uuidv4(),
        event: 'New Message',
        data: {
          id: uuidv4(),
          dateCreated: new Date(),
          transport: Transport.Websocket,
          target: client.id, // this ref to be associated with the client/channel
          payload: message
        } as TransportMessage
      })
    })

    socket.on('disconnect', () => {
      eventListener.next({
        id: uuidv4(),
        event: 'Client Disconnected',
        data: {
          clientId: client.id,
          message: '',
          payload: {
            time: new Date()
          }
        } as EventMessage
      })
      // Mirror the client-side handler: a disconnected peer must not stay ONLINE.
      // Guarded so a consumer that already flipped the state does not see a duplicate.
      if (client.connectionState.getValue() !== `OFFLINE`) {
        client.connectionState.next(`OFFLINE`)
      }
      eventListener.error(`Client ${client.id} disconnected. Terminating this observable event for this client socket...`)
      eventListener.complete()
    })
  }