1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.LinkedList;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.NavigableMap;
30 import java.util.TreeMap;
31 import java.util.concurrent.Callable;
32 import java.util.concurrent.ExecutionException;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.Future;
35 import java.util.concurrent.SynchronousQueue;
36 import java.util.concurrent.ThreadPoolExecutor;
37 import java.util.concurrent.TimeUnit;
38
39 import com.google.protobuf.InvalidProtocolBufferException;
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42 import org.apache.hadoop.classification.InterfaceAudience;
43 import org.apache.hadoop.classification.InterfaceStability;
44 import org.apache.hadoop.conf.Configuration;
45 import org.apache.hadoop.hbase.Cell;
46 import org.apache.hadoop.hbase.DoNotRetryIOException;
47 import org.apache.hadoop.hbase.HBaseConfiguration;
48 import org.apache.hadoop.hbase.HConstants;
49 import org.apache.hadoop.hbase.HRegionInfo;
50 import org.apache.hadoop.hbase.HRegionLocation;
51 import org.apache.hadoop.hbase.HTableDescriptor;
52 import org.apache.hadoop.hbase.KeyValue;
53 import org.apache.hadoop.hbase.KeyValueUtil;
54 import org.apache.hadoop.hbase.ServerName;
55 import org.apache.hadoop.hbase.TableName;
56 import org.apache.hadoop.hbase.client.coprocessor.Batch;
57 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
58 import org.apache.hadoop.hbase.filter.BinaryComparator;
59 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
60 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
61 import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
62 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
63 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
64 import org.apache.hadoop.hbase.protobuf.RequestConverter;
65 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
66 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
67 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
68 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
69 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
70 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
71 import org.apache.hadoop.hbase.util.Bytes;
72 import org.apache.hadoop.hbase.util.Pair;
73 import org.apache.hadoop.hbase.util.Threads;
74
75 import com.google.protobuf.Descriptors;
76 import com.google.protobuf.GeneratedMessage;
77 import com.google.protobuf.Message;
78 import com.google.protobuf.Service;
79 import com.google.protobuf.ServiceException;
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122 @InterfaceAudience.Public
123 @InterfaceStability.Stable
124 public class HTable implements HTableInterface {
125 private static final Log LOG = LogFactory.getLog(HTable.class);
126 protected HConnection connection;
127 private final TableName tableName;
128 private volatile Configuration configuration;
129 protected List<Row> writeAsyncBuffer = new LinkedList<Row>();
130 private long writeBufferSize;
131 private boolean clearBufferOnFail;
132 private boolean autoFlush;
133 protected long currentWriteBufferSize;
134 protected int scannerCaching;
135 private int maxKeyValueSize;
136 private ExecutorService pool;
137 private boolean closed;
138 private int operationTimeout;
139 private final boolean cleanupPoolOnClose;
140 private final boolean cleanupConnectionOnClose;
141
142
143 protected AsyncProcess<Object> ap;
144 private RpcRetryingCallerFactory rpcCallerFactory;
145 private RpcControllerFactory rpcControllerFactory;
146
147
148
149
150
151
152
153
154
155
156
157 public HTable(Configuration conf, final String tableName)
158 throws IOException {
159 this(conf, TableName.valueOf(tableName));
160 }
161
162
163
164
165
166
167
168
169
170
171
172 public HTable(Configuration conf, final byte[] tableName)
173 throws IOException {
174 this(conf, TableName.valueOf(tableName));
175 }
176
177
178
179
180
181
182
183
184
185
186
187
188
189 public HTable(Configuration conf, final TableName tableName)
190 throws IOException {
191 this.tableName = tableName;
192 this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;
193 if (conf == null) {
194 this.connection = null;
195 return;
196 }
197 this.connection = HConnectionManager.getConnection(conf);
198 this.configuration = conf;
199
200 this.pool = getDefaultExecutor(conf);
201 this.finishSetup();
202 }
203
204
205
206
207
208
209
210
211
212 public HTable(TableName tableName, HConnection connection) throws IOException {
213 this.tableName = tableName;
214 this.cleanupPoolOnClose = true;
215 this.cleanupConnectionOnClose = false;
216 this.connection = connection;
217 this.configuration = connection.getConfiguration();
218
219 this.pool = getDefaultExecutor(this.configuration);
220 this.finishSetup();
221 }
222
223 public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
224 int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
225 if (maxThreads == 0) {
226 maxThreads = 1;
227 }
228 long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
229
230
231
232
233
234 ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
235 new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
236 ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
237 return pool;
238 }
239
240
241
242
243
244
245
246
247
248
249
250
251
252 public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool)
253 throws IOException {
254 this(conf, TableName.valueOf(tableName), pool);
255 }
256
257
258
259
260
261
262
263
264
265
266
267
268
269 public HTable(Configuration conf, final TableName tableName, final ExecutorService pool)
270 throws IOException {
271 this.connection = HConnectionManager.getConnection(conf);
272 this.configuration = conf;
273 this.pool = pool;
274 this.tableName = tableName;
275 this.cleanupPoolOnClose = false;
276 this.cleanupConnectionOnClose = true;
277
278 this.finishSetup();
279 }
280
281
282
283
284
285
286
287
288
289
290
291
292 public HTable(final byte[] tableName, final HConnection connection,
293 final ExecutorService pool) throws IOException {
294 this(TableName.valueOf(tableName), connection, pool);
295 }
296
297
298
299
300
301
302
303
304
305
306
307
308 public HTable(TableName tableName, final HConnection connection,
309 final ExecutorService pool) throws IOException {
310 if (connection == null || connection.isClosed()) {
311 throw new IllegalArgumentException("Connection is null or closed.");
312 }
313 this.tableName = tableName;
314 this.cleanupPoolOnClose = this.cleanupConnectionOnClose = false;
315 this.connection = connection;
316 this.configuration = connection.getConfiguration();
317 this.pool = pool;
318
319 this.finishSetup();
320 }
321
322
323
324
325 protected HTable(){
326 tableName = null;
327 cleanupPoolOnClose = false;
328 cleanupConnectionOnClose = false;
329 }
330
331
332
333
334 private void finishSetup() throws IOException {
335 this.operationTimeout = tableName.isSystemTable() ?
336 this.configuration.getInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT,
337 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT):
338 this.configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
339 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
340 this.writeBufferSize = this.configuration.getLong(
341 "hbase.client.write.buffer", 2097152);
342 this.clearBufferOnFail = true;
343 this.autoFlush = true;
344 this.currentWriteBufferSize = 0;
345 this.scannerCaching = this.configuration.getInt(
346 HConstants.HBASE_CLIENT_SCANNER_CACHING,
347 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
348
349 this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration);
350 this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
351 ap = new AsyncProcess<Object>(connection, tableName, pool, null,
352 configuration, rpcCallerFactory, rpcControllerFactory);
353
354 this.maxKeyValueSize = this.configuration.getInt(
355 "hbase.client.keyvalue.maxsize", -1);
356 this.closed = false;
357 }
358
359
360
361
362
363
364 @Override
365 public Configuration getConfiguration() {
366 return configuration;
367 }
368
369
370
371
372
373
374
375
376
377
378 @Deprecated
379 public static boolean isTableEnabled(String tableName) throws IOException {
380 return isTableEnabled(TableName.valueOf(tableName));
381 }
382
383
384
385
386
387
388
389
390
391
392 @Deprecated
393 public static boolean isTableEnabled(byte[] tableName) throws IOException {
394 return isTableEnabled(TableName.valueOf(tableName));
395 }
396
397
398
399
400
401
402
403
404
405
406 @Deprecated
407 public static boolean isTableEnabled(TableName tableName) throws IOException {
408 return isTableEnabled(HBaseConfiguration.create(), tableName);
409 }
410
411
412
413
414
415
416
417
418
419 @Deprecated
420 public static boolean isTableEnabled(Configuration conf, String tableName)
421 throws IOException {
422 return isTableEnabled(conf, TableName.valueOf(tableName));
423 }
424
425
426
427
428
429
430
431
432
433 @Deprecated
434 public static boolean isTableEnabled(Configuration conf, byte[] tableName)
435 throws IOException {
436 return isTableEnabled(conf, TableName.valueOf(tableName));
437 }
438
439
440
441
442
443
444
445
446
447 @Deprecated
448 public static boolean isTableEnabled(Configuration conf,
449 final TableName tableName) throws IOException {
450 return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
451 @Override
452 public Boolean connect(HConnection connection) throws IOException {
453 return connection.isTableEnabled(tableName);
454 }
455 });
456 }
457
458
459
460
461
462
463
464 public HRegionLocation getRegionLocation(final String row)
465 throws IOException {
466 return connection.getRegionLocation(tableName, Bytes.toBytes(row), false);
467 }
468
469
470
471
472
473
474
475 public HRegionLocation getRegionLocation(final byte [] row)
476 throws IOException {
477 return connection.getRegionLocation(tableName, row, false);
478 }
479
480
481
482
483
484
485
486
487 public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
488 throws IOException {
489 return connection.getRegionLocation(tableName, row, reload);
490 }
491
492
493
494
495 @Override
496 public byte [] getTableName() {
497 return this.tableName.getName();
498 }
499
500 @Override
501 public TableName getName() {
502 return tableName;
503 }
504
505
506
507
508
509
510
511
512 @Deprecated
513 public HConnection getConnection() {
514 return this.connection;
515 }
516
517
518
519
520
521
522
523 @Deprecated
524 public int getScannerCaching() {
525 return scannerCaching;
526 }
527
528
529
530
531
532 @Deprecated
533 public List<Row> getWriteBuffer() {
534 return writeAsyncBuffer;
535 }
536
537
538
539
540
541
542
543
544
545
546
547
548 @Deprecated
549 public void setScannerCaching(int scannerCaching) {
550 this.scannerCaching = scannerCaching;
551 }
552
553
554
555
556 @Override
557 public HTableDescriptor getTableDescriptor() throws IOException {
558 return new UnmodifyableHTableDescriptor(
559 this.connection.getHTableDescriptor(this.tableName));
560 }
561
562
563
564
565
566
567
568
569 public byte [][] getStartKeys() throws IOException {
570 return getStartEndKeys().getFirst();
571 }
572
573
574
575
576
577
578
579
580 public byte[][] getEndKeys() throws IOException {
581 return getStartEndKeys().getSecond();
582 }
583
584
585
586
587
588
589
590
591
592 public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
593 NavigableMap<HRegionInfo, ServerName> regions = getRegionLocations();
594 final List<byte[]> startKeyList = new ArrayList<byte[]>(regions.size());
595 final List<byte[]> endKeyList = new ArrayList<byte[]>(regions.size());
596
597 for (HRegionInfo region : regions.keySet()) {
598 startKeyList.add(region.getStartKey());
599 endKeyList.add(region.getEndKey());
600 }
601
602 return new Pair<byte [][], byte [][]>(
603 startKeyList.toArray(new byte[startKeyList.size()][]),
604 endKeyList.toArray(new byte[endKeyList.size()][]));
605 }
606
607
608
609
610
611
612
613
614 public NavigableMap<HRegionInfo, ServerName> getRegionLocations() throws IOException {
615
616 return MetaScanner.allTableRegions(getConfiguration(), this.connection, getName(), false);
617 }
618
619
620
621
622
623
624
625
626
627
628 public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
629 final byte [] endKey) throws IOException {
630 return getRegionsInRange(startKey, endKey, false);
631 }
632
633
634
635
636
637
638
639
640
641
642
643 public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
644 final byte [] endKey, final boolean reload) throws IOException {
645 return getKeysAndRegionsInRange(startKey, endKey, false, reload).getSecond();
646 }
647
648
649
650
651
652
653
654
655
656
657
658
659 private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
660 final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
661 throws IOException {
662 return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
663 }
664
665
666
667
668
669
670
671
672
673
674
675
676
677 private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
678 final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
679 final boolean reload) throws IOException {
680 final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
681 if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
682 throw new IllegalArgumentException(
683 "Invalid range: " + Bytes.toStringBinary(startKey) +
684 " > " + Bytes.toStringBinary(endKey));
685 }
686 List<byte[]> keysInRange = new ArrayList<byte[]>();
687 List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
688 byte[] currentKey = startKey;
689 do {
690 HRegionLocation regionLocation = getRegionLocation(currentKey, reload);
691 keysInRange.add(currentKey);
692 regionsInRange.add(regionLocation);
693 currentKey = regionLocation.getRegionInfo().getEndKey();
694 } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
695 && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
696 || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
697 return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
698 regionsInRange);
699 }
700
701
702
703
704 @Override
705 public Result getRowOrBefore(final byte[] row, final byte[] family)
706 throws IOException {
707 RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
708 tableName, row) {
709 public Result call() throws IOException {
710 return ProtobufUtil.getRowOrBefore(getStub(), getLocation().getRegionInfo()
711 .getRegionName(), row, family, rpcControllerFactory.newController());
712 }
713 };
714 return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
715 }
716
717
718
719
720 @Override
721 public ResultScanner getScanner(final Scan scan) throws IOException {
722 if (scan.getCaching() <= 0) {
723 scan.setCaching(getScannerCaching());
724 }
725
726 if (scan.isReversed()) {
727 if (scan.isSmall()) {
728 return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
729 this.connection);
730 } else {
731 return new ReversedClientScanner(getConfiguration(), scan, getName(), this.connection);
732 }
733 }
734
735 if (scan.isSmall()) {
736 return new ClientSmallScanner(getConfiguration(), scan, getName(), this.connection);
737 } else {
738 return new ClientScanner(getConfiguration(), scan, getName(), this.connection);
739 }
740 }
741
742
743
744
745 @Override
746 public ResultScanner getScanner(byte [] family) throws IOException {
747 Scan scan = new Scan();
748 scan.addFamily(family);
749 return getScanner(scan);
750 }
751
752
753
754
755 @Override
756 public ResultScanner getScanner(byte [] family, byte [] qualifier)
757 throws IOException {
758 Scan scan = new Scan();
759 scan.addColumn(family, qualifier);
760 return getScanner(scan);
761 }
762
763
764
765
766 @Override
767 public Result get(final Get get) throws IOException {
768
769
770
771 final PayloadCarryingRpcController controller = rpcControllerFactory.newController();
772 controller.setPriority(tableName);
773 RegionServerCallable<Result> callable =
774 new RegionServerCallable<Result>(this.connection, getName(), get.getRow()) {
775 public Result call() throws IOException {
776 return ProtobufUtil.get(getStub(), getLocation().getRegionInfo().getRegionName(), get,
777 controller);
778 }
779 };
780 return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
781 }
782
783
784
785
786 @Override
787 public Result[] get(List<Get> gets) throws IOException {
788 if (gets.size() == 1) {
789 return new Result[]{get(gets.get(0))};
790 }
791 try {
792 Object [] r1 = batch((List)gets);
793
794
795 Result [] results = new Result[r1.length];
796 int i=0;
797 for (Object o : r1) {
798
799 results[i++] = (Result) o;
800 }
801
802 return results;
803 } catch (InterruptedException e) {
804 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
805 }
806 }
807
808
809
810
811 @Override
812 public void batch(final List<?extends Row> actions, final Object[] results)
813 throws InterruptedException, IOException {
814 batchCallback(actions, results, null);
815 }
816
817
818
819
820
821
822 @Override
823 public Object[] batch(final List<? extends Row> actions)
824 throws InterruptedException, IOException {
825 return batchCallback(actions, null);
826 }
827
828
829
830
831 @Override
832 public <R> void batchCallback(
833 final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
834 throws IOException, InterruptedException {
835 connection.processBatchCallback(actions, tableName, pool, results, callback);
836 }
837
838
839
840
841
842
843
844
845 @Override
846 public <R> Object[] batchCallback(
847 final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException,
848 InterruptedException {
849 Object[] results = new Object[actions.size()];
850 batchCallback(actions, results, callback);
851 return results;
852 }
853
854
855
856
857 @Override
858 public void delete(final Delete delete)
859 throws IOException {
860 RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
861 tableName, delete.getRow()) {
862 public Boolean call() throws IOException {
863 try {
864 MutateRequest request = RequestConverter.buildMutateRequest(
865 getLocation().getRegionInfo().getRegionName(), delete);
866 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
867 controller.setPriority(tableName);
868 MutateResponse response = getStub().mutate(controller, request);
869 return Boolean.valueOf(response.getProcessed());
870 } catch (ServiceException se) {
871 throw ProtobufUtil.getRemoteException(se);
872 }
873 }
874 };
875 rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
876 }
877
878
879
880
881 @Override
882 public void delete(final List<Delete> deletes)
883 throws IOException {
884 Object[] results = new Object[deletes.size()];
885 try {
886 batch(deletes, results);
887 } catch (InterruptedException e) {
888 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
889 } finally {
890
891
892
893 for (int i = results.length - 1; i>=0; i--) {
894
895 if (results[i] instanceof Result) {
896 deletes.remove(i);
897 }
898 }
899 }
900 }
901
902
903
904
905 @Override
906 public void put(final Put put)
907 throws InterruptedIOException, RetriesExhaustedWithDetailsException {
908 doPut(put);
909 if (autoFlush) {
910 flushCommits();
911 }
912 }
913
914
915
916
917 @Override
918 public void put(final List<Put> puts)
919 throws InterruptedIOException, RetriesExhaustedWithDetailsException {
920 for (Put put : puts) {
921 doPut(put);
922 }
923 if (autoFlush) {
924 flushCommits();
925 }
926 }
927
928
929
930
931
932
933
934
935 private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
936 if (ap.hasError()){
937 writeAsyncBuffer.add(put);
938 backgroundFlushCommits(true);
939 }
940
941 validatePut(put);
942
943 currentWriteBufferSize += put.heapSize();
944 writeAsyncBuffer.add(put);
945
946 while (currentWriteBufferSize > writeBufferSize) {
947 backgroundFlushCommits(false);
948 }
949 }
950
951
952
953
954
955
956
957
958
959 private void backgroundFlushCommits(boolean synchronous) throws
960 InterruptedIOException, RetriesExhaustedWithDetailsException {
961
962 try {
963 do {
964 ap.submit(writeAsyncBuffer, true);
965 } while (synchronous && !writeAsyncBuffer.isEmpty());
966
967 if (synchronous) {
968 ap.waitUntilDone();
969 }
970
971 if (ap.hasError()) {
972 LOG.debug(tableName + ": One or more of the operations have failed -" +
973 " waiting for all operation in progress to finish (successfully or not)");
974 while (!writeAsyncBuffer.isEmpty()) {
975 ap.submit(writeAsyncBuffer, true);
976 }
977 ap.waitUntilDone();
978
979 if (!clearBufferOnFail) {
980
981
982 writeAsyncBuffer.addAll(ap.getFailedOperations());
983 }
984 RetriesExhaustedWithDetailsException e = ap.getErrors();
985 ap.clearErrors();
986 throw e;
987 }
988 } finally {
989 currentWriteBufferSize = 0;
990 for (Row mut : writeAsyncBuffer) {
991 if (mut instanceof Mutation) {
992 currentWriteBufferSize += ((Mutation) mut).heapSize();
993 }
994 }
995 }
996 }
997
998
999
1000
1001 @Override
1002 public void mutateRow(final RowMutations rm) throws IOException {
1003 RegionServerCallable<Void> callable =
1004 new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
1005 public Void call() throws IOException {
1006 try {
1007 RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
1008 getLocation().getRegionInfo().getRegionName(), rm);
1009 regionMutationBuilder.setAtomic(true);
1010 MultiRequest request =
1011 MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
1012 PayloadCarryingRpcController pcrc = rpcControllerFactory.newController();
1013 pcrc.setPriority(tableName);
1014 getStub().multi(pcrc, request);
1015 } catch (ServiceException se) {
1016 throw ProtobufUtil.getRemoteException(se);
1017 }
1018 return null;
1019 }
1020 };
1021 rpcCallerFactory.<Void> newCaller().callWithRetries(callable, this.operationTimeout);
1022 }
1023
1024
1025
1026
1027 @Override
1028 public Result append(final Append append) throws IOException {
1029 if (append.numFamilies() == 0) {
1030 throw new IOException(
1031 "Invalid arguments to append, no columns specified");
1032 }
1033
1034 NonceGenerator ng = this.connection.getNonceGenerator();
1035 final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1036 RegionServerCallable<Result> callable =
1037 new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
1038 public Result call() throws IOException {
1039 try {
1040 MutateRequest request = RequestConverter.buildMutateRequest(
1041 getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
1042 PayloadCarryingRpcController rpcController = rpcControllerFactory.newController();
1043 rpcController.setPriority(getTableName());
1044 MutateResponse response = getStub().mutate(rpcController, request);
1045 if (!response.hasResult()) return null;
1046 return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
1047 } catch (ServiceException se) {
1048 throw ProtobufUtil.getRemoteException(se);
1049 }
1050 }
1051 };
1052 return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1053 }
1054
1055
1056
1057
1058 @Override
1059 public Result increment(final Increment increment) throws IOException {
1060 if (!increment.hasFamilies()) {
1061 throw new IOException(
1062 "Invalid arguments to increment, no columns specified");
1063 }
1064 NonceGenerator ng = this.connection.getNonceGenerator();
1065 final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1066 RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
1067 getName(), increment.getRow()) {
1068 public Result call() throws IOException {
1069 try {
1070 MutateRequest request = RequestConverter.buildMutateRequest(
1071 getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
1072 PayloadCarryingRpcController rpcController = rpcControllerFactory.newController();
1073 rpcController.setPriority(getTableName());
1074 MutateResponse response = getStub().mutate(rpcController, request);
1075 return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
1076 } catch (ServiceException se) {
1077 throw ProtobufUtil.getRemoteException(se);
1078 }
1079 }
1080 };
1081 return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1082 }
1083
1084
1085
1086
1087 @Override
1088 public long incrementColumnValue(final byte [] row, final byte [] family,
1089 final byte [] qualifier, final long amount)
1090 throws IOException {
1091 return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
1092 }
1093
1094
1095
1096
1097 @Deprecated
1098 @Override
1099 public long incrementColumnValue(final byte [] row, final byte [] family,
1100 final byte [] qualifier, final long amount, final boolean writeToWAL)
1101 throws IOException {
1102 return incrementColumnValue(row, family, qualifier, amount,
1103 writeToWAL? Durability.SKIP_WAL: Durability.USE_DEFAULT);
1104 }
1105
1106
1107
1108
1109 @Override
1110 public long incrementColumnValue(final byte [] row, final byte [] family,
1111 final byte [] qualifier, final long amount, final Durability durability)
1112 throws IOException {
1113 NullPointerException npe = null;
1114 if (row == null) {
1115 npe = new NullPointerException("row is null");
1116 } else if (family == null) {
1117 npe = new NullPointerException("family is null");
1118 } else if (qualifier == null) {
1119 npe = new NullPointerException("qualifier is null");
1120 }
1121 if (npe != null) {
1122 throw new IOException(
1123 "Invalid arguments to incrementColumnValue", npe);
1124 }
1125
1126 NonceGenerator ng = this.connection.getNonceGenerator();
1127 final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1128 RegionServerCallable<Long> callable =
1129 new RegionServerCallable<Long>(connection, getName(), row) {
1130 public Long call() throws IOException {
1131 try {
1132 MutateRequest request = RequestConverter.buildIncrementRequest(
1133 getLocation().getRegionInfo().getRegionName(), row, family,
1134 qualifier, amount, durability, nonceGroup, nonce);
1135 PayloadCarryingRpcController rpcController = rpcControllerFactory.newController();
1136 rpcController.setPriority(getTableName());
1137 MutateResponse response = getStub().mutate(rpcController, request);
1138 Result result =
1139 ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
1140 return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
1141 } catch (ServiceException se) {
1142 throw ProtobufUtil.getRemoteException(se);
1143 }
1144 }
1145 };
1146 return rpcCallerFactory.<Long> newCaller().callWithRetries(callable, this.operationTimeout);
1147 }
1148
1149
1150
1151
1152 @Override
1153 public boolean checkAndPut(final byte [] row,
1154 final byte [] family, final byte [] qualifier, final byte [] value,
1155 final Put put)
1156 throws IOException {
1157 RegionServerCallable<Boolean> callable =
1158 new RegionServerCallable<Boolean>(connection, getName(), row) {
1159 public Boolean call() throws IOException {
1160 try {
1161 MutateRequest request = RequestConverter.buildMutateRequest(
1162 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1163 new BinaryComparator(value), CompareType.EQUAL, put);
1164 PayloadCarryingRpcController rpcController = rpcControllerFactory.newController();
1165 rpcController.setPriority(getTableName());
1166 MutateResponse response = getStub().mutate(rpcController, request);
1167 return Boolean.valueOf(response.getProcessed());
1168 } catch (ServiceException se) {
1169 throw ProtobufUtil.getRemoteException(se);
1170 }
1171 }
1172 };
1173 return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1174 }
1175
1176
1177
1178
1179
1180 @Override
1181 public boolean checkAndDelete(final byte [] row,
1182 final byte [] family, final byte [] qualifier, final byte [] value,
1183 final Delete delete)
1184 throws IOException {
1185 RegionServerCallable<Boolean> callable =
1186 new RegionServerCallable<Boolean>(connection, getName(), row) {
1187 public Boolean call() throws IOException {
1188 try {
1189 MutateRequest request = RequestConverter.buildMutateRequest(
1190 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1191 new BinaryComparator(value), CompareType.EQUAL, delete);
1192 PayloadCarryingRpcController rpcController = rpcControllerFactory.newController();
1193 rpcController.setPriority(getTableName());
1194 MutateResponse response = getStub().mutate(rpcController, request);
1195 return Boolean.valueOf(response.getProcessed());
1196 } catch (ServiceException se) {
1197 throw ProtobufUtil.getRemoteException(se);
1198 }
1199 }
1200 };
1201 return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1202 }
1203
1204
1205
1206
1207 @Override
1208 public boolean exists(final Get get) throws IOException {
1209 get.setCheckExistenceOnly(true);
1210 Result r = get(get);
1211 assert r.getExists() != null;
1212 return r.getExists();
1213 }
1214
1215
1216
1217
1218 @Override
1219 public Boolean[] exists(final List<Get> gets) throws IOException {
1220 if (gets.isEmpty()) return new Boolean[]{};
1221 if (gets.size() == 1) return new Boolean[]{exists(gets.get(0))};
1222
1223 for (Get g: gets){
1224 g.setCheckExistenceOnly(true);
1225 }
1226
1227 Object[] r1;
1228 try {
1229 r1 = batch(gets);
1230 } catch (InterruptedException e) {
1231 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1232 }
1233
1234
1235 Boolean[] results = new Boolean[r1.length];
1236 int i = 0;
1237 for (Object o : r1) {
1238
1239 results[i++] = ((Result)o).getExists();
1240 }
1241
1242 return results;
1243 }
1244
1245
1246
1247
1248 @Override
1249 public void flushCommits() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
1250
1251
1252 backgroundFlushCommits(true);
1253 }
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266 public <R> void processBatchCallback(
1267 final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
1268 throws IOException, InterruptedException {
1269 this.batchCallback(list, results, callback);
1270 }
1271
1272
1273
1274
1275
1276
1277 public void processBatch(final List<? extends Row> list, final Object[] results)
1278 throws IOException, InterruptedException {
1279
1280 this.processBatchCallback(list, results, null);
1281 }
1282
1283
1284 @Override
1285 public void close() throws IOException {
1286 if (this.closed) {
1287 return;
1288 }
1289 flushCommits();
1290 if (cleanupPoolOnClose) {
1291 this.pool.shutdown();
1292 }
1293 if (cleanupConnectionOnClose) {
1294 if (this.connection != null) {
1295 this.connection.close();
1296 }
1297 }
1298 this.closed = true;
1299 }
1300
1301
1302 public void validatePut(final Put put) throws IllegalArgumentException{
1303 if (put.isEmpty()) {
1304 throw new IllegalArgumentException("No columns to insert");
1305 }
1306 if (maxKeyValueSize > 0) {
1307 for (List<Cell> list : put.getFamilyCellMap().values()) {
1308 for (Cell cell : list) {
1309
1310 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
1311 if (kv.getLength() > maxKeyValueSize) {
1312 throw new IllegalArgumentException("KeyValue size too large");
1313 }
1314 }
1315 }
1316 }
1317 }
1318
1319
1320
1321
1322 @Override
1323 public boolean isAutoFlush() {
1324 return autoFlush;
1325 }
1326
1327
1328
1329
1330 @Deprecated
1331 @Override
1332 public void setAutoFlush(boolean autoFlush) {
1333 setAutoFlush(autoFlush, autoFlush);
1334 }
1335
1336
1337
1338
1339 @Override
1340 public void setAutoFlushTo(boolean autoFlush) {
1341 setAutoFlush(autoFlush, clearBufferOnFail);
1342 }
1343
1344
1345
1346
1347 @Override
1348 public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
1349 this.autoFlush = autoFlush;
1350 this.clearBufferOnFail = autoFlush || clearBufferOnFail;
1351 }
1352
1353
1354
1355
1356
1357
1358
1359
1360 @Override
1361 public long getWriteBufferSize() {
1362 return writeBufferSize;
1363 }
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373 public void setWriteBufferSize(long writeBufferSize) throws IOException {
1374 this.writeBufferSize = writeBufferSize;
1375 if(currentWriteBufferSize > writeBufferSize) {
1376 flushCommits();
1377 }
1378 }
1379
1380
1381
1382
1383
1384 ExecutorService getPool() {
1385 return this.pool;
1386 }
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397 public static void setRegionCachePrefetch(final byte[] tableName,
1398 final boolean enable) throws IOException {
1399 setRegionCachePrefetch(TableName.valueOf(tableName), enable);
1400 }
1401
1402 public static void setRegionCachePrefetch(
1403 final TableName tableName,
1404 final boolean enable) throws IOException {
1405 HConnectionManager.execute(new HConnectable<Void>(HBaseConfiguration.create()) {
1406 @Override
1407 public Void connect(HConnection connection) throws IOException {
1408 connection.setRegionCachePrefetch(tableName, enable);
1409 return null;
1410 }
1411 });
1412 }
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424 public static void setRegionCachePrefetch(final Configuration conf,
1425 final byte[] tableName, final boolean enable) throws IOException {
1426 setRegionCachePrefetch(conf, TableName.valueOf(tableName), enable);
1427 }
1428
1429 public static void setRegionCachePrefetch(final Configuration conf,
1430 final TableName tableName,
1431 final boolean enable) throws IOException {
1432 HConnectionManager.execute(new HConnectable<Void>(conf) {
1433 @Override
1434 public Void connect(HConnection connection) throws IOException {
1435 connection.setRegionCachePrefetch(tableName, enable);
1436 return null;
1437 }
1438 });
1439 }
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449 public static boolean getRegionCachePrefetch(final Configuration conf,
1450 final byte[] tableName) throws IOException {
1451 return getRegionCachePrefetch(conf, TableName.valueOf(tableName));
1452 }
1453
1454 public static boolean getRegionCachePrefetch(final Configuration conf,
1455 final TableName tableName) throws IOException {
1456 return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
1457 @Override
1458 public Boolean connect(HConnection connection) throws IOException {
1459 return connection.getRegionCachePrefetch(tableName);
1460 }
1461 });
1462 }
1463
1464
1465
1466
1467
1468
1469
1470
1471 public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException {
1472 return getRegionCachePrefetch(TableName.valueOf(tableName));
1473 }
1474
1475 public static boolean getRegionCachePrefetch(
1476 final TableName tableName) throws IOException {
1477 return HConnectionManager.execute(new HConnectable<Boolean>(
1478 HBaseConfiguration.create()) {
1479 @Override
1480 public Boolean connect(HConnection connection) throws IOException {
1481 return connection.getRegionCachePrefetch(tableName);
1482 }
1483 });
1484 }
1485
1486
1487
1488
1489
1490 public void clearRegionCache() {
1491 this.connection.clearRegionCache();
1492 }
1493
1494
1495
1496
1497 public CoprocessorRpcChannel coprocessorService(byte[] row) {
1498 return new RegionCoprocessorRpcChannel(connection, tableName, row, rpcCallerFactory,
1499 rpcControllerFactory);
1500 }
1501
1502
1503
1504
1505 @Override
1506 public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1507 byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1508 throws ServiceException, Throwable {
1509 final Map<byte[],R> results = Collections.synchronizedMap(
1510 new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1511 coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1512 public void update(byte[] region, byte[] row, R value) {
1513 if (region != null) {
1514 results.put(region, value);
1515 }
1516 }
1517 });
1518 return results;
1519 }
1520
1521
1522
1523
1524 @Override
1525 public <T extends Service, R> void coprocessorService(final Class<T> service,
1526 byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1527 final Batch.Callback<R> callback) throws ServiceException, Throwable {
1528
1529
1530 List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1531
1532 Map<byte[],Future<R>> futures =
1533 new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
1534 for (final byte[] r : keys) {
1535 final RegionCoprocessorRpcChannel channel =
1536 new RegionCoprocessorRpcChannel(connection, tableName, r, rpcCallerFactory,
1537 rpcControllerFactory);
1538 Future<R> future = pool.submit(
1539 new Callable<R>() {
1540 public R call() throws Exception {
1541 T instance = ProtobufUtil.newServiceStub(service, channel);
1542 R result = callable.call(instance);
1543 byte[] region = channel.getLastRegion();
1544 if (callback != null) {
1545 callback.update(region, r, result);
1546 }
1547 return result;
1548 }
1549 });
1550 futures.put(r, future);
1551 }
1552 for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1553 try {
1554 e.getValue().get();
1555 } catch (ExecutionException ee) {
1556 LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1557 + Bytes.toStringBinary(e.getKey()), ee);
1558 throw ee.getCause();
1559 } catch (InterruptedException ie) {
1560 throw new InterruptedIOException("Interrupted calling coprocessor service " + service.getName()
1561 + " for row " + Bytes.toStringBinary(e.getKey()))
1562 .initCause(ie);
1563 }
1564 }
1565 }
1566
1567 private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1568 throws IOException {
1569 if (start == null) {
1570 start = HConstants.EMPTY_START_ROW;
1571 }
1572 if (end == null) {
1573 end = HConstants.EMPTY_END_ROW;
1574 }
1575 return getKeysAndRegionsInRange(start, end, true).getFirst();
1576 }
1577
1578 public void setOperationTimeout(int operationTimeout) {
1579 this.operationTimeout = operationTimeout;
1580 }
1581
1582 public int getOperationTimeout() {
1583 return operationTimeout;
1584 }
1585
1586 @Override
1587 public String toString() {
1588 return tableName + ";" + connection;
1589 }
1590
1591
1592
1593
1594
1595
1596 public static void main(String[] args) throws IOException {
1597 HTable t = new HTable(HBaseConfiguration.create(), args[0]);
1598 try {
1599 System.out.println(t.get(new Get(Bytes.toBytes(args[1]))));
1600 } finally {
1601 t.close();
1602 }
1603 }
1604
1605
1606
1607
1608 @Override
1609 public <R extends Message> Map<byte[], R> batchCoprocessorService(
1610 Descriptors.MethodDescriptor methodDescriptor, Message request,
1611 byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1612 final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1613 Bytes.BYTES_COMPARATOR));
1614 batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1615 new Callback<R>() {
1616
1617 @Override
1618 public void update(byte[] region, byte[] row, R result) {
1619 if (region != null) {
1620 results.put(region, result);
1621 }
1622 }
1623 });
1624 return results;
1625 }
1626
1627
1628
1629
1630 @Override
1631 public <R extends Message> void batchCoprocessorService(
1632 final Descriptors.MethodDescriptor methodDescriptor, final Message request,
1633 byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
1634 throws ServiceException, Throwable {
1635
1636
1637 Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
1638 getKeysAndRegionsInRange(startKey, endKey, true);
1639 List<byte[]> keys = keysAndRegions.getFirst();
1640 List<HRegionLocation> regions = keysAndRegions.getSecond();
1641
1642
1643 if (keys.isEmpty()) {
1644 LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
1645 ", end=" + Bytes.toStringBinary(endKey));
1646 return;
1647 }
1648
1649 List<RegionCoprocessorServiceExec> execs = new ArrayList<RegionCoprocessorServiceExec>();
1650 final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
1651 new TreeMap<byte[], RegionCoprocessorServiceExec>(Bytes.BYTES_COMPARATOR);
1652 for (int i = 0; i < keys.size(); i++) {
1653 final byte[] rowKey = keys.get(i);
1654 final byte[] region = regions.get(i).getRegionInfo().getRegionName();
1655 RegionCoprocessorServiceExec exec =
1656 new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
1657 execs.add(exec);
1658 execsByRow.put(rowKey, exec);
1659 }
1660
1661
1662
1663 final List<Throwable> callbackErrorExceptions = new ArrayList<Throwable>();
1664 final List<Row> callbackErrorActions = new ArrayList<Row>();
1665 final List<String> callbackErrorServers = new ArrayList<String>();
1666
1667 AsyncProcess<ClientProtos.CoprocessorServiceResult> asyncProcess =
1668 new AsyncProcess<ClientProtos.CoprocessorServiceResult>(connection, tableName, pool,
1669 new AsyncProcess.AsyncProcessCallback<ClientProtos.CoprocessorServiceResult>() {
1670 @SuppressWarnings("unchecked")
1671 @Override
1672 public void success(int originalIndex, byte[] region, Row row,
1673 ClientProtos.CoprocessorServiceResult serviceResult) {
1674 if (LOG.isTraceEnabled()) {
1675 LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
1676 " call #" + originalIndex + ": region=" + Bytes.toStringBinary(region) +
1677 ", row=" + Bytes.toStringBinary(row.getRow()) +
1678 ", value=" + serviceResult.getValue().getValue());
1679 }
1680 try {
1681 callback.update(region, row.getRow(),
1682 (R) responsePrototype.newBuilderForType().mergeFrom(
1683 serviceResult.getValue().getValue()).build());
1684 } catch (InvalidProtocolBufferException e) {
1685 LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
1686 e);
1687 callbackErrorExceptions.add(e);
1688 callbackErrorActions.add(row);
1689 callbackErrorServers.add("null");
1690 }
1691 }
1692
1693 @Override
1694 public boolean failure(int originalIndex, byte[] region, Row row, Throwable t) {
1695 RegionCoprocessorServiceExec exec = (RegionCoprocessorServiceExec) row;
1696 LOG.error("Failed calling endpoint " + methodDescriptor.getFullName() + ": region="
1697 + Bytes.toStringBinary(exec.getRegion()), t);
1698 return true;
1699 }
1700
1701 @Override
1702 public boolean retriableFailure(int originalIndex, Row row, byte[] region,
1703 Throwable exception) {
1704 RegionCoprocessorServiceExec exec = (RegionCoprocessorServiceExec) row;
1705 LOG.error("Failed calling endpoint " + methodDescriptor.getFullName() + ": region="
1706 + Bytes.toStringBinary(exec.getRegion()), exception);
1707 return !(exception instanceof DoNotRetryIOException);
1708 }
1709 },
1710 configuration, rpcCallerFactory, rpcControllerFactory);
1711
1712 asyncProcess.submitAll(execs);
1713 asyncProcess.waitUntilDone();
1714
1715 if (asyncProcess.hasError()) {
1716 throw asyncProcess.getErrors();
1717 } else if (!callbackErrorExceptions.isEmpty()) {
1718 throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
1719 callbackErrorServers);
1720 }
1721 }
1722 }