View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.LinkedList;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.NavigableMap;
30  import java.util.TreeMap;
31  import java.util.concurrent.Callable;
32  import java.util.concurrent.ExecutionException;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.Future;
35  import java.util.concurrent.SynchronousQueue;
36  import java.util.concurrent.ThreadPoolExecutor;
37  import java.util.concurrent.TimeUnit;
38  
39  import com.google.protobuf.InvalidProtocolBufferException;
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.classification.InterfaceAudience;
43  import org.apache.hadoop.classification.InterfaceStability;
44  import org.apache.hadoop.conf.Configuration;
45  import org.apache.hadoop.hbase.Cell;
46  import org.apache.hadoop.hbase.DoNotRetryIOException;
47  import org.apache.hadoop.hbase.HBaseConfiguration;
48  import org.apache.hadoop.hbase.HConstants;
49  import org.apache.hadoop.hbase.HRegionInfo;
50  import org.apache.hadoop.hbase.HRegionLocation;
51  import org.apache.hadoop.hbase.HTableDescriptor;
52  import org.apache.hadoop.hbase.KeyValue;
53  import org.apache.hadoop.hbase.KeyValueUtil;
54  import org.apache.hadoop.hbase.ServerName;
55  import org.apache.hadoop.hbase.TableName;
56  import org.apache.hadoop.hbase.client.coprocessor.Batch;
57  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
58  import org.apache.hadoop.hbase.filter.BinaryComparator;
59  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
60  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
61  import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
62  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
63  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
64  import org.apache.hadoop.hbase.protobuf.RequestConverter;
65  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
66  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
67  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
68  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
69  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
70  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
71  import org.apache.hadoop.hbase.util.Bytes;
72  import org.apache.hadoop.hbase.util.Pair;
73  import org.apache.hadoop.hbase.util.Threads;
74  
75  import com.google.protobuf.Descriptors;
76  import com.google.protobuf.GeneratedMessage;
77  import com.google.protobuf.Message;
78  import com.google.protobuf.Service;
79  import com.google.protobuf.ServiceException;
80  
81  /**
82   * <p>Used to communicate with a single HBase table.  An implementation of
83   * {@link HTableInterface}.  Instances of this class can be constructed directly but it is
84   * encouraged that users get instances via {@link HConnection} and {@link HConnectionManager}.
85   * See {@link HConnectionManager} class comment for an example.
86   *
87   * <p>This class is not thread safe for reads nor write.
88   *
89   * <p>In case of writes (Put, Delete), the underlying write buffer can
90   * be corrupted if multiple threads contend over a single HTable instance.
91   *
92   * <p>In case of reads, some fields used by a Scan are shared among all threads.
93   * The HTable implementation can either not contract to be safe in case of a Get
94   *
95   * <p>Instances of HTable passed the same {@link Configuration} instance will
96   * share connections to servers out on the cluster and to the zookeeper ensemble
97   * as well as caches of region locations.  This is usually a *good* thing and it
98   * is recommended to reuse the same configuration object for all your tables.
99   * This happens because they will all share the same underlying
100  * {@link HConnection} instance. See {@link HConnectionManager} for more on
101  * how this mechanism works.
102  *
103  * <p>{@link HConnection} will read most of the
104  * configuration it needs from the passed {@link Configuration} on initial
105  * construction.  Thereafter, for settings such as
106  * <code>hbase.client.pause</code>, <code>hbase.client.retries.number</code>,
107  * and <code>hbase.client.rpc.maxattempts</code> updating their values in the
108  * passed {@link Configuration} subsequent to {@link HConnection} construction
109  * will go unnoticed.  To run with changed values, make a new
110  * {@link HTable} passing a new {@link Configuration} instance that has the
111  * new configuration.
112  *
113  * <p>Note that this class implements the {@link Closeable} interface. When a
114  * HTable instance is no longer required, it *should* be closed in order to ensure
115  * that the underlying resources are promptly released. Please note that the close
116  * method can throw java.io.IOException that must be handled.
117  *
118  * @see HBaseAdmin for create, drop, list, enable and disable of tables.
119  * @see HConnection
120  * @see HConnectionManager
121  */
122 @InterfaceAudience.Public
123 @InterfaceStability.Stable
124 public class HTable implements HTableInterface {
125   private static final Log LOG = LogFactory.getLog(HTable.class);
126   protected HConnection connection;
127   private final TableName tableName;
128   private volatile Configuration configuration;
129   protected List<Row> writeAsyncBuffer = new LinkedList<Row>();
130   private long writeBufferSize;
131   private boolean clearBufferOnFail;
132   private boolean autoFlush;
133   protected long currentWriteBufferSize;
134   protected int scannerCaching;
135   private int maxKeyValueSize;
136   private ExecutorService pool;  // For Multi
137   private boolean closed;
138   private int operationTimeout;
139   private final boolean cleanupPoolOnClose; // shutdown the pool in close()
140   private final boolean cleanupConnectionOnClose; // close the connection in close()
141 
142   /** The Async process for puts with autoflush set to false or multiputs */
143   protected AsyncProcess<Object> ap;
144   private RpcRetryingCallerFactory rpcCallerFactory;
145   private RpcControllerFactory rpcControllerFactory;
146 
147   /**
148    * Creates an object to access a HBase table.
149    * Shares zookeeper connection and other resources with other HTable instances
150    * created with the same <code>conf</code> instance.  Uses already-populated
151    * region cache if one is available, populated by any other HTable instances
152    * sharing this <code>conf</code> instance.  Recommended.
153    * @param conf Configuration object to use.
154    * @param tableName Name of the table.
155    * @throws IOException if a remote or network exception occurs
156    */
157   public HTable(Configuration conf, final String tableName)
158   throws IOException {
159     this(conf, TableName.valueOf(tableName));
160   }
161 
162   /**
163    * Creates an object to access a HBase table.
164    * Shares zookeeper connection and other resources with other HTable instances
165    * created with the same <code>conf</code> instance.  Uses already-populated
166    * region cache if one is available, populated by any other HTable instances
167    * sharing this <code>conf</code> instance.  Recommended.
168    * @param conf Configuration object to use.
169    * @param tableName Name of the table.
170    * @throws IOException if a remote or network exception occurs
171    */
172   public HTable(Configuration conf, final byte[] tableName)
173   throws IOException {
174     this(conf, TableName.valueOf(tableName));
175   }
176 
177 
178 
179   /**
180    * Creates an object to access a HBase table.
181    * Shares zookeeper connection and other resources with other HTable instances
182    * created with the same <code>conf</code> instance.  Uses already-populated
183    * region cache if one is available, populated by any other HTable instances
184    * sharing this <code>conf</code> instance.  Recommended.
185    * @param conf Configuration object to use.
186    * @param tableName table name pojo
187    * @throws IOException if a remote or network exception occurs
188    */
189   public HTable(Configuration conf, final TableName tableName)
190   throws IOException {
191     this.tableName = tableName;
192     this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;
193     if (conf == null) {
194       this.connection = null;
195       return;
196     }
197     this.connection = HConnectionManager.getConnection(conf);
198     this.configuration = conf;
199 
200     this.pool = getDefaultExecutor(conf);
201     this.finishSetup();
202   }
203 
204   /**
205    * Creates an object to access a HBase table. Shares zookeeper connection and other resources with
206    * other HTable instances created with the same <code>connection</code> instance. Use this
207    * constructor when the HConnection instance is externally managed.
208    * @param tableName Name of the table.
209    * @param connection HConnection to be used.
210    * @throws IOException if a remote or network exception occurs
211    */
212   public HTable(TableName tableName, HConnection connection) throws IOException {
213     this.tableName = tableName;
214     this.cleanupPoolOnClose = true;
215     this.cleanupConnectionOnClose = false;
216     this.connection = connection;
217     this.configuration = connection.getConfiguration();
218 
219     this.pool = getDefaultExecutor(this.configuration);
220     this.finishSetup();
221   }
222    
223   public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
224     int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
225     if (maxThreads == 0) {
226       maxThreads = 1; // is there a better default?
227     }
228     long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
229 
230     // Using the "direct handoff" approach, new threads will only be created
231     // if it is necessary and will grow unbounded. This could be bad but in HCM
232     // we only create as many Runnables as there are region servers. It means
233     // it also scales when new region servers are added.
234     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
235         new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
236     ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
237     return pool;
238   }
239 
240   /**
241    * Creates an object to access a HBase table.
242    * Shares zookeeper connection and other resources with other HTable instances
243    * created with the same <code>conf</code> instance.  Uses already-populated
244    * region cache if one is available, populated by any other HTable instances
245    * sharing this <code>conf</code> instance.
246    * Use this constructor when the ExecutorService is externally managed.
247    * @param conf Configuration object to use.
248    * @param tableName Name of the table.
249    * @param pool ExecutorService to be used.
250    * @throws IOException if a remote or network exception occurs
251    */
252   public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool)
253       throws IOException {
254     this(conf, TableName.valueOf(tableName), pool);
255   }
256 
257   /**
258    * Creates an object to access a HBase table.
259    * Shares zookeeper connection and other resources with other HTable instances
260    * created with the same <code>conf</code> instance.  Uses already-populated
261    * region cache if one is available, populated by any other HTable instances
262    * sharing this <code>conf</code> instance.
263    * Use this constructor when the ExecutorService is externally managed.
264    * @param conf Configuration object to use.
265    * @param tableName Name of the table.
266    * @param pool ExecutorService to be used.
267    * @throws IOException if a remote or network exception occurs
268    */
269   public HTable(Configuration conf, final TableName tableName, final ExecutorService pool)
270       throws IOException {
271     this.connection = HConnectionManager.getConnection(conf);
272     this.configuration = conf;
273     this.pool = pool;
274     this.tableName = tableName;
275     this.cleanupPoolOnClose = false;
276     this.cleanupConnectionOnClose = true;
277 
278     this.finishSetup();
279   }
280 
281   /**
282    * Creates an object to access a HBase table.
283    * Shares zookeeper connection and other resources with other HTable instances
284    * created with the same <code>connection</code> instance.
285    * Use this constructor when the ExecutorService and HConnection instance are
286    * externally managed.
287    * @param tableName Name of the table.
288    * @param connection HConnection to be used.
289    * @param pool ExecutorService to be used.
290    * @throws IOException if a remote or network exception occurs
291    */
292   public HTable(final byte[] tableName, final HConnection connection,
293       final ExecutorService pool) throws IOException {
294     this(TableName.valueOf(tableName), connection, pool);
295   }
296 
297   /**
298    * Creates an object to access a HBase table.
299    * Shares zookeeper connection and other resources with other HTable instances
300    * created with the same <code>connection</code> instance.
301    * Use this constructor when the ExecutorService and HConnection instance are
302    * externally managed.
303    * @param tableName Name of the table.
304    * @param connection HConnection to be used.
305    * @param pool ExecutorService to be used.
306    * @throws IOException if a remote or network exception occurs
307    */
308   public HTable(TableName tableName, final HConnection connection,
309       final ExecutorService pool) throws IOException {
310     if (connection == null || connection.isClosed()) {
311       throw new IllegalArgumentException("Connection is null or closed.");
312     }
313     this.tableName = tableName;
314     this.cleanupPoolOnClose = this.cleanupConnectionOnClose = false;
315     this.connection = connection;
316     this.configuration = connection.getConfiguration();
317     this.pool = pool;
318 
319     this.finishSetup();
320   }
321 
322   /**
323    * For internal testing.
324    */
325   protected HTable(){
326     tableName = null;
327     cleanupPoolOnClose = false;
328     cleanupConnectionOnClose = false;
329   }
330 
331   /**
332    * setup this HTable's parameter based on the passed configuration
333    */
334   private void finishSetup() throws IOException {
335     this.operationTimeout = tableName.isSystemTable() ?
336       this.configuration.getInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT,
337         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT):
338       this.configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
339         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
340     this.writeBufferSize = this.configuration.getLong(
341         "hbase.client.write.buffer", 2097152);
342     this.clearBufferOnFail = true;
343     this.autoFlush = true;
344     this.currentWriteBufferSize = 0;
345     this.scannerCaching = this.configuration.getInt(
346         HConstants.HBASE_CLIENT_SCANNER_CACHING,
347         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
348 
349     this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration);
350     this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
351     ap = new AsyncProcess<Object>(connection, tableName, pool, null,
352         configuration, rpcCallerFactory, rpcControllerFactory);
353 
354     this.maxKeyValueSize = this.configuration.getInt(
355         "hbase.client.keyvalue.maxsize", -1);
356     this.closed = false;
357   }
358 
359 
360 
361   /**
362    * {@inheritDoc}
363    */
364   @Override
365   public Configuration getConfiguration() {
366     return configuration;
367   }
368 
369   /**
370    * Tells whether or not a table is enabled or not. This method creates a
371    * new HBase configuration, so it might make your unit tests fail due to
372    * incorrect ZK client port.
373    * @param tableName Name of table to check.
374    * @return {@code true} if table is online.
375    * @throws IOException if a remote or network exception occurs
376 	* @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
377    */
378   @Deprecated
379   public static boolean isTableEnabled(String tableName) throws IOException {
380     return isTableEnabled(TableName.valueOf(tableName));
381   }
382 
383   /**
384    * Tells whether or not a table is enabled or not. This method creates a
385    * new HBase configuration, so it might make your unit tests fail due to
386    * incorrect ZK client port.
387    * @param tableName Name of table to check.
388    * @return {@code true} if table is online.
389    * @throws IOException if a remote or network exception occurs
390 	* @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
391    */
392   @Deprecated
393   public static boolean isTableEnabled(byte[] tableName) throws IOException {
394     return isTableEnabled(TableName.valueOf(tableName));
395   }
396 
397   /**
398    * Tells whether or not a table is enabled or not. This method creates a
399    * new HBase configuration, so it might make your unit tests fail due to
400    * incorrect ZK client port.
401    * @param tableName Name of table to check.
402    * @return {@code true} if table is online.
403    * @throws IOException if a remote or network exception occurs
404    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
405    */
406   @Deprecated
407   public static boolean isTableEnabled(TableName tableName) throws IOException {
408     return isTableEnabled(HBaseConfiguration.create(), tableName);
409   }
410 
411   /**
412    * Tells whether or not a table is enabled or not.
413    * @param conf The Configuration object to use.
414    * @param tableName Name of table to check.
415    * @return {@code true} if table is online.
416    * @throws IOException if a remote or network exception occurs
417 	 * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
418    */
419   @Deprecated
420   public static boolean isTableEnabled(Configuration conf, String tableName)
421   throws IOException {
422     return isTableEnabled(conf, TableName.valueOf(tableName));
423   }
424 
425   /**
426    * Tells whether or not a table is enabled or not.
427    * @param conf The Configuration object to use.
428    * @param tableName Name of table to check.
429    * @return {@code true} if table is online.
430    * @throws IOException if a remote or network exception occurs
431 	 * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
432    */
433   @Deprecated
434   public static boolean isTableEnabled(Configuration conf, byte[] tableName)
435   throws IOException {
436     return isTableEnabled(conf, TableName.valueOf(tableName));
437   }
438 
439   /**
440    * Tells whether or not a table is enabled or not.
441    * @param conf The Configuration object to use.
442    * @param tableName Name of table to check.
443    * @return {@code true} if table is online.
444    * @throws IOException if a remote or network exception occurs
445    * @deprecated use {@link HBaseAdmin#isTableEnabled(org.apache.hadoop.hbase.TableName tableName)}
446    */
447   @Deprecated
448   public static boolean isTableEnabled(Configuration conf,
449       final TableName tableName) throws IOException {
450     return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
451       @Override
452       public Boolean connect(HConnection connection) throws IOException {
453         return connection.isTableEnabled(tableName);
454       }
455     });
456   }
457 
458   /**
459    * Find region location hosting passed row using cached info
460    * @param row Row to find.
461    * @return The location of the given row.
462    * @throws IOException if a remote or network exception occurs
463    */
464   public HRegionLocation getRegionLocation(final String row)
465   throws IOException {
466     return connection.getRegionLocation(tableName, Bytes.toBytes(row), false);
467   }
468 
469   /**
470    * Finds the region on which the given row is being served. Does not reload the cache.
471    * @param row Row to find.
472    * @return Location of the row.
473    * @throws IOException if a remote or network exception occurs
474    */
475   public HRegionLocation getRegionLocation(final byte [] row)
476   throws IOException {
477     return connection.getRegionLocation(tableName, row, false);
478   }
479 
480   /**
481    * Finds the region on which the given row is being served.
482    * @param row Row to find.
483    * @param reload true to reload information or false to use cached information
484    * @return Location of the row.
485    * @throws IOException if a remote or network exception occurs
486    */
487   public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
488   throws IOException {
489     return connection.getRegionLocation(tableName, row, reload);
490   }
491 
492   /**
493    * {@inheritDoc}
494    */
495   @Override
496   public byte [] getTableName() {
497     return this.tableName.getName();
498   }
499 
500   @Override
501   public TableName getName() {
502     return tableName;
503   }
504 
505   /**
506    * <em>INTERNAL</em> Used by unit tests and tools to do low-level
507    * manipulations.
508    * @return An HConnection instance.
509    * @deprecated This method will be changed from public to package protected.
510    */
511   // TODO(tsuna): Remove this.  Unit tests shouldn't require public helpers.
512   @Deprecated
513   public HConnection getConnection() {
514     return this.connection;
515   }
516 
517   /**
518    * Gets the number of rows that a scanner will fetch at once.
519    * <p>
520    * The default value comes from {@code hbase.client.scanner.caching}.
521    * @deprecated Use {@link Scan#setCaching(int)} and {@link Scan#getCaching()}
522    */
523   @Deprecated
524   public int getScannerCaching() {
525     return scannerCaching;
526   }
527 
528   /**
529    * Kept in 0.96 for backward compatibility
530    * @deprecated  since 0.96. This is an internal buffer that should not be read nor write.
531    */
532   @Deprecated
533   public List<Row> getWriteBuffer() {
534     return writeAsyncBuffer;
535   }
536 
537   /**
538    * Sets the number of rows that a scanner will fetch at once.
539    * <p>
540    * This will override the value specified by
541    * {@code hbase.client.scanner.caching}.
542    * Increasing this value will reduce the amount of work needed each time
543    * {@code next()} is called on a scanner, at the expense of memory use
544    * (since more rows will need to be maintained in memory by the scanners).
545    * @param scannerCaching the number of rows a scanner will fetch at once.
546    * @deprecated Use {@link Scan#setCaching(int)}
547    */
548   @Deprecated
549   public void setScannerCaching(int scannerCaching) {
550     this.scannerCaching = scannerCaching;
551   }
552 
553   /**
554    * {@inheritDoc}
555    */
556   @Override
557   public HTableDescriptor getTableDescriptor() throws IOException {
558     return new UnmodifyableHTableDescriptor(
559       this.connection.getHTableDescriptor(this.tableName));
560   }
561 
562   /**
563    * Gets the starting row key for every region in the currently open table.
564    * <p>
565    * This is mainly useful for the MapReduce integration.
566    * @return Array of region starting row keys
567    * @throws IOException if a remote or network exception occurs
568    */
569   public byte [][] getStartKeys() throws IOException {
570     return getStartEndKeys().getFirst();
571   }
572 
573   /**
574    * Gets the ending row key for every region in the currently open table.
575    * <p>
576    * This is mainly useful for the MapReduce integration.
577    * @return Array of region ending row keys
578    * @throws IOException if a remote or network exception occurs
579    */
580   public byte[][] getEndKeys() throws IOException {
581     return getStartEndKeys().getSecond();
582   }
583 
584   /**
585    * Gets the starting and ending row keys for every region in the currently
586    * open table.
587    * <p>
588    * This is mainly useful for the MapReduce integration.
589    * @return Pair of arrays of region starting and ending row keys
590    * @throws IOException if a remote or network exception occurs
591    */
592   public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
593     NavigableMap<HRegionInfo, ServerName> regions = getRegionLocations();
594     final List<byte[]> startKeyList = new ArrayList<byte[]>(regions.size());
595     final List<byte[]> endKeyList = new ArrayList<byte[]>(regions.size());
596 
597     for (HRegionInfo region : regions.keySet()) {
598       startKeyList.add(region.getStartKey());
599       endKeyList.add(region.getEndKey());
600     }
601 
602     return new Pair<byte [][], byte [][]>(
603       startKeyList.toArray(new byte[startKeyList.size()][]),
604       endKeyList.toArray(new byte[endKeyList.size()][]));
605   }
606 
607   /**
608    * Gets all the regions and their address for this table.
609    * <p>
610    * This is mainly useful for the MapReduce integration.
611    * @return A map of HRegionInfo with it's server address
612    * @throws IOException if a remote or network exception occurs
613    */
614   public NavigableMap<HRegionInfo, ServerName> getRegionLocations() throws IOException {
615     // TODO: Odd that this returns a Map of HRI to SN whereas getRegionLocation, singular, returns an HRegionLocation.
616     return MetaScanner.allTableRegions(getConfiguration(), this.connection, getName(), false);
617   }
618 
619   /**
620    * Get the corresponding regions for an arbitrary range of keys.
621    * <p>
622    * @param startKey Starting row in range, inclusive
623    * @param endKey Ending row in range, exclusive
624    * @return A list of HRegionLocations corresponding to the regions that
625    * contain the specified range
626    * @throws IOException if a remote or network exception occurs
627    */
628   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
629     final byte [] endKey) throws IOException {
630     return getRegionsInRange(startKey, endKey, false);
631   }
632 
633   /**
634    * Get the corresponding regions for an arbitrary range of keys.
635    * <p>
636    * @param startKey Starting row in range, inclusive
637    * @param endKey Ending row in range, exclusive
638    * @param reload true to reload information or false to use cached information
639    * @return A list of HRegionLocations corresponding to the regions that
640    * contain the specified range
641    * @throws IOException if a remote or network exception occurs
642    */
643   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
644       final byte [] endKey, final boolean reload) throws IOException {
645     return getKeysAndRegionsInRange(startKey, endKey, false, reload).getSecond();
646   }
647 
648   /**
649    * Get the corresponding start keys and regions for an arbitrary range of
650    * keys.
651    * <p>
652    * @param startKey Starting row in range, inclusive
653    * @param endKey Ending row in range
654    * @param includeEndKey true if endRow is inclusive, false if exclusive
655    * @return A pair of list of start keys and list of HRegionLocations that
656    *         contain the specified range
657    * @throws IOException if a remote or network exception occurs
658    */
659   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
660       final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
661       throws IOException {
662     return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
663   }
664 
665   /**
666    * Get the corresponding start keys and regions for an arbitrary range of
667    * keys.
668    * <p>
669    * @param startKey Starting row in range, inclusive
670    * @param endKey Ending row in range
671    * @param includeEndKey true if endRow is inclusive, false if exclusive
672    * @param reload true to reload information or false to use cached information
673    * @return A pair of list of start keys and list of HRegionLocations that
674    *         contain the specified range
675    * @throws IOException if a remote or network exception occurs
676    */
677   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
678       final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
679       final boolean reload) throws IOException {
680     final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
681     if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
682       throw new IllegalArgumentException(
683         "Invalid range: " + Bytes.toStringBinary(startKey) +
684         " > " + Bytes.toStringBinary(endKey));
685     }
686     List<byte[]> keysInRange = new ArrayList<byte[]>();
687     List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
688     byte[] currentKey = startKey;
689     do {
690       HRegionLocation regionLocation = getRegionLocation(currentKey, reload);
691       keysInRange.add(currentKey);
692       regionsInRange.add(regionLocation);
693       currentKey = regionLocation.getRegionInfo().getEndKey();
694     } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
695         && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
696             || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
697     return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
698         regionsInRange);
699   }
700 
701   /**
702    * {@inheritDoc}
703    */
704    @Override
705    public Result getRowOrBefore(final byte[] row, final byte[] family)
706    throws IOException {
707      RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
708          tableName, row) {
709        public Result call() throws IOException {
710             return ProtobufUtil.getRowOrBefore(getStub(), getLocation().getRegionInfo()
711                 .getRegionName(), row, family, rpcControllerFactory.newController());
712           }
713      };
714     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
715   }
716 
717    /**
718     * {@inheritDoc}
719     */
720   @Override
721   public ResultScanner getScanner(final Scan scan) throws IOException {
722     if (scan.getCaching() <= 0) {
723       scan.setCaching(getScannerCaching());
724     }
725 
726     if (scan.isReversed()) {
727       if (scan.isSmall()) {
728         return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
729             this.connection);
730       } else {
731         return new ReversedClientScanner(getConfiguration(), scan, getName(), this.connection);
732       }
733     }
734 
735     if (scan.isSmall()) {
736       return new ClientSmallScanner(getConfiguration(), scan, getName(), this.connection);
737     } else {
738       return new ClientScanner(getConfiguration(), scan, getName(), this.connection);
739     }
740   }
741 
742   /**
743    * {@inheritDoc}
744    */
745   @Override
746   public ResultScanner getScanner(byte [] family) throws IOException {
747     Scan scan = new Scan();
748     scan.addFamily(family);
749     return getScanner(scan);
750   }
751 
752   /**
753    * {@inheritDoc}
754    */
755   @Override
756   public ResultScanner getScanner(byte [] family, byte [] qualifier)
757   throws IOException {
758     Scan scan = new Scan();
759     scan.addColumn(family, qualifier);
760     return getScanner(scan);
761   }
762 
763   /**
764    * {@inheritDoc}
765    */
766   @Override
767   public Result get(final Get get) throws IOException {
768     // have to instanatiate this and set the priority here since in protobuf util we don't pass in
769     // the tablename... an unfortunate side-effect of public interfaces :-/ In 0.99+ we put all the
770     // logic back into HTable
771     final PayloadCarryingRpcController controller = rpcControllerFactory.newController();
772     controller.setPriority(tableName);
773     RegionServerCallable<Result> callable =
774         new RegionServerCallable<Result>(this.connection, getName(), get.getRow()) {
775           public Result call() throws IOException {
776             return ProtobufUtil.get(getStub(), getLocation().getRegionInfo().getRegionName(), get,
777               controller);
778           }
779         };
780     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
781   }
782 
783   /**
784    * {@inheritDoc}
785    */
786   @Override
787   public Result[] get(List<Get> gets) throws IOException {
788     if (gets.size() == 1) {
789       return new Result[]{get(gets.get(0))};
790     }
791     try {
792       Object [] r1 = batch((List)gets);
793 
794       // translate.
795       Result [] results = new Result[r1.length];
796       int i=0;
797       for (Object o : r1) {
798         // batch ensures if there is a failure we get an exception instead
799         results[i++] = (Result) o;
800       }
801 
802       return results;
803     } catch (InterruptedException e) {
804       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
805     }
806   }
807 
808   /**
809    * {@inheritDoc}
810    */
811   @Override
812   public void batch(final List<?extends Row> actions, final Object[] results)
813       throws InterruptedException, IOException {
814     batchCallback(actions, results, null);
815   }
816 
817   /**
818    * {@inheritDoc}
819    * @deprecated If any exception is thrown by one of the actions, there is no way to
820    * retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
821    */
822   @Override
823   public Object[] batch(final List<? extends Row> actions)
824      throws InterruptedException, IOException {
825     return batchCallback(actions, null);
826   }
827 
828   /**
829    * {@inheritDoc}
830    */
831   @Override
832   public <R> void batchCallback(
833       final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
834       throws IOException, InterruptedException {
835     connection.processBatchCallback(actions, tableName, pool, results, callback);
836   }
837 
838   /**
839    * {@inheritDoc}
840    * @deprecated If any exception is thrown by one of the actions, there is no way to
841    * retrieve the partially executed results. Use
842    * {@link #batchCallback(List, Object[], org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
843    * instead.
844    */
845   @Override
846   public <R> Object[] batchCallback(
847     final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException,
848       InterruptedException {
849     Object[] results = new Object[actions.size()];
850     batchCallback(actions, results, callback);
851     return results;
852   }
853 
854   /**
855    * {@inheritDoc}
856    */
857   @Override
858   public void delete(final Delete delete)
859   throws IOException {
860     RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
861         tableName, delete.getRow()) {
862       public Boolean call() throws IOException {
863         try {
864           MutateRequest request = RequestConverter.buildMutateRequest(
865             getLocation().getRegionInfo().getRegionName(), delete);
866               PayloadCarryingRpcController controller = rpcControllerFactory.newController();
867               controller.setPriority(tableName);
868               MutateResponse response = getStub().mutate(controller, request);
869           return Boolean.valueOf(response.getProcessed());
870         } catch (ServiceException se) {
871           throw ProtobufUtil.getRemoteException(se);
872         }
873       }
874     };
875     rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
876   }
877 
878   /**
879    * {@inheritDoc}
880    */
881   @Override
882   public void delete(final List<Delete> deletes)
883   throws IOException {
884     Object[] results = new Object[deletes.size()];
885     try {
886       batch(deletes, results);
887     } catch (InterruptedException e) {
888       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
889     } finally {
890       // mutate list so that it is empty for complete success, or contains only failed records
891       // results are returned in the same order as the requests in list
892       // walk the list backwards, so we can remove from list without impacting the indexes of earlier members
893       for (int i = results.length - 1; i>=0; i--) {
894         // if result is not null, it succeeded
895         if (results[i] instanceof Result) {
896           deletes.remove(i);
897         }
898       }
899     }
900   }
901 
902   /**
903    * {@inheritDoc}
904    */
905   @Override
906   public void put(final Put put)
907       throws InterruptedIOException, RetriesExhaustedWithDetailsException {
908     doPut(put);
909     if (autoFlush) {
910       flushCommits();
911     }
912   }
913 
914   /**
915    * {@inheritDoc}
916    */
917   @Override
918   public void put(final List<Put> puts)
919       throws InterruptedIOException, RetriesExhaustedWithDetailsException {
920     for (Put put : puts) {
921       doPut(put);
922     }
923     if (autoFlush) {
924       flushCommits();
925     }
926   }
927 
928 
929   /**
930    * Add the put to the buffer. If the buffer is already too large, sends the buffer to the
931    *  cluster.
932    * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster.
933    * @throws InterruptedIOException if we were interrupted.
934    */
935   private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
936     if (ap.hasError()){
937       writeAsyncBuffer.add(put);
938       backgroundFlushCommits(true);
939     }
940 
941     validatePut(put);
942 
943     currentWriteBufferSize += put.heapSize();
944     writeAsyncBuffer.add(put);
945 
946     while (currentWriteBufferSize > writeBufferSize) {
947       backgroundFlushCommits(false);
948     }
949   }
950 
951 
952   /**
953    * Send the operations in the buffer to the servers. Does not wait for the server's answer.
954    * If the is an error (max retried reach from a previous flush or bad operation), it tries to
955    * send all operations in the buffer and sends an exception.
956    * @param synchronous - if true, sends all the writes and wait for all of them to finish before
957    *                     returning.
958    */
959   private void backgroundFlushCommits(boolean synchronous) throws
960       InterruptedIOException, RetriesExhaustedWithDetailsException {
961 
962     try {
963       do {
964         ap.submit(writeAsyncBuffer, true);
965       } while (synchronous && !writeAsyncBuffer.isEmpty());
966 
967       if (synchronous) {
968         ap.waitUntilDone();
969       }
970 
971       if (ap.hasError()) {
972         LOG.debug(tableName + ": One or more of the operations have failed -" +
973             " waiting for all operation in progress to finish (successfully or not)");
974         while (!writeAsyncBuffer.isEmpty()) {
975           ap.submit(writeAsyncBuffer, true);
976         }
977         ap.waitUntilDone();
978 
979         if (!clearBufferOnFail) {
980           // if clearBufferOnFailed is not set, we're supposed to keep the failed operation in the
981           //  write buffer. This is a questionable feature kept here for backward compatibility
982           writeAsyncBuffer.addAll(ap.getFailedOperations());
983         }
984         RetriesExhaustedWithDetailsException e = ap.getErrors();
985         ap.clearErrors();
986         throw e;
987       }
988     } finally {
989       currentWriteBufferSize = 0;
990       for (Row mut : writeAsyncBuffer) {
991         if (mut instanceof Mutation) {
992           currentWriteBufferSize += ((Mutation) mut).heapSize();
993         }
994       }
995     }
996   }
997 
998   /**
999    * {@inheritDoc}
1000    */
1001   @Override
1002   public void mutateRow(final RowMutations rm) throws IOException {
1003     RegionServerCallable<Void> callable =
1004         new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
1005       public Void call() throws IOException {
1006         try {
1007           RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
1008             getLocation().getRegionInfo().getRegionName(), rm);
1009           regionMutationBuilder.setAtomic(true);
1010           MultiRequest request =
1011             MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
1012           PayloadCarryingRpcController pcrc = rpcControllerFactory.newController();
1013           pcrc.setPriority(tableName);
1014           getStub().multi(pcrc, request);
1015         } catch (ServiceException se) {
1016           throw ProtobufUtil.getRemoteException(se);
1017         }
1018         return null;
1019       }
1020     };
1021     rpcCallerFactory.<Void> newCaller().callWithRetries(callable, this.operationTimeout);
1022   }
1023 
1024   /**
1025    * {@inheritDoc}
1026    */
1027   @Override
1028   public Result append(final Append append) throws IOException {
1029     if (append.numFamilies() == 0) {
1030       throw new IOException(
1031           "Invalid arguments to append, no columns specified");
1032     }
1033 
1034     NonceGenerator ng = this.connection.getNonceGenerator();
1035     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1036     RegionServerCallable<Result> callable =
1037       new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
1038         public Result call() throws IOException {
1039           try {
1040             MutateRequest request = RequestConverter.buildMutateRequest(
1041               getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
1042             PayloadCarryingRpcController rpcController = rpcControllerFactory.newController();
1043             rpcController.setPriority(getTableName());
1044             MutateResponse response = getStub().mutate(rpcController, request);
1045             if (!response.hasResult()) return null;
1046             return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
1047           } catch (ServiceException se) {
1048             throw ProtobufUtil.getRemoteException(se);
1049           }
1050         }
1051       };
1052     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1053   }
1054 
1055   /**
1056    * {@inheritDoc}
1057    */
1058   @Override
1059   public Result increment(final Increment increment) throws IOException {
1060     if (!increment.hasFamilies()) {
1061       throw new IOException(
1062           "Invalid arguments to increment, no columns specified");
1063     }
1064     NonceGenerator ng = this.connection.getNonceGenerator();
1065     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1066     RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
1067         getName(), increment.getRow()) {
1068       public Result call() throws IOException {
1069         try {
1070           MutateRequest request = RequestConverter.buildMutateRequest(
1071             getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
1072           PayloadCarryingRpcController rpcController = rpcControllerFactory.newController();
1073           rpcController.setPriority(getTableName());
1074           MutateResponse response = getStub().mutate(rpcController, request);
1075           return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
1076         } catch (ServiceException se) {
1077           throw ProtobufUtil.getRemoteException(se);
1078         }
1079       }
1080     };
1081     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1082   }
1083 
1084   /**
1085    * {@inheritDoc}
1086    */
1087   @Override
1088   public long incrementColumnValue(final byte [] row, final byte [] family,
1089       final byte [] qualifier, final long amount)
1090   throws IOException {
1091     return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
1092   }
1093 
1094   /**
1095    * @deprecated Use {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}
1096    */
1097   @Deprecated
1098   @Override
1099   public long incrementColumnValue(final byte [] row, final byte [] family,
1100       final byte [] qualifier, final long amount, final boolean writeToWAL)
1101   throws IOException {
1102     return incrementColumnValue(row, family, qualifier, amount,
1103       writeToWAL? Durability.SKIP_WAL: Durability.USE_DEFAULT);
1104   }
1105 
1106   /**
1107    * {@inheritDoc}
1108    */
1109   @Override
1110   public long incrementColumnValue(final byte [] row, final byte [] family,
1111       final byte [] qualifier, final long amount, final Durability durability)
1112   throws IOException {
1113     NullPointerException npe = null;
1114     if (row == null) {
1115       npe = new NullPointerException("row is null");
1116     } else if (family == null) {
1117       npe = new NullPointerException("family is null");
1118     } else if (qualifier == null) {
1119       npe = new NullPointerException("qualifier is null");
1120     }
1121     if (npe != null) {
1122       throw new IOException(
1123           "Invalid arguments to incrementColumnValue", npe);
1124     }
1125 
1126     NonceGenerator ng = this.connection.getNonceGenerator();
1127     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1128     RegionServerCallable<Long> callable =
1129       new RegionServerCallable<Long>(connection, getName(), row) {
1130         public Long call() throws IOException {
1131           try {
1132             MutateRequest request = RequestConverter.buildIncrementRequest(
1133               getLocation().getRegionInfo().getRegionName(), row, family,
1134               qualifier, amount, durability, nonceGroup, nonce);
1135             PayloadCarryingRpcController rpcController = rpcControllerFactory.newController();
1136             rpcController.setPriority(getTableName());
1137             MutateResponse response = getStub().mutate(rpcController, request);
1138             Result result =
1139               ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
1140             return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
1141           } catch (ServiceException se) {
1142             throw ProtobufUtil.getRemoteException(se);
1143           }
1144         }
1145       };
1146     return rpcCallerFactory.<Long> newCaller().callWithRetries(callable, this.operationTimeout);
1147   }
1148 
1149   /**
1150    * {@inheritDoc}
1151    */
1152   @Override
1153   public boolean checkAndPut(final byte [] row,
1154       final byte [] family, final byte [] qualifier, final byte [] value,
1155       final Put put)
1156   throws IOException {
1157     RegionServerCallable<Boolean> callable =
1158       new RegionServerCallable<Boolean>(connection, getName(), row) {
1159         public Boolean call() throws IOException {
1160           try {
1161             MutateRequest request = RequestConverter.buildMutateRequest(
1162               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1163                 new BinaryComparator(value), CompareType.EQUAL, put);
1164             PayloadCarryingRpcController rpcController = rpcControllerFactory.newController();
1165             rpcController.setPriority(getTableName());
1166             MutateResponse response = getStub().mutate(rpcController, request);
1167             return Boolean.valueOf(response.getProcessed());
1168           } catch (ServiceException se) {
1169             throw ProtobufUtil.getRemoteException(se);
1170           }
1171         }
1172       };
1173     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1174   }
1175 
1176 
1177   /**
1178    * {@inheritDoc}
1179    */
1180   @Override
1181   public boolean checkAndDelete(final byte [] row,
1182       final byte [] family, final byte [] qualifier, final byte [] value,
1183       final Delete delete)
1184   throws IOException {
1185     RegionServerCallable<Boolean> callable =
1186       new RegionServerCallable<Boolean>(connection, getName(), row) {
1187         public Boolean call() throws IOException {
1188           try {
1189             MutateRequest request = RequestConverter.buildMutateRequest(
1190               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1191                 new BinaryComparator(value), CompareType.EQUAL, delete);
1192             PayloadCarryingRpcController rpcController = rpcControllerFactory.newController();
1193             rpcController.setPriority(getTableName());
1194             MutateResponse response = getStub().mutate(rpcController, request);
1195             return Boolean.valueOf(response.getProcessed());
1196           } catch (ServiceException se) {
1197             throw ProtobufUtil.getRemoteException(se);
1198           }
1199         }
1200       };
1201     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1202   }
1203 
1204   /**
1205    * {@inheritDoc}
1206    */
1207   @Override
1208   public boolean exists(final Get get) throws IOException {
1209     get.setCheckExistenceOnly(true);
1210     Result r = get(get);
1211     assert r.getExists() != null;
1212     return r.getExists();
1213   }
1214 
1215   /**
1216    * {@inheritDoc}
1217    */
1218   @Override
1219   public Boolean[] exists(final List<Get> gets) throws IOException {
1220     if (gets.isEmpty()) return new Boolean[]{};
1221     if (gets.size() == 1) return new Boolean[]{exists(gets.get(0))};
1222 
1223     for (Get g: gets){
1224       g.setCheckExistenceOnly(true);
1225     }
1226 
1227     Object[] r1;
1228     try {
1229       r1 = batch(gets);
1230     } catch (InterruptedException e) {
1231       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1232     }
1233 
1234     // translate.
1235     Boolean[] results = new Boolean[r1.length];
1236     int i = 0;
1237     for (Object o : r1) {
1238       // batch ensures if there is a failure we get an exception instead
1239       results[i++] = ((Result)o).getExists();
1240     }
1241 
1242     return results;
1243   }
1244 
1245   /**
1246    * {@inheritDoc}
1247    */
1248   @Override
1249   public void flushCommits() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
1250     // As we can have an operation in progress even if the buffer is empty, we call
1251     //  backgroundFlushCommits at least one time.
1252     backgroundFlushCommits(true);
1253   }
1254 
1255   /**
1256    * Process a mixed batch of Get, Put and Delete actions. All actions for a
1257    * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
1258    *
1259    * @param list The collection of actions.
1260    * @param results An empty array, same size as list. If an exception is thrown,
1261    * you can test here for partial results, and to determine which actions
1262    * processed successfully.
1263    * @throws IOException if there are problems talking to META. Per-item
1264    * exceptions are stored in the results array.
1265    */
1266   public <R> void processBatchCallback(
1267     final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
1268     throws IOException, InterruptedException {
1269     this.batchCallback(list, results, callback);
1270   }
1271 
1272 
1273   /**
1274    * Parameterized batch processing, allowing varying return types for different
1275    * {@link Row} implementations.
1276    */
1277   public void processBatch(final List<? extends Row> list, final Object[] results)
1278     throws IOException, InterruptedException {
1279 
1280     this.processBatchCallback(list, results, null);
1281   }
1282 
1283 
1284   @Override
1285   public void close() throws IOException {
1286     if (this.closed) {
1287       return;
1288     }
1289     flushCommits();
1290     if (cleanupPoolOnClose) {
1291       this.pool.shutdown();
1292     }
1293     if (cleanupConnectionOnClose) {
1294       if (this.connection != null) {
1295         this.connection.close();
1296       }
1297     }
1298     this.closed = true;
1299   }
1300 
1301   // validate for well-formedness
1302   public void validatePut(final Put put) throws IllegalArgumentException{
1303     if (put.isEmpty()) {
1304       throw new IllegalArgumentException("No columns to insert");
1305     }
1306     if (maxKeyValueSize > 0) {
1307       for (List<Cell> list : put.getFamilyCellMap().values()) {
1308         for (Cell cell : list) {
1309           // KeyValue v1 expectation.  Cast for now.
1310           KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
1311           if (kv.getLength() > maxKeyValueSize) {
1312             throw new IllegalArgumentException("KeyValue size too large");
1313           }
1314         }
1315       }
1316     }
1317   }
1318 
1319   /**
1320    * {@inheritDoc}
1321    */
1322   @Override
1323   public boolean isAutoFlush() {
1324     return autoFlush;
1325   }
1326 
1327   /**
1328    * {@inheritDoc}
1329    */
1330   @Deprecated
1331   @Override
1332   public void setAutoFlush(boolean autoFlush) {
1333     setAutoFlush(autoFlush, autoFlush);
1334   }
1335 
1336   /**
1337    * {@inheritDoc}
1338    */
1339   @Override
1340   public void setAutoFlushTo(boolean autoFlush) {
1341     setAutoFlush(autoFlush, clearBufferOnFail);
1342   }
1343 
1344   /**
1345    * {@inheritDoc}
1346    */
1347   @Override
1348   public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
1349     this.autoFlush = autoFlush;
1350     this.clearBufferOnFail = autoFlush || clearBufferOnFail;
1351   }
1352 
1353   /**
1354    * Returns the maximum size in bytes of the write buffer for this HTable.
1355    * <p>
1356    * The default value comes from the configuration parameter
1357    * {@code hbase.client.write.buffer}.
1358    * @return The size of the write buffer in bytes.
1359    */
1360   @Override
1361   public long getWriteBufferSize() {
1362     return writeBufferSize;
1363   }
1364 
1365   /**
1366    * Sets the size of the buffer in bytes.
1367    * <p>
1368    * If the new size is less than the current amount of data in the
1369    * write buffer, the buffer gets flushed.
1370    * @param writeBufferSize The new write buffer size, in bytes.
1371    * @throws IOException if a remote or network exception occurs.
1372    */
1373   public void setWriteBufferSize(long writeBufferSize) throws IOException {
1374     this.writeBufferSize = writeBufferSize;
1375     if(currentWriteBufferSize > writeBufferSize) {
1376       flushCommits();
1377     }
1378   }
1379 
1380   /**
1381    * The pool is used for mutli requests for this HTable
1382    * @return the pool used for mutli
1383    */
1384   ExecutorService getPool() {
1385     return this.pool;
1386   }
1387 
1388   /**
1389    * Enable or disable region cache prefetch for the table. It will be
1390    * applied for the given table's all HTable instances who share the same
1391    * connection. By default, the cache prefetch is enabled.
1392    * @param tableName name of table to configure.
1393    * @param enable Set to true to enable region cache prefetch. Or set to
1394    * false to disable it.
1395    * @throws IOException
1396    */
1397   public static void setRegionCachePrefetch(final byte[] tableName,
1398       final boolean enable) throws IOException {
1399     setRegionCachePrefetch(TableName.valueOf(tableName), enable);
1400   }
1401 
1402   public static void setRegionCachePrefetch(
1403       final TableName tableName,
1404       final boolean enable) throws IOException {
1405     HConnectionManager.execute(new HConnectable<Void>(HBaseConfiguration.create()) {
1406       @Override
1407       public Void connect(HConnection connection) throws IOException {
1408         connection.setRegionCachePrefetch(tableName, enable);
1409         return null;
1410       }
1411     });
1412   }
1413 
1414   /**
1415    * Enable or disable region cache prefetch for the table. It will be
1416    * applied for the given table's all HTable instances who share the same
1417    * connection. By default, the cache prefetch is enabled.
1418    * @param conf The Configuration object to use.
1419    * @param tableName name of table to configure.
1420    * @param enable Set to true to enable region cache prefetch. Or set to
1421    * false to disable it.
1422    * @throws IOException
1423    */
1424   public static void setRegionCachePrefetch(final Configuration conf,
1425       final byte[] tableName, final boolean enable) throws IOException {
1426     setRegionCachePrefetch(conf, TableName.valueOf(tableName), enable);
1427   }
1428 
1429   public static void setRegionCachePrefetch(final Configuration conf,
1430       final TableName tableName,
1431       final boolean enable) throws IOException {
1432     HConnectionManager.execute(new HConnectable<Void>(conf) {
1433       @Override
1434       public Void connect(HConnection connection) throws IOException {
1435         connection.setRegionCachePrefetch(tableName, enable);
1436         return null;
1437       }
1438     });
1439   }
1440 
1441   /**
1442    * Check whether region cache prefetch is enabled or not for the table.
1443    * @param conf The Configuration object to use.
1444    * @param tableName name of table to check
1445    * @return true if table's region cache prefecth is enabled. Otherwise
1446    * it is disabled.
1447    * @throws IOException
1448    */
1449   public static boolean getRegionCachePrefetch(final Configuration conf,
1450       final byte[] tableName) throws IOException {
1451     return getRegionCachePrefetch(conf, TableName.valueOf(tableName));
1452   }
1453 
1454   public static boolean getRegionCachePrefetch(final Configuration conf,
1455       final TableName tableName) throws IOException {
1456     return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
1457       @Override
1458       public Boolean connect(HConnection connection) throws IOException {
1459         return connection.getRegionCachePrefetch(tableName);
1460       }
1461     });
1462   }
1463 
1464   /**
1465    * Check whether region cache prefetch is enabled or not for the table.
1466    * @param tableName name of table to check
1467    * @return true if table's region cache prefecth is enabled. Otherwise
1468    * it is disabled.
1469    * @throws IOException
1470    */
1471   public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException {
1472     return getRegionCachePrefetch(TableName.valueOf(tableName));
1473   }
1474 
1475   public static boolean getRegionCachePrefetch(
1476       final TableName tableName) throws IOException {
1477     return HConnectionManager.execute(new HConnectable<Boolean>(
1478         HBaseConfiguration.create()) {
1479       @Override
1480       public Boolean connect(HConnection connection) throws IOException {
1481         return connection.getRegionCachePrefetch(tableName);
1482       }
1483     });
1484   }
1485 
1486   /**
1487    * Explicitly clears the region cache to fetch the latest value from META.
1488    * This is a power user function: avoid unless you know the ramifications.
1489    */
1490   public void clearRegionCache() {
1491     this.connection.clearRegionCache();
1492   }
1493 
1494   /**
1495    * {@inheritDoc}
1496    */
1497   public CoprocessorRpcChannel coprocessorService(byte[] row) {
1498     return new RegionCoprocessorRpcChannel(connection, tableName, row, rpcCallerFactory,
1499         rpcControllerFactory);
1500   }
1501 
1502   /**
1503    * {@inheritDoc}
1504    */
1505   @Override
1506   public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1507       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1508       throws ServiceException, Throwable {
1509     final Map<byte[],R> results =  Collections.synchronizedMap(
1510         new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1511     coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1512       public void update(byte[] region, byte[] row, R value) {
1513         if (region != null) {
1514           results.put(region, value);
1515         }
1516       }
1517     });
1518     return results;
1519   }
1520 
1521   /**
1522    * {@inheritDoc}
1523    */
1524   @Override
1525   public <T extends Service, R> void coprocessorService(final Class<T> service,
1526       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1527       final Batch.Callback<R> callback) throws ServiceException, Throwable {
1528 
1529     // get regions covered by the row range
1530     List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1531 
1532     Map<byte[],Future<R>> futures =
1533         new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
1534     for (final byte[] r : keys) {
1535       final RegionCoprocessorRpcChannel channel =
1536           new RegionCoprocessorRpcChannel(connection, tableName, r, rpcCallerFactory,
1537               rpcControllerFactory);
1538       Future<R> future = pool.submit(
1539           new Callable<R>() {
1540             public R call() throws Exception {
1541               T instance = ProtobufUtil.newServiceStub(service, channel);
1542               R result = callable.call(instance);
1543               byte[] region = channel.getLastRegion();
1544               if (callback != null) {
1545                 callback.update(region, r, result);
1546               }
1547               return result;
1548             }
1549           });
1550       futures.put(r, future);
1551     }
1552     for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1553       try {
1554         e.getValue().get();
1555       } catch (ExecutionException ee) {
1556         LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1557             + Bytes.toStringBinary(e.getKey()), ee);
1558         throw ee.getCause();
1559       } catch (InterruptedException ie) {
1560         throw new InterruptedIOException("Interrupted calling coprocessor service " + service.getName()
1561             + " for row " + Bytes.toStringBinary(e.getKey()))
1562             .initCause(ie);
1563       }
1564     }
1565   }
1566 
1567   private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1568   throws IOException {
1569     if (start == null) {
1570       start = HConstants.EMPTY_START_ROW;
1571     }
1572     if (end == null) {
1573       end = HConstants.EMPTY_END_ROW;
1574     }
1575     return getKeysAndRegionsInRange(start, end, true).getFirst();
1576   }
1577 
1578   public void setOperationTimeout(int operationTimeout) {
1579     this.operationTimeout = operationTimeout;
1580   }
1581 
1582   public int getOperationTimeout() {
1583     return operationTimeout;
1584   }
1585 
1586   @Override
1587   public String toString() {
1588     return tableName + ";" + connection;
1589   }
1590 
1591   /**
1592    * Run basic test.
1593    * @param args Pass table name and row and will get the content.
1594    * @throws IOException
1595    */
1596   public static void main(String[] args) throws IOException {
1597     HTable t = new HTable(HBaseConfiguration.create(), args[0]);
1598     try {
1599       System.out.println(t.get(new Get(Bytes.toBytes(args[1]))));
1600     } finally {
1601       t.close();
1602     }
1603   }
1604 
1605   /**
1606    * {@inheritDoc}
1607    */
1608   @Override
1609   public <R extends Message> Map<byte[], R> batchCoprocessorService(
1610       Descriptors.MethodDescriptor methodDescriptor, Message request,
1611       byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1612     final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1613         Bytes.BYTES_COMPARATOR));
1614     batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1615         new Callback<R>() {
1616 
1617           @Override
1618           public void update(byte[] region, byte[] row, R result) {
1619             if (region != null) {
1620               results.put(region, result);
1621             }
1622           }
1623         });
1624     return results;
1625   }
1626 
1627   /**
1628    * {@inheritDoc}
1629    */
1630   @Override
1631   public <R extends Message> void batchCoprocessorService(
1632       final Descriptors.MethodDescriptor methodDescriptor, final Message request,
1633       byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
1634       throws ServiceException, Throwable {
1635 
1636     // get regions covered by the row range
1637     Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
1638         getKeysAndRegionsInRange(startKey, endKey, true);
1639     List<byte[]> keys = keysAndRegions.getFirst();
1640     List<HRegionLocation> regions = keysAndRegions.getSecond();
1641 
1642     // check if we have any calls to make
1643     if (keys.isEmpty()) {
1644       LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
1645           ", end=" + Bytes.toStringBinary(endKey));
1646       return;
1647     }
1648 
1649     List<RegionCoprocessorServiceExec> execs = new ArrayList<RegionCoprocessorServiceExec>();
1650     final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
1651         new TreeMap<byte[], RegionCoprocessorServiceExec>(Bytes.BYTES_COMPARATOR);
1652     for (int i = 0; i < keys.size(); i++) {
1653       final byte[] rowKey = keys.get(i);
1654       final byte[] region = regions.get(i).getRegionInfo().getRegionName();
1655       RegionCoprocessorServiceExec exec =
1656           new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
1657       execs.add(exec);
1658       execsByRow.put(rowKey, exec);
1659     }
1660 
1661     // tracking for any possible deserialization errors on success callback
1662     // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
1663     final List<Throwable> callbackErrorExceptions = new ArrayList<Throwable>();
1664     final List<Row> callbackErrorActions = new ArrayList<Row>();
1665     final List<String> callbackErrorServers = new ArrayList<String>();
1666 
1667     AsyncProcess<ClientProtos.CoprocessorServiceResult> asyncProcess =
1668         new AsyncProcess<ClientProtos.CoprocessorServiceResult>(connection, tableName, pool,
1669             new AsyncProcess.AsyncProcessCallback<ClientProtos.CoprocessorServiceResult>() {
1670           @SuppressWarnings("unchecked")
1671           @Override
1672           public void success(int originalIndex, byte[] region, Row row,
1673               ClientProtos.CoprocessorServiceResult serviceResult) {
1674             if (LOG.isTraceEnabled()) {
1675               LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
1676                 " call #" + originalIndex + ": region=" + Bytes.toStringBinary(region) +
1677                 ", row=" + Bytes.toStringBinary(row.getRow()) +
1678                 ", value=" + serviceResult.getValue().getValue());
1679             }
1680             try {
1681               callback.update(region, row.getRow(),
1682                 (R) responsePrototype.newBuilderForType().mergeFrom(
1683                   serviceResult.getValue().getValue()).build());
1684             } catch (InvalidProtocolBufferException e) {
1685               LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
1686                 e);
1687               callbackErrorExceptions.add(e);
1688               callbackErrorActions.add(row);
1689               callbackErrorServers.add("null");
1690             }
1691           }
1692 
1693           @Override
1694           public boolean failure(int originalIndex, byte[] region, Row row, Throwable t) {
1695             RegionCoprocessorServiceExec exec = (RegionCoprocessorServiceExec) row;
1696             LOG.error("Failed calling endpoint " + methodDescriptor.getFullName() + ": region="
1697                 + Bytes.toStringBinary(exec.getRegion()), t);
1698             return true;
1699           }
1700 
1701           @Override
1702           public boolean retriableFailure(int originalIndex, Row row, byte[] region,
1703               Throwable exception) {
1704             RegionCoprocessorServiceExec exec = (RegionCoprocessorServiceExec) row;
1705             LOG.error("Failed calling endpoint " + methodDescriptor.getFullName() + ": region="
1706                 + Bytes.toStringBinary(exec.getRegion()), exception);
1707             return !(exception instanceof DoNotRetryIOException);
1708           }
1709         },
1710         configuration, rpcCallerFactory, rpcControllerFactory);
1711 
1712     asyncProcess.submitAll(execs);
1713     asyncProcess.waitUntilDone();
1714 
1715     if (asyncProcess.hasError()) {
1716       throw asyncProcess.getErrors();
1717     } else if (!callbackErrorExceptions.isEmpty()) {
1718       throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
1719         callbackErrorServers);
1720     }
1721   }
1722 }