1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.security.token;
20
21 import java.io.IOException;
22 import java.lang.reflect.UndeclaredThrowableException;
23 import java.security.PrivilegedExceptionAction;
24
25 import com.google.protobuf.ServiceException;
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.HConstants;
30 import org.apache.hadoop.hbase.TableName;
31 import org.apache.hadoop.hbase.client.HTable;
32 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
33 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
34 import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
35 import org.apache.hadoop.io.Text;
36 import org.apache.hadoop.mapred.JobConf;
37 import org.apache.hadoop.mapreduce.Job;
38 import org.apache.hadoop.security.UserGroupInformation;
39 import org.apache.hadoop.security.token.Token;
40
41
42
43
44 public class TokenUtil {
45 private static Log LOG = LogFactory.getLog(TokenUtil.class);
46
47
48
49
50
51
52 public static Token<AuthenticationTokenIdentifier> obtainToken(
53 Configuration conf) throws IOException {
54 HTable meta = null;
55 try {
56 meta = new HTable(conf, TableName.META_TABLE_NAME);
57 CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW);
58 AuthenticationProtos.AuthenticationService.BlockingInterface service =
59 AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel);
60 AuthenticationProtos.GetAuthenticationTokenResponse response = service.getAuthenticationToken(null,
61 AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance());
62
63 return ProtobufUtil.toToken(response.getToken());
64 } catch (ServiceException se) {
65 ProtobufUtil.toIOException(se);
66 } finally {
67 if (meta != null) {
68 meta.close();
69 }
70 }
71
72 return null;
73 }
74
75 private static Text getClusterId(Token<AuthenticationTokenIdentifier> token)
76 throws IOException {
77 return token.getService() != null
78 ? token.getService() : new Text("default");
79 }
80
81
82
83
84
85
86
87
88
89 public static void obtainAndCacheToken(final Configuration conf,
90 UserGroupInformation user)
91 throws IOException, InterruptedException {
92 try {
93 Token<AuthenticationTokenIdentifier> token =
94 user.doAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>() {
95 public Token<AuthenticationTokenIdentifier> run() throws Exception {
96 return obtainToken(conf);
97 }
98 });
99
100 if (token == null) {
101 throw new IOException("No token returned for user "+user.getUserName());
102 }
103 if (LOG.isDebugEnabled()) {
104 LOG.debug("Obtained token "+token.getKind().toString()+" for user "+
105 user.getUserName());
106 }
107 user.addToken(token);
108 } catch (IOException ioe) {
109 throw ioe;
110 } catch (InterruptedException ie) {
111 throw ie;
112 } catch (RuntimeException re) {
113 throw re;
114 } catch (Exception e) {
115 throw new UndeclaredThrowableException(e,
116 "Unexpected exception obtaining token for user "+user.getUserName());
117 }
118 }
119
120
121
122
123
124
125
126
127
128
129 public static void obtainTokenForJob(final Configuration conf,
130 UserGroupInformation user, Job job)
131 throws IOException, InterruptedException {
132 try {
133 Token<AuthenticationTokenIdentifier> token =
134 user.doAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>() {
135 public Token<AuthenticationTokenIdentifier> run() throws Exception {
136 return obtainToken(conf);
137 }
138 });
139
140 if (token == null) {
141 throw new IOException("No token returned for user "+user.getUserName());
142 }
143 Text clusterId = getClusterId(token);
144 LOG.info("Obtained token "+token.getKind().toString()+" for user "+
145 user.getUserName() + " on cluster "+clusterId.toString());
146 job.getCredentials().addToken(clusterId, token);
147 } catch (IOException ioe) {
148 throw ioe;
149 } catch (InterruptedException ie) {
150 throw ie;
151 } catch (RuntimeException re) {
152 throw re;
153 } catch (Exception e) {
154 throw new UndeclaredThrowableException(e,
155 "Unexpected exception obtaining token for user "+user.getUserName());
156 }
157 }
158
159
160
161
162
163
164
165
166
167 public static void obtainTokenForJob(final JobConf job,
168 UserGroupInformation user)
169 throws IOException, InterruptedException {
170 try {
171 Token<AuthenticationTokenIdentifier> token =
172 user.doAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>() {
173 public Token<AuthenticationTokenIdentifier> run() throws Exception {
174 return obtainToken(job);
175 }
176 });
177
178 if (token == null) {
179 throw new IOException("No token returned for user "+user.getUserName());
180 }
181 Text clusterId = getClusterId(token);
182 LOG.info("Obtained token "+token.getKind().toString()+" for user "+
183 user.getUserName()+" on cluster "+clusterId.toString());
184 job.getCredentials().addToken(clusterId, token);
185 } catch (IOException ioe) {
186 throw ioe;
187 } catch (InterruptedException ie) {
188 throw ie;
189 } catch (RuntimeException re) {
190 throw re;
191 } catch (Exception e) {
192 throw new UndeclaredThrowableException(e,
193 "Unexpected exception obtaining token for user "+user.getUserName());
194 }
195 }
196 }