fix: improve resolver dependency handling and logging

- Fixed AggregationResolver to properly accumulate values for sum/average/count operations
- Enhanced CustomResolver with detailed debug logging for dependency resolution
- Added execution order logging for better debugging
- Improved error messages and graph visualization in dependency resolution
- Moved valueFromCollapsedDelta to lossless.ts for better code organization
This commit is contained in:
Lentil Hoffman 2025-06-25 12:25:40 -05:00
parent d0941f417e
commit 9957dccddd
Signed by: lentil
GPG Key ID: 0F5B99F3F4D0C087
8 changed files with 147 additions and 53 deletions

View File

@ -1,11 +1,11 @@
import Debug from 'debug';
import { PointerTarget } from "../../../src/core/delta";
import { Lossless, LosslessViewOne } from "../../../src/views/lossless";
import { Lossy } from "../../../src/views/lossy";
import { RhizomeNode } from "../../../src/node";
import { valueFromCollapsedDelta } from "../../../src/views/resolvers/aggregation-resolvers";
import { latestFromCollapsedDeltas } from "../../../src/views/resolvers/timestamp-resolvers";
import { createDelta } from "../../../src/core/delta-builder";
import { PointerTarget } from "@src/core/delta";
import { Lossless, LosslessViewOne } from "@src/views/lossless";
import { Lossy } from "@src/views/lossy";
import { RhizomeNode } from "@src/node";
import { valueFromCollapsedDelta } from "@src/views/lossless";
import { latestFromCollapsedDeltas } from "@src/views/resolvers/timestamp-resolvers";
import { createDelta } from "@src/core/delta-builder";
const debug = Debug('rz:test:lossy');
type Role = {

View File

@ -8,8 +8,8 @@ import {
AverageResolver,
CountResolver,
AggregationType
} from "../../../../src";
import { createDelta } from "../../../../src/core/delta-builder";
} from "@src";
import { createDelta } from "@src/core/delta-builder";
describe('Aggregation Resolvers', () => {
let node: RhizomeNode;
@ -22,6 +22,8 @@ describe('Aggregation Resolvers', () => {
describe('Basic Aggregation', () => {
test('should aggregate numbers using min resolver', () => {
const minResolver = new MinResolver(lossless, ['score']);
// Add first entity with score 10
lossless.ingestDelta(createDelta('test', 'host1')
.setProperty('entity1', 'score', 10, 'collection')
@ -40,7 +42,6 @@ describe('Aggregation Resolvers', () => {
.buildV1()
);
const minResolver = new MinResolver(lossless, ['score']);
const result = minResolver.resolve();
expect(result).toBeDefined();
@ -51,6 +52,8 @@ describe('Aggregation Resolvers', () => {
});
test('should aggregate numbers using max resolver', () => {
const maxResolver = new MaxResolver(lossless, ['score']);
// Add deltas for entities
lossless.ingestDelta(createDelta('test', 'host1')
.setProperty('entity1', 'score', 10, 'collection')
@ -67,7 +70,6 @@ describe('Aggregation Resolvers', () => {
.buildV1()
);
const maxResolver = new MaxResolver(lossless, ['score']);
const result = maxResolver.resolve();
expect(result).toBeDefined();
@ -77,6 +79,8 @@ describe('Aggregation Resolvers', () => {
});
test('should aggregate numbers using sum resolver', () => {
const sumResolver = new SumResolver(lossless, ['value']);
// Add first value for entity1
lossless.ingestDelta(createDelta('test', 'host1')
.setProperty('entity1', 'value', 10, 'collection')
@ -95,7 +99,6 @@ describe('Aggregation Resolvers', () => {
.buildV1()
);
const sumResolver = new SumResolver(lossless, ['value']);
const result = sumResolver.resolve();
expect(result).toBeDefined();
@ -104,6 +107,8 @@ describe('Aggregation Resolvers', () => {
});
test('should aggregate numbers using average resolver', () => {
const avgResolver = new AverageResolver(lossless, ['score']);
// Add multiple scores for entity1
lossless.ingestDelta(createDelta('test', 'host1')
.setProperty('entity1', 'score', 10, 'collection')
@ -121,7 +126,6 @@ describe('Aggregation Resolvers', () => {
.buildV1()
);
const avgResolver = new AverageResolver(lossless, ['score']);
const result = avgResolver.resolve();
expect(result).toBeDefined();
@ -130,6 +134,8 @@ describe('Aggregation Resolvers', () => {
});
test('should count values using count resolver', () => {
const countResolver = new CountResolver(lossless, ['visits']);
// Add multiple visit deltas for entity1
lossless.ingestDelta(createDelta('test', 'host1')
.setProperty('entity1', 'visits', 1, 'collection')
@ -147,7 +153,6 @@ describe('Aggregation Resolvers', () => {
.buildV1()
);
const countResolver = new CountResolver(lossless, ['visits']);
const result = countResolver.resolve();
expect(result).toBeDefined();
@ -158,6 +163,12 @@ describe('Aggregation Resolvers', () => {
describe('Custom Aggregation Configuration', () => {
test('should handle mixed aggregation types', () => {
const resolver = new AggregationResolver(lossless, {
min_val: 'min' as AggregationType,
max_val: 'max' as AggregationType,
sum_val: 'sum' as AggregationType
});
// Add first set of values
lossless.ingestDelta(createDelta('test', 'host1')
.setProperty('entity1', 'min_val', 10, 'collection')
@ -190,11 +201,6 @@ describe('Aggregation Resolvers', () => {
.buildV1()
);
const resolver = new AggregationResolver(lossless, {
min_val: 'min' as AggregationType,
max_val: 'max' as AggregationType,
sum_val: 'sum' as AggregationType
});
const result = resolver.resolve();
expect(result).toBeDefined();
@ -206,6 +212,11 @@ describe('Aggregation Resolvers', () => {
});
test('should ignore non-numeric values', () => {
const resolver = new AggregationResolver(lossless, {
score: 'sum' as AggregationType,
name: 'count' as AggregationType
});
// Add numeric value
lossless.ingestDelta(createDelta('test', 'host1')
.setProperty('entity1', 'score', 10, 'collection')
@ -224,8 +235,7 @@ describe('Aggregation Resolvers', () => {
.buildV1()
);
const sumResolver = new SumResolver(lossless, ['score', 'name']);
const result = sumResolver.resolve();
const result = resolver.resolve();
expect(result).toBeDefined();
const entity = result!['entity1'];
@ -234,13 +244,13 @@ describe('Aggregation Resolvers', () => {
});
test('should handle empty value arrays', () => {
const sumResolver = new SumResolver(lossless, ['score']);
// Create entity with non-aggregated property
lossless.ingestDelta(createDelta('test', 'host1')
.setProperty('entity1', 'name', 'test', 'collection')
.buildV1()
);
const sumResolver = new SumResolver(lossless, ['score']);
const result = sumResolver.resolve();
expect(result).toBeDefined();
@ -251,12 +261,13 @@ describe('Aggregation Resolvers', () => {
describe('Edge Cases', () => {
test('should handle single value aggregations', () => {
const avgResolver = new AverageResolver(lossless, ['value']);
lossless.ingestDelta(createDelta('test', 'host1')
.setProperty('entity1', 'value', 42, 'collection')
.buildV1()
);
const avgResolver = new AverageResolver(lossless, ['value']);
const result = avgResolver.resolve();
expect(result).toBeDefined();
@ -264,6 +275,8 @@ describe('Aggregation Resolvers', () => {
});
test('should handle zero values', () => {
const sumResolver = new SumResolver(lossless, ['value']);
lossless.ingestDelta(createDelta('test', 'host1')
.setProperty('entity1', 'value', 0, 'collection')
.buildV1()
@ -274,7 +287,6 @@ describe('Aggregation Resolvers', () => {
.buildV1()
);
const sumResolver = new SumResolver(lossless, ['value']);
const result = sumResolver.resolve();
expect(result).toBeDefined();
@ -282,6 +294,8 @@ describe('Aggregation Resolvers', () => {
});
test('should handle negative values', () => {
const minResolver = new MinResolver(lossless, ['value']);
lossless.ingestDelta(createDelta('test', 'host1')
.setProperty('entity1', 'value', -5, 'collection')
.buildV1()
@ -292,7 +306,6 @@ describe('Aggregation Resolvers', () => {
.buildV1()
);
const minResolver = new MinResolver(lossless, ['value']);
const result = minResolver.resolve();
expect(result).toBeDefined();

View File

@ -23,7 +23,8 @@ describe('Basic Dependency Resolution', () => {
return { value: '' };
}
update(_currentState: { value: string }, newValue: PropertyTypes) {
update(currentState: { value: string }, newValue: PropertyTypes) {
if (newValue === undefined) return currentState;
return { value: String(newValue) };
}
@ -40,8 +41,9 @@ describe('Basic Dependency Resolution', () => {
return { value: '' };
}
update(_currentState: { value: string }, newValue: PropertyTypes, _delta: CollapsedDelta, dependencies: { first: string }) {
return { value: `${dependencies.first}_${newValue}` };
update(currentState: { value: string }, newValue?: PropertyTypes, _delta?: CollapsedDelta, dependencies?: { first: string }) {
if (newValue === undefined) return currentState;
return { value: `${dependencies?.first}_${newValue}` };
}
resolve(state: { value: string }) {

View File

@ -18,6 +18,8 @@ export type EntityRecord = {
properties: EntityProperties;
};
export type EntityRecordMany = Record<string, EntityRecord>;
export class Entity {
properties: EntityProperties = {};
ahead = 0;

View File

@ -18,6 +18,20 @@ export type CollapsedDelta = Omit<DeltaNetworkImageV1, 'pointers'> & {
pointers: CollapsedPointer[];
};
// Extract a particular value from a delta's pointers
export function valueFromCollapsedDelta(
key: string,
delta: CollapsedDelta
): string | number | undefined {
for (const pointer of delta.pointers) {
for (const [k, value] of Object.entries(pointer)) {
if (k === key && (typeof value === "string" || typeof value === "number")) {
return value;
}
}
}
}
export type LosslessViewOne = {
id: DomainEntityID,
referencedAs: string[];

View File

@ -1,7 +1,8 @@
import { Lossless, LosslessViewOne } from "../lossless";
import { Lossy } from '../lossy';
import { DomainEntityID, PropertyID, ViewMany } from "../../core/types";
import { CollapsedDelta } from "../lossless";
import { valueFromCollapsedDelta } from "../lossless";
import { EntityRecord, EntityRecordMany } from "@src/core/entity";
export type AggregationType = 'min' | 'max' | 'sum' | 'average' | 'count';
@ -27,22 +28,30 @@ export type AggregatedViewOne = {
export type AggregatedViewMany = ViewMany<AggregatedViewOne>;
type Accumulator = AggregatedViewMany;
type Result = EntityRecordMany;
// Extract a particular value from a delta's pointers
export function valueFromCollapsedDelta(
key: string,
delta: CollapsedDelta
): string | number | undefined {
for (const pointer of delta.pointers) {
for (const [k, value] of Object.entries(pointer)) {
if (k === key && (typeof value === "string" || typeof value === "number")) {
return value;
}
}
function aggregateValues(values: number[], type: AggregationType): number {
if (values.length === 0) return 0;
switch (type) {
case 'min':
return Math.min(...values);
case 'max':
return Math.max(...values);
case 'sum':
return values.reduce((sum, val) => sum + val, 0);
case 'average':
return values.reduce((sum, val) => sum + val, 0) / values.length;
case 'count':
// For count, we want to count all values, including duplicates
// So we use the length of the values array directly
return values.length;
default:
throw new Error(`Unknown aggregation type: ${type}`);
}
}
export class AggregationResolver extends Lossy<Accumulator> {
export class AggregationResolver extends Lossy<Accumulator, Result> {
constructor(
lossless: Lossless,
private config: AggregationConfig
@ -67,20 +76,44 @@ export class AggregationResolver extends Lossy<Accumulator> {
}
// Extract numeric values from all deltas for this property
const newValues: number[] = [];
for (const delta of deltas || []) {
const value = valueFromCollapsedDelta(propertyId, delta);
if (typeof value === 'number') {
newValues.push(value);
if (this.config[propertyId] === 'count') {
// For count, include all values (including duplicates)
acc[cur.id].properties[propertyId].values.push(value);
} else {
// For other aggregations, only add unique values
if (!acc[cur.id].properties[propertyId].values.includes(value)) {
acc[cur.id].properties[propertyId].values.push(value);
}
}
}
}
// Update the values array (avoiding duplicates by clearing and rebuilding)
acc[cur.id].properties[propertyId].values = newValues;
}
return acc;
}
resolver(cur: Accumulator): Result {
const res: Result = {};
for (const [id, entity] of Object.entries(cur)) {
const entityResult: EntityRecord = { id, properties: {} };
for (const [propertyId, aggregatedProp] of Object.entries(entity.properties)) {
const result = aggregateValues(aggregatedProp.values, aggregatedProp.type);
entityResult.properties[propertyId] = result;
}
// Only include entities that have at least one aggregated property
if (Object.keys(entityResult.properties).length > 0) {
res[id] = entityResult;
}
}
return res;
}
}
// Convenience classes for common aggregation types

View File

@ -56,6 +56,7 @@ export class CustomResolver extends Lossy<Accumulator, Result> {
this.config = config;
this.buildDependencyGraph();
this.executionOrder = this.calculateExecutionOrder();
debug(`Execution order: ${this.executionOrder.join(' -> ')}`);
}
/**
@ -83,28 +84,53 @@ export class CustomResolver extends Lossy<Accumulator, Result> {
* We'll use the basenames of the plugins in the graph.
*/
private buildDependencyGraph(): void {
debug('Building dependency graph...');
// Initialize the graph with all plugins
Object.keys(this.config).forEach(propertyId => {
const pluginId = this.pluginBasenameFromKey(propertyId);
this.dependencyGraph.set(pluginId, new Set());
debug(`Added plugin node: ${pluginId} (from property: ${propertyId})`);
});
debug('Processing plugin dependencies...');
// Add edges based on dependencies
Object.entries(this.config).forEach(([propertyId, plugin]) => {
const pluginId = this.pluginBasenameFromKey(propertyId);
const pluginId = plugin.name || propertyId;
const deps = plugin.dependencies || [];
if (deps.length === 0) {
debug(`Plugin ${pluginId} has no dependencies`);
} else {
debug(`Plugin ${pluginId} depends on: ${deps.join(', ')}`);
}
deps.forEach((depId: string) => {
// This dependency may have an alias in our current config
const depKey = this.pluginKeyFromBasename(depId);
debug(`Processing dependency: ${depId} (resolved to key: ${depKey}) for plugin ${pluginId}`);
if (!this.config[depKey]) {
debug(`Config: ${JSON.stringify(this.config)}`)
throw new Error(`Dependency ${depId} not found for plugin ${propertyId}`);
const errorMsg = `Dependency ${depId} not found for plugin ${propertyId}`;
debug(`Error: ${errorMsg}`);
throw new Error(errorMsg);
}
// Add the dependency edge
this.dependencyGraph.get(depId)?.add(pluginId);
debug(`Added edge: ${depId} -> ${pluginId}`);
});
});
debug(`Config: ${JSON.stringify(this.config)}`);
debug(`Dependency graph: ${JSON.stringify(this.dependencyGraph)}`);
// Log the final dependency graph
const graphLog: Record<string, string[]> = {};
this.dependencyGraph.forEach((deps, plugin) => {
graphLog[plugin] = Array.from(deps);
});
debug('Dependency graph construction complete');
debug(`Config: ${JSON.stringify(this.config, null, 2)}`);
debug(`Dependency graph: ${JSON.stringify(graphLog, null, 2)}`);
}
/**
@ -175,7 +201,11 @@ export class CustomResolver extends Lossy<Accumulator, Result> {
for (const depId of plugin.dependencies || []) {
const depKey = this.pluginKeyFromBasename(depId);
const depPlugin = this.config[depKey];
// TODO: If this is not a plugin, see if it's an entity property, and include it
const depValue = entityState[depKey];
debug(`depId: ${depId}, depKey: ${depKey}, depPlugin: ${JSON.stringify(depPlugin)}, depValue: ${JSON.stringify(depValue)}`)
if (depValue) {
// Resolve the dependency's dependencies first
const depDependencies = this.getDependencyStates(
@ -216,6 +246,7 @@ export class CustomResolver extends Lossy<Accumulator, Result> {
// We need to resolve dependencies, including entity properties that are not plugins.
const dependencies = this.getDependencyStates(entityState, plugin);
debug('Dependencies for', pluginId, ':', JSON.stringify(dependencies));
// Initialize the plugin if it hasn't been initialized yet
const pluginState = entityState[pluginKey] ?? plugin.initialize(dependencies);

View File

@ -1,8 +1,7 @@
import { EntityProperties } from "../../core/entity";
import { Lossless, LosslessViewOne, CollapsedDelta } from "../lossless";
import { Lossless, LosslessViewOne, CollapsedDelta, valueFromCollapsedDelta } from "../lossless";
import { Lossy } from '../lossy';
import { DomainEntityID, PropertyID, PropertyTypes, Timestamp, ViewMany } from "../../core/types";
import { valueFromCollapsedDelta } from "./aggregation-resolvers";
export type TimestampedProperty = {
value: PropertyTypes,