View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.util;
19  
20  import static org.apache.hadoop.hbase.util.Order.ASCENDING;
21  import static org.apache.hadoop.hbase.util.Order.DESCENDING;
22  
23  import java.math.BigDecimal;
24  import java.math.BigInteger;
25  import java.math.MathContext;
26  import java.math.RoundingMode;
27  import java.nio.charset.Charset;
28  import java.util.Comparator;
29  
30  import org.apache.hadoop.classification.InterfaceAudience;
31  import org.apache.hadoop.classification.InterfaceStability;
32  
33  import com.google.common.annotations.VisibleForTesting;
34  
35  /**
36   * Utility class that handles ordered byte arrays. That is, unlike
37   * {@link Bytes}, these methods produce byte arrays which maintain the sort
38   * order of the original values.
39   * <h3>Encoding Format summary</h3>
40   * <p>
41   * Each value is encoded as one or more bytes. The first byte of the encoding,
42   * its meaning, and a terse description of the bytes that follow is given by
43   * the following table:
44   * <table>
45   * <tr><th>Content Type</th><th>Encoding</th></tr>
46   * <tr><td>NULL</td><td>0x05</td></tr>
47   * <tr><td>negative infinity</td><td>0x07</td></tr>
48   * <tr><td>negative large</td><td>0x08, ~E, ~M</td></tr>
49   * <tr><td>negative medium</td><td>0x13-E, ~M</td></tr>
50   * <tr><td>negative small</td><td>0x14, -E, ~M</td></tr>
51   * <tr><td>zero</td><td>0x15</td></tr>
52   * <tr><td>positive small</td><td>0x16, ~-E, M</td></tr>
53   * <tr><td>positive medium</td><td>0x17+E, M</td></tr>
54   * <tr><td>positive large</td><td>0x22, E, M</td></tr>
55   * <tr><td>positive infinity</td><td>0x23</td></tr>
 * <tr><td>NaN</td><td>0x26</td></tr>
 * <tr><td>fixed-length 32-bit integer</td><td>0x2b, I</td></tr>
 * <tr><td>fixed-length 64-bit integer</td><td>0x2c, I</td></tr>
 * <tr><td>fixed-length 8-bit integer</td><td>0x29</td></tr>
 * <tr><td>fixed-length 16-bit integer</td><td>0x2a</td></tr>
 * <tr><td>fixed-length 32-bit float</td><td>0x30, F</td></tr>
 * <tr><td>fixed-length 64-bit float</td><td>0x31, F</td></tr>
 * <tr><td>TEXT</td><td>0x34, T</td></tr>
 * <tr><td>variable length BLOB</td><td>0x37, B</td></tr>
 * <tr><td>byte-for-byte BLOB</td><td>0x38, X</td></tr>
66   * </table>
67   * </p>
68   *
69   * <h3>Null Encoding</h3>
70   * <p>
71   * Each value that is a NULL encodes as a single byte of 0x05. Since every
72   * other value encoding begins with a byte greater than 0x05, this forces NULL
73   * values to sort first.
74   * </p>
75   * <h3>Text Encoding</h3>
76   * <p>
 * Each text value begins with a single byte of 0x34 and ends with a single
78   * byte of 0x00. There are zero or more intervening bytes that encode the text
79   * value. The intervening bytes are chosen so that the encoding will sort in
80   * the desired collating order. The intervening bytes may not contain a 0x00
81   * character; the only 0x00 byte allowed in a text encoding is the final byte.
82   * </p>
83   * <p>
84   * The text encoding ends in 0x00 in order to ensure that when there are two
85   * strings where one is a prefix of the other that the shorter string will
86   * sort first.
87   * </p>
88   * <h3>Binary Encoding</h3>
89   * <p>
90   * There are two encoding strategies for binary fields, referred to as
91   * "BlobVar" and "BlobCopy". BlobVar is less efficient in both space and
92   * encoding time. It has no limitations on the range of encoded values.
93   * BlobCopy is a byte-for-byte copy of the input data followed by a
94   * termination byte. It is extremely fast to encode and decode. It carries the
95   * restriction of not allowing a 0x00 value in the input byte[] as this value
96   * is used as the termination byte.
97   * </p>
98   * <h4>BlobVar</h4>
99   * <p>
100  * "BlobVar" encodes the input byte[] in a manner similar to a variable length
101  * integer encoding. As with the other {@code OrderedBytes} encodings,
102  * the first encoded byte is used to indicate what kind of value follows. This
103  * header byte is 0x37 for BlobVar encoded values. As with the traditional
104  * varint encoding, the most significant bit of each subsequent encoded
105  * {@code byte} is used as a continuation marker. The 7 remaining bits
106  * contain the 7 most significant bits of the first unencoded byte. The next
107  * encoded byte starts with a continuation marker in the MSB. The least
108  * significant bit from the first unencoded byte follows, and the remaining 6
109  * bits contain the 6 MSBs of the second unencoded byte. The encoding
110  * continues, encoding 7 bytes on to 8 encoded bytes. The MSB of the final
111  * encoded byte contains a termination marker rather than a continuation
112  * marker, and any remaining bits from the final input byte. Any trailing bits
113  * in the final encoded byte are zeros.
114  * </p>
115  * <h4>BlobCopy</h4>
116  * <p>
117  * "BlobCopy" is a simple byte-for-byte copy of the input data. It uses 0x38
118  * as the header byte, and is terminated by 0x00 in the DESCENDING case. This
119  * alternative encoding is faster and more space-efficient, but it cannot
120  * accept values containing a 0x00 byte in DESCENDING order.
121  * </p>
122  * <h3>Variable-length Numeric Encoding</h3>
123  * <p>
124  * Numeric values must be coded so as to sort in numeric order. We assume that
125  * numeric values can be both integer and floating point values. Clients must
126  * be careful to use inspection methods for encoded values (such as
127  * {@link #isNumericInfinite(PositionedByteRange)} and
 * {@link #isNumericNaN(PositionedByteRange)}) to protect against decoding
 * values into objects which do not support these numeric concepts (such as
130  * {@link Long} and {@link BigDecimal}).
131  * </p>
132  * <p>
133  * Simplest cases first: If the numeric value is a NaN, then the encoding is a
 * single byte of 0x26. This causes NaN values to sort after every other
135  * numeric value.
136  * </p>
137  * <p>
 * If the numeric value is a negative infinity then the encoding is a single
 * byte of 0x07. Since every other numeric value has a larger initial byte,
 * this encoding ensures that negative infinity will sort prior to every
 * other numeric value.
142  * </p>
143  * <p>
 * If the numeric value is a positive infinity then the encoding is a single
 * byte of 0x23. Every other numeric value encoding except NaN begins with a
 * smaller byte, ensuring that positive infinity sorts after every numeric
 * value other than NaN. 0x23 is also smaller than 0x34, the initial byte of a
 * text value, ensuring that every numeric value sorts before every text value.
149  * </p>
150  * <p>
151  * If the numeric value is exactly zero then it is encoded as a single byte of
152  * 0x15. Finite negative values will have initial bytes of 0x08 through 0x14
153  * and finite positive values will have initial bytes of 0x16 through 0x22.
154  * </p>
155  * <p>
156  * For all numeric values, we compute a mantissa M and an exponent E. The
157  * mantissa is a base-100 representation of the value. The exponent E
158  * determines where to put the decimal point.
159  * </p>
160  * <p>
161  * Each centimal digit of the mantissa is stored in a byte. If the value of
162  * the centimal digit is X (hence X&ge;0 and X&le;99) then the byte value will
163  * be 2*X+1 for every byte of the mantissa, except for the last byte which
164  * will be 2*X+0. The mantissa must be the minimum number of bytes necessary
165  * to represent the value; trailing X==0 digits are omitted. This means that
166  * the mantissa will never contain a byte with the value 0x00.
167  * </p>
168  * <p>
169  * If we assume all digits of the mantissa occur to the right of the decimal
170  * point, then the exponent E is the power of one hundred by which one must
171  * multiply the mantissa to recover the original value.
172  * </p>
173  * <p>
174  * Values are classified as large, medium, or small according to the value of
175  * E. If E is 11 or more, the value is large. For E between 0 and 10, the
176  * value is medium. For E less than zero, the value is small.
177  * </p>
178  * <p>
179  * Large positive values are encoded as a single byte 0x22 followed by E as a
180  * varint and then M. Medium positive values are a single byte of 0x17+E
181  * followed by M. Small positive values are encoded as a single byte 0x16
182  * followed by the ones-complement of the varint for -E followed by M.
183  * </p>
184  * <p>
185  * Small negative values are encoded as a single byte 0x14 followed by -E as a
186  * varint and then the ones-complement of M. Medium negative values are
187  * encoded as a byte 0x13-E followed by the ones-complement of M. Large
188  * negative values consist of the single byte 0x08 followed by the
189  * ones-complement of the varint encoding of E followed by the ones-complement
190  * of M.
191  * </p>
192  * <h3>Fixed-length Integer Encoding</h3>
193  * <p>
194  * All 4-byte integers are serialized to a 5-byte, fixed-width, sortable byte
 * format. All 8-byte integers are serialized to the equivalent 9-byte format.
196  * Serialization is performed by writing a header byte, inverting the integer
197  * sign bit and writing the resulting bytes to the byte array in big endian
198  * order.
199  * </p>
200  * <h3>Fixed-length Floating Point Encoding</h3>
201  * <p>
202  * 32-bit and 64-bit floating point numbers are encoded to a 5-byte and 9-byte
203  * encoding format, respectively. The format is identical, save for the
204  * precision respected in each step of the operation.
205  * <p>
206  * This format ensures the following total ordering of floating point values:
207  * Float.NEGATIVE_INFINITY &lt; -Float.MAX_VALUE &lt; ... &lt;
 * -Float.MIN_VALUE &lt; -0.0 &lt; +0.0 &lt; Float.MIN_VALUE &lt; ... &lt;
209  * Float.MAX_VALUE &lt; Float.POSITIVE_INFINITY &lt; Float.NaN
210  * </p>
211  * <p>
212  * Floating point numbers are encoded as specified in IEEE 754. A 32-bit
213  * single precision float consists of a sign bit, 8-bit unsigned exponent
214  * encoded in offset-127 notation, and a 23-bit significand. The format is
215  * described further in the <a
216  * href="http://en.wikipedia.org/wiki/Single_precision"> Single Precision
217  * Floating Point Wikipedia page</a>
218  * </p>
219  * <p>
220  * The value of a normal float is -1 <sup>sign bit</sup> &times;
221  * 2<sup>exponent - 127</sup> &times; 1.significand
222  * </p>
223  * <p>
 * The IEEE 754 floating point format already preserves sort ordering for
225  * positive floating point numbers when the raw bytes are compared in most
226  * significant byte order. This is discussed further at <a href=
227  * "http://www.cygnus-software.com/papers/comparingfloats/comparingfloats.htm">
228  * http://www.cygnus-software.com/papers/comparingfloats/comparingfloats.htm</a>
229  * </p>
230  * <p>
 * Thus, we need only ensure that negative numbers sort in the exact
232  * opposite order as positive numbers (so that say, negative infinity is less
233  * than negative 1), and that all negative numbers compare less than any
234  * positive number. To accomplish this, we invert the sign bit of all floating
235  * point numbers, and we also invert the exponent and significand bits if the
236  * floating point number was negative.
237  * </p>
238  * <p>
239  * More specifically, we first store the floating point bits into a 32-bit int
240  * {@code j} using {@link Float#floatToIntBits}. This method collapses
241  * all NaNs into a single, canonical NaN value but otherwise leaves the bits
242  * unchanged. We then compute
243  * </p>
244  *
245  * <pre>
 * j &circ;= (j &gt;&gt; (Integer.SIZE - 1)) | Integer.MIN_VALUE
247  * </pre>
248  * <p>
249  * which inverts the sign bit and XOR's all other bits with the sign bit
250  * itself. Comparing the raw bytes of {@code j} in most significant byte
251  * order is equivalent to performing a single precision floating point
252  * comparison on the underlying bits (ignoring NaN comparisons, as NaNs don't
253  * compare equal to anything when performing floating point comparisons).
254  * </p>
255  * <p>
256  * The resulting integer is then converted into a byte array by serializing
257  * the integer one byte at a time in most significant byte order. The
258  * serialized integer is prefixed by a single header byte. All serialized
259  * values are 5 bytes in length.
260  * </p>
261  * <p>
262  * {@code OrderedBytes} encodings are heavily influenced by the <a href="
263  * http://sqlite.org/src4/doc/trunk/www/key_encoding.wiki">SQLite4 Key
 * Encoding</a>. Slight deviations are made in the interest of order
265  * correctness and user extensibility. Fixed-width {@code Long} and
266  * {@link Double} encodings are based on implementations from the now defunct
267  * Orderly library.
268  * </p>
269  */
270 @InterfaceAudience.Public
271 @InterfaceStability.Evolving
272 public class OrderedBytes {
273 
  /*
   * These constants define header bytes used to identify encoded values. Note
   * that the values here are not exhaustive as the Numeric format encodes
   * portions of its value within the header byte. The values listed here are
   * directly applied to persisted data -- DO NOT modify the values specified
   * here. Instead, gaps are placed intentionally between values so that new
   * implementations can be inserted into the total ordering enforced here.
   */
  private static final byte NULL = 0x05;
  // room for 1 expansion type
  private static final byte NEG_INF = 0x07;
  private static final byte NEG_LARGE = 0x08;
  // medium negative headers are computed by the encoder as NEG_MED_MAX - E
  private static final byte NEG_MED_MIN = 0x09;
  private static final byte NEG_MED_MAX = 0x13;
  private static final byte NEG_SMALL = 0x14;
  private static final byte ZERO = 0x15;
  private static final byte POS_SMALL = 0x16;
  // medium positive headers are computed by the encoder as POS_MED_MIN + E
  private static final byte POS_MED_MIN = 0x17;
  private static final byte POS_MED_MAX = 0x21;
  private static final byte POS_LARGE = 0x22;
  private static final byte POS_INF = 0x23;
  // room for 2 expansion types
  private static final byte NAN = 0x26;
  // room for 2 expansion types
  private static final byte FIXED_INT8 = 0x29;
  private static final byte FIXED_INT16 = 0x2a;
  private static final byte FIXED_INT32 = 0x2b;
  private static final byte FIXED_INT64 = 0x2c;
  // room for 3 expansion types
  private static final byte FIXED_FLOAT32 = 0x30;
  private static final byte FIXED_FLOAT64 = 0x31;
  // room for 2 expansion types
  private static final byte TEXT = 0x34;
  // room for 2 expansion types
  private static final byte BLOB_VAR = 0x37;
  private static final byte BLOB_COPY = 0x38;

  /*
   * The following constant values are used by encoding implementations
   */

  /** Character set exposed for text handling: UTF-8. */
  public static final Charset UTF8 = Charset.forName("UTF-8");
  // termination byte; see the Text and BlobCopy descriptions in the class comment
  private static final byte TERM = 0x00;
  // Thresholds compared against abs(val) while the numeric encoders shift the
  // decimal point to determine the exponent E.
  private static final BigDecimal E8 = BigDecimal.valueOf(1e8);
  private static final BigDecimal E32 = BigDecimal.valueOf(1e32);
  private static final BigDecimal EN2 = BigDecimal.valueOf(1e-2);
  private static final BigDecimal EN10 = BigDecimal.valueOf(1e-10);

  /**
   * Max precision guaranteed to fit into a {@code long}.
   * <p>
   * NOTE(review): a {@code long} holds at most 19 decimal digits, so a
   * precision of 31 cannot literally fit into a {@code long}; confirm the
   * intended meaning of this statement. The value is used as the precision of
   * {@link #DEFAULT_MATH_CONTEXT}.
   * </p>
   */
  public static final int MAX_PRECISION = 31;

  /**
   * The context used to normalize {@link BigDecimal} values: at most
   * {@link #MAX_PRECISION} significant digits, rounded with
   * {@link RoundingMode#HALF_UP}.
   */
  public static final MathContext DEFAULT_MATH_CONTEXT =
      new MathContext(MAX_PRECISION, RoundingMode.HALF_UP);
332 
333   /**
334    * Creates the standard exception when the encoded header byte is unexpected for the decoding
335    * context.
336    * @param header value used in error message.
337    */
338   private static IllegalArgumentException unexpectedHeader(byte header) {
339     throw new IllegalArgumentException("unexpected value in first byte: 0x"
340         + Long.toHexString(header));
341   }
342 
343   /**
344    * Perform unsigned comparison between two long values. Conforms to the same interface as
345    * {@link Comparator#compare(Object, Object)}.
346    */
347   private static int unsignedCmp(long x1, long x2) {
348     int cmp;
349     if ((cmp = (x1 < x2 ? -1 : (x1 == x2 ? 0 : 1))) == 0) return 0;
350     // invert the result when either value is negative
351     if ((x1 < 0) != (x2 < 0)) return -cmp;
352     return cmp;
353   }
354 
355   /**
356    * Write a 32-bit unsigned integer to {@code dst} as 4 big-endian bytes.
357    * @return number of bytes written.
358    */
359   private static int putUint32(PositionedByteRange dst, int val) {
360     dst.put((byte) (val >>> 24))
361        .put((byte) (val >>> 16))
362        .put((byte) (val >>> 8))
363        .put((byte) val);
364     return 4;
365   }
366 
367   /**
368    * Encode an unsigned 64-bit unsigned integer {@code val} into {@code dst}.
369    * @param dst The destination to which encoded bytes are written.
370    * @param val The value to write.
371    * @param comp Compliment the encoded value when {@code comp} is true.
372    * @return number of bytes written.
373    */
374   @VisibleForTesting
375   static int putVaruint64(PositionedByteRange dst, long val, boolean comp) {
376     int w, y, len = 0;
377     final int offset = dst.getOffset(), start = dst.getPosition();
378     byte[] a = dst.getBytes();
379     Order ord = comp ? DESCENDING : ASCENDING;
380     if (-1 == unsignedCmp(val, 241L)) {
381       dst.put((byte) val);
382       len = dst.getPosition() - start;
383       ord.apply(a, offset + start, len);
384       return len;
385     }
386     if (-1 == unsignedCmp(val, 2288L)) {
387       y = (int) (val - 240);
388       dst.put((byte) (y / 256 + 241))
389          .put((byte) (y % 256));
390       len = dst.getPosition() - start;
391       ord.apply(a, offset + start, len);
392       return len;
393     }
394     if (-1 == unsignedCmp(val, 67824L)) {
395       y = (int) (val - 2288);
396       dst.put((byte) 249)
397          .put((byte) (y / 256))
398          .put((byte) (y % 256));
399       len = dst.getPosition() - start;
400       ord.apply(a, offset + start, len);
401       return len;
402     }
403     y = (int) val;
404     w = (int) (val >>> 32);
405     if (w == 0) {
406       if (-1 == unsignedCmp(y, 16777216L)) {
407         dst.put((byte) 250)
408            .put((byte) (y >>> 16))
409            .put((byte) (y >>> 8))
410            .put((byte) y);
411         len = dst.getPosition() - start;
412         ord.apply(a, offset + start, len);
413         return len;
414       }
415       dst.put((byte) 251);
416       putUint32(dst, y);
417       len = dst.getPosition() - start;
418       ord.apply(a, offset + start, len);
419       return len;
420     }
421     if (-1 == unsignedCmp(w, 256L)) {
422       dst.put((byte) 252)
423          .put((byte) w);
424       putUint32(dst, y);
425       len = dst.getPosition() - start;
426       ord.apply(a, offset + start, len);
427       return len;
428     }
429     if (-1 == unsignedCmp(w, 65536L)) {
430       dst.put((byte) 253)
431          .put((byte) (w >>> 8))
432          .put((byte) w);
433       putUint32(dst, y);
434       len = dst.getPosition() - start;
435       ord.apply(a, offset + start, len);
436       return len;
437     }
438     if (-1 == unsignedCmp(w, 16777216L)) {
439       dst.put((byte) 254)
440          .put((byte) (w >>> 16))
441          .put((byte) (w >>> 8))
442          .put((byte) w);
443       putUint32(dst, y);
444       len = dst.getPosition() - start;
445       ord.apply(a, offset + start, len);
446       return len;
447     }
448     dst.put((byte) 255);
449     putUint32(dst, w);
450     putUint32(dst, y);
451     len = dst.getPosition() - start;
452     ord.apply(a, offset + start, len);
453     return len;
454   }
455 
456   /**
457    * Inspect {@code src} for an encoded varuint64 for its length in bytes.
458    * Preserves the state of {@code src}.
459    * @param src source buffer
460    * @param comp if true, parse the compliment of the value.
461    * @return the number of bytes consumed by this value.
462    */
463   @VisibleForTesting
464   static int lengthVaruint64(PositionedByteRange src, boolean comp) {
465     int a0 = (comp ? DESCENDING : ASCENDING).apply(src.peek()) & 0xff;
466     if (a0 <= 240) return 1;
467     if (a0 >= 241 && a0 <= 248) return 2;
468     if (a0 == 249) return 3;
469     if (a0 == 250) return 4;
470     if (a0 == 251) return 5;
471     if (a0 == 252) return 6;
472     if (a0 == 253) return 7;
473     if (a0 == 254) return 8;
474     if (a0 == 255) return 9;
475     throw unexpectedHeader(src.peek());
476   }
477 
478   /**
479    * Skip {@code src} over the encoded varuint64.
480    * @param src source buffer
481    * @param cmp if true, parse the compliment of the value.
482    * @return the number of bytes skipped.
483    */
484   @VisibleForTesting
485   static int skipVaruint64(PositionedByteRange src, boolean cmp) {
486     final int len = lengthVaruint64(src, cmp);
487     src.setPosition(src.getPosition() + len);
488     return len;
489   }
490 
491   /**
492    * Decode a sequence of bytes in {@code src} as a varuint64. Compliment the
493    * encoded value when {@code comp} is true.
494    * @return the decoded value.
495    */
496   @VisibleForTesting
497   static long getVaruint64(PositionedByteRange src, boolean comp) {
498     assert src.getRemaining() >= lengthVaruint64(src, comp);
499     final long ret;
500     Order ord = comp ? DESCENDING : ASCENDING;
501     byte x = src.get();
502     final int a0 = ord.apply(x) & 0xff, a1, a2, a3, a4, a5, a6, a7, a8;
503     if (-1 == unsignedCmp(a0, 241)) {
504       return a0;
505     }
506     x = src.get();
507     a1 = ord.apply(x) & 0xff;
508     if (-1 == unsignedCmp(a0, 249)) {
509       return (a0 - 241) * 256 + a1 + 240;
510     }
511     x = src.get();
512     a2 = ord.apply(x) & 0xff;
513     if (a0 == 249) {
514       return 2288 + 256 * a1 + a2;
515     }
516     x = src.get();
517     a3 = ord.apply(x) & 0xff;
518     if (a0 == 250) {
519       return (a1 << 16) | (a2 << 8) | a3;
520     }
521     x = src.get();
522     a4 = ord.apply(x) & 0xff;
523     ret = (((long) a1) << 24) | (a2 << 16) | (a3 << 8) | a4;
524     if (a0 == 251) {
525       return ret;
526     }
527     x = src.get();
528     a5 = ord.apply(x) & 0xff;
529     if (a0 == 252) {
530       return (ret << 8) | a5;
531     }
532     x = src.get();
533     a6 = ord.apply(x) & 0xff;
534     if (a0 == 253) {
535       return (ret << 16) | (a5 << 8) | a6;
536     }
537     x = src.get();
538     a7 = ord.apply(x) & 0xff;
539     if (a0 == 254) {
540       return (ret << 24) | (a5 << 16) | (a6 << 8) | a7;
541     }
542     x = src.get();
543     a8 = ord.apply(x) & 0xff;
544     return (ret << 32) | (((long) a5) << 24) | (a6 << 16) | (a7 << 8) | a8;
545   }
546 
547   /**
548    * Strip all trailing zeros to ensure that no digit will be zero and round
549    * using our default context to ensure precision doesn't exceed max allowed.
550    * From Phoenix's {@code NumberUtil}.
551    * @return new {@link BigDecimal} instance
552    */
553   @VisibleForTesting
554   static BigDecimal normalize(BigDecimal val) {
555     return null == val ? null : val.stripTrailingZeros().round(DEFAULT_MATH_CONTEXT);
556   }
557 
558   /**
559    * Read significand digits from {@code src} according to the magnitude
560    * of {@code e}.
561    * @param src The source from which to read encoded digits.
562    * @param e The magnitude of the first digit read.
563    * @param comp Treat encoded bytes as compliments when {@code comp} is true.
564    * @return The decoded value.
565    * @throws IllegalArgumentException when read exceeds the remaining length
566    *     of {@code src}.
567    */
568   private static BigDecimal decodeSignificand(PositionedByteRange src, int e, boolean comp) {
569     // TODO: can this be made faster?
570     byte[] a = src.getBytes();
571     final int start = src.getPosition(), offset = src.getOffset(), remaining = src.getRemaining();
572     Order ord = comp ? DESCENDING : ASCENDING;
573     BigDecimal m = BigDecimal.ZERO;
574     e--;
575     for (int i = 0;; i++) {
576       if (i > remaining) {
577         // we've exceeded this range's window
578         src.setPosition(start);
579         throw new IllegalArgumentException(
580             "Read exceeds range before termination byte found. offset: " + offset + " position: "
581                 + (start + i));
582       }
583       // base-100 digits are encoded as val * 2 + 1 except for the termination digit.
584       m = m.add( // m +=
585         new BigDecimal(BigInteger.ONE, e * -2).multiply( // 100 ^ p * [decoded digit]
586           BigDecimal.valueOf((ord.apply(a[offset + start + i]) & 0xff) / 2)));
587       e--;
588       // detect termination digit
589       if ((ord.apply(a[offset + start + i]) & 1) == 0) {
590         src.setPosition(start + i + 1);
591         break;
592       }
593     }
594     return normalize(m);
595   }
596 
597   /**
598    * Skip {@code src} over the significand bytes.
599    * @param src The source from which to read encoded digits.
600    * @param comp Treat encoded bytes as compliments when {@code comp} is true.
601    * @return the number of bytes skipped.
602    */
603   private static int skipSignificand(PositionedByteRange src, boolean comp) {
604     byte[] a = src.getBytes();
605     final int offset = src.getOffset(), start = src.getPosition();
606     int i = src.getPosition();
607     while (((comp ? DESCENDING : ASCENDING).apply(a[offset + i++]) & 1) != 0)
608       ;
609     src.setPosition(i);
610     return i - start;
611   }
612 
613   /**
614    * <p>
615    * Encode the small magnitude floating point number {@code val} using the
616    * key encoding. The caller guarantees that 1.0 > abs(val) > 0.0.
617    * </p>
618    * <p>
619    * A floating point value is encoded as an integer exponent {@code E} and a
620    * mantissa {@code M}. The original value is equal to {@code (M * 100^E)}.
621    * {@code E} is set to the smallest value possible without making {@code M}
622    * greater than or equal to 1.0.
623    * </p>
624    * <p>
625    * For this routine, {@code E} will always be zero or negative, since the
626    * original value is less than one. The encoding written by this routine is
627    * the ones-complement of the varint of the negative of {@code E} followed
628    * by the mantissa:
629    * <pre>
630    *   Encoding:   ~-E  M
631    * </pre>
632    * </p>
633    * @param dst The destination to which encoded digits are written.
634    * @param val The value to encode.
635    * @return the number of bytes written.
636    */
637   private static int encodeNumericSmall(PositionedByteRange dst, BigDecimal val) {
638     // TODO: this can be done faster?
639     // assert 1.0 > abs(val) > 0.0
640     BigDecimal abs = val.abs();
641     assert BigDecimal.ZERO.compareTo(abs) < 0 && BigDecimal.ONE.compareTo(abs) > 0;
642     byte[] a = dst.getBytes();
643     boolean isNeg = val.signum() == -1;
644     final int offset = dst.getOffset(), start = dst.getPosition();
645     int e = 0, d, startM;
646 
647     if (isNeg) { /* Small negative number: 0x14, -E, ~M */
648       dst.put(NEG_SMALL);
649     } else { /* Small positive number: 0x16, ~-E, M */
650       dst.put(POS_SMALL);
651     }
652 
653     // normalize abs(val) to determine E
654     while (abs.compareTo(EN10) < 0) { abs = abs.movePointRight(8); e += 4; }
655     while (abs.compareTo(EN2) < 0) { abs = abs.movePointRight(2); e++; }
656 
657     putVaruint64(dst, e, !isNeg); // encode appropriate E value.
658 
659     // encode M by peeling off centimal digits, encoding x as 2x+1
660     startM = dst.getPosition();
661     // TODO: 18 is an arbitrary encoding limit. Reevaluate once we have a better handling of
662     // numeric scale.
663     for (int i = 0; i < 18 && abs.compareTo(BigDecimal.ZERO) != 0; i++) {
664       abs = abs.movePointRight(2);
665       d = abs.intValue();
666       dst.put((byte) ((2 * d + 1) & 0xff));
667       abs = abs.subtract(BigDecimal.valueOf(d));
668     }
669     a[offset + dst.getPosition() - 1] &= 0xfe; // terminal digit should be 2x
670     if (isNeg) {
671       // negative values encoded as ~M
672       DESCENDING.apply(a, offset + startM, dst.getPosition() - startM);
673     }
674     return dst.getPosition() - start;
675   }
676 
677   /**
678    * Encode the large magnitude floating point number {@code val} using
679    * the key encoding. The caller guarantees that {@code val} will be
680    * finite and abs(val) >= 1.0.
681    * <p>
682    * A floating point value is encoded as an integer exponent {@code E}
683    * and a mantissa {@code M}. The original value is equal to
684    * {@code (M * 100^E)}. {@code E} is set to the smallest value
685    * possible without making {@code M} greater than or equal to 1.0.
686    * </p>
687    * <p>
688    * Each centimal digit of the mantissa is stored in a byte. If the value of
689    * the centimal digit is {@code X} (hence {@code X>=0} and
690    * {@code X<=99}) then the byte value will be {@code 2*X+1} for
691    * every byte of the mantissa, except for the last byte which will be
692    * {@code 2*X+0}. The mantissa must be the minimum number of bytes
693    * necessary to represent the value; trailing {@code X==0} digits are
694    * omitted. This means that the mantissa will never contain a byte with the
695    * value {@code 0x00}.
696    * </p>
697    * <p>
698    * If {@code E > 10}, then this routine writes of {@code E} as a
699    * varint followed by the mantissa as described above. Otherwise, if
700    * {@code E <= 10}, this routine only writes the mantissa and leaves
701    * the {@code E} value to be encoded as part of the opening byte of the
702    * field by the calling function.
703    *
704    * <pre>
705    *   Encoding:  M       (if E<=10)
706    *              E M     (if E>10)
707    * </pre>
708    * </p>
709    * @param dst The destination to which encoded digits are written.
710    * @param val The value to encode.
711    * @return the number of bytes written.
712    */
713   private static int encodeNumericLarge(PositionedByteRange dst, BigDecimal val) {
714     // TODO: this can be done faster
715     BigDecimal abs = val.abs();
716     byte[] a = dst.getBytes();
717     boolean isNeg = val.signum() == -1;
718     final int start = dst.getPosition(), offset = dst.getOffset();
719     int e = 0, d, startM;
720 
721     if (isNeg) { /* Large negative number: 0x08, ~E, ~M */
722       dst.put(NEG_LARGE);
723     } else { /* Large positive number: 0x22, E, M */
724       dst.put(POS_LARGE);
725     }
726 
727     // normalize abs(val) to determine E
728     while (abs.compareTo(E32) >= 0 && e <= 350) { abs = abs.movePointLeft(32); e +=16; }
729     while (abs.compareTo(E8) >= 0 && e <= 350) { abs = abs.movePointLeft(8); e+= 4; }
730     while (abs.compareTo(BigDecimal.ONE) >= 0 && e <= 350) { abs = abs.movePointLeft(2); e++; }
731 
732     // encode appropriate header byte and/or E value.
733     if (e > 10) { /* large number, write out {~,}E */
734       putVaruint64(dst, e, isNeg);
735     } else {
736       if (isNeg) { /* Medium negative number: 0x13-E, ~M */
737         dst.put(start, (byte) (NEG_MED_MAX - e));
738       } else { /* Medium positive number: 0x17+E, M */
739         dst.put(start, (byte) (POS_MED_MIN + e));
740       }
741     }
742 
743     // encode M by peeling off centimal digits, encoding x as 2x+1
744     startM = dst.getPosition();
745     // TODO: 18 is an arbitrary encoding limit. Reevaluate once we have a better handling of
746     // numeric scale.
747     for (int i = 0; i < 18 && abs.compareTo(BigDecimal.ZERO) != 0; i++) {
748       abs = abs.movePointRight(2);
749       d = abs.intValue();
750       dst.put((byte) (2 * d + 1));
751       abs = abs.subtract(BigDecimal.valueOf(d));
752     }
753 
754     a[offset + dst.getPosition() - 1] &= 0xfe; // terminal digit should be 2x
755     if (isNeg) {
756       // negative values encoded as ~M
757       DESCENDING.apply(a, offset + startM, dst.getPosition() - startM);
758     }
759     return dst.getPosition() - start;
760   }
761 
762   /**
763    * Encode a numerical value using the variable-length encoding.
764    * @param dst The destination to which encoded digits are written.
765    * @param val The value to encode.
766    * @param ord The {@link Order} to respect while encoding {@code val}.
767    * @return the number of bytes written.
768    */
769   public static int encodeNumeric(PositionedByteRange dst, long val, Order ord) {
770     return encodeNumeric(dst, BigDecimal.valueOf(val), ord);
771   }
772 
773   /**
774    * Encode a numerical value using the variable-length encoding.
775    * @param dst The destination to which encoded digits are written.
776    * @param val The value to encode.
777    * @param ord The {@link Order} to respect while encoding {@code val}.
778    * @return the number of bytes written.
779    */
780   public static int encodeNumeric(PositionedByteRange dst, double val, Order ord) {
781     if (val == 0.0) {
782       dst.put(ord.apply(ZERO));
783       return 1;
784     }
785     if (Double.isNaN(val)) {
786       dst.put(ord.apply(NAN));
787       return 1;
788     }
789     if (val == Double.NEGATIVE_INFINITY) {
790       dst.put(ord.apply(NEG_INF));
791       return 1;
792     }
793     if (val == Double.POSITIVE_INFINITY) {
794       dst.put(ord.apply(POS_INF));
795       return 1;
796     }
797     return encodeNumeric(dst, BigDecimal.valueOf(val), ord);
798   }
799 
800   /**
801    * Encode a numerical value using the variable-length encoding.
802    * @param dst The destination to which encoded digits are written.
803    * @param val The value to encode.
804    * @param ord The {@link Order} to respect while encoding {@code val}.
805    * @return the number of bytes written.
806    */
807   public static int encodeNumeric(PositionedByteRange dst, BigDecimal val, Order ord) {
808     final int len, offset = dst.getOffset(), start = dst.getPosition();
809     if (null == val) {
810       return encodeNull(dst, ord);
811     } else if (BigDecimal.ZERO.compareTo(val) == 0) {
812       dst.put(ord.apply(ZERO));
813       return 1;
814     }
815     BigDecimal abs = val.abs();
816     if (BigDecimal.ONE.compareTo(abs) <= 0) { // abs(v) >= 1.0
817       len = encodeNumericLarge(dst, normalize(val));
818     } else { // 1.0 > abs(v) >= 0.0
819       len = encodeNumericSmall(dst, normalize(val));
820     }
821     ord.apply(dst.getBytes(), offset + start, len);
822     return len;
823   }
824 
825   /**
826    * Decode a {@link BigDecimal} from {@code src}. Assumes {@code src} encodes
827    * a value in Numeric encoding and is within the valid range of
828    * {@link BigDecimal} values. {@link BigDecimal} does not support {@code NaN}
829    * or {@code Infinte} values.
830    * @see #decodeNumericAsDouble(PositionedByteRange)
831    */
832   private static BigDecimal decodeNumericValue(PositionedByteRange src) {
833     final int e;
834     byte header = src.get();
835     boolean dsc = -1 == Integer.signum(header);
836     header = dsc ? DESCENDING.apply(header) : header;
837 
838     if (header == NULL) return null;
839     if (header == NEG_LARGE) { /* Large negative number: 0x08, ~E, ~M */
840       e = (int) getVaruint64(src, !dsc);
841       return decodeSignificand(src, e, !dsc).negate();
842     }
843     if (header >= NEG_MED_MIN && header <= NEG_MED_MAX) {
844       /* Medium negative number: 0x13-E, ~M */
845       e = NEG_MED_MAX - header;
846       return decodeSignificand(src, e, !dsc).negate();
847     }
848     if (header == NEG_SMALL) { /* Small negative number: 0x14, -E, ~M */
849       e = (int) -getVaruint64(src, dsc);
850       return decodeSignificand(src, e, !dsc).negate();
851     }
852     if (header == ZERO) {
853       return BigDecimal.ZERO;
854     }
855     if (header == POS_SMALL) { /* Small positive number: 0x16, ~-E, M */
856       e = (int) -getVaruint64(src, !dsc);
857       return decodeSignificand(src, e, dsc);
858     }
859     if (header >= POS_MED_MIN && header <= POS_MED_MAX) {
860       /* Medium positive number: 0x17+E, M */
861       e = header - POS_MED_MIN;
862       return decodeSignificand(src, e, dsc);
863     }
864     if (header == POS_LARGE) { /* Large positive number: 0x22, E, M */
865       e = (int) getVaruint64(src, dsc);
866       return decodeSignificand(src, e, dsc);
867     }
868     throw unexpectedHeader(header);
869   }
870 
871   /**
872    * Decode a primitive {@code double} value from the Numeric encoding. Numeric
873    * encoding is based on {@link BigDecimal}; in the event the encoded value is
874    * larger than can be represented in a {@code double}, this method performs
875    * an implicit narrowing conversion as described in
876    * {@link BigDecimal#doubleValue()}.
877    * @throws NullPointerException when the encoded value is {@code NULL}.
878    * @throws IllegalArgumentException when the encoded value is not a Numeric.
879    * @see #encodeNumeric(PositionedByteRange, double, Order)
880    * @see BigDecimal#doubleValue()
881    */
882   public static double decodeNumericAsDouble(PositionedByteRange src) {
883     // TODO: should an encoded NULL value throw unexpectedHeader() instead?
884     if (isNull(src)) {
885       throw new NullPointerException("A null value cannot be decoded to a double.");
886     }
887     if (isNumericNaN(src)) {
888       src.get();
889       return Double.NaN;
890     }
891     if (isNumericZero(src)) {
892       src.get();
893       return Double.valueOf(0.0);
894     }
895 
896     byte header = -1 == Integer.signum(src.peek()) ? DESCENDING.apply(src.peek()) : src.peek();
897 
898     if (header == NEG_INF) {
899       src.get();
900       return Double.NEGATIVE_INFINITY;
901     } else if (header == POS_INF) {
902       src.get();
903       return Double.POSITIVE_INFINITY;
904     } else {
905       return decodeNumericValue(src).doubleValue();
906     }
907   }
908 
909   /**
910    * Decode a primitive {@code long} value from the Numeric encoding. Numeric
911    * encoding is based on {@link BigDecimal}; in the event the encoded value is
912    * larger than can be represented in a {@code long}, this method performs an
913    * implicit narrowing conversion as described in
914    * {@link BigDecimal#doubleValue()}.
915    * @throws NullPointerException when the encoded value is {@code NULL}.
916    * @throws IllegalArgumentException when the encoded value is not a Numeric.
917    * @see #encodeNumeric(PositionedByteRange, long, Order)
918    * @see BigDecimal#longValue()
919    */
920   public static long decodeNumericAsLong(PositionedByteRange src) {
921     // TODO: should an encoded NULL value throw unexpectedHeader() instead?
922     if (isNull(src)) throw new NullPointerException();
923     if (!isNumeric(src)) throw unexpectedHeader(src.peek());
924     if (isNumericNaN(src)) throw unexpectedHeader(src.peek());
925     if (isNumericInfinite(src)) throw unexpectedHeader(src.peek());
926 
927     if (isNumericZero(src)) {
928       src.get();
929       return Long.valueOf(0);
930     }
931     return decodeNumericValue(src).longValue();
932   }
933 
934   /**
935    * Decode a {@link BigDecimal} value from the variable-length encoding.
936    * @throws IllegalArgumentException when the encoded value is not a Numeric.
937    * @see #encodeNumeric(PositionedByteRange, BigDecimal, Order)
938    */
939   public static BigDecimal decodeNumericAsBigDecimal(PositionedByteRange src) {
940     if (isNull(src)) {
941       src.get();
942       return null;
943     }
944     if (!isNumeric(src)) throw unexpectedHeader(src.peek());
945     if (isNumericNaN(src)) throw unexpectedHeader(src.peek());
946     if (isNumericInfinite(src)) throw unexpectedHeader(src.peek());
947     return decodeNumericValue(src);
948   }
949 
950   /**
951    * Encode a String value. String encoding is 0x00-terminated and so it does
952    * not support {@code \u0000} codepoints in the value.
953    * @param dst The destination to which the encoded value is written.
954    * @param val The value to encode.
955    * @param ord The {@link Order} to respect while encoding {@code val}.
956    * @return the number of bytes written.
957    * @throws IllegalArgumentException when {@code val} contains a {@code \u0000}.
958    */
959   public static int encodeString(PositionedByteRange dst, String val, Order ord) {
960     if (null == val) {
961       return encodeNull(dst, ord);
962     }
963     if (val.contains("\u0000"))
964       throw new IllegalArgumentException("Cannot encode String values containing '\\u0000'");
965     final int offset = dst.getOffset(), start = dst.getPosition();
966     dst.put(TEXT);
967     // TODO: is there no way to decode into dst directly?
968     dst.put(val.getBytes(UTF8));
969     dst.put(TERM);
970     ord.apply(dst.getBytes(), offset + start, dst.getPosition() - start);
971     return dst.getPosition() - start;
972   }
973 
974   /**
975    * Decode a String value.
976    */
977   public static String decodeString(PositionedByteRange src) {
978     final byte header = src.get();
979     if (header == NULL || header == DESCENDING.apply(NULL))
980       return null;
981     assert header == TEXT || header == DESCENDING.apply(TEXT);
982     Order ord = header == TEXT ? ASCENDING : DESCENDING;
983     byte[] a = src.getBytes();
984     final int offset = src.getOffset(), start = src.getPosition();
985     final byte terminator = ord.apply(TERM);
986     int rawStartPos = offset + start, rawTermPos = rawStartPos;
987     for (; a[rawTermPos] != terminator; rawTermPos++)
988       ;
989     src.setPosition(rawTermPos - offset + 1); // advance position to TERM + 1
990     if (DESCENDING == ord) {
991       // make a copy so that we don't disturb encoded value with ord.
992       byte[] copy = new byte[rawTermPos - rawStartPos];
993       System.arraycopy(a, rawStartPos, copy, 0, copy.length);
994       ord.apply(copy);
995       return new String(copy, UTF8);
996     } else {
997       return new String(a, rawStartPos, rawTermPos - rawStartPos, UTF8);
998     }
999   }
1000 
1001   /**
1002    * Calculate the expected BlobVar encoded length based on unencoded length.
1003    */
1004   public static int blobVarEncodedLength(int len) {
1005     if (0 == len)
1006       return 2; // 1-byte header + 1-byte terminator
1007     else
1008       return (int)
1009           Math.ceil(
1010             (len * 8) // 8-bits per input byte
1011             / 7.0)    // 7-bits of input data per encoded byte, rounded up
1012           + 1;        // + 1-byte header
1013   }
1014 
1015   /**
1016    * Calculate the expected BlobVar decoded length based on encoded length.
1017    */
1018   @VisibleForTesting
1019   static int blobVarDecodedLength(int len) {
1020     return
1021         ((len
1022           - 1) // 1-byte header
1023           * 7) // 7-bits of payload per encoded byte
1024           / 8; // 8-bits per byte
1025   }
1026 
1027   /**
1028    * Encode a Blob value using a modified varint encoding scheme.
1029    * <p>
1030    * This format encodes a byte[] value such that no limitations on the input
1031    * value are imposed. The first byte encodes the encoding scheme that
1032    * follows, {@link #BLOB_VAR}. Each encoded byte thereafter consists of a
1033    * header bit followed by 7 bits of payload. A header bit of '1' indicates
1034    * continuation of the encoding. A header bit of '0' indicates this byte
1035    * contains the last of the payload. An empty input value is encoded as the
1036    * header byte immediately followed by a termination byte {@code 0x00}. This
1037    * is not ambiguous with the encoded value of {@code []}, which results in
1038    * {@code [0x80, 0x00]}.
1039    * </p>
1040    * @return the number of bytes written.
1041    */
1042   public static int encodeBlobVar(PositionedByteRange dst, byte[] val, int voff, int vlen,
1043       Order ord) {
1044     if (null == val) {
1045       return encodeNull(dst, ord);
1046     }
1047     // Empty value is null-terminated. All other values are encoded as 7-bits per byte.
1048     assert dst.getRemaining() >= blobVarEncodedLength(vlen) : "buffer overflow expected.";
1049     final int offset = dst.getOffset(), start = dst.getPosition();
1050     dst.put(BLOB_VAR);
1051     if (0 == vlen) {
1052       dst.put(TERM);
1053     } else {
1054       byte s = 1, t = 0;
1055       for (int i = voff; i < vlen; i++) {
1056         dst.put((byte) (0x80 | t | ((val[i] & 0xff) >>> s)));
1057         if (s < 7) {
1058           t = (byte) (val[i] << (7 - s));
1059           s++;
1060         } else {
1061           dst.put((byte) (0x80 | val[i]));
1062           s = 1;
1063           t = 0;
1064         }
1065       }
1066       if (s > 1) {
1067         dst.put((byte) (0x7f & t));
1068       } else {
1069         dst.getBytes()[offset + dst.getPosition() - 1] &= 0x7f;
1070       }
1071     }
1072     ord.apply(dst.getBytes(), offset + start, dst.getPosition() - start);
1073     return dst.getPosition() - start;
1074   }
1075 
1076   /**
1077    * Encode a blob value using a modified varint encoding scheme.
1078    * @return the number of bytes written.
1079    * @see #encodeBlobVar(PositionedByteRange, byte[], int, int, Order)
1080    */
1081   public static int encodeBlobVar(PositionedByteRange dst, byte[] val, Order ord) {
1082     return encodeBlobVar(dst, val, 0, null != val ? val.length : 0, ord);
1083   }
1084 
1085   /**
1086    * Decode a blob value that was encoded using BlobVar encoding.
1087    */
1088   public static byte[] decodeBlobVar(PositionedByteRange src) {
1089     final byte header = src.get();
1090     if (header == NULL || header == DESCENDING.apply(NULL)) {
1091       return null;
1092     }
1093     assert header == BLOB_VAR || header == DESCENDING.apply(BLOB_VAR);
1094     Order ord = BLOB_VAR == header ? ASCENDING : DESCENDING;
1095     if (src.peek() == ord.apply(TERM)) {
1096       // skip empty input buffer.
1097       src.get();
1098       return new byte[0];
1099     }
1100     final int offset = src.getOffset(), start = src.getPosition();
1101     int end;
1102     byte[] a = src.getBytes();
1103     for (end = start; (byte) (ord.apply(a[offset + end]) & 0x80) != TERM; end++)
1104       ;
1105     end++; // increment end to 1-past last byte
1106     // create ret buffer using length of encoded data + 1 (header byte)
1107     PositionedByteRange ret = new SimplePositionedByteRange(blobVarDecodedLength(end - start + 1));
1108     int s = 6;
1109     byte t = (byte) ((ord.apply(a[offset + start]) << 1) & 0xff);
1110     for (int i = start + 1; i < end; i++) {
1111       if (s == 7) {
1112         ret.put((byte) (t | (ord.apply(a[offset + i]) & 0x7f)));
1113         i++;
1114                // explicitly reset t -- clean up overflow buffer after decoding
1115                // a full cycle and retain assertion condition below. This happens
1116         t = 0; // when the LSB in the last encoded byte is 1. (HBASE-9893)
1117       } else {
1118         ret.put((byte) (t | ((ord.apply(a[offset + i]) & 0x7f) >>> s)));
1119       }
1120       if (i == end) break;
1121       t = (byte) ((ord.apply(a[offset + i]) << 8 - s) & 0xff);
1122       s = s == 1 ? 7 : s - 1;
1123     }
1124     src.setPosition(end);
1125     assert t == 0 : "Unexpected bits remaining after decoding blob.";
1126     assert ret.getPosition() == ret.getLength() : "Allocated unnecessarily large return buffer.";
1127     return ret.getBytes();
1128   }
1129 
1130   /**
1131    * Encode a Blob value as a byte-for-byte copy. BlobCopy encoding in
1132    * DESCENDING order is NULL terminated so as to preserve proper sorting of
1133    * {@code []} and so it does not support {@code 0x00} in the value.
1134    * @return the number of bytes written.
1135    * @throws IllegalArgumentException when {@code ord} is DESCENDING and
1136    *    {@code val} contains a {@code 0x00} byte.
1137    */
1138   public static int encodeBlobCopy(PositionedByteRange dst, byte[] val, int voff, int vlen,
1139       Order ord) {
1140     if (null == val) {
1141       encodeNull(dst, ord);
1142       if (ASCENDING == ord) return 1;
1143       else {
1144         // DESCENDING ordered BlobCopy requires a termination bit to preserve
1145         // sort-order semantics of null values.
1146         dst.put(ord.apply(TERM));
1147         return 2;
1148       }
1149     }
1150     // Blobs as final entry in a compound key are written unencoded.
1151     assert dst.getRemaining() >= vlen + (ASCENDING == ord ? 1 : 2);
1152     if (DESCENDING == ord) {
1153       for (int i = 0; i < vlen; i++) {
1154         if (TERM == val[voff + i]) {
1155           throw new IllegalArgumentException("0x00 bytes not permitted in value.");
1156         }
1157       }
1158     }
1159     final int offset = dst.getOffset(), start = dst.getPosition();
1160     dst.put(BLOB_COPY);
1161     dst.put(val, voff, vlen);
1162     // DESCENDING ordered BlobCopy requires a termination bit to preserve
1163     // sort-order semantics of null values.
1164     if (DESCENDING == ord) dst.put(TERM);
1165     ord.apply(dst.getBytes(), offset + start, dst.getPosition() - start);
1166     return dst.getPosition() - start;
1167   }
1168 
1169   /**
1170    * Encode a Blob value as a byte-for-byte copy. BlobCopy encoding in
1171    * DESCENDING order is NULL terminated so as to preserve proper sorting of
1172    * {@code []} and so it does not support {@code 0x00} in the value.
1173    * @return the number of bytes written.
1174    * @throws IllegalArgumentException when {@code ord} is DESCENDING and
1175    *    {@code val} contains a {@code 0x00} byte.
1176    * @see #encodeBlobCopy(PositionedByteRange, byte[], int, int, Order)
1177    */
1178   public static int encodeBlobCopy(PositionedByteRange dst, byte[] val, Order ord) {
1179     return encodeBlobCopy(dst, val, 0, null != val ? val.length : 0, ord);
1180   }
1181 
1182   /**
1183    * Decode a Blob value, byte-for-byte copy.
1184    * @see #encodeBlobCopy(PositionedByteRange, byte[], int, int, Order)
1185    */
1186   public static byte[] decodeBlobCopy(PositionedByteRange src) {
1187     byte header = src.get();
1188     if (header == NULL || header == DESCENDING.apply(NULL)) {
1189       return null;
1190     }
1191     assert header == BLOB_COPY || header == DESCENDING.apply(BLOB_COPY);
1192     Order ord = header == BLOB_COPY ? ASCENDING : DESCENDING;
1193     final int length = src.getRemaining() - (ASCENDING == ord ? 0 : 1);
1194     byte[] ret = new byte[length];
1195     src.get(ret);
1196     ord.apply(ret, 0, ret.length);
1197     // DESCENDING ordered BlobCopy requires a termination bit to preserve
1198     // sort-order semantics of null values.
1199     if (DESCENDING == ord) src.get();
1200     return ret;
1201   }
1202 
1203   /**
1204    * Encode a null value.
1205    * @param dst The destination to which encoded digits are written.
1206    * @param ord The {@link Order} to respect while encoding {@code val}.
1207    * @return the number of bytes written.
1208    */
1209   public static int encodeNull(PositionedByteRange dst, Order ord) {
1210     dst.put(ord.apply(NULL));
1211     return 1;
1212   }
1213 
1214   /**
1215    * Encode an {@code int8} value using the fixed-length encoding.
1216    * @return the number of bytes written.
1217    * @see #encodeInt64(PositionedByteRange, long, Order)
1218    * @see #decodeInt8(PositionedByteRange)
1219    */
1220   public static int encodeInt8(PositionedByteRange dst, byte val, Order ord) {
1221     final int offset = dst.getOffset(), start = dst.getPosition();
1222     dst.put(FIXED_INT8)
1223        .put((byte) (val ^ 0x80));
1224     ord.apply(dst.getBytes(), offset + start, 2);
1225     return 2;
1226   }
1227 
1228   /**
1229    * Decode an {@code int8} value.
1230    * @see #encodeInt8(PositionedByteRange, byte, Order)
1231    */
1232   public static byte decodeInt8(PositionedByteRange src) {
1233     final byte header = src.get();
1234     assert header == FIXED_INT8 || header == DESCENDING.apply(FIXED_INT8);
1235     Order ord = header == FIXED_INT8 ? ASCENDING : DESCENDING;
1236     return (byte)((ord.apply(src.get()) ^ 0x80) & 0xff);
1237   }
1238 
1239   /**
1240    * Encode an {@code int16} value using the fixed-length encoding.
1241    * @return the number of bytes written.
1242    * @see #encodeInt64(PositionedByteRange, long, Order)
1243    * @see #decodeInt16(PositionedByteRange)
1244    */
1245   public static int encodeInt16(PositionedByteRange dst, short val, Order ord) {
1246     final int offset = dst.getOffset(), start = dst.getPosition();
1247     dst.put(FIXED_INT16)
1248        .put((byte) ((val >> 8) ^ 0x80))
1249        .put((byte) val);
1250     ord.apply(dst.getBytes(), offset + start, 3);
1251     return 3;
1252   }
1253 
1254   /**
1255    * Decode an {@code int16} value.
1256    * @see #encodeInt16(PositionedByteRange, short, Order)
1257    */
1258   public static short decodeInt16(PositionedByteRange src) {
1259     final byte header = src.get();
1260     assert header == FIXED_INT16 || header == DESCENDING.apply(FIXED_INT16);
1261     Order ord = header == FIXED_INT16 ? ASCENDING : DESCENDING;
1262     short val = (short) ((ord.apply(src.get()) ^ 0x80) & 0xff);
1263     val = (short) ((val << 8) + (ord.apply(src.get()) & 0xff));
1264     return val;
1265   }
1266 
1267   /**
1268    * Encode an {@code int32} value using the fixed-length encoding.
1269    * @return the number of bytes written.
1270    * @see #encodeInt64(PositionedByteRange, long, Order)
1271    * @see #decodeInt32(PositionedByteRange)
1272    */
1273   public static int encodeInt32(PositionedByteRange dst, int val, Order ord) {
1274     final int offset = dst.getOffset(), start = dst.getPosition();
1275     dst.put(FIXED_INT32)
1276         .put((byte) ((val >> 24) ^ 0x80))
1277         .put((byte) (val >> 16))
1278         .put((byte) (val >> 8))
1279         .put((byte) val);
1280     ord.apply(dst.getBytes(), offset + start, 5);
1281     return 5;
1282   }
1283 
1284   /**
1285    * Decode an {@code int32} value.
1286    * @see #encodeInt32(PositionedByteRange, int, Order)
1287    */
1288   public static int decodeInt32(PositionedByteRange src) {
1289     final byte header = src.get();
1290     assert header == FIXED_INT32 || header == DESCENDING.apply(FIXED_INT32);
1291     Order ord = header == FIXED_INT32 ? ASCENDING : DESCENDING;
1292     int val = (ord.apply(src.get()) ^ 0x80) & 0xff;
1293     for (int i = 1; i < 4; i++) {
1294       val = (val << 8) + (ord.apply(src.get()) & 0xff);
1295     }
1296     return val;
1297   }
1298 
1299   /**
1300    * Encode an {@code int64} value using the fixed-length encoding.
1301    * <p>
1302    * This format ensures that all longs sort in their natural order, as they
1303    * would sort when using signed long comparison.
1304    * </p>
1305    * <p>
1306    * All Longs are serialized to an 8-byte, fixed-width sortable byte format.
1307    * Serialization is performed by inverting the integer sign bit and writing
1308    * the resulting bytes to the byte array in big endian order. The encoded
1309    * value is prefixed by the {@link #FIXED_INT64} header byte. This encoding
1310    * is designed to handle java language primitives and so Null values are NOT
1311    * supported by this implementation.
1312    * </p>
1313    * <p>
1314    * For example:
1315    * </p>
1316    * <pre>
1317    * Input:   0x0000000000000005 (5)
1318    * Result:  0x288000000000000005
1319    *
1320    * Input:   0xfffffffffffffffb (-4)
1321    * Result:  0x280000000000000004
1322    *
1323    * Input:   0x7fffffffffffffff (Long.MAX_VALUE)
1324    * Result:  0x28ffffffffffffffff
1325    *
1326    * Input:   0x8000000000000000 (Long.MIN_VALUE)
1327    * Result:  0x287fffffffffffffff
1328    * </pre>
1329    * <p>
1330    * This encoding format, and much of this documentation string, is based on
1331    * Orderly's {@code FixedIntWritableRowKey}.
1332    * </p>
1333    * @return the number of bytes written.
1334    * @see #decodeInt64(PositionedByteRange)
1335    */
1336   public static int encodeInt64(PositionedByteRange dst, long val, Order ord) {
1337     final int offset = dst.getOffset(), start = dst.getPosition();
1338     dst.put(FIXED_INT64)
1339        .put((byte) ((val >> 56) ^ 0x80))
1340        .put((byte) (val >> 48))
1341        .put((byte) (val >> 40))
1342        .put((byte) (val >> 32))
1343        .put((byte) (val >> 24))
1344        .put((byte) (val >> 16))
1345        .put((byte) (val >> 8))
1346        .put((byte) val);
1347     ord.apply(dst.getBytes(), offset + start, 9);
1348     return 9;
1349   }
1350 
1351   /**
1352    * Decode an {@code int64} value.
1353    * @see #encodeInt64(PositionedByteRange, long, Order)
1354    */
1355   public static long decodeInt64(PositionedByteRange src) {
1356     final byte header = src.get();
1357     assert header == FIXED_INT64 || header == DESCENDING.apply(FIXED_INT64);
1358     Order ord = header == FIXED_INT64 ? ASCENDING : DESCENDING;
1359     long val = (ord.apply(src.get()) ^ 0x80) & 0xff;
1360     for (int i = 1; i < 8; i++) {
1361       val = (val << 8) + (ord.apply(src.get()) & 0xff);
1362     }
1363     return val;
1364   }
1365 
1366   /**
1367    * Encode a 32-bit floating point value using the fixed-length encoding.
1368    * Encoding format is described at length in
1369    * {@link #encodeFloat64(PositionedByteRange, double, Order)}.
1370    * @return the number of bytes written.
1371    * @see #decodeFloat32(PositionedByteRange)
1372    * @see #encodeFloat64(PositionedByteRange, double, Order)
1373    */
1374   public static int encodeFloat32(PositionedByteRange dst, float val, Order ord) {
1375     final int offset = dst.getOffset(), start = dst.getPosition();
1376     int i = Float.floatToIntBits(val);
1377     i ^= ((i >> Integer.SIZE - 1) | Integer.MIN_VALUE);
1378     dst.put(FIXED_FLOAT32)
1379         .put((byte) (i >> 24))
1380         .put((byte) (i >> 16))
1381         .put((byte) (i >> 8))
1382         .put((byte) i);
1383     ord.apply(dst.getBytes(), offset + start, 5);
1384     return 5;
1385   }
1386 
1387   /**
1388    * Decode a 32-bit floating point value using the fixed-length encoding.
1389    * @see #encodeFloat32(PositionedByteRange, float, Order)
1390    */
1391   public static float decodeFloat32(PositionedByteRange src) {
1392     final byte header = src.get();
1393     assert header == FIXED_FLOAT32 || header == DESCENDING.apply(FIXED_FLOAT32);
1394     Order ord = header == FIXED_FLOAT32 ? ASCENDING : DESCENDING;
1395     int val = ord.apply(src.get()) & 0xff;
1396     for (int i = 1; i < 4; i++) {
1397       val = (val << 8) + (ord.apply(src.get()) & 0xff);
1398     }
1399     val ^= (~val >> Integer.SIZE - 1) | Integer.MIN_VALUE;
1400     return Float.intBitsToFloat(val);
1401   }
1402 
1403   /**
1404    * Encode a 64-bit floating point value using the fixed-length encoding.
1405    * <p>
1406    * This format ensures the following total ordering of floating point
1407    * values: Double.NEGATIVE_INFINITY &lt; -Double.MAX_VALUE &lt; ... &lt;
1408    * -Double.MIN_VALUE &lt; -0.0 &lt; +0.0; &lt; Double.MIN_VALUE &lt; ...
1409    * &lt; Double.MAX_VALUE &lt; Double.POSITIVE_INFINITY &lt; Double.NaN
1410    * </p>
1411    * Floating point numbers are encoded as specified in IEEE 754. A 64-bit
1412    * double precision float consists of a sign bit, 11-bit unsigned exponent
1413    * encoded in offset-1023 notation, and a 52-bit significand. The format is
1414    * described further in the <a
1415    * href="http://en.wikipedia.org/wiki/Double_precision"> Double Precision
1416    * Floating Point Wikipedia page</a> </p>
1417    * <p>
1418    * The value of a normal float is -1 <sup>sign bit</sup> &times;
1419    * 2<sup>exponent - 1023</sup> &times; 1.significand
1420    * </p>
1421    * <p>
1422    * The IEE754 floating point format already preserves sort ordering for
1423    * positive floating point numbers when the raw bytes are compared in most
1424    * significant byte order. This is discussed further at <a href=
1425    * "http://www.cygnus-software.com/papers/comparingfloats/comparingfloats.htm"
1426    * > http://www.cygnus-software.com/papers/comparingfloats/comparingfloats.
1427    * htm</a>
1428    * </p>
1429    * <p>
1430    * Thus, we need only ensure that negative numbers sort in the the exact
1431    * opposite order as positive numbers (so that say, negative infinity is
1432    * less than negative 1), and that all negative numbers compare less than
1433    * any positive number. To accomplish this, we invert the sign bit of all
1434    * floating point numbers, and we also invert the exponent and significand
1435    * bits if the floating point number was negative.
1436    * </p>
1437    * <p>
1438    * More specifically, we first store the floating point bits into a 64-bit
1439    * long {@code l} using {@link Double#doubleToLongBits}. This method
1440    * collapses all NaNs into a single, canonical NaN value but otherwise
1441    * leaves the bits unchanged. We then compute
1442    * </p>
1443    * <pre>
1444    * l &circ;= (l &gt;&gt; (Long.SIZE - 1)) | Long.MIN_SIZE
1445    * </pre>
1446    * <p>
1447    * which inverts the sign bit and XOR's all other bits with the sign bit
1448    * itself. Comparing the raw bytes of {@code l} in most significant
1449    * byte order is equivalent to performing a double precision floating point
1450    * comparison on the underlying bits (ignoring NaN comparisons, as NaNs
1451    * don't compare equal to anything when performing floating point
1452    * comparisons).
1453    * </p>
1454    * <p>
1455    * The resulting long integer is then converted into a byte array by
1456    * serializing the long one byte at a time in most significant byte order.
1457    * The serialized integer is prefixed by a single header byte. All
1458    * serialized values are 9 bytes in length.
1459    * </p>
1460    * <p>
1461    * This encoding format, and much of this highly detailed documentation
1462    * string, is based on Orderly's {@code DoubleWritableRowKey}.
1463    * </p>
1464    * @return the number of bytes written.
1465    * @see #decodeFloat64(PositionedByteRange)
1466    */
1467   public static int encodeFloat64(PositionedByteRange dst, double val, Order ord) {
1468     final int offset = dst.getOffset(), start = dst.getPosition();
1469     long lng = Double.doubleToLongBits(val);
1470     lng ^= ((lng >> Long.SIZE - 1) | Long.MIN_VALUE);
1471     dst.put(FIXED_FLOAT64)
1472         .put((byte) (lng >> 56))
1473         .put((byte) (lng >> 48))
1474         .put((byte) (lng >> 40))
1475         .put((byte) (lng >> 32))
1476         .put((byte) (lng >> 24))
1477         .put((byte) (lng >> 16))
1478         .put((byte) (lng >> 8))
1479         .put((byte) lng);
1480     ord.apply(dst.getBytes(), offset + start, 9);
1481     return 9;
1482   }
1483 
1484   /**
1485    * Decode a 64-bit floating point value using the fixed-length encoding.
1486    * @see #encodeFloat64(PositionedByteRange, double, Order)
1487    */
1488   public static double decodeFloat64(PositionedByteRange src) {
1489     final byte header = src.get();
1490     assert header == FIXED_FLOAT64 || header == DESCENDING.apply(FIXED_FLOAT64);
1491     Order ord = header == FIXED_FLOAT64 ? ASCENDING : DESCENDING;
1492     long val = ord.apply(src.get()) & 0xff;
1493     for (int i = 1; i < 8; i++) {
1494       val = (val << 8) + (ord.apply(src.get()) & 0xff);
1495     }
1496     val ^= (~val >> Long.SIZE - 1) | Long.MIN_VALUE;
1497     return Double.longBitsToDouble(val);
1498   }
1499 
1500   /**
1501    * Returns true when {@code src} appears to be positioned an encoded value,
1502    * false otherwise.
1503    */
1504   public static boolean isEncodedValue(PositionedByteRange src) {
1505     return isNull(src) || isNumeric(src) || isFixedInt32(src) || isFixedInt64(src)
1506         || isFixedFloat32(src) || isFixedFloat64(src) || isText(src) || isBlobCopy(src)
1507         || isBlobVar(src);
1508   }
1509 
1510   /**
1511    * Return true when the next encoded value in {@code src} is null, false
1512    * otherwise.
1513    */
1514   public static boolean isNull(PositionedByteRange src) {
1515     return NULL ==
1516         (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1517   }
1518 
1519   /**
1520    * Return true when the next encoded value in {@code src} uses Numeric
1521    * encoding, false otherwise. {@code NaN}, {@code +/-Inf} are valid Numeric
1522    * values.
1523    */
1524   public static boolean isNumeric(PositionedByteRange src) {
1525     byte x = (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1526     return x >= NEG_INF && x <= NAN;
1527   }
1528 
1529   /**
1530    * Return true when the next encoded value in {@code src} uses Numeric
1531    * encoding and is {@code Infinite}, false otherwise.
1532    */
1533   public static boolean isNumericInfinite(PositionedByteRange src) {
1534     byte x = (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1535     return NEG_INF == x || POS_INF == x;
1536   }
1537 
1538   /**
1539    * Return true when the next encoded value in {@code src} uses Numeric
1540    * encoding and is {@code NaN}, false otherwise.
1541    */
1542   public static boolean isNumericNaN(PositionedByteRange src) {
1543     return NAN == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1544   }
1545 
1546   /**
1547    * Return true when the next encoded value in {@code src} uses Numeric
1548    * encoding and is {@code 0}, false otherwise.
1549    */
1550   public static boolean isNumericZero(PositionedByteRange src) {
1551     return ZERO ==
1552         (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1553   }
1554 
1555   /**
1556    * Return true when the next encoded value in {@code src} uses fixed-width
1557    * Int32 encoding, false otherwise.
1558    */
1559   public static boolean isFixedInt32(PositionedByteRange src) {
1560     return FIXED_INT32 ==
1561         (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1562   }
1563 
1564   /**
1565    * Return true when the next encoded value in {@code src} uses fixed-width
1566    * Int64 encoding, false otherwise.
1567    */
1568   public static boolean isFixedInt64(PositionedByteRange src) {
1569     return FIXED_INT64 ==
1570         (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1571   }
1572 
1573   /**
1574    * Return true when the next encoded value in {@code src} uses fixed-width
1575    * Float32 encoding, false otherwise.
1576    */
1577   public static boolean isFixedFloat32(PositionedByteRange src) {
1578     return FIXED_FLOAT32 ==
1579         (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1580   }
1581 
1582   /**
1583    * Return true when the next encoded value in {@code src} uses fixed-width
1584    * Float64 encoding, false otherwise.
1585    */
1586   public static boolean isFixedFloat64(PositionedByteRange src) {
1587     return FIXED_FLOAT64 ==
1588         (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1589   }
1590 
1591   /**
1592    * Return true when the next encoded value in {@code src} uses Text encoding,
1593    * false otherwise.
1594    */
1595   public static boolean isText(PositionedByteRange src) {
1596     return TEXT ==
1597         (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1598   }
1599 
1600   /**
1601    * Return true when the next encoded value in {@code src} uses BlobVar
1602    * encoding, false otherwise.
1603    */
1604   public static boolean isBlobVar(PositionedByteRange src) {
1605     return BLOB_VAR ==
1606         (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1607   }
1608 
1609   /**
1610    * Return true when the next encoded value in {@code src} uses BlobCopy
1611    * encoding, false otherwise.
1612    */
1613   public static boolean isBlobCopy(PositionedByteRange src) {
1614     return BLOB_COPY ==
1615         (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1616   }
1617 
1618   /**
1619    * Skip {@code buff}'s position forward over one encoded value.
1620    * @return number of bytes skipped.
1621    */
1622   public static int skip(PositionedByteRange src) {
1623     final int start = src.getPosition();
1624     byte header = src.get();
1625     Order ord = (-1 == Integer.signum(header)) ? DESCENDING : ASCENDING;
1626     header = ord.apply(header);
1627 
1628     switch (header) {
1629       case NULL:
1630       case NEG_INF:
1631         return 1;
1632       case NEG_LARGE: /* Large negative number: 0x08, ~E, ~M */
1633         skipVaruint64(src, DESCENDING != ord);
1634         skipSignificand(src, DESCENDING != ord);
1635         return src.getPosition() - start;
1636       case NEG_MED_MIN: /* Medium negative number: 0x13-E, ~M */
1637       case NEG_MED_MIN + 0x01:
1638       case NEG_MED_MIN + 0x02:
1639       case NEG_MED_MIN + 0x03:
1640       case NEG_MED_MIN + 0x04:
1641       case NEG_MED_MIN + 0x05:
1642       case NEG_MED_MIN + 0x06:
1643       case NEG_MED_MIN + 0x07:
1644       case NEG_MED_MIN + 0x08:
1645       case NEG_MED_MIN + 0x09:
1646       case NEG_MED_MAX:
1647         skipSignificand(src, DESCENDING != ord);
1648         return src.getPosition() - start;
1649       case NEG_SMALL: /* Small negative number: 0x14, -E, ~M */
1650         skipVaruint64(src, DESCENDING == ord);
1651         skipSignificand(src, DESCENDING != ord);
1652         return src.getPosition() - start;
1653       case ZERO:
1654         return 1;
1655       case POS_SMALL: /* Small positive number: 0x16, ~-E, M */
1656         skipVaruint64(src, DESCENDING != ord);
1657         skipSignificand(src, DESCENDING == ord);
1658         return src.getPosition() - start;
1659       case POS_MED_MIN: /* Medium positive number: 0x17+E, M */
1660       case POS_MED_MIN + 0x01:
1661       case POS_MED_MIN + 0x02:
1662       case POS_MED_MIN + 0x03:
1663       case POS_MED_MIN + 0x04:
1664       case POS_MED_MIN + 0x05:
1665       case POS_MED_MIN + 0x06:
1666       case POS_MED_MIN + 0x07:
1667       case POS_MED_MIN + 0x08:
1668       case POS_MED_MIN + 0x09:
1669       case POS_MED_MAX:
1670         skipSignificand(src, DESCENDING == ord);
1671         return src.getPosition() - start;
1672       case POS_LARGE: /* Large positive number: 0x22, E, M */
1673         skipVaruint64(src, DESCENDING == ord);
1674         skipSignificand(src, DESCENDING == ord);
1675         return src.getPosition() - start;
1676       case POS_INF:
1677         return 1;
1678       case NAN:
1679         return 1;
1680       case FIXED_INT8:
1681         src.setPosition(src.getPosition() + 1);
1682         return src.getPosition() - start;
1683       case FIXED_INT16:
1684         src.setPosition(src.getPosition() + 2);
1685         return src.getPosition() - start;
1686       case FIXED_INT32:
1687         src.setPosition(src.getPosition() + 4);
1688         return src.getPosition() - start;
1689       case FIXED_INT64:
1690         src.setPosition(src.getPosition() + 8);
1691         return src.getPosition() - start;
1692       case FIXED_FLOAT32:
1693         src.setPosition(src.getPosition() + 4);
1694         return src.getPosition() - start;
1695       case FIXED_FLOAT64:
1696         src.setPosition(src.getPosition() + 8);
1697         return src.getPosition() - start;
1698       case TEXT:
1699         // for null-terminated values, skip to the end.
1700         do {
1701           header = ord.apply(src.get());
1702         } while (header != TERM);
1703         return src.getPosition() - start;
1704       case BLOB_VAR:
1705         // read until we find a 0 in the MSB
1706         do {
1707           header = ord.apply(src.get());
1708         } while ((byte) (header & 0x80) != TERM);
1709         return src.getPosition() - start;
1710       case BLOB_COPY:
1711         if (Order.DESCENDING == ord) {
1712           // if descending, read to termination byte.
1713           do {
1714             header = ord.apply(src.get());
1715           } while (header != TERM);
1716           return src.getPosition() - start;
1717         } else {
1718           // otherwise, just skip to the end.
1719           src.setPosition(src.getLength());
1720           return src.getPosition() - start;
1721         }
1722       default:
1723         throw unexpectedHeader(header);
1724     }
1725   }
1726 
1727   /**
1728    * Return the number of encoded entries remaining in {@code buff}. The
1729    * state of {@code buff} is not modified through use of this method.
1730    */
1731   public static int length(PositionedByteRange buff) {
1732     PositionedByteRange b =
1733         new SimplePositionedByteRange(buff.getBytes(), buff.getOffset(), buff.getLength());
1734     b.setPosition(buff.getPosition());
1735     int cnt = 0;
1736     for (; isEncodedValue(b); skip(buff), cnt++)
1737       ;
1738     return cnt;
1739   }
1740 }