1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
import java.io.IOException;
import java.util.Locale;
import java.util.Set;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MultiThreadedAction;
import org.apache.hadoop.hbase.util.MultiThreadedReader;
import org.apache.hadoop.hbase.util.MultiThreadedWriter;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
import org.junit.Assert;
44
45
46
47
48
49 @InterfaceAudience.Private
50 public class StripeCompactionsPerformanceEvaluation extends AbstractHBaseTool {
51 private static final Log LOG = LogFactory.getLog(StripeCompactionsPerformanceEvaluation.class);
52 private static final String TABLE_NAME =
53 StripeCompactionsPerformanceEvaluation.class.getSimpleName();
54 private static final byte[] COLUMN_FAMILY = Bytes.toBytes("CF");
55 private static final int MIN_NUM_SERVERS = 1;
56
57
58 private static final String DATAGEN_KEY = "datagen";
59 private static final String ITERATIONS_KEY = "iters";
60 private static final String PRELOAD_COUNT_KEY = "pwk";
61 private static final String WRITE_COUNT_KEY = "wk";
62 private static final String WRITE_THREADS_KEY = "wt";
63 private static final String READ_THREADS_KEY = "rt";
64 private static final String INITIAL_STRIPE_COUNT_KEY = "initstripes";
65 private static final String SPLIT_SIZE_KEY = "splitsize";
66 private static final String SPLIT_PARTS_KEY = "splitparts";
67 private static final String VALUE_SIZE_KEY = "valsize";
68 private static final String SEQ_SHARDS_PER_SERVER_KEY = "seqshards";
69
70
71 private LoadTestDataGenerator dataGen;
72 private int iterationCount;
73 private long preloadKeys;
74 private long writeKeys;
75 private int writeThreads;
76 private int readThreads;
77 private Long initialStripeCount;
78 private Long splitSize;
79 private Long splitParts;
80
81 private static final String VALUE_SIZE_DEFAULT = "512:4096";
82
83 protected IntegrationTestingUtility util = new IntegrationTestingUtility();
84
85 @Override
86 protected void addOptions() {
87 addOptWithArg(DATAGEN_KEY, "Type of data generator to use (default or sequential)");
88 addOptWithArg(SEQ_SHARDS_PER_SERVER_KEY, "Sequential generator will shard the data into many"
89 + " sequences. The number of such shards per server is specified (default is 1).");
90 addOptWithArg(ITERATIONS_KEY, "Number of iterations to run to compare");
91 addOptWithArg(PRELOAD_COUNT_KEY, "Number of keys to preload, per server");
92 addOptWithArg(WRITE_COUNT_KEY, "Number of keys to write, per server");
93 addOptWithArg(WRITE_THREADS_KEY, "Number of threads to use for writing");
94 addOptWithArg(READ_THREADS_KEY, "Number of threads to use for reading");
95 addOptWithArg(INITIAL_STRIPE_COUNT_KEY, "Number of stripes to split regions into initially");
96 addOptWithArg(SPLIT_SIZE_KEY, "Size at which a stripe will split into more stripes");
97 addOptWithArg(SPLIT_PARTS_KEY, "Number of stripes to split a stripe into when it splits");
98 addOptWithArg(VALUE_SIZE_KEY, "Value size; either a number, or a colon-separated range;"
99 + " default " + VALUE_SIZE_DEFAULT);
100 }
101
102 @Override
103 protected void processOptions(CommandLine cmd) {
104 int minValueSize = 0, maxValueSize = 0;
105 String valueSize = cmd.getOptionValue(VALUE_SIZE_KEY, VALUE_SIZE_DEFAULT);
106 if (valueSize.contains(":")) {
107 String[] valueSizes = valueSize.split(":");
108 if (valueSize.length() != 2) throw new RuntimeException("Invalid value size: " + valueSize);
109 minValueSize = Integer.parseInt(valueSizes[0]);
110 maxValueSize = Integer.parseInt(valueSizes[1]);
111 } else {
112 minValueSize = maxValueSize = Integer.parseInt(valueSize);
113 }
114 String datagen = cmd.getOptionValue(DATAGEN_KEY, "default").toLowerCase();
115 if ("default".equals(datagen)) {
116 dataGen = new MultiThreadedAction.DefaultDataGenerator(
117 minValueSize, maxValueSize, 1, 1, new byte[][] { COLUMN_FAMILY });
118 } else if ("sequential".equals(datagen)) {
119 int shards = Integer.parseInt(cmd.getOptionValue(SEQ_SHARDS_PER_SERVER_KEY, "1"));
120 dataGen = new SeqShardedDataGenerator(minValueSize, maxValueSize, shards);
121 } else {
122 throw new RuntimeException("Unknown " + DATAGEN_KEY + ": " + datagen);
123 }
124 iterationCount = Integer.parseInt(cmd.getOptionValue(ITERATIONS_KEY, "1"));
125 preloadKeys = Long.parseLong(cmd.getOptionValue(PRELOAD_COUNT_KEY, "1000000"));
126 writeKeys = Long.parseLong(cmd.getOptionValue(WRITE_COUNT_KEY, "1000000"));
127 writeThreads = Integer.parseInt(cmd.getOptionValue(WRITE_THREADS_KEY, "10"));
128 readThreads = Integer.parseInt(cmd.getOptionValue(READ_THREADS_KEY, "20"));
129 initialStripeCount = getLongOrNull(cmd, INITIAL_STRIPE_COUNT_KEY);
130 splitSize = getLongOrNull(cmd, SPLIT_SIZE_KEY);
131 splitParts = getLongOrNull(cmd, SPLIT_PARTS_KEY);
132 }
133
134 private Long getLongOrNull(CommandLine cmd, String option) {
135 if (!cmd.hasOption(option)) return null;
136 return Long.parseLong(cmd.getOptionValue(option));
137 }
138
139 @Override
140 public Configuration getConf() {
141 Configuration c = super.getConf();
142 if (c == null && util != null) {
143 conf = util.getConfiguration();
144 c = conf;
145 }
146 return c;
147 }
148
149 @Override
150 protected int doWork() throws Exception {
151 setUp();
152 try {
153 boolean isStripe = true;
154 for (int i = 0; i < iterationCount * 2; ++i) {
155 createTable(isStripe);
156 runOneTest((isStripe ? "Stripe" : "Default") + i, conf);
157 isStripe = !isStripe;
158 }
159 return 0;
160 } finally {
161 tearDown();
162 }
163 }
164
165
166 private void setUp() throws Exception {
167 this.util = new IntegrationTestingUtility();
168 LOG.debug("Initializing/checking cluster has " + MIN_NUM_SERVERS + " servers");
169 util.initializeCluster(MIN_NUM_SERVERS);
170 LOG.debug("Done initializing/checking cluster");
171 }
172
173 protected void deleteTable() throws Exception {
174 if (util.getHBaseAdmin().tableExists(TABLE_NAME)) {
175 LOG.info("Deleting table");
176 if (!util.getHBaseAdmin().isTableDisabled(TABLE_NAME)) {
177 util.getHBaseAdmin().disableTable(TABLE_NAME);
178 }
179 util.getHBaseAdmin().deleteTable(TABLE_NAME);
180 LOG.info("Deleted table");
181 }
182 }
183
184 private void createTable(boolean isStripe) throws Exception {
185 createTable(createHtd(isStripe));
186 }
187
188 private void tearDown() throws Exception {
189 deleteTable();
190 LOG.info("Restoring the cluster");
191 util.restoreCluster();
192 LOG.info("Done restoring the cluster");
193 }
194
195 private void runOneTest(String description, Configuration conf) throws Exception {
196 int numServers = util.getHBaseClusterInterface().getClusterStatus().getServersSize();
197 long startKey = (long)preloadKeys * numServers;
198 long endKey = startKey + (long)writeKeys * numServers;
199 status(String.format("%s test starting on %d servers; preloading 0 to %d and writing to %d",
200 description, numServers, startKey, endKey));
201
202 TableName tn = TableName.valueOf(TABLE_NAME);
203 if (preloadKeys > 0) {
204 MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, tn);
205 long time = System.currentTimeMillis();
206 preloader.start(0, startKey, writeThreads);
207 preloader.waitForFinish();
208 if (preloader.getNumWriteFailures() > 0) {
209 throw new IOException("Preload failed");
210 }
211 int waitTime = (int)Math.min(preloadKeys / 100, 30000);
212 status(description + " preload took " + (System.currentTimeMillis()-time)/1000
213 + "sec; sleeping for " + waitTime/1000 + "sec for store to stabilize");
214 Thread.sleep(waitTime);
215 }
216
217 MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, tn);
218 MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, tn, 100);
219
220 reader.linkToWriter(writer);
221
222 long testStartTime = System.currentTimeMillis();
223 writer.start(startKey, endKey, writeThreads);
224 reader.start(startKey, endKey, readThreads);
225 writer.waitForFinish();
226 reader.waitForFinish();
227
228
229 status("Readers and writers stopped for test " + description);
230
231 boolean success = writer.getNumWriteFailures() == 0;
232 if (!success) {
233 LOG.error("Write failed");
234 } else {
235 success = reader.getNumReadErrors() == 0 && reader.getNumReadFailures() == 0;
236 if (!success) {
237 LOG.error("Read failed");
238 }
239 }
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256 status(description + " test took " + (System.currentTimeMillis()-testStartTime)/1000 + "sec");
257 Assert.assertTrue(success);
258 }
259
260 private static void status(String s) {
261 LOG.info("STATUS " + s);
262 System.out.println(s);
263 }
264
265 private HTableDescriptor createHtd(boolean isStripe) throws Exception {
266 HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
267 htd.addFamily(new HColumnDescriptor(COLUMN_FAMILY));
268 String noSplitsPolicy = DisabledRegionSplitPolicy.class.getName();
269 htd.setConfiguration(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, noSplitsPolicy);
270 if (isStripe) {
271 htd.setConfiguration(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
272 if (initialStripeCount != null) {
273 htd.setConfiguration(
274 StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialStripeCount.toString());
275 htd.setConfiguration(
276 HStore.BLOCKING_STOREFILES_KEY, Long.toString(10 * initialStripeCount));
277 } else {
278 htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "500");
279 }
280 if (splitSize != null) {
281 htd.setConfiguration(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize.toString());
282 }
283 if (splitParts != null) {
284 htd.setConfiguration(StripeStoreConfig.SPLIT_PARTS_KEY, splitParts.toString());
285 }
286 } else {
287 htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "10");
288 }
289 return htd;
290 }
291
292 protected void createTable(HTableDescriptor htd) throws Exception {
293 deleteTable();
294 if (util.getHBaseClusterInterface() instanceof MiniHBaseCluster) {
295 LOG.warn("Test does not make a lot of sense for minicluster. Will set flush size low.");
296 htd.setConfiguration(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, "1048576");
297 }
298 byte[][] splits = new RegionSplitter.HexStringSplit().split(
299 util.getHBaseClusterInterface().getClusterStatus().getServersSize());
300 util.getHBaseAdmin().createTable(htd, splits);
301 }
302
303 public static class SeqShardedDataGenerator extends LoadTestDataGenerator {
304 private static final byte[][] COLUMN_NAMES = new byte[][] { Bytes.toBytes("col1") };
305 private static final int PAD_TO = 10;
306 private static final int PREFIX_PAD_TO = 7;
307
308 private final int numPartitions;
309
310 public SeqShardedDataGenerator(int minValueSize, int maxValueSize, int numPartitions) {
311 super(minValueSize, maxValueSize);
312 this.numPartitions = numPartitions;
313 }
314
315 @Override
316 public byte[] getDeterministicUniqueKey(long keyBase) {
317 String num = StringUtils.leftPad(String.valueOf(keyBase), PAD_TO, "0");
318 return Bytes.toBytes(getPrefix(keyBase) + num);
319 }
320
321 private String getPrefix(long i) {
322 return StringUtils.leftPad(String.valueOf((int)(i % numPartitions)), PREFIX_PAD_TO, "0");
323 }
324
325 @Override
326 public byte[][] getColumnFamilies() {
327 return new byte[][] { COLUMN_FAMILY };
328 }
329
330 @Override
331 public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
332 return COLUMN_NAMES;
333 }
334
335 @Override
336 public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
337 return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
338 }
339
340 @Override
341 public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
342 return LoadTestKVGenerator.verify(value, rowKey, cf, column);
343 }
344
345 @Override
346 public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
347 return true;
348 }
349 };
350 }