@@ -1,16 +1,20 @@
import { Socket as ClientSocket, io } from 'socket.io-client'
import { Server, Socket as SocketForConnectedClient } from "socket.io"
-import { Observable, Subject } from "rxjs";
+import { Observable, Observer, Subject } from "rxjs";
import { createServer, request as httpRequest } from "http";
-import http, { IncomingMessage, ServerResponse } from 'http';
+import express, { Response } from 'express';
+import { Express } from 'express';
+import { postAxiosRequest } from '../utils/http.utils';
+import axios from 'axios';
let fromServer = new Subject<{ event: 'profile' | 'message', payload: any }>()
let toServer = new Subject<{ event: 'profile' | 'message', payload: any }>()
+startHttpServer(3001).then((app: Express) => {
+ operateHttpServer(app, 'http://localhost:3000/')
function consoleLog(): void {
@@ -55,6 +59,7 @@ function startSocketServer(port: number): void {
function startClientSocketConnection(serverUrl: string): void {
let clientSocket: ClientSocket = io(serverUrl, {
@@ -84,187 +89,143 @@ function startClientSocketConnection(serverUrl: string): void {
+async function startHttpServer(port: number): Promise<Express> {
+ return new Promise((resolve, reject) => {
+ let app: Express = express();
+ app.use(express.json());
+ app.listen(port, () => {
+ console.log({ message: `Server running at http://localhost:${port}` });
+ });
+ resolve(app)
+ })
-function startHttpServer(port: number, clientUrl: string): void {
- const clientObservable = startHttpClientConnection(clientUrl);
- const server = http.createServer(async (req: IncomingMessage, res: ServerResponse) => {
- const { method, url } = req;
- if (method === 'POST' && url === '/profile') {
- let body = '';
- req.on('data', chunk => (body += chunk));
- req.on('end', () => {
- const options = {
- hostname: 'localhost',
- port: 3000,
- path: '/profile',
- method: 'POST',
- headers: {
- 'Content-Type': 'application/json',
- 'Content-Length': Buffer.byteLength(body),
- },
- };
- const forwardReq = http.request(options, forwardRes => {
- let forwardBody = '';
- forwardRes.on('data', chunk => (forwardBody += chunk));
- forwardRes.on('end', () => {
- res.writeHead(forwardRes.statusCode || 200, { 'Content-Type': 'application/json' });
- res.end(forwardBody);
- });
- });
- forwardReq.on('error', error => {
- console.error('Error forwarding /profile request:', error);
- res.writeHead(500, { 'Content-Type': 'application/json' });
- res.end(JSON.stringify({ error: 'Failed to forward request' }));
- });
- forwardReq.write(body);
- forwardReq.end();
- });
- } else if (method === 'POST' && url === '/response') {
- res.writeHead(200, { 'Content-Type': 'application/json' });
- const subscription = clientObservable.subscribe({
- next: data => {
- res.write(JSON.stringify(data));
- res.end();
- subscription.unsubscribe();
- },
- error: error => {
- console.error('Error in observable:', error);
- res.write(JSON.stringify({ error: 'Error occurred' }));
- res.end();
- },
- });
- req.on('close', () => {
- subscription.unsubscribe();
- });
- } else {
- res.writeHead(404, { 'Content-Type': 'application/json' });
- res.end(JSON.stringify({ error: 'Not Found' }));
- }
- });
+function operateHttpServer(app: Express, url: string): void {
+ app.post('/profile', (req, res) => {
+ postAxiosRequest(url + `profile`, req.body).then((response) => {
+ res.json(response)
+ }).catch((error) => {
+ console.log(error)
+ res.json(error)
+ })
+ })
- server.listen(port, () => {
- console.log(`Server is running on http://localhost:${port}`);
- });
- * Starts a long-polling HTTP connection to the given URL and channels responses into an Observable.
- * @param url - The URL to send the long-polling requests to.
- * @returns Observable<any> - An observable emitting the response data for each poll.
- */
-function startHttpClientConnection(url: string): Observable<any> {
- return new Observable(subscriber => {
- const poll = () => {
- const options = new URL(url);
- const req = http.request(
- {
- hostname: options.hostname,
- port: options.port,
- path: options.pathname + options.search,
- method: 'GET',
- },
- res => {
- let body = '';
- res.on('data', chunk => {
- body += chunk;
- });
- res.on('end', () => {
- try {
- const parsedData = JSON.parse(body);
- subscriber.next(parsedData);
- poll();
- } catch (error: any) {
- subscriber.error('Error parsing response: ' + error.message);
- }
- });
- }
- );
+ app.post('/message', (req, res) => {
+ postAxiosRequest(url + `message`, req.body).then((response) => {
+ console.log(response)
+ res.json(response)
+ }).catch((error) => {
+ console.log(error)
+ res.json(error)
+ })
+ })
- req.on('error', error => {
- subscriber.error('Request failed: ' + error.message);
- });
+ app.get('/poll', (req, res) => {
+ console.log('Client connected for long polling.');
+ let responseSent = false;
+ const subscription = handleClientHttpConnection(url).subscribe({
+ next: (message: any) => {
+ if (!responseSent) {
+ console.log(`Sending data to client: ${JSON.stringify(message)}`);
+ res.json(message);
+ responseSent = true;
+ subscription.unsubscribe();
+ }
+ },
+ error: (err: any) => {
+ if (!responseSent) {
+ console.error('Error in data stream:');
+ res.status(500).send('Internal Server Error');
+ responseSent = true;
+ }
+ subscription.unsubscribe();
+ },
+ complete: () => {
+ if (!responseSent) {
+ console.log('Data stream completed.');
+ res.status(204).send();
+ responseSent = true;
+ }
+ subscription.unsubscribe();
+ },
+ });
- req.end();
- };
+ const timeout = setTimeout(() => {
+ if (!responseSent) {
+ console.log({ message: 'No data emitted. Sending timeout response.' });
+ res.status(204).send();
+ responseSent = true;
+ subscription.unsubscribe();
+ }
+ }, 15000);
- poll();
+ res.on('close', () => {
+ if (!responseSent) {
+ console.error(`Http Client disconnected`);
- return () => {
- console.log('Stopping long-polling connection.');
- };
+ subscription.unsubscribe();
+ }
+ clearTimeout(timeout);
+ });
+export function handleClientHttpConnection(url: string): Observable<any> {
+ return new Observable((eventNotification: Observer<any>) => {
+ let active: boolean = true;
-const parseBody = (req: IncomingMessage): Promise<any> => {
- return new Promise((resolve, reject) => {
- let body = '';
- req.on('data', chunk => {
- body += chunk;
- });
- req.on('end', () => {
- try {
- resolve(JSON.parse(body));
- } catch (error) {
- reject(error);
+ const longPoll = async () => {
+ while (active) {
+ try {
+ const response = await axios.get(`${url}poll`);
+ if (response.status === 200) {
+ const data = response.data;
+ eventNotification.next(data)
+ } else if (response.status === 204) {
+ console.log('No new messages from the server.');
+ } else {
+ throw new Error(`Unexpected response status: ${response.status}`);
+ }
+ } catch (error: unknown) {
+ console.error(`Unknown Error.`)
+ let errorMessage: string;
+ if (axios.isAxiosError(error)) {
+ if (error.response) {
+ errorMessage = `Server returned status ${error.response.status}: ${error.response.statusText}`;
+ } else if (error.code === 'ECONNABORTED') {
+ errorMessage = 'Request timed out.';
+ } else {
+ errorMessage = error.message || 'An Axios error occurred.';
+ }
+ } else if (error instanceof Error) {
+ errorMessage = error.message;
+ } else {
+ errorMessage = 'An unknown error occurred during polling.';
+ }
+ console.error(`Polling error: ${errorMessage}`);
+ break;
+ }
- });
- req.on('error', reject);
- });
-const forwardRequest = (data: any): Promise<any> => {
- return new Promise((resolve, reject) => {
- const options = {
- hostname: 'localhost',
- port: 3000,
- path: '/profile',
- method: 'POST',
- headers: {
- 'Content-Type': 'application/json',
- 'Content-Length': Buffer.byteLength(JSON.stringify(data)),
- },
- const req = httpRequest(options, res => {
- let responseBody = '';
- res.on('data', chunk => {
- responseBody += chunk;
- });
- res.on('end', () => {
- try {
- resolve(JSON.parse(responseBody));
- } catch (error) {
- reject(error);
- }
- });
- });
+ longPoll();
- req.on('error', reject);
- req.write(JSON.stringify(data));
- req.end();
+ return () => {
+ console.log({ message: 'Unsubscribed from the long-polling channel.' });
+ eventNotification.complete();
+ };