/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.util;

import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.exceptions.OperationConflictException;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.util.StringUtils;

import com.google.common.base.Preconditions;

/** Creates multiple threads that update key/values in a table. */
public class MultiThreadedUpdater extends MultiThreadedWriterBase {
  private static final Log LOG = LogFactory.getLog(MultiThreadedUpdater.class);

  protected Set<HBaseUpdaterThread> updaters = new HashSet<HBaseUpdaterThread>();

  private MultiThreadedWriterBase writer = null;
  private boolean isBatchUpdate = false;
  private boolean ignoreNonceConflicts = false;
  private final double updatePercent;

  public MultiThreadedUpdater(LoadTestDataGenerator dataGen, Configuration conf,
      TableName tableName, double updatePercent) throws IOException {
    super(dataGen, conf, tableName, "U");
    this.updatePercent = updatePercent;
  }

  /** Whether to apply one batched update per row rather than a separate update for every column */
  public void setBatchUpdate(boolean isBatchUpdate) {
    this.isBatchUpdate = isBatchUpdate;
  }

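  /**
   * Links this updater to a concurrent writer so that updates trail behind the writer's
   * progress and never touch keys that have not been written yet.
   */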
  public void linkToWriter(MultiThreadedWriterBase writer) {
    this.writer = writer;
    writer.setTrackWroteKeys(true);
  }

  @Override
  public void start(long startKey, long endKey, int numThreads) throws IOException {
    super.start(startKey, endKey, numThreads);

    if (verbose) {
      LOG.debug("Updating keys [" + startKey + ", " + endKey + ")");
    }

    addUpdaterThreads(numThreads);

    startThreads(updaters);
  }

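  /** Creates the updater threads; they are started separately by {@link #start}. */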
  protected void addUpdaterThreads(int numThreads) throws IOException {
    for (int i = 0; i < numThreads; ++i) {
      HBaseUpdaterThread updater = new HBaseUpdaterThread(i);
      updaters.add(updater);
    }
  }

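  /**
   * Returns the next key to update, or {@code endKey} once the range is exhausted.
   * When linked to a writer, blocks until the writer has written that key, and skips
   * keys the writer failed on (recording them as failed here as well).
   */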
  private long getNextKeyToUpdate() {
    if (writer == null) {
      return nextKeyToWrite.getAndIncrement();
    }
    synchronized (this) {
      if (nextKeyToWrite.get() >= endKey) {
        // Finished the whole key range
        return endKey;
      }
      while (nextKeyToWrite.get() > writer.wroteUpToKey()) {
        Threads.sleepWithoutInterrupt(100);
      }
      long k = nextKeyToWrite.getAndIncrement();
      if (writer.failedToWriteKey(k)) {
        failedKeySet.add(k);
        return getNextKeyToUpdate();
      }
      return k;
    }
  }

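  /**
   * A single updater thread: pulls keys via {@link #getNextKeyToUpdate()} and, for a
   * configurable percentage of them, applies a mix of increment, append, put, and
   * delete mutations against every column family.
   */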
  protected class HBaseUpdaterThread extends Thread {
    protected final Table table;

    public HBaseUpdaterThread(int updaterId) throws IOException {
      setName(getClass().getSimpleName() + "_" + updaterId);
      table = createTable();
    }

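    /** Creates the table handle used by this thread; subclasses may override. */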
    protected HTableInterface createTable() throws IOException {
      return connection.getTable(tableName);
    }

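    /**
     * Main loop: for each selected key, increments the INCREMENT column and appends to
     * the MUTATE_INFO column in every family, then reads the row back and applies a
     * randomly chosen put, delete, or append to each of its other columns.
     */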
    @Override
    public void run() {
      try {
        long rowKeyBase;
        StringBuilder buf = new StringBuilder();
        byte[][] columnFamilies = dataGenerator.getColumnFamilies();
        while ((rowKeyBase = getNextKeyToUpdate()) < endKey) {
          if (RandomUtils.nextInt(100) < updatePercent) {
            byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase);
            Increment inc = new Increment(rowKey);
            Append app = new Append(rowKey);
            numKeys.addAndGet(1);
            int columnCount = 0;
            for (byte[] cf : columnFamilies) {
              long cfHash = Arrays.hashCode(cf);
              inc.addColumn(cf, INCREMENT, cfHash);
              buf.setLength(0); // Clear the buffer
              buf.append("#").append(Bytes.toString(INCREMENT));
              buf.append(":").append(MutationType.INCREMENT.getNumber());
              app.add(cf, MUTATE_INFO, Bytes.toBytes(buf.toString()));
              ++columnCount;
              if (!isBatchUpdate) {
                mutate(table, inc, rowKeyBase);
                numCols.addAndGet(1);
                inc = new Increment(rowKey);
                mutate(table, app, rowKeyBase);
                numCols.addAndGet(1);
                app = new Append(rowKey);
              }
              Get get = new Get(rowKey);
              get.addFamily(cf);
              try {
                get = dataGenerator.beforeGet(rowKeyBase, get);
              } catch (Exception e) {
                // Ideally won't happen
                LOG.warn("Failed to modify the get from the load generator, row = ["
                    + Bytes.toString(get.getRow()) + "], column family = ["
                    + Bytes.toString(cf) + "]", e);
              }
              Result result = getRow(get, rowKeyBase, cf);
              Map<byte[], byte[]> columnValues =
                result != null ? result.getFamilyMap(cf) : null;
              if (columnValues == null) {
                int specialPermCellInsertionFactor = Integer.parseInt(dataGenerator.getArgs()[2]);
                if ((int) rowKeyBase % specialPermCellInsertionFactor == 0) {
                  LOG.info("Null result expected for the rowkey " + Bytes.toString(rowKey));
                } else {
                  failedKeySet.add(rowKeyBase);
                  LOG.error("Failed to update the row with key = [" + Bytes.toString(rowKey)
                      + "], since we could not get the original row");
                }
              }
              if (columnValues != null) {
                for (byte[] column : columnValues.keySet()) {
                  if (Bytes.equals(column, INCREMENT) || Bytes.equals(column, MUTATE_INFO)) {
                    continue;
                  }
                  MutationType mt = MutationType
                      .valueOf(RandomUtils.nextInt(MutationType.values().length));
                  long columnHash = Arrays.hashCode(column);
                  long hashCode = cfHash + columnHash;
                  byte[] hashCodeBytes = Bytes.toBytes(hashCode);
                  byte[] checkedValue = HConstants.EMPTY_BYTE_ARRAY;
                  if (hashCode % 2 == 0) {
                    Cell kv = result.getColumnLatestCell(cf, column);
                    checkedValue = kv != null ? CellUtil.cloneValue(kv) : null;
                    Preconditions.checkNotNull(checkedValue,
                        "Column value to be checked should not be null");
                  }
                  buf.setLength(0); // Clear the buffer
                  buf.append("#").append(Bytes.toString(column)).append(":");
                  ++columnCount;
                  switch (mt) {
                  case PUT:
                    Put put = new Put(rowKey);
                    put.add(cf, column, hashCodeBytes);
                    mutate(table, put, rowKeyBase, rowKey, cf, column, checkedValue);
                    buf.append(MutationType.PUT.getNumber());
                    break;
                  case DELETE:
                    Delete delete = new Delete(rowKey);
                    // Delete all versions since a put
                    // could be called multiple times if CM is used
                    delete.deleteColumns(cf, column);
                    mutate(table, delete, rowKeyBase, rowKey, cf, column, checkedValue);
                    buf.append(MutationType.DELETE.getNumber());
                    break;
                  default:
                    buf.append(MutationType.APPEND.getNumber());
                    app.add(cf, column, hashCodeBytes);
                  }
                  app.add(cf, MUTATE_INFO, Bytes.toBytes(buf.toString()));
                  if (!isBatchUpdate) {
                    mutate(table, app, rowKeyBase);
                    numCols.addAndGet(1);
                    app = new Append(rowKey);
                  }
                }
              }
            }
            if (isBatchUpdate) {
              if (verbose) {
                LOG.debug("Preparing increment and append for key = ["
                  + Bytes.toString(rowKey) + "], " + columnCount + " columns");
              }
              mutate(table, inc, rowKeyBase);
              mutate(table, app, rowKeyBase);
              numCols.addAndGet(columnCount);
            }
          }
          if (trackWroteKeys) {
            wroteKeys.add(rowKeyBase);
          }
        }
      } finally {
        closeHTable();
        numThreadsWorking.decrementAndGet();
      }
    }

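    /** Closes this thread's table handle, logging rather than propagating any error. */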
    protected void closeHTable() {
      try {
        if (table != null) {
          table.close();
        }
      } catch (IOException e) {
        LOG.error("Error closing table", e);
      }
    }

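    /** Fetches the current version of the row; returns null (after logging) on failure. */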
    protected Result getRow(Get get, long rowKeyBase, byte[] cf) {
      Result result = null;
      try {
        result = table.get(get);
      } catch (IOException ie) {
        LOG.warn(
            "Failed to get the row for key = [" + Bytes.toString(get.getRow())
                + "], column family = [" + Bytes.toString(cf) + "]", ie);
      }
      return result;
    }

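    /**
     * Applies the mutation, timing the operation. Puts and deletes are issued as
     * checkAndPut/checkAndDelete against the expected current value; failures are
     * recorded in {@code failedKeySet}, and nonce conflicts are optionally ignored.
     */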
    public void mutate(Table table, Mutation m, long keyBase) {
      mutate(table, m, keyBase, null, null, null, null);
    }

    public void mutate(Table table, Mutation m,
        long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) {
      long start = System.currentTimeMillis();
      try {
        m = dataGenerator.beforeMutate(keyBase, m);
        if (m instanceof Increment) {
          table.increment((Increment)m);
        } else if (m instanceof Append) {
          table.append((Append)m);
        } else if (m instanceof Put) {
          table.checkAndPut(row, cf, q, v, (Put)m);
        } else if (m instanceof Delete) {
          table.checkAndDelete(row, cf, q, v, (Delete)m);
        } else {
          throw new IllegalArgumentException(
            "unsupported mutation " + m.getClass().getSimpleName());
        }
        totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
      } catch (IOException e) {
        if (ignoreNonceConflicts && (e instanceof OperationConflictException)) {
          LOG.info("Detected nonce conflict, ignoring: " + e.getMessage());
          totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
          return;
        }
        failedKeySet.add(keyBase);
        String exceptionInfo;
        if (e instanceof RetriesExhaustedWithDetailsException) {
          RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
          exceptionInfo = aggEx.getExhaustiveDescription();
        } else {
          exceptionInfo = StringUtils.stringifyException(e);
        }
        LOG.error("Failed to mutate: " + keyBase + " after "
            + (System.currentTimeMillis() - start)
            + "ms; region information: " + getRegionDebugInfoSafe(table, m.getRow())
            + "; errors: " + exceptionInfo);
      }
    }
  }

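  /** Waits for all updater threads to finish, then reports keys that failed to update. */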
  @Override
  public void waitForFinish() {
    super.waitForFinish();
    System.out.println("Failed to update keys: " + failedKeySet.size());
    for (Long key : failedKeySet) {
      System.out.println("Failed to update key: " + key);
    }
  }

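  /**
   * Applies the mutation as in {@link HBaseUpdaterThread}'s mutate, but usable without an
   * updater thread; nonce conflicts are always treated as failures here.
   */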
  public void mutate(Table table, Mutation m, long keyBase) {
    mutate(table, m, keyBase, null, null, null, null);
  }

  public void mutate(Table table, Mutation m,
      long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) {
    long start = System.currentTimeMillis();
    try {
      m = dataGenerator.beforeMutate(keyBase, m);
      if (m instanceof Increment) {
        table.increment((Increment)m);
      } else if (m instanceof Append) {
        table.append((Append)m);
      } else if (m instanceof Put) {
        table.checkAndPut(row, cf, q, v, (Put)m);
      } else if (m instanceof Delete) {
        table.checkAndDelete(row, cf, q, v, (Delete)m);
      } else {
        throw new IllegalArgumentException(
          "unsupported mutation " + m.getClass().getSimpleName());
      }
      totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
    } catch (IOException e) {
      failedKeySet.add(keyBase);
      String exceptionInfo;
      if (e instanceof RetriesExhaustedWithDetailsException) {
        RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e;
        exceptionInfo = aggEx.getExhaustiveDescription();
      } else {
        exceptionInfo = StringUtils.stringifyException(e);
      }
      LOG.error("Failed to mutate: " + keyBase + " after " + (System.currentTimeMillis() - start)
          + "ms; region information: " + getRegionDebugInfoSafe(table, m.getRow())
          + "; errors: " + exceptionInfo);
    }
  }

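  /** Whether nonce conflicts ({@link OperationConflictException}) should be ignored instead of counted as failures. */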
  public void setIgnoreNonceConflicts(boolean value) {
    this.ignoreNonceConflicts = value;
  }
}
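
For orientation, a minimal driver sketch. It assumes a running cluster, an existing table whose families match the generator, a companion MultiThreadedWriter in this package with a (dataGen, conf, tableName) constructor, and the DefaultDataGenerator from MultiThreadedAction; the table name, family, and all parameter values below are illustrative, not prescribed by this class.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MultiThreadedAction;
import org.apache.hadoop.hbase.util.MultiThreadedUpdater;
import org.apache.hadoop.hbase.util.MultiThreadedWriter;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;

// Hypothetical driver class; everything below is illustrative.
public class UpdaterUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName tableName = TableName.valueOf("load_test"); // assumed existing table
    byte[] cf = Bytes.toBytes("test_cf");                 // assumed column family
    // DefaultDataGenerator (from MultiThreadedAction in this package) is assumed:
    // args are min/max value size, min/max columns per key, column families.
    LoadTestDataGenerator dataGen =
        new MultiThreadedAction.DefaultDataGenerator(64, 64, 1, 10, cf);

    MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, tableName);
    MultiThreadedUpdater updater =
        new MultiThreadedUpdater(dataGen, conf, tableName, 30); // update ~30% of keys
    updater.linkToWriter(writer); // only touch keys the writer has already written

    writer.start(0, 10000, 4);    // write keys [0, 10000) with 4 threads
    updater.start(0, 10000, 4);   // update the same range, trailing the writer
    writer.waitForFinish();
    updater.waitForFinish();      // prints any keys that failed to update
  }
}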