1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.EOFException;
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.io.UnsupportedEncodingException;
25 import java.lang.reflect.Constructor;
26 import java.text.ParseException;
27 import java.util.AbstractList;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Collection;
31 import java.util.Collections;
32 import java.util.HashMap;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.NavigableMap;
36 import java.util.NavigableSet;
37 import java.util.Set;
38 import java.util.TreeMap;
39 import java.util.UUID;
40 import java.util.concurrent.Callable;
41 import java.util.concurrent.CompletionService;
42 import java.util.concurrent.ConcurrentHashMap;
43 import java.util.concurrent.ConcurrentSkipListMap;
44 import java.util.concurrent.CountDownLatch;
45 import java.util.concurrent.ExecutionException;
46 import java.util.concurrent.ExecutorCompletionService;
47 import java.util.concurrent.ExecutorService;
48 import java.util.concurrent.Executors;
49 import java.util.concurrent.Future;
50 import java.util.concurrent.FutureTask;
51 import java.util.concurrent.ThreadFactory;
52 import java.util.concurrent.ThreadPoolExecutor;
53 import java.util.concurrent.TimeUnit;
54 import java.util.concurrent.TimeoutException;
55 import java.util.concurrent.atomic.AtomicBoolean;
56 import java.util.concurrent.atomic.AtomicInteger;
57 import java.util.concurrent.atomic.AtomicLong;
58 import java.util.concurrent.locks.Lock;
59 import java.util.concurrent.locks.ReentrantReadWriteLock;
60
61 import org.apache.commons.logging.Log;
62 import org.apache.commons.logging.LogFactory;
63 import org.apache.hadoop.classification.InterfaceAudience;
64 import org.apache.hadoop.conf.Configuration;
65 import org.apache.hadoop.fs.FileStatus;
66 import org.apache.hadoop.fs.FileSystem;
67 import org.apache.hadoop.fs.Path;
68 import org.apache.hadoop.hbase.Cell;
69 import org.apache.hadoop.hbase.CellScanner;
70 import org.apache.hadoop.hbase.CellUtil;
71 import org.apache.hadoop.hbase.CompoundConfiguration;
72 import org.apache.hadoop.hbase.DoNotRetryIOException;
73 import org.apache.hadoop.hbase.DroppedSnapshotException;
74 import org.apache.hadoop.hbase.HBaseConfiguration;
75 import org.apache.hadoop.hbase.HColumnDescriptor;
76 import org.apache.hadoop.hbase.HConstants;
77 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
78 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
79 import org.apache.hadoop.hbase.HRegionInfo;
80 import org.apache.hadoop.hbase.HTableDescriptor;
81 import org.apache.hadoop.hbase.KeyValue;
82 import org.apache.hadoop.hbase.KeyValueUtil;
83 import org.apache.hadoop.hbase.NotServingRegionException;
84 import org.apache.hadoop.hbase.RegionTooBusyException;
85 import org.apache.hadoop.hbase.TableName;
86 import org.apache.hadoop.hbase.UnknownScannerException;
87 import org.apache.hadoop.hbase.backup.HFileArchiver;
88 import org.apache.hadoop.hbase.client.Append;
89 import org.apache.hadoop.hbase.client.Delete;
90 import org.apache.hadoop.hbase.client.Durability;
91 import org.apache.hadoop.hbase.client.Get;
92 import org.apache.hadoop.hbase.client.Increment;
93 import org.apache.hadoop.hbase.client.IsolationLevel;
94 import org.apache.hadoop.hbase.client.Mutation;
95 import org.apache.hadoop.hbase.client.Put;
96 import org.apache.hadoop.hbase.client.Result;
97 import org.apache.hadoop.hbase.client.RowMutations;
98 import org.apache.hadoop.hbase.client.Scan;
99 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
100 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
101 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
102 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
103 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
104 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
105 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
106 import org.apache.hadoop.hbase.filter.FilterWrapper;
107 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
108 import org.apache.hadoop.hbase.io.HeapSize;
109 import org.apache.hadoop.hbase.io.TimeRange;
110 import org.apache.hadoop.hbase.io.hfile.BlockCache;
111 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
112 import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
113 import org.apache.hadoop.hbase.ipc.RpcCallContext;
114 import org.apache.hadoop.hbase.ipc.RpcServer;
115 import org.apache.hadoop.hbase.master.AssignmentManager;
116 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
117 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
118 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
119 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
120 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
121 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
122 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
123 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
124 import org.apache.hadoop.hbase.regionserver.wal.HLog;
125 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
126 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
127 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
128 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.MutationReplay;
129 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
130 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
131 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
132 import org.apache.hadoop.hbase.util.Bytes;
133 import org.apache.hadoop.hbase.util.CancelableProgressable;
134 import org.apache.hadoop.hbase.util.ClassSize;
135 import org.apache.hadoop.hbase.util.CompressionTest;
136 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
137 import org.apache.hadoop.hbase.util.FSUtils;
138 import org.apache.hadoop.hbase.util.HashedBytes;
139 import org.apache.hadoop.hbase.util.Pair;
140 import org.apache.hadoop.hbase.util.Threads;
141 import org.apache.hadoop.io.MultipleIOException;
142 import org.apache.hadoop.util.StringUtils;
143 import org.cliffc.high_scale_lib.Counter;
144
145 import com.google.common.annotations.VisibleForTesting;
146 import com.google.common.base.Preconditions;
147 import com.google.common.collect.Lists;
148 import com.google.common.collect.Maps;
149 import com.google.common.io.Closeables;
150 import com.google.protobuf.Descriptors;
151 import com.google.protobuf.Message;
152 import com.google.protobuf.RpcCallback;
153 import com.google.protobuf.RpcController;
154 import com.google.protobuf.Service;
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
@InterfaceAudience.Private
public class HRegion implements HeapSize {
  public static final Log LOG = LogFactory.getLog(HRegion.class);

  /** Config key: whether scans load column families lazily (on demand) by default. */
  public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
      "hbase.hregion.scan.loadColumnFamiliesOnDemand";

  // Durability applied when the table descriptor says USE_DEFAULT.
  // NOTE(review): name carries a typo ("DURABLITY"); kept as-is because the
  // constructor below references it by this exact name.
  private static final Durability DEFAULT_DURABLITY = Durability.SYNC_WAL;

  // Set true once doClose() has fully closed the region.
  final AtomicBoolean closed = new AtomicBoolean(false);

  // Set true while a close is in progress; writes are rejected from then on.
  final AtomicBoolean closing = new AtomicBoolean(false);

  // Highest sequence id fully flushed to disk; -1 until first assignment.
  protected volatile long completeSequenceId = -1L;

  // Per-region WAL sequence id generator; -1 until the region is opened.
  private final AtomicLong sequenceId = new AtomicLong(-1L);

  /**
   * Operations a caller may announce via startRegionOperation so the region
   * can check state (closing/recovering) appropriately for that operation.
   */
  public enum Operation {
    ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE,
    REPLAY_BATCH_MUTATE, COMPACT_REGION
  }

  // Row-lock registry: row key -> lock context for that row.
  private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
      new ConcurrentHashMap<HashedBytes, RowLockContext>();

  // Column family name -> Store, sorted by raw byte order.
  protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(
      Bytes.BYTES_RAWCOMPARATOR);

  // Registered coprocessor endpoint services, keyed by service name.
  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

  // Bytes currently held in this region's memstores.
  public final AtomicLong memstoreSize = new AtomicLong(0);

  // Counters for mutations applied while skipping the WAL.
  final Counter numMutationsWithoutWAL = new Counter();
  final Counter dataInMemoryWithoutWAL = new Counter();

  // checkAndMutate outcome counters.
  final Counter checkAndMutateChecksPassed = new Counter();
  final Counter checkAndMutateChecksFailed = new Counter();

  // Request accounting exposed through metrics.
  final Counter readRequestsCount = new Counter();
  final Counter writeRequestsCount = new Counter();

  // Compaction progress accounting.
  final AtomicLong compactionsFinished = new AtomicLong(0L);
  final AtomicLong compactionNumFilesCompacted = new AtomicLong(0L);
  final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L);

  private final HLog log;
  private final HRegionFileSystem fs;
  // Effective configuration: base conf overlaid with table-level settings.
  protected final Configuration conf;
  // The original, un-merged configuration passed in by the caller.
  private final Configuration baseConf;
  private final KeyValue.KVComparator comparator;
  private final int rowLockWaitDuration;
  static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;

  // How long a caller sleeps when the region is too busy.
  final long busyWaitDuration;
  static final long DEFAULT_BUSY_WAIT_DURATION = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;

  // Cap on the busy-wait backoff multiplier.
  final int maxBusyWaitMultiplier;

  // Absolute ceiling on any single busy wait.
  final long maxBusyWaitDuration;

  static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
  // Executor used to run RowProcessor tasks with a timeout.
  final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();

  // Open scanner -> its MVCC read point; consulted by getSmallestReadPoint().
  private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;

  // Sequence id the region was opened at; NO_SEQNUM until opened.
  private long openSeqNum = HConstants.NO_SEQNUM;

  // Default for lazy column-family loading; set from conf in the constructor.
  private boolean isLoadingCfsOnDemandDefault = false;

  // In-flight compaction counts, split by major/minor.
  private final AtomicInteger majorInProgress = new AtomicInteger(0);
  private final AtomicInteger minorInProgress = new AtomicInteger(0);

  // Per-family max sequence id seen at open; used during WAL replay.
  Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);

  // When true, writes are rejected while the region is in log-replay recovery.
  private boolean disallowWritesInRecovering = false;

  // True while distributed log replay has this region in recovery.
  private volatile boolean isRecovering = false;
333
334
335
336
337
338 public long getSmallestReadPoint() {
339 long minimumReadPoint;
340
341
342
343 synchronized(scannerReadPoints) {
344 minimumReadPoint = mvcc.memstoreReadPoint();
345
346 for (Long readPoint: this.scannerReadPoints.values()) {
347 if (readPoint < minimumReadPoint) {
348 minimumReadPoint = readPoint;
349 }
350 }
351 }
352 return minimumReadPoint;
353 }
354
355
356
357
  /**
   * Mutable flags describing what write-path activity (flushes, compactions)
   * is running or allowed on the region.  Threads coordinate on the instance
   * monitor (synchronized / wait / notify elsewhere in this file).
   */
  static class WriteState {
    // A memstore flush is currently in progress.
    volatile boolean flushing = false;
    // A flush has been requested but not yet started.
    volatile boolean flushRequested = false;
    // Number of compactions currently running on this region.
    volatile int compacting = 0;
    // Gate for all writes; cleared during close and read-only mode.
    volatile boolean writesEnabled = true;
    // Region is in read-only mode (table-level setting).
    volatile boolean readOnly = false;

    /**
     * Set or clear read-only mode; writesEnabled is kept as the inverse.
     * Synchronized so both flags flip together under the monitor.
     */
    synchronized void setReadOnly(final boolean onOff) {
      this.writesEnabled = !onOff;
      this.readOnly = onOff;
    }

    boolean isReadOnly() {
      return this.readOnly;
    }

    boolean isFlushRequested() {
      return this.flushRequested;
    }

    // Fixed heap cost charged by HRegion.heapSize(); the int field is
    // folded into the boolean count here — TODO confirm against ClassSize.
    static final long HEAP_SIZE = ClassSize.align(
        ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
  }
391
392
393
394
395
396
397
398 public static class FlushResult {
399 enum Result {
400 FLUSHED_NO_COMPACTION_NEEDED,
401 FLUSHED_COMPACTION_NEEDED,
402
403
404 CANNOT_FLUSH_MEMSTORE_EMPTY,
405 CANNOT_FLUSH
406
407 }
408
409 final Result result;
410 final String failureReason;
411 final long flushSequenceId;
412
413
414
415
416
417
418
419
420 FlushResult(Result result, long flushSequenceId) {
421 this(result, flushSequenceId, null);
422 assert result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
423 .FLUSHED_COMPACTION_NEEDED;
424 }
425
426
427
428
429
430
431 FlushResult(Result result, String failureReason) {
432 this(result, -1, failureReason);
433 assert result == Result.CANNOT_FLUSH_MEMSTORE_EMPTY || result == Result.CANNOT_FLUSH;
434 }
435
436
437
438
439
440
441
442 FlushResult(Result result, long flushSequenceId, String failureReason) {
443 this.result = result;
444 this.flushSequenceId = flushSequenceId;
445 this.failureReason = failureReason;
446 }
447
448
449
450
451
452
453 public boolean isFlushSucceeded() {
454 return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
455 .FLUSHED_COMPACTION_NEEDED;
456 }
457
458
459
460
461
462 public boolean isCompactionNeeded() {
463 return result == Result.FLUSHED_COMPACTION_NEEDED;
464 }
465 }
466
  // Coordination point for flush/compaction state; see WriteState above.
  final WriteState writestate = new WriteState();

  // Memstore size (bytes) at which a flush is triggered; set from the table
  // descriptor or site config in setHTableSpecificConf().
  long memstoreFlushSize;
  // How far in the future a cell timestamp may be before it is rejected.
  final long timestampSlop;
  // Wall-clock budget for a RowProcessor invocation.
  final long rowProcessorTimeout;
  private volatile long lastFlushTime;
  // Hosting region server's services; null when the region is used standalone.
  final RegionServerServices rsServices;
  private RegionServerAccounting rsAccounting;
  private List<Pair<Long, Long>> recentFlushes = new ArrayList<Pair<Long,Long>>();
  // Periodic-flush interval (ms); see MEMSTORE_PERIODIC_FLUSH_INTERVAL.
  private long flushCheckInterval;

  // Flush after this many WAL-bypassing changes; see MEMSTORE_FLUSH_PER_CHANGES.
  private long flushPerChanges;
  // Memstore size at which incoming writes are blocked until a flush catches up.
  private long blockingMemStoreSize;
  final long threadWakeFrequency;

  // Region-wide lock: read side held by normal operations, write side by close.
  final ReentrantReadWriteLock lock =
    new ReentrantReadWriteLock();

  // Lock guarding memstore updates against concurrent snapshot/flush.
  private final ReentrantReadWriteLock updatesLock =
    new ReentrantReadWriteLock();
  private boolean splitRequest;
  private byte[] explicitSplitPoint = null;

  // Multi-version concurrency control for read/write isolation.
  private final MultiVersionConsistencyControl mvcc =
      new MultiVersionConsistencyControl();

  // Coprocessor host; only initialized when running inside a region server.
  private RegionCoprocessorHost coprocessorHost;

  private HTableDescriptor htableDescriptor = null;
  private RegionSplitPolicy splitPolicy;

  // Metrics objects; null when there is no hosting region server.
  private final MetricsRegion metricsRegion;
  private final MetricsRegionWrapperImpl metricsRegionWrapper;
  // Effective durability for mutations that ask for USE_DEFAULT.
  private final Durability durability;
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
  /**
   * Convenience constructor that builds the {@link HRegionFileSystem} from
   * the raw pieces and delegates to the primary constructor.
   *
   * @deprecated Use the {@code HRegion(HRegionFileSystem, ...)} constructor.
   */
  @Deprecated
  public HRegion(final Path tableDir, final HLog log, final FileSystem fs,
      final Configuration confParam, final HRegionInfo regionInfo,
      final HTableDescriptor htd, final RegionServerServices rsServices) {
    this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
      log, confParam, htd, rsServices);
  }
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
  /**
   * Primary constructor.  Wires up configuration, WAL, filesystem and (when
   * present) region-server-scoped services.  The region is NOT usable until
   * {@code initialize(...)} has been called.
   *
   * @param fs region filesystem abstraction (already pointed at this region)
   * @param log the WAL this region appends to
   * @param confParam the base (non-compound) configuration; table-level
   *   overrides are layered on top of it here
   * @param htd table descriptor; must not be null
   * @param rsServices hosting region server services, or null when the
   *   region is instantiated outside a region server (tools, tests)
   * @throws IllegalArgumentException on null descriptor, a pre-merged
   *   configuration, or out-of-range flush/busy-wait settings
   */
  public HRegion(final HRegionFileSystem fs, final HLog log, final Configuration confParam,
      final HTableDescriptor htd, final RegionServerServices rsServices) {
    if (htd == null) {
      throw new IllegalArgumentException("Need table descriptor");
    }

    // Callers must hand in the original conf; we build the compound one here.
    if (confParam instanceof CompoundConfiguration) {
      throw new IllegalArgumentException("Need original base configuration");
    }

    this.comparator = fs.getRegionInfo().getComparator();
    this.log = log;
    this.fs = fs;

    // Effective conf = base conf overlaid with table-level settings.
    this.baseConf = confParam;
    this.conf = new CompoundConfiguration()
      .add(confParam)
      .addStringMap(htd.getConfiguration())
      .addWritableMap(htd.getValues());
    this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
        DEFAULT_CACHE_FLUSH_INTERVAL);
    this.flushPerChanges = conf.getLong(MEMSTORE_FLUSH_PER_CHANGES, DEFAULT_FLUSH_PER_CHANGES);
    if (this.flushPerChanges > MAX_FLUSH_PER_CHANGES) {
      throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed "
          + MAX_FLUSH_PER_CHANGES);
    }

    this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
        DEFAULT_ROWLOCK_WAIT_DURATION);

    this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
    this.htableDescriptor = htd;
    this.rsServices = rsServices;
    this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
    // Pulls memstoreFlushSize / blockingMemStoreSize from htd + conf.
    setHTableSpecificConf();
    this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();

    this.busyWaitDuration = conf.getLong(
      "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
    this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
    // Reject zero/negative products (also catches long-multiplication overflow
    // into the negative range for absurdly large settings).
    if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
      throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
        + busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
        + maxBusyWaitMultiplier + "). Their product should be positive");
    }
    this.maxBusyWaitDuration = conf.getLong("hbase.ipc.client.call.purge.timeout",
      conf.getLong("ipc.client.call.purge.timeout", 2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT));

    // LATEST_TIMESTAMP as the default slop effectively disables the
    // future-timestamp check unless the operator configures a bound.
    this.timestampSlop = conf.getLong(
        "hbase.hregion.keyvalue.timestamp.slop.millisecs",
        HConstants.LATEST_TIMESTAMP);

    this.rowProcessorTimeout = conf.getLong(
        "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
    this.durability = htd.getDurability() == Durability.USE_DEFAULT
        ? DEFAULT_DURABLITY
        : htd.getDurability();
    if (rsServices != null) {
      this.rsAccounting = this.rsServices.getRegionServerAccounting();

      // Server-hosted extras: coprocessors, metrics, recovery registration.
      this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
      this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
      this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper);

      // If distributed log replay has us marked as recovering, register this
      // instance so the replay machinery can find it.
      Map<String, HRegion> recoveringRegions = rsServices.getRecoveringRegions();
      String encodedName = getRegionInfo().getEncodedName();
      if (recoveringRegions != null && recoveringRegions.containsKey(encodedName)) {
        this.isRecovering = true;
        recoveringRegions.put(encodedName, this);
      }
    } else {
      // Standalone usage (tools/tests): no metrics, no coprocessor host.
      this.metricsRegionWrapper = null;
      this.metricsRegion = null;
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Instantiated " + this);
    }

    // Whether writes are rejected while this region is in replay recovery.
    this.disallowWritesInRecovering =
        conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING,
          HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG);
  }
646
647 void setHTableSpecificConf() {
648 if (this.htableDescriptor == null) return;
649 long flushSize = this.htableDescriptor.getMemStoreFlushSize();
650
651 if (flushSize <= 0) {
652 flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
653 HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
654 }
655 this.memstoreFlushSize = flushSize;
656 this.blockingMemStoreSize = this.memstoreFlushSize *
657 conf.getLong("hbase.hregion.memstore.block.multiplier", 2);
658 }
659
660
661
662
663
664
665
666
667
  /**
   * Initialize this region without progress reporting.
   *
   * @return the sequence id the region should use for its next edit
   * @throws IOException if initialization fails
   * @deprecated use {@code HRegion.createHRegion()} or {@code HRegion.openHRegion()}
   */
  @Deprecated
  public long initialize() throws IOException {
    return initialize(null);
  }
672
673
674
675
676
677
678
679
  /**
   * Initialize this region, reporting progress through {@code reporter}.
   *
   * @param reporter progress callback, may be null
   * @return the sequence id to use for the region's next edit
   * @throws IOException if store opening or WAL replay fails
   */
  private long initialize(final CancelableProgressable reporter) throws IOException {
    MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
    long nextSeqId = -1;
    try {
      nextSeqId = initializeRegionInternals(reporter, status);
      return nextSeqId;
    } finally {
      // nextSeqId is still -1 only when initializeRegionInternals threw;
      // mark the monitored task aborted in that case (success path marks
      // it complete inside initializeRegionInternals).
      if (nextSeqId == -1) {
        status
            .abort("Exception during region " + this.getRegionNameAsString() + " initialization.");
      }
    }
  }
695
  /**
   * Core of region opening: coprocessor pre/post hooks, filesystem sanity
   * work, store opening (with WAL replay), split/merge detritus cleanup and
   * computation of the next sequence id.
   *
   * @return the sequence id the region should use for its next edit
   */
  private long initializeRegionInternals(final CancelableProgressable reporter,
      final MonitoredTask status) throws IOException, UnsupportedEncodingException {
    if (coprocessorHost != null) {
      status.setStatus("Running coprocessor pre-open hook");
      coprocessorHost.preOpen();
    }

    // Make sure the .regioninfo file on disk matches this region.
    status.setStatus("Writing region info on filesystem");
    fs.checkRegionInfoOnFilesystem();

    // Remove temporary data left over from earlier incarnations.
    status.setStatus("Cleaning up temporary data from old regions");
    fs.cleanupTempDir();

    // Open all stores; also replays any recovered WAL edits.
    status.setStatus("Initializing all the Stores");
    long maxSeqId = initializeRegionStores(reporter, status);

    status.setStatus("Cleaning up detritus from prior splits");
    // Get rid of any split or merge artifacts left from previous attempts.
    fs.cleanupAnySplitDetritus();
    fs.cleanupMergesDir();

    this.writestate.setReadOnly(this.htableDescriptor.isReadOnly());
    this.writestate.flushRequested = false;
    this.writestate.compacting = 0;

    this.splitPolicy = RegionSplitPolicy.create(this, conf);

    // Treat open time as the last flush so periodic-flush timing starts now.
    this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();

    long nextSeqid = maxSeqId + 1;
    if (this.isRecovering) {
      // While recovering, leave a large gap in the sequence-id space so
      // edits written before recovery completes cannot collide with ids
      // still being replayed.
      nextSeqid += this.flushPerChanges + 10000000;
    }
    LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() +
      "; next sequenceid=" + nextSeqid);

    // Region is open for business from here on.
    this.closing.set(false);
    this.closed.set(false);

    if (coprocessorHost != null) {
      status.setStatus("Running coprocessor post-open hooks");
      coprocessorHost.postOpen();
    }

    status.markComplete("Region opened successfully");
    return nextSeqid;
  }
