live_srv.ts 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. import _ from 'lodash';
  2. import config from 'app/core/config';
  3. import { Observable } from 'rxjs/Observable';
  4. export class LiveSrv {
  5. conn: any;
  6. observers: any;
  7. initPromise: any;
  8. constructor() {
  9. this.observers = {};
  10. }
  11. getWebSocketUrl() {
  12. var l = window.location;
  13. return (
  14. (l.protocol === 'https:' ? 'wss://' : 'ws://') +
  15. l.host +
  16. config.appSubUrl +
  17. '/ws'
  18. );
  19. }
  20. getConnection() {
  21. if (this.initPromise) {
  22. return this.initPromise;
  23. }
  24. if (this.conn && this.conn.readyState === 1) {
  25. return Promise.resolve(this.conn);
  26. }
  27. this.initPromise = new Promise((resolve, reject) => {
  28. console.log('Live: connecting...');
  29. this.conn = new WebSocket(this.getWebSocketUrl());
  30. this.conn.onclose = evt => {
  31. console.log('Live: websocket onclose', evt);
  32. reject({ message: 'Connection closed' });
  33. this.initPromise = null;
  34. setTimeout(this.reconnect.bind(this), 2000);
  35. };
  36. this.conn.onmessage = evt => {
  37. this.handleMessage(evt.data);
  38. };
  39. this.conn.onerror = evt => {
  40. this.initPromise = null;
  41. reject({ message: 'Connection error' });
  42. console.log('Live: websocket error', evt);
  43. };
  44. this.conn.onopen = evt => {
  45. console.log('opened');
  46. this.initPromise = null;
  47. resolve(this.conn);
  48. };
  49. });
  50. return this.initPromise;
  51. }
  52. handleMessage(message) {
  53. message = JSON.parse(message);
  54. if (!message.stream) {
  55. console.log('Error: stream message without stream!', message);
  56. return;
  57. }
  58. var observer = this.observers[message.stream];
  59. if (!observer) {
  60. this.removeObserver(message.stream, null);
  61. return;
  62. }
  63. observer.next(message);
  64. }
  65. reconnect() {
  66. // no need to reconnect if no one cares
  67. if (_.keys(this.observers).length === 0) {
  68. return;
  69. }
  70. console.log('LiveSrv: Reconnecting');
  71. this.getConnection().then(conn => {
  72. _.each(this.observers, (value, key) => {
  73. this.send({ action: 'subscribe', stream: key });
  74. });
  75. });
  76. }
  77. send(data) {
  78. this.conn.send(JSON.stringify(data));
  79. }
  80. addObserver(stream, observer) {
  81. this.observers[stream] = observer;
  82. this.getConnection().then(conn => {
  83. this.send({ action: 'subscribe', stream: stream });
  84. });
  85. }
  86. removeObserver(stream, observer) {
  87. console.log('unsubscribe', stream);
  88. delete this.observers[stream];
  89. this.getConnection().then(conn => {
  90. this.send({ action: 'unsubscribe', stream: stream });
  91. });
  92. }
  93. subscribe(streamName) {
  94. console.log('LiveSrv.subscribe: ' + streamName);
  95. return Observable.create(observer => {
  96. this.addObserver(streamName, observer);
  97. return () => {
  98. this.removeObserver(streamName, observer);
  99. };
  100. });
  101. // return this.init().then(() => {
  102. // this.send({action: 'subscribe', stream: name});
  103. // });
  104. }
  105. }
  106. var instance = new LiveSrv();
  107. export { instance as liveSrv };