refactor: optimize lossless view and improve code quality
- Optimize pointer handling and entity reference tracking in Lossless view - Improve type safety with proper TypeScript types - Add debug logging for better troubleshooting - Clean up imports and unused variables - Update test cases for DeltaBuilder
This commit is contained in:
parent
d7c4fda93e
commit
d0941f417e
@ -46,8 +46,8 @@ describe('DeltaBuilder', () => {
|
|||||||
|
|
||||||
// Verify that the entity property resolves correctly
|
// Verify that the entity property resolves correctly
|
||||||
const lossless = new Lossless(node);
|
const lossless = new Lossless(node);
|
||||||
lossless.ingestDelta(delta);
|
|
||||||
const lossy = new TimestampResolver(lossless);
|
const lossy = new TimestampResolver(lossless);
|
||||||
|
lossless.ingestDelta(delta);
|
||||||
const result = lossy.resolve();
|
const result = lossy.resolve();
|
||||||
expect(result).toBeDefined();
|
expect(result).toBeDefined();
|
||||||
expect(result!['entity-1'].properties.name).toBe('Test Entity');
|
expect(result!['entity-1'].properties.name).toBe('Test Entity');
|
||||||
@ -71,8 +71,8 @@ describe('DeltaBuilder', () => {
|
|||||||
|
|
||||||
// Verify that the entity property resolves correctly
|
// Verify that the entity property resolves correctly
|
||||||
const lossless = new Lossless(node);
|
const lossless = new Lossless(node);
|
||||||
lossless.ingestDelta(delta);
|
|
||||||
const lossy = new TimestampResolver(lossless);
|
const lossy = new TimestampResolver(lossless);
|
||||||
|
lossless.ingestDelta(delta);
|
||||||
const result = lossy.resolve();
|
const result = lossy.resolve();
|
||||||
expect(result).toBeDefined();
|
expect(result).toBeDefined();
|
||||||
expect(result!['entity-1'].properties.name).toBe('Test Entity');
|
expect(result!['entity-1'].properties.name).toBe('Test Entity');
|
||||||
@ -171,8 +171,8 @@ describe('DeltaBuilder', () => {
|
|||||||
expect(delta.pointers).toHaveProperty('type', 'follows');
|
expect(delta.pointers).toHaveProperty('type', 'follows');
|
||||||
|
|
||||||
const lossless = new Lossless(node);
|
const lossless = new Lossless(node);
|
||||||
lossless.ingestDelta(delta);
|
|
||||||
const lossy = new TimestampResolver(lossless);
|
const lossy = new TimestampResolver(lossless);
|
||||||
|
lossless.ingestDelta(delta);
|
||||||
const result = lossy.resolve([relId]);
|
const result = lossy.resolve([relId]);
|
||||||
expect(result).toBeDefined();
|
expect(result).toBeDefined();
|
||||||
expect(result![relId]).toMatchObject({
|
expect(result![relId]).toMatchObject({
|
||||||
@ -201,8 +201,8 @@ describe('DeltaBuilder', () => {
|
|||||||
expect(delta.pointers).toHaveProperty('version', 1);
|
expect(delta.pointers).toHaveProperty('version', 1);
|
||||||
|
|
||||||
const lossless = new Lossless(node);
|
const lossless = new Lossless(node);
|
||||||
lossless.ingestDelta(delta);
|
|
||||||
const lossy = new TimestampResolver(lossless);
|
const lossy = new TimestampResolver(lossless);
|
||||||
|
lossless.ingestDelta(delta);
|
||||||
const result = lossy.resolve([relId]);
|
const result = lossy.resolve([relId]);
|
||||||
expect(result).toBeDefined();
|
expect(result).toBeDefined();
|
||||||
expect(result![relId]).toMatchObject({
|
expect(result![relId]).toMatchObject({
|
||||||
|
@ -351,8 +351,8 @@ describe('Negation System', () => {
|
|||||||
test('should handle self-referential entities in negations', () => {
|
test('should handle self-referential entities in negations', () => {
|
||||||
// Create a delta that references itself
|
// Create a delta that references itself
|
||||||
const selfRefDelta = createDelta('user1', 'host1')
|
const selfRefDelta = createDelta('user1', 'host1')
|
||||||
.setProperty('node1', 'parent', 'node1')
|
.setProperty('node1', 'parent', 'node1', 'node-as-parent')
|
||||||
.setProperty('node1', 'child', 'node1') // Self-reference
|
.setProperty('node1', 'child', 'node1', 'node-as-child') // Self-reference
|
||||||
.buildV1();
|
.buildV1();
|
||||||
|
|
||||||
const negationDelta = createDelta('admin', 'host1').negate(selfRefDelta.id).buildV1();
|
const negationDelta = createDelta('admin', 'host1').negate(selfRefDelta.id).buildV1();
|
||||||
|
@ -4,7 +4,7 @@ import { Lossless, LosslessViewOne } from "../../../src/views/lossless";
|
|||||||
import { Lossy } from "../../../src/views/lossy";
|
import { Lossy } from "../../../src/views/lossy";
|
||||||
import { RhizomeNode } from "../../../src/node";
|
import { RhizomeNode } from "../../../src/node";
|
||||||
import { valueFromCollapsedDelta } from "../../../src/views/resolvers/aggregation-resolvers";
|
import { valueFromCollapsedDelta } from "../../../src/views/resolvers/aggregation-resolvers";
|
||||||
import { lastValueFromDeltas } from "../../../src/views/resolvers/timestamp-resolvers";
|
import { latestFromCollapsedDeltas } from "../../../src/views/resolvers/timestamp-resolvers";
|
||||||
import { createDelta } from "../../../src/core/delta-builder";
|
import { createDelta } from "../../../src/core/delta-builder";
|
||||||
const debug = Debug('rz:test:lossy');
|
const debug = Debug('rz:test:lossy');
|
||||||
|
|
||||||
@ -18,8 +18,16 @@ type Summary = {
|
|||||||
roles: Role[];
|
roles: Role[];
|
||||||
};
|
};
|
||||||
|
|
||||||
class Summarizer extends Lossy<Summary, Summary> {
|
class Summarizer extends Lossy<Summary> {
|
||||||
|
private readonly debug: debug.Debugger;
|
||||||
|
|
||||||
|
constructor(lossless: Lossless) {
|
||||||
|
super(lossless);
|
||||||
|
this.debug = Debug('rz:test:lossy:summarizer');
|
||||||
|
}
|
||||||
|
|
||||||
initializer(): Summary {
|
initializer(): Summary {
|
||||||
|
this.debug('Initializing new summary');
|
||||||
return {
|
return {
|
||||||
roles: []
|
roles: []
|
||||||
};
|
};
|
||||||
@ -30,23 +38,53 @@ class Summarizer extends Lossy<Summary, Summary> {
|
|||||||
// TODO: Prove with failing test
|
// TODO: Prove with failing test
|
||||||
|
|
||||||
reducer(acc: Summary, cur: LosslessViewOne): Summary {
|
reducer(acc: Summary, cur: LosslessViewOne): Summary {
|
||||||
|
this.debug(`Processing view for entity ${cur.id} (referenced as: ${cur.referencedAs.join(', ')})`);
|
||||||
|
this.debug(`lossless view:`, JSON.stringify(cur));
|
||||||
|
|
||||||
if (cur.referencedAs.includes("role")) {
|
if (cur.referencedAs.includes("role")) {
|
||||||
const {delta, value: actor} = lastValueFromDeltas("actor", cur.propertyDeltas["actor"]) ?? {};
|
this.debug(`Found role entity: ${cur.id}`);
|
||||||
if (!delta) throw new Error('expected to find delta');
|
|
||||||
if (!actor) throw new Error('expected to find actor');
|
const actorDeltas = cur.propertyDeltas["actor"];
|
||||||
|
this.debug(`Found ${actorDeltas?.length ?? 0} actor deltas`);
|
||||||
|
|
||||||
|
const {delta, value: actor} = latestFromCollapsedDeltas("actor", actorDeltas) ?? {};
|
||||||
|
|
||||||
|
if (!delta) {
|
||||||
|
this.debug('No delta found for actor property');
|
||||||
|
throw new Error('expected to find delta');
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!actor) {
|
||||||
|
this.debug('No actor value found in deltas');
|
||||||
|
throw new Error('expected to find actor');
|
||||||
|
}
|
||||||
|
|
||||||
|
this.debug(`Found actor: ${actor}`);
|
||||||
const film = valueFromCollapsedDelta("film", delta);
|
const film = valueFromCollapsedDelta("film", delta);
|
||||||
if (!film) throw new Error('expected to find film');
|
|
||||||
acc.roles.push({
|
if (!film) {
|
||||||
|
this.debug('No film property found in delta');
|
||||||
|
throw new Error('expected to find film');
|
||||||
|
}
|
||||||
|
|
||||||
|
this.debug(`Found film: ${film}`);
|
||||||
|
const role = {
|
||||||
role: cur.id,
|
role: cur.id,
|
||||||
actor,
|
actor,
|
||||||
film
|
film
|
||||||
});
|
};
|
||||||
|
|
||||||
|
acc.roles.push(role);
|
||||||
|
this.debug(`Added role: ${JSON.stringify(role)}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.debug(`Updated accumulator: ${JSON.stringify(acc, null, 2)}`);
|
||||||
|
|
||||||
return acc;
|
return acc;
|
||||||
}
|
}
|
||||||
|
|
||||||
resolver(acc: Summary): Summary {
|
resolver(acc: Summary): Summary {
|
||||||
|
this.debug(`Resolving summary with ${acc.roles.length} roles`);
|
||||||
return acc;
|
return acc;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,9 @@ import { DeltaV1, DeltaV2 } from './delta';
|
|||||||
import { randomUUID } from 'crypto';
|
import { randomUUID } from 'crypto';
|
||||||
import { PropertyTypes } from './types';
|
import { PropertyTypes } from './types';
|
||||||
import { PointersV2 } from './delta';
|
import { PointersV2 } from './delta';
|
||||||
|
import Debug from 'debug';
|
||||||
|
|
||||||
|
const debug = Debug('rz:delta-builder');
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A fluent builder for creating Delta objects with proper validation and type safety.
|
* A fluent builder for creating Delta objects with proper validation and type safety.
|
||||||
@ -77,9 +80,11 @@ export class DeltaBuilder {
|
|||||||
addPointer(localContext: string, target: string | number | boolean | null, targetContext?: string): this {
|
addPointer(localContext: string, target: string | number | boolean | null, targetContext?: string): this {
|
||||||
const pointerTarget = (targetContext && typeof target === 'string')
|
const pointerTarget = (targetContext && typeof target === 'string')
|
||||||
? { [target]: targetContext } : target;
|
? { [target]: targetContext } : target;
|
||||||
if (this.pointers[localContext] &&
|
// Prevent duplicate primitive properties with the same key
|
||||||
|
if (this.pointers[localContext] &&
|
||||||
JSON.stringify(this.pointers[localContext]) !== JSON.stringify(pointerTarget)
|
JSON.stringify(this.pointers[localContext]) !== JSON.stringify(pointerTarget)
|
||||||
) {
|
) {
|
||||||
|
debug(`Pointer for '${localContext}' already exists with different value: ${JSON.stringify(this.pointers[localContext])} !== ${JSON.stringify(pointerTarget)}`);
|
||||||
throw new Error(`Pointer for ${localContext} already exists with different value`);
|
throw new Error(`Pointer for ${localContext} already exists with different value`);
|
||||||
}
|
}
|
||||||
this.pointers[localContext] = pointerTarget;
|
this.pointers[localContext] = pointerTarget;
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
import Docker, { DockerOptions } from 'dockerode';
|
import Docker from 'dockerode';
|
||||||
import * as path from 'path';
|
import * as path from 'path';
|
||||||
import { promises as fs } from 'fs';
|
import { promises as fs } from 'fs';
|
||||||
import * as tar from 'tar-fs';
|
import * as tar from 'tar-fs';
|
||||||
@ -63,9 +63,9 @@ export class ImageManager implements IImageManager {
|
|||||||
debug('Created build context tar stream');
|
debug('Created build context tar stream');
|
||||||
|
|
||||||
testImageBuildPromise = new Promise<void>((resolve, reject) => {
|
testImageBuildPromise = new Promise<void>((resolve, reject) => {
|
||||||
const log = (...args: any[]) => {
|
const log = (...args: unknown[]) => {
|
||||||
const message = args.map(arg =>
|
const message = args.map(arg =>
|
||||||
typeof arg === 'object' ? JSON.stringify(arg, null, 2) : String(arg)
|
typeof arg === 'object' ? JSON.stringify(arg) : String(arg)
|
||||||
).join(' ');
|
).join(' ');
|
||||||
debug(message);
|
debug(message);
|
||||||
};
|
};
|
||||||
@ -111,7 +111,7 @@ export class ImageManager implements IImageManager {
|
|||||||
// Log any other non-empty JSON objects
|
// Log any other non-empty JSON objects
|
||||||
log(`[Docker Build] ${JSON.stringify(json)}`);
|
log(`[Docker Build] ${JSON.stringify(json)}`);
|
||||||
}
|
}
|
||||||
} catch (e) {
|
} catch (_e) {
|
||||||
// If not JSON, log as plain text if not empty
|
// If not JSON, log as plain text if not empty
|
||||||
if (line.trim()) {
|
if (line.trim()) {
|
||||||
log(`[Docker Build] ${line}`);
|
log(`[Docker Build] ${line}`);
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
import Docker, { Container } from 'dockerode';
|
import { Container } from 'dockerode';
|
||||||
import { IStatusManager } from './interfaces';
|
import { IStatusManager } from './interfaces';
|
||||||
import { NodeHandle, NodeStatus } from '../../types';
|
import { NodeHandle, NodeStatus } from '../../types';
|
||||||
import Debug from 'debug';
|
import Debug from 'debug';
|
||||||
|
@ -269,7 +269,7 @@ export class QueryEngine {
|
|||||||
case 'primitive': {
|
case 'primitive': {
|
||||||
// Use last-write-wins for primitives
|
// Use last-write-wins for primitives
|
||||||
const deltasSorted = deltas.sort((a, b) => b.timeCreated - a.timeCreated);
|
const deltasSorted = deltas.sort((a, b) => b.timeCreated - a.timeCreated);
|
||||||
for (let delta of deltasSorted) {
|
for (const delta of deltasSorted) {
|
||||||
const primitiveValue = this.extractPrimitiveValue(delta, propertyId);
|
const primitiveValue = this.extractPrimitiveValue(delta, propertyId);
|
||||||
if (primitiveValue !== null) {
|
if (primitiveValue !== null) {
|
||||||
obj[propertyId] = primitiveValue;
|
obj[propertyId] = primitiveValue;
|
||||||
|
@ -72,7 +72,7 @@ class LosslessEntity {
|
|||||||
export class Lossless {
|
export class Lossless {
|
||||||
domainEntities = new LosslessEntityMap();
|
domainEntities = new LosslessEntityMap();
|
||||||
transactions: Transactions;
|
transactions: Transactions;
|
||||||
referencedAs = new Map<string, Set<DomainEntityID>>();
|
referencedAs = new Map<string, Set<string>>();
|
||||||
eventStream = new EventEmitter();
|
eventStream = new EventEmitter();
|
||||||
|
|
||||||
// Track all deltas by ID for negation processing
|
// Track all deltas by ID for negation processing
|
||||||
@ -155,19 +155,6 @@ export class Lossless {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const {target, localContext} of delta.pointers) {
|
|
||||||
if (typeof target === "string" && this.domainEntities.has(target)) {
|
|
||||||
if (this.domainEntities.has(target)) {
|
|
||||||
let referencedAs = this.referencedAs.get(localContext);
|
|
||||||
if (!referencedAs) {
|
|
||||||
referencedAs = new Set<string>();
|
|
||||||
this.referencedAs.set(localContext, referencedAs);
|
|
||||||
}
|
|
||||||
referencedAs.add(target);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const transactionId = this.transactions.ingestDelta(delta, targets);
|
const transactionId = this.transactions.ingestDelta(delta, targets);
|
||||||
|
|
||||||
if (!transactionId) {
|
if (!transactionId) {
|
||||||
@ -232,8 +219,8 @@ export class Lossless {
|
|||||||
const ent = this.domainEntities.get(id);
|
const ent = this.domainEntities.get(id);
|
||||||
if (!ent) continue;
|
if (!ent) continue;
|
||||||
|
|
||||||
|
|
||||||
const referencedAs = new Set<string>();
|
const referencedAs = new Set<string>();
|
||||||
|
|
||||||
const propertyDeltas: {
|
const propertyDeltas: {
|
||||||
[key: PropertyID]: CollapsedDelta[]
|
[key: PropertyID]: CollapsedDelta[]
|
||||||
} = {};
|
} = {};
|
||||||
@ -272,15 +259,11 @@ export class Lossless {
|
|||||||
|
|
||||||
const pointers: CollapsedPointer[] = [];
|
const pointers: CollapsedPointer[] = [];
|
||||||
|
|
||||||
for (const {localContext, target, targetContext} of delta.pointers) {
|
for (const {localContext, target} of delta.pointers) {
|
||||||
if (targetContext) {
|
|
||||||
// Only store primitive pointers in the collapsed delta
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
pointers.push({[localContext]: target});
|
|
||||||
if (target === ent.id) {
|
if (target === ent.id) {
|
||||||
referencedAs.add(localContext);
|
referencedAs.add(localContext);
|
||||||
}
|
}
|
||||||
|
pointers.push({[localContext]: target});
|
||||||
}
|
}
|
||||||
|
|
||||||
visibleDeltas.push({
|
visibleDeltas.push({
|
||||||
@ -295,6 +278,14 @@ export class Lossless {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (this.referencedAs.has(ent.id)) {
|
||||||
|
for (const ref of referencedAs) {
|
||||||
|
this.referencedAs.get(ent.id)!.add(ref);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
this.referencedAs.set(ent.id, referencedAs);
|
||||||
|
}
|
||||||
|
|
||||||
// Only include entity in view if it has visible deltas
|
// Only include entity in view if it has visible deltas
|
||||||
if (hasVisibleDeltas) {
|
if (hasVisibleDeltas) {
|
||||||
view[ent.id] = {
|
view[ent.id] = {
|
||||||
|
@ -47,6 +47,7 @@ export abstract class Lossy<Accumulator, Result = Accumulator> {
|
|||||||
return this.deltaFilter(delta);
|
return this.deltaFilter(delta);
|
||||||
};
|
};
|
||||||
const losslessPartial = this.lossless.compose([entityId], combinedFilter);
|
const losslessPartial = this.lossless.compose([entityId], combinedFilter);
|
||||||
|
debug(`Lossless partial for entity ${entityId}:`, JSON.stringify(losslessPartial));
|
||||||
|
|
||||||
if (!losslessPartial) {
|
if (!losslessPartial) {
|
||||||
// This should not happen; this should only be called after the lossless view has been updated
|
// This should not happen; this should only be called after the lossless view has been updated
|
||||||
|
@ -199,7 +199,7 @@ export class CustomResolver extends Lossy<Accumulator, Result> {
|
|||||||
*/
|
*/
|
||||||
reducer(acc: Accumulator, {id: entityId, propertyDeltas}: LosslessViewOne): Accumulator {
|
reducer(acc: Accumulator, {id: entityId, propertyDeltas}: LosslessViewOne): Accumulator {
|
||||||
debug(`Processing deltas for entity: ${entityId}`);
|
debug(`Processing deltas for entity: ${entityId}`);
|
||||||
debug('Property deltas:', JSON.stringify(propertyDeltas, null, 2));
|
debug('Property deltas:', JSON.stringify(propertyDeltas));
|
||||||
|
|
||||||
if (!acc[entityId]) {
|
if (!acc[entityId]) {
|
||||||
acc[entityId] = {};
|
acc[entityId] = {};
|
||||||
@ -246,7 +246,7 @@ export class CustomResolver extends Lossy<Accumulator, Result> {
|
|||||||
|
|
||||||
// Update the plugin state with the new delta
|
// Update the plugin state with the new delta
|
||||||
entityState[pluginKey] = plugin.update(pluginState, propertyValue, updateDelta, dependencies);
|
entityState[pluginKey] = plugin.update(pluginState, propertyValue, updateDelta, dependencies);
|
||||||
debugState(`Updated entity state for ${entityId}:`, JSON.stringify(entityState[pluginKey], null, 2));
|
debugState(`Updated entity state for ${entityId}:`, JSON.stringify(entityState[pluginKey]));
|
||||||
}
|
}
|
||||||
|
|
||||||
return acc;
|
return acc;
|
||||||
@ -254,7 +254,7 @@ export class CustomResolver extends Lossy<Accumulator, Result> {
|
|||||||
|
|
||||||
resolver(acc: Accumulator, entityIds: DomainEntityID[]) {
|
resolver(acc: Accumulator, entityIds: DomainEntityID[]) {
|
||||||
const result: Result = {};
|
const result: Result = {};
|
||||||
debug('Initial accumulator state:', JSON.stringify(acc, null, 2));
|
debug('Initial accumulator state:', JSON.stringify(acc));
|
||||||
|
|
||||||
for (const entityId in acc) {
|
for (const entityId in acc) {
|
||||||
if (!entityIds.includes(entityId)) continue;
|
if (!entityIds.includes(entityId)) continue;
|
||||||
@ -270,9 +270,9 @@ export class CustomResolver extends Lossy<Accumulator, Result> {
|
|||||||
|
|
||||||
debug(`Processing property: ${propertyId} (key: ${pluginKey})`);
|
debug(`Processing property: ${propertyId} (key: ${pluginKey})`);
|
||||||
const dependencies = this.getDependencyStates(acc[entityId], plugin);
|
const dependencies = this.getDependencyStates(acc[entityId], plugin);
|
||||||
debug(`Dependencies for ${propertyId}:`, JSON.stringify(dependencies, null, 2));
|
debug(`Dependencies for ${propertyId}:`, JSON.stringify(dependencies));
|
||||||
const state = acc[entityId][pluginKey] || plugin.initialize(dependencies);
|
const state = acc[entityId][pluginKey] || plugin.initialize(dependencies);
|
||||||
debug(`State for ${propertyId}:`, JSON.stringify(state, null, 2));
|
debug(`State for ${propertyId}:`, JSON.stringify(state));
|
||||||
|
|
||||||
const resolvedValue = plugin.resolve(state, dependencies);
|
const resolvedValue = plugin.resolve(state, dependencies);
|
||||||
if (resolvedValue === undefined) throw new Error(`Resolved value for property ${propertyId} is undefined`)
|
if (resolvedValue === undefined) throw new Error(`Resolved value for property ${propertyId} is undefined`)
|
||||||
|
@ -158,7 +158,7 @@ export class LexicographicTimestampResolver extends TimestampResolver {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Resolve a value for an entity by last write wins
|
// Resolve a value for an entity by last write wins
|
||||||
export function lastValueFromDeltas(
|
export function latestFromCollapsedDeltas(
|
||||||
key: string,
|
key: string,
|
||||||
deltas?: CollapsedDelta[]
|
deltas?: CollapsedDelta[]
|
||||||
): {
|
): {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user