1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.util;
21
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.concurrent.Callable;
27 import java.util.concurrent.CompletionService;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.ExecutorCompletionService;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.ThreadFactory;
32 import java.util.concurrent.ThreadPoolExecutor;
33 import java.util.concurrent.TimeUnit;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.classification.InterfaceAudience;
38 import org.apache.hadoop.conf.Configuration;
39 import org.apache.hadoop.fs.Path;
40 import org.apache.hadoop.hbase.HRegionInfo;
41 import org.apache.hadoop.hbase.HTableDescriptor;
42 import org.apache.hadoop.hbase.regionserver.HRegion;
43
44
45
46
47 @InterfaceAudience.Private
48 public abstract class ModifyRegionUtils {
49 private static final Log LOG = LogFactory.getLog(ModifyRegionUtils.class);
50
51 private ModifyRegionUtils() {
52 }
53
54 public interface RegionFillTask {
55 void fillRegion(final HRegion region) throws IOException;
56 }
57
58
59
60
61
62
63
64
65
66
67
68 public static List<HRegionInfo> createRegions(final Configuration conf, final Path rootDir,
69 final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions) throws IOException {
70 return createRegions(conf, rootDir, hTableDescriptor, newRegions, null);
71 }
72
73
74
75
76
77
78
79
80
81
82
83
84 public static List<HRegionInfo> createRegions(final Configuration conf, final Path rootDir,
85 final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
86 final RegionFillTask task) throws IOException {
87
88 Path tableDir = FSUtils.getTableDir(rootDir, hTableDescriptor.getTableName());
89 return createRegions(conf, rootDir, tableDir, hTableDescriptor, newRegions, task);
90 }
91
92
93
94
95
96
97
98
99
100
101
102
103
104 public static List<HRegionInfo> createRegions(final Configuration conf, final Path rootDir,
105 final Path tableDir, final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
106 final RegionFillTask task) throws IOException {
107 if (newRegions == null) return null;
108 int regionNumber = newRegions.length;
109 ThreadPoolExecutor regionOpenAndInitThreadPool = getRegionOpenAndInitThreadPool(conf,
110 "RegionOpenAndInitThread-" + hTableDescriptor.getTableName(), regionNumber);
111 CompletionService<HRegionInfo> completionService = new ExecutorCompletionService<HRegionInfo>(
112 regionOpenAndInitThreadPool);
113 List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
114 for (final HRegionInfo newRegion : newRegions) {
115 completionService.submit(new Callable<HRegionInfo>() {
116 @Override
117 public HRegionInfo call() throws IOException {
118 return createRegion(conf, rootDir, tableDir, hTableDescriptor, newRegion, task);
119 }
120 });
121 }
122 try {
123
124 for (int i = 0; i < regionNumber; i++) {
125 Future<HRegionInfo> future = completionService.take();
126 HRegionInfo regionInfo = future.get();
127 regionInfos.add(regionInfo);
128 }
129 } catch (InterruptedException e) {
130 LOG.error("Caught " + e + " during region creation");
131 throw new InterruptedIOException(e.getMessage());
132 } catch (ExecutionException e) {
133 throw new IOException(e);
134 } finally {
135 regionOpenAndInitThreadPool.shutdownNow();
136 }
137 return regionInfos;
138 }
139
140
141
142
143
144
145
146
147
148
149
150 public static HRegionInfo createRegion(final Configuration conf, final Path rootDir,
151 final Path tableDir, final HTableDescriptor hTableDescriptor, final HRegionInfo newRegion,
152 final RegionFillTask task) throws IOException {
153
154 HRegion region = HRegion.createHRegion(newRegion,
155 rootDir, tableDir, conf, hTableDescriptor, null,
156 false, true);
157 try {
158
159 if (task != null) {
160 task.fillRegion(region);
161 }
162 } finally {
163
164 region.close();
165 }
166 return region.getRegionInfo();
167 }
168
169
170
171
172
173 static ThreadPoolExecutor getRegionOpenAndInitThreadPool(final Configuration conf,
174 final String threadNamePrefix, int regionNumber) {
175 int maxThreads = Math.min(regionNumber, conf.getInt(
176 "hbase.hregion.open.and.init.threads.max", 10));
177 ThreadPoolExecutor regionOpenAndInitThreadPool = Threads
178 .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
179 new ThreadFactory() {
180 private int count = 1;
181
182 @Override
183 public Thread newThread(Runnable r) {
184 Thread t = new Thread(r, threadNamePrefix + "-" + count++);
185 return t;
186 }
187 });
188 return regionOpenAndInitThreadPool;
189 }
190 }