View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.util;
18  
19  import java.io.IOException;
20  import java.security.PrivilegedExceptionAction;
21  import java.util.HashMap;
22  import java.util.Map;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.hbase.TableName;
28  import org.apache.hadoop.hbase.client.Get;
29  import org.apache.hadoop.hbase.client.HTable;
30  import org.apache.hadoop.hbase.client.Result;
31  import org.apache.hadoop.hbase.security.User;
32  import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
33  import org.apache.hadoop.security.UserGroupInformation;
34  
35  /**
36   * A MultiThreadReader that helps to work with ACL
37   */
38  public class MultiThreadedReaderWithACL extends MultiThreadedReader {
39    private static final Log LOG = LogFactory.getLog(MultiThreadedReaderWithACL.class);
40  
41    private static final String COMMA = ",";
42    /**
43     * Maps user with Table instance. Because the table instance has to be created
44     * per user inorder to work in that user's context
45     */
46    private Map<String, HTable> userVsTable = new HashMap<String, HTable>();
47    private Map<String, User> users = new HashMap<String, User>();
48    private String[] userNames;
49  
50    public MultiThreadedReaderWithACL(LoadTestDataGenerator dataGen, Configuration conf,
51        TableName tableName, double verifyPercent, String userNames) {
52      super(dataGen, conf, tableName, verifyPercent);
53      this.userNames = userNames.split(COMMA);
54    }
55  
56    @Override
57    protected void addReaderThreads(int numThreads) throws IOException {
58      for (int i = 0; i < numThreads; ++i) {
59        HBaseReaderThread reader = new HBaseReaderThreadWithACL(i);
60        readers.add(reader);
61      }
62    }
63  
64    public class HBaseReaderThreadWithACL extends HBaseReaderThread {
65  
66      public HBaseReaderThreadWithACL(int readerId) throws IOException {
67        super(readerId);
68      }
69  
70      @Override
71      protected HTable createTable() throws IOException {
72        return null;
73      }
74  
75      @Override
76      protected void closeTable() {
77        for (HTable table : userVsTable.values()) {
78          try {
79            table.close();
80          } catch (Exception e) {
81            LOG.error("Error while closing the table " + table.getName(), e);
82          }
83        }
84      }
85  
86      @Override
87      public void queryKey(final Get get, final boolean verify, final long keyToRead)
88          throws IOException {
89        final String rowKey = Bytes.toString(get.getRow());
90  
91        // read the data
92        final long start = System.currentTimeMillis();
93        PrivilegedExceptionAction<Object> action = new PrivilegedExceptionAction<Object>() {
94          @Override
95          public Object run() throws Exception {
96            HTable localTable = null;
97            try {
98              Result result = null;
99              int specialPermCellInsertionFactor = Integer.parseInt(dataGenerator.getArgs()[2]);
100             int mod = ((int) keyToRead % userNames.length);
101             if (userVsTable.get(userNames[mod]) == null) {
102               localTable = new HTable(conf, tableName);
103               userVsTable.put(userNames[mod], localTable);
104               result = localTable.get(get);
105             } else {
106               localTable = userVsTable.get(userNames[mod]);
107               result = localTable.get(get);
108             }
109             boolean isNullExpected = ((((int) keyToRead % specialPermCellInsertionFactor)) == 0);
110             getResultMetricUpdation(verify, rowKey, start, result, localTable, isNullExpected);
111           } catch (IOException e) {
112             recordFailure(keyToRead);
113           }
114           return null;
115         }
116       };
117       if (userNames != null && userNames.length > 0) {
118         int mod = ((int) keyToRead % userNames.length);
119         User user;
120         UserGroupInformation realUserUgi;
121         if(!users.containsKey(userNames[mod])) {
122           if(User.isHBaseSecurityEnabled(conf)) {
123             realUserUgi = LoadTestTool.loginAndReturnUGI(conf, userNames[mod]);
124           } else {
125             realUserUgi = UserGroupInformation.createRemoteUser(userNames[mod]);
126           }
127           user = User.create(realUserUgi);
128           users.put(userNames[mod], user);
129         } else {
130           user = users.get(userNames[mod]);
131         }
132         try {
133           user.runAs(action);
134         } catch (Exception e) {
135           recordFailure(keyToRead);
136         }
137       }
138     }
139 
140     private void recordFailure(final long keyToRead) {
141       numReadFailures.addAndGet(1);
142       LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "") + ", "
143           + "time from start: " + (System.currentTimeMillis() - startTimeMs) + " ms");
144     }
145   }
146 
147 }