View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.NavigableMap;
28  import java.util.TreeMap;
29  import java.util.concurrent.Callable;
30  import java.util.concurrent.ExecutionException;
31  import java.util.concurrent.ExecutorService;
32  import java.util.concurrent.Future;
33  import java.util.concurrent.SynchronousQueue;
34  import java.util.concurrent.ThreadPoolExecutor;
35  import java.util.concurrent.TimeUnit;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.hbase.classification.InterfaceAudience;
40  import org.apache.hadoop.hbase.classification.InterfaceStability;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.hbase.Cell;
43  import org.apache.hadoop.hbase.HBaseConfiguration;
44  import org.apache.hadoop.hbase.HConstants;
45  import org.apache.hadoop.hbase.HRegionInfo;
46  import org.apache.hadoop.hbase.HRegionLocation;
47  import org.apache.hadoop.hbase.HTableDescriptor;
48  import org.apache.hadoop.hbase.KeyValueUtil;
49  import org.apache.hadoop.hbase.ServerName;
50  import org.apache.hadoop.hbase.TableName;
51  import org.apache.hadoop.hbase.TableNotFoundException;
52  import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
53  import org.apache.hadoop.hbase.client.coprocessor.Batch;
54  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
55  import org.apache.hadoop.hbase.filter.BinaryComparator;
56  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
57  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
58  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
59  import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
60  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
61  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
62  import org.apache.hadoop.hbase.protobuf.RequestConverter;
63  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
64  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
65  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
66  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
67  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
68  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
69  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
70  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
71  import org.apache.hadoop.hbase.util.Bytes;
72  import org.apache.hadoop.hbase.util.Pair;
73  import org.apache.hadoop.hbase.util.ReflectionUtils;
74  import org.apache.hadoop.hbase.util.Threads;
75  
76  import com.google.common.annotations.VisibleForTesting;
77  import com.google.protobuf.Descriptors;
78  import com.google.protobuf.Message;
79  import com.google.protobuf.Service;
80  import com.google.protobuf.ServiceException;
81  
82  /**
83   * An implementation of {@link Table}. Used to communicate with a single HBase table.
84   * Lightweight. Get as needed and just close when done.
85   * Instances of this class SHOULD NOT be constructed directly.
86   * Obtain an instance via {@link Connection}. See {@link ConnectionFactory}
87   * class comment for an example of how.
88   *
89   * <p>This class is NOT thread safe for reads nor writes.
90   * In the case of writes (Put, Delete), the underlying write buffer can
91   * be corrupted if multiple threads contend over a single HTable instance.
92   * In the case of reads, some fields used by a Scan are shared among all threads.
93   *
94   * <p>HTable is no longer a client API. Use {@link Table} instead. It is marked
95   * InterfaceAudience.Private as of hbase-1.0.0 indicating that this is an
96   * HBase-internal class as defined in
97   * <a href="https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html">Hadoop
98   * Interface Classification</a>. There are no guarantees for backwards
99   * source / binary compatibility and methods or the class can
100  * change or go away without deprecation.
101  * <p>Nearly all methods of this class made it out to the new {@link Table}
102  * interface or were instantiations of methods defined in {@link HTableInterface}.
103  * A few did not. Namely, the {@link #getStartEndKeys}, {@link #getEndKeys},
104  * and {@link #getStartKeys} methods. These three methods are available
105  * in {@link RegionLocator} as of 1.0.0 but were NOT marked as
106  * deprecated when we released 1.0.0. In spite of this oversight on our
107  * part, these methods will be removed in 2.0.0.
108  *
109  * @see Table
110  * @see Admin
111  * @see Connection
112  * @see ConnectionFactory
113  */
114 @InterfaceAudience.Private
115 @InterfaceStability.Stable
116 public class HTable implements HTableInterface, RegionLocator {
117   private static final Log LOG = LogFactory.getLog(HTable.class);
118   protected ClusterConnection connection; // left null when constructed with a null Configuration
119   private final TableName tableName; // the table this instance operates on
120   private volatile Configuration configuration; // from the constructor argument or the connection
121   private ConnectionConfiguration connConfiguration; // built in finishSetup() if not supplied
122   protected BufferedMutatorImpl mutator; // may be null; getWriteBuffer() checks for that
123   private boolean autoFlush = true;
124   private boolean closed = false; // reset to false by finishSetup()
125   protected int scannerCaching; // default scan caching, read from connConfiguration
126   protected long scannerMaxResultSize; // default scan max result size, from connConfiguration
127   private ExecutorService pool;  // For Multi & Scan
128   private int operationTimeout; // meta timeout for system tables, else the regular one
129   private final boolean cleanupPoolOnClose; // shutdown the pool in close()
130   private final boolean cleanupConnectionOnClose; // close the connection in close()
131   private Consistency defaultConsistency = Consistency.STRONG;
132   private HRegionLocator locator; // created in finishSetup()
133 
134   /** The Async process for batch; obtained from the connection in finishSetup(). */
135   protected AsyncProcess multiAp;
136   private RpcRetryingCallerFactory rpcCallerFactory; // defaulted in finishSetup() when null
137   private RpcControllerFactory rpcControllerFactory; // defaulted in finishSetup() when null
138 
/**
 * Opens a handle onto the named table. The string name is converted to a
 * {@link TableName} and the work is handed to the {@link TableName}-based constructor.
 * @param conf Configuration object to use.
 * @param tableName Name of the table, as a string.
 * @throws IOException if a remote or network exception occurs
 * @deprecated Constructing HTable objects manually has been deprecated. Please use
 * {@link Connection} to instantiate a {@link Table} instead.
 */
@Deprecated
public HTable(Configuration conf, String tableName) throws IOException {
  this(conf, TableName.valueOf(tableName));
}
152 
/**
 * Opens a handle onto the named table. The raw name bytes are converted to a
 * {@link TableName} and the work is handed to the {@link TableName}-based constructor.
 * @param conf Configuration object to use.
 * @param tableName Name of the table, as bytes.
 * @throws IOException if a remote or network exception occurs
 * @deprecated Constructing HTable objects manually has been deprecated. Please use
 * {@link Connection} to instantiate a {@link Table} instead.
 */
@Deprecated
public HTable(Configuration conf, byte[] tableName) throws IOException {
  this(conf, TableName.valueOf(tableName));
}
166 
/**
 * Opens a handle onto the given table, obtaining the connection from
 * {@code ConnectionManager} and creating a default executor for this instance.
 * Both the pool and the connection are flagged for cleanup on close.
 * <p>
 * When {@code conf} is {@code null} the connection is set to {@code null} and no
 * further setup is performed.
 * @param conf Configuration object to use.
 * @param tableName table name pojo
 * @throws IOException if a remote or network exception occurs
 * @deprecated Constructing HTable objects manually has been deprecated. Please use
 * {@link Connection} to instantiate a {@link Table} instead.
 */
@Deprecated
public HTable(Configuration conf, TableName tableName) throws IOException {
  this.tableName = tableName;
  this.cleanupConnectionOnClose = true;
  this.cleanupPoolOnClose = true;
  if (conf != null) {
    this.connection = ConnectionManager.getConnectionInternal(conf);
    this.configuration = conf;

    this.pool = getDefaultExecutor(conf);
    finishSetup();
  } else {
    // Nothing to connect with; leave the instance without a connection.
    this.connection = null;
  }
}
190 
191   /**
192    * Creates an object to access a HBase table.
193    * @param tableName Name of the table.
194    * @param connection HConnection to be used.
195    * @throws IOException if a remote or network exception occurs
196    * @deprecated Do not use.
197    */
198   @Deprecated
199   public HTable(TableName tableName, Connection connection) throws IOException {
200     this.tableName = tableName;
201     this.cleanupPoolOnClose = true;
202     this.cleanupConnectionOnClose = false;
203     this.connection = (ClusterConnection)connection;
204     this.configuration = connection.getConfiguration();
205 
206     this.pool = getDefaultExecutor(this.configuration);
207     this.finishSetup();
208   }
209 
210   // Marked Private @since 1.0
211   @InterfaceAudience.Private
212   public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
213     int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
214     if (maxThreads == 0) {
215       maxThreads = 1; // is there a better default?
216     }
217     long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
218 
219     // Using the "direct handoff" approach, new threads will only be created
220     // if it is necessary and will grow unbounded. This could be bad but in HCM
221     // we only create as many Runnables as there are region servers. It means
222     // it also scales when new region servers are added.
223     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
224         new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
225     pool.allowCoreThreadTimeOut(true);
226     return pool;
227   }
228 
/**
 * Opens a handle onto the named table using the supplied executor. The raw name bytes
 * are converted to a {@link TableName} before delegating.
 * @param conf Configuration object to use.
 * @param tableName Name of the table, as bytes.
 * @param pool ExecutorService to be used.
 * @throws IOException if a remote or network exception occurs
 * @deprecated Constructing HTable objects manually has been deprecated. Please use
 * {@link Connection} to instantiate a {@link Table} instead.
 */