754
  /**
   * Opens every store (column family) in parallel, records each store's max
   * sequence id, seeds MVCC from the largest memstore timestamp seen, and
   * finally replays any recovered WAL edits.
   *
   * @return the largest sequence id found across stores and replayed edits
   */
  private long initializeRegionStores(final CancelableProgressable reporter, MonitoredTask status)
      throws IOException, UnsupportedEncodingException {

    // Largest sequence id found in any store file.
    long maxSeqId = -1;
    // Largest memstore timestamp found; used to initialize MVCC.
    long maxMemstoreTS = -1;

    if (!htableDescriptor.getFamilies().isEmpty()) {
      // Open stores concurrently; one task per column family.
      ThreadPoolExecutor storeOpenerThreadPool =
        getStoreOpenAndCloseThreadPool("StoreOpener-" + this.getRegionInfo().getShortNameToLog());
      CompletionService<HStore> completionService =
        new ExecutorCompletionService<HStore>(storeOpenerThreadPool);

      for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
        status.setStatus("Instantiating store for column family " + family);
        completionService.submit(new Callable<HStore>() {
          @Override
          public HStore call() throws IOException {
            return instantiateHStore(family);
          }
        });
      }
      boolean allStoresOpened = false;
      try {
        for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
          Future<HStore> future = completionService.take();
          HStore store = future.get();
          this.stores.put(store.getColumnFamilyName().getBytes(), store);

          long storeMaxSequenceId = store.getMaxSequenceId();
          maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
              storeMaxSequenceId);
          if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) {
            maxSeqId = storeMaxSequenceId;
          }
          long maxStoreMemstoreTS = store.getMaxMemstoreTS();
          if (maxStoreMemstoreTS > maxMemstoreTS) {
            maxMemstoreTS = maxStoreMemstoreTS;
          }
        }
        allStoresOpened = true;
      } catch (InterruptedException e) {
        // Convert to an IO exception but keep the original as cause.
        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
      } catch (ExecutionException e) {
        throw new IOException(e.getCause());
      } finally {
        storeOpenerThreadPool.shutdownNow();
        if (!allStoresOpened) {
          // Avoid leaking the stores that DID open before the failure.
          LOG.error("Could not initialize all stores for the region=" + this);
          for (Store store : this.stores.values()) {
            try {
              store.close();
            } catch (IOException e) {
              LOG.warn(e.getMessage());
            }
          }
        }
      }
    }
    mvcc.initialize(maxMemstoreTS + 1);
    // Replay recovered WAL edits, if any, and fold their sequence ids in.
    maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
        this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
    return maxSeqId;
  }
824
825
826
827
828 public boolean hasReferences() {
829 for (Store store : this.stores.values()) {
830 if (store.hasReferences()) return true;
831 }
832 return false;
833 }
834
835
836
837
838
839
840 public HDFSBlocksDistribution getHDFSBlocksDistribution() {
841 HDFSBlocksDistribution hdfsBlocksDistribution =
842 new HDFSBlocksDistribution();
843 synchronized (this.stores) {
844 for (Store store : this.stores.values()) {
845 for (StoreFile sf : store.getStorefiles()) {
846 HDFSBlocksDistribution storeFileBlocksDistribution =
847 sf.getHDFSBlockDistribution();
848 hdfsBlocksDistribution.add(storeFileBlocksDistribution);
849 }
850 }
851 }
852 return hdfsBlocksDistribution;
853 }
854
855
856
857
858
859
860
861
862
  /**
   * Computes the HDFS block distribution for a (possibly unopened) region,
   * resolving the table directory from the root dir in {@code conf}.
   *
   * @param conf configuration used to locate the root/table directories
   * @param tableDescriptor descriptor of the region's table
   * @param regionInfo info of the region to inspect
   * @return combined block distribution of the region's store files
   * @throws IOException on filesystem errors
   */
  public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
      final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException {
    Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
    return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath);
  }
868
869
870
871
872
873
874
875
876
877
  /**
   * Computes the HDFS block distribution for a region given an explicit
   * table path, without opening the region's stores.
   *
   * @param conf configuration for filesystem access
   * @param tableDescriptor descriptor of the region's table
   * @param regionInfo info of the region to inspect
   * @param tablePath directory of the table on the filesystem
   * @return combined block distribution of the region's store files
   * @throws IOException on filesystem errors
   */
  public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
      final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo, Path tablePath)
      throws IOException {
    HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
    FileSystem fs = tablePath.getFileSystem(conf);

    HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo);
    for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
      // A family directory may not exist yet; skip it.
      Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family.getNameAsString());
      if (storeFiles == null) continue;

      for (StoreFileInfo storeFileInfo : storeFiles) {
        hdfsBlocksDistribution.add(storeFileInfo.computeHDFSBlocksDistribution(fs));
      }
    }
    return hdfsBlocksDistribution;
  }
895
  /** @return the live counter of bytes held in this region's memstores */
  public AtomicLong getMemstoreSize() {
    return memstoreSize;
  }
899
900
901
902
903
904
905
  /**
   * Applies a memstore size delta (positive or negative) to this region's
   * accounting and, when hosted by a region server, to the server-wide
   * accounting as well.
   *
   * @param memStoreSize delta in bytes
   * @return this region's memstore size after applying the delta
   */
  public long addAndGetGlobalMemstoreSize(long memStoreSize) {
    if (this.rsAccounting != null) {
      rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
    }
    return this.memstoreSize.addAndGet(memStoreSize);
  }
912
913
  /** @return this region's {@link HRegionInfo}, taken from the region filesystem */
  public HRegionInfo getRegionInfo() {
    return this.fs.getRegionInfo();
  }
917
918
919
920
921
  /** @return the hosting region server's services, or null when standalone */
  RegionServerServices getRegionServerServices() {
    return this.rsServices;
  }
925
926
  /** @return number of read requests served by this region */
  long getReadRequestsCount() {
    return this.readRequestsCount.get();
  }
930
931
  /** @return number of write requests served by this region */
  long getWriteRequestsCount() {
    return this.writeRequestsCount.get();
  }
935
  /** @return this region's metrics source, or null when standalone */
  MetricsRegion getMetrics() {
    return metricsRegion;
  }
939
940
  /** @return true if the region has been fully closed */
  public boolean isClosed() {
    return this.closed.get();
  }
944
945
946
947
  /** @return true if a close of this region is in progress */
  public boolean isClosing() {
    return this.closing.get();
  }
951
952
953
954
955
956 public void setRecovering(boolean newState) {
957 boolean wasRecovering = this.isRecovering;
958 this.isRecovering = newState;
959 if (wasRecovering && !isRecovering) {
960
961 coprocessorHost.postLogReplay();
962 }
963 }
964
965
966
967
  /** @return true while distributed log replay still has this region in recovery */
  public boolean isRecovering() {
    return this.isRecovering;
  }
971
972
  /** @return true if the region is open: neither closed nor closing */
  public boolean isAvailable() {
    return !isClosed() && !isClosing();
  }
976
977
  /** @return true if the region is open and holds no split-parent references */
  public boolean isSplittable() {
    return isAvailable() && !hasReferences();
  }
981
982
983
984
  /**
   * @return true if this region may take part in a merge: it must be open
   *   and must not hold reference files left over from a split
   */
  public boolean isMergeable() {
    if (!isAvailable()) {
      LOG.debug("Region " + this.getRegionNameAsString()
          + " is not mergeable because it is closing or closed");
      return false;
    }
    if (hasReferences()) {
      LOG.debug("Region " + this.getRegionNameAsString()
          + " is not mergeable because it has references");
      return false;
    }

    return true;
  }
999
  /**
   * @return true if writes are currently allowed on this region; reads the
   *   flag under the writestate monitor to stay ordered with close()
   */
  public boolean areWritesEnabled() {
    synchronized(this.writestate) {
      return this.writestate.writesEnabled;
    }
  }
1005
  /** @return this region's multi-version concurrency control instance */
  public MultiVersionConsistencyControl getMVCC() {
    return mvcc;
  }
1009
1010
1011
1012
1013 public long getReadpoint(IsolationLevel isolationLevel) {
1014 if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
1015
1016 return Long.MAX_VALUE;
1017 }
1018 return mvcc.memstoreReadPoint();
1019 }
1020
  /** @return the default for on-demand column family loading on scans */
  public boolean isLoadingCfsOnDemandDefault() {
    return this.isLoadingCfsOnDemandDefault;
  }
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
  /**
   * Close this region normally (non-abort), flushing memstore edits first.
   *
   * @return map of family name to the store files of that family, or null
   *   if the region was already closed
   * @throws IOException if the flush or store close fails
   */
  public Map<byte[], List<StoreFile>> close() throws IOException {
    return close(false);
  }
1041
  // Serializes concurrent close(boolean) calls so only one proceeds.
  private final Object closeLock = new Object();

  /** Conf key for the periodic (time-based) memstore flush interval, ms. */
  public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL =
      "hbase.regionserver.optionalcacheflushinterval";
  /** Default periodic flush interval: 1 hour. */
  public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;

  /** Conf key for flushing after this many edits that skipped the WAL. */
  public static final String MEMSTORE_FLUSH_PER_CHANGES =
      "hbase.regionserver.flush.per.changes";
  public static final long DEFAULT_FLUSH_PER_CHANGES = 30000000;

  // Hard upper bound for MEMSTORE_FLUSH_PER_CHANGES; the constructor
  // rejects configured values above this.
  public static final long MAX_FLUSH_PER_CHANGES = 1000000000;
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
  /**
   * Close this region.  When {@code abort} is false the memstore is flushed
   * to new store files first; when true, in-memory edits are discarded.
   * Concurrent close attempts serialize on {@code closeLock}.
   *
   * @param abort true to skip flushing (server abort path)
   * @return map of family name to that family's store files, or null if the
   *   region was already closed
   * @throws IOException if the flush or store close fails
   */
  public Map<byte[], List<StoreFile>> close(final boolean abort) throws IOException {
    // Track the close as a monitored task so operators can see progress.
    MonitoredTask status = TaskMonitor.get().createStatus(
        "Closing region " + this +
        (abort ? " due to abort" : ""));

    status.setStatus("Waiting for close lock");
    try {
      synchronized (closeLock) {
        return doClose(abort, status);
      }
    } finally {
      status.cleanup();
    }
  }
1090
  /**
   * Does the actual work of closing the region: runs coprocessor hooks,
   * disables compactions/flushes, optionally pre-flushes, takes the region
   * write lock, drains the memstore (non-abort), then closes every store in
   * parallel.
   *
   * @param abort true to skip memstore draining
   * @param status monitored task updated as the close proceeds
   * @return map of family name to closed store files, or null if already closed
   * @throws IOException on flush/close failure
   * @throws DroppedSnapshotException if repeated flushes cannot empty the memstore
   */
  private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
      throws IOException {
    if (isClosed()) {
      LOG.warn("Region " + this + " already closed");
      return null;
    }

    if (coprocessorHost != null) {
      status.setStatus("Running coprocessor pre-close hooks");
      this.coprocessorHost.preClose(abort);
    }

    status.setStatus("Disabling compacts and flushes for region");
    synchronized (writestate) {
      // Stop new flushes/compactions and wait out any in flight ones.
      writestate.writesEnabled = false;
      LOG.debug("Closing " + this + ": disabling compactions & flushes");
      waitForFlushesAndCompactions();
    }

    // Pre-flush outside the write lock so reads/writes continue meanwhile;
    // shrinks the memstore so the locked flush below is short.
    if (!abort && worthPreFlushing()) {
      status.setStatus("Pre-flushing region before close");
      LOG.info("Running close preflush of " + this.getRegionNameAsString());
      try {
        internalFlushcache(status);
      } catch (IOException ioe) {
        // Best-effort: failure here is retried by the locked flush below.
        status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
      }
    }

    this.closing.set(true);
    status.setStatus("Disabling writes for close");

    // Exclusive lock: no reads or writes while we finish the close.
    lock.writeLock().lock();
    try {
      if (this.isClosed()) {
        status.abort("Already got closed by another process");
        return null;
      }
      LOG.debug("Updates disabled for region " + this);

      if (!abort) {
        int flushCount = 0;
        // Flush until the memstore is empty; extra rounds can be needed
        // when a snapshot is being carried across a flush.
        while (this.getMemstoreSize().get() > 0) {
          try {
            if (flushCount++ > 0) {
              int actualFlushes = flushCount - 1;
              if (actualFlushes > 5) {
                // Give up rather than loop forever on a stuck memstore.
                throw new DroppedSnapshotException("Failed clearing memory after " +
                  actualFlushes + " attempts on region: " + Bytes.toStringBinary(getRegionName()));
              }
              LOG.info("Running extra flush, " + actualFlushes +
                " (carrying snapshot?) " + this);
            }
            internalFlushcache(status);
          } catch (IOException ioe) {
            status.setStatus("Failed flush " + this + ", putting online again");
            // Re-enable writes so the region stays usable after the failure.
            synchronized (writestate) {
              writestate.writesEnabled = true;
            }
            throw ioe;
          }
        }
      }

      Map<byte[], List<StoreFile>> result =
        new TreeMap<byte[], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
      if (!stores.isEmpty()) {
        // Close stores in parallel, one task per store.
        ThreadPoolExecutor storeCloserThreadPool =
          getStoreOpenAndCloseThreadPool("StoreCloserThread-" + this.getRegionNameAsString());
        CompletionService<Pair<byte[], Collection<StoreFile>>> completionService =
          new ExecutorCompletionService<Pair<byte[], Collection<StoreFile>>>(storeCloserThreadPool);

        for (final Store store : stores.values()) {
          // On a normal close every store must already be flushed empty.
          assert abort? true: store.getFlushableSize() == 0;
          completionService
              .submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
                @Override
                public Pair<byte[], Collection<StoreFile>> call() throws IOException {
                  return new Pair<byte[], Collection<StoreFile>>(
                    store.getFamily().getName(), store.close());
                }
              });
        }
        try {
          for (int i = 0; i < stores.size(); i++) {
            Future<Pair<byte[], Collection<StoreFile>>> future = completionService.take();
            Pair<byte[], Collection<StoreFile>> storeFiles = future.get();
            List<StoreFile> familyFiles = result.get(storeFiles.getFirst());
            if (familyFiles == null) {
              familyFiles = new ArrayList<StoreFile>();
              result.put(storeFiles.getFirst(), familyFiles);
            }
            familyFiles.addAll(storeFiles.getSecond());
          }
        } catch (InterruptedException e) {
          throw (InterruptedIOException)new InterruptedIOException().initCause(e);
        } catch (ExecutionException e) {
          throw new IOException(e.getCause());
        } finally {
          storeCloserThreadPool.shutdownNow();
        }
      }
      this.closed.set(true);
      // A non-zero memstore after close indicates an accounting bug.
      if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get());
      if (coprocessorHost != null) {
        status.setStatus("Running coprocessor post-close hooks");
        this.coprocessorHost.postClose(abort);
      }
      if ( this.metricsRegion != null) {
        this.metricsRegion.close();
      }
      if ( this.metricsRegionWrapper != null) {
        Closeables.closeQuietly(this.metricsRegionWrapper);
      }
      status.markComplete("Closed");
      LOG.info("Closed " + this);
      return result;
    } finally {
      lock.writeLock().unlock();
    }
  }
1223
1224
1225
1226
1227
1228
1229 public void waitForFlushesAndCompactions() {
1230 synchronized (writestate) {
1231 while (writestate.compacting > 0 || writestate.flushing) {
1232 LOG.debug("waiting for " + writestate.compacting + " compactions"
1233 + (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
1234 try {
1235 writestate.wait();
1236 } catch (InterruptedException iex) {
1237
1238 Thread.currentThread().interrupt();
1239 }
1240 }
1241 }
1242 }
1243
1244 protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
1245 final String threadNamePrefix) {
1246 int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1247 int maxThreads = Math.min(numStores,
1248 conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1249 HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
1250 return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1251 }
1252
1253 protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
1254 final String threadNamePrefix) {
1255 int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1256 int maxThreads = Math.max(1,
1257 conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1258 HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
1259 / numStores);
1260 return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1261 }
1262
1263 static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
1264 final String threadNamePrefix) {
1265 return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
1266 new ThreadFactory() {
1267 private int count = 1;
1268
1269 @Override
1270 public Thread newThread(Runnable r) {
1271 return new Thread(r, threadNamePrefix + "-" + count++);
1272 }
1273 });
1274 }
1275
1276
1277
1278
1279 private boolean worthPreFlushing() {
1280 return this.memstoreSize.get() >
1281 this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
1282 }
1283
1284
1285
1286
1287
1288
  /** @return the start key of this region, from its HRegionInfo. */
  public byte [] getStartKey() {
    return this.getRegionInfo().getStartKey();
  }
1292
1293
  /** @return the end key of this region, from its HRegionInfo. */
  public byte [] getEndKey() {
    return this.getRegionInfo().getEndKey();
  }
1297
1298
  /** @return the numeric region id, from this region's HRegionInfo. */
  public long getRegionId() {
    return this.getRegionInfo().getRegionId();
  }
1302
1303
  /** @return the full region name as bytes, from this region's HRegionInfo. */
  public byte [] getRegionName() {
    return this.getRegionInfo().getRegionName();
  }
1307
1308
  /** @return the region name as a printable String, from this region's HRegionInfo. */
  public String getRegionNameAsString() {
    return this.getRegionInfo().getRegionNameAsString();
  }
1312
1313
  /** @return the table descriptor this region was created with. */
  public HTableDescriptor getTableDesc() {
    return this.htableDescriptor;
  }
1317
1318
  /** @return the WAL ({@code HLog}) used by this region. */
  public HLog getLog() {
    return this.log;
  }
1322
1323
1324
1325
1326
1327
1328
1329
  /**
   * @return the base {@link Configuration} held in {@code baseConf}
   *   (package-private; presumably the un-merged server configuration —
   *   TODO confirm against the constructor, which is outside this view).
   */
  Configuration getBaseConf() {
    return this.baseConf;
  }
1333
1334
  /** @return the {@link FileSystem} backing this region's file system abstraction. */
  public FileSystem getFilesystem() {
    return fs.getFileSystem();
  }
1338
1339
  /** @return the {@code HRegionFileSystem} wrapper for this region's on-disk layout. */
  public HRegionFileSystem getRegionFileSystem() {
    return this.fs;
  }
1343
1344
  /**
   * @return the time (ms) the memstore was last flushed; updated by
   *   {@code internalFlushcache} after a successful flush.
   */
  public long getLastFlushTime() {
    return this.lastFlushTime;
  }
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357 public long getLargestHStoreSize() {
1358 long size = 0;
1359 for (Store h : stores.values()) {
1360 long storeSize = h.getSize();
1361 if (storeSize > size) {
1362 size = storeSize;
1363 }
1364 }
1365 return size;
1366 }
1367
1368
1369
1370
  /** @return the KeyValue comparator used by this region. */
  public KeyValue.KVComparator getComparator() {
    return this.comparator;
  }
1374
1375
1376
1377
1378
  /**
   * Hook called by {@code compact} before the store compaction starts.
   * No-op here; subclasses may override to do preparation work.
   */
  protected void doRegionCompactionPrep() throws IOException {
  }
1381
1382 void triggerMajorCompaction() {
1383 for (Store h : stores.values()) {
1384 h.triggerMajorCompaction();
1385 }
1386 }
1387
1388
1389
1390
1391
1392
1393
1394
1395 public void compactStores(final boolean majorCompaction)
1396 throws IOException {
1397 if (majorCompaction) {
1398 this.triggerMajorCompaction();
1399 }
1400 compactStores();
1401 }
1402
1403
1404
1405
1406
1407
1408
1409 public void compactStores() throws IOException {
1410 for (Store s : getStores().values()) {
1411 CompactionContext compaction = s.requestCompaction();
1412 if (compaction != null) {
1413 compact(compaction, s);
1414 }
1415 }
1416 }
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
  /**
   * Execute an already-selected compaction on the given store.
   *
   * <p>Takes the region close lock's read side so the region cannot close
   * mid-compaction, tracks the in-progress count in {@code writestate}, and
   * guarantees the compaction request is cancelled on every early-exit path
   * that did not hand the request to the store.
   *
   * @param compaction a non-null compaction context with a non-empty file selection
   * @param store the store the selection was made against
   * @return true if the compaction ran to completion, false if it was skipped
   *   or interrupted
   * @throws IOException on compaction failure
   */
  public boolean compact(CompactionContext compaction, Store store) throws IOException {
    assert compaction != null && compaction.hasSelection();
    assert !compaction.getRequest().getFiles().isEmpty();
    if (this.closing.get() || this.closed.get()) {
      LOG.debug("Skipping compaction on " + this + " because closing/closed");
      store.cancelRequestedCompaction(compaction);
      return false;
    }
    MonitoredTask status = null;
    boolean didPerformCompaction = false;
    // Read lock: block region close (which takes the write lock) while we compact.
    lock.readLock().lock();
    try {
      // Guard against the store object having been replaced (e.g. by a split
      // rollback) since the compaction was requested.
      byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
      if (stores.get(cf) != store) {
        LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this
            + " has been re-instantiated, cancel this compaction request. "
            + " It may be caused by the roll back of split transaction");
        return false;
      }

      status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
      // Re-check closed: it may have flipped while we waited for the read lock.
      if (this.closed.get()) {
        String msg = "Skipping compaction on " + this + " because closed";
        LOG.debug(msg);
        status.abort(msg);
        return false;
      }
      boolean wasStateSet = false;
      try {
        synchronized (writestate) {
          if (writestate.writesEnabled) {
            wasStateSet = true;
            ++writestate.compacting;
          } else {
            String msg = "NOT compacting region " + this + ". Writes disabled.";
            LOG.info(msg);
            status.abort(msg);
            return false;
          }
        }
        LOG.info("Starting compaction on " + store + " in region " + this
            + (compaction.getRequest().isOffPeak()?" as an off-peak compaction":""));
        doRegionCompactionPrep();
        try {
          status.setStatus("Compacting store " + store);
          // Once we hand the request to the store, the store owns its cleanup.
          didPerformCompaction = true;
          store.compact(compaction);
        } catch (InterruptedIOException iioe) {
          String msg = "compaction interrupted";
          LOG.info(msg, iioe);
          status.abort(msg);
          return false;
        }
      } finally {
        if (wasStateSet) {
          synchronized (writestate) {
            --writestate.compacting;
            // Wake waitForFlushesAndCompactions() when the last compaction ends.
            if (writestate.compacting <= 0) {
              writestate.notifyAll();
            }
          }
        }
      }
      status.markComplete("Compaction complete");
      return true;
    } finally {
      try {
        // Cancel the request only if it was never handed to the store.
        if (!didPerformCompaction) store.cancelRequestedCompaction(compaction);
        if (status != null) status.cleanup();
      } finally {
        lock.readLock().unlock();
      }
    }
  }
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
  /**
   * Flush this region's memstore if possible.
   *
   * <p>Takes the region close lock's read side (so flush and close are
   * mutually exclusive), marks {@code writestate.flushing} for the duration,
   * runs coprocessor pre/post flush hooks, and always clears the flushing
   * flag and notifies waiters on exit.
   *
   * @return a FlushResult stating whether the flush ran and why not otherwise
   * @throws IOException propagated from {@code internalFlushcache}
   */
  public FlushResult flushcache() throws IOException {
    // Fail fast if the region is already closing; don't wait on the lock.
    if (this.closing.get()) {
      String msg = "Skipping flush on " + this + " because closing";
      LOG.debug(msg);
      return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
    }
    MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
    status.setStatus("Acquiring readlock on region");
    // Read lock: allow concurrent operations but block region close.
    lock.readLock().lock();
    try {
      // Re-check after acquiring the lock; close may have completed meanwhile.
      if (this.closed.get()) {
        String msg = "Skipping flush on " + this + " because closed";
        LOG.debug(msg);
        status.abort(msg);
        return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
      }
      if (coprocessorHost != null) {
        status.setStatus("Running coprocessor pre-flush hooks");
        coprocessorHost.preFlush();
      }
      // Reset the "skipped WAL" counters: the flush will persist that data.
      if (numMutationsWithoutWAL.get() > 0) {
        numMutationsWithoutWAL.set(0);
        dataInMemoryWithoutWAL.set(0);
      }
      synchronized (writestate) {
        // Only one flush at a time, and only while writes are enabled.
        if (!writestate.flushing && writestate.writesEnabled) {
          this.writestate.flushing = true;
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("NOT flushing memstore for region " + this
                + ", flushing=" + writestate.flushing + ", writesEnabled="
                + writestate.writesEnabled);
          }
          String msg = "Not flushing since "
              + (writestate.flushing ? "already flushing"
                  : "writes not enabled");
          status.abort(msg);
          return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
        }
      }
      try {
        FlushResult fs = internalFlushcache(status);

        if (coprocessorHost != null) {
          status.setStatus("Running post-flush coprocessor hooks");
          coprocessorHost.postFlush();
        }

        status.markComplete("Flush successful");
        return fs;
      } finally {
        // Always clear the flushing flag and wake waiters (e.g.
        // waitForFlushesAndCompactions), even if the flush threw.
        synchronized (writestate) {
          writestate.flushing = false;
          this.writestate.flushRequested = false;
          writestate.notifyAll();
        }
      }
    } finally {
      lock.readLock().unlock();
      status.cleanup();
    }
  }
