View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase;
20  
21  import java.io.IOException;
22  import java.util.List;
23  import java.util.Random;
24  import java.util.concurrent.atomic.AtomicLong;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
30  import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
31  import org.apache.hadoop.hbase.client.Get;
32  import org.apache.hadoop.hbase.client.HBaseAdmin;
33  import org.apache.hadoop.hbase.client.HTable;
34  import org.apache.hadoop.hbase.client.Put;
35  import org.apache.hadoop.hbase.client.Result;
36  import org.apache.hadoop.hbase.client.ResultScanner;
37  import org.apache.hadoop.hbase.client.Scan;
38  import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
39  import org.apache.hadoop.hbase.util.Bytes;
40  import org.apache.hadoop.util.StringUtils;
41  import org.apache.hadoop.util.Tool;
42  import org.apache.hadoop.util.ToolRunner;
43  import org.junit.Test;
44  import org.junit.experimental.categories.Category;
45  
46  import com.google.common.collect.Lists;
47  
48  /**
49   * Test case that uses multiple threads to read and write multifamily rows
50   * into a table, verifying that reads never see partially-complete writes.
51   *
52   * This can run as a junit test, or with a main() function which runs against
53   * a real cluster (eg for testing with failures, region movement, etc)
54   */
55  @Category(MediumTests.class)
56  public class TestAcidGuarantees implements Tool {
57    protected static final Log LOG = LogFactory.getLog(TestAcidGuarantees.class);
58    public static final byte [] TABLE_NAME = Bytes.toBytes("TestAcidGuarantees");
59    public static final byte [] FAMILY_A = Bytes.toBytes("A");
60    public static final byte [] FAMILY_B = Bytes.toBytes("B");
61    public static final byte [] FAMILY_C = Bytes.toBytes("C");
62    public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data");
63  
64    public static final byte[][] FAMILIES = new byte[][] {
65      FAMILY_A, FAMILY_B, FAMILY_C };
66  
67    private HBaseTestingUtility util;
68  
69    public static int NUM_COLS_TO_CHECK = 50;
70  
71    // when run as main
72    private Configuration conf;
73  
74    private void createTableIfMissing()
75      throws IOException {
76      try {
77        util.createTable(TABLE_NAME, FAMILIES);
78      } catch (TableExistsException tee) {
79      }
80    }
81  
82    public TestAcidGuarantees() {
83      // Set small flush size for minicluster so we exercise reseeking scanners
84      Configuration conf = HBaseConfiguration.create();
85      conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(128*1024));
86      // prevent aggressive region split
87      conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
88        ConstantSizeRegionSplitPolicy.class.getName());
89      util = new HBaseTestingUtility(conf);
90    }
91  
92    /**
93     * Thread that does random full-row writes into a table.
94     */
95    public static class AtomicityWriter extends RepeatingTestThread {
96      Random rand = new Random();
97      byte data[] = new byte[10];
98      byte targetRows[][];
99      byte targetFamilies[][];
100     HTable table;
101     AtomicLong numWritten = new AtomicLong();
102 
103     public AtomicityWriter(TestContext ctx, byte targetRows[][],
104                            byte targetFamilies[][]) throws IOException {
105       super(ctx);
106       this.targetRows = targetRows;
107       this.targetFamilies = targetFamilies;
108       table = new HTable(ctx.getConf(), TABLE_NAME);
109     }
110     public void doAnAction() throws Exception {
111       // Pick a random row to write into
112       byte[] targetRow = targetRows[rand.nextInt(targetRows.length)];
113       Put p = new Put(targetRow);
114       rand.nextBytes(data);
115 
116       for (byte[] family : targetFamilies) {
117         for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
118           byte qualifier[] = Bytes.toBytes("col" + i);
119           p.add(family, qualifier, data);
120         }
121       }
122       table.put(p);
123       numWritten.getAndIncrement();
124     }
125   }
126 
127   /**
128    * Thread that does single-row reads in a table, looking for partially
129    * completed rows.
130    */
131   public static class AtomicGetReader extends RepeatingTestThread {
132     byte targetRow[];
133     byte targetFamilies[][];
134     HTable table;
135     int numVerified = 0;
136     AtomicLong numRead = new AtomicLong();
137 
138     public AtomicGetReader(TestContext ctx, byte targetRow[],
139                            byte targetFamilies[][]) throws IOException {
140       super(ctx);
141       this.targetRow = targetRow;
142       this.targetFamilies = targetFamilies;
143       table = new HTable(ctx.getConf(), TABLE_NAME);
144     }
145 
146     public void doAnAction() throws Exception {
147       Get g = new Get(targetRow);
148       Result res = table.get(g);
149       byte[] gotValue = null;
150       if (res.getRow() == null) {
151         // Trying to verify but we didn't find the row - the writing
152         // thread probably just hasn't started writing yet, so we can
153         // ignore this action
154         return;
155       }
156 
157       for (byte[] family : targetFamilies) {
158         for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
159           byte qualifier[] = Bytes.toBytes("col" + i);
160           byte thisValue[] = res.getValue(family, qualifier);
161           if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
162             gotFailure(gotValue, res);
163           }
164           numVerified++;
165           gotValue = thisValue;
166         }
167       }
168       numRead.getAndIncrement();
169     }
170 
171     private void gotFailure(byte[] expected, Result res) {
172       StringBuilder msg = new StringBuilder();
173       msg.append("Failed after ").append(numVerified).append("!");
174       msg.append("Expected=").append(Bytes.toStringBinary(expected));
175       msg.append("Got:\n");
176       for (Cell kv : res.listCells()) {
177         msg.append(kv.toString());
178         msg.append(" val= ");
179         msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
180         msg.append("\n");
181       }
182       throw new RuntimeException(msg.toString());
183     }
184   }
185 
186   /**
187    * Thread that does full scans of the table looking for any partially completed
188    * rows.
189    */
190   public static class AtomicScanReader extends RepeatingTestThread {
191     byte targetFamilies[][];
192     HTable table;
193     AtomicLong numScans = new AtomicLong();
194     AtomicLong numRowsScanned = new AtomicLong();
195 
196     public AtomicScanReader(TestContext ctx,
197                            byte targetFamilies[][]) throws IOException {
198       super(ctx);
199       this.targetFamilies = targetFamilies;
200       table = new HTable(ctx.getConf(), TABLE_NAME);
201     }
202 
203     public void doAnAction() throws Exception {
204       Scan s = new Scan();
205       for (byte[] family : targetFamilies) {
206         s.addFamily(family);
207       }
208       ResultScanner scanner = table.getScanner(s);
209 
210       for (Result res : scanner) {
211         byte[] gotValue = null;
212 
213         for (byte[] family : targetFamilies) {
214           for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
215             byte qualifier[] = Bytes.toBytes("col" + i);
216             byte thisValue[] = res.getValue(family, qualifier);
217             if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
218               gotFailure(gotValue, res);
219             }
220             gotValue = thisValue;
221           }
222         }
223         numRowsScanned.getAndIncrement();
224       }
225       numScans.getAndIncrement();
226     }
227 
228     private void gotFailure(byte[] expected, Result res) {
229       StringBuilder msg = new StringBuilder();
230       msg.append("Failed after ").append(numRowsScanned).append("!");
231       msg.append("Expected=").append(Bytes.toStringBinary(expected));
232       msg.append("Got:\n");
233       for (Cell kv : res.listCells()) {
234         msg.append(kv.toString());
235         msg.append(" val= ");
236         msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
237         msg.append("\n");
238       }
239       throw new RuntimeException(msg.toString());
240     }
241   }
242 
243   public void runTestAtomicity(long millisToRun,
244       int numWriters,
245       int numGetters,
246       int numScanners,
247       int numUniqueRows) throws Exception {
248     runTestAtomicity(millisToRun, numWriters, numGetters, numScanners, numUniqueRows, false);
249   }
250 
251   public void runTestAtomicity(long millisToRun,
252       int numWriters,
253       int numGetters,
254       int numScanners,
255       int numUniqueRows,
256       final boolean systemTest) throws Exception {
257     createTableIfMissing();
258     TestContext ctx = new TestContext(util.getConfiguration());
259 
260     byte rows[][] = new byte[numUniqueRows][];
261     for (int i = 0; i < numUniqueRows; i++) {
262       rows[i] = Bytes.toBytes("test_row_" + i);
263     }
264 
265     List<AtomicityWriter> writers = Lists.newArrayList();
266     for (int i = 0; i < numWriters; i++) {
267       AtomicityWriter writer = new AtomicityWriter(
268           ctx, rows, FAMILIES);
269       writers.add(writer);
270       ctx.addThread(writer);
271     }
272     // Add a flusher
273     ctx.addThread(new RepeatingTestThread(ctx) {
274       HBaseAdmin admin = new HBaseAdmin(util.getConfiguration());
275       public void doAnAction() throws Exception {
276         try {
277           admin.flush(TABLE_NAME);
278         } catch(IOException ioe) {
279           LOG.warn("Ignoring exception while flushing: " + StringUtils.stringifyException(ioe));
280         }
281         // Flushing has been a source of ACID violations previously (see HBASE-2856), so ideally,
282         // we would flush as often as possible.  On a running cluster, this isn't practical:
283         // (1) we will cause a lot of load due to all the flushing and compacting
284         // (2) we cannot change the flushing/compacting related Configuration options to try to
285         // alleviate this
286         // (3) it is an unrealistic workload, since no one would actually flush that often.
287         // Therefore, let's flush every minute to have more flushes than usual, but not overload
288         // the running cluster.
289         if (systemTest) Thread.sleep(60000);
290       }
291     });
292 
293     List<AtomicGetReader> getters = Lists.newArrayList();
294     for (int i = 0; i < numGetters; i++) {
295       AtomicGetReader getter = new AtomicGetReader(
296           ctx, rows[i % numUniqueRows], FAMILIES);
297       getters.add(getter);
298       ctx.addThread(getter);
299     }
300 
301     List<AtomicScanReader> scanners = Lists.newArrayList();
302     for (int i = 0; i < numScanners; i++) {
303       AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES);
304       scanners.add(scanner);
305       ctx.addThread(scanner);
306     }
307 
308     ctx.startThreads();
309     ctx.waitFor(millisToRun);
310     ctx.stop();
311 
312     LOG.info("Finished test. Writers:");
313     for (AtomicityWriter writer : writers) {
314       LOG.info("  wrote " + writer.numWritten.get());
315     }
316     LOG.info("Readers:");
317     for (AtomicGetReader reader : getters) {
318       LOG.info("  read " + reader.numRead.get());
319     }
320     LOG.info("Scanners:");
321     for (AtomicScanReader scanner : scanners) {
322       LOG.info("  scanned " + scanner.numScans.get());
323       LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
324     }
325   }
326 
327   @Test
328   public void testGetAtomicity() throws Exception {
329     util.startMiniCluster(1);
330     try {
331       runTestAtomicity(20000, 5, 5, 0, 3);
332     } finally {
333       util.shutdownMiniCluster();
334     }
335   }
336 
337   @Test
338   public void testScanAtomicity() throws Exception {
339     util.startMiniCluster(1);
340     try {
341       runTestAtomicity(20000, 5, 0, 5, 3);
342     } finally {
343       util.shutdownMiniCluster();
344     }
345   }
346 
347   @Test
348   public void testMixedAtomicity() throws Exception {
349     util.startMiniCluster(1);
350     try {
351       runTestAtomicity(20000, 5, 2, 2, 3);
352     } finally {
353       util.shutdownMiniCluster();
354     }
355   }
356 
357   ////////////////////////////////////////////////////////////////////////////
358   // Tool interface
359   ////////////////////////////////////////////////////////////////////////////
360   @Override
361   public Configuration getConf() {
362     return conf;
363   }
364 
365   @Override
366   public void setConf(Configuration c) {
367     this.conf = c;
368     this.util = new HBaseTestingUtility(c);
369   }
370 
371   @Override
372   public int run(String[] arg0) throws Exception {
373     Configuration c = getConf();
374     int millis = c.getInt("millis", 5000);
375     int numWriters = c.getInt("numWriters", 50);
376     int numGetters = c.getInt("numGetters", 2);
377     int numScanners = c.getInt("numScanners", 2);
378     int numUniqueRows = c.getInt("numUniqueRows", 3);
379     runTestAtomicity(millis, numWriters, numGetters, numScanners, numUniqueRows, true);
380     return 0;
381   }
382 
383   public static void main(String args[]) throws Exception {
384     Configuration c = HBaseConfiguration.create();
385     int status;
386     try {
387       TestAcidGuarantees test = new TestAcidGuarantees();
388       status = ToolRunner.run(c, test, args);
389     } catch (Exception e) {
390       LOG.error("Exiting due to error", e);
391       status = -1;
392     }
393     System.exit(status);
394   }
395 
396 
397 }
398