/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.Set;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MultiThreadedAction;
import org.apache.hadoop.hbase.util.MultiThreadedReader;
import org.apache.hadoop.hbase.util.MultiThreadedWriter;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
import org.junit.Assert;


/**
 * A perf test which does large data ingestion using stripe compactions and regular compactions.
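 *
 * <p>A hypothetical example invocation (the option names are the ones registered in
 * {@code addOptions()} below; the launch mechanism is assumed to be the usual
 * {@code AbstractHBaseTool} entry point):
 * <pre>
 * hbase org.apache.hadoop.hbase.StripeCompactionsPerformanceEvaluation \
 *     -iters 2 -pwk 1000000 -wk 1000000 -wt 10 -rt 20 \
 *     -datagen sequential -seqshards 2 -valsize 512:4096
 * </pre>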
 */
@InterfaceAudience.Private
public class StripeCompactionsPerformanceEvaluation extends AbstractHBaseTool {
  private static final Log LOG = LogFactory.getLog(StripeCompactionsPerformanceEvaluation.class);
  private static final String TABLE_NAME =
      StripeCompactionsPerformanceEvaluation.class.getSimpleName();
  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("CF");
  private static final int MIN_NUM_SERVERS = 1;

  // Option names.
  private static final String DATAGEN_KEY = "datagen";
  private static final String ITERATIONS_KEY = "iters";
  private static final String PRELOAD_COUNT_KEY = "pwk";
  private static final String WRITE_COUNT_KEY = "wk";
  private static final String WRITE_THREADS_KEY = "wt";
  private static final String READ_THREADS_KEY = "rt";
  private static final String INITIAL_STRIPE_COUNT_KEY = "initstripes";
  private static final String SPLIT_SIZE_KEY = "splitsize";
  private static final String SPLIT_PARTS_KEY = "splitparts";
  private static final String VALUE_SIZE_KEY = "valsize";
  private static final String SEQ_SHARDS_PER_SERVER_KEY = "seqshards";

  // Option values.
  private LoadTestDataGenerator dataGen;
  private int iterationCount;
  private long preloadKeys;
  private long writeKeys;
  private int writeThreads;
  private int readThreads;
  private Long initialStripeCount;
  private Long splitSize;
  private Long splitParts;

  private static final String VALUE_SIZE_DEFAULT = "512:4096";

  protected IntegrationTestingUtility util = new IntegrationTestingUtility();

  @Override
  protected void addOptions() {
    addOptWithArg(DATAGEN_KEY, "Type of data generator to use (default or sequential)");
    addOptWithArg(SEQ_SHARDS_PER_SERVER_KEY, "Sequential generator will shard the data into many"
        + " sequences. The number of such shards per server is specified (default is 1).");
    addOptWithArg(ITERATIONS_KEY, "Number of iterations to run to compare");
    addOptWithArg(PRELOAD_COUNT_KEY, "Number of keys to preload, per server");
    addOptWithArg(WRITE_COUNT_KEY, "Number of keys to write, per server");
    addOptWithArg(WRITE_THREADS_KEY, "Number of threads to use for writing");
    addOptWithArg(READ_THREADS_KEY, "Number of threads to use for reading");
    addOptWithArg(INITIAL_STRIPE_COUNT_KEY, "Number of stripes to split regions into initially");
    addOptWithArg(SPLIT_SIZE_KEY, "Size at which a stripe will split into more stripes");
    addOptWithArg(SPLIT_PARTS_KEY, "Number of stripes to split a stripe into when it splits");
    addOptWithArg(VALUE_SIZE_KEY, "Value size; either a number, or a colon-separated range;"
        + " default " + VALUE_SIZE_DEFAULT);
  }

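  /**
   * Parses the command line. The value size may be a single number or a "min:max" range;
   * the data generator is either the default random one or the sequential sharded one.
   */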
  @Override
  protected void processOptions(CommandLine cmd) {
    int minValueSize = 0, maxValueSize = 0;
    String valueSize = cmd.getOptionValue(VALUE_SIZE_KEY, VALUE_SIZE_DEFAULT);
    if (valueSize.contains(":")) {
      String[] valueSizes = valueSize.split(":");
      if (valueSizes.length != 2) throw new RuntimeException("Invalid value size: " + valueSize);
      minValueSize = Integer.parseInt(valueSizes[0]);
      maxValueSize = Integer.parseInt(valueSizes[1]);
    } else {
      minValueSize = maxValueSize = Integer.parseInt(valueSize);
    }
    String datagen = cmd.getOptionValue(DATAGEN_KEY, "default").toLowerCase();
    if ("default".equals(datagen)) {
      dataGen = new MultiThreadedAction.DefaultDataGenerator(
          minValueSize, maxValueSize, 1, 1, new byte[][] { COLUMN_FAMILY });
    } else if ("sequential".equals(datagen)) {
      int shards = Integer.parseInt(cmd.getOptionValue(SEQ_SHARDS_PER_SERVER_KEY, "1"));
      dataGen = new SeqShardedDataGenerator(minValueSize, maxValueSize, shards);
    } else {
      throw new RuntimeException("Unknown " + DATAGEN_KEY + ": " + datagen);
    }
    iterationCount = Integer.parseInt(cmd.getOptionValue(ITERATIONS_KEY, "1"));
    preloadKeys = Long.parseLong(cmd.getOptionValue(PRELOAD_COUNT_KEY, "1000000"));
    writeKeys = Long.parseLong(cmd.getOptionValue(WRITE_COUNT_KEY, "1000000"));
    writeThreads = Integer.parseInt(cmd.getOptionValue(WRITE_THREADS_KEY, "10"));
    readThreads = Integer.parseInt(cmd.getOptionValue(READ_THREADS_KEY, "20"));
    initialStripeCount = getLongOrNull(cmd, INITIAL_STRIPE_COUNT_KEY);
    splitSize = getLongOrNull(cmd, SPLIT_SIZE_KEY);
    splitParts = getLongOrNull(cmd, SPLIT_PARTS_KEY);
  }

  private Long getLongOrNull(CommandLine cmd, String option) {
    if (!cmd.hasOption(option)) return null;
    return Long.parseLong(cmd.getOptionValue(option));
  }

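  // Fall back to the IntegrationTestingUtility's configuration when the tool
  // itself has not been given one yet.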
  @Override
  public Configuration getConf() {
    Configuration c = super.getConf();
    if (c == null && util != null) {
      conf = util.getConfiguration();
      c = conf;
    }
    return c;
  }

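  /**
   * Runs {@code iterationCount} pairs of tests, alternating between a table backed by
   * {@link StripeStoreEngine} and one using the default store engine, so that both
   * compaction policies see the same load.
   */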
  @Override
  protected int doWork() throws Exception {
    setUp();
    try {
      boolean isStripe = true;
      for (int i = 0; i < iterationCount * 2; ++i) {
        createTable(isStripe);
        runOneTest((isStripe ? "Stripe" : "Default") + i, conf);
        isStripe = !isStripe;
      }
      return 0;
    } finally {
      tearDown();
    }
  }


  private void setUp() throws Exception {
    this.util = new IntegrationTestingUtility();
    LOG.debug("Initializing/checking cluster has " + MIN_NUM_SERVERS + " servers");
    util.initializeCluster(MIN_NUM_SERVERS);
    LOG.debug("Done initializing/checking cluster");
  }

  protected void deleteTable() throws Exception {
    if (util.getHBaseAdmin().tableExists(TABLE_NAME)) {
      LOG.info("Deleting table");
      if (!util.getHBaseAdmin().isTableDisabled(TABLE_NAME)) {
        util.getHBaseAdmin().disableTable(TABLE_NAME);
      }
      util.getHBaseAdmin().deleteTable(TABLE_NAME);
      LOG.info("Deleted table");
    }
  }

  private void createTable(boolean isStripe) throws Exception {
    createTable(createHtd(isStripe));
  }

  private void tearDown() throws Exception {
    deleteTable();
    LOG.info("Restoring the cluster");
    util.restoreCluster();
    LOG.info("Done restoring the cluster");
  }

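  /**
   * Runs a single test iteration: optionally preloads the table, then runs concurrent
   * writers and readers over a fresh key range, and fails the test if any write or
   * read errors were observed.
   */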
  private void runOneTest(String description, Configuration conf) throws Exception {
    int numServers = util.getHBaseClusterInterface().getClusterStatus().getServersSize();
    long startKey = preloadKeys * numServers;
    long endKey = startKey + writeKeys * numServers;
    status(String.format("%s test starting on %d servers; preloading 0 to %d and writing to %d",
        description, numServers, startKey, endKey));

    TableName tn = TableName.valueOf(TABLE_NAME);
    if (preloadKeys > 0) {
      MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, tn);
      long time = System.currentTimeMillis();
      preloader.start(0, startKey, writeThreads);
      preloader.waitForFinish();
      if (preloader.getNumWriteFailures() > 0) {
        throw new IOException("Preload failed");
      }
      int waitTime = (int)Math.min(preloadKeys / 100, 30000); // arbitrary
      status(description + " preload took " + (System.currentTimeMillis()-time)/1000
          + "sec; sleeping for " + waitTime/1000 + "sec for store to stabilize");
      Thread.sleep(waitTime);
    }

    MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, tn);
    MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, tn, 100);
    // reader.getMetrics().enable();
    reader.linkToWriter(writer);

    long testStartTime = System.currentTimeMillis();
    writer.start(startKey, endKey, writeThreads);
    reader.start(startKey, endKey, readThreads);
    writer.waitForFinish();
    reader.waitForFinish();
    // reader.waitForVerification(300000);
    // reader.abortAndWaitForFinish();
    status("Readers and writers stopped for test " + description);

    boolean success = writer.getNumWriteFailures() == 0;
    if (!success) {
      LOG.error("Write failed");
    } else {
      success = reader.getNumReadErrors() == 0 && reader.getNumReadFailures() == 0;
      if (!success) {
        LOG.error("Read failed");
      }
    }

    // Dump perf regardless of the result.
    /*StringBuilder perfDump = new StringBuilder();
    for (Pair<Long, Long> pt : reader.getMetrics().getCombinedCdf()) {
      perfDump.append(String.format(
          "csvread,%s,%d,%d%n", description, pt.getFirst(), pt.getSecond()));
    }
    if (dumpTimePerf) {
      Iterator<Triple<Long, Double, Long>> timePerf = reader.getMetrics().getCombinedTimeSeries();
      while (timePerf.hasNext()) {
        Triple<Long, Double, Long> pt = timePerf.next();
        perfDump.append(String.format("csvtime,%s,%d,%d,%.4f%n",
            description, pt.getFirst(), pt.getThird(), pt.getSecond()));
      }
    }
    LOG.info("Performance data dump for " + description + " test: \n" + perfDump.toString());*/
    status(description + " test took " + (System.currentTimeMillis()-testStartTime)/1000 + "sec");
    Assert.assertTrue(success);
  }

  private static void status(String s) {
    LOG.info("STATUS " + s);
    System.out.println(s);
  }

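  /**
   * Builds the table descriptor: region splitting is disabled so both runs keep the
   * same region layout, and for stripe runs the store engine and stripe parameters
   * are set via the table-level configuration.
   */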
  private HTableDescriptor createHtd(boolean isStripe) throws Exception {
    HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
    htd.addFamily(new HColumnDescriptor(COLUMN_FAMILY));
    String noSplitsPolicy = DisabledRegionSplitPolicy.class.getName();
    htd.setConfiguration(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, noSplitsPolicy);
    if (isStripe) {
      htd.setConfiguration(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
      if (initialStripeCount != null) {
        htd.setConfiguration(
            StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialStripeCount.toString());
        htd.setConfiguration(
            HStore.BLOCKING_STOREFILES_KEY, Long.toString(10 * initialStripeCount));
      } else {
        htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "500");
      }
      if (splitSize != null) {
        htd.setConfiguration(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize.toString());
      }
      if (splitParts != null) {
        htd.setConfiguration(StripeStoreConfig.SPLIT_PARTS_KEY, splitParts.toString());
      }
    } else {
      htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "10"); // default
    }
    return htd;
  }

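  /**
   * Recreates the table, pre-split into one region per server using
   * {@link RegionSplitter.HexStringSplit} so load is spread evenly from the start.
   */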
  protected void createTable(HTableDescriptor htd) throws Exception {
    deleteTable();
    if (util.getHBaseClusterInterface() instanceof MiniHBaseCluster) {
      LOG.warn("Test does not make a lot of sense for minicluster. Will set flush size low.");
      htd.setConfiguration(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, "1048576");
    }
    byte[][] splits = new RegionSplitter.HexStringSplit().split(
        util.getHBaseClusterInterface().getClusterStatus().getServersSize());
    util.getHBaseAdmin().createTable(htd, splits);
  }

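  /**
   * Data generator that spreads sequential keys across {@code numPartitions} shards:
   * the shard id ({@code keyBase % numPartitions}) becomes a zero-padded 7-character
   * prefix, followed by the key itself zero-padded to 10 characters. Within each
   * shard the keys are strictly ascending, giving a sequential write pattern per
   * shard. For example, with 3 partitions, keyBase 12345 maps to
   * "0000000" + "0000012345".
   */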
  public static class SeqShardedDataGenerator extends LoadTestDataGenerator {
    private static final byte[][] COLUMN_NAMES = new byte[][] { Bytes.toBytes("col1") };
    private static final int PAD_TO = 10;
    private static final int PREFIX_PAD_TO = 7;

    private final int numPartitions;

    public SeqShardedDataGenerator(int minValueSize, int maxValueSize, int numPartitions) {
      super(minValueSize, maxValueSize);
      this.numPartitions = numPartitions;
    }

    @Override
    public byte[] getDeterministicUniqueKey(long keyBase) {
      String num = StringUtils.leftPad(String.valueOf(keyBase), PAD_TO, "0");
      return Bytes.toBytes(getPrefix(keyBase) + num);
    }

    private String getPrefix(long i) {
      return StringUtils.leftPad(String.valueOf((int)(i % numPartitions)), PREFIX_PAD_TO, "0");
    }

    @Override
    public byte[][] getColumnFamilies() {
      return new byte[][] { COLUMN_FAMILY };
    }

    @Override
    public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
      return COLUMN_NAMES;
    }

    @Override
    public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
      return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
      return LoadTestKVGenerator.verify(value, rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
      return true;
    }
  }
}