1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.EOFException;
22 import java.io.FileNotFoundException;
23 import java.io.IOException;
24 import java.io.InterruptedIOException;
25 import java.lang.reflect.Constructor;
26 import java.text.ParseException;
27 import java.util.AbstractList;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Collection;
31 import java.util.Collections;
32 import java.util.Comparator;
33 import java.util.HashMap;
34 import java.util.HashSet;
35 import java.util.Iterator;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Map.Entry;
39 import java.util.NavigableMap;
40 import java.util.NavigableSet;
41 import java.util.RandomAccess;
42 import java.util.Set;
43 import java.util.TreeMap;
44 import java.util.concurrent.Callable;
45 import java.util.concurrent.CompletionService;
46 import java.util.concurrent.ConcurrentHashMap;
47 import java.util.concurrent.ConcurrentMap;
48 import java.util.concurrent.ConcurrentSkipListMap;
49 import java.util.concurrent.ExecutionException;
50 import java.util.concurrent.ExecutorCompletionService;
51 import java.util.concurrent.ExecutorService;
52 import java.util.concurrent.Executors;
53 import java.util.concurrent.Future;
54 import java.util.concurrent.FutureTask;
55 import java.util.concurrent.ThreadFactory;
56 import java.util.concurrent.ThreadPoolExecutor;
57 import java.util.concurrent.TimeUnit;
58 import java.util.concurrent.TimeoutException;
59 import java.util.concurrent.atomic.AtomicBoolean;
60 import java.util.concurrent.atomic.AtomicInteger;
61 import java.util.concurrent.atomic.AtomicLong;
62 import java.util.concurrent.locks.Lock;
63 import java.util.concurrent.locks.ReadWriteLock;
64 import java.util.concurrent.locks.ReentrantReadWriteLock;
65
66 import org.apache.commons.lang.RandomStringUtils;
67 import org.apache.commons.logging.Log;
68 import org.apache.commons.logging.LogFactory;
69 import org.apache.hadoop.conf.Configuration;
70 import org.apache.hadoop.fs.FileStatus;
71 import org.apache.hadoop.fs.FileSystem;
72 import org.apache.hadoop.fs.Path;
73 import org.apache.hadoop.hbase.Cell;
74 import org.apache.hadoop.hbase.CellScanner;
75 import org.apache.hadoop.hbase.CellUtil;
76 import org.apache.hadoop.hbase.CompoundConfiguration;
77 import org.apache.hadoop.hbase.DoNotRetryIOException;
78 import org.apache.hadoop.hbase.DroppedSnapshotException;
79 import org.apache.hadoop.hbase.HBaseConfiguration;
80 import org.apache.hadoop.hbase.HColumnDescriptor;
81 import org.apache.hadoop.hbase.HConstants;
82 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
83 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
84 import org.apache.hadoop.hbase.HRegionInfo;
85 import org.apache.hadoop.hbase.HTableDescriptor;
86 import org.apache.hadoop.hbase.KeyValue;
87 import org.apache.hadoop.hbase.KeyValueUtil;
88 import org.apache.hadoop.hbase.NamespaceDescriptor;
89 import org.apache.hadoop.hbase.NotServingRegionException;
90 import org.apache.hadoop.hbase.RegionTooBusyException;
91 import org.apache.hadoop.hbase.TableName;
92 import org.apache.hadoop.hbase.Tag;
93 import org.apache.hadoop.hbase.TagType;
94 import org.apache.hadoop.hbase.UnknownScannerException;
95 import org.apache.hadoop.hbase.backup.HFileArchiver;
96 import org.apache.hadoop.hbase.classification.InterfaceAudience;
97 import org.apache.hadoop.hbase.client.Append;
98 import org.apache.hadoop.hbase.client.Delete;
99 import org.apache.hadoop.hbase.client.Durability;
100 import org.apache.hadoop.hbase.client.Get;
101 import org.apache.hadoop.hbase.client.Increment;
102 import org.apache.hadoop.hbase.client.IsolationLevel;
103 import org.apache.hadoop.hbase.client.Mutation;
104 import org.apache.hadoop.hbase.client.Put;
105 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
106 import org.apache.hadoop.hbase.client.Result;
107 import org.apache.hadoop.hbase.client.RowMutations;
108 import org.apache.hadoop.hbase.client.Scan;
109 import org.apache.hadoop.hbase.conf.ConfigurationManager;
110 import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
111 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
112 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
113 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
114 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
115 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
116 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
117 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
118 import org.apache.hadoop.hbase.filter.FilterWrapper;
119 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
120 import org.apache.hadoop.hbase.io.HeapSize;
121 import org.apache.hadoop.hbase.io.TimeRange;
122 import org.apache.hadoop.hbase.io.hfile.BlockCache;
123 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
124 import org.apache.hadoop.hbase.io.hfile.HFile;
125 import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
126 import org.apache.hadoop.hbase.ipc.RpcCallContext;
127 import org.apache.hadoop.hbase.ipc.RpcServer;
128 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
129 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
130 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
131 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
132 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
133 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
134 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
135 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
136 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
137 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
138 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
139 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
140 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
141 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
142 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
143 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
144 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
145 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
146 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
147 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
148 import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
149 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
150 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
151 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
152 import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
153 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
154 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
155 import org.apache.hadoop.hbase.regionserver.wal.ReplayHLogKey;
156 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
157 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
158 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
159 import org.apache.hadoop.hbase.security.User;
160 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
161 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
162 import org.apache.hadoop.hbase.util.ByteStringer;
163 import org.apache.hadoop.hbase.util.Bytes;
164 import org.apache.hadoop.hbase.util.CancelableProgressable;
165 import org.apache.hadoop.hbase.util.ClassSize;
166 import org.apache.hadoop.hbase.util.CompressionTest;
167 import org.apache.hadoop.hbase.util.Counter;
168 import org.apache.hadoop.hbase.util.EncryptionTest;
169 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
170 import org.apache.hadoop.hbase.util.FSTableDescriptors;
171 import org.apache.hadoop.hbase.util.FSUtils;
172 import org.apache.hadoop.hbase.util.HashedBytes;
173 import org.apache.hadoop.hbase.util.Pair;
174 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
175 import org.apache.hadoop.hbase.util.Threads;
176 import org.apache.hadoop.hbase.wal.WAL;
177 import org.apache.hadoop.hbase.wal.WALFactory;
178 import org.apache.hadoop.hbase.wal.WALKey;
179 import org.apache.hadoop.hbase.wal.WALSplitter;
180 import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
181 import org.apache.hadoop.io.MultipleIOException;
182 import org.apache.hadoop.util.StringUtils;
183 import org.apache.htrace.Trace;
184 import org.apache.htrace.TraceScope;
185
186 import com.google.common.annotations.VisibleForTesting;
187 import com.google.common.base.Optional;
188 import com.google.common.base.Preconditions;
189 import com.google.common.collect.Lists;
190 import com.google.common.collect.Maps;
191 import com.google.common.io.Closeables;
192 import com.google.protobuf.ByteString;
193 import com.google.protobuf.Descriptors;
194 import com.google.protobuf.Message;
195 import com.google.protobuf.RpcCallback;
196 import com.google.protobuf.RpcController;
197 import com.google.protobuf.Service;
198 import com.google.protobuf.TextFormat;
199
200 @InterfaceAudience.Private
201 public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
202 private static final Log LOG = LogFactory.getLog(HRegion.class);
203
204 public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
205 "hbase.hregion.scan.loadColumnFamiliesOnDemand";
206
207
208
209
210
211
212
213
214
215 private final int maxWaitForSeqId;
216 private static final String MAX_WAIT_FOR_SEQ_ID_KEY = "hbase.hregion.max.wait.for.sequenceid.ms";
217 private static final int DEFAULT_MAX_WAIT_FOR_SEQ_ID = 30000;
218
219
220
221
222
223 private static final Durability DEFAULT_DURABILITY = Durability.SYNC_WAL;
224
225 final AtomicBoolean closed = new AtomicBoolean(false);
226
227
228
229
230
231
232 final AtomicBoolean closing = new AtomicBoolean(false);
233
234
235
236
237
238 private volatile long maxFlushedSeqId = HConstants.NO_SEQNUM;
239
240
241
242
243
244
245 private volatile long lastFlushOpSeqId = HConstants.NO_SEQNUM;
246
247
248
249
250
251
252 protected volatile long lastReplayedOpenRegionSeqId = -1L;
253 protected volatile long lastReplayedCompactionSeqId = -1L;
254
255
256
257
258
259
260
261
262
263
264 private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
265 new ConcurrentHashMap<HashedBytes, RowLockContext>();
266
267 protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(
268 Bytes.BYTES_RAWCOMPARATOR);
269
270
271 private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
272
273 private final AtomicLong memstoreSize = new AtomicLong(0);
274
275
276 final Counter numMutationsWithoutWAL = new Counter();
277 final Counter dataInMemoryWithoutWAL = new Counter();
278
279
280 final Counter checkAndMutateChecksPassed = new Counter();
281 final Counter checkAndMutateChecksFailed = new Counter();
282
283
284 final Counter readRequestsCount = new Counter();
285 final Counter writeRequestsCount = new Counter();
286
287
288 private final Counter blockedRequestsCount = new Counter();
289
290
291 final AtomicLong compactionsFinished = new AtomicLong(0L);
292 final AtomicLong compactionNumFilesCompacted = new AtomicLong(0L);
293 final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L);
294
295 private final WAL wal;
296 private final HRegionFileSystem fs;
297 protected final Configuration conf;
298 private final Configuration baseConf;
299 private final KeyValue.KVComparator comparator;
300 private final int rowLockWaitDuration;
301 static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
302
303
304
305
306
307
308
309 final long busyWaitDuration;
310 static final long DEFAULT_BUSY_WAIT_DURATION = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
311
312
313
314
315 final int maxBusyWaitMultiplier;
316
317
318
319 final long maxBusyWaitDuration;
320
321
322 static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
323 final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();
324
325 private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
326
327
328
329
330 private long openSeqNum = HConstants.NO_SEQNUM;
331
332
333
334
335
336 private boolean isLoadingCfsOnDemandDefault = false;
337
338 private final AtomicInteger majorInProgress = new AtomicInteger(0);
339 private final AtomicInteger minorInProgress = new AtomicInteger(0);
340
341
342
343
344
345
346
347 Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
348
349
350 private PrepareFlushResult prepareFlushResult = null;
351
352
353
354
355 private boolean disallowWritesInRecovering = false;
356
357
358 private volatile boolean recovering = false;
359
360 private volatile Optional<ConfigurationManager> configurationManager;
361
362
363
364
365
366
367 public long getSmallestReadPoint() {
368 long minimumReadPoint;
369
370
371
372 synchronized(scannerReadPoints) {
373 minimumReadPoint = mvcc.getReadPoint();
374
375 for (Long readPoint: this.scannerReadPoints.values()) {
376 if (readPoint < minimumReadPoint) {
377 minimumReadPoint = readPoint;
378 }
379 }
380 }
381 return minimumReadPoint;
382 }
383
384
385
386
387
388 static class WriteState {
389
390 volatile boolean flushing = false;
391
392 volatile boolean flushRequested = false;
393
394 AtomicInteger compacting = new AtomicInteger(0);
395
396 volatile boolean writesEnabled = true;
397
398 volatile boolean readOnly = false;
399
400
401 volatile boolean readsEnabled = true;
402
403
404
405
406
407
408 synchronized void setReadOnly(final boolean onOff) {
409 this.writesEnabled = !onOff;
410 this.readOnly = onOff;
411 }
412
413 boolean isReadOnly() {
414 return this.readOnly;
415 }
416
417 boolean isFlushRequested() {
418 return this.flushRequested;
419 }
420
421 void setReadsEnabled(boolean readsEnabled) {
422 this.readsEnabled = readsEnabled;
423 }
424
425 static final long HEAP_SIZE = ClassSize.align(
426 ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
427 }
428
429
430
431
432
433
434
435 public static class FlushResultImpl implements FlushResult {
436 final Result result;
437 final String failureReason;
438 final long flushSequenceId;
439 final boolean wroteFlushWalMarker;
440
441
442
443
444
445
446
447
448 FlushResultImpl(Result result, long flushSequenceId) {
449 this(result, flushSequenceId, null, false);
450 assert result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
451 .FLUSHED_COMPACTION_NEEDED;
452 }
453
454
455
456
457
458
459 FlushResultImpl(Result result, String failureReason, boolean wroteFlushMarker) {
460 this(result, -1, failureReason, wroteFlushMarker);
461 assert result == Result.CANNOT_FLUSH_MEMSTORE_EMPTY || result == Result.CANNOT_FLUSH;
462 }
463
464
465
466
467
468
469
470 FlushResultImpl(Result result, long flushSequenceId, String failureReason,
471 boolean wroteFlushMarker) {
472 this.result = result;
473 this.flushSequenceId = flushSequenceId;
474 this.failureReason = failureReason;
475 this.wroteFlushWalMarker = wroteFlushMarker;
476 }
477
478
479
480
481
482
483 @Override
484 public boolean isFlushSucceeded() {
485 return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
486 .FLUSHED_COMPACTION_NEEDED;
487 }
488
489
490
491
492
493 @Override
494 public boolean isCompactionNeeded() {
495 return result == Result.FLUSHED_COMPACTION_NEEDED;
496 }
497
498 @Override
499 public String toString() {
500 return new StringBuilder()
501 .append("flush result:").append(result).append(", ")
502 .append("failureReason:").append(failureReason).append(",")
503 .append("flush seq id").append(flushSequenceId).toString();
504 }
505
506 @Override
507 public Result getResult() {
508 return result;
509 }
510 }
511
512
513 @VisibleForTesting
514 static class PrepareFlushResult {
515 final FlushResult result;
516 final TreeMap<byte[], StoreFlushContext> storeFlushCtxs;
517 final TreeMap<byte[], List<Path>> committedFiles;
518 final TreeMap<byte[], Long> storeFlushableSize;
519 final long startTime;
520 final long flushOpSeqId;
521 final long flushedSeqId;
522 final long totalFlushableSize;
523
524
525 PrepareFlushResult(FlushResult result, long flushSeqId) {
526 this(result, null, null, null, Math.max(0, flushSeqId), 0, 0, 0);
527 }
528
529
530 PrepareFlushResult(
531 TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
532 TreeMap<byte[], List<Path>> committedFiles,
533 TreeMap<byte[], Long> storeFlushableSize, long startTime, long flushSeqId,
534 long flushedSeqId, long totalFlushableSize) {
535 this(null, storeFlushCtxs, committedFiles, storeFlushableSize, startTime,
536 flushSeqId, flushedSeqId, totalFlushableSize);
537 }
538
539 private PrepareFlushResult(
540 FlushResult result,
541 TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
542 TreeMap<byte[], List<Path>> committedFiles,
543 TreeMap<byte[], Long> storeFlushableSize, long startTime, long flushSeqId,
544 long flushedSeqId, long totalFlushableSize) {
545 this.result = result;
546 this.storeFlushCtxs = storeFlushCtxs;
547 this.committedFiles = committedFiles;
548 this.storeFlushableSize = storeFlushableSize;
549 this.startTime = startTime;
550 this.flushOpSeqId = flushSeqId;
551 this.flushedSeqId = flushedSeqId;
552 this.totalFlushableSize = totalFlushableSize;
553 }
554
555 public FlushResult getResult() {
556 return this.result;
557 }
558 }
559
560 final WriteState writestate = new WriteState();
561
562 long memstoreFlushSize;
563 final long timestampSlop;
564 final long rowProcessorTimeout;
565
566
567 private final ConcurrentMap<Store, Long> lastStoreFlushTimeMap =
568 new ConcurrentHashMap<Store, Long>();
569
570 final RegionServerServices rsServices;
571 private RegionServerAccounting rsAccounting;
572 private long flushCheckInterval;
573
574 private long flushPerChanges;
575 private long blockingMemStoreSize;
576 final long threadWakeFrequency;
577
578 final ReentrantReadWriteLock lock =
579 new ReentrantReadWriteLock();
580
581
582 private final ReentrantReadWriteLock updatesLock =
583 new ReentrantReadWriteLock();
584 private boolean splitRequest;
585 private byte[] explicitSplitPoint = null;
586
587 private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
588
589
590 private RegionCoprocessorHost coprocessorHost;
591
592 private HTableDescriptor htableDescriptor = null;
593 private RegionSplitPolicy splitPolicy;
594 private FlushPolicy flushPolicy;
595
596 private final MetricsRegion metricsRegion;
597 private final MetricsRegionWrapperImpl metricsRegionWrapper;
598 private final Durability durability;
599 private final boolean regionStatsEnabled;
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
/**
 * Legacy constructor. It builds the {@link HRegionFileSystem} from the table directory,
 * file system and region info, then delegates to
 * {@link #HRegion(HRegionFileSystem, WAL, Configuration, HTableDescriptor, RegionServerServices)}.
 *
 * @param tableDir directory of the table that owns the region
 * @param wal write-ahead log for the region
 * @param fs file system holding the region data
 * @param confParam base configuration
 * @param regionInfo descriptor of the region
 * @param htd table descriptor
 * @param rsServices region server services
 * @deprecated retained for existing callers; prefer the {@link HRegionFileSystem}-based
 *     constructor.
 */
@Deprecated
@VisibleForTesting
public HRegion(final Path tableDir, final WAL wal, final FileSystem fs,
    final Configuration confParam, final HRegionInfo regionInfo,
    final HTableDescriptor htd, final RegionServerServices rsServices) {
  this(
      new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
      wal,
      confParam,
      htd,
      rsServices);
}
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647 public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration confParam,
648 final HTableDescriptor htd, final RegionServerServices rsServices) {
649 if (htd == null) {
650 throw new IllegalArgumentException("Need table descriptor");
651 }
652
653 if (confParam instanceof CompoundConfiguration) {
654 throw new IllegalArgumentException("Need original base configuration");
655 }
656
657 this.comparator = fs.getRegionInfo().getComparator();
658 this.wal = wal;
659 this.fs = fs;
660
661
662 this.baseConf = confParam;
663 this.conf = new CompoundConfiguration()
664 .add(confParam)
665 .addStringMap(htd.getConfiguration())
666 .addWritableMap(htd.getValues());
667 this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
668 DEFAULT_CACHE_FLUSH_INTERVAL);
669 this.flushPerChanges = conf.getLong(MEMSTORE_FLUSH_PER_CHANGES, DEFAULT_FLUSH_PER_CHANGES);
670 if (this.flushPerChanges > MAX_FLUSH_PER_CHANGES) {
671 throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed "
672 + MAX_FLUSH_PER_CHANGES);
673 }
674 this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
675 DEFAULT_ROWLOCK_WAIT_DURATION);
676
677 this.maxWaitForSeqId = conf.getInt(MAX_WAIT_FOR_SEQ_ID_KEY, DEFAULT_MAX_WAIT_FOR_SEQ_ID);
678 this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
679 this.htableDescriptor = htd;
680 this.rsServices = rsServices;
681 this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
682 setHTableSpecificConf();
683 this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
684
685 this.busyWaitDuration = conf.getLong(
686 "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
687 this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
688 if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
689 throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
690 + busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
691 + maxBusyWaitMultiplier + "). Their product should be positive");
692 }
693 this.maxBusyWaitDuration = conf.getLong("hbase.ipc.client.call.purge.timeout",
694 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
695
696
697
698
699
700
701
702 this.timestampSlop = conf.getLong(
703 "hbase.hregion.keyvalue.timestamp.slop.millisecs",
704 HConstants.LATEST_TIMESTAMP);
705
706
707
708
709
710 this.rowProcessorTimeout = conf.getLong(
711 "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
712 this.durability = htd.getDurability() == Durability.USE_DEFAULT
713 ? DEFAULT_DURABILITY
714 : htd.getDurability();
715 if (rsServices != null) {
716 this.rsAccounting = this.rsServices.getRegionServerAccounting();
717
718
719 this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
720 this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
721 this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper);
722
723 Map<String, Region> recoveringRegions = rsServices.getRecoveringRegions();
724 String encodedName = getRegionInfo().getEncodedName();
725 if (recoveringRegions != null && recoveringRegions.containsKey(encodedName)) {
726 this.recovering = true;
727 recoveringRegions.put(encodedName, this);
728 }
729 } else {
730 this.metricsRegionWrapper = null;
731 this.metricsRegion = null;
732 }
733 if (LOG.isDebugEnabled()) {
734
735 LOG.debug("Instantiated " + this);
736 }
737
738
739 this.disallowWritesInRecovering =
740 conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING,
741 HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG);
742 configurationManager = Optional.absent();
743
744
745 this.regionStatsEnabled = htd.getTableName().getNamespaceAsString().equals(
746 NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR) ?
747 false :
748 conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
749 HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
750 }
751
752 void setHTableSpecificConf() {
753 if (this.htableDescriptor == null) return;
754 long flushSize = this.htableDescriptor.getMemStoreFlushSize();
755
756 if (flushSize <= 0) {
757 flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
758 HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
759 }
760 this.memstoreFlushSize = flushSize;
761 this.blockingMemStoreSize = this.memstoreFlushSize *
762 conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
763 HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
764 }
765
766
767
768
769
770
771
772
773
774 @Deprecated
775 public long initialize() throws IOException {
776 return initialize(null);
777 }
778
779
780
781
782
783
784
785
786 private long initialize(final CancelableProgressable reporter) throws IOException {
787 MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
788 long nextSeqId = -1;
789 try {
790 nextSeqId = initializeRegionInternals(reporter, status);
791 return nextSeqId;
792 } finally {
793
794
795 if (nextSeqId == -1) {
796 status.abort("Exception during region " + getRegionInfo().getRegionNameAsString() +
797 " initialization.");
798 }
799 }
800 }
801
802 private long initializeRegionInternals(final CancelableProgressable reporter,
803 final MonitoredTask status) throws IOException {
804 if (coprocessorHost != null) {
805 status.setStatus("Running coprocessor pre-open hook");
806 coprocessorHost.preOpen();
807 }
808
809
810 status.setStatus("Writing region info on filesystem");
811 fs.checkRegionInfoOnFilesystem();
812
813
814 status.setStatus("Initializing all the Stores");
815 long maxSeqId = initializeStores(reporter, status);
816 this.mvcc.advanceTo(maxSeqId);
817 if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
818
819 maxSeqId = Math.max(maxSeqId,
820 replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
821
822 this.mvcc.advanceTo(maxSeqId);
823 }
824 this.lastReplayedOpenRegionSeqId = maxSeqId;
825
826 this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this));
827 this.writestate.flushRequested = false;
828 this.writestate.compacting.set(0);
829
830 if (this.writestate.writesEnabled) {
831
832 status.setStatus("Cleaning up temporary data from old regions");
833 fs.cleanupTempDir();
834 }
835
836 if (this.writestate.writesEnabled) {
837 status.setStatus("Cleaning up detritus from prior splits");
838
839
840
841 fs.cleanupAnySplitDetritus();
842 fs.cleanupMergesDir();
843 }
844
845
846 this.splitPolicy = RegionSplitPolicy.create(this, conf);
847
848
849 this.flushPolicy = FlushPolicyFactory.create(this, conf);
850
851 long lastFlushTime = EnvironmentEdgeManager.currentTime();
852 for (Store store: stores.values()) {
853 this.lastStoreFlushTimeMap.put(store, lastFlushTime);
854 }
855
856
857
858 long nextSeqid = maxSeqId;
859
860
861
862
863 if (this.writestate.writesEnabled) {
864 nextSeqid = WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs
865 .getRegionDir(), nextSeqid, (this.recovering ? (this.flushPerChanges + 10000000) : 1));
866 } else {
867 nextSeqid++;
868 }
869
870 LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() +
871 "; next sequenceid=" + nextSeqid);
872
873
874 this.closing.set(false);
875 this.closed.set(false);
876
877 if (coprocessorHost != null) {
878 status.setStatus("Running coprocessor post-open hooks");
879 coprocessorHost.postOpen();
880 }
881
882 status.markComplete("Region opened successfully");
883 return nextSeqid;
884 }
885
886
887
888
889
890
891
892
893 private long initializeStores(final CancelableProgressable reporter, MonitoredTask status)
894 throws IOException {
895
896
897 long maxSeqId = -1;
898
899 long maxMemstoreTS = -1;
900
901 if (!htableDescriptor.getFamilies().isEmpty()) {
902
903 ThreadPoolExecutor storeOpenerThreadPool =
904 getStoreOpenAndCloseThreadPool("StoreOpener-" + this.getRegionInfo().getShortNameToLog());
905 CompletionService<HStore> completionService =
906 new ExecutorCompletionService<HStore>(storeOpenerThreadPool);
907
908
909 for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
910 status.setStatus("Instantiating store for column family " + family);
911 completionService.submit(new Callable<HStore>() {
912 @Override
913 public HStore call() throws IOException {
914 return instantiateHStore(family);
915 }
916 });
917 }
918 boolean allStoresOpened = false;
919 try {
920 for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
921 Future<HStore> future = completionService.take();
922 HStore store = future.get();
923 this.stores.put(store.getFamily().getName(), store);
924
925 long storeMaxSequenceId = store.getMaxSequenceId();
926 maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
927 storeMaxSequenceId);
928 if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) {
929 maxSeqId = storeMaxSequenceId;
930 }
931 long maxStoreMemstoreTS = store.getMaxMemstoreTS();
932 if (maxStoreMemstoreTS > maxMemstoreTS) {
933 maxMemstoreTS = maxStoreMemstoreTS;
934 }
935 }
936 allStoresOpened = true;
937 } catch (InterruptedException e) {
938 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
939 } catch (ExecutionException e) {
940 throw new IOException(e.getCause());
941 } finally {
942 storeOpenerThreadPool.shutdownNow();
943 if (!allStoresOpened) {
944
945 LOG.error("Could not initialize all stores for the region=" + this);
946 for (Store store : this.stores.values()) {
947 try {
948 store.close();
949 } catch (IOException e) {
950 LOG.warn(e.getMessage());
951 }
952 }
953 }
954 }
955 }
956 return Math.max(maxSeqId, maxMemstoreTS + 1);
957 }
958
959 private void initializeWarmup(final CancelableProgressable reporter) throws IOException {
960 MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
961
962 status.setStatus("Warming up all the Stores");
963 try {
964 initializeStores(reporter, status);
965 } finally {
966 status.markComplete("Done warming up.");
967 }
968 }
969
970
971
972
973 private NavigableMap<byte[], List<Path>> getStoreFiles() {
974 NavigableMap<byte[], List<Path>> allStoreFiles =
975 new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
976 for (Store store: getStores()) {
977 Collection<StoreFile> storeFiles = store.getStorefiles();
978 if (storeFiles == null) continue;
979 List<Path> storeFileNames = new ArrayList<Path>();
980 for (StoreFile storeFile: storeFiles) {
981 storeFileNames.add(storeFile.getPath());
982 }
983 allStoreFiles.put(store.getFamily().getName(), storeFileNames);
984 }
985 return allStoreFiles;
986 }
987
988 private void writeRegionOpenMarker(WAL wal, long openSeqId) throws IOException {
989 Map<byte[], List<Path>> storeFiles = getStoreFiles();
990 RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
991 RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
992 getRegionServerServices().getServerName(), storeFiles);
993 WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionOpenDesc, mvcc);
994 }
995
996 private void writeRegionCloseMarker(WAL wal) throws IOException {
997 Map<byte[], List<Path>> storeFiles = getStoreFiles();
998 RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor(
999 RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), mvcc.getReadPoint(),
1000 getRegionServerServices().getServerName(), storeFiles);
1001 WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc, mvcc);
1002
1003
1004
1005
1006 if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) {
1007 WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(),
1008 mvcc.getReadPoint(), 0);
1009 }
1010 }
1011
1012
1013
1014
1015 public boolean hasReferences() {
1016 for (Store store : this.stores.values()) {
1017 if (store.hasReferences()) return true;
1018 }
1019 return false;
1020 }
1021
1022 @Override
1023 public HDFSBlocksDistribution getHDFSBlocksDistribution() {
1024 HDFSBlocksDistribution hdfsBlocksDistribution =
1025 new HDFSBlocksDistribution();
1026 synchronized (this.stores) {
1027 for (Store store : this.stores.values()) {
1028 Collection<StoreFile> storeFiles = store.getStorefiles();
1029 if (storeFiles == null) continue;
1030 for (StoreFile sf : storeFiles) {
1031 HDFSBlocksDistribution storeFileBlocksDistribution =
1032 sf.getHDFSBlockDistribution();
1033 hdfsBlocksDistribution.add(storeFileBlocksDistribution);
1034 }
1035 }
1036 }
1037 return hdfsBlocksDistribution;
1038 }
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048 public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
1049 final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException {
1050 Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
1051 return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath);
1052 }
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063 public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
1064 final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo, Path tablePath)
1065 throws IOException {
1066 HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
1067 FileSystem fs = tablePath.getFileSystem(conf);
1068
1069 HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo);
1070 for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
1071 Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family.getNameAsString());
1072 if (storeFiles == null) continue;
1073 for (StoreFileInfo storeFileInfo : storeFiles) {
1074 try {
1075 hdfsBlocksDistribution.add(storeFileInfo.computeHDFSBlocksDistribution(fs));
1076 } catch (IOException ioe) {
1077 LOG.warn("Error getting hdfs block distribution for " + storeFileInfo);
1078 }
1079 }
1080 }
1081 return hdfsBlocksDistribution;
1082 }
1083
1084
1085
1086
1087
1088
1089 public long addAndGetGlobalMemstoreSize(long memStoreSize) {
1090 if (this.rsAccounting != null) {
1091 rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
1092 }
1093 return this.memstoreSize.addAndGet(memStoreSize);
1094 }
1095
1096 @Override
1097 public HRegionInfo getRegionInfo() {
1098 return this.fs.getRegionInfo();
1099 }
1100
1101
1102
1103
1104
1105 RegionServerServices getRegionServerServices() {
1106 return this.rsServices;
1107 }
1108
1109 @Override
1110 public long getReadRequestsCount() {
1111 return readRequestsCount.get();
1112 }
1113
1114 @Override
1115 public void updateReadRequestsCount(long i) {
1116 readRequestsCount.add(i);
1117 }
1118
1119 @Override
1120 public long getWriteRequestsCount() {
1121 return writeRequestsCount.get();
1122 }
1123
1124 @Override
1125 public void updateWriteRequestsCount(long i) {
1126 writeRequestsCount.add(i);
1127 }
1128
1129 @Override
1130 public long getMemstoreSize() {
1131 return memstoreSize.get();
1132 }
1133
1134 @Override
1135 public long getNumMutationsWithoutWAL() {
1136 return numMutationsWithoutWAL.get();
1137 }
1138
1139 @Override
1140 public long getDataInMemoryWithoutWAL() {
1141 return dataInMemoryWithoutWAL.get();
1142 }
1143
1144 @Override
1145 public long getBlockedRequestsCount() {
1146 return blockedRequestsCount.get();
1147 }
1148
1149 @Override
1150 public long getCheckAndMutateChecksPassed() {
1151 return checkAndMutateChecksPassed.get();
1152 }
1153
1154 @Override
1155 public long getCheckAndMutateChecksFailed() {
1156 return checkAndMutateChecksFailed.get();
1157 }
1158
1159 @Override
1160 public MetricsRegion getMetrics() {
1161 return metricsRegion;
1162 }
1163
1164 @Override
1165 public boolean isClosed() {
1166 return this.closed.get();
1167 }
1168
1169 @Override
1170 public boolean isClosing() {
1171 return this.closing.get();
1172 }
1173
1174 @Override
1175 public boolean isReadOnly() {
1176 return this.writestate.isReadOnly();
1177 }
1178
1179
1180
1181
1182 public void setRecovering(boolean newState) {
1183 boolean wasRecovering = this.recovering;
1184
1185
1186 if (wal != null && getRegionServerServices() != null && !writestate.readOnly
1187 && wasRecovering && !newState) {
1188
1189
1190 boolean forceFlush = getTableDesc().getRegionReplication() > 1;
1191
1192 MonitoredTask status = TaskMonitor.get().createStatus("Recovering region " + this);
1193
1194 try {
1195
1196 if (forceFlush) {
1197 status.setStatus("Flushing region " + this + " because recovery is finished");
1198 internalFlushcache(status);
1199 }
1200
1201 status.setStatus("Writing region open event marker to WAL because recovery is finished");
1202 try {
1203 long seqId = openSeqNum;
1204
1205 if (wal != null) {
1206 seqId = getNextSequenceId(wal);
1207 }
1208 writeRegionOpenMarker(wal, seqId);
1209 } catch (IOException e) {
1210
1211
1212 LOG.warn(getRegionInfo().getEncodedName() + " : was not able to write region opening "
1213 + "event to WAL, continuing", e);
1214 }
1215 } catch (IOException ioe) {
1216
1217
1218 LOG.warn(getRegionInfo().getEncodedName() + " : was not able to flush "
1219 + "event to WAL, continuing", ioe);
1220 } finally {
1221 status.cleanup();
1222 }
1223 }
1224
1225 this.recovering = newState;
1226 if (wasRecovering && !recovering) {
1227
1228 coprocessorHost.postLogReplay();
1229 }
1230 }
1231
1232 @Override
1233 public boolean isRecovering() {
1234 return this.recovering;
1235 }
1236
1237 @Override
1238 public boolean isAvailable() {
1239 return !isClosed() && !isClosing();
1240 }
1241
1242
1243 public boolean isSplittable() {
1244 return isAvailable() && !hasReferences();
1245 }
1246
1247
1248
1249
1250 public boolean isMergeable() {
1251 if (!isAvailable()) {
1252 LOG.debug("Region " + getRegionInfo().getRegionNameAsString()
1253 + " is not mergeable because it is closing or closed");
1254 return false;
1255 }
1256 if (hasReferences()) {
1257 LOG.debug("Region " + getRegionInfo().getRegionNameAsString()
1258 + " is not mergeable because it has references");
1259 return false;
1260 }
1261
1262 return true;
1263 }
1264
1265 public boolean areWritesEnabled() {
1266 synchronized(this.writestate) {
1267 return this.writestate.writesEnabled;
1268 }
1269 }
1270
1271 public MultiVersionConcurrencyControl getMVCC() {
1272 return mvcc;
1273 }
1274
1275 @Override
1276 public long getMaxFlushedSeqId() {
1277 return maxFlushedSeqId;
1278 }
1279
1280 @Override
1281 public long getReadpoint(IsolationLevel isolationLevel) {
1282 if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
1283
1284 return Long.MAX_VALUE;
1285 }
1286 return mvcc.getReadPoint();
1287 }
1288
1289 @Override
1290 public boolean isLoadingCfsOnDemandDefault() {
1291 return this.isLoadingCfsOnDemandDefault;
1292 }
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310 public Map<byte[], List<StoreFile>> close() throws IOException {
1311 return close(false);
1312 }
1313
  /** Monitor held by {@code close(boolean)} so that only one close runs at a time. */
  private final Object closeLock = new Object();

  /** Configuration key for the periodic memstore flush check interval. */
  public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL =
      "hbase.regionserver.optionalcacheflushinterval";
  /**
   * Default flush check interval, in milliseconds (one hour). Presumably this is the default
   * for {@link #MEMSTORE_PERIODIC_FLUSH_INTERVAL}; TODO confirm where it is read.
   */
  public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;
  /**
   * Flush check interval, in milliseconds (five minutes). shouldFlush() uses it for
   * default-replica regions of system tables.
   */
  public static final int SYSTEM_CACHE_FLUSH_INTERVAL = 300000;

  /** Configuration key for the number of sequence-id changes that triggers a flush. */
  public static final String MEMSTORE_FLUSH_PER_CHANGES =
      "hbase.regionserver.flush.per.changes";
  /** Default number of changes between flushes; presumably the default for the key above. */
  public static final long DEFAULT_FLUSH_PER_CHANGES = 30000000;
  /**
   * Upper bound on the number of changes between flushes. NOTE(review): presumably it caps the
   * configured {@link #MEMSTORE_FLUSH_PER_CHANGES}; confirm where it is enforced.
   */
  public static final long MAX_FLUSH_PER_CHANGES = 1000000000;
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351 public Map<byte[], List<StoreFile>> close(final boolean abort) throws IOException {
1352
1353
1354 MonitoredTask status = TaskMonitor.get().createStatus(
1355 "Closing region " + this +
1356 (abort ? " due to abort" : ""));
1357
1358 status.setStatus("Waiting for close lock");
1359 try {
1360 synchronized (closeLock) {
1361 return doClose(abort, status);
1362 }
1363 } finally {
1364 status.cleanup();
1365 }
1366 }
1367
1368
1369
1370
1371 @VisibleForTesting
1372 public void setClosing(boolean closing) {
1373 this.closing.set(closing);
1374 }
1375
  /**
   * Performs the actual close of this region; {@code close(boolean)} calls it while holding
   * {@code closeLock}.
   *
   * <p>Steps:
   * <ol>
   * <li>Run pre-close coprocessor hooks.</li>
   * <li>Disable writes and wait for in-flight flushes and compactions.</li>
   * <li>Optionally pre-flush.</li>
   * <li>Take the region write lock and flush until the memstore is empty.</li>
   * <li>Close all stores in parallel.</li>
   * <li>Write a close marker to the WAL.</li>
   * <li>Run post-close hooks and close metrics.</li>
   * </ol>
   *
   * @param abort true when closing because of an abort; skips flushing and the close marker
   * @param status monitored task used to report progress
   * @return store files per column family, or null if the region was already closed
   * @throws IOException if flushing or closing a store fails; on a flush failure writes are
   *     re-enabled before the exception is rethrown
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK_EXCEPTION_PATH",
      justification="I think FindBugs is confused")
  private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
      throws IOException {
    if (isClosed()) {
      LOG.warn("Region " + this + " already closed");
      return null;
    }

    if (coprocessorHost != null) {
      status.setStatus("Running coprocessor pre-close hooks");
      this.coprocessorHost.preClose(abort);
    }

    status.setStatus("Disabling compacts and flushes for region");
    boolean canFlush = true;
    synchronized (writestate) {
      // A read-only region is never flushed during close; remember that for the later steps.
      canFlush = !writestate.readOnly;
      writestate.writesEnabled = false;
      LOG.debug("Closing " + this + ": disabling compactions & flushes");
      waitForFlushesAndCompactions();
    }

    // If the memstore is large, flush once before taking the region write lock so that the
    // flush done under the write lock below has less to do.
    if (!abort && worthPreFlushing() && canFlush) {
      status.setStatus("Pre-flushing region before close");
      LOG.info("Running close preflush of " + getRegionInfo().getRegionNameAsString());
      try {
        internalFlushcache(status);
      } catch (IOException ioe) {
        // Not fatal here: the flush loop under the write lock below retries.
        status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
      }
    }

    // The write lock excludes flushcache() and compact(), which both hold the read lock.
    lock.writeLock().lock();
    this.closing.set(true);
    status.setStatus("Disabling writes for close");
    try {
      if (this.isClosed()) {
        status.abort("Already got closed by another process");
        // The finally block below releases the write lock.
        return null;
      }
      LOG.debug("Updates disabled for region " + this);
      // Flush until the memstore is empty. The first pass is the regular flush; once more than
      // five extra passes have been needed, give up with DroppedSnapshotException.
      if (!abort && canFlush) {
        int flushCount = 0;
        while (this.memstoreSize.get() > 0) {
          try {
            if (flushCount++ > 0) {
              int actualFlushes = flushCount - 1;
              if (actualFlushes > 5) {
                // Memstore still not empty after repeated flushes.
                throw new DroppedSnapshotException("Failed clearing memory after " +
                  actualFlushes + " attempts on region: " +
                    Bytes.toStringBinary(getRegionInfo().getRegionName()));
              }
              LOG.info("Running extra flush, " + actualFlushes +
                " (carrying snapshot?) " + this);
            }
            internalFlushcache(status);
          } catch (IOException ioe) {
            status.setStatus("Failed flush " + this + ", putting online again");
            synchronized (writestate) {
              writestate.writesEnabled = true;
            }
            // NOTE(review): this.closing is not reset here; confirm callers handle that.
            throw ioe;
          }
        }
      }

      Map<byte[], List<StoreFile>> result =
        new TreeMap<byte[], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
      if (!stores.isEmpty()) {
        // Close all stores concurrently on a bounded pool.
        ThreadPoolExecutor storeCloserThreadPool =
          getStoreOpenAndCloseThreadPool("StoreCloserThread-" +
            getRegionInfo().getRegionNameAsString());
        CompletionService<Pair<byte[], Collection<StoreFile>>> completionService =
          new ExecutorCompletionService<Pair<byte[], Collection<StoreFile>>>(storeCloserThreadPool);

        // Unless aborting or read-only, every store must be fully flushed at this point;
        // otherwise the region server is aborted (the store is still closed afterwards).
        for (final Store store : stores.values()) {
          long flushableSize = store.getFlushableSize();
          if (!(abort || flushableSize == 0 || writestate.readOnly)) {
            if (getRegionServerServices() != null) {
              getRegionServerServices().abort("Assertion failed while closing store "
                + getRegionInfo().getRegionNameAsString() + " " + store
                + ". flushableSize expected=0, actual= " + flushableSize
                + ". Current memstoreSize=" + getMemstoreSize() + ". Maybe a coprocessor "
                + "operation failed and left the memstore in a partially updated state.", null);
            }
          }
          completionService
              .submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
                @Override
                public Pair<byte[], Collection<StoreFile>> call() throws IOException {
                  return new Pair<byte[], Collection<StoreFile>>(
                    store.getFamily().getName(), store.close());
                }
              });
        }
        // Collect one result per submitted store, grouping the returned files by family.
        try {
          for (int i = 0; i < stores.size(); i++) {
            Future<Pair<byte[], Collection<StoreFile>>> future = completionService.take();
            Pair<byte[], Collection<StoreFile>> storeFiles = future.get();
            List<StoreFile> familyFiles = result.get(storeFiles.getFirst());
            if (familyFiles == null) {
              familyFiles = new ArrayList<StoreFile>();
              result.put(storeFiles.getFirst(), familyFiles);
            }
            familyFiles.addAll(storeFiles.getSecond());
          }
        } catch (InterruptedException e) {
          throw (InterruptedIOException)new InterruptedIOException().initCause(e);
        } catch (ExecutionException e) {
          throw new IOException(e.getCause());
        } finally {
          storeCloserThreadPool.shutdownNow();
        }
      }

      status.setStatus("Writing region close event to WAL");
      if (!abort && wal != null && getRegionServerServices() != null && !writestate.readOnly) {
        writeRegionCloseMarker(wal);
      }

      this.closed.set(true);
      // A read-only region was never flushed: remove its memstore size from the accounting.
      // Otherwise the memstore is expected to be empty by now.
      if (!canFlush) {
        addAndGetGlobalMemstoreSize(-memstoreSize.get());
      } else if (memstoreSize.get() != 0) {
        LOG.error("Memstore size is " + memstoreSize.get());
      }
      if (coprocessorHost != null) {
        status.setStatus("Running coprocessor post-close hooks");
        this.coprocessorHost.postClose(abort);
      }
      if (this.metricsRegion != null) {
        this.metricsRegion.close();
      }
      if (this.metricsRegionWrapper != null) {
        Closeables.closeQuietly(this.metricsRegionWrapper);
      }
      status.markComplete("Closed");
      LOG.info("Closed " + this);
      return result;
    } finally {
      lock.writeLock().unlock();
    }
  }
1533
1534 @Override
1535 public void waitForFlushesAndCompactions() {
1536 synchronized (writestate) {
1537 if (this.writestate.readOnly) {
1538
1539
1540 return;
1541 }
1542 boolean interrupted = false;
1543 try {
1544 while (writestate.compacting.get() > 0 || writestate.flushing) {
1545 LOG.debug("waiting for " + writestate.compacting + " compactions"
1546 + (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
1547 try {
1548 writestate.wait();
1549 } catch (InterruptedException iex) {
1550
1551 LOG.warn("Interrupted while waiting");
1552 interrupted = true;
1553 }
1554 }
1555 } finally {
1556 if (interrupted) {
1557 Thread.currentThread().interrupt();
1558 }
1559 }
1560 }
1561 }
1562
1563 protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
1564 final String threadNamePrefix) {
1565 int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1566 int maxThreads = Math.min(numStores,
1567 conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1568 HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
1569 return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1570 }
1571
1572 protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
1573 final String threadNamePrefix) {
1574 int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1575 int maxThreads = Math.max(1,
1576 conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1577 HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
1578 / numStores);
1579 return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1580 }
1581
1582 static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
1583 final String threadNamePrefix) {
1584 return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
1585 new ThreadFactory() {
1586 private int count = 1;
1587
1588 @Override
1589 public Thread newThread(Runnable r) {
1590 return new Thread(r, threadNamePrefix + "-" + count++);
1591 }
1592 });
1593 }
1594
1595
1596
1597
1598 private boolean worthPreFlushing() {
1599 return this.memstoreSize.get() >
1600 this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
1601 }
1602
1603
1604
1605
1606
1607 @Override
1608 public HTableDescriptor getTableDesc() {
1609 return this.htableDescriptor;
1610 }
1611
1612
1613 public WAL getWAL() {
1614 return this.wal;
1615 }
1616
1617
1618
1619
1620
1621
1622
1623
1624 Configuration getBaseConf() {
1625 return this.baseConf;
1626 }
1627
1628
1629 public FileSystem getFilesystem() {
1630 return fs.getFileSystem();
1631 }
1632
1633
1634 public HRegionFileSystem getRegionFileSystem() {
1635 return this.fs;
1636 }
1637
1638 @Override
1639 public long getEarliestFlushTimeForAllStores() {
1640 return lastStoreFlushTimeMap.isEmpty() ? Long.MAX_VALUE : Collections.min(lastStoreFlushTimeMap
1641 .values());
1642 }
1643
1644 @Override
1645 public long getOldestHfileTs(boolean majorCompactioOnly) throws IOException {
1646 long result = Long.MAX_VALUE;
1647 for (Store store : getStores()) {
1648 Collection<StoreFile> storeFiles = store.getStorefiles();
1649 if (storeFiles == null) continue;
1650 for (StoreFile file : storeFiles) {
1651 StoreFile.Reader sfReader = file.getReader();
1652 if (sfReader == null) continue;
1653 HFile.Reader reader = sfReader.getHFileReader();
1654 if (reader == null) continue;
1655 if (majorCompactioOnly) {
1656 byte[] val = reader.loadFileInfo().get(StoreFile.MAJOR_COMPACTION_KEY);
1657 if (val == null) continue;
1658 if (val == null || !Bytes.toBoolean(val)) {
1659 continue;
1660 }
1661 }
1662 result = Math.min(result, reader.getFileContext().getFileCreateTime());
1663 }
1664 }
1665 return result == Long.MAX_VALUE ? 0 : result;
1666 }
1667
1668 RegionLoad.Builder setCompleteSequenceId(RegionLoad.Builder regionLoadBldr) {
1669 long lastFlushOpSeqIdLocal = this.lastFlushOpSeqId;
1670 byte[] encodedRegionName = this.getRegionInfo().getEncodedNameAsBytes();
1671 regionLoadBldr.clearStoreCompleteSequenceId();
1672 for (byte[] familyName : this.stores.keySet()) {
1673 long earliest = this.wal.getEarliestMemstoreSeqNum(encodedRegionName, familyName);
1674
1675
1676
1677 long csid = (earliest == HConstants.NO_SEQNUM)? lastFlushOpSeqIdLocal: earliest - 1;
1678 regionLoadBldr.addStoreCompleteSequenceId(StoreSequenceId.
1679 newBuilder().setFamilyName(ByteString.copyFrom(familyName)).setSequenceId(csid).build());
1680 }
1681 return regionLoadBldr.setCompleteSequenceId(getMaxFlushedSeqId());
1682 }
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692 public long getLargestHStoreSize() {
1693 long size = 0;
1694 for (Store h : stores.values()) {
1695 long storeSize = h.getSize();
1696 if (storeSize > size) {
1697 size = storeSize;
1698 }
1699 }
1700 return size;
1701 }
1702
1703
1704
1705
1706 public KeyValue.KVComparator getComparator() {
1707 return this.comparator;
1708 }
1709
1710
1711
1712
1713
  /**
   * Hook invoked by {@code compact(CompactionContext, Store, CompactionThroughputController,
   * User)} just before a store is compacted. It does nothing by default; subclasses may
   * override it.
   *
   * @throws IOException if preparation fails in an overriding implementation
   */
  protected void doRegionCompactionPrep() throws IOException {
  }
