1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver.wal;
20  
21  import java.io.FileNotFoundException;
22  import java.io.IOException;
23  import java.io.OutputStream;
24  import java.lang.reflect.InvocationTargetException;
25  import java.lang.reflect.Method;
26  import java.net.URLEncoder;
27  import java.util.ArrayList;
28  import java.util.Arrays;
29  import java.util.Comparator;
30  import java.util.HashMap;
31  import java.util.LinkedList;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.NavigableMap;
35  import java.util.TreeMap;
36  import java.util.UUID;
37  import java.util.concurrent.ConcurrentSkipListMap;
38  import java.util.concurrent.CopyOnWriteArrayList;
39  import java.util.concurrent.atomic.AtomicBoolean;
40  import java.util.concurrent.atomic.AtomicInteger;
41  import java.util.concurrent.atomic.AtomicLong;
42  import java.util.concurrent.locks.ReentrantLock;
43  
44  import org.apache.commons.logging.Log;
45  import org.apache.commons.logging.LogFactory;
46  import org.apache.hadoop.classification.InterfaceAudience;
47  import org.apache.hadoop.conf.Configuration;
48  import org.apache.hadoop.fs.FSDataOutputStream;
49  import org.apache.hadoop.fs.FileStatus;
50  import org.apache.hadoop.fs.FileSystem;
51  import org.apache.hadoop.fs.Path;
52  import org.apache.hadoop.fs.Syncable;
53  import org.apache.hadoop.hbase.HBaseConfiguration;
54  import org.apache.hadoop.hbase.HConstants;
55  import org.apache.hadoop.hbase.HRegionInfo;
56  import org.apache.hadoop.hbase.HTableDescriptor;
57  import org.apache.hadoop.hbase.KeyValue;
58  import org.apache.hadoop.hbase.TableName;
59  import org.apache.hadoop.hbase.util.Bytes;
60  import org.apache.hadoop.hbase.util.ClassSize;
61  import org.apache.hadoop.hbase.util.DrainBarrier;
62  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
63  import org.apache.hadoop.hbase.util.FSUtils;
64  import org.apache.hadoop.hbase.util.HasThread;
65  import org.apache.hadoop.util.StringUtils;
66  import org.cloudera.htrace.Trace;
67  import org.cloudera.htrace.TraceScope;
68  
69  import com.google.common.annotations.VisibleForTesting;
70  
71  /**
72   * HLog stores all the edits to the HStore.  It's the hbase write-ahead-log
73   * implementation.
74   *
75   * It performs logfile-rolling, so external callers are not aware that the
76   * underlying file is being rolled.
77   *
78   * <p>
79   * There is one HLog per RegionServer.  All edits for all Regions carried by
80   * a particular RegionServer are entered first in the HLog.
81   *
82   * <p>
83   * Each HRegion is identified by a unique long integer id. HRegions do
84   * not need to declare themselves before using the HLog; they simply include
85   * their HRegion-id in the <code>append</code> or
86   * <code>completeCacheFlush</code> calls.
87   *
88   * <p>
89   * An HLog consists of multiple on-disk files, which have a chronological order.
90   * As data is flushed to other (better) on-disk structures, the log becomes
91   * obsolete. We can destroy all the log messages for a given HRegion-id up to
92   * the most-recent CACHEFLUSH message from that HRegion.
93   *
94   * <p>
95   * It's only practical to delete entire files. Thus, we delete an entire on-disk
96   * file F when all of the messages in F have a log-sequence-id that's older
97   * (smaller) than the most-recent CACHEFLUSH message for every HRegion that has
98   * a message in F.
99   *
100  * <p>
101  * Synchronized methods can never execute in parallel. However, between the
102  * start of a cache flush and the completion point, appends are allowed but log
103  * rolling is not. To prevent log rolling taking place during this period, a
104  * separate reentrant lock is used.
105  *
106  * <p>To read an HLog, call {@link HLogFactory#createReader(org.apache.hadoop.fs.FileSystem,
107  * org.apache.hadoop.fs.Path, org.apache.hadoop.conf.Configuration)}.
108  *
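 * <p>A minimal read sketch (assuming {@code fs}, {@code walPath} and {@code conf} are an
 * already-initialized FileSystem, HLog file Path and Configuration; the variable names are
 * illustrative only):
 * <pre>
 * HLog.Reader reader = HLogFactory.createReader(fs, walPath, conf);
 * try {
 *   HLog.Entry entry;
 *   while ((entry = reader.next()) != null) {
 *     // entry.getKey() is the HLogKey, entry.getEdit() the WALEdit
 *   }
 * } finally {
 *   reader.close();
 * }
 * </pre>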
109  */
110 @InterfaceAudience.Private
111 class FSHLog implements HLog, Syncable {
112   static final Log LOG = LogFactory.getLog(FSHLog.class);
113 
114   private final FileSystem fs;
115   private final Path rootDir;
116   private final Path dir;
117   private final Configuration conf;
118   // Listeners that are called on WAL events.
119   private List<WALActionsListener> listeners =
120     new CopyOnWriteArrayList<WALActionsListener>();
121   private final long blocksize;
122   private final String prefix;
123   private final AtomicLong unflushedEntries = new AtomicLong(0);
124   private final AtomicLong syncedTillHere = new AtomicLong(0);
125   private long lastUnSyncedTxid;
126   private final Path oldLogDir;
127 
128   // all writes pending on AsyncWriter/AsyncSyncer thread with
129   // txid <= failedTxid will fail by throwing asyncIOE
130   private final AtomicLong failedTxid = new AtomicLong(0);
131   private volatile IOException asyncIOE = null;
132 
133   private WALCoprocessorHost coprocessorHost;
134 
135   private FSDataOutputStream hdfs_out; // FSDataOutputStream associated with the current SequenceFile.writer
136   // Minimum tolerable replicas, if the actual value is lower than it,
137   // rollWriter will be triggered
138   private int minTolerableReplication;
139   private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
140   final static Object [] NO_ARGS = new Object []{};
141 
142   /** The barrier used to ensure that close() waits for all log rolls and flushes to finish. */
143   private DrainBarrier closeBarrier = new DrainBarrier();
144 
145   /**
146    * Current log file.
147    */
148   Writer writer;
149 
150   /**
151    * This lock synchronizes all operations on oldestUnflushedSeqNums and oldestFlushingSeqNums,
152    * with the exception of append's putIfAbsent into oldestUnflushedSeqNums.
153    * We only use these to find the lower-bound seqNum, or to find regions with old seqNums to
154    * force flush them, so small races on these numbers do not matter. */
155   private final Object oldestSeqNumsLock = new Object();
156 
157   /**
158    * This lock makes sure only one log roll runs at a time. It should not be taken while
159    * any other lock is held. We don't just use synchronized because that results in a bogus and
160    * tedious findbugs warning when it thinks synchronized controls writer thread safety. */
161   private final ReentrantLock rollWriterLock = new ReentrantLock(true);
162 
163   /**
164    * Map of encoded region names to their most recent sequence/edit id in their memstore.
165    */
166   private final ConcurrentSkipListMap<byte [], Long> oldestUnflushedSeqNums =
167     new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
168   /**
169    * Map of encoded region names to their most recent sequence/edit id in their memstore;
170    * contains the regions that are currently flushing. That way we can store two numbers for
171    * flushing and non-flushing (oldestUnflushedSeqNums) memstore for the same region.
172    */
173   private final Map<byte[], Long> oldestFlushingSeqNums =
174     new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
175 
176   private volatile boolean closed = false;
177 
178   private boolean forMeta = false;
179 
180   // The timestamp (in ms) when the log file was created.
181   private volatile long filenum = -1;
182 
183   // number of transactions in the current HLog.
184   private final AtomicInteger numEntries = new AtomicInteger(0);
185 
186   // If the live datanode count is lower than the default replication value,
187   // rollWriter will be triggered on each sync (so rolls would otherwise be
188   // requested one after another in a short time). This counter is used as a
189   // workaround to slow down the roll frequency triggered by checkLowReplication().
190   private AtomicInteger consecutiveLogRolls = new AtomicInteger(0);
191   private final int lowReplicationRollLimit;
192 
193   // If consecutiveLogRolls is larger than lowReplicationRollLimit,
194   // then disable the rolling in checkLowReplication().
195   // Enable it if the replications recover.
196   private volatile boolean lowReplicationRollEnabled = true;
197 
198   // If the log grows larger than this size, roll it. This is typically 0.95 times
199   // the default HDFS block size.
200   private final long logrollsize;
201   
202   /** size of current log */
203   private long curLogSize = 0;
204 
205   /**
206    * The total size of hlog
207    */
208   private AtomicLong totalLogSize = new AtomicLong(0);
209   
210   // We synchronize on updateLock to prevent updates and to prevent a log roll
211   // during an update
212   // locked during appends
213   private final Object updateLock = new Object();
214   private final Object pendingWritesLock = new Object();
215 
216   private final boolean enabled;
217 
218   /*
219    * If there are more than this many logs, force a flush of the oldest region so its
220    * oldest edit goes to disk.  If we keep too many logs and crash, replay will take forever.
221    * Keep the number of logs tidy.
222    */
223   private final int maxLogs;
224 
225   // List of pending writes to the HLog. These correspond to transactions
226   // that have not yet returned to the client. We keep them cached here
227   // instead of writing them to HDFS piecemeal. The goal is to increase
228   // the batch size for writing-to-hdfs as well as sync-to-hdfs, so that
229   // we can get better system throughput.
230   private List<Entry> pendingWrites = new LinkedList<Entry>();
231 
232   private final AsyncWriter   asyncWriter;
233   // Since AsyncSyncer takes much longer than the other phases (adding WALEdits to the local
234   // buffer, writing the local buffer to HDFS, notifying pending write handler threads),
235   // all other phases stall while a sync is ongoing. We use multiple parallel
236   // AsyncSyncer threads to improve overall throughput.
237   private final AsyncSyncer[] asyncSyncers;
238   private final AsyncNotifier asyncNotifier;
239 
240   /** Number of log close errors tolerated before we abort */
241   private final int closeErrorsTolerated;
242 
243   private final AtomicInteger closeErrorCount = new AtomicInteger();
244   private final MetricsWAL metrics;
245 /**
246  * Map of region encoded names to the latest sequence num obtained from them while appending
247  * WALEdits to the wal. We create one map for each WAL file at the time it is rolled.
248  * <p>
249  * When deciding whether to archive a WAL file, we compare the sequence IDs in this map to
250  * {@link #oldestFlushingSeqNums} and {@link #oldestUnflushedSeqNums}.
251  * See {@link FSHLog#areAllRegionsFlushed(Map, Map, Map)} for more info.
252  * <p>
253  * This map uses byte[] as the key, and uses reference equality. It works in our use case as we
254  * use {@link HRegionInfo#getEncodedNameAsBytes()} as keys. For a given region, it always returns
255  * the same array.
256  */
257   private Map<byte[], Long> latestSequenceNums = new HashMap<byte[], Long>();
258 
259   /**
260    * WAL Comparator; it compares the timestamp (log filenum), present in the log file name.
261    */
262   public final Comparator<Path> LOG_NAME_COMPARATOR = new Comparator<Path>() {
263     @Override
264     public int compare(Path o1, Path o2) {
265       long t1 = getFileNumFromFileName(o1);
266       long t2 = getFileNumFromFileName(o2);
267       if (t1 == t2) return 0;
268       return (t1 > t2) ? 1 : -1;
269     }
270   };
271 
272   /**
273    * Map of log file to the latest sequence nums of all regions it has entries of.
274    * The map is sorted by the log file creation timestamp (contained in the log file name).
275    */
276   private NavigableMap<Path, Map<byte[], Long>> hlogSequenceNums =
277     new ConcurrentSkipListMap<Path, Map<byte[], Long>>(LOG_NAME_COMPARATOR);
278 
279   /**
280    * Constructor.
281    *
282    * @param fs filesystem handle
283    * @param root path for stored and archived hlogs
284    * @param logDir dir where hlogs are stored
285    * @param conf configuration to use
286    * @throws IOException
287    */
288   public FSHLog(final FileSystem fs, final Path root, final String logDir,
289                 final Configuration conf)
290   throws IOException {
291     this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME,
292         conf, null, true, null, false);
293   }
294 
295   /**
296    * Constructor.
297    *
298    * @param fs filesystem handle
299    * @param root path for stored and archived hlogs
300    * @param logDir dir where hlogs are stored
301    * @param oldLogDir dir where hlogs are archived
302    * @param conf configuration to use
303    * @throws IOException
304    */
305   public FSHLog(final FileSystem fs, final Path root, final String logDir,
306                 final String oldLogDir, final Configuration conf)
307   throws IOException {
308     this(fs, root, logDir, oldLogDir,
309         conf, null, true, null, false);
310   }
311 
312   /**
313    * Create an edit log at the given <code>dir</code> location.
314    *
315    * You should never have to load an existing log. If there is a log at
316    * startup, it should have already been processed and deleted by the time the
317    * HLog object is started up.
318    *
319    * @param fs filesystem handle
320    * @param root path for stored and archived hlogs
321    * @param logDir dir where hlogs are stored
322    * @param conf configuration to use
323    * @param listeners Listeners on WAL events. Listeners passed here will
324    * be registered before we do anything else; e.g. before the
325    * constructor calls {@link #rollWriter()}.
326    * @param prefix should always be hostname and port in distributed env and
327    *        it will be URL encoded before being used.
328    *        If prefix is null, "hlog" will be used
329    * @throws IOException
330    */
331   public FSHLog(final FileSystem fs, final Path root, final String logDir,
332       final Configuration conf, final List<WALActionsListener> listeners,
333       final String prefix) throws IOException {
334     this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME,
335         conf, listeners, true, prefix, false);
336   }
337 
338   /**
339    * Create an edit log at the given <code>dir</code> location.
340    *
341    * You should never have to load an existing log. If there is a log at
342    * startup, it should have already been processed and deleted by the time the
343    * HLog object is started up.
344    *
345    * @param fs filesystem handle
346    * @param root path for stored and archived hlogs
347    * @param logDir dir where hlogs are stored
348    * @param oldLogDir dir where hlogs are archived
349    * @param conf configuration to use
350    * @param listeners Listeners on WAL events. Listeners passed here will
351    * be registered before we do anything else; e.g. before the
352    * constructor calls {@link #rollWriter()}.
353    * @param failIfLogDirExists If true IOException will be thrown if dir already exists.
354    * @param prefix should always be hostname and port in distributed env and
355    *        it will be URL encoded before being used.
356    *        If prefix is null, "hlog" will be used
357    * @param forMeta if this hlog is meant for meta updates
358    * @throws IOException
359    */
360   public FSHLog(final FileSystem fs, final Path root, final String logDir,
361       final String oldLogDir, final Configuration conf,
362       final List<WALActionsListener> listeners,
363       final boolean failIfLogDirExists, final String prefix, boolean forMeta)
364   throws IOException {
365     super();
366     this.fs = fs;
367     this.rootDir = root;
368     this.dir = new Path(this.rootDir, logDir);
369     this.oldLogDir = new Path(this.rootDir, oldLogDir);
370     this.forMeta = forMeta;
371     this.conf = conf;
372 
373     if (listeners != null) {
374       for (WALActionsListener i: listeners) {
375         registerWALActionsListener(i);
376       }
377     }
378 
379     this.blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
380         FSUtils.getDefaultBlockSize(this.fs, this.dir));
381     // Roll at 95% of block size.
382     float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
383     this.logrollsize = (long)(this.blocksize * multi);
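    // e.g. with a 128 MB HDFS block size and the default 0.95 multiplier,
    // logrollsize works out to roughly 121.6 MB (illustrative numbers).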
384 
385     this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
386     this.minTolerableReplication = conf.getInt(
387         "hbase.regionserver.hlog.tolerable.lowreplication",
388         FSUtils.getDefaultReplication(fs, this.dir));
389     this.lowReplicationRollLimit = conf.getInt(
390         "hbase.regionserver.hlog.lowreplication.rolllimit", 5);
391     this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
392     this.closeErrorsTolerated = conf.getInt(
393         "hbase.regionserver.logroll.errors.tolerated", 0);
394 
395 
396     LOG.info("WAL/HLog configuration: blocksize=" +
397       StringUtils.byteDesc(this.blocksize) +
398       ", rollsize=" + StringUtils.byteDesc(this.logrollsize) +
399       ", enabled=" + this.enabled);
400     // If prefix is null or empty then just name it hlog
401     this.prefix = prefix == null || prefix.isEmpty() ?
402         "hlog" : URLEncoder.encode(prefix, "UTF8");
403 
404     boolean dirExists = false;
405     if (failIfLogDirExists && (dirExists = this.fs.exists(dir))) {
406       throw new IOException("Target HLog directory already exists: " + dir);
407     }
408     if (!dirExists && !fs.mkdirs(dir)) {
409       throw new IOException("Unable to mkdir " + dir);
410     }
411 
412     if (!fs.exists(this.oldLogDir)) {
413       if (!fs.mkdirs(this.oldLogDir)) {
414         throw new IOException("Unable to mkdir " + this.oldLogDir);
415       }
416     }
417     // rollWriter sets this.hdfs_out if it can.
418     rollWriter();
419 
420     // handle the reflection necessary to call getNumCurrentReplicas()
421     this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
422 
423     final String n = Thread.currentThread().getName();
424 
425 
426     asyncWriter = new AsyncWriter(n + "-WAL.AsyncWriter");
427     asyncWriter.start();
428    
429     int syncerNums = conf.getInt("hbase.hlog.asyncer.number", 5);
430     asyncSyncers = new AsyncSyncer[syncerNums];
431     for (int i = 0; i < asyncSyncers.length; ++i) {
432       asyncSyncers[i] = new AsyncSyncer(n + "-WAL.AsyncSyncer" + i);
433       asyncSyncers[i].start();
434     }
435 
436     asyncNotifier = new AsyncNotifier(n + "-WAL.AsyncNotifier");
437     asyncNotifier.start();
438 
439     coprocessorHost = new WALCoprocessorHost(this, conf);
440 
441     this.metrics = new MetricsWAL();
442   }
443 
444   /**
445    * Find the 'getNumCurrentReplicas' method on the passed <code>os</code> stream.
446    * @return Method or null.
447    */
448   private Method getGetNumCurrentReplicas(final FSDataOutputStream os) {
449     Method m = null;
450     if (os != null) {
451       Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream()
452           .getClass();
453       try {
454         m = wrappedStreamClass.getDeclaredMethod("getNumCurrentReplicas",
455             new Class<?>[] {});
456         m.setAccessible(true);
457       } catch (NoSuchMethodException e) {
458         LOG.info("FileSystem's output stream doesn't support"
459             + " getNumCurrentReplicas; --HDFS-826 not available; fsOut="
460             + wrappedStreamClass.getName());
461       } catch (SecurityException e) {
462         LOG.info("Doesn't have access to getNumCurrentReplicas on "
463             + "FileSystems's output stream --HDFS-826 not available; fsOut="
464             + wrappedStreamClass.getName(), e);
465         m = null; // could happen on setAccessible()
466       }
467     }
468     if (m != null) {
469       if (LOG.isTraceEnabled()) LOG.trace("Using getNumCurrentReplicas--HDFS-826");
470     }
471     return m;
472   }
473 
474   @Override
475   public void registerWALActionsListener(final WALActionsListener listener) {
476     this.listeners.add(listener);
477   }
478 
479   @Override
480   public boolean unregisterWALActionsListener(final WALActionsListener listener) {
481     return this.listeners.remove(listener);
482   }
483 
484   @Override
485   public long getFilenum() {
486     return this.filenum;
487   }
488 
489   /**
490    * Method used internally by this class and for tests only.
491    * @return The wrapped stream our writer is using; it's not the
492    * writer's 'out' FSDataOutputStream but the stream that this 'out' wraps
493    * (in HDFS it's an instance of DFSDataOutputStream).
494    *
495    * usage: see TestLogRolling.java
496    */
497   OutputStream getOutputStream() {
498     return this.hdfs_out.getWrappedStream();
499   }
500 
501   @Override
502   public byte [][] rollWriter() throws FailedLogCloseException, IOException {
503     return rollWriter(false);
504   }
505 
506   @Override
507   public byte [][] rollWriter(boolean force)
508       throws FailedLogCloseException, IOException {
509     rollWriterLock.lock();
510     try {
511       // Return if nothing to flush.
512       if (!force && this.writer != null && this.numEntries.get() <= 0) {
513         return null;
514       }
515       byte [][] regionsToFlush = null;
516       if (closed) {
517         LOG.debug("HLog closed. Skipping rolling of writer");
518         return null;
519       }
520       try {
521         if (!closeBarrier.beginOp()) {
522           LOG.debug("HLog closing. Skipping rolling of writer");
523           return regionsToFlush;
524         }
525         // Do all the preparation outside of the updateLock to block
526         // the incoming writes as little as possible
527         long currentFilenum = this.filenum;
528         Path oldPath = null;
529         if (currentFilenum > 0) {
530           // computeFilename will take care of meta hlog filename
531           oldPath = computeFilename(currentFilenum);
532         }
533         this.filenum = System.currentTimeMillis();
534         Path newPath = computeFilename();
535         while (fs.exists(newPath)) {
536           this.filenum++;
537           newPath = computeFilename();
538         }
539 
540         // Tell our listeners that a new log is about to be created
541         if (!this.listeners.isEmpty()) {
542           for (WALActionsListener i : this.listeners) {
543             i.preLogRoll(oldPath, newPath);
544           }
545         }
546         FSHLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
547         // Can we get at the dfsclient outputstream?
548         FSDataOutputStream nextHdfsOut = null;
549         if (nextWriter instanceof ProtobufLogWriter) {
550           nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream();
551           // perform the costly sync before we get the lock to roll writers.
552           try {
553             nextWriter.sync();
554           } catch (IOException e) {
555             // optimization failed, no need to abort here.
556             LOG.warn("pre-sync failed", e);
557           }
558         }
559 
560         Path oldFile = null;
561         int oldNumEntries = 0;
562         synchronized (updateLock) {
563           // Clean up current writer.
564           oldNumEntries = this.numEntries.get();
565           oldFile = cleanupCurrentWriter(currentFilenum);
566           this.writer = nextWriter;
567           this.hdfs_out = nextHdfsOut;
568           this.numEntries.set(0);
569           if (oldFile != null) {
570             this.hlogSequenceNums.put(oldFile, this.latestSequenceNums);
571             this.latestSequenceNums = new HashMap<byte[], Long>();
572           }
573         }
574         if (oldFile == null) LOG.info("New WAL " + FSUtils.getPath(newPath));
575         else {
576           long oldFileLen = this.fs.getFileStatus(oldFile).getLen();
577           this.totalLogSize.addAndGet(oldFileLen);
578           LOG.info("Rolled WAL " + FSUtils.getPath(oldFile) + " with entries="
579               + oldNumEntries + ", filesize="
580               + StringUtils.humanReadableInt(oldFileLen) + "; new WAL "
581               + FSUtils.getPath(newPath));
582         }
583 
584         // Tell our listeners that a new log was created
585         if (!this.listeners.isEmpty()) {
586           for (WALActionsListener i : this.listeners) {
587             i.postLogRoll(oldPath, newPath);
588           }
589         }
590 
591         // Can we delete any of the old log files?
592         if (getNumRolledLogFiles() > 0) {
593           cleanOldLogs();
594           regionsToFlush = findRegionsToForceFlush();
595         }
596       } finally {
597         closeBarrier.endOp();
598       }
599       return regionsToFlush;
600     } finally {
601       rollWriterLock.unlock();
602     }
603   }
604 
605   /**
606    * This method allows subclasses to inject different writers without having to
607    * override other methods such as rollWriter().
608    *
609    * @param fs
610    * @param path
611    * @param conf
612    * @return Writer instance
613    * @throws IOException
614    */
615   protected Writer createWriterInstance(final FileSystem fs, final Path path,
616       final Configuration conf) throws IOException {
617     if (forMeta) {
618       //TODO: set a higher replication for the hlog files (HBASE-6773)
619     }
620     return HLogFactory.createWALWriter(fs, path, conf);
621   }
622 
623   /**
624    * Archive old logs that could be archived: a log is eligible for archiving if all its WALEdits
625    * are already flushed by the corresponding regions.
626    * <p>
627    * For each log file, it compares its region-to-sequenceId map
628    * ({@link FSHLog#latestSequenceNums}) with the corresponding region entries in
629    * {@link FSHLog#oldestFlushingSeqNums} and {@link FSHLog#oldestUnflushedSeqNums}.
630    * If all the regions in the map have flushed past their value, then the wal is eligible for
631    * archiving.
632    * @throws IOException
633    */
634   private void cleanOldLogs() throws IOException {
635     Map<byte[], Long> oldestFlushingSeqNumsLocal = null;
636     Map<byte[], Long> oldestUnflushedSeqNumsLocal = null;
637     List<Path> logsToArchive = new ArrayList<Path>();
638     // make a local copy so as to avoid locking when we iterate over these maps.
639     synchronized (oldestSeqNumsLock) {
640       oldestFlushingSeqNumsLocal = new HashMap<byte[], Long>(this.oldestFlushingSeqNums);
641       oldestUnflushedSeqNumsLocal = new HashMap<byte[], Long>(this.oldestUnflushedSeqNums);
642     }
643     for (Map.Entry<Path, Map<byte[], Long>> e : hlogSequenceNums.entrySet()) {
644       // iterate over the log file.
645       Path log = e.getKey();
646       Map<byte[], Long> sequenceNums = e.getValue();
647       // iterate over the map for this log file, and tell whether it should be archived or not.
648       if (areAllRegionsFlushed(sequenceNums, oldestFlushingSeqNumsLocal,
649         oldestUnflushedSeqNumsLocal)) {
650         logsToArchive.add(log);
651         LOG.debug("log file is ready for archiving " + log);
652       }
653     }
654     for (Path p : logsToArchive) {
655       this.totalLogSize.addAndGet(-this.fs.getFileStatus(p).getLen());
656       archiveLogFile(p);
657       this.hlogSequenceNums.remove(p);
658     }
659   }
660 
661   /**
662    * Takes a region:sequenceId map for a WAL file, and checks whether the file can be archived.
663    * It compares the region entries present in the passed sequenceNums map with the local copy of
664    * {@link #oldestUnflushedSeqNums} and {@link #oldestFlushingSeqNums}. If, for all regions,
665    * the value is less than the minimum of the values present in the oldestFlushing/UnflushedSeqNums,
666    * then the wal file is eligible for archiving.
667    * @param sequenceNums for a HLog, at the time when it was rolled.
668    * @param oldestFlushingMap
669    * @param oldestUnflushedMap
670    * @return true if wal is eligible for archiving, false otherwise.
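   * <p>For example (illustrative values only): if sequenceNums contains only {regionA: 5},
   * regionA has no flushing entry and its oldest unflushed seqNum is 7, then
   * min(Long.MAX_VALUE, 7) = 7 is greater than 5, so the wal can be archived.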
671    */
672    static boolean areAllRegionsFlushed(Map<byte[], Long> sequenceNums,
673       Map<byte[], Long> oldestFlushingMap, Map<byte[], Long> oldestUnflushedMap) {
674     for (Map.Entry<byte[], Long> regionSeqIdEntry : sequenceNums.entrySet()) {
675       // find region entries in the flushing/unflushed map. If there is no entry, it means
676       // a region doesn't have any unflushed entry.
677       long oldestFlushing = oldestFlushingMap.containsKey(regionSeqIdEntry.getKey()) ?
678           oldestFlushingMap.get(regionSeqIdEntry.getKey()) : Long.MAX_VALUE;
679       long oldestUnFlushed = oldestUnflushedMap.containsKey(regionSeqIdEntry.getKey()) ?
680           oldestUnflushedMap.get(regionSeqIdEntry.getKey()) : Long.MAX_VALUE;
681       // take the minimum so we are sure to cover the oldest sequence id
682       long minSeqNum = Math.min(oldestFlushing, oldestUnFlushed);
683       if (minSeqNum <= regionSeqIdEntry.getValue()) return false; // can't archive
684     }
685     return true;
686   }
687 
688   /**
689    * Iterates over the given map of regions, and compares their sequence numbers with corresponding
690    * entries in {@link #oldestUnflushedSeqNums}. If the sequence number is greater than or equal,
691    * the region is eligible to flush; otherwise, there is no benefit in flushing (from the
692    * perspective of the passed regionsSequenceNums map), because the region has already flushed
693    * the entries present in the WAL file for which this method is called (typically, the oldest wal file).
694    * @param regionsSequenceNums
695    * @return regions which should be flushed (whose sequence numbers are larger than their
696    * corresponding un-flushed entries).
697    */
698   private byte[][] findEligibleMemstoresToFlush(Map<byte[], Long> regionsSequenceNums) {
699     List<byte[]> regionsToFlush = null;
700     // Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock.
701     synchronized (oldestSeqNumsLock) {
702       for (Map.Entry<byte[], Long> e : regionsSequenceNums.entrySet()) {
703         Long unFlushedVal = this.oldestUnflushedSeqNums.get(e.getKey());
704         if (unFlushedVal != null && unFlushedVal <= e.getValue()) {
705           if (regionsToFlush == null) regionsToFlush = new ArrayList<byte[]>();
706           regionsToFlush.add(e.getKey());
707         }
708       }
709     }
710     return regionsToFlush == null ? null : regionsToFlush
711         .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
712   }
713 
714   /**
715    * If the number of un-archived WAL files is greater than the maximum allowed, it checks
716    * the first (oldest) WAL file, and returns the regions which should be flushed so that it could
717    * be archived.
718    * @return regions to flush in order to archive oldest wal file.
719    * @throws IOException
720    */
721   byte[][] findRegionsToForceFlush() throws IOException {
722     byte [][] regions = null;
723     int logCount = getNumRolledLogFiles();
724     if (logCount > this.maxLogs && logCount > 0) {
725       Map.Entry<Path, Map<byte[], Long>> firstWALEntry =
726         this.hlogSequenceNums.firstEntry();
727       regions = findEligibleMemstoresToFlush(firstWALEntry.getValue());
728     }
729     if (regions != null) {
730       StringBuilder sb = new StringBuilder();
731       for (int i = 0; i < regions.length; i++) {
732         if (i > 0) sb.append(", ");
733         sb.append(Bytes.toStringBinary(regions[i]));
734       }
735       LOG.info("Too many hlogs: logs=" + logCount + ", maxlogs=" +
736          this.maxLogs + "; forcing flush of " + regions.length + " region(s): " +
737          sb.toString());
738     }
739     return regions;
740   }
741 
742   /*
743    * Cleans up the current writer by closing it.
744    * Presumes we're operating inside an updateLock scope.
745    * @return Path to current writer or null if none.
746    * @throws IOException
747    */
748   Path cleanupCurrentWriter(final long currentfilenum) throws IOException {
749     Path oldFile = null;
750     if (this.writer != null) {
751       // Close the current writer, get a new one.
752       try {
753         // Wait till all current transactions are written to the hlog.
754         // No new transactions can occur because we have the updatelock.
755         if (this.unflushedEntries.get() != this.syncedTillHere.get()) {
756           LOG.debug("cleanupCurrentWriter " +
757                    " waiting for transactions to get synced " +
758                    " total " + this.unflushedEntries.get() +
759                    " synced till here " + this.syncedTillHere.get());
760           sync();
761         }
762         this.writer.close();
763         this.writer = null;
764         closeErrorCount.set(0);
765       } catch (IOException e) {
766         LOG.error("Failed close of HLog writer", e);
767         int errors = closeErrorCount.incrementAndGet();
768         if (errors <= closeErrorsTolerated && !hasUnSyncedEntries()) {
769           LOG.warn("Riding over HLog close failure! error count="+errors);
770         } else {
771           if (hasUnSyncedEntries()) {
772             LOG.error("Aborting due to unflushed edits in HLog");
773           }
774           // Failed close of log file.  Means we're losing edits.  For now,
775           // shut ourselves down to minimize loss.  Alternative is to try and
776           // keep going.  See HBASE-930.
777           FailedLogCloseException flce =
778             new FailedLogCloseException("#" + currentfilenum);
779           flce.initCause(e);
780           throw flce;
781         }
782       }
783       if (currentfilenum >= 0) {
784         oldFile = computeFilename(currentfilenum);
785       }
786     }
787     return oldFile;
788   }
789 
790   private void archiveLogFile(final Path p) throws IOException {
791     Path newPath = getHLogArchivePath(this.oldLogDir, p);
792     // Tell our listeners that a log is going to be archived.
793     if (!this.listeners.isEmpty()) {
794       for (WALActionsListener i : this.listeners) {
795         i.preLogArchive(p, newPath);
796       }
797     }
798     if (!FSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
799       throw new IOException("Unable to rename " + p + " to " + newPath);
800     }
801     // Tell our listeners that a log has been archived.
802     if (!this.listeners.isEmpty()) {
803       for (WALActionsListener i : this.listeners) {
804         i.postLogArchive(p, newPath);
805       }
806     }
807   }
808 
809   /**
810    * This is a convenience method that computes a new filename
811    * using the current HLog file-number.
812    * @return Path
813    */
814   protected Path computeFilename() {
815     return computeFilename(this.filenum);
816   }
817 
818   /**
819    * This is a convenience method that computes a new filename with a given
820    * file-number.
821    * @param filenum to use
822    * @return Path
823    */
824   protected Path computeFilename(long filenum) {
825     if (filenum < 0) {
826       throw new RuntimeException("hlog file number can't be < 0");
827     }
828     String child = prefix + "." + filenum;
829     if (forMeta) {
830       child += HLog.META_HLOG_FILE_EXTN;
831     }
832     return new Path(dir, child);
833   }
834 
835 /**
836  * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}).
837  * This helper method returns the creation timestamp from a given log file.
838  * It extracts the timestamp assuming the filename is created with the
839  * {@link #computeFilename(long filenum)} method.
840  * @param fileName
841  * @return timestamp, as in the log file name.
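 * For example, with the default "hlog" prefix, a file named {@code hlog.1384391411916}
 * (an illustrative timestamp) yields 1384391411916.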
842  */
843   protected long getFileNumFromFileName(Path fileName) {
844     if (fileName == null) throw new IllegalArgumentException("file name can't be null");
845     // The path should start with dir/<prefix>.
846     String prefixPathStr = new Path(dir, prefix + ".").toString();
847     if (!fileName.toString().startsWith(prefixPathStr)) {
848       throw new IllegalArgumentException("The log file " + fileName + " doesn't belong to" +
849       		" this regionserver " + prefixPathStr);
850     }
851     String chompedPath = fileName.toString().substring(prefixPathStr.length());
852     if (forMeta) chompedPath = chompedPath.substring(0, chompedPath.indexOf(META_HLOG_FILE_EXTN));
853     return Long.parseLong(chompedPath);
854   }
855 
856   @Override
857   public void closeAndDelete() throws IOException {
858     close();
859     if (!fs.exists(this.dir)) return;
860     FileStatus[] files = fs.listStatus(this.dir);
861     if (files != null) {
862       for(FileStatus file : files) {
863 
864         Path p = getHLogArchivePath(this.oldLogDir, file.getPath());
865         // Tell our listeners that a log is going to be archived.
866         if (!this.listeners.isEmpty()) {
867           for (WALActionsListener i : this.listeners) {
868             i.preLogArchive(file.getPath(), p);
869           }
870         }
871 
872         if (!FSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
873           throw new IOException("Unable to rename " + file.getPath() + " to " + p);
874         }
875         // Tell our listeners that a log was archived.
876         if (!this.listeners.isEmpty()) {
877           for (WALActionsListener i : this.listeners) {
878             i.postLogArchive(file.getPath(), p);
879           }
880         }
881       }
882       LOG.debug("Moved " + files.length + " WAL file(s) to " + FSUtils.getPath(this.oldLogDir));
883     }
884     if (!fs.delete(dir, true)) {
885       LOG.info("Unable to delete " + dir);
886     }
887   }
888 
889   @Override
890   public void close() throws IOException {
891     if (this.closed) {
892       return;
893     }
894 
895     try {
896       asyncNotifier.interrupt();
897       asyncNotifier.join();
898     } catch (InterruptedException e) {
899       LOG.error("Exception while waiting for " + asyncNotifier.getName() +
900           " threads to die", e);
901     }
902 
903     for (int i = 0; i < asyncSyncers.length; ++i) {
904       try {
905         asyncSyncers[i].interrupt();
906         asyncSyncers[i].join();
907       } catch (InterruptedException e) {
908         LOG.error("Exception while waiting for " + asyncSyncers[i].getName() +
909             " threads to die", e);
910       }
911     }
912 
913     try {
914       asyncWriter.interrupt();
915       asyncWriter.join();
916     } catch (InterruptedException e) {
917       LOG.error("Exception while waiting for " + asyncWriter.getName() +
918           " thread to die", e);
919     }
920 
921     try {
922       // Prevent all further flushing and rolling.
923       closeBarrier.stopAndDrainOps();
924     } catch (InterruptedException e) {
925       LOG.error("Exception while waiting for cache flushes and log rolls", e);
926       Thread.currentThread().interrupt();
927     }
928 
929     // Tell our listeners that the log is closing
930     if (!this.listeners.isEmpty()) {
931       for (WALActionsListener i : this.listeners) {
932         i.logCloseRequested();
933       }
934     }
935     synchronized (updateLock) {
936       this.closed = true;
937       if (LOG.isDebugEnabled()) {
938         LOG.debug("Closing WAL writer in " + this.dir.toString());
939       }
940       if (this.writer != null) {
941         this.writer.close();
942         this.writer = null;
943       }
944     }
945   }
946 
947   /**
948    * @param now
949    * @param encodedRegionName Encoded name of the region as returned by
950    * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
951    * @param tableName
952    * @param clusterIds that have consumed the change
953    * @return New log key.
954    */
955   protected HLogKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum,
956       long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
957     return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds, nonceGroup, nonce);
958   }
959 
960   @Override
961   @VisibleForTesting
962   public void append(HRegionInfo info, TableName tableName, WALEdit edits,
963     final long now, HTableDescriptor htd, AtomicLong sequenceId) throws IOException {
964     append(info, tableName, edits, new ArrayList<UUID>(), now, htd, true, true, sequenceId,
965         HConstants.NO_NONCE, HConstants.NO_NONCE);
966   }
967 
968   /**
969    * Append a set of edits to the log. Log edits are keyed by (encoded)
970    * regionName, rowname, and log-sequence-id.
971    *
972    * Later, if we sort by these keys, we obtain all the relevant edits for a
973    * given key-range of the HRegion (TODO). Any edits that do not have a
974    * matching COMPLETE_CACHEFLUSH message can be discarded.
975    *
976    * <p>
977    * Logs cannot be restarted once closed, or once the HLog process dies. Each
978    * time the HLog starts, it must create a new log. This means that other
979    * systems should process the log appropriately upon each startup (and prior
980    * to initializing HLog).
981    *
982    * synchronized prevents appends during the completion of a cache flush or for
983    * the duration of a log roll.
984    *
985    * @param info
986    * @param tableName
987    * @param edits
988    * @param clusterIds that have consumed the change (for replication)
989    * @param now
990    * @param doSync shall we sync?
991    * @param sequenceId of the region.
992    * @return txid of this transaction
993    * @throws IOException
994    */
995   @SuppressWarnings("deprecation")
996   private long append(HRegionInfo info, TableName tableName, WALEdit edits, List<UUID> clusterIds,
997       final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore, 
998       AtomicLong sequenceId, long nonceGroup, long nonce) throws IOException {
999       if (edits.isEmpty()) return this.unflushedEntries.get();
1000       if (this.closed) {
1001         throw new IOException("Cannot append; log is closed");
1002       }
1003       TraceScope traceScope = Trace.startSpan("FSHlog.append");
1004       try {
1005         long txid = 0;
1006         synchronized (this.updateLock) {
1007           // get the sequence number from the passed Long. In normal flow, it is coming from the
1008           // region.
1009           long seqNum = sequenceId.incrementAndGet();
1010           // The 'oldestUnflushedSeqNums' map holds the sequence number of the oldest
1011           // write for each region (i.e. the first edit added to the particular
1012           // memstore). When the cache is flushed, the entry for the
1013           // region being flushed is removed if the sequence number of the flush
1014           // is greater than or equal to the value in the map.
1015           // Use encoded name.  It's shorter, guaranteed unique and a subset of the
1016           // actual name.
1017           byte [] encodedRegionName = info.getEncodedNameAsBytes();
1018           if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
1019           HLogKey logKey = makeKey(
1020             encodedRegionName, tableName, seqNum, now, clusterIds, nonceGroup, nonce);
1021 
1022           synchronized (pendingWritesLock) {
1023             doWrite(info, logKey, edits, htd);
1024             txid = this.unflushedEntries.incrementAndGet();
1025           }
1026           this.numEntries.incrementAndGet();
1027           this.asyncWriter.setPendingTxid(txid);
1028 
1029           if (htd.isDeferredLogFlush()) {
1030             lastUnSyncedTxid = txid;
1031           }
1032           this.latestSequenceNums.put(encodedRegionName, seqNum);
1033         }
1034         // TODO: note that only tests currently call append w/sync.
1035         //       Therefore, this code here is not actually used by anything.
1036         // Sync if catalog region, and if not then check if that table supports
1037         // deferred log flushing
1038         if (doSync &&
1039             (info.isMetaRegion() ||
1040             !htd.isDeferredLogFlush())) {
1041           // sync txn to file system
1042           this.sync(txid);
1043         }
1044         return txid;
1045       } finally {
1046         traceScope.close();
1047       }
1048     }
1049 
1050   @Override
1051   public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
1052       List<UUID> clusterIds, final long now, HTableDescriptor htd, AtomicLong sequenceId,
1053       boolean isInMemstore, long nonceGroup, long nonce) throws IOException {
1054     return append(info, tableName, edits, clusterIds,
1055         now, htd, false, isInMemstore, sequenceId, nonceGroup, nonce);
1056   }
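
  // A typical caller pattern is roughly the following sketch; 'hlog', 'info', 'htd', 'edit',
  // 'clusterIds' and 'sequenceId' are assumed to be set up by the caller (e.g. an HRegion):
  //
  //   long txid = hlog.appendNoSync(info, info.getTable(), edit, clusterIds,
  //       EnvironmentEdgeManager.currentTimeMillis(), htd, sequenceId, true,
  //       HConstants.NO_NONCE, HConstants.NO_NONCE);
  //   hlog.sync(txid); // block until the edit is durable on HDFS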
1057 
1058   /* The current write process of HLog works as below:
1059    * 1). All write handler threads append edits to HLog's local pending buffer;
1060    *     (each notifies the AsyncWriter thread that there are new edits in the local buffer)
1061    * 2). All write handler threads wait in the HLog.syncer() function for the underlying threads
1062    *     to finish the sync that contains its txid;
1063    * 3). An AsyncWriter thread is responsible for retrieving all edits in HLog's
1064    *     local pending buffer and writing them to the hdfs (hlog.writer.append);
1065    *     (it notifies AsyncSyncer threads that there are new writes to hdfs which need a sync)
1066    * 4). AsyncSyncer threads are responsible for issuing sync requests to hdfs to persist the
1067    *     writes by AsyncWriter; (they notify the AsyncNotifier thread that the sync is done)
1068    * 5). An AsyncNotifier thread is responsible for notifying all pending write handler
1069    *     threads which are waiting in the HLog.syncer() function
1070    * 6). There is no LogSyncer thread any more (the AsyncWriter/AsyncSyncer threads
1071    *     do the same job it used to do)
1072    * note: more than one AsyncSyncer thread is needed here to guarantee good enough performance
1073    *       when there are few concurrent write handler threads. Since sync is the most
1074    *       time-consuming operation in the whole write process, multiple AsyncSyncer threads
1075    *       can provide better parallelism of syncs and thus better overall throughput
1076    */
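  /* A rough sketch of the pipeline described above (thread and field names as in this class):
   *
   *   handler threads --append()--> pendingWrites --AsyncWriter--> writer.append()
   *                                                                     |
   *                                                     AsyncSyncer[i]: writer.sync()
   *                                                                     |
   *   handler threads <-- notifyAll on syncedTillHere <-- AsyncNotifier <--+
   */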
1077   // thread to write locally buffered writes to HDFS
1078   private class AsyncWriter extends HasThread {
1079     private long pendingTxid = 0;
1080     private long txidToWrite = 0;
1081     private long lastWrittenTxid = 0;
1082     private Object writeLock = new Object();
1083 
1084     public AsyncWriter(String name) {
1085       super(name);
1086     }
1087 
1088     // Called by (write) handler threads to wake up the AsyncWriter thread
1089     // so it writes the buffered writes to HDFS
1090     public void setPendingTxid(long txid) {
1091       synchronized (this.writeLock) {
1092         if (txid <= this.pendingTxid)
1093           return;
1094 
1095         this.pendingTxid = txid;
1096         this.writeLock.notify();
1097       }
1098     }
1099 
1100     public void run() {
1101       try {
1102         while (!this.isInterrupted()) {
1103           // 1. wait until there are new writes in the local buffer
1104           synchronized (this.writeLock) {
1105             while (this.pendingTxid <= this.lastWrittenTxid) {
1106               this.writeLock.wait();
1107             }
1108           }
1109 
1110           // 2. get all buffered writes and update the 'real' pendingTxid,
1111           //    since newer writes may enter the buffer between AsyncWriter waking
1112           //    up and taking the lock
1113           // NOTE! can't hold 'updateLock' here since rollWriter would pend
1114           // on 'sync()' while holding 'updateLock', but 'sync()' waits for the
1115           // AsyncWriter/AsyncSyncer/AsyncNotifier chain. Without updateLock this
1116           // can lead to pendWrites holding more than pendingTxid, but that is not a problem
1117           List<Entry> pendWrites = null;
1118           synchronized (pendingWritesLock) {
1119             this.txidToWrite = unflushedEntries.get();
1120             pendWrites = pendingWrites;
1121             pendingWrites = new LinkedList<Entry>();
1122           }
1123 
1124           // 3. write all buffered writes to HDFS(append, without sync)
1125           try {
1126             for (Entry e : pendWrites) {
1127               writer.append(e);
1128             }
1129           } catch(IOException e) {
1130             LOG.error("Error while AsyncWriter write, request close of hlog ", e);
1131             requestLogRoll();
1132 
1133             asyncIOE = e;
1134             failedTxid.set(this.txidToWrite);
1135           }
1136 
1137           // 4. update 'lastWrittenTxid' and notify AsyncSyncer to do 'sync'
1138           this.lastWrittenTxid = this.txidToWrite;
1139           boolean hasIdleSyncer = false;
1140           for (int i = 0; i < asyncSyncers.length; ++i) {
1141             if (!asyncSyncers[i].isSyncing()) {
1142               hasIdleSyncer = true;
1143               asyncSyncers[i].setWrittenTxid(this.lastWrittenTxid);
1144               break;
1145             }
1146           }
1147           if (!hasIdleSyncer) {
1148             int idx = (int)(this.lastWrittenTxid % asyncSyncers.length);
1149             asyncSyncers[idx].setWrittenTxid(this.lastWrittenTxid);
1150           }
1151         }
1152       } catch (InterruptedException e) {
1153         LOG.debug(getName() + " interrupted while waiting for " +
1154             "newer writes added to local buffer");
1155       } catch (Exception e) {
1156         LOG.error("UNEXPECTED", e);
1157       } finally {
1158         LOG.info(getName() + " exiting");
1159       }
1160     }
1161   }
1162 
1163   // thread to request HDFS to sync the WALEdits written by AsyncWriter
1164   // to make those WALEdits durable on HDFS side
1165   private class AsyncSyncer extends HasThread {
1166     private long writtenTxid = 0;
1167     private long txidToSync = 0;
1168     private long lastSyncedTxid = 0;
1169     private volatile boolean isSyncing = false;
1170     private Object syncLock = new Object();
1171 
1172     public AsyncSyncer(String name) {
1173       super(name);
1174     }
1175 
1176     public boolean isSyncing() {
1177       return this.isSyncing;
1178     }
1179 
1180     // Called by the AsyncWriter thread to wake up this AsyncSyncer thread
1181     // so it syncs (flushes) in HDFS the writes written by AsyncWriter
1182     public void setWrittenTxid(long txid) {
1183       synchronized (this.syncLock) {
1184         if (txid <= this.writtenTxid)
1185           return;
1186 
1187         this.writtenTxid = txid;
1188         this.syncLock.notify();
1189       }
1190     }
1191 
1192     public void run() {
1193       try {
1194         while (!this.isInterrupted()) {
1195           // 1. wait until AsyncWriter has written data to HDFS and
1196           //    called setWrittenTxid to wake us up
1197           synchronized (this.syncLock) {
1198             while (this.writtenTxid <= this.lastSyncedTxid) {
1199               this.syncLock.wait();
1200             }
1201             this.txidToSync = this.writtenTxid;
1202           }
1203 
1204           // if this syncer's writes have been synced by other syncer:
1205           // 1. just set lastSyncedTxid
1206           // 2. don't do real sync, don't notify AsyncNotifier, don't logroll check
1207           // regardless of whether the writer is null or not
1208           if (this.txidToSync <= syncedTillHere.get()) {
1209             this.lastSyncedTxid = this.txidToSync;
1210             continue;
1211           }
1212 
1213           // 2. do 'sync' to HDFS to provide durability
1214           long now = EnvironmentEdgeManager.currentTimeMillis();
1215           try {
1216             if (writer == null) {
1217               // the only possible case where writer == null is as below:
1218               // 1. t1: AsyncWriter append writes to hdfs,
1219               //        invokes AsyncSyncer 1 with writtenTxid==100
1220               // 2. t2: AsyncWriter append writes to hdfs,
1221               //        invokes AsyncSyncer 2 with writtenTxid==200
1222               // 3. t3: rollWriter starts, it grabs the updateLock which
1223               //        prevents further writes entering pendingWrites and
1224               //        wait for all items(200) in pendingWrites to append/sync
1225               //        to hdfs
1226               // 4. t4: AsyncSyncer 2 finishes, now syncedTillHere==200
1227               // 5. t5: rollWriter close writer, set writer=null...
1228               // 6. t6: AsyncSyncer 1 starts to use writer to do sync... before
1229               //        rollWriter set writer to the newly created Writer
1230               //
1231               // Now writer == null and txidToSync > syncedTillHere here:
1232               // we need to fail all the writes with txid <= txidToSync to avoid
1233               // 'data loss' where the user gets a successful write response but can't
1234               // read the writes!
1235               LOG.fatal("should never happen: has unsynced writes but writer is null!");
1236               asyncIOE = new IOException("has unsynced writes but writer is null!");
1237               failedTxid.set(this.txidToSync);
1238             } else {
1239               this.isSyncing = true;            
1240               writer.sync();
1241               this.isSyncing = false;
1242             }
1243             postSync();
1244           } catch (IOException e) {
1245             LOG.fatal("Error while AsyncSyncer sync, request close of hlog ", e);
1246             requestLogRoll();
1247 
1248             asyncIOE = e;
1249             failedTxid.set(this.txidToSync);
1250 
1251             this.isSyncing = false;
1252           }
1253           metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now);
1254 
1255           // 3. wake up AsyncNotifier to notify(wake-up) all pending 'put'
1256           // handler threads on 'sync()'
1257           this.lastSyncedTxid = this.txidToSync;
1258           asyncNotifier.setFlushedTxid(this.lastSyncedTxid);
1259 
1260           // 4. check and do logRoll if needed
1261           boolean logRollNeeded = false;
1262           if (rollWriterLock.tryLock()) {
1263             try {
1264               logRollNeeded = checkLowReplication();
1265             } finally {
1266               rollWriterLock.unlock();
1267             }            
1268             try {
1269               if (logRollNeeded || (writer != null && writer.getLength() > logrollsize)) {
1270                 requestLogRoll();
1271               }
1272             } catch (IOException e) {
1273               LOG.warn("writer.getLength() failed; this failure won't block here");
1274             }
1275           }
1276         }
1277       } catch (InterruptedException e) {
1278         LOG.debug(getName() + " interrupted while waiting for " +
1279             "notification from AsyncWriter thread");
1280       } catch (Exception e) {
1281         LOG.error("UNEXPECTED", e);
1282       } finally {
1283         LOG.info(getName() + " exiting");
1284       }
1285     }
1286   }
1287 
1288   // thread to notify all write handler threads which are pending on
1289   // their written WALEdits' durability (sync)
1290   // an extra 'notifier' thread is needed, rather than letting the
1291   // AsyncSyncer thread itself notify when its sync is done, so that the
1292   // AsyncSyncer thread can start the next sync as soon as possible, since 'notify'
1293   // involves heavy synchronization with all pending write handler threads
1294   private class AsyncNotifier extends HasThread {
1295     private long flushedTxid = 0;
1296     private long lastNotifiedTxid = 0;
1297     private Object notifyLock = new Object();
1298 
1299     public AsyncNotifier(String name) {
1300       super(name);
1301     }
1302 
1303     public void setFlushedTxid(long txid) {
1304       synchronized (this.notifyLock) {
1305         if (txid <= this.flushedTxid) {
1306           return;
1307         }
1308 
1309         this.flushedTxid = txid;
1310         this.notifyLock.notify();
1311       }
1312     }
1313 
1314     public void run() {
1315       try {
1316         while (!this.isInterrupted()) {
1317           synchronized (this.notifyLock) {
1318             while (this.flushedTxid <= this.lastNotifiedTxid) {
1319               this.notifyLock.wait();
1320             }
1321             this.lastNotifiedTxid = this.flushedTxid;
1322           }
1323 
1324           // notify (wake up) all pending write handler threads
1325           // (or the log roller thread, which may also be pending on sync())
1326           synchronized (syncedTillHere) {
1327             syncedTillHere.set(this.lastNotifiedTxid);
1328             syncedTillHere.notifyAll();
1329           }
1330         }
1331       } catch (InterruptedException e) {
1332         LOG.debug(getName() + " interrupted while waiting for " +
1333             " notification from AsyncSyncer thread");
1334       } catch (Exception e) {
1335         LOG.error("UNEXPECTED", e);
1336       } finally {
1337         LOG.info(getName() + " exiting");
1338       }
1339     }
1340   }
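  // A minimal standalone sketch of the hand-off pattern used above (hypothetical
  // names, not actual HLog code): a syncer publishes a "synced up to" watermark
  // and waiters block until the watermark covers their txid. In the real code the
  // notifyAll() cost is pushed onto AsyncNotifier so that AsyncSyncer can return
  // to issuing the next writer.sync() immediately.
  //
  //   class SyncWatermark {
  //     private long published = 0;
  //     synchronized void publish(long txid) {          // called by the syncer/notifier
  //       if (txid > published) {
  //         published = txid;
  //         notifyAll();                                // wake blocked write handlers
  //       }
  //     }
  //     synchronized void awaitAtLeast(long txid) throws InterruptedException {
  //       while (published < txid) {                    // called by write handlers
  //         wait();
  //       }
  //     }
  //   }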
1341 
1342   // sync all known transactions
1343   private void syncer() throws IOException {
1344     syncer(this.unflushedEntries.get()); // sync all pending items
1345   }
1346 
1347   // sync all transactions up to the specified txid
1348   private void syncer(long txid) throws IOException {
1349     synchronized (this.syncedTillHere) {
1350       while (this.syncedTillHere.get() < txid) {
1351         try {
1352           this.syncedTillHere.wait();
1353 
1354           if (txid <= this.failedTxid.get()) {
1355             assert asyncIOE != null :
1356               "current txid is among(under) failed txids, but asyncIOE is null!";
1357             throw asyncIOE;
1358           }
1359         } catch (InterruptedException e) {
1360           LOG.debug("interrupted while waiting for notification from AsyncNotifier");
1361         }
1362       }
1363     }
1364   }
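  // Illustrative usage sketch (hypothetical variable names, not actual HLog code):
  // a write handler that has appended edits and obtained a transaction id (the
  // append path is not shown in this excerpt) typically blocks for durability as:
  //
  //   long txid = ...;   // txid assigned to the appended WALEdit
  //   hlog.sync(txid);   // returns once syncedTillHere >= txid, or rethrows
  //                      // asyncIOE if txid falls at or below failedTxid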
1365 
1366   @Override
1367   public void postSync() {}
1368 
1369   @Override
1370   public void postAppend(List<Entry> entries) {}
1371 
1372   /*
1373    * @return whether log roll should be requested
1374    */
1375   private boolean checkLowReplication() {
1376     boolean logRollNeeded = false;
1377     // if the number of replicas in HDFS has fallen below the configured
1378     // value, then roll logs.
1379     try {
1380       int numCurrentReplicas = getLogReplication();
1381       if (numCurrentReplicas != 0
1382           && numCurrentReplicas < this.minTolerableReplication) {
1383         if (this.lowReplicationRollEnabled) {
1384           if (this.consecutiveLogRolls.get() < this.lowReplicationRollLimit) {
1385             LOG.warn("HDFS pipeline error detected. " + "Found "
1386                 + numCurrentReplicas + " replicas but expecting no less than "
1387                 + this.minTolerableReplication + " replicas. "
1388                 + " Requesting close of hlog.");
1389             logRollNeeded = true;
1390             // If rollWriter is requested, increase consecutiveLogRolls. Once it
1391             // is larger than lowReplicationRollLimit, disable the
1392             // LowReplication-Roller
1393             this.consecutiveLogRolls.getAndIncrement();
1394           } else {
1395             LOG.warn("Too many consecutive RollWriter requests, it's a sign of "
1396                 + "the total number of live datanodes is lower than the tolerable replicas.");
1397             this.consecutiveLogRolls.set(0);
1398             this.lowReplicationRollEnabled = false;
1399           }
1400         }
1401       } else if (numCurrentReplicas >= this.minTolerableReplication) {
1402 
1403         if (!this.lowReplicationRollEnabled) {
1404           // A new writer's reported replication is always the default value,
1405           // so we should not re-enable the LowReplication-Roller yet. If
1406           // numEntries is 1 or less, we consider the writer to be new.
1407           if (this.numEntries.get() <= 1) {
1408             return logRollNeeded;
1409           }
1410           // Once the live datanode number and the replicas return to normal,
1411           // enable the LowReplication-Roller.
1412           this.lowReplicationRollEnabled = true;
1413           LOG.info("LowReplication-Roller was enabled.");
1414         }
1415       }
1416     } catch (Exception e) {
1417       LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
1418           " still proceeding ahead...");
1419     }
1420     return logRollNeeded;
1421   }
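  // Condensed sketch of the decision above (abbreviated hypothetical names,
  // logging stripped): roll at most lowReplicationRollLimit times in a row, then
  // disable the roller until replication recovers.
  //
  //   if (replicas != 0 && replicas < minTolerable && rollEnabled) {
  //     if (consecutiveRolls.get() < rollLimit) {
  //       rollNeeded = true;
  //       consecutiveRolls.getAndIncrement();
  //     } else {
  //       consecutiveRolls.set(0);
  //       rollEnabled = false;          // too few datanodes; stop rolling
  //     }
  //   } else if (replicas >= minTolerable && !rollEnabled
  //       && numEntries.get() > 1) {    // ignore a freshly opened writer
  //     rollEnabled = true;             // pipeline recovered; re-arm the roller
  //   }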
1422 
1423   /**
1424    * This method gets the datanode replication count for the current HLog.
1425    *
1426    * If the pipeline isn't started yet or is empty, you will get the default
1427    * replication factor.  Therefore, if this function returns 0, it means you
1428    * are not properly running with the HDFS-826 patch.
1429    *
1430    * @return the current number of replicas, or 0 if it cannot be determined
1431    * @throws InvocationTargetException
1432    * @throws IllegalAccessException
1433    * @throws IllegalArgumentException
1434    */
1435   int getLogReplication()
1436   throws IllegalArgumentException, IllegalAccessException, InvocationTargetException {
1437     if (this.getNumCurrentReplicas != null && this.hdfs_out != null) {
1438       Object repl = this.getNumCurrentReplicas.invoke(getOutputStream(), NO_ARGS);
1439       if (repl instanceof Integer) {
1440         return ((Integer)repl).intValue();
1441       }
1442     }
1443     return 0;
1444   }
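  // The getNumCurrentReplicas handle is resolved reflectively because older HDFS
  // client APIs do not expose it publicly. A lookup along these lines (hypothetical
  // variable names; the actual lookup lives elsewhere in this class) would populate
  // the Method reference checked above:
  //
  //   Method m = null;
  //   try {
  //     m = outputStream.getClass().getDeclaredMethod("getNumCurrentReplicas");
  //     m.setAccessible(true);
  //   } catch (NoSuchMethodException e) {
  //     m = null;   // HDFS without HDFS-826; getLogReplication() will return 0
  //   }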
1445 
1446   boolean canGetCurReplicas() {
1447     return this.getNumCurrentReplicas != null;
1448   }
1449 
1450   @Override
1451   public void hsync() throws IOException {
1452     syncer();
1453   }
1454 
1455   @Override
1456   public void hflush() throws IOException {
1457     syncer();
1458   }
1459 
1460   @Override
1461   public void sync() throws IOException {
1462     syncer();
1463   }
1464 
1465   @Override
1466   public void sync(long txid) throws IOException {
1467     syncer(txid);
1468   }
1469 
1470   private void requestLogRoll() {
1471     if (!this.listeners.isEmpty()) {
1472       for (WALActionsListener i: this.listeners) {
1473         i.logRollRequested();
1474       }
1475     }
1476   }
1477 
1478   // TODO: Remove info.  Unused.
1479   protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit,
1480       HTableDescriptor htd)
1481   throws IOException {
1482     if (!this.enabled) {
1483       return;
1484     }
1485     if (!this.listeners.isEmpty()) {
1486       for (WALActionsListener i: this.listeners) {
1487         i.visitLogEntryBeforeWrite(htd, logKey, logEdit);
1488       }
1489     }
1490     try {
1491       long now = EnvironmentEdgeManager.currentTimeMillis();
1492       // coprocessor hook:
1493       if (!coprocessorHost.preWALWrite(info, logKey, logEdit)) {
1494         if (logEdit.isReplay()) {
1495           // set replication scope null so that this won't be replicated
1496           logKey.setScopes(null);
1497         }
1498         // write to our buffer for the Hlog file.
1499         this.pendingWrites.add(new HLog.Entry(logKey, logEdit));
1500       }
1501       long took = EnvironmentEdgeManager.currentTimeMillis() - now;
1502       coprocessorHost.postWALWrite(info, logKey, logEdit);
1503       long len = 0;
1504       for (KeyValue kv : logEdit.getKeyValues()) {
1505         len += kv.getLength();
1506       }
1507       this.metrics.finishAppend(took, len);
1508     } catch (IOException e) {
1509       LOG.fatal("Could not append. Requesting close of hlog", e);
1510       requestLogRoll();
1511       throw e;
1512     }
1513   }
1514 
1515 
1516   /** @return How many items have been added to the log */
1517   int getNumEntries() {
1518     return numEntries.get();
1519   }
1520 
1521   /** @return the number of rolled log files */
1522   public int getNumRolledLogFiles() {
1523     return hlogSequenceNums.size();
1524   }
1525 
1526   /** @return the number of log files in use */
1527   @Override
1528   public int getNumLogFiles() {
1529     // +1 for the log currently in use
1530     return getNumRolledLogFiles() + 1;
1531   }
1532   
1533   /** @return the size of log files in use */
1534   @Override
1535   public long getLogFileSize() {
1536     return totalLogSize.get() + curLogSize;
1537   }
1538   
1539   @Override
1540   public boolean startCacheFlush(final byte[] encodedRegionName) {
1541     Long oldRegionSeqNum = null;
1542     if (!closeBarrier.beginOp()) {
1543       LOG.info("Flush will not be started for " + Bytes.toString(encodedRegionName) +
1544         " - because the server is closing.");
1545       return false;
1546     }
1547     synchronized (oldestSeqNumsLock) {
1548       oldRegionSeqNum = this.oldestUnflushedSeqNums.remove(encodedRegionName);
1549       if (oldRegionSeqNum != null) {
1550         Long oldValue = this.oldestFlushingSeqNums.put(encodedRegionName, oldRegionSeqNum);
1551         assert oldValue == null : "Flushing map not cleaned up for "
1552           + Bytes.toString(encodedRegionName);
1553       }
1554     }
1555     if (oldRegionSeqNum == null) {
1556       // TODO: if we have no oldRegionSeqNum, and WAL is not disabled, presumably either
1557       //       the region is already flushing (which would make this call invalid), or there
1558       //       were no appends after last flush, so why are we starting flush? Maybe we should
1559       //       assert not null, and switch to "long" everywhere. Less rigorous, but safer,
1560       //       alternative is telling the caller to stop. For now preserve old logic.
1561       LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: ["
1562         + Bytes.toString(encodedRegionName) + "]");
1563     }
1564     return true;
1565   }
1566 
1567   @Override
1568   public void completeCacheFlush(final byte [] encodedRegionName)
1569   {
1570     synchronized (oldestSeqNumsLock) {
1571       this.oldestFlushingSeqNums.remove(encodedRegionName);
1572     }
1573     closeBarrier.endOp();
1574   }
1575 
1576   @Override
1577   public void abortCacheFlush(byte[] encodedRegionName) {
1578     Long currentSeqNum = null, seqNumBeforeFlushStarts = null;
1579     synchronized (oldestSeqNumsLock) {
1580       seqNumBeforeFlushStarts = this.oldestFlushingSeqNums.remove(encodedRegionName);
1581       if (seqNumBeforeFlushStarts != null) {
1582         currentSeqNum =
1583           this.oldestUnflushedSeqNums.put(encodedRegionName, seqNumBeforeFlushStarts);
1584       }
1585     }
1586     closeBarrier.endOp();
1587     if ((currentSeqNum != null)
1588         && (currentSeqNum.longValue() <= seqNumBeforeFlushStarts.longValue())) {
1589       String errorStr = "Region " + Bytes.toString(encodedRegionName) +
1590           "acquired edits out of order current memstore seq=" + currentSeqNum
1591           + ", previous oldest unflushed id=" + seqNumBeforeFlushStarts;
1592       LOG.error(errorStr);
1593       assert false : errorStr;
1594       Runtime.getRuntime().halt(1);
1595     }
1596   }
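  // Summary sketch of the flush bookkeeping implemented by the three methods
  // above (shortened names): a region's oldest un-persisted sequence number is
  // moved between two maps keyed by the encoded region name,
  //
  //   startCacheFlush    : oldestUnflushedSeqNums.remove(r) -> oldestFlushingSeqNums.put(r, seq)
  //   completeCacheFlush : oldestFlushingSeqNums.remove(r)     // flush durable, nothing left to track
  //   abortCacheFlush    : oldestFlushingSeqNums.remove(r) -> oldestUnflushedSeqNums.put(r, seq)
  //
  // so while a flush is in flight, each region's oldest edit that still depends
  // on the WAL is recorded in exactly one of the two maps.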
1597 
1598   @Override
1599   public boolean isLowReplicationRollEnabled() {
1600     return lowReplicationRollEnabled;
1601   }
1602 
1603   /**
1604    * Get the directory we are making logs in.
1605    *
1606    * @return dir
1607    */
1608   protected Path getDir() {
1609     return dir;
1610   }
1611 
1612   static Path getHLogArchivePath(Path oldLogDir, Path p) {
1613     return new Path(oldLogDir, p.getName());
1614   }
1615 
1616   static String formatRecoveredEditsFileName(final long seqid) {
1617     return String.format("%019d", seqid);
1618   }
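  // For example, formatRecoveredEditsFileName(123L) yields "0000000000000000123"
  // (zero-padded to 19 digits), so recovered-edits file names sort
  // lexicographically in sequence-id order.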
1619 
1620   public static final long FIXED_OVERHEAD = ClassSize.align(
1621     ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
1622     ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
1623 
1624   private static void usage() {
1625     System.err.println("Usage: HLog <ARGS>");
1626     System.err.println("Arguments:");
1627     System.err.println(" --dump  Dump textual representation of passed one or more files");
1628     System.err.println("         For example: HLog --dump hdfs://example.com:9000/hbase/.logs/MACHINE/LOGFILE");
1629     System.err.println(" --split Split the passed directory of WAL logs");
1630     System.err.println("         For example: HLog --split hdfs://example.com:9000/hbase/.logs/DIR");
1631   }
1632 
1633   private static void split(final Configuration conf, final Path p)
1634   throws IOException {
1635     FileSystem fs = FileSystem.get(conf);
1636     if (!fs.exists(p)) {
1637       throw new FileNotFoundException(p.toString());
1638     }
1639     if (!fs.getFileStatus(p).isDir()) {
1640       throw new IOException(p + " is not a directory");
1641     }
1642 
1643     final Path baseDir = FSUtils.getRootDir(conf);
1644     final Path oldLogDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
1645     HLogSplitter.split(baseDir, p, oldLogDir, fs, conf);
1646   }
1647 
1648   @Override
1649   public WALCoprocessorHost getCoprocessorHost() {
1650     return coprocessorHost;
1651   }
1652 
1653   /** Exposes, for tests, whether any appended entries have not yet been synced. */
1654   boolean hasUnSyncedEntries() {
1655     return this.lastUnSyncedTxid > this.syncedTillHere.get();
1656   }
1657 
1658   @Override
1659   public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
1660     Long result = oldestUnflushedSeqNums.get(encodedRegionName);
1661     return result == null ? HConstants.NO_SEQNUM : result.longValue();
1662   }
1663 
1664   /**
1665    * Pass one or more log file names and it will either dump out a text version
1666    * on <code>stdout</code> or split the specified log files.
1667    *
1668    * @param args
1669    * @throws IOException
1670    */
1671   public static void main(String[] args) throws IOException {
1672     if (args.length < 2) {
1673       usage();
1674       System.exit(-1);
1675     }
1676     // either dump using the HLogPrettyPrinter or split, depending on args
1677     if (args[0].compareTo("--dump") == 0) {
1678       HLogPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
1679     } else if (args[0].compareTo("--split") == 0) {
1680       Configuration conf = HBaseConfiguration.create();
1681       for (int i = 1; i < args.length; i++) {
1682         try {
1683           Path logPath = new Path(args[i]);
1684           FSUtils.setFsDefault(conf, logPath);
1685           split(conf, logPath);
1686         } catch (Throwable t) {
1687           t.printStackTrace(System.err);
1688           System.exit(-1);
1689         }
1690       }
1691     } else {
1692       usage();
1693       System.exit(-1);
1694     }
1695   }
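  // Illustrative invocation sketch (assuming the standard 'hbase' launcher script
  // is on the PATH; paths are the examples printed by usage()):
  //
  //   hbase org.apache.hadoop.hbase.regionserver.wal.HLog --dump \
  //       hdfs://example.com:9000/hbase/.logs/MACHINE/LOGFILE
  //
  // The --dump branch routes to HLogPrettyPrinter, while --split hands each
  // passed directory to HLogSplitter via split(conf, path).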
1696 }