1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.util;
19
20 import java.io.IOException;
21 import java.io.PrintWriter;
22 import java.io.StringWriter;
23 import java.security.PrivilegedExceptionAction;
24 import java.util.HashMap;
25 import java.util.Map;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.TableName;
31 import org.apache.hadoop.hbase.client.Append;
32 import org.apache.hadoop.hbase.client.Delete;
33 import org.apache.hadoop.hbase.client.Get;
34 import org.apache.hadoop.hbase.client.HTable;
35 import org.apache.hadoop.hbase.client.Increment;
36 import org.apache.hadoop.hbase.client.Mutation;
37 import org.apache.hadoop.hbase.client.Put;
38 import org.apache.hadoop.hbase.client.Result;
39 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
40 import org.apache.hadoop.hbase.security.User;
41 import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
42 import org.apache.hadoop.security.UserGroupInformation;
43 import org.apache.hadoop.util.StringUtils;
44
45
46
47
48 public class MultiThreadedUpdaterWithACL extends MultiThreadedUpdater {
49 private static final Log LOG = LogFactory.getLog(MultiThreadedUpdaterWithACL.class);
50 private final static String COMMA= ",";
51 private User userOwner;
52
53
54
55
56 private Map<String, HTable> userVsTable = new HashMap<String, HTable>();
57 private Map<String, User> users = new HashMap<String, User>();
58 private String[] userNames;
59
60 public MultiThreadedUpdaterWithACL(LoadTestDataGenerator dataGen, Configuration conf,
61 TableName tableName, double updatePercent, User userOwner, String userNames) {
62 super(dataGen, conf, tableName, updatePercent);
63 this.userOwner = userOwner;
64 this.userNames = userNames.split(COMMA);
65 }
66
67 @Override
68 protected void addUpdaterThreads(int numThreads) throws IOException {
69 for (int i = 0; i < numThreads; ++i) {
70 HBaseUpdaterThread updater = new HBaseUpdaterThreadWithACL(i);
71 updaters.add(updater);
72 }
73 }
74
75 public class HBaseUpdaterThreadWithACL extends HBaseUpdaterThread {
76
77 private HTable table;
78 private MutateAccessAction mutateAction = new MutateAccessAction();
79
80 public HBaseUpdaterThreadWithACL(int updaterId) throws IOException {
81 super(updaterId);
82 }
83
84 @Override
85 protected HTable createTable() throws IOException {
86 return null;
87 }
88
89 @Override
90 protected void closeHTable() {
91 try {
92 if (table != null) {
93 table.close();
94 }
95 for (HTable table : userVsTable.values()) {
96 try {
97 table.close();
98 } catch (Exception e) {
99 LOG.error("Error while closing the table " + table.getName(), e);
100 }
101 }
102 } catch (Exception e) {
103 LOG.error("Error while closing the HTable "+table.getName(), e);
104 }
105 }
106
107 @Override
108 protected Result getRow(final Get get, final long rowKeyBase, final byte[] cf) {
109 PrivilegedExceptionAction<Object> action = new PrivilegedExceptionAction<Object>() {
110
111 @Override
112 public Object run() throws Exception {
113 Result res = null;
114 HTable localTable = null;
115 try {
116 int mod = ((int) rowKeyBase % userNames.length);
117 if (userVsTable.get(userNames[mod]) == null) {
118 localTable = new HTable(conf, tableName);
119 userVsTable.put(userNames[mod], localTable);
120 res = localTable.get(get);
121 } else {
122 localTable = userVsTable.get(userNames[mod]);
123 res = localTable.get(get);
124 }
125 } catch (IOException ie) {
126 LOG.warn("Failed to get the row for key = [" + get.getRow() + "], column family = ["
127 + Bytes.toString(cf) + "]", ie);
128 }
129 return res;
130 }
131 };
132
133 if (userNames != null && userNames.length > 0) {
134 int mod = ((int) rowKeyBase % userNames.length);
135 User user;
136 UserGroupInformation realUserUgi;
137 try {
138 if (!users.containsKey(userNames[mod])) {
139 if (User.isHBaseSecurityEnabled(conf)) {
140 realUserUgi = LoadTestTool.loginAndReturnUGI(conf, userNames[mod]);
141 } else {
142 realUserUgi = UserGroupInformation.createRemoteUser(userNames[mod]);
143 }
144 user = User.create(realUserUgi);
145 users.put(userNames[mod], user);
146 } else {
147 user = users.get(userNames[mod]);
148 }
149 Result result = (Result) user.runAs(action);
150 return result;
151 } catch (Exception ie) {
152 LOG.warn("Failed to get the row for key = [" + get.getRow() + "], column family = ["
153 + Bytes.toString(cf) + "]", ie);
154 }
155 }
156
157 return null;
158 }
159
160 @Override
161 public void mutate(final HTable table, Mutation m, final long keyBase, final byte[] row,
162 final byte[] cf, final byte[] q, final byte[] v) {
163 final long start = System.currentTimeMillis();
164 try {
165 m = dataGenerator.beforeMutate(keyBase, m);
166 mutateAction.setMutation(m);
167 mutateAction.setCF(cf);
168 mutateAction.setRow(row);
169 mutateAction.setQualifier(q);
170 mutateAction.setValue(v);
171 mutateAction.setStartTime(start);
172 mutateAction.setKeyBase(keyBase);
173 userOwner.runAs(mutateAction);
174 } catch (IOException e) {
175 recordFailure(m, keyBase, start, e);
176 } catch (InterruptedException e) {
177 failedKeySet.add(keyBase);
178 }
179 }
180
181 class MutateAccessAction implements PrivilegedExceptionAction<Object> {
182 private HTable table;
183 private long start;
184 private Mutation m;
185 private long keyBase;
186 private byte[] row;
187 private byte[] cf;
188 private byte[] q;
189 private byte[] v;
190
191 public MutateAccessAction() {
192
193 }
194
195 public void setStartTime(final long start) {
196 this.start = start;
197 }
198
199 public void setMutation(final Mutation m) {
200 this.m = m;
201 }
202
203 public void setRow(final byte[] row) {
204 this.row = row;
205 }
206
207 public void setCF(final byte[] cf) {
208 this.cf = cf;
209 }
210
211 public void setQualifier(final byte[] q) {
212 this.q = q;
213 }
214
215 public void setValue(final byte[] v) {
216 this.v = v;
217 }
218
219 public void setKeyBase(final long keyBase) {
220 this.keyBase = keyBase;
221 }
222
223 @Override
224 public Object run() throws Exception {
225 try {
226 if (table == null) {
227 table = new HTable(conf, tableName);
228 }
229 if (m instanceof Increment) {
230 table.increment((Increment) m);
231 } else if (m instanceof Append) {
232 table.append((Append) m);
233 } else if (m instanceof Put) {
234 table.checkAndPut(row, cf, q, v, (Put) m);
235 } else if (m instanceof Delete) {
236 table.checkAndDelete(row, cf, q, v, (Delete) m);
237 } else {
238 throw new IllegalArgumentException("unsupported mutation "
239 + m.getClass().getSimpleName());
240 }
241 totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
242 } catch (IOException e) {
243 recordFailure(m, keyBase, start, e);
244 }
245 return null;
246 }
247 }
248
249 private void recordFailure(final Mutation m, final long keyBase,
250 final long start, IOException e) {
251 failedKeySet.add(keyBase);
252 String exceptionInfo;
253 if (e instanceof RetriesExhaustedWithDetailsException) {
254 RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
255 exceptionInfo = aggEx.getExhaustiveDescription();
256 } else {
257 StringWriter stackWriter = new StringWriter();
258 PrintWriter pw = new PrintWriter(stackWriter);
259 e.printStackTrace(pw);
260 pw.flush();
261 exceptionInfo = StringUtils.stringifyException(e);
262 }
263 LOG.error("Failed to mutate: " + keyBase + " after " + (System.currentTimeMillis() - start)
264 + "ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: "
265 + exceptionInfo);
266 }
267 }
268 }