1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
21 import java.io.IOException;
22 import java.util.List;
23 import java.util.Random;
24 import java.util.concurrent.atomic.AtomicLong;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
30 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
31 import org.apache.hadoop.hbase.client.Get;
32 import org.apache.hadoop.hbase.client.HBaseAdmin;
33 import org.apache.hadoop.hbase.client.HTable;
34 import org.apache.hadoop.hbase.client.Put;
35 import org.apache.hadoop.hbase.client.Result;
36 import org.apache.hadoop.hbase.client.ResultScanner;
37 import org.apache.hadoop.hbase.client.Scan;
38 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
39 import org.apache.hadoop.hbase.util.Bytes;
40 import org.apache.hadoop.util.StringUtils;
41 import org.apache.hadoop.util.Tool;
42 import org.apache.hadoop.util.ToolRunner;
43 import org.junit.Test;
44 import org.junit.experimental.categories.Category;
45
46 import com.google.common.collect.Lists;
47
48
49
50
51
52
53
54
55 @Category(MediumTests.class)
56 public class TestAcidGuarantees implements Tool {
57 protected static final Log LOG = LogFactory.getLog(TestAcidGuarantees.class);
58 public static final byte [] TABLE_NAME = Bytes.toBytes("TestAcidGuarantees");
59 public static final byte [] FAMILY_A = Bytes.toBytes("A");
60 public static final byte [] FAMILY_B = Bytes.toBytes("B");
61 public static final byte [] FAMILY_C = Bytes.toBytes("C");
62 public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data");
63
64 public static final byte[][] FAMILIES = new byte[][] {
65 FAMILY_A, FAMILY_B, FAMILY_C };
66
67 private HBaseTestingUtility util;
68
69 public static int NUM_COLS_TO_CHECK = 50;
70
71
72 private Configuration conf;
73
74 private void createTableIfMissing()
75 throws IOException {
76 try {
77 util.createTable(TABLE_NAME, FAMILIES);
78 } catch (TableExistsException tee) {
79 }
80 }
81
82 public TestAcidGuarantees() {
83
84 Configuration conf = HBaseConfiguration.create();
85 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(128*1024));
86
87 conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
88 ConstantSizeRegionSplitPolicy.class.getName());
89 util = new HBaseTestingUtility(conf);
90 }
91
92
93
94
95 public static class AtomicityWriter extends RepeatingTestThread {
96 Random rand = new Random();
97 byte data[] = new byte[10];
98 byte targetRows[][];
99 byte targetFamilies[][];
100 HTable table;
101 AtomicLong numWritten = new AtomicLong();
102
103 public AtomicityWriter(TestContext ctx, byte targetRows[][],
104 byte targetFamilies[][]) throws IOException {
105 super(ctx);
106 this.targetRows = targetRows;
107 this.targetFamilies = targetFamilies;
108 table = new HTable(ctx.getConf(), TABLE_NAME);
109 }
110 public void doAnAction() throws Exception {
111
112 byte[] targetRow = targetRows[rand.nextInt(targetRows.length)];
113 Put p = new Put(targetRow);
114 rand.nextBytes(data);
115
116 for (byte[] family : targetFamilies) {
117 for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
118 byte qualifier[] = Bytes.toBytes("col" + i);
119 p.add(family, qualifier, data);
120 }
121 }
122 table.put(p);
123 numWritten.getAndIncrement();
124 }
125 }
126
127
128
129
130
131 public static class AtomicGetReader extends RepeatingTestThread {
132 byte targetRow[];
133 byte targetFamilies[][];
134 HTable table;
135 int numVerified = 0;
136 AtomicLong numRead = new AtomicLong();
137
138 public AtomicGetReader(TestContext ctx, byte targetRow[],
139 byte targetFamilies[][]) throws IOException {
140 super(ctx);
141 this.targetRow = targetRow;
142 this.targetFamilies = targetFamilies;
143 table = new HTable(ctx.getConf(), TABLE_NAME);
144 }
145
146 public void doAnAction() throws Exception {
147 Get g = new Get(targetRow);
148 Result res = table.get(g);
149 byte[] gotValue = null;
150 if (res.getRow() == null) {
151
152
153
154 return;
155 }
156
157 for (byte[] family : targetFamilies) {
158 for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
159 byte qualifier[] = Bytes.toBytes("col" + i);
160 byte thisValue[] = res.getValue(family, qualifier);
161 if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
162 gotFailure(gotValue, res);
163 }
164 numVerified++;
165 gotValue = thisValue;
166 }
167 }
168 numRead.getAndIncrement();
169 }
170
171 private void gotFailure(byte[] expected, Result res) {
172 StringBuilder msg = new StringBuilder();
173 msg.append("Failed after ").append(numVerified).append("!");
174 msg.append("Expected=").append(Bytes.toStringBinary(expected));
175 msg.append("Got:\n");
176 for (Cell kv : res.listCells()) {
177 msg.append(kv.toString());
178 msg.append(" val= ");
179 msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
180 msg.append("\n");
181 }
182 throw new RuntimeException(msg.toString());
183 }
184 }
185
186
187
188
189
190 public static class AtomicScanReader extends RepeatingTestThread {
191 byte targetFamilies[][];
192 HTable table;
193 AtomicLong numScans = new AtomicLong();
194 AtomicLong numRowsScanned = new AtomicLong();
195
196 public AtomicScanReader(TestContext ctx,
197 byte targetFamilies[][]) throws IOException {
198 super(ctx);
199 this.targetFamilies = targetFamilies;
200 table = new HTable(ctx.getConf(), TABLE_NAME);
201 }
202
203 public void doAnAction() throws Exception {
204 Scan s = new Scan();
205 for (byte[] family : targetFamilies) {
206 s.addFamily(family);
207 }
208 ResultScanner scanner = table.getScanner(s);
209
210 for (Result res : scanner) {
211 byte[] gotValue = null;
212
213 for (byte[] family : targetFamilies) {
214 for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
215 byte qualifier[] = Bytes.toBytes("col" + i);
216 byte thisValue[] = res.getValue(family, qualifier);
217 if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
218 gotFailure(gotValue, res);
219 }
220 gotValue = thisValue;
221 }
222 }
223 numRowsScanned.getAndIncrement();
224 }
225 numScans.getAndIncrement();
226 }
227
228 private void gotFailure(byte[] expected, Result res) {
229 StringBuilder msg = new StringBuilder();
230 msg.append("Failed after ").append(numRowsScanned).append("!");
231 msg.append("Expected=").append(Bytes.toStringBinary(expected));
232 msg.append("Got:\n");
233 for (Cell kv : res.listCells()) {
234 msg.append(kv.toString());
235 msg.append(" val= ");
236 msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
237 msg.append("\n");
238 }
239 throw new RuntimeException(msg.toString());
240 }
241 }
242
243 public void runTestAtomicity(long millisToRun,
244 int numWriters,
245 int numGetters,
246 int numScanners,
247 int numUniqueRows) throws Exception {
248 runTestAtomicity(millisToRun, numWriters, numGetters, numScanners, numUniqueRows, false);
249 }
250
251 public void runTestAtomicity(long millisToRun,
252 int numWriters,
253 int numGetters,
254 int numScanners,
255 int numUniqueRows,
256 final boolean systemTest) throws Exception {
257 createTableIfMissing();
258 TestContext ctx = new TestContext(util.getConfiguration());
259
260 byte rows[][] = new byte[numUniqueRows][];
261 for (int i = 0; i < numUniqueRows; i++) {
262 rows[i] = Bytes.toBytes("test_row_" + i);
263 }
264
265 List<AtomicityWriter> writers = Lists.newArrayList();
266 for (int i = 0; i < numWriters; i++) {
267 AtomicityWriter writer = new AtomicityWriter(
268 ctx, rows, FAMILIES);
269 writers.add(writer);
270 ctx.addThread(writer);
271 }
272
273 ctx.addThread(new RepeatingTestThread(ctx) {
274 HBaseAdmin admin = new HBaseAdmin(util.getConfiguration());
275 public void doAnAction() throws Exception {
276 try {
277 admin.flush(TABLE_NAME);
278 } catch(IOException ioe) {
279 LOG.warn("Ignoring exception while flushing: " + StringUtils.stringifyException(ioe));
280 }
281
282
283
284
285
286
287
288
289 if (systemTest) Thread.sleep(60000);
290 }
291 });
292
293 List<AtomicGetReader> getters = Lists.newArrayList();
294 for (int i = 0; i < numGetters; i++) {
295 AtomicGetReader getter = new AtomicGetReader(
296 ctx, rows[i % numUniqueRows], FAMILIES);
297 getters.add(getter);
298 ctx.addThread(getter);
299 }
300
301 List<AtomicScanReader> scanners = Lists.newArrayList();
302 for (int i = 0; i < numScanners; i++) {
303 AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES);
304 scanners.add(scanner);
305 ctx.addThread(scanner);
306 }
307
308 ctx.startThreads();
309 ctx.waitFor(millisToRun);
310 ctx.stop();
311
312 LOG.info("Finished test. Writers:");
313 for (AtomicityWriter writer : writers) {
314 LOG.info(" wrote " + writer.numWritten.get());
315 }
316 LOG.info("Readers:");
317 for (AtomicGetReader reader : getters) {
318 LOG.info(" read " + reader.numRead.get());
319 }
320 LOG.info("Scanners:");
321 for (AtomicScanReader scanner : scanners) {
322 LOG.info(" scanned " + scanner.numScans.get());
323 LOG.info(" verified " + scanner.numRowsScanned.get() + " rows");
324 }
325 }
326
327 @Test
328 public void testGetAtomicity() throws Exception {
329 util.startMiniCluster(1);
330 try {
331 runTestAtomicity(20000, 5, 5, 0, 3);
332 } finally {
333 util.shutdownMiniCluster();
334 }
335 }
336
337 @Test
338 public void testScanAtomicity() throws Exception {
339 util.startMiniCluster(1);
340 try {
341 runTestAtomicity(20000, 5, 0, 5, 3);
342 } finally {
343 util.shutdownMiniCluster();
344 }
345 }
346
347 @Test
348 public void testMixedAtomicity() throws Exception {
349 util.startMiniCluster(1);
350 try {
351 runTestAtomicity(20000, 5, 2, 2, 3);
352 } finally {
353 util.shutdownMiniCluster();
354 }
355 }
356
357
358
359
360 @Override
361 public Configuration getConf() {
362 return conf;
363 }
364
365 @Override
366 public void setConf(Configuration c) {
367 this.conf = c;
368 this.util = new HBaseTestingUtility(c);
369 }
370
371 @Override
372 public int run(String[] arg0) throws Exception {
373 Configuration c = getConf();
374 int millis = c.getInt("millis", 5000);
375 int numWriters = c.getInt("numWriters", 50);
376 int numGetters = c.getInt("numGetters", 2);
377 int numScanners = c.getInt("numScanners", 2);
378 int numUniqueRows = c.getInt("numUniqueRows", 3);
379 runTestAtomicity(millis, numWriters, numGetters, numScanners, numUniqueRows, true);
380 return 0;
381 }
382
383 public static void main(String args[]) throws Exception {
384 Configuration c = HBaseConfiguration.create();
385 int status;
386 try {
387 TestAcidGuarantees test = new TestAcidGuarantees();
388 status = ToolRunner.run(c, test, args);
389 } catch (Exception e) {
390 LOG.error("Exiting due to error", e);
391 status = -1;
392 }
393 System.exit(status);
394 }
395
396
397 }
398