59 lines
2.0 KiB
TypeScript
59 lines
2.0 KiB
TypeScript
import { Injectable } from '@angular/core';
|
|
import { QueueingSubject } from 'queueing-subject';
|
|
import { Observable } from 'rxjs/Observable';
|
|
import websocketConnect from 'rxjs-websockets';
|
|
import { filter, map, share } from 'rxjs/operators';
|
|
import { WSMessage } from './wsmessage';
|
|
|
|
|
|
/**
 * Wraps `websocketConnect` so that callers exchange plain objects instead of
 * raw strings.
 *
 * Every value emitted by `input` is serialised with `JSON.stringify` before
 * being handed to `websocketConnect`, and every value emitted by the
 * resulting `messages` stream is passed through `JSON.parse`.
 *
 * NOTE(review): a frame that is not valid JSON makes `JSON.parse` throw inside
 * `map`, which surfaces as an error on the returned `messages` stream.
 *
 * @param url       Address passed unchanged to `websocketConnect`.
 * @param input     Stream of outgoing objects to serialise and send.
 * @param protocols Optional sub-protocol(s), forwarded unchanged.
 * @returns The `connectionStatus` produced by `websocketConnect` together
 *          with a `messages` stream of parsed JSON values.
 */
function jsonWebsocketConnect(url: string, input: Observable<object>, protocols?: string | string[]) {
  // Outgoing direction: object -> JSON text.
  const serialized = input.pipe(map(outgoing => JSON.stringify(outgoing)));

  const socket = websocketConnect(url, serialized, protocols);

  return {
    connectionStatus: socket.connectionStatus,
    // Incoming direction: JSON text -> parsed value.
    messages: socket.messages.pipe(map(incoming => JSON.parse(incoming))),
  };
}
|
|
|
|
/**
 * Angular service exposing a JSON WebSocket to the application's `/ws`
 * endpoint as a pair of streams: an outgoing queue (`send`) and a shared
 * incoming stream (`downlink` / `observeAnchor`).
 *
 * Messages are plain objects of the form `{ anchor, data }`; `anchor` is the
 * key consumers filter on in `observeAnchor`.
 */
@Injectable()
export class WebSocketService {
  /**
   * Outgoing message queue. Created eagerly so that `send()` is safe to call
   * before `connect()`; the QueueingSubject holds messages until the socket
   * pipeline subscribes to it.
   */
  private readonly uplink = new QueueingSubject<object>();

  /** Shared stream of incoming messages; assigned by `connect()`. */
  public downlink: Observable<WSMessage<undefined>>;

  /**
   * Builds the incoming message pipeline. Idempotent: calling it again after
   * `downlink` has been set is a no-op.
   *
   * The URL is derived from the page location: `wss://` when the page is
   * served over `https:`, `ws://` otherwise, followed by the current host and
   * the `/ws` path.
   */
  public connect(): void {
    if (this.downlink) {
      return;
    }

    const wsproto = window.location.protocol === 'https:' ? 'wss://' : 'ws://';

    this.downlink = jsonWebsocketConnect(
      wsproto + window.location.host + '/ws',
      this.uplink
    ).messages.pipe(
      map(message => message as WSMessage<undefined>),
      // Using share() causes a single WebSocket to be created when the first
      // observer subscribes. This socket is shared with subsequent observers
      // and closed when the observer count falls to zero.
      share()
    );

    // NOTE(review): this is logged once the pipeline has been built; because
    // of share() the socket itself is only opened on first subscription.
    console.log('ws connected');
  }

  /**
   * Returns the incoming messages whose `anchor` equals the given value.
   *
   * Calls `connect()` first so that this method can be used without a prior
   * explicit `connect()` call.
   *
   * @param anchor Anchor value to match with strict equality.
   * @returns A stream of matching messages, typed by the caller as
   *          `WSMessage<T>`; the payload is not validated at runtime.
   */
  public observeAnchor<T>(anchor: string): Observable<WSMessage<T>> {
    this.connect();
    return this.downlink.pipe(filter(message => message.anchor === anchor));
  }

  /**
   * Queues an `{ anchor, data }` message for delivery over the WebSocket.
   *
   * @param anchor Anchor identifying the message's target.
   * @param data   Payload; serialised with `JSON.stringify` on the way out.
   */
  public send(anchor: string, data: any): void {
    // If the WebSocket is not connected then the QueueingSubject will ensure
    // that messages are queued and delivered when the WebSocket reconnects.
    // A regular Subject can be used to discard messages sent when the
    // WebSocket is disconnected.
    this.uplink.next({ anchor, data });
  }
}
|