From 9ccb152f0f9835392fecc5bd17c627985bbee798 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C3=A1lint=20Bokros?= Date: Mon, 18 Dec 2017 17:50:24 +0100 Subject: [PATCH] Create initial version of WebSocket service --- src/app/websocket.service.spec.ts | 15 +++++++++ src/app/websocket.service.ts | 56 +++++++++++++++++++++++++++++++ src/app/wsmessage.ts | 4 +++ 3 files changed, 75 insertions(+) create mode 100644 src/app/websocket.service.spec.ts create mode 100644 src/app/websocket.service.ts create mode 100644 src/app/wsmessage.ts diff --git a/src/app/websocket.service.spec.ts b/src/app/websocket.service.spec.ts new file mode 100644 index 0000000..d13769c --- /dev/null +++ b/src/app/websocket.service.spec.ts @@ -0,0 +1,15 @@ +import { TestBed, inject } from '@angular/core/testing'; + +import { WebSocketService } from './websocket.service'; + +describe('WebSocketService', () => { + beforeEach(() => { + TestBed.configureTestingModule({ + providers: [WebSocketService] + }); + }); + + it('should be created', inject([WebSocketService], (service: WebSocketService) => { + expect(service).toBeTruthy(); + })); +}); diff --git a/src/app/websocket.service.ts b/src/app/websocket.service.ts new file mode 100644 index 0000000..767f199 --- /dev/null +++ b/src/app/websocket.service.ts @@ -0,0 +1,56 @@ +import { Injectable } from '@angular/core'; +import { QueueingSubject } from 'queueing-subject'; +import { Observable } from 'rxjs/Observable'; +import websocketConnect from 'rxjs-websockets'; +import { WSMessage } from './wsmessage'; +import 'rxjs/add/operator/map'; +import 'rxjs/add/operator/filter'; +import 'rxjs/add/operator/share'; + + +function jsonWebsocketConnect(url: string, input: Observable, protocols?: string | string[]) { + const jsonInput = input.map(message => JSON.stringify(message)); + const { connectionStatus, messages } = websocketConnect(url, jsonInput, protocols); + const jsonMessages = messages.map(message => JSON.parse(message)); + return { connectionStatus, messages: jsonMessages }; +} + +@Injectable() +export class WebSocketService { + private uplink: QueueingSubject; + public downlink: Observable; + + constructor() { + + } + + public connect() { + if (this.downlink) { + return; + } + + // Using share() causes a single WebSocket to be created when the first + // observer subscribes. This socket is shared with subsequent observers + // and closed when the observer count falls to zero. + this.downlink = jsonWebsocketConnect( + 'ws://localhost:4242/ws', + this.uplink = new QueueingSubject() + ).messages.map(message => message).share(); + console.log('ws connected'); + } + + public observeAnchor(anchor: string): Observable { + return this.downlink.filter(message => message.anchor === anchor); + } + + public send(anchor: string, data: any): void { + // If the WebSocket is not connected then the QueueingSubject will ensure + // that messages are queued and delivered when the WebSocket reconnects. + // A regular Subject can be used to discard messages sent when the WebSocket + // is disconnected. + this.uplink.next({ + 'anchor': anchor, + 'data': data + }); + } +} diff --git a/src/app/wsmessage.ts b/src/app/wsmessage.ts new file mode 100644 index 0000000..d7a0827 --- /dev/null +++ b/src/app/wsmessage.ts @@ -0,0 +1,4 @@ +export class WSMessage { + anchor: string; + data: any; // TODO: sane annotation +}