1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.hadoop.hbase.util;
18
19 import static org.junit.Assert.assertEquals;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.List;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.TableName;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.HBaseTestingUtility;
31 import org.apache.hadoop.hbase.HColumnDescriptor;
32 import org.apache.hadoop.hbase.HConstants;
33 import org.apache.hadoop.hbase.HTableDescriptor;
34 import org.apache.hadoop.hbase.testclassification.LargeTests;
35 import org.apache.hadoop.hbase.TableNotFoundException;
36 import org.apache.hadoop.hbase.client.Admin;
37 import org.apache.hadoop.hbase.client.HBaseAdmin;
38 import org.apache.hadoop.hbase.io.compress.Compression;
39 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
40 import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
41 import org.junit.After;
42 import org.junit.Before;
43 import org.junit.Test;
44 import org.junit.experimental.categories.Category;
45 import org.junit.runner.RunWith;
46 import org.junit.runners.Parameterized;
47 import org.junit.runners.Parameterized.Parameters;
48
49
50
51
52
53 @Category(LargeTests.class)
54 @RunWith(Parameterized.class)
55 public class TestMiniClusterLoadSequential {
56
57 private static final Log LOG = LogFactory.getLog(
58 TestMiniClusterLoadSequential.class);
59
60 protected static final TableName TABLE =
61 TableName.valueOf("load_test_tbl");
62 protected static final byte[] CF = Bytes.toBytes("load_test_cf");
63 protected static final int NUM_THREADS = 8;
64 protected static final int NUM_RS = 2;
65 protected static final int TIMEOUT_MS = 180000;
66 protected static final HBaseTestingUtility TEST_UTIL =
67 new HBaseTestingUtility();
68
69 protected final Configuration conf = TEST_UTIL.getConfiguration();
70 protected final boolean isMultiPut;
71 protected final DataBlockEncoding dataBlockEncoding;
72
73 protected MultiThreadedWriter writerThreads;
74 protected MultiThreadedReader readerThreads;
75 protected int numKeys;
76
77 protected Compression.Algorithm compression = Compression.Algorithm.NONE;
78
79 public TestMiniClusterLoadSequential(boolean isMultiPut,
80 DataBlockEncoding dataBlockEncoding) {
81 this.isMultiPut = isMultiPut;
82 this.dataBlockEncoding = dataBlockEncoding;
83 conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
84
85
86 conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, 10.0f);
87 }
88
89 @Parameters
90 public static Collection<Object[]> parameters() {
91 List<Object[]> parameters = new ArrayList<Object[]>();
92 for (boolean multiPut : new boolean[]{false, true}) {
93 for (DataBlockEncoding dataBlockEncoding : new DataBlockEncoding[] {
94 DataBlockEncoding.NONE, DataBlockEncoding.PREFIX }) {
95 parameters.add(new Object[]{multiPut, dataBlockEncoding});
96 }
97 }
98 return parameters;
99 }
100
101 @Before
102 public void setUp() throws Exception {
103 LOG.debug("Test setup: isMultiPut=" + isMultiPut);
104 TEST_UTIL.startMiniCluster(1, NUM_RS);
105 }
106
107 @After
108 public void tearDown() throws Exception {
109 LOG.debug("Test teardown: isMultiPut=" + isMultiPut);
110 TEST_UTIL.shutdownMiniCluster();
111 }
112
113 protected MultiThreadedReader prepareReaderThreads(LoadTestDataGenerator dataGen,
114 Configuration conf, TableName tableName, double verifyPercent) throws IOException {
115 MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent);
116 return reader;
117 }
118
119 protected MultiThreadedWriter prepareWriterThreads(LoadTestDataGenerator dataGen,
120 Configuration conf, TableName tableName) throws IOException {
121 MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, tableName);
122 writer.setMultiPut(isMultiPut);
123 return writer;
124 }
125
126 @Test(timeout=TIMEOUT_MS)
127 public void loadTest() throws Exception {
128 prepareForLoadTest();
129 runLoadTestOnExistingTable();
130 }
131
132 protected void runLoadTestOnExistingTable() throws IOException {
133 writerThreads.start(0, numKeys, NUM_THREADS);
134 writerThreads.waitForFinish();
135 assertEquals(0, writerThreads.getNumWriteFailures());
136
137 readerThreads.start(0, numKeys, NUM_THREADS);
138 readerThreads.waitForFinish();
139 assertEquals(0, readerThreads.getNumReadFailures());
140 assertEquals(0, readerThreads.getNumReadErrors());
141 assertEquals(numKeys, readerThreads.getNumKeysVerified());
142 }
143
144 protected void createPreSplitLoadTestTable(HTableDescriptor htd, HColumnDescriptor hcd)
145 throws IOException {
146 HBaseTestingUtility.createPreSplitLoadTestTable(conf, htd, hcd);
147 TEST_UTIL.waitUntilAllRegionsAssigned(htd.getTableName());
148 }
149
150 protected void prepareForLoadTest() throws IOException {
151 LOG.info("Starting load test: dataBlockEncoding=" + dataBlockEncoding +
152 ", isMultiPut=" + isMultiPut);
153 numKeys = numKeys();
154 Admin admin = new HBaseAdmin(conf);
155 while (admin.getClusterStatus().getServers().size() < NUM_RS) {
156 LOG.info("Sleeping until " + NUM_RS + " RSs are online");
157 Threads.sleepWithoutInterrupt(1000);
158 }
159 admin.close();
160
161 HTableDescriptor htd = new HTableDescriptor(TABLE);
162 HColumnDescriptor hcd = new HColumnDescriptor(CF)
163 .setCompressionType(compression)
164 .setDataBlockEncoding(dataBlockEncoding);
165 createPreSplitLoadTestTable(htd, hcd);
166
167 LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(CF);
168 writerThreads = prepareWriterThreads(dataGen, conf, TABLE);
169 readerThreads = prepareReaderThreads(dataGen, conf, TABLE, 100);
170 }
171
172 protected int numKeys() {
173 return 1000;
174 }
175
176 protected HColumnDescriptor getColumnDesc(Admin admin)
177 throws TableNotFoundException, IOException {
178 return admin.getTableDescriptor(TABLE).getFamily(CF);
179 }
180
181 }