1716
1717 @Override
1718 public void triggerMajorCompaction() throws IOException {
1719 for (Store s : getStores()) {
1720 s.triggerMajorCompaction();
1721 }
1722 }
1723
1724 @Override
1725 public void compact(final boolean majorCompaction) throws IOException {
1726 if (majorCompaction) {
1727 triggerMajorCompaction();
1728 }
1729 for (Store s : getStores()) {
1730 CompactionContext compaction = s.requestCompaction();
1731 if (compaction != null) {
1732 CompactionThroughputController controller = null;
1733 if (rsServices != null) {
1734 controller = CompactionThroughputControllerFactory.create(rsServices, conf);
1735 }
1736 if (controller == null) {
1737 controller = NoLimitCompactionThroughputController.INSTANCE;
1738 }
1739 compact(compaction, s, controller, null);
1740 }
1741 }
1742 }
1743
1744
1745
1746
1747
1748
1749
1750 public void compactStores() throws IOException {
1751 for (Store s : getStores()) {
1752 CompactionContext compaction = s.requestCompaction();
1753 if (compaction != null) {
1754 compact(compaction, s, NoLimitCompactionThroughputController.INSTANCE, null);
1755 }
1756 }
1757 }
1758
1759
1760
1761
1762
1763
1764
1765 @VisibleForTesting
1766 void compactStore(byte[] family, CompactionThroughputController throughputController)
1767 throws IOException {
1768 Store s = getStore(family);
1769 CompactionContext compaction = s.requestCompaction();
1770 if (compaction != null) {
1771 compact(compaction, s, throughputController, null);
1772 }
1773 }
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790 public boolean compact(CompactionContext compaction, Store store,
1791 CompactionThroughputController throughputController) throws IOException {
1792 return compact(compaction, store, throughputController, null);
1793 }
1794
  /**
   * Runs an already-selected compaction on {@code store}.
   *
   * <p>Holds the region read lock for the duration, which excludes doClose() because it takes the
   * write lock. While compacting, {@code writestate.compacting} is incremented and then
   * decremented, with waiters notified. If this method returns before handing the request to
   * {@code store.compact}, the request is cancelled.
   *
   * @param compaction the selected compaction; must have a non-empty selection
   * @param store the store to compact
   * @param throughputController controller limiting compaction throughput
   * @param user user on whose behalf the compaction runs (may be null)
   * @return true if the compaction completed; false if it was skipped (closing/closed, store
   *     re-instantiated, writes disabled) or interrupted
   * @throws IOException if the compaction fails
   */
  public boolean compact(CompactionContext compaction, Store store,
      CompactionThroughputController throughputController, User user) throws IOException {
    assert compaction != null && compaction.hasSelection();
    assert !compaction.getRequest().getFiles().isEmpty();
    if (this.closing.get() || this.closed.get()) {
      LOG.debug("Skipping compaction on " + this + " because closing/closed");
      store.cancelRequestedCompaction(compaction);
      return false;
    }
    MonitoredTask status = null;
    boolean requestNeedsCancellation = true;
    // The read lock keeps doClose() (which takes the write lock) from running concurrently.
    lock.readLock().lock();
    try {
      byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
      // If the store instance was replaced, drop the request; the finally block cancels it.
      if (stores.get(cf) != store) {
        LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this
            + " has been re-instantiated, cancel this compaction request. "
            + " It may be caused by the roll back of split transaction");
        return false;
      }

      status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
      if (this.closed.get()) {
        String msg = "Skipping compaction on " + this + " because closed";
        LOG.debug(msg);
        status.abort(msg);
        return false;
      }
      boolean wasStateSet = false;
      try {
        // Register this compaction only if writes are still enabled.
        synchronized (writestate) {
          if (writestate.writesEnabled) {
            wasStateSet = true;
            writestate.compacting.incrementAndGet();
          } else {
            String msg = "NOT compacting region " + this + ". Writes disabled.";
            LOG.info(msg);
            status.abort(msg);
            return false;
          }
        }
        LOG.info("Starting compaction on " + store + " in region " + this
            + (compaction.getRequest().isOffPeak()?" as an off-peak compaction":""));
        doRegionCompactionPrep();
        try {
          status.setStatus("Compacting store " + store);
          // From here on the finally block no longer cancels the request; presumably
          // store.compact takes responsibility for it (TODO confirm).
          requestNeedsCancellation = false;
          store.compact(compaction, throughputController, user);
        } catch (InterruptedIOException iioe) {
          String msg = "compaction interrupted";
          LOG.info(msg, iioe);
          status.abort(msg);
          return false;
        }
      } finally {
        // Unregister and wake anyone in waitForFlushesAndCompactions().
        if (wasStateSet) {
          synchronized (writestate) {
            writestate.compacting.decrementAndGet();
            if (writestate.compacting.get() <= 0) {
              writestate.notifyAll();
            }
          }
        }
      }
      status.markComplete("Compaction complete");
      return true;
    } finally {
      try {
        if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction);
        if (status != null) status.cleanup();
      } finally {
        lock.readLock().unlock();
      }
    }
  }
1873
1874 @Override
1875 public FlushResult flush(boolean force) throws IOException {
1876 return flushcache(force, false);
1877 }
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
  /**
   * Flushes the memstore of this region.
   *
   * <p>Holds the region read lock while flushing, which excludes doClose() because it takes the
   * write lock. Only one flush runs at a time; this is enforced through
   * {@code writestate.flushing}. The flush is refused when writes are disabled.
   *
   * @param forceFlushAllStores flush every store instead of the stores chosen by the flush policy
   * @param writeFlushRequestWalMarker passed through to internalFlushcache; it presumably
   *     controls writing a flush-request marker to the WAL when there is nothing to flush
   *     (TODO confirm)
   * @return the flush result; {@code CANNOT_FLUSH} if the region is closing/closed, a flush is
   *     already running, or writes are disabled
   * @throws IOException if the flush fails
   */
  public FlushResult flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker)
      throws IOException {
    // Cheap early exit, checked before taking any lock.
    if (this.closing.get()) {
      String msg = "Skipping flush on " + this + " because closing";
      LOG.debug(msg);
      return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
    }
    MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
    status.setStatus("Acquiring readlock on region");
    // Keeps doClose() (which takes the write lock) from running concurrently.
    lock.readLock().lock();
    try {
      if (this.closed.get()) {
        String msg = "Skipping flush on " + this + " because closed";
        LOG.debug(msg);
        status.abort(msg);
        return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
      }
      if (coprocessorHost != null) {
        status.setStatus("Running coprocessor pre-flush hooks");
        coprocessorHost.preFlush();
      }
      // Reset the "written without WAL" counters; presumably because the flush persists that
      // data (TODO confirm).
      if (numMutationsWithoutWAL.get() > 0) {
        numMutationsWithoutWAL.set(0);
        dataInMemoryWithoutWAL.set(0);
      }
      // Claim the single flush slot, provided writes are still enabled.
      synchronized (writestate) {
        if (!writestate.flushing && writestate.writesEnabled) {
          this.writestate.flushing = true;
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("NOT flushing memstore for region " + this
                + ", flushing=" + writestate.flushing + ", writesEnabled="
                + writestate.writesEnabled);
          }
          String msg = "Not flushing since "
              + (writestate.flushing ? "already flushing"
              : "writes not enabled");
          status.abort(msg);
          return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
        }
      }

      try {
        Collection<Store> specificStoresToFlush =
            forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush();
        FlushResult fs = internalFlushcache(specificStoresToFlush,
          status, writeFlushRequestWalMarker);

        if (coprocessorHost != null) {
          status.setStatus("Running post-flush coprocessor hooks");
          coprocessorHost.postFlush();
        }

        status.markComplete("Flush successful");
        return fs;
      } finally {
        // Release the flush slot and wake anyone in waitForFlushesAndCompactions().
        synchronized (writestate) {
          writestate.flushing = false;
          this.writestate.flushRequested = false;
          writestate.notifyAll();
        }
      }
    } finally {
      lock.readLock().unlock();
      status.cleanup();
    }
  }