1593
1594
1595
1596
1597 boolean shouldFlush() {
1598
1599 if (this.completeSequenceId > 0
1600 && (this.completeSequenceId + this.flushPerChanges < this.sequenceId.get())) {
1601 return true;
1602 }
1603 if (flushCheckInterval <= 0) {
1604 return false;
1605 }
1606 long now = EnvironmentEdgeManager.currentTimeMillis();
1607
1608 if ((now - getLastFlushTime() < flushCheckInterval)) {
1609 return false;
1610 }
1611
1612
1613 for (Store s : this.getStores().values()) {
1614 if (s.timeOfOldestEdit() < now - flushCheckInterval) {
1615
1616 return true;
1617 }
1618 }
1619 return false;
1620 }
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
  /**
   * Flush the memstore using this region's own WAL. Delegates to
   * {@code internalFlushcache(HLog, long, MonitoredTask)} with -1 as the
   * sequence id, meaning "obtain one from the WAL".
   *
   * @param status monitored task updated with progress
   * @return the result of the flush
   * @throws IOException on flush failure
   */
  protected FlushResult internalFlushcache(MonitoredTask status)
      throws IOException {
    return internalFlushcache(this.log, -1, status);
  }
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
  /**
   * Flush the memstore of this region to disk.
   *
   * <p>Phases: (1) under the region updates write-lock, snapshot every
   * store's memstore and pick a flush sequence id; (2) outside the lock,
   * sync the WAL if needed and wait out in-flight MVCC transactions so the
   * snapshot holds only complete rows; (3) flush and commit the snapshots;
   * (4) mark the flush complete in the WAL and update bookkeeping.
   *
   * @param wal WAL to coordinate with; when null, {@code myseqid} supplies
   *   the flush sequence id (replay/recovery path)
   * @param myseqid sequence id to use when {@code wal} is null; ignored otherwise
   * @param status monitored task updated with progress
   * @return a FlushResult saying whether anything was flushed and whether a
   *   compaction is now wanted
   * @throws IOException on failure; a {@link DroppedSnapshotException} means
   *   the snapshot could not be persisted and the caller must abort the
   *   server so the WAL is replayed
   */
  protected FlushResult internalFlushcache(
      final HLog wal, final long myseqid, MonitoredTask status)
      throws IOException {
    if (this.rsServices != null && this.rsServices.isAborted()) {
      // Flushing while the server is aborting is unsafe.
      throw new IOException("Aborting flush because server is abortted...");
    }
    final long startTime = EnvironmentEdgeManager.currentTimeMillis();
    // Nothing in the memstore: report CANNOT_FLUSH_MEMSTORE_EMPTY and bail.
    if (this.memstoreSize.get() <= 0) {
      if(LOG.isDebugEnabled()) {
        LOG.debug("Empty memstore size for the current region "+this);
      }
      return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush");
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Started memstore flush for " + this +
        ", current region memstore size " +
        StringUtils.humanReadableInt(this.memstoreSize.get()) +
        ((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid));
    }

    // MVCC write entry used solely to wait for transactions begun before the
    // snapshot point, so the snapshot contains no partial rows.
    MultiVersionConsistencyControl.WriteEntry w = null;

    status.setStatus("Obtaining lock to block concurrent updates");
    // Write side of updatesLock: blocks every put/delete during the snapshot.
    this.updatesLock.writeLock().lock();
    long totalFlushableSize = 0;
    status.setStatus("Preparing to flush by snapshotting stores");
    List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
    long flushSeqId = -1L;
    try {
      // Advance MVCC so readers cannot see edits newer than the snapshot point.
      w = mvcc.beginMemstoreInsert();
      mvcc.advanceMemstore(w);

      if (wal != null) {
        if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) {
          // The WAL refused the flush (it is closing): back out cleanly.
          String msg = "Flush will not be started for ["
              + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
          status.setStatus(msg);
          return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
        }
        flushSeqId = this.sequenceId.incrementAndGet();
      } else {
        // wal is null: caller (e.g. WAL replay) supplied the sequence id.
        flushSeqId = myseqid;
      }

      for (Store s : stores.values()) {
        totalFlushableSize += s.getFlushableSize();
        storeFlushCtxs.add(s.createFlushContext(flushSeqId));
      }

      // prepare() moves each store's active memstore into its snapshot.
      for (StoreFlushContext flush : storeFlushCtxs) {
        flush.prepare();
      }
    } finally {
      this.updatesLock.writeLock().unlock();
    }
    String s = "Finished memstore snapshotting " + this +
      ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize;
    status.setStatus(s);
    if (LOG.isTraceEnabled()) LOG.trace(s);

    // NOTE(review): syncs only when shouldSyncLog() is false — presumably the
    // deferred-sync case, where edits may not have reached disk yet; the
    // normal path would have synced already. Confirm against shouldSyncLog().
    if (wal != null && !shouldSyncLog()) {
      wal.sync();
    }

    // Wait for transactions begun before the snapshot to commit, so the
    // snapshot we flush contains only complete rows.
    mvcc.waitForRead(w);

    s = "Flushing stores of " + this;
    status.setStatus(s);
    if (LOG.isTraceEnabled()) LOG.trace(s);

    // From here until the WAL flush marker is written, any failure is fatal
    // for this server: memory and disk may disagree.
    boolean compactionRequested = false;
    try {
      // First flush every snapshot to a temporary location...
      for (StoreFlushContext flush : storeFlushCtxs) {
        flush.flushCache(status);
      }

      // ...then commit each new file into place; commit also reports whether
      // the store now wants a compaction.
      for (StoreFlushContext flush : storeFlushCtxs) {
        boolean needsCompaction = flush.commit(status);
        if (needsCompaction) {
          compactionRequested = true;
        }
      }
      storeFlushCtxs.clear();

      // The flushed bytes no longer count against the region memstore size.
      this.addAndGetGlobalMemstoreSize(-totalFlushableSize);
    } catch (Throwable t) {
      // Abort the WAL-side flush bookkeeping and throw
      // DroppedSnapshotException: callers must abort the server so the WAL
      // gets replayed, because the in-memory state can no longer be trusted.
      if (wal != null) {
        wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
      }
      DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
          Bytes.toStringBinary(getRegionName()));
      dse.initCause(t);
      status.abort("Flush failed: " + StringUtils.stringifyException(t));
      throw dse;
    }

    // Record the completed flush in the WAL so recovery can skip older edits.
    if (wal != null) {
      wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
    }

    // Record the latest flush time; read by shouldFlush()/getLastFlushTime().
    this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();

    // Update the last flushed sequence id for the region.
    completeSequenceId = flushSeqId;

    // Wake anyone blocked on this region waiting for memstore space.
    synchronized (this) {
      notifyAll();
    }

    long time = EnvironmentEdgeManager.currentTimeMillis() - startTime;
    long memstoresize = this.memstoreSize.get();
    String msg = "Finished memstore flush of ~" +
      StringUtils.humanReadableInt(totalFlushableSize) + "/" + totalFlushableSize +
      ", currentsize=" +
      StringUtils.humanReadableInt(memstoresize) + "/" + memstoresize +
      " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId +
      ", compaction requested=" + compactionRequested +
      ((wal == null)? "; wal=null": "");
    LOG.info(msg);
    status.setStatus(msg);
    this.recentFlushes.add(new Pair<Long,Long>(time/1000, totalFlushableSize));

    return new FlushResult(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
        FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushSeqId);
  }
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
  /**
   * Return the row at or just before the given row, looking only at the
   * catalog family. Convenience overload of
   * {@code getClosestRowBefore(byte[], byte[])}.
   *
   * @param row the reference row
   * @return the closest row at or before {@code row}, or null if none
   * @throws IOException on lookup failure
   */
  Result getClosestRowBefore(final byte [] row)
      throws IOException{
    return getClosestRowBefore(row, HConstants.CATALOG_FAMILY);
  }
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
  /**
   * Return the data for the row at or just before the given row within the
   * given family, or null when there is no such row.
   *
   * <p>Runs the coprocessor pre/post hooks; the pre hook may short-circuit
   * and supply the result itself.
   *
   * @param row the reference row
   * @param family the column family to search
   * @return the closest-row result, or null if none found
   * @throws IOException on lookup failure
   */
  public Result getClosestRowBefore(final byte [] row, final byte [] family)
      throws IOException {
    if (coprocessorHost != null) {
      Result result = new Result();
      // Pre hook may answer the query itself (bypass).
      if (coprocessorHost.preGetClosestRowBefore(row, family, result)) {
        return result;
      }
    }
    // Verify the row falls inside this region's key range.
    checkRow(row, "getClosestRowBefore");
    startRegionOperation(Operation.GET);
    this.readRequestsCount.increment();
    try {
      Store store = getStore(family);
      // Ask the store for the greatest key at or before the reference row.
      KeyValue key = store.getRowKeyAtOrBefore(row);
      Result result = null;
      if (key != null) {
        // Fetch the full row for that key, restricted to the family.
        Get get = new Get(key.getRow());
        get.addFamily(family);
        result = get(get);
      }
      if (coprocessorHost != null) {
        coprocessorHost.postGetClosestRowBefore(row, family, result);
      }
      return result;
    } finally {
      closeRegionOperation(Operation.GET);
    }
  }
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
  /**
   * Open a scanner over this region for the given scan, with no additional
   * scanners.
   *
   * @param scan the scan specification
   * @return a RegionScanner for the scan
   * @throws IOException if the scan references an unknown column family
   */
  public RegionScanner getScanner(Scan scan) throws IOException {
    return getScanner(scan, null);
  }
1915
1916 void prepareScanner(Scan scan) throws IOException {
1917 if(!scan.hasFamilies()) {
1918
1919 for(byte[] family: this.htableDescriptor.getFamiliesKeys()){
1920 scan.addFamily(family);
1921 }
1922 }
1923 }
1924
1925 protected RegionScanner getScanner(Scan scan,
1926 List<KeyValueScanner> additionalScanners) throws IOException {
1927 startRegionOperation(Operation.SCAN);
1928 try {
1929
1930 prepareScanner(scan);
1931 if(scan.hasFamilies()) {
1932 for(byte [] family : scan.getFamilyMap().keySet()) {
1933 checkFamily(family);
1934 }
1935 }
1936 return instantiateRegionScanner(scan, additionalScanners);
1937 } finally {
1938 closeRegionOperation(Operation.SCAN);
1939 }
1940 }
1941
1942 protected RegionScanner instantiateRegionScanner(Scan scan,
1943 List<KeyValueScanner> additionalScanners) throws IOException {
1944 if (scan.isReversed()) {
1945 if (scan.getFilter() != null) {
1946 scan.getFilter().setReversed(true);
1947 }
1948 return new ReversedRegionScannerImpl(scan, additionalScanners, this);
1949 }
1950 return new RegionScannerImpl(scan, additionalScanners, this);
1951 }
1952
1953
1954
1955
1956 void prepareDelete(Delete delete) throws IOException {
1957
1958 if(delete.getFamilyCellMap().isEmpty()){
1959 for(byte [] family : this.htableDescriptor.getFamiliesKeys()){
1960
1961 delete.deleteFamily(family, delete.getTimeStamp());
1962 }
1963 } else {
1964 for(byte [] family : delete.getFamilyCellMap().keySet()) {
1965 if(family == null) {
1966 throw new NoSuchColumnFamilyException("Empty family is invalid");
1967 }
1968 checkFamily(family);
1969 }
1970 }
1971 }
1972
1973
1974
1975
1976
1977
1978
1979
  /**
   * Execute a Delete against this region via the batch-mutate path.
   *
   * @param delete the Delete to apply
   * @throws IOException if the region is read-only, out of resources, or the
   *   underlying mutation fails
   */
  public void delete(Delete delete)
      throws IOException {
    checkReadOnly();
    checkResources();
    startRegionOperation(Operation.DELETE);
    try {
      // NOTE(review): this getRow() call has no visible side effect here —
      // looks vestigial; confirm before removing.
      delete.getRow();
      // Route through the common batch path so WAL/memstore handling is shared.
      doBatchMutate(delete);
    } finally {
      closeRegionOperation(Operation.DELETE);
    }
  }
1993
1994
1995
1996
  // Row key used only by the test-only delete(NavigableMap, Durability) below.
  private static final byte [] FOR_UNIT_TESTS_ONLY = Bytes.toBytes("ForUnitTestsOnly");
1998
1999
2000
2001
2002
2003
  /**
   * Test-only delete path: wraps a prebuilt family map in a Delete on the
   * {@code FOR_UNIT_TESTS_ONLY} row and runs it through the normal
   * batch-mutate machinery.
   *
   * @param familyMap cells to delete, keyed by family
   * @param durability durability to apply to the delete
   * @throws IOException on mutation failure
   */
  void delete(NavigableMap<byte[], List<Cell>> familyMap,
      Durability durability) throws IOException {
    Delete delete = new Delete(FOR_UNIT_TESTS_ONLY);
    delete.setFamilyCellMap(familyMap);
    delete.setDurability(durability);
    doBatchMutate(delete);
  }
2011
2012
2013
2014
2015
2016
2017
2018
2019
  /**
   * Assign concrete timestamps to the delete markers in a family map.
   *
   * <p>For a latest-timestamp delete of a specific cell, the marker must
   * carry the timestamp of the version it is meant to delete, so the newest
   * matching version is looked up with a Get and its timestamp copied onto
   * the marker; other markers are simply stamped with "now".
   *
   * @param mutation the originating mutation, passed to coprocessor hooks
   * @param familyMap cells keyed by family; markers are mutated in place
   * @param byteNow current time encoded as bytes
   * @throws IOException on lookup failure
   */
  void prepareDeleteTimestamps(Mutation mutation, Map<byte[], List<Cell>> familyMap,
      byte[] byteNow) throws IOException {
    for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {

      byte[] family = e.getKey();
      List<Cell> cells = e.getValue();
      // Per-qualifier count of latest-timestamp delete markers seen so far.
      Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);

      for (Cell cell: cells) {
        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
        if (kv.isLatestTimestamp() && kv.isDeleteType()) {
          byte[] qual = kv.getQualifier();
          if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;

          // Bump the running count for this qualifier; the n-th marker
          // targets the n-th newest existing version.
          Integer count = kvCount.get(qual);
          if (count == null) {
            kvCount.put(qual, 1);
          } else {
            kvCount.put(qual, count + 1);
          }
          count = kvCount.get(qual);

          Get get = new Get(kv.getRow());
          get.setMaxVersions(count);
          get.addColumn(family, qual);
          if (coprocessorHost != null) {
            // Pre hook may take over the timestamp assignment entirely.
            if (!coprocessorHost.prePrepareTimeStampForDeleteVersion(mutation, cell,
                byteNow, get)) {
              updateDeleteLatestVersionTimeStamp(kv, get, count, byteNow);
            }
          } else {
            updateDeleteLatestVersionTimeStamp(kv, get, count, byteNow);
          }
        } else {
          // Not a latest-timestamp single-cell delete: stamp with "now".
          kv.updateLatestStamp(byteNow);
        }
      }
    }
  }
2061
  /**
   * Rewrite a latest-timestamp delete marker's timestamp to that of the
   * version it should delete, found via the supplied Get.
   *
   * @param kv the delete marker to rewrite in place
   * @param get lookup for the newest {@code count} versions of the cell
   * @param count how many markers (including this one) target this cell
   * @param byteNow fallback timestamp bytes when no matching version exists
   * @throws IOException on lookup failure
   */
  void updateDeleteLatestVersionTimeStamp(KeyValue kv, Get get, int count, byte[] byteNow)
      throws IOException {
    List<Cell> result = get(get, false);

    if (result.size() < count) {
      // Fewer existing versions than markers: nothing older to target, so
      // just stamp the marker with "now".
      kv.updateLatestStamp(byteNow);
      return;
    }
    if (result.size() > count) {
      throw new RuntimeException("Unexpected size: " + result.size());
    }
    // Copy the target version's timestamp bytes directly into the marker.
    KeyValue getkv = KeyValueUtil.ensureKeyValue(result.get(count - 1));
    Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(), getkv.getBuffer(),
        getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
  }
2078
2079
2080
2081
2082
  /**
   * Execute a Put against this region via the batch-mutate path.
   *
   * @param put the Put to apply
   * @throws IOException if the region is read-only, out of resources, or the
   *   underlying mutation fails
   */
  public void put(Put put)
      throws IOException {
    checkReadOnly();

    // checkResources() presumably rejects/blocks the write when the region is
    // over its memstore limits — it runs before the region operation lock, so
    // the check is best-effort; confirm against its definition.
    checkResources();
    startRegionOperation(Operation.PUT);
    try {
      // Route through the common batch path so WAL/memstore handling is shared.
      doBatchMutate(put);
    } finally {
      closeRegionOperation(Operation.PUT);
    }
  }
2100
2101
2102
2103
2104
2105
  /**
   * Struct-like carrier for the state of an in-progress batch of operations:
   * the operations themselves, the per-operation status codes, WAL edits
   * contributed by coprocessors, and the index of the next operation to
   * process. Generic over the operation type so the same machinery serves
   * live mutations and WAL-replay mutations.
   */
  private abstract static class BatchOperationInProgress<T> {
    T[] operations;
    // Index of the first operation not yet processed by doMiniBatchMutation.
    int nextIndexToProcess = 0;
    // Per-operation outcome; initialized to NOT_RUN.
    OperationStatus[] retCodeDetails;
    // WAL edits produced by coprocessor pre-hooks, aligned with operations.
    WALEdit[] walEditsFromCoprocessors;

    public BatchOperationInProgress(T[] operations) {
      this.operations = operations;
      this.retCodeDetails = new OperationStatus[operations.length];
      this.walEditsFromCoprocessors = new WALEdit[operations.length];
      Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
    }

    /** @return the mutation at the given index. */
    public abstract Mutation getMutation(int index);
    /** @return the nonce group for the operation at the given index. */
    public abstract long getNonceGroup(int index);
    /** @return the nonce for the operation at the given index. */
    public abstract long getNonce(int index);
    /** @return the raw mutations array, for coprocessor hooks only. */
    public abstract Mutation[] getMutationsForCoprocs();
    /** @return true when the batch replays WAL edits rather than live writes. */
    public abstract boolean isInReplay();

    /** @return true when every operation in the batch has been processed. */
    public boolean isDone() {
      return nextIndexToProcess == operations.length;
    }
  }
2130
2131 private static class MutationBatch extends BatchOperationInProgress<Mutation> {
2132 private long nonceGroup;
2133 private long nonce;
2134 public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) {
2135 super(operations);
2136 this.nonceGroup = nonceGroup;
2137 this.nonce = nonce;
2138 }
2139
2140 public Mutation getMutation(int index) {
2141 return this.operations[index];
2142 }
2143
2144 @Override
2145 public long getNonceGroup(int index) {
2146 return nonceGroup;
2147 }
2148
2149 @Override
2150 public long getNonce(int index) {
2151 return nonce;
2152 }
2153
2154 @Override
2155 public Mutation[] getMutationsForCoprocs() {
2156 return this.operations;
2157 }
2158
2159 @Override
2160 public boolean isInReplay() {
2161 return false;
2162 }
2163 }
2164
  /**
   * Batch of mutations recovered from the WAL during replay. Each replayed
   * mutation carries its own nonce group/nonce. Coprocessor hooks are not
   * run for replay, so {@code getMutationsForCoprocs} must never be called.
   */
  private static class ReplayBatch extends BatchOperationInProgress<HLogSplitter.MutationReplay> {
    public ReplayBatch(MutationReplay[] operations) {
      super(operations);
    }

    @Override
    public Mutation getMutation(int index) {
      return this.operations[index].mutation;
    }

    @Override
    public long getNonceGroup(int index) {
      return this.operations[index].nonceGroup;
    }

    @Override
    public long getNonce(int index) {
      return this.operations[index].nonce;
    }

    @Override
    public Mutation[] getMutationsForCoprocs() {
      // Replay bypasses coprocessor pre-hooks; reaching here is a bug.
      assert false;
      throw new RuntimeException("Should not be called for replay batch");
    }

    @Override
    public boolean isInReplay() {
      return true;
    }
  }
2196
2197
2198
2199
2200
2201
2202
2203
2204
  /**
   * Apply an array of client mutations atomically per row, sharing one
   * nonce group/nonce pair across the batch.
   *
   * @param mutations the mutations to apply
   * @param nonceGroup nonce group for the whole batch
   * @param nonce nonce for the whole batch
   * @return one OperationStatus per mutation, in the same order
   * @throws IOException on failure
   */
  public OperationStatus[] batchMutate(
      Mutation[] mutations, long nonceGroup, long nonce) throws IOException {
    return batchMutate(new MutationBatch(mutations, nonceGroup, nonce));
  }
2213
  /**
   * Apply an array of client mutations with no nonces.
   *
   * @param mutations the mutations to apply
   * @return one OperationStatus per mutation, in the same order
   * @throws IOException on failure
   */
  public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
    return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
  }
2217
2218
2219
2220
2221
2222
2223
2224
  /**
   * Replay a batch of WAL-recovered mutations through the batch-mutate path.
   *
   * @param mutations mutations recovered by WAL splitting
   * @return one OperationStatus per mutation, in the same order
   * @throws IOException on failure
   */
  public OperationStatus[] batchReplay(HLogSplitter.MutationReplay[] mutations)
      throws IOException {
    return batchMutate(new ReplayBatch(mutations));
  }
2229
2230
2231
2232
2233
2234
2235
2236
2237
  /**
   * Drive a batch operation to completion by repeatedly running mini-batches
   * until every operation has been processed, requesting a flush whenever a
   * mini-batch pushes the memstore over the flush size.
   *
   * @param batchOp the batch state; its retCodeDetails are filled in
   * @return the per-operation status array from {@code batchOp}
   * @throws IOException on failure
   */
  OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
    boolean initialized = false;
    while (!batchOp.isDone()) {
      // Replay must proceed even on a read-only region; live writes must not.
      if (!batchOp.isInReplay()) {
        checkReadOnly();
      }
      checkResources();

      long newSize;
      Operation op = Operation.BATCH_MUTATE;
      if (batchOp.isInReplay()) op = Operation.REPLAY_BATCH_MUTATE;
      startRegionOperation(op);

      try {
        // One-time setup for the whole batch: count the writes and run the
        // coprocessor pre-hooks (live batches only).
        if (!initialized) {
          this.writeRequestsCount.add(batchOp.operations.length);
          if (!batchOp.isInReplay()) {
            doPreMutationHook(batchOp);
          }
          initialized = true;
        }
        long addedSize = doMiniBatchMutation(batchOp);
        newSize = this.addAndGetGlobalMemstoreSize(addedSize);
      } finally {
        closeRegionOperation(op);
      }
      // Outside the region operation: ask for a flush if we grew past the limit.
      if (isFlushSize(newSize)) {
        requestFlush();
      }
    }
    return batchOp.retCodeDetails;
  }
