1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.thrift;
20
21 import static org.apache.hadoop.hbase.util.Bytes.getBytes;
22
23 import java.io.IOException;
24 import java.net.InetAddress;
25 import java.net.InetSocketAddress;
26 import java.net.UnknownHostException;
27 import java.nio.ByteBuffer;
28 import java.security.PrivilegedAction;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Collections;
32 import java.util.HashMap;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.TreeMap;
36 import java.util.concurrent.BlockingQueue;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.LinkedBlockingQueue;
39 import java.util.concurrent.ThreadPoolExecutor;
40 import java.util.concurrent.TimeUnit;
41
42 import javax.security.auth.callback.Callback;
43 import javax.security.auth.callback.UnsupportedCallbackException;
44 import javax.security.sasl.AuthorizeCallback;
45 import javax.security.sasl.Sasl;
46 import javax.security.sasl.SaslServer;
47
48 import org.apache.commons.cli.CommandLine;
49 import org.apache.commons.cli.Option;
50 import org.apache.commons.cli.OptionGroup;
51 import org.apache.commons.logging.Log;
52 import org.apache.commons.logging.LogFactory;
53 import org.apache.hadoop.classification.InterfaceAudience;
54 import org.apache.hadoop.conf.Configuration;
55 import org.apache.hadoop.hbase.HBaseConfiguration;
56 import org.apache.hadoop.hbase.HColumnDescriptor;
57 import org.apache.hadoop.hbase.HConstants;
58 import org.apache.hadoop.hbase.HRegionInfo;
59 import org.apache.hadoop.hbase.HTableDescriptor;
60 import org.apache.hadoop.hbase.KeyValue;
61 import org.apache.hadoop.hbase.ServerName;
62 import org.apache.hadoop.hbase.TableName;
63 import org.apache.hadoop.hbase.TableNotFoundException;
64 import org.apache.hadoop.hbase.client.Delete;
65 import org.apache.hadoop.hbase.client.Durability;
66 import org.apache.hadoop.hbase.client.Get;
67 import org.apache.hadoop.hbase.client.HBaseAdmin;
68 import org.apache.hadoop.hbase.client.HTable;
69 import org.apache.hadoop.hbase.client.Increment;
70 import org.apache.hadoop.hbase.client.OperationWithAttributes;
71 import org.apache.hadoop.hbase.client.Put;
72 import org.apache.hadoop.hbase.client.Result;
73 import org.apache.hadoop.hbase.client.ResultScanner;
74 import org.apache.hadoop.hbase.client.Scan;
75 import org.apache.hadoop.hbase.filter.Filter;
76 import org.apache.hadoop.hbase.filter.ParseFilter;
77 import org.apache.hadoop.hbase.filter.PrefixFilter;
78 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
79 import org.apache.hadoop.hbase.security.SecurityUtil;
80 import org.apache.hadoop.hbase.security.UserProvider;
81 import org.apache.hadoop.hbase.thrift.CallQueue.Call;
82 import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
83 import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
84 import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
85 import org.apache.hadoop.hbase.thrift.generated.Hbase;
86 import org.apache.hadoop.hbase.thrift.generated.IOError;
87 import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
88 import org.apache.hadoop.hbase.thrift.generated.Mutation;
89 import org.apache.hadoop.hbase.thrift.generated.TCell;
90 import org.apache.hadoop.hbase.thrift.generated.TIncrement;
91 import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
92 import org.apache.hadoop.hbase.thrift.generated.TRowResult;
93 import org.apache.hadoop.hbase.thrift.generated.TScan;
94 import org.apache.hadoop.hbase.util.Bytes;
95 import org.apache.hadoop.hbase.util.ConnectionCache;
96 import org.apache.hadoop.hbase.util.Strings;
97 import org.apache.hadoop.net.DNS;
98 import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
99 import org.apache.hadoop.security.UserGroupInformation;
100 import org.apache.thrift.TException;
101 import org.apache.thrift.TProcessor;
102 import org.apache.thrift.protocol.TBinaryProtocol;
103 import org.apache.thrift.protocol.TCompactProtocol;
104 import org.apache.thrift.protocol.TProtocol;
105 import org.apache.thrift.protocol.TProtocolFactory;
106 import org.apache.thrift.server.THsHaServer;
107 import org.apache.thrift.server.TNonblockingServer;
108 import org.apache.thrift.server.TServer;
109 import org.apache.thrift.server.TThreadedSelectorServer;
110 import org.apache.thrift.transport.TFramedTransport;
111 import org.apache.thrift.transport.TNonblockingServerSocket;
112 import org.apache.thrift.transport.TNonblockingServerTransport;
113 import org.apache.thrift.transport.TSaslServerTransport;
114 import org.apache.thrift.transport.TServerSocket;
115 import org.apache.thrift.transport.TServerTransport;
116 import org.apache.thrift.transport.TTransportFactory;
117
118 import com.google.common.base.Joiner;
119 import com.google.common.util.concurrent.ThreadFactoryBuilder;
120
121
122
123
124
125 @InterfaceAudience.Private
126 @SuppressWarnings("deprecation")
127 public class ThriftServerRunner implements Runnable {
128
private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class);

/** Configuration key holding the server implementation; its value is matched against {@link ImplType#option}. */
static final String SERVER_TYPE_CONF_KEY =
    "hbase.regionserver.thrift.server.type";

/** Configuration key for the address to bind to; {@link #DEFAULT_BIND_ADDR} is used when unset. */
static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
/** Boolean configuration key: when true the compact protocol is used instead of the binary one. */
static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
/** Boolean configuration key: when true the framed transport is used. */
static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
/** Integer configuration key for the maximum frame size in megabytes (read with a default of 2). */
static final String MAX_FRAME_SIZE_CONF_KEY = "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
/** Integer configuration key for the listen port; {@link #DEFAULT_LISTEN_PORT} is used when unset. */
static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
// NOTE(review): not read anywhere in the visible part of this file; presumably
// consumed by the increment-coalescing code -- confirm.
static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";

/**
 * Configuration key for the SASL quality of protection. The constructor accepts only
 * "auth", "auth-int" or "auth-conf", and only when security is enabled; when the key
 * is unset no SASL transport is installed.
 */
static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";

private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
public static final int DEFAULT_LISTEN_PORT = 9090;
/** Port the server socket is bound to, read from {@link #PORT_CONF_KEY} in the constructor. */
private final int listenPort;

/** Private copy of the configuration passed to the constructor. */
private Configuration conf;
/** The running Thrift server; assigned in setupServer() and cleared by shutdown(). */
volatile TServer tserver;
/** Metrics proxy wrapped around {@link #hbaseHandler}; this is what the Thrift processor invokes. */
private final Hbase.Iface handler;
private final ThriftMetrics metrics;
/** The underlying handler; also used directly to set the effective user for SASL calls. */
private final HBaseHandler hbaseHandler;
/** The identity under which run() builds and serves the Thrift server. */
private final UserGroupInformation realUser;

