1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.thrift2;
20
21 import java.io.IOException;
22 import java.net.InetAddress;
23 import java.net.InetSocketAddress;
24 import java.net.UnknownHostException;
25 import java.security.PrivilegedAction;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33
34 import javax.security.auth.callback.Callback;
35 import javax.security.auth.callback.UnsupportedCallbackException;
36 import javax.security.sasl.AuthorizeCallback;
37 import javax.security.sasl.Sasl;
38 import javax.security.sasl.SaslServer;
39
40 import org.apache.commons.cli.CommandLine;
41 import org.apache.commons.cli.CommandLineParser;
42 import org.apache.commons.cli.HelpFormatter;
43 import org.apache.commons.cli.Option;
44 import org.apache.commons.cli.OptionGroup;
45 import org.apache.commons.cli.Options;
46 import org.apache.commons.cli.ParseException;
47 import org.apache.commons.cli.PosixParser;
48 import org.apache.commons.logging.Log;
49 import org.apache.commons.logging.LogFactory;
50 import org.apache.hadoop.classification.InterfaceAudience;
51 import org.apache.hadoop.conf.Configuration;
52 import org.apache.hadoop.hbase.HBaseConfiguration;
53 import org.apache.hadoop.hbase.filter.ParseFilter;
54 import org.apache.hadoop.hbase.security.SecurityUtil;
55 import org.apache.hadoop.hbase.security.UserProvider;
56 import org.apache.hadoop.hbase.thrift.CallQueue;
57 import org.apache.hadoop.hbase.thrift.CallQueue.Call;
58 import org.apache.hadoop.hbase.thrift.ThriftMetrics;
59 import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
60 import org.apache.hadoop.hbase.util.InfoServer;
61 import org.apache.hadoop.hbase.util.Strings;
62 import org.apache.hadoop.net.DNS;
63 import org.apache.hadoop.security.UserGroupInformation;
64 import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
65 import org.apache.hadoop.util.GenericOptionsParser;
66 import org.apache.thrift.TException;
67 import org.apache.thrift.TProcessor;
68 import org.apache.thrift.protocol.TBinaryProtocol;
69 import org.apache.thrift.protocol.TCompactProtocol;
70 import org.apache.thrift.protocol.TProtocol;
71 import org.apache.thrift.protocol.TProtocolFactory;
72 import org.apache.thrift.server.THsHaServer;
73 import org.apache.thrift.server.TNonblockingServer;
74 import org.apache.thrift.server.TServer;
75 import org.apache.thrift.server.TThreadPoolServer;
76 import org.apache.thrift.transport.TFramedTransport;
77 import org.apache.thrift.transport.TNonblockingServerSocket;
78 import org.apache.thrift.transport.TNonblockingServerTransport;
79 import org.apache.thrift.transport.TSaslServerTransport;
80 import org.apache.thrift.transport.TServerSocket;
81 import org.apache.thrift.transport.TServerTransport;
82 import org.apache.thrift.transport.TTransportException;
83 import org.apache.thrift.transport.TTransportFactory;
84
85 import com.google.common.util.concurrent.ThreadFactoryBuilder;
86
87
88
89
90
91 @InterfaceAudience.Private
92 @SuppressWarnings({ "rawtypes", "unchecked" })
93 public class ThriftServer {
/** Logger shared by all static helpers of this class. */
private static final Log log = LogFactory.getLog(ThriftServer.class);

/**
 * Configuration key holding the SASL quality of protection for the Thrift server.
 *
 * <p>When the key is set, {@code main} accepts only {@code auth}, {@code auth-int} or
 * {@code auth-conf} and refuses to start unless both Hadoop and HBase security are enabled.
 * When the key is absent, no SASL transport is configured.
 */
static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";

/**
 * Port used when neither the {@code port} command line option nor the
 * {@code hbase.regionserver.thrift.port} configuration value is provided.
 */
public static final int DEFAULT_LISTEN_PORT = 9090;
108
109
/**
 * Creates a ThriftServer instance. The constructor does nothing; every other member visible
 * in this class is static.
 */
public ThriftServer() {
}
112
113 private static void printUsage() {
114 HelpFormatter formatter = new HelpFormatter();
115 formatter.printHelp("Thrift", null, getOptions(),
116 "To start the Thrift server run 'bin/hbase-daemon.sh start thrift2'\n" +
117 "To shutdown the thrift server run 'bin/hbase-daemon.sh stop thrift2' or" +
118 " send a kill signal to the thrift server pid",
119 true);
120 }
121
122 private static Options getOptions() {
123 Options options = new Options();
124 options.addOption("b", "bind", true,
125 "Address to bind the Thrift server to. [default: 0.0.0.0]");
126 options.addOption("p", "port", true, "Port to bind to [default: " + DEFAULT_LISTEN_PORT + "]");
127 options.addOption("f", "framed", false, "Use framed transport");
128 options.addOption("c", "compact", false, "Use the compact protocol");
129 options.addOption("h", "help", false, "Print help information");
130 options.addOption(null, "infoport", true, "Port for web UI");
131
132 OptionGroup servers = new OptionGroup();
133 servers.addOption(
134 new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
135 servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport."));
136 servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default."));
137 options.addOptionGroup(servers);
138 return options;
139 }
140
141 private static CommandLine parseArguments(Configuration conf, Options options, String[] args)
142 throws ParseException, IOException {
143 GenericOptionsParser genParser = new GenericOptionsParser(conf, args);
144 String[] remainingArgs = genParser.getRemainingArgs();
145 CommandLineParser parser = new PosixParser();
146 return parser.parse(options, remainingArgs);
147 }
148
149 private static TProtocolFactory getTProtocolFactory(boolean isCompact) {
150 if (isCompact) {
151 log.debug("Using compact protocol");
152 return new TCompactProtocol.Factory();
153 } else {
154 log.debug("Using binary protocol");
155 return new TBinaryProtocol.Factory();
156 }
157 }
158
159 private static TTransportFactory getTTransportFactory(
160 String qop, String name, String host, boolean framed, int frameSize) {
161 if (framed) {
162 if (qop != null) {
163 throw new RuntimeException("Thrift server authentication"
164 + " doesn't work with framed transport yet");
165 }
166 log.debug("Using framed transport");
167 return new TFramedTransport.Factory(frameSize);
168 } else if (qop == null) {
169 return new TTransportFactory();
170 } else {
171 Map<String, String> saslProperties = new HashMap<String, String>();
172 saslProperties.put(Sasl.QOP, qop);
173 TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
174 saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
175 new SaslGssCallbackHandler() {
176 @Override
177 public void handle(Callback[] callbacks)
178 throws UnsupportedCallbackException {
179 AuthorizeCallback ac = null;
180 for (Callback callback : callbacks) {
181 if (callback instanceof AuthorizeCallback) {
182 ac = (AuthorizeCallback) callback;
183 } else {
184 throw new UnsupportedCallbackException(callback,
185 "Unrecognized SASL GSSAPI Callback");
186 }
187 }
188 if (ac != null) {
189 String authid = ac.getAuthenticationID();
190 String authzid = ac.getAuthorizationID();
191 if (!authid.equals(authzid)) {
192 ac.setAuthorized(false);
193 } else {
194 ac.setAuthorized(true);
195 String userName = SecurityUtil.getUserFromPrincipal(authzid);
196 log.info("Effective user: " + userName);
197 ac.setAuthorizedID(userName);
198 }
199 }
200 }
201 });
202 return saslFactory;
203 }
204 }
205
206
207
208
209 private static InetSocketAddress bindToPort(String bindValue, int listenPort)
210 throws UnknownHostException {
211 try {
212 if (bindValue == null) {
213 return new InetSocketAddress(listenPort);
214 } else {
215 return new InetSocketAddress(InetAddress.getByName(bindValue), listenPort);
216 }
217 } catch (UnknownHostException e) {
218 throw new RuntimeException("Could not bind to provided ip address", e);
219 }
220 }
221
222 private static TServer getTNonBlockingServer(TProtocolFactory protocolFactory, TProcessor processor,
223 TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
224 TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
225 log.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
226 TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
227 serverArgs.processor(processor);
228 serverArgs.transportFactory(transportFactory);
229 serverArgs.protocolFactory(protocolFactory);
230 return new TNonblockingServer(serverArgs);
231 }
232
233 private static TServer getTHsHaServer(TProtocolFactory protocolFactory,
234 TProcessor processor, TTransportFactory transportFactory,
235 InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
236 throws TTransportException {
237 TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
238 log.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
239 THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
240 ExecutorService executorService = createExecutor(
241 serverArgs.getWorkerThreads(), metrics);
242 serverArgs.executorService(executorService);
243 serverArgs.processor(processor);
244 serverArgs.transportFactory(transportFactory);
245 serverArgs.protocolFactory(protocolFactory);
246 return new THsHaServer(serverArgs);
247 }
248
249 private static ExecutorService createExecutor(
250 int workerThreads, ThriftMetrics metrics) {
251 CallQueue callQueue = new CallQueue(
252 new LinkedBlockingQueue<Call>(), metrics);
253 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
254 tfb.setDaemon(true);
255 tfb.setNameFormat("thrift2-worker-%d");
256 return new ThreadPoolExecutor(workerThreads, workerThreads,
257 Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
258 }
259
260 private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory, TProcessor processor,
261 TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
262 TServerTransport serverTransport = new TServerSocket(inetSocketAddress);
263 log.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
264 TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport);
265 serverArgs.processor(processor);
266 serverArgs.transportFactory(transportFactory);
267 serverArgs.protocolFactory(protocolFactory);
268 return new TThreadPoolServer(serverArgs);
269 }
270
271
272
273
274
275
276 protected static void registerFilters(Configuration conf) {
277 String[] filters = conf.getStrings("hbase.thrift.filters");
278 if(filters != null) {
279 for(String filterClass: filters) {
280 String[] filterPart = filterClass.split(":");
281 if(filterPart.length != 2) {
282 log.warn("Invalid filter specification " + filterClass + " - skipping");
283 } else {
284 ParseFilter.registerFilter(filterPart[0], filterPart[1]);
285 }
286 }
287 }
288 }
289
290
291
292
293
294
295 public static void main(String[] args) throws Exception {
296 TServer server = null;
297 Options options = getOptions();
298 Configuration conf = HBaseConfiguration.create();
299 CommandLine cmd = parseArguments(conf, options, args);
300
301
302
303
304
305 List<?> argList = cmd.getArgList();
306 if (cmd.hasOption("help") || !argList.contains("start") || argList.contains("stop")) {
307 printUsage();
308 System.exit(1);
309 }
310
311
312 int listenPort = 0;
313 try {
314 if (cmd.hasOption("port")) {
315 listenPort = Integer.parseInt(cmd.getOptionValue("port"));
316 } else {
317 listenPort = conf.getInt("hbase.regionserver.thrift.port", DEFAULT_LISTEN_PORT);
318 }
319 } catch (NumberFormatException e) {
320 throw new RuntimeException("Could not parse the value provided for the port option", e);
321 }
322
323
324
325 String host = null;
326 String name = null;
327
328 UserProvider userProvider = UserProvider.instantiate(conf);
329
330 boolean securityEnabled = userProvider.isHadoopSecurityEnabled()
331 && userProvider.isHBaseSecurityEnabled();
332 if (securityEnabled) {
333 host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
334 conf.get("hbase.thrift.dns.interface", "default"),
335 conf.get("hbase.thrift.dns.nameserver", "default")));
336 userProvider.login("hbase.thrift.keytab.file",
337 "hbase.thrift.kerberos.principal", host);
338 }
339
340 UserGroupInformation realUser = userProvider.getCurrent().getUGI();
341 String qop = conf.get(THRIFT_QOP_KEY);
342 if (qop != null) {
343 if (!qop.equals("auth") && !qop.equals("auth-int")
344 && !qop.equals("auth-conf")) {
345 throw new IOException("Invalid " + THRIFT_QOP_KEY + ": " + qop
346 + ", it must be 'auth', 'auth-int', or 'auth-conf'");
347 }
348 if (!securityEnabled) {
349 throw new IOException("Thrift server must"
350 + " run in secure mode to support authentication");
351 }
352
353 name = SecurityUtil.getUserFromPrincipal(
354 conf.get("hbase.thrift.kerberos.principal"));
355 }
356
357 boolean nonblocking = cmd.hasOption("nonblocking");
358 boolean hsha = cmd.hasOption("hsha");
359
360 ThriftMetrics metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO);
361
362 String implType = "threadpool";
363 if (nonblocking) {
364 implType = "nonblocking";
365 } else if (hsha) {
366 implType = "hsha";
367 }
368
369 conf.set("hbase.regionserver.thrift.server.type", implType);
370 conf.setInt("hbase.regionserver.thrift.port", listenPort);
371 registerFilters(conf);
372
373
374 boolean compact = cmd.hasOption("compact") ||
375 conf.getBoolean("hbase.regionserver.thrift.compact", false);
376 TProtocolFactory protocolFactory = getTProtocolFactory(compact);
377 final ThriftHBaseServiceHandler hbaseHandler =
378 new ThriftHBaseServiceHandler(conf, userProvider);
379 THBaseService.Iface handler =
380 ThriftHBaseServiceHandler.newInstance(hbaseHandler, metrics);
381 final THBaseService.Processor p = new THBaseService.Processor(handler);
382 conf.setBoolean("hbase.regionserver.thrift.compact", compact);
383 TProcessor processor = p;
384
385 boolean framed = cmd.hasOption("framed") ||
386 conf.getBoolean("hbase.regionserver.thrift.framed", false) || nonblocking || hsha;
387 TTransportFactory transportFactory = getTTransportFactory(qop, name, host, framed,
388 conf.getInt("hbase.regionserver.thrift.framed.max_frame_size_in_mb", 2) * 1024 * 1024);
389 InetSocketAddress inetSocketAddress = bindToPort(cmd.getOptionValue("bind"), listenPort);
390 conf.setBoolean("hbase.regionserver.thrift.framed", framed);
391 if (qop != null) {
392
393 processor = new TProcessor() {
394 @Override
395 public boolean process(TProtocol inProt,
396 TProtocol outProt) throws TException {
397 TSaslServerTransport saslServerTransport =
398 (TSaslServerTransport)inProt.getTransport();
399 SaslServer saslServer = saslServerTransport.getSaslServer();
400 String principal = saslServer.getAuthorizationID();
401 hbaseHandler.setEffectiveUser(principal);
402 return p.process(inProt, outProt);
403 }
404 };
405 }
406
407
408 try {
409 if (cmd.hasOption("infoport")) {
410 String val = cmd.getOptionValue("infoport");
411 conf.setInt("hbase.thrift.info.port", Integer.valueOf(val));
412 log.debug("Web UI port set to " + val);
413 }
414 } catch (NumberFormatException e) {
415 log.error("Could not parse the value provided for the infoport option", e);
416 printUsage();
417 System.exit(1);
418 }
419
420
421 int port = conf.getInt("hbase.thrift.info.port", 9095);
422 if (port >= 0) {
423 conf.setLong("startcode", System.currentTimeMillis());
424 String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
425 InfoServer infoServer = new InfoServer("thrift", a, port, false, conf);
426 infoServer.setAttribute("hbase.conf", conf);
427 infoServer.start();
428 }
429
430 if (nonblocking) {
431 server = getTNonBlockingServer(protocolFactory, processor, transportFactory, inetSocketAddress);
432 } else if (hsha) {
433 server = getTHsHaServer(protocolFactory, processor, transportFactory, inetSocketAddress, metrics);
434 } else {
435 server = getTThreadPoolServer(protocolFactory, processor, transportFactory, inetSocketAddress);
436 }
437
438 final TServer tserver = server;
439 realUser.doAs(
440 new PrivilegedAction<Object>() {
441 @Override
442 public Object run() {
443 tserver.serve();
444 return null;
445 }
446 });
447 }
448 }