1972
1973
1974
1975
1976
1977
1978
1979
1980 boolean shouldFlushStore(Store store) {
1981 long earliest = this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(),
1982 store.getFamily().getName()) - 1;
1983 if (earliest > 0 && earliest + flushPerChanges < mvcc.getReadPoint()) {
1984 if (LOG.isDebugEnabled()) {
1985 LOG.debug("Flush column family " + store.getColumnFamilyName() + " of " +
1986 getRegionInfo().getEncodedName() + " because unflushed sequenceid=" + earliest +
1987 " is > " + this.flushPerChanges + " from current=" + mvcc.getReadPoint());
1988 }
1989 return true;
1990 }
1991 if (this.flushCheckInterval <= 0) {
1992 return false;
1993 }
1994 long now = EnvironmentEdgeManager.currentTime();
1995 if (store.timeOfOldestEdit() < now - this.flushCheckInterval) {
1996 if (LOG.isDebugEnabled()) {
1997 LOG.debug("Flush column family: " + store.getColumnFamilyName() + " of " +
1998 getRegionInfo().getEncodedName() + " because time of oldest edit=" +
1999 store.timeOfOldestEdit() + " is > " + this.flushCheckInterval + " from now =" + now);
2000 }
2001 return true;
2002 }
2003 return false;
2004 }
2005
2006
2007
2008
2009 boolean shouldFlush(final StringBuffer whyFlush) {
2010 whyFlush.setLength(0);
2011
2012 if (this.maxFlushedSeqId > 0
2013 && (this.maxFlushedSeqId + this.flushPerChanges < this.mvcc.getReadPoint())) {
2014 whyFlush.append("more than max edits, " + this.flushPerChanges + ", since last flush");
2015 return true;
2016 }
2017 long modifiedFlushCheckInterval = flushCheckInterval;
2018 if (getRegionInfo().isSystemTable() &&
2019 getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
2020 modifiedFlushCheckInterval = SYSTEM_CACHE_FLUSH_INTERVAL;
2021 }
2022 if (modifiedFlushCheckInterval <= 0) {
2023 return false;
2024 }
2025 long now = EnvironmentEdgeManager.currentTime();
2026
2027 if ((now - getEarliestFlushTimeForAllStores() < modifiedFlushCheckInterval)) {
2028 return false;
2029 }
2030
2031
2032 for (Store s : getStores()) {
2033 if (s.timeOfOldestEdit() < now - modifiedFlushCheckInterval) {
2034
2035 whyFlush.append(s.toString() + " has an old edit so flush to free WALs");
2036 return true;
2037 }
2038 }
2039 return false;
2040 }
2041
2042
2043
2044
2045
2046
2047 private FlushResult internalFlushcache(MonitoredTask status)
2048 throws IOException {
2049 return internalFlushcache(stores.values(), status, false);
2050 }
2051
2052
2053
2054
2055
2056
2057 private FlushResult internalFlushcache(final Collection<Store> storesToFlush,
2058 MonitoredTask status, boolean writeFlushWalMarker) throws IOException {
2059 return internalFlushcache(this.wal, HConstants.NO_SEQNUM, storesToFlush,
2060 status, writeFlushWalMarker);
2061 }
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091 protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
2092 final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
2093 throws IOException {
2094 PrepareFlushResult result
2095 = internalPrepareFlushCache(wal, myseqid, storesToFlush, status, writeFlushWalMarker);
2096 if (result.result == null) {
2097 return internalFlushCacheAndCommit(wal, status, result, storesToFlush);
2098 } else {
2099 return result.result;
2100 }
2101 }
2102
2103 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DLS_DEAD_LOCAL_STORE",
2104 justification="FindBugs seems confused about trxId")
2105 protected PrepareFlushResult internalPrepareFlushCache(final WAL wal, final long myseqid,
2106 final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
2107 throws IOException {
2108 if (this.rsServices != null && this.rsServices.isAborted()) {
2109
2110 throw new IOException("Aborting flush because server is aborted...");
2111 }
2112 final long startTime = EnvironmentEdgeManager.currentTime();
2113
2114 if (this.memstoreSize.get() <= 0) {
2115
2116
2117 MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
2118 this.updatesLock.writeLock().lock();
2119 try {
2120 if (this.memstoreSize.get() <= 0) {
2121
2122
2123
2124
2125
2126
2127 if (wal != null) {
2128 writeEntry = mvcc.begin();
2129 long flushOpSeqId = writeEntry.getWriteNumber();
2130 FlushResult flushResult = new FlushResultImpl(
2131 FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
2132 flushOpSeqId,
2133 "Nothing to flush",
2134 writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker));
2135
2136
2137 mvcc.completeAndWait(writeEntry);
2138 writeEntry = null;
2139 return new PrepareFlushResult(flushResult, myseqid);
2140 } else {
2141 return new PrepareFlushResult(
2142 new FlushResultImpl(
2143 FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
2144 "Nothing to flush",
2145 false),
2146 myseqid);
2147 }
2148 }
2149 } finally {
2150 this.updatesLock.writeLock().unlock();
2151 if (writeEntry != null) {
2152 mvcc.complete(writeEntry);
2153 }
2154 }
2155 }
2156
2157 if (LOG.isInfoEnabled()) {
2158
2159 StringBuilder perCfExtras = null;
2160 if (!isAllFamilies(storesToFlush)) {
2161 perCfExtras = new StringBuilder();
2162 for (Store store: storesToFlush) {
2163 perCfExtras.append("; ").append(store.getColumnFamilyName());
2164 perCfExtras.append("=").append(StringUtils.byteDesc(store.getMemStoreSize()));
2165 }
2166 }
2167 LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() +
2168 " column families, memstore=" + StringUtils.byteDesc(this.memstoreSize.get()) +
2169 ((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") +
2170 ((wal != null) ? "" : "; WAL is null, using passed sequenceid=" + myseqid));
2171 }
2172
2173
2174
2175
2176
2177
2178
2179 status.setStatus("Obtaining lock to block concurrent updates");
2180
2181 this.updatesLock.writeLock().lock();
2182 status.setStatus("Preparing to flush by snapshotting stores in " +
2183 getRegionInfo().getEncodedName());
2184 long totalFlushableSizeOfFlushableStores = 0;
2185
2186 Set<byte[]> flushedFamilyNames = new HashSet<byte[]>();
2187 for (Store store: storesToFlush) {
2188 flushedFamilyNames.add(store.getFamily().getName());
2189 }
2190
2191 TreeMap<byte[], StoreFlushContext> storeFlushCtxs
2192 = new TreeMap<byte[], StoreFlushContext>(Bytes.BYTES_COMPARATOR);
2193 TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
2194 Bytes.BYTES_COMPARATOR);
2195 TreeMap<byte[], Long> storeFlushableSize
2196 = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
2197
2198
2199
2200 long flushOpSeqId = HConstants.NO_SEQNUM;
2201
2202
2203 long flushedSeqId = HConstants.NO_SEQNUM;
2204 byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes();
2205
2206 long trxId = 0;
2207 MultiVersionConcurrencyControl.WriteEntry writeEntry = mvcc.begin();
2208 try {
2209 try {
2210 if (wal != null) {
2211 Long earliestUnflushedSequenceIdForTheRegion =
2212 wal.startCacheFlush(encodedRegionName, flushedFamilyNames);
2213 if (earliestUnflushedSequenceIdForTheRegion == null) {
2214
2215 String msg = this.getRegionInfo().getEncodedName() + " flush aborted; WAL closing.";
2216 status.setStatus(msg);
2217 return new PrepareFlushResult(
2218 new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false),
2219 myseqid);
2220 }
2221 flushOpSeqId = getNextSequenceId(wal);
2222
2223 flushedSeqId =
2224 earliestUnflushedSequenceIdForTheRegion.longValue() == HConstants.NO_SEQNUM?
2225 flushOpSeqId: earliestUnflushedSequenceIdForTheRegion.longValue() - 1;
2226 } else {
2227
2228 flushedSeqId = flushOpSeqId = myseqid;
2229 }
2230
2231 for (Store s : storesToFlush) {
2232 totalFlushableSizeOfFlushableStores += s.getFlushableSize();
2233 storeFlushCtxs.put(s.getFamily().getName(), s.createFlushContext(flushOpSeqId));
2234 committedFiles.put(s.getFamily().getName(), null);
2235 storeFlushableSize.put(s.getFamily().getName(), s.getFlushableSize());
2236 }
2237
2238
2239 if (wal != null && !writestate.readOnly) {
2240 FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
2241 getRegionInfo(), flushOpSeqId, committedFiles);
2242
2243 trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2244 desc, false, mvcc);
2245 }
2246
2247
2248 for (StoreFlushContext flush : storeFlushCtxs.values()) {
2249 flush.prepare();
2250 }
2251 } catch (IOException ex) {
2252 if (wal != null) {
2253 if (trxId > 0) {
2254 try {
2255 FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
2256 getRegionInfo(), flushOpSeqId, committedFiles);
2257 WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2258 desc, false, mvcc);
2259 } catch (Throwable t) {
2260 LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
2261 StringUtils.stringifyException(t));
2262
2263 }
2264 }
2265
2266 wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
2267 throw ex;
2268 }
2269 } finally {
2270 this.updatesLock.writeLock().unlock();
2271 }
2272 String s = "Finished memstore snapshotting " + this +
2273 ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSizeOfFlushableStores;
2274 status.setStatus(s);
2275 if (LOG.isTraceEnabled()) LOG.trace(s);
2276
2277
2278 if (wal != null) {
2279 try {
2280 wal.sync();
2281 } catch (IOException ioe) {
2282 wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
2283 throw ioe;
2284 }
2285 }
2286
2287
2288
2289
2290
2291
2292 mvcc.completeAndWait(writeEntry);
2293
2294
2295 writeEntry = null;
2296 } finally {
2297 if (writeEntry != null) {
2298
2299 mvcc.complete(writeEntry);
2300 }
2301 }
2302 return new PrepareFlushResult(storeFlushCtxs, committedFiles, storeFlushableSize, startTime,
2303 flushOpSeqId, flushedSeqId, totalFlushableSizeOfFlushableStores);
2304 }
2305
2306
2307
2308
2309
2310 private boolean isAllFamilies(final Collection<Store> families) {
2311 return families == null || this.stores.size() == families.size();
2312 }
2313
2314
2315
2316
2317
2318
2319
2320 private boolean writeFlushRequestMarkerToWAL(WAL wal, boolean writeFlushWalMarker) {
2321 if (writeFlushWalMarker && wal != null && !writestate.readOnly) {
2322 FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH,
2323 getRegionInfo(), -1, new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR));
2324 try {
2325 WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2326 desc, true, mvcc);
2327 return true;
2328 } catch (IOException e) {
2329 LOG.warn(getRegionInfo().getEncodedName() + " : "
2330 + "Received exception while trying to write the flush request to wal", e);
2331 }
2332 }
2333 return false;
2334 }
2335
2336 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
2337 justification="Intentional; notify is about completed flush")
2338 protected FlushResult internalFlushCacheAndCommit(
2339 final WAL wal, MonitoredTask status, final PrepareFlushResult prepareResult,
2340 final Collection<Store> storesToFlush)
2341 throws IOException {
2342
2343
2344 TreeMap<byte[], StoreFlushContext> storeFlushCtxs = prepareResult.storeFlushCtxs;
2345 TreeMap<byte[], List<Path>> committedFiles = prepareResult.committedFiles;
2346 long startTime = prepareResult.startTime;
2347 long flushOpSeqId = prepareResult.flushOpSeqId;
2348 long flushedSeqId = prepareResult.flushedSeqId;
2349 long totalFlushableSizeOfFlushableStores = prepareResult.totalFlushableSize;
2350
2351 String s = "Flushing stores of " + this;
2352 status.setStatus(s);
2353 if (LOG.isTraceEnabled()) LOG.trace(s);
2354
2355
2356
2357
2358
2359 boolean compactionRequested = false;
2360 try {
2361
2362
2363
2364
2365
2366 for (StoreFlushContext flush : storeFlushCtxs.values()) {
2367 flush.flushCache(status);
2368 }
2369
2370
2371
2372 Iterator<Store> it = storesToFlush.iterator();
2373
2374 for (StoreFlushContext flush : storeFlushCtxs.values()) {
2375 boolean needsCompaction = flush.commit(status);
2376 if (needsCompaction) {
2377 compactionRequested = true;
2378 }
2379 byte[] storeName = it.next().getFamily().getName();
2380 List<Path> storeCommittedFiles = flush.getCommittedFiles();
2381 committedFiles.put(storeName, storeCommittedFiles);
2382
2383 if (storeCommittedFiles == null || storeCommittedFiles.isEmpty()) {
2384 totalFlushableSizeOfFlushableStores -= prepareResult.storeFlushableSize.get(storeName);
2385 }
2386 }
2387 storeFlushCtxs.clear();
2388
2389
2390 this.addAndGetGlobalMemstoreSize(-totalFlushableSizeOfFlushableStores);
2391
2392 if (wal != null) {
2393
2394 FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
2395 getRegionInfo(), flushOpSeqId, committedFiles);
2396 WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2397 desc, true, mvcc);
2398 }
2399 } catch (Throwable t) {
2400
2401
2402
2403
2404
2405
2406 if (wal != null) {
2407 try {
2408 FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
2409 getRegionInfo(), flushOpSeqId, committedFiles);
2410 WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2411 desc, false, mvcc);
2412 } catch (Throwable ex) {
2413 LOG.warn(getRegionInfo().getEncodedName() + " : "
2414 + "failed writing ABORT_FLUSH marker to WAL", ex);
2415
2416 }
2417 wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
2418 }
2419 DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
2420 Bytes.toStringBinary(getRegionInfo().getRegionName()));
2421 dse.initCause(t);
2422 status.abort("Flush failed: " + StringUtils.stringifyException(t));
2423
2424
2425
2426
2427
2428 this.closing.set(true);
2429
2430 if (rsServices != null) {
2431
2432 rsServices.abort("Replay of WAL required. Forcing server shutdown", dse);
2433 }
2434
2435 throw dse;
2436 }
2437
2438
2439 if (wal != null) {
2440 wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
2441 }
2442
2443
2444 for (Store store: storesToFlush) {
2445 this.lastStoreFlushTimeMap.put(store, startTime);
2446 }
2447
2448 this.maxFlushedSeqId = flushedSeqId;
2449 this.lastFlushOpSeqId = flushOpSeqId;
2450
2451
2452
2453 synchronized (this) {
2454 notifyAll();
2455 }
2456
2457 long time = EnvironmentEdgeManager.currentTime() - startTime;
2458 long memstoresize = this.memstoreSize.get();
2459 String msg = "Finished memstore flush of ~"
2460 + StringUtils.byteDesc(totalFlushableSizeOfFlushableStores) + "/"
2461 + totalFlushableSizeOfFlushableStores + ", currentsize="
2462 + StringUtils.byteDesc(memstoresize) + "/" + memstoresize
2463 + " for region " + this + " in " + time + "ms, sequenceid="
2464 + flushOpSeqId + ", compaction requested=" + compactionRequested
2465 + ((wal == null) ? "; wal=null" : "");
2466 LOG.info(msg);
2467 status.setStatus(msg);
2468
2469 return new FlushResultImpl(compactionRequested ?
2470 FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
2471 FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED,
2472 flushOpSeqId);
2473 }
2474
2475
2476
2477
2478
2479
2480 @VisibleForTesting
2481 protected long getNextSequenceId(final WAL wal) throws IOException {
2482
2483
2484
2485
2486
2487
2488 WALKey key = this.appendEmptyEdit(wal);
2489 mvcc.complete(key.getWriteEntry());
2490 return key.getSequenceId(this.maxWaitForSeqId);
2491 }
2492
2493
2494
2495
2496
2497 @Override
2498 public Result getClosestRowBefore(final byte [] row, final byte [] family) throws IOException {
2499 if (coprocessorHost != null) {
2500 Result result = new Result();
2501 if (coprocessorHost.preGetClosestRowBefore(row, family, result)) {
2502 return result;
2503 }
2504 }
2505
2506
2507 checkRow(row, "getClosestRowBefore");
2508 startRegionOperation(Operation.GET);
2509 this.readRequestsCount.increment();
2510 try {
2511 Store store = getStore(family);
2512
2513 Cell key = store.getRowKeyAtOrBefore(row);
2514 Result result = null;
2515 if (key != null) {
2516 Get get = new Get(CellUtil.cloneRow(key));
2517 get.addFamily(family);
2518 result = get(get);
2519 }
2520 if (coprocessorHost != null) {
2521 coprocessorHost.postGetClosestRowBefore(row, family, result);
2522 }
2523 return result;
2524 } finally {
2525 closeRegionOperation(Operation.GET);
2526 }
2527 }
2528
2529 @Override
2530 public RegionScanner getScanner(Scan scan) throws IOException {
2531 return getScanner(scan, null);
2532 }
2533
2534 @Override
2535 public RegionScanner getScanner(Scan scan,
2536 List<KeyValueScanner> additionalScanners) throws IOException {
2537 startRegionOperation(Operation.SCAN);
2538 try {
2539
2540 if (!scan.hasFamilies()) {
2541
2542 for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
2543 scan.addFamily(family);
2544 }
2545 } else {
2546 for (byte [] family : scan.getFamilyMap().keySet()) {
2547 checkFamily(family);
2548 }
2549 }
2550 return instantiateRegionScanner(scan, additionalScanners);
2551 } finally {
2552 closeRegionOperation(Operation.SCAN);
2553 }
2554 }
2555
2556 protected RegionScanner instantiateRegionScanner(Scan scan,
2557 List<KeyValueScanner> additionalScanners) throws IOException {
2558 if (scan.isReversed()) {
2559 if (scan.getFilter() != null) {
2560 scan.getFilter().setReversed(true);
2561 }
2562 return new ReversedRegionScannerImpl(scan, additionalScanners, this);
2563 }
2564 return new RegionScannerImpl(scan, additionalScanners, this);
2565 }
2566
2567 @Override
2568 public void prepareDelete(Delete delete) throws IOException {
2569
2570 if(delete.getFamilyCellMap().isEmpty()){
2571 for(byte [] family : this.htableDescriptor.getFamiliesKeys()){
2572
2573 delete.addFamily(family, delete.getTimeStamp());
2574 }
2575 } else {
2576 for(byte [] family : delete.getFamilyCellMap().keySet()) {
2577 if(family == null) {
2578 throw new NoSuchColumnFamilyException("Empty family is invalid");
2579 }
2580 checkFamily(family);
2581 }
2582 }
2583 }
2584
2585 @Override
2586 public void delete(Delete delete) throws IOException {
2587 checkReadOnly();
2588 checkResources();
2589 startRegionOperation(Operation.DELETE);
2590 try {
2591 delete.getRow();
2592
2593 doBatchMutate(delete);
2594 } finally {
2595 closeRegionOperation(Operation.DELETE);
2596 }
2597 }
2598
2599
2600
2601
2602 private static final byte [] FOR_UNIT_TESTS_ONLY = Bytes.toBytes("ForUnitTestsOnly");
2603
2604
2605
2606
2607
2608
2609 void delete(NavigableMap<byte[], List<Cell>> familyMap,
2610 Durability durability) throws IOException {
2611 Delete delete = new Delete(FOR_UNIT_TESTS_ONLY);
2612 delete.setFamilyCellMap(familyMap);
2613 delete.setDurability(durability);
2614 doBatchMutate(delete);
2615 }
2616
2617 @Override
2618 public void prepareDeleteTimestamps(Mutation mutation, Map<byte[], List<Cell>> familyMap,
2619 byte[] byteNow) throws IOException {
2620 for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
2621
2622 byte[] family = e.getKey();
2623 List<Cell> cells = e.getValue();
2624 assert cells instanceof RandomAccess;
2625
2626 Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
2627 int listSize = cells.size();
2628 for (int i=0; i < listSize; i++) {
2629 Cell cell = cells.get(i);
2630
2631
2632 if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP && CellUtil.isDeleteType(cell)) {
2633 byte[] qual = CellUtil.cloneQualifier(cell);
2634 if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
2635
2636 Integer count = kvCount.get(qual);
2637 if (count == null) {
2638 kvCount.put(qual, 1);
2639 } else {
2640 kvCount.put(qual, count + 1);
2641 }
2642 count = kvCount.get(qual);
2643
2644 Get get = new Get(CellUtil.cloneRow(cell));
2645 get.setMaxVersions(count);
2646 get.addColumn(family, qual);
2647 if (coprocessorHost != null) {
2648 if (!coprocessorHost.prePrepareTimeStampForDeleteVersion(mutation, cell,
2649 byteNow, get)) {
2650 updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow);
2651 }
2652 } else {
2653 updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow);
2654 }
2655 } else {
2656 CellUtil.updateLatestStamp(cell, byteNow, 0);
2657 }
2658 }
2659 }
2660 }
2661
2662 void updateDeleteLatestVersionTimeStamp(Cell cell, Get get, int count, byte[] byteNow)
2663 throws IOException {
2664 List<Cell> result = get(get, false);
2665
2666 if (result.size() < count) {
2667
2668 CellUtil.updateLatestStamp(cell, byteNow, 0);
2669 return;
2670 }
2671 if (result.size() > count) {
2672 throw new RuntimeException("Unexpected size: " + result.size());
2673 }
2674 Cell getCell = result.get(count - 1);
2675 CellUtil.setTimestamp(cell, getCell.getTimestamp());
2676 }
2677
2678 @Override
2679 public void put(Put put) throws IOException {
2680 checkReadOnly();
2681
2682
2683
2684
2685
2686 checkResources();
2687 startRegionOperation(Operation.PUT);
2688 try {
2689
2690 doBatchMutate(put);
2691 } finally {
2692 closeRegionOperation(Operation.PUT);
2693 }
2694 }
2695
2696
2697
2698
2699
2700
2701 private abstract static class BatchOperationInProgress<T> {
2702 T[] operations;
2703 int nextIndexToProcess = 0;
2704 OperationStatus[] retCodeDetails;
2705 WALEdit[] walEditsFromCoprocessors;
2706
2707 public BatchOperationInProgress(T[] operations) {
2708 this.operations = operations;
2709 this.retCodeDetails = new OperationStatus[operations.length];
2710 this.walEditsFromCoprocessors = new WALEdit[operations.length];
2711 Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
2712 }
2713
2714 public abstract Mutation getMutation(int index);
2715 public abstract long getNonceGroup(int index);
2716 public abstract long getNonce(int index);
2717
2718 public abstract Mutation[] getMutationsForCoprocs();
2719 public abstract boolean isInReplay();
2720 public abstract long getReplaySequenceId();
2721
2722 public boolean isDone() {
2723 return nextIndexToProcess == operations.length;
2724 }
2725 }
2726
2727 private static class MutationBatch extends BatchOperationInProgress<Mutation> {
2728 private long nonceGroup;
2729 private long nonce;
2730 public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) {
2731 super(operations);
2732 this.nonceGroup = nonceGroup;
2733 this.nonce = nonce;
2734 }
2735
2736 @Override
2737 public Mutation getMutation(int index) {
2738 return this.operations[index];
2739 }
2740
2741 @Override
2742 public long getNonceGroup(int index) {
2743 return nonceGroup;
2744 }
2745
2746 @Override
2747 public long getNonce(int index) {
2748 return nonce;
2749 }
2750
2751 @Override
2752 public Mutation[] getMutationsForCoprocs() {
2753 return this.operations;
2754 }
2755
2756 @Override
2757 public boolean isInReplay() {
2758 return false;
2759 }
2760
2761 @Override
2762 public long getReplaySequenceId() {
2763 return 0;
2764 }
2765 }
2766
2767 private static class ReplayBatch extends BatchOperationInProgress<MutationReplay> {
2768 private long replaySeqId = 0;
2769 public ReplayBatch(MutationReplay[] operations, long seqId) {
2770 super(operations);
2771 this.replaySeqId = seqId;
2772 }
2773
2774 @Override
2775 public Mutation getMutation(int index) {
2776 return this.operations[index].mutation;
2777 }
2778
2779 @Override
2780 public long getNonceGroup(int index) {
2781 return this.operations[index].nonceGroup;
2782 }
2783
2784 @Override
2785 public long getNonce(int index) {
2786 return this.operations[index].nonce;
2787 }
2788
2789 @Override
2790 public Mutation[] getMutationsForCoprocs() {
2791 assert false;
2792 throw new RuntimeException("Should not be called for replay batch");
2793 }
2794
2795 @Override
2796 public boolean isInReplay() {
2797 return true;
2798 }
2799
2800 @Override
2801 public long getReplaySequenceId() {
2802 return this.replaySeqId;
2803 }
2804 }
2805
2806 @Override
2807 public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce)
2808 throws IOException {
2809
2810
2811
2812
2813 return batchMutate(new MutationBatch(mutations, nonceGroup, nonce));
2814 }
2815
2816 public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
2817 return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
2818 }
2819
2820 @Override
2821 public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId)
2822 throws IOException {
2823 if (!RegionReplicaUtil.isDefaultReplica(getRegionInfo())
2824 && replaySeqId < lastReplayedOpenRegionSeqId) {
2825
2826
2827 if (LOG.isTraceEnabled()) {
2828 LOG.trace(getRegionInfo().getEncodedName() + " : "
2829 + "Skipping " + mutations.length + " mutations with replaySeqId=" + replaySeqId
2830 + " which is < than lastReplayedOpenRegionSeqId=" + lastReplayedOpenRegionSeqId);
2831 for (MutationReplay mut : mutations) {
2832 LOG.trace(getRegionInfo().getEncodedName() + " : Skipping : " + mut.mutation);
2833 }
2834 }
2835
2836 OperationStatus[] statuses = new OperationStatus[mutations.length];
2837 for (int i = 0; i < statuses.length; i++) {
2838 statuses[i] = OperationStatus.SUCCESS;
2839 }
2840 return statuses;
2841 }
2842 return batchMutate(new ReplayBatch(mutations, replaySeqId));
2843 }
2844
2845
2846
2847
2848
2849
2850
2851
2852
2853 OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
2854 boolean initialized = false;
2855 Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
2856 startRegionOperation(op);
2857 try {
2858 while (!batchOp.isDone()) {
2859 if (!batchOp.isInReplay()) {
2860 checkReadOnly();
2861 }
2862 checkResources();
2863
2864 if (!initialized) {
2865 this.writeRequestsCount.add(batchOp.operations.length);
2866 if (!batchOp.isInReplay()) {
2867 doPreMutationHook(batchOp);
2868 }
2869 initialized = true;
2870 }
2871 long addedSize = doMiniBatchMutation(batchOp);
2872 long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
2873 if (isFlushSize(newSize)) {
2874 requestFlush();
2875 }
2876 }
2877 } finally {
2878 closeRegionOperation(op);
2879 }
2880 return batchOp.retCodeDetails;
2881 }
2882
2883
2884 private void doPreMutationHook(BatchOperationInProgress<?> batchOp)
2885 throws IOException {
2886
2887 WALEdit walEdit = new WALEdit();
2888 if (coprocessorHost != null) {
2889 for (int i = 0 ; i < batchOp.operations.length; i++) {
2890 Mutation m = batchOp.getMutation(i);
2891 if (m instanceof Put) {
2892 if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
2893
2894
2895 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2896 }
2897 } else if (m instanceof Delete) {
2898 Delete curDel = (Delete) m;
2899 if (curDel.getFamilyCellMap().isEmpty()) {
2900
2901 prepareDelete(curDel);
2902 }
2903 if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {
2904
2905
2906 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2907 }
2908 } else {
2909
2910
2911
2912 batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
2913 "Put/Delete mutations only supported in batchMutate() now");
2914 }
2915 if (!walEdit.isEmpty()) {
2916 batchOp.walEditsFromCoprocessors[i] = walEdit;
2917 walEdit = new WALEdit();
2918 }
2919 }
2920 }
2921 }
2922
2923 @SuppressWarnings("unchecked")
2924 private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp) throws IOException {
2925 boolean isInReplay = batchOp.isInReplay();
2926
2927 boolean putsCfSetConsistent = true;
2928
2929 Set<byte[]> putsCfSet = null;
2930
2931 boolean deletesCfSetConsistent = true;
2932
2933 Set<byte[]> deletesCfSet = null;
2934
2935 long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
2936 WALEdit walEdit = new WALEdit(isInReplay);
2937 MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
2938 long txid = 0;
2939 boolean doRollBackMemstore = false;
2940 boolean locked = false;
2941
2942
2943 List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
2944
2945 Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
2946
2947 int firstIndex = batchOp.nextIndexToProcess;
2948 int lastIndexExclusive = firstIndex;
2949 boolean success = false;
2950 int noOfPuts = 0, noOfDeletes = 0;
2951 WALKey walKey = null;
2952 long mvccNum = 0;
2953 try {
2954
2955
2956
2957
2958 int numReadyToWrite = 0;
2959 long now = EnvironmentEdgeManager.currentTime();
2960 while (lastIndexExclusive < batchOp.operations.length) {
2961 Mutation mutation = batchOp.getMutation(lastIndexExclusive);
2962 boolean isPutMutation = mutation instanceof Put;
2963
2964 Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
2965
2966 familyMaps[lastIndexExclusive] = familyMap;
2967
2968
2969 if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
2970 != OperationStatusCode.NOT_RUN) {
2971 lastIndexExclusive++;
2972 continue;
2973 }
2974
2975 try {
2976 if (isPutMutation) {
2977
2978 if (isInReplay) {
2979 removeNonExistentColumnFamilyForReplay(familyMap);
2980 } else {
2981 checkFamilies(familyMap.keySet());
2982 }
2983 checkTimestamps(mutation.getFamilyCellMap(), now);
2984 } else {
2985 prepareDelete((Delete) mutation);
2986 }
2987 checkRow(mutation.getRow(), "doMiniBatchMutation");
2988 } catch (NoSuchColumnFamilyException nscf) {
2989 LOG.warn("No such column family in batch mutation", nscf);
2990 batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2991 OperationStatusCode.BAD_FAMILY, nscf.getMessage());
2992 lastIndexExclusive++;
2993 continue;
2994 } catch (FailedSanityCheckException fsce) {
2995 LOG.warn("Batch Mutation did not pass sanity check", fsce);
2996 batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2997 OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
2998 lastIndexExclusive++;
2999 continue;
3000 } catch (WrongRegionException we) {
3001 LOG.warn("Batch mutation had a row that does not belong to this region", we);
3002 batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
3003 OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());
3004 lastIndexExclusive++;
3005 continue;
3006 }
3007
3008
3009
3010 RowLock rowLock = null;
3011 try {
3012 rowLock = getRowLock(mutation.getRow(), true);
3013 } catch (IOException ioe) {
3014 LOG.warn("Failed getting lock in batch put, row="
3015 + Bytes.toStringBinary(mutation.getRow()), ioe);
3016 }
3017 if (rowLock == null) {
3018
3019 break;
3020 } else {
3021 acquiredRowLocks.add(rowLock);
3022 }
3023
3024 lastIndexExclusive++;
3025 numReadyToWrite++;
3026
3027 if (isPutMutation) {
3028
3029
3030
3031 if (putsCfSet == null) {
3032 putsCfSet = mutation.getFamilyCellMap().keySet();
3033 } else {
3034 putsCfSetConsistent = putsCfSetConsistent
3035 && mutation.getFamilyCellMap().keySet().equals(putsCfSet);
3036 }
3037 } else {
3038 if (deletesCfSet == null) {
3039 deletesCfSet = mutation.getFamilyCellMap().keySet();
3040 } else {
3041 deletesCfSetConsistent = deletesCfSetConsistent
3042 && mutation.getFamilyCellMap().keySet().equals(deletesCfSet);
3043 }
3044 }
3045 }
3046
3047
3048
3049 now = EnvironmentEdgeManager.currentTime();
3050 byte[] byteNow = Bytes.toBytes(now);
3051
3052
3053 if (numReadyToWrite <= 0) return 0L;
3054
3055
3056
3057
3058
3059
3060 for (int i = firstIndex; !isInReplay && i < lastIndexExclusive; i++) {
3061
3062 if (batchOp.retCodeDetails[i].getOperationStatusCode()
3063 != OperationStatusCode.NOT_RUN) continue;
3064
3065 Mutation mutation = batchOp.getMutation(i);
3066 if (mutation instanceof Put) {
3067 updateCellTimestamps(familyMaps[i].values(), byteNow);
3068 noOfPuts++;
3069 } else {
3070 prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
3071 noOfDeletes++;
3072 }
3073 rewriteCellTags(familyMaps[i], mutation);
3074 }
3075
3076 lock(this.updatesLock.readLock(), numReadyToWrite);
3077 locked = true;
3078
3079
3080 if (!isInReplay && coprocessorHost != null) {
3081 MiniBatchOperationInProgress<Mutation> miniBatchOp =
3082 new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
3083 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
3084 if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
3085 }
3086
3087
3088
3089
3090 Durability durability = Durability.USE_DEFAULT;
3091 for (int i = firstIndex; i < lastIndexExclusive; i++) {
3092
3093 if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
3094 continue;
3095 }
3096
3097 Mutation m = batchOp.getMutation(i);
3098 Durability tmpDur = getEffectiveDurability(m.getDurability());
3099 if (tmpDur.ordinal() > durability.ordinal()) {
3100 durability = tmpDur;
3101 }
3102 if (tmpDur == Durability.SKIP_WAL) {
3103 recordMutationWithoutWal(m.getFamilyCellMap());
3104 continue;
3105 }
3106
3107 long nonceGroup = batchOp.getNonceGroup(i), nonce = batchOp.getNonce(i);
3108
3109
3110
3111 if (nonceGroup != currentNonceGroup || nonce != currentNonce) {
3112 if (walEdit.size() > 0) {
3113 assert isInReplay;
3114 if (!isInReplay) {
3115 throw new IOException("Multiple nonces per batch and not in replay");
3116 }
3117
3118
3119 walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
3120 this.htableDescriptor.getTableName(), now, m.getClusterIds(),
3121 currentNonceGroup, currentNonce, mvcc);
3122 txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey,
3123 walEdit, true);
3124 walEdit = new WALEdit(isInReplay);
3125 walKey = null;
3126 }
3127 currentNonceGroup = nonceGroup;
3128 currentNonce = nonce;
3129 }
3130
3131
3132 WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
3133 if (fromCP != null) {
3134 for (Cell cell : fromCP.getCells()) {
3135 walEdit.add(cell);
3136 }
3137 }
3138 addFamilyMapToWALEdit(familyMaps[i], walEdit);
3139 }
3140
3141
3142
3143
3144 Mutation mutation = batchOp.getMutation(firstIndex);
3145 if (isInReplay) {
3146
3147 walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
3148 this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
3149 mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
3150 long replaySeqId = batchOp.getReplaySequenceId();
3151 walKey.setOrigLogSeqNum(replaySeqId);
3152 }
3153 if (walEdit.size() > 0) {
3154 if (!isInReplay) {
3155
3156 walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
3157 this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
3158 mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
3159 }
3160 txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
3161 }
3162
3163
3164
3165 if (walKey == null) {
3166
3167 walKey = this.appendEmptyEdit(this.wal);
3168 }
3169 if (!isInReplay) {
3170 writeEntry = walKey.getWriteEntry();
3171 mvccNum = writeEntry.getWriteNumber();
3172 } else {
3173 mvccNum = batchOp.getReplaySequenceId();
3174 }
3175
3176
3177
3178
3179
3180
3181
3182
3183
3184
3185 long addedSize = 0;
3186 for (int i = firstIndex; i < lastIndexExclusive; i++) {
3187 if (batchOp.retCodeDetails[i].getOperationStatusCode()
3188 != OperationStatusCode.NOT_RUN) {
3189 continue;
3190 }
3191 doRollBackMemstore = true;
3192 addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, isInReplay);
3193 }
3194
3195
3196
3197
3198 if (locked) {
3199 this.updatesLock.readLock().unlock();
3200 locked = false;
3201 }
3202 releaseRowLocks(acquiredRowLocks);
3203
3204
3205
3206
3207 if (txid != 0) {
3208 syncOrDefer(txid, durability);
3209 }
3210
3211 doRollBackMemstore = false;
3212
3213 if (!isInReplay && coprocessorHost != null) {
3214 MiniBatchOperationInProgress<Mutation> miniBatchOp =
3215 new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
3216 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
3217 coprocessorHost.postBatchMutate(miniBatchOp);
3218 }
3219
3220
3221
3222
3223 if (writeEntry != null) {
3224 mvcc.completeAndWait(writeEntry);
3225 writeEntry = null;
3226 } else if (isInReplay) {
3227
3228 mvcc.advanceTo(mvccNum);
3229 }
3230
3231 for (int i = firstIndex; i < lastIndexExclusive; i ++) {
3232 if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) {
3233 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
3234 }
3235 }
3236
3237
3238
3239
3240
3241 if (!isInReplay && coprocessorHost != null) {
3242 for (int i = firstIndex; i < lastIndexExclusive; i++) {
3243
3244 if (batchOp.retCodeDetails[i].getOperationStatusCode()
3245 != OperationStatusCode.SUCCESS) {
3246 continue;
3247 }
3248 Mutation m = batchOp.getMutation(i);
3249 if (m instanceof Put) {
3250 coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
3251 } else {
3252 coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
3253 }
3254 }
3255 }
3256
3257 success = true;
3258 return addedSize;
3259 } finally {
3260
3261 if (doRollBackMemstore) {
3262 for (int j = 0; j < familyMaps.length; j++) {
3263 for(List<Cell> cells:familyMaps[j].values()) {
3264 rollbackMemstore(cells);
3265 }
3266 }
3267 if (writeEntry != null) mvcc.complete(writeEntry);
3268 } else if (writeEntry != null) {
3269 mvcc.completeAndWait(writeEntry);
3270 }
3271
3272 if (locked) {
3273 this.updatesLock.readLock().unlock();
3274 }
3275 releaseRowLocks(acquiredRowLocks);
3276
3277
3278
3279
3280
3281
3282
3283 if (noOfPuts > 0) {
3284
3285 if (this.metricsRegion != null) {
3286 this.metricsRegion.updatePut();
3287 }
3288 }
3289 if (noOfDeletes > 0) {
3290
3291 if (this.metricsRegion != null) {
3292 this.metricsRegion.updateDelete();
3293 }
3294 }
3295 if (!success) {
3296 for (int i = firstIndex; i < lastIndexExclusive; i++) {
3297 if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
3298 batchOp.retCodeDetails[i] = OperationStatus.FAILURE;
3299 }
3300 }
3301 }
3302 if (coprocessorHost != null && !batchOp.isInReplay()) {
3303
3304
3305 MiniBatchOperationInProgress<Mutation> miniBatchOp =
3306 new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
3307 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex,
3308 lastIndexExclusive);
3309 coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success);
3310 }
3311
3312 batchOp.nextIndexToProcess = lastIndexExclusive;
3313 }
3314 }
3315
3316
3317
3318
3319
3320 protected Durability getEffectiveDurability(Durability d) {
3321 return d == Durability.USE_DEFAULT ? this.durability : d;
3322 }
3323
3324
3325
3326
3327
3328
3329 @Override
3330 public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
3331 CompareOp compareOp, ByteArrayComparable comparator, Mutation w,
3332 boolean writeToWAL)
3333 throws IOException{
3334 checkReadOnly();
3335
3336
3337 checkResources();
3338 boolean isPut = w instanceof Put;
3339 if (!isPut && !(w instanceof Delete))
3340 throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must " +
3341 "be Put or Delete");
3342 if (!Bytes.equals(row, w.getRow())) {
3343 throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " +
3344 "getRow must match the passed row");
3345 }
3346
3347 startRegionOperation();
3348 try {
3349 Get get = new Get(row);
3350 checkFamily(family);
3351 get.addColumn(family, qualifier);
3352
3353
3354 RowLock rowLock = getRowLock(get.getRow());
3355
3356 mvcc.await();
3357 try {
3358 if (this.getCoprocessorHost() != null) {
3359 Boolean processed = null;
3360 if (w instanceof Put) {
3361 processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family,
3362 qualifier, compareOp, comparator, (Put) w);
3363 } else if (w instanceof Delete) {
3364 processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family,
3365 qualifier, compareOp, comparator, (Delete) w);
3366 }
3367 if (processed != null) {
3368 return processed;
3369 }
3370 }
3371 List<Cell> result = get(get, false);
3372
3373 boolean valueIsNull = comparator.getValue() == null ||
3374 comparator.getValue().length == 0;
3375 boolean matches = false;
3376 long cellTs = 0;
3377 if (result.size() == 0 && valueIsNull) {
3378 matches = true;
3379 } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
3380 valueIsNull) {
3381 matches = true;
3382 cellTs = result.get(0).getTimestamp();
3383 } else if (result.size() == 1 && !valueIsNull) {
3384 Cell kv = result.get(0);
3385 cellTs = kv.getTimestamp();
3386 int compareResult = comparator.compareTo(kv.getValueArray(),
3387 kv.getValueOffset(), kv.getValueLength());
3388 switch (compareOp) {
3389 case LESS:
3390 matches = compareResult < 0;
3391 break;
3392 case LESS_OR_EQUAL:
3393 matches = compareResult <= 0;
3394 break;
3395 case EQUAL:
3396 matches = compareResult == 0;
3397 break;
3398 case NOT_EQUAL:
3399 matches = compareResult != 0;
3400 break;
3401 case GREATER_OR_EQUAL:
3402 matches = compareResult >= 0;
3403 break;
3404 case GREATER:
3405 matches = compareResult > 0;
3406 break;
3407 default:
3408 throw new RuntimeException("Unknown Compare op " + compareOp.name());
3409 }
3410 }
3411
3412 if (matches) {
3413
3414
3415
3416
3417 long now = EnvironmentEdgeManager.currentTime();
3418 long ts = Math.max(now, cellTs);
3419 byte[] byteTs = Bytes.toBytes(ts);
3420
3421 if (w instanceof Put) {
3422 updateCellTimestamps(w.getFamilyCellMap().values(), byteTs);
3423 }
3424
3425
3426
3427
3428
3429 doBatchMutate(w);
3430 this.checkAndMutateChecksPassed.increment();
3431 return true;
3432 }
3433 this.checkAndMutateChecksFailed.increment();
3434 return false;
3435 } finally {
3436 rowLock.release();
3437 }
3438 } finally {
3439 closeRegionOperation();
3440 }
3441 }
3442
3443
3444
3445
3446
3447
3448 @Override
3449 public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier,
3450 CompareOp compareOp, ByteArrayComparable comparator, RowMutations rm,
3451 boolean writeToWAL) throws IOException {
3452 checkReadOnly();
3453
3454
3455 checkResources();
3456
3457 startRegionOperation();
3458 try {
3459 Get get = new Get(row);
3460 checkFamily(family);
3461 get.addColumn(family, qualifier);
3462
3463
3464 RowLock rowLock = getRowLock(get.getRow());
3465
3466 mvcc.await();
3467 try {
3468 List<Cell> result = get(get, false);
3469
3470 boolean valueIsNull = comparator.getValue() == null ||
3471 comparator.getValue().length == 0;
3472 boolean matches = false;
3473 long cellTs = 0;
3474 if (result.size() == 0 && valueIsNull) {
3475 matches = true;
3476 } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
3477 valueIsNull) {
3478 matches = true;
3479 cellTs = result.get(0).getTimestamp();
3480 } else if (result.size() == 1 && !valueIsNull) {
3481 Cell kv = result.get(0);
3482 cellTs = kv.getTimestamp();
3483 int compareResult = comparator.compareTo(kv.getValueArray(),
3484 kv.getValueOffset(), kv.getValueLength());
3485 switch (compareOp) {
3486 case LESS:
3487 matches = compareResult < 0;
3488 break;
3489 case LESS_OR_EQUAL:
3490 matches = compareResult <= 0;
3491 break;
3492 case EQUAL:
3493 matches = compareResult == 0;
3494 break;
3495 case NOT_EQUAL:
3496 matches = compareResult != 0;
3497 break;
3498 case GREATER_OR_EQUAL:
3499 matches = compareResult >= 0;
3500 break;
3501 case GREATER:
3502 matches = compareResult > 0;
3503 break;
3504 default:
3505 throw new RuntimeException("Unknown Compare op " + compareOp.name());
3506 }
3507 }
3508
3509 if (matches) {
3510
3511
3512
3513
3514 long now = EnvironmentEdgeManager.currentTime();
3515 long ts = Math.max(now, cellTs);
3516 byte[] byteTs = Bytes.toBytes(ts);
3517
3518 for (Mutation w : rm.getMutations()) {
3519 if (w instanceof Put) {
3520 updateCellTimestamps(w.getFamilyCellMap().values(), byteTs);
3521 }
3522
3523
3524 }
3525
3526
3527
3528 mutateRow(rm);
3529 this.checkAndMutateChecksPassed.increment();
3530 return true;
3531 }
3532 this.checkAndMutateChecksFailed.increment();
3533 return false;
3534 } finally {
3535 rowLock.release();
3536 }
3537 } finally {
3538 closeRegionOperation();
3539 }
3540 }
3541
3542 private void doBatchMutate(Mutation mutation) throws IOException {
3543
3544 OperationStatus[] batchMutate = this.batchMutate(new Mutation[]{mutation});
3545 if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
3546 throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
3547 } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
3548 throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
3549 }
3550 }
3551
3552
3553
3554
3555
3556
3557
3558
3559
3560
3561
3562
3563
3564
3565 public void addRegionToSnapshot(SnapshotDescription desc,
3566 ForeignExceptionSnare exnSnare) throws IOException {
3567 Path rootDir = FSUtils.getRootDir(conf);
3568 Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);
3569
3570 SnapshotManifest manifest = SnapshotManifest.create(conf, getFilesystem(),
3571 snapshotDir, desc, exnSnare);
3572 manifest.addRegion(this);
3573 }
3574
3575 @Override
3576 public void updateCellTimestamps(final Iterable<List<Cell>> cellItr, final byte[] now)
3577 throws IOException {
3578 for (List<Cell> cells: cellItr) {
3579 if (cells == null) continue;
3580 assert cells instanceof RandomAccess;
3581 int listSize = cells.size();
3582 for (int i = 0; i < listSize; i++) {
3583 CellUtil.updateLatestStamp(cells.get(i), now, 0);
3584 }
3585 }
3586 }
3587
3588
3589
3590
3591 void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) {
3592
3593
3594
3595 if (m.getTTL() == Long.MAX_VALUE) {
3596 return;
3597 }
3598
3599
3600
3601 for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
3602 List<Cell> cells = e.getValue();
3603 assert cells instanceof RandomAccess;
3604 int listSize = cells.size();
3605 for (int i = 0; i < listSize; i++) {
3606 Cell cell = cells.get(i);
3607 List<Tag> newTags = Tag.carryForwardTags(null, cell);
3608 newTags = carryForwardTTLTag(newTags, m);
3609
3610
3611 cells.set(i, new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
3612 cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
3613 cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
3614 cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
3615 cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
3616 newTags));
3617 }
3618 }
3619 }
3620
3621
3622
3623
3624
3625
3626
3627 private void checkResources() throws RegionTooBusyException {
3628
3629 if (this.getRegionInfo().isMetaRegion()) return;
3630
3631 if (this.memstoreSize.get() > this.blockingMemStoreSize) {
3632 blockedRequestsCount.increment();
3633 requestFlush();
3634 throw new RegionTooBusyException("Above memstore limit, " +
3635 "regionName=" + (this.getRegionInfo() == null ? "unknown" :
3636 this.getRegionInfo().getRegionNameAsString()) +
3637 ", server=" + (this.getRegionServerServices() == null ? "unknown" :
3638 this.getRegionServerServices().getServerName()) +
3639 ", memstoreSize=" + memstoreSize.get() +
3640 ", blockingMemStoreSize=" + blockingMemStoreSize);
3641 }
3642 }
3643
3644
3645
3646
3647 protected void checkReadOnly() throws IOException {
3648 if (isReadOnly()) {
3649 throw new DoNotRetryIOException("region is read only");
3650 }
3651 }
3652
3653 protected void checkReadsEnabled() throws IOException {
3654 if (!this.writestate.readsEnabled) {
3655 throw new IOException(getRegionInfo().getEncodedName()
3656 + ": The region's reads are disabled. Cannot serve the request");
3657 }
3658 }
3659
3660 public void setReadsEnabled(boolean readsEnabled) {
3661 if (readsEnabled && !this.writestate.readsEnabled) {
3662 LOG.info(getRegionInfo().getEncodedName() + " : Enabling reads for region.");
3663 }
3664 this.writestate.setReadsEnabled(readsEnabled);
3665 }
3666
3667
3668
3669
3670
3671
3672
3673 private void put(final byte [] row, byte [] family, List<Cell> edits)
3674 throws IOException {
3675 NavigableMap<byte[], List<Cell>> familyMap;
3676 familyMap = new TreeMap<byte[], List<Cell>>(Bytes.BYTES_COMPARATOR);
3677
3678 familyMap.put(family, edits);
3679 Put p = new Put(row);
3680 p.setFamilyCellMap(familyMap);
3681 doBatchMutate(p);
3682 }
3683
3684
3685
3686
3687
3688
3689
3690
3691
3692
3693
3694
3695
3696
3697
3698
3699 private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
3700 long mvccNum, boolean isInReplay) throws IOException {
3701 long size = 0;
3702
3703 for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
3704 byte[] family = e.getKey();
3705 List<Cell> cells = e.getValue();
3706 assert cells instanceof RandomAccess;
3707 Store store = getStore(family);
3708 int listSize = cells.size();
3709 for (int i=0; i < listSize; i++) {
3710 Cell cell = cells.get(i);
3711 if (cell.getSequenceId() == 0 || isInReplay) {
3712 CellUtil.setSequenceId(cell, mvccNum);
3713 }
3714 size += store.add(cell);
3715 }
3716 }
3717
3718 return size;
3719 }
3720
3721
3722
3723
3724
3725
3726 private void rollbackMemstore(List<Cell> memstoreCells) {
3727 int kvsRolledback = 0;
3728
3729 for (Cell cell : memstoreCells) {
3730 byte[] family = CellUtil.cloneFamily(cell);
3731 Store store = getStore(family);
3732 store.rollback(cell);
3733 kvsRolledback++;
3734 }
3735 LOG.debug("rollbackMemstore rolled back " + kvsRolledback);
3736 }
3737
3738 @Override
3739 public void checkFamilies(Collection<byte[]> families) throws NoSuchColumnFamilyException {
3740 for (byte[] family : families) {
3741 checkFamily(family);
3742 }
3743 }
3744
3745
3746
3747
3748
3749 private void removeNonExistentColumnFamilyForReplay(
3750 final Map<byte[], List<Cell>> familyMap) {
3751 List<byte[]> nonExistentList = null;
3752 for (byte[] family : familyMap.keySet()) {
3753 if (!this.htableDescriptor.hasFamily(family)) {
3754 if (nonExistentList == null) {
3755 nonExistentList = new ArrayList<byte[]>();
3756 }
3757 nonExistentList.add(family);
3758 }
3759 }
3760 if (nonExistentList != null) {
3761 for (byte[] family : nonExistentList) {
3762
3763 LOG.info("No family for " + Bytes.toString(family) + " omit from reply.");
3764 familyMap.remove(family);
3765 }
3766 }
3767 }
3768
3769 @Override
3770 public void checkTimestamps(final Map<byte[], List<Cell>> familyMap, long now)
3771 throws FailedSanityCheckException {
3772 if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
3773 return;
3774 }
3775 long maxTs = now + timestampSlop;
3776 for (List<Cell> kvs : familyMap.values()) {
3777 assert kvs instanceof RandomAccess;
3778 int listSize = kvs.size();
3779 for (int i=0; i < listSize; i++) {
3780 Cell cell = kvs.get(i);
3781
3782 long ts = cell.getTimestamp();
3783 if (ts != HConstants.LATEST_TIMESTAMP && ts > maxTs) {
3784 throw new FailedSanityCheckException("Timestamp for KV out of range "
3785 + cell + " (too.new=" + timestampSlop + ")");
3786 }
3787 }
3788 }
3789 }
3790
3791
3792
3793
3794
3795
3796
3797 private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
3798 WALEdit walEdit) {
3799 for (List<Cell> edits : familyMap.values()) {
3800 assert edits instanceof RandomAccess;
3801 int listSize = edits.size();
3802 for (int i=0; i < listSize; i++) {
3803 Cell cell = edits.get(i);
3804 walEdit.add(cell);
3805 }
3806 }
3807 }
3808
3809 private void requestFlush() {
3810 if (this.rsServices == null) {
3811 return;
3812 }
3813 synchronized (writestate) {
3814 if (this.writestate.isFlushRequested()) {
3815 return;
3816 }
3817 writestate.flushRequested = true;
3818 }
3819
3820 this.rsServices.getFlushRequester().requestFlush(this, false);
3821 if (LOG.isDebugEnabled()) {
3822 LOG.debug("Flush requested on " + this.getRegionInfo().getEncodedName());
3823 }
3824 }
3825
3826
3827
3828
3829
3830 private boolean isFlushSize(final long size) {
3831 return size > this.memstoreFlushSize;
3832 }
3833
3834
3835
3836
3837
3838
3839
3840
3841
3842
3843
3844
3845
3846
3847
3848
3849
3850
3851
3852
3853
3854
3855
3856
3857
3858
3859
3860
3861
3862
3863
3864
3865
3866
/**
 * Replays the recovered-edits files found under {@code regiondir}, flushes the replayed data
 * if anything newer than the stores was applied, and then deletes (or, when
 * {@code hbase.region.archive.recovered.edits} is true, archives) the edit files.
 *
 * <p>Each file's name is parsed as a number and treated as that file's maximum sequence id.
 * Files whose maximum is not greater than the smallest per-store maximum are skipped without
 * being read.
 *
 * @param regiondir region directory to look for recovered edits under
 * @param maxSeqIdInStores per-family maximum sequence id already persisted in each store
 * @param reporter progress reporter handed to each file replay; may be null
 * @param status task status used for the final flush
 * @return the largest sequence id returned by any file replay, or the smallest store
 *     maximum if that is larger (or if nothing was replayed)
 * @throws IOException if a file replay fails and skipping errors is not enabled, or on
 *     filesystem errors
 */