/** Value of {@link #THRIFT_QOP_KEY}, or null when SASL authentication is not configured. */
private final String qop;
/** Host name resolved for the Kerberos login; only assigned when security is enabled. */
private String host;
165
166
167 enum ImplType {
168 HS_HA("hsha", true, THsHaServer.class, true),
169 NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
170 THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
171 THREADED_SELECTOR(
172 "threadedselector", true, TThreadedSelectorServer.class, true);
173
174 public static final ImplType DEFAULT = THREAD_POOL;
175
176 final String option;
177 final boolean isAlwaysFramed;
178 final Class<? extends TServer> serverClass;
179 final boolean canSpecifyBindIP;
180
181 ImplType(String option, boolean isAlwaysFramed,
182 Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
183 this.option = option;
184 this.isAlwaysFramed = isAlwaysFramed;
185 this.serverClass = serverClass;
186 this.canSpecifyBindIP = canSpecifyBindIP;
187 }
188
189
190
191
192
193 @Override
194 public String toString() {
195 return "-" + option;
196 }
197
198 String getDescription() {
199 StringBuilder sb = new StringBuilder("Use the " +
200 serverClass.getSimpleName());
201 if (isAlwaysFramed) {
202 sb.append(" This implies the framed transport.");
203 }
204 if (this == DEFAULT) {
205 sb.append("This is the default.");
206 }
207 return sb.toString();
208 }
209
210 static OptionGroup createOptionGroup() {
211 OptionGroup group = new OptionGroup();
212 for (ImplType t : values()) {
213 group.addOption(new Option(t.option, t.getDescription()));
214 }
215 return group;
216 }
217
218 static ImplType getServerImpl(Configuration conf) {
219 String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
220 for (ImplType t : values()) {
221 if (confType.equals(t.option)) {
222 return t;
223 }
224 }
225 throw new AssertionError("Unknown server ImplType.option:" + confType);
226 }
227
228 static void setServerImpl(CommandLine cmd, Configuration conf) {
229 ImplType chosenType = null;
230 int numChosen = 0;
231 for (ImplType t : values()) {
232 if (cmd.hasOption(t.option)) {
233 chosenType = t;
234 ++numChosen;
235 }
236 }
237 if (numChosen < 1) {
238 LOG.info("Using default thrift server type");
239 chosenType = DEFAULT;
240 } else if (numChosen > 1) {
241 throw new AssertionError("Exactly one option out of " +
242 Arrays.toString(values()) + " has to be specified");
243 }
244 LOG.info("Using thrift server type " + chosenType.option);
245 conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
246 }
247
248 public String simpleClassName() {
249 return serverClass.getSimpleName();
250 }
251
252 public static List<String> serversThatCannotSpecifyBindIP() {
253 List<String> l = new ArrayList<String>();
254 for (ImplType t : values()) {
255 if (!t.canSpecifyBindIP) {
256 l.add(t.simpleClassName());
257 }
258 }
259 return l;
260 }
261
262 }
263
264 public ThriftServerRunner(Configuration conf) throws IOException {
265 UserProvider userProvider = UserProvider.instantiate(conf);
266
267 boolean securityEnabled = userProvider.isHadoopSecurityEnabled()
268 && userProvider.isHBaseSecurityEnabled();
269 if (securityEnabled) {
270 host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
271 conf.get("hbase.thrift.dns.interface", "default"),
272 conf.get("hbase.thrift.dns.nameserver", "default")));
273 userProvider.login("hbase.thrift.keytab.file",
274 "hbase.thrift.kerberos.principal", host);
275 }
276 this.conf = HBaseConfiguration.create(conf);
277 this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
278 this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
279 this.hbaseHandler = new HBaseHandler(conf, userProvider);
280 this.hbaseHandler.initMetrics(metrics);
281 this.handler = HbaseHandlerMetricsProxy.newInstance(
282 hbaseHandler, metrics, conf);
283 this.realUser = userProvider.getCurrent().getUGI();
284 qop = conf.get(THRIFT_QOP_KEY);
285 if (qop != null) {
286 if (!qop.equals("auth") && !qop.equals("auth-int")
287 && !qop.equals("auth-conf")) {
288 throw new IOException("Invalid " + THRIFT_QOP_KEY + ": " + qop
289 + ", it must be 'auth', 'auth-int', or 'auth-conf'");
290 }
291 if (!securityEnabled) {
292 throw new IOException("Thrift server must"
293 + " run in secure mode to support authentication");
294 }
295 }
296 }
297
298
299
300
/**
 * Builds the configured Thrift server and serves requests, all inside a
 * {@code doAs} of {@link #realUser}. Any exception raised while setting up or serving
 * is logged as fatal and the JVM is terminated with exit status -1.
 */
@Override
public void run() {
  realUser.doAs(
    new PrivilegedAction<Object>() {
      @Override
      public Object run() {
        try {
          setupServer();
          tserver.serve();
        } catch (Exception e) {
          LOG.fatal("Cannot run ThriftServer", e);
          // A server that failed to start or serve is treated as unrecoverable.
          System.exit(-1);
        }
        // PrivilegedAction requires a result; none is meaningful here.
        return null;
      }
    });
}
319
320 public void shutdown() {
321 if (tserver != null) {
322 tserver.stop();
323 tserver = null;
324 }
325 }
326
327
328
329
/**
 * Builds the Thrift server described by the configuration and stores it in
 * {@link #tserver}; it does not start serving. The steps are: choose the protocol
 * factory, choose the transport factory (framed, plain, or SASL/GSSAPI when a QOP is
 * configured), create the server socket on the bind address and listen port, construct
 * the server for the configured {@link ImplType}, and finally register filters.
 *
 * @throws Exception if any part of the construction fails; a RuntimeException is thrown
 *         when SASL is requested together with a framed transport, or when a bind
 *         address is configured for an implementation that cannot use one
 */
