View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements. See the NOTICE file distributed with this
6    * work for additional information regarding copyright ownership. The ASF
7    * licenses this file to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16   * License for the specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.util.List;
23  
24  import org.apache.commons.lang.NotImplementedException;
25  import org.apache.hadoop.classification.InterfaceAudience;
26  import org.apache.hadoop.hbase.KeyValue;
27  import org.apache.hadoop.hbase.KeyValue.KVComparator;
28  
29  /**
30   * ReversedKeyValueHeap is used for supporting reversed scanning. Compared with
31   * KeyValueHeap, its scanner comparator is a little different (see
32   * ReversedKVScannerComparator), all seek is backward seek(see
33   * {@link KeyValueScanner#backwardSeek}), and it will jump to the previous row
34   * if it is already at the end of one row when calling next().
35   */
36  @InterfaceAudience.Private
37  public class ReversedKeyValueHeap extends KeyValueHeap {
38  
39    /**
40     * @param scanners
41     * @param comparator
42     * @throws IOException
43     */
44    public ReversedKeyValueHeap(List<? extends KeyValueScanner> scanners,
45        KVComparator comparator) throws IOException {
46      super(scanners, new ReversedKVScannerComparator(comparator));
47    }
48  
49    @Override
50    public boolean seek(KeyValue seekKey) throws IOException {
51      throw new IllegalStateException(
52          "seek cannot be called on ReversedKeyValueHeap");
53    }
54  
55    @Override
56    public boolean reseek(KeyValue seekKey) throws IOException {
57      throw new IllegalStateException(
58          "reseek cannot be called on ReversedKeyValueHeap");
59    }
60  
61    @Override
62    public boolean requestSeek(KeyValue key, boolean forward, boolean useBloom)
63        throws IOException {
64      throw new IllegalStateException(
65          "requestSeek cannot be called on ReversedKeyValueHeap");
66    }
67  
68    @Override
69    public boolean seekToPreviousRow(KeyValue seekKey) throws IOException {
70      if (current == null) {
71        return false;
72      }
73      heap.add(current);
74      current = null;
75  
76      KeyValueScanner scanner;
77      while ((scanner = heap.poll()) != null) {
78        KeyValue topKey = scanner.peek();
79        if (comparator.getComparator().compareRows(topKey.getBuffer(),
80            topKey.getRowOffset(), topKey.getRowLength(), seekKey.getBuffer(),
81            seekKey.getRowOffset(), seekKey.getRowLength()) < 0) {
82          // Row of Top KeyValue is before Seek row.
83          heap.add(scanner);
84          current = pollRealKV();
85          return current != null;
86        }
87  
88        if (!scanner.seekToPreviousRow(seekKey)) {
89          scanner.close();
90        } else {
91          heap.add(scanner);
92        }
93      }
94  
95      // Heap is returning empty, scanner is done
96      return false;
97    }
98  
99    @Override
100   public boolean backwardSeek(KeyValue seekKey) throws IOException {
101     if (current == null) {
102       return false;
103     }
104     heap.add(current);
105     current = null;
106 
107     KeyValueScanner scanner;
108     while ((scanner = heap.poll()) != null) {
109       KeyValue topKey = scanner.peek();
110       if ((comparator.getComparator().matchingRows(seekKey, topKey) && comparator
111           .getComparator().compare(seekKey, topKey) <= 0)
112           || comparator.getComparator().compareRows(seekKey, topKey) > 0) {
113         heap.add(scanner);
114         current = pollRealKV();
115         return current != null;
116       }
117       if (!scanner.backwardSeek(seekKey)) {
118         scanner.close();
119       } else {
120         heap.add(scanner);
121       }
122     }
123     return false;
124   }
125 
126   @Override
127   public KeyValue next() throws IOException {
128     if (this.current == null) {
129       return null;
130     }
131     KeyValue kvReturn = this.current.next();
132     KeyValue kvNext = this.current.peek();
133     if (kvNext == null
134         || this.comparator.kvComparator.compareRows(kvNext, kvReturn) > 0) {
135       if (this.current.seekToPreviousRow(kvReturn)) {
136         this.heap.add(this.current);
137       } else {
138         this.current.close();
139       }
140       this.current = pollRealKV();
141     } else {
142       KeyValueScanner topScanner = this.heap.peek();
143       if (topScanner != null
144           && this.comparator.compare(this.current, topScanner) > 0) {
145         this.heap.add(this.current);
146         this.current = pollRealKV();
147       }
148     }
149     return kvReturn;
150   }
151 
152   /**
153    * In ReversedKVScannerComparator, we compare the row of scanners' peek values
154    * first, sort bigger one before the smaller one. Then compare the KeyValue if
155    * they have the equal row, sort smaller one before the bigger one
156    */
157   private static class ReversedKVScannerComparator extends
158       KVScannerComparator {
159 
160     /**
161      * Constructor
162      * @param kvComparator
163      */
164     public ReversedKVScannerComparator(KVComparator kvComparator) {
165       super(kvComparator);
166     }
167 
168     @Override
169     public int compare(KeyValueScanner left, KeyValueScanner right) {
170       int rowComparison = compareRows(left.peek(), right.peek());
171       if (rowComparison != 0) {
172         return -rowComparison;
173       }
174       return super.compare(left, right);
175     }
176 
177     /**
178      * Compares rows of two KeyValue
179      * @param left
180      * @param right
181      * @return less than 0 if left is smaller, 0 if equal etc..
182      */
183     public int compareRows(KeyValue left, KeyValue right) {
184       return super.kvComparator.compareRows(left, right);
185     }
186   }
187 
188   @Override
189   public boolean seekToLastRow() throws IOException {
190     throw new NotImplementedException("Not implemented");
191   }
192 }