1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertTrue;
23
24 import java.io.IOException;
25 import java.util.Random;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.Cell;
30 import org.apache.hadoop.hbase.CellUtil;
31 import org.apache.hadoop.hbase.HBaseTestingUtility;
32 import org.apache.hadoop.hbase.HColumnDescriptor;
33 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
34 import org.apache.hadoop.hbase.HRegionInfo;
35 import org.apache.hadoop.hbase.HTableDescriptor;
36 import org.apache.hadoop.hbase.MediumTests;
37 import org.apache.hadoop.hbase.TableName;
38 import org.apache.hadoop.hbase.client.Get;
39 import org.apache.hadoop.hbase.client.Put;
40 import org.apache.hadoop.hbase.client.Result;
41 import org.apache.hadoop.hbase.util.Bytes;
42 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
43 import org.junit.After;
44 import org.junit.Before;
45 import org.junit.Rule;
46 import org.junit.Test;
47 import org.junit.experimental.categories.Category;
48 import org.junit.rules.TestName;
49
50
51
52
53
54
55 @Category(MediumTests.class)
56 public class TestParallelPut {
57 static final Log LOG = LogFactory.getLog(TestParallelPut.class);
58 @Rule public TestName name = new TestName();
59
60 private static HRegion region = null;
61 private static HBaseTestingUtility hbtu = new HBaseTestingUtility();
62
63
64 static byte[] tableName;
65 static final byte[] qual1 = Bytes.toBytes("qual1");
66 static final byte[] qual2 = Bytes.toBytes("qual2");
67 static final byte[] qual3 = Bytes.toBytes("qual3");
68 static final byte[] value1 = Bytes.toBytes("value1");
69 static final byte[] value2 = Bytes.toBytes("value2");
70 static final byte [] row = Bytes.toBytes("rowA");
71 static final byte [] row2 = Bytes.toBytes("rowB");
72
73
74
75
76 @Before
77 public void setUp() throws Exception {
78 tableName = Bytes.toBytes(name.getMethodName());
79 }
80
81 @After
82 public void tearDown() throws Exception {
83 EnvironmentEdgeManagerTestHelper.reset();
84 }
85
86 public String getName() {
87 return name.getMethodName();
88 }
89
90
91
92
93
94
95
96
97
98 @Test
99 public void testPut() throws IOException {
100 LOG.info("Starting testPut");
101 initHRegion(tableName, getName(), fam1);
102
103 long value = 1L;
104
105 Put put = new Put(row);
106 put.add(fam1, qual1, Bytes.toBytes(value));
107 region.put(put);
108
109 assertGet(row, fam1, qual1, Bytes.toBytes(value));
110 }
111
112
113
114
115 @Test
116 public void testParallelPuts() throws IOException {
117
118 LOG.info("Starting testParallelPuts");
119 initHRegion(tableName, getName(), fam1);
120 int numOps = 1000;
121
122
123 int numThreads = 100;
124 Putter[] all = new Putter[numThreads];
125
126
127 for (int i = 0; i < numThreads; i++) {
128 all[i] = new Putter(region, i, numOps);
129 }
130
131
132 for (int i = 0; i < numThreads; i++) {
133 all[i].start();
134 }
135
136
137 for (int i = 0; i < numThreads; i++) {
138 try {
139 all[i].join();
140 } catch (InterruptedException e) {
141 LOG.warn("testParallelPuts encountered InterruptedException." +
142 " Ignoring....", e);
143 }
144 }
145 LOG.info("testParallelPuts successfully verified " +
146 (numOps * numThreads) + " put operations.");
147 }
148
149
150 static private void assertGet(byte [] row,
151 byte [] familiy,
152 byte[] qualifier,
153 byte[] value) throws IOException {
154
155 Get get = new Get(row);
156 get.addColumn(familiy, qualifier);
157 Result result = region.get(get);
158 assertEquals(1, result.size());
159
160 Cell kv = result.rawCells()[0];
161 byte[] r = CellUtil.cloneValue(kv);
162 assertTrue(Bytes.compareTo(r, value) == 0);
163 }
164
165 private void initHRegion(byte [] tableName, String callingMethod,
166 byte[] ... families)
167 throws IOException {
168 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
169 for(byte [] family : families) {
170 htd.addFamily(new HColumnDescriptor(family));
171 }
172 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
173 region = hbtu.createLocalHRegion(info, htd);
174 }
175
176
177
178
179 public static class Putter extends Thread {
180
181 private final HRegion region;
182 private final int threadNumber;
183 private final int numOps;
184 private final Random rand = new Random();
185 byte [] rowkey = null;
186
187 public Putter(HRegion region, int threadNumber, int numOps) {
188 this.region = region;
189 this.threadNumber = threadNumber;
190 this.numOps = numOps;
191 this.rowkey = Bytes.toBytes((long)threadNumber);
192 setDaemon(true);
193 }
194
195 @Override
196 public void run() {
197 byte[] value = new byte[100];
198 Put[] in = new Put[1];
199
200
201 for (int i=0; i<numOps; i++) {
202
203 rand.nextBytes(value);
204
205
206
207 Put put = new Put(rowkey);
208 put.add(fam1, qual1, value);
209 in[0] = put;
210 try {
211 OperationStatus[] ret = region.batchMutate(in);
212 assertEquals(1, ret.length);
213 assertEquals(OperationStatusCode.SUCCESS, ret[0].getOperationStatusCode());
214 assertGet(rowkey, fam1, qual1, value);
215 } catch (IOException e) {
216 assertTrue("Thread id " + threadNumber + " operation " + i + " failed.",
217 false);
218 }
219 }
220 }
221 }
222
223 }
224