private void setupServer() throws Exception {
  // Protocol: compact when requested, binary otherwise.
  TProtocolFactory protocolFactory;
  if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
    LOG.debug("Using compact protocol");
    protocolFactory = new TCompactProtocol.Factory();
  } else {
    LOG.debug("Using binary protocol");
    protocolFactory = new TBinaryProtocol.Factory();
  }

  final TProcessor p = new Hbase.Processor<Hbase.Iface>(handler);
  ImplType implType = ImplType.getServerImpl(conf);
  // Replaced below by a wrapping processor when SASL is in use.
  TProcessor processor = p;

  // Transport: framed (explicitly requested or implied by the server type),
  // plain when no QOP is configured, SASL otherwise.
  TTransportFactory transportFactory;
  if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
    if (qop != null) {
      throw new RuntimeException("Thrift server authentication"
        + " doesn't work with framed transport yet");
    }
    // The configured size is in megabytes; convert to bytes.
    transportFactory = new TFramedTransport.Factory(
        conf.getInt(MAX_FRAME_SIZE_CONF_KEY, 2) * 1024 * 1024);
    LOG.debug("Using framed transport");
  } else if (qop == null) {
    transportFactory = new TTransportFactory();
  } else {
    // SASL/GSSAPI transport, using the user part of the configured Kerberos
    // principal as the protocol name and the host resolved in the constructor.
    String name = SecurityUtil.getUserFromPrincipal(
      conf.get("hbase.thrift.kerberos.principal"));
    Map<String, String> saslProperties = new HashMap<String, String>();
    saslProperties.put(Sasl.QOP, qop);
    TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
    saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
      new SaslGssCallbackHandler() {
        @Override
        public void handle(Callback[] callbacks)
            throws UnsupportedCallbackException {
          // Only AuthorizeCallback is accepted; anything else is rejected.
          AuthorizeCallback ac = null;
          for (Callback callback : callbacks) {
            if (callback instanceof AuthorizeCallback) {
              ac = (AuthorizeCallback) callback;
            } else {
              throw new UnsupportedCallbackException(callback,
                  "Unrecognized SASL GSSAPI Callback");
            }
          }
          if (ac != null) {
            String authid = ac.getAuthenticationID();
            String authzid = ac.getAuthorizationID();
            // Authorize only when the client asks to act as itself.
            if (!authid.equals(authzid)) {
              ac.setAuthorized(false);
            } else {
              ac.setAuthorized(true);
              String userName = SecurityUtil.getUserFromPrincipal(authzid);
              LOG.info("Effective user: " + userName);
              ac.setAuthorizedID(userName);
            }
          }
        }
      });
    transportFactory = saslFactory;

    // Wrap the processor so that, before each call is dispatched, the SASL
    // authorization ID of the connection is set as the handler's effective user.
    processor = new TProcessor() {
      @Override
      public boolean process(TProtocol inProt,
          TProtocol outProt) throws TException {
        TSaslServerTransport saslServerTransport =
          (TSaslServerTransport)inProt.getTransport();
        SaslServer saslServer = saslServerTransport.getSaslServer();
        String principal = saslServer.getAuthorizationID();
        hbaseHandler.setEffectiveUser(principal);
        return p.process(inProt, outProt);
      }
    };
  }

  // Refuse a configured bind address for implementations that cannot honour it.
  if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
    LOG.error("Server types " + Joiner.on(", ").join(
        ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
        "address binding at the moment. See " +
        "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
    throw new RuntimeException(
        "-" + BIND_CONF_KEY + " not supported with " + implType);
  }

  if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
      implType == ImplType.THREADED_SELECTOR) {
    // The three non-blocking implementations share one kind of server socket.
    InetAddress listenAddress = getBindAddress(conf);
    TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(
        new InetSocketAddress(listenAddress, listenPort));

    if (implType == ImplType.NONBLOCKING) {
      TNonblockingServer.Args serverArgs =
          new TNonblockingServer.Args(serverTransport);
      serverArgs.processor(processor)
                .transportFactory(transportFactory)
                .protocolFactory(protocolFactory);
      tserver = new TNonblockingServer(serverArgs);
    } else if (implType == ImplType.HS_HA) {
      THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
      // Worker pool fed through a metrics-aware call queue.
      CallQueue callQueue =
          new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
      ExecutorService executorService = createExecutor(
          callQueue, serverArgs.getWorkerThreads());
      serverArgs.executorService(executorService)
                .processor(processor)
                .transportFactory(transportFactory)
                .protocolFactory(protocolFactory);
      tserver = new THsHaServer(serverArgs);
    } else {
      // THREADED_SELECTOR
      TThreadedSelectorServer.Args serverArgs =
          new HThreadedSelectorServerArgs(serverTransport, conf);
      CallQueue callQueue =
          new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
      ExecutorService executorService = createExecutor(
          callQueue, serverArgs.getWorkerThreads());
      serverArgs.executorService(executorService)
                .processor(processor)
                .transportFactory(transportFactory)
                .protocolFactory(protocolFactory);
      tserver = new TThreadedSelectorServer(serverArgs);
    }
    LOG.info("starting HBase " + implType.simpleClassName() +
        " server on " + Integer.toString(listenPort));
  } else if (implType == ImplType.THREAD_POOL) {
    // Blocking thread-pool server.
    InetAddress listenAddress = getBindAddress(conf);

    TServerTransport serverTransport = new TServerSocket(
        new InetSocketAddress(listenAddress, listenPort));

    TBoundedThreadPoolServer.Args serverArgs =
        new TBoundedThreadPoolServer.Args(serverTransport, conf);
    serverArgs.processor(processor)
              .transportFactory(transportFactory)
              .protocolFactory(protocolFactory);
    LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
        + listenAddress + ":" + Integer.toString(listenPort)
        + "; " + serverArgs);
    // Note: this local shadows the field of the same name; the field is set below.
    TBoundedThreadPoolServer tserver =
        new TBoundedThreadPoolServer(serverArgs, metrics);
    this.tserver = tserver;
  } else {
    throw new AssertionError("Unsupported Thrift server implementation: " +
        implType.simpleClassName());
  }

  // Sanity check: the created server must be exactly the class the ImplType declares.
  if (tserver.getClass() != implType.serverClass) {
    throw new AssertionError("Expected to create Thrift server class " +
        implType.serverClass.getName() + " but got " +
        tserver.getClass().getName());
  }

  registerFilters(conf);
}
492
493 ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
494 int workerThreads) {
495 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
496 tfb.setDaemon(true);
497 tfb.setNameFormat("thrift-worker-%d");
498 return new ThreadPoolExecutor(workerThreads, workerThreads,
499 Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
500 }
501
502 private InetAddress getBindAddress(Configuration conf)
503 throws UnknownHostException {
504 String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
505 return InetAddress.getByName(bindAddressStr);
506 }
507
508 protected static class ResultScannerWrapper {
509
510 private final ResultScanner scanner;
511 private final boolean sortColumns;
512 public ResultScannerWrapper(ResultScanner resultScanner,
513 boolean sortResultColumns) {
514 scanner = resultScanner;
515 sortColumns = sortResultColumns;
516 }
517
518 public ResultScanner getScanner() {
519 return scanner;
520 }
521
522 public boolean isColumnSorted() {
523 return sortColumns;
524 }
525 }
526
527
528
529
530
531 public static class HBaseHandler implements Hbase.Iface {
/** Configuration given to the constructor. */
protected Configuration conf;
/** Per-instance logger named after the runtime class of the handler. */
protected final Log LOG = LogFactory.getLog(this.getClass().getName());

/** Id that the next call to addScanner will hand out. */
protected int nextScannerId = 0;
/** Open scanners by id; read and written only through the synchronized scanner methods. */
protected HashMap<Integer, ResultScannerWrapper> scannerMap = null;
// May be null; callers in this class check before use.
// NOTE(review): presumably assigned by initMetrics -- confirm.
private ThriftMetrics metrics = null;

/** Source of the table and admin instances used by this handler. */
private final ConnectionCache connectionCache;

/** Per-thread map from table name to the HTable already obtained for it. */
private static ThreadLocal<Map<String, HTable>> threadLocalTables =
    new ThreadLocal<Map<String, HTable>>() {
  @Override
  protected Map<String, HTable> initialValue() {
    return new TreeMap<String, HTable>();
  }
};

/** Created in the constructor with a reference to this handler. */
IncrementCoalescer coalescer = null;

/** Integer configuration key passed to the connection cache; read with a default of 10 * 1000. */
static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
/** Integer configuration key passed to the connection cache; read with a default of 10 * 60 * 1000. */
static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
554
555
556
557
558
559
560
561 byte[][] getAllColumns(HTable table) throws IOException {
562 HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
563 byte[][] columns = new byte[cds.length][];
564 for (int i = 0; i < cds.length; i++) {
565 columns[i] = Bytes.add(cds[i].getName(),
566 KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
567 }
568 return columns;
569 }
570
571
572
573
574
575
576
577
578
579
580 public HTable getTable(final byte[] tableName) throws
581 IOException {
582 String table = Bytes.toString(tableName);
583 Map<String, HTable> tables = threadLocalTables.get();
584 if (!tables.containsKey(table)) {
585 tables.put(table, (HTable)connectionCache.getTable(table));
586 }
587 return tables.get(table);
588 }
589
590 public HTable getTable(final ByteBuffer tableName) throws IOException {
591 return getTable(getBytes(tableName));
592 }
593
594
595
596
597
598
599
600
601 protected synchronized int addScanner(ResultScanner scanner,boolean sortColumns) {
602 int id = nextScannerId++;
603 ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns);
604 scannerMap.put(id, resultScannerWrapper);
605 return id;
606 }
607
608
609
610
611
612
613
614 protected synchronized ResultScannerWrapper getScanner(int id) {
615 return scannerMap.get(id);
616 }
617
618
619
620
621
622
623
624
625 protected synchronized ResultScannerWrapper removeScanner(int id) {
626 return scannerMap.remove(id);
627 }
628
629 protected HBaseHandler(final Configuration c,
630 final UserProvider userProvider) throws IOException {
631 this.conf = c;
632 scannerMap = new HashMap<Integer, ResultScannerWrapper>();
633 this.coalescer = new IncrementCoalescer(this);
634
635 int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
636 int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
637 connectionCache = new ConnectionCache(
638 conf, userProvider, cleanInterval, maxIdleTime);
639 }
640
641
642
643
644 private HBaseAdmin getHBaseAdmin() throws IOException {
645 return connectionCache.getAdmin();
646 }
647
648 void setEffectiveUser(String effectiveUser) {
649 connectionCache.setEffectiveUser(effectiveUser);
650 }
651
652 @Override
653 public void enableTable(ByteBuffer tableName) throws IOError {
654 try{
655 getHBaseAdmin().enableTable(getBytes(tableName));
656 } catch (IOException e) {
657 LOG.warn(e.getMessage(), e);
658 throw new IOError(e.getMessage());
659 }
660 }
661
662 @Override
663 public void disableTable(ByteBuffer tableName) throws IOError{
664 try{
665 getHBaseAdmin().disableTable(getBytes(tableName));
666 } catch (IOException e) {
667 LOG.warn(e.getMessage(), e);
668 throw new IOError(e.getMessage());
669 }
670 }
671
672 @Override
673 public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
674 try {
675 return HTable.isTableEnabled(this.conf, getBytes(tableName));
676 } catch (IOException e) {
677 LOG.warn(e.getMessage(), e);
678 throw new IOError(e.getMessage());
679 }
680 }
681
682 @Override
683 public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
684 try{
685 getHBaseAdmin().compact(getBytes(tableNameOrRegionName));
686 } catch (InterruptedException e) {
687 throw new IOError(e.getMessage());
688 } catch (IOException e) {
689 LOG.warn(e.getMessage(), e);
690 throw new IOError(e.getMessage());
691 }
692 }
693
694 @Override
695 public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
696 try{
697 getHBaseAdmin().majorCompact(getBytes(tableNameOrRegionName));
698 } catch (InterruptedException e) {
699 LOG.warn(e.getMessage(), e);
700 throw new IOError(e.getMessage());
701 } catch (IOException e) {
702 LOG.warn(e.getMessage(), e);
703 throw new IOError(e.getMessage());
704 }
705 }
706
707 @Override
708 public List<ByteBuffer> getTableNames() throws IOError {
709 try {
710 TableName[] tableNames = this.getHBaseAdmin().listTableNames();
711 ArrayList<ByteBuffer> list = new ArrayList<ByteBuffer>(tableNames.length);
712 for (int i = 0; i < tableNames.length; i++) {
713 list.add(ByteBuffer.wrap(tableNames[i].getName()));
714 }
715 return list;
716 } catch (IOException e) {
717 LOG.warn(e.getMessage(), e);
718 throw new IOError(e.getMessage());
719 }
720 }
721
722
723
724
725 @Override
726 public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
727 throws IOError {
728 try {
729 HTable table;
730 try {
731 table = getTable(tableName);
732 } catch (TableNotFoundException ex) {
733 return new ArrayList<TRegionInfo>();
734 }
735 Map<HRegionInfo, ServerName> regionLocations =
736 table.getRegionLocations();
737 List<TRegionInfo> results = new ArrayList<TRegionInfo>();
738 for (Map.Entry<HRegionInfo, ServerName> entry :
739 regionLocations.entrySet()) {
740 HRegionInfo info = entry.getKey();
741 ServerName serverName = entry.getValue();
742 TRegionInfo region = new TRegionInfo();
743 region.serverName = ByteBuffer.wrap(
744 Bytes.toBytes(serverName.getHostname()));
745 region.port = serverName.getPort();
746 region.startKey = ByteBuffer.wrap(info.getStartKey());
747 region.endKey = ByteBuffer.wrap(info.getEndKey());
748 region.id = info.getRegionId();
749 region.name = ByteBuffer.wrap(info.getRegionName());
750 region.version = info.getVersion();
751 results.add(region);
752 }
753 return results;
754 } catch (TableNotFoundException e) {
755
756 return Collections.emptyList();
757 } catch (IOException e){
758 LOG.warn(e.getMessage(), e);
759 throw new IOError(e.getMessage());
760 }
761 }
762
763 @Deprecated
764 @Override
765 public List<TCell> get(
766 ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
767 Map<ByteBuffer, ByteBuffer> attributes)
768 throws IOError {
769 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
770 if (famAndQf.length == 1) {
771 return get(tableName, row, famAndQf[0], null, attributes);
772 }
773 if (famAndQf.length == 2) {
774 return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
775 }
776 throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
777 }
778
779
780
781
782
783
784
785
786 protected List<TCell> get(ByteBuffer tableName,
787 ByteBuffer row,
788 byte[] family,
789 byte[] qualifier,
790 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
791 try {
792 HTable table = getTable(tableName);
793 Get get = new Get(getBytes(row));
794 addAttributes(get, attributes);
795 if (qualifier == null) {
796 get.addFamily(family);
797 } else {
798 get.addColumn(family, qualifier);
799 }
800 Result result = table.get(get);
801 return ThriftUtilities.cellFromHBase(result.rawCells());
802 } catch (IOException e) {
803 LOG.warn(e.getMessage(), e);
804 throw new IOError(e.getMessage());
805 }
806 }
807
808 @Deprecated
809 @Override
810 public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
811 int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
812 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
813 if(famAndQf.length == 1) {
814 return getVer(tableName, row, famAndQf[0], null, numVersions, attributes);
815 }
816 if (famAndQf.length == 2) {
817 return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes);
818 }
819 throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
820
821 }
822
823
824
825
826
827
828
829
830
831 public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
832 byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
833 try {
834 HTable table = getTable(tableName);
835 Get get = new Get(getBytes(row));
836 addAttributes(get, attributes);
837 if (null == qualifier) {
838 get.addFamily(family);
839 } else {
840 get.addColumn(family, qualifier);
841 }
842 get.setMaxVersions(numVersions);
843 Result result = table.get(get);
844 return ThriftUtilities.cellFromHBase(result.rawCells());
845 } catch (IOException e) {
846 LOG.warn(e.getMessage(), e);
847 throw new IOError(e.getMessage());
848 }
849 }
850
851 @Deprecated
852 @Override
853 public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
854 long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
855 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
856 if (famAndQf.length == 1) {
857 return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes);
858 }
859 if (famAndQf.length == 2) {
860 return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions,
861 attributes);
862 }
863 throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
864 }
865
866
867
868
869
870
871
872
873 protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
874 byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
875 throws IOError {
876 try {
877 HTable table = getTable(tableName);
878 Get get = new Get(getBytes(row));
879 addAttributes(get, attributes);
880 if (null == qualifier) {
881 get.addFamily(family);
882 } else {
883 get.addColumn(family, qualifier);
884 }
885 get.setTimeRange(0, timestamp);
886 get.setMaxVersions(numVersions);
887 Result result = table.get(get);
888 return ThriftUtilities.cellFromHBase(result.rawCells());
889 } catch (IOException e) {
890 LOG.warn(e.getMessage(), e);
891 throw new IOError(e.getMessage());
892 }
893 }
894
895 @Override
896 public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
897 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
898 return getRowWithColumnsTs(tableName, row, null,
899 HConstants.LATEST_TIMESTAMP,
900 attributes);
901 }
902
903 @Override
904 public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
905 ByteBuffer row,
906 List<ByteBuffer> columns,
907 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
908 return getRowWithColumnsTs(tableName, row, columns,
909 HConstants.LATEST_TIMESTAMP,
910 attributes);
911 }
912
913 @Override
914 public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
915 long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
916 return getRowWithColumnsTs(tableName, row, null,
917 timestamp, attributes);
918 }
919
920 @Override
921 public List<TRowResult> getRowWithColumnsTs(
922 ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
923 long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
924 try {
925 HTable table = getTable(tableName);
926 if (columns == null) {
927 Get get = new Get(getBytes(row));
928 addAttributes(get, attributes);
929 get.setTimeRange(0, timestamp);
930 Result result = table.get(get);
931 return ThriftUtilities.rowResultFromHBase(result);
932 }
933 Get get = new Get(getBytes(row));
934 addAttributes(get, attributes);
935 for(ByteBuffer column : columns) {
936 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
937 if (famAndQf.length == 1) {
938 get.addFamily(famAndQf[0]);
939 } else {
940 get.addColumn(famAndQf[0], famAndQf[1]);
941 }
942 }
943 get.setTimeRange(0, timestamp);
944 Result result = table.get(get);
945 return ThriftUtilities.rowResultFromHBase(result);
946 } catch (IOException e) {
947 LOG.warn(e.getMessage(), e);
948 throw new IOError(e.getMessage());
949 }
950 }
951
952 @Override
953 public List<TRowResult> getRows(ByteBuffer tableName,
954 List<ByteBuffer> rows,
955 Map<ByteBuffer, ByteBuffer> attributes)
956 throws IOError {
957 return getRowsWithColumnsTs(tableName, rows, null,
958 HConstants.LATEST_TIMESTAMP,
959 attributes);
960 }
961
962 @Override
963 public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
964 List<ByteBuffer> rows,
965 List<ByteBuffer> columns,
966 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
967 return getRowsWithColumnsTs(tableName, rows, columns,
968 HConstants.LATEST_TIMESTAMP,
969 attributes);
970 }
971
972 @Override
973 public List<TRowResult> getRowsTs(ByteBuffer tableName,
974 List<ByteBuffer> rows,
975 long timestamp,
976 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
977 return getRowsWithColumnsTs(tableName, rows, null,
978 timestamp, attributes);
979 }
980
981 @Override
982 public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
983 List<ByteBuffer> rows,
984 List<ByteBuffer> columns, long timestamp,
985 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
986 try {
987 List<Get> gets = new ArrayList<Get>(rows.size());
988 HTable table = getTable(tableName);
989 if (metrics != null) {
990 metrics.incNumRowKeysInBatchGet(rows.size());
991 }
992 for (ByteBuffer row : rows) {
993 Get get = new Get(getBytes(row));
994 addAttributes(get, attributes);
995 if (columns != null) {
996
997 for(ByteBuffer column : columns) {
998 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
999 if (famAndQf.length == 1) {
1000 get.addFamily(famAndQf[0]);
1001 } else {
1002 get.addColumn(famAndQf[0], famAndQf[1]);
1003 }
1004 }
1005 }
1006 get.setTimeRange(0, timestamp);
1007 gets.add(get);
1008 }
1009 Result[] result = table.get(gets);
1010 return ThriftUtilities.rowResultFromHBase(result);
1011 } catch (IOException e) {
1012 LOG.warn(e.getMessage(), e);
1013 throw new IOError(e.getMessage());
1014 }
1015 }
1016
1017 @Override
1018 public void deleteAll(
1019 ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1020 Map<ByteBuffer, ByteBuffer> attributes)
1021 throws IOError {
1022 deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP,
1023 attributes);
1024 }
1025
1026 @Override
1027 public void deleteAllTs(ByteBuffer tableName,
1028 ByteBuffer row,
1029 ByteBuffer column,
1030 long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1031 try {
1032 HTable table = getTable(tableName);
1033 Delete delete = new Delete(getBytes(row));
1034 addAttributes(delete, attributes);
1035 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1036 if (famAndQf.length == 1) {
1037 delete.deleteFamily(famAndQf[0], timestamp);
1038 } else {
1039 delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1040 }
1041 table.delete(delete);
1042
1043 } catch (IOException e) {
1044 LOG.warn(e.getMessage(), e);
1045 throw new IOError(e.getMessage());
1046 }
1047 }
1048
1049 @Override
1050 public void deleteAllRow(
1051 ByteBuffer tableName, ByteBuffer row,
1052 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1053 deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
1054 }
1055
1056 @Override
1057 public void deleteAllRowTs(
1058 ByteBuffer tableName, ByteBuffer row, long timestamp,
1059 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1060 try {
1061 HTable table = getTable(tableName);
1062 Delete delete = new Delete(getBytes(row), timestamp);
1063 addAttributes(delete, attributes);
1064 table.delete(delete);
1065 } catch (IOException e) {
1066 LOG.warn(e.getMessage(), e);
1067 throw new IOError(e.getMessage());
1068 }
1069 }
1070
1071 @Override
1072 public void createTable(ByteBuffer in_tableName,
1073 List<ColumnDescriptor> columnFamilies) throws IOError,
1074 IllegalArgument, AlreadyExists {
1075 byte [] tableName = getBytes(in_tableName);
1076 try {
1077 if (getHBaseAdmin().tableExists(tableName)) {
1078 throw new AlreadyExists("table name already in use");
1079 }
1080 HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
1081 for (ColumnDescriptor col : columnFamilies) {
1082 HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
1083 desc.addFamily(colDesc);
1084 }
1085 getHBaseAdmin().createTable(desc);
1086 } catch (IOException e) {
1087 LOG.warn(e.getMessage(), e);
1088 throw new IOError(e.getMessage());
1089 } catch (IllegalArgumentException e) {
1090 LOG.warn(e.getMessage(), e);
1091 throw new IllegalArgument(e.getMessage());
1092 }
1093 }
1094
1095 @Override
1096 public void deleteTable(ByteBuffer in_tableName) throws IOError {
1097 byte [] tableName = getBytes(in_tableName);
1098 if (LOG.isDebugEnabled()) {
1099 LOG.debug("deleteTable: table=" + Bytes.toString(tableName));
1100 }
1101 try {
1102 if (!getHBaseAdmin().tableExists(tableName)) {
1103 throw new IOException("table does not exist");
1104 }
1105 getHBaseAdmin().deleteTable(tableName);
1106 } catch (IOException e) {
1107 LOG.warn(e.getMessage(), e);
1108 throw new IOError(e.getMessage());
1109 }
1110 }
1111
1112 @Override
1113 public void mutateRow(ByteBuffer tableName, ByteBuffer row,
1114 List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
1115 throws IOError, IllegalArgument {
1116 mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP,
1117 attributes);
1118 }
1119
1120 @Override
1121 public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
1122 List<Mutation> mutations, long timestamp,
1123 Map<ByteBuffer, ByteBuffer> attributes)
1124 throws IOError, IllegalArgument {
1125 HTable table = null;
1126 try {
1127 table = getTable(tableName);
1128 Put put = new Put(getBytes(row), timestamp);
1129 addAttributes(put, attributes);
1130
1131 Delete delete = new Delete(getBytes(row));
1132 addAttributes(delete, attributes);
1133 if (metrics != null) {
1134 metrics.incNumRowKeysInBatchMutate(mutations.size());
1135 }
1136
1137
1138 for (Mutation m : mutations) {
1139 byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
1140 if (m.isDelete) {
1141 if (famAndQf.length == 1) {
1142 delete.deleteFamily(famAndQf[0], timestamp);
1143 } else {
1144 delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1145 }
1146 delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
1147 : Durability.SKIP_WAL);
1148 } else {
1149 if(famAndQf.length == 1) {
1150 LOG.warn("No column qualifier specified. Delete is the only mutation supported "
1151 + "over the whole column family.");
1152 } else {
1153 put.addImmutable(famAndQf[0], famAndQf[1],
1154 m.value != null ? getBytes(m.value)
1155 : HConstants.EMPTY_BYTE_ARRAY);
1156 }
1157 put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1158 }
1159 }
1160 if (!delete.isEmpty())
1161 table.delete(delete);
1162 if (!put.isEmpty())
1163 table.put(put);
1164 } catch (IOException e) {
1165 LOG.warn(e.getMessage(), e);
1166 throw new IOError(e.getMessage());
1167 } catch (IllegalArgumentException e) {
1168 LOG.warn(e.getMessage(), e);
1169 throw new IllegalArgument(e.getMessage());
1170 }
1171 }
1172
1173 @Override
1174 public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
1175 Map<ByteBuffer, ByteBuffer> attributes)
1176 throws IOError, IllegalArgument, TException {
1177 mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
1178 }
1179
1180 @Override
1181 public void mutateRowsTs(
1182 ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
1183 Map<ByteBuffer, ByteBuffer> attributes)
1184 throws IOError, IllegalArgument, TException {
1185 List<Put> puts = new ArrayList<Put>();
1186 List<Delete> deletes = new ArrayList<Delete>();
1187
1188 for (BatchMutation batch : rowBatches) {
1189 byte[] row = getBytes(batch.row);
1190 List<Mutation> mutations = batch.mutations;
1191 Delete delete = new Delete(row);
1192 addAttributes(delete, attributes);
1193 Put put = new Put(row, timestamp);
1194 addAttributes(put, attributes);
1195 for (Mutation m : mutations) {
1196 byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
1197 if (m.isDelete) {
1198
1199 if (famAndQf.length == 1) {
1200 delete.deleteFamily(famAndQf[0], timestamp);
1201 } else {
1202 delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1203 }
1204 delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
1205 : Durability.SKIP_WAL);
1206 } else {
1207 if (famAndQf.length == 1) {
1208 LOG.warn("No column qualifier specified. Delete is the only mutation supported "
1209 + "over the whole column family.");
1210 }
1211 if (famAndQf.length == 2) {
1212 put.addImmutable(famAndQf[0], famAndQf[1],
1213 m.value != null ? getBytes(m.value)
1214 : HConstants.EMPTY_BYTE_ARRAY);
1215 } else {
1216 throw new IllegalArgumentException("Invalid famAndQf provided.");
1217 }
1218 put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1219 }
1220 }
1221 if (!delete.isEmpty())
1222 deletes.add(delete);
1223 if (!put.isEmpty())
1224 puts.add(put);
1225 }
1226
1227 HTable table = null;
1228 try {
1229 table = getTable(tableName);
1230 if (!puts.isEmpty())
1231 table.put(puts);
1232 if (!deletes.isEmpty())
1233 table.delete(deletes);
1234
1235 } catch (IOException e) {
1236 LOG.warn(e.getMessage(), e);
1237 throw new IOError(e.getMessage());
1238 } catch (IllegalArgumentException e) {
1239 LOG.warn(e.getMessage(), e);
1240 throw new IllegalArgument(e.getMessage());
1241 }
1242 }
1243
1244 @Deprecated
1245 @Override
1246 public long atomicIncrement(
1247 ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
1248 throws IOError, IllegalArgument, TException {
1249 byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1250 if(famAndQf.length == 1) {
1251 return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount);
1252 }
1253 return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
1254 }
1255
1256 protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
1257 byte [] family, byte [] qualifier, long amount)
1258 throws IOError, IllegalArgument, TException {
1259 HTable table;
1260 try {
1261 table = getTable(tableName);
1262 return table.incrementColumnValue(
1263 getBytes(row), family, qualifier, amount);
1264 } catch (IOException e) {
1265 LOG.warn(e.getMessage(), e);
1266 throw new IOError(e.getMessage());
1267 }
1268 }
1269
1270 @Override
1271 public void scannerClose(int id) throws IOError, IllegalArgument {
1272 LOG.debug("scannerClose: id=" + id);
1273 ResultScannerWrapper resultScannerWrapper = getScanner(id);
1274 if (resultScannerWrapper == null) {
1275 String message = "scanner ID is invalid";
1276 LOG.warn(message);
1277 throw new IllegalArgument("scanner ID is invalid");
1278 }
1279 resultScannerWrapper.getScanner().close();
1280 removeScanner(id);
1281 }
1282
1283 @Override
1284 public List<TRowResult> scannerGetList(int id,int nbRows)
1285 throws IllegalArgument, IOError {
1286 LOG.debug("scannerGetList: id=" + id);
1287 ResultScannerWrapper resultScannerWrapper = getScanner(id);
1288 if (null == resultScannerWrapper) {
1289 String message = "scanner ID is invalid";
1290 LOG.warn(message);
1291 throw new IllegalArgument("scanner ID is invalid");
1292 }
1293
1294 Result [] results = null;
1295 try {
1296 results = resultScannerWrapper.getScanner().next(nbRows);
1297 if (null == results) {
1298 return new ArrayList<TRowResult>();
1299 }
1300 } catch (IOException e) {
1301 LOG.warn(e.getMessage(), e);
1302 throw new IOError(e.getMessage());
1303 }
1304 return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted());
1305 }
1306
1307 @Override
1308 public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
1309 return scannerGetList(id,1);
1310 }
1311
1312 @Override
1313 public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
1314 Map<ByteBuffer, ByteBuffer> attributes)
1315 throws IOError {
1316 try {
1317 HTable table = getTable(tableName);
1318 Scan scan = new Scan();
1319 addAttributes(scan, attributes);
1320 if (tScan.isSetStartRow()) {
1321 scan.setStartRow(tScan.getStartRow());
1322 }
1323 if (tScan.isSetStopRow()) {
1324 scan.setStopRow(tScan.getStopRow());
1325 }
1326 if (tScan.isSetTimestamp()) {
1327 scan.setTimeRange(0, tScan.getTimestamp());
1328 }
1329 if (tScan.isSetCaching()) {
1330 scan.setCaching(tScan.getCaching());
1331 }
1332 if (tScan.isSetBatchSize()) {
1333 scan.setBatch(tScan.getBatchSize());
1334 }
1335 if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
1336 for(ByteBuffer column : tScan.getColumns()) {
1337 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1338 if(famQf.length == 1) {
1339 scan.addFamily(famQf[0]);
1340 } else {
1341 scan.addColumn(famQf[0], famQf[1]);
1342 }
1343 }
1344 }
1345 if (tScan.isSetFilterString()) {
1346 ParseFilter parseFilter = new ParseFilter();
1347 scan.setFilter(
1348 parseFilter.parseFilterString(tScan.getFilterString()));
1349 }
1350 return addScanner(table.getScanner(scan), tScan.sortColumns);
1351 } catch (IOException e) {
1352 LOG.warn(e.getMessage(), e);
1353 throw new IOError(e.getMessage());
1354 }
1355 }
1356
1357 @Override
1358 public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
1359 List<ByteBuffer> columns,
1360 Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1361 try {
1362 HTable table = getTable(tableName);
1363 Scan scan = new Scan(getBytes(startRow));
1364 addAttributes(scan, attributes);
1365 if(columns != null && columns.size() != 0) {
1366 for(ByteBuffer column : columns) {
1367 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1368 if(famQf.length == 1) {
1369 scan.addFamily(famQf[0]);
1370 } else {
1371 scan.addColumn(famQf[0], famQf[1]);
1372 }
1373 }
1374 }
1375 return addScanner(table.getScanner(scan), false);
1376 } catch (IOException e) {
1377 LOG.warn(e.getMessage(), e);
1378 throw new IOError(e.getMessage());
1379 }
1380 }
1381
1382 @Override
1383 public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
1384 ByteBuffer stopRow, List<ByteBuffer> columns,
1385 Map<ByteBuffer, ByteBuffer> attributes)
1386 throws IOError, TException {
1387 try {
1388 HTable table = getTable(tableName);
1389 Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1390 addAttributes(scan, attributes);
1391 if(columns != null && columns.size() != 0) {
1392 for(ByteBuffer column : columns) {
1393 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1394 if(famQf.length == 1) {
1395 scan.addFamily(famQf[0]);
1396 } else {
1397 scan.addColumn(famQf[0], famQf[1]);
1398 }
1399 }
1400 }
1401 return addScanner(table.getScanner(scan), false);
1402 } catch (IOException e) {
1403 LOG.warn(e.getMessage(), e);
1404 throw new IOError(e.getMessage());
1405 }
1406 }
1407
1408 @Override
1409 public int scannerOpenWithPrefix(ByteBuffer tableName,
1410 ByteBuffer startAndPrefix,
1411 List<ByteBuffer> columns,
1412 Map<ByteBuffer, ByteBuffer> attributes)
1413 throws IOError, TException {
1414 try {
1415 HTable table = getTable(tableName);
1416 Scan scan = new Scan(getBytes(startAndPrefix));
1417 addAttributes(scan, attributes);
1418 Filter f = new WhileMatchFilter(
1419 new PrefixFilter(getBytes(startAndPrefix)));
1420 scan.setFilter(f);
1421 if (columns != null && columns.size() != 0) {
1422 for(ByteBuffer column : columns) {
1423 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1424 if(famQf.length == 1) {
1425 scan.addFamily(famQf[0]);
1426 } else {
1427 scan.addColumn(famQf[0], famQf[1]);
1428 }
1429 }
1430 }
1431 return addScanner(table.getScanner(scan), false);
1432 } catch (IOException e) {
1433 LOG.warn(e.getMessage(), e);
1434 throw new IOError(e.getMessage());
1435 }
1436 }
1437
1438 @Override
1439 public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
1440 List<ByteBuffer> columns, long timestamp,
1441 Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
1442 try {
1443 HTable table = getTable(tableName);
1444 Scan scan = new Scan(getBytes(startRow));
1445 addAttributes(scan, attributes);
1446 scan.setTimeRange(0, timestamp);
1447 if (columns != null && columns.size() != 0) {
1448 for (ByteBuffer column : columns) {
1449 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1450 if(famQf.length == 1) {
1451 scan.addFamily(famQf[0]);
1452 } else {
1453 scan.addColumn(famQf[0], famQf[1]);
1454 }
1455 }
1456 }
1457 return addScanner(table.getScanner(scan), false);
1458 } catch (IOException e) {
1459 LOG.warn(e.getMessage(), e);
1460 throw new IOError(e.getMessage());
1461 }
1462 }
1463
1464 @Override
1465 public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
1466 ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
1467 Map<ByteBuffer, ByteBuffer> attributes)
1468 throws IOError, TException {
1469 try {
1470 HTable table = getTable(tableName);
1471 Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1472 addAttributes(scan, attributes);
1473 scan.setTimeRange(0, timestamp);
1474 if (columns != null && columns.size() != 0) {
1475 for (ByteBuffer column : columns) {
1476 byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1477 if(famQf.length == 1) {
1478 scan.addFamily(famQf[0]);
1479 } else {
1480 scan.addColumn(famQf[0], famQf[1]);
1481 }
1482 }
1483 }
1484 scan.setTimeRange(0, timestamp);
1485 return addScanner(table.getScanner(scan), false);
1486 } catch (IOException e) {
1487 LOG.warn(e.getMessage(), e);
1488 throw new IOError(e.getMessage());
1489 }
1490 }
1491
1492 @Override
1493 public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
1494 ByteBuffer tableName) throws IOError, TException {
1495 try {
1496 TreeMap<ByteBuffer, ColumnDescriptor> columns =
1497 new TreeMap<ByteBuffer, ColumnDescriptor>();
1498
1499 HTable table = getTable(tableName);
1500 HTableDescriptor desc = table.getTableDescriptor();
1501
1502 for (HColumnDescriptor e : desc.getFamilies()) {
1503 ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
1504 columns.put(col.name, col);
1505 }
1506 return columns;
1507 } catch (IOException e) {
1508 LOG.warn(e.getMessage(), e);
1509 throw new IOError(e.getMessage());
1510 }
1511 }
1512
1513 @Override
1514 public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
1515 ByteBuffer family) throws IOError {
1516 try {
1517 HTable table = getTable(getBytes(tableName));
1518 Result result = table.getRowOrBefore(getBytes(row), getBytes(family));
1519 return ThriftUtilities.cellFromHBase(result.rawCells());
1520 } catch (IOException e) {
1521 LOG.warn(e.getMessage(), e);
1522 throw new IOError(e.getMessage());
1523 }
1524 }
1525
1526 @Override
1527 public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
1528 try {
1529 HTable table = getTable(TableName.META_TABLE_NAME.getName());
1530 byte[] row = getBytes(searchRow);
1531 Result startRowResult = table.getRowOrBefore(
1532 row, HConstants.CATALOG_FAMILY);
1533
1534 if (startRowResult == null) {
1535 throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row="
1536 + Bytes.toStringBinary(row));
1537 }
1538
1539
1540 HRegionInfo regionInfo = HRegionInfo.getHRegionInfo(startRowResult);
1541 if (regionInfo == null) {
1542 throw new IOException("HRegionInfo REGIONINFO was null or " +
1543 " empty in Meta for row="
1544 + Bytes.toStringBinary(row));
1545 }
1546 TRegionInfo region = new TRegionInfo();
1547 region.setStartKey(regionInfo.getStartKey());
1548 region.setEndKey(regionInfo.getEndKey());
1549 region.id = regionInfo.getRegionId();
1550 region.setName(regionInfo.getRegionName());
1551 region.version = regionInfo.getVersion();
1552
1553
1554 ServerName serverName = HRegionInfo.getServerName(startRowResult);
1555 if (serverName != null) {
1556 region.setServerName(Bytes.toBytes(serverName.getHostname()));
1557 region.port = serverName.getPort();
1558 }
1559 return region;
1560 } catch (IOException e) {
1561 LOG.warn(e.getMessage(), e);
1562 throw new IOError(e.getMessage());
1563 }
1564 }
1565
1566 private void initMetrics(ThriftMetrics metrics) {
1567 this.metrics = metrics;
1568 }
1569
1570 @Override
1571 public void increment(TIncrement tincrement) throws IOError, TException {
1572
1573 if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
1574 throw new TException("Must supply a table and a row key; can't increment");
1575 }
1576
1577 if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1578 this.coalescer.queueIncrement(tincrement);
1579 return;
1580 }
1581
1582 try {
1583 HTable table = getTable(tincrement.getTable());
1584 Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
1585 table.increment(inc);
1586 } catch (IOException e) {
1587 LOG.warn(e.getMessage(), e);
1588 throw new IOError(e.getMessage());
1589 }
1590 }
1591
1592 @Override
1593 public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
1594 if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1595 this.coalescer.queueIncrements(tincrements);
1596 return;
1597 }
1598 for (TIncrement tinc : tincrements) {
1599 increment(tinc);
1600 }
1601 }
1602 }
1603
1604
1605
1606
1607
1608
1609 private static void addAttributes(OperationWithAttributes op,
1610 Map<ByteBuffer, ByteBuffer> attributes) {
1611 if (attributes == null || attributes.size() == 0) {
1612 return;
1613 }
1614 for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
1615 String name = Bytes.toStringBinary(getBytes(entry.getKey()));
1616 byte[] value = getBytes(entry.getValue());
1617 op.setAttribute(name, value);
1618 }
1619 }
1620
1621 public static void registerFilters(Configuration conf) {
1622 String[] filters = conf.getStrings("hbase.thrift.filters");
1623 if(filters != null) {
1624 for(String filterClass: filters) {
1625 String[] filterPart = filterClass.split(":");
1626 if(filterPart.length != 2) {
1627 LOG.warn("Invalid filter specification " + filterClass + " - skipping");
1628 } else {
1629 ParseFilter.registerFilter(filterPart[0], filterPart[1]);
1630 }
1631 }
1632 }
1633 }
1634 }