/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.DrainBarrier;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.util.StringUtils;
import org.cloudera.htrace.Trace;
import org.cloudera.htrace.TraceScope;

import com.google.common.annotations.VisibleForTesting;
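/**
 * Implementation of {@link HLog} that writes edits to files on a Hadoop
 * {@link FileSystem}. Appends are queued as pending writes and then pushed
 * through a three-stage pipeline of daemon threads: an {@code AsyncWriter}
 * drains the pending writes into the current log file, a pool of
 * {@code AsyncSyncer} threads syncs the underlying stream, and an
 * {@code AsyncNotifier} wakes up callers blocked in {@link #sync(long)} once
 * their transaction id has been made durable.
 *
 * <p>The log is rolled when it grows past the configured roll size or when the
 * HDFS pipeline falls below the tolerable replication; rolled files whose
 * regions have all been flushed are archived to the old-log directory.
 *
 * <p>A minimal usage sketch (illustrative only; the filesystem, root dir and
 * log dir name are assumptions of the example):
 * <pre>
 *   Configuration conf = HBaseConfiguration.create();
 *   FileSystem fs = FileSystem.get(conf);
 *   HLog wal = new FSHLog(fs, rootDir, ".logs", conf);
 *   // append edits via wal.appendNoSync(...) / wal.sync(txid), then
 *   wal.close();
 * </pre>
 */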
@InterfaceAudience.Private
class FSHLog implements HLog, Syncable {
  static final Log LOG = LogFactory.getLog(FSHLog.class);

  private final FileSystem fs;
  private final Path rootDir;
  private final Path dir;
  private final Configuration conf;

  private List<WALActionsListener> listeners =
    new CopyOnWriteArrayList<WALActionsListener>();
  private final long blocksize;
  private final String prefix;

  // Highest transaction id handed out by append() and highest id known to be
  // durable on the filesystem, respectively.
  private final AtomicLong unflushedEntries = new AtomicLong(0);
  private final AtomicLong syncedTillHere = new AtomicLong(0);
  private long lastUnSyncedTxid;
  private final Path oldLogDir;

  // Set by the async threads when a write or sync fails; syncer() rethrows
  // asyncIOE for any transaction at or below failedTxid.
  private final AtomicLong failedTxid = new AtomicLong(0);
  private volatile IOException asyncIOE = null;

  private WALCoprocessorHost coprocessorHost;

  private FSDataOutputStream hdfs_out;

  private int minTolerableReplication;
  private Method getNumCurrentReplicas;
  final static Object [] NO_ARGS = new Object []{};

  // Lets close() wait for in-flight appends, flushes and rolls to drain.
  private DrainBarrier closeBarrier = new DrainBarrier();

  Writer writer;

  private final Object oldestSeqNumsLock = new Object();

  private final ReentrantLock rollWriterLock = new ReentrantLock(true);

  // Per-region oldest sequence id that has not yet been flushed to a store file.
  private final ConcurrentSkipListMap<byte [], Long> oldestUnflushedSeqNums =
    new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);

  // Per-region oldest sequence id of a flush that is currently in progress.
  private final Map<byte[], Long> oldestFlushingSeqNums =
    new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);

  private volatile boolean closed = false;

  private boolean forMeta = false;

  // File number (creation timestamp) of the current WAL file.
  private volatile long filenum = -1;

  // Number of entries written to the current WAL file.
  private final AtomicInteger numEntries = new AtomicInteger(0);

  // Consecutive rolls triggered by low replication, and the limit after which
  // low-replication rolling is disabled.
  private AtomicInteger consecutiveLogRolls = new AtomicInteger(0);
  private final int lowReplicationRollLimit;

  private volatile boolean lowReplicationRollEnabled = true;

  // Roll the log once its size grows past this.
  private final long logrollsize;

  private long curLogSize = 0;

  // Combined size of all rolled, not-yet-archived WAL files.
  private AtomicLong totalLogSize = new AtomicLong(0);

  // updateLock serializes appends and log rolls; pendingWritesLock guards the
  // pendingWrites buffer shared with the AsyncWriter thread.
  private final Object updateLock = new Object();
  private final Object pendingWritesLock = new Object();

  private final boolean enabled;

  // Force a flush of the oldest regions once more than this many rolled logs exist.
  private final int maxLogs;

  // Edits queued by append() and not yet handed to the writer.
  private List<Entry> pendingWrites = new LinkedList<Entry>();
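  // The append/sync path is a three-stage pipeline of daemon threads: the
  // AsyncWriter drains pendingWrites into the current writer, the AsyncSyncer
  // threads (count configured by "hbase.hlog.asyncer.number") sync the stream,
  // and the AsyncNotifier wakes up callers blocked in syncer().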
  private final AsyncWriter asyncWriter;

  private final AsyncSyncer[] asyncSyncers;
  private final AsyncNotifier asyncNotifier;

  private final int closeErrorsTolerated;

  private final AtomicInteger closeErrorCount = new AtomicInteger();
  private final MetricsWAL metrics;

  // Region sequence ids written to the current WAL file; moved into
  // hlogSequenceNums when the file is rolled.
  private Map<byte[], Long> latestSequenceNums = new HashMap<byte[], Long>();

  // Orders WAL paths by the file number encoded in their names.
  public final Comparator<Path> LOG_NAME_COMPARATOR = new Comparator<Path>() {
    @Override
    public int compare(Path o1, Path o2) {
      long t1 = getFileNumFromFileName(o1);
      long t2 = getFileNumFromFileName(o2);
      if (t1 == t2) return 0;
      return (t1 > t2) ? 1 : -1;
    }
  };

  // Rolled WAL files mapped to the region sequence ids they contain; used to
  // decide which files can be archived and which regions must be flushed.
  private NavigableMap<Path, Map<byte[], Long>> hlogSequenceNums =
    new ConcurrentSkipListMap<Path, Map<byte[], Long>>(LOG_NAME_COMPARATOR);
  /** Constructor. Uses the default old-log directory name. */
  public FSHLog(final FileSystem fs, final Path root, final String logDir,
      final Configuration conf)
  throws IOException {
    this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME,
        conf, null, true, null, false);
  }

  /** Constructor. */
  public FSHLog(final FileSystem fs, final Path root, final String logDir,
      final String oldLogDir, final Configuration conf)
  throws IOException {
    this(fs, root, logDir, oldLogDir,
        conf, null, true, null, false);
  }

  /** Constructor. Registers the given listeners and uses the given file name prefix. */
  public FSHLog(final FileSystem fs, final Path root, final String logDir,
      final Configuration conf, final List<WALActionsListener> listeners,
      final String prefix) throws IOException {
    this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME,
        conf, listeners, true, prefix, false);
  }
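  /**
   * Creates the WAL directories if needed, rolls the first log file and starts
   * the AsyncWriter/AsyncSyncer/AsyncNotifier threads.
   *
   * @param fs filesystem handle
   * @param root path to where logs and old logs live
   * @param logDir directory where WAL files are stored
   * @param oldLogDir directory where rolled WAL files are archived
   * @param conf configuration to use
   * @param listeners listeners on WAL events
   * @param failIfLogDirExists if true, throw if the log directory already exists
   * @param prefix file name prefix for WAL files ("hlog" if null or empty)
   * @param forMeta true if this WAL carries edits for the meta region
   * @throws IOException if the directories cannot be created or the first log
   *   cannot be rolled
   */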
  public FSHLog(final FileSystem fs, final Path root, final String logDir,
      final String oldLogDir, final Configuration conf,
      final List<WALActionsListener> listeners,
      final boolean failIfLogDirExists, final String prefix, boolean forMeta)
  throws IOException {
    super();
    this.fs = fs;
    this.rootDir = root;
    this.dir = new Path(this.rootDir, logDir);
    this.oldLogDir = new Path(this.rootDir, oldLogDir);
    this.forMeta = forMeta;
    this.conf = conf;

    if (listeners != null) {
      for (WALActionsListener i: listeners) {
        registerWALActionsListener(i);
      }
    }

    this.blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
        FSUtils.getDefaultBlockSize(this.fs, this.dir));
    // Roll the log at a fraction (default 0.95) of the block size.
    float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
    this.logrollsize = (long)(this.blocksize * multi);

    this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
    this.minTolerableReplication = conf.getInt(
        "hbase.regionserver.hlog.tolerable.lowreplication",
        FSUtils.getDefaultReplication(fs, this.dir));
    this.lowReplicationRollLimit = conf.getInt(
        "hbase.regionserver.hlog.lowreplication.rolllimit", 5);
    this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
    this.closeErrorsTolerated = conf.getInt(
        "hbase.regionserver.logroll.errors.tolerated", 0);

    LOG.info("WAL/HLog configuration: blocksize=" +
        StringUtils.byteDesc(this.blocksize) +
        ", rollsize=" + StringUtils.byteDesc(this.logrollsize) +
        ", enabled=" + this.enabled);

    this.prefix = prefix == null || prefix.isEmpty() ?
        "hlog" : URLEncoder.encode(prefix, "UTF8");

    boolean dirExists = false;
    if (failIfLogDirExists && (dirExists = this.fs.exists(dir))) {
      throw new IOException("Target HLog directory already exists: " + dir);
    }
    if (!dirExists && !fs.mkdirs(dir)) {
      throw new IOException("Unable to mkdir " + dir);
    }

    if (!fs.exists(this.oldLogDir)) {
      if (!fs.mkdirs(this.oldLogDir)) {
        throw new IOException("Unable to mkdir " + this.oldLogDir);
      }
    }
    // Roll the first log file so writer, hdfs_out and filenum are initialized.
    rollWriter();

    // HDFS-826 reflection handle; null if the running HDFS doesn't support it.
    this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);

    final String n = Thread.currentThread().getName();

    // Start the write -> sync -> notify pipeline threads.
    asyncWriter = new AsyncWriter(n + "-WAL.AsyncWriter");
    asyncWriter.start();

    int syncerNums = conf.getInt("hbase.hlog.asyncer.number", 5);
    asyncSyncers = new AsyncSyncer[syncerNums];
    for (int i = 0; i < asyncSyncers.length; ++i) {
      asyncSyncers[i] = new AsyncSyncer(n + "-WAL.AsyncSyncer" + i);
      asyncSyncers[i].start();
    }

    asyncNotifier = new AsyncNotifier(n + "-WAL.AsyncNotifier");
    asyncNotifier.start();

    coprocessorHost = new WALCoprocessorHost(this, conf);

    this.metrics = new MetricsWAL();
  }
  /**
   * Finds the 'getNumCurrentReplicas' method on the wrapped HDFS output stream
   * via reflection (HDFS-826); returns null if it is not available.
   */
  private Method getGetNumCurrentReplicas(final FSDataOutputStream os) {
    Method m = null;
    if (os != null) {
      Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream()
          .getClass();
      try {
        m = wrappedStreamClass.getDeclaredMethod("getNumCurrentReplicas",
            new Class<?>[] {});
        m.setAccessible(true);
      } catch (NoSuchMethodException e) {
        LOG.info("FileSystem's output stream doesn't support"
            + " getNumCurrentReplicas; --HDFS-826 not available; fsOut="
            + wrappedStreamClass.getName());
      } catch (SecurityException e) {
        LOG.info("Doesn't have access to getNumCurrentReplicas on "
            + "FileSystem's output stream --HDFS-826 not available; fsOut="
            + wrappedStreamClass.getName(), e);
        m = null;
      }
    }
    if (m != null) {
      if (LOG.isTraceEnabled()) LOG.trace("Using getNumCurrentReplicas--HDFS-826");
    }
    return m;
  }

  @Override
  public void registerWALActionsListener(final WALActionsListener listener) {
    this.listeners.add(listener);
  }

  @Override
  public boolean unregisterWALActionsListener(final WALActionsListener listener) {
    return this.listeners.remove(listener);
  }

  @Override
  public long getFilenum() {
    return this.filenum;
  }

  /** @return the wrapped output stream of the current WAL file */
  OutputStream getOutputStream() {
    return this.hdfs_out.getWrappedStream();
  }

  @Override
  public byte [][] rollWriter() throws FailedLogCloseException, IOException {
    return rollWriter(false);
  }
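  /**
   * Rolls the current writer: closes the current WAL file (after syncing any
   * outstanding edits) and opens a new one. If too many rolled logs are
   * retained afterwards, returns the encoded names of regions that should be
   * flushed so older files can be archived; otherwise returns null.
   *
   * @param force roll even if the current file has no entries
   */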
  @Override
  public byte [][] rollWriter(boolean force)
  throws FailedLogCloseException, IOException {
    rollWriterLock.lock();
    try {
      // Return if nothing to flush.
      if (!force && this.writer != null && this.numEntries.get() <= 0) {
        return null;
      }
      byte [][] regionsToFlush = null;
      if (closed) {
        LOG.debug("HLog closed. Skipping rolling of writer");
        return null;
      }
      try {
        if (!closeBarrier.beginOp()) {
          LOG.debug("HLog closing. Skipping rolling of writer");
          return regionsToFlush;
        }
        // Pick a new file number (current time), avoiding collisions with existing files.
        long currentFilenum = this.filenum;
        Path oldPath = null;
        if (currentFilenum > 0) {
          oldPath = computeFilename(currentFilenum);
        }
        this.filenum = System.currentTimeMillis();
        Path newPath = computeFilename();
        while (fs.exists(newPath)) {
          this.filenum++;
          newPath = computeFilename();
        }

        // Tell our listeners that a new log is about to be created.
        if (!this.listeners.isEmpty()) {
          for (WALActionsListener i : this.listeners) {
            i.preLogRoll(oldPath, newPath);
          }
        }
        FSHLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);

        // Get at the dfsclient output stream of the new writer if we can.
        FSDataOutputStream nextHdfsOut = null;
        if (nextWriter instanceof ProtobufLogWriter) {
          nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream();
          // Do a sync up front so the costly pipeline setup happens before we swap writers.
          try {
            nextWriter.sync();
          } catch (IOException e) {
            // Optimization failed; no need to abort here.
            LOG.warn("pre-sync failed", e);
          }
        }

        Path oldFile = null;
        int oldNumEntries = 0;
        synchronized (updateLock) {
          // Swap in the new writer while appends are blocked.
          oldNumEntries = this.numEntries.get();
          oldFile = cleanupCurrentWriter(currentFilenum);
          this.writer = nextWriter;
          this.hdfs_out = nextHdfsOut;
          this.numEntries.set(0);
          if (oldFile != null) {
            this.hlogSequenceNums.put(oldFile, this.latestSequenceNums);
            this.latestSequenceNums = new HashMap<byte[], Long>();
          }
        }
        if (oldFile == null) LOG.info("New WAL " + FSUtils.getPath(newPath));
        else {
          long oldFileLen = this.fs.getFileStatus(oldFile).getLen();
          this.totalLogSize.addAndGet(oldFileLen);
          LOG.info("Rolled WAL " + FSUtils.getPath(oldFile) + " with entries="
              + oldNumEntries + ", filesize="
              + StringUtils.humanReadableInt(oldFileLen) + "; new WAL "
              + FSUtils.getPath(newPath));
        }

        // Tell our listeners that a new log was created.
        if (!this.listeners.isEmpty()) {
          for (WALActionsListener i : this.listeners) {
            i.postLogRoll(oldPath, newPath);
          }
        }

        // Archive old logs and find regions that need flushing to free up more.
        if (getNumRolledLogFiles() > 0) {
          cleanOldLogs();
          regionsToFlush = findRegionsToForceFlush();
        }
      } finally {
        closeBarrier.endOp();
      }
      return regionsToFlush;
    } finally {
      rollWriterLock.unlock();
    }
  }
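  /**
   * Creates the writer for the new WAL file. Protected so that subclasses and
   * tests can supply a different writer implementation.
   */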
  protected Writer createWriterInstance(final FileSystem fs, final Path path,
      final Configuration conf) throws IOException {
    if (forMeta) {
      // No special writer configuration for meta WALs yet.
    }
    return HLogFactory.createWALWriter(fs, path, conf);
  }
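  /**
   * Archives rolled WAL files that are no longer needed: a file can go to the
   * old-log directory once every region it contains edits for has flushed (or
   * is flushing) past the file's highest sequence id.
   */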
  private void cleanOldLogs() throws IOException {
    Map<byte[], Long> oldestFlushingSeqNumsLocal = null;
    Map<byte[], Long> oldestUnflushedSeqNumsLocal = null;
    List<Path> logsToArchive = new ArrayList<Path>();
    // Take a snapshot of the region sequence id maps so we can iterate without the lock.
    synchronized (oldestSeqNumsLock) {
      oldestFlushingSeqNumsLocal = new HashMap<byte[], Long>(this.oldestFlushingSeqNums);
      oldestUnflushedSeqNumsLocal = new HashMap<byte[], Long>(this.oldestUnflushedSeqNums);
    }
    for (Map.Entry<Path, Map<byte[], Long>> e : hlogSequenceNums.entrySet()) {
      Path log = e.getKey();
      Map<byte[], Long> sequenceNums = e.getValue();
      if (areAllRegionsFlushed(sequenceNums, oldestFlushingSeqNumsLocal,
          oldestUnflushedSeqNumsLocal)) {
        logsToArchive.add(log);
        LOG.debug("Log file is ready for archiving: " + log);
      }
    }
    for (Path p : logsToArchive) {
      this.totalLogSize.addAndGet(-this.fs.getFileStatus(p).getLen());
      archiveLogFile(p);
      this.hlogSequenceNums.remove(p);
    }
  }
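  /**
   * @param sequenceNums region to highest-sequence-id map for one rolled WAL file
   * @param oldestFlushingMap region to oldest sequence id of an in-progress flush
   * @param oldestUnflushedMap region to oldest sequence id not yet flushed
   * @return true if every region covered by the WAL file has already flushed
   *   (or is flushing) past the edits in that file, i.e. the file can be archived
   */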
  static boolean areAllRegionsFlushed(Map<byte[], Long> sequenceNums,
      Map<byte[], Long> oldestFlushingMap, Map<byte[], Long> oldestUnflushedMap) {
    for (Map.Entry<byte[], Long> regionSeqIdEntry : sequenceNums.entrySet()) {
      // Default to MAX_VALUE when a region has no flushing/unflushed edits recorded.
      long oldestFlushing = oldestFlushingMap.containsKey(regionSeqIdEntry.getKey()) ?
          oldestFlushingMap.get(regionSeqIdEntry.getKey()) : Long.MAX_VALUE;
      long oldestUnFlushed = oldestUnflushedMap.containsKey(regionSeqIdEntry.getKey()) ?
          oldestUnflushedMap.get(regionSeqIdEntry.getKey()) : Long.MAX_VALUE;
      // The file still holds needed edits if the region's oldest un-persisted edit
      // is at or below the highest sequence id written to the file.
      long minSeqNum = Math.min(oldestFlushing, oldestUnFlushed);
      if (minSeqNum <= regionSeqIdEntry.getValue()) return false;
    }
    return true;
  }

  /**
   * Returns the encoded names of regions in the given WAL's sequence id map that
   * still have unflushed edits, or null if all of them have been flushed.
   */
  private byte[][] findEligibleMemstoresToFlush(Map<byte[], Long> regionsSequenceNums) {
    List<byte[]> regionsToFlush = null;
    // Synchronize against cache flush start/completion updating these maps.
    synchronized (oldestSeqNumsLock) {
      for (Map.Entry<byte[], Long> e : regionsSequenceNums.entrySet()) {
        Long unFlushedVal = this.oldestUnflushedSeqNums.get(e.getKey());
        if (unFlushedVal != null && unFlushedVal <= e.getValue()) {
          if (regionsToFlush == null) regionsToFlush = new ArrayList<byte[]>();
          regionsToFlush.add(e.getKey());
        }
      }
    }
    return regionsToFlush == null ? null : regionsToFlush
        .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
  }
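  /**
   * If the number of rolled WAL files exceeds the configured maximum, picks the
   * regions from the oldest WAL file whose memstores still hold unflushed edits;
   * flushing them lets the oldest file be archived on the next roll.
   *
   * @return encoded region names to flush, or null if no flush is needed
   */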
  byte[][] findRegionsToForceFlush() throws IOException {
    byte [][] regions = null;
    int logCount = getNumRolledLogFiles();
    if (logCount > this.maxLogs && logCount > 0) {
      Map.Entry<Path, Map<byte[], Long>> firstWALEntry =
          this.hlogSequenceNums.firstEntry();
      regions = findEligibleMemstoresToFlush(firstWALEntry.getValue());
    }
    if (regions != null) {
      StringBuilder sb = new StringBuilder();
      for (int i = 0; i < regions.length; i++) {
        if (i > 0) sb.append(", ");
        sb.append(Bytes.toStringBinary(regions[i]));
      }
      LOG.info("Too many hlogs: logs=" + logCount + ", maxlogs=" +
          this.maxLogs + "; forcing flush of " + regions.length + " region(s): " +
          sb.toString());
    }
    return regions;
  }
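  /**
   * Syncs any outstanding edits, closes the current writer and returns the path
   * of the file just closed (null if there was no current writer). Close errors
   * are tolerated up to the configured limit as long as nothing is left unsynced.
   */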
  Path cleanupCurrentWriter(final long currentfilenum) throws IOException {
    Path oldFile = null;
    if (this.writer != null) {
      // Close the current writer; the caller swaps in a new one.
      try {
        // Wait till all current transactions are written to the hlog.
        if (this.unflushedEntries.get() != this.syncedTillHere.get()) {
          LOG.debug("cleanupCurrentWriter " +
              " waiting for transactions to get synced " +
              " total " + this.unflushedEntries.get() +
              " synced till here " + this.syncedTillHere.get());
          sync();
        }
        this.writer.close();
        this.writer = null;
        closeErrorCount.set(0);
      } catch (IOException e) {
        LOG.error("Failed close of HLog writer", e);
        int errors = closeErrorCount.incrementAndGet();
        if (errors <= closeErrorsTolerated && !hasUnSyncedEntries()) {
          LOG.warn("Riding over HLog close failure! error count=" + errors);
        } else {
          if (hasUnSyncedEntries()) {
            LOG.error("Aborting due to unflushed edits in HLog");
          }
          // Failed close of log file; means we may be losing edits.
          FailedLogCloseException flce =
              new FailedLogCloseException("#" + currentfilenum);
          flce.initCause(e);
          throw flce;
        }
      }
      if (currentfilenum >= 0) {
        oldFile = computeFilename(currentfilenum);
      }
    }
    return oldFile;
  }

  private void archiveLogFile(final Path p) throws IOException {
    Path newPath = getHLogArchivePath(this.oldLogDir, p);
    // Tell our listeners that a log is going to be archived.
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.preLogArchive(p, newPath);
      }
    }
    if (!FSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
      throw new IOException("Unable to rename " + p + " to " + newPath);
    }
    // Tell our listeners that a log has been archived.
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.postLogArchive(p, newPath);
      }
    }
  }

  /**
   * @return the path of the current WAL file, computed from the current file number
   */
  protected Path computeFilename() {
    return computeFilename(this.filenum);
  }

  /**
   * @param filenum file number (creation timestamp) to use
   * @return the WAL file path for the given file number
   */
  protected Path computeFilename(long filenum) {
    if (filenum < 0) {
      throw new RuntimeException("hlog file number can't be < 0");
    }
    String child = prefix + "." + filenum;
    if (forMeta) {
      child += HLog.META_HLOG_FILE_EXTN;
    }
    return new Path(dir, child);
  }
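  /**
   * Extracts the file number from a WAL path produced by {@link #computeFilename(long)}.
   *
   * @throws IllegalArgumentException if the path is null or does not belong to
   *   this WAL's directory and prefix
   */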
  protected long getFileNumFromFileName(Path fileName) {
    if (fileName == null) throw new IllegalArgumentException("file name can't be null");
    // The path should start with dir/<prefix>.
    String prefixPathStr = new Path(dir, prefix + ".").toString();
    if (!fileName.toString().startsWith(prefixPathStr)) {
      throw new IllegalArgumentException("The log file " + fileName + " doesn't belong to" +
          " this regionserver " + prefixPathStr);
    }
    String chompedPath = fileName.toString().substring(prefixPathStr.length());
    if (forMeta) chompedPath = chompedPath.substring(0, chompedPath.indexOf(META_HLOG_FILE_EXTN));
    return Long.parseLong(chompedPath);
  }

  @Override
  public void closeAndDelete() throws IOException {
    close();
    if (!fs.exists(this.dir)) return;
    FileStatus[] files = fs.listStatus(this.dir);
    if (files != null) {
      for (FileStatus file : files) {
        // Archive every remaining file to the old-log directory before deleting ours.
        Path p = getHLogArchivePath(this.oldLogDir, file.getPath());
        // Tell our listeners that a log is going to be archived.
        if (!this.listeners.isEmpty()) {
          for (WALActionsListener i : this.listeners) {
            i.preLogArchive(file.getPath(), p);
          }
        }
        if (!FSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
          throw new IOException("Unable to rename " + file.getPath() + " to " + p);
        }
        // Tell our listeners that a log was archived.
        if (!this.listeners.isEmpty()) {
          for (WALActionsListener i : this.listeners) {
            i.postLogArchive(file.getPath(), p);
          }
        }
      }
      LOG.debug("Moved " + files.length + " WAL file(s) to " + FSUtils.getPath(this.oldLogDir));
    }
    if (!fs.delete(dir, true)) {
      LOG.info("Unable to delete " + dir);
    }
  }

  @Override
  public void close() throws IOException {
    if (this.closed) {
      return;
    }
    // Shut down the notifier, syncer and writer threads, in that order.
    try {
      asyncNotifier.interrupt();
      asyncNotifier.join();
    } catch (InterruptedException e) {
      LOG.error("Exception while waiting for " + asyncNotifier.getName() +
          " threads to die", e);
    }

    for (int i = 0; i < asyncSyncers.length; ++i) {
      try {
        asyncSyncers[i].interrupt();
        asyncSyncers[i].join();
      } catch (InterruptedException e) {
        LOG.error("Exception while waiting for " + asyncSyncers[i].getName() +
            " threads to die", e);
      }
    }

    try {
      asyncWriter.interrupt();
      asyncWriter.join();
    } catch (InterruptedException e) {
      LOG.error("Exception while waiting for " + asyncWriter.getName() +
          " thread to die", e);
    }

    try {
      // Prevent all further flushing and rolling.
      closeBarrier.stopAndDrainOps();
    } catch (InterruptedException e) {
      LOG.error("Exception while waiting for cache flushes and log rolls", e);
      Thread.currentThread().interrupt();
    }

    // Tell our listeners that the log is closing.
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.logCloseRequested();
      }
    }
    synchronized (updateLock) {
      this.closed = true;
      if (LOG.isDebugEnabled()) {
        LOG.debug("Closing WAL writer in " + this.dir.toString());
      }
      if (this.writer != null) {
        this.writer.close();
        this.writer = null;
      }
    }
  }

  /**
   * @return a new HLogKey for the given region, table and sequence number;
   *   subclasses can override to supply their own key implementation
   */
  protected HLogKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum,
      long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
    return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds, nonceGroup, nonce);
  }

  @Override
  @VisibleForTesting
  public void append(HRegionInfo info, TableName tableName, WALEdit edits,
    final long now, HTableDescriptor htd, AtomicLong sequenceId) throws IOException {
    append(info, tableName, edits, new ArrayList<UUID>(), now, htd, true, true, sequenceId,
        HConstants.NO_NONCE, HConstants.NO_NONCE);
  }
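  /**
   * Core append path. Under updateLock a sequence number is assigned, the edit
   * is queued on pendingWrites and the transaction id is handed to the
   * AsyncWriter thread. If doSync is set, the call then blocks in sync() until
   * the edit is durable, unless the table uses deferred log flush (edits for
   * meta regions are always synced).
   *
   * @return the transaction id of this append
   */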
  @SuppressWarnings("deprecation")
  private long append(HRegionInfo info, TableName tableName, WALEdit edits, List<UUID> clusterIds,
      final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore,
      AtomicLong sequenceId, long nonceGroup, long nonce) throws IOException {
    if (edits.isEmpty()) return this.unflushedEntries.get();
    if (this.closed) {
      throw new IOException("Cannot append; log is closed");
    }
    TraceScope traceScope = Trace.startSpan("FSHlog.append");
    try {
      long txid = 0;
      synchronized (this.updateLock) {
        // Get the next sequence number for this region's edits.
        long seqNum = sequenceId.incrementAndGet();
        // Record the oldest unflushed sequence id for the region if we don't have one yet.
        byte [] encodedRegionName = info.getEncodedNameAsBytes();
        if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
        HLogKey logKey = makeKey(
            encodedRegionName, tableName, seqNum, now, clusterIds, nonceGroup, nonce);

        synchronized (pendingWritesLock) {
          doWrite(info, logKey, edits, htd);
          txid = this.unflushedEntries.incrementAndGet();
        }
        this.numEntries.incrementAndGet();
        // Wake up the AsyncWriter to pick up this transaction.
        this.asyncWriter.setPendingTxid(txid);

        if (htd.isDeferredLogFlush()) {
          lastUnSyncedTxid = txid;
        }
        this.latestSequenceNums.put(encodedRegionName, seqNum);
      }

      // Sync if we are asked to and the table does not defer log flushes
      // (edits for meta regions are always synced).
      if (doSync &&
          (info.isMetaRegion() ||
          !htd.isDeferredLogFlush())) {
        this.sync(txid);
      }
      return txid;
    } finally {
      traceScope.close();
    }
  }

  @Override
  public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
      List<UUID> clusterIds, final long now, HTableDescriptor htd, AtomicLong sequenceId,
      boolean isInMemstore, long nonceGroup, long nonce) throws IOException {
    return append(info, tableName, edits, clusterIds,
        now, htd, false, isInMemstore, sequenceId, nonceGroup, nonce);
  }
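  /**
   * First stage of the async pipeline: waits until append() publishes a pending
   * transaction id, drains pendingWrites into the current writer, then hands
   * the highest written txid to an AsyncSyncer (preferring an idle one).
   */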
  private class AsyncWriter extends HasThread {
    private long pendingTxid = 0;
    private long txidToWrite = 0;
    private long lastWrittenTxid = 0;
    private Object writeLock = new Object();

    public AsyncWriter(String name) {
      super(name);
    }

    // Called by append() to tell this thread there are writes waiting up to the given txid.
    public void setPendingTxid(long txid) {
      synchronized (this.writeLock) {
        if (txid <= this.pendingTxid)
          return;

        this.pendingTxid = txid;
        this.writeLock.notify();
      }
    }

    public void run() {
      try {
        while (!this.isInterrupted()) {
          // Wait until there are pending writes beyond what we have already written.
          synchronized (this.writeLock) {
            while (this.pendingTxid <= this.lastWrittenTxid) {
              this.writeLock.wait();
            }
          }

          // Grab a snapshot of the pending writes and swap in a fresh buffer so
          // append() can keep queueing while we write.
          List<Entry> pendWrites = null;
          synchronized (pendingWritesLock) {
            this.txidToWrite = unflushedEntries.get();
            pendWrites = pendingWrites;
            pendingWrites = new LinkedList<Entry>();
          }

          // Write the snapshot to the current WAL writer.
          try {
            for (Entry e : pendWrites) {
              writer.append(e);
            }
          } catch (IOException e) {
            // Record the failed txid range so waiting callers see the error, and ask for a roll.
            LOG.error("Error while AsyncWriter write, request close of hlog ", e);
            requestLogRoll();

            asyncIOE = e;
            failedTxid.set(this.txidToWrite);
          }

          // Hand the written txid to a syncer, preferring one that is idle.
          this.lastWrittenTxid = this.txidToWrite;
          boolean hasIdleSyncer = false;
          for (int i = 0; i < asyncSyncers.length; ++i) {
            if (!asyncSyncers[i].isSyncing()) {
              hasIdleSyncer = true;
              asyncSyncers[i].setWrittenTxid(this.lastWrittenTxid);
              break;
            }
          }
          if (!hasIdleSyncer) {
            int idx = (int)(this.lastWrittenTxid % asyncSyncers.length);
            asyncSyncers[idx].setWrittenTxid(this.lastWrittenTxid);
          }
        }
      } catch (InterruptedException e) {
        LOG.debug(getName() + " interrupted while waiting for " +
            "newer writes added to local buffer");
      } catch (Exception e) {
        LOG.error("UNEXPECTED", e);
      } finally {
        LOG.info(getName() + " exiting");
      }
    }
  }
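  /**
   * Second stage of the async pipeline: waits for txids written by the
   * AsyncWriter, syncs the underlying writer, records failures, passes the
   * synced txid to the AsyncNotifier and checks whether a log roll is needed
   * (file size or low replication).
   */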
  private class AsyncSyncer extends HasThread {
    private long writtenTxid = 0;
    private long txidToSync = 0;
    private long lastSyncedTxid = 0;
    private volatile boolean isSyncing = false;
    private Object syncLock = new Object();

    public AsyncSyncer(String name) {
      super(name);
    }

    public boolean isSyncing() {
      return this.isSyncing;
    }

    // Called by AsyncWriter to tell this thread that writes up to the given txid need syncing.
    public void setWrittenTxid(long txid) {
      synchronized (this.syncLock) {
        if (txid <= this.writtenTxid)
          return;

        this.writtenTxid = txid;
        this.syncLock.notify();
      }
    }

    public void run() {
      try {
        while (!this.isInterrupted()) {
          // Wait until there are writes beyond what we have already synced.
          synchronized (this.syncLock) {
            while (this.writtenTxid <= this.lastSyncedTxid) {
              this.syncLock.wait();
            }
            this.txidToSync = this.writtenTxid;
          }

          // Skip the sync if another syncer already made these txids durable.
          if (this.txidToSync <= syncedTillHere.get()) {
            this.lastSyncedTxid = this.txidToSync;
            continue;
          }

          // Sync the outstanding writes and time the call for metrics.
          long now = EnvironmentEdgeManager.currentTimeMillis();
          try {
            if (writer == null) {
              // Defensive: there should never be unsynced writes with no writer;
              // record it as a sync failure for the affected txids.
              LOG.fatal("should never happen: has unsynced writes but writer is null!");
              asyncIOE = new IOException("has unsynced writes but writer is null!");
              failedTxid.set(this.txidToSync);
            } else {
              this.isSyncing = true;
              writer.sync();
              this.isSyncing = false;
            }
            postSync();
          } catch (IOException e) {
            LOG.fatal("Error while AsyncSyncer sync, request close of hlog ", e);
            requestLogRoll();

            asyncIOE = e;
            failedTxid.set(this.txidToSync);

            this.isSyncing = false;
          }
          metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now);

          // Let the notifier wake up any callers waiting on these txids.
          this.lastSyncedTxid = this.txidToSync;
          asyncNotifier.setFlushedTxid(this.lastSyncedTxid);

          // Check whether a log roll is needed: low replication or size threshold.
          boolean logRollNeeded = false;
          if (rollWriterLock.tryLock()) {
            try {
              logRollNeeded = checkLowReplication();
            } finally {
              rollWriterLock.unlock();
            }
            try {
              if (logRollNeeded || writer != null && writer.getLength() > logrollsize) {
                requestLogRoll();
              }
            } catch (IOException e) {
              LOG.warn("writer.getLength() failed; this failure won't block here");
            }
          }
        }
      } catch (InterruptedException e) {
        LOG.debug(getName() + " interrupted while waiting for " +
            "notification from AsyncWriter thread");
      } catch (Exception e) {
        LOG.error("UNEXPECTED", e);
      } finally {
        LOG.info(getName() + " exiting");
      }
    }
  }
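  /**
   * Last stage of the async pipeline: advances syncedTillHere to the highest
   * synced txid reported by the AsyncSyncer threads and notifies callers
   * blocked in syncer(long).
   */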
  private class AsyncNotifier extends HasThread {
    private long flushedTxid = 0;
    private long lastNotifiedTxid = 0;
    private Object notifyLock = new Object();

    public AsyncNotifier(String name) {
      super(name);
    }

    public void setFlushedTxid(long txid) {
      synchronized (this.notifyLock) {
        if (txid <= this.flushedTxid) {
          return;
        }

        this.flushedTxid = txid;
        this.notifyLock.notify();
      }
    }

    public void run() {
      try {
        while (!this.isInterrupted()) {
          synchronized (this.notifyLock) {
            while (this.flushedTxid <= this.lastNotifiedTxid) {
              this.notifyLock.wait();
            }
            this.lastNotifiedTxid = this.flushedTxid;
          }

          // Publish the new high-water mark and wake up everyone waiting in syncer().
          synchronized (syncedTillHere) {
            syncedTillHere.set(this.lastNotifiedTxid);
            syncedTillHere.notifyAll();
          }
        }
      } catch (InterruptedException e) {
        LOG.debug(getName() + " interrupted while waiting for " +
            " notification from AsyncSyncer thread");
      } catch (Exception e) {
        LOG.error("UNEXPECTED", e);
      } finally {
        LOG.info(getName() + " exiting");
      }
    }
  }

  /** Blocks until all transactions appended so far have been synced. */
  private void syncer() throws IOException {
    syncer(this.unflushedEntries.get());
  }

  /** Blocks until the given transaction id has been synced, rethrowing any async write/sync error. */
  private void syncer(long txid) throws IOException {
    synchronized (this.syncedTillHere) {
      while (this.syncedTillHere.get() < txid) {
        try {
          this.syncedTillHere.wait();
          // If our txid is in the failed range, surface the async exception to the caller.
          if (txid <= this.failedTxid.get()) {
            assert asyncIOE != null :
              "current txid is among(under) failed txids, but asyncIOE is null!";
            throw asyncIOE;
          }
        } catch (InterruptedException e) {
          LOG.debug("interrupted while waiting for notification from AsyncNotifier");
        }
      }
    }
  }

  @Override
  public void postSync() {}

  @Override
  public void postAppend(List<Entry> entries) {}
  /**
   * Checks the replication of the current WAL block against the tolerable
   * minimum and decides whether a log roll should be requested.
   */
  private boolean checkLowReplication() {
    boolean logRollNeeded = false;
    // If the number of replicas in HDFS has fallen below the tolerable minimum,
    // request a roll, but only up to lowReplicationRollLimit consecutive times.
    try {
      int numCurrentReplicas = getLogReplication();
      if (numCurrentReplicas != 0
          && numCurrentReplicas < this.minTolerableReplication) {
        if (this.lowReplicationRollEnabled) {
          if (this.consecutiveLogRolls.get() < this.lowReplicationRollLimit) {
            LOG.warn("HDFS pipeline error detected. " + "Found "
                + numCurrentReplicas + " replicas but expecting no less than "
                + this.minTolerableReplication + " replicas. "
                + " Requesting close of hlog.");
            logRollNeeded = true;
            this.consecutiveLogRolls.getAndIncrement();
          } else {
            LOG.warn("Too many consecutive RollWriter requests, it's a sign of "
                + "the total number of live datanodes is lower than the tolerable replicas.");
            this.consecutiveLogRolls.set(0);
            this.lowReplicationRollEnabled = false;
          }
        }
      } else if (numCurrentReplicas >= this.minTolerableReplication) {
        if (!this.lowReplicationRollEnabled) {
          // A freshly rolled writer reports the default replication, so only
          // re-enable low-replication rolling once the current file has more than one entry.
          if (this.numEntries.get() <= 1) {
            return logRollNeeded;
          }
          this.lowReplicationRollEnabled = true;
          LOG.info("LowReplication-Roller was enabled.");
        }
      }
    } catch (Exception e) {
      LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas: " + e +
          "; still proceeding ahead...");
    }
    return logRollNeeded;
  }
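  /**
   * Uses the reflected HDFS-826 getNumCurrentReplicas method to report how many
   * replicas the current WAL block has in the write pipeline; returns 0 when the
   * method or the output stream is unavailable.
   */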
  int getLogReplication()
  throws IllegalArgumentException, IllegalAccessException, InvocationTargetException {
    if (this.getNumCurrentReplicas != null && this.hdfs_out != null) {
      Object repl = this.getNumCurrentReplicas.invoke(getOutputStream(), NO_ARGS);
      if (repl instanceof Integer) {
        return ((Integer)repl).intValue();
      }
    }
    return 0;
  }

  boolean canGetCurReplicas() {
    return this.getNumCurrentReplicas != null;
  }

  @Override
  public void hsync() throws IOException {
    syncer();
  }

  @Override
  public void hflush() throws IOException {
    syncer();
  }

  @Override
  public void sync() throws IOException {
    syncer();
  }

  @Override
  public void sync(long txid) throws IOException {
    syncer(txid);
  }

  private void requestLogRoll() {
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i: this.listeners) {
        i.logRollRequested();
      }
    }
  }

  // Appends the edit to the in-memory pendingWrites buffer after running the
  // listeners and WAL coprocessors; the AsyncWriter does the actual file write.
  protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit,
      HTableDescriptor htd)
  throws IOException {
    if (!this.enabled) {
      return;
    }
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i: this.listeners) {
        i.visitLogEntryBeforeWrite(htd, logKey, logEdit);
      }
    }
    try {
      long now = EnvironmentEdgeManager.currentTimeMillis();
      // A coprocessor may bypass the default write by returning true from preWALWrite.
      if (!coprocessorHost.preWALWrite(info, logKey, logEdit)) {
        if (logEdit.isReplay()) {
          // Clear the replication scopes so replayed edits are not replicated again.
          logKey.setScopes(null);
        }
        this.pendingWrites.add(new HLog.Entry(logKey, logEdit));
      }
      long took = EnvironmentEdgeManager.currentTimeMillis() - now;
      coprocessorHost.postWALWrite(info, logKey, logEdit);
      long len = 0;
      for (KeyValue kv : logEdit.getKeyValues()) {
        len += kv.getLength();
      }
      this.metrics.finishAppend(took, len);
    } catch (IOException e) {
      LOG.fatal("Could not append. Requesting close of hlog", e);
      requestLogRoll();
      throw e;
    }
  }

  /** @return the number of entries in the current WAL file */
  int getNumEntries() {
    return numEntries.get();
  }
  /** @return the number of rolled WAL files, not counting the current one */
  public int getNumRolledLogFiles() {
    return hlogSequenceNums.size();
  }

  @Override
  public int getNumLogFiles() {
    // +1 for the current (not yet rolled) WAL file.
    return getNumRolledLogFiles() + 1;
  }

  @Override
  public long getLogFileSize() {
    return totalLogSize.get() + curLogSize;
  }

  @Override
  public boolean startCacheFlush(final byte[] encodedRegionName) {
    Long oldRegionSeqNum = null;
    if (!closeBarrier.beginOp()) {
      LOG.info("Flush will not be started for " + Bytes.toString(encodedRegionName) +
          " - because the server is closing.");
      return false;
    }
    synchronized (oldestSeqNumsLock) {
      // Move the region's oldest unflushed sequence id into the flushing map
      // for the duration of the flush.
      oldRegionSeqNum = this.oldestUnflushedSeqNums.remove(encodedRegionName);
      if (oldRegionSeqNum != null) {
        Long oldValue = this.oldestFlushingSeqNums.put(encodedRegionName, oldRegionSeqNum);
        assert oldValue == null : "Flushing map not cleaned up for "
            + Bytes.toString(encodedRegionName);
      }
    }
    if (oldRegionSeqNum == null) {
      // The region has no unflushed edits recorded in this WAL (e.g. nothing
      // was written since its last flush), so there is nothing to track; warn only.
      LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: ["
          + Bytes.toString(encodedRegionName) + "]");
    }
    return true;
  }

  @Override
  public void completeCacheFlush(final byte [] encodedRegionName) {
    synchronized (oldestSeqNumsLock) {
      this.oldestFlushingSeqNums.remove(encodedRegionName);
    }
    closeBarrier.endOp();
  }

  @Override
  public void abortCacheFlush(byte[] encodedRegionName) {
    Long currentSeqNum = null, seqNumBeforeFlushStarts = null;
    synchronized (oldestSeqNumsLock) {
      // Put the pre-flush sequence id back so the region's edits are still tracked.
      seqNumBeforeFlushStarts = this.oldestFlushingSeqNums.remove(encodedRegionName);
      if (seqNumBeforeFlushStarts != null) {
        currentSeqNum =
            this.oldestUnflushedSeqNums.put(encodedRegionName, seqNumBeforeFlushStarts);
      }
    }
    closeBarrier.endOp();
    if ((currentSeqNum != null)
        && (currentSeqNum.longValue() <= seqNumBeforeFlushStarts.longValue())) {
      String errorStr = "Region " + Bytes.toString(encodedRegionName) +
          " acquired edits out of order current memstore seq=" + currentSeqNum
          + ", previous oldest unflushed id=" + seqNumBeforeFlushStarts;
      LOG.error(errorStr);
      assert false : errorStr;
      Runtime.getRuntime().halt(1);
    }
  }

  @Override
  public boolean isLowReplicationRollEnabled() {
    return lowReplicationRollEnabled;
  }

  /** @return the directory this WAL writes to */
  protected Path getDir() {
    return dir;
  }

  static Path getHLogArchivePath(Path oldLogDir, Path p) {
    return new Path(oldLogDir, p.getName());
  }

  static String formatRecoveredEditsFileName(final long seqid) {
    return String.format("%019d", seqid);
  }

  public static final long FIXED_OVERHEAD = ClassSize.align(
      ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
      ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));

  private static void usage() {
    System.err.println("Usage: HLog <ARGS>");
    System.err.println("Arguments:");
    System.err.println(" --dump  Dump textual representation of passed one or more files");
    System.err.println("         For example: HLog --dump hdfs://example.com:9000/hbase/.logs/MACHINE/LOGFILE");
    System.err.println(" --split Split the passed directory of WAL logs");
    System.err.println("         For example: HLog --split hdfs://example.com:9000/hbase/.logs/DIR");
  }
  private static void split(final Configuration conf, final Path p)
  throws IOException {
    FileSystem fs = FileSystem.get(conf);
    if (!fs.exists(p)) {
      throw new FileNotFoundException(p.toString());
    }
    if (!fs.getFileStatus(p).isDir()) {
      throw new IOException(p + " is not a directory");
    }

    final Path baseDir = FSUtils.getRootDir(conf);
    final Path oldLogDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
    HLogSplitter.split(baseDir, p, oldLogDir, fs, conf);
  }

  @Override
  public WALCoprocessorHost getCoprocessorHost() {
    return coprocessorHost;
  }

  /** @return true if there are deferred-flush appends that have not yet been synced */
  boolean hasUnSyncedEntries() {
    return this.lastUnSyncedTxid > this.syncedTillHere.get();
  }

  @Override
  public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
    Long result = oldestUnflushedSeqNums.get(encodedRegionName);
    return result == null ? HConstants.NO_SEQNUM : result.longValue();
  }

  /**
   * Command-line entry point: pass {@code --dump} with one or more WAL files to
   * print them, or {@code --split} with one or more WAL directories to split them.
   */
  public static void main(String[] args) throws IOException {
    if (args.length < 2) {
      usage();
      System.exit(-1);
    }
    // Either dump using the HLogPrettyPrinter or split, depending on args.
    if (args[0].compareTo("--dump") == 0) {
      HLogPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
    } else if (args[0].compareTo("--split") == 0) {
      Configuration conf = HBaseConfiguration.create();
      for (int i = 1; i < args.length; i++) {
        try {
          Path logPath = new Path(args[i]);
          FSUtils.setFsDefault(conf, logPath);
          split(conf, logPath);
        } catch (Throwable t) {
          t.printStackTrace(System.err);
          System.exit(-1);
        }
      }
    } else {
      usage();
      System.exit(-1);
    }
  }
}