diff --git a/README.md b/README.md
index 28e50c1..b6ce005 100644
--- a/README.md
+++ b/README.md
@@ -35,7 +35,8 @@ Then if their clocks drift relative to ours, we can seek consensus among a broad
 But at that point just run ntpd. Can still do consensus to verify
 but probably no need to implement custom time synchronization protocol.
 
-Wait NTP is centralized isn't it, not peer to peer...
+Apparently PTP, Precision Time Protocol, is a thing.
+PTP affords for a layer of user defined priority for best clock selection.
 
 ## Peering
 
@@ -118,7 +119,7 @@ To demonstrate the example application, you can open multiple terminals, and in
 export DEBUG="*,-express"
 export RHIZOME_REQUEST_BIND_PORT=4000
 export RHIZOME_PUBLISH_BIND_PORT=4001
-export RHIZOME_SEED_PEERS='127.0.0.1:4002, 127.0.0.1:4004'
+export RHIZOME_SEED_PEERS='localhost:4002, localhost:4004'
 export RHIZOME_HTTP_API_PORT=3000
 export RHIZOME_PEER_ID=peer1
 npm run example-app
@@ -128,7 +129,7 @@ npm run example-app
 export DEBUG="*,-express"
 export RHIZOME_REQUEST_BIND_PORT=4002
 export RHIZOME_PUBLISH_BIND_PORT=4003
-export RHIZOME_SEED_PEERS='127.0.0.1:4000, 127.0.0.1:4004'
+export RHIZOME_SEED_PEERS='localhost:4000, localhost:4004'
 export RHIZOME_HTTP_API_PORT=3001
 export RHIZOME_PEER_ID=peer2
 npm run example-app
@@ -138,7 +139,7 @@ npm run example-app
 export DEBUG="*,-express"
 export RHIZOME_REQUEST_BIND_PORT=4004
 export RHIZOME_PUBLISH_BIND_PORT=4005
-export RHIZOME_SEED_PEERS='127.0.0.1:4000, 127.0.0.1:4002'
+export RHIZOME_SEED_PEERS='localhost:4000, localhost:4002'
 export RHIZOME_HTTP_API_PORT=3002
 export RHIZOME_PEER_ID=peer3
 npm run example-app
diff --git a/__tests__/run.ts b/__tests__/run.ts
new file mode 100644
index 0000000..73133c8
--- /dev/null
+++ b/__tests__/run.ts
@@ -0,0 +1,60 @@
+import Debug from 'debug';
+import {RhizomeNode, RhizomeNodeConfig} from "../src/node";
+import {TypedCollection} from '../src/typed-collection';
+const debug = Debug('test:run');
+
+type User = {
+  id?: string;
+  name: string;
+  nameLong?: string;
+  email?: string;
+  age: number;
+};
+
+class App extends RhizomeNode {
+  constructor(config?: Partial<RhizomeNodeConfig>) {
+    super(config);
+    const users = new TypedCollection<User>("users");
+    users.rhizomeConnect(this);
+  }
+}
+
+describe('Run', () => {
+  let app: App;
+
+  beforeAll(async () => {
+    app = new App({
+      // TODO expose more conveniently as test config options
+      httpPort: 5000,
+      httpEnable: true,
+      requestBindPort: 5001,
+      publishBindPort: 5002,
+    });
+    await app.start();
+  });
+
+  afterAll(async () => {
+    debug('attempting to stop app');
+    await app.stop();
+  });
+
+  it('can put a new user', async () => {
+    const res = await fetch('http://localhost:5000/users', {
+      method: 'PUT',
+      headers: {'Content-Type': 'application/json'},
+      body: JSON.stringify({
+        name: "Peon",
+        id: "peon-1",
+        age: 263
+      })
+    });
+    const data = await res.json();
+    expect(data).toMatchObject({
+      properties: {
+        name: "Peon",
+        id: "peon-1",
+        age: 263
+      }
+    });
+  });
+});
diff --git a/src/collection.ts b/src/collection.ts
index bb2c0c6..8cb3ca5 100644
--- a/src/collection.ts
+++ b/src/collection.ts
@@ -3,12 +3,11 @@
 // It should enable operations like removing a property removes the value from the entities in the collection
 // It could then be further extended with e.g. table semantics like filter, sort, join
 
+import {randomUUID} from "node:crypto";
 import EventEmitter from "node:events";
-import { deltasAccepted, publishDelta, subscribeDeltas } from "./deltas";
-import { Entity, EntityProperties, EntityPropertiesDeltaBuilder } from "./object-layer";
-import { Delta } from "./types";
-import { randomUUID } from "node:crypto";
-import {myRequestAddr} from "./peers";
+import {RhizomeNode} from "./node";
+import {Entity, EntityProperties, EntityPropertiesDeltaBuilder} from "./object-layer";
+import {Delta} from "./types";
 
 // type Property = {
 //   name: string,
