grpc2.ts 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. import { Subject, from, interval, take } from 'rxjs';
  2. import { Message, MessageLog, ConnectionRequest } from '../interfaces/general.interface';
  3. import { GrpcServiceMethod } from '../services/grpc.service.method';
  4. import { readFileSync } from 'fs';
  5. import { ServerClientManager } from '../services/server-client.service';
  6. import mongoose from 'mongoose';
  7. mongoose.connect('mongodb://localhost:27017/grpc2')
  8. const Message = mongoose.model('Message', require('../models/message.schema'))
  9. // Subject for bidirectional communication
  10. const connectionService: ServerClientManager = new ServerClientManager(new GrpcServiceMethod())
  11. const messagesJSON: any = readFileSync('payload.json')
  12. let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial
  13. let targetserver: string = 'localhost:3000'
  14. let targetserver2: string = 'localhost:3002'
  15. let hostServer: string = 'localhost:3001'
  16. let intervalToStreamOutGoingMessage: number = 1
  17. let array: Message[] = []
  18. /* Simple Test: 1 to 1 */
  19. let connectionRequest: ConnectionRequest = {
  20. server: {
  21. name: 'g2',
  22. serverUrl: hostServer,
  23. connectionType: 'GRPC',
  24. messageToBePublishedFromApplication: new Subject<Message>()
  25. },
  26. client: {
  27. name: 'g1',
  28. targetServer: targetserver,
  29. connectionType: 'GRPC',
  30. messageToBeReceivedFromRemote: new Subject<Message>()
  31. }
  32. }
  33. connectionService.generateConnection(connectionRequest)
  34. // 10000th message == 848438e1-da50-4d98-aa12-e44d6d6a1489
  35. // let generateFakeMessagesToBePublished = stream().pipe(take(1000))
  36. // let generateFakeMessagesToBePublished = from(parsedMessages).pipe(take(1000))
  37. // generateFakeMessagesToBePublished.subscribe({
  38. // next: message => {
  39. // let payload: Message = {
  40. // id: hostServer,
  41. // message: message
  42. // }
  43. // connectionRequest.server.messageToBePublishedfromApplication.next(payload)
  44. // }
  45. // })
  46. connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
  47. next: response => {
  48. // if((response.message as MessageLog).appData.msgId == `ebf94479-44fe-470d-827c-9f1389396d6a`){
  49. // console.log(`Received the 1000th message. Running the test. Initiating server restart....`)
  50. // connectionService.restartServerInDuration(10)
  51. // }
  52. console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
  53. // Message.create(response)
  54. array.push(response)
  55. },
  56. error: error => console.error(error),
  57. complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  58. })
  59. /* Complex Test: 1 to 1*/
  60. // let connectionRequest: ConnectionRequest = {
  61. // server: {
  62. // name: 'g1',
  63. // serverUrl: hostServer,
  64. // connectionType: 'GRPC',
  65. // messageToBePublishedfromApplication: new Subject<Message>()
  66. // },
  67. // client: {
  68. // name: 'g2',
  69. // targetServer: targetserver,
  70. // connectionType: 'GRPC',
  71. // messageToBeReceivedFromRemote: new Subject<Message>()
  72. // }
  73. // }
  74. // connectionService.generateConnection(connectionRequest)
  75. // setTimeout(() => {
  76. // let message = {
  77. // id: parsedMessages[10].appData.msgId,
  78. // message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
  79. // }
  80. // connectionRequest.server.messageToBePublishedfromApplication.next(message)
  81. // }, 3000)
  82. // setTimeout(() => {
  83. // let message = {
  84. // id: parsedMessages[11].appData.msgId,
  85. // message: parsedMessages[11] // Choose this number, because i purposely use the 12th message and change the msgPayload property to query to emulate a request
  86. // }
  87. // connectionRequest.server.messageToBePublishedfromApplication.next(message)
  88. // }, 4000)
  89. // Handler for the incoming Messages from the other side.
  90. // connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
  91. // next: request => {
  92. // // Application logic comes here. This is where the asortment takes place, of decidiing whose messages it belongs of what it is
  93. // if ((request.message as MessageLog).appData.msgPayload == 'Query') {
  94. // generateFakeStreamResponse(request).subscribe({
  95. // next: (responseMessage: Message) => {
  96. // console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
  97. // connectionRequest.server.messageToBePublishedfromApplication.next(responseMessage)
  98. // },
  99. // error: error => console.error(error),
  100. // complete: () => {
  101. // console.log(`Stream request for ${request.id} is queued.`) // shpuld be indefinite
  102. // }
  103. // })
  104. // } else {
  105. // array.push(request)
  106. // console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
  107. // }
  108. // },
  109. // error: error => console.error(error),
  110. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  111. // })
  112. /* Simple Test: 1 to Many */
  113. // let connectionRequest: ConnectionRequest = {
  114. // server: {
  115. // name: 'g1',
  116. // serverUrl: hostServer,
  117. // connectionType: 'GRPC',
  118. // messageToBePublishedfromApplication: new Subject<Message>()
  119. // },
  120. // client: {
  121. // name: 'g2',
  122. // targetServer: targetserver,
  123. // connectionType: 'GRPC',
  124. // messageToBeReceivedFromRemote: new Subject<Message>()
  125. // }
  126. // }
  127. // let connectionRequest2: ConnectionRequest = {
  128. // server: {
  129. // name: 'g1',
  130. // serverUrl: hostServer,
  131. // connectionType: 'GRPC',
  132. // messageToBePublishedfromApplication: new Subject<Message>()
  133. // },
  134. // client: {
  135. // name: 'g3',
  136. // targetServer: targetserver2,
  137. // connectionType: 'GRPC',
  138. // messageToBeReceivedFromRemote: new Subject<Message>()
  139. // }
  140. // }
  141. // connectionService.generateConnection(connectionRequest)
  142. // connectionService.generateConnection(connectionRequest2)
  143. // let generateFakeMessagesToBePublished = stream().pipe(take(10))
  144. // generateFakeMessagesToBePublished.subscribe({
  145. // next: message => {
  146. // let payload: Message = {
  147. // id: hostServer,
  148. // message: message
  149. // }
  150. // connectionRequest.server.messageToBePublishedfromApplication.next(payload)
  151. // }
  152. // })
  153. // connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
  154. // next: request => {
  155. // console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
  156. // array.push(request)
  157. // },
  158. // error: error => console.error(error),
  159. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  160. // })
  161. // connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
  162. // next: request => {
  163. // console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
  164. // array.push(request)
  165. // },
  166. // error: error => console.error(error),
  167. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  168. // })
  169. // connectionRequest2.client.messageToBeReceivedFromRemote.subscribe({
  170. // next: request => {
  171. // array.push(request)
  172. // },
  173. // error: error => console.error(error),
  174. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  175. // })
  176. /* Complex Test: 1 to Many */
  177. // let connectionRequest: ConnectionRequest = {
  178. // server: {
  179. // name: 'g1',
  180. // serverUrl: hostServer,
  181. // connectionType: 'GRPC',
  182. // messageToBePublishedfromApplication: new Subject<Message>()
  183. // },
  184. // client: {
  185. // name: 'g2',
  186. // targetServer: targetserver,
  187. // connectionType: 'GRPC',
  188. // messageToBeReceivedFromRemote: new Subject<Message>()
  189. // }
  190. // }
  191. // let connectionRequest2: ConnectionRequest = {
  192. // server: {
  193. // name: 'g1',
  194. // serverUrl: hostServer,
  195. // connectionType: 'GRPC',
  196. // messageToBePublishedfromApplication: new Subject<Message>()
  197. // },
  198. // client: {
  199. // name: 'g3',
  200. // targetServer: targetserver2,
  201. // connectionType: 'GRPC',
  202. // messageToBeReceivedFromRemote: new Subject<Message>()
  203. // }
  204. // }
  205. // connectionService.generateConnection(connectionRequest)
  206. // connectionService.generateConnection(connectionRequest2)
  207. // setTimeout(() => {
  208. // let message = {
  209. // id: parsedMessages[10].appData.msgId,
  210. // message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
  211. // }
  212. // connectionRequest.server.messageToBePublishedfromApplication.next(message)
  213. // connectionRequest2.server.messageToBePublishedfromApplication.next(message)
  214. // }, 3000)
  215. // setTimeout(() => {
  216. // let message = {
  217. // id: parsedMessages[11].appData.msgId,
  218. // message: parsedMessages[11] // Choose this number, because i purposely use the 12th message and change the msgPayload property to query to emulate a request
  219. // }
  220. // connectionRequest.server.messageToBePublishedfromApplication.next(message)
  221. // connectionRequest2.server.messageToBePublishedfromApplication.next(message)
  222. // }, 4000)
  223. // connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
  224. // next: request => {
  225. // // Application logic comes here. This is where the asortment takes place, of deciding whose messages it belongs of what it is
  226. // if ((request.message as MessageLog).appData.msgPayload == 'Query') {
  227. // generateFakeStreamResponse(request).subscribe({
  228. // next: (responseMessage: Message) => {
  229. // connectionRequest.server.messageToBePublishedfromApplication.next(responseMessage)
  230. // },
  231. // error: error => console.error(error),
  232. // complete: () => {
  233. // console.log(`Stream request for ${request.id} is queued.`) // should be indefinite
  234. // }
  235. // })
  236. // } else {
  237. // console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
  238. // array.push(request)
  239. // console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
  240. // }
  241. // },
  242. // error: error => console.error(error),
  243. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  244. // })
  245. // connectionRequest2.client.messageToBeReceivedFromRemote.subscribe({
  246. // next: request => {
  247. // // Application logic comes here. This is where the asortment takes place, of deciding whose messages it belongs of what it is
  248. // if ((request.message as MessageLog).appData.msgPayload == 'Query') {
  249. // generateFakeStreamResponse(request).subscribe({
  250. // next: (responseMessage: Message) => {
  251. // connectionRequest2.server.messageToBePublishedfromApplication.next(responseMessage)
  252. // },
  253. // error: error => console.error(error),
  254. // complete: () => {
  255. // console.log(`Stream request for ${request.id} is queued.`) // should be indefinite
  256. // }
  257. // })
  258. // } else {
  259. // console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
  260. // array.push(request)
  261. // console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
  262. // }
  263. // },
  264. // error: error => console.error(error),
  265. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  266. // })
  267. // this is just to publish an array of fake data as a Subject
  268. function stream(): Subject<any> {
  269. let result: Subject<any> = new Subject()
  270. let messages: any[] = parsedMessages
  271. let count = 0
  272. const intervalId = setInterval(() => {
  273. result.next(messages[count]);
  274. count++;
  275. if (count >= 1000) {
  276. clearInterval(intervalId);
  277. result.complete();
  278. }
  279. }, intervalToStreamOutGoingMessage)
  280. return result
  281. }
  282. function generateFakeStreamResponse(request: any): Subject<any> {
  283. let res: Subject<any> = new Subject()
  284. stream().pipe(take(7)).subscribe({
  285. next: element => {
  286. let message = {
  287. id: request.id, // Caller's
  288. message: element
  289. }
  290. res.next(message)
  291. },
  292. error: error => console.error(error),
  293. complete: () => console.log(`Stream response for ${request.id} has been prepared.`)
  294. })
  295. return res
  296. }
  297. /* Checking the values by the end of the test */
  298. interval(5000).subscribe(() => {
  299. console.log(`All received data: ${array.length}`);
  300. });