
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.Import.KeyValueImporter;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.util.GenericOptionsParser;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Tests the table import and table export MR job functionality
 */
@Category(MediumTests.class)
public class TestImportExport {
  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private static final byte[] ROW1 = Bytes.toBytes("row1");
  private static final byte[] ROW2 = Bytes.toBytes("row2");
  private static final String FAMILYA_STRING = "a";
  private static final String FAMILYB_STRING = "b";
  private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING);
  private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING);
  private static final byte[] QUAL = Bytes.toBytes("q");
  private static final String OUTPUT_DIR = "outputdir";
  private static String FQ_OUTPUT_DIR;
  private static final String EXPORT_BATCH_SIZE = "100";

  private static long now = System.currentTimeMillis();

  @BeforeClass
  public static void beforeClass() throws Exception {
    UTIL.startMiniCluster();
    UTIL.startMiniMapReduceCluster();
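    // Qualify the output dir against the test cluster's default filesystem so the
    // MR jobs resolve it consistently.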
    FQ_OUTPUT_DIR =
        new Path(OUTPUT_DIR).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString();
  }

  @AfterClass
  public static void afterClass() throws Exception {
    UTIL.shutdownMiniMapReduceCluster();
    UTIL.shutdownMiniCluster();
  }

  @Before
  @After
  public void cleanup() throws Exception {
    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
    fs.delete(new Path(OUTPUT_DIR), true);
  }

  /**
   * Runs an export job with the specified command line args
   * @param args the command line arguments to pass to the export job
   * @return true if the job completed successfully
   * @throws IOException
   * @throws InterruptedException
   * @throws ClassNotFoundException
   */
  boolean runExport(String[] args)
      throws IOException, InterruptedException, ClassNotFoundException {
    // Make a copy of the configuration so that different temp dirs are used for each job.
    GenericOptionsParser opts =
        new GenericOptionsParser(new Configuration(UTIL.getConfiguration()), args);
    Configuration conf = opts.getConfiguration();
    args = opts.getRemainingArgs();
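    // At this point GenericOptionsParser has folded any -D options into conf and
    // left only the job's positional arguments in args.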
    Job job = Export.createSubmittableJob(conf, args);
    job.waitForCompletion(false);
    return job.isSuccessful();
  }

  /**
   * Runs an import job with the specified command line args
   * @param args the command line arguments to pass to the import job
   * @return true if the job completed successfully
   * @throws IOException
   * @throws InterruptedException
   * @throws ClassNotFoundException
   */
  boolean runImport(String[] args)
      throws IOException, InterruptedException, ClassNotFoundException {
    // Make a copy of the configuration so that different temp dirs are used, as in runExport().
    GenericOptionsParser opts =
        new GenericOptionsParser(new Configuration(UTIL.getConfiguration()), args);
    Configuration conf = opts.getConfiguration();
    args = opts.getRemainingArgs();
    Job job = Import.createSubmittableJob(conf, args);
    job.waitForCompletion(false);
    return job.isSuccessful();
  }

  /**
   * Test a simple export/import round trip with a column family rename
   * @throws Exception
   */
  @Test
  public void testSimpleCase() throws Exception {
    String EXPORT_TABLE = "exportSimpleCase";
    HTable t = UTIL.createTable(Bytes.toBytes(EXPORT_TABLE), FAMILYA, 3);
    Put p = new Put(ROW1);
    p.add(FAMILYA, QUAL, now, QUAL);
    p.add(FAMILYA, QUAL, now+1, QUAL);
    p.add(FAMILYA, QUAL, now+2, QUAL);
    t.put(p);
    p = new Put(ROW2);
    p.add(FAMILYA, QUAL, now, QUAL);
    p.add(FAMILYA, QUAL, now+1, QUAL);
    p.add(FAMILYA, QUAL, now+2, QUAL);
    t.put(p);

    String[] args = new String[] {
        EXPORT_TABLE,
        FQ_OUTPUT_DIR,
        "1000", // max number of key versions per key to export
    };
    assertTrue(runExport(args));

    String IMPORT_TABLE = "importTableSimpleCase";
    t = UTIL.createTable(Bytes.toBytes(IMPORT_TABLE), FAMILYB, 3);
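    // CF_RENAME_PROP maps family "a" from the exported data onto family "b" in the import table.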
    args = new String[] {
        "-D" + Import.CF_RENAME_PROP + "=" + FAMILYA_STRING + ":" + FAMILYB_STRING,
        IMPORT_TABLE,
        FQ_OUTPUT_DIR
    };
    assertTrue(runImport(args));

    Get g = new Get(ROW1);
    g.setMaxVersions();
    Result r = t.get(g);
    assertEquals(3, r.size());
    g = new Get(ROW2);
    g.setMaxVersions();
    r = t.get(g);
    assertEquals(3, r.size());
  }

  /**
   * Test exporting the hbase:meta table
   *
   * @throws Exception
   */
  @Test
  public void testMetaExport() throws Exception {
    String EXPORT_TABLE = TableName.META_TABLE_NAME.getNameAsString();
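    // Positional args: table name, output dir, max versions (1), start time (0), end time (0).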
    String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1", "0", "0" };
    assertTrue(runExport(args));
  }

  /**
   * Test importing data from a file exported by HBase 0.94
   * @throws Exception
   */
  @Test
  public void testImport94Table() throws Exception {
    URL url = TestImportExport.class.getResource(
        "exportedTableIn94Format");
    Path importPath = new Path(url.getPath());
    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
    fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR
        + "exportedTableIn94Format"));
    String IMPORT_TABLE = "importTableExportedFrom94";
    HTable t = UTIL.createTable(Bytes.toBytes(IMPORT_TABLE), Bytes.toBytes("f1"), 3);
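    // hbase.import.version tells Import to expect the pre-0.96 (Writable-based) file format.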
    String[] args = new String[] {
        "-Dhbase.import.version=0.94",
        IMPORT_TABLE, FQ_OUTPUT_DIR
    };
    assertTrue(runImport(args));

    /* exportedTableIn94Format contains 5 rows
     ROW         COLUMN+CELL
     r1          column=f1:c1, timestamp=1383766761171, value=val1
     r2          column=f1:c1, timestamp=1383766771642, value=val2
     r3          column=f1:c1, timestamp=1383766777615, value=val3
     r4          column=f1:c1, timestamp=1383766785146, value=val4
     r5          column=f1:c1, timestamp=1383766791506, value=val5
     */
    assertEquals(5, UTIL.countRows(t));
    t.close();
  }

  /**
   * Test export scanner batching
   */
  @Test
  public void testExportScannerBatching() throws Exception {
    String BATCH_TABLE = "exportWithBatch";
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(BATCH_TABLE));
    desc.addFamily(new HColumnDescriptor(FAMILYA)
        .setMaxVersions(1)
    );
    UTIL.getHBaseAdmin().createTable(desc);
    HTable t = new HTable(UTIL.getConfiguration(), BATCH_TABLE);

    Put p = new Put(ROW1);
    p.add(FAMILYA, QUAL, now, QUAL);
    p.add(FAMILYA, QUAL, now+1, QUAL);
    p.add(FAMILYA, QUAL, now+2, QUAL);
    p.add(FAMILYA, QUAL, now+3, QUAL);
    p.add(FAMILYA, QUAL, now+4, QUAL);
    t.put(p);

    String[] args = new String[] {
        "-D" + Export.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE,  // added scanner batching arg.
        BATCH_TABLE,
        FQ_OUTPUT_DIR
    };
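    // The job completing successfully is the assertion here: enabling scanner
    // batching must not break the export.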
    assertTrue(runExport(args));

    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
    fs.delete(new Path(FQ_OUTPUT_DIR), true);
    t.close();
  }

  @Test
  public void testWithDeletes() throws Exception {
    String EXPORT_TABLE = "exportWithDeletes";
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(EXPORT_TABLE));
    desc.addFamily(new HColumnDescriptor(FAMILYA)
        .setMaxVersions(5)
        .setKeepDeletedCells(true)
    );
    UTIL.getHBaseAdmin().createTable(desc);
    HTable t = new HTable(UTIL.getConfiguration(), EXPORT_TABLE);

    Put p = new Put(ROW1);
    p.add(FAMILYA, QUAL, now, QUAL);
    p.add(FAMILYA, QUAL, now+1, QUAL);
    p.add(FAMILYA, QUAL, now+2, QUAL);
    p.add(FAMILYA, QUAL, now+3, QUAL);
    p.add(FAMILYA, QUAL, now+4, QUAL);
    t.put(p);
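    // Lay down a family delete marker at now+3 and a column delete marker at now+2.
    // setKeepDeletedCells(true) plus the raw export scan below lets both markers survive.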

    Delete d = new Delete(ROW1, now+3);
    t.delete(d);
    d = new Delete(ROW1);
    d.deleteColumns(FAMILYA, QUAL, now+2);
    t.delete(d);

    String[] args = new String[] {
        "-D" + Export.RAW_SCAN + "=true",
        EXPORT_TABLE,
        FQ_OUTPUT_DIR,
        "1000", // max number of key versions per key to export
    };
    assertTrue(runExport(args));

    String IMPORT_TABLE = "importWithDeletes";
    desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
    desc.addFamily(new HColumnDescriptor(FAMILYA)
        .setMaxVersions(5)
        .setKeepDeletedCells(true)
    );
    UTIL.getHBaseAdmin().createTable(desc);
    t.close();
    t = new HTable(UTIL.getConfiguration(), IMPORT_TABLE);
    args = new String[] {
        IMPORT_TABLE,
        FQ_OUTPUT_DIR
    };
    assertTrue(runImport(args));

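    // A raw scan returns delete markers as well as puts. For ROW1 we expect, in order:
    // the family delete marker (now+3), puts at now+4 and now+3, the column delete
    // marker (now+2), then puts at now+2, now+1 and now.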
    Scan s = new Scan();
    s.setMaxVersions();
    s.setRaw(true);
    ResultScanner scanner = t.getScanner(s);
    Result r = scanner.next();
    Cell[] res = r.rawCells();
    assertTrue(CellUtil.isDeleteFamily(res[0]));
    assertEquals(now+4, res[1].getTimestamp());
    assertEquals(now+3, res[2].getTimestamp());
    assertTrue(CellUtil.isDelete(res[3]));
    assertEquals(now+2, res[4].getTimestamp());
    assertEquals(now+1, res[5].getTimestamp());
    assertEquals(now, res[6].getTimestamp());
    scanner.close();
    t.close();
  }

  /**
   * Create a simple table, run an Export job on it, Import with filtering on, verify counts,
   * then attempt an import with an invalid filter.
   */
  @Test
  public void testWithFilter() throws Exception {
    // Create a simple table to export
    String EXPORT_TABLE = "exportSimpleCase_ImportWithFilter";
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(EXPORT_TABLE));
    desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
    UTIL.getHBaseAdmin().createTable(desc);
    HTable exportTable = new HTable(UTIL.getConfiguration(), EXPORT_TABLE);

    Put p = new Put(ROW1);
    p.add(FAMILYA, QUAL, now, QUAL);
    p.add(FAMILYA, QUAL, now + 1, QUAL);
    p.add(FAMILYA, QUAL, now + 2, QUAL);
    p.add(FAMILYA, QUAL, now + 3, QUAL);
    p.add(FAMILYA, QUAL, now + 4, QUAL);
    exportTable.put(p);

    // Having another row would actually test the filter.
    p = new Put(ROW2);
    p.add(FAMILYA, QUAL, now, QUAL);
    exportTable.put(p);
    // Flush the commits.
    exportTable.flushCommits();

    // Export the simple table
    String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1000" };
    assertTrue(runExport(args));

    // Import to a new table
    String IMPORT_TABLE = "importWithFilter";
    desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
    desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
    UTIL.getHBaseAdmin().createTable(desc);

    HTable importTable = new HTable(UTIL.getConfiguration(), IMPORT_TABLE);
    args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(),
        "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE, FQ_OUTPUT_DIR,
        "1000" };
    assertTrue(runImport(args));

    // Get the count of the source table for rows matching the filter
    PrefixFilter filter = new PrefixFilter(ROW1);
    int count = getCount(exportTable, filter);

    Assert.assertEquals("Unexpected row count between export and import tables", count,
      getCount(importTable, null));

    // Then verify that a broken filter configuration fails cleanly rather than corrupting
    // anything. Filter itself is abstract, so the job cannot instantiate it and should fail.
    // This is easier to test here because we don't need to re-run the export job.
    args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(),
        "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), EXPORT_TABLE,
        FQ_OUTPUT_DIR, "1000" };
    assertFalse(runImport(args));

    // cleanup
    exportTable.close();
    importTable.close();
  }

  /**
   * Count the number of KeyValues in the specified table matching the given filter
   * @param table the table to scan
   * @param filter the filter to apply, or null to count everything
   * @return the number of KeyValues found
   * @throws IOException
   */
  private int getCount(HTable table, Filter filter) throws IOException {
    Scan scan = new Scan();
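    // Note: this default Scan returns only the newest version of each column.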
    scan.setFilter(filter);
    ResultScanner results = table.getScanner(scan);
    int count = 0;
    for (Result res : results) {
      count += res.size();
    }
    results.close();
    return count;
  }

  /**
   * Test the main method. Import should print help and call System.exit
   */
  @Test
  public void testImportMain() throws Exception {
    PrintStream oldPrintStream = System.err;
    SecurityManager SECURITY_MANAGER = System.getSecurityManager();
    LauncherSecurityManager newSecurityManager = new LauncherSecurityManager();
    System.setSecurityManager(newSecurityManager);
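    // LauncherSecurityManager turns System.exit() into a SecurityException and records
    // the exit code so the test can assert on it.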
    ByteArrayOutputStream data = new ByteArrayOutputStream();
    String[] args = {};
    System.setErr(new PrintStream(data));
    try {
      Import.main(args);
      fail("should be SecurityException");
    } catch (SecurityException e) {
      assertEquals(-1, newSecurityManager.getExitCode());
      assertTrue(data.toString().contains("Wrong number of arguments:"));
      assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
      assertTrue(data.toString().contains("-Dimport.filter.class=<name of filter class>"));
      assertTrue(data.toString().contains("-Dmapred.reduce.tasks.speculative.execution=false"));
    } finally {
      System.setErr(oldPrintStream);
      System.setSecurityManager(SECURITY_MANAGER);
    }
  }

  /**
   * Test the main method. Export should print help and call System.exit
   */
  @Test
  public void testExportMain() throws Exception {
    PrintStream oldPrintStream = System.err;
    SecurityManager SECURITY_MANAGER = System.getSecurityManager();
    LauncherSecurityManager newSecurityManager = new LauncherSecurityManager();
    System.setSecurityManager(newSecurityManager);
    ByteArrayOutputStream data = new ByteArrayOutputStream();
    String[] args = {};
    System.setErr(new PrintStream(data));
    try {
      Export.main(args);
      fail("should be SecurityException");
    } catch (SecurityException e) {
      assertEquals(-1, newSecurityManager.getExitCode());
      assertTrue(data.toString().contains("Wrong number of arguments:"));
      assertTrue(data.toString().contains(
              "Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " +
              "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]"));
      assertTrue(data.toString().contains("-D hbase.mapreduce.scan.column.family=<familyName>"));
      assertTrue(data.toString().contains("-D hbase.mapreduce.include.deleted.rows=true"));
      assertTrue(data.toString().contains("-Dhbase.client.scanner.caching=100"));
      assertTrue(data.toString().contains("-Dmapred.map.tasks.speculative.execution=false"));
      assertTrue(data.toString().contains("-Dmapred.reduce.tasks.speculative.execution=false"));
      assertTrue(data.toString().contains("-Dhbase.export.scanner.batch=10"));
    } finally {
      System.setErr(oldPrintStream);
      System.setSecurityManager(SECURITY_MANAGER);
    }
  }

  /**
   * Test the map method of Import's KeyValueImporter
   */
  @SuppressWarnings({ "unchecked", "rawtypes" })
  @Test
  public void testKeyValueImporter() throws Exception {
    KeyValueImporter importer = new KeyValueImporter();
    Configuration configuration = new Configuration();
    Context ctx = mock(Context.class);
    when(ctx.getConfiguration()).thenReturn(configuration);
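    // Stub the mapper context so every write() call is intercepted and the emitted
    // key and KeyValue can be verified.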

    doAnswer(new Answer<Void>() {

      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
        KeyValue key = (KeyValue) invocation.getArguments()[1];
        assertEquals("Key", Bytes.toString(writer.get()));
        assertEquals("row", Bytes.toString(key.getRow()));
        return null;
      }
    }).when(ctx).write(any(ImmutableBytesWritable.class), any(KeyValue.class));

    importer.setup(ctx);
    Result value = mock(Result.class);
    KeyValue[] keys = {
        new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
            Bytes.toBytes("value")),
        new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
            Bytes.toBytes("value1")) };
    when(value.rawCells()).thenReturn(keys);
    importer.map(new ImmutableBytesWritable(Bytes.toBytes("Key")), value, ctx);
  }

  /**
   * Test the addFilterAndArguments method of Import. It should set the filter class
   * and the filter arguments in the given Configuration.
   */
  @Test
  public void testAddFilterAndArguments() throws IOException {
    Configuration configuration = new Configuration();

    List<String> args = new ArrayList<String>();
    args.add("param1");
    args.add("param2");

    Import.addFilterAndArguments(configuration, FilterBase.class, args);
    assertEquals("org.apache.hadoop.hbase.filter.FilterBase",
        configuration.get(Import.FILTER_CLASS_CONF_KEY));
    assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY));
  }

  @Test
  public void testDurability() throws IOException, InterruptedException, ClassNotFoundException {
    // Create an export table.
    String exportTableName = "exporttestDurability";
    HTable exportTable = UTIL.createTable(Bytes.toBytes(exportTableName), FAMILYA, 3);

    // Insert some data
    Put put = new Put(ROW1);
    put.add(FAMILYA, QUAL, now, QUAL);
    put.add(FAMILYA, QUAL, now + 1, QUAL);
    put.add(FAMILYA, QUAL, now + 2, QUAL);
    exportTable.put(put);

    put = new Put(ROW2);
    put.add(FAMILYA, QUAL, now, QUAL);
    put.add(FAMILYA, QUAL, now + 1, QUAL);
    put.add(FAMILYA, QUAL, now + 2, QUAL);
    exportTable.put(put);

    // Run the export
    String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000" };
    assertTrue(runExport(args));

    // Create the table for import
    String importTableName = "importTestDurability1";
    HTable importTable = UTIL.createTable(Bytes.toBytes(importTableName), FAMILYA, 3);

    // Register a WAL listener for the import table
    TableWALActionListener walListener = new TableWALActionListener(importTableName);
    HLog hLog = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL();
    hLog.registerWALActionsListener(walListener);

    // Run the import with SKIP_WAL
    args =
        new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(),
            importTableName, FQ_OUTPUT_DIR };
    assertTrue(runImport(args));
    // Assert that the WAL is not visited
    assertFalse(walListener.isWALVisited());
    // Ensure that the count is 2: getCount() scans only the newest version of each
    // column, so two rows yield two cells
    assertEquals(2, getCount(importTable, null));

    // Run the import with the default durability option
    importTableName = "importTestDurability2";
    importTable = UTIL.createTable(Bytes.toBytes(importTableName), FAMILYA, 3);
    hLog.unregisterWALActionsListener(walListener);
    walListener = new TableWALActionListener(importTableName);
    hLog.registerWALActionsListener(walListener);
    args = new String[] { importTableName, FQ_OUTPUT_DIR };
    assertTrue(runImport(args));
    // Assert that the WAL is visited
    assertTrue(walListener.isWALVisited());
    // As above, expect only the newest version of each of the two rows
    assertEquals(2, getCount(importTable, null));
  }

  /**
   * Listens for {@link #visitLogEntryBeforeWrite(HTableDescriptor, HLogKey, WALEdit)} calls
   * to detect whether an entry was written to the Write Ahead Log for the given table.
   */
  private static class TableWALActionListener implements WALActionsListener {

    private String tableName;
    private boolean isVisited = false;

    public TableWALActionListener(String tableName) {
      this.tableName = tableName;
    }

    @Override
    public void preLogRoll(Path oldPath, Path newPath) throws IOException {
      // Not interested in this method.
    }

    @Override
    public void postLogRoll(Path oldPath, Path newPath) throws IOException {
      // Not interested in this method.
    }

    @Override
    public void preLogArchive(Path oldPath, Path newPath) throws IOException {
      // Not interested in this method.
    }

    @Override
    public void postLogArchive(Path oldPath, Path newPath) throws IOException {
      // Not interested in this method.
    }

    @Override
    public void logRollRequested() {
      // Not interested in this method.
    }

    @Override
    public void logCloseRequested() {
      // Not interested in this method.
    }

    @Override
    public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit) {
      // Not interested in this method.
    }

    @Override
    public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) {
      if (tableName.equalsIgnoreCase(htd.getNameAsString())) {
        isVisited = true;
      }
    }

    public boolean isWALVisited() {
      return isVisited;
    }
  }
}