EP / schemaless

Schemaless Data Store.

Clone this repository (size: 166.6 KB): HTTPS / SSH
$ hg clone http://bitbucket.org/EP/schemaless/
commit 23: 8b1b7e934784
parent 22: 4ba06c6f16b3
branch: default
Working index gardener.
Eung-ju PARK
12 months ago

Changed (Δ1.9 KB):

Up to file-list src/main/java/schemaless/EntityStore.java:

@@ -12,6 +12,5 @@ public interface EntityStore {
12
12
	byte[] find(UUID id);
13
13
14
14
	void scanAll(EntityScanner scanner);
15
	void scanRecentlyUpdated(EntityScanner scanner, int n);
16
15
	void scanUpdatedSince(EntityScanner scanner, Date basis);
17
16
}

Up to file-list src/main/java/schemaless/gardener/IndexGardener.java:

@@ -37,10 +37,10 @@ public class IndexGardener implements Ru
37
37
		MultipleIndexGardening gardening = new MultipleIndexGardening(this.indexStores);
38
38
		while (running) {
39
39
			gardening.reset();
40
			entityStore.scanRecentlyUpdated(gardening, 10);
40
			entityStore.scanUpdatedSince(gardening, gardening.getCursor());
41
41
			if (gardening.getInconsistentCount() == 0) {
42
42
				try {
43
					wait(1000);
43
					wait(200);
44
44
				} catch (InterruptedException e) {
45
45
					e.printStackTrace(System.err);
46
46
				}

Up to file-list src/main/java/schemaless/gardener/IndexGardening.java:

@@ -30,7 +30,6 @@ public class IndexGardening implements E
30
30
	}
31
31
	
32
32
	public void inspect(EntityRow entityRow) {
33
		System.out.println(System.currentTimeMillis());
34
33
		UUID id = entityRow.id;
35
34
		System.out.println("Check " + indexDefinition.name() + " index of " + entityDefinition.name() + " entity " + id);
36
35
		try {
@@ -50,10 +49,10 @@ public class IndexGardening implements E
50
49
				listener.fixedOutdated(entityRow);
51
50
			}
52
51
		} catch (Exception e) {
52
			listener.failed(entityRow);
53
53
			System.err.println("Error while checking " + indexDefinition.name() + " index of " + entityDefinition.name() + " entity " + id);
54
54
			e.printStackTrace(System.err);
55
55
		}
56
		System.out.println(System.currentTimeMillis());
57
56
	}
58
57
59
58
	protected boolean isMissing(IndexRow indexRow) {
@@ -81,9 +80,10 @@ public class IndexGardening implements E
81
80
	
82
81
	public static interface Listener {
83
82
		void foundMissing(EntityRow entityRow);
83
		void fixedMissing(EntityRow entityRow);
84
84
		void foundOutdated(EntityRow entityRow);
85
		void fixedMissing(EntityRow entityRow);
86
85
		void fixedOutdated(EntityRow entityRow);
86
		void failed(EntityRow entityRow);
87
87
	}
88
88
	
89
89
	public static class NullListener implements Listener {
@@ -98,5 +98,8 @@ public class IndexGardening implements E
98
98
99
99
		public void foundOutdated(EntityRow entityRow) {
100
100
		}
101
102
		public void failed(EntityRow entityRow) {
103
		}
101
104
	}
102
105
}

Up to file-list src/main/java/schemaless/gardener/MultipleIndexGardening.java:

1
1
package schemaless.gardener;
2
2
3
import java.util.Date;
4
3
5
import schemaless.EntityRow;
4
6
import schemaless.EntityScanner;
5
7
import schemaless.IndexStore;
6
8
7
9
class MultipleIndexGardening implements EntityScanner, IndexGardening.Listener {
8
10
	private IndexGardening[] gardenings;
11
	private Date cursor;
9
12
	private int missingCount;
10
13
	private int outdatedCount;
11
14
	
@@ -14,16 +17,30 @@ class MultipleIndexGardening implements
14
17
		for (int i = 0; i < indexStores.length; i++) {
15
18
			gardenings[i] = new IndexGardening(indexStores[i], this);
16
19
		}
20
		cursor = new Date();
21
		reset();
22
	}