2270
2271
  /**
   * Run coprocessor prePut/preDelete hooks for every operation in the batch,
   * outside of any locks. A hook returning true means the coprocessor fully
   * handled the operation, so its status is set to SUCCESS and the mini-batch
   * loop will skip it. WAL edits the hooks produce are stashed per-operation
   * in {@code walEditsFromCoprocessors}.
   *
   * @param batchOp batch whose retCodeDetails/walEditsFromCoprocessors are updated
   * @throws IOException propagated from the coprocessor hooks
   */
  private void doPreMutationHook(BatchOperationInProgress<?> batchOp)
      throws IOException {
    WALEdit walEdit = new WALEdit();
    if (coprocessorHost != null) {
      for (int i = 0 ; i < batchOp.operations.length; i++) {
        Mutation m = batchOp.getMutation(i);
        if (m instanceof Put) {
          if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
            // Pre hook handled the Put entirely; mark done so it is skipped.
            batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
          }
        } else if (m instanceof Delete) {
          Delete curDel = (Delete) m;
          if (curDel.getFamilyCellMap().isEmpty()) {
            // Whole-row delete: expand to all families before the hook sees it.
            prepareDelete(curDel);
          }
          if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {
            // Pre hook handled the Delete entirely; mark done so it is skipped.
            batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
          }
        } else {
          // Only Put and Delete are supported here; fail the single operation
          // rather than throwing for the whole batch.
          batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
              "Put/Delete mutations only supported in batchMutate() now");
        }
        if (!walEdit.isEmpty()) {
          // Stash this operation's coprocessor edits and start a fresh edit.
          batchOp.walEditsFromCoprocessors[i] = walEdit;
          walEdit = new WALEdit();
        }
      }
    }
  }
2310
2311 @SuppressWarnings("unchecked")
2312 private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp) throws IOException {
2313 boolean isInReplay = batchOp.isInReplay();
2314
2315 boolean putsCfSetConsistent = true;
2316
2317 Set<byte[]> putsCfSet = null;
2318
2319 boolean deletesCfSetConsistent = true;
2320
2321 Set<byte[]> deletesCfSet = null;
2322
2323 long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
2324 WALEdit walEdit = new WALEdit(isInReplay);
2325 MultiVersionConsistencyControl.WriteEntry w = null;
2326 long txid = 0;
2327 boolean doRollBackMemstore = false;
2328 boolean locked = false;
2329
2330
2331 List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
2332
2333 Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
2334
2335 int firstIndex = batchOp.nextIndexToProcess;
2336 int lastIndexExclusive = firstIndex;
2337 boolean success = false;
2338 int noOfPuts = 0, noOfDeletes = 0;
2339 try {
2340
2341
2342
2343
2344 int numReadyToWrite = 0;
2345 long now = EnvironmentEdgeManager.currentTimeMillis();
2346 while (lastIndexExclusive < batchOp.operations.length) {
2347 Mutation mutation = batchOp.getMutation(lastIndexExclusive);
2348 boolean isPutMutation = mutation instanceof Put;
2349
2350 Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
2351
2352 familyMaps[lastIndexExclusive] = familyMap;
2353
2354
2355 if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
2356 != OperationStatusCode.NOT_RUN) {
2357 lastIndexExclusive++;
2358 continue;
2359 }
2360
2361 try {
2362 if (isPutMutation) {
2363
2364 if (isInReplay) {
2365 removeNonExistentColumnFamilyForReplay(familyMap);
2366 } else {
2367 checkFamilies(familyMap.keySet());
2368 }
2369 checkTimestamps(mutation.getFamilyCellMap(), now);
2370 } else {
2371 prepareDelete((Delete) mutation);
2372 }
2373 } catch (NoSuchColumnFamilyException nscf) {
2374 LOG.warn("No such column family in batch mutation", nscf);
2375 batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2376 OperationStatusCode.BAD_FAMILY, nscf.getMessage());
2377 lastIndexExclusive++;
2378 continue;
2379 } catch (FailedSanityCheckException fsce) {
2380 LOG.warn("Batch Mutation did not pass sanity check", fsce);
2381 batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2382 OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
2383 lastIndexExclusive++;
2384 continue;
2385 }
2386
2387
2388
2389 boolean shouldBlock = numReadyToWrite == 0;
2390 RowLock rowLock = null;
2391 try {
2392 rowLock = getRowLock(mutation.getRow(), shouldBlock);
2393 } catch (IOException ioe) {
2394 LOG.warn("Failed getting lock in batch put, row="
2395 + Bytes.toStringBinary(mutation.getRow()), ioe);
2396 }
2397 if (rowLock == null) {
2398
2399 assert !shouldBlock : "Should never fail to get lock when blocking";
2400 break;
2401 } else {
2402 acquiredRowLocks.add(rowLock);
2403 }
2404
2405 lastIndexExclusive++;
2406 numReadyToWrite++;
2407
2408 if (isPutMutation) {
2409
2410
2411
2412 if (putsCfSet == null) {
2413 putsCfSet = mutation.getFamilyCellMap().keySet();
2414 } else {
2415 putsCfSetConsistent = putsCfSetConsistent
2416 && mutation.getFamilyCellMap().keySet().equals(putsCfSet);
2417 }
2418 } else {
2419 if (deletesCfSet == null) {
2420 deletesCfSet = mutation.getFamilyCellMap().keySet();
2421 } else {
2422 deletesCfSetConsistent = deletesCfSetConsistent
2423 && mutation.getFamilyCellMap().keySet().equals(deletesCfSet);
2424 }
2425 }
2426 }
2427
2428
2429
2430 now = EnvironmentEdgeManager.currentTimeMillis();
2431 byte[] byteNow = Bytes.toBytes(now);
2432
2433
2434 if (numReadyToWrite <= 0) return 0L;
2435
2436
2437
2438
2439
2440
2441 for (int i = firstIndex; i < lastIndexExclusive; i++) {
2442
2443 if (batchOp.retCodeDetails[i].getOperationStatusCode()
2444 != OperationStatusCode.NOT_RUN) continue;
2445
2446 Mutation mutation = batchOp.getMutation(i);
2447 if (mutation instanceof Put) {
2448 updateKVTimestamps(familyMaps[i].values(), byteNow);
2449 noOfPuts++;
2450 } else {
2451 if (!isInReplay) {
2452 prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
2453 }
2454 noOfDeletes++;
2455 }
2456 }
2457
2458 lock(this.updatesLock.readLock(), numReadyToWrite);
2459 locked = true;
2460
2461
2462
2463
2464
2465 w = mvcc.beginMemstoreInsert();
2466
2467
2468 if (!isInReplay && coprocessorHost != null) {
2469 MiniBatchOperationInProgress<Mutation> miniBatchOp =
2470 new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2471 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2472 if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
2473 }
2474
2475
2476
2477
2478
2479
2480
2481
2482
2483
2484 long addedSize = 0;
2485 for (int i = firstIndex; i < lastIndexExclusive; i++) {
2486 if (batchOp.retCodeDetails[i].getOperationStatusCode()
2487 != OperationStatusCode.NOT_RUN) {
2488 continue;
2489 }
2490 doRollBackMemstore = true;
2491 addedSize += applyFamilyMapToMemstore(familyMaps[i], w);
2492 }
2493
2494
2495
2496
2497 boolean hasWalAppends = false;
2498 Durability durability = Durability.USE_DEFAULT;
2499 for (int i = firstIndex; i < lastIndexExclusive; i++) {
2500
2501 if (batchOp.retCodeDetails[i].getOperationStatusCode()
2502 != OperationStatusCode.NOT_RUN) {
2503 continue;
2504 }
2505 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2506
2507 Mutation m = batchOp.getMutation(i);
2508 Durability tmpDur = getEffectiveDurability(m.getDurability());
2509 if (tmpDur.ordinal() > durability.ordinal()) {
2510 durability = tmpDur;
2511 }
2512 if (tmpDur == Durability.SKIP_WAL) {
2513 recordMutationWithoutWal(m.getFamilyCellMap());
2514 continue;
2515 }
2516
2517 long nonceGroup = batchOp.getNonceGroup(i), nonce = batchOp.getNonce(i);
2518
2519
2520
2521 if (nonceGroup != currentNonceGroup || nonce != currentNonce) {
2522 if (walEdit.size() > 0) {
2523 assert isInReplay;
2524 if (!isInReplay) {
2525 throw new IOException("Multiple nonces per batch and not in replay");
2526 }
2527
2528 txid = this.log.appendNoSync(this.getRegionInfo(), htableDescriptor.getTableName(),
2529 walEdit, m.getClusterIds(), now, htableDescriptor, this.sequenceId, true,
2530 currentNonceGroup, currentNonce);
2531 hasWalAppends = true;
2532 walEdit = new WALEdit(isInReplay);
2533 }
2534 currentNonceGroup = nonceGroup;
2535 currentNonce = nonce;
2536 }
2537
2538
2539 WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
2540 if (fromCP != null) {
2541 for (KeyValue kv : fromCP.getKeyValues()) {
2542 walEdit.add(kv);
2543 }
2544 }
2545 addFamilyMapToWALEdit(familyMaps[i], walEdit);
2546 }
2547
2548
2549
2550
2551 Mutation mutation = batchOp.getMutation(firstIndex);
2552 if (walEdit.size() > 0) {
2553 txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
2554 walEdit, mutation.getClusterIds(), now, this.htableDescriptor, this.sequenceId,
2555 true, currentNonceGroup, currentNonce);
2556 hasWalAppends = true;
2557 }
2558
2559
2560
2561
2562 if (locked) {
2563 this.updatesLock.readLock().unlock();
2564 locked = false;
2565 }
2566 releaseRowLocks(acquiredRowLocks);
2567
2568
2569
2570
2571 if (hasWalAppends) {
2572 syncOrDefer(txid, durability);
2573 }
2574 doRollBackMemstore = false;
2575
2576 if (!isInReplay && coprocessorHost != null) {
2577 MiniBatchOperationInProgress<Mutation> miniBatchOp =
2578 new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2579 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
2580 coprocessorHost.postBatchMutate(miniBatchOp);
2581 }
2582
2583
2584
2585
2586 if (w != null) {
2587 mvcc.completeMemstoreInsert(w);
2588 w = null;
2589 }
2590
2591
2592
2593
2594
2595 if (!isInReplay && coprocessorHost != null) {
2596 for (int i = firstIndex; i < lastIndexExclusive; i++) {
2597
2598 if (batchOp.retCodeDetails[i].getOperationStatusCode()
2599 != OperationStatusCode.SUCCESS) {
2600 continue;
2601 }
2602 Mutation m = batchOp.getMutation(i);
2603 if (m instanceof Put) {
2604 coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
2605 } else {
2606 coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
2607 }
2608 }
2609 }
2610
2611 success = true;
2612 return addedSize;
2613 } finally {
2614
2615
2616 if (doRollBackMemstore) {
2617 rollbackMemstore(batchOp, familyMaps, firstIndex, lastIndexExclusive);
2618 }
2619 if (w != null) mvcc.completeMemstoreInsert(w);
2620
2621 if (locked) {
2622 this.updatesLock.readLock().unlock();
2623 }
2624 releaseRowLocks(acquiredRowLocks);
2625
2626
2627
2628
2629
2630
2631
2632 if (noOfPuts > 0) {
2633
2634 if (this.metricsRegion != null) {
2635 this.metricsRegion.updatePut();
2636 }
2637 }
2638 if (noOfDeletes > 0) {
2639
2640 if (this.metricsRegion != null) {
2641 this.metricsRegion.updateDelete();
2642 }
2643 }
2644 if (!success) {
2645 for (int i = firstIndex; i < lastIndexExclusive; i++) {
2646 if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
2647 batchOp.retCodeDetails[i] = OperationStatus.FAILURE;
2648 }
2649 }
2650 }
2651 if (coprocessorHost != null && !batchOp.isInReplay()) {
2652
2653
2654 MiniBatchOperationInProgress<Mutation> miniBatchOp =
2655 new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
2656 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex,
2657 lastIndexExclusive);
2658 coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success);
2659 }
2660
2661 batchOp.nextIndexToProcess = lastIndexExclusive;
2662 }
2663 }
2664
2665
2666
2667
2668
2669 protected Durability getEffectiveDurability(Durability d) {
2670 return d == Durability.USE_DEFAULT ? this.durability : d;
2671 }
2672
2673
2674
2675
2676
2677
2678
2679
2680
2681
2682
2683
2684
2685
2686
2687
2688
  /**
   * Atomically check a single cell's value against the given comparator and,
   * only when the comparison matches, apply the supplied mutation.
   *
   * @param row row to check; must equal the mutation's row
   * @param family column family of the checked cell
   * @param qualifier column qualifier of the checked cell
   * @param compareOp how the stored value is compared to {@code comparator}
   * @param comparator expected value; null/empty value means "cell absent"
   * @param w the mutation to apply on a match; must be a Put or Delete
   * @param writeToWAL not consulted in this method body; durability comes
   *          from the mutation when applied via doBatchMutate — TODO confirm
   * @return true if the condition matched and the mutation was applied
   * @throws IOException including DoNotRetryIOException when {@code w} is
   *           neither Put nor Delete, or its row differs from {@code row}
   */
  public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
      CompareOp compareOp, ByteArrayComparable comparator, Mutation w,
      boolean writeToWAL)
  throws IOException{
    checkReadOnly();
    // Fail fast (rather than block) if the region is over its memstore limit.
    checkResources();
    boolean isPut = w instanceof Put;
    if (!isPut && !(w instanceof Delete))
      throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must " +
          "be Put or Delete");
    if (!Bytes.equals(row, w.getRow())) {
      throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " +
          "getRow must match the passed row");
    }

    startRegionOperation();
    try {
      Get get = new Get(row);
      checkFamily(family);
      get.addColumn(family, qualifier);

      // Lock the row so no concurrent writer can slip in between the read
      // below and the conditional mutation.
      RowLock rowLock = getRowLock(get.getRow());
      // Wait for all prior MVCC transactions to complete so the read sees
      // every write that started before we took the lock.
      mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
      try {
        // Coprocessors may decide the outcome themselves now that the row
        // lock is held; a non-null result short-circuits the check.
        if (this.getCoprocessorHost() != null) {
          Boolean processed = null;
          if (w instanceof Put) {
            processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family,
                qualifier, compareOp, comparator, (Put) w);
          } else if (w instanceof Delete) {
            processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family,
                qualifier, compareOp, comparator, (Delete) w);
          }
          if (processed != null) {
            return processed;
          }
        }
        List<Cell> result = get(get, false);

        boolean valueIsNull = comparator.getValue() == null ||
            comparator.getValue().length == 0;
        boolean matches = false;
        if (result.size() == 0 && valueIsNull) {
          // Cell absent and caller expected absence.
          matches = true;
        } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
            valueIsNull) {
          // Zero-length stored value is treated the same as absent.
          matches = true;
        } else if (result.size() == 1 && !valueIsNull) {
          Cell kv = result.get(0);
          int compareResult = comparator.compareTo(kv.getValueArray(),
              kv.getValueOffset(), kv.getValueLength());
          switch (compareOp) {
          case LESS:
            matches = compareResult < 0;
            break;
          case LESS_OR_EQUAL:
            matches = compareResult <= 0;
            break;
          case EQUAL:
            matches = compareResult == 0;
            break;
          case NOT_EQUAL:
            matches = compareResult != 0;
            break;
          case GREATER_OR_EQUAL:
            matches = compareResult >= 0;
            break;
          case GREATER:
            matches = compareResult > 0;
            break;
          default:
            throw new RuntimeException("Unknown Compare op " + compareOp.name());
          }
        }
        // If the condition holds, run the mutation through the normal batch
        // path (re-entrant row lock protects us) and count the pass/fail.
        if (matches) {
          doBatchMutate((Mutation)w);
          this.checkAndMutateChecksPassed.increment();
          return true;
        }
        this.checkAndMutateChecksFailed.increment();
        return false;
      } finally {
        rowLock.release();
      }
    } finally {
      closeRegionOperation();
    }
  }
2784
2785 private void doBatchMutate(Mutation mutation) throws IOException, DoNotRetryIOException {
2786
2787 OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation },
2788 HConstants.NO_NONCE, HConstants.NO_NONCE);
2789 if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
2790 throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
2791 } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
2792 throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
2793 }
2794 }
2795
2796
2797
2798
2799
2800
2801
2802
2803
2804
2805
2806
2807
2808
  /**
   * Add this region to an in-progress snapshot: write the region info into
   * the snapshot working directory, then create one reference file per
   * store file.
   *
   * @param desc the snapshot being taken
   * @param exnSnare checked between files so an externally signalled failure
   *          aborts the copy early; may be null
   * @throws IOException if a reference file cannot be created
   */
  public void addRegionToSnapshot(SnapshotDescription desc,
      ForeignExceptionSnare exnSnare) throws IOException {
    // Resolve the snapshot's working directory under the HBase root dir.
    Path rootDir = FSUtils.getRootDir(this.rsServices.getConfiguration());
    Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);

    // Persist this region's .regioninfo into the snapshot directory.
    LOG.debug("Storing region-info for snapshot.");
    HRegionFileSystem snapshotRegionFs = HRegionFileSystem.createRegionOnFileSystem(conf,
        this.fs.getFileSystem(), snapshotDir, getRegionInfo());

    // For every store file, create a same-named file in the snapshot store
    // dir that stands in for the real HFile.
    LOG.debug("Creating references for hfiles");

    for (Store store : stores.values()) {
      // Destination directory for this column family inside the snapshot.
      Path dstStoreDir = snapshotRegionFs.getStoreDir(store.getFamily().getNameAsString());
      List<StoreFile> storeFiles = new ArrayList<StoreFile>(store.getStorefiles());
      if (LOG.isDebugEnabled()) {
        LOG.debug("Adding snapshot references for " + storeFiles  + " hfiles");
      }

      // Create a reference per hfile, checking the exception snare before
      // each one so an aborted snapshot stops promptly.
      int sz = storeFiles.size();
      for (int i = 0; i < sz; i++) {
        if (exnSnare != null) {
          exnSnare.rethrowException();
        }
        StoreFile storeFile = storeFiles.get(i);
        Path file = storeFile.getPath();

        LOG.debug("Creating reference for file (" + (i+1) + "/" + sz + ") : " + file);
        Path referenceFile = new Path(dstStoreDir, file.getName());
        boolean success = true;
        if (storeFile.isReference()) {
          // Already a reference (e.g. from a split): copy the reference data.
          storeFile.getFileInfo().getReference().write(fs.getFileSystem(), referenceFile);
        } else {
          // Regular hfile: an empty same-named file is enough to reference it.
          success = fs.getFileSystem().createNewFile(referenceFile);
        }
        if (!success) {
          throw new IOException("Failed to create reference file:" + referenceFile);
        }
      }
    }
  }
2865
2866
2867
2868
2869
2870 void updateKVTimestamps(final Iterable<List<Cell>> keyLists, final byte[] now) {
2871 for (List<Cell> cells: keyLists) {
2872 if (cells == null) continue;
2873 for (Cell cell : cells) {
2874 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
2875 kv.updateLatestStamp(now);
2876 }
2877 }
2878 }
2879
2880
2881
2882
2883
2884
2885
2886 private void checkResources()
2887 throws RegionTooBusyException {
2888
2889 if (this.getRegionInfo().isMetaRegion()) return;
2890
2891 if (this.memstoreSize.get() > this.blockingMemStoreSize) {
2892 requestFlush();
2893 throw new RegionTooBusyException("Above memstore limit, " +
2894 "regionName=" + (this.getRegionInfo() == null ? "unknown" :
2895 this.getRegionInfo().getRegionNameAsString()) +
2896 ", server=" + (this.getRegionServerServices() == null ? "unknown" :
2897 this.getRegionServerServices().getServerName()) +
2898 ", memstoreSize=" + memstoreSize.get() +
2899 ", blockingMemStoreSize=" + blockingMemStoreSize);
2900 }
2901 }
2902
2903
2904
2905
2906 protected void checkReadOnly() throws IOException {
2907 if (this.writestate.isReadOnly()) {
2908 throw new IOException("region is read only");
2909 }
2910 }
2911
2912
2913
2914
2915
2916
2917
2918
2919
2920 private void put(final byte [] row, byte [] family, List<Cell> edits)
2921 throws IOException {
2922 NavigableMap<byte[], List<Cell>> familyMap;
2923 familyMap = new TreeMap<byte[], List<Cell>>(Bytes.BYTES_COMPARATOR);
2924
2925 familyMap.put(family, edits);
2926 Put p = new Put(row);
2927 p.setFamilyCellMap(familyMap);
2928 doBatchMutate(p);
2929 }
2930
2931
2932
2933
2934
2935
2936
2937
2938
2939
2940
2941
2942
2943 private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
2944 MultiVersionConsistencyControl.WriteEntry localizedWriteEntry) {
2945 long size = 0;
2946 boolean freemvcc = false;
2947
2948 try {
2949 if (localizedWriteEntry == null) {
2950 localizedWriteEntry = mvcc.beginMemstoreInsert();
2951 freemvcc = true;
2952 }
2953
2954 for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
2955 byte[] family = e.getKey();
2956 List<Cell> cells = e.getValue();
2957
2958 Store store = getStore(family);
2959 for (Cell cell: cells) {
2960 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
2961 kv.setMvccVersion(localizedWriteEntry.getWriteNumber());
2962 size += store.add(kv);
2963 }
2964 }
2965 } finally {
2966 if (freemvcc) {
2967 mvcc.completeMemstoreInsert(localizedWriteEntry);
2968 }
2969 }
2970
2971 return size;
2972 }
2973
2974
2975
2976
2977
2978
2979 private void rollbackMemstore(BatchOperationInProgress<?> batchOp,
2980 Map<byte[], List<Cell>>[] familyMaps,
2981 int start, int end) {
2982 int kvsRolledback = 0;
2983 for (int i = start; i < end; i++) {
2984
2985 if (batchOp.retCodeDetails[i].getOperationStatusCode()
2986 != OperationStatusCode.SUCCESS) {
2987 continue;
2988 }
2989
2990
2991 Map<byte[], List<Cell>> familyMap = familyMaps[i];
2992 for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
2993 byte[] family = e.getKey();
2994 List<Cell> cells = e.getValue();
2995
2996
2997
2998
2999 Store store = getStore(family);
3000 for (Cell cell: cells) {
3001 store.rollback(KeyValueUtil.ensureKeyValue(cell));
3002 kvsRolledback++;
3003 }
3004 }
3005 }
3006 LOG.debug("rollbackMemstore rolled back " + kvsRolledback +
3007 " keyvalues from start:" + start + " to end:" + end);
3008 }
3009
3010
3011
3012
3013
3014 void checkFamilies(Collection<byte[]> families)
3015 throws NoSuchColumnFamilyException {
3016 for (byte[] family : families) {
3017 checkFamily(family);
3018 }
3019 }
3020
3021
3022
3023
3024
3025 private void removeNonExistentColumnFamilyForReplay(
3026 final Map<byte[], List<Cell>> familyMap) {
3027 List<byte[]> nonExistentList = null;
3028 for (byte[] family : familyMap.keySet()) {
3029 if (!this.htableDescriptor.hasFamily(family)) {
3030 if (nonExistentList == null) {
3031 nonExistentList = new ArrayList<byte[]>();
3032 }
3033 nonExistentList.add(family);
3034 }
3035 }
3036 if (nonExistentList != null) {
3037 for (byte[] family : nonExistentList) {
3038
3039 LOG.info("No family for " + Bytes.toString(family) + " omit from reply.");
3040 familyMap.remove(family);
3041 }
3042 }
3043 }
3044
3045 void checkTimestamps(final Map<byte[], List<Cell>> familyMap,
3046 long now) throws FailedSanityCheckException {
3047 if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
3048 return;
3049 }
3050 long maxTs = now + timestampSlop;
3051 for (List<Cell> kvs : familyMap.values()) {
3052 for (Cell cell : kvs) {
3053
3054 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
3055 if (!kv.isLatestTimestamp() && kv.getTimestamp() > maxTs) {
3056 throw new FailedSanityCheckException("Timestamp for KV out of range "
3057 + cell + " (too.new=" + timestampSlop + ")");
3058 }
3059 }
3060 }
3061 }
3062
3063
3064
3065
3066
3067
3068
3069 private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
3070 WALEdit walEdit) {
3071 for (List<Cell> edits : familyMap.values()) {
3072 for (Cell cell : edits) {
3073 walEdit.add(KeyValueUtil.ensureKeyValue(cell));
3074 }
3075 }
3076 }
3077
3078 private void requestFlush() {
3079 if (this.rsServices == null) {
3080 return;
3081 }
3082 synchronized (writestate) {
3083 if (this.writestate.isFlushRequested()) {
3084 return;
3085 }
3086 writestate.flushRequested = true;
3087 }
3088
3089 this.rsServices.getFlushRequester().requestFlush(this);
3090 if (LOG.isDebugEnabled()) {
3091 LOG.debug("Flush requested on " + this);
3092 }
3093 }
3094
3095
3096
3097
3098
3099 private boolean isFlushSize(final long size) {
3100 return size > this.memstoreFlushSize;
3101 }
3102
3103
3104
3105
3106
3107
3108
3109
3110
3111
3112
3113
3114
3115
3116
3117
3118
3119
3120
3121
3122
3123
3124
3125
3126
3127
3128
3129
3130
3131
3132
3133
3134
3135
3136
3137
3138
  /**
   * Replay any recovered.edits files found under the region directory,
   * applying only edits newer than what the stores already contain, then
   * flush and delete the replayed files.
   *
   * @param regiondir region directory to look for recovered.edits under
   * @param maxSeqIdInStores maximum sequence id already persisted, per store
   * @param reporter progress callback passed down to the per-file replay;
   *          may be null
   * @param status monitored task used when flushing after replay
   * @return the highest sequence id applied, or the minimum store seqid when
   *         nothing needed replaying
   */
  protected long replayRecoveredEditsIfAny(final Path regiondir,
      Map<byte[], Long> maxSeqIdInStores,
      final CancelableProgressable reporter, final MonitoredTask status)
      throws UnsupportedEncodingException, IOException {
    // Smallest per-store max seqid: every store already has edits <= this.
    long minSeqIdForTheRegion = -1;
    for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
      if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
        minSeqIdForTheRegion = maxSeqIdInStore;
      }
    }
    long seqid = minSeqIdForTheRegion;

    FileSystem fs = this.fs.getFileSystem();
    NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regiondir);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Found " + (files == null ? 0 : files.size())
        + " recovered edits file(s) under " + regiondir);
    }

    if (files == null || files.isEmpty()) return seqid;

    for (Path edits: files) {
      if (edits == null || !fs.exists(edits)) {
        LOG.warn("Null or non-existent edits file: " + edits);
        continue;
      }
      // Empty files are deleted and skipped.
      if (isZeroLengthThenDelete(fs, edits)) continue;

      long maxSeqId;
      // The file name encodes a sequence id; Math.abs guards against a
      // leading '-' in the parsed name — TODO confirm the naming scheme
      // against HLogUtil.getSplitEditFilesSorted.
      String fileName = edits.getName();
      maxSeqId = Math.abs(Long.parseLong(fileName));
      if (maxSeqId <= minSeqIdForTheRegion) {
        // Every edit in this file is already persisted in all stores.
        if (LOG.isDebugEnabled()) {
          String msg = "Maximum sequenceid for this log is " + maxSeqId
            + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
            + ", skipped the whole file, path=" + edits;
          LOG.debug(msg);
        }
        continue;
      }

      try {
        // Track the highest sequence id actually applied.
        seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
      } catch (IOException e) {
        boolean skipErrors = conf.getBoolean(
            HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
            conf.getBoolean(
                "hbase.skip.errors",
                HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS));
        if (conf.get("hbase.skip.errors") != null) {
          LOG.warn(
              "The property 'hbase.skip.errors' has been deprecated. Please use " +
              HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
        }
        if (skipErrors) {
          // Move the bad file aside and keep going with the rest.
          Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
          LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
              + "=true so continuing. Renamed " + edits +
              " as " + p, e);
        } else {
          throw e;
        }
      }
    }
    // Per-region replay accounting is complete: drop the tracked size.
    if (this.rsAccounting != null) {
      this.rsAccounting.clearRegionReplayEditsSize(this.getRegionName());
    }
    if (seqid > minSeqIdForTheRegion) {
      // New edits were applied: flush so they are durable before the edit
      // files are deleted below.
      internalFlushcache(null, seqid, status);
    }
    // Replayed (or skipped) everything; the edit files are no longer needed.
    for (Path file: files) {
      if (!fs.delete(file, false)) {
        LOG.error("Failed delete of " + file);
      } else {
        LOG.debug("Deleted recovered.edits file=" + file);
      }
    }
    return seqid;
  }
