123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 |
- import { MessageTransmissionBase } from "../base/msg.transmission.base";
- import { v4 as uuidv4 } from 'uuid'
- import { BehaviorSubject, distinct, distinctUntilChanged, filter, map, Observable, Subject, Subscription } from "rxjs";
- import { RetransmissionService } from "../utils/retransmission.service";
- import { WrappedMessage } from "../utils/message.ordering";
- import ConsoleLogger from "../utils/log.utils";
- import { AdapterInterface, ConnectionState, FisMessage, GeneralEvent, MessageTransmitterInterface, TransmitterAdapterInterface, TransportMessage } from "../interface/interface";
- import { error } from "console";
/**
 * Transmitter side of message transmission for one client.
 *
 * Outgoing messages handed to {@link emit} are pushed into a
 * `RetransmissionService` buffer. The buffer is driven by a connection state
 * (`ONLINE` / `OFFLINE`) that this class derives from connect/disconnect
 * events on the shared event subject. Messages released by the buffer are
 * forwarded to `messageToBeTransmitted` once a transmitter adapter is known.
 *
 * Adapters are discovered through `New Adapter` events on the same event
 * subject; currently only the first transmitter adapter seen is used.
 */
export class MessageTransmissionTransmitter extends MessageTransmissionBase implements MessageTransmitterInterface {
    /** Latest known connection state; starts `OFFLINE` until a connect event is seen. */
    private readonly connectionStateEvent = new BehaviorSubject<ConnectionState>('OFFLINE')
    private readonly console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionTransmitter`, ['transmission'])
    /** Input side of the retransmission buffer. */
    private readonly messageToBeBuffered = new Subject<FisMessage | WrappedMessage>()
    /**
     * Output side: messages released by the buffer while an adapter is set.
     * NOTE(review): nothing in this class subscribes to this subject and the
     * direct `currentAdapter.emit(...)` call is commented out — confirm which
     * consumer is expected to pick these messages up.
     */
    private readonly messageToBeTransmitted = new Subject<WrappedMessage>()
    private readonly buffer: RetransmissionService = new RetransmissionService()
    /** The single adapter currently used for transmitting; unset until one is announced. */
    private currentAdapter?: TransmitterAdapterInterface

    /**
     * @param clientId Identifier of the client this transmitter serves; used to
     *                 pick this client's events out of the shared event stream.
     * @param event    Shared event subject carrying adapter and connection events.
     */
    constructor(clientId: string, event: Subject<GeneralEvent<any>>) {
        super()
        this.console.log({ message: `Constructing Transmitter Transmission with ${clientId}` })
        // Previously the id was only logged and never stored, although the
        // event filters below compare against `this.clientId`.
        this.clientId = clientId
        this.event = event
        this.handleAdapters(this.event)
        this.setupBuffer()
        // special case just for http: in case of server/client disconnected, the unsent
        // messages are meant to be flushed back into messageToBeBuffered.
        // NOTE(review): uniqueHandlerToFlushUnsentMessages() implements this but is not wired in here.
    }

    /**
     * Queue a message for transmission.
     *
     * The message is always pushed into the retransmission buffer; the log line
     * only reflects the connection state at the time of the call.
     *
     * @param message Message to send.
     */
    public emit(message: FisMessage): void {
        const online = this.connectionStateEvent.getValue() === 'ONLINE'
        this.console.log({ message: online ? `Transmitting message` : `Buffering message` })
        this.messageToBeBuffered.next(message)
    }

    /**
     * Derive the connection state from connect/disconnect events addressed to
     * this client, hand it to the retransmission buffer together with the input
     * subject, and forward whatever the buffer releases.
     */
    private setupBuffer(): void {
        this.console.log({ message: `Setting up Retransmission Service...` })
        this.event.pipe(
            // Filter on the event name first so unrelated events are never dereferenced.
            filter(event =>
                event.event === 'Client Disconnected' ||
                event.event === 'Client Re-connected' ||
                event.event === 'Client Connected' ||
                event.event === 'Server Disconnected' ||
                event.event === 'Server Connected'
            ),
            // Optional chaining: an event without `data` must not throw and kill this subscription.
            filter(event => event.data?.clientId === this.clientId),
            map((event): ConnectionState =>
                event.event === 'Client Disconnected' || event.event === 'Server Disconnected'
                    ? 'OFFLINE'
                    : 'ONLINE'
            ),
            distinctUntilChanged()
        ).subscribe((signal: ConnectionState) => {
            this.connectionStateEvent.next(signal)
            if (signal === 'OFFLINE') this.console.error({ message: `${this.clientId} disconnected` })
            if (signal === 'ONLINE') this.console.log({ message: `${this.clientId} connected` })
        })

        // NOTE(review): meaning of the third argument (`true`) is defined by RetransmissionService — confirm.
        this.buffer.implementRetransmission(this.messageToBeBuffered, this.connectionStateEvent.asObservable(), true)

        // Subscribe immediately so that buffered messages are released as soon as the buffer emits them.
        this.buffer.returnSubjectForBufferedItems().subscribe((bufferedMessage: WrappedMessage) => {
            this.console.log({ message: `Releasing ${bufferedMessage.thisMessageID}` })
            if (this.currentAdapter) {
                // this.currentAdapter.emit(bufferedMessage)
                this.messageToBeTransmitted.next(bufferedMessage)
            } else {
                // No adapter yet: push the message back into the buffer instead of dropping it.
                // NOTE(review): if the buffer releases synchronously while ONLINE this may loop
                // until an adapter arrives — verify against RetransmissionService.
                this.messageToBeBuffered.next(bufferedMessage)
                this.console.error({ message: `Adapter is not set. Please ensure adapters are ready.` })
            }
        })
    }

    /**
     * Listen for adapter announcements.
     *
     * 1. A `New Adapter` event whose data is an array of adapters: every adapter
     *    with role `Transmitter` is stored and re-published individually as a
     *    `Transmission Event` / `New Adapter` event on the same subject.
     * 2. Those per-adapter events: the first transmitter adapter seen becomes
     *    `currentAdapter`.
     *
     * @param adaptersEvent Subject on which adapter events are received and re-published.
     */
    private handleAdapters(adaptersEvent: Subject<GeneralEvent<any>>): void {
        adaptersEvent.pipe(
            filter(event => event.event === `New Adapter`),
            map(event => event.data),
            // The per-adapter events re-published below share the `New Adapter` name but carry a
            // single adapter. Without this guard they re-enter this handler and `.forEach` throws.
            filter((data: any): data is AdapterInterface[] => Array.isArray(data)),
        ).subscribe({
            next: (adapters: AdapterInterface[]) => {
                adapters.forEach((adapter: AdapterInterface) => {
                    if (adapter.role === `Transmitter`) {
                        this.adapters.push(adapter as TransmitterAdapterInterface)
                        adaptersEvent.next({
                            id: uuidv4(),
                            type: 'Transmission Event',
                            event: `New Adapter`,
                            date: new Date(),
                            data: adapter,
                            transport: adapter.transport
                        })
                    }
                })
            },
            error: err => this.console.error({ message: 'Observer Error', details: err })
        })

        // listen to newly added adapters in transmission
        adaptersEvent.pipe(
            filter(event => event.type === `Transmission Event` && event.event === `New Adapter`),
            map(event => event.data as AdapterInterface | undefined),
            // Only accept transmitter adapters: the subject is shared, so other publishers may
            // announce adapters with a different role under the same event name.
            filter((adapter): adapter is AdapterInterface => adapter?.role === `Transmitter`),
        ).subscribe((adapter: AdapterInterface) => {
            if (!this.currentAdapter) {
                this.currentAdapter = adapter as TransmitterAdapterInterface
            } else {
                this.console.log({ message: `Already have existing transmitting adapter. Currently hardcode to use only 1` })
            }
        })
    }

    /**
     * Temporary logic: if no adapter is selected yet, pick the first stored
     * adapter whose transport is `Websocket`. Does nothing when an adapter is
     * already selected; logs an error only when no websocket adapter exists.
     *
     * NOTE(review): not called from anywhere in this class.
     */
    private setUpAdapter(): void {
        if (this.currentAdapter) return
        const websocketAdapter = this.adapters.find(adapter => adapter.transport === `Websocket`)
        if (websocketAdapter) {
            this.currentAdapter = websocketAdapter as TransmitterAdapterInterface
        } else {
            this.console.error({ message: 'No websocket socket adapter avaiable' })
        }
    }

    /**
     * Feed messages carried by `Re-Flush` events for this client back into the
     * retransmission buffer.
     *
     * NOTE(review): not called from anywhere in this class.
     *
     * @param event Event stream to watch for `Re-Flush` events.
     */
    private uniqueHandlerToFlushUnsentMessages(event: Observable<GeneralEvent<any>>): void {
        event.pipe(
            filter(flushEvent => flushEvent.event === 'Re-Flush'),
            filter(flushEvent => flushEvent.data?.clientId === this.clientId),
        ).subscribe((flushEvent: GeneralEvent<any>) => {
            // The wrapped message sits inside the transport message carried by the event.
            const unsent = (flushEvent.data.payload as TransportMessage).payload as WrappedMessage
            const online = this.connectionStateEvent.getValue() === 'ONLINE'
            this.console.log({ message: online ? `Transmitting ${unsent.thisMessageID}` : `Buffering ${unsent.thisMessageID}` })
            this.messageToBeBuffered.next(unsent)
        })
    }
}
|