protected long replayRecoveredEditsIfAny(final Path regiondir,
    Map<byte[], Long> maxSeqIdInStores,
    final CancelableProgressable reporter, final MonitoredTask status)
    throws IOException {
  // Smallest "max sequence id" across all stores; -1 means no store seen yet.
  long minSeqIdForTheRegion = -1;
  for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
    if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
      minSeqIdForTheRegion = maxSeqIdInStore;
    }
  }
  long seqid = minSeqIdForTheRegion;

  FileSystem fs = this.fs.getFileSystem();
  NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Found " + (files == null ? 0 : files.size())
        + " recovered edits file(s) under " + regiondir);
  }

  if (files == null || files.isEmpty()) return seqid;

  for (Path edits: files) {
    if (edits == null || !fs.exists(edits)) {
      LOG.warn("Null or non-existent edits file: " + edits);
      continue;
    }
    // NOTE(review): presumably deletes empty files; they stay in `files` and are visited
    // again by the delete/archive pass below.
    if (isZeroLengthThenDelete(fs, edits)) continue;

    // The file name is the maximum sequence id contained in the file.
    long maxSeqId;
    String fileName = edits.getName();
    maxSeqId = Math.abs(Long.parseLong(fileName));
    if (maxSeqId <= minSeqIdForTheRegion) {
      // Every store already has everything in this file.
      if (LOG.isDebugEnabled()) {
        String msg = "Maximum sequenceid for this wal is " + maxSeqId
            + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
            + ", skipped the whole file, path=" + edits;
        LOG.debug(msg);
      }
      continue;
    }

    try {
      seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
    } catch (IOException e) {
      // The new property name wins; the deprecated "hbase.skip.errors" is the fallback.
      boolean skipErrors = conf.getBoolean(
          HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
          conf.getBoolean(
              "hbase.skip.errors",
              HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS));
      if (conf.get("hbase.skip.errors") != null) {
        LOG.warn(
            "The property 'hbase.skip.errors' has been deprecated. Please use " +
            HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
      }
      if (skipErrors) {
        // NOTE(review): the moved-aside file is still in `files`, so the delete/archive pass
        // below will target its old path.
        Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
        LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
            + "=true so continuing. Renamed " + edits +
            " as " + p, e);
      } else {
        throw e;
      }
    }
  }

  if (this.rsAccounting != null) {
    this.rsAccounting.clearRegionReplayEditsSize(getRegionInfo().getRegionName());
  }
  // Something newer than the stores was replayed: persist it before removing the edit files.
  if (seqid > minSeqIdForTheRegion) {
    internalFlushcache(null, seqid, stores.values(), status, false);
  }

  if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) {
    // Archive the edit files by wrapping them as StoreFiles of a pseudo family named after the
    // recovered-edits directory and handing them to removeStoreFiles.
    String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regiondir).getName();
    Set<StoreFile> fakeStoreFiles = new HashSet<StoreFile>(files.size());
    for (Path file: files) {
      fakeStoreFiles.add(new StoreFile(getRegionFileSystem().getFileSystem(), file, this.conf,
          null, null));
    }
    getRegionFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles);
  } else {
    for (Path file: files) {
      if (!fs.delete(file, false)) {
        LOG.error("Failed delete of " + file);
      } else {
        LOG.debug("Deleted recovered.edits file=" + file);
      }
    }
  }
  return seqid;
}
3965
3966
3967
3968
3969
3970
3971
3972
3973
3974
/**
 * Replays a single recovered-edits file into this region's memstores.
 *
 * <p>Cells are skipped when:
 * <ul>
 *   <li>they belong to {@code WALEdit.METAFAMILY};</li>
 *   <li>their family has no store;</li>
 *   <li>the WAL key is for a different encoded region and the row is outside this region;</li>
 *   <li>their log sequence number is not above the store's persisted maximum.</li>
 * </ul>
 * A truncated file (EOF) or a corrupt one (an IOException caused by a
 * {@code ParseException}) is moved aside and replay ends normally.
 *
 * @param edits the recovered-edits file to read
 * @param maxSeqIdInStores per-family maximum sequence id already persisted in each store
 * @param reporter progress reporter; may be null
 * @return the last non-decreasing log sequence number read, or -1 if no entry was read
 * @throws IOException on read failures other than EOF/corruption, or when the reporter
 *     asks to stop
 */
private long replayRecoveredEdits(final Path edits,
    Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
    throws IOException {
  String msg = "Replaying edits from " + edits;
  LOG.info(msg);
  MonitoredTask status = TaskMonitor.get().createStatus(msg);
  FileSystem fs = this.fs.getFileSystem();

  status.setStatus("Opening recovered edits");
  WAL.Reader reader = null;
  try {
    reader = WALFactory.createReader(fs, edits, conf);
    long currentEditSeqId = -1;
    long currentReplaySeqId = -1;
    long firstSeqIdInLog = -1;
    long skippedEdits = 0;
    long editsCount = 0;
    long intervalEdits = 0;
    WAL.Entry entry;
    // Cached store of the last cell's family, to avoid a lookup per cell.
    Store store = null;
    boolean reported_once = false;
    ServerNonceManager ng = this.rsServices == null ? null : this.rsServices.getNonceManager();

    try {
      // Check progress every `interval` cells, but report at most once per `period` ms.
      int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
      int period = this.conf.getInt("hbase.hstore.report.period", 300000);
      long lastReport = EnvironmentEdgeManager.currentTime();

      while ((entry = reader.next()) != null) {
        WALKey key = entry.getKey();
        WALEdit val = entry.getEdit();

        // Tell the nonce manager about each replayed operation (presumably so client retries
        // of these operations can still be recognized after recovery).
        if (ng != null) {
          ng.reportOperationFromWal(key.getNonceGroup(), key.getNonce(), key.getWriteTime());
        }

        if (reporter != null) {
          intervalEdits += val.size();
          if (intervalEdits >= interval) {
            intervalEdits = 0;
            long cur = EnvironmentEdgeManager.currentTime();
            if (lastReport + period <= cur) {
              status.setStatus("Replaying edits..." +
                  " skipped=" + skippedEdits +
                  " edits=" + editsCount);
              // This IOException is caught by the IOException handler below and rethrown,
              // since its cause is not a ParseException.
              if(!reporter.progress()) {
                msg = "Progressable reporter failed, stopping replay";
                LOG.warn(msg);
                status.abort(msg);
                throw new IOException(msg);
              }
              reported_once = true;
              lastReport = cur;
            }
          }
        }

        if (firstSeqIdInLog == -1) {
          firstSeqIdInLog = key.getLogSeqNum();
        }
        // Sequence ids should be non-decreasing; a decrease is logged and the previous
        // (higher) id is kept.
        if (currentEditSeqId > key.getLogSeqNum()) {
          LOG.error(getRegionInfo().getEncodedName() + " : "
              + "Found decreasing SeqId. PreId=" + currentEditSeqId + " key=" + key
              + "; edit=" + val);
        } else {
          currentEditSeqId = key.getLogSeqNum();
        }
        // Prefer the original sequence number when the key carries one.
        currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ?
            key.getOrigLogSeqNum() : currentEditSeqId;

        // A coprocessor returning true from preWALRestore skips this entry entirely
        // (postWALRestore is not called for it either).
        if (coprocessorHost != null) {
          status.setStatus("Running pre-WAL-restore hook in coprocessors");
          if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
            continue;
          }
        }
        // Entries written for a different encoded region must have their rows range-checked.
        boolean checkRowWithinBoundary = false;
        if (!Bytes.equals(key.getEncodedRegionName(),
            this.getRegionInfo().getEncodedNameAsBytes())) {
          checkRowWithinBoundary = true;
        }

        boolean flush = false;
        for (Cell cell: val.getCells()) {
          // Meta-family cells are markers, never memstore data.
          if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
            // Only compaction markers written for this very region are replayed.
            if (!checkRowWithinBoundary) {
              CompactionDescriptor compaction = WALEdit.getCompaction(cell);
              if (compaction != null) {
                // Long.MAX_VALUE so the sequence-id guards in replayWALCompactionMarker never
                // skip it; removeFiles=true.
                replayWALCompactionMarker(compaction, false, true, Long.MAX_VALUE);
              }
            }
            skippedEdits++;
            continue;
          }

          if (store == null || !CellUtil.matchingFamily(cell, store.getFamily().getName())) {
            store = getStore(cell);
          }
          if (store == null) {
            LOG.warn("No family for " + cell);
            skippedEdits++;
            continue;
          }
          if (checkRowWithinBoundary && !rowIsInRange(this.getRegionInfo(),
              cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())) {
            LOG.warn("Row of " + cell + " is not within region boundary");
            skippedEdits++;
            continue;
          }
          // Already persisted in this store.
          // NOTE(review): unboxing NPE if maxSeqIdInStores lacks this store's family.
          if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily()
              .getName())) {
            skippedEdits++;
            continue;
          }
          CellUtil.setSequenceId(cell, currentReplaySeqId);

          // restoreEdit's result is accumulated as "a flush is needed".
          flush |= restoreEdit(store, cell);
          editsCount++;
        }
        if (flush) {
          internalFlushcache(null, currentEditSeqId, stores.values(), status, false);
        }

        if (coprocessorHost != null) {
          coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
        }
      }
    } catch (EOFException eof) {
      // Truncated file: move it aside and treat what was read so far as the full replay.
      Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
      msg = "Encountered EOF. Most likely due to Master failure during " +
          "wal splitting, so we have this data in another edit. " +
          "Continuing, but renaming " + edits + " as " + p;
      LOG.warn(msg, eof);
      status.abort(msg);
    } catch (IOException ioe) {
      // A ParseException cause marks a corrupt file, which is moved aside; anything else is
      // rethrown.
      if (ioe.getCause() instanceof ParseException) {
        Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
        msg = "File corruption encountered! " +
            "Continuing, but renaming " + edits + " as " + p;
        LOG.warn(msg, ioe);
        status.setStatus(msg);
      } else {
        status.abort(StringUtils.stringifyException(ioe));
        throw ioe;
      }
    }
    // Report progress at least once per file.
    if (reporter != null && !reported_once) {
      reporter.progress();
    }
    msg = "Applied " + editsCount + ", skipped " + skippedEdits +
        ", firstSequenceIdInLog=" + firstSeqIdInLog +
        ", maxSequenceIdInLog=" + currentEditSeqId + ", path=" + edits;
    status.markComplete(msg);
    LOG.debug(msg);
    return currentEditSeqId;
  } finally {
    status.cleanup();
    if (reader != null) {
      reader.close();
    }
  }
}
4162
4163
4164
4165
4166
4167
/**
 * Applies a compaction marker read from the WAL to the matching store.
 *
 * <p>A marker for another region is silently ignored on the default replica and rethrown as
 * {@link WrongRegionException} otherwise. Markers whose {@code replaySeqId} is below the last
 * replayed open-region or compaction sequence id are skipped. The
 * {@code lastReplayedCompactionSeqId} check/update and the store update run while holding
 * the {@code writestate} monitor.
 *
 * @param compaction the compaction descriptor from the WAL
 * @param pickCompactionFiles passed through to {@code Store.replayCompactionMarker}
 * @param removeFiles passed through to {@code Store.replayCompactionMarker}
 * @param replaySeqId sequence id of the WAL entry carrying the marker
 * @throws IOException if the marker targets a different region (non-default replica) or the
 *     store update fails; a {@link FileNotFoundException} is logged and swallowed
 */
void replayWALCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,
    boolean removeFiles, long replaySeqId)
    throws IOException {
  try {
    checkTargetRegion(compaction.getEncodedRegionName().toByteArray(),
        "Compaction marker from WAL ", compaction);
  } catch (WrongRegionException wre) {
    // Ignored on the default replica; rethrown for other replicas.
    if (RegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
      return;
    }
    throw wre;
  }

  synchronized (writestate) {
    if (replaySeqId < lastReplayedOpenRegionSeqId) {
      LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction)
          + " because its sequence id " + replaySeqId + " is smaller than this regions "
          + "lastReplayedOpenRegionSeqId of " + lastReplayedOpenRegionSeqId);
      return;
    }
    if (replaySeqId < lastReplayedCompactionSeqId) {
      LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction)
          + " because its sequence id " + replaySeqId + " is smaller than this regions "
          + "lastReplayedCompactionSeqId of " + lastReplayedCompactionSeqId);
      return;
    } else {
      lastReplayedCompactionSeqId = replaySeqId;
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug(getRegionInfo().getEncodedName() + " : "
          + "Replaying compaction marker " + TextFormat.shortDebugString(compaction)
          + " with seqId=" + replaySeqId + " and lastReplayedOpenRegionSeqId="
          + lastReplayedOpenRegionSeqId);
    }

    startRegionOperation(Operation.REPLAY_EVENT);
    try {
      Store store = this.getStore(compaction.getFamilyName().toByteArray());
      if (store == null) {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "Found Compaction WAL edit for deleted family:"
            + Bytes.toString(compaction.getFamilyName().toByteArray()));
        return;
      }
      store.replayCompactionMarker(compaction, pickCompactionFiles, removeFiles);
      logRegionFiles();
    } catch (FileNotFoundException ex) {
      // Referenced files may already be gone; log and continue without loading them.
      LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "At least one of the store files in compaction: "
          + TextFormat.shortDebugString(compaction)
          + " doesn't exist any more. Skip loading the file(s)", ex);
    } finally {
      closeRegionOperation(Operation.REPLAY_EVENT);
    }
  }
}
4228
4229 void replayWALFlushMarker(FlushDescriptor flush, long replaySeqId) throws IOException {
4230 checkTargetRegion(flush.getEncodedRegionName().toByteArray(),
4231 "Flush marker from WAL ", flush);
4232
4233 if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
4234 return;
4235 }
4236
4237 if (LOG.isDebugEnabled()) {
4238 LOG.debug(getRegionInfo().getEncodedName() + " : "
4239 + "Replaying flush marker " + TextFormat.shortDebugString(flush));
4240 }
4241
4242 startRegionOperation(Operation.REPLAY_EVENT);
4243 try {
4244 FlushAction action = flush.getAction();
4245 switch (action) {
4246 case START_FLUSH:
4247 replayWALFlushStartMarker(flush);
4248 break;
4249 case COMMIT_FLUSH:
4250 replayWALFlushCommitMarker(flush);
4251 break;
4252 case ABORT_FLUSH:
4253 replayWALFlushAbortMarker(flush);
4254 break;
4255 case CANNOT_FLUSH:
4256 replayWALFlushCannotFlushMarker(flush, replaySeqId);
4257 break;
4258 default:
4259 LOG.warn(getRegionInfo().getEncodedName() + " : " +
4260 "Received a flush event with unknown action, ignoring. " +
4261 TextFormat.shortDebugString(flush));
4262 break;
4263 }
4264
4265 logRegionFiles();
4266 } finally {
4267 closeRegionOperation(Operation.REPLAY_EVENT);
4268 }
4269 }
4270
4271
4272
4273
4274
/**
 * Replays a START_FLUSH marker by preparing (snapshotting) a flush of the stores named in
 * the marker, using the marker's flush sequence number.
 *
 * <p>Runs while holding the {@code writestate} monitor and notifies its waiters on exit.
 * A marker older than {@code lastReplayedOpenRegionSeqId} is ignored. If a prepared flush is
 * already pending, the new marker is only logged and ignored.
 *
 * @param flush the flush descriptor from the WAL
 * @return the prepare result when a prepare was attempted, otherwise null
 * @throws IOException if preparing the flush fails
 */
