View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.security.access;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.fail;
22  
23  import java.security.PrivilegedExceptionAction;
24  import java.util.HashMap;
25  import java.util.Map;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.Coprocessor;
31  import org.apache.hadoop.hbase.HBaseTestingUtility;
32  import org.apache.hadoop.hbase.HColumnDescriptor;
33  import org.apache.hadoop.hbase.HTableDescriptor;
34  import org.apache.hadoop.hbase.MediumTests;
35  import org.apache.hadoop.hbase.TableNotFoundException;
36  import org.apache.hadoop.hbase.client.Delete;
37  import org.apache.hadoop.hbase.client.Get;
38  import org.apache.hadoop.hbase.client.HBaseAdmin;
39  import org.apache.hadoop.hbase.client.HTable;
40  import org.apache.hadoop.hbase.client.Increment;
41  import org.apache.hadoop.hbase.client.Put;
42  import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
43  import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
44  import org.apache.hadoop.hbase.security.User;
45  import org.apache.hadoop.hbase.util.Bytes;
46  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
47  import org.apache.hadoop.hbase.util.TestTableName;
48  import org.apache.log4j.Level;
49  import org.apache.log4j.Logger;
50  import org.junit.After;
51  import org.junit.AfterClass;
52  import org.junit.Before;
53  import org.junit.BeforeClass;
54  import org.junit.Rule;
55  import org.junit.Test;
56  import org.junit.experimental.categories.Category;
57  
58  @Category(MediumTests.class)
59  public class TestCellACLWithMultipleVersions extends SecureTestUtil {
60    private static final Log LOG = LogFactory.getLog(TestCellACLWithMultipleVersions.class);
61  
62    static {
        // Switch the access-control classes' loggers to TRACE for this test class.
63      Logger.getLogger(AccessController.class).setLevel(Level.TRACE);
64      Logger.getLogger(AccessControlFilter.class).setLevel(Level.TRACE);
65      Logger.getLogger(TableAuthManager.class).setLevel(Level.TRACE);
66    }
67  
      // JUnit rule; setUp() creates the table named by TEST_TABLE.getTableName().
68    @Rule
69    public TestTableName TEST_TABLE = new TestTableName();
70    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
      // Column families created by setUp(), each with max versions set to 4.
71    private static final byte[] TEST_FAMILY1 = Bytes.toBytes("f1");
72    private static final byte[] TEST_FAMILY2 = Bytes.toBytes("f2");
      // Default row and qualifiers shared by the tests.
73    private static final byte[] TEST_ROW = Bytes.toBytes("cellpermtest");
74    private static final byte[] TEST_Q1 = Bytes.toBytes("q1");
75    private static final byte[] TEST_Q2 = Bytes.toBytes("q2");
      // Byte encodings of the long values 0, 1 and 2, used as cell values.
76    private static final byte[] ZERO = Bytes.toBytes(0L);
77    private static final byte[] ONE = Bytes.toBytes(1L);
78    private static final byte[] TWO = Bytes.toBytes(2L);
79  
      // Assigned in setupBeforeClass() from TEST_UTIL.getConfiguration().
80    private static Configuration conf;
81  
      // Test users created in setupBeforeClass(); USER_OWNER is set as table owner in setUp().
82    private static User USER_OWNER;
83    private static User USER_OTHER;
84    private static User USER_OTHER2;
85  
86    @BeforeClass
87    public static void setupBeforeClass() throws Exception {
88      // setup configuration
89      conf = TEST_UTIL.getConfiguration();
90      // Enable security
91      enableSecurity(conf);
92      // Verify enableSecurity sets up what we require
93      verifyConfiguration(conf);
94  
95      // We expect 0.98 cell ACL semantics
96      conf.setBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT, false);
97  
98      TEST_UTIL.startMiniCluster();
99      MasterCoprocessorHost cpHost = TEST_UTIL.getMiniHBaseCluster()
100         .getMaster().getCoprocessorHost();
101     cpHost.load(AccessController.class, Coprocessor.PRIORITY_HIGHEST, conf);
102     AccessController ac = (AccessController)
103       cpHost.findCoprocessor(AccessController.class.getName());
104     cpHost.createEnvironment(AccessController.class, ac, Coprocessor.PRIORITY_HIGHEST, 1, conf);
105     RegionServerCoprocessorHost rsHost = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0)
106         .getCoprocessorHost();
107     rsHost.createEnvironment(AccessController.class, ac, Coprocessor.PRIORITY_HIGHEST, 1, conf);
108 
109     // Wait for the ACL table to become available
110     TEST_UTIL.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME.getName());
111 
112     // create a set of test users
113     USER_OWNER = User.createUserForTesting(conf, "owner", new String[0]);
114     USER_OTHER = User.createUserForTesting(conf, "other", new String[0]);
115     USER_OTHER2 = User.createUserForTesting(conf, "other2", new String[0]);
116   }
117 
118   @AfterClass
119   public static void tearDownAfterClass() throws Exception {
120     TEST_UTIL.shutdownMiniCluster();
121   }
122 
123   @Before
124   public void setUp() throws Exception {
125     // Create the test table (owner added to the _acl_ table)
126     HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
127     HTableDescriptor htd = new HTableDescriptor(TEST_TABLE.getTableName());
128     HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAMILY1);
129     hcd.setMaxVersions(4);
130     htd.setOwner(USER_OWNER);
131     htd.addFamily(hcd);
132     hcd = new HColumnDescriptor(TEST_FAMILY2);
133     hcd.setMaxVersions(4);
134     htd.setOwner(USER_OWNER);
135     htd.addFamily(hcd);
136     admin.createTable(htd, new byte[][] { Bytes.toBytes("s") });
137     TEST_UTIL.waitTableEnabled(TEST_TABLE.getTableName().getName());
138   }
139 
140   @Test
141   public void testCellPermissionwithVersions() throws Exception {
142     // store two sets of values, one store with a cell level ACL, and one
143     // without
144     verifyAllowed(new AccessTestAction() {
145       @Override
146       public Object run() throws Exception {
147         HTable t = new HTable(conf, TEST_TABLE.getTableName());
148         try {
149           Put p;
150           // with ro ACL
151           p = new Put(TEST_ROW).add(TEST_FAMILY1, TEST_Q1, ZERO);
152           p.setACL(USER_OTHER.getShortName(), new Permission(Permission.Action.WRITE));
153           t.put(p);
154           // with ro ACL
155           p = new Put(TEST_ROW).add(TEST_FAMILY1, TEST_Q1, ZERO);
156           p.setACL(USER_OTHER.getShortName(), new Permission(Permission.Action.READ));
157           t.put(p);
158           p = new Put(TEST_ROW).add(TEST_FAMILY1, TEST_Q1, ZERO);
159           p.setACL(USER_OTHER.getShortName(), new Permission(Permission.Action.WRITE));
160           t.put(p);
161           p = new Put(TEST_ROW).add(TEST_FAMILY1, TEST_Q1, ZERO);
162           p.setACL(USER_OTHER.getShortName(), new Permission(Permission.Action.READ));
163           t.put(p);
164           p = new Put(TEST_ROW).add(TEST_FAMILY1, TEST_Q1, ZERO);
165           p.setACL(USER_OTHER.getShortName(), new Permission(Permission.Action.WRITE));
166           t.put(p);
167         } finally {
168           t.close();
169         }
170         return null;
171       }
172     }, USER_OWNER);
173 
174     /* ---- Gets ---- */
175 
176     AccessTestAction getQ1 = new AccessTestAction() {
177       @Override
178       public Object run() throws Exception {
179         Get get = new Get(TEST_ROW);
180         get.setMaxVersions(10);
181         HTable t = new HTable(conf, TEST_TABLE.getTableName());
182         try {
183           return t.get(get).listCells();
184         } finally {
185           t.close();
186         }
187       }
188     };
189 
190     AccessTestAction get2 = new AccessTestAction() {
191       @Override
192       public Object run() throws Exception {
193         Get get = new Get(TEST_ROW);
194         get.setMaxVersions(10);
195         HTable t = new HTable(conf, TEST_TABLE.getTableName());
196         try {
197           return t.get(get).listCells();
198         } finally {
199           t.close();
200         }
201       }
202     };
203     // Confirm special read access set at cell level
204 
205     verifyAllowed(USER_OTHER, getQ1, 2);
206 
207     // store two sets of values, one store with a cell level ACL, and one
208     // without
209     verifyAllowed(new AccessTestAction() {
210       @Override
211       public Object run() throws Exception {
212         HTable t = new HTable(conf, TEST_TABLE.getTableName());
213         try {
214           Put p;
215           p = new Put(TEST_ROW).add(TEST_FAMILY1, TEST_Q1, ZERO);
216           p.setACL(USER_OTHER.getShortName(), new Permission(Permission.Action.WRITE));
217           t.put(p);
218           p = new Put(TEST_ROW).add(TEST_FAMILY1, TEST_Q1, ZERO);
219           p.setACL(USER_OTHER.getShortName(), new Permission(Permission.Action.READ));
220           t.put(p);
221           p = new Put(TEST_ROW).add(TEST_FAMILY1, TEST_Q1, ZERO);
222           p.setACL(USER_OTHER.getShortName(), new Permission(Permission.Action.WRITE));
223           t.put(p);
224         } finally {
225           t.close();
226         }
227         return null;
228       }
229     }, USER_OWNER);
230     // Confirm special read access set at cell level
231 
232     verifyAllowed(USER_OTHER, get2, 1);
233   }
234 
235   @Test
236   public void testCellPermissionsWithDeleteMutipleVersions() throws Exception {
237     // table/column/qualifier level permissions
238     final byte[] TEST_ROW1 = Bytes.toBytes("r1");
239     final byte[] TEST_ROW2 = Bytes.toBytes("r2");
240     final byte[] TEST_Q1 = Bytes.toBytes("q1");
241     final byte[] TEST_Q2 = Bytes.toBytes("q2");
242     final byte[] ZERO = Bytes.toBytes(0L);
243 
244     // additional test user
245     final User user1 = User.createUserForTesting(conf, "user1", new String[0]);
246     final User user2 = User.createUserForTesting(conf, "user2", new String[0]);
247 
248     verifyAllowed(new AccessTestAction() {
249       @Override
250       public Object run() throws Exception {
251         HTable t = new HTable(conf, TEST_TABLE.getTableName());
252         try {
253           // with rw ACL for "user1"
254           Put p = new Put(TEST_ROW1);
255           p.add(TEST_FAMILY1, TEST_Q1, ZERO);
256           p.add(TEST_FAMILY1, TEST_Q2, ZERO);
257           p.setACL(user1.getShortName(), new Permission(Permission.Action.READ,
258               Permission.Action.WRITE));
259           t.put(p);
260           // with rw ACL for "user1"
261           p = new Put(TEST_ROW2);
262           p.add(TEST_FAMILY1, TEST_Q1, ZERO);
263           p.add(TEST_FAMILY1, TEST_Q2, ZERO);
264           p.setACL(user1.getShortName(), new Permission(Permission.Action.READ,
265               Permission.Action.WRITE));
266           t.put(p);
267         } finally {
268           t.close();
269         }
270         return null;
271       }
272     }, USER_OWNER);
273 
274     verifyAllowed(new AccessTestAction() {
275       @Override
276       public Object run() throws Exception {
277         HTable t = new HTable(conf, TEST_TABLE.getTableName());
278         try {
279           // with rw ACL for "user1" and "user2"
280           Put p = new Put(TEST_ROW1);
281           p.add(TEST_FAMILY1, TEST_Q1, ZERO);
282           p.add(TEST_FAMILY1, TEST_Q2, ZERO);
283           Map<String, Permission> perms = new HashMap<String, Permission>();
284           perms.put(user1.getShortName(), new Permission(Permission.Action.READ,
285               Permission.Action.WRITE));
286           perms.put(user2.getShortName(), new Permission(Permission.Action.READ,
287               Permission.Action.WRITE));
288           p.setACL(perms);
289           t.put(p);
290           // with rw ACL for "user1" and "user2"
291           p = new Put(TEST_ROW2);
292           p.add(TEST_FAMILY1, TEST_Q1, ZERO);
293           p.add(TEST_FAMILY1, TEST_Q2, ZERO);
294           p.setACL(perms);
295           t.put(p);
296         } finally {
297           t.close();
298         }
299         return null;
300       }
301     }, user1);
302 
303     // user1 should be allowed to delete TEST_ROW1 as he is having write permission on both
304     // versions of the cells
305     user1.runAs(new PrivilegedExceptionAction<Void>() {
306       @Override
307       public Void run() throws Exception {
308         HTable t = new HTable(conf, TEST_TABLE.getTableName());
309         try {
310           Delete d = new Delete(TEST_ROW1);
311           d.deleteColumns(TEST_FAMILY1, TEST_Q1);
312           d.deleteColumns(TEST_FAMILY1, TEST_Q2);
313           t.delete(d);
314         } finally {
315           t.close();
316         }
317         return null;
318       }
319     });
320     // user2 should not be allowed to delete TEST_ROW2 as he is having write permission only on one
321     // version of the cells.
322     user2.runAs(new PrivilegedExceptionAction<Void>() {
323       @Override
324       public Void run() throws Exception {
325         HTable t = new HTable(conf, TEST_TABLE.getTableName());
326         try {
327           Delete d = new Delete(TEST_ROW2);
328           d.deleteColumns(TEST_FAMILY1, TEST_Q1);
329           d.deleteColumns(TEST_FAMILY1, TEST_Q2);
330           t.delete(d);
331           fail("user2 should not be allowed to delete the row");
332         } catch (Exception e) {
333 
334         } finally {
335           t.close();
336         }
337         return null;
338       }
339     });
340     // user1 should be allowed to delete the cf. (All data under cf for a row)
341     user1.runAs(new PrivilegedExceptionAction<Void>() {
342       @Override
343       public Void run() throws Exception {
344         HTable t = new HTable(conf, TEST_TABLE.getTableName());
345         try {
346           Delete d = new Delete(TEST_ROW2);
347           d.deleteFamily(TEST_FAMILY1);
348           t.delete(d);
349         } finally {
350           t.close();
351         }
352         return null;
353       }
354     });
355   }
356 
357 
358   @Test
359   public void testDeleteWithFutureTimestamp() throws Exception {
360     // Store two values, one in the future
361 
362     verifyAllowed(new AccessTestAction() {
363       @Override
364       public Object run() throws Exception {
365         HTable t = new HTable(conf, TEST_TABLE.getTableName());
366         try {
367           // Store read only ACL at a future time
368           Put p = new Put(TEST_ROW).add(TEST_FAMILY1, TEST_Q1,
369             EnvironmentEdgeManager.currentTimeMillis() + 1000000,
370             ZERO);
371           p.setACL(USER_OTHER.getShortName(), new Permission(Permission.Action.READ));
372           t.put(p);
373           // Store a read write ACL without a timestamp, server will use current time
374           p = new Put(TEST_ROW).add(TEST_FAMILY1, TEST_Q2, ONE);
375           p.setACL(USER_OTHER.getShortName(), new Permission(Permission.Action.READ,
376             Permission.Action.WRITE));
377           t.put(p);
378         } finally {
379           t.close();
380         }
381         return null;
382       }
383     }, USER_OWNER);
384 
385     // Confirm stores are visible
386 
387     AccessTestAction getQ1 = new AccessTestAction() {
388       @Override
389       public Object run() throws Exception {
390         Get get = new Get(TEST_ROW).addColumn(TEST_FAMILY1, TEST_Q1);
391         HTable t = new HTable(conf, TEST_TABLE.getTableName());
392         try {
393           return t.get(get).listCells();
394         } finally {
395           t.close();
396         }
397       }
398     };
399 
400     AccessTestAction getQ2 = new AccessTestAction() {
401       @Override
402       public Object run() throws Exception {
403         Get get = new Get(TEST_ROW).addColumn(TEST_FAMILY1, TEST_Q2);
404         HTable t = new HTable(conf, TEST_TABLE.getTableName());
405         try {
406           return t.get(get).listCells();
407         } finally {
408           t.close();
409         }
410       }
411     };
412 
413     verifyAllowed(getQ1, USER_OWNER, USER_OTHER);
414     verifyAllowed(getQ2, USER_OWNER, USER_OTHER);
415 
416 
417     // Issue a DELETE for the family, should succeed because the future ACL is
418     // not considered
419 
420     AccessTestAction deleteFamily = new AccessTestAction() {
421       @Override
422       public Object run() throws Exception {
423         Delete delete = new Delete(TEST_ROW).deleteFamily(TEST_FAMILY1);
424         HTable t = new HTable(conf, TEST_TABLE.getTableName());
425         try {
426           t.delete(delete);
427         } finally {
428           t.close();
429         }
430         return null;
431       }
432     };
433 
434     verifyAllowed(deleteFamily, USER_OTHER);
435 
436     // The future put should still exist
437     
438     verifyAllowed(getQ1, USER_OWNER, USER_OTHER);
439     
440     // The other put should be covered by the tombstone
441 
442     verifyDenied(getQ2, USER_OTHER);
443   }
444 
445   @Test
446   public void testCellPermissionsWithDeleteWithUserTs() throws Exception {
447     USER_OWNER.runAs(new AccessTestAction() {
448       @Override
449       public Object run() throws Exception {
450         HTable t = new HTable(conf, TEST_TABLE.getTableName());
451         try {
452           // This version (TS = 123) with rw ACL for USER_OTHER and USER_OTHER2
453           Put p = new Put(TEST_ROW);
454           p.add(TEST_FAMILY1, TEST_Q1, 123L, ZERO);
455           p.add(TEST_FAMILY1, TEST_Q2, 123L, ZERO);
456           Map<String, Permission> perms = new HashMap<String, Permission>();
457           perms.put(USER_OTHER.getShortName(), new Permission(Permission.Action.READ,
458             Permission.Action.WRITE));
459           perms.put(USER_OTHER2.getShortName(), new Permission(Permission.Action.READ,
460             Permission.Action.WRITE));
461           p.setACL(perms);
462           t.put(p);
463 
464           // This version (TS = 125) with rw ACL for USER_OTHER
465           p = new Put(TEST_ROW);
466           p.add(TEST_FAMILY1, TEST_Q1, 125L, ONE);
467           p.add(TEST_FAMILY1, TEST_Q2, 125L, ONE);
468           perms = new HashMap<String, Permission>();
469           perms.put(USER_OTHER.getShortName(), new Permission(Permission.Action.READ,
470             Permission.Action.WRITE));
471           p.setACL(perms);
472           t.put(p);
473 
474           // This version (TS = 127) with rw ACL for USER_OTHER
475           p = new Put(TEST_ROW);
476           p.add(TEST_FAMILY1, TEST_Q1, 127L, TWO);
477           p.add(TEST_FAMILY1, TEST_Q2, 127L, TWO);
478           perms = new HashMap<String, Permission>();
479           perms.put(USER_OTHER.getShortName(), new Permission(Permission.Action.READ,
480             Permission.Action.WRITE));
481           p.setACL(perms);
482           t.put(p);
483 
484           return null;
485         } finally {
486           t.close();
487         }
488       }
489     });
490 
491     // USER_OTHER2 should be allowed to delete the column f1:q1 versions older than TS 124L
492     USER_OTHER2.runAs(new AccessTestAction() {
493       @Override
494       public Object run() throws Exception {
495         HTable t = new HTable(conf, TEST_TABLE.getTableName());
496         try {
497           Delete d = new Delete(TEST_ROW, 124L);
498           d.deleteColumns(TEST_FAMILY1, TEST_Q1);
499           t.delete(d);
500         } finally {
501           t.close();
502         }
503         return null;
504       }
505     });
506 
507     // USER_OTHER2 should be allowed to delete the column f1:q2 versions older than TS 124L
508     USER_OTHER2.runAs(new AccessTestAction() {
509       @Override
510       public Object run() throws Exception {
511         HTable t = new HTable(conf, TEST_TABLE.getTableName());
512         try {
513           Delete d = new Delete(TEST_ROW);
514           d.deleteColumns(TEST_FAMILY1, TEST_Q2, 124L);
515           t.delete(d);
516         } finally {
517           t.close();
518         }
519         return null;
520       }
521     });
522   }
523 
524   @Test
525   public void testCellPermissionsWithDeleteExactVersion() throws Exception {
526     final byte[] TEST_ROW1 = Bytes.toBytes("r1");
527     final byte[] TEST_Q1 = Bytes.toBytes("q1");
528     final byte[] TEST_Q2 = Bytes.toBytes("q2");
529     final byte[] ZERO = Bytes.toBytes(0L);
530 
531     final User user1 = User.createUserForTesting(conf, "user1", new String[0]);
532     final User user2 = User.createUserForTesting(conf, "user2", new String[0]);
533 
534     verifyAllowed(new AccessTestAction() {
535       @Override
536       public Object run() throws Exception {
537         HTable t = new HTable(conf, TEST_TABLE.getTableName());
538         try {
539           Map<String, Permission> permsU1andOwner = new HashMap<String, Permission>();
540           permsU1andOwner.put(user1.getShortName(), new Permission(Permission.Action.READ,
541               Permission.Action.WRITE));
542           permsU1andOwner.put(USER_OWNER.getShortName(), new Permission(Permission.Action.READ,
543               Permission.Action.WRITE));
544           Map<String, Permission> permsU2andOwner = new HashMap<String, Permission>();
545           permsU2andOwner.put(user2.getShortName(), new Permission(Permission.Action.READ,
546               Permission.Action.WRITE));
547           permsU2andOwner.put(USER_OWNER.getShortName(), new Permission(Permission.Action.READ,
548               Permission.Action.WRITE));
549           Put p = new Put(TEST_ROW1);
550           p.add(TEST_FAMILY1, TEST_Q1, 123, ZERO);
551           p.setACL(permsU1andOwner);
552           t.put(p);
553           p = new Put(TEST_ROW1);
554           p.add(TEST_FAMILY1, TEST_Q2, 123, ZERO);
555           p.setACL(permsU2andOwner);
556           t.put(p);
557           p = new Put(TEST_ROW1);
558           p.add(TEST_FAMILY2, TEST_Q1, 123, ZERO);
559           p.add(TEST_FAMILY2, TEST_Q2, 123, ZERO);
560           p.setACL(permsU2andOwner);
561           t.put(p);
562 
563           p = new Put(TEST_ROW1);
564           p.add(TEST_FAMILY2, TEST_Q1, 125, ZERO);
565           p.add(TEST_FAMILY2, TEST_Q2, 125, ZERO);
566           p.setACL(permsU1andOwner);
567           t.put(p);
568 
569           p = new Put(TEST_ROW1);
570           p.add(TEST_FAMILY1, TEST_Q1, 127, ZERO);
571           p.setACL(permsU2andOwner);
572           t.put(p);
573           p = new Put(TEST_ROW1);
574           p.add(TEST_FAMILY1, TEST_Q2, 127, ZERO);
575           p.setACL(permsU1andOwner);
576           t.put(p);
577           p = new Put(TEST_ROW1);
578           p.add(TEST_FAMILY2, TEST_Q1, 129, ZERO);
579           p.add(TEST_FAMILY2, TEST_Q2, 129, ZERO);
580           p.setACL(permsU1andOwner);
581           t.put(p);
582         } finally {
583           t.close();
584         }
585         return null;
586       }
587     }, USER_OWNER);
588 
589     // user1 should be allowed to delete TEST_ROW1 as he is having write permission on both
590     // versions of the cells
591     user1.runAs(new PrivilegedExceptionAction<Void>() {
592       @Override
593       public Void run() throws Exception {
594         HTable t = new HTable(conf, TEST_TABLE.getTableName());
595         try {
596           Delete d = new Delete(TEST_ROW1);
597           d.deleteColumn(TEST_FAMILY1, TEST_Q1, 123);
598           d.deleteColumn(TEST_FAMILY1, TEST_Q2);
599           d.deleteFamilyVersion(TEST_FAMILY2, 125);
600           t.delete(d);
601         } finally {
602           t.close();
603         }
604         return null;
605       }
606     });
607 
608     user2.runAs(new PrivilegedExceptionAction<Void>() {
609       @Override
610       public Void run() throws Exception {
611         HTable t = new HTable(conf, TEST_TABLE.getTableName());
612         try {
613           Delete d = new Delete(TEST_ROW1, 127);
614           d.deleteColumns(TEST_FAMILY1, TEST_Q1);
615           d.deleteColumns(TEST_FAMILY1, TEST_Q2);
616           d.deleteFamily(TEST_FAMILY2, 129);
617           t.delete(d);
618           fail("user2 can not do the delete");
619         } catch (Exception e) {
620 
621         } finally {
622           t.close();
623         }
624         return null;
625       }
626     });
627   }
628 
629   @Test
630   public void testCellPermissionsForIncrementWithMultipleVersions() throws Exception {
631     final byte[] TEST_ROW1 = Bytes.toBytes("r1");
632     final byte[] TEST_Q1 = Bytes.toBytes("q1");
633     final byte[] TEST_Q2 = Bytes.toBytes("q2");
634     final byte[] ZERO = Bytes.toBytes(0L);
635 
636     final User user1 = User.createUserForTesting(conf, "user1", new String[0]);
637     final User user2 = User.createUserForTesting(conf, "user2", new String[0]);
638 
639     verifyAllowed(new AccessTestAction() {
640       @Override
641       public Object run() throws Exception {
642         HTable t = new HTable(conf, TEST_TABLE.getTableName());
643         try {
644           Map<String, Permission> permsU1andOwner = new HashMap<String, Permission>();
645           permsU1andOwner.put(user1.getShortName(), new Permission(Permission.Action.READ,
646               Permission.Action.WRITE));
647           permsU1andOwner.put(USER_OWNER.getShortName(), new Permission(Permission.Action.READ,
648               Permission.Action.WRITE));
649           Map<String, Permission> permsU2andOwner = new HashMap<String, Permission>();
650           permsU2andOwner.put(user2.getShortName(), new Permission(Permission.Action.READ,
651               Permission.Action.WRITE));
652           permsU2andOwner.put(USER_OWNER.getShortName(), new Permission(Permission.Action.READ,
653               Permission.Action.WRITE));
654           Put p = new Put(TEST_ROW1);
655           p.add(TEST_FAMILY1, TEST_Q1, 123, ZERO);
656           p.setACL(permsU1andOwner);
657           t.put(p);
658           p = new Put(TEST_ROW1);
659           p.add(TEST_FAMILY1, TEST_Q2, 123, ZERO);
660           p.setACL(permsU2andOwner);
661           t.put(p);
662 
663           p = new Put(TEST_ROW1);
664           p.add(TEST_FAMILY1, TEST_Q1, 127, ZERO);
665           p.setACL(permsU2andOwner);
666           t.put(p);
667           p = new Put(TEST_ROW1);
668           p.add(TEST_FAMILY1, TEST_Q2, 127, ZERO);
669           p.setACL(permsU1andOwner);
670           t.put(p);
671         } finally {
672           t.close();
673         }
674         return null;
675       }
676     }, USER_OWNER);
677 
678     // Increment considers the TimeRange set on it.
679     user1.runAs(new PrivilegedExceptionAction<Void>() {
680       @Override
681       public Void run() throws Exception {
682         HTable t = new HTable(conf, TEST_TABLE.getTableName());
683         try {
684           Increment inc = new Increment(TEST_ROW1);
685           inc.setTimeRange(0, 123);
686           inc.addColumn(TEST_FAMILY1, TEST_Q1, 2L);
687           t.increment(inc);
688           t.incrementColumnValue(TEST_ROW1, TEST_FAMILY1, TEST_Q2, 1L);
689         } finally {
690           t.close();
691         }
692         return null;
693       }
694     });
695 
696     user2.runAs(new PrivilegedExceptionAction<Void>() {
697       @Override
698       public Void run() throws Exception {
699         HTable t = new HTable(conf, TEST_TABLE.getTableName());
700         try {
701           Increment inc = new Increment(TEST_ROW1);
702           inc.setTimeRange(0, 127);
703           inc.addColumn(TEST_FAMILY1, TEST_Q2, 2L);
704           t.increment(inc);
705           fail();
706         } catch (Exception e) {
707 
708         } finally {
709           t.close();
710         }
711         return null;
712       }
713     });
714   }
715 
716   @Test
717   public void testCellPermissionsForPutWithMultipleVersions() throws Exception {
718     final byte[] TEST_ROW1 = Bytes.toBytes("r1");
719     final byte[] TEST_Q1 = Bytes.toBytes("q1");
720     final byte[] TEST_Q2 = Bytes.toBytes("q2");
721     final byte[] ZERO = Bytes.toBytes(0L);
722 
723     final User user1 = User.createUserForTesting(conf, "user1", new String[0]);
724     final User user2 = User.createUserForTesting(conf, "user2", new String[0]);
725 
726     verifyAllowed(new AccessTestAction() {
727       @Override
728       public Object run() throws Exception {
729         HTable t = new HTable(conf, TEST_TABLE.getTableName());
730         try {
731           Map<String, Permission> permsU1andOwner = new HashMap<String, Permission>();
732           permsU1andOwner.put(user1.getShortName(), new Permission(Permission.Action.READ,
733               Permission.Action.WRITE));
734           permsU1andOwner.put(USER_OWNER.getShortName(), new Permission(Permission.Action.READ,
735               Permission.Action.WRITE));
736           Map<String, Permission> permsU2andOwner = new HashMap<String, Permission>();
737           permsU2andOwner.put(user2.getShortName(), new Permission(Permission.Action.READ,
738               Permission.Action.WRITE));
739           permsU2andOwner.put(USER_OWNER.getShortName(), new Permission(Permission.Action.READ,
740               Permission.Action.WRITE));
741           Put p = new Put(TEST_ROW1);
742           p.add(TEST_FAMILY1, TEST_Q1, 123, ZERO);
743           p.setACL(permsU1andOwner);
744           t.put(p);
745           p = new Put(TEST_ROW1);
746           p.add(TEST_FAMILY1, TEST_Q2, 123, ZERO);
747           p.setACL(permsU2andOwner);
748           t.put(p);
749 
750           p = new Put(TEST_ROW1);
751           p.add(TEST_FAMILY1, TEST_Q1, 127, ZERO);
752           p.setACL(permsU2andOwner);
753           t.put(p);
754           p = new Put(TEST_ROW1);
755           p.add(TEST_FAMILY1, TEST_Q2, 127, ZERO);
756           p.setACL(permsU1andOwner);
757           t.put(p);
758         } finally {
759           t.close();
760         }
761         return null;
762       }
763     }, USER_OWNER);
764 
765     // new Put with TEST_Q1 column having TS=125. This covers old cell with TS 123 and user1 is
766     // having RW permission. While TEST_Q2 is with latest TS and so it covers old cell with TS 127.
767     // User1 is having RW permission on that too.
768     user1.runAs(new PrivilegedExceptionAction<Void>() {
769       @Override
770       public Void run() throws Exception {
771         HTable t = new HTable(conf, TEST_TABLE.getTableName());
772         try {
773           Put p = new Put(TEST_ROW1);
774           p.add(TEST_FAMILY1, TEST_Q1, 125, ZERO);
775           p.add(TEST_FAMILY1, TEST_Q2, ZERO);
776           p.setACL(user2.getShortName(), new Permission(Permission.Action.READ,
777               Permission.Action.WRITE));
778           t.put(p);
779         } finally {
780           t.close();
781         }
782         return null;
783       }
784     });
785 
786     // Should be denied.
787     user2.runAs(new PrivilegedExceptionAction<Void>() {
788       @Override
789       public Void run() throws Exception {
790         HTable t = new HTable(conf, TEST_TABLE.getTableName());
791         try {
792           Put p = new Put(TEST_ROW1);
793           // column Q1 covers version at 123 fr which user2 do not have permission
794           p.add(TEST_FAMILY1, TEST_Q1, 124, ZERO);
795           p.add(TEST_FAMILY1, TEST_Q2, ZERO);
796           t.put(p);
797           fail();
798         } catch (Exception e) {
799 
800         } finally {
801           t.close();
802         }
803         return null;
804       }
805     });
806   }
807 
808   @Test
809   public void testCellPermissionsForCheckAndDelete() throws Exception {
810     final byte[] TEST_ROW1 = Bytes.toBytes("r1");
811     final byte[] ZERO = Bytes.toBytes(0L);
812 
813     final User user1 = User.createUserForTesting(conf, "user1", new String[0]);
814     final User user2 = User.createUserForTesting(conf, "user2", new String[0]);
815     
816     verifyAllowed(new AccessTestAction() {
817       @Override
818       public Object run() throws Exception {
819         HTable t = new HTable(conf, TEST_TABLE.getTableName());
820         try {
821           Map<String, Permission> permsU1andOwner = new HashMap<String, Permission>();
822           permsU1andOwner.put(user1.getShortName(), new Permission(Permission.Action.READ,
823               Permission.Action.WRITE));
824           permsU1andOwner.put(USER_OWNER.getShortName(), new Permission(Permission.Action.READ,
825               Permission.Action.WRITE));
826           Map<String, Permission> permsU1andU2andOwner = new HashMap<String, Permission>();
827           permsU1andU2andOwner.put(user1.getShortName(), new Permission(Permission.Action.READ,
828               Permission.Action.WRITE));
829           permsU1andU2andOwner.put(user2.getShortName(), new Permission(Permission.Action.READ,
830               Permission.Action.WRITE));
831           permsU1andU2andOwner.put(USER_OWNER.getShortName(), new Permission(Permission.Action.READ,
832               Permission.Action.WRITE));
833           Map<String, Permission> permsU1andU2 = new HashMap<String, Permission>();
834           permsU1andU2.put(user1.getShortName(), new Permission(Permission.Action.READ,
835               Permission.Action.WRITE));
836           permsU1andU2.put(user2.getShortName(), new Permission(Permission.Action.READ,
837               Permission.Action.WRITE));
838 
839           Put p = new Put(TEST_ROW1);
840           p.add(TEST_FAMILY1, TEST_Q1, 120, ZERO);
841           p.add(TEST_FAMILY1, TEST_Q2, 120, ZERO);
842           p.setACL(permsU1andU2andOwner);
843           t.put(p);
844 
845           p = new Put(TEST_ROW1);
846           p.add(TEST_FAMILY1, TEST_Q1, 123, ZERO);
847           p.add(TEST_FAMILY1, TEST_Q2, 123, ZERO);
848           p.setACL(permsU1andOwner);
849           t.put(p);
850 
851           p = new Put(TEST_ROW1);
852           p.add(TEST_FAMILY1, TEST_Q1, 127, ZERO);
853           p.setACL(permsU1andU2);
854           t.put(p);
855 
856           p = new Put(TEST_ROW1);
857           p.add(TEST_FAMILY1, TEST_Q2, 127, ZERO);
858           p.setACL(user2.getShortName(), new Permission(Permission.Action.READ));
859           t.put(p);
860         } finally {
861           t.close();
862         }
863         return null;
864       }
865     }, USER_OWNER);
866 
867     // user1 should be allowed to do the checkAndDelete. user1 having read permission on the latest
868     // version cell and write permission on all versions
869     user1.runAs(new PrivilegedExceptionAction<Void>() {
870       @Override
871       public Void run() throws Exception {
872         HTable t = new HTable(conf, TEST_TABLE.getTableName());
873         try {
874           Delete d = new Delete(TEST_ROW1);
875           d.deleteColumns(TEST_FAMILY1, TEST_Q1, 120);
876           t.checkAndDelete(TEST_ROW1, TEST_FAMILY1, TEST_Q1, ZERO, d);
877         } finally {
878           t.close();
879         }
880         return null;
881       }
882     });
883     // user2 shouldn't be allowed to do the checkAndDelete. user2 having RW permission on the latest
884     // version cell but not on cell version TS=123
885     user2.runAs(new PrivilegedExceptionAction<Void>() {
886       @Override
887       public Void run() throws Exception {
888         HTable t = new HTable(conf, TEST_TABLE.getTableName());
889         try {
890           Delete d = new Delete(TEST_ROW1);
891           d.deleteColumns(TEST_FAMILY1, TEST_Q1);
892           t.checkAndDelete(TEST_ROW1, TEST_FAMILY1, TEST_Q1, ZERO, d);
893           fail("user2 should not be allowed to do checkAndDelete");
894         } catch (Exception e) {
895         } finally {
896           t.close();
897         }
898         return null;
899       }
900     });
901     // user2 should be allowed to do the checkAndDelete when delete tries to delete the old version
902     // TS=120. user2 having R permission on the latest version(no W permission) cell
903     // and W permission on cell version TS=120.
904     user2.runAs(new PrivilegedExceptionAction<Void>() {
905       @Override
906       public Void run() throws Exception {
907         HTable t = new HTable(conf, TEST_TABLE.getTableName());
908         try {
909           Delete d = new Delete(TEST_ROW1);
910           d.deleteColumn(TEST_FAMILY1, TEST_Q2, 120);
911           t.checkAndDelete(TEST_ROW1, TEST_FAMILY1, TEST_Q2, ZERO, d);
912         } finally {
913           t.close();
914         }
915         return null;
916       }
917     });
918   }
919 
920   @After
921   public void tearDown() throws Exception {
922     // Clean the _acl_ table
923     try {
924       TEST_UTIL.deleteTable(TEST_TABLE.getTableName());
925     } catch (TableNotFoundException ex) {
926       // Test deleted the table, no problem
927       LOG.info("Test deleted table " + TEST_TABLE.getTableName());
928     }
929     assertEquals(0, AccessControlLists.getTablePermissions(conf, TEST_TABLE.getTableName()).size());
930   }
931 }