123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051 |
- /* This is more unique transmission component, because it's supposed to emulate a conventional request response call. But the underlying
- mechanism is still using message. Here as you will see, it is basically taking the already instantiated transmitter and receiver components
- and basically just filtering the responses based on whatever identifier it needs for the orignal request. */
- import { MessageTransmissionBase } from "../base/msg.transmission.base";
- import { filter, Observable, Observer, Subscription } from "rxjs";
- import { FisMessage, MessageReceiverInterface, MessageRequestResponseInterface, MessageTransmitterInterface, TransmissionProfile } from "../interface/interface";
- import ConsoleLogger from "../utils/log.utils";
- export class MessageTransmissionRequestResponse extends MessageTransmissionBase implements MessageRequestResponseInterface {
- private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionRequestResponse`, ['transmission'])
- protected transmitterInstance!: MessageTransmitterInterface;
- protected receiverInstance!: MessageReceiverInterface;
- constructor(profile: TransmissionProfile, transmitterInstance: MessageTransmitterInterface, receiverInstance: MessageReceiverInterface) {
- super()
- this.profile = profile
- this.console.log({ message: `Constructing Request Response Transmission for Receiving target: ${this.profile.target}` })
- this.transmitterInstance = transmitterInstance
- this.receiverInstance = receiverInstance
- }
- send(message: FisMessage): Observable<FisMessage> {
- return new Observable((response: Observer<FisMessage>) => {
- // logic here
- if (this.transmitterInstance && this.receiverInstance) {
- this.transmitterInstance.emit(message)
- const subscription: Subscription = this.receiverInstance.getReceivables().pipe(
- filter(event => event.header.messageID === message.header.messageID),
- ).subscribe({
- next: (message: FisMessage) => {
- if (message.data == 'Complete') {
- response.complete()
- } else {
- response.next(message)
- }
- },
- error: error => console.error(error)
- })
- // Clean up on unsubscription
- return () => {
- subscription.unsubscribe();
- }
- } else {
- response.error(new Error('Transmitter or receiver instance is missing.'));
- return;
- }
- });
- }
- }
|