3223
3224
3225
3226
3227
3228
3229
3230
3231
3232
  /**
   * Replay the edits in a single recovered.edits file into this region's
   * stores.
   *
   * @param edits the recovered.edits file to read
   * @param maxSeqIdInStores per-store max flushed sequence id; edits at or
   *          below a store's value are skipped for that store
   * @param reporter progress callback; a false return aborts the replay
   * @return the last (maximum) sequence id read from the file
   * @throws IOException on unrecoverable read errors; truncated or corrupt
   *           files are moved aside and replay continues instead
   */
  private long replayRecoveredEdits(final Path edits,
      Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
    throws IOException {
    String msg = "Replaying edits from " + edits;
    LOG.info(msg);
    MonitoredTask status = TaskMonitor.get().createStatus(msg);
    FileSystem fs = this.fs.getFileSystem();

    status.setStatus("Opening logs");
    HLog.Reader reader = null;
    try {
      reader = HLogFactory.createReader(fs, edits, conf);
      long currentEditSeqId = -1;
      long firstSeqIdInLog = -1;
      long skippedEdits = 0;
      long editsCount = 0;
      long intervalEdits = 0;
      HLog.Entry entry;
      Store store = null;
      boolean reported_once = false;
      ServerNonceManager ng = this.rsServices == null ? null : this.rsServices.getNonceManager();

      try {
        // How many edits to apply between progress reports.
        int interval = this.conf.getInt("hbase.hstore.report.interval.edits",
            2000);
        // Minimum ms between reports; defaults to half the assignment timeout.
        int period = this.conf.getInt("hbase.hstore.report.period",
          this.conf.getInt(AssignmentManager.ASSIGNMENT_TIMEOUT,
            AssignmentManager.DEFAULT_ASSIGNMENT_TIMEOUT_DEFAULT) / 2);
        long lastReport = EnvironmentEdgeManager.currentTimeMillis();

        while ((entry = reader.next()) != null) {
          HLogKey key = entry.getKey();
          WALEdit val = entry.getEdit();

          if (ng != null) {
            // Record the nonce seen in the WAL with the nonce manager.
            ng.reportOperationFromWal(key.getNonceGroup(), key.getNonce(), key.getWriteTime());
          }

          if (reporter != null) {
            intervalEdits += val.size();
            if (intervalEdits >= interval) {
              // Enough edits accumulated; report if the period elapsed too.
              intervalEdits = 0;
              long cur = EnvironmentEdgeManager.currentTimeMillis();
              if (lastReport + period <= cur) {
                status.setStatus("Replaying edits..." +
                    " skipped=" + skippedEdits +
                    " edits=" + editsCount);
                // A false return means the caller wants replay stopped.
                if(!reporter.progress()) {
                  msg = "Progressable reporter failed, stopping replay";
                  LOG.warn(msg);
                  status.abort(msg);
                  throw new IOException(msg);
                }
                reported_once = true;
                lastReport = cur;
              }
            }
          }

          // Coprocessors may veto replay of an individual WAL entry.
          if (coprocessorHost != null) {
            status.setStatus("Running pre-WAL-restore hook in coprocessors");
            if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
              // Entry was bypassed by a coprocessor.
              continue;
            }
          }

          if (firstSeqIdInLog == -1) {
            firstSeqIdInLog = key.getLogSeqNum();
          }
          currentEditSeqId = key.getLogSeqNum();
          boolean flush = false;
          for (KeyValue kv: val.getKeyValues()) {
            // Skip METAFAMILY marker edits and edits that belong to another
            // region; compaction markers are acted on before skipping.
            if (kv.matchingFamily(WALEdit.METAFAMILY) ||
                !Bytes.equals(key.getEncodedRegionName(),
                  this.getRegionInfo().getEncodedNameAsBytes())) {
              CompactionDescriptor compaction = WALEdit.getCompaction(kv);
              if (compaction != null) {
                // Finish a compaction that was in flight at crash time.
                completeCompactionMarker(compaction);
              }

              skippedEdits++;
              continue;
            }
            // Reuse the previous store lookup for runs of same-family kvs.
            if (store == null || !kv.matchingFamily(store.getFamily().getName())) {
              store = this.stores.get(kv.getFamily());
            }
            if (store == null) {
              // The family was dropped after these edits were written.
              LOG.warn("No family for " + kv);
              skippedEdits++;
              continue;
            }
            // Edits already persisted in this store's files need no replay.
            if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily()
                .getName())) {
              skippedEdits++;
              continue;
            }
            // Apply the edit; restoreEdit reports if memstore is flush-worthy.
            flush = restoreEdit(store, kv);
            editsCount++;
          }
          if (flush) internalFlushcache(null, currentEditSeqId, status);

          if (coprocessorHost != null) {
            coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
          }
        }
      } catch (EOFException eof) {
        // Truncated file (e.g. master died mid-split): keep what was applied
        // and move the file aside.
        Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
        msg = "Encountered EOF. Most likely due to Master failure during " +
            "log spliting, so we have this data in another edit.  " +
            "Continuing, but renaming " + edits + " as " + p;
        LOG.warn(msg, eof);
        status.abort(msg);
      } catch (IOException ioe) {
        // A ParseException cause signals file corruption: move aside and
        // continue; anything else is rethrown.
        if (ioe.getCause() instanceof ParseException) {
          Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
          msg = "File corruption encountered!  " +
              "Continuing, but renaming " + edits + " as " + p;
          LOG.warn(msg, ioe);
          status.setStatus(msg);
        } else {
          status.abort(StringUtils.stringifyException(ioe));
          throw ioe;
        }
      }
      if (reporter != null && !reported_once) {
        // Make at least one progress call so the caller sees liveness.
        reporter.progress();
      }
      msg = "Applied " + editsCount + ", skipped " + skippedEdits +
        ", firstSequenceidInLog=" + firstSeqIdInLog +
        ", maxSequenceidInLog=" + currentEditSeqId + ", path=" + edits;
      status.markComplete(msg);
      LOG.debug(msg);
      return currentEditSeqId;
    } finally {
      status.cleanup();
      if (reader != null) {
         reader.close();
      }
    }
  }
3395
3396
3397
3398
3399
3400
3401
3402 void completeCompactionMarker(CompactionDescriptor compaction)
3403 throws IOException {
3404 Store store = this.getStore(compaction.getFamilyName().toByteArray());
3405 if (store == null) {
3406 LOG.warn("Found Compaction WAL edit for deleted family:" +
3407 Bytes.toString(compaction.getFamilyName().toByteArray()));
3408 return;
3409 }
3410 store.completeCompactionMarker(compaction);
3411 }
3412
3413
3414
3415
3416
3417
3418
3419 protected boolean restoreEdit(final Store s, final KeyValue kv) {
3420 long kvSize = s.add(kv);
3421 if (this.rsAccounting != null) {
3422 rsAccounting.addAndGetRegionReplayEditsSize(this.getRegionName(), kvSize);
3423 }
3424 return isFlushSize(this.addAndGetGlobalMemstoreSize(kvSize));
3425 }
3426
3427
3428
3429
3430
3431
3432
3433 private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
3434 throws IOException {
3435 FileStatus stat = fs.getFileStatus(p);
3436 if (stat.getLen() > 0) return false;
3437 LOG.warn("File " + p + " is zero-length, deleting.");
3438 fs.delete(p, false);
3439 return true;
3440 }
3441
  /**
   * Factory hook creating the Store for a column family; subclasses may
   * override to supply a custom HStore implementation.
   */
  protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
    return new HStore(this, family, this.conf);
  }
3445
3446
3447
3448
3449
3450
3451
3452
  /**
   * Return the Store for the given column family name, or null when the
   * family is not present in this region.
   */
  public Store getStore(final byte[] column) {
    return this.stores.get(column);
  }
3456
  /**
   * Return the internal family-name to Store map. Note this is the live
   * map, not a copy.
   */
  public Map<byte[], Store> getStores() {
    return this.stores;
  }
3460
3461
3462
3463
3464
3465
3466
3467
3468 public List<String> getStoreFileList(final byte [][] columns)
3469 throws IllegalArgumentException {
3470 List<String> storeFileNames = new ArrayList<String>();
3471 synchronized(closeLock) {
3472 for(byte[] column : columns) {
3473 Store store = this.stores.get(column);
3474 if (store == null) {
3475 throw new IllegalArgumentException("No column family : " +
3476 new String(column) + " available");
3477 }
3478 for (StoreFile storeFile: store.getStorefiles()) {
3479 storeFileNames.add(storeFile.getPath().toString());
3480 }
3481 }
3482 }
3483 return storeFileNames;
3484 }
3485
3486
3487
3488
3489
3490 void checkRow(final byte [] row, String op) throws IOException {
3491 if (!rowIsInRange(getRegionInfo(), row)) {
3492 throw new WrongRegionException("Requested row out of range for " +
3493 op + " on HRegion " + this + ", startKey='" +
3494 Bytes.toStringBinary(getStartKey()) + "', getEndKey()='" +
3495 Bytes.toStringBinary(getEndKey()) + "', row='" +
3496 Bytes.toStringBinary(row) + "'");
3497 }
3498 }
3499
3500
3501
3502
3503
3504
3505
3506
3507
3508
3509 public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
3510 checkRow(row, "row lock");
3511 startRegionOperation();
3512 try {
3513 HashedBytes rowKey = new HashedBytes(row);
3514 RowLockContext rowLockContext = new RowLockContext(rowKey);
3515
3516
3517 while (true) {
3518 RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
3519 if (existingContext == null) {
3520
3521 break;
3522 } else if (existingContext.ownedByCurrentThread()) {
3523
3524 rowLockContext = existingContext;
3525 break;
3526 } else {
3527
3528 if (!waitForLock) {
3529 return null;
3530 }
3531 try {
3532 if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
3533 throw new IOException("Timed out waiting for lock for row: " + rowKey);
3534 }
3535 } catch (InterruptedException ie) {
3536 LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
3537 InterruptedIOException iie = new InterruptedIOException();
3538 iie.initCause(ie);
3539 throw iie;
3540 }
3541 }
3542 }
3543
3544
3545 return rowLockContext.newLock();
3546 } finally {
3547 closeRegionOperation();
3548 }
3549 }
3550
3551
3552
3553
3554
3555
3556
  /**
   * Acquire the lock for the given row, waiting up to the configured row
   * lock timeout.
   * @see #getRowLock(byte[], boolean)
   */
  public RowLock getRowLock(byte[] row) throws IOException {
    return getRowLock(row, true);
  }
3560
3561
3562
3563
3564 public void releaseRowLocks(List<RowLock> rowLocks) {
3565 if (rowLocks != null) {
3566 for (RowLock rowLock : rowLocks) {
3567 rowLock.release();
3568 }
3569 rowLocks.clear();
3570 }
3571 }
3572
3573
3574
3575
3576
3577
3578
3579 private static boolean hasMultipleColumnFamilies(
3580 List<Pair<byte[], String>> familyPaths) {
3581 boolean multipleFamilies = false;
3582 byte[] family = null;
3583 for (Pair<byte[], String> pair : familyPaths) {
3584 byte[] fam = pair.getFirst();
3585 if (family == null) {
3586 family = fam;
3587 } else if (!Bytes.equals(family, fam)) {
3588 multipleFamilies = true;
3589 break;
3590 }
3591 }
3592 return multipleFamilies;
3593 }
3594
3595
  /**
   * Bulk load the given family/HFile-path pairs with no load listener.
   * @see #bulkLoadHFiles(List, boolean, BulkLoadListener)
   */
  public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
      boolean assignSeqId) throws IOException {
    return bulkLoadHFiles(familyPaths, assignSeqId, null);
  }
3600
3601
3602
3603
3604
3605
3606
3607
3608
3609
3610
3611
  /**
   * Bulk load the given (family, HFile path) pairs into this region.
   * Validates every file first, optionally flushes to assign a sequence id,
   * then loads file by file through the stores.
   *
   * @param familyPaths pairs of column family and HFile path to load
   * @param assignSeqId when true, flush first so loaded files get a
   *          sequence id newer than existing data
   * @param bulkLoadListener callbacks fired around each file's load; may be
   *          null
   * @return true if all files loaded; false on a recoverable failure
   *         (e.g. the region split and files no longer fit its range)
   * @throws IOException on validation or load errors
   */
  public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, boolean assignSeqId,
      BulkLoadListener bulkLoadListener) throws IOException {
    Preconditions.checkNotNull(familyPaths);
    // Takes the bulk-load variant of the region operation lock.
    startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
    try {
      this.writeRequestsCount.increment();

      // Phase 1: validate every file up front so we fail before loading
      // anything. IOExceptions are fatal; WrongRegionExceptions (file no
      // longer fits this region, e.g. after a split) are recoverable.
      List<IOException> ioes = new ArrayList<IOException>();
      List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();
      for (Pair<byte[], String> p : familyPaths) {
        byte[] familyName = p.getFirst();
        String path = p.getSecond();

        Store store = getStore(familyName);
        if (store == null) {
          IOException ioe = new org.apache.hadoop.hbase.DoNotRetryIOException(
              "No such column family " + Bytes.toStringBinary(familyName));
          ioes.add(ioe);
        } else {
          try {
            store.assertBulkLoadHFileOk(new Path(path));
          } catch (WrongRegionException wre) {
            // Recoverable: caller can re-split the file and retry.
            failures.add(p);
          } catch (IOException ioe) {
            // Unrecoverable: collected and thrown together below.
            ioes.add(ioe);
          }
        }
      }

      // Any hard validation error aborts the whole bulk load.
      if (ioes.size() != 0) {
        IOException e = MultipleIOException.createIOException(ioes);
        LOG.error("There were one or more IO errors when checking if the bulk load is ok.", e);
        throw e;
      }

      // Files that no longer fit this region: report and let the caller retry.
      if (failures.size() != 0) {
        StringBuilder list = new StringBuilder();
        for (Pair<byte[], String> p : failures) {
          list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")
            .append(p.getSecond());
        }
        // problem when validating
        LOG.warn("There was a recoverable bulk load failure likely due to a" +
            " split.  These (family, HFile) pairs were not loaded: " + list);
        return false;
      }

      long seqId = -1;
      // Phase 2: when requested, flush so the bulk-loaded files carry a
      // sequence id greater than all current data; an empty memstore still
      // yields a fresh id.
      if (assignSeqId) {
        FlushResult fs = this.flushcache();
        if (fs.isFlushSucceeded()) {
          seqId = fs.flushSequenceId;
        } else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
          seqId = this.sequenceId.incrementAndGet();
        } else {
          throw new IOException("Could not bulk load with an assigned sequential ID because the " +
              "flush didn't run. Reason for not flushing: " + fs.failureReason);
        }
      }

      // Phase 3: load file by file. A failure here leaves earlier files
      // loaded (partial load) and propagates the exception.
      for (Pair<byte[], String> p : familyPaths) {
        byte[] familyName = p.getFirst();
        String path = p.getSecond();
        Store store = getStore(familyName);
        try {
          String finalPath = path;
          if(bulkLoadListener != null) {
            // Listener may stage/move the file and return the path to load.
            finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
          }
          store.bulkLoadHFile(finalPath, seqId);
          if(bulkLoadListener != null) {
            bulkLoadListener.doneBulkLoad(familyName, path);
          }
        } catch (IOException ioe) {
          // A failure here can leave a partial load: log loudly, notify the
          // listener best-effort, then rethrow.
          LOG.error("There was a partial failure due to IO when attempting to" +
              " load " + Bytes.toString(p.getFirst()) + " : "+ p.getSecond(), ioe);
          if(bulkLoadListener != null) {
            try {
              bulkLoadListener.failedBulkLoad(familyName, path);
            } catch (Exception ex) {
              LOG.error("Error while calling failedBulkLoad for family "+
                  Bytes.toString(familyName)+" with path "+path, ex);
            }
          }
          throw ioe;
        }
      }
      return true;
    } finally {
      closeBulkRegionOperation();
    }
  }
3719
3720 @Override
3721 public boolean equals(Object o) {
3722 return o instanceof HRegion && Bytes.equals(this.getRegionName(),
3723 ((HRegion) o).getRegionName());
3724 }
3725
  /** Hash of the region name; consistent with {@link #equals(Object)}. */
  @Override
  public int hashCode() {
    return Bytes.hashCode(this.getRegionName());
  }
3730
  /** @return the region name as a string, e.g. for logging. */
  @Override
  public String toString() {
    return this.getRegionNameAsString();
  }
3735
3736
3737
3738
3739 class RegionScannerImpl implements RegionScanner {
3740
3741 KeyValueHeap storeHeap = null;
3742
3743
3744 KeyValueHeap joinedHeap = null;
3745
3746
3747
3748 protected KeyValue joinedContinuationRow = null;
3749
3750 private final KeyValue KV_LIMIT = new KeyValue();
3751 protected final byte[] stopRow;
3752 private final FilterWrapper filter;
3753 private int batch;
3754 protected int isScan;
3755 private boolean filterClosed = false;
3756 private long readPt;
3757 private long maxResultSize;
3758 protected HRegion region;
3759
    /** @return info of the region this scanner reads from. */
    @Override
    public HRegionInfo getRegionInfo() {
      return region.getRegionInfo();
    }
3764
    /**
     * Build a scanner over the region for the given Scan, splitting store
     * scanners into a main heap and a lazily-loaded "joined" heap.
     *
     * @param scan the scan specification
     * @param additionalScanners extra scanners merged into the main heap;
     *          may be null
     * @param region the region being scanned
     */
    RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
        throws IOException {

      this.region = region;
      this.maxResultSize = scan.getMaxResultSize();
      if (scan.hasFilter()) {
        this.filter = new FilterWrapper(scan.getFilter());
      } else {
        this.filter = null;
      }

      this.batch = scan.getBatch();
      // An empty stop row on a non-Get scan means "scan to end of table".
      if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {
        this.stopRow = null;
      } else {
        this.stopRow = scan.getStopRow();
      }

      // -1 flags a Get-style scan; consumers of isScan are elsewhere in this
      // class — TODO confirm intended stop-row inclusivity semantics.
      this.isScan = scan.isGetScan() ? -1 : 0;

      // Record the read point under the scannerReadPoints lock so the
      // region's smallest outstanding read point stays consistent.
      IsolationLevel isolationLevel = scan.getIsolationLevel();
      synchronized(scannerReadPoints) {
        this.readPt = getReadpoint(isolationLevel);
        scannerReadPoints.put(this, this.readPt);
      }

      // Partition store scanners: families the filter deems non-essential
      // (with on-demand CF loading enabled) go to the joined heap.
      List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
      List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
      if (additionalScanners != null) {
        scanners.addAll(additionalScanners);
      }

      for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
          scan.getFamilyMap().entrySet()) {
        Store store = stores.get(entry.getKey());
        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
        if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
          || this.filter.isFamilyEssential(entry.getKey())) {
          scanners.add(scanner);
        } else {
          joinedScanners.add(scanner);
        }
      }
      initializeKVHeap(scanners, joinedScanners, region);
    }
