import { Injectable } from '@angular/core'; import { Observable, Subject, Subscription } from 'rxjs'; import { webSocket, WebSocketSubject } from 'rxjs/webSocket'; import { filter, map } from 'rxjs/operators'; import { WebSocketMessage } from '../message-types/websocket-message'; export enum Scope { ZMQ = 'zmq', WEBSOCKET = 'websocket' } export enum Intent { CONTROL = 'control', EVENT = 'event' } @Injectable() export class WebSocketService { private ws: WebSocketSubject; private subject: Subject = new Subject(); private subscription: Subscription; constructor() {} public connect() { if (!this.ws) { if (this.subscription) { this.subscription.unsubscribe(); } const wsproto = (location.protocol === 'https:') ? 'wss://' : 'ws://'; const connAddr = wsproto + window.location.host + '/ws'; this.ws = webSocket({ url: connAddr, closeObserver: { next: closeEvent => { this.ws = null; this.connect(); } } }); this.subscription = this.ws.subscribe(msg => this.subject.next(msg)); } } public observeControl(key: string): Observable { return this.observeAll(key).pipe( filter(message => message.intent !== Intent.EVENT) ); } public observeAll(key: string): Observable { return this.subject.pipe( filter(message => message.key.startsWith(key)), map(message => message) ); } public send(json: any) { this.ws.next(json); } }