123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354 |
- import { MessageTransmissionBase } from "../base/msg.transmission.base";
- import { filter, map, Observable, Observer, Subject, Subscription, takeWhile } from "rxjs";
- import { v4 as uuidv4 } from 'uuid'
- import { MessageTransmissionReceiver } from "./msg.transmission.receiver";
- import { MessageTransmissionTransmitter } from "./msg.transmission.transmitter";
- import { AdapterInterface, FisMessage, GeneralEvent, MessageRequestResponseInterface, TransportMessage } from "../interface/interface";
- import { WrappedMessage } from "../utils/message.ordering";
- export class MessageTransmissionRequestResponse extends MessageTransmissionBase implements MessageRequestResponseInterface {
- transmitterInstance!: MessageTransmissionTransmitter;
- receiverInstance!: MessageTransmissionReceiver;
- constructor(clientId: string, transmitterInstance: MessageTransmissionTransmitter, receiverInstance: MessageTransmissionReceiver, event: Subject<GeneralEvent<any>>) {
- super()
- this.clientId = clientId
- this.transmitterInstance = transmitterInstance
- this.receiverInstance = receiverInstance
- this.event = event
- }
- send(message: FisMessage): Observable<FisMessage> {
- return new Observable((response: Observer<FisMessage>) => {
- // logic here
- if (this.transmitterInstance && this.receiverInstance) {
- this.transmitterInstance.emit(message)
- const subscription: Subscription = this.receiverInstance.getIncoming().pipe(
- filter(event => event.event === `New Message`),
- filter(event => (((event.data as TransportMessage)?.payload as WrappedMessage)?.payload as FisMessage)?.header.messageID === message.header.messageID),
- map(event => {
- return (event.data as TransportMessage).payload as FisMessage
- })
- ).subscribe({
- next: (message: FisMessage) => {
- if (message.data == 'Complete') {
- response.complete()
- } else {
- response.next(message)
- }
- },
- error: error => console.error(error)
- })
- // Clean up on unsubscription
- return () => {
- subscription.unsubscribe();
- }
- } else {
- response.error(new Error('Transmitter or receiver instance is missing.'));
- return;
- }
- });
- }
- }
|