/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */

19 package org.apache.hadoop.hbase.master;
20
21 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete;
22 import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_final_transition_failed;
23 import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_preempt_task;
24 import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_acquired;
25 import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_done;
26 import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_err;
27 import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_resigned;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
33
34 import java.io.IOException;
35 import java.util.ArrayList;
36 import java.util.Arrays;
37 import java.util.HashSet;
38 import java.util.Iterator;
39 import java.util.LinkedList;
40 import java.util.List;
41 import java.util.NavigableSet;
42 import java.util.Set;
43 import java.util.TreeSet;
44 import java.util.concurrent.ExecutorService;
45 import java.util.concurrent.Executors;
46 import java.util.concurrent.Future;
47 import java.util.concurrent.TimeUnit;
48 import java.util.concurrent.TimeoutException;
49 import java.util.concurrent.atomic.AtomicLong;
50
51 import org.apache.commons.logging.Log;
52 import org.apache.commons.logging.LogFactory;
53 import org.apache.hadoop.conf.Configuration;
54 import org.apache.hadoop.fs.FSDataOutputStream;
55 import org.apache.hadoop.fs.FileStatus;
56 import org.apache.hadoop.fs.FileSystem;
57 import org.apache.hadoop.fs.Path;
58 import org.apache.hadoop.hbase.HColumnDescriptor;
59 import org.apache.hadoop.hbase.TableName;
60 import org.apache.hadoop.hbase.HBaseConfiguration;
61 import org.apache.hadoop.hbase.HBaseTestingUtility;
62 import org.apache.hadoop.hbase.HConstants;
63 import org.apache.hadoop.hbase.HRegionInfo;
64 import org.apache.hadoop.hbase.HTableDescriptor;
65 import org.apache.hadoop.hbase.KeyValue;
66 import org.apache.hadoop.hbase.LargeTests;
67 import org.apache.hadoop.hbase.MiniHBaseCluster;
68 import org.apache.hadoop.hbase.NamespaceDescriptor;
69 import org.apache.hadoop.hbase.ServerName;
70 import org.apache.hadoop.hbase.SplitLogCounters;
71 import org.apache.hadoop.hbase.Waiter;
72 import org.apache.hadoop.hbase.client.Delete;
73 import org.apache.hadoop.hbase.client.Get;
74 import org.apache.hadoop.hbase.client.HConnectionManager;
75 import org.apache.hadoop.hbase.client.HTable;
76 import org.apache.hadoop.hbase.client.Increment;
77 import org.apache.hadoop.hbase.client.NonceGenerator;
78 import org.apache.hadoop.hbase.client.PerClientRandomNonceGenerator;
79 import org.apache.hadoop.hbase.client.Put;
80 import org.apache.hadoop.hbase.client.Result;
81 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
82 import org.apache.hadoop.hbase.exceptions.OperationConflictException;
83 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
84 import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
85 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
86 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
87 import org.apache.hadoop.hbase.regionserver.HRegion;
88 import org.apache.hadoop.hbase.regionserver.HRegionServer;
89 import org.apache.hadoop.hbase.regionserver.wal.HLog;
90 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
91 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
92 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
93 import org.apache.hadoop.hbase.util.Bytes;
94 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
95 import org.apache.hadoop.hbase.util.FSUtils;
96 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
97 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
98 import org.apache.hadoop.hbase.util.Threads;
99 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
100 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
101 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
102 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
103 import org.apache.hadoop.hdfs.MiniDFSCluster;
104 import org.apache.log4j.Level;
105 import org.apache.log4j.Logger;
106 import org.apache.zookeeper.KeeperException;
107 import org.junit.After;
108 import org.junit.AfterClass;
109 import org.junit.Assert;
110 import org.junit.Before;
111 import org.junit.BeforeClass;
112 import org.junit.Test;
113 import org.junit.experimental.categories.Category;
114
@Category(LargeTests.class)
public class TestDistributedLogSplitting {
  private static final Log LOG = LogFactory.getLog(TestDistributedLogSplitting.class);
  static {
    Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);

    // Short-circuit reads can be flaky against the mini DFS cluster used here,
    // so they are disabled for these tests.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
  }
126
127
128 static final int NUM_MASTERS = 2;
129 static final int NUM_RS = 6;
130
131 MiniHBaseCluster cluster;
132 HMaster master;
133 Configuration conf;
134 static Configuration originalConf;
135 static HBaseTestingUtility TEST_UTIL;
136 static MiniDFSCluster dfsCluster;
137 static MiniZooKeeperCluster zkCluster;
138
139 @BeforeClass
140 public static void setup() throws Exception {
141 TEST_UTIL = new HBaseTestingUtility(HBaseConfiguration.create());
142 dfsCluster = TEST_UTIL.startMiniDFSCluster(1);
143 zkCluster = TEST_UTIL.startMiniZKCluster();
144 originalConf = TEST_UTIL.getConfiguration();
145 }
146
147 @AfterClass
148 public static void tearDown() throws IOException {
149 TEST_UTIL.shutdownMiniZKCluster();
150 TEST_UTIL.shutdownMiniDFSCluster();
151 }
152
  private void startCluster(int num_rs) throws Exception {
    SplitLogCounters.resetCounters();
    LOG.info("Starting cluster");
    // Disable split-task resubmission so a failed split surfaces quickly.
    conf.setLong("hbase.splitlog.max.resubmit", 0);
    // Make the failure tests faster.
    conf.setInt("zookeeper.recovery.retry", 0);
    conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
    conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0);
    conf.setInt("hbase.regionserver.wal.max.splitters", 3);
    conf.setInt("hfile.format.version", 3);
    TEST_UTIL = new HBaseTestingUtility(conf);
    TEST_UTIL.setDFSCluster(dfsCluster);
    TEST_UTIL.setZkCluster(zkCluster);
    TEST_UTIL.startMiniHBaseCluster(NUM_MASTERS, num_rs);
    cluster = TEST_UTIL.getHBaseCluster();
    LOG.info("Waiting for active/ready master");
    cluster.waitForActiveAndReadyMaster();
    master = cluster.getMaster();
    while (cluster.getLiveRegionServerThreads().size() < num_rs) {
      Threads.sleep(1);
    }
  }
175
176 @Before
177 public void before() throws Exception {
178
179 conf = HBaseConfiguration.create(originalConf);
180 }
181
182 @After
183 public void after() throws Exception {
184 try {
185 if (TEST_UTIL.getHBaseCluster() != null) {
186 for (MasterThread mt : TEST_UTIL.getHBaseCluster().getLiveMasterThreads()) {
187 mt.getMaster().abort("closing...", null);
188 }
189 }
190 TEST_UTIL.shutdownMiniHBaseCluster();
191 } finally {
192 TEST_UTIL.getTestFileSystem().delete(FSUtils.getRootDir(TEST_UTIL.getConfiguration()), true);
193 ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase");
194 }
195 }
196
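  /**
   * Runs classic distributed log splitting (DISTRIBUTED_LOG_REPLAY disabled): writes
   * 1000 edits to one region server's WAL, splits that server's log directory through
   * the SplitLogManager, and verifies the same number of edits shows up in the
   * per-region recovered.edits files.
   */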
197 @Test (timeout=300000)
198 public void testRecoveredEdits() throws Exception {
199 LOG.info("testRecoveredEdits");
200 conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024);
201 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
202 startCluster(NUM_RS);
203
204 final int NUM_LOG_LINES = 1000;
205 final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
206
207
208 master.balanceSwitch(false);
209 FileSystem fs = master.getMasterFileSystem().getFileSystem();
210
211 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
212
213 Path rootdir = FSUtils.getRootDir(conf);
214
215 installTable(new ZooKeeperWatcher(conf, "table-creation", null),
216 "table", "family", 40);
217 TableName table = TableName.valueOf("table");
218 List<HRegionInfo> regions = null;
219 HRegionServer hrs = null;
220 for (int i = 0; i < NUM_RS; i++) {
221 boolean foundRs = false;
222 hrs = rsts.get(i).getRegionServer();
223 regions = ProtobufUtil.getOnlineRegions(hrs);
224 for (HRegionInfo region : regions) {
225 if (region.getTable().getNameAsString().equalsIgnoreCase("table")) {
226 foundRs = true;
227 break;
228 }
229 }
230 if (foundRs) break;
231 }
232 final Path logDir = new Path(rootdir, HLogUtil.getHLogDirectoryName(hrs
233 .getServerName().toString()));
234
235 LOG.info("#regions = " + regions.size());
236 Iterator<HRegionInfo> it = regions.iterator();
237 while (it.hasNext()) {
238 HRegionInfo region = it.next();
239 if (region.getTable().getNamespaceAsString()
240 .equals(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)) {
241 it.remove();
242 }
243 }
244 makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
245
246 slm.splitLogDistributed(logDir);
247
248 int count = 0;
249 for (HRegionInfo hri : regions) {
250
251 Path tdir = FSUtils.getTableDir(rootdir, table);
252 @SuppressWarnings("deprecation")
253 Path editsdir =
254 HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
255 LOG.debug("checking edits dir " + editsdir);
256 FileStatus[] files = fs.listStatus(editsdir);
257 assertTrue(files.length > 1);
258 for (int i = 0; i < files.length; i++) {
259 int c = countHLog(files[i].getPath(), fs, conf);
260 count += c;
261 }
262 LOG.info(count + " edits in " + files.length + " recovered edits files.");
263 }
264 assertEquals(NUM_LOG_LINES, count);
265 }
266
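  /**
   * With distributed log replay enabled, aborts a region server that is not carrying
   * hbase:meta and verifies that all WAL edits are replayed and readable after recovery.
   */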
267 @Test(timeout = 300000)
268 public void testLogReplayWithNonMetaRSDown() throws Exception {
269 LOG.info("testLogReplayWithNonMetaRSDown");
270 conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024);
271 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
272 startCluster(NUM_RS);
273 final int NUM_REGIONS_TO_CREATE = 40;
274 final int NUM_LOG_LINES = 1000;
275
276
277 master.balanceSwitch(false);
278
279 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
280 HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
281
282 HRegionServer hrs = findRSToKill(false, "table");
283 List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
284 makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
285
286
287 this.abortRSAndVerifyRecovery(hrs, ht, zkw, NUM_REGIONS_TO_CREATE, NUM_LOG_LINES);
288 ht.close();
289 zkw.close();
290 }
291
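  /**
   * Nonce generator that remembers every nonce it hands out; after startDups() is
   * called it replays the recorded nonces so that requests carry duplicate nonces.
   */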
292 private static class NonceGeneratorWithDups extends PerClientRandomNonceGenerator {
293 private boolean isDups = false;
294 private LinkedList<Long> nonces = new LinkedList<Long>();
295
296 public void startDups() {
297 isDups = true;
298 }
299
300 @Override
301 public long newNonce() {
302 long nonce = isDups ? nonces.removeFirst() : super.newNonce();
303 if (!isDups) {
304 nonces.add(nonce);
305 }
306 return nonce;
307 }
308 }
309
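  /**
   * Verifies that nonces survive recovery: issues one increment per user region, aborts
   * a region server, then re-sends the same increments with duplicate nonces and expects
   * each of them to be rejected with OperationConflictException.
   */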
310 @Test(timeout = 300000)
311 public void testNonceRecovery() throws Exception {
312 LOG.info("testNonceRecovery");
313 final String TABLE_NAME = "table";
314 final String FAMILY_NAME = "family";
315 final int NUM_REGIONS_TO_CREATE = 40;
316
317 conf.setLong("hbase.regionserver.hlog.blocksize", 100*1024);
318 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
319 startCluster(NUM_RS);
320 master.balanceSwitch(false);
321
322 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
323 HTable ht = installTable(zkw, TABLE_NAME, FAMILY_NAME, NUM_REGIONS_TO_CREATE);
324 NonceGeneratorWithDups ng = new NonceGeneratorWithDups();
325 NonceGenerator oldNg =
326 HConnectionManager.injectNonceGeneratorForTesting(ht.getConnection(), ng);
327
328 try {
329 List<Increment> reqs = new ArrayList<Increment>();
330 for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
331 HRegionServer hrs = rst.getRegionServer();
332 List<HRegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs);
333 for (HRegionInfo hri : hris) {
334 if (TABLE_NAME.equalsIgnoreCase(hri.getTable().getNameAsString())) {
335 byte[] key = hri.getStartKey();
336 if (key == null || key.length == 0) {
337 key = Bytes.copy(hri.getEndKey());
338 --(key[key.length - 1]);
339 }
340 Increment incr = new Increment(key);
341 incr.addColumn(Bytes.toBytes(FAMILY_NAME), Bytes.toBytes("q"), 1);
342 ht.increment(incr);
343 reqs.add(incr);
344 }
345 }
346 }
347
348 HRegionServer hrs = findRSToKill(false, "table");
349 abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
350 ng.startDups();
351 for (Increment incr : reqs) {
352 try {
353 ht.increment(incr);
354 fail("should have thrown");
355 } catch (OperationConflictException ope) {
356 LOG.debug("Caught as expected: " + ope.getMessage());
357 }
358 }
359 } finally {
360 HConnectionManager.injectNonceGeneratorForTesting(ht.getConnection(), oldNg);
361 ht.close();
362 zkw.close();
363 }
364 }
365
366 @Test(timeout = 300000)
367 public void testLogReplayWithMetaRSDown() throws Exception {
368 LOG.info("testRecoveredEditsReplayWithMetaRSDown");
369 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
370 startCluster(NUM_RS);
371 final int NUM_REGIONS_TO_CREATE = 40;
372 final int NUM_LOG_LINES = 1000;
373
374
375 master.balanceSwitch(false);
376
377 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
378 HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
379
380 HRegionServer hrs = findRSToKill(true, "table");
381 List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
382 makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
383
384 this.abortRSAndVerifyRecovery(hrs, ht, zkw, NUM_REGIONS_TO_CREATE, NUM_LOG_LINES);
385 ht.close();
386 zkw.close();
387 }
388
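  /**
   * Aborts the given region server, waits for recovery to finish, and asserts the
   * expected number of rows is readable afterwards.
   */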
389 private void abortRSAndVerifyRecovery(HRegionServer hrs, HTable ht, final ZooKeeperWatcher zkw,
390 final int numRegions, final int numofLines) throws Exception {
391
392 abortRSAndWaitForRecovery(hrs, zkw, numRegions);
393 assertEquals(numofLines, TEST_UTIL.countRows(ht));
394 }
395
  private void abortRSAndWaitForRecovery(HRegionServer hrs, final ZooKeeperWatcher zkw,
      final int numRegions) throws Exception {
    final MiniHBaseCluster tmpCluster = this.cluster;

    // Abort the region server carrying the test regions.
    LOG.info("Aborting region server: " + hrs.getServerName());
    hrs.abort("testing");

    // Wait for the aborted region server to actually go down.
    TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return (tmpCluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
      }
    });

    // Wait until the expected number of regions is back online across the cluster.
    TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return (getAllOnlineRegions(tmpCluster).size() >= (numRegions + 1));
      }
    });

    // Wait until no regions are left in the recovering state in ZK.
    TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
            zkw.recoveringRegionsZNode, false);
        return (recoveringRegions != null && recoveringRegions.size() == 0);
      }
    });
  }
430
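  /**
   * Aborts the active master and a region server with distributed log replay disabled,
   * then restarts the master and verifies it completes the pending log splitting work:
   * all regions come back online and every edit is readable.
   */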
431 @Test(timeout = 300000)
432 public void testMasterStartsUpWithLogSplittingWork() throws Exception {
433 LOG.info("testMasterStartsUpWithLogSplittingWork");
434 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
435 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
436 startCluster(NUM_RS);
437
438 final int NUM_REGIONS_TO_CREATE = 40;
439 final int NUM_LOG_LINES = 1000;
440
441
442 master.balanceSwitch(false);
443
444 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
445 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
446 HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
447
448 HRegionServer hrs = findRSToKill(false, "table");
449 List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
450 makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
451
452
453 abortMaster(cluster);
454
455
456 LOG.info("Aborting region server: " + hrs.getServerName());
457 hrs.abort("testing");
458
459
460 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
461 @Override
462 public boolean evaluate() throws Exception {
463 return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
464 }
465 });
466
467 Thread.sleep(2000);
468 LOG.info("Current Open Regions:" + getAllOnlineRegions(cluster).size());
469
470 startMasterAndWaitUntilLogSplit(cluster);
471
472
473 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
474 @Override
475 public boolean evaluate() throws Exception {
476 return (getAllOnlineRegions(cluster).size() >= (NUM_REGIONS_TO_CREATE + 1));
477 }
478 });
479
480 LOG.info("Current Open Regions After Master Node Starts Up:"
481 + getAllOnlineRegions(cluster).size());
482
483 assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
484
485 ht.close();
486 zkw.close();
487 }
488
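  /**
   * Like testMasterStartsUpWithLogSplittingWork, but with distributed log replay
   * enabled: after the master restarts, the recovering-regions state in ZK must drain
   * and every edit must be readable.
   */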
489 @Test(timeout = 300000)
490 public void testMasterStartsUpWithLogReplayWork() throws Exception {
491 LOG.info("testMasterStartsUpWithLogReplayWork");
492 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
493 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
494 startCluster(NUM_RS);
495
496 final int NUM_REGIONS_TO_CREATE = 40;
497 final int NUM_LOG_LINES = 1000;
498
499
500 master.balanceSwitch(false);
501
502 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
503 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
504 HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
505
506 HRegionServer hrs = findRSToKill(false, "table");
507 List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
508 makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
509
510
511 abortMaster(cluster);
512
513
514 LOG.info("Aborting region server: " + hrs.getServerName());
515 hrs.abort("testing");
516
517
518 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
519 @Override
520 public boolean evaluate() throws Exception {
521 return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
522 }
523 });
524
525 Thread.sleep(2000);
526 LOG.info("Current Open Regions:" + getAllOnlineRegions(cluster).size());
527
528 startMasterAndWaitUntilLogSplit(cluster);
529
530
531 TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
532 @Override
533 public boolean evaluate() throws Exception {
534 List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
535 zkw.recoveringRegionsZNode, false);
536 return (recoveringRegions != null && recoveringRegions.size() == 0);
537 }
538 });
539
540 LOG.info("Current Open Regions After Master Node Starts Up:"
541 + getAllOnlineRegions(cluster).size());
542
543 assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
544
545 ht.close();
546 zkw.close();
547 }
548
549
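  /**
   * Aborts one region server, waits for its regions to come back online, then aborts a
   * second one, and verifies that every edit is still readable after both recoveries.
   */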
550 @Test(timeout = 300000)
551 public void testLogReplayTwoSequentialRSDown() throws Exception {
552 LOG.info("testRecoveredEditsReplayTwoSequentialRSDown");
553 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
554 startCluster(NUM_RS);
555 final int NUM_REGIONS_TO_CREATE = 40;
556 final int NUM_LOG_LINES = 1000;
557
558
559 master.balanceSwitch(false);
560
561 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
562 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
563 HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
564
565 List<HRegionInfo> regions = null;
566 HRegionServer hrs1 = findRSToKill(false, "table");
567 regions = ProtobufUtil.getOnlineRegions(hrs1);
568
569 makeHLog(hrs1.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
570
571
572 LOG.info("Aborting region server: " + hrs1.getServerName());
573 hrs1.abort("testing");
574
575
576 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
577 @Override
578 public boolean evaluate() throws Exception {
579 return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
580 }
581 });
582
583
584 TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
585 @Override
586 public boolean evaluate() throws Exception {
587 return (getAllOnlineRegions(cluster).size() >= (NUM_REGIONS_TO_CREATE + 1));
588 }
589 });
590
591
592 Thread.sleep(300);
593
594 rsts = cluster.getLiveRegionServerThreads();
595 HRegionServer hrs2 = rsts.get(0).getRegionServer();
596 LOG.info("Aborting one more region server: " + hrs2.getServerName());
597 hrs2.abort("testing");
598
599
600 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
601 @Override
602 public boolean evaluate() throws Exception {
603 return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 2));
604 }
605 });
606
607
608 TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
609 @Override
610 public boolean evaluate() throws Exception {
611 return (getAllOnlineRegions(cluster).size() >= (NUM_REGIONS_TO_CREATE + 1));
612 }
613 });
614
615
616 TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
617 @Override
618 public boolean evaluate() throws Exception {
619 List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
620 zkw.recoveringRegionsZNode, false);
621 return (recoveringRegions != null && recoveringRegions.size() == 0);
622 }
623 });
624
625 assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
626 ht.close();
627 zkw.close();
628 }
629
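  /**
   * Marks the same region as recovering on behalf of two failed servers, checks that
   * both entries appear under the region's recovering-regions znode, and then waits for
   * the hosting region server to clear its in-memory recovering set.
   */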
630 @Test(timeout = 300000)
631 public void testMarkRegionsRecoveringInZK() throws Exception {
632 LOG.info("testMarkRegionsRecoveringInZK");
633 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
634 startCluster(NUM_RS);
635 master.balanceSwitch(false);
636 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
637 final ZooKeeperWatcher zkw = master.getZooKeeperWatcher();
638 HTable ht = installTable(zkw, "table", "family", 40);
639 final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
640
641 Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
642 HRegionInfo region = null;
643 HRegionServer hrs = null;
644 ServerName firstFailedServer = null;
645 ServerName secondFailedServer = null;
646 for (int i = 0; i < NUM_RS; i++) {
647 hrs = rsts.get(i).getRegionServer();
648 List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
649 if (regions.isEmpty()) continue;
650 region = regions.get(0);
651 regionSet.add(region);
652 firstFailedServer = hrs.getServerName();
653 secondFailedServer = rsts.get((i + 1) % NUM_RS).getRegionServer().getServerName();
654 break;
655 }
656
657 slm.markRegionsRecoveringInZK(firstFailedServer, regionSet);
658 slm.markRegionsRecoveringInZK(secondFailedServer, regionSet);
659
660 List<String> recoveringRegions = ZKUtil.listChildrenNoWatch(zkw,
661 ZKUtil.joinZNode(zkw.recoveringRegionsZNode, region.getEncodedName()));
662
    assertEquals(2, recoveringRegions.size());
664
665
666 final HRegionServer tmphrs = hrs;
667 TEST_UTIL.waitFor(60000, 1000, new Waiter.Predicate<Exception>() {
668 @Override
669 public boolean evaluate() throws Exception {
670 return (tmphrs.getRecoveringRegions().size() == 0);
671 }
672 });
673 ht.close();
674 zkw.close();
675 }
676
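  /**
   * Loads a table through the normal client path, aborts the hosting region server, and
   * verifies the table checksum is unchanged once the regions are reopened via
   * distributed log replay.
   */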
677 @Test(timeout = 300000)
678 public void testReplayCmd() throws Exception {
679 LOG.info("testReplayCmd");
680 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
681 startCluster(NUM_RS);
682 final int NUM_REGIONS_TO_CREATE = 40;
683
684
685 master.balanceSwitch(false);
686
687 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
688 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
689 HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
690
691 List<HRegionInfo> regions = null;
692 HRegionServer hrs = null;
693 for (int i = 0; i < NUM_RS; i++) {
694 boolean isCarryingMeta = false;
695 hrs = rsts.get(i).getRegionServer();
696 regions = ProtobufUtil.getOnlineRegions(hrs);
697 for (HRegionInfo region : regions) {
698 if (region.isMetaRegion()) {
699 isCarryingMeta = true;
700 break;
701 }
702 }
703 if (isCarryingMeta) {
704 continue;
705 }
706 if (regions.size() > 0) break;
707 }
708
709 this.prepareData(ht, Bytes.toBytes("family"), Bytes.toBytes("c1"));
710 String originalCheckSum = TEST_UTIL.checksumRows(ht);
711
712
713 abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
714
715 assertEquals("Data should remain after reopening of regions", originalCheckSum,
716 TEST_UTIL.checksumRows(ht));
717
718 ht.close();
719 zkw.close();
720 }
721
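  /**
   * Writes WAL edits for an enabled table and for a table that is subsequently disabled,
   * aborts the hosting region server, and verifies the disabled table's edits land in
   * recovered.edits files while the enabled table's edits are replayed into the table.
   */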
722 @Test(timeout = 300000)
723 public void testLogReplayForDisablingTable() throws Exception {
724 LOG.info("testLogReplayForDisablingTable");
725 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
726 startCluster(NUM_RS);
727 final int NUM_REGIONS_TO_CREATE = 40;
728 final int NUM_LOG_LINES = 1000;
729
730 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
731 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
732 HTable disablingHT = installTable(zkw, "disableTable", "family", NUM_REGIONS_TO_CREATE);
733 HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE, NUM_REGIONS_TO_CREATE);
734
735
736
737 master.balanceSwitch(false);
738
739 List<HRegionInfo> regions = null;
740 HRegionServer hrs = null;
741 boolean hasRegionsForBothTables = false;
742 String tableName = null;
743 for (int i = 0; i < NUM_RS; i++) {
744 tableName = null;
745 hasRegionsForBothTables = false;
746 boolean isCarryingSystem = false;
747 hrs = rsts.get(i).getRegionServer();
748 regions = ProtobufUtil.getOnlineRegions(hrs);
749 for (HRegionInfo region : regions) {
750 if (region.getTable().isSystemTable()) {
751 isCarryingSystem = true;
752 break;
753 }
754 if (tableName != null &&
755 !tableName.equalsIgnoreCase(region.getTable().getNameAsString())) {
756
757 hasRegionsForBothTables = true;
758 break;
759 } else if (tableName == null) {
760 tableName = region.getTable().getNameAsString();
761 }
762 }
763 if (isCarryingSystem) {
764 continue;
765 }
766 if (hasRegionsForBothTables) {
767 break;
768 }
769 }
770
771
772 Assert.assertTrue(hasRegionsForBothTables);
773
774 LOG.info("#regions = " + regions.size());
775 Iterator<HRegionInfo> it = regions.iterator();
776 while (it.hasNext()) {
777 HRegionInfo region = it.next();
778 if (region.isMetaTable()) {
779 it.remove();
780 }
781 }
782 makeHLog(hrs.getWAL(), regions, "disableTable", "family", NUM_LOG_LINES, 100, false);
783 makeHLog(hrs.getWAL(), regions, "table", "family", NUM_LOG_LINES, 100);
784
785 LOG.info("Disabling table\n");
786 TEST_UTIL.getHBaseAdmin().disableTable(Bytes.toBytes("disableTable"));
787
788
789 LOG.info("Aborting region server: " + hrs.getServerName());
790 hrs.abort("testing");
791
792
793 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
794 @Override
795 public boolean evaluate() throws Exception {
796 return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
797 }
798 });
799
800
801 TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
802 @Override
803 public boolean evaluate() throws Exception {
804 return (getAllOnlineRegions(cluster).size() >= (NUM_REGIONS_TO_CREATE + 1));
805 }
806 });
807
808
809 TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
810 @Override
811 public boolean evaluate() throws Exception {
812 List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
813 zkw.recoveringRegionsZNode, false);
814 ServerManager serverManager = master.getServerManager();
815 return (!serverManager.areDeadServersInProgress() &&
816 recoveringRegions != null && recoveringRegions.size() == 0);
817 }
818 });
819
820 int count = 0;
821 FileSystem fs = master.getMasterFileSystem().getFileSystem();
822 Path rootdir = FSUtils.getRootDir(conf);
823 Path tdir = FSUtils.getTableDir(rootdir, TableName.valueOf("disableTable"));
824 for (HRegionInfo hri : regions) {
825 @SuppressWarnings("deprecation")
826 Path editsdir =
827 HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
828 LOG.debug("checking edits dir " + editsdir);
829 if(!fs.exists(editsdir)) continue;
830 FileStatus[] files = fs.listStatus(editsdir);
831 if(files != null) {
832 for(FileStatus file : files) {
833 int c = countHLog(file.getPath(), fs, conf);
834 count += c;
835 LOG.info(c + " edits in " + file.getPath());
836 }
837 }
838 }
839
840 LOG.info("Verify edits in recovered.edits files");
841 assertEquals(NUM_LOG_LINES, count);
842 LOG.info("Verify replayed edits");
843 assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
844
845
846 for (HRegionInfo hri : regions) {
847 @SuppressWarnings("deprecation")
848 Path editsdir =
849 HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
850 fs.delete(editsdir, true);
851 }
852 disablingHT.close();
853 ht.close();
854 zkw.close();
855 }
856
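  /**
   * With DISALLOW_WRITES_IN_RECOVERING set, marks a region as recovering, moves it to
   * another server, and verifies that a client put fails with RegionInRecoveryException
   * while the region is still recovering.
   */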
857 @Test(timeout = 300000)
858 public void testDisallowWritesInRecovering() throws Exception {
859 LOG.info("testDisallowWritesInRecovering");
860 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
861 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
862 conf.setBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING, true);
863 startCluster(NUM_RS);
864 final int NUM_REGIONS_TO_CREATE = 40;
865
866
867 master.balanceSwitch(false);
868
869 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
870 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
871 HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
872 final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
873
874 Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
875 HRegionInfo region = null;
876 HRegionServer hrs = null;
877 HRegionServer dstRS = null;
878 for (int i = 0; i < NUM_RS; i++) {
879 hrs = rsts.get(i).getRegionServer();
880 List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
881 if (regions.isEmpty()) continue;
882 region = regions.get(0);
883 regionSet.add(region);
884 dstRS = rsts.get((i+1) % NUM_RS).getRegionServer();
885 break;
886 }
887
888 slm.markRegionsRecoveringInZK(hrs.getServerName(), regionSet);
889
890 final HRegionInfo hri = region;
891 final HRegionServer tmpRS = dstRS;
892 TEST_UTIL.getHBaseAdmin().move(region.getEncodedNameAsBytes(),
893 Bytes.toBytes(dstRS.getServerName().getServerName()));
894
895 final RegionStates regionStates =
896 TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
897 TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
898 @Override
899 public boolean evaluate() throws Exception {
900 ServerName sn = regionStates.getRegionServerOfRegion(hri);
901 return (sn != null && sn.equals(tmpRS.getServerName()));
902 }
903 });
904
905 try {
906 byte[] key = region.getStartKey();
907 if (key == null || key.length == 0) {
908 key = new byte[] { 0, 0, 0, 0, 1 };
909 }
910 ht.setAutoFlush(true, true);
911 Put put = new Put(key);
912 put.add(Bytes.toBytes("family"), Bytes.toBytes("c1"), new byte[]{'b'});
      ht.put(put);
      fail("The put should not succeed while the region is marked as recovering");
    } catch (IOException ioe) {
916 Assert.assertTrue(ioe instanceof RetriesExhaustedWithDetailsException);
917 RetriesExhaustedWithDetailsException re = (RetriesExhaustedWithDetailsException) ioe;
918 boolean foundRegionInRecoveryException = false;
919 for (Throwable t : re.getCauses()) {
920 if (t instanceof RegionInRecoveryException) {
921 foundRegionInRecoveryException = true;
922 break;
923 }
924 }
925 Assert.assertTrue(
926 "No RegionInRecoveryException. Following exceptions returned=" + re.getCauses(),
927 foundRegionInRecoveryException);
928 }
929
    ht.close();
    zkw.close();
931 }
932
933
934
935
936
937
938
939
940
941
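  /**
   * Aborts a region server while it is working on a split-log task and verifies that at
   * least one of the worker counters (resigned, err, final_transition_failed, done,
   * preempt) moves, i.e. the task does not simply hang.
   */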
942 @Test (timeout=300000)
943 public void testWorkerAbort() throws Exception {
944 LOG.info("testWorkerAbort");
945 startCluster(3);
946 final int NUM_LOG_LINES = 10000;
947 final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
948 FileSystem fs = master.getMasterFileSystem().getFileSystem();
949
950 final List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
951 HRegionServer hrs = findRSToKill(false, "table");
952 Path rootdir = FSUtils.getRootDir(conf);
953 final Path logDir = new Path(rootdir,
954 HLogUtil.getHLogDirectoryName(hrs.getServerName().toString()));
955
956 installTable(new ZooKeeperWatcher(conf, "table-creation", null),
957 "table", "family", 40);
958
959 makeHLog(hrs.getWAL(), ProtobufUtil.getOnlineRegions(hrs), "table", "family", NUM_LOG_LINES,
960 100);
961
    // Once a worker has picked up a split task, abort one of the region servers so the
    // split-log worker counters are exercised.
    new Thread() {
      @Override
      public void run() {
        waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
        for (RegionServerThread rst : rsts) {
          rst.getRegionServer().abort("testing");
          break;
        }
      }
    }.start();
971
972 FileStatus[] logfiles = fs.listStatus(logDir);
973 TaskBatch batch = new TaskBatch();
974 slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch);
975
976 long curt = System.currentTimeMillis();
977 long waitTime = 80000;
978 long endt = curt + waitTime;
979 while (curt < endt) {
980 if ((tot_wkr_task_resigned.get() + tot_wkr_task_err.get() +
981 tot_wkr_final_transition_failed.get() + tot_wkr_task_done.get() +
982 tot_wkr_preempt_task.get()) == 0) {
983 Thread.yield();
984 curt = System.currentTimeMillis();
985 } else {
986 assertTrue(1 <= (tot_wkr_task_resigned.get() + tot_wkr_task_err.get() +
987 tot_wkr_final_transition_failed.get() + tot_wkr_task_done.get() +
988 tot_wkr_preempt_task.get()));
989 return;
990 }
991 }
992 fail("none of the following counters went up in " + waitTime +
993 " milliseconds - " +
994 "tot_wkr_task_resigned, tot_wkr_task_err, " +
995 "tot_wkr_final_transition_failed, tot_wkr_task_done, " +
996 "tot_wkr_preempt_task");
997 }
998
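  /**
   * Aborts three region servers at once and verifies that all regions come back online
   * and every row written beforehand is readable after recovery.
   */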
999 @Test (timeout=300000)
1000 public void testThreeRSAbort() throws Exception {
1001 LOG.info("testThreeRSAbort");
1002 final int NUM_REGIONS_TO_CREATE = 40;
1003 final int NUM_ROWS_PER_REGION = 100;
1004
1005 startCluster(NUM_RS);
1006
1007 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf,
1008 "distributed log splitting test", null);
1009
1010 HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
1011 populateDataInTable(NUM_ROWS_PER_REGION, "family");
1012
1013
1014 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1015 assertEquals(NUM_RS, rsts.size());
1016 rsts.get(0).getRegionServer().abort("testing");
1017 rsts.get(1).getRegionServer().abort("testing");
1018 rsts.get(2).getRegionServer().abort("testing");
1019
    // Wait for all three aborted region servers to go down.
    long start = EnvironmentEdgeManager.currentTimeMillis();
    while (cluster.getLiveRegionServerThreads().size() > (NUM_RS - 3)) {
      if (EnvironmentEdgeManager.currentTimeMillis() - start > 60000) {
        fail("Timed out waiting for the aborted region servers to go down");
      }
      Thread.sleep(200);
    }

    // Wait for all regions (including meta) to come back online.
    start = EnvironmentEdgeManager.currentTimeMillis();
    while (getAllOnlineRegions(cluster).size() < (NUM_REGIONS_TO_CREATE + 1)) {
      if (EnvironmentEdgeManager.currentTimeMillis() - start > 60000) {
        fail("Timed out waiting for regions to come back online");
      }
      Thread.sleep(200);
    }
1035
1036
1037 TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
1038 @Override
1039 public boolean evaluate() throws Exception {
1040 List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
1041 zkw.recoveringRegionsZNode, false);
1042 return (recoveringRegions != null && recoveringRegions.size() == 0);
1043 }
1044 });
1045
1046 assertEquals(NUM_REGIONS_TO_CREATE * NUM_ROWS_PER_REGION,
1047 TEST_UTIL.countRows(ht));
1048 ht.close();
1049 zkw.close();
1050 }
1051
1052
1053
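  /**
   * Forces a split-log task to fail on a corrupted log file while ZK deletes are
   * suppressed, and verifies that a second split attempt blocks on the stale task until
   * the calling thread is interrupted.
   */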
1054 @Test(timeout=30000)
1055 public void testDelayedDeleteOnFailure() throws Exception {
1056 LOG.info("testDelayedDeleteOnFailure");
1057 startCluster(1);
1058 final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
1059 final FileSystem fs = master.getMasterFileSystem().getFileSystem();
1060 final Path logDir = new Path(FSUtils.getRootDir(conf), "x");
1061 fs.mkdirs(logDir);
1062 ExecutorService executor = null;
1063 try {
1064 final Path corruptedLogFile = new Path(logDir, "x");
1065 FSDataOutputStream out;
1066 out = fs.create(corruptedLogFile);
1067 out.write(0);
1068 out.write(Bytes.toBytes("corrupted bytes"));
1069 out.close();
1070 slm.ignoreZKDeleteForTesting = true;
1071 executor = Executors.newSingleThreadExecutor();
      Runnable runnable = new Runnable() {
        @Override
        public void run() {
          try {
            // The first split attempt is expected to fail with an IOException because
            // the only log file in the directory is corrupted.
            slm.splitLogDistributed(logDir);
          } catch (IOException ioe) {
            try {
              assertTrue(fs.exists(corruptedLogFile));
              // With ignoreZKDeleteForTesting set, the stale task is never removed from
              // ZK, so this second attempt blocks until the executor interrupts the thread.
              slm.splitLogDistributed(logDir);
            } catch (IOException e) {
              assertTrue(Thread.currentThread().isInterrupted());
              return;
            }
            fail("did not get the expected IOException from the 2nd call");
          }
          fail("did not get the expected IOException from the 1st call");
        }
      };
1096 Future<?> result = executor.submit(runnable);
1097 try {
1098 result.get(2000, TimeUnit.MILLISECONDS);
1099 } catch (TimeoutException te) {
1100
1101 }
1102 waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000);
1103 executor.shutdownNow();
1104 executor = null;
1105
1106
1107 result.get();
1108 } finally {
1109 if (executor != null) {
1110
1111
1112 executor.shutdownNow();
1113 }
1114 fs.delete(logDir, true);
1115 }
1116 }
1117
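  /**
   * Marks hbase:meta (and some user regions) for log replay, confirms the meta region
   * shows up under the recovering-regions znode, then splits the meta log and confirms
   * the meta entry is removed again.
   */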
1118 @Test(timeout = 300000)
1119 public void testMetaRecoveryInZK() throws Exception {
1120 LOG.info("testMetaRecoveryInZK");
1121 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
1122 startCluster(NUM_RS);
1123
1124
1125
1126 master.balanceSwitch(false);
1127 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
1128 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1129
1130
1131 HRegionServer hrs = findRSToKill(true, null);
1132 List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs);
1133
1134 LOG.info("#regions = " + regions.size());
1135 Set<HRegionInfo> tmpRegions = new HashSet<HRegionInfo>();
1136 tmpRegions.add(HRegionInfo.FIRST_META_REGIONINFO);
1137 master.getMasterFileSystem().prepareLogReplay(hrs.getServerName(), tmpRegions);
1138 Set<HRegionInfo> userRegionSet = new HashSet<HRegionInfo>();
1139 userRegionSet.addAll(regions);
1140 master.getMasterFileSystem().prepareLogReplay(hrs.getServerName(), userRegionSet);
1141 boolean isMetaRegionInRecovery = false;
1142 List<String> recoveringRegions =
1143 zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false);
1144 for (String curEncodedRegionName : recoveringRegions) {
1145 if (curEncodedRegionName.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
1146 isMetaRegionInRecovery = true;
1147 break;
1148 }
1149 }
1150 assertTrue(isMetaRegionInRecovery);
1151
1152 master.getMasterFileSystem().splitMetaLog(hrs.getServerName());
1153
1154 isMetaRegionInRecovery = false;
1155 recoveringRegions =
1156 zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false);
1157 for (String curEncodedRegionName : recoveringRegions) {
1158 if (curEncodedRegionName.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
1159 isMetaRegionInRecovery = true;
1160 break;
1161 }
1162 }
1163
1164 assertFalse(isMetaRegionInRecovery);
1165 zkw.close();
1166 }
1167
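  /**
   * Appends many updates of the same cell (same row, column and timestamp) directly to a
   * region server's WAL, aborts the server, and verifies that the last written value is
   * the one visible both before and after a flush.
   */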
1168 @Test(timeout = 300000)
1169 public void testSameVersionUpdatesRecovery() throws Exception {
1170 LOG.info("testSameVersionUpdatesRecovery");
1171 conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
1172 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
1173 startCluster(NUM_RS);
1174 final AtomicLong sequenceId = new AtomicLong(100);
1175 final int NUM_REGIONS_TO_CREATE = 40;
1176 final int NUM_LOG_LINES = 1000;
1177
1178
1179 master.balanceSwitch(false);
1180
1181 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1182 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
1183 HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
1184
1185 List<HRegionInfo> regions = null;
1186 HRegionServer hrs = null;
1187 for (int i = 0; i < NUM_RS; i++) {
1188 boolean isCarryingMeta = false;
1189 hrs = rsts.get(i).getRegionServer();
1190 regions = ProtobufUtil.getOnlineRegions(hrs);
1191 for (HRegionInfo region : regions) {
1192 if (region.isMetaRegion()) {
1193 isCarryingMeta = true;
1194 break;
1195 }
1196 }
1197 if (isCarryingMeta) {
1198 continue;
1199 }
1200 break;
1201 }
1202
1203 LOG.info("#regions = " + regions.size());
1204 Iterator<HRegionInfo> it = regions.iterator();
1205 while (it.hasNext()) {
1206 HRegionInfo region = it.next();
1207 if (region.isMetaTable()
1208 || region.getEncodedName().equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
1209 it.remove();
1210 }
1211 }
1212 if (regions.size() == 0) return;
1213 HRegionInfo curRegionInfo = regions.get(0);
1214 byte[] startRow = curRegionInfo.getStartKey();
1215 if (startRow == null || startRow.length == 0) {
1216 startRow = new byte[] { 0, 0, 0, 0, 1 };
1217 }
1218 byte[] row = Bytes.incrementBytes(startRow, 1);
1219
1220 row = Arrays.copyOfRange(row, 3, 8);
1221 long value = 0;
1222 byte[] tableName = Bytes.toBytes("table");
1223 byte[] family = Bytes.toBytes("family");
1224 byte[] qualifier = Bytes.toBytes("c1");
1225 long timeStamp = System.currentTimeMillis();
1226 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
1227 htd.addFamily(new HColumnDescriptor(family));
1228 for (int i = 0; i < NUM_LOG_LINES; i += 1) {
1229 WALEdit e = new WALEdit();
1230 value++;
1231 e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
1232 hrs.getWAL().append(curRegionInfo, TableName.valueOf(tableName), e,
1233 System.currentTimeMillis(), htd, sequenceId);
1234 }
1235 hrs.getWAL().sync();
1236 hrs.getWAL().close();
1237
1238
1239 this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
1240
1241
1242 LOG.info("Verification Starts...");
1243 Get g = new Get(row);
1244 Result r = ht.get(g);
1245 long theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
1246 assertEquals(value, theStoredVal);
1247
1248
1249 LOG.info("Verification after flush...");
1250 TEST_UTIL.getHBaseAdmin().flush(tableName);
1251 r = ht.get(g);
1252 theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
1253 assertEquals(value, theStoredVal);
1254 ht.close();
1255 }
1256
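  /**
   * Same-version update recovery as in testSameVersionUpdatesRecovery, but with a small
   * memstore flush size and a low compaction threshold so the verification is repeated
   * after a flush and a compaction.
   */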
1257 @Test(timeout = 300000)
1258 public void testSameVersionUpdatesRecoveryWithCompaction() throws Exception {
1259 LOG.info("testSameVersionUpdatesRecoveryWithWrites");
1260 conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
1261 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
1262 conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 30 * 1024);
1263 conf.setInt("hbase.hstore.compactionThreshold", 3);
1264 startCluster(NUM_RS);
1265 final AtomicLong sequenceId = new AtomicLong(100);
1266 final int NUM_REGIONS_TO_CREATE = 40;
1267 final int NUM_LOG_LINES = 1000;
1268
1269
1270 master.balanceSwitch(false);
1271
1272 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1273 final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
1274 HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
1275
1276 List<HRegionInfo> regions = null;
1277 HRegionServer hrs = null;
1278 for (int i = 0; i < NUM_RS; i++) {
1279 boolean isCarryingMeta = false;
1280 hrs = rsts.get(i).getRegionServer();
1281 regions = ProtobufUtil.getOnlineRegions(hrs);
1282 for (HRegionInfo region : regions) {
1283 if (region.isMetaRegion()) {
1284 isCarryingMeta = true;
1285 break;
1286 }
1287 }
1288 if (isCarryingMeta) {
1289 continue;
1290 }
1291 break;
1292 }
1293
1294 LOG.info("#regions = " + regions.size());
1295 Iterator<HRegionInfo> it = regions.iterator();
1296 while (it.hasNext()) {
1297 HRegionInfo region = it.next();
1298 if (region.isMetaTable()
1299 || region.getEncodedName().equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
1300 it.remove();
1301 }
1302 }
1303 if (regions.size() == 0) return;
1304 HRegionInfo curRegionInfo = regions.get(0);
1305 byte[] startRow = curRegionInfo.getStartKey();
1306 if (startRow == null || startRow.length == 0) {
1307 startRow = new byte[] { 0, 0, 0, 0, 1 };
1308 }
1309 byte[] row = Bytes.incrementBytes(startRow, 1);
1310
1311 row = Arrays.copyOfRange(row, 3, 8);
1312 long value = 0;
1313 final byte[] tableName = Bytes.toBytes("table");
1314 byte[] family = Bytes.toBytes("family");
1315 byte[] qualifier = Bytes.toBytes("c1");
1316 long timeStamp = System.currentTimeMillis();
1317 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
1318 htd.addFamily(new HColumnDescriptor(family));
1319 for (int i = 0; i < NUM_LOG_LINES; i += 1) {
1320 WALEdit e = new WALEdit();
1321 value++;
1322 e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
1323 hrs.getWAL().append(curRegionInfo, TableName.valueOf(tableName), e,
1324 System.currentTimeMillis(), htd, sequenceId);
1325 }
1326 hrs.getWAL().sync();
1327 hrs.getWAL().close();
1328
1329
1330 this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
1331
1332
1333 LOG.info("Verification Starts...");
1334 Get g = new Get(row);
1335 Result r = ht.get(g);
1336 long theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
1337 assertEquals(value, theStoredVal);
1338
1339
1340 LOG.info("Verification after flush...");
1341 TEST_UTIL.getHBaseAdmin().flush(tableName);
1342 TEST_UTIL.getHBaseAdmin().compact(tableName);
1343
1344
1345 TEST_UTIL.waitFor(30000, 200, new Waiter.Predicate<Exception>() {
1346 @Override
1347 public boolean evaluate() throws Exception {
1348 return (TEST_UTIL.getHBaseAdmin().getCompactionState(tableName) == CompactionState.NONE);
1349 }
1350 });
1351
1352 r = ht.get(g);
1353 theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
1354 assertEquals(value, theStoredVal);
1355 ht.close();
1356 }
1357
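  /**
   * Creates a table with the requested number of regions, cycles it through
   * disable/enable so assignments settle, and returns an HTable handle once the expected
   * number of regions is online.
   */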
1358 HTable installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs) throws Exception {
1359 return installTable(zkw, tname, fname, nrs, 0);
1360 }
1361
1362 HTable installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs,
1363 int existingRegions) throws Exception {
1364
1365 byte [] table = Bytes.toBytes(tname);
1366 byte [] family = Bytes.toBytes(fname);
1367 LOG.info("Creating table with " + nrs + " regions");
1368 HTable ht = TEST_UTIL.createTable(table, family);
1369 int numRegions = TEST_UTIL.createMultiRegions(conf, ht, family, nrs);
1370 assertEquals(nrs, numRegions);
1371 LOG.info("Waiting for no more RIT\n");
1372 blockUntilNoRIT(zkw, master);
1373
1374
1375 LOG.debug("Disabling table\n");
1376 TEST_UTIL.getHBaseAdmin().disableTable(table);
1377 LOG.debug("Waiting for no more RIT\n");
1378 blockUntilNoRIT(zkw, master);
1379 NavigableSet<String> regions = getAllOnlineRegions(cluster);
1380 LOG.debug("Verifying only catalog and namespace regions are assigned\n");
    if (regions.size() != 2 + existingRegions) {
      for (String oregion : regions)
        LOG.debug("Region still online: " + oregion);
    }
1385 assertEquals(2 + existingRegions, regions.size());
1386 LOG.debug("Enabling table\n");
1387 TEST_UTIL.getHBaseAdmin().enableTable(table);
1388 LOG.debug("Waiting for no more RIT\n");
1389 blockUntilNoRIT(zkw, master);
1390 LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n");
1391 regions = getAllOnlineRegions(cluster);
1392 assertEquals(numRegions + 2 + existingRegions, regions.size());
1393 return ht;
1394 }
1395
1396 void populateDataInTable(int nrows, String fname) throws Exception {
1397 byte [] family = Bytes.toBytes(fname);
1398
1399 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1400 assertEquals(NUM_RS, rsts.size());
1401
1402 for (RegionServerThread rst : rsts) {
1403 HRegionServer hrs = rst.getRegionServer();
1404 List<HRegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs);
1405 for (HRegionInfo hri : hris) {
1406 if (hri.getTable().isSystemTable()) {
1407 continue;
1408 }
1409 LOG.debug("adding data to rs = " + rst.getName() +
1410 " region = "+ hri.getRegionNameAsString());
1411 HRegion region = hrs.getOnlineRegion(hri.getRegionName());
1412 assertTrue(region != null);
1413 putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), family);
1414 }
1415 }
1416 }
1417
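  /**
   * Appends num_edits WAL entries of edit_size bytes each, spread round-robin across the
   * given regions of the named table, optionally closing the log afterwards.
   */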
1418 public void makeHLog(HLog log, List<HRegionInfo> regions, String tname, String fname,
1419 int num_edits, int edit_size) throws IOException {
1420 makeHLog(log, regions, tname, fname, num_edits, edit_size, true);
1421 }
1422
1423 public void makeHLog(HLog log, List<HRegionInfo> regions, String tname, String fname,
1424 int num_edits, int edit_size, boolean closeLog) throws IOException {
1425 TableName fullTName = TableName.valueOf(tname);
1426
1427 regions.remove(HRegionInfo.FIRST_META_REGIONINFO);
1428
1429 final AtomicLong sequenceId = new AtomicLong(10);
1430
1431
1432 for(Iterator<HRegionInfo> iter = regions.iterator(); iter.hasNext(); ) {
1433 HRegionInfo regionInfo = iter.next();
1434 if(regionInfo.getTable().isSystemTable()) {
1435 iter.remove();
1436 }
1437 }
1438 HTableDescriptor htd = new HTableDescriptor(fullTName);
1439 byte[] family = Bytes.toBytes(fname);
1440 htd.addFamily(new HColumnDescriptor(family));
1441 byte[] value = new byte[edit_size];
1442
1443 List<HRegionInfo> hris = new ArrayList<HRegionInfo>();
1444 for (HRegionInfo region : regions) {
1445 if (!region.getTable().getNameAsString().equalsIgnoreCase(tname)) {
1446 continue;
1447 }
1448 hris.add(region);
1449 }
1450 LOG.info("Creating wal edits across " + hris.size() + " regions.");
1451 for (int i = 0; i < edit_size; i++) {
1452 value[i] = (byte) ('a' + (i % 26));
1453 }
1454 int n = hris.size();
1455 int[] counts = new int[n];
1456 if (n > 0) {
1457 for (int i = 0; i < num_edits; i += 1) {
1458 WALEdit e = new WALEdit();
1459 HRegionInfo curRegionInfo = hris.get(i % n);
1460 byte[] startRow = curRegionInfo.getStartKey();
1461 if (startRow == null || startRow.length == 0) {
1462 startRow = new byte[] { 0, 0, 0, 0, 1 };
1463 }
1464 byte[] row = Bytes.incrementBytes(startRow, counts[i % n]);
1465 row = Arrays.copyOfRange(row, 3, 8);
1466
1467
1468 byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
1469 e.add(new KeyValue(row, family, qualifier, System.currentTimeMillis(), value));
1470 log.append(curRegionInfo, fullTName, e, System.currentTimeMillis(), htd, sequenceId);
1471 counts[i % n] += 1;
1472 }
1473 }
1474 log.sync();
1475 if(closeLog) {
1476 log.close();
1477 }
1478 for (int i = 0; i < n; i++) {
1479 LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits");
1480 }
1481 return;
1482 }
1483
1484 private int countHLog(Path log, FileSystem fs, Configuration conf)
1485 throws IOException {
1486 int count = 0;
1487 HLog.Reader in = HLogFactory.createReader(fs, log, conf);
1488 while (in.next() != null) {
1489 count++;
1490 }
1491 return count;
1492 }
1493
1494 private void blockUntilNoRIT(ZooKeeperWatcher zkw, HMaster master)
1495 throws KeeperException, InterruptedException {
1496 ZKAssign.blockUntilNoRIT(zkw);
1497 master.assignmentManager.waitUntilNoRegionsInTransition(60000);
1498 }
1499
1500 private void putData(HRegion region, byte[] startRow, int numRows, byte [] qf,
1501 byte [] ...families)
1502 throws IOException {
1503 for(int i = 0; i < numRows; i++) {
1504 Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i)));
1505 for(byte [] family : families) {
1506 put.add(family, qf, null);
1507 }
1508 region.put(put);
1509 }
1510 }
1511
1512
1513
1514
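  /**
   * Loads rows keyed 'aaa'..'zzz' into the table and then deletes the rows starting with
   * "aa", so a later checksum comparison exercises both puts and deletes.
   */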
1515 private void prepareData(final HTable t, final byte[] f, final byte[] column) throws IOException {
1516 t.setAutoFlush(false, true);
1517 byte[] k = new byte[3];
1518
1519
1520 for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1521 for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1522 for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1523 k[0] = b1;
1524 k[1] = b2;
1525 k[2] = b3;
1526 Put put = new Put(k);
1527 put.add(f, column, k);
1528 t.put(put);
1529 }
1530 }
1531 }
1532 t.flushCommits();
1533
1534 for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1535 k[0] = 'a';
1536 k[1] = 'a';
1537 k[2] = b3;
1538 Delete del = new Delete(k);
1539 t.delete(del);
1540 }
1541 t.flushCommits();
1542 }
1543
1544 private NavigableSet<String> getAllOnlineRegions(MiniHBaseCluster cluster)
1545 throws IOException {
1546 NavigableSet<String> online = new TreeSet<String>();
1547 for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
1548 for (HRegionInfo region : ProtobufUtil.getOnlineRegions(rst.getRegionServer())) {
1549 online.add(region.getRegionNameAsString());
1550 }
1551 }
1552 return online;
1553 }
1554
  // Waits up to timems for the counter to move off oldval and asserts it reaches newval.
  private void waitForCounter(AtomicLong ctr, long oldval, long newval,
      long timems) {
    long curt = System.currentTimeMillis();
    long endt = curt + timems;
    while (curt < endt) {
      if (ctr.get() == oldval) {
        Thread.yield();
        curt = System.currentTimeMillis();
      } else {
        assertEquals(newval, ctr.get());
        return;
      }
    }
    fail("Counter did not move off " + oldval + " within " + timems + " ms");
  }
1570
1571 private void abortMaster(MiniHBaseCluster cluster) throws InterruptedException {
1572 for (MasterThread mt : cluster.getLiveMasterThreads()) {
1573 if (mt.getMaster().isActiveMaster()) {
1574 mt.getMaster().abort("Aborting for tests", new Exception("Trace info"));
1575 mt.join();
1576 break;
1577 }
1578 }
1579 LOG.debug("Master is aborted");
1580 }
1581
1582 private void startMasterAndWaitUntilLogSplit(MiniHBaseCluster cluster)
1583 throws IOException, InterruptedException {
1584 cluster.startMaster();
1585 HMaster master = cluster.getMaster();
1586 while (!master.isInitialized()) {
1587 Thread.sleep(100);
1588 }
1589 ServerManager serverManager = master.getServerManager();
1590 while (serverManager.areDeadServersInProgress()) {
1591 Thread.sleep(100);
1592 }
1593 }
1594
1595
1596
1597
1598
1599
1600
1601
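  /**
   * Picks a region server to abort: one hosting regions of the given table and, when
   * hasMetaRegion is true, one that also hosts hbase:meta (moving a table region onto
   * the meta-carrying server if necessary).
   */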
1602 private HRegionServer findRSToKill(boolean hasMetaRegion, String tableName) throws Exception {
1603 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1604 int numOfRSs = rsts.size();
1605 List<HRegionInfo> regions = null;
1606 HRegionServer hrs = null;
1607
1608 for (int i = 0; i < numOfRSs; i++) {
1609 boolean isCarryingMeta = false;
1610 boolean foundTableRegion = false;
1611 hrs = rsts.get(i).getRegionServer();
1612 regions = ProtobufUtil.getOnlineRegions(hrs);
1613 for (HRegionInfo region : regions) {
1614 if (region.isMetaRegion()) {
1615 isCarryingMeta = true;
1616 }
1617 if (tableName == null || region.getTable().getNameAsString().equals(tableName)) {
1618 foundTableRegion = true;
1619 }
1620 if (foundTableRegion && (isCarryingMeta || !hasMetaRegion)) {
1621 break;
1622 }
1623 }
1624 if (isCarryingMeta && hasMetaRegion) {
1625
1626 if (!foundTableRegion) {
1627 final HRegionServer destRS = hrs;
1628
1629 List<HRegionInfo> tableRegions =
1630 TEST_UTIL.getHBaseAdmin().getTableRegions(Bytes.toBytes(tableName));
1631 final HRegionInfo hri = tableRegions.get(0);
1632 TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(),
1633 Bytes.toBytes(destRS.getServerName().getServerName()));
1634
1635 final RegionStates regionStates =
1636 TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
1637 TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
1638 @Override
1639 public boolean evaluate() throws Exception {
1640 ServerName sn = regionStates.getRegionServerOfRegion(hri);
1641 return (sn != null && sn.equals(destRS.getServerName()));
1642 }
1643 });
1644 }
1645 return hrs;
1646 } else if (hasMetaRegion || isCarryingMeta) {
1647 continue;
1648 }
1649 if (foundTableRegion) break;
1650 }
1651
1652 return hrs;
1653 }
1654
1655 }