1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertTrue;
24
25 import java.util.ArrayList;
26 import java.util.List;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.HBaseTestingUtility;
31 import org.apache.hadoop.hbase.testclassification.LargeTests;
32 import org.apache.hadoop.hbase.TableName;
33 import org.apache.hadoop.hbase.util.Bytes;
34 import org.junit.AfterClass;
35 import org.junit.BeforeClass;
36 import org.junit.Test;
37 import org.junit.experimental.categories.Category;
38
39 @Category(LargeTests.class)
40 public class TestHTableMultiplexer {
41 private static final Log LOG = LogFactory.getLog(TestHTableMultiplexer.class);
42 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
43 private static byte[] FAMILY = Bytes.toBytes("testFamily");
44 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
45 private static byte[] VALUE1 = Bytes.toBytes("testValue1");
46 private static byte[] VALUE2 = Bytes.toBytes("testValue2");
47 private static int SLAVES = 3;
48 private static int PER_REGIONSERVER_QUEUE_SIZE = 100000;
49
50
51
52
53 @BeforeClass
54 public static void setUpBeforeClass() throws Exception {
55 TEST_UTIL.startMiniCluster(SLAVES);
56 }
57
58
59
60
61 @AfterClass
62 public static void tearDownAfterClass() throws Exception {
63 TEST_UTIL.shutdownMiniCluster();
64 }
65
66 private static void checkExistence(HTable htable, byte[] row, byte[] family, byte[] quality)
67 throws Exception {
68
69 Result r;
70 Get get = new Get(row);
71 get.addColumn(FAMILY, QUALIFIER);
72 int nbTry = 0;
73 do {
74 assertTrue("Fail to get from " + htable.getName() + " after " + nbTry + " tries", nbTry < 50);
75 nbTry++;
76 Thread.sleep(100);
77 r = htable.get(get);
78 } while (r == null || r.getValue(FAMILY, QUALIFIER) == null);
79 assertEquals("value", Bytes.toStringBinary(VALUE1),
80 Bytes.toStringBinary(r.getValue(FAMILY, QUALIFIER)));
81 }
82
83 @Test
84 public void testHTableMultiplexer() throws Exception {
85 TableName TABLE_1 = TableName.valueOf("testHTableMultiplexer_1");
86 TableName TABLE_2 = TableName.valueOf("testHTableMultiplexer_2");
87 final int NUM_REGIONS = 10;
88 final int VERSION = 3;
89 List<Put> failedPuts;
90 boolean success;
91
92 HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(),
93 PER_REGIONSERVER_QUEUE_SIZE);
94
95 HTable htable1 =
96 TEST_UTIL.createTable(TABLE_1, new byte[][] { FAMILY }, VERSION,
97 Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS);
98 HTable htable2 =
99 TEST_UTIL.createTable(TABLE_2, new byte[][] { FAMILY }, VERSION, Bytes.toBytes("aaaaa"),
100 Bytes.toBytes("zzzzz"), NUM_REGIONS);
101 TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_1);
102 TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_2);
103
104 byte[][] startRows = htable1.getStartKeys();
105 byte[][] endRows = htable1.getEndKeys();
106
107
108 for (int i = 0; i < NUM_REGIONS; i++) {
109 byte [] row = startRows[i];
110 if (row == null || row.length <= 0) continue;
111 Put put = new Put(row).add(FAMILY, QUALIFIER, VALUE1);
112 success = multiplexer.put(TABLE_1, put);
113 assertTrue("multiplexer.put returns", success);
114
115 put = new Put(row).add(FAMILY, QUALIFIER, VALUE1);
116 success = multiplexer.put(TABLE_2, put);
117 assertTrue("multiplexer.put failed", success);
118
119 LOG.info("Put for " + Bytes.toStringBinary(startRows[i]) + " @ iteration " + (i + 1));
120
121
122 checkExistence(htable1, startRows[i], FAMILY, QUALIFIER);
123 checkExistence(htable2, startRows[i], FAMILY, QUALIFIER);
124 }
125
126
127 List<Put> multiput = new ArrayList<Put>();
128 for (int i = 0; i < NUM_REGIONS; i++) {
129 byte [] row = endRows[i];
130 if (row == null || row.length <= 0) continue;
131 Put put = new Put(row);
132 put.add(FAMILY, QUALIFIER, VALUE2);
133 multiput.add(put);
134 }
135 failedPuts = multiplexer.put(TABLE_1, multiput);
136 assertTrue(failedPuts == null);
137
138
139 for (int i = 0; i < NUM_REGIONS; i++) {
140 byte [] row = endRows[i];
141 if (row == null || row.length <= 0) continue;
142 Get get = new Get(row);
143 get.addColumn(FAMILY, QUALIFIER);
144 Result r;
145 int nbTry = 0;
146 do {
147 assertTrue(nbTry++ < 50);
148 Thread.sleep(100);
149 r = htable1.get(get);
150 } while (r == null || r.getValue(FAMILY, QUALIFIER) == null ||
151 Bytes.compareTo(VALUE2, r.getValue(FAMILY, QUALIFIER)) != 0);
152 }
153 }
154 }