23
24
	public void inspect(EntityRow entityRow) {
25
		for (IndexGardening each : gardenings) {
26
			each.inspect(entityRow);
27
		}
28
		if (entityRow.updated.after(cursor)) {
29
			cursor = entityRow.updated;
30
		}
31
	}
32
	
33
	public void reset() {
34
		missingCount = 0;
35
		outdatedCount = 0;
17
36
	}
18
37
19
38
	public int getInconsistentCount() {
20
39
		return missingCount + outdatedCount;
21
40
	}
22
41
23
	public void inspect(EntityRow entityRow) {
24
		for (IndexGardening each : gardenings) {
25
			each.inspect(entityRow);
26
		}
42
	public Date getCursor() {
43
		return cursor;
27
44
	}
28
45
	
29
46
	public void foundMissing(EntityRow entityRow) {
@@ -39,9 +56,7 @@ class MultipleIndexGardening implements
39
56
40
57
	public void fixedOutdated(EntityRow entityRow) {
41
58
	}
42
	
43
	public void reset() {
44
		missingCount = 0;
45
		outdatedCount = 0;
59
60
	public void failed(EntityRow entityRow) {
46
61
	}
47
62
}

Up to file-list src/main/java/schemaless/store/MySqlEntityStore.java:

@@ -225,7 +225,7 @@ public class MySqlEntityStore implements
225
225
			rs = statement.executeQuery();
226
226
			while (rs.next()) {
227
227
				UUID id = jdbcAdapter.getColumnAsUuid(rs, 1);
228
				Date updated = rs.getTimestamp(2);
228
				Date updated = new Date(rs.getTimestamp(2).getTime());
229
229
				byte[] body = rs.getBytes(3);
230
230
				scanner.inspect(new EntityRow(id, body, updated));
231
231
			}
@@ -238,42 +238,18 @@ public class MySqlEntityStore implements
238
238
		}
239
239
	}
240
240
241
	public void scanRecentlyUpdated(EntityScanner scanner, int n) {
242
		Connection connection = null;
243
		PreparedStatement statement = null;
244
		ResultSet rs = null;
245
		try {
246
			connection = dataSource.getConnection();
247
			statement = connection.prepareStatement("SELECT id, updated, body FROM " + tableName + " ORDER BY updated DESC LIMIT ?");
248
			statement.setInt(1, n);
249
			rs = statement.executeQuery();
250
			while (rs.next()) {
251
				UUID id = jdbcAdapter.getColumnAsUuid(rs, 1);
252
				Date updated = rs.getTimestamp(2);
253
				byte[] body = rs.getBytes(3);
254
				scanner.inspect(new EntityRow(id, body, updated));
255
			}
256
		} catch (SQLException e) {
257
			throw new DataAccessException("Cannot scan entities on the table " + tableName, e);
258
		} finally {
259
			closeQuietly(rs);
260
			closeQuietly(statement);
261
			closeQuietly(connection);
262
		}
263
	}
264
	
