1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.trace;
20
21 import org.apache.commons.cli.CommandLine;
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.hbase.HBaseConfiguration;
26 import org.apache.hadoop.hbase.IntegrationTestingUtility;
27 import org.apache.hadoop.hbase.IntegrationTests;
28 import org.apache.hadoop.hbase.client.Get;
29 import org.apache.hadoop.hbase.client.HBaseAdmin;
30 import org.apache.hadoop.hbase.client.HTable;
31 import org.apache.hadoop.hbase.client.Put;
32 import org.apache.hadoop.hbase.client.Result;
33 import org.apache.hadoop.hbase.client.ResultScanner;
34 import org.apache.hadoop.hbase.client.Scan;
35 import org.apache.hadoop.hbase.util.AbstractHBaseTool;
36 import org.apache.hadoop.hbase.util.Bytes;
37 import org.apache.hadoop.util.ToolRunner;
38 import org.cloudera.htrace.Sampler;
39 import org.cloudera.htrace.Span;
40 import org.cloudera.htrace.Trace;
41 import org.cloudera.htrace.TraceScope;
42 import org.junit.Before;
43 import org.junit.Test;
44 import org.junit.experimental.categories.Category;
45
46 import java.io.IOException;
47 import java.util.Random;
48 import java.util.concurrent.ExecutorService;
49 import java.util.concurrent.Executors;
50 import java.util.concurrent.LinkedBlockingQueue;
51 import java.util.concurrent.TimeUnit;
52
53 @Category(IntegrationTests.class)
54 public class IntegrationTestSendTraceRequests extends AbstractHBaseTool {
55
56 public static final String TABLE_ARG = "t";
57 public static final String CF_ARG = "f";
58
59 public static final String TABLE_NAME_DEFAULT = "SendTracesTable";
60 public static final String COLUMN_FAMILY_DEFAULT = "D";
61 private String tableName = TABLE_NAME_DEFAULT;
62 private String familyName = COLUMN_FAMILY_DEFAULT;
63 private IntegrationTestingUtility util;
64 private Random random = new Random();
65 private HBaseAdmin admin;
66 private SpanReceiverHost receiverHost;
67
68 public static void main(String[] args) throws Exception {
69 Configuration configuration = HBaseConfiguration.create();
70 IntegrationTestingUtility.setUseDistributedCluster(configuration);
71 IntegrationTestSendTraceRequests tool = new IntegrationTestSendTraceRequests();
72 ToolRunner.run(configuration, tool, args);
73 }
74
75 @Override
76 protected void addOptions() {
77 addOptWithArg(TABLE_ARG, "The table name to target. Will be created if not there already.");
78 addOptWithArg(CF_ARG, "The family to target");
79 }
80
81 @Override
82 public void processOptions(CommandLine cmd) {
83 String tableNameString = cmd.getOptionValue(TABLE_ARG, TABLE_NAME_DEFAULT);
84 String familyString = cmd.getOptionValue(CF_ARG, COLUMN_FAMILY_DEFAULT);
85
86 this.tableName = tableNameString;
87 this.familyName = familyString;
88 }
89
90 @Override
91 public int doWork() throws Exception {
92 internalDoWork();
93 return 0;
94 }
95
96 @Test
97 public void internalDoWork() throws Exception {
98 util = createUtil();
99 admin = util.getHBaseAdmin();
100 setupReceiver();
101
102 deleteTable();
103 createTable();
104 LinkedBlockingQueue<Long> rks = insertData();
105
106 ExecutorService service = Executors.newFixedThreadPool(20);
107 doScans(service, rks);
108 doGets(service, rks);
109
110 service.shutdown();
111 service.awaitTermination(100, TimeUnit.SECONDS);
112 Thread.sleep(90000);
113 receiverHost.closeReceivers();
114 util.restoreCluster();
115 util = null;
116 }
117
118 private void doScans(ExecutorService service, final LinkedBlockingQueue<Long> rks) {
119
120 for (int i = 0; i < 100; i++) {
121 Runnable runnable = new Runnable() {
122 private TraceScope innerScope = null;
123 private final LinkedBlockingQueue<Long> rowKeyQueue = rks;
124 @Override
125 public void run() {
126 ResultScanner rs = null;
127 try {
128 innerScope = Trace.startSpan("Scan", Sampler.ALWAYS);
129 HTable ht = new HTable(util.getConfiguration(), tableName);
130 Scan s = new Scan();
131 s.setStartRow(Bytes.toBytes(rowKeyQueue.take()));
132 s.setBatch(7);
133 rs = ht.getScanner(s);
134
135 long accum = 0;
136
137 for(int x = 0; x < 1000; x++) {
138 Result r = rs.next();
139 accum |= Bytes.toLong(r.getRow());
140 }
141
142 innerScope.getSpan().addTimelineAnnotation("Accum result = " + accum);
143
144 ht.close();
145 ht = null;
146 } catch (IOException e) {
147 e.printStackTrace();
148
149 innerScope.getSpan().addKVAnnotation(
150 Bytes.toBytes("exception"),
151 Bytes.toBytes(e.getClass().getSimpleName()));
152
153 } catch (Exception e) {
154 } finally {
155 if (innerScope != null) innerScope.close();
156 if (rs != null) rs.close();
157 }
158
159 }
160 };
161 service.submit(runnable);
162 }
163
164 }
165
166 private void doGets(ExecutorService service, final LinkedBlockingQueue<Long> rowKeys)
167 throws IOException {
168 for (int i = 0; i < 100; i++) {
169 Runnable runnable = new Runnable() {
170 private TraceScope innerScope = null;
171 private final LinkedBlockingQueue<Long> rowKeyQueue = rowKeys;
172
173 @Override
174 public void run() {
175
176
177 HTable ht = null;
178 try {
179 ht = new HTable(util.getConfiguration(), tableName);
180 } catch (IOException e) {
181 e.printStackTrace();
182 }
183
184 long accum = 0;
185 for (int x = 0; x < 5; x++) {
186 try {
187 innerScope = Trace.startSpan("gets", Sampler.ALWAYS);
188 long rk = rowKeyQueue.take();
189 Result r1 = ht.get(new Get(Bytes.toBytes(rk)));
190 if (r1 != null) {
191 accum |= Bytes.toLong(r1.getRow());
192 }
193 Result r2 = ht.get(new Get(Bytes.toBytes(rk)));
194 if (r2 != null) {
195 accum |= Bytes.toLong(r2.getRow());
196 }
197 innerScope.getSpan().addTimelineAnnotation("Accum = " + accum);
198
199 } catch (IOException e) {
200
201 } catch (InterruptedException ie) {
202
203 } finally {
204 if (innerScope != null) innerScope.close();
205 }
206 }
207
208 }
209 };
210 service.submit(runnable);
211 }
212 }
213
214 private void createTable() throws IOException {
215 TraceScope createScope = null;
216 try {
217 createScope = Trace.startSpan("createTable", Sampler.ALWAYS);
218 util.createTable(tableName, familyName);
219 } finally {
220 if (createScope != null) createScope.close();
221 }
222 }
223
224 private void deleteTable() throws IOException {
225 TraceScope deleteScope = null;
226
227 try {
228 if (admin.tableExists(tableName)) {
229 deleteScope = Trace.startSpan("deleteTable", Sampler.ALWAYS);
230 util.deleteTable(tableName);
231 }
232 } finally {
233 if (deleteScope != null) deleteScope.close();
234 }
235 }
236
237 private LinkedBlockingQueue<Long> insertData() throws IOException, InterruptedException {
238 LinkedBlockingQueue<Long> rowKeys = new LinkedBlockingQueue<Long>(25000);
239 HTable ht = new HTable(util.getConfiguration(), this.tableName);
240 byte[] value = new byte[300];
241 for (int x = 0; x < 5000; x++) {
242 TraceScope traceScope = Trace.startSpan("insertData", Sampler.ALWAYS);
243 try {
244 ht.setAutoFlush(false, true);
245 for (int i = 0; i < 5; i++) {
246 long rk = random.nextLong();
247 rowKeys.add(rk);
248 Put p = new Put(Bytes.toBytes(rk));
249 for (int y = 0; y < 10; y++) {
250 random.nextBytes(value);
251 p.add(Bytes.toBytes(familyName),
252 Bytes.toBytes(random.nextLong()),
253 value);
254 }
255 ht.put(p);
256 }
257 if ((x % 1000) == 0) {
258 admin.flush(Bytes.toBytes(tableName));
259 }
260 } finally {
261 traceScope.close();
262 }
263 }
264 admin.flush(Bytes.toBytes(tableName));
265 return rowKeys;
266 }
267
268 private IntegrationTestingUtility createUtil() throws Exception {
269 Configuration conf = getConf();
270 if (this.util == null) {
271 IntegrationTestingUtility u;
272 if (conf == null) {
273 u = new IntegrationTestingUtility();
274 } else {
275 u = new IntegrationTestingUtility(conf);
276 }
277 util = u;
278 util.initializeCluster(1);
279
280 }
281 return this.util;
282 }
283
284 private void setupReceiver() {
285 Configuration conf = new Configuration(util.getConfiguration());
286 conf.setBoolean("hbase.zipkin.is-in-client-mode", true);
287
288 this.receiverHost = SpanReceiverHost.getInstance(conf);
289 }
290 }