|
@@ -0,0 +1,433 @@
|
|
|
|
+import express, { Response } from 'express';
|
|
|
|
+import * as fs from 'fs'
|
|
|
|
+import { Express } from 'express';
|
|
|
|
+import { v4 as uuidv4 } from 'uuid'
|
|
|
|
+import { ConnectedHttpClient, ConnectedHttpServer } from "../transport/http";
|
|
|
|
+import { BehaviorSubject, Observable, Observer, Subject, Subscription } from "rxjs";
|
|
|
|
+import { ConnectionState, Transport, TransportEvent, TransportMessage } from '../interface/connector.interface';
|
|
|
|
+import { EventMessage, FisMessage } from '../interface/transport.interface';
|
|
|
|
+import { WrappedMessage } from './message.ordering';
|
|
|
|
+import axios, { AxiosResponse } from 'axios';
|
|
|
|
+import { error } from 'console';
|
|
|
|
+
|
|
|
|
+export function startHttpServer(port: number): Observable<ConnectedHttpClient> {
|
|
|
|
+ return new Observable((observer: Observer<ConnectedHttpClient>) => {
|
|
|
|
+ let app: Express = express();
|
|
|
|
+
|
|
|
|
+ // Middleware to parse JSON requests
|
|
|
|
+ app.use(express.json());
|
|
|
|
+
|
|
|
|
+ // Handling a GET request
|
|
|
|
+ app.get('/', (req, res) => {
|
|
|
|
+ res.send('Hello, World!');
|
|
|
|
+ });
|
|
|
|
+
|
|
|
|
+ // Handling a POST request
|
|
|
|
+ app.post('/data', (req, res) => {
|
|
|
|
+ const { name, age } = req.body;
|
|
|
|
+ res.json({ message: `Received data: ${name}, ${age}` });
|
|
|
|
+ });
|
|
|
|
+
|
|
|
|
+ app.listen(port, () => {
|
|
|
|
+ console.log(`Server running at http://localhost:${port}`);
|
|
|
|
+ });
|
|
|
|
+
|
|
|
|
+ observer.next({
|
|
|
|
+ id: uuidv4(),
|
|
|
|
+ dateCreated: new Date(),
|
|
|
|
+ connectionState: new BehaviorSubject<ConnectionState>('ONLINE'),
|
|
|
|
+ instance: app
|
|
|
|
+ } as ConnectedHttpClient)
|
|
|
|
+ })
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+export async function initiateClientToServer(url: string, event: Subject<TransportEvent>, connectedHttpServers: ConnectedHttpServer[], browserEnv?: boolean): Promise<ConnectedHttpServer> {
|
|
|
|
+ /* Here's what needs to be done. Set up profile first before attempting to long poll.
|
|
|
|
+ Essentially, this is setting to receive responses from the server. Need to have additional checkign
|
|
|
|
+ to see if hte server's connection status. With regards to sending request, well just utilize
|
|
|
|
+ the fetch method as written in the service. */
|
|
|
|
+ return new Promise((resolve, reject) => {
|
|
|
|
+ let clientName!: string
|
|
|
|
+ let receiverProfileInfo!: ConnectedHttpServer
|
|
|
|
+ if (browserEnv) {
|
|
|
|
+ // logic here for using browser fetch
|
|
|
|
+ } else { // axios methods
|
|
|
|
+ if (clientName) {
|
|
|
|
+ checkOwnClientInfo(clientName).then((profile: ConnectedHttpServer) => {
|
|
|
|
+ receiverProfileInfo = profile
|
|
|
|
+ postAxiosRequest(url + 'profile', { name: 'Old Client', data: profile }).then((profileInfo: any) => {
|
|
|
|
+ writeFile(profileInfo.message, (profileInfo.message as ConnectedHttpServer).id).then((data: any) => {
|
|
|
|
+ console.log(`Assigned client Name: ${(data.message as ConnectedHttpServer).id}`)
|
|
|
|
+ receiverProfileInfo = data.message as ConnectedHttpServer
|
|
|
|
+ writeFile(data.message as ConnectedHttpServer, (data.message as ConnectedHttpServer).id).then(() => {
|
|
|
|
+ event.next({
|
|
|
|
+ id: uuidv4(),
|
|
|
|
+ event: 'Server Reconnected',
|
|
|
|
+ data: {
|
|
|
|
+ clientId: (data.message as ConnectedHttpServer).id,
|
|
|
|
+ message: `Existing Http Channel ${(data.message as ConnectedHttpServer).id} re-established.`
|
|
|
|
+ } as EventMessage
|
|
|
|
+ })
|
|
|
|
+ })
|
|
|
|
+ // Update Http instance record
|
|
|
|
+ let clientObj: ConnectedHttpServer | undefined = connectedHttpServers.find(obj => obj.id === (data.message as ConnectedHttpServer).id)
|
|
|
|
+ if (clientObj) {
|
|
|
|
+ receiverProfileInfo.connectionState = clientObj.connectionState
|
|
|
|
+ clientObj.connectionState.next('ONLINE')
|
|
|
|
+ resolve(clientObj)
|
|
|
|
+ }
|
|
|
|
+ })
|
|
|
|
+ })
|
|
|
|
+ }).catch((error) => {
|
|
|
|
+ postAxiosRequest(url + 'profile', { name: 'New Client', data: null }).then((profileInfo: any) => {
|
|
|
|
+ updateProfileAndPublishEvent(clientName, receiverProfileInfo, profileInfo, event, connectedHttpServers).then((receiverProfileInfo) => {
|
|
|
|
+ resolve(receiverProfileInfo)
|
|
|
|
+ })
|
|
|
|
+ }).catch((error) => {
|
|
|
|
+ reject(error)
|
|
|
|
+ })
|
|
|
|
+ reject(error)
|
|
|
|
+ })
|
|
|
|
+ } else {
|
|
|
|
+ postAxiosRequest(url + 'profile', { name: 'New Client', data: null }).then((profileInfo: any) => {
|
|
|
|
+ updateProfileAndPublishEvent(clientName, receiverProfileInfo, profileInfo, event, connectedHttpServers).then((receiverProfileInfo) => {
|
|
|
|
+ resolve(receiverProfileInfo)
|
|
|
|
+ })
|
|
|
|
+ }).catch((error) => {
|
|
|
|
+ reject(error)
|
|
|
|
+ })
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ })
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+export function handleClientHttpConnection(url: string, server: ConnectedHttpServer): Observable<TransportEvent> {
|
|
|
|
+ return new Observable((observer: Observer<TransportEvent>) => {
|
|
|
|
+ // Recursive function to handle long polling
|
|
|
|
+ const longPoll = async () => {
|
|
|
|
+ try {
|
|
|
|
+ const response = await fetch(url); // Make the HTTP request to the server
|
|
|
|
+ if (response.ok) {
|
|
|
|
+ const data = await response.json() as WrappedMessage;
|
|
|
|
+ observer.next({
|
|
|
|
+ id: uuidv4(),
|
|
|
|
+ event: 'New Message',
|
|
|
|
+ data: {
|
|
|
|
+ id: uuidv4(),
|
|
|
|
+ dateCreated: new Date(),
|
|
|
|
+ transport: Transport.Http,
|
|
|
|
+ target: server.id,
|
|
|
|
+ payload: data
|
|
|
|
+ } as TransportMessage
|
|
|
|
+ }); // Emit the received message to the Observable
|
|
|
|
+ } else if (response.status === 204) {
|
|
|
|
+ // No Content (keep polling for more updates)
|
|
|
|
+ console.log('No new messages from the server.');
|
|
|
|
+ } else {
|
|
|
|
+ throw new Error(`Unexpected response status: ${response.status}`);
|
|
|
|
+ }
|
|
|
|
+ } catch (error) {
|
|
|
|
+ observer.error(error); // Notify observer of any errors
|
|
|
|
+ return; // Stop polling on errors
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Continue polling after processing the response
|
|
|
|
+ setTimeout(longPoll, 0); // Optionally add a delay to avoid overwhelming the server
|
|
|
|
+ };
|
|
|
|
+
|
|
|
|
+ // Start the long polling
|
|
|
|
+ longPoll();
|
|
|
|
+
|
|
|
|
+ // Cleanup logic when the observable is unsubscribed
|
|
|
|
+ return () => {
|
|
|
|
+ console.log('Unsubscribed from the long-polling channel.');
|
|
|
|
+ };
|
|
|
|
+ });
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+async function updateProfileAndPublishEvent(clientName: string | undefined, receiverProfileInfo: ConnectedHttpServer, profile: { name: string, message: any }, event: Subject<TransportEvent>, connectedHttpServers: ConnectedHttpServer[]): Promise<ConnectedHttpServer> {
|
|
|
|
+ return new Promise((resolve, reject) => {
|
|
|
|
+ console.log(`Assigned client Name: ${(profile.message as ConnectedHttpServer).id}`)
|
|
|
|
+ receiverProfileInfo = profile.message as ConnectedHttpServer
|
|
|
|
+ writeFile(profile.message, (profile.message as ConnectedHttpServer).id).then(() => {
|
|
|
|
+ clientName = receiverProfileInfo.id
|
|
|
|
+ event.next({
|
|
|
|
+ id: uuidv4(),
|
|
|
|
+ event: `New Server`,
|
|
|
|
+ data: {
|
|
|
|
+ clientId: (profile.message as ConnectedHttpServer).id,
|
|
|
|
+ message: `New Http Channel ${(profile.message as ConnectedHttpServer).id} established.`
|
|
|
|
+ } as EventMessage
|
|
|
|
+ })
|
|
|
|
+ }).catch((error) => {
|
|
|
|
+ reject(error)
|
|
|
|
+ })
|
|
|
|
+
|
|
|
|
+ // Update http instance record
|
|
|
|
+ receiverProfileInfo = {
|
|
|
|
+ id: (profile.message as ConnectedHttpServer).id,
|
|
|
|
+ dateCreated: new Date(),
|
|
|
|
+ connectionState: new BehaviorSubject<ConnectionState>(`ONLINE`)
|
|
|
|
+ }
|
|
|
|
+ connectedHttpServers.push(receiverProfileInfo)
|
|
|
|
+ resolve(receiverProfileInfo)
|
|
|
|
+ })
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+async function postAxiosRequest(url: string, data: any): Promise<any> {
|
|
|
|
+ return new Promise(async (resolve, reject) => {
|
|
|
|
+ try {
|
|
|
|
+ const response: AxiosResponse<any> = await axios.post(url, data);
|
|
|
|
+ console.log('Response:', response.data);
|
|
|
|
+ resolve(response.data)
|
|
|
|
+ } catch (error) {
|
|
|
|
+ if (axios.isAxiosError(error)) {
|
|
|
|
+ console.error('Axios Error:', error.message);
|
|
|
|
+ } else {
|
|
|
|
+ console.error('Unexpected Error:', error);
|
|
|
|
+ }
|
|
|
|
+ reject(error)
|
|
|
|
+ }
|
|
|
|
+ })
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+export function handleHttpClient(clientInfo: ConnectedHttpClient, connectedClientHttp: ConnectedHttpClient[]): Observable<TransportEvent> {
|
|
|
|
+ return new Observable((event: Observer<TransportEvent>) => {
|
|
|
|
+ clientInfo.instance.post('/profile', (req, res) => {
|
|
|
|
+ // Client will declare this first before attempting to poll for response channel
|
|
|
|
+ handleProfile(clientInfo.instance, req.body, res, event, connectedClientHttp)
|
|
|
|
+ });
|
|
|
|
+ })
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+export function handleProfile(app: Express, data: { name: `Old Client` | `New Client`, message: any }, res: Response, event: Observer<TransportEvent>, connectedClientHttp: ConnectedHttpClient[]): void {
|
|
|
|
+ if (data.name == `New Client`) {
|
|
|
|
+ let clientInstance: ConnectedHttpClient = {
|
|
|
|
+ id: uuidv4(), // client should only be assigned at this level. And is passed around for reference pointing
|
|
|
|
+ dateCreated: new Date(),
|
|
|
|
+ instance: app,
|
|
|
|
+ connectionState: new BehaviorSubject<ConnectionState>(`OFFLINE`), // for now it's offline because it needs to establish the long polling first
|
|
|
|
+ responseStream: new Subject<WrappedMessage>()
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // send to receiver for reference
|
|
|
|
+ res.json({
|
|
|
|
+ name: `New Client`, message: { id: clientInstance.id }
|
|
|
|
+ })
|
|
|
|
+ // publish first event notification
|
|
|
|
+ event.next({
|
|
|
|
+ id: uuidv4(),
|
|
|
|
+ event: `New Client`,
|
|
|
|
+ data: {
|
|
|
|
+ clientId: clientInstance.id,
|
|
|
|
+ message: `New Http Client Connected. Adapter ID assigned: ${clientInstance.id}`,
|
|
|
|
+ payload: clientInstance
|
|
|
|
+ } as EventMessage
|
|
|
|
+ })
|
|
|
|
+ // Update connected clientInstance info to adapter
|
|
|
|
+ connectedClientHttp.push(clientInstance)
|
|
|
|
+ addClientToDB(clientInstance)
|
|
|
|
+ startListeningAndStreaming(app, clientInstance, event)
|
|
|
|
+ } else {
|
|
|
|
+ // update first
|
|
|
|
+ let clientInstance: ConnectedHttpClient | undefined
|
|
|
|
+ if (connectedClientHttp.length > 0) {
|
|
|
|
+ clientInstance = connectedClientHttp.find(obj => obj.id === data.message.id)
|
|
|
|
+ handleFoundClient(clientInstance)
|
|
|
|
+ } else {
|
|
|
|
+ // for the case server itself got shit down or something
|
|
|
|
+ checkIfClientExists(data.message.id).then((client: ConnectedHttpClient) => {
|
|
|
|
+ clientInstance = client
|
|
|
|
+ handleFoundClient(clientInstance)
|
|
|
|
+ }).catch(error => console.error(error))
|
|
|
|
+ }
|
|
|
|
+ function handleFoundClient(clientInstance: ConnectedHttpClient | undefined): void {
|
|
|
|
+ if (clientInstance) {
|
|
|
|
+ console.log(`Http Client ${clientInstance.id} Found`)
|
|
|
|
+ res.json({ name: 'Adjusted Profile', message: { id: clientInstance.id } })
|
|
|
|
+ // replace socket instance since the previous has been terminated
|
|
|
|
+ clientInstance.instance = app
|
|
|
|
+ // some explanation here. For the case where the server reads from the DB, no need to terminate subject, since all instances would be destroyed alongside the server shut down. This case is specificd only when there's a need to read from local file
|
|
|
|
+ if (!clientInstance.connectionState) {
|
|
|
|
+ clientInstance.connectionState = new BehaviorSubject<ConnectionState>(`OFFLINE`)
|
|
|
|
+ }
|
|
|
|
+ // need to start listening again, because it's assigned a different socket instance this time round
|
|
|
|
+ startListeningAndStreaming(app, clientInstance, event)
|
|
|
|
+ event.next({
|
|
|
|
+ id: uuidv4(),
|
|
|
|
+ event: 'Client Reconnected',
|
|
|
|
+ data: {
|
|
|
|
+ clientId: clientInstance.id,
|
|
|
|
+ message: `Client ${clientInstance.id} connection re-established`,
|
|
|
|
+ payload: clientInstance
|
|
|
|
+ } as EventMessage
|
|
|
|
+ })
|
|
|
|
+
|
|
|
|
+ } else {
|
|
|
|
+ console.log(`Profile Not Found`)
|
|
|
|
+ res.json({ name: 'Error', message: 'Receiver Profile Not found' })
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+export async function checkIfClientExists(id: string, filePath: string = 'clients.json'): Promise<ConnectedHttpClient> {
|
|
|
|
+ return new Promise((resolve, reject) => {
|
|
|
|
+ try {
|
|
|
|
+ // Check if the file exists
|
|
|
|
+ if (!fs.existsSync(filePath)) {
|
|
|
|
+ console.log("File does not exist.");
|
|
|
|
+ reject('File does not exist');
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Read and parse the data
|
|
|
|
+ const fileContent = fs.readFileSync(filePath, 'utf-8');
|
|
|
|
+ const data: any[] = JSON.parse(fileContent);
|
|
|
|
+
|
|
|
|
+ // Check if an object with the given id exists
|
|
|
|
+ let obj = data.find(entry => entry.id === id);
|
|
|
|
+
|
|
|
|
+ if (obj) {
|
|
|
|
+ console.log(`Client with ID ${id} exists.`);
|
|
|
|
+ } else {
|
|
|
|
+ console.log(`Client with ID ${id} does not exist.`);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ resolve(obj);
|
|
|
|
+ } catch (error) {
|
|
|
|
+ console.error('Error reading the file:', error);
|
|
|
|
+ reject(`Error reading the file`)
|
|
|
|
+ }
|
|
|
|
+ })
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+/* For Internal Usage only. Temporary serve as a way for server to keep track of clients. To be replaced in the future with better alternatives. */
|
|
|
|
+export function addClientToDB(entry: ConnectedHttpClient, filePath: string = 'clients.json'): void {
|
|
|
|
+ try {
|
|
|
|
+ let data: ConnectedHttpClient[] = [];
|
|
|
|
+
|
|
|
|
+ // Check if the file exists and load existing data
|
|
|
|
+ if (fs.existsSync(filePath)) {
|
|
|
|
+ const fileContent = fs.readFileSync(filePath, 'utf-8');
|
|
|
|
+ data = JSON.parse(fileContent);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Append the new object to the array
|
|
|
|
+ data.push({
|
|
|
|
+ id: entry.id,
|
|
|
|
+ dateCreated: entry.dateCreated,
|
|
|
|
+ connectionState: null,
|
|
|
|
+ instance: null
|
|
|
|
+ } as unknown as ConnectedHttpClient);
|
|
|
|
+
|
|
|
|
+ // Write the updated array back to the file
|
|
|
|
+ fs.writeFileSync(filePath, JSON.stringify(data, null, 2), 'utf-8');
|
|
|
|
+ console.log(`Entry added successfully.`);
|
|
|
|
+ } catch (error) {
|
|
|
|
+ console.error('Error writing to file:', error);
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// this is for server usage only
|
|
|
|
+export function startListeningAndStreaming(app: Express, client: ConnectedHttpClient, eventListener: Observer<TransportEvent>): void {
|
|
|
|
+ // sample
|
|
|
|
+ app.post('/data', (req, res) => {
|
|
|
|
+ const { name, age } = req.body;
|
|
|
|
+ res.json({ message: `Received data: ${name}, ${age}` });
|
|
|
|
+ });
|
|
|
|
+
|
|
|
|
+ /* Generally, we don't need this unless in the case of being the receiver */
|
|
|
|
+ app.post('/message', (req, res) => {
|
|
|
|
+ eventListener.next({
|
|
|
|
+ id: uuidv4(),
|
|
|
|
+ event: 'New Message',
|
|
|
|
+ data: {
|
|
|
|
+ id: uuidv4(),
|
|
|
|
+ dateCreated: new Date(),
|
|
|
|
+ transport: Transport.Http,
|
|
|
|
+ target: client.id, // this ref to be associated with the client/channel
|
|
|
|
+ payload: req.body
|
|
|
|
+ } as TransportMessage
|
|
|
|
+ })
|
|
|
|
+ res.json(`Received ${(req.body as FisMessage)?.header?.messageID ?? `Undefined`}`)
|
|
|
|
+ })
|
|
|
|
+
|
|
|
|
+ app.get('/poll', (req, res) => {
|
|
|
|
+ console.log('Client connected for long polling.');
|
|
|
|
+
|
|
|
|
+ // Subscribe to the data stream
|
|
|
|
+ const subscription: Subscription = client.responseStream.asObservable().subscribe({
|
|
|
|
+ next: (message: WrappedMessage) => {
|
|
|
|
+ console.log(`Sending data to client: ${message}`);
|
|
|
|
+ res.json({ message }); // Send the data to the client
|
|
|
|
+ subscription.unsubscribe(); // End the current request
|
|
|
|
+ },
|
|
|
|
+ error: (err) => {
|
|
|
|
+ console.error('Error in data stream:', err);
|
|
|
|
+ res.status(500).send('Internal Server Error');
|
|
|
|
+ },
|
|
|
|
+ complete: () => {
|
|
|
|
+ console.log('Data stream completed.');
|
|
|
|
+ res.status(204).send(); // No Content
|
|
|
|
+ },
|
|
|
|
+ });
|
|
|
|
+
|
|
|
|
+ // Handle client disconnection
|
|
|
|
+ res.on('close', () => {
|
|
|
|
+ client.connectionState.next('OFFLINE')
|
|
|
|
+ console.log('Client disconnected.');
|
|
|
|
+ subscription.unsubscribe(); // Ensure cleanup
|
|
|
|
+ });
|
|
|
|
+ });
|
|
|
|
+
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+// Check if filename exists. Return profile information if there's any
|
|
|
|
+export async function checkOwnClientInfo(filename?: string): Promise<ConnectedHttpServer> {
|
|
|
|
+ return new Promise((resolve, reject) => {
|
|
|
|
+ // Check if the file exists
|
|
|
|
+ if (fs.existsSync(`${filename}.json`)) {
|
|
|
|
+ try {
|
|
|
|
+ // Read the file contents
|
|
|
|
+ const fileData = fs.readFileSync(`${filename}.json`, 'utf8');
|
|
|
|
+
|
|
|
|
+ // If the file is empty, return an error
|
|
|
|
+ if (fileData.trim() === "") {
|
|
|
|
+ throw new Error("File is empty");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Parse and return the data if present
|
|
|
|
+ const jsonData = JSON.parse(fileData);
|
|
|
|
+ resolve(jsonData)
|
|
|
|
+
|
|
|
|
+ } catch (err) {
|
|
|
|
+ // Handle parsing errors or other file-related errors
|
|
|
|
+ console.error("Error reading or parsing file:", err);
|
|
|
|
+ reject('');
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ console.error("File does not exist");
|
|
|
|
+ reject('');
|
|
|
|
+ }
|
|
|
|
+ })
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// Specifically to write receiver profile information
|
|
|
|
+export async function writeFile(data: ConnectedHttpServer, filename: string): Promise<boolean> {
|
|
|
|
+ return new Promise((resolve, reject) => {
|
|
|
|
+ // Write JSON data to a file
|
|
|
|
+ fs.writeFile(`${filename}.json`, JSON.stringify(data, null, 2), (err) => {
|
|
|
|
+ if (err) {
|
|
|
|
+ console.error('Error writing file', err);
|
|
|
|
+ reject(false)
|
|
|
|
+ } else {
|
|
|
|
+ console.log('File has been written');
|
|
|
|
+ resolve(true)
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ })
|
|
|
|
+}
|
|
|
|
+
|