265
241
	public void scanUpdatedSince(EntityScanner scanner, Date basis) {
266
242
		Connection connection = null;
267
243
		PreparedStatement statement = null;
268
244
		ResultSet rs = null;
269
245
		try {
270
246
			connection = dataSource.getConnection();
271
			statement = connection.prepareStatement("SELECT id, updated, body FROM " + tableName + " WHERE updated > ? ORDER BY updated");
247
			statement = connection.prepareStatement("SELECT id, updated, body FROM " + tableName + " WHERE updated>=? ORDER BY updated");
272
248
			statement.setTimestamp(1, new java.sql.Timestamp(basis.getTime()));
273
249
			rs = statement.executeQuery();
274
250
			while (rs.next()) {
275
251
				UUID id = jdbcAdapter.getColumnAsUuid(rs, 1);
276
				Date updated = rs.getTimestamp(2);
252
				Date updated = new Date(rs.getTimestamp(2).getTime());
277
253
				byte[] body = rs.getBytes(3);
278
254
				scanner.inspect(new EntityRow(id, body, updated));
279
255
			}

Up to file-list src/test/java/schemaless/examples/FeedExample.java:

@@ -17,13 +17,15 @@ public class FeedExample {
17
17
		DataSource dataSource = JdbcTestHelper.dataSource();
18
18
		MySqlStoreFactory factory = new MySqlStoreFactory(dataSource);
19
19
		EntityDefinition entityDefinition = new EntityDefinition("feed", Feed.class);
20
		IndexDefinition indexDefinition = new IndexDefinition(entityDefinition, "user_id", new String[] { "userId" });
21
22
		IndexGardener gardener = new IndexGardener(new DataStore(factory, entityDefinition, indexDefinition));
23
		gardener.start();
24
20
25
		DataStore dataStore = new DataStore(factory, entityDefinition);
21
		dataStore.insert(new Feed(UUID.randomUUID(), "someone"));
26
		Feed entity = new Feed(UUID.randomUUID(), "someone");
27
		dataStore.insert(entity);
22
28
23
		IndexDefinition indexDefinition = new IndexDefinition(entityDefinition, "user_id", new String[] { "userId" });
24
		dataStore = new DataStore(factory, entityDefinition, indexDefinition);
25
		IndexGardener gardener = new IndexGardener(dataStore);
26
		gardener.start();
27
29
		Thread.sleep(10 * 1000);
28
30
		gardener.stop();
29
31
	}

Up to file-list src/test/java/schemaless/gardener/IndexGardeningTest.java:

@@ -17,12 +17,13 @@ import schemaless.Feed;
17
17
import schemaless.IndexDefinition;
18
18
import schemaless.IndexRow;
19
19
import schemaless.IndexStore;
20
import schemaless.gardener.IndexGardening;
20
import schemaless.gardener.IndexGardening.Listener;
21
21
22
22
@RunWith(JMock.class)
23
23
public class IndexGardeningTest {
24
24
	private Mockery mockery = new JUnit4Mockery();
25
25
	private IndexGardening dut;
26
	private Listener listener;
26
27
	private IndexStore indexStore;
27
28
	private EntityDefinition entityDefinition;
28
29
	private IndexDefinition indexDefinition;
@@ -36,15 +37,17 @@ public class IndexGardeningTest {
36
37
		mockery.checking(new Expectations() {{
37
38
			allowing(indexStore).definition(); will(returnValue(indexDefinition));
38
39
		}});
39
		
40
		dut = new IndexGardening(indexStore);
40
41
		listener = mockery.mock(IndexGardening.Listener.class);
42
		dut = new IndexGardening(indexStore, listener);
41
43
		entity = new Feed(UUID.randomUUID(), "someone");
42
44
		entityRow = new EntityRow(entity.getId(), entityDefinition.serializer().serialize(entity), new Date());
43
45
	}
44
46
	
45
	@Test public void shouldNotThrowException() {
47
	@Test public void indexInspectionFailed() {
46
48
		mockery.checking(new Expectations() {{
47
49
			one(indexStore).find(entity.getId()); will(throwException(new Exception("BLAH")));
50
			one(listener).failed(entityRow);
48
51
		}});
49
52
		dut.inspect(entityRow);
50
53
	}
@@ -63,7 +66,9 @@ public class IndexGardeningTest {
63
66
	@Test public void indexIsMissing() {
64
67
		mockery.checking(new Expectations() {{
65
68
			one(indexStore).find(entity.getId()); will(returnValue(null));
69
			one(listener).foundMissing(entityRow);
66
70
			one(indexStore).insert(entity);
71
			one(listener).fixedMissing(entityRow);
67
72
		}});
68
73
69
74
		dut.inspect(entityRow);
@@ -75,7 +80,9 @@ public class IndexGardeningTest {
75
80
		
76
81
		mockery.checking(new Expectations() {{
77
82
			one(indexStore).find(entity.getId()); will(returnValue(indexRow));
83
			one(listener).foundOutdated(entityRow);
78
84
			one(indexStore).update(entity);
85
			one(listener).fixedOutdated(entityRow);
79
86
		}});
80
87
81
88
		dut.inspect(entityRow);

Up to file-list src/test/java/schemaless/store/MySql.sql:

1
DROP TABLE IF EXISTS feed;
2
CREATE TABLE feed (
3
    added_id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
4
    id CHAR(36) NOT NULL,
5
    updated TIMESTAMP NOT NULL,
6
    body MEDIUMBLOB,
7
    UNIQUE KEY (id),
8
    KEY (updated)
9
) ENGINE=InnoDB;
10
11
DROP TABLE IF EXISTS feed_index_user_id;
12
CREATE TABLE feed_index_user_id (
13
    user_id VARCHAR(36) NOT NULL,
14
    entity_id CHAR(36) NOT NULL UNIQUE,
15
    PRIMARY KEY (user_id, entity_id)
16
) ENGINE=InnoDB;

Up to file-list src/test/java/schemaless/store/MySqlEntityStoreTest.java:

@@ -15,48 +15,49 @@ import schemaless.Feed;
15
15
16
16
public class MySqlEntityStoreTest {
17
17
	private BasicDataSource dataSource;
18
	private MySqlEntityStore store;
18
	private MySqlEntityStore dut;
19
19
	private UUID id;
20
20
	
21
21
	@Before public void beforeEach() {
22
		dataSource = JdbcTestHelper.dataSource();
23
22
24
		EntityDefinition entityDefinition = new EntityDefinition("feed", Feed.class);
23
		dataSource = JdbcTestHelper.dataSource();
24
		store = new MySqlEntityStore(entityDefinition, dataSource);
25
		store.deleteAll();
25
		dut = new MySqlEntityStore(entityDefinition, dataSource);
26
		dut.deleteAll();
26
27
		id = UUID.randomUUID();
27
28
	}
28
29
	
29
30
	@After public void afterEach() throws SQLException {
30
		store.deleteAll();
31
		dut.deleteAll();
31
32
		dataSource.close();
32
33
	}
33
34
	
34
35
	@Test public void insert() {
35
		store.insert(id, new byte[0]);
36
		dut.insert(id, new byte[0]);
36
37
	}
37
38
38
39
	@Test public void update() {
39
		store.insert(id, new byte[0]);
40
		store.update(id, new byte[] { 1, 2, 3 });
41
		assertArrayEquals(new byte[] { 1, 2, 3 }, store.find(id));
40
		dut.insert(id, new byte[0]);
41
		dut.update(id, new byte[] { 1, 2, 3 });
42
		assertArrayEquals(new byte[] { 1, 2, 3 }, dut.find(id));
42
43
	}
43
44
44
45
	@Test public void delete() {
45
		store.insert(id, new byte[0]);
46
		store.delete(id);
47
		assertNull(store.find(id));
46
		dut.insert(id, new byte[0]);
47
		dut.delete(id);
48
		assertNull(dut.find(id));
48
49
	}
49
50
50
51
	@Test public void isExist() {
51
		assertFalse(store.isExist(id));
52
		store.insert(id, new byte[0]);
53
		assertTrue(store.isExist(id));
52
		assertFalse(dut.isExist(id));
53
		dut.insert(id, new byte[0]);
54
		assertTrue(dut.isExist(id));
54
55
	}
55
56
56
57
	@Test public void find() {
57
		assertNull(store.find(id));
58
		assertNull(dut.find(id));
58
59
59
		store.insert(id, new byte[] { 1, 2, 3 });
60
		assertArrayEquals(new byte[] { 1, 2, 3 }, store.find(id));
60
		dut.insert(id, new byte[] { 1, 2, 3 });
61
		assertArrayEquals(new byte[] { 1, 2, 3 }, dut.find(id));
61
62
	}
62
63
}

Up to file-list src/test/java/schemaless/store/MySqlIndexStoreTest.java:

@@ -25,9 +25,10 @@ public class MySqlIndexStoreTest {
25
25
	private Feed entity = new Feed(UUID.randomUUID(), "someone"); 
26
26
27
27
	@Before public void beforeEach() {
28
		dataSource = JdbcTestHelper.dataSource();
29
28
30
		EntityDefinition entityDefinition = new EntityDefinition("feed", Feed.class);
29
31
		IndexDefinition index = new IndexDefinition(entityDefinition, "user_id", new String[] { "userId" });
30
		dataSource = JdbcTestHelper.dataSource();
31
32
		dut = new MySqlIndexStore(index, dataSource);
32
33
		dut.deleteAll();
33
34
	}