1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.master;
22
23
24 import org.apache.hadoop.classification.InterfaceAudience;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.hbase.Chore;
27 import org.apache.hadoop.hbase.ClusterStatus;
28 import org.apache.hadoop.hbase.HConstants;
29 import org.apache.hadoop.hbase.ServerName;
30 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
31 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
32 import org.apache.hadoop.hbase.util.Pair;
33 import org.apache.hadoop.hbase.util.Threads;
34 import org.apache.hadoop.hbase.util.VersionInfo;
35 import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
36 import org.jboss.netty.channel.ChannelEvent;
37 import org.jboss.netty.channel.ChannelHandlerContext;
38 import org.jboss.netty.channel.ChannelUpstreamHandler;
39 import org.jboss.netty.channel.Channels;
40 import org.jboss.netty.channel.socket.DatagramChannel;
41 import org.jboss.netty.channel.socket.DatagramChannelFactory;
42 import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
43 import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
44
45 import java.io.Closeable;
46 import java.io.IOException;
47 import java.net.InetAddress;
48 import java.net.InetSocketAddress;
49 import java.net.UnknownHostException;
50 import java.util.ArrayList;
51 import java.util.Collections;
52 import java.util.Comparator;
53 import java.util.List;
54 import java.util.Map;
55 import java.util.concurrent.ConcurrentHashMap;
56 import java.util.concurrent.ConcurrentMap;
57 import java.util.concurrent.ExecutorService;
58 import java.util.concurrent.Executors;
59
60
61
62
63
64
65
66 @InterfaceAudience.Private
67 public class ClusterStatusPublisher extends Chore {
68
69
70
71
72
73 public static final String STATUS_PUBLISHER_CLASS = "hbase.status.publisher.class";
74 public static final Class<? extends ClusterStatusPublisher.Publisher>
75 DEFAULT_STATUS_PUBLISHER_CLASS =
76 org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher.class;
77
78
79
80
81 public static final String STATUS_PUBLISH_PERIOD = "hbase.status.publish.period";
82 public static final int DEFAULT_STATUS_PUBLISH_PERIOD = 10000;
83
84 private long lastMessageTime = 0;
85 private final HMaster master;
86 private final int messagePeriod;
87 private final ConcurrentMap<ServerName, Integer> lastSent =
88 new ConcurrentHashMap<ServerName, Integer>();
89 private Publisher publisher;
90 private boolean connected = false;
91
92
93
94
95
96 public final static int MAX_SERVER_PER_MESSAGE = 10;
97
98
99
100
101
102 public final static int NB_SEND = 5;
103
104 public ClusterStatusPublisher(HMaster master, Configuration conf,
105 Class<? extends Publisher> publisherClass)
106 throws IOException {
107 super("HBase clusterStatusPublisher for " + master.getName(),
108 conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD), master);
109 this.master = master;
110 this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);
111 try {
112 this.publisher = publisherClass.newInstance();
113 } catch (InstantiationException e) {
114 throw new IOException("Can't create publisher " + publisherClass.getName(), e);
115 } catch (IllegalAccessException e) {
116 throw new IOException("Can't create publisher " + publisherClass.getName(), e);
117 }
118 this.publisher.connect(conf);
119 connected = true;
120 }
121
122
123 protected ClusterStatusPublisher() {
124 master = null;
125 messagePeriod = 0;
126 }
127
128 @Override
129 protected void chore() {
130 if (!connected) {
131 return;
132 }
133
134 List<ServerName> sns = generateDeadServersListToSend();
135 if (sns.isEmpty()) {
136
137 return;
138 }
139
140 final long curTime = EnvironmentEdgeManager.currentTimeMillis();
141 if (lastMessageTime > curTime - messagePeriod) {
142
143 return;
144 }
145
146
147 lastMessageTime = curTime;
148
149
150
151
152 ClusterStatus cs = new ClusterStatus(VersionInfo.getVersion(),
153 master.getMasterFileSystem().getClusterId().toString(),
154 null,
155 sns,
156 master.getServerName(),
157 null,
158 null,
159 null,
160 null);
161
162
163 publisher.publish(cs);
164 }
165
166 protected void cleanup() {
167 connected = false;
168 publisher.close();
169 }
170
171
172
173
174
175
176 protected List<ServerName> generateDeadServersListToSend() {
177
178 long since = EnvironmentEdgeManager.currentTimeMillis() - messagePeriod * 2;
179 for (Pair<ServerName, Long> dead : getDeadServers(since)) {
180 lastSent.putIfAbsent(dead.getFirst(), 0);
181 }
182
183
184 List<Map.Entry<ServerName, Integer>> entries = new ArrayList<Map.Entry<ServerName, Integer>>();
185 entries.addAll(lastSent.entrySet());
186 Collections.sort(entries, new Comparator<Map.Entry<ServerName, Integer>>() {
187 @Override
188 public int compare(Map.Entry<ServerName, Integer> o1, Map.Entry<ServerName, Integer> o2) {
189 return o1.getValue().compareTo(o2.getValue());
190 }
191 });
192
193
194 int max = entries.size() > MAX_SERVER_PER_MESSAGE ? MAX_SERVER_PER_MESSAGE : entries.size();
195 List<ServerName> res = new ArrayList<ServerName>(max);
196
197 for (int i = 0; i < max; i++) {
198 Map.Entry<ServerName, Integer> toSend = entries.get(i);
199 if (toSend.getValue() >= (NB_SEND - 1)) {
200 lastSent.remove(toSend.getKey());
201 } else {
202 lastSent.replace(toSend.getKey(), toSend.getValue(), toSend.getValue() + 1);
203 }
204
205 res.add(toSend.getKey());
206 }
207
208 return res;
209 }
210
211
212
213
214
215 protected List<Pair<ServerName, Long>> getDeadServers(long since) {
216 if (master.getServerManager() == null) {
217 return Collections.emptyList();
218 }
219
220 return master.getServerManager().getDeadServers().copyDeadServersSince(since);
221 }
222
223
224 public interface Publisher extends Closeable {
225
226 void connect(Configuration conf) throws IOException;
227
228 void publish(ClusterStatus cs);
229
230 @Override
231 void close();
232 }
233
234 public static class MulticastPublisher implements Publisher {
235 private DatagramChannel channel;
236 private final ExecutorService service = Executors.newSingleThreadExecutor(
237 Threads.newDaemonThreadFactory("hbase-master-clusterStatus-worker"));
238
239 public MulticastPublisher() {
240 }
241
242 @Override
243 public void connect(Configuration conf) throws IOException {
244 String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
245 HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
246 int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
247 HConstants.DEFAULT_STATUS_MULTICAST_PORT);
248
249
250 DatagramChannelFactory f = new OioDatagramChannelFactory(service);
251
252 ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
253 b.setPipeline(Channels.pipeline(new ProtobufEncoder(),
254 new ChannelUpstreamHandler() {
255 @Override
256 public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
257 throws Exception {
258
259 }
260 }));
261
262
263 channel = (DatagramChannel) b.bind(new InetSocketAddress(0));
264 channel.getConfig().setReuseAddress(true);
265
266 InetAddress ina;
267 try {
268 ina = InetAddress.getByName(mcAddress);
269 } catch (UnknownHostException e) {
270 throw new IOException("Can't connect to " + mcAddress, e);
271 }
272 channel.joinGroup(ina);
273 channel.connect(new InetSocketAddress(mcAddress, port));
274 }
275
276 @Override
277 public void publish(ClusterStatus cs) {
278 ClusterStatusProtos.ClusterStatus csp = cs.convert();
279 channel.write(csp);
280 }
281
282 @Override
283 public void close() {
284 if (channel != null) {
285 channel.close();
286 }
287 service.shutdown();
288 }
289 }
290 }