1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.replication.regionserver;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.mockito.Mockito.mock;
22 import static org.mockito.Mockito.when;
23
24 import java.util.List;
25
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.ServerName;
28 import org.apache.hadoop.hbase.SmallTests;
29 import org.apache.hadoop.hbase.client.HConnection;
30 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
31 import org.apache.hadoop.hbase.replication.ReplicationPeers;
32 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
33 import org.junit.Before;
34 import org.junit.Test;
35 import org.junit.experimental.categories.Category;
36
37 import com.google.common.collect.Lists;
38
39 @Category(SmallTests.class)
40 public class TestReplicationSinkManager {
41
42 private static final String PEER_CLUSTER_ID = "PEER_CLUSTER_ID";
43
44 private ReplicationPeers replicationPeers;
45 private ReplicationSinkManager sinkManager;
46
47 @Before
48 public void setUp() {
49 replicationPeers = mock(ReplicationPeers.class);
50 sinkManager = new ReplicationSinkManager(mock(HConnection.class),
51 PEER_CLUSTER_ID, replicationPeers, new Configuration());
52 }
53
54 @Test
55 public void testChooseSinks() {
56 List<ServerName> serverNames = Lists.newArrayList();
57 for (int i = 0; i < 20; i++) {
58 serverNames.add(mock(ServerName.class));
59 }
60
61 when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
62 .thenReturn(serverNames);
63
64 sinkManager.chooseSinks();
65
66 assertEquals(2, sinkManager.getSinks().size());
67
68 }
69
70 @Test
71 public void testChooseSinks_LessThanRatioAvailable() {
72 List<ServerName> serverNames = Lists.newArrayList(mock(ServerName.class),
73 mock(ServerName.class));
74
75 when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
76 .thenReturn(serverNames);
77
78 sinkManager.chooseSinks();
79
80 assertEquals(1, sinkManager.getSinks().size());
81 }
82
83 @Test
84 public void testReportBadSink() {
85 ServerName serverNameA = mock(ServerName.class);
86 ServerName serverNameB = mock(ServerName.class);
87 when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID)).thenReturn(
88 Lists.newArrayList(serverNameA, serverNameB));
89
90 sinkManager.chooseSinks();
91
92 assertEquals(1, sinkManager.getSinks().size());
93
94 SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
95
96 sinkManager.reportBadSink(sinkPeer);
97
98
99 assertEquals(1, sinkManager.getSinks().size());
100
101 }
102
103
104
105
106
107 @Test
108 public void testReportBadSink_PastThreshold() {
109 List<ServerName> serverNames = Lists.newArrayList();
110 for (int i = 0; i < 20; i++) {
111 serverNames.add(mock(ServerName.class));
112 }
113 when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
114 .thenReturn(serverNames);
115
116
117 sinkManager.chooseSinks();
118
119 assertEquals(2, sinkManager.getSinks().size());
120
121 ServerName serverName = sinkManager.getSinks().get(0);
122
123 SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
124
125 for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
126 sinkManager.reportBadSink(sinkPeer);
127 }
128
129
130
131 assertEquals(1, sinkManager.getSinks().size());
132 }
133
134 @Test
135 public void testReportBadSink_DownToZeroSinks() {
136 List<ServerName> serverNames = Lists.newArrayList();
137 for (int i = 0; i < 20; i++) {
138 serverNames.add(mock(ServerName.class));
139 }
140 when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
141 .thenReturn(serverNames);
142
143
144 sinkManager.chooseSinks();
145
146
147 List<ServerName> sinkList = sinkManager.getSinks();
148 assertEquals(2, sinkList.size());
149
150 ServerName serverNameA = sinkList.get(0);
151 ServerName serverNameB = sinkList.get(1);
152
153 SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
154 SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AdminService.BlockingInterface.class));
155
156 for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
157 sinkManager.reportBadSink(sinkPeerA);
158 sinkManager.reportBadSink(sinkPeerB);
159 }
160
161
162
163 assertEquals(2, sinkManager.getSinks().size());
164 }
165
166 }