1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.util.Threads;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.DatagramChannel;
import org.jboss.netty.channel.socket.DatagramChannelFactory;
import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
54
55
56
57
58
59
60
61 @InterfaceAudience.Private
62 class ClusterStatusListener implements Closeable {
63 private static final Log LOG = LogFactory.getLog(ClusterStatusListener.class);
64 private final List<ServerName> deadServers = new ArrayList<ServerName>();
65 protected final DeadServerHandler deadServerHandler;
66 private final Listener listener;
67
68
69
70
71 public static final String STATUS_LISTENER_CLASS = "hbase.status.listener.class";
72 public static final Class<? extends Listener> DEFAULT_STATUS_LISTENER_CLASS =
73 MulticastListener.class;
74
75
76
77
78 public interface DeadServerHandler {
79
80
81
82
83
84
85
86 void newDead(ServerName sn);
87 }
88
89
90
91
92
93 interface Listener extends Closeable {
94
95
96
97 @Override
98 void close();
99
100
101
102
103
104
105
106 void connect(Configuration conf) throws IOException;
107 }
108
109 public ClusterStatusListener(DeadServerHandler dsh, Configuration conf,
110 Class<? extends Listener> listenerClass) throws IOException {
111 this.deadServerHandler = dsh;
112 try {
113 Constructor<? extends Listener> ctor =
114 listenerClass.getConstructor(ClusterStatusListener.class);
115 this.listener = ctor.newInstance(this);
116 } catch (InstantiationException e) {
117 throw new IOException("Can't create listener " + listenerClass.getName(), e);
118 } catch (IllegalAccessException e) {
119 throw new IOException("Can't create listener " + listenerClass.getName(), e);
120 } catch (NoSuchMethodException e) {
121 throw new IllegalStateException();
122 } catch (InvocationTargetException e) {
123 throw new IllegalStateException();
124 }
125
126 this.listener.connect(conf);
127 }
128
129
130
131
132
133
134 public void receive(ClusterStatus ncs) {
135 if (ncs.getDeadServerNames() != null) {
136 for (ServerName sn : ncs.getDeadServerNames()) {
137 if (!isDeadServer(sn)) {
138 LOG.info("There is a new dead server: " + sn);
139 deadServers.add(sn);
140 if (deadServerHandler != null) {
141 deadServerHandler.newDead(sn);
142 }
143 }
144 }
145 }
146 }
147
148 @Override
149 public void close() {
150 listener.close();
151 }
152
153
154
155
156
157
158
159 public boolean isDeadServer(ServerName sn) {
160 if (sn.getStartcode() <= 0) {
161 return false;
162 }
163
164 for (ServerName dead : deadServers) {
165 if (dead.getStartcode() >= sn.getStartcode() &&
166 dead.getPort() == sn.getPort() &&
167 dead.getHostname().equals(sn.getHostname())) {
168 return true;
169 }
170 }
171
172 return false;
173 }
174
175
176
177
178
179 class MulticastListener implements Listener {
180 private DatagramChannel channel;
181 private final ExecutorService service = Executors.newSingleThreadExecutor(
182 Threads.newDaemonThreadFactory("hbase-client-clusterStatus-multiCastListener"));
183
184
185 public MulticastListener() {
186 }
187
188 @Override
189 public void connect(Configuration conf) throws IOException {
190
191 DatagramChannelFactory f = new OioDatagramChannelFactory(service);
192
193 ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
194 b.setPipeline(Channels.pipeline(
195 new ProtobufDecoder(ClusterStatusProtos.ClusterStatus.getDefaultInstance()),
196 new ClusterStatusHandler()));
197
198 String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
199 HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
200 String bindAddress = conf.get(HConstants.STATUS_MULTICAST_BIND_ADDRESS,
201 HConstants.DEFAULT_STATUS_MULTICAST_BIND_ADDRESS);
202 int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
203 HConstants.DEFAULT_STATUS_MULTICAST_PORT);
204
205 channel = (DatagramChannel) b.bind(new InetSocketAddress(bindAddress, port));
206
207 channel.getConfig().setReuseAddress(true);
208
209 InetAddress ina;
210 try {
211 ina = InetAddress.getByName(mcAddress);
212 } catch (UnknownHostException e) {
213 throw new IOException("Can't connect to " + mcAddress, e);
214 }
215 channel.joinGroup(ina);
216 }
217
218 @Override
219 public void close() {
220 if (channel != null) {
221 channel.close();
222 channel = null;
223 }
224 service.shutdown();
225 }
226
227
228
229
230
231 private class ClusterStatusHandler extends SimpleChannelUpstreamHandler {
232
233 @Override
234 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
235 ClusterStatusProtos.ClusterStatus csp = (ClusterStatusProtos.ClusterStatus) e.getMessage();
236 ClusterStatus ncs = ClusterStatus.convert(csp);
237 receive(ncs);
238 }
239
240
241
242
243
244 @Override
245 public void exceptionCaught(
246 ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
247 LOG.error("Unexpected exception, continuing.", e.getCause());
248 }
249 }
250 }
251 }