101 lines
2.9 KiB
TypeScript
101 lines
2.9 KiB
TypeScript
import Debug from 'debug';
|
|
import {Publisher, Subscriber} from 'zeromq';
|
|
import {RhizomeNode} from './node';
|
|
import {PeerAddress} from './peers';
|
|
// Module-level logger created with the `debug` package under the 'rz:pub-sub' namespace.
const debug = Debug('rz:pub-sub');
|
|
|
|
/**
 * Callback invoked by a `Subscription` for each message it receives.
 *
 * `sender` is the `PeerAddress` parsed from the message's sender frame and
 * `msg` is the message payload converted to a string.
 */
export type SubscribedMessageHandler = (sender: PeerAddress, msg: string) => void;
|
|
|
|
// TODO: Allow subscribing to multiple topics on one socket
|
|
/**
 * A single-topic subscription to one peer's ZeroMQ publisher.
 *
 * Each instance owns its own `Subscriber` socket. Constructing a subscription
 * only records its parameters; the socket is not connected until `start()`
 * is called.
 */
export class Subscription {
  /** ZeroMQ subscriber socket owned by this subscription. */
  sock = new Subscriber();
  /** Topic passed to `sock.subscribe()`. */
  topic: string;
  /** Address of the remote publisher this subscription connects to. */
  publishAddr: PeerAddress;
  /** `tcp://` endpoint derived from `publishAddr`, passed to `sock.connect()`. */
  publishAddrStr: string;
  /** Handler invoked once for every well-formed message received. */
  cb: SubscribedMessageHandler;

  /**
   * @param pubSub - Owning `PubSub`; used here only to read the local peer id for log lines.
   * @param publishAddr - Address of the remote publisher.
   * @param topic - Topic to subscribe to.
   * @param cb - Handler called with the sender address and message payload.
   */
  constructor(
    readonly pubSub: PubSub,
    publishAddr: PeerAddress,
    topic: string,
    cb: SubscribedMessageHandler,
  ) {
    this.cb = cb;
    this.topic = topic;
    this.publishAddr = publishAddr;
    this.publishAddrStr = `tcp://${this.publishAddr.toAddrString()}`;
  }

  /**
   * Connects to the publisher, subscribes to the topic and dispatches every
   * received message to `cb`.
   *
   * Messages are expected to carry three frames, `[topic, sender, payload]`,
   * which is the layout `PubSub.publish()` sends. Messages with fewer frames
   * are logged and skipped instead of crashing the receive loop.
   *
   * The returned promise stays pending for as long as the socket keeps
   * yielding messages; it resolves once the socket's async iterator finishes
   * (presumably when the socket is closed - confirm against the zeromq docs).
   *
   * @throws Whatever `PeerAddress.fromString()` or `cb` throws; such an error
   *   propagates to the caller and ends the receive loop.
   */
  async start(): Promise<void> {
    const peerTag = `[${this.pubSub.rhizomeNode.config.peerId}]`;

    this.sock.connect(this.publishAddrStr);
    this.sock.subscribe(this.topic);
    debug(peerTag, `Subscribing to ${this.topic} topic on ZeroMQ ${this.publishAddrStr}`);

    // Wait for ZeroMQ messages.
    // This will block indefinitely.
    for await (const frames of this.sock) {
      // The first frame is the topic; it is not needed here.
      const [, sender, msg] = frames;

      // A peer may send a message that lacks the sender and/or payload frame.
      // Calling toString() on a missing frame would throw and silently kill
      // this subscription, so drop such messages instead.
      if (sender === undefined || msg === undefined) {
        debug(peerTag, `Ignoring malformed ZeroMQ message with ${frames.length} frame(s) on topic ${this.topic}`);
        continue;
      }

      const senderAddr = PeerAddress.fromString(sender.toString());
      const msgStr = msg.toString();
      debug(peerTag, `ZeroMQ subscribtion received msg: ${msgStr}`);
      this.cb(senderAddr, msgStr);
    }

    debug(peerTag, `Done waiting for subscription socket for topic ${this.topic}`);
  }
}
|
|
|
|
/**
 * ZeroMQ publish/subscribe endpoint for a `RhizomeNode`.
 *
 * Owns at most one `Publisher` socket, bound to the address taken from the
 * node's config, and keeps track of every `Subscription` created through
 * `subscribe()` so they can all be closed in `stop()`.
 */
