- Refactored delta creation in test files to use createDelta() pattern - Replaced direct Delta instantiations with fluent builder API - Updated relationship deltas to use setProperty with proper entity context - Ensured all tests pass with the new delta creation approach This is part of the ongoing effort to standardize on the DeltaBuilder API across the codebase for better consistency and maintainability.
382 lines
13 KiB
TypeScript
382 lines
13 KiB
TypeScript
import { createDelta } from '../src/core/delta-builder';
|
|
import {
|
|
RhizomeNode,
|
|
Lossless,
|
|
Delta,
|
|
LastWriteWins,
|
|
TimestampResolver,
|
|
SumResolver,
|
|
CustomResolver,
|
|
LastWriteWinsPlugin,
|
|
MajorityVotePlugin
|
|
} from "../src";
|
|
|
|
describe('Concurrent Write Scenarios', () => {
|
|
let node: RhizomeNode;
|
|
let lossless: Lossless;
|
|
|
|
beforeEach(() => {
|
|
node = new RhizomeNode();
|
|
lossless = new Lossless(node);
|
|
});
|
|
|
|
describe('Simultaneous Writes with Same Timestamp', () => {
|
|
test('should handle simultaneous writes using last-write-wins resolver', () => {
|
|
const timestamp = 1000;
|
|
|
|
// Simulate two writers updating the same property at the exact same time
|
|
lossless.ingestDelta(createDelta('writer1', 'host1')
|
|
.withId('delta-a')
|
|
.withTimestamp(timestamp)
|
|
.setProperty('entity1', 'score', 100, 'collection')
|
|
.buildV1()
|
|
);
|
|
|
|
lossless.ingestDelta(createDelta('writer2', 'host2')
|
|
.withId('delta-b')
|
|
.withTimestamp(timestamp) // Same timestamp
|
|
.setProperty('entity1', 'score', 200, 'collection')
|
|
.buildV1()
|
|
);
|
|
|
|
const resolver = new LastWriteWins(lossless);
|
|
const result = resolver.resolve();
|
|
|
|
expect(result).toBeDefined();
|
|
// Should resolve deterministically using the LastWriteWins resolver's tie-breaking algorithm
|
|
expect(typeof result!['entity1'].properties.score).toBe('number');
|
|
expect([100, 200]).toContain(result!['entity1'].properties.score);
|
|
});
|
|
|
|
test('should handle simultaneous writes using timestamp resolver with tie-breaking', () => {
|
|
const timestamp = 1000;
|
|
|
|
lossless.ingestDelta(createDelta('writer_z', 'host1') // Lexicographically later
|
|
.withId('delta-a')
|
|
.withTimestamp(timestamp)
|
|
.setProperty('entity1', 'score', 100, 'collection')
|
|
.buildV1()
|
|
);
|
|
|
|
lossless.ingestDelta(createDelta('writer_a', 'host2') // Lexicographically earlier
|
|
.withId('delta-b')
|
|
.withTimestamp(timestamp) // Same timestamp
|
|
.setProperty('entity1', 'score', 200, 'collection')
|
|
.buildV1()
|
|
);
|
|
|
|
const resolver = new TimestampResolver(lossless, 'creator-id');
|
|
const result = resolver.resolve();
|
|
|
|
expect(result).toBeDefined();
|
|
// writer_z should win due to lexicographic ordering
|
|
expect(result!['entity1'].properties.score).toBe(100);
|
|
});
|
|
|
|
test('should handle multiple writers with aggregation resolver', () => {
|
|
const timestamp = 1000;
|
|
|
|
// Multiple writers add values simultaneously
|
|
lossless.ingestDelta(createDelta('writer1', 'host1')
|
|
.withTimestamp(1000)
|
|
.setProperty('entity1', 'points', 10, 'collection')
|
|
.buildV1()
|
|
);
|
|
|
|
lossless.ingestDelta(createDelta('writer2', 'host2')
|
|
.withTimestamp(1000) // Same timestamp
|
|
.setProperty('entity1', 'points', 20, 'collection')
|
|
.buildV1()
|
|
);
|
|
|
|
// Third writer adds another value
|
|
lossless.ingestDelta(createDelta('writer3', 'host3')
|
|
.withTimestamp(1000) // Same timestamp
|
|
.setProperty('entity1', 'points', 30, 'collection')
|
|
.buildV1()
|
|
);
|
|
|
|
const resolver = new SumResolver(lossless, ['points']);
|
|
const result = resolver.resolve();
|
|
|
|
expect(result).toBeDefined();
|
|
// All values should be summed regardless of timing
|
|
expect(result!['entity1'].properties.points).toBe(60); // 10 + 20 + 30
|
|
});
|
|
});
|
|
|
|
describe('Out-of-Order Write Arrival', () => {
|
|
test('should handle writes arriving out of chronological order', () => {
|
|
// Newer delta arrives first
|
|
lossless.ingestDelta(createDelta('writer1', 'host1')
|
|
.withTimestamp(2000)
|
|
.addPointer('collection', 'entity1', 'value')
|
|
.addPointer('value', 'newer')
|
|
.buildV1()
|
|
);
|
|
|
|
// Older delta arrives later
|
|
lossless.ingestDelta(createDelta('writer1', 'host1')
|
|
.withTimestamp(1000)
|
|
.addPointer('collection', 'entity1', 'value')
|
|
.addPointer('value', 'older')
|
|
.buildV1()
|
|
);
|
|
|
|
const resolver = new LastWriteWins(lossless);
|
|
const result = resolver.resolve();
|
|
|
|
expect(result).toBeDefined();
|
|
// Should still resolve to the chronologically newer value
|
|
expect(result!['entity1'].properties.value).toBe('newer');
|
|
});
|
|
|
|
test('should maintain correct aggregation despite out-of-order arrival', () => {
|
|
// Add deltas in reverse chronological order
|
|
lossless.ingestDelta(createDelta('writer1', 'host1')
|
|
.withTimestamp(3000)
|
|
.addPointer('collection', 'entity1', 'score')
|
|
.addPointer('score', 30)
|
|
.buildV1()
|
|
);
|
|
|
|
lossless.ingestDelta(createDelta('writer1', 'host1')
|
|
.withTimestamp(1000)
|
|
.addPointer('collection', 'entity1', 'score')
|
|
.addPointer('score', 10)
|
|
.buildV1()
|
|
);
|
|
|
|
lossless.ingestDelta(createDelta('writer1', 'host1')
|
|
.withTimestamp(2000)
|
|
.addPointer('collection', 'entity1', 'score')
|
|
.addPointer('score', 20)
|
|
.buildV1()
|
|
);
|
|
|
|
const resolver = new SumResolver(lossless, ['score']);
|
|
const result = resolver.resolve();
|
|
|
|
expect(result).toBeDefined();
|
|
// Sum should be correct regardless of arrival order
|
|
expect(result!['entity1'].properties.score).toBe(60); // 10 + 20 + 30
|
|
});
|
|
});
|
|
|
|
describe('High-Frequency Concurrent Updates', () => {
|
|
test('should handle rapid concurrent updates to the same entity', () => {
|
|
const baseTimestamp = 1000;
|
|
const numWriters = 10;
|
|
const writesPerWriter = 5;
|
|
|
|
// Simulate multiple writers making rapid updates
|
|
for (let writer = 0; writer < numWriters; writer++) {
|
|
for (let write = 0; write < writesPerWriter; write++) {
|
|
lossless.ingestDelta(createDelta(`writer${writer}`, `host${writer}`)
|
|
.withTimestamp(baseTimestamp + write)
|
|
.addPointer('collection', 'entity1', 'counter')
|
|
.addPointer('counter', 1)
|
|
.buildV1()
|
|
);
|
|
}
|
|
}
|
|
|
|
const resolver = new SumResolver(lossless, ['counter']);
|
|
const result = resolver.resolve();
|
|
|
|
expect(result).toBeDefined();
|
|
// Should count all updates
|
|
expect(result!['entity1'].properties.counter).toBe(numWriters * writesPerWriter);
|
|
});
|
|
|
|
test('should handle concurrent updates to multiple properties', () => {
|
|
const timestamp = 1000;
|
|
|
|
// Writer 1 updates name and score
|
|
lossless.ingestDelta(createDelta('writer1', 'host1')
|
|
.withTimestamp(timestamp)
|
|
.addPointer('collection', 'entity1', 'name')
|
|
.addPointer('name', 'alice')
|
|
.buildV1()
|
|
);
|
|
|
|
lossless.ingestDelta(createDelta('writer1', 'host1')
|
|
.withTimestamp(timestamp + 1)
|
|
.addPointer('collection', 'entity1', 'score')
|
|
.addPointer('score', 100)
|
|
.buildV1()
|
|
);
|
|
|
|
// Writer 2 updates name and score concurrently
|
|
lossless.ingestDelta(createDelta('writer2', 'host2')
|
|
.withTimestamp(timestamp + 2)
|
|
.addPointer('collection', 'entity1', 'name')
|
|
.addPointer('name', 'bob')
|
|
.buildV1()
|
|
);
|
|
|
|
lossless.ingestDelta(createDelta('writer2', 'host2')
|
|
.withTimestamp(timestamp + 3)
|
|
.addPointer('collection', 'entity1', 'score')
|
|
.addPointer('score', 200)
|
|
.buildV1()
|
|
);
|
|
|
|
const resolver = new CustomResolver(lossless, {
|
|
name: new LastWriteWinsPlugin(),
|
|
score: new LastWriteWinsPlugin()
|
|
});
|
|
|
|
const result = resolver.resolve();
|
|
|
|
expect(result).toBeDefined();
|
|
expect(result!['entity1'].properties.name).toBe('bob'); // Later timestamp
|
|
expect(result!['entity1'].properties.score).toBe(200); // Later timestamp
|
|
});
|
|
});
|
|
|
|
describe('Cross-Entity Concurrent Writes', () => {
|
|
test('should handle concurrent writes to different entities', () => {
|
|
const timestamp = 1000;
|
|
|
|
// Multiple writers updating different entities simultaneously
|
|
for (let i = 0; i < 5; i++) {
|
|
lossless.ingestDelta(createDelta(`writer${i}`, `host${i}`)
|
|
.withTimestamp(timestamp)
|
|
.addPointer('collection', `entity${i}`, 'value')
|
|
.addPointer('value', (i + 1) * 10)
|
|
.buildV1()
|
|
);
|
|
}
|
|
|
|
const resolver = new LastWriteWins(lossless);
|
|
const result = resolver.resolve();
|
|
|
|
expect(result).toBeDefined();
|
|
expect(Object.keys(result!)).toHaveLength(5);
|
|
|
|
for (let i = 0; i < 5; i++) {
|
|
expect(result![`entity${i}`].properties.value).toBe((i + 1) * 10);
|
|
}
|
|
});
|
|
|
|
test('should handle mixed entity and property conflicts', () => {
|
|
const timestamp = 1000;
|
|
|
|
// Entity1: Multiple writers competing for same property
|
|
lossless.ingestDelta(createDelta('writer1', 'host1')
|
|
.withTimestamp(timestamp)
|
|
.addPointer('collection', 'entity1', 'votes')
|
|
.addPointer('votes', 'option_a')
|
|
.buildV1()
|
|
);
|
|
|
|
lossless.ingestDelta(createDelta('writer2', 'host2')
|
|
.withTimestamp(timestamp)
|
|
.addPointer('collection', 'entity1', 'votes')
|
|
.addPointer('votes', 'option_a')
|
|
.buildV1()
|
|
);
|
|
|
|
lossless.ingestDelta(createDelta('writer3', 'host3')
|
|
.withTimestamp(timestamp)
|
|
.addPointer('collection', 'entity1', 'votes')
|
|
.addPointer('votes', 'option_b')
|
|
.buildV1()
|
|
);
|
|
|
|
// Entity2: Single writer, no conflict
|
|
lossless.ingestDelta(createDelta('writer4', 'host4')
|
|
.withTimestamp(timestamp)
|
|
.addPointer('collection', 'entity2', 'status')
|
|
.addPointer('status', 'active')
|
|
.buildV1()
|
|
);
|
|
|
|
const resolver = new CustomResolver(lossless, {
|
|
votes: new MajorityVotePlugin(),
|
|
status: new LastWriteWinsPlugin()
|
|
});
|
|
|
|
const result = resolver.resolve();
|
|
|
|
expect(result).toBeDefined();
|
|
expect(result!['entity1'].properties.votes).toBe('option_a'); // 2 votes vs 1
|
|
expect(result!['entity2'].properties.status).toBe('active');
|
|
});
|
|
});
|
|
|
|
describe('Stress Testing', () => {
|
|
test('should handle large number of concurrent writes efficiently', () => {
|
|
const numEntities = 100;
|
|
const numWritersPerEntity = 10;
|
|
const baseTimestamp = 1000;
|
|
|
|
// Generate a large number of concurrent writes
|
|
for (let entity = 0; entity < numEntities; entity++) {
|
|
for (let writer = 0; writer < numWritersPerEntity; writer++) {
|
|
lossless.ingestDelta(createDelta(`writer${writer}`, `host${writer}`)
|
|
.withTimestamp(baseTimestamp + Math.floor(Math.random() * 1000))
|
|
.addPointer('collection', `entity${entity}`, 'score')
|
|
.addPointer('score', Math.floor(Math.random() * 100))
|
|
.buildV1()
|
|
);
|
|
}
|
|
}
|
|
|
|
const resolver = new SumResolver(lossless, ['score']);
|
|
const result = resolver.resolve();
|
|
|
|
expect(result).toBeDefined();
|
|
expect(Object.keys(result!)).toHaveLength(numEntities);
|
|
|
|
// Each entity should have a score (sum of all writer contributions)
|
|
for (let entity = 0; entity < numEntities; entity++) {
|
|
expect(result![`entity${entity}`]).toBeDefined();
|
|
expect(typeof result![`entity${entity}`].properties.score).toBe('number');
|
|
expect(result![`entity${entity}`].properties.score).toBeGreaterThan(0);
|
|
}
|
|
});
|
|
|
|
test('should maintain consistency under rapid updates and resolution calls', () => {
|
|
const entityId = 'stress-test-entity';
|
|
let updateCount = 0;
|
|
|
|
// Add initial deltas
|
|
for (let i = 0; i < 50; i++) {
|
|
lossless.ingestDelta(createDelta(
|
|
`writer${i % 5}`,
|
|
`host${i % 3}`
|
|
)
|
|
.withTimestamp(1000 + i)
|
|
.addPointer('collection', entityId, 'counter')
|
|
.addPointer('counter', 1)
|
|
.buildV1()
|
|
);
|
|
updateCount++;
|
|
}
|
|
|
|
// Verify initial state
|
|
let resolver = new SumResolver(lossless, ['counter']);
|
|
let result = resolver.resolve();
|
|
expect(result).toBeDefined();
|
|
expect(result![entityId].properties.counter).toBe(updateCount);
|
|
|
|
// Add more deltas and verify consistency
|
|
for (let i = 0; i < 25; i++) {
|
|
lossless.ingestDelta(createDelta('late-writer', 'late-host')
|
|
.withTimestamp(2000 + i)
|
|
.addPointer('collection', entityId, 'counter')
|
|
.addPointer('counter', 2)
|
|
.buildV1()
|
|
);
|
|
updateCount += 2;
|
|
|
|
// Create a fresh resolver to avoid accumulator caching issues
|
|
resolver = new SumResolver(lossless, ['counter']);
|
|
result = resolver.resolve();
|
|
expect(result![entityId].properties.counter).toBe(updateCount);
|
|
}
|
|
});
|
|
});
|
|
}); |