|
@@ -8,29 +8,44 @@ ii) Enabling the use of mutli adapters usage to increase bandwith for data trans
|
|
|
*/
|
|
|
import { MessageTransmissionBase } from "../base/msg.transmission.base";
|
|
|
import { v4 as uuidv4 } from 'uuid'
|
|
|
-import { BehaviorSubject, distinctUntilChanged, filter, map, Observable, Subject, Subscription } from "rxjs";
|
|
|
+import { BehaviorSubject, distinctUntilChanged, filter, map, Observable, Observer, Subject, Subscription, Unsubscribable } from "rxjs";
|
|
|
import { RetransmissionService } from "../utils/retransmission.service";
|
|
|
import { WrappedMessage } from "../utils/message.ordering";
|
|
|
import ConsoleLogger from "../utils/log.utils";
|
|
|
-import { AdapterInterface, AdapterManagerInterface, ConnectionState, FisMessage, GeneralEvent, MessageTransmitterInterface, TransmissionProfile, TransmitterAdapterInterface, TransportMessage } from "../interface/interface";
|
|
|
-import { error } from "console";
|
|
|
-import { TransmitterAdapter } from "../adapters/adapter.transmitter";
|
|
|
+import { AdapterInterface, AdapterManagerInterface, ConnectionState, FisMessage, GeneralEvent, MessageTransmitterInterface, TransmissionConfig, TransmissionProfile, TransmitterAdapterInterface, TransportMessage } from "../interface/interface";
|
|
|
+import { checkRxType } from "../utils/general.utils";
|
|
|
|
|
|
|
|
|
-export class MessageTransmissionTransmitter extends MessageTransmissionBase implements MessageTransmitterInterface {
|
|
|
+export class MessageTransmissionTransmitter<T> extends MessageTransmissionBase<T> {
|
|
|
+ private internalObservable: Observable<GeneralEvent<T>> = new Observable()
|
|
|
private connectionStateEvent: BehaviorSubject<ConnectionState> = new BehaviorSubject<ConnectionState>('OFFLINE')
|
|
|
private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionTransmitter`, ['transmission'])
|
|
|
private messageToBeBuffered!: Subject<FisMessage | WrappedMessage>
|
|
|
private buffer!: RetransmissionService;
|
|
|
- private currentAdapter!: TransmitterAdapterInterface
|
|
|
+ private currentAdapter!: TransmitterAdapterInterface<any>
|
|
|
|
|
|
- constructor(profile: TransmissionProfile, adapterManager: AdapterManagerInterface) {
|
|
|
- super()
|
|
|
- this.profile = profile
|
|
|
- this.console.log({ message: `Constructing Transmitter Transmission for Receiving target: ${this.profile.target}` })
|
|
|
+ constructor(config: TransmissionConfig, transmissionEvent: Observable<GeneralEvent<any>>) {
|
|
|
+ super(config, transmissionEvent)
|
|
|
+ this.setTransmissionProfile(`Transmitter`, config)
|
|
|
+ this.console.log({ message: `Constructing Transmitter Transmission for Receiving target: ${this.transmissionProfile.config.target}` })
|
|
|
this.messageToBeBuffered = new Subject()
|
|
|
this.buffer = new RetransmissionService()
|
|
|
- this.initializeTransmitterComponents(adapterManager)
|
|
|
+ this.initializeTransmitterComponents(transmissionEvent)
|
|
|
+ }
|
|
|
+
|
|
|
+ public subscribe(observer: Observer<any> | Observable<any>): Unsubscribable | null {
|
|
|
+ if (checkRxType(observer) === `Observable`) {
|
|
|
+ this.console.log({ message: `Is Observable` });
|
|
|
+ // Create a new Subscription to manage unsubscription
|
|
|
+ const subscription = (observer as Observable<any>).subscribe(message => {
|
|
|
+ this.emit(message);
|
|
|
+ this.console.log({ message: `Message ${message.header?.messageID ?? `Undefined`} being processed... ` });
|
|
|
+ });
|
|
|
+
|
|
|
+ return subscription; // Return the Subscription (Unsubscribable)
|
|
|
+ } else {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public emit(message: FisMessage): void {
|
|
@@ -40,15 +55,21 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
|
|
|
|
|
|
/* After setting up, will listen specifically to the connection state of this particular remote client. So that, the buffer signal can be
|
|
|
established to allow the buffer to do their thing. */
|
|
|
- private initializeTransmitterComponents(adapterManager: AdapterManagerInterface): void {
|
|
|
+ private initializeTransmitterComponents(transmissionEvent: Observable<GeneralEvent<AdapterInterface<any>>>): void {
|
|
|
this.console.log({ message: `Setting up Retransmission Service...` })
|
|
|
// Listen and update adapters
|
|
|
- adapterManager.subscribeForAdapters(this.profile.target, `Transmitter`).subscribe((adapter: AdapterInterface) => {
|
|
|
+ transmissionEvent.pipe(
|
|
|
+ filter(event => event.type === `Transmission Event`),
|
|
|
+ filter(event => event.event === `New Adapter`),
|
|
|
+ filter(event => (event.data as AdapterInterface<any>).getAdapterProfile(`clientId`) === this.transmissionProfile.config.target),
|
|
|
+ filter(event => (event.data as AdapterInterface<any>).getAdapterProfile(`role`) === `Transmitter`),
|
|
|
+ map(event => { return event.data as AdapterInterface<any> })
|
|
|
+ ).subscribe((adapter: AdapterInterface<any>) => {
|
|
|
this.adapters.push(adapter)
|
|
|
this.console.log({ message: `Adding new ${adapter.getAdapterProfile(`transportType`)} transmitting adapter. Current adapter length: ${this.adapters.length}` })
|
|
|
if (!this.currentAdapter) {
|
|
|
this.console.log({ message: `Setting this ${adapter.getAdapterProfile(`id`)} as current adapter.` })
|
|
|
- this.currentAdapter = adapter as TransmitterAdapterInterface
|
|
|
+ this.currentAdapter = adapter as TransmitterAdapterInterface<T>
|
|
|
let connectionState: Observable<ConnectionState> = this.currentAdapter.getAdapterProfile('connectionState') as Observable<ConnectionState>
|
|
|
connectionState.subscribe(this.connectionStateEvent)
|
|
|
}
|
|
@@ -60,7 +81,7 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
|
|
|
// need to work with wrapped messages
|
|
|
this.console.log({ message: `Transmitting ${bufferedMessage.thisMessageID}` });
|
|
|
if (this.currentAdapter) {
|
|
|
- this.currentAdapter.emit(this.profile.source, bufferedMessage)
|
|
|
+ this.currentAdapter.emit(this.transmissionProfile.config.source, bufferedMessage)
|
|
|
} else {
|
|
|
// just flush back the message inside the buffer, if the adapter is not ready or assigned.
|
|
|
this.messageToBeBuffered.next(bufferedMessage)
|