export class PubSub {
  /** Node this endpoint belongs to; its config supplies bind address, port and peer id. */
  rhizomeNode: RhizomeNode;
  /** Bound publisher socket; `undefined` until `startZmq()` succeeds and again after `stop()`. */
  publishSock?: Publisher;
  /** `tcp://` endpoint built from `publishBindAddr` and `publishBindPort`. */
  publishAddrStr: string;
  /** Subscriptions created via `subscribe()` that have not yet been stopped. */
  subscriptions: Subscription[] = [];

  /**
   * @param rhizomeNode - Node whose config provides `publishBindAddr` and `publishBindPort`.
   */
  constructor(rhizomeNode: RhizomeNode) {
    this.rhizomeNode = rhizomeNode;

    const {publishBindAddr, publishBindPort} = this.rhizomeNode.config;
    this.publishAddrStr = `tcp://${publishBindAddr}:${publishBindPort}`;
  }

  /**
   * Creates the publisher socket and binds it to `publishAddrStr`.
   *
   * The socket is installed as `publishSock` only after the bind succeeds.
   * If binding fails the socket is closed and the error is rethrown, so a
   * half-initialised socket is never left behind.
   *
   * @throws Whatever `Publisher.bind()` rejects with.
   */
  async startZmq(): Promise<void> {
    const sock = new Publisher();

    try {
      await sock.bind(this.publishAddrStr);
    } catch (err) {
      // Do not leak a socket that never got bound.
      sock.close();
      throw err;
    }

    this.publishSock = sock;
    debug(`[${this.rhizomeNode.config.peerId}]`, `ZeroMQ publishing socket bound to ${this.publishAddrStr}`);
  }

  /**
   * Publishes `msg` under `topic` as a three-frame message:
   * `[topic, <this node's request address>, msg]`.
   *
   * Does nothing when the publisher socket has not been started (or has been
   * stopped).
   *
   * @param topic - Topic frame of the message.
   * @param msg - Payload frame of the message.
   */
  async publish(topic: string, msg: string): Promise<void> {
    const sock = this.publishSock;
    if (!sock) {
      return;
    }

    await sock.send([
      topic,
      this.rhizomeNode.myRequestAddr.toAddrString(),
      msg
    ]);
    debug(`[${this.rhizomeNode.config.peerId}]`, `Published to ZeroMQ, msg: ${msg}`);
  }

  /**
   * Creates and registers a `Subscription` to `topic` on the given publisher.
   *
   * The subscription is only constructed and recorded here; this method does
   * not call `start()` on it.
   *
   * @param publishAddr - Address of the remote publisher.
   * @param topic - Topic to subscribe to.
   * @param cb - Handler for received messages.
   * @returns The newly created subscription.
   */
  subscribe(
    publishAddr: PeerAddress,
    topic: string,
    cb: SubscribedMessageHandler
  ): Subscription {
    const subscription = new Subscription(this, publishAddr, topic, cb);
    this.subscriptions.push(subscription);
    return subscription;
  }

  /**
   * Unbinds and closes the publisher socket (if any) and closes the socket of
   * every registered subscription.
   *
   * Cleanup is exhaustive: the publisher is closed even if unbinding fails,
   * and subscription sockets are closed even if publisher teardown fails.
   * The subscription list is emptied so closed subscriptions are neither
   * retained nor closed a second time by a later `stop()`.
   *
   * @throws Whatever `Publisher.unbind()` rejects with, after cleanup has run.
   */
  async stop(): Promise<void> {
    // Detach state up front so this object no longer references the sockets,
    // whatever happens during teardown below.
    const publisher = this.publishSock;
    this.publishSock = undefined;

    const subscriptions = this.subscriptions;
    this.subscriptions = [];

    try {
      if (publisher) {
        try {
          await publisher.unbind(this.publishAddrStr);
        } finally {
          publisher.close();
        }
      }
    } finally {
      for (const subscription of subscriptions) {
        subscription.sock.close();
      }
    }
  }
}
|