3815
3816 RegionScannerImpl(Scan scan, HRegion region) throws IOException {
3817 this(scan, null, region);
3818 }
3819
3820 protected void initializeKVHeap(List<KeyValueScanner> scanners,
3821 List<KeyValueScanner> joinedScanners, HRegion region)
3822 throws IOException {
3823 this.storeHeap = new KeyValueHeap(scanners, region.comparator);
3824 if (!joinedScanners.isEmpty()) {
3825 this.joinedHeap = new KeyValueHeap(joinedScanners, region.comparator);
3826 }
3827 }
3828
3829 @Override
3830 public long getMaxResultSize() {
3831 return maxResultSize;
3832 }
3833
3834 @Override
3835 public long getMvccReadPoint() {
3836 return this.readPt;
3837 }
3838
3839
3840
3841
3842
3843
3844 protected void resetFilters() throws IOException {
3845 if (filter != null) {
3846 filter.reset();
3847 }
3848 }
3849
3850 @Override
3851 public boolean next(List<Cell> outResults)
3852 throws IOException {
3853
3854 return next(outResults, batch);
3855 }
3856
3857 @Override
3858 public synchronized boolean next(List<Cell> outResults, int limit) throws IOException {
3859 if (this.filterClosed) {
3860 throw new UnknownScannerException("Scanner was closed (timed out?) " +
3861 "after we renewed it. Could be caused by a very slow scanner " +
3862 "or a lengthy garbage collection");
3863 }
3864 startRegionOperation(Operation.SCAN);
3865 readRequestsCount.increment();
3866 try {
3867 return nextRaw(outResults, limit);
3868 } finally {
3869 closeRegionOperation(Operation.SCAN);
3870 }
3871 }
3872
3873 @Override
3874 public boolean nextRaw(List<Cell> outResults)
3875 throws IOException {
3876 return nextRaw(outResults, batch);
3877 }
3878
3879 @Override
3880 public boolean nextRaw(List<Cell> outResults, int limit) throws IOException {
3881 boolean returnResult;
3882 if (outResults.isEmpty()) {
3883
3884
3885 returnResult = nextInternal(outResults, limit);
3886 } else {
3887 List<Cell> tmpList = new ArrayList<Cell>();
3888 returnResult = nextInternal(tmpList, limit);
3889 outResults.addAll(tmpList);
3890 }
3891 resetFilters();
3892 if (isFilterDoneInternal()) {
3893 returnResult = false;
3894 }
3895 if (region != null && region.metricsRegion != null) {
3896 long totalSize = 0;
3897 for(Cell c:outResults) {
3898
3899 KeyValue kv = KeyValueUtil.ensureKeyValue(c);
3900 totalSize += kv.getLength();
3901 }
3902 region.metricsRegion.updateScanNext(totalSize);
3903 }
3904 return returnResult;
3905 }
3906
3907
3908 private void populateFromJoinedHeap(List<Cell> results, int limit)
3909 throws IOException {
3910 assert joinedContinuationRow != null;
3911 KeyValue kv = populateResult(results, this.joinedHeap, limit,
3912 joinedContinuationRow.getBuffer(), joinedContinuationRow.getRowOffset(),
3913 joinedContinuationRow.getRowLength());
3914 if (kv != KV_LIMIT) {
3915
3916 joinedContinuationRow = null;
3917 }
3918
3919
3920 Collections.sort(results, comparator);
3921 }
3922
3923
3924
3925
3926
3927
3928
3929
3930
3931
3932
3933 private KeyValue populateResult(List<Cell> results, KeyValueHeap heap, int limit,
3934 byte[] currentRow, int offset, short length) throws IOException {
3935 KeyValue nextKv;
3936 do {
3937 heap.next(results, limit - results.size());
3938 if (limit > 0 && results.size() == limit) {
3939 return KV_LIMIT;
3940 }
3941 nextKv = heap.peek();
3942 } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
3943
3944 return nextKv;
3945 }
3946
3947
3948
3949
3950 @Override
3951 public synchronized boolean isFilterDone() throws IOException {
3952 return isFilterDoneInternal();
3953 }
3954
3955 private boolean isFilterDoneInternal() throws IOException {
3956 return this.filter != null && this.filter.filterAllRemaining();
3957 }
3958
3959 private boolean nextInternal(List<Cell> results, int limit)
3960 throws IOException {
3961 if (!results.isEmpty()) {
3962 throw new IllegalArgumentException("First parameter should be an empty list");
3963 }
3964 RpcCallContext rpcCall = RpcServer.getCurrentCall();
3965
3966
3967
3968
3969
3970 while (true) {
3971 if (rpcCall != null) {
3972
3973
3974
3975
3976 long afterTime = rpcCall.disconnectSince();
3977 if (afterTime >= 0) {
3978 throw new CallerDisconnectedException(
3979 "Aborting on region " + getRegionNameAsString() + ", call " +
3980 this + " after " + afterTime + " ms, since " +
3981 "caller disconnected");
3982 }
3983 }
3984
3985
3986 KeyValue current = this.storeHeap.peek();
3987
3988 byte[] currentRow = null;
3989 int offset = 0;
3990 short length = 0;
3991 if (current != null) {
3992 currentRow = current.getBuffer();
3993 offset = current.getRowOffset();
3994 length = current.getRowLength();
3995 }
3996 boolean stopRow = isStopRow(currentRow, offset, length);
3997
3998
3999 if (joinedContinuationRow == null) {
4000
4001 if (stopRow) {
4002 if (filter != null && filter.hasFilterRow()) {
4003 filter.filterRowCells(results);
4004 }
4005 return false;
4006 }
4007
4008
4009
4010 if (filterRowKey(currentRow, offset, length)) {
4011 boolean moreRows = nextRow(currentRow, offset, length);
4012 if (!moreRows) return false;
4013 results.clear();
4014 continue;
4015 }
4016
4017 KeyValue nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset,
4018 length);
4019
4020 if (nextKv == KV_LIMIT) {
4021 if (this.filter != null && filter.hasFilterRow()) {
4022 throw new IncompatibleFilterException(
4023 "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
4024 }
4025 return true;
4026 }
4027
4028 stopRow = nextKv == null ||
4029 isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());
4030
4031 final boolean isEmptyRow = results.isEmpty();
4032
4033
4034
4035 FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
4036 if (filter != null && filter.hasFilterRow()) {
4037 ret = filter.filterRowCellsWithRet(results);
4038 }
4039
4040 if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) {
4041 results.clear();
4042 boolean moreRows = nextRow(currentRow, offset, length);
4043 if (!moreRows) return false;
4044
4045
4046
4047 if (!stopRow) continue;
4048 return false;
4049 }
4050
4051
4052
4053
4054
4055 if (this.joinedHeap != null) {
4056 KeyValue nextJoinedKv = joinedHeap.peek();
4057
4058 boolean mayHaveData =
4059 (nextJoinedKv != null && nextJoinedKv.matchingRow(currentRow, offset, length))
4060 || (this.joinedHeap.requestSeek(KeyValue.createFirstOnRow(currentRow, offset, length),
4061 true, true)
4062 && joinedHeap.peek() != null
4063 && joinedHeap.peek().matchingRow(currentRow, offset, length));
4064 if (mayHaveData) {
4065 joinedContinuationRow = current;
4066 populateFromJoinedHeap(results, limit);
4067 }
4068 }
4069 } else {
4070
4071 populateFromJoinedHeap(results, limit);
4072 }
4073
4074
4075
4076 if (joinedContinuationRow != null) {
4077 return true;
4078 }
4079
4080
4081
4082
4083 if (results.isEmpty()) {
4084 boolean moreRows = nextRow(currentRow, offset, length);
4085 if (!moreRows) return false;
4086 if (!stopRow) continue;
4087 }
4088
4089
4090 return !stopRow;
4091 }
4092 }
4093
4094
4095
4096
4097
4098
4099
4100
4101 private boolean filterRow() throws IOException {
4102
4103
4104 return filter != null && (!filter.hasFilterRow())
4105 && filter.filterRow();
4106 }
4107
4108 private boolean filterRowKey(byte[] row, int offset, short length) throws IOException {
4109 return filter != null
4110 && filter.filterRowKey(row, offset, length);
4111 }
4112
4113 protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException {
4114 assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read.";
4115 KeyValue next;
4116 while ((next = this.storeHeap.peek()) != null &&
4117 next.matchingRow(currentRow, offset, length)) {
4118 this.storeHeap.next(MOCKED_LIST);
4119 }
4120 resetFilters();
4121
4122 return this.region.getCoprocessorHost() == null
4123 || this.region.getCoprocessorHost()
4124 .postScannerFilterRow(this, currentRow, offset, length);
4125 }
4126
4127 protected boolean isStopRow(byte[] currentRow, int offset, short length) {
4128 return currentRow == null ||
4129 (stopRow != null &&
4130 comparator.compareRows(stopRow, 0, stopRow.length,
4131 currentRow, offset, length) <= isScan);
4132 }
4133
4134 @Override
4135 public synchronized void close() {
4136 if (storeHeap != null) {
4137 storeHeap.close();
4138 storeHeap = null;
4139 }
4140 if (joinedHeap != null) {
4141 joinedHeap.close();
4142 joinedHeap = null;
4143 }
4144
4145 scannerReadPoints.remove(this);
4146 this.filterClosed = true;
4147 }
4148
4149 KeyValueHeap getStoreHeapForTesting() {
4150 return storeHeap;
4151 }
4152
4153 @Override
4154 public synchronized boolean reseek(byte[] row) throws IOException {
4155 if (row == null) {
4156 throw new IllegalArgumentException("Row cannot be null.");
4157 }
4158 boolean result = false;
4159 startRegionOperation();
4160 try {
4161 KeyValue kv = KeyValue.createFirstOnRow(row);
4162
4163 result = this.storeHeap.requestSeek(kv, true, true);
4164 if (this.joinedHeap != null) {
4165 result = this.joinedHeap.requestSeek(kv, true, true) || result;
4166 }
4167 } finally {
4168 closeRegionOperation();
4169 }
4170 return result;
4171 }
4172 }
4173
4174
4175
4176
4177
4178
4179
4180
4181
4182
4183
4184
4185
4186
4187
4188
4189
4190
4191
4192
4193
4194
4195 static HRegion newHRegion(Path tableDir, HLog log, FileSystem fs,
4196 Configuration conf, HRegionInfo regionInfo, final HTableDescriptor htd,
4197 RegionServerServices rsServices) {
4198 try {
4199 @SuppressWarnings("unchecked")
4200 Class<? extends HRegion> regionClass =
4201 (Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
4202
4203 Constructor<? extends HRegion> c =
4204 regionClass.getConstructor(Path.class, HLog.class, FileSystem.class,
4205 Configuration.class, HRegionInfo.class, HTableDescriptor.class,
4206 RegionServerServices.class);
4207
4208 return c.newInstance(tableDir, log, fs, conf, regionInfo, htd, rsServices);
4209 } catch (Throwable e) {
4210
4211 throw new IllegalStateException("Could not instantiate a region instance.", e);
4212 }
4213 }
4214
4215
4216
4217
4218
4219
4220
4221
4222
4223
4224
4225
4226
4227
4228
4229
4230
4231
4232
/**
 * Convenience overload: create an HRegion with no caller-supplied HLog
 * (the callee creates one) and with initialization enabled.
 */
public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
    final Configuration conf, final HTableDescriptor hTableDescriptor)
    throws IOException {
  return createHRegion(info, rootDir, conf, hTableDescriptor, null);
}
4238
4239
4240
4241
4242
4243
4244
4245
4246
4247
4248
4249
4250 public static void closeHRegion(final HRegion r) throws IOException {
4251 if (r == null) return;
4252 r.close();
4253 if (r.getLog() == null) return;
4254 r.getLog().closeAndDelete();
4255 }
4256
4257
4258
4259
4260
4261
4262
4263
4264
4265
4266
4267
4268
4269
4270
4271
/**
 * Overload that delegates with {@code ignoreHLog = false}, so a WAL is
 * created when {@code hlog} is null.
 */
public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
    final Configuration conf,
    final HTableDescriptor hTableDescriptor,
    final HLog hlog,
    final boolean initialize)
    throws IOException {
  return createHRegion(info, rootDir, conf, hTableDescriptor,
      hlog, initialize, false);
}
4281
4282
4283
4284
4285
4286
4287
4288
4289
4290
4291
4292
4293
4294
4295
4296
4297
/**
 * Overload that derives the table directory from {@code rootDir} and the
 * region's table before delegating.
 */
public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
    final Configuration conf,
    final HTableDescriptor hTableDescriptor,
    final HLog hlog,
    final boolean initialize, final boolean ignoreHLog)
    throws IOException {
  Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
  return createHRegion(info, rootDir, tableDir, conf, hTableDescriptor, hlog, initialize, ignoreHLog);
}
4307
4308
4309
4310
4311
4312
4313
4314
4315
4316
4317
4318
4319
4320
4321
4322
4323
4324
/**
 * Create a region: lays the region directory down on the filesystem,
 * optionally creates a WAL, instantiates the HRegion and (optionally)
 * initializes it.
 *
 * @param hlog shared WAL to use, or null
 * @param initialize run {@code initialize()} and set the region's sequence id
 * @param ignoreHLog when true and {@code hlog} is null, no WAL is created
 */
public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Path tableDir,
    final Configuration conf,
    final HTableDescriptor hTableDescriptor,
    final HLog hlog,
    final boolean initialize, final boolean ignoreHLog)
    throws IOException {
  LOG.info("creating HRegion " + info.getTable().getNameAsString()
      + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
      " Table name == " + info.getTable().getNameAsString());
  FileSystem fs = FileSystem.get(conf);
  // Lay down the region directory structure first.
  HRegionFileSystem rfs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
  HLog effectiveHLog = hlog;
  // Only create a WAL when none was supplied and the caller didn't opt out.
  if (hlog == null && !ignoreHLog) {
    effectiveHLog = HLogFactory.createHLog(fs, rfs.getRegionDir(),
        HConstants.HREGION_LOGDIR_NAME, conf);
  }
  HRegion region = HRegion.newHRegion(tableDir,
      effectiveHLog, fs, conf, info, hTableDescriptor, null);
  if (initialize) {
    // Region initialize() returns the edit/sequence id it recovered up to;
    // record it as the region's current sequence id.
    region.setSequenceId(region.initialize());
  }
  return region;
}
4350
/** Overload that delegates with {@code initialize = true}. */
public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
    final Configuration conf,
    final HTableDescriptor hTableDescriptor,
    final HLog hlog)
    throws IOException {
  return createHRegion(info, rootDir, conf, hTableDescriptor, hlog, true);
}
4358
4359
4360
4361
4362
4363
4364
4365
4366
4367
4368
4369
4370
4371
/**
 * Open a region with no RegionServerServices and no progress reporter.
 */
public static HRegion openHRegion(final HRegionInfo info,
    final HTableDescriptor htd, final HLog wal,
    final Configuration conf)
    throws IOException {
  return openHRegion(info, htd, wal, conf, null, null);
}
4378
4379
4380
4381
4382
4383
4384
4385
4386
4387
4388
4389
4390
4391
4392
4393
/**
 * Open a region, resolving the HBase root directory from the configuration.
 */
public static HRegion openHRegion(final HRegionInfo info,
    final HTableDescriptor htd, final HLog wal, final Configuration conf,
    final RegionServerServices rsServices,
    final CancelableProgressable reporter)
    throws IOException {
  return openHRegion(FSUtils.getRootDir(conf), info, htd, wal, conf, rsServices, reporter);
}
4401
4402
4403
4404
4405
4406
4407
4408
4409
4410
4411
4412
4413
4414
/**
 * Open a region under an explicit root directory, with no
 * RegionServerServices and no progress reporter.
 */
public static HRegion openHRegion(Path rootDir, final HRegionInfo info,
    final HTableDescriptor htd, final HLog wal, final Configuration conf)
    throws IOException {
  return openHRegion(rootDir, info, htd, wal, conf, null, null);
}
4420
4421
4422
4423
4424
4425
4426
4427
4428
4429
4430
4431
4432
4433
4434
4435
4436 public static HRegion openHRegion(final Path rootDir, final HRegionInfo info,
4437 final HTableDescriptor htd, final HLog wal, final Configuration conf,
4438 final RegionServerServices rsServices,
4439 final CancelableProgressable reporter)
4440 throws IOException {
4441 FileSystem fs = null;
4442 if (rsServices != null) {
4443 fs = rsServices.getFileSystem();
4444 }
4445 if (fs == null) {
4446 fs = FileSystem.get(conf);
4447 }
4448 return openHRegion(conf, fs, rootDir, info, htd, wal, rsServices, reporter);
4449 }
4450
4451
4452
4453
4454
4455
4456
4457
4458
4459
4460
4461
4462
4463
4464
/**
 * Open a region on the given filesystem, with no RegionServerServices and
 * no progress reporter.
 */
public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
    final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal)
    throws IOException {
  return openHRegion(conf, fs, rootDir, info, htd, wal, null, null);
}
4470
4471
4472
4473
4474
4475
4476
4477
4478
4479
4480
4481
4482
4483
4484
4485
4486
/**
 * Open a region, deriving the table directory from {@code rootDir} and the
 * region's table.
 */
public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
    final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
    final RegionServerServices rsServices, final CancelableProgressable reporter)
    throws IOException {
  Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
  return openHRegion(conf, fs, rootDir, tableDir, info, htd, wal, rsServices, reporter);
}
4494
4495
4496
4497
4498
4499
4500
4501
4502
4503
4504
4505
4506
4507
4508
4509
4510
4511 public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
4512 final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
4513 final RegionServerServices rsServices, final CancelableProgressable reporter)
4514 throws IOException {
4515 if (info == null) throw new NullPointerException("Passed region info is null");
4516 if (LOG.isDebugEnabled()) {
4517 LOG.debug("Opening region: " + info);
4518 }
4519 HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
4520 return r.openHRegion(reporter);
4521 }
4522
4523
4524
4525
4526
4527
4528
4529
4530
/**
 * Open a fresh HRegion built from another region's filesystem layout, log,
 * configuration, region info and table descriptor (no RegionServerServices).
 */
public static HRegion openHRegion(final HRegion other, final CancelableProgressable reporter)
    throws IOException {
  HRegionFileSystem regionFs = other.getRegionFileSystem();
  HRegion r = newHRegion(regionFs.getTableDir(), other.getLog(), regionFs.getFileSystem(),
      other.baseConf, other.getRegionInfo(), other.getTableDesc(), null);
  return r.openHRegion(reporter);
}
4538
4539
4540
4541
4542
4543
4544
4545
/**
 * Instance-side open: verify compression codecs are usable, then initialize
 * the region and record the resulting open sequence number.
 */
protected HRegion openHRegion(final CancelableProgressable reporter)
    throws IOException {
  // Fail fast if a configured codec cannot run on this server.
  checkCompressionCodecs();

  this.openSeqNum = initialize(reporter);
  this.setSequenceId(openSeqNum);
  return this;
}
4554
4555 private void checkCompressionCodecs() throws IOException {
4556 for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
4557 CompressionTest.testCompression(fam.getCompression());
4558 CompressionTest.testCompression(fam.getCompactionCompression());
4559 }
4560 }
4561
4562
4563
4564
4565
4566
/**
 * Build a daughter HRegion object after a split: commit the daughter's
 * files on the filesystem, then create the (unopened) region object.
 * Read/write request counters are seeded with half the parent's counts.
 */
HRegion createDaughterRegionFromSplits(final HRegionInfo hri) throws IOException {
  // Move the split files into the daughter's region directory.
  fs.commitDaughterRegion(hri);

  HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(), fs.getFileSystem(),
      this.getBaseConf(), hri, this.getTableDesc(), rsServices);
  r.readRequestsCount.set(this.getReadRequestsCount() / 2);
  r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
  return r;
}
4578
4579
4580
4581
4582
4583
4584
4585
/**
 * Build the merged HRegion object from this region (region a) and
 * {@code region_b}. Request counters are seeded with the sum of both
 * parents'; the merged region's files are committed on the filesystem
 * before returning.
 */
HRegion createMergedRegionFromMerges(final HRegionInfo mergedRegionInfo,
    final HRegion region_b) throws IOException {
  HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(),
      fs.getFileSystem(), this.getBaseConf(), mergedRegionInfo,
      this.getTableDesc(), this.rsServices);
  r.readRequestsCount.set(this.getReadRequestsCount()
      + region_b.getReadRequestsCount());
  r.writeRequestsCount.set(this.getWriteRequestsCount()
      + region_b.getWriteRequestsCount());
  this.fs.commitMergedRegion(mergedRegionInfo);
  return r;
}
4598
4599
4600
4601
4602
4603
4604
4605
4606
4607
4608
4609
/**
 * Insert region {@code r}'s row into the given meta region: writes the
 * serialized HRegionInfo and the meta version under the catalog family,
 * keyed by the region name, both stamped with the current time.
 */
public static void addRegionToMETA(final HRegion meta, final HRegion r) throws IOException {
  meta.checkResources();
  // The row key in meta is the region name.
  byte[] row = r.getRegionName();
  final long now = EnvironmentEdgeManager.currentTimeMillis();
  final List<Cell> cells = new ArrayList<Cell>(2);
  // info:regioninfo = serialized HRegionInfo
  cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
      HConstants.REGIONINFO_QUALIFIER, now,
      r.getRegionInfo().toByteArray()));
  // info:v = meta layout version
  cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
      HConstants.META_VERSION_QUALIFIER, now,
      Bytes.toBytes(HConstants.META_VERSION)));
  meta.put(row, HConstants.CATALOG_FAMILY, cells);
}
4625
4626
4627
4628
4629
4630
4631
4632
/**
 * Compute the region directory as {@code tabledir/name}.
 * @deprecated kept for compatibility; path layout is handled elsewhere now.
 */
@Deprecated
public static Path getRegionDir(final Path tabledir, final String name) {
  return new Path(tabledir, name);
}
4637
4638
4639
4640
4641
4642
4643
4644
/**
 * Compute the region directory as {@code rootdir/<tableDir>/<encodedName>}.
 * @deprecated kept for compatibility; path layout is handled elsewhere now.
 */
