adapter.transmitter.ts 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. import dotenv from 'dotenv';
  2. import { BehaviorSubject, distinctUntilChanged, filter, map, Observable, Subject } from 'rxjs';
  3. import { WrappedMessage } from '../utils/message.ordering';
  4. import ConsoleLogger from '../utils/log.utils';
  5. import { AdapterBase } from '../base/adapter.base';
  6. import { ClientObject, ConnectionState, FisMessage, TransportType, TransportMessage, TransportServiceInterface } from '../interface/interface';
  7. import { fileURLToPath } from 'url';
  8. dotenv.config();
  9. /* This transport manager instantiates the necessary transport to handle transmission to, and receiving from, different receivers.
  10. How it does so: */
  11. export class TransmitterAdapter extends AdapterBase {
  12. private console!: ConsoleLogger
  13. constructor(clientId: string, adapterType: TransportType, transportService: TransportServiceInterface) {
  14. super()
  15. this.console = new ConsoleLogger(`${adapterType}TransmitterAdapter`, ['adapter'])
  16. this.setAdapterProfile(clientId, adapterType, transportService, 'Transmitter')
  17. this.setupConnectionState(transportService)
  18. this.console.log({ message: `Contructing TransmitterAdapter for client: ${clientId}` })
  19. }
  20. emit(selfId: string, message: WrappedMessage): void {
  21. // logic here
  22. this.console.log({ message: `Emitting: ${(message.payload as FisMessage).header.messageID} to ${this.AdapterProfile.clientId}` })
  23. this.AdapterProfile.transportService.emit({
  24. id: this.AdapterProfile.clientId,
  25. self: selfId,
  26. target: this.AdapterProfile.clientId,
  27. payload: message
  28. } as TransportMessage)
  29. }
  30. getConnectionState(): Observable<ConnectionState> {
  31. return this.AdapterProfile.connectionState.asObservable()
  32. }
  33. private setupConnectionState(transportService: TransportServiceInterface): void {
  34. transportService.subscribeForEvent().pipe(
  35. filter(event => event.type === `Transport Event`),
  36. filter(event => (event.data as ClientObject).clientId === this.AdapterProfile.clientId),
  37. map(event => {
  38. if (event.event == 'Client Disconnected' || event.event == 'Server Disconnected') {
  39. return 'OFFLINE'
  40. } else {
  41. return `ONLINE`
  42. }
  43. }),
  44. distinctUntilChanged()
  45. ).subscribe((signal: ConnectionState) => {
  46. this.AdapterProfile.connectionState.next(signal)
  47. if (signal == 'OFFLINE') this.console.error({ message: `${this.AdapterProfile.clientId} disconnected` })
  48. if (signal == 'ONLINE') this.console.log({ message: `${this.AdapterProfile.clientId} connected and ready to go` })
  49. })
  50. }
  51. }