|
@@ -4,12 +4,11 @@ import { Express } from 'express';
|
|
|
import { v4 as uuidv4 } from 'uuid'
|
|
|
import { ConnectedHttpClient, ConnectedHttpServer } from "../transport/http";
|
|
|
import { BehaviorSubject, Observable, Observer, Subject, Subscription } from "rxjs";
|
|
|
-import { ConnectionState, Transport, TransportEvent, TransportMessage } from '../interface/connector.interface';
|
|
|
-import { EventMessage, FisMessage } from '../interface/transport.interface';
|
|
|
import { WrappedMessage } from './message.ordering';
|
|
|
import axios, { AxiosError, AxiosResponse } from 'axios';
|
|
|
import ConsoleLogger from './log.utils';
|
|
|
import path from 'path';
|
|
|
+import { ConnectionState, FisMessage, GeneralEvent, TransportMessage } from '../interface/interface';
|
|
|
const console: ConsoleLogger = new ConsoleLogger(`HttpUtils`, ['transport'])
|
|
|
|
|
|
export function startHttpServer(port: number): Observable<ConnectedHttpClient> {
|
|
@@ -23,37 +22,35 @@ export function startHttpServer(port: number): Observable<ConnectedHttpClient> {
|
|
|
console.log({ message: `Server running at http://localhost:${port}` });
|
|
|
});
|
|
|
|
|
|
- observer.next({
|
|
|
- id: uuidv4(),
|
|
|
- dateCreated: new Date(),
|
|
|
- connectionState: new BehaviorSubject<ConnectionState>('ONLINE'),
|
|
|
- instance: app
|
|
|
- } as ConnectedHttpClient)
|
|
|
+
|
|
|
})
|
|
|
}
|
|
|
|
|
|
-export async function initiateClientToServer(url: string, event: Subject<TransportEvent>, connectedHttpServers: ConnectedHttpServer[], receiverProfileInfo: ConnectedHttpServer | undefined, browserEnv?: boolean,): Promise<ConnectedHttpServer> {
|
|
|
+export async function initiateClientToServer(transportServiceId: string, url: string, event: Subject<GeneralEvent<any>>, connectedHttpServers: ConnectedHttpServer[], receiverProfileInfo: ConnectedHttpServer | undefined, browserEnv?: boolean,): Promise<ConnectedHttpServer> {
|
|
|
return new Promise((resolve, reject) => {
|
|
|
if (browserEnv) {
|
|
|
// logic here for using browser fetch
|
|
|
} else { // axios methods
|
|
|
if (receiverProfileInfo) {
|
|
|
console.log({ message: `Is Old profile, reconnecting with server` })
|
|
|
- checkOwnClientInfo(receiverProfileInfo.id).then((profile: ConnectedHttpServer) => {
|
|
|
- receiverProfileInfo!.id = profile.id
|
|
|
+ checkOwnClientInfo(receiverProfileInfo.clientId).then((profile: ConnectedHttpServer) => {
|
|
|
+ receiverProfileInfo!.clientId = profile.clientId
|
|
|
// console.log({ message: 'jsonfile', details: profile })
|
|
|
postAxiosRequest(url + 'profile', { name: 'Old Client', message: profile }).then((profileInfo: { name: string, message: { id: string } }) => {
|
|
|
console.log({ message: `Acknowledged as previous client. Id: ${profileInfo.message.id}` })
|
|
|
event.next({
|
|
|
id: uuidv4(),
|
|
|
+ type: `Transport Event`,
|
|
|
event: 'Server Connected',
|
|
|
+ date: new Date(),
|
|
|
data: {
|
|
|
clientId: profileInfo.message.id,
|
|
|
message: `Existing Http Channel ${profileInfo.message.id} re-established.`
|
|
|
- } as EventMessage
|
|
|
+ },
|
|
|
+ transport: `Http`
|
|
|
})
|
|
|
// Update Http instance record
|
|
|
- let clientObj: ConnectedHttpServer | undefined = connectedHttpServers.find(obj => obj.id === profileInfo.message.id)
|
|
|
+ let clientObj: ConnectedHttpServer | undefined = connectedHttpServers.find(obj => obj.clientId === profileInfo.message.id)
|
|
|
console.log({ message: 'ClientObj', details: clientObj })
|
|
|
console.log({ message: 'ReceiverProfile', details: receiverProfileInfo })
|
|
|
if (clientObj) {
|
|
@@ -68,7 +65,7 @@ export async function initiateClientToServer(url: string, event: Subject<Transpo
|
|
|
}).catch((error) => {
|
|
|
console.error(error)
|
|
|
postAxiosRequest(url + 'profile', { name: 'New Client', data: null }).then((profileInfo: { name: string, message: any }) => {
|
|
|
- updateProfileAndPublishEvent((receiverProfileInfo as ConnectedHttpServer), profileInfo, event, connectedHttpServers).then((receiverProfileInfo) => {
|
|
|
+ updateProfileAndPublishEvent(transportServiceId, (receiverProfileInfo as ConnectedHttpServer), profileInfo, event, connectedHttpServers).then((receiverProfileInfo) => {
|
|
|
resolve(receiverProfileInfo)
|
|
|
})
|
|
|
}).catch((error) => {
|
|
@@ -79,7 +76,7 @@ export async function initiateClientToServer(url: string, event: Subject<Transpo
|
|
|
} else {
|
|
|
console.log({ message: `Is New profile, Connecting with server` })
|
|
|
postAxiosRequest(url + 'profile', { name: 'New Client', data: null }).then((profileInfo: { name: string, message: any }) => {
|
|
|
- updateProfileAndPublishEvent(receiverProfileInfo, profileInfo, event, connectedHttpServers).then((receiverProfileInfo) => {
|
|
|
+ updateProfileAndPublishEvent(transportServiceId, receiverProfileInfo, profileInfo, event, connectedHttpServers).then((receiverProfileInfo) => {
|
|
|
resolve(receiverProfileInfo)
|
|
|
})
|
|
|
}).catch((error) => {
|
|
@@ -91,9 +88,9 @@ export async function initiateClientToServer(url: string, event: Subject<Transpo
|
|
|
}
|
|
|
|
|
|
// For client usage
|
|
|
-export function handleClientHttpConnection(url: string, server: ConnectedHttpServer): Observable<TransportEvent> {
|
|
|
- return new Observable((eventNotification: Observer<TransportEvent>) => {
|
|
|
- console.log({ message: `Long Poll Attempt for ${server.id}` })
|
|
|
+export function handleClientHttpConnection(url: string, server: ConnectedHttpServer): Observable<GeneralEvent<any>> {
|
|
|
+ return new Observable((eventNotification: Observer<GeneralEvent<any>>) => {
|
|
|
+ console.log({ message: `Long Poll Attempt for ${server.clientId}` })
|
|
|
server.connectionState.next('ONLINE');
|
|
|
let active: boolean = true; // Flag to control polling lifecycle
|
|
|
|
|
@@ -111,14 +108,17 @@ export function handleClientHttpConnection(url: string, server: ConnectedHttpSer
|
|
|
console.log({ message: 'Long Poll Response', details: data })
|
|
|
eventNotification.next({
|
|
|
id: uuidv4(),
|
|
|
+ type: `Transport Event`,
|
|
|
event: 'New Message',
|
|
|
+ date: new Date(),
|
|
|
data: {
|
|
|
id: uuidv4(),
|
|
|
dateCreated: new Date(),
|
|
|
- transport: Transport.Http,
|
|
|
- target: server.id,
|
|
|
+ transport: `Http`,
|
|
|
+ target: server.clientId,
|
|
|
payload: data,
|
|
|
- } as TransportMessage,
|
|
|
+ },
|
|
|
+ transport: `Http`
|
|
|
});
|
|
|
} else if (response.status === 204) {
|
|
|
console.log({ message: 'No new messages from the server.' });
|
|
@@ -164,46 +164,54 @@ export function handleClientHttpConnection(url: string, server: ConnectedHttpSer
|
|
|
});
|
|
|
}
|
|
|
|
|
|
-function handleServerConnectionError(active: boolean, observer: Observer<TransportEvent>, server: ConnectedHttpServer): void {
|
|
|
+function handleServerConnectionError(active: boolean, observer: Observer<GeneralEvent<any>>, server: ConnectedHttpServer): void {
|
|
|
server.connectionState.next('OFFLINE');
|
|
|
console.log({ message: 'Server lost connection' });
|
|
|
active = false; // Stop polling
|
|
|
observer.next({
|
|
|
id: uuidv4(),
|
|
|
+ type: `Transport Event`,
|
|
|
event: 'Server Disconnected',
|
|
|
+ date: new Date(),
|
|
|
data: {
|
|
|
- clientId: server.id,
|
|
|
+ clientId: server.clientId,
|
|
|
message: '',
|
|
|
payload: {
|
|
|
- time: new Date(),
|
|
|
objRef: server
|
|
|
},
|
|
|
- } as EventMessage,
|
|
|
+ },
|
|
|
+ transport: `Http`
|
|
|
});
|
|
|
observer.complete()
|
|
|
}
|
|
|
|
|
|
-async function updateProfileAndPublishEvent(receiverProfileInfo: ConnectedHttpServer | undefined, profile: { name: string, message: any }, event: Subject<TransportEvent>, connectedHttpServers: ConnectedHttpServer[]): Promise<ConnectedHttpServer> {
|
|
|
+async function updateProfileAndPublishEvent(transportServiceId: string, receiverProfileInfo: ConnectedHttpServer | undefined, profile: { name: string, message: any }, event: Subject<GeneralEvent<any>>, connectedHttpServers: ConnectedHttpServer[]): Promise<ConnectedHttpServer> {
|
|
|
return new Promise((resolve, reject) => {
|
|
|
- console.log({ message: `Assigned client Name: ${(profile.message as ConnectedHttpServer).id}` })
|
|
|
+ console.log({ message: `Assigned client Name: ${(profile.message as ConnectedHttpServer).clientId}` })
|
|
|
receiverProfileInfo = profile.message as ConnectedHttpServer
|
|
|
- writeFile(profile.message as ConnectedHttpServer, (profile.message as ConnectedHttpServer).id).then(() => {
|
|
|
+ writeFile(profile.message as ConnectedHttpServer, (profile.message as ConnectedHttpServer).clientId).then(() => {
|
|
|
event.next({
|
|
|
id: uuidv4(),
|
|
|
+ type: `Transport Event`,
|
|
|
event: `New Server`,
|
|
|
+ date: new Date(),
|
|
|
data: {
|
|
|
- clientId: (profile.message as ConnectedHttpServer).id,
|
|
|
- message: `New Http Channel ${(profile.message as ConnectedHttpServer).id} established.`
|
|
|
- } as EventMessage
|
|
|
+ clientId: (profile.message as ConnectedHttpServer).clientId,
|
|
|
+ message: `New Http Channel ${(profile.message as ConnectedHttpServer).clientId} established.`
|
|
|
+ },
|
|
|
+ transport: `Http`
|
|
|
})
|
|
|
// broadcast event to allow retransmission to relase buffered messages
|
|
|
event.next({
|
|
|
id: uuidv4(),
|
|
|
+ type: `Transport Event`,
|
|
|
event: `Server Connected`,
|
|
|
+ date: new Date(),
|
|
|
data: {
|
|
|
- clientId: (profile.message as ConnectedHttpServer).id,
|
|
|
- message: `Server ${(profile.message as ConnectedHttpServer).id} connected and ready to go.`
|
|
|
- } as EventMessage
|
|
|
+ clientId: (profile.message as ConnectedHttpServer).clientId,
|
|
|
+ message: `Server ${(profile.message as ConnectedHttpServer).clientId} connected and ready to go.`
|
|
|
+ },
|
|
|
+ transport: `Http`
|
|
|
})
|
|
|
}).catch((error) => {
|
|
|
reject(error)
|
|
@@ -211,9 +219,11 @@ async function updateProfileAndPublishEvent(receiverProfileInfo: ConnectedHttpSe
|
|
|
|
|
|
// Update http instance record
|
|
|
receiverProfileInfo = {
|
|
|
- id: (profile.message as ConnectedHttpServer).id,
|
|
|
+ clientId: (profile.message as ConnectedHttpServer).clientId,
|
|
|
dateCreated: new Date(),
|
|
|
- connectionState: new BehaviorSubject<ConnectionState>(`ONLINE`)
|
|
|
+ connectionState: new BehaviorSubject<ConnectionState>(`ONLINE`),
|
|
|
+ transport: `Http`,
|
|
|
+ transportServiceId: transportServiceId
|
|
|
}
|
|
|
connectedHttpServers.push(receiverProfileInfo)
|
|
|
resolve(receiverProfileInfo)
|
|
@@ -238,38 +248,43 @@ export async function postAxiosRequest(url: string, data: any): Promise<any> {
|
|
|
}
|
|
|
|
|
|
|
|
|
-export function handleHttpClient(clientInfo: ConnectedHttpClient, connectedClientHttp: ConnectedHttpClient[]): Observable<TransportEvent> {
|
|
|
- return new Observable((event: Observer<TransportEvent>) => {
|
|
|
+export function handleHttpClient(transportServiceId: string, clientInfo: ConnectedHttpClient, connectedClientHttp: ConnectedHttpClient[]): Observable<GeneralEvent<any>> {
|
|
|
+ return new Observable((event: Observer<GeneralEvent<any>>) => {
|
|
|
clientInfo.instance.post('/profile', (req, res) => {
|
|
|
// Client will declare this first before attempting to poll for response channel
|
|
|
- handleProfile(clientInfo.instance, req.body, res, event, connectedClientHttp)
|
|
|
+ handleProfile(transportServiceId, clientInfo.instance, req.body, res, event, connectedClientHttp)
|
|
|
});
|
|
|
})
|
|
|
}
|
|
|
|
|
|
-function handleProfile(app: Express, data: { name: `Old Client` | `New Client`, message: any }, res: Response, event: Observer<TransportEvent>, connectedClientHttp: ConnectedHttpClient[]): void {
|
|
|
+function handleProfile(transportServiceId: string, app: Express, data: { name: `Old Client` | `New Client`, message: any }, res: Response, event: Observer<GeneralEvent<any>>, connectedClientHttp: ConnectedHttpClient[]): void {
|
|
|
if (data.name == `New Client`) {
|
|
|
let clientInstance: ConnectedHttpClient = {
|
|
|
- id: uuidv4(), // client should only be assigned at this level. And is passed around for reference pointing
|
|
|
+ clientId: uuidv4(), // client should only be assigned at this level. And is passed around for reference pointing
|
|
|
dateCreated: new Date(),
|
|
|
instance: app,
|
|
|
connectionState: new BehaviorSubject<ConnectionState>(`OFFLINE`), // for now it's offline because it needs to establish the long polling first
|
|
|
- responseStream: new Subject<WrappedMessage>()
|
|
|
+ responseStream: new Subject<WrappedMessage>(),
|
|
|
+ transportServiceId: transportServiceId,
|
|
|
+ transport: `Http`
|
|
|
}
|
|
|
|
|
|
// send to receiver for reference
|
|
|
res.json({
|
|
|
- name: `New Profile`, message: { id: clientInstance.id }
|
|
|
+ name: `New Profile`, message: { id: clientInstance.clientId }
|
|
|
})
|
|
|
// publish first event notification
|
|
|
event.next({
|
|
|
id: uuidv4(),
|
|
|
+ type: `Transport Event`,
|
|
|
event: `New Client`,
|
|
|
+ date: new Date(),
|
|
|
data: {
|
|
|
- clientId: clientInstance.id,
|
|
|
- message: `New Http Client Connected. Adapter ID assigned: ${clientInstance.id}`,
|
|
|
+ clientId: clientInstance.clientId,
|
|
|
+ message: `New Http Client Connected. Adapter ID assigned: ${clientInstance.clientId}`,
|
|
|
payload: clientInstance
|
|
|
- } as EventMessage
|
|
|
+ },
|
|
|
+ transport: `Http`
|
|
|
})
|
|
|
// Update connected clientInstance info to adapter
|
|
|
connectedClientHttp.push(clientInstance)
|
|
@@ -280,7 +295,7 @@ function handleProfile(app: Express, data: { name: `Old Client` | `New Client`,
|
|
|
// update first
|
|
|
let clientInstance: ConnectedHttpClient | undefined
|
|
|
if (connectedClientHttp.length > 0) {
|
|
|
- clientInstance = connectedClientHttp.find(obj => obj.id === data.message.id)
|
|
|
+ clientInstance = connectedClientHttp.find(obj => obj.clientId === data.message.id)
|
|
|
handleFoundClient(clientInstance)
|
|
|
} else {
|
|
|
// for the case server itself got shit down or something
|
|
@@ -291,8 +306,8 @@ function handleProfile(app: Express, data: { name: `Old Client` | `New Client`,
|
|
|
}
|
|
|
function handleFoundClient(clientInstance: ConnectedHttpClient | undefined): void {
|
|
|
if (clientInstance) {
|
|
|
- console.log({ message: `Http Client ${clientInstance.id} Found` })
|
|
|
- res.json({ name: 'Adjusted Profile', message: { id: clientInstance.id } })
|
|
|
+ console.log({ message: `Http Client ${clientInstance. clientId} Found` })
|
|
|
+ res.json({ name: 'Adjusted Profile', message: { id: clientInstance.clientId } })
|
|
|
// replace socket instance since the previous has been terminated
|
|
|
clientInstance.instance = app
|
|
|
// some explanation here. For the case where the server reads from the DB, no need to terminate subject, since all instances would be destroyed alongside the server shut down. This case is specificd only when there's a need to read from local file
|
|
@@ -303,12 +318,15 @@ function handleProfile(app: Express, data: { name: `Old Client` | `New Client`,
|
|
|
startListeningAndStreaming(app, clientInstance, event, true)
|
|
|
event.next({
|
|
|
id: uuidv4(),
|
|
|
+ type: `Transport Event`,
|
|
|
event: 'Client Connected',
|
|
|
+ date: new Date(),
|
|
|
data: {
|
|
|
- clientId: clientInstance.id,
|
|
|
- message: `Client ${clientInstance.id} connection re-established`,
|
|
|
+ clientId: clientInstance.clientId,
|
|
|
+ message: `Client ${clientInstance.clientId} connection re-established`,
|
|
|
payload: clientInstance
|
|
|
- } as EventMessage
|
|
|
+ },
|
|
|
+ transport: `Http`
|
|
|
})
|
|
|
|
|
|
} else {
|
|
@@ -361,7 +379,7 @@ export function addClientToDB(entry: ConnectedHttpClient): void {
|
|
|
|
|
|
// Append the new object to the array
|
|
|
data.push({
|
|
|
- id: entry.id,
|
|
|
+ id: entry.clientId,
|
|
|
dateCreated: entry.dateCreated,
|
|
|
connectionState: null,
|
|
|
instance: null
|
|
@@ -376,19 +394,22 @@ export function addClientToDB(entry: ConnectedHttpClient): void {
|
|
|
}
|
|
|
|
|
|
// this is for server usage only
|
|
|
-export function startListeningAndStreaming(app: Express, client: ConnectedHttpClient, eventListener: Observer<TransportEvent>, oldClient?: boolean): void {
|
|
|
+export function startListeningAndStreaming(app: Express, client: ConnectedHttpClient, eventListener: Observer<GeneralEvent<any>>, oldClient?: boolean): void {
|
|
|
/* Generally, we don't need this unless in the case of being the receiver */
|
|
|
app.post('/message', (req, res) => {
|
|
|
eventListener.next({
|
|
|
id: uuidv4(),
|
|
|
+ type: `Transport Event`,
|
|
|
event: 'New Message',
|
|
|
+ date: new Date(),
|
|
|
data: {
|
|
|
id: uuidv4(),
|
|
|
dateCreated: new Date(),
|
|
|
- transport: Transport.Http,
|
|
|
- target: client.id, // this ref to be associated with the client/channel
|
|
|
+ transport: `Http`,
|
|
|
+ target: client.clientId, // this ref to be associated with the client/channel
|
|
|
payload: req.body
|
|
|
- } as TransportMessage
|
|
|
+ } as TransportMessage,
|
|
|
+ transport: `Http`
|
|
|
})
|
|
|
res.json(`Received ${((req.body as WrappedMessage)?.payload as FisMessage)?.header?.messageID ?? `Undefined`}`)
|
|
|
})
|
|
@@ -400,12 +421,15 @@ export function startListeningAndStreaming(app: Express, client: ConnectedHttpCl
|
|
|
// notify it's associated retransmission to start releaseing buffer
|
|
|
eventListener.next({
|
|
|
id: uuidv4(),
|
|
|
+ type: `Transport Event`,
|
|
|
event: oldClient ? 'Client Re-connected' : `Client Connected`,
|
|
|
+ date: new Date(),
|
|
|
data: {
|
|
|
- clientId: client.id,
|
|
|
- message: `Socket Client ${oldClient ? `Re-Connected` : `Connected`}. Adapter ID assigned: ${client.id}`,
|
|
|
+ clientId: client.clientId,
|
|
|
+ message: `Socket Client ${oldClient ? `Re-Connected` : `Connected`}. Adapter ID assigned: ${client.clientId}`,
|
|
|
payload: client
|
|
|
- } as EventMessage
|
|
|
+ },
|
|
|
+ transport: `Http`
|
|
|
})
|
|
|
|
|
|
// Flag to track if the response has been sent
|
|
@@ -415,7 +439,7 @@ export function startListeningAndStreaming(app: Express, client: ConnectedHttpCl
|
|
|
const subscription = client.responseStream.asObservable().subscribe({
|
|
|
next: (message: WrappedMessage) => {
|
|
|
if (!responseSent) {
|
|
|
- console.log({ message: `Sending data ${message.thisMessageID} to client ${client.id}}` });
|
|
|
+ console.log({ message: `Sending data ${message.thisMessageID} to client ${client.clientId}}` });
|
|
|
res.json(message); // Send the data to the client
|
|
|
responseSent = true; // Mark response as sent
|
|
|
subscription.unsubscribe(); // Unsubscribe to close this request
|
|
@@ -452,17 +476,17 @@ export function startListeningAndStreaming(app: Express, client: ConnectedHttpCl
|
|
|
// Handle client disconnection
|
|
|
res.on('close', () => {
|
|
|
if (!responseSent) {
|
|
|
- console.error({ message: `Http Client ${client.id} disconnected` });
|
|
|
+ console.error({ message: `Http Client ${client.clientId} disconnected` });
|
|
|
eventListener.next({
|
|
|
id: uuidv4(),
|
|
|
+ type: `Transport Event`,
|
|
|
event: 'Client Disconnected',
|
|
|
+ date: new Date(),
|
|
|
data: {
|
|
|
- clientId: client.id,
|
|
|
- payload: {
|
|
|
- time: new Date()
|
|
|
- }
|
|
|
- } as EventMessage
|
|
|
- } as TransportEvent)
|
|
|
+ clientId: client.clientId,
|
|
|
+ },
|
|
|
+ transport: `Http`
|
|
|
+ })
|
|
|
client.connectionState.next(`OFFLINE`)
|
|
|
subscription.unsubscribe(); // Ensure cleanup
|
|
|
}
|