@Deprecated
public static Path getRegionDir(final Path rootdir, final HRegionInfo info) {
  return new Path(
      FSUtils.getTableDir(rootdir, info.getTable()), info.getEncodedName());
}
4650
4651
4652
4653
4654
4655
4656
4657
4658
4659 public static boolean rowIsInRange(HRegionInfo info, final byte [] row) {
4660 return ((info.getStartKey().length == 0) ||
4661 (Bytes.compareTo(info.getStartKey(), row) <= 0)) &&
4662 ((info.getEndKey().length == 0) ||
4663 (Bytes.compareTo(info.getEndKey(), row) > 0));
4664 }
4665
4666
4667
4668
4669
4670
4671
4672
4673
4674 public static HRegion mergeAdjacent(final HRegion srcA, final HRegion srcB)
4675 throws IOException {
4676 HRegion a = srcA;
4677 HRegion b = srcB;
4678
4679
4680
4681 if (srcA.getStartKey() == null) {
4682 if (srcB.getStartKey() == null) {
4683 throw new IOException("Cannot merge two regions with null start key");
4684 }
4685
4686 } else if ((srcB.getStartKey() == null) ||
4687 (Bytes.compareTo(srcA.getStartKey(), srcB.getStartKey()) > 0)) {
4688 a = srcB;
4689 b = srcA;
4690 }
4691
4692 if (!(Bytes.compareTo(a.getEndKey(), b.getStartKey()) == 0)) {
4693 throw new IOException("Cannot merge non-adjacent regions");
4694 }
4695 return merge(a, b);
4696 }
4697
4698
4699
4700
4701
4702
4703
4704
4705
4706 public static HRegion merge(final HRegion a, final HRegion b) throws IOException {
4707 if (!a.getRegionInfo().getTable().equals(b.getRegionInfo().getTable())) {
4708 throw new IOException("Regions do not belong to the same table");
4709 }
4710
4711 FileSystem fs = a.getRegionFileSystem().getFileSystem();
4712
4713 a.flushcache();
4714 b.flushcache();
4715
4716
4717 a.compactStores(true);
4718 if (LOG.isDebugEnabled()) {
4719 LOG.debug("Files for region: " + a);
4720 a.getRegionFileSystem().logFileSystemState(LOG);
4721 }
4722 b.compactStores(true);
4723 if (LOG.isDebugEnabled()) {
4724 LOG.debug("Files for region: " + b);
4725 b.getRegionFileSystem().logFileSystemState(LOG);
4726 }
4727
4728 RegionMergeTransaction rmt = new RegionMergeTransaction(a, b, true);
4729 if (!rmt.prepare(null)) {
4730 throw new IOException("Unable to merge regions " + a + " and " + b);
4731 }
4732 HRegionInfo mergedRegionInfo = rmt.getMergedRegionInfo();
4733 LOG.info("starting merge of regions: " + a + " and " + b
4734 + " into new region " + mergedRegionInfo.getRegionNameAsString()
4735 + " with start key <"
4736 + Bytes.toStringBinary(mergedRegionInfo.getStartKey())
4737 + "> and end key <"
4738 + Bytes.toStringBinary(mergedRegionInfo.getEndKey()) + ">");
4739 HRegion dstRegion;
4740 try {
4741 dstRegion = rmt.execute(null, null);
4742 } catch (IOException ioe) {
4743 rmt.rollback(null, null);
4744 throw new IOException("Failed merging region " + a + " and " + b
4745 + ", and succssfully rolled back");
4746 }
4747 dstRegion.compactStores(true);
4748
4749 if (LOG.isDebugEnabled()) {
4750 LOG.debug("Files for new region");
4751 dstRegion.getRegionFileSystem().logFileSystemState(LOG);
4752 }
4753
4754 if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
4755 throw new IOException("Merged region " + dstRegion
4756 + " still has references after the compaction, is compaction canceled?");
4757 }
4758
4759
4760 HFileArchiver.archiveRegion(a.getBaseConf(), fs, a.getRegionInfo());
4761
4762 HFileArchiver.archiveRegion(b.getBaseConf(), fs, b.getRegionInfo());
4763
4764 LOG.info("merge completed. New region is " + dstRegion);
4765 return dstRegion;
4766 }
4767
4768
4769
4770
4771
4772 boolean isMajorCompaction() throws IOException {
4773 for (Store store : this.stores.values()) {
4774 if (store.isMajorCompaction()) {
4775 return true;
4776 }
4777 }
4778 return false;
4779 }
4780
4781
4782
4783
4784
4785
4786
4787
4788
4789 public Result get(final Get get) throws IOException {
4790 checkRow(get.getRow(), "Get");
4791
4792 if (get.hasFamilies()) {
4793 for (byte [] family: get.familySet()) {
4794 checkFamily(family);
4795 }
4796 } else {
4797 for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
4798 get.addFamily(family);
4799 }
4800 }
4801 List<Cell> results = get(get, true);
4802 return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null);
4803 }
4804
4805
4806
4807
4808
4809
4810 public List<Cell> get(Get get, boolean withCoprocessor)
4811 throws IOException {
4812
4813 List<Cell> results = new ArrayList<Cell>();
4814
4815
4816 if (withCoprocessor && (coprocessorHost != null)) {
4817 if (coprocessorHost.preGet(get, results)) {
4818 return results;
4819 }
4820 }
4821
4822 Scan scan = new Scan(get);
4823
4824 RegionScanner scanner = null;
4825 try {
4826 scanner = getScanner(scan);
4827 scanner.next(results);
4828 } finally {
4829 if (scanner != null)
4830 scanner.close();
4831 }
4832
4833
4834 if (withCoprocessor && (coprocessorHost != null)) {
4835 coprocessorHost.postGet(get, results);
4836 }
4837
4838
4839 if (this.metricsRegion != null) {
4840 long totalSize = 0l;
4841 if (results != null) {
4842 for (Cell kv:results) {
4843 totalSize += KeyValueUtil.ensureKeyValue(kv).getLength();
4844 }
4845 }
4846 this.metricsRegion.updateGet(totalSize);
4847 }
4848
4849 return results;
4850 }
4851
/**
 * Atomically apply all mutations of a RowMutations, locking its single row.
 */
public void mutateRow(RowMutations rm) throws IOException {
  // Don't need nonces here - RowMutations only supports puts and deletes
  mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
}
4856
4857
4858
4859
4860
/**
 * Overload without nonces (passes {@code HConstants.NO_NONCE}); use for
 * mutation types that do not require nonce-based replay protection.
 */
public void mutateRowsWithLocks(Collection<Mutation> mutations,
    Collection<byte[]> rowsToLock) throws IOException {
  mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
4865
4866
4867
4868
4869
4870
4871
4872
4873
4874
4875
4876
4877
/**
 * Apply a batch of mutations atomically while holding locks on all the
 * given rows, by running a MultiRowMutationProcessor. The -1 timeout means
 * the processor runs inline with no time limit (see doProcessRowWithTimeout).
 */
public void mutateRowsWithLocks(Collection<Mutation> mutations,
    Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
  MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
  processRowsWithLocks(proc, -1, nonceGroup, nonce);
}
4883
4884
4885
4886
4887
4888
4889
4890
/**
 * Run a RowProcessor with the region's configured default timeout
 * ({@code rowProcessorTimeout}).
 */
public void processRowsWithLocks(RowProcessor<?,?> processor, long nonceGroup, long nonce)
    throws IOException {
  processRowsWithLocks(processor, rowProcessorTimeout, nonceGroup, nonce);
}
4895
4896
4897
4898
4899
4900
4901
4902
4903
4904
/**
 * Execute a RowProcessor atomically over its declared rows. Read-only
 * processors run without locks or WAL. Mutating processors run under:
 * row locks for every declared row, the region updates read-lock, and an
 * MVCC write entry; their mutations go to the memstore, are appended to the
 * WAL (unsynced) and then synced per the processor's durability. On a
 * failed WAL sync, the memstore additions are rolled back.
 *
 * @param timeout milliseconds allowed for processor.process(); negative
 *   runs it inline with no limit (see doProcessRowWithTimeout)
 */
public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout,
    long nonceGroup, long nonce) throws IOException {
  // Validate every declared row is within this region.
  for (byte[] row : processor.getRowsToLock()) {
    checkRow(row, "processRowsWithLocks");
  }
  if (!processor.readOnly()) {
    checkReadOnly();
  }
  checkResources();

  startRegionOperation();
  WALEdit walEdit = new WALEdit();

  // Pre-process hook; on failure release the region operation before rethrowing.
  try {
    processor.preProcess(this, walEdit);
  } catch (IOException e) {
    closeRegionOperation();
    throw e;
  }

  // Read-only path: no locks, no MVCC, no WAL.
  if (processor.readOnly()) {
    try {
      long now = EnvironmentEdgeManager.currentTimeMillis();
      doProcessRowWithTimeout(
          processor, now, this, null, null, timeout);
      processor.postProcess(this, walEdit, true);
    } catch (IOException e) {
      throw e;
    } finally {
      closeRegionOperation();
    }
    return;
  }

  MultiVersionConsistencyControl.WriteEntry writeEntry = null;
  boolean locked = false;
  boolean walSyncSuccessful = false;
  List<RowLock> acquiredRowLocks = null;
  long addedSize = 0;
  List<Mutation> mutations = new ArrayList<Mutation>();
  Collection<byte[]> rowsToLock = processor.getRowsToLock();
  try {
    // 1. Take a row lock for every row the processor declared.
    acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
    for (byte[] row : rowsToLock) {
      acquiredRowLocks.add(getRowLock(row));
    }
    // 2. Block region close while updates are in flight.
    lock(this.updatesLock.readLock(), acquiredRowLocks.size());
    locked = true;

    long now = EnvironmentEdgeManager.currentTimeMillis();
    try {
      // 3. Let the processor compute its mutations and WAL edits.
      doProcessRowWithTimeout(
          processor, now, this, mutations, walEdit, timeout);

      if (!mutations.isEmpty()) {
        // 4. Begin an MVCC transaction for the whole batch.
        writeEntry = mvcc.beginMemstoreInsert();
        // 5. Processor hook before the batch hits the memstore.
        processor.preBatchMutate(this, walEdit);
        // 6. Apply every cell to its store's memstore, tagged with the
        //    MVCC write number so readers can't see it until completion.
        for (Mutation m : mutations) {
          for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
            KeyValue kv = KeyValueUtil.ensureKeyValue(cellScanner.current());
            kv.setMvccVersion(writeEntry.getWriteNumber());
            byte[] family = kv.getFamily();
            checkFamily(family);
            addedSize += stores.get(family).add(kv);
          }
        }

        long txid = 0;
        // 7. Append to the WAL without syncing yet.
        if (!walEdit.isEmpty()) {
          txid = this.log.appendNoSync(this.getRegionInfo(),
              this.htableDescriptor.getTableName(), walEdit, processor.getClusterIds(), now,
              this.htableDescriptor, this.sequenceId, true, nonceGroup, nonce);
        }
        // 8. Release the updates lock before the (possibly slow) sync.
        if (locked) {
          this.updatesLock.readLock().unlock();
          locked = false;
        }

        // 9. Release row locks before the sync as well.
        releaseRowLocks(acquiredRowLocks);

        // 10. Sync the WAL per the effective durability.
        if (txid != 0) {
          syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
        }
        walSyncSuccessful = true;
        // 11. Processor hook after a durable batch.
        processor.postBatchMutate(this);
      }
    } finally {
      // Roll memstore additions back if the WAL sync did not complete.
      if (!mutations.isEmpty() && !walSyncSuccessful) {
        LOG.warn("Wal sync failed. Roll back " + mutations.size() +
            " memstore keyvalues for row(s):" +
            processor.getRowsToLock().iterator().next() + "...");
        for (Mutation m : mutations) {
          for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
            KeyValue kv = KeyValueUtil.ensureKeyValue(cellScanner.current());
            stores.get(kv.getFamily()).rollback(kv);
          }
        }
      }
      // 12. Complete the MVCC transaction, making the batch visible
      //     (or finalizing the rollback).
      if (writeEntry != null) {
        mvcc.completeMemstoreInsert(writeEntry);
        writeEntry = null;
      }
      // 13/14. Release any locks still held (no-ops on the happy path,
      //        where they were already released before the sync).
      if (locked) {
        this.updatesLock.readLock().unlock();
        locked = false;
      }
      releaseRowLocks(acquiredRowLocks);
    }

    // 15. Post-process hook, told whether the batch became durable.
    processor.postProcess(this, walEdit, walSyncSuccessful);

  } catch (IOException e) {
    throw e;
  } finally {
    closeRegionOperation();
    // Request a flush if this batch pushed the memstore over the flush size.
    if (!mutations.isEmpty() &&
        isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
      requestFlush();
    }
  }
}
5044
/**
 * Invoke {@code processor.process(...)}, either inline (negative timeout)
 * or on the shared rowProcessorExecutor bounded by {@code timeout} ms.
 * A TimeoutException, and any other execution failure, is rethrown as
 * IOException.
 */
private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
    final long now,
    final HRegion region,
    final List<Mutation> mutations,
    final WALEdit walEdit,
    final long timeout) throws IOException {
  // Negative timeout: run directly on the calling thread, unbounded.
  if (timeout < 0) {
    try {
      processor.process(now, region, mutations, walEdit);
    } catch (IOException e) {
      LOG.warn("RowProcessor:" + processor.getClass().getName() +
          " throws Exception on row(s):" +
          Bytes.toStringBinary(
              processor.getRowsToLock().iterator().next()) + "...", e);
      throw e;
    }
    return;
  }

  // Bounded execution: wrap in a FutureTask so we can enforce the timeout.
  FutureTask<Void> task =
      new FutureTask<Void>(new Callable<Void>() {
        @Override
        public Void call() throws IOException {
          try {
            processor.process(now, region, mutations, walEdit);
            return null;
          } catch (IOException e) {
            LOG.warn("RowProcessor:" + processor.getClass().getName() +
                " throws Exception on row(s):" +
                Bytes.toStringBinary(
                    processor.getRowsToLock().iterator().next()) + "...", e);
            throw e;
          }
        }
      });
  rowProcessorExecutor.execute(task);
  try {
    task.get(timeout, TimeUnit.MILLISECONDS);
  } catch (TimeoutException te) {
    LOG.error("RowProcessor timeout:" + timeout + " ms on row(s):" +
        Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) +
        "...");
    throw new IOException(te);
  } catch (Exception e) {
    // ExecutionException (processor failure) or InterruptedException.
    throw new IOException(e);
  }
}
5094
5095 public Result append(Append append) throws IOException {
5096 return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
5097 }
5098
5099
5100
5101
5102
5103
5104
5105
5106
5107
  /**
   * Perform one or more append operations on a single row, concatenating the
   * supplied values onto any existing cell values.
   *
   * @param append the append operation (row, families, cells to append)
   * @param nonceGroup nonce group for de-duplication of client retries
   * @param nonce nonce for de-duplication of client retries
   * @return the new values after the append, or null when the client did not
   *         request results
   * @throws IOException on WAL or store failure
   */
  public Result append(Append append, long nonceGroup, long nonce)
      throws IOException {
    byte[] row = append.getRow();
    checkRow(row, "append");
    boolean flush = false;
    Durability durability = getEffectiveDurability(append.getDurability());
    boolean writeToWAL = durability != Durability.SKIP_WAL;
    WALEdit walEdits = null;
    List<Cell> allKVs = new ArrayList<Cell>(append.size());
    Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();

    long size = 0;
    long txid = 0;

    checkReadOnly();
    checkResources();
    // Lock row, and mark the operation as started for close/recovery guards.
    startRegionOperation(Operation.APPEND);
    this.writeRequestsCount.increment();
    WriteEntry w = null;
    RowLock rowLock;
    try {
      rowLock = getRowLock(row);
      try {
        lock(this.updatesLock.readLock());
        try {
          // Wait for all prior MVCC transactions to finish while holding the
          // row lock, so the read below sees the latest committed state.
          mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
          if (this.coprocessorHost != null) {
            // A coprocessor may short-circuit the append entirely.
            Result r = this.coprocessorHost.preAppendAfterRowLock(append);
            if(r!= null) {
              return r;
            }
          }
          // Now start our own MVCC transaction.
          w = mvcc.beginMemstoreInsert();
          long now = EnvironmentEdgeManager.currentTimeMillis();
          // Process each family.
          for (Map.Entry<byte[], List<Cell>> family : append.getFamilyCellMap().entrySet()) {

            Store store = stores.get(family.getKey());
            List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());

            // Sort the cells so that they match the order that they appear in
            // the Get results below; otherwise existing values could not be
            // matched positionally against the input cells.
            Collections.sort(family.getValue(), store.getComparator());
            // Read previous values for every referenced column in this family.
            Get get = new Get(row);
            for (Cell cell : family.getValue()) {
              KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
              get.addColumn(family.getKey(), kv.getQualifier());
            }
            List<Cell> results = get(get, false);

            // Iterate the input columns: when a previous value exists, build a
            // new cell holding old value + appended value; otherwise the input
            // cell itself (timestamp updated) becomes the stored value.
            // idx walks `results` in lock-step with the sorted input cells.
            int idx = 0;
            for (Cell cell : family.getValue()) {
              KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
              KeyValue newKV;
              KeyValue oldKv = null;
              if (idx < results.size()
                  && CellUtil.matchingQualifier(results.get(idx),kv)) {
                oldKv = KeyValueUtil.ensureKeyValue(results.get(idx));
                // Allocate an empty KeyValue sized for old+new value and tags.
                newKV = new KeyValue(row.length, kv.getFamilyLength(),
                    kv.getQualifierLength(), now, KeyValue.Type.Put,
                    oldKv.getValueLength() + kv.getValueLength(),
                    oldKv.getTagsLengthUnsigned() + kv.getTagsLengthUnsigned());
                // Copy in the old value followed by the appended value.
                System.arraycopy(oldKv.getBuffer(), oldKv.getValueOffset(),
                    newKV.getBuffer(), newKV.getValueOffset(),
                    oldKv.getValueLength());
                System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
                    newKV.getBuffer(),
                    newKV.getValueOffset() + oldKv.getValueLength(),
                    kv.getValueLength());
                // Copy in old tags followed by the new cell's tags.
                System.arraycopy(oldKv.getBuffer(), oldKv.getTagsOffset(), newKV.getBuffer(),
                    newKV.getTagsOffset(), oldKv.getTagsLengthUnsigned());
                System.arraycopy(kv.getBuffer(), kv.getTagsOffset(), newKV.getBuffer(),
                    newKV.getTagsOffset() + oldKv.getTagsLengthUnsigned(),
                    kv.getTagsLengthUnsigned());
                // Copy in row, family and qualifier from the input cell.
                System.arraycopy(kv.getBuffer(), kv.getRowOffset(),
                    newKV.getBuffer(), newKV.getRowOffset(), kv.getRowLength());
                System.arraycopy(kv.getBuffer(), kv.getFamilyOffset(),
                    newKV.getBuffer(), newKV.getFamilyOffset(),
                    kv.getFamilyLength());
                System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(),
                    newKV.getBuffer(), newKV.getQualifierOffset(),
                    kv.getQualifierLength());
                idx++;
              } else {
                newKV = kv;
                // No previous value: reuse the input cell, stamping 'now' into
                // any LATEST_TIMESTAMP placeholder.
                newKV.updateLatestStamp(Bytes.toBytes(now));
              }
              newKV.setMvccVersion(w.getWriteNumber());
              // Give coprocessors a chance to rewrite the new cell.
              if (coprocessorHost != null) {
                newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
                    RegionObserver.MutationType.APPEND, append, oldKv, (Cell) newKV));
              }
              kvs.add(newKV);

              // Accumulate the WAL edit for this append.
              if (writeToWAL) {
                if (walEdits == null) {
                  walEdits = new WALEdit();
                }
                walEdits.add(newKV);
              }
            }

            // Stage the cells per store; memstore is written after the WAL.
            tempMemstore.put(store, kvs);
          }

          // Write to WAL before touching the memstore.
          if (writeToWAL) {
            // Empty cluster-id list: appends only originate here; replicas
            // receive the final value as a regular Put.
            txid = this.log.appendNoSync(this.getRegionInfo(),
                this.htableDescriptor.getTableName(), walEdits, new ArrayList<UUID>(),
                EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId,
                true, nonceGroup, nonce);
          } else {
            recordMutationWithoutWal(append.getFamilyCellMap());
          }

          // Now apply the staged cells to the memstore.
          for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
            Store store = entry.getKey();
            if (store.getFamily().getMaxVersions() == 1) {
              // With VERSIONS == 1, upsert replaces the old value in place.
              size += store.upsert(entry.getValue(), getSmallestReadPoint());
            } else {
              // Otherwise keep older versions around.
              for (Cell cell: entry.getValue()) {
                KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
                size += store.add(kv);
              }
            }
            allKVs.addAll(entry.getValue());
          }
          size = this.addAndGetGlobalMemstoreSize(size);
          flush = isFlushSize(size);
        } finally {
          this.updatesLock.readLock().unlock();
        }
      } finally {
        rowLock.release();
      }
      if (writeToWAL) {
        // Sync the WAL outside the row lock.
        syncOrDefer(txid, durability);
      }
    } finally {
      if (w != null) {
        // Complete (or roll forward) our MVCC transaction even on failure.
        mvcc.completeMemstoreInsert(w);
      }
      closeRegionOperation(Operation.APPEND);
    }

    if (this.metricsRegion != null) {
      this.metricsRegion.updateAppend();
    }

    if (flush) {
      // Request a cache flush; done outside the update lock.
      requestFlush();
    }

    // Return results only when the client asked for them.
    return append.isReturnResults() ? Result.create(allKVs) : null;
  }
5294
5295 public Result increment(Increment increment) throws IOException {
5296 return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
5297 }
5298
5299
5300
5301
5302
5303
5304
  /**
   * Perform one or more atomic increments on a single row.
   *
   * @param increment the increment operation (row, families, deltas)
   * @param nonceGroup nonce group for de-duplication of client retries
   * @param nonce nonce for de-duplication of client retries
   * @return the new values after the increment(s)
   * @throws IOException on WAL or store failure, or when an incremented cell
   *         is not 8 bytes wide
   */
  public Result increment(Increment increment, long nonceGroup, long nonce)
      throws IOException {
    byte [] row = increment.getRow();
    checkRow(row, "increment");
    TimeRange tr = increment.getTimeRange();
    boolean flush = false;
    Durability durability = getEffectiveDurability(increment.getDurability());
    boolean writeToWAL = durability != Durability.SKIP_WAL;
    WALEdit walEdits = null;
    List<Cell> allKVs = new ArrayList<Cell>(increment.size());
    Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();

    long size = 0;
    long txid = 0;

    checkReadOnly();
    checkResources();
    // Lock row, and mark the operation as started for close/recovery guards.
    startRegionOperation(Operation.INCREMENT);
    this.writeRequestsCount.increment();
    WriteEntry w = null;
    try {
      RowLock rowLock = getRowLock(row);
      try {
        lock(this.updatesLock.readLock());
        try {
          // Wait for all prior MVCC transactions to finish while holding the
          // row lock, so the read below sees the latest committed state.
          mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
          if (this.coprocessorHost != null) {
            // A coprocessor may short-circuit the increment entirely.
            Result r = this.coprocessorHost.preIncrementAfterRowLock(increment);
            if (r != null) {
              return r;
            }
          }
          // Now start our own MVCC transaction.
          w = mvcc.beginMemstoreInsert();
          long now = EnvironmentEdgeManager.currentTimeMillis();
          // Process each family.
          for (Map.Entry<byte [], List<Cell>> family:
              increment.getFamilyCellMap().entrySet()) {

            Store store = stores.get(family.getKey());
            List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());

            // Sort the cells so that they match the order that they appear in
            // the Get results below; otherwise existing values could not be
            // matched positionally against the input cells.
            Collections.sort(family.getValue(), store.getComparator());
            // Read previous values for every referenced column in this family.
            Get get = new Get(row);
            for (Cell cell: family.getValue()) {
              KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
              get.addColumn(family.getKey(), kv.getQualifier());
            }
            get.setTimeRange(tr.getMin(), tr.getMax());
            List<Cell> results = get(get, false);

            // Iterate the input columns, adding the existing value (when
            // found) to the requested delta. idx walks `results` in lock-step
            // with the sorted input cells.
            int idx = 0;
            for (Cell kv: family.getValue()) {
              long amount = Bytes.toLong(CellUtil.cloneValue(kv));
              // A zero delta returns the current value but writes nothing.
              boolean noWriteBack = (amount == 0);

              Cell c = null;
              if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), kv)) {
                c = results.get(idx);
                if(c.getValueLength() == Bytes.SIZEOF_LONG) {
                  amount += Bytes.toLong(c.getValueArray(), c.getValueOffset(), Bytes.SIZEOF_LONG);
                } else {
                  // Existing cell is not a serialized long: fail the request.
                  throw new org.apache.hadoop.hbase.DoNotRetryIOException(
                      "Attempted to increment field that isn't 64 bits wide");
                }
                idx++;
              }

              // Build the new incremented KeyValue, carrying over tags from
              // both the existing cell and the increment cell.
              byte[] q = CellUtil.cloneQualifier(kv);
              byte[] val = Bytes.toBytes(amount);
              int oldCellTagsLen = (c == null) ? 0 : c.getTagsLengthUnsigned();
              int incCellTagsLen = kv.getTagsLengthUnsigned();
              KeyValue newKV = new KeyValue(row.length, family.getKey().length, q.length, now,
                  KeyValue.Type.Put, val.length, oldCellTagsLen + incCellTagsLen);
              System.arraycopy(row, 0, newKV.getBuffer(), newKV.getRowOffset(), row.length);
              System.arraycopy(family.getKey(), 0, newKV.getBuffer(), newKV.getFamilyOffset(),
                  family.getKey().length);
              System.arraycopy(q, 0, newKV.getBuffer(), newKV.getQualifierOffset(), q.length);
              // Copy in the summed value.
              System.arraycopy(val, 0, newKV.getBuffer(), newKV.getValueOffset(), val.length);
              // Copy in old tags followed by the increment cell's tags.
              if (oldCellTagsLen > 0) {
                System.arraycopy(c.getTagsArray(), c.getTagsOffset(), newKV.getBuffer(),
                    newKV.getTagsOffset(), oldCellTagsLen);
              }
              if (incCellTagsLen > 0) {
                System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getBuffer(),
                    newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
              }
              newKV.setMvccVersion(w.getWriteNumber());
              // Give coprocessors a chance to rewrite the new cell.
              if (coprocessorHost != null) {
                newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
                    RegionObserver.MutationType.INCREMENT, increment, c, (Cell) newKV));
              }
              allKVs.add(newKV);

              if (!noWriteBack) {
                kvs.add(newKV);

                // Accumulate the WAL edit for this increment.
                if (writeToWAL) {
                  if (walEdits == null) {
                    walEdits = new WALEdit();
                  }
                  walEdits.add(newKV);
                }
              }
            }

            // Stage the cells per store; memstore is written after the WAL.
            if (!kvs.isEmpty()) {
              tempMemstore.put(store, kvs);
            }
          }

          // Write to WAL before touching the memstore.
          if (walEdits != null && !walEdits.isEmpty()) {
            if (writeToWAL) {
              // Empty cluster-id list: increments only originate here;
              // replicas receive the final value as a regular Put.
              txid = this.log.appendNoSync(this.getRegionInfo(),
                  this.htableDescriptor.getTableName(), walEdits, new ArrayList<UUID>(),
                  EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId,
                  true, nonceGroup, nonce);
            } else {
              recordMutationWithoutWal(increment.getFamilyCellMap());
            }
          }
          // Now apply the staged cells to the memstore.
          if (!tempMemstore.isEmpty()) {
            for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
              Store store = entry.getKey();
              if (store.getFamily().getMaxVersions() == 1) {
                // With VERSIONS == 1, upsert replaces the old value in place.
                size += store.upsert(entry.getValue(), getSmallestReadPoint());
              } else {
                // Otherwise keep older versions around.
                for (Cell cell : entry.getValue()) {
                  KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
                  size += store.add(kv);
                }
              }
            }
            size = this.addAndGetGlobalMemstoreSize(size);
            flush = isFlushSize(size);
          }
        } finally {
          this.updatesLock.readLock().unlock();
        }
      } finally {
        rowLock.release();
      }
      if (writeToWAL && (walEdits != null) && !walEdits.isEmpty()) {
        // Sync the WAL outside the row lock.
        syncOrDefer(txid, durability);
      }
    } finally {
      if (w != null) {
        // Complete (or roll forward) our MVCC transaction even on failure.
        mvcc.completeMemstoreInsert(w);
      }
      closeRegionOperation(Operation.INCREMENT);
      if (this.metricsRegion != null) {
        this.metricsRegion.updateIncrement();
      }
    }

    if (flush) {
      // Request a cache flush; done outside the update lock.
      requestFlush();
    }

    return Result.create(allKVs);
  }
