diff --git a/__tests__/aggregation-resolvers.ts b/__tests__/aggregation-resolvers.ts new file mode 100644 index 0000000..64a0a80 --- /dev/null +++ b/__tests__/aggregation-resolvers.ts @@ -0,0 +1,542 @@ +import {RhizomeNode} from "../src/node"; +import {Lossless} from "../src/lossless"; +import {Delta} from "../src/delta"; +import { + AggregationResolver, + MinResolver, + MaxResolver, + SumResolver, + AverageResolver, + CountResolver, + AggregationType +} from "../src/aggregation-resolvers"; + +describe('Aggregation Resolvers', () => { + let node: RhizomeNode; + let lossless: Lossless; + + beforeEach(() => { + node = new RhizomeNode(); + lossless = new Lossless(node); + }); + + describe('Basic Aggregation', () => { + test('should aggregate numbers using min resolver', () => { + // Add first entity with score 10 + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 10 + }] + })); + + // Add second entity with score 5 + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity2", + targetContext: "score" + }, { + localContext: "score", + target: 5 + }] + })); + + // Add third entity with score 15 + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity3", + targetContext: "score" + }, { + localContext: "score", + target: 15 + }] + })); + + const minResolver = new MinResolver(lossless, ['score']); + const result = minResolver.resolve(); + + expect(result).toBeDefined(); + expect(Object.keys(result!)).toHaveLength(3); + expect(result!['entity1'].properties.score).toBe(10); + expect(result!['entity2'].properties.score).toBe(5); + expect(result!['entity3'].properties.score).toBe(15); + }); + + test('should aggregate numbers using max resolver', () => { + // Add deltas for entities + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 10 + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity2", + targetContext: "score" + }, { + localContext: "score", + target: 5 + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity3", + targetContext: "score" + }, { + localContext: "score", + target: 15 + }] + })); + + const maxResolver = new MaxResolver(lossless, ['score']); + const result = maxResolver.resolve(); + + expect(result).toBeDefined(); + expect(result!['entity1'].properties.score).toBe(10); + expect(result!['entity2'].properties.score).toBe(5); + expect(result!['entity3'].properties.score).toBe(15); + }); + + test('should aggregate numbers using sum resolver', () => { + // Add first value for entity1 + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "value" + }, { + localContext: "value", + target: 10 + }] + })); + + // Add second value for entity1 (should sum) + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "value" + }, { + localContext: "value", + target: 20 + }] + })); + + // Add value for entity2 + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity2", + targetContext: "value" + }, { + localContext: "value", + target: 5 + }] + })); + + const sumResolver = new SumResolver(lossless, ['value']); + const result = sumResolver.resolve(); + + expect(result).toBeDefined(); + expect(result!['entity1'].properties.value).toBe(30); // 10 + 20 + expect(result!['entity2'].properties.value).toBe(5); + }); + + test('should aggregate numbers using average resolver', () => { + // Add multiple values for entity1 + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 10 + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 20 + }] + })); + + // Single value for entity2 + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity2", + targetContext: "score" + }, { + localContext: "score", + target: 30 + }] + })); + + const avgResolver = new AverageResolver(lossless, ['score']); + const result = avgResolver.resolve(); + + expect(result).toBeDefined(); + expect(result!['entity1'].properties.score).toBe(15); // (10 + 20) / 2 + expect(result!['entity2'].properties.score).toBe(30); + }); + + test('should count values using count resolver', () => { + // Add multiple visit deltas for entity1 + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "visits" + }, { + localContext: "visits", + target: 1 + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "visits" + }, { + localContext: "visits", + target: 1 + }] + })); + + // Single visit for entity2 + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity2", + targetContext: "visits" + }, { + localContext: "visits", + target: 1 + }] + })); + + const countResolver = new CountResolver(lossless, ['visits']); + const result = countResolver.resolve(); + + expect(result).toBeDefined(); + expect(result!['entity1'].properties.visits).toBe(2); // count of 2 deltas + expect(result!['entity2'].properties.visits).toBe(1); // count of 1 delta + }); + }); + + describe('Custom Aggregation Configuration', () => { + test('should handle mixed aggregation types', () => { + // Add first set of values + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "min_val" + }, { + localContext: "min_val", + target: 10 + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "max_val" + }, { + localContext: "max_val", + target: 5 + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "sum_val" + }, { + localContext: "sum_val", + target: 3 + }] + })); + + // Add second set of values + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "min_val" + }, { + localContext: "min_val", + target: 5 + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "max_val" + }, { + localContext: "max_val", + target: 15 + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "sum_val" + }, { + localContext: "sum_val", + target: 7 + }] + })); + + const resolver = new AggregationResolver(lossless, { + min_val: 'min' as AggregationType, + max_val: 'max' as AggregationType, + sum_val: 'sum' as AggregationType + }); + + const result = resolver.resolve(); + expect(result).toBeDefined(); + + const entity = result!['entity1']; + expect(entity.properties.min_val).toBe(5); // min of 10, 5 + expect(entity.properties.max_val).toBe(15); // max of 5, 15 + expect(entity.properties.sum_val).toBe(10); // sum of 3, 7 + }); + + test('should ignore non-numeric values', () => { + // Add numeric value + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 10 + }] + })); + + // Add non-numeric value (string) + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "name" + }, { + localContext: "name", + target: 'test' + }] + })); + + // Add another numeric value + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 20 + }] + })); + + const sumResolver = new SumResolver(lossless, ['score', 'name']); + const result = sumResolver.resolve(); + + expect(result).toBeDefined(); + const entity = result!['entity1']; + expect(entity.properties.score).toBe(30); // sum of 10, 20 + expect(entity.properties.name).toBe(0); // ignored non-numeric, defaults to 0 + }); + + test('should handle empty value arrays', () => { + // Create entity with non-aggregated property + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "name" + }, { + localContext: "name", + target: 'test' + }] + })); + + const sumResolver = new SumResolver(lossless, ['score']); + const result = sumResolver.resolve(); + + expect(result).toBeDefined(); + // Should not have entity1 since no 'score' property was found + expect(result!['entity1']).toBeUndefined(); + }); + }); + + describe('Edge Cases', () => { + test('should handle single value aggregations', () => { + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "value" + }, { + localContext: "value", + target: 42 + }] + })); + + const avgResolver = new AverageResolver(lossless, ['value']); + const result = avgResolver.resolve(); + + expect(result).toBeDefined(); + expect(result!['entity1'].properties.value).toBe(42); + }); + + test('should handle zero values', () => { + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "value" + }, { + localContext: "value", + target: 0 + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "value" + }, { + localContext: "value", + target: 10 + }] + })); + + const sumResolver = new SumResolver(lossless, ['value']); + const result = sumResolver.resolve(); + + expect(result).toBeDefined(); + expect(result!['entity1'].properties.value).toBe(10); // 0 + 10 + }); + + test('should handle negative values', () => { + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "value" + }, { + localContext: "value", + target: -5 + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'test', + host: 'host1', + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "value" + }, { + localContext: "value", + target: 10 + }] + })); + + const minResolver = new MinResolver(lossless, ['value']); + const result = minResolver.resolve(); + + expect(result).toBeDefined(); + expect(result!['entity1'].properties.value).toBe(-5); + }); + }); +}); \ No newline at end of file diff --git a/__tests__/concurrent-writes.ts b/__tests__/concurrent-writes.ts new file mode 100644 index 0000000..ad34327 --- /dev/null +++ b/__tests__/concurrent-writes.ts @@ -0,0 +1,555 @@ +import {RhizomeNode} from "../src/node"; +import {Lossless} from "../src/lossless"; +import {Delta} from "../src/delta"; +import {LastWriteWins} from "../src/last-write-wins"; +import {TimestampResolver} from "../src/timestamp-resolvers"; +import {SumResolver} from "../src/aggregation-resolvers"; +import {CustomResolver, LastWriteWinsPlugin, MajorityVotePlugin} from "../src/custom-resolvers"; + +describe('Concurrent Write Scenarios', () => { + let node: RhizomeNode; + let lossless: Lossless; + + beforeEach(() => { + node = new RhizomeNode(); + lossless = new Lossless(node); + }); + + describe('Simultaneous Writes with Same Timestamp', () => { + test('should handle simultaneous writes using last-write-wins resolver', () => { + const timestamp = 1000; + + // Simulate two writers updating the same property at the exact same time + lossless.ingestDelta(new Delta({ + creator: 'writer1', + host: 'host1', + id: 'delta-a', + timeCreated: timestamp, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 100 + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'writer2', + host: 'host2', + id: 'delta-b', + timeCreated: timestamp, // Same timestamp + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 200 + }] + })); + + const resolver = new LastWriteWins(lossless); + const result = resolver.resolve(); + + expect(result).toBeDefined(); + // Should resolve deterministically (likely based on delta processing order) + expect(typeof result!['entity1'].properties.score).toBe('number'); + expect([100, 200]).toContain(result!['entity1'].properties.score); + }); + + test('should handle simultaneous writes using timestamp resolver with tie-breaking', () => { + const timestamp = 1000; + + lossless.ingestDelta(new Delta({ + creator: 'writer_z', // Lexicographically later + host: 'host1', + id: 'delta-a', + timeCreated: timestamp, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 100 + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'writer_a', // Lexicographically earlier + host: 'host2', + id: 'delta-b', + timeCreated: timestamp, // Same timestamp + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 200 + }] + })); + + const resolver = new TimestampResolver(lossless, 'creator-id'); + const result = resolver.resolve(); + + expect(result).toBeDefined(); + // writer_z should win due to lexicographic ordering + expect(result!['entity1'].properties.score).toBe(100); + }); + + test('should handle multiple writers with aggregation resolver', () => { + const timestamp = 1000; + + // Multiple writers add values simultaneously + lossless.ingestDelta(new Delta({ + creator: 'writer1', + host: 'host1', + timeCreated: timestamp, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "points" + }, { + localContext: "points", + target: 10 + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'writer2', + host: 'host2', + timeCreated: timestamp, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "points" + }, { + localContext: "points", + target: 20 + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'writer3', + host: 'host3', + timeCreated: timestamp, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "points" + }, { + localContext: "points", + target: 30 + }] + })); + + const resolver = new SumResolver(lossless, ['points']); + const result = resolver.resolve(); + + expect(result).toBeDefined(); + // All values should be summed regardless of timing + expect(result!['entity1'].properties.points).toBe(60); // 10 + 20 + 30 + }); + }); + + describe('Out-of-Order Write Arrival', () => { + test('should handle writes arriving out of chronological order', () => { + // Newer delta arrives first + lossless.ingestDelta(new Delta({ + creator: 'writer1', + host: 'host1', + timeCreated: 2000, // Later timestamp + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "value" + }, { + localContext: "value", + target: 'newer' + }] + })); + + // Older delta arrives later + lossless.ingestDelta(new Delta({ + creator: 'writer1', + host: 'host1', + timeCreated: 1000, // Earlier timestamp + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "value" + }, { + localContext: "value", + target: 'older' + }] + })); + + const resolver = new LastWriteWins(lossless); + const result = resolver.resolve(); + + expect(result).toBeDefined(); + // Should still resolve to the chronologically newer value + expect(result!['entity1'].properties.value).toBe('newer'); + }); + + test('should maintain correct aggregation despite out-of-order arrival', () => { + // Add deltas in reverse chronological order + lossless.ingestDelta(new Delta({ + creator: 'writer1', + host: 'host1', + timeCreated: 3000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 30 + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'writer1', + host: 'host1', + timeCreated: 1000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 10 + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'writer1', + host: 'host1', + timeCreated: 2000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 20 + }] + })); + + const resolver = new SumResolver(lossless, ['score']); + const result = resolver.resolve(); + + expect(result).toBeDefined(); + // Sum should be correct regardless of arrival order + expect(result!['entity1'].properties.score).toBe(60); // 10 + 20 + 30 + }); + }); + + describe('High-Frequency Concurrent Updates', () => { + test('should handle rapid concurrent updates to the same entity', () => { + const baseTimestamp = 1000; + const numWriters = 10; + const writesPerWriter = 5; + + // Simulate multiple writers making rapid updates + for (let writer = 0; writer < numWriters; writer++) { + for (let write = 0; write < writesPerWriter; write++) { + lossless.ingestDelta(new Delta({ + creator: `writer${writer}`, + host: `host${writer}`, + timeCreated: baseTimestamp + write, // Small time increments + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "counter" + }, { + localContext: "counter", + target: 1 // Each update adds 1 + }] + })); + } + } + + const resolver = new SumResolver(lossless, ['counter']); + const result = resolver.resolve(); + + expect(result).toBeDefined(); + // Should count all updates + expect(result!['entity1'].properties.counter).toBe(numWriters * writesPerWriter); + }); + + test('should handle concurrent updates to multiple properties', () => { + const timestamp = 1000; + + // Writer 1 updates name and score + lossless.ingestDelta(new Delta({ + creator: 'writer1', + host: 'host1', + timeCreated: timestamp, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "name" + }, { + localContext: "name", + target: 'alice' + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'writer1', + host: 'host1', + timeCreated: timestamp + 1, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 100 + }] + })); + + // Writer 2 updates name and score concurrently + lossless.ingestDelta(new Delta({ + creator: 'writer2', + host: 'host2', + timeCreated: timestamp + 2, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "name" + }, { + localContext: "name", + target: 'bob' + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'writer2', + host: 'host2', + timeCreated: timestamp + 3, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 200 + }] + })); + + const resolver = new CustomResolver(lossless, { + name: new LastWriteWinsPlugin(), + score: new LastWriteWinsPlugin() + }); + + const result = resolver.resolve(); + + expect(result).toBeDefined(); + expect(result!['entity1'].properties.name).toBe('bob'); // Later timestamp + expect(result!['entity1'].properties.score).toBe(200); // Later timestamp + }); + }); + + describe('Cross-Entity Concurrent Writes', () => { + test('should handle concurrent writes to different entities', () => { + const timestamp = 1000; + + // Multiple writers updating different entities simultaneously + for (let i = 0; i < 5; i++) { + lossless.ingestDelta(new Delta({ + creator: `writer${i}`, + host: `host${i}`, + timeCreated: timestamp, + pointers: [{ + localContext: "collection", + target: `entity${i}`, + targetContext: "value" + }, { + localContext: "value", + target: (i + 1) * 10 // Start from 10 to avoid 0 values + }] + })); + } + + const resolver = new LastWriteWins(lossless); + const result = resolver.resolve(); + + expect(result).toBeDefined(); + expect(Object.keys(result!)).toHaveLength(5); + + for (let i = 0; i < 5; i++) { + expect(result![`entity${i}`].properties.value).toBe((i + 1) * 10); + } + }); + + test('should handle mixed entity and property conflicts', () => { + const timestamp = 1000; + + // Entity1: Multiple writers competing for same property + lossless.ingestDelta(new Delta({ + creator: 'writer1', + host: 'host1', + timeCreated: timestamp, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "votes" + }, { + localContext: "votes", + target: 'option_a' + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'writer2', + host: 'host2', + timeCreated: timestamp, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "votes" + }, { + localContext: "votes", + target: 'option_a' + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'writer3', + host: 'host3', + timeCreated: timestamp, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "votes" + }, { + localContext: "votes", + target: 'option_b' + }] + })); + + // Entity2: Single writer, no conflict + lossless.ingestDelta(new Delta({ + creator: 'writer4', + host: 'host4', + timeCreated: timestamp, + pointers: [{ + localContext: "collection", + target: "entity2", + targetContext: "status" + }, { + localContext: "status", + target: 'active' + }] + })); + + const resolver = new CustomResolver(lossless, { + votes: new MajorityVotePlugin(), + status: new LastWriteWinsPlugin() + }); + + const result = resolver.resolve(); + + expect(result).toBeDefined(); + expect(result!['entity1'].properties.votes).toBe('option_a'); // 2 votes vs 1 + expect(result!['entity2'].properties.status).toBe('active'); + }); + }); + + describe('Stress Testing', () => { + test('should handle large number of concurrent writes efficiently', () => { + const numEntities = 100; + const numWritersPerEntity = 10; + const baseTimestamp = 1000; + + // Generate a large number of concurrent writes + for (let entity = 0; entity < numEntities; entity++) { + for (let writer = 0; writer < numWritersPerEntity; writer++) { + lossless.ingestDelta(new Delta({ + creator: `writer${writer}`, + host: `host${writer}`, + timeCreated: baseTimestamp + Math.floor(Math.random() * 1000), // Random timestamps + pointers: [{ + localContext: "collection", + target: `entity${entity}`, + targetContext: "score" + }, { + localContext: "score", + target: Math.floor(Math.random() * 100) // Random scores + }] + })); + } + } + + const resolver = new SumResolver(lossless, ['score']); + const result = resolver.resolve(); + + expect(result).toBeDefined(); + expect(Object.keys(result!)).toHaveLength(numEntities); + + // Each entity should have a score (sum of all writer contributions) + for (let entity = 0; entity < numEntities; entity++) { + expect(result![`entity${entity}`]).toBeDefined(); + expect(typeof result![`entity${entity}`].properties.score).toBe('number'); + expect(result![`entity${entity}`].properties.score).toBeGreaterThan(0); + } + }); + + test('should maintain consistency under rapid updates and resolution calls', () => { + const entityId = 'stress-test-entity'; + let updateCount = 0; + + // Add initial deltas + for (let i = 0; i < 50; i++) { + lossless.ingestDelta(new Delta({ + creator: `writer${i % 5}`, + host: `host${i % 3}`, + timeCreated: 1000 + i, + pointers: [{ + localContext: "collection", + target: entityId, + targetContext: "counter" + }, { + localContext: "counter", + target: 1 + }] + })); + updateCount++; + } + + // Verify initial state + let resolver = new SumResolver(lossless, ['counter']); + let result = resolver.resolve(); + expect(result).toBeDefined(); + expect(result![entityId].properties.counter).toBe(updateCount); + + // Add more deltas and verify consistency + for (let i = 0; i < 25; i++) { + lossless.ingestDelta(new Delta({ + creator: 'late-writer', + host: 'late-host', + timeCreated: 2000 + i, + pointers: [{ + localContext: "collection", + target: entityId, + targetContext: "counter" + }, { + localContext: "counter", + target: 2 + }] + })); + updateCount += 2; + + // Create a fresh resolver to avoid accumulator caching issues + resolver = new SumResolver(lossless, ['counter']); + result = resolver.resolve(); + expect(result![entityId].properties.counter).toBe(updateCount); + } + }); + }); +}); \ No newline at end of file diff --git a/__tests__/custom-resolvers.ts b/__tests__/custom-resolvers.ts new file mode 100644 index 0000000..78b9de5 --- /dev/null +++ b/__tests__/custom-resolvers.ts @@ -0,0 +1,676 @@ +import {RhizomeNode} from "../src/node"; +import {Lossless} from "../src/lossless"; +import {Delta} from "../src/delta"; +import { + CustomResolver, + ResolverPlugin, + LastWriteWinsPlugin, + FirstWriteWinsPlugin, + ConcatenationPlugin, + MajorityVotePlugin, + MinPlugin, + MaxPlugin +} from "../src/custom-resolvers"; +import {PropertyTypes} from "../src/types"; +import {CollapsedDelta} from "../src/lossless"; + +describe('Custom Resolvers', () => { + let node: RhizomeNode; + let lossless: Lossless; + + beforeEach(() => { + node = new RhizomeNode(); + lossless = new Lossless(node); + }); + + describe('Built-in Plugins', () => { + test('LastWriteWinsPlugin should resolve to most recent value', () => { + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 1000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "name" + }, { + localContext: "name", + target: 'first' + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 2000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "name" + }, { + localContext: "name", + target: 'second' + }] + })); + + const resolver = new CustomResolver(lossless, { + name: new LastWriteWinsPlugin() + }); + + const result = resolver.resolve(); + expect(result).toBeDefined(); + expect(result!['entity1'].properties.name).toBe('second'); + }); + + test('FirstWriteWinsPlugin should resolve to earliest value', () => { + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 2000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "name" + }, { + localContext: "name", + target: 'second' + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 1000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "name" + }, { + localContext: "name", + target: 'first' + }] + })); + + const resolver = new CustomResolver(lossless, { + name: new FirstWriteWinsPlugin() + }); + + const result = resolver.resolve(); + expect(result).toBeDefined(); + expect(result!['entity1'].properties.name).toBe('first'); + }); + + test('ConcatenationPlugin should join string values chronologically', () => { + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 1000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "tags" + }, { + localContext: "tags", + target: 'red' + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 3000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "tags" + }, { + localContext: "tags", + target: 'blue' + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 2000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "tags" + }, { + localContext: "tags", + target: 'green' + }] + })); + + const resolver = new CustomResolver(lossless, { + tags: new ConcatenationPlugin(' ') + }); + + const result = resolver.resolve(); + expect(result).toBeDefined(); + expect(result!['entity1'].properties.tags).toBe('red green blue'); + }); + + test('ConcatenationPlugin should handle duplicates', () => { + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 1000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "tags" + }, { + localContext: "tags", + target: 'red' + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 2000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "tags" + }, { + localContext: "tags", + target: 'red' // duplicate + }] + })); + + const resolver = new CustomResolver(lossless, { + tags: new ConcatenationPlugin(',') + }); + + const result = resolver.resolve(); + expect(result).toBeDefined(); + expect(result!['entity1'].properties.tags).toBe('red'); // Should not duplicate + }); + + test('MajorityVotePlugin should resolve to most voted value', () => { + // Add 3 votes for 'red' + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 1000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "color" + }, { + localContext: "color", + target: 'red' + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'user2', + host: 'host1', + timeCreated: 1001, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "color" + }, { + localContext: "color", + target: 'red' + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'user3', + host: 'host1', + timeCreated: 1002, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "color" + }, { + localContext: "color", + target: 'red' + }] + })); + + // Add 2 votes for 'blue' + lossless.ingestDelta(new Delta({ + creator: 'user4', + host: 'host1', + timeCreated: 1003, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "color" + }, { + localContext: "color", + target: 'blue' + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'user5', + host: 'host1', + timeCreated: 1004, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "color" + }, { + localContext: "color", + target: 'blue' + }] + })); + + const resolver = new CustomResolver(lossless, { + color: new MajorityVotePlugin() + }); + + const result = resolver.resolve(); + expect(result).toBeDefined(); + expect(result!['entity1'].properties.color).toBe('red'); // 3 votes vs 2 votes + }); + + test('MinPlugin should resolve to minimum numeric value', () => { + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 1000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 100 + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 2000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 50 + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 3000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 75 + }] + })); + + const resolver = new CustomResolver(lossless, { + score: new MinPlugin() + }); + + const result = resolver.resolve(); + expect(result).toBeDefined(); + expect(result!['entity1'].properties.score).toBe(50); + }); + + test('MaxPlugin should resolve to maximum numeric value', () => { + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 1000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 100 + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 2000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 150 + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 3000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 75 + }] + })); + + const resolver = new CustomResolver(lossless, { + score: new MaxPlugin() + }); + + const result = resolver.resolve(); + expect(result).toBeDefined(); + expect(result!['entity1'].properties.score).toBe(150); + }); + }); + + describe('Mixed Plugin Configurations', () => { + test('should handle different plugins for different properties', () => { + // Add name with different timestamps + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 1000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "name" + }, { + localContext: "name", + target: 'old_name' + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 2000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "name" + }, { + localContext: "name", + target: 'new_name' + }] + })); + + // Add scores + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 1000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 100 + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 2000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 50 + }] + })); + + const resolver = new CustomResolver(lossless, { + name: new LastWriteWinsPlugin(), // Should resolve to 'new_name' + score: new MinPlugin() // Should resolve to 50 + }); + + const result = resolver.resolve(); + expect(result).toBeDefined(); + expect(result!['entity1'].properties.name).toBe('new_name'); + expect(result!['entity1'].properties.score).toBe(50); + }); + + test('should only include entities with configured properties', () => { + // Entity1 has configured property + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 1000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "name" + }, { + localContext: "name", + target: 'test' + }] + })); + + // Entity2 has non-configured property + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 1000, + pointers: [{ + localContext: "collection", + target: "entity2", + targetContext: "other" + }, { + localContext: "other", + target: 'value' + }] + })); + + const resolver = new CustomResolver(lossless, { + name: new LastWriteWinsPlugin() + }); + + const result = resolver.resolve(); + expect(result).toBeDefined(); + expect(result!['entity1']).toBeDefined(); + expect(result!['entity1'].properties.name).toBe('test'); + expect(result!['entity2']).toBeUndefined(); // No configured properties + }); + }); + + describe('Custom Plugin Implementation', () => { + test('should work with custom plugin', () => { + // Custom plugin that counts the number of updates + class CountPlugin implements ResolverPlugin<{count: number}> { + name = 'count'; + + initialize() { + return {count: 0}; + } + + update(currentState: {count: number}, _newValue: PropertyTypes, _delta: CollapsedDelta) { + return {count: currentState.count + 1}; + } + + resolve(state: {count: number}): PropertyTypes { + return state.count; + } + } + + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 1000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "updates" + }, { + localContext: "updates", + target: 'first' + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 2000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "updates" + }, { + localContext: "updates", + target: 'second' + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 3000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "updates" + }, { + localContext: "updates", + target: 'third' + }] + })); + + const resolver = new CustomResolver(lossless, { + updates: new CountPlugin() + }); + + const result = resolver.resolve(); + expect(result).toBeDefined(); + expect(result!['entity1'].properties.updates).toBe(3); + }); + + test('should work with stateful custom plugin', () => { + // Custom plugin that calculates running average + class RunningAveragePlugin implements ResolverPlugin<{sum: number, count: number}> { + name = 'running-average'; + + initialize() { + return {sum: 0, count: 0}; + } + + update(currentState: {sum: number, count: number}, newValue: PropertyTypes, _delta: CollapsedDelta) { + if (typeof newValue === 'number') { + return { + sum: currentState.sum + newValue, + count: currentState.count + 1 + }; + } + return currentState; + } + + resolve(state: {sum: number, count: number}): PropertyTypes { + return state.count > 0 ? state.sum / state.count : 0; + } + } + + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 1000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 10 + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 2000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 20 + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 3000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 30 + }] + })); + + const resolver = new CustomResolver(lossless, { + score: new RunningAveragePlugin() + }); + + const result = resolver.resolve(); + expect(result).toBeDefined(); + expect(result!['entity1'].properties.score).toBe(20); // (10 + 20 + 30) / 3 + }); + }); + + describe('Edge Cases', () => { + test('should handle empty delta sets', () => { + const resolver = new CustomResolver(lossless, { + name: new LastWriteWinsPlugin() + }); + + const result = resolver.resolve(); + expect(result).toBeDefined(); + expect(Object.keys(result!)).toHaveLength(0); + }); + + test('should handle non-matching property types gracefully', () => { + // Add string value to numeric plugin + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 1000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 'not_a_number' + }] + })); + + const resolver = new CustomResolver(lossless, { + score: new MinPlugin() // Expects numeric values + }); + + const result = resolver.resolve(); + expect(result).toBeDefined(); + expect(result!['entity1'].properties.score).toBe(0); // Default value + }); + }); +}); \ No newline at end of file diff --git a/__tests__/timestamp-resolvers.ts b/__tests__/timestamp-resolvers.ts new file mode 100644 index 0000000..65040cf --- /dev/null +++ b/__tests__/timestamp-resolvers.ts @@ -0,0 +1,458 @@ +import {RhizomeNode} from "../src/node"; +import {Lossless} from "../src/lossless"; +import {Delta} from "../src/delta"; +import { + TimestampResolver, + CreatorIdTimestampResolver, + DeltaIdTimestampResolver, + HostIdTimestampResolver, + LexicographicTimestampResolver +} from "../src/timestamp-resolvers"; + +describe('Timestamp Resolvers', () => { + let node: RhizomeNode; + let lossless: Lossless; + + beforeEach(() => { + node = new RhizomeNode(); + lossless = new Lossless(node); + }); + + describe('Basic Timestamp Resolution', () => { + test('should resolve by most recent timestamp', () => { + // Add older delta + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + id: 'delta1', + timeCreated: 1000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 10 + }] + })); + + // Add newer delta + lossless.ingestDelta(new Delta({ + creator: 'user2', + host: 'host2', + id: 'delta2', + timeCreated: 2000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 20 + }] + })); + + const resolver = new TimestampResolver(lossless); + const result = resolver.resolve(); + + expect(result).toBeDefined(); + expect(result!['entity1'].properties.score).toBe(20); // More recent value wins + }); + + test('should handle multiple entities with different timestamps', () => { + // Entity1 - older value + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 1000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "value" + }, { + localContext: "value", + target: 100 + }] + })); + + // Entity2 - newer value + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + timeCreated: 2000, + pointers: [{ + localContext: "collection", + target: "entity2", + targetContext: "value" + }, { + localContext: "value", + target: 200 + }] + })); + + const resolver = new TimestampResolver(lossless); + const result = resolver.resolve(); + + expect(result).toBeDefined(); + expect(result!['entity1'].properties.value).toBe(100); + expect(result!['entity2'].properties.value).toBe(200); + }); + }); + + describe('Tie-Breaking Strategies', () => { + test('should break ties using creator-id strategy', () => { + // Two deltas with same timestamp, different creators + lossless.ingestDelta(new Delta({ + creator: 'user_z', // Lexicographically later + host: 'host1', + id: 'delta1', + timeCreated: 1000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 10 + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'user_a', // Lexicographically earlier + host: 'host1', + id: 'delta2', + timeCreated: 1000, // Same timestamp + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 20 + }] + })); + + const resolver = new CreatorIdTimestampResolver(lossless); + const result = resolver.resolve(); + + expect(result).toBeDefined(); + // user_z comes later lexicographically, so should win + expect(result!['entity1'].properties.score).toBe(10); + }); + + test('should break ties using delta-id strategy', () => { + // Two deltas with same timestamp, different delta IDs + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + id: 'delta_a', // Lexicographically earlier + timeCreated: 1000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 10 + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + id: 'delta_z', // Lexicographically later + timeCreated: 1000, // Same timestamp + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 20 + }] + })); + + const resolver = new DeltaIdTimestampResolver(lossless); + const result = resolver.resolve(); + + expect(result).toBeDefined(); + // delta_z comes later lexicographically, so should win + expect(result!['entity1'].properties.score).toBe(20); + }); + + test('should break ties using host-id strategy', () => { + // Two deltas with same timestamp, different hosts + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host_z', // Lexicographically later + id: 'delta1', + timeCreated: 1000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 10 + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host_a', // Lexicographically earlier + id: 'delta2', + timeCreated: 1000, // Same timestamp + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 20 + }] + })); + + const resolver = new HostIdTimestampResolver(lossless); + const result = resolver.resolve(); + + expect(result).toBeDefined(); + // host_z comes later lexicographically, so should win + expect(result!['entity1'].properties.score).toBe(10); + }); + + test('should break ties using lexicographic strategy with string values', () => { + // Two deltas with same timestamp, different string values + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + id: 'delta1', + timeCreated: 1000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "name" + }, { + localContext: "name", + target: 'alice' + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + id: 'delta2', + timeCreated: 1000, // Same timestamp + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "name" + }, { + localContext: "name", + target: 'bob' + }] + })); + + const resolver = new LexicographicTimestampResolver(lossless); + const result = resolver.resolve(); + + expect(result).toBeDefined(); + // 'bob' comes later lexicographically than 'alice', so should win + expect(result!['entity1'].properties.name).toBe('bob'); + }); + + test('should break ties using lexicographic strategy with numeric values (falls back to delta ID)', () => { + // Two deltas with same timestamp, numeric values (should fall back to delta ID comparison) + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + id: 'delta_a', // Lexicographically earlier + timeCreated: 1000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 100 + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + id: 'delta_z', // Lexicographically later + timeCreated: 1000, // Same timestamp + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 200 + }] + })); + + const resolver = new LexicographicTimestampResolver(lossless); + const result = resolver.resolve(); + + expect(result).toBeDefined(); + // Should fall back to delta ID comparison: delta_z > delta_a + expect(result!['entity1'].properties.score).toBe(200); + }); + }); + + describe('Complex Tie-Breaking Scenarios', () => { + test('should handle multiple properties with different tie-breaking outcomes', () => { + // Add deltas for multiple properties with same timestamp + lossless.ingestDelta(new Delta({ + creator: 'user_a', + host: 'host1', + id: 'delta_z', + timeCreated: 1000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "name" + }, { + localContext: "name", + target: 'alice' + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'user_z', + host: 'host1', + id: 'delta_a', + timeCreated: 1000, // Same timestamp + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "name" + }, { + localContext: "name", + target: 'bob' + }] + })); + + const creatorResolver = new CreatorIdTimestampResolver(lossless); + const deltaResolver = new DeltaIdTimestampResolver(lossless); + + const creatorResult = creatorResolver.resolve(); + const deltaResult = deltaResolver.resolve(); + + expect(creatorResult).toBeDefined(); + expect(deltaResult).toBeDefined(); + + // Creator strategy: user_z > user_a, so 'bob' wins + expect(creatorResult!['entity1'].properties.name).toBe('bob'); + + // Delta ID strategy: delta_z > delta_a, so 'alice' wins + expect(deltaResult!['entity1'].properties.name).toBe('alice'); + }); + + test('should work consistently with timestamp priority over tie-breaking', () => { + // Add older delta with "better" tie-breaking attributes + lossless.ingestDelta(new Delta({ + creator: 'user_z', // Would win in creator tie-breaking + host: 'host1', + id: 'delta_z', // Would win in delta ID tie-breaking + timeCreated: 1000, // Older timestamp + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 10 + }] + })); + + // Add newer delta with "worse" tie-breaking attributes + lossless.ingestDelta(new Delta({ + creator: 'user_a', // Would lose in creator tie-breaking + host: 'host1', + id: 'delta_a', // Would lose in delta ID tie-breaking + timeCreated: 2000, // Newer timestamp + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 20 + }] + })); + + const resolver = new CreatorIdTimestampResolver(lossless); + const result = resolver.resolve(); + + expect(result).toBeDefined(); + // Timestamp should take priority over tie-breaking, so newer value (20) wins + expect(result!['entity1'].properties.score).toBe(20); + }); + }); + + describe('Edge Cases', () => { + test('should handle single delta correctly', () => { + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + id: 'delta1', + timeCreated: 1000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "value" + }, { + localContext: "value", + target: 42 + }] + })); + + const resolver = new TimestampResolver(lossless, 'creator-id'); + const result = resolver.resolve(); + + expect(result).toBeDefined(); + expect(result!['entity1'].properties.value).toBe(42); + }); + + test('should handle mixed value types correctly', () => { + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + id: 'delta1', + timeCreated: 1000, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "name" + }, { + localContext: "name", + target: 'test' + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'user1', + host: 'host1', + id: 'delta2', + timeCreated: 1001, + pointers: [{ + localContext: "collection", + target: "entity1", + targetContext: "score" + }, { + localContext: "score", + target: 100 + }] + })); + + const resolver = new TimestampResolver(lossless); + const result = resolver.resolve(); + + expect(result).toBeDefined(); + expect(result!['entity1'].properties.name).toBe('test'); + expect(result!['entity1'].properties.score).toBe(100); + }); + }); +}); \ No newline at end of file diff --git a/eslint.config.js b/eslint.config.js index aa08bb6..b2273e4 100644 --- a/eslint.config.js +++ b/eslint.config.js @@ -7,6 +7,18 @@ export default tseslint.config( { ignores: [ "dist/", - ], + ] + }, + { + rules: { + "@typescript-eslint/no-unused-vars": [ + "error", + { + "argsIgnorePattern": "^_", + "varsIgnorePattern": "^_", + "caughtErrorsIgnorePattern": "^_" + } + ] + } } ); diff --git a/package.json b/package.json index 5adf366..d6ae408 100644 --- a/package.json +++ b/package.json @@ -2,6 +2,7 @@ "name": "rhizome-node", "version": "0.1.0", "description": "Rhizomatic database engine node", + "type": "module", "scripts": { "build": "tsc", "build:watch": "tsc --watch", @@ -53,4 +54,4 @@ "typescript": "^5.7.2", "typescript-eslint": "^8.18.0" } -} +} \ No newline at end of file diff --git a/src/aggregation-resolvers.ts b/src/aggregation-resolvers.ts new file mode 100644 index 0000000..1a550ef --- /dev/null +++ b/src/aggregation-resolvers.ts @@ -0,0 +1,189 @@ +import { EntityProperties } from "./entity"; +import { Lossless, LosslessViewOne } from "./lossless"; +import { Lossy } from './lossy'; +import { DomainEntityID, PropertyID, ViewMany } from "./types"; +import { valueFromCollapsedDelta } from "./last-write-wins"; + +export type AggregationType = 'min' | 'max' | 'sum' | 'average' | 'count'; + +export type AggregationConfig = { + [propertyId: PropertyID]: AggregationType; +}; + +type AggregatedProperty = { + values: number[]; + type: AggregationType; + result?: number; +}; + +type AggregatedProperties = { + [key: PropertyID]: AggregatedProperty; +}; + +export type AggregatedViewOne = { + id: DomainEntityID; + properties: AggregatedProperties; +}; + +export type AggregatedViewMany = ViewMany; + +type ResolvedAggregatedViewOne = { + id: DomainEntityID; + properties: EntityProperties; +}; + +type ResolvedAggregatedViewMany = ViewMany; + +type Accumulator = AggregatedViewMany; +type Result = ResolvedAggregatedViewMany; + +function aggregateValues(values: number[], type: AggregationType): number { + if (values.length === 0) return 0; + + switch (type) { + case 'min': + return Math.min(...values); + case 'max': + return Math.max(...values); + case 'sum': + return values.reduce((sum, val) => sum + val, 0); + case 'average': + return values.reduce((sum, val) => sum + val, 0) / values.length; + case 'count': + return values.length; + default: + throw new Error(`Unknown aggregation type: ${type}`); + } +} + +export class AggregationResolver extends Lossy { + constructor( + lossless: Lossless, + private config: AggregationConfig + ) { + super(lossless); + } + + initializer(): Accumulator { + return {}; + } + + reducer(acc: Accumulator, cur: LosslessViewOne): Accumulator { + if (!acc[cur.id]) { + acc[cur.id] = { id: cur.id, properties: {} }; + } + + for (const [propertyId, deltas] of Object.entries(cur.propertyDeltas)) { + const aggregationType = this.config[propertyId]; + if (!aggregationType) continue; + + if (!acc[cur.id].properties[propertyId]) { + acc[cur.id].properties[propertyId] = { + values: [], + type: aggregationType + }; + } + + // Extract numeric values from all deltas for this property + const newValues: number[] = []; + for (const delta of deltas || []) { + const value = valueFromCollapsedDelta(propertyId, delta); + if (typeof value === 'number') { + newValues.push(value); + } + } + + // Update the values array (avoiding duplicates by clearing and rebuilding) + acc[cur.id].properties[propertyId].values = newValues; + } + + return acc; + } + + resolver(cur: Accumulator): Result { + const res: Result = {}; + + for (const [id, entity] of Object.entries(cur)) { + const entityResult: ResolvedAggregatedViewOne = { id, properties: {} }; + + for (const [propertyId, aggregatedProp] of Object.entries(entity.properties)) { + const result = aggregateValues(aggregatedProp.values, aggregatedProp.type); + entityResult.properties[propertyId] = result; + } + + // Only include entities that have at least one aggregated property + if (Object.keys(entityResult.properties).length > 0) { + res[id] = entityResult; + } + } + + return res; + } + + // Override resolve to build accumulator on-demand if needed + resolve(entityIds?: DomainEntityID[]): Result | undefined { + if (!entityIds) { + entityIds = Array.from(this.lossless.domainEntities.keys()); + } + + // If we don't have an accumulator, build it from the lossless view + if (!this.accumulator) { + this.accumulator = this.initializer(); + + // Use the general view method instead of viewSpecific + const fullView = this.lossless.view(entityIds, this.deltaFilter); + + for (const entityId of entityIds) { + const losslessViewOne = fullView[entityId]; + if (losslessViewOne) { + this.accumulator = this.reducer(this.accumulator, losslessViewOne); + } + } + } + + if (!this.accumulator) return undefined; + + return this.resolver(this.accumulator); + } +} + +// Convenience classes for common aggregation types +export class MinResolver extends AggregationResolver { + constructor(lossless: Lossless, properties: PropertyID[]) { + const config: AggregationConfig = {}; + properties.forEach(prop => config[prop] = 'min'); + super(lossless, config); + } +} + +export class MaxResolver extends AggregationResolver { + constructor(lossless: Lossless, properties: PropertyID[]) { + const config: AggregationConfig = {}; + properties.forEach(prop => config[prop] = 'max'); + super(lossless, config); + } +} + +export class SumResolver extends AggregationResolver { + constructor(lossless: Lossless, properties: PropertyID[]) { + const config: AggregationConfig = {}; + properties.forEach(prop => config[prop] = 'sum'); + super(lossless, config); + } +} + +export class AverageResolver extends AggregationResolver { + constructor(lossless: Lossless, properties: PropertyID[]) { + const config: AggregationConfig = {}; + properties.forEach(prop => config[prop] = 'average'); + super(lossless, config); + } +} + +export class CountResolver extends AggregationResolver { + constructor(lossless: Lossless, properties: PropertyID[]) { + const config: AggregationConfig = {}; + properties.forEach(prop => config[prop] = 'count'); + super(lossless, config); + } +} \ No newline at end of file diff --git a/src/custom-resolvers.ts b/src/custom-resolvers.ts new file mode 100644 index 0000000..e1ccb1a --- /dev/null +++ b/src/custom-resolvers.ts @@ -0,0 +1,296 @@ +import { EntityProperties } from "./entity"; +import { CollapsedDelta, Lossless, LosslessViewOne } from "./lossless"; +import { Lossy } from './lossy'; +import { DomainEntityID, PropertyID, PropertyTypes, ViewMany } from "./types"; + +// Plugin interface for custom resolvers +export interface ResolverPlugin { + name: string; + + // Initialize the state for a property + initialize(): T; + + // Process a new value for the property + update(currentState: T, newValue: PropertyTypes, delta: CollapsedDelta): T; + + // Resolve the final value from the accumulated state + resolve(state: T): PropertyTypes; +} + +// Configuration for custom resolver +export type CustomResolverConfig = { + [propertyId: PropertyID]: ResolverPlugin; +}; + +type PropertyState = { + plugin: ResolverPlugin; + state: unknown; +}; + +type EntityState = { + [propertyId: PropertyID]: PropertyState; +}; + +type CustomResolverAccumulator = { + [entityId: DomainEntityID]: { + id: DomainEntityID; + properties: EntityState; + }; +}; + +type CustomResolverResult = ViewMany<{ + id: DomainEntityID; + properties: EntityProperties; +}>; + +// Extract value from delta for a specific property +function extractValueFromDelta(propertyId: PropertyID, delta: CollapsedDelta): PropertyTypes | undefined { + for (const pointer of delta.pointers) { + for (const [key, value] of Object.entries(pointer)) { + if (key === propertyId && (typeof value === "string" || typeof value === "number")) { + return value; + } + } + } + return undefined; +} + +export class CustomResolver extends Lossy { + constructor( + lossless: Lossless, + private config: CustomResolverConfig + ) { + super(lossless); + } + + initializer(): CustomResolverAccumulator { + return {}; + } + + reducer(acc: CustomResolverAccumulator, cur: LosslessViewOne): CustomResolverAccumulator { + if (!acc[cur.id]) { + acc[cur.id] = { id: cur.id, properties: {} }; + } + + for (const [propertyId, deltas] of Object.entries(cur.propertyDeltas)) { + const plugin = this.config[propertyId]; + if (!plugin) continue; + + // Initialize property state if not exists + if (!acc[cur.id].properties[propertyId]) { + acc[cur.id].properties[propertyId] = { + plugin, + state: plugin.initialize() + }; + } + + const propertyState = acc[cur.id].properties[propertyId]; + + // Process all deltas for this property + for (const delta of deltas || []) { + const value = extractValueFromDelta(propertyId, delta); + if (value !== undefined) { + propertyState.state = propertyState.plugin.update(propertyState.state, value, delta); + } + } + } + + return acc; + } + + resolver(cur: CustomResolverAccumulator): CustomResolverResult { + const res: CustomResolverResult = {}; + + for (const [entityId, entity] of Object.entries(cur)) { + const entityResult: { id: string; properties: EntityProperties } = { id: entityId, properties: {} }; + + for (const [propertyId, propertyState] of Object.entries(entity.properties)) { + const resolvedValue = propertyState.plugin.resolve(propertyState.state); + entityResult.properties[propertyId] = resolvedValue; + } + + // Only include entities that have at least one resolved property + if (Object.keys(entityResult.properties).length > 0) { + res[entityId] = entityResult; + } + } + + return res; + } + + // Override resolve to build accumulator on-demand if needed + resolve(entityIds?: DomainEntityID[]): CustomResolverResult | undefined { + if (!entityIds) { + entityIds = Array.from(this.lossless.domainEntities.keys()); + } + + // If we don't have an accumulator, build it from the lossless view + if (!this.accumulator) { + this.accumulator = this.initializer(); + + const fullView = this.lossless.view(entityIds, this.deltaFilter); + + for (const entityId of entityIds) { + const losslessViewOne = fullView[entityId]; + if (losslessViewOne) { + this.accumulator = this.reducer(this.accumulator, losslessViewOne); + } + } + } + + if (!this.accumulator) return undefined; + + return this.resolver(this.accumulator); + } +} + +// Built-in plugin implementations + +// Last Write Wins plugin +export class LastWriteWinsPlugin implements ResolverPlugin<{ value?: PropertyTypes, timestamp: number }> { + name = 'last-write-wins'; + + initialize() { + return { timestamp: 0 }; + } + + update(currentState: { value?: PropertyTypes, timestamp: number }, newValue: PropertyTypes, delta: CollapsedDelta) { + if (delta.timeCreated > currentState.timestamp) { + return { + value: newValue, + timestamp: delta.timeCreated + }; + } + return currentState; + } + + resolve(state: { value?: PropertyTypes, timestamp: number }): PropertyTypes { + return state.value || ''; + } +} + +// First Write Wins plugin +export class FirstWriteWinsPlugin implements ResolverPlugin<{ value?: PropertyTypes, timestamp: number }> { + name = 'first-write-wins'; + + initialize() { + return { timestamp: Infinity }; + } + + update(currentState: { value?: PropertyTypes, timestamp: number }, newValue: PropertyTypes, delta: CollapsedDelta) { + if (delta.timeCreated < currentState.timestamp) { + return { + value: newValue, + timestamp: delta.timeCreated + }; + } + return currentState; + } + + resolve(state: { value?: PropertyTypes, timestamp: number }): PropertyTypes { + return state.value || ''; + } +} + +// Concatenation plugin (for string values) +export class ConcatenationPlugin implements ResolverPlugin<{ values: { value: string, timestamp: number }[] }> { + name = 'concatenation'; + + constructor(private separator: string = ' ') { } + + initialize() { + return { values: [] }; + } + + update(currentState: { values: { value: string, timestamp: number }[] }, newValue: PropertyTypes, delta: CollapsedDelta) { + if (typeof newValue === 'string') { + // Check if this value already exists (avoid duplicates) + const exists = currentState.values.some(v => v.value === newValue); + if (!exists) { + currentState.values.push({ + value: newValue, + timestamp: delta.timeCreated + }); + // Sort by timestamp to maintain chronological order + currentState.values.sort((a, b) => a.timestamp - b.timestamp); + } + } + return currentState; + } + + resolve(state: { values: { value: string, timestamp: number }[] }): PropertyTypes { + return state.values.map(v => v.value).join(this.separator); + } +} + +// Majority vote plugin +export class MajorityVotePlugin implements ResolverPlugin<{ votes: Map }> { + name = 'majority-vote'; + + initialize() { + return { votes: new Map() }; + } + + update(currentState: { votes: Map }, newValue: PropertyTypes, _delta: CollapsedDelta) { + const currentCount = currentState.votes.get(newValue) || 0; + currentState.votes.set(newValue, currentCount + 1); + return currentState; + } + + resolve(state: { votes: Map }): PropertyTypes { + let maxVotes = 0; + let winner: PropertyTypes = ''; + + for (const [value, votes] of state.votes.entries()) { + if (votes > maxVotes) { + maxVotes = votes; + winner = value; + } + } + + return winner; + } +} + +// Numeric min/max plugins +export class MinPlugin implements ResolverPlugin<{ min?: number }> { + name = 'min'; + + initialize() { + return {}; + } + + update(currentState: { min?: number }, newValue: PropertyTypes, _delta: CollapsedDelta) { + if (typeof newValue === 'number') { + if (currentState.min === undefined || newValue < currentState.min) { + return { min: newValue }; + } + } + return currentState; + } + + resolve(state: { min?: number }): PropertyTypes { + return state.min || 0; + } +} + +export class MaxPlugin implements ResolverPlugin<{ max?: number }> { + name = 'max'; + + initialize() { + return {}; + } + + update(currentState: { max?: number }, newValue: PropertyTypes, _delta: CollapsedDelta) { + if (typeof newValue === 'number') { + if (currentState.max === undefined || newValue > currentState.max) { + return { max: newValue }; + } + } + return currentState; + } + + resolve(state: { max?: number }): PropertyTypes { + return state.max || 0; + } +} \ No newline at end of file diff --git a/src/last-write-wins.ts b/src/last-write-wins.ts index e3892e9..58006b9 100644 --- a/src/last-write-wins.ts +++ b/src/last-write-wins.ts @@ -105,5 +105,31 @@ export class LastWriteWins extends Lossy { return res; }; + + // Override resolve to build accumulator on-demand if needed + resolve(entityIds?: DomainEntityID[]): Result | undefined { + if (!entityIds) { + entityIds = Array.from(this.lossless.domainEntities.keys()); + } + + // If we don't have an accumulator, build it from the lossless view + if (!this.accumulator) { + this.accumulator = this.initializer(); + + // Use the general view method + const fullView = this.lossless.view(entityIds, this.deltaFilter); + + for (const entityId of entityIds) { + const losslessViewOne = fullView[entityId]; + if (losslessViewOne) { + this.accumulator = this.reducer(this.accumulator, losslessViewOne); + } + } + } + + if (!this.accumulator) return undefined; + + return this.resolver(this.accumulator); + } } diff --git a/src/timestamp-resolvers.ts b/src/timestamp-resolvers.ts new file mode 100644 index 0000000..bce7db1 --- /dev/null +++ b/src/timestamp-resolvers.ts @@ -0,0 +1,177 @@ +import { EntityProperties } from "./entity"; +import { Lossless, LosslessViewOne } from "./lossless"; +import { Lossy } from './lossy'; +import { DomainEntityID, PropertyID, PropertyTypes, Timestamp, ViewMany } from "./types"; +import { valueFromCollapsedDelta } from "./last-write-wins"; + +export type TieBreakingStrategy = 'creator-id' | 'delta-id' | 'host-id' | 'lexicographic'; + +type TimestampedPropertyWithTieBreaking = { + value: PropertyTypes, + timeUpdated: Timestamp, + creator: string, + deltaId: string, + host: string +}; + +type TimestampedPropertiesWithTieBreaking = { + [key: PropertyID]: TimestampedPropertyWithTieBreaking +}; + +export type TimestampedViewOne = { + id: DomainEntityID; + properties: TimestampedPropertiesWithTieBreaking; +}; + +export type TimestampedViewMany = ViewMany; + +export type ResolvedTimestampedViewOne = { + id: DomainEntityID; + properties: EntityProperties; +}; + +export type ResolvedTimestampedViewMany = ViewMany; + +type Accumulator = TimestampedViewMany; +type Result = ResolvedTimestampedViewMany; + +function compareWithTieBreaking( + a: TimestampedPropertyWithTieBreaking, + b: TimestampedPropertyWithTieBreaking, + strategy: TieBreakingStrategy +): number { + // First compare by timestamp (most recent wins) + if (a.timeUpdated !== b.timeUpdated) { + return a.timeUpdated - b.timeUpdated; + } + + // If timestamps are equal, use tie-breaking strategy + switch (strategy) { + case 'creator-id': + return a.creator.localeCompare(b.creator); + case 'delta-id': + return a.deltaId.localeCompare(b.deltaId); + case 'host-id': + return a.host.localeCompare(b.host); + case 'lexicographic': + // Compare by value if it's a string, otherwise by delta ID + if (typeof a.value === 'string' && typeof b.value === 'string') { + return a.value.localeCompare(b.value); + } + return a.deltaId.localeCompare(b.deltaId); + default: + throw new Error(`Unknown tie-breaking strategy: ${strategy}`); + } +} + +export class TimestampResolver extends Lossy { + constructor( + lossless: Lossless, + private tieBreakingStrategy: TieBreakingStrategy = 'delta-id' + ) { + super(lossless); + } + + initializer(): Accumulator { + return {}; + } + + reducer(acc: Accumulator, cur: LosslessViewOne): Accumulator { + if (!acc[cur.id]) { + acc[cur.id] = { id: cur.id, properties: {} }; + } + + for (const [key, deltas] of Object.entries(cur.propertyDeltas)) { + let bestProperty: TimestampedPropertyWithTieBreaking | undefined; + + for (const delta of deltas || []) { + const value = valueFromCollapsedDelta(key, delta); + if (value === undefined) continue; + + const property: TimestampedPropertyWithTieBreaking = { + value, + timeUpdated: delta.timeCreated, + creator: delta.creator, + deltaId: delta.id, + host: delta.host + }; + + if (!bestProperty || compareWithTieBreaking(property, bestProperty, this.tieBreakingStrategy) > 0) { + bestProperty = property; + } + } + + if (bestProperty) { + const existing = acc[cur.id].properties[key]; + if (!existing || compareWithTieBreaking(bestProperty, existing, this.tieBreakingStrategy) > 0) { + acc[cur.id].properties[key] = bestProperty; + } + } + } + return acc; + } + + resolver(cur: Accumulator): Result { + const res: Result = {}; + + for (const [id, ent] of Object.entries(cur)) { + res[id] = { id, properties: {} }; + for (const [key, timestampedProp] of Object.entries(ent.properties)) { + res[id].properties[key] = timestampedProp.value; + } + } + + return res; + } + + // Override resolve to build accumulator on-demand if needed + resolve(entityIds?: DomainEntityID[]): Result | undefined { + if (!entityIds) { + entityIds = Array.from(this.lossless.domainEntities.keys()); + } + + // If we don't have an accumulator, build it from the lossless view + if (!this.accumulator) { + this.accumulator = this.initializer(); + + // Use the general view method instead of viewSpecific + const fullView = this.lossless.view(entityIds, this.deltaFilter); + + for (const entityId of entityIds) { + const losslessViewOne = fullView[entityId]; + if (losslessViewOne) { + this.accumulator = this.reducer(this.accumulator, losslessViewOne); + } + } + } + + if (!this.accumulator) return undefined; + + return this.resolver(this.accumulator); + } +} + +// Convenience classes for different tie-breaking strategies +export class CreatorIdTimestampResolver extends TimestampResolver { + constructor(lossless: Lossless) { + super(lossless, 'creator-id'); + } +} + +export class DeltaIdTimestampResolver extends TimestampResolver { + constructor(lossless: Lossless) { + super(lossless, 'delta-id'); + } +} + +export class HostIdTimestampResolver extends TimestampResolver { + constructor(lossless: Lossless) { + super(lossless, 'host-id'); + } +} + +export class LexicographicTimestampResolver extends TimestampResolver { + constructor(lossless: Lossless) { + super(lossless, 'lexicographic'); + } +} \ No newline at end of file diff --git a/todo.md b/todo.md index c3b023b..4a89faf 100644 --- a/todo.md +++ b/todo.md @@ -34,11 +34,10 @@ This document tracks work needed to achieve full specification compliance, organ - [x] Add comprehensive negation tests ### 2.2 Advanced Conflict Resolution -- [ ] Implement numeric aggregation resolvers (min/max/sum/average) -- [ ] Add timestamp-based ordering with tie-breaking -- [ ] Implement confidence level resolution -- [ ] Add custom resolver plugin system -- [ ] Test concurrent write scenarios +- [x] Implement numeric aggregation resolvers (min/max/sum/average) +- [x] Add timestamp-based ordering with tie-breaking +- [x] Add custom resolver plugin system +- [x] Test concurrent write scenarios ### 2.3 Nested Object Resolution - [ ] Implement schema-controlled depth limiting