1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.thrift;
20
21 import static org.apache.hadoop.hbase.util.Bytes.getBytes;
22
23 import java.io.IOException;
24 import java.net.InetAddress;
25 import java.net.InetSocketAddress;
26 import java.net.UnknownHostException;
27 import java.nio.ByteBuffer;
28 import java.security.PrivilegedAction;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Collections;
32 import java.util.HashMap;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.TreeMap;
36 import java.util.concurrent.BlockingQueue;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.LinkedBlockingQueue;
39 import java.util.concurrent.ThreadPoolExecutor;
40 import java.util.concurrent.TimeUnit;
41
42 import javax.security.auth.callback.Callback;
43 import javax.security.auth.callback.UnsupportedCallbackException;
44 import javax.security.sasl.AuthorizeCallback;
45 import javax.security.sasl.Sasl;
46 import javax.security.sasl.SaslServer;
47
48 import org.apache.commons.cli.CommandLine;
49 import org.apache.commons.cli.Option;
50 import org.apache.commons.cli.OptionGroup;
51 import org.apache.commons.logging.Log;
52 import org.apache.commons.logging.LogFactory;
53 import org.apache.hadoop.conf.Configuration;
54 import org.apache.hadoop.hbase.HBaseConfiguration;
55 import org.apache.hadoop.hbase.HColumnDescriptor;
56 import org.apache.hadoop.hbase.HConstants;
57 import org.apache.hadoop.hbase.HRegionInfo;
58 import org.apache.hadoop.hbase.HRegionLocation;
59 import org.apache.hadoop.hbase.HTableDescriptor;
60 import org.apache.hadoop.hbase.KeyValue;
61 import org.apache.hadoop.hbase.ServerName;
62 import org.apache.hadoop.hbase.TableName;
63 import org.apache.hadoop.hbase.TableNotFoundException;
64 import org.apache.hadoop.hbase.classification.InterfaceAudience;
65 import org.apache.hadoop.hbase.client.Admin;
66 import org.apache.hadoop.hbase.client.Append;
67 import org.apache.hadoop.hbase.client.Delete;
68 import org.apache.hadoop.hbase.client.Durability;
69 import org.apache.hadoop.hbase.client.Get;
70 import org.apache.hadoop.hbase.client.HBaseAdmin;
71 import org.apache.hadoop.hbase.client.Increment;
72 import org.apache.hadoop.hbase.client.OperationWithAttributes;
73 import org.apache.hadoop.hbase.client.Put;
74 import org.apache.hadoop.hbase.client.RegionLocator;
75 import org.apache.hadoop.hbase.client.Result;
76 import org.apache.hadoop.hbase.client.ResultScanner;
77 import org.apache.hadoop.hbase.client.Scan;
78 import org.apache.hadoop.hbase.client.Table;
79 import org.apache.hadoop.hbase.filter.Filter;
80 import org.apache.hadoop.hbase.filter.ParseFilter;
81 import org.apache.hadoop.hbase.filter.PrefixFilter;
82 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
83 import org.apache.hadoop.hbase.security.SecurityUtil;
84 import org.apache.hadoop.hbase.security.UserProvider;
85 import org.apache.hadoop.hbase.thrift.CallQueue.Call;
86 import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
87 import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
88 import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
89 import org.apache.hadoop.hbase.thrift.generated.Hbase;
90 import org.apache.hadoop.hbase.thrift.generated.IOError;
91 import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
92 import org.apache.hadoop.hbase.thrift.generated.Mutation;
93 import org.apache.hadoop.hbase.thrift.generated.TAppend;
94 import org.apache.hadoop.hbase.thrift.generated.TCell;
95 import org.apache.hadoop.hbase.thrift.generated.TIncrement;
96 import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
97 import org.apache.hadoop.hbase.thrift.generated.TRowResult;
98 import org.apache.hadoop.hbase.thrift.generated.TScan;
99 import org.apache.hadoop.hbase.util.Bytes;
100 import org.apache.hadoop.hbase.util.ConnectionCache;
101 import org.apache.hadoop.hbase.util.DNS;
102 import org.apache.hadoop.hbase.util.Strings;
103 import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
104 import org.apache.hadoop.security.UserGroupInformation;
105 import org.apache.hadoop.security.authorize.ProxyUsers;
106 import org.apache.thrift.TException;
107 import org.apache.thrift.TProcessor;
108 import org.apache.thrift.protocol.TBinaryProtocol;
109 import org.apache.thrift.protocol.TCompactProtocol;
110 import org.apache.thrift.protocol.TProtocol;
111 import org.apache.thrift.protocol.TProtocolFactory;
112 import org.apache.thrift.server.THsHaServer;
113 import org.apache.thrift.server.TNonblockingServer;
114 import org.apache.thrift.server.TServer;
115 import org.apache.thrift.server.TServlet;
116 import org.apache.thrift.server.TThreadedSelectorServer;
117 import org.apache.thrift.transport.TFramedTransport;
118 import org.apache.thrift.transport.TNonblockingServerSocket;
119 import org.apache.thrift.transport.TNonblockingServerTransport;
120 import org.apache.thrift.transport.TSaslServerTransport;
121 import org.apache.thrift.transport.TServerSocket;
122 import org.apache.thrift.transport.TServerTransport;
123 import org.apache.thrift.transport.TTransportFactory;
124 import org.mortbay.jetty.Connector;
125 import org.mortbay.jetty.Server;
126 import org.mortbay.jetty.nio.SelectChannelConnector;
127 import org.mortbay.jetty.security.SslSelectChannelConnector;
128 import org.mortbay.jetty.servlet.Context;
129 import org.mortbay.jetty.servlet.ServletHolder;
130 import org.mortbay.thread.QueuedThreadPool;
131
132 import com.google.common.base.Joiner;
133 import com.google.common.base.Throwables;
134 import com.google.common.util.concurrent.ThreadFactoryBuilder;
135
136
137
138
139
140 @InterfaceAudience.Private
141 public class ThriftServerRunner implements Runnable {
142
private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class);

// Name of the Thrift server implementation to start; read by
// ImplType.getServerImpl and written by ImplType.setServerImpl.
static final String SERVER_TYPE_CONF_KEY =
    "hbase.regionserver.thrift.server.type";

// Address to bind to; getBindAddress falls back to DEFAULT_BIND_ADDR when unset.
static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
// When true, setupServer uses TCompactProtocol instead of TBinaryProtocol.
static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
// When true, setupServer uses the framed transport.
static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
// Maximum frame size in MB for the framed transport (setupServer defaults it to 2).
static final String MAX_FRAME_SIZE_CONF_KEY = "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
// Listen port; the constructor falls back to DEFAULT_LISTEN_PORT.
static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
// NOTE(review): not read in the visible part of this file; presumably consumed
// by the increment coalescing code -- confirm.
static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
// When true, run() starts the Jetty-based HTTP server instead of a TServer.
static final String USE_HTTP_CONF_KEY = "hbase.regionserver.thrift.http";
// Thread pool bounds for the HTTP server (setupHTTPServer defaults: min 2, max 100).
static final String HTTP_MIN_THREADS = "hbase.thrift.http_threads.min";
static final String HTTP_MAX_THREADS = "hbase.thrift.http_threads.max";

// SSL settings, consulted only by setupHTTPServer.
static final String THRIFT_SSL_ENABLED = "hbase.thrift.ssl.enabled";
static final String THRIFT_SSL_KEYSTORE_STORE = "hbase.thrift.ssl.keystore.store";
static final String THRIFT_SSL_KEYSTORE_PASSWORD = "hbase.thrift.ssl.keystore.password";
static final String THRIFT_SSL_KEYSTORE_KEYPASSWORD = "hbase.thrift.ssl.keystore.keypassword";

// Client read timeout handed to the TServerSocket of the THREAD_POOL server
// type in setupServer; the start-up log line reports the value in milliseconds.
public static final String THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY =
    "hbase.thrift.server.socket.read.timeout";
