1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertFalse;
22 import static org.junit.Assert.assertTrue;
23 import static org.junit.Assert.fail;
24 import static org.mockito.Matchers.any;
25 import static org.mockito.Mockito.doAnswer;
26 import static org.mockito.Mockito.mock;
27 import static org.mockito.Mockito.when;
28
29 import java.io.ByteArrayOutputStream;
30 import java.io.IOException;
31 import java.io.PrintStream;
32 import java.net.URL;
33 import java.util.ArrayList;
34 import java.util.List;
35
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.fs.FileSystem;
38 import org.apache.hadoop.fs.Path;
39 import org.apache.hadoop.hbase.Cell;
40 import org.apache.hadoop.hbase.CellUtil;
41 import org.apache.hadoop.hbase.HBaseTestingUtility;
42 import org.apache.hadoop.hbase.HColumnDescriptor;
43 import org.apache.hadoop.hbase.HRegionInfo;
44 import org.apache.hadoop.hbase.HTableDescriptor;
45 import org.apache.hadoop.hbase.KeyValue;
46 import org.apache.hadoop.hbase.MediumTests;
47 import org.apache.hadoop.hbase.TableName;
48 import org.apache.hadoop.hbase.client.Delete;
49 import org.apache.hadoop.hbase.client.Durability;
50 import org.apache.hadoop.hbase.client.Get;
51 import org.apache.hadoop.hbase.client.HTable;
52 import org.apache.hadoop.hbase.client.Put;
53 import org.apache.hadoop.hbase.client.Result;
54 import org.apache.hadoop.hbase.client.ResultScanner;
55 import org.apache.hadoop.hbase.client.Scan;
56 import org.apache.hadoop.hbase.filter.Filter;
57 import org.apache.hadoop.hbase.filter.FilterBase;
58 import org.apache.hadoop.hbase.filter.PrefixFilter;
59 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
60 import org.apache.hadoop.hbase.mapreduce.Import.KeyValueImporter;
61 import org.apache.hadoop.hbase.regionserver.wal.HLog;
62 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
63 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
64 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
65 import org.apache.hadoop.hbase.util.Bytes;
66 import org.apache.hadoop.hbase.util.LauncherSecurityManager;
67 import org.apache.hadoop.mapreduce.Job;
68 import org.apache.hadoop.mapreduce.Mapper.Context;
69 import org.apache.hadoop.util.GenericOptionsParser;
70 import org.junit.After;
71 import org.junit.AfterClass;
72 import org.junit.Assert;
73 import org.junit.Before;
74 import org.junit.BeforeClass;
75 import org.junit.Test;
76 import org.junit.experimental.categories.Category;
77 import org.mockito.invocation.InvocationOnMock;
78 import org.mockito.stubbing.Answer;
79
80
81
82
83 @Category(MediumTests.class)
84 public class TestImportExport {
85 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
86 private static final byte[] ROW1 = Bytes.toBytes("row1");
87 private static final byte[] ROW2 = Bytes.toBytes("row2");
88 private static final String FAMILYA_STRING = "a";
89 private static final String FAMILYB_STRING = "b";
90 private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING);
91 private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING);
92 private static final byte[] QUAL = Bytes.toBytes("q");
93 private static final String OUTPUT_DIR = "outputdir";
94 private static String FQ_OUTPUT_DIR;
95 private static final String EXPORT_BATCH_SIZE = "100";
96
97 private static long now = System.currentTimeMillis();
98
99 @BeforeClass
100 public static void beforeClass() throws Exception {
101 UTIL.startMiniCluster();
102 UTIL.startMiniMapReduceCluster();
103 FQ_OUTPUT_DIR = new Path(OUTPUT_DIR).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString();
104 }
105
106 @AfterClass
107 public static void afterClass() throws Exception {
108 UTIL.shutdownMiniMapReduceCluster();
109 UTIL.shutdownMiniCluster();
110 }
111
112 @Before
113 @After
114 public void cleanup() throws Exception {
115 FileSystem fs = FileSystem.get(UTIL.getConfiguration());
116 fs.delete(new Path(OUTPUT_DIR), true);
117 }
118
119
120
121
122
123
124
125
126
127 boolean runExport(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
128
129 GenericOptionsParser opts = new GenericOptionsParser(new Configuration(UTIL.getConfiguration()), args);
130 Configuration conf = opts.getConfiguration();
131 args = opts.getRemainingArgs();
132 Job job = Export.createSubmittableJob(conf, args);
133 job.waitForCompletion(false);
134 return job.isSuccessful();
135 }
136
137
138
139
140
141
142
143
144
145 boolean runImport(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
146
147 GenericOptionsParser opts = new GenericOptionsParser(new Configuration(UTIL.getConfiguration()), args);
148 Configuration conf = opts.getConfiguration();
149 args = opts.getRemainingArgs();
150 Job job = Import.createSubmittableJob(conf, args);
151 job.waitForCompletion(false);
152 return job.isSuccessful();
153 }
154
155
156
157
158
159 @Test
160 public void testSimpleCase() throws Exception {
161 String EXPORT_TABLE = "exportSimpleCase";
162 HTable t = UTIL.createTable(Bytes.toBytes(EXPORT_TABLE), FAMILYA, 3);
163 Put p = new Put(ROW1);
164 p.add(FAMILYA, QUAL, now, QUAL);
165 p.add(FAMILYA, QUAL, now+1, QUAL);
166 p.add(FAMILYA, QUAL, now+2, QUAL);
167 t.put(p);
168 p = new Put(ROW2);
169 p.add(FAMILYA, QUAL, now, QUAL);
170 p.add(FAMILYA, QUAL, now+1, QUAL);
171 p.add(FAMILYA, QUAL, now+2, QUAL);
172 t.put(p);
173
174 String[] args = new String[] {
175 EXPORT_TABLE,
176 FQ_OUTPUT_DIR,
177 "1000",
178 };
179 assertTrue(runExport(args));
180
181 String IMPORT_TABLE = "importTableSimpleCase";
182 t = UTIL.createTable(Bytes.toBytes(IMPORT_TABLE), FAMILYB, 3);
183 args = new String[] {
184 "-D" + Import.CF_RENAME_PROP + "="+FAMILYA_STRING+":"+FAMILYB_STRING,
185 IMPORT_TABLE,
186 FQ_OUTPUT_DIR
187 };
188 assertTrue(runImport(args));
189
190 Get g = new Get(ROW1);
191 g.setMaxVersions();
192 Result r = t.get(g);
193 assertEquals(3, r.size());
194 g = new Get(ROW2);
195 g.setMaxVersions();
196 r = t.get(g);
197 assertEquals(3, r.size());
198 }
199
200
201
202
203
204
205 @Test
206 public void testMetaExport() throws Exception {
207 String EXPORT_TABLE = TableName.META_TABLE_NAME.getNameAsString();
208 String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1", "0", "0" };
209 assertTrue(runExport(args));
210 }
211
212
213
214
215
216 @Test
217 public void testImport94Table() throws Exception {
218 URL url = TestImportExport.class.getResource(
219 "exportedTableIn94Format");
220 Path importPath = new Path(url.getPath());
221 FileSystem fs = FileSystem.get(UTIL.getConfiguration());
222 fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR
223 + "exportedTableIn94Format"));
224 String IMPORT_TABLE = "importTableExportedFrom94";
225 HTable t = UTIL.createTable(Bytes.toBytes(IMPORT_TABLE), Bytes.toBytes("f1"), 3);
226 String[] args = new String[] {
227 "-Dhbase.import.version=0.94" ,
228 IMPORT_TABLE, FQ_OUTPUT_DIR
229 };
230 assertTrue(runImport(args));
231
232
233
234
235
236
237
238
239
240 assertEquals(5, UTIL.countRows(t));
241 t.close();
242 }
243
244
245
246
247 @Test
248 public void testExportScannerBatching() throws Exception {
249 String BATCH_TABLE = "exportWithBatch";
250 HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(BATCH_TABLE));
251 desc.addFamily(new HColumnDescriptor(FAMILYA)
252 .setMaxVersions(1)
253 );
254 UTIL.getHBaseAdmin().createTable(desc);
255 HTable t = new HTable(UTIL.getConfiguration(), BATCH_TABLE);
256
257 Put p = new Put(ROW1);
258 p.add(FAMILYA, QUAL, now, QUAL);
259 p.add(FAMILYA, QUAL, now+1, QUAL);
260 p.add(FAMILYA, QUAL, now+2, QUAL);
261 p.add(FAMILYA, QUAL, now+3, QUAL);
262 p.add(FAMILYA, QUAL, now+4, QUAL);
263 t.put(p);
264
265 String[] args = new String[] {
266 "-D" + Export.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE,
267 BATCH_TABLE,
268 FQ_OUTPUT_DIR
269 };
270 assertTrue(runExport(args));
271
272 FileSystem fs = FileSystem.get(UTIL.getConfiguration());
273 fs.delete(new Path(FQ_OUTPUT_DIR), true);
274 t.close();
275 }
276
277 @Test
278 public void testWithDeletes() throws Exception {
279 String EXPORT_TABLE = "exportWithDeletes";
280 HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(EXPORT_TABLE));
281 desc.addFamily(new HColumnDescriptor(FAMILYA)
282 .setMaxVersions(5)
283 .setKeepDeletedCells(true)
284 );
285 UTIL.getHBaseAdmin().createTable(desc);
286 HTable t = new HTable(UTIL.getConfiguration(), EXPORT_TABLE);
287
288 Put p = new Put(ROW1);
289 p.add(FAMILYA, QUAL, now, QUAL);
290 p.add(FAMILYA, QUAL, now+1, QUAL);
291 p.add(FAMILYA, QUAL, now+2, QUAL);
292 p.add(FAMILYA, QUAL, now+3, QUAL);
293 p.add(FAMILYA, QUAL, now+4, QUAL);
294 t.put(p);
295
296 Delete d = new Delete(ROW1, now+3);
297 t.delete(d);
298 d = new Delete(ROW1);
299 d.deleteColumns(FAMILYA, QUAL, now+2);
300 t.delete(d);
301
302 String[] args = new String[] {
303 "-D" + Export.RAW_SCAN + "=true",
304 EXPORT_TABLE,
305 FQ_OUTPUT_DIR,
306 "1000",
307 };
308 assertTrue(runExport(args));
309
310 String IMPORT_TABLE = "importWithDeletes";
311 desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
312 desc.addFamily(new HColumnDescriptor(FAMILYA)
313 .setMaxVersions(5)
314 .setKeepDeletedCells(true)
315 );
316 UTIL.getHBaseAdmin().createTable(desc);
317 t.close();
318 t = new HTable(UTIL.getConfiguration(), IMPORT_TABLE);
319 args = new String[] {
320 IMPORT_TABLE,
321 FQ_OUTPUT_DIR
322 };
323 assertTrue(runImport(args));
324
325 Scan s = new Scan();
326 s.setMaxVersions();
327 s.setRaw(true);
328 ResultScanner scanner = t.getScanner(s);
329 Result r = scanner.next();
330 Cell[] res = r.rawCells();
331 assertTrue(CellUtil.isDeleteFamily(res[0]));
332 assertEquals(now+4, res[1].getTimestamp());
333 assertEquals(now+3, res[2].getTimestamp());
334 assertTrue(CellUtil.isDelete(res[3]));
335 assertEquals(now+2, res[4].getTimestamp());
336 assertEquals(now+1, res[5].getTimestamp());
337 assertEquals(now, res[6].getTimestamp());
338 t.close();
339 }
340
341
342
343
344
345 @Test
346 public void testWithFilter() throws Exception {
347
348 String EXPORT_TABLE = "exportSimpleCase_ImportWithFilter";
349 HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(EXPORT_TABLE));
350 desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
351 UTIL.getHBaseAdmin().createTable(desc);
352 HTable exportTable = new HTable(UTIL.getConfiguration(), EXPORT_TABLE);
353
354 Put p = new Put(ROW1);
355 p.add(FAMILYA, QUAL, now, QUAL);
356 p.add(FAMILYA, QUAL, now + 1, QUAL);
357 p.add(FAMILYA, QUAL, now + 2, QUAL);
358 p.add(FAMILYA, QUAL, now + 3, QUAL);
359 p.add(FAMILYA, QUAL, now + 4, QUAL);
360 exportTable.put(p);
361
362
363 p = new Put(ROW2);
364 p.add(FAMILYA, QUAL, now, QUAL);
365 exportTable.put(p);
366
367 exportTable.flushCommits();
368
369
370 String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1000" };
371 assertTrue(runExport(args));
372
373
374 String IMPORT_TABLE = "importWithFilter";
375 desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
376 desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
377 UTIL.getHBaseAdmin().createTable(desc);
378
379 HTable importTable = new HTable(UTIL.getConfiguration(), IMPORT_TABLE);
380 args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(),
381 "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE, FQ_OUTPUT_DIR,
382 "1000" };
383 assertTrue(runImport(args));
384
385
386 PrefixFilter filter = new PrefixFilter(ROW1);
387 int count = getCount(exportTable, filter);
388
389 Assert.assertEquals("Unexpected row count between export and import tables", count,
390 getCount(importTable, null));
391
392
393
394
395 args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(),
396 "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", EXPORT_TABLE,
397 FQ_OUTPUT_DIR, "1000" };
398 assertFalse(runImport(args));
399
400
401 exportTable.close();
402 importTable.close();
403 }
404
405
406
407
408
409
410
411
412
413 private int getCount(HTable table, Filter filter) throws IOException {
414 Scan scan = new Scan();
415 scan.setFilter(filter);
416 ResultScanner results = table.getScanner(scan);
417 int count = 0;
418 for (Result res : results) {
419 count += res.size();
420 }
421 results.close();
422 return count;
423 }
424
425
426
427
428 @Test
429 public void testImportMain() throws Exception {
430 PrintStream oldPrintStream = System.err;
431 SecurityManager SECURITY_MANAGER = System.getSecurityManager();
432 LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
433 System.setSecurityManager(newSecurityManager);
434 ByteArrayOutputStream data = new ByteArrayOutputStream();
435 String[] args = {};
436 System.setErr(new PrintStream(data));
437 try {
438 System.setErr(new PrintStream(data));
439 Import.main(args);
440 fail("should be SecurityException");
441 } catch (SecurityException e) {
442 assertEquals(-1, newSecurityManager.getExitCode());
443 assertTrue(data.toString().contains("Wrong number of arguments:"));
444 assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
445 assertTrue(data.toString().contains("-Dimport.filter.class=<name of filter class>"));
446 assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
447 assertTrue(data.toString().contains("-Dmapred.reduce.tasks.speculative.execution=false"));
448 } finally {
449 System.setErr(oldPrintStream);
450 System.setSecurityManager(SECURITY_MANAGER);
451 }
452 }
453
454
455
456
457 @Test
458 public void testExportMain() throws Exception {
459 PrintStream oldPrintStream = System.err;
460 SecurityManager SECURITY_MANAGER = System.getSecurityManager();
461 LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
462 System.setSecurityManager(newSecurityManager);
463 ByteArrayOutputStream data = new ByteArrayOutputStream();
464 String[] args = {};
465 System.setErr(new PrintStream(data));
466 try {
467 System.setErr(new PrintStream(data));
468 Export.main(args);
469 fail("should be SecurityException");
470 } catch (SecurityException e) {
471 assertEquals(-1, newSecurityManager.getExitCode());
472 assertTrue(data.toString().contains("Wrong number of arguments:"));
473 assertTrue(data.toString().contains(
474 "Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " +
475 "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]"));
476 assertTrue(data.toString().contains("-D hbase.mapreduce.scan.column.family=<familyName>"));
477 assertTrue(data.toString().contains("-D hbase.mapreduce.include.deleted.rows=true"));
478 assertTrue(data.toString().contains("-Dhbase.client.scanner.caching=100"));
479 assertTrue(data.toString().contains("-Dmapred.map.tasks.speculative.execution=false"));
480 assertTrue(data.toString().contains("-Dmapred.reduce.tasks.speculative.execution=false"));
481 assertTrue(data.toString().contains("-Dhbase.export.scanner.batch=10"));
482 } finally {
483 System.setErr(oldPrintStream);
484 System.setSecurityManager(SECURITY_MANAGER);
485 }
486 }
487
488
489
490
491 @SuppressWarnings({ "unchecked", "rawtypes" })
492 @Test
493 public void testKeyValueImporter() throws Exception {
494 KeyValueImporter importer = new KeyValueImporter();
495 Configuration configuration = new Configuration();
496 Context ctx = mock(Context.class);
497 when(ctx.getConfiguration()).thenReturn(configuration);
498
499 doAnswer(new Answer<Void>() {
500
501 @Override
502 public Void answer(InvocationOnMock invocation) throws Throwable {
503 ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
504 KeyValue key = (KeyValue) invocation.getArguments()[1];
505 assertEquals("Key", Bytes.toString(writer.get()));
506 assertEquals("row", Bytes.toString(key.getRow()));
507 return null;
508 }
509 }).when(ctx).write(any(ImmutableBytesWritable.class), any(KeyValue.class));
510
511 importer.setup(ctx);
512 Result value = mock(Result.class);
513 KeyValue[] keys = {
514 new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
515 Bytes.toBytes("value")),
516 new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
517 Bytes.toBytes("value1")) };
518 when(value.rawCells()).thenReturn(keys);
519 importer.map(new ImmutableBytesWritable(Bytes.toBytes("Key")), value, ctx);
520
521 }
522
523
524
525
526
527 @Test
528 public void testAddFilterAndArguments() throws IOException {
529 Configuration configuration = new Configuration();
530
531 List<String> args = new ArrayList<String>();
532 args.add("param1");
533 args.add("param2");
534
535 Import.addFilterAndArguments(configuration, FilterBase.class, args);
536 assertEquals("org.apache.hadoop.hbase.filter.FilterBase",
537 configuration.get(Import.FILTER_CLASS_CONF_KEY));
538 assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY));
539 }
540
541 @Test
542 public void testDurability() throws IOException, InterruptedException, ClassNotFoundException {
543
544 String exportTableName = "exporttestDurability";
545 HTable exportTable = UTIL.createTable(Bytes.toBytes(exportTableName), FAMILYA, 3);
546
547
548 Put put = new Put(ROW1);
549 put.add(FAMILYA, QUAL, now, QUAL);
550 put.add(FAMILYA, QUAL, now + 1, QUAL);
551 put.add(FAMILYA, QUAL, now + 2, QUAL);
552 exportTable.put(put);
553
554 put = new Put(ROW2);
555 put.add(FAMILYA, QUAL, now, QUAL);
556 put.add(FAMILYA, QUAL, now + 1, QUAL);
557 put.add(FAMILYA, QUAL, now + 2, QUAL);
558 exportTable.put(put);
559
560
561 String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000"};
562 assertTrue(runExport(args));
563
564
565 String importTableName = "importTestDurability1";
566 HTable importTable = UTIL.createTable(Bytes.toBytes(importTableName), FAMILYA, 3);
567
568
569 TableWALActionListener walListener = new TableWALActionListener(importTableName);
570 HLog hLog = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL();
571 hLog.registerWALActionsListener(walListener);
572
573
574 args =
575 new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(),
576 importTableName, FQ_OUTPUT_DIR };
577 assertTrue(runImport(args));
578
579 assertTrue(!walListener.isWALVisited());
580
581 assertTrue(getCount(importTable, null) == 2);
582
583
584 importTableName = "importTestDurability2";
585 importTable = UTIL.createTable(Bytes.toBytes(importTableName), FAMILYA, 3);
586 hLog.unregisterWALActionsListener(walListener);
587 walListener = new TableWALActionListener(importTableName);
588 hLog.registerWALActionsListener(walListener);
589 args = new String[] { importTableName, FQ_OUTPUT_DIR };
590 assertTrue(runImport(args));
591
592 assertTrue(walListener.isWALVisited());
593
594 assertTrue(getCount(importTable, null) == 2);
595 }
596
597
598
599
600
601 private static class TableWALActionListener implements WALActionsListener {
602
603 private String tableName;
604 private boolean isVisited = false;
605
606 public TableWALActionListener(String tableName) {
607 this.tableName = tableName;
608 }
609
610 @Override
611 public void preLogRoll(Path oldPath, Path newPath) throws IOException {
612
613 }
614
615 @Override
616 public void postLogRoll(Path oldPath, Path newPath) throws IOException {
617
618 }
619
620 @Override
621 public void preLogArchive(Path oldPath, Path newPath) throws IOException {
622
623 }
624
625 @Override
626 public void postLogArchive(Path oldPath, Path newPath) throws IOException {
627
628 }
629
630 @Override
631 public void logRollRequested() {
632
633 }
634
635 @Override
636 public void logCloseRequested() {
637
638 }
639
640 @Override
641 public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit) {
642
643 }
644
645 @Override
646 public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) {
647 if (tableName.equalsIgnoreCase(htd.getNameAsString())) {
648 isVisited = true;
649 }
650 }
651
652 public boolean isWALVisited() {
653 return isVisited;
654 }
655 }
656 }