1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
21 import java.io.IOException;
22 import java.util.PriorityQueue;
23 import java.util.Queue;
24 import java.util.Set;
25 import java.util.concurrent.ArrayBlockingQueue;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.ConcurrentSkipListSet;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicLong;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.hbase.HRegionLocation;
35 import org.apache.hadoop.hbase.TableName;
36 import org.apache.hadoop.hbase.client.HTable;
37 import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
38
39
40 public abstract class MultiThreadedWriterBase extends MultiThreadedAction {
41 private static final Log LOG = LogFactory.getLog(MultiThreadedWriterBase.class);
42
43
44
45
46
47
48
49 protected BlockingQueue<Long> wroteKeys = new ArrayBlockingQueue<Long>(10000);
50
51
52
53
54
55 protected AtomicLong nextKeyToWrite = new AtomicLong();
56
57
58
59
60 protected AtomicLong wroteUpToKey = new AtomicLong();
61
62
63 protected Set<Long> failedKeySet = new ConcurrentSkipListSet<Long>();
64
65
66
67
68
69
70 protected AtomicLong wroteKeyQueueSize = new AtomicLong();
71
72
73 protected boolean trackWroteKeys;
74
75 public MultiThreadedWriterBase(LoadTestDataGenerator dataGen, Configuration conf,
76 TableName tableName, String actionLetter) {
77 super(dataGen, conf, tableName, actionLetter);
78 }
79
80 @Override
81 public void start(long startKey, long endKey, int numThreads) throws IOException {
82 super.start(startKey, endKey, numThreads);
83 nextKeyToWrite.set(startKey);
84 wroteUpToKey.set(startKey - 1);
85
86 if (trackWroteKeys) {
87 new Thread(new WroteKeysTracker()).start();
88 numThreadsWorking.incrementAndGet();
89 }
90 }
91
92 protected String getRegionDebugInfoSafe(HTable table, byte[] rowKey) {
93 HRegionLocation cached = null, real = null;
94 try {
95 cached = table.getRegionLocation(rowKey, false);
96 real = table.getRegionLocation(rowKey, true);
97 } catch (Throwable t) {
98
99 }
100 String result = "no information can be obtained";
101 if (cached != null) {
102 result = "cached: " + cached.toString();
103 }
104 if (real != null) {
105 if (real.equals(cached)) {
106 result += "; cache is up to date";
107 } else {
108 result = (cached != null) ? (result + "; ") : "";
109 result += "real: " + real.toString();
110 }
111 }
112 return result;
113 }
114
115
116
117
118
119 private class WroteKeysTracker implements Runnable {
120
121 @Override
122 public void run() {
123 Thread.currentThread().setName(getClass().getSimpleName());
124 try {
125 long expectedKey = startKey;
126 Queue<Long> sortedKeys = new PriorityQueue<Long>();
127 while (expectedKey < endKey) {
128
129 Long k;
130 try {
131 k = wroteKeys.poll(1, TimeUnit.SECONDS);
132 } catch (InterruptedException e) {
133 LOG.info("Inserted key tracker thread interrupted", e);
134 break;
135 }
136 if (k == null) {
137 continue;
138 }
139 if (k == expectedKey) {
140
141 wroteUpToKey.set(k);
142 ++expectedKey;
143 } else {
144 sortedKeys.add(k);
145 }
146
147
148 while (!sortedKeys.isEmpty()
149 && ((k = sortedKeys.peek()) == expectedKey)) {
150 sortedKeys.poll();
151 wroteUpToKey.set(k);
152 ++expectedKey;
153 }
154
155 wroteKeyQueueSize.set(wroteKeys.size() + sortedKeys.size());
156 }
157 } catch (Exception ex) {
158 LOG.error("Error in inserted/updaed key tracker", ex);
159 } finally {
160 numThreadsWorking.decrementAndGet();
161 }
162 }
163 }
164
165 public int getNumWriteFailures() {
166 return failedKeySet.size();
167 }
168
169
170
171
172
173 public long wroteUpToKey() {
174 return wroteUpToKey.get();
175 }
176
177 public boolean failedToWriteKey(long k) {
178 return failedKeySet.contains(k);
179 }
180
181 @Override
182 protected String progressInfo() {
183 StringBuilder sb = new StringBuilder();
184 appendToStatus(sb, "wroteUpTo", wroteUpToKey.get());
185 appendToStatus(sb, "wroteQSize", wroteKeyQueueSize.get());
186 return sb.toString();
187 }
188
189
190
191
192
193
194 public void setTrackWroteKeys(boolean enable) {
195 trackWroteKeys = enable;
196 }
197 }