public static final int THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT = 60000;

// SASL quality of protection. The constructor accepts only "auth", "auth-int"
// or "auth-conf", and requires secure mode whenever a value is set.
static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
// Accept backlog; setupServer defaults it to 0 and applies it only to the
// THREAD_POOL server socket.
static final String BACKLOG_CONF_KEY = "hbase.regionserver.thrift.backlog";

private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
public static final int DEFAULT_LISTEN_PORT = 9090;
// NOTE(review): not referenced in the visible part of this file -- confirm usage.
public static final int HREGION_VERSION = 1;
// When true, the HTTP server refreshes the proxy-user configuration and the
// servlet is told that doAs is enabled.
static final String THRIFT_SUPPORT_PROXYUSER = "hbase.thrift.support.proxyuser";
private final int listenPort;

// Private copy of the configuration passed to the constructor.
private Configuration conf;
// Exactly one of these is populated by run(), depending on USE_HTTP_CONF_KEY.
volatile TServer tserver;
volatile Server httpServer;
// Metrics proxy wrapped around hbaseHandler; this is what the Thrift processor calls.
private final Hbase.Iface handler;
private final ThriftMetrics metrics;
private final HBaseHandler hbaseHandler;
// User under which run() executes the server.
private final UserGroupInformation realUser;

private final String qop;
// Resolved only when security is enabled; otherwise left null.
private String host;

private final boolean securityEnabled;
private final boolean doAsEnabled;
204
205
206 enum ImplType {
207 HS_HA("hsha", true, THsHaServer.class, true),
208 NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
209 THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
210 THREADED_SELECTOR(
211 "threadedselector", true, TThreadedSelectorServer.class, true);
212
213 public static final ImplType DEFAULT = THREAD_POOL;
214
215 final String option;
216 final boolean isAlwaysFramed;
217 final Class<? extends TServer> serverClass;
218 final boolean canSpecifyBindIP;
219
220 ImplType(String option, boolean isAlwaysFramed,
221 Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
222 this.option = option;
223 this.isAlwaysFramed = isAlwaysFramed;
224 this.serverClass = serverClass;
225 this.canSpecifyBindIP = canSpecifyBindIP;
226 }
227
228
229
230
231
232 @Override
233 public String toString() {
234 return "-" + option;
235 }
236
237 String getDescription() {
238 StringBuilder sb = new StringBuilder("Use the " +
239 serverClass.getSimpleName());
240 if (isAlwaysFramed) {
241 sb.append(" This implies the framed transport.");
242 }
243 if (this == DEFAULT) {
244 sb.append("This is the default.");
245 }
246 return sb.toString();
247 }
248
249 static OptionGroup createOptionGroup() {
250 OptionGroup group = new OptionGroup();
251 for (ImplType t : values()) {
252 group.addOption(new Option(t.option, t.getDescription()));
253 }
254 return group;
255 }
256
257 static ImplType getServerImpl(Configuration conf) {
258 String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
259 for (ImplType t : values()) {
260 if (confType.equals(t.option)) {
261 return t;
262 }
263 }
264 throw new AssertionError("Unknown server ImplType.option:" + confType);
265 }
266
267 static void setServerImpl(CommandLine cmd, Configuration conf) {
268 ImplType chosenType = null;
269 int numChosen = 0;
270 for (ImplType t : values()) {
271 if (cmd.hasOption(t.option)) {
272 chosenType = t;
273 ++numChosen;
274 }
275 }
276 if (numChosen < 1) {
277 LOG.info("Using default thrift server type");
278 chosenType = DEFAULT;
279 } else if (numChosen > 1) {
280 throw new AssertionError("Exactly one option out of " +
281 Arrays.toString(values()) + " has to be specified");
282 }
283 LOG.info("Using thrift server type " + chosenType.option);
284 conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
285 }
286
287 public String simpleClassName() {
288 return serverClass.getSimpleName();
289 }
290
291 public static List<String> serversThatCannotSpecifyBindIP() {
292 List<String> l = new ArrayList<String>();
293 for (ImplType t : values()) {
294 if (!t.canSpecifyBindIP) {
295 l.add(t.simpleClassName());
296 }
297 }
298 return l;
299 }
300
301 }
302
303 public ThriftServerRunner(Configuration conf) throws IOException {
304 UserProvider userProvider = UserProvider.instantiate(conf);
305
306 securityEnabled = userProvider.isHadoopSecurityEnabled()
307 && userProvider.isHBaseSecurityEnabled();
308 if (securityEnabled) {
309 host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
310 conf.get("hbase.thrift.dns.interface", "default"),
311 conf.get("hbase.thrift.dns.nameserver", "default")));
312 userProvider.login("hbase.thrift.keytab.file",
313 "hbase.thrift.kerberos.principal", host);
314 }
315 this.conf = HBaseConfiguration.create(conf);
316 this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
317 this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
318 this.hbaseHandler = new HBaseHandler(conf, userProvider);
319 this.hbaseHandler.initMetrics(metrics);
320 this.handler = HbaseHandlerMetricsProxy.newInstance(
321 hbaseHandler, metrics, conf);
322 this.realUser = userProvider.getCurrent().getUGI();
323 qop = conf.get(THRIFT_QOP_KEY);
324 doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER, false);
325 if (qop != null) {
326 if (!qop.equals("auth") && !qop.equals("auth-int")
327 && !qop.equals("auth-conf")) {
328 throw new IOException("Invalid " + THRIFT_QOP_KEY + ": " + qop
329 + ", it must be 'auth', 'auth-int', or 'auth-conf'");
330 }
331 if (!securityEnabled) {
332 throw new IOException("Thrift server must"
333 + " run in secure mode to support authentication");
334 }
335 }
336 }
337
338
339
340
341 @Override
342 public void run() {
343 realUser.doAs(new PrivilegedAction<Object>() {
344 @Override
345 public Object run() {
346 try {
347 if (conf.getBoolean(USE_HTTP_CONF_KEY, false)) {
348 setupHTTPServer();
349 httpServer.start();
350 httpServer.join();
351 } else {
352 setupServer();
353 tserver.serve();
354 }
355 } catch (Exception e) {
356 LOG.fatal("Cannot run ThriftServer", e);
357
358 System.exit(-1);
359 }
360 return null;
361 }
362 });
363
364 }
365
366 public void shutdown() {
367 if (tserver != null) {
368 tserver.stop();
369 tserver = null;
370 }
371 if (httpServer != null) {
372 try {
373 httpServer.stop();
374 httpServer = null;
375 } catch (Exception e) {
376 LOG.error("Problem encountered in shutting down HTTP server " + e.getCause());
377 }
378 httpServer = null;
379 }
380 }
381
382 private void setupHTTPServer() throws IOException {
383 TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
384 TProcessor processor = new Hbase.Processor<Hbase.Iface>(handler);
385 TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, realUser,
386 conf, hbaseHandler, securityEnabled, doAsEnabled);
387
388 httpServer = new Server();
389
390 Context context = new Context(httpServer, "/", Context.SESSIONS);
391 context.setContextPath("/");
392 String httpPath = "/*";
393 httpServer.setHandler(context);
394 context.addServlet(new ServletHolder(thriftHttpServlet), httpPath);
395
396
397 Connector connector = new SelectChannelConnector();
398 if(conf.getBoolean(THRIFT_SSL_ENABLED, false)) {
399 SslSelectChannelConnector sslConnector = new SslSelectChannelConnector();
400 String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE);
401 String password = HBaseConfiguration.getPassword(conf,
402 THRIFT_SSL_KEYSTORE_PASSWORD, null);
403 String keyPassword = HBaseConfiguration.getPassword(conf,
404 THRIFT_SSL_KEYSTORE_KEYPASSWORD, password);
405 sslConnector.setKeystore(keystore);
406 sslConnector.setPassword(password);
407 sslConnector.setKeyPassword(keyPassword);
408 connector = sslConnector;
409 }
410 String host = getBindAddress(conf).getHostAddress();
411 connector.setPort(listenPort);
412 connector.setHost(host);
413 connector.setHeaderBufferSize(1024 * 64);
414 httpServer.addConnector(connector);
415
416 if (doAsEnabled) {
417 ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
418 }
419
420
421
422
423
424
425 int minThreads = conf.getInt(HTTP_MIN_THREADS, 2);
426 int maxThreads = conf.getInt(HTTP_MAX_THREADS, 100);
427 QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads);
428 threadPool.setMinThreads(minThreads);
429 httpServer.setThreadPool(threadPool);
430
431 httpServer.setSendServerVersion(false);
432 httpServer.setSendDateHeader(false);
433 httpServer.setStopAtShutdown(true);
434
435 LOG.info("Starting Thrift HTTP Server on " + Integer.toString(listenPort));
436 }
437
438
439
440
/**
 * Builds (but does not start) the TServer selected by the configuration and
 * stores it in {@code tserver}. Chooses the protocol (compact or binary),
 * the transport (framed, plain, or SASL/GSSAPI when a QOP is configured),
 * and then one of the four server implementations in {@link ImplType}.
 *
 * @throws Exception if a server component cannot be created; also throws
 *         RuntimeException when a QOP is combined with the framed transport
 *         or when a bind address is given for a type that does not support it
 */
