View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase;
21  
22  import static org.apache.hadoop.hbase.util.Bytes.len;
23  
24  import java.io.DataInput;
25  import java.io.DataOutput;
26  import java.io.IOException;
27  import java.io.InputStream;
28  import java.io.OutputStream;
29  import java.nio.ByteBuffer;
30  import java.util.ArrayList;
31  import java.util.Arrays;
32  import java.util.Comparator;
33  import java.util.HashMap;
34  import java.util.List;
35  import java.util.Map;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.classification.InterfaceAudience;
40  import org.apache.hadoop.hbase.io.HeapSize;
41  import org.apache.hadoop.hbase.util.Bytes;
42  import org.apache.hadoop.hbase.util.ClassSize;
43  import org.apache.hadoop.io.IOUtils;
44  import org.apache.hadoop.io.RawComparator;
45  
46  import com.google.common.primitives.Longs;
47  
48  /**
49   * An HBase Key/Value. This is the fundamental HBase Type.  
50   * <p>
51   * HBase applications and users should use the Cell interface and avoid directly using KeyValue
52   * and member functions not defined in Cell.
53   * <p>
54   * If being used client-side, the primary methods to access individual fields are {@link #getRow()},
55   * {@link #getFamily()}, {@link #getQualifier()}, {@link #getTimestamp()}, and {@link #getValue()}.
56   * These methods allocate new byte arrays and return copies. Avoid their use server-side.
57   * <p>
 * Instances of this class are immutable. They do not implement Comparable but Comparators are
 * provided. Comparators change with context, whether user table or a catalog table comparison. It's
 * critical you use the appropriate comparator. There are Comparators for normal HFiles, Meta's
 * HFiles, and bloom filter keys.
62   * <p>
63   * KeyValue wraps a byte array and takes offsets and lengths into passed array at where to start
64   * interpreting the content as KeyValue. The KeyValue format inside a byte array is:
65   * <code>&lt;keylength> &lt;valuelength> &lt;key> &lt;value></code> Key is further decomposed as:
66   * <code>&lt;rowlength> &lt;row> &lt;columnfamilylength> &lt;columnfamily> &lt;columnqualifier>
67   * &lt;timestamp> &lt;keytype></code>
 * The <code>rowlength</code> maximum is <code>Short.MAX_VALUE</code>, column family length maximum
 * is <code>Byte.MAX_VALUE</code>, and column qualifier + key length must be &lt;
 * <code>Integer.MAX_VALUE</code>. The column does not contain the family/qualifier delimiter,
 * {@link #COLUMN_FAMILY_DELIMITER}<br>
72   * KeyValue can optionally contain Tags. When it contains tags, it is added in the byte array after
73   * the value part. The format for this part is: <code>&lt;tagslength>&lt;tagsbytes></code>.
74   * <code>tagslength</code> maximum is <code>Short.MAX_SIZE</code>. The <code>tagsbytes</code>
75   * contain one or more tags where as each tag is of the form
76   * <code>&lt;taglength>&lt;tagtype>&lt;tagbytes></code>.  <code>tagtype</code> is one byte and
77   * <code>taglength</code> maximum is <code>Short.MAX_SIZE</code> and it includes 1 byte type length
78   * and actual tag bytes length.
79   */
80  @InterfaceAudience.Private
81  public class KeyValue implements Cell, HeapSize, Cloneable {
82    private static final ArrayList<Tag> EMPTY_ARRAY_LIST = new ArrayList<Tag>();
83  
84    static final Log LOG = LogFactory.getLog(KeyValue.class);
85  
86    /**
87     * Colon character in UTF-8
88     */
89    public static final char COLUMN_FAMILY_DELIMITER = ':';
90  
91    public static final byte[] COLUMN_FAMILY_DELIM_ARRAY =
92      new byte[]{COLUMN_FAMILY_DELIMITER};
93  
94    /**
95     * Comparator for plain key/values; i.e. non-catalog table key/values. Works on Key portion
96     * of KeyValue only.
97     */
98    public static final KVComparator COMPARATOR = new KVComparator();
99    /**
100    * A {@link KVComparator} for <code>hbase:meta</code> catalog table
101    * {@link KeyValue}s.
102    */
103   public static final KVComparator META_COMPARATOR = new MetaComparator();
104 
105   /**
106    * Needed for Bloom Filters.
107    */
108   public static final KVComparator RAW_COMPARATOR = new RawBytesComparator();
109 
110   /** Size of the key length field in bytes*/
111   public static final int KEY_LENGTH_SIZE = Bytes.SIZEOF_INT;
112 
113   /** Size of the key type field in bytes */
114   public static final int TYPE_SIZE = Bytes.SIZEOF_BYTE;
115 
116   /** Size of the row length field in bytes */
117   public static final int ROW_LENGTH_SIZE = Bytes.SIZEOF_SHORT;
118 
119   /** Size of the family length field in bytes */
120   public static final int FAMILY_LENGTH_SIZE = Bytes.SIZEOF_BYTE;
121 
122   /** Size of the timestamp field in bytes */
123   public static final int TIMESTAMP_SIZE = Bytes.SIZEOF_LONG;
124 
125   // Size of the timestamp and type byte on end of a key -- a long + a byte.
126   public static final int TIMESTAMP_TYPE_SIZE = TIMESTAMP_SIZE + TYPE_SIZE;
127 
128   // Size of the length shorts and bytes in key.
129   public static final int KEY_INFRASTRUCTURE_SIZE = ROW_LENGTH_SIZE
130       + FAMILY_LENGTH_SIZE + TIMESTAMP_TYPE_SIZE;
131 
132   // How far into the key the row starts at. First thing to read is the short
133   // that says how long the row is.
134   public static final int ROW_OFFSET =
135     Bytes.SIZEOF_INT /*keylength*/ +
136     Bytes.SIZEOF_INT /*valuelength*/;
137 
138   // Size of the length ints in a KeyValue datastructure.
139   public static final int KEYVALUE_INFRASTRUCTURE_SIZE = ROW_OFFSET;
140 
141   /** Size of the tags length field in bytes */
142   public static final int TAGS_LENGTH_SIZE = Bytes.SIZEOF_SHORT;
143 
144   public static final int KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE = ROW_OFFSET + TAGS_LENGTH_SIZE;
145 
146   private static final int MAX_TAGS_LENGTH = (2 * Short.MAX_VALUE) + 1;
147 
148   /**
149    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
150    * characteristics would take up for its underlying data structure.
151    *
152    * @param rlength row length
153    * @param flength family length
154    * @param qlength qualifier length
155    * @param vlength value length
156    *
157    * @return the <code>KeyValue</code> data structure length
158    */
159   public static long getKeyValueDataStructureSize(int rlength,
160       int flength, int qlength, int vlength) {
161     return KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE
162         + getKeyDataStructureSize(rlength, flength, qlength) + vlength;
163   }
164 
165   /**
166    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
167    * characteristics would take up for its underlying data structure.
168    *
169    * @param rlength row length
170    * @param flength family length
171    * @param qlength qualifier length
172    * @param vlength value length
173    * @param tagsLength total length of the tags
174    *
175    * @return the <code>KeyValue</code> data structure length
176    */
177   public static long getKeyValueDataStructureSize(int rlength, int flength, int qlength,
178       int vlength, int tagsLength) {
179     if (tagsLength == 0) {
180       return getKeyValueDataStructureSize(rlength, flength, qlength, vlength);
181     }
182     return KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE
183         + getKeyDataStructureSize(rlength, flength, qlength) + vlength + tagsLength;
184   }
185 
186   /**
187    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
188    * characteristics would take up for its underlying data structure.
189    *
190    * @param klength key length
191    * @param vlength value length
192    * @param tagsLength total length of the tags
193    *
194    * @return the <code>KeyValue</code> data structure length
195    */
196   public static long getKeyValueDataStructureSize(int klength, int vlength, int tagsLength) {
197     if (tagsLength == 0) {
198       return KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + klength + vlength;
199     }
200     return KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + klength + vlength + tagsLength;
201   }
202 
203   /**
204    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
205    * characteristics would take up in its underlying data structure for the key.
206    *
207    * @param rlength row length
208    * @param flength family length
209    * @param qlength qualifier length
210    *
211    * @return the key data structure length
212    */
213   public static long getKeyDataStructureSize(int rlength, int flength, int qlength) {
214     return KeyValue.KEY_INFRASTRUCTURE_SIZE + rlength + flength + qlength;
215   }
216 
217   /**
218    * Key type.
219    * Has space for other key types to be added later.  Cannot rely on
220    * enum ordinals . They change if item is removed or moved.  Do our own codes.
221    */
222   public static enum Type {
223     Minimum((byte)0),
224     Put((byte)4),
225 
226     Delete((byte)8),
227     DeleteFamilyVersion((byte)10),
228     DeleteColumn((byte)12),
229     DeleteFamily((byte)14),
230 
231     // Maximum is used when searching; you look from maximum on down.
232     Maximum((byte)255);
233 
234     private final byte code;
235 
236     Type(final byte c) {
237       this.code = c;
238     }
239 
240     public byte getCode() {
241       return this.code;
242     }
243 
244     /**
245      * Cannot rely on enum ordinals . They change if item is removed or moved.
246      * Do our own codes.
247      * @param b
248      * @return Type associated with passed code.
249      */
250     public static Type codeToType(final byte b) {
251       for (Type t : Type.values()) {
252         if (t.getCode() == b) {
253           return t;
254         }
255       }
256       throw new RuntimeException("Unknown code " + b);
257     }
258   }
259 
260   /**
261    * Lowest possible key.
262    * Makes a Key with highest possible Timestamp, empty row and column.  No
263    * key can be equal or lower than this one in memstore or in store file.
264    */
265   public static final KeyValue LOWESTKEY =
266     new KeyValue(HConstants.EMPTY_BYTE_ARRAY, HConstants.LATEST_TIMESTAMP);
267 
268   ////
269   // KeyValue core instance fields.
270   private byte [] bytes = null;  // an immutable byte array that contains the KV
271   private int offset = 0;  // offset into bytes buffer KV starts at
272   private int length = 0;  // length of the KV starting from offset.
273 
274   /**
275    * @return True if a delete type, a {@link KeyValue.Type#Delete} or
276    * a {KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn}
277    * KeyValue type.
278    */
279   public static boolean isDelete(byte t) {
280     return Type.Delete.getCode() <= t && t <= Type.DeleteFamily.getCode();
281   }
282 
283   /** Here be dragons **/
284 
285   // used to achieve atomic operations in the memstore.
286   @Override
287   public long getMvccVersion() {
288     return mvcc;
289   }
290 
291   public void setMvccVersion(long mvccVersion){
292     this.mvcc = mvccVersion;
293   }
294 
295   // multi-version concurrency control version.  default value is 0, aka do not care.
296   private long mvcc = 0;  // this value is not part of a serialized KeyValue (not in HFiles)
297 
298   /** Dragon time over, return to normal business */
299 
300 
301   /** Writable Constructor -- DO NOT USE */
302   public KeyValue() {}
303 
304   /**
305    * Creates a KeyValue from the start of the specified byte array.
306    * Presumes <code>bytes</code> content is formatted as a KeyValue blob.
307    * @param bytes byte array
308    */
309   public KeyValue(final byte [] bytes) {
310     this(bytes, 0);
311   }
312 
313   /**
314    * Creates a KeyValue from the specified byte array and offset.
315    * Presumes <code>bytes</code> content starting at <code>offset</code> is
316    * formatted as a KeyValue blob.
317    * @param bytes byte array
318    * @param offset offset to start of KeyValue
319    */
320   public KeyValue(final byte [] bytes, final int offset) {
321     this(bytes, offset, getLength(bytes, offset));
322   }
323 
324   /**
325    * Creates a KeyValue from the specified byte array, starting at offset, and
326    * for length <code>length</code>.
327    * @param bytes byte array
328    * @param offset offset to start of the KeyValue
329    * @param length length of the KeyValue
330    */
331   public KeyValue(final byte [] bytes, final int offset, final int length) {
332     this.bytes = bytes;
333     this.offset = offset;
334     this.length = length;
335   }
336 
337   /**
338    * Creates a KeyValue from the specified byte array, starting at offset, and
339    * for length <code>length</code>.
340    *
341    * @param bytes  byte array
342    * @param offset offset to start of the KeyValue
343    * @param length length of the KeyValue
344    * @param ts
345    */
346   public KeyValue(final byte[] bytes, final int offset, final int length, long ts) {
347     this(bytes, offset, length, null, 0, 0, null, 0, 0, ts, Type.Maximum, null, 0, 0, null);
348   }
349 
350   /** Constructors that build a new backing byte array from fields */
351 
352   /**
353    * Constructs KeyValue structure filled with null value.
354    * Sets type to {@link KeyValue.Type#Maximum}
355    * @param row - row key (arbitrary byte array)
356    * @param timestamp
357    */
358   public KeyValue(final byte [] row, final long timestamp) {
359     this(row, null, null, timestamp, Type.Maximum, null);
360   }
361 
362   /**
363    * Constructs KeyValue structure filled with null value.
364    * @param row - row key (arbitrary byte array)
365    * @param timestamp
366    */
367   public KeyValue(final byte [] row, final long timestamp, Type type) {
368     this(row, null, null, timestamp, type, null);
369   }
370 
371   /**
372    * Constructs KeyValue structure filled with null value.
373    * Sets type to {@link KeyValue.Type#Maximum}
374    * @param row - row key (arbitrary byte array)
375    * @param family family name
376    * @param qualifier column qualifier
377    */
378   public KeyValue(final byte [] row, final byte [] family,
379       final byte [] qualifier) {
380     this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum);
381   }
382 
383   /**
384    * Constructs KeyValue structure filled with null value.
385    * @param row - row key (arbitrary byte array)
386    * @param family family name
387    * @param qualifier column qualifier
388    */
389   public KeyValue(final byte [] row, final byte [] family,
390       final byte [] qualifier, final byte [] value) {
391     this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Put, value);
392   }
393 
394   /**
395    * Constructs KeyValue structure filled with specified values.
396    * @param row row key
397    * @param family family name
398    * @param qualifier column qualifier
399    * @param timestamp version timestamp
400    * @param type key type
401    * @throws IllegalArgumentException
402    */
403   public KeyValue(final byte[] row, final byte[] family,
404       final byte[] qualifier, final long timestamp, Type type) {
405     this(row, family, qualifier, timestamp, type, null);
406   }
407 
408   /**
409    * Constructs KeyValue structure filled with specified values.
410    * @param row row key
411    * @param family family name
412    * @param qualifier column qualifier
413    * @param timestamp version timestamp
414    * @param value column value
415    * @throws IllegalArgumentException
416    */
417   public KeyValue(final byte[] row, final byte[] family,
418       final byte[] qualifier, final long timestamp, final byte[] value) {
419     this(row, family, qualifier, timestamp, Type.Put, value);
420   }
421 
422   /**
423    * Constructs KeyValue structure filled with specified values.
424    * @param row row key
425    * @param family family name
426    * @param qualifier column qualifier
427    * @param timestamp version timestamp
428    * @param value column value
429    * @param tags tags
430    * @throws IllegalArgumentException
431    */
432   public KeyValue(final byte[] row, final byte[] family,
433       final byte[] qualifier, final long timestamp, final byte[] value,
434       final Tag[] tags) {
435     this(row, family, qualifier, timestamp, value, tags != null ? Arrays.asList(tags) : null);
436   }
437 
438   /**
439    * Constructs KeyValue structure filled with specified values.
440    * @param row row key
441    * @param family family name
442    * @param qualifier column qualifier
443    * @param timestamp version timestamp
444    * @param value column value
445    * @param tags tags non-empty list of tags or null
446    * @throws IllegalArgumentException
447    */
448   public KeyValue(final byte[] row, final byte[] family,
449       final byte[] qualifier, final long timestamp, final byte[] value,
450       final List<Tag> tags) {
451     this(row, 0, row==null ? 0 : row.length,
452       family, 0, family==null ? 0 : family.length,
453       qualifier, 0, qualifier==null ? 0 : qualifier.length,
454       timestamp, Type.Put,
455       value, 0, value==null ? 0 : value.length, tags);
456   }
457 
458   /**
459    * Constructs KeyValue structure filled with specified values.
460    * @param row row key
461    * @param family family name
462    * @param qualifier column qualifier
463    * @param timestamp version timestamp
464    * @param type key type
465    * @param value column value
466    * @throws IllegalArgumentException
467    */
468   public KeyValue(final byte[] row, final byte[] family,
469       final byte[] qualifier, final long timestamp, Type type,
470       final byte[] value) {
471     this(row, 0, len(row),   family, 0, len(family),   qualifier, 0, len(qualifier),
472         timestamp, type,   value, 0, len(value));
473   }
474 
475   /**
476    * Constructs KeyValue structure filled with specified values.
477    * <p>
478    * Column is split into two fields, family and qualifier.
479    * @param row row key
480    * @param family family name
481    * @param qualifier column qualifier
482    * @param timestamp version timestamp
483    * @param type key type
484    * @param value column value
485    * @throws IllegalArgumentException
486    */
487   public KeyValue(final byte[] row, final byte[] family,
488       final byte[] qualifier, final long timestamp, Type type,
489       final byte[] value, final List<Tag> tags) {
490     this(row, family, qualifier, 0, qualifier==null ? 0 : qualifier.length,
491         timestamp, type, value, 0, value==null ? 0 : value.length, tags);
492   }
493 
494   /**
495    * Constructs KeyValue structure filled with specified values.
496    * @param row row key
497    * @param family family name
498    * @param qualifier column qualifier
499    * @param timestamp version timestamp
500    * @param type key type
501    * @param value column value
502    * @throws IllegalArgumentException
503    */
504   public KeyValue(final byte[] row, final byte[] family,
505       final byte[] qualifier, final long timestamp, Type type,
506       final byte[] value, final byte[] tags) {
507     this(row, family, qualifier, 0, qualifier==null ? 0 : qualifier.length,
508         timestamp, type, value, 0, value==null ? 0 : value.length, tags);
509   }
510 
511   /**
512    * Constructs KeyValue structure filled with specified values.
513    * @param row row key
514    * @param family family name
515    * @param qualifier column qualifier
516    * @param qoffset qualifier offset
517    * @param qlength qualifier length
518    * @param timestamp version timestamp
519    * @param type key type
520    * @param value column value
521    * @param voffset value offset
522    * @param vlength value length
523    * @throws IllegalArgumentException
524    */
525   public KeyValue(byte [] row, byte [] family,
526       byte [] qualifier, int qoffset, int qlength, long timestamp, Type type,
527       byte [] value, int voffset, int vlength, List<Tag> tags) {
528     this(row, 0, row==null ? 0 : row.length,
529         family, 0, family==null ? 0 : family.length,
530         qualifier, qoffset, qlength, timestamp, type,
531         value, voffset, vlength, tags);
532   }
533 
534   /**
535    * @param row
536    * @param family
537    * @param qualifier
538    * @param qoffset
539    * @param qlength
540    * @param timestamp
541    * @param type
542    * @param value
543    * @param voffset
544    * @param vlength
545    * @param tags
546    */
547   public KeyValue(byte [] row, byte [] family,
548       byte [] qualifier, int qoffset, int qlength, long timestamp, Type type,
549       byte [] value, int voffset, int vlength, byte[] tags) {
550     this(row, 0, row==null ? 0 : row.length,
551         family, 0, family==null ? 0 : family.length,
552         qualifier, qoffset, qlength, timestamp, type,
553         value, voffset, vlength, tags, 0, tags==null ? 0 : tags.length);
554   }
555 
556   /**
557    * Constructs KeyValue structure filled with specified values.
558    * <p>
559    * Column is split into two fields, family and qualifier.
560    * @param row row key
561    * @throws IllegalArgumentException
562    */
563   public KeyValue(final byte [] row, final int roffset, final int rlength,
564       final byte [] family, final int foffset, final int flength,
565       final byte [] qualifier, final int qoffset, final int qlength,
566       final long timestamp, final Type type,
567       final byte [] value, final int voffset, final int vlength) {
568     this(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
569       qlength, timestamp, type, value, voffset, vlength, null);
570   }
571   
572   /**
573    * Constructs KeyValue structure filled with specified values. Uses the provided buffer as the
574    * data buffer.
575    * <p>
576    * Column is split into two fields, family and qualifier.
577    *
578    * @param buffer the bytes buffer to use
579    * @param boffset buffer offset
580    * @param row row key
581    * @param roffset row offset
582    * @param rlength row length
583    * @param family family name
584    * @param foffset family offset
585    * @param flength family length
586    * @param qualifier column qualifier
587    * @param qoffset qualifier offset
588    * @param qlength qualifier length
589    * @param timestamp version timestamp
590    * @param type key type
591    * @param value column value
592    * @param voffset value offset
593    * @param vlength value length
594    * @param tags non-empty list of tags or null
595    * @throws IllegalArgumentException an illegal value was passed or there is insufficient space
596    * remaining in the buffer
597    */
598   public KeyValue(byte [] buffer, final int boffset,
599       final byte [] row, final int roffset, final int rlength,
600       final byte [] family, final int foffset, final int flength,
601       final byte [] qualifier, final int qoffset, final int qlength,
602       final long timestamp, final Type type,
603       final byte [] value, final int voffset, final int vlength,
604       final Tag[] tags) {
605      this.bytes  = buffer;
606      this.length = writeByteArray(buffer, boffset,
607          row, roffset, rlength,
608          family, foffset, flength, qualifier, qoffset, qlength,
609         timestamp, type, value, voffset, vlength, tags);
610      this.offset = boffset;
611    }
612 
613   /**
614    * Constructs KeyValue structure filled with specified values.
615    * <p>
616    * Column is split into two fields, family and qualifier.
617    * @param row row key
618    * @param roffset row offset
619    * @param rlength row length
620    * @param family family name
621    * @param foffset family offset
622    * @param flength family length
623    * @param qualifier column qualifier
624    * @param qoffset qualifier offset
625    * @param qlength qualifier length
626    * @param timestamp version timestamp
627    * @param type key type
628    * @param value column value
629    * @param voffset value offset
630    * @param vlength value length
631    * @param tags tags
632    * @throws IllegalArgumentException
633    */
634   public KeyValue(final byte [] row, final int roffset, final int rlength,
635       final byte [] family, final int foffset, final int flength,
636       final byte [] qualifier, final int qoffset, final int qlength,
637       final long timestamp, final Type type,
638       final byte [] value, final int voffset, final int vlength,
639       final List<Tag> tags) {
640     this.bytes = createByteArray(row, roffset, rlength,
641         family, foffset, flength, qualifier, qoffset, qlength,
642         timestamp, type, value, voffset, vlength, tags);
643     this.length = bytes.length;
644     this.offset = 0;
645   }
646 
647   /**
648    * @param row
649    * @param roffset
650    * @param rlength
651    * @param family
652    * @param foffset
653    * @param flength
654    * @param qualifier
655    * @param qoffset
656    * @param qlength
657    * @param timestamp
658    * @param type
659    * @param value
660    * @param voffset
661    * @param vlength
662    * @param tags
663    */
664   public KeyValue(final byte [] row, final int roffset, final int rlength,
665       final byte [] family, final int foffset, final int flength,
666       final byte [] qualifier, final int qoffset, final int qlength,
667       final long timestamp, final Type type,
668       final byte [] value, final int voffset, final int vlength,
669       final byte[] tags, final int tagsOffset, final int tagsLength) {
670     this.bytes = createByteArray(row, roffset, rlength,
671         family, foffset, flength, qualifier, qoffset, qlength,
672         timestamp, type, value, voffset, vlength, tags, tagsOffset, tagsLength);
673     this.length = bytes.length;
674     this.offset = 0;
675   }
676 
677   /**
678    * Constructs an empty KeyValue structure, with specified sizes.
679    * This can be used to partially fill up KeyValues.
680    * <p>
681    * Column is split into two fields, family and qualifier.
682    * @param rlength row length
683    * @param flength family length
684    * @param qlength qualifier length
685    * @param timestamp version timestamp
686    * @param type key type
687    * @param vlength value length
688    * @throws IllegalArgumentException
689    */
690   public KeyValue(final int rlength,
691       final int flength,
692       final int qlength,
693       final long timestamp, final Type type,
694       final int vlength) {
695     this(rlength, flength, qlength, timestamp, type, vlength, 0);
696   }
697 
698   /**
699    * Constructs an empty KeyValue structure, with specified sizes.
700    * This can be used to partially fill up KeyValues.
701    * <p>
702    * Column is split into two fields, family and qualifier.
703    * @param rlength row length
704    * @param flength family length
705    * @param qlength qualifier length
706    * @param timestamp version timestamp
707    * @param type key type
708    * @param vlength value length
709    * @param tagsLength
710    * @throws IllegalArgumentException
711    */
712   public KeyValue(final int rlength,
713       final int flength,
714       final int qlength,
715       final long timestamp, final Type type,
716       final int vlength, final int tagsLength) {
717     this.bytes = createEmptyByteArray(rlength, flength, qlength, timestamp, type, vlength,
718         tagsLength);
719     this.length = bytes.length;
720     this.offset = 0;
721   }
722 
723 
724   public KeyValue(byte[] row, int roffset, int rlength,
725                   byte[] family, int foffset, int flength,
726                   ByteBuffer qualifier, long ts, Type type, ByteBuffer value, List<Tag> tags) {
727     this.bytes = createByteArray(row, roffset, rlength, family, foffset, flength,
728         qualifier, 0, qualifier == null ? 0 : qualifier.remaining(), ts, type,
729         value, 0, value == null ? 0 : value.remaining(), tags);
730     this.length = bytes.length;
731     this.offset = 0;
732   }
733 
734   public KeyValue(Cell c) {
735     this(c.getRowArray(), c.getRowOffset(), (int)c.getRowLength(),
736         c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(), 
737         c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(), 
738         c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(), 
739         c.getValueLength(), c.getTagsArray(), c.getTagsOffset(), c.getTagsLengthUnsigned());
740   }
741 
742   /**
743    * Create an empty byte[] representing a KeyValue
744    * All lengths are preset and can be filled in later.
745    * @param rlength
746    * @param flength
747    * @param qlength
748    * @param timestamp
749    * @param type
750    * @param vlength
751    * @return The newly created byte array.
752    */
753   private static byte[] createEmptyByteArray(final int rlength, int flength,
754       int qlength, final long timestamp, final Type type, int vlength, int tagsLength) {
755     if (rlength > Short.MAX_VALUE) {
756       throw new IllegalArgumentException("Row > " + Short.MAX_VALUE);
757     }
758     if (flength > Byte.MAX_VALUE) {
759       throw new IllegalArgumentException("Family > " + Byte.MAX_VALUE);
760     }
761     // Qualifier length
762     if (qlength > Integer.MAX_VALUE - rlength - flength) {
763       throw new IllegalArgumentException("Qualifier > " + Integer.MAX_VALUE);
764     }
765     checkForTagsLength(tagsLength);
766     // Key length
767     long longkeylength = getKeyDataStructureSize(rlength, flength, qlength);
768     if (longkeylength > Integer.MAX_VALUE) {
769       throw new IllegalArgumentException("keylength " + longkeylength + " > " +
770         Integer.MAX_VALUE);
771     }
772     int keylength = (int)longkeylength;
773     // Value length
774     if (vlength > HConstants.MAXIMUM_VALUE_LENGTH) { // FindBugs INT_VACUOUS_COMPARISON
775       throw new IllegalArgumentException("Valuer > " +
776           HConstants.MAXIMUM_VALUE_LENGTH);
777     }
778 
779     // Allocate right-sized byte array.
780     byte[] bytes= new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength,
781         tagsLength)];
782     // Write the correct size markers
783     int pos = 0;
784     pos = Bytes.putInt(bytes, pos, keylength);
785     pos = Bytes.putInt(bytes, pos, vlength);
786     pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
787     pos += rlength;
788     pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
789     pos += flength + qlength;
790     pos = Bytes.putLong(bytes, pos, timestamp);
791     pos = Bytes.putByte(bytes, pos, type.getCode());
792     pos += vlength;
793     if (tagsLength > 0) {
794       pos = Bytes.putAsShort(bytes, pos, tagsLength);
795     }
796     return bytes;
797   }
798 
799   /**
800    * Checks the parameters passed to a constructor.
801    *
802    * @param row row key
803    * @param rlength row length
804    * @param family family name
805    * @param flength family length
806    * @param qlength qualifier length
807    * @param vlength value length
808    *
809    * @throws IllegalArgumentException an illegal value was passed
810    */
811   private static void checkParameters(final byte [] row, final int rlength,
812       final byte [] family, int flength, int qlength, int vlength)
813           throws IllegalArgumentException {
814     if (rlength > Short.MAX_VALUE) {
815       throw new IllegalArgumentException("Row > " + Short.MAX_VALUE);
816     }
817     if (row == null) {
818       throw new IllegalArgumentException("Row is null");
819     }
820     // Family length
821     flength = family == null ? 0 : flength;
822     if (flength > Byte.MAX_VALUE) {
823       throw new IllegalArgumentException("Family > " + Byte.MAX_VALUE);
824     }
825     // Qualifier length
826     if (qlength > Integer.MAX_VALUE - rlength - flength) {
827       throw new IllegalArgumentException("Qualifier > " + Integer.MAX_VALUE);
828     }
829     // Key length
830     long longKeyLength = getKeyDataStructureSize(rlength, flength, qlength);
831     if (longKeyLength > Integer.MAX_VALUE) {
832       throw new IllegalArgumentException("keylength " + longKeyLength + " > " +
833           Integer.MAX_VALUE);
834     }
835     // Value length
836     if (vlength > HConstants.MAXIMUM_VALUE_LENGTH) { // FindBugs INT_VACUOUS_COMPARISON
837       throw new IllegalArgumentException("Value length " + vlength + " > " +
838           HConstants.MAXIMUM_VALUE_LENGTH);
839     }
840   }
841 
842   /**
843    * Write KeyValue format into the provided byte array.
844    *
845    * @param buffer the bytes buffer to use
846    * @param boffset buffer offset
847    * @param row row key
848    * @param roffset row offset
849    * @param rlength row length
850    * @param family family name
851    * @param foffset family offset
852    * @param flength family length
853    * @param qualifier column qualifier
854    * @param qoffset qualifier offset
855    * @param qlength qualifier length
856    * @param timestamp version timestamp
857    * @param type key type
858    * @param value column value
859    * @param voffset value offset
860    * @param vlength value length
861    *
862    * @return The number of useful bytes in the buffer.
863    *
864    * @throws IllegalArgumentException an illegal value was passed or there is insufficient space
865    * remaining in the buffer
866    */
867   private static int writeByteArray(byte [] buffer, final int boffset,
868       final byte [] row, final int roffset, final int rlength,
869       final byte [] family, final int foffset, int flength,
870       final byte [] qualifier, final int qoffset, int qlength,
871       final long timestamp, final Type type,
872       final byte [] value, final int voffset, int vlength, Tag[] tags) {
873 
874     checkParameters(row, rlength, family, flength, qlength, vlength);
875 
876     // Calculate length of tags area
877     int tagsLength = 0;
878     if (tags != null && tags.length > 0) {
879       for (Tag t: tags) {
880         tagsLength += t.getLength();
881       }
882     }
883     checkForTagsLength(tagsLength);
884     int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength);
885     int keyValueLength = (int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength,
886         tagsLength);
887     if (keyValueLength > buffer.length - boffset) {
888       throw new IllegalArgumentException("Buffer size " + (buffer.length - boffset) + " < " +
889           keyValueLength);
890     }
891 
892     // Write key, value and key row length.
893     int pos = boffset;
894     pos = Bytes.putInt(buffer, pos, keyLength);
895     pos = Bytes.putInt(buffer, pos, vlength);
896     pos = Bytes.putShort(buffer, pos, (short)(rlength & 0x0000ffff));
897     pos = Bytes.putBytes(buffer, pos, row, roffset, rlength);
898     pos = Bytes.putByte(buffer, pos, (byte) (flength & 0x0000ff));
899     if (flength != 0) {
900       pos = Bytes.putBytes(buffer, pos, family, foffset, flength);
901     }
902     if (qlength != 0) {
903       pos = Bytes.putBytes(buffer, pos, qualifier, qoffset, qlength);
904     }
905     pos = Bytes.putLong(buffer, pos, timestamp);
906     pos = Bytes.putByte(buffer, pos, type.getCode());
907     if (value != null && value.length > 0) {
908       pos = Bytes.putBytes(buffer, pos, value, voffset, vlength);
909     }
910     // Write the number of tags. If it is 0 then it means there are no tags.
911     if (tagsLength > 0) {
912       pos = Bytes.putAsShort(buffer, pos, tagsLength);
913       for (Tag t : tags) {
914         pos = Bytes.putBytes(buffer, pos, t.getBuffer(), t.getOffset(), t.getLength());
915       }
916     }
917     return keyValueLength;
918   }
919 
920   private static void checkForTagsLength(int tagsLength) {
921     if (tagsLength > MAX_TAGS_LENGTH) {
922       throw new IllegalArgumentException("tagslength "+ tagsLength + " > " + MAX_TAGS_LENGTH);
923     }
924   }
925 
926   /**
927    * Write KeyValue format into a byte array.
928    * @param row row key
929    * @param roffset row offset
930    * @param rlength row length
931    * @param family family name
932    * @param foffset family offset
933    * @param flength family length
934    * @param qualifier column qualifier
935    * @param qoffset qualifier offset
936    * @param qlength qualifier length
937    * @param timestamp version timestamp
938    * @param type key type
939    * @param value column value
940    * @param voffset value offset
941    * @param vlength value length
942    * @return The newly created byte array.
943    */
944   private static byte [] createByteArray(final byte [] row, final int roffset,
945       final int rlength, final byte [] family, final int foffset, int flength,
946       final byte [] qualifier, final int qoffset, int qlength,
947       final long timestamp, final Type type,
948       final byte [] value, final int voffset, 
949       int vlength, byte[] tags, int tagsOffset, int tagsLength) {
950 
951     checkParameters(row, rlength, family, flength, qlength, vlength);
952     checkForTagsLength(tagsLength);
953     // Allocate right-sized byte array.
954     int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength);
955     byte [] bytes =
956         new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength, tagsLength)];
957     // Write key, value and key row length.
958     int pos = 0;
959     pos = Bytes.putInt(bytes, pos, keyLength);
960     pos = Bytes.putInt(bytes, pos, vlength);
961     pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
962     pos = Bytes.putBytes(bytes, pos, row, roffset, rlength);
963     pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
964     if(flength != 0) {
965       pos = Bytes.putBytes(bytes, pos, family, foffset, flength);
966     }
967     if(qlength != 0) {
968       pos = Bytes.putBytes(bytes, pos, qualifier, qoffset, qlength);
969     }
970     pos = Bytes.putLong(bytes, pos, timestamp);
971     pos = Bytes.putByte(bytes, pos, type.getCode());
972     if (value != null && value.length > 0) {
973       pos = Bytes.putBytes(bytes, pos, value, voffset, vlength);
974     }
975     // Add the tags after the value part
976     if (tagsLength > 0) {
977       pos = Bytes.putAsShort(bytes, pos, tagsLength);
978       pos = Bytes.putBytes(bytes, pos, tags, tagsOffset, tagsLength);
979     }
980     return bytes;
981   }
982 
983   /**
984    * @param qualifier can be a ByteBuffer or a byte[], or null.
985    * @param value can be a ByteBuffer or a byte[], or null.
986    */
987   private static byte [] createByteArray(final byte [] row, final int roffset,
988       final int rlength, final byte [] family, final int foffset, int flength,
989       final Object qualifier, final int qoffset, int qlength,
990       final long timestamp, final Type type,
991       final Object value, final int voffset, int vlength, List<Tag> tags) {
992 
993     checkParameters(row, rlength, family, flength, qlength, vlength);
994 
995     // Calculate length of tags area
996     int tagsLength = 0;
997     if (tags != null && !tags.isEmpty()) {
998       for (Tag t : tags) {
999         tagsLength += t.getLength();
1000       }
1001     }
1002     checkForTagsLength(tagsLength);
1003     // Allocate right-sized byte array.
1004     int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength);
1005     byte[] bytes = new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength,
1006         tagsLength)];
1007 
1008     // Write key, value and key row length.
1009     int pos = 0;
1010     pos = Bytes.putInt(bytes, pos, keyLength);
1011 
1012     pos = Bytes.putInt(bytes, pos, vlength);
1013     pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
1014     pos = Bytes.putBytes(bytes, pos, row, roffset, rlength);
1015     pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
1016     if(flength != 0) {
1017       pos = Bytes.putBytes(bytes, pos, family, foffset, flength);
1018     }
1019     if (qlength > 0) {
1020       if (qualifier instanceof ByteBuffer) {
1021         pos = Bytes.putByteBuffer(bytes, pos, (ByteBuffer) qualifier);
1022       } else {
1023         pos = Bytes.putBytes(bytes, pos, (byte[]) qualifier, qoffset, qlength);
1024       }
1025     }
1026     pos = Bytes.putLong(bytes, pos, timestamp);
1027     pos = Bytes.putByte(bytes, pos, type.getCode());
1028     if (vlength > 0) {
1029       if (value instanceof ByteBuffer) {
1030         pos = Bytes.putByteBuffer(bytes, pos, (ByteBuffer) value);
1031       } else {
1032         pos = Bytes.putBytes(bytes, pos, (byte[]) value, voffset, vlength);
1033       }
1034     }
1035     // Add the tags after the value part
1036     if (tagsLength > 0) {
1037       pos = Bytes.putAsShort(bytes, pos, tagsLength);
1038       for (Tag t : tags) {
1039         pos = Bytes.putBytes(bytes, pos, t.getBuffer(), t.getOffset(), t.getLength());
1040       }
1041     }
1042     return bytes;
1043   }
1044 
1045   /**
1046    * Needed doing 'contains' on List.  Only compares the key portion, not the value.
1047    */
1048   @Override
1049   public boolean equals(Object other) {
1050     if (!(other instanceof Cell)) {
1051       return false;
1052     }
1053     return CellComparator.equals(this, (Cell)other);
1054   }
1055 
1056   @Override
1057   public int hashCode() {
1058     byte[] b = getBuffer();
1059     int start = getOffset(), end = getOffset() + getLength();
1060     int h = b[start++];
1061     for (int i = start; i < end; i++) {
1062       h = (h * 13) ^ b[i];
1063     }
1064     return h;
1065   }
1066 
1067   //---------------------------------------------------------------------------
1068   //
1069   //  KeyValue cloning
1070   //
1071   //---------------------------------------------------------------------------
1072 
1073   /**
1074    * Clones a KeyValue.  This creates a copy, re-allocating the buffer.
1075    * @return Fully copied clone of this KeyValue
1076    * @throws CloneNotSupportedException
1077    */
1078   @Override
1079   public KeyValue clone() throws CloneNotSupportedException {
1080     super.clone();
1081     byte [] b = new byte[this.length];
1082     System.arraycopy(this.bytes, this.offset, b, 0, this.length);
1083     KeyValue ret = new KeyValue(b, 0, b.length);
1084     // Important to clone the memstoreTS as well - otherwise memstore's
1085     // update-in-place methods (eg increment) will end up creating
1086     // new entries
1087     ret.setMvccVersion(mvcc);
1088     return ret;
1089   }
1090 
1091   /**
1092    * Creates a shallow copy of this KeyValue, reusing the data byte buffer.
1093    * http://en.wikipedia.org/wiki/Object_copy
1094    * @return Shallow copy of this KeyValue
1095    */
1096   public KeyValue shallowCopy() {
1097     KeyValue shallowCopy = new KeyValue(this.bytes, this.offset, this.length);
1098     shallowCopy.setMvccVersion(this.mvcc);
1099     return shallowCopy;
1100   }
1101 
1102   //---------------------------------------------------------------------------
1103   //
1104   //  String representation
1105   //
1106   //---------------------------------------------------------------------------
1107 
1108   public String toString() {
1109     if (this.bytes == null || this.bytes.length == 0) {
1110       return "empty";
1111     }
1112     return keyToString(this.bytes, this.offset + ROW_OFFSET, getKeyLength()) +
1113       "/vlen=" + getValueLength() + "/mvcc=" + mvcc;
1114   }
1115 
1116   /**
1117    * @param k Key portion of a KeyValue.
1118    * @return Key as a String, empty string if k is null. 
1119    */
1120   public static String keyToString(final byte [] k) {
1121     if (k == null) { 
1122       return "";
1123     }
1124     return keyToString(k, 0, k.length);
1125   }
1126 
1127   /**
1128    * Produces a string map for this key/value pair. Useful for programmatic use
1129    * and manipulation of the data stored in an HLogKey, for example, printing
1130    * as JSON. Values are left out due to their tendency to be large. If needed,
1131    * they can be added manually.
1132    *
1133    * @return the Map<String,?> containing data from this key
1134    */
1135   public Map<String, Object> toStringMap() {
1136     Map<String, Object> stringMap = new HashMap<String, Object>();
1137     stringMap.put("row", Bytes.toStringBinary(getRow()));
1138     stringMap.put("family", Bytes.toStringBinary(getFamily()));
1139     stringMap.put("qualifier", Bytes.toStringBinary(getQualifier()));
1140     stringMap.put("timestamp", getTimestamp());
1141     stringMap.put("vlen", getValueLength());
1142     List<Tag> tags = getTags();
1143     if (tags != null) {
1144       List<String> tagsString = new ArrayList<String>();
1145       for (Tag t : tags) {
1146         tagsString.add((t.getType()) + ":" +Bytes.toStringBinary(t.getValue()));
1147       }
1148       stringMap.put("tag", tagsString);
1149     }
1150     return stringMap;
1151   }
1152 
1153   /**
1154    * Use for logging.
1155    * @param b Key portion of a KeyValue.
1156    * @param o Offset to start of key
1157    * @param l Length of key.
1158    * @return Key as a String.
1159    */
1160   public static String keyToString(final byte [] b, final int o, final int l) {
1161     if (b == null) return "";
1162     int rowlength = Bytes.toShort(b, o);
1163     String row = Bytes.toStringBinary(b, o + Bytes.SIZEOF_SHORT, rowlength);
1164     int columnoffset = o + Bytes.SIZEOF_SHORT + 1 + rowlength;
1165     int familylength = b[columnoffset - 1];
1166     int columnlength = l - ((columnoffset - o) + TIMESTAMP_TYPE_SIZE);
1167     String family = familylength == 0? "":
1168       Bytes.toStringBinary(b, columnoffset, familylength);
1169     String qualifier = columnlength == 0? "":
1170       Bytes.toStringBinary(b, columnoffset + familylength,
1171       columnlength - familylength);
1172     long timestamp = Bytes.toLong(b, o + (l - TIMESTAMP_TYPE_SIZE));
1173     String timestampStr = humanReadableTimestamp(timestamp);
1174     byte type = b[o + l - 1];
1175     return row + "/" + family +
1176       (family != null && family.length() > 0? ":" :"") +
1177       qualifier + "/" + timestampStr + "/" + Type.codeToType(type);
1178   }
1179 
1180   public static String humanReadableTimestamp(final long timestamp) {
1181     if (timestamp == HConstants.LATEST_TIMESTAMP) {
1182       return "LATEST_TIMESTAMP";
1183     }
1184     if (timestamp == HConstants.OLDEST_TIMESTAMP) {
1185       return "OLDEST_TIMESTAMP";
1186     }
1187     return String.valueOf(timestamp);
1188   }
1189 
1190   //---------------------------------------------------------------------------
1191   //
1192   //  Public Member Accessors
1193   //
1194   //---------------------------------------------------------------------------
1195 
1196   /**
1197    * @return The byte array backing this KeyValue.
1198    * @deprecated Since 0.98.0.  Use Cell Interface instead.  Do not presume single backing buffer.
1199    */
1200   @Deprecated
1201   public byte [] getBuffer() {
1202     return this.bytes;
1203   }
1204 
1205   /**
1206    * @return Offset into {@link #getBuffer()} at which this KeyValue starts.
1207    */
1208   public int getOffset() {
1209     return this.offset;
1210   }
1211 
1212   /**
1213    * @return Length of bytes this KeyValue occupies in {@link #getBuffer()}.
1214    */
1215   public int getLength() {
1216     return length;
1217   }
1218 
1219   //---------------------------------------------------------------------------
1220   //
1221   //  Length and Offset Calculators
1222   //
1223   //---------------------------------------------------------------------------
1224 
1225   /**
1226    * Determines the total length of the KeyValue stored in the specified
1227    * byte array and offset.  Includes all headers.
1228    * @param bytes byte array
1229    * @param offset offset to start of the KeyValue
1230    * @return length of entire KeyValue, in bytes
1231    */
1232   private static int getLength(byte [] bytes, int offset) {
1233     int klength = ROW_OFFSET + Bytes.toInt(bytes, offset);
1234     int vlength = Bytes.toInt(bytes, offset + Bytes.SIZEOF_INT);
1235     return klength + vlength;
1236   }
1237 
1238   /**
1239    * @return Key offset in backing buffer..
1240    */
1241   public int getKeyOffset() {
1242     return this.offset + ROW_OFFSET;
1243   }
1244 
1245   public String getKeyString() {
1246     return Bytes.toStringBinary(getBuffer(), getKeyOffset(), getKeyLength());
1247   }
1248 
1249   /**
1250    * @return Length of key portion.
1251    */
1252   public int getKeyLength() {
1253     return Bytes.toInt(this.bytes, this.offset);
1254   }
1255 
1256   /**
1257    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1258    */
1259   @Override
1260   public byte[] getValueArray() {
1261     return bytes;
1262   }
1263 
1264   /**
1265    * @return the value offset
1266    */
1267   @Override
1268   public int getValueOffset() {
1269     int voffset = getKeyOffset() + getKeyLength();
1270     return voffset;
1271   }
1272 
1273   /**
1274    * @return Value length
1275    */
1276   @Override
1277   public int getValueLength() {
1278     int vlength = Bytes.toInt(this.bytes, this.offset + Bytes.SIZEOF_INT);
1279     return vlength;
1280   }
1281 
1282   /**
1283    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1284    */
1285   @Override
1286   public byte[] getRowArray() {
1287     return bytes;
1288   }
1289 
1290   /**
1291    * @return Row offset
1292    */
1293   @Override
1294   public int getRowOffset() {
1295     return getKeyOffset() + Bytes.SIZEOF_SHORT;
1296   }
1297 
1298   /**
1299    * @return Row length
1300    */
1301   @Override
1302   public short getRowLength() {
1303     return Bytes.toShort(this.bytes, getKeyOffset());
1304   }
1305 
1306   /**
1307    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1308    */
1309   @Override
1310   public byte[] getFamilyArray() {
1311     return bytes;
1312   }
1313 
1314   /**
1315    * @return Family offset
1316    */
1317   @Override
1318   public int getFamilyOffset() {
1319     return getFamilyOffset(getRowLength());
1320   }
1321 
1322   /**
1323    * @return Family offset
1324    */
1325   private int getFamilyOffset(int rlength) {
1326     return this.offset + ROW_OFFSET + Bytes.SIZEOF_SHORT + rlength + Bytes.SIZEOF_BYTE;
1327   }
1328 
1329   /**
1330    * @return Family length
1331    */
1332   @Override
1333   public byte getFamilyLength() {
1334     return getFamilyLength(getFamilyOffset());
1335   }
1336 
1337   /**
1338    * @return Family length
1339    */
1340   public byte getFamilyLength(int foffset) {
1341     return this.bytes[foffset-1];
1342   }
1343 
1344   /**
1345    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1346    */
1347   @Override
1348   public byte[] getQualifierArray() {
1349     return bytes;
1350   }
1351 
1352   /**
1353    * @return Qualifier offset
1354    */
1355   @Override
1356   public int getQualifierOffset() {
1357     return getQualifierOffset(getFamilyOffset());
1358   }
1359 
1360   /**
1361    * @return Qualifier offset
1362    */
1363   private int getQualifierOffset(int foffset) {
1364     return foffset + getFamilyLength(foffset);
1365   }
1366 
1367   /**
1368    * @return Qualifier length
1369    */
1370   @Override
1371   public int getQualifierLength() {
1372     return getQualifierLength(getRowLength(),getFamilyLength());
1373   }
1374 
1375   /**
1376    * @return Qualifier length
1377    */
1378   private int getQualifierLength(int rlength, int flength) {
1379     return getKeyLength() - (int) getKeyDataStructureSize(rlength, flength, 0);
1380   }
1381 
1382   /**
1383    * @return Column (family + qualifier) length
1384    */
1385   private int getTotalColumnLength(int rlength, int foffset) {
1386     int flength = getFamilyLength(foffset);
1387     int qlength = getQualifierLength(rlength,flength);
1388     return flength + qlength;
1389   }
1390 
1391   /**
1392    * @return Timestamp offset
1393    */
1394   public int getTimestampOffset() {
1395     return getTimestampOffset(getKeyLength());
1396   }
1397 
1398   /**
1399    * @param keylength Pass if you have it to save on a int creation.
1400    * @return Timestamp offset
1401    */
1402   private int getTimestampOffset(final int keylength) {
1403     return getKeyOffset() + keylength - TIMESTAMP_TYPE_SIZE;
1404   }
1405 
1406   /**
1407    * @return True if this KeyValue has a LATEST_TIMESTAMP timestamp.
1408    */
1409   public boolean isLatestTimestamp() {
1410     return Bytes.equals(getBuffer(), getTimestampOffset(), Bytes.SIZEOF_LONG,
1411       HConstants.LATEST_TIMESTAMP_BYTES, 0, Bytes.SIZEOF_LONG);
1412   }
1413 
1414   /**
1415    * @param now Time to set into <code>this</code> IFF timestamp ==
1416    * {@link HConstants#LATEST_TIMESTAMP} (else, its a noop).
1417    * @return True is we modified this.
1418    */
1419   public boolean updateLatestStamp(final byte [] now) {
1420     if (this.isLatestTimestamp()) {
1421       int tsOffset = getTimestampOffset();
1422       System.arraycopy(now, 0, this.bytes, tsOffset, Bytes.SIZEOF_LONG);
1423       // clear cache or else getTimestamp() possibly returns an old value
1424       return true;
1425     }
1426     return false;
1427   }
1428 
1429   //---------------------------------------------------------------------------
1430   //
1431   //  Methods that return copies of fields
1432   //
1433   //---------------------------------------------------------------------------
1434 
1435   /**
1436    * Do not use unless you have to.  Used internally for compacting and testing.
1437    *
1438    * Use {@link #getRow()}, {@link #getFamily()}, {@link #getQualifier()}, and
1439    * {@link #getValue()} if accessing a KeyValue client-side.
1440    * @return Copy of the key portion only.
1441    */
1442   public byte [] getKey() {
1443     int keylength = getKeyLength();
1444     byte [] key = new byte[keylength];
1445     System.arraycopy(getBuffer(), getKeyOffset(), key, 0, keylength);
1446     return key;
1447   }
1448 
1449   /**
1450    * Returns value in a new byte array.
1451    * Primarily for use client-side. If server-side, use
1452    * {@link #getBuffer()} with appropriate offsets and lengths instead to
1453    * save on allocations.
1454    * @return Value in a new byte array.
1455    */
1456   @Deprecated // use CellUtil.getValueArray()
1457   public byte [] getValue() {
1458     return CellUtil.cloneValue(this);
1459   }
1460 
1461   /**
1462    * Primarily for use client-side.  Returns the row of this KeyValue in a new
1463    * byte array.<p>
1464    *
1465    * If server-side, use {@link #getBuffer()} with appropriate offsets and
1466    * lengths instead.
1467    * @return Row in a new byte array.
1468    */
1469   @Deprecated // use CellUtil.getRowArray()
1470   public byte [] getRow() {
1471     return CellUtil.cloneRow(this);
1472   }
1473 
1474   /**
1475    *
1476    * @return Timestamp
1477    */
1478   @Override
1479   public long getTimestamp() {
1480     return getTimestamp(getKeyLength());
1481   }
1482 
1483   /**
1484    * @param keylength Pass if you have it to save on a int creation.
1485    * @return Timestamp
1486    */
1487   long getTimestamp(final int keylength) {
1488     int tsOffset = getTimestampOffset(keylength);
1489     return Bytes.toLong(this.bytes, tsOffset);
1490   }
1491 
1492   /**
1493    * @return Type of this KeyValue.
1494    */
1495   @Deprecated
1496   public byte getType() {
1497     return getTypeByte();
1498   }
1499 
1500   /**
1501    * @return KeyValue.TYPE byte representation
1502    */
1503   @Override
1504   public byte getTypeByte() {
1505     return this.bytes[this.offset + getKeyLength() - 1 + ROW_OFFSET];
1506   }
1507 
1508   /**
1509    * @return True if a delete type, a {@link KeyValue.Type#Delete} or
1510    * a {KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn}
1511    * KeyValue type.
1512    */
1513   @Deprecated // use CellUtil#isDelete
1514   public boolean isDelete() {
1515     return KeyValue.isDelete(getType());
1516   }
1517 
1518   /**
1519    * @return True if this KV is a {@link KeyValue.Type#Delete} type.
1520    */
1521   public boolean isDeleteType() {
1522     // TODO: Fix this method name vis-a-vis isDelete!
1523     return getTypeByte() == Type.Delete.getCode();
1524   }
1525 
1526   /**
1527    * @return True if this KV is a delete family type.
1528    */
1529   public boolean isDeleteFamily() {
1530     return getTypeByte() == Type.DeleteFamily.getCode();
1531   }
1532 
1533   /**
1534    * @return True if this KV is a delete family-version type.
1535    */
1536   public boolean isDeleteFamilyVersion() {
1537     return getTypeByte() == Type.DeleteFamilyVersion.getCode();
1538   }
1539 
1540   /**
1541    *
1542    * @return True if this KV is a delete family or column type.
1543    */
1544   public boolean isDeleteColumnOrFamily() {
1545     int t = getTypeByte();
1546     return t == Type.DeleteColumn.getCode() || t == Type.DeleteFamily.getCode();
1547   }
1548 
1549   /**
1550    * Primarily for use client-side.  Returns the family of this KeyValue in a
1551    * new byte array.<p>
1552    *
1553    * If server-side, use {@link #getBuffer()} with appropriate offsets and
1554    * lengths instead.
1555    * @return Returns family. Makes a copy.
1556    */
1557   @Deprecated // use CellUtil.getFamilyArray
1558   public byte [] getFamily() {
1559     return CellUtil.cloneFamily(this);
1560   }
1561 
1562   /**
1563    * Primarily for use client-side.  Returns the column qualifier of this
1564    * KeyValue in a new byte array.<p>
1565    *
1566    * If server-side, use {@link #getBuffer()} with appropriate offsets and
1567    * lengths instead.
1568    * Use {@link #getBuffer()} with appropriate offsets and lengths instead.
1569    * @return Returns qualifier. Makes a copy.
1570    */
1571   @Deprecated // use CellUtil.getQualifierArray
1572   public byte [] getQualifier() {
1573     return CellUtil.cloneQualifier(this);
1574   }
1575 
1576   /**
1577    * This returns the offset where the tag actually starts.
1578    */
1579   @Override
1580   public int getTagsOffset() {
1581     int tagsLen = getTagsLengthUnsigned();
1582     if (tagsLen == 0) {
1583       return this.offset + this.length;
1584     }
1585     return this.offset + this.length - tagsLen;
1586   }
1587 
1588   /**
1589    * This returns the total length of the tag bytes
1590    */
1591   @Override
1592   @Deprecated
1593   public int getTagsLengthUnsigned() {
1594     int tagsLen = this.length - (getKeyLength() + getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE);
1595     if (tagsLen > 0) {
1596       // There are some Tag bytes in the byte[]. So reduce 2 bytes which is added to denote the tags
1597       // length
1598       tagsLen -= TAGS_LENGTH_SIZE;
1599     }
1600     return tagsLen;
1601   }
1602 
1603   @Override
1604   @Deprecated
1605   public short getTagsLength() {
1606     return (short) getTagsLengthUnsigned();
1607   }
1608 
1609   /**
1610    * Returns any tags embedded in the KeyValue.  Used in testcases.
1611    * @return The tags
1612    */
1613   public List<Tag> getTags() {
1614     int tagsLength = getTagsLengthUnsigned();
1615     if (tagsLength == 0) {
1616       return EMPTY_ARRAY_LIST;
1617     }
1618     return Tag.asList(getBuffer(), getTagsOffset(), tagsLength);
1619   }
1620 
1621   /**
1622    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1623    */
1624   @Override
1625   public byte[] getTagsArray() {
1626     return bytes;
1627   }
1628 
1629   //---------------------------------------------------------------------------
1630   //
1631   //  Compare specified fields against those contained in this KeyValue
1632   //
1633   //---------------------------------------------------------------------------
1634 
1635   /**
1636    * @param family
1637    * @return True if matching families.
1638    */
1639   public boolean matchingFamily(final byte [] family) {
1640     if (this.length == 0 || this.bytes.length == 0) {
1641       return false;
1642     }
1643     return Bytes.equals(family, 0, family.length,
1644         this.bytes, getFamilyOffset(), getFamilyLength());
1645   }
1646 
1647   /**
1648    * @param qualifier
1649    * @return True if matching qualifiers.
1650    */
1651   public boolean matchingQualifier(final byte [] qualifier) {
1652     return matchingQualifier(qualifier, 0, qualifier.length);
1653   }
1654 
1655   public boolean matchingQualifier(final byte [] qualifier, int offset, int length) {
1656     return Bytes.equals(qualifier, offset, length,
1657         this.bytes, getQualifierOffset(), getQualifierLength());
1658   }
1659 
1660   public boolean matchingQualifier(final KeyValue other) {
1661     return matchingQualifier(other.getBuffer(), other.getQualifierOffset(),
1662         other.getQualifierLength());
1663   }
1664 
1665   public boolean matchingRow(final byte [] row) {
1666     return matchingRow(row, 0, row.length);
1667   }
1668 
1669   public boolean matchingRow(final byte[] row, int offset, int length) {
1670     return Bytes.equals(row, offset, length,
1671         this.bytes, getRowOffset(), getRowLength());
1672   }
1673 
1674   public boolean matchingRow(KeyValue other) {
1675     return matchingRow(other.getBuffer(), other.getRowOffset(),
1676         other.getRowLength());
1677   }
1678 
1679   /**
1680    *
1681    * @param family column family
1682    * @param qualifier column qualifier
1683    * @return True if column matches
1684    */
1685   public boolean matchingColumn(final byte[] family, final byte[] qualifier) {
1686     return matchingColumn(family, 0, len(family), qualifier, 0, len(qualifier));
1687   }
1688 
1689   /**
1690    * Checks if column matches.
1691    *
1692    * @param family family name
1693    * @param foffset family offset
1694    * @param flength family length
1695    * @param qualifier column qualifier
1696    * @param qoffset qualifier offset
1697    * @param qlength qualifier length
1698    *
1699    * @return True if column matches
1700    */
1701   public boolean matchingColumn(final byte [] family, final int foffset, final int flength,
1702       final byte [] qualifier, final int qoffset, final int qlength) {
1703     int rl = getRowLength();
1704     int o = getFamilyOffset(rl);
1705     int fl = getFamilyLength(o);
1706     if (!Bytes.equals(family, foffset, flength, this.bytes, o, fl)) {
1707       return false;
1708     }
1709 
1710     int ql = getQualifierLength(rl, fl);
1711     if (qualifier == null || qlength == 0) {
1712       return (ql == 0);
1713     }
1714     return Bytes.equals(qualifier, qoffset, qlength, this.bytes, o + fl, ql);
1715   }
1716 
1717   /**
1718    * Creates a new KeyValue that only contains the key portion (the value is
1719    * set to be null).
1720    *
1721    * TODO only used by KeyOnlyFilter -- move there.
1722    * @param lenAsVal replace value with the actual value length (false=empty)
1723    */
1724   public KeyValue createKeyOnly(boolean lenAsVal) {
1725     // KV format:  <keylen:4><valuelen:4><key:keylen><value:valuelen>
1726     // Rebuild as: <keylen:4><0:4><key:keylen>
1727     int dataLen = lenAsVal? Bytes.SIZEOF_INT : 0;
1728     byte [] newBuffer = new byte[getKeyLength() + ROW_OFFSET + dataLen];
1729     System.arraycopy(this.bytes, this.offset, newBuffer, 0,
1730         Math.min(newBuffer.length,this.length));
1731     Bytes.putInt(newBuffer, Bytes.SIZEOF_INT, dataLen);
1732     if (lenAsVal) {
1733       Bytes.putInt(newBuffer, newBuffer.length - dataLen, this.getValueLength());
1734     }
1735     return new KeyValue(newBuffer);
1736   }
1737 
1738   /**
1739    * Splits a column in {@code family:qualifier} form into separate byte arrays. An empty qualifier
1740    * (ie, {@code fam:}) is parsed as <code>{ fam, EMPTY_BYTE_ARRAY }</code> while no delimiter (ie,
1741    * {@code fam}) is parsed as an array of one element, <code>{ fam }</code>.
1742    * <p>
1743    * Don't forget, HBase DOES support empty qualifiers. (see HBASE-9549)
1744    * </p>
1745    * <p>
1746    * Not recommend to be used as this is old-style API.
1747    * </p>
1748    * @param c The column.
1749    * @return The parsed column.
1750    */
1751   public static byte [][] parseColumn(byte [] c) {
1752     final int index = getDelimiter(c, 0, c.length, COLUMN_FAMILY_DELIMITER);
1753     if (index == -1) {
1754       // If no delimiter, return array of size 1
1755       return new byte [][] { c };
1756     } else if(index == c.length - 1) {
1757       // family with empty qualifier, return array size 2
1758       byte [] family = new byte[c.length-1];
1759       System.arraycopy(c, 0, family, 0, family.length);
1760       return new byte [][] { family, HConstants.EMPTY_BYTE_ARRAY};
1761     }
1762     // Family and column, return array size 2
1763     final byte [][] result = new byte [2][];
1764     result[0] = new byte [index];
1765     System.arraycopy(c, 0, result[0], 0, index);
1766     final int len = c.length - (index + 1);
1767     result[1] = new byte[len];
1768     System.arraycopy(c, index + 1 /* Skip delimiter */, result[1], 0, len);
1769     return result;
1770   }
1771 
1772   /**
1773    * Makes a column in family:qualifier form from separate byte arrays.
1774    * <p>
1775    * Not recommended for usage as this is old-style API.
1776    * @param family
1777    * @param qualifier
1778    * @return family:qualifier
1779    */
1780   public static byte [] makeColumn(byte [] family, byte [] qualifier) {
1781     return Bytes.add(family, COLUMN_FAMILY_DELIM_ARRAY, qualifier);
1782   }
1783 
1784   /**
1785    * This function is only used in Meta key comparisons so its error message
1786    * is specific for meta key errors.
1787    */
1788   static int getRequiredDelimiterInReverse(final byte [] b,
1789       final int offset, final int length, final int delimiter) {
1790     int index = getDelimiterInReverse(b, offset, length, delimiter);
1791     if (index < 0) {
1792       throw new IllegalArgumentException("hbase:meta key must have two '" + (char)delimiter + "' "
1793         + "delimiters and have the following format: '<table>,<key>,<etc>'");
1794     }
1795     return index;
1796   }
1797 
1798   /**
1799    * @param b
1800    * @param delimiter
1801    * @return Index of delimiter having started from start of <code>b</code>
1802    * moving rightward.
1803    */
1804   public static int getDelimiter(final byte [] b, int offset, final int length,
1805       final int delimiter) {
1806     if (b == null) {
1807       throw new IllegalArgumentException("Passed buffer is null");
1808     }
1809     int result = -1;
1810     for (int i = offset; i < length + offset; i++) {
1811       if (b[i] == delimiter) {
1812         result = i;
1813         break;
1814       }
1815     }
1816     return result;
1817   }
1818 
1819   /**
1820    * Find index of passed delimiter walking from end of buffer backwards.
1821    * @param b
1822    * @param delimiter
1823    * @return Index of delimiter
1824    */
1825   public static int getDelimiterInReverse(final byte [] b, final int offset,
1826       final int length, final int delimiter) {
1827     if (b == null) {
1828       throw new IllegalArgumentException("Passed buffer is null");
1829     }
1830     int result = -1;
1831     for (int i = (offset + length) - 1; i >= offset; i--) {
1832       if (b[i] == delimiter) {
1833         result = i;
1834         break;
1835       }
1836     }
1837     return result;
1838   }
1839 
1840   /**
1841    * A {@link KVComparator} for <code>hbase:meta</code> catalog table
1842    * {@link KeyValue}s.
1843    */
1844   public static class MetaComparator extends KVComparator {
1845     /**
1846      * Compare key portion of a {@link KeyValue} for keys in <code>hbase:meta</code>
1847      * table.
1848      */
1849     @Override
1850     public int compareRows(byte [] left, int loffset, int llength,
1851         byte [] right, int roffset, int rlength) {
1852       int leftDelimiter = getDelimiter(left, loffset, llength,
1853           HConstants.DELIMITER);
1854       int rightDelimiter = getDelimiter(right, roffset, rlength,
1855           HConstants.DELIMITER);
1856       if (leftDelimiter < 0 && rightDelimiter >= 0) {
1857         // Nothing between hbase:meta and regionid.  Its first key.
1858         return -1;
1859       } else if (rightDelimiter < 0 && leftDelimiter >= 0) {
1860         return 1;
1861       } else if (leftDelimiter < 0 && rightDelimiter < 0) {
1862         return 0;
1863       }
1864       // Compare up to the delimiter
1865       int result = Bytes.compareTo(left, loffset, leftDelimiter - loffset,
1866           right, roffset, rightDelimiter - roffset);
1867       if (result != 0) {
1868         return result;
1869       }
1870       // Compare middle bit of the row.
1871       // Move past delimiter
1872       leftDelimiter++;
1873       rightDelimiter++;
1874       int leftFarDelimiter = getRequiredDelimiterInReverse(left, leftDelimiter,
1875           llength - (leftDelimiter - loffset), HConstants.DELIMITER);
1876       int rightFarDelimiter = getRequiredDelimiterInReverse(right,
1877           rightDelimiter, rlength - (rightDelimiter - roffset),
1878           HConstants.DELIMITER);
1879       // Now compare middlesection of row.
1880       result = super.compareRows(left, leftDelimiter,
1881           leftFarDelimiter - leftDelimiter, right, rightDelimiter,
1882           rightFarDelimiter - rightDelimiter);
1883       if (result != 0) {
1884         return result;
1885       }
1886       // Compare last part of row, the rowid.
1887       leftFarDelimiter++;
1888       rightFarDelimiter++;
1889       result = Bytes.compareTo(left, leftFarDelimiter, llength - (leftFarDelimiter - loffset),
1890           right, rightFarDelimiter, rlength - (rightFarDelimiter - roffset));
1891       return result;
1892     }
1893 
1894     /**
1895      * Don't do any fancy Block Index splitting tricks.
1896      */
1897     @Override
1898     public byte[] getShortMidpointKey(final byte[] leftKey, final byte[] rightKey) {
1899       return Arrays.copyOf(rightKey, rightKey.length);
1900     }
1901 
1902     /**
1903      * The HFileV2 file format's trailer contains this class name.  We reinterpret this and
1904      * instantiate the appropriate comparator.
1905      * TODO: With V3 consider removing this.
1906      * @return legacy class name for FileFileTrailer#comparatorClassName
1907      */
1908     @Override
1909     public String getLegacyKeyComparatorName() {
1910       return "org.apache.hadoop.hbase.KeyValue$MetaKeyComparator";
1911     }
1912 
1913     @Override
1914     protected Object clone() throws CloneNotSupportedException {
1915       return new MetaComparator();
1916     }
1917 
1918     /**
1919      * Override the row key comparison to parse and compare the meta row key parts.
1920      */
1921     @Override
1922     protected int compareRowKey(final Cell l, final Cell r) {
1923       byte[] left = l.getRowArray();
1924       int loffset = l.getRowOffset();
1925       int llength = l.getRowLength();
1926       byte[] right = r.getRowArray();
1927       int roffset = r.getRowOffset();
1928       int rlength = r.getRowLength();
1929       return compareRows(left, loffset, llength, right, roffset, rlength);
1930     }
1931   }
1932 
1933   /**
1934    * Compare KeyValues.  When we compare KeyValues, we only compare the Key
1935    * portion.  This means two KeyValues with same Key but different Values are
1936    * considered the same as far as this Comparator is concerned.
1937    */
1938   public static class KVComparator implements RawComparator<Cell>, SamePrefixComparator<byte[]> {
1939 
1940     /**
1941      * The HFileV2 file format's trailer contains this class name.  We reinterpret this and
1942      * instantiate the appropriate comparator.
1943      * TODO: With V3 consider removing this.
1944      * @return legacy class name for FileFileTrailer#comparatorClassName
1945      */
1946     public String getLegacyKeyComparatorName() {
1947       return "org.apache.hadoop.hbase.KeyValue$KeyComparator";
1948     }
1949 
1950     @Override // RawComparator
1951     public int compare(byte[] l, int loff, int llen, byte[] r, int roff, int rlen) {
1952       return compareFlatKey(l,loff,llen, r,roff,rlen);
1953     }
1954 
1955     
1956     /**
1957      * Compares the only the user specified portion of a Key.  This is overridden by MetaComparator.
1958      * @param left
1959      * @param right
1960      * @return 0 if equal, <0 if left smaller, >0 if right smaller
1961      */
1962     protected int compareRowKey(final Cell left, final Cell right) {
1963       return Bytes.compareTo(
1964           left.getRowArray(),  left.getRowOffset(),  left.getRowLength(),
1965           right.getRowArray(), right.getRowOffset(), right.getRowLength());
1966     }
1967 
1968     /**
1969      * Compares left to right assuming that left,loffset,llength and right,roffset,rlength are
1970      * full KVs laid out in a flat byte[]s.
1971      * @param left
1972      * @param loffset
1973      * @param llength
1974      * @param right
1975      * @param roffset
1976      * @param rlength
1977      * @return  0 if equal, <0 if left smaller, >0 if right smaller
1978      */
1979     public int compareFlatKey(byte[] left, int loffset, int llength,
1980         byte[] right, int roffset, int rlength) {
1981       // Compare row
1982       short lrowlength = Bytes.toShort(left, loffset);
1983       short rrowlength = Bytes.toShort(right, roffset);
1984       int compare = compareRows(left, loffset + Bytes.SIZEOF_SHORT,
1985           lrowlength, right, roffset + Bytes.SIZEOF_SHORT, rrowlength);
1986       if (compare != 0) {
1987         return compare;
1988       }
1989 
1990       // Compare the rest of the two KVs without making any assumptions about
1991       // the common prefix. This function will not compare rows anyway, so we
1992       // don't need to tell it that the common prefix includes the row.
1993       return compareWithoutRow(0, left, loffset, llength, right, roffset,
1994           rlength, rrowlength);
1995     }
1996 
1997     public int compareFlatKey(byte[] left, byte[] right) {
1998       return compareFlatKey(left, 0, left.length, right, 0, right.length);
1999     }
2000 
2001     /**
2002      * Compares the Key of a cell -- with fields being more significant in this order:
2003      * rowkey, colfam/qual, timestamp, type, mvcc
2004      */
2005     public int compare(final Cell left, final Cell right) {
2006       // compare row
2007       int compare = compareRowKey(left, right);
2008       if (compare != 0) {
2009         return compare;
2010       }
2011 
2012       // compare vs minimum
2013       byte ltype = left.getTypeByte();
2014       byte rtype = right.getTypeByte();
2015       // If the column is not specified, the "minimum" key type appears the
2016       // latest in the sorted order, regardless of the timestamp. This is used
2017       // for specifying the last key/value in a given row, because there is no
2018       // "lexicographically last column" (it would be infinitely long). The
2019       // "maximum" key type does not need this behavior.
2020       int lcfqLen = left.getFamilyLength() + left.getQualifierLength() ;
2021       int rcfqLen = right.getFamilyLength() + right.getQualifierLength() ;
2022       if (lcfqLen == 0 && ltype == Type.Minimum.getCode()) {
2023         // left is "bigger", i.e. it appears later in the sorted order
2024         return 1;
2025       }
2026       if (rcfqLen == 0 && rtype == Type.Minimum.getCode()) {
2027         return -1;
2028       }
2029 
2030 
2031       // compare col family / col fam + qual
2032       // If left family size is not equal to right family size, we need not
2033       // compare the qualifiers.
2034       compare = Bytes.compareTo(
2035         left.getFamilyArray(),  left.getFamilyOffset(),  left.getFamilyLength(),
2036         right.getFamilyArray(), right.getFamilyOffset(), right.getFamilyLength());
2037       if (compare != 0) {
2038         return compare;
2039       }
2040 
2041       // Compare qualifier
2042       compare = Bytes.compareTo(
2043           left.getQualifierArray(), left.getQualifierOffset(), left.getQualifierLength(),
2044           right.getQualifierArray(), right.getQualifierOffset(), right.getQualifierLength());
2045       if (compare!= 0) {
2046         return compare;
2047       }
2048 
2049       // compare timestamp
2050       long ltimestamp = left.getTimestamp();
2051       long rtimestamp = right.getTimestamp();
2052       compare = compareTimestamps(ltimestamp, rtimestamp);
2053       if (compare != 0) {
2054         return compare;
2055       }
2056 
2057       // Compare types. Let the delete types sort ahead of puts; i.e. types
2058       // of higher numbers sort before those of lesser numbers. Maximum (255)
2059       // appears ahead of everything, and minimum (0) appears after
2060       // everything.
2061       compare = (0xff & rtype) - (0xff & ltype);
2062       if (compare != 0) {
2063         return compare;
2064       }
2065 
2066       // Negate following comparisons so later edits show up first
2067 
2068       // compare log replay tag value if there is any
2069       // when either keyvalue tagged with log replay sequence number, we need to compare them:
2070       // 1) when both keyvalues have the tag, then use the tag values for comparison
2071       // 2) when one has and the other doesn't have, the one without the log replay tag wins because
2072       // it means the edit isn't from recovery but new one coming from clients during recovery
2073       // 3) when both doesn't have, then skip to the next mvcc comparison
2074       long leftChangeSeqNum = getReplaySeqNum(left);
2075       long RightChangeSeqNum = getReplaySeqNum(right);
2076       if (leftChangeSeqNum != Long.MAX_VALUE || RightChangeSeqNum != Long.MAX_VALUE) {
2077         return Longs.compare(RightChangeSeqNum, leftChangeSeqNum);
2078       }
2079 
2080       // compare Mvcc Version
2081       return Longs.compare(right.getMvccVersion(), left.getMvccVersion());
2082     }
2083     
2084     /**
2085      * Return replay log sequence number for the cell
2086      * @param c
2087      * @return Long.MAX_VALUE if there is no LOG_REPLAY_TAG
2088      */
2089     private long getReplaySeqNum(final Cell c) {
2090       Tag tag = Tag.getTag(c.getTagsArray(), c.getTagsOffset(), c.getTagsLengthUnsigned(),
2091           TagType.LOG_REPLAY_TAG_TYPE);
2092 
2093       if(tag != null) {
2094         return Bytes.toLong(tag.getBuffer(), tag.getTagOffset(), tag.getTagLength());
2095       }
2096       return Long.MAX_VALUE;
2097     }
2098 
2099     public int compareTimestamps(final KeyValue left, final KeyValue right) {
2100       // Compare timestamps
2101       long ltimestamp = left.getTimestamp(left.getKeyLength());
2102       long rtimestamp = right.getTimestamp(right.getKeyLength());
2103       return compareTimestamps(ltimestamp, rtimestamp);
2104     }
2105 
2106     /**
2107      * @param left
2108      * @param right
2109      * @return Result comparing rows.
2110      */
2111     public int compareRows(final KeyValue left, final KeyValue right) {
2112       return compareRows(left.getBuffer(),left.getRowOffset(), left.getRowLength(),
2113       right.getBuffer(), right.getRowOffset(), right.getRowLength());
2114     }
2115 
2116     /**
2117      * Get the b[],o,l for left and right rowkey portions and compare.
2118      * @param left
2119      * @param loffset
2120      * @param llength
2121      * @param right
2122      * @param roffset
2123      * @param rlength
2124      * @return 0 if equal, <0 if left smaller, >0 if right smaller
2125      */
2126     public int compareRows(byte [] left, int loffset, int llength,
2127         byte [] right, int roffset, int rlength) {
2128       return Bytes.compareTo(left, loffset, llength, right, roffset, rlength);
2129     }
2130 
2131     int compareColumns(final KeyValue left, final short lrowlength,
2132         final KeyValue right, final short rrowlength) {
2133       int lfoffset = left.getFamilyOffset(lrowlength);
2134       int rfoffset = right.getFamilyOffset(rrowlength);
2135       int lclength = left.getTotalColumnLength(lrowlength,lfoffset);
2136       int rclength = right.getTotalColumnLength(rrowlength, rfoffset);
2137       int lfamilylength = left.getFamilyLength(lfoffset);
2138       int rfamilylength = right.getFamilyLength(rfoffset);
2139       return compareColumns(left.getBuffer(), lfoffset,
2140           lclength, lfamilylength,
2141         right.getBuffer(), rfoffset, rclength, rfamilylength);
2142     }
2143 
2144     protected int compareColumns(
2145         byte [] left, int loffset, int llength, final int lfamilylength,
2146         byte [] right, int roffset, int rlength, final int rfamilylength) {
2147       // Compare family portion first.
2148       int diff = Bytes.compareTo(left, loffset, lfamilylength,
2149         right, roffset, rfamilylength);
2150       if (diff != 0) {
2151         return diff;
2152       }
2153       // Compare qualifier portion
2154       return Bytes.compareTo(left, loffset + lfamilylength,
2155         llength - lfamilylength,
2156         right, roffset + rfamilylength, rlength - rfamilylength);
2157       }
2158 
2159     static int compareTimestamps(final long ltimestamp, final long rtimestamp) {
2160       // The below older timestamps sorting ahead of newer timestamps looks
2161       // wrong but it is intentional. This way, newer timestamps are first
2162       // found when we iterate over a memstore and newer versions are the
2163       // first we trip over when reading from a store file.
2164       if (ltimestamp < rtimestamp) {
2165         return 1;
2166       } else if (ltimestamp > rtimestamp) {
2167         return -1;
2168       }
2169       return 0;
2170     }
2171 
2172     /**
2173      * Overridden
2174      * @param commonPrefix
2175      * @param left
2176      * @param loffset
2177      * @param llength
2178      * @param right
2179      * @param roffset
2180      * @param rlength
2181      * @return 0 if equal, <0 if left smaller, >0 if right smaller
2182      */
2183     @Override // SamePrefixComparator
2184     public int compareIgnoringPrefix(int commonPrefix, byte[] left,
2185         int loffset, int llength, byte[] right, int roffset, int rlength) {
2186       // Compare row
2187       short lrowlength = Bytes.toShort(left, loffset);
2188       short rrowlength;
2189 
2190       int comparisonResult = 0;
2191       if (commonPrefix < ROW_LENGTH_SIZE) {
2192         // almost nothing in common
2193         rrowlength = Bytes.toShort(right, roffset);
2194         comparisonResult = compareRows(left, loffset + ROW_LENGTH_SIZE,
2195             lrowlength, right, roffset + ROW_LENGTH_SIZE, rrowlength);
2196       } else { // the row length is the same
2197         rrowlength = lrowlength;
2198         if (commonPrefix < ROW_LENGTH_SIZE + rrowlength) {
2199           // The rows are not the same. Exclude the common prefix and compare
2200           // the rest of the two rows.
2201           int common = commonPrefix - ROW_LENGTH_SIZE;
2202           comparisonResult = compareRows(
2203               left, loffset + common + ROW_LENGTH_SIZE, lrowlength - common,
2204               right, roffset + common + ROW_LENGTH_SIZE, rrowlength - common);
2205         }
2206       }
2207       if (comparisonResult != 0) {
2208         return comparisonResult;
2209       }
2210 
2211       assert lrowlength == rrowlength;
2212       return compareWithoutRow(commonPrefix, left, loffset, llength, right,
2213           roffset, rlength, lrowlength);
2214     }
2215 
2216     /**
2217      * Compare columnFamily, qualifier, timestamp, and key type (everything
2218      * except the row). This method is used both in the normal comparator and
2219      * the "same-prefix" comparator. Note that we are assuming that row portions
2220      * of both KVs have already been parsed and found identical, and we don't
2221      * validate that assumption here.
2222      * @param commonPrefix
2223      *          the length of the common prefix of the two key-values being
2224      *          compared, including row length and row
2225      */
2226     private int compareWithoutRow(int commonPrefix, byte[] left, int loffset,
2227         int llength, byte[] right, int roffset, int rlength, short rowlength) {
2228       /***
2229        * KeyValue Format and commonLength:
2230        * |_keyLen_|_valLen_|_rowLen_|_rowKey_|_famiLen_|_fami_|_Quali_|....
2231        * ------------------|-------commonLength--------|--------------
2232        */
2233       int commonLength = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE + rowlength;
2234 
2235       // commonLength + TIMESTAMP_TYPE_SIZE
2236       int commonLengthWithTSAndType = TIMESTAMP_TYPE_SIZE + commonLength;
2237       // ColumnFamily + Qualifier length.
2238       int lcolumnlength = llength - commonLengthWithTSAndType;
2239       int rcolumnlength = rlength - commonLengthWithTSAndType;
2240 
2241       byte ltype = left[loffset + (llength - 1)];
2242       byte rtype = right[roffset + (rlength - 1)];
2243 
2244       // If the column is not specified, the "minimum" key type appears the
2245       // latest in the sorted order, regardless of the timestamp. This is used
2246       // for specifying the last key/value in a given row, because there is no
2247       // "lexicographically last column" (it would be infinitely long). The
2248       // "maximum" key type does not need this behavior.
2249       if (lcolumnlength == 0 && ltype == Type.Minimum.getCode()) {
2250         // left is "bigger", i.e. it appears later in the sorted order
2251         return 1;
2252       }
2253       if (rcolumnlength == 0 && rtype == Type.Minimum.getCode()) {
2254         return -1;
2255       }
2256 
2257       int lfamilyoffset = commonLength + loffset;
2258       int rfamilyoffset = commonLength + roffset;
2259 
2260       // Column family length.
2261       int lfamilylength = left[lfamilyoffset - 1];
2262       int rfamilylength = right[rfamilyoffset - 1];
2263       // If left family size is not equal to right family size, we need not
2264       // compare the qualifiers.
2265       boolean sameFamilySize = (lfamilylength == rfamilylength);
2266       int common = 0;
2267       if (commonPrefix > 0) {
2268         common = Math.max(0, commonPrefix - commonLength);
2269         if (!sameFamilySize) {
2270           // Common should not be larger than Math.min(lfamilylength,
2271           // rfamilylength).
2272           common = Math.min(common, Math.min(lfamilylength, rfamilylength));
2273         } else {
2274           common = Math.min(common, Math.min(lcolumnlength, rcolumnlength));
2275         }
2276       }
2277       if (!sameFamilySize) {
2278         // comparing column family is enough.
2279         return Bytes.compareTo(left, lfamilyoffset + common, lfamilylength
2280             - common, right, rfamilyoffset + common, rfamilylength - common);
2281       }
2282       // Compare family & qualifier together.
2283       final int comparison = Bytes.compareTo(left, lfamilyoffset + common,
2284           lcolumnlength - common, right, rfamilyoffset + common,
2285           rcolumnlength - common);
2286       if (comparison != 0) {
2287         return comparison;
2288       }
2289 
2290       ////
2291       // Next compare timestamps.
2292       long ltimestamp = Bytes.toLong(left,
2293           loffset + (llength - TIMESTAMP_TYPE_SIZE));
2294       long rtimestamp = Bytes.toLong(right,
2295           roffset + (rlength - TIMESTAMP_TYPE_SIZE));
2296       int compare = compareTimestamps(ltimestamp, rtimestamp);
2297       if (compare != 0) {
2298         return compare;
2299       }
2300 
2301       // Compare types. Let the delete types sort ahead of puts; i.e. types
2302       // of higher numbers sort before those of lesser numbers. Maximum (255)
2303       // appears ahead of everything, and minimum (0) appears after
2304       // everything.
2305       return (0xff & rtype) - (0xff & ltype);
2306     }
2307 
2308     /**
2309      * Compares the row and column of two keyvalues for equality
2310      * @param left
2311      * @param right
2312      * @return True if same row and column.
2313      */
2314     public boolean matchingRowColumn(final KeyValue left,
2315         final KeyValue right) {
2316       short lrowlength = left.getRowLength();
2317       short rrowlength = right.getRowLength();
2318 
2319       // TsOffset = end of column data. just comparing Row+CF length of each
2320       if ((left.getTimestampOffset() - left.getOffset()) !=
2321           (right.getTimestampOffset() - right.getOffset())) {
2322         return false;
2323       }
2324 
2325       if (!matchingRows(left, lrowlength, right, rrowlength)) {
2326         return false;
2327       }
2328 
2329       int lfoffset = left.getFamilyOffset(lrowlength);
2330       int rfoffset = right.getFamilyOffset(rrowlength);
2331       int lclength = left.getTotalColumnLength(lrowlength,lfoffset);
2332       int rclength = right.getTotalColumnLength(rrowlength, rfoffset);
2333       int lfamilylength = left.getFamilyLength(lfoffset);
2334       int rfamilylength = right.getFamilyLength(rfoffset);
2335       int ccRes = compareColumns(left.getBuffer(), lfoffset, lclength, lfamilylength,
2336           right.getBuffer(), rfoffset, rclength, rfamilylength);
2337       return ccRes == 0;
2338     }
2339 
2340     /**
2341      * Compares the row of two keyvalues for equality
2342      * @param left
2343      * @param right
2344      * @return True if rows match.
2345      */
2346     public boolean matchingRows(final KeyValue left, final KeyValue right) {
2347       short lrowlength = left.getRowLength();
2348       short rrowlength = right.getRowLength();
2349       return matchingRows(left, lrowlength, right, rrowlength);
2350     }
2351 
2352     /**
2353      * @param left
2354      * @param lrowlength
2355      * @param right
2356      * @param rrowlength
2357      * @return True if rows match.
2358      */
2359     private boolean matchingRows(final KeyValue left, final short lrowlength,
2360         final KeyValue right, final short rrowlength) {
2361       return lrowlength == rrowlength &&
2362           matchingRows(left.getBuffer(), left.getRowOffset(), lrowlength,
2363               right.getBuffer(), right.getRowOffset(), rrowlength);
2364     }
2365 
2366     /**
2367      * Compare rows. Just calls Bytes.equals, but it's good to have this encapsulated.
2368      * @param left Left row array.
2369      * @param loffset Left row offset.
2370      * @param llength Left row length.
2371      * @param right Right row array.
2372      * @param roffset Right row offset.
2373      * @param rlength Right row length.
2374      * @return Whether rows are the same row.
2375      */
2376     public boolean matchingRows(final byte [] left, final int loffset, final int llength,
2377         final byte [] right, final int roffset, final int rlength) {
2378       return Bytes.equals(left, loffset, llength, right, roffset, rlength);
2379     }
2380 
2381     public byte[] calcIndexKey(byte[] lastKeyOfPreviousBlock, byte[] firstKeyInBlock) {
2382       byte[] fakeKey = getShortMidpointKey(lastKeyOfPreviousBlock, firstKeyInBlock);
2383       if (compareFlatKey(fakeKey, firstKeyInBlock) > 0) {
2384         LOG.error("Unexpected getShortMidpointKey result, fakeKey:"
2385             + Bytes.toStringBinary(fakeKey) + ", firstKeyInBlock:"
2386             + Bytes.toStringBinary(firstKeyInBlock));
2387         return firstKeyInBlock;
2388       }
2389       if (lastKeyOfPreviousBlock != null && compareFlatKey(lastKeyOfPreviousBlock, fakeKey) >= 0) {
2390         LOG.error("Unexpected getShortMidpointKey result, lastKeyOfPreviousBlock:" +
2391             Bytes.toStringBinary(lastKeyOfPreviousBlock) + ", fakeKey:" +
2392             Bytes.toStringBinary(fakeKey));
2393         return firstKeyInBlock;
2394       }
2395       return fakeKey;
2396     }
2397 
2398     /**
2399      * This is a HFile block index key optimization.
2400      * @param leftKey
2401      * @param rightKey
2402      * @return 0 if equal, <0 if left smaller, >0 if right smaller
2403      */
2404     public byte[] getShortMidpointKey(final byte[] leftKey, final byte[] rightKey) {
2405       if (rightKey == null) {
2406         throw new IllegalArgumentException("rightKey can not be null");
2407       }
2408       if (leftKey == null) {
2409         return Arrays.copyOf(rightKey, rightKey.length);
2410       }
2411       if (compareFlatKey(leftKey, rightKey) >= 0) {
2412         throw new IllegalArgumentException("Unexpected input, leftKey:" + Bytes.toString(leftKey)
2413           + ", rightKey:" + Bytes.toString(rightKey));
2414       }
2415 
2416       short leftRowLength = Bytes.toShort(leftKey, 0);
2417       short rightRowLength = Bytes.toShort(rightKey, 0);
2418       int leftCommonLength = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE + leftRowLength;
2419       int rightCommonLength = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE + rightRowLength;
2420       int leftCommonLengthWithTSAndType = TIMESTAMP_TYPE_SIZE + leftCommonLength;
2421       int rightCommonLengthWithTSAndType = TIMESTAMP_TYPE_SIZE + rightCommonLength;
2422       int leftColumnLength = leftKey.length - leftCommonLengthWithTSAndType;
2423       int rightColumnLength = rightKey.length - rightCommonLengthWithTSAndType;
2424       // rows are equal
2425       if (leftRowLength == rightRowLength && compareRows(leftKey, ROW_LENGTH_SIZE, leftRowLength,
2426         rightKey, ROW_LENGTH_SIZE, rightRowLength) == 0) {
2427         // Compare family & qualifier together.
2428         int comparison = Bytes.compareTo(leftKey, leftCommonLength, leftColumnLength, rightKey,
2429           rightCommonLength, rightColumnLength);
2430         // same with "row + family + qualifier", return rightKey directly
2431         if (comparison == 0) {
2432           return Arrays.copyOf(rightKey, rightKey.length);
2433         }
2434         // "family + qualifier" are different, generate a faked key per rightKey
2435         byte[] newKey = Arrays.copyOf(rightKey, rightKey.length);
2436         Bytes.putLong(newKey, rightKey.length - TIMESTAMP_TYPE_SIZE, HConstants.LATEST_TIMESTAMP);
2437         Bytes.putByte(newKey, rightKey.length - TYPE_SIZE, Type.Maximum.getCode());
2438         return newKey;
2439       }
2440       // rows are different
2441       short minLength = leftRowLength < rightRowLength ? leftRowLength : rightRowLength;
2442       short diffIdx = 0;
2443       while (diffIdx < minLength
2444           && leftKey[ROW_LENGTH_SIZE + diffIdx] == rightKey[ROW_LENGTH_SIZE + diffIdx]) {
2445         diffIdx++;
2446       }
2447       byte[] newRowKey = null;
2448       if (diffIdx >= minLength) {
2449         // leftKey's row is prefix of rightKey's.
2450         newRowKey = new byte[diffIdx + 1];
2451         System.arraycopy(rightKey, ROW_LENGTH_SIZE, newRowKey, 0, diffIdx + 1);
2452       } else {
2453         int diffByte = leftKey[ROW_LENGTH_SIZE + diffIdx];
2454         if ((0xff & diffByte) < 0xff && (diffByte + 1) <
2455             (rightKey[ROW_LENGTH_SIZE + diffIdx] & 0xff)) {
2456           newRowKey = new byte[diffIdx + 1];
2457           System.arraycopy(leftKey, ROW_LENGTH_SIZE, newRowKey, 0, diffIdx);
2458           newRowKey[diffIdx] = (byte) (diffByte + 1);
2459         } else {
2460           newRowKey = new byte[diffIdx + 1];
2461           System.arraycopy(rightKey, ROW_LENGTH_SIZE, newRowKey, 0, diffIdx + 1);
2462         }
2463       }
2464       return new KeyValue(newRowKey, null, null, HConstants.LATEST_TIMESTAMP,
2465         Type.Maximum).getKey();
2466     }
2467 
2468     @Override
2469     protected Object clone() throws CloneNotSupportedException {
2470       return new KVComparator();
2471     }
2472 
2473   }
2474 
2475   /**
2476    * Creates a KeyValue that is last on the specified row id. That is,
2477    * every other possible KeyValue for the given row would compareTo()
2478    * less than the result of this call.
2479    * @param row row key
2480    * @return Last possible KeyValue on passed <code>row</code>
2481    */
2482   public static KeyValue createLastOnRow(final byte[] row) {
2483     return new KeyValue(row, null, null, HConstants.LATEST_TIMESTAMP, Type.Minimum);
2484   }
2485 
2486   /**
2487    * Create a KeyValue that is smaller than all other possible KeyValues
2488    * for the given row. That is any (valid) KeyValue on 'row' would sort
2489    * _after_ the result.
2490    *
2491    * @param row - row key (arbitrary byte array)
2492    * @return First possible KeyValue on passed <code>row</code>
2493    */
2494   public static KeyValue createFirstOnRow(final byte [] row) {
2495     return createFirstOnRow(row, HConstants.LATEST_TIMESTAMP);
2496   }
2497 
2498   /**
2499    * Create a KeyValue that is smaller than all other possible KeyValues
2500    * for the given row. That is any (valid) KeyValue on 'row' would sort
2501    * _after_ the result.
2502    *
2503    * @param row - row key (arbitrary byte array)
2504    * @return First possible KeyValue on passed <code>row</code>
2505    */
2506   public static KeyValue createFirstOnRow(final byte [] row, int roffset, short rlength) {
2507     return new KeyValue(row, roffset, rlength,
2508         null, 0, 0, null, 0, 0, HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0);
2509   }
2510 
2511   /**
2512    * Creates a KeyValue that is smaller than all other KeyValues that
2513    * are older than the passed timestamp.
2514    * @param row - row key (arbitrary byte array)
2515    * @param ts - timestamp
2516    * @return First possible key on passed <code>row</code> and timestamp.
2517    */
2518   public static KeyValue createFirstOnRow(final byte [] row,
2519       final long ts) {
2520     return new KeyValue(row, null, null, ts, Type.Maximum);
2521   }
2522 
2523   /**
2524    * Create a KeyValue for the specified row, family and qualifier that would be
2525    * smaller than all other possible KeyValues that have the same row,family,qualifier.
2526    * Used for seeking.
2527    * @param row - row key (arbitrary byte array)
2528    * @param family - family name
2529    * @param qualifier - column qualifier
2530    * @return First possible key on passed <code>row</code>, and column.
2531    */
2532   public static KeyValue createFirstOnRow(final byte [] row, final byte [] family,
2533       final byte [] qualifier) {
2534     return new KeyValue(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum);
2535   }
2536 
2537   /**
2538    * Create a Delete Family KeyValue for the specified row and family that would
2539    * be smaller than all other possible Delete Family KeyValues that have the
2540    * same row and family.
2541    * Used for seeking.
2542    * @param row - row key (arbitrary byte array)
2543    * @param family - family name
2544    * @return First Delete Family possible key on passed <code>row</code>.
2545    */
2546   public static KeyValue createFirstDeleteFamilyOnRow(final byte [] row,
2547       final byte [] family) {
2548     return new KeyValue(row, family, null, HConstants.LATEST_TIMESTAMP,
2549         Type.DeleteFamily);
2550   }
2551 
2552   /**
2553    * @param row - row key (arbitrary byte array)
2554    * @param f - family name
2555    * @param q - column qualifier
2556    * @param ts - timestamp
2557    * @return First possible key on passed <code>row</code>, column and timestamp
2558    */
2559   public static KeyValue createFirstOnRow(final byte [] row, final byte [] f,
2560       final byte [] q, final long ts) {
2561     return new KeyValue(row, f, q, ts, Type.Maximum);
2562   }
2563 
2564   /**
2565    * Create a KeyValue for the specified row, family and qualifier that would be
2566    * smaller than all other possible KeyValues that have the same row,
2567    * family, qualifier.
2568    * Used for seeking.
2569    * @param row row key
2570    * @param roffset row offset
2571    * @param rlength row length
2572    * @param family family name
2573    * @param foffset family offset
2574    * @param flength family length
2575    * @param qualifier column qualifier
2576    * @param qoffset qualifier offset
2577    * @param qlength qualifier length
2578    * @return First possible key on passed Row, Family, Qualifier.
2579    */
2580   public static KeyValue createFirstOnRow(final byte [] row,
2581       final int roffset, final int rlength, final byte [] family,
2582       final int foffset, final int flength, final byte [] qualifier,
2583       final int qoffset, final int qlength) {
2584     return new KeyValue(row, roffset, rlength, family,
2585         foffset, flength, qualifier, qoffset, qlength,
2586         HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0);
2587   }
2588 
2589   /**
2590    * Create a KeyValue for the specified row, family and qualifier that would be
2591    * smaller than all other possible KeyValues that have the same row,
2592    * family, qualifier.
2593    * Used for seeking.
2594    *
2595    * @param buffer the buffer to use for the new <code>KeyValue</code> object
2596    * @param row the value key
2597    * @param family family name
2598    * @param qualifier column qualifier
2599    *
2600    * @return First possible key on passed Row, Family, Qualifier.
2601    *
2602    * @throws IllegalArgumentException The resulting <code>KeyValue</code> object would be larger
2603    * than the provided buffer or than <code>Integer.MAX_VALUE</code>
2604    */
2605   public static KeyValue createFirstOnRow(byte [] buffer, final byte [] row,
2606       final byte [] family, final byte [] qualifier)
2607           throws IllegalArgumentException {
2608 
2609     return createFirstOnRow(buffer, 0, row, 0, row.length,
2610         family, 0, family.length,
2611         qualifier, 0, qualifier.length);
2612   }
2613 
2614   /**
2615    * Create a KeyValue for the specified row, family and qualifier that would be
2616    * smaller than all other possible KeyValues that have the same row,
2617    * family, qualifier.
2618    * Used for seeking.
2619    *
2620    * @param buffer the buffer to use for the new <code>KeyValue</code> object
2621    * @param boffset buffer offset
2622    * @param row the value key
2623    * @param roffset row offset
2624    * @param rlength row length
2625    * @param family family name
2626    * @param foffset family offset
2627    * @param flength family length
2628    * @param qualifier column qualifier
2629    * @param qoffset qualifier offset
2630    * @param qlength qualifier length
2631    *
2632    * @return First possible key on passed Row, Family, Qualifier.
2633    *
2634    * @throws IllegalArgumentException The resulting <code>KeyValue</code> object would be larger
2635    * than the provided buffer or than <code>Integer.MAX_VALUE</code>
2636    */
2637   public static KeyValue createFirstOnRow(byte [] buffer, final int boffset,
2638       final byte [] row, final int roffset, final int rlength,
2639       final byte [] family, final int foffset, final int flength,
2640       final byte [] qualifier, final int qoffset, final int qlength)
2641           throws IllegalArgumentException {
2642 
2643     long lLength = getKeyValueDataStructureSize(rlength, flength, qlength, 0);
2644 
2645     if (lLength > Integer.MAX_VALUE) {
2646       throw new IllegalArgumentException("KeyValue length " + lLength + " > " + Integer.MAX_VALUE);
2647     }
2648     int iLength = (int) lLength;
2649     if (buffer.length - boffset < iLength) {
2650       throw new IllegalArgumentException("Buffer size " + (buffer.length - boffset) + " < " +
2651           iLength);
2652     }
2653 
2654     int len = writeByteArray(buffer, boffset, row, roffset, rlength, family, foffset, flength,
2655         qualifier, qoffset, qlength, HConstants.LATEST_TIMESTAMP, KeyValue.Type.Maximum,
2656         null, 0, 0, null);
2657     return new KeyValue(buffer, boffset, len);
2658   }
2659 
2660   /**
2661    * Create a KeyValue for the specified row, family and qualifier that would be
2662    * larger than or equal to all other possible KeyValues that have the same
2663    * row, family, qualifier.
2664    * Used for reseeking.
2665    * @param row row key
2666    * @param roffset row offset
2667    * @param rlength row length
2668    * @param family family name
2669    * @param foffset family offset
2670    * @param flength family length
2671    * @param qualifier column qualifier
2672    * @param qoffset qualifier offset
2673    * @param qlength qualifier length
2674    * @return Last possible key on passed row, family, qualifier.
2675    */
2676   public static KeyValue createLastOnRow(final byte [] row,
2677       final int roffset, final int rlength, final byte [] family,
2678       final int foffset, final int flength, final byte [] qualifier,
2679       final int qoffset, final int qlength) {
2680     return new KeyValue(row, roffset, rlength, family,
2681         foffset, flength, qualifier, qoffset, qlength,
2682         HConstants.OLDEST_TIMESTAMP, Type.Minimum, null, 0, 0);
2683   }
2684 
2685   /**
2686    * Similar to {@link #createLastOnRow(byte[], int, int, byte[], int, int,
2687    * byte[], int, int)} but creates the last key on the row/column of this KV
2688    * (the value part of the returned KV is always empty). Used in creating
2689    * "fake keys" for the multi-column Bloom filter optimization to skip the
2690    * row/column we already know is not in the file.
2691    * @return the last key on the row/column of the given key-value pair
2692    */
2693   public KeyValue createLastOnRowCol() {
2694     return new KeyValue(
2695         bytes, getRowOffset(), getRowLength(),
2696         bytes, getFamilyOffset(), getFamilyLength(),
2697         bytes, getQualifierOffset(), getQualifierLength(),
2698         HConstants.OLDEST_TIMESTAMP, Type.Minimum, null, 0, 0);
2699   }
2700 
2701   /**
2702    * Creates the first KV with the row/family/qualifier of this KV and the
2703    * given timestamp. Uses the "maximum" KV type that guarantees that the new
2704    * KV is the lowest possible for this combination of row, family, qualifier,
2705    * and timestamp. This KV's own timestamp is ignored. While this function
2706    * copies the value from this KV, it is normally used on key-only KVs.
2707    */
2708   public KeyValue createFirstOnRowColTS(long ts) {
2709     return new KeyValue(
2710         bytes, getRowOffset(), getRowLength(),
2711         bytes, getFamilyOffset(), getFamilyLength(),
2712         bytes, getQualifierOffset(), getQualifierLength(),
2713         ts, Type.Maximum, bytes, getValueOffset(), getValueLength());
2714   }
2715 
2716   /**
2717    * @param b
2718    * @return A KeyValue made of a byte array that holds the key-only part.
2719    * Needed to convert hfile index members to KeyValues.
2720    */
2721   public static KeyValue createKeyValueFromKey(final byte [] b) {
2722     return createKeyValueFromKey(b, 0, b.length);
2723   }
2724 
2725   /**
2726    * @param bb
2727    * @return A KeyValue made of a byte buffer that holds the key-only part.
2728    * Needed to convert hfile index members to KeyValues.
2729    */
2730   public static KeyValue createKeyValueFromKey(final ByteBuffer bb) {
2731     return createKeyValueFromKey(bb.array(), bb.arrayOffset(), bb.limit());
2732   }
2733 
2734   /**
2735    * @param b
2736    * @param o
2737    * @param l
2738    * @return A KeyValue made of a byte array that holds the key-only part.
2739    * Needed to convert hfile index members to KeyValues.
2740    */
2741   public static KeyValue createKeyValueFromKey(final byte [] b, final int o,
2742       final int l) {
2743     byte [] newb = new byte[l + ROW_OFFSET];
2744     System.arraycopy(b, o, newb, ROW_OFFSET, l);
2745     Bytes.putInt(newb, 0, l);
2746     Bytes.putInt(newb, Bytes.SIZEOF_INT, 0);
2747     return new KeyValue(newb);
2748   }
2749 
2750   /**
2751    * @param in Where to read bytes from.  Creates a byte array to hold the KeyValue
2752    * backing bytes copied from the steam.
2753    * @return KeyValue created by deserializing from <code>in</code> OR if we find a length
2754    * of zero, we will return null which can be useful marking a stream as done.
2755    * @throws IOException
2756    */
2757   public static KeyValue create(final DataInput in) throws IOException {
2758     return create(in.readInt(), in);
2759   }
2760 
2761   /**
2762    * Create a KeyValue reading <code>length</code> from <code>in</code>
2763    * @param length
2764    * @param in
2765    * @return Created KeyValue OR if we find a length of zero, we will return null which
2766    * can be useful marking a stream as done.
2767    * @throws IOException
2768    */
2769   public static KeyValue create(int length, final DataInput in) throws IOException {
2770 
2771     if (length <= 0) {
2772       if (length == 0) return null;
2773       throw new IOException("Failed read " + length + " bytes, stream corrupt?");
2774     }
2775 
2776     // This is how the old Writables.readFrom used to deserialize.  Didn't even vint.
2777     byte [] bytes = new byte[length];
2778     in.readFully(bytes);
2779     return new KeyValue(bytes, 0, length);
2780   }
2781   
2782   /**
2783    * Create a new KeyValue by copying existing cell and adding new tags
2784    * @param c
2785    * @param newTags
2786    * @return a new KeyValue instance with new tags
2787    */
2788   public static KeyValue cloneAndAddTags(Cell c, List<Tag> newTags) {
2789     List<Tag> existingTags = null;
2790     if(c.getTagsLengthUnsigned() > 0) {
2791       existingTags = Tag.asList(c.getTagsArray(), c.getTagsOffset(), c.getTagsLengthUnsigned());
2792       existingTags.addAll(newTags);
2793     } else {
2794       existingTags = newTags;
2795     }
2796     return new KeyValue(c.getRowArray(), c.getRowOffset(), (int)c.getRowLength(),
2797       c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(), 
2798       c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(), 
2799       c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(), 
2800       c.getValueLength(), existingTags);
2801   }
2802 
2803   /**
2804    * Create a KeyValue reading from the raw InputStream.
2805    * Named <code>iscreate</code> so doesn't clash with {@link #create(DataInput)}
2806    * @param in
2807    * @return Created KeyValue OR if we find a length of zero, we will return null which
2808    * can be useful marking a stream as done.
2809    * @throws IOException
2810    */
2811   public static KeyValue iscreate(final InputStream in) throws IOException {
2812     byte [] intBytes = new byte[Bytes.SIZEOF_INT];
2813     int bytesRead = 0;
2814     while (bytesRead < intBytes.length) {
2815       int n = in.read(intBytes, bytesRead, intBytes.length - bytesRead);
2816       if (n < 0) {
2817         if (bytesRead == 0) return null; // EOF at start is ok
2818         throw new IOException("Failed read of int, read " + bytesRead + " bytes");
2819       }
2820       bytesRead += n;
2821     }
2822     // TODO: perhaps some sanity check is needed here.
2823     byte [] bytes = new byte[Bytes.toInt(intBytes)];
2824     IOUtils.readFully(in, bytes, 0, bytes.length);
2825     return new KeyValue(bytes, 0, bytes.length);
2826   }
2827 
2828   /**
2829    * Write out a KeyValue in the manner in which we used to when KeyValue was a Writable.
2830    * @param kv
2831    * @param out
2832    * @return Length written on stream
2833    * @throws IOException
2834    * @see #create(DataInput) for the inverse function
2835    */
2836   public static long write(final KeyValue kv, final DataOutput out) throws IOException {
2837     // This is how the old Writables write used to serialize KVs.  Need to figure way to make it
2838     // work for all implementations.
2839     int length = kv.getLength();
2840     out.writeInt(length);
2841     out.write(kv.getBuffer(), kv.getOffset(), length);
2842     return length + Bytes.SIZEOF_INT;
2843   }
2844 
2845   /**
2846    * Write out a KeyValue in the manner in which we used to when KeyValue was a Writable but do
2847    * not require a {@link DataOutput}, just take plain {@link OutputStream}
2848    * Named <code>oswrite</code> so does not clash with {@link #write(KeyValue, DataOutput)}
2849    * @param kv
2850    * @param out
2851    * @return Length written on stream
2852    * @throws IOException
2853    * @see #create(DataInput) for the inverse function
2854    * @see #write(KeyValue, DataOutput)
2855    */
2856   @Deprecated
2857   public static long oswrite(final KeyValue kv, final OutputStream out)
2858       throws IOException {
2859     int length = kv.getLength();
2860     // This does same as DataOuput#writeInt (big-endian, etc.)
2861     out.write(Bytes.toBytes(length));
2862     out.write(kv.getBuffer(), kv.getOffset(), length);
2863     return length + Bytes.SIZEOF_INT;
2864   }
2865 
2866   /**
2867    * Write out a KeyValue in the manner in which we used to when KeyValue was a Writable but do
2868    * not require a {@link DataOutput}, just take plain {@link OutputStream}
2869    * Named <code>oswrite</code> so does not clash with {@link #write(KeyValue, DataOutput)}
2870    * @param kv
2871    * @param out
2872    * @param withTags
2873    * @return Length written on stream
2874    * @throws IOException
2875    * @see #create(DataInput) for the inverse function
2876    * @see #write(KeyValue, DataOutput)
2877    */
2878   public static long oswrite(final KeyValue kv, final OutputStream out, final boolean withTags)
2879       throws IOException {
2880     int length = kv.getLength();
2881     if (!withTags) {
2882       length = kv.getKeyLength() + kv.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE;
2883     }
2884     // This does same as DataOuput#writeInt (big-endian, etc.)
2885     out.write(Bytes.toBytes(length));
2886     out.write(kv.getBuffer(), kv.getOffset(), length);
2887     return length + Bytes.SIZEOF_INT;
2888   }
2889 
2890   /**
2891    * Comparator that compares row component only of a KeyValue.
2892    */
2893   public static class RowOnlyComparator implements Comparator<KeyValue> {
2894     final KVComparator comparator;
2895 
2896     public RowOnlyComparator(final KVComparator c) {
2897       this.comparator = c;
2898     }
2899 
2900     public int compare(KeyValue left, KeyValue right) {
2901       return comparator.compareRows(left, right);
2902     }
2903   }
2904 
2905 
2906   /**
2907    * Avoids redundant comparisons for better performance.
2908    * 
2909    * TODO get rid of this wart
2910    */
2911   public interface SamePrefixComparator<T> {
2912     /**
2913      * Compare two keys assuming that the first n bytes are the same.
2914      * @param commonPrefix How many bytes are the same.
2915      */
2916     int compareIgnoringPrefix(
2917       int commonPrefix, byte[] left, int loffset, int llength, byte[] right, int roffset, int rlength
2918     );
2919   }
2920 
2921   /**
2922    * This is a TEST only Comparator used in TestSeekTo and TestReseekTo.
2923    */
2924   public static class RawBytesComparator extends KVComparator {
2925     /**
2926      * The HFileV2 file format's trailer contains this class name.  We reinterpret this and
2927      * instantiate the appropriate comparator.
2928      * TODO: With V3 consider removing this.
2929      * @return legacy class name for FileFileTrailer#comparatorClassName
2930      */
2931     public String getLegacyKeyComparatorName() {
2932       return "org.apache.hadoop.hbase.util.Bytes$ByteArrayComparator";
2933     }
2934 
2935     public int compareFlatKey(byte[] left, int loffset, int llength, byte[] right,
2936         int roffset, int rlength) {
2937       return Bytes.BYTES_RAWCOMPARATOR.compare(left,  loffset, llength, right, roffset, rlength);
2938     }
2939 
2940     public byte[] calcIndexKey(byte[] lastKeyOfPreviousBlock, byte[] firstKeyInBlock) {
2941       return firstKeyInBlock;
2942     }
2943 
2944   }
2945 
2946   /**
2947    * HeapSize implementation
2948    *
2949    * We do not count the bytes in the rowCache because it should be empty for a KeyValue in the
2950    * MemStore.
2951    */
2952   @Override
2953   public long heapSize() {
2954     int sum = 0;
2955     sum += ClassSize.OBJECT;// the KeyValue object itself
2956     sum += ClassSize.REFERENCE;// pointer to "bytes"
2957     sum += ClassSize.align(ClassSize.ARRAY);// "bytes"
2958     sum += ClassSize.align(length);// number of bytes of data in the "bytes" array
2959     sum += 2 * Bytes.SIZEOF_INT;// offset, length
2960     sum += Bytes.SIZEOF_LONG;// memstoreTS
2961     return ClassSize.align(sum);
2962   }
2963 }