@VisibleForTesting
PrepareFlushResult replayWALFlushStartMarker(FlushDescriptor flush) throws IOException {
  long flushSeqId = flush.getFlushSequenceNumber();

  // Resolve the marker's families to stores; unknown families are logged and skipped.
  HashSet<Store> storesToFlush = new HashSet<Store>();
  for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) {
    byte[] family = storeFlush.getFamilyName().toByteArray();
    Store store = getStore(family);
    if (store == null) {
      LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "Received a flush start marker from primary, but the family is not found. Ignoring"
          + " StoreFlushDescriptor:" + TextFormat.shortDebugString(storeFlush));
      continue;
    }
    storesToFlush.add(store);
  }

  MonitoredTask status = TaskMonitor.get().createStatus("Preparing flush " + this);

  synchronized (writestate) {
    try {
      if (flush.getFlushSequenceNumber() < lastReplayedOpenRegionSeqId) {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
            + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
            + " of " + lastReplayedOpenRegionSeqId);
        return null;
      }
      // Reset the counters of data written without WAL.
      if (numMutationsWithoutWAL.get() > 0) {
        numMutationsWithoutWAL.set(0);
        dataInMemoryWithoutWAL.set(0);
      }

      if (!writestate.flushing) {
        // No prepared flush pending: snapshot the memstores now with the marker's seq id.
        PrepareFlushResult prepareResult = internalPrepareFlushCache(null,
            flushSeqId, storesToFlush, status, false);
        if (prepareResult.result == null) {
          // Prepared successfully; remember it until the matching commit/abort marker arrives.
          this.writestate.flushing = true;
          this.prepareFlushResult = prepareResult;
          status.markComplete("Flush prepare successful");
          if (LOG.isDebugEnabled()) {
            LOG.debug(getRegionInfo().getEncodedName() + " : "
                + " Prepared flush with seqId:" + flush.getFlushSequenceNumber());
          }
        } else {
          // An empty-memstore result is still recorded as a pending prepared flush.
          if (prepareResult.getResult().getResult() ==
                FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
            this.writestate.flushing = true;
            this.prepareFlushResult = prepareResult;
            if (LOG.isDebugEnabled()) {
              LOG.debug(getRegionInfo().getEncodedName() + " : "
                  + " Prepared empty flush with seqId:" + flush.getFlushSequenceNumber());
            }
          }
          status.abort("Flush prepare failed with " + prepareResult.result);
        }
        return prepareResult;
      } else {
        // A prepared flush is already pending; the new marker is only logged.
        if (flush.getFlushSequenceNumber() == this.prepareFlushResult.flushOpSeqId) {
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Received a flush prepare marker with the same seqId: " +
              + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
              + prepareFlushResult.flushOpSeqId + ". Ignoring");
        } else if (flush.getFlushSequenceNumber() < this.prepareFlushResult.flushOpSeqId) {
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Received a flush prepare marker with a smaller seqId: " +
              + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
              + prepareFlushResult.flushOpSeqId + ". Ignoring");
        } else {
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Received a flush prepare marker with a larger seqId: " +
              + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
              + prepareFlushResult.flushOpSeqId + ". Ignoring");
        }
      }
    } finally {
      status.cleanup();
      writestate.notifyAll();
    }
  }
  return null;
}
4385
/**
 * Replays a COMMIT_FLUSH marker: picks up the flushed files named in the marker and, where
 * appropriate, drops the prepared memstore snapshot.
 *
 * <p>How the marker compares with a pending prepared flush decides the outcome:
 * <ul>
 *   <li>same seq id: commit it and drop the snapshot;</li>
 *   <li>smaller seq id: pick up the files but keep the snapshot;</li>
 *   <li>larger seq id: pick up the files, drop the snapshot, and drop memstore contents up
 *       to the marker's seq id;</li>
 *   <li>no pending prepared flush: pick up the files and drop memstore contents up to the
 *       marker's seq id.</li>
 * </ul>
 * Runs while holding the {@code writestate} monitor. Afterwards it notifies waiters on both
 * {@code writestate} and this region's monitor.
 *
 * @param flush the flush descriptor from the WAL
 * @throws IOException if replaying the flush into the stores fails; a
 *     {@link FileNotFoundException} is logged and swallowed
 */
@VisibleForTesting
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
  justification="Intentional; post memstore flush")
void replayWALFlushCommitMarker(FlushDescriptor flush) throws IOException {
  MonitoredTask status = TaskMonitor.get().createStatus("Committing flush " + this);

  synchronized (writestate) {
    try {
      if (flush.getFlushSequenceNumber() < lastReplayedOpenRegionSeqId) {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
            + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
            + " of " + lastReplayedOpenRegionSeqId);
        return;
      }

      if (writestate.flushing) {
        PrepareFlushResult prepareFlushResult = this.prepareFlushResult;
        if (flush.getFlushSequenceNumber() == prepareFlushResult.flushOpSeqId) {
          if (LOG.isDebugEnabled()) {
            LOG.debug(getRegionInfo().getEncodedName() + " : "
                + "Received a flush commit marker with seqId:" + flush.getFlushSequenceNumber()
                + " and a previous prepared snapshot was found");
          }
          // Commit of the flush we prepared: load the files and drop the snapshot.
          replayFlushInStores(flush, prepareFlushResult, true);

          // The snapshot no longer counts toward the global memstore size.
          this.addAndGetGlobalMemstoreSize(-prepareFlushResult.totalFlushableSize);

          this.prepareFlushResult = null;
          writestate.flushing = false;
        } else if (flush.getFlushSequenceNumber() < prepareFlushResult.flushOpSeqId) {
          // Commit of an older flush: load its files but keep our (newer) prepared snapshot.
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Received a flush commit marker with smaller seqId: "
              + flush.getFlushSequenceNumber() + " than what we have prepared with seqId: "
              + prepareFlushResult.flushOpSeqId + ". Picking up new file, but not dropping"
              +" prepared memstore snapshot");
          replayFlushInStores(flush, prepareFlushResult, false);
        } else {
          // Commit of a newer flush than we prepared: load its files, drop our snapshot, and
          // also drop memstore contents covered by the newer flush.
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Received a flush commit marker with larger seqId: "
              + flush.getFlushSequenceNumber() + " than what we have prepared with seqId: " +
              prepareFlushResult.flushOpSeqId + ". Picking up new file and dropping prepared"
              +" memstore snapshot");

          replayFlushInStores(flush, prepareFlushResult, true);

          this.addAndGetGlobalMemstoreSize(-prepareFlushResult.totalFlushableSize);

          dropMemstoreContentsForSeqId(flush.getFlushSequenceNumber(), null);

          this.prepareFlushResult = null;
          writestate.flushing = false;
        }

        // Reads are (re-)enabled whenever a commit is replayed against a pending prepare.
        this.setReadsEnabled(true);
      } else {
        // No prepared flush: load the files and drop memstore contents up to this seq id.
        LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "Received a flush commit marker with seqId:" + flush.getFlushSequenceNumber()
            + ", but no previous prepared snapshot was found");

        replayFlushInStores(flush, null, false);

        dropMemstoreContentsForSeqId(flush.getFlushSequenceNumber(), null);
      }

      status.markComplete("Flush commit successful");

      this.maxFlushedSeqId = flush.getFlushSequenceNumber();

      // Advance the MVCC read point to the flush's sequence id.
      mvcc.advanceTo(flush.getFlushSequenceNumber());

    } catch (FileNotFoundException ex) {
      // Flushed files may already be gone; log and continue without loading them.
      LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "At least one of the store files in flush: " + TextFormat.shortDebugString(flush)
          + " doesn't exist any more. Skip loading the file(s)", ex);
    }
    finally {
      status.cleanup();
      writestate.notifyAll();
    }
  }

  // Wake any threads waiting on the region monitor (presumably waiting for a flush to finish).
  synchronized (this) {
    notifyAll();
  }
}
4505
4506
4507
4508
4509
4510
4511
4512
4513
4514 private void replayFlushInStores(FlushDescriptor flush, PrepareFlushResult prepareFlushResult,
4515 boolean dropMemstoreSnapshot)
4516 throws IOException {
4517 for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) {
4518 byte[] family = storeFlush.getFamilyName().toByteArray();
4519 Store store = getStore(family);
4520 if (store == null) {
4521 LOG.warn(getRegionInfo().getEncodedName() + " : "
4522 + "Received a flush commit marker from primary, but the family is not found."
4523 + "Ignoring StoreFlushDescriptor:" + storeFlush);
4524 continue;
4525 }
4526 List<String> flushFiles = storeFlush.getFlushOutputList();
4527 StoreFlushContext ctx = null;
4528 long startTime = EnvironmentEdgeManager.currentTime();
4529 if (prepareFlushResult == null || prepareFlushResult.storeFlushCtxs == null) {
4530 ctx = store.createFlushContext(flush.getFlushSequenceNumber());
4531 } else {
4532 ctx = prepareFlushResult.storeFlushCtxs.get(family);
4533 startTime = prepareFlushResult.startTime;
4534 }
4535
4536 if (ctx == null) {
4537 LOG.warn(getRegionInfo().getEncodedName() + " : "
4538 + "Unexpected: flush commit marker received from store "
4539 + Bytes.toString(family) + " but no associated flush context. Ignoring");
4540 continue;
4541 }
4542
4543 ctx.replayFlush(flushFiles, dropMemstoreSnapshot);
4544
4545
4546 this.lastStoreFlushTimeMap.put(store, startTime);
4547 }
4548 }
4549
4550
4551
4552
4553
4554
4555 private long dropMemstoreContentsForSeqId(long seqId, Store store) throws IOException {
4556 long totalFreedSize = 0;
4557 this.updatesLock.writeLock().lock();
4558 try {
4559
4560 long currentSeqId = mvcc.getReadPoint();
4561 if (seqId >= currentSeqId) {
4562
4563 LOG.info(getRegionInfo().getEncodedName() + " : "
4564 + "Dropping memstore contents as well since replayed flush seqId: "
4565 + seqId + " is greater than current seqId:" + currentSeqId);
4566
4567
4568 if (store == null) {
4569 for (Store s : stores.values()) {
4570 totalFreedSize += doDropStoreMemstoreContentsForSeqId(s, currentSeqId);
4571 }
4572 } else {
4573 totalFreedSize += doDropStoreMemstoreContentsForSeqId(store, currentSeqId);
4574 }
4575 } else {
4576 LOG.info(getRegionInfo().getEncodedName() + " : "
4577 + "Not dropping memstore contents since replayed flush seqId: "
4578 + seqId + " is smaller than current seqId:" + currentSeqId);
4579 }
4580 } finally {
4581 this.updatesLock.writeLock().unlock();
4582 }
4583 return totalFreedSize;
4584 }
4585
4586 private long doDropStoreMemstoreContentsForSeqId(Store s, long currentSeqId) throws IOException {
4587 long snapshotSize = s.getFlushableSize();
4588 this.addAndGetGlobalMemstoreSize(-snapshotSize);
4589 StoreFlushContext ctx = s.createFlushContext(currentSeqId);
4590 ctx.prepare();
4591 ctx.abort();
4592 return snapshotSize;
4593 }
4594
/**
 * Replays a flush abort marker from the primary's WAL.
 *
 * <p>The body is empty: the marker is ignored and no replica state is changed.
 *
 * @param flush the flush descriptor of the aborted flush (unused)
 */
private void replayWALFlushAbortMarker(FlushDescriptor flush) {
  // NOTE(review): presumably nothing is needed here because a prepared snapshot is released
  // later by a flush commit, a region open marker, or a store-file refresh — confirm.
}
4600
4601 private void replayWALFlushCannotFlushMarker(FlushDescriptor flush, long replaySeqId) {
4602 synchronized (writestate) {
4603 if (this.lastReplayedOpenRegionSeqId > replaySeqId) {
4604 LOG.warn(getRegionInfo().getEncodedName() + " : "
4605 + "Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
4606 + " because its sequence id " + replaySeqId + " is smaller than this regions "
4607 + "lastReplayedOpenRegionSeqId of " + lastReplayedOpenRegionSeqId);
4608 return;
4609 }
4610
4611
4612
4613
4614
4615
4616 this.setReadsEnabled(true);
4617 }
4618 }
4619
4620 @VisibleForTesting
4621 PrepareFlushResult getPrepareFlushResult() {
4622 return prepareFlushResult;
4623 }
4624
/**
 * Replays a region event marker from the primary's WAL on a secondary region replica.
 *
 * <p>Nothing is done on the default (primary) replica or for REGION_CLOSE events. Event types
 * other than REGION_OPEN are logged and ignored.
 *
 * <p>A REGION_OPEN event is applied only if its sequence id is not older than the last replayed
 * open event. For each store listed in the marker:
 * <ul>
 *   <li>the store's files are replaced with the listed set;</li>
 *   <li>a prepared flush snapshot the event supersedes is aborted;</li>
 *   <li>the store's memstore contents are dropped when the event's sequence id is at least the
 *       current MVCC read point.</li>
 * </ul>
 * Afterwards the prepared flush is cleared if possible, reads are enabled, and
 * {@code notifyAll()} is called on this region.
 *
 * @param regionEvent the region event descriptor read from the WAL
 * @throws IOException if the marker targets another region ({@link WrongRegionException}) or a
 *           store operation fails
 */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
    justification="Intentional; cleared the memstore")
void replayWALRegionEventMarker(RegionEventDescriptor regionEvent) throws IOException {
  checkTargetRegion(regionEvent.getEncodedRegionName().toByteArray(),
    "RegionEvent marker from WAL ", regionEvent);

  startRegionOperation(Operation.REPLAY_EVENT);
  try {
    // Only secondary replicas replay region events; the primary has nothing to do.
    if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
      return;
    }

    if (regionEvent.getEventType() == EventType.REGION_CLOSE) {
      // Region close events are ignored here.
      return;
    }
    if (regionEvent.getEventType() != EventType.REGION_OPEN) {
      LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "Unknown region event received, ignoring :"
          + TextFormat.shortDebugString(regionEvent));
      return;
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug(getRegionInfo().getEncodedName() + " : "
        + "Replaying region open event marker " + TextFormat.shortDebugString(regionEvent));
    }

    // Replay state (lastReplayedOpenRegionSeqId, flushing, prepareFlushResult) is guarded by
    // writestate.
    synchronized (writestate) {
      // Apply the event only if it is not older than the last replayed open event;
      // otherwise it is stale and skipped.
      if (this.lastReplayedOpenRegionSeqId <= regionEvent.getLogSequenceNumber()) {
        this.lastReplayedOpenRegionSeqId = regionEvent.getLogSequenceNumber();
      } else {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "Skipping replaying region event :" + TextFormat.shortDebugString(regionEvent)
          + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
          + " of " + lastReplayedOpenRegionSeqId);
        return;
      }

      // Replace each listed store's files with the set recorded in the marker.
      for (StoreDescriptor storeDescriptor : regionEvent.getStoresList()) {
        // The family may not exist in this replica's view of the table; skip such stores.
        byte[] family = storeDescriptor.getFamilyName().toByteArray();
        Store store = getStore(family);
        if (store == null) {
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Received a region open marker from primary, but the family is not found. "
              + "Ignoring. StoreDescriptor:" + storeDescriptor);
          continue;
        }

        // Max sequence id of the store BEFORE the refresh.
        long storeSeqId = store.getMaxSequenceId();
        List<String> storeFiles = storeDescriptor.getStoreFileList();
        try {
          store.refreshStoreFiles(storeFiles);
        } catch (FileNotFoundException ex) {
          LOG.warn(getRegionInfo().getEncodedName() + " : "
                  + "At least one of the store files: " + storeFiles
                  + " doesn't exist any more. Skip loading the file(s)", ex);
          continue;
        }
        if (store.getMaxSequenceId() != storeSeqId) {
          // New files were picked up; record now as the store's last flush time.
          lastStoreFlushTimeMap.put(store, EnvironmentEdgeManager.currentTime());
        }

        if (writestate.flushing) {
          // A flush prepare was replayed earlier. If this open event is not older than it,
          // abort this store's prepared snapshot, release its size from the global memstore
          // accounting and forget the store's flush context.
          if (this.prepareFlushResult.flushOpSeqId <= regionEvent.getLogSequenceNumber()) {
            StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs == null ?
                null : this.prepareFlushResult.storeFlushCtxs.get(family);
            if (ctx != null) {
              long snapshotSize = store.getFlushableSize();
              ctx.abort();
              this.addAndGetGlobalMemstoreSize(-snapshotSize);
              this.prepareFlushResult.storeFlushCtxs.remove(family);
            }
          }
        }

        // Drop the store's memstore contents if the event's seq id is at least the current
        // MVCC read point (decided inside dropMemstoreContentsForSeqId).
        dropMemstoreContentsForSeqId(regionEvent.getLogSequenceNumber(), store);
        // NOTE(review): storeSeqId was read before refreshStoreFiles(), so maxFlushedSeqId is
        // advanced with the pre-refresh value — confirm this is intended.
        if (storeSeqId > this.maxFlushedSeqId) {
          this.maxFlushedSeqId = storeSeqId;
        }
      }

      // Clear the prepared flush if no store still holds a snapshot.
      dropPrepareFlushIfPossible();

      // NOTE(review): presumably waits for outstanding MVCC transactions before reads are
      // enabled — confirm mvcc.await() semantics.
      mvcc.await();

      // Enable reads on this region.
      this.setReadsEnabled(true);

      // Wake up threads waiting on this region's monitor (the "naked" notify is intentional,
      // see the SuppressWarnings justification).
      synchronized (this) {
        notifyAll();
      }
    }
    logRegionFiles();
  } finally {
    closeRegionOperation(Operation.REPLAY_EVENT);
  }
}
4742
4743 void replayWALBulkLoadEventMarker(WALProtos.BulkLoadDescriptor bulkLoadEvent) throws IOException {
4744 checkTargetRegion(bulkLoadEvent.getEncodedRegionName().toByteArray(),
4745 "BulkLoad marker from WAL ", bulkLoadEvent);
4746
4747 if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
4748 return;
4749 }
4750
4751 if (LOG.isDebugEnabled()) {
4752 LOG.debug(getRegionInfo().getEncodedName() + " : "
4753 + "Replaying bulkload event marker " + TextFormat.shortDebugString(bulkLoadEvent));
4754 }
4755
4756 boolean multipleFamilies = false;
4757 byte[] family = null;
4758 for (StoreDescriptor storeDescriptor : bulkLoadEvent.getStoresList()) {
4759 byte[] fam = storeDescriptor.getFamilyName().toByteArray();
4760 if (family == null) {
4761 family = fam;
4762 } else if (!Bytes.equals(family, fam)) {
4763 multipleFamilies = true;
4764 break;
4765 }
4766 }
4767
4768 startBulkRegionOperation(multipleFamilies);
4769 try {
4770
4771 synchronized (writestate) {
4772
4773
4774
4775
4776
4777
4778 if (bulkLoadEvent.getBulkloadSeqNum() >= 0
4779 && this.lastReplayedOpenRegionSeqId >= bulkLoadEvent.getBulkloadSeqNum()) {
4780 LOG.warn(getRegionInfo().getEncodedName() + " : "
4781 + "Skipping replaying bulkload event :"
4782 + TextFormat.shortDebugString(bulkLoadEvent)
4783 + " because its sequence id is smaller than this region's lastReplayedOpenRegionSeqId"
4784 + " =" + lastReplayedOpenRegionSeqId);
4785
4786 return;
4787 }
4788
4789 for (StoreDescriptor storeDescriptor : bulkLoadEvent.getStoresList()) {
4790
4791 family = storeDescriptor.getFamilyName().toByteArray();
4792 Store store = getStore(family);
4793 if (store == null) {
4794 LOG.warn(getRegionInfo().getEncodedName() + " : "
4795 + "Received a bulk load marker from primary, but the family is not found. "
4796 + "Ignoring. StoreDescriptor:" + storeDescriptor);
4797 continue;
4798 }
4799
4800 List<String> storeFiles = storeDescriptor.getStoreFileList();
4801 for (String storeFile : storeFiles) {
4802 StoreFileInfo storeFileInfo = null;
4803 try {
4804 storeFileInfo = fs.getStoreFileInfo(Bytes.toString(family), storeFile);
4805 store.bulkLoadHFile(storeFileInfo);
4806 } catch(FileNotFoundException ex) {
4807 LOG.warn(getRegionInfo().getEncodedName() + " : "
4808 + ((storeFileInfo != null) ? storeFileInfo.toString() :
4809 (new Path(Bytes.toString(family), storeFile)).toString())
4810 + " doesn't exist any more. Skip loading the file");
4811 }
4812 }
4813 }
4814 }
4815 if (bulkLoadEvent.getBulkloadSeqNum() > 0) {
4816 mvcc.advanceTo(bulkLoadEvent.getBulkloadSeqNum());
4817 }
4818 } finally {
4819 closeBulkRegionOperation();
4820 }
4821 }
4822
4823
4824
4825
4826 private void dropPrepareFlushIfPossible() {
4827 if (writestate.flushing) {
4828 boolean canDrop = true;
4829 if (prepareFlushResult.storeFlushCtxs != null) {
4830 for (Entry<byte[], StoreFlushContext> entry
4831 : prepareFlushResult.storeFlushCtxs.entrySet()) {
4832 Store store = getStore(entry.getKey());
4833 if (store == null) {
4834 continue;
4835 }
4836 if (store.getSnapshotSize() > 0) {
4837 canDrop = false;
4838 break;
4839 }
4840 }
4841 }
4842
4843
4844
4845 if (canDrop) {
4846 writestate.flushing = false;
4847 this.prepareFlushResult = null;
4848 }
4849 }
4850 }
4851
4852 @Override
4853 public boolean refreshStoreFiles() throws IOException {
4854 return refreshStoreFiles(false);
4855 }
4856
/**
 * Re-reads every store's file list and frees memstore memory that the newly visible files
 * make redundant.
 *
 * <p>On the default replica this is a no-op unless {@code force} is true. For each store whose
 * max sequence id grew after the refresh:
 * <ul>
 *   <li>a superseded prepared flush snapshot is aborted;</li>
 *   <li>memstore contents are dropped when the new sequence id is at least the MVCC read
 *       point.</li>
 * </ul>
 * Afterwards the prepared flush is cleared if possible, the MVCC is advanced to each store's
 * max memstore timestamp, and {@code lastReplayedOpenRegionSeqId} is raised to the smallest
 * store sequence id. Finally {@code notifyAll()} is called on this region.
 *
 * @param force refresh even on the default replica
 * @return {@code true} if any memstore memory was freed
 * @throws IOException if refreshing a store or dropping memstore contents fails
 */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
    justification="Notify is about post replay. Intentional")
