View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.util.Collection;
24  import java.util.List;
25  import java.util.Map;
26  
27  import org.apache.hadoop.classification.InterfaceAudience;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.HBaseConfiguration;
30  import org.apache.hadoop.hbase.HTableDescriptor;
31  import org.apache.hadoop.hbase.TableName;
32  import org.apache.hadoop.hbase.client.coprocessor.Batch;
33  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
34  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
35  import org.apache.hadoop.hbase.util.Bytes;
36  import org.apache.hadoop.hbase.util.PoolMap;
37  import org.apache.hadoop.hbase.util.PoolMap.PoolType;
38  
39  import com.google.protobuf.Descriptors;
40  import com.google.protobuf.Message;
41  import com.google.protobuf.Service;
42  import com.google.protobuf.ServiceException;
43  
44  /**
45   * A simple pool of HTable instances.
46   *
47   * Each HTablePool acts as a pool for all tables. To use, instantiate an
48   * HTablePool and use {@link #getTable(String)} to get an HTable from the pool.
49   *
50     * This method is not needed anymore, clients should call
51     * HTableInterface.close() rather than returning the tables to the pool
52     *
53   * Once you are done with it, close your instance of {@link HTableInterface}
54   * by calling {@link HTableInterface#close()} rather than returning the tables
55   * to the pool with (deprecated) {@link #putTable(HTableInterface)}.
56   *
57   * <p>
58   * A pool can be created with a <i>maxSize</i> which defines the most HTable
59   * references that will ever be retained for each table. Otherwise the default
60   * is {@link Integer#MAX_VALUE}.
61   *
62   * <p>
63   * Pool will manage its own connections to the cluster. See
64   * {@link HConnectionManager}.
65   * @deprecated as of 0.98.1. See {@link HConnection#getTable(String)}.
66   */
67  @InterfaceAudience.Private
68  @Deprecated
69  public class HTablePool implements Closeable {
70    private final PoolMap<String, HTableInterface> tables;
71    private final int maxSize;
72    private final PoolType poolType;
73    private final Configuration config;
74    private final HTableInterfaceFactory tableFactory;
75  
76    /**
77     * Default Constructor. Default HBaseConfiguration and no limit on pool size.
78     */
79    public HTablePool() {
80      this(HBaseConfiguration.create(), Integer.MAX_VALUE);
81    }
82  
83    /**
84     * Constructor to set maximum versions and use the specified configuration.
85     *
86     * @param config
87     *          configuration
88     * @param maxSize
89     *          maximum number of references to keep for each table
90     */
91    public HTablePool(final Configuration config, final int maxSize) {
92      this(config, maxSize, null, null);
93    }
94  
95    /**
96     * Constructor to set maximum versions and use the specified configuration and
97     * table factory.
98     *
99     * @param config
100    *          configuration
101    * @param maxSize
102    *          maximum number of references to keep for each table
103    * @param tableFactory
104    *          table factory
105    */
106   public HTablePool(final Configuration config, final int maxSize,
107       final HTableInterfaceFactory tableFactory) {
108     this(config, maxSize, tableFactory, PoolType.Reusable);
109   }
110 
111   /**
112    * Constructor to set maximum versions and use the specified configuration and
113    * pool type.
114    *
115    * @param config
116    *          configuration
117    * @param maxSize
118    *          maximum number of references to keep for each table
119    * @param poolType
120    *          pool type which is one of {@link PoolType#Reusable} or
121    *          {@link PoolType#ThreadLocal}
122    */
123   public HTablePool(final Configuration config, final int maxSize,
124       final PoolType poolType) {
125     this(config, maxSize, null, poolType);
126   }
127 
128   /**
129    * Constructor to set maximum versions and use the specified configuration,
130    * table factory and pool type. The HTablePool supports the
131    * {@link PoolType#Reusable} and {@link PoolType#ThreadLocal}. If the pool
132    * type is null or not one of those two values, then it will default to
133    * {@link PoolType#Reusable}.
134    *
135    * @param config
136    *          configuration
137    * @param maxSize
138    *          maximum number of references to keep for each table
139    * @param tableFactory
140    *          table factory
141    * @param poolType
142    *          pool type which is one of {@link PoolType#Reusable} or
143    *          {@link PoolType#ThreadLocal}
144    */
145   public HTablePool(final Configuration config, final int maxSize,
146       final HTableInterfaceFactory tableFactory, PoolType poolType) {
147     // Make a new configuration instance so I can safely cleanup when
148     // done with the pool.
149     this.config = config == null ? HBaseConfiguration.create() : config;
150     this.maxSize = maxSize;
151     this.tableFactory = tableFactory == null ? new HTableFactory()
152         : tableFactory;
153     if (poolType == null) {
154       this.poolType = PoolType.Reusable;
155     } else {
156       switch (poolType) {
157       case Reusable:
158       case ThreadLocal:
159         this.poolType = poolType;
160         break;
161       default:
162         this.poolType = PoolType.Reusable;
163         break;
164       }
165     }
166     this.tables = new PoolMap<String, HTableInterface>(this.poolType,
167         this.maxSize);
168   }
169 
170   /**
171    * Get a reference to the specified table from the pool.
172    * <p>
173    * <p/>
174    *
175    * @param tableName
176    *          table name
177    * @return a reference to the specified table
178    * @throws RuntimeException
179    *           if there is a problem instantiating the HTable
180    */
181   public HTableInterface getTable(String tableName) {
182     // call the old getTable implementation renamed to findOrCreateTable
183     HTableInterface table = findOrCreateTable(tableName);
184     // return a proxy table so when user closes the proxy, the actual table
185     // will be returned to the pool
186     return new PooledHTable(table);
187   }
188 
189   /**
190    * Get a reference to the specified table from the pool.
191    * <p>
192    *
193    * Create a new one if one is not available.
194    *
195    * @param tableName
196    *          table name
197    * @return a reference to the specified table
198    * @throws RuntimeException
199    *           if there is a problem instantiating the HTable
200    */
201   private HTableInterface findOrCreateTable(String tableName) {
202     HTableInterface table = tables.get(tableName);
203     if (table == null) {
204       table = createHTable(tableName);
205     }
206     return table;
207   }
208 
209   /**
210    * Get a reference to the specified table from the pool.
211    * <p>
212    *
213    * Create a new one if one is not available.
214    *
215    * @param tableName
216    *          table name
217    * @return a reference to the specified table
218    * @throws RuntimeException
219    *           if there is a problem instantiating the HTable
220    */
221   public HTableInterface getTable(byte[] tableName) {
222     return getTable(Bytes.toString(tableName));
223   }
224 
225   /**
226    * This method is not needed anymore, clients should call
227    * HTableInterface.close() rather than returning the tables to the pool
228    *
229    * @param table
230    *          the proxy table user got from pool
231    * @deprecated
232    */
233   public void putTable(HTableInterface table) throws IOException {
234     // we need to be sure nobody puts a proxy implementation in the pool
235     // but if the client code is not updated
236     // and it will continue to call putTable() instead of calling close()
237     // then we need to return the wrapped table to the pool instead of the
238     // proxy
239     // table
240     if (table instanceof PooledHTable) {
241       returnTable(((PooledHTable) table).getWrappedTable());
242     } else {
243       // normally this should not happen if clients pass back the same
244       // table
245       // object they got from the pool
246       // but if it happens then it's better to reject it
247       throw new IllegalArgumentException("not a pooled table: " + table);
248     }
249   }
250 
251   /**
252    * Puts the specified HTable back into the pool.
253    * <p>
254    *
255    * If the pool already contains <i>maxSize</i> references to the table, then
256    * the table instance gets closed after flushing buffered edits.
257    *
258    * @param table
259    *          table
260    */
261   private void returnTable(HTableInterface table) throws IOException {
262     // this is the old putTable method renamed and made private
263     String tableName = Bytes.toString(table.getTableName());
264     if (tables.size(tableName) >= maxSize) {
265       // release table instance since we're not reusing it
266       this.tables.removeValue(tableName, table);
267       this.tableFactory.releaseHTableInterface(table);
268       return;
269     }
270     tables.put(tableName, table);
271   }
272 
273   protected HTableInterface createHTable(String tableName) {
274     return this.tableFactory.createHTableInterface(config,
275         Bytes.toBytes(tableName));
276   }
277 
278   /**
279    * Closes all the HTable instances , belonging to the given table, in the
280    * table pool.
281    * <p>
282    * Note: this is a 'shutdown' of the given table pool and different from
283    * {@link #putTable(HTableInterface)}, that is used to return the table
284    * instance to the pool for future re-use.
285    *
286    * @param tableName
287    */
288   public void closeTablePool(final String tableName) throws IOException {
289     Collection<HTableInterface> tables = this.tables.values(tableName);
290     if (tables != null) {
291       for (HTableInterface table : tables) {
292         this.tableFactory.releaseHTableInterface(table);
293       }
294     }
295     this.tables.remove(tableName);
296   }
297 
298   /**
299    * See {@link #closeTablePool(String)}.
300    *
301    * @param tableName
302    */
303   public void closeTablePool(final byte[] tableName) throws IOException {
304     closeTablePool(Bytes.toString(tableName));
305   }
306 
307   /**
308    * Closes all the HTable instances , belonging to all tables in the table
309    * pool.
310    * <p>
311    * Note: this is a 'shutdown' of all the table pools.
312    */
313   public void close() throws IOException {
314     for (String tableName : tables.keySet()) {
315       closeTablePool(tableName);
316     }
317     this.tables.clear();
318   }
319 
320   public int getCurrentPoolSize(String tableName) {
321     return tables.size(tableName);
322   }
323 
324   /**
325    * A proxy class that implements HTableInterface.close method to return the
326    * wrapped table back to the table pool
327    *
328    */
329   class PooledHTable implements HTableInterface {
330 
331     private boolean open = false;
332 
333     private HTableInterface table; // actual table implementation
334 
335     public PooledHTable(HTableInterface table) {
336       this.table = table;
337       this.open = true;
338     }
339 
340     @Override
341     public byte[] getTableName() {
342       checkState();
343       return table.getTableName();
344     }
345 
346     @Override
347     public TableName getName() {
348       return table.getName();
349     }
350 
351     @Override
352     public Configuration getConfiguration() {
353       checkState();
354       return table.getConfiguration();
355     }
356 
357     @Override
358     public HTableDescriptor getTableDescriptor() throws IOException {
359       checkState();
360       return table.getTableDescriptor();
361     }
362 
363     @Override
364     public boolean exists(Get get) throws IOException {
365       checkState();
366       return table.exists(get);
367     }
368 
369     @Override
370     public Boolean[] exists(List<Get> gets) throws IOException {
371       checkState();
372       return table.exists(gets);
373     }
374 
375     @Override
376     public void batch(List<? extends Row> actions, Object[] results) throws IOException,
377         InterruptedException {
378       checkState();
379       table.batch(actions, results);
380     }
381 
382     /**
383      * {@inheritDoc}
384      * @deprecated If any exception is thrown by one of the actions, there is no way to
385      * retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
386      */
387     @Override
388     public Object[] batch(List<? extends Row> actions) throws IOException,
389         InterruptedException {
390       checkState();
391       return table.batch(actions);
392     }
393 
394     @Override
395     public Result get(Get get) throws IOException {
396       checkState();
397       return table.get(get);
398     }
399 
400     @Override
401     public Result[] get(List<Get> gets) throws IOException {
402       checkState();
403       return table.get(gets);
404     }
405 
406     @Override
407     @SuppressWarnings("deprecation")
408     public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
409       checkState();
410       return table.getRowOrBefore(row, family);
411     }
412 
413     @Override
414     public ResultScanner getScanner(Scan scan) throws IOException {
415       checkState();
416       return table.getScanner(scan);
417     }
418 
419     @Override
420     public ResultScanner getScanner(byte[] family) throws IOException {
421       checkState();
422       return table.getScanner(family);
423     }
424 
425     @Override
426     public ResultScanner getScanner(byte[] family, byte[] qualifier)
427         throws IOException {
428       checkState();
429       return table.getScanner(family, qualifier);
430     }
431 
432     @Override
433     public void put(Put put) throws IOException {
434       checkState();
435       table.put(put);
436     }
437 
438     @Override
439     public void put(List<Put> puts) throws IOException {
440       checkState();
441       table.put(puts);
442     }
443 
444     @Override
445     public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
446         byte[] value, Put put) throws IOException {
447       checkState();
448       return table.checkAndPut(row, family, qualifier, value, put);
449     }
450 
451     @Override
452     public void delete(Delete delete) throws IOException {
453       checkState();
454       table.delete(delete);
455     }
456 
457     @Override
458     public void delete(List<Delete> deletes) throws IOException {
459       checkState();
460       table.delete(deletes);
461     }
462 
463     @Override
464     public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
465         byte[] value, Delete delete) throws IOException {
466       checkState();
467       return table.checkAndDelete(row, family, qualifier, value, delete);
468     }
469 
470     @Override
471     public Result increment(Increment increment) throws IOException {
472       checkState();
473       return table.increment(increment);
474     }
475 
476     @Override
477     public long incrementColumnValue(byte[] row, byte[] family,
478         byte[] qualifier, long amount) throws IOException {
479       checkState();
480       return table.incrementColumnValue(row, family, qualifier, amount);
481     }
482 
483     @Override
484     public long incrementColumnValue(byte[] row, byte[] family,
485         byte[] qualifier, long amount, Durability durability) throws IOException {
486       checkState();
487       return table.incrementColumnValue(row, family, qualifier, amount,
488           durability);
489     }
490 
491     @Override
492     public boolean isAutoFlush() {
493       checkState();
494       return table.isAutoFlush();
495     }
496 
497     @Override
498     public void flushCommits() throws IOException {
499       checkState();
500       table.flushCommits();
501     }
502 
503     /**
504      * Returns the actual table back to the pool
505      *
506      * @throws IOException
507      */
508     public void close() throws IOException {
509       checkState();
510       open = false;
511       returnTable(table);
512     }
513 
514     @Override
515     public CoprocessorRpcChannel coprocessorService(byte[] row) {
516       checkState();
517       return table.coprocessorService(row);
518     }
519 
520     @Override
521     public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
522         byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
523         throws ServiceException, Throwable {
524       checkState();
525       return table.coprocessorService(service, startKey, endKey, callable);
526     }
527 
528     @Override
529     public <T extends Service, R> void coprocessorService(Class<T> service,
530         byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Callback<R> callback)
531         throws ServiceException, Throwable {
532       checkState();
533       table.coprocessorService(service, startKey, endKey, callable, callback);
534     }
535 
536     @Override
537     public String toString() {
538       return "PooledHTable{" + ", table=" + table + '}';
539     }
540 
541     /**
542      * Expose the wrapped HTable to tests in the same package
543      *
544      * @return wrapped htable
545      */
546     HTableInterface getWrappedTable() {
547       return table;
548     }
549 
550     @Override
551     public <R> void batchCallback(List<? extends Row> actions,
552         Object[] results, Callback<R> callback) throws IOException,
553         InterruptedException {
554       checkState();
555       table.batchCallback(actions, results, callback);
556     }
557 
558     /**
559      * {@inheritDoc}
560      * @deprecated If any exception is thrown by one of the actions, there is no way to
561      * retrieve the partially executed results. Use
562      * {@link #batchCallback(List, Object[], org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
563      * instead.
564      */
565     @Override
566     public <R> Object[] batchCallback(List<? extends Row> actions,
567         Callback<R> callback) throws IOException, InterruptedException {
568       checkState();
569       return table.batchCallback(actions,  callback);
570     }
571 
572     @Override
573     public void mutateRow(RowMutations rm) throws IOException {
574       checkState();
575       table.mutateRow(rm);
576     }
577 
578     @Override
579     public Result append(Append append) throws IOException {
580       checkState();
581       return table.append(append);
582     }
583 
584     @Override
585     public void setAutoFlush(boolean autoFlush) {
586       checkState();
587       table.setAutoFlush(autoFlush, autoFlush);
588     }
589 
590     @Override
591     public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
592       checkState();
593       table.setAutoFlush(autoFlush, clearBufferOnFail);
594     }
595 
596     @Override
597     public void setAutoFlushTo(boolean autoFlush) {
598       table.setAutoFlushTo(autoFlush);
599     }
600 
601     @Override
602     public long getWriteBufferSize() {
603       checkState();
604       return table.getWriteBufferSize();
605     }
606 
607     @Override
608     public void setWriteBufferSize(long writeBufferSize) throws IOException {
609       checkState();
610       table.setWriteBufferSize(writeBufferSize);
611     }
612 
613     boolean isOpen() {
614       return open;
615     }
616 
617     private void checkState() {
618       if (!isOpen()) {
619         throw new IllegalStateException("Table=" + new String(table.getTableName()) + " already closed");
620       }
621     }
622 
623     @Override
624     public long incrementColumnValue(byte[] row, byte[] family,
625         byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
626       return table.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
627     }
628 
629     @Override
630     public <R extends Message> Map<byte[], R> batchCoprocessorService(
631         Descriptors.MethodDescriptor method, Message request,
632         byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
633       checkState();
634       return table.batchCoprocessorService(method, request, startKey, endKey,
635           responsePrototype);
636     }
637 
638     @Override
639     public <R extends Message> void batchCoprocessorService(
640         Descriptors.MethodDescriptor method, Message request,
641         byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
642         throws ServiceException, Throwable {
643       checkState();
644       table.batchCoprocessorService(method, request, startKey, endKey, responsePrototype, callback);
645     }
646   }
647 }