1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.security.access;
20
21 import com.google.protobuf.RpcCallback;
22 import com.google.protobuf.RpcController;
23 import com.google.protobuf.Service;
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.classification.InterfaceAudience;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.fs.FileStatus;
29 import org.apache.hadoop.fs.FileSystem;
30 import org.apache.hadoop.fs.FileUtil;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.fs.permission.FsPermission;
33 import org.apache.hadoop.hbase.Coprocessor;
34 import org.apache.hadoop.hbase.CoprocessorEnvironment;
35 import org.apache.hadoop.hbase.TableName;
36 import org.apache.hadoop.hbase.DoNotRetryIOException;
37 import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
38 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
39 import org.apache.hadoop.hbase.ipc.RequestContext;
40 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
41 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
42 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
43 import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadService;
44 import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
45 import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadResponse;
46 import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
47 import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadResponse;
48 import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesRequest;
49 import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesResponse;
50 import org.apache.hadoop.hbase.regionserver.HRegion;
51 import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
52 import org.apache.hadoop.hbase.security.User;
53 import org.apache.hadoop.hbase.security.UserProvider;
54 import org.apache.hadoop.hbase.security.token.FsDelegationToken;
55 import org.apache.hadoop.hbase.util.Bytes;
56 import org.apache.hadoop.hbase.util.FSHDFSUtils;
57 import org.apache.hadoop.hbase.util.Methods;
58 import org.apache.hadoop.hbase.util.Pair;
59 import org.apache.hadoop.io.Text;
60 import org.apache.hadoop.security.UserGroupInformation;
61 import org.apache.hadoop.security.token.Token;
62
63 import java.io.IOException;
64 import java.math.BigInteger;
65 import java.security.PrivilegedAction;
66 import java.security.SecureRandom;
67 import java.util.ArrayList;
68 import java.util.List;
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96 @InterfaceAudience.Private
97 public class SecureBulkLoadEndpoint extends SecureBulkLoadService
98 implements CoprocessorService, Coprocessor {
99
100 public static final long VERSION = 0L;
101
102
103 private static final int RANDOM_WIDTH = 320;
104 private static final int RANDOM_RADIX = 32;
105
106 private static Log LOG = LogFactory.getLog(SecureBulkLoadEndpoint.class);
107
108 private final static FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx");
109 private final static FsPermission PERM_HIDDEN = FsPermission.valueOf("-rwx--x--x");
110
111 private SecureRandom random;
112 private FileSystem fs;
113 private Configuration conf;
114
115
116
117 private Path baseStagingDir;
118
119 private RegionCoprocessorEnvironment env;
120
121 private UserProvider userProvider;
122
123 @Override
124 public void start(CoprocessorEnvironment env) {
125 this.env = (RegionCoprocessorEnvironment)env;
126 random = new SecureRandom();
127 conf = env.getConfiguration();
128 baseStagingDir = SecureBulkLoadUtil.getBaseStagingDir(conf);
129 this.userProvider = UserProvider.instantiate(conf);
130
131 try {
132 fs = FileSystem.get(conf);
133 fs.mkdirs(baseStagingDir, PERM_HIDDEN);
134 fs.setPermission(baseStagingDir, PERM_HIDDEN);
135
136 fs.mkdirs(new Path(baseStagingDir,"DONOTERASE"), PERM_HIDDEN);
137 FileStatus status = fs.getFileStatus(baseStagingDir);
138 if(status == null) {
139 throw new IllegalStateException("Failed to create staging directory");
140 }
141 if(!status.getPermission().equals(PERM_HIDDEN)) {
142 throw new IllegalStateException(
143 "Directory already exists but permissions aren't set to '-rwx--x--x' ");
144 }
145 } catch (IOException e) {
146 throw new IllegalStateException("Failed to get FileSystem instance",e);
147 }
148 }
149
150 @Override
151 public void stop(CoprocessorEnvironment env) throws IOException {
152 }
153
154 @Override
155 public void prepareBulkLoad(RpcController controller,
156 PrepareBulkLoadRequest request,
157 RpcCallback<PrepareBulkLoadResponse> done){
158 try {
159 getAccessController().prePrepareBulkLoad(env);
160 String bulkToken = createStagingDir(baseStagingDir,
161 getActiveUser(), ProtobufUtil.toTableName(request.getTableName())).toString();
162 done.run(PrepareBulkLoadResponse.newBuilder().setBulkToken(bulkToken).build());
163 } catch (IOException e) {
164 ResponseConverter.setControllerException(controller, e);
165 }
166 done.run(null);
167 }
168
169 @Override
170 public void cleanupBulkLoad(RpcController controller,
171 CleanupBulkLoadRequest request,
172 RpcCallback<CleanupBulkLoadResponse> done) {
173 try {
174 getAccessController().preCleanupBulkLoad(env);
175 fs.delete(createStagingDir(baseStagingDir,
176 getActiveUser(),
177 env.getRegion().getTableDesc().getTableName(),
178 new Path(request.getBulkToken()).getName()),
179 true);
180 done.run(CleanupBulkLoadResponse.newBuilder().build());
181 } catch (IOException e) {
182 ResponseConverter.setControllerException(controller, e);
183 }
184 done.run(null);
185 }
186
187 @Override
188 public void secureBulkLoadHFiles(RpcController controller,
189 SecureBulkLoadHFilesRequest request,
190 RpcCallback<SecureBulkLoadHFilesResponse> done) {
191 final List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
192 for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) {
193 familyPaths.add(new Pair(el.getFamily().toByteArray(),el.getPath()));
194 }
195 final Token userToken =
196 new Token(request.getFsToken().getIdentifier().toByteArray(),
197 request.getFsToken().getPassword().toByteArray(),
198 new Text(request.getFsToken().getKind()),
199 new Text(request.getFsToken().getService()));
200 final String bulkToken = request.getBulkToken();
201 User user = getActiveUser();
202 final UserGroupInformation ugi = user.getUGI();
203 if(userToken != null) {
204 ugi.addToken(userToken);
205 } else if (userProvider.isHadoopSecurityEnabled()) {
206
207
208 ResponseConverter.setControllerException(controller,
209 new DoNotRetryIOException("User token cannot be null"));
210 return;
211 }
212
213 HRegion region = env.getRegion();
214 boolean bypass = false;
215 if (region.getCoprocessorHost() != null) {
216 try {
217 bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
218 } catch (IOException e) {
219 ResponseConverter.setControllerException(controller, e);
220 done.run(null);
221 return;
222 }
223 }
224 boolean loaded = false;
225 if (!bypass) {
226
227
228
229
230
231 FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider, "renewer");
232 try {
233 targetfsDelegationToken.acquireDelegationToken(fs);
234 } catch (IOException e) {
235 ResponseConverter.setControllerException(controller, e);
236 done.run(null);
237 return;
238 }
239 Token<?> targetFsToken = targetfsDelegationToken.getUserToken();
240 if (targetFsToken != null && (userToken == null
241 || !targetFsToken.getService().equals(userToken.getService()))) {
242 ugi.addToken(targetFsToken);
243 }
244
245 loaded = ugi.doAs(new PrivilegedAction<Boolean>() {
246 @Override
247 public Boolean run() {
248 FileSystem fs = null;
249 try {
250 Configuration conf = env.getConfiguration();
251 fs = FileSystem.get(conf);
252 for(Pair<byte[], String> el: familyPaths) {
253 Path p = new Path(el.getSecond());
254 LOG.trace("Setting permission for: " + p);
255 fs.setPermission(p, PERM_ALL_ACCESS);
256
257 Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst()));
258 if(!fs.exists(stageFamily)) {
259 fs.mkdirs(stageFamily);
260 fs.setPermission(stageFamily, PERM_ALL_ACCESS);
261 }
262 }
263
264
265 return env.getRegion().bulkLoadHFiles(familyPaths, true,
266 new SecureBulkLoadListener(fs, bulkToken, conf));
267 } catch (Exception e) {
268 LOG.error("Failed to complete bulk load", e);
269 }
270 return false;
271 }
272 });
273 }
274 if (region.getCoprocessorHost() != null) {
275 try {
276 loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
277 } catch (IOException e) {
278 ResponseConverter.setControllerException(controller, e);
279 done.run(null);
280 return;
281 }
282 }
283 done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(loaded).build());
284 }
285
286 private AccessController getAccessController() {
287 return (AccessController) this.env.getRegion()
288 .getCoprocessorHost().findCoprocessor(AccessController.class.getName());
289 }
290
291 private Path createStagingDir(Path baseDir,
292 User user,
293 TableName tableName) throws IOException {
294 String randomDir = user.getShortName()+"__"+ tableName +"__"+
295 (new BigInteger(RANDOM_WIDTH, random).toString(RANDOM_RADIX));
296 return createStagingDir(baseDir, user, tableName, randomDir);
297 }
298
299 private Path createStagingDir(Path baseDir,
300 User user,
301 TableName tableName,
302 String randomDir) throws IOException {
303 Path p = new Path(baseDir, randomDir);
304 fs.mkdirs(p, PERM_ALL_ACCESS);
305 fs.setPermission(p, PERM_ALL_ACCESS);
306 return p;
307 }
308
309 private User getActiveUser() {
310 User user = RequestContext.getRequestUser();
311 if (!RequestContext.isInRequestContext()) {
312 return null;
313 }
314
315
316 if("simple".equalsIgnoreCase(conf.get(User.HBASE_SECURITY_CONF_KEY))) {
317 return User.createUserForTesting(conf, user.getShortName(), new String[]{});
318 }
319
320 return user;
321 }
322
323 @Override
324 public Service getService() {
325 return this;
326 }
327
328 private static class SecureBulkLoadListener implements HRegion.BulkLoadListener {
329
330 private FileSystem fs;
331 private String stagingDir;
332 private Configuration conf;
333
334 private FileSystem srcFs = null;
335
336 public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf) {
337 this.fs = fs;
338 this.stagingDir = stagingDir;
339 this.conf = conf;
340 }
341
342 @Override
343 public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException {
344 Path p = new Path(srcPath);
345 Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));
346 if (srcFs == null) {
347 srcFs = FileSystem.get(p.toUri(), conf);
348 }
349
350 if(!isFile(p)) {
351 throw new IOException("Path does not reference a file: " + p);
352 }
353
354
355 if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {
356 LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than " +
357 "the destination filesystem. Copying file over to destination staging dir.");
358 FileUtil.copy(srcFs, p, fs, stageP, false, conf);
359 }
360 else {
361 LOG.debug("Moving " + p + " to " + stageP);
362 if(!fs.rename(p, stageP)) {
363 throw new IOException("Failed to move HFile: " + p + " to " + stageP);
364 }
365 }
366 return stageP.toString();
367 }
368
369 @Override
370 public void doneBulkLoad(byte[] family, String srcPath) throws IOException {
371 LOG.debug("Bulk Load done for: " + srcPath);
372 }
373
374 @Override
375 public void failedBulkLoad(final byte[] family, final String srcPath) throws IOException {
376 Path p = new Path(srcPath);
377 Path stageP = new Path(stagingDir,
378 new Path(Bytes.toString(family), p.getName()));
379 LOG.debug("Moving " + stageP + " back to " + p);
380 if(!fs.rename(stageP, p))
381 throw new IOException("Failed to move HFile: " + stageP + " to " + p);
382 }
383
384
385
386
387
388
389
390
391 private boolean isFile(Path p) throws IOException {
392 FileStatus status = srcFs.getFileStatus(p);
393 boolean isFile = !status.isDir();
394 try {
395 isFile = isFile && !(Boolean)Methods.call(FileStatus.class, status, "isSymlink", null, null);
396 } catch (Exception e) {
397 }
398 return isFile;
399 }
400 }
401 }