live_srv.ts 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. ///<reference path="../../headers/common.d.ts" />
  2. import _ from 'lodash';
  3. import config from 'app/core/config';
  4. import coreModule from 'app/core/core_module';
  5. import {Observable} from 'vendor/npm/rxjs/Observable';
  6. export class LiveSrv {
  7. conn: any;
  8. observers: any;
  9. initPromise: any;
  10. constructor() {
  11. this.observers = {};
  12. }
  13. getWebSocketUrl() {
  14. var l = window.location;
  15. return ((l.protocol === "https:") ? "wss://" : "ws://") + l.host + config.appSubUrl + '/ws';
  16. }
  17. getConnection() {
  18. if (this.initPromise) {
  19. return this.initPromise;
  20. }
  21. if (this.conn && this.conn.readyState === 1) {
  22. return Promise.resolve(this.conn);
  23. }
  24. this.initPromise = new Promise((resolve, reject) => {
  25. console.log('Live: connecting...');
  26. this.conn = new WebSocket(this.getWebSocketUrl());
  27. this.conn.onclose = (evt) => {
  28. console.log("Live: websocket onclose", evt);
  29. reject({message: 'Connection closed'});
  30. this.initPromise = null;
  31. setTimeout(this.reconnect.bind(this), 2000);
  32. };
  33. this.conn.onmessage = (evt) => {
  34. this.handleMessage(evt.data);
  35. };
  36. this.conn.onerror = (evt) => {
  37. this.initPromise = null;
  38. reject({message: 'Connection error'});
  39. console.log("Live: websocket error", evt);
  40. };
  41. this.conn.onopen = (evt) => {
  42. console.log('opened');
  43. this.initPromise = null;
  44. resolve(this.conn);
  45. };
  46. });
  47. return this.initPromise;
  48. }
  49. handleMessage(message) {
  50. message = JSON.parse(message);
  51. if (!message.stream) {
  52. console.log("Error: stream message without stream!", message);
  53. return;
  54. }
  55. var observer = this.observers[message.stream];
  56. if (!observer) {
  57. this.removeObserver(message.stream, null);
  58. return;
  59. }
  60. observer.next(message);
  61. }
  62. reconnect() {
  63. // no need to reconnect if no one cares
  64. if (_.keys(this.observers).length === 0) {
  65. return;
  66. }
  67. console.log('LiveSrv: Reconnecting');
  68. this.getConnection().then(conn => {
  69. _.each(this.observers, (value, key) => {
  70. this.send({action: 'subscribe', stream: key});
  71. });
  72. });
  73. }
  74. send(data) {
  75. this.conn.send(JSON.stringify(data));
  76. }
  77. addObserver(stream, observer) {
  78. this.observers[stream] = observer;
  79. this.getConnection().then(conn => {
  80. this.send({action: 'subscribe', stream: stream});
  81. });
  82. }
  83. removeObserver(stream, observer) {
  84. console.log('unsubscribe', stream);
  85. delete this.observers[stream];
  86. this.getConnection().then(conn => {
  87. this.send({action: 'unsubscribe', stream: stream});
  88. });
  89. }
  90. subscribe(streamName) {
  91. console.log('LiveSrv.subscribe: ' + streamName);
  92. return Observable.create(observer => {
  93. this.addObserver(streamName, observer);
  94. return () => {
  95. this.removeObserver(streamName, observer);
  96. };
  97. });
  98. // return this.init().then(() => {
  99. // this.send({action: 'subscribe', stream: name});
  100. // });
  101. }
  102. }
  103. var instance = new LiveSrv();
  104. export {instance as liveSrv};