1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.io.PrintWriter;
23 import java.io.StringWriter;
24 import java.util.ArrayList;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.PriorityBlockingQueue;
30 import java.util.concurrent.RejectedExecutionException;
31 import java.util.concurrent.RejectedExecutionHandler;
32 import java.util.concurrent.ThreadFactory;
33 import java.util.concurrent.ThreadPoolExecutor;
34 import java.util.concurrent.TimeUnit;
35
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38 import org.apache.hadoop.classification.InterfaceAudience;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.hbase.RemoteExceptionHandler;
41 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
42 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
43 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
44 import org.apache.hadoop.hbase.util.Pair;
45 import org.apache.hadoop.util.StringUtils;
46
47 import com.google.common.base.Preconditions;
48
49
50
51
52 @InterfaceAudience.Private
53 public class CompactSplitThread implements CompactionRequestor {
54 static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
55
56 private final HRegionServer server;
57 private final Configuration conf;
58
59 private final ThreadPoolExecutor largeCompactions;
60 private final ThreadPoolExecutor smallCompactions;
61 private final ThreadPoolExecutor splits;
62 private final ThreadPoolExecutor mergePool;
63
64
65
66
67
68
69 private int regionSplitLimit;
70
71
72 CompactSplitThread(HRegionServer server) {
73 super();
74 this.server = server;
75 this.conf = server.getConfiguration();
76 this.regionSplitLimit = conf.getInt("hbase.regionserver.regionSplitLimit",
77 Integer.MAX_VALUE);
78
79 int largeThreads = Math.max(1, conf.getInt(
80 "hbase.regionserver.thread.compaction.large", 1));
81 int smallThreads = conf.getInt(
82 "hbase.regionserver.thread.compaction.small", 1);
83
84 int splitThreads = conf.getInt("hbase.regionserver.thread.split", 1);
85
86
87 Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
88
89 final String n = Thread.currentThread().getName();
90
91 this.largeCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
92 60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
93 new ThreadFactory() {
94 @Override
95 public Thread newThread(Runnable r) {
96 Thread t = new Thread(r);
97 t.setName(n + "-largeCompactions-" + System.currentTimeMillis());
98 return t;
99 }
100 });
101 this.largeCompactions.setRejectedExecutionHandler(new Rejection());
102 this.smallCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
103 60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
104 new ThreadFactory() {
105 @Override
106 public Thread newThread(Runnable r) {
107 Thread t = new Thread(r);
108 t.setName(n + "-smallCompactions-" + System.currentTimeMillis());
109 return t;
110 }
111 });
112 this.smallCompactions
113 .setRejectedExecutionHandler(new Rejection());
114 this.splits = (ThreadPoolExecutor)
115 Executors.newFixedThreadPool(splitThreads,
116 new ThreadFactory() {
117 @Override
118 public Thread newThread(Runnable r) {
119 Thread t = new Thread(r);
120 t.setName(n + "-splits-" + System.currentTimeMillis());
121 return t;
122 }
123 });
124 int mergeThreads = conf.getInt("hbase.regionserver.thread.merge", 1);
125 this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
126 mergeThreads, new ThreadFactory() {
127 @Override
128 public Thread newThread(Runnable r) {
129 Thread t = new Thread(r);
130 t.setName(n + "-merges-" + System.currentTimeMillis());
131 return t;
132 }
133 });
134 }
135
136 @Override
137 public String toString() {
138 return "compaction_queue=("
139 + largeCompactions.getQueue().size() + ":"
140 + smallCompactions.getQueue().size() + ")"
141 + ", split_queue=" + splits.getQueue().size()
142 + ", merge_queue=" + mergePool.getQueue().size();
143 }
144
145 public String dumpQueue() {
146 StringBuffer queueLists = new StringBuffer();
147 queueLists.append("Compaction/Split Queue dump:\n");
148 queueLists.append(" LargeCompation Queue:\n");
149 BlockingQueue<Runnable> lq = largeCompactions.getQueue();
150 Iterator it = lq.iterator();
151 while(it.hasNext()){
152 queueLists.append(" "+it.next().toString());
153 queueLists.append("\n");
154 }
155
156 if( smallCompactions != null ){
157 queueLists.append("\n");
158 queueLists.append(" SmallCompation Queue:\n");
159 lq = smallCompactions.getQueue();
160 it = lq.iterator();
161 while(it.hasNext()){
162 queueLists.append(" "+it.next().toString());
163 queueLists.append("\n");
164 }
165 }
166
167 queueLists.append("\n");
168 queueLists.append(" Split Queue:\n");
169 lq = splits.getQueue();
170 it = lq.iterator();
171 while(it.hasNext()){
172 queueLists.append(" "+it.next().toString());
173 queueLists.append("\n");
174 }
175
176 queueLists.append("\n");
177 queueLists.append(" Region Merge Queue:\n");
178 lq = mergePool.getQueue();
179 it = lq.iterator();
180 while (it.hasNext()) {
181 queueLists.append(" " + it.next().toString());
182 queueLists.append("\n");
183 }
184
185 return queueLists.toString();
186 }
187
188 public synchronized void requestRegionsMerge(final HRegion a,
189 final HRegion b, final boolean forcible) {
190 try {
191 mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible));
192 if (LOG.isDebugEnabled()) {
193 LOG.debug("Region merge requested for " + a + "," + b + ", forcible="
194 + forcible + ". " + this);
195 }
196 } catch (RejectedExecutionException ree) {
197 LOG.warn("Could not execute merge for " + a + "," + b + ", forcible="
198 + forcible, ree);
199 }
200 }
201
202 public synchronized boolean requestSplit(final HRegion r) {
203
204 if (shouldSplitRegion() && r.getCompactPriority() >= Store.PRIORITY_USER) {
205 byte[] midKey = r.checkSplit();
206 if (midKey != null) {
207 requestSplit(r, midKey);
208 return true;
209 }
210 }
211 return false;
212 }
213
214 public synchronized void requestSplit(final HRegion r, byte[] midKey) {
215 if (midKey == null) {
216 LOG.debug("Region " + r.getRegionNameAsString() +
217 " not splittable because midkey=null");
218 return;
219 }
220 try {
221 this.splits.execute(new SplitRequest(r, midKey, this.server));
222 if (LOG.isDebugEnabled()) {
223 LOG.debug("Split requested for " + r + ". " + this);
224 }
225 } catch (RejectedExecutionException ree) {
226 LOG.info("Could not execute split for " + r, ree);
227 }
228 }
229
230 @Override
231 public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why)
232 throws IOException {
233 return requestCompaction(r, why, null);
234 }
235
236 @Override
237 public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why,
238 List<Pair<CompactionRequest, Store>> requests) throws IOException {
239 return requestCompaction(r, why, Store.NO_PRIORITY, requests);
240 }
241
242 @Override
243 public synchronized CompactionRequest requestCompaction(final HRegion r, final Store s,
244 final String why, CompactionRequest request) throws IOException {
245 return requestCompaction(r, s, why, Store.NO_PRIORITY, request);
246 }
247
248 @Override
249 public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why,
250 int p, List<Pair<CompactionRequest, Store>> requests) throws IOException {
251 return requestCompactionInternal(r, why, p, requests, true);
252 }
253
254 private List<CompactionRequest> requestCompactionInternal(final HRegion r, final String why,
255 int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow) throws IOException {
256
257 List<CompactionRequest> ret = null;
258 if (requests == null) {
259 ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
260 for (Store s : r.getStores().values()) {
261 CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow);
262 if (selectNow) ret.add(cr);
263 }
264 } else {
265 Preconditions.checkArgument(selectNow);
266 ret = new ArrayList<CompactionRequest>(requests.size());
267 for (Pair<CompactionRequest, Store> pair : requests) {
268 ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst()));
269 }
270 }
271 return ret;
272 }
273
274 public CompactionRequest requestCompaction(final HRegion r, final Store s,
275 final String why, int priority, CompactionRequest request) throws IOException {
276 return requestCompactionInternal(r, s, why, priority, request, true);
277 }
278
279 public synchronized void requestSystemCompaction(
280 final HRegion r, final String why) throws IOException {
281 requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false);
282 }
283
284 public void requestSystemCompaction(
285 final HRegion r, final Store s, final String why) throws IOException {
286 requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false);
287 }
288
289
290
291
292
293
294
295
296
297 private synchronized CompactionRequest requestCompactionInternal(final HRegion r, final Store s,
298 final String why, int priority, CompactionRequest request, boolean selectNow)
299 throws IOException {
300 if (this.server.isStopped()
301 || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
302 return null;
303 }
304
305 CompactionContext compaction = null;
306 if (selectNow) {
307 compaction = selectCompaction(r, s, priority, request);
308 if (compaction == null) return null;
309 }
310
311
312
313 long size = selectNow ? compaction.getRequest().getSize() : 0;
314 ThreadPoolExecutor pool = (!selectNow && s.throttleCompaction(size))
315 ? largeCompactions : smallCompactions;
316 pool.execute(new CompactionRunner(s, r, compaction, pool));
317 if (LOG.isDebugEnabled()) {
318 String type = (pool == smallCompactions) ? "Small " : "Large ";
319 LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
320 + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
321 }
322 return selectNow ? compaction.getRequest() : null;
323 }
324
325 private CompactionContext selectCompaction(final HRegion r, final Store s,
326 int priority, CompactionRequest request) throws IOException {
327 CompactionContext compaction = s.requestCompaction(priority, request);
328 if (compaction == null) {
329 if(LOG.isDebugEnabled()) {
330 LOG.debug("Not compacting " + r.getRegionNameAsString() +
331 " because compaction request was cancelled");
332 }
333 return null;
334 }
335 assert compaction.hasSelection();
336 if (priority != Store.NO_PRIORITY) {
337 compaction.getRequest().setPriority(priority);
338 }
339 return compaction;
340 }
341
342
343
344
345 void interruptIfNecessary() {
346 splits.shutdown();
347 mergePool.shutdown();
348 largeCompactions.shutdown();
349 smallCompactions.shutdown();
350 }
351
352 private void waitFor(ThreadPoolExecutor t, String name) {
353 boolean done = false;
354 while (!done) {
355 try {
356 done = t.awaitTermination(60, TimeUnit.SECONDS);
357 LOG.info("Waiting for " + name + " to finish...");
358 if (!done) {
359 t.shutdownNow();
360 }
361 } catch (InterruptedException ie) {
362 LOG.warn("Interrupted waiting for " + name + " to finish...");
363 }
364 }
365 }
366
367 void join() {
368 waitFor(splits, "Split Thread");
369 waitFor(mergePool, "Merge Thread");
370 waitFor(largeCompactions, "Large Compaction Thread");
371 waitFor(smallCompactions, "Small Compaction Thread");
372 }
373
374
375
376
377
378
379
380 public int getCompactionQueueSize() {
381 return largeCompactions.getQueue().size() + smallCompactions.getQueue().size();
382 }
383
384 public int getLargeCompactionQueueSize() {
385 return largeCompactions.getQueue().size();
386 }
387
388
389 public int getSmallCompactionQueueSize() {
390 return smallCompactions.getQueue().size();
391 }
392
393
394 private boolean shouldSplitRegion() {
395 return (regionSplitLimit > server.getNumberOfOnlineRegions());
396 }
397
398
399
400
401 public int getRegionSplitLimit() {
402 return this.regionSplitLimit;
403 }
404
405 private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
406 private final Store store;
407 private final HRegion region;
408 private CompactionContext compaction;
409 private int queuedPriority;
410 private ThreadPoolExecutor parent;
411
412 public CompactionRunner(Store store, HRegion region,
413 CompactionContext compaction, ThreadPoolExecutor parent) {
414 super();
415 this.store = store;
416 this.region = region;
417 this.compaction = compaction;
418 this.queuedPriority = (this.compaction == null)
419 ? store.getCompactPriority() : compaction.getRequest().getPriority();
420 this.parent = parent;
421 }
422
423 @Override
424 public String toString() {
425 return (this.compaction != null) ? ("Request = " + compaction.getRequest())
426 : ("Store = " + store.toString() + ", pri = " + queuedPriority);
427 }
428
429 @Override
430 public void run() {
431 Preconditions.checkNotNull(server);
432 if (server.isStopped()
433 || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
434 return;
435 }
436
437 if (this.compaction == null) {
438 int oldPriority = this.queuedPriority;
439 this.queuedPriority = this.store.getCompactPriority();
440 if (this.queuedPriority > oldPriority) {
441
442
443 this.parent.execute(this);
444 return;
445 }
446 try {
447 this.compaction = selectCompaction(this.region, this.store, queuedPriority, null);
448 } catch (IOException ex) {
449 LOG.error("Compaction selection failed " + this, ex);
450 server.checkFileSystem();
451 return;
452 }
453 if (this.compaction == null) return;
454
455
456 assert this.compaction.hasSelection();
457 ThreadPoolExecutor pool = store.throttleCompaction(
458 compaction.getRequest().getSize()) ? largeCompactions : smallCompactions;
459 if (this.parent != pool) {
460 this.store.cancelRequestedCompaction(this.compaction);
461 this.compaction = null;
462 this.parent = pool;
463 this.parent.execute(this);
464 return;
465 }
466 }
467
468 assert this.compaction != null;
469
470 this.compaction.getRequest().beforeExecute();
471 try {
472
473
474 long start = EnvironmentEdgeManager.currentTimeMillis();
475 boolean completed = region.compact(compaction, store);
476 long now = EnvironmentEdgeManager.currentTimeMillis();
477 LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
478 this + "; duration=" + StringUtils.formatTimeDiff(now, start));
479 if (completed) {
480
481 if (store.getCompactPriority() <= 0) {
482 requestSystemCompaction(region, store, "Recursive enqueue");
483 } else {
484
485 requestSplit(region);
486 }
487 }
488 } catch (IOException ex) {
489 IOException remoteEx = RemoteExceptionHandler.checkIOException(ex);
490 LOG.error("Compaction failed " + this, remoteEx);
491 if (remoteEx != ex) {
492 LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
493 }
494 server.checkFileSystem();
495 } catch (Exception ex) {
496 LOG.error("Compaction failed " + this, ex);
497 server.checkFileSystem();
498 } finally {
499 LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
500 }
501 this.compaction.getRequest().afterExecute();
502 }
503
504 private String formatStackTrace(Exception ex) {
505 StringWriter sw = new StringWriter();
506 PrintWriter pw = new PrintWriter(sw);
507 ex.printStackTrace(pw);
508 pw.flush();
509 return sw.toString();
510 }
511
512 @Override
513 public int compareTo(CompactionRunner o) {
514
515 int compareVal = queuedPriority - o.queuedPriority;
516 if (compareVal != 0) return compareVal;
517 CompactionContext tc = this.compaction, oc = o.compaction;
518
519 return (tc == null) ? ((oc == null) ? 0 : 1)
520 : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest()));
521 }
522 }
523
524
525
526
527 private static class Rejection implements RejectedExecutionHandler {
528 @Override
529 public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
530 if (runnable instanceof CompactionRunner) {
531 CompactionRunner runner = (CompactionRunner)runnable;
532 LOG.debug("Compaction Rejected: " + runner);
533 runner.store.cancelRequestedCompaction(runner.compaction);
534 }
535 }
536 }
537 }