Compare commits

...

4 Commits

Author SHA1 Message Date
29b1b8bb9a
hard-won battle to fix view resolution 2025-06-25 16:43:13 -05:00
bdc6958b49
more progress 2025-06-25 13:37:35 -05:00
880affad1c
progress 2025-06-25 13:37:16 -05:00
9957dccddd
fix: improve resolver dependency handling and logging
- Fixed AggregationResolver to properly accumulate values for sum/average/count operations
- Enhanced CustomResolver with detailed debug logging for dependency resolution
- Added execution order logging for better debugging
- Improved error messages and graph visualization in dependency resolution
- Moved valueFromCollapsedDelta to lossless.ts for better code organization
2025-06-25 12:25:40 -05:00
33 changed files with 363 additions and 234 deletions

View File

@ -1,11 +1,11 @@
import Debug from 'debug'; import Debug from 'debug';
import { PointerTarget } from "../../../src/core/delta"; import { PointerTarget } from "@src/core/delta";
import { Lossless, LosslessViewOne } from "../../../src/views/lossless"; import { Lossless, LosslessViewOne } from "@src/views/lossless";
import { Lossy } from "../../../src/views/lossy"; import { Lossy } from "@src/views/lossy";
import { RhizomeNode } from "../../../src/node"; import { RhizomeNode } from "@src/node";
import { valueFromCollapsedDelta } from "../../../src/views/resolvers/aggregation-resolvers"; import { valueFromCollapsedDelta } from "@src/views/lossless";
import { latestFromCollapsedDeltas } from "../../../src/views/resolvers/timestamp-resolvers"; import { latestFromCollapsedDeltas } from "@src/views/resolvers/timestamp-resolvers";
import { createDelta } from "../../../src/core/delta-builder"; import { createDelta } from "@src/core/delta-builder";
const debug = Debug('rz:test:lossy'); const debug = Debug('rz:test:lossy');
type Role = { type Role = {

View File

@ -8,8 +8,8 @@ import {
AverageResolver, AverageResolver,
CountResolver, CountResolver,
AggregationType AggregationType
} from "../../../../src"; } from "@src";
import { createDelta } from "../../../../src/core/delta-builder"; import { createDelta } from "@src/core/delta-builder";
describe('Aggregation Resolvers', () => { describe('Aggregation Resolvers', () => {
let node: RhizomeNode; let node: RhizomeNode;
@ -22,6 +22,8 @@ describe('Aggregation Resolvers', () => {
describe('Basic Aggregation', () => { describe('Basic Aggregation', () => {
test('should aggregate numbers using min resolver', () => { test('should aggregate numbers using min resolver', () => {
const minResolver = new MinResolver(lossless, ['score']);
// Add first entity with score 10 // Add first entity with score 10
lossless.ingestDelta(createDelta('test', 'host1') lossless.ingestDelta(createDelta('test', 'host1')
.setProperty('entity1', 'score', 10, 'collection') .setProperty('entity1', 'score', 10, 'collection')
@ -40,7 +42,6 @@ describe('Aggregation Resolvers', () => {
.buildV1() .buildV1()
); );
const minResolver = new MinResolver(lossless, ['score']);
const result = minResolver.resolve(); const result = minResolver.resolve();
expect(result).toBeDefined(); expect(result).toBeDefined();
@ -51,6 +52,8 @@ describe('Aggregation Resolvers', () => {
}); });
test('should aggregate numbers using max resolver', () => { test('should aggregate numbers using max resolver', () => {
const maxResolver = new MaxResolver(lossless, ['score']);
// Add deltas for entities // Add deltas for entities
lossless.ingestDelta(createDelta('test', 'host1') lossless.ingestDelta(createDelta('test', 'host1')
.setProperty('entity1', 'score', 10, 'collection') .setProperty('entity1', 'score', 10, 'collection')
@ -67,7 +70,6 @@ describe('Aggregation Resolvers', () => {
.buildV1() .buildV1()
); );
const maxResolver = new MaxResolver(lossless, ['score']);
const result = maxResolver.resolve(); const result = maxResolver.resolve();
expect(result).toBeDefined(); expect(result).toBeDefined();
@ -77,6 +79,8 @@ describe('Aggregation Resolvers', () => {
}); });
test('should aggregate numbers using sum resolver', () => { test('should aggregate numbers using sum resolver', () => {
const sumResolver = new SumResolver(lossless, ['value']);
// Add first value for entity1 // Add first value for entity1
lossless.ingestDelta(createDelta('test', 'host1') lossless.ingestDelta(createDelta('test', 'host1')
.setProperty('entity1', 'value', 10, 'collection') .setProperty('entity1', 'value', 10, 'collection')
@ -95,7 +99,6 @@ describe('Aggregation Resolvers', () => {
.buildV1() .buildV1()
); );
const sumResolver = new SumResolver(lossless, ['value']);
const result = sumResolver.resolve(); const result = sumResolver.resolve();
expect(result).toBeDefined(); expect(result).toBeDefined();
@ -104,6 +107,8 @@ describe('Aggregation Resolvers', () => {
}); });
test('should aggregate numbers using average resolver', () => { test('should aggregate numbers using average resolver', () => {
const avgResolver = new AverageResolver(lossless, ['score']);
// Add multiple scores for entity1 // Add multiple scores for entity1
lossless.ingestDelta(createDelta('test', 'host1') lossless.ingestDelta(createDelta('test', 'host1')
.setProperty('entity1', 'score', 10, 'collection') .setProperty('entity1', 'score', 10, 'collection')
@ -121,7 +126,6 @@ describe('Aggregation Resolvers', () => {
.buildV1() .buildV1()
); );
const avgResolver = new AverageResolver(lossless, ['score']);
const result = avgResolver.resolve(); const result = avgResolver.resolve();
expect(result).toBeDefined(); expect(result).toBeDefined();
@ -130,6 +134,8 @@ describe('Aggregation Resolvers', () => {
}); });
test('should count values using count resolver', () => { test('should count values using count resolver', () => {
const countResolver = new CountResolver(lossless, ['visits']);
// Add multiple visit deltas for entity1 // Add multiple visit deltas for entity1
lossless.ingestDelta(createDelta('test', 'host1') lossless.ingestDelta(createDelta('test', 'host1')
.setProperty('entity1', 'visits', 1, 'collection') .setProperty('entity1', 'visits', 1, 'collection')
@ -147,7 +153,6 @@ describe('Aggregation Resolvers', () => {
.buildV1() .buildV1()
); );
const countResolver = new CountResolver(lossless, ['visits']);
const result = countResolver.resolve(); const result = countResolver.resolve();
expect(result).toBeDefined(); expect(result).toBeDefined();
@ -158,6 +163,12 @@ describe('Aggregation Resolvers', () => {
describe('Custom Aggregation Configuration', () => { describe('Custom Aggregation Configuration', () => {
test('should handle mixed aggregation types', () => { test('should handle mixed aggregation types', () => {
const resolver = new AggregationResolver(lossless, {
min_val: 'min' as AggregationType,
max_val: 'max' as AggregationType,
sum_val: 'sum' as AggregationType
});
// Add first set of values // Add first set of values
lossless.ingestDelta(createDelta('test', 'host1') lossless.ingestDelta(createDelta('test', 'host1')
.setProperty('entity1', 'min_val', 10, 'collection') .setProperty('entity1', 'min_val', 10, 'collection')
@ -190,11 +201,6 @@ describe('Aggregation Resolvers', () => {
.buildV1() .buildV1()
); );
const resolver = new AggregationResolver(lossless, {
min_val: 'min' as AggregationType,
max_val: 'max' as AggregationType,
sum_val: 'sum' as AggregationType
});
const result = resolver.resolve(); const result = resolver.resolve();
expect(result).toBeDefined(); expect(result).toBeDefined();
@ -206,6 +212,11 @@ describe('Aggregation Resolvers', () => {
}); });
test('should ignore non-numeric values', () => { test('should ignore non-numeric values', () => {
const resolver = new AggregationResolver(lossless, {
score: 'sum' as AggregationType,
name: 'count' as AggregationType
});
// Add numeric value // Add numeric value
lossless.ingestDelta(createDelta('test', 'host1') lossless.ingestDelta(createDelta('test', 'host1')
.setProperty('entity1', 'score', 10, 'collection') .setProperty('entity1', 'score', 10, 'collection')
@ -224,8 +235,7 @@ describe('Aggregation Resolvers', () => {
.buildV1() .buildV1()
); );
const sumResolver = new SumResolver(lossless, ['score', 'name']); const result = resolver.resolve();
const result = sumResolver.resolve();
expect(result).toBeDefined(); expect(result).toBeDefined();
const entity = result!['entity1']; const entity = result!['entity1'];
@ -234,13 +244,13 @@ describe('Aggregation Resolvers', () => {
}); });
test('should handle empty value arrays', () => { test('should handle empty value arrays', () => {
const sumResolver = new SumResolver(lossless, ['score']);
// Create entity with non-aggregated property // Create entity with non-aggregated property
lossless.ingestDelta(createDelta('test', 'host1') lossless.ingestDelta(createDelta('test', 'host1')
.setProperty('entity1', 'name', 'test', 'collection') .setProperty('entity1', 'name', 'test', 'collection')
.buildV1() .buildV1()
); );
const sumResolver = new SumResolver(lossless, ['score']);
const result = sumResolver.resolve(); const result = sumResolver.resolve();
expect(result).toBeDefined(); expect(result).toBeDefined();
@ -251,12 +261,13 @@ describe('Aggregation Resolvers', () => {
describe('Edge Cases', () => { describe('Edge Cases', () => {
test('should handle single value aggregations', () => { test('should handle single value aggregations', () => {
const avgResolver = new AverageResolver(lossless, ['value']);
lossless.ingestDelta(createDelta('test', 'host1') lossless.ingestDelta(createDelta('test', 'host1')
.setProperty('entity1', 'value', 42, 'collection') .setProperty('entity1', 'value', 42, 'collection')
.buildV1() .buildV1()
); );
const avgResolver = new AverageResolver(lossless, ['value']);
const result = avgResolver.resolve(); const result = avgResolver.resolve();
expect(result).toBeDefined(); expect(result).toBeDefined();
@ -264,6 +275,8 @@ describe('Aggregation Resolvers', () => {
}); });
test('should handle zero values', () => { test('should handle zero values', () => {
const sumResolver = new SumResolver(lossless, ['value']);
lossless.ingestDelta(createDelta('test', 'host1') lossless.ingestDelta(createDelta('test', 'host1')
.setProperty('entity1', 'value', 0, 'collection') .setProperty('entity1', 'value', 0, 'collection')
.buildV1() .buildV1()
@ -274,7 +287,6 @@ describe('Aggregation Resolvers', () => {
.buildV1() .buildV1()
); );
const sumResolver = new SumResolver(lossless, ['value']);
const result = sumResolver.resolve(); const result = sumResolver.resolve();
expect(result).toBeDefined(); expect(result).toBeDefined();
@ -282,6 +294,8 @@ describe('Aggregation Resolvers', () => {
}); });
test('should handle negative values', () => { test('should handle negative values', () => {
const minResolver = new MinResolver(lossless, ['value']);
lossless.ingestDelta(createDelta('test', 'host1') lossless.ingestDelta(createDelta('test', 'host1')
.setProperty('entity1', 'value', -5, 'collection') .setProperty('entity1', 'value', -5, 'collection')
.buildV1() .buildV1()
@ -292,7 +306,6 @@ describe('Aggregation Resolvers', () => {
.buildV1() .buildV1()
); );
const minResolver = new MinResolver(lossless, ['value']);
const result = minResolver.resolve(); const result = minResolver.resolve();
expect(result).toBeDefined(); expect(result).toBeDefined();

View File

@ -16,14 +16,15 @@ describe('Basic Dependency Resolution', () => {
test('should resolve dependencies in correct order', () => { test('should resolve dependencies in correct order', () => {
// Define a simple plugin that depends on another // Define a simple plugin that depends on another
class FirstPlugin implements ResolverPlugin<{ value: string }, string> { class FirstPlugin extends ResolverPlugin<{ value: string }, string> {
readonly dependencies = [] as const; readonly dependencies = [] as const;
initialize() { initialize() {
return { value: '' }; return { value: '' };
} }
update(_currentState: { value: string }, newValue: PropertyTypes) { update(currentState: { value: string }, newValue: PropertyTypes) {
if (newValue === undefined) return currentState;
return { value: String(newValue) }; return { value: String(newValue) };
} }
@ -33,15 +34,16 @@ describe('Basic Dependency Resolution', () => {
} }
class SecondPlugin implements ResolverPlugin<{ value: string }, string> { class SecondPlugin extends ResolverPlugin<{ value: string }, string> {
readonly dependencies = ['first'] as const; readonly dependencies = ['first'] as const;
initialize() { initialize() {
return { value: '' }; return { value: '' };
} }
update(_currentState: { value: string }, newValue: PropertyTypes, _delta: CollapsedDelta, dependencies: { first: string }) { update(currentState: { value: string }, newValue?: PropertyTypes, _delta?: CollapsedDelta, dependencies?: { first: string }) {
return { value: `${dependencies.first}_${newValue}` }; if (newValue === undefined) return currentState;
return { value: `${dependencies?.first}_${newValue}` };
} }
resolve(state: { value: string }) { resolve(state: { value: string }) {

View File

@ -16,7 +16,7 @@ describe('Circular Dependency Detection', () => {
test('should detect circular dependencies', () => { test('should detect circular dependencies', () => {
// PluginA depends on PluginB // PluginA depends on PluginB
class PluginA implements ResolverPlugin<{ value: string }, string> { class PluginA extends ResolverPlugin<{ value: string }, string> {
readonly dependencies = ['b'] as const; readonly dependencies = ['b'] as const;
initialize() { initialize() {
@ -34,7 +34,7 @@ describe('Circular Dependency Detection', () => {
// PluginB depends on PluginA (circular dependency) // PluginB depends on PluginA (circular dependency)
class PluginB implements ResolverPlugin<{ value: string }, string> { class PluginB extends ResolverPlugin<{ value: string }, string> {
readonly dependencies = ['a'] as const; readonly dependencies = ['a'] as const;
initialize() { initialize() {
@ -61,21 +61,21 @@ describe('Circular Dependency Detection', () => {
}); });
test('should detect longer circular dependency chains', () => { test('should detect longer circular dependency chains', () => {
class PluginA implements ResolverPlugin<{ value: string }, string> { class PluginA extends ResolverPlugin<{ value: string }, string> {
readonly dependencies = ['c'] as const; readonly dependencies = ['c'] as const;
initialize() { return { value: '' }; } initialize() { return { value: '' }; }
update() { return { value: '' }; } update() { return { value: '' }; }
resolve() { return 'a'; } resolve() { return 'a'; }
} }
class PluginB implements ResolverPlugin<{ value: string }, string> { class PluginB extends ResolverPlugin<{ value: string }, string> {
readonly dependencies = ['a'] as const; readonly dependencies = ['a'] as const;
initialize() { return { value: '' }; } initialize() { return { value: '' }; }
update() { return { value: '' }; } update() { return { value: '' }; }
resolve() { return 'b'; } resolve() { return 'b'; }
} }
class PluginC implements ResolverPlugin<{ value: string }, string> { class PluginC extends ResolverPlugin<{ value: string }, string> {
readonly dependencies = ['b'] as const; readonly dependencies = ['b'] as const;
initialize() { return { value: '' }; } initialize() { return { value: '' }; }
update() { return { value: '' }; } update() { return { value: '' }; }

View File

@ -16,24 +16,9 @@ describe('Edge Cases', () => {
lossless = new Lossless(node); lossless = new Lossless(node);
}); });
test('should handle null and undefined values', () => { test('should handle null values', () => {
lossless.ingestDelta(
createDelta('user1', 'host1')
.withTimestamp(1000)
.setProperty('test1', 'value', null, 'test')
.buildV1()
);
// Use null instead of undefined as it's a valid PropertyType
lossless.ingestDelta(
createDelta('user1', 'host1')
.withTimestamp(2000)
.setProperty('test1', 'value', null, 'test')
.buildV1()
);
// Create a type-safe plugin that handles null/undefined values // Create a type-safe plugin that handles null/undefined values
class NullSafeLastWriteWinsPlugin implements ResolverPlugin<{ value: PropertyTypes | null, timestamp: number }, never> { class NullSafeLastWriteWinsPlugin extends ResolverPlugin<{ value: PropertyTypes | null, timestamp: number }, never> {
readonly dependencies = [] as const; readonly dependencies = [] as const;
initialize() { initialize() {
@ -42,10 +27,11 @@ describe('Edge Cases', () => {
update( update(
currentState: { value: PropertyTypes | null, timestamp: number }, currentState: { value: PropertyTypes | null, timestamp: number },
newValue: PropertyTypes, newValue?: PropertyTypes,
delta: CollapsedDelta, delta?: CollapsedDelta,
_dependencies: DependencyStates
) { ) {
if (newValue === undefined) return currentState;
if (!delta) return currentState;
if (delta.timeCreated > currentState.timestamp) { if (delta.timeCreated > currentState.timestamp) {
return { value: newValue, timestamp: delta.timeCreated }; return { value: newValue, timestamp: delta.timeCreated };
} }
@ -54,9 +40,8 @@ describe('Edge Cases', () => {
resolve( resolve(
state: { value: PropertyTypes | null, timestamp: number }, state: { value: PropertyTypes | null, timestamp: number },
_dependencies: DependencyStates
): PropertyTypes | undefined { ): PropertyTypes | undefined {
return state.value ?? undefined; return state.value;
} }
} }
@ -64,31 +49,22 @@ describe('Edge Cases', () => {
value: new NullSafeLastWriteWinsPlugin() value: new NullSafeLastWriteWinsPlugin()
}); });
const results = resolver.resolve() || [];
expect(Array.isArray(results)).toBe(true);
const test1 = results.find(r => r.id === 'test1');
expect(test1).toBeDefined();
expect(test1?.properties.value).toBeUndefined();
});
test('should handle concurrent updates with same timestamp', () => {
// Two updates with the same timestamp
lossless.ingestDelta( lossless.ingestDelta(
createDelta('user1', 'host1') createDelta('user1', 'host1')
.withTimestamp(1000) .withTimestamp(1000)
.setProperty('test2', 'value', 'first', 'test') .setProperty('test2', 'value', null, 'test')
.buildV1() .buildV1()
); );
lossless.ingestDelta( const results = resolver.resolve() || {};
createDelta('user2', 'host2') const test1 = results['test2']
.withTimestamp(1000) // Same timestamp expect(test1).toBeDefined();
.setProperty('test2', 'value', 'second', 'test') expect(test1?.properties.value).toBeNull();
.buildV1() });
);
test('should handle concurrent updates with same timestamp', () => {
// Custom plugin that handles concurrent updates with the same timestamp // Custom plugin that handles concurrent updates with the same timestamp
class ConcurrentUpdatePlugin implements ResolverPlugin<{ value: PropertyTypes, timestamp: number }, never> { class ConcurrentUpdatePlugin extends ResolverPlugin<{ value: PropertyTypes, timestamp: number }, never> {
readonly dependencies = [] as const; readonly dependencies = [] as const;
initialize() { initialize() {
@ -123,27 +99,33 @@ describe('Edge Cases', () => {
value: new ConcurrentUpdatePlugin() value: new ConcurrentUpdatePlugin()
}); });
const results = resolver.resolve() || []; // Two updates with the same timestamp
expect(Array.isArray(results)).toBe(true); lossless.ingestDelta(
const test2 = results.find(r => r.id === 'test2'); createDelta('user1', 'host1')
.withTimestamp(1000)
.setProperty('test2', 'value', null, 'test')
.buildV1()
);
lossless.ingestDelta(
createDelta('user2', 'host2')
.withTimestamp(1000) // Same timestamp
.setProperty('test2', 'value', 'xylophone', 'test')
.buildV1()
);
const results = resolver.resolve() || {};
const test2 = results['test2'];
expect(test2).toBeDefined(); expect(test2).toBeDefined();
// Should pick one of the values deterministically // Should pick one of the values deterministically
expect(test2?.properties.value).toBe('first'); expect(test2?.properties.value).toBeNull();
}); });
test('should handle very large numbers of updates', () => { test('should handle very large numbers of updates', () => {
// Add 1000 updates
for (let i = 0; i < 1000; i++) {
lossless.ingestDelta(
createDelta('user1', 'host1')
.withTimestamp(1000 + i)
.setProperty('test3', 'counter', i, 'test')
.buildV1()
);
}
// Plugin that handles large numbers of updates efficiently // Plugin that handles large numbers of updates efficiently
class CounterPlugin implements ResolverPlugin<{ count: number }, never> { class CounterPlugin extends ResolverPlugin<{ count: number }, never> {
readonly dependencies = [] as const; readonly dependencies = [] as const;
initialize() { initialize() {
@ -171,9 +153,18 @@ describe('Edge Cases', () => {
counter: new CounterPlugin() counter: new CounterPlugin()
}); });
const results = resolver.resolve() || []; // Add 1000 updates
expect(Array.isArray(results)).toBe(true); for (let i = 0; i < 1000; i++) {
const test3 = results.find(r => r.id === 'test3'); lossless.ingestDelta(
createDelta('user1', 'host1')
.withTimestamp(1000 + i)
.setProperty('test3', 'counter', i, 'test')
.buildV1()
);
}
const results = resolver.resolve() || {};
const test3 = results['test3']
expect(test3).toBeDefined(); expect(test3).toBeDefined();
// Should handle large numbers of updates efficiently // Should handle large numbers of updates efficiently
expect(test3?.properties.counter).toBe(1000); // Should count all 1000 updates expect(test3?.properties.counter).toBe(1000); // Should count all 1000 updates
@ -182,29 +173,23 @@ describe('Edge Cases', () => {
test('should handle missing properties gracefully', () => { test('should handle missing properties gracefully', () => {
// No deltas added - should handle empty state // No deltas added - should handle empty state
// Plugin that handles missing properties gracefully // Plugin that handles missing properties gracefully
class MissingPropertyPlugin implements ResolverPlugin<{ initialized: boolean }, never> { class MissingPropertyPlugin extends ResolverPlugin<{ initialized: boolean }, never> {
private _initialized = false;
readonly dependencies = [] as const; readonly dependencies = [] as const;
initialize() { initialize() {
this._initialized = true;
return { initialized: true }; return { initialized: true };
} }
update( update(
currentState: { initialized: boolean }, currentState: { initialized: boolean },
_newValue: PropertyTypes,
_delta: CollapsedDelta,
_dependencies: DependencyStates
) { ) {
return currentState; return currentState;
} }
resolve( resolve(
_state: { initialized: boolean }, state: { initialized: boolean }
_dependencies: DependencyStates
): boolean { ): boolean {
return this._initialized; return state.initialized;
} }
} }

View File

@ -11,10 +11,11 @@ import {
} from '@src/views/resolvers/custom-resolvers'; } from '@src/views/resolvers/custom-resolvers';
// A simple plugin that depends on other plugins // A simple plugin that depends on other plugins
class AveragePlugin<Targets extends PropertyID> implements ResolverPlugin<{ initialized: boolean }, Targets> { class AveragePlugin<Targets extends PropertyID> extends ResolverPlugin<{ initialized: boolean }, Targets> {
readonly dependencies: Targets[] = []; readonly dependencies: Targets[] = [];
constructor(...targets: Targets[]) { constructor(...targets: Targets[]) {
super();
if (targets.length !== 2) { if (targets.length !== 2) {
throw new Error('This AveragePlugin requires exactly two targets'); throw new Error('This AveragePlugin requires exactly two targets');
} }
@ -96,8 +97,8 @@ describe('Multiple Plugins Integration', () => {
lossless.ingestDelta( lossless.ingestDelta(
createDelta('user1', 'host1') createDelta('user1', 'host1')
.withTimestamp(1000) .withTimestamp(1000)
.setProperty('entity1', 'name', 'Test Entity', 'test') .setProperty('entity1', 'name', 'Test Entity', 'test-name')
.setProperty('entity1', 'tags', 'tag1', 'test') .setProperty('entity1', 'tags', 'tag1', 'test-tags')
.buildV1() .buildV1()
); );

View File

@ -8,7 +8,7 @@ import {
import { PropertyTypes } from '@src/core/types'; import { PropertyTypes } from '@src/core/types';
// A simple plugin for testing lifecycle methods // A simple plugin for testing lifecycle methods
class LifecycleTestPlugin implements ResolverPlugin<LifecycleTestState> { class LifecycleTestPlugin extends ResolverPlugin<LifecycleTestState> {
readonly dependencies = [] as const; readonly dependencies = [] as const;
private initialState: LifecycleTestState = { private initialState: LifecycleTestState = {

View File

@ -4,7 +4,7 @@ import { PropertyTypes } from '@src/core/types';
import type { CollapsedDelta } from '@src/views/lossless'; import type { CollapsedDelta } from '@src/views/lossless';
import { testResolverWithPlugins, createTestDelta } from '@test-helpers/resolver-test-helper'; import { testResolverWithPlugins, createTestDelta } from '@test-helpers/resolver-test-helper';
class CountPlugin implements ResolverPlugin<{ count: number }, never> { class CountPlugin extends ResolverPlugin<{ count: number }, never> {
readonly dependencies = [] as const; readonly dependencies = [] as const;
initialize() { initialize() {

View File

@ -5,7 +5,7 @@ import { testResolverWithPlugins, createTestDelta } from '@test-helpers/resolver
import Debug from 'debug'; import Debug from 'debug';
const debug = Debug('rz:test:discount-plugins'); const debug = Debug('rz:test:discount-plugins');
// Mock plugins for testing // Mock plugins for testing
class DiscountPlugin implements ResolverPlugin<number, never> { class DiscountPlugin extends ResolverPlugin<number, never> {
readonly name = 'discount' as const; readonly name = 'discount' as const;
readonly dependencies = [] as const; readonly dependencies = [] as const;
@ -28,7 +28,7 @@ class DiscountPlugin implements ResolverPlugin<number, never> {
} }
} }
class DiscountedPricePlugin implements ResolverPlugin<number | null, 'discount'> { class DiscountedPricePlugin extends ResolverPlugin<number | null, 'discount'> {
readonly name = 'price' as const; readonly name = 'price' as const;
readonly dependencies = ['discount'] as const; readonly dependencies = ['discount'] as const;

View File

@ -8,7 +8,7 @@ import { ResolverPlugin } from '@src/views/resolvers/custom-resolvers/plugin';
// const debug = Debug('rz:test:resolver'); // const debug = Debug('rz:test:resolver');
// Mock plugins for testing // Mock plugins for testing
class TestPlugin implements ResolverPlugin<unknown, string> { class TestPlugin extends ResolverPlugin<unknown, string> {
name: string; name: string;
dependencies: readonly string[]; dependencies: readonly string[];

View File

@ -18,7 +18,7 @@ describe('State Visibility', () => {
}); });
// A test plugin that records which states it sees // A test plugin that records which states it sees
class StateSpyPlugin implements ResolverPlugin<{ values: string[] }, 'dependsOn'> { class StateSpyPlugin extends ResolverPlugin<{ values: string[] }, 'dependsOn'> {
readonly dependencies = [] as const; readonly dependencies = [] as const;
seenStates: Record<string, unknown>[] = []; seenStates: Record<string, unknown>[] = [];
@ -51,7 +51,7 @@ describe('State Visibility', () => {
} }
// A simple plugin that depends on another property // A simple plugin that depends on another property
class DependentPlugin implements ResolverPlugin<{ value: string }, 'dependsOn'> { class DependentPlugin extends ResolverPlugin<{ value: string }, 'dependsOn'> {
readonly dependencies = ['dependsOn'] as const; readonly dependencies = ['dependsOn'] as const;
seenStates: Record<string, unknown>[] = []; seenStates: Record<string, unknown>[] = [];
@ -189,7 +189,7 @@ describe('State Visibility', () => {
}); });
test('should throw error for unknown dependencies', () => { test('should throw error for unknown dependencies', () => {
class PluginWithBadDeps implements ResolverPlugin<{ value: string }, 'nonexistent'> { class PluginWithBadDeps extends ResolverPlugin<{ value: string }, 'nonexistent'> {
readonly dependencies = ['nonexistent'] as const; readonly dependencies = ['nonexistent'] as const;
initialize() { initialize() {

View File

@ -164,7 +164,7 @@ const resolver = new CustomResolver(view, {
taxRate: new LastWriteWinsPlugin(), taxRate: new LastWriteWinsPlugin(),
// Complex plugin with multiple dependencies // Complex plugin with multiple dependencies
subtotal: new class implements ResolverPlugin<SubtotalState, 'unitPrice' | 'quantity'> { subtotal: new class extends ResolverPlugin<SubtotalState, 'unitPrice' | 'quantity'> {
readonly dependencies = ['unitPrice', 'quantity'] as const; readonly dependencies = ['unitPrice', 'quantity'] as const;
initialize() { return { value: 0 }; } initialize() { return { value: 0 }; }

View File

@ -93,7 +93,7 @@ Resolves the final value from the current state.
## Example Implementation ## Example Implementation
```typescript ```typescript
class CounterPlugin implements ResolverPlugin<CounterState> { class CounterPlugin extends ResolverPlugin<CounterState> {
initialize(): CounterState { initialize(): CounterState {
return { count: 0 }; return { count: 0 };
@ -126,7 +126,7 @@ class CounterPlugin implements ResolverPlugin<CounterState> {
### Accessing Dependencies ### Accessing Dependencies
```typescript ```typescript
class PriceCalculator implements ResolverPlugin<PriceState, 'basePrice' | 'taxRate'> { class PriceCalculator extends ResolverPlugin<PriceState, 'basePrice' | 'taxRate'> {
readonly dependencies = ['basePrice', 'taxRate'] as const; readonly dependencies = ['basePrice', 'taxRate'] as const;
update( update(
@ -147,7 +147,7 @@ class PriceCalculator implements ResolverPlugin<PriceState, 'basePrice' | 'taxRa
### Optional Dependencies ### Optional Dependencies
```typescript ```typescript
class OptionalDepPlugin implements ResolverPlugin<State, 'required' | 'optional?'> { class OptionalDepPlugin extends ResolverPlugin<State, 'required' | 'optional?'> {
readonly dependencies = ['required', 'optional?'] as const; readonly dependencies = ['required', 'optional?'] as const;
update( update(

View File

@ -117,7 +117,7 @@ Configuration object mapping property IDs to their resolver plugins.
### `LastWriteWinsPlugin` ### `LastWriteWinsPlugin`
```typescript ```typescript
class LastWriteWinsPlugin implements ResolverPlugin<LastWriteWinsState> { class LastWriteWinsPlugin extends ResolverPlugin<LastWriteWinsState> {
// ... // ...
} }
@ -130,7 +130,7 @@ interface LastWriteWinsState {
### `FirstWriteWinsPlugin` ### `FirstWriteWinsPlugin`
```typescript ```typescript
class FirstWriteWinsPlugin implements ResolverPlugin<FirstWriteWinsState> { class FirstWriteWinsPlugin extends ResolverPlugin<FirstWriteWinsState> {
// ... // ...
} }
@ -148,7 +148,7 @@ interface ConcatenationOptions {
sort?: boolean; sort?: boolean;
} }
class ConcatenationPlugin implements ResolverPlugin<ConcatenationState> { class ConcatenationPlugin extends ResolverPlugin<ConcatenationState> {
constructor(private options: ConcatenationOptions = {}) { constructor(private options: ConcatenationOptions = {}) {
this.options = { this.options = {
@ -173,7 +173,7 @@ interface MajorityVoteOptions {
minVotes?: number; minVotes?: number;
} }
class MajorityVotePlugin implements ResolverPlugin<MajorityVoteState> { class MajorityVotePlugin extends ResolverPlugin<MajorityVoteState> {
constructor(private options: MajorityVoteOptions = {}) { constructor(private options: MajorityVoteOptions = {}) {
this.options = { this.options = {
@ -222,7 +222,7 @@ interface CounterState {
type CounterDeps = 'incrementBy' | 'resetThreshold'; type CounterDeps = 'incrementBy' | 'resetThreshold';
// Implement plugin with type safety // Implement plugin with type safety
class CounterPlugin implements ResolverPlugin<CounterState, CounterDeps> { class CounterPlugin extends ResolverPlugin<CounterState, CounterDeps> {
readonly dependencies = ['incrementBy', 'resetThreshold'] as const; readonly dependencies = ['incrementBy', 'resetThreshold'] as const;
initialize(): CounterState { initialize(): CounterState {

View File

@ -50,7 +50,7 @@ const resolver = new CustomResolver(view, {
To make a dependency optional, mark it with a `?` suffix: To make a dependency optional, mark it with a `?` suffix:
```typescript ```typescript
class MyPlugin implements ResolverPlugin<MyState, 'required' | 'optional?'> { class MyPlugin extends ResolverPlugin<MyState, 'required' | 'optional?'> {
readonly dependencies = ['required', 'optional?'] as const; readonly dependencies = ['required', 'optional?'] as const;
// ... // ...
@ -62,7 +62,7 @@ class MyPlugin implements ResolverPlugin<MyState, 'required' | 'optional?'> {
For plugins that need to determine dependencies at runtime, you can implement a custom resolver: For plugins that need to determine dependencies at runtime, you can implement a custom resolver:
```typescript ```typescript
class DynamicDepsPlugin implements ResolverPlugin<DynamicState> { class DynamicDepsPlugin extends ResolverPlugin<DynamicState> {
getDependencies(config: any): string[] { getDependencies(config: any): string[] {
// Determine dependencies based on config // Determine dependencies based on config

View File

@ -21,7 +21,7 @@ The Custom Resolver system provides a powerful dependency management system that
## Example ## Example
```typescript ```typescript
class TotalPricePlugin implements ResolverPlugin<TotalState, 'price' | 'tax'> { class TotalPricePlugin extends ResolverPlugin<TotalState, 'price' | 'tax'> {
readonly dependencies = ['price', 'tax'] as const; readonly dependencies = ['price', 'tax'] as const;
initialize(): TotalState { initialize(): TotalState {

View File

@ -37,7 +37,7 @@ type DependencyStates = {
Dependencies are declared as a readonly array of string literals: Dependencies are declared as a readonly array of string literals:
```typescript ```typescript
class MyPlugin implements ResolverPlugin<MyState, 'dep1' | 'dep2'> { class MyPlugin extends ResolverPlugin<MyState, 'dep1' | 'dep2'> {
readonly dependencies = ['dep1', 'dep2'] as const; readonly dependencies = ['dep1', 'dep2'] as const;
// ... implementation // ... implementation
@ -101,7 +101,7 @@ if (typeof deps.price === 'number') {
### Optional Dependencies ### Optional Dependencies
```typescript ```typescript
class MyPlugin implements ResolverPlugin<MyState, 'required' | 'optional?'> { class MyPlugin extends ResolverPlugin<MyState, 'required' | 'optional?'> {
readonly dependencies = ['required', 'optional?'] as const; readonly dependencies = ['required', 'optional?'] as const;
update(_state: MyState, _value: unknown, _delta: CollapsedDelta, deps: any) { update(_state: MyState, _value: unknown, _delta: CollapsedDelta, deps: any) {
@ -118,7 +118,7 @@ class MyPlugin implements ResolverPlugin<MyState, 'required' | 'optional?'> {
```typescript ```typescript
type PriceDependencies = 'price1' | 'price2' | 'price3'; type PriceDependencies = 'price1' | 'price2' | 'price3';
class PriceAggregator implements ResolverPlugin<PriceState, PriceDependencies> { class PriceAggregator extends ResolverPlugin<PriceState, PriceDependencies> {
readonly dependencies: readonly PriceDependencies[] = ['price1', 'price2', 'price3'] as const; readonly dependencies: readonly PriceDependencies[] = ['price1', 'price2', 'price3'] as const;
update(_state: PriceState, _value: unknown, _delta: CollapsedDelta, deps: any) { update(_state: PriceState, _value: unknown, _delta: CollapsedDelta, deps: any) {

View File

@ -11,7 +11,7 @@ A minimal plugin must implement the `ResolverPlugin` interface:
```typescript ```typescript
import { ResolverPlugin } from '../resolver'; import { ResolverPlugin } from '../resolver';
class MyPlugin implements ResolverPlugin<MyState> { class MyPlugin extends ResolverPlugin<MyState> {
initialize(): MyState { initialize(): MyState {
// Return initial state // Return initial state
@ -40,7 +40,7 @@ class MyPlugin implements ResolverPlugin<MyState> {
To depend on other properties, specify the dependency types: To depend on other properties, specify the dependency types:
```typescript ```typescript
class DiscountedPricePlugin implements ResolverPlugin<DiscountState, 'basePrice' | 'discount'> { class DiscountedPricePlugin extends ResolverPlugin<DiscountState, 'basePrice' | 'discount'> {
readonly dependencies = ['basePrice', 'discount'] as const; readonly dependencies = ['basePrice', 'discount'] as const;
initialize(): DiscountState { initialize(): DiscountState {

View File

@ -18,6 +18,8 @@ export type EntityRecord = {
properties: EntityProperties; properties: EntityProperties;
}; };
export type EntityRecordMany = Record<string, EntityRecord>;
export class Entity { export class Entity {
properties: EntityProperties = {}; properties: EntityProperties = {};
ahead = 0; ahead = 0;

View File

@ -28,7 +28,7 @@ export abstract class BaseOrchestrator implements NodeOrchestrator {
* Default implementation does nothing - should be overridden by subclasses * Default implementation does nothing - should be overridden by subclasses
* that support direct node connections * that support direct node connections
*/ */
async connectNodes(node1: NodeHandle, node2: NodeHandle): Promise<void> { async connectNodes(_node1: NodeHandle, _node2: NodeHandle): Promise<void> {
// Default implementation does nothing // Default implementation does nothing
console.warn('connectNodes not implemented for this orchestrator'); console.warn('connectNodes not implemented for this orchestrator');
} }
@ -38,7 +38,7 @@ export abstract class BaseOrchestrator implements NodeOrchestrator {
* Default implementation does nothing - should be overridden by subclasses * Default implementation does nothing - should be overridden by subclasses
* that support network partitioning * that support network partitioning
*/ */
async partitionNetwork(partitions: { groups: string[][] }): Promise<void> { async partitionNetwork(_partitions: { groups: string[][] }): Promise<void> {
// Default implementation does nothing // Default implementation does nothing
console.warn('partitionNetwork not implemented for this orchestrator'); console.warn('partitionNetwork not implemented for this orchestrator');
} }
@ -49,8 +49,8 @@ export abstract class BaseOrchestrator implements NodeOrchestrator {
* that support resource management * that support resource management
*/ */
async setResourceLimits( async setResourceLimits(
handle: NodeHandle, _handle: NodeHandle,
limits: Partial<NodeConfig['resources']> _limits: Partial<NodeConfig['resources']>
): Promise<void> { ): Promise<void> {
// Default implementation does nothing // Default implementation does nothing
console.warn('setResourceLimits not implemented for this orchestrator'); console.warn('setResourceLimits not implemented for this orchestrator');

View File

@ -18,6 +18,20 @@ export type CollapsedDelta = Omit<DeltaNetworkImageV1, 'pointers'> & {
pointers: CollapsedPointer[]; pointers: CollapsedPointer[];
}; };
// Extract a particular value from a delta's pointers
export function valueFromCollapsedDelta(
key: string,
delta: CollapsedDelta
): string | number | undefined {
for (const pointer of delta.pointers) {
for (const [k, value] of Object.entries(pointer)) {
if (k === key && (typeof value === "string" || typeof value === "number")) {
return value;
}
}
}
}
export type LosslessViewOne = { export type LosslessViewOne = {
id: DomainEntityID, id: DomainEntityID,
referencedAs: string[]; referencedAs: string[];

View File

@ -62,7 +62,7 @@ export abstract class Lossy<Accumulator, Result = Accumulator> {
// Resolve the current state of the view // Resolve the current state of the view
resolve(entityIds?: DomainEntityID[]): Result | undefined { resolve(entityIds?: DomainEntityID[]): Result | undefined {
if (!this.accumulator) { if (!this.accumulator) {
return undefined; this.accumulator =this.initializer?.() || {} as Accumulator;
} }
if (!entityIds) { if (!entityIds) {

View File

@ -1,7 +1,8 @@
import { Lossless, LosslessViewOne } from "../lossless"; import { Lossless, LosslessViewOne } from "../lossless";
import { Lossy } from '../lossy'; import { Lossy } from '../lossy';
import { DomainEntityID, PropertyID, ViewMany } from "../../core/types"; import { DomainEntityID, PropertyID, ViewMany } from "../../core/types";
import { CollapsedDelta } from "../lossless"; import { valueFromCollapsedDelta } from "../lossless";
import { EntityRecord, EntityRecordMany } from "@src/core/entity";
export type AggregationType = 'min' | 'max' | 'sum' | 'average' | 'count'; export type AggregationType = 'min' | 'max' | 'sum' | 'average' | 'count';
@ -27,22 +28,30 @@ export type AggregatedViewOne = {
export type AggregatedViewMany = ViewMany<AggregatedViewOne>; export type AggregatedViewMany = ViewMany<AggregatedViewOne>;
type Accumulator = AggregatedViewMany; type Accumulator = AggregatedViewMany;
type Result = EntityRecordMany;
// Extract a particular value from a delta's pointers function aggregateValues(values: number[], type: AggregationType): number {
export function valueFromCollapsedDelta( if (values.length === 0) return 0;
key: string,
delta: CollapsedDelta switch (type) {
): string | number | undefined { case 'min':
for (const pointer of delta.pointers) { return Math.min(...values);
for (const [k, value] of Object.entries(pointer)) { case 'max':
if (k === key && (typeof value === "string" || typeof value === "number")) { return Math.max(...values);
return value; case 'sum':
} return values.reduce((sum, val) => sum + val, 0);
} case 'average':
return values.reduce((sum, val) => sum + val, 0) / values.length;
case 'count':
// For count, we want to count all values, including duplicates
// So we use the length of the values array directly
return values.length;
default:
throw new Error(`Unknown aggregation type: ${type}`);
} }
} }
export class AggregationResolver extends Lossy<Accumulator> { export class AggregationResolver extends Lossy<Accumulator, Result> {
constructor( constructor(
lossless: Lossless, lossless: Lossless,
private config: AggregationConfig private config: AggregationConfig
@ -67,20 +76,44 @@ export class AggregationResolver extends Lossy<Accumulator> {
} }
// Extract numeric values from all deltas for this property // Extract numeric values from all deltas for this property
const newValues: number[] = [];
for (const delta of deltas || []) { for (const delta of deltas || []) {
const value = valueFromCollapsedDelta(propertyId, delta); const value = valueFromCollapsedDelta(propertyId, delta);
if (typeof value === 'number') { if (typeof value === 'number') {
newValues.push(value); if (this.config[propertyId] === 'count') {
// For count, include all values (including duplicates)
acc[cur.id].properties[propertyId].values.push(value);
} else {
// For other aggregations, only add unique values
if (!acc[cur.id].properties[propertyId].values.includes(value)) {
acc[cur.id].properties[propertyId].values.push(value);
}
}
} }
} }
// Update the values array (avoiding duplicates by clearing and rebuilding)
acc[cur.id].properties[propertyId].values = newValues;
} }
return acc; return acc;
} }
resolver(cur: Accumulator): Result {
const res: Result = {};
for (const [id, entity] of Object.entries(cur)) {
const entityResult: EntityRecord = { id, properties: {} };
for (const [propertyId, aggregatedProp] of Object.entries(entity.properties)) {
const result = aggregateValues(aggregatedProp.values, aggregatedProp.type);
entityResult.properties[propertyId] = result;
}
// Only include entities that have at least one aggregated property
if (Object.keys(entityResult.properties).length > 0) {
res[id] = entityResult;
}
}
return res;
}
} }
// Convenience classes for common aggregation types // Convenience classes for common aggregation types

View File

@ -28,6 +28,40 @@ export abstract class ResolverPlugin<
*/ */
dependencies?: readonly D[]; dependencies?: readonly D[];
/**
* Convenience wrapper to avoid calling update() when there is no new value
* @param currentState The current state of the plugin
* @param newValue The new value to apply
* @param delta The delta that triggered the update
* @param dependencies The dependencies of the plugin
* @returns The updated state
*/
applyUpdate(
currentState: T,
newValue?: PropertyTypes,
delta?: CollapsedDelta,
dependencies?: DependencyStates
): T {
if (newValue === undefined) {
switch(this.dependencies?.length) {
case 0: {
// No dependencies, no new value -- nothing to do.
return currentState;
}
case 1: {
// Only one dependency, use it as the new value.
newValue = dependencies![this.dependencies[0]] as PropertyTypes;
break;
}
default: {
// Pass dependencies as is, and leave newValue undefined.
break;
}
}
}
return this.update(currentState, newValue, delta, dependencies);
};
/** /**
* Initialize the state for a property * Initialize the state for a property
*/ */
@ -38,7 +72,7 @@ export abstract class ResolverPlugin<
/** /**
* Process a new value for the property * Process a new value for the property
*/ */
abstract update( protected abstract update(
currentState: T, currentState: T,
newValue?: PropertyTypes, newValue?: PropertyTypes,
delta?: CollapsedDelta, delta?: CollapsedDelta,

View File

@ -13,10 +13,12 @@ type ConcatenationState = {
* *
* Concatenates all string values with a separator * Concatenates all string values with a separator
*/ */
export class ConcatenationPlugin implements ResolverPlugin<ConcatenationState, never> { export class ConcatenationPlugin extends ResolverPlugin<ConcatenationState, never> {
readonly dependencies = [] as const; readonly dependencies = [] as const;
constructor(private separator: string = ' ') {} constructor(private separator: string = ' ') {
super();
}
initialize(): ConcatenationState { initialize(): ConcatenationState {
return { values: [] }; return { values: [] };

View File

@ -12,7 +12,7 @@ type FirstWriteWinsState = {
* *
* Keeps the first value that was written, ignoring subsequent writes * Keeps the first value that was written, ignoring subsequent writes
*/ */
export class FirstWriteWinsPlugin implements ResolverPlugin<FirstWriteWinsState, never> { export class FirstWriteWinsPlugin extends ResolverPlugin<FirstWriteWinsState, never> {
readonly dependencies = [] as const; readonly dependencies = [] as const;
initialize(): FirstWriteWinsState { initialize(): FirstWriteWinsState {

View File

@ -12,7 +12,7 @@ type LastWriteWinsState = {
* *
* Keeps the most recent value based on the delta's timestamp * Keeps the most recent value based on the delta's timestamp
*/ */
export class LastWriteWinsPlugin implements ResolverPlugin<LastWriteWinsState, never> { export class LastWriteWinsPlugin extends ResolverPlugin<LastWriteWinsState, never> {
readonly dependencies = [] as const; readonly dependencies = [] as const;
initialize(): LastWriteWinsState { initialize(): LastWriteWinsState {

View File

@ -10,7 +10,7 @@ type MajorityVoteState = {
* *
* Returns the value that appears most frequently * Returns the value that appears most frequently
*/ */
export class MajorityVotePlugin implements ResolverPlugin<MajorityVoteState, never> { export class MajorityVotePlugin extends ResolverPlugin<MajorityVoteState, never> {
readonly dependencies = [] as const; readonly dependencies = [] as const;
initialize(): MajorityVoteState { initialize(): MajorityVoteState {
@ -21,6 +21,7 @@ export class MajorityVotePlugin implements ResolverPlugin<MajorityVoteState, nev
currentState: MajorityVoteState, currentState: MajorityVoteState,
newValue: PropertyTypes, newValue: PropertyTypes,
): MajorityVoteState { ): MajorityVoteState {
if (newValue === undefined) return currentState;
const currentCount = currentState.votes.get(newValue) || 0; const currentCount = currentState.votes.get(newValue) || 0;
// Create a new Map to ensure immutability // Create a new Map to ensure immutability
const newVotes = new Map(currentState.votes); const newVotes = new Map(currentState.votes);

View File

@ -1,5 +1,4 @@
import { PropertyID, PropertyTypes } from "@src/core/types"; import { PropertyID, PropertyTypes } from "@src/core/types";
import { CollapsedDelta } from "@src/views/lossless";
import { ResolverPlugin, DependencyStates } from "../plugin"; import { ResolverPlugin, DependencyStates } from "../plugin";
type MaxPluginState = { type MaxPluginState = {
@ -11,11 +10,12 @@ type MaxPluginState = {
* *
* Tracks the maximum numeric value * Tracks the maximum numeric value
*/ */
export class MaxPlugin<Target extends PropertyID> implements ResolverPlugin<MaxPluginState, Target> { export class MaxPlugin<Target extends PropertyID> extends ResolverPlugin<MaxPluginState, Target> {
name = 'max'; name = 'max';
readonly dependencies: Target[] = []; readonly dependencies: Target[] = [];
constructor(private readonly target?: Target) { constructor(private readonly target?: Target) {
super();
if (target) { if (target) {
this.dependencies = [target]; this.dependencies = [target];
} }
@ -28,12 +28,8 @@ export class MaxPlugin<Target extends PropertyID> implements ResolverPlugin<MaxP
update( update(
currentState: MaxPluginState, currentState: MaxPluginState,
newValue?: PropertyTypes, newValue?: PropertyTypes,
_delta?: CollapsedDelta,
dependencies?: DependencyStates
): MaxPluginState { ): MaxPluginState {
// const numValue = typeof newValue === 'number' ? newValue : parseFloat(String(newValue)); const numValue = newValue as number;
const numValue = (this.target ? dependencies?.[this.target] : newValue) as number;
if (currentState.max === undefined || numValue > currentState.max) { if (currentState.max === undefined || numValue > currentState.max) {
return { max: numValue }; return { max: numValue };
} }
@ -42,7 +38,6 @@ export class MaxPlugin<Target extends PropertyID> implements ResolverPlugin<MaxP
resolve( resolve(
state: MaxPluginState, state: MaxPluginState,
_dependencies?: DependencyStates
): PropertyTypes | undefined { ): PropertyTypes | undefined {
return state.max; return state.max;
} }

View File

@ -1,6 +1,5 @@
import { PropertyTypes, PropertyID } from "../../../../core/types"; import { PropertyTypes, PropertyID } from "../../../../core/types";
import { CollapsedDelta } from "../../../lossless"; import { DependencyStates, ResolverPlugin } from "../plugin";
import { ResolverPlugin, DependencyStates } from "../plugin";
type MinPluginState = { type MinPluginState = {
min?: number; min?: number;
@ -11,11 +10,12 @@ type MinPluginState = {
* *
* Tracks the minimum numeric value * Tracks the minimum numeric value
*/ */
export class MinPlugin<Target extends PropertyID> implements ResolverPlugin<MinPluginState, Target> { export class MinPlugin<Target extends PropertyID> extends ResolverPlugin<MinPluginState, Target> {
name = 'min'; name = 'min';
readonly dependencies: Target[] = []; readonly dependencies: Target[] = [];
constructor(private readonly target?: Target) { constructor(private readonly target?: Target) {
super();
if (target) { if (target) {
this.dependencies = [target]; this.dependencies = [target];
} }
@ -28,11 +28,8 @@ export class MinPlugin<Target extends PropertyID> implements ResolverPlugin<MinP
update( update(
currentState: MinPluginState, currentState: MinPluginState,
newValue?: PropertyTypes, newValue?: PropertyTypes,
_delta?: CollapsedDelta,
dependencies?: DependencyStates
): MinPluginState { ): MinPluginState {
const numValue = (this.target ? dependencies?.[this.target] : newValue) as number; const numValue = newValue as number;
if (currentState.min === undefined || numValue < currentState.min) { if (currentState.min === undefined || numValue < currentState.min) {
return { min: numValue }; return { min: numValue };
} }

View File

@ -12,7 +12,7 @@ type RunningAverageState = {
* *
* Tracks the running average of numeric values * Tracks the running average of numeric values
*/ */
export class RunningAveragePlugin implements ResolverPlugin<RunningAverageState, never> { export class RunningAveragePlugin extends ResolverPlugin<RunningAverageState, never> {
readonly dependencies = [] as const; readonly dependencies = [] as const;
initialize(): RunningAverageState { initialize(): RunningAverageState {

View File

@ -5,8 +5,8 @@ import { ResolverPlugin, DependencyStates } from "./plugin";
import { EntityRecord } from "@src/core/entity"; import { EntityRecord } from "@src/core/entity";
import Debug from 'debug'; import Debug from 'debug';
const debug = Debug('rz:resolver'); const debug = Debug('rz:custom-resolver');
const debugState = Debug('rz:resolver:state'); const debugState = Debug('rz:custom-resolver:state');
/** /**
* The state of a property for a single entity * The state of a property for a single entity
@ -56,6 +56,7 @@ export class CustomResolver extends Lossy<Accumulator, Result> {
this.config = config; this.config = config;
this.buildDependencyGraph(); this.buildDependencyGraph();
this.executionOrder = this.calculateExecutionOrder(); this.executionOrder = this.calculateExecutionOrder();
debug(`Execution order: ${this.executionOrder.join(' -> ')}`);
} }
/** /**
@ -63,7 +64,7 @@ export class CustomResolver extends Lossy<Accumulator, Result> {
* @param propertyId The key by which a plugin is registered * @param propertyId The key by which a plugin is registered
* @returns The base name of the plugin * @returns The base name of the plugin
*/ */
pluginBasenameFromKey(propertyId: PropertyID): PropertyID { pluginBasenameFromKey(propertyId: string): string {
return this.config[propertyId]?.name || propertyId; return this.config[propertyId]?.name || propertyId;
} }
@ -72,39 +73,71 @@ export class CustomResolver extends Lossy<Accumulator, Result> {
* @param alias The alias of the plugin * @param alias The alias of the plugin
* @returns The key by which it is registered * @returns The key by which it is registered
*/ */
pluginKeyFromBasename(alias: PropertyID): PropertyID { pluginKeyFromBasename(name: string): string {
const entry = Object.entries(this.config).find(([_, plugin]) => plugin.name === alias); const entry = Object.entries(this.config).find(([_, plugin]) => plugin.name === name);
if (!entry) return alias; if (!entry) return name;
return entry[0]; return entry[0];
} }
private logGraph(): void {
// Log the final dependency graph
const graphLog: Record<string, string[]> = {};
this.dependencyGraph.forEach((deps, plugin) => {
graphLog[plugin] = Array.from(deps);
});
debug(`Dependency graph: ${JSON.stringify(graphLog, null, 2)}`);
}
/** /**
* Build the dependency graph for all plugins. * Build the dependency graph for all plugins.
* We'll use the basenames of the plugins in the graph. * We'll use the basenames of the plugins in the graph.
*/ */
private buildDependencyGraph(): void { private buildDependencyGraph(): void {
debug('Building dependency graph...');
// Initialize the graph with all plugins // Initialize the graph with all plugins
Object.keys(this.config).forEach(propertyId => { Object.keys(this.config).forEach(pluginKey => {
const pluginId = this.pluginBasenameFromKey(propertyId); this.dependencyGraph.set(pluginKey, new Set());
this.dependencyGraph.set(pluginId, new Set()); debug(`Added plugin node: ${pluginKey}`);
}); });
debug('Processing plugin dependencies...');
// Add edges based on dependencies // Add edges based on dependencies
Object.entries(this.config).forEach(([propertyId, plugin]) => { Object.entries(this.config).forEach(([pluginKey, plugin]) => {
const pluginId = this.pluginBasenameFromKey(propertyId); const pluginId = plugin.name || pluginKey;
const deps = plugin.dependencies || []; const deps = plugin.dependencies || [];
if (deps.length === 0) {
debug(`Plugin ${pluginId} has no dependencies`);
} else {
debug(`Plugin ${pluginId} depends on: ${deps.join(', ')}`);
}
deps.forEach((depId: string) => { deps.forEach((depId: string) => {
// This dependency may have an alias in our current config // This dependency may have an alias in our current config
const depKey = this.pluginKeyFromBasename(depId); const depKey = this.pluginKeyFromBasename(depId);
debug(`Processing dependency ${depKey} for plugin ${pluginKey}`);
if (!this.config[depKey]) { if (!this.config[depKey]) {
debug(`Config: ${JSON.stringify(this.config)}`) // TODO: This could still be a property, not a plugin
throw new Error(`Dependency ${depId} not found for plugin ${propertyId}`); const errorMsg = `Dependency ${depKey} not found for plugin ${pluginKey}`;
debug(`Error: ${errorMsg}`);
throw new Error(errorMsg);
} }
this.dependencyGraph.get(depId)?.add(pluginId);
// Add the dependency edge
const dep = this.dependencyGraph.get(depKey)
if (!dep) {
throw new Error(`Dependency ${depKey} not found in dependency graph`);
}
dep.add(pluginKey);
}); });
}); });
debug(`Config: ${JSON.stringify(this.config)}`);
debug(`Dependency graph: ${JSON.stringify(this.dependencyGraph)}`); debug('Dependency graph construction complete');
debug(`Config: ${JSON.stringify(this.config, null, 2)}`);
this.logGraph();
} }
/** /**
@ -162,49 +195,63 @@ export class CustomResolver extends Lossy<Accumulator, Result> {
/** /**
* Get the resolved states of all dependencies for a plugin * Get the resolved states of all dependencies for a plugin
* @param entityState The state of the entity * @param entityPluginStates The state of the entity
* @param dependencies The dependencies to resolve * @param dependencies The dependencies to resolve
* *
*/ */
private getDependencyStates( private getDependencyStates(
entityState: EntityState, entityPluginStates: EntityState,
plugin: ResolverPlugin<unknown, string> pluginKey: string
): DependencyStates { ): DependencyStates {
const dependencyStates = {} as DependencyStates; const plugin = this.config[pluginKey];
if (!plugin) throw new Error(`Plugin ${pluginKey} not found`);
for (const depId of plugin.dependencies || []) { const dependencyStates: DependencyStates = {};
const depKey = this.pluginKeyFromBasename(depId);
for (const depKey of this.executionOrder) {
if (depKey === pluginKey) continue;
const depPlugin = this.config[depKey]; const depPlugin = this.config[depKey];
const depValue = entityState[depKey]; if (depPlugin) {
if (depValue) { if (!entityPluginStates[depKey]) {
// Resolve the dependency's dependencies first dependencyStates[depKey] = depPlugin.initialize(dependencyStates);
const depDependencies = this.getDependencyStates( entityPluginStates[depKey] = dependencyStates[depKey];
entityState, }
depPlugin dependencyStates[depKey] = depPlugin.resolve(entityPluginStates[depKey], dependencyStates);
);
// Resolve the dependency's state
dependencyStates[depId] = depPlugin.resolve(
depValue,
depDependencies
);
} }
} }
return dependencyStates; return dependencyStates;
} }
private initializePlugins(acc: Accumulator, entityId: DomainEntityID) {
if (!acc[entityId]) {
acc[entityId] = {};
}
const entityState = acc[entityId];
for (const pluginKey of this.executionOrder) {
const plugin = this.config[pluginKey];
if (!plugin) throw new Error(`Plugin ${pluginKey} not found`);
// We need to resolve dependencies, including entity properties that are not plugins.
const dependencies = this.getDependencyStates(entityState, pluginKey);
debug('Dependencies for', pluginKey, ':', JSON.stringify(dependencies));
// Initialize the plugin if it hasn't been initialized yet
entityState[pluginKey] = entityState[pluginKey] ?? plugin.initialize(dependencies);
}
return { entityState };
}
/** /**
* Update the state with new deltas from the view * Update the state with new deltas from the view
*/ */
reducer(acc: Accumulator, {id: entityId, propertyDeltas}: LosslessViewOne): Accumulator { reducer(acc: Accumulator, {id: entityId, propertyDeltas}: LosslessViewOne): Accumulator {
debug(`Processing deltas for entity: ${entityId}`); debug(`Processing deltas for entity: ${entityId}`);
debug('Property deltas:', JSON.stringify(propertyDeltas)); debug('Property deltas:', JSON.stringify(propertyDeltas));
if (!acc[entityId]) { const { entityState } = this.initializePlugins(acc, entityId);
acc[entityId] = {};
}
const entityState = acc[entityId];
// Now let's go through each plugin in order. // Now let's go through each plugin in order.
for (const pluginId of this.executionOrder) { for (const pluginId of this.executionOrder) {
@ -214,11 +261,7 @@ export class CustomResolver extends Lossy<Accumulator, Result> {
debug(`Processing plugin: ${pluginId} (key: ${pluginKey})`); debug(`Processing plugin: ${pluginId} (key: ${pluginKey})`);
// We need to resolve dependencies, including entity properties that are not plugins. const pluginState = entityState[pluginKey];
const dependencies = this.getDependencyStates(entityState, plugin);
// Initialize the plugin if it hasn't been initialized yet
const pluginState = entityState[pluginKey] ?? plugin.initialize(dependencies);
const deltaPropertyValues : Record<PropertyID, PropertyTypes> = {}; const deltaPropertyValues : Record<PropertyID, PropertyTypes> = {};
let propertyValue : PropertyTypes | undefined; let propertyValue : PropertyTypes | undefined;
@ -231,6 +274,8 @@ export class CustomResolver extends Lossy<Accumulator, Result> {
// It's possible that there are multiple deltas in this set with the same property ID. // It's possible that there are multiple deltas in this set with the same property ID.
// That can only happen if they are part of a transaction. Otherwise this function is // That can only happen if they are part of a transaction. Otherwise this function is
// only called once per delta, per entity affected. // only called once per delta, per entity affected.
// TODO: More flexible/robust error handling protocols?
// Some views might be more tolerant of errors than others.
throw new Error(`Duplicate property ID ${propertyId} found in delta ${delta.id}`); throw new Error(`Duplicate property ID ${propertyId} found in delta ${delta.id}`);
} }
deltaPropertyValues[propertyId] = pointer[propertyId]; deltaPropertyValues[propertyId] = pointer[propertyId];
@ -245,8 +290,10 @@ export class CustomResolver extends Lossy<Accumulator, Result> {
} }
// Update the plugin state with the new delta // Update the plugin state with the new delta
entityState[pluginKey] = plugin.update(pluginState, propertyValue, updateDelta, dependencies); const dependencies = this.getDependencyStates(entityState, pluginKey);
debugState(`Updated entity state for ${entityId}:`, JSON.stringify(entityState[pluginKey])); entityState[pluginKey] = plugin.applyUpdate(pluginState, propertyValue, updateDelta, dependencies);
debugState(`Updated state for entity ${entityId} plugin ${pluginKey}:`,
JSON.stringify(entityState[pluginKey]));
} }
return acc; return acc;
@ -255,32 +302,36 @@ export class CustomResolver extends Lossy<Accumulator, Result> {
resolver(acc: Accumulator, entityIds: DomainEntityID[]) { resolver(acc: Accumulator, entityIds: DomainEntityID[]) {
const result: Result = {}; const result: Result = {};
debug('Initial accumulator state:', JSON.stringify(acc)); debug('Initial accumulator state:', JSON.stringify(acc));
for (const entityId in acc) { for (const entityId in acc) {
if (!entityIds.includes(entityId)) continue; if (!entityIds.includes(entityId)) continue;
this.initializePlugins(acc, entityId);
result[entityId] = { result[entityId] = {
id: entityId, id: entityId,
properties: {} properties: {}
}; };
for (const propertyId of this.executionOrder) { for (const pluginKey of this.executionOrder) {
const pluginKey = this.pluginKeyFromBasename(propertyId);
const plugin = this.config[pluginKey]; const plugin = this.config[pluginKey];
if (!plugin) throw new Error(`Plugin for property ${propertyId} not found`); if (!plugin) throw new Error(`Plugin ${pluginKey} not found`);
debug(`Processing property: ${propertyId} (key: ${pluginKey})`); debug(`Processing property: ${pluginKey}`);
const dependencies = this.getDependencyStates(acc[entityId], plugin); const dependencies = this.getDependencyStates(acc[entityId], pluginKey);
debug(`Dependencies for ${propertyId}:`, JSON.stringify(dependencies)); debug(`Dependencies for ${pluginKey}:`, JSON.stringify(dependencies));
const state = acc[entityId][pluginKey] || plugin.initialize(dependencies); const state = acc[entityId][pluginKey] || plugin.initialize(dependencies);
debug(`State for ${propertyId}:`, JSON.stringify(state)); debug(`State for ${pluginKey}:`, JSON.stringify(state));
const resolvedValue = plugin.resolve(state, dependencies); const resolvedValue = plugin.resolve(state, dependencies);
if (resolvedValue === undefined) throw new Error(`Resolved value for property ${propertyId} is undefined`) if (resolvedValue === undefined) throw new Error(`Resolved value for property ${pluginKey} is undefined`)
debug(`Resolved value for ${propertyId}:`, resolvedValue); debug(`Resolved value for ${pluginKey}:`, resolvedValue);
result[entityId].properties[pluginKey] = resolvedValue; result[entityId].properties[pluginKey] = resolvedValue;
} }
} }
debug(`Result:`, JSON.stringify(result));
return result; return result;
} }

View File

@ -1,8 +1,7 @@
import { EntityProperties } from "../../core/entity"; import { EntityProperties } from "../../core/entity";
import { Lossless, LosslessViewOne, CollapsedDelta } from "../lossless"; import { Lossless, LosslessViewOne, CollapsedDelta, valueFromCollapsedDelta } from "../lossless";
import { Lossy } from '../lossy'; import { Lossy } from '../lossy';
import { DomainEntityID, PropertyID, PropertyTypes, Timestamp, ViewMany } from "../../core/types"; import { DomainEntityID, PropertyID, PropertyTypes, Timestamp, ViewMany } from "../../core/types";
import { valueFromCollapsedDelta } from "./aggregation-resolvers";
export type TimestampedProperty = { export type TimestampedProperty = {
value: PropertyTypes, value: PropertyTypes,