1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.snapshot;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.util.Map;
24 import java.util.TreeMap;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.classification.InterfaceAudience;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.fs.FileSystem;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.hbase.TableName;
33 import org.apache.hadoop.hbase.io.HLogLink;
34 import org.apache.hadoop.hbase.regionserver.HRegion;
35 import org.apache.hadoop.hbase.regionserver.wal.HLog;
36 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
37 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
38 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
39 import org.apache.hadoop.hbase.util.Bytes;
40 import org.apache.hadoop.hbase.util.FSUtils;
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57 @InterfaceAudience.Private
58 class SnapshotLogSplitter implements Closeable {
59 static final Log LOG = LogFactory.getLog(SnapshotLogSplitter.class);
60
61 private final class LogWriter implements Closeable {
62 private HLog.Writer writer;
63 private Path logFile;
64 private long seqId;
65
66 public LogWriter(final Configuration conf, final FileSystem fs,
67 final Path logDir, long seqId) throws IOException {
68 logFile = new Path(logDir, logFileName(seqId, true));
69 this.writer = HLogFactory.createRecoveredEditsWriter(fs, logFile, conf);
70 this.seqId = seqId;
71 }
72
73 public void close() throws IOException {
74 writer.close();
75
76 Path finalFile = new Path(logFile.getParent(), logFileName(seqId, false));
77 LOG.debug("LogWriter tmpLogFile=" + logFile + " -> logFile=" + finalFile);
78 fs.rename(logFile, finalFile);
79 }
80
81 public void append(final HLog.Entry entry) throws IOException {
82 writer.append(entry);
83 if (seqId < entry.getKey().getLogSeqNum()) {
84 seqId = entry.getKey().getLogSeqNum();
85 }
86 }
87
88 private String logFileName(long seqId, boolean temp) {
89 String fileName = String.format("%019d", seqId);
90 if (temp) fileName += HLog.RECOVERED_LOG_TMPFILE_SUFFIX;
91 return fileName;
92 }
93 }
94
95 private final Map<byte[], LogWriter> regionLogWriters =
96 new TreeMap<byte[], LogWriter>(Bytes.BYTES_COMPARATOR);
97
98 private final Map<byte[], byte[]> regionsMap;
99 private final Configuration conf;
100 private final TableName snapshotTableName;
101 private final TableName tableName;
102 private final Path tableDir;
103 private final FileSystem fs;
104
105
106
107
108
109 public SnapshotLogSplitter(final Configuration conf, final FileSystem fs,
110 final Path tableDir, final TableName snapshotTableName,
111 final Map<byte[], byte[]> regionsMap) {
112 this.regionsMap = regionsMap;
113 this.snapshotTableName = snapshotTableName;
114 this.tableName = FSUtils.getTableName(tableDir);
115 this.tableDir = tableDir;
116 this.conf = conf;
117 this.fs = fs;
118 }
119
120 public void close() throws IOException {
121 for (LogWriter writer: regionLogWriters.values()) {
122 writer.close();
123 }
124 }
125
126 public void splitLog(final String serverName, final String logfile) throws IOException {
127 LOG.debug("Restore log=" + logfile + " server=" + serverName +
128 " for snapshotTable=" + snapshotTableName +
129 " to table=" + tableName);
130 splitLog(new HLogLink(conf, serverName, logfile).getAvailablePath(fs));
131 }
132
133 public void splitRecoveredEdit(final Path editPath) throws IOException {
134 LOG.debug("Restore recover.edits=" + editPath +
135 " for snapshotTable=" + snapshotTableName +
136 " to table=" + tableName);
137 splitLog(editPath);
138 }
139
140
141
142
143
144
145
146
147
148 public void splitLog(final Path logPath) throws IOException {
149 HLog.Reader log = HLogFactory.createReader(fs, logPath, conf);
150 try {
151 HLog.Entry entry;
152 LogWriter writer = null;
153 byte[] regionName = null;
154 byte[] newRegionName = null;
155 while ((entry = log.next()) != null) {
156 HLogKey key = entry.getKey();
157
158
159 if (!key.getTablename().equals(snapshotTableName)) continue;
160
161
162 if (!Bytes.equals(regionName, key.getEncodedRegionName())) {
163 regionName = key.getEncodedRegionName().clone();
164
165
166 newRegionName = regionsMap.get(regionName);
167 if (newRegionName == null) newRegionName = regionName;
168
169 writer = getOrCreateWriter(newRegionName, key.getLogSeqNum());
170 LOG.debug("+ regionName=" + Bytes.toString(regionName));
171 }
172
173
174 key = new HLogKey(newRegionName, tableName, key.getLogSeqNum(), key.getWriteTime(),
175 key.getClusterIds(), key.getNonceGroup(), key.getNonce());
176 writer.append(new HLog.Entry(key, entry.getEdit()));
177 }
178 } catch (IOException e) {
179 LOG.warn("Something wrong during the log split", e);
180 } finally {
181 log.close();
182 }
183 }
184
185
186
187
188 private LogWriter getOrCreateWriter(final byte[] regionName, long seqId) throws IOException {
189 LogWriter writer = regionLogWriters.get(regionName);
190 if (writer == null) {
191 Path regionDir = HRegion.getRegionDir(tableDir, Bytes.toString(regionName));
192 Path dir = HLogUtil.getRegionDirRecoveredEditsDir(regionDir);
193 fs.mkdirs(dir);
194
195 writer = new LogWriter(conf, fs, dir, seqId);
196 regionLogWriters.put(regionName, writer);
197 }
198 return(writer);
199 }
200 }