mirror of
				https://github.com/avatao-content/frontend-tutorial-framework
				synced 2025-11-04 12:12:55 +00:00 
			
		
		
		
	Rewrite WebSocketService to use rxjs instead of 3rd party dep
This commit is contained in:
		@@ -1,26 +0,0 @@
 | 
			
		||||
// Based on https://github.com/ohjames/queueing-subject
 | 
			
		||||
 | 
			
		||||
import { Subject, Subscriber, Subscription } from 'rxjs';
 | 
			
		||||
 | 
			
		||||
export class QueueingSubject<T> extends Subject<T> {
 | 
			
		||||
  private queuedValues: T[] = [];
 | 
			
		||||
 | 
			
		||||
  next(value: T): void {
 | 
			
		||||
    if (this.closed || this.observers.length) {
 | 
			
		||||
      super.next(value);
 | 
			
		||||
    } else {
 | 
			
		||||
      this.queuedValues.push(value);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  _subscribe(subscriber: Subscriber<T>): Subscription {
 | 
			
		||||
    const subscription = super._subscribe(subscriber);
 | 
			
		||||
 | 
			
		||||
    if (this.queuedValues.length) {
 | 
			
		||||
      this.queuedValues.forEach(value => super.next(value));
 | 
			
		||||
      this.queuedValues.splice(0);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return subscription;
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
@@ -2,55 +2,34 @@
 | 
			
		||||
// All Rights Reserved. See LICENSE file for details.
 | 
			
		||||
 | 
			
		||||
import { Injectable } from '@angular/core';
 | 
			
		||||
import { QueueingSubject } from './queueing-subject';
 | 
			
		||||
import { Observable } from 'rxjs';
 | 
			
		||||
import websocketConnect from 'rxjs-websockets';
 | 
			
		||||
import { filter, map, share } from 'rxjs/operators';
 | 
			
		||||
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
 | 
			
		||||
import { filter, map } from 'rxjs/operators';
 | 
			
		||||
import { WebSocketMessage } from '../message-types/websocket-message';
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
function jsonWebsocketConnect(url: string, input: Observable<object>, protocols?: string | string[]) {
 | 
			
		||||
  const jsonInput = input.pipe(map(message => JSON.stringify(message)));
 | 
			
		||||
  const { connectionStatus, messages } = websocketConnect(url, jsonInput, protocols);
 | 
			
		||||
  const jsonMessages = messages.pipe(map(message => JSON.parse(message)));
 | 
			
		||||
  return { connectionStatus, messages: jsonMessages };
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@Injectable()
 | 
			
		||||
export class WebSocketService {
 | 
			
		||||
  private uplink: QueueingSubject<object>;
 | 
			
		||||
  public downlink: Observable<WebSocketMessage>;
 | 
			
		||||
  private ws: WebSocketSubject<WebSocketMessage>;
 | 
			
		||||
 | 
			
		||||
  constructor() {}
 | 
			
		||||
 | 
			
		||||
  public connect() {
 | 
			
		||||
    if (this.downlink) {
 | 
			
		||||
      return;
 | 
			
		||||
    if (!this.ws) {
 | 
			
		||||
      const wsproto = (location.protocol === 'https:') ? 'wss://' : 'ws://';
 | 
			
		||||
      const connAddr = wsproto + window.location.host + '/ws';
 | 
			
		||||
      this.ws = webSocket<WebSocketMessage>(connAddr);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    const wsproto = (location.protocol === 'https:') ? 'wss://' : 'ws://';
 | 
			
		||||
    this.downlink = jsonWebsocketConnect(
 | 
			
		||||
      wsproto + window.location.host + '/ws',
 | 
			
		||||
      this.uplink = new QueueingSubject<object>()
 | 
			
		||||
    ).messages.pipe(
 | 
			
		||||
        map(message => <WebSocketMessage> message),
 | 
			
		||||
        share()
 | 
			
		||||
      );
 | 
			
		||||
    console.log('ws connected');
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public observeKey<T extends WebSocketMessage>(key: string): Observable<T> {
 | 
			
		||||
    return this.downlink.pipe(filter(message => message.key.startsWith(key)), map(message => <T> message));
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public send(key: string, data?: any, trigger?: any): void {
 | 
			
		||||
    const message = {'key': key};
 | 
			
		||||
    if (data) { message['data'] = data; }
 | 
			
		||||
    if (trigger) { message['trigger'] = trigger; }
 | 
			
		||||
    this.sendJSON(message);
 | 
			
		||||
    return this.ws.pipe(
 | 
			
		||||
      filter(message => message.key.startsWith(key)),
 | 
			
		||||
      map(message => <T> message)
 | 
			
		||||
    );
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public sendJSON(json: any) {
 | 
			
		||||
    this.uplink.next(json);
 | 
			
		||||
    this.ws.next(json);
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user