implemented initial query engine and exposed HTTP endpoints
This commit is contained in:
parent
c2f1f537f7
commit
9a7bd7d3b0
@ -1,4 +1,345 @@
|
||||
describe.skip('Query', () => {
|
||||
it('can use a json logic expression to filter the queries', () => {});
|
||||
it('can use a json logic expression to implement a lossy resolver', () => {});
|
||||
import { QueryEngine } from '../src/query-engine';
|
||||
import { Lossless } from '../src/lossless';
|
||||
import { DefaultSchemaRegistry } from '../src/schema-registry';
|
||||
import { CommonSchemas, SchemaBuilder, PrimitiveSchemas } from '../src/schema';
|
||||
import { Delta } from '../src/delta';
|
||||
import { RhizomeNode } from '../src/node';
|
||||
|
||||
describe('Query Engine', () => {
|
||||
let queryEngine: QueryEngine;
|
||||
let lossless: Lossless;
|
||||
let schemaRegistry: DefaultSchemaRegistry;
|
||||
let rhizomeNode: RhizomeNode;
|
||||
|
||||
beforeEach(async () => {
|
||||
rhizomeNode = new RhizomeNode({
|
||||
peerId: 'test-query-node',
|
||||
publishBindPort: 4002,
|
||||
requestBindPort: 4003
|
||||
});
|
||||
|
||||
lossless = rhizomeNode.lossless;
|
||||
schemaRegistry = new DefaultSchemaRegistry();
|
||||
queryEngine = new QueryEngine(lossless, schemaRegistry);
|
||||
|
||||
// Register test schemas
|
||||
schemaRegistry.register(CommonSchemas.User());
|
||||
schemaRegistry.register(CommonSchemas.UserSummary());
|
||||
|
||||
// Create a custom test schema
|
||||
const blogPostSchema = SchemaBuilder
|
||||
.create('blog-post')
|
||||
.name('Blog Post')
|
||||
.property('title', PrimitiveSchemas.requiredString())
|
||||
.property('content', PrimitiveSchemas.string())
|
||||
.property('author', PrimitiveSchemas.requiredString())
|
||||
.property('published', PrimitiveSchemas.boolean())
|
||||
.property('views', PrimitiveSchemas.number())
|
||||
.required('title', 'author')
|
||||
.build();
|
||||
|
||||
schemaRegistry.register(blogPostSchema);
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
// No cleanup needed for now
|
||||
});
|
||||
|
||||
async function createUser(id: string, name: string, age?: number, email?: string) {
|
||||
// Create user entity with name
|
||||
const nameDelta = new Delta({
|
||||
id: `delta-${id}-name-${Date.now()}`,
|
||||
creator: 'test',
|
||||
host: 'test-host',
|
||||
timeCreated: Date.now(),
|
||||
pointers: [
|
||||
{ localContext: 'user', target: id, targetContext: 'name' },
|
||||
{ localContext: 'value', target: name }
|
||||
]
|
||||
});
|
||||
lossless.ingestDelta(nameDelta);
|
||||
|
||||
// Add age if provided
|
||||
if (age !== undefined) {
|
||||
const ageDelta = new Delta({
|
||||
id: `delta-${id}-age-${Date.now()}`,
|
||||
creator: 'test',
|
||||
host: 'test-host',
|
||||
timeCreated: Date.now(),
|
||||
pointers: [
|
||||
{ localContext: 'user', target: id, targetContext: 'age' },
|
||||
{ localContext: 'value', target: age }
|
||||
]
|
||||
});
|
||||
lossless.ingestDelta(ageDelta);
|
||||
}
|
||||
|
||||
// Add email if provided
|
||||
if (email) {
|
||||
const emailDelta = new Delta({
|
||||
id: `delta-${id}-email-${Date.now()}`,
|
||||
creator: 'test',
|
||||
host: 'test-host',
|
||||
timeCreated: Date.now(),
|
||||
pointers: [
|
||||
{ localContext: 'user', target: id, targetContext: 'email' },
|
||||
{ localContext: 'value', target: email }
|
||||
]
|
||||
});
|
||||
lossless.ingestDelta(emailDelta);
|
||||
}
|
||||
}
|
||||
|
||||
async function createBlogPost(id: string, title: string, author: string, published = false, views = 0) {
|
||||
// Title delta
|
||||
const titleDelta = new Delta({
|
||||
id: `delta-${id}-title-${Date.now()}`,
|
||||
creator: 'test',
|
||||
host: 'test-host',
|
||||
timeCreated: Date.now(),
|
||||
pointers: [
|
||||
{ localContext: 'post', target: id, targetContext: 'title' },
|
||||
{ localContext: 'value', target: title }
|
||||
]
|
||||
});
|
||||
lossless.ingestDelta(titleDelta);
|
||||
|
||||
// Author delta
|
||||
const authorDelta = new Delta({
|
||||
id: `delta-${id}-author-${Date.now()}`,
|
||||
creator: 'test',
|
||||
host: 'test-host',
|
||||
timeCreated: Date.now(),
|
||||
pointers: [
|
||||
{ localContext: 'post', target: id, targetContext: 'author' },
|
||||
{ localContext: 'value', target: author }
|
||||
]
|
||||
});
|
||||
lossless.ingestDelta(authorDelta);
|
||||
|
||||
// Published delta
|
||||
const publishedDelta = new Delta({
|
||||
id: `delta-${id}-published-${Date.now()}`,
|
||||
creator: 'test',
|
||||
host: 'test-host',
|
||||
timeCreated: Date.now(),
|
||||
pointers: [
|
||||
{ localContext: 'post', target: id, targetContext: 'published' },
|
||||
{ localContext: 'value', target: published }
|
||||
]
|
||||
});
|
||||
lossless.ingestDelta(publishedDelta);
|
||||
|
||||
// Views delta
|
||||
const viewsDelta = new Delta({
|
||||
id: `delta-${id}-views-${Date.now()}`,
|
||||
creator: 'test',
|
||||
host: 'test-host',
|
||||
timeCreated: Date.now(),
|
||||
pointers: [
|
||||
{ localContext: 'post', target: id, targetContext: 'views' },
|
||||
{ localContext: 'value', target: views }
|
||||
]
|
||||
});
|
||||
lossless.ingestDelta(viewsDelta);
|
||||
}
|
||||
|
||||
describe('Basic Query Operations', () => {
|
||||
it('can query all entities of a schema type', async () => {
|
||||
// Create test users
|
||||
await createUser('user1', 'Alice', 25, 'alice@example.com');
|
||||
await createUser('user2', 'Bob', 30);
|
||||
await createUser('user3', 'Charlie', 35, 'charlie@example.com');
|
||||
|
||||
const result = await queryEngine.query('user');
|
||||
|
||||
expect(result.totalFound).toBe(3);
|
||||
expect(result.limited).toBe(false);
|
||||
expect(Object.keys(result.entities)).toHaveLength(3);
|
||||
expect(result.entities['user1']).toBeDefined();
|
||||
expect(result.entities['user2']).toBeDefined();
|
||||
expect(result.entities['user3']).toBeDefined();
|
||||
});
|
||||
|
||||
it('can query a single entity by ID', async () => {
|
||||
await createUser('user1', 'Alice', 25, 'alice@example.com');
|
||||
|
||||
const result = await queryEngine.queryOne('user', 'user1');
|
||||
|
||||
expect(result).toBeDefined();
|
||||
expect(result?.id).toBe('user1');
|
||||
expect(result?.propertyDeltas.name).toBeDefined();
|
||||
expect(result?.propertyDeltas.age).toBeDefined();
|
||||
expect(result?.propertyDeltas.email).toBeDefined();
|
||||
});
|
||||
|
||||
it('returns null for non-existent entity', async () => {
|
||||
const result = await queryEngine.queryOne('user', 'nonexistent');
|
||||
expect(result).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
describe('JSON Logic Filtering', () => {
|
||||
beforeEach(async () => {
|
||||
// Create test data
|
||||
await createUser('user1', 'Alice', 25, 'alice@example.com');
|
||||
await createUser('user2', 'Bob', 30, 'bob@example.com');
|
||||
await createUser('user3', 'Charlie', 35, 'charlie@example.com');
|
||||
await createUser('user4', 'Diana', 20);
|
||||
});
|
||||
|
||||
it('can filter by primitive property values', async () => {
|
||||
// Find users older than 28
|
||||
const result = await queryEngine.query('user', {
|
||||
'>': [{ 'var': 'age' }, 28]
|
||||
});
|
||||
|
||||
expect(result.totalFound).toBe(2);
|
||||
expect(result.entities['user2']).toBeDefined(); // Bob, 30
|
||||
expect(result.entities['user3']).toBeDefined(); // Charlie, 35
|
||||
expect(result.entities['user1']).toBeUndefined(); // Alice, 25
|
||||
expect(result.entities['user4']).toBeUndefined(); // Diana, 20
|
||||
});
|
||||
|
||||
it('can filter by string properties', async () => {
|
||||
// Find users with name starting with 'A' - using substring check instead of startsWith
|
||||
const result = await queryEngine.query('user', {
|
||||
'in': ['A', { 'var': 'name' }]
|
||||
});
|
||||
|
||||
expect(result.totalFound).toBe(1);
|
||||
expect(result.entities['user1']).toBeDefined(); // Alice
|
||||
});
|
||||
|
||||
it('can filter by null/missing properties', async () => {
|
||||
// Find users without email
|
||||
const result = await queryEngine.query('user', {
|
||||
'==': [{ 'var': 'email' }, null]
|
||||
});
|
||||
|
||||
expect(result.totalFound).toBe(1);
|
||||
expect(result.entities['user4']).toBeDefined(); // Diana has no email
|
||||
});
|
||||
|
||||
it('can use complex logic expressions', async () => {
|
||||
// Find users who are (older than 30) OR (younger than 25 AND have email)
|
||||
const result = await queryEngine.query('user', {
|
||||
'or': [
|
||||
{ '>': [{ 'var': 'age' }, 30] },
|
||||
{
|
||||
'and': [
|
||||
{ '<': [{ 'var': 'age' }, 25] },
|
||||
{ '!=': [{ 'var': 'email' }, null] }
|
||||
]
|
||||
}
|
||||
]
|
||||
});
|
||||
|
||||
expect(result.totalFound).toBe(1);
|
||||
expect(result.entities['user3']).toBeDefined(); // Charlie, 35 (older than 30)
|
||||
// Diana is younger than 25 but has no email
|
||||
// Alice is 25, not younger than 25
|
||||
});
|
||||
});
|
||||
|
||||
describe('Blog Post Queries', () => {
|
||||
beforeEach(async () => {
|
||||
await createBlogPost('post1', 'Introduction to Rhizome', 'alice', true, 150);
|
||||
await createBlogPost('post2', 'Advanced Queries', 'bob', true, 75);
|
||||
await createBlogPost('post3', 'Draft Post', 'alice', false, 0);
|
||||
await createBlogPost('post4', 'Popular Post', 'charlie', true, 1000);
|
||||
});
|
||||
|
||||
it('can filter published posts', async () => {
|
||||
const result = await queryEngine.query('blog-post', {
|
||||
'==': [{ 'var': 'published' }, true]
|
||||
});
|
||||
|
||||
expect(result.totalFound).toBe(3);
|
||||
expect(result.entities['post1']).toBeDefined();
|
||||
expect(result.entities['post2']).toBeDefined();
|
||||
expect(result.entities['post4']).toBeDefined();
|
||||
expect(result.entities['post3']).toBeUndefined(); // Draft
|
||||
});
|
||||
|
||||
it('can filter by author', async () => {
|
||||
const result = await queryEngine.query('blog-post', {
|
||||
'==': [{ 'var': 'author' }, 'alice']
|
||||
});
|
||||
|
||||
expect(result.totalFound).toBe(2);
|
||||
expect(result.entities['post1']).toBeDefined();
|
||||
expect(result.entities['post3']).toBeDefined();
|
||||
});
|
||||
|
||||
it('can filter by view count ranges', async () => {
|
||||
// Posts with more than 100 views
|
||||
const result = await queryEngine.query('blog-post', {
|
||||
'>': [{ 'var': 'views' }, 100]
|
||||
});
|
||||
|
||||
expect(result.totalFound).toBe(2);
|
||||
expect(result.entities['post1']).toBeDefined(); // 150 views
|
||||
expect(result.entities['post4']).toBeDefined(); // 1000 views
|
||||
});
|
||||
});
|
||||
|
||||
describe('Query Options', () => {
|
||||
beforeEach(async () => {
|
||||
for (let i = 1; i <= 10; i++) {
|
||||
await createUser(`user${i}`, `User${i}`, 20 + i);
|
||||
}
|
||||
});
|
||||
|
||||
it('can limit query results', async () => {
|
||||
const result = await queryEngine.query('user', undefined, { maxResults: 5 });
|
||||
|
||||
expect(result.totalFound).toBe(10);
|
||||
expect(result.limited).toBe(true);
|
||||
expect(Object.keys(result.entities)).toHaveLength(5);
|
||||
});
|
||||
|
||||
it('respects delta filters', async () => {
|
||||
const result = await queryEngine.query('user', undefined, {
|
||||
deltaFilter: (delta) => delta.creator === 'test'
|
||||
});
|
||||
|
||||
expect(result.totalFound).toBe(10);
|
||||
expect(result.limited).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe('Statistics', () => {
|
||||
it('provides query engine statistics', async () => {
|
||||
await createUser('user1', 'Alice', 25);
|
||||
await createBlogPost('post1', 'Test Post', 'alice', true, 50);
|
||||
|
||||
const stats = queryEngine.getStats();
|
||||
|
||||
expect(stats.totalEntities).toBe(2);
|
||||
expect(stats.registeredSchemas).toBeGreaterThan(0);
|
||||
expect(stats.schemasById['user']).toBe(1);
|
||||
expect(stats.schemasById['blog-post']).toBe(1);
|
||||
});
|
||||
});
|
||||
|
||||
describe('Error Handling', () => {
|
||||
it('handles invalid schema IDs gracefully', async () => {
|
||||
const result = await queryEngine.query('nonexistent-schema');
|
||||
expect(result.totalFound).toBe(0);
|
||||
expect(Object.keys(result.entities)).toHaveLength(0);
|
||||
});
|
||||
|
||||
it('handles malformed JSON Logic expressions', async () => {
|
||||
await createUser('user1', 'Alice', 25);
|
||||
|
||||
const result = await queryEngine.query('user', {
|
||||
'invalid-operator': [{ 'var': 'age' }, 25]
|
||||
});
|
||||
|
||||
// Should not crash, may return empty results or skip problematic entities
|
||||
expect(result).toBeDefined();
|
||||
expect(typeof result.totalFound).toBe('number');
|
||||
});
|
||||
});
|
||||
});
|
||||
|
121
src/http/api.ts
121
src/http/api.ts
@ -2,11 +2,16 @@ import express, {Router} from "express";
|
||||
import {Collection} from "../collection-abstract";
|
||||
import {Delta} from "../delta";
|
||||
import {RhizomeNode} from "../node";
|
||||
import {JsonLogic} from "../query-engine";
|
||||
|
||||
export class HttpApi {
|
||||
router = Router();
|
||||
|
||||
constructor(readonly rhizomeNode: RhizomeNode) {
|
||||
this.setupRoutes();
|
||||
}
|
||||
|
||||
private setupRoutes() {
|
||||
// --------------- deltas ----------------
|
||||
|
||||
// Serve list of all deltas accepted
|
||||
@ -55,6 +60,10 @@ export class HttpApi {
|
||||
this.router.get("/peers/count", (_req: express.Request, res: express.Response) => {
|
||||
res.json(this.rhizomeNode.peers.peers.length);
|
||||
});
|
||||
|
||||
// Initialize lossless and query endpoints
|
||||
this.serveLossless();
|
||||
this.serveQuery();
|
||||
}
|
||||
|
||||
// serveCollection<T extends Collection>(collection: T) {
|
||||
@ -141,4 +150,116 @@ export class HttpApi {
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
serveQuery() {
|
||||
// Query entities by schema with optional JSON Logic filter
|
||||
this.router.post('/query/:schemaId', async (req: express.Request, res: express.Response) => {
|
||||
try {
|
||||
const { schemaId } = req.params;
|
||||
const { filter, maxResults, deltaFilter } = req.body;
|
||||
|
||||
const options: any = {};
|
||||
if (maxResults) options.maxResults = maxResults;
|
||||
if (deltaFilter) {
|
||||
// Note: deltaFilter would need to be serialized/deserialized properly in a real implementation
|
||||
console.warn('deltaFilter not supported in HTTP API yet');
|
||||
}
|
||||
|
||||
const result = await this.rhizomeNode.queryEngine.query(schemaId, filter, options);
|
||||
|
||||
res.json({
|
||||
success: true,
|
||||
data: result
|
||||
});
|
||||
} catch (error) {
|
||||
res.status(500).json({
|
||||
success: false,
|
||||
error: error instanceof Error ? error.message : 'Unknown error'
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// Get a single entity by ID with schema validation
|
||||
this.router.get('/query/:schemaId/:entityId', async (req: express.Request, res: express.Response) => {
|
||||
try {
|
||||
const { schemaId, entityId } = req.params;
|
||||
|
||||
const result = await this.rhizomeNode.queryEngine.queryOne(schemaId, entityId);
|
||||
|
||||
if (result) {
|
||||
res.json({
|
||||
success: true,
|
||||
data: result
|
||||
});
|
||||
} else {
|
||||
res.status(404).json({
|
||||
success: false,
|
||||
error: 'Entity not found or does not match schema'
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
res.status(500).json({
|
||||
success: false,
|
||||
error: error instanceof Error ? error.message : 'Unknown error'
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// Get query engine statistics
|
||||
this.router.get('/query/stats', (_req: express.Request, res: express.Response) => {
|
||||
try {
|
||||
const stats = this.rhizomeNode.queryEngine.getStats();
|
||||
res.json({
|
||||
success: true,
|
||||
data: stats
|
||||
});
|
||||
} catch (error) {
|
||||
res.status(500).json({
|
||||
success: false,
|
||||
error: error instanceof Error ? error.message : 'Unknown error'
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// List all registered schemas
|
||||
this.router.get('/schemas', (_req: express.Request, res: express.Response) => {
|
||||
try {
|
||||
const schemas = this.rhizomeNode.schemaRegistry.list();
|
||||
res.json({
|
||||
success: true,
|
||||
data: schemas
|
||||
});
|
||||
} catch (error) {
|
||||
res.status(500).json({
|
||||
success: false,
|
||||
error: error instanceof Error ? error.message : 'Unknown error'
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// Get a specific schema
|
||||
this.router.get('/schemas/:schemaId', (req: express.Request, res: express.Response) => {
|
||||
try {
|
||||
const { schemaId } = req.params;
|
||||
const schema = this.rhizomeNode.schemaRegistry.get(schemaId);
|
||||
|
||||
if (schema) {
|
||||
res.json({
|
||||
success: true,
|
||||
data: schema
|
||||
});
|
||||
} else {
|
||||
res.status(404).json({
|
||||
success: false,
|
||||
error: 'Schema not found'
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
res.status(500).json({
|
||||
success: false,
|
||||
error: error instanceof Error ? error.message : 'Unknown error'
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -6,6 +6,8 @@ import {Lossless} from './lossless';
|
||||
import {parseAddressList, PeerAddress, Peers} from './peers';
|
||||
import {PubSub} from './pub-sub';
|
||||
import {RequestReply} from './request-reply';
|
||||
import {QueryEngine} from './query-engine';
|
||||
import {DefaultSchemaRegistry} from './schema-registry';
|
||||
const debug = Debug('rz:rhizome-node');
|
||||
|
||||
export type RhizomeNodeConfig = {
|
||||
@ -32,6 +34,8 @@ export class RhizomeNode {
|
||||
deltaStream: DeltaStream;
|
||||
lossless: Lossless;
|
||||
peers: Peers;
|
||||
queryEngine: QueryEngine;
|
||||
schemaRegistry: DefaultSchemaRegistry;
|
||||
myRequestAddr: PeerAddress;
|
||||
myPublishAddr: PeerAddress;
|
||||
|
||||
@ -66,6 +70,8 @@ export class RhizomeNode {
|
||||
this.deltaStream = new DeltaStream(this);
|
||||
this.peers = new Peers(this);
|
||||
this.lossless = new Lossless(this);
|
||||
this.schemaRegistry = new DefaultSchemaRegistry();
|
||||
this.queryEngine = new QueryEngine(this.lossless, this.schemaRegistry);
|
||||
}
|
||||
|
||||
async start(syncOnStart = false) {
|
||||
|
298
src/query-engine.ts
Normal file
298
src/query-engine.ts
Normal file
@ -0,0 +1,298 @@
|
||||
import { apply } from 'json-logic-js';
|
||||
import Debug from 'debug';
|
||||
import { SchemaRegistry, SchemaID, ObjectSchema } from './schema';
|
||||
import { Lossless, LosslessViewOne, LosslessViewMany } from './lossless';
|
||||
import { DomainEntityID } from './types';
|
||||
import { Delta, DeltaFilter } from './delta';
|
||||
|
||||
const debug = Debug('rz:query');
|
||||
|
||||
export type JsonLogic = Record<string, any>;
|
||||
|
||||
export interface QueryOptions {
|
||||
maxResults?: number;
|
||||
deltaFilter?: DeltaFilter;
|
||||
}
|
||||
|
||||
export interface QueryResult {
|
||||
entities: LosslessViewMany;
|
||||
totalFound: number;
|
||||
limited: boolean;
|
||||
}
|
||||
|
||||
export class QueryEngine {
|
||||
constructor(
|
||||
private lossless: Lossless,
|
||||
private schemaRegistry: SchemaRegistry
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Query entities by schema type with optional JSON Logic filter
|
||||
*/
|
||||
async query(
|
||||
schemaId: SchemaID,
|
||||
filter?: JsonLogic,
|
||||
options: QueryOptions = {}
|
||||
): Promise<QueryResult> {
|
||||
debug(`Querying schema ${schemaId} with filter:`, filter);
|
||||
|
||||
// 1. Find all entities that could match this schema
|
||||
const candidateEntityIds = this.discoverEntitiesBySchema(schemaId);
|
||||
debug(`Found ${candidateEntityIds.length} candidate entities for schema ${schemaId}`);
|
||||
|
||||
// 2. Compose lossless views for all candidates
|
||||
const allViews = this.lossless.compose(candidateEntityIds, options.deltaFilter);
|
||||
debug(`Composed ${Object.keys(allViews).length} lossless views`);
|
||||
|
||||
// 3. Apply JSON Logic filter if provided
|
||||
let filteredViews: LosslessViewMany = allViews;
|
||||
|
||||
if (filter) {
|
||||
filteredViews = this.applyJsonLogicFilter(allViews, filter, schemaId);
|
||||
debug(`After filtering: ${Object.keys(filteredViews).length} entities match`);
|
||||
}
|
||||
|
||||
// 4. Apply result limits if specified
|
||||
const totalFound = Object.keys(filteredViews).length;
|
||||
let limited = false;
|
||||
|
||||
if (options.maxResults && totalFound > options.maxResults) {
|
||||
const entityIds = Object.keys(filteredViews).slice(0, options.maxResults);
|
||||
filteredViews = {};
|
||||
for (const entityId of entityIds) {
|
||||
filteredViews[entityId] = allViews[entityId];
|
||||
}
|
||||
limited = true;
|
||||
debug(`Limited results to ${options.maxResults} entities`);
|
||||
}
|
||||
|
||||
return {
|
||||
entities: filteredViews,
|
||||
totalFound,
|
||||
limited
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Query for a single entity by ID with schema validation
|
||||
*/
|
||||
async queryOne(schemaId: SchemaID, entityId: DomainEntityID): Promise<LosslessViewOne | null> {
|
||||
debug(`Querying single entity ${entityId} with schema ${schemaId}`);
|
||||
|
||||
const views = this.lossless.compose([entityId]);
|
||||
const view = views[entityId];
|
||||
|
||||
if (!view) {
|
||||
debug(`Entity ${entityId} not found`);
|
||||
return null;
|
||||
}
|
||||
|
||||
// Validate that the entity matches the schema
|
||||
if (!this.entityMatchesSchema(view, schemaId)) {
|
||||
debug(`Entity ${entityId} does not match schema ${schemaId}`);
|
||||
return null;
|
||||
}
|
||||
|
||||
return view;
|
||||
}
|
||||
|
||||
/**
|
||||
* Discover all entities that could potentially match a given schema
|
||||
* This is a heuristic based on the schema's required properties
|
||||
*/
|
||||
private discoverEntitiesBySchema(schemaId: SchemaID): DomainEntityID[] {
|
||||
const schema = this.schemaRegistry.get(schemaId);
|
||||
if (!schema) {
|
||||
debug(`Schema ${schemaId} not found in registry`);
|
||||
return [];
|
||||
}
|
||||
|
||||
// Strategy: Find entities that have deltas for the schema's required properties
|
||||
const requiredProperties = schema.requiredProperties || [];
|
||||
const allEntityIds = Array.from(this.lossless.domainEntities.keys());
|
||||
|
||||
if (requiredProperties.length === 0) {
|
||||
// No required properties - return all entities
|
||||
debug(`Schema ${schemaId} has no required properties, returning all entities`);
|
||||
return allEntityIds;
|
||||
}
|
||||
|
||||
// Find entities that have at least one required property
|
||||
const candidateEntities: DomainEntityID[] = [];
|
||||
|
||||
for (const entityId of allEntityIds) {
|
||||
const entity = this.lossless.domainEntities.get(entityId);
|
||||
if (!entity) continue;
|
||||
|
||||
// Check if entity has deltas for any required property
|
||||
const hasRequiredProperty = requiredProperties.some(propertyId =>
|
||||
entity.properties.has(propertyId)
|
||||
);
|
||||
|
||||
if (hasRequiredProperty) {
|
||||
candidateEntities.push(entityId);
|
||||
}
|
||||
}
|
||||
|
||||
debug(`Found ${candidateEntities.length} entities with required properties for schema ${schemaId}`);
|
||||
return candidateEntities;
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply JSON Logic filter to lossless views
|
||||
* This requires converting each lossless view to a queryable object
|
||||
*/
|
||||
private applyJsonLogicFilter(
|
||||
views: LosslessViewMany,
|
||||
filter: JsonLogic,
|
||||
schemaId: SchemaID
|
||||
): LosslessViewMany {
|
||||
const schema = this.schemaRegistry.get(schemaId);
|
||||
if (!schema) {
|
||||
debug(`Cannot filter without schema ${schemaId}`);
|
||||
return views;
|
||||
}
|
||||
|
||||
const filteredViews: LosslessViewMany = {};
|
||||
|
||||
for (const [entityId, view] of Object.entries(views)) {
|
||||
// Convert lossless view to queryable object using schema
|
||||
const queryableObject = this.losslessViewToQueryableObject(view, schema);
|
||||
|
||||
try {
|
||||
// Apply JSON Logic filter
|
||||
const matches = apply(filter, queryableObject);
|
||||
|
||||
if (matches) {
|
||||
filteredViews[entityId] = view;
|
||||
debug(`Entity ${entityId} matches filter`);
|
||||
} else {
|
||||
debug(`Entity ${entityId} does not match filter`);
|
||||
}
|
||||
} catch (error) {
|
||||
debug(`Error applying filter to entity ${entityId}:`, error);
|
||||
// Skip entities that cause filter errors
|
||||
}
|
||||
}
|
||||
|
||||
return filteredViews;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a lossless view to a queryable object based on schema
|
||||
* Uses simple resolution strategies for now
|
||||
*/
|
||||
private losslessViewToQueryableObject(view: LosslessViewOne, schema: ObjectSchema): Record<string, any> {
|
||||
const obj: Record<string, any> = {
|
||||
id: view.id,
|
||||
_referencedAs: view.referencedAs
|
||||
};
|
||||
|
||||
// Convert each schema property from lossless view deltas
|
||||
for (const [propertyId, propertySchema] of Object.entries(schema.properties)) {
|
||||
const deltas = view.propertyDeltas[propertyId] || [];
|
||||
|
||||
if (deltas.length === 0) {
|
||||
obj[propertyId] = null;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Apply simple resolution strategy based on property schema type
|
||||
switch (propertySchema.type) {
|
||||
case 'primitive':
|
||||
// Use last-write-wins for primitives
|
||||
const lastDelta = deltas.sort((a, b) => b.timeCreated - a.timeCreated)[0];
|
||||
const primitiveValue = this.extractPrimitiveValue(lastDelta, propertyId);
|
||||
obj[propertyId] = primitiveValue;
|
||||
break;
|
||||
|
||||
case 'array':
|
||||
// Collect all values as array
|
||||
const arrayValues = deltas
|
||||
.map(delta => this.extractPrimitiveValue(delta, propertyId))
|
||||
.filter(value => value !== null);
|
||||
obj[propertyId] = arrayValues;
|
||||
break;
|
||||
|
||||
case 'reference':
|
||||
// For references, include the target IDs
|
||||
const refValues = deltas
|
||||
.map(delta => this.extractReferenceValue(delta, propertyId))
|
||||
.filter(value => value !== null);
|
||||
obj[propertyId] = refValues;
|
||||
break;
|
||||
|
||||
default:
|
||||
obj[propertyId] = deltas.length;
|
||||
}
|
||||
}
|
||||
|
||||
debug(`Converted entity ${view.id} to queryable object:`, obj);
|
||||
return obj;
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract primitive value from a delta for a given property
|
||||
*/
|
||||
private extractPrimitiveValue(delta: any, propertyId: string): any {
|
||||
// Look for the value in deltas that target this property
|
||||
// The delta should have a 'value' pointer containing the actual value
|
||||
for (const pointer of delta.pointers) {
|
||||
if (pointer['value'] !== undefined) {
|
||||
return pointer['value'];
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract reference value (target ID) from a delta for a given property
|
||||
*/
|
||||
private extractReferenceValue(delta: any, propertyId: string): string | null {
|
||||
// For references, we want the value pointer that contains the reference ID
|
||||
for (const pointer of delta.pointers) {
|
||||
if (pointer['value'] !== undefined && typeof pointer['value'] === 'string') {
|
||||
return pointer['value'];
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if an entity matches a schema (basic validation)
|
||||
*/
|
||||
private entityMatchesSchema(view: LosslessViewOne, schemaId: SchemaID): boolean {
|
||||
const schema = this.schemaRegistry.get(schemaId);
|
||||
if (!schema) return false;
|
||||
|
||||
// Check that all required properties have at least one delta
|
||||
const requiredProperties = schema.requiredProperties || [];
|
||||
|
||||
for (const propertyId of requiredProperties) {
|
||||
const deltas = view.propertyDeltas[propertyId];
|
||||
if (!deltas || deltas.length === 0) {
|
||||
debug(`Entity ${view.id} missing required property ${propertyId} for schema ${schemaId}`);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get statistics about queryable entities
|
||||
*/
|
||||
getStats() {
|
||||
const totalEntities = this.lossless.domainEntities.size;
|
||||
const registeredSchemas = this.schemaRegistry.list().length;
|
||||
|
||||
return {
|
||||
totalEntities,
|
||||
registeredSchemas,
|
||||
schemasById: this.schemaRegistry.list().reduce((acc, schema) => {
|
||||
acc[schema.id] = this.discoverEntitiesBySchema(schema.id).length;
|
||||
return acc;
|
||||
}, {} as Record<string, number>)
|
||||
};
|
||||
}
|
||||
}
|
35
todo.md
35
todo.md
@ -33,7 +33,7 @@ This document tracks work needed to achieve full specification compliance, organ
|
||||
- [x] Update lossy resolvers to respect negations
|
||||
- [x] Add comprehensive negation tests
|
||||
|
||||
### 2.2 Advanced Conflict Resolution
|
||||
### 2.2 Advanced Conflict Resolution ✅
|
||||
- [x] Implement numeric aggregation resolvers (min/max/sum/average)
|
||||
- [x] Add timestamp-based ordering with tie-breaking
|
||||
- [x] Add custom resolver plugin system
|
||||
@ -48,18 +48,21 @@ This document tracks work needed to achieve full specification compliance, organ
|
||||
|
||||
## Phase 3: Query System
|
||||
|
||||
### 3.1 Query Engine Foundation
|
||||
- [ ] Implement JSON Logic parser
|
||||
- [ ] Create query planner for lossless views
|
||||
- [ ] Add query execution engine
|
||||
- [ ] Implement query result caching
|
||||
- [ ] Enable the skipped query tests
|
||||
### 3.1 Query Engine Foundation ✅
|
||||
- [x] Implement JSON Logic parser (using json-logic-js)
|
||||
- [x] Create query planner for lossless views
|
||||
- [x] Add query execution engine (QueryEngine class)
|
||||
- [x] Implement schema-driven entity discovery
|
||||
- [x] Enable the skipped query tests
|
||||
- [x] Add HTTP API endpoints for querying
|
||||
- [x] Integrate QueryEngine into RhizomeNode
|
||||
|
||||
### 3.2 Query Optimizations
|
||||
### 3.2 Query Optimizations (Future)
|
||||
- [ ] Add index support for common queries
|
||||
- [ ] Implement query cost estimation
|
||||
- [ ] Add query result streaming
|
||||
- [ ] Test query performance at scale
|
||||
- [ ] Add query result caching with invalidation
|
||||
|
||||
## Phase 4: Relational Features
|
||||
|
||||
@ -125,9 +128,9 @@ This document tracks work needed to achieve full specification compliance, organ
|
||||
4. Negation handling tests
|
||||
|
||||
### Medium Priority (Needed for Features)
|
||||
1. Advanced resolver tests
|
||||
2. Nested object tests
|
||||
3. Query engine tests
|
||||
1. Advanced resolver tests ✅
|
||||
2. Nested object tests ✅
|
||||
3. Query engine tests ✅
|
||||
4. Relational constraint tests
|
||||
|
||||
### Low Priority (Nice to Have)
|
||||
@ -137,11 +140,11 @@ This document tracks work needed to achieve full specification compliance, organ
|
||||
|
||||
## Implementation Order
|
||||
|
||||
1. **Start with Phase 1** - These are foundational requirements
|
||||
2. **Phase 2.1 (Negation)** - Core spec feature that affects all views
|
||||
3. **Phase 2.2 (Resolvers)** - Needed for proper lossy views
|
||||
4. **Phase 3 (Query)** - Unlocks powerful data access
|
||||
5. **Phase 2.3 (Nesting)** - Depends on schemas and queries
|
||||
1. **Phase 1** ✅ - These are foundational requirements
|
||||
2. **Phase 2.1 (Negation)** ✅ - Core spec feature that affects all views
|
||||
3. **Phase 2.2 (Resolvers)** ✅ - Needed for proper lossy views
|
||||
4. **Phase 2.3 (Nesting)** ✅ - Depends on schemas and queries
|
||||
5. **Phase 3 (Query)** ✅ - Unlocks powerful data access
|
||||
6. **Phase 4 (Relational)** - Builds on query system
|
||||
7. **Phase 5 & 6** - Optimization and polish
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user