View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.replication.regionserver;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.mockito.Mockito.mock;
22  import static org.mockito.Mockito.when;
23  
24  import java.util.List;
25  
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.hbase.ServerName;
28  import org.apache.hadoop.hbase.testclassification.SmallTests;
29  import org.apache.hadoop.hbase.client.HConnection;
30  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
31  import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
32  import org.apache.hadoop.hbase.replication.ReplicationPeers;
33  import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
34  import org.junit.Before;
35  import org.junit.Test;
36  import org.junit.experimental.categories.Category;
37  
38  import com.google.common.collect.Lists;
39  
40  @Category(SmallTests.class)
41  public class TestReplicationSinkManager {
42  
43    private static final String PEER_CLUSTER_ID = "PEER_CLUSTER_ID";
44  
45    private ReplicationPeers replicationPeers;
46    private HBaseReplicationEndpoint replicationEndpoint;
47    private ReplicationSinkManager sinkManager;
48  
49    @Before
50    public void setUp() {
51      replicationPeers = mock(ReplicationPeers.class);
52      replicationEndpoint = mock(HBaseReplicationEndpoint.class);
53      sinkManager = new ReplicationSinkManager(mock(HConnection.class),
54                        PEER_CLUSTER_ID, replicationEndpoint, new Configuration());
55    }
56  
57    @Test
58    public void testChooseSinks() {
59      List<ServerName> serverNames = Lists.newArrayList();
60      for (int i = 0; i < 20; i++) {
61        serverNames.add(mock(ServerName.class));
62      }
63  
64      when(replicationEndpoint.getRegionServers())
65            .thenReturn(serverNames);
66  
67      sinkManager.chooseSinks();
68  
69      assertEquals(2, sinkManager.getNumSinks());
70  
71    }
72  
73    @Test
74    public void testChooseSinks_LessThanRatioAvailable() {
75      List<ServerName> serverNames = Lists.newArrayList(mock(ServerName.class),
76        mock(ServerName.class));
77  
78      when(replicationEndpoint.getRegionServers())
79            .thenReturn(serverNames);
80  
81      sinkManager.chooseSinks();
82  
83      assertEquals(1, sinkManager.getNumSinks());
84    }
85  
86    @Test
87    public void testReportBadSink() {
88      ServerName serverNameA = mock(ServerName.class);
89      ServerName serverNameB = mock(ServerName.class);
90      when(replicationEndpoint.getRegionServers())
91        .thenReturn(Lists.newArrayList(serverNameA, serverNameB));
92  
93      sinkManager.chooseSinks();
94      // Sanity check
95      assertEquals(1, sinkManager.getNumSinks());
96  
97      SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
98  
99      sinkManager.reportBadSink(sinkPeer);
100 
101     // Just reporting a bad sink once shouldn't have an effect
102     assertEquals(1, sinkManager.getNumSinks());
103 
104   }
105 
106   /**
107    * Once a SinkPeer has been reported as bad more than BAD_SINK_THRESHOLD times, it should not
108    * be replicated to anymore.
109    */
110   @Test
111   public void testReportBadSink_PastThreshold() {
112     List<ServerName> serverNames = Lists.newArrayList();
113     for (int i = 0; i < 30; i++) {
114       serverNames.add(mock(ServerName.class));
115     }
116     when(replicationEndpoint.getRegionServers())
117           .thenReturn(serverNames);
118 
119 
120     sinkManager.chooseSinks();
121     // Sanity check
122     assertEquals(3, sinkManager.getNumSinks());
123 
124     ServerName serverName = sinkManager.getSinksForTesting().get(0);
125 
126     SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
127 
128     sinkManager.reportSinkSuccess(sinkPeer); // has no effect, counter does not go negative
129     for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
130       sinkManager.reportBadSink(sinkPeer);
131     }
132 
133     // Reporting a bad sink more than the threshold count should remove it
134     // from the list of potential sinks
135     assertEquals(2, sinkManager.getNumSinks());
136 
137     //
138     // now try a sink that has some successes
139     //
140     serverName = sinkManager.getSinksForTesting().get(0);
141 
142     sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
143     for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-1; i++) {
144       sinkManager.reportBadSink(sinkPeer);
145     }
146     sinkManager.reportSinkSuccess(sinkPeer); // one success
147     sinkManager.reportBadSink(sinkPeer);
148 
149     // did not remove the sink, since we had one successful try
150     assertEquals(2, sinkManager.getNumSinks());
151 
152     for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-2; i++) {
153       sinkManager.reportBadSink(sinkPeer);
154     }
155     // still not remove, since the success reset the counter
156     assertEquals(2, sinkManager.getNumSinks());
157 
158     sinkManager.reportBadSink(sinkPeer);
159     // but we exhausted the tries
160     assertEquals(1, sinkManager.getNumSinks());
161   }
162 
163   @Test
164   public void testReportBadSink_DownToZeroSinks() {
165     List<ServerName> serverNames = Lists.newArrayList();
166     for (int i = 0; i < 20; i++) {
167       serverNames.add(mock(ServerName.class));
168     }
169     when(replicationEndpoint.getRegionServers())
170           .thenReturn(serverNames);
171 
172 
173     sinkManager.chooseSinks();
174     // Sanity check
175 
176     List<ServerName> sinkList = sinkManager.getSinksForTesting();
177     assertEquals(2, sinkList.size());
178 
179     ServerName serverNameA = sinkList.get(0);
180     ServerName serverNameB = sinkList.get(1);
181 
182     SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
183     SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AdminService.BlockingInterface.class));
184 
185     for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
186       sinkManager.reportBadSink(sinkPeerA);
187       sinkManager.reportBadSink(sinkPeerB);
188     }
189 
190     // We've gone down to 0 good sinks, so the replication sinks
191     // should have been refreshed now
192     assertEquals(2, sinkManager.getNumSinks());
193   }
194 
195 }