1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertTrue;
24
25 import java.util.ArrayList;
26 import java.util.List;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.TableName;
31 import org.apache.hadoop.hbase.HBaseTestingUtility;
32 import org.apache.hadoop.hbase.LargeTests;
33 import org.apache.hadoop.hbase.util.Bytes;
34 import org.junit.AfterClass;
35 import org.junit.BeforeClass;
36 import org.junit.Test;
37 import org.junit.experimental.categories.Category;
38
39 @Category(LargeTests.class)
40 public class TestHTableMultiplexer {
41 final Log LOG = LogFactory.getLog(getClass());
42 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
43 private static byte[] FAMILY = Bytes.toBytes("testFamily");
44 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
45 private static byte[] VALUE1 = Bytes.toBytes("testValue1");
46 private static byte[] VALUE2 = Bytes.toBytes("testValue2");
47 private static int SLAVES = 3;
48 private static int PER_REGIONSERVER_QUEUE_SIZE = 100000;
49
50
51
52
53 @BeforeClass
54 public static void setUpBeforeClass() throws Exception {
55 TEST_UTIL.startMiniCluster(SLAVES);
56 }
57
58
59
60
61 @AfterClass
62 public static void tearDownAfterClass() throws Exception {
63 TEST_UTIL.shutdownMiniCluster();
64 }
65
66 @Test
67 public void testHTableMultiplexer() throws Exception {
68 TableName TABLE =
69 TableName.valueOf("testHTableMultiplexer");
70 final int NUM_REGIONS = 10;
71 final int VERSION = 3;
72 List<Put> failedPuts;
73 boolean success;
74
75 HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(),
76 PER_REGIONSERVER_QUEUE_SIZE);
77
78 HTable ht = TEST_UTIL.createTable(TABLE, new byte[][] { FAMILY }, VERSION,
79 Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS);
80 TEST_UTIL.waitUntilAllRegionsAssigned(TABLE);
81
82 byte[][] startRows = ht.getStartKeys();
83 byte[][] endRows = ht.getEndKeys();
84
85
86 for (int i = 0; i < NUM_REGIONS; i++) {
87 byte [] row = startRows[i];
88 if (row == null || row.length <= 0) continue;
89 Put put = new Put(row);
90 put.add(FAMILY, QUALIFIER, VALUE1);
91 success = multiplexer.put(TABLE, put);
92 assertTrue(success);
93
94 LOG.info("Put for " + Bytes.toString(startRows[i]) + " @ iteration " + (i+1));
95
96
97 Get get = new Get(startRows[i]);
98 get.addColumn(FAMILY, QUALIFIER);
99 Result r;
100 int nbTry = 0;
101 do {
102 assertTrue(nbTry++ < 50);
103 Thread.sleep(100);
104 r = ht.get(get);
105 } while (r == null || r.getValue(FAMILY, QUALIFIER) == null);
106 assertEquals(0, Bytes.compareTo(VALUE1, r.getValue(FAMILY, QUALIFIER)));
107 }
108
109
110 List<Put> multiput = new ArrayList<Put>();
111 for (int i = 0; i < NUM_REGIONS; i++) {
112 byte [] row = endRows[i];
113 if (row == null || row.length <= 0) continue;
114 Put put = new Put(row);
115 put.add(FAMILY, QUALIFIER, VALUE2);
116 multiput.add(put);
117 }
118 failedPuts = multiplexer.put(TABLE, multiput);
119 assertTrue(failedPuts == null);
120
121
122 for (int i = 0; i < NUM_REGIONS; i++) {
123 byte [] row = endRows[i];
124 if (row == null || row.length <= 0) continue;
125 Get get = new Get(row);
126 get.addColumn(FAMILY, QUALIFIER);
127 Result r;
128 int nbTry = 0;
129 do {
130 assertTrue(nbTry++ < 50);
131 Thread.sleep(100);
132 r = ht.get(get);
133 } while (r == null || r.getValue(FAMILY, QUALIFIER) == null ||
134 Bytes.compareTo(VALUE2, r.getValue(FAMILY, QUALIFIER)) != 0);
135 }
136 }
137 }