live_srv.ts 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. import _ from 'lodash';
  2. import config from 'app/core/config';
  3. import { Observable } from 'rxjs/Observable';
  4. export class LiveSrv {
  5. conn: any;
  6. observers: any;
  7. initPromise: any;
  8. constructor() {
  9. this.observers = {};
  10. }
  11. getWebSocketUrl() {
  12. var l = window.location;
  13. return (l.protocol === 'https:' ? 'wss://' : 'ws://') + l.host + config.appSubUrl + '/ws';
  14. }
  15. getConnection() {
  16. if (this.initPromise) {
  17. return this.initPromise;
  18. }
  19. if (this.conn && this.conn.readyState === 1) {
  20. return Promise.resolve(this.conn);
  21. }
  22. this.initPromise = new Promise((resolve, reject) => {
  23. console.log('Live: connecting...');
  24. this.conn = new WebSocket(this.getWebSocketUrl());
  25. this.conn.onclose = evt => {
  26. console.log('Live: websocket onclose', evt);
  27. reject({ message: 'Connection closed' });
  28. this.initPromise = null;
  29. setTimeout(this.reconnect.bind(this), 2000);
  30. };
  31. this.conn.onmessage = evt => {
  32. this.handleMessage(evt.data);
  33. };
  34. this.conn.onerror = evt => {
  35. this.initPromise = null;
  36. reject({ message: 'Connection error' });
  37. console.log('Live: websocket error', evt);
  38. };
  39. this.conn.onopen = evt => {
  40. console.log('opened');
  41. this.initPromise = null;
  42. resolve(this.conn);
  43. };
  44. });
  45. return this.initPromise;
  46. }
  47. handleMessage(message) {
  48. message = JSON.parse(message);
  49. if (!message.stream) {
  50. console.log('Error: stream message without stream!', message);
  51. return;
  52. }
  53. var observer = this.observers[message.stream];
  54. if (!observer) {
  55. this.removeObserver(message.stream, null);
  56. return;
  57. }
  58. observer.next(message);
  59. }
  60. reconnect() {
  61. // no need to reconnect if no one cares
  62. if (_.keys(this.observers).length === 0) {
  63. return;
  64. }
  65. console.log('LiveSrv: Reconnecting');
  66. this.getConnection().then(conn => {
  67. _.each(this.observers, (value, key) => {
  68. this.send({ action: 'subscribe', stream: key });
  69. });
  70. });
  71. }
  72. send(data) {
  73. this.conn.send(JSON.stringify(data));
  74. }
  75. addObserver(stream, observer) {
  76. this.observers[stream] = observer;
  77. this.getConnection().then(conn => {
  78. this.send({ action: 'subscribe', stream: stream });
  79. });
  80. }
  81. removeObserver(stream, observer) {
  82. console.log('unsubscribe', stream);
  83. delete this.observers[stream];
  84. this.getConnection().then(conn => {
  85. this.send({ action: 'unsubscribe', stream: stream });
  86. });
  87. }
  88. subscribe(streamName) {
  89. console.log('LiveSrv.subscribe: ' + streamName);
  90. return Observable.create(observer => {
  91. this.addObserver(streamName, observer);
  92. return () => {
  93. this.removeObserver(streamName, observer);
  94. };
  95. });
  96. // return this.init().then(() => {
  97. // this.send({action: 'subscribe', stream: name});
  98. // });
  99. }
  100. }
  101. var instance = new LiveSrv();
  102. export { instance as liveSrv };