@@ -24,13 +23,24 @@ import {myRequestAddr} from "./peers";
 // }
 
 export class Collection {
+  rhizomeNode?: RhizomeNode;
+  name: string;
   entities = new Map<string, Entity>();
   eventStream = new EventEmitter();
-  constructor() {
-    subscribeDeltas((delta: Delta) => {
+
+  constructor(name: string) {
+    this.name = name;
+  }
+
+  rhizomeConnect(rhizomeNode: RhizomeNode) {
+    this.rhizomeNode = rhizomeNode;
+
+    rhizomeNode.deltaStream.subscribeDeltas((delta: Delta) => {
       // TODO: Make sure this is the kind of delta we're looking for
       this.applyDelta(delta);
     });
+
+    rhizomeNode.httpApi.serveCollection(this);
   }
 
   // Applies the javascript rules for updating object values,
@@ -45,7 +55,7 @@ export class Collection {
       entity.id = entityId;
       eventType = 'create';
     }
-    const deltaBulider = new EntityPropertiesDeltaBuilder(entityId);
+    const deltaBulider = new EntityPropertiesDeltaBuilder(this.rhizomeNode!, entityId);
 
     if (!properties) {
       // Let's interpret this as entity deletion
@@ -80,7 +90,7 @@ export class Collection {
       //* specific deltas removed. We could use it to extract a measurement
       //* of the effects of some deltas' inclusion or exclusion, the
       //* evaluation of which may lend evidence to some possible arguments.
-     
+
       this.entities.set(entityId, entity);
       if (anyChanged) {
         deltas?.push(deltaBulider.delta);
@@ -135,9 +145,9 @@ export class Collection {
     const deltas: Delta[] = [];
     const entity = this.updateEntity(entityId, properties, true, deltas);
     deltas.forEach(async (delta: Delta) => {
-      delta.receivedFrom = myRequestAddr;
-      deltasAccepted.push(delta);
-      await publishDelta(delta);
+      delta.receivedFrom = this.rhizomeNode!.myRequestAddr;
+      this.rhizomeNode!.deltaStream.deltasAccepted.push(delta);
+      await this.rhizomeNode!.deltaStream.publishDelta(delta);
     });
     return entity;
   }
@@ -146,8 +156,8 @@ export class Collection {
     const deltas: Delta[] = [];
     this.updateEntity(entityId, undefined, true, deltas);
     deltas.forEach(async (delta: Delta) => {
-      deltasAccepted.push(delta);
-      await publishDelta(delta);
+      this.rhizomeNode!.deltaStream.deltasAccepted.push(delta);
+      await this.rhizomeNode!.deltaStream.publishDelta(delta);
     });
   }
 
diff --git a/src/config.ts b/src/config.ts
index 1821528..24cf79b 100644
--- a/src/config.ts
+++ b/src/config.ts
@@ -6,7 +6,7 @@ import {PeerAddress} from "./types";
 
 export const LEVEL_DB_DIR = process.env.RHIZOME_LEVEL_DB_DIR ?? './data';
 export const CREATOR = process.env.USER!;
-export const HOST_ID = process.env.RHIZOME_PEER_ID || randomUUID();
+export const PEER_ID = process.env.RHIZOME_PEER_ID || randomUUID();
 export const ADDRESS = process.env.RHIZOME_ADDRESS ?? 'localhost';
 export const REQUEST_BIND_ADDR = process.env.RHIZOME_REQUEST_BIND_ADDR || ADDRESS;
 export const REQUEST_BIND_PORT = parseInt(process.env.RHIZOME_REQUEST_BIND_PORT || '4000');
@@ -14,7 +14,7 @@ export const REQUEST_BIND_HOST = process.env.RHIZOME_REQUEST_BIND_HOST || REQUES
 export const PUBLISH_BIND_ADDR = process.env.RHIZOME_PUBLISH_BIND_ADDR || ADDRESS;
 export const PUBLISH_BIND_PORT = parseInt(process.env.RHIZOME_PUBLISH_BIND_PORT || '4001');
 export const PUBLISH_BIND_HOST = process.env.RHIZOME_PUBLISH_BIND_HOST || PUBLISH_BIND_ADDR;
-export const HTTP_API_ADDR = process.env.RHIZOME_HTTP_API_ADDR || ADDRESS || '127.0.0.1';
+export const HTTP_API_ADDR = process.env.RHIZOME_HTTP_API_ADDR || ADDRESS || 'localhost';
 export const HTTP_API_PORT = parseInt(process.env.RHIZOME_HTTP_API_PORT || '3000');
 export const HTTP_API_ENABLE = process.env.RHIZOME_HTTP_API_ENABLE === 'true';
 export const SEED_PEERS: PeerAddress[]  = (process.env.RHIZOME_SEED_PEERS || '').split(',')
diff --git a/src/context.ts b/src/context.ts
new file mode 100644
index 0000000..e69de29
diff --git a/src/deltas.ts b/src/deltas.ts
index 021ac62..a4d9890 100644
--- a/src/deltas.ts
+++ b/src/deltas.ts
@@ -1,103 +1,94 @@
+import Debug from 'debug';
 import EventEmitter from 'node:events';
 import objectHash from 'object-hash';
-import {myRequestAddr} from './peers';
-import {publishSock, subscribeSock} from './pub-sub';
-import {Decision, Delta, PeerAddress} from './types';
-import Debug from 'debug';
+import {RhizomeNode} from './node';
+import {Decision, Delta} from './types';
 const debug = Debug('deltas');
 
-export const deltaStream = new EventEmitter();
+export class DeltaStream {
+  rhizomeNode: RhizomeNode;
+  deltaStream = new EventEmitter();
+  deltasProposed: Delta[] = [];
+  deltasAccepted: Delta[] = [];
+  deltasRejected: Delta[] = [];
+  deltasDeferred: Delta[] = [];
+  hashesReceived = new Set<string>();
 
-export const deltasProposed: Delta[] = [];
-export const deltasAccepted: Delta[] = [];
-export const deltasRejected: Delta[] = [];
-export const deltasDeferred: Delta[] = [];
-
-export const hashesReceived = new Set<string>();
-
-export function applyPolicy(delta: Delta): Decision {
-  return !!delta && Decision.Accept;
-}
-
-export function receiveDelta(delta: Delta) {
-  // Deduplication: if we already received this delta, disregard it
-  const hash = objectHash(delta);
-  if (!hashesReceived.has(hash)) {
-    hashesReceived.add(hash);
-    deltasProposed.push(delta);
+  constructor(rhizomeNode: RhizomeNode) {
+    this.rhizomeNode = rhizomeNode;
   }
-}
 
-export function ingestDelta(delta: Delta) {
-  const decision = applyPolicy(delta);
-  switch (decision) {
-    case Decision.Accept:
-      deltasAccepted.push(delta);
-      deltaStream.emit('delta', {delta});
-      break;
-    case Decision.Reject:
-      deltasRejected.push(delta);
-      break;
-    case Decision.Defer:
-      deltasDeferred.push(delta);
-      break;
+  applyPolicy(delta: Delta): Decision {
+    return !!delta && Decision.Accept;
   }
-}
 
-export function ingestNext(): boolean {
-  const delta = deltasProposed.shift();
-  if (!delta) {
-    return false;
-  }
-  ingestDelta(delta);
-  return true;
-}
-
-export function ingestAll() {
-  while (ingestNext());
-}
-
-export function ingestNextDeferred(): boolean {
-  const delta = deltasDeferred.shift();
-  if (!delta) {
-    return false;
-  }
-  ingestDelta(delta);
-  return true;
-}
-
-export function ingestAllDeferred() {
-  while (ingestNextDeferred());
-}
-
-export function subscribeDeltas(fn: (delta: Delta) => void) {
-  deltaStream.on('delta', ({delta}) => {
-    fn(delta);
-  });
-}
-
-export async function publishDelta(delta: Delta) {
-  debug(`Publishing delta: ${JSON.stringify(delta)}`);
-  await publishSock.send(["deltas", myRequestAddr.toAddrString(), serializeDelta(delta)]);
-}
-
-function serializeDelta(delta: Delta) {
-  return JSON.stringify(delta);
-}
-
-function deserializeDelta(input: string) {
-  return JSON.parse(input);
-}
-
-export async function runDeltas() {
-  for await (const [topic, sender, msg] of subscribeSock) {
-    if (topic.toString() !== "deltas") {
-      continue;
+  receiveDelta(delta: Delta) {
+    // Deduplication: if we already received this delta, disregard it
+    const hash = objectHash(delta);
+    if (!this.hashesReceived.has(hash)) {
+      this.hashesReceived.add(hash);
+      this.deltasProposed.push(delta);
     }
-    const delta = deserializeDelta(msg.toString());
-    delta.receivedFrom = PeerAddress.fromString(sender.toString());
-    debug(`Received delta: ${JSON.stringify(delta)}`);
-    ingestDelta(delta);
+  }
+
+  ingestDelta(delta: Delta) {
+    const decision = this.applyPolicy(delta);
+    switch (decision) {
+      case Decision.Accept:
+        this.deltasAccepted.push(delta);
+        this.deltaStream.emit('delta', {delta});
+        break;
+      case Decision.Reject:
+        this.deltasRejected.push(delta);
+        break;
+      case Decision.Defer:
+        this.deltasDeferred.push(delta);
+        break;
+    }
+  }
+
+  ingestNext(): boolean {
+    const delta = this.deltasProposed.shift();
+    if (!delta) {
+      return false;
+    }
+    this.ingestDelta(delta);
+    return true;
+  }
+
+  ingestAll() {
+    while (this.ingestNext());
+  }
+
+  ingestNextDeferred(): boolean {
+    const delta = this.deltasDeferred.shift();
+    if (!delta) {
+      return false;
+    }
+    this.ingestDelta(delta);
+    return true;
+  }
+
+  ingestAllDeferred() {
+    while (this.ingestNextDeferred());
+  }
+
+  subscribeDeltas(fn: (delta: Delta) => void) {
+    this.deltaStream.on('delta', ({delta}) => {
+      fn(delta);
+    });
+  }
+
+  async publishDelta(delta: Delta) {
+    debug(`Publishing delta: ${JSON.stringify(delta)}`);
+    await this.rhizomeNode.pubSub.publish("deltas", this.serializeDelta(delta));
+  }
+
+  serializeDelta(delta: Delta) {
+    return JSON.stringify(delta);
+  }
+
+  deserializeDelta(input: string) {
+    return JSON.parse(input);
   }
 }
-
diff --git a/src/example-app.ts b/src/example-app.ts
index d767986..057a473 100644
--- a/src/example-app.ts
+++ b/src/example-app.ts
@@ -1,12 +1,7 @@
-import {HTTP_API_ENABLE} from "./config";
-import {runDeltas} from "./deltas";
-import {runHttpApi} from "./http-api";
-import {Entity} from "./object-layer";
-import {askAllPeersForDeltas, subscribeToSeeds} from "./peers";
-import {bindPublish, } from "./pub-sub";
-import {bindReply, runRequestHandlers} from "./request-reply";
-import {TypedCollection} from "./typed-collection";
 import Debug from 'debug';
+import {RhizomeNode} from "./node";
+import {Entity} from "./object-layer";
+import {TypedCollection} from "./typed-collection";
 const debug = Debug('example-app');
 
 // As an app we want to be able to write and read data.
@@ -23,21 +18,9 @@ type User = {
 };
 
 (async () => {
-  const users = new TypedCollection<User>();
-
-  await bindPublish();
-  await bindReply();
-  if (HTTP_API_ENABLE) {
-    runHttpApi({users});
-  }
-
-  runDeltas();
-  runRequestHandlers();
-  await new Promise((resolve) => setTimeout(resolve, 500));
-  subscribeToSeeds();
-  await new Promise((resolve) => setTimeout(resolve, 500));
-  askAllPeersForDeltas();
-  await new Promise((resolve) => setTimeout(resolve, 1000));
+  const rhizomeNode = new RhizomeNode();
+  const users = new TypedCollection<User>("users");
+  users.rhizomeConnect(rhizomeNode);
 
   users.onUpdate((u: Entity) => {
     debug('User updated:', u);
@@ -47,6 +30,9 @@ type User = {
     debug('New user!:', u);
   });
 
+
+  await rhizomeNode.start()
+
   const taliesin = users.put(undefined, {
     // id: 'taliesin-1',
     name: 'Taliesin',
diff --git a/src/http-api.ts b/src/http-api.ts
index b01aeb6..76417df 100644
--- a/src/http-api.ts
+++ b/src/http-api.ts
@@ -1,25 +1,22 @@
 import Debug from "debug";
 import express from "express";
+import {FSWatcher} from "fs";
 import {readdirSync, readFileSync, watch} from "fs";
+import {Server} from "http";
 import path, {join} from "path";
 import {Converter} from "showdown";
 import {Collection} from "./collection";
-import {HTTP_API_ADDR, HTTP_API_PORT} from "./config";
-import {deltasAccepted} from "./deltas";
-import {peers} from "./peers";
+import {RhizomeNode} from "./node";
 import {Delta} from "./types";
 const debug = Debug('http-api');
 
-type CollectionsToServe = {
-  [key: string]: Collection;
-};
-
 const docConverter = new Converter({
   completeHTMLDocument: true,
   // simpleLineBreaks: true,
   tables: true,
   tasklists: true
 });
+
 const htmlDocFromMarkdown = (md: string): string => docConverter.makeHtml(md);
 
 type mdFileInfo = {
@@ -31,6 +28,8 @@ type mdFileInfo = {
 class MDFiles {
   files = new Map<string, mdFileInfo>();
   readme?: mdFileInfo;
+  dirWatcher?: FSWatcher;
+  readmeWatcher?: FSWatcher;
 
   readFile(name: string) {
     const md = readFileSync(join('./markdown', `${name}.md`)).toString();
@@ -66,7 +65,7 @@ class MDFiles {
   }
 
   watchDir() {
-    watch('./markdown', null, (eventType, filename) => {
+    this.dirWatcher = watch('./markdown', null, (eventType, filename) => {
       if (!filename) return;
       if (!filename.endsWith(".md")) return;
 
@@ -91,7 +90,7 @@ class MDFiles {
   }
 
   watchReadme() {
-    watch('./README.md', null, (eventType, filename) => {
+   this.readmeWatcher = watch('./README.md', null, (eventType, filename) => {
       if (!filename) return;
 
       switch (eventType) {
@@ -104,127 +103,143 @@ class MDFiles {
       }
     });
   }
+
+  close() {
+    this.dirWatcher?.close();
+    this.readmeWatcher?.close();
+  }
 }
 
-export function runHttpApi(collections?: CollectionsToServe) {
-  const app = express();
-  app.use(express.json());
+export class HttpApi {
+  rhizomeNode: RhizomeNode;
+  app = express();
+  mdFiles = new MDFiles();
+  server?: Server;
 
-  // Get list of markdown files
-  const mdFiles = new MDFiles();
-  mdFiles.readDir();
-  mdFiles.readReadme();
-  mdFiles.watchDir();
-  mdFiles.watchReadme();
+  constructor(rhizomeNode: RhizomeNode) {
+    this.rhizomeNode = rhizomeNode;
+    this.app.use(express.json());
+  }
 
-  // Serve README
-  app.get('/html/README', (_req: express.Request, res: express.Response) => {
-    const html = mdFiles.getReadmeHTML();
-    res.setHeader('content-type', 'text/html').send(html);
-  });
+  start() {
+    // Scan and watch for markdown files
+    this.mdFiles.readDir();
+    this.mdFiles.readReadme();
+    this.mdFiles.watchDir();
+    this.mdFiles.watchReadme();
 
-  // Serve markdown files as html
-  app.get('/html/:name', (req: express.Request, res: express.Response) => {
-    let html = mdFiles.getHtml(req.params.name);
-    if (!html) {
-      res.status(404);
-      html = htmlDocFromMarkdown('# 404\n\n## [Index](/html)');
-    }
-    res.setHeader('content-type', 'text/html');
-    res.send(html);
-  });
-
-  // Serve index
-  {
-    let md = `# Files\n\n`;
-    md += `[README](/html/README)\n\n`;
-    for (const name of mdFiles.list()) {
-      md += `- [${name}](./${name})\n`;
-    }
-    const html = htmlDocFromMarkdown(md);
-
-    app.get('/html', (_req: express.Request, res: express.Response) => {
+    // Serve README
+    this.app.get('/html/README', (_req: express.Request, res: express.Response) => {
+      const html = this.mdFiles.getReadmeHTML();
       res.setHeader('content-type', 'text/html').send(html);
     });
+
+    // Serve markdown files as html
+    this.app.get('/html/:name', (req: express.Request, res: express.Response) => {
+      let html = this.mdFiles.getHtml(req.params.name);
+      if (!html) {
+        res.status(404);
+        html = htmlDocFromMarkdown('# 404\n\n## [Index](/html)');
+      }
+      res.setHeader('content-type', 'text/html');
+      res.send(html);
+    });
+
+    // Serve index
+    {
+      let md = `# Files\n\n`;
+      md += `[README](/html/README)\n\n`;
+      for (const name of this.mdFiles.list()) {
+        md += `- [${name}](./${name})\n`;
+      }
+      const html = htmlDocFromMarkdown(md);
+
+      this.app.get('/html', (_req: express.Request, res: express.Response) => {
+        res.setHeader('content-type', 'text/html').send(html);
+      });
+    }
+
+    // Serve list of all deltas accepted
+    // TODO: This won't scale well
+    this.app.get("/deltas", (_req: express.Request, res: express.Response) => {
+      res.json(this.rhizomeNode.deltaStream.deltasAccepted);
+    });
+
+    // Get the number of deltas ingested by this node
+    this.app.get("/deltas/count", (_req: express.Request, res: express.Response) => {
+      res.json(this.rhizomeNode.deltaStream.deltasAccepted.length);
+    });
+
+    // Get the list of peers seen by this node (including itself)
+    this.app.get("/peers", (_req: express.Request, res: express.Response) => {
+      res.json(this.rhizomeNode.peers.peers.map(({reqAddr, publishAddr, isSelf, isSeedPeer}) => {
+        const deltasAcceptedCount = this.rhizomeNode.deltaStream.deltasAccepted
+          .filter((delta: Delta) => {
+            return delta.receivedFrom?.addr == reqAddr.addr &&
+              delta.receivedFrom?.port == reqAddr.port;
+          })
+          .length;
+        const peerInfo = {
+          reqAddr: reqAddr.toAddrString(),
+          publishAddr: publishAddr?.toAddrString(),
+          isSelf,
+          isSeedPeer,
+          deltaCount: {
+            accepted: deltasAcceptedCount
+          }
+        };
+        return peerInfo;
+      }));
+    });
+
+    // Get the number of peers seen by this node (including itself)
+    this.app.get("/peers/count", (_req: express.Request, res: express.Response) => {
+      res.json(this.rhizomeNode.peers.peers.length);
+    });
+
+    const {httpAddr, httpPort} = this.rhizomeNode.config;
+    this.server = this.app.listen(httpPort, httpAddr, () => {
+      debug(`HTTP API bound to ${httpAddr}:${httpPort}`);
+    });
   }
 
-  // Set up API routes
+  serveCollection(collection: Collection) {
+    const {name} = collection;
 
-  if (collections) {
-    for (const [name, collection] of Object.entries(collections)) {
-      debug(`collection: ${name}`);
+    // Get the ID of all domain entities
+    this.app.get(`/${name}/ids`, (_req: express.Request, res: express.Response) => {
+      res.json({ids: collection.getIds()});
+    });
 
-      // Get the ID of all domain entities
-      app.get(`/${name}/ids`, (_req: express.Request, res: express.Response) => {
-        res.json({ids: collection.getIds()});
-      });
+    // Get a single domain entity by ID
+    this.app.get(`/${name}/:id`, (req: express.Request, res: express.Response) => {
+      const {params: {id}} = req;
+      const ent = collection.get(id);
+      res.json(ent);
+    });
 
-      // Get a single domain entity by ID
-      app.get(`/${name}/:id`, (req: express.Request, res: express.Response) => {
-        const {params: {id}} = req;
-        const ent = collection.get(id);
-        res.json(ent);
-      });
+    // Add a new domain entity
+    // TODO: schema validation
+    this.app.put(`/${name}`, (req: express.Request, res: express.Response) => {
+      const {body: properties} = req;
+      const ent = collection.put(properties.id, properties);
+      res.json(ent);
+    });
 
-      // Add a new domain entity
-      // TODO: schema validation
-      app.put(`/${name}`, (req: express.Request, res: express.Response) => {
-        const {body: properties} = req;
-        const ent = collection.put(properties.id, properties);
-        res.json(ent);
-      });
-
-      // Update a domain entity
-      app.put(`/${name}/:id`, (req: express.Request, res: express.Response) => {
-        const {body: properties, params: {id}} = req;
-        if (properties.id && properties.id !== id) {
-          res.status(400).json({error: "ID Mismatch", param: id, property: properties.id});
-          return;
-        }
-        const ent = collection.put(id, properties);
-        res.json(ent);
-      });
-    }
+    // Update a domain entity
+    this.app.put(`/${name}/:id`, (req: express.Request, res: express.Response) => {
+      const {body: properties, params: {id}} = req;
+      if (properties.id && properties.id !== id) {
+        res.status(400).json({error: "ID Mismatch", param: id, property: properties.id});
+        return;
+      }
+      const ent = collection.put(id, properties);
+      res.json(ent);
+    });
   }
 
-  app.get("/deltas", (_req: express.Request, res: express.Response) => {
-    // TODO: streaming
-    res.json(deltasAccepted);
-  });
-
-  // Get the number of deltas ingested by this node
-  app.get("/deltas/count", (_req: express.Request, res: express.Response) => {
-    res.json(deltasAccepted.length);
-  });
-
-  // Get the list of peers seen by this node (including itself)
-  app.get("/peers", (_req: express.Request, res: express.Response) => {
-    res.json(peers.map(({reqAddr, publishAddr, isSelf, isSeedPeer}) => {
-      const deltasAcceptedCount = deltasAccepted
-        .filter((delta: Delta) => {
-          return delta.receivedFrom?.addr == reqAddr.addr &&
-            delta.receivedFrom?.port == reqAddr.port;
-        })
-        .length;
-      const peerInfo = {
-        reqAddr: reqAddr.toAddrString(),
-        publishAddr: publishAddr?.toAddrString(),
-        isSelf,
-        isSeedPeer,
-        deltaCount: {
-          accepted: deltasAcceptedCount
-        }
-      };
-      return peerInfo;
-    }));
-  });
-
-  // Get the number of peers seen by this node (including itself)
-  app.get("/peers/count", (_req: express.Request, res: express.Response) => {
-    res.json(peers.length);
-  });
-
-  app.listen(HTTP_API_PORT, HTTP_API_ADDR, () => {
-    debug(`HTTP API bound to http://${HTTP_API_ADDR}:${HTTP_API_PORT}`);
-  });
+  async stop() {
+    this.server?.close();
+    this.mdFiles.close();
+  }
 }
diff --git a/src/node.ts b/src/node.ts
new file mode 100644
index 0000000..c57d33a
--- /dev/null
+++ b/src/node.ts
@@ -0,0 +1,82 @@
+import Debug from 'debug';
+import {CREATOR, HTTP_API_ADDR, HTTP_API_ENABLE, HTTP_API_PORT, PEER_ID, PUBLISH_BIND_ADDR, PUBLISH_BIND_HOST, PUBLISH_BIND_PORT, REQUEST_BIND_ADDR, REQUEST_BIND_HOST, REQUEST_BIND_PORT, SEED_PEERS} from './config';
+import {DeltaStream} from './deltas';
+import {HttpApi} from './http-api';
+import {Peers} from './peers';
+import {PubSub} from './pub-sub';
+import {RequestReply} from './request-reply';
+import {PeerAddress} from './types';
+import {Collection} from './collection';
+const debug = Debug('rhizome-node');
+
+export type RhizomeNodeConfig = {
+  requestBindAddr: string;
+  requestBindHost: string;
+  requestBindPort: number;
+  publishBindAddr: string;
+  publishBindHost: string;
+  publishBindPort: number;
+  httpAddr: string;
+  httpPort: number;
+  httpEnable: boolean;
+  seedPeers: PeerAddress[];
+  peerId: string;
+  creator: string; // TODO each host should be able to support multiple users
+};
+
+// So that we can run more than one instance in the same process (for testing)
+export class RhizomeNode {
+  config: RhizomeNodeConfig;
+  pubSub: PubSub;
+  requestReply: RequestReply;
+  httpApi: HttpApi;
+  deltaStream: DeltaStream;
+  peers: Peers;
+  myRequestAddr: PeerAddress;
+  myPublishAddr: PeerAddress;
+
+  constructor(config?: Partial<RhizomeNodeConfig>) {
+    this.config = {
+      requestBindAddr: REQUEST_BIND_ADDR,
+      requestBindHost: REQUEST_BIND_HOST,
+      requestBindPort: REQUEST_BIND_PORT,
+      publishBindAddr: PUBLISH_BIND_ADDR,
+      publishBindHost: PUBLISH_BIND_HOST,
+      publishBindPort: PUBLISH_BIND_PORT,
+      httpAddr: HTTP_API_ADDR,
+      httpPort: HTTP_API_PORT,
+      httpEnable: HTTP_API_ENABLE,
+      seedPeers: SEED_PEERS,
+      peerId: PEER_ID,
+      creator: CREATOR,
+      ...config
+    };
+    debug('config', this.config);
+    this.myRequestAddr = new PeerAddress(this.config.requestBindHost, this.config.requestBindPort);
+    this.myPublishAddr = new PeerAddress(this.config.publishBindHost, this.config.publishBindPort);
+    this.pubSub = new PubSub(this);
+    this.requestReply = new RequestReply(this);
+    this.httpApi = new HttpApi(this);
+    this.deltaStream = new DeltaStream(this);
+    this.peers = new Peers(this);
+  }
+
+  async start() {
+    this.pubSub.start();
+    this.requestReply.start();
+    if (this.config.httpEnable) {
+      this.httpApi.start();
+    }
+    await new Promise((resolve) => setTimeout(resolve, 500));
+    this.peers.subscribeToSeeds();
+    await new Promise((resolve) => setTimeout(resolve, 500));
+    this.peers.askAllPeersForDeltas();
+    await new Promise((resolve) => setTimeout(resolve, 1000));
+  }
+
+  async stop() {
+    await this.pubSub.stop();
+    await this.requestReply.stop();
+    await this.httpApi.stop();
+  }
+}
diff --git a/src/object-layer.ts b/src/object-layer.ts
index 4ab91d5..bab164d 100644
--- a/src/object-layer.ts
+++ b/src/object-layer.ts
@@ -1,14 +1,14 @@
 // The goal here is to provide a translation for
 // entities and their properties
 // to and from (sequences of) deltas.
- 
+
 // How can our caller define the entities and their properties?
 // - As typescript types?
 // - As typescript interfaces?
 // - As typescript classes?
 
-import { CREATOR, HOST_ID } from "./config";
-import { Delta, PropertyTypes } from "./types";
+import {RhizomeNode} from "./node";
+import {Delta, PropertyTypes} from "./types";
 
 export type EntityProperties = {
   [key: string]: PropertyTypes;
@@ -29,10 +29,10 @@ export class Entity {
 export class EntityPropertiesDeltaBuilder {
   delta: Delta;
 
-  constructor(entityId: string) {
+  constructor(rhizomeNode: RhizomeNode, entityId: string) {
     this.delta = {
-      creator: CREATOR,
-      host: HOST_ID,
+      creator: rhizomeNode.config.creator,
+      host: rhizomeNode.config.peerId,
       pointers: [{
         localContext: 'id',
         target: entityId,
diff --git a/src/peers.ts b/src/peers.ts
index 3dfc0a3..82f0331 100644
--- a/src/peers.ts
+++ b/src/peers.ts
@@ -1,9 +1,10 @@
-import {PUBLISH_BIND_HOST, PUBLISH_BIND_PORT, REQUEST_BIND_HOST, REQUEST_BIND_PORT, SEED_PEERS} from "./config";
-import {deltasAccepted, ingestAll, receiveDelta} from "./deltas";
-import {connectSubscribe} from "./pub-sub";
-import {PeerRequest, registerRequestHandler, RequestSocket, ResponseSocket} from "./request-reply";
-import {Delta, PeerAddress} from "./types";
 import Debug from 'debug';
+import {Message} from 'zeromq';
+import {SEED_PEERS} from "./config";
+import {RhizomeNode} from "./node";
+import {Subscription} from './pub-sub';
+import {PeerRequest, RequestSocket, ResponseSocket} from "./request-reply";
+import {Delta, PeerAddress} from "./types";
 const debug = Debug('peers');
 
 export enum PeerMethods {
@@ -11,48 +12,50 @@ export enum PeerMethods {
   AskForDeltas
 }
 
-export const myRequestAddr = new PeerAddress(REQUEST_BIND_HOST, REQUEST_BIND_PORT);
-export const myPublishAddr = new PeerAddress(PUBLISH_BIND_HOST, PUBLISH_BIND_PORT);
-
-registerRequestHandler(async (req: PeerRequest, res: ResponseSocket) => {
-  debug('inspecting peer request');
-  switch (req.method) {
-    case PeerMethods.GetPublishAddress: {
-      debug('it\'s a request for our publish address');
-      await res.send(myPublishAddr.toAddrString());
-      break;
-    }
-    case PeerMethods.AskForDeltas: {
-      debug('it\'s a request for deltas');
-      // TODO: stream these rather than
-      // trying to write them all in one message
-      await res.send(JSON.stringify(deltasAccepted));
-      break;
-    }
-  }
-});
-
 class Peer {
+  rhizomeNode: RhizomeNode;
   reqAddr: PeerAddress;
-  reqSock: RequestSocket;
+  reqSock?: RequestSocket;
   publishAddr: PeerAddress | undefined;
   isSelf: boolean;
   isSeedPeer: boolean;
-  constructor(addr: string, port: number) {
-    this.reqAddr = new PeerAddress(addr, port);
-    this.reqSock = new RequestSocket(addr, port);
-    this.isSelf = addr === myRequestAddr.addr && port === myRequestAddr.port;
-    this.isSeedPeer = !!SEED_PEERS.find((seedPeer) =>
-      addr === seedPeer.addr && port === seedPeer.port);
+  subscription?: Subscription;
+
+  constructor(rhizomeNode: RhizomeNode, reqAddr: PeerAddress) {
+    this.rhizomeNode = rhizomeNode;
+    this.reqAddr = reqAddr;
+    this.isSelf = reqAddr.isEqual(this.rhizomeNode.myRequestAddr);
+    this.isSeedPeer = !!SEED_PEERS.find((seedPeer) => reqAddr.isEqual(seedPeer));
   }
-  async subscribe() {
-    if (!this.publishAddr) {
-      const res = await this.reqSock.request(PeerMethods.GetPublishAddress);
-      // TODO: input validation
-      this.publishAddr = PeerAddress.fromString(res.toString());
-      connectSubscribe(this.publishAddr!);
+
+  async request(method: PeerMethods): Promise<Message> {
+    if (!this.reqSock) {
+      this.reqSock = new RequestSocket(this.reqAddr);
     }
+    return this.reqSock.request(method);
   }
+
+  async subscribeDeltas() {
+    if (!this.publishAddr) {
+      debug(`requesting publish addr from peer ${this.reqAddr.toAddrString()}`);
+      const res = await this.request(PeerMethods.GetPublishAddress);
+      this.publishAddr = PeerAddress.fromString(res.toString());
+      debug(`received publish addr ${this.publishAddr.toAddrString()} from peer ${this.reqAddr.toAddrString()}`);
+    }
+
+    this.subscription = this.rhizomeNode.pubSub.subscribe(
+      this.publishAddr,
+      "deltas",
+      (sender, msg) => {
+        const delta = this.rhizomeNode.deltaStream.deserializeDelta(msg.toString());
+        delta.receivedFrom = sender;
+        debug(`Received delta: ${JSON.stringify(delta)}`);
+        this.rhizomeNode.deltaStream.ingestDelta(delta);
+      });
+
+    this.subscription.start();
+  }
+
   async askForDeltas(): Promise<Delta[]> {
     // TODO as a first approximation we are trying to cram the entire history
     // of accepted deltas, into one (potentially gargantuan) json message.
@@ -60,42 +63,69 @@ class Peer {
     // Third pass should find a way to reduce the number of deltas transmitted.
 
     // TODO: requestTimeout
-    const res = await this.reqSock.request(PeerMethods.AskForDeltas);
+    const res = await this.request(PeerMethods.AskForDeltas);
     const deltas = JSON.parse(res.toString());
     return deltas;
   }
 }
 
-export const peers: Peer[] = [];
+export class Peers {
+  rhizomeNode: RhizomeNode;
+  peers: Peer[] = [];
 
-peers.push(new Peer(myRequestAddr.addr, myRequestAddr.port));
+  constructor(rhizomeNode: RhizomeNode) {
+    this.rhizomeNode = rhizomeNode;
 
-function newPeer(addr: string, port: number) {
-  const peer = new Peer(addr, port);
-  peers.push(peer);
-  return peer;
-}
+    // Add self to the list of peers, but don't connect
+    this.addPeer(this.rhizomeNode.myRequestAddr);
 
-export async function subscribeToSeeds() {
-  SEED_PEERS.forEach(async ({addr, port}, idx) => {
-    debug(`SEED PEERS[${idx}]=${addr}:${port}`);
-    const peer = newPeer(addr, port);
-    await peer.subscribe();
-  });
-}
-
-//! TODO Expect abysmal scaling properties with this function
-export async function askAllPeersForDeltas() {
-  peers
-    .filter(({isSelf}) => !isSelf)
-    .forEach(async (peer, idx) => {
-      debug(`Asking peer ${idx} for deltas`);
-      const deltas = await peer.askForDeltas();
-      debug(`received ${deltas.length} deltas from ${peer.reqAddr.toAddrString()}`);
-      for (const delta of deltas) {
-        delta.receivedFrom = peer.reqAddr;
-        receiveDelta(delta);
+    this.rhizomeNode.requestReply.registerRequestHandler(async (req: PeerRequest, res: ResponseSocket) => {
+      debug('inspecting peer request');
+      switch (req.method) {
+        case PeerMethods.GetPublishAddress: {
+          debug('it\'s a request for our publish address');
+          await res.send(this.rhizomeNode.myPublishAddr.toAddrString());
+          break;
+        }
+        case PeerMethods.AskForDeltas: {
+          debug('it\'s a request for deltas');
+          // TODO: stream these rather than
+          // trying to write them all in one message
+          await res.send(JSON.stringify(this.rhizomeNode.deltaStream.deltasAccepted));
+          break;
+        }
       }
-      ingestAll();
     });
+
+  }
+
+  addPeer(addr: PeerAddress): Peer {
+    const peer = new Peer(this.rhizomeNode, addr);
+    this.peers.push(peer);
+    debug('added peer', addr);
+    return peer;
+  }
+
+  async subscribeToSeeds() {
+    SEED_PEERS.forEach(async (addr, idx) => {
+      debug(`SEED PEERS[${idx}]=${addr.toAddrString()}`);
+      const peer = this.addPeer(addr);
+      await peer.subscribeDeltas();
+    });
+  }
+
+  //! TODO Expect abysmal scaling properties with this function
+  async askAllPeersForDeltas() {
+    this.peers.filter(({isSelf}) => !isSelf)
+      .forEach(async (peer, idx) => {
+        debug(`Asking peer ${idx} for deltas`);
+        const deltas = await peer.askForDeltas();
+        debug(`received ${deltas.length} deltas from ${peer.reqAddr.toAddrString()}`);
+        for (const delta of deltas) {
+          delta.receivedFrom = peer.reqAddr;
+          this.rhizomeNode.deltaStream.receiveDelta(delta);
+        }
+        this.rhizomeNode.deltaStream.ingestAll();
+      });
+  }
 }
diff --git a/src/pub-sub.ts b/src/pub-sub.ts
index 4356241..5286255 100644
--- a/src/pub-sub.ts
+++ b/src/pub-sub.ts
@@ -1,23 +1,79 @@
-import {Publisher, Subscriber} from 'zeromq';
-import {PUBLISH_BIND_ADDR, PUBLISH_BIND_PORT} from './config';
-import {PeerAddress} from './types';
 import Debug from 'debug';
+import {Message, Publisher, Subscriber} from 'zeromq';
+import {RhizomeNode} from './node';
+import {PeerAddress} from './types';
 const debug = Debug('pub-sub');
 
-export const publishSock = new Publisher();
-export const subscribeSock = new Subscriber();
+export type SubscribedMessageHandler = (sender: PeerAddress, msg: Message) => void;
 
-export async function bindPublish() {
-  const addrStr = `tcp://${PUBLISH_BIND_ADDR}:${PUBLISH_BIND_PORT}`;
-  await publishSock.bind(addrStr);
-  debug(`Publishing socket bound to ${addrStr}`);
+// TODO: Allow subscribing to multiple topics on one socket
+export class Subscription {
+  sock = new Subscriber();
+  topic: string;
+  publishAddr: PeerAddress;
+  publishAddrStr: string;
+  cb: SubscribedMessageHandler;
+
+  constructor(publishAddr: PeerAddress, topic: string, cb: SubscribedMessageHandler) {
+    this.cb = cb;
+    this.topic = topic;
+    this.publishAddr = publishAddr;
+    this.publishAddrStr = `tcp://${this.publishAddr.toAddrString()}`;
+  }
+
+  async start() {
+    this.sock.connect(this.publishAddrStr);
+    this.sock.subscribe(this.topic);
+    debug(`Subscribing to ${this.topic} topic on ${this.publishAddrStr}`);
+
+    for await (const [, sender, msg] of this.sock) {
+      const senderAddr = PeerAddress.fromString(sender.toString());
+      this.cb(senderAddr, msg);
+    }
+  }
 }
 
-export function connectSubscribe(publishAddr: PeerAddress) {
-  // TODO: peer discovery
-  const addrStr = `tcp://${publishAddr.toAddrString()}`;
-  debug('connectSubscribe', {addrStr});
-  subscribeSock.connect(addrStr);
-  subscribeSock.subscribe("deltas");
-  debug(`Subscribing to ${addrStr}`);
+export class PubSub {
+  rhizomeNode: RhizomeNode;
+  publishSock: Publisher;
+  publishAddrStr: string;
+  subscriptions: Subscription[] = [];
+
+  constructor(rhizomeNode: RhizomeNode) {
+    this.rhizomeNode = rhizomeNode;
+    this.publishSock = new Publisher();
+
+    const {publishBindAddr, publishBindPort} = this.rhizomeNode.config;
+    this.publishAddrStr = `tcp://${publishBindAddr}:${publishBindPort}`;
+  }
+
+  async start() {
+    await this.publishSock.bind(this.publishAddrStr);
+    debug(`Publishing socket bound to ${this.publishAddrStr}`);
+  }
+
+  async publish(topic: string, msg: string) {
+    await this.publishSock.send([
+      topic,
+      this.rhizomeNode.myRequestAddr.toAddrString(),
+      msg
+    ]);
+  }
+
+  subscribe(publishAddr: PeerAddress, topic: string, cb: SubscribedMessageHandler): Subscription {
+    const subscription = new Subscription(publishAddr, topic, cb);
+    this.subscriptions.push(subscription);
+    return subscription;
+  }
+
+  async stop() {
+    await this.publishSock.unbind(this.publishAddrStr);
+    this.publishSock.close();
+    this.publishSock = new Publisher();
+
+    for (const subscription of this.subscriptions) {
+      subscription.sock.close();
+      debug('subscription socket is closed?', subscription.sock.closed);
+    }
+  }
 }
diff --git a/src/query.ts b/src/query.ts
deleted file mode 100644
index aa3199e..0000000
--- a/src/query.ts
+++ /dev/null
@@ -1,15 +0,0 @@
-import { Query, QueryResult, } from './types';
-import { deltasAccepted } from './deltas';
-import { applyFilter } from './filter';
-
-// export const queryResultMemo = new Map<Query, QueryResult>();
-
-export function issueQuery(query: Query): QueryResult {
-  const deltas = applyFilter(deltasAccepted, query.filterExpr);
-  return {
-    deltas
-    // TODO: Materialized view / state collapse snapshot
-  };
-}
-
-
diff --git a/src/request-reply.ts b/src/request-reply.ts
index 6a5bb33..029e912 100644
--- a/src/request-reply.ts
+++ b/src/request-reply.ts
@@ -1,8 +1,9 @@
-import { Request, Reply, Message } from 'zeromq';
-import { REQUEST_BIND_PORT, REQUEST_BIND_ADDR} from './config';
-import { EventEmitter } from 'node:events';
-import { PeerMethods } from './peers';
+import {Request, Reply, Message} from 'zeromq';
+import {EventEmitter} from 'node:events';
+import {PeerMethods} from './peers';
 import Debug from 'debug';
+import {RhizomeNode} from './node';
+import {PeerAddress} from './types';
 const debug = Debug('request-reply');
 
 export type PeerRequest = {
@@ -11,32 +12,28 @@ export type PeerRequest = {
 
 export type RequestHandler = (req: PeerRequest, res: ResponseSocket) => void;
 
-export const replySock = new Reply();
-const requestStream = new EventEmitter();
+// TODO: Retain handle to request socket for each peer, so we only need to open once
+export class RequestSocket {
+  sock = new Request();
 
-export async function bindReply() {
-  const addrStr = `tcp://${REQUEST_BIND_ADDR}:${REQUEST_BIND_PORT}`;
-  await replySock.bind(addrStr);
-  debug(`Reply socket bound to ${addrStr}`);
-}
-
-export async function runRequestHandlers() {
-  for await (const [msg] of replySock) {
-    debug(`Received message`, {msg: msg.toString()});
-    const req = peerRequestFromMsg(msg);
-    requestStream.emit('request', req);
+  constructor(addr: PeerAddress) {
+    const addrStr = `tcp://${addr.addr}:${addr.port}`;
+    this.sock.connect(addrStr);
+    debug(`Request socket connecting to ${addrStr}`);
   }
-}
 
-function peerRequestFromMsg(msg: Message): PeerRequest | null {
-  let req: PeerRequest | null = null;
-  try {
-    const obj = JSON.parse(msg.toString());
-    req = {...obj};
-  } catch(e) {
-    debug('error receiving command', e);
+  async request(method: PeerMethods): Promise<Message> {
+    const req: PeerRequest = {
+      method
+    };
+    await this.sock.send(JSON.stringify(req));
+    // Wait for a response.
+    // TODO: Timeout
+    // TODO: Retry
+    // this.sock.receiveTimeout = ...
+    const [res] = await this.sock.receive();
+    return res;
   }
-  return req;
 }
 
 export class ResponseSocket {
@@ -53,30 +50,54 @@ export class ResponseSocket {
   }
 }
 
-export function registerRequestHandler(handler: RequestHandler) {
-  requestStream.on('request', (req) => {
-    const res = new ResponseSocket(replySock);
-    handler(req, res);
-  });
+function peerRequestFromMsg(msg: Message): PeerRequest | null {
+  let req: PeerRequest | null = null;
+  try {
+    const obj = JSON.parse(msg.toString());
+    req = {...obj};
+  } catch (e) {
+    debug('error receiving command', e);
+  }
+  return req;
 }
 
-export class RequestSocket {
-  sock = new Request();
-  constructor(host: string, port: number) {
-    const addrStr = `tcp://${host}:${port}`;
-    this.sock.connect(addrStr);
-    debug(`Request socket connecting to ${addrStr}`);
+export class RequestReply {
+  rhizomeNode: RhizomeNode;
+  replySock = new Reply();
+  requestStream = new EventEmitter();
+  requestBindAddrStr: string;
+
+  constructor(rhizomeNode: RhizomeNode) {
+    this.rhizomeNode = rhizomeNode;
+    const {requestBindAddr, requestBindPort} = this.rhizomeNode.config;
+    this.requestBindAddrStr = `tcp://${requestBindAddr}:${requestBindPort}`;
   }
-  async request(method: PeerMethods): Promise<Message> {
-    const req: PeerRequest = {
-      method
-    };
-    await this.sock.send(JSON.stringify(req));
-    // Wait for a response.
-    // TODO: Timeout
-    // TODO: Retry
-    // this.sock.receiveTimeout = ...
-    const [res] = await this.sock.receive();
-    return res;
+
+  // Listen for incoming requests
+  async start() {
+
+    await this.replySock.bind(this.requestBindAddrStr);
+    debug(`Reply socket bound to ${this.requestBindAddrStr}`);
+
+    for await (const [msg] of this.replySock) {
+      debug(`Received message`, {msg: msg.toString()});
+      const req = peerRequestFromMsg(msg);
+      this.requestStream.emit('request', req);
+    }
+  }
+
+  // Add a top level handler for incoming requests.
+  // Each handler will get a copy of every message.
+  registerRequestHandler(handler: RequestHandler) {
+    this.requestStream.on('request', (req) => {
+      const res = new ResponseSocket(this.replySock);
+      handler(req, res);
+    });
+  }
+
+  async stop() {
+    await this.replySock.unbind(this.requestBindAddrStr);
+    this.replySock.close();
+    this.replySock = new Reply();
   }
 }
diff --git a/src/typed-collection.ts b/src/typed-collection.ts
index fec9742..28c188c 100644
--- a/src/typed-collection.ts
+++ b/src/typed-collection.ts
@@ -5,6 +5,7 @@ export class TypedCollection<T extends EntityProperties> extends Collection {
   put(id: string | undefined, properties: T): Entity {
     return super.put(id, properties);
   }
+
   get(id: string): Entity | undefined {
     return super.get(id);
   }
diff --git a/src/types.ts b/src/types.ts
index e72b124..816347e 100644
--- a/src/types.ts
+++ b/src/types.ts
@@ -46,19 +46,27 @@ export type Properties = {[key: string]: PropertyTypes};
 export class PeerAddress {
   addr: string;
   port: number;
+
   constructor(addr: string, port: number) {
     this.addr = addr;
     this.port = port;
   }
+
   static fromString(addrString: string): PeerAddress {
     const [addr, port] = addrString.trim().split(':');
     return new PeerAddress(addr, parseInt(port));
   }
+
   toAddrString() {
     return `${this.addr}:${this.port}`;
   }
+
   toJSON() {
     return this.toAddrString();
   }
+
+  isEqual(other: PeerAddress) {
+    return this.addr === other.addr && this.port === other.port;
+  }
 };