View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.util;
20  
21  import java.io.IOException;
22  import java.util.PriorityQueue;
23  import java.util.Queue;
24  import java.util.Set;
25  import java.util.concurrent.ArrayBlockingQueue;
26  import java.util.concurrent.BlockingQueue;
27  import java.util.concurrent.ConcurrentSkipListSet;
28  import java.util.concurrent.TimeUnit;
29  import java.util.concurrent.atomic.AtomicLong;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.hbase.HRegionLocation;
35  import org.apache.hadoop.hbase.TableName;
36  import org.apache.hadoop.hbase.client.HTable;
37  import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
38  
39  /** Creates multiple threads that write key/values into the */
40  public abstract class MultiThreadedWriterBase extends MultiThreadedAction {
41    private static final Log LOG = LogFactory.getLog(MultiThreadedWriterBase.class);
42  
43    /**
44     * A temporary place to keep track of inserted/updated keys. This is written to by
45     * all writers and is drained on a separate thread that populates
46     * {@link #wroteUpToKey}, the maximum key in the contiguous range of keys
47     * being inserted/updated. This queue is supposed to stay small.
48     */
49    protected BlockingQueue<Long> wroteKeys = new ArrayBlockingQueue<Long>(10000);
50  
51    /**
52     * This is the current key to be inserted/updated by any thread. Each thread does an
53     * atomic get and increment operation and inserts the current value.
54     */
55    protected AtomicLong nextKeyToWrite = new AtomicLong();
56  
57    /**
58     * The highest key in the contiguous range of keys .
59     */
60    protected AtomicLong wroteUpToKey = new AtomicLong();
61  
62    /** The sorted set of keys NOT inserted/updated by the writers */
63    protected Set<Long> failedKeySet = new ConcurrentSkipListSet<Long>();
64  
65    /**
66     * The total size of the temporary inserted/updated key set that have not yet lined
67     * up in a our contiguous sequence starting from startKey. Supposed to stay
68     * small.
69     */
70    protected AtomicLong wroteKeyQueueSize = new AtomicLong();
71  
72    /** Enable this if used in conjunction with a concurrent reader. */
73    protected boolean trackWroteKeys;
74  
75    public MultiThreadedWriterBase(LoadTestDataGenerator dataGen, Configuration conf,
76        TableName tableName, String actionLetter) {
77      super(dataGen, conf, tableName, actionLetter);
78    }
79  
80    @Override
81    public void start(long startKey, long endKey, int numThreads) throws IOException {
82      super.start(startKey, endKey, numThreads);
83      nextKeyToWrite.set(startKey);
84      wroteUpToKey.set(startKey - 1);
85  
86      if (trackWroteKeys) {
87        new Thread(new WroteKeysTracker()).start();
88        numThreadsWorking.incrementAndGet();
89      }
90    }
91  
92    protected String getRegionDebugInfoSafe(HTable table, byte[] rowKey) {
93      HRegionLocation cached = null, real = null;
94      try {
95        cached = table.getRegionLocation(rowKey, false);
96        real = table.getRegionLocation(rowKey, true);
97      } catch (Throwable t) {
98        // Cannot obtain region information for another catch block - too bad!
99      }
100     String result = "no information can be obtained";
101     if (cached != null) {
102       result = "cached: " + cached.toString();
103     }
104     if (real != null) {
105       if (real.equals(cached)) {
106         result += "; cache is up to date";
107       } else {
108         result = (cached != null) ? (result + "; ") : "";
109         result += "real: " + real.toString();
110       }
111     }
112     return result;
113   }
114 
115   /**
116    * A thread that keeps track of the highest key in the contiguous range of
117    * inserted/updated keys.
118    */
119   private class WroteKeysTracker implements Runnable {
120 
121     @Override
122     public void run() {
123       Thread.currentThread().setName(getClass().getSimpleName());
124       try {
125         long expectedKey = startKey;
126         Queue<Long> sortedKeys = new PriorityQueue<Long>();
127         while (expectedKey < endKey) {
128           // Block until a new element is available.
129           Long k;
130           try {
131             k = wroteKeys.poll(1, TimeUnit.SECONDS);
132           } catch (InterruptedException e) {
133             LOG.info("Inserted key tracker thread interrupted", e);
134             break;
135           }
136           if (k == null) {
137             continue;
138           }
139           if (k == expectedKey) {
140             // Skip the "sorted key" queue and consume this key.
141             wroteUpToKey.set(k);
142             ++expectedKey;
143           } else {
144             sortedKeys.add(k);
145           }
146 
147           // See if we have a sequence of contiguous keys lined up.
148           while (!sortedKeys.isEmpty()
149               && ((k = sortedKeys.peek()) == expectedKey)) {
150             sortedKeys.poll();
151             wroteUpToKey.set(k);
152             ++expectedKey;
153           }
154 
155           wroteKeyQueueSize.set(wroteKeys.size() + sortedKeys.size());
156         }
157       } catch (Exception ex) {
158         LOG.error("Error in inserted/updaed key tracker", ex);
159       } finally {
160         numThreadsWorking.decrementAndGet();
161       }
162     }
163   }
164 
165   public int getNumWriteFailures() {
166     return failedKeySet.size();
167   }
168 
169   /**
170    * The max key until which all keys have been inserted/updated (successfully or not).
171    * @return the last key that we have inserted/updated all keys up to (inclusive)
172    */
173   public long wroteUpToKey() {
174     return wroteUpToKey.get();
175   }
176 
177   public boolean failedToWriteKey(long k) {
178     return failedKeySet.contains(k);
179   }
180 
181   @Override
182   protected String progressInfo() {
183     StringBuilder sb = new StringBuilder();
184     appendToStatus(sb, "wroteUpTo", wroteUpToKey.get());
185     appendToStatus(sb, "wroteQSize", wroteKeyQueueSize.get());
186     return sb.toString();
187   }
188 
189   /**
190    * Used for a joint write/read workload. Enables tracking the last inserted/updated
191    * key, which requires a blocking queue and a consumer thread.
192    * @param enable whether to enable tracking the last inserted/updated key
193    */
194   public void setTrackWroteKeys(boolean enable) {
195     trackWroteKeys = enable;
196   }
197 }