1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.util;
19
20 import java.io.IOException;
21 import java.io.PrintWriter;
22 import java.io.StringWriter;
23 import java.security.PrivilegedExceptionAction;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.TableName;
29 import org.apache.hadoop.hbase.client.HTable;
30 import org.apache.hadoop.hbase.client.Put;
31 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
32 import org.apache.hadoop.hbase.security.User;
33 import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
34 import org.apache.hadoop.util.StringUtils;
35
36
37
38
39 public class MultiThreadedWriterWithACL extends MultiThreadedWriter {
40
41 private static final Log LOG = LogFactory.getLog(MultiThreadedWriterWithACL.class);
42 private User userOwner;
43
44 public MultiThreadedWriterWithACL(LoadTestDataGenerator dataGen, Configuration conf,
45 TableName tableName, User userOwner) {
46 super(dataGen, conf, tableName);
47 this.userOwner = userOwner;
48 }
49
50 @Override
51 public void start(long startKey, long endKey, int numThreads) throws IOException {
52 super.start(startKey, endKey, numThreads);
53 }
54
55 @Override
56 protected void createWriterThreads(int numThreads) throws IOException {
57 for (int i = 0; i < numThreads; ++i) {
58 HBaseWriterThread writer = new HBaseWriterThreadWithACL(i);
59 writers.add(writer);
60 }
61 }
62
63 public class HBaseWriterThreadWithACL extends HBaseWriterThread {
64
65 private HTable table;
66 private WriteAccessAction writerAction = new WriteAccessAction();
67
68 public HBaseWriterThreadWithACL(int writerId) throws IOException {
69 super(writerId);
70 }
71
72 @Override
73 protected HTable createTable() throws IOException {
74 return null;
75 }
76
77 @Override
78 protected void closeHTable() {
79 if (table != null) {
80 try {
81 table.close();
82 } catch (Exception e) {
83 LOG.error("Error in closing the table "+table.getName(), e);
84 }
85 }
86 }
87
88 @Override
89 public void insert(final HTable table, Put put, final long keyBase) {
90 final long start = System.currentTimeMillis();
91 try {
92 put = (Put) dataGenerator.beforeMutate(keyBase, put);
93 writerAction.setPut(put);
94 writerAction.setKeyBase(keyBase);
95 writerAction.setStartTime(start);
96 userOwner.runAs(writerAction);
97 } catch (IOException e) {
98 recordFailure(table, put, keyBase, start, e);
99 } catch (InterruptedException e) {
100 failedKeySet.add(keyBase);
101 }
102 }
103
104 class WriteAccessAction implements PrivilegedExceptionAction<Object> {
105 private Put put;
106 private long keyBase;
107 private long start;
108
109 public WriteAccessAction() {
110 }
111
112 public void setPut(final Put put) {
113 this.put = put;
114 }
115
116 public void setKeyBase(final long keyBase) {
117 this.keyBase = keyBase;
118 }
119
120 public void setStartTime(final long start) {
121 this.start = start;
122 }
123
124 @Override
125 public Object run() throws Exception {
126 try {
127 if (table == null) {
128 table = new HTable(conf, tableName);
129 }
130 table.put(put);
131 } catch (IOException e) {
132 recordFailure(table, put, keyBase, start, e);
133 }
134 return null;
135 }
136 }
137 }
138
139 private void recordFailure(final HTable table, final Put put, final long keyBase,
140 final long start, IOException e) {
141 failedKeySet.add(keyBase);
142 String exceptionInfo;
143 if (e instanceof RetriesExhaustedWithDetailsException) {
144 RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
145 exceptionInfo = aggEx.getExhaustiveDescription();
146 } else {
147 StringWriter stackWriter = new StringWriter();
148 PrintWriter pw = new PrintWriter(stackWriter);
149 e.printStackTrace(pw);
150 pw.flush();
151 exceptionInfo = StringUtils.stringifyException(e);
152 }
153 LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start)
154 + "ms; region information: " + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: "
155 + exceptionInfo);
156 }
157 }