protected boolean refreshStoreFiles(boolean force) throws IOException {
  if (!force && ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
    return false;
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug(getRegionInfo().getEncodedName() + " : "
        + "Refreshing store files to see whether we can free up memstore");
  }

  // Sum of memstore/snapshot sizes released during this refresh.
  long totalFreedSize = 0;

  // Smallest max sequence id across stores after the refresh.
  long smallestSeqIdInStores = Long.MAX_VALUE;

  startRegionOperation();
  try {
    // Replay state (flushing, prepareFlushResult, lastReplayedOpenRegionSeqId) is guarded by
    // writestate.
    synchronized (writestate) {
      for (Store store : getStores()) {
        // Remember the max sequence id before refreshing so we can tell whether new files
        // were picked up.
        long maxSeqIdBefore = store.getMaxSequenceId();

        store.refreshStoreFiles();

        long storeSeqId = store.getMaxSequenceId();
        if (storeSeqId < smallestSeqIdInStores) {
          smallestSeqIdInStores = storeSeqId;
        }

        // Only stores that picked up newer files can free memory.
        if (storeSeqId > maxSeqIdBefore) {

          if (writestate.flushing) {
            // If the prepared flush is not newer than the refreshed files, abort this store's
            // prepared snapshot and release its size.
            if (this.prepareFlushResult.flushOpSeqId <= storeSeqId) {
              StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs == null ?
                  null : this.prepareFlushResult.storeFlushCtxs.get(store.getFamily().getName());
              if (ctx != null) {
                long snapshotSize = store.getFlushableSize();
                ctx.abort();
                this.addAndGetGlobalMemstoreSize(-snapshotSize);
                this.prepareFlushResult.storeFlushCtxs.remove(store.getFamily().getName());
                totalFreedSize += snapshotSize;
              }
            }
          }

          // Drop memstore contents if storeSeqId is at least the current MVCC read point.
          totalFreedSize += dropMemstoreContentsForSeqId(storeSeqId, store);
        }
      }

      // Clear the prepared flush if no store still holds a snapshot.
      dropPrepareFlushIfPossible();

      // Advance the MVCC to the max memstore timestamp of every store.
      for (Store s : getStores()) {
        mvcc.advanceTo(s.getMaxMemstoreTS());
      }

      // Raise lastReplayedOpenRegionSeqId to the smallest store sequence id, so that later
      // markers with smaller sequence ids are treated as already applied by the replay
      // methods that compare against it.
      if (this.lastReplayedOpenRegionSeqId < smallestSeqIdInStores) {
        this.lastReplayedOpenRegionSeqId = smallestSeqIdInStores;
      }
    }

    // Wake up threads waiting on this region's monitor (intentional, see SuppressWarnings).
    synchronized (this) {
      notifyAll();
    }
    return totalFreedSize > 0;
  } finally {
    closeRegionOperation();
  }
}
4941
4942 private void logRegionFiles() {
4943 if (LOG.isTraceEnabled()) {
4944 LOG.trace(getRegionInfo().getEncodedName() + " : Store files for region: ");
4945 for (Store s : stores.values()) {
4946 Collection<StoreFile> storeFiles = s.getStorefiles();
4947 if (storeFiles == null) continue;
4948 for (StoreFile sf : storeFiles) {
4949 LOG.trace(getRegionInfo().getEncodedName() + " : " + sf);
4950 }
4951 }
4952 }
4953 }
4954
4955
4956
4957
4958 private void checkTargetRegion(byte[] encodedRegionName, String exceptionMsg, Object payload)
4959 throws WrongRegionException {
4960 if (Bytes.equals(this.getRegionInfo().getEncodedNameAsBytes(), encodedRegionName)) {
4961 return;
4962 }
4963
4964 if (!RegionReplicaUtil.isDefaultReplica(this.getRegionInfo()) &&
4965 Bytes.equals(encodedRegionName,
4966 this.fs.getRegionInfoForFS().getEncodedNameAsBytes())) {
4967 return;
4968 }
4969
4970 throw new WrongRegionException(exceptionMsg + payload
4971 + " targetted for region " + Bytes.toStringBinary(encodedRegionName)
4972 + " does not match this region: " + this.getRegionInfo());
4973 }
4974
4975
4976
4977
4978
4979
4980
4981 protected boolean restoreEdit(final Store s, final Cell cell) {
4982 long kvSize = s.add(cell);
4983 if (this.rsAccounting != null) {
4984 rsAccounting.addAndGetRegionReplayEditsSize(getRegionInfo().getRegionName(), kvSize);
4985 }
4986 return isFlushSize(this.addAndGetGlobalMemstoreSize(kvSize));
4987 }
4988
4989
4990
4991
4992
4993
4994
4995 private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
4996 throws IOException {
4997 FileStatus stat = fs.getFileStatus(p);
4998 if (stat.getLen() > 0) return false;
4999 LOG.warn("File " + p + " is zero-length, deleting.");
5000 fs.delete(p, false);
5001 return true;
5002 }
5003
5004 protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
5005 return new HStore(this, family, this.conf);
5006 }
5007
5008 @Override
5009 public Store getStore(final byte[] column) {
5010 return this.stores.get(column);
5011 }
5012
5013
5014
5015
5016
5017 private Store getStore(Cell cell) {
5018 for (Map.Entry<byte[], Store> famStore : stores.entrySet()) {
5019 if (Bytes.equals(
5020 cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
5021 famStore.getKey(), 0, famStore.getKey().length)) {
5022 return famStore.getValue();
5023 }
5024 }
5025
5026 return null;
5027 }
5028
5029 @Override
5030 public List<Store> getStores() {
5031 List<Store> list = new ArrayList<Store>(stores.size());
5032 list.addAll(stores.values());
5033 return list;
5034 }
5035
5036 @Override
5037 public List<String> getStoreFileList(final byte [][] columns)
5038 throws IllegalArgumentException {
5039 List<String> storeFileNames = new ArrayList<String>();
5040 synchronized(closeLock) {
5041 for(byte[] column : columns) {
5042 Store store = this.stores.get(column);
5043 if (store == null) {
5044 throw new IllegalArgumentException("No column family : " +
5045 new String(column) + " available");
5046 }
5047 Collection<StoreFile> storeFiles = store.getStorefiles();
5048 if (storeFiles == null) continue;
5049 for (StoreFile storeFile: storeFiles) {
5050 storeFileNames.add(storeFile.getPath().toString());
5051 }
5052
5053 logRegionFiles();
5054 }
5055 }
5056 return storeFileNames;
5057 }
5058
5059
5060
5061
5062
5063
5064 void checkRow(final byte [] row, String op) throws IOException {
5065 if (!rowIsInRange(getRegionInfo(), row)) {
5066 throw new WrongRegionException("Requested row out of range for " +
5067 op + " on HRegion " + this + ", startKey='" +
5068 Bytes.toStringBinary(getRegionInfo().getStartKey()) + "', getEndKey()='" +
5069 Bytes.toStringBinary(getRegionInfo().getEndKey()) + "', row='" +
5070 Bytes.toStringBinary(row) + "'");
5071 }
5072 }
5073
5074
5075
5076
5077
5078
5079
5080
5081 public RowLock getRowLock(byte[] row) throws IOException {
5082 return getRowLock(row, false);
5083 }
5084
5085
5086
5087
5088
5089
5090
5091
5092
5093
5094
5095 public RowLock getRowLock(byte[] row, boolean readLock) throws IOException {
5096
5097 checkRow(row, "row lock");
5098
5099 HashedBytes rowKey = new HashedBytes(row);
5100
5101 RowLockContext rowLockContext = null;
5102 RowLockImpl result = null;
5103 TraceScope traceScope = null;
5104
5105
5106 if (Trace.isTracing()) {
5107 traceScope = Trace.startSpan("HRegion.getRowLock");
5108 traceScope.getSpan().addTimelineAnnotation("Getting a " + (readLock?"readLock":"writeLock"));
5109 }
5110
5111 try {
5112
5113
5114 while (result == null) {
5115
5116
5117
5118 rowLockContext = new RowLockContext(rowKey);
5119 RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
5120
5121
5122 if (existingContext != null) {
5123 rowLockContext = existingContext;
5124 }
5125
5126
5127
5128
5129 if (readLock) {
5130 result = rowLockContext.newReadLock();
5131 } else {
5132 result = rowLockContext.newWriteLock();
5133 }
5134 }
5135 if (!result.getLock().tryLock(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
5136 if (traceScope != null) {
5137 traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");
5138 }
5139 result = null;
5140
5141 rowLockContext.cleanUp();
5142 throw new IOException("Timed out waiting for lock for row: " + rowKey);
5143 }
5144 return result;
5145 } catch (InterruptedException ie) {
5146 LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
5147 InterruptedIOException iie = new InterruptedIOException();
5148 iie.initCause(ie);
5149 if (traceScope != null) {
5150 traceScope.getSpan().addTimelineAnnotation("Interrupted exception getting row lock");
5151 }
5152 Thread.currentThread().interrupt();
5153 throw iie;
5154 } finally {
5155 if (traceScope != null) {
5156 traceScope.close();
5157 }
5158 }
5159 }
5160
5161 @Override
5162 public void releaseRowLocks(List<RowLock> rowLocks) {
5163 if (rowLocks != null) {
5164 for (RowLock rowLock : rowLocks) {
5165 rowLock.release();
5166 }
5167 rowLocks.clear();
5168 }
5169 }
5170
/**
 * Per-row lock state stored in {@code lockedRows}, keyed by row.
 *
 * <p>Holds a fair read/write lock for the row and a reference count of the {@link RowLockImpl}
 * instances handed out for it. When the count reaches zero in {@link #cleanUp()}, the context
 * is marked unusable and removed from {@code lockedRows}. From then on, {@code newReadLock} and
 * {@code newWriteLock} return {@code null}, and callers must retry with a new context.
 */
@VisibleForTesting
class RowLockContext {
  private final HashedBytes row;
  // Fair lock shared by all readers/writers of this row.
  final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
  // False once this context has been retired and removed from lockedRows.
  final AtomicBoolean usable = new AtomicBoolean(true);
  // Number of RowLockImpl references currently taken on this context.
  final AtomicInteger count = new AtomicInteger(0);
  // Monitor guarding the usable/retire decision.
  final Object lock = new Object();

  RowLockContext(HashedBytes row) {
    this.row = row;
  }

  /** Returns a handle on the exclusive lock, or {@code null} if this context was retired. */
  RowLockImpl newWriteLock() {
    Lock l = readWriteLock.writeLock();
    return getRowLock(l);
  }
  /** Returns a handle on the shared lock, or {@code null} if this context was retired. */
  RowLockImpl newReadLock() {
    Lock l = readWriteLock.readLock();
    return getRowLock(l);
  }

  /**
   * Takes a reference and returns a handle on {@code l}, or {@code null} if the context is no
   * longer usable. The count is incremented before entering the monitor, so a concurrent
   * {@link #cleanUp()} re-checking the count under the monitor sees this reference and does
   * not retire the context.
   */
  private RowLockImpl getRowLock(Lock l) {
    count.incrementAndGet();
    synchronized (lock) {
      if (usable.get()) {
        return new RowLockImpl(this, l);
      } else {
        return null;
      }
    }
  }

  /**
   * Drops one reference. When no references remain (re-checked under the monitor), marks the
   * context unusable and removes it from {@code lockedRows}.
   */
  void cleanUp() {
    long c = count.decrementAndGet();
    if (c <= 0) {
      synchronized (lock) {
        if (count.get() <= 0 ){
          usable.set(false);
          RowLockContext removed = lockedRows.remove(row);
          assert removed == this: "we should never remove a different context";
        }
      }
    }
  }

  @Override
  public String toString() {
    return "RowLockContext{" +
        "row=" + row +
        ", readWriteLock=" + readWriteLock +
        ", count=" + count +
        '}';
  }
}
5225
5226
5227
5228
5229 public static class RowLockImpl implements RowLock {
5230 private final RowLockContext context;
5231 private final Lock lock;
5232
5233 public RowLockImpl(RowLockContext context, Lock lock) {
5234 this.context = context;
5235 this.lock = lock;
5236 }
5237
5238 public Lock getLock() {
5239 return lock;
5240 }
5241
5242 @VisibleForTesting
5243 public RowLockContext getContext() {
5244 return context;
5245 }
5246
5247 @Override
5248 public void release() {
5249 lock.unlock();
5250 context.cleanUp();
5251 }
5252
5253 @Override
5254 public String toString() {
5255 return "RowLockImpl{" +
5256 "context=" + context +
5257 ", lock=" + lock +
5258 '}';
5259 }
5260 }
5261
5262
5263
5264
5265
5266
5267
5268 private static boolean hasMultipleColumnFamilies(Collection<Pair<byte[], String>> familyPaths) {
5269 boolean multipleFamilies = false;
5270 byte[] family = null;
5271 for (Pair<byte[], String> pair : familyPaths) {
5272 byte[] fam = pair.getFirst();
5273 if (family == null) {
5274 family = fam;
5275 } else if (!Bytes.equals(family, fam)) {
5276 multipleFamilies = true;
5277 break;
5278 }
5279 }
5280 return multipleFamilies;
5281 }
5282
/**
 * Bulk-loads HFiles into the stores of this region.
 *
 * <p>Steps:
 * <ol>
 *   <li>Every (family, path) pair is validated first. Unknown families and other IO errors are
 *       collected and thrown together. {@link WrongRegionException}s (e.g. after a split) are
 *       treated as recoverable: they are logged and {@code false} is returned without loading
 *       anything.</li>
 *   <li>If {@code assignSeqId} is set, a flush is forced to obtain a sequence id for the
 *       loaded files. It fails with an IOException if the flush neither succeeded nor found an
 *       empty memstore.</li>
 *   <li>Each file is then loaded, notifying {@code bulkLoadListener} (if any) before, after,
 *       and on failure. The first load failure is rethrown.</li>
 *   <li>In all cases, if any file was committed and a WAL is present, a bulk load marker is
 *       written to the WAL. If that fails, the region server is aborted.</li>
 * </ol>
 *
 * @param familyPaths pairs of family name and HFile path to load; must not be null
 * @param assignSeqId whether to assign a fresh sequence id (via a flush) to the loaded files
 * @param bulkLoadListener optional callbacks around each file load; may be null
 * @return {@code true} if all files were loaded, {@code false} on a recoverable
 *         (wrong-region) validation failure
 * @throws IOException on validation IO errors, flush failure, or a failed file load
 */
@Override
public boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
    BulkLoadListener bulkLoadListener) throws IOException {
  // -1 unless a sequence id is assigned by the flush below.
  long seqId = -1;
  // Committed store file paths per family; used for the WAL bulk load marker.
  Map<byte[], List<Path>> storeFiles = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
  Preconditions.checkNotNull(familyPaths);

  startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
  try {
    this.writeRequestsCount.increment();

    // Validate all pairs before loading anything.
    List<IOException> ioes = new ArrayList<IOException>();
    List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();
    for (Pair<byte[], String> p : familyPaths) {
      byte[] familyName = p.getFirst();
      String path = p.getSecond();

      Store store = getStore(familyName);
      if (store == null) {
        IOException ioe = new org.apache.hadoop.hbase.DoNotRetryIOException(
            "No such column family " + Bytes.toStringBinary(familyName));
        ioes.add(ioe);
      } else {
        try {
          store.assertBulkLoadHFileOk(new Path(path));
        } catch (WrongRegionException wre) {
          // Recoverable: collected and reported below.
          failures.add(p);
        } catch (IOException ioe) {
          // Unrecoverable: collected and thrown below.
          ioes.add(ioe);
        }
      }
    }

    // Any IO error during validation fails the whole bulk load.
    if (ioes.size() != 0) {
      IOException e = MultipleIOException.createIOException(ioes);
      LOG.error("There were one or more IO errors when checking if the bulk load is ok.", e);
      throw e;
    }

    // Wrong-region failures: load nothing and report false.
    if (failures.size() != 0) {
      StringBuilder list = new StringBuilder();
      for (Pair<byte[], String> p : failures) {
        list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")
            .append(p.getSecond());
      }
      // problem when validating
      LOG.warn("There was a recoverable bulk load failure likely due to a" +
          " split.  These (family, HFile) pairs were not loaded: " + list);
      return false;
    }

    // Obtain a sequence id for the loaded files by forcing a flush. An empty memstore still
    // yields a usable flush sequence id.
    // NOTE(review): the local "fs" below shadows the region's file system field of the same
    // name.
    if (assignSeqId) {
      FlushResult fs = flushcache(true, false);
      if (fs.isFlushSucceeded()) {
        seqId = ((FlushResultImpl)fs).flushSequenceId;
      } else if (fs.getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
        seqId = ((FlushResultImpl)fs).flushSequenceId;
      } else {
        throw new IOException("Could not bulk load with an assigned sequential ID because the "+
          "flush didn't run. Reason for not flushing: " + ((FlushResultImpl)fs).failureReason);
      }
    }

    // Load each file, recording committed paths for the WAL marker.
    for (Pair<byte[], String> p : familyPaths) {
      byte[] familyName = p.getFirst();
      String path = p.getSecond();
      Store store = getStore(familyName);
      try {
        String finalPath = path;
        if (bulkLoadListener != null) {
          finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
        }
        Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId);

        if(storeFiles.containsKey(familyName)) {
          storeFiles.get(familyName).add(commitedStoreFile);
        } else {
          List<Path> storeFileNames = new ArrayList<Path>();
          storeFileNames.add(commitedStoreFile);
          storeFiles.put(familyName, storeFileNames);
        }
        if (bulkLoadListener != null) {
          bulkLoadListener.doneBulkLoad(familyName, path);
        }
      } catch (IOException ioe) {
        // Files loaded before this one stay loaded; they are still recorded in the WAL
        // marker written in the finally block.
        LOG.error("There was a partial failure due to IO when attempting to" +
            " load " + Bytes.toString(p.getFirst()) + " : " + p.getSecond(), ioe);
        if (bulkLoadListener != null) {
          try {
            bulkLoadListener.failedBulkLoad(familyName, path);
          } catch (Exception ex) {
            LOG.error("Error while calling failedBulkLoad for family " +
                Bytes.toString(familyName) + " with path " + path, ex);
          }
        }
        throw ioe;
      }
    }

    return true;
  } finally {
    // Record whatever was committed (even after a partial failure) in the WAL.
    if (wal != null && !storeFiles.isEmpty()) {
      try {
        WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(
            this.getRegionInfo().getTable(),
            ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, seqId);
        WALUtil.writeBulkLoadMarkerAndSync(wal, this.htableDescriptor, getRegionInfo(),
            loadDescriptor, mvcc);
      } catch (IOException ioe) {
        if (this.rsServices != null) {
          // The files are loaded but the WAL marker failed; abort the region server rather
          // than continue with a WAL that does not record the load.
          this.rsServices.abort("Failed to write bulk load event into WAL.", ioe);
        }
      }
    }

    closeBulkRegionOperation();
  }
}
5420
5421 @Override
5422 public boolean equals(Object o) {
5423 return o instanceof HRegion && Bytes.equals(getRegionInfo().getRegionName(),
5424 ((HRegion) o).getRegionInfo().getRegionName());
5425 }
5426
5427 @Override
5428 public int hashCode() {
5429 return Bytes.hashCode(getRegionInfo().getRegionName());
5430 }
5431
5432 @Override
5433 public String toString() {
5434 return getRegionInfo().getRegionNameAsString();
5435 }
5436
5437
5438
5439
5440 class RegionScannerImpl implements RegionScanner {
5441
// Heap over the main scanners (essential families plus any additional scanners), built in
// initializeKVHeap(). nextRaw() treats a null storeHeap as a closed scanner.
KeyValueHeap storeHeap = null;

// Heap over scanners of families loaded on demand (not essential to the filter); stays null
// when there are none.
KeyValueHeap joinedHeap = null;

// Row whose cells still need to be read from joinedHeap. Reset to null by
// populateFromJoinedHeap() once no BETWEEN_CELLS limit was hit.
protected Cell joinedContinuationRow = null;
// When true, next() throws UnknownScannerException.
private boolean filterClosed = false;

// -1 when the scan is a Get scan, 0 otherwise (set in the constructor).
protected final int isScan;
// The scan's stop row, or null when the scan has the empty end row and is not a Get.
protected final byte[] stopRow;
// The region being scanned.
protected final HRegion region;

// MVCC read point chosen at construction and registered in scannerReadPoints.
private final long readPt;
// Maximum result size taken from the Scan.
private final long maxResultSize;
// Context used by next(List) and nextRaw(List); carries only the scan's batch limit.
private final ScannerContext defaultScannerContext;
// The scan's filter wrapped in a FilterWrapper, or null if the scan has no filter.
private final FilterWrapper filter;
5460
5461 @Override
5462 public HRegionInfo getRegionInfo() {
5463 return region.getRegionInfo();
5464 }
5465
/**
 * Creates a scanner over {@code region} for {@code scan}.
 *
 * <p>The MVCC read point for the scan's isolation level is chosen and registered in
 * {@code scannerReadPoints} while holding that map's monitor. Then one store scanner is opened
 * per family in the scan. A family goes into the main heap unless the filter allows it to be
 * loaded on demand, in which case it goes into the joined heap.
 *
 * @param scan the scan specification
 * @param additionalScanners extra scanners added to the main heap; may be null
 * @param region the region to scan
 * @throws IOException if a store scanner cannot be opened (a FileNotFoundException is
 *           converted by handleFileNotFound)
 */
RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
    throws IOException {
  this.region = region;
  this.maxResultSize = scan.getMaxResultSize();
  if (scan.hasFilter()) {
    this.filter = new FilterWrapper(scan.getFilter());
  } else {
    this.filter = null;
  }

  // The default context carries only the scan's batch limit; next(List)/nextRaw(List) use it.
  defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();

  // For non-Get scans an empty stop row means "no stop row" (null).
  if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {
    this.stopRow = null;
  } else {
    this.stopRow = scan.getStopRow();
  }
  // -1 for Get scans, 0 otherwise.
  this.isScan = scan.isGetScan() ? -1 : 0;

  // Choose the read point and register it under the scannerReadPoints monitor.
  IsolationLevel isolationLevel = scan.getIsolationLevel();
  synchronized(scannerReadPoints) {
    this.readPt = getReadpoint(isolationLevel);
    scannerReadPoints.put(this, this.readPt);
  }

  // Split store scanners into the main heap and the on-demand (joined) heap.
  List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
  List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
  if (additionalScanners != null) {
    scanners.addAll(additionalScanners);
  }

  for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
    // NOTE(review): assumes every family in the scan exists in this region (store != null);
    // presumably validated by the caller — confirm.
    Store store = stores.get(entry.getKey());
    KeyValueScanner scanner;
    try {
      scanner = store.getScanner(scan, entry.getValue(), this.readPt);
    } catch (FileNotFoundException e) {
      throw handleFileNotFound(e);
    }
    // A family goes to the joined heap only when the filter exists, on-demand loading is
    // enabled, and the filter says the family is not essential.
    if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
      || this.filter.isFamilyEssential(entry.getKey())) {
      scanners.add(scanner);
    } else {
      joinedScanners.add(scanner);
    }
  }
  initializeKVHeap(scanners, joinedScanners, region);
}
5525
5526 protected void initializeKVHeap(List<KeyValueScanner> scanners,
5527 List<KeyValueScanner> joinedScanners, HRegion region)
5528 throws IOException {
5529 this.storeHeap = new KeyValueHeap(scanners, region.comparator);
5530 if (!joinedScanners.isEmpty()) {
5531 this.joinedHeap = new KeyValueHeap(joinedScanners, region.comparator);
5532 }
5533 }
5534
5535 @Override
5536 public long getMaxResultSize() {
5537 return maxResultSize;
5538 }
5539
5540 @Override
5541 public long getMvccReadPoint() {
5542 return this.readPt;
5543 }
5544
5545 @Override
5546 public int getBatch() {
5547 return this.defaultScannerContext.getBatchLimit();
5548 }
5549
5550
5551
5552
5553
5554
5555 protected void resetFilters() throws IOException {
5556 if (filter != null) {
5557 filter.reset();
5558 }
5559 }
5560
5561 @Override
5562 public boolean next(List<Cell> outResults)
5563 throws IOException {
5564
5565 return next(outResults, defaultScannerContext);
5566 }
5567
5568 @Override
5569 public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext)
5570 throws IOException {
5571 if (this.filterClosed) {
5572 throw new UnknownScannerException("Scanner was closed (timed out?) " +
5573 "after we renewed it. Could be caused by a very slow scanner " +
5574 "or a lengthy garbage collection");
5575 }
5576 startRegionOperation(Operation.SCAN);
5577 readRequestsCount.increment();
5578 try {
5579 return nextRaw(outResults, scannerContext);
5580 } finally {
5581 closeRegionOperation(Operation.SCAN);
5582 }
5583 }
5584
5585 @Override
5586 public boolean nextRaw(List<Cell> outResults) throws IOException {
5587
5588 return nextRaw(outResults, defaultScannerContext);
5589 }
5590
5591 @Override
5592 public boolean nextRaw(List<Cell> outResults, ScannerContext scannerContext)
5593 throws IOException {
5594 if (storeHeap == null) {
5595
5596 throw new UnknownScannerException("Scanner was closed");
5597 }
5598 boolean moreValues;
5599 if (outResults.isEmpty()) {
5600
5601
5602 moreValues = nextInternal(outResults, scannerContext);
5603 } else {
5604 List<Cell> tmpList = new ArrayList<Cell>();
5605 moreValues = nextInternal(tmpList, scannerContext);
5606 outResults.addAll(tmpList);
5607 }
5608
5609
5610
5611
5612 if (!scannerContext.partialResultFormed()) resetFilters();
5613
5614 if (isFilterDoneInternal()) {
5615 moreValues = false;
5616 }
5617 return moreValues;
5618 }
5619
5620
5621
5622
5623 private boolean populateFromJoinedHeap(List<Cell> results, ScannerContext scannerContext)
5624 throws IOException {
5625 assert joinedContinuationRow != null;
5626 boolean moreValues =
5627 populateResult(results, this.joinedHeap, scannerContext,
5628 joinedContinuationRow.getRowArray(), joinedContinuationRow.getRowOffset(),
5629 joinedContinuationRow.getRowLength());
5630
5631 if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
5632
5633 joinedContinuationRow = null;
5634 }
5635
5636
5637 Collections.sort(results, comparator);
5638 return moreValues;
5639 }
5640
5641
5642
5643
5644
5645
5646
5647
5648
5649
5650
5651 private boolean populateResult(List<Cell> results, KeyValueHeap heap,
5652 ScannerContext scannerContext, byte[] currentRow, int offset, short length)
5653 throws IOException {
5654 Cell nextKv;
5655 boolean moreCellsInRow = false;
5656 boolean tmpKeepProgress = scannerContext.getKeepProgress();
5657
5658 LimitScope limitScope = LimitScope.BETWEEN_CELLS;
5659 try {
5660 do {
5661
5662
5663
5664 scannerContext.setKeepProgress(true);
5665 heap.next(results, scannerContext);
5666 scannerContext.setKeepProgress(tmpKeepProgress);
5667
5668 nextKv = heap.peek();
5669 moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);
5670 if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext);
5671
5672 if (scannerContext.checkBatchLimit(limitScope)) {
5673 return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
5674 } else if (scannerContext.checkSizeLimit(limitScope)) {
5675 ScannerContext.NextState state =
5676 moreCellsInRow? NextState.SIZE_LIMIT_REACHED_MID_ROW: NextState.SIZE_LIMIT_REACHED;
5677 return scannerContext.setScannerState(state).hasMoreValues();
5678 } else if (scannerContext.checkTimeLimit(limitScope)) {
5679 ScannerContext.NextState state =
5680 moreCellsInRow? NextState.TIME_LIMIT_REACHED_MID_ROW: NextState.TIME_LIMIT_REACHED;
5681 return scannerContext.setScannerState(state).hasMoreValues();
5682 }
5683 } while (moreCellsInRow);
5684 } catch (FileNotFoundException e) {
5685 throw handleFileNotFound(e);
5686 }
5687 return nextKv != null;
5688 }
5689
5690
5691
5692
5693
5694
5695
5696
5697
5698
5699
5700 private boolean moreCellsInRow(final Cell nextKv, byte[] currentRow, int offset,
5701 short length) {
5702 return nextKv != null && CellUtil.matchingRow(nextKv, currentRow, offset, length);
5703 }
5704
5705
5706
5707
5708 @Override
5709 public synchronized boolean isFilterDone() throws IOException {
5710 return isFilterDoneInternal();
5711 }
5712
5713 private boolean isFilterDoneInternal() throws IOException {
5714 return this.filter != null && this.filter.filterAllRemaining();
5715 }
5716
5717 private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
5718 throws IOException {
5719 if (!results.isEmpty()) {
5720 throw new IllegalArgumentException("First parameter should be an empty list");
5721 }
5722 if (scannerContext == null) {
5723 throw new IllegalArgumentException("Scanner context cannot be null");
5724 }
5725 RpcCallContext rpcCall = RpcServer.getCurrentCall();
5726
5727
5728
5729
5730 int initialBatchProgress = scannerContext.getBatchProgress();
5731 long initialSizeProgress = scannerContext.getSizeProgress();
5732 long initialTimeProgress = scannerContext.getTimeProgress();
5733
5734
5735
5736
5737
5738
5739 while (true) {
5740
5741
5742 if (scannerContext.getKeepProgress()) {
5743
5744 scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
5745 initialTimeProgress);
5746 } else {
5747 scannerContext.clearProgress();
5748 }
5749
5750 if (rpcCall != null) {
5751
5752
5753
5754
5755 long afterTime = rpcCall.disconnectSince();
5756 if (afterTime >= 0) {
5757 throw new CallerDisconnectedException(
5758 "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " +
5759 this + " after " + afterTime + " ms, since " +
5760 "caller disconnected");
5761 }
5762 }
5763
5764
5765 Cell current = this.storeHeap.peek();
5766
5767 byte[] currentRow = null;
5768 int offset = 0;
5769 short length = 0;
5770 if (current != null) {
5771 currentRow = current.getRowArray();
5772 offset = current.getRowOffset();
5773 length = current.getRowLength();
5774 }
5775
5776 boolean stopRow = isStopRow(currentRow, offset, length);
5777
5778
5779
5780
5781 boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();
5782
5783
5784
5785
5786
5787 if (hasFilterRow) {
5788 if (LOG.isTraceEnabled()) {
5789 LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
5790 + " formed. Changing scope of limits that may create partials");
5791 }
5792 scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
5793 scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
5794 }
5795
5796
5797
5798 if (joinedContinuationRow == null) {
5799
5800 if (stopRow) {
5801 if (hasFilterRow) {
5802 filter.filterRowCells(results);
5803 }
5804 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
5805 }
5806
5807
5808
5809 if (filterRowKey(currentRow, offset, length)) {
5810 incrementCountOfRowsFilteredMetric(scannerContext);
5811
5812
5813
5814 incrementCountOfRowsScannedMetric(scannerContext);
5815 boolean moreRows = nextRow(scannerContext, currentRow, offset, length);
5816 if (!moreRows) {
5817 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
5818 }
5819 results.clear();
5820 continue;
5821 }
5822
5823
5824 populateResult(results, this.storeHeap, scannerContext, currentRow, offset, length);
5825
5826 if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
5827 if (hasFilterRow) {
5828 throw new IncompatibleFilterException(
5829 "Filter whose hasFilterRow() returns true is incompatible with scans that must "
5830 + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
5831 }
5832 return true;
5833 }
5834
5835 Cell nextKv = this.storeHeap.peek();
5836 stopRow = nextKv == null ||
5837 isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength());
5838
5839 final boolean isEmptyRow = results.isEmpty();
5840
5841
5842
5843 FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
5844 if (hasFilterRow) {
5845 ret = filter.filterRowCellsWithRet(results);
5846
5847
5848
5849
5850 long timeProgress = scannerContext.getTimeProgress();
5851 if (scannerContext.getKeepProgress()) {
5852 scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
5853 initialTimeProgress);
5854 } else {
5855 scannerContext.clearProgress();
5856 }
5857 scannerContext.setTimeProgress(timeProgress);
5858 scannerContext.incrementBatchProgress(results.size());
5859 for (Cell cell : results) {
5860 scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
5861 }
5862 }
5863
5864 if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
5865 incrementCountOfRowsFilteredMetric(scannerContext);
5866 results.clear();
5867 boolean moreRows = nextRow(scannerContext, currentRow, offset, length);
5868 if (!moreRows) {
5869 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
5870 }
5871
5872
5873
5874 if (!stopRow) continue;
5875 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
5876 }
5877
5878
5879
5880
5881
5882 if (this.joinedHeap != null) {
5883 boolean mayHaveData = joinedHeapMayHaveData(currentRow, offset, length);
5884 if (mayHaveData) {
5885 joinedContinuationRow = current;
5886 populateFromJoinedHeap(results, scannerContext);
5887
5888 if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
5889 return true;
5890 }
5891 }
5892 }
5893 } else {
5894
5895 populateFromJoinedHeap(results, scannerContext);
5896 if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
5897 return true;
5898 }
5899 }
5900
5901
5902 if (joinedContinuationRow != null) {
5903 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
5904 }
5905
5906
5907
5908
5909 if (results.isEmpty()) {
5910 incrementCountOfRowsFilteredMetric(scannerContext);
5911 boolean moreRows = nextRow(scannerContext, currentRow, offset, length);
5912 if (!moreRows) {
5913 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
5914 }
5915 if (!stopRow) continue;
5916 }
5917
5918 if (stopRow) {
5919 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
5920 } else {
5921 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
5922 }
5923 }
5924 }
5925
5926 protected void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
5927 if (scannerContext == null || !scannerContext.isTrackingMetrics()) return;
5928
5929 scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet();
5930 }
5931
5932 protected void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
5933 if (scannerContext == null || !scannerContext.isTrackingMetrics()) return;
5934
5935 scannerContext.getMetrics().countOfRowsScanned.incrementAndGet();
5936 }
5937
5938
5939
5940
5941
5942
5943
5944
5945 private boolean joinedHeapMayHaveData(byte[] currentRow, int offset, short length)
5946 throws IOException {
5947 Cell nextJoinedKv = joinedHeap.peek();
5948 boolean matchCurrentRow =
5949 nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv, currentRow, offset, length);
5950 boolean matchAfterSeek = false;
5951
5952
5953
5954 if (!matchCurrentRow) {
5955 Cell firstOnCurrentRow = KeyValueUtil.createFirstOnRow(currentRow, offset, length);
5956 boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);
5957 matchAfterSeek =
5958 seekSuccessful && joinedHeap.peek() != null
5959 && CellUtil.matchingRow(joinedHeap.peek(), currentRow, offset, length);
5960 }
5961
5962 return matchCurrentRow || matchAfterSeek;
5963 }
5964
5965
5966
5967
5968
5969
5970
5971
5972 private boolean filterRow() throws IOException {
5973
5974
5975 return filter != null && (!filter.hasFilterRow())
5976 && filter.filterRow();
5977 }
5978
5979 private boolean filterRowKey(byte[] row, int offset, short length) throws IOException {
5980 return filter != null
5981 && filter.filterRowKey(row, offset, length);
5982 }
5983
5984 protected boolean nextRow(ScannerContext scannerContext, byte[] currentRow, int offset,
5985 short length) throws IOException {
5986 assert this.joinedContinuationRow == null:
5987 "Trying to go to next row during joinedHeap read.";
5988 Cell next;
5989 while ((next = this.storeHeap.peek()) != null &&
5990 CellUtil.matchingRow(next, currentRow, offset, length)) {
5991 this.storeHeap.next(MOCKED_LIST);
5992 }
5993 resetFilters();
5994
5995
5996 return this.region.getCoprocessorHost() == null
5997 || this.region.getCoprocessorHost()
5998 .postScannerFilterRow(this, currentRow, offset, length);
5999 }
6000
6001 protected boolean isStopRow(byte[] currentRow, int offset, short length) {
6002 return currentRow == null ||
6003 (stopRow != null &&
6004 comparator.compareRows(stopRow, 0, stopRow.length,
6005 currentRow, offset, length) <= isScan);
6006 }
6007
6008 @Override
6009 public synchronized void close() {
6010 if (storeHeap != null) {
6011 storeHeap.close();
6012 storeHeap = null;
6013 }
6014 if (joinedHeap != null) {
6015 joinedHeap.close();
6016 joinedHeap = null;
6017 }
6018
6019 scannerReadPoints.remove(this);
6020 this.filterClosed = true;
6021 }
6022
6023 KeyValueHeap getStoreHeapForTesting() {
6024 return storeHeap;
6025 }
6026
6027 @Override
6028 public synchronized boolean reseek(byte[] row) throws IOException {
6029 if (row == null) {
6030 throw new IllegalArgumentException("Row cannot be null.");
6031 }
6032 boolean result = false;
6033 startRegionOperation();
6034 KeyValue kv = KeyValueUtil.createFirstOnRow(row);
6035 try {
6036
6037 result = this.storeHeap.requestSeek(kv, true, true);
6038 if (this.joinedHeap != null) {
6039 result = this.joinedHeap.requestSeek(kv, true, true) || result;
6040 }
6041 } catch (FileNotFoundException e) {
6042 throw handleFileNotFound(e);
6043 } finally {
6044 closeRegionOperation();
6045 }
6046 return result;
6047 }
6048
6049 private IOException handleFileNotFound(FileNotFoundException fnfe) throws IOException {
6050
6051
6052 try {
6053 region.refreshStoreFiles(true);
6054 return new IOException("unable to read store file");
6055 } catch (IOException e) {
6056 String msg = "a store file got lost: " + fnfe.getMessage();
6057 LOG.error(msg);
6058 LOG.error("unable to refresh store files", e);
6059 abortRegionServer(msg);
6060 return new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " closing");
6061 }
6062 }
6063
6064 private void abortRegionServer(String msg) throws IOException {
6065 if (rsServices instanceof HRegionServer) {
6066 ((HRegionServer)rsServices).abort(msg);
6067 }
6068 throw new UnsupportedOperationException("not able to abort RS after: " + msg);
6069 }
6070 }
6071
6072
6073
6074
6075
6076
6077
6078
6079
6080
6081
6082
6083
6084
6085
6086
6087
6088
6089
6090
6091 static HRegion newHRegion(Path tableDir, WAL wal, FileSystem fs,
6092 Configuration conf, HRegionInfo regionInfo, final HTableDescriptor htd,
6093 RegionServerServices rsServices) {
6094 try {
6095 @SuppressWarnings("unchecked")
6096 Class<? extends HRegion> regionClass =
6097 (Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
6098
6099 Constructor<? extends HRegion> c =
6100 regionClass.getConstructor(Path.class, WAL.class, FileSystem.class,
6101 Configuration.class, HRegionInfo.class, HTableDescriptor.class,
6102 RegionServerServices.class);
6103
6104 return c.newInstance(tableDir, wal, fs, conf, regionInfo, htd, rsServices);
6105 } catch (Throwable e) {
6106
6107 throw new IllegalStateException("Could not instantiate a region instance.", e);
6108 }
6109 }
6110
6111
6112
6113
6114
6115
6116
6117
6118
6119
6120
6121
6122
6123
6124
6125
6126
6127 public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
6128 final Configuration conf, final HTableDescriptor hTableDescriptor)
6129 throws IOException {
6130 return createHRegion(info, rootDir, conf, hTableDescriptor, null);
6131 }
6132
6133
6134
6135
6136
6137
6138
6139
6140
6141
6142
6143 public static void closeHRegion(final HRegion r) throws IOException {
6144 if (r == null) return;
6145 r.close();
6146 if (r.getWAL() == null) return;
6147 r.getWAL().close();
6148 }
6149
6150
6151
6152
6153
6154
6155
6156
6157
6158
6159
6160
6161
6162
6163 public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
6164 final Configuration conf,
6165 final HTableDescriptor hTableDescriptor,
6166 final WAL wal,
6167 final boolean initialize)
6168 throws IOException {
6169 return createHRegion(info, rootDir, conf, hTableDescriptor,
6170 wal, initialize, false);
6171 }
6172
6173
6174
6175
6176
6177
6178
6179
6180
6181
6182
6183
6184
6185
6186
6187 public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
6188 final Configuration conf,
6189 final HTableDescriptor hTableDescriptor,
6190 final WAL wal,
6191 final boolean initialize, final boolean ignoreWAL)
6192 throws IOException {
6193 Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
6194 return createHRegion(info, rootDir, tableDir, conf, hTableDescriptor, wal, initialize,
6195 ignoreWAL);
6196 }
6197
6198
6199
6200
6201
6202
6203
6204
6205
6206
6207
6208
6209
6210
6211
6212
6213 public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
6214 final Path tableDir, final Configuration conf, final HTableDescriptor hTableDescriptor,
6215 final WAL wal, final boolean initialize, final boolean ignoreWAL)
6216 throws IOException {
6217 LOG.info("creating HRegion " + info.getTable().getNameAsString()
6218 + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
6219 " Table name == " + info.getTable().getNameAsString());
6220 FileSystem fs = FileSystem.get(conf);
6221 HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
6222 WAL effectiveWAL = wal;
6223 if (wal == null && !ignoreWAL) {
6224
6225
6226
6227 Configuration confForWAL = new Configuration(conf);
6228 confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
6229 effectiveWAL = (new WALFactory(confForWAL,
6230 Collections.<WALActionsListener>singletonList(new MetricsWAL()),
6231 "hregion-" + RandomStringUtils.randomNumeric(8))).
6232 getWAL(info.getEncodedNameAsBytes());
6233 }
6234 HRegion region = HRegion.newHRegion(tableDir,
6235 effectiveWAL, fs, conf, info, hTableDescriptor, null);
6236 if (initialize) region.initialize(null);
6237 return region;
6238 }
6239
6240 public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
6241 final Configuration conf,
6242 final HTableDescriptor hTableDescriptor,
6243 final WAL wal)
6244 throws IOException {
6245 return createHRegion(info, rootDir, conf, hTableDescriptor, wal, true);
6246 }
6247
6248
6249
6250
6251
6252
6253
6254
6255
6256
6257
6258
6259
6260 public static HRegion openHRegion(final HRegionInfo info,
6261 final HTableDescriptor htd, final WAL wal,
6262 final Configuration conf)
6263 throws IOException {
6264 return openHRegion(info, htd, wal, conf, null, null);
6265 }
6266
6267
6268
6269
6270
6271
6272
6273
6274
6275
6276
6277
6278
6279
6280
6281
6282 public static HRegion openHRegion(final HRegionInfo info,
6283 final HTableDescriptor htd, final WAL wal, final Configuration conf,
6284 final RegionServerServices rsServices,
6285 final CancelableProgressable reporter)
6286 throws IOException {
6287 return openHRegion(FSUtils.getRootDir(conf), info, htd, wal, conf, rsServices, reporter);
6288 }
6289
6290
6291
6292
6293
6294
6295
6296
6297
6298
6299
6300
6301
6302
6303 public static HRegion openHRegion(Path rootDir, final HRegionInfo info,
6304 final HTableDescriptor htd, final WAL wal, final Configuration conf)
6305 throws IOException {
6306 return openHRegion(rootDir, info, htd, wal, conf, null, null);
6307 }
6308
6309
6310
6311
6312
6313
6314
6315
6316
6317
6318
6319
6320
6321
6322
6323
6324 public static HRegion openHRegion(final Path rootDir, final HRegionInfo info,
6325 final HTableDescriptor htd, final WAL wal, final Configuration conf,
6326 final RegionServerServices rsServices,
6327 final CancelableProgressable reporter)
6328 throws IOException {
6329 FileSystem fs = null;
6330 if (rsServices != null) {
6331 fs = rsServices.getFileSystem();
6332 }
6333 if (fs == null) {
6334 fs = FileSystem.get(conf);
6335 }
6336 return openHRegion(conf, fs, rootDir, info, htd, wal, rsServices, reporter);
6337 }
6338
6339
6340
6341
6342
6343
6344
6345
6346
6347
6348
6349
6350
6351
6352
6353 public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
6354 final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final WAL wal)
6355 throws IOException {
6356 return openHRegion(conf, fs, rootDir, info, htd, wal, null, null);
6357 }
6358
6359
6360
6361
6362
6363
6364
6365
6366
6367
6368
6369
6370
6371
6372
6373
6374
6375 public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
6376 final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final WAL wal,
6377 final RegionServerServices rsServices, final CancelableProgressable reporter)
6378 throws IOException {
6379 Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
6380 return openHRegion(conf, fs, rootDir, tableDir, info, htd, wal, rsServices, reporter);
6381 }
6382
6383
6384
6385
6386
6387
6388
6389
6390
6391
6392
6393
6394
6395
6396
6397
6398
6399 public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
6400 final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd,
6401 final WAL wal, final RegionServerServices rsServices,
6402 final CancelableProgressable reporter)
6403 throws IOException {
6404 if (info == null) throw new NullPointerException("Passed region info is null");
6405 if (LOG.isDebugEnabled()) {
6406 LOG.debug("Opening region: " + info);
6407 }
6408 HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
6409 return r.openHRegion(reporter);
6410 }
6411
6412
6413
6414
6415
6416
6417
6418
6419
6420 public static HRegion openHRegion(final HRegion other, final CancelableProgressable reporter)
6421 throws IOException {
6422 HRegionFileSystem regionFs = other.getRegionFileSystem();
6423 HRegion r = newHRegion(regionFs.getTableDir(), other.getWAL(), regionFs.getFileSystem(),
6424 other.baseConf, other.getRegionInfo(), other.getTableDesc(), null);
6425 return r.openHRegion(reporter);
6426 }
6427
6428 public static Region openHRegion(final Region other, final CancelableProgressable reporter)
6429 throws IOException {
6430 return openHRegion((HRegion)other, reporter);
6431 }
6432
6433
6434
6435
6436
6437
6438
6439 protected HRegion openHRegion(final CancelableProgressable reporter)
6440 throws IOException {
6441
6442 checkCompressionCodecs();
6443
6444
6445 checkEncryption();
6446
6447 checkClassLoading();
6448 this.openSeqNum = initialize(reporter);
6449 this.mvcc.advanceTo(openSeqNum);
6450 if (wal != null && getRegionServerServices() != null && !writestate.readOnly
6451 && !recovering) {
6452
6453
6454
6455 writeRegionOpenMarker(wal, openSeqNum);
6456 }
6457 return this;
6458 }
6459
6460 public static void warmupHRegion(final HRegionInfo info,
6461 final HTableDescriptor htd, final WAL wal, final Configuration conf,
6462 final RegionServerServices rsServices,
6463 final CancelableProgressable reporter)
6464 throws IOException {
6465
6466 if (info == null) throw new NullPointerException("Passed region info is null");
6467
6468 if (LOG.isDebugEnabled()) {
6469 LOG.debug("HRegion.Warming up region: " + info);
6470 }
6471
6472 Path rootDir = FSUtils.getRootDir(conf);
6473 Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
6474
6475 FileSystem fs = null;
6476 if (rsServices != null) {
6477 fs = rsServices.getFileSystem();
6478 }
6479 if (fs == null) {
6480 fs = FileSystem.get(conf);
6481 }
6482
6483 HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, null);
6484 r.initializeWarmup(reporter);
6485 }
6486
6487
6488 private void checkCompressionCodecs() throws IOException {
6489 for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
6490 CompressionTest.testCompression(fam.getCompression());
6491 CompressionTest.testCompression(fam.getCompactionCompression());
6492 }
6493 }
6494
6495 private void checkEncryption() throws IOException {
6496 for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
6497 EncryptionTest.testEncryption(conf, fam.getEncryptionType(), fam.getEncryptionKey());
6498 }
6499 }
6500
6501 private void checkClassLoading() throws IOException {
6502 RegionSplitPolicy.getSplitPolicyClass(this.htableDescriptor, conf);
6503 RegionCoprocessorHost.testTableCoprocessorAttrs(conf, this.htableDescriptor);
6504 }
6505
6506
6507
6508
6509
6510
6511 HRegion createDaughterRegionFromSplits(final HRegionInfo hri) throws IOException {
6512
6513 fs.commitDaughterRegion(hri);
6514
6515
6516 HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(), fs.getFileSystem(),
6517 this.getBaseConf(), hri, this.getTableDesc(), rsServices);
6518 r.readRequestsCount.set(this.getReadRequestsCount() / 2);
6519 r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
6520 return r;
6521 }
6522
6523
6524
6525
6526
6527
6528
6529 HRegion createMergedRegionFromMerges(final HRegionInfo mergedRegionInfo,
6530 final HRegion region_b) throws IOException {
6531 HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(),
6532 fs.getFileSystem(), this.getBaseConf(), mergedRegionInfo,
6533 this.getTableDesc(), this.rsServices);
6534 r.readRequestsCount.set(this.getReadRequestsCount()
6535 + region_b.getReadRequestsCount());
6536 r.writeRequestsCount.set(this.getWriteRequestsCount()
6537
6538 + region_b.getWriteRequestsCount());
6539 this.fs.commitMergedRegion(mergedRegionInfo);
6540 return r;
6541 }
6542
6543
6544
6545
6546
6547
6548
6549
6550
6551
6552
6553
6554 public static void addRegionToMETA(final HRegion meta, final HRegion r) throws IOException {
6555 meta.checkResources();
6556
6557 byte[] row = r.getRegionInfo().getRegionName();
6558 final long now = EnvironmentEdgeManager.currentTime();
6559 final List<Cell> cells = new ArrayList<Cell>(2);
6560 cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
6561 HConstants.REGIONINFO_QUALIFIER, now,
6562 r.getRegionInfo().toByteArray()));
6563
6564 cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
6565 HConstants.META_VERSION_QUALIFIER, now,
6566 Bytes.toBytes(HConstants.META_VERSION)));
6567 meta.put(row, HConstants.CATALOG_FAMILY, cells);
6568 }
6569
6570
6571
6572
6573
6574
6575
6576
6577
6578 @Deprecated
6579 public static Path getRegionDir(final Path tabledir, final String name) {
6580 return new Path(tabledir, name);
6581 }
6582
6583
6584
6585
6586
6587
6588
6589
6590
6591 @Deprecated
6592 @VisibleForTesting
6593 public static Path getRegionDir(final Path rootdir, final HRegionInfo info) {
6594 return new Path(
6595 FSUtils.getTableDir(rootdir, info.getTable()), info.getEncodedName());
6596 }
6597
6598
6599
6600
6601
6602
6603
6604
6605
6606 public static boolean rowIsInRange(HRegionInfo info, final byte [] row) {
6607 return ((info.getStartKey().length == 0) ||
6608 (Bytes.compareTo(info.getStartKey(), row) <= 0)) &&
6609 ((info.getEndKey().length == 0) ||
6610 (Bytes.compareTo(info.getEndKey(), row) > 0));
6611 }
6612
6613 public static boolean rowIsInRange(HRegionInfo info, final byte [] row, final int offset,
6614 final short length) {
6615 return ((info.getStartKey().length == 0) ||
6616 (Bytes.compareTo(info.getStartKey(), 0, info.getStartKey().length,
6617 row, offset, length) <= 0)) &&
6618 ((info.getEndKey().length == 0) ||
6619 (Bytes.compareTo(info.getEndKey(), 0, info.getEndKey().length, row, offset, length) > 0));
6620 }
6621
6622
6623
6624
6625
6626
6627
6628 public static HRegion mergeAdjacent(final HRegion srcA, final HRegion srcB)
6629 throws IOException {
6630 HRegion a = srcA;
6631 HRegion b = srcB;
6632
6633
6634
6635 if (srcA.getRegionInfo().getStartKey() == null) {
6636 if (srcB.getRegionInfo().getStartKey() == null) {
6637 throw new IOException("Cannot merge two regions with null start key");
6638 }
6639
6640 } else if ((srcB.getRegionInfo().getStartKey() == null) ||
6641 (Bytes.compareTo(srcA.getRegionInfo().getStartKey(),
6642 srcB.getRegionInfo().getStartKey()) > 0)) {
6643 a = srcB;
6644 b = srcA;
6645 }
6646
6647 if (!(Bytes.compareTo(a.getRegionInfo().getEndKey(),
6648 b.getRegionInfo().getStartKey()) == 0)) {
6649 throw new IOException("Cannot merge non-adjacent regions");
6650 }
6651 return merge(a, b);
6652 }
6653
6654
6655
6656
6657
6658
6659
6660
6661
6662 public static HRegion merge(final HRegion a, final HRegion b) throws IOException {
6663 if (!a.getRegionInfo().getTable().equals(b.getRegionInfo().getTable())) {
6664 throw new IOException("Regions do not belong to the same table");
6665 }
6666
6667 FileSystem fs = a.getRegionFileSystem().getFileSystem();
6668
6669 a.flush(true);
6670 b.flush(true);
6671
6672
6673 a.compact(true);
6674 if (LOG.isDebugEnabled()) {
6675 LOG.debug("Files for region: " + a);
6676 a.getRegionFileSystem().logFileSystemState(LOG);
6677 }
6678 b.compact(true);
6679 if (LOG.isDebugEnabled()) {
6680 LOG.debug("Files for region: " + b);
6681 b.getRegionFileSystem().logFileSystemState(LOG);
6682 }
6683
6684 RegionMergeTransactionImpl rmt = new RegionMergeTransactionImpl(a, b, true);
6685 if (!rmt.prepare(null)) {
6686 throw new IOException("Unable to merge regions " + a + " and " + b);
6687 }
6688 HRegionInfo mergedRegionInfo = rmt.getMergedRegionInfo();
6689 LOG.info("starting merge of regions: " + a + " and " + b
6690 + " into new region " + mergedRegionInfo.getRegionNameAsString()
6691 + " with start key <"
6692 + Bytes.toStringBinary(mergedRegionInfo.getStartKey())
6693 + "> and end key <"
6694 + Bytes.toStringBinary(mergedRegionInfo.getEndKey()) + ">");
6695 HRegion dstRegion;
6696 try {
6697 dstRegion = (HRegion)rmt.execute(null, null);
6698 } catch (IOException ioe) {
6699 rmt.rollback(null, null);
6700 throw new IOException("Failed merging region " + a + " and " + b
6701 + ", and successfully rolled back");
6702 }
6703 dstRegion.compact(true);
6704
6705 if (LOG.isDebugEnabled()) {
6706 LOG.debug("Files for new region");
6707 dstRegion.getRegionFileSystem().logFileSystemState(LOG);
6708 }
6709
6710 if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
6711 throw new IOException("Merged region " + dstRegion
6712 + " still has references after the compaction, is compaction canceled?");
6713 }
6714
6715
6716 HFileArchiver.archiveRegion(a.getBaseConf(), fs, a.getRegionInfo());
6717
6718 HFileArchiver.archiveRegion(b.getBaseConf(), fs, b.getRegionInfo());
6719
6720 LOG.info("merge completed. New region is " + dstRegion);
6721 return dstRegion;
6722 }
6723
6724 @Override
6725 public Result get(final Get get) throws IOException {
6726 checkRow(get.getRow(), "Get");
6727
6728 if (get.hasFamilies()) {
6729 for (byte [] family: get.familySet()) {
6730 checkFamily(family);
6731 }
6732 } else {
6733 for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
6734 get.addFamily(family);
6735 }
6736 }
6737 List<Cell> results = get(get, true);
6738 boolean stale = this.getRegionInfo().getReplicaId() != 0;
6739 return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
6740 }
6741
6742 @Override
6743 public List<Cell> get(Get get, boolean withCoprocessor) throws IOException {
6744
6745 List<Cell> results = new ArrayList<Cell>();
6746
6747
6748 if (withCoprocessor && (coprocessorHost != null)) {
6749 if (coprocessorHost.preGet(get, results)) {
6750 return results;
6751 }
6752 }
6753
6754 Scan scan = new Scan(get);
6755
6756 RegionScanner scanner = null;
6757 try {
6758 scanner = getScanner(scan);
6759 scanner.next(results);
6760 } finally {
6761 if (scanner != null)
6762 scanner.close();
6763 }
6764
6765
6766 if (withCoprocessor && (coprocessorHost != null)) {
6767 coprocessorHost.postGet(get, results);
6768 }
6769
6770
6771 if (this.metricsRegion != null) {
6772 long totalSize = 0L;
6773 for (Cell cell : results) {
6774 totalSize += CellUtil.estimatedSerializedSizeOf(cell);
6775 }
6776 this.metricsRegion.updateGet(totalSize);
6777 }
6778
6779 return results;
6780 }
6781
6782 @Override
6783 public void mutateRow(RowMutations rm) throws IOException {
6784
6785 mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
6786 }
6787
6788
6789
6790
6791
6792 public void mutateRowsWithLocks(Collection<Mutation> mutations,
6793 Collection<byte[]> rowsToLock) throws IOException {
6794 mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
6795 }
6796
6797
6798
6799
6800
6801
6802
6803
6804
6805
6806
6807
6808
6809 @Override
6810 public void mutateRowsWithLocks(Collection<Mutation> mutations,
6811 Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
6812 MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
6813 processRowsWithLocks(proc, -1, nonceGroup, nonce);
6814 }
6815
6816
6817
6818
6819 public ClientProtos.RegionLoadStats getRegionStats() {
6820 if (!regionStatsEnabled) {
6821 return null;
6822 }
6823 ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder();
6824 stats.setMemstoreLoad((int) (Math.min(100, (this.memstoreSize.get() * 100) / this
6825 .memstoreFlushSize)));
6826 stats.setHeapOccupancy((int)rsServices.getHeapMemoryManager().getHeapOccupancyPercent()*100);
6827 stats.setCompactionPressure((int)rsServices.getCompactionPressure()*100 > 100 ? 100 :
6828 (int)rsServices.getCompactionPressure()*100);
6829 return stats.build();
6830 }
6831
6832 @Override
6833 public void processRowsWithLocks(RowProcessor<?,?> processor) throws IOException {
6834 processRowsWithLocks(processor, rowProcessorTimeout, HConstants.NO_NONCE,
6835 HConstants.NO_NONCE);
6836 }
6837
6838 @Override
6839 public void processRowsWithLocks(RowProcessor<?,?> processor, long nonceGroup, long nonce)
6840 throws IOException {
6841 processRowsWithLocks(processor, rowProcessorTimeout, nonceGroup, nonce);
6842 }
6843
6844 @Override
6845 public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout,
6846 long nonceGroup, long nonce) throws IOException {
6847
6848 for (byte[] row : processor.getRowsToLock()) {
6849 checkRow(row, "processRowsWithLocks");
6850 }
6851 if (!processor.readOnly()) {
6852 checkReadOnly();
6853 }
6854 checkResources();
6855
6856 startRegionOperation();
6857 WALEdit walEdit = new WALEdit();
6858
6859
6860 try {
6861 processor.preProcess(this, walEdit);
6862 } catch (IOException e) {
6863 closeRegionOperation();
6864 throw e;
6865 }
6866
6867 if (processor.readOnly()) {
6868 try {
6869 long now = EnvironmentEdgeManager.currentTime();
6870 doProcessRowWithTimeout(
6871 processor, now, this, null, null, timeout);
6872 processor.postProcess(this, walEdit, true);
6873 } finally {
6874 closeRegionOperation();
6875 }
6876 return;
6877 }
6878
6879 MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
6880 boolean locked;
6881 boolean walSyncSuccessful = false;
6882 List<RowLock> acquiredRowLocks;
6883 long addedSize = 0;
6884 List<Mutation> mutations = new ArrayList<Mutation>();
6885 Collection<byte[]> rowsToLock = processor.getRowsToLock();
6886 long mvccNum = 0;
6887 WALKey walKey = null;
6888 try {
6889
6890 acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
6891 for (byte[] row : rowsToLock) {
6892
6893
6894 acquiredRowLocks.add(getRowLock(row));
6895 }
6896
6897 lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
6898 locked = true;
6899
6900 long now = EnvironmentEdgeManager.currentTime();
6901 try {
6902
6903
6904 doProcessRowWithTimeout(
6905 processor, now, this, mutations, walEdit, timeout);
6906
6907 if (!mutations.isEmpty()) {
6908
6909
6910 processor.preBatchMutate(this, walEdit);
6911
6912 long txid = 0;
6913
6914 if (!walEdit.isEmpty()) {
6915
6916 walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
6917 this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
6918 processor.getClusterIds(), nonceGroup, nonce, mvcc);
6919 txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
6920 walKey, walEdit, false);
6921 }
6922 if(walKey == null){
6923
6924
6925 walKey = this.appendEmptyEdit(this.wal);
6926 }
6927
6928
6929 writeEntry = walKey.getWriteEntry();
6930 mvccNum = walKey.getSequenceId();
6931
6932
6933
6934
6935 for (Mutation m : mutations) {
6936
6937 rewriteCellTags(m.getFamilyCellMap(), m);
6938
6939 for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
6940 Cell cell = cellScanner.current();
6941 CellUtil.setSequenceId(cell, mvccNum);
6942 Store store = getStore(cell);
6943 if (store == null) {
6944 checkFamily(CellUtil.cloneFamily(cell));
6945
6946 }
6947 addedSize += store.add(cell);
6948 }
6949 }
6950
6951
6952 if (locked) {
6953 this.updatesLock.readLock().unlock();
6954 locked = false;
6955 }
6956
6957
6958 releaseRowLocks(acquiredRowLocks);
6959
6960
6961 if (txid != 0) {
6962 syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
6963 }
6964 walSyncSuccessful = true;
6965
6966 processor.postBatchMutate(this);
6967 }
6968 } finally {
6969
6970
6971
6972 if (!mutations.isEmpty() && !walSyncSuccessful) {
6973 LOG.warn("Wal sync failed. Roll back " + mutations.size() +
6974 " memstore keyvalues for row(s):" + StringUtils.byteToHexString(
6975 processor.getRowsToLock().iterator().next()) + "...");
6976 for (Mutation m : mutations) {
6977 for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
6978 Cell cell = cellScanner.current();
6979 getStore(cell).rollback(cell);
6980 }
6981 }
6982 if (writeEntry != null) {
6983 mvcc.complete(writeEntry);
6984 writeEntry = null;
6985 }
6986 }
6987
6988 if (writeEntry != null) {
6989 mvcc.completeAndWait(writeEntry);
6990 }
6991 if (locked) {
6992 this.updatesLock.readLock().unlock();
6993 }
6994
6995 releaseRowLocks(acquiredRowLocks);
6996 }
6997
6998
6999 processor.postProcess(this, walEdit, walSyncSuccessful);
7000
7001 } finally {
7002 closeRegionOperation();
7003 if (!mutations.isEmpty() &&
7004 isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
7005 requestFlush();
7006 }
7007 }
7008 }
7009
7010 private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
7011 final long now,
7012 final HRegion region,
7013 final List<Mutation> mutations,
7014 final WALEdit walEdit,
7015 final long timeout) throws IOException {
7016
7017 if (timeout < 0) {
7018 try {
7019 processor.process(now, region, mutations, walEdit);
7020 } catch (IOException e) {
7021 LOG.warn("RowProcessor:" + processor.getClass().getName() +
7022 " throws Exception on row(s):" +
7023 Bytes.toStringBinary(
7024 processor.getRowsToLock().iterator().next()) + "...", e);
7025 throw e;
7026 }
7027 return;
7028 }
7029
7030
7031 FutureTask<Void> task =
7032 new FutureTask<Void>(new Callable<Void>() {
7033 @Override
7034 public Void call() throws IOException {
7035 try {
7036 processor.process(now, region, mutations, walEdit);
7037 return null;
7038 } catch (IOException e) {
7039 LOG.warn("RowProcessor:" + processor.getClass().getName() +
7040 " throws Exception on row(s):" +
7041 Bytes.toStringBinary(
7042 processor.getRowsToLock().iterator().next()) + "...", e);
7043 throw e;
7044 }
7045 }
7046 });
7047 rowProcessorExecutor.execute(task);
7048 try {
7049 task.get(timeout, TimeUnit.MILLISECONDS);
7050 } catch (TimeoutException te) {
7051 LOG.error("RowProcessor timeout:" + timeout + " ms on row(s):" +
7052 Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) +
7053 "...");
7054 throw new IOException(te);
7055 } catch (Exception e) {
7056 throw new IOException(e);
7057 }
7058 }
7059
7060
7061
7062
7063
7064
7065 private static List<Tag> carryForwardTags(final Cell cell, final List<Tag> tags) {
7066 if (cell.getTagsLength() <= 0) return tags;
7067 List<Tag> newTags = tags == null? new ArrayList<Tag>():
7068 Iterator<Tag> i =
7069 CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
7070 while (i.hasNext()) newTags.add(i.next());
7071 return newTags;
7072 }
7073
7074
7075
7076
7077
7078
7079
7080
7081
7082
7083 private List<Cell> doGet(final Store store, final byte [] row,
7084 final Map.Entry<byte[], List<Cell>> family, final TimeRange tr)
7085 throws IOException {
7086
7087
7088
7089
7090 Collections.sort(family.getValue(), store.getComparator());
7091
7092 Get get = new Get(row);
7093 for (Cell cell : family.getValue()) {
7094 get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));
7095 }
7096 if (tr != null) get.setTimeRange(tr.getMin(), tr.getMax());
7097 return get(get, false);
7098 }
7099
7100 public Result append(Append append) throws IOException {
7101 return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
7102 }
7103
7104
7105
7106
7107
7108 @Override
7109 public Result append(Append mutate, long nonceGroup, long nonce) throws IOException {
7110 Operation op = Operation.APPEND;
7111 byte[] row = mutate.getRow();
7112 checkRow(row, op.toString());
7113 boolean flush = false;
7114 Durability durability = getEffectiveDurability(mutate.getDurability());
7115 boolean writeToWAL = durability != Durability.SKIP_WAL;
7116 WALEdit walEdits = null;
7117 List<Cell> allKVs = new ArrayList<Cell>(mutate.size());
7118 Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
7119 long size = 0;
7120 long txid = 0;
7121 checkReadOnly();
7122 checkResources();
7123
7124 startRegionOperation(op);
7125 this.writeRequestsCount.increment();
7126 RowLock rowLock = null;
7127 WALKey walKey = null;
7128 boolean doRollBackMemstore = false;
7129 try {
7130 rowLock = getRowLock(row);
7131 assert rowLock != null;
7132 try {
7133 lock(this.updatesLock.readLock());
7134 try {
7135
7136
7137 mvcc.await();
7138 if (this.coprocessorHost != null) {
7139 Result r = this.coprocessorHost.preAppendAfterRowLock(mutate);
7140 if (r!= null) {
7141 return r;
7142 }
7143 }
7144 long now = EnvironmentEdgeManager.currentTime();
7145
7146 for (Map.Entry<byte[], List<Cell>> family : mutate.getFamilyCellMap().entrySet()) {
7147 Store store = stores.get(family.getKey());
7148 List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());
7149
7150 List<Cell> results = doGet(store, row, family, null);
7151
7152
7153
7154
7155
7156
7157
7158 int idx = 0;
7159 for (Cell cell : family.getValue()) {
7160 Cell newCell;
7161 Cell oldCell = null;
7162 if (idx < results.size()
7163 && CellUtil.matchingQualifier(results.get(idx), cell)) {
7164 oldCell = results.get(idx);
7165 long ts = Math.max(now, oldCell.getTimestamp());
7166
7167
7168
7169 List<Tag> tags = Tag.carryForwardTags(null, oldCell);
7170 tags = Tag.carryForwardTags(tags, cell);
7171 tags = carryForwardTTLTag(tags, mutate);
7172
7173 newCell = getNewCell(row, ts, cell, oldCell, Tag.fromList(tags));
7174
7175 idx++;
7176 } else {
7177
7178 CellUtil.updateLatestStamp(cell, now);
7179
7180
7181 newCell = getNewCell(mutate, cell);
7182 }
7183
7184
7185 if (coprocessorHost != null) {
7186 newCell = coprocessorHost.postMutationBeforeWAL(RegionObserver.MutationType.APPEND,
7187 mutate, oldCell, newCell);
7188 }
7189 kvs.add(newCell);
7190
7191
7192 if (writeToWAL) {
7193 if (walEdits == null) {
7194 walEdits = new WALEdit();
7195 }
7196 walEdits.add(newCell);
7197 }
7198 }
7199
7200
7201 tempMemstore.put(store, kvs);
7202 }
7203
7204
7205 if (walEdits != null && !walEdits.isEmpty()) {
7206 if (writeToWAL) {
7207
7208
7209
7210
7211 walKey = new HLogKey(
7212 getRegionInfo().getEncodedNameAsBytes(),
7213 this.htableDescriptor.getTableName(),
7214 WALKey.NO_SEQUENCE_ID,
7215 nonceGroup,
7216 nonce,
7217 mvcc);
7218 txid =
7219 this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits, true);
7220 } else {
7221 recordMutationWithoutWal(mutate.getFamilyCellMap());
7222 }
7223 }
7224 if (walKey == null) {
7225
7226 walKey = this.appendEmptyEdit(this.wal);
7227 }
7228
7229
7230 walKey.getWriteEntry();
7231
7232
7233 if (!tempMemstore.isEmpty()) {
7234 for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
7235 Store store = entry.getKey();
7236 if (store.getFamily().getMaxVersions() == 1) {
7237
7238
7239 size += store.upsert(entry.getValue(), getSmallestReadPoint());
7240 } else {
7241
7242 for (Cell cell: entry.getValue()) {
7243
7244
7245 CellUtil.setSequenceId(cell, walKey.getWriteEntry().getWriteNumber());
7246 size += store.add(cell);
7247 doRollBackMemstore = true;
7248 }
7249 }
7250
7251
7252 allKVs.addAll(entry.getValue());
7253 }
7254
7255 size = this.addAndGetGlobalMemstoreSize(size);
7256 flush = isFlushSize(size);
7257 }
7258 } finally {
7259 this.updatesLock.readLock().unlock();
7260 }
7261
7262 } finally {
7263 rowLock.release();
7264 rowLock = null;
7265 }
7266
7267 if(txid != 0){
7268 syncOrDefer(txid, durability);
7269 }
7270 doRollBackMemstore = false;
7271 } finally {
7272 if (rowLock != null) {
7273 rowLock.release();
7274 }
7275
7276 WriteEntry we = walKey != null? walKey.getWriteEntry(): null;
7277 if (doRollBackMemstore) {
7278 rollbackMemstore(allKVs);
7279 if (we != null) mvcc.complete(we);
7280 } else if (we != null) {
7281 mvcc.completeAndWait(we);
7282 }
7283
7284 closeRegionOperation(op);
7285 }
7286
7287 if (this.metricsRegion != null) {
7288 this.metricsRegion.updateAppend();
7289 }
7290
7291 if (flush) {
7292
7293 requestFlush();
7294 }
7295
7296 return mutate.isReturnResults() ? Result.create(allKVs) : null;
7297 }
7298
7299 private static Cell getNewCell(final byte [] row, final long ts, final Cell cell,
7300 final Cell oldCell, final byte [] tagBytes) {
7301
7302 Cell newCell = new KeyValue(row.length, cell.getFamilyLength(),
7303 cell.getQualifierLength(), ts, KeyValue.Type.Put,
7304 oldCell.getValueLength() + cell.getValueLength(),
7305 tagBytes == null? 0: tagBytes.length);
7306
7307 System.arraycopy(cell.getRowArray(), cell.getRowOffset(),
7308 newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());
7309 System.arraycopy(cell.getFamilyArray(), cell.getFamilyOffset(),
7310 newCell.getFamilyArray(), newCell.getFamilyOffset(),
7311 cell.getFamilyLength());
7312 System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(),
7313 newCell.getQualifierArray(), newCell.getQualifierOffset(),
7314 cell.getQualifierLength());
7315
7316 System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(),
7317 newCell.getValueArray(), newCell.getValueOffset(),
7318 oldCell.getValueLength());
7319 System.arraycopy(cell.getValueArray(), cell.getValueOffset(),
7320 newCell.getValueArray(),
7321 newCell.getValueOffset() + oldCell.getValueLength(),
7322 cell.getValueLength());
7323
7324 if (tagBytes != null) {
7325 System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(),
7326 tagBytes.length);
7327 }
7328 return newCell;
7329 }
7330
7331 private static Cell getNewCell(final Mutation mutate, final Cell cell) {
7332 Cell newCell = null;
7333 if (mutate.getTTL() != Long.MAX_VALUE) {
7334
7335 newCell = new KeyValue(cell.getRowArray(), cell.getRowOffset(),
7336 cell.getRowLength(),
7337 cell.getFamilyArray(), cell.getFamilyOffset(),
7338 cell.getFamilyLength(),
7339 cell.getQualifierArray(), cell.getQualifierOffset(),
7340 cell.getQualifierLength(),
7341 cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
7342 cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
7343 carryForwardTTLTag(mutate));
7344 } else {
7345 newCell = cell;
7346 }
7347 return newCell;
7348 }
7349
7350 public Result increment(Increment increment) throws IOException {
7351 return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
7352 }
7353
7354
7355
7356
7357
7358
7359
7360
7361
7362 @Override
7363 public Result increment(Increment mutation, long nonceGroup, long nonce)
7364 throws IOException {
7365 Operation op = Operation.INCREMENT;
7366 checkReadOnly();
7367 checkResources();
7368 checkRow(mutation.getRow(), op.toString());
7369 checkFamilies(mutation.getFamilyCellMap().keySet());
7370 startRegionOperation(op);
7371 this.writeRequestsCount.increment();
7372 try {
7373
7374
7375
7376
7377
7378
7379
7380
7381
7382
7383
7384
7385
7386
7387
7388
7389 return doIncrement(mutation, nonceGroup, nonce);
7390 } finally {
7391 if (this.metricsRegion != null) this.metricsRegion.updateIncrement();
7392 closeRegionOperation(op);
7393 }
7394 }
7395
7396 private Result doIncrement(Increment increment, long nonceGroup, long nonce) throws IOException {
7397 RowLock rowLock = null;
7398 WALKey walKey = null;
7399 boolean doRollBackMemstore = false;
7400 long accumulatedResultSize = 0;
7401 List<Cell> allKVs = new ArrayList<Cell>(increment.size());
7402 List<Cell> memstoreCells = new ArrayList<Cell>();
7403 Durability effectiveDurability = getEffectiveDurability(increment.getDurability());
7404 try {
7405 rowLock = getRowLock(increment.getRow());
7406 long txid = 0;
7407 try {
7408 lock(this.updatesLock.readLock());
7409 try {
7410
7411
7412 this.mvcc.await();
7413 if (this.coprocessorHost != null) {
7414 Result r = this.coprocessorHost.preIncrementAfterRowLock(increment);
7415 if (r != null) return r;
7416 }
7417 long now = EnvironmentEdgeManager.currentTime();
7418 final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL;
7419 WALEdit walEdits = null;
7420
7421
7422 Map<Store, List<Cell>> forMemStore = new HashMap<Store, List<Cell>>();
7423 for (Map.Entry<byte [], List<Cell>> entry: increment.getFamilyCellMap().entrySet()) {
7424 byte [] columnFamilyName = entry.getKey();
7425 List<Cell> increments = entry.getValue();
7426 Store store = this.stores.get(columnFamilyName);
7427
7428
7429 List<Cell> results = applyIncrementsToColumnFamily(increment, columnFamilyName,
7430 sort(increments, store.getComparator()), now,
7431 MultiVersionConcurrencyControl.NO_WRITE_NUMBER, allKVs, null);
7432 if (!results.isEmpty()) {
7433 forMemStore.put(store, results);
7434
7435 if (writeToWAL) {
7436 if (walEdits == null) walEdits = new WALEdit();
7437 walEdits.getCells().addAll(results);
7438 }
7439 }
7440 }
7441
7442 if (walEdits != null && !walEdits.isEmpty()) {
7443
7444
7445
7446 walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
7447 this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce,
7448 getMVCC());
7449 txid =
7450 this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdits, true);
7451 } else {
7452
7453 walKey = this.appendEmptyEdit(this.wal);
7454 }
7455
7456 walKey.getWriteEntry();
7457
7458
7459 for (Map.Entry<Store, List<Cell>> entry: forMemStore.entrySet()) {
7460 Store store = entry.getKey();
7461 List<Cell> results = entry.getValue();
7462 if (store.getFamily().getMaxVersions() == 1) {
7463
7464 accumulatedResultSize += store.upsert(results, getSmallestReadPoint());
7465
7466 } else {
7467
7468 for (Cell cell: results) {
7469
7470 CellUtil.setSequenceId(cell, walKey.getWriteEntry().getWriteNumber());
7471 accumulatedResultSize += store.add(cell);
7472 doRollBackMemstore = true;
7473 }
7474 }
7475 }
7476 } finally {
7477 this.updatesLock.readLock().unlock();
7478 }
7479 } finally {
7480 rowLock.release();
7481 rowLock = null;
7482 }
7483
7484 if(txid != 0) {
7485 syncOrDefer(txid, durability);
7486 }
7487 doRollBackMemstore = false;
7488 } finally {
7489 if (rowLock != null) {
7490 rowLock.release();
7491 }
7492
7493 if (doRollBackMemstore) {
7494 rollbackMemstore(memstoreCells);
7495 if (walKey != null) mvcc.complete(walKey.getWriteEntry());
7496 } else {
7497 if (walKey != null) mvcc.completeAndWait(walKey.getWriteEntry());
7498 }
7499 }
7500
7501
7502 if (isFlushSize(this.addAndGetGlobalMemstoreSize(accumulatedResultSize))) requestFlush();
7503 return increment.isReturnResults() ? Result.create(allKVs) : null;
7504 }
7505
7506
7507
7508
7509 private static List<Cell> sort(List<Cell> cells, final Comparator<Cell> comparator) {
7510 Collections.sort(cells, comparator);
7511 return cells;
7512 }
7513
7514
7515
7516
7517
7518
7519
7520
7521
7522
7523
7524
/**
 * Computes the new values for the increments of one column family.
 *
 * <p>Reads the current values of the incremented columns. For each increment it adds the
 * current value, if one exists for that qualifier, to the increment amount, and builds a new
 * Put cell. That cell carries the increment cell's tags, the current value's tags and any TTL
 * tag from {@code increment}.
 *
 * @param increment the Increment being applied
 * @param columnFamilyName the column family being processed
 * @param sortedIncrements the family's increment cells; expected to be sorted with the
 *     store comparator so they line up with the current values returned by the Get
 * @param now operation time; each new cell's timestamp is max(now, current value's ts)
 * @param mvccNum sequence id stamped on the new cells, unless it equals
 *     {@code MultiVersionConcurrencyControl.NO_WRITE_NUMBER}
 * @param allKVs output: every new cell is appended here, including zero increments
 * @param isolation isolation level for reading the current values; may be null
 * @return the new cells whose own increment amount is non-zero (the ones to write back)
 * @throws IOException if reading current values fails or a value is not 8 bytes long
 */
