/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.lang.mutable.MutableInt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSHDFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

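// Programmatic usage sketch (illustrative, not part of the original source).
// The directory and table name below are assumptions for the example; any
// HFileOutputFormat output directory and existing table will do:
//
//   Configuration conf = HBaseConfiguration.create();
//   LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
//   HTable table = new HTable(conf, TableName.valueOf("mytable"));
//   loader.doBulkLoad(new Path("/user/me/hfof-output"), table);
//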
/**
 * Tool to load the output of HFileOutputFormat into an existing table.
 * Invoked on the command line as "completebulkload" via {@link #main(String[])},
 * or programmatically via {@link #doBulkLoad(Path, HTable)}.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class LoadIncrementalHFiles extends Configured implements Tool {
  private static final Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
  static final AtomicLong regionCount = new AtomicLong(0);
  private HBaseAdmin hbAdmin;

  public static final String NAME = "completebulkload";
  public static final String MAX_FILES_PER_REGION_PER_FAMILY
    = "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
  private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";

  private int maxFilesPerRegionPerFamily;
  private boolean assignSeqIds;

  // Source filesystem, set from the input directory in discoverLoadQueue().
  private FileSystem fs;

  private FsDelegationToken fsDelegationToken;
  private String bulkToken;
  private UserProvider userProvider;

  public LoadIncrementalHFiles(Configuration conf) throws Exception {
    super(conf);
    // Make a copy, just to be sure we're not overriding someone else's config.
    setConf(HBaseConfiguration.create(getConf()));
    // Disable the block cache for this tool's reads of HFiles.
    getConf().setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
    this.hbAdmin = new HBaseAdmin(conf);
    this.userProvider = UserProvider.instantiate(conf);
    this.fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
    assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
    maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
  }

  private void usage() {
    System.err.println("usage: " + NAME +
        " /path/to/hfileoutputformat-output " +
        "tablename");
  }
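
  // Command-line usage sketch (illustrative): the class is typically run via
  // the hbase launcher script; the directory and table name are assumptions:
  //
  //   hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles \
  //     /user/me/hfof-output mytable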

  /**
   * Represents an HFile waiting to be loaded. A queue is used in this class
   * in order to support the case where a region has split during the process
   * of the load. When this happens, the HFile is split into two physical
   * parts across the new region boundary, and each part is added back into
   * the queue.  The import process finishes when the queue is empty.
   */
  static class LoadQueueItem {
    final byte[] family;
    final Path hfilePath;

    public LoadQueueItem(byte[] family, Path hfilePath) {
      this.family = family;
      this.hfilePath = hfilePath;
    }

    @Override
    public String toString() {
      return "family:" + Bytes.toString(family) + " path:" + hfilePath.toString();
    }
  }

  /**
   * Walk the given directory for all HFiles, and return a Queue
   * containing all such files.
   */
  private void discoverLoadQueue(Deque<LoadQueueItem> ret, Path hfofDir)
  throws IOException {
    fs = hfofDir.getFileSystem(getConf());

    if (!fs.exists(hfofDir)) {
      throw new FileNotFoundException("HFileOutputFormat dir " +
          hfofDir + " not found");
    }

    FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);
    if (familyDirStatuses == null) {
      throw new FileNotFoundException("No families found in " + hfofDir);
    }

    for (FileStatus stat : familyDirStatuses) {
      if (!stat.isDir()) {
        LOG.warn("Skipping non-directory " + stat.getPath());
        continue;
      }
      Path familyDir = stat.getPath();
      // Skip _logs and other directories whose names start with "_".
      if (familyDir.getName().startsWith("_")) continue;
      byte[] family = familyDir.getName().getBytes();
      Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
      for (Path hfile : hfiles) {
        // Skip _SUCCESS and other non-HFile entries written by the job.
        if (hfile.getName().startsWith("_")) continue;
        ret.add(new LoadQueueItem(family, hfile));
      }
    }
  }

  /**
   * Perform a bulk load of the given directory into the given
   * pre-existing table.  This method is not threadsafe.
   *
   * @param hfofDir the directory that was provided as the output path
   * of a job using HFileOutputFormat
   * @param table the table to load into
   * @throws TableNotFoundException if table does not yet exist
   */
  @SuppressWarnings("deprecation")
  public void doBulkLoad(Path hfofDir, final HTable table)
    throws TableNotFoundException, IOException
  {
    final HConnection conn = table.getConnection();

    if (!conn.isTableAvailable(table.getName())) {
      throw new TableNotFoundException("Table " +
          Bytes.toStringBinary(table.getTableName()) +
          " is not currently available.");
    }

    // Initialize the thread pool used for grouping, splitting and loading.
    int nrThreads = getConf().getInt("hbase.loadincremental.threads.max",
        Runtime.getRuntime().availableProcessors());
    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
    builder.setNameFormat("LoadIncrementalHFiles-%1$d");
    ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads,
        60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(),
        builder.build());
    ((ThreadPoolExecutor)pool).allowCoreThreadTimeOut(true);
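
    // Tuning sketch (illustrative): the pool size above is read from
    // "hbase.loadincremental.threads.max" and defaults to the number of
    // available processors. For example, to cap the loader at 8 threads,
    // set this on the Configuration before constructing the tool:
    //
    //   conf.setInt("hbase.loadincremental.threads.max", 8);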

    // The LQI queue does not need to be threadsafe -- all operations on this
    // queue happen in this thread.
    Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
    try {
      discoverLoadQueue(queue, hfofDir);
      // Check whether the HFiles to be bulk loaded name any family that is
      // not present in the target table.
      Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
      ArrayList<String> familyNames = new ArrayList<String>();
      for (HColumnDescriptor family : families) {
        familyNames.add(family.getNameAsString());
      }
      ArrayList<String> unmatchedFamilies = new ArrayList<String>();
      for (LoadQueueItem lqi : queue) {
        String familyNameInHFile = Bytes.toString(lqi.family);
        if (!familyNames.contains(familyNameInHFile)) {
          unmatchedFamilies.add(familyNameInHFile);
        }
      }
      if (unmatchedFamilies.size() > 0) {
        String msg =
            "Unmatched family names found: HFiles to be bulk loaded reference families "
            + unmatchedFamilies + "; valid family names of table "
            + Bytes.toString(table.getTableName()) + " are: " + familyNames;
        LOG.error(msg);
        throw new IOException(msg);
      }
      int count = 0;

      if (queue.isEmpty()) {
        LOG.warn("Bulk load operation did not find any files to load in " +
            "directory " + hfofDir.toUri() + ". Does it contain files in " +
            "subdirectories that correspond to column family names?");
        return;
      }

      // If using secure bulk load, get the source delegation token and
      // prepare the staging directory and token.
      if (userProvider.isHBaseSecurityEnabled()) {
        // fs is the source filesystem
        fsDelegationToken.acquireDelegationToken(fs);

        bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getName());
      }

      // Assumes that region splits can happen while this occurs.
      while (!queue.isEmpty()) {
        // Need to reload split keys each iteration.
        final Pair<byte[][], byte[][]> startEndKeys = table.getStartEndKeys();
        if (count != 0) {
          LOG.info("Split occurred while grouping HFiles, retry attempt " +
              count + " with " + queue.size() + " files remaining to group or split");
        }

        int maxRetries = getConf().getInt("hbase.bulkload.retries.number", 0);
        if (maxRetries != 0 && count >= maxRetries) {
          LOG.error("Retry attempted " + count + " times without completing, bailing out");
          return;
        }
        count++;

        // Using ByteBuffer for byte[] equality semantics.
        Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
            pool, queue, startEndKeys);

        if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
          // The offending family and region are logged inside the check.
          throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily
              + " hfiles to one family of one region");
        }

        bulkLoadPhase(table, conn, pool, queue, regionGroups);

        // NOTE: The next iteration's split / group could happen in parallel to
        // atomic bulkloads assuming that there are splits and no merges, and
        // that we can atomically pull out the groups we want to retry.
      }

    } finally {
      if (userProvider.isHBaseSecurityEnabled()) {
        fsDelegationToken.releaseDelegationToken();

        if (bulkToken != null) {
          new SecureBulkLoadClient(table).cleanupBulkLoad(bulkToken);
        }
      }
      pool.shutdown();
      if (queue != null && !queue.isEmpty()) {
        StringBuilder err = new StringBuilder();
        err.append("-------------------------------------------------\n");
        err.append("Bulk load aborted with some files not yet loaded:\n");
        err.append("-------------------------------------------------\n");
        for (LoadQueueItem q : queue) {
          err.append("  ").append(q.hfilePath).append('\n');
        }
        LOG.error(err);
      }
    }

    if (queue != null && !queue.isEmpty()) {
      throw new RuntimeException("Bulk load aborted with some files not yet loaded. "
          + "Please check log for more details.");
    }
  }
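
  // Configuration sketch (illustrative): "hbase.bulkload.retries.number"
  // bounds the grouping retries in doBulkLoad above; with the default of 0
  // the loader retries indefinitely while regions keep splitting:
  //
  //   conf.setInt("hbase.bulkload.retries.number", 10);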

  /**
   * This takes the LQI's grouped by likely regions and attempts to bulk load
   * them.  Any failures are re-queued for another pass with the
   * groupOrSplitPhase.
   */
  protected void bulkLoadPhase(final HTable table, final HConnection conn,
      ExecutorService pool, Deque<LoadQueueItem> queue,
      final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
    // Atomically bulk load the groups.
    Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<Future<List<LoadQueueItem>>>();
    for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()) {
      final byte[] first = e.getKey().array();
      final Collection<LoadQueueItem> lqis = e.getValue();

      final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
        @Override
        public List<LoadQueueItem> call() throws Exception {
          List<LoadQueueItem> toRetry =
              tryAtomicRegionLoad(conn, table.getName(), first, lqis);
          return toRetry;
        }
      };
      loadingFutures.add(pool.submit(call));
    }

    // Get all the results.
    for (Future<List<LoadQueueItem>> future : loadingFutures) {
      try {
        List<LoadQueueItem> toRetry = future.get();

        // LQIs that are requeued to be regrouped.
        queue.addAll(toRetry);

      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          // At this point something unrecoverable has happened.
          throw new IOException("BulkLoad encountered an unrecoverable problem", t);
        }
        LOG.error("Unexpected execution exception during bulk load", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during bulk load", e1);
        throw (InterruptedIOException)new InterruptedIOException().initCause(e1);
      }
    }
  }

  private boolean checkHFilesCountPerRegionPerFamily(
      final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
    for (Entry<ByteBuffer,
        ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()) {
      final Collection<LoadQueueItem> lqis = e.getValue();
      // Use a comparator-based map: byte[] keys have identity semantics in a
      // HashMap, which would break the per-family count.
      Map<byte[], MutableInt> filesMap = new TreeMap<byte[], MutableInt>(Bytes.BYTES_COMPARATOR);
      for (LoadQueueItem lqi: lqis) {
        MutableInt count = filesMap.get(lqi.family);
        if (count == null) {
          count = new MutableInt();
          filesMap.put(lqi.family, count);
        }
        count.increment();
        if (count.intValue() > maxFilesPerRegionPerFamily) {
          LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily
              + " hfiles to family " + Bytes.toStringBinary(lqi.family)
              + " of region with start key "
              + Bytes.toStringBinary(e.getKey()));
          return false;
        }
      }
    }
    return true;
  }

  /**
   * @return A Multimap (region start key to LoadQueueItem) that groups LQI
   * by likely bulk load region targets.
   */
  private Multimap<ByteBuffer, LoadQueueItem> groupOrSplitPhase(final HTable table,
      ExecutorService pool, Deque<LoadQueueItem> queue,
      final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
    // <region start key, LQI> needs synchronization only within the scope of
    // this phase, because of the puts that happen in futures.
    Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
    final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);

    // Drain LQIs and figure out bulk load groups.
    Set<Future<List<LoadQueueItem>>> splittingFutures = new HashSet<Future<List<LoadQueueItem>>>();
    while (!queue.isEmpty()) {
      final LoadQueueItem item = queue.remove();

      final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
        @Override
        public List<LoadQueueItem> call() throws Exception {
          List<LoadQueueItem> splits = groupOrSplit(regionGroups, item, table, startEndKeys);
          return splits;
        }
      };
      splittingFutures.add(pool.submit(call));
    }

    // Get all the results.  All grouping and splitting must finish before
    // we can attempt the atomic loads.
    for (Future<List<LoadQueueItem>> lqis : splittingFutures) {
      try {
        List<LoadQueueItem> splits = lqis.get();
        if (splits != null) {
          queue.addAll(splits);
        }
      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          LOG.error("IOException during splitting", e1);
          throw (IOException)t; // would have been thrown if not parallelized
        }
        LOG.error("Unexpected execution exception during splitting", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during splitting", e1);
        throw (InterruptedIOException)new InterruptedIOException().initCause(e1);
      }
    }
    return regionGroups;
  }

  // Unique file name for the table.
  String getUniqueName(TableName tableName) {
    String name = tableName + "," + regionCount.incrementAndGet();
    return name;
  }

  protected List<LoadQueueItem> splitStoreFile(final LoadQueueItem item,
      final HTable table, byte[] startKey,
      byte[] splitKey) throws IOException {
    final Path hfilePath = item.hfilePath;

    // We use a '_' prefix, which is ignored when walking directory trees
    // above.
    final Path tmpDir = new Path(item.hfilePath.getParent(), "_tmp");

    LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
        "region. Splitting...");

    String uniqueName = getUniqueName(table.getName());
    HColumnDescriptor familyDesc = table.getTableDescriptor().getFamily(item.family);
    Path botOut = new Path(tmpDir, uniqueName + ".bottom");
    Path topOut = new Path(tmpDir, uniqueName + ".top");
    splitStoreFile(getConf(), hfilePath, familyDesc, splitKey,
        botOut, topOut);

    FileSystem fs = tmpDir.getFileSystem(getConf());
    fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
    fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx"));
    fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx"));

    // Add these back at the *front* of the queue, so there's a lower
    // chance that the region will just split again before we get there.
    List<LoadQueueItem> lqis = new ArrayList<LoadQueueItem>(2);
    lqis.add(new LoadQueueItem(item.family, botOut));
    lqis.add(new LoadQueueItem(item.family, topOut));

    LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
    return lqis;
  }

  /**
   * Attempt to assign the given load queue item into its target region group.
   * If the hfile boundary no longer fits into a region, physically splits
   * the hfile such that the new bottom half will fit and returns the list of
   * LQI's corresponding to the resultant hfiles.
   *
   * protected for testing
   * @throws IOException
   */
  protected List<LoadQueueItem> groupOrSplit(Multimap<ByteBuffer, LoadQueueItem> regionGroups,
      final LoadQueueItem item, final HTable table,
      final Pair<byte[][], byte[][]> startEndKeys)
      throws IOException {
    final Path hfilePath = item.hfilePath;
    HFile.Reader hfr = HFile.createReader(fs, hfilePath,
        new CacheConfig(getConf()), getConf());
    final byte[] first, last;
    try {
      hfr.loadFileInfo();
      first = hfr.getFirstRowKey();
      last = hfr.getLastRowKey();
    } finally {
      hfr.close();
    }

    LOG.info("Trying to load hfile=" + hfilePath +
        " first=" + Bytes.toStringBinary(first) +
        " last=" + Bytes.toStringBinary(last));
    if (first == null || last == null) {
      assert first == null && last == null;
      // TODO what if this is due to a bad HFile?
      LOG.info("hfile " + hfilePath + " has no entries, skipping");
      return null;
    }
    if (Bytes.compareTo(first, last) > 0) {
      throw new IllegalArgumentException(
          "Invalid range: " + Bytes.toStringBinary(first) +
          " > " + Bytes.toStringBinary(last));
    }
    int idx = Arrays.binarySearch(startEndKeys.getFirst(), first,
        Bytes.BYTES_COMPARATOR);
    if (idx < 0) {
      // Not an exact match on a region boundary: binarySearch returns
      // -(insertion point) - 1, so recover the region the key falls in.
      idx = -(idx + 1) - 1;
    }
    final int indexForCallable = idx;

    /*
     * We consider there to be a region hole when: 1) indexForCallable < 0,
     * meaning the first region info is missing; 2) this is the last slot in
     * the key arrays but the region's end key is not the empty (last) key;
     * or 3) the end key of a region does not equal the start key of the next
     * region. In each case hbase:meta is inconsistent.
     */
    if (indexForCallable < 0) {
      throw new IOException("The first region info for table "
          + Bytes.toString(table.getTableName())
          + " can't be found in hbase:meta. Please use hbck tool to fix it first.");
    } else if ((indexForCallable == startEndKeys.getFirst().length - 1)
        && !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IOException("The last region info for table "
          + Bytes.toString(table.getTableName())
          + " can't be found in hbase:meta. Please use hbck tool to fix it first.");
    } else if (indexForCallable + 1 < startEndKeys.getFirst().length
        && !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable],
          startEndKeys.getFirst()[indexForCallable + 1]) == 0)) {
      throw new IOException("The endkey of one region for table "
          + Bytes.toString(table.getTableName())
          + " is not equal to the startkey of the next region in hbase:meta. "
          + "Please use hbck tool to fix it first.");
    }

    boolean lastKeyInRange =
        Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
        Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
    if (!lastKeyInRange) {
      List<LoadQueueItem> lqis = splitStoreFile(item, table,
          startEndKeys.getFirst()[indexForCallable],
          startEndKeys.getSecond()[indexForCallable]);
      return lqis;
    }

    // Group regions.
    regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
    return null;
  }

  /**
   * @deprecated Use {@link #tryAtomicRegionLoad(HConnection, TableName, byte[], Collection)}
   */
  @Deprecated
  protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
      final byte [] tableName, final byte[] first, Collection<LoadQueueItem> lqis)
  throws IOException {
    return tryAtomicRegionLoad(conn, TableName.valueOf(tableName), first, lqis);
  }

  /**
   * Attempts to do an atomic load of many hfiles into a region.  If it fails,
   * it returns a list of hfiles that need to be retried.  If it is successful
   * it will return an empty list.
   *
   * NOTE: To maintain row atomicity guarantees, the region server callable
   * should succeed atomically and fail atomically.
   *
   * Protected for testing.
   *
   * @return empty list if success, list of items to retry on recoverable
   * failure
   */
  protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
      final TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis)
  throws IOException {
    final List<Pair<byte[], String>> famPaths =
        new ArrayList<Pair<byte[], String>>(lqis.size());
    for (LoadQueueItem lqi : lqis) {
      famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
    }

    final RegionServerCallable<Boolean> svrCallable =
        new RegionServerCallable<Boolean>(conn, tableName, first) {
      @Override
      public Boolean call() throws Exception {
        SecureBulkLoadClient secureClient = null;
        boolean success = false;

        try {
          LOG.debug("Going to connect to server " + getLocation() + " for row "
              + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
          byte[] regionName = getLocation().getRegionInfo().getRegionName();
          if (!userProvider.isHBaseSecurityEnabled()) {
            success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds);
          } else {
            HTable table = new HTable(conn.getConfiguration(), getTableName());
            secureClient = new SecureBulkLoadClient(table);
            success = secureClient.bulkLoadHFiles(famPaths, fsDelegationToken.getUserToken(),
              bulkToken, getLocation().getRegionInfo().getStartKey());
          }
          return success;
        } finally {
          // Best-effort copying of files that might not have been imported
          // from the staging directory back to their original location in the
          // user directory.
          if (secureClient != null && !success) {
            FileSystem targetFs = FileSystem.get(getConf());
            // Check to see if the source and target filesystems are the same.
            // If they are, try to move the files back, because we previously
            // moved them to the staging directory.
            if (FSHDFSUtils.isSameHdfs(getConf(), fs, targetFs)) {
              for (Pair<byte[], String> el : famPaths) {
                Path hfileStagingPath = null;
                Path hfileOrigPath = new Path(el.getSecond());
                try {
                  hfileStagingPath = new Path(secureClient.getStagingPath(bulkToken, el.getFirst()),
                    hfileOrigPath.getName());
                  if (targetFs.rename(hfileStagingPath, hfileOrigPath)) {
                    LOG.debug("Moved back file " + hfileOrigPath + " from " +
                        hfileStagingPath);
                  } else if (targetFs.exists(hfileStagingPath)) {
                    LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
                        hfileStagingPath);
                  }
                } catch (Exception ex) {
                  LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
                      hfileStagingPath, ex);
                }
              }
            }
          }
        }
      }
    };

    try {
      List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>();
      Configuration conf = getConf();
      boolean success = RpcRetryingCallerFactory.instantiate(conf).<Boolean> newCaller()
          .callWithRetries(svrCallable);
      if (!success) {
        LOG.warn("Attempt to bulk load region containing "
            + Bytes.toStringBinary(first) + " into table "
            + tableName + " with files " + lqis
            + " failed. This is recoverable and they will be retried.");
        toRetry.addAll(lqis); // return lqi's to retry
      }
      // Success.
      return toRetry;
    } catch (IOException e) {
      LOG.error("Encountered unrecoverable error from region server", e);
      throw e;
    }
  }

  /**
   * Split a storefile into a top and bottom half, maintaining
   * the metadata, recreating bloom filters, etc.
   */
  static void splitStoreFile(
      Configuration conf, Path inFile,
      HColumnDescriptor familyDesc, byte[] splitKey,
      Path bottomOut, Path topOut) throws IOException
  {
    // Create Reference markers for the two halves of inFile around splitKey.
    Reference topReference = Reference.createTopReference(splitKey);
    Reference bottomReference = Reference.createBottomReference(splitKey);

    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
  }
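
  // Illustration (assumed Reference semantics, hypothetical paths): for an
  // input HFile covering rows [a..z] and splitKey "m", the bottom half holds
  // rows before "m" and the top half holds rows from "m" onward:
  //
  //   splitStoreFile(conf, new Path("/tmp/in.hfile"), familyDesc,
  //       Bytes.toBytes("m"), new Path("/tmp/out.bottom"),
  //       new Path("/tmp/out.top"));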

  /**
   * Copy half of an HFile into a new HFile.
   */
  private static void copyHFileHalf(
      Configuration conf, Path inFile, Path outFile, Reference reference,
      HColumnDescriptor familyDescriptor)
  throws IOException {
    FileSystem fs = inFile.getFileSystem(conf);
    CacheConfig cacheConf = new CacheConfig(conf);
    HalfStoreFileReader halfReader = null;
    StoreFile.Writer halfWriter = null;
    try {
      halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, conf);
      Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();

      int blocksize = familyDescriptor.getBlocksize();
      Algorithm compression = familyDescriptor.getCompression();
      BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
      HFileContext hFileContext = new HFileContextBuilder()
          .withCompression(compression)
          .withChecksumType(HStore.getChecksumType(conf))
          .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
          .withBlockSize(blocksize)
          .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding())
          .build();
      halfWriter = new StoreFile.WriterBuilder(conf, cacheConf, fs)
          .withFilePath(outFile)
          .withBloomType(bloomFilterType)
          .withFileContext(hFileContext)
          .build();
      HFileScanner scanner = halfReader.getScanner(false, false, false);
      scanner.seekTo();
      do {
        KeyValue kv = scanner.getKeyValue();
        halfWriter.append(kv);
      } while (scanner.next());

      for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) {
        if (shouldCopyHFileMetaKey(entry.getKey())) {
          halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
        }
      }
    } finally {
      if (halfWriter != null) halfWriter.close();
      if (halfReader != null) halfReader.close(cacheConf.shouldEvictOnClose());
    }
  }

  private static boolean shouldCopyHFileMetaKey(byte[] key) {
    return !HFile.isReservedFileInfoKey(key);
  }

  private boolean doesTableExist(TableName tableName) throws Exception {
    return hbAdmin.tableExists(tableName);
  }

  /*
   * Infers region boundaries for a new table.
   * Parameter:
   *   bdryMap is a map from row keys to a running count: +1 for each HFile
   *   whose first row is that key, -1 for each HFile whose last row is it.
   * Algorithm:
   *   Sweep the keys in sorted order, summing the values. The sum drops back
   *   to zero exactly when a group of mutually overlapping HFiles closes, so
   *   the start key of each such group (except the first, which is the start
   *   of the table) is taken as a region boundary.
   */
  public static byte[][] inferBoundaries(TreeMap<byte[], Integer> bdryMap) {
    ArrayList<byte[]> keysArray = new ArrayList<byte[]>();
    int runningValue = 0;
    byte[] currStartKey = null;
    boolean firstBoundary = true;

    for (Map.Entry<byte[], Integer> item: bdryMap.entrySet()) {
      // A sum of zero means a new group of overlapping files starts here.
      if (runningValue == 0) currStartKey = item.getKey();
      runningValue += item.getValue();
      if (runningValue == 0) {
        // The group closed; record its start key as a boundary, except for
        // the first group, whose start is the beginning of the table.
        if (!firstBoundary) keysArray.add(currStartKey);
        firstBoundary = false;
      }
    }

    return keysArray.toArray(new byte[0][0]);
  }
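
  // Worked example (illustrative): three HFiles with first/last row keys
  //   [b..d], [c..f], [g..i]
  // yield bdryMap = {b:+1, c:+1, d:-1, f:-1, g:+1, i:-1}. Sweeping in key
  // order, the running sum returns to zero after f (close of the first
  // overlapping group, which is skipped) and after i (close of the group
  // that began at g), so the inferred boundary set is {g}: the table is
  // created with regions [,g) and [g,).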

  /*
   * If the table is created for the first time, then "completebulkload" reads
   * the files twice. More modifications necessary if we want to avoid doing it.
   */
  private void createTable(TableName tableName, String dirPath) throws Exception {
    Path hfofDir = new Path(dirPath);
    FileSystem fs = hfofDir.getFileSystem(getConf());

    if (!fs.exists(hfofDir)) {
      throw new FileNotFoundException("HFileOutputFormat dir " +
          hfofDir + " not found");
    }

    FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);
    if (familyDirStatuses == null) {
      throw new FileNotFoundException("No families found in " + hfofDir);
    }

    HTableDescriptor htd = new HTableDescriptor(tableName);
    HColumnDescriptor hcd;

    // Add column families and build a map of key occurrences used to infer
    // region boundaries.
    byte[][] keys;
    TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);

    for (FileStatus stat : familyDirStatuses) {
      if (!stat.isDir()) {
        LOG.warn("Skipping non-directory " + stat.getPath());
        continue;
      }
      Path familyDir = stat.getPath();
      // Skip _logs and other directories whose names start with "_".
      if (familyDir.getName().startsWith("_")) continue;
      byte[] family = familyDir.getName().getBytes();

      hcd = new HColumnDescriptor(family);
      htd.addFamily(hcd);

      Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
      for (Path hfile : hfiles) {
        if (hfile.getName().startsWith("_")) continue;
        HFile.Reader reader = HFile.createReader(fs, hfile,
            new CacheConfig(getConf()), getConf());
        final byte[] first, last;
        try {
          if (hcd.getCompressionType() != reader.getFileContext().getCompression()) {
            hcd.setCompressionType(reader.getFileContext().getCompression());
            LOG.info("Setting compression " + hcd.getCompressionType().name() +
                " for family " + hcd.toString());
          }
          reader.loadFileInfo();
          first = reader.getFirstRowKey();
          last = reader.getLastRowKey();

          LOG.info("Trying to figure out region boundaries hfile=" + hfile +
              " first=" + Bytes.toStringBinary(first) +
              " last=" + Bytes.toStringBinary(last));

          // To eventually infer start key-end key boundaries.
          Integer value = map.containsKey(first)? map.get(first):0;
          map.put(first, value + 1);

          value = map.containsKey(last)? map.get(last):0;
          map.put(last, value - 1);
        } finally {
          reader.close();
        }
      }
    }

    keys = LoadIncrementalHFiles.inferBoundaries(map);
    this.hbAdmin.createTable(htd, keys);

    LOG.info("Table " + tableName + " is available!!");
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      usage();
      return -1;
    }

    String dirPath = args[0];
    TableName tableName = TableName.valueOf(args[1]);

    boolean tableExists = this.doesTableExist(tableName);
    if (!tableExists) this.createTable(tableName, dirPath);

    Path hfofDir = new Path(dirPath);
    HTable table = new HTable(getConf(), tableName);

    doBulkLoad(hfofDir, table);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int ret = ToolRunner.run(new LoadIncrementalHFiles(conf), args);
    System.exit(ret);
  }

}