View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.thrift;
20  
21  import static org.apache.hadoop.hbase.util.Bytes.getBytes;
22  
23  import java.io.IOException;
24  import java.net.InetAddress;
25  import java.net.InetSocketAddress;
26  import java.net.UnknownHostException;
27  import java.nio.ByteBuffer;
28  import java.security.PrivilegedAction;
29  import java.util.ArrayList;
30  import java.util.Arrays;
31  import java.util.Collections;
32  import java.util.HashMap;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.TreeMap;
36  import java.util.concurrent.BlockingQueue;
37  import java.util.concurrent.ExecutorService;
38  import java.util.concurrent.LinkedBlockingQueue;
39  import java.util.concurrent.ThreadPoolExecutor;
40  import java.util.concurrent.TimeUnit;
41  
42  import javax.security.auth.callback.Callback;
43  import javax.security.auth.callback.UnsupportedCallbackException;
44  import javax.security.sasl.AuthorizeCallback;
45  import javax.security.sasl.Sasl;
46  import javax.security.sasl.SaslServer;
47  
48  import org.apache.commons.cli.CommandLine;
49  import org.apache.commons.cli.Option;
50  import org.apache.commons.cli.OptionGroup;
51  import org.apache.commons.logging.Log;
52  import org.apache.commons.logging.LogFactory;
53  import org.apache.hadoop.classification.InterfaceAudience;
54  import org.apache.hadoop.conf.Configuration;
55  import org.apache.hadoop.hbase.HBaseConfiguration;
56  import org.apache.hadoop.hbase.HColumnDescriptor;
57  import org.apache.hadoop.hbase.HConstants;
58  import org.apache.hadoop.hbase.HRegionInfo;
59  import org.apache.hadoop.hbase.HTableDescriptor;
60  import org.apache.hadoop.hbase.KeyValue;
61  import org.apache.hadoop.hbase.ServerName;
62  import org.apache.hadoop.hbase.TableName;
63  import org.apache.hadoop.hbase.TableNotFoundException;
64  import org.apache.hadoop.hbase.client.Delete;
65  import org.apache.hadoop.hbase.client.Durability;
66  import org.apache.hadoop.hbase.client.Get;
67  import org.apache.hadoop.hbase.client.HBaseAdmin;
68  import org.apache.hadoop.hbase.client.HTable;
69  import org.apache.hadoop.hbase.client.Increment;
70  import org.apache.hadoop.hbase.client.OperationWithAttributes;
71  import org.apache.hadoop.hbase.client.Put;
72  import org.apache.hadoop.hbase.client.Result;
73  import org.apache.hadoop.hbase.client.ResultScanner;
74  import org.apache.hadoop.hbase.client.Scan;
75  import org.apache.hadoop.hbase.filter.Filter;
76  import org.apache.hadoop.hbase.filter.ParseFilter;
77  import org.apache.hadoop.hbase.filter.PrefixFilter;
78  import org.apache.hadoop.hbase.filter.WhileMatchFilter;
79  import org.apache.hadoop.hbase.security.SecurityUtil;
80  import org.apache.hadoop.hbase.security.UserProvider;
81  import org.apache.hadoop.hbase.thrift.CallQueue.Call;
82  import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
83  import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
84  import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
85  import org.apache.hadoop.hbase.thrift.generated.Hbase;
86  import org.apache.hadoop.hbase.thrift.generated.IOError;
87  import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
88  import org.apache.hadoop.hbase.thrift.generated.Mutation;
89  import org.apache.hadoop.hbase.thrift.generated.TCell;
90  import org.apache.hadoop.hbase.thrift.generated.TIncrement;
91  import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
92  import org.apache.hadoop.hbase.thrift.generated.TRowResult;
93  import org.apache.hadoop.hbase.thrift.generated.TScan;
94  import org.apache.hadoop.hbase.util.Bytes;
95  import org.apache.hadoop.hbase.util.ConnectionCache;
96  import org.apache.hadoop.hbase.util.Strings;
97  import org.apache.hadoop.net.DNS;
98  import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
99  import org.apache.hadoop.security.UserGroupInformation;
100 import org.apache.thrift.TException;
101 import org.apache.thrift.TProcessor;
102 import org.apache.thrift.protocol.TBinaryProtocol;
103 import org.apache.thrift.protocol.TCompactProtocol;
104 import org.apache.thrift.protocol.TProtocol;
105 import org.apache.thrift.protocol.TProtocolFactory;
106 import org.apache.thrift.server.THsHaServer;
107 import org.apache.thrift.server.TNonblockingServer;
108 import org.apache.thrift.server.TServer;
109 import org.apache.thrift.server.TThreadedSelectorServer;
110 import org.apache.thrift.transport.TFramedTransport;
111 import org.apache.thrift.transport.TNonblockingServerSocket;
112 import org.apache.thrift.transport.TNonblockingServerTransport;
113 import org.apache.thrift.transport.TSaslServerTransport;
114 import org.apache.thrift.transport.TServerSocket;
115 import org.apache.thrift.transport.TServerTransport;
116 import org.apache.thrift.transport.TTransportFactory;
117 
118 import com.google.common.base.Joiner;
119 import com.google.common.util.concurrent.ThreadFactoryBuilder;
120 
121 /**
122  * ThriftServerRunner - this class starts up a Thrift server which implements
123  * the Hbase API specified in the Hbase.thrift IDL file.
124  */
125 @InterfaceAudience.Private
126 @SuppressWarnings("deprecation")
127 public class ThriftServerRunner implements Runnable {
128 
  private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class);

  /** Selects the server implementation; holds one of the {@code ImplType} option strings. */
  static final String SERVER_TYPE_CONF_KEY =
      "hbase.regionserver.thrift.server.type";

  /** Address to bind the listening socket to; falls back to {@link #DEFAULT_BIND_ADDR}. */
  static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
  /** Boolean; when true the compact protocol is used instead of the binary one. */
  static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
  /** Boolean; when true the framed transport is used. */
  static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
  /** Maximum frame size for the framed transport, expressed in megabytes (default 2). */
  static final String MAX_FRAME_SIZE_CONF_KEY = "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
  /** Port to listen on; falls back to {@link #DEFAULT_LISTEN_PORT}. */
  static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
  // NOTE(review): not read anywhere in this part of the file; presumably consumed by the
  // increment coalescing code - confirm.
  static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";

  /**
   * Thrift quality of protection configuration key. Valid values can be:
   * auth-conf: authentication, integrity and confidentiality checking
   * auth-int: authentication and integrity checking
   * auth: authentication only
   *
   * This is used to authenticate the callers and support impersonation.
   * The thrift server and the HBase cluster must run in secure mode.
   */
  static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";

  private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
  public static final int DEFAULT_LISTEN_PORT = 9090;
  /** Listening port, resolved once in the constructor from {@link #PORT_CONF_KEY}. */
  private final int listenPort;

  private Configuration conf;
  /** The running server; assigned by setupServer() and cleared by shutdown(). */
  volatile TServer tserver;
  /** Metrics-recording proxy around {@link #hbaseHandler}; this is what the processor calls. */
  private final Hbase.Iface handler;
  private final ThriftMetrics metrics;
  private final HBaseHandler hbaseHandler;
  /** The user the server logged in as; run() executes inside this user's doAs. */
  private final UserGroupInformation realUser;

  /** Value of {@link #THRIFT_QOP_KEY}; null means SASL authentication is not configured. */
  private final String qop;
  /** Local host name; only populated when security is enabled. */
  private String host;
