View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertTrue;
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.List;
26  
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.Cell;
29  import org.apache.hadoop.hbase.CellScanner;
30  import org.apache.hadoop.hbase.CellUtil;
31  import org.apache.hadoop.hbase.HBaseTestingUtility;
32  import org.apache.hadoop.hbase.HColumnDescriptor;
33  import org.apache.hadoop.hbase.HConstants;
34  import org.apache.hadoop.hbase.HTableDescriptor;
35  import org.apache.hadoop.hbase.KeyValue;
36  import org.apache.hadoop.hbase.KeyValueUtil;
37  import org.apache.hadoop.hbase.MediumTests;
38  import org.apache.hadoop.hbase.TableName;
39  import org.apache.hadoop.hbase.Tag;
40  import org.apache.hadoop.hbase.client.Append;
41  import org.apache.hadoop.hbase.client.Durability;
42  import org.apache.hadoop.hbase.client.HBaseAdmin;
43  import org.apache.hadoop.hbase.client.HTable;
44  import org.apache.hadoop.hbase.client.Increment;
45  import org.apache.hadoop.hbase.client.Mutation;
46  import org.apache.hadoop.hbase.client.Put;
47  import org.apache.hadoop.hbase.client.Result;
48  import org.apache.hadoop.hbase.client.ResultScanner;
49  import org.apache.hadoop.hbase.client.Scan;
50  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
51  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
52  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
53  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
54  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
55  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
56  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
57  import org.apache.hadoop.hbase.util.Bytes;
58  import org.junit.After;
59  import org.junit.AfterClass;
60  import org.junit.BeforeClass;
61  import org.junit.Rule;
62  import org.junit.Test;
63  import org.junit.experimental.categories.Category;
64  import org.junit.rules.TestName;
65  
66  /**
67   * Class that test tags
68   */
69  @Category(MediumTests.class)
70  public class TestTags {
71    static boolean useFilter = false;
72  
73    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
74  
75    @Rule
76    public final TestName TEST_NAME = new TestName();
77  
78    @BeforeClass
79    public static void setUpBeforeClass() throws Exception {
80      Configuration conf = TEST_UTIL.getConfiguration();
81      conf.setInt("hfile.format.version", 3);
82      conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
83          TestCoprocessorForTags.class.getName());
84      TEST_UTIL.startMiniCluster(1, 2);
85    }
86  
87    @AfterClass
88    public static void tearDownAfterClass() throws Exception {
89      TEST_UTIL.shutdownMiniCluster();
90    }
91  
92    @After
93    public void tearDown() {
94      useFilter = false;
95    }
96  
97    @Test
98    public void testTags() throws Exception {
99      HTable table = null;
100     try {
101       TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
102       byte[] fam = Bytes.toBytes("info");
103       byte[] row = Bytes.toBytes("rowa");
104       // column names
105       byte[] qual = Bytes.toBytes("qual");
106 
107       byte[] row1 = Bytes.toBytes("rowb");
108 
109       byte[] row2 = Bytes.toBytes("rowc");
110 
111       HTableDescriptor desc = new HTableDescriptor(tableName);
112       HColumnDescriptor colDesc = new HColumnDescriptor(fam);
113       colDesc.setBlockCacheEnabled(true);
114       // colDesc.setDataBlockEncoding(DataBlockEncoding.NONE);
115       colDesc.setDataBlockEncoding(DataBlockEncoding.PREFIX_TREE);
116       desc.addFamily(colDesc);
117       HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
118       admin.createTable(desc);
119       byte[] value = Bytes.toBytes("value");
120       table = new HTable(TEST_UTIL.getConfiguration(), tableName);
121       Put put = new Put(row);
122       put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value);
123       put.setAttribute("visibility", Bytes.toBytes("myTag"));
124       table.put(put);
125       admin.flush(tableName.getName());
126       List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(tableName.getName());
127       for (HRegion region : regions) {
128         Store store = region.getStore(fam);
129         while (!(store.getStorefilesCount() > 0)) {
130           Thread.sleep(10);
131         }
132       }
133 
134       Put put1 = new Put(row1);
135       byte[] value1 = Bytes.toBytes("1000dfsdf");
136       put1.add(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
137       // put1.setAttribute("visibility", Bytes.toBytes("myTag3"));
138       table.put(put1);
139       admin.flush(tableName.getName());
140       regions = TEST_UTIL.getHBaseCluster().getRegions(tableName.getName());
141       for (HRegion region : regions) {
142         Store store = region.getStore(fam);
143         while (!(store.getStorefilesCount() > 1)) {
144           Thread.sleep(10);
145         }
146       }
147 
148       Put put2 = new Put(row2);
149       byte[] value2 = Bytes.toBytes("1000dfsdf");
150       put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
151       put2.setAttribute("visibility", Bytes.toBytes("myTag3"));
152       table.put(put2);
153 
154       admin.flush(tableName.getName());
155       regions = TEST_UTIL.getHBaseCluster().getRegions(tableName.getName());
156       for (HRegion region : regions) {
157         Store store = region.getStore(fam);
158         while (!(store.getStorefilesCount() > 2)) {
159           Thread.sleep(10);
160         }
161       }
162       result(fam, row, qual, row2, table, value, value2, row1, value1);
163       admin.compact(tableName.getName());
164       while (admin.getCompactionState(tableName.getName()) != CompactionState.NONE) {
165         Thread.sleep(10);
166       }
167       result(fam, row, qual, row2, table, value, value2, row1, value1);
168     } finally {
169       if (table != null) {
170         table.close();
171       }
172     }
173   }
174 
175   @Test
176   public void testFlushAndCompactionWithoutTags() throws Exception {
177     HTable table = null;
178     try {
179       TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
180       byte[] fam = Bytes.toBytes("info");
181       byte[] row = Bytes.toBytes("rowa");
182       // column names
183       byte[] qual = Bytes.toBytes("qual");
184 
185       byte[] row1 = Bytes.toBytes("rowb");
186 
187       byte[] row2 = Bytes.toBytes("rowc");
188 
189       HTableDescriptor desc = new HTableDescriptor(tableName);
190       HColumnDescriptor colDesc = new HColumnDescriptor(fam);
191       colDesc.setBlockCacheEnabled(true);
192       // colDesc.setDataBlockEncoding(DataBlockEncoding.NONE);
193       colDesc.setDataBlockEncoding(DataBlockEncoding.PREFIX_TREE);
194       desc.addFamily(colDesc);
195       HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
196       admin.createTable(desc);
197 
198       table = new HTable(TEST_UTIL.getConfiguration(), tableName);
199       Put put = new Put(row);
200       byte[] value = Bytes.toBytes("value");
201       put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value);
202       table.put(put);
203       admin.flush(tableName.getName());
204       List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(tableName.getName());
205       for (HRegion region : regions) {
206         Store store = region.getStore(fam);
207         while (!(store.getStorefilesCount() > 0)) {
208           Thread.sleep(10);
209         }
210       }
211 
212       Put put1 = new Put(row1);
213       byte[] value1 = Bytes.toBytes("1000dfsdf");
214       put1.add(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
215       table.put(put1);
216       admin.flush(tableName.getName());
217       regions = TEST_UTIL.getHBaseCluster().getRegions(tableName.getName());
218       for (HRegion region : regions) {
219         Store store = region.getStore(fam);
220         while (!(store.getStorefilesCount() > 1)) {
221           Thread.sleep(10);
222         }
223       }
224 
225       Put put2 = new Put(row2);
226       byte[] value2 = Bytes.toBytes("1000dfsdf");
227       put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
228       table.put(put2);
229 
230       admin.flush(tableName.getName());
231       regions = TEST_UTIL.getHBaseCluster().getRegions(tableName.getName());
232       for (HRegion region : regions) {
233         Store store = region.getStore(fam);
234         while (!(store.getStorefilesCount() > 2)) {
235           Thread.sleep(10);
236         }
237       }
238       Scan s = new Scan(row);
239       ResultScanner scanner = table.getScanner(s);
240       try {
241         Result[] next = scanner.next(3);
242         for (Result result : next) {
243           CellScanner cellScanner = result.cellScanner();
244           cellScanner.advance();
245           KeyValue current = (KeyValue) cellScanner.current();
246           assertTrue(current.getValueOffset() + current.getValueLength() == current.getLength());
247         }
248       } finally {
249         if (scanner != null)
250           scanner.close();
251       }
252       admin.compact(tableName.getName());
253       while (admin.getCompactionState(tableName.getName()) != CompactionState.NONE) {
254         Thread.sleep(10);
255       }
256       s = new Scan(row);
257       scanner = table.getScanner(s);
258       try {
259         Result[] next = scanner.next(3);
260         for (Result result : next) {
261           CellScanner cellScanner = result.cellScanner();
262           cellScanner.advance();
263           KeyValue current = (KeyValue) cellScanner.current();
264           assertTrue(current.getValueOffset() + current.getValueLength() == current.getLength());
265         }
266       } finally {
267         if (scanner != null) {
268           scanner.close();
269         }
270       }
271     } finally {
272       if (table != null) {
273         table.close();
274       }
275     }
276   }
277 
278   @Test
279   public void testFlushAndCompactionwithCombinations() throws Exception {
280     HTable table = null;
281     try {
282       TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
283       byte[] fam = Bytes.toBytes("info");
284       byte[] row = Bytes.toBytes("rowa");
285       // column names
286       byte[] qual = Bytes.toBytes("qual");
287 
288       byte[] row1 = Bytes.toBytes("rowb");
289 
290       byte[] row2 = Bytes.toBytes("rowc");
291       byte[] rowd = Bytes.toBytes("rowd");
292       byte[] rowe = Bytes.toBytes("rowe");
293 
294       HTableDescriptor desc = new HTableDescriptor(tableName);
295       HColumnDescriptor colDesc = new HColumnDescriptor(fam);
296       colDesc.setBlockCacheEnabled(true);
297       // colDesc.setDataBlockEncoding(DataBlockEncoding.NONE);
298       colDesc.setDataBlockEncoding(DataBlockEncoding.PREFIX_TREE);
299       desc.addFamily(colDesc);
300       HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
301       admin.createTable(desc);
302 
303       table = new HTable(TEST_UTIL.getConfiguration(), tableName);
304       Put put = new Put(row);
305       byte[] value = Bytes.toBytes("value");
306       put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value);
307       int bigTagLen = Short.MAX_VALUE + 5;
308       put.setAttribute("visibility", new byte[bigTagLen]);
309       table.put(put);
310       Put put1 = new Put(row1);
311       byte[] value1 = Bytes.toBytes("1000dfsdf");
312       put1.add(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
313       table.put(put1);
314       admin.flush(tableName.getName());
315       List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(tableName.getName());
316       for (HRegion region : regions) {
317         Store store = region.getStore(fam);
318         while (!(store.getStorefilesCount() > 0)) {
319           Thread.sleep(10);
320         }
321       }
322 
323       put1 = new Put(row2);
324       value1 = Bytes.toBytes("1000dfsdf");
325       put1.add(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
326       table.put(put1);
327       admin.flush(tableName.getName());
328       regions = TEST_UTIL.getHBaseCluster().getRegions(tableName.getName());
329       for (HRegion region : regions) {
330         Store store = region.getStore(fam);
331         while (!(store.getStorefilesCount() > 1)) {
332           Thread.sleep(10);
333         }
334       }
335       Put put2 = new Put(rowd);
336       byte[] value2 = Bytes.toBytes("1000dfsdf");
337       put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
338       table.put(put2);
339       put2 = new Put(rowe);
340       value2 = Bytes.toBytes("1000dfsddfdf");
341       put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
342       put.setAttribute("visibility", Bytes.toBytes("ram"));
343       table.put(put2);
344       admin.flush(tableName.getName());
345       regions = TEST_UTIL.getHBaseCluster().getRegions(tableName.getName());
346       for (HRegion region : regions) {
347         Store store = region.getStore(fam);
348         while (!(store.getStorefilesCount() > 2)) {
349           Thread.sleep(10);
350         }
351       }
352       TestCoprocessorForTags.checkTagPresence = true;
353       Scan s = new Scan(row);
354       s.setCaching(1);
355       ResultScanner scanner = table.getScanner(s);
356       try {
357         Result next = null;
358         while ((next = scanner.next()) != null) {
359           CellScanner cellScanner = next.cellScanner();
360           cellScanner.advance();
361           KeyValue current = (KeyValue) cellScanner.current();
362           if (CellUtil.matchingRow(current, row)) {
363             assertEquals(1, TestCoprocessorForTags.tags.size());
364             Tag tag = TestCoprocessorForTags.tags.get(0);
365             assertEquals(bigTagLen, tag.getTagLength());
366           } else {
367             assertEquals(0, TestCoprocessorForTags.tags.size());
368           }
369         }
370       } finally {
371         if (scanner != null) {
372           scanner.close();
373         }
374         TestCoprocessorForTags.checkTagPresence = false;
375       }
376       while (admin.getCompactionState(tableName.getName()) != CompactionState.NONE) {
377         Thread.sleep(10);
378       }
379       TestCoprocessorForTags.checkTagPresence = true;
380       scanner = table.getScanner(s);
381       try {
382         Result next = null;
383         while ((next = scanner.next()) != null) {
384           CellScanner cellScanner = next.cellScanner();
385           cellScanner.advance();
386           KeyValue current = (KeyValue) cellScanner.current();
387           if (CellUtil.matchingRow(current, row)) {
388             assertEquals(1, TestCoprocessorForTags.tags.size());
389             Tag tag = TestCoprocessorForTags.tags.get(0);
390             assertEquals(bigTagLen, tag.getTagLength());
391           } else {
392             assertEquals(0, TestCoprocessorForTags.tags.size());
393           }
394         }
395       } finally {
396         if (scanner != null) {
397           scanner.close();
398         }
399         TestCoprocessorForTags.checkTagPresence = false;
400       }
401     } finally {
402       if (table != null) {
403         table.close();
404       }
405     }
406   }
407 
408   @Test
409   public void testTagsWithAppendAndIncrement() throws Exception {
410     TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
411     byte[] f = Bytes.toBytes("f");
412     byte[] q = Bytes.toBytes("q");
413     byte[] row1 = Bytes.toBytes("r1");
414     byte[] row2 = Bytes.toBytes("r2");
415 
416     HTableDescriptor desc = new HTableDescriptor(tableName);
417     HColumnDescriptor colDesc = new HColumnDescriptor(f);
418     desc.addFamily(colDesc);
419     TEST_UTIL.getHBaseAdmin().createTable(desc);
420 
421     HTable table = null;
422     try {
423       table = new HTable(TEST_UTIL.getConfiguration(), tableName);
424       Put put = new Put(row1);
425       byte[] v = Bytes.toBytes(2L);
426       put.add(f, q, v);
427       put.setAttribute("visibility", Bytes.toBytes("tag1"));
428       table.put(put);
429       Increment increment = new Increment(row1);
430       increment.addColumn(f, q, 1L);
431       table.increment(increment);
432       TestCoprocessorForTags.checkTagPresence = true;
433       ResultScanner scanner = table.getScanner(new Scan());
434       Result result = scanner.next();
435       KeyValue kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
436       List<Tag> tags = TestCoprocessorForTags.tags;
437       assertEquals(3L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
438       assertEquals(1, tags.size());
439       assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
440       TestCoprocessorForTags.checkTagPresence = false;
441       TestCoprocessorForTags.tags = null;
442 
443       increment = new Increment(row1);
444       increment.add(new KeyValue(row1, f, q, 1234L, v));
445       increment.setAttribute("visibility", Bytes.toBytes("tag2"));
446       table.increment(increment);
447       TestCoprocessorForTags.checkTagPresence = true;
448       scanner = table.getScanner(new Scan());
449       result = scanner.next();
450       kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
451       tags = TestCoprocessorForTags.tags;
452       assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
453       assertEquals(2, tags.size());
454       assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
455       assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
456       TestCoprocessorForTags.checkTagPresence = false;
457       TestCoprocessorForTags.tags = null;
458 
459       put = new Put(row2);
460       v = Bytes.toBytes(2L);
461       put.add(f, q, v);
462       table.put(put);
463       increment = new Increment(row2);
464       increment.add(new KeyValue(row2, f, q, 1234L, v));
465       increment.setAttribute("visibility", Bytes.toBytes("tag2"));
466       table.increment(increment);
467       Scan scan = new Scan();
468       scan.setStartRow(row2);
469       TestCoprocessorForTags.checkTagPresence = true;
470       scanner = table.getScanner(scan);
471       result = scanner.next();
472       kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
473       tags = TestCoprocessorForTags.tags;
474       assertEquals(4L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
475       assertEquals(1, tags.size());
476       assertEquals("tag2", Bytes.toString(tags.get(0).getValue()));
477       TestCoprocessorForTags.checkTagPresence = false;
478       TestCoprocessorForTags.tags = null;
479 
480       // Test Append
481       byte[] row3 = Bytes.toBytes("r3");
482       put = new Put(row3);
483       put.add(f, q, Bytes.toBytes("a"));
484       put.setAttribute("visibility", Bytes.toBytes("tag1"));
485       table.put(put);
486       Append append = new Append(row3);
487       append.add(f, q, Bytes.toBytes("b"));
488       table.append(append);
489       scan = new Scan();
490       scan.setStartRow(row3);
491       TestCoprocessorForTags.checkTagPresence = true;
492       scanner = table.getScanner(scan);
493       result = scanner.next();
494       kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
495       tags = TestCoprocessorForTags.tags;
496       assertEquals(1, tags.size());
497       assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
498       TestCoprocessorForTags.checkTagPresence = false;
499       TestCoprocessorForTags.tags = null;
500 
501       append = new Append(row3);
502       append.add(new KeyValue(row3, f, q, 1234L, v));
503       append.setAttribute("visibility", Bytes.toBytes("tag2"));
504       table.append(append);
505       TestCoprocessorForTags.checkTagPresence = true;
506       scanner = table.getScanner(scan);
507       result = scanner.next();
508       kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
509       tags = TestCoprocessorForTags.tags;
510       assertEquals(2, tags.size());
511       assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
512       assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
513       TestCoprocessorForTags.checkTagPresence = false;
514       TestCoprocessorForTags.tags = null;
515 
516       byte[] row4 = Bytes.toBytes("r4");
517       put = new Put(row4);
518       put.add(f, q, Bytes.toBytes("a"));
519       table.put(put);
520       append = new Append(row4);
521       append.add(new KeyValue(row4, f, q, 1234L, v));
522       append.setAttribute("visibility", Bytes.toBytes("tag2"));
523       table.append(append);
524       scan = new Scan();
525       scan.setStartRow(row4);
526       TestCoprocessorForTags.checkTagPresence = true;
527       scanner = table.getScanner(scan);
528       result = scanner.next();
529       kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
530       tags = TestCoprocessorForTags.tags;
531       assertEquals(1, tags.size());
532       assertEquals("tag2", Bytes.toString(tags.get(0).getValue()));
533     } finally {
534       TestCoprocessorForTags.checkTagPresence = false;
535       TestCoprocessorForTags.tags = null;
536       if (table != null) {
537         table.close();
538       }
539     }
540   }
541 
542   private void result(byte[] fam, byte[] row, byte[] qual, byte[] row2, HTable table, byte[] value,
543       byte[] value2, byte[] row1, byte[] value1) throws IOException {
544     Scan s = new Scan(row);
545     // If filters are used this attribute can be specifically check for in
546     // filterKV method and
547     // kvs can be filtered out if the tags of interest is not found in that kv
548     s.setAttribute("visibility", Bytes.toBytes("myTag"));
549     ResultScanner scanner = null;
550     try {
551       scanner = table.getScanner(s);
552       Result next = scanner.next();
553 
554       assertTrue(Bytes.equals(next.getRow(), row));
555       assertTrue(Bytes.equals(next.getValue(fam, qual), value));
556 
557       Result next2 = scanner.next();
558       assertTrue(next2 != null);
559       assertTrue(Bytes.equals(next2.getRow(), row1));
560       assertTrue(Bytes.equals(next2.getValue(fam, qual), value1));
561 
562       next2 = scanner.next();
563       assertTrue(next2 != null);
564       assertTrue(Bytes.equals(next2.getRow(), row2));
565       assertTrue(Bytes.equals(next2.getValue(fam, qual), value2));
566 
567     } finally {
568       if (scanner != null)
569         scanner.close();
570     }
571   }
572 
573   public static class TestCoprocessorForTags extends BaseRegionObserver {
574 
575     public static boolean checkTagPresence = false;
576     public static List<Tag> tags = null;
577 
578     @Override
579     public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
580         final WALEdit edit, final Durability durability) throws IOException {
581       updateMutationAddingTags(put);
582     }
583 
584     private void updateMutationAddingTags(final Mutation m) {
585       byte[] attribute = m.getAttribute("visibility");
586       byte[] cf = null;
587       List<Cell> updatedCells = new ArrayList<Cell>();
588       if (attribute != null) {
589         for (List<? extends Cell> edits : m.getFamilyCellMap().values()) {
590           for (Cell cell : edits) {
591             KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
592             if (cf == null) {
593               cf = kv.getFamily();
594             }
595             Tag tag = new Tag((byte) 1, attribute);
596             List<Tag> tagList = new ArrayList<Tag>();
597             tagList.add(tag);
598 
599             KeyValue newKV = new KeyValue(kv.getRow(), 0, kv.getRowLength(), kv.getFamily(), 0,
600                 kv.getFamilyLength(), kv.getQualifier(), 0, kv.getQualifierLength(),
601                 kv.getTimestamp(), KeyValue.Type.codeToType(kv.getType()), kv.getValue(), 0,
602                 kv.getValueLength(), tagList);
603             ((List<Cell>) updatedCells).add(newKV);
604           }
605         }
606         m.getFamilyCellMap().remove(cf);
607         // Update the family map
608         m.getFamilyCellMap().put(cf, updatedCells);
609       }
610     }
611 
612     @Override
613     public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> e, Increment increment)
614         throws IOException {
615       updateMutationAddingTags(increment);
616       return super.preIncrement(e, increment);
617     }
618 
619     @Override
620     public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> e, Append append)
621         throws IOException {
622       updateMutationAddingTags(append);
623       return super.preAppend(e, append);
624     }
625 
626     @Override
627     public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
628         InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
629       if (checkTagPresence) {
630         if (results.size() > 0) {
631           // Check tag presence in the 1st cell in 1st Result
632           Result result = results.get(0);
633           CellScanner cellScanner = result.cellScanner();
634           if (cellScanner.advance()) {
635             Cell cell = cellScanner.current();
636             tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
637                 cell.getTagsLengthUnsigned());
638           }
639         }
640       }
641       return hasMore;
642     }
643   }
644 }