/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

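/**
 * A sequential write-then-read load test against a mini HBase cluster: a pool of
 * {@link MultiThreadedWriter} threads loads keys into a pre-split table, after which a
 * pool of {@link MultiThreadedReader} threads reads every key back and verifies it.
 * The test is parameterized over single vs. multi-put writes and over data block
 * encoding; the protected hooks ({@code prepareWriterThreads}, {@code prepareReaderThreads},
 * {@code numKeys}) let subclasses vary the workload.
 */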
@Category(LargeTests.class)
@RunWith(Parameterized.class)
public class TestMiniClusterLoadSequential {

  private static final Log LOG = LogFactory.getLog(
      TestMiniClusterLoadSequential.class);

  protected static final TableName TABLE =
      TableName.valueOf("load_test_tbl");
  protected static final byte[] CF = Bytes.toBytes("load_test_cf");
  protected static final int NUM_THREADS = 8;
  protected static final int NUM_RS = 2;
  protected static final int TIMEOUT_MS = 180000;
  protected static final HBaseTestingUtility TEST_UTIL =
      new HBaseTestingUtility();

  protected final Configuration conf = TEST_UTIL.getConfiguration();
  protected final boolean isMultiPut;
  protected final DataBlockEncoding dataBlockEncoding;

  protected MultiThreadedWriter writerThreads;
  protected MultiThreadedReader readerThreads;
  protected int numKeys;

  protected Compression.Algorithm compression = Compression.Algorithm.NONE;

  public TestMiniClusterLoadSequential(boolean isMultiPut,
      DataBlockEncoding dataBlockEncoding) {
    this.isMultiPut = isMultiPut;
    this.dataBlockEncoding = dataBlockEncoding;
    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);

    // Keep the load balancer from reassigning regions while the test is running.
    conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, 10.0f);
  }

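  /**
   * Runs every combination of single vs. multi-put writes with NONE and PREFIX
   * data block encoding.
   */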
  @Parameters
  public static Collection<Object[]> parameters() {
    List<Object[]> parameters = new ArrayList<Object[]>();
    for (boolean multiPut : new boolean[]{false, true}) {
      for (DataBlockEncoding dataBlockEncoding : new DataBlockEncoding[] {
          DataBlockEncoding.NONE, DataBlockEncoding.PREFIX }) {
        parameters.add(new Object[]{multiPut, dataBlockEncoding});
      }
    }
    return parameters;
  }

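  /** Brings up a mini cluster with {@link #NUM_RS} region servers before each test. */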
  @Before
  public void setUp() throws Exception {
    LOG.debug("Test setup: isMultiPut=" + isMultiPut);
    TEST_UTIL.startMiniCluster(1, NUM_RS);
  }

  @After
  public void tearDown() throws Exception {
    LOG.debug("Test teardown: isMultiPut=" + isMultiPut);
    TEST_UTIL.shutdownMiniCluster();
  }

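  /**
   * Creates the pool of reader threads; {@code verifyPercent} is passed straight through to
   * {@link MultiThreadedReader}. Protected so subclasses can substitute a different reader.
   */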
  protected MultiThreadedReader prepareReaderThreads(LoadTestDataGenerator dataGen,
      Configuration conf, TableName tableName, double verifyPercent) {
    MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent);
    return reader;
  }

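  /**
   * Creates the pool of writer threads, configured for single or multi-put writes according
   * to the test parameter. Protected so subclasses can substitute a different writer.
   */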
  protected MultiThreadedWriter prepareWriterThreads(LoadTestDataGenerator dataGen,
      Configuration conf, TableName tableName) {
    MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, tableName);
    writer.setMultiPut(isMultiPut);
    return writer;
  }

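  /** Sets up the table and thread pools, then runs the sequential write/read workload. */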
  @Test(timeout=TIMEOUT_MS)
  public void loadTest() throws Exception {
    prepareForLoadTest();
    runLoadTestOnExistingTable();
  }

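  /**
   * Writes keys 0..numKeys with {@link #NUM_THREADS} writer threads, waits for them to finish,
   * then reads the same range back and asserts that there were no write failures, no read
   * failures or errors, and that every key was verified.
   */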
  protected void runLoadTestOnExistingTable() throws IOException {
    writerThreads.start(0, numKeys, NUM_THREADS);
    writerThreads.waitForFinish();
    assertEquals(0, writerThreads.getNumWriteFailures());

    readerThreads.start(0, numKeys, NUM_THREADS);
    readerThreads.waitForFinish();
    assertEquals(0, readerThreads.getNumReadFailures());
    assertEquals(0, readerThreads.getNumReadErrors());
    assertEquals(numKeys, readerThreads.getNumKeysVerified());
  }

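  /** Creates the pre-split load test table and waits until all of its regions are assigned. */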
  protected void createPreSplitLoadTestTable(HTableDescriptor htd, HColumnDescriptor hcd)
      throws IOException {
    HBaseTestingUtility.createPreSplitLoadTestTable(conf, htd, hcd);
    TEST_UTIL.waitUntilAllRegionsAssigned(htd.getTableName());
  }

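  /**
   * Blocks until all {@link #NUM_RS} region servers are online, creates the test table with the
   * configured compression and data block encoding, and builds the writer and reader pools
   * (readers verify 100% of the keys they read).
   */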
  protected void prepareForLoadTest() throws IOException {
    LOG.info("Starting load test: dataBlockEncoding=" + dataBlockEncoding +
        ", isMultiPut=" + isMultiPut);
    numKeys = numKeys();
    HBaseAdmin admin = new HBaseAdmin(conf);
    while (admin.getClusterStatus().getServers().size() < NUM_RS) {
      LOG.info("Sleeping until " + NUM_RS + " RSs are online");
      Threads.sleepWithoutInterrupt(1000);
    }
    admin.close();

    HTableDescriptor htd = new HTableDescriptor(TABLE);
    HColumnDescriptor hcd = new HColumnDescriptor(CF)
        .setCompressionType(compression)
        .setDataBlockEncoding(dataBlockEncoding);
    createPreSplitLoadTestTable(htd, hcd);

    LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(CF);
    writerThreads = prepareWriterThreads(dataGen, conf, TABLE);
    readerThreads = prepareReaderThreads(dataGen, conf, TABLE, 100);
  }

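  /** Number of keys written and then read back; subclasses can override to change the load size. */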
  protected int numKeys() {
    return 1000;
  }

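  /** Returns the current descriptor of the test column family, as seen by the given admin. */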
  protected HColumnDescriptor getColumnDesc(HBaseAdmin admin)
      throws TableNotFoundException, IOException {
    return admin.getTableDescriptor(TABLE).getFamily(CF);
  }

}