private void setupServer() throws Exception {
  // Construct correct ProtocolFactory
  TProtocolFactory protocolFactory;
  if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
    LOG.debug("Using compact protocol");
    protocolFactory = new TCompactProtocol.Factory();
  } else {
    LOG.debug("Using binary protocol");
    protocolFactory = new TBinaryProtocol.Factory();
  }

  // 'p' is the plain processor; 'processor' may be replaced below by a
  // wrapper that sets the effective user before delegating to 'p'.
  final TProcessor p = new Hbase.Processor<Hbase.Iface>(handler);
  ImplType implType = ImplType.getServerImpl(conf);
  TProcessor processor = p;

  // Construct correct TransportFactory
  TTransportFactory transportFactory;
  if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
    if (qop != null) {
      throw new RuntimeException("Thrift server authentication"
        + " doesn't work with framed transport yet");
    }
    // The configured value is in MB (default 2) and is converted to bytes here.
    transportFactory = new TFramedTransport.Factory(
        conf.getInt(MAX_FRAME_SIZE_CONF_KEY, 2)  * 1024 * 1024);
    LOG.debug("Using framed transport");
  } else if (qop == null) {
    transportFactory = new TTransportFactory();
  } else {
    // A QOP is configured: use a SASL transport with the GSSAPI mechanism.
    String name = SecurityUtil.getUserFromPrincipal(
      conf.get("hbase.thrift.kerberos.principal"));
    Map<String, String> saslProperties = new HashMap<String, String>();
    saslProperties.put(Sasl.QOP, qop);
    TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
    saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
      new SaslGssCallbackHandler() {
        @Override
        public void handle(Callback[] callbacks)
            throws UnsupportedCallbackException {
          AuthorizeCallback ac = null;
          for (Callback callback : callbacks) {
            if (callback instanceof AuthorizeCallback) {
              ac = (AuthorizeCallback) callback;
            } else {
              throw new UnsupportedCallbackException(callback,
                  "Unrecognized SASL GSSAPI Callback");
            }
          }
          if (ac != null) {
            String authid = ac.getAuthenticationID();
            String authzid = ac.getAuthorizationID();
            // Authorization is granted only when the client asks to act as
            // itself (authentication id equals authorization id).
            if (!authid.equals(authzid)) {
              ac.setAuthorized(false);
            } else {
              ac.setAuthorized(true);
              String userName = SecurityUtil.getUserFromPrincipal(authzid);
              LOG.info("Effective user: " + userName);
              ac.setAuthorizedID(userName);
            }
          }
        }
      });
    transportFactory = saslFactory;

    // Wrap the processor so that, before each call is processed, the
    // handler's effective user is set to the SASL authorization id.
    processor = new TProcessor() {
      @Override
      public boolean process(TProtocol inProt,
          TProtocol outProt) throws TException {
        TSaslServerTransport saslServerTransport =
          (TSaslServerTransport)inProt.getTransport();
        SaslServer saslServer = saslServerTransport.getSaslServer();
        String principal = saslServer.getAuthorizationID();
        hbaseHandler.setEffectiveUser(principal);
        return p.process(inProt, outProt);
      }
    };
  }

  // Every ImplType constant declared in this file passes
  // canSpecifyBindIP = true, so this guard does not fire for the current set.
  if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
    LOG.error("Server types " + Joiner.on(", ").join(
        ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
        "address binding at the moment. See " +
        "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
    throw new RuntimeException(
        "-" + BIND_CONF_KEY + " not supported with " + implType);
  }

  // Only the THREAD_POOL branch below passes the backlog to its server socket.
  int backlog = conf.getInt(BACKLOG_CONF_KEY, 0);

  if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
      implType == ImplType.THREADED_SELECTOR) {
    // The three non-blocking server types share one non-blocking server socket.
    InetAddress listenAddress = getBindAddress(conf);
    TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(
        new InetSocketAddress(listenAddress, listenPort));

    if (implType == ImplType.NONBLOCKING) {
      TNonblockingServer.Args serverArgs =
          new TNonblockingServer.Args(serverTransport);
      serverArgs.processor(processor)
                .transportFactory(transportFactory)
                .protocolFactory(protocolFactory);
      tserver = new TNonblockingServer(serverArgs);
    } else if (implType == ImplType.HS_HA) {
      THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
      // Worker pool sized from the Args' min/max worker threads, fed by a
      // CallQueue constructed with the metrics object.
      CallQueue callQueue =
          new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
      ExecutorService executorService = createExecutor(
          callQueue, serverArgs.getMinWorkerThreads(), serverArgs.getMaxWorkerThreads());
      serverArgs.executorService(executorService)
                .processor(processor)
                .transportFactory(transportFactory)
                .protocolFactory(protocolFactory);
      tserver = new THsHaServer(serverArgs);
    } else { // THREADED_SELECTOR
      TThreadedSelectorServer.Args serverArgs =
          new HThreadedSelectorServerArgs(serverTransport, conf);
      // Same worker-thread count is used for both the minimum and maximum.
      CallQueue callQueue =
          new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
      ExecutorService executorService = createExecutor(
          callQueue, serverArgs.getWorkerThreads(), serverArgs.getWorkerThreads());
      serverArgs.executorService(executorService)
                .processor(processor)
                .transportFactory(transportFactory)
                .protocolFactory(protocolFactory);
      tserver = new TThreadedSelectorServer(serverArgs);
    }
    LOG.info("starting HBase " + implType.simpleClassName() +
        " server on " + Integer.toString(listenPort));
  } else if (implType == ImplType.THREAD_POOL) {
    // Thread pool server: blocking server socket with backlog and read timeout.
    InetAddress listenAddress = getBindAddress(conf);
    int readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY,
        THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
    TServerTransport serverTransport = new TServerSocket(
        new TServerSocket.ServerSocketTransportArgs().
            bindAddr(new InetSocketAddress(listenAddress, listenPort)).
            backlog(backlog).
            clientTimeout(readTimeout));

    TBoundedThreadPoolServer.Args serverArgs =
        new TBoundedThreadPoolServer.Args(serverTransport, conf);
    serverArgs.processor(processor)
              .transportFactory(transportFactory)
              .protocolFactory(protocolFactory);
    LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
        + listenAddress + ":" + Integer.toString(listenPort)
        + " with readTimeout " + readTimeout + "ms; " + serverArgs);
    // This local shadows the field of the same name; the field is assigned
    // explicitly on the next statement.
    TBoundedThreadPoolServer tserver =
        new TBoundedThreadPoolServer(serverArgs, metrics);
    this.tserver = tserver;
  } else {
    throw new AssertionError("Unsupported Thrift server implementation: " +
        implType.simpleClassName());
  }

  // A sanity check that we instantiated the right type of server.
  if (tserver.getClass() != implType.serverClass) {
    throw new AssertionError("Expected to create Thrift server class " +
        implType.serverClass.getName() + " but got " +
        tserver.getClass().getName());
  }

  // NOTE(review): registerFilters is defined outside the visible part of this
  // file; presumably it registers user-configured filters -- confirm.
  registerFilters(conf);
}
609
610 ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
611 int minWorkers, int maxWorkers) {
612 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
613 tfb.setDaemon(true);
614 tfb.setNameFormat("thrift-worker-%d");
615 return new ThreadPoolExecutor(minWorkers, maxWorkers,
616 Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
617 }
618
619 private InetAddress getBindAddress(Configuration conf)
620 throws UnknownHostException {
621 String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
622 return InetAddress.getByName(bindAddressStr);
623 }
624
625 protected static class ResultScannerWrapper {
626
627 private final ResultScanner scanner;
628 private final boolean sortColumns;
629 public ResultScannerWrapper(ResultScanner resultScanner,
630 boolean sortResultColumns) {
631 scanner = resultScanner;
632 sortColumns = sortResultColumns;
633 }
634
635 public ResultScanner getScanner() {
636 return scanner;
637 }
638
639 public boolean isColumnSorted() {
640 return sortColumns;
641 }
642 }
643
644
645
646
647
648 public static class HBaseHandler implements Hbase.Iface {
protected Configuration conf;
protected static final Log LOG = LogFactory.getLog(HBaseHandler.class);

// Id that addScanner hands out next; incremented on every registration.
protected int nextScannerId = 0;
// Open scanners keyed by the id returned from addScanner; created in the
// constructor and accessed only through the synchronized scanner methods
// visible here.
protected HashMap<Integer, ResultScannerWrapper> scannerMap = null;
// NOTE(review): assigned outside the visible part of this file, presumably
// by initMetrics -- confirm.
private ThriftMetrics metrics = null;

// Source of Table, Admin and RegionLocator instances for this handler.
private final ConnectionCache connectionCache;
IncrementCoalescer coalescer = null;

// Configuration keys for the two int values passed to the ConnectionCache
// constructor; defaults are 10 * 1000 and 10 * 60 * 1000 respectively
// (presumably milliseconds -- confirm against ConnectionCache).
static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
662
663
664
665
666
667
668
669 byte[][] getAllColumns(Table table) throws IOException {
670 HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
671 byte[][] columns = new byte[cds.length][];
672 for (int i = 0; i < cds.length; i++) {
673 columns[i] = Bytes.add(cds[i].getName(),
674 KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
675 }
676 return columns;
677 }
678
679
680
681
682
683
684
685
686
687 public Table getTable(final byte[] tableName) throws
688 IOException {
689 String table = Bytes.toString(tableName);
690 return connectionCache.getTable(table);
691 }
692
693 public Table getTable(final ByteBuffer tableName) throws IOException {
694 return getTable(getBytes(tableName));
695 }
696
697
698
699
700
701
702
703
704 protected synchronized int addScanner(ResultScanner scanner,boolean sortColumns) {
705 int id = nextScannerId++;
706 ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns);
707 scannerMap.put(id, resultScannerWrapper);
708 return id;
709 }
710
711
712
713
714
715
716
717 protected synchronized ResultScannerWrapper getScanner(int id) {
718 return scannerMap.get(id);
719 }
720
721
722
723
724
725
726
727
728 protected synchronized ResultScannerWrapper removeScanner(int id) {
729 return scannerMap.remove(id);
730 }
731
732 protected HBaseHandler(final Configuration c,
733 final UserProvider userProvider) throws IOException {
734 this.conf = c;
735 scannerMap = new HashMap<Integer, ResultScannerWrapper>();
736 this.coalescer = new IncrementCoalescer(this);
737
738 int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
739 int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
740 connectionCache = new ConnectionCache(
741 conf, userProvider, cleanInterval, maxIdleTime);
742 }
743
744
745
746
747 private Admin getAdmin() throws IOException {
748 return connectionCache.getAdmin();
749 }
750
751 void setEffectiveUser(String effectiveUser) {
752 connectionCache.setEffectiveUser(effectiveUser);
753 }
754
755 @Override
756 public void enableTable(ByteBuffer tableName) throws IOError {
757 try{
758 getAdmin().enableTable(getTableName(tableName));
759 } catch (IOException e) {
760 LOG.warn(e.getMessage(), e);
761 throw new IOError(Throwables.getStackTraceAsString(e));
762 }
763 }
764
765 @Override
766 public void disableTable(ByteBuffer tableName) throws IOError{
767 try{
768 getAdmin().disableTable(getTableName(tableName));
769 } catch (IOException e) {
770 LOG.warn(e.getMessage(), e);
771 throw new IOError(Throwables.getStackTraceAsString(e));
772 }
773 }
774
775 @Override
776 public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
777 try {
778 return this.connectionCache.getAdmin().isTableEnabled(getTableName(tableName));
779 } catch (IOException e) {
780 LOG.warn(e.getMessage(), e);
781 throw new IOError(Throwables.getStackTraceAsString(e));
782 }
783 }
784
785 @Override
786 public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
787 try {
788
789
790
791 ((HBaseAdmin) getAdmin()).compact(getBytes(tableNameOrRegionName));
792 } catch (IOException e) {
793 LOG.warn(e.getMessage(), e);
794 throw new IOError(Throwables.getStackTraceAsString(e));
795 }
796 }
797
798 @Override
799 public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
800 try {
801
802
803
804 ((HBaseAdmin) getAdmin()).majorCompact(getBytes(tableNameOrRegionName));
805 } catch (IOException e) {
806 LOG.warn(e.getMessage(), e);
807 throw new IOError(Throwables.getStackTraceAsString(e));
808 }
809 }
810
811 @Override
812 public List<ByteBuffer> getTableNames() throws IOError {
813 try {
814 TableName[] tableNames = this.getAdmin().listTableNames();
815 ArrayList<ByteBuffer> list = new ArrayList<ByteBuffer>(tableNames.length);
816 for (int i = 0; i < tableNames.length; i++) {
817 list.add(ByteBuffer.wrap(tableNames[i].getName()));
818 }
819 return list;
820 } catch (IOException e) {
821 LOG.warn(e.getMessage(), e);
822 throw new IOError(Throwables.getStackTraceAsString(e));
823 }
824 }
825
826
827
828
829 @Override
830 public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
831 throws IOError {
832 try (RegionLocator locator = connectionCache.getRegionLocator(getBytes(tableName))) {
833 List<HRegionLocation> regionLocations = locator.getAllRegionLocations();
834 List<TRegionInfo> results = new ArrayList<TRegionInfo>();
835 for (HRegionLocation regionLocation : regionLocations) {
836 HRegionInfo info = regionLocation.getRegionInfo();
837 ServerName serverName = regionLocation.getServerName();
838 TRegionInfo region = new TRegionInfo();
839 region.serverName = ByteBuffer.wrap(
840 Bytes.toBytes(serverName.getHostname()));
841 region.port = serverName.getPort();
842 region.startKey = ByteBuffer.wrap(info.getStartKey());
843 region.endKey = ByteBuffer.wrap(info.getEndKey());
844 region.id = info.getRegionId();
845 region.name = ByteBuffer.wrap(info.getRegionName());
846 region.version = info.getVersion();
847 results.add(region);
848 }
849 return results;
850 } catch (TableNotFoundException e) {
851
852 return Collections.emptyList();
853 } catch (IOException e){
854 LOG.warn(e.getMessage(), e);
855 throw new IOError(Throwables.getStackTraceAsString(e));
856 }
857 }
858
859 @Override
860 public List<TCell> get(
861 ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
862 Map<ByteBuffer, ByteBuffer> attributes)
863 throws IOError {
864 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
865 if (famAndQf.length == 1) {
866 return get(tableName, row, famAndQf[0], null, attributes);
867 }
868 if (famAndQf.length == 2) {
869 return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
870 }
871 throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
872 }
873
874
875
876
877
878
879
880
881 protected List<TCell> get(ByteBuffer tableName,
882 ByteBuffer row,
883 byte[] family,
884 byte[] qualifier,
885 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
886 Table table = null;
887 try {
888 table = getTable(tableName);
889 Get get = new Get(getBytes(row));
890 addAttributes(get, attributes);
891 if (qualifier == null) {
892 get.addFamily(family);
893 } else {
894 get.addColumn(family, qualifier);
895 }
896 Result result = table.get(get);
897 return ThriftUtilities.cellFromHBase(result.rawCells());
898 } catch (IOException e) {
899 LOG.warn(e.getMessage(), e);
900 throw new IOError(Throwables.getStackTraceAsString(e));
901 } finally {
902 closeTable(table);
903 }
904 }
905
906 @Override
907 public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
908 int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
909 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
910 if(famAndQf.length == 1) {
911 return getVer(tableName, row, famAndQf[0], null, numVersions, attributes);
912 }
913 if (famAndQf.length == 2) {
914 return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes);
915 }
916 throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
917
918 }
919
920
921
922
923
924
925
926
927
928 public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
929 byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
930
931 Table table = null;
932 try {
933 table = getTable(tableName);
934 Get get = new Get(getBytes(row));
935 addAttributes(get, attributes);
936 if (null == qualifier) {
937 get.addFamily(family);
938 } else {
939 get.addColumn(family, qualifier);
940 }
941 get.setMaxVersions(numVersions);
942 Result result = table.get(get);
943 return ThriftUtilities.cellFromHBase(result.rawCells());
944 } catch (IOException e) {
945 LOG.warn(e.getMessage(), e);
946 throw new IOError(Throwables.getStackTraceAsString(e));
947 } finally{
948 closeTable(table);
949 }
950 }
951
952 @Override
953 public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
954 long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
955 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
956 if (famAndQf.length == 1) {
957 return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes);
958 }
959 if (famAndQf.length == 2) {
960 return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions,
961 attributes);
962 }
963 throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
964 }
965
966
967
968
969
970
971
972
973 protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
974 byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
975 throws IOError {
976
977 Table table = null;
978 try {
979 table = getTable(tableName);
980 Get get = new Get(getBytes(row));
981 addAttributes(get, attributes);
982 if (null == qualifier) {
983 get.addFamily(family);
984 } else {
985 get.addColumn(family, qualifier);
986 }
987 get.setTimeRange(0, timestamp);
988 get.setMaxVersions(numVersions);
989 Result result = table.get(get);
990 return ThriftUtilities.cellFromHBase(result.rawCells());
991 } catch (IOException e) {
992 LOG.warn(e.getMessage(), e);
993 throw new IOError(Throwables.getStackTraceAsString(e));
994 } finally{
995 closeTable(table);
996 }
997 }
998
999 @Override
1000 public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
1001 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1002 return getRowWithColumnsTs(tableName, row, null,
1003 HConstants.LATEST_TIMESTAMP,
1004 attributes);
1005 }
1006
1007 @Override
1008 public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
1009 ByteBuffer row,
1010 List<ByteBuffer> columns,
1011 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1012 return getRowWithColumnsTs(tableName, row, columns,
1013 HConstants.LATEST_TIMESTAMP,
1014 attributes);
1015 }
1016
1017 @Override
1018 public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
1019 long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1020 return getRowWithColumnsTs(tableName, row, null,
1021 timestamp, attributes);
1022 }
1023
1024 @Override
1025 public List<TRowResult> getRowWithColumnsTs(
1026 ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
1027 long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1028
1029 Table table = null;
1030 try {
1031 table = getTable(tableName);
1032 if (columns == null) {
1033 Get get = new Get(getBytes(row));
1034 addAttributes(get, attributes);
1035 get.setTimeRange(0, timestamp);
1036 Result result = table.get(get);
1037 return ThriftUtilities.rowResultFromHBase(result);
1038 }
1039 Get get = new Get(getBytes(row));
1040 addAttributes(get, attributes);
1041 for(ByteBuffer column : columns) {
1042 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1043 if (famAndQf.length == 1) {
1044 get.addFamily(famAndQf[0]);
1045 } else {
1046 get.addColumn(famAndQf[0], famAndQf[1]);
1047 }
1048 }
1049 get.setTimeRange(0, timestamp);
1050 Result result = table.get(get);
1051 return ThriftUtilities.rowResultFromHBase(result);
1052 } catch (IOException e) {
1053 LOG.warn(e.getMessage(), e);
1054 throw new IOError(Throwables.getStackTraceAsString(e));
1055 } finally{
1056 closeTable(table);
1057 }
1058 }
1059
1060 @Override
1061 public List<TRowResult> getRows(ByteBuffer tableName,
1062 List<ByteBuffer> rows,
1063 Map<ByteBuffer, ByteBuffer> attributes)
1064 throws IOError {
1065 return getRowsWithColumnsTs(tableName, rows, null,
1066 HConstants.LATEST_TIMESTAMP,
1067 attributes);
1068 }
1069
1070 @Override
1071 public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
1072 List<ByteBuffer> rows,
1073 List<ByteBuffer> columns,
1074 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1075 return getRowsWithColumnsTs(tableName, rows, columns,
1076 HConstants.LATEST_TIMESTAMP,
1077 attributes);
1078 }
1079
1080 @Override
1081 public List<TRowResult> getRowsTs(ByteBuffer tableName,
1082 List<ByteBuffer> rows,
1083 long timestamp,
1084 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1085 return getRowsWithColumnsTs(tableName, rows, null,
1086 timestamp, attributes);
1087 }
1088
1089 @Override
1090 public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
1091 List<ByteBuffer> rows,
1092 List<ByteBuffer> columns, long timestamp,
1093 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1094
1095 Table table= null;
1096 try {
1097 List<Get> gets = new ArrayList<Get>(rows.size());
1098 table = getTable(tableName);
1099 if (metrics != null) {
1100 metrics.incNumRowKeysInBatchGet(rows.size());
1101 }
1102 for (ByteBuffer row : rows) {
1103 Get get = new Get(getBytes(row));
1104 addAttributes(get, attributes);
1105 if (columns != null) {
1106
1107 for(ByteBuffer column : columns) {
1108 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1109 if (famAndQf.length == 1) {
1110 get.addFamily(famAndQf[0]);
1111 } else {
1112 get.addColumn(famAndQf[0], famAndQf[1]);
1113 }
1114 }
1115 }
1116 get.setTimeRange(0, timestamp);
1117 gets.add(get);
1118 }
1119 Result[] result = table.get(gets);
1120 return ThriftUtilities.rowResultFromHBase(result);
1121 } catch (IOException e) {
1122 LOG.warn(e.getMessage(), e);
1123 throw new IOError(Throwables.getStackTraceAsString(e));
1124 } finally{
1125 closeTable(table);
1126 }
1127 }
1128
1129 @Override
1130 public void deleteAll(
1131 ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1132 Map<ByteBuffer, ByteBuffer> attributes)
1133 throws IOError {
1134 deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP,
1135 attributes);
1136 }
1137
1138 @Override
1139 public void deleteAllTs(ByteBuffer tableName,
1140 ByteBuffer row,
1141 ByteBuffer column,
1142 long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1143 Table table = null;
1144 try {
1145 table = getTable(tableName);
1146 Delete delete = new Delete(getBytes(row));
1147 addAttributes(delete, attributes);
1148 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1149 if (famAndQf.length == 1) {
1150 delete.deleteFamily(famAndQf[0], timestamp);
1151 } else {
1152 delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1153 }
1154 table.delete(delete);
1155
1156 } catch (IOException e) {
1157 LOG.warn(e.getMessage(), e);
1158 throw new IOError(Throwables.getStackTraceAsString(e));
1159 } finally {
1160 closeTable(table);
1161 }
1162 }
1163
1164 @Override
1165 public void deleteAllRow(
1166 ByteBuffer tableName, ByteBuffer row,
1167 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1168 deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
1169 }
1170
1171 @Override
1172 public void deleteAllRowTs(
1173 ByteBuffer tableName, ByteBuffer row, long timestamp,
1174 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1175 Table table = null;
1176 try {
1177 table = getTable(tableName);
1178 Delete delete = new Delete(getBytes(row), timestamp);
1179 addAttributes(delete, attributes);
1180 table.delete(delete);
1181 } catch (IOException e) {
1182 LOG.warn(e.getMessage(), e);
1183 throw new IOError(Throwables.getStackTraceAsString(e));
1184 } finally {
1185 closeTable(table);
1186 }
1187 }
1188
1189 @Override
1190 public void createTable(ByteBuffer in_tableName,
1191 List<ColumnDescriptor> columnFamilies) throws IOError,
1192 IllegalArgument, AlreadyExists {
1193 TableName tableName = getTableName(in_tableName);
1194 try {
1195 if (getAdmin().tableExists(tableName)) {
1196 throw new AlreadyExists("table name already in use");
1197 }
1198 HTableDescriptor desc = new HTableDescriptor(tableName);
1199 for (ColumnDescriptor col : columnFamilies) {
1200 HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
1201 desc.addFamily(colDesc);
1202 }
1203 getAdmin().createTable(desc);
1204 } catch (IOException e) {
1205 LOG.warn(e.getMessage(), e);
1206 throw new IOError(Throwables.getStackTraceAsString(e));
1207 } catch (IllegalArgumentException e) {
1208 LOG.warn(e.getMessage(), e);
1209 throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1210 }
1211 }
1212
1213 private static TableName getTableName(ByteBuffer buffer) {
1214 return TableName.valueOf(getBytes(buffer));
1215 }
1216
1217 @Override
1218 public void deleteTable(ByteBuffer in_tableName) throws IOError {
1219 TableName tableName = getTableName(in_tableName);
1220 if (LOG.isDebugEnabled()) {
1221 LOG.debug("deleteTable: table=" + tableName);
1222 }
1223 try {
1224 if (!getAdmin().tableExists(tableName)) {
1225 throw new IOException("table does not exist");
1226 }
1227 getAdmin().deleteTable(tableName);
1228 } catch (IOException e) {
1229 LOG.warn(e.getMessage(), e);
1230 throw new IOError(Throwables.getStackTraceAsString(e));
1231 }
1232 }
1233
1234 @Override
1235 public void mutateRow(ByteBuffer tableName, ByteBuffer row,
1236 List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
1237 throws IOError, IllegalArgument {
1238 mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP,
1239 attributes);
1240 }
1241
1242 @Override
1243 public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
1244 List<Mutation> mutations, long timestamp,
1245 Map<ByteBuffer, ByteBuffer> attributes)
1246 throws IOError, IllegalArgument {
1247 Table table = null;
1248 try {
1249 table = getTable(tableName);
1250 Put put = new Put(getBytes(row), timestamp);
1251 addAttributes(put, attributes);
1252
1253 Delete delete = new Delete(getBytes(row));
1254 addAttributes(delete, attributes);
1255 if (metrics != null) {
1256 metrics.incNumRowKeysInBatchMutate(mutations.size());
1257 }
1258
1259
1260 for (Mutation m : mutations) {
1261 byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
1262 if (m.isDelete) {
1263 if (famAndQf.length == 1) {
1264 delete.deleteFamily(famAndQf[0], timestamp);
1265 } else {
1266 delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1267 }
1268 delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
1269 : Durability.SKIP_WAL);
1270 } else {
1271 if(famAndQf.length == 1) {
1272 LOG.warn("No column qualifier specified. Delete is the only mutation supported "
1273 + "over the whole column family.");
1274 } else {
1275 put.addImmutable(famAndQf[0], famAndQf[1],
1276 m.value != null ? getBytes(m.value)
1277 : HConstants.EMPTY_BYTE_ARRAY);
1278 }
1279 put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1280 }
1281 }
1282 if (!delete.isEmpty())
1283 table.delete(delete);
1284 if (!put.isEmpty())
1285 table.put(put);
1286 } catch (IOException e) {
1287 LOG.warn(e.getMessage(), e);
1288 throw new IOError(Throwables.getStackTraceAsString(e));
1289 } catch (IllegalArgumentException e) {
1290 LOG.warn(e.getMessage(), e);
1291 throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1292 } finally{
1293 closeTable(table);
1294 }
1295 }
1296
1297 @Override
1298 public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
1299 Map<ByteBuffer, ByteBuffer> attributes)
1300 throws IOError, IllegalArgument, TException {
1301 mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
1302 }
1303
1304 @Override
1305 public void mutateRowsTs(
1306 ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
1307 Map<ByteBuffer, ByteBuffer> attributes)
1308 throws IOError, IllegalArgument, TException {
1309 List<Put> puts = new ArrayList<Put>();
1310 List<Delete> deletes = new ArrayList<Delete>();
1311
1312 for (BatchMutation batch : rowBatches) {
1313 byte[] row = getBytes(batch.row);
1314 List<Mutation> mutations = batch.mutations;
1315 Delete delete = new Delete(row);
1316 addAttributes(delete, attributes);
1317 Put put = new Put(row, timestamp);
1318 addAttributes(put, attributes);
1319 for (Mutation m : mutations) {
1320 byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
1321 if (m.isDelete) {
1322
1323 if (famAndQf.length == 1) {
1324 delete.deleteFamily(famAndQf[0], timestamp);
1325 } else {
1326 delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1327 }
1328 delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
1329 : Durability.SKIP_WAL);
1330 } else {
1331 if (famAndQf.length == 1) {
1332 LOG.warn("No column qualifier specified. Delete is the only mutation supported "
1333 + "over the whole column family.");
1334 }
1335 if (famAndQf.length == 2) {
1336 put.addImmutable(famAndQf[0], famAndQf[1],
1337 m.value != null ? getBytes(m.value)
1338 : HConstants.EMPTY_BYTE_ARRAY);
1339 } else {
1340 throw new IllegalArgumentException("Invalid famAndQf provided.");
1341 }
1342 put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1343 }
1344 }
1345 if (!delete.isEmpty())
1346 deletes.add(delete);
1347 if (!put.isEmpty())
1348 puts.add(put);
1349 }
1350
1351 Table table = null;
1352 try {
1353 table = getTable(tableName);
1354 if (!puts.isEmpty())
1355 table.put(puts);
1356 if (!deletes.isEmpty())
1357 table.delete(deletes);
1358
1359 } catch (IOException e) {
1360 LOG.warn(e.getMessage(), e);
1361 throw new IOError(Throwables.getStackTraceAsString(e));
1362 } catch (IllegalArgumentException e) {
1363 LOG.warn(e.getMessage(), e);
1364 throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1365 } finally{
1366 closeTable(table);
1367 }
1368 }
1369
1370 @Override
1371 public long atomicIncrement(
1372 ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
1373 throws IOError, IllegalArgument, TException {
1374 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1375 if(famAndQf.length == 1) {
1376 return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount);
1377 }
1378 return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
1379 }
1380
1381 protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
1382 byte [] family, byte [] qualifier, long amount)
1383 throws IOError, IllegalArgument, TException {
1384 Table table = null;
1385 try {
1386 table = getTable(tableName);
1387 return table.incrementColumnValue(
1388 getBytes(row), family, qualifier, amount);
1389 } catch (IOException e) {
1390 LOG.warn(e.getMessage(), e);
1391 throw new IOError(Throwables.getStackTraceAsString(e));
1392 } finally {
1393 closeTable(table);
1394 }
1395 }
1396
1397 @Override
1398 public void scannerClose(int id) throws IOError, IllegalArgument {
1399 LOG.debug("scannerClose: id=" + id);
1400 ResultScannerWrapper resultScannerWrapper = getScanner(id);
1401 if (resultScannerWrapper == null) {
1402 String message = "scanner ID is invalid";
1403 LOG.warn(message);
1404 throw new IllegalArgument("scanner ID is invalid");
1405 }
1406 resultScannerWrapper.getScanner().close();
1407 removeScanner(id);
1408 }
1409
1410 @Override
1411 public List<TRowResult> scannerGetList(int id,int nbRows)
1412 throws IllegalArgument, IOError {
1413 LOG.debug("scannerGetList: id=" + id);
1414 ResultScannerWrapper resultScannerWrapper = getScanner(id);
1415 if (null == resultScannerWrapper) {
1416 String message = "scanner ID is invalid";
1417 LOG.warn(message);
1418 throw new IllegalArgument("scanner ID is invalid");
1419 }
1420
1421 Result [] results = null;
1422 try {
1423 results = resultScannerWrapper.getScanner().next(nbRows);
1424 if (null == results) {
1425 return new ArrayList<TRowResult>();
1426 }
1427 } catch (IOException e) {
1428 LOG.warn(e.getMessage(), e);
1429 throw new IOError(Throwables.getStackTraceAsString(e));
1430 }
1431 return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted());
1432 }
1433
1434 @Override
1435 public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
1436 return scannerGetList(id,1);
1437 }
1438
1439 @Override
1440 public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
1441 Map<ByteBuffer, ByteBuffer> attributes)
1442 throws IOError {
1443
1444 Table table = null;
1445 try {
1446 table = getTable(tableName);
1447 Scan scan = new Scan();
1448 addAttributes(scan, attributes);
1449 if (tScan.isSetStartRow()) {
1450 scan.setStartRow(tScan.getStartRow());
1451 }
1452 if (tScan.isSetStopRow()) {
1453 scan.setStopRow(tScan.getStopRow());
1454 }
1455 if (tScan.isSetTimestamp()) {
1456 scan.setTimeRange(0, tScan.getTimestamp());
1457 }
1458 if (tScan.isSetCaching()) {
1459 scan.setCaching(tScan.getCaching());
1460 }
1461 if (tScan.isSetBatchSize()) {
1462 scan.setBatch(tScan.getBatchSize());
1463 }
1464 if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
1465 for(ByteBuffer column : tScan.getColumns()) {
1466 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1467 if(famQf.length == 1) {
1468 scan.addFamily(famQf[0]);
1469 } else {
1470 scan.addColumn(famQf[0], famQf[1]);
1471 }
1472 }
1473 }
1474 if (tScan.isSetFilterString()) {
1475 ParseFilter parseFilter = new ParseFilter();
1476 scan.setFilter(
1477 parseFilter.parseFilterString(tScan.getFilterString()));
1478 }
1479 if (tScan.isSetReversed()) {
1480 scan.setReversed(tScan.isReversed());
1481 }
1482 return addScanner(table.getScanner(scan), tScan.sortColumns);
1483 } catch (IOException e) {
1484 LOG.warn(e.getMessage(), e);
1485 throw new IOError(Throwables.getStackTraceAsString(e));
1486 } finally{
1487 closeTable(table);
1488 }
1489 }
1490
1491 @Override
1492 public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
1493 List<ByteBuffer> columns,
1494 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1495
1496 Table table = null;
1497 try {
1498 table = getTable(tableName);
1499 Scan scan = new Scan(getBytes(startRow));
1500 addAttributes(scan, attributes);
1501 if(columns != null && columns.size() != 0) {
1502 for(ByteBuffer column : columns) {
1503 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1504 if(famQf.length == 1) {
1505 scan.addFamily(famQf[0]);
1506 } else {
1507 scan.addColumn(famQf[0], famQf[1]);
1508 }
1509 }
1510 }
1511 return addScanner(table.getScanner(scan), false);
1512 } catch (IOException e) {
1513 LOG.warn(e.getMessage(), e);
1514 throw new IOError(Throwables.getStackTraceAsString(e));
1515 } finally{
1516 closeTable(table);
1517 }
1518 }
1519
1520 @Override
1521 public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
1522 ByteBuffer stopRow, List<ByteBuffer> columns,
1523 Map<ByteBuffer, ByteBuffer> attributes)
1524 throws IOError, TException {
1525
1526 Table table = null;
1527 try {
1528 table = getTable(tableName);
1529 Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1530 addAttributes(scan, attributes);
1531 if(columns != null && columns.size() != 0) {
1532 for(ByteBuffer column : columns) {
1533 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1534 if(famQf.length == 1) {
1535 scan.addFamily(famQf[0]);
1536 } else {
1537 scan.addColumn(famQf[0], famQf[1]);
1538 }
1539 }
1540 }
1541 return addScanner(table.getScanner(scan), false);
1542 } catch (IOException e) {
1543 LOG.warn(e.getMessage(), e);
1544 throw new IOError(Throwables.getStackTraceAsString(e));
1545 } finally{
1546 closeTable(table);
1547 }
1548 }
1549
1550 @Override
1551 public int scannerOpenWithPrefix(ByteBuffer tableName,
1552 ByteBuffer startAndPrefix,
1553 List<ByteBuffer> columns,
1554 Map<ByteBuffer, ByteBuffer> attributes)
1555 throws IOError, TException {
1556
1557 Table table = null;
1558 try {
1559 table = getTable(tableName);
1560 Scan scan = new Scan(getBytes(startAndPrefix));
1561 addAttributes(scan, attributes);
1562 Filter f = new WhileMatchFilter(
1563 new PrefixFilter(getBytes(startAndPrefix)));
1564 scan.setFilter(f);
1565 if (columns != null && columns.size() != 0) {
1566 for(ByteBuffer column : columns) {
1567 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1568 if(famQf.length == 1) {
1569 scan.addFamily(famQf[0]);
1570 } else {
1571 scan.addColumn(famQf[0], famQf[1]);
1572 }
1573 }
1574 }
1575 return addScanner(table.getScanner(scan), false);
1576 } catch (IOException e) {
1577 LOG.warn(e.getMessage(), e);
1578 throw new IOError(Throwables.getStackTraceAsString(e));
1579 } finally{
1580 closeTable(table);
1581 }
1582 }
1583
1584 @Override
1585 public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
1586 List<ByteBuffer> columns, long timestamp,
1587 Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
1588
1589 Table table = null;
1590 try {
1591 table = getTable(tableName);
1592 Scan scan = new Scan(getBytes(startRow));
1593 addAttributes(scan, attributes);
1594 scan.setTimeRange(0, timestamp);
1595 if (columns != null && columns.size() != 0) {
1596 for (ByteBuffer column : columns) {
1597 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1598 if(famQf.length == 1) {
1599 scan.addFamily(famQf[0]);
1600 } else {
1601 scan.addColumn(famQf[0], famQf[1]);
1602 }
1603 }
1604 }
1605 return addScanner(table.getScanner(scan), false);
1606 } catch (IOException e) {
1607 LOG.warn(e.getMessage(), e);
1608 throw new IOError(Throwables.getStackTraceAsString(e));
1609 } finally{
1610 closeTable(table);
1611 }
1612 }
1613
1614 @Override
1615 public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
1616 ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
1617 Map<ByteBuffer, ByteBuffer> attributes)
1618 throws IOError, TException {
1619
1620 Table table = null;
1621 try {
1622 table = getTable(tableName);
1623 Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1624 addAttributes(scan, attributes);
1625 scan.setTimeRange(0, timestamp);
1626 if (columns != null && columns.size() != 0) {
1627 for (ByteBuffer column : columns) {
1628 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1629 if(famQf.length == 1) {
1630 scan.addFamily(famQf[0]);
1631 } else {
1632 scan.addColumn(famQf[0], famQf[1]);
1633 }
1634 }
1635 }
1636 scan.setTimeRange(0, timestamp);
1637 return addScanner(table.getScanner(scan), false);
1638 } catch (IOException e) {
1639 LOG.warn(e.getMessage(), e);
1640 throw new IOError(Throwables.getStackTraceAsString(e));
1641 } finally{
1642 closeTable(table);
1643 }
1644 }
1645
1646 @Override
1647 public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
1648 ByteBuffer tableName) throws IOError, TException {
1649
1650 Table table = null;
1651 try {
1652 TreeMap<ByteBuffer, ColumnDescriptor> columns =
1653 new TreeMap<ByteBuffer, ColumnDescriptor>();
1654
1655 table = getTable(tableName);
1656 HTableDescriptor desc = table.getTableDescriptor();
1657
1658 for (HColumnDescriptor e : desc.getFamilies()) {
1659 ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
1660 columns.put(col.name, col);
1661 }
1662 return columns;
1663 } catch (IOException e) {
1664 LOG.warn(e.getMessage(), e);
1665 throw new IOError(Throwables.getStackTraceAsString(e));
1666 } finally {
1667 closeTable(table);
1668 }
1669 }
1670
1671 @Deprecated
1672 @Override
1673 public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
1674 ByteBuffer family) throws IOError {
1675 try {
1676 Result result = getRowOrBefore(getBytes(tableName), getBytes(row), getBytes(family));
1677 return ThriftUtilities.cellFromHBase(result.rawCells());
1678 } catch (IOException e) {
1679 LOG.warn(e.getMessage(), e);
1680 throw new IOError(Throwables.getStackTraceAsString(e));
1681 }
1682 }
1683
1684 @Override
1685 public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
1686 try {
1687 byte[] row = getBytes(searchRow);
1688 Result startRowResult =
1689 getRowOrBefore(TableName.META_TABLE_NAME.getName(), row, HConstants.CATALOG_FAMILY);
1690
1691 if (startRowResult == null) {
1692 throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row="
1693 + Bytes.toStringBinary(row));
1694 }
1695
1696
1697 HRegionInfo regionInfo = HRegionInfo.getHRegionInfo(startRowResult);
1698 if (regionInfo == null) {
1699 throw new IOException("HRegionInfo REGIONINFO was null or " +
1700 " empty in Meta for row="
1701 + Bytes.toStringBinary(row));
1702 }
1703 TRegionInfo region = new TRegionInfo();
1704 region.setStartKey(regionInfo.getStartKey());
1705 region.setEndKey(regionInfo.getEndKey());
1706 region.id = regionInfo.getRegionId();
1707 region.setName(regionInfo.getRegionName());
1708 region.version = regionInfo.getVersion();
1709
1710
1711 ServerName serverName = HRegionInfo.getServerName(startRowResult);
1712 if (serverName != null) {
1713 region.setServerName(Bytes.toBytes(serverName.getHostname()));
1714 region.port = serverName.getPort();
1715 }
1716 return region;
1717 } catch (IOException e) {
1718 LOG.warn(e.getMessage(), e);
1719 throw new IOError(Throwables.getStackTraceAsString(e));
1720 }
1721 }
1722
1723 private void closeTable(Table table) throws IOError
1724 {
1725 try{
1726 if(table != null){
1727 table.close();
1728 }
1729 } catch (IOException e){
1730 LOG.error(e.getMessage(), e);
1731 throw new IOError(Throwables.getStackTraceAsString(e));
1732 }
1733 }
1734
1735 private Result getRowOrBefore(byte[] tableName, byte[] row, byte[] family) throws IOException {
1736 Scan scan = new Scan(row);
1737 scan.setReversed(true);
1738 scan.addFamily(family);
1739 scan.setStartRow(row);
1740 Table table = getTable(tableName);
1741 try (ResultScanner scanner = table.getScanner(scan)) {
1742 return scanner.next();
1743 } finally{
1744 if(table != null){
1745 table.close();
1746 }
1747 }
1748 }
1749
1750 private void initMetrics(ThriftMetrics metrics) {
1751 this.metrics = metrics;
1752 }
1753
1754 @Override
1755 public void increment(TIncrement tincrement) throws IOError, TException {
1756
1757 if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
1758 throw new TException("Must supply a table and a row key; can't increment");
1759 }
1760
1761 if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1762 this.coalescer.queueIncrement(tincrement);
1763 return;
1764 }
1765
1766 Table table = null;
1767 try {
1768 table = getTable(tincrement.getTable());
1769 Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
1770 table.increment(inc);
1771 } catch (IOException e) {
1772 LOG.warn(e.getMessage(), e);
1773 throw new IOError(Throwables.getStackTraceAsString(e));
1774 } finally{
1775 closeTable(table);
1776 }
1777 }
1778
1779 @Override
1780 public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
1781 if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1782 this.coalescer.queueIncrements(tincrements);
1783 return;
1784 }
1785 for (TIncrement tinc : tincrements) {
1786 increment(tinc);
1787 }
1788 }
1789
1790 @Override
1791 public List<TCell> append(TAppend tappend) throws IOError, TException {
1792 if (tappend.getRow().length == 0 || tappend.getTable().length == 0) {
1793 throw new TException("Must supply a table and a row key; can't append");
1794 }
1795
1796 Table table = null;
1797 try {
1798 table = getTable(tappend.getTable());
1799 Append append = ThriftUtilities.appendFromThrift(tappend);
1800 Result result = table.append(append);
1801 return ThriftUtilities.cellFromHBase(result.rawCells());
1802 } catch (IOException e) {
1803 LOG.warn(e.getMessage(), e);
1804 throw new IOError(Throwables.getStackTraceAsString(e));
1805 } finally{
1806 closeTable(table);
1807 }
1808 }
1809
1810 @Override
1811 public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1812 ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes) throws IOError,
1813 IllegalArgument, TException {
1814 Put put;
1815 try {
1816 put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP);
1817 addAttributes(put, attributes);
1818
1819 byte[][] famAndQf = KeyValue.parseColumn(getBytes(mput.column));
1820
1821 put.addImmutable(famAndQf[0], famAndQf[1], mput.value != null ? getBytes(mput.value)
1822 : HConstants.EMPTY_BYTE_ARRAY);
1823
1824 put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1825 } catch (IllegalArgumentException e) {
1826 LOG.warn(e.getMessage(), e);
1827 throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1828 }
1829
1830 Table table = null;
1831 try {
1832 table = getTable(tableName);
1833 byte[][] famAndQf = KeyValue.parseColumn(getBytes(column));
1834 return table.checkAndPut(getBytes(row), famAndQf[0], famAndQf[1],
1835 value != null ? getBytes(value) : HConstants.EMPTY_BYTE_ARRAY, put);
1836 } catch (IOException e) {
1837 LOG.warn(e.getMessage(), e);
1838 throw new IOError(Throwables.getStackTraceAsString(e));
1839 } catch (IllegalArgumentException e) {
1840 LOG.warn(e.getMessage(), e);
1841 throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1842 } finally {
1843 closeTable(table);
1844 }
1845 }
1846 }
1847
1848
1849
1850
1851
1852
1853 private static void addAttributes(OperationWithAttributes op,
1854 Map<ByteBuffer, ByteBuffer> attributes) {
1855 if (attributes == null || attributes.size() == 0) {
1856 return;
1857 }
1858 for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
1859 String name = Bytes.toStringBinary(getBytes(entry.getKey()));
1860 byte[] value = getBytes(entry.getValue());
1861 op.setAttribute(name, value);
1862 }
1863 }
1864
1865 public static void registerFilters(Configuration conf) {
1866 String[] filters = conf.getStrings("hbase.thrift.filters");
1867 if(filters != null) {
1868 for(String filterClass: filters) {
1869 String[] filterPart = filterClass.split(":");
1870 if(filterPart.length != 2) {
1871 LOG.warn("Invalid filter specification " + filterClass + " - skipping");
1872 } else {
1873 ParseFilter.registerFilter(filterPart[0], filterPart[1]);
1874 }
1875 }
1876 }
1877 }
1878 }