/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.collect.Sets;

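/**
 * Tests the bookkeeping done by {@link ReplicationSourceManager}: how HLogs
 * are tracked per peer as the WAL rolls, and how replication queues are
 * claimed and cleaned up when a region server dies.
 */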
@Category(MediumTests.class)
public class TestReplicationSourceManager {

  private static final Log LOG =
      LogFactory.getLog(TestReplicationSourceManager.class);

  private static Configuration conf;

  private static HBaseTestingUtility utility;

  private static Replication replication;

  private static ReplicationSourceManager manager;

  private static ZooKeeperWatcher zkw;

  private static HTableDescriptor htd;

  private static HRegionInfo hri;

  private static final byte[] r1 = Bytes.toBytes("r1");

  private static final byte[] r2 = Bytes.toBytes("r2");

  private static final byte[] f1 = Bytes.toBytes("f1");

  private static final TableName test =
      TableName.valueOf("test");

  private static final String slaveId = "1";

  private static FileSystem fs;

  private static String logName;

  private static Path oldLogDir;

  private static Path logDir;

  private static CountDownLatch latch;

  private static List<String> files = new ArrayList<String>();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {

    conf = HBaseConfiguration.create();
    conf.set("replication.replicationsource.implementation",
        ReplicationSourceDummy.class.getCanonicalName());
    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
        HConstants.REPLICATION_ENABLE_DEFAULT);
    utility = new HBaseTestingUtility(conf);
    utility.startMiniZKCluster();

    zkw = new ZooKeeperWatcher(conf, "test", null);
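    // hand-build the znodes for replication peer "1" (pointed at this
    // test's own ZooKeeper quorum) and mark replication as enabled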
    ZKUtil.createWithParents(zkw, "/hbase/replication");
    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
    ZKUtil.setData(zkw, "/hbase/replication/peers/1",
        Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
            + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
    ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
        ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
    ZKUtil.createWithParents(zkw, "/hbase/replication/state");
    ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);

    ZKClusterId.setClusterId(zkw, new ClusterId());
    fs = FileSystem.get(conf);
    oldLogDir = new Path(utility.getDataTestDir(),
        HConstants.HREGION_OLDLOGDIR_NAME);
    logDir = new Path(utility.getDataTestDir(),
        HConstants.HREGION_LOGDIR_NAME);
    replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
    manager = replication.getReplicationManager();

    logName = HConstants.HREGION_LOGDIR_NAME;

    manager.addSource(slaveId);

    htd = new HTableDescriptor(test);
    HColumnDescriptor col = new HColumnDescriptor("f1");
    col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    htd.addFamily(col);
    col = new HColumnDescriptor("f2");
    col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
    htd.addFamily(col);

    hri = new HRegionInfo(htd.getTableName(), r1, r2);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    manager.join();
    utility.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws Exception {
    fs.delete(logDir, true);
    fs.delete(oldLogDir, true);
  }

  @After
  public void tearDown() throws Exception {
    setUp();
  }

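  /**
   * Appends edits while rolling the WAL every 20 entries and checks that the
   * manager tracks one HLog per roll, then verifies that
   * logPositionAndCleanOldLogs() trims the queue for peer "1".
   */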
  @Test
  public void testLogRoll() throws Exception {
    long seq = 0;
    long baseline = 1000;
    long time = baseline;
    KeyValue kv = new KeyValue(r1, f1, r1);
    WALEdit edit = new WALEdit();
    edit.add(kv);

    List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
    listeners.add(replication);
    HLog hlog = HLogFactory.createHLog(fs, utility.getDataTestDir(), logName,
        conf, listeners, URLEncoder.encode("regionserver:60020", "UTF8"));
    final AtomicLong sequenceId = new AtomicLong(1);
    manager.init();
    HTableDescriptor htd = new HTableDescriptor();
    htd.addFamily(new HColumnDescriptor(f1));
    // Test normal log rolling every 20 appends
    for (long i = 1; i < 101; i++) {
      if (i > 1 && i % 20 == 0) {
        hlog.rollWriter();
      }
      LOG.info(i);
      HLogKey key = new HLogKey(hri.getRegionName(), test, seq++,
          System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
      hlog.append(hri, test, edit, System.currentTimeMillis(), htd, sequenceId);
    }

    // Simulate a rapid insert that's followed
    // by a report that's still not totally complete (missing last one)
    LOG.info(baseline + " and " + time);
    baseline += 101;
    time = baseline;
    LOG.info(baseline + " and " + time);

    for (int i = 0; i < 3; i++) {
      hlog.append(hri, test, edit, System.currentTimeMillis(), htd, sequenceId);
    }

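    // the writer rolled at i = 20, 40, 60, 80 and 100, so the peer's queue
    // now holds those five rolled logs plus the current one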
    assertEquals(6, manager.getHLogs().get(slaveId).size());

    hlog.rollWriter();

    manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
        "1", 0, false, false);

    hlog.append(hri, test, edit, System.currentTimeMillis(), htd, sequenceId);

    assertEquals(1, manager.getHLogs().size());

    // TODO Need a case with only 2 HLogs and we only want to delete the first one
  }

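  /**
   * Simulates a dead region server whose replication queue is raced for by
   * three workers; exactly one of them should end up owning the claimed logs.
   */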
  @Test
  public void testClaimQueues() throws Exception {
    LOG.debug("testClaimQueues");
    conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
    final Server server = new DummyServer("hostname0.example.org");
    ReplicationQueues rq =
        ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
          server);
    rq.init(server.getServerName().toString());
    // populate some znodes in the peer znode
    files.add("log1");
    files.add("log2");
    for (String file : files) {
      rq.addLog("1", file);
    }
    // create 3 DummyServers
    Server s1 = new DummyServer("dummyserver1.example.org");
    Server s2 = new DummyServer("dummyserver2.example.org");
    Server s3 = new DummyServer("dummyserver3.example.org");

    // create 3 DummyNodeFailoverWorkers
    DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(
        server.getServerName().getServerName(), s1);
    DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(
        server.getServerName().getServerName(), s2);
    DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(
        server.getServerName().getServerName(), s3);

    latch = new CountDownLatch(3);
    // start the threads
    w1.start();
    w2.start();
    w3.start();
    // wait until all the workers are done
    latch.await();
    // make sure only one was successful
    int populatedMap = 0;
    populatedMap += w1.isLogZnodesMapPopulated() + w2.isLogZnodesMapPopulated()
        + w3.isLogZnodesMapPopulated();
    assertEquals(1, populatedMap);
    server.abort("", null);
  }

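  /**
   * Claims a dead server's queue through a real NodeFailoverWorker, then
   * checks that cleanOldLogs() drops everything older than the given log
   * from the recovered queue.
   */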
  @Test
  public void testCleanupFailoverQueues() throws Exception {
    final Server server = new DummyServer("hostname1.example.org");
    ReplicationQueues rq =
        ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
          server);
    rq.init(server.getServerName().toString());
    // populate some znodes in the peer znode
    SortedSet<String> files = new TreeSet<String>();
    files.add("log1");
    files.add("log2");
    for (String file : files) {
      rq.addLog("1", file);
    }
    Server s1 = new DummyServer("dummyserver1.example.org");
    ReplicationQueues rq1 =
        ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
    rq1.init(s1.getServerName().toString());
    ReplicationPeers rp1 =
        ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1);
    rp1.init();
    NodeFailoverWorker w1 =
        manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1,
            new UUID(1L, 2L));
    w1.start();
    w1.join(5000);
    assertEquals(1, manager.getHlogsByIdRecoveredQueues().size());
    String id = "1-" + server.getServerName().getServerName();
    assertEquals(files, manager.getHlogsByIdRecoveredQueues().get(id));
    manager.cleanOldLogs("log2", id, true);
    // log1 should be deleted
    assertEquals(Sets.newHashSet("log2"), manager.getHlogsByIdRecoveredQueues().get(id));
  }

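  /**
   * Chains claimQueues() across three successive server failures and verifies
   * that every dead server's name can be parsed back out of the resulting
   * queue znode name.
   */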
  @Test
  public void testNodeFailoverDeadServerParsing() throws Exception {
    LOG.debug("testNodeFailoverDeadServerParsing");
    conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
    final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
    ReplicationQueues repQueues =
        ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server);
    repQueues.init(server.getServerName().toString());
    // populate some znodes in the peer znode
    files.add("log1");
    files.add("log2");
    for (String file : files) {
      repQueues.addLog("1", file);
    }

    // create 3 DummyServers
    Server s1 = new DummyServer("ip-10-8-101-114.ec2.internal");
    Server s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com");
    Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com");

    // simulate three servers failing sequentially
    ReplicationQueues rq1 =
        ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
    rq1.init(s1.getServerName().toString());
    SortedMap<String, SortedSet<String>> testMap =
        rq1.claimQueues(server.getServerName().getServerName());
    ReplicationQueues rq2 =
        ReplicationFactory.getReplicationQueues(s2.getZooKeeper(), s2.getConfiguration(), s2);
    rq2.init(s2.getServerName().toString());
    testMap = rq2.claimQueues(s1.getServerName().getServerName());
    ReplicationQueues rq3 =
        ReplicationFactory.getReplicationQueues(s3.getZooKeeper(), s3.getConfiguration(), s3);
    rq3.init(s3.getServerName().toString());
    testMap = rq3.claimQueues(s2.getServerName().getServerName());

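    // the claimed queue's znode name accumulates the names of all its dead
    // previous owners, which ReplicationQueueInfo parses back out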
    ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.firstKey());
    List<String> result = replicationQueueInfo.getDeadRegionServers();

    // verify
    assertTrue(result.contains(server.getServerName().getServerName()));
    assertTrue(result.contains(s1.getServerName().getServerName()));
    assertTrue(result.contains(s2.getServerName().getServerName()));

    server.abort("", null);
  }

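  /**
   * Thread that races to claim a dead region server's queues and counts down
   * the shared latch when done, whether or not the claim succeeded.
   */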
  static class DummyNodeFailoverWorker extends Thread {
    private SortedMap<String, SortedSet<String>> logZnodesMap;
    Server server;
    private String deadRsZnode;
    ReplicationQueues rq;

    public DummyNodeFailoverWorker(String znode, Server s) throws Exception {
      this.deadRsZnode = znode;
      this.server = s;
      this.rq =
          ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
            server);
      this.rq.init(this.server.getServerName().toString());
    }

    @Override
    public void run() {
      try {
        logZnodesMap = rq.claimQueues(deadRsZnode);
        server.abort("Done with testing", null);
      } catch (Exception e) {
        LOG.error("Got exception while running NodeFailoverWorker", e);
      } finally {
        latch.countDown();
      }
    }

    /**
     * @return 1 if the claimed map contains every expected log file, 0 otherwise.
     */
    private int isLogZnodesMapPopulated() {
      Collection<SortedSet<String>> sets = logZnodesMap.values();
      if (sets.size() > 1) {
        throw new RuntimeException("unexpected size of logZnodesMap: " + sets.size());
      }
      if (sets.size() == 1) {
        SortedSet<String> s = sets.iterator().next();
        for (String file : files) {
          // at least one file was missing
          if (!s.contains(file)) {
            return 0;
          }
        }
        return 1; // we found all the files
      }
      return 0;
    }
  }

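  /**
   * Minimal {@link Server} implementation that hands the tests' shared
   * Configuration and ZooKeeperWatcher to the replication code.
   */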
  static class DummyServer implements Server {
    String hostname;

    DummyServer() {
      hostname = "hostname.example.org";
    }

    DummyServer(String hostname) {
      this.hostname = hostname;
    }

    @Override
    public Configuration getConfiguration() {
      return conf;
    }

    @Override
    public ZooKeeperWatcher getZooKeeper() {
      return zkw;
    }

    @Override
    public CatalogTracker getCatalogTracker() {
      return null; // not needed by these tests
    }

    @Override
    public ServerName getServerName() {
      return ServerName.valueOf(hostname, 1234, 1L);
    }

    @Override
    public void abort(String why, Throwable e) {
      // no-op for testing
    }

    @Override
    public boolean isAborted() {
      return false;
    }

    @Override
    public void stop(String why) {
      // no-op for testing
    }

    @Override
    public boolean isStopped() {
      return false; // the tests never stop this server
    }
  }

}