View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.replication.regionserver;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.mockito.Mockito.mock;
22  import static org.mockito.Mockito.when;
23  
24  import java.util.List;
25  
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.hbase.ServerName;
28  import org.apache.hadoop.hbase.SmallTests;
29  import org.apache.hadoop.hbase.client.HConnection;
30  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
31  import org.apache.hadoop.hbase.replication.ReplicationPeers;
32  import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
33  import org.junit.Before;
34  import org.junit.Test;
35  import org.junit.experimental.categories.Category;
36  
37  import com.google.common.collect.Lists;
38  
39  @Category(SmallTests.class)
40  public class TestReplicationSinkManager {
41  
42    private static final String PEER_CLUSTER_ID = "PEER_CLUSTER_ID";
43  
44    private ReplicationPeers replicationPeers;
45    private ReplicationSinkManager sinkManager;
46  
47    @Before
48    public void setUp() {
49      replicationPeers = mock(ReplicationPeers.class);
50      sinkManager = new ReplicationSinkManager(mock(HConnection.class),
51                        PEER_CLUSTER_ID, replicationPeers, new Configuration());
52    }
53  
54    @Test
55    public void testChooseSinks() {
56      List<ServerName> serverNames = Lists.newArrayList();
57      for (int i = 0; i < 20; i++) {
58        serverNames.add(mock(ServerName.class));
59      }
60  
61      when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
62            .thenReturn(serverNames);
63  
64      sinkManager.chooseSinks();
65  
66      assertEquals(2, sinkManager.getSinks().size());
67  
68    }
69  
70    @Test
71    public void testChooseSinks_LessThanRatioAvailable() {
72      List<ServerName> serverNames = Lists.newArrayList(mock(ServerName.class),
73        mock(ServerName.class));
74  
75      when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
76            .thenReturn(serverNames);
77  
78      sinkManager.chooseSinks();
79  
80      assertEquals(1, sinkManager.getSinks().size());
81    }
82  
83    @Test
84    public void testReportBadSink() {
85      ServerName serverNameA = mock(ServerName.class);
86      ServerName serverNameB = mock(ServerName.class);
87      when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID)).thenReturn(
88        Lists.newArrayList(serverNameA, serverNameB));
89  
90      sinkManager.chooseSinks();
91      // Sanity check
92      assertEquals(1, sinkManager.getSinks().size());
93  
94      SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
95  
96      sinkManager.reportBadSink(sinkPeer);
97  
98      // Just reporting a bad sink once shouldn't have an effect
99      assertEquals(1, sinkManager.getSinks().size());
100 
101   }
102 
103   /**
104    * Once a SinkPeer has been reported as bad more than BAD_SINK_THRESHOLD times, it should not
105    * be replicated to anymore.
106    */
107   @Test
108   public void testReportBadSink_PastThreshold() {
109     List<ServerName> serverNames = Lists.newArrayList();
110     for (int i = 0; i < 20; i++) {
111       serverNames.add(mock(ServerName.class));
112     }
113     when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
114           .thenReturn(serverNames);
115 
116 
117     sinkManager.chooseSinks();
118     // Sanity check
119     assertEquals(2, sinkManager.getSinks().size());
120 
121     ServerName serverName = sinkManager.getSinks().get(0);
122 
123     SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
124 
125     for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
126       sinkManager.reportBadSink(sinkPeer);
127     }
128 
129     // Reporting a bad sink more than the threshold count should remove it
130     // from the list of potential sinks
131     assertEquals(1, sinkManager.getSinks().size());
132   }
133 
134   @Test
135   public void testReportBadSink_DownToZeroSinks() {
136     List<ServerName> serverNames = Lists.newArrayList();
137     for (int i = 0; i < 20; i++) {
138       serverNames.add(mock(ServerName.class));
139     }
140     when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
141           .thenReturn(serverNames);
142 
143 
144     sinkManager.chooseSinks();
145     // Sanity check
146 
147     List<ServerName> sinkList = sinkManager.getSinks();
148     assertEquals(2, sinkList.size());
149 
150     ServerName serverNameA = sinkList.get(0);
151     ServerName serverNameB = sinkList.get(1);
152 
153     SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
154     SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AdminService.BlockingInterface.class));
155 
156     for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
157       sinkManager.reportBadSink(sinkPeerA);
158       sinkManager.reportBadSink(sinkPeerB);
159     }
160 
161     // We've gone down to 0 good sinks, so the replication sinks
162     // should have been refreshed now
163     assertEquals(2, sinkManager.getSinks().size());
164   }
165 
166 }