@Deprecated
public HTable(Configuration conf, byte[] tableName, ExecutorService pool) throws IOException {
  this(conf, TableName.valueOf(tableName), pool);
}
243 
244   /**
245    * Creates an object to access a HBase table.
246    * @param conf Configuration object to use.
247    * @param tableName Name of the table.
248    * @param pool ExecutorService to be used.
249    * @throws IOException if a remote or network exception occurs
250    * @deprecated Constructing HTable objects manually has been deprecated. Please use
251    * {@link Connection} to instantiate a {@link Table} instead.
252    */
253   @Deprecated
254   public HTable(Configuration conf, final TableName tableName, final ExecutorService pool)
255       throws IOException {
256     this.connection = ConnectionManager.getConnectionInternal(conf);
257     this.configuration = conf;
258     this.pool = pool;
259     if (pool == null) {
260       this.pool = getDefaultExecutor(conf);
261       this.cleanupPoolOnClose = true;
262     } else {
263       this.cleanupPoolOnClose = false;
264     }
265     this.tableName = tableName;
266     this.cleanupConnectionOnClose = true;
267     this.finishSetup();
268   }
269 
/**
 * Opens a handle onto the named table over an existing connection and executor. The raw
 * name bytes are converted to a {@link TableName} before delegating.
 * @param tableName Name of the table, as bytes.
 * @param connection HConnection to be used.
 * @param pool ExecutorService to be used.
 * @throws IOException if a remote or network exception occurs.
 * @deprecated Do not use, internal ctor.
 */
@Deprecated
public HTable(byte[] tableName, Connection connection, ExecutorService pool)
    throws IOException {
  this(TableName.valueOf(tableName), connection, pool);
}
283 
/**
 * Opens a handle onto the given table over an existing connection and executor.
 * The connection is cast to {@link ClusterConnection}; the connection configuration
 * and both RPC factories are passed on as {@code null}.
 * @param tableName Name of the table.
 * @param connection connection to be used.
 * @param pool ExecutorService to be used.
 * @throws IOException if a remote or network exception occurs
 * @deprecated Do not use, internal ctor.
 */
@Deprecated
public HTable(TableName tableName, Connection connection, ExecutorService pool)
    throws IOException {
  this(tableName, (ClusterConnection) connection, null, null, null, pool);
}
290 
291   /**
292    * Creates an object to access a HBase table.
293    * Used by HBase internally.  DO NOT USE. See {@link ConnectionFactory} class comment for how to
294    * get a {@link Table} instance (use {@link Table} instead of {@link HTable}).
295    * @param tableName Name of the table.
296    * @param connection HConnection to be used.
297    * @param pool ExecutorService to be used.
298    * @throws IOException if a remote or network exception occurs
299    */
300   @InterfaceAudience.Private
301   public HTable(TableName tableName, final ClusterConnection connection,
302       final ConnectionConfiguration tableConfig,
303       final RpcRetryingCallerFactory rpcCallerFactory,
304       final RpcControllerFactory rpcControllerFactory,
305       final ExecutorService pool) throws IOException {
306     if (connection == null || connection.isClosed()) {
307       throw new IllegalArgumentException("Connection is null or closed.");
308     }
309     this.tableName = tableName;
310     this.cleanupConnectionOnClose = false;
311     this.connection = connection;
312     this.configuration = connection.getConfiguration();
313     this.connConfiguration = tableConfig;
314     this.pool = pool;
315     if (pool == null) {
316       this.pool = getDefaultExecutor(this.configuration);
317       this.cleanupPoolOnClose = true;
318     } else {
319       this.cleanupPoolOnClose = false;
320     }
321 
322     this.rpcCallerFactory = rpcCallerFactory;
323     this.rpcControllerFactory = rpcControllerFactory;
324 
325     this.finishSetup();
326   }
327 
/**
 * For internal testing. Uses the supplied connection and takes the table name from
 * {@code params}. Neither the pool nor the connection is flagged for cleanup on close.
 * @param conn connection to use
 * @param params source of the table name; also handed to the buffered mutator
 * @throws IOException
 */
