1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import static org.junit.Assert.assertTrue;
21 import static org.junit.Assert.fail;
22
23 import java.io.IOException;
24 import java.net.SocketTimeoutException;
25 import java.util.Comparator;
26 import java.util.HashMap;
27 import java.util.Map;
28 import java.util.Random;
29 import java.util.SortedMap;
30 import java.util.concurrent.ConcurrentSkipListMap;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.atomic.AtomicLong;
35
36 import org.apache.hadoop.hbase.util.ByteStringer;
37 import org.apache.commons.lang.NotImplementedException;
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40 import org.apache.hadoop.conf.Configuration;
41 import org.apache.hadoop.conf.Configured;
42 import org.apache.hadoop.hbase.HBaseConfiguration;
43 import org.apache.hadoop.hbase.HConstants;
44 import org.apache.hadoop.hbase.HRegionInfo;
45 import org.apache.hadoop.hbase.HRegionLocation;
46 import org.apache.hadoop.hbase.RegionLocations;
47 import org.apache.hadoop.hbase.KeyValue;
48 import org.apache.hadoop.hbase.RegionTooBusyException;
49 import org.apache.hadoop.hbase.ServerName;
50 import org.apache.hadoop.hbase.testclassification.SmallTests;
51 import org.apache.hadoop.hbase.TableName;
52 import org.apache.hadoop.hbase.protobuf.generated.CellProtos;
53 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
54 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
55 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
56 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
57 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
58 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
59 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
60 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
61 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
62 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
63 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
64 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
65 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
66 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
67 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
68 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
69 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
70 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
71 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
72 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
73 import org.apache.hadoop.hbase.security.User;
74 import org.apache.hadoop.hbase.util.Bytes;
75 import org.apache.hadoop.hbase.util.Pair;
76 import org.apache.hadoop.hbase.util.Threads;
77 import org.apache.hadoop.util.Tool;
78 import org.apache.hadoop.util.ToolRunner;
79 import org.junit.Before;
80 import org.junit.Ignore;
81 import org.junit.Test;
82 import org.junit.experimental.categories.Category;
83 import org.mockito.Mockito;
84
85 import com.google.common.base.Stopwatch;
86 import com.google.protobuf.ByteString;
87 import com.google.protobuf.RpcController;
88 import com.google.protobuf.ServiceException;
89
90
91
92
93
94 @Category(SmallTests.class)
95 public class TestClientNoCluster extends Configured implements Tool {
96 private static final Log LOG = LogFactory.getLog(TestClientNoCluster.class);
97 private Configuration conf;
98 public static final ServerName META_SERVERNAME =
99 ServerName.valueOf("meta.example.org", 16010, 12345);
100
101 @Before
102 public void setUp() throws Exception {
103 this.conf = HBaseConfiguration.create();
104
105
106
107 this.conf.set("hbase.client.registry.impl", SimpleRegistry.class.getName());
108 }
109
110
111
112
113 static class SimpleRegistry implements Registry {
114 final ServerName META_HOST = META_SERVERNAME;
115
116 @Override
117 public void init(Connection connection) {
118 }
119
120 @Override
121 public RegionLocations getMetaRegionLocation() throws IOException {
122 return new RegionLocations(
123 new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, META_HOST));
124 }
125
126 @Override
127 public String getClusterId() {
128 return HConstants.CLUSTER_ID_DEFAULT;
129 }
130
131 @Override
132 public boolean isTableOnlineState(TableName tableName, boolean enabled)
133 throws IOException {
134 return enabled;
135 }
136
137 @Override
138 public int getCurrentNrHRS() throws IOException {
139 return 1;
140 }
141 }
142
143
144
145
146
147 @Ignore
148 @Test
149 public void testTimeoutAndRetries() throws IOException {
150 Configuration localConfig = HBaseConfiguration.create(this.conf);
151
152 localConfig.set("hbase.client.connection.impl", RpcTimeoutConnection.class.getName());
153 Table table = new HTable(localConfig, TableName.META_TABLE_NAME);
154 Throwable t = null;
155 LOG.info("Start");
156 try {
157
158 table.exists(new Get(Bytes.toBytes("abc")));
159 } catch (SocketTimeoutException e) {
160
161 LOG.info("Got expected exception", e);
162 t = e;
163 } catch (RetriesExhaustedException e) {
164
165 fail();
166 } finally {
167 table.close();
168 }
169 LOG.info("Stop");
170 assertTrue(t != null);
171 }
172
173
174
175
176
177 @Test
178 public void testRpcTimeout() throws IOException {
179 Configuration localConfig = HBaseConfiguration.create(this.conf);
180
181 localConfig.set("hbase.client.connection.impl", RpcTimeoutConnection.class.getName());
182 int pause = 10;
183 localConfig.setInt("hbase.client.pause", pause);
184 localConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 10);
185
186
187
188
189 localConfig.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, pause - 1);
190 Table table = new HTable(localConfig, TableName.META_TABLE_NAME);
191 Throwable t = null;
192 try {
193
194 table.exists(new Get(Bytes.toBytes("abc")));
195 } catch (SocketTimeoutException e) {
196
197 LOG.info("Got expected exception", e);
198 t = e;
199 } catch (RetriesExhaustedException e) {
200
201 fail();
202 } finally {
203 table.close();
204 }
205 assertTrue(t != null);
206 }
207
208 @Test
209 public void testDoNotRetryMetaScanner() throws IOException {
210 this.conf.set("hbase.client.connection.impl",
211 RegionServerStoppedOnScannerOpenConnection.class.getName());
212 try (Connection connection = ConnectionFactory.createConnection(conf)) {
213 MetaScanner.metaScan(connection, null);
214 }
215 }
216
217 @Test
218 public void testDoNotRetryOnScanNext() throws IOException {
219 this.conf.set("hbase.client.connection.impl",
220 RegionServerStoppedOnScannerOpenConnection.class.getName());
221
222
223
224 Table table = new HTable(this.conf, TableName.META_TABLE_NAME);
225 ResultScanner scanner = table.getScanner(HConstants.CATALOG_FAMILY);
226 try {
227 Result result = null;
228 while ((result = scanner.next()) != null) {
229 LOG.info(result);
230 }
231 } finally {
232 scanner.close();
233 table.close();
234 }
235 }
236
237 @Test
238 public void testRegionServerStoppedOnScannerOpen() throws IOException {
239 this.conf.set("hbase.client.connection.impl",
240 RegionServerStoppedOnScannerOpenConnection.class.getName());
241
242
243
244 Table table = new HTable(this.conf, TableName.META_TABLE_NAME);
245 ResultScanner scanner = table.getScanner(HConstants.CATALOG_FAMILY);
246 try {
247 Result result = null;
248 while ((result = scanner.next()) != null) {
249 LOG.info(result);
250 }
251 } finally {
252 scanner.close();
253 table.close();
254 }
255 }
256
257
258
259
260 static class ScanOpenNextThenExceptionThenRecoverConnection
261 extends ConnectionManager.HConnectionImplementation {
262 final ClientService.BlockingInterface stub;
263
264 ScanOpenNextThenExceptionThenRecoverConnection(Configuration conf,
265 boolean managed, ExecutorService pool) throws IOException {
266 super(conf, managed);
267
268
269 this.stub = Mockito.mock(ClientService.BlockingInterface.class);
270 long sid = 12345L;
271 try {
272 Mockito.when(stub.scan((RpcController)Mockito.any(),
273 (ClientProtos.ScanRequest)Mockito.any())).
274 thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).build()).
275 thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito"))).
276 thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).
277 setMoreResults(false).build());
278 } catch (ServiceException e) {
279 throw new IOException(e);
280 }
281 }
282
283 @Override
284 public BlockingInterface getClient(ServerName sn) throws IOException {
285 return this.stub;
286 }
287 }
288
289
290
291
292 static class RegionServerStoppedOnScannerOpenConnection
293 extends ConnectionManager.HConnectionImplementation {
294 final ClientService.BlockingInterface stub;
295
296 RegionServerStoppedOnScannerOpenConnection(Configuration conf, boolean managed,
297 ExecutorService pool, User user) throws IOException {
298 super(conf, managed);
299
300
301 this.stub = Mockito.mock(ClientService.BlockingInterface.class);
302 long sid = 12345L;
303 try {
304 Mockito.when(stub.scan((RpcController)Mockito.any(),
305 (ClientProtos.ScanRequest)Mockito.any())).
306 thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).build()).
307 thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito"))).
308 thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).
309 setMoreResults(false).build());
310 } catch (ServiceException e) {
311 throw new IOException(e);
312 }
313 }
314
315 @Override
316 public BlockingInterface getClient(ServerName sn) throws IOException {
317 return this.stub;
318 }
319 }
320
321
322
323
324 static class RpcTimeoutConnection
325 extends ConnectionManager.HConnectionImplementation {
326 final ClientService.BlockingInterface stub;
327
328 RpcTimeoutConnection(Configuration conf, boolean managed, ExecutorService pool, User user)
329 throws IOException {
330 super(conf, managed);
331
332 this.stub = Mockito.mock(ClientService.BlockingInterface.class);
333 try {
334 Mockito.when(stub.get((RpcController)Mockito.any(),
335 (ClientProtos.GetRequest)Mockito.any())).
336 thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito")));
337 } catch (ServiceException e) {
338 throw new IOException(e);
339 }
340 }
341
342 @Override
343 public BlockingInterface getClient(ServerName sn) throws IOException {
344 return this.stub;
345 }
346 }
347
348
349
350
351 static class ManyServersManyRegionsConnection
352 extends ConnectionManager.HConnectionImplementation {
353
354 final Map<ServerName, ClientService.BlockingInterface> serversByClient;
355
356
357
358
359 final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta;
360 final AtomicLong sequenceids = new AtomicLong(0);
361 private final Configuration conf;
362
363 ManyServersManyRegionsConnection(Configuration conf, boolean managed,
364 ExecutorService pool, User user)
365 throws IOException {
366 super(conf, managed, pool, user);
367 int serverCount = conf.getInt("hbase.test.servers", 10);
368 this.serversByClient =
369 new HashMap<ServerName, ClientService.BlockingInterface>(serverCount);
370 this.meta = makeMeta(Bytes.toBytes(
371 conf.get("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE))),
372 conf.getInt("hbase.test.regions", 100),
373 conf.getLong("hbase.test.namespace.span", 1000),
374 serverCount);
375 this.conf = conf;
376 }
377
378 @Override
379 public ClientService.BlockingInterface getClient(ServerName sn) throws IOException {
380
381 ClientService.BlockingInterface stub = null;
382 synchronized (this.serversByClient) {
383 stub = this.serversByClient.get(sn);
384 if (stub == null) {
385 stub = new FakeServer(this.conf, meta, sequenceids);
386 this.serversByClient.put(sn, stub);
387 }
388 }
389 return stub;
390 }
391 }
392
393 static MultiResponse doMultiResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
394 final AtomicLong sequenceids, final MultiRequest request) {
395
396 ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder();
397
398 RegionActionResult.Builder regionActionResultBuilder =
399 RegionActionResult.newBuilder();
400 ResultOrException.Builder roeBuilder = ResultOrException.newBuilder();
401 for (RegionAction regionAction: request.getRegionActionList()) {
402 regionActionResultBuilder.clear();
403
404 for (ClientProtos.Action action: regionAction.getActionList()) {
405 roeBuilder.clear();
406
407 roeBuilder.setResult(ClientProtos.Result.getDefaultInstance());
408 roeBuilder.setIndex(action.getIndex());
409 regionActionResultBuilder.addResultOrException(roeBuilder.build());
410 }
411 builder.addRegionActionResult(regionActionResultBuilder.build());
412 }
413 return builder.build();
414 }
415
416
417
418
419
420
421 static class FakeServer implements ClientService.BlockingInterface {
422 private AtomicInteger multiInvocationsCount = new AtomicInteger(0);
423 private final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta;
424 private final AtomicLong sequenceids;
425 private final long multiPause;
426 private final int tooManyMultiRequests;
427
428 FakeServer(final Configuration c, final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
429 final AtomicLong sequenceids) {
430 this.meta = meta;
431 this.sequenceids = sequenceids;
432
433
434
435 this.multiPause = c.getLong("hbase.test.multi.pause.when.done", 0);
436 this.tooManyMultiRequests = c.getInt("hbase.test.multi.too.many", 3);
437 }
438
439 @Override
440 public GetResponse get(RpcController controller, GetRequest request)
441 throws ServiceException {
442 boolean metaRegion = isMetaRegion(request.getRegion().getValue().toByteArray(),
443 request.getRegion().getType());
444 if (!metaRegion) {
445 return doGetResponse(request);
446 }
447 return doMetaGetResponse(meta, request);
448 }
449
450 private GetResponse doGetResponse(GetRequest request) {
451 ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
452 ByteString row = request.getGet().getRow();
453 resultBuilder.addCell(getStartCode(row));
454 GetResponse.Builder builder = GetResponse.newBuilder();
455 builder.setResult(resultBuilder.build());
456 return builder.build();
457 }
458
459 @Override
460 public MutateResponse mutate(RpcController controller,
461 MutateRequest request) throws ServiceException {
462 throw new NotImplementedException();
463 }
464
465 @Override
466 public ScanResponse scan(RpcController controller,
467 ScanRequest request) throws ServiceException {
468
469
470 return doMetaScanResponse(meta, sequenceids, request);
471 }
472
473 @Override
474 public BulkLoadHFileResponse bulkLoadHFile(
475 RpcController controller, BulkLoadHFileRequest request)
476 throws ServiceException {
477 throw new NotImplementedException();
478 }
479
480 @Override
481 public CoprocessorServiceResponse execService(
482 RpcController controller, CoprocessorServiceRequest request)
483 throws ServiceException {
484 throw new NotImplementedException();
485 }
486
487 @Override
488 public MultiResponse multi(RpcController controller, MultiRequest request)
489 throws ServiceException {
490 int concurrentInvocations = this.multiInvocationsCount.incrementAndGet();
491 try {
492 if (concurrentInvocations >= tooManyMultiRequests) {
493 throw new ServiceException(new RegionTooBusyException("concurrentInvocations=" +
494 concurrentInvocations));
495 }
496 Threads.sleep(multiPause);
497 return doMultiResponse(meta, sequenceids, request);
498 } finally {
499 this.multiInvocationsCount.decrementAndGet();
500 }
501 }
502
503 @Override
504 public CoprocessorServiceResponse execRegionServerService(RpcController controller,
505 CoprocessorServiceRequest request) throws ServiceException {
506 throw new NotImplementedException();
507 }
508 }
509
510 static ScanResponse doMetaScanResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
511 final AtomicLong sequenceids, final ScanRequest request) {
512 ScanResponse.Builder builder = ScanResponse.newBuilder();
513 int max = request.getNumberOfRows();
514 int count = 0;
515 Map<byte [], Pair<HRegionInfo, ServerName>> tail =
516 request.hasScan()? meta.tailMap(request.getScan().getStartRow().toByteArray()): meta;
517 ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
518 for (Map.Entry<byte [], Pair<HRegionInfo, ServerName>> e: tail.entrySet()) {
519
520 if (max <= 0) break;
521 if (++count > max) break;
522 HRegionInfo hri = e.getValue().getFirst();
523 ByteString row = ByteStringer.wrap(hri.getRegionName());
524 resultBuilder.clear();
525 resultBuilder.addCell(getRegionInfo(row, hri));
526 resultBuilder.addCell(getServer(row, e.getValue().getSecond()));
527 resultBuilder.addCell(getStartCode(row));
528 builder.addResults(resultBuilder.build());
529
530 if (hri.getEndKey().length <= 0) builder.setMoreResults(false);
531 else builder.setMoreResults(true);
532 }
533
534 builder.setScannerId(request.hasScannerId()?
535 request.getScannerId(): sequenceids.incrementAndGet());
536 return builder.build();
537 }
538
539 static GetResponse doMetaGetResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
540 final GetRequest request) {
541 ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
542 ByteString row = request.getGet().getRow();
543 Pair<HRegionInfo, ServerName> p = meta.get(row.toByteArray());
544 if (p == null) {
545 if (request.getGet().getClosestRowBefore()) {
546 byte [] bytes = row.toByteArray();
547 SortedMap<byte [], Pair<HRegionInfo, ServerName>> head =
548 bytes != null? meta.headMap(bytes): meta;
549 p = head == null? null: head.get(head.lastKey());
550 }
551 }
552 if (p != null) {
553 resultBuilder.addCell(getRegionInfo(row, p.getFirst()));
554 resultBuilder.addCell(getServer(row, p.getSecond()));
555 }
556 resultBuilder.addCell(getStartCode(row));
557 GetResponse.Builder builder = GetResponse.newBuilder();
558 builder.setResult(resultBuilder.build());
559 return builder.build();
560 }
561
562
563
564
565
566
567 static boolean isMetaRegion(final byte [] name, final RegionSpecifierType type) {
568 switch (type) {
569 case REGION_NAME:
570 return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getRegionName(), name);
571 case ENCODED_REGION_NAME:
572 return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), name);
573 default: throw new UnsupportedOperationException();
574 }
575 }
576
577 private final static ByteString CATALOG_FAMILY_BYTESTRING =
578 ByteStringer.wrap(HConstants.CATALOG_FAMILY);
579 private final static ByteString REGIONINFO_QUALIFIER_BYTESTRING =
580 ByteStringer.wrap(HConstants.REGIONINFO_QUALIFIER);
581 private final static ByteString SERVER_QUALIFIER_BYTESTRING =
582 ByteStringer.wrap(HConstants.SERVER_QUALIFIER);
583
584 static CellProtos.Cell.Builder getBaseCellBuilder(final ByteString row) {
585 CellProtos.Cell.Builder cellBuilder = CellProtos.Cell.newBuilder();
586 cellBuilder.setRow(row);
587 cellBuilder.setFamily(CATALOG_FAMILY_BYTESTRING);
588 cellBuilder.setTimestamp(System.currentTimeMillis());
589 return cellBuilder;
590 }
591
592 static CellProtos.Cell getRegionInfo(final ByteString row, final HRegionInfo hri) {
593 CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
594 cellBuilder.setQualifier(REGIONINFO_QUALIFIER_BYTESTRING);
595 cellBuilder.setValue(ByteStringer.wrap(hri.toByteArray()));
596 return cellBuilder.build();
597 }
598
599 static CellProtos.Cell getServer(final ByteString row, final ServerName sn) {
600 CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
601 cellBuilder.setQualifier(SERVER_QUALIFIER_BYTESTRING);
602 cellBuilder.setValue(ByteString.copyFromUtf8(sn.getHostAndPort()));
603 return cellBuilder.build();
604 }
605
606 static CellProtos.Cell getStartCode(final ByteString row) {
607 CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
608 cellBuilder.setQualifier(ByteStringer.wrap(HConstants.STARTCODE_QUALIFIER));
609
610 cellBuilder.setValue(ByteStringer.wrap(Bytes.toBytes(META_SERVERNAME.getStartcode())));
611 return cellBuilder.build();
612 }
613
614 private static final byte [] BIG_USER_TABLE = Bytes.toBytes("t");
615
616
617
618
619
620
621
622
623 private static byte [] format(final long number) {
624 byte [] b = new byte[10];
625 long d = number;
626 for (int i = b.length - 1; i >= 0; i--) {
627 b[i] = (byte)((d % 10) + '0');
628 d /= 10;
629 }
630 return b;
631 }
632
633
634
635
636
637
638 private static HRegionInfo [] makeHRegionInfos(final byte [] tableName, final int count,
639 final long namespaceSpan) {
640 byte [] startKey = HConstants.EMPTY_BYTE_ARRAY;
641 byte [] endKey = HConstants.EMPTY_BYTE_ARRAY;
642 long interval = namespaceSpan / count;
643 HRegionInfo [] hris = new HRegionInfo[count];
644 for (int i = 0; i < count; i++) {
645 if (i == 0) {
646 endKey = format(interval);
647 } else {
648 startKey = endKey;
649 if (i == count - 1) endKey = HConstants.EMPTY_BYTE_ARRAY;
650 else endKey = format((i + 1) * interval);
651 }
652 hris[i] = new HRegionInfo(TableName.valueOf(tableName), startKey, endKey);
653 }
654 return hris;
655 }
656
657
658
659
660
661 private static ServerName [] makeServerNames(final int count) {
662 ServerName [] sns = new ServerName[count];
663 for (int i = 0; i < count; i++) {
664 sns[i] = ServerName.valueOf("" + i + ".example.org", 16010, i);
665 }
666 return sns;
667 }
668
669
670
671
672 private static class MetaRowsComparator implements Comparator<byte []> {
673 private final KeyValue.KVComparator delegate = new KeyValue.MetaComparator();
674 @Override
675 public int compare(byte[] left, byte[] right) {
676 return delegate.compareRows(left, 0, left.length, right, 0, right.length);
677 }
678 }
679
680
681
682
683
684
685 static SortedMap<byte [], Pair<HRegionInfo, ServerName>> makeMeta(final byte [] tableName,
686 final int regionCount, final long namespaceSpan, final int serverCount) {
687
688 SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta =
689 new ConcurrentSkipListMap<byte[], Pair<HRegionInfo,ServerName>>(new MetaRowsComparator());
690 HRegionInfo [] hris = makeHRegionInfos(tableName, regionCount, namespaceSpan);
691 ServerName [] serverNames = makeServerNames(serverCount);
692 int per = regionCount / serverCount;
693 int count = 0;
694 for (HRegionInfo hri: hris) {
695 Pair<HRegionInfo, ServerName> p =
696 new Pair<HRegionInfo, ServerName>(hri, serverNames[count++ / per]);
697 meta.put(hri.getRegionName(), p);
698 }
699 return meta;
700 }
701
702
703
704
705
706
707
708
709
710 static void cycle(int id, final Configuration c, final Connection sharedConnection) throws IOException {
711 long namespaceSpan = c.getLong("hbase.test.namespace.span", 1000000);
712 long startTime = System.currentTimeMillis();
713 final int printInterval = 100000;
714 Random rd = new Random(id);
715 boolean get = c.getBoolean("hbase.test.do.gets", false);
716 TableName tableName = TableName.valueOf(BIG_USER_TABLE);
717 if (get) {
718 try (Table table = sharedConnection.getTable(tableName)){
719 Stopwatch stopWatch = new Stopwatch();
720 stopWatch.start();
721 for (int i = 0; i < namespaceSpan; i++) {
722 byte [] b = format(rd.nextLong());
723 Get g = new Get(b);
724 table.get(g);
725 if (i % printInterval == 0) {
726 LOG.info("Get " + printInterval + "/" + stopWatch.elapsedMillis());
727 stopWatch.reset();
728 stopWatch.start();
729 }
730 }
731 LOG.info("Finished a cycle putting " + namespaceSpan + " in " +
732 (System.currentTimeMillis() - startTime) + "ms");
733 }
734 } else {
735 try (BufferedMutator mutator = sharedConnection.getBufferedMutator(tableName)) {
736 Stopwatch stopWatch = new Stopwatch();
737 stopWatch.start();
738 for (int i = 0; i < namespaceSpan; i++) {
739 byte [] b = format(rd.nextLong());
740 Put p = new Put(b);
741 p.add(HConstants.CATALOG_FAMILY, b, b);
742 mutator.mutate(p);
743 if (i % printInterval == 0) {
744 LOG.info("Put " + printInterval + "/" + stopWatch.elapsedMillis());
745 stopWatch.reset();
746 stopWatch.start();
747 }
748 }
749 LOG.info("Finished a cycle putting " + namespaceSpan + " in " +
750 (System.currentTimeMillis() - startTime) + "ms");
751 }
752 }
753 }
754
755 @Override
756 public int run(String[] arg0) throws Exception {
757 int errCode = 0;
758
759
760 final int servers = 1;
761
762 final int regions = 100000;
763
764 final long namespaceSpan = 50000000;
765
766
767 final long multiPause = 0;
768
769 if ((namespaceSpan < regions) || (regions < servers)) {
770 throw new IllegalArgumentException("namespaceSpan=" + namespaceSpan + " must be > regions=" +
771 regions + " which must be > servers=" + servers);
772 }
773
774
775 getConf().set("hbase.client.connection.impl",
776 ManyServersManyRegionsConnection.class.getName());
777
778 getConf().set("hbase.client.registry.impl", SimpleRegistry.class.getName());
779
780
781
782 getConf().setInt("hbase.client.start.log.errors.counter", 0);
783
784
785 getConf().setInt("hbase.test.regions", regions);
786 getConf().setLong("hbase.test.namespace.span", namespaceSpan);
787 getConf().setLong("hbase.test.servers", servers);
788 getConf().set("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE));
789 getConf().setLong("hbase.test.multi.pause.when.done", multiPause);
790
791 getConf().setInt("hbase.test.multi.too.many", 10);
792 final int clients = 2;
793
794
795
796 final ExecutorService pool = Executors.newCachedThreadPool(Threads.getNamedThreadFactory("p"));
797
798
799 final Connection sharedConnection = ConnectionFactory.createConnection(getConf()
800 try {
801 Thread [] ts = new Thread[clients];
802 for (int j = 0; j < ts.length; j++) {
803 final int id = j;
804 ts[j] = new Thread("" + j) {
805 final Configuration c = getConf();
806
807 @Override
808 public void run() {
809 try {
810 cycle(id, c, sharedConnection);
811 } catch (IOException e) {
812 e.printStackTrace();
813 }
814 }
815 };
816 ts[j].start();
817 }
818 for (int j = 0; j < ts.length; j++) {
819 ts[j].join();
820 }
821 } finally {
822 sharedConnection.close();
823 }
824 return errCode;
825 }
826
827
828
829
830
831
832 public static void main(String[] args) throws Exception {
833 System.exit(ToolRunner.run(HBaseConfiguration.create(), new TestClientNoCluster(), args));
834 }
835 }