View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertTrue;
24  import static org.mockito.Mockito.mock;
25  import static org.mockito.Mockito.when;
26  
27  import java.util.List;
28  import java.util.concurrent.atomic.AtomicLong;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.HBaseConfiguration;
34  import org.apache.hadoop.hbase.HBaseTestingUtility;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.MediumTests;
37  import org.apache.hadoop.hbase.ServerName;
38  import org.apache.hadoop.hbase.SplitLogCounters;
39  import org.apache.hadoop.hbase.SplitLogTask;
40  import org.apache.hadoop.hbase.Waiter;
41  import org.apache.hadoop.hbase.executor.ExecutorService;
42  import org.apache.hadoop.hbase.executor.ExecutorType;
43  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
44  import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
45  import org.apache.hadoop.hbase.util.CancelableProgressable;
46  import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
47  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
48  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
49  import org.apache.log4j.Level;
50  import org.apache.log4j.Logger;
51  import org.apache.zookeeper.CreateMode;
52  import org.apache.zookeeper.ZooDefs.Ids;
53  import org.junit.After;
54  import org.junit.Before;
55  import org.junit.Test;
56  import org.junit.experimental.categories.Category;
57  
58  @Category(MediumTests.class)
59  public class TestSplitLogWorker {
60    private static final Log LOG = LogFactory.getLog(TestSplitLogWorker.class);
61    private static final int WAIT_TIME = 15000;
62    private final ServerName MANAGER = ServerName.valueOf("manager,1,1");
63    static {
64      Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
65    }
66    private final static HBaseTestingUtility TEST_UTIL =
67      new HBaseTestingUtility();
68    private ZooKeeperWatcher zkw;
69    private SplitLogWorker slw;
70    private ExecutorService executorService;
71    private RecoveryMode mode;
72  
73    private void waitForCounter(AtomicLong ctr, long oldval, long newval, long timems)
74        throws Exception {
75      assertTrue("ctr=" + ctr.get() + ", oldval=" + oldval + ", newval=" + newval,
76        waitForCounterBoolean(ctr, oldval, newval, timems));
77    }
78  
79    private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, long newval,
80        long timems) throws Exception {
81  
82      return waitForCounterBoolean(ctr, oldval, newval, timems, true);
83    }
84  
85    private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, final long newval,
86        long timems, boolean failIfTimeout) throws Exception {
87  
88      long timeWaited = TEST_UTIL.waitFor(timems, 10, failIfTimeout,
89        new Waiter.Predicate<Exception>() {
90        @Override
91        public boolean evaluate() throws Exception {
92              return (ctr.get() >= newval);
93        }
94      });
95  
96      if( timeWaited > 0) {
97        // when not timed out
98        assertEquals(newval, ctr.get());
99      }
100     return true;
101   }
102 
103   @Before
104   public void setup() throws Exception {
105     TEST_UTIL.startMiniZKCluster();
106     Configuration conf = TEST_UTIL.getConfiguration();
107     zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
108         "split-log-worker-tests", null);
109     ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode);
110     ZKUtil.createAndFailSilent(zkw, zkw.baseZNode);
111     assertTrue(ZKUtil.checkExists(zkw, zkw.baseZNode) != -1);
112     LOG.debug(zkw.baseZNode + " created");
113     ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode);
114     assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1);
115     LOG.debug(zkw.splitLogZNode + " created");
116     ZKUtil.createAndFailSilent(zkw, zkw.rsZNode);
117     assertTrue(ZKUtil.checkExists(zkw, zkw.rsZNode) != -1);
118     SplitLogCounters.resetCounters();
119     executorService = new ExecutorService("TestSplitLogWorker");
120     executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10);
121     this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? 
122         RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
123   }
124 
125   @After
126   public void teardown() throws Exception {
127     if (executorService != null) {
128       executorService.shutdown();
129     }
130     TEST_UTIL.shutdownMiniZKCluster();
131   }
132 
133   SplitLogWorker.TaskExecutor neverEndingTask =
134     new SplitLogWorker.TaskExecutor() {
135 
136       @Override
137       public Status exec(String name, RecoveryMode mode, CancelableProgressable p) {
138         while (true) {
139           try {
140             Thread.sleep(1000);
141           } catch (InterruptedException e) {
142             return Status.PREEMPTED;
143           }
144           if (!p.progress()) {
145             return Status.PREEMPTED;
146           }
147         }
148       }
149 
150   };
151 
152   @Test(timeout=60000)
153   public void testAcquireTaskAtStartup() throws Exception {
154     LOG.info("testAcquireTaskAtStartup");
155     SplitLogCounters.resetCounters();
156     final String TATAS = "tatas";
157     final ServerName RS = ServerName.valueOf("rs,1,1");
158     RegionServerServices mockedRS = getRegionServer(RS);
159     zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS),
160       new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(), 
161         Ids.OPEN_ACL_UNSAFE,
162         CreateMode.PERSISTENT);
163 
164     SplitLogWorker slw =
165         new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
166     slw.start();
167     try {
168       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
169       byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS));
170       SplitLogTask slt = SplitLogTask.parseFrom(bytes);
171       assertTrue(slt.isOwned(RS));
172     } finally {
173       stopSplitLogWorker(slw);
174     }
175   }
176 
177   private void stopSplitLogWorker(final SplitLogWorker slw)
178   throws InterruptedException {
179     if (slw != null) {
180       slw.stop();
181       slw.worker.join(WAIT_TIME);
182       if (slw.worker.isAlive()) {
183         assertTrue(("Could not stop the worker thread slw=" + slw) == null);
184       }
185     }
186   }
187 
188   @Test(timeout=60000)
189   public void testRaceForTask() throws Exception {
190     LOG.info("testRaceForTask");
191     SplitLogCounters.resetCounters();
192     final String TRFT = "trft";
193     final ServerName SVR1 = ServerName.valueOf("svr1,1,1");
194     final ServerName SVR2 = ServerName.valueOf("svr2,1,1");
195     zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT),
196       new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(), 
197         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
198     RegionServerServices mockedRS1 = getRegionServer(SVR1);
199     RegionServerServices mockedRS2 = getRegionServer(SVR2);
200     SplitLogWorker slw1 =
201         new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask);
202     SplitLogWorker slw2 =
203         new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask);
204     slw1.start();
205     slw2.start();
206     try {
207       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
208       // Assert that either the tot_wkr_failed_to_grab_task_owned count was set of if
209       // not it, that we fell through to the next counter in line and it was set.
210       assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1,
211           WAIT_TIME, false) ||
212         SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.get() == 1);
213       byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT));
214       SplitLogTask slt = SplitLogTask.parseFrom(bytes);
215       assertTrue(slt.isOwned(SVR1) || slt.isOwned(SVR2));
216     } finally {
217       stopSplitLogWorker(slw1);
218       stopSplitLogWorker(slw2);
219     }
220   }
221 
222   @Test(timeout=60000)
223   public void testPreemptTask() throws Exception {
224     LOG.info("testPreemptTask");
225     SplitLogCounters.resetCounters();
226     final ServerName SRV = ServerName.valueOf("tpt_svr,1,1");
227     final String PATH = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task");
228     RegionServerServices mockedRS = getRegionServer(SRV);
229     SplitLogWorker slw =
230         new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
231     slw.start();
232     try {
233       Thread.yield(); // let the worker start
234       Thread.sleep(1000);
235       waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);
236 
237       // this time create a task node after starting the splitLogWorker
238       zkw.getRecoverableZooKeeper().create(PATH,
239         new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(), 
240         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
241 
242       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
243       assertEquals(1, slw.taskReadySeq);
244       byte [] bytes = ZKUtil.getData(zkw, PATH);
245       SplitLogTask slt = SplitLogTask.parseFrom(bytes);
246       assertTrue(slt.isOwned(SRV));
247       slt = new SplitLogTask.Owned(MANAGER, this.mode);
248       ZKUtil.setData(zkw, PATH, slt.toByteArray());
249       waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
250     } finally {
251       stopSplitLogWorker(slw);
252     }
253   }
254 
255   @Test(timeout=60000)
256   public void testMultipleTasks() throws Exception {
257     LOG.info("testMultipleTasks");
258     SplitLogCounters.resetCounters();
259     final ServerName SRV = ServerName.valueOf("tmt_svr,1,1");
260     final String PATH1 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task");
261     RegionServerServices mockedRS = getRegionServer(SRV);
262     SplitLogWorker slw =
263         new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
264     slw.start();
265     try {
266       Thread.yield(); // let the worker start
267       Thread.sleep(100);
268       waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);
269 
270       SplitLogTask unassignedManager = 
271         new SplitLogTask.Unassigned(MANAGER, this.mode);
272       zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(),
273         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
274 
275       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
276       // now the worker is busy doing the above task
277 
278       // create another task
279       final String PATH2 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2");
280       zkw.getRecoverableZooKeeper().create(PATH2, unassignedManager.toByteArray(),
281         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
282 
283       // preempt the first task, have it owned by another worker
284       final ServerName anotherWorker = ServerName.valueOf("another-worker,1,1");
285       SplitLogTask slt = new SplitLogTask.Owned(anotherWorker, this.mode);
286       ZKUtil.setData(zkw, PATH1, slt.toByteArray());
287       waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
288 
289       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
290       assertEquals(2, slw.taskReadySeq);
291       byte [] bytes = ZKUtil.getData(zkw, PATH2);
292       slt = SplitLogTask.parseFrom(bytes);
293       assertTrue(slt.isOwned(SRV));
294     } finally {
295       stopSplitLogWorker(slw);
296     }
297   }
298 
299   @Test(timeout=60000)
300   public void testRescan() throws Exception {
301     LOG.info("testRescan");
302     SplitLogCounters.resetCounters();
303     final ServerName SRV = ServerName.valueOf("svr,1,1");
304     RegionServerServices mockedRS = getRegionServer(SRV);
305     slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
306     slw.start();
307     Thread.yield(); // let the worker start
308     Thread.sleep(100);
309 
310     String task = ZKSplitLog.getEncodedNodeName(zkw, "task");
311     SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER, this.mode);
312     zkw.getRecoverableZooKeeper().create(task,slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
313       CreateMode.PERSISTENT);
314 
315     waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
316     // now the worker is busy doing the above task
317 
318     // preempt the task, have it owned by another worker
319     ZKUtil.setData(zkw, task, slt.toByteArray());
320     waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
321 
322     // create a RESCAN node
323     String rescan = ZKSplitLog.getEncodedNodeName(zkw, "RESCAN");
324     rescan = zkw.getRecoverableZooKeeper().create(rescan, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
325       CreateMode.PERSISTENT_SEQUENTIAL);
326 
327     waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
328     // RESCAN node might not have been processed if the worker became busy
329     // with the above task. preempt the task again so that now the RESCAN
330     // node is processed
331     ZKUtil.setData(zkw, task, slt.toByteArray());
332     waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 1, 2, WAIT_TIME);
333     waitForCounter(SplitLogCounters.tot_wkr_task_acquired_rescan, 0, 1, WAIT_TIME);
334 
335     List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode);
336     LOG.debug(nodes);
337     int num = 0;
338     for (String node : nodes) {
339       num++;
340       if (node.startsWith("RESCAN")) {
341         String name = ZKSplitLog.getEncodedNodeName(zkw, node);
342         String fn = ZKSplitLog.getFileName(name);
343         byte [] data = ZKUtil.getData(zkw, ZKUtil.joinZNode(zkw.splitLogZNode, fn));
344         slt = SplitLogTask.parseFrom(data);
345         assertTrue(slt.toString(), slt.isDone(SRV));
346       }
347     }
348     assertEquals(2, num);
349   }
350 
351   @Test(timeout=60000)
352   public void testAcquireMultiTasks() throws Exception {
353     LOG.info("testAcquireMultiTasks");
354     SplitLogCounters.resetCounters();
355     final String TATAS = "tatas";
356     final ServerName RS = ServerName.valueOf("rs,1,1");
357     final int maxTasks = 3;
358     Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
359     testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks);
360     RegionServerServices mockedRS = getRegionServer(RS);
361 
362     for (int i = 0; i < maxTasks; i++) {
363       zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
364         new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
365           Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
366     }
367 
368     SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);
369     slw.start();
370     try {
371       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, WAIT_TIME);
372       for (int i = 0; i < maxTasks; i++) {
373         byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
374         SplitLogTask slt = SplitLogTask.parseFrom(bytes);
375         assertTrue(slt.isOwned(RS));
376       }
377     } finally {
378       stopSplitLogWorker(slw);
379     }
380   }
381 
382   /**
383    * The test checks SplitLogWorker should not spawn more splitters than expected num of tasks per
384    * RS
385    * @throws Exception
386    */
387   @Test(timeout=60000)
388   public void testAcquireMultiTasksByAvgTasksPerRS() throws Exception {
389     LOG.info("testAcquireMultiTasks");
390     SplitLogCounters.resetCounters();
391     final String TATAS = "tatas";
392     final ServerName RS = ServerName.valueOf("rs,1,1");
393     final ServerName RS2 = ServerName.valueOf("rs,1,2");
394     final int maxTasks = 3;
395     Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
396     testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks);
397     RegionServerServices mockedRS = getRegionServer(RS);
398 
399     // create two RS nodes
400     String rsPath = ZKUtil.joinZNode(zkw.rsZNode, RS.getServerName());
401     zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
402     rsPath = ZKUtil.joinZNode(zkw.rsZNode, RS2.getServerName());
403     zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
404 
405     for (int i = 0; i < maxTasks; i++) {
406       zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
407         new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
408           Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
409     }
410 
411     SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);
412     slw.start();
413     try {
414       int acquiredTasks = 0;
415       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 2, WAIT_TIME);
416       for (int i = 0; i < maxTasks; i++) {
417         byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
418         SplitLogTask slt = SplitLogTask.parseFrom(bytes);
419         if (slt.isOwned(RS)) {
420           acquiredTasks++;
421         }
422       }
423       assertEquals(2, acquiredTasks);
424     } finally {
425       stopSplitLogWorker(slw);
426     }
427   }
428 
429   /**
430    * Create a mocked region server service instance
431    * @param server
432    * @return
433    */
434   private RegionServerServices getRegionServer(ServerName name) {
435 
436     RegionServerServices mockedServer = mock(RegionServerServices.class);
437     when(mockedServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
438     when(mockedServer.getServerName()).thenReturn(name);
439     when(mockedServer.getZooKeeper()).thenReturn(zkw);
440     when(mockedServer.isStopped()).thenReturn(false);
441     when(mockedServer.getExecutorService()).thenReturn(executorService);
442 
443     return mockedServer;
444   }
445 
446 }