View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  
22  import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS;
23  import static org.apache.hadoop.hbase.HBaseTestingUtility.FIRST_CHAR;
24  import static org.apache.hadoop.hbase.HBaseTestingUtility.LAST_CHAR;
25  import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
26  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
27  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
28  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
29  import static org.junit.Assert.assertArrayEquals;
30  import static org.junit.Assert.assertEquals;
31  import static org.junit.Assert.assertFalse;
32  import static org.junit.Assert.assertNotNull;
33  import static org.junit.Assert.assertNull;
34  import static org.junit.Assert.assertTrue;
35  import static org.junit.Assert.fail;
36  import static org.mockito.Matchers.any;
37  import static org.mockito.Matchers.anyBoolean;
38  import static org.mockito.Matchers.anyLong;
39  import static org.mockito.Mockito.mock;
40  import static org.mockito.Mockito.never;
41  import static org.mockito.Mockito.spy;
42  import static org.mockito.Mockito.times;
43  import static org.mockito.Mockito.verify;
44  import static org.mockito.Mockito.when;
45  
46  import java.io.IOException;
47  import java.io.InterruptedIOException;
48  import java.security.PrivilegedExceptionAction;
49  import java.util.ArrayList;
50  import java.util.Arrays;
51  import java.util.Collection;
52  import java.util.Collections;
53  import java.util.HashMap;
54  import java.util.List;
55  import java.util.Map;
56  import java.util.NavigableMap;
57  import java.util.TreeMap;
58  import java.util.UUID;
59  import java.util.concurrent.Callable;
60  import java.util.concurrent.CountDownLatch;
61  import java.util.concurrent.ExecutorService;
62  import java.util.concurrent.Executors;
63  import java.util.concurrent.Future;
64  import java.util.concurrent.TimeUnit;
65  import java.util.concurrent.atomic.AtomicBoolean;
66  import java.util.concurrent.atomic.AtomicInteger;
67  import java.util.concurrent.atomic.AtomicReference;
68  
69  import org.apache.commons.lang.RandomStringUtils;
70  import org.apache.commons.logging.Log;
71  import org.apache.commons.logging.LogFactory;
72  import org.apache.hadoop.conf.Configuration;
73  import org.apache.hadoop.fs.FSDataOutputStream;
74  import org.apache.hadoop.fs.FileStatus;
75  import org.apache.hadoop.fs.FileSystem;
76  import org.apache.hadoop.fs.Path;
77  import org.apache.hadoop.hbase.Cell;
78  import org.apache.hadoop.hbase.CellComparator;
79  import org.apache.hadoop.hbase.CellUtil;
80  import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
81  import org.apache.hadoop.hbase.DroppedSnapshotException;
82  import org.apache.hadoop.hbase.HBaseConfiguration;
83  import org.apache.hadoop.hbase.HBaseTestCase;
84  import org.apache.hadoop.hbase.HBaseTestingUtility;
85  import org.apache.hadoop.hbase.HColumnDescriptor;
86  import org.apache.hadoop.hbase.HConstants;
87  import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
88  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
89  import org.apache.hadoop.hbase.HRegionInfo;
90  import org.apache.hadoop.hbase.HTableDescriptor;
91  import org.apache.hadoop.hbase.KeyValue;
92  import org.apache.hadoop.hbase.MiniHBaseCluster;
93  import org.apache.hadoop.hbase.MultithreadedTestUtil;
94  import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
95  import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
96  import org.apache.hadoop.hbase.NotServingRegionException;
97  import org.apache.hadoop.hbase.RegionTooBusyException;
98  import org.apache.hadoop.hbase.ServerName;
99  import org.apache.hadoop.hbase.TableName;
100 import org.apache.hadoop.hbase.Tag;
101 import org.apache.hadoop.hbase.TagType;
102 import org.apache.hadoop.hbase.Waiter;
103 import org.apache.hadoop.hbase.client.Append;
104 import org.apache.hadoop.hbase.client.Delete;
105 import org.apache.hadoop.hbase.client.Durability;
106 import org.apache.hadoop.hbase.client.Get;
107 import org.apache.hadoop.hbase.client.Increment;
108 import org.apache.hadoop.hbase.client.Mutation;
109 import org.apache.hadoop.hbase.client.Put;
110 import org.apache.hadoop.hbase.client.Result;
111 import org.apache.hadoop.hbase.client.RowMutations;
112 import org.apache.hadoop.hbase.client.Scan;
113 import org.apache.hadoop.hbase.client.Table;
114 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
115 import org.apache.hadoop.hbase.filter.BinaryComparator;
116 import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
117 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
118 import org.apache.hadoop.hbase.filter.Filter;
119 import org.apache.hadoop.hbase.filter.FilterBase;
120 import org.apache.hadoop.hbase.filter.FilterList;
121 import org.apache.hadoop.hbase.filter.NullComparator;
122 import org.apache.hadoop.hbase.filter.PrefixFilter;
123 import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
124 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
125 import org.apache.hadoop.hbase.io.hfile.HFile;
126 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
127 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
128 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
129 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
130 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
131 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
132 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
133 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
134 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
135 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
136 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
137 import org.apache.hadoop.hbase.regionserver.Region.RowLock;
138 import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem;
139 import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler;
140 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
141 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
142 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
143 import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
144 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
145 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
146 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
147 import org.apache.hadoop.hbase.security.User;
148 import org.apache.hadoop.hbase.test.MetricsAssertHelper;
149 import org.apache.hadoop.hbase.testclassification.MediumTests;
150 import org.apache.hadoop.hbase.util.Bytes;
151 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
152 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
153 import org.apache.hadoop.hbase.util.FSUtils;
154 import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
155 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
156 import org.apache.hadoop.hbase.util.PairOfSameType;
157 import org.apache.hadoop.hbase.util.Threads;
158 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
159 import org.apache.hadoop.hbase.wal.FaultyFSLog;
160 import org.apache.hadoop.hbase.wal.WAL;
161 import org.apache.hadoop.hbase.wal.WALFactory;
162 import org.apache.hadoop.hbase.wal.WALKey;
163 import org.apache.hadoop.hbase.wal.WALProvider;
164 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
165 import org.apache.hadoop.hbase.wal.WALSplitter;
166 import org.junit.After;
167 import org.junit.Assert;
168 import org.junit.Before;
169 import org.junit.Rule;
170 import org.junit.Test;
171 import org.junit.experimental.categories.Category;
172 import org.junit.rules.TestName;
173 import org.mockito.ArgumentCaptor;
174 import org.mockito.ArgumentMatcher;
175 import org.mockito.Mockito;
176 import org.mockito.invocation.InvocationOnMock;
177 import org.mockito.stubbing.Answer;
178 
179 import com.google.common.collect.ImmutableList;
180 import com.google.common.collect.Lists;
181 import com.google.common.collect.Maps;
182 import com.google.protobuf.ByteString;
183 
184 /**
185  * Basic stand-alone testing of HRegion.  No clusters!
186  *
187  * A lot of the meta information for an HRegion now lives inside other HRegions
188  * or in the HBaseMaster, so only basic testing is possible.
189  */
190 @Category(MediumTests.class)
191 @SuppressWarnings("deprecation")
192 public class TestHRegion {
193   // Do not spin up clusters in here. If you need to spin up a cluster, do it
194   // over in TestHRegionOnCluster.
195   private static final Log LOG = LogFactory.getLog(TestHRegion.class);
196   @Rule public TestName name = new TestName();
197 
198   private static final String COLUMN_FAMILY = "MyCF";
199   private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY);
200 
201   HRegion region = null;
202   // Do not run unit tests in parallel (? Why not?  It don't work?  Why not?  St.Ack)
203   private static HBaseTestingUtility TEST_UTIL;
204   public static Configuration CONF ;
205   private String dir;
206   private static FileSystem FILESYSTEM;
207   private final int MAX_VERSIONS = 2;
208 
209   // Test names
210   protected byte[] tableName;
211   protected String method;
212   protected final byte[] qual1 = Bytes.toBytes("qual1");
213   protected final byte[] qual2 = Bytes.toBytes("qual2");
214   protected final byte[] qual3 = Bytes.toBytes("qual3");
215   protected final byte[] value1 = Bytes.toBytes("value1");
216   protected final byte[] value2 = Bytes.toBytes("value2");
217   protected final byte[] row = Bytes.toBytes("rowA");
218   protected final byte[] row2 = Bytes.toBytes("rowB");
219 
220   protected final MetricsAssertHelper metricsAssertHelper = CompatibilitySingletonFactory
221       .getInstance(MetricsAssertHelper.class);
222 
223   @Before
224   public void setup() throws IOException {
225     TEST_UTIL = HBaseTestingUtility.createLocalHTU();
226     FILESYSTEM = TEST_UTIL.getTestFileSystem();
227     CONF = TEST_UTIL.getConfiguration();
228     dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
229     method = name.getMethodName();
230     tableName = Bytes.toBytes(name.getMethodName());
231   }
232 
233   @After
234   public void tearDown() throws Exception {
235     EnvironmentEdgeManagerTestHelper.reset();
236     LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
237     TEST_UTIL.cleanupTestDir();
238   }
239 
240   String getName() {
241     return name.getMethodName();
242   }
243 
244   /**
245    * Test that I can use the max flushed sequence id after the close.
246    * @throws IOException
247    */
248   @Test (timeout = 100000)
249   public void testSequenceId() throws IOException {
250     HRegion region = initHRegion(tableName, name.getMethodName(), CONF, COLUMN_FAMILY_BYTES);
251     assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
252     // Weird. This returns 0 if no store files or no edits. Afraid to change it.
253     assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
254     region.close();
255     assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
256     assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
257     // Open region again.
258     region = initHRegion(tableName, name.getMethodName(), CONF, COLUMN_FAMILY_BYTES);
259     byte [] value = Bytes.toBytes(name.getMethodName());
260     // Make a random put against our cf.
261     Put put = new Put(value);
262     put.addColumn(COLUMN_FAMILY_BYTES, null, value);
263     region.put(put);
264     // No flush yet so init numbers should still be in place.
265     assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
266     assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
267     region.flush(true);
268     long max = region.getMaxFlushedSeqId();
269     region.close();
270     assertEquals(max, region.getMaxFlushedSeqId());
271   }
272 
273   /**
274    * Test for Bug 2 of HBASE-10466.
275    * "Bug 2: Conditions for the first flush of region close (so-called pre-flush) If memstoreSize
276    * is smaller than a certain value, or when region close starts a flush is ongoing, the first
277    * flush is skipped and only the second flush takes place. However, two flushes are required in
278    * case previous flush fails and leaves some data in snapshot. The bug could cause loss of data
279    * in current memstore. The fix is removing all conditions except abort check so we ensure 2
280    * flushes for region close."
281    * @throws IOException
282    */
283   @Test (timeout=60000)
284   public void testCloseCarryingSnapshot() throws IOException {
285     HRegion region = initHRegion(tableName, name.getMethodName(), CONF, COLUMN_FAMILY_BYTES);
286     Store store = region.getStore(COLUMN_FAMILY_BYTES);
287     // Get some random bytes.
288     byte [] value = Bytes.toBytes(name.getMethodName());
289     // Make a random put against our cf.
290     Put put = new Put(value);
291     put.add(COLUMN_FAMILY_BYTES, null, value);
292     // First put something in current memstore, which will be in snapshot after flusher.prepare()
293     region.put(put);
294     StoreFlushContext storeFlushCtx = store.createFlushContext(12345);
295     storeFlushCtx.prepare();
296     // Second put something in current memstore
297     put.add(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
298     region.put(put);
299     // Close with something in memstore and something in the snapshot.  Make sure all is cleared.
300     region.close();
301     assertEquals(0, region.getMemstoreSize());
302     HRegion.closeHRegion(region);
303   }
304 
305 
306 
307   /*
308    * This test is for verifying memstore snapshot size is correctly updated in case of rollback
309    * See HBASE-10845
310    */
311   @Test (timeout=60000)
312   public void testMemstoreSnapshotSize() throws IOException {
313     class MyFaultyFSLog extends FaultyFSLog {
314       StoreFlushContext storeFlushCtx;
315       public MyFaultyFSLog(FileSystem fs, Path rootDir, String logName, Configuration conf)
316           throws IOException {
317         super(fs, rootDir, logName, conf);
318       }
319 
320       void setStoreFlushCtx(StoreFlushContext storeFlushCtx) {
321         this.storeFlushCtx = storeFlushCtx;
322       }
323 
324       @Override
325       public void sync(long txid) throws IOException {
326         storeFlushCtx.prepare();
327         super.sync(txid);
328       }
329     }
330 
331     FileSystem fs = FileSystem.get(CONF);
332     Path rootDir = new Path(dir + "testMemstoreSnapshotSize");
333     MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF);
334     HRegion region = initHRegion(tableName, null, null, name.getMethodName(),
335         CONF, false, Durability.SYNC_WAL, faultyLog, COLUMN_FAMILY_BYTES);
336 
337     Store store = region.getStore(COLUMN_FAMILY_BYTES);
338     // Get some random bytes.
339     byte [] value = Bytes.toBytes(name.getMethodName());
340     faultyLog.setStoreFlushCtx(store.createFlushContext(12345));
341 
342     Put put = new Put(value);
343     put.add(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
344     faultyLog.setFailureType(FaultyFSLog.FailureType.SYNC);
345 
346     boolean threwIOE = false;
347     try {
348       region.put(put);
349     } catch (IOException ioe) {
350       threwIOE = true;
351     } finally {
352       assertTrue("The regionserver should have thrown an exception", threwIOE);
353     }
354     long sz = store.getFlushableSize();
355     assertTrue("flushable size should be zero, but it is " + sz, sz == 0);
356     HRegion.closeHRegion(region);
357   }
358 
359   /**
360    * Test for HBASE-14229: Flushing canceled by coprocessor still leads to memstoreSize set down
361    */
362   @Test
363   public void testMemstoreSizeWithFlushCanceling() throws IOException {
364     FileSystem fs = FileSystem.get(CONF);
365     Path rootDir = new Path(dir + "testMemstoreSizeWithFlushCanceling");
366     FSHLog hLog = new FSHLog(fs, rootDir, "testMemstoreSizeWithFlushCanceling", CONF);
367     HRegion region = initHRegion(tableName, null, null, name.getMethodName(),
368         CONF, false, Durability.SYNC_WAL, hLog, COLUMN_FAMILY_BYTES);
369     Store store = region.getStore(COLUMN_FAMILY_BYTES);
370     assertEquals(0, region.getMemstoreSize());
371 
372     // Put some value and make sure flush could be completed normally
373     byte [] value = Bytes.toBytes(name.getMethodName());
374     Put put = new Put(value);
375     put.add(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
376     region.put(put);
377     long onePutSize = region.getMemstoreSize();
378     assertTrue(onePutSize > 0);
379     region.flush(true);
380     assertEquals("memstoreSize should be zero", 0, region.getMemstoreSize());
381     assertEquals("flushable size should be zero", 0, store.getFlushableSize());
382 
383     // save normalCPHost and replaced by mockedCPHost, which will cancel flush requests
384     RegionCoprocessorHost normalCPHost = region.getCoprocessorHost();
385     RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
386     when(mockedCPHost.preFlush(Mockito.isA(HStore.class), Mockito.isA(InternalScanner.class))).
387       thenReturn(null);
388     region.setCoprocessorHost(mockedCPHost);
389     region.put(put);
390     region.flush(true);
391     assertEquals("memstoreSize should NOT be zero", onePutSize, region.getMemstoreSize());
392     assertEquals("flushable size should NOT be zero", onePutSize, store.getFlushableSize());
393 
394     // set normalCPHost and flush again, the snapshot will be flushed
395     region.setCoprocessorHost(normalCPHost);
396     region.flush(true);
397     assertEquals("memstoreSize should be zero", 0, region.getMemstoreSize());
398     assertEquals("flushable size should be zero", 0, store.getFlushableSize());
399     HRegion.closeHRegion(region);
400   }
401 
402   /**
403    * Test we do not lose data if we fail a flush and then close.
404    * Part of HBase-10466.  Tests the following from the issue description:
405    * "Bug 1: Wrong calculation of HRegion.memstoreSize: When a flush fails, data to be flushed is
406    * kept in each MemStore's snapshot and wait for next flush attempt to continue on it. But when
407    * the next flush succeeds, the counter of total memstore size in HRegion is always deduced by
408    * the sum of current memstore sizes instead of snapshots left from previous failed flush. This
409    * calculation is problematic that almost every time there is failed flush, HRegion.memstoreSize
410    * gets reduced by a wrong value. If region flush could not proceed for a couple cycles, the size
411    * in current memstore could be much larger than the snapshot. It's likely to drift memstoreSize
412    * much smaller than expected. In extreme case, if the error accumulates to even bigger than
413    * HRegion's memstore size limit, any further flush is skipped because flush does not do anything
414    * if memstoreSize is not larger than 0."
415    * @throws Exception
416    */
417   @Test (timeout=60000)
418   public void testFlushSizeAccounting() throws Exception {
419     final Configuration conf = HBaseConfiguration.create(CONF);
420     // Only retry once.
421     conf.setInt("hbase.hstore.flush.retries.number", 1);
422     final User user =
423       User.createUserForTesting(conf, this.name.getMethodName(), new String[]{"foo"});
424     // Inject our faulty LocalFileSystem
425     conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
426     user.runAs(new PrivilegedExceptionAction<Object>() {
427       @Override
428       public Object run() throws Exception {
429         // Make sure it worked (above is sensitive to caching details in hadoop core)
430         FileSystem fs = FileSystem.get(conf);
431         Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
432         FaultyFileSystem ffs = (FaultyFileSystem)fs;
433         HRegion region = null;
434         try {
435           // Initialize region
436           region = initHRegion(tableName, name.getMethodName(), conf, COLUMN_FAMILY_BYTES);
437           long size = region.getMemstoreSize();
438           Assert.assertEquals(0, size);
439           // Put one item into memstore.  Measure the size of one item in memstore.
440           Put p1 = new Put(row);
441           p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[])null));
442           region.put(p1);
443           final long sizeOfOnePut = region.getMemstoreSize();
444           // Fail a flush which means the current memstore will hang out as memstore 'snapshot'.
445           try {
446             LOG.info("Flushing");
447             region.flush(true);
448             Assert.fail("Didn't bubble up IOE!");
449           } catch (DroppedSnapshotException dse) {
450             // What we are expecting
451             region.closing.set(false); // this is needed for the rest of the test to work
452           } catch (Exception e) {
453             // What we are expecting
454             region.closing.set(false); // this is needed for the rest of the test to work
455           }
456           // Make it so all writes succeed from here on out
457           ffs.fault.set(false);
458           // WAL is bad because of above faulty fs. Roll WAL.
459           try {
460             region.getWAL().rollWriter(true);
461           } catch (Exception e) {
462             int x = 0;
463           }
464           // Check sizes.  Should still be the one entry.
465           Assert.assertEquals(sizeOfOnePut, region.getMemstoreSize());
466           // Now add two entries so that on this next flush that fails, we can see if we
467           // subtract the right amount, the snapshot size only.
468           Put p2 = new Put(row);
469           p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null));
470           p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
471           region.put(p2);
472           Assert.assertEquals(sizeOfOnePut * 3, region.getMemstoreSize());
473           // Do a successful flush.  It will clear the snapshot only.  Thats how flushes work.
474           // If already a snapshot, we clear it else we move the memstore to be snapshot and flush
475           // it
476           region.flush(true);
477           // Make sure our memory accounting is right.
478           Assert.assertEquals(sizeOfOnePut * 2, region.getMemstoreSize());
479         } catch (Exception e) {
480           int x = 0;
481         } finally {
482           HRegion.closeHRegion(region);
483         }
484         return null;
485       }
486     });
487     FileSystem.closeAllForUGI(user.getUGI());
488   }
489 
490   @Test (timeout=60000)
491   public void testCloseWithFailingFlush() throws Exception {
492     final Configuration conf = HBaseConfiguration.create(CONF);
493     // Only retry once.
494     conf.setInt("hbase.hstore.flush.retries.number", 1);
495     final User user =
496       User.createUserForTesting(conf, this.name.getMethodName(), new String[]{"foo"});
497     // Inject our faulty LocalFileSystem
498     conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
499     user.runAs(new PrivilegedExceptionAction<Object>() {
500       @Override
501       public Object run() throws Exception {
502         // Make sure it worked (above is sensitive to caching details in hadoop core)
503         FileSystem fs = FileSystem.get(conf);
504         Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
505         FaultyFileSystem ffs = (FaultyFileSystem)fs;
506         HRegion region = null;
507         try {
508           // Initialize region
509           region = initHRegion(tableName, name.getMethodName(), conf, COLUMN_FAMILY_BYTES);
510           long size = region.getMemstoreSize();
511           Assert.assertEquals(0, size);
512           // Put one item into memstore.  Measure the size of one item in memstore.
513           Put p1 = new Put(row);
514           p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[])null));
515           region.put(p1);
516           // Manufacture an outstanding snapshot -- fake a failed flush by doing prepare step only.
517           Store store = region.getStore(COLUMN_FAMILY_BYTES);
518           StoreFlushContext storeFlushCtx = store.createFlushContext(12345);
519           storeFlushCtx.prepare();
520           // Now add two entries to the foreground memstore.
521           Put p2 = new Put(row);
522           p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null));
523           p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
524           region.put(p2);
525           // Now try close on top of a failing flush.
526           region.close();
527           fail();
528         } catch (IOException dse) {
529           // Expected
530           LOG.info("Expected DroppedSnapshotException");
531         } finally {
532           // Make it so all writes succeed from here on out so can close clean
533           ffs.fault.set(false);
534           region.getWAL().rollWriter(true);
535           HRegion.closeHRegion(region);
536         }
537         return null;
538       }
539     });
540     FileSystem.closeAllForUGI(user.getUGI());
541   }
542 
543   @Test
544   public void testCompactionAffectedByScanners() throws Exception {
545     byte[] family = Bytes.toBytes("family");
546     this.region = initHRegion(tableName, method, CONF, family);
547 
548     Put put = new Put(Bytes.toBytes("r1"));
549     put.add(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
550     region.put(put);
551     region.flush(true);
552 
553     Scan scan = new Scan();
554     scan.setMaxVersions(3);
555     // open the first scanner
556     RegionScanner scanner1 = region.getScanner(scan);
557 
558     Delete delete = new Delete(Bytes.toBytes("r1"));
559     region.delete(delete);
560     region.flush(true);
561 
562     // open the second scanner
563     RegionScanner scanner2 = region.getScanner(scan);
564 
565     List<Cell> results = new ArrayList<Cell>();
566 
567     System.out.println("Smallest read point:" + region.getSmallestReadPoint());
568 
569     // make a major compaction
570     region.compact(true);
571 
572     // open the third scanner
573     RegionScanner scanner3 = region.getScanner(scan);
574 
575     // get data from scanner 1, 2, 3 after major compaction
576     scanner1.next(results);
577     System.out.println(results);
578     assertEquals(1, results.size());
579 
580     results.clear();
581     scanner2.next(results);
582     System.out.println(results);
583     assertEquals(0, results.size());
584 
585     results.clear();
586     scanner3.next(results);
587     System.out.println(results);
588     assertEquals(0, results.size());
589   }
590 
591   @Test
592   public void testToShowNPEOnRegionScannerReseek() throws Exception {
593     byte[] family = Bytes.toBytes("family");
594     this.region = initHRegion(tableName, method, CONF, family);
595 
596     Put put = new Put(Bytes.toBytes("r1"));
597     put.add(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
598     region.put(put);
599     put = new Put(Bytes.toBytes("r2"));
600     put.add(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
601     region.put(put);
602     region.flush(true);
603 
604     Scan scan = new Scan();
605     scan.setMaxVersions(3);
606     // open the first scanner
607     RegionScanner scanner1 = region.getScanner(scan);
608 
609     System.out.println("Smallest read point:" + region.getSmallestReadPoint());
610 
611     region.compact(true);
612 
613     scanner1.reseek(Bytes.toBytes("r2"));
614     List<Cell> results = new ArrayList<Cell>();
615     scanner1.next(results);
616     Cell keyValue = results.get(0);
617     Assert.assertTrue(Bytes.compareTo(CellUtil.cloneRow(keyValue), Bytes.toBytes("r2")) == 0);
618     scanner1.close();
619   }
620 
621   @Test
622   public void testSkipRecoveredEditsReplay() throws Exception {
623     String method = "testSkipRecoveredEditsReplay";
624     TableName tableName = TableName.valueOf(method);
625     byte[] family = Bytes.toBytes("family");
626     this.region = initHRegion(tableName, method, CONF, family);
627     final WALFactory wals = new WALFactory(CONF, null, method);
628     try {
629       Path regiondir = region.getRegionFileSystem().getRegionDir();
630       FileSystem fs = region.getRegionFileSystem().getFileSystem();
631       byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
632 
633       Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
634 
635       long maxSeqId = 1050;
636       long minSeqId = 1000;
637 
638       for (long i = minSeqId; i <= maxSeqId; i += 10) {
639         Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
640         fs.create(recoveredEdits);
641         WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
642 
643         long time = System.nanoTime();
644         WALEdit edit = new WALEdit();
645         edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
646             .toBytes(i)));
647         writer.append(new WAL.Entry(new HLogKey(regionName, tableName, i, time,
648             HConstants.DEFAULT_CLUSTER_ID), edit));
649 
650         writer.close();
651       }
652       MonitoredTask status = TaskMonitor.get().createStatus(method);
653       Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
654       for (Store store : region.getStores()) {
655         maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), minSeqId - 1);
656       }
657       long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
658       assertEquals(maxSeqId, seqId);
659       region.getMVCC().advanceTo(seqId);
660       Get get = new Get(row);
661       Result result = region.get(get);
662       for (long i = minSeqId; i <= maxSeqId; i += 10) {
663         List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i));
664         assertEquals(1, kvs.size());
665         assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0)));
666       }
667     } finally {
668       HRegion.closeHRegion(this.region);
669       this.region = null;
670       wals.close();
671     }
672   }
673 
674   @Test
675   public void testSkipRecoveredEditsReplaySomeIgnored() throws Exception {
676     String method = "testSkipRecoveredEditsReplaySomeIgnored";
677     TableName tableName = TableName.valueOf(method);
678     byte[] family = Bytes.toBytes("family");
679     this.region = initHRegion(tableName, method, CONF, family);
680     final WALFactory wals = new WALFactory(CONF, null, method);
681     try {
682       Path regiondir = region.getRegionFileSystem().getRegionDir();
683       FileSystem fs = region.getRegionFileSystem().getFileSystem();
684       byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
685 
686       Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
687 
688       long maxSeqId = 1050;
689       long minSeqId = 1000;
690 
691       for (long i = minSeqId; i <= maxSeqId; i += 10) {
692         Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
693         fs.create(recoveredEdits);
694         WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
695 
696         long time = System.nanoTime();
697         WALEdit edit = new WALEdit();
698         edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
699             .toBytes(i)));
700         writer.append(new WAL.Entry(new HLogKey(regionName, tableName, i, time,
701             HConstants.DEFAULT_CLUSTER_ID), edit));
702 
703         writer.close();
704       }
705       long recoverSeqId = 1030;
706       MonitoredTask status = TaskMonitor.get().createStatus(method);
707       Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
708       for (Store store : region.getStores()) {
709         maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), recoverSeqId - 1);
710       }
711       long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
712       assertEquals(maxSeqId, seqId);
713       region.getMVCC().advanceTo(seqId);
714       Get get = new Get(row);
715       Result result = region.get(get);
716       for (long i = minSeqId; i <= maxSeqId; i += 10) {
717         List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i));
718         if (i < recoverSeqId) {
719           assertEquals(0, kvs.size());
720         } else {
721           assertEquals(1, kvs.size());
722           assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0)));
723         }
724       }
725     } finally {
726       HRegion.closeHRegion(this.region);
727       this.region = null;
728       wals.close();
729     }
730   }
731 
732   @Test
733   public void testSkipRecoveredEditsReplayAllIgnored() throws Exception {
734     byte[] family = Bytes.toBytes("family");
735     this.region = initHRegion(tableName, method, CONF, family);
736     try {
737       Path regiondir = region.getRegionFileSystem().getRegionDir();
738       FileSystem fs = region.getRegionFileSystem().getFileSystem();
739 
740       Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
741       for (int i = 1000; i < 1050; i += 10) {
742         Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
743         FSDataOutputStream dos = fs.create(recoveredEdits);
744         dos.writeInt(i);
745         dos.close();
746       }
747       long minSeqId = 2000;
748       Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", minSeqId - 1));
749       FSDataOutputStream dos = fs.create(recoveredEdits);
750       dos.close();
751 
752       Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
753       for (Store store : region.getStores()) {
754         maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), minSeqId);
755       }
756       long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, null);
757       assertEquals(minSeqId, seqId);
758     } finally {
759       HRegion.closeHRegion(this.region);
760       this.region = null;
761     }
762   }
763 
764   @Test
765   public void testSkipRecoveredEditsReplayTheLastFileIgnored() throws Exception {
766     String method = "testSkipRecoveredEditsReplayTheLastFileIgnored";
767     TableName tableName = TableName.valueOf(method);
768     byte[] family = Bytes.toBytes("family");
769     this.region = initHRegion(tableName, method, CONF, family);
770     final WALFactory wals = new WALFactory(CONF, null, method);
771     try {
772       Path regiondir = region.getRegionFileSystem().getRegionDir();
773       FileSystem fs = region.getRegionFileSystem().getFileSystem();
774       byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
775       byte[][] columns = region.getTableDesc().getFamiliesKeys().toArray(new byte[0][]);
776 
777       assertEquals(0, region.getStoreFileList(columns).size());
778 
779       Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
780 
781       long maxSeqId = 1050;
782       long minSeqId = 1000;
783 
784       for (long i = minSeqId; i <= maxSeqId; i += 10) {
785         Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
786         fs.create(recoveredEdits);
787         WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
788 
789         long time = System.nanoTime();
790         WALEdit edit = null;
791         if (i == maxSeqId) {
792           edit = WALEdit.createCompaction(region.getRegionInfo(),
793           CompactionDescriptor.newBuilder()
794           .setTableName(ByteString.copyFrom(tableName.getName()))
795           .setFamilyName(ByteString.copyFrom(regionName))
796           .setEncodedRegionName(ByteString.copyFrom(regionName))
797           .setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString())))
798           .setRegionName(ByteString.copyFrom(region.getRegionInfo().getRegionName()))
799           .build());
800         } else {
801           edit = new WALEdit();
802           edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
803             .toBytes(i)));
804         }
805         writer.append(new WAL.Entry(new HLogKey(regionName, tableName, i, time,
806             HConstants.DEFAULT_CLUSTER_ID), edit));
807         writer.close();
808       }
809 
810       long recoverSeqId = 1030;
811       Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
812       MonitoredTask status = TaskMonitor.get().createStatus(method);
813       for (Store store : region.getStores()) {
814         maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), recoverSeqId - 1);
815       }
816       long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
817       assertEquals(maxSeqId, seqId);
818 
819       // assert that the files are flushed
820       assertEquals(1, region.getStoreFileList(columns).size());
821 
822     } finally {
823       HRegion.closeHRegion(this.region);
824       this.region = null;
825       wals.close();
826     }
827   }
828 
829   @Test
830   public void testRecoveredEditsReplayCompaction() throws Exception {
831     testRecoveredEditsReplayCompaction(false);
832     testRecoveredEditsReplayCompaction(true);
833   }
834   public void testRecoveredEditsReplayCompaction(boolean mismatchedRegionName) throws Exception {
835     String method = name.getMethodName();
836     TableName tableName = TableName.valueOf(method);
837     byte[] family = Bytes.toBytes("family");
838     this.region = initHRegion(tableName, method, CONF, family);
839     final WALFactory wals = new WALFactory(CONF, null, method);
840     try {
841       Path regiondir = region.getRegionFileSystem().getRegionDir();
842       FileSystem fs = region.getRegionFileSystem().getFileSystem();
843       byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
844 
845       long maxSeqId = 3;
846       long minSeqId = 0;
847 
848       for (long i = minSeqId; i < maxSeqId; i++) {
849         Put put = new Put(Bytes.toBytes(i));
850         put.add(family, Bytes.toBytes(i), Bytes.toBytes(i));
851         region.put(put);
852         region.flush(true);
853       }
854 
855       // this will create a region with 3 files
856       assertEquals(3, region.getStore(family).getStorefilesCount());
857       List<Path> storeFiles = new ArrayList<Path>(3);
858       for (StoreFile sf : region.getStore(family).getStorefiles()) {
859         storeFiles.add(sf.getPath());
860       }
861 
862       // disable compaction completion
863       CONF.setBoolean("hbase.hstore.compaction.complete", false);
864       region.compactStores();
865 
866       // ensure that nothing changed
867       assertEquals(3, region.getStore(family).getStorefilesCount());
868 
869       // now find the compacted file, and manually add it to the recovered edits
870       Path tmpDir = region.getRegionFileSystem().getTempDir();
871       FileStatus[] files = FSUtils.listStatus(fs, tmpDir);
872       String errorMsg = "Expected to find 1 file in the region temp directory "
873           + "from the compaction, could not find any";
874       assertNotNull(errorMsg, files);
875       assertEquals(errorMsg, 1, files.length);
876       // move the file inside region dir
877       Path newFile = region.getRegionFileSystem().commitStoreFile(Bytes.toString(family),
878           files[0].getPath());
879 
880       byte[] encodedNameAsBytes = this.region.getRegionInfo().getEncodedNameAsBytes();
881       byte[] fakeEncodedNameAsBytes = new byte [encodedNameAsBytes.length];
882       for (int i=0; i < encodedNameAsBytes.length; i++) {
883         // Mix the byte array to have a new encodedName
884         fakeEncodedNameAsBytes[i] = (byte) (encodedNameAsBytes[i] + 1);
885       }
886 
887       CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(this.region
888         .getRegionInfo(), mismatchedRegionName ? fakeEncodedNameAsBytes : null, family,
889             storeFiles, Lists.newArrayList(newFile),
890             region.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
891 
892       WALUtil.writeCompactionMarker(region.getWAL(), this.region.getTableDesc(),
893           this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
894 
895       Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
896 
897       Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
898       fs.create(recoveredEdits);
899       WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
900 
901       long time = System.nanoTime();
902 
903       writer.append(new WAL.Entry(new HLogKey(regionName, tableName, 10, time,
904           HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(region.getRegionInfo(),
905           compactionDescriptor)));
906       writer.close();
907 
908       // close the region now, and reopen again
909       region.getTableDesc();
910       region.getRegionInfo();
911       region.close();
912       try {
913         region = HRegion.openHRegion(region, null);
914       } catch (WrongRegionException wre) {
915         fail("Matching encoded region name should not have produced WrongRegionException");
916       }
917 
918       // now check whether we have only one store file, the compacted one
919       Collection<StoreFile> sfs = region.getStore(family).getStorefiles();
920       for (StoreFile sf : sfs) {
921         LOG.info(sf.getPath());
922       }
923       if (!mismatchedRegionName) {
924         assertEquals(1, region.getStore(family).getStorefilesCount());
925       }
926       files = FSUtils.listStatus(fs, tmpDir);
927       assertTrue("Expected to find 0 files inside " + tmpDir, files == null || files.length == 0);
928 
929       for (long i = minSeqId; i < maxSeqId; i++) {
930         Get get = new Get(Bytes.toBytes(i));
931         Result result = region.get(get);
932         byte[] value = result.getValue(family, Bytes.toBytes(i));
933         assertArrayEquals(Bytes.toBytes(i), value);
934       }
935     } finally {
936       HRegion.closeHRegion(this.region);
937       this.region = null;
938       wals.close();
939     }
940   }
941 
942   @Test
943   public void testFlushMarkers() throws Exception {
944     // tests that flush markers are written to WAL and handled at recovered edits
945     String method = name.getMethodName();
946     TableName tableName = TableName.valueOf(method);
947     byte[] family = Bytes.toBytes("family");
948     Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log");
949     final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
950     FSUtils.setRootDir(walConf, logDir);
951     final WALFactory wals = new WALFactory(walConf, null, method);
952     final WAL wal = wals.getWAL(tableName.getName());
953 
954     this.region = initHRegion(tableName.getName(), HConstants.EMPTY_START_ROW,
955       HConstants.EMPTY_END_ROW, method, CONF, false, Durability.USE_DEFAULT, wal, family);
956     try {
957       Path regiondir = region.getRegionFileSystem().getRegionDir();
958       FileSystem fs = region.getRegionFileSystem().getFileSystem();
959       byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
960 
961       long maxSeqId = 3;
962       long minSeqId = 0;
963 
964       for (long i = minSeqId; i < maxSeqId; i++) {
965         Put put = new Put(Bytes.toBytes(i));
966         put.add(family, Bytes.toBytes(i), Bytes.toBytes(i));
967         region.put(put);
968         region.flush(true);
969       }
970 
971       // this will create a region with 3 files from flush
972       assertEquals(3, region.getStore(family).getStorefilesCount());
973       List<String> storeFiles = new ArrayList<String>(3);
974       for (StoreFile sf : region.getStore(family).getStorefiles()) {
975         storeFiles.add(sf.getPath().getName());
976       }
977 
978       // now verify that the flush markers are written
979       wal.shutdown();
980       WAL.Reader reader = WALFactory.createReader(fs, DefaultWALProvider.getCurrentFileName(wal),
981         TEST_UTIL.getConfiguration());
982       try {
983         List<WAL.Entry> flushDescriptors = new ArrayList<WAL.Entry>();
984         long lastFlushSeqId = -1;
985         while (true) {
986           WAL.Entry entry = reader.next();
987           if (entry == null) {
988             break;
989           }
990           Cell cell = entry.getEdit().getCells().get(0);
991           if (WALEdit.isMetaEditFamily(cell)) {
992             FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(cell);
993             assertNotNull(flushDesc);
994             assertArrayEquals(tableName.getName(), flushDesc.getTableName().toByteArray());
995             if (flushDesc.getAction() == FlushAction.START_FLUSH) {
996               assertTrue(flushDesc.getFlushSequenceNumber() > lastFlushSeqId);
997             } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
998               assertTrue(flushDesc.getFlushSequenceNumber() == lastFlushSeqId);
999             }
1000             lastFlushSeqId = flushDesc.getFlushSequenceNumber();
1001             assertArrayEquals(regionName, flushDesc.getEncodedRegionName().toByteArray());
1002             assertEquals(1, flushDesc.getStoreFlushesCount()); //only one store
1003             StoreFlushDescriptor storeFlushDesc = flushDesc.getStoreFlushes(0);
1004             assertArrayEquals(family, storeFlushDesc.getFamilyName().toByteArray());
1005             assertEquals("family", storeFlushDesc.getStoreHomeDir());
1006             if (flushDesc.getAction() == FlushAction.START_FLUSH) {
1007               assertEquals(0, storeFlushDesc.getFlushOutputCount());
1008             } else {
1009               assertEquals(1, storeFlushDesc.getFlushOutputCount()); //only one file from flush
1010               assertTrue(storeFiles.contains(storeFlushDesc.getFlushOutput(0)));
1011             }
1012 
1013             flushDescriptors.add(entry);
1014           }
1015         }
1016 
1017         assertEquals(3 * 2, flushDescriptors.size()); // START_FLUSH and COMMIT_FLUSH per flush
1018 
1019         // now write those markers to the recovered edits again.
1020 
1021         Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
1022 
1023         Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
1024         fs.create(recoveredEdits);
1025         WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
1026 
1027         for (WAL.Entry entry : flushDescriptors) {
1028           writer.append(entry);
1029         }
1030         writer.close();
1031       } finally {
1032         if (null != reader) {
1033           try {
1034             reader.close();
1035           } catch (IOException exception) {
1036             LOG.warn("Problem closing wal: " + exception.getMessage());
1037             LOG.debug("exception details", exception);
1038           }
1039         }
1040       }
1041 
1042 
1043       // close the region now, and reopen again
1044       region.close();
1045       region = HRegion.openHRegion(region, null);
1046 
1047       // now check whether we have can read back the data from region
1048       for (long i = minSeqId; i < maxSeqId; i++) {
1049         Get get = new Get(Bytes.toBytes(i));
1050         Result result = region.get(get);
1051         byte[] value = result.getValue(family, Bytes.toBytes(i));
1052         assertArrayEquals(Bytes.toBytes(i), value);
1053       }
1054     } finally {
1055       HRegion.closeHRegion(this.region);
1056       this.region = null;
1057       wals.close();
1058     }
1059   }
1060 
1061   class IsFlushWALMarker extends ArgumentMatcher<WALEdit> {
1062     volatile FlushAction[] actions;
1063     public IsFlushWALMarker(FlushAction... actions) {
1064       this.actions = actions;
1065     }
1066     @Override
1067     public boolean matches(Object edit) {
1068       List<Cell> cells = ((WALEdit)edit).getCells();
1069       if (cells.isEmpty()) {
1070         return false;
1071       }
1072       if (WALEdit.isMetaEditFamily(cells.get(0))) {
1073         FlushDescriptor desc = null;
1074         try {
1075           desc = WALEdit.getFlushDescriptor(cells.get(0));
1076         } catch (IOException e) {
1077           LOG.warn(e);
1078           return false;
1079         }
1080         if (desc != null) {
1081           for (FlushAction action : actions) {
1082             if (desc.getAction() == action) {
1083               return true;
1084             }
1085           }
1086         }
1087       }
1088       return false;
1089     }
1090     public IsFlushWALMarker set(FlushAction... actions) {
1091       this.actions = actions;
1092       return this;
1093     }
1094   }
1095 
1096   @Test (timeout=60000)
1097   public void testFlushMarkersWALFail() throws Exception {
1098     // test the cases where the WAL append for flush markers fail.
1099     String method = name.getMethodName();
1100     TableName tableName = TableName.valueOf(method);
1101     byte[] family = Bytes.toBytes("family");
1102 
1103     // spy an actual WAL implementation to throw exception (was not able to mock)
1104     Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + "log");
1105 
1106     final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
1107     FSUtils.setRootDir(walConf, logDir);
1108 
1109     // Make up a WAL that we can manipulate at append time.
1110     class FailAppendFlushMarkerWAL extends FSHLog {
1111       volatile FlushAction [] flushActions = null;
1112 
1113       public FailAppendFlushMarkerWAL(FileSystem fs, Path root, String logDir, Configuration conf)
1114       throws IOException {
1115         super(fs, root, logDir, conf);
1116       }
1117 
1118       @Override
1119       protected Writer createWriterInstance(Path path) throws IOException {
1120         final Writer w = super.createWriterInstance(path);
1121         return new Writer() {
1122           @Override
1123           public void close() throws IOException {
1124             w.close();
1125           }
1126 
1127           @Override
1128           public void sync() throws IOException {
1129             w.sync();
1130           }
1131 
1132           @Override
1133           public void append(Entry entry) throws IOException {
1134             List<Cell> cells = entry.getEdit().getCells();
1135             if (WALEdit.isMetaEditFamily(cells.get(0))) {
1136                FlushDescriptor desc = WALEdit.getFlushDescriptor(cells.get(0));
1137               if (desc != null) {
1138                 for (FlushAction flushAction: flushActions) {
1139                   if (desc.getAction().equals(flushAction)) {
1140                     throw new IOException("Failed to append flush marker! " + flushAction);
1141                   }
1142                 }
1143               }
1144             }
1145             w.append(entry);
1146           }
1147 
1148           @Override
1149           public long getLength() throws IOException {
1150             return w.getLength();
1151           }
1152         };
1153       }
1154     }
1155     FailAppendFlushMarkerWAL wal =
1156       new FailAppendFlushMarkerWAL(FileSystem.get(walConf), FSUtils.getRootDir(walConf),
1157         getName(), walConf);
1158     this.region = initHRegion(tableName.getName(), HConstants.EMPTY_START_ROW,
1159       HConstants.EMPTY_END_ROW, method, CONF, false, Durability.USE_DEFAULT, wal, family);
1160     try {
1161       int i = 0;
1162       Put put = new Put(Bytes.toBytes(i));
1163       put.setDurability(Durability.SKIP_WAL); // have to skip mocked wal
1164       put.add(family, Bytes.toBytes(i), Bytes.toBytes(i));
1165       region.put(put);
1166 
1167       // 1. Test case where START_FLUSH throws exception
1168       wal.flushActions = new FlushAction [] {FlushAction.START_FLUSH};
1169 
1170       // start cache flush will throw exception
1171       try {
1172         region.flush(true);
1173         fail("This should have thrown exception");
1174       } catch (DroppedSnapshotException unexpected) {
1175         // this should not be a dropped snapshot exception. Meaning that RS will not abort
1176         throw unexpected;
1177       } catch (IOException expected) {
1178         // expected
1179       }
1180       // The WAL is hosed now. It has two edits appended. We cannot roll the log without it
1181       // throwing a DroppedSnapshotException to force an abort. Just clean up the mess.
1182       region.close(true);
1183       wal.close();
1184 
1185       // 2. Test case where START_FLUSH succeeds but COMMIT_FLUSH will throw exception
1186       wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH};
1187       wal = new FailAppendFlushMarkerWAL(FileSystem.get(walConf), FSUtils.getRootDir(walConf),
1188             getName(), walConf);
1189       this.region = initHRegion(tableName.getName(), HConstants.EMPTY_START_ROW,
1190         HConstants.EMPTY_END_ROW, method, CONF, false, Durability.USE_DEFAULT, wal, family);
1191       region.put(put);
1192 
1193       // 3. Test case where ABORT_FLUSH will throw exception.
1194       // Even if ABORT_FLUSH throws exception, we should not fail with IOE, but continue with
1195       // DroppedSnapshotException. Below COMMMIT_FLUSH will cause flush to abort
1196       wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH, FlushAction.ABORT_FLUSH};
1197 
1198       try {
1199         region.flush(true);
1200         fail("This should have thrown exception");
1201       } catch (DroppedSnapshotException expected) {
1202         // we expect this exception, since we were able to write the snapshot, but failed to
1203         // write the flush marker to WAL
1204       } catch (IOException unexpected) {
1205         throw unexpected;
1206       }
1207 
1208     } finally {
1209       HRegion.closeHRegion(this.region);
1210       this.region = null;
1211     }
1212   }
1213 
1214   @Test
1215   public void testGetWhileRegionClose() throws IOException {
1216     TableName tableName = TableName.valueOf(name.getMethodName());
1217     Configuration hc = initSplit();
1218     int numRows = 100;
1219     byte[][] families = { fam1, fam2, fam3 };
1220 
1221     // Setting up region
1222     String method = name.getMethodName();
1223     this.region = initHRegion(tableName, method, hc, families);
1224     try {
1225       // Put data in region
1226       final int startRow = 100;
1227       putData(startRow, numRows, qual1, families);
1228       putData(startRow, numRows, qual2, families);
1229       putData(startRow, numRows, qual3, families);
1230       final AtomicBoolean done = new AtomicBoolean(false);
1231       final AtomicInteger gets = new AtomicInteger(0);
1232       GetTillDoneOrException[] threads = new GetTillDoneOrException[10];
1233       try {
1234         // Set ten threads running concurrently getting from the region.
1235         for (int i = 0; i < threads.length / 2; i++) {
1236           threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets);
1237           threads[i].setDaemon(true);
1238           threads[i].start();
1239         }
1240         // Artificially make the condition by setting closing flag explicitly.
1241         // I can't make the issue happen with a call to region.close().
1242         this.region.closing.set(true);
1243         for (int i = threads.length / 2; i < threads.length; i++) {
1244           threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets);
1245           threads[i].setDaemon(true);
1246           threads[i].start();
1247         }
1248       } finally {
1249         if (this.region != null) {
1250           HRegion.closeHRegion(this.region);
1251         }
1252       }
1253       done.set(true);
1254       for (GetTillDoneOrException t : threads) {
1255         try {
1256           t.join();
1257         } catch (InterruptedException e) {
1258           e.printStackTrace();
1259         }
1260         if (t.e != null) {
1261           LOG.info("Exception=" + t.e);
1262           assertFalse("Found a NPE in " + t.getName(), t.e instanceof NullPointerException);
1263         }
1264       }
1265     } finally {
1266       HRegion.closeHRegion(this.region);
1267       this.region = null;
1268     }
1269   }
1270 
1271   /*
1272    * Thread that does get on single row until 'done' flag is flipped. If an
1273    * exception causes us to fail, it records it.
1274    */
1275   class GetTillDoneOrException extends Thread {
1276     private final Get g;
1277     private final AtomicBoolean done;
1278     private final AtomicInteger count;
1279     private Exception e;
1280 
1281     GetTillDoneOrException(final int i, final byte[] r, final AtomicBoolean d, final AtomicInteger c) {
1282       super("getter." + i);
1283       this.g = new Get(r);
1284       this.done = d;
1285       this.count = c;
1286     }
1287 
1288     @Override
1289     public void run() {
1290       while (!this.done.get()) {
1291         try {
1292           assertTrue(region.get(g).size() > 0);
1293           this.count.incrementAndGet();
1294         } catch (Exception e) {
1295           this.e = e;
1296           break;
1297         }
1298       }
1299     }
1300   }
1301 
1302   /*
1303    * An involved filter test. Has multiple column families and deletes in mix.
1304    */
1305   @Test
1306   public void testWeirdCacheBehaviour() throws Exception {
1307     byte[] TABLE = Bytes.toBytes("testWeirdCacheBehaviour");
1308     byte[][] FAMILIES = new byte[][] { Bytes.toBytes("trans-blob"), Bytes.toBytes("trans-type"),
1309         Bytes.toBytes("trans-date"), Bytes.toBytes("trans-tags"), Bytes.toBytes("trans-group") };
1310     this.region = initHRegion(TABLE, getName(), CONF, FAMILIES);
1311     try {
1312       String value = "this is the value";
1313       String value2 = "this is some other value";
1314       String keyPrefix1 = "prefix1";
1315       String keyPrefix2 = "prefix2";
1316       String keyPrefix3 = "prefix3";
1317       putRows(this.region, 3, value, keyPrefix1);
1318       putRows(this.region, 3, value, keyPrefix2);
1319       putRows(this.region, 3, value, keyPrefix3);
1320       putRows(this.region, 3, value2, keyPrefix1);
1321       putRows(this.region, 3, value2, keyPrefix2);
1322       putRows(this.region, 3, value2, keyPrefix3);
1323       System.out.println("Checking values for key: " + keyPrefix1);
1324       assertEquals("Got back incorrect number of rows from scan", 3,
1325           getNumberOfRows(keyPrefix1, value2, this.region));
1326       System.out.println("Checking values for key: " + keyPrefix2);
1327       assertEquals("Got back incorrect number of rows from scan", 3,
1328           getNumberOfRows(keyPrefix2, value2, this.region));
1329       System.out.println("Checking values for key: " + keyPrefix3);
1330       assertEquals("Got back incorrect number of rows from scan", 3,
1331           getNumberOfRows(keyPrefix3, value2, this.region));
1332       deleteColumns(this.region, value2, keyPrefix1);
1333       deleteColumns(this.region, value2, keyPrefix2);
1334       deleteColumns(this.region, value2, keyPrefix3);
1335       System.out.println("Starting important checks.....");
1336       assertEquals("Got back incorrect number of rows from scan: " + keyPrefix1, 0,
1337           getNumberOfRows(keyPrefix1, value2, this.region));
1338       assertEquals("Got back incorrect number of rows from scan: " + keyPrefix2, 0,
1339           getNumberOfRows(keyPrefix2, value2, this.region));
1340       assertEquals("Got back incorrect number of rows from scan: " + keyPrefix3, 0,
1341           getNumberOfRows(keyPrefix3, value2, this.region));
1342     } finally {
1343       HRegion.closeHRegion(this.region);
1344       this.region = null;
1345     }
1346   }
1347 
1348   @Test
1349   public void testAppendWithReadOnlyTable() throws Exception {
1350     byte[] TABLE = Bytes.toBytes("readOnlyTable");
1351     this.region = initHRegion(TABLE, getName(), CONF, true, Bytes.toBytes("somefamily"));
1352     boolean exceptionCaught = false;
1353     Append append = new Append(Bytes.toBytes("somerow"));
1354     append.setDurability(Durability.SKIP_WAL);
1355     append.add(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"),
1356         Bytes.toBytes("somevalue"));
1357     try {
1358       region.append(append);
1359     } catch (IOException e) {
1360       exceptionCaught = true;
1361     } finally {
1362       HRegion.closeHRegion(this.region);
1363       this.region = null;
1364     }
1365     assertTrue(exceptionCaught == true);
1366   }
1367 
1368   @Test
1369   public void testIncrWithReadOnlyTable() throws Exception {
1370     byte[] TABLE = Bytes.toBytes("readOnlyTable");
1371     this.region = initHRegion(TABLE, getName(), CONF, true, Bytes.toBytes("somefamily"));
1372     boolean exceptionCaught = false;
1373     Increment inc = new Increment(Bytes.toBytes("somerow"));
1374     inc.setDurability(Durability.SKIP_WAL);
1375     inc.addColumn(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"), 1L);
1376     try {
1377       region.increment(inc);
1378     } catch (IOException e) {
1379       exceptionCaught = true;
1380     } finally {
1381       HRegion.closeHRegion(this.region);
1382       this.region = null;
1383     }
1384     assertTrue(exceptionCaught == true);
1385   }
1386 
1387   private void deleteColumns(HRegion r, String value, String keyPrefix) throws IOException {
1388     InternalScanner scanner = buildScanner(keyPrefix, value, r);
1389     int count = 0;
1390     boolean more = false;
1391     List<Cell> results = new ArrayList<Cell>();
1392     do {
1393       more = scanner.next(results);
1394       if (results != null && !results.isEmpty())
1395         count++;
1396       else
1397         break;
1398       Delete delete = new Delete(CellUtil.cloneRow(results.get(0)));
1399       delete.deleteColumn(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"));
1400       r.delete(delete);
1401       results.clear();
1402     } while (more);
1403     assertEquals("Did not perform correct number of deletes", 3, count);
1404   }
1405 
1406   private int getNumberOfRows(String keyPrefix, String value, HRegion r) throws Exception {
1407     InternalScanner resultScanner = buildScanner(keyPrefix, value, r);
1408     int numberOfResults = 0;
1409     List<Cell> results = new ArrayList<Cell>();
1410     boolean more = false;
1411     do {
1412       more = resultScanner.next(results);
1413       if (results != null && !results.isEmpty())
1414         numberOfResults++;
1415       else
1416         break;
1417       for (Cell kv : results) {
1418         System.out.println("kv=" + kv.toString() + ", " + Bytes.toString(CellUtil.cloneValue(kv)));
1419       }
1420       results.clear();
1421     } while (more);
1422     return numberOfResults;
1423   }
1424 
1425   private InternalScanner buildScanner(String keyPrefix, String value, HRegion r)
1426       throws IOException {
1427     // Defaults FilterList.Operator.MUST_PASS_ALL.
1428     FilterList allFilters = new FilterList();
1429     allFilters.addFilter(new PrefixFilter(Bytes.toBytes(keyPrefix)));
1430     // Only return rows where this column value exists in the row.
1431     SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("trans-tags"),
1432         Bytes.toBytes("qual2"), CompareOp.EQUAL, Bytes.toBytes(value));
1433     filter.setFilterIfMissing(true);
1434     allFilters.addFilter(filter);
1435     Scan scan = new Scan();
1436     scan.addFamily(Bytes.toBytes("trans-blob"));
1437     scan.addFamily(Bytes.toBytes("trans-type"));
1438     scan.addFamily(Bytes.toBytes("trans-date"));
1439     scan.addFamily(Bytes.toBytes("trans-tags"));
1440     scan.addFamily(Bytes.toBytes("trans-group"));
1441     scan.setFilter(allFilters);
1442     return r.getScanner(scan);
1443   }
1444 
1445   private void putRows(HRegion r, int numRows, String value, String key) throws IOException {
1446     for (int i = 0; i < numRows; i++) {
1447       String row = key + "_" + i/* UUID.randomUUID().toString() */;
1448       System.out.println(String.format("Saving row: %s, with value %s", row, value));
1449       Put put = new Put(Bytes.toBytes(row));
1450       put.setDurability(Durability.SKIP_WAL);
1451       put.add(Bytes.toBytes("trans-blob"), null, Bytes.toBytes("value for blob"));
1452       put.add(Bytes.toBytes("trans-type"), null, Bytes.toBytes("statement"));
1453       put.add(Bytes.toBytes("trans-date"), null, Bytes.toBytes("20090921010101999"));
1454       put.add(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"), Bytes.toBytes(value));
1455       put.add(Bytes.toBytes("trans-group"), null, Bytes.toBytes("adhocTransactionGroupId"));
1456       r.put(put);
1457     }
1458   }
1459 
1460   @Test
1461   public void testFamilyWithAndWithoutColon() throws Exception {
1462     byte[] b = Bytes.toBytes(getName());
1463     byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
1464     this.region = initHRegion(b, getName(), CONF, cf);
1465     try {
1466       Put p = new Put(b);
1467       byte[] cfwithcolon = Bytes.toBytes(COLUMN_FAMILY + ":");
1468       p.add(cfwithcolon, cfwithcolon, cfwithcolon);
1469       boolean exception = false;
1470       try {
1471         this.region.put(p);
1472       } catch (NoSuchColumnFamilyException e) {
1473         exception = true;
1474       }
1475       assertTrue(exception);
1476     } finally {
1477       HRegion.closeHRegion(this.region);
1478       this.region = null;
1479     }
1480   }
1481 
1482   @Test
1483   public void testBatchPut_whileNoRowLocksHeld() throws IOException {
1484     byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
1485     byte[] qual = Bytes.toBytes("qual");
1486     byte[] val = Bytes.toBytes("val");
1487     this.region = initHRegion(Bytes.toBytes(getName()), getName(), CONF, cf);
1488     MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1489     try {
1490       long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
1491       metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
1492 
1493       LOG.info("First a batch put with all valid puts");
1494       final Put[] puts = new Put[10];
1495       for (int i = 0; i < 10; i++) {
1496         puts[i] = new Put(Bytes.toBytes("row_" + i));
1497         puts[i].add(cf, qual, val);
1498       }
1499 
1500       OperationStatus[] codes = this.region.batchMutate(puts);
1501       assertEquals(10, codes.length);
1502       for (int i = 0; i < 10; i++) {
1503         assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
1504       }
1505       metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
1506 
1507       LOG.info("Next a batch put with one invalid family");
1508       puts[5].add(Bytes.toBytes("BAD_CF"), qual, val);
1509       codes = this.region.batchMutate(puts);
1510       assertEquals(10, codes.length);
1511       for (int i = 0; i < 10; i++) {
1512         assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
1513             codes[i].getOperationStatusCode());
1514       }
1515 
1516       metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source);
1517     } finally {
1518       HRegion.closeHRegion(this.region);
1519       this.region = null;
1520     }
1521   }
1522 
1523   @Test
1524   public void testBatchPut_whileMultipleRowLocksHeld() throws Exception {
1525     byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
1526     byte[] qual = Bytes.toBytes("qual");
1527     byte[] val = Bytes.toBytes("val");
1528     this.region = initHRegion(Bytes.toBytes(getName()), getName(), CONF, cf);
1529     MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1530     try {
1531       long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
1532       metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
1533 
1534       final Put[] puts = new Put[10];
1535       for (int i = 0; i < 10; i++) {
1536         puts[i] = new Put(Bytes.toBytes("row_" + i));
1537         puts[i].add(cf, qual, val);
1538       }
1539       puts[5].add(Bytes.toBytes("BAD_CF"), qual, val);
1540 
1541       LOG.info("batchPut will have to break into four batches to avoid row locks");
1542       RowLock rowLock1 = region.getRowLock(Bytes.toBytes("row_2"));
1543       RowLock rowLock2 = region.getRowLock(Bytes.toBytes("row_1"));
1544       RowLock rowLock3 = region.getRowLock(Bytes.toBytes("row_3"));
1545       RowLock rowLock4 = region.getRowLock(Bytes.toBytes("row_3"), true);
1546 
1547 
1548       MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
1549       final AtomicReference<OperationStatus[]> retFromThread = new AtomicReference<OperationStatus[]>();
1550       final CountDownLatch startingPuts = new CountDownLatch(1);
1551       final CountDownLatch startingClose = new CountDownLatch(1);
1552       TestThread putter = new TestThread(ctx) {
1553         @Override
1554         public void doWork() throws IOException {
1555           startingPuts.countDown();
1556           retFromThread.set(region.batchMutate(puts));
1557         }
1558       };
1559       LOG.info("...starting put thread while holding locks");
1560       ctx.addThread(putter);
1561       ctx.startThreads();
1562 
1563       // Now attempt to close the region from another thread.  Prior to HBASE-12565
1564       // this would cause the in-progress batchMutate operation to to fail with
1565       // exception because it use to release and re-acquire the close-guard lock
1566       // between batches.  Caller then didn't get status indicating which writes succeeded.
1567       // We now expect this thread to block until the batchMutate call finishes.
1568       Thread regionCloseThread = new TestThread(ctx) {
1569         @Override
1570         public void doWork() {
1571           try {
1572             startingPuts.await();
1573             // Give some time for the batch mutate to get in.
1574             // We don't want to race with the mutate
1575             Thread.sleep(10);
1576             startingClose.countDown();
1577             HBaseTestingUtility.closeRegionAndWAL(region);
1578           } catch (IOException e) {
1579             throw new RuntimeException(e);
1580           } catch (InterruptedException e) {
1581             throw new RuntimeException(e);
1582           }
1583         }
1584       };
1585       regionCloseThread.start();
1586 
1587       startingClose.await();
1588       startingPuts.await();
1589       Thread.sleep(100);
1590       LOG.info("...releasing row lock 1, which should let put thread continue");
1591       rowLock1.release();
1592       rowLock2.release();
1593       rowLock3.release();
1594       waitForCounter(source, "syncTimeNumOps", syncs + 1);
1595 
1596       LOG.info("...joining on put thread");
1597       ctx.stop();
1598       regionCloseThread.join();
1599 
1600       OperationStatus[] codes = retFromThread.get();
1601       for (int i = 0; i < codes.length; i++) {
1602         assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
1603             codes[i].getOperationStatusCode());
1604       }
1605       rowLock4.release();
1606     } finally {
1607       HRegion.closeHRegion(this.region);
1608       this.region = null;
1609     }
1610   }
1611 
1612   private void waitForCounter(MetricsWALSource source, String metricName, long expectedCount)
1613       throws InterruptedException {
1614     long startWait = System.currentTimeMillis();
1615     long currentCount;
1616     while ((currentCount = metricsAssertHelper.getCounter(metricName, source)) < expectedCount) {
1617       Thread.sleep(100);
1618       if (System.currentTimeMillis() - startWait > 10000) {
1619         fail(String.format("Timed out waiting for '%s' >= '%s', currentCount=%s", metricName,
1620           expectedCount, currentCount));
1621       }
1622     }
1623   }
1624 
1625   @Test
1626   public void testBatchPutWithTsSlop() throws Exception {
1627     byte[] b = Bytes.toBytes(getName());
1628     byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
1629     byte[] qual = Bytes.toBytes("qual");
1630     byte[] val = Bytes.toBytes("val");
1631 
1632     // add data with a timestamp that is too recent for range. Ensure assert
1633     CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
1634     this.region = initHRegion(b, getName(), CONF, cf);
1635 
1636     try {
1637       MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1638       long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
1639       metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
1640 
1641       final Put[] puts = new Put[10];
1642       for (int i = 0; i < 10; i++) {
1643         puts[i] = new Put(Bytes.toBytes("row_" + i), Long.MAX_VALUE - 100);
1644         puts[i].add(cf, qual, val);
1645       }
1646 
1647       OperationStatus[] codes = this.region.batchMutate(puts);
1648       assertEquals(10, codes.length);
1649       for (int i = 0; i < 10; i++) {
1650         assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, codes[i].getOperationStatusCode());
1651       }
1652       metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
1653 
1654     } finally {
1655       HRegion.closeHRegion(this.region);
1656       this.region = null;
1657     }
1658 
1659   }
1660 
1661   // ////////////////////////////////////////////////////////////////////////////
1662   // checkAndMutate tests
1663   // ////////////////////////////////////////////////////////////////////////////
1664   @Test
1665   public void testCheckAndMutate_WithEmptyRowValue() throws IOException {
1666     byte[] row1 = Bytes.toBytes("row1");
1667     byte[] fam1 = Bytes.toBytes("fam1");
1668     byte[] qf1 = Bytes.toBytes("qualifier");
1669     byte[] emptyVal = new byte[] {};
1670     byte[] val1 = Bytes.toBytes("value1");
1671     byte[] val2 = Bytes.toBytes("value2");
1672 
1673     // Setting up region
1674     String method = this.getName();
1675     this.region = initHRegion(tableName, method, CONF, fam1);
1676     try {
1677       // Putting empty data in key
1678       Put put = new Put(row1);
1679       put.add(fam1, qf1, emptyVal);
1680 
1681       // checkAndPut with empty value
1682       boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(
1683           emptyVal), put, true);
1684       assertTrue(res);
1685 
1686       // Putting data in key
1687       put = new Put(row1);
1688       put.add(fam1, qf1, val1);
1689 
1690       // checkAndPut with correct value
1691       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(emptyVal),
1692           put, true);
1693       assertTrue(res);
1694 
1695       // not empty anymore
1696       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(emptyVal),
1697           put, true);
1698       assertFalse(res);
1699 
1700       Delete delete = new Delete(row1);
1701       delete.deleteColumn(fam1, qf1);
1702       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(emptyVal),
1703           delete, true);
1704       assertFalse(res);
1705 
1706       put = new Put(row1);
1707       put.add(fam1, qf1, val2);
1708       // checkAndPut with correct value
1709       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(val1),
1710           put, true);
1711       assertTrue(res);
1712 
1713       // checkAndDelete with correct value
1714       delete = new Delete(row1);
1715       delete.deleteColumn(fam1, qf1);
1716       delete.deleteColumn(fam1, qf1);
1717       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(val2),
1718           delete, true);
1719       assertTrue(res);
1720 
1721       delete = new Delete(row1);
1722       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(emptyVal),
1723           delete, true);
1724       assertTrue(res);
1725 
1726       // checkAndPut looking for a null value
1727       put = new Put(row1);
1728       put.add(fam1, qf1, val1);
1729 
1730       res = region
1731           .checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new NullComparator(), put, true);
1732       assertTrue(res);
1733     } finally {
1734       HRegion.closeHRegion(this.region);
1735       this.region = null;
1736     }
1737   }
1738 
1739   @Test
1740   public void testCheckAndMutate_WithWrongValue() throws IOException {
1741     byte[] row1 = Bytes.toBytes("row1");
1742     byte[] fam1 = Bytes.toBytes("fam1");
1743     byte[] qf1 = Bytes.toBytes("qualifier");
1744     byte[] val1 = Bytes.toBytes("value1");
1745     byte[] val2 = Bytes.toBytes("value2");
1746 
1747     // Setting up region
1748     String method = this.getName();
1749     this.region = initHRegion(tableName, method, CONF, fam1);
1750     try {
1751       // Putting data in key
1752       Put put = new Put(row1);
1753       put.add(fam1, qf1, val1);
1754       region.put(put);
1755 
1756       // checkAndPut with wrong value
1757       boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(
1758           val2), put, true);
1759       assertEquals(false, res);
1760 
1761       // checkAndDelete with wrong value
1762       Delete delete = new Delete(row1);
1763       delete.deleteFamily(fam1);
1764       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(val2),
1765           put, true);
1766       assertEquals(false, res);
1767     } finally {
1768       HRegion.closeHRegion(this.region);
1769       this.region = null;
1770     }
1771   }
1772 
1773   @Test
1774   public void testCheckAndMutate_WithCorrectValue() throws IOException {
1775     byte[] row1 = Bytes.toBytes("row1");
1776     byte[] fam1 = Bytes.toBytes("fam1");
1777     byte[] qf1 = Bytes.toBytes("qualifier");
1778     byte[] val1 = Bytes.toBytes("value1");
1779 
1780     // Setting up region
1781     String method = this.getName();
1782     this.region = initHRegion(tableName, method, CONF, fam1);
1783     try {
1784       // Putting data in key
1785       Put put = new Put(row1);
1786       put.add(fam1, qf1, val1);
1787       region.put(put);
1788 
1789       // checkAndPut with correct value
1790       boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(
1791           val1), put, true);
1792       assertEquals(true, res);
1793 
1794       // checkAndDelete with correct value
1795       Delete delete = new Delete(row1);
1796       delete.deleteColumn(fam1, qf1);
1797       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(val1),
1798           delete, true);
1799       assertEquals(true, res);
1800     } finally {
1801       HRegion.closeHRegion(this.region);
1802       this.region = null;
1803     }
1804   }
1805 
1806   @Test
1807   public void testCheckAndMutate_WithNonEqualCompareOp() throws IOException {
1808     byte[] row1 = Bytes.toBytes("row1");
1809     byte[] fam1 = Bytes.toBytes("fam1");
1810     byte[] qf1 = Bytes.toBytes("qualifier");
1811     byte[] val1 = Bytes.toBytes("value1");
1812     byte[] val2 = Bytes.toBytes("value2");
1813     byte[] val3 = Bytes.toBytes("value3");
1814     byte[] val4 = Bytes.toBytes("value4");
1815 
1816     // Setting up region
1817     String method = this.getName();
1818     this.region = initHRegion(tableName, method, CONF, fam1);
1819     try {
1820       // Putting val3 in key
1821       Put put = new Put(row1);
1822       put.add(fam1, qf1, val3);
1823       region.put(put);
1824 
1825       // Test CompareOp.LESS: original = val3, compare with val3, fail
1826       boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOp.LESS,
1827           new BinaryComparator(val3), put, true);
1828       assertEquals(false, res);
1829 
1830       // Test CompareOp.LESS: original = val3, compare with val4, fail
1831       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.LESS,
1832           new BinaryComparator(val4), put, true);
1833       assertEquals(false, res);
1834 
1835       // Test CompareOp.LESS: original = val3, compare with val2,
1836       // succeed (now value = val2)
1837       put = new Put(row1);
1838       put.add(fam1, qf1, val2);
1839       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.LESS,
1840           new BinaryComparator(val2), put, true);
1841       assertEquals(true, res);
1842 
1843       // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val3, fail
1844       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.LESS_OR_EQUAL,
1845           new BinaryComparator(val3), put, true);
1846       assertEquals(false, res);
1847 
1848       // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val2,
1849       // succeed (value still = val2)
1850       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.LESS_OR_EQUAL,
1851           new BinaryComparator(val2), put, true);
1852       assertEquals(true, res);
1853 
1854       // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val1,
1855       // succeed (now value = val3)
1856       put = new Put(row1);
1857       put.add(fam1, qf1, val3);
1858       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.LESS_OR_EQUAL,
1859           new BinaryComparator(val1), put, true);
1860       assertEquals(true, res);
1861 
1862       // Test CompareOp.GREATER: original = val3, compare with val3, fail
1863       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.GREATER,
1864           new BinaryComparator(val3), put, true);
1865       assertEquals(false, res);
1866 
1867       // Test CompareOp.GREATER: original = val3, compare with val2, fail
1868       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.GREATER,
1869           new BinaryComparator(val2), put, true);
1870       assertEquals(false, res);
1871 
1872       // Test CompareOp.GREATER: original = val3, compare with val4,
1873       // succeed (now value = val2)
1874       put = new Put(row1);
1875       put.add(fam1, qf1, val2);
1876       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.GREATER,
1877           new BinaryComparator(val4), put, true);
1878       assertEquals(true, res);
1879 
1880       // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val1, fail
1881       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.GREATER_OR_EQUAL,
1882           new BinaryComparator(val1), put, true);
1883       assertEquals(false, res);
1884 
1885       // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val2,
1886       // succeed (value still = val2)
1887       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.GREATER_OR_EQUAL,
1888           new BinaryComparator(val2), put, true);
1889       assertEquals(true, res);
1890 
1891       // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val3, succeed
1892       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.GREATER_OR_EQUAL,
1893           new BinaryComparator(val3), put, true);
1894       assertEquals(true, res);
1895     } finally {
1896       HRegion.closeHRegion(this.region);
1897       this.region = null;
1898     }
1899   }
1900 
1901   @Test
1902   public void testCheckAndPut_ThatPutWasWritten() throws IOException {
1903     byte[] row1 = Bytes.toBytes("row1");
1904     byte[] fam1 = Bytes.toBytes("fam1");
1905     byte[] fam2 = Bytes.toBytes("fam2");
1906     byte[] qf1 = Bytes.toBytes("qualifier");
1907     byte[] val1 = Bytes.toBytes("value1");
1908     byte[] val2 = Bytes.toBytes("value2");
1909 
1910     byte[][] families = { fam1, fam2 };
1911 
1912     // Setting up region
1913     String method = this.getName();
1914     this.region = initHRegion(tableName, method, CONF, families);
1915     try {
1916       // Putting data in the key to check
1917       Put put = new Put(row1);
1918       put.add(fam1, qf1, val1);
1919       region.put(put);
1920 
1921       // Creating put to add
1922       long ts = System.currentTimeMillis();
1923       KeyValue kv = new KeyValue(row1, fam2, qf1, ts, KeyValue.Type.Put, val2);
1924       put = new Put(row1);
1925       put.add(kv);
1926 
1927       // checkAndPut with wrong value
1928       boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(
1929           val1), put, true);
1930       assertEquals(true, res);
1931 
1932       Get get = new Get(row1);
1933       get.addColumn(fam2, qf1);
1934       Cell[] actual = region.get(get).rawCells();
1935 
1936       Cell[] expected = { kv };
1937 
1938       assertEquals(expected.length, actual.length);
1939       for (int i = 0; i < actual.length; i++) {
1940         assertEquals(expected[i], actual[i]);
1941       }
1942     } finally {
1943       HRegion.closeHRegion(this.region);
1944       this.region = null;
1945     }
1946   }
1947 
1948   @Test
1949   public void testCheckAndPut_wrongRowInPut() throws IOException {
1950     TableName tableName = TableName.valueOf(name.getMethodName());
1951     this.region = initHRegion(tableName, this.getName(), CONF, COLUMNS);
1952     try {
1953       Put put = new Put(row2);
1954       put.add(fam1, qual1, value1);
1955       try {
1956         region.checkAndMutate(row, fam1, qual1, CompareOp.EQUAL,
1957             new BinaryComparator(value2), put, false);
1958         fail();
1959       } catch (org.apache.hadoop.hbase.DoNotRetryIOException expected) {
1960         // expected exception.
1961       }
1962     } finally {
1963       HRegion.closeHRegion(this.region);
1964       this.region = null;
1965     }
1966   }
1967 
1968   @Test
1969   public void testCheckAndDelete_ThatDeleteWasWritten() throws IOException {
1970     byte[] row1 = Bytes.toBytes("row1");
1971     byte[] fam1 = Bytes.toBytes("fam1");
1972     byte[] fam2 = Bytes.toBytes("fam2");
1973     byte[] qf1 = Bytes.toBytes("qualifier1");
1974     byte[] qf2 = Bytes.toBytes("qualifier2");
1975     byte[] qf3 = Bytes.toBytes("qualifier3");
1976     byte[] val1 = Bytes.toBytes("value1");
1977     byte[] val2 = Bytes.toBytes("value2");
1978     byte[] val3 = Bytes.toBytes("value3");
1979     byte[] emptyVal = new byte[] {};
1980 
1981     byte[][] families = { fam1, fam2 };
1982 
1983     // Setting up region
1984     String method = this.getName();
1985     this.region = initHRegion(tableName, method, CONF, families);
1986     try {
1987       // Put content
1988       Put put = new Put(row1);
1989       put.add(fam1, qf1, val1);
1990       region.put(put);
1991       Threads.sleep(2);
1992 
1993       put = new Put(row1);
1994       put.add(fam1, qf1, val2);
1995       put.add(fam2, qf1, val3);
1996       put.add(fam2, qf2, val2);
1997       put.add(fam2, qf3, val1);
1998       put.add(fam1, qf3, val1);
1999       region.put(put);
2000 
2001       // Multi-column delete
2002       Delete delete = new Delete(row1);
2003       delete.deleteColumn(fam1, qf1);
2004       delete.deleteColumn(fam2, qf1);
2005       delete.deleteColumn(fam1, qf3);
2006       boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(
2007           val2), delete, true);
2008       assertEquals(true, res);
2009 
2010       Get get = new Get(row1);
2011       get.addColumn(fam1, qf1);
2012       get.addColumn(fam1, qf3);
2013       get.addColumn(fam2, qf2);
2014       Result r = region.get(get);
2015       assertEquals(2, r.size());
2016       assertArrayEquals(val1, r.getValue(fam1, qf1));
2017       assertArrayEquals(val2, r.getValue(fam2, qf2));
2018 
2019       // Family delete
2020       delete = new Delete(row1);
2021       delete.deleteFamily(fam2);
2022       res = region.checkAndMutate(row1, fam2, qf1, CompareOp.EQUAL, new BinaryComparator(emptyVal),
2023           delete, true);
2024       assertEquals(true, res);
2025 
2026       get = new Get(row1);
2027       r = region.get(get);
2028       assertEquals(1, r.size());
2029       assertArrayEquals(val1, r.getValue(fam1, qf1));
2030 
2031       // Row delete
2032       delete = new Delete(row1);
2033       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(val1),
2034           delete, true);
2035       assertEquals(true, res);
2036       get = new Get(row1);
2037       r = region.get(get);
2038       assertEquals(0, r.size());
2039     } finally {
2040       HRegion.closeHRegion(this.region);
2041       this.region = null;
2042     }
2043   }
2044 
2045   // ////////////////////////////////////////////////////////////////////////////
2046   // Delete tests
2047   // ////////////////////////////////////////////////////////////////////////////
2048   @Test
2049   public void testDelete_multiDeleteColumn() throws IOException {
2050     byte[] row1 = Bytes.toBytes("row1");
2051     byte[] fam1 = Bytes.toBytes("fam1");
2052     byte[] qual = Bytes.toBytes("qualifier");
2053     byte[] value = Bytes.toBytes("value");
2054 
2055     Put put = new Put(row1);
2056     put.add(fam1, qual, 1, value);
2057     put.add(fam1, qual, 2, value);
2058 
2059     String method = this.getName();
2060     this.region = initHRegion(tableName, method, CONF, fam1);
2061     try {
2062       region.put(put);
2063 
2064       // We do support deleting more than 1 'latest' version
2065       Delete delete = new Delete(row1);
2066       delete.deleteColumn(fam1, qual);
2067       delete.deleteColumn(fam1, qual);
2068       region.delete(delete);
2069 
2070       Get get = new Get(row1);
2071       get.addFamily(fam1);
2072       Result r = region.get(get);
2073       assertEquals(0, r.size());
2074     } finally {
2075       HRegion.closeHRegion(this.region);
2076       this.region = null;
2077     }
2078   }
2079 
2080   @Test
2081   public void testDelete_CheckFamily() throws IOException {
2082     byte[] row1 = Bytes.toBytes("row1");
2083     byte[] fam1 = Bytes.toBytes("fam1");
2084     byte[] fam2 = Bytes.toBytes("fam2");
2085     byte[] fam3 = Bytes.toBytes("fam3");
2086     byte[] fam4 = Bytes.toBytes("fam4");
2087 
2088     // Setting up region
2089     String method = this.getName();
2090     this.region = initHRegion(tableName, method, CONF, fam1, fam2, fam3);
2091     try {
2092       List<Cell> kvs = new ArrayList<Cell>();
2093       kvs.add(new KeyValue(row1, fam4, null, null));
2094 
2095       // testing existing family
2096       byte[] family = fam2;
2097       try {
2098         NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<byte[], List<Cell>>(
2099             Bytes.BYTES_COMPARATOR);
2100         deleteMap.put(family, kvs);
2101         region.delete(deleteMap, Durability.SYNC_WAL);
2102       } catch (Exception e) {
2103         assertTrue("Family " + new String(family) + " does not exist", false);
2104       }
2105 
2106       // testing non existing family
2107       boolean ok = false;
2108       family = fam4;
2109       try {
2110         NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<byte[], List<Cell>>(
2111             Bytes.BYTES_COMPARATOR);
2112         deleteMap.put(family, kvs);
2113         region.delete(deleteMap, Durability.SYNC_WAL);
2114       } catch (Exception e) {
2115         ok = true;
2116       }
2117       assertEquals("Family " + new String(family) + " does exist", true, ok);
2118     } finally {
2119       HRegion.closeHRegion(this.region);
2120       this.region = null;
2121     }
2122   }
2123 
2124   @Test
2125   public void testDelete_mixed() throws IOException, InterruptedException {
2126     byte[] fam = Bytes.toBytes("info");
2127     byte[][] families = { fam };
2128     String method = this.getName();
2129     this.region = initHRegion(tableName, method, CONF, families);
2130     try {
2131       EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
2132 
2133       byte[] row = Bytes.toBytes("table_name");
2134       // column names
2135       byte[] serverinfo = Bytes.toBytes("serverinfo");
2136       byte[] splitA = Bytes.toBytes("splitA");
2137       byte[] splitB = Bytes.toBytes("splitB");
2138 
2139       // add some data:
2140       Put put = new Put(row);
2141       put.add(fam, splitA, Bytes.toBytes("reference_A"));
2142       region.put(put);
2143 
2144       put = new Put(row);
2145       put.add(fam, splitB, Bytes.toBytes("reference_B"));
2146       region.put(put);
2147 
2148       put = new Put(row);
2149       put.add(fam, serverinfo, Bytes.toBytes("ip_address"));
2150       region.put(put);
2151 
2152       // ok now delete a split:
2153       Delete delete = new Delete(row);
2154       delete.deleteColumns(fam, splitA);
2155       region.delete(delete);
2156 
2157       // assert some things:
2158       Get get = new Get(row).addColumn(fam, serverinfo);
2159       Result result = region.get(get);
2160       assertEquals(1, result.size());
2161 
2162       get = new Get(row).addColumn(fam, splitA);
2163       result = region.get(get);
2164       assertEquals(0, result.size());
2165 
2166       get = new Get(row).addColumn(fam, splitB);
2167       result = region.get(get);
2168       assertEquals(1, result.size());
2169 
2170       // Assert that after a delete, I can put.
2171       put = new Put(row);
2172       put.add(fam, splitA, Bytes.toBytes("reference_A"));
2173       region.put(put);
2174       get = new Get(row);
2175       result = region.get(get);
2176       assertEquals(3, result.size());
2177 
2178       // Now delete all... then test I can add stuff back
2179       delete = new Delete(row);
2180       region.delete(delete);
2181       assertEquals(0, region.get(get).size());
2182 
2183       region.put(new Put(row).add(fam, splitA, Bytes.toBytes("reference_A")));
2184       result = region.get(get);
2185       assertEquals(1, result.size());
2186     } finally {
2187       HRegion.closeHRegion(this.region);
2188       this.region = null;
2189     }
2190   }
2191 
2192   @Test
2193   public void testDeleteRowWithFutureTs() throws IOException {
2194     byte[] fam = Bytes.toBytes("info");
2195     byte[][] families = { fam };
2196     String method = this.getName();
2197     this.region = initHRegion(tableName, method, CONF, families);
2198     try {
2199       byte[] row = Bytes.toBytes("table_name");
2200       // column names
2201       byte[] serverinfo = Bytes.toBytes("serverinfo");
2202 
2203       // add data in the far future
2204       Put put = new Put(row);
2205       put.add(fam, serverinfo, HConstants.LATEST_TIMESTAMP - 5, Bytes.toBytes("value"));
2206       region.put(put);
2207 
2208       // now delete something in the present
2209       Delete delete = new Delete(row);
2210       region.delete(delete);
2211 
2212       // make sure we still see our data
2213       Get get = new Get(row).addColumn(fam, serverinfo);
2214       Result result = region.get(get);
2215       assertEquals(1, result.size());
2216 
2217       // delete the future row
2218       delete = new Delete(row, HConstants.LATEST_TIMESTAMP - 3);
2219       region.delete(delete);
2220 
2221       // make sure it is gone
2222       get = new Get(row).addColumn(fam, serverinfo);
2223       result = region.get(get);
2224       assertEquals(0, result.size());
2225     } finally {
2226       HRegion.closeHRegion(this.region);
2227       this.region = null;
2228     }
2229   }
2230 
2231   /**
2232    * Tests that the special LATEST_TIMESTAMP option for puts gets replaced by
2233    * the actual timestamp
2234    */
2235   @Test
2236   public void testPutWithLatestTS() throws IOException {
2237     byte[] fam = Bytes.toBytes("info");
2238     byte[][] families = { fam };
2239     String method = this.getName();
2240     this.region = initHRegion(tableName, method, CONF, families);
2241     try {
2242       byte[] row = Bytes.toBytes("row1");
2243       // column names
2244       byte[] qual = Bytes.toBytes("qual");
2245 
2246       // add data with LATEST_TIMESTAMP, put without WAL
2247       Put put = new Put(row);
2248       put.add(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value"));
2249       region.put(put);
2250 
2251       // Make sure it shows up with an actual timestamp
2252       Get get = new Get(row).addColumn(fam, qual);
2253       Result result = region.get(get);
2254       assertEquals(1, result.size());
2255       Cell kv = result.rawCells()[0];
2256       LOG.info("Got: " + kv);
2257       assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp",
2258           kv.getTimestamp() != HConstants.LATEST_TIMESTAMP);
2259 
2260       // Check same with WAL enabled (historically these took different
2261       // code paths, so check both)
2262       row = Bytes.toBytes("row2");
2263       put = new Put(row);
2264       put.add(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value"));
2265       region.put(put);
2266 
2267       // Make sure it shows up with an actual timestamp
2268       get = new Get(row).addColumn(fam, qual);
2269       result = region.get(get);
2270       assertEquals(1, result.size());
2271       kv = result.rawCells()[0];
2272       LOG.info("Got: " + kv);
2273       assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp",
2274           kv.getTimestamp() != HConstants.LATEST_TIMESTAMP);
2275     } finally {
2276       HRegion.closeHRegion(this.region);
2277       this.region = null;
2278     }
2279 
2280   }
2281 
2282   /**
2283    * Tests that there is server-side filtering for invalid timestamp upper
2284    * bound. Note that the timestamp lower bound is automatically handled for us
2285    * by the TTL field.
2286    */
2287   @Test
2288   public void testPutWithTsSlop() throws IOException {
2289     byte[] fam = Bytes.toBytes("info");
2290     byte[][] families = { fam };
2291     String method = this.getName();
2292 
2293     // add data with a timestamp that is too recent for range. Ensure assert
2294     CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
2295     this.region = initHRegion(tableName, method, CONF, families);
2296     boolean caughtExcep = false;
2297     try {
2298       try {
2299         // no TS specified == use latest. should not error
2300         region.put(new Put(row).add(fam, Bytes.toBytes("qual"), Bytes.toBytes("value")));
2301         // TS out of range. should error
2302         region.put(new Put(row).add(fam, Bytes.toBytes("qual"), System.currentTimeMillis() + 2000,
2303             Bytes.toBytes("value")));
2304         fail("Expected IOE for TS out of configured timerange");
2305       } catch (FailedSanityCheckException ioe) {
2306         LOG.debug("Received expected exception", ioe);
2307         caughtExcep = true;
2308       }
2309       assertTrue("Should catch FailedSanityCheckException", caughtExcep);
2310     } finally {
2311       HRegion.closeHRegion(this.region);
2312       this.region = null;
2313     }
2314   }
2315 
2316   @Test
2317   public void testScanner_DeleteOneFamilyNotAnother() throws IOException {
2318     byte[] fam1 = Bytes.toBytes("columnA");
2319     byte[] fam2 = Bytes.toBytes("columnB");
2320     this.region = initHRegion(tableName, getName(), CONF, fam1, fam2);
2321     try {
2322       byte[] rowA = Bytes.toBytes("rowA");
2323       byte[] rowB = Bytes.toBytes("rowB");
2324 
2325       byte[] value = Bytes.toBytes("value");
2326 
2327       Delete delete = new Delete(rowA);
2328       delete.deleteFamily(fam1);
2329 
2330       region.delete(delete);
2331 
2332       // now create data.
2333       Put put = new Put(rowA);
2334       put.add(fam2, null, value);
2335       region.put(put);
2336 
2337       put = new Put(rowB);
2338       put.add(fam1, null, value);
2339       put.add(fam2, null, value);
2340       region.put(put);
2341 
2342       Scan scan = new Scan();
2343       scan.addFamily(fam1).addFamily(fam2);
2344       InternalScanner s = region.getScanner(scan);
2345       List<Cell> results = new ArrayList<Cell>();
2346       s.next(results);
2347       assertTrue(CellUtil.matchingRow(results.get(0), rowA));
2348 
2349       results.clear();
2350       s.next(results);
2351       assertTrue(CellUtil.matchingRow(results.get(0), rowB));
2352     } finally {
2353       HRegion.closeHRegion(this.region);
2354       this.region = null;
2355     }
2356   }
2357 
2358   @Test
2359   public void testDeleteColumns_PostInsert() throws IOException, InterruptedException {
2360     Delete delete = new Delete(row);
2361     delete.deleteColumns(fam1, qual1);
2362     doTestDelete_AndPostInsert(delete);
2363   }
2364 
2365   @Test
2366   public void testDeleteFamily_PostInsert() throws IOException, InterruptedException {
2367     Delete delete = new Delete(row);
2368     delete.deleteFamily(fam1);
2369     doTestDelete_AndPostInsert(delete);
2370   }
2371 
2372   public void doTestDelete_AndPostInsert(Delete delete) throws IOException, InterruptedException {
2373     TableName tableName = TableName.valueOf(name.getMethodName());
2374     this.region = initHRegion(tableName, getName(), CONF, fam1);
2375     try {
2376       EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
2377       Put put = new Put(row);
2378       put.add(fam1, qual1, value1);
2379       region.put(put);
2380 
2381       // now delete the value:
2382       region.delete(delete);
2383 
2384       // ok put data:
2385       put = new Put(row);
2386       put.add(fam1, qual1, value2);
2387       region.put(put);
2388 
2389       // ok get:
2390       Get get = new Get(row);
2391       get.addColumn(fam1, qual1);
2392 
2393       Result r = region.get(get);
2394       assertEquals(1, r.size());
2395       assertArrayEquals(value2, r.getValue(fam1, qual1));
2396 
2397       // next:
2398       Scan scan = new Scan(row);
2399       scan.addColumn(fam1, qual1);
2400       InternalScanner s = region.getScanner(scan);
2401 
2402       List<Cell> results = new ArrayList<Cell>();
2403       assertEquals(false, s.next(results));
2404       assertEquals(1, results.size());
2405       Cell kv = results.get(0);
2406 
2407       assertArrayEquals(value2, CellUtil.cloneValue(kv));
2408       assertArrayEquals(fam1, CellUtil.cloneFamily(kv));
2409       assertArrayEquals(qual1, CellUtil.cloneQualifier(kv));
2410       assertArrayEquals(row, CellUtil.cloneRow(kv));
2411     } finally {
2412       HRegion.closeHRegion(this.region);
2413       this.region = null;
2414     }
2415   }
2416 
2417   @Test
2418   public void testDelete_CheckTimestampUpdated() throws IOException {
2419     TableName tableName = TableName.valueOf(name.getMethodName());
2420     byte[] row1 = Bytes.toBytes("row1");
2421     byte[] col1 = Bytes.toBytes("col1");
2422     byte[] col2 = Bytes.toBytes("col2");
2423     byte[] col3 = Bytes.toBytes("col3");
2424 
2425     // Setting up region
2426     String method = this.getName();
2427     this.region = initHRegion(tableName, method, CONF, fam1);
2428     try {
2429       // Building checkerList
2430       List<Cell> kvs = new ArrayList<Cell>();
2431       kvs.add(new KeyValue(row1, fam1, col1, null));
2432       kvs.add(new KeyValue(row1, fam1, col2, null));
2433       kvs.add(new KeyValue(row1, fam1, col3, null));
2434 
2435       NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<byte[], List<Cell>>(
2436           Bytes.BYTES_COMPARATOR);
2437       deleteMap.put(fam1, kvs);
2438       region.delete(deleteMap, Durability.SYNC_WAL);
2439 
2440       // extract the key values out the memstore:
2441       // This is kinda hacky, but better than nothing...
2442       long now = System.currentTimeMillis();
2443       DefaultMemStore memstore = (DefaultMemStore) ((HStore) region.getStore(fam1)).memstore;
2444       Cell firstCell = memstore.cellSet.first();
2445       assertTrue(firstCell.getTimestamp() <= now);
2446       now = firstCell.getTimestamp();
2447       for (Cell cell : memstore.cellSet) {
2448         assertTrue(cell.getTimestamp() <= now);
2449         now = cell.getTimestamp();
2450       }
2451     } finally {
2452       HRegion.closeHRegion(this.region);
2453       this.region = null;
2454     }
2455   }
2456 
2457   // ////////////////////////////////////////////////////////////////////////////
2458   // Get tests
2459   // ////////////////////////////////////////////////////////////////////////////
2460   @Test
2461   public void testGet_FamilyChecker() throws IOException {
2462     byte[] row1 = Bytes.toBytes("row1");
2463     byte[] fam1 = Bytes.toBytes("fam1");
2464     byte[] fam2 = Bytes.toBytes("False");
2465     byte[] col1 = Bytes.toBytes("col1");
2466 
2467     // Setting up region
2468     String method = this.getName();
2469     this.region = initHRegion(tableName, method, CONF, fam1);
2470     try {
2471       Get get = new Get(row1);
2472       get.addColumn(fam2, col1);
2473 
2474       // Test
2475       try {
2476         region.get(get);
2477       } catch (org.apache.hadoop.hbase.DoNotRetryIOException e) {
2478         assertFalse(false);
2479         return;
2480       }
2481       assertFalse(true);
2482     } finally {
2483       HRegion.closeHRegion(this.region);
2484       this.region = null;
2485     }
2486   }
2487 
2488   @Test
2489   public void testGet_Basic() throws IOException {
2490     byte[] row1 = Bytes.toBytes("row1");
2491     byte[] fam1 = Bytes.toBytes("fam1");
2492     byte[] col1 = Bytes.toBytes("col1");
2493     byte[] col2 = Bytes.toBytes("col2");
2494     byte[] col3 = Bytes.toBytes("col3");
2495     byte[] col4 = Bytes.toBytes("col4");
2496     byte[] col5 = Bytes.toBytes("col5");
2497 
2498     // Setting up region
2499     String method = this.getName();
2500     this.region = initHRegion(tableName, method, CONF, fam1);
2501     try {
2502       // Add to memstore
2503       Put put = new Put(row1);
2504       put.add(fam1, col1, null);
2505       put.add(fam1, col2, null);
2506       put.add(fam1, col3, null);
2507       put.add(fam1, col4, null);
2508       put.add(fam1, col5, null);
2509       region.put(put);
2510 
2511       Get get = new Get(row1);
2512       get.addColumn(fam1, col2);
2513       get.addColumn(fam1, col4);
2514       // Expected result
2515       KeyValue kv1 = new KeyValue(row1, fam1, col2);
2516       KeyValue kv2 = new KeyValue(row1, fam1, col4);
2517       KeyValue[] expected = { kv1, kv2 };
2518 
2519       // Test
2520       Result res = region.get(get);
2521       assertEquals(expected.length, res.size());
2522       for (int i = 0; i < res.size(); i++) {
2523         assertTrue(CellUtil.matchingRow(expected[i], res.rawCells()[i]));
2524         assertTrue(CellUtil.matchingFamily(expected[i], res.rawCells()[i]));
2525         assertTrue(CellUtil.matchingQualifier(expected[i], res.rawCells()[i]));
2526       }
2527 
2528       // Test using a filter on a Get
2529       Get g = new Get(row1);
2530       final int count = 2;
2531       g.setFilter(new ColumnCountGetFilter(count));
2532       res = region.get(g);
2533       assertEquals(count, res.size());
2534     } finally {
2535       HRegion.closeHRegion(this.region);
2536       this.region = null;
2537     }
2538   }
2539 
2540   @Test
2541   public void testGet_Empty() throws IOException {
2542     byte[] row = Bytes.toBytes("row");
2543     byte[] fam = Bytes.toBytes("fam");
2544 
2545     String method = this.getName();
2546     this.region = initHRegion(tableName, method, CONF, fam);
2547     try {
2548       Get get = new Get(row);
2549       get.addFamily(fam);
2550       Result r = region.get(get);
2551 
2552       assertTrue(r.isEmpty());
2553     } finally {
2554       HRegion.closeHRegion(this.region);
2555       this.region = null;
2556     }
2557   }
2558 
2559   // ////////////////////////////////////////////////////////////////////////////
2560   // Merge test
2561   // ////////////////////////////////////////////////////////////////////////////
2562   @Test
2563   public void testMerge() throws IOException {
2564     byte[][] families = { fam1, fam2, fam3 };
2565     Configuration hc = initSplit();
2566     // Setting up region
2567     String method = this.getName();
2568     this.region = initHRegion(tableName, method, hc, families);
2569     try {
2570       LOG.info("" + HBaseTestCase.addContent(region, fam3));
2571       region.flush(true);
2572       region.compactStores();
2573       byte[] splitRow = region.checkSplit();
2574       assertNotNull(splitRow);
2575       LOG.info("SplitRow: " + Bytes.toString(splitRow));
2576       HRegion[] subregions = splitRegion(region, splitRow);
2577       try {
2578         // Need to open the regions.
2579         for (int i = 0; i < subregions.length; i++) {
2580           HRegion.openHRegion(subregions[i], null);
2581           subregions[i].compactStores();
2582         }
2583         Path oldRegionPath = region.getRegionFileSystem().getRegionDir();
2584         Path oldRegion1 = subregions[0].getRegionFileSystem().getRegionDir();
2585         Path oldRegion2 = subregions[1].getRegionFileSystem().getRegionDir();
2586         long startTime = System.currentTimeMillis();
2587         region = HRegion.mergeAdjacent(subregions[0], subregions[1]);
2588         LOG.info("Merge regions elapsed time: "
2589             + ((System.currentTimeMillis() - startTime) / 1000.0));
2590         FILESYSTEM.delete(oldRegion1, true);
2591         FILESYSTEM.delete(oldRegion2, true);
2592         FILESYSTEM.delete(oldRegionPath, true);
2593         LOG.info("splitAndMerge completed.");
2594       } finally {
2595         for (int i = 0; i < subregions.length; i++) {
2596           try {
2597             HRegion.closeHRegion(subregions[i]);
2598           } catch (IOException e) {
2599             // Ignore.
2600           }
2601         }
2602       }
2603     } finally {
2604       HRegion.closeHRegion(this.region);
2605       this.region = null;
2606     }
2607   }
2608 
2609   /**
2610    * @param parent
2611    *          Region to split.
2612    * @param midkey
2613    *          Key to split around.
2614    * @return The Regions we created.
2615    * @throws IOException
2616    */
2617   HRegion[] splitRegion(final HRegion parent, final byte[] midkey) throws IOException {
2618     PairOfSameType<Region> result = null;
2619     SplitTransactionImpl st = new SplitTransactionImpl(parent, midkey);
2620     // If prepare does not return true, for some reason -- logged inside in
2621     // the prepare call -- we are not ready to split just now. Just return.
2622     if (!st.prepare()) {
2623       parent.clearSplit();
2624       return null;
2625     }
2626     try {
2627       result = st.execute(null, null);
2628     } catch (IOException ioe) {
2629       try {
2630         LOG.info("Running rollback of failed split of " +
2631           parent.getRegionInfo().getRegionNameAsString() + "; " + ioe.getMessage());
2632         st.rollback(null, null);
2633         LOG.info("Successful rollback of failed split of " +
2634           parent.getRegionInfo().getRegionNameAsString());
2635         return null;
2636       } catch (RuntimeException e) {
2637         // If failed rollback, kill this server to avoid having a hole in table.
2638         LOG.info("Failed rollback of failed split of " +
2639           parent.getRegionInfo().getRegionNameAsString() + " -- aborting server", e);
2640       }
2641     }
2642     finally {
2643       parent.clearSplit();
2644     }
2645     return new HRegion[] { (HRegion)result.getFirst(), (HRegion)result.getSecond() };
2646   }
2647 
2648   // ////////////////////////////////////////////////////////////////////////////
2649   // Scanner tests
2650   // ////////////////////////////////////////////////////////////////////////////
2651   @Test
2652   public void testGetScanner_WithOkFamilies() throws IOException {
2653     byte[] fam1 = Bytes.toBytes("fam1");
2654     byte[] fam2 = Bytes.toBytes("fam2");
2655 
2656     byte[][] families = { fam1, fam2 };
2657 
2658     // Setting up region
2659     String method = this.getName();
2660     this.region = initHRegion(tableName, method, CONF, families);
2661     try {
2662       Scan scan = new Scan();
2663       scan.addFamily(fam1);
2664       scan.addFamily(fam2);
2665       try {
2666         region.getScanner(scan);
2667       } catch (Exception e) {
2668         assertTrue("Families could not be found in Region", false);
2669       }
2670     } finally {
2671       HRegion.closeHRegion(this.region);
2672       this.region = null;
2673     }
2674   }
2675 
2676   @Test
2677   public void testGetScanner_WithNotOkFamilies() throws IOException {
2678     byte[] fam1 = Bytes.toBytes("fam1");
2679     byte[] fam2 = Bytes.toBytes("fam2");
2680 
2681     byte[][] families = { fam1 };
2682 
2683     // Setting up region
2684     String method = this.getName();
2685     this.region = initHRegion(tableName, method, CONF, families);
2686     try {
2687       Scan scan = new Scan();
2688       scan.addFamily(fam2);
2689       boolean ok = false;
2690       try {
2691         region.getScanner(scan);
2692       } catch (Exception e) {
2693         ok = true;
2694       }
2695       assertTrue("Families could not be found in Region", ok);
2696     } finally {
2697       HRegion.closeHRegion(this.region);
2698       this.region = null;
2699     }
2700   }
2701 
2702   @Test
2703   public void testGetScanner_WithNoFamilies() throws IOException {
2704     byte[] row1 = Bytes.toBytes("row1");
2705     byte[] fam1 = Bytes.toBytes("fam1");
2706     byte[] fam2 = Bytes.toBytes("fam2");
2707     byte[] fam3 = Bytes.toBytes("fam3");
2708     byte[] fam4 = Bytes.toBytes("fam4");
2709 
2710     byte[][] families = { fam1, fam2, fam3, fam4 };
2711 
2712     // Setting up region
2713     String method = this.getName();
2714     this.region = initHRegion(tableName, method, CONF, families);
2715     try {
2716 
2717       // Putting data in Region
2718       Put put = new Put(row1);
2719       put.add(fam1, null, null);
2720       put.add(fam2, null, null);
2721       put.add(fam3, null, null);
2722       put.add(fam4, null, null);
2723       region.put(put);
2724 
2725       Scan scan = null;
2726       HRegion.RegionScannerImpl is = null;
2727 
2728       // Testing to see how many scanners that is produced by getScanner,
2729       // starting
2730       // with known number, 2 - current = 1
2731       scan = new Scan();
2732       scan.addFamily(fam2);
2733       scan.addFamily(fam4);
2734       is = (RegionScannerImpl) region.getScanner(scan);
2735       assertEquals(1, ((RegionScannerImpl) is).storeHeap.getHeap().size());
2736 
2737       scan = new Scan();
2738       is = (RegionScannerImpl) region.getScanner(scan);
2739       assertEquals(families.length - 1, ((RegionScannerImpl) is).storeHeap.getHeap().size());
2740     } finally {
2741       HRegion.closeHRegion(this.region);
2742       this.region = null;
2743     }
2744   }
2745 
2746   /**
2747    * This method tests https://issues.apache.org/jira/browse/HBASE-2516.
2748    *
2749    * @throws IOException
2750    */
2751   @Test
2752   public void testGetScanner_WithRegionClosed() throws IOException {
2753     byte[] fam1 = Bytes.toBytes("fam1");
2754     byte[] fam2 = Bytes.toBytes("fam2");
2755 
2756     byte[][] families = { fam1, fam2 };
2757 
2758     // Setting up region
2759     String method = this.getName();
2760     try {
2761       this.region = initHRegion(tableName, method, CONF, families);
2762     } catch (IOException e) {
2763       e.printStackTrace();
2764       fail("Got IOException during initHRegion, " + e.getMessage());
2765     }
2766     try {
2767       region.closed.set(true);
2768       try {
2769         region.getScanner(null);
2770         fail("Expected to get an exception during getScanner on a region that is closed");
2771       } catch (NotServingRegionException e) {
2772         // this is the correct exception that is expected
2773       } catch (IOException e) {
2774         fail("Got wrong type of exception - should be a NotServingRegionException, but was an IOException: "
2775             + e.getMessage());
2776       }
2777     } finally {
2778       HRegion.closeHRegion(this.region);
2779       this.region = null;
2780     }
2781   }
2782 
2783   @Test
2784   public void testRegionScanner_Next() throws IOException {
2785     byte[] row1 = Bytes.toBytes("row1");
2786     byte[] row2 = Bytes.toBytes("row2");
2787     byte[] fam1 = Bytes.toBytes("fam1");
2788     byte[] fam2 = Bytes.toBytes("fam2");
2789     byte[] fam3 = Bytes.toBytes("fam3");
2790     byte[] fam4 = Bytes.toBytes("fam4");
2791 
2792     byte[][] families = { fam1, fam2, fam3, fam4 };
2793     long ts = System.currentTimeMillis();
2794 
2795     // Setting up region
2796     String method = this.getName();
2797     this.region = initHRegion(tableName, method, CONF, families);
2798     try {
2799       // Putting data in Region
2800       Put put = null;
2801       put = new Put(row1);
2802       put.add(fam1, (byte[]) null, ts, null);
2803       put.add(fam2, (byte[]) null, ts, null);
2804       put.add(fam3, (byte[]) null, ts, null);
2805       put.add(fam4, (byte[]) null, ts, null);
2806       region.put(put);
2807 
2808       put = new Put(row2);
2809       put.add(fam1, (byte[]) null, ts, null);
2810       put.add(fam2, (byte[]) null, ts, null);
2811       put.add(fam3, (byte[]) null, ts, null);
2812       put.add(fam4, (byte[]) null, ts, null);
2813       region.put(put);
2814 
2815       Scan scan = new Scan();
2816       scan.addFamily(fam2);
2817       scan.addFamily(fam4);
2818       InternalScanner is = region.getScanner(scan);
2819 
2820       List<Cell> res = null;
2821 
2822       // Result 1
2823       List<Cell> expected1 = new ArrayList<Cell>();
2824       expected1.add(new KeyValue(row1, fam2, null, ts, KeyValue.Type.Put, null));
2825       expected1.add(new KeyValue(row1, fam4, null, ts, KeyValue.Type.Put, null));
2826 
2827       res = new ArrayList<Cell>();
2828       is.next(res);
2829       for (int i = 0; i < res.size(); i++) {
2830         assertTrue(CellComparator.equalsIgnoreMvccVersion(expected1.get(i), res.get(i)));
2831       }
2832 
2833       // Result 2
2834       List<Cell> expected2 = new ArrayList<Cell>();
2835       expected2.add(new KeyValue(row2, fam2, null, ts, KeyValue.Type.Put, null));
2836       expected2.add(new KeyValue(row2, fam4, null, ts, KeyValue.Type.Put, null));
2837 
2838       res = new ArrayList<Cell>();
2839       is.next(res);
2840       for (int i = 0; i < res.size(); i++) {
2841         assertTrue(CellComparator.equalsIgnoreMvccVersion(expected2.get(i), res.get(i)));
2842       }
2843     } finally {
2844       HRegion.closeHRegion(this.region);
2845       this.region = null;
2846     }
2847   }
2848 
2849   @Test
2850   public void testScanner_ExplicitColumns_FromMemStore_EnforceVersions() throws IOException {
2851     byte[] row1 = Bytes.toBytes("row1");
2852     byte[] qf1 = Bytes.toBytes("qualifier1");
2853     byte[] qf2 = Bytes.toBytes("qualifier2");
2854     byte[] fam1 = Bytes.toBytes("fam1");
2855     byte[][] families = { fam1 };
2856 
2857     long ts1 = System.currentTimeMillis();
2858     long ts2 = ts1 + 1;
2859     long ts3 = ts1 + 2;
2860 
2861     // Setting up region
2862     String method = this.getName();
2863     this.region = initHRegion(tableName, method, CONF, families);
2864     try {
2865       // Putting data in Region
2866       Put put = null;
2867       KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
2868       KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
2869       KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
2870 
2871       KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
2872       KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
2873       KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
2874 
2875       put = new Put(row1);
2876       put.add(kv13);
2877       put.add(kv12);
2878       put.add(kv11);
2879       put.add(kv23);
2880       put.add(kv22);
2881       put.add(kv21);
2882       region.put(put);
2883 
2884       // Expected
2885       List<Cell> expected = new ArrayList<Cell>();
2886       expected.add(kv13);
2887       expected.add(kv12);
2888 
2889       Scan scan = new Scan(row1);
2890       scan.addColumn(fam1, qf1);
2891       scan.setMaxVersions(MAX_VERSIONS);
2892       List<Cell> actual = new ArrayList<Cell>();
2893       InternalScanner scanner = region.getScanner(scan);
2894 
2895       boolean hasNext = scanner.next(actual);
2896       assertEquals(false, hasNext);
2897 
2898       // Verify result
2899       for (int i = 0; i < expected.size(); i++) {
2900         assertEquals(expected.get(i), actual.get(i));
2901       }
2902     } finally {
2903       HRegion.closeHRegion(this.region);
2904       this.region = null;
2905     }
2906   }
2907 
2908   @Test
2909   public void testScanner_ExplicitColumns_FromFilesOnly_EnforceVersions() throws IOException {
2910     byte[] row1 = Bytes.toBytes("row1");
2911     byte[] qf1 = Bytes.toBytes("qualifier1");
2912     byte[] qf2 = Bytes.toBytes("qualifier2");
2913     byte[] fam1 = Bytes.toBytes("fam1");
2914     byte[][] families = { fam1 };
2915 
2916     long ts1 = 1; // System.currentTimeMillis();
2917     long ts2 = ts1 + 1;
2918     long ts3 = ts1 + 2;
2919 
2920     // Setting up region
2921     String method = this.getName();
2922     this.region = initHRegion(tableName, method, CONF, families);
2923     try {
2924       // Putting data in Region
2925       Put put = null;
2926       KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
2927       KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
2928       KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
2929 
2930       KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
2931       KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
2932       KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
2933 
2934       put = new Put(row1);
2935       put.add(kv13);
2936       put.add(kv12);
2937       put.add(kv11);
2938       put.add(kv23);
2939       put.add(kv22);
2940       put.add(kv21);
2941       region.put(put);
2942       region.flush(true);
2943 
2944       // Expected
2945       List<Cell> expected = new ArrayList<Cell>();
2946       expected.add(kv13);
2947       expected.add(kv12);
2948       expected.add(kv23);
2949       expected.add(kv22);
2950 
2951       Scan scan = new Scan(row1);
2952       scan.addColumn(fam1, qf1);
2953       scan.addColumn(fam1, qf2);
2954       scan.setMaxVersions(MAX_VERSIONS);
2955       List<Cell> actual = new ArrayList<Cell>();
2956       InternalScanner scanner = region.getScanner(scan);
2957 
2958       boolean hasNext = scanner.next(actual);
2959       assertEquals(false, hasNext);
2960 
2961       // Verify result
2962       for (int i = 0; i < expected.size(); i++) {
2963         assertTrue(CellComparator.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
2964       }
2965     } finally {
2966       HRegion.closeHRegion(this.region);
2967       this.region = null;
2968     }
2969   }
2970 
2971   @Test
2972   public void testScanner_ExplicitColumns_FromMemStoreAndFiles_EnforceVersions() throws IOException {
2973     byte[] row1 = Bytes.toBytes("row1");
2974     byte[] fam1 = Bytes.toBytes("fam1");
2975     byte[][] families = { fam1 };
2976     byte[] qf1 = Bytes.toBytes("qualifier1");
2977     byte[] qf2 = Bytes.toBytes("qualifier2");
2978 
2979     long ts1 = 1;
2980     long ts2 = ts1 + 1;
2981     long ts3 = ts1 + 2;
2982     long ts4 = ts1 + 3;
2983 
2984     // Setting up region
2985     String method = this.getName();
2986     this.region = initHRegion(tableName, method, CONF, families);
2987     try {
2988       // Putting data in Region
2989       KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
2990       KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
2991       KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
2992       KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
2993 
2994       KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null);
2995       KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
2996       KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
2997       KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
2998 
2999       Put put = null;
3000       put = new Put(row1);
3001       put.add(kv14);
3002       put.add(kv24);
3003       region.put(put);
3004       region.flush(true);
3005 
3006       put = new Put(row1);
3007       put.add(kv23);
3008       put.add(kv13);
3009       region.put(put);
3010       region.flush(true);
3011 
3012       put = new Put(row1);
3013       put.add(kv22);
3014       put.add(kv12);
3015       region.put(put);
3016       region.flush(true);
3017 
3018       put = new Put(row1);
3019       put.add(kv21);
3020       put.add(kv11);
3021       region.put(put);
3022 
3023       // Expected
3024       List<Cell> expected = new ArrayList<Cell>();
3025       expected.add(kv14);
3026       expected.add(kv13);
3027       expected.add(kv12);
3028       expected.add(kv24);
3029       expected.add(kv23);
3030       expected.add(kv22);
3031 
3032       Scan scan = new Scan(row1);
3033       scan.addColumn(fam1, qf1);
3034       scan.addColumn(fam1, qf2);
3035       int versions = 3;
3036       scan.setMaxVersions(versions);
3037       List<Cell> actual = new ArrayList<Cell>();
3038       InternalScanner scanner = region.getScanner(scan);
3039 
3040       boolean hasNext = scanner.next(actual);
3041       assertEquals(false, hasNext);
3042 
3043       // Verify result
3044       for (int i = 0; i < expected.size(); i++) {
3045         assertTrue(CellComparator.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3046       }
3047     } finally {
3048       HRegion.closeHRegion(this.region);
3049       this.region = null;
3050     }
3051   }
3052 
3053   @Test
3054   public void testScanner_Wildcard_FromMemStore_EnforceVersions() throws IOException {
3055     byte[] row1 = Bytes.toBytes("row1");
3056     byte[] qf1 = Bytes.toBytes("qualifier1");
3057     byte[] qf2 = Bytes.toBytes("qualifier2");
3058     byte[] fam1 = Bytes.toBytes("fam1");
3059     byte[][] families = { fam1 };
3060 
3061     long ts1 = System.currentTimeMillis();
3062     long ts2 = ts1 + 1;
3063     long ts3 = ts1 + 2;
3064 
3065     // Setting up region
3066     String method = this.getName();
3067     this.region = initHRegion(tableName, method, CONF, families);
3068     try {
3069       // Putting data in Region
3070       Put put = null;
3071       KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3072       KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3073       KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3074 
3075       KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3076       KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3077       KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3078 
3079       put = new Put(row1);
3080       put.add(kv13);
3081       put.add(kv12);
3082       put.add(kv11);
3083       put.add(kv23);
3084       put.add(kv22);
3085       put.add(kv21);
3086       region.put(put);
3087 
3088       // Expected
3089       List<Cell> expected = new ArrayList<Cell>();
3090       expected.add(kv13);
3091       expected.add(kv12);
3092       expected.add(kv23);
3093       expected.add(kv22);
3094 
3095       Scan scan = new Scan(row1);
3096       scan.addFamily(fam1);
3097       scan.setMaxVersions(MAX_VERSIONS);
3098       List<Cell> actual = new ArrayList<Cell>();
3099       InternalScanner scanner = region.getScanner(scan);
3100 
3101       boolean hasNext = scanner.next(actual);
3102       assertEquals(false, hasNext);
3103 
3104       // Verify result
3105       for (int i = 0; i < expected.size(); i++) {
3106         assertEquals(expected.get(i), actual.get(i));
3107       }
3108     } finally {
3109       HRegion.closeHRegion(this.region);
3110       this.region = null;
3111     }
3112   }
3113 
3114   @Test
3115   public void testScanner_Wildcard_FromFilesOnly_EnforceVersions() throws IOException {
3116     byte[] row1 = Bytes.toBytes("row1");
3117     byte[] qf1 = Bytes.toBytes("qualifier1");
3118     byte[] qf2 = Bytes.toBytes("qualifier2");
3119     byte[] fam1 = Bytes.toBytes("fam1");
3120 
3121     long ts1 = 1; // System.currentTimeMillis();
3122     long ts2 = ts1 + 1;
3123     long ts3 = ts1 + 2;
3124 
3125     // Setting up region
3126     String method = this.getName();
3127     this.region = initHRegion(tableName, method, CONF, fam1);
3128     try {
3129       // Putting data in Region
3130       Put put = null;
3131       KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3132       KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3133       KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3134 
3135       KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3136       KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3137       KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3138 
3139       put = new Put(row1);
3140       put.add(kv13);
3141       put.add(kv12);
3142       put.add(kv11);
3143       put.add(kv23);
3144       put.add(kv22);
3145       put.add(kv21);
3146       region.put(put);
3147       region.flush(true);
3148 
3149       // Expected
3150       List<Cell> expected = new ArrayList<Cell>();
3151       expected.add(kv13);
3152       expected.add(kv12);
3153       expected.add(kv23);
3154       expected.add(kv22);
3155 
3156       Scan scan = new Scan(row1);
3157       scan.addFamily(fam1);
3158       scan.setMaxVersions(MAX_VERSIONS);
3159       List<Cell> actual = new ArrayList<Cell>();
3160       InternalScanner scanner = region.getScanner(scan);
3161 
3162       boolean hasNext = scanner.next(actual);
3163       assertEquals(false, hasNext);
3164 
3165       // Verify result
3166       for (int i = 0; i < expected.size(); i++) {
3167         assertTrue(CellComparator.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3168       }
3169     } finally {
3170       HRegion.closeHRegion(this.region);
3171       this.region = null;
3172     }
3173   }
3174 
3175   @Test
3176   public void testScanner_StopRow1542() throws IOException {
3177     byte[] family = Bytes.toBytes("testFamily");
3178     this.region = initHRegion(tableName, getName(), CONF, family);
3179     try {
3180       byte[] row1 = Bytes.toBytes("row111");
3181       byte[] row2 = Bytes.toBytes("row222");
3182       byte[] row3 = Bytes.toBytes("row333");
3183       byte[] row4 = Bytes.toBytes("row444");
3184       byte[] row5 = Bytes.toBytes("row555");
3185 
3186       byte[] col1 = Bytes.toBytes("Pub111");
3187       byte[] col2 = Bytes.toBytes("Pub222");
3188 
3189       Put put = new Put(row1);
3190       put.add(family, col1, Bytes.toBytes(10L));
3191       region.put(put);
3192 
3193       put = new Put(row2);
3194       put.add(family, col1, Bytes.toBytes(15L));
3195       region.put(put);
3196 
3197       put = new Put(row3);
3198       put.add(family, col2, Bytes.toBytes(20L));
3199       region.put(put);
3200 
3201       put = new Put(row4);
3202       put.add(family, col2, Bytes.toBytes(30L));
3203       region.put(put);
3204 
3205       put = new Put(row5);
3206       put.add(family, col1, Bytes.toBytes(40L));
3207       region.put(put);
3208 
3209       Scan scan = new Scan(row3, row4);
3210       scan.setMaxVersions();
3211       scan.addColumn(family, col1);
3212       InternalScanner s = region.getScanner(scan);
3213 
3214       List<Cell> results = new ArrayList<Cell>();
3215       assertEquals(false, s.next(results));
3216       assertEquals(0, results.size());
3217     } finally {
3218       HRegion.closeHRegion(this.region);
3219       this.region = null;
3220     }
3221   }
3222 
3223   @Test
3224   public void testScanner_Wildcard_FromMemStoreAndFiles_EnforceVersions() throws IOException {
3225     byte[] row1 = Bytes.toBytes("row1");
3226     byte[] fam1 = Bytes.toBytes("fam1");
3227     byte[] qf1 = Bytes.toBytes("qualifier1");
3228     byte[] qf2 = Bytes.toBytes("quateslifier2");
3229 
3230     long ts1 = 1;
3231     long ts2 = ts1 + 1;
3232     long ts3 = ts1 + 2;
3233     long ts4 = ts1 + 3;
3234 
3235     // Setting up region
3236     String method = this.getName();
3237     this.region = initHRegion(tableName, method, CONF, fam1);
3238     try {
3239       // Putting data in Region
3240       KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
3241       KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3242       KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3243       KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3244 
3245       KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null);
3246       KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3247       KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3248       KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3249 
3250       Put put = null;
3251       put = new Put(row1);
3252       put.add(kv14);
3253       put.add(kv24);
3254       region.put(put);
3255       region.flush(true);
3256 
3257       put = new Put(row1);
3258       put.add(kv23);
3259       put.add(kv13);
3260       region.put(put);
3261       region.flush(true);
3262 
3263       put = new Put(row1);
3264       put.add(kv22);
3265       put.add(kv12);
3266       region.put(put);
3267       region.flush(true);
3268 
3269       put = new Put(row1);
3270       put.add(kv21);
3271       put.add(kv11);
3272       region.put(put);
3273 
3274       // Expected
3275       List<KeyValue> expected = new ArrayList<KeyValue>();
3276       expected.add(kv14);
3277       expected.add(kv13);
3278       expected.add(kv12);
3279       expected.add(kv24);
3280       expected.add(kv23);
3281       expected.add(kv22);
3282 
3283       Scan scan = new Scan(row1);
3284       int versions = 3;
3285       scan.setMaxVersions(versions);
3286       List<Cell> actual = new ArrayList<Cell>();
3287       InternalScanner scanner = region.getScanner(scan);
3288 
3289       boolean hasNext = scanner.next(actual);
3290       assertEquals(false, hasNext);
3291 
3292       // Verify result
3293       for (int i = 0; i < expected.size(); i++) {
3294         assertTrue(CellComparator.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3295       }
3296     } finally {
3297       HRegion.closeHRegion(this.region);
3298       this.region = null;
3299     }
3300   }
3301 
3302   /**
3303    * Added for HBASE-5416
3304    *
3305    * Here we test scan optimization when only subset of CFs are used in filter
3306    * conditions.
3307    */
3308   @Test
3309   public void testScanner_JoinedScanners() throws IOException {
3310     byte[] cf_essential = Bytes.toBytes("essential");
3311     byte[] cf_joined = Bytes.toBytes("joined");
3312     byte[] cf_alpha = Bytes.toBytes("alpha");
3313     this.region = initHRegion(tableName, getName(), CONF, cf_essential, cf_joined, cf_alpha);
3314     try {
3315       byte[] row1 = Bytes.toBytes("row1");
3316       byte[] row2 = Bytes.toBytes("row2");
3317       byte[] row3 = Bytes.toBytes("row3");
3318 
3319       byte[] col_normal = Bytes.toBytes("d");
3320       byte[] col_alpha = Bytes.toBytes("a");
3321 
3322       byte[] filtered_val = Bytes.toBytes(3);
3323 
3324       Put put = new Put(row1);
3325       put.add(cf_essential, col_normal, Bytes.toBytes(1));
3326       put.add(cf_joined, col_alpha, Bytes.toBytes(1));
3327       region.put(put);
3328 
3329       put = new Put(row2);
3330       put.add(cf_essential, col_alpha, Bytes.toBytes(2));
3331       put.add(cf_joined, col_normal, Bytes.toBytes(2));
3332       put.add(cf_alpha, col_alpha, Bytes.toBytes(2));
3333       region.put(put);
3334 
3335       put = new Put(row3);
3336       put.add(cf_essential, col_normal, filtered_val);
3337       put.add(cf_joined, col_normal, filtered_val);
3338       region.put(put);
3339 
3340       // Check two things:
3341       // 1. result list contains expected values
3342       // 2. result list is sorted properly
3343 
3344       Scan scan = new Scan();
3345       Filter filter = new SingleColumnValueExcludeFilter(cf_essential, col_normal,
3346           CompareOp.NOT_EQUAL, filtered_val);
3347       scan.setFilter(filter);
3348       scan.setLoadColumnFamiliesOnDemand(true);
3349       InternalScanner s = region.getScanner(scan);
3350 
3351       List<Cell> results = new ArrayList<Cell>();
3352       assertTrue(s.next(results));
3353       assertEquals(results.size(), 1);
3354       results.clear();
3355 
3356       assertTrue(s.next(results));
3357       assertEquals(results.size(), 3);
3358       assertTrue("orderCheck", CellUtil.matchingFamily(results.get(0), cf_alpha));
3359       assertTrue("orderCheck", CellUtil.matchingFamily(results.get(1), cf_essential));
3360       assertTrue("orderCheck", CellUtil.matchingFamily(results.get(2), cf_joined));
3361       results.clear();
3362 
3363       assertFalse(s.next(results));
3364       assertEquals(results.size(), 0);
3365     } finally {
3366       HRegion.closeHRegion(this.region);
3367       this.region = null;
3368     }
3369   }
3370 
3371   /**
3372    * HBASE-5416
3373    *
3374    * Test case when scan limits amount of KVs returned on each next() call.
3375    */
3376   @Test
3377   public void testScanner_JoinedScannersWithLimits() throws IOException {
3378     final byte[] cf_first = Bytes.toBytes("first");
3379     final byte[] cf_second = Bytes.toBytes("second");
3380 
3381     this.region = initHRegion(tableName, getName(), CONF, cf_first, cf_second);
3382     try {
3383       final byte[] col_a = Bytes.toBytes("a");
3384       final byte[] col_b = Bytes.toBytes("b");
3385 
3386       Put put;
3387 
3388       for (int i = 0; i < 10; i++) {
3389         put = new Put(Bytes.toBytes("r" + Integer.toString(i)));
3390         put.add(cf_first, col_a, Bytes.toBytes(i));
3391         if (i < 5) {
3392           put.add(cf_first, col_b, Bytes.toBytes(i));
3393           put.add(cf_second, col_a, Bytes.toBytes(i));
3394           put.add(cf_second, col_b, Bytes.toBytes(i));
3395         }
3396         region.put(put);
3397       }
3398 
3399       Scan scan = new Scan();
3400       scan.setLoadColumnFamiliesOnDemand(true);
3401       Filter bogusFilter = new FilterBase() {
3402         @Override
3403         public ReturnCode filterKeyValue(Cell ignored) throws IOException {
3404           return ReturnCode.INCLUDE;
3405         }
3406         @Override
3407         public boolean isFamilyEssential(byte[] name) {
3408           return Bytes.equals(name, cf_first);
3409         }
3410       };
3411 
3412       scan.setFilter(bogusFilter);
3413       InternalScanner s = region.getScanner(scan);
3414 
3415       // Our data looks like this:
3416       // r0: first:a, first:b, second:a, second:b
3417       // r1: first:a, first:b, second:a, second:b
3418       // r2: first:a, first:b, second:a, second:b
3419       // r3: first:a, first:b, second:a, second:b
3420       // r4: first:a, first:b, second:a, second:b
3421       // r5: first:a
3422       // r6: first:a
3423       // r7: first:a
3424       // r8: first:a
3425       // r9: first:a
3426 
3427       // But due to next's limit set to 3, we should get this:
3428       // r0: first:a, first:b, second:a
3429       // r0: second:b
3430       // r1: first:a, first:b, second:a
3431       // r1: second:b
3432       // r2: first:a, first:b, second:a
3433       // r2: second:b
3434       // r3: first:a, first:b, second:a
3435       // r3: second:b
3436       // r4: first:a, first:b, second:a
3437       // r4: second:b
3438       // r5: first:a
3439       // r6: first:a
3440       // r7: first:a
3441       // r8: first:a
3442       // r9: first:a
3443 
3444       List<Cell> results = new ArrayList<Cell>();
3445       int index = 0;
3446       ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(3).build();
3447       while (true) {
3448         boolean more = s.next(results, scannerContext);
3449         if ((index >> 1) < 5) {
3450           if (index % 2 == 0)
3451             assertEquals(results.size(), 3);
3452           else
3453             assertEquals(results.size(), 1);
3454         } else
3455           assertEquals(results.size(), 1);
3456         results.clear();
3457         index++;
3458         if (!more)
3459           break;
3460       }
3461     } finally {
3462       HRegion.closeHRegion(this.region);
3463       this.region = null;
3464     }
3465   }
3466 
3467   /**
3468    * Write an HFile block full with Cells whose qualifier that are identical between
3469    * 0 and Short.MAX_VALUE. See HBASE-13329.
3470    * @throws Exception
3471    */
3472   @Test
3473   public void testLongQualifier() throws Exception {
3474     String method = name.getMethodName();
3475     TableName tableName = TableName.valueOf(method);
3476     byte[] family = Bytes.toBytes("family");
3477     this.region = initHRegion(tableName, method, CONF, family);
3478     byte[] q = new byte[Short.MAX_VALUE+2];
3479     Arrays.fill(q, 0, q.length-1, (byte)42);
3480     for (byte i=0; i<10; i++) {
3481       Put p = new Put(Bytes.toBytes("row"));
3482       // qualifiers that differ past Short.MAX_VALUE
3483       q[q.length-1]=i;
3484       p.addColumn(family, q, q);
3485       region.put(p);
3486     }
3487     region.flush(false);
3488     HRegion.closeHRegion(this.region);
3489     this.region = null;
3490   }
3491   // ////////////////////////////////////////////////////////////////////////////
3492   // Split test
3493   // ////////////////////////////////////////////////////////////////////////////
3494   /**
3495    * Splits twice and verifies getting from each of the split regions.
3496    *
3497    * @throws Exception
3498    */
3499   @Test
3500   public void testBasicSplit() throws Exception {
3501     byte[][] families = { fam1, fam2, fam3 };
3502 
3503     Configuration hc = initSplit();
3504     // Setting up region
3505     String method = this.getName();
3506     this.region = initHRegion(tableName, method, hc, families);
3507 
3508     try {
3509       LOG.info("" + HBaseTestCase.addContent(region, fam3));
3510       region.flush(true);
3511       region.compactStores();
3512       byte[] splitRow = region.checkSplit();
3513       assertNotNull(splitRow);
3514       LOG.info("SplitRow: " + Bytes.toString(splitRow));
3515       HRegion[] regions = splitRegion(region, splitRow);
3516       try {
3517         // Need to open the regions.
3518         // TODO: Add an 'open' to HRegion... don't do open by constructing
3519         // instance.
3520         for (int i = 0; i < regions.length; i++) {
3521           regions[i] = HRegion.openHRegion(regions[i], null);
3522         }
3523         // Assert can get rows out of new regions. Should be able to get first
3524         // row from first region and the midkey from second region.
3525         assertGet(regions[0], fam3, Bytes.toBytes(START_KEY));
3526         assertGet(regions[1], fam3, splitRow);
3527         // Test I can get scanner and that it starts at right place.
3528         assertScan(regions[0], fam3, Bytes.toBytes(START_KEY));
3529         assertScan(regions[1], fam3, splitRow);
3530         // Now prove can't split regions that have references.
3531         for (int i = 0; i < regions.length; i++) {
3532           // Add so much data to this region, we create a store file that is >
3533           // than one of our unsplitable references. it will.
3534           for (int j = 0; j < 2; j++) {
3535             HBaseTestCase.addContent(regions[i], fam3);
3536           }
3537           HBaseTestCase.addContent(regions[i], fam2);
3538           HBaseTestCase.addContent(regions[i], fam1);
3539           regions[i].flush(true);
3540         }
3541 
3542         byte[][] midkeys = new byte[regions.length][];
3543         // To make regions splitable force compaction.
3544         for (int i = 0; i < regions.length; i++) {
3545           regions[i].compactStores();
3546           midkeys[i] = regions[i].checkSplit();
3547         }
3548 
3549         TreeMap<String, HRegion> sortedMap = new TreeMap<String, HRegion>();
3550         // Split these two daughter regions so then I'll have 4 regions. Will
3551         // split because added data above.
3552         for (int i = 0; i < regions.length; i++) {
3553           HRegion[] rs = null;
3554           if (midkeys[i] != null) {
3555             rs = splitRegion(regions[i], midkeys[i]);
3556             for (int j = 0; j < rs.length; j++) {
3557               sortedMap.put(Bytes.toString(rs[j].getRegionInfo().getRegionName()),
3558                 HRegion.openHRegion(rs[j], null));
3559             }
3560           }
3561         }
3562         LOG.info("Made 4 regions");
3563         // The splits should have been even. Test I can get some arbitrary row
3564         // out of each.
3565         int interval = (LAST_CHAR - FIRST_CHAR) / 3;
3566         byte[] b = Bytes.toBytes(START_KEY);
3567         for (HRegion r : sortedMap.values()) {
3568           assertGet(r, fam3, b);
3569           b[0] += interval;
3570         }
3571       } finally {
3572         for (int i = 0; i < regions.length; i++) {
3573           try {
3574             regions[i].close();
3575           } catch (IOException e) {
3576             // Ignore.
3577           }
3578         }
3579       }
3580     } finally {
3581       HRegion.closeHRegion(this.region);
3582       this.region = null;
3583     }
3584   }
3585 
3586   @Test
3587   public void testSplitRegion() throws IOException {
3588     byte[] qualifier = Bytes.toBytes("qualifier");
3589     Configuration hc = initSplit();
3590     int numRows = 10;
3591     byte[][] families = { fam1, fam3 };
3592 
3593     // Setting up region
3594     String method = this.getName();
3595     this.region = initHRegion(tableName, method, hc, families);
3596 
3597     // Put data in region
3598     int startRow = 100;
3599     putData(startRow, numRows, qualifier, families);
3600     int splitRow = startRow + numRows;
3601     putData(splitRow, numRows, qualifier, families);
3602     region.flush(true);
3603 
3604     HRegion[] regions = null;
3605     try {
3606       regions = splitRegion(region, Bytes.toBytes("" + splitRow));
3607       // Opening the regions returned.
3608       for (int i = 0; i < regions.length; i++) {
3609         regions[i] = HRegion.openHRegion(regions[i], null);
3610       }
3611       // Verifying that the region has been split
3612       assertEquals(2, regions.length);
3613 
3614       // Verifying that all data is still there and that data is in the right
3615       // place
3616       verifyData(regions[0], startRow, numRows, qualifier, families);
3617       verifyData(regions[1], splitRow, numRows, qualifier, families);
3618 
3619     } finally {
3620       HRegion.closeHRegion(this.region);
3621       this.region = null;
3622     }
3623   }
3624 
3625   @Test
3626   public void testClearForceSplit() throws IOException {
3627     byte[] qualifier = Bytes.toBytes("qualifier");
3628     Configuration hc = initSplit();
3629     int numRows = 10;
3630     byte[][] families = { fam1, fam3 };
3631 
3632     // Setting up region
3633     String method = this.getName();
3634     this.region = initHRegion(tableName, method, hc, families);
3635 
3636     // Put data in region
3637     int startRow = 100;
3638     putData(startRow, numRows, qualifier, families);
3639     int splitRow = startRow + numRows;
3640     byte[] splitRowBytes = Bytes.toBytes("" + splitRow);
3641     putData(splitRow, numRows, qualifier, families);
3642     region.flush(true);
3643 
3644     HRegion[] regions = null;
3645     try {
3646       // Set force split
3647       region.forceSplit(splitRowBytes);
3648       assertTrue(region.shouldForceSplit());
3649       // Split point should be the force split row
3650       assertTrue(Bytes.equals(splitRowBytes, region.checkSplit()));
3651 
3652       // Add a store that has references.
3653       HStore storeMock = Mockito.mock(HStore.class);
3654       when(storeMock.hasReferences()).thenReturn(true);
3655       when(storeMock.getFamily()).thenReturn(new HColumnDescriptor("cf"));
3656       when(storeMock.close()).thenReturn(ImmutableList.<StoreFile>of());
3657       when(storeMock.getColumnFamilyName()).thenReturn("cf");
3658       region.stores.put(Bytes.toBytes(storeMock.getColumnFamilyName()), storeMock);
3659       assertTrue(region.hasReferences());
3660 
3661       // Will not split since the store has references.
3662       regions = splitRegion(region, splitRowBytes);
3663       assertNull(regions);
3664 
3665       // Region force split should be cleared after the split try.
3666       assertFalse(region.shouldForceSplit());
3667 
3668       // Remove the store that has references.
3669       region.stores.remove(Bytes.toBytes(storeMock.getColumnFamilyName()));
3670       assertFalse(region.hasReferences());
3671 
3672       // Now we can split.
3673       regions = splitRegion(region, splitRowBytes);
3674 
3675       // Opening the regions returned.
3676       for (int i = 0; i < regions.length; i++) {
3677         regions[i] = HRegion.openHRegion(regions[i], null);
3678       }
3679       // Verifying that the region has been split
3680       assertEquals(2, regions.length);
3681 
3682       // Verifying that all data is still there and that data is in the right
3683       // place
3684       verifyData(regions[0], startRow, numRows, qualifier, families);
3685       verifyData(regions[1], splitRow, numRows, qualifier, families);
3686 
3687     } finally {
3688       HRegion.closeHRegion(this.region);
3689       this.region = null;
3690     }
3691   }
3692 
3693   /**
3694    * Flushes the cache in a thread while scanning. The tests verify that the
3695    * scan is coherent - e.g. the returned results are always of the same or
3696    * later update as the previous results.
3697    *
3698    * @throws IOException
3699    *           scan / compact
3700    * @throws InterruptedException
3701    *           thread join
3702    */
3703   @Test
3704   public void testFlushCacheWhileScanning() throws IOException, InterruptedException {
3705     byte[] family = Bytes.toBytes("family");
3706     int numRows = 1000;
3707     int flushAndScanInterval = 10;
3708     int compactInterval = 10 * flushAndScanInterval;
3709 
3710     String method = "testFlushCacheWhileScanning";
3711     this.region = initHRegion(tableName, method, CONF, family);
3712     try {
3713       FlushThread flushThread = new FlushThread();
3714       flushThread.start();
3715 
3716       Scan scan = new Scan();
3717       scan.addFamily(family);
3718       scan.setFilter(new SingleColumnValueFilter(family, qual1, CompareOp.EQUAL,
3719           new BinaryComparator(Bytes.toBytes(5L))));
3720 
3721       int expectedCount = 0;
3722       List<Cell> res = new ArrayList<Cell>();
3723 
3724       boolean toggle = true;
3725       for (long i = 0; i < numRows; i++) {
3726         Put put = new Put(Bytes.toBytes(i));
3727         put.setDurability(Durability.SKIP_WAL);
3728         put.add(family, qual1, Bytes.toBytes(i % 10));
3729         region.put(put);
3730 
3731         if (i != 0 && i % compactInterval == 0) {
3732           // System.out.println("iteration = " + i);
3733           region.compact(true);
3734         }
3735 
3736         if (i % 10 == 5L) {
3737           expectedCount++;
3738         }
3739 
3740         if (i != 0 && i % flushAndScanInterval == 0) {
3741           res.clear();
3742           InternalScanner scanner = region.getScanner(scan);
3743           if (toggle) {
3744             flushThread.flush();
3745           }
3746           while (scanner.next(res))
3747             ;
3748           if (!toggle) {
3749             flushThread.flush();
3750           }
3751           assertEquals("i=" + i, expectedCount, res.size());
3752           toggle = !toggle;
3753         }
3754       }
3755 
3756       flushThread.done();
3757       flushThread.join();
3758       flushThread.checkNoError();
3759     } finally {
3760       HRegion.closeHRegion(this.region);
3761       this.region = null;
3762     }
3763   }
3764 
3765   protected class FlushThread extends Thread {
3766     private volatile boolean done;
3767     private Throwable error = null;
3768 
3769     FlushThread() {
3770       super("FlushThread");
3771     }
3772 
3773     public void done() {
3774       done = true;
3775       synchronized (this) {
3776         interrupt();
3777       }
3778     }
3779 
3780     public void checkNoError() {
3781       if (error != null) {
3782         assertNull(error);
3783       }
3784     }
3785 
3786     @Override
3787     public void run() {
3788       done = false;
3789       while (!done) {
3790         synchronized (this) {
3791           try {
3792             wait();
3793           } catch (InterruptedException ignored) {
3794             if (done) {
3795               break;
3796             }
3797           }
3798         }
3799         try {
3800           region.flush(true);
3801         } catch (IOException e) {
3802           if (!done) {
3803             LOG.error("Error while flushing cache", e);
3804             error = e;
3805           }
3806           break;
3807         } catch (Throwable t) {
3808           LOG.error("Uncaught exception", t);
3809           throw t;
3810         }
3811       }
3812     }
3813 
3814     public void flush() {
3815       synchronized (this) {
3816         notify();
3817       }
3818     }
3819   }
3820 
3821   /**
3822    * Writes very wide records and scans for the latest every time.. Flushes and
3823    * compacts the region every now and then to keep things realistic.
3824    *
3825    * @throws IOException
3826    *           by flush / scan / compaction
3827    * @throws InterruptedException
3828    *           when joining threads
3829    */
3830   @Test
3831   public void testWritesWhileScanning() throws IOException, InterruptedException {
3832     int testCount = 100;
3833     int numRows = 1;
3834     int numFamilies = 10;
3835     int numQualifiers = 100;
3836     int flushInterval = 7;
3837     int compactInterval = 5 * flushInterval;
3838     byte[][] families = new byte[numFamilies][];
3839     for (int i = 0; i < numFamilies; i++) {
3840       families[i] = Bytes.toBytes("family" + i);
3841     }
3842     byte[][] qualifiers = new byte[numQualifiers][];
3843     for (int i = 0; i < numQualifiers; i++) {
3844       qualifiers[i] = Bytes.toBytes("qual" + i);
3845     }
3846 
3847     String method = "testWritesWhileScanning";
3848     this.region = initHRegion(tableName, method, CONF, families);
3849     try {
3850       PutThread putThread = new PutThread(numRows, families, qualifiers);
3851       putThread.start();
3852       putThread.waitForFirstPut();
3853 
3854       FlushThread flushThread = new FlushThread();
3855       flushThread.start();
3856 
3857       Scan scan = new Scan(Bytes.toBytes("row0"), Bytes.toBytes("row1"));
3858 
3859       int expectedCount = numFamilies * numQualifiers;
3860       List<Cell> res = new ArrayList<Cell>();
3861 
3862       long prevTimestamp = 0L;
3863       for (int i = 0; i < testCount; i++) {
3864 
3865         if (i != 0 && i % compactInterval == 0) {
3866           region.compact(true);
3867         }
3868 
3869         if (i != 0 && i % flushInterval == 0) {
3870           flushThread.flush();
3871         }
3872 
3873         boolean previousEmpty = res.isEmpty();
3874         res.clear();
3875         InternalScanner scanner = region.getScanner(scan);
3876         while (scanner.next(res))
3877           ;
3878         if (!res.isEmpty() || !previousEmpty || i > compactInterval) {
3879           assertEquals("i=" + i, expectedCount, res.size());
3880           long timestamp = res.get(0).getTimestamp();
3881           assertTrue("Timestamps were broke: " + timestamp + " prev: " + prevTimestamp,
3882               timestamp >= prevTimestamp);
3883           prevTimestamp = timestamp;
3884         }
3885       }
3886 
3887       putThread.done();
3888 
3889       region.flush(true);
3890 
3891       putThread.join();
3892       putThread.checkNoError();
3893 
3894       flushThread.done();
3895       flushThread.join();
3896       flushThread.checkNoError();
3897     } finally {
3898       try {
3899         LOG.info("Before close: " + this.region.getMVCC());
3900         HRegion.closeHRegion(this.region);
3901       } catch (DroppedSnapshotException dse) {
3902         // We could get this on way out because we interrupt the background flusher and it could
3903         // fail anywhere causing a DSE over in the background flusher... only it is not properly
3904         // dealt with so could still be memory hanging out when we get to here -- memory we can't
3905         // flush because the accounting is 'off' since original DSE.
3906       }
3907       this.region = null;
3908     }
3909   }
3910 
3911   protected class PutThread extends Thread {
3912     private volatile boolean done;
3913     private volatile int numPutsFinished = 0;
3914 
3915     private Throwable error = null;
3916     private int numRows;
3917     private byte[][] families;
3918     private byte[][] qualifiers;
3919 
3920     private PutThread(int numRows, byte[][] families, byte[][] qualifiers) {
3921       super("PutThread");
3922       this.numRows = numRows;
3923       this.families = families;
3924       this.qualifiers = qualifiers;
3925     }
3926 
3927     /**
3928      * Block until this thread has put at least one row.
3929      */
3930     public void waitForFirstPut() throws InterruptedException {
3931       // wait until put thread actually puts some data
3932       while (numPutsFinished == 0) {
3933         checkNoError();
3934         Thread.sleep(50);
3935       }
3936     }
3937 
3938     public void done() {
3939       done = true;
3940       synchronized (this) {
3941         interrupt();
3942       }
3943     }
3944 
3945     public void checkNoError() {
3946       if (error != null) {
3947         assertNull(error);
3948       }
3949     }
3950 
3951     @Override
3952     public void run() {
3953       done = false;
3954       while (!done) {
3955         try {
3956           for (int r = 0; r < numRows; r++) {
3957             byte[] row = Bytes.toBytes("row" + r);
3958             Put put = new Put(row);
3959             put.setDurability(Durability.SKIP_WAL);
3960             byte[] value = Bytes.toBytes(String.valueOf(numPutsFinished));
3961             for (byte[] family : families) {
3962               for (byte[] qualifier : qualifiers) {
3963                 put.add(family, qualifier, (long) numPutsFinished, value);
3964               }
3965             }
3966             region.put(put);
3967             numPutsFinished++;
3968             if (numPutsFinished > 0 && numPutsFinished % 47 == 0) {
3969               System.out.println("put iteration = " + numPutsFinished);
3970               Delete delete = new Delete(row, (long) numPutsFinished - 30);
3971               region.delete(delete);
3972             }
3973             numPutsFinished++;
3974           }
3975         } catch (InterruptedIOException e) {
3976           // This is fine. It means we are done, or didn't get the lock on time
3977           LOG.info("Interrupted", e);
3978         } catch (IOException e) {
3979           LOG.error("Error while putting records", e);
3980           error = e;
3981           break;
3982         }
3983       }
3984 
3985     }
3986 
3987   }
3988 
3989   /**
3990    * Writes very wide records and gets the latest row every time.. Flushes and
3991    * compacts the region aggressivly to catch issues.
3992    *
3993    * @throws IOException
3994    *           by flush / scan / compaction
3995    * @throws InterruptedException
3996    *           when joining threads
3997    */
3998   @Test
3999   public void testWritesWhileGetting() throws Exception {
4000     int testCount = 100;
4001     int numRows = 1;
4002     int numFamilies = 10;
4003     int numQualifiers = 100;
4004     int compactInterval = 100;
4005     byte[][] families = new byte[numFamilies][];
4006     for (int i = 0; i < numFamilies; i++) {
4007       families[i] = Bytes.toBytes("family" + i);
4008     }
4009     byte[][] qualifiers = new byte[numQualifiers][];
4010     for (int i = 0; i < numQualifiers; i++) {
4011       qualifiers[i] = Bytes.toBytes("qual" + i);
4012     }
4013 
4014 
4015     String method = "testWritesWhileGetting";
4016     // This test flushes constantly and can cause many files to be created,
4017     // possibly
4018     // extending over the ulimit. Make sure compactions are aggressive in
4019     // reducing
4020     // the number of HFiles created.
4021     Configuration conf = HBaseConfiguration.create(CONF);
4022     conf.setInt("hbase.hstore.compaction.min", 1);
4023     conf.setInt("hbase.hstore.compaction.max", 1000);
4024     this.region = initHRegion(tableName, method, conf, families);
4025     PutThread putThread = null;
4026     MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
4027     try {
4028       putThread = new PutThread(numRows, families, qualifiers);
4029       putThread.start();
4030       putThread.waitForFirstPut();
4031 
4032       // Add a thread that flushes as fast as possible
4033       ctx.addThread(new RepeatingTestThread(ctx) {
4034         private int flushesSinceCompact = 0;
4035         private final int maxFlushesSinceCompact = 20;
4036 
4037         @Override
4038         public void doAnAction() throws Exception {
4039           if (region.flush(true).isCompactionNeeded()) {
4040             ++flushesSinceCompact;
4041           }
4042           // Compact regularly to avoid creating too many files and exceeding
4043           // the ulimit.
4044           if (flushesSinceCompact == maxFlushesSinceCompact) {
4045             region.compact(false);
4046             flushesSinceCompact = 0;
4047           }
4048         }
4049       });
4050       ctx.startThreads();
4051 
4052       Get get = new Get(Bytes.toBytes("row0"));
4053       Result result = null;
4054 
4055       int expectedCount = numFamilies * numQualifiers;
4056 
4057       long prevTimestamp = 0L;
4058       for (int i = 0; i < testCount; i++) {
4059 
4060         boolean previousEmpty = result == null || result.isEmpty();
4061         result = region.get(get);
4062         if (!result.isEmpty() || !previousEmpty || i > compactInterval) {
4063           assertEquals("i=" + i, expectedCount, result.size());
4064           // TODO this was removed, now what dangit?!
4065           // search looking for the qualifier in question?
4066           long timestamp = 0;
4067           for (Cell kv : result.rawCells()) {
4068             if (CellUtil.matchingFamily(kv, families[0])
4069                 && CellUtil.matchingQualifier(kv, qualifiers[0])) {
4070               timestamp = kv.getTimestamp();
4071             }
4072           }
4073           assertTrue(timestamp >= prevTimestamp);
4074           prevTimestamp = timestamp;
4075           Cell previousKV = null;
4076 
4077           for (Cell kv : result.rawCells()) {
4078             byte[] thisValue = CellUtil.cloneValue(kv);
4079             if (previousKV != null) {
4080               if (Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue) != 0) {
4081                 LOG.warn("These two KV should have the same value." + " Previous KV:" + previousKV
4082                     + "(memStoreTS:" + previousKV.getMvccVersion() + ")" + ", New KV: " + kv
4083                     + "(memStoreTS:" + kv.getMvccVersion() + ")");
4084                 assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue));
4085               }
4086             }
4087             previousKV = kv;
4088           }
4089         }
4090       }
4091     } finally {
4092       if (putThread != null)
4093         putThread.done();
4094 
4095       region.flush(true);
4096 
4097       if (putThread != null) {
4098         putThread.join();
4099         putThread.checkNoError();
4100       }
4101 
4102       ctx.stop();
4103       HRegion.closeHRegion(this.region);
4104       this.region = null;
4105     }
4106   }
4107 
4108   @Test
4109   public void testHolesInMeta() throws Exception {
4110     byte[] family = Bytes.toBytes("family");
4111     this.region = initHRegion(tableName, Bytes.toBytes("x"), Bytes.toBytes("z"), method, CONF,
4112         false, family);
4113     try {
4114       byte[] rowNotServed = Bytes.toBytes("a");
4115       Get g = new Get(rowNotServed);
4116       try {
4117         region.get(g);
4118         fail();
4119       } catch (WrongRegionException x) {
4120         // OK
4121       }
4122       byte[] row = Bytes.toBytes("y");
4123       g = new Get(row);
4124       region.get(g);
4125     } finally {
4126       HRegion.closeHRegion(this.region);
4127       this.region = null;
4128     }
4129   }
4130 
4131   @Test
4132   public void testIndexesScanWithOneDeletedRow() throws IOException {
4133     byte[] family = Bytes.toBytes("family");
4134 
4135     // Setting up region
4136     String method = "testIndexesScanWithOneDeletedRow";
4137     this.region = initHRegion(tableName, method, CONF, family);
4138     try {
4139       Put put = new Put(Bytes.toBytes(1L));
4140       put.add(family, qual1, 1L, Bytes.toBytes(1L));
4141       region.put(put);
4142 
4143       region.flush(true);
4144 
4145       Delete delete = new Delete(Bytes.toBytes(1L), 1L);
4146       region.delete(delete);
4147 
4148       put = new Put(Bytes.toBytes(2L));
4149       put.add(family, qual1, 2L, Bytes.toBytes(2L));
4150       region.put(put);
4151 
4152       Scan idxScan = new Scan();
4153       idxScan.addFamily(family);
4154       idxScan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, Arrays.<Filter> asList(
4155           new SingleColumnValueFilter(family, qual1, CompareOp.GREATER_OR_EQUAL,
4156               new BinaryComparator(Bytes.toBytes(0L))), new SingleColumnValueFilter(family, qual1,
4157               CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes(3L))))));
4158       InternalScanner scanner = region.getScanner(idxScan);
4159       List<Cell> res = new ArrayList<Cell>();
4160 
4161       while (scanner.next(res))
4162         ;
4163       assertEquals(1L, res.size());
4164     } finally {
4165       HRegion.closeHRegion(this.region);
4166       this.region = null;
4167     }
4168   }
4169 
4170   // ////////////////////////////////////////////////////////////////////////////
4171   // Bloom filter test
4172   // ////////////////////////////////////////////////////////////////////////////
4173   @Test
4174   public void testBloomFilterSize() throws IOException {
4175     byte[] fam1 = Bytes.toBytes("fam1");
4176     byte[] qf1 = Bytes.toBytes("col");
4177     byte[] val1 = Bytes.toBytes("value1");
4178     // Create Table
4179     HColumnDescriptor hcd = new HColumnDescriptor(fam1).setMaxVersions(Integer.MAX_VALUE)
4180         .setBloomFilterType(BloomType.ROWCOL);
4181 
4182     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
4183     htd.addFamily(hcd);
4184     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
4185     this.region = TEST_UTIL.createLocalHRegion(info, htd);
4186     try {
4187       int num_unique_rows = 10;
4188       int duplicate_multiplier = 2;
4189       int num_storefiles = 4;
4190 
4191       int version = 0;
4192       for (int f = 0; f < num_storefiles; f++) {
4193         for (int i = 0; i < duplicate_multiplier; i++) {
4194           for (int j = 0; j < num_unique_rows; j++) {
4195             Put put = new Put(Bytes.toBytes("row" + j));
4196             put.setDurability(Durability.SKIP_WAL);
4197             put.add(fam1, qf1, version++, val1);
4198             region.put(put);
4199           }
4200         }
4201         region.flush(true);
4202       }
4203       // before compaction
4204       HStore store = (HStore) region.getStore(fam1);
4205       Collection<StoreFile> storeFiles = store.getStorefiles();
4206       for (StoreFile storefile : storeFiles) {
4207         StoreFile.Reader reader = storefile.getReader();
4208         reader.loadFileInfo();
4209         reader.loadBloomfilter();
4210         assertEquals(num_unique_rows * duplicate_multiplier, reader.getEntries());
4211         assertEquals(num_unique_rows, reader.getFilterEntries());
4212       }
4213 
4214       region.compact(true);
4215 
4216       // after compaction
4217       storeFiles = store.getStorefiles();
4218       for (StoreFile storefile : storeFiles) {
4219         StoreFile.Reader reader = storefile.getReader();
4220         reader.loadFileInfo();
4221         reader.loadBloomfilter();
4222         assertEquals(num_unique_rows * duplicate_multiplier * num_storefiles, reader.getEntries());
4223         assertEquals(num_unique_rows, reader.getFilterEntries());
4224       }
4225     } finally {
4226       HRegion.closeHRegion(this.region);
4227       this.region = null;
4228     }
4229   }
4230 
4231   @Test
4232   public void testAllColumnsWithBloomFilter() throws IOException {
4233     byte[] TABLE = Bytes.toBytes("testAllColumnsWithBloomFilter");
4234     byte[] FAMILY = Bytes.toBytes("family");
4235 
4236     // Create table
4237     HColumnDescriptor hcd = new HColumnDescriptor(FAMILY).setMaxVersions(Integer.MAX_VALUE)
4238         .setBloomFilterType(BloomType.ROWCOL);
4239     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE));
4240     htd.addFamily(hcd);
4241     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
4242     this.region = TEST_UTIL.createLocalHRegion(info, htd);
4243     try {
4244       // For row:0, col:0: insert versions 1 through 5.
4245       byte row[] = Bytes.toBytes("row:" + 0);
4246       byte column[] = Bytes.toBytes("column:" + 0);
4247       Put put = new Put(row);
4248       put.setDurability(Durability.SKIP_WAL);
4249       for (long idx = 1; idx <= 4; idx++) {
4250         put.add(FAMILY, column, idx, Bytes.toBytes("value-version-" + idx));
4251       }
4252       region.put(put);
4253 
4254       // Flush
4255       region.flush(true);
4256 
4257       // Get rows
4258       Get get = new Get(row);
4259       get.setMaxVersions();
4260       Cell[] kvs = region.get(get).rawCells();
4261 
4262       // Check if rows are correct
4263       assertEquals(4, kvs.length);
4264       checkOneCell(kvs[0], FAMILY, 0, 0, 4);
4265       checkOneCell(kvs[1], FAMILY, 0, 0, 3);
4266       checkOneCell(kvs[2], FAMILY, 0, 0, 2);
4267       checkOneCell(kvs[3], FAMILY, 0, 0, 1);
4268     } finally {
4269       HRegion.closeHRegion(this.region);
4270       this.region = null;
4271     }
4272   }
4273 
4274   /**
4275    * Testcase to cover bug-fix for HBASE-2823 Ensures correct delete when
4276    * issuing delete row on columns with bloom filter set to row+col
4277    * (BloomType.ROWCOL)
4278    */
4279   @Test
4280   public void testDeleteRowWithBloomFilter() throws IOException {
4281     byte[] familyName = Bytes.toBytes("familyName");
4282 
4283     // Create Table
4284     HColumnDescriptor hcd = new HColumnDescriptor(familyName).setMaxVersions(Integer.MAX_VALUE)
4285         .setBloomFilterType(BloomType.ROWCOL);
4286 
4287     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
4288     htd.addFamily(hcd);
4289     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
4290     this.region = TEST_UTIL.createLocalHRegion(info, htd);
4291     try {
4292       // Insert some data
4293       byte row[] = Bytes.toBytes("row1");
4294       byte col[] = Bytes.toBytes("col1");
4295 
4296       Put put = new Put(row);
4297       put.add(familyName, col, 1, Bytes.toBytes("SomeRandomValue"));
4298       region.put(put);
4299       region.flush(true);
4300 
4301       Delete del = new Delete(row);
4302       region.delete(del);
4303       region.flush(true);
4304 
4305       // Get remaining rows (should have none)
4306       Get get = new Get(row);
4307       get.addColumn(familyName, col);
4308 
4309       Cell[] keyValues = region.get(get).rawCells();
4310       assertTrue(keyValues.length == 0);
4311     } finally {
4312       HRegion.closeHRegion(this.region);
4313       this.region = null;
4314     }
4315   }
4316 
4317   @Test
4318   public void testgetHDFSBlocksDistribution() throws Exception {
4319     HBaseTestingUtility htu = new HBaseTestingUtility();
4320     // Why do we set the block size in this test?  If we set it smaller than the kvs, then we'll
4321     // break up the file in to more pieces that can be distributed across the three nodes and we
4322     // won't be able to have the condition this test asserts; that at least one node has
4323     // a copy of all replicas -- if small block size, then blocks are spread evenly across the
4324     // the three nodes.  hfilev3 with tags seems to put us over the block size.  St.Ack.
4325     // final int DEFAULT_BLOCK_SIZE = 1024;
4326     // htu.getConfiguration().setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
4327     htu.getConfiguration().setInt("dfs.replication", 2);
4328 
4329     // set up a cluster with 3 nodes
4330     MiniHBaseCluster cluster = null;
4331     String dataNodeHosts[] = new String[] { "host1", "host2", "host3" };
4332     int regionServersCount = 3;
4333 
4334     try {
4335       cluster = htu.startMiniCluster(1, regionServersCount, dataNodeHosts);
4336       byte[][] families = { fam1, fam2 };
4337       Table ht = htu.createTable(Bytes.toBytes(this.getName()), families);
4338 
4339       // Setting up region
4340       byte row[] = Bytes.toBytes("row1");
4341       byte col[] = Bytes.toBytes("col1");
4342 
4343       Put put = new Put(row);
4344       put.add(fam1, col, 1, Bytes.toBytes("test1"));
4345       put.add(fam2, col, 1, Bytes.toBytes("test2"));
4346       ht.put(put);
4347 
4348       HRegion firstRegion = htu.getHBaseCluster().getRegions(TableName.valueOf(this.getName()))
4349           .get(0);
4350       firstRegion.flush(true);
4351       HDFSBlocksDistribution blocksDistribution1 = firstRegion.getHDFSBlocksDistribution();
4352 
4353       // Given the default replication factor is 2 and we have 2 HFiles,
4354       // we will have total of 4 replica of blocks on 3 datanodes; thus there
4355       // must be at least one host that have replica for 2 HFiles. That host's
4356       // weight will be equal to the unique block weight.
4357       long uniqueBlocksWeight1 = blocksDistribution1.getUniqueBlocksTotalWeight();
4358       StringBuilder sb = new StringBuilder();
4359       for (String host: blocksDistribution1.getTopHosts()) {
4360         if (sb.length() > 0) sb.append(", ");
4361         sb.append(host);
4362         sb.append("=");
4363         sb.append(blocksDistribution1.getWeight(host));
4364       }
4365 
4366       String topHost = blocksDistribution1.getTopHosts().get(0);
4367       long topHostWeight = blocksDistribution1.getWeight(topHost);
4368       String msg = "uniqueBlocksWeight=" + uniqueBlocksWeight1 + ", topHostWeight=" +
4369         topHostWeight + ", topHost=" + topHost + "; " + sb.toString();
4370       LOG.info(msg);
4371       assertTrue(msg, uniqueBlocksWeight1 == topHostWeight);
4372 
4373       // use the static method to compute the value, it should be the same.
4374       // static method is used by load balancer or other components
4375       HDFSBlocksDistribution blocksDistribution2 = HRegion.computeHDFSBlocksDistribution(
4376           htu.getConfiguration(), firstRegion.getTableDesc(), firstRegion.getRegionInfo());
4377       long uniqueBlocksWeight2 = blocksDistribution2.getUniqueBlocksTotalWeight();
4378 
4379       assertTrue(uniqueBlocksWeight1 == uniqueBlocksWeight2);
4380 
4381       ht.close();
4382     } finally {
4383       if (cluster != null) {
4384         htu.shutdownMiniCluster();
4385       }
4386     }
4387   }
4388 
4389   /**
4390    * Testcase to check state of region initialization task set to ABORTED or not
4391    * if any exceptions during initialization
4392    *
4393    * @throws Exception
4394    */
4395   @Test
4396   public void testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization() throws Exception {
4397     TableName tableName = TableName.valueOf(name.getMethodName());
4398     HRegionInfo info = null;
4399     try {
4400       FileSystem fs = Mockito.mock(FileSystem.class);
4401       Mockito.when(fs.exists((Path) Mockito.anyObject())).thenThrow(new IOException());
4402       HTableDescriptor htd = new HTableDescriptor(tableName);
4403       htd.addFamily(new HColumnDescriptor("cf"));
4404       info = new HRegionInfo(htd.getTableName(), HConstants.EMPTY_BYTE_ARRAY,
4405           HConstants.EMPTY_BYTE_ARRAY, false);
4406       Path path = new Path(dir + "testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization");
4407       region = HRegion.newHRegion(path, null, fs, CONF, info, htd, null);
4408       // region initialization throws IOException and set task state to ABORTED.
4409       region.initialize();
4410       fail("Region initialization should fail due to IOException");
4411     } catch (IOException io) {
4412       List<MonitoredTask> tasks = TaskMonitor.get().getTasks();
4413       for (MonitoredTask monitoredTask : tasks) {
4414         if (!(monitoredTask instanceof MonitoredRPCHandler)
4415             && monitoredTask.getDescription().contains(region.toString())) {
4416           assertTrue("Region state should be ABORTED.",
4417               monitoredTask.getState().equals(MonitoredTask.State.ABORTED));
4418           break;
4419         }
4420       }
4421     } finally {
4422       HRegion.closeHRegion(region);
4423     }
4424   }
4425 
4426   /**
4427    * Verifies that the .regioninfo file is written on region creation and that
4428    * is recreated if missing during region opening.
4429    */
4430   @Test
4431   public void testRegionInfoFileCreation() throws IOException {
4432     Path rootDir = new Path(dir + "testRegionInfoFileCreation");
4433 
4434     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testtb"));
4435     htd.addFamily(new HColumnDescriptor("cf"));
4436 
4437     HRegionInfo hri = new HRegionInfo(htd.getTableName());
4438 
4439     // Create a region and skip the initialization (like CreateTableHandler)
4440     HRegion region = HRegion.createHRegion(hri, rootDir, CONF, htd, null, false, true);
4441 //    HRegion region = TEST_UTIL.createLocalHRegion(hri, htd);
4442     Path regionDir = region.getRegionFileSystem().getRegionDir();
4443     FileSystem fs = region.getRegionFileSystem().getFileSystem();
4444     HRegion.closeHRegion(region);
4445 
4446     Path regionInfoFile = new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE);
4447 
4448     // Verify that the .regioninfo file is present
4449     assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4450         fs.exists(regionInfoFile));
4451 
4452     // Try to open the region
4453     region = HRegion.openHRegion(rootDir, hri, htd, null, CONF);
4454     assertEquals(regionDir, region.getRegionFileSystem().getRegionDir());
4455     HRegion.closeHRegion(region);
4456 
4457     // Verify that the .regioninfo file is still there
4458     assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4459         fs.exists(regionInfoFile));
4460 
4461     // Remove the .regioninfo file and verify is recreated on region open
4462     fs.delete(regionInfoFile, true);
4463     assertFalse(HRegionFileSystem.REGION_INFO_FILE + " should be removed from the region dir",
4464         fs.exists(regionInfoFile));
4465 
4466     region = HRegion.openHRegion(rootDir, hri, htd, null, CONF);
4467 //    region = TEST_UTIL.openHRegion(hri, htd);
4468     assertEquals(regionDir, region.getRegionFileSystem().getRegionDir());
4469     HRegion.closeHRegion(region);
4470 
4471     // Verify that the .regioninfo file is still there
4472     assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4473         fs.exists(new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE)));
4474   }
4475 
4476   /**
4477    * TestCase for increment
4478    */
4479   private static class Incrementer implements Runnable {
4480     private HRegion region;
4481     private final static byte[] incRow = Bytes.toBytes("incRow");
4482     private final static byte[] family = Bytes.toBytes("family");
4483     private final static byte[] qualifier = Bytes.toBytes("qualifier");
4484     private final static long ONE = 1l;
4485     private int incCounter;
4486 
4487     public Incrementer(HRegion region, int incCounter) {
4488       this.region = region;
4489       this.incCounter = incCounter;
4490     }
4491 
4492     @Override
4493     public void run() {
4494       int count = 0;
4495       while (count < incCounter) {
4496         Increment inc = new Increment(incRow);
4497         inc.addColumn(family, qualifier, ONE);
4498         count++;
4499         try {
4500           region.increment(inc);
4501         } catch (IOException e) {
4502           e.printStackTrace();
4503           break;
4504         }
4505       }
4506     }
4507   }
4508 
4509   /**
4510    * Test case to check increment function with memstore flushing
4511    * @throws Exception
4512    */
4513   @Test
4514   public void testParallelIncrementWithMemStoreFlush() throws Exception {
4515     byte[] family = Incrementer.family;
4516     this.region = initHRegion(tableName, method, CONF, family);
4517     final HRegion region = this.region;
4518     final AtomicBoolean incrementDone = new AtomicBoolean(false);
4519     Runnable flusher = new Runnable() {
4520       @Override
4521       public void run() {
4522         while (!incrementDone.get()) {
4523           try {
4524             region.flush(true);
4525           } catch (Exception e) {
4526             e.printStackTrace();
4527           }
4528         }
4529       }
4530     };
4531 
4532     // after all increment finished, the row will increment to 20*100 = 2000
4533     int threadNum = 20;
4534     int incCounter = 100;
4535     long expected = threadNum * incCounter;
4536     Thread[] incrementers = new Thread[threadNum];
4537     Thread flushThread = new Thread(flusher);
4538     for (int i = 0; i < threadNum; i++) {
4539       incrementers[i] = new Thread(new Incrementer(this.region, incCounter));
4540       incrementers[i].start();
4541     }
4542     flushThread.start();
4543     for (int i = 0; i < threadNum; i++) {
4544       incrementers[i].join();
4545     }
4546 
4547     incrementDone.set(true);
4548     flushThread.join();
4549 
4550     Get get = new Get(Incrementer.incRow);
4551     get.addColumn(Incrementer.family, Incrementer.qualifier);
4552     get.setMaxVersions(1);
4553     Result res = this.region.get(get);
4554     List<Cell> kvs = res.getColumnCells(Incrementer.family, Incrementer.qualifier);
4555 
4556     // we just got the latest version
4557     assertEquals(kvs.size(), 1);
4558     Cell kv = kvs.get(0);
4559     assertEquals(expected, Bytes.toLong(kv.getValueArray(), kv.getValueOffset()));
4560     this.region = null;
4561   }
4562 
4563   /**
4564    * TestCase for append
4565    */
4566   private static class Appender implements Runnable {
4567     private HRegion region;
4568     private final static byte[] appendRow = Bytes.toBytes("appendRow");
4569     private final static byte[] family = Bytes.toBytes("family");
4570     private final static byte[] qualifier = Bytes.toBytes("qualifier");
4571     private final static byte[] CHAR = Bytes.toBytes("a");
4572     private int appendCounter;
4573 
4574     public Appender(HRegion region, int appendCounter) {
4575       this.region = region;
4576       this.appendCounter = appendCounter;
4577     }
4578 
4579     @Override
4580     public void run() {
4581       int count = 0;
4582       while (count < appendCounter) {
4583         Append app = new Append(appendRow);
4584         app.add(family, qualifier, CHAR);
4585         count++;
4586         try {
4587           region.append(app);
4588         } catch (IOException e) {
4589           e.printStackTrace();
4590           break;
4591         }
4592       }
4593     }
4594   }
4595 
4596   /**
4597    * Test case to check append function with memstore flushing
4598    * @throws Exception
4599    */
4600   @Test
4601   public void testParallelAppendWithMemStoreFlush() throws Exception {
4602     byte[] family = Appender.family;
4603     this.region = initHRegion(tableName, method, CONF, family);
4604     final HRegion region = this.region;
4605     final AtomicBoolean appendDone = new AtomicBoolean(false);
4606     Runnable flusher = new Runnable() {
4607       @Override
4608       public void run() {
4609         while (!appendDone.get()) {
4610           try {
4611             region.flush(true);
4612           } catch (Exception e) {
4613             e.printStackTrace();
4614           }
4615         }
4616       }
4617     };
4618 
4619     // after all append finished, the value will append to threadNum *
4620     // appendCounter Appender.CHAR
4621     int threadNum = 20;
4622     int appendCounter = 100;
4623     byte[] expected = new byte[threadNum * appendCounter];
4624     for (int i = 0; i < threadNum * appendCounter; i++) {
4625       System.arraycopy(Appender.CHAR, 0, expected, i, 1);
4626     }
4627     Thread[] appenders = new Thread[threadNum];
4628     Thread flushThread = new Thread(flusher);
4629     for (int i = 0; i < threadNum; i++) {
4630       appenders[i] = new Thread(new Appender(this.region, appendCounter));
4631       appenders[i].start();
4632     }
4633     flushThread.start();
4634     for (int i = 0; i < threadNum; i++) {
4635       appenders[i].join();
4636     }
4637 
4638     appendDone.set(true);
4639     flushThread.join();
4640 
4641     Get get = new Get(Appender.appendRow);
4642     get.addColumn(Appender.family, Appender.qualifier);
4643     get.setMaxVersions(1);
4644     Result res = this.region.get(get);
4645     List<Cell> kvs = res.getColumnCells(Appender.family, Appender.qualifier);
4646 
4647     // we just got the latest version
4648     assertEquals(kvs.size(), 1);
4649     Cell kv = kvs.get(0);
4650     byte[] appendResult = new byte[kv.getValueLength()];
4651     System.arraycopy(kv.getValueArray(), kv.getValueOffset(), appendResult, 0, kv.getValueLength());
4652     assertArrayEquals(expected, appendResult);
4653     this.region = null;
4654   }
4655 
4656   /**
4657    * Test case to check put function with memstore flushing for same row, same ts
4658    * @throws Exception
4659    */
4660   @Test
4661   public void testPutWithMemStoreFlush() throws Exception {
4662     byte[] family = Bytes.toBytes("family");
4663     ;
4664     byte[] qualifier = Bytes.toBytes("qualifier");
4665     byte[] row = Bytes.toBytes("putRow");
4666     byte[] value = null;
4667     this.region = initHRegion(tableName, method, CONF, family);
4668     Put put = null;
4669     Get get = null;
4670     List<Cell> kvs = null;
4671     Result res = null;
4672 
4673     put = new Put(row);
4674     value = Bytes.toBytes("value0");
4675     put.add(family, qualifier, 1234567l, value);
4676     region.put(put);
4677     get = new Get(row);
4678     get.addColumn(family, qualifier);
4679     get.setMaxVersions();
4680     res = this.region.get(get);
4681     kvs = res.getColumnCells(family, qualifier);
4682     assertEquals(1, kvs.size());
4683     assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0)));
4684 
4685     region.flush(true);
4686     get = new Get(row);
4687     get.addColumn(family, qualifier);
4688     get.setMaxVersions();
4689     res = this.region.get(get);
4690     kvs = res.getColumnCells(family, qualifier);
4691     assertEquals(1, kvs.size());
4692     assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0)));
4693 
4694     put = new Put(row);
4695     value = Bytes.toBytes("value1");
4696     put.add(family, qualifier, 1234567l, value);
4697     region.put(put);
4698     get = new Get(row);
4699     get.addColumn(family, qualifier);
4700     get.setMaxVersions();
4701     res = this.region.get(get);
4702     kvs = res.getColumnCells(family, qualifier);
4703     assertEquals(1, kvs.size());
4704     assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0)));
4705 
4706     region.flush(true);
4707     get = new Get(row);
4708     get.addColumn(family, qualifier);
4709     get.setMaxVersions();
4710     res = this.region.get(get);
4711     kvs = res.getColumnCells(family, qualifier);
4712     assertEquals(1, kvs.size());
4713     assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0)));
4714   }
4715 
4716   @Test
4717   public void testDurability() throws Exception {
4718     String method = "testDurability";
4719     // there are 5 x 5 cases:
4720     // table durability(SYNC,FSYNC,ASYC,SKIP,USE_DEFAULT) x mutation
4721     // durability(SYNC,FSYNC,ASYC,SKIP,USE_DEFAULT)
4722 
4723     // expected cases for append and sync wal
4724     durabilityTest(method, Durability.SYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4725     durabilityTest(method, Durability.SYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4726     durabilityTest(method, Durability.SYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false);
4727 
4728     durabilityTest(method, Durability.FSYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4729     durabilityTest(method, Durability.FSYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4730     durabilityTest(method, Durability.FSYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false);
4731 
4732     durabilityTest(method, Durability.ASYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4733     durabilityTest(method, Durability.ASYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4734 
4735     durabilityTest(method, Durability.SKIP_WAL, Durability.SYNC_WAL, 0, true, true, false);
4736     durabilityTest(method, Durability.SKIP_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4737 
4738     durabilityTest(method, Durability.USE_DEFAULT, Durability.SYNC_WAL, 0, true, true, false);
4739     durabilityTest(method, Durability.USE_DEFAULT, Durability.FSYNC_WAL, 0, true, true, false);
4740     durabilityTest(method, Durability.USE_DEFAULT, Durability.USE_DEFAULT, 0, true, true, false);
4741 
4742     // expected cases for async wal
4743     durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4744     durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4745     durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4746     durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4747     durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 0, true, false, false);
4748     durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 0, true, false, false);
4749 
4750     durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4751     durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4752     durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4753     durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4754     durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 5000, true, false, true);
4755     durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 5000, true, false, true);
4756 
4757     // expect skip wal cases
4758     durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, true, false, false);
4759     durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, true, false, false);
4760     durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, true, false, false);
4761     durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, true, false, false);
4762     durabilityTest(method, Durability.USE_DEFAULT, Durability.SKIP_WAL, 0, true, false, false);
4763     durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, true, false, false);
4764 
4765   }
4766 
4767   private void durabilityTest(String method, Durability tableDurability,
4768       Durability mutationDurability, long timeout, boolean expectAppend, final boolean expectSync,
4769       final boolean expectSyncFromLogSyncer) throws Exception {
4770     Configuration conf = HBaseConfiguration.create(CONF);
4771     method = method + "_" + tableDurability.name() + "_" + mutationDurability.name();
4772     TableName tableName = TableName.valueOf(method);
4773     byte[] family = Bytes.toBytes("family");
4774     Path logDir = new Path(new Path(dir + method), "log");
4775     final Configuration walConf = new Configuration(conf);
4776     FSUtils.setRootDir(walConf, logDir);
4777     final WALFactory wals = new WALFactory(walConf, null, UUID.randomUUID().toString());
4778     final WAL wal = spy(wals.getWAL(tableName.getName()));
4779     this.region = initHRegion(tableName.getName(), HConstants.EMPTY_START_ROW,
4780         HConstants.EMPTY_END_ROW, method, conf, false, tableDurability, wal,
4781         new byte[][] { family });
4782 
4783     Put put = new Put(Bytes.toBytes("r1"));
4784     put.add(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
4785     put.setDurability(mutationDurability);
4786     region.put(put);
4787 
4788     //verify append called or not
4789     verify(wal, expectAppend ? times(1) : never())
4790       .append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any(),
4791           (WALEdit)any(), Mockito.anyBoolean());
4792 
4793     // verify sync called or not
4794     if (expectSync || expectSyncFromLogSyncer) {
4795       TEST_UTIL.waitFor(timeout, new Waiter.Predicate<Exception>() {
4796         @Override
4797         public boolean evaluate() throws Exception {
4798           try {
4799             if (expectSync) {
4800               verify(wal, times(1)).sync(anyLong()); // Hregion calls this one
4801             } else if (expectSyncFromLogSyncer) {
4802               verify(wal, times(1)).sync(); // wal syncer calls this one
4803             }
4804           } catch (Throwable ignore) {
4805           }
4806           return true;
4807         }
4808       });
4809     } else {
4810       //verify(wal, never()).sync(anyLong());
4811       verify(wal, never()).sync();
4812     }
4813 
4814     HRegion.closeHRegion(this.region);
4815     this.region = null;
4816   }
4817 
4818   @Test
4819   public void testRegionReplicaSecondary() throws IOException {
4820     // create a primary region, load some data and flush
4821     // create a secondary region, and do a get against that
4822     Path rootDir = new Path(dir + "testRegionReplicaSecondary");
4823     FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4824 
4825     byte[][] families = new byte[][] {
4826         Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4827     };
4828     byte[] cq = Bytes.toBytes("cq");
4829     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testRegionReplicaSecondary"));
4830     for (byte[] family : families) {
4831       htd.addFamily(new HColumnDescriptor(family));
4832     }
4833 
4834     long time = System.currentTimeMillis();
4835     HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4836       HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4837       false, time, 0);
4838     HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4839       HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4840       false, time, 1);
4841 
4842     HRegion primaryRegion = null, secondaryRegion = null;
4843 
4844     try {
4845       primaryRegion = HRegion.createHRegion(primaryHri,
4846         rootDir, TEST_UTIL.getConfiguration(), htd);
4847 
4848       // load some data
4849       putData(primaryRegion, 0, 1000, cq, families);
4850 
4851       // flush region
4852       primaryRegion.flush(true);
4853 
4854       // open secondary region
4855       secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4856 
4857       verifyData(secondaryRegion, 0, 1000, cq, families);
4858     } finally {
4859       if (primaryRegion != null) {
4860         HRegion.closeHRegion(primaryRegion);
4861       }
4862       if (secondaryRegion != null) {
4863         HRegion.closeHRegion(secondaryRegion);
4864       }
4865     }
4866   }
4867 
4868   @Test
4869   public void testRegionReplicaSecondaryIsReadOnly() throws IOException {
4870     // create a primary region, load some data and flush
4871     // create a secondary region, and do a put against that
4872     Path rootDir = new Path(dir + "testRegionReplicaSecondary");
4873     FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4874 
4875     byte[][] families = new byte[][] {
4876         Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4877     };
4878     byte[] cq = Bytes.toBytes("cq");
4879     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testRegionReplicaSecondary"));
4880     for (byte[] family : families) {
4881       htd.addFamily(new HColumnDescriptor(family));
4882     }
4883 
4884     long time = System.currentTimeMillis();
4885     HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4886       HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4887       false, time, 0);
4888     HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4889       HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4890       false, time, 1);
4891 
4892     HRegion primaryRegion = null, secondaryRegion = null;
4893 
4894     try {
4895       primaryRegion = HRegion.createHRegion(primaryHri,
4896         rootDir, TEST_UTIL.getConfiguration(), htd);
4897 
4898       // load some data
4899       putData(primaryRegion, 0, 1000, cq, families);
4900 
4901       // flush region
4902       primaryRegion.flush(true);
4903 
4904       // open secondary region
4905       secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4906 
4907       try {
4908         putData(secondaryRegion, 0, 1000, cq, families);
4909         fail("Should have thrown exception");
4910       } catch (IOException ex) {
4911         // expected
4912       }
4913     } finally {
4914       if (primaryRegion != null) {
4915         HRegion.closeHRegion(primaryRegion);
4916       }
4917       if (secondaryRegion != null) {
4918         HRegion.closeHRegion(secondaryRegion);
4919       }
4920     }
4921   }
4922 
4923   static WALFactory createWALFactory(Configuration conf, Path rootDir) throws IOException {
4924     Configuration confForWAL = new Configuration(conf);
4925     confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
4926     return new WALFactory(confForWAL,
4927         Collections.<WALActionsListener>singletonList(new MetricsWAL()),
4928         "hregion-" + RandomStringUtils.randomNumeric(8));
4929   }
4930 
4931   @Test
4932   public void testCompactionFromPrimary() throws IOException {
4933     Path rootDir = new Path(dir + "testRegionReplicaSecondary");
4934     FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4935 
4936     byte[][] families = new byte[][] {
4937         Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4938     };
4939     byte[] cq = Bytes.toBytes("cq");
4940     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testRegionReplicaSecondary"));
4941     for (byte[] family : families) {
4942       htd.addFamily(new HColumnDescriptor(family));
4943     }
4944 
4945     long time = System.currentTimeMillis();
4946     HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4947       HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4948       false, time, 0);
4949     HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4950       HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4951       false, time, 1);
4952 
4953     HRegion primaryRegion = null, secondaryRegion = null;
4954 
4955     try {
4956       primaryRegion = HRegion.createHRegion(primaryHri,
4957         rootDir, TEST_UTIL.getConfiguration(), htd);
4958 
4959       // load some data
4960       putData(primaryRegion, 0, 1000, cq, families);
4961 
4962       // flush region
4963       primaryRegion.flush(true);
4964 
4965       // open secondary region
4966       secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4967 
4968       // move the file of the primary region to the archive, simulating a compaction
4969       Collection<StoreFile> storeFiles = primaryRegion.getStore(families[0]).getStorefiles();
4970       primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles);
4971       Collection<StoreFileInfo> storeFileInfos = primaryRegion.getRegionFileSystem().getStoreFiles(families[0]);
4972       Assert.assertTrue(storeFileInfos == null || storeFileInfos.size() == 0);
4973 
4974       verifyData(secondaryRegion, 0, 1000, cq, families);
4975     } finally {
4976       if (primaryRegion != null) {
4977         HRegion.closeHRegion(primaryRegion);
4978       }
4979       if (secondaryRegion != null) {
4980         HRegion.closeHRegion(secondaryRegion);
4981       }
4982     }
4983   }
4984 
4985   private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
4986     putData(this.region, startRow, numRows, qf, families);
4987   }
4988 
4989   private void putData(HRegion region,
4990       int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
4991     putData(region, Durability.SKIP_WAL, startRow, numRows, qf, families);
4992   }
4993 
4994   static void putData(HRegion region, Durability durability,
4995       int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
4996     for (int i = startRow; i < startRow + numRows; i++) {
4997       Put put = new Put(Bytes.toBytes("" + i));
4998       put.setDurability(durability);
4999       for (byte[] family : families) {
5000         put.add(family, qf, null);
5001       }
5002       region.put(put);
5003     }
5004   }
5005 
5006   static void verifyData(HRegion newReg, int startRow, int numRows, byte[] qf, byte[]... families)
5007       throws IOException {
5008     for (int i = startRow; i < startRow + numRows; i++) {
5009       byte[] row = Bytes.toBytes("" + i);
5010       Get get = new Get(row);
5011       for (byte[] family : families) {
5012         get.addColumn(family, qf);
5013       }
5014       Result result = newReg.get(get);
5015       Cell[] raw = result.rawCells();
5016       assertEquals(families.length, result.size());
5017       for (int j = 0; j < families.length; j++) {
5018         assertTrue(CellUtil.matchingRow(raw[j], row));
5019         assertTrue(CellUtil.matchingFamily(raw[j], families[j]));
5020         assertTrue(CellUtil.matchingQualifier(raw[j], qf));
5021       }
5022     }
5023   }
5024 
5025   static void assertGet(final HRegion r, final byte[] family, final byte[] k) throws IOException {
5026     // Now I have k, get values out and assert they are as expected.
5027     Get get = new Get(k).addFamily(family).setMaxVersions();
5028     Cell[] results = r.get(get).rawCells();
5029     for (int j = 0; j < results.length; j++) {
5030       byte[] tmp = CellUtil.cloneValue(results[j]);
5031       // Row should be equal to value every time.
5032       assertTrue(Bytes.equals(k, tmp));
5033     }
5034   }
5035 
5036   /*
5037    * Assert first value in the passed region is <code>firstValue</code>.
5038    *
5039    * @param r
5040    *
5041    * @param fs
5042    *
5043    * @param firstValue
5044    *
5045    * @throws IOException
5046    */
5047   private void assertScan(final HRegion r, final byte[] fs, final byte[] firstValue)
5048       throws IOException {
5049     byte[][] families = { fs };
5050     Scan scan = new Scan();
5051     for (int i = 0; i < families.length; i++)
5052       scan.addFamily(families[i]);
5053     InternalScanner s = r.getScanner(scan);
5054     try {
5055       List<Cell> curVals = new ArrayList<Cell>();
5056       boolean first = true;
5057       OUTER_LOOP: while (s.next(curVals)) {
5058         for (Cell kv : curVals) {
5059           byte[] val = CellUtil.cloneValue(kv);
5060           byte[] curval = val;
5061           if (first) {
5062             first = false;
5063             assertTrue(Bytes.compareTo(curval, firstValue) == 0);
5064           } else {
5065             // Not asserting anything. Might as well break.
5066             break OUTER_LOOP;
5067           }
5068         }
5069       }
5070     } finally {
5071       s.close();
5072     }
5073   }
5074 
5075   /**
5076    * Test that we get the expected flush results back
5077    * @throws IOException
5078    */
5079   @Test
5080   public void testFlushResult() throws IOException {
5081     String method = name.getMethodName();
5082     byte[] tableName = Bytes.toBytes(method);
5083     byte[] family = Bytes.toBytes("family");
5084 
5085     this.region = initHRegion(tableName, method, family);
5086 
5087     // empty memstore, flush doesn't run
5088     HRegion.FlushResult fr = region.flush(true);
5089     assertFalse(fr.isFlushSucceeded());
5090     assertFalse(fr.isCompactionNeeded());
5091 
5092     // Flush enough files to get up to the threshold, doesn't need compactions
5093     for (int i = 0; i < 2; i++) {
5094       Put put = new Put(tableName).add(family, family, tableName);
5095       region.put(put);
5096       fr = region.flush(true);
5097       assertTrue(fr.isFlushSucceeded());
5098       assertFalse(fr.isCompactionNeeded());
5099     }
5100 
5101     // Two flushes after the threshold, compactions are needed
5102     for (int i = 0; i < 2; i++) {
5103       Put put = new Put(tableName).add(family, family, tableName);
5104       region.put(put);
5105       fr = region.flush(true);
5106       assertTrue(fr.isFlushSucceeded());
5107       assertTrue(fr.isCompactionNeeded());
5108     }
5109   }
5110 
5111   private Configuration initSplit() {
5112     // Always compact if there is more than one store file.
5113     CONF.setInt("hbase.hstore.compactionThreshold", 2);
5114 
5115     // Make lease timeout longer, lease checks less frequent
5116     CONF.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);
5117 
5118     CONF.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
5119 
5120     // Increase the amount of time between client retries
5121     CONF.setLong("hbase.client.pause", 15 * 1000);
5122 
5123     // This size should make it so we always split using the addContent
5124     // below. After adding all data, the first region is 1.3M
5125     CONF.setLong(HConstants.HREGION_MAX_FILESIZE, 1024 * 128);
5126     return CONF;
5127   }
5128 
5129   /**
5130    * @param tableName
5131    * @param callingMethod
5132    * @param conf
5133    * @param families
5134    * @throws IOException
5135    * @return A region on which you must call
5136    *         {@link HRegion#closeHRegion(HRegion)} when done.
5137    */
5138   public static HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
5139       byte[]... families) throws IOException {
5140     return initHRegion(tableName.getName(), null, null, callingMethod, conf, false, families);
5141   }
5142 
5143   /**
5144    * @param tableName
5145    * @param callingMethod
5146    * @param conf
5147    * @param families
5148    * @throws IOException
5149    * @return A region on which you must call
5150    *         {@link HRegion#closeHRegion(HRegion)} when done.
5151    */
5152   public static HRegion initHRegion(byte[] tableName, String callingMethod, Configuration conf,
5153       byte[]... families) throws IOException {
5154     return initHRegion(tableName, null, null, callingMethod, conf, false, families);
5155   }
5156 
5157   /**
5158    * @param tableName
5159    * @param callingMethod
5160    * @param conf
5161    * @param isReadOnly
5162    * @param families
5163    * @throws IOException
5164    * @return A region on which you must call
5165    *         {@link HRegion#closeHRegion(HRegion)} when done.
5166    */
5167   public static HRegion initHRegion(byte[] tableName, String callingMethod, Configuration conf,
5168       boolean isReadOnly, byte[]... families) throws IOException {
5169     return initHRegion(tableName, null, null, callingMethod, conf, isReadOnly, families);
5170   }
5171 
5172   public static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
5173       String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families)
5174       throws IOException {
5175     return initHRegion(tableName, startKey, stopKey, callingMethod, conf, isReadOnly,
5176         Durability.SYNC_WAL, null, families);
5177   }
5178 
5179   /**
5180    * @param tableName
5181    * @param startKey
5182    * @param stopKey
5183    * @param callingMethod
5184    * @param conf
5185    * @param isReadOnly
5186    * @param families
5187    * @throws IOException
5188    * @return A region on which you must call
5189    *         {@link HRegion#closeHRegion(HRegion)} when done.
5190    */
5191   public static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
5192       String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
5193       WAL wal, byte[]... families) throws IOException {
5194     return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, callingMethod, conf,
5195         isReadOnly, durability, wal, families);
5196   }
5197 
5198   /**
5199    * Assert that the passed in Cell has expected contents for the specified row,
5200    * column & timestamp.
5201    */
5202   private void checkOneCell(Cell kv, byte[] cf, int rowIdx, int colIdx, long ts) {
5203     String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
5204     assertEquals("Row mismatch which checking: " + ctx, "row:" + rowIdx,
5205         Bytes.toString(CellUtil.cloneRow(kv)));
5206     assertEquals("ColumnFamily mismatch while checking: " + ctx, Bytes.toString(cf),
5207         Bytes.toString(CellUtil.cloneFamily(kv)));
5208     assertEquals("Column qualifier mismatch while checking: " + ctx, "column:" + colIdx,
5209         Bytes.toString(CellUtil.cloneQualifier(kv)));
5210     assertEquals("Timestamp mismatch while checking: " + ctx, ts, kv.getTimestamp());
5211     assertEquals("Value mismatch while checking: " + ctx, "value-version-" + ts,
5212         Bytes.toString(CellUtil.cloneValue(kv)));
5213   }
5214 
5215   @Test (timeout=60000)
5216   public void testReverseScanner_FromMemStore_SingleCF_Normal()
5217       throws IOException {
5218     byte[] rowC = Bytes.toBytes("rowC");
5219     byte[] rowA = Bytes.toBytes("rowA");
5220     byte[] rowB = Bytes.toBytes("rowB");
5221     byte[] cf = Bytes.toBytes("CF");
5222     byte[][] families = { cf };
5223     byte[] col = Bytes.toBytes("C");
5224     long ts = 1;
5225     String method = this.getName();
5226     this.region = initHRegion(tableName, method, families);
5227     try {
5228       KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
5229       KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
5230           null);
5231       KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
5232       KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
5233       Put put = null;
5234       put = new Put(rowC);
5235       put.add(kv1);
5236       put.add(kv11);
5237       region.put(put);
5238       put = new Put(rowA);
5239       put.add(kv2);
5240       region.put(put);
5241       put = new Put(rowB);
5242       put.add(kv3);
5243       region.put(put);
5244 
5245       Scan scan = new Scan(rowC);
5246       scan.setMaxVersions(5);
5247       scan.setReversed(true);
5248       InternalScanner scanner = region.getScanner(scan);
5249       List<Cell> currRow = new ArrayList<Cell>();
5250       boolean hasNext = scanner.next(currRow);
5251       assertEquals(2, currRow.size());
5252       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC));
5253       assertTrue(hasNext);
5254       currRow.clear();
5255       hasNext = scanner.next(currRow);
5256       assertEquals(1, currRow.size());
5257       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB));
5258       assertTrue(hasNext);
5259       currRow.clear();
5260       hasNext = scanner.next(currRow);
5261       assertEquals(1, currRow.size());
5262       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowA));
5263       assertFalse(hasNext);
5264       scanner.close();
5265     } finally {
5266       HRegion.closeHRegion(this.region);
5267       this.region = null;
5268     }
5269   }
5270 
5271   @Test (timeout=60000)
5272   public void testReverseScanner_FromMemStore_SingleCF_LargerKey()
5273       throws IOException {
5274     byte[] rowC = Bytes.toBytes("rowC");
5275     byte[] rowA = Bytes.toBytes("rowA");
5276     byte[] rowB = Bytes.toBytes("rowB");
5277     byte[] rowD = Bytes.toBytes("rowD");
5278     byte[] cf = Bytes.toBytes("CF");
5279     byte[][] families = { cf };
5280     byte[] col = Bytes.toBytes("C");
5281     long ts = 1;
5282     String method = this.getName();
5283     this.region = initHRegion(tableName, method, families);
5284     try {
5285       KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
5286       KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
5287           null);
5288       KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
5289       KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
5290       Put put = null;
5291       put = new Put(rowC);
5292       put.add(kv1);
5293       put.add(kv11);
5294       region.put(put);
5295       put = new Put(rowA);
5296       put.add(kv2);
5297       region.put(put);
5298       put = new Put(rowB);
5299       put.add(kv3);
5300       region.put(put);
5301 
5302       Scan scan = new Scan(rowD);
5303       List<Cell> currRow = new ArrayList<Cell>();
5304       scan.setReversed(true);
5305       scan.setMaxVersions(5);
5306       InternalScanner scanner = region.getScanner(scan);
5307       boolean hasNext = scanner.next(currRow);
5308       assertEquals(2, currRow.size());
5309       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC));
5310       assertTrue(hasNext);
5311       currRow.clear();
5312       hasNext = scanner.next(currRow);
5313       assertEquals(1, currRow.size());
5314       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB));
5315       assertTrue(hasNext);
5316       currRow.clear();
5317       hasNext = scanner.next(currRow);
5318       assertEquals(1, currRow.size());
5319       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowA));
5320       assertFalse(hasNext);
5321       scanner.close();
5322     } finally {
5323       HRegion.closeHRegion(this.region);
5324       this.region = null;
5325     }
5326   }
5327 
5328   @Test (timeout=60000)
5329   public void testReverseScanner_FromMemStore_SingleCF_FullScan()
5330       throws IOException {
5331     byte[] rowC = Bytes.toBytes("rowC");
5332     byte[] rowA = Bytes.toBytes("rowA");
5333     byte[] rowB = Bytes.toBytes("rowB");
5334     byte[] cf = Bytes.toBytes("CF");
5335     byte[][] families = { cf };
5336     byte[] col = Bytes.toBytes("C");
5337     long ts = 1;
5338     String method = this.getName();
5339     this.region = initHRegion(tableName, method, families);
5340     try {
5341       KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
5342       KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
5343           null);
5344       KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
5345       KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
5346       Put put = null;
5347       put = new Put(rowC);
5348       put.add(kv1);
5349       put.add(kv11);
5350       region.put(put);
5351       put = new Put(rowA);
5352       put.add(kv2);
5353       region.put(put);
5354       put = new Put(rowB);
5355       put.add(kv3);
5356       region.put(put);
5357       Scan scan = new Scan();
5358       List<Cell> currRow = new ArrayList<Cell>();
5359       scan.setReversed(true);
5360       InternalScanner scanner = region.getScanner(scan);
5361       boolean hasNext = scanner.next(currRow);
5362       assertEquals(1, currRow.size());
5363       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC));
5364       assertTrue(hasNext);
5365       currRow.clear();
5366       hasNext = scanner.next(currRow);
5367       assertEquals(1, currRow.size());
5368       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB));
5369       assertTrue(hasNext);
5370       currRow.clear();
5371       hasNext = scanner.next(currRow);
5372       assertEquals(1, currRow.size());
5373       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowA));
5374       assertFalse(hasNext);
5375       scanner.close();
5376     } finally {
5377       HRegion.closeHRegion(this.region);
5378       this.region = null;
5379     }
5380   }
5381 
5382   @Test (timeout=60000)
5383   public void testReverseScanner_moreRowsMayExistAfter() throws IOException {
5384     // case for "INCLUDE_AND_SEEK_NEXT_ROW & SEEK_NEXT_ROW" endless loop
5385     byte[] rowA = Bytes.toBytes("rowA");
5386     byte[] rowB = Bytes.toBytes("rowB");
5387     byte[] rowC = Bytes.toBytes("rowC");
5388     byte[] rowD = Bytes.toBytes("rowD");
5389     byte[] rowE = Bytes.toBytes("rowE");
5390     byte[] cf = Bytes.toBytes("CF");
5391     byte[][] families = { cf };
5392     byte[] col1 = Bytes.toBytes("col1");
5393     byte[] col2 = Bytes.toBytes("col2");
5394     long ts = 1;
5395     String method = this.getName();
5396     this.region = initHRegion(tableName, method, families);
5397     try {
5398       KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null);
5399       KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null);
5400       KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null);
5401       KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null);
5402       KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null);
5403       KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null);
5404       Put put = null;
5405       put = new Put(rowA);
5406       put.add(kv1);
5407       region.put(put);
5408       put = new Put(rowB);
5409       put.add(kv2);
5410       region.put(put);
5411       put = new Put(rowC);
5412       put.add(kv3);
5413       region.put(put);
5414       put = new Put(rowD);
5415       put.add(kv4_1);
5416       region.put(put);
5417       put = new Put(rowD);
5418       put.add(kv4_2);
5419       region.put(put);
5420       put = new Put(rowE);
5421       put.add(kv5);
5422       region.put(put);
5423       region.flush(true);
5424       Scan scan = new Scan(rowD, rowA);
5425       scan.addColumn(families[0], col1);
5426       scan.setReversed(true);
5427       List<Cell> currRow = new ArrayList<Cell>();
5428       InternalScanner scanner = region.getScanner(scan);
5429       boolean hasNext = scanner.next(currRow);
5430       assertEquals(1, currRow.size());
5431       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD));
5432       assertTrue(hasNext);
5433       currRow.clear();
5434       hasNext = scanner.next(currRow);
5435       assertEquals(1, currRow.size());
5436       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC));
5437       assertTrue(hasNext);
5438       currRow.clear();
5439       hasNext = scanner.next(currRow);
5440       assertEquals(1, currRow.size());
5441       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB));
5442       assertFalse(hasNext);
5443       scanner.close();
5444 
5445       scan = new Scan(rowD, rowA);
5446       scan.addColumn(families[0], col2);
5447       scan.setReversed(true);
5448       currRow.clear();
5449       scanner = region.getScanner(scan);
5450       hasNext = scanner.next(currRow);
5451       assertEquals(1, currRow.size());
5452       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD));
5453       scanner.close();
5454     } finally {
5455       HRegion.closeHRegion(this.region);
5456       this.region = null;
5457     }
5458   }
5459 
5460   @Test (timeout=60000)
5461   public void testReverseScanner_smaller_blocksize() throws IOException {
5462     // case to ensure no conflict with HFile index optimization
5463     byte[] rowA = Bytes.toBytes("rowA");
5464     byte[] rowB = Bytes.toBytes("rowB");
5465     byte[] rowC = Bytes.toBytes("rowC");
5466     byte[] rowD = Bytes.toBytes("rowD");
5467     byte[] rowE = Bytes.toBytes("rowE");
5468     byte[] cf = Bytes.toBytes("CF");
5469     byte[][] families = { cf };
5470     byte[] col1 = Bytes.toBytes("col1");
5471     byte[] col2 = Bytes.toBytes("col2");
5472     long ts = 1;
5473     String method = this.getName();
5474     HBaseConfiguration config = new HBaseConfiguration();
5475     config.setInt("test.block.size", 1);
5476     this.region = initHRegion(tableName, method, config, families);
5477     try {
5478       KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null);
5479       KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null);
5480       KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null);
5481       KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null);
5482       KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null);
5483       KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null);
5484       Put put = null;
5485       put = new Put(rowA);
5486       put.add(kv1);
5487       region.put(put);
5488       put = new Put(rowB);
5489       put.add(kv2);
5490       region.put(put);
5491       put = new Put(rowC);
5492       put.add(kv3);
5493       region.put(put);
5494       put = new Put(rowD);
5495       put.add(kv4_1);
5496       region.put(put);
5497       put = new Put(rowD);
5498       put.add(kv4_2);
5499       region.put(put);
5500       put = new Put(rowE);
5501       put.add(kv5);
5502       region.put(put);
5503       region.flush(true);
5504       Scan scan = new Scan(rowD, rowA);
5505       scan.addColumn(families[0], col1);
5506       scan.setReversed(true);
5507       List<Cell> currRow = new ArrayList<Cell>();
5508       InternalScanner scanner = region.getScanner(scan);
5509       boolean hasNext = scanner.next(currRow);
5510       assertEquals(1, currRow.size());
5511       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD));
5512       assertTrue(hasNext);
5513       currRow.clear();
5514       hasNext = scanner.next(currRow);
5515       assertEquals(1, currRow.size());
5516       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC));
5517       assertTrue(hasNext);
5518       currRow.clear();
5519       hasNext = scanner.next(currRow);
5520       assertEquals(1, currRow.size());
5521       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB));
5522       assertFalse(hasNext);
5523       scanner.close();
5524 
5525       scan = new Scan(rowD, rowA);
5526       scan.addColumn(families[0], col2);
5527       scan.setReversed(true);
5528       currRow.clear();
5529       scanner = region.getScanner(scan);
5530       hasNext = scanner.next(currRow);
5531       assertEquals(1, currRow.size());
5532       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD));
5533       scanner.close();
5534     } finally {
5535       HRegion.closeHRegion(this.region);
5536       this.region = null;
5537     }
5538   }
5539 
5540   @Test (timeout=60000)
5541   public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs1()
5542       throws IOException {
5543     byte[] row0 = Bytes.toBytes("row0"); // 1 kv
5544     byte[] row1 = Bytes.toBytes("row1"); // 2 kv
5545     byte[] row2 = Bytes.toBytes("row2"); // 4 kv
5546     byte[] row3 = Bytes.toBytes("row3"); // 2 kv
5547     byte[] row4 = Bytes.toBytes("row4"); // 5 kv
5548     byte[] row5 = Bytes.toBytes("row5"); // 2 kv
5549     byte[] cf1 = Bytes.toBytes("CF1");
5550     byte[] cf2 = Bytes.toBytes("CF2");
5551     byte[] cf3 = Bytes.toBytes("CF3");
5552     byte[][] families = { cf1, cf2, cf3 };
5553     byte[] col = Bytes.toBytes("C");
5554     long ts = 1;
5555     String method = this.getName();
5556     HBaseConfiguration conf = new HBaseConfiguration();
5557     // disable compactions in this test.
5558     conf.setInt("hbase.hstore.compactionThreshold", 10000);
5559     this.region = initHRegion(tableName, method, conf, families);
5560     try {
5561       // kv naming style: kv(row number) totalKvCountInThisRow seq no
5562       KeyValue kv0_1_1 = new KeyValue(row0, cf1, col, ts, KeyValue.Type.Put,
5563           null);
5564       KeyValue kv1_2_1 = new KeyValue(row1, cf2, col, ts, KeyValue.Type.Put,
5565           null);
5566       KeyValue kv1_2_2 = new KeyValue(row1, cf1, col, ts + 1,
5567           KeyValue.Type.Put, null);
5568       KeyValue kv2_4_1 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put,
5569           null);
5570       KeyValue kv2_4_2 = new KeyValue(row2, cf1, col, ts, KeyValue.Type.Put,
5571           null);
5572       KeyValue kv2_4_3 = new KeyValue(row2, cf3, col, ts, KeyValue.Type.Put,
5573           null);
5574       KeyValue kv2_4_4 = new KeyValue(row2, cf1, col, ts + 4,
5575           KeyValue.Type.Put, null);
5576       KeyValue kv3_2_1 = new KeyValue(row3, cf2, col, ts, KeyValue.Type.Put,
5577           null);
5578       KeyValue kv3_2_2 = new KeyValue(row3, cf1, col, ts + 4,
5579           KeyValue.Type.Put, null);
5580       KeyValue kv4_5_1 = new KeyValue(row4, cf1, col, ts, KeyValue.Type.Put,
5581           null);
5582       KeyValue kv4_5_2 = new KeyValue(row4, cf3, col, ts, KeyValue.Type.Put,
5583           null);
5584       KeyValue kv4_5_3 = new KeyValue(row4, cf3, col, ts + 5,
5585           KeyValue.Type.Put, null);
5586       KeyValue kv4_5_4 = new KeyValue(row4, cf2, col, ts, KeyValue.Type.Put,
5587           null);
5588       KeyValue kv4_5_5 = new KeyValue(row4, cf1, col, ts + 3,
5589           KeyValue.Type.Put, null);
5590       KeyValue kv5_2_1 = new KeyValue(row5, cf2, col, ts, KeyValue.Type.Put,
5591           null);
5592       KeyValue kv5_2_2 = new KeyValue(row5, cf3, col, ts, KeyValue.Type.Put,
5593           null);
5594       // hfiles(cf1/cf2) :"row1"(1 kv) / "row2"(1 kv) / "row4"(2 kv)
5595       Put put = null;
5596       put = new Put(row1);
5597       put.add(kv1_2_1);
5598       region.put(put);
5599       put = new Put(row2);
5600       put.add(kv2_4_1);
5601       region.put(put);
5602       put = new Put(row4);
5603       put.add(kv4_5_4);
5604       put.add(kv4_5_5);
5605       region.put(put);
5606       region.flush(true);
5607       // hfiles(cf1/cf3) : "row1" (1 kvs) / "row2" (1 kv) / "row4" (2 kv)
5608       put = new Put(row4);
5609       put.add(kv4_5_1);
5610       put.add(kv4_5_3);
5611       region.put(put);
5612       put = new Put(row1);
5613       put.add(kv1_2_2);
5614       region.put(put);
5615       put = new Put(row2);
5616       put.add(kv2_4_4);
5617       region.put(put);
5618       region.flush(true);
5619       // hfiles(cf1/cf3) : "row2"(2 kv) / "row3"(1 kvs) / "row4" (1 kv)
5620       put = new Put(row4);
5621       put.add(kv4_5_2);
5622       region.put(put);
5623       put = new Put(row2);
5624       put.add(kv2_4_2);
5625       put.add(kv2_4_3);
5626       region.put(put);
5627       put = new Put(row3);
5628       put.add(kv3_2_2);
5629       region.put(put);
5630       region.flush(true);
5631       // memstore(cf1/cf2/cf3) : "row0" (1 kvs) / "row3" ( 1 kv) / "row5" (max)
5632       // ( 2 kv)
5633       put = new Put(row0);
5634       put.add(kv0_1_1);
5635       region.put(put);
5636       put = new Put(row3);
5637       put.add(kv3_2_1);
5638       region.put(put);
5639       put = new Put(row5);
5640       put.add(kv5_2_1);
5641       put.add(kv5_2_2);
5642       region.put(put);
5643       // scan range = ["row4", min), skip the max "row5"
5644       Scan scan = new Scan(row4);
5645       scan.setMaxVersions(5);
5646       scan.setBatch(3);
5647       scan.setReversed(true);
5648       InternalScanner scanner = region.getScanner(scan);
5649       List<Cell> currRow = new ArrayList<Cell>();
5650       boolean hasNext = false;
5651       // 1. scan out "row4" (5 kvs), "row5" can't be scanned out since not
5652       // included in scan range
5653       // "row4" takes 2 next() calls since batch=3
5654       hasNext = scanner.next(currRow);
5655       assertEquals(3, currRow.size());
5656       assertTrue(Bytes.equals(currRow.get(0).getRow(), row4));
5657       assertTrue(hasNext);
5658       currRow.clear();
5659       hasNext = scanner.next(currRow);
5660       assertEquals(2, currRow.size());
5661       assertTrue(Bytes.equals(currRow.get(0).getRow(), row4));
5662       assertTrue(hasNext);
5663       // 2. scan out "row3" (2 kv)
5664       currRow.clear();
5665       hasNext = scanner.next(currRow);
5666       assertEquals(2, currRow.size());
5667       assertTrue(Bytes.equals(currRow.get(0).getRow(), row3));
5668       assertTrue(hasNext);
5669       // 3. scan out "row2" (4 kvs)
5670       // "row2" takes 2 next() calls since batch=3
5671       currRow.clear();
5672       hasNext = scanner.next(currRow);
5673       assertEquals(3, currRow.size());
5674       assertTrue(Bytes.equals(currRow.get(0).getRow(), row2));
5675       assertTrue(hasNext);
5676       currRow.clear();
5677       hasNext = scanner.next(currRow);
5678       assertEquals(1, currRow.size());
5679       assertTrue(Bytes.equals(currRow.get(0).getRow(), row2));
5680       assertTrue(hasNext);
5681       // 4. scan out "row1" (2 kv)
5682       currRow.clear();
5683       hasNext = scanner.next(currRow);
5684       assertEquals(2, currRow.size());
5685       assertTrue(Bytes.equals(currRow.get(0).getRow(), row1));
5686       assertTrue(hasNext);
5687       // 5. scan out "row0" (1 kv)
5688       currRow.clear();
5689       hasNext = scanner.next(currRow);
5690       assertEquals(1, currRow.size());
5691       assertTrue(Bytes.equals(currRow.get(0).getRow(), row0));
5692       assertFalse(hasNext);
5693 
5694       scanner.close();
5695     } finally {
5696       HRegion.closeHRegion(this.region);
5697       this.region = null;
5698     }
5699   }
5700 
5701   @Test (timeout=60000)
5702   public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs2()
5703       throws IOException {
5704     byte[] row1 = Bytes.toBytes("row1");
5705     byte[] row2 = Bytes.toBytes("row2");
5706     byte[] row3 = Bytes.toBytes("row3");
5707     byte[] row4 = Bytes.toBytes("row4");
5708     byte[] cf1 = Bytes.toBytes("CF1");
5709     byte[] cf2 = Bytes.toBytes("CF2");
5710     byte[] cf3 = Bytes.toBytes("CF3");
5711     byte[] cf4 = Bytes.toBytes("CF4");
5712     byte[][] families = { cf1, cf2, cf3, cf4 };
5713     byte[] col = Bytes.toBytes("C");
5714     long ts = 1;
5715     String method = this.getName();
5716     HBaseConfiguration conf = new HBaseConfiguration();
5717     // disable compactions in this test.
5718     conf.setInt("hbase.hstore.compactionThreshold", 10000);
5719     this.region = initHRegion(tableName, method, conf, families);
5720     try {
5721       KeyValue kv1 = new KeyValue(row1, cf1, col, ts, KeyValue.Type.Put, null);
5722       KeyValue kv2 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put, null);
5723       KeyValue kv3 = new KeyValue(row3, cf3, col, ts, KeyValue.Type.Put, null);
5724       KeyValue kv4 = new KeyValue(row4, cf4, col, ts, KeyValue.Type.Put, null);
5725       // storefile1
5726       Put put = new Put(row1);
5727       put.add(kv1);
5728       region.put(put);
5729       region.flush(true);
5730       // storefile2
5731       put = new Put(row2);
5732       put.add(kv2);
5733       region.put(put);
5734       region.flush(true);
5735       // storefile3
5736       put = new Put(row3);
5737       put.add(kv3);
5738       region.put(put);
5739       region.flush(true);
5740       // memstore
5741       put = new Put(row4);
5742       put.add(kv4);
5743       region.put(put);
5744       // scan range = ["row4", min)
5745       Scan scan = new Scan(row4);
5746       scan.setReversed(true);
5747       scan.setBatch(10);
5748       InternalScanner scanner = region.getScanner(scan);
5749       List<Cell> currRow = new ArrayList<Cell>();
5750       boolean hasNext = scanner.next(currRow);
5751       assertEquals(1, currRow.size());
5752       assertTrue(Bytes.equals(currRow.get(0).getRow(), row4));
5753       assertTrue(hasNext);
5754       currRow.clear();
5755       hasNext = scanner.next(currRow);
5756       assertEquals(1, currRow.size());
5757       assertTrue(Bytes.equals(currRow.get(0).getRow(), row3));
5758       assertTrue(hasNext);
5759       currRow.clear();
5760       hasNext = scanner.next(currRow);
5761       assertEquals(1, currRow.size());
5762       assertTrue(Bytes.equals(currRow.get(0).getRow(), row2));
5763       assertTrue(hasNext);
5764       currRow.clear();
5765       hasNext = scanner.next(currRow);
5766       assertEquals(1, currRow.size());
5767       assertTrue(Bytes.equals(currRow.get(0).getRow(), row1));
5768       assertFalse(hasNext);
5769     } finally {
5770       HRegion.closeHRegion(this.region);
5771       this.region = null;
5772     }
5773   }
5774 
5775   @Test (timeout=60000)
5776   public void testSplitRegionWithReverseScan() throws IOException {
5777     byte [] tableName = Bytes.toBytes("testSplitRegionWithReverseScan");
5778     byte [] qualifier = Bytes.toBytes("qualifier");
5779     Configuration hc = initSplit();
5780     int numRows = 3;
5781     byte [][] families = {fam1};
5782 
5783     //Setting up region
5784     String method = this.getName();
5785     this.region = initHRegion(tableName, method, hc, families);
5786 
5787     //Put data in region
5788     int startRow = 100;
5789     putData(startRow, numRows, qualifier, families);
5790     int splitRow = startRow + numRows;
5791     putData(splitRow, numRows, qualifier, families);
5792     region.flush(true);
5793 
5794     HRegion [] regions = null;
5795     try {
5796       regions = splitRegion(region, Bytes.toBytes("" + splitRow));
5797       //Opening the regions returned.
5798       for (int i = 0; i < regions.length; i++) {
5799         regions[i] = HRegion.openHRegion(regions[i], null);
5800       }
5801       //Verifying that the region has been split
5802       assertEquals(2, regions.length);
5803 
5804       //Verifying that all data is still there and that data is in the right
5805       //place
5806       verifyData(regions[0], startRow, numRows, qualifier, families);
5807       verifyData(regions[1], splitRow, numRows, qualifier, families);
5808 
5809       //fire the reverse scan1:  top range, and larger than the last row
5810       Scan scan = new Scan(Bytes.toBytes(String.valueOf(startRow + 10 * numRows)));
5811       scan.setReversed(true);
5812       InternalScanner scanner = regions[1].getScanner(scan);
5813       List<Cell> currRow = new ArrayList<Cell>();
5814       boolean more = false;
5815       int verify = startRow + 2 * numRows - 1;
5816       do {
5817         more = scanner.next(currRow);
5818         assertEquals(Bytes.toString(currRow.get(0).getRow()), verify + "");
5819         verify--;
5820         currRow.clear();
5821       } while(more);
5822       assertEquals(verify, startRow + numRows - 1);
5823       scanner.close();
5824       //fire the reverse scan2:  top range, and equals to the last row
5825       scan = new Scan(Bytes.toBytes(String.valueOf(startRow + 2 * numRows - 1)));
5826       scan.setReversed(true);
5827       scanner = regions[1].getScanner(scan);
5828       verify = startRow + 2 * numRows - 1;
5829       do {
5830         more = scanner.next(currRow);
5831         assertEquals(Bytes.toString(currRow.get(0).getRow()), verify + "");
5832         verify--;
5833         currRow.clear();
5834       } while(more);
5835       assertEquals(verify, startRow + numRows - 1);
5836       scanner.close();
5837       //fire the reverse scan3:  bottom range, and larger than the last row
5838       scan = new Scan(Bytes.toBytes(String.valueOf(startRow + numRows)));
5839       scan.setReversed(true);
5840       scanner = regions[0].getScanner(scan);
5841       verify = startRow + numRows - 1;
5842       do {
5843         more = scanner.next(currRow);
5844         assertEquals(Bytes.toString(currRow.get(0).getRow()), verify + "");
5845         verify--;
5846         currRow.clear();
5847       } while(more);
5848       assertEquals(verify, 99);
5849       scanner.close();
5850       //fire the reverse scan4:  bottom range, and equals to the last row
5851       scan = new Scan(Bytes.toBytes(String.valueOf(startRow + numRows - 1)));
5852       scan.setReversed(true);
5853       scanner = regions[0].getScanner(scan);
5854       verify = startRow + numRows - 1;
5855       do {
5856         more = scanner.next(currRow);
5857         assertEquals(Bytes.toString(currRow.get(0).getRow()), verify + "");
5858         verify--;
5859         currRow.clear();
5860       } while(more);
5861       assertEquals(verify, startRow - 1);
5862       scanner.close();
5863     } finally {
5864       HRegion.closeHRegion(this.region);
5865       this.region = null;
5866     }
5867   }
5868 
5869   @Test
5870   public void testWriteRequestsCounter() throws IOException {
5871     byte[] fam = Bytes.toBytes("info");
5872     byte[][] families = { fam };
5873     this.region = initHRegion(tableName, method, CONF, families);
5874 
5875     Assert.assertEquals(0L, region.getWriteRequestsCount());
5876 
5877     Put put = new Put(row);
5878     put.add(fam, fam, fam);
5879 
5880     Assert.assertEquals(0L, region.getWriteRequestsCount());
5881     region.put(put);
5882     Assert.assertEquals(1L, region.getWriteRequestsCount());
5883     region.put(put);
5884     Assert.assertEquals(2L, region.getWriteRequestsCount());
5885     region.put(put);
5886     Assert.assertEquals(3L, region.getWriteRequestsCount());
5887 
5888     region.delete(new Delete(row));
5889     Assert.assertEquals(4L, region.getWriteRequestsCount());
5890 
5891     HRegion.closeHRegion(this.region);
5892     this.region = null;
5893   }
5894 
5895   @Test
5896   public void testOpenRegionWrittenToWAL() throws Exception {
5897     final ServerName serverName = ServerName.valueOf("testOpenRegionWrittenToWAL", 100, 42);
5898     final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
5899 
5900     HTableDescriptor htd
5901         = new HTableDescriptor(TableName.valueOf("testOpenRegionWrittenToWAL"));
5902     htd.addFamily(new HColumnDescriptor(fam1));
5903     htd.addFamily(new HColumnDescriptor(fam2));
5904 
5905     HRegionInfo hri = new HRegionInfo(htd.getTableName(),
5906       HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
5907 
5908     // open the region w/o rss and wal and flush some files
5909     HRegion region =
5910          HRegion.createHRegion(hri, TEST_UTIL.getDataTestDir(), TEST_UTIL
5911             .getConfiguration(), htd);
5912     assertNotNull(region);
5913 
5914     // create a file in fam1 for the region before opening in OpenRegionHandler
5915     region.put(new Put(Bytes.toBytes("a")).add(fam1, fam1, fam1));
5916     region.flush(true);
5917     region.close();
5918 
5919     ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
5920 
5921     // capture append() calls
5922     WAL wal = mockWAL();
5923     when(rss.getWAL((HRegionInfo) any())).thenReturn(wal);
5924 
5925     try {
5926       region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
5927         TEST_UTIL.getConfiguration(), rss, null);
5928 
5929       verify(wal, times(1)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any()
5930         , editCaptor.capture(), anyBoolean());
5931 
5932       WALEdit edit = editCaptor.getValue();
5933       assertNotNull(edit);
5934       assertNotNull(edit.getCells());
5935       assertEquals(1, edit.getCells().size());
5936       RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
5937       assertNotNull(desc);
5938 
5939       LOG.info("RegionEventDescriptor from WAL: " + desc);
5940 
5941       assertEquals(RegionEventDescriptor.EventType.REGION_OPEN, desc.getEventType());
5942       assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getName()));
5943       assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
5944         hri.getEncodedNameAsBytes()));
5945       assertTrue(desc.getLogSequenceNumber() > 0);
5946       assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
5947       assertEquals(2, desc.getStoresCount());
5948 
5949       StoreDescriptor store = desc.getStores(0);
5950       assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
5951       assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1));
5952       assertEquals(1, store.getStoreFileCount()); // 1store file
5953       assertFalse(store.getStoreFile(0).contains("/")); // ensure path is relative
5954 
5955       store = desc.getStores(1);
5956       assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
5957       assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2));
5958       assertEquals(0, store.getStoreFileCount()); // no store files
5959 
5960     } finally {
5961       HRegion.closeHRegion(region);
5962     }
5963   }
5964 
5965   // Helper for test testOpenRegionWrittenToWALForLogReplay
5966   static class HRegionWithSeqId extends HRegion {
5967     public HRegionWithSeqId(final Path tableDir, final WAL wal, final FileSystem fs,
5968         final Configuration confParam, final HRegionInfo regionInfo,
5969         final HTableDescriptor htd, final RegionServerServices rsServices) {
5970       super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
5971     }
5972     @Override
5973     protected long getNextSequenceId(WAL wal) throws IOException {
5974       return 42;
5975     }
5976   }
5977 
5978   @Test
5979   public void testFlushedFileWithNoTags() throws Exception {
5980     String method = "testFlushedFileWithNoTags";
5981     HTableDescriptor htd = new HTableDescriptor(tableName);
5982     htd.addFamily(new HColumnDescriptor(fam1));
5983     region = initHRegion(Bytes.toBytes(method), method, TEST_UTIL.getConfiguration(), fam1);
5984     Put put = new Put(Bytes.toBytes("a-b-0-0"));
5985     put.addColumn(fam1, qual1, Bytes.toBytes("c1-value"));
5986     region.put(put);
5987     region.flush(true);
5988     Store store = region.getStore(fam1);
5989     Collection<StoreFile> storefiles = store.getStorefiles();
5990     for (StoreFile sf : storefiles) {
5991       assertFalse("Tags should not be present "
5992           ,sf.getReader().getHFileReader().getFileContext().isIncludesTags());
5993     }
5994   }
5995 
5996   @Test
5997   public void testOpenRegionWrittenToWALForLogReplay() throws Exception {
5998     // similar to the above test but with distributed log replay
5999     final ServerName serverName = ServerName.valueOf("testOpenRegionWrittenToWALForLogReplay",
6000       100, 42);
6001     final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
6002 
6003     HTableDescriptor htd
6004         = new HTableDescriptor(TableName.valueOf("testOpenRegionWrittenToWALForLogReplay"));
6005     htd.addFamily(new HColumnDescriptor(fam1));
6006     htd.addFamily(new HColumnDescriptor(fam2));
6007 
6008     HRegionInfo hri = new HRegionInfo(htd.getTableName(),
6009       HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
6010 
6011     // open the region w/o rss and wal and flush some files
6012     HRegion region = HRegion.createHRegion(hri, TEST_UTIL.getDataTestDir(), TEST_UTIL
6013              .getConfiguration(), htd);
6014     assertNotNull(region);
6015 
6016     // create a file in fam1 for the region before opening in OpenRegionHandler
6017     region.put(new Put(Bytes.toBytes("a")).add(fam1, fam1, fam1));
6018     region.flush(true);
6019     HRegion.closeHRegion(region);
6020 
6021     ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
6022 
6023     // capture append() calls
6024     WAL wal = mockWAL();
6025     when(rss.getWAL((HRegionInfo) any())).thenReturn(wal);
6026 
6027     // add the region to recovering regions
6028     HashMap<String, Region> recoveringRegions = Maps.newHashMap();
6029     recoveringRegions.put(region.getRegionInfo().getEncodedName(), null);
6030     when(rss.getRecoveringRegions()).thenReturn(recoveringRegions);
6031 
6032     try {
6033       Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
6034       conf.set(HConstants.REGION_IMPL, HRegionWithSeqId.class.getName());
6035       region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
6036         conf, rss, null);
6037 
6038       // verify that we have not appended region open event to WAL because this region is still
6039       // recovering
6040       verify(wal, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any()
6041         , editCaptor.capture(), anyBoolean());
6042 
6043       // not put the region out of recovering state
6044       new FinishRegionRecoveringHandler(rss, region.getRegionInfo().getEncodedName(), "/foo")
6045         .prepare().process();
6046 
6047       // now we should have put the entry
6048       verify(wal, times(1)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any()
6049         , editCaptor.capture(), anyBoolean());
6050 
6051       WALEdit edit = editCaptor.getValue();
6052       assertNotNull(edit);
6053       assertNotNull(edit.getCells());
6054       assertEquals(1, edit.getCells().size());
6055       RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
6056       assertNotNull(desc);
6057 
6058       LOG.info("RegionEventDescriptor from WAL: " + desc);
6059 
6060       assertEquals(RegionEventDescriptor.EventType.REGION_OPEN, desc.getEventType());
6061       assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getName()));
6062       assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
6063         hri.getEncodedNameAsBytes()));
6064       assertTrue(desc.getLogSequenceNumber() > 0);
6065       assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
6066       assertEquals(2, desc.getStoresCount());
6067 
6068       StoreDescriptor store = desc.getStores(0);
6069       assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
6070       assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1));
6071       assertEquals(1, store.getStoreFileCount()); // 1store file
6072       assertFalse(store.getStoreFile(0).contains("/")); // ensure path is relative
6073 
6074       store = desc.getStores(1);
6075       assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
6076       assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2));
6077       assertEquals(0, store.getStoreFileCount()); // no store files
6078 
6079     } finally {
6080       HRegion.closeHRegion(region);
6081     }
6082   }
6083 
6084   /**
6085    * Utility method to setup a WAL mock.
6086    * Needs to do the bit where we close latch on the WALKey on append else test hangs.
6087    * @return
6088    * @throws IOException
6089    */
6090   private WAL mockWAL() throws IOException {
6091     WAL wal = mock(WAL.class);
6092     Mockito.when(wal.append((HTableDescriptor)Mockito.any(), (HRegionInfo)Mockito.any(),
6093         (WALKey)Mockito.any(), (WALEdit)Mockito.any(), Mockito.anyBoolean())).
6094       thenAnswer(new Answer<Long>() {
6095         @Override
6096         public Long answer(InvocationOnMock invocation) throws Throwable {
6097           WALKey key = invocation.getArgumentAt(2, WALKey.class);
6098           MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
6099           key.setWriteEntry(we);
6100           return 1L;
6101         }
6102 
6103     });
6104     return wal;
6105   }
6106 
6107   @Test
6108   public void testCloseRegionWrittenToWAL() throws Exception {
6109     final ServerName serverName = ServerName.valueOf("testCloseRegionWrittenToWAL", 100, 42);
6110     final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
6111 
6112     HTableDescriptor htd
6113     = new HTableDescriptor(TableName.valueOf("testOpenRegionWrittenToWAL"));
6114     htd.addFamily(new HColumnDescriptor(fam1));
6115     htd.addFamily(new HColumnDescriptor(fam2));
6116 
6117     final HRegionInfo hri = new HRegionInfo(htd.getTableName(),
6118       HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
6119 
6120     ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
6121 
6122     // capture append() calls
6123     WAL wal = mockWAL();
6124     when(rss.getWAL((HRegionInfo) any())).thenReturn(wal);
6125 
6126 
6127     // open a region first so that it can be closed later
6128     region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
6129       TEST_UTIL.getConfiguration(), rss, null);
6130 
6131     // close the region
6132     region.close(false);
6133 
6134     // 2 times, one for region open, the other close region
6135     verify(wal, times(2)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any(),
6136       editCaptor.capture(), anyBoolean());
6137 
6138     WALEdit edit = editCaptor.getAllValues().get(1);
6139     assertNotNull(edit);
6140     assertNotNull(edit.getCells());
6141     assertEquals(1, edit.getCells().size());
6142     RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
6143     assertNotNull(desc);
6144 
6145     LOG.info("RegionEventDescriptor from WAL: " + desc);
6146 
6147     assertEquals(RegionEventDescriptor.EventType.REGION_CLOSE, desc.getEventType());
6148     assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getName()));
6149     assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
6150       hri.getEncodedNameAsBytes()));
6151     assertTrue(desc.getLogSequenceNumber() > 0);
6152     assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
6153     assertEquals(2, desc.getStoresCount());
6154 
6155     StoreDescriptor store = desc.getStores(0);
6156     assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
6157     assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1));
6158     assertEquals(0, store.getStoreFileCount()); // no store files
6159 
6160     store = desc.getStores(1);
6161     assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
6162     assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2));
6163     assertEquals(0, store.getStoreFileCount()); // no store files
6164   }
6165 
6166   /**
6167    * Test RegionTooBusyException thrown when region is busy
6168    */
6169   @Test (timeout=24000)
6170   public void testRegionTooBusy() throws IOException {
6171     String method = "testRegionTooBusy";
6172     byte[] tableName = Bytes.toBytes(method);
6173     byte[] family = Bytes.toBytes("family");
6174     long defaultBusyWaitDuration = CONF.getLong("hbase.busy.wait.duration",
6175       HRegion.DEFAULT_BUSY_WAIT_DURATION);
6176     CONF.setLong("hbase.busy.wait.duration", 1000);
6177     region = initHRegion(tableName, method, CONF, family);
6178     final AtomicBoolean stopped = new AtomicBoolean(true);
6179     Thread t = new Thread(new Runnable() {
6180       @Override
6181       public void run() {
6182         try {
6183           region.lock.writeLock().lock();
6184           stopped.set(false);
6185           while (!stopped.get()) {
6186             Thread.sleep(100);
6187           }
6188         } catch (InterruptedException ie) {
6189         } finally {
6190           region.lock.writeLock().unlock();
6191         }
6192       }
6193     });
6194     t.start();
6195     Get get = new Get(row);
6196     try {
6197       while (stopped.get()) {
6198         Thread.sleep(100);
6199       }
6200       region.get(get);
6201       fail("Should throw RegionTooBusyException");
6202     } catch (InterruptedException ie) {
6203       fail("test interrupted");
6204     } catch (RegionTooBusyException e) {
6205       // Good, expected
6206     } finally {
6207       stopped.set(true);
6208       try {
6209         t.join();
6210       } catch (Throwable e) {
6211       }
6212 
6213       HRegion.closeHRegion(region);
6214       region = null;
6215       CONF.setLong("hbase.busy.wait.duration", defaultBusyWaitDuration);
6216     }
6217   }
6218 
6219   @Test
6220   public void testCellTTLs() throws IOException {
6221     IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
6222     EnvironmentEdgeManager.injectEdge(edge);
6223 
6224     final byte[] row = Bytes.toBytes("testRow");
6225     final byte[] q1 = Bytes.toBytes("q1");
6226     final byte[] q2 = Bytes.toBytes("q2");
6227     final byte[] q3 = Bytes.toBytes("q3");
6228     final byte[] q4 = Bytes.toBytes("q4");
6229 
6230     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testCellTTLs"));
6231     HColumnDescriptor hcd = new HColumnDescriptor(fam1);
6232     hcd.setTimeToLive(10); // 10 seconds
6233     htd.addFamily(hcd);
6234 
6235     Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
6236     conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
6237 
6238     HRegion region = HRegion.createHRegion(new HRegionInfo(htd.getTableName(),
6239         HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY),
6240       TEST_UTIL.getDataTestDir(), conf, htd);
6241     assertNotNull(region);
6242     try {
6243       long now = EnvironmentEdgeManager.currentTime();
6244       // Add a cell that will expire in 5 seconds via cell TTL
6245       region.put(new Put(row).add(new KeyValue(row, fam1, q1, now,
6246         HConstants.EMPTY_BYTE_ARRAY, new Tag[] {
6247           // TTL tags specify ts in milliseconds
6248           new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
6249       // Add a cell that will expire after 10 seconds via family setting
6250       region.put(new Put(row).add(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
6251       // Add a cell that will expire in 15 seconds via cell TTL
6252       region.put(new Put(row).add(new KeyValue(row, fam1, q3, now + 10000 - 1,
6253         HConstants.EMPTY_BYTE_ARRAY, new Tag[] {
6254           // TTL tags specify ts in milliseconds
6255           new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
6256       // Add a cell that will expire in 20 seconds via family setting
6257       region.put(new Put(row).add(fam1, q4, now + 10000 - 1, HConstants.EMPTY_BYTE_ARRAY));
6258 
6259       // Flush so we are sure store scanning gets this right
6260       region.flush(true);
6261 
6262       // A query at time T+0 should return all cells
6263       Result r = region.get(new Get(row));
6264       assertNotNull(r.getValue(fam1, q1));
6265       assertNotNull(r.getValue(fam1, q2));
6266       assertNotNull(r.getValue(fam1, q3));
6267       assertNotNull(r.getValue(fam1, q4));
6268 
6269       // Increment time to T+5 seconds
6270       edge.incrementTime(5000);
6271 
6272       r = region.get(new Get(row));
6273       assertNull(r.getValue(fam1, q1));
6274       assertNotNull(r.getValue(fam1, q2));
6275       assertNotNull(r.getValue(fam1, q3));
6276       assertNotNull(r.getValue(fam1, q4));
6277 
6278       // Increment time to T+10 seconds
6279       edge.incrementTime(5000);
6280 
6281       r = region.get(new Get(row));
6282       assertNull(r.getValue(fam1, q1));
6283       assertNull(r.getValue(fam1, q2));
6284       assertNotNull(r.getValue(fam1, q3));
6285       assertNotNull(r.getValue(fam1, q4));
6286 
6287       // Increment time to T+15 seconds
6288       edge.incrementTime(5000);
6289 
6290       r = region.get(new Get(row));
6291       assertNull(r.getValue(fam1, q1));
6292       assertNull(r.getValue(fam1, q2));
6293       assertNull(r.getValue(fam1, q3));
6294       assertNotNull(r.getValue(fam1, q4));
6295 
6296       // Increment time to T+20 seconds
6297       edge.incrementTime(10000);
6298 
6299       r = region.get(new Get(row));
6300       assertNull(r.getValue(fam1, q1));
6301       assertNull(r.getValue(fam1, q2));
6302       assertNull(r.getValue(fam1, q3));
6303       assertNull(r.getValue(fam1, q4));
6304 
6305       // Fun with disappearing increments
6306 
6307       // Start at 1
6308       region.put(new Put(row).add(fam1, q1, Bytes.toBytes(1L)));
6309       r = region.get(new Get(row));
6310       byte[] val = r.getValue(fam1, q1);
6311       assertNotNull(val);
6312       assertEquals(Bytes.toLong(val), 1L);
6313 
6314       // Increment with a TTL of 5 seconds
6315       Increment incr = new Increment(row).addColumn(fam1, q1, 1L);
6316       incr.setTTL(5000);
6317       region.increment(incr); // 2
6318 
6319       // New value should be 2
6320       r = region.get(new Get(row));
6321       val = r.getValue(fam1, q1);
6322       assertNotNull(val);
6323       assertEquals(Bytes.toLong(val), 2L);
6324 
6325       // Increment time to T+25 seconds
6326       edge.incrementTime(5000);
6327 
6328       // Value should be back to 1
6329       r = region.get(new Get(row));
6330       val = r.getValue(fam1, q1);
6331       assertNotNull(val);
6332       assertEquals(Bytes.toLong(val), 1L);
6333 
6334       // Increment time to T+30 seconds
6335       edge.incrementTime(5000);
6336 
6337       // Original value written at T+20 should be gone now via family TTL
6338       r = region.get(new Get(row));
6339       assertNull(r.getValue(fam1, q1));
6340 
6341     } finally {
6342       HRegion.closeHRegion(region);
6343     }
6344   }
6345 
6346   @Test
6347   public void testIncrementTimestampsAreMonotonic() throws IOException {
6348     HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);
6349     ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6350     EnvironmentEdgeManager.injectEdge(edge);
6351 
6352     edge.setValue(10);
6353     Increment inc = new Increment(row);
6354     inc.setDurability(Durability.SKIP_WAL);
6355     inc.addColumn(fam1, qual1, 1L);
6356     region.increment(inc);
6357 
6358     Result result = region.get(new Get(row));
6359     Cell c = result.getColumnLatestCell(fam1, qual1);
6360     assertNotNull(c);
6361     assertEquals(c.getTimestamp(), 10L);
6362 
6363     edge.setValue(1); // clock goes back
6364     region.increment(inc);
6365     result = region.get(new Get(row));
6366     c = result.getColumnLatestCell(fam1, qual1);
6367     assertEquals(c.getTimestamp(), 10L);
6368     assertEquals(Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()), 2L);
6369   }
6370 
6371   @Test
6372   public void testAppendTimestampsAreMonotonic() throws IOException {
6373     HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);
6374     ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6375     EnvironmentEdgeManager.injectEdge(edge);
6376 
6377     edge.setValue(10);
6378     Append a = new Append(row);
6379     a.setDurability(Durability.SKIP_WAL);
6380     a.add(fam1, qual1, qual1);
6381     region.append(a);
6382 
6383     Result result = region.get(new Get(row));
6384     Cell c = result.getColumnLatestCell(fam1, qual1);
6385     assertNotNull(c);
6386     assertEquals(c.getTimestamp(), 10L);
6387 
6388     edge.setValue(1); // clock goes back
6389     region.append(a);
6390     result = region.get(new Get(row));
6391     c = result.getColumnLatestCell(fam1, qual1);
6392     assertEquals(c.getTimestamp(), 10L);
6393 
6394     byte[] expected = new byte[qual1.length*2];
6395     System.arraycopy(qual1, 0, expected, 0, qual1.length);
6396     System.arraycopy(qual1, 0, expected, qual1.length, qual1.length);
6397 
6398     assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6399       expected, 0, expected.length));
6400   }
6401 
6402   @Test
6403   public void testCheckAndMutateTimestampsAreMonotonic() throws IOException {
6404     HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);
6405     ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6406     EnvironmentEdgeManager.injectEdge(edge);
6407 
6408     edge.setValue(10);
6409     Put p = new Put(row);
6410     p.setDurability(Durability.SKIP_WAL);
6411     p.addColumn(fam1, qual1, qual1);
6412     region.put(p);
6413 
6414     Result result = region.get(new Get(row));
6415     Cell c = result.getColumnLatestCell(fam1, qual1);
6416     assertNotNull(c);
6417     assertEquals(c.getTimestamp(), 10L);
6418 
6419     edge.setValue(1); // clock goes back
6420     p = new Put(row);
6421     p.setDurability(Durability.SKIP_WAL);
6422     p.addColumn(fam1, qual1, qual2);
6423     region.checkAndMutate(row, fam1, qual1, CompareOp.EQUAL, new BinaryComparator(qual1), p, false);
6424     result = region.get(new Get(row));
6425     c = result.getColumnLatestCell(fam1, qual1);
6426     assertEquals(c.getTimestamp(), 10L);
6427 
6428     assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6429       qual2, 0, qual2.length));
6430   }
6431 
6432   @Test(timeout = 60000)
6433   public void testBatchMutateWithWrongRegionException() throws Exception {
6434     final byte[] a = Bytes.toBytes("a");
6435     final byte[] b = Bytes.toBytes("b");
6436     final byte[] c = Bytes.toBytes("c"); // exclusive
6437 
6438     int prevLockTimeout = CONF.getInt("hbase.rowlock.wait.duration", 30000);
6439     CONF.setInt("hbase.rowlock.wait.duration", 1000);
6440     final HRegion region = initHRegion(tableName, a, c, name.getMethodName(), CONF, false, fam1);
6441 
6442     Mutation[] mutations = new Mutation[] {
6443         new Put(a).addImmutable(fam1, null, null),
6444         new Put(c).addImmutable(fam1, null, null), // this is outside the region boundary
6445         new Put(b).addImmutable(fam1, null, null),
6446     };
6447 
6448     OperationStatus[] status = region.batchMutate(mutations);
6449     assertEquals(status[0].getOperationStatusCode(), OperationStatusCode.SUCCESS);
6450     assertEquals(status[1].getOperationStatusCode(), OperationStatusCode.SANITY_CHECK_FAILURE);
6451     assertEquals(status[2].getOperationStatusCode(), OperationStatusCode.SUCCESS);
6452 
6453 
6454     // test with a row lock held for a long time
6455     final CountDownLatch obtainedRowLock = new CountDownLatch(1);
6456     ExecutorService exec = Executors.newFixedThreadPool(2);
6457     Future<Void> f1 = exec.submit(new Callable<Void>() {
6458       @Override
6459       public Void call() throws Exception {
6460         LOG.info("Acquiring row lock");
6461         RowLock rl = region.getRowLock(b);
6462         obtainedRowLock.countDown();
6463         LOG.info("Waiting for 5 seconds before releasing lock");
6464         Threads.sleep(5000);
6465         LOG.info("Releasing row lock");
6466         rl.release();
6467         return null;
6468       }
6469     });
6470     obtainedRowLock.await(30, TimeUnit.SECONDS);
6471 
6472     Future<Void> f2 = exec.submit(new Callable<Void>() {
6473       @Override
6474       public Void call() throws Exception {
6475         Mutation[] mutations = new Mutation[] {
6476             new Put(a).addImmutable(fam1, null, null),
6477             new Put(b).addImmutable(fam1, null, null),
6478         };
6479 
6480         // this will wait for the row lock, and it will eventually succeed
6481         OperationStatus[] status = region.batchMutate(mutations);
6482         assertEquals(status[0].getOperationStatusCode(), OperationStatusCode.SUCCESS);
6483         assertEquals(status[1].getOperationStatusCode(), OperationStatusCode.SUCCESS);
6484         return null;
6485       }
6486     });
6487 
6488     f1.get();
6489     f2.get();
6490 
6491     CONF.setInt("hbase.rowlock.wait.duration", prevLockTimeout);
6492   }
6493 
6494   @Test
6495   public void testCheckAndRowMutateTimestampsAreMonotonic() throws IOException {
6496     HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);
6497     ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6498     EnvironmentEdgeManager.injectEdge(edge);
6499 
6500     edge.setValue(10);
6501     Put p = new Put(row);
6502     p.setDurability(Durability.SKIP_WAL);
6503     p.addColumn(fam1, qual1, qual1);
6504     region.put(p);
6505 
6506     Result result = region.get(new Get(row));
6507     Cell c = result.getColumnLatestCell(fam1, qual1);
6508     assertNotNull(c);
6509     assertEquals(c.getTimestamp(), 10L);
6510 
6511     edge.setValue(1); // clock goes back
6512     p = new Put(row);
6513     p.setDurability(Durability.SKIP_WAL);
6514     p.addColumn(fam1, qual1, qual2);
6515     RowMutations rm = new RowMutations(row);
6516     rm.add(p);
6517     region.checkAndRowMutate(row, fam1, qual1, CompareOp.EQUAL, new BinaryComparator(qual1),
6518       rm, false);
6519     result = region.get(new Get(row));
6520     c = result.getColumnLatestCell(fam1, qual1);
6521     assertEquals(c.getTimestamp(), 10L);
6522 
6523     assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6524       qual2, 0, qual2.length));
6525   }
6526 
6527   static HRegion initHRegion(byte[] tableName, String callingMethod,
6528       byte[]... families) throws IOException {
6529     return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
6530         families);
6531   }
6532 }