5492
5493
5494
5495
5496
5497 private void checkFamily(final byte [] family)
5498 throws NoSuchColumnFamilyException {
5499 if (!this.htableDescriptor.hasFamily(family)) {
5500 throw new NoSuchColumnFamilyException("Column family " +
5501 Bytes.toString(family) + " does not exist in region " + this
5502 + " in table " + this.htableDescriptor);
5503 }
5504 }
5505
  // Heap size of one HRegion instance's own (shallow) fields, used by
  // heapSize(). NOTE(review): the reference/long/int/boolean counts below are
  // assumed to mirror the current field declarations -- confirm they are kept
  // in sync whenever fields are added or removed.
  public static final long FIXED_OVERHEAD = ClassSize.align(
      ClassSize.OBJECT +
      ClassSize.ARRAY +
      41 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
      (12 * Bytes.SIZEOF_LONG) +
      4 * Bytes.SIZEOF_BOOLEAN);
5512
5513
5514
5515
5516
5517
5518
5519
5520
5521
5522
  // FIXED_OVERHEAD plus the deep size of the objects those references point
  // at (atomics, maps, locks, the MVCC, write state, etc.). Per-Store sizes
  // are added separately in heapSize().
  public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
      ClassSize.OBJECT + // closeLock
      (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
      (3 * ClassSize.ATOMIC_LONG) +
      (2 * ClassSize.CONCURRENT_HASHMAP) +
      WriteState.HEAP_SIZE +
      ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY +
      (2 * ClassSize.REENTRANT_LOCK) +
      ClassSize.ARRAYLIST +
      MultiVersionConsistencyControl.FIXED_SIZE
      + ClassSize.TREEMAP
      + 2 * ClassSize.ATOMIC_INTEGER
      ;
5536
5537 @Override
5538 public long heapSize() {
5539 long heapSize = DEEP_OVERHEAD;
5540 for (Store store : this.stores.values()) {
5541 heapSize += store.heapSize();
5542 }
5543
5544 return heapSize;
5545 }
5546
5547
5548
5549
5550
5551 private static void printUsageAndExit(final String message) {
5552 if (message != null && message.length() > 0) System.out.println(message);
5553 System.out.println("Usage: HRegion CATLALOG_TABLE_DIR [major_compact]");
5554 System.out.println("Options:");
5555 System.out.println(" major_compact Pass this option to major compact " +
5556 "passed region.");
5557 System.out.println("Default outputs scan of passed region.");
5558 System.exit(1);
5559 }
5560
5561
5562
5563
5564
5565
5566
5567
5568
5569
5570
5571
5572
5573
5574
5575
5576
5577 public boolean registerService(Service instance) {
5578
5579
5580
5581 Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
5582 if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
5583 LOG.error("Coprocessor service "+serviceDesc.getFullName()+
5584 " already registered, rejecting request from "+instance
5585 );
5586 return false;
5587 }
5588
5589 coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
5590 if (LOG.isDebugEnabled()) {
5591 LOG.debug("Registered coprocessor service: region="+
5592 Bytes.toStringBinary(getRegionName())+" service="+serviceDesc.getFullName());
5593 }
5594 return true;
5595 }
5596
5597
5598
5599
5600
5601
5602
5603
5604
5605
5606
5607
5608
5609
5610
  /**
   * Execute a coprocessor endpoint call against a registered service.
   *
   * @param controller RPC controller passed through to the service method
   * @param call the service name, method name and serialized request
   * @return the service method's response message
   * @throws IOException when no such service/method is registered, or the
   *         coprocessor hooks fail
   */
  public Message execService(RpcController controller, CoprocessorServiceCall call)
      throws IOException {
    String serviceName = call.getServiceName();
    String methodName = call.getMethodName();
    if (!coprocessorServiceHandlers.containsKey(serviceName)) {
      throw new UnknownProtocolException(null,
          "No registered coprocessor service found for name "+serviceName+
          " in region "+Bytes.toStringBinary(getRegionName()));
    }

    Service service = coprocessorServiceHandlers.get(serviceName);
    Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
    Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
    if (methodDesc == null) {
      throw new UnknownProtocolException(service.getClass(),
          "Unknown method "+methodName+" called on service "+serviceName+
          " in region "+Bytes.toStringBinary(getRegionName()));
    }

    // Deserialize the request bytes into the method's request prototype.
    Message request = service.getRequestPrototype(methodDesc).newBuilderForType()
        .mergeFrom(call.getRequest()).build();

    if (coprocessorHost != null) {
      // Hook may rewrite the request before invocation.
      request = coprocessorHost.preEndpointInvocation(service, methodName, request);
    }

    // The callback merges the (possibly null) response into the builder;
    // callMethod may invoke it synchronously or not at all on error.
    final Message.Builder responseBuilder =
        service.getResponsePrototype(methodDesc).newBuilderForType();
    service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
      @Override
      public void run(Message message) {
        if (message != null) {
          responseBuilder.mergeFrom(message);
        }
      }
    });

    if (coprocessorHost != null) {
      // Hook may amend the response builder after invocation.
      coprocessorHost.postEndpointInvocation(service, methodName, request, responseBuilder);
    }

    return responseBuilder.build();
  }
5654
5655
5656
5657
5658
5659
5660
5661
5662
5663
5664
5665 private static void processTable(final FileSystem fs, final Path p,
5666 final HLog log, final Configuration c,
5667 final boolean majorCompact)
5668 throws IOException {
5669 HRegion region = null;
5670
5671 if (FSUtils.getTableName(p).equals(TableName.META_TABLE_NAME)) {
5672 region = HRegion.newHRegion(p, log, fs, c,
5673 HRegionInfo.FIRST_META_REGIONINFO, HTableDescriptor.META_TABLEDESC, null);
5674 } else {
5675 throw new IOException("Not a known catalog table: " + p.toString());
5676 }
5677 try {
5678 region.initialize();
5679 if (majorCompact) {
5680 region.compactStores(true);
5681 } else {
5682
5683 Scan scan = new Scan();
5684
5685 RegionScanner scanner = region.getScanner(scan);
5686 try {
5687 List<Cell> kvs = new ArrayList<Cell>();
5688 boolean done;
5689 do {
5690 kvs.clear();
5691 done = scanner.next(kvs);
5692 if (kvs.size() > 0) LOG.info(kvs);
5693 } while (done);
5694 } finally {
5695 scanner.close();
5696 }
5697 }
5698 } finally {
5699 region.close();
5700 }
5701 }
5702
5703 boolean shouldForceSplit() {
5704 return this.splitRequest;
5705 }
5706
5707 byte[] getExplicitSplitPoint() {
5708 return this.explicitSplitPoint;
5709 }
5710
5711 void forceSplit(byte[] sp) {
5712
5713
5714 this.splitRequest = true;
5715 if (sp != null) {
5716 this.explicitSplitPoint = sp;
5717 }
5718 }
5719
5720 void clearSplit_TESTS_ONLY() {
5721 this.splitRequest = false;
5722 }
5723
5724
5725
5726
  /**
   * Hook called before the region is split; subclasses may override to do
   * preparatory work. The base implementation is intentionally a no-op.
   */
  protected void prepareToSplit() {
    // nothing to do by default
  }
5730
5731
5732
5733
5734
5735
5736
5737 public byte[] checkSplit() {
5738
5739 if (this.getRegionInfo().isMetaTable() ||
5740 TableName.NAMESPACE_TABLE_NAME.equals(this.getRegionInfo().getTable())) {
5741 if (shouldForceSplit()) {
5742 LOG.warn("Cannot split meta region in HBase 0.20 and above");
5743 }
5744 return null;
5745 }
5746
5747
5748 if (this.isRecovering()) {
5749 LOG.info("Cannot split region " + this.getRegionInfo().getEncodedName() + " in recovery.");
5750 return null;
5751 }
5752
5753 if (!splitPolicy.shouldSplit()) {
5754 return null;
5755 }
5756
5757 byte[] ret = splitPolicy.getSplitPoint();
5758
5759 if (ret != null) {
5760 try {
5761 checkRow(ret, "calculated split");
5762 } catch (IOException e) {
5763 LOG.error("Ignoring invalid split", e);
5764 return null;
5765 }
5766 }
5767 return ret;
5768 }
5769
5770
5771
5772
5773 public int getCompactPriority() {
5774 int count = Integer.MAX_VALUE;
5775 for (Store store : stores.values()) {
5776 count = Math.min(count, store.getCompactPriority());
5777 }
5778 return count;
5779 }
5780
5781
5782
5783
5784
5785
5786 public boolean needsCompaction() {
5787 for (Store store : stores.values()) {
5788 if(store.needsCompaction()) {
5789 return true;
5790 }
5791 }
5792 return false;
5793 }
5794
5795
5796 public RegionCoprocessorHost getCoprocessorHost() {
5797 return coprocessorHost;
5798 }
5799
5800
5801 public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {
5802 this.coprocessorHost = coprocessorHost;
5803 }
5804
5805
5806
5807
5808
5809
5810
5811
5812 public void startRegionOperation() throws IOException {
5813 startRegionOperation(Operation.ANY);
5814 }
5815
5816
5817
5818
5819
  /**
   * Begin a region operation of the given type: enforce recovery restrictions,
   * check closing/closed state, and take the close-guard read lock. Must be
   * paired with {@link #closeRegionOperation(Operation)} unless this method
   * throws (in which case the lock has already been released).
   *
   * @param op the operation being started
   * @throws RegionInRecoveryException when the operation is not allowed while
   *         the region replays edits
   * @throws NotServingRegionException when the region is closing or closed
   */
  protected void startRegionOperation(Operation op) throws IOException {
    switch (op) {
    case INCREMENT:
    case APPEND:
    case GET:
    case SCAN:
    case SPLIT_REGION:
    case MERGE_REGION:
    case PUT:
    case DELETE:
    case BATCH_MUTATE:
    case COMPACT_REGION:
      // While recovering, reads/splits/merges are rejected; writes too when
      // disallowWritesInRecovering is set.
      if (this.isRecovering() && (this.disallowWritesInRecovering ||
              (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) {
        throw new RegionInRecoveryException(this.getRegionNameAsString() + " is recovering");
      }
      break;
    default:
      break;
    }
    if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION
        || op == Operation.COMPACT_REGION) {
      // Split, merge and compact do not take the close guard here.
      return;
    }
    if (this.closing.get()) {
      throw new NotServingRegionException(getRegionNameAsString() + " is closing");
    }
    lock(lock.readLock());
    if (this.closed.get()) {
      // Region closed between the check above and acquiring the lock: undo.
      lock.readLock().unlock();
      throw new NotServingRegionException(getRegionNameAsString() + " is closed");
    }
    try {
      if (coprocessorHost != null) {
        coprocessorHost.postStartRegionOperation(op);
      }
    } catch (Exception e) {
      // Release the lock the caller will never get to release.
      lock.readLock().unlock();
      throw new IOException(e);
    }
  }
5864
5865
5866
5867
5868
5869
5870 public void closeRegionOperation() throws IOException {
5871 closeRegionOperation(Operation.ANY);
5872 }
5873
5874
5875
5876
5877
5878
5879
5880 public void closeRegionOperation(Operation operation) throws IOException {
5881 lock.readLock().unlock();
5882 if (coprocessorHost != null) {
5883 coprocessorHost.postCloseRegionOperation(operation);
5884 }
5885 }
5886
5887
5888
5889
5890
5891
5892
5893
5894
5895
5896 private void startBulkRegionOperation(boolean writeLockNeeded)
5897 throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
5898 if (this.closing.get()) {
5899 throw new NotServingRegionException(getRegionNameAsString() + " is closing");
5900 }
5901 if (writeLockNeeded) lock(lock.writeLock());
5902 else lock(lock.readLock());
5903 if (this.closed.get()) {
5904 if (writeLockNeeded) lock.writeLock().unlock();
5905 else lock.readLock().unlock();
5906 throw new NotServingRegionException(getRegionNameAsString() + " is closed");
5907 }
5908 }
5909
5910
5911
5912
5913
5914 private void closeBulkRegionOperation(){
5915 if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
5916 else lock.readLock().unlock();
5917 }
5918
5919
5920
5921
5922
5923 private void recordMutationWithoutWal(final Map<byte [], List<Cell>> familyMap) {
5924 numMutationsWithoutWAL.increment();
5925 if (numMutationsWithoutWAL.get() <= 1) {
5926 LOG.info("writing data to region " + this +
5927 " with WAL disabled. Data may be lost in the event of a crash.");
5928 }
5929
5930 long mutationSize = 0;
5931 for (List<Cell> cells: familyMap.values()) {
5932 for (Cell cell : cells) {
5933 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
5934 mutationSize += kv.getKeyLength() + kv.getValueLength();
5935 }
5936 }
5937
5938 dataInMemoryWithoutWAL.add(mutationSize);
5939 }
5940
5941 private void lock(final Lock lock)
5942 throws RegionTooBusyException, InterruptedIOException {
5943 lock(lock, 1);
5944 }
5945
5946
5947
5948
5949
5950
5951 private void lock(final Lock lock, final int multiplier)
5952 throws RegionTooBusyException, InterruptedIOException {
5953 try {
5954 final long waitTime = Math.min(maxBusyWaitDuration,
5955 busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
5956 if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
5957 throw new RegionTooBusyException(
5958 "failed to get a lock in " + waitTime + " ms. " +
5959 "regionName=" + (this.getRegionInfo() == null ? "unknown" :
5960 this.getRegionInfo().getRegionNameAsString()) +
5961 ", server=" + (this.getRegionServerServices() == null ? "unknown" :
5962 this.getRegionServerServices().getServerName()));
5963 }
5964 } catch (InterruptedException ie) {
5965 LOG.info("Interrupted while waiting for a lock");
5966 InterruptedIOException iie = new InterruptedIOException();
5967 iie.initCause(ie);
5968 throw iie;
5969 }
5970 }
5971
5972
5973
5974
5975
5976
5977
  /**
   * Sync the WAL up to the given transaction id according to the effective
   * durability. Meta regions always sync regardless of durability.
   *
   * @param txid WAL transaction id to sync through
   * @param durability the durability requested for this mutation
   */
  private void syncOrDefer(long txid, Durability durability) throws IOException {
    if (this.getRegionInfo().isMetaRegion()) {
      this.log.sync(txid);
    } else {
      switch(durability) {
      case USE_DEFAULT:
        // Fall back to the region-level durability (see shouldSyncLog()).
        if (shouldSyncLog()) {
          this.log.sync(txid);
        }
        break;
      case SKIP_WAL:
        // Nothing to sync: the edit was never written to the WAL.
        break;
      case ASYNC_WAL:
        // Deliberately deferred: the WAL syncs on its own schedule.
        break;
      case SYNC_WAL:
      case FSYNC_WAL:
        // Sync now; SYNC and FSYNC are handled identically here.
        this.log.sync(txid);
        break;
      }
    }
  }
6003
6004
6005
6006
6007 private boolean shouldSyncLog() {
6008 return durability.ordinal() > Durability.ASYNC_WAL.ordinal();
6009 }
6010
6011
6012
6013
  /**
   * A write-discarding List: add/addAll silently drop elements, size() is
   * always 0, and get() is unsupported. Useful where an API requires a
   * non-null output list but the collected cells are not needed.
   */
  private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {

    @Override
    public void add(int index, Cell element) {
      // no-op: elements are intentionally discarded
    }

    @Override
    public boolean addAll(int index, Collection<? extends Cell> c) {
      return false; // nothing is ever stored
    }

    @Override
    public KeyValue get(int index) {
      throw new UnsupportedOperationException();
    }

    @Override
    public int size() {
      return 0;
    }
  };
6036
6037
6038
6039
6040
6041
6042
6043
6044
6045
6046
6047 public static void main(String[] args) throws IOException {
6048 if (args.length < 1) {
6049 printUsageAndExit(null);
6050 }
6051 boolean majorCompact = false;
6052 if (args.length > 1) {
6053 if (!args[1].toLowerCase().startsWith("major")) {
6054 printUsageAndExit("ERROR: Unrecognized option <" + args[1] + ">");
6055 }
6056 majorCompact = true;
6057 }
6058 final Path tableDir = new Path(args[0]);
6059 final Configuration c = HBaseConfiguration.create();
6060 final FileSystem fs = FileSystem.get(c);
6061 final Path logdir = new Path(c.get("hbase.tmp.dir"));
6062 final String logname = "hlog" + FSUtils.getTableName(tableDir) + System.currentTimeMillis();
6063
6064 final HLog log = HLogFactory.createHLog(fs, logdir, logname, c);
6065 try {
6066 processTable(fs, tableDir, log, c, majorCompact);
6067 } finally {
6068 log.close();
6069
6070 BlockCache bc = new CacheConfig(c).getBlockCache();
6071 if (bc != null) bc.shutdown();
6072 }
6073 }
6074
6075
6076
6077
6078 public long getOpenSeqNum() {
6079 return this.openSeqNum;
6080 }
6081
6082
6083
6084
6085
6086 public Map<byte[], Long> getMaxStoreSeqIdForLogReplay() {
6087 return this.maxSeqIdInStores;
6088 }
6089
6090
6091
6092
6093 public CompactionState getCompactionState() {
6094 boolean hasMajor = majorInProgress.get() > 0, hasMinor = minorInProgress.get() > 0;
6095 return (hasMajor ? (hasMinor ? CompactionState.MAJOR_AND_MINOR : CompactionState.MAJOR)
6096 : (hasMinor ? CompactionState.MINOR : CompactionState.NONE));
6097 }
6098
6099 public void reportCompactionRequestStart(boolean isMajor){
6100 (isMajor ? majorInProgress : minorInProgress).incrementAndGet();
6101 }
6102
6103 public void reportCompactionRequestEnd(boolean isMajor, int numFiles, long filesSizeCompacted){
6104 int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();
6105
6106
6107 compactionsFinished.incrementAndGet();
6108 compactionNumFilesCompacted.addAndGet(numFiles);
6109 compactionNumBytesCompacted.addAndGet(filesSizeCompacted);
6110
6111 assert newValue >= 0;
6112 }
6113
6114
6115
6116
6117 public AtomicLong getSequenceId() {
6118 return this.sequenceId;
6119 }
6120
6121
6122
6123
6124
6125 private void setSequenceId(long value) {
6126 this.sequenceId.set(value);
6127 }
6128
6129
6130
6131
6132
6133
  /**
   * Callbacks invoked around the three stages of bulk loading a store file:
   * preparation, successful completion, and failure.
   */
  public interface BulkLoadListener {

    /**
     * Called before the bulk load of a file for the given family begins.
     *
     * @param family the target column family
     * @param srcPath the source path of the file to load
     * @return the (possibly rewritten) path the load should use
     * @throws IOException if preparation fails
     */
    String prepareBulkLoad(byte[] family, String srcPath) throws IOException;

    /**
     * Called after the file has been successfully bulk loaded.
     *
     * @param family the target column family
     * @param srcPath the source path that was loaded
     * @throws IOException if post-load cleanup fails
     */
    void doneBulkLoad(byte[] family, String srcPath) throws IOException;

    /**
     * Called when the bulk load of the file failed.
     *
     * @param family the target column family
     * @param srcPath the source path whose load failed
     * @throws IOException if failure handling itself fails
     */
    void failedBulkLoad(byte[] family, String srcPath) throws IOException;
  }
6161
  /**
   * Per-row lock bookkeeping: tracks the owning thread, a reentrancy count,
   * and a latch that other threads can wait on until the row is released.
   * An instance lives in {@code lockedRows} while the row is held.
   */
  @VisibleForTesting class RowLockContext {
    private final HashedBytes row;
    // Counted down exactly once, when the last lock on this row is released.
    private final CountDownLatch latch = new CountDownLatch(1);
    // Thread that created (and therefore owns) this context.
    private final Thread thread;
    // Reentrancy count: number of outstanding RowLocks handed out.
    private int lockCount = 0;

    RowLockContext(HashedBytes row) {
      this.row = row;
      this.thread = Thread.currentThread();
    }

    boolean ownedByCurrentThread() {
      return thread == Thread.currentThread();
    }

    RowLock newLock() {
      lockCount++;
      return new RowLock(this);
    }

    void releaseLock() {
      // Only the owning thread may release; anything else is a caller bug.
      if (!ownedByCurrentThread()) {
        throw new IllegalArgumentException("Lock held by thread: " + thread
          + " cannot be released by different thread: " + Thread.currentThread());
      }
      lockCount--;
      if (lockCount == 0) {
        // Last release: unregister the row and wake any waiters.
        RowLockContext existingContext = lockedRows.remove(row);
        if (existingContext != this) {
          throw new RuntimeException(
              "Internal row lock state inconsistent, should not happen, row: " + row);
        }
        latch.countDown();
      }
    }
  }
6199
6200
6201
6202
6203
6204
6205 public static class RowLock {
6206 @VisibleForTesting final RowLockContext context;
6207 private boolean released = false;
6208
6209 @VisibleForTesting RowLock(RowLockContext context) {
6210 this.context = context;
6211 }
6212
6213
6214
6215
6216
6217
6218 public void release() {
6219 if (!released) {
6220 context.releaseLock();
6221 released = true;
6222 }
6223 }
6224 }
6225
6226
6227
6228
6229
6230
6231 public void updatesLock() throws RegionTooBusyException, InterruptedIOException {
6232 lock(updatesLock.readLock());
6233 }
6234
6235
6236
6237
6238
6239 public void updatesUnlock() throws InterruptedIOException {
6240 updatesLock.readLock().unlock();
6241 }
6242 }