
1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.io.UnsupportedEncodingException;
25  import java.lang.reflect.Constructor;
26  import java.text.ParseException;
27  import java.util.AbstractList;
28  import java.util.ArrayList;
29  import java.util.Arrays;
30  import java.util.Collection;
31  import java.util.Collections;
32  import java.util.HashMap;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.NavigableMap;
36  import java.util.NavigableSet;
37  import java.util.Set;
38  import java.util.TreeMap;
39  import java.util.UUID;
40  import java.util.concurrent.Callable;
41  import java.util.concurrent.CompletionService;
42  import java.util.concurrent.ConcurrentHashMap;
43  import java.util.concurrent.ConcurrentSkipListMap;
44  import java.util.concurrent.CountDownLatch;
45  import java.util.concurrent.ExecutionException;
46  import java.util.concurrent.ExecutorCompletionService;
47  import java.util.concurrent.ExecutorService;
48  import java.util.concurrent.Executors;
49  import java.util.concurrent.Future;
50  import java.util.concurrent.FutureTask;
51  import java.util.concurrent.ThreadFactory;
52  import java.util.concurrent.ThreadPoolExecutor;
53  import java.util.concurrent.TimeUnit;
54  import java.util.concurrent.TimeoutException;
55  import java.util.concurrent.atomic.AtomicBoolean;
56  import java.util.concurrent.atomic.AtomicInteger;
57  import java.util.concurrent.atomic.AtomicLong;
58  import java.util.concurrent.locks.Lock;
59  import java.util.concurrent.locks.ReentrantReadWriteLock;
60  
61  import org.apache.commons.logging.Log;
62  import org.apache.commons.logging.LogFactory;
63  import org.apache.hadoop.classification.InterfaceAudience;
64  import org.apache.hadoop.conf.Configuration;
65  import org.apache.hadoop.fs.FileStatus;
66  import org.apache.hadoop.fs.FileSystem;
67  import org.apache.hadoop.fs.Path;
68  import org.apache.hadoop.hbase.Cell;
69  import org.apache.hadoop.hbase.CellScanner;
70  import org.apache.hadoop.hbase.CellUtil;
71  import org.apache.hadoop.hbase.CompoundConfiguration;
72  import org.apache.hadoop.hbase.DoNotRetryIOException;
73  import org.apache.hadoop.hbase.DroppedSnapshotException;
74  import org.apache.hadoop.hbase.HBaseConfiguration;
75  import org.apache.hadoop.hbase.HColumnDescriptor;
76  import org.apache.hadoop.hbase.HConstants;
77  import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
78  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
79  import org.apache.hadoop.hbase.HRegionInfo;
80  import org.apache.hadoop.hbase.HTableDescriptor;
81  import org.apache.hadoop.hbase.KeyValue;
82  import org.apache.hadoop.hbase.KeyValueUtil;
83  import org.apache.hadoop.hbase.NotServingRegionException;
84  import org.apache.hadoop.hbase.RegionTooBusyException;
85  import org.apache.hadoop.hbase.TableName;
86  import org.apache.hadoop.hbase.UnknownScannerException;
87  import org.apache.hadoop.hbase.backup.HFileArchiver;
88  import org.apache.hadoop.hbase.client.Append;
89  import org.apache.hadoop.hbase.client.Delete;
90  import org.apache.hadoop.hbase.client.Durability;
91  import org.apache.hadoop.hbase.client.Get;
92  import org.apache.hadoop.hbase.client.Increment;
93  import org.apache.hadoop.hbase.client.IsolationLevel;
94  import org.apache.hadoop.hbase.client.Mutation;
95  import org.apache.hadoop.hbase.client.Put;
96  import org.apache.hadoop.hbase.client.Result;
97  import org.apache.hadoop.hbase.client.RowMutations;
98  import org.apache.hadoop.hbase.client.Scan;
99  import org.apache.hadoop.hbase.coprocessor.RegionObserver;
100 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
101 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
102 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
103 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
104 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
105 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
106 import org.apache.hadoop.hbase.filter.FilterWrapper;
107 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
108 import org.apache.hadoop.hbase.io.HeapSize;
109 import org.apache.hadoop.hbase.io.TimeRange;
110 import org.apache.hadoop.hbase.io.hfile.BlockCache;
111 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
112 import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
113 import org.apache.hadoop.hbase.ipc.RpcCallContext;
114 import org.apache.hadoop.hbase.ipc.RpcServer;
115 import org.apache.hadoop.hbase.master.AssignmentManager;
116 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
117 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
118 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
119 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
120 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
121 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
122 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
123 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
124 import org.apache.hadoop.hbase.regionserver.wal.HLog;
125 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
126 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
127 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
128 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.MutationReplay;
129 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
130 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
131 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
132 import org.apache.hadoop.hbase.util.Bytes;
133 import org.apache.hadoop.hbase.util.CancelableProgressable;
134 import org.apache.hadoop.hbase.util.ClassSize;
135 import org.apache.hadoop.hbase.util.CompressionTest;
136 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
137 import org.apache.hadoop.hbase.util.FSUtils;
138 import org.apache.hadoop.hbase.util.HashedBytes;
139 import org.apache.hadoop.hbase.util.Pair;
140 import org.apache.hadoop.hbase.util.Threads;
141 import org.apache.hadoop.io.MultipleIOException;
142 import org.apache.hadoop.util.StringUtils;
143 import org.cliffc.high_scale_lib.Counter;
144 
145 import com.google.common.annotations.VisibleForTesting;
146 import com.google.common.base.Preconditions;
147 import com.google.common.collect.Lists;
148 import com.google.common.collect.Maps;
149 import com.google.common.io.Closeables;
150 import com.google.protobuf.Descriptors;
151 import com.google.protobuf.Message;
152 import com.google.protobuf.RpcCallback;
153 import com.google.protobuf.RpcController;
154 import com.google.protobuf.Service;
155 
156 /**
157  * HRegion stores data for a certain region of a table.  It stores all columns
158  * for each row. A given table consists of one or more HRegions.
159  *
160  * <p>We maintain multiple HStores for a single HRegion.
161  *
162  * <p>A Store holds the column data for one column family; together,
163  * the Stores make up all the data for the rows.
164  *
165  * <p>Each HRegion has a 'startKey' and 'endKey'.
166  * <p>The first is inclusive, the second is exclusive (except for
167  * the final region).  The endKey of region 0 is the same as
168  * startKey for region 1 (if it exists).  The startKey for the
169  * first region is null. The endKey for the final region is null.
170  *
171  * <p>Locking at the HRegion level serves only one purpose: preventing the
172  * region from being closed (and consequently split) while other operations
173  * are ongoing. Each row level operation obtains both a row lock and a region
174  * read lock for the duration of the operation. While a scanner is being
175  * constructed, getScanner holds a read lock. If the scanner is successfully
176  * constructed, it holds a read lock until it is closed. A close takes out a
177  * write lock and consequently will block for ongoing operations and will block
178  * new operations from starting while the close is in progress.
179  *
180  * <p>An HRegion is defined by its table and its key extent.
181  *
182  * <p>It consists of at least one Store.  The number of Stores should be
183  * configurable, so that data which is accessed together is stored in the same
184  * Store.  Right now, we approximate that by building a single Store for
185  * each column family.  (This config info will be communicated via the
186  * tabledesc.)
187  *
188  * <p>The HTableDescriptor contains metainfo about the HRegion's table.
189  * regionName is a unique identifier for this HRegion. [startKey, endKey)
190  * defines the keyspace for this HRegion.
191  */
192 @InterfaceAudience.Private
193 public class HRegion implements HeapSize { // , Writable{
194   public static final Log LOG = LogFactory.getLog(HRegion.class);
195 
196   public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
197       "hbase.hregion.scan.loadColumnFamiliesOnDemand";
198       
199   /**
200    * This is the global default value for durability. All tables/mutations not
201    * defining a durability or using USE_DEFAULT will default to this value.
202    */
203   private static final Durability DEFAULT_DURABLITY = Durability.SYNC_WAL;
204 
205   final AtomicBoolean closed = new AtomicBoolean(false);
206   /* Closing can take some time; use the closing flag if there is stuff we don't
207    * want to do while in closing state; e.g. like offer this region up to the
208    * master as a region to close if the carrying regionserver is overloaded.
209    * Once set, it is never cleared.
210    */
211   final AtomicBoolean closing = new AtomicBoolean(false);
212 
213   protected volatile long completeSequenceId = -1L;
214 
215   /**
216    * Region level sequence Id. It is used for appending WALEdits in HLog. Its default value is -1,
217    * as a marker that the region hasn't opened yet. Once it is opened, it is set to
218    * {@link #openSeqNum}.
219    */
220   private final AtomicLong sequenceId = new AtomicLong(-1L);
221 
222   /**
223    * Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context for
224    * startRegionOperation to possibly invoke different checks before any region operations. Not all
225   * operations have to be defined here. It's only needed when a special check is needed in
226   * startRegionOperation.
227    */
228   public enum Operation {
229     ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE,
230     REPLAY_BATCH_MUTATE, COMPACT_REGION
231   }
232 
233   //////////////////////////////////////////////////////////////////////////////
234   // Members
235   //////////////////////////////////////////////////////////////////////////////
236 
237   // map from a locked row to the context for that lock including:
238   // - CountDownLatch for threads waiting on that row
239   // - the thread that owns the lock (allow reentrancy)
240   // - reference count of (reentrant) locks held by the thread
241   // - the row itself
242   private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
243       new ConcurrentHashMap<HashedBytes, RowLockContext>();
244 
245   protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(
246       Bytes.BYTES_RAWCOMPARATOR);
247 
248   // TODO: account for each registered handler in HeapSize computation
249   private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
250 
251   public final AtomicLong memstoreSize = new AtomicLong(0);
252 
253   // Debug possible data loss due to WAL off
254   final Counter numMutationsWithoutWAL = new Counter();
255   final Counter dataInMemoryWithoutWAL = new Counter();
256 
257   // Debug why CAS operations are taking a while.
258   final Counter checkAndMutateChecksPassed = new Counter();
259   final Counter checkAndMutateChecksFailed = new Counter();
260 
261   //Number of requests
262   final Counter readRequestsCount = new Counter();
263   final Counter writeRequestsCount = new Counter();
264 
265   // Compaction counters
266   final AtomicLong compactionsFinished = new AtomicLong(0L);
267   final AtomicLong compactionNumFilesCompacted = new AtomicLong(0L);
268   final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L);
269 
270 
271   private final HLog log;
272   private final HRegionFileSystem fs;
273   protected final Configuration conf;
274   private final Configuration baseConf;
275   private final KeyValue.KVComparator comparator;
276   private final int rowLockWaitDuration;
277   static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
278 
279   // The internal wait duration to acquire a lock before read/update
280   // from the region. It is not per row. The purpose of this wait time
281   // is to avoid waiting a long time while the region is busy, so that
282   // we can release the IPC handler soon enough to improve the
283   // availability of the region server. It can be adjusted by
284   // tuning configuration "hbase.busy.wait.duration".
285   final long busyWaitDuration;
286   static final long DEFAULT_BUSY_WAIT_DURATION = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
287 
288   // If updating multiple rows in one call, wait longer,
289   // i.e. waiting for busyWaitDuration * # of rows. However,
290   // we can limit the max multiplier.
291   final int maxBusyWaitMultiplier;
292 
293   // Max busy wait duration. There is no point to wait longer than the RPC
294   // purge timeout, when a RPC call will be terminated by the RPC engine.
295   final long maxBusyWaitDuration;
296 
297   // negative number indicates infinite timeout
298   static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
299   final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();
300 
301   private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
302 
303   /**
304    * The sequence ID that was encountered when this region was opened.
305    */
306   private long openSeqNum = HConstants.NO_SEQNUM;
307 
308   /**
309    * The default setting for whether to enable on-demand CF loading for
310    * scan requests to this region. Requests can override it.
311    */
312   private boolean isLoadingCfsOnDemandDefault = false;
313 
314   private final AtomicInteger majorInProgress = new AtomicInteger(0);
315   private final AtomicInteger minorInProgress = new AtomicInteger(0);
316 
317   //
318   // Context: During replay we want to ensure that we do not lose any data. So, we
319   // have to be conservative in how we replay logs. For each store, we calculate
320   // the maxSeqId up to which the store was flushed. And, skip the edits which
321   // are equal to or lower than maxSeqId for each store.
322   // The following map is populated when opening the region
323   Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
324 
325   /**
326    * Config setting for whether to allow writes when a region is in recovering or not.
327    */
328   private boolean disallowWritesInRecovering = false;
329 
330   // when a region is in recovering state, it can only accept writes not reads
331   private volatile boolean isRecovering = false;
332 
333   /**
334    * @return The smallest mvcc readPoint across all the scanners in this
335    * region. Writes older than this readPoint are included in every
336    * read operation.
337    */
338   public long getSmallestReadPoint() {
339     long minimumReadPoint;
340     // We need to ensure that while we are calculating the smallestReadPoint
341     // no new RegionScanners can grab a readPoint that we are unaware of.
342     // We achieve this by synchronizing on the scannerReadPoints object.
343     synchronized(scannerReadPoints) {
344       minimumReadPoint = mvcc.memstoreReadPoint();
345 
346       for (Long readPoint: this.scannerReadPoints.values()) {
347         if (readPoint < minimumReadPoint) {
348           minimumReadPoint = readPoint;
349         }
350       }
351     }
352     return minimumReadPoint;
353   }
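  // Illustrative sketch (an assumption about typical callers, not something asserted by this
  // method): compaction code can consult the smallest read point so that cells older than every
  // open scanner can be rewritten without their mvcc/memstoreTS metadata, e.g.
  //
  //   long smallestReadPoint = region.getSmallestReadPoint();
  //   // a cell whose memstoreTS <= smallestReadPoint is visible to all current readers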
354   /*
355    * Data structure of write state flags used to coordinate flushes,
356    * compactions and closes.
357    */
358   static class WriteState {
359     // Set while a memstore flush is happening.
360     volatile boolean flushing = false;
361     // Set when a flush has been requested.
362     volatile boolean flushRequested = false;
363     // Number of compactions running.
364     volatile int compacting = 0;
365     // Gets set in close. If set, cannot compact or flush again.
366     volatile boolean writesEnabled = true;
367     // Set if region is read-only
368     volatile boolean readOnly = false;
369 
370     /**
371      * Set flags that make this region read-only.
372      *
373      * @param onOff flip value for region r/o setting
374      */
375     synchronized void setReadOnly(final boolean onOff) {
376       this.writesEnabled = !onOff;
377       this.readOnly = onOff;
378     }
379 
380     boolean isReadOnly() {
381       return this.readOnly;
382     }
383 
384     boolean isFlushRequested() {
385       return this.flushRequested;
386     }
387 
388     static final long HEAP_SIZE = ClassSize.align(
389         ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
390   }
391 
392   /**
393    * Objects from this class are created when flushing to describe all the different states that
394    * the flush method ends up in. The Result enum describes those states. The sequence id should only
395    * be specified if the flush was successful, and the failure message should only be specified
396    * if it didn't flush.
397    */
398   public static class FlushResult {
399     enum Result {
400       FLUSHED_NO_COMPACTION_NEEDED,
401       FLUSHED_COMPACTION_NEEDED,
402       // Special case where a flush didn't run because there's nothing in the memstores. Used when
403       // bulk loading to know when we can still load even if a flush didn't happen.
404       CANNOT_FLUSH_MEMSTORE_EMPTY,
405       CANNOT_FLUSH
406       // Be careful adding more to this enum, look at the below methods to make sure
407     }
408 
409     final Result result;
410     final String failureReason;
411     final long flushSequenceId;
412 
413     /**
414      * Convenience constructor to use when the flush is successful; the failure message is set to
415      * null.
416      * @param result Expecting FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
417      * @param flushSequenceId Generated sequence id that comes right after the edits in the
418      *                        memstores.
419      */
420     FlushResult(Result result, long flushSequenceId) {
421       this(result, flushSequenceId, null);
422       assert result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
423           .FLUSHED_COMPACTION_NEEDED;
424     }
425 
426     /**
427      * Convenience constructor to use when we cannot flush.
428      * @param result Expecting CANNOT_FLUSH_MEMSTORE_EMPTY or CANNOT_FLUSH.
429      * @param failureReason Reason why we couldn't flush.
430      */
431     FlushResult(Result result, String failureReason) {
432       this(result, -1, failureReason);
433       assert result == Result.CANNOT_FLUSH_MEMSTORE_EMPTY || result == Result.CANNOT_FLUSH;
434     }
435 
436     /**
437      * Constructor with all the parameters.
438      * @param result Any of the Result values.
439      * @param flushSequenceId Generated sequence id if the memstores were flushed else -1.
440      * @param failureReason Reason why we couldn't flush, or null.
441      */
442     FlushResult(Result result, long flushSequenceId, String failureReason) {
443       this.result = result;
444       this.flushSequenceId = flushSequenceId;
445       this.failureReason = failureReason;
446     }
447 
448     /**
449      * Convenience method, the equivalent of checking if result is
450      * FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
451      * @return true if the memstores were flushed, else false.
452      */
453     public boolean isFlushSucceeded() {
454       return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
455           .FLUSHED_COMPACTION_NEEDED;
456     }
457 
458     /**
459      * Convenience method, the equivalent of checking if result is FLUSHED_COMPACTION_NEEDED.
460      * @return True if the flush requested a compaction, else false (which doesn't even imply that a flush happened).
461      */
462     public boolean isCompactionNeeded() {
463       return result == Result.FLUSHED_COMPACTION_NEEDED;
464     }
465   }
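
  // A minimal usage sketch for FlushResult (illustrative only; the flush entry point and the
  // compaction-requesting caller named below are assumptions, not part of this class):
  //
  //   FlushResult fr = region.flushcache();                    // assumed flush entry point
  //   if (fr.isFlushSucceeded() && fr.isCompactionNeeded()) {
  //     // ask the compaction machinery to look at this region (hypothetical caller)
  //     compactSplitThread.requestCompaction(region, "post-flush");
  //   }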
466 
467   final WriteState writestate = new WriteState();
468 
469   long memstoreFlushSize;
470   final long timestampSlop;
471   final long rowProcessorTimeout;
472   private volatile long lastFlushTime;
473   final RegionServerServices rsServices;
474   private RegionServerAccounting rsAccounting;
475   private List<Pair<Long, Long>> recentFlushes = new ArrayList<Pair<Long,Long>>();
476   private long flushCheckInterval;
477   // flushPerChanges prevents too many changes from accumulating in the memstore
478   private long flushPerChanges;
479   private long blockingMemStoreSize;
480   final long threadWakeFrequency;
481   // Used to guard closes
482   final ReentrantReadWriteLock lock =
483     new ReentrantReadWriteLock();
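  // Sketch of the locking pattern described in the class Javadoc (simplified; the real
  // bookkeeping lives in startRegionOperation/closeRegionOperation and doClose):
  //
  //   lock.readLock().lock();        // every row-level operation shares the region lock
  //   try {
  //     // ... perform the get/put/scan ...
  //   } finally {
  //     lock.readLock().unlock();
  //   }
  //   // close() acquires lock.writeLock(), so it waits for in-flight operations and
  //   // blocks new ones from starting while the close is in progress.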
484 
485   // Stop updates lock
486   private final ReentrantReadWriteLock updatesLock =
487     new ReentrantReadWriteLock();
488   private boolean splitRequest;
489   private byte[] explicitSplitPoint = null;
490 
491   private final MultiVersionConsistencyControl mvcc =
492       new MultiVersionConsistencyControl();
493 
494   // Coprocessor host
495   private RegionCoprocessorHost coprocessorHost;
496 
497   private HTableDescriptor htableDescriptor = null;
498   private RegionSplitPolicy splitPolicy;
499 
500   private final MetricsRegion metricsRegion;
501   private final MetricsRegionWrapperImpl metricsRegionWrapper;
502   private final Durability durability;
503 
504   /**
505    * HRegion constructor. This constructor should only be used for testing and
506    * extensions.  Instances of HRegion should be instantiated with the
507    * {@link HRegion#createHRegion} or {@link HRegion#openHRegion} method.
508    *
509    * @param tableDir qualified path of directory where region should be located,
510    * usually the table directory.
511    * @param log The HLog is the outbound log for any updates to the HRegion
512    * (There's a single HLog for all the HRegions on a single HRegionServer.)
513    * The log file is a logfile from the previous execution that's
514    * custom-computed for this HRegion. The HRegionServer computes and sorts the
515    * appropriate log info for this HRegion. If there is a previous log file
516    * (implying that the HRegion has been written-to before), then read it from
517    * the supplied path.
518    * @param fs is the filesystem.
519    * @param confParam is global configuration settings.
520    * @param regionInfo - HRegionInfo that describes the region
522    * @param htd the table descriptor
523    * @param rsServices reference to {@link RegionServerServices} or null
524    */
525   @Deprecated
526   public HRegion(final Path tableDir, final HLog log, final FileSystem fs,
527       final Configuration confParam, final HRegionInfo regionInfo,
528       final HTableDescriptor htd, final RegionServerServices rsServices) {
529     this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
530       log, confParam, htd, rsServices);
531   }
532 
533   /**
534    * HRegion constructor. This constructor should only be used for testing and
535    * extensions.  Instances of HRegion should be instantiated with the
536    * {@link HRegion#createHRegion} or {@link HRegion#openHRegion} method.
537    *
538    * @param fs is the filesystem.
539    * @param log The HLog is the outbound log for any updates to the HRegion
540    * (There's a single HLog for all the HRegions on a single HRegionServer.)
541    * The log file is a logfile from the previous execution that's
542    * custom-computed for this HRegion. The HRegionServer computes and sorts the
543    * appropriate log info for this HRegion. If there is a previous log file
544    * (implying that the HRegion has been written-to before), then read it from
545    * the supplied path.
546    * @param confParam is global configuration settings.
547    * @param htd the table descriptor
548    * @param rsServices reference to {@link RegionServerServices} or null
549    */
550   public HRegion(final HRegionFileSystem fs, final HLog log, final Configuration confParam,
551       final HTableDescriptor htd, final RegionServerServices rsServices) {
552     if (htd == null) {
553       throw new IllegalArgumentException("Need table descriptor");
554     }
555 
556     if (confParam instanceof CompoundConfiguration) {
557       throw new IllegalArgumentException("Need original base configuration");
558     }
559 
560     this.comparator = fs.getRegionInfo().getComparator();
561     this.log = log;
562     this.fs = fs;
563 
564     // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor
565     this.baseConf = confParam;
566     this.conf = new CompoundConfiguration()
567       .add(confParam)
568       .addStringMap(htd.getConfiguration())
569       .addWritableMap(htd.getValues());
570     this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
571         DEFAULT_CACHE_FLUSH_INTERVAL);
572     this.flushPerChanges = conf.getLong(MEMSTORE_FLUSH_PER_CHANGES, DEFAULT_FLUSH_PER_CHANGES);
573     if (this.flushPerChanges > MAX_FLUSH_PER_CHANGES) {
574       throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed "
575           + MAX_FLUSH_PER_CHANGES);
576     }
577     
578     this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
579                     DEFAULT_ROWLOCK_WAIT_DURATION);
580 
581     this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
582     this.htableDescriptor = htd;
583     this.rsServices = rsServices;
584     this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
585     setHTableSpecificConf();
586     this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
587 
588     this.busyWaitDuration = conf.getLong(
589       "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
590     this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
591     if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
592       throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
593         + busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
594         + maxBusyWaitMultiplier + "). Their product should be positive");
595     }
596     this.maxBusyWaitDuration = conf.getLong("hbase.ipc.client.call.purge.timeout",
597       conf.getLong("ipc.client.call.purge.timeout", 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
598 
599     /*
600      * timestamp.slop provides a server-side constraint on the timestamp. This
601      * assumes that you base your TS around currentTimeMillis(). In this case,
602      * throw an error to the user if the user-specified TS is newer than now +
603      * slop. LATEST_TIMESTAMP == don't use this functionality
604      */
605     this.timestampSlop = conf.getLong(
606         "hbase.hregion.keyvalue.timestamp.slop.millisecs",
607         HConstants.LATEST_TIMESTAMP);
608 
609     /**
610      * Timeout for the process time in processRowsWithLocks().
611      * Use -1 to switch off time bound.
612      */
613     this.rowProcessorTimeout = conf.getLong(
614         "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
615     this.durability = htd.getDurability() == Durability.USE_DEFAULT
616         ? DEFAULT_DURABLITY
617         : htd.getDurability();
618     if (rsServices != null) {
619       this.rsAccounting = this.rsServices.getRegionServerAccounting();
620       // don't initialize coprocessors if not running within a regionserver
621       // TODO: revisit if coprocessors should load in other cases
622       this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
623       this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
624       this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper);
625 
626       Map<String, HRegion> recoveringRegions = rsServices.getRecoveringRegions();
627       String encodedName = getRegionInfo().getEncodedName();
628       if (recoveringRegions != null && recoveringRegions.containsKey(encodedName)) {
629         this.isRecovering = true;
630         recoveringRegions.put(encodedName, this);
631       }
632     } else {
633       this.metricsRegionWrapper = null;
634       this.metricsRegion = null;
635     }
636     if (LOG.isDebugEnabled()) {
637       // Write out region name as string and its encoded name.
638       LOG.debug("Instantiated " + this);
639     }
640 
641     // by default, we allow writes against a region when it's in recovering
642     this.disallowWritesInRecovering =
643         conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING,
644           HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG);
645   }
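
  // Illustrative sketch of the configuration layering built in the constructor above
  // (assumption: in CompoundConfiguration, maps added later take precedence over earlier ones),
  // showing how a per-table override in the HTableDescriptor wins over the base Configuration:
  //
  //   Configuration base = HBaseConfiguration.create();
  //   HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("example_table"));
  //   htd.setConfiguration("hbase.hregion.memstore.flush.size", "268435456"); // 256 MB, example value
  //   // A region constructed with (base, htd) reads 268435456 for that key from this.conf,
  //   // while other regions keep the cluster-wide default from 'base'.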
646 
647   void setHTableSpecificConf() {
648     if (this.htableDescriptor == null) return;
649     long flushSize = this.htableDescriptor.getMemStoreFlushSize();
650 
651     if (flushSize <= 0) {
652       flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
653         HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
654     }
655     this.memstoreFlushSize = flushSize;
656     this.blockingMemStoreSize = this.memstoreFlushSize *
657         conf.getLong("hbase.hregion.memstore.block.multiplier", 2);
658   }
659 
660   /**
661    * Initialize this region.
662    * Used only by tests and SplitTransaction to reopen the region.
663    * You should use createHRegion() or openHRegion()
664    * @return What the next sequence (edit) id should be.
665    * @throws IOException e
666    * @deprecated use HRegion.createHRegion() or HRegion.openHRegion()
667    */
668   @Deprecated
669   public long initialize() throws IOException {
670     return initialize(null);
671   }
672 
673   /**
674    * Initialize this region.
675    *
676    * @param reporter Tickle every so often if initialize is taking a while.
677    * @return What the next sequence (edit) id should be.
678    * @throws IOException e
679    */
680   private long initialize(final CancelableProgressable reporter) throws IOException {
681     MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
682     long nextSeqId = -1;
683     try {
684       nextSeqId = initializeRegionInternals(reporter, status);
685       return nextSeqId;
686     } finally {
687       // nextSeqid will be -1 if the initialization fails.
688       // Otherwise it will be at least 0.
689       if (nextSeqId == -1) {
690         status
691             .abort("Exception during region " + this.getRegionNameAsString() + " initialization.");
692       }
693     }
694   }
695 
696   private long initializeRegionInternals(final CancelableProgressable reporter,
697       final MonitoredTask status) throws IOException, UnsupportedEncodingException {
698     if (coprocessorHost != null) {
699       status.setStatus("Running coprocessor pre-open hook");
700       coprocessorHost.preOpen();
701     }
702 
703     // Write HRI to a file in case we need to recover hbase:meta
704     status.setStatus("Writing region info on filesystem");
705     fs.checkRegionInfoOnFilesystem();
706 
707     // Remove temporary data left over from old regions
708     status.setStatus("Cleaning up temporary data from old regions");
709     fs.cleanupTempDir();
710 
711     // Initialize all the HStores
712     status.setStatus("Initializing all the Stores");
713     long maxSeqId = initializeRegionStores(reporter, status);
714 
715     status.setStatus("Cleaning up detritus from prior splits");
716     // Get rid of any splits or merges that were lost in-progress.  Clean out
717     // these directories here on open.  We may be opening a region that was
718     // being split but we crashed in the middle of it all.
719     fs.cleanupAnySplitDetritus();
720     fs.cleanupMergesDir();
721 
722     this.writestate.setReadOnly(this.htableDescriptor.isReadOnly());
723     this.writestate.flushRequested = false;
724     this.writestate.compacting = 0;
725 
726     // Initialize split policy
727     this.splitPolicy = RegionSplitPolicy.create(this, conf);
728 
729     this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
730     // Use maximum of log sequenceid or that which was found in stores
731     // (particularly if no recovered edits, seqid will be -1).
732     long nextSeqid = maxSeqId + 1;
733     if (this.isRecovering) {
734       // In distributedLogReplay mode, we don't know the last change sequence number because region
735       // is opened before recovery completes. So we add a safety bumper to avoid new sequence numbers
736       // overlapping already-used sequence numbers
737       nextSeqid += this.flushPerChanges + 10000000; // add another extra 10million
738     }
739     LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() +
740       "; next sequenceid=" + nextSeqid);
741 
742     // A region can be reopened if failed a split; reset flags
743     this.closing.set(false);
744     this.closed.set(false);
745 
746     if (coprocessorHost != null) {
747       status.setStatus("Running coprocessor post-open hooks");
748       coprocessorHost.postOpen();
749     }
750 
751     status.markComplete("Region opened successfully");
752     return nextSeqid;
753   }
754 
755   private long initializeRegionStores(final CancelableProgressable reporter, MonitoredTask status)
756       throws IOException, UnsupportedEncodingException {
757     // Load in all the HStores.
758 
759     long maxSeqId = -1;
760     // initialized to -1 so that we pick up MemstoreTS from column families
761     long maxMemstoreTS = -1;
762 
763     if (!htableDescriptor.getFamilies().isEmpty()) {
764       // initialize the thread pool for opening stores in parallel.
765       ThreadPoolExecutor storeOpenerThreadPool =
766         getStoreOpenAndCloseThreadPool("StoreOpener-" + this.getRegionInfo().getShortNameToLog());
767       CompletionService<HStore> completionService =
768         new ExecutorCompletionService<HStore>(storeOpenerThreadPool);
769 
770       // initialize each store in parallel
771       for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
772         status.setStatus("Instantiating store for column family " + family);
773         completionService.submit(new Callable<HStore>() {
774           @Override
775           public HStore call() throws IOException {
776             return instantiateHStore(family);
777           }
778         });
779       }
780       boolean allStoresOpened = false;
781       try {
782         for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
783           Future<HStore> future = completionService.take();
784           HStore store = future.get();
785           this.stores.put(store.getColumnFamilyName().getBytes(), store);
786 
787           long storeMaxSequenceId = store.getMaxSequenceId();
788           maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
789               storeMaxSequenceId);
790           if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) {
791             maxSeqId = storeMaxSequenceId;
792           }
793           long maxStoreMemstoreTS = store.getMaxMemstoreTS();
794           if (maxStoreMemstoreTS > maxMemstoreTS) {
795             maxMemstoreTS = maxStoreMemstoreTS;
796           }
797         }
798         allStoresOpened = true;
799       } catch (InterruptedException e) {
800         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
801       } catch (ExecutionException e) {
802         throw new IOException(e.getCause());
803       } finally {
804         storeOpenerThreadPool.shutdownNow();
805         if (!allStoresOpened) {
806           // something went wrong, close all opened stores
807           LOG.error("Could not initialize all stores for the region=" + this);
808           for (Store store : this.stores.values()) {
809             try {
810               store.close();
811             } catch (IOException e) { 
812               LOG.warn(e.getMessage());
813             }
814           }
815         }
816       }
817     }
818     mvcc.initialize(maxMemstoreTS + 1);
819     // Recover any edits if available.
820     maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
821         this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
822     return maxSeqId;
823   }
824 
825   /**
826    * @return True if this region has references.
827    */
828   public boolean hasReferences() {
829     for (Store store : this.stores.values()) {
830       if (store.hasReferences()) return true;
831     }
832     return false;
833   }
834 
835   /**
836    * This function will return the HDFS blocks distribution based on the data
837    * captured when the HFile is created.
838    * @return The HDFS blocks distribution for the region.
839    */
840   public HDFSBlocksDistribution getHDFSBlocksDistribution() {
841     HDFSBlocksDistribution hdfsBlocksDistribution =
842       new HDFSBlocksDistribution();
843     synchronized (this.stores) {
844       for (Store store : this.stores.values()) {
845         for (StoreFile sf : store.getStorefiles()) {
846           HDFSBlocksDistribution storeFileBlocksDistribution =
847             sf.getHDFSBlockDistribution();
848           hdfsBlocksDistribution.add(storeFileBlocksDistribution);
849         }
850       }
851     }
852     return hdfsBlocksDistribution;
853   }
854 
855   /**
856    * This is a helper function to compute HDFS block distribution on demand
857    * @param conf configuration
858    * @param tableDescriptor HTableDescriptor of the table
859    * @param regionInfo HRegionInfo describing the region
860    * @return The HDFS blocks distribution for the given region.
861    * @throws IOException
862    */
863   public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
864       final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException {
865     Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
866     return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath);
867   }
868 
869   /**
870    * This is a helper function to compute HDFS block distribution on demand
871    * @param conf configuration
872    * @param tableDescriptor HTableDescriptor of the table
873    * @param regionInfo HRegionInfo describing the region
874    * @param tablePath the table directory
875    * @return The HDFS blocks distribution for the given region.
876    * @throws IOException
877    */
878   public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
879       final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo,  Path tablePath)
880       throws IOException {
881     HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
882     FileSystem fs = tablePath.getFileSystem(conf);
883 
884     HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo);
885     for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
886       Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family.getNameAsString());
887       if (storeFiles == null) continue;
888 
889       for (StoreFileInfo storeFileInfo : storeFiles) {
890         hdfsBlocksDistribution.add(storeFileInfo.computeHDFSBlocksDistribution(fs));
891       }
892     }
893     return hdfsBlocksDistribution;
894   }
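
  // Hedged usage sketch for the block-distribution helpers above (the host name and the
  // locality call are assumptions about HDFSBlocksDistribution's API, shown only for illustration):
  //
  //   HDFSBlocksDistribution dist =
  //       HRegion.computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo);
  //   float locality = dist.getBlockLocalityIndex("datanode-host.example.com");
  //   // locality close to 1.0 means the region's HFile blocks are mostly local to that host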
895 
896   public AtomicLong getMemstoreSize() {
897     return memstoreSize;
898   }
899 
900   /**
901    * Increase the size of the memstore in this region and the size of the global
902    * memstore.
903    * @param memStoreSize the delta by which to grow the memstore size
904    * @return the new size of the memstore in this region
905    */
906   public long addAndGetGlobalMemstoreSize(long memStoreSize) {
907     if (this.rsAccounting != null) {
908       rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
909     }
910     return this.memstoreSize.addAndGet(memStoreSize);
911   }
912 
913   /** @return a HRegionInfo object for this region */
914   public HRegionInfo getRegionInfo() {
915     return this.fs.getRegionInfo();
916   }
917 
918   /**
919    * @return Instance of {@link RegionServerServices} used by this HRegion.
920    * Can be null.
921    */
922   RegionServerServices getRegionServerServices() {
923     return this.rsServices;
924   }
925 
926   /** @return readRequestsCount for this region */
927   long getReadRequestsCount() {
928     return this.readRequestsCount.get();
929   }
930 
931   /** @return writeRequestsCount for this region */
932   long getWriteRequestsCount() {
933     return this.writeRequestsCount.get();
934   }
935 
936   MetricsRegion getMetrics() {
937     return metricsRegion;
938   }
939 
940   /** @return true if region is closed */
941   public boolean isClosed() {
942     return this.closed.get();
943   }
944 
945   /**
946    * @return True if closing process has started.
947    */
948   public boolean isClosing() {
949     return this.closing.get();
950   }
951 
952   /**
953    * Reset the recovering state of the current region.
954    * @param newState the new recovering state
955    */
956   public void setRecovering(boolean newState) {
957     boolean wasRecovering = this.isRecovering;
958     this.isRecovering = newState;
959     if (wasRecovering && !isRecovering) {
960       // Call only when log replay is over.
961       coprocessorHost.postLogReplay();
962     }
963   }
964 
965   /**
966    * @return True if the current region is in recovering state
967    */
968   public boolean isRecovering() {
969     return this.isRecovering;
970   }
971 
972   /** @return true if region is available (not closed and not closing) */
973   public boolean isAvailable() {
974     return !isClosed() && !isClosing();
975   }
976 
977   /** @return true if region is splittable */
978   public boolean isSplittable() {
979     return isAvailable() && !hasReferences();
980   }
981 
982   /**
983    * @return true if region is mergeable
984    */
985   public boolean isMergeable() {
986     if (!isAvailable()) {
987       LOG.debug("Region " + this.getRegionNameAsString()
988           + " is not mergeable because it is closing or closed");
989       return false;
990     }
991     if (hasReferences()) {
992       LOG.debug("Region " + this.getRegionNameAsString()
993           + " is not mergeable because it has references");
994       return false;
995     }
996 
997     return true;
998   }
999 
1000   public boolean areWritesEnabled() {
1001     synchronized(this.writestate) {
1002       return this.writestate.writesEnabled;
1003     }
1004   }
1005 
1006   public MultiVersionConsistencyControl getMVCC() {
1007     return mvcc;
1008   }
1009 
1010   /*
1011    * Returns readpoint considering given IsolationLevel
1012    */
1013   public long getReadpoint(IsolationLevel isolationLevel) {
1014     if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
1015       // This scan can read even uncommitted transactions
1016       return Long.MAX_VALUE;
1017     }
1018     return mvcc.memstoreReadPoint();
1019   }
1020 
1021   public boolean isLoadingCfsOnDemandDefault() {
1022     return this.isLoadingCfsOnDemandDefault;
1023   }
1024 
1025   /**
1026    * Close down this HRegion.  Flush the cache, shut down each HStore, don't
1027    * service any more calls.
1028    *
1029    * <p>This method could take some time to execute, so don't call it from a
1030    * time-sensitive thread.
1031    *
1032   * @return Map of all the storage files that the HRegion's component
1033   * HStores make use of, keyed by column family name.  Can be null if
1034   * we are not to close at this time or we are already closed.
1035    *
1036    * @throws IOException e
1037    */
1038   public Map<byte[], List<StoreFile>> close() throws IOException {
1039     return close(false);
1040   }
1041 
1042   private final Object closeLock = new Object();
1043 
1044   /** Conf key for the periodic flush interval */
1045   public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL =
1046       "hbase.regionserver.optionalcacheflushinterval";
1047   /** Default interval, in milliseconds, for the periodic memstore flush */
1048   public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;
1049 
1050   /** Conf key to force a flush if there are already enough changes for one region in memstore */
1051   public static final String MEMSTORE_FLUSH_PER_CHANGES =
1052       "hbase.regionserver.flush.per.changes";
1053   public static final long DEFAULT_FLUSH_PER_CHANGES = 30000000; // 30 million
1054   /**
1055    * The following MAX_FLUSH_PER_CHANGES is large enough because each KeyValue has 20+ bytes
1056    * overhead. Therefore, even 1G (a billion) empty KVs occupy at least 20GB of memstore for a single region.
1057    */
1058   public static final long MAX_FLUSH_PER_CHANGES = 1000000000; // 1G
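
  // Illustrative tuning sketch (values are arbitrary examples, not recommendations): these
  // keys are read in the HRegion constructor, so they must be set before the region is opened.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setLong(HRegion.MEMSTORE_FLUSH_PER_CHANGES, 10000000L);         // flush after ~10M edits
  //   conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 2 * 3600000);  // 2 hours, example value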
1059 
1060   /**
1061   * Close down this HRegion.  Flush the cache unless the abort parameter is true,
1062   * shut down each HStore, and don't service any more calls.
1063    *
1064    * This method could take some time to execute, so don't call it from a
1065    * time-sensitive thread.
1066    *
1067    * @param abort true if server is aborting (only during testing)
1068   * @return Map of all the storage files that the HRegion's component
1069   * HStores make use of, keyed by column family name.  Can be null if
1070    * we are not to close at this time or we are already closed.
1071    *
1072    * @throws IOException e
1073    */
1074   public Map<byte[], List<StoreFile>> close(final boolean abort) throws IOException {
1075     // Only allow one thread to close at a time. Serialize them so dual
1076     // threads attempting to close will run up against each other.
1077     MonitoredTask status = TaskMonitor.get().createStatus(
1078         "Closing region " + this +
1079         (abort ? " due to abort" : ""));
1080 
1081     status.setStatus("Waiting for close lock");
1082     try {
1083       synchronized (closeLock) {
1084         return doClose(abort, status);
1085       }
1086     } finally {
1087       status.cleanup();
1088     }
1089   }
1090 
1091   private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
1092       throws IOException {
1093     if (isClosed()) {
1094       LOG.warn("Region " + this + " already closed");
1095       return null;
1096     }
1097 
1098     if (coprocessorHost != null) {
1099       status.setStatus("Running coprocessor pre-close hooks");
1100       this.coprocessorHost.preClose(abort);
1101     }
1102 
1103     status.setStatus("Disabling compacts and flushes for region");
1104     synchronized (writestate) {
1105       // Disable compacting and flushing by background threads for this
1106       // region.
1107       writestate.writesEnabled = false;
1108       LOG.debug("Closing " + this + ": disabling compactions & flushes");
1109       waitForFlushesAndCompactions();
1110     }
1111     // If we were not just flushing, is it worth doing a preflush...one
1112     // that will clear out the bulk of the memstore before we put up
1113     // the close flag?
1114     if (!abort && worthPreFlushing()) {
1115       status.setStatus("Pre-flushing region before close");
1116       LOG.info("Running close preflush of " + this.getRegionNameAsString());
1117       try {
1118         internalFlushcache(status);
1119       } catch (IOException ioe) {
1120         // Failed to flush the region. Keep going.
1121         status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
1122       }
1123     }
1124 
1125     this.closing.set(true);
1126     status.setStatus("Disabling writes for close");
1127     // block waiting for the lock for closing
1128     lock.writeLock().lock();
1129     try {
1130       if (this.isClosed()) {
1131         status.abort("Already got closed by another process");
1132         // SplitTransaction handles the null
1133         return null;
1134       }
1135       LOG.debug("Updates disabled for region " + this);
1136       // Don't flush the cache if we are aborting
1137       if (!abort) {
1138         int flushCount = 0;
1139         while (this.getMemstoreSize().get() > 0) {
1140           try {
1141             if (flushCount++ > 0) {
1142               int actualFlushes = flushCount - 1;
1143               if (actualFlushes > 5) {
1144                 // If we tried 5 times and are unable to clear memory, abort
1145                 // so we do not lose data
1146                 throw new DroppedSnapshotException("Failed clearing memory after " +
1147                   actualFlushes + " attempts on region: " + Bytes.toStringBinary(getRegionName()));
1148               } 
1149               LOG.info("Running extra flush, " + actualFlushes +
1150                 " (carrying snapshot?) " + this);
1151             }
1152             internalFlushcache(status);
1153           } catch (IOException ioe) {
1154             status.setStatus("Failed flush " + this + ", putting online again");
1155             synchronized (writestate) {
1156               writestate.writesEnabled = true;
1157             }
1158             // Have to throw to upper layers.  I can't abort server from here.
1159             throw ioe;
1160           }
1161         }
1162       }
1163 
1164       Map<byte[], List<StoreFile>> result =
1165         new TreeMap<byte[], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
1166       if (!stores.isEmpty()) {
1167         // initialize the thread pool for closing stores in parallel.
1168         ThreadPoolExecutor storeCloserThreadPool =
1169           getStoreOpenAndCloseThreadPool("StoreCloserThread-" + this.getRegionNameAsString());
1170         CompletionService<Pair<byte[], Collection<StoreFile>>> completionService =
1171           new ExecutorCompletionService<Pair<byte[], Collection<StoreFile>>>(storeCloserThreadPool);
1172 
1173         // close each store in parallel
1174         for (final Store store : stores.values()) {
1175           assert abort? true: store.getFlushableSize() == 0;
1176           completionService
1177               .submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
1178                 @Override
1179                 public Pair<byte[], Collection<StoreFile>> call() throws IOException {
1180                   return new Pair<byte[], Collection<StoreFile>>(
1181                     store.getFamily().getName(), store.close());
1182                 }
1183               });
1184         }
1185         try {
1186           for (int i = 0; i < stores.size(); i++) {
1187             Future<Pair<byte[], Collection<StoreFile>>> future = completionService.take();
1188             Pair<byte[], Collection<StoreFile>> storeFiles = future.get();
1189             List<StoreFile> familyFiles = result.get(storeFiles.getFirst());
1190             if (familyFiles == null) {
1191               familyFiles = new ArrayList<StoreFile>();
1192               result.put(storeFiles.getFirst(), familyFiles);
1193             }
1194             familyFiles.addAll(storeFiles.getSecond());
1195           }
1196         } catch (InterruptedException e) {
1197           throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1198         } catch (ExecutionException e) {
1199           throw new IOException(e.getCause());
1200         } finally {
1201           storeCloserThreadPool.shutdownNow();
1202         }
1203       }
1204       this.closed.set(true);
1205       if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get());
1206       if (coprocessorHost != null) {
1207         status.setStatus("Running coprocessor post-close hooks");
1208         this.coprocessorHost.postClose(abort);
1209       }
1210       if ( this.metricsRegion != null) {
1211         this.metricsRegion.close();
1212       }
1213       if ( this.metricsRegionWrapper != null) {
1214         Closeables.closeQuietly(this.metricsRegionWrapper);
1215       }
1216       status.markComplete("Closed");
1217       LOG.info("Closed " + this);
1218       return result;
1219     } finally {
1220       lock.writeLock().unlock();
1221     }
1222   }
1223 
1224   /**
1225    * Wait for all current flushes and compactions of the region to complete.
1226    * <p>
1227    * Exposed for TESTING.
1228    */
1229   public void waitForFlushesAndCompactions() {
1230     synchronized (writestate) {
1231       while (writestate.compacting > 0 || writestate.flushing) {
1232         LOG.debug("waiting for " + writestate.compacting + " compactions"
1233             + (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
1234         try {
1235           writestate.wait();
1236         } catch (InterruptedException iex) {
1237           // essentially ignore and propagate the interrupt back up
1238           Thread.currentThread().interrupt();
1239         }
1240       }
1241     }
1242   }
1243 
1244   protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
1245       final String threadNamePrefix) {
1246     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1247     int maxThreads = Math.min(numStores,
1248         conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1249             HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
1250     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1251   }
1252 
1253   protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
1254       final String threadNamePrefix) {
1255     int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1256     int maxThreads = Math.max(1,
1257         conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1258             HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
1259             / numStores);
1260     return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1261   }
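
  // Worked example for the two pool-sizing methods above, assuming 4 column families and
  // hbase.hstore.open.and.close.threads.max = 16 (a hypothetical setting, not the default):
  //   getStoreOpenAndCloseThreadPool:     min(4, 16)     = 4 threads (one per store)
  //   getStoreFileOpenAndCloseThreadPool: max(1, 16 / 4) = 4 threads per store's files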
1262 
1263   static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
1264       final String threadNamePrefix) {
1265     return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
1266       new ThreadFactory() {
1267         private int count = 1;
1268 
1269         @Override
1270         public Thread newThread(Runnable r) {
1271           return new Thread(r, threadNamePrefix + "-" + count++);
1272         }
1273       });
1274   }
1275 
1276    /**
1277     * @return True if it's worth doing a flush before we put up the close flag.
1278     */
1279   private boolean worthPreFlushing() {
1280     return this.memstoreSize.get() >
1281       this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
1282   }
1283 
1284   //////////////////////////////////////////////////////////////////////////////
1285   // HRegion accessors
1286   //////////////////////////////////////////////////////////////////////////////
1287 
1288   /** @return start key for region */
1289   public byte [] getStartKey() {
1290     return this.getRegionInfo().getStartKey();
1291   }
1292 
1293   /** @return end key for region */
1294   public byte [] getEndKey() {
1295     return this.getRegionInfo().getEndKey();
1296   }
1297 
1298   /** @return region id */
1299   public long getRegionId() {
1300     return this.getRegionInfo().getRegionId();
1301   }
1302 
1303   /** @return region name */
1304   public byte [] getRegionName() {
1305     return this.getRegionInfo().getRegionName();
1306   }
1307 
1308   /** @return region name as string for logging */
1309   public String getRegionNameAsString() {
1310     return this.getRegionInfo().getRegionNameAsString();
1311   }
1312 
1313   /** @return HTableDescriptor for this region */
1314   public HTableDescriptor getTableDesc() {
1315     return this.htableDescriptor;
1316   }
1317 
1318   /** @return HLog in use for this region */
1319   public HLog getLog() {
1320     return this.log;
1321   }
1322 
1323   /**
1324    * A split takes the config from the parent region and passes it to the daughter
1325    * region's constructor. If 'conf' were passed, the daughter would end up using the
1326    * HTD of the parent region in addition to its own new HTD. Pass 'baseConf' to the
1327    * daughter regions to avoid this tricky dedupe problem.
1328    * @return Configuration object
1329    */
1330   Configuration getBaseConf() {
1331     return this.baseConf;
1332   }
1333 
1334   /** @return {@link FileSystem} being used by this region */
1335   public FileSystem getFilesystem() {
1336     return fs.getFileSystem();
1337   }
1338 
1339   /** @return the {@link HRegionFileSystem} used by this region */
1340   public HRegionFileSystem getRegionFileSystem() {
1341     return this.fs;
1342   }
1343 
1344   /** @return the last time the region was flushed */
1345   public long getLastFlushTime() {
1346     return this.lastFlushTime;
1347   }
1348 
1349   //////////////////////////////////////////////////////////////////////////////
1350   // HRegion maintenance.
1351   //
1352   // These methods are meant to be called periodically by the HRegionServer for
1353   // upkeep.
1354   //////////////////////////////////////////////////////////////////////////////
1355 
1356   /** @return size of the largest HStore. */
1357   public long getLargestHStoreSize() {
1358     long size = 0;
1359     for (Store h : stores.values()) {
1360       long storeSize = h.getSize();
1361       if (storeSize > size) {
1362         size = storeSize;
1363       }
1364     }
1365     return size;
1366   }
1367 
1368   /**
1369    * @return KeyValue Comparator
1370    */
1371   public KeyValue.KVComparator getComparator() {
1372     return this.comparator;
1373   }
1374 
1375   /*
1376    * Do preparation for pending compaction.
1377    * @throws IOException
1378    */
1379   protected void doRegionCompactionPrep() throws IOException {
1380   }
1381 
1382   void triggerMajorCompaction() {
1383     for (Store h : stores.values()) {
1384       h.triggerMajorCompaction();
1385     }
1386   }
1387 
1388   /**
1389    * This is a helper function that compacts all the stores synchronously.
1390    * It is used by utilities and testing.
1391    *
1392    * @param majorCompaction True to force a major compaction regardless of thresholds
1393    * @throws IOException e
1394    */
1395   public void compactStores(final boolean majorCompaction)
1396   throws IOException {
1397     if (majorCompaction) {
1398       this.triggerMajorCompaction();
1399     }
1400     compactStores();
1401   }
1402 
1403   /**
1404    * This is a helper function that compacts all the stores synchronously.
1405    * It is used by utilities and testing.
1406    *
1407    * @throws IOException e
1408    */
1409   public void compactStores() throws IOException {
1410     for (Store s : getStores().values()) {
1411       CompactionContext compaction = s.requestCompaction();
1412       if (compaction != null) {
1413         compact(compaction, s);
1414       }
1415     }
1416   }
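
  // Minimal usage sketch for the helpers above, roughly as a utility or test might
  // call them; "region" is an assumed, already-open HRegion reference.
  //
  //   region.compactStores(true);   // force a major compaction of every store
  //   region.compactStores();       // or let each store pick its own compaction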
1417 
1418   /*
1419    * Called by compaction thread and after region is opened to compact the
1420    * HStores if necessary.
1421    *
1422    * <p>This operation could block for a long time, so don't call it from a
1423    * time-sensitive thread.
1424    *
1425    * Note that no locking is necessary at this level because compaction only
1426    * conflicts with a region split, and that cannot happen because the region
1427    * server does them sequentially and not in parallel.
1428    *
1429    * @param compaction Compaction details, obtained by requestCompaction()
1430    * @return whether the compaction completed
1431    * @throws IOException e
1432    */
1433   public boolean compact(CompactionContext compaction, Store store) throws IOException {
1434     assert compaction != null && compaction.hasSelection();
1435     assert !compaction.getRequest().getFiles().isEmpty();
1436     if (this.closing.get() || this.closed.get()) {
1437       LOG.debug("Skipping compaction on " + this + " because closing/closed");
1438       store.cancelRequestedCompaction(compaction);
1439       return false;
1440     }
1441     MonitoredTask status = null;
1442     boolean didPerformCompaction = false;
1443     // block waiting for the lock for compaction
1444     lock.readLock().lock();
1445     try {
1446       byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
1447       if (stores.get(cf) != store) {
1448         LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this
1449             + " has been re-instantiated, cancelling this compaction request."
1450             + " It may be caused by the rollback of a split transaction");
1451         return false;
1452       }
1453 
1454       status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
1455       if (this.closed.get()) {
1456         String msg = "Skipping compaction on " + this + " because closed";
1457         LOG.debug(msg);
1458         status.abort(msg);
1459         return false;
1460       }
1461       boolean wasStateSet = false;
1462       try {
1463         synchronized (writestate) {
1464           if (writestate.writesEnabled) {
1465             wasStateSet = true;
1466             ++writestate.compacting;
1467           } else {
1468             String msg = "NOT compacting region " + this + ". Writes disabled.";
1469             LOG.info(msg);
1470             status.abort(msg);
1471             return false;
1472           }
1473         }
1474         LOG.info("Starting compaction on " + store + " in region " + this
1475             + (compaction.getRequest().isOffPeak()?" as an off-peak compaction":""));
1476         doRegionCompactionPrep();
1477         try {
1478           status.setStatus("Compacting store " + store);
1479           didPerformCompaction = true;
1480           store.compact(compaction);
1481         } catch (InterruptedIOException iioe) {
1482           String msg = "compaction interrupted";
1483           LOG.info(msg, iioe);
1484           status.abort(msg);
1485           return false;
1486         }
1487       } finally {
1488         if (wasStateSet) {
1489           synchronized (writestate) {
1490             --writestate.compacting;
1491             if (writestate.compacting <= 0) {
1492               writestate.notifyAll();
1493             }
1494           }
1495         }
1496       }
1497       status.markComplete("Compaction complete");
1498       return true;
1499     } finally {
1500       try {
1501         if (!didPerformCompaction) store.cancelRequestedCompaction(compaction);
1502         if (status != null) status.cleanup();
1503       } finally {
1504         lock.readLock().unlock();
1505       }
1506     }
1507   }
1508 
1509   /**
1510    * Flush the cache.
1511    *
1512    * When this method is called the cache will be flushed unless:
1513    * <ol>
1514    *   <li>the cache is empty</li>
1515    *   <li>the region is closed</li>
1516    *   <li>a flush is already in progress</li>
1517    *   <li>writes are disabled</li>
1518    * </ol>
1519    *
1520    * <p>This method may block for some time, so it should not be called from a
1521    * time-sensitive thread.
1522    *
1523    * @return a FlushResult describing whether the region needs compacting
1524    *
1525    * @throws IOException general io exceptions
1526    * @throws DroppedSnapshotException Thrown when replay of hlog is required
1527    * because a Snapshot was not properly persisted.
1528    */
1529   public FlushResult flushcache() throws IOException {
1530     // fail-fast instead of waiting on the lock
1531     if (this.closing.get()) {
1532       String msg = "Skipping flush on " + this + " because closing";
1533       LOG.debug(msg);
1534       return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1535     }
1536     MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
1537     status.setStatus("Acquiring readlock on region");
1538     // block waiting for the lock for flushing cache
1539     lock.readLock().lock();
1540     try {
1541       if (this.closed.get()) {
1542         String msg = "Skipping flush on " + this + " because closed";
1543         LOG.debug(msg);
1544         status.abort(msg);
1545         return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1546       }
1547       if (coprocessorHost != null) {
1548         status.setStatus("Running coprocessor pre-flush hooks");
1549         coprocessorHost.preFlush();
1550       }
1551       if (numMutationsWithoutWAL.get() > 0) {
1552         numMutationsWithoutWAL.set(0);
1553         dataInMemoryWithoutWAL.set(0);
1554       }
1555       synchronized (writestate) {
1556         if (!writestate.flushing && writestate.writesEnabled) {
1557           this.writestate.flushing = true;
1558         } else {
1559           if (LOG.isDebugEnabled()) {
1560             LOG.debug("NOT flushing memstore for region " + this
1561                 + ", flushing=" + writestate.flushing + ", writesEnabled="
1562                 + writestate.writesEnabled);
1563           }
1564           String msg = "Not flushing since "
1565               + (writestate.flushing ? "already flushing"
1566               : "writes not enabled");
1567           status.abort(msg);
1568           return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1569         }
1570       }
1571       try {
1572         FlushResult fs = internalFlushcache(status);
1573 
1574         if (coprocessorHost != null) {
1575           status.setStatus("Running post-flush coprocessor hooks");
1576           coprocessorHost.postFlush();
1577         }
1578 
1579         status.markComplete("Flush successful");
1580         return fs;
1581       } finally {
1582         synchronized (writestate) {
1583           writestate.flushing = false;
1584           this.writestate.flushRequested = false;
1585           writestate.notifyAll();
1586         }
1587       }
1588     } finally {
1589       lock.readLock().unlock();
1590       status.cleanup();
1591     }
1592   }
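
  // Minimal usage sketch, assuming an already-open HRegion named "region": callers
  // invoke flushcache() and inspect the returned FlushResult; what to do next
  // (e.g. scheduling a compaction) is left to the caller.
  //
  //   FlushResult fr = region.flushcache();
  //   LOG.info("Flush finished: " + fr);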
1593 
1594   /**
1595    * Should the memstore be flushed now?
1596    */
1597   boolean shouldFlush() {
1598     // This is a rough measure.
1599     if (this.completeSequenceId > 0 
1600           && (this.completeSequenceId + this.flushPerChanges < this.sequenceId.get())) {
1601       return true;
1602     }
1603     if (flushCheckInterval <= 0) { //disabled
1604       return false;
1605     }
1606     long now = EnvironmentEdgeManager.currentTimeMillis();
1607     //if we flushed in the recent past, we don't need to do again now
1608     if ((now - getLastFlushTime() < flushCheckInterval)) {
1609       return false;
1610     }
1611     //since we didn't flush in the recent past, flush now if certain conditions
1612     //are met. Return true on first such memstore hit.
1613     for (Store s : this.getStores().values()) {
1614       if (s.timeOfOldestEdit() < now - flushCheckInterval) {
1615         // we have an old enough edit in the memstore, flush
1616         return true;
1617       }
1618     }
1619     return false;
1620   }
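
  // Worked example of the checks above (all numbers are assumptions): with
  // completeSequenceId = 100, flushPerChanges = 1000 and sequenceId = 1200, the first
  // test fires because 100 + 1000 < 1200. Otherwise, with flushCheckInterval =
  // 60000 ms, a region flushed 10 s ago returns false immediately, and one flushed
  // longer ago returns true only if some store still holds an edit older than 60 s.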
1621 
1622   /**
1623    * Flush the memstore.
1624    *
1625    * Flushing the memstore is a little tricky. We have a lot of updates in the
1626    * memstore, all of which have also been written to the log. We need to
1627    * write those updates in the memstore out to disk, while being able to
1628    * process reads/writes as much as possible during the flush operation. Also,
1629    * the log has to state clearly the point in time at which the memstore was
1630    * flushed. (That way, during recovery, we know when we can rely on the
1631    * on-disk flushed structures and when we have to recover the memstore from
1632    * the log.)
1633    *
1634    * <p>So, we have a three-step process:
1635    *
1636    * <ul><li>A. Flush the memstore to the on-disk stores, noting the current
1637    * sequence ID for the log.</li>
1638    *
1639    * <li>B. Write a FLUSHCACHE-COMPLETE message to the log, using the sequence
1640    * ID that was current at the time of memstore-flush.</li>
1641    *
1642    * <li>C. Get rid of the memstore structures that are now redundant, as
1643    * they've been flushed to the on-disk HStores.</li>
1644    * </ul>
1645    * <p>This method is protected, but can be accessed via several public
1646    * routes.
1647    *
1648    * <p> This method may block for some time.
1649    * @param status
1650    *
1651    * @return object describing the flush's state
1652    *
1653    * @throws IOException general io exceptions
1654    * @throws DroppedSnapshotException Thrown when replay of hlog is required
1655    * because a Snapshot was not properly persisted.
1656    */
1657   protected FlushResult internalFlushcache(MonitoredTask status)
1658       throws IOException {
1659     return internalFlushcache(this.log, -1, status);
1660   }
1661 
1662   /**
1663    * @param wal Null if we're NOT to go via hlog/wal.
1664    * @param myseqid The seqid to use, if <code>wal</code> is null, when writing out
1665    * the flush file.
1666    * @param status
1667    * @return object describing the flush's state
1668    * @throws IOException
1669    * @see #internalFlushcache(MonitoredTask)
1670    */
1671   protected FlushResult internalFlushcache(
1672       final HLog wal, final long myseqid, MonitoredTask status)
1673   throws IOException {
1674     if (this.rsServices != null && this.rsServices.isAborted()) {
1675       // Don't flush when server aborting, it's unsafe
1676       throw new IOException("Aborting flush because server is aborted...");
1677     }
1678     final long startTime = EnvironmentEdgeManager.currentTimeMillis();
1679     // Clear flush flag.
1680     // If nothing to flush, return and avoid logging start/stop flush.
1681     if (this.memstoreSize.get() <= 0) {
1682       if(LOG.isDebugEnabled()) {
1683         LOG.debug("Empty memstore size for the current region "+this);
1684       }
1685       return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush");
1686     }
1687     if (LOG.isDebugEnabled()) {
1688       LOG.debug("Started memstore flush for " + this +
1689         ", current region memstore size " +
1690         StringUtils.humanReadableInt(this.memstoreSize.get()) +
1691         ((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid));
1692     }
1693 
1694     // Stop updates while we snapshot the memstore of all stores. We only have
1695     // to do this for a moment.  It's quick.  The subsequent sequence id that
1696     // goes into the HLog after we've flushed all these snapshots also goes
1697     // into the info file that sits beside the flushed files.
1698     // We also set the memstore size to zero here before we allow updates
1699     // again so its value will represent the size of the updates received
1700     // during the flush
1701     MultiVersionConsistencyControl.WriteEntry w = null;
1702 
1703     // We have to take a write lock during snapshot, or else a write could
1704     // end up in both snapshot and memstore (makes it difficult to do atomic
1705     // rows then)
1706     status.setStatus("Obtaining lock to block concurrent updates");
1707     // block waiting for the lock for internal flush
1708     this.updatesLock.writeLock().lock();
1709     long totalFlushableSize = 0;
1710     status.setStatus("Preparing to flush by snapshotting stores");
1711     List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
1712     long flushSeqId = -1L;
1713     try {
1714       // Record the mvcc for all transactions in progress.
1715       w = mvcc.beginMemstoreInsert();
1716       mvcc.advanceMemstore(w);
1717       // check if it is not closing.
1718       if (wal != null) {
1719         if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) {
1720           String msg = "Flush will not be started for ["
1721               + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
1722           status.setStatus(msg);
1723           return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
1724         }
1725         flushSeqId = this.sequenceId.incrementAndGet();
1726       } else {
1727         // use the provided sequence Id as WAL is not being used for this flush.
1728         flushSeqId = myseqid;
1729       }
1730 
1731       for (Store s : stores.values()) {
1732         totalFlushableSize += s.getFlushableSize();
1733         storeFlushCtxs.add(s.createFlushContext(flushSeqId));
1734       }
1735 
1736       // prepare flush (take a snapshot)
1737       for (StoreFlushContext flush : storeFlushCtxs) {
1738         flush.prepare();
1739       }
1740     } finally {
1741       this.updatesLock.writeLock().unlock();
1742     }
1743     String s = "Finished memstore snapshotting " + this +
1744       ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize;
1745     status.setStatus(s);
1746     if (LOG.isTraceEnabled()) LOG.trace(s);
1747 
1748     // sync unflushed WAL changes when deferred log sync is enabled
1749     // see HBASE-8208 for details
1750     if (wal != null && !shouldSyncLog()) {
1751       wal.sync();
1752     }
1753 
1754     // wait for all in-progress transactions to commit to HLog before
1755     // we can start the flush. This prevents
1756     // uncommitted transactions from being written into HFiles.
1757     // We have to block before we start the flush, otherwise keys that
1758     // were removed via a rollbackMemstore could be written to Hfiles.
1759     mvcc.waitForRead(w);
1760 
1761     s = "Flushing stores of " + this;
1762     status.setStatus(s);
1763     if (LOG.isTraceEnabled()) LOG.trace(s);
1764 
1765     // Any failure from here on out will be catastrophic requiring server
1766     // restart so hlog content can be replayed and put back into the memstore.
1767     // Otherwise, the snapshot content, while backed up in the hlog, will not
1768     // be part of the currently running server's state.
1769     boolean compactionRequested = false;
1770     try {
1771       // A.  Flush memstore to all the HStores.
1772       // Keep running vector of all store files that includes both old and the
1773       // just-made new flush store file. The new flushed file is still in the
1774       // tmp directory.
1775 
1776       for (StoreFlushContext flush : storeFlushCtxs) {
1777         flush.flushCache(status);
1778       }
1779 
1780       // Switch snapshot (in memstore) -> new hfile (thus causing
1781       // all the store scanners to reset/reseek).
1782       for (StoreFlushContext flush : storeFlushCtxs) {
1783         boolean needsCompaction = flush.commit(status);
1784         if (needsCompaction) {
1785           compactionRequested = true;
1786         }
1787       }
1788       storeFlushCtxs.clear();
1789 
1790       // Set down the memstore size by amount of flush.
1791       this.addAndGetGlobalMemstoreSize(-totalFlushableSize);
1792     } catch (Throwable t) {
1793       // An exception here means that the snapshot was not persisted.
1794       // The hlog needs to be replayed so its content is restored to memstore.
1795       // Currently, only a server restart will do this.
1796       // We used to only catch IOEs but its possible that we'd get other
1797       // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
1798       // all and sundry.
1799       if (wal != null) {
1800         wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
1801       }
1802       DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
1803           Bytes.toStringBinary(getRegionName()));
1804       dse.initCause(t);
1805       status.abort("Flush failed: " + StringUtils.stringifyException(t));
1806       throw dse;
1807     }
1808 
1809     // If we get to here, the HStores have been written.
1810     if (wal != null) {
1811       wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
1812     }
1813 
1814     // Record latest flush time
1815     this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
1816 
1817     // Update the last flushed sequence id for region
1818     completeSequenceId = flushSeqId;
1819 
1820     // C. Finally notify anyone waiting on memstore to clear:
1821     // e.g. checkResources().
1822     synchronized (this) {
1823       notifyAll(); // FindBugs NN_NAKED_NOTIFY
1824     }
1825 
1826     long time = EnvironmentEdgeManager.currentTimeMillis() - startTime;
1827     long memstoresize = this.memstoreSize.get();
1828     String msg = "Finished memstore flush of ~" +
1829       StringUtils.humanReadableInt(totalFlushableSize) + "/" + totalFlushableSize +
1830       ", currentsize=" +
1831       StringUtils.humanReadableInt(memstoresize) + "/" + memstoresize +
1832       " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId +
1833       ", compaction requested=" + compactionRequested +
1834       ((wal == null)? "; wal=null": "");
1835     LOG.info(msg);
1836     status.setStatus(msg);
1837     this.recentFlushes.add(new Pair<Long,Long>(time/1000, totalFlushableSize));
1838 
1839     return new FlushResult(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
1840         FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushSeqId);
1841   }
1842 
1843   //////////////////////////////////////////////////////////////////////////////
1844   // get() methods for client use.
1845   //////////////////////////////////////////////////////////////////////////////
1846   /**
1847    * Return all the data for the row that matches <i>row</i> exactly,
1848    * or the one that immediately precedes it, at or immediately before
1849    * <i>ts</i>.
1850    *
1851    * @param row row key
1852    * @return map of values
1853    * @throws IOException
1854    */
1855   Result getClosestRowBefore(final byte [] row)
1856   throws IOException{
1857     return getClosestRowBefore(row, HConstants.CATALOG_FAMILY);
1858   }
1859 
1860   /**
1861    * Return all the data for the row that matches <i>row</i> exactly,
1862    * or the one that immediately precedes it, at or immediately before
1863    * <i>ts</i>.
1864    *
1865    * @param row row key
1866    * @param family column family to find on
1867    * @return map of values
1868    * @throws IOException read exceptions
1869    */
1870   public Result getClosestRowBefore(final byte [] row, final byte [] family)
1871   throws IOException {
1872     if (coprocessorHost != null) {
1873       Result result = new Result();
1874       if (coprocessorHost.preGetClosestRowBefore(row, family, result)) {
1875         return result;
1876       }
1877     }
1878     // look across all the HStores for this region and determine what the
1879     // closest key is across all column families, since the data may be sparse
1880     checkRow(row, "getClosestRowBefore");
1881     startRegionOperation(Operation.GET);
1882     this.readRequestsCount.increment();
1883     try {
1884       Store store = getStore(family);
1885       // get the closest key. (HStore.getRowKeyAtOrBefore can return null)
1886       KeyValue key = store.getRowKeyAtOrBefore(row);
1887       Result result = null;
1888       if (key != null) {
1889         Get get = new Get(key.getRow());
1890         get.addFamily(family);
1891         result = get(get);
1892       }
1893       if (coprocessorHost != null) {
1894         coprocessorHost.postGetClosestRowBefore(row, family, result);
1895       }
1896       return result;
1897     } finally {
1898       closeRegionOperation(Operation.GET);
1899     }
1900   }
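
  // Usage sketch (rowKey and family are assumed byte[] variables): finds the row at
  // or just before the given key within one column family; a null result means no
  // earlier row exists in that family.
  //
  //   Result r = region.getClosestRowBefore(rowKey, family);
  //   if (r != null && !r.isEmpty()) {
  //     // inspect r ...
  //   }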
1901 
1902   /**
1903    * Return an iterator that scans over the HRegion, returning the indicated
1904    * columns and rows specified by the {@link Scan}.
1905    * <p>
1906    * This Iterator must be closed by the caller.
1907    *
1908    * @param scan configured {@link Scan}
1909    * @return RegionScanner
1910    * @throws IOException read exceptions
1911    */
1912   public RegionScanner getScanner(Scan scan) throws IOException {
1913    return getScanner(scan, null);
1914   }
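
  // Usage sketch, assuming an open HRegion "region"; as the javadoc above notes, the
  // returned RegionScanner must be closed by the caller.
  //
  //   RegionScanner scanner = region.getScanner(new Scan());
  //   try {
  //     List<Cell> cells = new ArrayList<Cell>();
  //     boolean more;
  //     do {
  //       more = scanner.next(cells);
  //       // ... process cells ...
  //       cells.clear();
  //     } while (more);
  //   } finally {
  //     scanner.close();
  //   }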
1915 
1916   void prepareScanner(Scan scan) throws IOException {
1917     if(!scan.hasFamilies()) {
1918       // Adding all families to scanner
1919       for(byte[] family: this.htableDescriptor.getFamiliesKeys()){
1920         scan.addFamily(family);
1921       }
1922     }
1923   }
1924 
1925   protected RegionScanner getScanner(Scan scan,
1926       List<KeyValueScanner> additionalScanners) throws IOException {
1927     startRegionOperation(Operation.SCAN);
1928     try {
1929       // Verify families are all valid
1930       prepareScanner(scan);
1931       if(scan.hasFamilies()) {
1932         for(byte [] family : scan.getFamilyMap().keySet()) {
1933           checkFamily(family);
1934         }
1935       }
1936       return instantiateRegionScanner(scan, additionalScanners);
1937     } finally {
1938       closeRegionOperation(Operation.SCAN);
1939     }
1940   }
1941 
1942   protected RegionScanner instantiateRegionScanner(Scan scan,
1943       List<KeyValueScanner> additionalScanners) throws IOException {
1944     if (scan.isReversed()) {
1945       if (scan.getFilter() != null) {
1946         scan.getFilter().setReversed(true);
1947       }
1948       return new ReversedRegionScannerImpl(scan, additionalScanners, this);
1949     }
1950     return new RegionScannerImpl(scan, additionalScanners, this);
1951   }
1952 
1953   /*
1954    * @param delete The passed delete is modified by this method. WARNING!
1955    */
1956   void prepareDelete(Delete delete) throws IOException {
1957     // Check to see if this is a deleteRow insert
1958     if(delete.getFamilyCellMap().isEmpty()){
1959       for(byte [] family : this.htableDescriptor.getFamiliesKeys()){
1960         // Don't eat the timestamp
1961         delete.deleteFamily(family, delete.getTimeStamp());
1962       }
1963     } else {
1964       for(byte [] family : delete.getFamilyCellMap().keySet()) {
1965         if(family == null) {
1966           throw new NoSuchColumnFamilyException("Empty family is invalid");
1967         }
1968         checkFamily(family);
1969       }
1970     }
1971   }
1972 
1973   //////////////////////////////////////////////////////////////////////////////
1974   // set() methods for client use.
1975   //////////////////////////////////////////////////////////////////////////////
1976   /**
1977    * @param delete delete object
1978    * @throws IOException read exceptions
1979    */
1980   public void delete(Delete delete)
1981   throws IOException {
1982     checkReadOnly();
1983     checkResources();
1984     startRegionOperation(Operation.DELETE);
1985     try {
1986       delete.getRow();
1987       // All edits for the given row (across all column families) must happen atomically.
1988       doBatchMutate(delete);
1989     } finally {
1990       closeRegionOperation(Operation.DELETE);
1991     }
1992   }
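
  // Usage sketch (rowKey/family are assumed byte[] variables, 0.96/0.98-era client
  // API): a Delete with no families is expanded to the whole row by prepareDelete()
  // above; adding a family restricts it to that family.
  //
  //   Delete d = new Delete(rowKey);
  //   d.deleteFamily(family);   // optional; omit to delete across all families
  //   region.delete(d);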
1993 
1994   /**
1995    * Row key used by the unit-test-only delete method below.
1996    */
1997   private static final byte [] FOR_UNIT_TESTS_ONLY = Bytes.toBytes("ForUnitTestsOnly");
1998   /**
1999    * This is used only by unit tests. Not required to be a public API.
2000    * @param familyMap map of family to edits for the given family.
2001    * @param durability
2002    * @throws IOException
2003    */
2004   void delete(NavigableMap<byte[], List<Cell>> familyMap,
2005       Durability durability) throws IOException {
2006     Delete delete = new Delete(FOR_UNIT_TESTS_ONLY);
2007     delete.setFamilyCellMap(familyMap);
2008     delete.setDurability(durability);
2009     doBatchMutate(delete);
2010   }
2011 
2012   /**
2013    * Setup correct timestamps in the KVs in Delete object.
2014    * Caller should have the row and region locks.
2015    * @param mutation
2016    * @param familyMap
2017    * @param byteNow
2018    * @throws IOException
2019    */
2020   void prepareDeleteTimestamps(Mutation mutation, Map<byte[], List<Cell>> familyMap,
2021       byte[] byteNow) throws IOException {
2022     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
2023 
2024       byte[] family = e.getKey();
2025       List<Cell> cells = e.getValue();
2026       Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
2027 
2028       for (Cell cell: cells) {
2029         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
2030         //  Check if time is LATEST, change to time of most recent addition if so
2031         //  This is expensive.
2032         if (kv.isLatestTimestamp() && kv.isDeleteType()) {
2033           byte[] qual = kv.getQualifier();
2034           if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
2035 
2036           Integer count = kvCount.get(qual);
2037           if (count == null) {
2038             kvCount.put(qual, 1);
2039           } else {
2040             kvCount.put(qual, count + 1);
2041           }
2042           count = kvCount.get(qual);
2043 
2044           Get get = new Get(kv.getRow());
2045           get.setMaxVersions(count);
2046           get.addColumn(family, qual);
2047           if (coprocessorHost != null) {
2048             if (!coprocessorHost.prePrepareTimeStampForDeleteVersion(mutation, cell,
2049                 byteNow, get)) {
2050               updateDeleteLatestVersionTimeStamp(kv, get, count, byteNow);
2051             }
2052           } else {
2053             updateDeleteLatestVersionTimeStamp(kv, get, count, byteNow);
2054           }
2055         } else {
2056           kv.updateLatestStamp(byteNow);
2057         }
2058       }
2059     }
2060   }
2061 
2062   void updateDeleteLatestVersionTimeStamp(KeyValue kv, Get get, int count, byte[] byteNow)
2063       throws IOException {
2064     List<Cell> result = get(get, false);
2065 
2066     if (result.size() < count) {
2067       // Nothing to delete
2068       kv.updateLatestStamp(byteNow);
2069       return;
2070     }
2071     if (result.size() > count) {
2072       throw new RuntimeException("Unexpected size: " + result.size());
2073     }
2074     KeyValue getkv = KeyValueUtil.ensureKeyValue(result.get(count - 1));
2075     Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(), getkv.getBuffer(),
2076         getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
2077   }
2078 
2079   /**
2080    * @param put
2081    * @throws IOException
2082    */
2083   public void put(Put put)
2084   throws IOException {
2085     checkReadOnly();
2086 
2087     // Do a rough check that we have resources to accept a write.  The check is
2088     // 'rough' in that between the resource check and the call to obtain a
2089     // read lock, resources may run out.  For now, the thought is that this
2090     // will be extremely rare; we'll deal with it when it happens.
2091     checkResources();
2092     startRegionOperation(Operation.PUT);
2093     try {
2094       // All edits for the given row (across all column families) must happen atomically.
2095       doBatchMutate(put);
2096     } finally {
2097       closeRegionOperation(Operation.PUT);
2098     }
2099   }
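
  // Usage sketch (assumed byte[] variables, 0.96/0.98-era client API; later client
  // versions use addColumn instead of add):
  //
  //   Put p = new Put(rowKey);
  //   p.add(family, qualifier, value);
  //   region.put(p);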
2100 
2101   /**
2102    * Struct-like class that tracks the progress of a batch operation,
2103    * accumulating status codes and tracking the index at which processing
2104    * is proceeding.
2105    */
2106   private abstract static class BatchOperationInProgress<T> {
2107     T[] operations;
2108     int nextIndexToProcess = 0;
2109     OperationStatus[] retCodeDetails;
2110     WALEdit[] walEditsFromCoprocessors;
2111 
2112     public BatchOperationInProgress(T[] operations) {
2113       this.operations = operations;
2114       this.retCodeDetails = new OperationStatus[operations.length];
2115       this.walEditsFromCoprocessors = new WALEdit[operations.length];
2116       Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
2117     }
2118 
2119     public abstract Mutation getMutation(int index);
2120     public abstract long getNonceGroup(int index);
2121     public abstract long getNonce(int index);
2122     /** This method is potentially expensive and should only be used for non-replay CP path. */
2123     public abstract Mutation[] getMutationsForCoprocs();
2124     public abstract boolean isInReplay();
2125 
2126     public boolean isDone() {
2127       return nextIndexToProcess == operations.length;
2128     }
2129   }
2130 
2131   private static class MutationBatch extends BatchOperationInProgress<Mutation> {
2132     private long nonceGroup;
2133     private long nonce;
2134     public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) {
2135       super(operations);
2136       this.nonceGroup = nonceGroup;
2137       this.nonce = nonce;
2138     }
2139 
2140     public Mutation getMutation(int index) {
2141       return this.operations[index];
2142     }
2143 
2144     @Override
2145     public long getNonceGroup(int index) {
2146       return nonceGroup;
2147     }
2148 
2149     @Override
2150     public long getNonce(int index) {
2151       return nonce;
2152     }
2153 
2154     @Override
2155     public Mutation[] getMutationsForCoprocs() {
2156       return this.operations;
2157     }
2158 
2159     @Override
2160     public boolean isInReplay() {
2161       return false;
2162     }
2163   }
2164 
2165   private static class ReplayBatch extends BatchOperationInProgress<HLogSplitter.MutationReplay> {
2166     public ReplayBatch(MutationReplay[] operations) {
2167       super(operations);
2168     }
2169 
2170     @Override
2171     public Mutation getMutation(int index) {
2172       return this.operations[index].mutation;
2173     }
2174 
2175     @Override
2176     public long getNonceGroup(int index) {
2177       return this.operations[index].nonceGroup;
2178     }
2179 
2180     @Override
2181     public long getNonce(int index) {
2182       return this.operations[index].nonce;
2183     }
2184 
2185     @Override
2186     public Mutation[] getMutationsForCoprocs() {
2187       assert false;
2188       throw new RuntimeException("Should not be called for replay batch");
2189     }
2190 
2191     @Override
2192     public boolean isInReplay() {
2193       return true;
2194     }
2195   }
2196 
2197   /**
2198    * Perform a batch of mutations.
2199    * It supports only Put and Delete mutations; any other mutation type passed is marked as failed.
2200    * @param mutations the list of mutations
2201    * @return an array of OperationStatus which internally contains the
2202    *         OperationStatusCode and the exceptionMessage if any.
2203    * @throws IOException
2204    */
2205   public OperationStatus[] batchMutate(
2206       Mutation[] mutations, long nonceGroup, long nonce) throws IOException {
2207     // As it stands, this is used for 3 things
2208     //  * batchMutate with single mutation - put/delete, separate or from checkAndMutate.
2209     //  * coprocessor calls (see ex. BulkDeleteEndpoint).
2210     // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd...
2211     return batchMutate(new MutationBatch(mutations, nonceGroup, nonce));
2212   }
2213 
2214   public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
2215     return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
2216   }
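
  // Usage sketch for the nonce-less overload above; "put1" and "delete1" are assumed,
  // pre-built mutations. Per-operation outcomes come back as OperationStatus values.
  //
  //   Mutation[] batch = new Mutation[] { put1, delete1 };
  //   OperationStatus[] statuses = region.batchMutate(batch);
  //   for (OperationStatus st : statuses) {
  //     if (st.getOperationStatusCode() != OperationStatusCode.SUCCESS) {
  //       // handle the failed or skipped mutation ...
  //     }
  //   }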
2217 
2218   /**
2219    * Replay a batch of mutations.
2220    * @param mutations mutations to replay.
2221    * @return an array of OperationStatus which internally contains the
2222    *         OperationStatusCode and the exceptionMessage if any.
2223    * @throws IOException
2224    */
2225   public OperationStatus[] batchReplay(HLogSplitter.MutationReplay[] mutations)
2226       throws IOException {
2227     return batchMutate(new ReplayBatch(mutations));
2228   }
2229 
2230   /**
2231    * Perform a batch of mutations.
2232    * It supports only Put and Delete mutations; any other mutation type passed is marked as failed.
2233    * @param mutations the list of mutations
2234    * @return an array of OperationStatus which internally contains the
2235    *         OperationStatusCode and the exceptionMessage if any.
2236    * @throws IOException
2237    */
2238   OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
2239     boolean initialized = false;
2240     while (!batchOp.isDone()) {
2241       if (!batchOp.isInReplay()) {
2242         checkReadOnly();
2243       }
2244       checkResources();
2245 
2246       long newSize;
2247       Operation op = Operation.BATCH_MUTATE;
2248       if (batchOp.isInReplay()) op = Operation.REPLAY_BATCH_MUTATE;
2249       startRegionOperation(op);
2250 
2251       try {
2252         if (!initialized) {
2253           this.writeRequestsCount.add(batchOp.operations.length);
2254           if (!batchOp.isInReplay()) {
2255             doPreMutationHook(batchOp);
2256           }
2257           initialized = true;
2258         }
2259         long addedSize = doMiniBatchMutation(batchOp);
2260         newSize = this.addAndGetGlobalMemstoreSize(addedSize);
2261       } finally {
2262         closeRegionOperation(op);
2263       }
2264       if (isFlushSize(newSize)) {
2265         requestFlush();
2266       }
2267     }
2268     return batchOp.retCodeDetails;
2269   }
2270 
2271 
2272   private void doPreMutationHook(BatchOperationInProgress<?> batchOp)
2273       throws IOException {
2274     /* Run coprocessor pre hook outside of locks to avoid deadlock */
2275     WALEdit walEdit = new WALEdit();
2276     if (coprocessorHost != null) {
2277       for (int i = 0 ; i < batchOp.operations.length; i++) {
2278         Mutation m = batchOp.getMutation(i);
2279         if (m instanceof Put) {
2280           if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
2281             // pre hook says skip this Put
2282             // mark as success and skip in doMiniBatchMutation
2283             batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2284           }
2285         } else if (m instanceof Delete) {
2286           Delete curDel = (Delete) m;
2287           if (curDel.getFamilyCellMap().isEmpty()) {
2288             // handle deleting a row case
2289             prepareDelete(curDel);
2290           }
2291           if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {
2292             // pre hook says skip this Delete
2293             // mark as success and skip in doMiniBatchMutation
2294             batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2295           }
2296         } else {
2297           // In case of passing Append mutations along with the Puts and Deletes in batchMutate
2298           // mark the operation return code as failure so that it will not be considered in
2299           // the doMiniBatchMutation
2300           batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
2301               "Put/Delete mutations only supported in batchMutate() now");
2302         }
2303         if (!walEdit.isEmpty()) {
2304           batchOp.walEditsFromCoprocessors[i] = walEdit;
2305           walEdit = new WALEdit();
2306         }
2307       }
2308     }
2309   }
2310 
2311   @SuppressWarnings("unchecked")
2312   private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp) throws IOException {
2313     boolean isInReplay = batchOp.isInReplay();
2314     // variable to note if all Put items are for the same CF -- metrics related
2315     boolean putsCfSetConsistent = true;
2316     //The set of columnFamilies first seen for Put.
2317     Set<byte[]> putsCfSet = null;
2318     // variable to note if all Delete items are for the same CF -- metrics related
2319     boolean deletesCfSetConsistent = true;
2320     //The set of columnFamilies first seen for Delete.
2321     Set<byte[]> deletesCfSet = null;
2322 
2323     long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
2324     WALEdit walEdit = new WALEdit(isInReplay);
2325     MultiVersionConsistencyControl.WriteEntry w = null;
2326     long txid = 0;
2327     boolean doRollBackMemstore = false;
2328     boolean locked = false;
2329 
2330     /** Keep track of the locks we hold so we can release them in finally clause */
2331     List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
2332     // reference family maps directly so coprocessors can mutate them if desired
2333     Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
2334     // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
2335     int firstIndex = batchOp.nextIndexToProcess;
2336     int lastIndexExclusive = firstIndex;
2337     boolean success = false;
2338     int noOfPuts = 0, noOfDeletes = 0;
2339     try {
2340       // ------------------------------------
2341       // STEP 1. Try to acquire as many locks as we can, and ensure
2342       // we acquire at least one.
2343       // ----------------------------------
2344       int numReadyToWrite = 0;
2345       long now = EnvironmentEdgeManager.currentTimeMillis();
2346       while (lastIndexExclusive < batchOp.operations.length) {
2347         Mutation mutation = batchOp.getMutation(lastIndexExclusive);
2348         boolean isPutMutation = mutation instanceof Put;
2349 
2350         Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
2351         // store the family map reference to allow for mutations
2352         familyMaps[lastIndexExclusive] = familyMap;
2353 
2354         // skip anything that "ran" already
2355         if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
2356             != OperationStatusCode.NOT_RUN) {
2357           lastIndexExclusive++;
2358           continue;
2359         }
2360 
2361         try {
2362           if (isPutMutation) {
2363             // Check the families in the put. If bad, skip this one.
2364             if (isInReplay) {
2365               removeNonExistentColumnFamilyForReplay(familyMap);
2366             } else {
2367               checkFamilies(familyMap.keySet());
2368             }
2369             checkTimestamps(mutation.getFamilyCellMap(), now);
2370           } else {
2371             prepareDelete((Delete) mutation);
2372           }
2373         } catch (NoSuchColumnFamilyException nscf) {
2374           LOG.warn("No such column family in batch mutation", nscf);
2375           batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2376               OperationStatusCode.BAD_FAMILY, nscf.getMessage());
2377           lastIndexExclusive++;
2378           continue;
2379         } catch (FailedSanityCheckException fsce) {
2380           LOG.warn("Batch Mutation did not pass sanity check", fsce);
2381           batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2382               OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
2383           lastIndexExclusive++;
2384           continue;
2385         }
2386 
2387         // If we haven't got any rows in our batch, we should block to
2388         // get the next one.
2389         boolean shouldBlock = numReadyToWrite == 0;
2390         RowLock rowLock = null;
2391         try {
2392           rowLock = getRowLock(mutation.getRow(), shouldBlock);
2393         } catch (IOException ioe) {
2394           LOG.warn("Failed getting lock in batch put, row="
2395             + Bytes.toStringBinary(mutation.getRow()), ioe);
2396         }
2397         if (rowLock == null) {
2398           // We failed to grab another lock
2399           assert !shouldBlock : "Should never fail to get lock when blocking";
2400           break; // stop acquiring more rows for this batch
2401         } else {
2402           acquiredRowLocks.add(rowLock);
2403         }
2404 
2405         lastIndexExclusive++;
2406         numReadyToWrite++;
2407 
2408         if (isPutMutation) {
2409           // If column families stay consistent throughout all of the
2410           // individual puts then metrics can be reported as a multiput across
2411           // column families in the first put.
2412           if (putsCfSet == null) {
2413             putsCfSet = mutation.getFamilyCellMap().keySet();
2414           } else {
2415             putsCfSetConsistent = putsCfSetConsistent
2416                 && mutation.getFamilyCellMap().keySet().equals(putsCfSet);
2417           }
2418         } else {
2419           if (deletesCfSet == null) {
2420             deletesCfSet = mutation.getFamilyCellMap().keySet();
2421           } else {
2422             deletesCfSetConsistent = deletesCfSetConsistent
2423                 && mutation.getFamilyCellMap().keySet().equals(deletesCfSet);
2424           }
2425         }
2426       }
2427 
2428       // we should record the timestamp only after we have acquired the rowLock,
2429       // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
2430       now = EnvironmentEdgeManager.currentTimeMillis();
2431       byte[] byteNow = Bytes.toBytes(now);
2432 
2433       // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
2434       if (numReadyToWrite <= 0) return 0L;
2435 
2436       // We've now grabbed as many mutations off the list as we can
2437 
2438       // ------------------------------------
2439       // STEP 2. Update any LATEST_TIMESTAMP timestamps
2440       // ----------------------------------
2441       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2442         // skip invalid
2443         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2444             != OperationStatusCode.NOT_RUN) continue;
2445 
2446         Mutation mutation = batchOp.getMutation(i);
2447         if (mutation instanceof Put) {
2448           updateKVTimestamps(familyMaps[i].values(), byteNow);
2449           noOfPuts++;
2450         } else {
2451           if (!isInReplay) {
2452             prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
2453           }
2454           noOfDeletes++;
2455         }
2456       }
2457 
2458       lock(this.updatesLock.readLock(), numReadyToWrite);
2459       locked = true;
2460 
2461       //
2462       // ------------------------------------
2463       // Acquire the latest mvcc number
2464       // ----------------------------------
2465       w = mvcc.beginMemstoreInsert();
2466 
2467       // calling the pre CP hook for batch mutation
2468       if (!isInReplay && coprocessorHost != null) {
2469         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2470           new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2471           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2472         if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
2473       }
2474 
2475       // ------------------------------------
2476       // STEP 3. Write back to memstore
2477       // Write to memstore. It is ok to write to memstore
2478       // first without updating the HLog because we do not roll
2479       // forward the memstore MVCC. The MVCC will be moved up when
2480       // the complete operation is done. These changes are not yet
2481       // visible to scanners till we update the MVCC. The MVCC is
2482       // moved only when the sync is complete.
2483       // ----------------------------------
2484       long addedSize = 0;
2485       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2486         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2487             != OperationStatusCode.NOT_RUN) {
2488           continue;
2489         }
2490         doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
2491         addedSize += applyFamilyMapToMemstore(familyMaps[i], w);
2492       }
2493 
2494       // ------------------------------------
2495       // STEP 4. Build WAL edit
2496       // ----------------------------------
2497       boolean hasWalAppends = false;
2498       Durability durability = Durability.USE_DEFAULT;
2499       for (int i = firstIndex; i < lastIndexExclusive; i++) {
2500         // Skip puts that were determined to be invalid during preprocessing
2501         if (batchOp.retCodeDetails[i].getOperationStatusCode()
2502             != OperationStatusCode.NOT_RUN) {
2503           continue;
2504         }
2505         batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2506 
2507         Mutation m = batchOp.getMutation(i);
2508         Durability tmpDur = getEffectiveDurability(m.getDurability());
2509         if (tmpDur.ordinal() > durability.ordinal()) {
2510           durability = tmpDur;
2511         }
2512         if (tmpDur == Durability.SKIP_WAL) {
2513           recordMutationWithoutWal(m.getFamilyCellMap());
2514           continue;
2515         }
2516 
2517         long nonceGroup = batchOp.getNonceGroup(i), nonce = batchOp.getNonce(i);
2518         // In replay, the batch may contain multiple nonces. If so, write WALEdit for each.
2519         // Given how nonces are originally written, these should be contiguous.
2520         // They don't have to be; it will still work, it will just write more WALEdits than needed.
2521         if (nonceGroup != currentNonceGroup || nonce != currentNonce) {
2522           if (walEdit.size() > 0) {
2523             assert isInReplay;
2524             if (!isInReplay) {
2525               throw new IOException("Multiple nonces per batch and not in replay");
2526             }
2527             // txid should always increase, so having the one from the last call is ok.
2528             txid = this.log.appendNoSync(this.getRegionInfo(), htableDescriptor.getTableName(),
2529                   walEdit, m.getClusterIds(), now, htableDescriptor, this.sequenceId, true,
2530                   currentNonceGroup, currentNonce);
2531             hasWalAppends = true;
2532             walEdit = new WALEdit(isInReplay);
2533           }
2534           currentNonceGroup = nonceGroup;
2535           currentNonce = nonce;
2536         }
2537 
2538         // Add WAL edits by CP
2539         WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
2540         if (fromCP != null) {
2541           for (KeyValue kv : fromCP.getKeyValues()) {
2542             walEdit.add(kv);
2543           }
2544         }
2545         addFamilyMapToWALEdit(familyMaps[i], walEdit);
2546       }
2547 
2548       // -------------------------
2549       // STEP 5. Append the final edit to WAL. Do not sync wal.
2550       // -------------------------
2551       Mutation mutation = batchOp.getMutation(firstIndex);
2552       if (walEdit.size() > 0) {
2553         txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
2554               walEdit, mutation.getClusterIds(), now, this.htableDescriptor, this.sequenceId,
2555               true, currentNonceGroup, currentNonce);
2556         hasWalAppends = true;
2557       }
2558 
2559       // -------------------------------
2560       // STEP 6. Release row locks, etc.
2561       // -------------------------------
2562       if (locked) {
2563         this.updatesLock.readLock().unlock();
2564         locked = false;
2565       }
2566       releaseRowLocks(acquiredRowLocks);
2567 
2568       // -------------------------
2569       // STEP 7. Sync wal.
2570       // -------------------------
2571       if (hasWalAppends) {
2572         syncOrDefer(txid, durability);
2573       }
2574       doRollBackMemstore = false;
2575       // calling the post CP hook for batch mutation
2576       if (!isInReplay && coprocessorHost != null) {
2577         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2578           new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2579           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2580         coprocessorHost.postBatchMutate(miniBatchOp);
2581       }
2582 
2583       // ------------------------------------------------------------------
2584       // STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
2585       // ------------------------------------------------------------------
2586       if (w != null) {
2587         mvcc.completeMemstoreInsert(w);
2588         w = null;
2589       }
2590 
2591       // ------------------------------------
2592       // STEP 9. Run coprocessor post hooks. This should be done after the wal is
2593       // synced so that the coprocessor contract is adhered to.
2594       // ------------------------------------
2595       if (!isInReplay && coprocessorHost != null) {
2596         for (int i = firstIndex; i < lastIndexExclusive; i++) {
2597           // only for successful puts
2598           if (batchOp.retCodeDetails[i].getOperationStatusCode()
2599               != OperationStatusCode.SUCCESS) {
2600             continue;
2601           }
2602           Mutation m = batchOp.getMutation(i);
2603           if (m instanceof Put) {
2604             coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
2605           } else {
2606             coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
2607           }
2608         }
2609       }
2610 
2611       success = true;
2612       return addedSize;
2613     } finally {
2614 
2615       // if the wal sync was unsuccessful, remove keys from memstore
2616       if (doRollBackMemstore) {
2617         rollbackMemstore(batchOp, familyMaps, firstIndex, lastIndexExclusive);
2618       }
2619       if (w != null) mvcc.completeMemstoreInsert(w);
2620 
2621       if (locked) {
2622         this.updatesLock.readLock().unlock();
2623       }
2624       releaseRowLocks(acquiredRowLocks);
2625 
2626       // See if the column families were consistent through the whole thing.
2627       // if they were then keep them. If they were not then pass a null.
2628       // null will be treated as unknown.
2629       // Total time taken might be involving Puts and Deletes.
2630       // Split the time for puts and deletes based on the total number of Puts and Deletes.
2631 
2632       if (noOfPuts > 0) {
2633         // There were some Puts in the batch.
2634         if (this.metricsRegion != null) {
2635           this.metricsRegion.updatePut();
2636         }
2637       }
2638       if (noOfDeletes > 0) {
2639         // There were some Deletes in the batch.
2640         if (this.metricsRegion != null) {
2641           this.metricsRegion.updateDelete();
2642         }
2643       }
2644       if (!success) {
2645         for (int i = firstIndex; i < lastIndexExclusive; i++) {
2646           if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
2647             batchOp.retCodeDetails[i] = OperationStatus.FAILURE;
2648           }
2649         }
2650       }
2651       if (coprocessorHost != null && !batchOp.isInReplay()) {
2652         // call the coprocessor hook to do any finalization steps
2653         // after the put is done
2654         MiniBatchOperationInProgress<Mutation> miniBatchOp =
2655             new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2656                 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex,
2657                 lastIndexExclusive);
2658         coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success);
2659       }
2660 
2661       batchOp.nextIndexToProcess = lastIndexExclusive;
2662     }
2663   }
2664 
2665   /**
2666    * Returns effective durability from the passed durability and
2667    * the table descriptor.
2668    */
2669   protected Durability getEffectiveDurability(Durability d) {
2670     return d == Durability.USE_DEFAULT ? this.durability : d;
2671   }
2672 
2673   //TODO, Think that gets/puts and deletes should be refactored a bit so that
2674   //the getting of the lock happens before, so that you would just pass it into
2675   //the methods. So in the case of checkAndMutate you could just do lockRow,
2676   //get, put, unlockRow or something
2677   /**
2678    * Atomically checks the current value of a cell and, if the check passes, applies the mutation.
2679    * @param row row to check and mutate
2680    * @param family column family of the cell to check
2681    * @param qualifier column qualifier of the cell to check
2682    * @param compareOp comparison operator to apply
2683    * @param comparator comparator holding the expected value
2684    * @param w the Put or Delete to apply if the check passes
2685    * @param writeToWAL whether the mutation should be written to the WAL
2686    * @throws IOException
2687    * @return true if the new put was executed, false otherwise
2688    */
2689   public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
2690       CompareOp compareOp, ByteArrayComparable comparator, Mutation w,
2691       boolean writeToWAL)
2692   throws IOException{
2693     checkReadOnly();
2694     //TODO, add check for value length or maybe even better move this to the
2695     //client if this becomes a global setting
2696     checkResources();
2697     boolean isPut = w instanceof Put;
2698     if (!isPut && !(w instanceof Delete))
2699       throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must " +
2700           "be Put or Delete");
2701     if (!Bytes.equals(row, w.getRow())) {
2702       throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " +
2703           "getRow must match the passed row");
2704     }
2705 
2706     startRegionOperation();
2707     try {
2708       Get get = new Get(row);
2709       checkFamily(family);
2710       get.addColumn(family, qualifier);
2711 
2712       // Lock row - note that doBatchMutate will relock this row if called
2713       RowLock rowLock = getRowLock(get.getRow());
2714       // wait for all previous transactions to complete (with lock held)
2715       mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
2716       try {
2717         if (this.getCoprocessorHost() != null) {
2718           Boolean processed = null;
2719           if (w instanceof Put) {
2720             processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family, 
2721                 qualifier, compareOp, comparator, (Put) w);
2722           } else if (w instanceof Delete) {
2723             processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family,
2724                 qualifier, compareOp, comparator, (Delete) w);
2725           }
2726           if (processed != null) {
2727             return processed;
2728           }
2729         }
2730         List<Cell> result = get(get, false);
2731 
2732         boolean valueIsNull = comparator.getValue() == null ||
2733           comparator.getValue().length == 0;
2734         boolean matches = false;
2735         if (result.size() == 0 && valueIsNull) {
2736           matches = true;
2737         } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
2738             valueIsNull) {
2739           matches = true;
2740         } else if (result.size() == 1 && !valueIsNull) {
2741           Cell kv = result.get(0);
2742           int compareResult = comparator.compareTo(kv.getValueArray(),
2743               kv.getValueOffset(), kv.getValueLength());
2744           switch (compareOp) {
2745           case LESS:
2746             matches = compareResult < 0;
2747             break;
2748           case LESS_OR_EQUAL:
2749             matches = compareResult <= 0;
2750             break;
2751           case EQUAL:
2752             matches = compareResult == 0;
2753             break;
2754           case NOT_EQUAL:
2755             matches = compareResult != 0;
2756             break;
2757           case GREATER_OR_EQUAL:
2758             matches = compareResult >= 0;
2759             break;
2760           case GREATER:
2761             matches = compareResult > 0;
2762             break;
2763           default:
2764             throw new RuntimeException("Unknown Compare op " + compareOp.name());
2765           }
2766         }
2767         //If matches put the new put or delete the new delete
2768         if (matches) {
2769           // All edits for the given row (across all column families) must
2770           // happen atomically.
2771           doBatchMutate((Mutation)w);
2772           this.checkAndMutateChecksPassed.increment();
2773           return true;
2774         }
2775         this.checkAndMutateChecksFailed.increment();
2776         return false;
2777       } finally {
2778         rowLock.release();
2779       }
2780     } finally {
2781       closeRegionOperation();
2782     }
2783   }
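  // A hedged client-side sketch of how this check-and-mutate path is typically exercised
  // (the table name, family, qualifier and values below are hypothetical; HTable#checkAndPut
  // is assumed to route to this region-side method through the usual RPC layer):
  //
  //   HTable table = new HTable(conf, "example_table");
  //   Put put = new Put(Bytes.toBytes("row1"));
  //   put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("new-value"));
  //   // the Put is applied only if cf:q currently equals "expected-value"
  //   boolean applied = table.checkAndPut(Bytes.toBytes("row1"), Bytes.toBytes("cf"),
  //       Bytes.toBytes("q"), Bytes.toBytes("expected-value"), put);
  //   table.close();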
2784 
2785   private void doBatchMutate(Mutation mutation) throws IOException, DoNotRetryIOException {
2786     // Currently this is only called for puts and deletes, so no nonces.
2787     OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation },
2788         HConstants.NO_NONCE, HConstants.NO_NONCE);
2789     if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
2790       throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
2791     } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
2792       throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
2793     }
2794   }
2795 
2796   /**
2797    * Complete taking the snapshot on the region. Writes the region info and adds references to the
2798    * working snapshot directory.
2799    *
2800    * TODO for api consistency, consider adding another version with no {@link ForeignExceptionSnare}
2801    * arg.  (In the future other cancellable HRegion methods could eventually add a
2802    * {@link ForeignExceptionSnare}, or we could do something fancier).
2803    *
2804    * @param desc snapshot description object
2805    * @param exnSnare ForeignExceptionSnare that captures external exceptions in case we need to
2806    *   bail out.  This is allowed to be null and will just be ignored in that case.
2807    * @throws IOException if there is an external or internal error causing the snapshot to fail
2808    */
2809   public void addRegionToSnapshot(SnapshotDescription desc,
2810       ForeignExceptionSnare exnSnare) throws IOException {
2811     // This should be "fast" since we don't rewrite store files but instead
2812     // back up the store files by creating a reference
2813     Path rootDir = FSUtils.getRootDir(this.rsServices.getConfiguration());
2814     Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);
2815 
2816     // 1. dump region meta info into the snapshot directory
2817     LOG.debug("Storing region-info for snapshot.");
2818     HRegionFileSystem snapshotRegionFs = HRegionFileSystem.createRegionOnFileSystem(conf,
2819         this.fs.getFileSystem(), snapshotDir, getRegionInfo());
2820 
2821     // 2. iterate through all the stores in the region
2822     LOG.debug("Creating references for hfiles");
2823 
2824     // This ensures that we have an atomic view of the directory as long as it holds fewer files
2825     // than the namenode ls limit (the per-directory listing batch size). Otherwise, we get back the files in
2826     // batches and may miss files being added/deleted. This could be more robust (iteratively
2827     // checking to see if we have all the files until we are sure), but the limit is currently 1000
2828     // files/batch, far more than the number of store files under a single column family.
2829     for (Store store : stores.values()) {
2830       // 2.1. build the snapshot reference directory for the store
2831       Path dstStoreDir = snapshotRegionFs.getStoreDir(store.getFamily().getNameAsString());
2832       List<StoreFile> storeFiles = new ArrayList<StoreFile>(store.getStorefiles());
2833       if (LOG.isDebugEnabled()) {
2834         LOG.debug("Adding snapshot references for " + storeFiles  + " hfiles");
2835       }
2836 
2837       // 2.2. iterate through all the store's files and create "references".
2838       int sz = storeFiles.size();
2839       for (int i = 0; i < sz; i++) {
2840         if (exnSnare != null) {
2841           exnSnare.rethrowException();
2842         }
2843         StoreFile storeFile = storeFiles.get(i);
2844         Path file = storeFile.getPath();
2845 
2846         LOG.debug("Creating reference for file (" + (i+1) + "/" + sz + ") : " + file);
2847         Path referenceFile = new Path(dstStoreDir, file.getName());
2848         boolean success = true;
2849         if (storeFile.isReference()) {
2850           // write the Reference object to the snapshot
2851           storeFile.getFileInfo().getReference().write(fs.getFileSystem(), referenceFile);
2852         } else {
2853           // create "reference" to this store file.  It is intentionally an empty file -- all
2854           // necessary information is captured by its fs location and filename.  This allows us to
2855           // only figure out what needs to be done via a single nn operation (instead of having to
2856           // open and read the files as well).
2857           success = fs.getFileSystem().createNewFile(referenceFile);
2858         }
2859         if (!success) {
2860           throw new IOException("Failed to create reference file:" + referenceFile);
2861         }
2862       }
2863     }
2864   }
2865 
2866   /**
2867    * Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP} with the
2868    * provided current timestamp.
2869    */
2870   void updateKVTimestamps(final Iterable<List<Cell>> keyLists, final byte[] now) {
2871     for (List<Cell> cells: keyLists) {
2872       if (cells == null) continue;
2873       for (Cell cell : cells) {
2874         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
2875         kv.updateLatestStamp(now);
2876       }
2877     }
2878   }
2879 
2880   /*
2881    * Check whether we have the resources to support an update.
2882    *
2883    * We throw RegionTooBusyException if we are above the memstore limit
2884    * and expect the client to retry using some kind of backoff.
2885    */
2886   private void checkResources()
2887     throws RegionTooBusyException {
2888     // If catalog region, do not impose resource constraints or block updates.
2889     if (this.getRegionInfo().isMetaRegion()) return;
2890 
2891     if (this.memstoreSize.get() > this.blockingMemStoreSize) {
2892       requestFlush();
2893       throw new RegionTooBusyException("Above memstore limit, " +
2894           "regionName=" + (this.getRegionInfo() == null ? "unknown" :
2895           this.getRegionInfo().getRegionNameAsString()) +
2896           ", server=" + (this.getRegionServerServices() == null ? "unknown" :
2897           this.getRegionServerServices().getServerName()) +
2898           ", memstoreSize=" + memstoreSize.get() +
2899           ", blockingMemStoreSize=" + blockingMemStoreSize);
2900     }
2901   }
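  // Hedged sketch of the client behaviour this method expects: catch RegionTooBusyException
  // and retry with backoff. The retry count and sleep schedule are illustrative only, not
  // the actual client retry policy.
  //
  //   for (int attempt = 0; attempt < maxAttempts; attempt++) {
  //     try {
  //       table.put(put);
  //       break;                                       // write accepted
  //     } catch (RegionTooBusyException e) {
  //       Thread.sleep(baseBackoffMillis << attempt);  // back off before retrying
  //     }
  //   }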
2902 
2903   /**
2904    * @throws IOException Throws exception if region is in read-only mode.
2905    */
2906   protected void checkReadOnly() throws IOException {
2907     if (this.writestate.isReadOnly()) {
2908       throw new IOException("region is read only");
2909     }
2910   }
2911 
2912   /**
2913    * Add updates first to the hlog and then add values to memstore.
2914    * Warning: Assumption is caller has lock on passed in row.
2915    * @param row the row to update
2916    * @param family column family of the edits
2917    * @param edits Cell updates by column
2918    * @throws IOException
2919    */
2920   private void put(final byte [] row, byte [] family, List<Cell> edits)
2921   throws IOException {
2922     NavigableMap<byte[], List<Cell>> familyMap;
2923     familyMap = new TreeMap<byte[], List<Cell>>(Bytes.BYTES_COMPARATOR);
2924 
2925     familyMap.put(family, edits);
2926     Put p = new Put(row);
2927     p.setFamilyCellMap(familyMap);
2928     doBatchMutate(p);
2929   }
2930 
2931   /**
2932    * Atomically apply the given map of family->edits to the memstore.
2933    * This handles the consistency control on its own, but the caller
2934    * should already have locked updatesLock.readLock(). This also does
2935    * <b>not</b> check the families for validity.
2936    *
2937    * @param familyMap Map of kvs per family
2938    * @param localizedWriteEntry The WriteEntry of the MVCC for this transaction.
2939    *        If null, then this method internally creates a mvcc transaction.
2940    * @return the additional memory usage of the memstore caused by the
2941    * new entries.
2942    */
2943   private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
2944     MultiVersionConsistencyControl.WriteEntry localizedWriteEntry) {
2945     long size = 0;
2946     boolean freemvcc = false;
2947 
2948     try {
2949       if (localizedWriteEntry == null) {
2950         localizedWriteEntry = mvcc.beginMemstoreInsert();
2951         freemvcc = true;
2952       }
2953 
2954       for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
2955         byte[] family = e.getKey();
2956         List<Cell> cells = e.getValue();
2957 
2958         Store store = getStore(family);
2959         for (Cell cell: cells) {
2960           KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
2961           kv.setMvccVersion(localizedWriteEntry.getWriteNumber());
2962           size += store.add(kv);
2963         }
2964       }
2965     } finally {
2966       if (freemvcc) {
2967         mvcc.completeMemstoreInsert(localizedWriteEntry);
2968       }
2969     }
2970 
2971     return size;
2972   }
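  // Illustrative sketch of the MVCC pattern used above when no WriteEntry is supplied: the
  // edits only become visible to readers once completeMemstoreInsert() runs, so a reader
  // never sees a partially applied family map. (Names mirror the method above; this is a
  // sketch, not an additional API.)
  //
  //   MultiVersionConsistencyControl.WriteEntry we = mvcc.beginMemstoreInsert();
  //   try {
  //     // stamp each KeyValue with we.getWriteNumber() and add it to its store
  //   } finally {
  //     mvcc.completeMemstoreInsert(we);  // publish the writes to readers
  //   }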
2973 
2974   /**
2975    * Remove all the keys listed in the map from the memstore. This method is
2976    * called when a Put/Delete has updated the memstore but subsequently fails to update
2977    * the WAL. This method is then invoked to roll back the memstore.
2978    */
2979   private void rollbackMemstore(BatchOperationInProgress<?> batchOp,
2980                                 Map<byte[], List<Cell>>[] familyMaps,
2981                                 int start, int end) {
2982     int kvsRolledback = 0;
2983     for (int i = start; i < end; i++) {
2984       // skip over requests that never succeeded in the first place.
2985       if (batchOp.retCodeDetails[i].getOperationStatusCode()
2986             != OperationStatusCode.SUCCESS) {
2987         continue;
2988       }
2989 
2990       // Rollback all the kvs for this row.
2991       Map<byte[], List<Cell>> familyMap  = familyMaps[i];
2992       for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
2993         byte[] family = e.getKey();
2994         List<Cell> cells = e.getValue();
2995 
2996         // Remove from the memstore those keys that match our
2997         // key's (row, cf, cq, timestamp, memstoreTS). The interesting part is
2998         // that even the memstoreTS has to match for keys that are rolled back.
2999         Store store = getStore(family);
3000         for (Cell cell: cells) {
3001           store.rollback(KeyValueUtil.ensureKeyValue(cell));
3002           kvsRolledback++;
3003         }
3004       }
3005     }
3006     LOG.debug("rollbackMemstore rolled back " + kvsRolledback +
3007         " keyvalues from start:" + start + " to end:" + end);
3008   }
3009 
3010   /**
3011    * Check the collection of families for validity.
3012    * @throws NoSuchColumnFamilyException if a family does not exist.
3013    */
3014   void checkFamilies(Collection<byte[]> families)
3015   throws NoSuchColumnFamilyException {
3016     for (byte[] family : families) {
3017       checkFamily(family);
3018     }
3019   }
3020 
3021   /**
3022    * During replay, column families may exist that were removed between the region server
3023    * failure and the replay.
3024    */
3025   private void removeNonExistentColumnFamilyForReplay(
3026       final Map<byte[], List<Cell>> familyMap) {
3027     List<byte[]> nonExistentList = null;
3028     for (byte[] family : familyMap.keySet()) {
3029       if (!this.htableDescriptor.hasFamily(family)) {
3030         if (nonExistentList == null) {
3031           nonExistentList = new ArrayList<byte[]>();
3032         }
3033         nonExistentList.add(family);
3034       }
3035     }
3036     if (nonExistentList != null) {
3037       for (byte[] family : nonExistentList) {
3038         // Perhaps schema was changed between crash and replay
3039         LOG.info("No family for " + Bytes.toString(family) + ", omitting from replay.");
3040         familyMap.remove(family);
3041       }
3042     }
3043   }
3044 
3045   void checkTimestamps(final Map<byte[], List<Cell>> familyMap,
3046       long now) throws FailedSanityCheckException {
3047     if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
3048       return;
3049     }
3050     long maxTs = now + timestampSlop;
3051     for (List<Cell> kvs : familyMap.values()) {
3052       for (Cell cell : kvs) {
3053         // see if the user-side TS is out of range (LATEST_TIMESTAMP means the server sets it)
3054         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
3055         if (!kv.isLatestTimestamp() && kv.getTimestamp() > maxTs) {
3056           throw new FailedSanityCheckException("Timestamp for KV out of range "
3057               + cell + " (too.new=" + timestampSlop + ")");
3058         }
3059       }
3060     }
3061   }
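  // Worked example of the slop check above, with illustrative numbers: if now = 1_000_000 and
  // timestampSlop = 2_000, then maxTs = 1_002_000. A client-supplied timestamp of 1_005_000
  // fails the sanity check, while a KeyValue carrying HConstants.LATEST_TIMESTAMP always
  // passes because the server will fill in its own timestamp.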
3062 
3063   /**
3064    * Append the given map of family->edits to a WALEdit data structure.
3065    * This does not write to the HLog itself.
3066    * @param familyMap map of family->edits
3067    * @param walEdit the destination entry to append into
3068    */
3069   private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
3070       WALEdit walEdit) {
3071     for (List<Cell> edits : familyMap.values()) {
3072       for (Cell cell : edits) {
3073         walEdit.add(KeyValueUtil.ensureKeyValue(cell));
3074       }
3075     }
3076   }
3077 
3078   private void requestFlush() {
3079     if (this.rsServices == null) {
3080       return;
3081     }
3082     synchronized (writestate) {
3083       if (this.writestate.isFlushRequested()) {
3084         return;
3085       }
3086       writestate.flushRequested = true;
3087     }
3088     // Make request outside of synchronize block; HBASE-818.
3089     this.rsServices.getFlushRequester().requestFlush(this);
3090     if (LOG.isDebugEnabled()) {
3091       LOG.debug("Flush requested on " + this);
3092     }
3093   }
3094 
3095   /*
3096    * @param size
3097    * @return True if size is over the flush threshold
3098    */
3099   private boolean isFlushSize(final long size) {
3100     return size > this.memstoreFlushSize;
3101   }
3102 
3103   /**
3104    * Read the edit logs placed under this region by the WAL log-splitting process.  Put
3105    * the recovered edits back up into this region.
3106    *
3107    * <p>We can ignore any log message that has a sequence ID that's equal to or
3108    * lower than minSeqId.  (Because we know such log messages are already
3109    * reflected in the HFiles.)
3110    *
3111    * <p>While this is running we are putting pressure on memory yet we are
3112    * outside of our usual accounting because we are not yet an onlined region
3113    * (this stuff is being run as part of Region initialization).  This means
3114    * that if we're up against global memory limits, we'll not be flagged to flush
3115    * because we are not online. We can't be flushed by usual mechanisms anyways;
3116    * we're not yet online so our relative sequenceids are not yet aligned with
3117    * HLog sequenceids -- not till we come up online, post processing of split
3118    * edits.
3119    *
3120    * <p>But to help relieve memory pressure, we at least manage our own heap size by
3121    * flushing if we are in excess of per-region limits.  Flushing, though, we have
3122    * to be careful to avoid using the regionserver/hlog sequenceid.  It runs
3123    * on a different timeline from what is going on here in this region context, so if we
3124    * crashed replaying these edits, but in the midst had a flush that used the
3125    * regionserver log with a sequenceid in excess of what is going on here
3126    * in this region and with its split editlogs, then we could miss edits the
3127    * next time we go to recover.  So, we have to flush inline, using seqids that
3128    * make sense in this single region context only -- until we are online.
3129    *
3130    * @param regiondir
3131    * @param maxSeqIdInStores Any edit found in split editlogs needs to be in excess of
3132    * the maxSeqId for the store to be applied, else it is skipped.
3133    * @param reporter
3134    * @return the sequence id of the last edit added to this region out of the
3135    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
3136    * @throws UnsupportedEncodingException
3137    * @throws IOException
3138    */
3139   protected long replayRecoveredEditsIfAny(final Path regiondir,
3140       Map<byte[], Long> maxSeqIdInStores,
3141       final CancelableProgressable reporter, final MonitoredTask status)
3142       throws UnsupportedEncodingException, IOException {
3143     long minSeqIdForTheRegion = -1;
3144     for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
3145       if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
3146         minSeqIdForTheRegion = maxSeqIdInStore;
3147       }
3148     }
3149     long seqid = minSeqIdForTheRegion;
3150 
3151     FileSystem fs = this.fs.getFileSystem();
3152     NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regiondir);
3153     if (LOG.isDebugEnabled()) {
3154       LOG.debug("Found " + (files == null ? 0 : files.size())
3155         + " recovered edits file(s) under " + regiondir);
3156     }
3157 
3158     if (files == null || files.isEmpty()) return seqid;
3159 
3160     for (Path edits: files) {
3161       if (edits == null || !fs.exists(edits)) {
3162         LOG.warn("Null or non-existent edits file: " + edits);
3163         continue;
3164       }
3165       if (isZeroLengthThenDelete(fs, edits)) continue;
3166 
3167       long maxSeqId;
3168       String fileName = edits.getName();
3169       maxSeqId = Math.abs(Long.parseLong(fileName));
3170       if (maxSeqId <= minSeqIdForTheRegion) {
3171         if (LOG.isDebugEnabled()) {
3172           String msg = "Maximum sequenceid for this log is " + maxSeqId
3173             + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
3174             + ", skipped the whole file, path=" + edits;
3175           LOG.debug(msg);
3176         }
3177         continue;
3178       }
3179 
3180       try {
3181         // replay the edits. Replay can return -1 if everything is skipped, only update if seqId is greater
3182         seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
3183       } catch (IOException e) {
3184         boolean skipErrors = conf.getBoolean(
3185             HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
3186             conf.getBoolean(
3187                 "hbase.skip.errors",
3188                 HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS));
3189         if (conf.get("hbase.skip.errors") != null) {
3190           LOG.warn(
3191               "The property 'hbase.skip.errors' has been deprecated. Please use " +
3192               HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
3193         }
3194         if (skipErrors) {
3195           Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
3196           LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
3197               + "=true so continuing. Renamed " + edits +
3198               " as " + p, e);
3199         } else {
3200           throw e;
3201         }
3202       }
3203     }
3204     // The edits size added into rsAccounting during this replaying will not
3205     // be required any more. So just clear it.
3206     if (this.rsAccounting != null) {
3207       this.rsAccounting.clearRegionReplayEditsSize(this.getRegionName());
3208     }
3209     if (seqid > minSeqIdForTheRegion) {
3210       // Then we added some edits to memory. Flush and cleanup split edit files.
3211       internalFlushcache(null, seqid, status);
3212     }
3213     // Now delete the content of recovered edits.  We're done w/ them.
3214     for (Path file: files) {
3215       if (!fs.delete(file, false)) {
3216         LOG.error("Failed delete of " + file);
3217       } else {
3218         LOG.debug("Deleted recovered.edits file=" + file);
3219       }
3220     }
3221     return seqid;
3222   }
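  // Worked example of the per-file skip above (the path is illustrative): a recovered edits
  // file is named after the highest sequence id it contains, e.g.
  //
  //   .../<region-encoded-name>/recovered.edits/0000000000000001234
  //
  // Long.parseLong("0000000000000001234") == 1234, so if every store already has a
  // maxSeqId >= 1234 the whole file is skipped without being replayed.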
3223 
3224   /*
3225    * @param edits File of recovered edits.
3226    * @param maxSeqIdInStores Maximum sequenceid found in each store.  Edits in log
3227    * must be larger than this to be replayed for each store.
3228    * @param reporter
3229    * @return the sequence id of the last edit added to this region out of the
3230    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
3231    * @throws IOException
3232    */
3233   private long replayRecoveredEdits(final Path edits,
3234       Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
3235     throws IOException {
3236     String msg = "Replaying edits from " + edits;
3237     LOG.info(msg);
3238     MonitoredTask status = TaskMonitor.get().createStatus(msg);
3239     FileSystem fs = this.fs.getFileSystem();
3240 
3241     status.setStatus("Opening logs");
3242     HLog.Reader reader = null;
3243     try {
3244       reader = HLogFactory.createReader(fs, edits, conf);
3245       long currentEditSeqId = -1;
3246       long firstSeqIdInLog = -1;
3247       long skippedEdits = 0;
3248       long editsCount = 0;
3249       long intervalEdits = 0;
3250       HLog.Entry entry;
3251       Store store = null;
3252       boolean reported_once = false;
3253       ServerNonceManager ng = this.rsServices == null ? null : this.rsServices.getNonceManager();
3254 
3255       try {
3256         // How many edits seen before we check elapsed time
3257         int interval = this.conf.getInt("hbase.hstore.report.interval.edits",
3258             2000);
3259         // How often to send a progress report (default 1/2 master timeout)
3260         int period = this.conf.getInt("hbase.hstore.report.period",
3261           this.conf.getInt(AssignmentManager.ASSIGNMENT_TIMEOUT,
3262             AssignmentManager.DEFAULT_ASSIGNMENT_TIMEOUT_DEFAULT) / 2);
3263         long lastReport = EnvironmentEdgeManager.currentTimeMillis();
3264 
3265         while ((entry = reader.next()) != null) {
3266           HLogKey key = entry.getKey();
3267           WALEdit val = entry.getEdit();
3268 
3269           if (ng != null) { // ng is null in some tests or when nonces are disabled
3270             ng.reportOperationFromWal(key.getNonceGroup(), key.getNonce(), key.getWriteTime());
3271           }
3272 
3273           if (reporter != null) {
3274             intervalEdits += val.size();
3275             if (intervalEdits >= interval) {
3276               // Number of edits interval reached
3277               intervalEdits = 0;
3278               long cur = EnvironmentEdgeManager.currentTimeMillis();
3279               if (lastReport + period <= cur) {
3280                 status.setStatus("Replaying edits..." +
3281                     " skipped=" + skippedEdits +
3282                     " edits=" + editsCount);
3283                 // Timeout reached
3284                 if(!reporter.progress()) {
3285                   msg = "Progressable reporter failed, stopping replay";
3286                   LOG.warn(msg);
3287                   status.abort(msg);
3288                   throw new IOException(msg);
3289                 }
3290                 reported_once = true;
3291                 lastReport = cur;
3292               }
3293             }
3294           }
3295 
3296           // Start coprocessor replay here. The coprocessor is for each WALEdit
3297           // instead of a KeyValue.
3298           if (coprocessorHost != null) {
3299             status.setStatus("Running pre-WAL-restore hook in coprocessors");
3300             if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
3301               // if bypass this log entry, ignore it ...
3302               continue;
3303             }
3304           }
3305 
3306           if (firstSeqIdInLog == -1) {
3307             firstSeqIdInLog = key.getLogSeqNum();
3308           }
3309           currentEditSeqId = key.getLogSeqNum();
3310           boolean flush = false;
3311           for (KeyValue kv: val.getKeyValues()) {
3312             // Check this edit is for me. Also, guard against writing the special
3313             // METACOLUMN info such as HBASE::CACHEFLUSH entries
3314             if (kv.matchingFamily(WALEdit.METAFAMILY) ||
3315                 !Bytes.equals(key.getEncodedRegionName(),
3316                   this.getRegionInfo().getEncodedNameAsBytes())) {
3317               //this is a special edit, we should handle it
3318               CompactionDescriptor compaction = WALEdit.getCompaction(kv);
3319               if (compaction != null) {
3320                 //replay the compaction
3321                 completeCompactionMarker(compaction);
3322               }
3323 
3324               skippedEdits++;
3325               continue;
3326             }
3327             // Figure which store the edit is meant for.
3328             if (store == null || !kv.matchingFamily(store.getFamily().getName())) {
3329               store = this.stores.get(kv.getFamily());
3330             }
3331             if (store == null) {
3332               // This should never happen.  Perhaps schema was changed between
3333               // crash and redeploy?
3334               LOG.warn("No family for " + kv);
3335               skippedEdits++;
3336               continue;
3337             }
3338             // Now, figure if we should skip this edit.
3339             if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily()
3340                 .getName())) {
3341               skippedEdits++;
3342               continue;
3343             }
3344             // Once we are over the limit, restoreEdit will keep returning true to
3345             // flush -- but don't flush until we've played all the kvs that make up
3346             // the WALEdit.
3347             flush = restoreEdit(store, kv);
3348             editsCount++;
3349           }
3350           if (flush) internalFlushcache(null, currentEditSeqId, status);
3351 
3352           if (coprocessorHost != null) {
3353             coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
3354           }
3355         }
3356       } catch (EOFException eof) {
3357         Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
3358         msg = "Encountered EOF. Most likely due to Master failure during " +
3359             "log spliting, so we have this data in another edit.  " +
3360             "Continuing, but renaming " + edits + " as " + p;
3361         LOG.warn(msg, eof);
3362         status.abort(msg);
3363       } catch (IOException ioe) {
3364         // If the IOE resulted from bad file format,
3365         // then this problem is idempotent and retrying won't help
3366         if (ioe.getCause() instanceof ParseException) {
3367           Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
3368           msg = "File corruption encountered!  " +
3369               "Continuing, but renaming " + edits + " as " + p;
3370           LOG.warn(msg, ioe);
3371           status.setStatus(msg);
3372         } else {
3373           status.abort(StringUtils.stringifyException(ioe));
3374           // other IO errors may be transient (bad network connection,
3375           // checksum exception on one datanode, etc).  throw & retry
3376           throw ioe;
3377         }
3378       }
3379       if (reporter != null && !reported_once) {
3380         reporter.progress();
3381       }
3382       msg = "Applied " + editsCount + ", skipped " + skippedEdits +
3383         ", firstSequenceidInLog=" + firstSeqIdInLog +
3384         ", maxSequenceidInLog=" + currentEditSeqId + ", path=" + edits;
3385       status.markComplete(msg);
3386       LOG.debug(msg);
3387       return currentEditSeqId;
3388     } finally {
3389       status.cleanup();
3390       if (reader != null) {
3391          reader.close();
3392       }
3393     }
3394   }
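  // Hedged sketch of the progress-reporting knobs read above; the values shown are
  // illustrative only, not recommendations:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt("hbase.hstore.report.interval.edits", 2000); // edits between elapsed-time checks
  //   conf.setInt("hbase.hstore.report.period", 120000);       // ms between progress reports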
3395 
3396   /**
3397    * Call to complete a compaction.  It is for the case where we find in the WAL a compaction
3398    * that was not finished.  We could find one recovering a WAL after a regionserver crash.
3399    * See HBASE-2331.
3400    * @param compaction
3401    */
3402   void completeCompactionMarker(CompactionDescriptor compaction)
3403       throws IOException {
3404     Store store = this.getStore(compaction.getFamilyName().toByteArray());
3405     if (store == null) {
3406       LOG.warn("Found Compaction WAL edit for deleted family:" +
3407           Bytes.toString(compaction.getFamilyName().toByteArray()));
3408       return;
3409     }
3410     store.completeCompactionMarker(compaction);
3411   }
3412 
3413   /**
3414    * Used by tests
3415    * @param s Store to add the edit to.
3416    * @param kv KeyValue to add.
3417    * @return True if we should flush.
3418    */
3419   protected boolean restoreEdit(final Store s, final KeyValue kv) {
3420     long kvSize = s.add(kv);
3421     if (this.rsAccounting != null) {
3422       rsAccounting.addAndGetRegionReplayEditsSize(this.getRegionName(), kvSize);
3423     }
3424     return isFlushSize(this.addAndGetGlobalMemstoreSize(kvSize));
3425   }
3426 
3427   /*
3428    * @param fs
3429    * @param p File to check.
3430    * @return True if file was zero-length (and if so, we'll delete it in here).
3431    * @throws IOException
3432    */
3433   private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
3434       throws IOException {
3435     FileStatus stat = fs.getFileStatus(p);
3436     if (stat.getLen() > 0) return false;
3437     LOG.warn("File " + p + " is zero-length, deleting.");
3438     fs.delete(p, false);
3439     return true;
3440   }
3441 
3442   protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
3443     return new HStore(this, family, this.conf);
3444   }
3445 
3446   /**
3447    * Return HStore instance.
3448    * Use with caution.  Exposed for use of fixup utilities.
3449    * @param column Name of column family hosted by this region.
3450    * @return Store that goes with the family on passed <code>column</code>.
3451    * TODO: Make this lookup faster.
3452    */
3453   public Store getStore(final byte[] column) {
3454     return this.stores.get(column);
3455   }
3456 
3457   public Map<byte[], Store> getStores() {
3458     return this.stores;
3459   }
3460 
3461   /**
3462    * Return list of storeFiles for the set of CFs.
3463    * Uses closeLock to prevent the race condition where a region closes
3464    * in the middle of the for loop; as the stores are closed one by one, some stores
3465    * would otherwise return 0 files.
3466    * @return List of storeFiles.
3467    */
3468   public List<String> getStoreFileList(final byte [][] columns)
3469     throws IllegalArgumentException {
3470     List<String> storeFileNames = new ArrayList<String>();
3471     synchronized(closeLock) {
3472       for(byte[] column : columns) {
3473         Store store = this.stores.get(column);
3474         if (store == null) {
3475           throw new IllegalArgumentException("No column family : " +
3476               new String(column) + " available");
3477         }
3478         for (StoreFile storeFile: store.getStorefiles()) {
3479           storeFileNames.add(storeFile.getPath().toString());
3480         }
3481       }
3482     }
3483     return storeFileNames;
3484   }
3485   //////////////////////////////////////////////////////////////////////////////
3486   // Support code
3487   //////////////////////////////////////////////////////////////////////////////
3488 
3489   /** Make sure this is a valid row for the HRegion */
3490   void checkRow(final byte [] row, String op) throws IOException {
3491     if (!rowIsInRange(getRegionInfo(), row)) {
3492       throw new WrongRegionException("Requested row out of range for " +
3493           op + " on HRegion " + this + ", startKey='" +
3494           Bytes.toStringBinary(getStartKey()) + "', getEndKey()='" +
3495           Bytes.toStringBinary(getEndKey()) + "', row='" +
3496           Bytes.toStringBinary(row) + "'");
3497     }
3498   }
3499 
3500   /**
3501    * Tries to acquire a lock on the given row.
3502    * @param waitForLock if true, will block until the lock is available.
3503    *        Otherwise, just tries to obtain the lock and returns
3504    *        false if unavailable.
3505    * @return the row lock if acquired,
3506    *   null if waitForLock was false and the lock was not acquired
3507    * @throws IOException if waitForLock was true and the lock could not be acquired after waiting
3508    */
3509   public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
3510     checkRow(row, "row lock");
3511     startRegionOperation();
3512     try {
3513       HashedBytes rowKey = new HashedBytes(row);
3514       RowLockContext rowLockContext = new RowLockContext(rowKey);
3515 
3516       // loop until we acquire the row lock (unless !waitForLock)
3517       while (true) {
3518         RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
3519         if (existingContext == null) {
3520           // Row is not already locked by any thread, use newly created context.
3521           break;
3522         } else if (existingContext.ownedByCurrentThread()) {
3523           // Row is already locked by current thread, reuse existing context instead.
3524           rowLockContext = existingContext;
3525           break;
3526         } else {
3527           // Row is already locked by some other thread, give up or wait for it
3528           if (!waitForLock) {
3529             return null;
3530           }
3531           try {
3532             if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
3533               throw new IOException("Timed out waiting for lock for row: " + rowKey);
3534             }
3535           } catch (InterruptedException ie) {
3536             LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
3537             InterruptedIOException iie = new InterruptedIOException();
3538             iie.initCause(ie);
3539             throw iie;
3540           }
3541         }
3542       }
3543 
3544       // allocate new lock for this thread
3545       return rowLockContext.newLock();
3546     } finally {
3547       closeRegionOperation();
3548     }
3549   }
3550 
3551   /**
3552    * Acquires a lock on the given row.
3553    * The same thread may acquire multiple locks on the same row.
3554    * @return the acquired row lock
3555    * @throws IOException if the lock could not be acquired after waiting
3556    */
3557   public RowLock getRowLock(byte[] row) throws IOException {
3558     return getRowLock(row, true);
3559   }
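  // Minimal sketch of the lock/release discipline expected of callers within this class
  // (it mirrors how checkAndMutate above uses it; row locks are not a public client-facing API):
  //
  //   RowLock lock = getRowLock(row);   // blocks until acquired, or throws on timeout
  //   try {
  //     // read and/or mutate the row while holding the lock
  //   } finally {
  //     lock.release();                 // always release, even on error
  //   }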
3560 
3561   /**
3562    * If the given list of row locks is not null, releases all locks.
3563    */
3564   public void releaseRowLocks(List<RowLock> rowLocks) {
3565     if (rowLocks != null) {
3566       for (RowLock rowLock : rowLocks) {
3567         rowLock.release();
3568       }
3569       rowLocks.clear();
3570     }
3571   }
3572 
3573   /**
3574    * Determines whether multiple column families are present.
3575    * Precondition: familyPaths is not null
3576    *
3577    * @param familyPaths List of Pair<byte[] column family, String hfilePath>
3578    */
3579   private static boolean hasMultipleColumnFamilies(
3580       List<Pair<byte[], String>> familyPaths) {
3581     boolean multipleFamilies = false;
3582     byte[] family = null;
3583     for (Pair<byte[], String> pair : familyPaths) {
3584       byte[] fam = pair.getFirst();
3585       if (family == null) {
3586         family = fam;
3587       } else if (!Bytes.equals(family, fam)) {
3588         multipleFamilies = true;
3589         break;
3590       }
3591     }
3592     return multipleFamilies;
3593   }
3594 
3595 
3596   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
3597                                 boolean assignSeqId) throws IOException {
3598     return bulkLoadHFiles(familyPaths, assignSeqId, null);
3599   }
3600 
3601   /**
3602    * Attempts to atomically load a group of hfiles.  This is critical for loading
3603    * rows with multiple column families atomically.
3604    *
3605    * @param familyPaths List of Pair<byte[] column family, String hfilePath>
3606    * @param bulkLoadListener Internal hooks enabling massaging/preparation of a
3607    * file about to be bulk loaded
3608    * @param assignSeqId
3609    * @return true if successful, false if failed recoverably
3610    * @throws IOException if failed unrecoverably.
3611    */
3612   public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, boolean assignSeqId,
3613       BulkLoadListener bulkLoadListener) throws IOException {
3614     Preconditions.checkNotNull(familyPaths);
3615     // we need writeLock for multi-family bulk load
3616     startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
3617     try {
3618       this.writeRequestsCount.increment();
3619 
3620       // There possibly was a split that happened between when the split keys
3621       // were gathered and before the HRegion's write lock was taken.  We need
3622       // to validate each HFile against the region before attempting to bulk load them all
3623       List<IOException> ioes = new ArrayList<IOException>();
3624       List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();
3625       for (Pair<byte[], String> p : familyPaths) {
3626         byte[] familyName = p.getFirst();
3627         String path = p.getSecond();
3628 
3629         Store store = getStore(familyName);
3630         if (store == null) {
3631           IOException ioe = new org.apache.hadoop.hbase.DoNotRetryIOException(
3632               "No such column family " + Bytes.toStringBinary(familyName));
3633           ioes.add(ioe);
3634         } else {
3635           try {
3636             store.assertBulkLoadHFileOk(new Path(path));
3637           } catch (WrongRegionException wre) {
3638             // recoverable (file doesn't fit in region)
3639             failures.add(p);
3640           } catch (IOException ioe) {
3641             // unrecoverable (hdfs problem)
3642             ioes.add(ioe);
3643           }
3644         }
3645       }
3646 
3647       // validation failed because of some sort of IO problem.
3648       if (ioes.size() != 0) {
3649         IOException e = MultipleIOException.createIOException(ioes);
3650         LOG.error("There were one or more IO errors when checking if the bulk load is ok.", e);
3651         throw e;
3652       }
3653 
3654       // validation failed, bail out before doing anything permanent.
3655       if (failures.size() != 0) {
3656         StringBuilder list = new StringBuilder();
3657         for (Pair<byte[], String> p : failures) {
3658           list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")
3659             .append(p.getSecond());
3660         }
3661         // problem when validating
3662         LOG.warn("There was a recoverable bulk load failure likely due to a" +
3663             " split.  These (family, HFile) pairs were not loaded: " + list);
3664         return false;
3665       }
3666 
3667       long seqId = -1;
3668       // We need to assign a sequential ID that's in between two memstores in order to preserve
3669       // the guarantee that all the edits lower than the highest sequential ID from all the
3670       // HFiles are flushed on disk. See HBASE-10958.
3671       if (assignSeqId) {
3672         FlushResult fs = this.flushcache();
3673         if (fs.isFlushSucceeded()) {
3674           seqId = fs.flushSequenceId;
3675         } else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
3676           seqId = this.sequenceId.incrementAndGet();
3677         } else {
3678           throw new IOException("Could not bulk load with an assigned sequential ID because the " +
3679               "flush didn't run. Reason for not flushing: " + fs.failureReason);
3680         }
3681       }
3682 
3683       for (Pair<byte[], String> p : familyPaths) {
3684         byte[] familyName = p.getFirst();
3685         String path = p.getSecond();
3686         Store store = getStore(familyName);
3687         try {
3688           String finalPath = path;
3689           if(bulkLoadListener != null) {
3690             finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
3691           }
3692           store.bulkLoadHFile(finalPath, seqId);
3693           if(bulkLoadListener != null) {
3694             bulkLoadListener.doneBulkLoad(familyName, path);
3695           }
3696         } catch (IOException ioe) {
3697           // A failure here can cause an atomicity violation that we currently
3698           // cannot recover from since it is likely a failed HDFS operation.
3699 
3700           // TODO Need a better story for reverting partial failures due to HDFS.
3701           LOG.error("There was a partial failure due to IO when attempting to" +
3702               " load " + Bytes.toString(p.getFirst()) + " : "+ p.getSecond(), ioe);
3703           if(bulkLoadListener != null) {
3704             try {
3705               bulkLoadListener.failedBulkLoad(familyName, path);
3706             } catch (Exception ex) {
3707               LOG.error("Error while calling failedBulkLoad for family "+
3708                   Bytes.toString(familyName)+" with path "+path, ex);
3709             }
3710           }
3711           throw ioe;
3712         }
3713       }
3714       return true;
3715     } finally {
3716       closeBulkRegionOperation();
3717     }
3718   }
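  // Hedged sketch of how bulk loads usually reach this method from the client side, via the
  // LoadIncrementalHFiles (completebulkload) tool; the table name and HFile directory are
  // hypothetical, and the exact constructor/exception signatures may differ by version:
  //
  //   LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
  //   HTable table = new HTable(conf, "example_table");
  //   loader.doBulkLoad(new Path("/user/hbase/bulkload-output"), table);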
3719 
3720   @Override
3721   public boolean equals(Object o) {
3722     return o instanceof HRegion && Bytes.equals(this.getRegionName(),
3723                                                 ((HRegion) o).getRegionName());
3724   }
3725 
3726   @Override
3727   public int hashCode() {
3728     return Bytes.hashCode(this.getRegionName());
3729   }
3730 
3731   @Override
3732   public String toString() {
3733     return this.getRegionNameAsString();
3734   }
3735 
3736   /**
3737    * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
3738    */
3739   class RegionScannerImpl implements RegionScanner {
3740     // Package local for testability
3741     KeyValueHeap storeHeap = null;
3742     /** Heap of key-values that are not essential for the provided filters and are thus read
3743      * on demand, if on-demand column family loading is enabled.*/
3744     KeyValueHeap joinedHeap = null;
3745     /**
3746      * If the joined heap data gathering is interrupted due to scan limits, this will
3747      * contain the row for which we are populating the values.*/
3748     protected KeyValue joinedContinuationRow = null;
3749     // KeyValue indicating that limit is reached when scanning
3750     private final KeyValue KV_LIMIT = new KeyValue();
3751     protected final byte[] stopRow;
3752     private final FilterWrapper filter;
3753     private int batch;
3754     protected int isScan;
3755     private boolean filterClosed = false;
3756     private long readPt;
3757     private long maxResultSize;
3758     protected HRegion region;
3759 
3760     @Override
3761     public HRegionInfo getRegionInfo() {
3762       return region.getRegionInfo();
3763     }
3764 
3765     RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
3766         throws IOException {
3767 
3768       this.region = region;
3769       this.maxResultSize = scan.getMaxResultSize();
3770       if (scan.hasFilter()) {
3771         this.filter = new FilterWrapper(scan.getFilter());
3772       } else {
3773         this.filter = null;
3774       }
3775 
3776       this.batch = scan.getBatch();
3777       if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {
3778         this.stopRow = null;
3779       } else {
3780         this.stopRow = scan.getStopRow();
3781       }
3782       // If we are doing a get, we want to be [startRow,endRow] normally
3783       // it is [startRow,endRow) and if startRow=endRow we get nothing.
3784       this.isScan = scan.isGetScan() ? -1 : 0;
3785 
3786       // synchronize on scannerReadPoints so that nobody calculates
3787       // getSmallestReadPoint, before scannerReadPoints is updated.
3788       IsolationLevel isolationLevel = scan.getIsolationLevel();
3789       synchronized(scannerReadPoints) {
3790         this.readPt = getReadpoint(isolationLevel);
3791         scannerReadPoints.put(this, this.readPt);
3792       }
3793 
3794       // Here we separate all scanners into two lists - scanner that provide data required
3795       // by the filter to operate (scanners list) and all others (joinedScanners list).
3796       List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
3797       List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
3798       if (additionalScanners != null) {
3799         scanners.addAll(additionalScanners);
3800       }
3801 
3802       for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
3803           scan.getFamilyMap().entrySet()) {
3804         Store store = stores.get(entry.getKey());
3805         KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
3806         if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
3807           || this.filter.isFamilyEssential(entry.getKey())) {
3808           scanners.add(scanner);
3809         } else {
3810           joinedScanners.add(scanner);
3811         }
3812       }
3813       initializeKVHeap(scanners, joinedScanners, region);
3814     }
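    // Illustrative sketch of a client Scan that exercises the essential/joined split built
    // above (family names and filter values are hypothetical; setLoadColumnFamiliesOnDemand
    // is assumed to be available on Scan in this version):
    //
    //   Scan scan = new Scan();
    //   scan.addFamily(Bytes.toBytes("essential"));
    //   scan.addFamily(Bytes.toBytes("bulky"));
    //   scan.setLoadColumnFamiliesOnDemand(true);  // non-essential families feed the joinedHeap
    //   scan.setFilter(new SingleColumnValueFilter(Bytes.toBytes("essential"),
    //       Bytes.toBytes("q"), CompareOp.EQUAL, Bytes.toBytes("v")));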
3815 
3816     RegionScannerImpl(Scan scan, HRegion region) throws IOException {
3817       this(scan, null, region);
3818     }
3819 
3820     protected void initializeKVHeap(List<KeyValueScanner> scanners,
3821         List<KeyValueScanner> joinedScanners, HRegion region)
3822         throws IOException {
3823       this.storeHeap = new KeyValueHeap(scanners, region.comparator);
3824       if (!joinedScanners.isEmpty()) {
3825         this.joinedHeap = new KeyValueHeap(joinedScanners, region.comparator);
3826       }
3827     }
3828 
3829     @Override
3830     public long getMaxResultSize() {
3831       return maxResultSize;
3832     }
3833 
3834     @Override
3835     public long getMvccReadPoint() {
3836       return this.readPt;
3837     }
3838 
3839     /**
3840      * Reset both the filter and the old filter.
3841      *
3842      * @throws IOException in case a filter raises an I/O exception.
3843      */
3844     protected void resetFilters() throws IOException {
3845       if (filter != null) {
3846         filter.reset();
3847       }
3848     }
3849 
3850     @Override
3851     public boolean next(List<Cell> outResults)
3852         throws IOException {
3853       // apply the batching limit by default
3854       return next(outResults, batch);
3855     }
3856 
3857     @Override
3858     public synchronized boolean next(List<Cell> outResults, int limit) throws IOException {
3859       if (this.filterClosed) {
3860         throw new UnknownScannerException("Scanner was closed (timed out?) " +
3861             "after we renewed it. Could be caused by a very slow scanner " +
3862             "or a lengthy garbage collection");
3863       }
3864       startRegionOperation(Operation.SCAN);
3865       readRequestsCount.increment();
3866       try {
3867         return nextRaw(outResults, limit);
3868       } finally {
3869         closeRegionOperation(Operation.SCAN);
3870       }
3871     }
3872 
3873     @Override
3874     public boolean nextRaw(List<Cell> outResults)
3875         throws IOException {
3876       return nextRaw(outResults, batch);
3877     }
3878 
3879     @Override
3880     public boolean nextRaw(List<Cell> outResults, int limit) throws IOException {
3881       boolean returnResult;
3882       if (outResults.isEmpty()) {
3883         // Usually outResults is empty. This is true when next is called
3884         // to handle a scan or get operation.
3885         returnResult = nextInternal(outResults, limit);
3886       } else {
3887         List<Cell> tmpList = new ArrayList<Cell>();
3888         returnResult = nextInternal(tmpList, limit);
3889         outResults.addAll(tmpList);
3890       }
3891       resetFilters();
3892       if (isFilterDoneInternal()) {
3893         returnResult = false;
3894       }
3895       if (region != null && region.metricsRegion != null) {
3896         long totalSize = 0;
3897         for(Cell c:outResults) {
3898           // TODO clean up
3899           KeyValue kv = KeyValueUtil.ensureKeyValue(c);
3900           totalSize += kv.getLength();
3901         }
3902         region.metricsRegion.updateScanNext(totalSize);
3903       }
3904       return returnResult;
3905     }
3906 
3907 
3908     private void populateFromJoinedHeap(List<Cell> results, int limit)
3909         throws IOException {
3910       assert joinedContinuationRow != null;
3911       KeyValue kv = populateResult(results, this.joinedHeap, limit,
3912           joinedContinuationRow.getBuffer(), joinedContinuationRow.getRowOffset(),
3913           joinedContinuationRow.getRowLength());
3914       if (kv != KV_LIMIT) {
3915         // We are done with this row, reset the continuation.
3916         joinedContinuationRow = null;
3917       }
3918       // As the data is obtained from two independent heaps, we need to
3919       // ensure that result list is sorted, because Result relies on that.
3920       Collections.sort(results, comparator);
3921     }
3922 
3923     /**
3924      * Fetches records matching currentRow into the results list, until the next row or the limit (if not -1) is reached.
3925      * @param results
3926      * @param heap KeyValueHeap to fetch data from. It must be positioned on the correct row before the call.
3927      * @param limit Max amount of KVs to place in result list, -1 means no limit.
3928      * @param currentRow Byte array with key we are fetching.
3929      * @param offset offset for currentRow
3930      * @param length length for currentRow
3931      * @return KV_LIMIT if limit reached, next KeyValue otherwise.
3932      */
3933     private KeyValue populateResult(List<Cell> results, KeyValueHeap heap, int limit,
3934         byte[] currentRow, int offset, short length) throws IOException {
3935       KeyValue nextKv;
3936       do {
3937         heap.next(results, limit - results.size());
3938         if (limit > 0 && results.size() == limit) {
3939           return KV_LIMIT;
3940         }
3941         nextKv = heap.peek();
3942       } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
3943 
3944       return nextKv;
3945     }
3946 
3947     /*
3948      * @return True if a filter rules that the scanner is over, i.e. done.
3949      */
3950     @Override
3951     public synchronized boolean isFilterDone() throws IOException {
3952       return isFilterDoneInternal();
3953     }
3954 
3955     private boolean isFilterDoneInternal() throws IOException {
3956       return this.filter != null && this.filter.filterAllRemaining();
3957     }
3958 
3959     private boolean nextInternal(List<Cell> results, int limit)
3960     throws IOException {
3961       if (!results.isEmpty()) {
3962         throw new IllegalArgumentException("First parameter should be an empty list");
3963       }
3964       RpcCallContext rpcCall = RpcServer.getCurrentCall();
3965       // The loop here is used only when, at some point during the next, we determine
3966       // that due to effects of filters or otherwise, we have an empty row in the result.
3967       // Then we loop and try again. Otherwise, we must get out on the first iteration via return:
3968       // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row
3969       // and joinedHeap has no more data to read for the last row, joinedContinuationRow, if set).
3970       while (true) {
3971         if (rpcCall != null) {
3972           // If a user specifies a too-restrictive or too-slow scanner, the
3973           // client might time out and disconnect while the server side
3974           // is still processing the request. We should abort aggressively
3975           // in that case.
3976           long afterTime = rpcCall.disconnectSince();
3977           if (afterTime >= 0) {
3978             throw new CallerDisconnectedException(
3979                 "Aborting on region " + getRegionNameAsString() + ", call " +
3980                     this + " after " + afterTime + " ms, since " +
3981                     "caller disconnected");
3982           }
3983         }
3984 
3985         // Let's see what we have in the storeHeap.
3986         KeyValue current = this.storeHeap.peek();
3987 
3988         byte[] currentRow = null;
3989         int offset = 0;
3990         short length = 0;
3991         if (current != null) {
3992           currentRow = current.getBuffer();
3993           offset = current.getRowOffset();
3994           length = current.getRowLength();
3995         }
3996         boolean stopRow = isStopRow(currentRow, offset, length);
3997         // Check if we were getting data from the joinedHeap and hit the limit.
3998         // If not, then it's the main path - getting results from storeHeap.
3999         if (joinedContinuationRow == null) {
4000           // First, check if we are at a stop row. If so, there are no more results.
4001           if (stopRow) {
4002             if (filter != null && filter.hasFilterRow()) {
4003               filter.filterRowCells(results);
4004             }
4005             return false;
4006           }
4007 
4008           // Check if rowkey filter wants to exclude this row. If so, loop to next.
4009           // Technically, if we hit limits before on this row, we don't need this call.
4010           if (filterRowKey(currentRow, offset, length)) {
4011             boolean moreRows = nextRow(currentRow, offset, length);
4012             if (!moreRows) return false;
4013             results.clear();
4014             continue;
4015           }
4016 
4017           KeyValue nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset,
4018               length);
4019           // Ok, we are good, let's try to get some results from the main heap.
4020           if (nextKv == KV_LIMIT) {
4021             if (this.filter != null && filter.hasFilterRow()) {
4022               throw new IncompatibleFilterException(
4023                 "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
4024             }
4025             return true; // We hit the limit.
4026           }
4027 
4028           stopRow = nextKv == null ||
4029               isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());
4030           // Save whether the row was empty before filters were applied to it.
4031           final boolean isEmptyRow = results.isEmpty();
4032           
4033           // We have the part of the row necessary for filtering (all of it, usually).
4034           // First filter with the filterRow(List).
4035           FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
4036           if (filter != null && filter.hasFilterRow()) {
4037             ret = filter.filterRowCellsWithRet(results);
4038           }
4039           
4040           if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) {
4041             results.clear();
4042             boolean moreRows = nextRow(currentRow, offset, length);
4043             if (!moreRows) return false;
4044 
4045             // This row was totally filtered out. If this is NOT the last row,
4046             // we should continue on. Otherwise, there is nothing else to do.
4047             if (!stopRow) continue;
4048             return false;
4049           }
4050 
4051           // Ok, we are done with storeHeap for this row.
4052           // Now we may need to fetch additional, non-essential data into row.
4053           // These values are not needed for filter to work, so we postpone their
4054           // fetch to (possibly) reduce amount of data loads from disk.
4055           if (this.joinedHeap != null) {
4056             KeyValue nextJoinedKv = joinedHeap.peek();
4057             // If joinedHeap is pointing to some other row, try to seek to a correct one.
4058             boolean mayHaveData =
4059               (nextJoinedKv != null && nextJoinedKv.matchingRow(currentRow, offset, length))
4060               || (this.joinedHeap.requestSeek(KeyValue.createFirstOnRow(currentRow, offset, length),
4061                 true, true)
4062                 && joinedHeap.peek() != null
4063                 && joinedHeap.peek().matchingRow(currentRow, offset, length));
4064             if (mayHaveData) {
4065               joinedContinuationRow = current;
4066               populateFromJoinedHeap(results, limit);
4067             }
4068           }
4069         } else {
4070           // Populating from the joined heap was stopped by limits, populate some more.
4071           populateFromJoinedHeap(results, limit);
4072         }
4073 
4074         // We may have just called populateFromJoinedHeap and hit the limits. If that is
4075         // the case, we need to call it again on the next next() invocation.
4076         if (joinedContinuationRow != null) {
4077           return true;
4078         }
4079 
4080         // Finally, we are done with both joinedHeap and storeHeap.
4081         // Double check to prevent empty rows from appearing in result. It could be
4082         // the case when SingleColumnValueExcludeFilter is used.
4083         if (results.isEmpty()) {
4084           boolean moreRows = nextRow(currentRow, offset, length);
4085           if (!moreRows) return false;
4086           if (!stopRow) continue;
4087         }
4088 
4089         // We are done. Return the result.
4090         return !stopRow;
4091       }
4092     }
4093 
4094     /**
4095      * This function maintains backward compatibility for 0.94 filters. HBASE-6429 combines
4096      * both the filterRow() and filterRow(List<KeyValue> kvs) functions. Code written for 0.94 or
4097      * older may not implement hasFilterRow() as HBASE-6429 expects, because in 0.94 hasFilterRow()
4098      * only returns true when filterRow(List<KeyValue> kvs) is overridden, not when filterRow() is.
4099      * Without this method, such a filterRow() implementation would be skipped.
4100      */
4101     private boolean filterRow() throws IOException {
4102       // when hasFilterRow returns true, filter.filterRow() will be called automatically inside
4103       // filterRowCells(List<Cell> kvs) so we skip that scenario here.
4104       return filter != null && (!filter.hasFilterRow())
4105           && filter.filterRow();
4106     }
4107     
4108     private boolean filterRowKey(byte[] row, int offset, short length) throws IOException {
4109       return filter != null
4110           && filter.filterRowKey(row, offset, length);
4111     }
4112 
4113     protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException {
4114       assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read.";
4115       KeyValue next;
4116       while ((next = this.storeHeap.peek()) != null &&
4117              next.matchingRow(currentRow, offset, length)) {
4118         this.storeHeap.next(MOCKED_LIST);
4119       }
4120       resetFilters();
4121       // Call the coprocessor hook, which allows it to fast-forward the scan
4122       return this.region.getCoprocessorHost() == null
4123           || this.region.getCoprocessorHost()
4124               .postScannerFilterRow(this, currentRow, offset, length);
4125     }
4126 
4127     protected boolean isStopRow(byte[] currentRow, int offset, short length) {
4128       return currentRow == null ||
4129           (stopRow != null &&
4130           comparator.compareRows(stopRow, 0, stopRow.length,
4131             currentRow, offset, length) <= isScan);
4132     }
4133 
4134     @Override
4135     public synchronized void close() {
4136       if (storeHeap != null) {
4137         storeHeap.close();
4138         storeHeap = null;
4139       }
4140       if (joinedHeap != null) {
4141         joinedHeap.close();
4142         joinedHeap = null;
4143       }
4144       // no need to synchronize here.
4145       scannerReadPoints.remove(this);
4146       this.filterClosed = true;
4147     }
4148 
4149     KeyValueHeap getStoreHeapForTesting() {
4150       return storeHeap;
4151     }
4152 
4153     @Override
4154     public synchronized boolean reseek(byte[] row) throws IOException {
4155       if (row == null) {
4156         throw new IllegalArgumentException("Row cannot be null.");
4157       }
4158       boolean result = false;
4159       startRegionOperation();
4160       try {
4161         KeyValue kv = KeyValue.createFirstOnRow(row);
4162         // use request seek to make use of the lazy seek option. See HBASE-5520
4163         result = this.storeHeap.requestSeek(kv, true, true);
4164         if (this.joinedHeap != null) {
4165           result = this.joinedHeap.requestSeek(kv, true, true) || result;
4166         }
4167       } finally {
4168         closeRegionOperation();
4169       }
4170       return result;
4171     }
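    /*
     * Illustrative usage sketch (not part of the original source): a coprocessor or other
     * internal caller holding a RegionScanner can jump ahead within the scan range instead of
     * iterating row by row. The "scanner" variable here is hypothetical.
     *
     *   if (scanner.reseek(Bytes.toBytes("row-05000"))) {
     *     List<Cell> cells = new ArrayList<Cell>();
     *     scanner.next(cells); // continues from the reseeked position
     *   }
     */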
4172   }
4173 
4174   // Utility methods
4175   /**
4176    * A utility method to create new instances of HRegion based on the
4177    * {@link HConstants#REGION_IMPL} configuration property.
4178    * @param tableDir qualified path of directory where region should be located,
4179    * usually the table directory.
4180    * @param log The HLog is the outbound log for any updates to the HRegion
4181    * (There's a single HLog for all the HRegions on a single HRegionServer.)
4182    * The log file is a logfile from the previous execution that's
4183    * custom-computed for this HRegion. The HRegionServer computes and sorts the
4184    * appropriate log info for this HRegion. If there is a previous log file
4185    * (implying that the HRegion has been written-to before), then read it from
4186    * the supplied path.
4187    * @param fs is the filesystem.
4188    * @param conf is global configuration settings.
4189    * @param regionInfo - HRegionInfo that describes the region
4190    *
4191    * @param htd the table descriptor
4192    * @param rsServices the region server services; may be null
4193    * @return the new instance
4194    */
4195   static HRegion newHRegion(Path tableDir, HLog log, FileSystem fs,
4196       Configuration conf, HRegionInfo regionInfo, final HTableDescriptor htd,
4197       RegionServerServices rsServices) {
4198     try {
4199       @SuppressWarnings("unchecked")
4200       Class<? extends HRegion> regionClass =
4201           (Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
4202 
4203       Constructor<? extends HRegion> c =
4204           regionClass.getConstructor(Path.class, HLog.class, FileSystem.class,
4205               Configuration.class, HRegionInfo.class, HTableDescriptor.class,
4206               RegionServerServices.class);
4207 
4208       return c.newInstance(tableDir, log, fs, conf, regionInfo, htd, rsServices);
4209     } catch (Throwable e) {
4210       // todo: what should I throw here?
4211       throw new IllegalStateException("Could not instantiate a region instance.", e);
4212     }
4213   }
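  /*
   * Illustrative sketch (not part of the original source): how a deployment might plug in a
   * custom HRegion subclass through the HConstants.REGION_IMPL property read above. The class
   * name "MyCustomRegion" is hypothetical; it must declare the constructor reflected on above.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setClass(HConstants.REGION_IMPL, MyCustomRegion.class, HRegion.class);
   *   // Subsequent calls to HRegion.newHRegion(...) with this conf instantiate MyCustomRegion.
   */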
4214 
4215   /**
4216    * Convenience method creating new HRegions. Used by createTable and by the
4217    * bootstrap code in the HMaster constructor.
4218    * Note, this method creates an {@link HLog} for the created region. It
4219    * needs to be closed explicitly.  Use {@link HRegion#getLog()} to get
4220    * access.  <b>When done with a region created using this method, you will
4221    * need to explicitly close the {@link HLog} it created too; it will not be
4222    * done for you.  Not closing the log will leave at least a daemon thread
4223    * running.</b>  Call {@link #closeHRegion(HRegion)} and it will do
4224    * necessary cleanup for you.
4225    * @param info Info for region to create.
4226    * @param rootDir Root directory for HBase instance
4227    * @param conf
4228    * @param hTableDescriptor
4229    * @return new HRegion
4230    *
4231    * @throws IOException
4232    */
4233   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4234       final Configuration conf, final HTableDescriptor hTableDescriptor)
4235   throws IOException {
4236     return createHRegion(info, rootDir, conf, hTableDescriptor, null);
4237   }
4238 
4239   /**
4240    * This will do the necessary cleanup a call to
4241    * {@link #createHRegion(HRegionInfo, Path, Configuration, HTableDescriptor)}
4242    * requires.  This method will close the region and then close its
4243    * associated {@link HLog} file.  You can also use it after calling the other createHRegion,
4244    * the one that takes an {@link HLog} instance, but then don't be surprised by the
4245    * call to {@link HLog#closeAndDelete()} on the {@link HLog} the
4246    * HRegion was carrying.
4247    * @param r
4248    * @throws IOException
4249    */
4250   public static void closeHRegion(final HRegion r) throws IOException {
4251     if (r == null) return;
4252     r.close();
4253     if (r.getLog() == null) return;
4254     r.getLog().closeAndDelete();
4255   }
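  /*
   * Illustrative sketch (not part of the original source): pairing createHRegion with
   * closeHRegion so the HLog created for the region is shut down as well. Table and family
   * names are hypothetical.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("example_table"));
   *   htd.addFamily(new HColumnDescriptor("cf"));
   *   HRegionInfo hri = new HRegionInfo(htd.getTableName());
   *   HRegion region = HRegion.createHRegion(hri, FSUtils.getRootDir(conf), conf, htd);
   *   try {
   *     // ... use the region ...
   *   } finally {
   *     HRegion.closeHRegion(region); // closes the region and the HLog it created
   *   }
   */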
4256 
4257   /**
4258    * Convenience method creating new HRegions. Used by createTable.
4259    * The {@link HLog} for the created region needs to be closed explicitly.
4260    * Use {@link HRegion#getLog()} to get access.
4261    *
4262    * @param info Info for region to create.
4263    * @param rootDir Root directory for HBase instance
4264    * @param conf
4265    * @param hTableDescriptor
4266    * @param hlog shared HLog
4267    * @param initialize - true to initialize the region
4268    * @return new HRegion
4269    *
4270    * @throws IOException
4271    */
4272   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4273                                       final Configuration conf,
4274                                       final HTableDescriptor hTableDescriptor,
4275                                       final HLog hlog,
4276                                       final boolean initialize)
4277       throws IOException {
4278     return createHRegion(info, rootDir, conf, hTableDescriptor,
4279         hlog, initialize, false);
4280   }
4281 
4282   /**
4283    * Convenience method creating new HRegions. Used by createTable.
4284    * The {@link HLog} for the created region needs to be closed
4285    * explicitly, if it is not null.
4286    * Use {@link HRegion#getLog()} to get access.
4287    *
4288    * @param info Info for region to create.
4289    * @param rootDir Root directory for HBase instance
4290    * @param conf
4291    * @param hTableDescriptor
4292    * @param hlog shared HLog
4293    * @param initialize - true to initialize the region
4294    * @param ignoreHLog - true to skip generating a new hlog if it is null; mostly for createTable
4295    * @return new HRegion
4296    * @throws IOException
4297    */
4298   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4299                                       final Configuration conf,
4300                                       final HTableDescriptor hTableDescriptor,
4301                                       final HLog hlog,
4302                                       final boolean initialize, final boolean ignoreHLog)
4303       throws IOException {
4304       Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
4305       return createHRegion(info, rootDir, tableDir, conf, hTableDescriptor, hlog, initialize, ignoreHLog);
4306   }
4307 
4308   /**
4309    * Convenience method creating new HRegions. Used by createTable.
4310    * The {@link HLog} for the created region needs to be closed
4311    * explicitly, if it is not null.
4312    * Use {@link HRegion#getLog()} to get access.
4313    *
4314    * @param info Info for region to create.
4315    * @param rootDir Root directory for HBase instance
4316    * @param tableDir table directory
4317    * @param conf
4318    * @param hTableDescriptor
4319    * @param hlog shared HLog
4320    * @param initialize - true to initialize the region
4321    * @param ignoreHLog - true to skip generating a new hlog if it is null; mostly for createTable
4322    * @return new HRegion
4323    * @throws IOException
4324    */
4325   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Path tableDir,
4326                                       final Configuration conf,
4327                                       final HTableDescriptor hTableDescriptor,
4328                                       final HLog hlog,
4329                                       final boolean initialize, final boolean ignoreHLog)
4330       throws IOException {
4331     LOG.info("creating HRegion " + info.getTable().getNameAsString()
4332         + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
4333         " Table name == " + info.getTable().getNameAsString());
4334     FileSystem fs = FileSystem.get(conf);
4335     HRegionFileSystem rfs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
4336     HLog effectiveHLog = hlog;
4337     if (hlog == null && !ignoreHLog) {
4338       effectiveHLog = HLogFactory.createHLog(fs, rfs.getRegionDir(),
4339                                              HConstants.HREGION_LOGDIR_NAME, conf);
4340     }
4341     HRegion region = HRegion.newHRegion(tableDir,
4342         effectiveHLog, fs, conf, info, hTableDescriptor, null);
4343     if (initialize) {
4344       // If initializing, set the sequenceId. It is also required by HLogPerformanceEvaluation when
4345       // verifying the WALEdits.
4346       region.setSequenceId(region.initialize());
4347     }
4348     return region;
4349   }
4350 
4351   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
4352                                       final Configuration conf,
4353                                       final HTableDescriptor hTableDescriptor,
4354                                       final HLog hlog)
4355     throws IOException {
4356     return createHRegion(info, rootDir, conf, hTableDescriptor, hlog, true);
4357   }
4358 
4359 
4360   /**
4361    * Open a Region.
4362    * @param info Info for region to be opened.
4363    * @param wal HLog for region to use. This method will call
4364    * HLog#setSequenceNumber(long) passing the result of the call to
4365    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4366    * up.  The HRegionServer does this every time it opens a new region.
4367    * @param conf
4368    * @return new HRegion
4369    *
4370    * @throws IOException
4371    */
4372   public static HRegion openHRegion(final HRegionInfo info,
4373       final HTableDescriptor htd, final HLog wal,
4374       final Configuration conf)
4375   throws IOException {
4376     return openHRegion(info, htd, wal, conf, null, null);
4377   }
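  /*
   * Illustrative sketch (not part of the original source): re-opening an existing region
   * outside a region server, e.g. from a standalone tool or test. The regionInfo,
   * tableDescriptor and hlog variables are hypothetical and assumed to describe data that
   * already exists on disk.
   *
   *   HRegion region = HRegion.openHRegion(regionInfo, tableDescriptor, hlog, conf);
   *   try {
   *     // ... read from or write to the region ...
   *   } finally {
   *     region.close();
   *   }
   */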
4378 
4379   /**
4380    * Open a Region.
4381    * @param info Info for region to be opened
4382    * @param htd the table descriptor
4383    * @param wal HLog for region to use. This method will call
4384    * HLog#setSequenceNumber(long) passing the result of the call to
4385    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4386    * up.  The HRegionServer does this every time it opens a new region.
4387    * @param conf The Configuration object to use.
4388    * @param rsServices An interface we can request flushes against.
4389    * @param reporter An interface we can report progress against.
4390    * @return new HRegion
4391    *
4392    * @throws IOException
4393    */
4394   public static HRegion openHRegion(final HRegionInfo info,
4395     final HTableDescriptor htd, final HLog wal, final Configuration conf,
4396     final RegionServerServices rsServices,
4397     final CancelableProgressable reporter)
4398   throws IOException {
4399     return openHRegion(FSUtils.getRootDir(conf), info, htd, wal, conf, rsServices, reporter);
4400   }
4401 
4402   /**
4403    * Open a Region.
4404    * @param rootDir Root directory for HBase instance
4405    * @param info Info for region to be opened.
4406    * @param htd the table descriptor
4407    * @param wal HLog for region to use. This method will call
4408    * HLog#setSequenceNumber(long) passing the result of the call to
4409    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4410    * up.  The HRegionServer does this every time it opens a new region.
4411    * @param conf The Configuration object to use.
4412    * @return new HRegion
4413    * @throws IOException
4414    */
4415   public static HRegion openHRegion(Path rootDir, final HRegionInfo info,
4416       final HTableDescriptor htd, final HLog wal, final Configuration conf)
4417   throws IOException {
4418     return openHRegion(rootDir, info, htd, wal, conf, null, null);
4419   }
4420 
4421   /**
4422    * Open a Region.
4423    * @param rootDir Root directory for HBase instance
4424    * @param info Info for region to be opened.
4425    * @param htd the table descriptor
4426    * @param wal HLog for region to use. This method will call
4427    * HLog#setSequenceNumber(long) passing the result of the call to
4428    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4429    * up.  The HRegionServer does this every time it opens a new region.
4430    * @param conf The Configuration object to use.
4431    * @param rsServices An interface we can request flushes against.
4432    * @param reporter An interface we can report progress against.
4433    * @return new HRegion
4434    * @throws IOException
4435    */
4436   public static HRegion openHRegion(final Path rootDir, final HRegionInfo info,
4437       final HTableDescriptor htd, final HLog wal, final Configuration conf,
4438       final RegionServerServices rsServices,
4439       final CancelableProgressable reporter)
4440   throws IOException {
4441     FileSystem fs = null;
4442     if (rsServices != null) {
4443       fs = rsServices.getFileSystem();
4444     }
4445     if (fs == null) {
4446       fs = FileSystem.get(conf);
4447     }
4448     return openHRegion(conf, fs, rootDir, info, htd, wal, rsServices, reporter);
4449   }
4450 
4451   /**
4452    * Open a Region.
4453    * @param conf The Configuration object to use.
4454    * @param fs Filesystem to use
4455    * @param rootDir Root directory for HBase instance
4456    * @param info Info for region to be opened.
4457    * @param htd the table descriptor
4458    * @param wal HLog for region to use. This method will call
4459    * HLog#setSequenceNumber(long) passing the result of the call to
4460    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4461    * up.  The HRegionServer does this every time it opens a new region.
4462    * @return new HRegion
4463    * @throws IOException
4464    */
4465   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4466       final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal)
4467       throws IOException {
4468     return openHRegion(conf, fs, rootDir, info, htd, wal, null, null);
4469   }
4470 
4471   /**
4472    * Open a Region.
4473    * @param conf The Configuration object to use.
4474    * @param fs Filesystem to use
4475    * @param rootDir Root directory for HBase instance
4476    * @param info Info for region to be opened.
4477    * @param htd the table descriptor
4478    * @param wal HLog for region to use. This method will call
4479    * HLog#setSequenceNumber(long) passing the result of the call to
4480    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4481    * up.  The HRegionServer does this every time it opens a new region.
4482    * @param rsServices An interface we can request flushes against.
4483    * @param reporter An interface we can report progress against.
4484    * @return new HRegion
4485    * @throws IOException
4486    */
4487   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4488       final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
4489       final RegionServerServices rsServices, final CancelableProgressable reporter)
4490       throws IOException {
4491     Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
4492     return openHRegion(conf, fs, rootDir, tableDir, info, htd, wal, rsServices, reporter);
4493   }
4494 
4495   /**
4496    * Open a Region.
4497    * @param conf The Configuration object to use.
4498    * @param fs Filesystem to use
4499    * @param rootDir Root directory for HBase instance
4500    * @param info Info for region to be opened.
4501    * @param htd the table descriptor
4502    * @param wal HLog for region to use. This method will call
4503    * HLog#setSequenceNumber(long) passing the result of the call to
4504    * HRegion#getMinSequenceId() to ensure the log id is properly kept
4505    * up.  The HRegionServer does this every time it opens a new region.
4506    * @param rsServices An interface we can request flushes against.
4507    * @param reporter An interface we can report progress against.
4508    * @return new HRegion
4509    * @throws IOException
4510    */
4511   public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4512       final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
4513       final RegionServerServices rsServices, final CancelableProgressable reporter)
4514       throws IOException {
4515     if (info == null) throw new NullPointerException("Passed region info is null");
4516     if (LOG.isDebugEnabled()) {
4517       LOG.debug("Opening region: " + info);
4518     }
4519     HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
4520     return r.openHRegion(reporter);
4521   }
4522 
4523 
4524   /**
4525    * Useful when reopening a closed region (normally for unit tests)
4526    * @param other original object
4527    * @param reporter An interface we can report progress against.
4528    * @return new HRegion
4529    * @throws IOException
4530    */
4531   public static HRegion openHRegion(final HRegion other, final CancelableProgressable reporter)
4532       throws IOException {
4533     HRegionFileSystem regionFs = other.getRegionFileSystem();
4534     HRegion r = newHRegion(regionFs.getTableDir(), other.getLog(), regionFs.getFileSystem(),
4535         other.baseConf, other.getRegionInfo(), other.getTableDesc(), null);
4536     return r.openHRegion(reporter);
4537   }
4538 
4539   /**
4540    * Open HRegion.
4541    * Calls initialize and sets sequenceid.
4542    * @param reporter
4543    * @return Returns <code>this</code>
4544    * @throws IOException
4545    */
4546   protected HRegion openHRegion(final CancelableProgressable reporter)
4547   throws IOException {
4548     checkCompressionCodecs();
4549 
4550     this.openSeqNum = initialize(reporter);
4551     this.setSequenceId(openSeqNum);
4552     return this;
4553   }
4554 
4555   private void checkCompressionCodecs() throws IOException {
4556     for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
4557       CompressionTest.testCompression(fam.getCompression());
4558       CompressionTest.testCompression(fam.getCompactionCompression());
4559     }
4560   }
4561 
4562   /**
4563    * Create a daughter region, given a temp directory with the region data.
4564    * @param hri Spec. for daughter region to open.
4565    * @throws IOException
4566    */
4567   HRegion createDaughterRegionFromSplits(final HRegionInfo hri) throws IOException {
4568     // Move the files from the temporary .splits to the final /table/region directory
4569     fs.commitDaughterRegion(hri);
4570 
4571     // Create the daughter HRegion instance
4572     HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(), fs.getFileSystem(),
4573         this.getBaseConf(), hri, this.getTableDesc(), rsServices);
4574     r.readRequestsCount.set(this.getReadRequestsCount() / 2);
4575     r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
4576     return r;
4577   }
4578 
4579   /**
4580    * Create a merged region given a temp directory with the region data.
4581    * @param mergedRegionInfo the merged region's {@link HRegionInfo}
4582    * @param region_b another merging region
4583    * @return merged hregion
4584    * @throws IOException
4585    */
4586   HRegion createMergedRegionFromMerges(final HRegionInfo mergedRegionInfo,
4587       final HRegion region_b) throws IOException {
4588     HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(),
4589         fs.getFileSystem(), this.getBaseConf(), mergedRegionInfo,
4590         this.getTableDesc(), this.rsServices);
4591     r.readRequestsCount.set(this.getReadRequestsCount()
4592         + region_b.getReadRequestsCount());
4593     r.writeRequestsCount.set(this.getWriteRequestsCount()
4594         + region_b.getWriteRequestsCount());
4595     this.fs.commitMergedRegion(mergedRegionInfo);
4596     return r;
4597   }
4598 
4599   /**
4600    * Inserts a new region's meta information into the passed
4601    * <code>meta</code> region. Used by the HMaster bootstrap code when adding a
4602    * new table to the hbase:meta table.
4603    *
4604    * @param meta hbase:meta HRegion to be updated
4605    * @param r HRegion to add to <code>meta</code>
4606    *
4607    * @throws IOException
4608    */
4609   // TODO remove since only test and merge use this
4610   public static void addRegionToMETA(final HRegion meta, final HRegion r) throws IOException {
4611     meta.checkResources();
4612     // The row key is the region name
4613     byte[] row = r.getRegionName();
4614     final long now = EnvironmentEdgeManager.currentTimeMillis();
4615     final List<Cell> cells = new ArrayList<Cell>(2);
4616     cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
4617       HConstants.REGIONINFO_QUALIFIER, now,
4618       r.getRegionInfo().toByteArray()));
4619     // Set into the root table the version of the meta table.
4620     cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
4621       HConstants.META_VERSION_QUALIFIER, now,
4622       Bytes.toBytes(HConstants.META_VERSION)));
4623     meta.put(row, HConstants.CATALOG_FAMILY, cells);
4624   }
4625 
4626   /**
4627    * Computes the Path of the HRegion
4628    *
4629    * @param tabledir qualified path for table
4630    * @param name ENCODED region name
4631    * @return Path of HRegion directory
4632    */
4633   @Deprecated
4634   public static Path getRegionDir(final Path tabledir, final String name) {
4635     return new Path(tabledir, name);
4636   }
4637 
4638   /**
4639    * Computes the Path of the HRegion
4640    *
4641    * @param rootdir qualified path of HBase root directory
4642    * @param info HRegionInfo for the region
4643    * @return qualified path of region directory
4644    */
4645   @Deprecated
4646   public static Path getRegionDir(final Path rootdir, final HRegionInfo info) {
4647     return new Path(
4648       FSUtils.getTableDir(rootdir, info.getTable()), info.getEncodedName());
4649   }
4650 
4651   /**
4652    * Determines if the specified row is within the row range specified by the
4653    * specified HRegionInfo
4654    *
4655    * @param info HRegionInfo that specifies the row range
4656    * @param row row to be checked
4657    * @return true if the row is within the range specified by the HRegionInfo
4658    */
4659   public static boolean rowIsInRange(HRegionInfo info, final byte [] row) {
4660     return ((info.getStartKey().length == 0) ||
4661         (Bytes.compareTo(info.getStartKey(), row) <= 0)) &&
4662         ((info.getEndKey().length == 0) ||
4663             (Bytes.compareTo(info.getEndKey(), row) > 0));
4664   }
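  /*
   * Illustrative sketch (not part of the original source): rowIsInRange treats an empty start
   * key as unbounded below and an empty end key as unbounded above; the start key is inclusive
   * and the end key exclusive. For an "info" (hypothetical) covering the range [b, d):
   *
   *   HRegion.rowIsInRange(info, Bytes.toBytes("b")); // true
   *   HRegion.rowIsInRange(info, Bytes.toBytes("c")); // true
   *   HRegion.rowIsInRange(info, Bytes.toBytes("d")); // false, end key is exclusive
   */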
4665 
4666   /**
4667    * Merge two HRegions.  The regions must be adjacent and must not overlap.
4668    *
4669    * @param srcA
4670    * @param srcB
4671    * @return new merged HRegion
4672    * @throws IOException
4673    */
4674   public static HRegion mergeAdjacent(final HRegion srcA, final HRegion srcB)
4675   throws IOException {
4676     HRegion a = srcA;
4677     HRegion b = srcB;
4678 
4679     // Make sure that srcA comes first; important for key-ordering during
4680     // write of the merged file.
4681     if (srcA.getStartKey() == null) {
4682       if (srcB.getStartKey() == null) {
4683         throw new IOException("Cannot merge two regions with null start key");
4684       }
4685       // A's start key is null but B's isn't. Assume A comes before B
4686     } else if ((srcB.getStartKey() == null) ||
4687       (Bytes.compareTo(srcA.getStartKey(), srcB.getStartKey()) > 0)) {
4688       a = srcB;
4689       b = srcA;
4690     }
4691 
4692     if (!(Bytes.compareTo(a.getEndKey(), b.getStartKey()) == 0)) {
4693       throw new IOException("Cannot merge non-adjacent regions");
4694     }
4695     return merge(a, b);
4696   }
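  /*
   * Illustrative sketch (not part of the original source): merging two adjacent regions of the
   * same table from a maintenance tool. regionA and regionB are hypothetical open HRegion
   * instances covering adjacent key ranges.
   *
   *   HRegion merged = HRegion.mergeAdjacent(regionA, regionB);
   *   // The parent regions have been flushed, compacted and archived; the returned region
   *   // covers the combined key range.
   *   merged.close();
   */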
4697 
4698   /**
4699    * Merge two regions whether they are adjacent or not.
4700    *
4701    * @param a region a
4702    * @param b region b
4703    * @return new merged region
4704    * @throws IOException
4705    */
4706   public static HRegion merge(final HRegion a, final HRegion b) throws IOException {
4707     if (!a.getRegionInfo().getTable().equals(b.getRegionInfo().getTable())) {
4708       throw new IOException("Regions do not belong to the same table");
4709     }
4710 
4711     FileSystem fs = a.getRegionFileSystem().getFileSystem();
4712     // Make sure each region's cache is empty
4713     a.flushcache();
4714     b.flushcache();
4715 
4716     // Compact each region so we only have one store file per family
4717     a.compactStores(true);
4718     if (LOG.isDebugEnabled()) {
4719       LOG.debug("Files for region: " + a);
4720       a.getRegionFileSystem().logFileSystemState(LOG);
4721     }
4722     b.compactStores(true);
4723     if (LOG.isDebugEnabled()) {
4724       LOG.debug("Files for region: " + b);
4725       b.getRegionFileSystem().logFileSystemState(LOG);
4726     }
4727 
4728     RegionMergeTransaction rmt = new RegionMergeTransaction(a, b, true);
4729     if (!rmt.prepare(null)) {
4730       throw new IOException("Unable to merge regions " + a + " and " + b);
4731     }
4732     HRegionInfo mergedRegionInfo = rmt.getMergedRegionInfo();
4733     LOG.info("starting merge of regions: " + a + " and " + b
4734         + " into new region " + mergedRegionInfo.getRegionNameAsString()
4735         + " with start key <"
4736         + Bytes.toStringBinary(mergedRegionInfo.getStartKey())
4737         + "> and end key <"
4738         + Bytes.toStringBinary(mergedRegionInfo.getEndKey()) + ">");
4739     HRegion dstRegion;
4740     try {
4741       dstRegion = rmt.execute(null, null);
4742     } catch (IOException ioe) {
4743       rmt.rollback(null, null);
4744       throw new IOException("Failed merging region " + a + " and " + b
4745           + ", and successfully rolled back");
4746     }
4747     dstRegion.compactStores(true);
4748 
4749     if (LOG.isDebugEnabled()) {
4750       LOG.debug("Files for new region");
4751       dstRegion.getRegionFileSystem().logFileSystemState(LOG);
4752     }
4753 
4754     if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
4755       throw new IOException("Merged region " + dstRegion
4756           + " still has references after the compaction, is compaction canceled?");
4757     }
4758 
4759     // Archiving the 'A' region
4760     HFileArchiver.archiveRegion(a.getBaseConf(), fs, a.getRegionInfo());
4761     // Archiving the 'B' region
4762     HFileArchiver.archiveRegion(b.getBaseConf(), fs, b.getRegionInfo());
4763 
4764     LOG.info("merge completed. New region is " + dstRegion);
4765     return dstRegion;
4766   }
4767 
4768   /**
4769    * @return True if needs a major compaction.
4770    * @throws IOException
4771    */
4772   boolean isMajorCompaction() throws IOException {
4773     for (Store store : this.stores.values()) {
4774       if (store.isMajorCompaction()) {
4775         return true;
4776       }
4777     }
4778     return false;
4779   }
4780 
4781   //
4782   // HBASE-880
4783   //
4784   /**
4785    * @param get get object
4786    * @return result
4787    * @throws IOException read exceptions
4788    */
4789   public Result get(final Get get) throws IOException {
4790     checkRow(get.getRow(), "Get");
4791     // Verify families are all valid
4792     if (get.hasFamilies()) {
4793       for (byte [] family: get.familySet()) {
4794         checkFamily(family);
4795       }
4796     } else { // Adding all families to scanner
4797       for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
4798         get.addFamily(family);
4799       }
4800     }
4801     List<Cell> results = get(get, true);
4802     return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null);
4803   }
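  /*
   * Illustrative sketch (not part of the original source): a direct, server-side Get against an
   * open HRegion, as a coprocessor or unit test might issue. Row, family and qualifier names
   * are hypothetical.
   *
   *   Get get = new Get(Bytes.toBytes("row1"));
   *   get.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"));
   *   Result result = region.get(get);
   *   byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("q"));
   */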
4804 
4805   /*
4806    * Do a get based on the get parameter.
4807    * @param withCoprocessor invoke coprocessor or not. We don't want to
4808    * always invoke cp when this method is used internally.
4809    */
4810   public List<Cell> get(Get get, boolean withCoprocessor)
4811   throws IOException {
4812 
4813     List<Cell> results = new ArrayList<Cell>();
4814 
4815     // pre-get CP hook
4816     if (withCoprocessor && (coprocessorHost != null)) {
4817        if (coprocessorHost.preGet(get, results)) {
4818          return results;
4819        }
4820     }
4821 
4822     Scan scan = new Scan(get);
4823 
4824     RegionScanner scanner = null;
4825     try {
4826       scanner = getScanner(scan);
4827       scanner.next(results);
4828     } finally {
4829       if (scanner != null)
4830         scanner.close();
4831     }
4832 
4833     // post-get CP hook
4834     if (withCoprocessor && (coprocessorHost != null)) {
4835       coprocessorHost.postGet(get, results);
4836     }
4837 
4838     // do after lock
4839     if (this.metricsRegion != null) {
4840       long totalSize = 0L;
4841       if (results != null) {
4842         for (Cell kv:results) {
4843           totalSize += KeyValueUtil.ensureKeyValue(kv).getLength();
4844         }
4845       }
4846       this.metricsRegion.updateGet(totalSize);
4847     }
4848 
4849     return results;
4850   }
4851 
4852   public void mutateRow(RowMutations rm) throws IOException {
4853     // Don't need nonces here - RowMutations only supports puts and deletes
4854     mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
4855   }
4856 
4857   /**
4858    * Perform atomic mutations within the region w/o nonces.
4859    * See {@link #mutateRowsWithLocks(Collection, Collection, long, long)}
4860    */
4861   public void mutateRowsWithLocks(Collection<Mutation> mutations,
4862       Collection<byte[]> rowsToLock) throws IOException {
4863     mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
4864   }
4865 
4866   /**
4867    * Perform atomic mutations within the region.
4868    * @param mutations The list of mutations to perform.
4869    * <code>mutations</code> can contain operations for multiple rows.
4870    * Caller has to ensure that all rows are contained in this region.
4871    * @param rowsToLock Rows to lock. If multiple rows are locked, care should be taken
4872    *        that <code>rowsToLock</code> is sorted in order to avoid deadlocks.
4873    * @param nonceGroup Optional nonce group of the operation (client Id)
4874    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
4875    *
4876    * @throws IOException
4877    */
4878   public void mutateRowsWithLocks(Collection<Mutation> mutations,
4879       Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
4880     MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
4881     processRowsWithLocks(proc, -1, nonceGroup, nonce);
4882   }
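  /*
   * Illustrative sketch (not part of the original source): atomically applying puts to two rows
   * of the same region. Callers that lock multiple rows should pass them in a consistent sort
   * order to avoid deadlocks. Row, family and qualifier names are hypothetical.
   *
   *   byte[] rowA = Bytes.toBytes("rowA");
   *   byte[] rowB = Bytes.toBytes("rowB");
   *   Put putA = new Put(rowA);
   *   putA.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v1"));
   *   Put putB = new Put(rowB);
   *   putB.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v2"));
   *   region.mutateRowsWithLocks(Arrays.asList((Mutation) putA, (Mutation) putB),
   *       Arrays.asList(rowA, rowB));
   */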
4883 
4884   /**
4885    * Performs atomic multiple reads and writes on a given row.
4886    *
4887    * @param processor The object that defines the reads and writes to perform on a row.
4888    * @param nonceGroup Optional nonce group of the operation (client Id)
4889    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
4890    */
4891   public void processRowsWithLocks(RowProcessor<?,?> processor, long nonceGroup, long nonce)
4892       throws IOException {
4893     processRowsWithLocks(processor, rowProcessorTimeout, nonceGroup, nonce);
4894   }
4895 
4896   /**
4897    * Performs atomic multiple reads and writes on a given row.
4898    *
4899    * @param processor The object that defines the reads and writes to perform on a row.
4900    * @param timeout The timeout of the processor.process() execution
4901    *                Use a negative number to switch off the time bound
4902    * @param nonceGroup Optional nonce group of the operation (client Id)
4903    * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
4904    */
4905   public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout,
4906       long nonceGroup, long nonce) throws IOException {
4907 
4908     for (byte[] row : processor.getRowsToLock()) {
4909       checkRow(row, "processRowsWithLocks");
4910     }
4911     if (!processor.readOnly()) {
4912       checkReadOnly();
4913     }
4914     checkResources();
4915 
4916     startRegionOperation();
4917     WALEdit walEdit = new WALEdit();
4918 
4919     // 1. Run pre-process hook
4920     try {
4921       processor.preProcess(this, walEdit);
4922     } catch (IOException e) {
4923       closeRegionOperation();
4924       throw e;
4925     }
4926     // Short circuit the read only case
4927     if (processor.readOnly()) {
4928       try {
4929         long now = EnvironmentEdgeManager.currentTimeMillis();
4930         doProcessRowWithTimeout(
4931             processor, now, this, null, null, timeout);
4932         processor.postProcess(this, walEdit, true);
4933       } catch (IOException e) {
4934         throw e;
4935       } finally {
4936         closeRegionOperation();
4937       }
4938       return;
4939     }
4940 
4941     MultiVersionConsistencyControl.WriteEntry writeEntry = null;
4942     boolean locked = false;
4943     boolean walSyncSuccessful = false;
4944     List<RowLock> acquiredRowLocks = null;
4945     long addedSize = 0;
4946     List<Mutation> mutations = new ArrayList<Mutation>();
4947     Collection<byte[]> rowsToLock = processor.getRowsToLock();
4948     try {
4949       // 2. Acquire the row lock(s)
4950       acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
4951       for (byte[] row : rowsToLock) {
4952         // Attempt to lock all involved rows, throw if any lock times out
4953         acquiredRowLocks.add(getRowLock(row));
4954       }
4955       // 3. Region lock
4956       lock(this.updatesLock.readLock(), acquiredRowLocks.size());
4957       locked = true;
4958 
4959       long now = EnvironmentEdgeManager.currentTimeMillis();
4960       try {
4961         // 4. Let the processor scan the rows, generate mutations and add
4962         //    waledits
4963         doProcessRowWithTimeout(
4964             processor, now, this, mutations, walEdit, timeout);
4965 
4966         if (!mutations.isEmpty()) {
4967           // 5. Get a mvcc write number
4968           writeEntry = mvcc.beginMemstoreInsert();
4969           // 6. Call the preBatchMutate hook
4970           processor.preBatchMutate(this, walEdit);
4971           // 7. Apply to memstore
4972           for (Mutation m : mutations) {
4973             for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
4974               KeyValue kv = KeyValueUtil.ensureKeyValue(cellScanner.current());
4975               kv.setMvccVersion(writeEntry.getWriteNumber());
4976               byte[] family = kv.getFamily();
4977               checkFamily(family);
4978               addedSize += stores.get(family).add(kv);
4979             }
4980           }
4981 
4982           long txid = 0;
4983           // 8. Append no sync
4984           if (!walEdit.isEmpty()) {
4985             txid = this.log.appendNoSync(this.getRegionInfo(),
4986               this.htableDescriptor.getTableName(), walEdit, processor.getClusterIds(), now,
4987               this.htableDescriptor, this.sequenceId, true, nonceGroup, nonce);
4988           }
4989           // 9. Release region lock
4990           if (locked) {
4991             this.updatesLock.readLock().unlock();
4992             locked = false;
4993           }
4994 
4995           // 10. Release row lock(s)
4996           releaseRowLocks(acquiredRowLocks);
4997 
4998           // 11. Sync edit log
4999           if (txid != 0) {
5000             syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
5001           }
5002           walSyncSuccessful = true;
5003           // 12. call postBatchMutate hook
5004           processor.postBatchMutate(this);
5005         }
5006       } finally {
5007         if (!mutations.isEmpty() && !walSyncSuccessful) {
5008           LOG.warn("Wal sync failed. Roll back " + mutations.size() +
5009               " memstore keyvalues for row(s):" +
5010               processor.getRowsToLock().iterator().next() + "...");
5011           for (Mutation m : mutations) {
5012             for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
5013               KeyValue kv = KeyValueUtil.ensureKeyValue(cellScanner.current());
5014               stores.get(kv.getFamily()).rollback(kv);
5015             }
5016           }
5017         }
5018         // 13. Roll mvcc forward
5019         if (writeEntry != null) {
5020           mvcc.completeMemstoreInsert(writeEntry);
5021           writeEntry = null;
5022         }
5023         if (locked) {
5024           this.updatesLock.readLock().unlock();
5025           locked = false;
5026         }
5027         // release locks if some were acquired but another timed out
5028         releaseRowLocks(acquiredRowLocks);
5029       }
5030 
5031       // 14. Run post-process hook
5032       processor.postProcess(this, walEdit, walSyncSuccessful);
5033 
5034     } catch (IOException e) {
5035       throw e;
5036     } finally {
5037       closeRegionOperation();
5038       if (!mutations.isEmpty() &&
5039           isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
5040         requestFlush();
5041       }
5042     }
5043   }
5044 
5045   private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
5046                                        final long now,
5047                                        final HRegion region,
5048                                        final List<Mutation> mutations,
5049                                        final WALEdit walEdit,
5050                                        final long timeout) throws IOException {
5051     // Short circuit the no time bound case.
5052     if (timeout < 0) {
5053       try {
5054         processor.process(now, region, mutations, walEdit);
5055       } catch (IOException e) {
5056         LOG.warn("RowProcessor:" + processor.getClass().getName() +
5057             " throws Exception on row(s):" +
5058             Bytes.toStringBinary(
5059               processor.getRowsToLock().iterator().next()) + "...", e);
5060         throw e;
5061       }
5062       return;
5063     }
5064 
5065     // Case with time bound
5066     FutureTask<Void> task =
5067       new FutureTask<Void>(new Callable<Void>() {
5068         @Override
5069         public Void call() throws IOException {
5070           try {
5071             processor.process(now, region, mutations, walEdit);
5072             return null;
5073           } catch (IOException e) {
5074             LOG.warn("RowProcessor:" + processor.getClass().getName() +
5075                 " throws Exception on row(s):" +
5076                 Bytes.toStringBinary(
5077                     processor.getRowsToLock().iterator().next()) + "...", e);
5078             throw e;
5079           }
5080         }
5081       });
5082     rowProcessorExecutor.execute(task);
5083     try {
5084       task.get(timeout, TimeUnit.MILLISECONDS);
5085     } catch (TimeoutException te) {
5086       LOG.error("RowProcessor timeout:" + timeout + " ms on row(s):" +
5087           Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) +
5088           "...");
5089       throw new IOException(te);
5090     } catch (Exception e) {
5091       throw new IOException(e);
5092     }
5093   }
5094 
5095   public Result append(Append append) throws IOException {
5096     return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
5097   }
5098 
5099   // TODO: There's a lot of boiler plate code identical
5100   // to increment... See how to better unify that.
5101   /**
5102    * Perform one or more append operations on a row.
5103    *
5104    * @param append
5105    * @return new keyvalues after the append
5106    * @throws IOException
5107    */
5108   public Result append(Append append, long nonceGroup, long nonce)
5109       throws IOException {
5110     byte[] row = append.getRow();
5111     checkRow(row, "append");
5112     boolean flush = false;
5113     Durability durability = getEffectiveDurability(append.getDurability());
5114     boolean writeToWAL = durability != Durability.SKIP_WAL;
5115     WALEdit walEdits = null;
5116     List<Cell> allKVs = new ArrayList<Cell>(append.size());
5117     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
5118 
5119     long size = 0;
5120     long txid = 0;
5121 
5122     checkReadOnly();
5123     checkResources();
5124     // Lock row
5125     startRegionOperation(Operation.APPEND);
5126     this.writeRequestsCount.increment();
5127     WriteEntry w = null;
5128     RowLock rowLock;
5129     try {
5130       rowLock = getRowLock(row);
5131       try {
5132         lock(this.updatesLock.readLock());
5133         try {
5134           // wait for all prior MVCC transactions to finish - while we hold the row lock
5135           // (so that we are guaranteed to see the latest state)
5136           mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
5137           if (this.coprocessorHost != null) {
5138             Result r = this.coprocessorHost.preAppendAfterRowLock(append);
5139             if(r!= null) {
5140               return r;
5141             }
5142           }
5143           // now start my own transaction
5144           w = mvcc.beginMemstoreInsert();
5145           long now = EnvironmentEdgeManager.currentTimeMillis();
5146           // Process each family
5147           for (Map.Entry<byte[], List<Cell>> family : append.getFamilyCellMap().entrySet()) {
5148 
5149             Store store = stores.get(family.getKey());
5150             List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
5151 
5152             // Sort the cells so that they match the order that they
5153             // appear in the Get results. Otherwise, we won't be able to
5154             // find the existing values if the cells are not specified
5155             // in order by the client since cells are in an array list.
5156             Collections.sort(family.getValue(), store.getComparator());
5157             // Get previous values for all columns in this family
5158             Get get = new Get(row);
5159             for (Cell cell : family.getValue()) {
5160               KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5161               get.addColumn(family.getKey(), kv.getQualifier());
5162             }
5163             List<Cell> results = get(get, false);
5164 
5165             // Iterate the input columns and update existing values if they were
5166             // found, otherwise add new column initialized to the append value
5167 
5168             // Avoid as much copying as possible. Every byte is copied at most
5169             // once.
5170             // Would be nice if KeyValue had scatter/gather logic
5171             int idx = 0;
5172             for (Cell cell : family.getValue()) {
5173               KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5174               KeyValue newKV;
5175               KeyValue oldKv = null;
5176               if (idx < results.size()
5177                   && CellUtil.matchingQualifier(results.get(idx),kv)) {
5178                 oldKv = KeyValueUtil.ensureKeyValue(results.get(idx));
5179                 // allocate an empty kv once
5180                 newKV = new KeyValue(row.length, kv.getFamilyLength(),
5181                     kv.getQualifierLength(), now, KeyValue.Type.Put,
5182                     oldKv.getValueLength() + kv.getValueLength(),
5183                     oldKv.getTagsLengthUnsigned() + kv.getTagsLengthUnsigned());
5184                 // copy in the value
5185                 System.arraycopy(oldKv.getBuffer(), oldKv.getValueOffset(),
5186                     newKV.getBuffer(), newKV.getValueOffset(),
5187                     oldKv.getValueLength());
5188                 System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
5189                     newKV.getBuffer(),
5190                     newKV.getValueOffset() + oldKv.getValueLength(),
5191                     kv.getValueLength());
5192                 // copy in the tags
5193                 System.arraycopy(oldKv.getBuffer(), oldKv.getTagsOffset(), newKV.getBuffer(),
5194                     newKV.getTagsOffset(), oldKv.getTagsLengthUnsigned());
5195                 System.arraycopy(kv.getBuffer(), kv.getTagsOffset(), newKV.getBuffer(),
5196                     newKV.getTagsOffset() + oldKv.getTagsLengthUnsigned(),
5197                     kv.getTagsLengthUnsigned());
5198                 // copy in row, family, and qualifier
5199                 System.arraycopy(kv.getBuffer(), kv.getRowOffset(),
5200                     newKV.getBuffer(), newKV.getRowOffset(), kv.getRowLength());
5201                 System.arraycopy(kv.getBuffer(), kv.getFamilyOffset(),
5202                     newKV.getBuffer(), newKV.getFamilyOffset(),
5203                     kv.getFamilyLength());
5204                 System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(),
5205                     newKV.getBuffer(), newKV.getQualifierOffset(),
5206                     kv.getQualifierLength());
5207                 idx++;
5208               } else {
5209                 newKV = kv;
5210                 // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP,
5211                 // so only need to update the timestamp to 'now'
5212                 newKV.updateLatestStamp(Bytes.toBytes(now));
5213               }
5214               newKV.setMvccVersion(w.getWriteNumber());
5215               // Give coprocessors a chance to update the new cell
5216               if (coprocessorHost != null) {
5217                 newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
5218                     RegionObserver.MutationType.APPEND, append, oldKv, (Cell) newKV));
5219               }
5220               kvs.add(newKV);
5221 
5222               // Append update to WAL
5223               if (writeToWAL) {
5224                 if (walEdits == null) {
5225                   walEdits = new WALEdit();
5226                 }
5227                 walEdits.add(newKV);
5228               }
5229             }
5230 
5231             //store the kvs to the temporary memstore before writing HLog
5232             tempMemstore.put(store, kvs);
5233           }
5234 
5235           // Actually write to WAL now
5236           if (writeToWAL) {
5237             // Using default cluster id, as this can only happen in the originating
5238             // cluster. A slave cluster receives the final value (not the delta)
5239             // as a Put.
5240             txid = this.log.appendNoSync(this.getRegionInfo(),
5241               this.htableDescriptor.getTableName(), walEdits, new ArrayList<UUID>(),
5242               EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId,
5243               true, nonceGroup, nonce);
5244           } else {
5245             recordMutationWithoutWal(append.getFamilyCellMap());
5246           }
5247 
5248           //Actually write to Memstore now
5249           for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
5250             Store store = entry.getKey();
5251             if (store.getFamily().getMaxVersions() == 1) {
5252               // upsert if VERSIONS for this CF == 1
5253               size += store.upsert(entry.getValue(), getSmallestReadPoint());
5254             } else {
5255               // otherwise keep older versions around
5256               for (Cell cell: entry.getValue()) {
5257                 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5258                 size += store.add(kv);
5259               }
5260             }
5261             allKVs.addAll(entry.getValue());
5262           }
5263           size = this.addAndGetGlobalMemstoreSize(size);
5264           flush = isFlushSize(size);
5265         } finally {
5266           this.updatesLock.readLock().unlock();
5267         }
5268       } finally {
5269         rowLock.release();
5270       }
5271       if (writeToWAL) {
5272         // sync the transaction log outside the rowlock
5273         syncOrDefer(txid, durability);
5274       }
5275     } finally {
5276       if (w != null) {
5277         mvcc.completeMemstoreInsert(w);
5278       }
5279       closeRegionOperation(Operation.APPEND);
5280     }
5281 
5282     if (this.metricsRegion != null) {
5283       this.metricsRegion.updateAppend();
5284     }
5285 
5286     if (flush) {
5287       // Request a cache flush. Do it outside update lock.
5288       requestFlush();
5289     }
5290 
5291 
5292     return append.isReturnResults() ? Result.create(allKVs) : null;
5293   }
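
  /*
   * A minimal client-side sketch of the kind of Append request this method services.
   * The table and column names below are illustrative only, and client imports
   * (org.apache.hadoop.hbase.client.*) are omitted.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   HTable table = new HTable(conf, "t1");
   *   try {
   *     Append append = new Append(Bytes.toBytes("row1"));
   *     // append the bytes "-suffix" to the current value of cf:q
   *     append.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("-suffix"));
   *     append.setReturnResults(true);    // ask for the resulting value back
   *     Result result = table.append(append);
   *   } finally {
   *     table.close();
   *   }
   */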
5294 
5295   public Result increment(Increment increment) throws IOException {
5296     return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
5297   }
5298 
5299   /**
5300    * Perform one or more increment operations on a row.
5301    * @param increment the increment operation to apply to the row
5302    * @return the new cell values after the increment has been applied
5303    * @throws IOException
5304    */
5305   public Result increment(Increment increment, long nonceGroup, long nonce)
5306   throws IOException {
5307     byte [] row = increment.getRow();
5308     checkRow(row, "increment");
5309     TimeRange tr = increment.getTimeRange();
5310     boolean flush = false;
5311     Durability durability = getEffectiveDurability(increment.getDurability());
5312     boolean writeToWAL = durability != Durability.SKIP_WAL;
5313     WALEdit walEdits = null;
5314     List<Cell> allKVs = new ArrayList<Cell>(increment.size());
5315     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
5316 
5317     long size = 0;
5318     long txid = 0;
5319 
5320     checkReadOnly();
5321     checkResources();
5322     // Lock row
5323     startRegionOperation(Operation.INCREMENT);
5324     this.writeRequestsCount.increment();
5325     WriteEntry w = null;
5326     try {
5327       RowLock rowLock = getRowLock(row);
5328       try {
5329         lock(this.updatesLock.readLock());
5330         try {
5331           // wait for all prior MVCC transactions to finish - while we hold the row lock
5332           // (so that we are guaranteed to see the latest state)
5333           mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
5334           if (this.coprocessorHost != null) {
5335             Result r = this.coprocessorHost.preIncrementAfterRowLock(increment);
5336             if (r != null) {
5337               return r;
5338             }
5339           }
5340           // now start my own transaction
5341           w = mvcc.beginMemstoreInsert();
5342           long now = EnvironmentEdgeManager.currentTimeMillis();
5343           // Process each family
5344           for (Map.Entry<byte [], List<Cell>> family:
5345               increment.getFamilyCellMap().entrySet()) {
5346 
5347             Store store = stores.get(family.getKey());
5348             List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
5349 
5350             // Sort the cells so that they match the order that they
5351             // appear in the Get results. Otherwise, we won't be able to
5352             // find the existing values if the cells are not specified
5353             // in order by the client since cells are in an array list.
5354             Collections.sort(family.getValue(), store.getComparator());
5355             // Get previous values for all columns in this family
5356             Get get = new Get(row);
5357             for (Cell cell: family.getValue()) {
5358               KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5359               get.addColumn(family.getKey(), kv.getQualifier());
5360             }
5361             get.setTimeRange(tr.getMin(), tr.getMax());
5362             List<Cell> results = get(get, false);
5363 
5364             // Iterate the input columns and update existing values if they were
5365             // found, otherwise add new column initialized to the increment amount
5366             int idx = 0;
5367             for (Cell kv: family.getValue()) {
5368               long amount = Bytes.toLong(CellUtil.cloneValue(kv));
5369               boolean noWriteBack = (amount == 0);
5370 
5371               Cell c = null;
5372               if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), kv)) {
5373                 c = results.get(idx);
5374                 if(c.getValueLength() == Bytes.SIZEOF_LONG) {
5375                   amount += Bytes.toLong(c.getValueArray(), c.getValueOffset(), Bytes.SIZEOF_LONG);
5376                 } else {
5377                   // throw DoNotRetryIOException instead of IllegalArgumentException
5378                   throw new org.apache.hadoop.hbase.DoNotRetryIOException(
5379                       "Attempted to increment field that isn't 64 bits wide");
5380                 }
5381                 idx++;
5382               }
5383 
5384               // Append new incremented KeyValue to list
5385               byte[] q = CellUtil.cloneQualifier(kv);
5386               byte[] val = Bytes.toBytes(amount);
5387               int oldCellTagsLen = (c == null) ? 0 : c.getTagsLengthUnsigned();
5388               int incCellTagsLen = kv.getTagsLengthUnsigned();
5389               KeyValue newKV = new KeyValue(row.length, family.getKey().length, q.length, now,
5390                   KeyValue.Type.Put, val.length, oldCellTagsLen + incCellTagsLen);
5391               System.arraycopy(row, 0, newKV.getBuffer(), newKV.getRowOffset(), row.length);
5392               System.arraycopy(family.getKey(), 0, newKV.getBuffer(), newKV.getFamilyOffset(),
5393                   family.getKey().length);
5394               System.arraycopy(q, 0, newKV.getBuffer(), newKV.getQualifierOffset(), q.length);
5395               // copy in the value
5396               System.arraycopy(val, 0, newKV.getBuffer(), newKV.getValueOffset(), val.length);
5397               // copy tags
5398               if (oldCellTagsLen > 0) {
5399                 System.arraycopy(c.getTagsArray(), c.getTagsOffset(), newKV.getBuffer(),
5400                     newKV.getTagsOffset(), oldCellTagsLen);
5401               }
5402               if (incCellTagsLen > 0) {
5403                 System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getBuffer(),
5404                     newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
5405               }
5406               newKV.setMvccVersion(w.getWriteNumber());
5407               // Give coprocessors a chance to update the new cell
5408               if (coprocessorHost != null) {
5409                 newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
5410                     RegionObserver.MutationType.INCREMENT, increment, c, (Cell) newKV));
5411               }
5412               allKVs.add(newKV);
5413 
5414               if (!noWriteBack) {
5415                 kvs.add(newKV);
5416 
5417                 // Prepare WAL updates
5418                 if (writeToWAL) {
5419                   if (walEdits == null) {
5420                     walEdits = new WALEdit();
5421                   }
5422                   walEdits.add(newKV);
5423                 }
5424               }
5425             }
5426 
5427             //store the kvs to the temporary memstore before writing HLog
5428             if (!kvs.isEmpty()) {
5429               tempMemstore.put(store, kvs);
5430             }
5431           }
5432 
5433           // Actually write to WAL now
5434           if (walEdits != null && !walEdits.isEmpty()) {
5435             if (writeToWAL) {
5436               // Using default cluster id, as this can only happen in the originating
5437               // cluster. A slave cluster receives the final value (not the delta)
5438               // as a Put.
5439               txid = this.log.appendNoSync(this.getRegionInfo(),
5440                   this.htableDescriptor.getTableName(), walEdits, new ArrayList<UUID>(),
5441                   EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId,
5442                   true, nonceGroup, nonce);
5443             } else {
5444               recordMutationWithoutWal(increment.getFamilyCellMap());
5445             }
5446           }
5447           //Actually write to Memstore now
5448           if (!tempMemstore.isEmpty()) {
5449             for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
5450               Store store = entry.getKey();
5451               if (store.getFamily().getMaxVersions() == 1) {
5452                 // upsert if VERSIONS for this CF == 1
5453                 size += store.upsert(entry.getValue(), getSmallestReadPoint());
5454               } else {
5455                 // otherwise keep older versions around
5456                 for (Cell cell : entry.getValue()) {
5457                   KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5458                   size += store.add(kv);
5459                 }
5460               }
5461             }
5462             size = this.addAndGetGlobalMemstoreSize(size);
5463             flush = isFlushSize(size);
5464           }
5465         } finally {
5466           this.updatesLock.readLock().unlock();
5467         }
5468       } finally {
5469         rowLock.release();
5470       }
5471       if (writeToWAL && (walEdits != null) && !walEdits.isEmpty()) {
5472         // sync the transaction log outside the rowlock
5473         syncOrDefer(txid, durability);
5474       }
5475     } finally {
5476       if (w != null) {
5477         mvcc.completeMemstoreInsert(w);
5478       }
5479       closeRegionOperation(Operation.INCREMENT);
5480       if (this.metricsRegion != null) {
5481         this.metricsRegion.updateIncrement();
5482       }
5483     }
5484 
5485     if (flush) {
5486       // Request a cache flush.  Do it outside update lock.
5487       requestFlush();
5488     }
5489 
5490     return Result.create(allKVs);
5491   }
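
  /*
   * A minimal client-side sketch of the Increment request this method services.
   * Table and column names are illustrative only; client imports
   * (org.apache.hadoop.hbase.client.*) are omitted. Note that the stored value must
   * be 8 bytes wide, otherwise the DoNotRetryIOException above is thrown.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   HTable table = new HTable(conf, "counters");
   *   try {
   *     Increment inc = new Increment(Bytes.toBytes("row1"));
   *     inc.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("hits"), 1L);  // add 1 to cf:hits
   *     Result r = table.increment(inc);
   *     long newValue = Bytes.toLong(r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("hits")));
   *   } finally {
   *     table.close();
   *   }
   */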
5492 
5493   //
5494   // New HBASE-880 Helpers
5495   //
5496 
5497   private void checkFamily(final byte [] family)
5498   throws NoSuchColumnFamilyException {
5499     if (!this.htableDescriptor.hasFamily(family)) {
5500       throw new NoSuchColumnFamilyException("Column family " +
5501           Bytes.toString(family) + " does not exist in region " + this
5502           + " in table " + this.htableDescriptor);
5503     }
5504   }
5505 
5506   public static final long FIXED_OVERHEAD = ClassSize.align(
5507       ClassSize.OBJECT +
5508       ClassSize.ARRAY +
5509       41 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
5510       (12 * Bytes.SIZEOF_LONG) +
5511       4 * Bytes.SIZEOF_BOOLEAN);
5512 
5513   // woefully out of date - currently missing:
5514   // 1 x HashMap - coprocessorServiceHandlers
5515   // 6 org.cliffc.high_scale_lib.Counter - numMutationsWithoutWAL, dataInMemoryWithoutWAL,
5516   //   checkAndMutateChecksPassed, checkAndMutateChecksFailed, readRequestsCount,
5517   //   writeRequestsCount, updatesBlockedMs
5518   // 1 x HRegion$WriteState - writestate
5519   // 1 x RegionCoprocessorHost - coprocessorHost
5520   // 1 x RegionSplitPolicy - splitPolicy
5521   // 1 x MetricsRegion - metricsRegion
5522   // 1 x MetricsRegionWrapperImpl - metricsRegionWrapper
5523   public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
5524       ClassSize.OBJECT + // closeLock
5525       (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
5526       (3 * ClassSize.ATOMIC_LONG) + // memStoreSize, numPutsWithoutWAL, dataInMemoryWithoutWAL
5527       (2 * ClassSize.CONCURRENT_HASHMAP) +  // lockedRows, scannerReadPoints
5528       WriteState.HEAP_SIZE + // writestate
5529       ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
5530       (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
5531       ClassSize.ARRAYLIST + // recentFlushes
5532       MultiVersionConsistencyControl.FIXED_SIZE // mvcc
5533       + ClassSize.TREEMAP // maxSeqIdInStores
5534       + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
5535       ;
5536 
5537   @Override
5538   public long heapSize() {
5539     long heapSize = DEEP_OVERHEAD;
5540     for (Store store : this.stores.values()) {
5541       heapSize += store.heapSize();
5542     }
5543     // this does not take into account row locks, recent flushes, mvcc entries, and more
5544     return heapSize;
5545   }
5546 
5547   /*
5548    * This method calls System.exit.
5549    * @param message Message to print out.  May be null.
5550    */
5551   private static void printUsageAndExit(final String message) {
5552     if (message != null && message.length() > 0) System.out.println(message);
5553     System.out.println("Usage: HRegion CATALOG_TABLE_DIR [major_compact]");
5554     System.out.println("Options:");
5555     System.out.println(" major_compact  Pass this option to major compact " +
5556       "passed region.");
5557     System.out.println("Default outputs scan of passed region.");
5558     System.exit(1);
5559   }
5560 
5561   /**
5562    * Registers a new protocol buffer {@link Service} subclass as a coprocessor endpoint to
5563    * be available for handling
5564    * {@link HRegion#execService(com.google.protobuf.RpcController,
5565    *    org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall)} calls.
5566    *
5567    * <p>
5568    * Only a single instance may be registered per region for a given {@link Service} subclass (the
5569    * instances are keyed on {@link com.google.protobuf.Descriptors.ServiceDescriptor#getFullName()}).
5570    * After the first registration, subsequent calls with the same service name will fail with
5571    * a return value of {@code false}.
5572    * </p>
5573    * @param instance the {@code Service} subclass instance to expose as a coprocessor endpoint
5574    * @return {@code true} if the registration was successful, {@code false}
5575    * otherwise
5576    */
5577   public boolean registerService(Service instance) {
5578     /*
5579      * No stacking of instances is allowed for a single service name
5580      */
5581     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
5582     if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
5583       LOG.error("Coprocessor service "+serviceDesc.getFullName()+
5584           " already registered, rejecting request from "+instance
5585       );
5586       return false;
5587     }
5588 
5589     coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
5590     if (LOG.isDebugEnabled()) {
5591       LOG.debug("Registered coprocessor service: region="+
5592           Bytes.toStringBinary(getRegionName())+" service="+serviceDesc.getFullName());
5593     }
5594     return true;
5595   }
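
  /*
   * This method is normally invoked by the region coprocessor host rather than by user
   * code: a coprocessor implementing CoprocessorService hands back its Service instance
   * and the host registers it here. A rough sketch, where MyProtos.MyService stands in
   * for a hypothetical protobuf-generated service:
   *
   *   public class MyEndpoint extends MyProtos.MyService
   *       implements CoprocessorService, Coprocessor {
   *     @Override
   *     public Service getService() {
   *       return this;   // the host ends up calling HRegion#registerService(this)
   *     }
   *     // Coprocessor start/stop and the generated service methods omitted
   *   }
   */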
5596 
5597   /**
5598    * Executes a single protocol buffer coprocessor endpoint {@link Service} method using
5599    * the registered protocol handlers.  {@link Service} implementations must be registered via the
5600    * {@link HRegion#registerService(com.google.protobuf.Service)}
5601    * method before they are available.
5602    *
5603    * @param controller an {@code RpcController} implementation to pass to the invoked service
5604    * @param call a {@code CoprocessorServiceCall} instance identifying the service, method,
5605    *     and parameters for the method invocation
5606    * @return a protocol buffer {@code Message} instance containing the method's result
5607    * @throws IOException if no registered service handler is found or an error
5608    *     occurs during the invocation
5609    * @see org.apache.hadoop.hbase.regionserver.HRegion#registerService(com.google.protobuf.Service)
5610    */
5611   public Message execService(RpcController controller, CoprocessorServiceCall call)
5612       throws IOException {
5613     String serviceName = call.getServiceName();
5614     String methodName = call.getMethodName();
5615     if (!coprocessorServiceHandlers.containsKey(serviceName)) {
5616       throw new UnknownProtocolException(null,
5617           "No registered coprocessor service found for name "+serviceName+
5618           " in region "+Bytes.toStringBinary(getRegionName()));
5619     }
5620 
5621     Service service = coprocessorServiceHandlers.get(serviceName);
5622     Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
5623     Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
5624     if (methodDesc == null) {
5625       throw new UnknownProtocolException(service.getClass(),
5626           "Unknown method "+methodName+" called on service "+serviceName+
5627               " in region "+Bytes.toStringBinary(getRegionName()));
5628     }
5629 
5630     Message request = service.getRequestPrototype(methodDesc).newBuilderForType()
5631         .mergeFrom(call.getRequest()).build();
5632 
5633     if (coprocessorHost != null) {
5634       request = coprocessorHost.preEndpointInvocation(service, methodName, request);
5635     }
5636 
5637     final Message.Builder responseBuilder =
5638         service.getResponsePrototype(methodDesc).newBuilderForType();
5639     service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
5640       @Override
5641       public void run(Message message) {
5642         if (message != null) {
5643           responseBuilder.mergeFrom(message);
5644         }
5645       }
5646     });
5647 
5648     if (coprocessorHost != null) {
5649       coprocessorHost.postEndpointInvocation(service, methodName, request, responseBuilder);
5650     }
5651 
5652     return responseBuilder.build();
5653   }
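
  /*
   * A sketch of how a client typically reaches this code path, again using
   * MyProtos.MyService as a hypothetical protobuf-generated service and an
   * illustrative table name:
   *
   *   HTable table = new HTable(conf, "t1");
   *   CoprocessorRpcChannel channel = table.coprocessorService(Bytes.toBytes("row1"));
   *   MyProtos.MyService.BlockingInterface stub = MyProtos.MyService.newBlockingStub(channel);
   *   MyProtos.MyResponse resp = stub.myMethod(null, MyProtos.MyRequest.getDefaultInstance());
   */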
5654 
5655   /*
5656    * Process table.
5657    * Do major compaction or list content.
5658    * @param fs
5659    * @param p
5660    * @param log
5661    * @param c
5662    * @param majorCompact
5663    * @throws IOException
5664    */
5665   private static void processTable(final FileSystem fs, final Path p,
5666       final HLog log, final Configuration c,
5667       final boolean majorCompact)
5668   throws IOException {
5669     HRegion region = null;
5670     // Currently expects tables have one region only.
5671     if (FSUtils.getTableName(p).equals(TableName.META_TABLE_NAME)) {
5672       region = HRegion.newHRegion(p, log, fs, c,
5673         HRegionInfo.FIRST_META_REGIONINFO, HTableDescriptor.META_TABLEDESC, null);
5674     } else {
5675       throw new IOException("Not a known catalog table: " + p.toString());
5676     }
5677     try {
5678       region.initialize();
5679       if (majorCompact) {
5680         region.compactStores(true);
5681       } else {
5682         // Default behavior
5683         Scan scan = new Scan();
5684         // scan.addFamily(HConstants.CATALOG_FAMILY);
5685         RegionScanner scanner = region.getScanner(scan);
5686         try {
5687           List<Cell> kvs = new ArrayList<Cell>();
5688           boolean done;
5689           do {
5690             kvs.clear();
5691             done = scanner.next(kvs);
5692             if (kvs.size() > 0) LOG.info(kvs);
5693           } while (done);
5694         } finally {
5695           scanner.close();
5696         }
5697       }
5698     } finally {
5699       region.close();
5700     }
5701   }
5702 
5703   boolean shouldForceSplit() {
5704     return this.splitRequest;
5705   }
5706 
5707   byte[] getExplicitSplitPoint() {
5708     return this.explicitSplitPoint;
5709   }
5710 
5711   void forceSplit(byte[] sp) {
5712     // NOTE : this HRegion will go away after the forced split is successful
5713     //        therefore, no reason to clear this value
5714     this.splitRequest = true;
5715     if (sp != null) {
5716       this.explicitSplitPoint = sp;
5717     }
5718   }
5719 
5720   void clearSplit_TESTS_ONLY() {
5721     this.splitRequest = false;
5722   }
5723 
5724   /**
5725    * Give the region a chance to prepare before it is split.
5726    */
5727   protected void prepareToSplit() {
5728     // nothing
5729   }
5730 
5731   /**
5732    * Return the split point. A null return indicates the region isn't splittable.
5733    * If the split point isn't explicitly specified, it will go over the stores
5734    * to find the best split point. Currently the best split point is chosen
5735    * based on the size of the store.
5736    */
5737   public byte[] checkSplit() {
5738     // Can't split META
5739     if (this.getRegionInfo().isMetaTable() ||
5740         TableName.NAMESPACE_TABLE_NAME.equals(this.getRegionInfo().getTable())) {
5741       if (shouldForceSplit()) {
5742         LOG.warn("Cannot split meta region in HBase 0.20 and above");
5743       }
5744       return null;
5745     }
5746 
5747     // Can't split region which is in recovering state
5748     if (this.isRecovering()) {
5749       LOG.info("Cannot split region " + this.getRegionInfo().getEncodedName() + " in recovery.");
5750       return null;
5751     }
5752 
5753     if (!splitPolicy.shouldSplit()) {
5754       return null;
5755     }
5756 
5757     byte[] ret = splitPolicy.getSplitPoint();
5758 
5759     if (ret != null) {
5760       try {
5761         checkRow(ret, "calculated split");
5762       } catch (IOException e) {
5763         LOG.error("Ignoring invalid split", e);
5764         return null;
5765       }
5766     }
5767     return ret;
5768   }
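
  /*
   * For reference, a forced split is usually requested through the admin API rather
   * than by calling forceSplit() directly. A sketch; the table name and split key are
   * illustrative only:
   *
   *   HBaseAdmin admin = new HBaseAdmin(conf);
   *   try {
   *     // ask for a split of the region containing "row500", at that key
   *     admin.split(Bytes.toBytes("t1"), Bytes.toBytes("row500"));
   *   } finally {
   *     admin.close();
   *   }
   */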
5769 
5770   /**
5771    * @return The priority that this region should have in the compaction queue
5772    */
5773   public int getCompactPriority() {
5774     int count = Integer.MAX_VALUE;
5775     for (Store store : stores.values()) {
5776       count = Math.min(count, store.getCompactPriority());
5777     }
5778     return count;
5779   }
5780 
5781   /**
5782    * Checks every store to see if one has too many
5783    * store files
5784    * @return true if any store has too many store files
5785    */
5786   public boolean needsCompaction() {
5787     for (Store store : stores.values()) {
5788       if(store.needsCompaction()) {
5789         return true;
5790       }
5791     }
5792     return false;
5793   }
5794 
5795   /** @return the coprocessor host */
5796   public RegionCoprocessorHost getCoprocessorHost() {
5797     return coprocessorHost;
5798   }
5799 
5800   /** @param coprocessorHost the new coprocessor host */
5801   public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {
5802     this.coprocessorHost = coprocessorHost;
5803   }
5804 
5805   /**
5806    * This method needs to be called before any public call that reads or
5807    * modifies data. It has to be called just before a try;
5808    * #closeRegionOperation needs to be called in that try's finally block.
5809    * Acquires a read lock and checks if the region is closing or closed.
5810    * @throws IOException 
5811    */
5812   public void startRegionOperation() throws IOException {
5813     startRegionOperation(Operation.ANY);
5814   }
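
  /*
   * The expected calling pattern, as used throughout this class:
   *
   *   startRegionOperation(Operation.GET);   // or whichever Operation applies
   *   try {
   *     // ... read or modify data ...
   *   } finally {
   *     closeRegionOperation(Operation.GET);
   *   }
   */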
5815 
5816   /**
5817    * @param op the operation about to be performed on the region
5818    * @throws IOException 
5819    */
5820   protected void startRegionOperation(Operation op) throws IOException {
5821     switch (op) {
5822     case INCREMENT:
5823     case APPEND:
5824     case GET:
5825     case SCAN:
5826     case SPLIT_REGION:
5827     case MERGE_REGION:
5828     case PUT:
5829     case DELETE:
5830     case BATCH_MUTATE:
5831     case COMPACT_REGION:
5832       // when a region is in recovering state, no read, split or merge is allowed
5833       if (this.isRecovering() && (this.disallowWritesInRecovering ||
5834               (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) {
5835         throw new RegionInRecoveryException(this.getRegionNameAsString() + " is recovering");
5836       }
5837       break;
5838     default:
5839       break;
5840     }
5841     if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION
5842         || op == Operation.COMPACT_REGION) {
5843       // split, merge or compact region doesn't need to check the closing/closed state or lock the
5844       // region
5845       return;
5846     }
5847     if (this.closing.get()) {
5848       throw new NotServingRegionException(getRegionNameAsString() + " is closing");
5849     }
5850     lock(lock.readLock());
5851     if (this.closed.get()) {
5852       lock.readLock().unlock();
5853       throw new NotServingRegionException(getRegionNameAsString() + " is closed");
5854     }
5855     try {
5856       if (coprocessorHost != null) {
5857         coprocessorHost.postStartRegionOperation(op);
5858       }
5859     } catch (Exception e) {
5860       lock.readLock().unlock();
5861       throw new IOException(e);
5862     }
5863   }
5864 
5865   /**
5866    * Closes the lock. This needs to be called in the finally block corresponding
5867    * to the try block of #startRegionOperation
5868    * @throws IOException 
5869    */
5870   public void closeRegionOperation() throws IOException {
5871     closeRegionOperation(Operation.ANY);
5872   }
5873 
5874   /**
5875    * Closes the lock. This needs to be called in the finally block corresponding
5876    * to the try block of {@link #startRegionOperation(Operation)}
5877    * @param operation
5878    * @throws IOException
5879    */
5880   public void closeRegionOperation(Operation operation) throws IOException {
5881     lock.readLock().unlock();
5882     if (coprocessorHost != null) {
5883       coprocessorHost.postCloseRegionOperation(operation);
5884     }
5885   }
5886 
5887   /**
5888    * This method needs to be called before any public call that reads or
5889    * modifies stores in bulk. It has to be called just before a try;
5890    * #closeBulkRegionOperation needs to be called in that try's finally block.
5891    * Acquires a write lock and checks if the region is closing or closed.
5892    * @throws NotServingRegionException when the region is closing or closed
5893    * @throws RegionTooBusyException if failed to get the lock in time
5894    * @throws InterruptedIOException if interrupted while waiting for a lock
5895    */
5896   private void startBulkRegionOperation(boolean writeLockNeeded)
5897       throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
5898     if (this.closing.get()) {
5899       throw new NotServingRegionException(getRegionNameAsString() + " is closing");
5900     }
5901     if (writeLockNeeded) lock(lock.writeLock());
5902     else lock(lock.readLock());
5903     if (this.closed.get()) {
5904       if (writeLockNeeded) lock.writeLock().unlock();
5905       else lock.readLock().unlock();
5906       throw new NotServingRegionException(getRegionNameAsString() + " is closed");
5907     }
5908   }
5909 
5910   /**
5911    * Closes the lock. This needs to be called in the finally block corresponding
5912    * to the try block of #startBulkRegionOperation
5913    */
5914   private void closeBulkRegionOperation(){
5915     if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
5916     else lock.readLock().unlock();
5917   }
5918 
5919   /**
5920    * Update counters for the number of mutations without WAL and the size of possible data loss.
5921    * This information is exposed by the region server metrics.
5922    */
5923   private void recordMutationWithoutWal(final Map<byte [], List<Cell>> familyMap) {
5924     numMutationsWithoutWAL.increment();
5925     if (numMutationsWithoutWAL.get() <= 1) {
5926       LOG.info("writing data to region " + this +
5927                " with WAL disabled. Data may be lost in the event of a crash.");
5928     }
5929 
5930     long mutationSize = 0;
5931     for (List<Cell> cells: familyMap.values()) {
5932       for (Cell cell : cells) {
5933         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5934         mutationSize += kv.getKeyLength() + kv.getValueLength();
5935       }
5936     }
5937 
5938     dataInMemoryWithoutWAL.add(mutationSize);
5939   }
5940 
5941   private void lock(final Lock lock)
5942       throws RegionTooBusyException, InterruptedIOException {
5943     lock(lock, 1);
5944   }
5945 
5946   /**
5947    * Try to acquire a lock.  Throw RegionTooBusyException
5948    * if failed to get the lock in time. Throw InterruptedIOException
5949    * if interrupted while waiting for the lock.
5950    */
5951   private void lock(final Lock lock, final int multiplier)
5952       throws RegionTooBusyException, InterruptedIOException {
5953     try {
5954       final long waitTime = Math.min(maxBusyWaitDuration,
5955           busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
5956       if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
5957         throw new RegionTooBusyException(
5958             "failed to get a lock in " + waitTime + " ms. " +
5959                 "regionName=" + (this.getRegionInfo() == null ? "unknown" :
5960                 this.getRegionInfo().getRegionNameAsString()) +
5961                 ", server=" + (this.getRegionServerServices() == null ? "unknown" :
5962                 this.getRegionServerServices().getServerName()));
5963       }
5964     } catch (InterruptedException ie) {
5965       LOG.info("Interrupted while waiting for a lock");
5966       InterruptedIOException iie = new InterruptedIOException();
5967       iie.initCause(ie);
5968       throw iie;
5969     }
5970   }
5971 
5972   /**
5973    * Calls sync with the given transaction ID if the region's table is not
5974    * deferring it.
5975    * @param txid should sync up to which transaction
5976    * @throws IOException If anything goes wrong with DFS
5977    */
5978   private void syncOrDefer(long txid, Durability durability) throws IOException {
5979     if (this.getRegionInfo().isMetaRegion()) {
5980       this.log.sync(txid);
5981     } else {
5982       switch(durability) {
5983       case USE_DEFAULT:
5984         // do what table defaults to
5985         if (shouldSyncLog()) {
5986           this.log.sync(txid);
5987         }
5988         break;
5989       case SKIP_WAL:
5990         // nothing to do
5991         break;
5992       case ASYNC_WAL:
5993         // nothing to do
5994         break;
5995       case SYNC_WAL:
5996       case FSYNC_WAL:
5997         // sync the WAL edit (SYNC and FSYNC treated the same for now)
5998         this.log.sync(txid);
5999         break;
6000       }
6001     }
6002   }
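
  /*
   * The durability handled here is normally chosen per mutation on the client side.
   * A minimal sketch; the table and column names are illustrative only:
   *
   *   Put put = new Put(Bytes.toBytes("row1"));
   *   put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
   *   put.setDurability(Durability.ASYNC_WAL);   // defer the WAL sync for this mutation
   *   table.put(put);
   */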
6003 
6004   /**
6005    * Check whether we should sync the log from the table's durability settings
6006    */
6007   private boolean shouldSyncLog() {
6008     return durability.ordinal() >  Durability.ASYNC_WAL.ordinal();
6009   }
6010 
6011   /**
6012    * A mocked list implementation - discards all updates.
6013    */
6014   private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {
6015 
6016     @Override
6017     public void add(int index, Cell element) {
6018       // do nothing
6019     }
6020 
6021     @Override
6022     public boolean addAll(int index, Collection<? extends Cell> c) {
6023       return false; // this list is never changed as a result of an update
6024     }
6025 
6026     @Override
6027     public KeyValue get(int index) {
6028       throw new UnsupportedOperationException();
6029     }
6030 
6031     @Override
6032     public int size() {
6033       return 0;
6034     }
6035   };
6036 
6037   /**
6038    * Facility for dumping and compacting catalog tables.
6039    * Only does catalog tables since these are the only tables whose schema we
6040    * know for sure.  For usage run:
6041    * <pre>
6042    *   ./bin/hbase org.apache.hadoop.hbase.regionserver.HRegion
6043    * </pre>
6044    * @param args
6045    * @throws IOException
6046    */
6047   public static void main(String[] args) throws IOException {
6048     if (args.length < 1) {
6049       printUsageAndExit(null);
6050     }
6051     boolean majorCompact = false;
6052     if (args.length > 1) {
6053       if (!args[1].toLowerCase().startsWith("major")) {
6054         printUsageAndExit("ERROR: Unrecognized option <" + args[1] + ">");
6055       }
6056       majorCompact = true;
6057     }
6058     final Path tableDir = new Path(args[0]);
6059     final Configuration c = HBaseConfiguration.create();
6060     final FileSystem fs = FileSystem.get(c);
6061     final Path logdir = new Path(c.get("hbase.tmp.dir"));
6062     final String logname = "hlog" + FSUtils.getTableName(tableDir) + System.currentTimeMillis();
6063 
6064     final HLog log = HLogFactory.createHLog(fs, logdir, logname, c);
6065     try {
6066       processTable(fs, tableDir, log, c, majorCompact);
6067     } finally {
6068        log.close();
6069        // TODO: is this still right?
6070        BlockCache bc = new CacheConfig(c).getBlockCache();
6071        if (bc != null) bc.shutdown();
6072     }
6073   }
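
  /*
   * An illustrative invocation; the meta table directory shown is an assumption about
   * the layout under hbase.rootdir:
   *
   *   $ ./bin/hbase org.apache.hadoop.hbase.regionserver.HRegion /hbase/data/hbase/meta major_compact
   */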
6074 
6075   /**
6076    * Gets the latest sequence number that was read from storage when this region was opened.
6077    */
6078   public long getOpenSeqNum() {
6079     return this.openSeqNum;
6080   }
6081 
6082   /**
6083    * Gets the max sequence ids of stores that were read from storage when this region was opened. WAL
6084    * edits with a smaller or equal sequence number will be skipped during replay.
6085    */
6086   public Map<byte[], Long> getMaxStoreSeqIdForLogReplay() {
6087     return this.maxSeqIdInStores;
6088   }
6089 
6090   /**
6091    * @return the current compaction state (major and/or minor) of this region.
6092    */
6093   public CompactionState getCompactionState() {
6094     boolean hasMajor = majorInProgress.get() > 0, hasMinor = minorInProgress.get() > 0;
6095     return (hasMajor ? (hasMinor ? CompactionState.MAJOR_AND_MINOR : CompactionState.MAJOR)
6096         : (hasMinor ? CompactionState.MINOR : CompactionState.NONE));
6097   }
6098 
6099   public void reportCompactionRequestStart(boolean isMajor){
6100     (isMajor ? majorInProgress : minorInProgress).incrementAndGet();
6101   }
6102 
6103   public void reportCompactionRequestEnd(boolean isMajor, int numFiles, long filesSizeCompacted){
6104     int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();
6105 
6106     // metrics
6107     compactionsFinished.incrementAndGet();
6108     compactionNumFilesCompacted.addAndGet(numFiles);
6109     compactionNumBytesCompacted.addAndGet(filesSizeCompacted);
6110 
6111     assert newValue >= 0;
6112   }
6113 
6114   /**
6115    * @return sequenceId.
6116    */
6117   public AtomicLong getSequenceId() {
6118     return this.sequenceId;
6119   }
6120 
6121   /**
6122    * sets this region's sequenceId.
6123    * @param value new value
6124    */
6125   private void setSequenceId(long value) {
6126     this.sequenceId.set(value);
6127   }
6128 
6129   /**
6130    * Listener class to enable callers of
6131    * bulkLoadHFile() to perform any necessary
6132    * pre/post processing of a given bulkload call
6133    */
6134   public interface BulkLoadListener {
6135 
6136     /**
6137      * Called before an HFile is actually loaded
6138      * @param family family being loaded to
6139      * @param srcPath path of HFile
6140      * @return final path to be used for actual loading
6141      * @throws IOException
6142      */
6143     String prepareBulkLoad(byte[] family, String srcPath) throws IOException;
6144 
6145     /**
6146      * Called after a successful HFile load
6147      * @param family family being loaded to
6148      * @param srcPath path of HFile
6149      * @throws IOException
6150      */
6151     void doneBulkLoad(byte[] family, String srcPath) throws IOException;
6152 
6153     /**
6154      * Called after a failed HFile load
6155      * @param family family being loaded to
6156      * @param srcPath path of HFile
6157      * @throws IOException
6158      */
6159     void failedBulkLoad(byte[] family, String srcPath) throws IOException;
6160   }
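
  /*
   * A skeletal listener implementation; the logging-only bodies are placeholders for
   * whatever staging or cleanup a caller actually needs:
   *
   *   class LoggingBulkLoadListener implements BulkLoadListener {
   *     @Override
   *     public String prepareBulkLoad(byte[] family, String srcPath) throws IOException {
   *       LOG.info("about to bulk load " + srcPath + " into " + Bytes.toString(family));
   *       return srcPath;                  // load the source file as-is
   *     }
   *     @Override
   *     public void doneBulkLoad(byte[] family, String srcPath) throws IOException {
   *       LOG.info("bulk load of " + srcPath + " completed");
   *     }
   *     @Override
   *     public void failedBulkLoad(byte[] family, String srcPath) throws IOException {
   *       LOG.warn("bulk load of " + srcPath + " failed");
   *     }
   *   }
   */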
6161 
6162   @VisibleForTesting class RowLockContext {
6163     private final HashedBytes row;
6164     private final CountDownLatch latch = new CountDownLatch(1);
6165     private final Thread thread;
6166     private int lockCount = 0;
6167 
6168     RowLockContext(HashedBytes row) {
6169       this.row = row;
6170       this.thread = Thread.currentThread();
6171     }
6172 
6173     boolean ownedByCurrentThread() {
6174       return thread == Thread.currentThread();
6175     }
6176 
6177     RowLock newLock() {
6178       lockCount++;
6179       return new RowLock(this);
6180     }
6181 
6182     void releaseLock() {
6183       if (!ownedByCurrentThread()) {
6184         throw new IllegalArgumentException("Lock held by thread: " + thread
6185           + " cannot be released by different thread: " + Thread.currentThread());
6186       }
6187       lockCount--;
6188       if (lockCount == 0) {
6189         // no remaining locks by the thread, unlock and allow other threads to access
6190         RowLockContext existingContext = lockedRows.remove(row);
6191         if (existingContext != this) {
6192           throw new RuntimeException(
6193               "Internal row lock state inconsistent, should not happen, row: " + row);
6194         }
6195         latch.countDown();
6196       }
6197     }
6198   }
6199 
6200   /**
6201    * Row lock held by a given thread.
6202    * One thread may acquire multiple locks on the same row simultaneously.
6203    * The locks must be released by calling release() from the same thread.
6204    */
6205   public static class RowLock {
6206     @VisibleForTesting final RowLockContext context;
6207     private boolean released = false;
6208 
6209     @VisibleForTesting RowLock(RowLockContext context) {
6210       this.context = context;
6211     }
6212 
6213     /**
6214      * Release the given lock.  If there are no remaining locks held by the current thread
6215      * then unlock the row and allow other threads to acquire the lock.
6216      * @throws IllegalArgumentException if called by a different thread than the lock owning thread
6217      */
6218     public void release() {
6219       if (!released) {
6220         context.releaseLock();
6221         released = true;
6222       }
6223     }
6224   }
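
  /*
   * Within this class the lock is used with a try/finally, for example:
   *
   *   RowLock rowLock = getRowLock(row);
   *   try {
   *     // ... mutate the row while holding the lock ...
   *   } finally {
   *     rowLock.release();
   *   }
   */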
6225 
6226   /**
6227    * Lock the updates' readLock first, so that we can safely append logs in coprocessors.
6228    * @throws RegionTooBusyException
6229    * @throws InterruptedIOException
6230    */
6231   public void updatesLock() throws RegionTooBusyException, InterruptedIOException {
6232     lock(updatesLock.readLock());
6233   }
6234 
6235   /**
6236    * Unlock the updates' readLock after appending logs in coprocessors.
6237    * @throws InterruptedIOException
6238    */
6239   public void updatesUnlock() throws InterruptedIOException {
6240     updatesLock.readLock().unlock();
6241   }
6242 }