frontend-tutorial-framework/src/app/websocket.service.ts

59 lines
2.0 KiB
TypeScript

import { Injectable } from '@angular/core';
import { QueueingSubject } from 'queueing-subject';
import { Observable } from 'rxjs/Observable';
import websocketConnect from 'rxjs-websockets';
import { filter, map, share } from 'rxjs/operators';
import { WSMessage } from './wsmessage';
/**
 * Wraps `websocketConnect` so that callers exchange plain objects instead of
 * raw strings.
 *
 * Every value emitted by `input` is passed through `JSON.stringify` before
 * being handed to `websocketConnect`, and every message it delivers is passed
 * through `JSON.parse` before being re-emitted.
 *
 * @param url       WebSocket URL, forwarded unchanged to `websocketConnect`.
 * @param input     Stream of outgoing objects to serialise and send.
 * @param protocols Optional sub-protocol(s), forwarded unchanged.
 * @returns The `connectionStatus` returned by `websocketConnect`, plus a
 *          `messages` stream of JSON-parsed incoming messages.
 */
function jsonWebsocketConnect(url: string, input: Observable<object>, protocols?: string | string[]) {
  const serialized = input.pipe(map(payload => JSON.stringify(payload)));
  const connection = websocketConnect(url, serialized, protocols);
  return {
    connectionStatus: connection.connectionStatus,
    messages: connection.messages.pipe(map(raw => JSON.parse(raw)))
  };
}
/**
 * Angular service that exchanges anchor-tagged JSON messages with the `/ws`
 * endpoint on the host the page was loaded from.
 *
 * Outgoing messages are pushed into `uplink`; incoming messages are exposed
 * through `downlink` and can be filtered per anchor with `observeAnchor()`.
 */
@Injectable()
export class WebSocketService {
  // Created eagerly (rather than inside connect()) so that send() can be
  // called before connect() without dereferencing an undefined subject.
  private readonly uplink = new QueueingSubject<object>();

  // Stream of every incoming message; remains unset until connect() has run.
  public downlink: Observable<WSMessage<undefined>>;

  /**
   * Builds the shared `downlink` stream. Calling it again after `downlink`
   * has been set is a no-op.
   */
  public connect() {
    if (this.downlink) {
      return;
    }
    // Use the secure WebSocket scheme when the page itself was served over HTTPS.
    const wsproto = (window.location.protocol === 'https:') ? 'wss://' : 'ws://';
    this.downlink = jsonWebsocketConnect(
      wsproto + window.location.host + '/ws',
      this.uplink
    ).messages.pipe(
      map(message => message as WSMessage<undefined>),
      // Using share() causes a single WebSocket to be created when the first
      // observer subscribes. This socket is shared with subsequent observers
      // and closed when the observer count falls to zero.
      share()
    );
    console.log('ws connected');
  }

  /**
   * Returns the incoming messages whose `anchor` field equals `anchor`.
   *
   * Calls `connect()` first, so this method is safe to use even if the caller
   * has not connected explicitly. The payload type `T` is a compile-time
   * annotation only; nothing here validates it at runtime.
   *
   * @param anchor Anchor value to match against each incoming message.
   * @returns Observable of the matching messages.
   */
  public observeAnchor<T>(anchor: string): Observable<WSMessage<T>> {
    this.connect();
    return this.downlink.pipe(filter(message => message.anchor === anchor));
  }

  /**
   * Sends `{ anchor, data }` to the server as a JSON message.
   *
   * @param anchor Anchor the message is addressed to.
   * @param data   Payload placed in the message's `data` field.
   */
  public send(anchor: string, data: any): void {
    // If the WebSocket is not connected then the QueueingSubject will ensure
    // that messages are queued and delivered when the WebSocket (re)connects.
    // A regular Subject can be used to discard messages sent when the WebSocket
    // is disconnected.
    this.uplink.next({
      'anchor': anchor,
      'data': data
    });
  }
}