1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.NavigableMap;
28 import java.util.TreeMap;
29 import java.util.concurrent.Callable;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.SynchronousQueue;
34 import java.util.concurrent.ThreadPoolExecutor;
35 import java.util.concurrent.TimeUnit;
36
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39 import org.apache.hadoop.hbase.classification.InterfaceAudience;
40 import org.apache.hadoop.hbase.classification.InterfaceStability;
41 import org.apache.hadoop.conf.Configuration;
42 import org.apache.hadoop.hbase.Cell;
43 import org.apache.hadoop.hbase.HBaseConfiguration;
44 import org.apache.hadoop.hbase.HConstants;
45 import org.apache.hadoop.hbase.HRegionInfo;
46 import org.apache.hadoop.hbase.HRegionLocation;
47 import org.apache.hadoop.hbase.HTableDescriptor;
48 import org.apache.hadoop.hbase.KeyValueUtil;
49 import org.apache.hadoop.hbase.ServerName;
50 import org.apache.hadoop.hbase.TableName;
51 import org.apache.hadoop.hbase.TableNotFoundException;
52 import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
53 import org.apache.hadoop.hbase.client.coprocessor.Batch;
54 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
55 import org.apache.hadoop.hbase.filter.BinaryComparator;
56 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
57 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
58 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
59 import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
60 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
61 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
62 import org.apache.hadoop.hbase.protobuf.RequestConverter;
63 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
64 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
65 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
66 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
67 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
68 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
69 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
70 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
71 import org.apache.hadoop.hbase.util.Bytes;
72 import org.apache.hadoop.hbase.util.Pair;
73 import org.apache.hadoop.hbase.util.ReflectionUtils;
74 import org.apache.hadoop.hbase.util.Threads;
75
76 import com.google.common.annotations.VisibleForTesting;
77 import com.google.protobuf.Descriptors;
78 import com.google.protobuf.Message;
79 import com.google.protobuf.Service;
80 import com.google.protobuf.ServiceException;
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114 @InterfaceAudience.Private
115 @InterfaceStability.Stable
116 public class HTable implements HTableInterface, RegionLocator {
117 private static final Log LOG = LogFactory.getLog(HTable.class);
118 protected ClusterConnection connection;
119 private final TableName tableName;
120 private volatile Configuration configuration;
121 private ConnectionConfiguration connConfiguration;
122 protected BufferedMutatorImpl mutator;
123 private boolean autoFlush = true;
124 private boolean closed = false;
125 protected int scannerCaching;
126 protected long scannerMaxResultSize;
127 private ExecutorService pool;
128 private int operationTimeout;
129 private final boolean cleanupPoolOnClose;
130 private final boolean cleanupConnectionOnClose;
131 private Consistency defaultConsistency = Consistency.STRONG;
132 private HRegionLocator locator;
133
134
135 protected AsyncProcess multiAp;
136 private RpcRetryingCallerFactory rpcCallerFactory;
137 private RpcControllerFactory rpcControllerFactory;
138
139
140
141
142
143
144
145
146
147 @Deprecated
148 public HTable(Configuration conf, final String tableName)
149 throws IOException {
150 this(conf, TableName.valueOf(tableName));
151 }
152
153
154
155
156
157
158
159
160
161 @Deprecated
162 public HTable(Configuration conf, final byte[] tableName)
163 throws IOException {
164 this(conf, TableName.valueOf(tableName));
165 }
166
167
168
169
170
171
172
173
174
175 @Deprecated
176 public HTable(Configuration conf, final TableName tableName)
177 throws IOException {
178 this.tableName = tableName;
179 this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;
180 if (conf == null) {
181 this.connection = null;
182 return;
183 }
184 this.connection = ConnectionManager.getConnectionInternal(conf);
185 this.configuration = conf;
186
187 this.pool = getDefaultExecutor(conf);
188 this.finishSetup();
189 }
190
191
192
193
194
195
196
197
198 @Deprecated
199 public HTable(TableName tableName, Connection connection) throws IOException {
200 this.tableName = tableName;
201 this.cleanupPoolOnClose = true;
202 this.cleanupConnectionOnClose = false;
203 this.connection = (ClusterConnection)connection;
204 this.configuration = connection.getConfiguration();
205
206 this.pool = getDefaultExecutor(this.configuration);
207 this.finishSetup();
208 }
209
210
211 @InterfaceAudience.Private
212 public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
213 int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
214 if (maxThreads == 0) {
215 maxThreads = 1;
216 }
217 long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
218
219
220
221
222
223 ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
224 new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
225 pool.allowCoreThreadTimeOut(true);
226 return pool;
227 }
228
229
230
231
232
233
234
235
236
237
238 @Deprecated
239 public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool)
240 throws IOException {
241 this(conf, TableName.valueOf(tableName), pool);
242 }
243
244
245
246
247
248
249
250
251
252
253 @Deprecated
254 public HTable(Configuration conf, final TableName tableName, final ExecutorService pool)
255 throws IOException {
256 this.connection = ConnectionManager.getConnectionInternal(conf);
257 this.configuration = conf;
258 this.pool = pool;
259 if (pool == null) {
260 this.pool = getDefaultExecutor(conf);
261 this.cleanupPoolOnClose = true;
262 } else {
263 this.cleanupPoolOnClose = false;
264 }
265 this.tableName = tableName;
266 this.cleanupConnectionOnClose = true;
267 this.finishSetup();
268 }
269
270
271
272
273
274
275
276
277
278 @Deprecated
279 public HTable(final byte[] tableName, final Connection connection,
280 final ExecutorService pool) throws IOException {
281 this(TableName.valueOf(tableName), connection, pool);
282 }
283
284
285 @Deprecated
286 public HTable(TableName tableName, final Connection connection,
287 final ExecutorService pool) throws IOException {
288 this(tableName, (ClusterConnection)connection, null, null, null, pool);
289 }
290
291
292
293
294
295
296
297
298
299
300 @InterfaceAudience.Private
301 public HTable(TableName tableName, final ClusterConnection connection,
302 final ConnectionConfiguration tableConfig,
303 final RpcRetryingCallerFactory rpcCallerFactory,
304 final RpcControllerFactory rpcControllerFactory,
305 final ExecutorService pool) throws IOException {
306 if (connection == null || connection.isClosed()) {
307 throw new IllegalArgumentException("Connection is null or closed.");
308 }
309 this.tableName = tableName;
310 this.cleanupConnectionOnClose = false;
311 this.connection = connection;
312 this.configuration = connection.getConfiguration();
313 this.connConfiguration = tableConfig;
314 this.pool = pool;
315 if (pool == null) {
316 this.pool = getDefaultExecutor(this.configuration);
317 this.cleanupPoolOnClose = true;
318 } else {
319 this.cleanupPoolOnClose = false;
320 }
321
322 this.rpcCallerFactory = rpcCallerFactory;
323 this.rpcControllerFactory = rpcControllerFactory;
324
325 this.finishSetup();
326 }
327
328
329
330
331
332 @VisibleForTesting
333 protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException {
334 connection = conn;
335 tableName = params.getTableName();
336 connConfiguration = new ConnectionConfiguration(connection.getConfiguration());
337 cleanupPoolOnClose = false;
338 cleanupConnectionOnClose = false;
339
340 this.mutator = new BufferedMutatorImpl(conn, null, null, params);
341 }
342
343
344
345
346 public static int getMaxKeyValueSize(Configuration conf) {
347 return conf.getInt("hbase.client.keyvalue.maxsize", -1);
348 }
349
350
351
352
353 private void finishSetup() throws IOException {
354 if (connConfiguration == null) {
355 connConfiguration = new ConnectionConfiguration(configuration);
356 }
357
358 this.operationTimeout = tableName.isSystemTable() ?
359 connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
360 this.scannerCaching = connConfiguration.getScannerCaching();
361 this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
362 if (this.rpcCallerFactory == null) {
363 this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
364 }
365 if (this.rpcControllerFactory == null) {
366 this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
367 }
368
369
370 multiAp = this.connection.getAsyncProcess();
371
372 this.closed = false;
373
374 this.locator = new HRegionLocator(tableName, connection);
375 }
376
377
378
379
380 @Override
381 public Configuration getConfiguration() {
382 return configuration;
383 }
384
385
386
387
388
389
390
391
392
393
394 @Deprecated
395 public static boolean isTableEnabled(String tableName) throws IOException {
396 return isTableEnabled(TableName.valueOf(tableName));
397 }
398
399
400
401
402
403
404
405
406
407
408 @Deprecated
409 public static boolean isTableEnabled(byte[] tableName) throws IOException {
410 return isTableEnabled(TableName.valueOf(tableName));
411 }
412
413
414
415
416
417
418
419
420
421
422 @Deprecated
423 public static boolean isTableEnabled(TableName tableName) throws IOException {
424 return isTableEnabled(HBaseConfiguration.create(), tableName);
425 }
426
427
428
429
430
431
432
433
434
435 @Deprecated
436 public static boolean isTableEnabled(Configuration conf, String tableName)
437 throws IOException {
438 return isTableEnabled(conf, TableName.valueOf(tableName));
439 }
440
441
442
443
444
445
446
447
448
449 @Deprecated
450 public static boolean isTableEnabled(Configuration conf, byte[] tableName)
451 throws IOException {
452 return isTableEnabled(conf, TableName.valueOf(tableName));
453 }
454
455
456
457
458
459
460
461
462
463 @Deprecated
464 public static boolean isTableEnabled(Configuration conf,
465 final TableName tableName) throws IOException {
466 return HConnectionManager.execute(new HConnectable<Boolean>(conf) {
467 @Override
468 public Boolean connect(HConnection connection) throws IOException {
469 return connection.isTableEnabled(tableName);
470 }
471 });
472 }
473
474
475
476
477
478
479
480
481 @Deprecated
482 public HRegionLocation getRegionLocation(final String row)
483 throws IOException {
484 return getRegionLocation(Bytes.toBytes(row), false);
485 }
486
487
488
489
490 @Override
491 @Deprecated
492 public HRegionLocation getRegionLocation(final byte [] row)
493 throws IOException {
494 return locator.getRegionLocation(row);
495 }
496
497
498
499
500 @Override
501 @Deprecated
502 public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
503 throws IOException {
504 return locator.getRegionLocation(row, reload);
505 }
506
507
508
509
510 @Override
511 public byte [] getTableName() {
512 return this.tableName.getName();
513 }
514
515 @Override
516 public TableName getName() {
517 return tableName;
518 }
519
520
521
522
523
524
525
526
527 @Deprecated
528 @VisibleForTesting
529 public HConnection getConnection() {
530 return this.connection;
531 }
532
533
534
535
536
537
538
539 @Deprecated
540 public int getScannerCaching() {
541 return scannerCaching;
542 }
543
544
545
546
547
548 @Deprecated
549 public List<Row> getWriteBuffer() {
550 return mutator == null ? null : mutator.getWriteBuffer();
551 }
552
553
554
555
556
557
558
559
560
561
562
563
564 @Deprecated
565 public void setScannerCaching(int scannerCaching) {
566 this.scannerCaching = scannerCaching;
567 }
568
569
570
571
572 @Override
573 public HTableDescriptor getTableDescriptor() throws IOException {
574 HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory,
575 rpcControllerFactory, operationTimeout);
576 if (htd != null) {
577 return new UnmodifyableHTableDescriptor(htd);
578 }
579 return null;
580 }
581
582
583
584
585
586 @Override
587 @Deprecated
588 public byte [][] getStartKeys() throws IOException {
589 return locator.getStartKeys();
590 }
591
592
593
594
595
596 @Override
597 @Deprecated
598 public byte[][] getEndKeys() throws IOException {
599 return locator.getEndKeys();
600 }
601
602
603
604
605
606 @Override
607 @Deprecated
608 public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
609 return locator.getStartEndKeys();
610 }
611
612
613
614
615
616
617
618
619
620 @Deprecated
621 public NavigableMap<HRegionInfo, ServerName> getRegionLocations() throws IOException {
622
623 return MetaScanner.allTableRegions(this.connection, getName());
624 }
625
626
627
628
629
630
631
632
633
634
635 @Override
636 @Deprecated
637 public List<HRegionLocation> getAllRegionLocations() throws IOException {
638 return locator.getAllRegionLocations();
639 }
640
641
642
643
644
645
646
647
648
649
650
651 @Deprecated
652 public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
653 final byte [] endKey) throws IOException {
654 return getRegionsInRange(startKey, endKey, false);
655 }
656
657
658
659
660
661
662
663
664
665
666
667
668 @Deprecated
669 public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
670 final byte [] endKey, final boolean reload) throws IOException {
671 return getKeysAndRegionsInRange(startKey, endKey, false, reload).getSecond();
672 }
673
674
675
676
677
678
679
680
681
682
683
684
685
686 @Deprecated
687 private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
688 final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
689 throws IOException {
690 return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
691 }
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706 @Deprecated
707 private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
708 final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
709 final boolean reload) throws IOException {
710 final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
711 if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
712 throw new IllegalArgumentException(
713 "Invalid range: " + Bytes.toStringBinary(startKey) +
714 " > " + Bytes.toStringBinary(endKey));
715 }
716 List<byte[]> keysInRange = new ArrayList<byte[]>();
717 List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
718 byte[] currentKey = startKey;
719 do {
720 HRegionLocation regionLocation = getRegionLocation(currentKey, reload);
721 keysInRange.add(currentKey);
722 regionsInRange.add(regionLocation);
723 currentKey = regionLocation.getRegionInfo().getEndKey();
724 } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
725 && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
726 || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
727 return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
728 regionsInRange);
729 }
730
731
732
733
734
735 @Override
736 @Deprecated
737 public Result getRowOrBefore(final byte[] row, final byte[] family)
738 throws IOException {
739 RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
740 tableName, row) {
741 @Override
742 public Result call(int callTimeout) throws IOException {
743 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
744 controller.setPriority(tableName);
745 controller.setCallTimeout(callTimeout);
746 ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest(
747 getLocation().getRegionInfo().getRegionName(), row, family);
748 try {
749 ClientProtos.GetResponse response = getStub().get(controller, request);
750 if (!response.hasResult()) return null;
751 return ProtobufUtil.toResult(response.getResult());
752 } catch (ServiceException se) {
753 throw ProtobufUtil.getRemoteException(se);
754 }
755 }
756 };
757 return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
758 }
759
760
761
762
763
764 @Override
765 public ResultScanner getScanner(final Scan scan) throws IOException {
766 if (scan.getBatch() > 0 && scan.isSmall()) {
767 throw new IllegalArgumentException("Small scan should not be used with batching");
768 }
769
770 if (scan.getCaching() <= 0) {
771 scan.setCaching(getScannerCaching());
772 }
773 if (scan.getMaxResultSize() <= 0) {
774 scan.setMaxResultSize(scannerMaxResultSize);
775 }
776
777 if (scan.isReversed()) {
778 if (scan.isSmall()) {
779 return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
780 this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
781 pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
782 } else {
783 return new ReversedClientScanner(getConfiguration(), scan, getName(),
784 this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
785 pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
786 }
787 }
788
789 if (scan.isSmall()) {
790 return new ClientSmallScanner(getConfiguration(), scan, getName(),
791 this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
792 pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
793 } else {
794 return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
795 this.rpcCallerFactory, this.rpcControllerFactory,
796 pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
797 }
798 }
799
800
801
802
803
804 @Override
805 public ResultScanner getScanner(byte [] family) throws IOException {
806 Scan scan = new Scan();
807 scan.addFamily(family);
808 return getScanner(scan);
809 }
810
811
812
813
814
815 @Override
816 public ResultScanner getScanner(byte [] family, byte [] qualifier)
817 throws IOException {
818 Scan scan = new Scan();
819 scan.addColumn(family, qualifier);
820 return getScanner(scan);
821 }
822
823
824
825
826 @Override
827 public Result get(final Get get) throws IOException {
828 return get(get, get.isCheckExistenceOnly());
829 }
830
831 private Result get(Get get, final boolean checkExistenceOnly) throws IOException {
832
833 if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) {
834 get = ReflectionUtils.newInstance(get.getClass(), get);
835 get.setCheckExistenceOnly(checkExistenceOnly);
836 if (get.getConsistency() == null){
837 get.setConsistency(defaultConsistency);
838 }
839 }
840
841 if (get.getConsistency() == Consistency.STRONG) {
842
843 final Get getReq = get;
844 RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
845 getName(), get.getRow()) {
846 @Override
847 public Result call(int callTimeout) throws IOException {
848 ClientProtos.GetRequest request =
849 RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq);
850 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
851 controller.setPriority(tableName);
852 controller.setCallTimeout(callTimeout);
853 try {
854 ClientProtos.GetResponse response = getStub().get(controller, request);
855 if (response == null) return null;
856 return ProtobufUtil.toResult(response.getResult());
857 } catch (ServiceException se) {
858 throw ProtobufUtil.getRemoteException(se);
859 }
860 }
861 };
862 return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
863 }
864
865
866 RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
867 rpcControllerFactory, tableName, this.connection, get, pool,
868 connConfiguration.getRetriesNumber(),
869 operationTimeout,
870 connConfiguration.getPrimaryCallTimeoutMicroSecond());
871 return callable.call();
872 }
873
874
875
876
877
878 @Override
879 public Result[] get(List<Get> gets) throws IOException {
880 if (gets.size() == 1) {
881 return new Result[]{get(gets.get(0))};
882 }
883 try {
884 Object [] r1 = batch((List)gets);
885
886
887 Result [] results = new Result[r1.length];
888 int i=0;
889 for (Object o : r1) {
890
891 results[i++] = (Result) o;
892 }
893
894 return results;
895 } catch (InterruptedException e) {
896 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
897 }
898 }
899
900
901
902
903 @Override
904 public void batch(final List<? extends Row> actions, final Object[] results)
905 throws InterruptedException, IOException {
906 AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results);
907 ars.waitUntilDone();
908 if (ars.hasError()) {
909 throw ars.getErrors();
910 }
911 }
912
913
914
915
916
917
918 @Deprecated
919 @Override
920 public Object[] batch(final List<? extends Row> actions)
921 throws InterruptedException, IOException {
922 Object[] results = new Object[actions.size()];
923 batch(actions, results);
924 return results;
925 }
926
927
928
929
930 @Override
931 public <R> void batchCallback(
932 final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
933 throws IOException, InterruptedException {
934 connection.processBatchCallback(actions, tableName, pool, results, callback);
935 }
936
937
938
939
940
941
942
943
944 @Deprecated
945 @Override
946 public <R> Object[] batchCallback(
947 final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException,
948 InterruptedException {
949 Object[] results = new Object[actions.size()];
950 batchCallback(actions, results, callback);
951 return results;
952 }
953
954
955
956
957 @Override
958 public void delete(final Delete delete)
959 throws IOException {
960 RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
961 tableName, delete.getRow()) {
962 @Override
963 public Boolean call(int callTimeout) throws IOException {
964 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
965 controller.setPriority(tableName);
966 controller.setCallTimeout(callTimeout);
967
968 try {
969 MutateRequest request = RequestConverter.buildMutateRequest(
970 getLocation().getRegionInfo().getRegionName(), delete);
971 MutateResponse response = getStub().mutate(controller, request);
972 return Boolean.valueOf(response.getProcessed());
973 } catch (ServiceException se) {
974 throw ProtobufUtil.getRemoteException(se);
975 }
976 }
977 };
978 rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
979 }
980
981
982
983
984 @Override
985 public void delete(final List<Delete> deletes)
986 throws IOException {
987 Object[] results = new Object[deletes.size()];
988 try {
989 batch(deletes, results);
990 } catch (InterruptedException e) {
991 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
992 } finally {
993
994
995
996 for (int i = results.length - 1; i>=0; i--) {
997
998 if (results[i] instanceof Result) {
999 deletes.remove(i);
1000 }
1001 }
1002 }
1003 }
1004
1005
1006
1007
1008
1009 @Override
1010 public void put(final Put put) throws IOException {
1011 getBufferedMutator().mutate(put);
1012 if (autoFlush) {
1013 flushCommits();
1014 }
1015 }
1016
1017
1018
1019
1020
1021 @Override
1022 public void put(final List<Put> puts) throws IOException {
1023 getBufferedMutator().mutate(puts);
1024 if (autoFlush) {
1025 flushCommits();
1026 }
1027 }
1028
1029
1030
1031
1032 @Override
1033 public void mutateRow(final RowMutations rm) throws IOException {
1034 RegionServerCallable<Void> callable =
1035 new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
1036 @Override
1037 public Void call(int callTimeout) throws IOException {
1038 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1039 controller.setPriority(tableName);
1040 controller.setCallTimeout(callTimeout);
1041 try {
1042 RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
1043 getLocation().getRegionInfo().getRegionName(), rm);
1044 regionMutationBuilder.setAtomic(true);
1045 MultiRequest request =
1046 MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
1047 ClientProtos.MultiResponse response = getStub().multi(controller, request);
1048 ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
1049 if (res.hasException()) {
1050 Throwable ex = ProtobufUtil.toException(res.getException());
1051 if(ex instanceof IOException) {
1052 throw (IOException)ex;
1053 }
1054 throw new IOException("Failed to mutate row: "+Bytes.toStringBinary(rm.getRow()), ex);
1055 }
1056 } catch (ServiceException se) {
1057 throw ProtobufUtil.getRemoteException(se);
1058 }
1059 return null;
1060 }
1061 };
1062 rpcCallerFactory.<Void> newCaller().callWithRetries(callable, this.operationTimeout);
1063 }
1064
1065
1066
1067
1068 @Override
1069 public Result append(final Append append) throws IOException {
1070 if (append.numFamilies() == 0) {
1071 throw new IOException(
1072 "Invalid arguments to append, no columns specified");
1073 }
1074
1075 NonceGenerator ng = this.connection.getNonceGenerator();
1076 final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1077 RegionServerCallable<Result> callable =
1078 new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
1079 @Override
1080 public Result call(int callTimeout) throws IOException {
1081 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1082 controller.setPriority(getTableName());
1083 controller.setCallTimeout(callTimeout);
1084 try {
1085 MutateRequest request = RequestConverter.buildMutateRequest(
1086 getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
1087 MutateResponse response = getStub().mutate(controller, request);
1088 if (!response.hasResult()) return null;
1089 return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1090 } catch (ServiceException se) {
1091 throw ProtobufUtil.getRemoteException(se);
1092 }
1093 }
1094 };
1095 return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1096 }
1097
1098
1099
1100
1101 @Override
1102 public Result increment(final Increment increment) throws IOException {
1103 if (!increment.hasFamilies()) {
1104 throw new IOException(
1105 "Invalid arguments to increment, no columns specified");
1106 }
1107 NonceGenerator ng = this.connection.getNonceGenerator();
1108 final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1109 RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
1110 getName(), increment.getRow()) {
1111 @Override
1112 public Result call(int callTimeout) throws IOException {
1113 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1114 controller.setPriority(getTableName());
1115 controller.setCallTimeout(callTimeout);
1116 try {
1117 MutateRequest request = RequestConverter.buildMutateRequest(
1118 getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
1119 MutateResponse response = getStub().mutate(controller, request);
1120 return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1121 } catch (ServiceException se) {
1122 throw ProtobufUtil.getRemoteException(se);
1123 }
1124 }
1125 };
1126 return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
1127 }
1128
1129
1130
1131
1132 @Override
1133 public long incrementColumnValue(final byte [] row, final byte [] family,
1134 final byte [] qualifier, final long amount)
1135 throws IOException {
1136 return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
1137 }
1138
1139
1140
1141
1142
1143
1144
1145 @Deprecated
1146 @Override
1147 public long incrementColumnValue(final byte [] row, final byte [] family,
1148 final byte [] qualifier, final long amount, final boolean writeToWAL)
1149 throws IOException {
1150 return incrementColumnValue(row, family, qualifier, amount,
1151 writeToWAL? Durability.SYNC_WAL: Durability.SKIP_WAL);
1152 }
1153
1154
1155
1156
1157 @Override
1158 public long incrementColumnValue(final byte [] row, final byte [] family,
1159 final byte [] qualifier, final long amount, final Durability durability)
1160 throws IOException {
1161 NullPointerException npe = null;
1162 if (row == null) {
1163 npe = new NullPointerException("row is null");
1164 } else if (family == null) {
1165 npe = new NullPointerException("family is null");
1166 } else if (qualifier == null) {
1167 npe = new NullPointerException("qualifier is null");
1168 }
1169 if (npe != null) {
1170 throw new IOException(
1171 "Invalid arguments to incrementColumnValue", npe);
1172 }
1173
1174 NonceGenerator ng = this.connection.getNonceGenerator();
1175 final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
1176 RegionServerCallable<Long> callable =
1177 new RegionServerCallable<Long>(connection, getName(), row) {
1178 @Override
1179 public Long call(int callTimeout) throws IOException {
1180 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1181 controller.setPriority(getTableName());
1182 controller.setCallTimeout(callTimeout);
1183 try {
1184 MutateRequest request = RequestConverter.buildIncrementRequest(
1185 getLocation().getRegionInfo().getRegionName(), row, family,
1186 qualifier, amount, durability, nonceGroup, nonce);
1187 MutateResponse response = getStub().mutate(controller, request);
1188 Result result =
1189 ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
1190 return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
1191 } catch (ServiceException se) {
1192 throw ProtobufUtil.getRemoteException(se);
1193 }
1194 }
1195 };
1196 return rpcCallerFactory.<Long> newCaller().callWithRetries(callable, this.operationTimeout);
1197 }
1198
1199
1200
1201
1202 @Override
1203 public boolean checkAndPut(final byte [] row,
1204 final byte [] family, final byte [] qualifier, final byte [] value,
1205 final Put put)
1206 throws IOException {
1207 RegionServerCallable<Boolean> callable =
1208 new RegionServerCallable<Boolean>(connection, getName(), row) {
1209 @Override
1210 public Boolean call(int callTimeout) throws IOException {
1211 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1212 controller.setPriority(tableName);
1213 controller.setCallTimeout(callTimeout);
1214 try {
1215 MutateRequest request = RequestConverter.buildMutateRequest(
1216 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1217 new BinaryComparator(value), CompareType.EQUAL, put);
1218 MutateResponse response = getStub().mutate(controller, request);
1219 return Boolean.valueOf(response.getProcessed());
1220 } catch (ServiceException se) {
1221 throw ProtobufUtil.getRemoteException(se);
1222 }
1223 }
1224 };
1225 return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1226 }
1227
1228
1229
1230
1231 @Override
1232 public boolean checkAndPut(final byte [] row, final byte [] family,
1233 final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1234 final Put put)
1235 throws IOException {
1236 RegionServerCallable<Boolean> callable =
1237 new RegionServerCallable<Boolean>(connection, getName(), row) {
1238 @Override
1239 public Boolean call(int callTimeout) throws IOException {
1240 PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
1241 controller.setPriority(tableName);
1242 controller.setCallTimeout(callTimeout);
1243 try {
1244 CompareType compareType = CompareType.valueOf(compareOp.name());
1245 MutateRequest request = RequestConverter.buildMutateRequest(
1246 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1247 new BinaryComparator(value), compareType, put);
1248 MutateResponse response = getStub().mutate(controller, request);
1249 return Boolean.valueOf(response.getProcessed());
1250 } catch (ServiceException se) {
1251 throw ProtobufUtil.getRemoteException(se);
1252 }
1253 }
1254 };
1255 return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1256 }
1257
1258
1259
1260
1261 @Override
1262 public boolean checkAndDelete(final byte [] row,
1263 final byte [] family, final byte [] qualifier, final byte [] value,
1264 final Delete delete)
1265 throws IOException {
1266 RegionServerCallable<Boolean> callable =
1267 new RegionServerCallable<Boolean>(connection, getName(), row) {
1268 @Override
1269 public Boolean call(int callTimeout) throws IOException {
1270 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1271 controller.setPriority(tableName);
1272 controller.setCallTimeout(callTimeout);
1273 try {
1274 MutateRequest request = RequestConverter.buildMutateRequest(
1275 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1276 new BinaryComparator(value), CompareType.EQUAL, delete);
1277 MutateResponse response = getStub().mutate(controller, request);
1278 return Boolean.valueOf(response.getProcessed());
1279 } catch (ServiceException se) {
1280 throw ProtobufUtil.getRemoteException(se);
1281 }
1282 }
1283 };
1284 return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1285 }
1286
1287
1288
1289
1290 @Override
1291 public boolean checkAndDelete(final byte [] row, final byte [] family,
1292 final byte [] qualifier, final CompareOp compareOp, final byte [] value,
1293 final Delete delete)
1294 throws IOException {
1295 RegionServerCallable<Boolean> callable =
1296 new RegionServerCallable<Boolean>(connection, getName(), row) {
1297 @Override
1298 public Boolean call(int callTimeout) throws IOException {
1299 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1300 controller.setPriority(tableName);
1301 controller.setCallTimeout(callTimeout);
1302 try {
1303 CompareType compareType = CompareType.valueOf(compareOp.name());
1304 MutateRequest request = RequestConverter.buildMutateRequest(
1305 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1306 new BinaryComparator(value), compareType, delete);
1307 MutateResponse response = getStub().mutate(controller, request);
1308 return Boolean.valueOf(response.getProcessed());
1309 } catch (ServiceException se) {
1310 throw ProtobufUtil.getRemoteException(se);
1311 }
1312 }
1313 };
1314 return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1315 }
1316
1317
1318
1319
1320 @Override
1321 public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
1322 final CompareOp compareOp, final byte [] value, final RowMutations rm)
1323 throws IOException {
1324 RegionServerCallable<Boolean> callable =
1325 new RegionServerCallable<Boolean>(connection, getName(), row) {
1326 @Override
1327 public Boolean call(int callTimeout) throws IOException {
1328 PayloadCarryingRpcController controller = rpcControllerFactory.newController();
1329 controller.setPriority(tableName);
1330 controller.setCallTimeout(callTimeout);
1331 try {
1332 CompareType compareType = CompareType.valueOf(compareOp.name());
1333 MultiRequest request = RequestConverter.buildMutateRequest(
1334 getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
1335 new BinaryComparator(value), compareType, rm);
1336 ClientProtos.MultiResponse response = getStub().multi(controller, request);
1337 ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
1338 if (res.hasException()) {
1339 Throwable ex = ProtobufUtil.toException(res.getException());
1340 if(ex instanceof IOException) {
1341 throw (IOException)ex;
1342 }
1343 throw new IOException("Failed to checkAndMutate row: "+
1344 Bytes.toStringBinary(rm.getRow()), ex);
1345 }
1346 return Boolean.valueOf(response.getProcessed());
1347 } catch (ServiceException se) {
1348 throw ProtobufUtil.getRemoteException(se);
1349 }
1350 }
1351 };
1352 return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
1353 }
1354
1355
1356
1357
1358 @Override
1359 public boolean exists(final Get get) throws IOException {
1360 Result r = get(get, true);
1361 assert r.getExists() != null;
1362 return r.getExists();
1363 }
1364
1365
1366
1367
1368 @Override
1369 public boolean[] existsAll(final List<Get> gets) throws IOException {
1370 if (gets.isEmpty()) return new boolean[]{};
1371 if (gets.size() == 1) return new boolean[]{exists(gets.get(0))};
1372
1373 ArrayList<Get> exists = new ArrayList<Get>(gets.size());
1374 for (Get g: gets){
1375 Get ge = new Get(g);
1376 ge.setCheckExistenceOnly(true);
1377 exists.add(ge);
1378 }
1379
1380 Object[] r1;
1381 try {
1382 r1 = batch(exists);
1383 } catch (InterruptedException e) {
1384 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
1385 }
1386
1387
1388 boolean[] results = new boolean[r1.length];
1389 int i = 0;
1390 for (Object o : r1) {
1391
1392 results[i++] = ((Result)o).getExists();
1393 }
1394
1395 return results;
1396 }
1397
1398
1399
1400
1401 @Override
1402 @Deprecated
1403 public Boolean[] exists(final List<Get> gets) throws IOException {
1404 boolean[] results = existsAll(gets);
1405 Boolean[] objectResults = new Boolean[results.length];
1406 for (int i = 0; i < results.length; ++i) {
1407 objectResults[i] = results[i];
1408 }
1409 return objectResults;
1410 }
1411
1412
1413
1414
1415
1416 @Override
1417 public void flushCommits() throws IOException {
1418 if (mutator == null) {
1419
1420 return;
1421 }
1422 getBufferedMutator().flush();
1423 }
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436 public <R> void processBatchCallback(
1437 final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
1438 throws IOException, InterruptedException {
1439 this.batchCallback(list, results, callback);
1440 }
1441
1442
1443
1444
1445
1446
1447 public void processBatch(final List<? extends Row> list, final Object[] results)
1448 throws IOException, InterruptedException {
1449 this.batch(list, results);
1450 }
1451
1452
1453 @Override
1454 public void close() throws IOException {
1455 if (this.closed) {
1456 return;
1457 }
1458 flushCommits();
1459 if (cleanupPoolOnClose) {
1460 this.pool.shutdown();
1461 try {
1462 boolean terminated = false;
1463 do {
1464
1465 terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
1466 } while (!terminated);
1467 } catch (InterruptedException e) {
1468 this.pool.shutdownNow();
1469 LOG.warn("waitForTermination interrupted");
1470 }
1471 }
1472 if (cleanupConnectionOnClose) {
1473 if (this.connection != null) {
1474 this.connection.close();
1475 }
1476 }
1477 this.closed = true;
1478 }
1479
1480
1481 public void validatePut(final Put put) throws IllegalArgumentException {
1482 validatePut(put, connConfiguration.getMaxKeyValueSize());
1483 }
1484
1485
1486 public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
1487 if (put.isEmpty()) {
1488 throw new IllegalArgumentException("No columns to insert");
1489 }
1490 if (maxKeyValueSize > 0) {
1491 for (List<Cell> list : put.getFamilyCellMap().values()) {
1492 for (Cell cell : list) {
1493 if (KeyValueUtil.length(cell) > maxKeyValueSize) {
1494 throw new IllegalArgumentException("KeyValue size too large");
1495 }
1496 }
1497 }
1498 }
1499 }
1500
1501
1502
1503
1504 @Override
1505 public boolean isAutoFlush() {
1506 return autoFlush;
1507 }
1508
1509
1510
1511
1512 @Deprecated
1513 @Override
1514 public void setAutoFlush(boolean autoFlush) {
1515 this.autoFlush = autoFlush;
1516 }
1517
1518
1519
1520
1521 @Override
1522 public void setAutoFlushTo(boolean autoFlush) {
1523 this.autoFlush = autoFlush;
1524 }
1525
1526
1527
1528
1529 @Override
1530 public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
1531 this.autoFlush = autoFlush;
1532 }
1533
1534
1535
1536
1537
1538
1539
1540
1541 @Override
1542 public long getWriteBufferSize() {
1543 if (mutator == null) {
1544 return connConfiguration.getWriteBufferSize();
1545 } else {
1546 return mutator.getWriteBufferSize();
1547 }
1548 }
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558 @Override
1559 public void setWriteBufferSize(long writeBufferSize) throws IOException {
1560 getBufferedMutator();
1561 mutator.setWriteBufferSize(writeBufferSize);
1562 }
1563
1564
1565
1566
1567
1568 ExecutorService getPool() {
1569 return this.pool;
1570 }
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582 @Deprecated
1583 public static void setRegionCachePrefetch(final byte[] tableName,
1584 final boolean enable) throws IOException {
1585 }
1586
1587
1588
1589
1590 @Deprecated
1591 public static void setRegionCachePrefetch(
1592 final TableName tableName,
1593 final boolean enable) throws IOException {
1594 }
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607 @Deprecated
1608 public static void setRegionCachePrefetch(final Configuration conf,
1609 final byte[] tableName, final boolean enable) throws IOException {
1610 }
1611
1612
1613
1614
1615 @Deprecated
1616 public static void setRegionCachePrefetch(final Configuration conf,
1617 final TableName tableName,
1618 final boolean enable) throws IOException {
1619 }
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630 @Deprecated
1631 public static boolean getRegionCachePrefetch(final Configuration conf,
1632 final byte[] tableName) throws IOException {
1633 return false;
1634 }
1635
1636
1637
1638
1639 @Deprecated
1640 public static boolean getRegionCachePrefetch(final Configuration conf,
1641 final TableName tableName) throws IOException {
1642 return false;
1643 }
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653 @Deprecated
1654 public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException {
1655 return false;
1656 }
1657
1658
1659
1660
1661 @Deprecated
1662 public static boolean getRegionCachePrefetch(
1663 final TableName tableName) throws IOException {
1664 return false;
1665 }
1666
1667
1668
1669
1670
1671 public void clearRegionCache() {
1672 this.connection.clearRegionCache();
1673 }
1674
1675
1676
1677
1678 @Override
1679 public CoprocessorRpcChannel coprocessorService(byte[] row) {
1680 return new RegionCoprocessorRpcChannel(connection, tableName, row);
1681 }
1682
1683
1684
1685
1686 @Override
1687 public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
1688 byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
1689 throws ServiceException, Throwable {
1690 final Map<byte[],R> results = Collections.synchronizedMap(
1691 new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
1692 coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
1693 @Override
1694 public void update(byte[] region, byte[] row, R value) {
1695 if (region != null) {
1696 results.put(region, value);
1697 }
1698 }
1699 });
1700 return results;
1701 }
1702
1703
1704
1705
1706 @Override
1707 public <T extends Service, R> void coprocessorService(final Class<T> service,
1708 byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
1709 final Batch.Callback<R> callback) throws ServiceException, Throwable {
1710
1711
1712 List<byte[]> keys = getStartKeysInRange(startKey, endKey);
1713
1714 Map<byte[],Future<R>> futures =
1715 new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
1716 for (final byte[] r : keys) {
1717 final RegionCoprocessorRpcChannel channel =
1718 new RegionCoprocessorRpcChannel(connection, tableName, r);
1719 Future<R> future = pool.submit(
1720 new Callable<R>() {
1721 @Override
1722 public R call() throws Exception {
1723 T instance = ProtobufUtil.newServiceStub(service, channel);
1724 R result = callable.call(instance);
1725 byte[] region = channel.getLastRegion();
1726 if (callback != null) {
1727 callback.update(region, r, result);
1728 }
1729 return result;
1730 }
1731 });
1732 futures.put(r, future);
1733 }
1734 for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
1735 try {
1736 e.getValue().get();
1737 } catch (ExecutionException ee) {
1738 LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
1739 + Bytes.toStringBinary(e.getKey()), ee);
1740 throw ee.getCause();
1741 } catch (InterruptedException ie) {
1742 throw new InterruptedIOException("Interrupted calling coprocessor service " + service.getName()
1743 + " for row " + Bytes.toStringBinary(e.getKey()))
1744 .initCause(ie);
1745 }
1746 }
1747 }
1748
1749 private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
1750 throws IOException {
1751 if (start == null) {
1752 start = HConstants.EMPTY_START_ROW;
1753 }
1754 if (end == null) {
1755 end = HConstants.EMPTY_END_ROW;
1756 }
1757 return getKeysAndRegionsInRange(start, end, true).getFirst();
1758 }
1759
1760 public void setOperationTimeout(int operationTimeout) {
1761 this.operationTimeout = operationTimeout;
1762 }
1763
1764 public int getOperationTimeout() {
1765 return operationTimeout;
1766 }
1767
1768 @Override
1769 public String toString() {
1770 return tableName + ";" + connection;
1771 }
1772
1773
1774
1775
1776 @Override
1777 public <R extends Message> Map<byte[], R> batchCoprocessorService(
1778 Descriptors.MethodDescriptor methodDescriptor, Message request,
1779 byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1780 final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
1781 Bytes.BYTES_COMPARATOR));
1782 batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1783 new Callback<R>() {
1784
1785 @Override
1786 public void update(byte[] region, byte[] row, R result) {
1787 if (region != null) {
1788 results.put(region, result);
1789 }
1790 }
1791 });
1792 return results;
1793 }
1794
1795
1796
1797
1798 @Override
1799 public <R extends Message> void batchCoprocessorService(
1800 final Descriptors.MethodDescriptor methodDescriptor, final Message request,
1801 byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
1802 throws ServiceException, Throwable {
1803
1804 if (startKey == null) {
1805 startKey = HConstants.EMPTY_START_ROW;
1806 }
1807 if (endKey == null) {
1808 endKey = HConstants.EMPTY_END_ROW;
1809 }
1810
1811 Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
1812 getKeysAndRegionsInRange(startKey, endKey, true);
1813 List<byte[]> keys = keysAndRegions.getFirst();
1814 List<HRegionLocation> regions = keysAndRegions.getSecond();
1815
1816
1817 if (keys.isEmpty()) {
1818 LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
1819 ", end=" + Bytes.toStringBinary(endKey));
1820 return;
1821 }
1822
1823 List<RegionCoprocessorServiceExec> execs = new ArrayList<RegionCoprocessorServiceExec>();
1824 final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
1825 new TreeMap<byte[], RegionCoprocessorServiceExec>(Bytes.BYTES_COMPARATOR);
1826 for (int i = 0; i < keys.size(); i++) {
1827 final byte[] rowKey = keys.get(i);
1828 final byte[] region = regions.get(i).getRegionInfo().getRegionName();
1829 RegionCoprocessorServiceExec exec =
1830 new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
1831 execs.add(exec);
1832 execsByRow.put(rowKey, exec);
1833 }
1834
1835
1836
1837 final List<Throwable> callbackErrorExceptions = new ArrayList<Throwable>();
1838 final List<Row> callbackErrorActions = new ArrayList<Row>();
1839 final List<String> callbackErrorServers = new ArrayList<String>();
1840 Object[] results = new Object[execs.size()];
1841
1842 AsyncProcess asyncProcess =
1843 new AsyncProcess(connection, configuration, pool,
1844 RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
1845 true, RpcControllerFactory.instantiate(configuration));
1846
1847 AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
1848 new Callback<ClientProtos.CoprocessorServiceResult>() {
1849 @Override
1850 public void update(byte[] region, byte[] row,
1851 ClientProtos.CoprocessorServiceResult serviceResult) {
1852 if (LOG.isTraceEnabled()) {
1853 LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
1854 ": region=" + Bytes.toStringBinary(region) +
1855 ", row=" + Bytes.toStringBinary(row) +
1856 ", value=" + serviceResult.getValue().getValue());
1857 }
1858 try {
1859 Message.Builder builder = responsePrototype.newBuilderForType();
1860 ProtobufUtil.mergeFrom(builder, serviceResult.getValue().getValue());
1861 callback.update(region, row, (R) builder.build());
1862 } catch (IOException e) {
1863 LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
1864 e);
1865 callbackErrorExceptions.add(e);
1866 callbackErrorActions.add(execsByRow.get(row));
1867 callbackErrorServers.add("null");
1868 }
1869 }
1870 }, results);
1871
1872 future.waitUntilDone();
1873
1874 if (future.hasError()) {
1875 throw future.getErrors();
1876 } else if (!callbackErrorExceptions.isEmpty()) {
1877 throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
1878 callbackErrorServers);
1879 }
1880 }
1881
1882 public RegionLocator getRegionLocator() {
1883 return this.locator;
1884 }
1885
1886 @VisibleForTesting
1887 BufferedMutator getBufferedMutator() throws IOException {
1888 if (mutator == null) {
1889 this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
1890 new BufferedMutatorParams(tableName)
1891 .pool(pool)
1892 .writeBufferSize(connConfiguration.getWriteBufferSize())
1893 .maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
1894 );
1895 }
1896 return mutator;
1897 }
1898 }