private List<Cell> applyIncrementsToColumnFamily(Increment increment, byte[] columnFamilyName,
    List<Cell> sortedIncrements, long now, long mvccNum, List<Cell> allKVs,
    final IsolationLevel isolation)
    throws IOException {
  List<Cell> results = new ArrayList<Cell>(sortedIncrements.size());
  byte [] row = increment.getRow();
  // Current values of the incremented columns.
  List<Cell> currentValues =
      getIncrementCurrentValue(increment, columnFamilyName, sortedIncrements, isolation);

  // idx walks currentValues in step with sortedIncrements.
  int idx = 0;
  for (int i = 0; i < sortedIncrements.size(); i++) {
    Cell inc = sortedIncrements.get(i);
    long incrementAmount = getLongValue(inc);
    // Decided on the increment's own amount: a zero increment is not written back.
    boolean writeBack = (incrementAmount != 0);
    // Start from the increment cell's own tags.
    List<Tag> tags = Tag.carryForwardTags(inc);

    Cell currentValue = null;
    long ts = now;
    if (idx < currentValues.size() && CellUtil.matchingQualifier(currentValues.get(idx), inc)) {
      currentValue = currentValues.get(idx);
      ts = Math.max(now, currentValue.getTimestamp());
      incrementAmount += getLongValue(currentValue);
      // Also carry forward the current value's tags.
      tags = Tag.carryForwardTags(tags, currentValue);
      // Advance only when the next increment targets a different qualifier. Repeated
      // increments of one column therefore all add to the same current value rather than
      // to each other's results.
      if (i < (sortedIncrements.size() - 1) &&
          !CellUtil.matchingQualifier(inc, sortedIncrements.get(i + 1))) idx++;
    }

    // Build the new Put cell holding the summed value.
    byte [] qualifier = CellUtil.cloneQualifier(inc);
    byte [] incrementAmountInBytes = Bytes.toBytes(incrementAmount);
    tags = carryForwardTTLTag(tags, increment);

    Cell newValue = new KeyValue(row, 0, row.length,
        columnFamilyName, 0, columnFamilyName.length,
        qualifier, 0, qualifier.length,
        ts, KeyValue.Type.Put,
        incrementAmountInBytes, 0, incrementAmountInBytes.length,
        tags);

    // Stamp the sequence id only when the caller supplied one.
    if (mvccNum != MultiVersionConcurrencyControl.NO_WRITE_NUMBER) {
      CellUtil.setSequenceId(newValue, mvccNum);
    }

    // Let coprocessors replace the new cell.
    if (coprocessorHost != null) {
      newValue = coprocessorHost.postMutationBeforeWAL(
          RegionObserver.MutationType.INCREMENT, increment, currentValue, newValue);
    }
    allKVs.add(newValue);
    if (writeBack) {
      results.add(newValue);
    }
  }
  return results;
}
7587
7588
7589
7590
7591
7592 private static long getLongValue(final Cell cell) throws DoNotRetryIOException {
7593 int len = cell.getValueLength();
7594 if (len != Bytes.SIZEOF_LONG) {
7595
7596 throw new DoNotRetryIOException("Field is not a long, it's " + len + " bytes wide");
7597 }
7598 return Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), len);
7599 }
7600
7601
7602
7603
7604
7605
7606
7607
7608
7609
7610 private List<Cell> getIncrementCurrentValue(final Increment increment, byte [] columnFamily,
7611 final List<Cell> increments, final IsolationLevel isolation)
7612 throws IOException {
7613 Get get = new Get(increment.getRow());
7614 if (isolation != null) get.setIsolationLevel(isolation);
7615 for (Cell cell: increments) {
7616 get.addColumn(columnFamily, CellUtil.cloneQualifier(cell));
7617 }
7618 TimeRange tr = increment.getTimeRange();
7619 if (tr != null) {
7620 get.setTimeRange(tr.getMin(), tr.getMax());
7621 }
7622 return get(get, false);
7623 }
7624
7625 private static List<Tag> carryForwardTTLTag(final Mutation mutation) {
7626 return carryForwardTTLTag(null, mutation);
7627 }
7628
7629
7630
7631
7632 private static List<Tag> carryForwardTTLTag(final List<Tag> tagsOrNull,
7633 final Mutation mutation) {
7634 long ttl = mutation.getTTL();
7635 if (ttl == Long.MAX_VALUE) return tagsOrNull;
7636 List<Tag> tags = tagsOrNull;
7637
7638
7639
7640 if (tags == null) tags = new ArrayList<Tag>(1);
7641 tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
7642 return tags;
7643 }
7644
7645
7646
7647
7648
7649 private void checkFamily(final byte [] family)
7650 throws NoSuchColumnFamilyException {
7651 if (!this.htableDescriptor.hasFamily(family)) {
7652 throw new NoSuchColumnFamilyException("Column family " +
7653 Bytes.toString(family) + " does not exist in region " + this
7654 + " in table " + this.htableDescriptor);
7655 }
7656 }
7657
/**
 * Estimated fixed (shallow) heap size of an HRegion instance, aligned by ClassSize.align.
 * Sum of ClassSize.OBJECT, ClassSize.ARRAY, 44 references, 3 ints, 14 longs and 5 booleans.
 * NOTE(review): these counts presumably mirror the fields declared in this class; update
 * them whenever fields are added or removed.
 */
