View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.trace;
20  
21  import org.apache.commons.cli.CommandLine;
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.apache.hadoop.conf.Configuration;
25  import org.apache.hadoop.hbase.HBaseConfiguration;
26  import org.apache.hadoop.hbase.IntegrationTestingUtility;
27  import org.apache.hadoop.hbase.IntegrationTests;
28  import org.apache.hadoop.hbase.client.Get;
29  import org.apache.hadoop.hbase.client.HBaseAdmin;
30  import org.apache.hadoop.hbase.client.HTable;
31  import org.apache.hadoop.hbase.client.Put;
32  import org.apache.hadoop.hbase.client.Result;
33  import org.apache.hadoop.hbase.client.ResultScanner;
34  import org.apache.hadoop.hbase.client.Scan;
35  import org.apache.hadoop.hbase.util.AbstractHBaseTool;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.apache.hadoop.util.ToolRunner;
38  import org.cloudera.htrace.Sampler;
39  import org.cloudera.htrace.Span;
40  import org.cloudera.htrace.Trace;
41  import org.cloudera.htrace.TraceScope;
42  import org.junit.Before;
43  import org.junit.Test;
44  import org.junit.experimental.categories.Category;
45  
46  import java.io.IOException;
47  import java.util.Random;
48  import java.util.concurrent.ExecutorService;
49  import java.util.concurrent.Executors;
50  import java.util.concurrent.LinkedBlockingQueue;
51  import java.util.concurrent.TimeUnit;
52  
53  @Category(IntegrationTests.class)
54  public class IntegrationTestSendTraceRequests extends AbstractHBaseTool {
55  
56    public static final String TABLE_ARG = "t";
57    public static final String CF_ARG = "f";
58  
59    public static final String TABLE_NAME_DEFAULT = "SendTracesTable";
60    public static final String COLUMN_FAMILY_DEFAULT = "D";
61    private String tableName = TABLE_NAME_DEFAULT;
62    private String familyName = COLUMN_FAMILY_DEFAULT;
63    private IntegrationTestingUtility util;
64    private Random random = new Random();
65    private HBaseAdmin admin;
66    private SpanReceiverHost receiverHost;
67  
68    public static void main(String[] args) throws Exception {
69      Configuration configuration = HBaseConfiguration.create();
70      IntegrationTestingUtility.setUseDistributedCluster(configuration);
71      IntegrationTestSendTraceRequests tool = new IntegrationTestSendTraceRequests();
72      ToolRunner.run(configuration, tool, args);
73    }
74  
75    @Override
76    protected void addOptions() {
77      addOptWithArg(TABLE_ARG, "The table name to target.  Will be created if not there already.");
78      addOptWithArg(CF_ARG, "The family to target");
79    }
80  
81    @Override
82    public void processOptions(CommandLine cmd) {
83      String tableNameString = cmd.getOptionValue(TABLE_ARG, TABLE_NAME_DEFAULT);
84      String familyString = cmd.getOptionValue(CF_ARG, COLUMN_FAMILY_DEFAULT);
85  
86      this.tableName = tableNameString;
87      this.familyName = familyString;
88    }
89  
90    @Override
91    public int doWork() throws Exception {
92      internalDoWork();
93      return 0;
94    }
95  
96    @Test
97    public void internalDoWork() throws Exception {
98      util = createUtil();
99      admin = util.getHBaseAdmin();
100     setupReceiver();
101 
102     deleteTable();
103     createTable();
104     LinkedBlockingQueue<Long> rks = insertData();
105 
106     ExecutorService service = Executors.newFixedThreadPool(20);
107     doScans(service, rks);
108     doGets(service, rks);
109 
110     service.shutdown();
111     service.awaitTermination(100, TimeUnit.SECONDS);
112     Thread.sleep(90000);
113     receiverHost.closeReceivers();
114     util.restoreCluster();
115     util = null;
116   }
117 
118   private void doScans(ExecutorService service, final LinkedBlockingQueue<Long> rks) {
119 
120       for (int i = 0; i < 100; i++) {
121         Runnable runnable = new Runnable() {
122           private TraceScope innerScope = null;
123           private final LinkedBlockingQueue<Long> rowKeyQueue = rks;
124           @Override
125           public void run() {
126             ResultScanner rs = null;
127             try {
128               innerScope = Trace.startSpan("Scan", Sampler.ALWAYS);
129               HTable ht = new HTable(util.getConfiguration(), tableName);
130               Scan s = new Scan();
131               s.setStartRow(Bytes.toBytes(rowKeyQueue.take()));
132               s.setBatch(7);
133               rs = ht.getScanner(s);
134               // Something to keep the jvm from removing the loop.
135               long accum = 0;
136 
137               for(int x = 0; x < 1000; x++) {
138                 Result r = rs.next();
139                 accum |= Bytes.toLong(r.getRow());
140               }
141 
142               innerScope.getSpan().addTimelineAnnotation("Accum result = " + accum);
143 
144               ht.close();
145               ht = null;
146             } catch (IOException e) {
147               e.printStackTrace();
148 
149               innerScope.getSpan().addKVAnnotation(
150                   Bytes.toBytes("exception"),
151                   Bytes.toBytes(e.getClass().getSimpleName()));
152 
153             } catch (Exception e) {
154             } finally {
155               if (innerScope != null) innerScope.close();
156               if (rs != null) rs.close();
157             }
158 
159           }
160         };
161         service.submit(runnable);
162       }
163 
164   }
165 
166   private void doGets(ExecutorService service, final LinkedBlockingQueue<Long> rowKeys)
167       throws IOException {
168     for (int i = 0; i < 100; i++) {
169       Runnable runnable = new Runnable() {
170         private TraceScope innerScope = null;
171         private final LinkedBlockingQueue<Long> rowKeyQueue = rowKeys;
172 
173         @Override
174         public void run() {
175 
176 
177           HTable ht = null;
178           try {
179             ht = new HTable(util.getConfiguration(), tableName);
180           } catch (IOException e) {
181             e.printStackTrace();
182           }
183 
184           long accum = 0;
185           for (int x = 0; x < 5; x++) {
186             try {
187               innerScope = Trace.startSpan("gets", Sampler.ALWAYS);
188               long rk = rowKeyQueue.take();
189               Result r1 = ht.get(new Get(Bytes.toBytes(rk)));
190               if (r1 != null) {
191                 accum |= Bytes.toLong(r1.getRow());
192               }
193               Result r2 = ht.get(new Get(Bytes.toBytes(rk)));
194               if (r2 != null) {
195                 accum |= Bytes.toLong(r2.getRow());
196               }
197               innerScope.getSpan().addTimelineAnnotation("Accum = " + accum);
198 
199             } catch (IOException e) {
200               // IGNORED
201             } catch (InterruptedException ie) {
202               // IGNORED
203             } finally {
204               if (innerScope != null) innerScope.close();
205             }
206           }
207 
208         }
209       };
210       service.submit(runnable);
211     }
212   }
213 
214   private void createTable() throws IOException {
215     TraceScope createScope = null;
216     try {
217       createScope = Trace.startSpan("createTable", Sampler.ALWAYS);
218       util.createTable(tableName, familyName);
219     } finally {
220       if (createScope != null) createScope.close();
221     }
222   }
223 
224   private void deleteTable() throws IOException {
225     TraceScope deleteScope = null;
226 
227     try {
228       if (admin.tableExists(tableName)) {
229         deleteScope = Trace.startSpan("deleteTable", Sampler.ALWAYS);
230         util.deleteTable(tableName);
231       }
232     } finally {
233       if (deleteScope != null) deleteScope.close();
234     }
235   }
236 
237   private LinkedBlockingQueue<Long> insertData() throws IOException, InterruptedException {
238     LinkedBlockingQueue<Long> rowKeys = new LinkedBlockingQueue<Long>(25000);
239     HTable ht = new HTable(util.getConfiguration(), this.tableName);
240     byte[] value = new byte[300];
241     for (int x = 0; x < 5000; x++) {
242       TraceScope traceScope = Trace.startSpan("insertData", Sampler.ALWAYS);
243       try {
244         ht.setAutoFlush(false, true);
245         for (int i = 0; i < 5; i++) {
246           long rk = random.nextLong();
247           rowKeys.add(rk);
248           Put p = new Put(Bytes.toBytes(rk));
249           for (int y = 0; y < 10; y++) {
250             random.nextBytes(value);
251             p.add(Bytes.toBytes(familyName),
252                 Bytes.toBytes(random.nextLong()),
253                 value);
254           }
255           ht.put(p);
256         }
257         if ((x % 1000) == 0) {
258           admin.flush(Bytes.toBytes(tableName));
259         }
260       } finally {
261         traceScope.close();
262       }
263     }
264     admin.flush(Bytes.toBytes(tableName));
265     return rowKeys;
266   }
267 
268   private IntegrationTestingUtility createUtil() throws Exception {
269     Configuration conf = getConf();
270     if (this.util == null) {
271       IntegrationTestingUtility u;
272       if (conf == null) {
273         u = new IntegrationTestingUtility();
274       } else {
275         u = new IntegrationTestingUtility(conf);
276       }
277       util = u;
278       util.initializeCluster(1);
279 
280     }
281     return this.util;
282   }
283 
284   private void setupReceiver() {
285     Configuration conf = new Configuration(util.getConfiguration());
286     conf.setBoolean("hbase.zipkin.is-in-client-mode", true);
287 
288     this.receiverHost = SpanReceiverHost.getInstance(conf);
289   }
290 }