165 
166   /** An enum of server implementation selections */
167   enum ImplType {
168     HS_HA("hsha", true, THsHaServer.class, true),
169     NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
170     THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
171     THREADED_SELECTOR(
172         "threadedselector", true, TThreadedSelectorServer.class, true);
173 
174     public static final ImplType DEFAULT = THREAD_POOL;
175 
176     final String option;
177     final boolean isAlwaysFramed;
178     final Class<? extends TServer> serverClass;
179     final boolean canSpecifyBindIP;
180 
181     ImplType(String option, boolean isAlwaysFramed,
182         Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
183       this.option = option;
184       this.isAlwaysFramed = isAlwaysFramed;
185       this.serverClass = serverClass;
186       this.canSpecifyBindIP = canSpecifyBindIP;
187     }
188 
189     /**
190      * @return <code>-option</code> so we can get the list of options from
191      *         {@link #values()}
192      */
193     @Override
194     public String toString() {
195       return "-" + option;
196     }
197 
198     String getDescription() {
199       StringBuilder sb = new StringBuilder("Use the " +
200           serverClass.getSimpleName());
201       if (isAlwaysFramed) {
202         sb.append(" This implies the framed transport.");
203       }
204       if (this == DEFAULT) {
205         sb.append("This is the default.");
206       }
207       return sb.toString();
208     }
209 
210     static OptionGroup createOptionGroup() {
211       OptionGroup group = new OptionGroup();
212       for (ImplType t : values()) {
213         group.addOption(new Option(t.option, t.getDescription()));
214       }
215       return group;
216     }
217 
218     static ImplType getServerImpl(Configuration conf) {
219       String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
220       for (ImplType t : values()) {
221         if (confType.equals(t.option)) {
222           return t;
223         }
224       }
225       throw new AssertionError("Unknown server ImplType.option:" + confType);
226     }
227 
228     static void setServerImpl(CommandLine cmd, Configuration conf) {
229       ImplType chosenType = null;
230       int numChosen = 0;
231       for (ImplType t : values()) {
232         if (cmd.hasOption(t.option)) {
233           chosenType = t;
234           ++numChosen;
235         }
236       }
237       if (numChosen < 1) {
238         LOG.info("Using default thrift server type");
239         chosenType = DEFAULT;
240       } else if (numChosen > 1) {
241         throw new AssertionError("Exactly one option out of " +
242           Arrays.toString(values()) + " has to be specified");
243       }
244       LOG.info("Using thrift server type " + chosenType.option);
245       conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
246     }
247 
248     public String simpleClassName() {
249       return serverClass.getSimpleName();
250     }
251 
252     public static List<String> serversThatCannotSpecifyBindIP() {
253       List<String> l = new ArrayList<String>();
254       for (ImplType t : values()) {
255         if (!t.canSpecifyBindIP) {
256           l.add(t.simpleClassName());
257         }
258       }
259       return l;
260     }
261 
262   }
263 
264   public ThriftServerRunner(Configuration conf) throws IOException {
265     UserProvider userProvider = UserProvider.instantiate(conf);
266     // login the server principal (if using secure Hadoop)
267     boolean securityEnabled = userProvider.isHadoopSecurityEnabled()
268       && userProvider.isHBaseSecurityEnabled();
269     if (securityEnabled) {
270       host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
271         conf.get("hbase.thrift.dns.interface", "default"),
272         conf.get("hbase.thrift.dns.nameserver", "default")));
273       userProvider.login("hbase.thrift.keytab.file",
274         "hbase.thrift.kerberos.principal", host);
275     }
276     this.conf = HBaseConfiguration.create(conf);
277     this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
278     this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
279     this.hbaseHandler = new HBaseHandler(conf, userProvider);
280     this.hbaseHandler.initMetrics(metrics);
281     this.handler = HbaseHandlerMetricsProxy.newInstance(
282       hbaseHandler, metrics, conf);
283     this.realUser = userProvider.getCurrent().getUGI();
284     qop = conf.get(THRIFT_QOP_KEY);
285     if (qop != null) {
286       if (!qop.equals("auth") && !qop.equals("auth-int")
287           && !qop.equals("auth-conf")) {
288         throw new IOException("Invalid " + THRIFT_QOP_KEY + ": " + qop
289           + ", it must be 'auth', 'auth-int', or 'auth-conf'");
290       }
291       if (!securityEnabled) {
292         throw new IOException("Thrift server must"
293           + " run in secure mode to support authentication");
294       }
295     }
296   }
297 
298   /*
299    * Runs the Thrift server
300    */
301   @Override
302   public void run() {
303     realUser.doAs(
304       new PrivilegedAction<Object>() {
305         @Override
306         public Object run() {
307           try {
308             setupServer();
309             tserver.serve();
310           } catch (Exception e) {
311             LOG.fatal("Cannot run ThriftServer", e);
312             // Crash the process if the ThriftServer is not running
313             System.exit(-1);
314           }
315           return null;
316         }
317       });
318   }
319 
320   public void shutdown() {
321     if (tserver != null) {
322       tserver.stop();
323       tserver = null;
324     }
325   }
326 
327   /**
328    * Setting up the thrift TServer
329    */
330   private void setupServer() throws Exception {
331     // Construct correct ProtocolFactory
332     TProtocolFactory protocolFactory;
333     if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
334       LOG.debug("Using compact protocol");
335       protocolFactory = new TCompactProtocol.Factory();
336     } else {
337       LOG.debug("Using binary protocol");
338       protocolFactory = new TBinaryProtocol.Factory();
339     }
340 
341     final TProcessor p = new Hbase.Processor<Hbase.Iface>(handler);
342     ImplType implType = ImplType.getServerImpl(conf);
343     TProcessor processor = p;
344 
345     // Construct correct TransportFactory
346     TTransportFactory transportFactory;
347     if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
348       if (qop != null) {
349         throw new RuntimeException("Thrift server authentication"
350           + " doesn't work with framed transport yet");
351       }
352       transportFactory = new TFramedTransport.Factory(
353           conf.getInt(MAX_FRAME_SIZE_CONF_KEY, 2)  * 1024 * 1024);
354       LOG.debug("Using framed transport");
355     } else if (qop == null) {
356       transportFactory = new TTransportFactory();
357     } else {
358       // Extract the name from the principal
359       String name = SecurityUtil.getUserFromPrincipal(
360         conf.get("hbase.thrift.kerberos.principal"));
361       Map<String, String> saslProperties = new HashMap<String, String>();
362       saslProperties.put(Sasl.QOP, qop);
363       TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
364       saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
365         new SaslGssCallbackHandler() {
366           @Override
367           public void handle(Callback[] callbacks)
368               throws UnsupportedCallbackException {
369             AuthorizeCallback ac = null;
370             for (Callback callback : callbacks) {
371               if (callback instanceof AuthorizeCallback) {
372                 ac = (AuthorizeCallback) callback;
373               } else {
374                 throw new UnsupportedCallbackException(callback,
375                     "Unrecognized SASL GSSAPI Callback");
376               }
377             }
378             if (ac != null) {
379               String authid = ac.getAuthenticationID();
380               String authzid = ac.getAuthorizationID();
381               if (!authid.equals(authzid)) {
382                 ac.setAuthorized(false);
383               } else {
384                 ac.setAuthorized(true);
385                 String userName = SecurityUtil.getUserFromPrincipal(authzid);
386                 LOG.info("Effective user: " + userName);
387                 ac.setAuthorizedID(userName);
388               }
389             }
390           }
391         });
392       transportFactory = saslFactory;
393 
394       // Create a processor wrapper, to get the caller
395       processor = new TProcessor() {
396         @Override
397         public boolean process(TProtocol inProt,
398             TProtocol outProt) throws TException {
399           TSaslServerTransport saslServerTransport =
400             (TSaslServerTransport)inProt.getTransport();
401           SaslServer saslServer = saslServerTransport.getSaslServer();
402           String principal = saslServer.getAuthorizationID();
403           hbaseHandler.setEffectiveUser(principal);
404           return p.process(inProt, outProt);
405         }
406       };
407     }
408 
409     if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
410       LOG.error("Server types " + Joiner.on(", ").join(
411           ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
412           "address binding at the moment. See " +
413           "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
414       throw new RuntimeException(
415           "-" + BIND_CONF_KEY + " not supported with " + implType);
416     }
417 
418     if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
419         implType == ImplType.THREADED_SELECTOR) {
420 
421       InetAddress listenAddress = getBindAddress(conf);
422       TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(
423           new InetSocketAddress(listenAddress, listenPort));
424 
425       if (implType == ImplType.NONBLOCKING) {
426         TNonblockingServer.Args serverArgs =
427             new TNonblockingServer.Args(serverTransport);
428         serverArgs.processor(processor)
429                   .transportFactory(transportFactory)
430                   .protocolFactory(protocolFactory);
431         tserver = new TNonblockingServer(serverArgs);
432       } else if (implType == ImplType.HS_HA) {
433         THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
434         CallQueue callQueue =
435             new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
436         ExecutorService executorService = createExecutor(
437             callQueue, serverArgs.getWorkerThreads());
438         serverArgs.executorService(executorService)
439                   .processor(processor)
440                   .transportFactory(transportFactory)
441                   .protocolFactory(protocolFactory);
442         tserver = new THsHaServer(serverArgs);
443       } else { // THREADED_SELECTOR
444         TThreadedSelectorServer.Args serverArgs =
445             new HThreadedSelectorServerArgs(serverTransport, conf);
446         CallQueue callQueue =
447             new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
448         ExecutorService executorService = createExecutor(
449             callQueue, serverArgs.getWorkerThreads());
450         serverArgs.executorService(executorService)
451                   .processor(processor)
452                   .transportFactory(transportFactory)
453                   .protocolFactory(protocolFactory);
454         tserver = new TThreadedSelectorServer(serverArgs);
455       }
456       LOG.info("starting HBase " + implType.simpleClassName() +
457           " server on " + Integer.toString(listenPort));
458     } else if (implType == ImplType.THREAD_POOL) {
459       // Thread pool server. Get the IP address to bind to.
460       InetAddress listenAddress = getBindAddress(conf);
461 
462       TServerTransport serverTransport = new TServerSocket(
463           new InetSocketAddress(listenAddress, listenPort));
464 
465       TBoundedThreadPoolServer.Args serverArgs =
466           new TBoundedThreadPoolServer.Args(serverTransport, conf);
467       serverArgs.processor(processor)
468                 .transportFactory(transportFactory)
469                 .protocolFactory(protocolFactory);
470       LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
471           + listenAddress + ":" + Integer.toString(listenPort)
472           + "; " + serverArgs);
473       TBoundedThreadPoolServer tserver =
474           new TBoundedThreadPoolServer(serverArgs, metrics);
475       this.tserver = tserver;
476     } else {
477       throw new AssertionError("Unsupported Thrift server implementation: " +
478           implType.simpleClassName());
479     }
480 
481     // A sanity check that we instantiated the right type of server.
482     if (tserver.getClass() != implType.serverClass) {
483       throw new AssertionError("Expected to create Thrift server class " +
484           implType.serverClass.getName() + " but got " +
485           tserver.getClass().getName());
486     }
487 
488 
489 
490     registerFilters(conf);
491   }
492 
493   ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
494                                  int workerThreads) {
495     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
496     tfb.setDaemon(true);
497     tfb.setNameFormat("thrift-worker-%d");
498     return new ThreadPoolExecutor(workerThreads, workerThreads,
499             Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
500   }
501 
502   private InetAddress getBindAddress(Configuration conf)
503       throws UnknownHostException {
504     String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
505     return InetAddress.getByName(bindAddressStr);
506   }
507 
508   protected static class ResultScannerWrapper {
509 
510     private final ResultScanner scanner;
511     private final boolean sortColumns;
512     public ResultScannerWrapper(ResultScanner resultScanner,
513                                 boolean sortResultColumns) {
514       scanner = resultScanner;
515       sortColumns = sortResultColumns;
516    }
517 
518     public ResultScanner getScanner() {
519       return scanner;
520     }
521 
522     public boolean isColumnSorted() {
523       return sortColumns;
524     }
525   }
526 
527   /**
528    * The HBaseHandler is a glue object that connects Thrift RPC calls to the
529    * HBase client API primarily defined in the HBaseAdmin and HTable objects.
530    */
531   public static class HBaseHandler implements Hbase.Iface {
    protected Configuration conf;
    // Instance-level logger named after the runtime class, so subclasses log under their own name.
    protected final Log LOG = LogFactory.getLog(this.getClass().getName());

    // nextScannerId and scannerMap are used to manage scanner state
    // nextScannerId is the id handed out by the next addScanner call.
    protected int nextScannerId = 0;
    // Maps scanner id to its wrapper; created in the constructor.
    protected HashMap<Integer, ResultScannerWrapper> scannerMap = null;
    // NOTE(review): no assignment is visible in this part of the file; presumably set by
    // initMetrics - confirm.
    private ThriftMetrics metrics = null;

    // Source of table and admin handles; also receives the effective user.
    private final ConnectionCache connectionCache;

    // Per-thread cache of HTable instances keyed by table name; see getTable(byte[]).
    private static ThreadLocal<Map<String, HTable>> threadLocalTables =
        new ThreadLocal<Map<String, HTable>>() {
      @Override
      protected Map<String, HTable> initialValue() {
        return new TreeMap<String, HTable>();
      }
    };

    // Created in the constructor.
    IncrementCoalescer coalescer = null;

    // Passed to the ConnectionCache as its cleanup interval; defaults to 10 * 1000.
    static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
    // Passed to the ConnectionCache as its maximum idle time; defaults to 10 * 60 * 1000.
    static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
554 
555     /**
556      * Returns a list of all the column families for a given htable.
557      *
558      * @param table
559      * @throws IOException
560      */
561     byte[][] getAllColumns(HTable table) throws IOException {
562       HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
563       byte[][] columns = new byte[cds.length][];
564       for (int i = 0; i < cds.length; i++) {
565         columns[i] = Bytes.add(cds[i].getName(),
566             KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
567       }
568       return columns;
569     }
570 
571     /**
572      * Creates and returns an HTable instance from a given table name.
573      *
574      * @param tableName
575      *          name of table
576      * @return HTable object
577      * @throws IOException
578      * @throws IOError
579      */
580     public HTable getTable(final byte[] tableName) throws
581         IOException {
582       String table = Bytes.toString(tableName);
583       Map<String, HTable> tables = threadLocalTables.get();
584       if (!tables.containsKey(table)) {
585         tables.put(table, (HTable)connectionCache.getTable(table));
586       }
587       return tables.get(table);
588     }
589 
590     public HTable getTable(final ByteBuffer tableName) throws IOException {
591       return getTable(getBytes(tableName));
592     }
593 
594     /**
595      * Assigns a unique ID to the scanner and adds the mapping to an internal
596      * hash-map.
597      *
598      * @param scanner
599      * @return integer scanner id
600      */
601     protected synchronized int addScanner(ResultScanner scanner,boolean sortColumns) {
602       int id = nextScannerId++;
603       ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns);
604       scannerMap.put(id, resultScannerWrapper);
605       return id;
606     }
607 
608     /**
609      * Returns the scanner associated with the specified ID.
610      *
611      * @param id
612      * @return a Scanner, or null if ID was invalid.
613      */
614     protected synchronized ResultScannerWrapper getScanner(int id) {
615       return scannerMap.get(id);
616     }
617 
618     /**
619      * Removes the scanner associated with the specified ID from the internal
620      * id->scanner hash-map.
621      *
622      * @param id
623      * @return a Scanner, or null if ID was invalid.
624      */
625     protected synchronized ResultScannerWrapper removeScanner(int id) {
626       return scannerMap.remove(id);
627     }
628 
629     protected HBaseHandler(final Configuration c,
630         final UserProvider userProvider) throws IOException {
631       this.conf = c;
632       scannerMap = new HashMap<Integer, ResultScannerWrapper>();
633       this.coalescer = new IncrementCoalescer(this);
634 
635       int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
636       int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
637       connectionCache = new ConnectionCache(
638         conf, userProvider, cleanInterval, maxIdleTime);
639     }
640 
641     /**
642      * Obtain HBaseAdmin. Creates the instance if it is not already created.
643      */
644     private HBaseAdmin getHBaseAdmin() throws IOException {
645       return connectionCache.getAdmin();
646     }
647 
648     void setEffectiveUser(String effectiveUser) {
649       connectionCache.setEffectiveUser(effectiveUser);
650     }
651 
652     @Override
653     public void enableTable(ByteBuffer tableName) throws IOError {
654       try{
655         getHBaseAdmin().enableTable(getBytes(tableName));
656       } catch (IOException e) {
657         LOG.warn(e.getMessage(), e);
658         throw new IOError(e.getMessage());
659       }
660     }
661 
662     @Override
663     public void disableTable(ByteBuffer tableName) throws IOError{
664       try{
665         getHBaseAdmin().disableTable(getBytes(tableName));
666       } catch (IOException e) {
667         LOG.warn(e.getMessage(), e);
668         throw new IOError(e.getMessage());
669       }
670     }
671 
672     @Override
673     public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
674       try {
675         return HTable.isTableEnabled(this.conf, getBytes(tableName));
676       } catch (IOException e) {
677         LOG.warn(e.getMessage(), e);
678         throw new IOError(e.getMessage());
679       }
680     }
681 
682     @Override
683     public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
684       try{
685         getHBaseAdmin().compact(getBytes(tableNameOrRegionName));
686       } catch (InterruptedException e) {
687         throw new IOError(e.getMessage());
688       } catch (IOException e) {
689         LOG.warn(e.getMessage(), e);
690         throw new IOError(e.getMessage());
691       }
692     }
693 
694     @Override
695     public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
696       try{
697         getHBaseAdmin().majorCompact(getBytes(tableNameOrRegionName));
698       } catch (InterruptedException e) {
699         LOG.warn(e.getMessage(), e);
700         throw new IOError(e.getMessage());
701       } catch (IOException e) {
702         LOG.warn(e.getMessage(), e);
703         throw new IOError(e.getMessage());
704       }
705     }
706 
707     @Override
708     public List<ByteBuffer> getTableNames() throws IOError {
709       try {
710         TableName[] tableNames = this.getHBaseAdmin().listTableNames();
711         ArrayList<ByteBuffer> list = new ArrayList<ByteBuffer>(tableNames.length);
712         for (int i = 0; i < tableNames.length; i++) {
713           list.add(ByteBuffer.wrap(tableNames[i].getName()));
714         }
715         return list;
716       } catch (IOException e) {
717         LOG.warn(e.getMessage(), e);
718         throw new IOError(e.getMessage());
719       }
720     }
721 
722     /**
723      * @return the list of regions in the given table, or an empty list if the table does not exist
724      */
725     @Override
726     public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
727     throws IOError {
728       try {
729         HTable table;
730         try {
731           table = getTable(tableName);
732         } catch (TableNotFoundException ex) {
733           return new ArrayList<TRegionInfo>();
734         }
735         Map<HRegionInfo, ServerName> regionLocations =
736             table.getRegionLocations();
737         List<TRegionInfo> results = new ArrayList<TRegionInfo>();
738         for (Map.Entry<HRegionInfo, ServerName> entry :
739             regionLocations.entrySet()) {
740           HRegionInfo info = entry.getKey();
741           ServerName serverName = entry.getValue();
742           TRegionInfo region = new TRegionInfo();
743           region.serverName = ByteBuffer.wrap(
744               Bytes.toBytes(serverName.getHostname()));
745           region.port = serverName.getPort();
746           region.startKey = ByteBuffer.wrap(info.getStartKey());
747           region.endKey = ByteBuffer.wrap(info.getEndKey());
748           region.id = info.getRegionId();
749           region.name = ByteBuffer.wrap(info.getRegionName());
750           region.version = info.getVersion();
751           results.add(region);
752         }
753         return results;
754       } catch (TableNotFoundException e) {
755         // Return empty list for non-existing table
756         return Collections.emptyList();
757       } catch (IOException e){
758         LOG.warn(e.getMessage(), e);
759         throw new IOError(e.getMessage());
760       }
761     }
762 
763     @Deprecated
764     @Override
765     public List<TCell> get(
766         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
767         Map<ByteBuffer, ByteBuffer> attributes)
768         throws IOError {
769       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
770       if (famAndQf.length == 1) {
771         return get(tableName, row, famAndQf[0], null, attributes);
772       }
773       if (famAndQf.length == 2) {
774         return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
775       }
776       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
777     }
778 
779     /**
780      * Note: this internal interface is slightly different from public APIs in regard to handling
781      * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
782      * we respect qual == null as a request for the entire column family. The caller (
783      * {@link #get(ByteBuffer, ByteBuffer, ByteBuffer, Map)}) interface IS consistent in that the
784      * column is parse like normal.
785      */
786     protected List<TCell> get(ByteBuffer tableName,
787                               ByteBuffer row,
788                               byte[] family,
789                               byte[] qualifier,
790                               Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
791       try {
792         HTable table = getTable(tableName);
793         Get get = new Get(getBytes(row));
794         addAttributes(get, attributes);
795         if (qualifier == null) {
796           get.addFamily(family);
797         } else {
798           get.addColumn(family, qualifier);
799         }
800         Result result = table.get(get);
801         return ThriftUtilities.cellFromHBase(result.rawCells());
802       } catch (IOException e) {
803         LOG.warn(e.getMessage(), e);
804         throw new IOError(e.getMessage());
805       }
806     }
807 
808     @Deprecated
809     @Override
810     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
811         int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
812       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
813       if(famAndQf.length == 1) {
814         return getVer(tableName, row, famAndQf[0], null, numVersions, attributes);
815       }
816       if (famAndQf.length == 2) {
817         return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes);
818       }
819       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
820 
821     }
822 
823     /**
824      * Note: this public interface is slightly different from public Java APIs in regard to
825      * handling of the qualifier. Here we differ from the public Java API in that null != byte[0].
826      * Rather, we respect qual == null as a request for the entire column family. If you want to
827      * access the entire column family, use
828      * {@link #getVer(ByteBuffer, ByteBuffer, ByteBuffer, int, Map)} with a {@code column} value
829      * that lacks a {@code ':'}.
830      */
831     public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
832         byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
833       try {
834         HTable table = getTable(tableName);
835         Get get = new Get(getBytes(row));
836         addAttributes(get, attributes);
837         if (null == qualifier) {
838           get.addFamily(family);
839         } else {
840           get.addColumn(family, qualifier);
841         }
842         get.setMaxVersions(numVersions);
843         Result result = table.get(get);
844         return ThriftUtilities.cellFromHBase(result.rawCells());
845       } catch (IOException e) {
846         LOG.warn(e.getMessage(), e);
847         throw new IOError(e.getMessage());
848       }
849     }
850 
851     @Deprecated
852     @Override
853     public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
854         long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
855       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
856       if (famAndQf.length == 1) {
857         return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes);
858       }
859       if (famAndQf.length == 2) {
860         return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions,
861           attributes);
862       }
863       throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
864     }
865 
866     /**
867      * Note: this internal interface is slightly different from public APIs in regard to handling
868      * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
869      * we respect qual == null as a request for the entire column family. The caller (
870      * {@link #getVerTs(ByteBuffer, ByteBuffer, ByteBuffer, long, int, Map)}) interface IS
871      * consistent in that the column is parse like normal.
872      */
873     protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
874         byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
875         throws IOError {
876       try {
877         HTable table = getTable(tableName);
878         Get get = new Get(getBytes(row));
879         addAttributes(get, attributes);
880         if (null == qualifier) {
881           get.addFamily(family);
882         } else {
883           get.addColumn(family, qualifier);
884         }
885         get.setTimeRange(0, timestamp);
886         get.setMaxVersions(numVersions);
887         Result result = table.get(get);
888         return ThriftUtilities.cellFromHBase(result.rawCells());
889       } catch (IOException e) {
890         LOG.warn(e.getMessage(), e);
891         throw new IOError(e.getMessage());
892       }
893     }
894 
895     @Override
896     public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
897         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
898       return getRowWithColumnsTs(tableName, row, null,
899                                  HConstants.LATEST_TIMESTAMP,
900                                  attributes);
901     }
902 
903     @Override
904     public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
905                                               ByteBuffer row,
906         List<ByteBuffer> columns,
907         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
908       return getRowWithColumnsTs(tableName, row, columns,
909                                  HConstants.LATEST_TIMESTAMP,
910                                  attributes);
911     }
912 
913     @Override
914     public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
915         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
916       return getRowWithColumnsTs(tableName, row, null,
917                                  timestamp, attributes);
918     }
919 
920     @Override
921     public List<TRowResult> getRowWithColumnsTs(
922         ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
923         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
924       try {
925         HTable table = getTable(tableName);
926         if (columns == null) {
927           Get get = new Get(getBytes(row));
928           addAttributes(get, attributes);
929           get.setTimeRange(0, timestamp);
930           Result result = table.get(get);
931           return ThriftUtilities.rowResultFromHBase(result);
932         }
933         Get get = new Get(getBytes(row));
934         addAttributes(get, attributes);
935         for(ByteBuffer column : columns) {
936           byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
937           if (famAndQf.length == 1) {
938               get.addFamily(famAndQf[0]);
939           } else {
940               get.addColumn(famAndQf[0], famAndQf[1]);
941           }
942         }
943         get.setTimeRange(0, timestamp);
944         Result result = table.get(get);
945         return ThriftUtilities.rowResultFromHBase(result);
946       } catch (IOException e) {
947         LOG.warn(e.getMessage(), e);
948         throw new IOError(e.getMessage());
949       }
950     }
951 
952     @Override
953     public List<TRowResult> getRows(ByteBuffer tableName,
954                                     List<ByteBuffer> rows,
955         Map<ByteBuffer, ByteBuffer> attributes)
956         throws IOError {
957       return getRowsWithColumnsTs(tableName, rows, null,
958                                   HConstants.LATEST_TIMESTAMP,
959                                   attributes);
960     }
961 
962     @Override
963     public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
964                                                List<ByteBuffer> rows,
965         List<ByteBuffer> columns,
966         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
967       return getRowsWithColumnsTs(tableName, rows, columns,
968                                   HConstants.LATEST_TIMESTAMP,
969                                   attributes);
970     }
971 
972     @Override
973     public List<TRowResult> getRowsTs(ByteBuffer tableName,
974                                       List<ByteBuffer> rows,
975         long timestamp,
976         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
977       return getRowsWithColumnsTs(tableName, rows, null,
978                                   timestamp, attributes);
979     }
980 
981     @Override
982     public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
983                                                  List<ByteBuffer> rows,
984         List<ByteBuffer> columns, long timestamp,
985         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
986       try {
987         List<Get> gets = new ArrayList<Get>(rows.size());
988         HTable table = getTable(tableName);
989         if (metrics != null) {
990           metrics.incNumRowKeysInBatchGet(rows.size());
991         }
992         for (ByteBuffer row : rows) {
993           Get get = new Get(getBytes(row));
994           addAttributes(get, attributes);
995           if (columns != null) {
996 
997             for(ByteBuffer column : columns) {
998               byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
999               if (famAndQf.length == 1) {
1000                 get.addFamily(famAndQf[0]);
1001               } else {
1002                 get.addColumn(famAndQf[0], famAndQf[1]);
1003               }
1004             }
1005           }
1006           get.setTimeRange(0, timestamp);
1007           gets.add(get);
1008         }
1009         Result[] result = table.get(gets);
1010         return ThriftUtilities.rowResultFromHBase(result);
1011       } catch (IOException e) {
1012         LOG.warn(e.getMessage(), e);
1013         throw new IOError(e.getMessage());
1014       }
1015     }
1016 
1017     @Override
1018     public void deleteAll(
1019         ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1020         Map<ByteBuffer, ByteBuffer> attributes)
1021         throws IOError {
1022       deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP,
1023                   attributes);
1024     }
1025 
1026     @Override
1027     public void deleteAllTs(ByteBuffer tableName,
1028                             ByteBuffer row,
1029                             ByteBuffer column,
1030         long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1031       try {
1032         HTable table = getTable(tableName);
1033         Delete delete  = new Delete(getBytes(row));
1034         addAttributes(delete, attributes);
1035         byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1036         if (famAndQf.length == 1) {
1037           delete.deleteFamily(famAndQf[0], timestamp);
1038         } else {
1039           delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1040         }
1041         table.delete(delete);
1042 
1043       } catch (IOException e) {
1044         LOG.warn(e.getMessage(), e);
1045         throw new IOError(e.getMessage());
1046       }
1047     }
1048 
1049     @Override
1050     public void deleteAllRow(
1051         ByteBuffer tableName, ByteBuffer row,
1052         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1053       deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
1054     }
1055 
1056     @Override
1057     public void deleteAllRowTs(
1058         ByteBuffer tableName, ByteBuffer row, long timestamp,
1059         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1060       try {
1061         HTable table = getTable(tableName);
1062         Delete delete  = new Delete(getBytes(row), timestamp);
1063         addAttributes(delete, attributes);
1064         table.delete(delete);
1065       } catch (IOException e) {
1066         LOG.warn(e.getMessage(), e);
1067         throw new IOError(e.getMessage());
1068       }
1069     }
1070 
1071     @Override
1072     public void createTable(ByteBuffer in_tableName,
1073         List<ColumnDescriptor> columnFamilies) throws IOError,
1074         IllegalArgument, AlreadyExists {
1075       byte [] tableName = getBytes(in_tableName);
1076       try {
1077         if (getHBaseAdmin().tableExists(tableName)) {
1078           throw new AlreadyExists("table name already in use");
1079         }
1080         HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
1081         for (ColumnDescriptor col : columnFamilies) {
1082           HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
1083           desc.addFamily(colDesc);
1084         }
1085         getHBaseAdmin().createTable(desc);
1086       } catch (IOException e) {
1087         LOG.warn(e.getMessage(), e);
1088         throw new IOError(e.getMessage());
1089       } catch (IllegalArgumentException e) {
1090         LOG.warn(e.getMessage(), e);
1091         throw new IllegalArgument(e.getMessage());
1092       }
1093     }
1094 
1095     @Override
1096     public void deleteTable(ByteBuffer in_tableName) throws IOError {
1097       byte [] tableName = getBytes(in_tableName);
1098       if (LOG.isDebugEnabled()) {
1099         LOG.debug("deleteTable: table=" + Bytes.toString(tableName));
1100       }
1101       try {
1102         if (!getHBaseAdmin().tableExists(tableName)) {
1103           throw new IOException("table does not exist");
1104         }
1105         getHBaseAdmin().deleteTable(tableName);
1106       } catch (IOException e) {
1107         LOG.warn(e.getMessage(), e);
1108         throw new IOError(e.getMessage());
1109       }
1110     }
1111 
1112     @Override
1113     public void mutateRow(ByteBuffer tableName, ByteBuffer row,
1114         List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
1115         throws IOError, IllegalArgument {
1116       mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP,
1117                   attributes);
1118     }
1119 
1120     @Override
1121     public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
1122         List<Mutation> mutations, long timestamp,
1123         Map<ByteBuffer, ByteBuffer> attributes)
1124         throws IOError, IllegalArgument {
1125       HTable table = null;
1126       try {
1127         table = getTable(tableName);
1128         Put put = new Put(getBytes(row), timestamp);
1129         addAttributes(put, attributes);
1130 
1131         Delete delete = new Delete(getBytes(row));
1132         addAttributes(delete, attributes);
1133         if (metrics != null) {
1134           metrics.incNumRowKeysInBatchMutate(mutations.size());
1135         }
1136 
1137         // I apologize for all this mess :)
1138         for (Mutation m : mutations) {
1139           byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
1140           if (m.isDelete) {
1141             if (famAndQf.length == 1) {
1142               delete.deleteFamily(famAndQf[0], timestamp);
1143             } else {
1144               delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1145             }
1146             delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
1147                 : Durability.SKIP_WAL);
1148           } else {
1149             if(famAndQf.length == 1) {
1150               LOG.warn("No column qualifier specified. Delete is the only mutation supported "
1151                   + "over the whole column family.");
1152             } else {
1153               put.addImmutable(famAndQf[0], famAndQf[1],
1154                   m.value != null ? getBytes(m.value)
1155                       : HConstants.EMPTY_BYTE_ARRAY);
1156             }
1157             put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1158           }
1159         }
1160         if (!delete.isEmpty())
1161           table.delete(delete);
1162         if (!put.isEmpty())
1163           table.put(put);
1164       } catch (IOException e) {
1165         LOG.warn(e.getMessage(), e);
1166         throw new IOError(e.getMessage());
1167       } catch (IllegalArgumentException e) {
1168         LOG.warn(e.getMessage(), e);
1169         throw new IllegalArgument(e.getMessage());
1170       }
1171     }
1172 
1173     @Override
1174     public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
1175         Map<ByteBuffer, ByteBuffer> attributes)
1176         throws IOError, IllegalArgument, TException {
1177       mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
1178     }
1179 
1180     @Override
1181     public void mutateRowsTs(
1182         ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
1183         Map<ByteBuffer, ByteBuffer> attributes)
1184         throws IOError, IllegalArgument, TException {
1185       List<Put> puts = new ArrayList<Put>();
1186       List<Delete> deletes = new ArrayList<Delete>();
1187 
1188       for (BatchMutation batch : rowBatches) {
1189         byte[] row = getBytes(batch.row);
1190         List<Mutation> mutations = batch.mutations;
1191         Delete delete = new Delete(row);
1192         addAttributes(delete, attributes);
1193         Put put = new Put(row, timestamp);
1194         addAttributes(put, attributes);
1195         for (Mutation m : mutations) {
1196           byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
1197           if (m.isDelete) {
1198             // no qualifier, family only.
1199             if (famAndQf.length == 1) {
1200               delete.deleteFamily(famAndQf[0], timestamp);
1201             } else {
1202               delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
1203             }
1204             delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
1205                 : Durability.SKIP_WAL);
1206           } else {
1207             if (famAndQf.length == 1) {
1208               LOG.warn("No column qualifier specified. Delete is the only mutation supported "
1209                   + "over the whole column family.");
1210             }
1211             if (famAndQf.length == 2) {
1212               put.addImmutable(famAndQf[0], famAndQf[1],
1213                   m.value != null ? getBytes(m.value)
1214                       : HConstants.EMPTY_BYTE_ARRAY);
1215             } else {
1216               throw new IllegalArgumentException("Invalid famAndQf provided.");
1217             }
1218             put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1219           }
1220         }
1221         if (!delete.isEmpty())
1222           deletes.add(delete);
1223         if (!put.isEmpty())
1224           puts.add(put);
1225       }
1226 
1227       HTable table = null;
1228       try {
1229         table = getTable(tableName);
1230         if (!puts.isEmpty())
1231           table.put(puts);
1232         if (!deletes.isEmpty())
1233           table.delete(deletes);
1234 
1235       } catch (IOException e) {
1236         LOG.warn(e.getMessage(), e);
1237         throw new IOError(e.getMessage());
1238       } catch (IllegalArgumentException e) {
1239         LOG.warn(e.getMessage(), e);
1240         throw new IllegalArgument(e.getMessage());
1241       }
1242     }
1243 
1244     @Deprecated
1245     @Override
1246     public long atomicIncrement(
1247         ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
1248             throws IOError, IllegalArgument, TException {
1249       byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
1250       if(famAndQf.length == 1) {
1251         return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount);
1252       }
1253       return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
1254     }
1255 
1256     protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
1257         byte [] family, byte [] qualifier, long amount)
1258         throws IOError, IllegalArgument, TException {
1259       HTable table;
1260       try {
1261         table = getTable(tableName);
1262         return table.incrementColumnValue(
1263             getBytes(row), family, qualifier, amount);
1264       } catch (IOException e) {
1265         LOG.warn(e.getMessage(), e);
1266         throw new IOError(e.getMessage());
1267       }
1268     }
1269 
1270     @Override
1271     public void scannerClose(int id) throws IOError, IllegalArgument {
1272       LOG.debug("scannerClose: id=" + id);
1273       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1274       if (resultScannerWrapper == null) {
1275         String message = "scanner ID is invalid";
1276         LOG.warn(message);
1277         throw new IllegalArgument("scanner ID is invalid");
1278       }
1279       resultScannerWrapper.getScanner().close();
1280       removeScanner(id);
1281     }
1282 
1283     @Override
1284     public List<TRowResult> scannerGetList(int id,int nbRows)
1285         throws IllegalArgument, IOError {
1286       LOG.debug("scannerGetList: id=" + id);
1287       ResultScannerWrapper resultScannerWrapper = getScanner(id);
1288       if (null == resultScannerWrapper) {
1289         String message = "scanner ID is invalid";
1290         LOG.warn(message);
1291         throw new IllegalArgument("scanner ID is invalid");
1292       }
1293 
1294       Result [] results = null;
1295       try {
1296         results = resultScannerWrapper.getScanner().next(nbRows);
1297         if (null == results) {
1298           return new ArrayList<TRowResult>();
1299         }
1300       } catch (IOException e) {
1301         LOG.warn(e.getMessage(), e);
1302         throw new IOError(e.getMessage());
1303       }
1304       return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted());
1305     }
1306 
1307     @Override
1308     public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
1309       return scannerGetList(id,1);
1310     }
1311 
1312     @Override
1313     public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
1314         Map<ByteBuffer, ByteBuffer> attributes)
1315         throws IOError {
1316       try {
1317         HTable table = getTable(tableName);
1318         Scan scan = new Scan();
1319         addAttributes(scan, attributes);
1320         if (tScan.isSetStartRow()) {
1321           scan.setStartRow(tScan.getStartRow());
1322         }
1323         if (tScan.isSetStopRow()) {
1324           scan.setStopRow(tScan.getStopRow());
1325         }
1326         if (tScan.isSetTimestamp()) {
1327           scan.setTimeRange(0, tScan.getTimestamp());
1328         }
1329         if (tScan.isSetCaching()) {
1330           scan.setCaching(tScan.getCaching());
1331         }
1332         if (tScan.isSetBatchSize()) {
1333           scan.setBatch(tScan.getBatchSize());
1334         }
1335         if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
1336           for(ByteBuffer column : tScan.getColumns()) {
1337             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1338             if(famQf.length == 1) {
1339               scan.addFamily(famQf[0]);
1340             } else {
1341               scan.addColumn(famQf[0], famQf[1]);
1342             }
1343           }
1344         }
1345         if (tScan.isSetFilterString()) {
1346           ParseFilter parseFilter = new ParseFilter();
1347           scan.setFilter(
1348               parseFilter.parseFilterString(tScan.getFilterString()));
1349         }
1350         return addScanner(table.getScanner(scan), tScan.sortColumns);
1351       } catch (IOException e) {
1352         LOG.warn(e.getMessage(), e);
1353         throw new IOError(e.getMessage());
1354       }
1355     }
1356 
1357     @Override
1358     public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
1359         List<ByteBuffer> columns,
1360         Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
1361       try {
1362         HTable table = getTable(tableName);
1363         Scan scan = new Scan(getBytes(startRow));
1364         addAttributes(scan, attributes);
1365         if(columns != null && columns.size() != 0) {
1366           for(ByteBuffer column : columns) {
1367             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1368             if(famQf.length == 1) {
1369               scan.addFamily(famQf[0]);
1370             } else {
1371               scan.addColumn(famQf[0], famQf[1]);
1372             }
1373           }
1374         }
1375         return addScanner(table.getScanner(scan), false);
1376       } catch (IOException e) {
1377         LOG.warn(e.getMessage(), e);
1378         throw new IOError(e.getMessage());
1379       }
1380     }
1381 
1382     @Override
1383     public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
1384         ByteBuffer stopRow, List<ByteBuffer> columns,
1385         Map<ByteBuffer, ByteBuffer> attributes)
1386         throws IOError, TException {
1387       try {
1388         HTable table = getTable(tableName);
1389         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1390         addAttributes(scan, attributes);
1391         if(columns != null && columns.size() != 0) {
1392           for(ByteBuffer column : columns) {
1393             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1394             if(famQf.length == 1) {
1395               scan.addFamily(famQf[0]);
1396             } else {
1397               scan.addColumn(famQf[0], famQf[1]);
1398             }
1399           }
1400         }
1401         return addScanner(table.getScanner(scan), false);
1402       } catch (IOException e) {
1403         LOG.warn(e.getMessage(), e);
1404         throw new IOError(e.getMessage());
1405       }
1406     }
1407 
1408     @Override
1409     public int scannerOpenWithPrefix(ByteBuffer tableName,
1410                                      ByteBuffer startAndPrefix,
1411                                      List<ByteBuffer> columns,
1412         Map<ByteBuffer, ByteBuffer> attributes)
1413         throws IOError, TException {
1414       try {
1415         HTable table = getTable(tableName);
1416         Scan scan = new Scan(getBytes(startAndPrefix));
1417         addAttributes(scan, attributes);
1418         Filter f = new WhileMatchFilter(
1419             new PrefixFilter(getBytes(startAndPrefix)));
1420         scan.setFilter(f);
1421         if (columns != null && columns.size() != 0) {
1422           for(ByteBuffer column : columns) {
1423             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1424             if(famQf.length == 1) {
1425               scan.addFamily(famQf[0]);
1426             } else {
1427               scan.addColumn(famQf[0], famQf[1]);
1428             }
1429           }
1430         }
1431         return addScanner(table.getScanner(scan), false);
1432       } catch (IOException e) {
1433         LOG.warn(e.getMessage(), e);
1434         throw new IOError(e.getMessage());
1435       }
1436     }
1437 
1438     @Override
1439     public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
1440         List<ByteBuffer> columns, long timestamp,
1441         Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
1442       try {
1443         HTable table = getTable(tableName);
1444         Scan scan = new Scan(getBytes(startRow));
1445         addAttributes(scan, attributes);
1446         scan.setTimeRange(0, timestamp);
1447         if (columns != null && columns.size() != 0) {
1448           for (ByteBuffer column : columns) {
1449             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1450             if(famQf.length == 1) {
1451               scan.addFamily(famQf[0]);
1452             } else {
1453               scan.addColumn(famQf[0], famQf[1]);
1454             }
1455           }
1456         }
1457         return addScanner(table.getScanner(scan), false);
1458       } catch (IOException e) {
1459         LOG.warn(e.getMessage(), e);
1460         throw new IOError(e.getMessage());
1461       }
1462     }
1463 
1464     @Override
1465     public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
1466         ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
1467         Map<ByteBuffer, ByteBuffer> attributes)
1468         throws IOError, TException {
1469       try {
1470         HTable table = getTable(tableName);
1471         Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1472         addAttributes(scan, attributes);
1473         scan.setTimeRange(0, timestamp);
1474         if (columns != null && columns.size() != 0) {
1475           for (ByteBuffer column : columns) {
1476             byte [][] famQf = KeyValue.parseColumn(getBytes(column));
1477             if(famQf.length == 1) {
1478               scan.addFamily(famQf[0]);
1479             } else {
1480               scan.addColumn(famQf[0], famQf[1]);
1481             }
1482           }
1483         }
1484         scan.setTimeRange(0, timestamp);
1485         return addScanner(table.getScanner(scan), false);
1486       } catch (IOException e) {
1487         LOG.warn(e.getMessage(), e);
1488         throw new IOError(e.getMessage());
1489       }
1490     }
1491 
1492     @Override
1493     public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
1494         ByteBuffer tableName) throws IOError, TException {
1495       try {
1496         TreeMap<ByteBuffer, ColumnDescriptor> columns =
1497           new TreeMap<ByteBuffer, ColumnDescriptor>();
1498 
1499         HTable table = getTable(tableName);
1500         HTableDescriptor desc = table.getTableDescriptor();
1501 
1502         for (HColumnDescriptor e : desc.getFamilies()) {
1503           ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
1504           columns.put(col.name, col);
1505         }
1506         return columns;
1507       } catch (IOException e) {
1508         LOG.warn(e.getMessage(), e);
1509         throw new IOError(e.getMessage());
1510       }
1511     }
1512 
1513     @Override
1514     public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
1515         ByteBuffer family) throws IOError {
1516       try {
1517         HTable table = getTable(getBytes(tableName));
1518         Result result = table.getRowOrBefore(getBytes(row), getBytes(family));
1519         return ThriftUtilities.cellFromHBase(result.rawCells());
1520       } catch (IOException e) {
1521         LOG.warn(e.getMessage(), e);
1522         throw new IOError(e.getMessage());
1523       }
1524     }
1525 
1526     @Override
1527     public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
1528       try {
1529         HTable table = getTable(TableName.META_TABLE_NAME.getName());
1530         byte[] row = getBytes(searchRow);
1531         Result startRowResult = table.getRowOrBefore(
1532           row, HConstants.CATALOG_FAMILY);
1533 
1534         if (startRowResult == null) {
1535           throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row="
1536                                 + Bytes.toStringBinary(row));
1537         }
1538 
1539         // find region start and end keys
1540         HRegionInfo regionInfo = HRegionInfo.getHRegionInfo(startRowResult);
1541         if (regionInfo == null) {
1542           throw new IOException("HRegionInfo REGIONINFO was null or " +
1543                                 " empty in Meta for row="
1544                                 + Bytes.toStringBinary(row));
1545         }
1546         TRegionInfo region = new TRegionInfo();
1547         region.setStartKey(regionInfo.getStartKey());
1548         region.setEndKey(regionInfo.getEndKey());
1549         region.id = regionInfo.getRegionId();
1550         region.setName(regionInfo.getRegionName());
1551         region.version = regionInfo.getVersion();
1552 
1553         // find region assignment to server
1554         ServerName serverName = HRegionInfo.getServerName(startRowResult);
1555         if (serverName != null) {
1556           region.setServerName(Bytes.toBytes(serverName.getHostname()));
1557           region.port = serverName.getPort();
1558         }
1559         return region;
1560       } catch (IOException e) {
1561         LOG.warn(e.getMessage(), e);
1562         throw new IOError(e.getMessage());
1563       }
1564     }
1565 
1566     private void initMetrics(ThriftMetrics metrics) {
1567       this.metrics = metrics;
1568     }
1569 
1570     @Override
1571     public void increment(TIncrement tincrement) throws IOError, TException {
1572 
1573       if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
1574         throw new TException("Must supply a table and a row key; can't increment");
1575       }
1576 
1577       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1578         this.coalescer.queueIncrement(tincrement);
1579         return;
1580       }
1581 
1582       try {
1583         HTable table = getTable(tincrement.getTable());
1584         Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
1585         table.increment(inc);
1586       } catch (IOException e) {
1587         LOG.warn(e.getMessage(), e);
1588         throw new IOError(e.getMessage());
1589       }
1590     }
1591 
1592     @Override
1593     public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
1594       if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1595         this.coalescer.queueIncrements(tincrements);
1596         return;
1597       }
1598       for (TIncrement tinc : tincrements) {
1599         increment(tinc);
1600       }
1601     }
1602   }
1603 
1604 
1605 
1606   /**
1607    * Adds all the attributes into the Operation object
1608    */
1609   private static void addAttributes(OperationWithAttributes op,
1610     Map<ByteBuffer, ByteBuffer> attributes) {
1611     if (attributes == null || attributes.size() == 0) {
1612       return;
1613     }
1614     for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
1615       String name = Bytes.toStringBinary(getBytes(entry.getKey()));
1616       byte[] value =  getBytes(entry.getValue());
1617       op.setAttribute(name, value);
1618     }
1619   }
1620 
1621   public static void registerFilters(Configuration conf) {
1622     String[] filters = conf.getStrings("hbase.thrift.filters");
1623     if(filters != null) {
1624       for(String filterClass: filters) {
1625         String[] filterPart = filterClass.split(":");
1626         if(filterPart.length != 2) {
1627           LOG.warn("Invalid filter specification " + filterClass + " - skipping");
1628         } else {
1629           ParseFilter.registerFilter(filterPart[0], filterPart[1]);
1630         }
1631       }
1632     }
1633   }
1634 }