rhizome-node/__tests__/performance/concurrent-writes.test.ts

import { createDelta } from '@src/core/delta-builder';
import {
RhizomeNode,
Lossless,
SumResolver,
CustomResolver,
LastWriteWinsPlugin,
MajorityVotePlugin,
TimestampResolver
} from "../../src";

describe('Concurrent Write Scenarios', () => {
let node: RhizomeNode;
let lossless: Lossless;
beforeEach(() => {
node = new RhizomeNode();
lossless = new Lossless(node);
});

describe('Simultaneous Writes with Same Timestamp', () => {
test('should handle simultaneous writes with last-write-wins semantics', () => {
const timestamp = 1000;
// Simulate two writers updating the same property at the exact same time
lossless.ingestDelta(createDelta('writer1', 'host1')
.withId('delta-a')
.withTimestamp(timestamp)
.setProperty('entity1', 'score', 100, 'collection')
.buildV1()
);
lossless.ingestDelta(createDelta('writer2', 'host2')
.withId('delta-b')
.withTimestamp(timestamp) // Same timestamp
.setProperty('entity1', 'score', 200, 'collection')
.buildV1()
);
const resolver = new TimestampResolver(lossless);
const result = resolver.resolve();
expect(result).toBeDefined();
// Should resolve deterministically via the TimestampResolver's tie-breaking for equal timestamps (re-checked across resolver instances in the next test)
expect(typeof result!['entity1'].properties.score).toBe('number');
expect([100, 200]).toContain(result!['entity1'].properties.score);
});
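
// Hedged sketch: the previous test expects deterministic resolution of a timestamp tie.
// This check re-reads the same lossless view through two independent resolver instances
// and expects the same winner; it assumes resolve() depends only on the ingested deltas.
test('should resolve the same timestamp tie identically across resolver instances', () => {
  const timestamp = 1000;
  lossless.ingestDelta(createDelta('writer1', 'host1')
    .withTimestamp(timestamp)
    .setProperty('entity1', 'score', 100, 'collection')
    .buildV1()
  );
  lossless.ingestDelta(createDelta('writer2', 'host2')
    .withTimestamp(timestamp) // Same timestamp
    .setProperty('entity1', 'score', 200, 'collection')
    .buildV1()
  );
  const first = new TimestampResolver(lossless).resolve();
  const second = new TimestampResolver(lossless).resolve();
  expect(first).toBeDefined();
  expect(second).toBeDefined();
  // Both passes should agree on which concurrent write won the tie
  expect(second!['entity1'].properties.score).toBe(first!['entity1'].properties.score);
});
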
test('should handle simultaneous writes using timestamp resolver with tie-breaking', () => {
const timestamp = 1000;
lossless.ingestDelta(createDelta('writer_z', 'host1') // Lexicographically later
.withId('delta-a')
.withTimestamp(timestamp)
.setProperty('entity1', 'score', 100, 'collection')
.buildV1()
);
lossless.ingestDelta(createDelta('writer_a', 'host2') // Lexicographically earlier
.withId('delta-b')
.withTimestamp(timestamp) // Same timestamp
.setProperty('entity1', 'score', 200, 'collection')
.buildV1()
);
const resolver = new TimestampResolver(lossless, 'creator-id');
const result = resolver.resolve();
expect(result).toBeDefined();
// writer_z should win the tie: with the 'creator-id' strategy the lexicographically greater creator ID prevails
expect(result!['entity1'].properties.score).toBe(100);
});

test('should handle multiple writers with aggregation resolver', () => {
// Multiple writers add values simultaneously
lossless.ingestDelta(createDelta('writer1', 'host1')
.withTimestamp(1000)
.setProperty('entity1', 'points', 10, 'collection')
.buildV1()
);
lossless.ingestDelta(createDelta('writer2', 'host2')
.withTimestamp(1000) // Same timestamp
.setProperty('entity1', 'points', 20, 'collection')
.buildV1()
);
// Third writer adds another value
lossless.ingestDelta(createDelta('writer3', 'host3')
.withTimestamp(1000) // Same timestamp
.setProperty('entity1', 'points', 30, 'collection')
.buildV1()
);
const resolver = new SumResolver(lossless, ['points']);
const result = resolver.resolve();
expect(result).toBeDefined();
// All values should be summed regardless of timing
expect(result!['entity1'].properties.points).toBe(60); // 10 + 20 + 30
});
});

describe('Out-of-Order Write Arrival', () => {
test('should handle writes arriving out of chronological order', () => {
// Newer delta arrives first
lossless.ingestDelta(createDelta('writer1', 'host1')
.withTimestamp(2000)
.addPointer('collection', 'entity1', 'value')
.addPointer('value', 'newer')
.buildV1()
);
// Older delta arrives later
lossless.ingestDelta(createDelta('writer1', 'host1')
.withTimestamp(1000)
.addPointer('collection', 'entity1', 'value')
.addPointer('value', 'older')
.buildV1()
);
const resolver = new TimestampResolver(lossless);
const result = resolver.resolve();
expect(result).toBeDefined();
// Should still resolve to the chronologically newer value
expect(result!['entity1'].properties.value).toBe('newer');
});
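
// Hedged sketch: extends the out-of-order case above with a delta that is older than
// anything already ingested and arrives last. Assumes the TimestampResolver orders values
// strictly by delta timestamp rather than by arrival order.
test('should ignore a stale delta that arrives after newer writes', () => {
  lossless.ingestDelta(createDelta('writer1', 'host1')
    .withTimestamp(3000)
    .addPointer('collection', 'entity1', 'value')
    .addPointer('value', 'newest')
    .buildV1()
  );
  // Much older delta arrives last
  lossless.ingestDelta(createDelta('writer2', 'host2')
    .withTimestamp(500)
    .addPointer('collection', 'entity1', 'value')
    .addPointer('value', 'stale')
    .buildV1()
  );
  const resolver = new TimestampResolver(lossless);
  const result = resolver.resolve();
  expect(result).toBeDefined();
  expect(result!['entity1'].properties.value).toBe('newest');
});
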
test('should maintain correct aggregation despite out-of-order arrival', () => {
// Add deltas in reverse chronological order
lossless.ingestDelta(createDelta('writer1', 'host1')
.withTimestamp(3000)
.addPointer('collection', 'entity1', 'score')
.addPointer('score', 30)
.buildV1()
);
lossless.ingestDelta(createDelta('writer1', 'host1')
.withTimestamp(1000)
.addPointer('collection', 'entity1', 'score')
.addPointer('score', 10)
.buildV1()
);
lossless.ingestDelta(createDelta('writer1', 'host1')
.withTimestamp(2000)
.addPointer('collection', 'entity1', 'score')
.addPointer('score', 20)
.buildV1()
);
const resolver = new SumResolver(lossless, ['score']);
const result = resolver.resolve();
expect(result).toBeDefined();
// Sum should be correct regardless of arrival order
expect(result!['entity1'].properties.score).toBe(60); // 10 + 20 + 30
});
});

describe('High-Frequency Concurrent Updates', () => {
test('should handle rapid concurrent updates to the same entity', () => {
const baseTimestamp = 1000;
const numWriters = 10;
const writesPerWriter = 5;
// Simulate multiple writers making rapid updates
for (let writer = 0; writer < numWriters; writer++) {
for (let write = 0; write < writesPerWriter; write++) {
lossless.ingestDelta(createDelta(`writer${writer}`, `host${writer}`)
.withTimestamp(baseTimestamp + write)
.addPointer('collection', 'entity1', 'counter')
.addPointer('counter', 1)
.buildV1()
);
}
}
const resolver = new SumResolver(lossless, ['counter']);
const result = resolver.resolve();
expect(result).toBeDefined();
// Should count all updates
expect(result!['entity1'].properties.counter).toBe(numWriters * writesPerWriter);
});

test('should handle concurrent updates to multiple properties', () => {
const timestamp = 1000;
// Writer 1 updates name and score
lossless.ingestDelta(createDelta('writer1', 'host1')
.withTimestamp(timestamp)
.addPointer('collection', 'entity1', 'name')
.addPointer('name', 'alice')
.buildV1()
);
lossless.ingestDelta(createDelta('writer1', 'host1')
.withTimestamp(timestamp + 1)
.addPointer('collection', 'entity1', 'score')
.addPointer('score', 100)
.buildV1()
);
// Writer 2 updates name and score concurrently
lossless.ingestDelta(createDelta('writer2', 'host2')
.withTimestamp(timestamp + 2)
.addPointer('collection', 'entity1', 'name')
.addPointer('name', 'bob')
.buildV1()
);
lossless.ingestDelta(createDelta('writer2', 'host2')
.withTimestamp(timestamp + 3)
.addPointer('collection', 'entity1', 'score')
.addPointer('score', 200)
.buildV1()
);
const resolver = new CustomResolver(lossless, {
name: new LastWriteWinsPlugin(),
score: new LastWriteWinsPlugin()
});
const result = resolver.resolve();
expect(result).toBeDefined();
expect(result!['entity1'].properties.name).toBe('bob'); // Later timestamp
expect(result!['entity1'].properties.score).toBe(200); // Later timestamp
});
});

describe('Cross-Entity Concurrent Writes', () => {
test('should handle concurrent writes to different entities', () => {
const timestamp = 1000;
// Multiple writers updating different entities simultaneously
for (let i = 0; i < 5; i++) {
lossless.ingestDelta(createDelta(`writer${i}`, `host${i}`)
.withTimestamp(timestamp)
.addPointer('collection', `entity${i}`, 'value')
.addPointer('value', (i + 1) * 10)
.buildV1()
);
}
const resolver = new TimestampResolver(lossless);
const result = resolver.resolve();
expect(result).toBeDefined();
expect(Object.keys(result!)).toHaveLength(5);
for (let i = 0; i < 5; i++) {
expect(result![`entity${i}`].properties.value).toBe((i + 1) * 10);
}
});

test('should handle mixed entity and property conflicts', () => {
const timestamp = 1000;
// Entity1: Multiple writers competing for same property
lossless.ingestDelta(createDelta('writer1', 'host1')
.withTimestamp(timestamp)
.addPointer('collection', 'entity1', 'votes')
.addPointer('votes', 'option_a')
.buildV1()
);
lossless.ingestDelta(createDelta('writer2', 'host2')
.withTimestamp(timestamp)
.addPointer('collection', 'entity1', 'votes')
.addPointer('votes', 'option_a')
.buildV1()
);
lossless.ingestDelta(createDelta('writer3', 'host3')
.withTimestamp(timestamp)
.addPointer('collection', 'entity1', 'votes')
.addPointer('votes', 'option_b')
.buildV1()
);
// Entity2: Single writer, no conflict
lossless.ingestDelta(createDelta('writer4', 'host4')
.withTimestamp(timestamp)
.addPointer('collection', 'entity2', 'status')
.addPointer('status', 'active')
.buildV1()
);
const resolver = new CustomResolver(lossless, {
votes: new MajorityVotePlugin(),
status: new LastWriteWinsPlugin()
});
const result = resolver.resolve();
expect(result).toBeDefined();
expect(result!['entity1'].properties.votes).toBe('option_a'); // 2 votes vs 1
expect(result!['entity2'].properties.status).toBe('active');
});
});

describe('Stress Testing', () => {
test('should handle large number of concurrent writes efficiently', () => {
const numEntities = 100;
const numWritersPerEntity = 10;
const baseTimestamp = 1000;
// Generate a large number of concurrent writes
for (let entity = 0; entity < numEntities; entity++) {
for (let writer = 0; writer < numWritersPerEntity; writer++) {
lossless.ingestDelta(createDelta(`writer${writer}`, `host${writer}`)
.withTimestamp(baseTimestamp + Math.floor(Math.random() * 1000))
.addPointer('collection', `entity${entity}`, 'score')
.addPointer('score', Math.floor(Math.random() * 100))
.buildV1()
);
}
}
const resolver = new SumResolver(lossless, ['score']);
const result = resolver.resolve();
expect(result).toBeDefined();
expect(Object.keys(result!)).toHaveLength(numEntities);
// Each entity should have a score (sum of all writer contributions)
for (let entity = 0; entity < numEntities; entity++) {
expect(result![`entity${entity}`]).toBeDefined();
expect(typeof result![`entity${entity}`].properties.score).toBe('number');
expect(result![`entity${entity}`].properties.score).toBeGreaterThan(0);
}
});
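
// Hedged sketch: the test above claims the resolver handles heavy concurrency
// "efficiently" but never bounds wall-clock time. This optional guard runs a smaller
// load and asserts a deliberately generous ceiling; the 2000 ms budget is an assumption
// (not a documented target) and should only catch gross regressions.
test('should resolve a moderate concurrent load within a generous time budget', () => {
  for (let entity = 0; entity < 50; entity++) {
    for (let writer = 0; writer < 5; writer++) {
      lossless.ingestDelta(createDelta(`writer${writer}`, `host${writer}`)
        .withTimestamp(1000 + writer)
        .addPointer('collection', `entity${entity}`, 'score')
        .addPointer('score', 1)
        .buildV1()
      );
    }
  }
  const start = Date.now();
  const result = new SumResolver(lossless, ['score']).resolve();
  const elapsedMs = Date.now() - start;
  expect(result).toBeDefined();
  expect(Object.keys(result!)).toHaveLength(50);
  expect(result!['entity0'].properties.score).toBe(5); // 5 writers x 1 point each
  expect(elapsedMs).toBeLessThan(2000); // Assumed ceiling; tune for slower CI hardware
});
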
test('should maintain consistency under rapid updates and resolution calls', () => {
const entityId = 'stress-test-entity';
let updateCount = 0;
// Add initial deltas
for (let i = 0; i < 50; i++) {
lossless.ingestDelta(createDelta(
`writer${i % 5}`,
`host${i % 3}`
)
.withTimestamp(1000 + i)
.addPointer('collection', entityId, 'counter')
.addPointer('counter', 1)
.buildV1()
);
updateCount++;
}
// Verify initial state
let resolver = new SumResolver(lossless, ['counter']);
let result = resolver.resolve();
expect(result).toBeDefined();
expect(result![entityId].properties.counter).toBe(updateCount);
// Add more deltas and verify consistency
for (let i = 0; i < 25; i++) {
lossless.ingestDelta(createDelta('late-writer', 'late-host')
.withTimestamp(2000 + i)
.addPointer('collection', entityId, 'counter')
.addPointer('counter', 2)
.buildV1()
);
updateCount += 2;
// Create a fresh resolver to avoid accumulator caching issues
resolver = new SumResolver(lossless, ['counter']);
result = resolver.resolve();
expect(result![entityId].properties.counter).toBe(updateCount);
}
});
});
});