View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Iterator;
25  import java.util.List;
26  
27  import org.apache.hadoop.hbase.Cell;
28  import org.apache.hadoop.hbase.HBaseTestCase;
29  import org.apache.hadoop.hbase.KeyValue;
30  import org.apache.hadoop.hbase.testclassification.SmallTests;
31  import org.apache.hadoop.hbase.util.Bytes;
32  import org.apache.hadoop.hbase.util.CollectionBackedScanner;
33  import org.junit.experimental.categories.Category;
34  
35  @Category(SmallTests.class)
36  public class TestKeyValueHeap extends HBaseTestCase {
37    private static final boolean PRINT = false;
38  
39    List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
40  
41    private byte[] row1;
42    private byte[] fam1;
43    private byte[] col1;
44    private byte[] data;
45  
46    private byte[] row2;
47    private byte[] fam2;
48    private byte[] col2;
49  
50    private byte[] col3;
51    private byte[] col4;
52    private byte[] col5;
53  
54    public void setUp() throws Exception {
55      super.setUp();
56      data = Bytes.toBytes("data");
57      row1 = Bytes.toBytes("row1");
58      fam1 = Bytes.toBytes("fam1");
59      col1 = Bytes.toBytes("col1");
60      row2 = Bytes.toBytes("row2");
61      fam2 = Bytes.toBytes("fam2");
62      col2 = Bytes.toBytes("col2");
63      col3 = Bytes.toBytes("col3");
64      col4 = Bytes.toBytes("col4");
65      col5 = Bytes.toBytes("col5");
66    }
67  
68    public void testSorted() throws IOException{
69      //Cases that need to be checked are:
70      //1. The "smallest" KeyValue is in the same scanners as current
71      //2. Current scanner gets empty
72  
73      List<Cell> l1 = new ArrayList<Cell>();
74      l1.add(new KeyValue(row1, fam1, col5, data));
75      l1.add(new KeyValue(row2, fam1, col1, data));
76      l1.add(new KeyValue(row2, fam1, col2, data));
77      scanners.add(new Scanner(l1));
78  
79      List<Cell> l2 = new ArrayList<Cell>();
80      l2.add(new KeyValue(row1, fam1, col1, data));
81      l2.add(new KeyValue(row1, fam1, col2, data));
82      scanners.add(new Scanner(l2));
83  
84      List<Cell> l3 = new ArrayList<Cell>();
85      l3.add(new KeyValue(row1, fam1, col3, data));
86      l3.add(new KeyValue(row1, fam1, col4, data));
87      l3.add(new KeyValue(row1, fam2, col1, data));
88      l3.add(new KeyValue(row1, fam2, col2, data));
89      l3.add(new KeyValue(row2, fam1, col3, data));
90      scanners.add(new Scanner(l3));
91  
92      List<KeyValue> expected = new ArrayList<KeyValue>();
93      expected.add(new KeyValue(row1, fam1, col1, data));
94      expected.add(new KeyValue(row1, fam1, col2, data));
95      expected.add(new KeyValue(row1, fam1, col3, data));
96      expected.add(new KeyValue(row1, fam1, col4, data));
97      expected.add(new KeyValue(row1, fam1, col5, data));
98      expected.add(new KeyValue(row1, fam2, col1, data));
99      expected.add(new KeyValue(row1, fam2, col2, data));
100     expected.add(new KeyValue(row2, fam1, col1, data));
101     expected.add(new KeyValue(row2, fam1, col2, data));
102     expected.add(new KeyValue(row2, fam1, col3, data));
103 
104     //Creating KeyValueHeap
105     KeyValueHeap kvh =
106       new KeyValueHeap(scanners, KeyValue.COMPARATOR);
107 
108     List<Cell> actual = new ArrayList<Cell>();
109     while(kvh.peek() != null){
110       actual.add(kvh.next());
111     }
112 
113     assertEquals(expected.size(), actual.size());
114     for(int i=0; i<expected.size(); i++){
115       assertEquals(expected.get(i), actual.get(i));
116       if(PRINT){
117         System.out.println("expected " +expected.get(i)+
118             "\nactual   " +actual.get(i) +"\n");
119       }
120     }
121 
122     //Check if result is sorted according to Comparator
123     for(int i=0; i<actual.size()-1; i++){
124       int ret = KeyValue.COMPARATOR.compare(actual.get(i), actual.get(i+1));
125       assertTrue(ret < 0);
126     }
127 
128   }
129 
130   public void testSeek() throws IOException {
131     //Cases:
132     //1. Seek KeyValue that is not in scanner
133     //2. Check that smallest that is returned from a seek is correct
134 
135     List<Cell> l1 = new ArrayList<Cell>();
136     l1.add(new KeyValue(row1, fam1, col5, data));
137     l1.add(new KeyValue(row2, fam1, col1, data));
138     l1.add(new KeyValue(row2, fam1, col2, data));
139     scanners.add(new Scanner(l1));
140 
141     List<Cell> l2 = new ArrayList<Cell>();
142     l2.add(new KeyValue(row1, fam1, col1, data));
143     l2.add(new KeyValue(row1, fam1, col2, data));
144     scanners.add(new Scanner(l2));
145 
146     List<Cell> l3 = new ArrayList<Cell>();
147     l3.add(new KeyValue(row1, fam1, col3, data));
148     l3.add(new KeyValue(row1, fam1, col4, data));
149     l3.add(new KeyValue(row1, fam2, col1, data));
150     l3.add(new KeyValue(row1, fam2, col2, data));
151     l3.add(new KeyValue(row2, fam1, col3, data));
152     scanners.add(new Scanner(l3));
153 
154     List<KeyValue> expected = new ArrayList<KeyValue>();
155     expected.add(new KeyValue(row2, fam1, col1, data));
156 
157     //Creating KeyValueHeap
158     KeyValueHeap kvh =
159       new KeyValueHeap(scanners, KeyValue.COMPARATOR);
160 
161     KeyValue seekKv = new KeyValue(row2, fam1, null, null);
162     kvh.seek(seekKv);
163 
164     List<Cell> actual = new ArrayList<Cell>();
165     actual.add(kvh.peek());
166 
167     assertEquals(expected.size(), actual.size());
168     for(int i=0; i<expected.size(); i++){
169       assertEquals(expected.get(i), actual.get(i));
170       if(PRINT){
171         System.out.println("expected " +expected.get(i)+
172             "\nactual   " +actual.get(i) +"\n");
173       }
174     }
175 
176   }
177 
178   public void testScannerLeak() throws IOException {
179     // Test for unclosed scanners (HBASE-1927)
180 
181     List<Cell> l1 = new ArrayList<Cell>();
182     l1.add(new KeyValue(row1, fam1, col5, data));
183     l1.add(new KeyValue(row2, fam1, col1, data));
184     l1.add(new KeyValue(row2, fam1, col2, data));
185     scanners.add(new Scanner(l1));
186 
187     List<Cell> l2 = new ArrayList<Cell>();
188     l2.add(new KeyValue(row1, fam1, col1, data));
189     l2.add(new KeyValue(row1, fam1, col2, data));
190     scanners.add(new Scanner(l2));
191 
192     List<Cell> l3 = new ArrayList<Cell>();
193     l3.add(new KeyValue(row1, fam1, col3, data));
194     l3.add(new KeyValue(row1, fam1, col4, data));
195     l3.add(new KeyValue(row1, fam2, col1, data));
196     l3.add(new KeyValue(row1, fam2, col2, data));
197     l3.add(new KeyValue(row2, fam1, col3, data));
198     scanners.add(new Scanner(l3));
199 
200     List<Cell> l4 = new ArrayList<Cell>();
201     scanners.add(new Scanner(l4));
202 
203     //Creating KeyValueHeap
204     KeyValueHeap kvh = new KeyValueHeap(scanners, KeyValue.COMPARATOR);
205 
206     while(kvh.next() != null);
207 
208     for(KeyValueScanner scanner : scanners) {
209       assertTrue(((Scanner)scanner).isClosed());
210     }
211   }
212 
213   public void testScannerException() throws IOException {
214     // Test for NPE issue when exception happens in scanners (HBASE-13835)
215 
216     List<Cell> l1 = new ArrayList<Cell>();
217     l1.add(new KeyValue(row1, fam1, col5, data));
218     l1.add(new KeyValue(row2, fam1, col1, data));
219     l1.add(new KeyValue(row2, fam1, col2, data));
220     SeekScanner s1 = new SeekScanner(l1);
221     scanners.add(s1);
222 
223     List<Cell> l2 = new ArrayList<Cell>();
224     l2.add(new KeyValue(row1, fam1, col1, data));
225     l2.add(new KeyValue(row1, fam1, col2, data));
226     SeekScanner s2 = new SeekScanner(l2);
227     scanners.add(s2);
228 
229     List<Cell> l3 = new ArrayList<Cell>();
230     l3.add(new KeyValue(row1, fam1, col3, data));
231     l3.add(new KeyValue(row1, fam1, col4, data));
232     l3.add(new KeyValue(row1, fam2, col1, data));
233     l3.add(new KeyValue(row1, fam2, col2, data));
234     l3.add(new KeyValue(row2, fam1, col3, data));
235     SeekScanner s3 = new SeekScanner(l3);
236     scanners.add(s3);
237 
238     List<Cell> l4 = new ArrayList<Cell>();
239     SeekScanner s4 = new SeekScanner(l4);
240     scanners.add(s4);
241 
242     // Creating KeyValueHeap
243     KeyValueHeap kvh = new KeyValueHeap(scanners, KeyValue.COMPARATOR);
244 
245     try {
246       for (KeyValueScanner scanner : scanners) {
247         ((SeekScanner) scanner).setRealSeekDone(false);
248       }
249       while (kvh.next() != null);
250       // The pollRealKV should throw IOE.
251       assertTrue(false);
252     } catch (IOException ioe) {
253       kvh.close();
254     }
255 
256     // It implies there is no NPE thrown from kvh.close() if getting here
257     for (KeyValueScanner scanner : scanners) {
258       // Verify that close is called and only called once for each scanner
259       assertTrue(((SeekScanner) scanner).isClosed());
260       assertEquals(((SeekScanner) scanner).getClosedNum(), 1);
261     }
262   }
263 
264   private static class Scanner extends CollectionBackedScanner {
265     private Iterator<Cell> iter;
266     private Cell current;
267     private boolean closed = false;
268 
269     public Scanner(List<Cell> list) {
270       super(list);
271     }
272 
273     @Override
274     public void close(){
275       closed = true;
276     }
277 
278     public boolean isClosed() {
279       return closed;
280     }
281   }
282 
283   private static class SeekScanner extends Scanner {
284     private int closedNum = 0;
285     private boolean realSeekDone = true;
286 
287     public SeekScanner(List<Cell> list) {
288       super(list);
289     }
290 
291     @Override
292     public void close() {
293       super.close();
294       closedNum++;
295     }
296 
297     public int getClosedNum() {
298       return closedNum;
299     }
300 
301     @Override
302     public boolean realSeekDone() {
303       return realSeekDone;
304     }
305 
306     public void setRealSeekDone(boolean done) {
307       realSeekDone = done;
308     }
309 
310     @Override
311     public void enforceSeek() throws IOException {
312       throw new IOException("enforceSeek must not be called on a " + "non-lazy scanner");
313     }
314   }
315 }