View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase;
20  
21  import com.google.common.base.Objects;
22  import com.google.common.collect.Sets;
23  import com.yammer.metrics.core.Histogram;
24  import org.apache.commons.cli.CommandLine;
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.chaos.actions.MoveRandomRegionOfTableAction;
29  import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction;
30  import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
31  import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
32  import org.apache.hadoop.hbase.chaos.policies.Policy;
33  import org.apache.hadoop.hbase.client.Admin;
34  import org.apache.hadoop.hbase.ipc.RpcClient;
35  import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
36  import org.apache.hadoop.hbase.testclassification.IntegrationTests;
37  import org.apache.hadoop.hbase.util.Bytes;
38  import org.apache.hadoop.hbase.util.YammerHistogramUtils;
39  import org.apache.hadoop.mapreduce.Counters;
40  import org.apache.hadoop.mapreduce.Job;
41  import org.apache.hadoop.util.ToolRunner;
42  import org.junit.experimental.categories.Category;
43  
44  import java.util.*;
45  import java.util.concurrent.Callable;
46  
47  import static java.lang.String.format;
48  import static org.junit.Assert.assertEquals;
49  import static org.junit.Assert.assertNotNull;
50  import static org.junit.Assert.assertTrue;
51  
52  /**
53   * Test for comparing the performance impact of region replicas. Uses
54   * components of {@link PerformanceEvaluation}. Does not run from
55   * {@code IntegrationTestsDriver} because IntegrationTestBase is incompatible
56   * with the JUnit runner. Hence no @Test annotations either. See {@code -help}
57   * for full list of options.
58   */
59  @Category(IntegrationTests.class)
60  public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase {
61  
62    private static final Log LOG = LogFactory.getLog(IntegrationTestRegionReplicaPerf.class);
63  
64    private static final String SLEEP_TIME_KEY = "sleeptime";
65    // short default interval because tests don't run very long.
66    private static final String SLEEP_TIME_DEFAULT = "" + (10 * 1000l);
67    private static final String TABLE_NAME_KEY = "tableName";
68    private static final String TABLE_NAME_DEFAULT = "IntegrationTestRegionReplicaPerf";
69    private static final String REPLICA_COUNT_KEY = "replicas";
70    private static final String REPLICA_COUNT_DEFAULT = "" + 3;
71    private static final String PRIMARY_TIMEOUT_KEY = "timeout";
72    private static final String PRIMARY_TIMEOUT_DEFAULT = "" + 10 * 1000; // 10 ms
73    private static final String NUM_RS_KEY = "numRs";
74    private static final String NUM_RS_DEFAULT = "" + 3;
75  
76    /** Extract a descriptive statistic from a {@link com.yammer.metrics.core.Histogram}. */
77    private enum Stat {
78      STDEV {
79        @Override
80        double apply(Histogram hist) {
81          return hist.stdDev();
82        }
83      },
84      FOUR_9S {
85        @Override
86        double apply(Histogram hist) {
87          return hist.getSnapshot().getValue(0.9999);
88        }
89      };
90  
91      abstract double apply(Histogram hist);
92    }
93  
94    private TableName tableName;
95    private long sleepTime;
96    private int replicaCount;
97    private int primaryTimeout;
98    private int clusterSize;
99  
100   /**
101    * Wraps the invocation of {@link PerformanceEvaluation} in a {@code Callable}.
102    */
103   static class PerfEvalCallable implements Callable<TimingResult> {
104     private final Queue<String> argv = new LinkedList<String>();
105     private final Admin admin;
106 
107     public PerfEvalCallable(Admin admin, String argv) {
108       // TODO: this API is awkward, should take HConnection, not HBaseAdmin
109       this.admin = admin;
110       this.argv.addAll(Arrays.asList(argv.split(" ")));
111       LOG.debug("Created PerformanceEvaluationCallable with args: " + argv);
112     }
113 
114     @Override
115     public TimingResult call() throws Exception {
116       PerformanceEvaluation.TestOptions opts = PerformanceEvaluation.parseOpts(argv);
117       PerformanceEvaluation.checkTable(admin, opts);
118       PerformanceEvaluation.RunResult results[] = null;
119       long numRows = opts.totalRows;
120       long elapsedTime = 0;
121       if (opts.nomapred) {
122         results = PerformanceEvaluation.doLocalClients(opts, admin.getConfiguration());
123         for (PerformanceEvaluation.RunResult r : results) {
124           elapsedTime = Math.max(elapsedTime, r.duration);
125         }
126       } else {
127         Job job = PerformanceEvaluation.doMapReduce(opts, admin.getConfiguration());
128         Counters counters = job.getCounters();
129         numRows = counters.findCounter(PerformanceEvaluation.Counter.ROWS).getValue();
130         elapsedTime = counters.findCounter(PerformanceEvaluation.Counter.ELAPSED_TIME).getValue();
131       }
132       return new TimingResult(numRows, elapsedTime, results);
133     }
134   }
135 
136   /**
137    * Record the results from a single {@link PerformanceEvaluation} job run.
138    */
139   static class TimingResult {
140     public final long numRows;
141     public final long elapsedTime;
142     public final PerformanceEvaluation.RunResult results[];
143 
144     public TimingResult(long numRows, long elapsedTime, PerformanceEvaluation.RunResult results[]) {
145       this.numRows = numRows;
146       this.elapsedTime = elapsedTime;
147       this.results = results;
148     }
149 
150     @Override
151     public String toString() {
152       return Objects.toStringHelper(this)
153         .add("numRows", numRows)
154         .add("elapsedTime", elapsedTime)
155         .toString();
156     }
157   }
158 
159   @Override
160   public void setUp() throws Exception {
161     super.setUp();
162     Configuration conf = util.getConfiguration();
163 
164     // sanity check cluster
165     // TODO: this should reach out to master and verify online state instead
166     assertEquals("Master must be configured with StochasticLoadBalancer",
167       "org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer",
168       conf.get("hbase.master.loadbalancer.class"));
169     // TODO: this should reach out to master and verify online state instead
170     assertTrue("hbase.regionserver.storefile.refresh.period must be greater than zero.",
171       conf.getLong("hbase.regionserver.storefile.refresh.period", 0) > 0);
172 
173     // enable client-side settings
174     conf.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, true);
175     // TODO: expose these settings to CLI override
176     conf.setLong("hbase.client.primaryCallTimeout.get", primaryTimeout);
177     conf.setLong("hbase.client.primaryCallTimeout.multiget", primaryTimeout);
178   }
179 
180   @Override
181   public void setUpCluster() throws Exception {
182     util = getTestingUtil(getConf());
183     util.initializeCluster(clusterSize);
184   }
185 
186   @Override
187   public void setUpMonkey() throws Exception {
188     Policy p = new PeriodicRandomActionPolicy(sleepTime,
189       new RestartRandomRsExceptMetaAction(sleepTime),
190       new MoveRandomRegionOfTableAction(tableName));
191     this.monkey = new PolicyBasedChaosMonkey(util, p);
192     // don't start monkey right away
193   }
194 
195   @Override
196   protected void addOptions() {
197     addOptWithArg(TABLE_NAME_KEY, "Alternate table name. Default: '"
198       + TABLE_NAME_DEFAULT + "'");
199     addOptWithArg(SLEEP_TIME_KEY, "How long the monkey sleeps between actions. Default: "
200       + SLEEP_TIME_DEFAULT);
201     addOptWithArg(REPLICA_COUNT_KEY, "Number of region replicas. Default: "
202       + REPLICA_COUNT_DEFAULT);
203     addOptWithArg(PRIMARY_TIMEOUT_KEY, "Overrides hbase.client.primaryCallTimeout. Default: "
204       + PRIMARY_TIMEOUT_DEFAULT + " (10ms)");
205     addOptWithArg(NUM_RS_KEY, "Specify the number of RegionServers to use. Default: "
206         + NUM_RS_DEFAULT);
207   }
208 
209   @Override
210   protected void processOptions(CommandLine cmd) {
211     tableName = TableName.valueOf(cmd.getOptionValue(TABLE_NAME_KEY, TABLE_NAME_DEFAULT));
212     sleepTime = Long.parseLong(cmd.getOptionValue(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT));
213     replicaCount = Integer.parseInt(cmd.getOptionValue(REPLICA_COUNT_KEY, REPLICA_COUNT_DEFAULT));
214     primaryTimeout =
215       Integer.parseInt(cmd.getOptionValue(PRIMARY_TIMEOUT_KEY, PRIMARY_TIMEOUT_DEFAULT));
216     clusterSize = Integer.parseInt(cmd.getOptionValue(NUM_RS_KEY, NUM_RS_DEFAULT));
217     LOG.debug(Objects.toStringHelper("Parsed Options")
218       .add(TABLE_NAME_KEY, tableName)
219       .add(SLEEP_TIME_KEY, sleepTime)
220       .add(REPLICA_COUNT_KEY, replicaCount)
221       .add(PRIMARY_TIMEOUT_KEY, primaryTimeout)
222       .add(NUM_RS_KEY, clusterSize)
223       .toString());
224   }
225 
226   @Override
227   public int runTestFromCommandLine() throws Exception {
228     test();
229     return 0;
230   }
231 
232   @Override
233   public TableName getTablename() {
234     return tableName;
235   }
236 
237   @Override
238   protected Set<String> getColumnFamilies() {
239     return Sets.newHashSet(Bytes.toString(PerformanceEvaluation.FAMILY_NAME));
240   }
241 
242   /** Compute the mean of the given {@code stat} from a timing results. */
243   private static double calcMean(String desc, Stat stat, List<TimingResult> results) {
244     double sum = 0;
245     int count = 0;
246 
247     for (TimingResult tr : results) {
248       for (PerformanceEvaluation.RunResult r : tr.results) {
249         assertNotNull("One of the run results is missing detailed run data.", r.hist);
250         sum += stat.apply(r.hist);
251         count += 1;
252         LOG.debug(desc + "{" + YammerHistogramUtils.getHistogramReport(r.hist) + "}");
253       }
254     }
255     return sum / count;
256   }
257 
258   public void test() throws Exception {
259     int maxIters = 3;
260     String replicas = "--replicas=" + replicaCount;
261     // TODO: splits disabled until "phase 2" is complete.
262     String splitPolicy = "--splitPolicy=" + DisabledRegionSplitPolicy.class.getName();
263     String writeOpts = format("%s --nomapred --table=%s --presplit=16 sequentialWrite 4",
264       splitPolicy, tableName);
265     String readOpts =
266       format("--nomapred --table=%s --latency --sampleRate=0.1 randomRead 4", tableName);
267     String replicaReadOpts = format("%s %s", replicas, readOpts);
268 
269     ArrayList<TimingResult> resultsWithoutReplicas = new ArrayList<TimingResult>(maxIters);
270     ArrayList<TimingResult> resultsWithReplicas = new ArrayList<TimingResult>(maxIters);
271 
272     // create/populate the table, replicas disabled
273     LOG.debug("Populating table.");
274     new PerfEvalCallable(util.getHBaseAdmin(), writeOpts).call();
275 
276     // one last sanity check, then send in the clowns!
277     assertEquals("Table must be created with DisabledRegionSplitPolicy. Broken test.",
278         DisabledRegionSplitPolicy.class.getName(),
279         util.getHBaseAdmin().getTableDescriptor(tableName).getRegionSplitPolicyClassName());
280     startMonkey();
281 
282     // collect a baseline without region replicas.
283     for (int i = 0; i < maxIters; i++) {
284       LOG.debug("Launching non-replica job " + (i + 1) + "/" + maxIters);
285       resultsWithoutReplicas.add(new PerfEvalCallable(util.getHBaseAdmin(), readOpts).call());
286       // TODO: sleep to let cluster stabilize, though monkey continues. is it necessary?
287       Thread.sleep(5000l);
288     }
289 
290     // disable monkey, enable region replicas, enable monkey
291     cleanUpMonkey("Altering table.");
292     LOG.debug("Altering " + tableName + " replica count to " + replicaCount);
293     IntegrationTestingUtility.setReplicas(util.getHBaseAdmin(), tableName, replicaCount);
294     setUpMonkey();
295     startMonkey();
296 
297     // run test with region replicas.
298     for (int i = 0; i < maxIters; i++) {
299       LOG.debug("Launching replica job " + (i + 1) + "/" + maxIters);
300       resultsWithReplicas.add(new PerfEvalCallable(util.getHBaseAdmin(), replicaReadOpts).call());
301       // TODO: sleep to let cluster stabilize, though monkey continues. is it necessary?
302       Thread.sleep(5000l);
303     }
304 
305     // compare the average of the stdev and 99.99pct across runs to determine if region replicas
306     // are having an overall improvement on response variance experienced by clients.
307     double withoutReplicasStdevMean =
308         calcMean("withoutReplicas", Stat.STDEV, resultsWithoutReplicas);
309     double withoutReplicas9999Mean =
310         calcMean("withoutReplicas", Stat.FOUR_9S, resultsWithoutReplicas);
311     double withReplicasStdevMean =
312         calcMean("withReplicas", Stat.STDEV, resultsWithReplicas);
313     double withReplicas9999Mean =
314         calcMean("withReplicas", Stat.FOUR_9S, resultsWithReplicas);
315 
316     LOG.info(Objects.toStringHelper(this)
317       .add("withoutReplicas", resultsWithoutReplicas)
318       .add("withReplicas", resultsWithReplicas)
319       .add("withoutReplicasStdevMean", withoutReplicasStdevMean)
320       .add("withoutReplicas99.99Mean", withoutReplicas9999Mean)
321       .add("withReplicasStdevMean", withReplicasStdevMean)
322       .add("withReplicas99.99Mean", withReplicas9999Mean)
323       .toString());
324 
325     assertTrue(
326       "Running with region replicas under chaos should have less request variance than without. "
327       + "withReplicas.stdev.mean: " + withReplicasStdevMean + "ms "
328       + "withoutReplicas.stdev.mean: " + withoutReplicasStdevMean + "ms.",
329       withReplicasStdevMean <= withoutReplicasStdevMean);
330     assertTrue(
331         "Running with region replicas under chaos should improve 99.99pct latency. "
332             + "withReplicas.99.99.mean: " + withReplicas9999Mean + "ms "
333             + "withoutReplicas.99.99.mean: " + withoutReplicas9999Mean + "ms.",
334         withReplicas9999Mean <= withoutReplicas9999Mean);
335   }
336 
337   public static void main(String[] args) throws Exception {
338     Configuration conf = HBaseConfiguration.create();
339     IntegrationTestingUtility.setUseDistributedCluster(conf);
340     int status = ToolRunner.run(conf, new IntegrationTestRegionReplicaPerf(), args);
341     System.exit(status);
342   }
343 }