1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
21 import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
22 import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;
23
24 import java.io.IOException;
25 import java.io.PrintWriter;
26 import java.io.StringWriter;
27 import java.util.Arrays;
28 import java.util.HashSet;
29 import java.util.Set;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.hbase.HConstants;
35 import org.apache.hadoop.hbase.TableName;
36 import org.apache.hadoop.hbase.client.HTableInterface;
37 import org.apache.hadoop.hbase.client.Put;
38 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
39 import org.apache.hadoop.hbase.client.Table;
40 import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
41 import org.apache.hadoop.util.StringUtils;
42
43
44 public class MultiThreadedWriter extends MultiThreadedWriterBase {
45 private static final Log LOG = LogFactory.getLog(MultiThreadedWriter.class);
46
47 protected Set<HBaseWriterThread> writers = new HashSet<HBaseWriterThread>();
48
49 protected boolean isMultiPut = false;
50
51 public MultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf,
52 TableName tableName) throws IOException {
53 super(dataGen, conf, tableName, "W");
54 }
55
56
57 public void setMultiPut(boolean isMultiPut) {
58 this.isMultiPut = isMultiPut;
59 }
60
61 @Override
62 public void start(long startKey, long endKey, int numThreads) throws IOException {
63 super.start(startKey, endKey, numThreads);
64
65 if (verbose) {
66 LOG.debug("Inserting keys [" + startKey + ", " + endKey + ")");
67 }
68
69 createWriterThreads(numThreads);
70
71 startThreads(writers);
72 }
73
74 protected void createWriterThreads(int numThreads) throws IOException {
75 for (int i = 0; i < numThreads; ++i) {
76 HBaseWriterThread writer = new HBaseWriterThread(i);
77 Threads.setLoggingUncaughtExceptionHandler(writer);
78 writers.add(writer);
79 }
80 }
81
82 public class HBaseWriterThread extends Thread {
83 private final Table table;
84
85 public HBaseWriterThread(int writerId) throws IOException {
86 setName(getClass().getSimpleName() + "_" + writerId);
87 table = createTable();
88 }
89
90 protected HTableInterface createTable() throws IOException {
91 return connection.getTable(tableName);
92 }
93
94 @Override
95 public void run() {
96 try {
97 long rowKeyBase;
98 byte[][] columnFamilies = dataGenerator.getColumnFamilies();
99 while ((rowKeyBase = nextKeyToWrite.getAndIncrement()) < endKey) {
100 byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase);
101 Put put = new Put(rowKey);
102 numKeys.addAndGet(1);
103 int columnCount = 0;
104 for (byte[] cf : columnFamilies) {
105 byte[][] columns = dataGenerator.generateColumnsForCf(rowKey, cf);
106 for (byte[] column : columns) {
107 byte[] value = dataGenerator.generateValue(rowKey, cf, column);
108 put.add(cf, column, value);
109 ++columnCount;
110 if (!isMultiPut) {
111 insert(table, put, rowKeyBase);
112 numCols.addAndGet(1);
113 put = new Put(rowKey);
114 }
115 }
116 long rowKeyHash = Arrays.hashCode(rowKey);
117 put.add(cf, MUTATE_INFO, HConstants.EMPTY_BYTE_ARRAY);
118 put.add(cf, INCREMENT, Bytes.toBytes(rowKeyHash));
119 if (!isMultiPut) {
120 insert(table, put, rowKeyBase);
121 numCols.addAndGet(1);
122 put = new Put(rowKey);
123 }
124 }
125 if (isMultiPut) {
126 if (verbose) {
127 LOG.debug("Preparing put for key = [" + rowKey + "], " + columnCount + " columns");
128 }
129 insert(table, put, rowKeyBase);
130 numCols.addAndGet(columnCount);
131 }
132 if (trackWroteKeys) {
133 wroteKeys.add(rowKeyBase);
134 }
135 }
136 } finally {
137 closeHTable();
138 numThreadsWorking.decrementAndGet();
139 }
140 }
141
142 public void insert(Table table, Put put, long keyBase) {
143 long start = System.currentTimeMillis();
144 try {
145 put = (Put) dataGenerator.beforeMutate(keyBase, put);
146 table.put(put);
147 totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
148 } catch (IOException e) {
149 failedKeySet.add(keyBase);
150 String exceptionInfo;
151 if (e instanceof RetriesExhaustedWithDetailsException) {
152 RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e;
153 exceptionInfo = aggEx.getExhaustiveDescription();
154 } else {
155 StringWriter stackWriter = new StringWriter();
156 PrintWriter pw = new PrintWriter(stackWriter);
157 e.printStackTrace(pw);
158 pw.flush();
159 exceptionInfo = StringUtils.stringifyException(e);
160 }
161 LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start)
162 + "ms; region information: " + getRegionDebugInfoSafe(table, put.getRow())
163 + "; errors: " + exceptionInfo);
164 }
165 }
166 protected void closeHTable() {
167 try {
168 if (table != null) {
169 table.close();
170 }
171 } catch (IOException e) {
172 LOG.error("Error closing table", e);
173 }
174 }
175 }
176
177 @Override
178 public void waitForFinish() {
179 super.waitForFinish();
180 System.out.println("Failed to write keys: " + failedKeySet.size());
181 for (Long key : failedKeySet) {
182 System.out.println("Failed to write key: " + key);
183 }
184 }
185 }