live_srv.ts 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. ///<reference path="../../headers/common.d.ts" />
  2. import _ from 'lodash';
  3. import config from 'app/core/config';
  4. import {Observable} from 'rxjs/Observable';
  5. export class LiveSrv {
  6. conn: any;
  7. observers: any;
  8. initPromise: any;
  9. constructor() {
  10. this.observers = {};
  11. }
  12. getWebSocketUrl() {
  13. var l = window.location;
  14. return ((l.protocol === "https:") ? "wss://" : "ws://") + l.host + config.appSubUrl + '/ws';
  15. }
  16. getConnection() {
  17. if (this.initPromise) {
  18. return this.initPromise;
  19. }
  20. if (this.conn && this.conn.readyState === 1) {
  21. return Promise.resolve(this.conn);
  22. }
  23. this.initPromise = new Promise((resolve, reject) => {
  24. console.log('Live: connecting...');
  25. this.conn = new WebSocket(this.getWebSocketUrl());
  26. this.conn.onclose = (evt) => {
  27. console.log("Live: websocket onclose", evt);
  28. reject({message: 'Connection closed'});
  29. this.initPromise = null;
  30. setTimeout(this.reconnect.bind(this), 2000);
  31. };
  32. this.conn.onmessage = (evt) => {
  33. this.handleMessage(evt.data);
  34. };
  35. this.conn.onerror = (evt) => {
  36. this.initPromise = null;
  37. reject({message: 'Connection error'});
  38. console.log("Live: websocket error", evt);
  39. };
  40. this.conn.onopen = (evt) => {
  41. console.log('opened');
  42. this.initPromise = null;
  43. resolve(this.conn);
  44. };
  45. });
  46. return this.initPromise;
  47. }
  48. handleMessage(message) {
  49. message = JSON.parse(message);
  50. if (!message.stream) {
  51. console.log("Error: stream message without stream!", message);
  52. return;
  53. }
  54. var observer = this.observers[message.stream];
  55. if (!observer) {
  56. this.removeObserver(message.stream, null);
  57. return;
  58. }
  59. observer.next(message);
  60. }
  61. reconnect() {
  62. // no need to reconnect if no one cares
  63. if (_.keys(this.observers).length === 0) {
  64. return;
  65. }
  66. console.log('LiveSrv: Reconnecting');
  67. this.getConnection().then(conn => {
  68. _.each(this.observers, (value, key) => {
  69. this.send({action: 'subscribe', stream: key});
  70. });
  71. });
  72. }
  73. send(data) {
  74. this.conn.send(JSON.stringify(data));
  75. }
  76. addObserver(stream, observer) {
  77. this.observers[stream] = observer;
  78. this.getConnection().then(conn => {
  79. this.send({action: 'subscribe', stream: stream});
  80. });
  81. }
  82. removeObserver(stream, observer) {
  83. console.log('unsubscribe', stream);
  84. delete this.observers[stream];
  85. this.getConnection().then(conn => {
  86. this.send({action: 'unsubscribe', stream: stream});
  87. });
  88. }
  89. subscribe(streamName) {
  90. console.log('LiveSrv.subscribe: ' + streamName);
  91. return Observable.create(observer => {
  92. this.addObserver(streamName, observer);
  93. return () => {
  94. this.removeObserver(streamName, observer);
  95. };
  96. });
  97. // return this.init().then(() => {
  98. // this.send({action: 'subscribe', stream: name});
  99. // });
  100. }
  101. }
  102. var instance = new LiveSrv();
  103. export {instance as liveSrv};