package voldemort.client.rebalance; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; import org.junit.Assert; import org.junit.Test; import voldemort.ServerTestUtils; import voldemort.client.RoutingTier; import voldemort.cluster.Cluster; import voldemort.routing.RoutingStrategyType; import voldemort.serialization.SerializerDefinition; import voldemort.server.rebalance.Rebalancer; import voldemort.store.Store; import voldemort.store.StoreDefinition; import voldemort.store.StoreDefinitionBuilder; import voldemort.store.bdb.BdbStorageConfiguration; import voldemort.store.memory.InMemoryStorageEngine; import voldemort.store.metadata.MetadataStore; import voldemort.versioning.Versioned; import voldemort.xml.ClusterMapper; import voldemort.xml.StoreDefinitionsMapper; import com.google.common.collect.Lists; import com.google.testing.threadtester.AnnotatedTestRunner; import com.google.testing.threadtester.ThreadedAfter; import com.google.testing.threadtester.ThreadedMain; import com.google.testing.threadtester.ThreadedSecondary; public class RebalanceMetadataConsistencyTest { static MetadataStore metadataStore; static Cluster currentCluster; static Cluster targetCluster; protected static String testStoreNameRW = "test"; protected static String testStoreNameRW2 = "test2"; private static StoreDefinition rwStoreDefWithReplication; private static StoreDefinition rwStoreDefWithReplication2; static Rebalancer rebalancer; Cluster checkCluster; List checkstores; static Stub1 stub1; /** * Convenient method to execute private methods from other classes. * * @param test Instance of the class we want to test * @param methodName Name of the method we want to test * @param params Arguments we want to pass to the method * @return Object with the result of the executed method * @throws Exception */ public static Object invokePrivateMethod(Object test, String methodName, Object params[]) throws Exception { Object ret = null; final Method[] methods = test.getClass().getDeclaredMethods(); for(int i = 0; i < methods.length; ++i) { if(methods[i].getName().equals(methodName)) { methods[i].setAccessible(true); ret = methods[i].invoke(test, params); break; } } return ret; } static { stub1 = new Stub1(); currentCluster = ServerTestUtils.getLocalCluster(3, new int[][] { { 0 }, { 1, 3 }, { 2 } }); targetCluster = ServerTestUtils.getLocalCluster(3, new int[][] { { 0 }, { 1 }, { 2 }, { 3 } }); rwStoreDefWithReplication = new StoreDefinitionBuilder().setName(testStoreNameRW) .setType(BdbStorageConfiguration.TYPE_NAME) .setKeySerializer(new SerializerDefinition("string")) .setValueSerializer(new SerializerDefinition("string")) .setRoutingPolicy(RoutingTier.CLIENT) .setRoutingStrategyType(RoutingStrategyType.CONSISTENT_STRATEGY) .setReplicationFactor(2) .setPreferredReads(1) .setRequiredReads(1) .setPreferredWrites(1) .setRequiredWrites(1) .build(); Store innerStore = new InMemoryStorageEngine("inner-store"); innerStore.put(MetadataStore.CLUSTER_KEY, new Versioned(new ClusterMapper().writeCluster(currentCluster)), null); innerStore.put(MetadataStore.STORES_KEY, new Versioned(new StoreDefinitionsMapper().writeStoreList(Lists.newArrayList(rwStoreDefWithReplication))), null); rwStoreDefWithReplication2 = new StoreDefinitionBuilder().setName(testStoreNameRW2) .setType(BdbStorageConfiguration.TYPE_NAME) .setKeySerializer(new SerializerDefinition("string")) .setValueSerializer(new SerializerDefinition("string")) .setRoutingPolicy(RoutingTier.CLIENT) .setRoutingStrategyType(RoutingStrategyType.CONSISTENT_STRATEGY) .setReplicationFactor(2) .setPreferredReads(1) .setRequiredReads(1) .setPreferredWrites(1) .setRequiredWrites(1) .build(); rebalancer = new Rebalancer(null, metadataStore, null, null); Object[] params = { MetadataStore.CLUSTER_KEY, currentCluster, MetadataStore.STORES_KEY, Lists.newArrayList(rwStoreDefWithReplication) }; try { invokePrivateMethod(rebalancer, "changeClusterAndStores", params); } catch(Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } metadataStore = new MetadataStore(innerStore, 0); } @ThreadedMain public void mainThread() { stub1.test(targetCluster, rwStoreDefWithReplication2, rebalancer); } @ThreadedSecondary public void secondThread() { Cluster checkCluster = metadataStore.getCluster(); List checkstores = metadataStore.getStoreDefList(); } @ThreadedAfter public void after() { if(checkCluster.equals(currentCluster)) { Assert.assertEquals(checkstores.get(0), rwStoreDefWithReplication); } if(checkCluster.equals(targetCluster)) { Assert.assertEquals(checkstores.get(0), rwStoreDefWithReplication2); } } @SuppressWarnings({ "rawtypes", "unchecked" }) @Test public void testThreading() { AnnotatedTestRunner runner = new AnnotatedTestRunner(); // Run all Weaver tests in this class, using MyList as the Class Under // Test. ArrayList classes = new ArrayList(); classes.add(Stub1.class); classes.add(MetadataStore.class); runner.runTests(this.getClass(), classes); } } class Stub1 { public void test(Cluster targetCluster, StoreDefinition rwStoreDefWithReplication2, Rebalancer rebalancer) { Object[] params = { MetadataStore.CLUSTER_KEY, targetCluster, MetadataStore.STORES_KEY, Lists.newArrayList(rwStoreDefWithReplication2) }; try { RebalanceMetadataConsistencyTest.invokePrivateMethod(rebalancer, "changeClusterAndStores", params); } catch(Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }