1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.wal;
20
21 import java.io.EOFException;
22 import java.io.FileNotFoundException;
23 import java.io.IOException;
24 import java.io.InterruptedIOException;
25 import java.text.ParseException;
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.HashSet;
29 import java.util.LinkedList;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.NavigableSet;
33 import java.util.Set;
34 import java.util.TreeMap;
35 import java.util.TreeSet;
36 import java.util.UUID;
37 import java.util.concurrent.Callable;
38 import java.util.concurrent.CompletionService;
39 import java.util.concurrent.ConcurrentHashMap;
40 import java.util.concurrent.ExecutionException;
41 import java.util.concurrent.ExecutorCompletionService;
42 import java.util.concurrent.Future;
43 import java.util.concurrent.ThreadFactory;
44 import java.util.concurrent.ThreadPoolExecutor;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicBoolean;
47 import java.util.concurrent.atomic.AtomicLong;
48 import java.util.concurrent.atomic.AtomicReference;
49 import java.util.regex.Matcher;
50 import java.util.regex.Pattern;
51
52 import org.apache.commons.logging.Log;
53 import org.apache.commons.logging.LogFactory;
54 import org.apache.hadoop.conf.Configuration;
55 import org.apache.hadoop.fs.FileAlreadyExistsException;
56 import org.apache.hadoop.fs.FileStatus;
57 import org.apache.hadoop.fs.FileSystem;
58 import org.apache.hadoop.fs.Path;
59 import org.apache.hadoop.fs.PathFilter;
60 import org.apache.hadoop.hbase.Cell;
61 import org.apache.hadoop.hbase.CellScanner;
62 import org.apache.hadoop.hbase.CellUtil;
63 import org.apache.hadoop.hbase.CoordinatedStateException;
64 import org.apache.hadoop.hbase.CoordinatedStateManager;
65 import org.apache.hadoop.hbase.HBaseConfiguration;
66 import org.apache.hadoop.hbase.HConstants;
67 import org.apache.hadoop.hbase.HRegionInfo;
68 import org.apache.hadoop.hbase.HRegionLocation;
69 import org.apache.hadoop.hbase.RemoteExceptionHandler;
70 import org.apache.hadoop.hbase.ServerName;
71 import org.apache.hadoop.hbase.TableName;
72 import org.apache.hadoop.hbase.TableNotFoundException;
73 import org.apache.hadoop.hbase.TableStateManager;
74 import org.apache.hadoop.hbase.classification.InterfaceAudience;
75 import org.apache.hadoop.hbase.client.ConnectionUtils;
76 import org.apache.hadoop.hbase.client.Delete;
77 import org.apache.hadoop.hbase.client.Durability;
78 import org.apache.hadoop.hbase.client.HConnection;
79 import org.apache.hadoop.hbase.client.HConnectionManager;
80 import org.apache.hadoop.hbase.client.Mutation;
81 import org.apache.hadoop.hbase.client.Put;
82 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
83 import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
84 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
85 import org.apache.hadoop.hbase.io.HeapSize;
86 import org.apache.hadoop.hbase.master.SplitLogManager;
87 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
88 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
89 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
90 import org.apache.hadoop.hbase.protobuf.RequestConverter;
91 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
92 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
93 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
94 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
95 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
96 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
97 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
98 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
99 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
100 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
101 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
102 import org.apache.hadoop.hbase.regionserver.HRegion;
103 import org.apache.hadoop.hbase.regionserver.LastSequenceId;
104
105 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
106 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
107 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
108 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
109 import org.apache.hadoop.hbase.regionserver.wal.WALEditsReplaySink;
110 import org.apache.hadoop.hbase.util.Bytes;
111 import org.apache.hadoop.hbase.util.CancelableProgressable;
112 import org.apache.hadoop.hbase.util.ClassSize;
113 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
114 import org.apache.hadoop.hbase.util.FSUtils;
115 import org.apache.hadoop.hbase.util.Pair;
116 import org.apache.hadoop.hbase.util.Threads;
117 import org.apache.hadoop.hbase.wal.WAL.Entry;
118 import org.apache.hadoop.hbase.wal.WAL.Reader;
119 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
120 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
121 import org.apache.hadoop.io.MultipleIOException;
122
123 import com.google.common.annotations.VisibleForTesting;
124 import com.google.common.base.Preconditions;
125 import com.google.common.collect.Lists;
126 import com.google.protobuf.ServiceException;
127 import com.google.protobuf.TextFormat;
128
129
130
131
132
133
134 @InterfaceAudience.Private
135 public class WALSplitter {
  private static final Log LOG = LogFactory.getLog(WALSplitter.class);

  // Default for "hbase.hlog.split.skip.errors": fail the split on parse errors.
  public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;

  // Immutable context for this split: HBase root dir, filesystem, and config.
  protected final Path rootDir;
  protected final FileSystem fs;
  protected final Configuration conf;

  // Major subcomponents of the split process. The controller carries errors and
  // the data-availability monitor; entries flow producer -> entryBuffers -> outputSink.
  PipelineController controller;
  OutputSink outputSink;
  EntryBuffers entryBuffers;

  // Tables in DISABLED/DISABLING state at split time (populated from the
  // coordination manager's TableStateManager when one is available).
  private Set<TableName> disablingOrDisabledTables =
      new HashSet<TableName>();
  // Coordination manager; null when splitting standalone (see static split()).
  private BaseCoordinatedStateManager csm;
  private final WALFactory walFactory;

  // Task shown in the monitoring UI for this split; created in splitLogFile().
  private MonitoredTask status;

  // Source for the last-flushed sequence id of a region; may be null.
  protected final LastSequenceId sequenceIdChecker;

  // True when running in distributed log replay mode (forced false if csm is null).
  protected boolean distributedLogReplay;

  // Map encodedRegionName -> lastFlushedSequenceId; edits at or below this are skipped.
  protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<String, Long>();

  // Map encodedRegionName -> (family -> max sequence id flushed for that store).
  protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores =
      new ConcurrentHashMap<String, Map<byte[], Long>>();

  // Server name parsed from the WAL directory being split; "" if not derivable.
  protected String failedServerName = "";

  // Number of writer threads draining entryBuffers into the output sink.
  private final int numWriterThreads;

  // Minimum number of edits batched per replay RPC (distributed log replay only).
  private final int minBatchSize;
179
  /**
   * Create a splitter for one WAL file.
   *
   * @param factory used to create WAL readers and recovered-edits writers
   * @param conf configuration; a private copy is made so the codec override below
   *     does not leak back to the caller
   * @param rootDir HBase root directory
   * @param fs filesystem holding the logs
   * @param idChecker source for last-flushed sequence ids; may be null
   * @param csm coordination manager; when null, distributed log replay is disabled
   *     and plain log splitting (recovered.edits output) is used
   * @param mode recovery mode (LOG_SPLITTING or LOG_REPLAY)
   */
  WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
      FileSystem fs, LastSequenceId idChecker,
      CoordinatedStateManager csm, RecoveryMode mode) {
    this.conf = HBaseConfiguration.create(conf);
    // Propagate the WAL cell codec into the RPC codec key so replayed edits
    // are encoded with the same codec the WAL was written with.
    String codecClassName = conf
        .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
    this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
    this.rootDir = rootDir;
    this.fs = fs;
    this.sequenceIdChecker = idChecker;
    this.csm = (BaseCoordinatedStateManager)csm;
    this.walFactory = factory;
    this.controller = new PipelineController();

    // Backpressure limit for buffered-but-unwritten edits (default 128 MB).
    entryBuffers = new EntryBuffers(controller,
        this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
            128*1024*1024));

    // Minimum batch size for replay RPCs in distributed log replay mode.
    this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
    this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);

    this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
    // Distributed log replay requires a coordination manager; without one we
    // fall back to classic splitting into recovered.edits files.
    if (csm != null && this.distributedLogReplay) {
      outputSink = new LogReplayOutputSink(controller, entryBuffers, numWriterThreads);
    } else {
      if (this.distributedLogReplay) {
        LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly.");
      }
      this.distributedLogReplay = false;
      outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
    }

  }
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232 public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
233 Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
234 CoordinatedStateManager cp, RecoveryMode mode, final WALFactory factory) throws IOException {
235 WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, idChecker, cp, mode);
236 return s.splitLogFile(logfile, reporter);
237 }
238
239
240
241
242
243 public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
244 FileSystem fs, Configuration conf, final WALFactory factory) throws IOException {
245 final FileStatus[] logfiles = SplitLogManager.getFileList(conf,
246 Collections.singletonList(logDir), null);
247 List<Path> splits = new ArrayList<Path>();
248 if (logfiles != null && logfiles.length > 0) {
249 for (FileStatus logfile: logfiles) {
250 WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null,
251 RecoveryMode.LOG_SPLITTING);
252 if (s.splitLogFile(logfile, null)) {
253 finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
254 if (s.outputSink.splits != null) {
255 splits.addAll(s.outputSink.splits);
256 }
257 }
258 }
259 }
260 if (!fs.delete(logDir, true)) {
261 throw new IOException("Unable to delete src dir: " + logDir);
262 }
263 return splits;
264 }
265
266
267
268
269
270 boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException {
271 Preconditions.checkState(status == null);
272 Preconditions.checkArgument(logfile.isFile(),
273 "passed in file status is for something other than a regular file.");
274 boolean isCorrupted = false;
275 boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
276 SPLIT_SKIP_ERRORS_DEFAULT);
277 int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
278 Path logPath = logfile.getPath();
279 boolean outputSinkStarted = false;
280 boolean progress_failed = false;
281 int editsCount = 0;
282 int editsSkipped = 0;
283
284 status =
285 TaskMonitor.get().createStatus(
286 "Splitting log file " + logfile.getPath() + "into a temporary staging area.");
287 Reader in = null;
288 try {
289 long logLength = logfile.getLen();
290 LOG.info("Splitting wal: " + logPath + ", length=" + logLength);
291 LOG.info("DistributedLogReplay = " + this.distributedLogReplay);
292 status.setStatus("Opening log file");
293 if (reporter != null && !reporter.progress()) {
294 progress_failed = true;
295 return false;
296 }
297 try {
298 in = getReader(logfile, skipErrors, reporter);
299 } catch (CorruptedLogFileException e) {
300 LOG.warn("Could not get reader, corrupted log file " + logPath, e);
301 ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
302 isCorrupted = true;
303 }
304 if (in == null) {
305 LOG.warn("Nothing to split in log file " + logPath);
306 return true;
307 }
308 if (csm != null) {
309 try {
310 TableStateManager tsm = csm.getTableStateManager();
311 disablingOrDisabledTables = tsm.getTablesInStates(
312 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);
313 } catch (CoordinatedStateException e) {
314 throw new IOException("Can't get disabling/disabled tables", e);
315 }
316 }
317 int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
318 int numOpenedFilesLastCheck = 0;
319 outputSink.setReporter(reporter);
320 outputSink.startWriterThreads();
321 outputSinkStarted = true;
322 Entry entry;
323 Long lastFlushedSequenceId = -1L;
324 ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(logPath);
325 failedServerName = (serverName == null) ? "" : serverName.getServerName();
326 while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
327 byte[] region = entry.getKey().getEncodedRegionName();
328 String encodedRegionNameAsStr = Bytes.toString(region);
329 lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
330 if (lastFlushedSequenceId == null) {
331 if (this.distributedLogReplay) {
332 RegionStoreSequenceIds ids =
333 csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
334 encodedRegionNameAsStr);
335 if (ids != null) {
336 lastFlushedSequenceId = ids.getLastFlushedSequenceId();
337 if (LOG.isDebugEnabled()) {
338 LOG.debug("DLR Last flushed sequenceid for " + encodedRegionNameAsStr + ": " +
339 TextFormat.shortDebugString(ids));
340 }
341 }
342 } else if (sequenceIdChecker != null) {
343 RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
344 Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
345 for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
346 maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
347 storeSeqId.getSequenceId());
348 }
349 regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
350 lastFlushedSequenceId = ids.getLastFlushedSequenceId();
351 if (LOG.isDebugEnabled()) {
352 LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": " +
353 TextFormat.shortDebugString(ids));
354 }
355 }
356 if (lastFlushedSequenceId == null) {
357 lastFlushedSequenceId = -1L;
358 }
359 lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId);
360 }
361 if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
362 editsSkipped++;
363 continue;
364 }
365
366 if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvents()) {
367 editsSkipped++;
368 continue;
369 }
370 entryBuffers.appendEntry(entry);
371 editsCount++;
372 int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
373
374 if (editsCount % interval == 0
375 || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
376 numOpenedFilesLastCheck = this.getNumOpenWriters();
377 String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits()))
378 + " edits, skipped " + editsSkipped + " edits.";
379 status.setStatus("Split " + countsStr);
380 if (reporter != null && !reporter.progress()) {
381 progress_failed = true;
382 return false;
383 }
384 }
385 }
386 } catch (InterruptedException ie) {
387 IOException iie = new InterruptedIOException();
388 iie.initCause(ie);
389 throw iie;
390 } catch (CorruptedLogFileException e) {
391 LOG.warn("Could not parse, corrupted log file " + logPath, e);
392 csm.getSplitLogWorkerCoordination().markCorrupted(rootDir,
393 logfile.getPath().getName(), fs);
394 isCorrupted = true;
395 } catch (IOException e) {
396 e = RemoteExceptionHandler.checkIOException(e);
397 throw e;
398 } finally {
399 LOG.debug("Finishing writing output logs and closing down.");
400 try {
401 if (null != in) {
402 in.close();
403 }
404 } catch (IOException exception) {
405 LOG.warn("Could not close wal reader: " + exception.getMessage());
406 LOG.debug("exception details", exception);
407 }
408 try {
409 if (outputSinkStarted) {
410
411
412 progress_failed = true;
413 progress_failed = outputSink.finishWritingAndClose() == null;
414 }
415 } finally {
416 String msg =
417 "Processed " + editsCount + " edits across " + outputSink.getNumberOfRecoveredRegions()
418 + " regions; edits skipped=" + editsSkipped + "; log file=" + logPath +
419 ", length=" + logfile.getLen() +
420 ", corrupted=" + isCorrupted + ", progress failed=" + progress_failed;
421 LOG.info(msg);
422 status.markComplete(msg);
423 }
424 }
425 return !progress_failed;
426 }
427
428
429
430
431
432
433
434
435
436
437
438
439 public static void finishSplitLogFile(String logfile,
440 Configuration conf) throws IOException {
441 Path rootdir = FSUtils.getRootDir(conf);
442 Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
443 Path logPath;
444 if (FSUtils.isStartingWithPath(rootdir, logfile)) {
445 logPath = new Path(logfile);
446 } else {
447 logPath = new Path(rootdir, logfile);
448 }
449 finishSplitLogFile(rootdir, oldLogDir, logPath, conf);
450 }
451
452 static void finishSplitLogFile(Path rootdir, Path oldLogDir,
453 Path logPath, Configuration conf) throws IOException {
454 List<Path> processedLogs = new ArrayList<Path>();
455 List<Path> corruptedLogs = new ArrayList<Path>();
456 FileSystem fs;
457 fs = rootdir.getFileSystem(conf);
458 if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
459 corruptedLogs.add(logPath);
460 } else {
461 processedLogs.add(logPath);
462 }
463 archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
464 Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
465 fs.delete(stagingDir, true);
466 }
467
468
469
470
471
472
473
474
475
476
477
478
479
480 private static void archiveLogs(
481 final List<Path> corruptedLogs,
482 final List<Path> processedLogs, final Path oldLogDir,
483 final FileSystem fs, final Configuration conf) throws IOException {
484 final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
485 "hbase.regionserver.hlog.splitlog.corrupt.dir", HConstants.CORRUPT_DIR_NAME));
486
487 if (!fs.mkdirs(corruptDir)) {
488 LOG.info("Unable to mkdir " + corruptDir);
489 }
490 fs.mkdirs(oldLogDir);
491
492
493
494 for (Path corrupted : corruptedLogs) {
495 Path p = new Path(corruptDir, corrupted.getName());
496 if (fs.exists(corrupted)) {
497 if (!fs.rename(corrupted, p)) {
498 LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
499 } else {
500 LOG.warn("Moved corrupted log " + corrupted + " to " + p);
501 }
502 }
503 }
504
505 for (Path p : processedLogs) {
506 Path newPath = FSHLog.getWALArchivePath(oldLogDir, p);
507 if (fs.exists(p)) {
508 if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) {
509 LOG.warn("Unable to move " + p + " to " + newPath);
510 } else {
511 LOG.info("Archived processed log " + p + " to " + newPath);
512 }
513 }
514 }
515 }
516
517
518
519
520
521
522
523
524
525
526
527
528
  /**
   * Compute the path of the temporary recovered-edits file for the region named
   * in {@code logEntry}, i.e. a ".temp"-suffixed file under the region's
   * recovered.edits directory whose base name is the entry's sequence id.
   *
   * @param fs filesystem
   * @param logEntry WAL entry whose table/region/seqid determine the path
   * @param rootDir HBase root directory
   * @param isCreate if true, create the recovered.edits directory when missing
   * @return the path to write split edits to, or null when the region directory
   *     no longer exists (the region was likely already split, so the edits can
   *     safely be discarded)
   * @throws IOException on filesystem failure
   */
  @SuppressWarnings("deprecation")
  static Path getRegionSplitEditsPath(final FileSystem fs,
      final Entry logEntry, final Path rootDir, boolean isCreate)
      throws IOException {
    Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTablename());
    String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
    Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName);
    Path dir = getRegionDirRecoveredEditsDir(regiondir);

    if (!fs.exists(regiondir)) {
      LOG.info("This region's directory doesn't exist: "
          + regiondir.toString() + ". It is very likely that it was" +
          " already split so it's safe to discard those edits.");
      return null;
    }
    // An old installation may have left a plain FILE where the recovered.edits
    // DIRECTORY should be; sideline it under /tmp so the directory can be made.
    if (fs.exists(dir) && fs.isFile(dir)) {
      Path tmp = new Path("/tmp");
      if (!fs.exists(tmp)) {
        fs.mkdirs(tmp);
      }
      tmp = new Path(tmp,
          HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
      LOG.warn("Found existing old file: " + dir + ". It could be some "
          + "leftover of an old installation. It should be a folder instead. "
          + "So moving it to " + tmp);
      if (!fs.rename(dir, tmp)) {
        LOG.warn("Failed to sideline old file " + dir);
      }
    }

    if (isCreate && !fs.exists(dir)) {
      if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
    }
    // The ".temp" suffix marks the file as in-progress; it is renamed to the
    // bare sequence-id name once the region's writing is finished.
    String fileName = formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum());
    fileName = getTmpRecoveredEditsFileName(fileName);
    return new Path(dir, fileName);
  }
568
569 static String getTmpRecoveredEditsFileName(String fileName) {
570 return fileName + RECOVERED_LOG_TMPFILE_SUFFIX;
571 }
572
573
574
575
576
577
578
579
580
581 static Path getCompletedRecoveredEditsFilePath(Path srcPath,
582 Long maximumEditLogSeqNum) {
583 String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
584 return new Path(srcPath.getParent(), fileName);
585 }
586
587 static String formatRecoveredEditsFileName(final long seqid) {
588 return String.format("%019d", seqid);
589 }
590
  // Completed recovered-edits file names are purely numeric (the max sequence
  // id, possibly negative); in-progress files carry the ".temp" suffix.
  private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
  private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
593
594
595
596
597
598
599
  /**
   * @param regiondir the region's directory in the filesystem
   * @return the directory holding recovered-edits files for that region
   */
  public static Path getRegionDirRecoveredEditsDir(final Path regiondir) {
    return new Path(regiondir, HConstants.RECOVERED_EDITS_DIR);
  }
603
604
605
606
607
608
609
610
611
612
613 public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs,
614 final Path regiondir) throws IOException {
615 NavigableSet<Path> filesSorted = new TreeSet<Path>();
616 Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
617 if (!fs.exists(editsdir))
618 return filesSorted;
619 FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
620 @Override
621 public boolean accept(Path p) {
622 boolean result = false;
623 try {
624
625
626
627
628 Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
629 result = fs.isFile(p) && m.matches();
630
631
632 if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
633 result = false;
634 }
635
636 if (isSequenceIdFile(p)) {
637 result = false;
638 }
639 } catch (IOException e) {
640 LOG.warn("Failed isFile check on " + p);
641 }
642 return result;
643 }
644 });
645 if (files == null) {
646 return filesSorted;
647 }
648 for (FileStatus status : files) {
649 filesSorted.add(status.getPath());
650 }
651 return filesSorted;
652 }
653
654
655
656
657
658
659
660
661
662
663 public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits)
664 throws IOException {
665 Path moveAsideName = new Path(edits.getParent(), edits.getName() + "."
666 + System.currentTimeMillis());
667 if (!fs.rename(edits, moveAsideName)) {
668 LOG.warn("Rename failed from " + edits + " to " + moveAsideName);
669 }
670 return moveAsideName;
671 }
672
  // Suffix of the marker file recording a region's open sequence id.
  private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid";
  // Legacy suffix written by older versions; still recognized on read.
  private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid";
  // Both suffixes are the same length, so one constant serves name parsing.
  private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length();
676
677
678
679
680 @VisibleForTesting
681 public static boolean isSequenceIdFile(final Path file) {
682 return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX)
683 || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
684 }
685
686
687
688
689
690
691
692
693
694
  /**
   * Create a sequence-id marker file under the region's recovered.edits dir and
   * remove any older markers. The recorded id is the maximum of {@code newSeqId}
   * and all existing marker files, plus {@code saftyBumper}.
   *
   * @param fs filesystem
   * @param regiondir the region directory
   * @param newSeqId candidate new sequence id
   * @param saftyBumper (sic — name kept for compatibility) safety increment
   *     added on top of the chosen id to guard against reuse
   * @return the sequence id actually recorded
   * @throws IOException if the marker file cannot be created
   */
  public static long writeRegionSequenceIdFile(final FileSystem fs, final Path regiondir,
      long newSeqId, long saftyBumper) throws IOException {

    Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
    long maxSeqId = 0;
    FileStatus[] files = null;
    if (fs.exists(editsdir)) {
      // Scan existing marker files for the current maximum sequence id.
      files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
        @Override
        public boolean accept(Path p) {
          return isSequenceIdFile(p);
        }
      });
      if (files != null) {
        for (FileStatus status : files) {
          String fileName = status.getPath().getName();
          try {
            // Both ".seqid" and "_seqid" suffixes have the same length,
            // so a single substring handles either format.
            Long tmpSeqId = Long.parseLong(fileName.substring(0, fileName.length()
                - SEQUENCE_ID_FILE_SUFFIX_LENGTH));
            maxSeqId = Math.max(tmpSeqId, maxSeqId);
          } catch (NumberFormatException ex) {
            LOG.warn("Invalid SeqId File Name=" + fileName);
          }
        }
      }
    }
    if (maxSeqId > newSeqId) {
      newSeqId = maxSeqId;
    }
    newSeqId += saftyBumper; // bump up the sequence id

    // Write the new marker file (skipped when the id is unchanged, in which
    // case the existing file with the same name is preserved below).
    Path newSeqIdFile = new Path(editsdir, newSeqId + SEQUENCE_ID_FILE_SUFFIX);
    if (newSeqId != maxSeqId) {
      try {
        if (!fs.createNewFile(newSeqIdFile) && !fs.exists(newSeqIdFile)) {
          throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("Wrote region seqId=" + newSeqIdFile + " to file, newSeqId=" + newSeqId
              + ", maxSeqId=" + maxSeqId);
        }
      } catch (FileAlreadyExistsException ignored) {
        // A concurrent writer created the same marker; that is fine.
      }
    }
    // Remove all older marker files, keeping only the one just chosen.
    if (files != null) {
      for (FileStatus status : files) {
        if (newSeqIdFile.equals(status.getPath())) {
          continue;
        }
        fs.delete(status.getPath(), false);
      }
    }
    return newSeqId;
  }
752
753
754
755
756
757
758
759
760
  /**
   * Open a reader on the given WAL file, first recovering its lease so a
   * crashed writer cannot still be appending.
   *
   * @param file status of the WAL to open
   * @param skipErrors when true, open failures are converted into
   *     {@link CorruptedLogFileException} instead of being rethrown
   * @param reporter progress callback forwarded to lease recovery
   * @return a reader, or null when there is nothing to read (empty file,
   *     truncated header, or the file disappeared)
   * @throws IOException on open failure when {@code skipErrors} is false, or
   *     always for {@link InterruptedIOException}
   * @throws CorruptedLogFileException when {@code skipErrors} is true and the
   *     file could not be opened
   */
  protected Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
      throws IOException, CorruptedLogFileException {
    Path path = file.getPath();
    long length = file.getLen();
    Reader in;

    // A zero-length file may simply still be open by its (dead) writer;
    // lease recovery below settles its true state.
    if (length <= 0) {
      LOG.warn("File " + path + " might be still open, length is 0");
    }

    try {
      FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter);
      try {
        in = getReader(path, reporter);
      } catch (EOFException e) {
        if (length <= 0) {
          // Empty file: nothing to split. NOTE(review): whether an empty,
          // not-last WAL should instead be an error is left to the caller.
          LOG.warn("Could not open " + path + " for reading. File is empty", e);
          return null;
        } else {
          // Non-empty file whose header is truncated; treat as nothing to read.
          return null;
        }
      }
    } catch (IOException e) {
      if (e instanceof FileNotFoundException) {
        // The file was moved or deleted after listing; nothing to do.
        LOG.warn("File " + path + " doesn't exist anymore.", e);
        return null;
      }
      if (!skipErrors || e instanceof InterruptedIOException) {
        throw e; // interruptions always propagate, regardless of skipErrors
      }
      CorruptedLogFileException t =
          new CorruptedLogFileException("skipErrors=true Could not open wal " +
              path + " ignoring");
      t.initCause(e);
      throw t;
    }
    return in;
  }
809
810 static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
811 throws CorruptedLogFileException, IOException {
812 try {
813 return in.next();
814 } catch (EOFException eof) {
815
816 LOG.info("EOF from wal " + path + ". continuing");
817 return null;
818 } catch (IOException e) {
819
820
821 if (e.getCause() != null &&
822 (e.getCause() instanceof ParseException ||
823 e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
824 LOG.warn("Parse exception " + e.getCause().toString() + " from wal "
825 + path + ". continuing");
826 return null;
827 }
828 if (!skipErrors) {
829 throw e;
830 }
831 CorruptedLogFileException t =
832 new CorruptedLogFileException("skipErrors=true Ignoring exception" +
833 " while parsing wal " + path + ". Marking as corrupted");
834 t.initCause(e);
835 throw t;
836 }
837 }
838
839
840
841
842
  /**
   * Create a recovered-edits writer for the given path via the WAL factory.
   * Protected so tests can substitute a different writer.
   */
  protected Writer createWriter(Path logfile)
      throws IOException {
    return walFactory.createRecoveredEditsWriter(fs, logfile);
  }
847
848
849
850
851
  /**
   * Create a WAL reader for the given path via the WAL factory.
   * Protected so tests can substitute a different reader.
   */
  protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
    return walFactory.createReader(fs, curLogFile, reporter);
  }
855
856
857
858
859 private int getNumOpenWriters() {
860 int result = 0;
861 if (this.outputSink != null) {
862 result += this.outputSink.getNumOpenWriters();
863 }
864 return result;
865 }
866
867
868
869
870 public static class PipelineController {
871
872
873 AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
874
875
876
877 public final Object dataAvailable = new Object();
878
879 void writerThreadError(Throwable t) {
880 thrown.compareAndSet(null, t);
881 }
882
883
884
885
886 void checkForErrors() throws IOException {
887 Throwable thrown = this.thrown.get();
888 if (thrown == null) return;
889 if (thrown instanceof IOException) {
890 throw new IOException(thrown);
891 } else {
892 throw new RuntimeException(thrown);
893 }
894 }
895 }
896
897
898
899
900
901
902
903
  /**
   * Accumulates WAL edits into one buffer per region while tracking their heap
   * usage. When total usage exceeds {@code maxHeapUsage}, {@link #appendEntry}
   * blocks until writer threads drain data (backpressure). Writer threads pull
   * whole region buffers via {@link #getChunkToWrite()}; a region is never
   * checked out by two writers at once, which keeps its edits in order.
   */
  public static class EntryBuffers {
    PipelineController controller;

    // Pending edits, keyed by encoded region name. Guarded by 'this'.
    Map<byte[], RegionEntryBuffer> buffers =
        new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);

    // Regions whose buffer is currently checked out by a writer thread.
    // Guarded by 'this'.
    Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);

    // Total heap bytes buffered across all regions; guarded by
    // controller.dataAvailable (also the backpressure wait/notify monitor).
    long totalBuffered = 0;
    // Backpressure threshold in bytes.
    long maxHeapUsage;

    public EntryBuffers(PipelineController controller, long maxHeapUsage) {
      this.controller = controller;
      this.maxHeapUsage = maxHeapUsage;
    }

    /**
     * Append an entry to the buffer for its region, blocking while the total
     * buffered heap exceeds the limit (unless a writer error is pending).
     *
     * @throws InterruptedException if interrupted while waiting for drain
     * @throws IOException if a writer thread has recorded an error
     */
    public void appendEntry(Entry entry) throws InterruptedException, IOException {
      WALKey key = entry.getKey();

      RegionEntryBuffer buffer;
      long incrHeap;
      synchronized (this) {
        buffer = buffers.get(key.getEncodedRegionName());
        if (buffer == null) {
          buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName());
          buffers.put(key.getEncodedRegionName(), buffer);
        }
        incrHeap= buffer.appendEntry(entry);
      }

      // Apply backpressure: wait until writers drain below the heap limit.
      // The wait is cut short if a writer thread has failed (thrown != null).
      synchronized (controller.dataAvailable) {
        totalBuffered += incrHeap;
        while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
          LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
          controller.dataAvailable.wait(2000);
        }
        controller.dataAvailable.notifyAll();
      }
      controller.checkForErrors();
    }

    /**
     * Check out the largest region buffer not already being written, removing
     * it from {@code buffers} and marking its region as in-flight.
     *
     * @return the buffer, or null when nothing is eligible right now
     */
    synchronized RegionEntryBuffer getChunkToWrite() {
      long biggestSize = 0;
      byte[] biggestBufferKey = null;

      for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
        long size = entry.getValue().heapSize();
        if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
          biggestSize = size;
          biggestBufferKey = entry.getKey();
        }
      }
      if (biggestBufferKey == null) {
        return null;
      }

      RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
      currentlyWriting.add(biggestBufferKey);
      return buffer;
    }

    /**
     * Return a checked-out buffer: release the region's in-flight slot,
     * subtract its heap from the total, and wake any blocked producer.
     */
    void doneWriting(RegionEntryBuffer buffer) {
      synchronized (this) {
        boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
        assert removed;
      }
      long size = buffer.heapSize();

      synchronized (controller.dataAvailable) {
        totalBuffered -= size;
        // Wake the producer (possibly blocked on the heap limit) and writers.
        controller.dataAvailable.notifyAll();
      }
    }

    // True while a writer thread has this region's buffer checked out.
    synchronized boolean isRegionCurrentlyWriting(byte[] region) {
      return currentlyWriting.contains(region);
    }

    /** Block until all buffered edits have been handed off and accounted out. */
    public void waitUntilDrained() {
      synchronized (controller.dataAvailable) {
        while (totalBuffered > 0) {
          try {
            controller.dataAvailable.wait(2000);
          } catch (InterruptedException e) {
            LOG.warn("Got intrerrupted while waiting for EntryBuffers is drained");
            Thread.interrupted();
            break;
          }
        }
      }
    }
  }
1011
1012
1013
1014
1015
1016
1017
1018 public static class RegionEntryBuffer implements HeapSize {
1019 long heapInBuffer = 0;
1020 List<Entry> entryBuffer;
1021 TableName tableName;
1022 byte[] encodedRegionName;
1023
1024 RegionEntryBuffer(TableName tableName, byte[] region) {
1025 this.tableName = tableName;
1026 this.encodedRegionName = region;
1027 this.entryBuffer = new LinkedList<Entry>();
1028 }
1029
1030 long appendEntry(Entry entry) {
1031 internify(entry);
1032 entryBuffer.add(entry);
1033 long incrHeap = entry.getEdit().heapSize() +
1034 ClassSize.align(2 * ClassSize.REFERENCE) +
1035 0;
1036 heapInBuffer += incrHeap;
1037 return incrHeap;
1038 }
1039
1040 private void internify(Entry entry) {
1041 WALKey k = entry.getKey();
1042 k.internTableName(this.tableName);
1043 k.internEncodedRegionName(this.encodedRegionName);
1044 }
1045
1046 @Override
1047 public long heapSize() {
1048 return heapInBuffer;
1049 }
1050
1051 public byte[] getEncodedRegionName() {
1052 return encodedRegionName;
1053 }
1054
1055 public List<Entry> getEntryBuffer() {
1056 return entryBuffer;
1057 }
1058
1059 public TableName getTableName() {
1060 return tableName;
1061 }
1062 }
1063
  /**
   * Consumer thread of the split pipeline: repeatedly pulls a region's buffer
   * from {@link EntryBuffers} and appends it to the {@link OutputSink}. Errors
   * are recorded on the {@link PipelineController} (terminating this thread)
   * rather than thrown to the JVM.
   */
  public static class WriterThread extends Thread {
    // Set by finish(); checked while idle to decide when to exit.
    private volatile boolean shouldStop = false;
    private PipelineController controller;
    private EntryBuffers entryBuffers;
    private OutputSink outputSink = null;

    WriterThread(PipelineController controller, EntryBuffers entryBuffers, OutputSink sink, int i){
      super(Thread.currentThread().getName() + "-Writer-" + i);
      this.controller = controller;
      this.entryBuffers = entryBuffers;
      outputSink = sink;
    }

    @Override
    public void run()  {
      try {
        doRun();
      } catch (Throwable t) {
        // Record the failure so the producer can rethrow it; this thread exits.
        LOG.error("Exiting thread", t);
        controller.writerThreadError(t);
      }
    }

    private void doRun() throws IOException {
      if (LOG.isTraceEnabled()) LOG.trace("Writer thread starting");
      while (true) {
        RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
        if (buffer == null) {
          // Nothing eligible right now: either exit (if stopping and the sink
          // reports nothing further to flush) or wait for more data.
          // NOTE(review): exit requires flush() to return false — presumably
          // "nothing was flushed"; confirm against OutputSink.flush().
          synchronized (controller.dataAvailable) {
            if (shouldStop && !this.outputSink.flush()) {
              return;
            }
            try {
              controller.dataAvailable.wait(500);
            } catch (InterruptedException ie) {
              // Interrupt is only expected as part of shutdown.
              if (!shouldStop) {
                throw new RuntimeException(ie);
              }
            }
          }
          continue;
        }

        assert buffer != null;
        try {
          writeBuffer(buffer);
        } finally {
          // Always release the region's in-flight slot and heap accounting,
          // even when the append failed.
          entryBuffers.doneWriting(buffer);
        }
      }
    }

    private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
      outputSink.append(buffer);
    }

    // Request this thread to stop once remaining buffered data is drained.
    void finish() {
      synchronized (controller.dataAvailable) {
        shouldStop = true;
        controller.dataAvailable.notifyAll();
      }
    }
  }
1128
1129
1130
1131
1132
1133 public static abstract class OutputSink {
1134
1135 protected PipelineController controller;
1136 protected EntryBuffers entryBuffers;
1137
1138 protected Map<byte[], SinkWriter> writers = Collections
1139 .synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));;
1140
1141 protected final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
1142 .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
1143
1144 protected final List<WriterThread> writerThreads = Lists.newArrayList();
1145
1146
1147 protected final Set<byte[]> blacklistedRegions = Collections
1148 .synchronizedSet(new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
1149
1150 protected boolean closeAndCleanCompleted = false;
1151
1152 protected boolean writersClosed = false;
1153
1154 protected final int numThreads;
1155
1156 protected CancelableProgressable reporter = null;
1157
1158 protected AtomicLong skippedEdits = new AtomicLong();
1159
1160 protected List<Path> splits = null;
1161
1162 public OutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
1163 numThreads = numWriters;
1164 this.controller = controller;
1165 this.entryBuffers = entryBuffers;
1166 }
1167
1168 void setReporter(CancelableProgressable reporter) {
1169 this.reporter = reporter;
1170 }
1171
1172
1173
1174
1175 public synchronized void startWriterThreads() {
1176 for (int i = 0; i < numThreads; i++) {
1177 WriterThread t = new WriterThread(controller, entryBuffers, this, i);
1178 t.start();
1179 writerThreads.add(t);
1180 }
1181 }
1182
1183
1184
1185
1186
1187 void updateRegionMaximumEditLogSeqNum(Entry entry) {
1188 synchronized (regionMaximumEditLogSeqNum) {
1189 Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
1190 .getEncodedRegionName());
1191 if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
1192 regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey()
1193 .getLogSeqNum());
1194 }
1195 }
1196 }
1197
1198 Long getRegionMaximumEditLogSeqNum(byte[] region) {
1199 return regionMaximumEditLogSeqNum.get(region);
1200 }
1201
1202
1203
1204
1205 int getNumOpenWriters() {
1206 return this.writers.size();
1207 }
1208
1209 long getSkippedEdits() {
1210 return this.skippedEdits.get();
1211 }
1212
1213
1214
1215
1216
1217
1218 protected boolean finishWriting(boolean interrupt) throws IOException {
1219 LOG.debug("Waiting for split writer threads to finish");
1220 boolean progress_failed = false;
1221 for (WriterThread t : writerThreads) {
1222 t.finish();
1223 }
1224 if (interrupt) {
1225 for (WriterThread t : writerThreads) {
1226 t.interrupt();
1227 }
1228 }
1229
1230 for (WriterThread t : writerThreads) {
1231 if (!progress_failed && reporter != null && !reporter.progress()) {
1232 progress_failed = true;
1233 }
1234 try {
1235 t.join();
1236 } catch (InterruptedException ie) {
1237 IOException iie = new InterruptedIOException();
1238 iie.initCause(ie);
1239 throw iie;
1240 }
1241 }
1242 controller.checkForErrors();
1243 LOG.info(this.writerThreads.size() + " split writers finished; closing...");
1244 return (!progress_failed);
1245 }
1246
1247 public abstract List<Path> finishWritingAndClose() throws IOException;
1248
1249
1250
1251
1252 public abstract Map<byte[], Long> getOutputCounts();
1253
1254
1255
1256
1257 public abstract int getNumberOfRecoveredRegions();
1258
1259
1260
1261
1262
1263 public abstract void append(RegionEntryBuffer buffer) throws IOException;
1264
1265
1266
1267
1268
1269 public boolean flush() throws IOException {
1270 return false;
1271 }
1272
1273
1274
1275
1276
1277
1278
1279
1280 public abstract boolean keepRegionEvents();
1281 }
1282
1283
1284
1285
1286 class LogRecoveredEditsOutputSink extends OutputSink {
1287
1288 public LogRecoveredEditsOutputSink(PipelineController controller, EntryBuffers entryBuffers,
1289 int numWriters) {
1290
1291
1292
1293
1294
1295 super(controller, entryBuffers, numWriters);
1296 }
1297
1298
1299
1300
1301
1302 @Override
1303 public List<Path> finishWritingAndClose() throws IOException {
1304 boolean isSuccessful = false;
1305 List<Path> result = null;
1306 try {
1307 isSuccessful = finishWriting(false);
1308 } finally {
1309 result = close();
1310 List<IOException> thrown = closeLogWriters(null);
1311 if (thrown != null && !thrown.isEmpty()) {
1312 throw MultipleIOException.createIOException(thrown);
1313 }
1314 }
1315 if (isSuccessful) {
1316 splits = result;
1317 }
1318 return splits;
1319 }
1320
1321
1322
1323
1324
1325 private List<Path> close() throws IOException {
1326 Preconditions.checkState(!closeAndCleanCompleted);
1327
1328 final List<Path> paths = new ArrayList<Path>();
1329 final List<IOException> thrown = Lists.newArrayList();
1330 ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L,
1331 TimeUnit.SECONDS, new ThreadFactory() {
1332 private int count = 1;
1333
1334 @Override
1335 public Thread newThread(Runnable r) {
1336 Thread t = new Thread(r, "split-log-closeStream-" + count++);
1337 return t;
1338 }
1339 });
1340 CompletionService<Void> completionService =
1341 new ExecutorCompletionService<Void>(closeThreadPool);
1342 for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) {
1343 if (LOG.isTraceEnabled()) {
1344 LOG.trace("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
1345 }
1346 completionService.submit(new Callable<Void>() {
1347 @Override
1348 public Void call() throws Exception {
1349 WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
1350 if (LOG.isTraceEnabled()) LOG.trace("Closing " + wap.p);
1351 try {
1352 wap.w.close();
1353 } catch (IOException ioe) {
1354 LOG.error("Couldn't close log at " + wap.p, ioe);
1355 thrown.add(ioe);
1356 return null;
1357 }
1358 if (LOG.isDebugEnabled()) {
1359 LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten
1360 + " edits, skipped " + wap.editsSkipped + " edits in "
1361 + (wap.nanosSpent / 1000 / 1000) + "ms");
1362 }
1363 if (wap.editsWritten == 0) {
1364
1365 if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
1366 LOG.warn("Failed deleting empty " + wap.p);
1367 throw new IOException("Failed deleting empty " + wap.p);
1368 }
1369 return null;
1370 }
1371
1372 Path dst = getCompletedRecoveredEditsFilePath(wap.p,
1373 regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
1374 try {
1375 if (!dst.equals(wap.p) && fs.exists(dst)) {
1376 LOG.warn("Found existing old edits file. It could be the "
1377 + "result of a previous failed split attempt. Deleting " + dst + ", length="
1378 + fs.getFileStatus(dst).getLen());
1379 if (!fs.delete(dst, false)) {
1380 LOG.warn("Failed deleting of old " + dst);
1381 throw new IOException("Failed deleting of old " + dst);
1382 }
1383 }
1384
1385
1386
1387 if (fs.exists(wap.p)) {
1388 if (!fs.rename(wap.p, dst)) {
1389 throw new IOException("Failed renaming " + wap.p + " to " + dst);
1390 }
1391 LOG.info("Rename " + wap.p + " to " + dst);
1392 }
1393 } catch (IOException ioe) {
1394 LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
1395 thrown.add(ioe);
1396 return null;
1397 }
1398 paths.add(dst);
1399 return null;
1400 }
1401 });
1402 }
1403
1404 boolean progress_failed = false;
1405 try {
1406 for (int i = 0, n = this.writers.size(); i < n; i++) {
1407 Future<Void> future = completionService.take();
1408 future.get();
1409 if (!progress_failed && reporter != null && !reporter.progress()) {
1410 progress_failed = true;
1411 }
1412 }
1413 } catch (InterruptedException e) {
1414 IOException iie = new InterruptedIOException();
1415 iie.initCause(e);
1416 throw iie;
1417 } catch (ExecutionException e) {
1418 throw new IOException(e.getCause());
1419 } finally {
1420 closeThreadPool.shutdownNow();
1421 }
1422
1423 if (!thrown.isEmpty()) {
1424 throw MultipleIOException.createIOException(thrown);
1425 }
1426 writersClosed = true;
1427 closeAndCleanCompleted = true;
1428 if (progress_failed) {
1429 return null;
1430 }
1431 return paths;
1432 }
1433
1434 private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
1435 if (writersClosed) {
1436 return thrown;
1437 }
1438
1439 if (thrown == null) {
1440 thrown = Lists.newArrayList();
1441 }
1442 try {
1443 for (WriterThread t : writerThreads) {
1444 while (t.isAlive()) {
1445 t.shouldStop = true;
1446 t.interrupt();
1447 try {
1448 t.join(10);
1449 } catch (InterruptedException e) {
1450 IOException iie = new InterruptedIOException();
1451 iie.initCause(e);
1452 throw iie;
1453 }
1454 }
1455 }
1456 } finally {
1457 synchronized (writers) {
1458 WriterAndPath wap = null;
1459 for (SinkWriter tmpWAP : writers.values()) {
1460 try {
1461 wap = (WriterAndPath) tmpWAP;
1462 wap.w.close();
1463 } catch (IOException ioe) {
1464 LOG.error("Couldn't close log at " + wap.p, ioe);
1465 thrown.add(ioe);
1466 continue;
1467 }
1468 LOG.info("Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in "
1469 + (wap.nanosSpent / 1000 / 1000) + "ms)");
1470 }
1471 }
1472 writersClosed = true;
1473 }
1474
1475 return thrown;
1476 }
1477
1478
1479
1480
1481
1482
1483 private WriterAndPath getWriterAndPath(Entry entry) throws IOException {
1484 byte region[] = entry.getKey().getEncodedRegionName();
1485 WriterAndPath ret = (WriterAndPath) writers.get(region);
1486 if (ret != null) {
1487 return ret;
1488 }
1489
1490
1491 if (blacklistedRegions.contains(region)) {
1492 return null;
1493 }
1494 ret = createWAP(region, entry, rootDir);
1495 if (ret == null) {
1496 blacklistedRegions.add(region);
1497 return null;
1498 }
1499 writers.put(region, ret);
1500 return ret;
1501 }
1502
1503
1504
1505
1506 private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
1507 Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
1508 if (regionedits == null) {
1509 return null;
1510 }
1511 if (fs.exists(regionedits)) {
1512 LOG.warn("Found old edits file. It could be the "
1513 + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
1514 + fs.getFileStatus(regionedits).getLen());
1515 if (!fs.delete(regionedits, false)) {
1516 LOG.warn("Failed delete of old " + regionedits);
1517 }
1518 }
1519 Writer w = createWriter(regionedits);
1520 LOG.debug("Creating writer path=" + regionedits);
1521 return new WriterAndPath(regionedits, w);
1522 }
1523
1524 private void filterCellByStore(Entry logEntry) {
1525 Map<byte[], Long> maxSeqIdInStores =
1526 regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
1527 if (maxSeqIdInStores == null || maxSeqIdInStores.isEmpty()) {
1528 return;
1529 }
1530
1531
1532 ArrayList<Cell> keptCells = new ArrayList<Cell>(logEntry.getEdit().getCells().size());
1533 for (Cell cell : logEntry.getEdit().getCells()) {
1534 if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
1535 keptCells.add(cell);
1536 } else {
1537 byte[] family = CellUtil.cloneFamily(cell);
1538 Long maxSeqId = maxSeqIdInStores.get(family);
1539
1540
1541 if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getLogSeqNum()) {
1542 keptCells.add(cell);
1543 }
1544 }
1545 }
1546
1547
1548
1549
1550 logEntry.getEdit().setCells(keptCells);
1551 }
1552
1553 @Override
1554 public void append(RegionEntryBuffer buffer) throws IOException {
1555 List<Entry> entries = buffer.entryBuffer;
1556 if (entries.isEmpty()) {
1557 LOG.warn("got an empty buffer, skipping");
1558 return;
1559 }
1560
1561 WriterAndPath wap = null;
1562
1563 long startTime = System.nanoTime();
1564 try {
1565 int editsCount = 0;
1566
1567 for (Entry logEntry : entries) {
1568 if (wap == null) {
1569 wap = getWriterAndPath(logEntry);
1570 if (wap == null) {
1571 if (LOG.isDebugEnabled()) {
1572 LOG.debug("getWriterAndPath decided we don't need to write edits for " + logEntry);
1573 }
1574 return;
1575 }
1576 }
1577 filterCellByStore(logEntry);
1578 if (!logEntry.getEdit().isEmpty()) {
1579 wap.w.append(logEntry);
1580 this.updateRegionMaximumEditLogSeqNum(logEntry);
1581 editsCount++;
1582 } else {
1583 wap.incrementSkippedEdits(1);
1584 }
1585 }
1586
1587 wap.incrementEdits(editsCount);
1588 wap.incrementNanoTime(System.nanoTime() - startTime);
1589 } catch (IOException e) {
1590 e = RemoteExceptionHandler.checkIOException(e);
1591 LOG.fatal(" Got while writing log entry to log", e);
1592 throw e;
1593 }
1594 }
1595
1596 @Override
1597 public boolean keepRegionEvents() {
1598 return false;
1599 }
1600
1601
1602
1603
1604 @Override
1605 public Map<byte[], Long> getOutputCounts() {
1606 TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1607 synchronized (writers) {
1608 for (Map.Entry<byte[], SinkWriter> entry : writers.entrySet()) {
1609 ret.put(entry.getKey(), entry.getValue().editsWritten);
1610 }
1611 }
1612 return ret;
1613 }
1614
1615 @Override
1616 public int getNumberOfRecoveredRegions() {
1617 return writers.size();
1618 }
1619 }
1620
1621
1622
1623
1624 public abstract static class SinkWriter {
1625
1626 long editsWritten = 0;
1627
1628 long editsSkipped = 0;
1629
1630 long nanosSpent = 0;
1631
1632 void incrementEdits(int edits) {
1633 editsWritten += edits;
1634 }
1635
1636 void incrementSkippedEdits(int skipped) {
1637 editsSkipped += skipped;
1638 }
1639
1640 void incrementNanoTime(long nanos) {
1641 nanosSpent += nanos;
1642 }
1643 }
1644
1645
1646
1647
1648
/**
 * Immutable pairing of a WAL {@link Writer} with the {@link Path} it writes to, plus the
 * statistics inherited from {@link SinkWriter}.
 */
private final static class WriterAndPath extends SinkWriter {
  final Path p;
  final Writer w;

  WriterAndPath(final Path p, final Writer w) {
    this.p = p;
    this.w = w;
  }
}
1658
1659
1660
1661
/**
 * {@link OutputSink} for distributed log replay: instead of writing recovered-edits files,
 * edits are grouped per destination region server and replayed directly through
 * {@link WALEditsReplaySink}. Edits for disabling/disabled tables are delegated to a nested
 * {@link LogRecoveredEditsOutputSink} since their regions will not come online.
 */
class LogReplayOutputSink extends OutputSink {
  // Fraction of EntryBuffers' max heap that, once buffered, forces a batch to be shipped
  // even if it is smaller than minBatchSize.
  private static final double BUFFER_THRESHOLD = 0.35;
  // Separates "host:port" from table name in serverToBufferQueueMap keys.
  private static final String KEY_DELIMITER = "#";

  // How long to wait for a destination region to come online before giving up.
  private long waitRegionOnlineTimeOut;
  private final Set<String> recoveredRegions = Collections.synchronizedSet(new HashSet<String>());
  // NOTE(review): this field shadows OutputSink.writers (different key/value types); the
  // getNumOpenWriters/getOutputCounts overrides below deliberately use this one.
  private final Map<String, RegionServerWriter> writers =
      new ConcurrentHashMap<String, RegionServerWriter>();

  // Cache of known-online region locations, keyed by encoded region name.
  private final Map<String, HRegionLocation> onlineRegions =
      new ConcurrentHashMap<String, HRegionLocation>();

  private Map<TableName, HConnection> tableNameToHConnectionMap = Collections
      .synchronizedMap(new TreeMap<TableName, HConnection>());

  // Pending edits grouped by "host:port#table"; queues are swapped out under the map's
  // monitor before being processed.
  private Map<String, List<Pair<HRegionLocation, Entry>>> serverToBufferQueueMap =
      new ConcurrentHashMap<String, List<Pair<HRegionLocation, Entry>>>();
  private List<Throwable> thrown = new ArrayList<Throwable>();

  // Fallback sink for tables being disabled/disabled: their edits go to recovered-edits
  // files instead of live replay.
  private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
  private boolean hasEditsInDisablingOrDisabledTables = false;

  public LogReplayOutputSink(PipelineController controller, EntryBuffers entryBuffers,
      int numWriters) {
    super(controller, entryBuffers, numWriters);
    this.waitRegionOnlineTimeOut = conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
      ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT);
    this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(controller,
      entryBuffers, numWriters);
    this.logRecoveredEditsOutputSink.setReporter(reporter);
  }

  /**
   * Buffers the edits by destination server and, when the largest per-server queue is big
   * enough (or overall buffering crosses BUFFER_THRESHOLD), ships that queue.
   */
  @Override
  public void append(RegionEntryBuffer buffer) throws IOException {
    List<Entry> entries = buffer.entryBuffer;
    if (entries.isEmpty()) {
      LOG.warn("got an empty buffer, skipping");
      return;
    }

    // Route edits of disabling/disabled tables to the recovered-edits sink: those regions
    // won't come online to accept replayed edits.
    if (disablingOrDisabledTables.contains(buffer.tableName)) {
      logRecoveredEditsOutputSink.append(buffer);
      hasEditsInDisablingOrDisabledTables = true;
      // Mark the region as recovered anyway so it is accounted for.
      addToRecoveredRegions(Bytes.toString(buffer.encodedRegionName));
      return;
    }

    // Bucket the entries by destination region server.
    groupEditsByServer(entries);

    // Pick the largest queue; detach it under the lock, process it outside the lock.
    String maxLocKey = null;
    int maxSize = 0;
    List<Pair<HRegionLocation, Entry>> maxQueue = null;
    synchronized (this.serverToBufferQueueMap) {
      for (String key : this.serverToBufferQueueMap.keySet()) {
        List<Pair<HRegionLocation, Entry>> curQueue = this.serverToBufferQueueMap.get(key);
        if (curQueue.size() > maxSize) {
          maxSize = curQueue.size();
          maxQueue = curQueue;
          maxLocKey = key;
        }
      }
      if (maxSize < minBatchSize
          && entryBuffers.totalBuffered < BUFFER_THRESHOLD * entryBuffers.maxHeapUsage) {
        // Batch is still too small and we have buffering headroom; wait for more edits.
        return;
      } else if (maxSize > 0) {
        this.serverToBufferQueueMap.remove(maxLocKey);
      }
    }

    if (maxSize > 0) {
      processWorkItems(maxLocKey, maxQueue);
    }
  }

  private void addToRecoveredRegions(String encodedRegionName) {
    if (!recoveredRegions.contains(encodedRegionName)) {
      recoveredRegions.add(encodedRegionName);
    }
  }

  /**
   * Locates the current destination region for each entry's cells, filters out cells that
   * were already flushed (or belong to skipped compaction markers), and enqueues the
   * surviving entries per "host:port#table" key.
   */
  private void groupEditsByServer(List<Entry> entries) throws IOException {
    Set<TableName> nonExistentTables = null;
    Long cachedLastFlushedSequenceId = -1l;
    for (Entry entry : entries) {
      WALEdit edit = entry.getEdit();
      TableName table = entry.getKey().getTablename();
      // Clear scopes: replication scopes are irrelevant for replay.
      entry.getKey().setScopes(null);
      String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName());
      // Skip entries of tables we've already found to be non-existent.
      if (nonExistentTables != null && nonExistentTables.contains(table)) {
        this.skippedEdits.incrementAndGet();
        continue;
      }

      Map<byte[], Long> maxStoreSequenceIds = null;
      boolean needSkip = false;
      HRegionLocation loc = null;
      String locKey = null;
      List<Cell> cells = edit.getCells();
      List<Cell> skippedCells = new ArrayList<Cell>();
      HConnection hconn = this.getConnectionByTableName(table);

      for (Cell cell : cells) {
        byte[] row = CellUtil.cloneRow(cell);
        byte[] family = CellUtil.cloneFamily(cell);
        boolean isCompactionEntry = false;
        if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
          // For compaction markers, locate using the region/family named in the marker;
          // all other metadata cells are dropped from replay.
          CompactionDescriptor compaction = WALEdit.getCompaction(cell);
          if (compaction != null && compaction.hasRegionName()) {
            try {
              byte[][] regionName = HRegionInfo.parseRegionName(compaction.getRegionName()
                .toByteArray());
              row = regionName[1]; // startKey of the region
              family = compaction.getFamilyName().toByteArray();
              isCompactionEntry = true;
            } catch (Exception ex) {
              LOG.warn("Unexpected exception received, ignoring " + ex);
              skippedCells.add(cell);
              continue;
            }
          } else {
            skippedCells.add(cell);
            continue;
          }
        }

        try {
          loc =
              locateRegionAndRefreshLastFlushedSequenceId(hconn, table, row,
                encodeRegionNameStr);
          // Skip replaying the compaction if the region is gone (e.g. after a split/merge).
          if (isCompactionEntry && !encodeRegionNameStr.equalsIgnoreCase(
            loc.getRegionInfo().getEncodedName())) {
            LOG.info("Not replaying a compaction marker for an older region: "
                + encodeRegionNameStr);
            needSkip = true;
          }
        } catch (TableNotFoundException ex) {
          // Table has been deleted; remember it so later entries are skipped cheaply.
          LOG.info("Table " + table + " doesn't exist. Skip log replay for region "
              + encodeRegionNameStr);
          lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE);
          if (nonExistentTables == null) {
            nonExistentTables = new TreeSet<TableName>();
          }
          nonExistentTables.add(table);
          this.skippedEdits.incrementAndGet();
          needSkip = true;
          break;
        }

        cachedLastFlushedSequenceId =
            lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
        if (cachedLastFlushedSequenceId != null
            && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
          // Whole entry is older than the region's last flush: nothing to replay.
          this.skippedEdits.incrementAndGet();
          needSkip = true;
          break;
        } else {
          if (maxStoreSequenceIds == null) {
            maxStoreSequenceIds =
                regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName());
          }
          if (maxStoreSequenceIds != null) {
            Long maxStoreSeqId = maxStoreSequenceIds.get(family);
            if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) {
              // Skip this cell: its store already has data at or past this sequence id.
              skippedCells.add(cell);
              continue;
            }
          }
        }
      }

      // If an entry has no surviving location or was flagged, skip it entirely.
      if (loc == null || needSkip) continue;

      if (!skippedCells.isEmpty()) {
        cells.removeAll(skippedCells);
      }

      synchronized (serverToBufferQueueMap) {
        locKey = loc.getHostnamePort() + KEY_DELIMITER + table;
        List<Pair<HRegionLocation, Entry>> queue = serverToBufferQueueMap.get(locKey);
        if (queue == null) {
          queue =
              Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, Entry>>());
          serverToBufferQueueMap.put(locKey, queue);
        }
        queue.add(new Pair<HRegionLocation, Entry>(loc, entry));
      }
      // Track the region so it can be marked recovered when the replay finishes.
      addToRecoveredRegions(loc.getRegionInfo().getEncodedName());
    }
  }

  /**
   * Locates the region currently hosting {@code row}, waits for it to be online, and caches
   * both its location and its last-flushed sequence ids (so stale edits can be skipped).
   * Returns a cached location when available.
   */
  private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn,
      TableName table, byte[] row, String originalEncodedRegionName) throws IOException {
    // Fast path: location already cached for the region the edit came from.
    HRegionLocation loc = onlineRegions.get(originalEncodedRegionName);
    if(loc != null) return loc;

    loc = hconn.getRegionLocation(table, row, true);
    if (loc == null) {
      throw new IOException("Can't locate location for row:" + Bytes.toString(row)
          + " of table:" + table);
    }
    // Check if the region name changed (e.g. original region was split or merged away).
    if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) {
      // The original region is gone: treat all of its edits as already flushed.
      lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE);
      HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName());
      if (tmpLoc != null) return tmpLoc;
    }

    Long lastFlushedSequenceId = -1l;
    AtomicBoolean isRecovering = new AtomicBoolean(true);
    loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering);
    if (!isRecovering.get()) {
      // The region is open but no longer in "recovering" state, which means all of the
      // failed server's data for it has been handled: skip its edits.
      lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE);
      LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName()
          + " because it's not in recovering.");
    } else {
      Long cachedLastFlushedSequenceId =
          lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());

      // Pull per-store flushed sequence ids from the split-log coordination so stale cells
      // can be filtered at store granularity.
      RegionStoreSequenceIds ids =
          csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
            loc.getRegionInfo().getEncodedName());
      if (ids != null) {
        lastFlushedSequenceId = ids.getLastFlushedSequenceId();
        Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
        List<StoreSequenceId> maxSeqIdInStores = ids.getStoreSequenceIdList();
        for (StoreSequenceId id : maxSeqIdInStores) {
          storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId());
        }
        regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds);
      }

      if (cachedLastFlushedSequenceId == null
          || lastFlushedSequenceId > cachedLastFlushedSequenceId) {
        lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId);
      }
    }

    onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc);
    return loc;
  }

  /**
   * Ships one detached queue of (location, entry) pairs to its region server through the
   * replay sink, accumulating timing/edit statistics.
   */
  private void processWorkItems(String key, List<Pair<HRegionLocation, Entry>> actions)
      throws IOException {
    RegionServerWriter rsw = null;

    long startTime = System.nanoTime();
    try {
      rsw = getRegionServerWriter(key);
      rsw.sink.replayEntries(actions);

      // Statistics: each entry counts as one edit here, regardless of its cell count.
      rsw.incrementEdits(actions.size());
      rsw.incrementNanoTime(System.nanoTime() - startTime);
    } catch (IOException e) {
      e = RemoteExceptionHandler.checkIOException(e);
      LOG.fatal(" Got while writing log entry to log", e);
      throw e;
    }
  }

  /**
   * Polls the destination server until the region reports itself online (or the timeout
   * elapses), reloading the location between attempts when the failure is not a
   * RegionOpeningException. Also reports whether the region is still in recovering state.
   * @throws IOException on timeout or interruption
   */
  private HRegionLocation waitUntilRegionOnline(HRegionLocation loc, byte[] row,
      final long timeout, AtomicBoolean isRecovering)
      throws IOException {
    final long endTime = EnvironmentEdgeManager.currentTime() + timeout;
    final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
    boolean reloadLocation = false;
    TableName tableName = loc.getRegionInfo().getTable();
    int tries = 0;
    Throwable cause = null;
    while (endTime > EnvironmentEdgeManager.currentTime()) {
      try {
        // Try and get regioninfo from the hosting server.
        HConnection hconn = getConnectionByTableName(tableName);
        if(reloadLocation) {
          loc = hconn.getRegionLocation(tableName, row, true);
        }
        BlockingInterface remoteSvr = hconn.getAdmin(loc.getServerName());
        HRegionInfo region = loc.getRegionInfo();
        try {
          GetRegionInfoRequest request =
              RequestConverter.buildGetRegionInfoRequest(region.getRegionName());
          GetRegionInfoResponse response = remoteSvr.getRegionInfo(null, request);
          if (HRegionInfo.convert(response.getRegionInfo()) != null) {
            // Older servers may not report recovering state; assume recovering then.
            isRecovering.set((response.hasIsRecovering()) ? response.getIsRecovering() : true);
            return loc;
          }
        } catch (ServiceException se) {
          throw ProtobufUtil.getRemoteException(se);
        }
      } catch (IOException e) {
        cause = e.getCause();
        if(!(cause instanceof RegionOpeningException)) {
          reloadLocation = true;
        }
      }
      long expectedSleep = ConnectionUtils.getPauseTime(pause, tries);
      try {
        Thread.sleep(expectedSleep);
      } catch (InterruptedException e) {
        throw new IOException("Interrupted when waiting region " +
            loc.getRegionInfo().getEncodedName() + " online.", e);
      }
      tries++;
    }

    throw new IOException("Timeout when waiting region " + loc.getRegionInfo().getEncodedName() +
      " online for " + timeout + " milliseconds.", cause);
  }

  /**
   * Ships the first non-empty per-server queue, if any.
   * @return true if a queue was processed
   */
  @Override
  public boolean flush() throws IOException {
    String curLoc = null;
    int curSize = 0;
    List<Pair<HRegionLocation, Entry>> curQueue = null;
    synchronized (this.serverToBufferQueueMap) {
      for (String locationKey : this.serverToBufferQueueMap.keySet()) {
        curQueue = this.serverToBufferQueueMap.get(locationKey);
        if (!curQueue.isEmpty()) {
          curSize = curQueue.size();
          curLoc = locationKey;
          break;
        }
      }
      if (curSize > 0) {
        this.serverToBufferQueueMap.remove(curLoc);
      }
    }

    if (curSize > 0) {
      this.processWorkItems(curLoc, curQueue);
      // Wake up writer threads that may be waiting for buffered capacity.
      synchronized(controller.dataAvailable) {
        controller.dataAvailable.notifyAll();
      }
      return true;
    }
    return false;
  }

  @Override
  public boolean keepRegionEvents() {
    return true;
  }

  void addWriterError(Throwable t) {
    thrown.add(t);
  }

  @Override
  public List<Path> finishWritingAndClose() throws IOException {
    try {
      if (!finishWriting(false)) {
        return null;
      }
      // Replay produces no output files; only the delegated recovered-edits sink can.
      if (hasEditsInDisablingOrDisabledTables) {
        splits = logRecoveredEditsOutputSink.finishWritingAndClose();
      } else {
        splits = new ArrayList<Path>();
      }
      // returns an empty array in order to keep interface same as old way
      return splits;
    } finally {
      List<IOException> thrown = closeRegionServerWriters();
      if (thrown != null && !thrown.isEmpty()) {
        throw MultipleIOException.createIOException(thrown);
      }
    }
  }

  @Override
  int getNumOpenWriters() {
    // Count both the replay writers and any recovered-edits writers from the delegate.
    return this.writers.size() + this.logRecoveredEditsOutputSink.getNumOpenWriters();
  }

  /**
   * Stops the writer threads, closes all region-server writers and the per-table
   * connections, collecting per-resource IOExceptions rather than failing fast.
   */
  private List<IOException> closeRegionServerWriters() throws IOException {
    List<IOException> result = null;
    if (!writersClosed) {
      result = Lists.newArrayList();
      try {
        for (WriterThread t : writerThreads) {
          while (t.isAlive()) {
            t.shouldStop = true;
            t.interrupt();
            try {
              t.join(10);
            } catch (InterruptedException e) {
              IOException iie = new InterruptedIOException();
              iie.initCause(e);
              throw iie;
            }
          }
        }
      } finally {
        synchronized (writers) {
          for (String locationKey : writers.keySet()) {
            RegionServerWriter tmpW = writers.get(locationKey);
            try {
              tmpW.close();
            } catch (IOException ioe) {
              LOG.error("Couldn't close writer for region server:" + locationKey, ioe);
              result.add(ioe);
            }
          }
        }

        // Close connections and clear their region caches; these were opened per table.
        synchronized (this.tableNameToHConnectionMap) {
          for (TableName tableName : this.tableNameToHConnectionMap.keySet()) {
            HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
            try {
              hconn.clearRegionCache();
              hconn.close();
            } catch (IOException ioe) {
              result.add(ioe);
            }
          }
        }
        writersClosed = true;
      }
    }
    return result;
  }

  @Override
  public Map<byte[], Long> getOutputCounts() {
    TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
    synchronized (writers) {
      for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) {
        ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
      }
    }
    return ret;
  }

  @Override
  public int getNumberOfRecoveredRegions() {
    return this.recoveredRegions.size();
  }

  /**
   * Gets (or lazily creates, with double-checking under the writers monitor) the
   * {@link RegionServerWriter} for the given "host:port#table" location key.
   */
  private RegionServerWriter getRegionServerWriter(String loc) throws IOException {
    RegionServerWriter ret = writers.get(loc);
    if (ret != null) {
      return ret;
    }

    TableName tableName = getTableFromLocationStr(loc);
    if(tableName == null){
      throw new IOException("Invalid location string:" + loc + " found. Replay aborted.");
    }

    HConnection hconn = getConnectionByTableName(tableName);
    synchronized (writers) {
      ret = writers.get(loc);
      if (ret == null) {
        ret = new RegionServerWriter(conf, tableName, hconn);
        writers.put(loc, ret);
      }
    }
    return ret;
  }

  // Lazily opens (and caches) one HConnection per table, double-checked under the map's
  // monitor.
  private HConnection getConnectionByTableName(final TableName tableName) throws IOException {
    HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
    if (hconn == null) {
      synchronized (this.tableNameToHConnectionMap) {
        hconn = this.tableNameToHConnectionMap.get(tableName);
        if (hconn == null) {
          hconn = HConnectionManager.getConnection(conf);
          this.tableNameToHConnectionMap.put(tableName, hconn);
        }
      }
    }
    return hconn;
  }
  // Parses the table name out of a "host:port#table" location key; null if malformed.
  private TableName getTableFromLocationStr(String loc) {
    String[] splits = loc.split(KEY_DELIMITER);
    if (splits.length != 2) {
      return null;
    }
    return TableName.valueOf(splits[1]);
  }
}
2198
2199
2200
2201
2202
/**
 * Sink-writer wrapper around a {@link WALEditsReplaySink} that replays edits to one region
 * server; inherits edit/time accounting from {@link SinkWriter}.
 */
private final static class RegionServerWriter extends SinkWriter {
  final WALEditsReplaySink sink;

  RegionServerWriter(final Configuration conf, final TableName tableName, final HConnection conn)
      throws IOException {
    this.sink = new WALEditsReplaySink(conf, tableName, conn);
  }

  // Intentionally a no-op: the underlying HConnection is shared per table and closed by the
  // owning sink, not here.
  void close() throws IOException {
  }
}
2214
/** Thrown when a WAL file being split is found to be corrupted. */
static class CorruptedLogFileException extends Exception {
  private static final long serialVersionUID = 1L;

  CorruptedLogFileException(String s) {
    super(s);
  }
}
2222
2223
2224 public static class MutationReplay {
2225 public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) {
2226 this.type = type;
2227 this.mutation = mutation;
2228 if(this.mutation.getDurability() != Durability.SKIP_WAL) {
2229
2230 this.mutation.setDurability(Durability.ASYNC_WAL);
2231 }
2232 this.nonceGroup = nonceGroup;
2233 this.nonce = nonce;
2234 }
2235
2236 public final MutationType type;
2237 public final Mutation mutation;
2238 public final long nonceGroup;
2239 public final long nonce;
2240 }
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250
2251
2252
  /**
   * Reconstructs the client mutations carried by a replayed {@code WALEntry}, consuming
   * exactly {@code entry.getAssociatedCellCount()} cells from {@code cells}.
   * <p>
   * Consecutive cells sharing the same row and the same type byte are grouped into a
   * single {@link Put} or {@link Delete}. When {@code logEntry} is non-null, a
   * {@link WALKey}/{@link WALEdit} pair rebuilt from the entry's protobuf key and all of
   * its cells is stored into it as a side effect.
   *
   * @param entry protobuf WAL entry; if {@code null} an empty list is returned
   * @param cells scanner positioned just before this entry's cells
   * @param logEntry out-parameter receiving the reconstructed WALKey/WALEdit; may be null
   * @param durability durability applied to each mutation (note the {@code MutationReplay}
   *          constructor may subsequently adjust it)
   * @return the mutations (with their nonces) to replay, in WAL order
   * @throws IOException propagated from scanning the cells
   */
  public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
      Pair<WALKey, WALEdit> logEntry, Durability durability)
          throws IOException {
    if (entry == null) {
      // A null entry carries nothing to replay.
      return new ArrayList<MutationReplay>();
    }

    // Prefer the original sequence number when the key carries one; otherwise fall back
    // to the current log sequence number.
    long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
      entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
    int count = entry.getAssociatedCellCount();
    List<MutationReplay> mutations = new ArrayList<MutationReplay>();
    Cell previousCell = null;
    Mutation m = null;
    WALKey key = null;
    WALEdit val = null;
    // Only collect cells into a WALEdit when the caller asked for the key/edit pair.
    if (logEntry != null) val = new WALEdit();

    for (int i = 0; i < count; i++) {
      // The scanner must yield at least as many cells as the entry advertised.
      if (!cells.advance()) {
        throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
      }
      Cell cell = cells.current();
      if (val != null) val.add(cell);

      // Start a new mutation whenever the row or the cell type changes.
      boolean isNewRowOrType =
          previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
              || !CellUtil.matchingRow(previousCell, cell);
      if (isNewRowOrType) {
        // Delete cells become a Delete; everything else becomes a Put.
        if (CellUtil.isDelete(cell)) {
          m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
          // Deletes carry no nonce information.
          mutations.add(new MutationReplay(
              MutationType.DELETE, m, HConstants.NO_NONCE, HConstants.NO_NONCE));
        } else {
          m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
          // Nonces come from the entry's key when present; NO_NONCE otherwise.
          long nonceGroup = entry.getKey().hasNonceGroup()
              ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
          mutations.add(new MutationReplay(MutationType.PUT, m, nonceGroup, nonce));
        }
      }
      // Fold the cell into the current mutation (new or continued).
      if (CellUtil.isDelete(cell)) {
        ((Delete) m).addDeleteMarker(cell);
      } else {
        ((Put) m).add(cell);
      }
      m.setDurability(durability);
      previousCell = cell;
    }

    // Reconstruct the WALKey/WALEdit pair for the caller, if requested.
    if (logEntry != null) {
      org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey walKeyProto = entry.getKey();
      List<UUID> clusterIds = new ArrayList<UUID>(walKeyProto.getClusterIdsCount());
      for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
        clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
      }
      // Built with the replay sequence id computed above, not the raw log sequence number.
      key = new HLogKey(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf(
              walKeyProto.getTableName().toByteArray()), replaySeqId, walKeyProto.getWriteTime(),
              clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce(), null);
      logEntry.setFirst(key);
      logEntry.setSecond(val);
    }

    return mutations;
  }
2324 }