diff --git a/src/app/services/queueing-subject.ts b/src/app/services/queueing-subject.ts deleted file mode 100644 index 7434546..0000000 --- a/src/app/services/queueing-subject.ts +++ /dev/null @@ -1,26 +0,0 @@ -// Based on https://github.com/ohjames/queueing-subject - -import { Subject, Subscriber, Subscription } from 'rxjs'; - -export class QueueingSubject extends Subject { - private queuedValues: T[] = []; - - next(value: T): void { - if (this.closed || this.observers.length) { - super.next(value); - } else { - this.queuedValues.push(value); - } - } - - _subscribe(subscriber: Subscriber): Subscription { - const subscription = super._subscribe(subscriber); - - if (this.queuedValues.length) { - this.queuedValues.forEach(value => super.next(value)); - this.queuedValues.splice(0); - } - - return subscription; - } -} diff --git a/src/app/services/websocket.service.ts b/src/app/services/websocket.service.ts index 724cba0..b7a4dcc 100644 --- a/src/app/services/websocket.service.ts +++ b/src/app/services/websocket.service.ts @@ -2,55 +2,34 @@ // All Rights Reserved. See LICENSE file for details. import { Injectable } from '@angular/core'; -import { QueueingSubject } from './queueing-subject'; import { Observable } from 'rxjs'; -import websocketConnect from 'rxjs-websockets'; -import { filter, map, share } from 'rxjs/operators'; +import { webSocket, WebSocketSubject } from 'rxjs/webSocket'; +import { filter, map } from 'rxjs/operators'; import { WebSocketMessage } from '../message-types/websocket-message'; -function jsonWebsocketConnect(url: string, input: Observable, protocols?: string | string[]) { - const jsonInput = input.pipe(map(message => JSON.stringify(message))); - const { connectionStatus, messages } = websocketConnect(url, jsonInput, protocols); - const jsonMessages = messages.pipe(map(message => JSON.parse(message))); - return { connectionStatus, messages: jsonMessages }; -} - @Injectable() export class WebSocketService { - private uplink: QueueingSubject; - public downlink: Observable; + private ws: WebSocketSubject; constructor() {} public connect() { - if (this.downlink) { - return; + if (!this.ws) { + const wsproto = (location.protocol === 'https:') ? 'wss://' : 'ws://'; + const connAddr = wsproto + window.location.host + '/ws'; + this.ws = webSocket(connAddr); } - - const wsproto = (location.protocol === 'https:') ? 'wss://' : 'ws://'; - this.downlink = jsonWebsocketConnect( - wsproto + window.location.host + '/ws', - this.uplink = new QueueingSubject() - ).messages.pipe( - map(message => message), - share() - ); - console.log('ws connected'); } public observeKey(key: string): Observable { - return this.downlink.pipe(filter(message => message.key.startsWith(key)), map(message => message)); - } - - public send(key: string, data?: any, trigger?: any): void { - const message = {'key': key}; - if (data) { message['data'] = data; } - if (trigger) { message['trigger'] = trigger; } - this.sendJSON(message); + return this.ws.pipe( + filter(message => message.key.startsWith(key)), + map(message => message) + ); } public sendJSON(json: any) { - this.uplink.next(json); + this.ws.next(json); } }