View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.util;
20  
21  import java.io.IOException;
22  import java.util.PriorityQueue;
23  import java.util.Queue;
24  import java.util.Set;
25  import java.util.concurrent.ArrayBlockingQueue;
26  import java.util.concurrent.BlockingQueue;
27  import java.util.concurrent.ConcurrentSkipListSet;
28  import java.util.concurrent.TimeUnit;
29  import java.util.concurrent.atomic.AtomicLong;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.hbase.HRegionLocation;
35  import org.apache.hadoop.hbase.TableName;
36  import org.apache.hadoop.hbase.client.Table;
37  import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
38  
39  /** Creates multiple threads that write key/values into the */
40  public abstract class MultiThreadedWriterBase extends MultiThreadedAction {
41    private static final Log LOG = LogFactory.getLog(MultiThreadedWriterBase.class);
42  
43    /**
44     * A temporary place to keep track of inserted/updated keys. This is written to by
45     * all writers and is drained on a separate thread that populates
46     * {@link #wroteUpToKey}, the maximum key in the contiguous range of keys
47     * being inserted/updated. This queue is supposed to stay small.
48     */
49    protected BlockingQueue<Long> wroteKeys;
50  
51    /**
52     * This is the current key to be inserted/updated by any thread. Each thread does an
53     * atomic get and increment operation and inserts the current value.
54     */
55    protected AtomicLong nextKeyToWrite = new AtomicLong();
56  
57    /**
58     * The highest key in the contiguous range of keys .
59     */
60    protected AtomicLong wroteUpToKey = new AtomicLong();
61  
62    /** The sorted set of keys NOT inserted/updated by the writers */
63    protected Set<Long> failedKeySet = new ConcurrentSkipListSet<Long>();
64  
65    /**
66     * The total size of the temporary inserted/updated key set that have not yet lined
67     * up in a our contiguous sequence starting from startKey. Supposed to stay
68     * small.
69     */
70    protected AtomicLong wroteKeyQueueSize = new AtomicLong();
71  
72    /** Enable this if used in conjunction with a concurrent reader. */
73    protected boolean trackWroteKeys;
74  
75    public MultiThreadedWriterBase(LoadTestDataGenerator dataGen, Configuration conf,
76        TableName tableName, String actionLetter) throws IOException {
77      super(dataGen, conf, tableName, actionLetter);
78      this.wroteKeys = createWriteKeysQueue(conf);
79    }
80  
81    protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) {
82      return new ArrayBlockingQueue<Long>(10000);
83    }
84  
85    @Override
86    public void start(long startKey, long endKey, int numThreads) throws IOException {
87      super.start(startKey, endKey, numThreads);
88      nextKeyToWrite.set(startKey);
89      wroteUpToKey.set(startKey - 1);
90  
91      if (trackWroteKeys) {
92        new Thread(new WroteKeysTracker()).start();
93        numThreadsWorking.incrementAndGet();
94      }
95    }
96  
97    protected String getRegionDebugInfoSafe(Table table, byte[] rowKey) {
98      HRegionLocation cached = null, real = null;
99      try {
100       cached = connection.getRegionLocation(tableName, rowKey, false);
101       real = connection.getRegionLocation(tableName, rowKey, true);
102     } catch (Throwable t) {
103       // Cannot obtain region information for another catch block - too bad!
104     }
105     String result = "no information can be obtained";
106     if (cached != null) {
107       result = "cached: " + cached.toString();
108     }
109     if (real != null && real.getServerName() != null) {
110       if (cached != null && cached.getServerName() != null && real.equals(cached)) {
111         result += "; cache is up to date";
112       } else {
113         result = (cached != null) ? (result + "; ") : "";
114         result += "real: " + real.toString();
115       }
116     }
117     return result;
118   }
119 
120   /**
121    * A thread that keeps track of the highest key in the contiguous range of
122    * inserted/updated keys.
123    */
124   private class WroteKeysTracker implements Runnable {
125 
126     @Override
127     public void run() {
128       Thread.currentThread().setName(getClass().getSimpleName());
129       try {
130         long expectedKey = startKey;
131         Queue<Long> sortedKeys = new PriorityQueue<Long>();
132         while (expectedKey < endKey) {
133           // Block until a new element is available.
134           Long k;
135           try {
136             k = wroteKeys.poll(1, TimeUnit.SECONDS);
137           } catch (InterruptedException e) {
138             LOG.info("Inserted key tracker thread interrupted", e);
139             break;
140           }
141           if (k == null) {
142             continue;
143           }
144           if (k == expectedKey) {
145             // Skip the "sorted key" queue and consume this key.
146             wroteUpToKey.set(k);
147             ++expectedKey;
148           } else {
149             sortedKeys.add(k);
150           }
151 
152           // See if we have a sequence of contiguous keys lined up.
153           while (!sortedKeys.isEmpty()
154               && ((k = sortedKeys.peek()) == expectedKey)) {
155             sortedKeys.poll();
156             wroteUpToKey.set(k);
157             ++expectedKey;
158           }
159 
160           wroteKeyQueueSize.set(wroteKeys.size() + sortedKeys.size());
161         }
162       } catch (Exception ex) {
163         LOG.error("Error in inserted/updaed key tracker", ex);
164       } finally {
165         numThreadsWorking.decrementAndGet();
166       }
167     }
168   }
169 
170   public int getNumWriteFailures() {
171     return failedKeySet.size();
172   }
173 
174   /**
175    * The max key until which all keys have been inserted/updated (successfully or not).
176    * @return the last key that we have inserted/updated all keys up to (inclusive)
177    */
178   public long wroteUpToKey() {
179     return wroteUpToKey.get();
180   }
181 
182   public boolean failedToWriteKey(long k) {
183     return failedKeySet.contains(k);
184   }
185 
186   @Override
187   protected String progressInfo() {
188     StringBuilder sb = new StringBuilder();
189     appendToStatus(sb, "wroteUpTo", wroteUpToKey.get());
190     appendToStatus(sb, "wroteQSize", wroteKeyQueueSize.get());
191     return sb.toString();
192   }
193 
194   /**
195    * Used for a joint write/read workload. Enables tracking the last inserted/updated
196    * key, which requires a blocking queue and a consumer thread.
197    * @param enable whether to enable tracking the last inserted/updated key
198    */
199   public void setTrackWroteKeys(boolean enable) {
200     trackWroteKeys = enable;
201   }
202 }