1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.security.token;
20
21 import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertTrue;
24
25 import java.io.IOException;
26 import java.net.InetSocketAddress;
27 import java.security.PrivilegedExceptionAction;
28 import java.util.ArrayList;
29 import java.util.List;
30 import java.util.concurrent.ConcurrentMap;
31 import java.util.concurrent.ExecutorService;
32
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.hbase.ClusterId;
37 import org.apache.hadoop.hbase.Coprocessor;
38 import org.apache.hadoop.hbase.HBaseTestingUtility;
39 import org.apache.hadoop.hbase.HConstants;
40 import org.apache.hadoop.hbase.MediumTests;
41 import org.apache.hadoop.hbase.Server;
42 import org.apache.hadoop.hbase.ServerName;
43 import org.apache.hadoop.hbase.TableName;
44 import org.apache.hadoop.hbase.catalog.CatalogTracker;
45 import org.apache.hadoop.hbase.client.HTableInterface;
46 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
47 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
48 import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
49 import org.apache.hadoop.hbase.ipc.RequestContext;
50 import org.apache.hadoop.hbase.ipc.RpcClient;
51 import org.apache.hadoop.hbase.ipc.RpcServer;
52 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
53 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
54 import org.apache.hadoop.hbase.ipc.ServerRpcController;
55 import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
56 import org.apache.hadoop.hbase.regionserver.HRegion;
57 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
58 import org.apache.hadoop.hbase.security.SecurityInfo;
59 import org.apache.hadoop.hbase.security.User;
60 import org.apache.hadoop.hbase.util.Bytes;
61 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
62 import org.apache.hadoop.hbase.util.Sleeper;
63 import org.apache.hadoop.hbase.util.Strings;
64 import org.apache.hadoop.hbase.util.Threads;
65 import org.apache.hadoop.hbase.util.Writables;
66 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
67 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
68 import org.apache.hadoop.net.DNS;
69 import org.apache.hadoop.security.UserGroupInformation;
70 import org.apache.hadoop.security.authorize.PolicyProvider;
71 import org.apache.hadoop.security.authorize.Service;
72 import org.apache.hadoop.security.token.SecretManager;
73 import org.apache.hadoop.security.token.Token;
74 import org.apache.hadoop.security.token.TokenIdentifier;
75 import org.junit.AfterClass;
76 import org.junit.BeforeClass;
77 import org.junit.Test;
78 import org.junit.experimental.categories.Category;
79
80 import com.google.protobuf.BlockingRpcChannel;
81 import com.google.protobuf.BlockingService;
82 import com.google.protobuf.RpcController;
83 import com.google.protobuf.ServiceException;
84
85
86
87
88 @Category(MediumTests.class)
89 public class TestTokenAuthentication {
90 static {
91
92
93 System.setProperty("java.security.krb5.realm", "hbase");
94 System.setProperty("java.security.krb5.kdc", "blah");
95 }
96 private static Log LOG = LogFactory.getLog(TestTokenAuthentication.class);
97
/**
 * Marker interface with no members. Nothing else in this file refers to it.
 */
public interface AuthenticationServiceSecurityInfo {
}
99
100
101
102
103 private static class TokenServer extends TokenProvider
104 implements AuthenticationProtos.AuthenticationService.BlockingInterface, Runnable, Server {
105 private static Log LOG = LogFactory.getLog(TokenServer.class);
106 private Configuration conf;
107 private RpcServerInterface rpcServer;
108 private InetSocketAddress isa;
109 private ZooKeeperWatcher zookeeper;
110 private Sleeper sleeper;
111 private boolean started = false;
112 private boolean aborted = false;
113 private boolean stopped = false;
114 private long startcode;
115
116 public TokenServer(Configuration conf) throws IOException {
117 this.conf = conf;
118 this.startcode = EnvironmentEdgeManager.currentTimeMillis();
119
120 String hostname =
121 Strings.domainNamePointerToHostName(DNS.getDefaultHost("default", "default"));
122 int port = 0;
123
124 InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
125 if (initialIsa.getAddress() == null) {
126 throw new IllegalArgumentException("Failed resolve of " + initialIsa);
127 }
128 final List<BlockingServiceAndInterface> sai =
129 new ArrayList<BlockingServiceAndInterface>(1);
130 BlockingService service =
131 AuthenticationProtos.AuthenticationService.newReflectiveBlockingService(this);
132 sai.add(new BlockingServiceAndInterface(service,
133 AuthenticationProtos.AuthenticationService.BlockingInterface.class));
134 this.rpcServer =
135 new RpcServer(this, "tokenServer", sai, initialIsa, conf, new FifoRpcScheduler(conf, 1));
136 this.isa = this.rpcServer.getListenerAddress();
137 this.sleeper = new Sleeper(1000, this);
138 }
139
140 @Override
141 public Configuration getConfiguration() {
142 return conf;
143 }
144
145 @Override
146 public CatalogTracker getCatalogTracker() {
147 return null;
148 }
149
150 @Override
151 public ZooKeeperWatcher getZooKeeper() {
152 return zookeeper;
153 }
154
155 @Override
156 public boolean isAborted() {
157 return aborted;
158 }
159
160 @Override
161 public ServerName getServerName() {
162 return ServerName.valueOf(isa.getHostName(), isa.getPort(), startcode);
163 }
164
165 @Override
166 public void abort(String reason, Throwable error) {
167 LOG.fatal("Aborting on: "+reason, error);
168 this.aborted = true;
169 this.stopped = true;
170 sleeper.skipSleepCycle();
171 }
172
173 private void initialize() throws IOException {
174
175 Configuration zkConf = new Configuration(conf);
176 zkConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
177 this.zookeeper = new ZooKeeperWatcher(zkConf, TokenServer.class.getSimpleName(),
178 this, true);
179 this.rpcServer.start();
180
181
182 final RegionServerServices mockServices = TEST_UTIL.createMockRegionServerService(rpcServer);
183
184
185 super.start(new RegionCoprocessorEnvironment() {
186 @Override
187 public HRegion getRegion() { return null; }
188
189 @Override
190 public RegionServerServices getRegionServerServices() {
191 return mockServices;
192 }
193
194 @Override
195 public ConcurrentMap<String, Object> getSharedData() { return null; }
196
197 @Override
198 public int getVersion() { return 0; }
199
200 @Override
201 public String getHBaseVersion() { return null; }
202
203 @Override
204 public Coprocessor getInstance() { return null; }
205
206 @Override
207 public int getPriority() { return 0; }
208
209 @Override
210 public int getLoadSequence() { return 0; }
211
212 @Override
213 public Configuration getConfiguration() { return conf; }
214
215 @Override
216 public HTableInterface getTable(TableName tableName) throws IOException
217 { return null; }
218
219 @Override
220 public HTableInterface getTable(TableName tableName, ExecutorService service)
221 throws IOException {
222 return null;
223 }
224
225 @Override
226 public ClassLoader getClassLoader() {
227 return Thread.currentThread().getContextClassLoader();
228 }
229 });
230
231 started = true;
232 }
233
234 public void run() {
235 try {
236 initialize();
237 while (!stopped) {
238 this.sleeper.sleep();
239 }
240 } catch (Exception e) {
241 abort(e.getMessage(), e);
242 }
243 this.rpcServer.stop();
244 }
245
246 public boolean isStarted() {
247 return started;
248 }
249
250 @Override
251 public void stop(String reason) {
252 LOG.info("Stopping due to: "+reason);
253 this.stopped = true;
254 sleeper.skipSleepCycle();
255 }
256
257 @Override
258 public boolean isStopped() {
259 return stopped;
260 }
261
262 public InetSocketAddress getAddress() {
263 return isa;
264 }
265
266 public SecretManager<? extends TokenIdentifier> getSecretManager() {
267 return ((RpcServer)rpcServer).getSecretManager();
268 }
269
270 @Override
271 public AuthenticationProtos.GetAuthenticationTokenResponse getAuthenticationToken(
272 RpcController controller, AuthenticationProtos.GetAuthenticationTokenRequest request)
273 throws ServiceException {
274 LOG.debug("Authentication token request from "+RequestContext.getRequestUserName());
275
276 ServerRpcController serverController = new ServerRpcController();
277 BlockingRpcCallback<AuthenticationProtos.GetAuthenticationTokenResponse> callback =
278 new BlockingRpcCallback<AuthenticationProtos.GetAuthenticationTokenResponse>();
279 getAuthenticationToken(serverController, request, callback);
280 try {
281 serverController.checkFailed();
282 return callback.get();
283 } catch (IOException ioe) {
284 throw new ServiceException(ioe);
285 }
286 }
287
288 @Override
289 public AuthenticationProtos.WhoAmIResponse whoAmI(
290 RpcController controller, AuthenticationProtos.WhoAmIRequest request)
291 throws ServiceException {
292 LOG.debug("whoAmI() request from "+RequestContext.getRequestUserName());
293
294 ServerRpcController serverController = new ServerRpcController();
295 BlockingRpcCallback<AuthenticationProtos.WhoAmIResponse> callback =
296 new BlockingRpcCallback<AuthenticationProtos.WhoAmIResponse>();
297 whoAmI(serverController, request, callback);
298 try {
299 serverController.checkFailed();
300 return callback.get();
301 } catch (IOException ioe) {
302 throw new ServiceException(ioe);
303 }
304 }
305 }
306
307
308 private static HBaseTestingUtility TEST_UTIL;
309 private static TokenServer server;
310 private static Thread serverThread;
311 private static AuthenticationTokenSecretManager secretManager;
312 private static ClusterId clusterId = new ClusterId();
313
314 @BeforeClass
315 public static void setupBeforeClass() throws Exception {
316 TEST_UTIL = new HBaseTestingUtility();
317 TEST_UTIL.startMiniZKCluster();
318
319 SecurityInfo.addInfo(AuthenticationProtos.AuthenticationService.getDescriptor().getName(),
320 new SecurityInfo("hbase.test.kerberos.principal",
321 AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN));
322
323 Configuration conf = TEST_UTIL.getConfiguration();
324 conf.set("hadoop.security.authentication", "kerberos");
325 conf.set("hbase.security.authentication", "kerberos");
326 conf.setBoolean(HADOOP_SECURITY_AUTHORIZATION, true);
327 server = new TokenServer(conf);
328 serverThread = new Thread(server);
329 Threads.setDaemonThreadRunning(serverThread, "TokenServer:"+server.getServerName().toString());
330
331 while (!server.isStarted() && !server.isStopped()) {
332 Thread.sleep(10);
333 }
334 server.rpcServer.refreshAuthManager(new PolicyProvider() {
335 @Override
336 public Service[] getServices() {
337 return new Service [] {
338 new Service("security.client.protocol.acl",
339 AuthenticationProtos.AuthenticationService.BlockingInterface.class)};
340 }
341 });
342 ZKClusterId.setClusterId(server.getZooKeeper(), clusterId);
343 secretManager = (AuthenticationTokenSecretManager)server.getSecretManager();
344 while(secretManager.getCurrentKey() == null) {
345 Thread.sleep(1);
346 }
347 }
348
349 @AfterClass
350 public static void tearDownAfterClass() throws Exception {
351 server.stop("Test complete");
352 Threads.shutdown(serverThread);
353 TEST_UTIL.shutdownMiniZKCluster();
354 }
355
356 @Test
357 public void testTokenCreation() throws Exception {
358 Token<AuthenticationTokenIdentifier> token =
359 secretManager.generateToken("testuser");
360
361 AuthenticationTokenIdentifier ident = new AuthenticationTokenIdentifier();
362 Writables.getWritable(token.getIdentifier(), ident);
363 assertEquals("Token username should match", "testuser",
364 ident.getUsername());
365 byte[] passwd = secretManager.retrievePassword(ident);
366 assertTrue("Token password and password from secret manager should match",
367 Bytes.equals(token.getPassword(), passwd));
368 }
369
370 @Test
371 public void testTokenAuthentication() throws Exception {
372 UserGroupInformation testuser =
373 UserGroupInformation.createUserForTesting("testuser", new String[]{"testgroup"});
374
375 testuser.setAuthenticationMethod(
376 UserGroupInformation.AuthenticationMethod.TOKEN);
377 final Configuration conf = TEST_UTIL.getConfiguration();
378 UserGroupInformation.setConfiguration(conf);
379 Token<AuthenticationTokenIdentifier> token =
380 secretManager.generateToken("testuser");
381 LOG.debug("Got token: " + token.toString());
382 testuser.addToken(token);
383
384
385 testuser.doAs(new PrivilegedExceptionAction<Object>() {
386 public Object run() throws Exception {
387 Configuration c = server.getConfiguration();
388 RpcClient rpcClient = new RpcClient(c, clusterId.toString());
389 ServerName sn =
390 ServerName.valueOf(server.getAddress().getHostName(), server.getAddress().getPort(),
391 System.currentTimeMillis());
392 try {
393 BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn,
394 User.getCurrent(), HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
395 AuthenticationProtos.AuthenticationService.BlockingInterface stub =
396 AuthenticationProtos.AuthenticationService.newBlockingStub(channel);
397 AuthenticationProtos.WhoAmIResponse response =
398 stub.whoAmI(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance());
399 String myname = response.getUsername();
400 assertEquals("testuser", myname);
401 String authMethod = response.getAuthMethod();
402 assertEquals("TOKEN", authMethod);
403 } finally {
404 rpcClient.stop();
405 }
406 return null;
407 }
408 });
409 }
410 }