Create initial version of WebSocket service

This commit is contained in:
Bálint Bokros 2017-12-18 17:50:24 +01:00
parent f14be39faf
commit 9ccb152f0f
3 changed files with 75 additions and 0 deletions

View File

@ -0,0 +1,15 @@
import { TestBed, inject } from '@angular/core/testing';
import { WebSocketService } from './websocket.service';
describe('WebSocketService', () => {
beforeEach(() => {
TestBed.configureTestingModule({
providers: [WebSocketService]
});
});
it('should be created', inject([WebSocketService], (service: WebSocketService) => {
expect(service).toBeTruthy();
}));
});

View File

@ -0,0 +1,56 @@
import { Injectable } from '@angular/core';
import { QueueingSubject } from 'queueing-subject';
import { Observable } from 'rxjs/Observable';
import websocketConnect from 'rxjs-websockets';
import { WSMessage } from './wsmessage';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/filter';
import 'rxjs/add/operator/share';
function jsonWebsocketConnect(url: string, input: Observable<object>, protocols?: string | string[]) {
const jsonInput = input.map(message => JSON.stringify(message));
const { connectionStatus, messages } = websocketConnect(url, jsonInput, protocols);
const jsonMessages = messages.map(message => JSON.parse(message));
return { connectionStatus, messages: jsonMessages };
}
@Injectable()
export class WebSocketService {
private uplink: QueueingSubject<object>;
public downlink: Observable<WSMessage>;
constructor() {
}
public connect() {
if (this.downlink) {
return;
}
// Using share() causes a single WebSocket to be created when the first
// observer subscribes. This socket is shared with subsequent observers
// and closed when the observer count falls to zero.
this.downlink = jsonWebsocketConnect(
'ws://localhost:4242/ws',
this.uplink = new QueueingSubject<object>()
).messages.map(message => <WSMessage> message).share();
console.log('ws connected');
}
public observeAnchor(anchor: string): Observable<WSMessage> {
return this.downlink.filter(message => message.anchor === anchor);
}
public send(anchor: string, data: any): void {
// If the WebSocket is not connected then the QueueingSubject will ensure
// that messages are queued and delivered when the WebSocket reconnects.
// A regular Subject can be used to discard messages sent when the WebSocket
// is disconnected.
this.uplink.next({
'anchor': anchor,
'data': data
});
}
}

4
src/app/wsmessage.ts Normal file
View File

@ -0,0 +1,4 @@
export class WSMessage {
anchor: string;
data: any; // TODO: sane annotation
}