1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.regionserver.wal;
22
23 import java.io.IOException;
24 import java.util.Arrays;
25 import java.io.InterruptedIOException;
26 import java.util.List;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.fs.FSDataInputStream;
32 import org.apache.hadoop.fs.FileSystem;
33 import org.apache.hadoop.fs.Path;
34 import org.apache.hadoop.hbase.HConstants;
35 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
36 import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
37 import org.apache.hadoop.hbase.util.CancelableProgressable;
38 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
39
40 public class HLogFactory {
41 private static final Log LOG = LogFactory.getLog(HLogFactory.class);
42
43 public static HLog createHLog(final FileSystem fs, final Path root, final String logName,
44 final Configuration conf) throws IOException {
45 return new FSHLog(fs, root, logName, conf);
46 }
47
48 public static HLog createHLog(final FileSystem fs, final Path root, final String logName,
49 final String oldLogName, final Configuration conf) throws IOException {
50 return new FSHLog(fs, root, logName, oldLogName, conf);
51 }
52
53 public static HLog createHLog(final FileSystem fs, final Path root, final String logName,
54 final Configuration conf, final List<WALActionsListener> listeners,
55 final String prefix) throws IOException {
56 return new FSHLog(fs, root, logName, conf, listeners, prefix);
57 }
58
59 public static HLog createMetaHLog(final FileSystem fs, final Path root, final String logName,
60 final Configuration conf, final List<WALActionsListener> listeners,
61 final String prefix) throws IOException {
62 return new FSHLog(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME,
63 conf, listeners, false, prefix, true);
64 }
65
66
67
68
69 private static Class<? extends Reader> logReaderClass;
70
71 static void resetLogReaderClass() {
72 logReaderClass = null;
73 }
74
75 public static HLog.Reader createReader(final FileSystem fs,
76 final Path path, Configuration conf) throws IOException {
77 return createReader(fs, path, conf, null);
78 }
79
80
81
82
83
84
85
86
87 public static HLog.Reader createReader(final FileSystem fs, final Path path,
88 Configuration conf, CancelableProgressable reporter) throws IOException {
89 return createReader(fs, path, conf, reporter, true);
90 }
91
92 public static HLog.Reader createReader(final FileSystem fs, final Path path,
93 Configuration conf, CancelableProgressable reporter, boolean allowCustom)
94 throws IOException {
95 if (allowCustom && (logReaderClass == null)) {
96 logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
97 ProtobufLogReader.class, Reader.class);
98 }
99 Class<? extends Reader> lrClass = allowCustom ? logReaderClass : ProtobufLogReader.class;
100
101 try {
102
103
104
105 long startWaiting = EnvironmentEdgeManager.currentTimeMillis();
106 long openTimeout = conf.getInt("hbase.hlog.open.timeout", 300000) + startWaiting;
107 int nbAttempt = 0;
108 while (true) {
109 try {
110 if (lrClass != ProtobufLogReader.class) {
111
112 HLog.Reader reader = lrClass.newInstance();
113 reader.init(fs, path, conf, null);
114 return reader;
115 } else {
116 FSDataInputStream stream = fs.open(path);
117
118
119
120
121 byte[] magic = new byte[ProtobufLogReader.PB_WAL_MAGIC.length];
122 boolean isPbWal = (stream.read(magic) == magic.length)
123 && Arrays.equals(magic, ProtobufLogReader.PB_WAL_MAGIC);
124 HLog.Reader reader =
125 isPbWal ? new ProtobufLogReader() : new SequenceFileLogReader();
126 reader.init(fs, path, conf, stream);
127 return reader;
128 }
129 } catch (IOException e) {
130 String msg = e.getMessage();
131 if (msg != null && (msg.contains("Cannot obtain block length")
132 || msg.contains("Could not obtain the last block")
133 || msg.matches("Blocklist for [^ ]* has changed.*"))) {
134 if (++nbAttempt == 1) {
135 LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
136 }
137 if (reporter != null && !reporter.progress()) {
138 throw new InterruptedIOException("Operation is cancelled");
139 }
140 if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTimeMillis()) {
141 LOG.error("Can't open after " + nbAttempt + " attempts and "
142 + (EnvironmentEdgeManager.currentTimeMillis() - startWaiting)
143 + "ms " + " for " + path);
144 } else {
145 try {
146 Thread.sleep(nbAttempt < 3 ? 500 : 1000);
147 continue;
148 } catch (InterruptedException ie) {
149 InterruptedIOException iioe = new InterruptedIOException();
150 iioe.initCause(ie);
151 throw iioe;
152 }
153 }
154 }
155 throw e;
156 }
157 }
158 } catch (IOException ie) {
159 throw ie;
160 } catch (Exception e) {
161 throw new IOException("Cannot get log reader", e);
162 }
163 }
164
165
166
167
168 private static Class<? extends Writer> logWriterClass;
169
170
171
172
173
174
175 public static HLog.Writer createWALWriter(final FileSystem fs,
176 final Path path, Configuration conf) throws IOException {
177 return createWriter(fs, path, conf, false);
178 }
179
180 public static HLog.Writer createRecoveredEditsWriter(final FileSystem fs,
181 final Path path, Configuration conf) throws IOException {
182 return createWriter(fs, path, conf, true);
183 }
184
185 private static HLog.Writer createWriter(final FileSystem fs,
186 final Path path, Configuration conf, boolean overwritable)
187 throws IOException {
188 try {
189 if (logWriterClass == null) {
190 logWriterClass = conf.getClass("hbase.regionserver.hlog.writer.impl",
191 ProtobufLogWriter.class, Writer.class);
192 }
193 HLog.Writer writer = (HLog.Writer)logWriterClass.newInstance();
194 writer.init(fs, path, conf, overwritable);
195 return writer;
196 } catch (Exception e) {
197 throw new IOException("cannot get log writer", e);
198 }
199 }
200 }