1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071 |
/* This is a more specialized transmission component: it emulates a conventional request-response call, although the underlying
mechanism is still message-based. As you will see below, it takes the already-instantiated transmitter and receiver components
and simply filters the received messages by the identifier of the original request. */
- import { MessageTransmissionBase } from "../base/msg.transmission.base";
- import { filter, Observable, Observer, Subscription, Unsubscribable } from "rxjs";
- import { FisMessage, MessageReceiverInterface, MessageRequestResponseInterface, MessageTransmitterInterface, TransmissionConfig, TransmissionProfile } from "../interface/interface";
- import ConsoleLogger from "../utils/log.utils";
- import { checkRxType } from "../utils/general.utils";
/**
 * Request-response style transmission built on top of an existing transmitter
 * and receiver pair.
 *
 * A request is emitted through the transmitter, and every message coming out of
 * the receiver whose `header.messageID` equals the request's `header.messageID`
 * is treated as part of the response to that request.
 */
export class MessageTransmissionRequestResponse<T> implements MessageRequestResponseInterface<T> {
    private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionRequestResponse`, ['transmission'])
    // Never assigned anywhere in this class, so it is modelled as optional and
    // every read must tolerate `undefined`.
    private transmissionProfile?: TransmissionProfile
    protected transmitterInstance!: MessageTransmitterInterface<any>;
    protected receiverInstance!: MessageReceiverInterface<any>;

    /**
     * @param config Transmission configuration; only `config.target` is read here (for logging).
     * @param transmitterInstance Transmitter used to emit outgoing requests.
     * @param receiverInstance Receiver whose receivables are scanned for matching responses.
     */
    constructor(config: TransmissionConfig, transmitterInstance: MessageTransmitterInterface<any>, receiverInstance: MessageReceiverInterface<any>) {
        this.console.log({ message: `Constructing Request Response Transmission for Receiving target: ${config.target}` })
        this.transmitterInstance = transmitterInstance
        this.receiverInstance = receiverInstance
    }

    /**
     * Returns the whole transmission profile, or a single field of it when `type` is given.
     *
     * @param type Optional profile field to select.
     * @returns The profile (or the selected field), or `undefined` when no profile has been set.
     */
    getTransmissionProfile(type?: `config` | `id` | `role`): TransmissionProfile | TransmissionConfig | string | undefined {
        const profile = this.transmissionProfile;
        if (profile === undefined) {
            // Guard: indexing an unset profile used to throw a TypeError.
            return undefined;
        }
        return type ? profile[type] : profile;
    }

    // this is still broken at this point in time. DO NOT USE!!!!
    /**
     * Sends every message produced by the given Observable as a request and logs each response.
     *
     * @param observer Source of outgoing requests. Only an Observable is supported;
     *                 anything else (as classified by `checkRxType`) yields `null`.
     * @returns A handle that tears down the source subscription and all in-flight
     *          requests, or `null` when `observer` is not an Observable.
     */
    subscribe(observer: Observer<T> | Observable<T>): Unsubscribable | null {
        if (checkRxType(observer) !== `Observable`) {
            return null;
        }

        // Parent subscription created up-front so that synchronously emitted
        // requests can already register their inner subscriptions on it.
        const subscription = new Subscription();
        subscription.add(
            (observer as Observable<any>).subscribe(message => {
                // Track each request so unsubscribing the returned handle also
                // cancels requests that are still waiting for responses.
                subscription.add(
                    this.send(message).subscribe({
                        next: response => {
                            this.console.log({ message: `Response received: ${JSON.stringify(response)}` });
                        },
                        error: (error: unknown) => {
                            const reason = error instanceof Error ? error.message : String(error);
                            this.console.log({ message: `Request failed: ${reason}` });
                        },
                    })
                );
            })
        );
        return subscription; // Return the Subscription (Unsubscribable)
    }

    /**
     * Emits `message` through the transmitter and streams back the matching responses.
     *
     * The returned Observable is cold: nothing is emitted until it is subscribed.
     * It completes when a matching message whose `data` is `'Complete'` arrives
     * (that marker message itself is not forwarded) or when the receiver stream
     * completes, and it errors when the receiver stream errors, when emitting
     * throws, or when the transmitter/receiver instance is missing.
     *
     * @param message Request to send; responses are matched on `header.messageID`.
     * @returns Observable of the response messages for this request.
     */
    send(message: FisMessage): Observable<FisMessage> {
        return new Observable((response: Observer<FisMessage>) => {
            if (!this.transmitterInstance || !this.receiverInstance) {
                response.error(new Error('Transmitter or receiver instance is missing.'));
                return;
            }

            // Listen BEFORE emitting so a reply delivered synchronously during
            // emit() cannot be missed.
            const subscription: Subscription = this.receiverInstance.getReceivables().pipe(
                filter(event => event.header.messageID === message.header.messageID),
            ).subscribe({
                next: (reply: FisMessage) => {
                    if (reply.data === 'Complete') {
                        response.complete()
                    } else {
                        response.next(reply)
                    }
                },
                // Surface receiver failures and completion to the caller instead of swallowing them.
                error: (error: unknown) => response.error(error),
                complete: () => response.complete(),
            })

            try {
                this.transmitterInstance.emit(message)
            } catch (error: unknown) {
                // Do not leave the receiver subscription dangling if the request never went out.
                subscription.unsubscribe();
                response.error(error);
                return;
            }

            // Clean up on unsubscription
            return () => {
                subscription.unsubscribe();
            }
        });
    }
}
|