1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.util.List;
23
24 import org.apache.commons.lang.NotImplementedException;
25 import org.apache.hadoop.classification.InterfaceAudience;
26 import org.apache.hadoop.hbase.KeyValue;
27 import org.apache.hadoop.hbase.KeyValue.KVComparator;
28
29
30
31
32
33
34
35
36 @InterfaceAudience.Private
37 public class ReversedKeyValueHeap extends KeyValueHeap {
38
39
40
41
42
43
44 public ReversedKeyValueHeap(List<? extends KeyValueScanner> scanners,
45 KVComparator comparator) throws IOException {
46 super(scanners, new ReversedKVScannerComparator(comparator));
47 }
48
49 @Override
50 public boolean seek(KeyValue seekKey) throws IOException {
51 throw new IllegalStateException(
52 "seek cannot be called on ReversedKeyValueHeap");
53 }
54
55 @Override
56 public boolean reseek(KeyValue seekKey) throws IOException {
57 throw new IllegalStateException(
58 "reseek cannot be called on ReversedKeyValueHeap");
59 }
60
61 @Override
62 public boolean requestSeek(KeyValue key, boolean forward, boolean useBloom)
63 throws IOException {
64 throw new IllegalStateException(
65 "requestSeek cannot be called on ReversedKeyValueHeap");
66 }
67
68 @Override
69 public boolean seekToPreviousRow(KeyValue seekKey) throws IOException {
70 if (current == null) {
71 return false;
72 }
73 heap.add(current);
74 current = null;
75
76 KeyValueScanner scanner;
77 while ((scanner = heap.poll()) != null) {
78 KeyValue topKey = scanner.peek();
79 if (comparator.getComparator().compareRows(topKey.getBuffer(),
80 topKey.getRowOffset(), topKey.getRowLength(), seekKey.getBuffer(),
81 seekKey.getRowOffset(), seekKey.getRowLength()) < 0) {
82
83 heap.add(scanner);
84 current = pollRealKV();
85 return current != null;
86 }
87
88 if (!scanner.seekToPreviousRow(seekKey)) {
89 scanner.close();
90 } else {
91 heap.add(scanner);
92 }
93 }
94
95
96 return false;
97 }
98
99 @Override
100 public boolean backwardSeek(KeyValue seekKey) throws IOException {
101 if (current == null) {
102 return false;
103 }
104 heap.add(current);
105 current = null;
106
107 KeyValueScanner scanner;
108 while ((scanner = heap.poll()) != null) {
109 KeyValue topKey = scanner.peek();
110 if ((comparator.getComparator().matchingRows(seekKey, topKey) && comparator
111 .getComparator().compare(seekKey, topKey) <= 0)
112 || comparator.getComparator().compareRows(seekKey, topKey) > 0) {
113 heap.add(scanner);
114 current = pollRealKV();
115 return current != null;
116 }
117 if (!scanner.backwardSeek(seekKey)) {
118 scanner.close();
119 } else {
120 heap.add(scanner);
121 }
122 }
123 return false;
124 }
125
126 @Override
127 public KeyValue next() throws IOException {
128 if (this.current == null) {
129 return null;
130 }
131 KeyValue kvReturn = this.current.next();
132 KeyValue kvNext = this.current.peek();
133 if (kvNext == null
134 || this.comparator.kvComparator.compareRows(kvNext, kvReturn) > 0) {
135 if (this.current.seekToPreviousRow(kvReturn)) {
136 this.heap.add(this.current);
137 } else {
138 this.current.close();
139 }
140 this.current = pollRealKV();
141 } else {
142 KeyValueScanner topScanner = this.heap.peek();
143 if (topScanner != null
144 && this.comparator.compare(this.current, topScanner) > 0) {
145 this.heap.add(this.current);
146 this.current = pollRealKV();
147 }
148 }
149 return kvReturn;
150 }
151
152
153
154
155
156
157 private static class ReversedKVScannerComparator extends
158 KVScannerComparator {
159
160
161
162
163
164 public ReversedKVScannerComparator(KVComparator kvComparator) {
165 super(kvComparator);
166 }
167
168 @Override
169 public int compare(KeyValueScanner left, KeyValueScanner right) {
170 int rowComparison = compareRows(left.peek(), right.peek());
171 if (rowComparison != 0) {
172 return -rowComparison;
173 }
174 return super.compare(left, right);
175 }
176
177
178
179
180
181
182
183 public int compareRows(KeyValue left, KeyValue right) {
184 return super.kvComparator.compareRows(left, right);
185 }
186 }
187
188 @Override
189 public boolean seekToLastRow() throws IOException {
190 throw new NotImplementedException("Not implemented");
191 }
192 }