View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.util;
18  
19  import java.io.IOException;
20  import java.util.HashSet;
21  import java.util.Set;
22  import java.util.concurrent.atomic.AtomicLong;
23  
24  import org.apache.commons.lang.math.RandomUtils;
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.HRegionLocation;
29  import org.apache.hadoop.hbase.TableName;
30  import org.apache.hadoop.hbase.client.Get;
31  import org.apache.hadoop.hbase.client.HTable;
32  import org.apache.hadoop.hbase.client.Result;
33  import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
34  
35  /** Creates multiple threads that read and verify previously written data */
36  public class MultiThreadedReader extends MultiThreadedAction
37  {
38    private static final Log LOG = LogFactory.getLog(MultiThreadedReader.class);
39  
40    protected Set<HBaseReaderThread> readers = new HashSet<HBaseReaderThread>();
41    private final double verifyPercent;
42    private volatile boolean aborted;
43  
44    protected MultiThreadedWriterBase writer = null;
45  
46    /**
47     * The number of keys verified in a sequence. This will never be larger than
48     * the total number of keys in the range. The reader might also verify
49     * random keys when it catches up with the writer.
50     */
51    private final AtomicLong numUniqueKeysVerified = new AtomicLong();
52  
53    /**
54     * Default maximum number of read errors to tolerate before shutting down all
55     * readers.
56     */
57    public static final int DEFAULT_MAX_ERRORS = 10;
58  
59    /**
60     * Default "window" size between the last key written by the writer and the
61     * key that we attempt to read. The lower this number, the stricter our
62     * testing is. If this is zero, we always attempt to read the highest key
63     * in the contiguous sequence of keys written by the writers.
64     */
65    public static final int DEFAULT_KEY_WINDOW = 0;
66  
67    protected AtomicLong numKeysVerified = new AtomicLong(0);
68    protected AtomicLong numReadErrors = new AtomicLong(0);
69    protected AtomicLong numReadFailures = new AtomicLong(0);
70    protected AtomicLong nullResult = new AtomicLong(0);
71  
72    private int maxErrors = DEFAULT_MAX_ERRORS;
73    private int keyWindow = DEFAULT_KEY_WINDOW;
74  
75    public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
76        TableName tableName, double verifyPercent) {
77      super(dataGen, conf, tableName, "R");
78      this.verifyPercent = verifyPercent;
79    }
80  
81    public void linkToWriter(MultiThreadedWriterBase writer) {
82      this.writer = writer;
83      writer.setTrackWroteKeys(true);
84    }
85  
86    public void setMaxErrors(int maxErrors) {
87      this.maxErrors = maxErrors;
88    }
89  
90    public void setKeyWindow(int keyWindow) {
91      this.keyWindow = keyWindow;
92    }
93  
94    @Override
95    public void start(long startKey, long endKey, int numThreads) throws IOException {
96      super.start(startKey, endKey, numThreads);
97      if (verbose) {
98        LOG.debug("Reading keys [" + startKey + ", " + endKey + ")");
99      }
100 
101     addReaderThreads(numThreads);
102     startThreads(readers);
103   }
104 
105   protected void addReaderThreads(int numThreads) throws IOException {
106     for (int i = 0; i < numThreads; ++i) {
107       HBaseReaderThread reader = new HBaseReaderThread(i);
108       readers.add(reader);
109     }
110   }
111 
112   public class HBaseReaderThread extends Thread {
113     protected final int readerId;
114     protected final HTable table;
115 
116     /** The "current" key being read. Increases from startKey to endKey. */
117     private long curKey;
118 
119     /** Time when the thread started */
120     protected long startTimeMs;
121 
122     /** If we are ahead of the writer and reading a random key. */
123     private boolean readingRandomKey;
124 
125     /**
126      * @param readerId only the keys with this remainder from division by
127      *          {@link #numThreads} will be read by this thread
128      */
129     public HBaseReaderThread(int readerId) throws IOException {
130       this.readerId = readerId;
131       table = createTable();
132       setName(getClass().getSimpleName() + "_" + readerId);
133     }
134 
135     protected HTable createTable() throws IOException {
136       return new HTable(conf, tableName);
137     }
138 
139     @Override
140     public void run() {
141       try {
142         runReader();
143       } finally {
144         closeTable();
145         numThreadsWorking.decrementAndGet();
146       }
147     }
148 
149     protected void closeTable() {
150       try {
151         if (table != null) {
152           table.close();
153         }
154       } catch (IOException e) {
155         LOG.error("Error closing table", e);
156       }
157     }
158 
159     private void runReader() {
160       if (verbose) {
161         LOG.info("Started thread #" + readerId + " for reads...");
162       }
163 
164       startTimeMs = System.currentTimeMillis();
165       curKey = startKey;
166       while (curKey < endKey && !aborted) {
167         long k = getNextKeyToRead();
168 
169         // A sanity check for the key range.
170         if (k < startKey || k >= endKey) {
171           numReadErrors.incrementAndGet();
172           throw new AssertionError("Load tester logic error: proposed key " +
173               "to read " + k + " is out of range (startKey=" + startKey +
174               ", endKey=" + endKey + ")");
175         }
176 
177         if (k % numThreads != readerId ||
178             writer != null && writer.failedToWriteKey(k)) {
179           // Skip keys that this thread should not read, as well as the keys
180           // that we know the writer failed to write.
181           continue;
182         }
183 
184         readKey(k);
185         if (k == curKey - 1 && !readingRandomKey) {
186           // We have verified another unique key.
187           numUniqueKeysVerified.incrementAndGet();
188         }
189       }
190     }
191 
192     /**
193      * Should only be used for the concurrent writer/reader workload. The
194      * maximum key we are allowed to read, subject to the "key window"
195      * constraint.
196      */
197     private long maxKeyWeCanRead() {
198       long insertedUpToKey = writer.wroteUpToKey();
199       if (insertedUpToKey >= endKey - 1) {
200         // The writer has finished writing our range, so we can read any
201         // key in the range.
202         return endKey - 1;
203       }
204       return Math.min(endKey - 1, writer.wroteUpToKey() - keyWindow);
205     }
206 
207     private long getNextKeyToRead() {
208       readingRandomKey = false;
209       if (writer == null || curKey <= maxKeyWeCanRead()) {
210         return curKey++;
211       }
212 
213       // We caught up with the writer. See if we can read any keys at all.
214       long maxKeyToRead;
215       while ((maxKeyToRead = maxKeyWeCanRead()) < startKey) {
216         // The writer has not written sufficient keys for us to be able to read
217         // anything at all. Sleep a bit. This should only happen in the
218         // beginning of a load test run.
219         Threads.sleepWithoutInterrupt(50);
220       }
221 
222       if (curKey <= maxKeyToRead) {
223         // The writer wrote some keys, and we are now allowed to read our
224         // current key.
225         return curKey++;
226       }
227 
228       // startKey <= maxKeyToRead <= curKey - 1. Read one of the previous keys.
229       // Don't increment the current key -- we still have to try reading it
230       // later. Set a flag to make sure that we don't count this key towards
231       // the set of unique keys we have verified.
232       readingRandomKey = true;
233       return startKey + Math.abs(RandomUtils.nextLong())
234           % (maxKeyToRead - startKey + 1);
235     }
236 
237     private Get readKey(long keyToRead) {
238       Get get = new Get(dataGenerator.getDeterministicUniqueKey(keyToRead));
239       String cfsString = "";
240       byte[][] columnFamilies = dataGenerator.getColumnFamilies();
241       for (byte[] cf : columnFamilies) {
242         get.addFamily(cf);
243         if (verbose) {
244           if (cfsString.length() > 0) {
245             cfsString += ", ";
246           }
247           cfsString += "[" + Bytes.toStringBinary(cf) + "]";
248         }
249       }
250 
251       try {
252         get = dataGenerator.beforeGet(keyToRead, get);
253         if (verbose) {
254           LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString);
255         }
256         queryKey(get, RandomUtils.nextInt(100) < verifyPercent, keyToRead);
257       } catch (IOException e) {
258         numReadFailures.addAndGet(1);
259         LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
260             + ", time from start: "
261             + (System.currentTimeMillis() - startTimeMs) + " ms");
262       }
263       return get;
264     }
265 
266     public void queryKey(Get get, boolean verify, long keyToRead) throws IOException {
267       String rowKey = Bytes.toString(get.getRow());
268 
269       // read the data
270       long start = System.currentTimeMillis();
271       Result result = table.get(get);
272       getResultMetricUpdation(verify, rowKey, start, result, table, false);
273     }
274 
275     protected void getResultMetricUpdation(boolean verify, String rowKey, long start,
276         Result result, HTable table, boolean isNullExpected)
277         throws IOException {
278       totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
279       numKeys.addAndGet(1);
280       if (!result.isEmpty()) {
281         if (verify) {
282           numKeysVerified.incrementAndGet();
283         }
284       } else {
285          HRegionLocation hloc = table.getRegionLocation(
286              Bytes.toBytes(rowKey));
287         LOG.info("Key = " + rowKey + ", RegionServer: "
288             + hloc.getHostname());
289         if(isNullExpected) {
290           nullResult.incrementAndGet();
291           LOG.debug("Null result obtained for the key ="+rowKey);
292           return;
293         }
294       }
295       boolean isOk = verifyResultAgainstDataGenerator(result, verify, false);
296       long numErrorsAfterThis = 0;
297       if (isOk) {
298         long cols = 0;
299         // Count the columns for reporting purposes.
300         for (byte[] cf : result.getMap().keySet()) {
301           cols += result.getFamilyMap(cf).size();
302         }
303         numCols.addAndGet(cols);
304       } else {
305         if (writer != null) {
306           LOG.error("At the time of failure, writer wrote " + writer.numKeys.get() + " keys");
307         }
308         numErrorsAfterThis = numReadErrors.incrementAndGet();
309       }
310 
311       if (numErrorsAfterThis > maxErrors) {
312         LOG.error("Aborting readers -- found more than " + maxErrors + " errors");
313         aborted = true;
314       }
315     }
316   }
317 
318   public long getNumReadFailures() {
319     return numReadFailures.get();
320   }
321 
322   public long getNumReadErrors() {
323     return numReadErrors.get();
324   }
325 
326   public long getNumKeysVerified() {
327     return numKeysVerified.get();
328   }
329 
330   public long getNumUniqueKeysVerified() {
331     return numUniqueKeysVerified.get();
332   }
333 
334   public long getNullResultsCount() {
335     return nullResult.get();
336   }
337 
338   @Override
339   protected String progressInfo() {
340     StringBuilder sb = new StringBuilder();
341     appendToStatus(sb, "verified", numKeysVerified.get());
342     appendToStatus(sb, "READ FAILURES", numReadFailures.get());
343     appendToStatus(sb, "READ ERRORS", numReadErrors.get());
344     appendToStatus(sb, "NULL RESULT", nullResult.get());
345     return sb.toString();
346   }
347 }