View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  
22  import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS;
23  import static org.apache.hadoop.hbase.HBaseTestingUtility.FIRST_CHAR;
24  import static org.apache.hadoop.hbase.HBaseTestingUtility.LAST_CHAR;
25  import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
26  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
27  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
28  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
29  import static org.junit.Assert.assertArrayEquals;
30  import static org.junit.Assert.assertEquals;
31  import static org.junit.Assert.assertFalse;
32  import static org.junit.Assert.assertNotNull;
33  import static org.junit.Assert.assertNull;
34  import static org.junit.Assert.assertTrue;
35  import static org.junit.Assert.fail;
36  import static org.mockito.Matchers.any;
37  import static org.mockito.Matchers.anyBoolean;
38  import static org.mockito.Matchers.anyLong;
39  import static org.mockito.Matchers.eq;
40  import static org.mockito.Mockito.never;
41  import static org.mockito.Mockito.spy;
42  import static org.mockito.Mockito.times;
43  import static org.mockito.Mockito.verify;
44  
45  import java.io.IOException;
46  import java.io.InterruptedIOException;
47  import java.security.PrivilegedExceptionAction;
48  import java.util.ArrayList;
49  import java.util.Arrays;
50  import java.util.Collection;
51  import java.util.List;
52  import java.util.Map;
53  import java.util.NavigableMap;
54  import java.util.TreeMap;
55  import java.util.UUID;
56  import java.util.concurrent.atomic.AtomicBoolean;
57  import java.util.concurrent.atomic.AtomicInteger;
58  import java.util.concurrent.atomic.AtomicLong;
59  import java.util.concurrent.atomic.AtomicReference;
60  
61  import org.apache.commons.logging.Log;
62  import org.apache.commons.logging.LogFactory;
63  import org.apache.hadoop.conf.Configuration;
64  import org.apache.hadoop.fs.FSDataOutputStream;
65  import org.apache.hadoop.fs.FileStatus;
66  import org.apache.hadoop.fs.FileSystem;
67  import org.apache.hadoop.fs.Path;
68  import org.apache.hadoop.hbase.Cell;
69  import org.apache.hadoop.hbase.CellComparator;
70  import org.apache.hadoop.hbase.CellUtil;
71  import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
72  import org.apache.hadoop.hbase.DroppedSnapshotException;
73  import org.apache.hadoop.hbase.HBaseConfiguration;
74  import org.apache.hadoop.hbase.HBaseTestCase;
75  import org.apache.hadoop.hbase.HBaseTestingUtility;
76  import org.apache.hadoop.hbase.HColumnDescriptor;
77  import org.apache.hadoop.hbase.HConstants;
78  import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
79  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
80  import org.apache.hadoop.hbase.HRegionInfo;
81  import org.apache.hadoop.hbase.HTableDescriptor;
82  import org.apache.hadoop.hbase.KeyValue;
83  import org.apache.hadoop.hbase.MediumTests;
84  import org.apache.hadoop.hbase.MiniHBaseCluster;
85  import org.apache.hadoop.hbase.MultithreadedTestUtil;
86  import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
87  import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
88  import org.apache.hadoop.hbase.NotServingRegionException;
89  import org.apache.hadoop.hbase.TableName;
90  import org.apache.hadoop.hbase.Waiter;
91  import org.apache.hadoop.hbase.client.Append;
92  import org.apache.hadoop.hbase.client.Delete;
93  import org.apache.hadoop.hbase.client.Durability;
94  import org.apache.hadoop.hbase.client.Get;
95  import org.apache.hadoop.hbase.client.HTable;
96  import org.apache.hadoop.hbase.client.Increment;
97  import org.apache.hadoop.hbase.client.Put;
98  import org.apache.hadoop.hbase.client.Result;
99  import org.apache.hadoop.hbase.client.Scan;
100 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
101 import org.apache.hadoop.hbase.filter.BinaryComparator;
102 import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
103 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
104 import org.apache.hadoop.hbase.filter.Filter;
105 import org.apache.hadoop.hbase.filter.FilterBase;
106 import org.apache.hadoop.hbase.filter.FilterList;
107 import org.apache.hadoop.hbase.filter.NullComparator;
108 import org.apache.hadoop.hbase.filter.PrefixFilter;
109 import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
110 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
111 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
112 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
113 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
114 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
115 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
116 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
117 import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;
118 import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem;
119 import org.apache.hadoop.hbase.regionserver.wal.FaultyHLog;
120 import org.apache.hadoop.hbase.regionserver.wal.HLog;
121 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
122 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
123 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
124 import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
125 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
126 import org.apache.hadoop.hbase.security.User;
127 import org.apache.hadoop.hbase.test.MetricsAssertHelper;
128 import org.apache.hadoop.hbase.util.Bytes;
129 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
130 import org.apache.hadoop.hbase.util.FSUtils;
131 import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
132 import org.apache.hadoop.hbase.util.PairOfSameType;
133 import org.apache.hadoop.hbase.util.Threads;
134 import org.junit.After;
135 import org.junit.Assert;
136 import org.junit.Before;
137 import org.junit.Rule;
138 import org.junit.Test;
139 import org.junit.experimental.categories.Category;
140 import org.junit.rules.TestName;
141 import org.mockito.Mockito;
142 
143 import com.google.common.collect.Lists;
144 import com.google.protobuf.ByteString;
145 
146 /**
147  * Basic stand-alone testing of HRegion.  No clusters!
148  *
149  * A lot of the meta information for an HRegion now lives inside other HRegions
150  * or in the HBaseMaster, so only basic testing is possible.
151  */
152 @Category(MediumTests.class)
153 @SuppressWarnings("deprecation")
154 public class TestHRegion {
155   // Do not spin up clusters in here. If you need to spin up a cluster, do it
156   // over in TestHRegionOnCluster.
157   static final Log LOG = LogFactory.getLog(TestHRegion.class);
158   @Rule public TestName name = new TestName();
159 
160   private static final String COLUMN_FAMILY = "MyCF";
161   private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY);
162 
163   HRegion region = null;
164   // Do not run unit tests in parallel (? Why not?  It don't work?  Why not?  St.Ack)
165   private static HBaseTestingUtility TEST_UTIL;
166   public static Configuration CONF ;
167   private String dir;
168   private static FileSystem FILESYSTEM;
169   private final int MAX_VERSIONS = 2;
170 
171   // Test names
172   protected byte[] tableName;
173   protected String method;
174   protected final byte[] qual1 = Bytes.toBytes("qual1");
175   protected final byte[] qual2 = Bytes.toBytes("qual2");
176   protected final byte[] qual3 = Bytes.toBytes("qual3");
177   protected final byte[] value1 = Bytes.toBytes("value1");
178   protected final byte[] value2 = Bytes.toBytes("value2");
179   protected final byte[] row = Bytes.toBytes("rowA");
180   protected final byte[] row2 = Bytes.toBytes("rowB");
181 
182   protected final MetricsAssertHelper metricsAssertHelper = CompatibilitySingletonFactory
183       .getInstance(MetricsAssertHelper.class);
184 
185   @Before
186   public void setup() throws IOException {
187     TEST_UTIL = HBaseTestingUtility.createLocalHTU();
188     FILESYSTEM = TEST_UTIL.getTestFileSystem();
189     CONF = TEST_UTIL.getConfiguration();
190     dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
191     method = name.getMethodName();
192     tableName = Bytes.toBytes(name.getMethodName());
193   }
194 
195   @After
196   public void tearDown() throws Exception {
197     EnvironmentEdgeManagerTestHelper.reset();
198     LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
199     TEST_UTIL.cleanupTestDir();
200   }
201 
202   String getName() {
203     return name.getMethodName();
204   }
205 
206   /**
207    * Test for Bug 2 of HBASE-10466.
208    * "Bug 2: Conditions for the first flush of region close (so-called pre-flush) If memstoreSize
209    * is smaller than a certain value, or when region close starts a flush is ongoing, the first
210    * flush is skipped and only the second flush takes place. However, two flushes are required in
211    * case previous flush fails and leaves some data in snapshot. The bug could cause loss of data
212    * in current memstore. The fix is removing all conditions except abort check so we ensure 2
213    * flushes for region close."
214    * @throws IOException
215    */
216   @Test (timeout=60000)
217   public void testCloseCarryingSnapshot() throws IOException {
218     HRegion region = initHRegion(tableName, name.getMethodName(), CONF, COLUMN_FAMILY_BYTES);
219     Store store = region.getStore(COLUMN_FAMILY_BYTES);
220     // Get some random bytes.
221     byte [] value = Bytes.toBytes(name.getMethodName());
222     // Make a random put against our cf.
223     Put put = new Put(value);
224     put.add(COLUMN_FAMILY_BYTES, null, value);
225     // First put something in current memstore, which will be in snapshot after flusher.prepare()
226     region.put(put);
227     StoreFlushContext storeFlushCtx = store.createFlushContext(12345);
228     storeFlushCtx.prepare();
229     // Second put something in current memstore
230     put.add(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
231     region.put(put);
232     // Close with something in memstore and something in the snapshot.  Make sure all is cleared.
233     region.close();
234     assertEquals(0, region.getMemstoreSize().get());
235     HRegion.closeHRegion(region);
236   }
237 
238   /*
239    * This test is for verifying memstore snapshot size is correctly updated in case of rollback
240    * See HBASE-10845
241    */
242   @Test (timeout=60000)
243   public void testMemstoreSnapshotSize() throws IOException {
244     class MyFaultyHLog extends FaultyHLog {
245       StoreFlushContext storeFlushCtx;
246       public MyFaultyHLog(FileSystem fs, Path rootDir, String logName, Configuration conf)
247           throws IOException {
248         super(fs, rootDir, logName, conf);
249       }
250       
251       void setStoreFlushCtx(StoreFlushContext storeFlushCtx) {
252         this.storeFlushCtx = storeFlushCtx;
253       }
254 
255       @Override
256       public void sync(long txid) throws IOException {
257         storeFlushCtx.prepare();
258         super.sync(txid);
259       }
260     }
261     
262     FileSystem fs = FileSystem.get(CONF);
263     Path rootDir = new Path(dir + "testMemstoreSnapshotSize");
264     MyFaultyHLog faultyLog = new MyFaultyHLog(fs, rootDir, "testMemstoreSnapshotSize", CONF);
265     HRegion region = initHRegion(tableName, null, null, name.getMethodName(),
266       CONF, false, Durability.SYNC_WAL, faultyLog, COLUMN_FAMILY_BYTES);
267     
268     Store store = region.getStore(COLUMN_FAMILY_BYTES);
269     // Get some random bytes.
270     byte [] value = Bytes.toBytes(name.getMethodName());
271     faultyLog.setStoreFlushCtx(store.createFlushContext(12345));
272     
273     Put put = new Put(value);
274     put.add(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
275     faultyLog.setFailureType(FaultyHLog.FailureType.SYNC);
276 
277     boolean threwIOE = false;
278     try {
279       region.put(put);
280     } catch (IOException ioe) {
281       threwIOE = true;
282     } finally {
283       assertTrue("The regionserver should have thrown an exception", threwIOE);
284     }
285     long sz = store.getFlushableSize();
286     assertTrue("flushable size should be zero, but it is " + sz, sz == 0);
287     HRegion.closeHRegion(region);
288   }
289   
290   /**
291    * Test we do not lose data if we fail a flush and then close.
292    * Part of HBase-10466.  Tests the following from the issue description:
293    * "Bug 1: Wrong calculation of HRegion.memstoreSize: When a flush fails, data to be flushed is
294    * kept in each MemStore's snapshot and wait for next flush attempt to continue on it. But when
295    * the next flush succeeds, the counter of total memstore size in HRegion is always deduced by
296    * the sum of current memstore sizes instead of snapshots left from previous failed flush. This
297    * calculation is problematic that almost every time there is failed flush, HRegion.memstoreSize
298    * gets reduced by a wrong value. If region flush could not proceed for a couple cycles, the size
299    * in current memstore could be much larger than the snapshot. It's likely to drift memstoreSize
300    * much smaller than expected. In extreme case, if the error accumulates to even bigger than
301    * HRegion's memstore size limit, any further flush is skipped because flush does not do anything
302    * if memstoreSize is not larger than 0."
303    * @throws Exception
304    */
305   @Test (timeout=60000)
306   public void testFlushSizeAccounting() throws Exception {
307     final Configuration conf = HBaseConfiguration.create(CONF);
308     // Only retry once.
309     conf.setInt("hbase.hstore.flush.retries.number", 1);
310     final User user =
311       User.createUserForTesting(conf, this.name.getMethodName(), new String[]{"foo"});
312     // Inject our faulty LocalFileSystem
313     conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
314     user.runAs(new PrivilegedExceptionAction<Object>() {
315       @Override
316       public Object run() throws Exception {
317         // Make sure it worked (above is sensitive to caching details in hadoop core)
318         FileSystem fs = FileSystem.get(conf);
319         Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
320         FaultyFileSystem ffs = (FaultyFileSystem)fs;
321         HRegion region = null;
322         try {
323           // Initialize region
324           region = initHRegion(tableName, name.getMethodName(), conf, COLUMN_FAMILY_BYTES);
325           long size = region.getMemstoreSize().get();
326           Assert.assertEquals(0, size);
327           // Put one item into memstore.  Measure the size of one item in memstore.
328           Put p1 = new Put(row);
329           p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[])null));
330           region.put(p1);
331           final long sizeOfOnePut = region.getMemstoreSize().get();
332           // Fail a flush which means the current memstore will hang out as memstore 'snapshot'.
333           try {
334             LOG.info("Flushing");
335             region.flushcache();
336             Assert.fail("Didn't bubble up IOE!");
337           } catch (DroppedSnapshotException dse) {
338             // What we are expecting
339           }
340           // Make it so all writes succeed from here on out
341           ffs.fault.set(false);
342           // Check sizes.  Should still be the one entry.
343           Assert.assertEquals(sizeOfOnePut, region.getMemstoreSize().get());
344           // Now add two entries so that on this next flush that fails, we can see if we
345           // subtract the right amount, the snapshot size only.
346           Put p2 = new Put(row);
347           p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null));
348           p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
349           region.put(p2);
350           Assert.assertEquals(sizeOfOnePut * 3, region.getMemstoreSize().get());
351           // Do a successful flush.  It will clear the snapshot only.  Thats how flushes work.
352           // If already a snapshot, we clear it else we move the memstore to be snapshot and flush
353           // it
354           region.flushcache();
355           // Make sure our memory accounting is right.
356           Assert.assertEquals(sizeOfOnePut * 2, region.getMemstoreSize().get());
357         } finally {
358           HRegion.closeHRegion(region);
359         }
360         return null;
361       }
362     });
363     FileSystem.closeAllForUGI(user.getUGI());
364   }
365 
366   @Test
367   public void testCompactionAffectedByScanners() throws Exception {
368     byte[] family = Bytes.toBytes("family");
369     this.region = initHRegion(tableName, method, CONF, family);
370 
371     Put put = new Put(Bytes.toBytes("r1"));
372     put.add(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
373     region.put(put);
374     region.flushcache();
375 
376     Scan scan = new Scan();
377     scan.setMaxVersions(3);
378     // open the first scanner
379     RegionScanner scanner1 = region.getScanner(scan);
380 
381     Delete delete = new Delete(Bytes.toBytes("r1"));
382     region.delete(delete);
383     region.flushcache();
384 
385     // open the second scanner
386     RegionScanner scanner2 = region.getScanner(scan);
387 
388     List<Cell> results = new ArrayList<Cell>();
389 
390     System.out.println("Smallest read point:" + region.getSmallestReadPoint());
391 
392     // make a major compaction
393     region.compactStores(true);
394 
395     // open the third scanner
396     RegionScanner scanner3 = region.getScanner(scan);
397 
398     // get data from scanner 1, 2, 3 after major compaction
399     scanner1.next(results);
400     System.out.println(results);
401     assertEquals(1, results.size());
402 
403     results.clear();
404     scanner2.next(results);
405     System.out.println(results);
406     assertEquals(0, results.size());
407 
408     results.clear();
409     scanner3.next(results);
410     System.out.println(results);
411     assertEquals(0, results.size());
412   }
413 
414   @Test
415   public void testToShowNPEOnRegionScannerReseek() throws Exception {
416     byte[] family = Bytes.toBytes("family");
417     this.region = initHRegion(tableName, method, CONF, family);
418 
419     Put put = new Put(Bytes.toBytes("r1"));
420     put.add(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
421     region.put(put);
422     put = new Put(Bytes.toBytes("r2"));
423     put.add(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
424     region.put(put);
425     region.flushcache();
426 
427     Scan scan = new Scan();
428     scan.setMaxVersions(3);
429     // open the first scanner
430     RegionScanner scanner1 = region.getScanner(scan);
431 
432     System.out.println("Smallest read point:" + region.getSmallestReadPoint());
433 
434     region.compactStores(true);
435 
436     scanner1.reseek(Bytes.toBytes("r2"));
437     List<Cell> results = new ArrayList<Cell>();
438     scanner1.next(results);
439     Cell keyValue = results.get(0);
440     Assert.assertTrue(Bytes.compareTo(CellUtil.cloneRow(keyValue), Bytes.toBytes("r2")) == 0);
441     scanner1.close();
442   }
443 
444   @Test
445   public void testSkipRecoveredEditsReplay() throws Exception {
446     String method = "testSkipRecoveredEditsReplay";
447     TableName tableName = TableName.valueOf(method);
448     byte[] family = Bytes.toBytes("family");
449     this.region = initHRegion(tableName, method, CONF, family);
450     try {
451       Path regiondir = region.getRegionFileSystem().getRegionDir();
452       FileSystem fs = region.getRegionFileSystem().getFileSystem();
453       byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
454 
455       Path recoveredEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
456 
457       long maxSeqId = 1050;
458       long minSeqId = 1000;
459 
460       for (long i = minSeqId; i <= maxSeqId; i += 10) {
461         Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
462         fs.create(recoveredEdits);
463         HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, CONF);
464 
465         long time = System.nanoTime();
466         WALEdit edit = new WALEdit();
467         edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
468             .toBytes(i)));
469         writer.append(new HLog.Entry(new HLogKey(regionName, tableName, i, time,
470             HConstants.DEFAULT_CLUSTER_ID), edit));
471 
472         writer.close();
473       }
474       MonitoredTask status = TaskMonitor.get().createStatus(method);
475       Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
476       for (Store store : region.getStores().values()) {
477         maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), minSeqId - 1);
478       }
479       long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
480       assertEquals(maxSeqId, seqId);
481       Get get = new Get(row);
482       Result result = region.get(get);
483       for (long i = minSeqId; i <= maxSeqId; i += 10) {
484         List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i));
485         assertEquals(1, kvs.size());
486         assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0)));
487       }
488     } finally {
489       HRegion.closeHRegion(this.region);
490       this.region = null;
491     }
492   }
493 
494   @Test
495   public void testSkipRecoveredEditsReplaySomeIgnored() throws Exception {
496     String method = "testSkipRecoveredEditsReplaySomeIgnored";
497     TableName tableName = TableName.valueOf(method);
498     byte[] family = Bytes.toBytes("family");
499     this.region = initHRegion(tableName, method, CONF, family);
500     try {
501       Path regiondir = region.getRegionFileSystem().getRegionDir();
502       FileSystem fs = region.getRegionFileSystem().getFileSystem();
503       byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
504 
505       Path recoveredEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
506 
507       long maxSeqId = 1050;
508       long minSeqId = 1000;
509 
510       for (long i = minSeqId; i <= maxSeqId; i += 10) {
511         Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
512         fs.create(recoveredEdits);
513         HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, CONF);
514 
515         long time = System.nanoTime();
516         WALEdit edit = new WALEdit();
517         edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
518             .toBytes(i)));
519         writer.append(new HLog.Entry(new HLogKey(regionName, tableName, i, time,
520             HConstants.DEFAULT_CLUSTER_ID), edit));
521 
522         writer.close();
523       }
524       long recoverSeqId = 1030;
525       MonitoredTask status = TaskMonitor.get().createStatus(method);
526       Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
527       for (Store store : region.getStores().values()) {
528         maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), recoverSeqId - 1);
529       }
530       long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
531       assertEquals(maxSeqId, seqId);
532       Get get = new Get(row);
533       Result result = region.get(get);
534       for (long i = minSeqId; i <= maxSeqId; i += 10) {
535         List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i));
536         if (i < recoverSeqId) {
537           assertEquals(0, kvs.size());
538         } else {
539           assertEquals(1, kvs.size());
540           assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0)));
541         }
542       }
543     } finally {
544       HRegion.closeHRegion(this.region);
545       this.region = null;
546     }
547   }
548 
549   @Test
550   public void testSkipRecoveredEditsReplayAllIgnored() throws Exception {
551     byte[] family = Bytes.toBytes("family");
552     this.region = initHRegion(tableName, method, CONF, family);
553     try {
554       Path regiondir = region.getRegionFileSystem().getRegionDir();
555       FileSystem fs = region.getRegionFileSystem().getFileSystem();
556 
557       Path recoveredEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
558       for (int i = 1000; i < 1050; i += 10) {
559         Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
560         FSDataOutputStream dos = fs.create(recoveredEdits);
561         dos.writeInt(i);
562         dos.close();
563       }
564       long minSeqId = 2000;
565       Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", minSeqId - 1));
566       FSDataOutputStream dos = fs.create(recoveredEdits);
567       dos.close();
568 
569       Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
570       for (Store store : region.getStores().values()) {
571         maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), minSeqId);
572       }
573       long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, null);
574       assertEquals(minSeqId, seqId);
575     } finally {
576       HRegion.closeHRegion(this.region);
577       this.region = null;
578     }
579   }
580 
581   @Test
582   public void testSkipRecoveredEditsReplayTheLastFileIgnored() throws Exception {
583     String method = "testSkipRecoveredEditsReplayTheLastFileIgnored";
584     TableName tableName = TableName.valueOf(method);
585     byte[] family = Bytes.toBytes("family");
586     this.region = initHRegion(tableName, method, CONF, family);
587     try {
588       Path regiondir = region.getRegionFileSystem().getRegionDir();
589       FileSystem fs = region.getRegionFileSystem().getFileSystem();
590       byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
591 
592       assertEquals(0, region.getStoreFileList(
593         region.getStores().keySet().toArray(new byte[0][])).size());
594 
595       Path recoveredEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
596 
597       long maxSeqId = 1050;
598       long minSeqId = 1000;
599 
600       for (long i = minSeqId; i <= maxSeqId; i += 10) {
601         Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
602         fs.create(recoveredEdits);
603         HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, CONF);
604 
605         long time = System.nanoTime();
606         WALEdit edit = null;
607         if (i == maxSeqId) {
608           edit = WALEdit.createCompaction(region.getRegionInfo(),
609           CompactionDescriptor.newBuilder()
610           .setTableName(ByteString.copyFrom(tableName.getName()))
611           .setFamilyName(ByteString.copyFrom(regionName))
612           .setEncodedRegionName(ByteString.copyFrom(regionName))
613           .setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString())))
614           .setRegionName(ByteString.copyFrom(region.getRegionInfo().getRegionName()))
615           .build());
616         } else {
617           edit = new WALEdit();
618           edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
619             .toBytes(i)));
620         }
621         writer.append(new HLog.Entry(new HLogKey(regionName, tableName, i, time,
622             HConstants.DEFAULT_CLUSTER_ID), edit));
623         writer.close();
624       }
625 
626       long recoverSeqId = 1030;
627       Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
628       MonitoredTask status = TaskMonitor.get().createStatus(method);
629       for (Store store : region.getStores().values()) {
630         maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), recoverSeqId - 1);
631       }
632       long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
633       assertEquals(maxSeqId, seqId);
634 
635       // assert that the files are flushed
636       assertEquals(1, region.getStoreFileList(
637         region.getStores().keySet().toArray(new byte[0][])).size());
638 
639     } finally {
640       HRegion.closeHRegion(this.region);
641       this.region = null;
642     }  }
643 
644   @Test
645   public void testRecoveredEditsReplayCompaction() throws Exception {
646     String method = name.getMethodName();
647     TableName tableName = TableName.valueOf(method);
648     byte[] family = Bytes.toBytes("family");
649     this.region = initHRegion(tableName, method, CONF, family);
650     try {
651       Path regiondir = region.getRegionFileSystem().getRegionDir();
652       FileSystem fs = region.getRegionFileSystem().getFileSystem();
653       byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
654 
655       long maxSeqId = 3;
656       long minSeqId = 0;
657 
658       for (long i = minSeqId; i < maxSeqId; i++) {
659         Put put = new Put(Bytes.toBytes(i));
660         put.add(family, Bytes.toBytes(i), Bytes.toBytes(i));
661         region.put(put);
662         region.flushcache();
663       }
664 
665       // this will create a region with 3 files
666       assertEquals(3, region.getStore(family).getStorefilesCount());
667       List<Path> storeFiles = new ArrayList<Path>(3);
668       for (StoreFile sf : region.getStore(family).getStorefiles()) {
669         storeFiles.add(sf.getPath());
670       }
671 
672       // disable compaction completion
673       CONF.setBoolean("hbase.hstore.compaction.complete", false);
674       region.compactStores();
675 
676       // ensure that nothing changed
677       assertEquals(3, region.getStore(family).getStorefilesCount());
678 
679       // now find the compacted file, and manually add it to the recovered edits
680       Path tmpDir = region.getRegionFileSystem().getTempDir();
681       FileStatus[] files = FSUtils.listStatus(fs, tmpDir);
682       String errorMsg = "Expected to find 1 file in the region temp directory "
683           + "from the compaction, could not find any";
684       assertNotNull(errorMsg, files);
685       assertEquals(errorMsg, 1, files.length);
686       // move the file inside region dir
687       Path newFile = region.getRegionFileSystem().commitStoreFile(Bytes.toString(family),
688           files[0].getPath());
689 
690       CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(this.region
691           .getRegionInfo(), family, storeFiles, Lists.newArrayList(newFile), region
692           .getRegionFileSystem().getStoreDir(Bytes.toString(family)));
693 
694       HLogUtil.writeCompactionMarker(region.getLog(), this.region.getTableDesc(),
695           this.region.getRegionInfo(), compactionDescriptor, new AtomicLong(1));
696 
697       Path recoveredEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
698 
699       Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
700       fs.create(recoveredEdits);
701       HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, CONF);
702 
703       long time = System.nanoTime();
704 
705       writer.append(new HLog.Entry(new HLogKey(regionName, tableName, 10, time,
706           HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(region.getRegionInfo(),
707           compactionDescriptor)));
708       writer.close();
709 
710       // close the region now, and reopen again
711       region.getTableDesc();
712       region.getRegionInfo();
713       region.close();
714       region = HRegion.openHRegion(region, null);
715 
716       // now check whether we have only one store file, the compacted one
717       Collection<StoreFile> sfs = region.getStore(family).getStorefiles();
718       for (StoreFile sf : sfs) {
719         LOG.info(sf.getPath());
720       }
721       assertEquals(1, region.getStore(family).getStorefilesCount());
722       files = FSUtils.listStatus(fs, tmpDir);
723       assertTrue("Expected to find 0 files inside " + tmpDir, files == null || files.length == 0);
724 
725       for (long i = minSeqId; i < maxSeqId; i++) {
726         Get get = new Get(Bytes.toBytes(i));
727         Result result = region.get(get);
728         byte[] value = result.getValue(family, Bytes.toBytes(i));
729         assertArrayEquals(Bytes.toBytes(i), value);
730       }
731     } finally {
732       HRegion.closeHRegion(this.region);
733       this.region = null;
734     }
735   }
736 
737   @Test
738   public void testGetWhileRegionClose() throws IOException {
739     TableName tableName = TableName.valueOf(name.getMethodName());
740     Configuration hc = initSplit();
741     int numRows = 100;
742     byte[][] families = { fam1, fam2, fam3 };
743 
744     // Setting up region
745     String method = name.getMethodName();
746     this.region = initHRegion(tableName, method, hc, families);
747     try {
748       // Put data in region
749       final int startRow = 100;
750       putData(startRow, numRows, qual1, families);
751       putData(startRow, numRows, qual2, families);
752       putData(startRow, numRows, qual3, families);
753       final AtomicBoolean done = new AtomicBoolean(false);
754       final AtomicInteger gets = new AtomicInteger(0);
755       GetTillDoneOrException[] threads = new GetTillDoneOrException[10];
756       try {
757         // Set ten threads running concurrently getting from the region.
758         for (int i = 0; i < threads.length / 2; i++) {
759           threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets);
760           threads[i].setDaemon(true);
761           threads[i].start();
762         }
763         // Artificially make the condition by setting closing flag explicitly.
764         // I can't make the issue happen with a call to region.close().
765         this.region.closing.set(true);
766         for (int i = threads.length / 2; i < threads.length; i++) {
767           threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets);
768           threads[i].setDaemon(true);
769           threads[i].start();
770         }
771       } finally {
772         if (this.region != null) {
773           HRegion.closeHRegion(this.region);
774         }
775       }
776       done.set(true);
777       for (GetTillDoneOrException t : threads) {
778         try {
779           t.join();
780         } catch (InterruptedException e) {
781           e.printStackTrace();
782         }
783         if (t.e != null) {
784           LOG.info("Exception=" + t.e);
785           assertFalse("Found a NPE in " + t.getName(), t.e instanceof NullPointerException);
786         }
787       }
788     } finally {
789       HRegion.closeHRegion(this.region);
790       this.region = null;
791     }
792   }
793 
794   /*
795    * Thread that does get on single row until 'done' flag is flipped. If an
796    * exception causes us to fail, it records it.
797    */
798   class GetTillDoneOrException extends Thread {
799     private final Get g;
800     private final AtomicBoolean done;
801     private final AtomicInteger count;
802     private Exception e;
803 
804     GetTillDoneOrException(final int i, final byte[] r, final AtomicBoolean d, final AtomicInteger c) {
805       super("getter." + i);
806       this.g = new Get(r);
807       this.done = d;
808       this.count = c;
809     }
810 
811     @Override
812     public void run() {
813       while (!this.done.get()) {
814         try {
815           assertTrue(region.get(g).size() > 0);
816           this.count.incrementAndGet();
817         } catch (Exception e) {
818           this.e = e;
819           break;
820         }
821       }
822     }
823   }
824 
825   /*
826    * An involved filter test. Has multiple column families and deletes in mix.
827    */
828   @Test
829   public void testWeirdCacheBehaviour() throws Exception {
830     byte[] TABLE = Bytes.toBytes("testWeirdCacheBehaviour");
831     byte[][] FAMILIES = new byte[][] { Bytes.toBytes("trans-blob"), Bytes.toBytes("trans-type"),
832         Bytes.toBytes("trans-date"), Bytes.toBytes("trans-tags"), Bytes.toBytes("trans-group") };
833     this.region = initHRegion(TABLE, getName(), CONF, FAMILIES);
834     try {
835       String value = "this is the value";
836       String value2 = "this is some other value";
837       String keyPrefix1 = "prefix1";
838       String keyPrefix2 = "prefix2";
839       String keyPrefix3 = "prefix3";
840       putRows(this.region, 3, value, keyPrefix1);
841       putRows(this.region, 3, value, keyPrefix2);
842       putRows(this.region, 3, value, keyPrefix3);
843       putRows(this.region, 3, value2, keyPrefix1);
844       putRows(this.region, 3, value2, keyPrefix2);
845       putRows(this.region, 3, value2, keyPrefix3);
846       System.out.println("Checking values for key: " + keyPrefix1);
847       assertEquals("Got back incorrect number of rows from scan", 3,
848           getNumberOfRows(keyPrefix1, value2, this.region));
849       System.out.println("Checking values for key: " + keyPrefix2);
850       assertEquals("Got back incorrect number of rows from scan", 3,
851           getNumberOfRows(keyPrefix2, value2, this.region));
852       System.out.println("Checking values for key: " + keyPrefix3);
853       assertEquals("Got back incorrect number of rows from scan", 3,
854           getNumberOfRows(keyPrefix3, value2, this.region));
855       deleteColumns(this.region, value2, keyPrefix1);
856       deleteColumns(this.region, value2, keyPrefix2);
857       deleteColumns(this.region, value2, keyPrefix3);
858       System.out.println("Starting important checks.....");
859       assertEquals("Got back incorrect number of rows from scan: " + keyPrefix1, 0,
860           getNumberOfRows(keyPrefix1, value2, this.region));
861       assertEquals("Got back incorrect number of rows from scan: " + keyPrefix2, 0,
862           getNumberOfRows(keyPrefix2, value2, this.region));
863       assertEquals("Got back incorrect number of rows from scan: " + keyPrefix3, 0,
864           getNumberOfRows(keyPrefix3, value2, this.region));
865     } finally {
866       HRegion.closeHRegion(this.region);
867       this.region = null;
868     }
869   }
870 
871   @Test
872   public void testAppendWithReadOnlyTable() throws Exception {
873     byte[] TABLE = Bytes.toBytes("readOnlyTable");
874     this.region = initHRegion(TABLE, getName(), CONF, true, Bytes.toBytes("somefamily"));
875     boolean exceptionCaught = false;
876     Append append = new Append(Bytes.toBytes("somerow"));
877     append.setDurability(Durability.SKIP_WAL);
878     append.add(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"),
879         Bytes.toBytes("somevalue"));
880     try {
881       region.append(append);
882     } catch (IOException e) {
883       exceptionCaught = true;
884     } finally {
885       HRegion.closeHRegion(this.region);
886       this.region = null;
887     }
888     assertTrue(exceptionCaught == true);
889   }
890 
891   @Test
892   public void testIncrWithReadOnlyTable() throws Exception {
893     byte[] TABLE = Bytes.toBytes("readOnlyTable");
894     this.region = initHRegion(TABLE, getName(), CONF, true, Bytes.toBytes("somefamily"));
895     boolean exceptionCaught = false;
896     Increment inc = new Increment(Bytes.toBytes("somerow"));
897     inc.setDurability(Durability.SKIP_WAL);
898     inc.addColumn(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"), 1L);
899     try {
900       region.increment(inc);
901     } catch (IOException e) {
902       exceptionCaught = true;
903     } finally {
904       HRegion.closeHRegion(this.region);
905       this.region = null;
906     }
907     assertTrue(exceptionCaught == true);
908   }
909 
910   private void deleteColumns(HRegion r, String value, String keyPrefix) throws IOException {
911     InternalScanner scanner = buildScanner(keyPrefix, value, r);
912     int count = 0;
913     boolean more = false;
914     List<Cell> results = new ArrayList<Cell>();
915     do {
916       more = scanner.next(results);
917       if (results != null && !results.isEmpty())
918         count++;
919       else
920         break;
921       Delete delete = new Delete(CellUtil.cloneRow(results.get(0)));
922       delete.deleteColumn(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"));
923       r.delete(delete);
924       results.clear();
925     } while (more);
926     assertEquals("Did not perform correct number of deletes", 3, count);
927   }
928 
929   private int getNumberOfRows(String keyPrefix, String value, HRegion r) throws Exception {
930     InternalScanner resultScanner = buildScanner(keyPrefix, value, r);
931     int numberOfResults = 0;
932     List<Cell> results = new ArrayList<Cell>();
933     boolean more = false;
934     do {
935       more = resultScanner.next(results);
936       if (results != null && !results.isEmpty())
937         numberOfResults++;
938       else
939         break;
940       for (Cell kv : results) {
941         System.out.println("kv=" + kv.toString() + ", " + Bytes.toString(CellUtil.cloneValue(kv)));
942       }
943       results.clear();
944     } while (more);
945     return numberOfResults;
946   }
947 
948   private InternalScanner buildScanner(String keyPrefix, String value, HRegion r)
949       throws IOException {
950     // Defaults FilterList.Operator.MUST_PASS_ALL.
951     FilterList allFilters = new FilterList();
952     allFilters.addFilter(new PrefixFilter(Bytes.toBytes(keyPrefix)));
953     // Only return rows where this column value exists in the row.
954     SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("trans-tags"),
955         Bytes.toBytes("qual2"), CompareOp.EQUAL, Bytes.toBytes(value));
956     filter.setFilterIfMissing(true);
957     allFilters.addFilter(filter);
958     Scan scan = new Scan();
959     scan.addFamily(Bytes.toBytes("trans-blob"));
960     scan.addFamily(Bytes.toBytes("trans-type"));
961     scan.addFamily(Bytes.toBytes("trans-date"));
962     scan.addFamily(Bytes.toBytes("trans-tags"));
963     scan.addFamily(Bytes.toBytes("trans-group"));
964     scan.setFilter(allFilters);
965     return r.getScanner(scan);
966   }
967 
968   private void putRows(HRegion r, int numRows, String value, String key) throws IOException {
969     for (int i = 0; i < numRows; i++) {
970       String row = key + "_" + i/* UUID.randomUUID().toString() */;
971       System.out.println(String.format("Saving row: %s, with value %s", row, value));
972       Put put = new Put(Bytes.toBytes(row));
973       put.setDurability(Durability.SKIP_WAL);
974       put.add(Bytes.toBytes("trans-blob"), null, Bytes.toBytes("value for blob"));
975       put.add(Bytes.toBytes("trans-type"), null, Bytes.toBytes("statement"));
976       put.add(Bytes.toBytes("trans-date"), null, Bytes.toBytes("20090921010101999"));
977       put.add(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"), Bytes.toBytes(value));
978       put.add(Bytes.toBytes("trans-group"), null, Bytes.toBytes("adhocTransactionGroupId"));
979       r.put(put);
980     }
981   }
982 
983   @Test
984   public void testFamilyWithAndWithoutColon() throws Exception {
985     byte[] b = Bytes.toBytes(getName());
986     byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
987     this.region = initHRegion(b, getName(), CONF, cf);
988     try {
989       Put p = new Put(b);
990       byte[] cfwithcolon = Bytes.toBytes(COLUMN_FAMILY + ":");
991       p.add(cfwithcolon, cfwithcolon, cfwithcolon);
992       boolean exception = false;
993       try {
994         this.region.put(p);
995       } catch (NoSuchColumnFamilyException e) {
996         exception = true;
997       }
998       assertTrue(exception);
999     } finally {
1000       HRegion.closeHRegion(this.region);
1001       this.region = null;
1002     }
1003   }
1004 
1005   @Test
1006   public void testBatchPut() throws Exception {
1007     byte[] b = Bytes.toBytes(getName());
1008     byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
1009     byte[] qual = Bytes.toBytes("qual");
1010     byte[] val = Bytes.toBytes("val");
1011     this.region = initHRegion(b, getName(), CONF, cf);
1012     MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1013     try {
1014       long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
1015       metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
1016 
1017       LOG.info("First a batch put with all valid puts");
1018       final Put[] puts = new Put[10];
1019       for (int i = 0; i < 10; i++) {
1020         puts[i] = new Put(Bytes.toBytes("row_" + i));
1021         puts[i].add(cf, qual, val);
1022       }
1023 
1024       OperationStatus[] codes = this.region.batchMutate(puts);
1025       assertEquals(10, codes.length);
1026       for (int i = 0; i < 10; i++) {
1027         assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
1028       }
1029       metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
1030 
1031       LOG.info("Next a batch put with one invalid family");
1032       puts[5].add(Bytes.toBytes("BAD_CF"), qual, val);
1033       codes = this.region.batchMutate(puts);
1034       assertEquals(10, codes.length);
1035       for (int i = 0; i < 10; i++) {
1036         assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
1037             codes[i].getOperationStatusCode());
1038       }
1039 
1040       metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source);
1041 
1042       LOG.info("Next a batch put that has to break into two batches to avoid a lock");
1043       RowLock rowLock = region.getRowLock(Bytes.toBytes("row_2"));
1044 
1045       MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
1046       final AtomicReference<OperationStatus[]> retFromThread = new AtomicReference<OperationStatus[]>();
1047       TestThread putter = new TestThread(ctx) {
1048         @Override
1049         public void doWork() throws IOException {
1050           retFromThread.set(region.batchMutate(puts));
1051         }
1052       };
1053       LOG.info("...starting put thread while holding lock");
1054       ctx.addThread(putter);
1055       ctx.startThreads();
1056 
1057       LOG.info("...waiting for put thread to sync first time");
1058       long startWait = System.currentTimeMillis();
1059       while (metricsAssertHelper.getCounter("syncTimeNumOps", source) == syncs + 2) {
1060         Thread.sleep(100);
1061         if (System.currentTimeMillis() - startWait > 10000) {
1062           fail("Timed out waiting for thread to sync first minibatch");
1063         }
1064       }
1065       LOG.info("...releasing row lock, which should let put thread continue");
1066       rowLock.release();
1067       LOG.info("...joining on thread");
1068       ctx.stop();
1069       LOG.info("...checking that next batch was synced");
1070       metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 4, source);
1071       codes = retFromThread.get();
1072       for (int i = 0; i < 10; i++) {
1073         assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
1074             codes[i].getOperationStatusCode());
1075       }
1076 
1077     } finally {
1078       HRegion.closeHRegion(this.region);
1079       this.region = null;
1080     }
1081   }
1082 
1083   @Test
1084   public void testBatchPutWithTsSlop() throws Exception {
1085     byte[] b = Bytes.toBytes(getName());
1086     byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
1087     byte[] qual = Bytes.toBytes("qual");
1088     byte[] val = Bytes.toBytes("val");
1089 
1090     // add data with a timestamp that is too recent for range. Ensure assert
1091     CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
1092     this.region = initHRegion(b, getName(), CONF, cf);
1093 
1094     try {
1095       MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1096       long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
1097       metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
1098 
1099       final Put[] puts = new Put[10];
1100       for (int i = 0; i < 10; i++) {
1101         puts[i] = new Put(Bytes.toBytes("row_" + i), Long.MAX_VALUE - 100);
1102         puts[i].add(cf, qual, val);
1103       }
1104 
1105       OperationStatus[] codes = this.region.batchMutate(puts);
1106       assertEquals(10, codes.length);
1107       for (int i = 0; i < 10; i++) {
1108         assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, codes[i].getOperationStatusCode());
1109       }
1110       metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
1111 
1112     } finally {
1113       HRegion.closeHRegion(this.region);
1114       this.region = null;
1115     }
1116 
1117   }
1118 
1119   // ////////////////////////////////////////////////////////////////////////////
1120   // checkAndMutate tests
1121   // ////////////////////////////////////////////////////////////////////////////
1122   @Test
1123   public void testCheckAndMutate_WithEmptyRowValue() throws IOException {
1124     byte[] row1 = Bytes.toBytes("row1");
1125     byte[] fam1 = Bytes.toBytes("fam1");
1126     byte[] qf1 = Bytes.toBytes("qualifier");
1127     byte[] emptyVal = new byte[] {};
1128     byte[] val1 = Bytes.toBytes("value1");
1129     byte[] val2 = Bytes.toBytes("value2");
1130 
1131     // Setting up region
1132     String method = this.getName();
1133     this.region = initHRegion(tableName, method, CONF, fam1);
1134     try {
1135       // Putting empty data in key
1136       Put put = new Put(row1);
1137       put.add(fam1, qf1, emptyVal);
1138 
1139       // checkAndPut with empty value
1140       boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(
1141           emptyVal), put, true);
1142       assertTrue(res);
1143 
1144       // Putting data in key
1145       put = new Put(row1);
1146       put.add(fam1, qf1, val1);
1147 
1148       // checkAndPut with correct value
1149       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(emptyVal),
1150           put, true);
1151       assertTrue(res);
1152 
1153       // not empty anymore
1154       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(emptyVal),
1155           put, true);
1156       assertFalse(res);
1157 
1158       Delete delete = new Delete(row1);
1159       delete.deleteColumn(fam1, qf1);
1160       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(emptyVal),
1161           delete, true);
1162       assertFalse(res);
1163 
1164       put = new Put(row1);
1165       put.add(fam1, qf1, val2);
1166       // checkAndPut with correct value
1167       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(val1),
1168           put, true);
1169       assertTrue(res);
1170 
1171       // checkAndDelete with correct value
1172       delete = new Delete(row1);
1173       delete.deleteColumn(fam1, qf1);
1174       delete.deleteColumn(fam1, qf1);
1175       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(val2),
1176           delete, true);
1177       assertTrue(res);
1178 
1179       delete = new Delete(row1);
1180       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(emptyVal),
1181           delete, true);
1182       assertTrue(res);
1183 
1184       // checkAndPut looking for a null value
1185       put = new Put(row1);
1186       put.add(fam1, qf1, val1);
1187 
1188       res = region
1189           .checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new NullComparator(), put, true);
1190       assertTrue(res);
1191     } finally {
1192       HRegion.closeHRegion(this.region);
1193       this.region = null;
1194     }
1195   }
1196 
1197   @Test
1198   public void testCheckAndMutate_WithWrongValue() throws IOException {
1199     byte[] row1 = Bytes.toBytes("row1");
1200     byte[] fam1 = Bytes.toBytes("fam1");
1201     byte[] qf1 = Bytes.toBytes("qualifier");
1202     byte[] val1 = Bytes.toBytes("value1");
1203     byte[] val2 = Bytes.toBytes("value2");
1204 
1205     // Setting up region
1206     String method = this.getName();
1207     this.region = initHRegion(tableName, method, CONF, fam1);
1208     try {
1209       // Putting data in key
1210       Put put = new Put(row1);
1211       put.add(fam1, qf1, val1);
1212       region.put(put);
1213 
1214       // checkAndPut with wrong value
1215       boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(
1216           val2), put, true);
1217       assertEquals(false, res);
1218 
1219       // checkAndDelete with wrong value
1220       Delete delete = new Delete(row1);
1221       delete.deleteFamily(fam1);
1222       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(val2),
1223           delete, true);
1224       assertEquals(false, res);
1225     } finally {
1226       HRegion.closeHRegion(this.region);
1227       this.region = null;
1228     }
1229   }
1230 
1231   @Test
1232   public void testCheckAndMutate_WithCorrectValue() throws IOException {
1233     byte[] row1 = Bytes.toBytes("row1");
1234     byte[] fam1 = Bytes.toBytes("fam1");
1235     byte[] qf1 = Bytes.toBytes("qualifier");
1236     byte[] val1 = Bytes.toBytes("value1");
1237 
1238     // Setting up region
1239     String method = this.getName();
1240     this.region = initHRegion(tableName, method, CONF, fam1);
1241     try {
1242       // Putting data in key
1243       Put put = new Put(row1);
1244       put.add(fam1, qf1, val1);
1245       region.put(put);
1246 
1247       // checkAndPut with correct value
1248       boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(
1249           val1), put, true);
1250       assertEquals(true, res);
1251 
1252       // checkAndDelete with correct value
1253       Delete delete = new Delete(row1);
1254       delete.deleteColumn(fam1, qf1);
1255       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(val1),
1256           put, true);
1257       assertEquals(true, res);
1258     } finally {
1259       HRegion.closeHRegion(this.region);
1260       this.region = null;
1261     }
1262   }
1263 
1264   @Test
1265   public void testCheckAndMutate_WithNonEqualCompareOp() throws IOException {
1266     byte[] row1 = Bytes.toBytes("row1");
1267     byte[] fam1 = Bytes.toBytes("fam1");
1268     byte[] qf1 = Bytes.toBytes("qualifier");
1269     byte[] val1 = Bytes.toBytes("value1");
1270     byte[] val2 = Bytes.toBytes("value2");
1271     byte[] val3 = Bytes.toBytes("value3");
1272     byte[] val4 = Bytes.toBytes("value4");
1273 
1274     // Setting up region
1275     String method = this.getName();
1276     this.region = initHRegion(tableName, method, CONF, fam1);
1277     try {
1278       // Putting val3 in key
1279       Put put = new Put(row1);
1280       put.add(fam1, qf1, val3);
1281       region.put(put);
1282 
1283       // Test CompareOp.LESS: original = val3, compare with val3, fail
1284       boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOp.LESS,
1285           new BinaryComparator(val3), put, true);
1286       assertEquals(false, res);
1287 
1288       // Test CompareOp.LESS: original = val3, compare with val4, fail
1289       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.LESS,
1290           new BinaryComparator(val4), put, true);
1291       assertEquals(false, res);
1292 
1293       // Test CompareOp.LESS: original = val3, compare with val2,
1294       // succeed (now value = val2)
1295       put = new Put(row1);
1296       put.add(fam1, qf1, val2);
1297       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.LESS,
1298           new BinaryComparator(val2), put, true);
1299       assertEquals(true, res);
1300 
1301       // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val3, fail
1302       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.LESS_OR_EQUAL,
1303           new BinaryComparator(val3), put, true);
1304       assertEquals(false, res);
1305 
1306       // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val2,
1307       // succeed (value still = val2)
1308       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.LESS_OR_EQUAL,
1309           new BinaryComparator(val2), put, true);
1310       assertEquals(true, res);
1311 
1312       // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val1,
1313       // succeed (now value = val3)
1314       put = new Put(row1);
1315       put.add(fam1, qf1, val3);
1316       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.LESS_OR_EQUAL,
1317           new BinaryComparator(val1), put, true);
1318       assertEquals(true, res);
1319 
1320       // Test CompareOp.GREATER: original = val3, compare with val3, fail
1321       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.GREATER,
1322           new BinaryComparator(val3), put, true);
1323       assertEquals(false, res);
1324 
1325       // Test CompareOp.GREATER: original = val3, compare with val2, fail
1326       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.GREATER,
1327           new BinaryComparator(val2), put, true);
1328       assertEquals(false, res);
1329 
1330       // Test CompareOp.GREATER: original = val3, compare with val4,
1331       // succeed (now value = val2)
1332       put = new Put(row1);
1333       put.add(fam1, qf1, val2);
1334       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.GREATER,
1335           new BinaryComparator(val4), put, true);
1336       assertEquals(true, res);
1337 
1338       // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val1, fail
1339       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.GREATER_OR_EQUAL,
1340           new BinaryComparator(val1), put, true);
1341       assertEquals(false, res);
1342 
1343       // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val2,
1344       // succeed (value still = val2)
1345       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.GREATER_OR_EQUAL,
1346           new BinaryComparator(val2), put, true);
1347       assertEquals(true, res);
1348 
1349       // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val3, succeed
1350       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.GREATER_OR_EQUAL,
1351           new BinaryComparator(val3), put, true);
1352       assertEquals(true, res);
1353     } finally {
1354       HRegion.closeHRegion(this.region);
1355       this.region = null;
1356     }
1357   }
1358 
1359   @Test
1360   public void testCheckAndPut_ThatPutWasWritten() throws IOException {
1361     byte[] row1 = Bytes.toBytes("row1");
1362     byte[] fam1 = Bytes.toBytes("fam1");
1363     byte[] fam2 = Bytes.toBytes("fam2");
1364     byte[] qf1 = Bytes.toBytes("qualifier");
1365     byte[] val1 = Bytes.toBytes("value1");
1366     byte[] val2 = Bytes.toBytes("value2");
1367 
1368     byte[][] families = { fam1, fam2 };
1369 
1370     // Setting up region
1371     String method = this.getName();
1372     this.region = initHRegion(tableName, method, CONF, families);
1373     try {
1374       // Putting data in the key to check
1375       Put put = new Put(row1);
1376       put.add(fam1, qf1, val1);
1377       region.put(put);
1378 
1379       // Creating put to add
1380       long ts = System.currentTimeMillis();
1381       KeyValue kv = new KeyValue(row1, fam2, qf1, ts, KeyValue.Type.Put, val2);
1382       put = new Put(row1);
1383       put.add(kv);
1384 
1385       // checkAndPut with wrong value
1386       HStore store = (HStore) region.getStore(fam1);
1387       store.memstore.kvset.size();
1388 
1389       boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(
1390           val1), put, true);
1391       assertEquals(true, res);
1392       store.memstore.kvset.size();
1393 
1394       Get get = new Get(row1);
1395       get.addColumn(fam2, qf1);
1396       Cell[] actual = region.get(get).rawCells();
1397 
1398       Cell[] expected = { kv };
1399 
1400       assertEquals(expected.length, actual.length);
1401       for (int i = 0; i < actual.length; i++) {
1402         assertEquals(expected[i], actual[i]);
1403       }
1404     } finally {
1405       HRegion.closeHRegion(this.region);
1406       this.region = null;
1407     }
1408   }
1409 
1410   @Test
1411   public void testCheckAndPut_wrongRowInPut() throws IOException {
1412     TableName tableName = TableName.valueOf(name.getMethodName());
1413     this.region = initHRegion(tableName, this.getName(), CONF, COLUMNS);
1414     try {
1415       Put put = new Put(row2);
1416       put.add(fam1, qual1, value1);
1417       try {
1418         region.checkAndMutate(row, fam1, qual1, CompareOp.EQUAL,
1419             new BinaryComparator(value2), put, false);
1420         fail();
1421       } catch (org.apache.hadoop.hbase.DoNotRetryIOException expected) {
1422         // expected exception.
1423       }
1424     } finally {
1425       HRegion.closeHRegion(this.region);
1426       this.region = null;
1427     }
1428   }
1429 
1430   @Test
1431   public void testCheckAndDelete_ThatDeleteWasWritten() throws IOException {
1432     byte[] row1 = Bytes.toBytes("row1");
1433     byte[] fam1 = Bytes.toBytes("fam1");
1434     byte[] fam2 = Bytes.toBytes("fam2");
1435     byte[] qf1 = Bytes.toBytes("qualifier1");
1436     byte[] qf2 = Bytes.toBytes("qualifier2");
1437     byte[] qf3 = Bytes.toBytes("qualifier3");
1438     byte[] val1 = Bytes.toBytes("value1");
1439     byte[] val2 = Bytes.toBytes("value2");
1440     byte[] val3 = Bytes.toBytes("value3");
1441     byte[] emptyVal = new byte[] {};
1442 
1443     byte[][] families = { fam1, fam2 };
1444 
1445     // Setting up region
1446     String method = this.getName();
1447     this.region = initHRegion(tableName, method, CONF, families);
1448     try {
1449       // Put content
1450       Put put = new Put(row1);
1451       put.add(fam1, qf1, val1);
1452       region.put(put);
1453       Threads.sleep(2);
1454 
1455       put = new Put(row1);
1456       put.add(fam1, qf1, val2);
1457       put.add(fam2, qf1, val3);
1458       put.add(fam2, qf2, val2);
1459       put.add(fam2, qf3, val1);
1460       put.add(fam1, qf3, val1);
1461       region.put(put);
1462 
1463       // Multi-column delete
1464       Delete delete = new Delete(row1);
1465       delete.deleteColumn(fam1, qf1);
1466       delete.deleteColumn(fam2, qf1);
1467       delete.deleteColumn(fam1, qf3);
1468       boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(
1469           val2), delete, true);
1470       assertEquals(true, res);
1471 
1472       Get get = new Get(row1);
1473       get.addColumn(fam1, qf1);
1474       get.addColumn(fam1, qf3);
1475       get.addColumn(fam2, qf2);
1476       Result r = region.get(get);
1477       assertEquals(2, r.size());
1478       assertArrayEquals(val1, r.getValue(fam1, qf1));
1479       assertArrayEquals(val2, r.getValue(fam2, qf2));
1480 
1481       // Family delete
1482       delete = new Delete(row1);
1483       delete.deleteFamily(fam2);
1484       res = region.checkAndMutate(row1, fam2, qf1, CompareOp.EQUAL, new BinaryComparator(emptyVal),
1485           delete, true);
1486       assertEquals(true, res);
1487 
1488       get = new Get(row1);
1489       r = region.get(get);
1490       assertEquals(1, r.size());
1491       assertArrayEquals(val1, r.getValue(fam1, qf1));
1492 
1493       // Row delete
1494       delete = new Delete(row1);
1495       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(val1),
1496           delete, true);
1497       assertEquals(true, res);
1498       get = new Get(row1);
1499       r = region.get(get);
1500       assertEquals(0, r.size());
1501     } finally {
1502       HRegion.closeHRegion(this.region);
1503       this.region = null;
1504     }
1505   }
1506 
1507   // ////////////////////////////////////////////////////////////////////////////
1508   // Delete tests
1509   // ////////////////////////////////////////////////////////////////////////////
1510   @Test
1511   public void testDelete_multiDeleteColumn() throws IOException {
1512     byte[] row1 = Bytes.toBytes("row1");
1513     byte[] fam1 = Bytes.toBytes("fam1");
1514     byte[] qual = Bytes.toBytes("qualifier");
1515     byte[] value = Bytes.toBytes("value");
1516 
1517     Put put = new Put(row1);
1518     put.add(fam1, qual, 1, value);
1519     put.add(fam1, qual, 2, value);
1520 
1521     String method = this.getName();
1522     this.region = initHRegion(tableName, method, CONF, fam1);
1523     try {
1524       region.put(put);
1525 
1526       // We do support deleting more than 1 'latest' version
1527       Delete delete = new Delete(row1);
1528       delete.deleteColumn(fam1, qual);
1529       delete.deleteColumn(fam1, qual);
1530       region.delete(delete);
1531 
1532       Get get = new Get(row1);
1533       get.addFamily(fam1);
1534       Result r = region.get(get);
1535       assertEquals(0, r.size());
1536     } finally {
1537       HRegion.closeHRegion(this.region);
1538       this.region = null;
1539     }
1540   }
1541 
1542   @Test
1543   public void testDelete_CheckFamily() throws IOException {
1544     byte[] row1 = Bytes.toBytes("row1");
1545     byte[] fam1 = Bytes.toBytes("fam1");
1546     byte[] fam2 = Bytes.toBytes("fam2");
1547     byte[] fam3 = Bytes.toBytes("fam3");
1548     byte[] fam4 = Bytes.toBytes("fam4");
1549 
1550     // Setting up region
1551     String method = this.getName();
1552     this.region = initHRegion(tableName, method, CONF, fam1, fam2, fam3);
1553     try {
1554       List<Cell> kvs = new ArrayList<Cell>();
1555       kvs.add(new KeyValue(row1, fam4, null, null));
1556 
1557       // testing existing family
1558       byte[] family = fam2;
1559       try {
1560         NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<byte[], List<Cell>>(
1561             Bytes.BYTES_COMPARATOR);
1562         deleteMap.put(family, kvs);
1563         region.delete(deleteMap, Durability.SYNC_WAL);
1564       } catch (Exception e) {
1565         assertTrue("Family " + new String(family) + " does not exist", false);
1566       }
1567 
1568       // testing non existing family
1569       boolean ok = false;
1570       family = fam4;
1571       try {
1572         NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<byte[], List<Cell>>(
1573             Bytes.BYTES_COMPARATOR);
1574         deleteMap.put(family, kvs);
1575         region.delete(deleteMap, Durability.SYNC_WAL);
1576       } catch (Exception e) {
1577         ok = true;
1578       }
1579       assertEquals("Family " + new String(family) + " does exist", true, ok);
1580     } finally {
1581       HRegion.closeHRegion(this.region);
1582       this.region = null;
1583     }
1584   }
1585 
1586   @Test
1587   public void testDelete_mixed() throws IOException, InterruptedException {
1588     byte[] fam = Bytes.toBytes("info");
1589     byte[][] families = { fam };
1590     String method = this.getName();
1591     this.region = initHRegion(tableName, method, CONF, families);
1592     try {
1593       EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
1594 
1595       byte[] row = Bytes.toBytes("table_name");
1596       // column names
1597       byte[] serverinfo = Bytes.toBytes("serverinfo");
1598       byte[] splitA = Bytes.toBytes("splitA");
1599       byte[] splitB = Bytes.toBytes("splitB");
1600 
1601       // add some data:
1602       Put put = new Put(row);
1603       put.add(fam, splitA, Bytes.toBytes("reference_A"));
1604       region.put(put);
1605 
1606       put = new Put(row);
1607       put.add(fam, splitB, Bytes.toBytes("reference_B"));
1608       region.put(put);
1609 
1610       put = new Put(row);
1611       put.add(fam, serverinfo, Bytes.toBytes("ip_address"));
1612       region.put(put);
1613 
1614       // ok now delete a split:
1615       Delete delete = new Delete(row);
1616       delete.deleteColumns(fam, splitA);
1617       region.delete(delete);
1618 
1619       // assert some things:
1620       Get get = new Get(row).addColumn(fam, serverinfo);
1621       Result result = region.get(get);
1622       assertEquals(1, result.size());
1623 
1624       get = new Get(row).addColumn(fam, splitA);
1625       result = region.get(get);
1626       assertEquals(0, result.size());
1627 
1628       get = new Get(row).addColumn(fam, splitB);
1629       result = region.get(get);
1630       assertEquals(1, result.size());
1631 
1632       // Assert that after a delete, I can put.
1633       put = new Put(row);
1634       put.add(fam, splitA, Bytes.toBytes("reference_A"));
1635       region.put(put);
1636       get = new Get(row);
1637       result = region.get(get);
1638       assertEquals(3, result.size());
1639 
1640       // Now delete all... then test I can add stuff back
1641       delete = new Delete(row);
1642       region.delete(delete);
1643       assertEquals(0, region.get(get).size());
1644 
1645       region.put(new Put(row).add(fam, splitA, Bytes.toBytes("reference_A")));
1646       result = region.get(get);
1647       assertEquals(1, result.size());
1648     } finally {
1649       HRegion.closeHRegion(this.region);
1650       this.region = null;
1651     }
1652   }
1653 
1654   @Test
1655   public void testDeleteRowWithFutureTs() throws IOException {
1656     byte[] fam = Bytes.toBytes("info");
1657     byte[][] families = { fam };
1658     String method = this.getName();
1659     this.region = initHRegion(tableName, method, CONF, families);
1660     try {
1661       byte[] row = Bytes.toBytes("table_name");
1662       // column names
1663       byte[] serverinfo = Bytes.toBytes("serverinfo");
1664 
1665       // add data in the far future
1666       Put put = new Put(row);
1667       put.add(fam, serverinfo, HConstants.LATEST_TIMESTAMP - 5, Bytes.toBytes("value"));
1668       region.put(put);
1669 
1670       // now delete something in the present
1671       Delete delete = new Delete(row);
1672       region.delete(delete);
1673 
1674       // make sure we still see our data
1675       Get get = new Get(row).addColumn(fam, serverinfo);
1676       Result result = region.get(get);
1677       assertEquals(1, result.size());
1678 
1679       // delete the future row
1680       delete = new Delete(row, HConstants.LATEST_TIMESTAMP - 3);
1681       region.delete(delete);
1682 
1683       // make sure it is gone
1684       get = new Get(row).addColumn(fam, serverinfo);
1685       result = region.get(get);
1686       assertEquals(0, result.size());
1687     } finally {
1688       HRegion.closeHRegion(this.region);
1689       this.region = null;
1690     }
1691   }
1692 
1693   /**
1694    * Tests that the special LATEST_TIMESTAMP option for puts gets replaced by
1695    * the actual timestamp
1696    */
1697   @Test
1698   public void testPutWithLatestTS() throws IOException {
1699     byte[] fam = Bytes.toBytes("info");
1700     byte[][] families = { fam };
1701     String method = this.getName();
1702     this.region = initHRegion(tableName, method, CONF, families);
1703     try {
1704       byte[] row = Bytes.toBytes("row1");
1705       // column names
1706       byte[] qual = Bytes.toBytes("qual");
1707 
1708       // add data with LATEST_TIMESTAMP, put without WAL
1709       Put put = new Put(row);
1710       put.add(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value"));
1711       region.put(put);
1712 
1713       // Make sure it shows up with an actual timestamp
1714       Get get = new Get(row).addColumn(fam, qual);
1715       Result result = region.get(get);
1716       assertEquals(1, result.size());
1717       Cell kv = result.rawCells()[0];
1718       LOG.info("Got: " + kv);
1719       assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp",
1720           kv.getTimestamp() != HConstants.LATEST_TIMESTAMP);
1721 
1722       // Check same with WAL enabled (historically these took different
1723       // code paths, so check both)
1724       row = Bytes.toBytes("row2");
1725       put = new Put(row);
1726       put.add(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value"));
1727       region.put(put);
1728 
1729       // Make sure it shows up with an actual timestamp
1730       get = new Get(row).addColumn(fam, qual);
1731       result = region.get(get);
1732       assertEquals(1, result.size());
1733       kv = result.rawCells()[0];
1734       LOG.info("Got: " + kv);
1735       assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp",
1736           kv.getTimestamp() != HConstants.LATEST_TIMESTAMP);
1737     } finally {
1738       HRegion.closeHRegion(this.region);
1739       this.region = null;
1740     }
1741 
1742   }
1743 
1744   /**
1745    * Tests that there is server-side filtering for invalid timestamp upper
1746    * bound. Note that the timestamp lower bound is automatically handled for us
1747    * by the TTL field.
1748    */
1749   @Test
1750   public void testPutWithTsSlop() throws IOException {
1751     byte[] fam = Bytes.toBytes("info");
1752     byte[][] families = { fam };
1753     String method = this.getName();
1754 
1755     // add data with a timestamp that is too recent for range. Ensure assert
1756     CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
1757     this.region = initHRegion(tableName, method, CONF, families);
1758     boolean caughtExcep = false;
1759     try {
1760       try {
1761         // no TS specified == use latest. should not error
1762         region.put(new Put(row).add(fam, Bytes.toBytes("qual"), Bytes.toBytes("value")));
1763         // TS out of range. should error
1764         region.put(new Put(row).add(fam, Bytes.toBytes("qual"), System.currentTimeMillis() + 2000,
1765             Bytes.toBytes("value")));
1766         fail("Expected IOE for TS out of configured timerange");
1767       } catch (FailedSanityCheckException ioe) {
1768         LOG.debug("Received expected exception", ioe);
1769         caughtExcep = true;
1770       }
1771       assertTrue("Should catch FailedSanityCheckException", caughtExcep);
1772     } finally {
1773       HRegion.closeHRegion(this.region);
1774       this.region = null;
1775     }
1776   }
1777 
1778   @Test
1779   public void testScanner_DeleteOneFamilyNotAnother() throws IOException {
1780     byte[] fam1 = Bytes.toBytes("columnA");
1781     byte[] fam2 = Bytes.toBytes("columnB");
1782     this.region = initHRegion(tableName, getName(), CONF, fam1, fam2);
1783     try {
1784       byte[] rowA = Bytes.toBytes("rowA");
1785       byte[] rowB = Bytes.toBytes("rowB");
1786 
1787       byte[] value = Bytes.toBytes("value");
1788 
1789       Delete delete = new Delete(rowA);
1790       delete.deleteFamily(fam1);
1791 
1792       region.delete(delete);
1793 
1794       // now create data.
1795       Put put = new Put(rowA);
1796       put.add(fam2, null, value);
1797       region.put(put);
1798 
1799       put = new Put(rowB);
1800       put.add(fam1, null, value);
1801       put.add(fam2, null, value);
1802       region.put(put);
1803 
1804       Scan scan = new Scan();
1805       scan.addFamily(fam1).addFamily(fam2);
1806       InternalScanner s = region.getScanner(scan);
1807       List<Cell> results = new ArrayList<Cell>();
1808       s.next(results);
1809       assertTrue(CellUtil.matchingRow(results.get(0), rowA));
1810 
1811       results.clear();
1812       s.next(results);
1813       assertTrue(CellUtil.matchingRow(results.get(0), rowB));
1814     } finally {
1815       HRegion.closeHRegion(this.region);
1816       this.region = null;
1817     }
1818   }
1819 
1820   @Test
1821   public void testDeleteColumns_PostInsert() throws IOException, InterruptedException {
1822     Delete delete = new Delete(row);
1823     delete.deleteColumns(fam1, qual1);
1824     doTestDelete_AndPostInsert(delete);
1825   }
1826 
1827   @Test
1828   public void testDeleteFamily_PostInsert() throws IOException, InterruptedException {
1829     Delete delete = new Delete(row);
1830     delete.deleteFamily(fam1);
1831     doTestDelete_AndPostInsert(delete);
1832   }
1833 
1834   public void doTestDelete_AndPostInsert(Delete delete) throws IOException, InterruptedException {
1835     TableName tableName = TableName.valueOf(name.getMethodName());
1836     this.region = initHRegion(tableName, getName(), CONF, fam1);
1837     try {
1838       EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
1839       Put put = new Put(row);
1840       put.add(fam1, qual1, value1);
1841       region.put(put);
1842 
1843       // now delete the value:
1844       region.delete(delete);
1845 
1846       // ok put data:
1847       put = new Put(row);
1848       put.add(fam1, qual1, value2);
1849       region.put(put);
1850 
1851       // ok get:
1852       Get get = new Get(row);
1853       get.addColumn(fam1, qual1);
1854 
1855       Result r = region.get(get);
1856       assertEquals(1, r.size());
1857       assertArrayEquals(value2, r.getValue(fam1, qual1));
1858 
1859       // next:
1860       Scan scan = new Scan(row);
1861       scan.addColumn(fam1, qual1);
1862       InternalScanner s = region.getScanner(scan);
1863 
1864       List<Cell> results = new ArrayList<Cell>();
1865       assertEquals(false, s.next(results));
1866       assertEquals(1, results.size());
1867       Cell kv = results.get(0);
1868 
1869       assertArrayEquals(value2, CellUtil.cloneValue(kv));
1870       assertArrayEquals(fam1, CellUtil.cloneFamily(kv));
1871       assertArrayEquals(qual1, CellUtil.cloneQualifier(kv));
1872       assertArrayEquals(row, CellUtil.cloneRow(kv));
1873     } finally {
1874       HRegion.closeHRegion(this.region);
1875       this.region = null;
1876     }
1877   }
1878 
1879   @Test
1880   public void testDelete_CheckTimestampUpdated() throws IOException {
1881     TableName tableName = TableName.valueOf(name.getMethodName());
1882     byte[] row1 = Bytes.toBytes("row1");
1883     byte[] col1 = Bytes.toBytes("col1");
1884     byte[] col2 = Bytes.toBytes("col2");
1885     byte[] col3 = Bytes.toBytes("col3");
1886 
1887     // Setting up region
1888     String method = this.getName();
1889     this.region = initHRegion(tableName, method, CONF, fam1);
1890     try {
1891       // Building checkerList
1892       List<Cell> kvs = new ArrayList<Cell>();
1893       kvs.add(new KeyValue(row1, fam1, col1, null));
1894       kvs.add(new KeyValue(row1, fam1, col2, null));
1895       kvs.add(new KeyValue(row1, fam1, col3, null));
1896 
1897       NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<byte[], List<Cell>>(
1898           Bytes.BYTES_COMPARATOR);
1899       deleteMap.put(fam1, kvs);
1900       region.delete(deleteMap, Durability.SYNC_WAL);
1901 
1902       // extract the key values out the memstore:
1903       // This is kinda hacky, but better than nothing...
1904       long now = System.currentTimeMillis();
1905       KeyValue firstKv = ((HStore) region.getStore(fam1)).memstore.kvset.first();
1906       assertTrue(firstKv.getTimestamp() <= now);
1907       now = firstKv.getTimestamp();
1908       for (KeyValue kv : ((HStore) region.getStore(fam1)).memstore.kvset) {
1909         assertTrue(kv.getTimestamp() <= now);
1910         now = kv.getTimestamp();
1911       }
1912     } finally {
1913       HRegion.closeHRegion(this.region);
1914       this.region = null;
1915     }
1916   }
1917 
1918   // ////////////////////////////////////////////////////////////////////////////
1919   // Get tests
1920   // ////////////////////////////////////////////////////////////////////////////
1921   @Test
1922   public void testGet_FamilyChecker() throws IOException {
1923     byte[] row1 = Bytes.toBytes("row1");
1924     byte[] fam1 = Bytes.toBytes("fam1");
1925     byte[] fam2 = Bytes.toBytes("False");
1926     byte[] col1 = Bytes.toBytes("col1");
1927 
1928     // Setting up region
1929     String method = this.getName();
1930     this.region = initHRegion(tableName, method, CONF, fam1);
1931     try {
1932       Get get = new Get(row1);
1933       get.addColumn(fam2, col1);
1934 
1935       // Test
1936       try {
1937         region.get(get);
1938       } catch (org.apache.hadoop.hbase.DoNotRetryIOException e) {
1939         assertFalse(false);
1940         return;
1941       }
1942       assertFalse(true);
1943     } finally {
1944       HRegion.closeHRegion(this.region);
1945       this.region = null;
1946     }
1947   }
1948 
1949   @Test
1950   public void testGet_Basic() throws IOException {
1951     byte[] row1 = Bytes.toBytes("row1");
1952     byte[] fam1 = Bytes.toBytes("fam1");
1953     byte[] col1 = Bytes.toBytes("col1");
1954     byte[] col2 = Bytes.toBytes("col2");
1955     byte[] col3 = Bytes.toBytes("col3");
1956     byte[] col4 = Bytes.toBytes("col4");
1957     byte[] col5 = Bytes.toBytes("col5");
1958 
1959     // Setting up region
1960     String method = this.getName();
1961     this.region = initHRegion(tableName, method, CONF, fam1);
1962     try {
1963       // Add to memstore
1964       Put put = new Put(row1);
1965       put.add(fam1, col1, null);
1966       put.add(fam1, col2, null);
1967       put.add(fam1, col3, null);
1968       put.add(fam1, col4, null);
1969       put.add(fam1, col5, null);
1970       region.put(put);
1971 
1972       Get get = new Get(row1);
1973       get.addColumn(fam1, col2);
1974       get.addColumn(fam1, col4);
1975       // Expected result
1976       KeyValue kv1 = new KeyValue(row1, fam1, col2);
1977       KeyValue kv2 = new KeyValue(row1, fam1, col4);
1978       KeyValue[] expected = { kv1, kv2 };
1979 
1980       // Test
1981       Result res = region.get(get);
1982       assertEquals(expected.length, res.size());
1983       for (int i = 0; i < res.size(); i++) {
1984         assertTrue(CellUtil.matchingRow(expected[i], res.rawCells()[i]));
1985         assertTrue(CellUtil.matchingFamily(expected[i], res.rawCells()[i]));
1986         assertTrue(CellUtil.matchingQualifier(expected[i], res.rawCells()[i]));
1987       }
1988 
1989       // Test using a filter on a Get
1990       Get g = new Get(row1);
1991       final int count = 2;
1992       g.setFilter(new ColumnCountGetFilter(count));
1993       res = region.get(g);
1994       assertEquals(count, res.size());
1995     } finally {
1996       HRegion.closeHRegion(this.region);
1997       this.region = null;
1998     }
1999   }
2000 
2001   @Test
2002   public void testGet_Empty() throws IOException {
2003     byte[] row = Bytes.toBytes("row");
2004     byte[] fam = Bytes.toBytes("fam");
2005 
2006     String method = this.getName();
2007     this.region = initHRegion(tableName, method, CONF, fam);
2008     try {
2009       Get get = new Get(row);
2010       get.addFamily(fam);
2011       Result r = region.get(get);
2012 
2013       assertTrue(r.isEmpty());
2014     } finally {
2015       HRegion.closeHRegion(this.region);
2016       this.region = null;
2017     }
2018   }
2019 
2020   // ////////////////////////////////////////////////////////////////////////////
2021   // Merge test
2022   // ////////////////////////////////////////////////////////////////////////////
2023   @Test
2024   public void testMerge() throws IOException {
2025     byte[][] families = { fam1, fam2, fam3 };
2026     Configuration hc = initSplit();
2027     // Setting up region
2028     String method = this.getName();
2029     this.region = initHRegion(tableName, method, hc, families);
2030     try {
2031       LOG.info("" + HBaseTestCase.addContent(region, fam3));
2032       region.flushcache();
2033       region.compactStores();
2034       byte[] splitRow = region.checkSplit();
2035       assertNotNull(splitRow);
2036       LOG.info("SplitRow: " + Bytes.toString(splitRow));
2037       HRegion[] subregions = splitRegion(region, splitRow);
2038       try {
2039         // Need to open the regions.
2040         for (int i = 0; i < subregions.length; i++) {
2041           HRegion.openHRegion(subregions[i], null);
2042           subregions[i].compactStores();
2043         }
2044         Path oldRegionPath = region.getRegionFileSystem().getRegionDir();
2045         Path oldRegion1 = subregions[0].getRegionFileSystem().getRegionDir();
2046         Path oldRegion2 = subregions[1].getRegionFileSystem().getRegionDir();
2047         long startTime = System.currentTimeMillis();
2048         region = HRegion.mergeAdjacent(subregions[0], subregions[1]);
2049         LOG.info("Merge regions elapsed time: "
2050             + ((System.currentTimeMillis() - startTime) / 1000.0));
2051         FILESYSTEM.delete(oldRegion1, true);
2052         FILESYSTEM.delete(oldRegion2, true);
2053         FILESYSTEM.delete(oldRegionPath, true);
2054         LOG.info("splitAndMerge completed.");
2055       } finally {
2056         for (int i = 0; i < subregions.length; i++) {
2057           try {
2058             HRegion.closeHRegion(subregions[i]);
2059           } catch (IOException e) {
2060             // Ignore.
2061           }
2062         }
2063       }
2064     } finally {
2065       HRegion.closeHRegion(this.region);
2066       this.region = null;
2067     }
2068   }
2069 
2070   /**
2071    * @param parent
2072    *          Region to split.
2073    * @param midkey
2074    *          Key to split around.
2075    * @return The Regions we created.
2076    * @throws IOException
2077    */
2078   HRegion[] splitRegion(final HRegion parent, final byte[] midkey) throws IOException {
2079     PairOfSameType<HRegion> result = null;
2080     SplitTransaction st = new SplitTransaction(parent, midkey);
2081     // If prepare does not return true, for some reason -- logged inside in
2082     // the prepare call -- we are not ready to split just now. Just return.
2083     if (!st.prepare())
2084       return null;
2085     try {
2086       result = st.execute(null, null);
2087     } catch (IOException ioe) {
2088       try {
2089         LOG.info("Running rollback of failed split of " + parent.getRegionNameAsString() + "; "
2090             + ioe.getMessage());
2091         st.rollback(null, null);
2092         LOG.info("Successful rollback of failed split of " + parent.getRegionNameAsString());
2093         return null;
2094       } catch (RuntimeException e) {
2095         // If failed rollback, kill this server to avoid having a hole in table.
2096         LOG.info("Failed rollback of failed split of " + parent.getRegionNameAsString()
2097             + " -- aborting server", e);
2098       }
2099     }
2100     return new HRegion[] { result.getFirst(), result.getSecond() };
2101   }
2102 
2103   // ////////////////////////////////////////////////////////////////////////////
2104   // Scanner tests
2105   // ////////////////////////////////////////////////////////////////////////////
2106   @Test
2107   public void testGetScanner_WithOkFamilies() throws IOException {
2108     byte[] fam1 = Bytes.toBytes("fam1");
2109     byte[] fam2 = Bytes.toBytes("fam2");
2110 
2111     byte[][] families = { fam1, fam2 };
2112 
2113     // Setting up region
2114     String method = this.getName();
2115     this.region = initHRegion(tableName, method, CONF, families);
2116     try {
2117       Scan scan = new Scan();
2118       scan.addFamily(fam1);
2119       scan.addFamily(fam2);
2120       try {
2121         region.getScanner(scan);
2122       } catch (Exception e) {
2123         assertTrue("Families could not be found in Region", false);
2124       }
2125     } finally {
2126       HRegion.closeHRegion(this.region);
2127       this.region = null;
2128     }
2129   }
2130 
2131   @Test
2132   public void testGetScanner_WithNotOkFamilies() throws IOException {
2133     byte[] fam1 = Bytes.toBytes("fam1");
2134     byte[] fam2 = Bytes.toBytes("fam2");
2135 
2136     byte[][] families = { fam1 };
2137 
2138     // Setting up region
2139     String method = this.getName();
2140     this.region = initHRegion(tableName, method, CONF, families);
2141     try {
2142       Scan scan = new Scan();
2143       scan.addFamily(fam2);
2144       boolean ok = false;
2145       try {
2146         region.getScanner(scan);
2147       } catch (Exception e) {
2148         ok = true;
2149       }
2150       assertTrue("Families could not be found in Region", ok);
2151     } finally {
2152       HRegion.closeHRegion(this.region);
2153       this.region = null;
2154     }
2155   }
2156 
2157   @Test
2158   public void testGetScanner_WithNoFamilies() throws IOException {
2159     byte[] row1 = Bytes.toBytes("row1");
2160     byte[] fam1 = Bytes.toBytes("fam1");
2161     byte[] fam2 = Bytes.toBytes("fam2");
2162     byte[] fam3 = Bytes.toBytes("fam3");
2163     byte[] fam4 = Bytes.toBytes("fam4");
2164 
2165     byte[][] families = { fam1, fam2, fam3, fam4 };
2166 
2167     // Setting up region
2168     String method = this.getName();
2169     this.region = initHRegion(tableName, method, CONF, families);
2170     try {
2171 
2172       // Putting data in Region
2173       Put put = new Put(row1);
2174       put.add(fam1, null, null);
2175       put.add(fam2, null, null);
2176       put.add(fam3, null, null);
2177       put.add(fam4, null, null);
2178       region.put(put);
2179 
2180       Scan scan = null;
2181       HRegion.RegionScannerImpl is = null;
2182 
2183       // Testing to see how many scanners that is produced by getScanner,
2184       // starting
2185       // with known number, 2 - current = 1
2186       scan = new Scan();
2187       scan.addFamily(fam2);
2188       scan.addFamily(fam4);
2189       is = (RegionScannerImpl) region.getScanner(scan);
2190       assertEquals(1, ((RegionScannerImpl) is).storeHeap.getHeap().size());
2191 
2192       scan = new Scan();
2193       is = (RegionScannerImpl) region.getScanner(scan);
2194       assertEquals(families.length - 1, ((RegionScannerImpl) is).storeHeap.getHeap().size());
2195     } finally {
2196       HRegion.closeHRegion(this.region);
2197       this.region = null;
2198     }
2199   }
2200 
2201   /**
2202    * This method tests https://issues.apache.org/jira/browse/HBASE-2516.
2203    * 
2204    * @throws IOException
2205    */
2206   @Test
2207   public void testGetScanner_WithRegionClosed() throws IOException {
2208     byte[] fam1 = Bytes.toBytes("fam1");
2209     byte[] fam2 = Bytes.toBytes("fam2");
2210 
2211     byte[][] families = { fam1, fam2 };
2212 
2213     // Setting up region
2214     String method = this.getName();
2215     try {
2216       this.region = initHRegion(tableName, method, CONF, families);
2217     } catch (IOException e) {
2218       e.printStackTrace();
2219       fail("Got IOException during initHRegion, " + e.getMessage());
2220     }
2221     try {
2222       region.closed.set(true);
2223       try {
2224         region.getScanner(null);
2225         fail("Expected to get an exception during getScanner on a region that is closed");
2226       } catch (NotServingRegionException e) {
2227         // this is the correct exception that is expected
2228       } catch (IOException e) {
2229         fail("Got wrong type of exception - should be a NotServingRegionException, but was an IOException: "
2230             + e.getMessage());
2231       }
2232     } finally {
2233       HRegion.closeHRegion(this.region);
2234       this.region = null;
2235     }
2236   }
2237 
2238   @Test
2239   public void testRegionScanner_Next() throws IOException {
2240     byte[] row1 = Bytes.toBytes("row1");
2241     byte[] row2 = Bytes.toBytes("row2");
2242     byte[] fam1 = Bytes.toBytes("fam1");
2243     byte[] fam2 = Bytes.toBytes("fam2");
2244     byte[] fam3 = Bytes.toBytes("fam3");
2245     byte[] fam4 = Bytes.toBytes("fam4");
2246 
2247     byte[][] families = { fam1, fam2, fam3, fam4 };
2248     long ts = System.currentTimeMillis();
2249 
2250     // Setting up region
2251     String method = this.getName();
2252     this.region = initHRegion(tableName, method, CONF, families);
2253     try {
2254       // Putting data in Region
2255       Put put = null;
2256       put = new Put(row1);
2257       put.add(fam1, (byte[]) null, ts, null);
2258       put.add(fam2, (byte[]) null, ts, null);
2259       put.add(fam3, (byte[]) null, ts, null);
2260       put.add(fam4, (byte[]) null, ts, null);
2261       region.put(put);
2262 
2263       put = new Put(row2);
2264       put.add(fam1, (byte[]) null, ts, null);
2265       put.add(fam2, (byte[]) null, ts, null);
2266       put.add(fam3, (byte[]) null, ts, null);
2267       put.add(fam4, (byte[]) null, ts, null);
2268       region.put(put);
2269 
2270       Scan scan = new Scan();
2271       scan.addFamily(fam2);
2272       scan.addFamily(fam4);
2273       InternalScanner is = region.getScanner(scan);
2274 
2275       List<Cell> res = null;
2276 
2277       // Result 1
2278       List<Cell> expected1 = new ArrayList<Cell>();
2279       expected1.add(new KeyValue(row1, fam2, null, ts, KeyValue.Type.Put, null));
2280       expected1.add(new KeyValue(row1, fam4, null, ts, KeyValue.Type.Put, null));
2281 
2282       res = new ArrayList<Cell>();
2283       is.next(res);
2284       for (int i = 0; i < res.size(); i++) {
2285         assertTrue(CellComparator.equalsIgnoreMvccVersion(expected1.get(i), res.get(i)));
2286       }
2287 
2288       // Result 2
2289       List<Cell> expected2 = new ArrayList<Cell>();
2290       expected2.add(new KeyValue(row2, fam2, null, ts, KeyValue.Type.Put, null));
2291       expected2.add(new KeyValue(row2, fam4, null, ts, KeyValue.Type.Put, null));
2292 
2293       res = new ArrayList<Cell>();
2294       is.next(res);
2295       for (int i = 0; i < res.size(); i++) {
2296         assertTrue(CellComparator.equalsIgnoreMvccVersion(expected2.get(i), res.get(i)));
2297       }
2298     } finally {
2299       HRegion.closeHRegion(this.region);
2300       this.region = null;
2301     }
2302   }
2303 
2304   @Test
2305   public void testScanner_ExplicitColumns_FromMemStore_EnforceVersions() throws IOException {
2306     byte[] row1 = Bytes.toBytes("row1");
2307     byte[] qf1 = Bytes.toBytes("qualifier1");
2308     byte[] qf2 = Bytes.toBytes("qualifier2");
2309     byte[] fam1 = Bytes.toBytes("fam1");
2310     byte[][] families = { fam1 };
2311 
2312     long ts1 = System.currentTimeMillis();
2313     long ts2 = ts1 + 1;
2314     long ts3 = ts1 + 2;
2315 
2316     // Setting up region
2317     String method = this.getName();
2318     this.region = initHRegion(tableName, method, CONF, families);
2319     try {
2320       // Putting data in Region
2321       Put put = null;
2322       KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
2323       KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
2324       KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
2325 
2326       KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
2327       KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
2328       KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
2329 
2330       put = new Put(row1);
2331       put.add(kv13);
2332       put.add(kv12);
2333       put.add(kv11);
2334       put.add(kv23);
2335       put.add(kv22);
2336       put.add(kv21);
2337       region.put(put);
2338 
2339       // Expected
2340       List<Cell> expected = new ArrayList<Cell>();
2341       expected.add(kv13);
2342       expected.add(kv12);
2343 
2344       Scan scan = new Scan(row1);
2345       scan.addColumn(fam1, qf1);
2346       scan.setMaxVersions(MAX_VERSIONS);
2347       List<Cell> actual = new ArrayList<Cell>();
2348       InternalScanner scanner = region.getScanner(scan);
2349 
2350       boolean hasNext = scanner.next(actual);
2351       assertEquals(false, hasNext);
2352 
2353       // Verify result
2354       for (int i = 0; i < expected.size(); i++) {
2355         assertEquals(expected.get(i), actual.get(i));
2356       }
2357     } finally {
2358       HRegion.closeHRegion(this.region);
2359       this.region = null;
2360     }
2361   }
2362 
2363   @Test
2364   public void testScanner_ExplicitColumns_FromFilesOnly_EnforceVersions() throws IOException {
2365     byte[] row1 = Bytes.toBytes("row1");
2366     byte[] qf1 = Bytes.toBytes("qualifier1");
2367     byte[] qf2 = Bytes.toBytes("qualifier2");
2368     byte[] fam1 = Bytes.toBytes("fam1");
2369     byte[][] families = { fam1 };
2370 
2371     long ts1 = 1; // System.currentTimeMillis();
2372     long ts2 = ts1 + 1;
2373     long ts3 = ts1 + 2;
2374 
2375     // Setting up region
2376     String method = this.getName();
2377     this.region = initHRegion(tableName, method, CONF, families);
2378     try {
2379       // Putting data in Region
2380       Put put = null;
2381       KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
2382       KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
2383       KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
2384 
2385       KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
2386       KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
2387       KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
2388 
2389       put = new Put(row1);
2390       put.add(kv13);
2391       put.add(kv12);
2392       put.add(kv11);
2393       put.add(kv23);
2394       put.add(kv22);
2395       put.add(kv21);
2396       region.put(put);
2397       region.flushcache();
2398 
2399       // Expected
2400       List<Cell> expected = new ArrayList<Cell>();
2401       expected.add(kv13);
2402       expected.add(kv12);
2403       expected.add(kv23);
2404       expected.add(kv22);
2405 
2406       Scan scan = new Scan(row1);
2407       scan.addColumn(fam1, qf1);
2408       scan.addColumn(fam1, qf2);
2409       scan.setMaxVersions(MAX_VERSIONS);
2410       List<Cell> actual = new ArrayList<Cell>();
2411       InternalScanner scanner = region.getScanner(scan);
2412 
2413       boolean hasNext = scanner.next(actual);
2414       assertEquals(false, hasNext);
2415 
2416       // Verify result
2417       for (int i = 0; i < expected.size(); i++) {
2418         assertTrue(CellComparator.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
2419       }
2420     } finally {
2421       HRegion.closeHRegion(this.region);
2422       this.region = null;
2423     }
2424   }
2425 
2426   @Test
2427   public void testScanner_ExplicitColumns_FromMemStoreAndFiles_EnforceVersions() throws IOException {
2428     byte[] row1 = Bytes.toBytes("row1");
2429     byte[] fam1 = Bytes.toBytes("fam1");
2430     byte[][] families = { fam1 };
2431     byte[] qf1 = Bytes.toBytes("qualifier1");
2432     byte[] qf2 = Bytes.toBytes("qualifier2");
2433 
2434     long ts1 = 1;
2435     long ts2 = ts1 + 1;
2436     long ts3 = ts1 + 2;
2437     long ts4 = ts1 + 3;
2438 
2439     // Setting up region
2440     String method = this.getName();
2441     this.region = initHRegion(tableName, method, CONF, families);
2442     try {
2443       // Putting data in Region
2444       KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
2445       KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
2446       KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
2447       KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
2448 
2449       KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null);
2450       KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
2451       KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
2452       KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
2453 
2454       Put put = null;
2455       put = new Put(row1);
2456       put.add(kv14);
2457       put.add(kv24);
2458       region.put(put);
2459       region.flushcache();
2460 
2461       put = new Put(row1);
2462       put.add(kv23);
2463       put.add(kv13);
2464       region.put(put);
2465       region.flushcache();
2466 
2467       put = new Put(row1);
2468       put.add(kv22);
2469       put.add(kv12);
2470       region.put(put);
2471       region.flushcache();
2472 
2473       put = new Put(row1);
2474       put.add(kv21);
2475       put.add(kv11);
2476       region.put(put);
2477 
2478       // Expected
2479       List<Cell> expected = new ArrayList<Cell>();
2480       expected.add(kv14);
2481       expected.add(kv13);
2482       expected.add(kv12);
2483       expected.add(kv24);
2484       expected.add(kv23);
2485       expected.add(kv22);
2486 
2487       Scan scan = new Scan(row1);
2488       scan.addColumn(fam1, qf1);
2489       scan.addColumn(fam1, qf2);
2490       int versions = 3;
2491       scan.setMaxVersions(versions);
2492       List<Cell> actual = new ArrayList<Cell>();
2493       InternalScanner scanner = region.getScanner(scan);
2494 
2495       boolean hasNext = scanner.next(actual);
2496       assertEquals(false, hasNext);
2497 
2498       // Verify result
2499       for (int i = 0; i < expected.size(); i++) {
2500         assertTrue(CellComparator.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
2501       }
2502     } finally {
2503       HRegion.closeHRegion(this.region);
2504       this.region = null;
2505     }
2506   }
2507 
2508   @Test
2509   public void testScanner_Wildcard_FromMemStore_EnforceVersions() throws IOException {
2510     byte[] row1 = Bytes.toBytes("row1");
2511     byte[] qf1 = Bytes.toBytes("qualifier1");
2512     byte[] qf2 = Bytes.toBytes("qualifier2");
2513     byte[] fam1 = Bytes.toBytes("fam1");
2514     byte[][] families = { fam1 };
2515 
2516     long ts1 = System.currentTimeMillis();
2517     long ts2 = ts1 + 1;
2518     long ts3 = ts1 + 2;
2519 
2520     // Setting up region
2521     String method = this.getName();
2522     this.region = initHRegion(tableName, method, CONF, families);
2523     try {
2524       // Putting data in Region
2525       Put put = null;
2526       KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
2527       KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
2528       KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
2529 
2530       KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
2531       KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
2532       KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
2533 
2534       put = new Put(row1);
2535       put.add(kv13);
2536       put.add(kv12);
2537       put.add(kv11);
2538       put.add(kv23);
2539       put.add(kv22);
2540       put.add(kv21);
2541       region.put(put);
2542 
2543       // Expected
2544       List<Cell> expected = new ArrayList<Cell>();
2545       expected.add(kv13);
2546       expected.add(kv12);
2547       expected.add(kv23);
2548       expected.add(kv22);
2549 
2550       Scan scan = new Scan(row1);
2551       scan.addFamily(fam1);
2552       scan.setMaxVersions(MAX_VERSIONS);
2553       List<Cell> actual = new ArrayList<Cell>();
2554       InternalScanner scanner = region.getScanner(scan);
2555 
2556       boolean hasNext = scanner.next(actual);
2557       assertEquals(false, hasNext);
2558 
2559       // Verify result
2560       for (int i = 0; i < expected.size(); i++) {
2561         assertEquals(expected.get(i), actual.get(i));
2562       }
2563     } finally {
2564       HRegion.closeHRegion(this.region);
2565       this.region = null;
2566     }
2567   }
2568 
2569   @Test
2570   public void testScanner_Wildcard_FromFilesOnly_EnforceVersions() throws IOException {
2571     byte[] row1 = Bytes.toBytes("row1");
2572     byte[] qf1 = Bytes.toBytes("qualifier1");
2573     byte[] qf2 = Bytes.toBytes("qualifier2");
2574     byte[] fam1 = Bytes.toBytes("fam1");
2575 
2576     long ts1 = 1; // System.currentTimeMillis();
2577     long ts2 = ts1 + 1;
2578     long ts3 = ts1 + 2;
2579 
2580     // Setting up region
2581     String method = this.getName();
2582     this.region = initHRegion(tableName, method, CONF, fam1);
2583     try {
2584       // Putting data in Region
2585       Put put = null;
2586       KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
2587       KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
2588       KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
2589 
2590       KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
2591       KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
2592       KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
2593 
2594       put = new Put(row1);
2595       put.add(kv13);
2596       put.add(kv12);
2597       put.add(kv11);
2598       put.add(kv23);
2599       put.add(kv22);
2600       put.add(kv21);
2601       region.put(put);
2602       region.flushcache();
2603 
2604       // Expected
2605       List<Cell> expected = new ArrayList<Cell>();
2606       expected.add(kv13);
2607       expected.add(kv12);
2608       expected.add(kv23);
2609       expected.add(kv22);
2610 
2611       Scan scan = new Scan(row1);
2612       scan.addFamily(fam1);
2613       scan.setMaxVersions(MAX_VERSIONS);
2614       List<Cell> actual = new ArrayList<Cell>();
2615       InternalScanner scanner = region.getScanner(scan);
2616 
2617       boolean hasNext = scanner.next(actual);
2618       assertEquals(false, hasNext);
2619 
2620       // Verify result
2621       for (int i = 0; i < expected.size(); i++) {
2622         assertTrue(CellComparator.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
2623       }
2624     } finally {
2625       HRegion.closeHRegion(this.region);
2626       this.region = null;
2627     }
2628   }
2629 
2630   @Test
2631   public void testScanner_StopRow1542() throws IOException {
2632     byte[] family = Bytes.toBytes("testFamily");
2633     this.region = initHRegion(tableName, getName(), CONF, family);
2634     try {
2635       byte[] row1 = Bytes.toBytes("row111");
2636       byte[] row2 = Bytes.toBytes("row222");
2637       byte[] row3 = Bytes.toBytes("row333");
2638       byte[] row4 = Bytes.toBytes("row444");
2639       byte[] row5 = Bytes.toBytes("row555");
2640 
2641       byte[] col1 = Bytes.toBytes("Pub111");
2642       byte[] col2 = Bytes.toBytes("Pub222");
2643 
2644       Put put = new Put(row1);
2645       put.add(family, col1, Bytes.toBytes(10L));
2646       region.put(put);
2647 
2648       put = new Put(row2);
2649       put.add(family, col1, Bytes.toBytes(15L));
2650       region.put(put);
2651 
2652       put = new Put(row3);
2653       put.add(family, col2, Bytes.toBytes(20L));
2654       region.put(put);
2655 
2656       put = new Put(row4);
2657       put.add(family, col2, Bytes.toBytes(30L));
2658       region.put(put);
2659 
2660       put = new Put(row5);
2661       put.add(family, col1, Bytes.toBytes(40L));
2662       region.put(put);
2663 
2664       Scan scan = new Scan(row3, row4);
2665       scan.setMaxVersions();
2666       scan.addColumn(family, col1);
2667       InternalScanner s = region.getScanner(scan);
2668 
2669       List<Cell> results = new ArrayList<Cell>();
2670       assertEquals(false, s.next(results));
2671       assertEquals(0, results.size());
2672     } finally {
2673       HRegion.closeHRegion(this.region);
2674       this.region = null;
2675     }
2676   }
2677 
2678   @Test
2679   public void testScanner_Wildcard_FromMemStoreAndFiles_EnforceVersions() throws IOException {
2680     byte[] row1 = Bytes.toBytes("row1");
2681     byte[] fam1 = Bytes.toBytes("fam1");
2682     byte[] qf1 = Bytes.toBytes("qualifier1");
2683     byte[] qf2 = Bytes.toBytes("quateslifier2");
2684 
2685     long ts1 = 1;
2686     long ts2 = ts1 + 1;
2687     long ts3 = ts1 + 2;
2688     long ts4 = ts1 + 3;
2689 
2690     // Setting up region
2691     String method = this.getName();
2692     this.region = initHRegion(tableName, method, CONF, fam1);
2693     try {
2694       // Putting data in Region
2695       KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
2696       KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
2697       KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
2698       KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
2699 
2700       KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null);
2701       KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
2702       KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
2703       KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
2704 
2705       Put put = null;
2706       put = new Put(row1);
2707       put.add(kv14);
2708       put.add(kv24);
2709       region.put(put);
2710       region.flushcache();
2711 
2712       put = new Put(row1);
2713       put.add(kv23);
2714       put.add(kv13);
2715       region.put(put);
2716       region.flushcache();
2717 
2718       put = new Put(row1);
2719       put.add(kv22);
2720       put.add(kv12);
2721       region.put(put);
2722       region.flushcache();
2723 
2724       put = new Put(row1);
2725       put.add(kv21);
2726       put.add(kv11);
2727       region.put(put);
2728 
2729       // Expected
2730       List<KeyValue> expected = new ArrayList<KeyValue>();
2731       expected.add(kv14);
2732       expected.add(kv13);
2733       expected.add(kv12);
2734       expected.add(kv24);
2735       expected.add(kv23);
2736       expected.add(kv22);
2737 
2738       Scan scan = new Scan(row1);
2739       int versions = 3;
2740       scan.setMaxVersions(versions);
2741       List<Cell> actual = new ArrayList<Cell>();
2742       InternalScanner scanner = region.getScanner(scan);
2743 
2744       boolean hasNext = scanner.next(actual);
2745       assertEquals(false, hasNext);
2746 
2747       // Verify result
2748       for (int i = 0; i < expected.size(); i++) {
2749         assertTrue(CellComparator.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
2750       }
2751     } finally {
2752       HRegion.closeHRegion(this.region);
2753       this.region = null;
2754     }
2755   }
2756 
2757   /**
2758    * Added for HBASE-5416
2759    * 
2760    * Here we test scan optimization when only subset of CFs are used in filter
2761    * conditions.
2762    */
2763   @Test
2764   public void testScanner_JoinedScanners() throws IOException {
2765     byte[] cf_essential = Bytes.toBytes("essential");
2766     byte[] cf_joined = Bytes.toBytes("joined");
2767     byte[] cf_alpha = Bytes.toBytes("alpha");
2768     this.region = initHRegion(tableName, getName(), CONF, cf_essential, cf_joined, cf_alpha);
2769     try {
2770       byte[] row1 = Bytes.toBytes("row1");
2771       byte[] row2 = Bytes.toBytes("row2");
2772       byte[] row3 = Bytes.toBytes("row3");
2773 
2774       byte[] col_normal = Bytes.toBytes("d");
2775       byte[] col_alpha = Bytes.toBytes("a");
2776 
2777       byte[] filtered_val = Bytes.toBytes(3);
2778 
2779       Put put = new Put(row1);
2780       put.add(cf_essential, col_normal, Bytes.toBytes(1));
2781       put.add(cf_joined, col_alpha, Bytes.toBytes(1));
2782       region.put(put);
2783 
2784       put = new Put(row2);
2785       put.add(cf_essential, col_alpha, Bytes.toBytes(2));
2786       put.add(cf_joined, col_normal, Bytes.toBytes(2));
2787       put.add(cf_alpha, col_alpha, Bytes.toBytes(2));
2788       region.put(put);
2789 
2790       put = new Put(row3);
2791       put.add(cf_essential, col_normal, filtered_val);
2792       put.add(cf_joined, col_normal, filtered_val);
2793       region.put(put);
2794 
2795       // Check two things:
2796       // 1. result list contains expected values
2797       // 2. result list is sorted properly
2798 
2799       Scan scan = new Scan();
2800       Filter filter = new SingleColumnValueExcludeFilter(cf_essential, col_normal,
2801           CompareOp.NOT_EQUAL, filtered_val);
2802       scan.setFilter(filter);
2803       scan.setLoadColumnFamiliesOnDemand(true);
2804       InternalScanner s = region.getScanner(scan);
2805 
2806       List<Cell> results = new ArrayList<Cell>();
2807       assertTrue(s.next(results));
2808       assertEquals(results.size(), 1);
2809       results.clear();
2810 
2811       assertTrue(s.next(results));
2812       assertEquals(results.size(), 3);
2813       assertTrue("orderCheck", CellUtil.matchingFamily(results.get(0), cf_alpha));
2814       assertTrue("orderCheck", CellUtil.matchingFamily(results.get(1), cf_essential));
2815       assertTrue("orderCheck", CellUtil.matchingFamily(results.get(2), cf_joined));
2816       results.clear();
2817 
2818       assertFalse(s.next(results));
2819       assertEquals(results.size(), 0);
2820     } finally {
2821       HRegion.closeHRegion(this.region);
2822       this.region = null;
2823     }
2824   }
2825 
2826   /**
2827    * HBASE-5416
2828    * 
2829    * Test case when scan limits amount of KVs returned on each next() call.
2830    */
2831   @Test
2832   public void testScanner_JoinedScannersWithLimits() throws IOException {
2833     final byte[] cf_first = Bytes.toBytes("first");
2834     final byte[] cf_second = Bytes.toBytes("second");
2835 
2836     this.region = initHRegion(tableName, getName(), CONF, cf_first, cf_second);
2837     try {
2838       final byte[] col_a = Bytes.toBytes("a");
2839       final byte[] col_b = Bytes.toBytes("b");
2840 
2841       Put put;
2842 
2843       for (int i = 0; i < 10; i++) {
2844         put = new Put(Bytes.toBytes("r" + Integer.toString(i)));
2845         put.add(cf_first, col_a, Bytes.toBytes(i));
2846         if (i < 5) {
2847           put.add(cf_first, col_b, Bytes.toBytes(i));
2848           put.add(cf_second, col_a, Bytes.toBytes(i));
2849           put.add(cf_second, col_b, Bytes.toBytes(i));
2850         }
2851         region.put(put);
2852       }
2853 
2854       Scan scan = new Scan();
2855       scan.setLoadColumnFamiliesOnDemand(true);
2856       Filter bogusFilter = new FilterBase() {
2857         @Override
2858         public boolean isFamilyEssential(byte[] name) {
2859           return Bytes.equals(name, cf_first);
2860         }
2861       };
2862 
2863       scan.setFilter(bogusFilter);
2864       InternalScanner s = region.getScanner(scan);
2865 
2866       // Our data looks like this:
2867       // r0: first:a, first:b, second:a, second:b
2868       // r1: first:a, first:b, second:a, second:b
2869       // r2: first:a, first:b, second:a, second:b
2870       // r3: first:a, first:b, second:a, second:b
2871       // r4: first:a, first:b, second:a, second:b
2872       // r5: first:a
2873       // r6: first:a
2874       // r7: first:a
2875       // r8: first:a
2876       // r9: first:a
2877 
2878       // But due to next's limit set to 3, we should get this:
2879       // r0: first:a, first:b, second:a
2880       // r0: second:b
2881       // r1: first:a, first:b, second:a
2882       // r1: second:b
2883       // r2: first:a, first:b, second:a
2884       // r2: second:b
2885       // r3: first:a, first:b, second:a
2886       // r3: second:b
2887       // r4: first:a, first:b, second:a
2888       // r4: second:b
2889       // r5: first:a
2890       // r6: first:a
2891       // r7: first:a
2892       // r8: first:a
2893       // r9: first:a
2894 
2895       List<Cell> results = new ArrayList<Cell>();
2896       int index = 0;
2897       while (true) {
2898         boolean more = s.next(results, 3);
2899         if ((index >> 1) < 5) {
2900           if (index % 2 == 0)
2901             assertEquals(results.size(), 3);
2902           else
2903             assertEquals(results.size(), 1);
2904         } else
2905           assertEquals(results.size(), 1);
2906         results.clear();
2907         index++;
2908         if (!more)
2909           break;
2910       }
2911     } finally {
2912       HRegion.closeHRegion(this.region);
2913       this.region = null;
2914     }
2915   }
2916 
2917   // ////////////////////////////////////////////////////////////////////////////
2918   // Split test
2919   // ////////////////////////////////////////////////////////////////////////////
2920   /**
2921    * Splits twice and verifies getting from each of the split regions.
2922    * 
2923    * @throws Exception
2924    */
2925   @Test
2926   public void testBasicSplit() throws Exception {
2927     byte[][] families = { fam1, fam2, fam3 };
2928 
2929     Configuration hc = initSplit();
2930     // Setting up region
2931     String method = this.getName();
2932     this.region = initHRegion(tableName, method, hc, families);
2933 
2934     try {
2935       LOG.info("" + HBaseTestCase.addContent(region, fam3));
2936       region.flushcache();
2937       region.compactStores();
2938       byte[] splitRow = region.checkSplit();
2939       assertNotNull(splitRow);
2940       LOG.info("SplitRow: " + Bytes.toString(splitRow));
2941       HRegion[] regions = splitRegion(region, splitRow);
2942       try {
2943         // Need to open the regions.
2944         // TODO: Add an 'open' to HRegion... don't do open by constructing
2945         // instance.
2946         for (int i = 0; i < regions.length; i++) {
2947           regions[i] = HRegion.openHRegion(regions[i], null);
2948         }
2949         // Assert can get rows out of new regions. Should be able to get first
2950         // row from first region and the midkey from second region.
2951         assertGet(regions[0], fam3, Bytes.toBytes(START_KEY));
2952         assertGet(regions[1], fam3, splitRow);
2953         // Test I can get scanner and that it starts at right place.
2954         assertScan(regions[0], fam3, Bytes.toBytes(START_KEY));
2955         assertScan(regions[1], fam3, splitRow);
2956         // Now prove can't split regions that have references.
2957         for (int i = 0; i < regions.length; i++) {
2958           // Add so much data to this region, we create a store file that is >
2959           // than one of our unsplitable references. it will.
2960           for (int j = 0; j < 2; j++) {
2961             HBaseTestCase.addContent(regions[i], fam3);
2962           }
2963           HBaseTestCase.addContent(regions[i], fam2);
2964           HBaseTestCase.addContent(regions[i], fam1);
2965           regions[i].flushcache();
2966         }
2967 
2968         byte[][] midkeys = new byte[regions.length][];
2969         // To make regions splitable force compaction.
2970         for (int i = 0; i < regions.length; i++) {
2971           regions[i].compactStores();
2972           midkeys[i] = regions[i].checkSplit();
2973         }
2974 
2975         TreeMap<String, HRegion> sortedMap = new TreeMap<String, HRegion>();
2976         // Split these two daughter regions so then I'll have 4 regions. Will
2977         // split because added data above.
2978         for (int i = 0; i < regions.length; i++) {
2979           HRegion[] rs = null;
2980           if (midkeys[i] != null) {
2981             rs = splitRegion(regions[i], midkeys[i]);
2982             for (int j = 0; j < rs.length; j++) {
2983               sortedMap.put(Bytes.toString(rs[j].getRegionName()), HRegion.openHRegion(rs[j], null));
2984             }
2985           }
2986         }
2987         LOG.info("Made 4 regions");
2988         // The splits should have been even. Test I can get some arbitrary row
2989         // out of each.
2990         int interval = (LAST_CHAR - FIRST_CHAR) / 3;
2991         byte[] b = Bytes.toBytes(START_KEY);
2992         for (HRegion r : sortedMap.values()) {
2993           assertGet(r, fam3, b);
2994           b[0] += interval;
2995         }
2996       } finally {
2997         for (int i = 0; i < regions.length; i++) {
2998           try {
2999             regions[i].close();
3000           } catch (IOException e) {
3001             // Ignore.
3002           }
3003         }
3004       }
3005     } finally {
3006       HRegion.closeHRegion(this.region);
3007       this.region = null;
3008     }
3009   }
3010 
3011   @Test
3012   public void testSplitRegion() throws IOException {
3013     byte[] qualifier = Bytes.toBytes("qualifier");
3014     Configuration hc = initSplit();
3015     int numRows = 10;
3016     byte[][] families = { fam1, fam3 };
3017 
3018     // Setting up region
3019     String method = this.getName();
3020     this.region = initHRegion(tableName, method, hc, families);
3021 
3022     // Put data in region
3023     int startRow = 100;
3024     putData(startRow, numRows, qualifier, families);
3025     int splitRow = startRow + numRows;
3026     putData(splitRow, numRows, qualifier, families);
3027     region.flushcache();
3028 
3029     HRegion[] regions = null;
3030     try {
3031       regions = splitRegion(region, Bytes.toBytes("" + splitRow));
3032       // Opening the regions returned.
3033       for (int i = 0; i < regions.length; i++) {
3034         regions[i] = HRegion.openHRegion(regions[i], null);
3035       }
3036       // Verifying that the region has been split
3037       assertEquals(2, regions.length);
3038 
3039       // Verifying that all data is still there and that data is in the right
3040       // place
3041       verifyData(regions[0], startRow, numRows, qualifier, families);
3042       verifyData(regions[1], splitRow, numRows, qualifier, families);
3043 
3044     } finally {
3045       HRegion.closeHRegion(this.region);
3046       this.region = null;
3047     }
3048   }
3049 
3050   /**
3051    * Flushes the cache in a thread while scanning. The tests verify that the
3052    * scan is coherent - e.g. the returned results are always of the same or
3053    * later update as the previous results.
3054    * 
3055    * @throws IOException
3056    *           scan / compact
3057    * @throws InterruptedException
3058    *           thread join
3059    */
3060   @Test
3061   public void testFlushCacheWhileScanning() throws IOException, InterruptedException {
3062     byte[] family = Bytes.toBytes("family");
3063     int numRows = 1000;
3064     int flushAndScanInterval = 10;
3065     int compactInterval = 10 * flushAndScanInterval;
3066 
3067     String method = "testFlushCacheWhileScanning";
3068     this.region = initHRegion(tableName, method, CONF, family);
3069     try {
3070       FlushThread flushThread = new FlushThread();
3071       flushThread.start();
3072 
3073       Scan scan = new Scan();
3074       scan.addFamily(family);
3075       scan.setFilter(new SingleColumnValueFilter(family, qual1, CompareOp.EQUAL,
3076           new BinaryComparator(Bytes.toBytes(5L))));
3077 
3078       int expectedCount = 0;
3079       List<Cell> res = new ArrayList<Cell>();
3080 
3081       boolean toggle = true;
3082       for (long i = 0; i < numRows; i++) {
3083         Put put = new Put(Bytes.toBytes(i));
3084         put.setDurability(Durability.SKIP_WAL);
3085         put.add(family, qual1, Bytes.toBytes(i % 10));
3086         region.put(put);
3087 
3088         if (i != 0 && i % compactInterval == 0) {
3089           // System.out.println("iteration = " + i);
3090           region.compactStores(true);
3091         }
3092 
3093         if (i % 10 == 5L) {
3094           expectedCount++;
3095         }
3096 
3097         if (i != 0 && i % flushAndScanInterval == 0) {
3098           res.clear();
3099           InternalScanner scanner = region.getScanner(scan);
3100           if (toggle) {
3101             flushThread.flush();
3102           }
3103           while (scanner.next(res))
3104             ;
3105           if (!toggle) {
3106             flushThread.flush();
3107           }
3108           assertEquals("i=" + i, expectedCount, res.size());
3109           toggle = !toggle;
3110         }
3111       }
3112 
3113       flushThread.done();
3114       flushThread.join();
3115       flushThread.checkNoError();
3116     } finally {
3117       HRegion.closeHRegion(this.region);
3118       this.region = null;
3119     }
3120   }
3121 
3122   protected class FlushThread extends Thread {
3123     private volatile boolean done;
3124     private Throwable error = null;
3125 
3126     public void done() {
3127       done = true;
3128       synchronized (this) {
3129         interrupt();
3130       }
3131     }
3132 
3133     public void checkNoError() {
3134       if (error != null) {
3135         assertNull(error);
3136       }
3137     }
3138 
3139     @Override
3140     public void run() {
3141       done = false;
3142       while (!done) {
3143         synchronized (this) {
3144           try {
3145             wait();
3146           } catch (InterruptedException ignored) {
3147             if (done) {
3148               break;
3149             }
3150           }
3151         }
3152         try {
3153           region.flushcache();
3154         } catch (IOException e) {
3155           if (!done) {
3156             LOG.error("Error while flusing cache", e);
3157             error = e;
3158           }
3159           break;
3160         }
3161       }
3162 
3163     }
3164 
3165     public void flush() {
3166       synchronized (this) {
3167         notify();
3168       }
3169 
3170     }
3171   }
3172 
3173   /**
3174    * Writes very wide records and scans for the latest every time.. Flushes and
3175    * compacts the region every now and then to keep things realistic.
3176    * 
3177    * @throws IOException
3178    *           by flush / scan / compaction
3179    * @throws InterruptedException
3180    *           when joining threads
3181    */
3182   @Test
3183   public void testWritesWhileScanning() throws IOException, InterruptedException {
3184     int testCount = 100;
3185     int numRows = 1;
3186     int numFamilies = 10;
3187     int numQualifiers = 100;
3188     int flushInterval = 7;
3189     int compactInterval = 5 * flushInterval;
3190     byte[][] families = new byte[numFamilies][];
3191     for (int i = 0; i < numFamilies; i++) {
3192       families[i] = Bytes.toBytes("family" + i);
3193     }
3194     byte[][] qualifiers = new byte[numQualifiers][];
3195     for (int i = 0; i < numQualifiers; i++) {
3196       qualifiers[i] = Bytes.toBytes("qual" + i);
3197     }
3198 
3199     String method = "testWritesWhileScanning";
3200     this.region = initHRegion(tableName, method, CONF, families);
3201     try {
3202       PutThread putThread = new PutThread(numRows, families, qualifiers);
3203       putThread.start();
3204       putThread.waitForFirstPut();
3205 
3206       FlushThread flushThread = new FlushThread();
3207       flushThread.start();
3208 
3209       Scan scan = new Scan(Bytes.toBytes("row0"), Bytes.toBytes("row1"));
3210 
3211       int expectedCount = numFamilies * numQualifiers;
3212       List<Cell> res = new ArrayList<Cell>();
3213 
3214       long prevTimestamp = 0L;
3215       for (int i = 0; i < testCount; i++) {
3216 
3217         if (i != 0 && i % compactInterval == 0) {
3218           region.compactStores(true);
3219         }
3220 
3221         if (i != 0 && i % flushInterval == 0) {
3222           flushThread.flush();
3223         }
3224 
3225         boolean previousEmpty = res.isEmpty();
3226         res.clear();
3227         InternalScanner scanner = region.getScanner(scan);
3228         while (scanner.next(res))
3229           ;
3230         if (!res.isEmpty() || !previousEmpty || i > compactInterval) {
3231           assertEquals("i=" + i, expectedCount, res.size());
3232           long timestamp = res.get(0).getTimestamp();
3233           assertTrue("Timestamps were broke: " + timestamp + " prev: " + prevTimestamp,
3234               timestamp >= prevTimestamp);
3235           prevTimestamp = timestamp;
3236         }
3237       }
3238 
3239       putThread.done();
3240 
3241       region.flushcache();
3242 
3243       putThread.join();
3244       putThread.checkNoError();
3245 
3246       flushThread.done();
3247       flushThread.join();
3248       flushThread.checkNoError();
3249     } finally {
3250       try {
3251         HRegion.closeHRegion(this.region);
3252       } catch (DroppedSnapshotException dse) {
3253         // We could get this on way out because we interrupt the background flusher and it could
3254         // fail anywhere causing a DSE over in the background flusher... only it is not properly
3255         // dealt with so could still be memory hanging out when we get to here -- memory we can't
3256         // flush because the accounting is 'off' since original DSE.
3257       }
3258       this.region = null;
3259     }
3260   }
3261 
3262   protected class PutThread extends Thread {
3263     private volatile boolean done;
3264     private volatile int numPutsFinished = 0;
3265 
3266     private Throwable error = null;
3267     private int numRows;
3268     private byte[][] families;
3269     private byte[][] qualifiers;
3270 
3271     private PutThread(int numRows, byte[][] families, byte[][] qualifiers) {
3272       this.numRows = numRows;
3273       this.families = families;
3274       this.qualifiers = qualifiers;
3275     }
3276 
3277     /**
3278      * Block until this thread has put at least one row.
3279      */
3280     public void waitForFirstPut() throws InterruptedException {
3281       // wait until put thread actually puts some data
3282       while (numPutsFinished == 0) {
3283         checkNoError();
3284         Thread.sleep(50);
3285       }
3286     }
3287 
3288     public void done() {
3289       done = true;
3290       synchronized (this) {
3291         interrupt();
3292       }
3293     }
3294 
3295     public void checkNoError() {
3296       if (error != null) {
3297         assertNull(error);
3298       }
3299     }
3300 
3301     @Override
3302     public void run() {
3303       done = false;
3304       while (!done) {
3305         try {
3306           for (int r = 0; r < numRows; r++) {
3307             byte[] row = Bytes.toBytes("row" + r);
3308             Put put = new Put(row);
3309             put.setDurability(Durability.SKIP_WAL);
3310             byte[] value = Bytes.toBytes(String.valueOf(numPutsFinished));
3311             for (byte[] family : families) {
3312               for (byte[] qualifier : qualifiers) {
3313                 put.add(family, qualifier, (long) numPutsFinished, value);
3314               }
3315             }
3316             region.put(put);
3317             numPutsFinished++;
3318             if (numPutsFinished > 0 && numPutsFinished % 47 == 0) {
3319               System.out.println("put iteration = " + numPutsFinished);
3320               Delete delete = new Delete(row, (long) numPutsFinished - 30);
3321               region.delete(delete);
3322             }
3323             numPutsFinished++;
3324           }
3325         } catch (InterruptedIOException e) {
3326           // This is fine. It means we are done, or didn't get the lock on time
3327         } catch (IOException e) {
3328           LOG.error("error while putting records", e);
3329           error = e;
3330           break;
3331         }
3332       }
3333 
3334     }
3335 
3336   }
3337 
3338   /**
3339    * Writes very wide records and gets the latest row every time.. Flushes and
3340    * compacts the region aggressivly to catch issues.
3341    * 
3342    * @throws IOException
3343    *           by flush / scan / compaction
3344    * @throws InterruptedException
3345    *           when joining threads
3346    */
3347   @Test
3348   public void testWritesWhileGetting() throws Exception {
3349     int testCount = 100;
3350     int numRows = 1;
3351     int numFamilies = 10;
3352     int numQualifiers = 100;
3353     int compactInterval = 100;
3354     byte[][] families = new byte[numFamilies][];
3355     for (int i = 0; i < numFamilies; i++) {
3356       families[i] = Bytes.toBytes("family" + i);
3357     }
3358     byte[][] qualifiers = new byte[numQualifiers][];
3359     for (int i = 0; i < numQualifiers; i++) {
3360       qualifiers[i] = Bytes.toBytes("qual" + i);
3361     }
3362 
3363 
3364     String method = "testWritesWhileGetting";
3365     // This test flushes constantly and can cause many files to be created,
3366     // possibly
3367     // extending over the ulimit. Make sure compactions are aggressive in
3368     // reducing
3369     // the number of HFiles created.
3370     Configuration conf = HBaseConfiguration.create(CONF);
3371     conf.setInt("hbase.hstore.compaction.min", 1);
3372     conf.setInt("hbase.hstore.compaction.max", 1000);
3373     this.region = initHRegion(tableName, method, conf, families);
3374     PutThread putThread = null;
3375     MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
3376     try {
3377       putThread = new PutThread(numRows, families, qualifiers);
3378       putThread.start();
3379       putThread.waitForFirstPut();
3380 
3381       // Add a thread that flushes as fast as possible
3382       ctx.addThread(new RepeatingTestThread(ctx) {
3383         private int flushesSinceCompact = 0;
3384         private final int maxFlushesSinceCompact = 20;
3385 
3386         @Override
3387         public void doAnAction() throws Exception {
3388           if (region.flushcache().isCompactionNeeded()) {
3389             ++flushesSinceCompact;
3390           }
3391           // Compact regularly to avoid creating too many files and exceeding
3392           // the ulimit.
3393           if (flushesSinceCompact == maxFlushesSinceCompact) {
3394             region.compactStores(false);
3395             flushesSinceCompact = 0;
3396           }
3397         }
3398       });
3399       ctx.startThreads();
3400 
3401       Get get = new Get(Bytes.toBytes("row0"));
3402       Result result = null;
3403 
3404       int expectedCount = numFamilies * numQualifiers;
3405 
3406       long prevTimestamp = 0L;
3407       for (int i = 0; i < testCount; i++) {
3408 
3409         boolean previousEmpty = result == null || result.isEmpty();
3410         result = region.get(get);
3411         if (!result.isEmpty() || !previousEmpty || i > compactInterval) {
3412           assertEquals("i=" + i, expectedCount, result.size());
3413           // TODO this was removed, now what dangit?!
3414           // search looking for the qualifier in question?
3415           long timestamp = 0;
3416           for (Cell kv : result.rawCells()) {
3417             if (CellUtil.matchingFamily(kv, families[0])
3418                 && CellUtil.matchingQualifier(kv, qualifiers[0])) {
3419               timestamp = kv.getTimestamp();
3420             }
3421           }
3422           assertTrue(timestamp >= prevTimestamp);
3423           prevTimestamp = timestamp;
3424           Cell previousKV = null;
3425 
3426           for (Cell kv : result.rawCells()) {
3427             byte[] thisValue = CellUtil.cloneValue(kv);
3428             if (previousKV != null) {
3429               if (Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue) != 0) {
3430                 LOG.warn("These two KV should have the same value." + " Previous KV:" + previousKV
3431                     + "(memStoreTS:" + previousKV.getMvccVersion() + ")" + ", New KV: " + kv
3432                     + "(memStoreTS:" + kv.getMvccVersion() + ")");
3433                 assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue));
3434               }
3435             }
3436             previousKV = kv;
3437           }
3438         }
3439       }
3440     } finally {
3441       if (putThread != null)
3442         putThread.done();
3443 
3444       region.flushcache();
3445 
3446       if (putThread != null) {
3447         putThread.join();
3448         putThread.checkNoError();
3449       }
3450 
3451       ctx.stop();
3452       HRegion.closeHRegion(this.region);
3453       this.region = null;
3454     }
3455   }
3456 
3457   @Test
3458   public void testHolesInMeta() throws Exception {
3459     byte[] family = Bytes.toBytes("family");
3460     this.region = initHRegion(tableName, Bytes.toBytes("x"), Bytes.toBytes("z"), method, CONF,
3461         false, family);
3462     try {
3463       byte[] rowNotServed = Bytes.toBytes("a");
3464       Get g = new Get(rowNotServed);
3465       try {
3466         region.get(g);
3467         fail();
3468       } catch (WrongRegionException x) {
3469         // OK
3470       }
3471       byte[] row = Bytes.toBytes("y");
3472       g = new Get(row);
3473       region.get(g);
3474     } finally {
3475       HRegion.closeHRegion(this.region);
3476       this.region = null;
3477     }
3478   }
3479 
3480   @Test
3481   public void testIndexesScanWithOneDeletedRow() throws IOException {
3482     byte[] family = Bytes.toBytes("family");
3483 
3484     // Setting up region
3485     String method = "testIndexesScanWithOneDeletedRow";
3486     this.region = initHRegion(tableName, method, CONF, family);
3487     try {
3488       Put put = new Put(Bytes.toBytes(1L));
3489       put.add(family, qual1, 1L, Bytes.toBytes(1L));
3490       region.put(put);
3491 
3492       region.flushcache();
3493 
3494       Delete delete = new Delete(Bytes.toBytes(1L), 1L);
3495       region.delete(delete);
3496 
3497       put = new Put(Bytes.toBytes(2L));
3498       put.add(family, qual1, 2L, Bytes.toBytes(2L));
3499       region.put(put);
3500 
3501       Scan idxScan = new Scan();
3502       idxScan.addFamily(family);
3503       idxScan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, Arrays.<Filter> asList(
3504           new SingleColumnValueFilter(family, qual1, CompareOp.GREATER_OR_EQUAL,
3505               new BinaryComparator(Bytes.toBytes(0L))), new SingleColumnValueFilter(family, qual1,
3506               CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes(3L))))));
3507       InternalScanner scanner = region.getScanner(idxScan);
3508       List<Cell> res = new ArrayList<Cell>();
3509 
3510       while (scanner.next(res))
3511         ;
3512       assertEquals(1L, res.size());
3513     } finally {
3514       HRegion.closeHRegion(this.region);
3515       this.region = null;
3516     }
3517   }
3518 
3519   // ////////////////////////////////////////////////////////////////////////////
3520   // Bloom filter test
3521   // ////////////////////////////////////////////////////////////////////////////
3522   @Test
3523   public void testBloomFilterSize() throws IOException {
3524     byte[] fam1 = Bytes.toBytes("fam1");
3525     byte[] qf1 = Bytes.toBytes("col");
3526     byte[] val1 = Bytes.toBytes("value1");
3527     // Create Table
3528     HColumnDescriptor hcd = new HColumnDescriptor(fam1).setMaxVersions(Integer.MAX_VALUE)
3529         .setBloomFilterType(BloomType.ROWCOL);
3530 
3531     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
3532     htd.addFamily(hcd);
3533     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
3534     this.region = TEST_UTIL.createLocalHRegion(info, htd);
3535     try {
3536       int num_unique_rows = 10;
3537       int duplicate_multiplier = 2;
3538       int num_storefiles = 4;
3539 
3540       int version = 0;
3541       for (int f = 0; f < num_storefiles; f++) {
3542         for (int i = 0; i < duplicate_multiplier; i++) {
3543           for (int j = 0; j < num_unique_rows; j++) {
3544             Put put = new Put(Bytes.toBytes("row" + j));
3545             put.setDurability(Durability.SKIP_WAL);
3546             put.add(fam1, qf1, version++, val1);
3547             region.put(put);
3548           }
3549         }
3550         region.flushcache();
3551       }
3552       // before compaction
3553       HStore store = (HStore) region.getStore(fam1);
3554       Collection<StoreFile> storeFiles = store.getStorefiles();
3555       for (StoreFile storefile : storeFiles) {
3556         StoreFile.Reader reader = storefile.getReader();
3557         reader.loadFileInfo();
3558         reader.loadBloomfilter();
3559         assertEquals(num_unique_rows * duplicate_multiplier, reader.getEntries());
3560         assertEquals(num_unique_rows, reader.getFilterEntries());
3561       }
3562 
3563       region.compactStores(true);
3564 
3565       // after compaction
3566       storeFiles = store.getStorefiles();
3567       for (StoreFile storefile : storeFiles) {
3568         StoreFile.Reader reader = storefile.getReader();
3569         reader.loadFileInfo();
3570         reader.loadBloomfilter();
3571         assertEquals(num_unique_rows * duplicate_multiplier * num_storefiles, reader.getEntries());
3572         assertEquals(num_unique_rows, reader.getFilterEntries());
3573       }
3574     } finally {
3575       HRegion.closeHRegion(this.region);
3576       this.region = null;
3577     }
3578   }
3579 
3580   @Test
3581   public void testAllColumnsWithBloomFilter() throws IOException {
3582     byte[] TABLE = Bytes.toBytes("testAllColumnsWithBloomFilter");
3583     byte[] FAMILY = Bytes.toBytes("family");
3584 
3585     // Create table
3586     HColumnDescriptor hcd = new HColumnDescriptor(FAMILY).setMaxVersions(Integer.MAX_VALUE)
3587         .setBloomFilterType(BloomType.ROWCOL);
3588     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE));
3589     htd.addFamily(hcd);
3590     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
3591     this.region = TEST_UTIL.createLocalHRegion(info, htd);
3592     try {
3593       // For row:0, col:0: insert versions 1 through 5.
3594       byte row[] = Bytes.toBytes("row:" + 0);
3595       byte column[] = Bytes.toBytes("column:" + 0);
3596       Put put = new Put(row);
3597       put.setDurability(Durability.SKIP_WAL);
3598       for (long idx = 1; idx <= 4; idx++) {
3599         put.add(FAMILY, column, idx, Bytes.toBytes("value-version-" + idx));
3600       }
3601       region.put(put);
3602 
3603       // Flush
3604       region.flushcache();
3605 
3606       // Get rows
3607       Get get = new Get(row);
3608       get.setMaxVersions();
3609       Cell[] kvs = region.get(get).rawCells();
3610 
3611       // Check if rows are correct
3612       assertEquals(4, kvs.length);
3613       checkOneCell(kvs[0], FAMILY, 0, 0, 4);
3614       checkOneCell(kvs[1], FAMILY, 0, 0, 3);
3615       checkOneCell(kvs[2], FAMILY, 0, 0, 2);
3616       checkOneCell(kvs[3], FAMILY, 0, 0, 1);
3617     } finally {
3618       HRegion.closeHRegion(this.region);
3619       this.region = null;
3620     }
3621   }
3622 
3623   /**
3624    * Testcase to cover bug-fix for HBASE-2823 Ensures correct delete when
3625    * issuing delete row on columns with bloom filter set to row+col
3626    * (BloomType.ROWCOL)
3627    */
3628   @Test
3629   public void testDeleteRowWithBloomFilter() throws IOException {
3630     byte[] familyName = Bytes.toBytes("familyName");
3631 
3632     // Create Table
3633     HColumnDescriptor hcd = new HColumnDescriptor(familyName).setMaxVersions(Integer.MAX_VALUE)
3634         .setBloomFilterType(BloomType.ROWCOL);
3635 
3636     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
3637     htd.addFamily(hcd);
3638     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
3639     this.region = TEST_UTIL.createLocalHRegion(info, htd);
3640     try {
3641       // Insert some data
3642       byte row[] = Bytes.toBytes("row1");
3643       byte col[] = Bytes.toBytes("col1");
3644 
3645       Put put = new Put(row);
3646       put.add(familyName, col, 1, Bytes.toBytes("SomeRandomValue"));
3647       region.put(put);
3648       region.flushcache();
3649 
3650       Delete del = new Delete(row);
3651       region.delete(del);
3652       region.flushcache();
3653 
3654       // Get remaining rows (should have none)
3655       Get get = new Get(row);
3656       get.addColumn(familyName, col);
3657 
3658       Cell[] keyValues = region.get(get).rawCells();
3659       assertTrue(keyValues.length == 0);
3660     } finally {
3661       HRegion.closeHRegion(this.region);
3662       this.region = null;
3663     }
3664   }
3665 
3666   @Test
3667   public void testgetHDFSBlocksDistribution() throws Exception {
3668     HBaseTestingUtility htu = new HBaseTestingUtility();
3669     final int DEFAULT_BLOCK_SIZE = 1024;
3670     htu.getConfiguration().setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
3671     htu.getConfiguration().setInt("dfs.replication", 2);
3672 
3673     // set up a cluster with 3 nodes
3674     MiniHBaseCluster cluster = null;
3675     String dataNodeHosts[] = new String[] { "host1", "host2", "host3" };
3676     int regionServersCount = 3;
3677 
3678     try {
3679       cluster = htu.startMiniCluster(1, regionServersCount, dataNodeHosts);
3680       byte[][] families = { fam1, fam2 };
3681       HTable ht = htu.createTable(Bytes.toBytes(this.getName()), families);
3682 
3683       // Setting up region
3684       byte row[] = Bytes.toBytes("row1");
3685       byte col[] = Bytes.toBytes("col1");
3686 
3687       Put put = new Put(row);
3688       put.add(fam1, col, 1, Bytes.toBytes("test1"));
3689       put.add(fam2, col, 1, Bytes.toBytes("test2"));
3690       ht.put(put);
3691 
3692       HRegion firstRegion = htu.getHBaseCluster().getRegions(TableName.valueOf(this.getName()))
3693           .get(0);
3694       firstRegion.flushcache();
3695       HDFSBlocksDistribution blocksDistribution1 = firstRegion.getHDFSBlocksDistribution();
3696 
3697       // given the default replication factor is 2 and we have 2 HFiles,
3698       // we will have total of 4 replica of blocks on 3 datanodes; thus there
3699       // must be at least one host that have replica for 2 HFiles. That host's
3700       // weight will be equal to the unique block weight.
3701       long uniqueBlocksWeight1 = blocksDistribution1.getUniqueBlocksTotalWeight();
3702 
3703       String topHost = blocksDistribution1.getTopHosts().get(0);
3704       long topHostWeight = blocksDistribution1.getWeight(topHost);
3705       assertTrue(uniqueBlocksWeight1 == topHostWeight);
3706 
3707       // use the static method to compute the value, it should be the same.
3708       // static method is used by load balancer or other components
3709       HDFSBlocksDistribution blocksDistribution2 = HRegion.computeHDFSBlocksDistribution(
3710           htu.getConfiguration(), firstRegion.getTableDesc(), firstRegion.getRegionInfo());
3711       long uniqueBlocksWeight2 = blocksDistribution2.getUniqueBlocksTotalWeight();
3712 
3713       assertTrue(uniqueBlocksWeight1 == uniqueBlocksWeight2);
3714 
3715       ht.close();
3716     } finally {
3717       if (cluster != null) {
3718         htu.shutdownMiniCluster();
3719       }
3720     }
3721   }
3722 
3723   /**
3724    * Testcase to check state of region initialization task set to ABORTED or not
3725    * if any exceptions during initialization
3726    * 
3727    * @throws Exception
3728    */
3729   @Test
3730   public void testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization() throws Exception {
3731     TableName tableName = TableName.valueOf(name.getMethodName());
3732     HRegionInfo info = null;
3733     try {
3734       FileSystem fs = Mockito.mock(FileSystem.class);
3735       Mockito.when(fs.exists((Path) Mockito.anyObject())).thenThrow(new IOException());
3736       HTableDescriptor htd = new HTableDescriptor(tableName);
3737       htd.addFamily(new HColumnDescriptor("cf"));
3738       info = new HRegionInfo(htd.getTableName(), HConstants.EMPTY_BYTE_ARRAY,
3739           HConstants.EMPTY_BYTE_ARRAY, false);
3740       Path path = new Path(dir + "testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization");
3741       region = HRegion.newHRegion(path, null, fs, CONF, info, htd, null);
3742       // region initialization throws IOException and set task state to ABORTED.
3743       region.initialize();
3744       fail("Region initialization should fail due to IOException");
3745     } catch (IOException io) {
3746       List<MonitoredTask> tasks = TaskMonitor.get().getTasks();
3747       for (MonitoredTask monitoredTask : tasks) {
3748         if (!(monitoredTask instanceof MonitoredRPCHandler)
3749             && monitoredTask.getDescription().contains(region.toString())) {
3750           assertTrue("Region state should be ABORTED.",
3751               monitoredTask.getState().equals(MonitoredTask.State.ABORTED));
3752           break;
3753         }
3754       }
3755     } finally {
3756       HRegion.closeHRegion(region);
3757     }
3758   }
3759 
3760   /**
3761    * Verifies that the .regioninfo file is written on region creation and that
3762    * is recreated if missing during region opening.
3763    */
3764   @Test
3765   public void testRegionInfoFileCreation() throws IOException {
3766     Path rootDir = new Path(dir + "testRegionInfoFileCreation");
3767 
3768     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testtb"));
3769     htd.addFamily(new HColumnDescriptor("cf"));
3770 
3771     HRegionInfo hri = new HRegionInfo(htd.getTableName());
3772 
3773     // Create a region and skip the initialization (like CreateTableHandler)
3774     HRegion region = HRegion.createHRegion(hri, rootDir, CONF, htd, null, false, true);
3775 //    HRegion region = TEST_UTIL.createLocalHRegion(hri, htd);
3776     Path regionDir = region.getRegionFileSystem().getRegionDir();
3777     FileSystem fs = region.getRegionFileSystem().getFileSystem();
3778     HRegion.closeHRegion(region);
3779 
3780     Path regionInfoFile = new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE);
3781 
3782     // Verify that the .regioninfo file is present
3783     assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
3784         fs.exists(regionInfoFile));
3785 
3786     // Try to open the region
3787     region = HRegion.openHRegion(rootDir, hri, htd, null, CONF);
3788     assertEquals(regionDir, region.getRegionFileSystem().getRegionDir());
3789     HRegion.closeHRegion(region);
3790 
3791     // Verify that the .regioninfo file is still there
3792     assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
3793         fs.exists(regionInfoFile));
3794 
3795     // Remove the .regioninfo file and verify is recreated on region open
3796     fs.delete(regionInfoFile);
3797     assertFalse(HRegionFileSystem.REGION_INFO_FILE + " should be removed from the region dir",
3798         fs.exists(regionInfoFile));
3799 
3800     region = HRegion.openHRegion(rootDir, hri, htd, null, CONF);
3801 //    region = TEST_UTIL.openHRegion(hri, htd);
3802     assertEquals(regionDir, region.getRegionFileSystem().getRegionDir());
3803     HRegion.closeHRegion(region);
3804 
3805     // Verify that the .regioninfo file is still there
3806     assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
3807         fs.exists(new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE)));
3808   }
3809 
3810   /**
3811    * TestCase for increment
3812    */
3813   private static class Incrementer implements Runnable {
3814     private HRegion region;
3815     private final static byte[] incRow = Bytes.toBytes("incRow");
3816     private final static byte[] family = Bytes.toBytes("family");
3817     private final static byte[] qualifier = Bytes.toBytes("qualifier");
3818     private final static long ONE = 1l;
3819     private int incCounter;
3820 
3821     public Incrementer(HRegion region, int incCounter) {
3822       this.region = region;
3823       this.incCounter = incCounter;
3824     }
3825 
3826     @Override
3827     public void run() {
3828       int count = 0;
3829       while (count < incCounter) {
3830         Increment inc = new Increment(incRow);
3831         inc.addColumn(family, qualifier, ONE);
3832         count++;
3833         try {
3834           region.increment(inc);
3835         } catch (IOException e) {
3836           e.printStackTrace();
3837           break;
3838         }
3839       }
3840     }
3841   }
3842 
3843   /**
3844    * Test case to check increment function with memstore flushing
3845    * @throws Exception
3846    */
3847   @Test
3848   public void testParallelIncrementWithMemStoreFlush() throws Exception {
3849     byte[] family = Incrementer.family;
3850     this.region = initHRegion(tableName, method, CONF, family);
3851     final HRegion region = this.region;
3852     final AtomicBoolean incrementDone = new AtomicBoolean(false);
3853     Runnable flusher = new Runnable() {
3854       @Override
3855       public void run() {
3856         while (!incrementDone.get()) {
3857           try {
3858             region.flushcache();
3859           } catch (Exception e) {
3860             e.printStackTrace();
3861           }
3862         }
3863       }
3864     };
3865 
3866     // after all increment finished, the row will increment to 20*100 = 2000
3867     int threadNum = 20;
3868     int incCounter = 100;
3869     long expected = threadNum * incCounter;
3870     Thread[] incrementers = new Thread[threadNum];
3871     Thread flushThread = new Thread(flusher);
3872     for (int i = 0; i < threadNum; i++) {
3873       incrementers[i] = new Thread(new Incrementer(this.region, incCounter));
3874       incrementers[i].start();
3875     }
3876     flushThread.start();
3877     for (int i = 0; i < threadNum; i++) {
3878       incrementers[i].join();
3879     }
3880 
3881     incrementDone.set(true);
3882     flushThread.join();
3883 
3884     Get get = new Get(Incrementer.incRow);
3885     get.addColumn(Incrementer.family, Incrementer.qualifier);
3886     get.setMaxVersions(1);
3887     Result res = this.region.get(get);
3888     List<Cell> kvs = res.getColumnCells(Incrementer.family, Incrementer.qualifier);
3889 
3890     // we just got the latest version
3891     assertEquals(kvs.size(), 1);
3892     Cell kv = kvs.get(0);
3893     assertEquals(expected, Bytes.toLong(kv.getValueArray(), kv.getValueOffset()));
3894     this.region = null;
3895   }
3896 
3897   /**
3898    * TestCase for append
3899    */
3900   private static class Appender implements Runnable {
3901     private HRegion region;
3902     private final static byte[] appendRow = Bytes.toBytes("appendRow");
3903     private final static byte[] family = Bytes.toBytes("family");
3904     private final static byte[] qualifier = Bytes.toBytes("qualifier");
3905     private final static byte[] CHAR = Bytes.toBytes("a");
3906     private int appendCounter;
3907 
3908     public Appender(HRegion region, int appendCounter) {
3909       this.region = region;
3910       this.appendCounter = appendCounter;
3911     }
3912 
3913     @Override
3914     public void run() {
3915       int count = 0;
3916       while (count < appendCounter) {
3917         Append app = new Append(appendRow);
3918         app.add(family, qualifier, CHAR);
3919         count++;
3920         try {
3921           region.append(app);
3922         } catch (IOException e) {
3923           e.printStackTrace();
3924           break;
3925         }
3926       }
3927     }
3928   }
3929 
3930   /**
3931    * Test case to check append function with memstore flushing
3932    * @throws Exception
3933    */
3934   @Test
3935   public void testParallelAppendWithMemStoreFlush() throws Exception {
3936     byte[] family = Appender.family;
3937     this.region = initHRegion(tableName, method, CONF, family);
3938     final HRegion region = this.region;
3939     final AtomicBoolean appendDone = new AtomicBoolean(false);
3940     Runnable flusher = new Runnable() {
3941       @Override
3942       public void run() {
3943         while (!appendDone.get()) {
3944           try {
3945             region.flushcache();
3946           } catch (Exception e) {
3947             e.printStackTrace();
3948           }
3949         }
3950       }
3951     };
3952 
3953     // after all append finished, the value will append to threadNum *
3954     // appendCounter Appender.CHAR
3955     int threadNum = 20;
3956     int appendCounter = 100;
3957     byte[] expected = new byte[threadNum * appendCounter];
3958     for (int i = 0; i < threadNum * appendCounter; i++) {
3959       System.arraycopy(Appender.CHAR, 0, expected, i, 1);
3960     }
3961     Thread[] appenders = new Thread[threadNum];
3962     Thread flushThread = new Thread(flusher);
3963     for (int i = 0; i < threadNum; i++) {
3964       appenders[i] = new Thread(new Appender(this.region, appendCounter));
3965       appenders[i].start();
3966     }
3967     flushThread.start();
3968     for (int i = 0; i < threadNum; i++) {
3969       appenders[i].join();
3970     }
3971 
3972     appendDone.set(true);
3973     flushThread.join();
3974 
3975     Get get = new Get(Appender.appendRow);
3976     get.addColumn(Appender.family, Appender.qualifier);
3977     get.setMaxVersions(1);
3978     Result res = this.region.get(get);
3979     List<Cell> kvs = res.getColumnCells(Appender.family, Appender.qualifier);
3980 
3981     // we just got the latest version
3982     assertEquals(kvs.size(), 1);
3983     Cell kv = kvs.get(0);
3984     byte[] appendResult = new byte[kv.getValueLength()];
3985     System.arraycopy(kv.getValueArray(), kv.getValueOffset(), appendResult, 0, kv.getValueLength());
3986     assertArrayEquals(expected, appendResult);
3987     this.region = null;
3988   }
3989 
3990   /**
3991    * Test case to check put function with memstore flushing for same row, same ts
3992    * @throws Exception
3993    */
3994   @Test
3995   public void testPutWithMemStoreFlush() throws Exception {
3996     byte[] family = Bytes.toBytes("family");
3997     ;
3998     byte[] qualifier = Bytes.toBytes("qualifier");
3999     byte[] row = Bytes.toBytes("putRow");
4000     byte[] value = null;
4001     this.region = initHRegion(tableName, method, CONF, family);
4002     Put put = null;
4003     Get get = null;
4004     List<Cell> kvs = null;
4005     Result res = null;
4006 
4007     put = new Put(row);
4008     value = Bytes.toBytes("value0");
4009     put.add(family, qualifier, 1234567l, value);
4010     region.put(put);
4011     get = new Get(row);
4012     get.addColumn(family, qualifier);
4013     get.setMaxVersions();
4014     res = this.region.get(get);
4015     kvs = res.getColumnCells(family, qualifier);
4016     assertEquals(1, kvs.size());
4017     assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0)));
4018 
4019     region.flushcache();
4020     get = new Get(row);
4021     get.addColumn(family, qualifier);
4022     get.setMaxVersions();
4023     res = this.region.get(get);
4024     kvs = res.getColumnCells(family, qualifier);
4025     assertEquals(1, kvs.size());
4026     assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0)));
4027 
4028     put = new Put(row);
4029     value = Bytes.toBytes("value1");
4030     put.add(family, qualifier, 1234567l, value);
4031     region.put(put);
4032     get = new Get(row);
4033     get.addColumn(family, qualifier);
4034     get.setMaxVersions();
4035     res = this.region.get(get);
4036     kvs = res.getColumnCells(family, qualifier);
4037     assertEquals(1, kvs.size());
4038     assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0)));
4039 
4040     region.flushcache();
4041     get = new Get(row);
4042     get.addColumn(family, qualifier);
4043     get.setMaxVersions();
4044     res = this.region.get(get);
4045     kvs = res.getColumnCells(family, qualifier);
4046     assertEquals(1, kvs.size());
4047     assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0)));
4048   }
4049 
4050   @Test
4051   public void testDurability() throws Exception {
4052     String method = "testDurability";
4053     // there are 5 x 5 cases:
4054     // table durability(SYNC,FSYNC,ASYC,SKIP,USE_DEFAULT) x mutation
4055     // durability(SYNC,FSYNC,ASYC,SKIP,USE_DEFAULT)
4056 
4057     // expected cases for append and sync wal
4058     durabilityTest(method, Durability.SYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4059     durabilityTest(method, Durability.SYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4060     durabilityTest(method, Durability.SYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false);
4061 
4062     durabilityTest(method, Durability.FSYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4063     durabilityTest(method, Durability.FSYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4064     durabilityTest(method, Durability.FSYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false);
4065 
4066     durabilityTest(method, Durability.ASYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4067     durabilityTest(method, Durability.ASYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4068 
4069     durabilityTest(method, Durability.SKIP_WAL, Durability.SYNC_WAL, 0, true, true, false);
4070     durabilityTest(method, Durability.SKIP_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4071 
4072     durabilityTest(method, Durability.USE_DEFAULT, Durability.SYNC_WAL, 0, true, true, false);
4073     durabilityTest(method, Durability.USE_DEFAULT, Durability.FSYNC_WAL, 0, true, true, false);
4074     durabilityTest(method, Durability.USE_DEFAULT, Durability.USE_DEFAULT, 0, true, true, false);
4075 
4076     // expected cases for async wal
4077     durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4078     durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4079     durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4080     durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4081     durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 0, true, false, false);
4082     durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 0, true, false, false);
4083 
4084     durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4085     durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4086     durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4087     durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4088     durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 5000, true, false, true);
4089     durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 5000, true, false, true);
4090 
4091     // expect skip wal cases
4092     durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4093     durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4094     durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4095     durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, false, false, false);
4096     durabilityTest(method, Durability.USE_DEFAULT, Durability.SKIP_WAL, 0, false, false, false);
4097     durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, false, false, false);
4098 
4099   }
4100 
4101   private void durabilityTest(String method, Durability tableDurability,
4102       Durability mutationDurability, long timeout, boolean expectAppend, final boolean expectSync,
4103       final boolean expectSyncFromLogSyncer) throws Exception {
4104     Configuration conf = HBaseConfiguration.create(CONF);
4105     method = method + "_" + tableDurability.name() + "_" + mutationDurability.name();
4106     TableName tableName = TableName.valueOf(method);
4107     byte[] family = Bytes.toBytes("family");
4108     Path logDir = new Path(new Path(dir + method), "log");
4109     HLog hlog = HLogFactory.createHLog(FILESYSTEM, logDir, UUID.randomUUID().toString(), conf);
4110     final HLog log = spy(hlog);
4111     this.region = initHRegion(tableName.getName(), HConstants.EMPTY_START_ROW,
4112         HConstants.EMPTY_END_ROW, method, conf, false, tableDurability, log,
4113         new byte[][] { family });
4114 
4115     Put put = new Put(Bytes.toBytes("r1"));
4116     put.add(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
4117     put.setDurability(mutationDurability);
4118     region.put(put);
4119 
4120     //verify append called or not
4121     verify(log, expectAppend ? times(1) : never())
4122       .appendNoSync((HRegionInfo)any(), eq(tableName), (WALEdit)any(), (List<UUID>)any(),
4123         anyLong(), (HTableDescriptor)any(), (AtomicLong)any(), anyBoolean(), anyLong(), anyLong());
4124 
4125     // verify sync called or not
4126     if (expectSync || expectSyncFromLogSyncer) {
4127       TEST_UTIL.waitFor(timeout, new Waiter.Predicate<Exception>() {
4128         @Override
4129         public boolean evaluate() throws Exception {
4130           try {
4131             if (expectSync) {
4132               verify(log, times(1)).sync(anyLong()); // Hregion calls this one
4133             } else if (expectSyncFromLogSyncer) {
4134               verify(log, times(1)).sync(); // log syncer calls this one
4135             }
4136           } catch (Throwable ignore) {
4137           }
4138           return true;
4139         }
4140       });
4141     } else {
4142       verify(log, never()).sync(anyLong());
4143       verify(log, never()).sync();
4144     }
4145 
4146     HRegion.closeHRegion(this.region);
4147     this.region = null;
4148   }
4149 
4150   private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
4151     for (int i = startRow; i < startRow + numRows; i++) {
4152       Put put = new Put(Bytes.toBytes("" + i));
4153       put.setDurability(Durability.SKIP_WAL);
4154       for (byte[] family : families) {
4155         put.add(family, qf, null);
4156       }
4157       region.put(put);
4158     }
4159   }
4160 
4161   private void verifyData(HRegion newReg, int startRow, int numRows, byte[] qf, byte[]... families)
4162       throws IOException {
4163     for (int i = startRow; i < startRow + numRows; i++) {
4164       byte[] row = Bytes.toBytes("" + i);
4165       Get get = new Get(row);
4166       for (byte[] family : families) {
4167         get.addColumn(family, qf);
4168       }
4169       Result result = newReg.get(get);
4170       Cell[] raw = result.rawCells();
4171       assertEquals(families.length, result.size());
4172       for (int j = 0; j < families.length; j++) {
4173         assertTrue(CellUtil.matchingRow(raw[j], row));
4174         assertTrue(CellUtil.matchingFamily(raw[j], families[j]));
4175         assertTrue(CellUtil.matchingQualifier(raw[j], qf));
4176       }
4177     }
4178   }
4179 
4180   private void assertGet(final HRegion r, final byte[] family, final byte[] k) throws IOException {
4181     // Now I have k, get values out and assert they are as expected.
4182     Get get = new Get(k).addFamily(family).setMaxVersions();
4183     Cell[] results = r.get(get).rawCells();
4184     for (int j = 0; j < results.length; j++) {
4185       byte[] tmp = CellUtil.cloneValue(results[j]);
4186       // Row should be equal to value every time.
4187       assertTrue(Bytes.equals(k, tmp));
4188     }
4189   }
4190 
4191   /*
4192    * Assert first value in the passed region is <code>firstValue</code>.
4193    * 
4194    * @param r
4195    * 
4196    * @param fs
4197    * 
4198    * @param firstValue
4199    * 
4200    * @throws IOException
4201    */
4202   private void assertScan(final HRegion r, final byte[] fs, final byte[] firstValue)
4203       throws IOException {
4204     byte[][] families = { fs };
4205     Scan scan = new Scan();
4206     for (int i = 0; i < families.length; i++)
4207       scan.addFamily(families[i]);
4208     InternalScanner s = r.getScanner(scan);
4209     try {
4210       List<Cell> curVals = new ArrayList<Cell>();
4211       boolean first = true;
4212       OUTER_LOOP: while (s.next(curVals)) {
4213         for (Cell kv : curVals) {
4214           byte[] val = CellUtil.cloneValue(kv);
4215           byte[] curval = val;
4216           if (first) {
4217             first = false;
4218             assertTrue(Bytes.compareTo(curval, firstValue) == 0);
4219           } else {
4220             // Not asserting anything. Might as well break.
4221             break OUTER_LOOP;
4222           }
4223         }
4224       }
4225     } finally {
4226       s.close();
4227     }
4228   }
4229 
4230   /**
4231    * Test that we get the expected flush results back
4232    * @throws IOException
4233    */
4234   @Test
4235   public void testFlushResult() throws IOException {
4236     String method = name.getMethodName();
4237     byte[] tableName = Bytes.toBytes(method);
4238     byte[] family = Bytes.toBytes("family");
4239 
4240     this.region = initHRegion(tableName, method, family);
4241 
4242     // empty memstore, flush doesn't run
4243     HRegion.FlushResult fr = region.flushcache();
4244     assertFalse(fr.isFlushSucceeded());
4245     assertFalse(fr.isCompactionNeeded());
4246 
4247     // Flush enough files to get up to the threshold, doesn't need compactions
4248     for (int i = 0; i < 2; i++) {
4249       Put put = new Put(tableName).add(family, family, tableName);
4250       region.put(put);
4251       fr = region.flushcache();
4252       assertTrue(fr.isFlushSucceeded());
4253       assertFalse(fr.isCompactionNeeded());
4254     }
4255 
4256     // Two flushes after the threshold, compactions are needed
4257     for (int i = 0; i < 2; i++) {
4258       Put put = new Put(tableName).add(family, family, tableName);
4259       region.put(put);
4260       fr = region.flushcache();
4261       assertTrue(fr.isFlushSucceeded());
4262       assertTrue(fr.isCompactionNeeded());
4263     }
4264   }
4265 
4266   private Configuration initSplit() {
4267     // Always compact if there is more than one store file.
4268     CONF.setInt("hbase.hstore.compactionThreshold", 2);
4269 
4270     // Make lease timeout longer, lease checks less frequent
4271     CONF.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);
4272 
4273     CONF.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
4274 
4275     // Increase the amount of time between client retries
4276     CONF.setLong("hbase.client.pause", 15 * 1000);
4277 
4278     // This size should make it so we always split using the addContent
4279     // below. After adding all data, the first region is 1.3M
4280     CONF.setLong(HConstants.HREGION_MAX_FILESIZE, 1024 * 128);
4281     return CONF;
4282   }
4283 
4284   /**
4285    * @param tableName
4286    * @param callingMethod
4287    * @param conf
4288    * @param families
4289    * @throws IOException
4290    * @return A region on which you must call
4291    *         {@link HRegion#closeHRegion(HRegion)} when done.
4292    */
4293   public static HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
4294       byte[]... families) throws IOException {
4295     return initHRegion(tableName.getName(), null, null, callingMethod, conf, false, families);
4296   }
4297 
4298   /**
4299    * @param tableName
4300    * @param callingMethod
4301    * @param conf
4302    * @param families
4303    * @throws IOException
4304    * @return A region on which you must call
4305    *         {@link HRegion#closeHRegion(HRegion)} when done.
4306    */
4307   public static HRegion initHRegion(byte[] tableName, String callingMethod, Configuration conf,
4308       byte[]... families) throws IOException {
4309     return initHRegion(tableName, null, null, callingMethod, conf, false, families);
4310   }
4311 
4312   /**
4313    * @param tableName
4314    * @param callingMethod
4315    * @param conf
4316    * @param isReadOnly
4317    * @param families
4318    * @throws IOException
4319    * @return A region on which you must call
4320    *         {@link HRegion#closeHRegion(HRegion)} when done.
4321    */
4322   public static HRegion initHRegion(byte[] tableName, String callingMethod, Configuration conf,
4323       boolean isReadOnly, byte[]... families) throws IOException {
4324     return initHRegion(tableName, null, null, callingMethod, conf, isReadOnly, families);
4325   }
4326 
4327   private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
4328       String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families)
4329       throws IOException {
4330     return initHRegion(tableName, startKey, stopKey, callingMethod, conf, isReadOnly,
4331         Durability.SYNC_WAL, null, families);
4332   }
4333 
4334   /**
4335    * @param tableName
4336    * @param startKey
4337    * @param stopKey
4338    * @param callingMethod
4339    * @param conf
4340    * @param isReadOnly
4341    * @param families
4342    * @throws IOException
4343    * @return A region on which you must call
4344    *         {@link HRegion#closeHRegion(HRegion)} when done.
4345    */
4346   private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
4347       String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
4348       HLog hlog, byte[]... families) throws IOException {
4349     return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, callingMethod, conf, isReadOnly, durability, hlog, families);
4350   }
4351 
4352   /**
4353    * Assert that the passed in Cell has expected contents for the specified row,
4354    * column & timestamp.
4355    */
4356   private void checkOneCell(Cell kv, byte[] cf, int rowIdx, int colIdx, long ts) {
4357     String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
4358     assertEquals("Row mismatch which checking: " + ctx, "row:" + rowIdx,
4359         Bytes.toString(CellUtil.cloneRow(kv)));
4360     assertEquals("ColumnFamily mismatch while checking: " + ctx, Bytes.toString(cf),
4361         Bytes.toString(CellUtil.cloneFamily(kv)));
4362     assertEquals("Column qualifier mismatch while checking: " + ctx, "column:" + colIdx,
4363         Bytes.toString(CellUtil.cloneQualifier(kv)));
4364     assertEquals("Timestamp mismatch while checking: " + ctx, ts, kv.getTimestamp());
4365     assertEquals("Value mismatch while checking: " + ctx, "value-version-" + ts,
4366         Bytes.toString(CellUtil.cloneValue(kv)));
4367   }
4368 
4369   public void testReverseScanner_FromMemStore_SingleCF_Normal()
4370       throws IOException {
4371     byte[] rowC = Bytes.toBytes("rowC");
4372     byte[] rowA = Bytes.toBytes("rowA");
4373     byte[] rowB = Bytes.toBytes("rowB");
4374     byte[] cf = Bytes.toBytes("CF");
4375     byte[][] families = { cf };
4376     byte[] col = Bytes.toBytes("C");
4377     long ts = 1;
4378     String method = this.getName();
4379     this.region = initHRegion(tableName, method, families);
4380     try {
4381       KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
4382       KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
4383           null);
4384       KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
4385       KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
4386       Put put = null;
4387       put = new Put(rowC);
4388       put.add(kv1);
4389       put.add(kv11);
4390       region.put(put);
4391       put = new Put(rowA);
4392       put.add(kv2);
4393       region.put(put);
4394       put = new Put(rowB);
4395       put.add(kv3);
4396       region.put(put);
4397 
4398       Scan scan = new Scan(rowC);
4399       scan.setMaxVersions(5);
4400       scan.setReversed(true);
4401       InternalScanner scanner = region.getScanner(scan);
4402       List<Cell> currRow = new ArrayList<Cell>();
4403       boolean hasNext = scanner.next(currRow);
4404       assertEquals(2, currRow.size());
4405       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC));
4406       assertTrue(hasNext);
4407       currRow.clear();
4408       hasNext = scanner.next(currRow);
4409       assertEquals(1, currRow.size());
4410       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB));
4411       assertTrue(hasNext);
4412       currRow.clear();
4413       hasNext = scanner.next(currRow);
4414       assertEquals(1, currRow.size());
4415       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowA));
4416       assertFalse(hasNext);
4417       scanner.close();
4418     } finally {
4419       HRegion.closeHRegion(this.region);
4420       this.region = null;
4421     }
4422   }
4423 
4424   public void testReverseScanner_FromMemStore_SingleCF_LargerKey()
4425       throws IOException {
4426     byte[] rowC = Bytes.toBytes("rowC");
4427     byte[] rowA = Bytes.toBytes("rowA");
4428     byte[] rowB = Bytes.toBytes("rowB");
4429     byte[] rowD = Bytes.toBytes("rowD");
4430     byte[] cf = Bytes.toBytes("CF");
4431     byte[][] families = { cf };
4432     byte[] col = Bytes.toBytes("C");
4433     long ts = 1;
4434     String method = this.getName();
4435     this.region = initHRegion(tableName, method, families);
4436     try {
4437       KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
4438       KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
4439           null);
4440       KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
4441       KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
4442       Put put = null;
4443       put = new Put(rowC);
4444       put.add(kv1);
4445       put.add(kv11);
4446       region.put(put);
4447       put = new Put(rowA);
4448       put.add(kv2);
4449       region.put(put);
4450       put = new Put(rowB);
4451       put.add(kv3);
4452       region.put(put);
4453 
4454       Scan scan = new Scan(rowD);
4455       List<Cell> currRow = new ArrayList<Cell>();
4456       scan.setReversed(true);
4457       scan.setMaxVersions(5);
4458       InternalScanner scanner = region.getScanner(scan);
4459       boolean hasNext = scanner.next(currRow);
4460       assertEquals(2, currRow.size());
4461       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC));
4462       assertTrue(hasNext);
4463       currRow.clear();
4464       hasNext = scanner.next(currRow);
4465       assertEquals(1, currRow.size());
4466       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB));
4467       assertTrue(hasNext);
4468       currRow.clear();
4469       hasNext = scanner.next(currRow);
4470       assertEquals(1, currRow.size());
4471       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowA));
4472       assertFalse(hasNext);
4473       scanner.close();
4474     } finally {
4475       HRegion.closeHRegion(this.region);
4476       this.region = null;
4477     }
4478   }
4479 
4480   public void testReverseScanner_FromMemStore_SingleCF_FullScan()
4481       throws IOException {
4482     byte[] rowC = Bytes.toBytes("rowC");
4483     byte[] rowA = Bytes.toBytes("rowA");
4484     byte[] rowB = Bytes.toBytes("rowB");
4485     byte[] cf = Bytes.toBytes("CF");
4486     byte[][] families = { cf };
4487     byte[] col = Bytes.toBytes("C");
4488     long ts = 1;
4489     String method = this.getName();
4490     this.region = initHRegion(tableName, method, families);
4491     try {
4492       KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
4493       KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
4494           null);
4495       KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
4496       KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
4497       Put put = null;
4498       put = new Put(rowC);
4499       put.add(kv1);
4500       put.add(kv11);
4501       region.put(put);
4502       put = new Put(rowA);
4503       put.add(kv2);
4504       region.put(put);
4505       put = new Put(rowB);
4506       put.add(kv3);
4507       region.put(put);
4508       Scan scan = new Scan();
4509       List<Cell> currRow = new ArrayList<Cell>();
4510       scan.setReversed(true);
4511       InternalScanner scanner = region.getScanner(scan);
4512       boolean hasNext = scanner.next(currRow);
4513       assertEquals(1, currRow.size());
4514       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC));
4515       assertTrue(hasNext);
4516       currRow.clear();
4517       hasNext = scanner.next(currRow);
4518       assertEquals(1, currRow.size());
4519       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB));
4520       assertTrue(hasNext);
4521       currRow.clear();
4522       hasNext = scanner.next(currRow);
4523       assertEquals(1, currRow.size());
4524       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowA));
4525       assertFalse(hasNext);
4526       scanner.close();
4527     } finally {
4528       HRegion.closeHRegion(this.region);
4529       this.region = null;
4530     }
4531   }
4532 
4533   public void testReverseScanner_moreRowsMayExistAfter() throws IOException {
4534     // case for "INCLUDE_AND_SEEK_NEXT_ROW & SEEK_NEXT_ROW" endless loop
4535     byte[] rowA = Bytes.toBytes("rowA");
4536     byte[] rowB = Bytes.toBytes("rowB");
4537     byte[] rowC = Bytes.toBytes("rowC");
4538     byte[] rowD = Bytes.toBytes("rowD");
4539     byte[] rowE = Bytes.toBytes("rowE");
4540     byte[] cf = Bytes.toBytes("CF");
4541     byte[][] families = { cf };
4542     byte[] col1 = Bytes.toBytes("col1");
4543     byte[] col2 = Bytes.toBytes("col2");
4544     long ts = 1;
4545     String method = this.getName();
4546     this.region = initHRegion(tableName, method, families);
4547     try {
4548       KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null);
4549       KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null);
4550       KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null);
4551       KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null);
4552       KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null);
4553       KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null);
4554       Put put = null;
4555       put = new Put(rowA);
4556       put.add(kv1);
4557       region.put(put);
4558       put = new Put(rowB);
4559       put.add(kv2);
4560       region.put(put);
4561       put = new Put(rowC);
4562       put.add(kv3);
4563       region.put(put);
4564       put = new Put(rowD);
4565       put.add(kv4_1);
4566       region.put(put);
4567       put = new Put(rowD);
4568       put.add(kv4_2);
4569       region.put(put);
4570       put = new Put(rowE);
4571       put.add(kv5);
4572       region.put(put);
4573       region.flushcache();
4574       Scan scan = new Scan(rowD, rowA);
4575       scan.addColumn(families[0], col1);
4576       scan.setReversed(true);
4577       List<Cell> currRow = new ArrayList<Cell>();
4578       InternalScanner scanner = region.getScanner(scan);
4579       boolean hasNext = scanner.next(currRow);
4580       assertEquals(1, currRow.size());
4581       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD));
4582       assertTrue(hasNext);
4583       currRow.clear();
4584       hasNext = scanner.next(currRow);
4585       assertEquals(1, currRow.size());
4586       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC));
4587       assertTrue(hasNext);
4588       currRow.clear();
4589       hasNext = scanner.next(currRow);
4590       assertEquals(1, currRow.size());
4591       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB));
4592       assertFalse(hasNext);
4593       scanner.close();
4594 
4595       scan = new Scan(rowD, rowA);
4596       scan.addColumn(families[0], col2);
4597       scan.setReversed(true);
4598       currRow.clear();
4599       scanner = region.getScanner(scan);
4600       hasNext = scanner.next(currRow);
4601       assertEquals(1, currRow.size());
4602       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD));
4603       scanner.close();
4604     } finally {
4605       HRegion.closeHRegion(this.region);
4606       this.region = null;
4607     }
4608   }
4609 
4610   public void testReverseScanner_smaller_blocksize() throws IOException {
4611     // case to ensure no conflict with HFile index optimization
4612     byte[] rowA = Bytes.toBytes("rowA");
4613     byte[] rowB = Bytes.toBytes("rowB");
4614     byte[] rowC = Bytes.toBytes("rowC");
4615     byte[] rowD = Bytes.toBytes("rowD");
4616     byte[] rowE = Bytes.toBytes("rowE");
4617     byte[] cf = Bytes.toBytes("CF");
4618     byte[][] families = { cf };
4619     byte[] col1 = Bytes.toBytes("col1");
4620     byte[] col2 = Bytes.toBytes("col2");
4621     long ts = 1;
4622     String method = this.getName();
4623     HBaseConfiguration config = new HBaseConfiguration();
4624     config.setInt("test.block.size", 1);
4625     this.region = initHRegion(tableName, method, config, families);
4626     try {
4627       KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null);
4628       KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null);
4629       KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null);
4630       KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null);
4631       KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null);
4632       KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null);
4633       Put put = null;
4634       put = new Put(rowA);
4635       put.add(kv1);
4636       region.put(put);
4637       put = new Put(rowB);
4638       put.add(kv2);
4639       region.put(put);
4640       put = new Put(rowC);
4641       put.add(kv3);
4642       region.put(put);
4643       put = new Put(rowD);
4644       put.add(kv4_1);
4645       region.put(put);
4646       put = new Put(rowD);
4647       put.add(kv4_2);
4648       region.put(put);
4649       put = new Put(rowE);
4650       put.add(kv5);
4651       region.put(put);
4652       region.flushcache();
4653       Scan scan = new Scan(rowD, rowA);
4654       scan.addColumn(families[0], col1);
4655       scan.setReversed(true);
4656       List<Cell> currRow = new ArrayList<Cell>();
4657       InternalScanner scanner = region.getScanner(scan);
4658       boolean hasNext = scanner.next(currRow);
4659       assertEquals(1, currRow.size());
4660       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD));
4661       assertTrue(hasNext);
4662       currRow.clear();
4663       hasNext = scanner.next(currRow);
4664       assertEquals(1, currRow.size());
4665       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC));
4666       assertTrue(hasNext);
4667       currRow.clear();
4668       hasNext = scanner.next(currRow);
4669       assertEquals(1, currRow.size());
4670       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB));
4671       assertFalse(hasNext);
4672       scanner.close();
4673 
4674       scan = new Scan(rowD, rowA);
4675       scan.addColumn(families[0], col2);
4676       scan.setReversed(true);
4677       currRow.clear();
4678       scanner = region.getScanner(scan);
4679       hasNext = scanner.next(currRow);
4680       assertEquals(1, currRow.size());
4681       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD));
4682       scanner.close();
4683     } finally {
4684       HRegion.closeHRegion(this.region);
4685       this.region = null;
4686     }
4687   }
4688 
4689   public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs1()
4690       throws IOException {
4691     byte[] row0 = Bytes.toBytes("row0"); // 1 kv
4692     byte[] row1 = Bytes.toBytes("row1"); // 2 kv
4693     byte[] row2 = Bytes.toBytes("row2"); // 4 kv
4694     byte[] row3 = Bytes.toBytes("row3"); // 2 kv
4695     byte[] row4 = Bytes.toBytes("row4"); // 5 kv
4696     byte[] row5 = Bytes.toBytes("row5"); // 2 kv
4697     byte[] cf1 = Bytes.toBytes("CF1");
4698     byte[] cf2 = Bytes.toBytes("CF2");
4699     byte[] cf3 = Bytes.toBytes("CF3");
4700     byte[][] families = { cf1, cf2, cf3 };
4701     byte[] col = Bytes.toBytes("C");
4702     long ts = 1;
4703     String method = this.getName();
4704     HBaseConfiguration conf = new HBaseConfiguration();
4705     // disable compactions in this test.
4706     conf.setInt("hbase.hstore.compactionThreshold", 10000);
4707     this.region = initHRegion(tableName, method, conf, families);
4708     try {
4709       // kv naming style: kv(row number) totalKvCountInThisRow seq no
4710       KeyValue kv0_1_1 = new KeyValue(row0, cf1, col, ts, KeyValue.Type.Put,
4711           null);
4712       KeyValue kv1_2_1 = new KeyValue(row1, cf2, col, ts, KeyValue.Type.Put,
4713           null);
4714       KeyValue kv1_2_2 = new KeyValue(row1, cf1, col, ts + 1,
4715           KeyValue.Type.Put, null);
4716       KeyValue kv2_4_1 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put,
4717           null);
4718       KeyValue kv2_4_2 = new KeyValue(row2, cf1, col, ts, KeyValue.Type.Put,
4719           null);
4720       KeyValue kv2_4_3 = new KeyValue(row2, cf3, col, ts, KeyValue.Type.Put,
4721           null);
4722       KeyValue kv2_4_4 = new KeyValue(row2, cf1, col, ts + 4,
4723           KeyValue.Type.Put, null);
4724       KeyValue kv3_2_1 = new KeyValue(row3, cf2, col, ts, KeyValue.Type.Put,
4725           null);
4726       KeyValue kv3_2_2 = new KeyValue(row3, cf1, col, ts + 4,
4727           KeyValue.Type.Put, null);
4728       KeyValue kv4_5_1 = new KeyValue(row4, cf1, col, ts, KeyValue.Type.Put,
4729           null);
4730       KeyValue kv4_5_2 = new KeyValue(row4, cf3, col, ts, KeyValue.Type.Put,
4731           null);
4732       KeyValue kv4_5_3 = new KeyValue(row4, cf3, col, ts + 5,
4733           KeyValue.Type.Put, null);
4734       KeyValue kv4_5_4 = new KeyValue(row4, cf2, col, ts, KeyValue.Type.Put,
4735           null);
4736       KeyValue kv4_5_5 = new KeyValue(row4, cf1, col, ts + 3,
4737           KeyValue.Type.Put, null);
4738       KeyValue kv5_2_1 = new KeyValue(row5, cf2, col, ts, KeyValue.Type.Put,
4739           null);
4740       KeyValue kv5_2_2 = new KeyValue(row5, cf3, col, ts, KeyValue.Type.Put,
4741           null);
4742       // hfiles(cf1/cf2) :"row1"(1 kv) / "row2"(1 kv) / "row4"(2 kv)
4743       Put put = null;
4744       put = new Put(row1);
4745       put.add(kv1_2_1);
4746       region.put(put);
4747       put = new Put(row2);
4748       put.add(kv2_4_1);
4749       region.put(put);
4750       put = new Put(row4);
4751       put.add(kv4_5_4);
4752       put.add(kv4_5_5);
4753       region.put(put);
4754       region.flushcache();
4755       // hfiles(cf1/cf3) : "row1" (1 kvs) / "row2" (1 kv) / "row4" (2 kv)
4756       put = new Put(row4);
4757       put.add(kv4_5_1);
4758       put.add(kv4_5_3);
4759       region.put(put);
4760       put = new Put(row1);
4761       put.add(kv1_2_2);
4762       region.put(put);
4763       put = new Put(row2);
4764       put.add(kv2_4_4);
4765       region.put(put);
4766       region.flushcache();
4767       // hfiles(cf1/cf3) : "row2"(2 kv) / "row3"(1 kvs) / "row4" (1 kv)
4768       put = new Put(row4);
4769       put.add(kv4_5_2);
4770       region.put(put);
4771       put = new Put(row2);
4772       put.add(kv2_4_2);
4773       put.add(kv2_4_3);
4774       region.put(put);
4775       put = new Put(row3);
4776       put.add(kv3_2_2);
4777       region.put(put);
4778       region.flushcache();
4779       // memstore(cf1/cf2/cf3) : "row0" (1 kvs) / "row3" ( 1 kv) / "row5" (max)
4780       // ( 2 kv)
4781       put = new Put(row0);
4782       put.add(kv0_1_1);
4783       region.put(put);
4784       put = new Put(row3);
4785       put.add(kv3_2_1);
4786       region.put(put);
4787       put = new Put(row5);
4788       put.add(kv5_2_1);
4789       put.add(kv5_2_2);
4790       region.put(put);
4791       // scan range = ["row4", min), skip the max "row5"
4792       Scan scan = new Scan(row4);
4793       scan.setMaxVersions(5);
4794       scan.setBatch(3);
4795       scan.setReversed(true);
4796       InternalScanner scanner = region.getScanner(scan);
4797       List<Cell> currRow = new ArrayList<Cell>();
4798       boolean hasNext = false;
4799       // 1. scan out "row4" (5 kvs), "row5" can't be scanned out since not
4800       // included in scan range
4801       // "row4" takes 2 next() calls since batch=3
4802       hasNext = scanner.next(currRow);
4803       assertEquals(3, currRow.size());
4804       assertTrue(Bytes.equals(currRow.get(0).getRow(), row4));
4805       assertTrue(hasNext);
4806       currRow.clear();
4807       hasNext = scanner.next(currRow);
4808       assertEquals(2, currRow.size());
4809       assertTrue(Bytes.equals(currRow.get(0).getRow(), row4));
4810       assertTrue(hasNext);
4811       // 2. scan out "row3" (2 kv)
4812       currRow.clear();
4813       hasNext = scanner.next(currRow);
4814       assertEquals(2, currRow.size());
4815       assertTrue(Bytes.equals(currRow.get(0).getRow(), row3));
4816       assertTrue(hasNext);
4817       // 3. scan out "row2" (4 kvs)
4818       // "row2" takes 2 next() calls since batch=3
4819       currRow.clear();
4820       hasNext = scanner.next(currRow);
4821       assertEquals(3, currRow.size());
4822       assertTrue(Bytes.equals(currRow.get(0).getRow(), row2));
4823       assertTrue(hasNext);
4824       currRow.clear();
4825       hasNext = scanner.next(currRow);
4826       assertEquals(1, currRow.size());
4827       assertTrue(Bytes.equals(currRow.get(0).getRow(), row2));
4828       assertTrue(hasNext);
4829       // 4. scan out "row1" (2 kv)
4830       currRow.clear();
4831       hasNext = scanner.next(currRow);
4832       assertEquals(2, currRow.size());
4833       assertTrue(Bytes.equals(currRow.get(0).getRow(), row1));
4834       assertTrue(hasNext);
4835       // 5. scan out "row0" (1 kv)
4836       currRow.clear();
4837       hasNext = scanner.next(currRow);
4838       assertEquals(1, currRow.size());
4839       assertTrue(Bytes.equals(currRow.get(0).getRow(), row0));
4840       assertFalse(hasNext);
4841 
4842       scanner.close();
4843     } finally {
4844       HRegion.closeHRegion(this.region);
4845       this.region = null;
4846     }
4847   }
4848 
4849   public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs2()
4850       throws IOException {
4851     byte[] row1 = Bytes.toBytes("row1");
4852     byte[] row2 = Bytes.toBytes("row2");
4853     byte[] row3 = Bytes.toBytes("row3");
4854     byte[] row4 = Bytes.toBytes("row4");
4855     byte[] cf1 = Bytes.toBytes("CF1");
4856     byte[] cf2 = Bytes.toBytes("CF2");
4857     byte[] cf3 = Bytes.toBytes("CF3");
4858     byte[] cf4 = Bytes.toBytes("CF4");
4859     byte[][] families = { cf1, cf2, cf3, cf4 };
4860     byte[] col = Bytes.toBytes("C");
4861     long ts = 1;
4862     String method = this.getName();
4863     HBaseConfiguration conf = new HBaseConfiguration();
4864     // disable compactions in this test.
4865     conf.setInt("hbase.hstore.compactionThreshold", 10000);
4866     this.region = initHRegion(tableName, method, conf, families);
4867     try {
4868       KeyValue kv1 = new KeyValue(row1, cf1, col, ts, KeyValue.Type.Put, null);
4869       KeyValue kv2 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put, null);
4870       KeyValue kv3 = new KeyValue(row3, cf3, col, ts, KeyValue.Type.Put, null);
4871       KeyValue kv4 = new KeyValue(row4, cf4, col, ts, KeyValue.Type.Put, null);
4872       // storefile1
4873       Put put = new Put(row1);
4874       put.add(kv1);
4875       region.put(put);
4876       region.flushcache();
4877       // storefile2
4878       put = new Put(row2);
4879       put.add(kv2);
4880       region.put(put);
4881       region.flushcache();
4882       // storefile3
4883       put = new Put(row3);
4884       put.add(kv3);
4885       region.put(put);
4886       region.flushcache();
4887       // memstore
4888       put = new Put(row4);
4889       put.add(kv4);
4890       region.put(put);
4891       // scan range = ["row4", min)
4892       Scan scan = new Scan(row4);
4893       scan.setReversed(true);
4894       scan.setBatch(10);
4895       InternalScanner scanner = region.getScanner(scan);
4896       List<Cell> currRow = new ArrayList<Cell>();
4897       boolean hasNext = scanner.next(currRow);
4898       assertEquals(1, currRow.size());
4899       assertTrue(Bytes.equals(currRow.get(0).getRow(), row4));
4900       assertTrue(hasNext);
4901       currRow.clear();
4902       hasNext = scanner.next(currRow);
4903       assertEquals(1, currRow.size());
4904       assertTrue(Bytes.equals(currRow.get(0).getRow(), row3));
4905       assertTrue(hasNext);
4906       currRow.clear();
4907       hasNext = scanner.next(currRow);
4908       assertEquals(1, currRow.size());
4909       assertTrue(Bytes.equals(currRow.get(0).getRow(), row2));
4910       assertTrue(hasNext);
4911       currRow.clear();
4912       hasNext = scanner.next(currRow);
4913       assertEquals(1, currRow.size());
4914       assertTrue(Bytes.equals(currRow.get(0).getRow(), row1));
4915       assertFalse(hasNext);
4916     } finally {
4917       HRegion.closeHRegion(this.region);
4918       this.region = null;
4919     }
4920   }
4921 
4922   @Test
4923   public void testWriteRequestsCounter() throws IOException {
4924     byte[] fam = Bytes.toBytes("info");
4925     byte[][] families = { fam };
4926     this.region = initHRegion(tableName, method, CONF, families);
4927 
4928     Assert.assertEquals(0L, region.getWriteRequestsCount());
4929 
4930     Put put = new Put(row);
4931     put.add(fam, fam, fam);
4932 
4933     Assert.assertEquals(0L, region.getWriteRequestsCount());
4934     region.put(put);
4935     Assert.assertEquals(1L, region.getWriteRequestsCount());
4936     region.put(put);
4937     Assert.assertEquals(2L, region.getWriteRequestsCount());
4938     region.put(put);
4939     Assert.assertEquals(3L, region.getWriteRequestsCount());
4940 
4941     region.delete(new Delete(row));
4942     Assert.assertEquals(4L, region.getWriteRequestsCount());
4943 
4944     HRegion.closeHRegion(this.region);
4945     this.region = null;
4946   }
4947 
4948   private static HRegion initHRegion(byte[] tableName, String callingMethod,
4949       byte[]... families) throws IOException {
4950     return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
4951         families);
4952   }
4953 }