1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22 import java.io.IOException;
23 import java.util.AbstractMap.SimpleEntry;
24 import java.util.ArrayList;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Set;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ConcurrentSkipListMap;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.atomic.AtomicLong;
35
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38 import org.apache.hadoop.classification.InterfaceAudience;
39 import org.apache.hadoop.classification.InterfaceStability;
40 import org.apache.hadoop.conf.Configuration;
41 import org.apache.hadoop.hbase.HConstants;
42 import org.apache.hadoop.hbase.HRegionInfo;
43 import org.apache.hadoop.hbase.HRegionLocation;
44 import org.apache.hadoop.hbase.TableName;
45 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
46 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62 @InterfaceAudience.Public
63 @InterfaceStability.Evolving
64 public class HTableMultiplexer {
65 private static final Log LOG = LogFactory.getLog(HTableMultiplexer.class.getName());
66 private static int poolID = 0;
67
68 static final String TABLE_MULTIPLEXER_FLUSH_FREQ_MS = "hbase.tablemultiplexer.flush.frequency.ms";
69
70 private Map<TableName, HTable> tableNameToHTableMap;
71
72
73 private Map<HRegionLocation, LinkedBlockingQueue<PutStatus>>
74 serverToBufferQueueMap;
75
76
77 private Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap;
78
79 private Configuration conf;
80 private int retryNum;
81 private int perRegionServerBufferQueueSize;
82
83
84
85
86
87
88
89 public HTableMultiplexer(Configuration conf,
90 int perRegionServerBufferQueueSize) throws ZooKeeperConnectionException {
91 this.conf = conf;
92 this.serverToBufferQueueMap = new ConcurrentHashMap<HRegionLocation,
93 LinkedBlockingQueue<PutStatus>>();
94 this.serverToFlushWorkerMap = new ConcurrentHashMap<HRegionLocation, HTableFlushWorker>();
95 this.tableNameToHTableMap = new ConcurrentSkipListMap<TableName, HTable>();
96 this.retryNum = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
97 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
98 this.perRegionServerBufferQueueSize = perRegionServerBufferQueueSize;
99 }
100
101
102
103
104
105
106
107
108
109 public boolean put(TableName tableName, final Put put) throws IOException {
110 return put(tableName, put, this.retryNum);
111 }
112
113 public boolean put(byte[] tableName, final Put put) throws IOException {
114 return put(TableName.valueOf(tableName), put);
115 }
116
117
118
119
120
121
122
123
124
125 public List<Put> put(TableName tableName, final List<Put> puts)
126 throws IOException {
127 if (puts == null)
128 return null;
129
130 List <Put> failedPuts = null;
131 boolean result;
132 for (Put put : puts) {
133 result = put(tableName, put, this.retryNum);
134 if (result == false) {
135
136
137 if (failedPuts == null) {
138 failedPuts = new ArrayList<Put>();
139 }
140
141 failedPuts.add(put);
142 }
143 }
144 return failedPuts;
145 }
146
147 public List<Put> put(byte[] tableName, final List<Put> puts) throws IOException {
148 return put(TableName.valueOf(tableName), puts);
149 }
150
151
152
153
154
155
156
157
158
159
160
161
162 public boolean put(final TableName tableName, final Put put, int retry)
163 throws IOException {
164 if (retry <= 0) {
165 return false;
166 }
167
168 LinkedBlockingQueue<PutStatus> queue;
169 HTable htable = getHTable(tableName);
170 try {
171 htable.validatePut(put);
172 HRegionLocation loc = htable.getRegionLocation(put.getRow(), false);
173 if (loc != null) {
174
175 queue = addNewRegionServer(loc, htable);
176
177 PutStatus s = new PutStatus(loc.getRegionInfo(), put, retry);
178
179 return queue.offer(s);
180 }
181 } catch (Exception e) {
182 LOG.debug("Cannot process the put " + put + " because of " + e);
183 }
184 return false;
185 }
186
187 public boolean put(final byte[] tableName, final Put put, int retry)
188 throws IOException {
189 return put(TableName.valueOf(tableName), put, retry);
190 }
191
192
193
194
195 public HTableMultiplexerStatus getHTableMultiplexerStatus() {
196 return new HTableMultiplexerStatus(serverToFlushWorkerMap);
197 }
198
199
200 private HTable getHTable(TableName tableName) throws IOException {
201 HTable htable = this.tableNameToHTableMap.get(tableName);
202 if (htable == null) {
203 synchronized (this.tableNameToHTableMap) {
204 htable = this.tableNameToHTableMap.get(tableName);
205 if (htable == null) {
206 htable = new HTable(conf, tableName);
207 this.tableNameToHTableMap.put(tableName, htable);
208 }
209 }
210 }
211 return htable;
212 }
213
214 private synchronized LinkedBlockingQueue<PutStatus> addNewRegionServer(
215 HRegionLocation addr, HTable htable) {
216 LinkedBlockingQueue<PutStatus> queue =
217 serverToBufferQueueMap.get(addr);
218 if (queue == null) {
219
220 queue = new LinkedBlockingQueue<PutStatus>(perRegionServerBufferQueueSize);
221 serverToBufferQueueMap.put(addr, queue);
222
223
224 HTableFlushWorker worker = new HTableFlushWorker(conf, addr,
225 this, queue, htable);
226 this.serverToFlushWorkerMap.put(addr, worker);
227
228
229
230 String name = "HTableFlushWorker-" + addr.getHostnamePort() + "-"
231 + (poolID++);
232 Thread t = new Thread(worker, name);
233 t.setDaemon(true);
234 t.start();
235 }
236 return queue;
237 }
238
239
240
241
242
243
244 static class HTableMultiplexerStatus {
245 private long totalFailedPutCounter;
246 private long totalBufferedPutCounter;
247 private long maxLatency;
248 private long overallAverageLatency;
249 private Map<String, Long> serverToFailedCounterMap;
250 private Map<String, Long> serverToBufferedCounterMap;
251 private Map<String, Long> serverToAverageLatencyMap;
252 private Map<String, Long> serverToMaxLatencyMap;
253
254 public HTableMultiplexerStatus(
255 Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap) {
256 this.totalBufferedPutCounter = 0;
257 this.totalFailedPutCounter = 0;
258 this.maxLatency = 0;
259 this.overallAverageLatency = 0;
260 this.serverToBufferedCounterMap = new HashMap<String, Long>();
261 this.serverToFailedCounterMap = new HashMap<String, Long>();
262 this.serverToAverageLatencyMap = new HashMap<String, Long>();
263 this.serverToMaxLatencyMap = new HashMap<String, Long>();
264 this.initialize(serverToFlushWorkerMap);
265 }
266
267 private void initialize(
268 Map<HRegionLocation, HTableFlushWorker> serverToFlushWorkerMap) {
269 if (serverToFlushWorkerMap == null) {
270 return;
271 }
272
273 long averageCalcSum = 0;
274 int averageCalcCount = 0;
275 for (Map.Entry<HRegionLocation, HTableFlushWorker> entry : serverToFlushWorkerMap
276 .entrySet()) {
277 HRegionLocation addr = entry.getKey();
278 HTableFlushWorker worker = entry.getValue();
279
280 long bufferedCounter = worker.getTotalBufferedCount();
281 long failedCounter = worker.getTotalFailedCount();
282 long serverMaxLatency = worker.getMaxLatency();
283 AtomicAverageCounter averageCounter = worker.getAverageLatencyCounter();
284
285 SimpleEntry<Long, Integer> averageComponents = averageCounter
286 .getComponents();
287 long serverAvgLatency = averageCounter.getAndReset();
288
289 this.totalBufferedPutCounter += bufferedCounter;
290 this.totalFailedPutCounter += failedCounter;
291 if (serverMaxLatency > this.maxLatency) {
292 this.maxLatency = serverMaxLatency;
293 }
294 averageCalcSum += averageComponents.getKey();
295 averageCalcCount += averageComponents.getValue();
296
297 this.serverToBufferedCounterMap.put(addr.getHostnamePort(),
298 bufferedCounter);
299 this.serverToFailedCounterMap
300 .put(addr.getHostnamePort(),
301 failedCounter);
302 this.serverToAverageLatencyMap.put(addr.getHostnamePort(),
303 serverAvgLatency);
304 this.serverToMaxLatencyMap
305 .put(addr.getHostnamePort(),
306 serverMaxLatency);
307 }
308 this.overallAverageLatency = averageCalcCount != 0 ? averageCalcSum
309 / averageCalcCount : 0;
310 }
311
312 public long getTotalBufferedCounter() {
313 return this.totalBufferedPutCounter;
314 }
315
316 public long getTotalFailedCounter() {
317 return this.totalFailedPutCounter;
318 }
319
320 public long getMaxLatency() {
321 return this.maxLatency;
322 }
323
324 public long getOverallAverageLatency() {
325 return this.overallAverageLatency;
326 }
327
328 public Map<String, Long> getBufferedCounterForEachRegionServer() {
329 return this.serverToBufferedCounterMap;
330 }
331
332 public Map<String, Long> getFailedCounterForEachRegionServer() {
333 return this.serverToFailedCounterMap;
334 }
335
336 public Map<String, Long> getMaxLatencyForEachRegionServer() {
337 return this.serverToMaxLatencyMap;
338 }
339
340 public Map<String, Long> getAverageLatencyForEachRegionServer() {
341 return this.serverToAverageLatencyMap;
342 }
343 }
344
345 private static class PutStatus {
346 private final HRegionInfo regionInfo;
347 private final Put put;
348 private final int retryCount;
349 public PutStatus(final HRegionInfo regionInfo, final Put put,
350 final int retryCount) {
351 this.regionInfo = regionInfo;
352 this.put = put;
353 this.retryCount = retryCount;
354 }
355
356 public HRegionInfo getRegionInfo() {
357 return regionInfo;
358 }
359 public Put getPut() {
360 return put;
361 }
362 public int getRetryCount() {
363 return retryCount;
364 }
365 }
366
367
368
369
370 private static class AtomicAverageCounter {
371 private long sum;
372 private int count;
373
374 public AtomicAverageCounter() {
375 this.sum = 0L;
376 this.count = 0;
377 }
378
379 public synchronized long getAndReset() {
380 long result = this.get();
381 this.reset();
382 return result;
383 }
384
385 public synchronized long get() {
386 if (this.count == 0) {
387 return 0;
388 }
389 return this.sum / this.count;
390 }
391
392 public synchronized SimpleEntry<Long, Integer> getComponents() {
393 return new SimpleEntry<Long, Integer>(sum, count);
394 }
395
396 public synchronized void reset() {
397 this.sum = 0l;
398 this.count = 0;
399 }
400
401 public synchronized void add(long value) {
402 this.sum += value;
403 this.count++;
404 }
405 }
406
407 private static class HTableFlushWorker implements Runnable {
408 private HRegionLocation addr;
409 private Configuration conf;
410 private LinkedBlockingQueue<PutStatus> queue;
411 private HTableMultiplexer htableMultiplexer;
412 private AtomicLong totalFailedPutCount;
413 private AtomicInteger currentProcessingPutCount;
414 private AtomicAverageCounter averageLatency;
415 private AtomicLong maxLatency;
416 private HTable htable;
417
418 public HTableFlushWorker(Configuration conf, HRegionLocation addr,
419 HTableMultiplexer htableMultiplexer,
420 LinkedBlockingQueue<PutStatus> queue, HTable htable) {
421 this.addr = addr;
422 this.conf = conf;
423 this.htableMultiplexer = htableMultiplexer;
424 this.queue = queue;
425 this.totalFailedPutCount = new AtomicLong(0);
426 this.currentProcessingPutCount = new AtomicInteger(0);
427 this.averageLatency = new AtomicAverageCounter();
428 this.maxLatency = new AtomicLong(0);
429 this.htable = htable;
430 }
431
432 public long getTotalFailedCount() {
433 return totalFailedPutCount.get();
434 }
435
436 public long getTotalBufferedCount() {
437 return queue.size() + currentProcessingPutCount.get();
438 }
439
440 public AtomicAverageCounter getAverageLatencyCounter() {
441 return this.averageLatency;
442 }
443
444 public long getMaxLatency() {
445 return this.maxLatency.getAndSet(0);
446 }
447
448 private boolean resubmitFailedPut(PutStatus failedPutStatus,
449 HRegionLocation oldLoc) throws IOException {
450 Put failedPut = failedPutStatus.getPut();
451
452 TableName tableName = failedPutStatus.getRegionInfo().getTable();
453
454 int retryCount = failedPutStatus.getRetryCount() - 1;
455
456 if (retryCount <= 0) {
457
458 return false;
459 } else {
460
461 return this.htableMultiplexer.put(tableName, failedPut, retryCount);
462 }
463 }
464
465 @Override
466 @edu.umd.cs.findbugs.annotations.SuppressWarnings
467 (value = "REC_CATCH_EXCEPTION", justification = "na")
468 public void run() {
469 List<PutStatus> processingList = new ArrayList<PutStatus>();
470
471
472
473
474 long frequency = conf.getLong(TABLE_MULTIPLEXER_FLUSH_FREQ_MS, 100);
475
476
477 try {
478 Thread.sleep(frequency);
479 } catch (InterruptedException e) {
480 }
481
482 long start, elapsed;
483 int failedCount = 0;
484 while (true) {
485 try {
486 start = elapsed = EnvironmentEdgeManager.currentTimeMillis();
487
488
489 processingList.clear();
490 failedCount = 0;
491
492
493 queue.drainTo(processingList);
494 currentProcessingPutCount.set(processingList.size());
495
496 if (processingList.size() > 0) {
497 ArrayList<Put> list = new ArrayList<Put>(processingList.size());
498 for (PutStatus putStatus: processingList) {
499 list.add(putStatus.getPut());
500 }
501
502
503 List<Put> failed = null;
504 Object[] results = new Object[list.size()];
505 try {
506 htable.batch(list, results);
507 } catch (IOException e) {
508 LOG.debug("Caught some exceptions " + e
509 + " when flushing puts to region server " + addr.getHostnamePort());
510 } finally {
511
512
513
514
515
516 for (int i = results.length - 1; i >= 0; i--) {
517 if (results[i] instanceof Result) {
518
519 list.remove(i);
520 }
521 }
522 failed = list;
523 }
524
525 if (failed != null) {
526 if (failed.size() == processingList.size()) {
527
528 for (PutStatus putStatus: processingList) {
529 if (!resubmitFailedPut(putStatus, this.addr)) {
530 failedCount++;
531 }
532 }
533 } else {
534 Set<Put> failedPutSet = new HashSet<Put>(failed);
535 for (PutStatus putStatus: processingList) {
536 if (failedPutSet.contains(putStatus.getPut())
537 && !resubmitFailedPut(putStatus, this.addr)) {
538 failedCount++;
539 }
540 }
541 }
542 }
543
544 this.totalFailedPutCount.addAndGet(failedCount);
545
546 elapsed = EnvironmentEdgeManager.currentTimeMillis() - start;
547
548 averageLatency.add(elapsed);
549 if (elapsed > maxLatency.get()) {
550 maxLatency.set(elapsed);
551 }
552
553
554 if (LOG.isDebugEnabled()) {
555 LOG.debug("Processed " + currentProcessingPutCount
556 + " put requests for " + addr.getHostnamePort() + " and "
557 + failedCount + " failed" + ", latency for this send: "
558 + elapsed);
559 }
560
561
562 currentProcessingPutCount.set(0);
563 }
564
565
566 if (elapsed == start) {
567 elapsed = EnvironmentEdgeManager.currentTimeMillis() - start;
568 }
569 if (elapsed < frequency) {
570 Thread.sleep(frequency - elapsed);
571 }
572 } catch (Exception e) {
573
574 LOG.debug("Caught some exceptions " + e
575 + " when flushing puts to region server "
576 + addr.getHostnamePort());
577 }
578 }
579 }
580 }
581 }