
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.fs;


import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.net.ServerSocket;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Tests for the HDFS block-location reordering hook introduced in HBASE-6435.
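 *
 * <p>The hook lets HBase rewrite the replica order returned by the namenode, for example to
 * avoid reading a WAL from the datanode that sat next to a dead region server. A minimal
 * sketch of registering an interceptor (the callback body here is illustrative):
 * <pre>
 * HFileSystem.addLocationsOrderInterceptor(conf, new HFileSystem.ReorderBlocks() {
 *   public void reorderBlocks(Configuration c, LocatedBlocks lbs, String src) {
 *     // move the preferred replica to the front of each block's location list
 *   }
 * });
 * </pre>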
 */
@Category(LargeTests.class)
public class TestBlockReorder {
  private static final Log LOG = LogFactory.getLog(TestBlockReorder.class);

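  // Raise DFSClient and HFileSystem log levels so block-location handling shows up in the
  // test output.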
  static {
    ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger) HFileSystem.LOG).getLogger().setLevel(Level.ALL);
  }

  private Configuration conf;
  private MiniDFSCluster cluster;
  private HBaseTestingUtility htu;
  private DistributedFileSystem dfs;
  private static final String host1 = "host1";
  private static final String host2 = "host2";
  private static final String host3 = "host3";

  @Before
  public void setUp() throws Exception {
    htu = new HBaseTestingUtility();
    htu.getConfiguration().setInt("dfs.block.size", 1024); // For the test with multiple blocks.
    htu.getConfiguration().setBoolean("dfs.support.append", true);
    htu.getConfiguration().setInt("dfs.replication", 3);
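    // Three datanodes on three distinct racks, with fixed hostnames the tests can target.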
    htu.startMiniDFSCluster(3,
        new String[]{"/r1", "/r2", "/r3"}, new String[]{host1, host2, host3});

    conf = htu.getConfiguration();
    cluster = htu.getDFSCluster();
    dfs = (DistributedFileSystem) FileSystem.get(conf);
  }

  @After
  public void tearDown() throws Exception {
    htu.shutdownMiniCluster();
  }

  /**
   * Tests that we can add a hook, and that the hook is used when we read a file from HDFS.
   */
  @Test
  public void testBlockLocationReorder() throws Exception {
    Path p = new Path("hello");

    Assert.assertTrue(cluster.getDataNodes().size() > 1);
    final int repCount = 2;

    // Let's write the file.
    FSDataOutputStream fop = dfs.create(p, (short) repCount);
    final double toWrite = 875.5613;
    fop.writeDouble(toWrite);
    fop.close();

    // Let's check we can read it while everybody's there.
    long start = System.currentTimeMillis();
    FSDataInputStream fin = dfs.open(p);
    Assert.assertTrue(toWrite == fin.readDouble());
    long end = System.currentTimeMillis();
    LOG.info("readtime= " + (end - start));
    fin.close();
    Assert.assertTrue((end - start) < 30 * 1000);

    // Let's kill the first location. The first location returned can change between calls,
    // so start by getting the location, then the port.
    FileStatus f = dfs.getFileStatus(p);
    BlockLocation[] lbs;
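    // Wait until the namenode reports both replicas of the single block.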
    do {
      lbs = dfs.getFileBlockLocations(f, 0, 1);
    } while (lbs.length != 1 || lbs[0].getHosts().length != repCount);
    final String name = lbs[0].getNames()[0];
    Assert.assertTrue(name.indexOf(':') > 0);
    String portS = name.substring(name.indexOf(':') + 1);
    final int port = Integer.parseInt(portS);
    LOG.info("port= " + port);
    int ipcPort = -1;

    // Let's find the DN to kill. cluster.getDataNodes(int) is not indexed by port, so we
    // need to iterate ourselves.
    boolean ok = false;
    final String lookup = lbs[0].getHosts()[0];
    StringBuilder sb = new StringBuilder();
    for (DataNode dn : cluster.getDataNodes()) {
      final String dnName = getHostName(dn);
      sb.append(dnName).append(' ');
      if (lookup.equals(dnName)) {
        ok = true;
        LOG.info("killing datanode " + name + " / " + lookup);
        ipcPort = dn.ipcServer.getListenerAddress().getPort();
        dn.shutdown();
        LOG.info("killed datanode " + name + " / " + lookup);
        break;
      }
    }
    Assert.assertTrue(
        "didn't find the server to kill, was looking for " + lookup + " found " + sb, ok);
    LOG.info("ipc port= " + ipcPort);
    // Add a hook whose implementation ensures that the datanode we've just killed is never
    // returned as the first location.
    Assert.assertTrue(HFileSystem.addLocationsOrderInterceptor(conf,
        new HFileSystem.ReorderBlocks() {
          @Override
          public void reorderBlocks(Configuration c, LocatedBlocks lbs, String src) {
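            // The killed datanode must never be tried first: swap it with the second
            // location whenever it leads the list.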
            for (LocatedBlock lb : lbs.getLocatedBlocks()) {
              if (lb.getLocations().length > 1) {
                if (lb.getLocations()[0].getHostName().equals(lookup)) {
                  LOG.info("HFileSystem bad host, inverting");
                  DatanodeInfo tmp = lb.getLocations()[0];
                  lb.getLocations()[0] = lb.getLocations()[1];
                  lb.getLocations()[1] = tmp;
                }
              }
            }
          }
        }));


    final int retries = 10;
    ServerSocket ss = null;
    ServerSocket ssI;
    try {
      ss = new ServerSocket(port); // We grab the dead datanode's data port ourselves.
      ssI = new ServerSocket(ipcPort);
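      // With both ports bound by this test, a client that still tries the dead datanode
      // will connect successfully but then hang until its read timeout fires.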
    } catch (BindException be) {
      LOG.warn("Got bind exception trying to set up socket on " + port + " or " + ipcPort +
          ", this means that the datanode has not closed the socket or" +
          " someone else took it. This can happen; skipping the test this time.", be);
      if (ss != null) {
        ss.close();
      }
      return;
    }

    // Without the reorder, the read would hit the dead datanode and time out. Since it does
    // not always pick the same replica, we try 'retries' times; with the reorder each read
    // finishes in a few milliseconds.
    for (int i = 0; i < retries; i++) {
      start = System.currentTimeMillis();

      fin = dfs.open(p);
      Assert.assertTrue(toWrite == fin.readDouble());
      fin.close();
      end = System.currentTimeMillis();
      LOG.info("HFileSystem readtime= " + (end - start));
      Assert.assertFalse("We took too much time to read", (end - start) > 60000);
    }

    ss.close();
    ssI.close();
  }

  /**
   * Gets the hostname of a datanode, via getDisplayName (Hadoop 2) or getHostName (Hadoop 1).
   */
  private String getHostName(DataNode dn) throws InvocationTargetException, IllegalAccessException {
    Method m;
    try {
      m = DataNode.class.getMethod("getDisplayName");
    } catch (NoSuchMethodException e) {
      try {
        m = DataNode.class.getMethod("getHostName");
      } catch (NoSuchMethodException e1) {
        throw new RuntimeException(e1);
      }
    }

    String res = (String) m.invoke(dn);
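    // getDisplayName() may return "host:port"; keep only the host part.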
232     if (res.contains(":")) {
233       return res.split(":")[0];
234     } else {
235       return res;
236     }
237   }
238 
  /**
   * Tests that the hook works within HBase, including when there are multiple blocks.
   */
  @Test
  public void testHBaseCluster() throws Exception {
    byte[] sb = "sb".getBytes();
    htu.startMiniZKCluster();

    MiniHBaseCluster hbm = htu.startMiniHBaseCluster(1, 1);
    hbm.waitForActiveAndReadyMaster();
    hbm.getRegionServer(0).waitForServerOnline();

    // We want a datanode with the same name as the region server, so we get the region
    // server's name and start a new datanode under it.
    String host4 = hbm.getRegionServer(0).getServerName().getHostname();
    LOG.info("Starting a new datanode with the name=" + host4);
    cluster.startDataNodes(conf, 1, true, null, new String[]{"/r4"}, new String[]{host4}, null);
    cluster.waitClusterUp();

    final int repCount = 3;
    HRegionServer targetRs = hbm.getRegionServer(0);

    // We use the region server's file system & conf, as we expect them to carry the hook.
    conf = targetRs.getConfiguration();
    HFileSystem rfs = (HFileSystem) targetRs.getFileSystem();
    HTable h = htu.createTable("table".getBytes(), sb);

    // We now have 4 datanodes and a replication count of 3, so we don't know whether the
    // datanode with the matching name will be used. We can't easily stop an existing
    // datanode either, as that tends to trip nasty HDFS bugs. So we try multiple times.

    // Now we need to find the log file, its locations, and look at it.

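    // WALs live under {hbase.rootdir}/{HConstants.HREGION_LOGDIR_NAME}/{server name}.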
    String rootDir = new Path(FSUtils.getRootDir(conf) + "/" + HConstants.HREGION_LOGDIR_NAME +
            "/" + targetRs.getServerName().toString()).toUri().getPath();

    DistributedFileSystem mdfs = (DistributedFileSystem)
        hbm.getMaster().getMasterFileSystem().getFileSystem();


    int nbTest = 0;
    while (nbTest < 10) {
      htu.getHBaseAdmin().rollHLogWriter(targetRs.getServerName().toString());

      // We need a sleep here, as the namenode is informed asynchronously.
      Thread.sleep(100);

      // Insert one put to ensure a minimal size.
      Put p = new Put(sb);
      p.add(sb, sb, sb);
      h.put(p);

      DirectoryListing dl = dfs.getClient().listPaths(rootDir, HdfsFileStatus.EMPTY_NAME);
      HdfsFileStatus[] hfs = dl.getPartialListing();

      // As we wrote a put, we should have at least one log file.
      Assert.assertTrue(hfs.length >= 1);
      for (HdfsFileStatus hf : hfs) {
        LOG.info("Log file found: " + hf.getLocalName() + " in " + rootDir);
        String logFile = rootDir + "/" + hf.getLocalName();
        FileStatus fsLog = rfs.getFileStatus(new Path(logFile));

        LOG.info("Checking log file: " + logFile);
        // Now check that the hook is up and running. We can't call getBlockLocations
        // directly, as it's not available on HFileSystem, and we try multiple times to be
        // sure, as the order is random.

        BlockLocation[] bls = rfs.getFileBlockLocations(fsLog, 0, 1);
        if (bls.length > 0) {
          BlockLocation bl = bls[0];

          LOG.info(bl.getHosts().length + " replicas for block 0 in " + logFile);
          for (int i = 0; i < bl.getHosts().length - 1; i++) {
            LOG.info(bl.getHosts()[i] + "    " + logFile);
            Assert.assertFalse("the datanode colocated with the region server must come last",
                host4.equals(bl.getHosts()[i]));
          }
          String last = bl.getHosts()[bl.getHosts().length - 1];
          LOG.info(last + "    " + logFile);
          if (host4.equals(last)) {
            nbTest++;
            LOG.info(logFile + " is on the new datanode and is ok");
            if (bl.getHosts().length == 3) {
              // We can test this case from the file system as well.
              // Check the underlying file system, multiple times as the order is random.
              testFromDFS(dfs, logFile, repCount, host4);

              // Now from the master.
              testFromDFS(mdfs, logFile, repCount, host4);
            }
          }
        }
      }
    }
  }

  private void testFromDFS(DistributedFileSystem dfs, String src, int repCount, String localhost)
      throws Exception {
    // Multiple times, as the order is random.
    for (int i = 0; i < 10; i++) {
      LocatedBlocks l;
      // The NN gets the block list asynchronously, so we may need several tries to get it all.
      final long max = System.currentTimeMillis() + 10000;
      boolean done;
      do {
        Assert.assertTrue("Can't get enough replicas.", System.currentTimeMillis() < max);
        l = getNamenode(dfs.getClient()).getBlockLocations(src, 0, 1);
        Assert.assertNotNull("Can't get block locations for " + src, l);
        Assert.assertNotNull(l.getLocatedBlocks());
        Assert.assertTrue(l.getLocatedBlocks().size() > 0);

        done = true;
        for (int y = 0; y < l.getLocatedBlocks().size() && done; y++) {
          done = (l.get(y).getLocations().length == repCount);
        }
      } while (!done);
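      // The reorder hook pushes the region server's local replica to the end of the list,
      // so it must appear in the last position for every block.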

      for (int y = 0; y < l.getLocatedBlocks().size(); y++) {
        Assert.assertEquals(localhost, l.get(y).getLocations()[repCount - 1].getHostName());
      }
    }
  }

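  // DFSClient keeps its namenode proxy private, so we reach it via reflection.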
  private static ClientProtocol getNamenode(DFSClient dfsc) throws Exception {
    Field nf = DFSClient.class.getDeclaredField("namenode");
    nf.setAccessible(true);
    return (ClientProtocol) nf.get(dfsc);
  }

  /**
   * Tests that the reorder algorithm works as we expect.
   */
  @Test
  public void testBlockLocation() throws Exception {
    // We need to start HBase to get HConstants.HBASE_DIR set in conf.
    htu.startMiniZKCluster();
    MiniHBaseCluster hbm = htu.startMiniHBaseCluster(1, 1);
    conf = hbm.getConfiguration();


    // The leading "/" is mandatory; without it we get a NullPointerException on the namenode.
    final String fileName = "/helloWorld";
    Path p = new Path(fileName);

    final int repCount = 3;
    Assert.assertTrue(cluster.getDataNodes().size() >= repCount);

    // Let's write the file.
    FSDataOutputStream fop = dfs.create(p, (short) repCount);
    final double toWrite = 875.5613;
    fop.writeDouble(toWrite);
    fop.close();

391     for (int i=0; i<10; i++){
392       // The interceptor is not set in this test, so we get the raw list at this point
393       LocatedBlocks l;
394       final long max = System.currentTimeMillis() + 10000;
395       do {
396         l = getNamenode(dfs.getClient()).getBlockLocations(fileName, 0, 1);
397         Assert.assertNotNull(l.getLocatedBlocks());
398         Assert.assertEquals(l.getLocatedBlocks().size(), 1);
399         Assert.assertTrue("Expecting " + repCount + " , got " + l.get(0).getLocations().length,
400             System.currentTimeMillis() < max);
401       } while (l.get(0).getLocations().length != repCount);
402 
403       // Should be filtered, the name is different => The order won't change
404       Object originalList[] = l.getLocatedBlocks().toArray();
405       HFileSystem.ReorderWALBlocks lrb = new HFileSystem.ReorderWALBlocks();
406       lrb.reorderBlocks(conf, l, fileName);
407       Assert.assertArrayEquals(originalList, l.getLocatedBlocks().toArray());
408 
409       // Should be reordered, as we pretend to be a file name with a compliant stuff
410       Assert.assertNotNull(conf.get(HConstants.HBASE_DIR));
411       Assert.assertFalse(conf.get(HConstants.HBASE_DIR).isEmpty());
412       String pseudoLogFile = conf.get(HConstants.HBASE_DIR) + "/" +
413           HConstants.HREGION_LOGDIR_NAME + "/" + host1 + ",6977,6576" + "/mylogfile";
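      // ",6977,6576" mimics the ",port,startcode" suffix of a ServerName-encoded directory.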

      // Check that it will be possible to extract a ServerName from our construction.
      Assert.assertNotNull("log= " + pseudoLogFile,
          HLogUtil.getServerNameFromHLogDirectoryName(dfs.getConf(), pseudoLogFile));

      // And check we're doing the right reorder.
      lrb.reorderBlocks(conf, l, pseudoLogFile);
      Assert.assertEquals(host1, l.get(0).getLocations()[2].getHostName());

      // Check again; the order should remain the same.
      lrb.reorderBlocks(conf, l, pseudoLogFile);
      Assert.assertEquals(host1, l.get(0).getLocations()[2].getHostName());
    }
  }

}