1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.fs;
20
21 import java.io.FileNotFoundException;
22 import java.io.IOException;
23 import java.lang.reflect.Field;
24 import java.lang.reflect.InvocationTargetException;
25 import java.lang.reflect.Method;
26 import java.net.BindException;
27 import java.net.ServerSocket;
28 import java.util.List;
29 import java.util.concurrent.CountDownLatch;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.commons.logging.impl.Log4JLogger;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.fs.BlockLocation;
36 import org.apache.hadoop.fs.FSDataInputStream;
37 import org.apache.hadoop.fs.FSDataOutputStream;
38 import org.apache.hadoop.fs.FileStatus;
39 import org.apache.hadoop.fs.FileSystem;
40 import org.apache.hadoop.fs.Path;
41 import org.apache.hadoop.ipc.RemoteException;
42 import org.apache.hadoop.hbase.HBaseTestingUtility;
43 import org.apache.hadoop.hbase.HConstants;
44 import org.apache.hadoop.hbase.testclassification.LargeTests;
45 import org.apache.hadoop.hbase.MiniHBaseCluster;
46 import org.apache.hadoop.hbase.TableName;
47 import org.apache.hadoop.hbase.client.Put;
48 import org.apache.hadoop.hbase.client.Table;
49 import org.apache.hadoop.hbase.regionserver.HRegion;
50 import org.apache.hadoop.hbase.regionserver.HRegionServer;
51 import org.apache.hadoop.hbase.regionserver.Region;
52 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
53 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
54 import org.apache.hadoop.hbase.util.FSUtils;
55 import org.apache.hadoop.hdfs.DFSClient;
56 import org.apache.hadoop.hdfs.DistributedFileSystem;
57 import org.apache.hadoop.hdfs.MiniDFSCluster;
58 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
59 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
60 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
61 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
62 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
63 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
64 import org.apache.hadoop.hdfs.server.datanode.DataNode;
65 import org.apache.log4j.Level;
66 import org.junit.After;
67 import org.junit.Assert;
68 import org.junit.Before;
69 import org.junit.Test;
70 import org.junit.experimental.categories.Category;
71
72
73
74
75 @Category(LargeTests.class)
76 public class TestBlockReorder {
77 private static final Log LOG = LogFactory.getLog(TestBlockReorder.class);
78
79 static {
80 ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
81 ((Log4JLogger) HFileSystem.LOG).getLogger().setLevel(Level.ALL);
82 }
83
84 private Configuration conf;
85 private MiniDFSCluster cluster;
86 private HBaseTestingUtility htu;
87 private DistributedFileSystem dfs;
88 private static final String host1 = "host1";
89 private static final String host2 = "host2";
90 private static final String host3 = "host3";
91
92 @Before
93 public void setUp() throws Exception {
94 htu = new HBaseTestingUtility();
95 htu.getConfiguration().setInt("dfs.blocksize", 1024);
96 htu.getConfiguration().setBoolean("dfs.support.append", true);
97 htu.getConfiguration().setInt("dfs.replication", 3);
98 htu.startMiniDFSCluster(3,
99 new String[]{"/r1", "/r2", "/r3"}, new String[]{host1, host2, host3});
100
101 conf = htu.getConfiguration();
102 cluster = htu.getDFSCluster();
103 dfs = (DistributedFileSystem) FileSystem.get(conf);
104 }
105
106 @After
107 public void tearDownAfterClass() throws Exception {
108 htu.shutdownMiniCluster();
109 }
110
111
112
113
114 @Test
115 public void testBlockLocationReorder() throws Exception {
116 Path p = new Path("hello");
117
118 Assert.assertTrue((short) cluster.getDataNodes().size() > 1);
119 final int repCount = 2;
120
121
122 FSDataOutputStream fop = dfs.create(p, (short) repCount);
123 final double toWrite = 875.5613;
124 fop.writeDouble(toWrite);
125 fop.close();
126
127
128 long start = System.currentTimeMillis();
129 FSDataInputStream fin = dfs.open(p);
130 Assert.assertTrue(toWrite == fin.readDouble());
131 long end = System.currentTimeMillis();
132 LOG.info("readtime= " + (end - start));
133 fin.close();
134 Assert.assertTrue((end - start) < 30 * 1000);
135
136
137
138 FileStatus f = dfs.getFileStatus(p);
139 BlockLocation[] lbs;
140 do {
141 lbs = dfs.getFileBlockLocations(f, 0, 1);
142 } while (lbs.length != 1 && lbs[0].getLength() != repCount);
143 final String name = lbs[0].getNames()[0];
144 Assert.assertTrue(name.indexOf(':') > 0);
145 String portS = name.substring(name.indexOf(':') + 1);
146 final int port = Integer.parseInt(portS);
147 LOG.info("port= " + port);
148 int ipcPort = -1;
149
150
151
152 boolean ok = false;
153 final String lookup = lbs[0].getHosts()[0];
154 StringBuilder sb = new StringBuilder();
155 for (DataNode dn : cluster.getDataNodes()) {
156 final String dnName = getHostName(dn);
157 sb.append(dnName).append(' ');
158 if (lookup.equals(dnName)) {
159 ok = true;
160 LOG.info("killing datanode " + name + " / " + lookup);
161 ipcPort = dn.ipcServer.getListenerAddress().getPort();
162 dn.shutdown();
163 LOG.info("killed datanode " + name + " / " + lookup);
164 break;
165 }
166 }
167 Assert.assertTrue(
168 "didn't find the server to kill, was looking for " + lookup + " found " + sb, ok);
169 LOG.info("ipc port= " + ipcPort);
170
171
172 Assert.assertTrue(HFileSystem.addLocationsOrderInterceptor(conf,
173 new HFileSystem.ReorderBlocks() {
174 @Override
175 public void reorderBlocks(Configuration c, LocatedBlocks lbs, String src) {
176 for (LocatedBlock lb : lbs.getLocatedBlocks()) {
177 if (lb.getLocations().length > 1) {
178 DatanodeInfo[] infos = lb.getLocations();
179 if (infos[0].getHostName().equals(lookup)) {
180 LOG.info("HFileSystem bad host, inverting");
181 DatanodeInfo tmp = infos[0];
182 infos[0] = infos[1];
183 infos[1] = tmp;
184 }
185 }
186 }
187 }
188 }));
189
190
191 final int retries = 10;
192 ServerSocket ss = null;
193 ServerSocket ssI;
194 try {
195 ss = new ServerSocket(port);
196 ssI = new ServerSocket(ipcPort);
197 } catch (BindException be) {
198 LOG.warn("Got bind exception trying to set up socket on " + port + " or " + ipcPort +
199 ", this means that the datanode has not closed the socket or" +
200 " someone else took it. It may happen, skipping this test for this time.", be);
201 if (ss != null) {
202 ss.close();
203 }
204 return;
205 }
206
207
208
209 for (int i = 0; i < retries; i++) {
210 start = System.currentTimeMillis();
211
212 fin = dfs.open(p);
213 Assert.assertTrue(toWrite == fin.readDouble());
214 fin.close();
215 end = System.currentTimeMillis();
216 LOG.info("HFileSystem readtime= " + (end - start));
217 Assert.assertFalse("We took too much time to read", (end - start) > 60000);
218 }
219
220 ss.close();
221 ssI.close();
222 }
223
224
225
226
227 private String getHostName(DataNode dn) throws InvocationTargetException, IllegalAccessException {
228 Method m;
229 try {
230 m = DataNode.class.getMethod("getDisplayName");
231 } catch (NoSuchMethodException e) {
232 try {
233 m = DataNode.class.getMethod("getHostName");
234 } catch (NoSuchMethodException e1) {
235 throw new RuntimeException(e1);
236 }
237 }
238
239 String res = (String) m.invoke(dn);
240 if (res.contains(":")) {
241 return res.split(":")[0];
242 } else {
243 return res;
244 }
245 }
246
247
248
249
250 @Test()
251 public void testHBaseCluster() throws Exception {
252 byte[] sb = "sb".getBytes();
253 htu.startMiniZKCluster();
254
255 MiniHBaseCluster hbm = htu.startMiniHBaseCluster(1, 1);
256 hbm.waitForActiveAndReadyMaster();
257 hbm.getRegionServer(0).waitForServerOnline();
258 HRegionServer targetRs = hbm.getRegionServer(0);
259
260
261
262 String host4 = targetRs.getServerName().getHostname();
263 LOG.info("Starting a new datanode with the name=" + host4);
264 cluster.startDataNodes(conf, 1, true, null, new String[]{"/r4"}, new String[]{host4}, null);
265 cluster.waitClusterUp();
266
267 final int repCount = 3;
268
269
270 conf = targetRs.getConfiguration();
271 HFileSystem rfs = (HFileSystem) targetRs.getFileSystem();
272 Table h = htu.createTable(TableName.valueOf("table"), sb);
273
274
275
276
277
278
279
280 String rootDir = new Path(FSUtils.getRootDir(conf) + "/" + HConstants.HREGION_LOGDIR_NAME +
281 "/" + targetRs.getServerName().toString()).toUri().getPath();
282
283 DistributedFileSystem mdfs = (DistributedFileSystem)
284 hbm.getMaster().getMasterFileSystem().getFileSystem();
285
286
287 int nbTest = 0;
288 while (nbTest < 10) {
289 final List<Region> regions = targetRs.getOnlineRegions(h.getName());
290 final CountDownLatch latch = new CountDownLatch(regions.size());
291
292 final WALActionsListener listener = new WALActionsListener.Base() {
293 @Override
294 public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
295 latch.countDown();
296 }
297 };
298 for (Region region : regions) {
299 ((HRegion)region).getWAL().registerWALActionsListener(listener);
300 }
301
302 htu.getHBaseAdmin().rollWALWriter(targetRs.getServerName());
303
304
305 try {
306 latch.await();
307 } catch (InterruptedException exception) {
308 LOG.warn("Interrupted while waiting for the wal of '" + targetRs + "' to roll. If later " +
309 "tests fail, it's probably because we should still be waiting.");
310 Thread.currentThread().interrupt();
311 }
312 for (Region region : regions) {
313 ((HRegion)region).getWAL().unregisterWALActionsListener(listener);
314 }
315
316
317 Thread.sleep(100);
318
319
320 Put p = new Put(sb);
321 p.add(sb, sb, sb);
322 h.put(p);
323
324 DirectoryListing dl = dfs.getClient().listPaths(rootDir, HdfsFileStatus.EMPTY_NAME);
325 HdfsFileStatus[] hfs = dl.getPartialListing();
326
327
328 Assert.assertTrue(hfs.length >= 1);
329 for (HdfsFileStatus hf : hfs) {
330
331 try {
332 LOG.info("Log file found: " + hf.getLocalName() + " in " + rootDir);
333 String logFile = rootDir + "/" + hf.getLocalName();
334 FileStatus fsLog = rfs.getFileStatus(new Path(logFile));
335
336 LOG.info("Checking log file: " + logFile);
337
338
339
340
341 BlockLocation[] bls = rfs.getFileBlockLocations(fsLog, 0, 1);
342 if (bls.length > 0) {
343 BlockLocation bl = bls[0];
344
345 LOG.info(bl.getHosts().length + " replicas for block 0 in " + logFile + " ");
346 for (int i = 0; i < bl.getHosts().length - 1; i++) {
347 LOG.info(bl.getHosts()[i] + " " + logFile);
348 Assert.assertNotSame(bl.getHosts()[i], host4);
349 }
350 String last = bl.getHosts()[bl.getHosts().length - 1];
351 LOG.info(last + " " + logFile);
352 if (host4.equals(last)) {
353 nbTest++;
354 LOG.info(logFile + " is on the new datanode and is ok");
355 if (bl.getHosts().length == 3) {
356
357
358 testFromDFS(dfs, logFile, repCount, host4);
359
360
361 testFromDFS(mdfs, logFile, repCount, host4);
362 }
363 }
364 }
365 } catch (FileNotFoundException exception) {
366 LOG.debug("Failed to find log file '" + hf.getLocalName() + "'; it probably was " +
367 "archived out from under us so we'll ignore and retry. If this test hangs " +
368 "indefinitely you should treat this failure as a symptom.", exception);
369 } catch (RemoteException exception) {
370 if (exception.unwrapRemoteException() instanceof FileNotFoundException) {
371 LOG.debug("Failed to find log file '" + hf.getLocalName() + "'; it probably was " +
372 "archived out from under us so we'll ignore and retry. If this test hangs " +
373 "indefinitely you should treat this failure as a symptom.", exception);
374 } else {
375 throw exception;
376 }
377 }
378 }
379 }
380 }
381
382 private void testFromDFS(DistributedFileSystem dfs, String src, int repCount, String localhost)
383 throws Exception {
384
385 for (int i = 0; i < 10; i++) {
386 LocatedBlocks l;
387
388 final long max = System.currentTimeMillis() + 10000;
389 boolean done;
390 do {
391 Assert.assertTrue("Can't get enouth replica.", System.currentTimeMillis() < max);
392 l = getNamenode(dfs.getClient()).getBlockLocations(src, 0, 1);
393 Assert.assertNotNull("Can't get block locations for " + src, l);
394 Assert.assertNotNull(l.getLocatedBlocks());
395 Assert.assertTrue(l.getLocatedBlocks().size() > 0);
396
397 done = true;
398 for (int y = 0; y < l.getLocatedBlocks().size() && done; y++) {
399 done = (l.get(y).getLocations().length == repCount);
400 }
401 } while (!done);
402
403 for (int y = 0; y < l.getLocatedBlocks().size() && done; y++) {
404 Assert.assertEquals(localhost, l.get(y).getLocations()[repCount - 1].getHostName());
405 }
406 }
407 }
408
409 private static ClientProtocol getNamenode(DFSClient dfsc) throws Exception {
410 Field nf = DFSClient.class.getDeclaredField("namenode");
411 nf.setAccessible(true);
412 return (ClientProtocol) nf.get(dfsc);
413 }
414
415
416
417
418 @Test
419 public void testBlockLocation() throws Exception {
420
421 htu.startMiniZKCluster();
422 MiniHBaseCluster hbm = htu.startMiniHBaseCluster(1, 1);
423 conf = hbm.getConfiguration();
424
425
426
427 final String fileName = "/helloWorld";
428 Path p = new Path(fileName);
429
430 final int repCount = 3;
431 Assert.assertTrue((short) cluster.getDataNodes().size() >= repCount);
432
433
434 FSDataOutputStream fop = dfs.create(p, (short) repCount);
435 final double toWrite = 875.5613;
436 fop.writeDouble(toWrite);
437 fop.close();
438
439 for (int i=0; i<10; i++){
440
441 LocatedBlocks l;
442 final long max = System.currentTimeMillis() + 10000;
443 do {
444 l = getNamenode(dfs.getClient()).getBlockLocations(fileName, 0, 1);
445 Assert.assertNotNull(l.getLocatedBlocks());
446 Assert.assertEquals(l.getLocatedBlocks().size(), 1);
447 Assert.assertTrue("Expecting " + repCount + " , got " + l.get(0).getLocations().length,
448 System.currentTimeMillis() < max);
449 } while (l.get(0).getLocations().length != repCount);
450
451
452 Object originalList[] = l.getLocatedBlocks().toArray();
453 HFileSystem.ReorderWALBlocks lrb = new HFileSystem.ReorderWALBlocks();
454 lrb.reorderBlocks(conf, l, fileName);
455 Assert.assertArrayEquals(originalList, l.getLocatedBlocks().toArray());
456
457
458 Assert.assertNotNull(conf.get(HConstants.HBASE_DIR));
459 Assert.assertFalse(conf.get(HConstants.HBASE_DIR).isEmpty());
460 String pseudoLogFile = conf.get(HConstants.HBASE_DIR) + "/" +
461 HConstants.HREGION_LOGDIR_NAME + "/" + host1 + ",6977,6576" + "/mylogfile";
462
463
464 Assert.assertNotNull("log= " + pseudoLogFile,
465 DefaultWALProvider.getServerNameFromWALDirectoryName(dfs.getConf(), pseudoLogFile));
466
467
468 lrb.reorderBlocks(conf, l, pseudoLogFile);
469 Assert.assertEquals(host1, l.get(0).getLocations()[2].getHostName());
470
471
472 lrb.reorderBlocks(conf, l, pseudoLogFile);
473 Assert.assertEquals(host1, l.get(0).getLocations()[2].getHostName());
474 }
475 }
476
477 }