1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.util.Collection;
24 import java.util.List;
25 import java.util.Map;
26
27 import org.apache.hadoop.classification.InterfaceAudience;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.HBaseConfiguration;
30 import org.apache.hadoop.hbase.HTableDescriptor;
31 import org.apache.hadoop.hbase.TableName;
32 import org.apache.hadoop.hbase.client.coprocessor.Batch;
33 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
34 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
35 import org.apache.hadoop.hbase.util.Bytes;
36 import org.apache.hadoop.hbase.util.PoolMap;
37 import org.apache.hadoop.hbase.util.PoolMap.PoolType;
38
39 import com.google.protobuf.Descriptors;
40 import com.google.protobuf.Message;
41 import com.google.protobuf.Service;
42 import com.google.protobuf.ServiceException;
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67 @InterfaceAudience.Private
68 @Deprecated
69 public class HTablePool implements Closeable {
70 private final PoolMap<String, HTableInterface> tables;
71 private final int maxSize;
72 private final PoolType poolType;
73 private final Configuration config;
74 private final HTableInterfaceFactory tableFactory;
75
76
77
78
79 public HTablePool() {
80 this(HBaseConfiguration.create(), Integer.MAX_VALUE);
81 }
82
83
84
85
86
87
88
89
90
91 public HTablePool(final Configuration config, final int maxSize) {
92 this(config, maxSize, null, null);
93 }
94
95
96
97
98
99
100
101
102
103
104
105
106 public HTablePool(final Configuration config, final int maxSize,
107 final HTableInterfaceFactory tableFactory) {
108 this(config, maxSize, tableFactory, PoolType.Reusable);
109 }
110
111
112
113
114
115
116
117
118
119
120
121
122
123 public HTablePool(final Configuration config, final int maxSize,
124 final PoolType poolType) {
125 this(config, maxSize, null, poolType);
126 }
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145 public HTablePool(final Configuration config, final int maxSize,
146 final HTableInterfaceFactory tableFactory, PoolType poolType) {
147
148
149 this.config = config == null ? HBaseConfiguration.create() : config;
150 this.maxSize = maxSize;
151 this.tableFactory = tableFactory == null ? new HTableFactory()
152 : tableFactory;
153 if (poolType == null) {
154 this.poolType = PoolType.Reusable;
155 } else {
156 switch (poolType) {
157 case Reusable:
158 case ThreadLocal:
159 this.poolType = poolType;
160 break;
161 default:
162 this.poolType = PoolType.Reusable;
163 break;
164 }
165 }
166 this.tables = new PoolMap<String, HTableInterface>(this.poolType,
167 this.maxSize);
168 }
169
170
171
172
173
174
175
176
177
178
179
180
181 public HTableInterface getTable(String tableName) {
182
183 HTableInterface table = findOrCreateTable(tableName);
184
185
186 return new PooledHTable(table);
187 }
188
189
190
191
192
193
194
195
196
197
198
199
200
201 private HTableInterface findOrCreateTable(String tableName) {
202 HTableInterface table = tables.get(tableName);
203 if (table == null) {
204 table = createHTable(tableName);
205 }
206 return table;
207 }
208
209
210
211
212
213
214
215
216
217
218
219
220
221 public HTableInterface getTable(byte[] tableName) {
222 return getTable(Bytes.toString(tableName));
223 }
224
225
226
227
228
229
230
231
232
233 public void putTable(HTableInterface table) throws IOException {
234
235
236
237
238
239
240 if (table instanceof PooledHTable) {
241 returnTable(((PooledHTable) table).getWrappedTable());
242 } else {
243
244
245
246
247 throw new IllegalArgumentException("not a pooled table: " + table);
248 }
249 }
250
251
252
253
254
255
256
257
258
259
260
261 private void returnTable(HTableInterface table) throws IOException {
262
263 String tableName = Bytes.toString(table.getTableName());
264 if (tables.size(tableName) >= maxSize) {
265
266 this.tables.removeValue(tableName, table);
267 this.tableFactory.releaseHTableInterface(table);
268 return;
269 }
270 tables.put(tableName, table);
271 }
272
273 protected HTableInterface createHTable(String tableName) {
274 return this.tableFactory.createHTableInterface(config,
275 Bytes.toBytes(tableName));
276 }
277
278
279
280
281
282
283
284
285
286
287
288 public void closeTablePool(final String tableName) throws IOException {
289 Collection<HTableInterface> tables = this.tables.values(tableName);
290 if (tables != null) {
291 for (HTableInterface table : tables) {
292 this.tableFactory.releaseHTableInterface(table);
293 }
294 }
295 this.tables.remove(tableName);
296 }
297
298
299
300
301
302
303 public void closeTablePool(final byte[] tableName) throws IOException {
304 closeTablePool(Bytes.toString(tableName));
305 }
306
307
308
309
310
311
312
313 public void close() throws IOException {
314 for (String tableName : tables.keySet()) {
315 closeTablePool(tableName);
316 }
317 this.tables.clear();
318 }
319
320 public int getCurrentPoolSize(String tableName) {
321 return tables.size(tableName);
322 }
323
324
325
326
327
328
329 class PooledHTable implements HTableInterface {
330
331 private boolean open = false;
332
333 private HTableInterface table;
334
335 public PooledHTable(HTableInterface table) {
336 this.table = table;
337 this.open = true;
338 }
339
340 @Override
341 public byte[] getTableName() {
342 checkState();
343 return table.getTableName();
344 }
345
346 @Override
347 public TableName getName() {
348 return table.getName();
349 }
350
351 @Override
352 public Configuration getConfiguration() {
353 checkState();
354 return table.getConfiguration();
355 }
356
357 @Override
358 public HTableDescriptor getTableDescriptor() throws IOException {
359 checkState();
360 return table.getTableDescriptor();
361 }
362
363 @Override
364 public boolean exists(Get get) throws IOException {
365 checkState();
366 return table.exists(get);
367 }
368
369 @Override
370 public Boolean[] exists(List<Get> gets) throws IOException {
371 checkState();
372 return table.exists(gets);
373 }
374
375 @Override
376 public void batch(List<? extends Row> actions, Object[] results) throws IOException,
377 InterruptedException {
378 checkState();
379 table.batch(actions, results);
380 }
381
382
383
384
385
386
387 @Override
388 public Object[] batch(List<? extends Row> actions) throws IOException,
389 InterruptedException {
390 checkState();
391 return table.batch(actions);
392 }
393
394 @Override
395 public Result get(Get get) throws IOException {
396 checkState();
397 return table.get(get);
398 }
399
400 @Override
401 public Result[] get(List<Get> gets) throws IOException {
402 checkState();
403 return table.get(gets);
404 }
405
406 @Override
407 @SuppressWarnings("deprecation")
408 public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
409 checkState();
410 return table.getRowOrBefore(row, family);
411 }
412
413 @Override
414 public ResultScanner getScanner(Scan scan) throws IOException {
415 checkState();
416 return table.getScanner(scan);
417 }
418
419 @Override
420 public ResultScanner getScanner(byte[] family) throws IOException {
421 checkState();
422 return table.getScanner(family);
423 }
424
425 @Override
426 public ResultScanner getScanner(byte[] family, byte[] qualifier)
427 throws IOException {
428 checkState();
429 return table.getScanner(family, qualifier);
430 }
431
432 @Override
433 public void put(Put put) throws IOException {
434 checkState();
435 table.put(put);
436 }
437
438 @Override
439 public void put(List<Put> puts) throws IOException {
440 checkState();
441 table.put(puts);
442 }
443
444 @Override
445 public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
446 byte[] value, Put put) throws IOException {
447 checkState();
448 return table.checkAndPut(row, family, qualifier, value, put);
449 }
450
451 @Override
452 public void delete(Delete delete) throws IOException {
453 checkState();
454 table.delete(delete);
455 }
456
457 @Override
458 public void delete(List<Delete> deletes) throws IOException {
459 checkState();
460 table.delete(deletes);
461 }
462
463 @Override
464 public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
465 byte[] value, Delete delete) throws IOException {
466 checkState();
467 return table.checkAndDelete(row, family, qualifier, value, delete);
468 }
469
470 @Override
471 public Result increment(Increment increment) throws IOException {
472 checkState();
473 return table.increment(increment);
474 }
475
476 @Override
477 public long incrementColumnValue(byte[] row, byte[] family,
478 byte[] qualifier, long amount) throws IOException {
479 checkState();
480 return table.incrementColumnValue(row, family, qualifier, amount);
481 }
482
483 @Override
484 public long incrementColumnValue(byte[] row, byte[] family,
485 byte[] qualifier, long amount, Durability durability) throws IOException {
486 checkState();
487 return table.incrementColumnValue(row, family, qualifier, amount,
488 durability);
489 }
490
491 @Override
492 public boolean isAutoFlush() {
493 checkState();
494 return table.isAutoFlush();
495 }
496
497 @Override
498 public void flushCommits() throws IOException {
499 checkState();
500 table.flushCommits();
501 }
502
503
504
505
506
507
508 public void close() throws IOException {
509 checkState();
510 open = false;
511 returnTable(table);
512 }
513
514 @Override
515 public CoprocessorRpcChannel coprocessorService(byte[] row) {
516 checkState();
517 return table.coprocessorService(row);
518 }
519
520 @Override
521 public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
522 byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
523 throws ServiceException, Throwable {
524 checkState();
525 return table.coprocessorService(service, startKey, endKey, callable);
526 }
527
528 @Override
529 public <T extends Service, R> void coprocessorService(Class<T> service,
530 byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Callback<R> callback)
531 throws ServiceException, Throwable {
532 checkState();
533 table.coprocessorService(service, startKey, endKey, callable, callback);
534 }
535
536 @Override
537 public String toString() {
538 return "PooledHTable{" + ", table=" + table + '}';
539 }
540
541
542
543
544
545
546 HTableInterface getWrappedTable() {
547 return table;
548 }
549
550 @Override
551 public <R> void batchCallback(List<? extends Row> actions,
552 Object[] results, Callback<R> callback) throws IOException,
553 InterruptedException {
554 checkState();
555 table.batchCallback(actions, results, callback);
556 }
557
558
559
560
561
562
563
564
565 @Override
566 public <R> Object[] batchCallback(List<? extends Row> actions,
567 Callback<R> callback) throws IOException, InterruptedException {
568 checkState();
569 return table.batchCallback(actions, callback);
570 }
571
572 @Override
573 public void mutateRow(RowMutations rm) throws IOException {
574 checkState();
575 table.mutateRow(rm);
576 }
577
578 @Override
579 public Result append(Append append) throws IOException {
580 checkState();
581 return table.append(append);
582 }
583
584 @Override
585 public void setAutoFlush(boolean autoFlush) {
586 checkState();
587 table.setAutoFlush(autoFlush, autoFlush);
588 }
589
590 @Override
591 public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
592 checkState();
593 table.setAutoFlush(autoFlush, clearBufferOnFail);
594 }
595
596 @Override
597 public void setAutoFlushTo(boolean autoFlush) {
598 table.setAutoFlushTo(autoFlush);
599 }
600
601 @Override
602 public long getWriteBufferSize() {
603 checkState();
604 return table.getWriteBufferSize();
605 }
606
607 @Override
608 public void setWriteBufferSize(long writeBufferSize) throws IOException {
609 checkState();
610 table.setWriteBufferSize(writeBufferSize);
611 }
612
613 boolean isOpen() {
614 return open;
615 }
616
617 private void checkState() {
618 if (!isOpen()) {
619 throw new IllegalStateException("Table=" + new String(table.getTableName()) + " already closed");
620 }
621 }
622
623 @Override
624 public long incrementColumnValue(byte[] row, byte[] family,
625 byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
626 return table.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
627 }
628
629 @Override
630 public <R extends Message> Map<byte[], R> batchCoprocessorService(
631 Descriptors.MethodDescriptor method, Message request,
632 byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
633 checkState();
634 return table.batchCoprocessorService(method, request, startKey, endKey,
635 responsePrototype);
636 }
637
638 @Override
639 public <R extends Message> void batchCoprocessorService(
640 Descriptors.MethodDescriptor method, Message request,
641 byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
642 throws ServiceException, Throwable {
643 checkState();
644 table.batchCoprocessorService(method, request, startKey, endKey, responsePrototype, callback);
645 }
646 }
647 }