public static final long FIXED_OVERHEAD = ClassSize.align(
    ClassSize.OBJECT +
    ClassSize.ARRAY +
    44 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
    (14 * Bytes.SIZEOF_LONG) +
    5 * Bytes.SIZEOF_BOOLEAN);
7664
7665
7666
7667
7668
7669
7670
7671
7672
7673
7674
/**
 * Estimated heap size of an HRegion excluding its stores: FIXED_OVERHEAD plus the
 * estimated sizes of the objects it holds. Those are an object header, two atomic booleans,
 * three atomic longs, two concurrent hash maps, the WriteState, a concurrent skip-list map and
 * one entry, two reentrant locks, the MVCC, a tree map and two atomic integers.
 * NOTE(review): the counts presumably mirror this class's fields — keep in sync.
 */
public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
    ClassSize.OBJECT +
    (2 * ClassSize.ATOMIC_BOOLEAN) +
    (3 * ClassSize.ATOMIC_LONG) +
    (2 * ClassSize.CONCURRENT_HASHMAP) +
    WriteState.HEAP_SIZE +
    ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY +
    (2 * ClassSize.REENTRANT_LOCK) +
    MultiVersionConcurrencyControl.FIXED_SIZE
    + ClassSize.TREEMAP
    + 2 * ClassSize.ATOMIC_INTEGER
    ;
7687
7688 @Override
7689 public long heapSize() {
7690 long heapSize = DEEP_OVERHEAD;
7691 for (Store store : this.stores.values()) {
7692 heapSize += store.heapSize();
7693 }
7694
7695 return heapSize;
7696 }
7697
7698
7699
7700
7701
7702 private static void printUsageAndExit(final String message) {
7703 if (message != null && message.length() > 0) System.out.println(message);
7704 System.out.println("Usage: HRegion CATALOG_TABLE_DIR [major_compact]");
7705 System.out.println("Options:");
7706 System.out.println(" major_compact Pass this option to major compact " +
7707 "passed region.");
7708 System.out.println("Default outputs scan of passed region.");
7709 System.exit(1);
7710 }
7711
7712 @Override
7713 public boolean registerService(Service instance) {
7714
7715
7716
7717 Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
7718 if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
7719 LOG.error("Coprocessor service " + serviceDesc.getFullName() +
7720 " already registered, rejecting request from " + instance
7721 );
7722 return false;
7723 }
7724
7725 coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
7726 if (LOG.isDebugEnabled()) {
7727 LOG.debug("Registered coprocessor service: region=" +
7728 Bytes.toStringBinary(getRegionInfo().getRegionName()) +
7729 " service=" + serviceDesc.getFullName());
7730 }
7731 return true;
7732 }
7733
7734 @Override
7735 public Message execService(RpcController controller, CoprocessorServiceCall call)
7736 throws IOException {
7737 String serviceName = call.getServiceName();
7738 String methodName = call.getMethodName();
7739 if (!coprocessorServiceHandlers.containsKey(serviceName)) {
7740 throw new UnknownProtocolException(null,
7741 "No registered coprocessor service found for name "+serviceName+
7742 " in region "+Bytes.toStringBinary(getRegionInfo().getRegionName()));
7743 }
7744
7745 Service service = coprocessorServiceHandlers.get(serviceName);
7746 Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
7747 Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
7748 if (methodDesc == null) {
7749 throw new UnknownProtocolException(service.getClass(),
7750 "Unknown method "+methodName+" called on service "+serviceName+
7751 " in region "+Bytes.toStringBinary(getRegionInfo().getRegionName()));
7752 }
7753
7754 Message.Builder builder = service.getRequestPrototype(methodDesc).newBuilderForType();
7755 ProtobufUtil.mergeFrom(builder, call.getRequest());
7756 Message request = builder.build();
7757
7758 if (coprocessorHost != null) {
7759 request = coprocessorHost.preEndpointInvocation(service, methodName, request);
7760 }
7761
7762 final Message.Builder responseBuilder =
7763 service.getResponsePrototype(methodDesc).newBuilderForType();
7764 service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
7765 @Override
7766 public void run(Message message) {
7767 if (message != null) {
7768 responseBuilder.mergeFrom(message);
7769 }
7770 }
7771 });
7772
7773 if (coprocessorHost != null) {
7774 coprocessorHost.postEndpointInvocation(service, methodName, request, responseBuilder);
7775 }
7776
7777 IOException exception = ResponseConverter.getControllerException(controller);
7778 if (exception != null) {
7779 throw exception;
7780 }
7781
7782 return responseBuilder.build();
7783 }
7784
7785
7786
7787
7788
7789
7790 private static void processTable(final FileSystem fs, final Path p,
7791 final WALFactory walFactory, final Configuration c,
7792 final boolean majorCompact)
7793 throws IOException {
7794 HRegion region;
7795 FSTableDescriptors fst = new FSTableDescriptors(c);
7796
7797 if (FSUtils.getTableName(p).equals(TableName.META_TABLE_NAME)) {
7798 final WAL wal = walFactory.getMetaWAL(
7799 HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
7800 region = HRegion.newHRegion(p, wal, fs, c,
7801 HRegionInfo.FIRST_META_REGIONINFO, fst.get(TableName.META_TABLE_NAME), null);
7802 } else {
7803 throw new IOException("Not a known catalog table: " + p.toString());
7804 }
7805 try {
7806 region.mvcc.advanceTo(region.initialize(null));
7807 if (majorCompact) {
7808 region.compact(true);
7809 } else {
7810
7811 Scan scan = new Scan();
7812
7813 RegionScanner scanner = region.getScanner(scan);
7814 try {
7815 List<Cell> kvs = new ArrayList<Cell>();
7816 boolean done;
7817 do {
7818 kvs.clear();
7819 done = scanner.next(kvs);
7820 if (kvs.size() > 0) LOG.info(kvs);
7821 } while (done);
7822 } finally {
7823 scanner.close();
7824 }
7825 }
7826 } finally {
7827 region.close();
7828 }
7829 }
7830
7831 boolean shouldForceSplit() {
7832 return this.splitRequest;
7833 }
7834
7835 byte[] getExplicitSplitPoint() {
7836 return this.explicitSplitPoint;
7837 }
7838
7839 void forceSplit(byte[] sp) {
7840
7841
7842 this.splitRequest = true;
7843 if (sp != null) {
7844 this.explicitSplitPoint = sp;
7845 }
7846 }
7847
7848 void clearSplit() {
7849 this.splitRequest = false;
7850 this.explicitSplitPoint = null;
7851 }
7852
7853
7854
7855
7856 protected void prepareToSplit() {
7857
7858 }
7859
7860
7861
7862
7863
7864
7865
7866 public byte[] checkSplit() {
7867
7868 if (this.getRegionInfo().isMetaTable() ||
7869 TableName.NAMESPACE_TABLE_NAME.equals(this.getRegionInfo().getTable())) {
7870 if (shouldForceSplit()) {
7871 LOG.warn("Cannot split meta region in HBase 0.20 and above");
7872 }
7873 return null;
7874 }
7875
7876
7877 if (this.isRecovering()) {
7878 LOG.info("Cannot split region " + this.getRegionInfo().getEncodedName() + " in recovery.");
7879 return null;
7880 }
7881
7882 if (!splitPolicy.shouldSplit()) {
7883 return null;
7884 }
7885
7886 byte[] ret = splitPolicy.getSplitPoint();
7887
7888 if (ret != null) {
7889 try {
7890 checkRow(ret, "calculated split");
7891 } catch (IOException e) {
7892 LOG.error("Ignoring invalid split", e);
7893 return null;
7894 }
7895 }
7896 return ret;
7897 }
7898
7899
7900
7901
7902 public int getCompactPriority() {
7903 int count = Integer.MAX_VALUE;
7904 for (Store store : stores.values()) {
7905 count = Math.min(count, store.getCompactPriority());
7906 }
7907 return count;
7908 }
7909
7910
7911
7912 @Override
7913 public RegionCoprocessorHost getCoprocessorHost() {
7914 return coprocessorHost;
7915 }
7916
7917
7918 public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {
7919 this.coprocessorHost = coprocessorHost;
7920 }
7921
7922 @Override
7923 public void startRegionOperation() throws IOException {
7924 startRegionOperation(Operation.ANY);
7925 }
7926
/**
 * Starts a region operation of type {@code op}.
 *
 * <p>GET and SCAN first check that reads are enabled, then fall through to the recovery
 * check shared with the listed mutating/split/merge/compact operations. While the region is
 * recovering, that check throws RegionInRecoveryException. The exceptions are PUT, DELETE
 * and BATCH_MUTATE, which are allowed unless {@code disallowWritesInRecovering} is set.
 * MERGE_REGION, SPLIT_REGION and COMPACT_REGION then return without checking the
 * closing/closed state and without taking the region lock. Every other operation fails if
 * the region is closing or closed. Otherwise it returns holding the region read lock, which
 * {@code closeRegionOperation} releases.
 *
 * @param op the operation being started
 * @throws NotServingRegionException if the region is closing or closed
 * @throws IOException if the region is recovering (as above), or if the coprocessor
 *     post-start hook fails (wrapped; the read lock is released first)
 */
@Override
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SF_SWITCH_FALLTHROUGH",
    justification="Intentional")
public void startRegionOperation(Operation op) throws IOException {
  switch (op) {
    case GET:
    case SCAN:
      checkReadsEnabled();
      // fall through: reads are also subject to the recovery check below
    case INCREMENT:
    case APPEND:
    case SPLIT_REGION:
    case MERGE_REGION:
    case PUT:
    case DELETE:
    case BATCH_MUTATE:
    case COMPACT_REGION:
      // Only PUT/DELETE/BATCH_MUTATE may proceed during recovery, and only when writes in
      // recovery are allowed.
      if (isRecovering() && (this.disallowWritesInRecovering ||
          (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) {
        throw new RegionInRecoveryException(getRegionInfo().getRegionNameAsString() +
            " is recovering; cannot take reads");
      }
      break;
    default:
      break;
  }
  // These operations skip the closing/closed checks and take no region lock.
  if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION
      || op == Operation.COMPACT_REGION) {
    return;
  }
  if (this.closing.get()) {
    throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closing");
  }
  lock(lock.readLock());
  // Re-check after acquiring the lock; release it if the region closed meanwhile.
  if (this.closed.get()) {
    lock.readLock().unlock();
    throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closed");
  }
  try {
    if (coprocessorHost != null) {
      coprocessorHost.postStartRegionOperation(op);
    }
  } catch (Exception e) {
    // Do not leave the read lock held if the hook fails.
    lock.readLock().unlock();
    throw new IOException(e);
  }
}
7976
7977 @Override
7978 public void closeRegionOperation() throws IOException {
7979 closeRegionOperation(Operation.ANY);
7980 }
7981
7982
7983
7984
7985
7986
7987 public void closeRegionOperation(Operation operation) throws IOException {
7988 lock.readLock().unlock();
7989 if (coprocessorHost != null) {
7990 coprocessorHost.postCloseRegionOperation(operation);
7991 }
7992 }
7993
7994
7995
7996
7997
7998
7999
8000
8001
8002
8003 private void startBulkRegionOperation(boolean writeLockNeeded)
8004 throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
8005 if (this.closing.get()) {
8006 throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closing");
8007 }
8008 if (writeLockNeeded) lock(lock.writeLock());
8009 else lock(lock.readLock());
8010 if (this.closed.get()) {
8011 if (writeLockNeeded) lock.writeLock().unlock();
8012 else lock.readLock().unlock();
8013 throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closed");
8014 }
8015 }
8016
8017
8018
8019
8020
8021 private void closeBulkRegionOperation(){
8022 if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
8023 else lock.readLock().unlock();
8024 }
8025
8026
8027
8028
8029
8030 private void recordMutationWithoutWal(final Map<byte [], List<Cell>> familyMap) {
8031 numMutationsWithoutWAL.increment();
8032 if (numMutationsWithoutWAL.get() <= 1) {
8033 LOG.info("writing data to region " + this +
8034 " with WAL disabled. Data may be lost in the event of a crash.");
8035 }
8036
8037 long mutationSize = 0;
8038 for (List<Cell> cells: familyMap.values()) {
8039 assert cells instanceof RandomAccess;
8040 int listSize = cells.size();
8041 for (int i=0; i < listSize; i++) {
8042 Cell cell = cells.get(i);
8043
8044 mutationSize += KeyValueUtil.keyLength(cell) + cell.getValueLength();
8045 }
8046 }
8047
8048 dataInMemoryWithoutWAL.add(mutationSize);
8049 }
8050
8051 private void lock(final Lock lock)
8052 throws RegionTooBusyException, InterruptedIOException {
8053 lock(lock, 1);
8054 }
8055
8056
8057
8058
8059
8060
8061 private void lock(final Lock lock, final int multiplier)
8062 throws RegionTooBusyException, InterruptedIOException {
8063 try {
8064 final long waitTime = Math.min(maxBusyWaitDuration,
8065 busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
8066 if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
8067 throw new RegionTooBusyException(
8068 "failed to get a lock in " + waitTime + " ms. " +
8069 "regionName=" + (this.getRegionInfo() == null ? "unknown" :
8070 this.getRegionInfo().getRegionNameAsString()) +
8071 ", server=" + (this.getRegionServerServices() == null ? "unknown" :
8072 this.getRegionServerServices().getServerName()));
8073 }
8074 } catch (InterruptedException ie) {
8075 LOG.info("Interrupted while waiting for a lock");
8076 InterruptedIOException iie = new InterruptedIOException();
8077 iie.initCause(ie);
8078 throw iie;
8079 }
8080 }
8081
8082
8083
8084
8085
8086
8087
8088 private void syncOrDefer(long txid, Durability durability) throws IOException {
8089 if (this.getRegionInfo().isMetaRegion()) {
8090 this.wal.sync(txid);
8091 } else {
8092 switch(durability) {
8093 case USE_DEFAULT:
8094
8095 if (shouldSyncWAL()) {
8096 this.wal.sync(txid);
8097 }
8098 break;
8099 case SKIP_WAL:
8100
8101 break;
8102 case ASYNC_WAL:
8103
8104 break;
8105 case SYNC_WAL:
8106 case FSYNC_WAL:
8107
8108 this.wal.sync(txid);
8109 break;
8110 default:
8111 throw new RuntimeException("Unknown durability " + durability);
8112 }
8113 }
8114 }
8115
8116
8117
8118
8119 private boolean shouldSyncWAL() {
8120 return durability.ordinal() > Durability.ASYNC_WAL.ordinal();
8121 }
8122
8123
8124
8125
8126 private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {
8127
8128 @Override
8129 public void add(int index, Cell element) {
8130
8131 }
8132
8133 @Override
8134 public boolean addAll(int index, Collection<? extends Cell> c) {
8135 return false;
8136 }
8137
8138 @Override
8139 public KeyValue get(int index) {
8140 throw new UnsupportedOperationException();
8141 }
8142
8143 @Override
8144 public int size() {
8145 return 0;
8146 }
8147 };
8148
8149
8150
8151
8152
8153
8154
8155
8156
8157
8158 public static void main(String[] args) throws IOException {
8159 if (args.length < 1) {
8160 printUsageAndExit(null);
8161 }
8162 boolean majorCompact = false;
8163 if (args.length > 1) {
8164 if (!args[1].toLowerCase().startsWith("major")) {
8165 printUsageAndExit("ERROR: Unrecognized option <" + args[1] + ">");
8166 }
8167 majorCompact = true;
8168 }
8169 final Path tableDir = new Path(args[0]);
8170 final Configuration c = HBaseConfiguration.create();
8171 final FileSystem fs = FileSystem.get(c);
8172 final Path logdir = new Path(c.get("hbase.tmp.dir"));
8173 final String logname = "wal" + FSUtils.getTableName(tableDir) + System.currentTimeMillis();
8174
8175 final Configuration walConf = new Configuration(c);
8176 FSUtils.setRootDir(walConf, logdir);
8177 final WALFactory wals = new WALFactory(walConf, null, logname);
8178 try {
8179 processTable(fs, tableDir, wals, c, majorCompact);
8180 } finally {
8181 wals.close();
8182
8183 BlockCache bc = new CacheConfig(c).getBlockCache();
8184 if (bc != null) bc.shutdown();
8185 }
8186 }
8187
8188 @Override
8189 public long getOpenSeqNum() {
8190 return this.openSeqNum;
8191 }
8192
8193 @Override
8194 public Map<byte[], Long> getMaxStoreSeqId() {
8195 return this.maxSeqIdInStores;
8196 }
8197
8198 @Override
8199 public long getOldestSeqIdOfStore(byte[] familyName) {
8200 return wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), familyName);
8201 }
8202
8203 @Override
8204 public CompactionState getCompactionState() {
8205 boolean hasMajor = majorInProgress.get() > 0, hasMinor = minorInProgress.get() > 0;
8206 return (hasMajor ? (hasMinor ? CompactionState.MAJOR_AND_MINOR : CompactionState.MAJOR)
8207 : (hasMinor ? CompactionState.MINOR : CompactionState.NONE));
8208 }
8209
8210 public void reportCompactionRequestStart(boolean isMajor){
8211 (isMajor ? majorInProgress : minorInProgress).incrementAndGet();
8212 }
8213
8214 public void reportCompactionRequestEnd(boolean isMajor, int numFiles, long filesSizeCompacted) {
8215 int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();
8216
8217
8218 compactionsFinished.incrementAndGet();
8219 compactionNumFilesCompacted.addAndGet(numFiles);
8220 compactionNumBytesCompacted.addAndGet(filesSizeCompacted);
8221
8222 assert newValue >= 0;
8223 }
8224
8225
8226
8227
8228
8229 @VisibleForTesting
8230 public long getSequenceId() {
8231 return this.mvcc.getReadPoint();
8232 }
8233
8234
8235
8236
8237
8238
8239
8240
8241
8242 private WALKey appendEmptyEdit(final WAL wal) throws IOException {
8243
8244 @SuppressWarnings("deprecation")
8245 WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
8246 getRegionInfo().getTable(), WALKey.NO_SEQUENCE_ID, 0, null,
8247 HConstants.NO_NONCE, HConstants.NO_NONCE, getMVCC());
8248
8249
8250
8251 try {
8252 wal.append(getTableDesc(), getRegionInfo(), key, WALEdit.EMPTY_WALEDIT, false);
8253 } catch (Throwable t) {
8254
8255 getMVCC().complete(key.getWriteEntry());
8256 }
8257 return key;
8258 }
8259
8260
8261
8262
8263 @Override
8264 public void onConfigurationChange(Configuration conf) {
8265
8266 }
8267
8268
8269
8270
8271 @Override
8272 public void registerChildren(ConfigurationManager manager) {
8273 configurationManager = Optional.of(manager);
8274 for (Store s : this.stores.values()) {
8275 configurationManager.get().registerObserver(s);
8276 }
8277 }
8278
8279
8280
8281
8282 @Override
8283 public void deregisterChildren(ConfigurationManager manager) {
8284 for (Store s : this.stores.values()) {
8285 configurationManager.get().deregisterObserver(s);
8286 }
8287 }
8288
8289
8290
8291
8292 public RegionSplitPolicy getSplitPolicy() {
8293 return this.splitPolicy;
8294 }
8295 }