@VisibleForTesting
protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException {
  this.connection = conn;
  this.tableName = params.getTableName();
  this.connConfiguration = new ConnectionConfiguration(conn.getConfiguration());
  this.cleanupPoolOnClose = false;
  this.cleanupConnectionOnClose = false;
  // used from tests, don't trust the connection is real
  this.mutator = new BufferedMutatorImpl(conn, null, null, params);
}
342 
343   /**
344    * @return maxKeyValueSize from configuration.
345    */
346   public static int getMaxKeyValueSize(Configuration conf) {
347     return conf.getInt("hbase.client.keyvalue.maxsize", -1);
348   }
349 
350   /**
351    * setup this HTable's parameter based on the passed configuration
352    */
353   private void finishSetup() throws IOException {
354     if (connConfiguration == null) {
355       connConfiguration = new ConnectionConfiguration(configuration);
356     }
357 
358     this.operationTimeout = tableName.isSystemTable() ?
359         connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
360     this.scannerCaching = connConfiguration.getScannerCaching();
361     this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
362     if (this.rpcCallerFactory == null) {
363       this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
364     }
365     if (this.rpcControllerFactory == null) {
366       this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
367     }
368 
369     // puts need to track errors globally due to how the APIs currently work.
370     multiAp = this.connection.getAsyncProcess();
371 
372     this.closed = false;
373 
374     this.locator = new HRegionLocator(tableName, connection);
375   }
376 
377   /**
378    * {@inheritDoc}
379    */
380   @Override
381   public Configuration getConfiguration() {
382     return configuration;
383   }
384 
385   /**
386    * Tells whether or not a table is enabled or not. This method creates a
387    * new HBase configuration, so it might make your unit tests fail due to
388    * incorrect ZK client port.
389    * @param tableName Name of table to check.
390    * @return {@code true} if table is online.
391    * @throws IOException if a remote or network exception occurs
392    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
393    */
394   @Deprecated
395   public static boolean isTableEnabled(String tableName) throws IOException {
396     return isTableEnabled(TableName.valueOf(tableName));
397   }
398 
399   /**
400    * Tells whether or not a table is enabled or not. This method creates a
401    * new HBase configuration, so it might make your unit tests fail due to
402    * incorrect ZK client port.
403    * @param tableName Name of table to check.
404    * @return {@code true} if table is online.
405    * @throws IOException if a remote or network exception occurs
406    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
407    */
408   @Deprecated
409   public static boolean isTableEnabled(byte[] tableName) throws IOException {
410     return isTableEnabled(TableName.valueOf(tableName));
411   }
412 
413   /**
414    * Tells whether or not a table is enabled or not. This method creates a
415    * new HBase configuration, so it might make your unit tests fail due to
416    * incorrect ZK client port.
417    * @param tableName Name of table to check.
418    * @return {@code true} if table is online.
419    * @throws IOException if a remote or network exception occurs
420    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
421    */
422   @Deprecated
423   public static boolean isTableEnabled(TableName tableName) throws IOException {
424     return isTableEnabled(HBaseConfiguration.create(), tableName);
425   }
426 
427   /**
428    * Tells whether or not a table is enabled or not.
429    * @param conf The Configuration object to use.
430    * @param tableName Name of table to check.
431    * @return {@code true} if table is online.
432    * @throws IOException if a remote or network exception occurs
433    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
434    */
435   @Deprecated
436   public static boolean isTableEnabled(Configuration conf, String tableName)
437   throws IOException {
438     return isTableEnabled(conf, TableName.valueOf(tableName));
439   }
440 
441   /**
442    * Tells whether or not a table is enabled or not.
443    * @param conf The Configuration object to use.
444    * @param tableName Name of table to check.
445    * @return {@code true} if table is online.
446    * @throws IOException if a remote or network exception occurs
447    * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
448    */
449   @Deprecated
450   public static boolean isTableEnabled(Configuration conf, byte[] tableName)
451   throws IOException {
452     return isTableEnabled(conf, TableName.valueOf(tableName));
453   }
454 
455   /**
456    * Tells whether or not a table is enabled or not.
457    * @param conf The Configuration object to use.
458    * @param tableName Name of table to check.
459    * @return {@code true} if table is online.
460    * @throws IOException if a remote or network exception occurs
461    * @deprecated use {@link HBaseAdmin#isTableEnabled(org.apache.hadoop.hbase.TableName tableName)}
462    */
463   @Deprecated
464   public static boolean isTableEnabled(Configuration conf,
465       final TableName tableName) throws IOException {
466     return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
467       @Override
468       public Boolean connect(HConnection connection) throws IOException {
469         return connection.isTableEnabled(tableName);
470       }
471     });
472   }
473 
474   /**
475    * Find region location hosting passed row using cached info
476    * @param row Row to find.
477    * @return The location of the given row.
478    * @throws IOException if a remote or network exception occurs
479    * @deprecated Use {@link RegionLocator#getRegionLocation(byte[])}
480    */
481   @Deprecated
482   public HRegionLocation getRegionLocation(final String row)
483   throws IOException {
484     return getRegionLocation(Bytes.toBytes(row), false);
485   }
486 
487   /**
488    * @deprecated Use {@link RegionLocator#getRegionLocation(byte[])} instead.
489    */
490   @Override
491   @Deprecated
492   public HRegionLocation getRegionLocation(final byte [] row)
493   throws IOException {
494     return locator.getRegionLocation(row);
495   }
496 
497   /**
498    * @deprecated Use {@link RegionLocator#getRegionLocation(byte[], boolean)} instead.
499    */
500   @Override
501   @Deprecated
502   public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
503   throws IOException {
504     return locator.getRegionLocation(row, reload);
505   }
506 
507   /**
508    * {@inheritDoc}
509    */
510   @Override
511   public byte [] getTableName() {
512     return this.tableName.getName();
513   }
514 
515   @Override
516   public TableName getName() {
517     return tableName;
518   }
519 
520   /**
521    * <em>INTERNAL</em> Used by unit tests and tools to do low-level
522    * manipulations.
523    * @return An HConnection instance.
524    * @deprecated This method will be changed from public to package protected.
525    */
526   // TODO(tsuna): Remove this.  Unit tests shouldn't require public helpers.
527   @Deprecated
528   @VisibleForTesting
529   public HConnection getConnection() {
530     return this.connection;
531   }
532 
533   /**
534    * Gets the number of rows that a scanner will fetch at once.
535    * <p>
536    * The default value comes from {@code hbase.client.scanner.caching}.
537    * @deprecated Use {@link Scan#setCaching(int)} and {@link Scan#getCaching()}
538    */
539   @Deprecated
540   public int getScannerCaching() {
541     return scannerCaching;
542   }
543 
544   /**
545    * Kept in 0.96 for backward compatibility
546    * @deprecated  since 0.96. This is an internal buffer that should not be read nor write.
547    */
548   @Deprecated
549   public List<Row> getWriteBuffer() {
550     return mutator == null ? null : mutator.getWriteBuffer();
551   }
552 
553   /**
554    * Sets the number of rows that a scanner will fetch at once.
555    * <p>
556    * This will override the value specified by
557    * {@code hbase.client.scanner.caching}.
558    * Increasing this value will reduce the amount of work needed each time
559    * {@code next()} is called on a scanner, at the expense of memory use
560    * (since more rows will need to be maintained in memory by the scanners).
561    * @param scannerCaching the number of rows a scanner will fetch at once.
562    * @deprecated Use {@link Scan#setCaching(int)}
563    */
564   @Deprecated
565   public void setScannerCaching(int scannerCaching) {
566     this.scannerCaching = scannerCaching;
567   }
568 
569   /**
570    * {@inheritDoc}
571    */
572   @Override
573   public HTableDescriptor getTableDescriptor() throws IOException {
574     HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory,
575       rpcControllerFactory, operationTimeout);
576     if (htd != null) {
577       return new UnmodifyableHTableDescriptor(htd);
578     }
579     return null;
580   }
581 
582   /**
583    * To be removed in 2.0.0.
584    * @deprecated Since 1.1.0. Use {@link RegionLocator#getStartEndKeys()} instead
585    */
586   @Override
587   @Deprecated
588   public byte [][] getStartKeys() throws IOException {
589     return locator.getStartKeys();
590   }
591 
592   /**
593    * To be removed in 2.0.0.
594    * @deprecated Since 1.1.0. Use {@link RegionLocator#getEndKeys()} instead;
595    */
596   @Override
597   @Deprecated
598   public byte[][] getEndKeys() throws IOException {
599     return locator.getEndKeys();
600   }
601 
602   /**
603    * To be removed in 2.0.0.
604    * @deprecated Since 1.1.0. Use {@link RegionLocator#getStartEndKeys()} instead;
605    */
606   @Override
607   @Deprecated
608   public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
609     return locator.getStartEndKeys();
610   }
611 
612   /**
613    * Gets all the regions and their address for this table.
614    * <p>
615    * This is mainly useful for the MapReduce integration.
616    * @return A map of HRegionInfo with it's server address
617    * @throws IOException if a remote or network exception occurs
618    * @deprecated This is no longer a public API.  Use {@link #getAllRegionLocations()} instead.
619    */
620   @Deprecated
621   public NavigableMap<HRegionInfo, ServerName> getRegionLocations() throws IOException {
622     // TODO: Odd that this returns a Map of HRI to SN whereas getRegionLocator, singular, returns an HRegionLocation.
623     return MetaScanner.allTableRegions(this.connection, getName());
624   }
625 
626   /**
627    * Gets all the regions and their address for this table.
628    * <p>
629    * This is mainly useful for the MapReduce integration.
630    * @return A map of HRegionInfo with it's server address
631    * @throws IOException if a remote or network exception occurs
632    *
633    * @deprecated Use {@link RegionLocator#getAllRegionLocations()} instead;
634    */
635   @Override
636   @Deprecated
637   public List<HRegionLocation> getAllRegionLocations() throws IOException {
638     return locator.getAllRegionLocations();
639   }
640 
641   /**
642    * Get the corresponding regions for an arbitrary range of keys.
643    * <p>
644    * @param startKey Starting row in range, inclusive
645    * @param endKey Ending row in range, exclusive
646    * @return A list of HRegionLocations corresponding to the regions that
647    * contain the specified range
648    * @throws IOException if a remote or network exception occurs
649    * @deprecated This is no longer a public API
650    */
651   @Deprecated
652   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
653     final byte [] endKey) throws IOException {
654     return getRegionsInRange(startKey, endKey, false);
655   }
656 
657   /**
658    * Get the corresponding regions for an arbitrary range of keys.
659    * <p>
660    * @param startKey Starting row in range, inclusive
661    * @param endKey Ending row in range, exclusive
662    * @param reload true to reload information or false to use cached information
663    * @return A list of HRegionLocations corresponding to the regions that
664    * contain the specified range
665    * @throws IOException if a remote or network exception occurs
666    * @deprecated This is no longer a public API
667    */
668   @Deprecated
669   public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
670       final byte [] endKey, final boolean reload) throws IOException {
671     return getKeysAndRegionsInRange(startKey, endKey, false, reload).getSecond();
672   }
673 
674   /**
675    * Get the corresponding start keys and regions for an arbitrary range of
676    * keys.
677    * <p>
678    * @param startKey Starting row in range, inclusive
679    * @param endKey Ending row in range
680    * @param includeEndKey true if endRow is inclusive, false if exclusive
681    * @return A pair of list of start keys and list of HRegionLocations that
682    *         contain the specified range
683    * @throws IOException if a remote or network exception occurs
684    * @deprecated This is no longer a public API
685    */
686   @Deprecated
687   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
688       final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
689       throws IOException {
690     return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
691   }
692 
693   /**
694    * Get the corresponding start keys and regions for an arbitrary range of
695    * keys.
696    * <p>
697    * @param startKey Starting row in range, inclusive
698    * @param endKey Ending row in range
699    * @param includeEndKey true if endRow is inclusive, false if exclusive
700    * @param reload true to reload information or false to use cached information
701    * @return A pair of list of start keys and list of HRegionLocations that
702    *         contain the specified range
703    * @throws IOException if a remote or network exception occurs
704    * @deprecated This is no longer a public API
705    */
706   @Deprecated
707   private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
708       final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
709       final boolean reload) throws IOException {
710     final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
711     if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
712       throw new IllegalArgumentException(
713         "Invalid range: " + Bytes.toStringBinary(startKey) +
714         " > " + Bytes.toStringBinary(endKey));
715     }
716     List<byte[]> keysInRange = new ArrayList<byte[]>();
717     List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
718     byte[] currentKey = startKey;
719     do {
720       HRegionLocation regionLocation = getRegionLocation(currentKey, reload);
721       keysInRange.add(currentKey);
722       regionsInRange.add(regionLocation);
723       currentKey = regionLocation.getRegionInfo().getEndKey();
724     } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
725         && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
726             || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
727     return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
728         regionsInRange);
729   }
730 
731   /**
732    * {@inheritDoc}
733    * @deprecated Use reversed scan instead.
734    */
735    @Override
736    @Deprecated
737    public Result getRowOrBefore(final byte[] row, final byte[] family)
738        throws IOException {
739      RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
740          tableName, row) {
741        @Override
742       public Result call(int callTimeout) throws IOException {
743          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
744          controller.setPriority(tableName);
745          controller.setCallTimeout(callTimeout);
746          ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest(
747              getLocation().getRegionInfo().getRegionName(), row, family);
748          try {
749            ClientProtos.GetResponse response = getStub().get(controller, request);
750            if (!response.hasResult()) return null;
751            return ProtobufUtil.toResult(response.getResult());
752          } catch (ServiceException se) {
753            throw ProtobufUtil.getRemoteException(se);
754          }
755        }
756      };
757      return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
758    }
759 
760   /**
761    * The underlying {@link HTable} must not be closed.
762    * {@link HTableInterface#getScanner(Scan)} has other usage details.
763    */
764   @Override
765   public ResultScanner getScanner(final Scan scan) throws IOException {
766     if (scan.getBatch() > 0 && scan.isSmall()) {
767       throw new IllegalArgumentException("Small scan should not be used with batching");
768     }
769 
770     if (scan.getCaching() <= 0) {
771       scan.setCaching(getScannerCaching());
772     }
773     if (scan.getMaxResultSize() <= 0) {
774       scan.setMaxResultSize(scannerMaxResultSize);
775     }
776 
777     if (scan.isReversed()) {
778       if (scan.isSmall()) {
779         return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
780             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
781             pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
782       } else {
783         return new ReversedClientScanner(getConfiguration(), scan, getName(),
784             this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
785             pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
786       }
787     }
788 
789     if (scan.isSmall()) {
790       return new ClientSmallScanner(getConfiguration(), scan, getName(),
791           this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
792           pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
793     } else {
794       return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
795           this.rpcCallerFactory, this.rpcControllerFactory,
796           pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
797     }
798   }
799 
800   /**
801    * The underlying {@link HTable} must not be closed.
802    * {@link HTableInterface#getScanner(byte[])} has other usage details.
803    */
804   @Override
805   public ResultScanner getScanner(byte [] family) throws IOException {
806     Scan scan = new Scan();
807     scan.addFamily(family);
808     return getScanner(scan);
809   }
810 
811   /**
812    * The underlying {@link HTable} must not be closed.
813    * {@link HTableInterface#getScanner(byte[], byte[])} has other usage details.
814    */
815   @Override
816   public ResultScanner getScanner(byte [] family, byte [] qualifier)
817   throws IOException {
818     Scan scan = new Scan();
819     scan.addColumn(family, qualifier);
820     return getScanner(scan);
821   }
822 
823   /**
824    * {@inheritDoc}
825    */
826   @Override
827   public Result get(final Get get) throws IOException {
828     return get(get, get.isCheckExistenceOnly());
829   }
830 
831   private Result get(Get get, final boolean checkExistenceOnly) throws IOException {
832     // if we are changing settings to the get, clone it.
833     if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) {
834       get = ReflectionUtils.newInstance(get.getClass(), get);
835       get.setCheckExistenceOnly(checkExistenceOnly);
836       if (get.getConsistency() == null){
837         get.setConsistency(defaultConsistency);
838       }
839     }
840 
841     if (get.getConsistency() == Consistency.STRONG) {
842       // Good old call.
843       final Get getReq = get;
844       RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
845           getName(), get.getRow()) {
846         @Override
847         public Result call(int callTimeout) throws IOException {
848           ClientProtos.GetRequest request =
849             RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq);
850           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
851           controller.setPriority(tableName);
852           controller.setCallTimeout(callTimeout);
853           try {
854             ClientProtos.GetResponse response = getStub().get(controller, request);
855             if (response == null) return null;
856             return ProtobufUtil.toResult(response.getResult());
857           } catch (ServiceException se) {
858             throw ProtobufUtil.getRemoteException(se);
859           }
860         }
861       };
862       return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
863     }
864 
865     // Call that takes into account the replica
866     RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
867       rpcControllerFactory, tableName, this.connection, get, pool,
868       connConfiguration.getRetriesNumber(),
869       operationTimeout,
870       connConfiguration.getPrimaryCallTimeoutMicroSecond());
871     return callable.call();
872   }
873 
874 
875   /**
876    * {@inheritDoc}
877    */
878   @Override
879   public Result[] get(List<Get> gets) throws IOException {
880     if (gets.size() == 1) {
881       return new Result[]{get(gets.get(0))};
882     }
883     try {
884       Object [] r1 = batch((List)gets);
885 
886       // translate.
887       Result [] results = new Result[r1.length];
888       int i=0;
889       for (Object o : r1) {
890         // batch ensures if there is a failure we get an exception instead
891         results[i++] = (Result) o;
892       }
893 
894       return results;
895     } catch (InterruptedException e) {
896       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
897     }
898   }
899 
900   /**
901    * {@inheritDoc}
902    */
903   @Override
904   public void batch(final List<? extends Row> actions, final Object[] results)
905       throws InterruptedException, IOException {
906     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results);
907     ars.waitUntilDone();
908     if (ars.hasError()) {
909       throw ars.getErrors();
910     }
911   }
912 
913   /**
914    * {@inheritDoc}
915    * @deprecated If any exception is thrown by one of the actions, there is no way to
916    * retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
917    */
918   @Deprecated
919   @Override
920   public Object[] batch(final List<? extends Row> actions)
921      throws InterruptedException, IOException {
922     Object[] results = new Object[actions.size()];
923     batch(actions, results);
924     return results;
925   }
926 
927   /**
928    * {@inheritDoc}
929    */
930   @Override
931   public <R> void batchCallback(
932       final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
933       throws IOException, InterruptedException {
934     connection.processBatchCallback(actions, tableName, pool, results, callback);
935   }
936 
937   /**
938    * {@inheritDoc}
939    * @deprecated If any exception is thrown by one of the actions, there is no way to
940    * retrieve the partially executed results. Use
941    * {@link #batchCallback(List, Object[], org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
942    * instead.
943    */
944   @Deprecated
945   @Override
946   public <R> Object[] batchCallback(
947     final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException,
948       InterruptedException {
949     Object[] results = new Object[actions.size()];
950     batchCallback(actions, results, callback);
951     return results;
952   }
953 
954   /**
955    * {@inheritDoc}
956    */
957   @Override
958   public void delete(final Delete delete)
959   throws IOException {
960     RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
961         tableName, delete.getRow()) {
962       @Override
963       public Boolean call(int callTimeout) throws IOException {
964         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
965         controller.setPriority(tableName);
966         controller.setCallTimeout(callTimeout);
967 
968         try {
969           MutateRequest request = RequestConverter.buildMutateRequest(
970             getLocation().getRegionInfo().getRegionName(), delete);
971           MutateResponse response = getStub().mutate(controller, request);
972           return Boolean.valueOf(response.getProcessed());
973         } catch (ServiceException se) {
974           throw ProtobufUtil.getRemoteException(se);
975         }
976       }
977     };
978     rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
979   }
980 
981   /**
982    * {@inheritDoc}
983    */
984   @Override
985   public void delete(final List<Delete> deletes)
986   throws IOException {
987     Object[] results = new Object[deletes.size()];
988     try {
989       batch(deletes, results);
990     } catch (InterruptedException e) {
991       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
992     } finally {
993       // mutate list so that it is empty for complete success, or contains only failed records
994       // results are returned in the same order as the requests in list
995       // walk the list backwards, so we can remove from list without impacting the indexes of earlier members
996       for (int i = results.length - 1; i>=0; i--) {
997         // if result is not null, it succeeded
998         if (results[i] instanceof Result) {
999           deletes.remove(i);
1000         }
1001       }
1002     }
1003   }
1004 
1005   /**
1006    * {@inheritDoc}
1007    * @throws IOException
1008    */
1009   @Override
1010   public void put(final Put put) throws IOException {
1011     getBufferedMutator().mutate(put);
1012     if (autoFlush) {
1013       flushCommits();
1014     }
1015   }
1016 
1017   /**
1018    * {@inheritDoc}
1019    * @throws IOException
1020    */
1021   @Override
1022   public void put(final List<Put> puts) throws IOException {
1023     getBufferedMutator().mutate(puts);
1024     if (autoFlush) {
1025       flushCommits();
1026     }
1027   }
1028 
1029   /**
1030    * {@inheritDoc}
1031    */
1032   @Override
1033   public void mutateRow(final RowMutations rm) throws IOException {
1034     RegionServerCallable<Void> callable =
1035         new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
1036       @Override
1037       public Void call(int callTimeout) throws IOException {
1038         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1039         controller.setPriority(tableName);
1040         controller.setCallTimeout(callTimeout);
1041         try {
1042           RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
1043             getLocation().getRegionInfo().getRegionName(), rm);
1044           regionMutationBuilder.setAtomic(true);
1045           MultiRequest request =
1046             MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
1047           ClientProtos.MultiResponse response = getStub().multi(controller, request);
1048           ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
1049           if (res.hasException()) {
1050             Throwable ex = ProtobufUtil.toException(res.getException());
1051             if(ex instanceof IOException) {
1052               throw (IOException)ex;
1053             }
1054             throw new IOException("Failed to mutate row: "+Bytes.toStringBinary(rm.getRow()), ex);
1055           }
1056         } catch (ServiceException se) {
1057           throw ProtobufUtil.getRemoteException(se);
1058         }
1059         return null;
1060       }
1061     };
1062     rpcCallerFactory.<Void> newCaller().callWithRetries(callable, this.operationTimeout);
1063   }
1064 
1065   /**
1066    * {@inheritDoc}
1067    */
1068   @Override
1069   public Result append(final Append append) throws IOException {
1070     if (append.numFamilies() == 0) {
1071       throw new IOException(
1072           "Invalid arguments to append, no columns specified");
1073     }
1074 
1075     NonceGenerator ng = this.connection.getNonceGenerator();
1076     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1077     RegionServerCallable<Result> callable =
1078       new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
1079         @Override
1080         public Result call(int callTimeout) throws IOException {
1081           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1082           controller.setPriority(getTableName());
1083           controller.setCallTimeout(callTimeout);
1084           try {
1085             MutateRequest request = RequestConverter.buildMutateRequest(
1086               getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
1087             MutateResponse response = getStub().mutate(controller, request);
1088             if (!response.hasResult()) return null;
1089             return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1090           } catch (ServiceException se) {
1091             throw ProtobufUtil.getRemoteException(se);
1092           }
1093         }
1094       };
1095     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1096   }
1097 
1098   /**
1099    * {@inheritDoc}
1100    */
1101   @Override
1102   public Result increment(final Increment increment) throws IOException {
1103     if (!increment.hasFamilies()) {
1104       throw new IOException(
1105           "Invalid arguments to increment, no columns specified");
1106     }
1107     NonceGenerator ng = this.connection.getNonceGenerator();
1108     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1109     RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
1110         getName(), increment.getRow()) {
1111       @Override
1112       public Result call(int callTimeout) throws IOException {
1113         PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1114         controller.setPriority(getTableName());
1115         controller.setCallTimeout(callTimeout);
1116         try {
1117           MutateRequest request = RequestConverter.buildMutateRequest(
1118             getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
1119           MutateResponse response = getStub().mutate(controller, request);
1120           return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1121         } catch (ServiceException se) {
1122           throw ProtobufUtil.getRemoteException(se);
1123         }
1124       }
1125     };
1126     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1127   }
1128 
1129   /**
1130    * {@inheritDoc}
1131    */
1132   @Override
1133   public long incrementColumnValue(final byte [] row, final byte [] family,
1134       final byte [] qualifier, final long amount)
1135   throws IOException {
1136     return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
1137   }
1138 
1139   /**
1140    * @deprecated As of release 0.96
1141    *             (<a href="https://issues.apache.org/jira/browse/HBASE-9508">HBASE-9508</a>).
1142    *             This will be removed in HBase 2.0.0.
1143    *             Use {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}.
1144    */
1145   @Deprecated
1146   @Override
1147   public long incrementColumnValue(final byte [] row, final byte [] family,
1148       final byte [] qualifier, final long amount, final boolean writeToWAL)
1149   throws IOException {
1150     return incrementColumnValue(row, family, qualifier, amount,
1151       writeToWAL? Durability.SYNC_WAL: Durability.SKIP_WAL);
1152   }
1153 
1154   /**
1155    * {@inheritDoc}
1156    */
1157   @Override
1158   public long incrementColumnValue(final byte [] row, final byte [] family,
1159       final byte [] qualifier, final long amount, final Durability durability)
1160   throws IOException {
1161     NullPointerException npe = null;
1162     if (row == null) {
1163       npe = new NullPointerException("row is null");
1164     } else if (family == null) {
1165       npe = new NullPointerException("family is null");
1166     } else if (qualifier == null) {
1167       npe = new NullPointerException("qualifier is null");
1168     }
1169     if (npe != null) {
1170       throw new IOException(
1171           "Invalid arguments to incrementColumnValue", npe);
1172     }
1173 
1174     NonceGenerator ng = this.connection.getNonceGenerator();
1175     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1176     RegionServerCallable<Long> callable =
1177       new RegionServerCallable<Long>(connection, getName(), row) {
1178         @Override
1179         public Long call(int callTimeout) throws IOException {
1180           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1181           controller.setPriority(getTableName());
1182           controller.setCallTimeout(callTimeout);
1183           try {
1184             MutateRequest request = RequestConverter.buildIncrementRequest(
1185               getLocation().getRegionInfo().getRegionName(), row, family,
1186               qualifier, amount, durability, nonceGroup, nonce);
1187             MutateResponse response = getStub().mutate(controller, request);
1188             Result result =
1189               ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1190             return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
1191           } catch (ServiceException se) {
1192             throw ProtobufUtil.getRemoteException(se);
1193           }
1194         }
1195       };
1196     return rpcCallerFactory.<Long> newCaller().callWithRetries(callable, this.operationTimeout);
1197   }
1198 
1199   /**
1200    * {@inheritDoc}
1201    */
1202   @Override
1203   public boolean checkAndPut(final byte [] row,
1204       final byte [] family, final byte [] qualifier, final byte [] value,
1205       final Put put)
1206   throws IOException {
1207     RegionServerCallable<Boolean> callable =
1208       new RegionServerCallable<Boolean>(connection, getName(), row) {
1209         @Override
1210         public Boolean call(int callTimeout) throws IOException {
1211           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1212           controller.setPriority(tableName);
1213           controller.setCallTimeout(callTimeout);
1214           try {
1215             MutateRequest request = RequestConverter.buildMutateRequest(
1216                 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1217                 new BinaryComparator(value), CompareType.EQUAL, put);
1218             MutateResponse response = getStub().mutate(controller, request);
1219             return Boolean.valueOf(response.getProcessed());
1220           } catch (ServiceException se) {
1221             throw ProtobufUtil.getRemoteException(se);
1222           }
1223         }
1224       };
1225     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1226   }
1227 
1228   /**
1229    * {@inheritDoc}
1230    */
1231   @Override
1232   public boolean checkAndPut(final byte [] row, final byte [] family,
1233       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1234       final Put put)
1235   throws IOException {
1236     RegionServerCallable<Boolean> callable =
1237       new RegionServerCallable<Boolean>(connection, getName(), row) {
1238         @Override
1239         public Boolean call(int callTimeout) throws IOException {
1240           PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
1241           controller.setPriority(tableName);
1242           controller.setCallTimeout(callTimeout);
1243           try {
1244             CompareType compareType = CompareType.valueOf(compareOp.name());
1245             MutateRequest request = RequestConverter.buildMutateRequest(
1246               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1247                 new BinaryComparator(value), compareType, put);
1248             MutateResponse response = getStub().mutate(controller, request);
1249             return Boolean.valueOf(response.getProcessed());
1250           } catch (ServiceException se) {
1251             throw ProtobufUtil.getRemoteException(se);
1252           }
1253         }
1254       };
1255     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1256   }
1257 
1258   /**
1259    * {@inheritDoc}
1260    */
1261   @Override
1262   public boolean checkAndDelete(final byte [] row,
1263       final byte [] family, final byte [] qualifier, final byte [] value,
1264       final Delete delete)
1265   throws IOException {
1266     RegionServerCallable<Boolean> callable =
1267       new RegionServerCallable<Boolean>(connection, getName(), row) {
1268         @Override
1269         public Boolean call(int callTimeout) throws IOException {
1270           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1271           controller.setPriority(tableName);
1272           controller.setCallTimeout(callTimeout);
1273           try {
1274             MutateRequest request = RequestConverter.buildMutateRequest(
1275               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1276                 new BinaryComparator(value), CompareType.EQUAL, delete);
1277             MutateResponse response = getStub().mutate(controller, request);
1278             return Boolean.valueOf(response.getProcessed());
1279           } catch (ServiceException se) {
1280             throw ProtobufUtil.getRemoteException(se);
1281           }
1282         }
1283       };
1284     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1285   }
1286 
1287   /**
1288    * {@inheritDoc}
1289    */
1290   @Override
1291   public boolean checkAndDelete(final byte [] row, final byte [] family,
1292       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1293       final Delete delete)
1294   throws IOException {
1295     RegionServerCallable<Boolean> callable =
1296       new RegionServerCallable<Boolean>(connection, getName(), row) {
1297         @Override
1298         public Boolean call(int callTimeout) throws IOException {
1299           PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1300           controller.setPriority(tableName);
1301           controller.setCallTimeout(callTimeout);
1302           try {
1303             CompareType compareType = CompareType.valueOf(compareOp.name());
1304             MutateRequest request = RequestConverter.buildMutateRequest(
1305               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1306                 new BinaryComparator(value), compareType, delete);
1307             MutateResponse response = getStub().mutate(controller, request);
1308             return Boolean.valueOf(response.getProcessed());
1309           } catch (ServiceException se) {
1310             throw ProtobufUtil.getRemoteException(se);
1311           }
1312         }
1313       };
1314     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1315   }
1316 
1317   /**
1318    * {@inheritDoc}
1319    */
1320   @Override
1321   public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
1322       final CompareOp compareOp, final byte [] value, final RowMutations rm)
1323   throws IOException {
1324     RegionServerCallable<Boolean> callable =
1325         new RegionServerCallable<Boolean>(connection, getName(), row) {
1326           @Override
1327           public Boolean call(int callTimeout) throws IOException {
1328             PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1329             controller.setPriority(tableName);
1330             controller.setCallTimeout(callTimeout);
1331             try {
1332               CompareType compareType = CompareType.valueOf(compareOp.name());
1333               MultiRequest request = RequestConverter.buildMutateRequest(
1334                   getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1335                   new BinaryComparator(value), compareType, rm);
1336               ClientProtos.MultiResponse response = getStub().multi(controller, request);
1337               ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
1338               if (res.hasException()) {
1339                 Throwable ex = ProtobufUtil.toException(res.getException());
1340                 if(ex instanceof IOException) {
1341                   throw (IOException)ex;
1342                 }
1343                 throw new IOException("Failed to checkAndMutate row: "+
1344                     Bytes.toStringBinary(rm.getRow()), ex);
1345               }
1346               return Boolean.valueOf(response.getProcessed());
1347             } catch (ServiceException se) {
1348               throw ProtobufUtil.getRemoteException(se);
1349             }
1350           }
1351         };
1352     return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1353   }
1354 
1355   /**
1356    * {@inheritDoc}
1357    */
1358   @Override
1359   public boolean exists(final Get get) throws IOException {
1360     Result r = get(get, true);
1361     assert r.getExists() != null;
1362     return r.getExists();
1363   }
1364 
1365   /**
1366    * {@inheritDoc}
1367    */
1368   @Override
1369   public boolean[] existsAll(final List<Get> gets) throws IOException {
1370     if (gets.isEmpty()) return new boolean[]{};
1371     if (gets.size() == 1) return new boolean[]{exists(gets.get(0))};
1372 
1373     ArrayList<Get> exists = new ArrayList<Get>(gets.size());
1374     for (Get g: gets){
1375       Get ge = new Get(g);
1376       ge.setCheckExistenceOnly(true);
1377       exists.add(ge);
1378     }
1379 
1380     Object[] r1;
1381     try {
1382       r1 = batch(exists);
1383     } catch (InterruptedException e) {
1384       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1385     }
1386 
1387     // translate.
1388     boolean[] results = new boolean[r1.length];
1389     int i = 0;
1390     for (Object o : r1) {
1391       // batch ensures if there is a failure we get an exception instead
1392       results[i++] = ((Result)o).getExists();
1393     }
1394 
1395     return results;
1396   }
1397 
1398   /**
1399    * {@inheritDoc}
1400    */
1401   @Override
1402   @Deprecated
1403   public Boolean[] exists(final List<Get> gets) throws IOException {
1404     boolean[] results = existsAll(gets);
1405     Boolean[] objectResults = new Boolean[results.length];
1406     for (int i = 0; i < results.length; ++i) {
1407       objectResults[i] = results[i];
1408     }
1409     return objectResults;
1410   }
1411 
1412   /**
1413    * {@inheritDoc}
1414    * @throws IOException
1415    */
1416   @Override
1417   public void flushCommits() throws IOException {
1418     if (mutator == null) {
1419       // nothing to flush if there's no mutator; don't bother creating one.
1420       return;
1421     }
1422     getBufferedMutator().flush();
1423   }
1424 
1425   /**
1426    * Process a mixed batch of Get, Put and Delete actions. All actions for a
1427    * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
1428    *
1429    * @param list The collection of actions.
1430    * @param results An empty array, same size as list. If an exception is thrown,
1431    * you can test here for partial results, and to determine which actions
1432    * processed successfully.
1433    * @throws IOException if there are problems talking to META. Per-item
1434    * exceptions are stored in the results array.
1435    */
1436   public <R> void processBatchCallback(
1437     final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
1438     throws IOException, InterruptedException {
1439     this.batchCallback(list, results, callback);
1440   }
1441 
1442 
1443   /**
1444    * Parameterized batch processing, allowing varying return types for different
1445    * {@link Row} implementations.
1446    */
1447   public void processBatch(final List<? extends Row> list, final Object[] results)
1448     throws IOException, InterruptedException {
1449     this.batch(list, results);
1450   }
1451 
1452 
1453   @Override
1454   public void close() throws IOException {
1455     if (this.closed) {
1456       return;
1457     }
1458     flushCommits();
1459     if (cleanupPoolOnClose) {
1460       this.pool.shutdown();
1461       try {
1462         boolean terminated = false;
1463         do {
1464           // wait until the pool has terminated
1465           terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
1466         } while (!terminated);
1467       } catch (InterruptedException e) {
1468         this.pool.shutdownNow();
1469         LOG.warn("waitForTermination interrupted");
1470       }
1471     }
1472     if (cleanupConnectionOnClose) {
1473       if (this.connection != null) {
1474         this.connection.close();
1475       }
1476     }
1477     this.closed = true;
1478   }
1479 
1480   // validate for well-formedness
1481   public void validatePut(final Put put) throws IllegalArgumentException {
1482     validatePut(put, connConfiguration.getMaxKeyValueSize());
1483   }
1484 
1485   // validate for well-formedness
1486   public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
1487     if (put.isEmpty()) {
1488       throw new IllegalArgumentException("No columns to insert");
1489     }
1490     if (maxKeyValueSize > 0) {
1491       for (List<Cell> list : put.getFamilyCellMap().values()) {
1492         for (Cell cell : list) {
1493           if (KeyValueUtil.length(cell) > maxKeyValueSize) {
1494             throw new IllegalArgumentException("KeyValue size too large");
1495           }
1496         }
1497       }
1498     }
1499   }
1500 
1501   /**
1502    * {@inheritDoc}
1503    */
1504   @Override
1505   public boolean isAutoFlush() {
1506     return autoFlush;
1507   }
1508 
1509   /**
1510    * {@inheritDoc}
1511    */
1512   @Deprecated
1513   @Override
1514   public void setAutoFlush(boolean autoFlush) {
1515     this.autoFlush = autoFlush;
1516   }
1517 
1518   /**
1519    * {@inheritDoc}
1520    */
1521   @Override
1522   public void setAutoFlushTo(boolean autoFlush) {
1523     this.autoFlush = autoFlush;
1524   }
1525 
1526   /**
1527    * {@inheritDoc}
1528    */
1529   @Override
1530   public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
1531     this.autoFlush = autoFlush;
1532   }
1533 
1534   /**
1535    * Returns the maximum size in bytes of the write buffer for this HTable.
1536    * <p>
1537    * The default value comes from the configuration parameter
1538    * {@code hbase.client.write.buffer}.
1539    * @return The size of the write buffer in bytes.
1540    */
1541   @Override
1542   public long getWriteBufferSize() {
1543     if (mutator == null) {
1544       return connConfiguration.getWriteBufferSize();
1545     } else {
1546       return mutator.getWriteBufferSize();
1547     }
1548   }
1549 
1550   /**
1551    * Sets the size of the buffer in bytes.
1552    * <p>
1553    * If the new size is less than the current amount of data in the
1554    * write buffer, the buffer gets flushed.
1555    * @param writeBufferSize The new write buffer size, in bytes.
1556    * @throws IOException if a remote or network exception occurs.
1557    */
1558   @Override
1559   public void setWriteBufferSize(long writeBufferSize) throws IOException {
1560     getBufferedMutator();
1561     mutator.setWriteBufferSize(writeBufferSize);
1562   }
1563 
1564   /**
1565    * The pool is used for mutli requests for this HTable
1566    * @return the pool used for mutli
1567    */
1568   ExecutorService getPool() {
1569     return this.pool;
1570   }
1571 
1572   /**
1573    * Enable or disable region cache prefetch for the table. It will be
1574    * applied for the given table's all HTable instances who share the same
1575    * connection. By default, the cache prefetch is enabled.
1576    * @param tableName name of table to configure.
1577    * @param enable Set to true to enable region cache prefetch. Or set to
1578    * false to disable it.
1579    * @throws IOException
1580    * @deprecated does nothing since 0.99
1581    */
1582   @Deprecated
1583   public static void setRegionCachePrefetch(final byte[] tableName,
1584       final boolean enable)  throws IOException {
1585   }
1586 
1587   /**
1588    * @deprecated does nothing since 0.99
1589    */
1590   @Deprecated
1591   public static void setRegionCachePrefetch(
1592       final TableName tableName,
1593       final boolean enable) throws IOException {
1594   }
1595 
1596   /**
1597    * Enable or disable region cache prefetch for the table. It will be
1598    * applied for the given table's all HTable instances who share the same
1599    * connection. By default, the cache prefetch is enabled.
1600    * @param conf The Configuration object to use.
1601    * @param tableName name of table to configure.
1602    * @param enable Set to true to enable region cache prefetch. Or set to
1603    * false to disable it.
1604    * @throws IOException
1605    * @deprecated does nothing since 0.99
1606    */
1607   @Deprecated
1608   public static void setRegionCachePrefetch(final Configuration conf,
1609       final byte[] tableName, final boolean enable) throws IOException {
1610   }
1611 
1612   /**
1613    * @deprecated does nothing since 0.99
1614    */
1615   @Deprecated
1616   public static void setRegionCachePrefetch(final Configuration conf,
1617       final TableName tableName,
1618       final boolean enable) throws IOException {
1619   }
1620 
1621   /**
1622    * Check whether region cache prefetch is enabled or not for the table.
1623    * @param conf The Configuration object to use.
1624    * @param tableName name of table to check
1625    * @return true if table's region cache prefecth is enabled. Otherwise
1626    * it is disabled.
1627    * @throws IOException
1628    * @deprecated always return false since 0.99
1629    */
1630   @Deprecated
1631   public static boolean getRegionCachePrefetch(final Configuration conf,
1632       final byte[] tableName) throws IOException {
1633     return false;
1634   }
1635 
1636   /**
1637    * @deprecated always return false since 0.99
1638    */
1639   @Deprecated
1640   public static boolean getRegionCachePrefetch(final Configuration conf,
1641       final TableName tableName) throws IOException {
1642     return false;
1643   }
1644 
1645   /**
1646    * Check whether region cache prefetch is enabled or not for the table.
1647    * @param tableName name of table to check
1648    * @return true if table's region cache prefecth is enabled. Otherwise
1649    * it is disabled.
1650    * @throws IOException
1651    * @deprecated always return false since 0.99
1652    */
1653   @Deprecated
1654   public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException {
1655     return false;
1656   }
1657 
1658   /**
1659    * @deprecated always return false since 0.99
1660    */
1661   @Deprecated
1662   public static boolean getRegionCachePrefetch(
1663       final TableName tableName) throws IOException {
1664     return false;
1665   }
1666 
1667   /**
1668    * Explicitly clears the region cache to fetch the latest value from META.
1669    * This is a power user function: avoid unless you know the ramifications.
1670    */
1671   public void clearRegionCache() {
1672     this.connection.clearRegionCache();
1673   }
1674 
1675   /**
1676    * {@inheritDoc}
1677    */
1678   @Override
1679   public CoprocessorRpcChannel coprocessorService(byte[] row) {
1680     return new RegionCoprocessorRpcChannel(connection, tableName, row);
1681   }
1682 
1683   /**
1684    * {@inheritDoc}
1685    */
1686   @Override
1687   public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1688       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1689       throws ServiceException, Throwable {
1690     final Map<byte[],R> results =  Collections.synchronizedMap(
1691         new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1692     coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1693       @Override
1694       public void update(byte[] region, byte[] row, R value) {
1695         if (region != null) {
1696           results.put(region, value);
1697         }
1698       }
1699     });
1700     return results;
1701   }
1702 
1703   /**
1704    * {@inheritDoc}
1705    */
1706   @Override
1707   public <T extends Service, R> void coprocessorService(final Class<T> service,
1708       byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1709       final Batch.Callback<R> callback) throws ServiceException, Throwable {
1710 
1711     // get regions covered by the row range
1712     List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1713 
1714     Map<byte[],Future<R>> futures =
1715         new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
1716     for (final byte[] r : keys) {
1717       final RegionCoprocessorRpcChannel channel =
1718           new RegionCoprocessorRpcChannel(connection, tableName, r);
1719       Future<R> future = pool.submit(
1720           new Callable<R>() {
1721             @Override
1722             public R call() throws Exception {
1723               T instance = ProtobufUtil.newServiceStub(service, channel);
1724               R result = callable.call(instance);
1725               byte[] region = channel.getLastRegion();
1726               if (callback != null) {
1727                 callback.update(region, r, result);
1728               }
1729               return result;
1730             }
1731           });
1732       futures.put(r, future);
1733     }
1734     for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1735       try {
1736         e.getValue().get();
1737       } catch (ExecutionException ee) {
1738         LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1739             + Bytes.toStringBinary(e.getKey()), ee);
1740         throw ee.getCause();
1741       } catch (InterruptedException ie) {
1742         throw new InterruptedIOException("Interrupted calling coprocessor service " + service.getName()
1743             + " for row " + Bytes.toStringBinary(e.getKey()))
1744             .initCause(ie);
1745       }
1746     }
1747   }
1748 
1749   private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1750   throws IOException {
1751     if (start == null) {
1752       start = HConstants.EMPTY_START_ROW;
1753     }
1754     if (end == null) {
1755       end = HConstants.EMPTY_END_ROW;
1756     }
1757     return getKeysAndRegionsInRange(start, end, true).getFirst();
1758   }
1759 
1760   public void setOperationTimeout(int operationTimeout) {
1761     this.operationTimeout = operationTimeout;
1762   }
1763 
1764   public int getOperationTimeout() {
1765     return operationTimeout;
1766   }
1767 
1768   @Override
1769   public String toString() {
1770     return tableName + ";" + connection;
1771   }
1772 
1773   /**
1774    * {@inheritDoc}
1775    */
1776   @Override
1777   public <R extends Message> Map<byte[], R> batchCoprocessorService(
1778       Descriptors.MethodDescriptor methodDescriptor, Message request,
1779       byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1780     final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1781         Bytes.BYTES_COMPARATOR));
1782     batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1783         new Callback<R>() {
1784 
1785           @Override
1786           public void update(byte[] region, byte[] row, R result) {
1787             if (region != null) {
1788               results.put(region, result);
1789             }
1790           }
1791         });
1792     return results;
1793   }
1794 
1795   /**
1796    * {@inheritDoc}
1797    */
1798   @Override
1799   public <R extends Message> void batchCoprocessorService(
1800       final Descriptors.MethodDescriptor methodDescriptor, final Message request,
1801       byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
1802       throws ServiceException, Throwable {
1803 
1804     if (startKey == null) {
1805       startKey = HConstants.EMPTY_START_ROW;
1806     }
1807     if (endKey == null) {
1808       endKey = HConstants.EMPTY_END_ROW;
1809     }
1810     // get regions covered by the row range
1811     Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
1812         getKeysAndRegionsInRange(startKey, endKey, true);
1813     List<byte[]> keys = keysAndRegions.getFirst();
1814     List<HRegionLocation> regions = keysAndRegions.getSecond();
1815 
1816     // check if we have any calls to make
1817     if (keys.isEmpty()) {
1818       LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
1819           ", end=" + Bytes.toStringBinary(endKey));
1820       return;
1821     }
1822 
1823     List<RegionCoprocessorServiceExec> execs = new ArrayList<RegionCoprocessorServiceExec>();
1824     final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
1825         new TreeMap<byte[], RegionCoprocessorServiceExec>(Bytes.BYTES_COMPARATOR);
1826     for (int i = 0; i < keys.size(); i++) {
1827       final byte[] rowKey = keys.get(i);
1828       final byte[] region = regions.get(i).getRegionInfo().getRegionName();
1829       RegionCoprocessorServiceExec exec =
1830           new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
1831       execs.add(exec);
1832       execsByRow.put(rowKey, exec);
1833     }
1834 
1835     // tracking for any possible deserialization errors on success callback
1836     // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
1837     final List<Throwable> callbackErrorExceptions = new ArrayList<Throwable>();
1838     final List<Row> callbackErrorActions = new ArrayList<Row>();
1839     final List<String> callbackErrorServers = new ArrayList<String>();
1840     Object[] results = new Object[execs.size()];
1841 
1842     AsyncProcess asyncProcess =
1843         new AsyncProcess(connection, configuration, pool,
1844             RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
1845             true, RpcControllerFactory.instantiate(configuration));
1846 
1847     AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
1848         new Callback<ClientProtos.CoprocessorServiceResult>() {
1849           @Override
1850           public void update(byte[] region, byte[] row,
1851                               ClientProtos.CoprocessorServiceResult serviceResult) {
1852             if (LOG.isTraceEnabled()) {
1853               LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
1854                   ": region=" + Bytes.toStringBinary(region) +
1855                   ", row=" + Bytes.toStringBinary(row) +
1856                   ", value=" + serviceResult.getValue().getValue());
1857             }
1858             try {
1859               Message.Builder builder = responsePrototype.newBuilderForType();
1860               ProtobufUtil.mergeFrom(builder, serviceResult.getValue().getValue());
1861               callback.update(region, row, (R) builder.build());
1862             } catch (IOException e) {
1863               LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
1864                   e);
1865               callbackErrorExceptions.add(e);
1866               callbackErrorActions.add(execsByRow.get(row));
1867               callbackErrorServers.add("null");
1868             }
1869           }
1870         }, results);
1871 
1872     future.waitUntilDone();
1873 
1874     if (future.hasError()) {
1875       throw future.getErrors();
1876     } else if (!callbackErrorExceptions.isEmpty()) {
1877       throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
1878           callbackErrorServers);
1879     }
1880   }
1881 
1882   public RegionLocator getRegionLocator() {
1883     return this.locator;
1884   }
1885 
1886   @VisibleForTesting
1887   BufferedMutator getBufferedMutator() throws IOException {
1888     if (mutator == null) {
1889       this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
1890           new BufferedMutatorParams(tableName)
1891               .pool(pool)
1892               .writeBufferSize(connConfiguration.getWriteBufferSize())
1893               .maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
1894       );
1895     }
1896     return mutator;
1897   }
1898 }