View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.security.access;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.DataInput;
23  import java.io.DataInputStream;
24  import java.io.IOException;
25  import java.util.ArrayList;
26  import java.util.Arrays;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Set;
31  import java.util.TreeMap;
32  import java.util.TreeSet;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.hbase.Cell;
38  import org.apache.hadoop.hbase.CellUtil;
39  import org.apache.hadoop.hbase.HColumnDescriptor;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.HTableDescriptor;
42  import org.apache.hadoop.hbase.NamespaceDescriptor;
43  import org.apache.hadoop.hbase.TableName;
44  import org.apache.hadoop.hbase.Tag;
45  import org.apache.hadoop.hbase.TagType;
46  import org.apache.hadoop.hbase.client.Delete;
47  import org.apache.hadoop.hbase.client.Get;
48  import org.apache.hadoop.hbase.client.HTable;
49  import org.apache.hadoop.hbase.client.Put;
50  import org.apache.hadoop.hbase.client.Result;
51  import org.apache.hadoop.hbase.client.ResultScanner;
52  import org.apache.hadoop.hbase.client.Scan;
53  import org.apache.hadoop.hbase.exceptions.DeserializationException;
54  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
55  import org.apache.hadoop.hbase.filter.QualifierFilter;
56  import org.apache.hadoop.hbase.filter.RegexStringComparator;
57  import org.apache.hadoop.hbase.io.compress.Compression;
58  import org.apache.hadoop.hbase.master.MasterServices;
59  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
60  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
61  import org.apache.hadoop.hbase.regionserver.BloomType;
62  import org.apache.hadoop.hbase.regionserver.HRegion;
63  import org.apache.hadoop.hbase.regionserver.InternalScanner;
64  import org.apache.hadoop.hbase.security.User;
65  import org.apache.hadoop.hbase.util.Bytes;
66  import org.apache.hadoop.hbase.util.Pair;
67  import org.apache.hadoop.io.Text;
68  
69  import com.google.common.collect.ArrayListMultimap;
70  import com.google.common.collect.ListMultimap;
71  import com.google.common.collect.Lists;
72  import com.google.protobuf.InvalidProtocolBufferException;
73  
74  /**
75   * Maintains lists of permission grants to users and groups to allow for
76   * authorization checks by {@link AccessController}.
77   *
78   * <p>
79   * Access control lists are stored in an "internal" metadata table named
80   * {@code _acl_}. Each table's permission grants are stored as a separate row,
81   * keyed by the table name. KeyValues for permissions assignments are stored
82   * in one of the formats:
83   * <pre>
84   * Key                      Desc
85   * --------                 --------
86   * user                     table level permissions for a user [R=read, W=write]
87   * group                    table level permissions for a group
88   * user,family              column family level permissions for a user
89   * group,family             column family level permissions for a group
90   * user,family,qualifier    column qualifier level permissions for a user
91   * group,family,qualifier   column qualifier level permissions for a group
92   * </pre>
93   * All values are encoded as byte arrays containing the codes from the
94   * org.apache.hadoop.hbase.security.access.TablePermission.Action enum.
95   * </p>
96   */
97  public class AccessControlLists {
98    /** Internal storage table for access control lists */
99    public static final TableName ACL_TABLE_NAME =
100       TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "acl");
101   public static final byte[] ACL_GLOBAL_NAME = ACL_TABLE_NAME.getName();
102   /** Column family used to store ACL grants */
103   public static final String ACL_LIST_FAMILY_STR = "l";
104   public static final byte[] ACL_LIST_FAMILY = Bytes.toBytes(ACL_LIST_FAMILY_STR);
105   /** KV tag to store per cell access control lists */
106   public static final byte ACL_TAG_TYPE = TagType.ACL_TAG_TYPE;
107 
108   public static final char NAMESPACE_PREFIX = '@';
109 
110   /** Table descriptor for ACL internal table */
111   public static final HTableDescriptor ACL_TABLEDESC = new HTableDescriptor(ACL_TABLE_NAME);
112   static {
113     ACL_TABLEDESC.addFamily(
114         new HColumnDescriptor(ACL_LIST_FAMILY,
115             10, // Ten is arbitrary number.  Keep versions to help debugging.
116             Compression.Algorithm.NONE.getName(), true, true, 8 * 1024,
117             HConstants.FOREVER, BloomType.NONE.toString(),
118             HConstants.REPLICATION_SCOPE_LOCAL));
119   }
120 
121   /**
122    * Delimiter to separate user, column family, and qualifier in
123    * _acl_ table info: column keys */
124   public static final char ACL_KEY_DELIMITER = ',';
125   /** Prefix character to denote group names */
126   public static final String GROUP_PREFIX = "@";
127   /** Configuration key for superusers */
128   public static final String SUPERUSER_CONF_KEY = "hbase.superuser";
129 
130   private static Log LOG = LogFactory.getLog(AccessControlLists.class);
131 
132   /**
133    * Check for existence of {@code _acl_} table and create it if it does not exist
134    * @param master reference to HMaster
135    */
136   static void init(MasterServices master) throws IOException {
137     master.createTable(ACL_TABLEDESC, null);
138   }
139 
140   /**
141    * Stores a new user permission grant in the access control lists table.
142    * @param conf the configuration
143    * @param userPerm the details of the permission to be granted
144    * @throws IOException in the case of an error accessing the metadata table
145    */
146   static void addUserPermission(Configuration conf, UserPermission userPerm)
147       throws IOException {
148     Permission.Action[] actions = userPerm.getActions();
149     byte[] rowKey = userPermissionRowKey(userPerm);
150     Put p = new Put(rowKey);
151     byte[] key = userPermissionKey(userPerm);
152 
153     if ((actions == null) || (actions.length == 0)) {
154       String msg = "No actions associated with user '" + Bytes.toString(userPerm.getUser()) + "'";
155       LOG.warn(msg);
156       throw new IOException(msg);
157     }
158 
159     byte[] value = new byte[actions.length];
160     for (int i = 0; i < actions.length; i++) {
161       value[i] = actions[i].code();
162     }
163     p.addImmutable(ACL_LIST_FAMILY, key, value);
164     if (LOG.isDebugEnabled()) {
165       LOG.debug("Writing permission with rowKey "+
166           Bytes.toString(rowKey)+" "+
167           Bytes.toString(key)+": "+Bytes.toStringBinary(value)
168       );
169     }
170     HTable acls = null;
171     try {
172       acls = new HTable(conf, ACL_TABLE_NAME);
173       acls.put(p);
174     } finally {
175       if (acls != null) acls.close();
176     }
177   }
178 
179   /**
180    * Removes a previously granted permission from the stored access control
181    * lists.  The {@link TablePermission} being removed must exactly match what
182    * is stored -- no wildcard matching is attempted.  Ie, if user "bob" has
183    * been granted "READ" access to the "data" table, but only to column family
184    * plus qualifier "info:colA", then trying to call this method with only
185    * user "bob" and the table name "data" (but without specifying the
186    * column qualifier "info:colA") will have no effect.
187    *
188    * @param conf the configuration
189    * @param userPerm the details of the permission to be revoked
190    * @throws IOException if there is an error accessing the metadata table
191    */
192   static void removeUserPermission(Configuration conf, UserPermission userPerm)
193       throws IOException {
194     Delete d = new Delete(userPermissionRowKey(userPerm));
195     byte[] key = userPermissionKey(userPerm);
196 
197     if (LOG.isDebugEnabled()) {
198       LOG.debug("Removing permission "+ userPerm.toString());
199     }
200     d.deleteColumns(ACL_LIST_FAMILY, key);
201     HTable acls = null;
202     try {
203       acls = new HTable(conf, ACL_TABLE_NAME);
204       acls.delete(d);
205     } finally {
206       if (acls != null) acls.close();
207     }
208   }
209 
210   /**
211    * Remove specified table from the _acl_ table.
212    */
213   static void removeTablePermissions(Configuration conf, TableName tableName)
214       throws IOException{
215     Delete d = new Delete(tableName.getName());
216 
217     if (LOG.isDebugEnabled()) {
218       LOG.debug("Removing permissions of removed table "+ tableName);
219     }
220 
221     HTable acls = null;
222     try {
223       acls = new HTable(conf, ACL_TABLE_NAME);
224       acls.delete(d);
225     } finally {
226       if (acls != null) acls.close();
227     }
228   }
229 
230   /**
231    * Remove specified namespace from the acl table.
232    */
233   static void removeNamespacePermissions(Configuration conf, String namespace)
234       throws IOException{
235     Delete d = new Delete(Bytes.toBytes(toNamespaceEntry(namespace)));
236 
237     if (LOG.isDebugEnabled()) {
238       LOG.debug("Removing permissions of removed namespace "+ namespace);
239     }
240 
241     HTable acls = null;
242     try {
243       acls = new HTable(conf, ACL_TABLE_NAME);
244       acls.delete(d);
245     } finally {
246       if (acls != null) acls.close();
247     }
248   }
249 
250   /**
251    * Remove specified table column from the acl table.
252    */
253   static void removeTablePermissions(Configuration conf, TableName tableName, byte[] column)
254       throws IOException{
255 
256     if (LOG.isDebugEnabled()) {
257       LOG.debug("Removing permissions of removed column " + Bytes.toString(column) +
258                 " from table "+ tableName);
259     }
260 
261     HTable acls = null;
262     try {
263       acls = new HTable(conf, ACL_TABLE_NAME);
264 
265       Scan scan = new Scan();
266       scan.addFamily(ACL_LIST_FAMILY);
267 
268       String columnName = Bytes.toString(column);
269       scan.setFilter(new QualifierFilter(CompareOp.EQUAL, new RegexStringComparator(
270                      String.format("(%s%s%s)|(%s%s)$",
271                      ACL_KEY_DELIMITER, columnName, ACL_KEY_DELIMITER,
272                      ACL_KEY_DELIMITER, columnName))));
273 
274       Set<byte[]> qualifierSet = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
275       ResultScanner scanner = acls.getScanner(scan);
276       try {
277         for (Result res : scanner) {
278           for (byte[] q : res.getFamilyMap(ACL_LIST_FAMILY).navigableKeySet()) {
279             qualifierSet.add(q);
280           }
281         }
282       } finally {
283         scanner.close();
284       }
285 
286       if (qualifierSet.size() > 0) {
287         Delete d = new Delete(tableName.getName());
288         for (byte[] qualifier : qualifierSet) {
289           d.deleteColumns(ACL_LIST_FAMILY, qualifier);
290         }
291         acls.delete(d);
292       }
293     } finally {
294       if (acls != null) acls.close();
295     }
296   }
297 
298   static byte[] userPermissionRowKey(UserPermission userPerm) {
299     byte[] row;
300     if(userPerm.hasNamespace()) {
301       row = Bytes.toBytes(toNamespaceEntry(userPerm.getNamespace()));
302     } else if(userPerm.isGlobal()) {
303       row = ACL_GLOBAL_NAME;
304     } else {
305       row = userPerm.getTableName().getName();
306     }
307     return row;
308   }
309 
310   /**
311    * Build qualifier key from user permission:
312    *  username
313    *  username,family
314    *  username,family,qualifier
315    */
316   static byte[] userPermissionKey(UserPermission userPerm) {
317     byte[] qualifier = userPerm.getQualifier();
318     byte[] family = userPerm.getFamily();
319     byte[] key = userPerm.getUser();
320 
321     if (family != null && family.length > 0) {
322       key = Bytes.add(key, Bytes.add(new byte[]{ACL_KEY_DELIMITER}, family));
323       if (qualifier != null && qualifier.length > 0) {
324         key = Bytes.add(key, Bytes.add(new byte[]{ACL_KEY_DELIMITER}, qualifier));
325       }
326     }
327 
328     return key;
329   }
330 
331   /**
332    * Returns {@code true} if the given region is part of the {@code _acl_}
333    * metadata table.
334    */
335   static boolean isAclRegion(HRegion region) {
336     return ACL_TABLE_NAME.equals(region.getTableDesc().getTableName());
337   }
338 
339   /**
340    * Returns {@code true} if the given table is {@code _acl_} metadata table.
341    */
342   static boolean isAclTable(HTableDescriptor desc) {
343     return ACL_TABLE_NAME.equals(desc.getTableName());
344   }
345 
346   /**
347    * Loads all of the permission grants stored in a region of the {@code _acl_}
348    * table.
349    *
350    * @param aclRegion
351    * @return a map of the permissions for this table.
352    * @throws IOException
353    */
354   static Map<byte[], ListMultimap<String,TablePermission>> loadAll(
355       HRegion aclRegion)
356     throws IOException {
357 
358     if (!isAclRegion(aclRegion)) {
359       throw new IOException("Can only load permissions from "+ACL_TABLE_NAME);
360     }
361 
362     Map<byte[], ListMultimap<String, TablePermission>> allPerms =
363         new TreeMap<byte[], ListMultimap<String, TablePermission>>(Bytes.BYTES_RAWCOMPARATOR);
364 
365     // do a full scan of _acl_ table
366 
367     Scan scan = new Scan();
368     scan.addFamily(ACL_LIST_FAMILY);
369 
370     InternalScanner iScanner = null;
371     try {
372       iScanner = aclRegion.getScanner(scan);
373 
374       while (true) {
375         List<Cell> row = new ArrayList<Cell>();
376 
377         boolean hasNext = iScanner.next(row);
378         ListMultimap<String,TablePermission> perms = ArrayListMultimap.create();
379         byte[] entry = null;
380         for (Cell kv : row) {
381           if (entry == null) {
382             entry = CellUtil.cloneRow(kv);
383           }
384           Pair<String,TablePermission> permissionsOfUserOnTable =
385               parsePermissionRecord(entry, kv);
386           if (permissionsOfUserOnTable != null) {
387             String username = permissionsOfUserOnTable.getFirst();
388             TablePermission permissions = permissionsOfUserOnTable.getSecond();
389             perms.put(username, permissions);
390           }
391         }
392         if (entry != null) {
393           allPerms.put(entry, perms);
394         }
395         if (!hasNext) {
396           break;
397         }
398       }
399     } finally {
400       if (iScanner != null) {
401         iScanner.close();
402       }
403     }
404 
405     return allPerms;
406   }
407 
408   /**
409    * Load all permissions from the region server holding {@code _acl_},
410    * primarily intended for testing purposes.
411    */
412   static Map<byte[], ListMultimap<String,TablePermission>> loadAll(
413       Configuration conf) throws IOException {
414     Map<byte[], ListMultimap<String,TablePermission>> allPerms =
415         new TreeMap<byte[], ListMultimap<String,TablePermission>>(Bytes.BYTES_RAWCOMPARATOR);
416 
417     // do a full scan of _acl_, filtering on only first table region rows
418 
419     Scan scan = new Scan();
420     scan.addFamily(ACL_LIST_FAMILY);
421 
422     HTable acls = null;
423     ResultScanner scanner = null;
424     try {
425       acls = new HTable(conf, ACL_TABLE_NAME);
426       scanner = acls.getScanner(scan);
427       for (Result row : scanner) {
428         ListMultimap<String,TablePermission> resultPerms =
429             parsePermissions(row.getRow(), row);
430         allPerms.put(row.getRow(), resultPerms);
431       }
432     } finally {
433       if (scanner != null) scanner.close();
434       if (acls != null) acls.close();
435     }
436 
437     return allPerms;
438   }
439 
440   static ListMultimap<String, TablePermission> getTablePermissions(Configuration conf,
441         TableName tableName) throws IOException {
442     return getPermissions(conf, tableName != null ? tableName.getName() : null);
443   }
444 
445   static ListMultimap<String, TablePermission> getNamespacePermissions(Configuration conf,
446         String namespace) throws IOException {
447     return getPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)));
448   }
449 
450   /**
451    * Reads user permission assignments stored in the <code>l:</code> column
452    * family of the first table row in <code>_acl_</code>.
453    *
454    * <p>
455    * See {@link AccessControlLists class documentation} for the key structure
456    * used for storage.
457    * </p>
458    */
459   static ListMultimap<String, TablePermission> getPermissions(Configuration conf,
460       byte[] entryName) throws IOException {
461     if (entryName == null) entryName = ACL_TABLE_NAME.getName();
462 
463     // for normal user tables, we just read the table row from _acl_
464     ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
465     HTable acls = null;
466     try {
467       acls = new HTable(conf, ACL_TABLE_NAME);
468       Get get = new Get(entryName);
469       get.addFamily(ACL_LIST_FAMILY);
470       Result row = acls.get(get);
471       if (!row.isEmpty()) {
472         perms = parsePermissions(entryName, row);
473       } else {
474         LOG.info("No permissions found in " + ACL_TABLE_NAME + " for acl entry "
475             + Bytes.toString(entryName));
476       }
477     } finally {
478       if (acls != null) acls.close();
479     }
480 
481     return perms;
482   }
483 
484   /**
485    * Returns the currently granted permissions for a given table as a list of
486    * user plus associated permissions.
487    */
488   static List<UserPermission> getUserTablePermissions(
489       Configuration conf, TableName tableName) throws IOException {
490     return getUserPermissions(conf, tableName == null ? null : tableName.getName());
491   }
492 
493   static List<UserPermission> getUserNamespacePermissions(
494       Configuration conf, String namespace) throws IOException {
495     return getUserPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)));
496   }
497 
498   static List<UserPermission> getUserPermissions(
499       Configuration conf, byte[] entryName)
500   throws IOException {
501     ListMultimap<String,TablePermission> allPerms = getPermissions(
502       conf, entryName);
503 
504     List<UserPermission> perms = new ArrayList<UserPermission>();
505 
506     for (Map.Entry<String, TablePermission> entry : allPerms.entries()) {
507       UserPermission up = new UserPermission(Bytes.toBytes(entry.getKey()),
508           entry.getValue().getTableName(), entry.getValue().getFamily(),
509           entry.getValue().getQualifier(), entry.getValue().getActions());
510       perms.add(up);
511     }
512     return perms;
513   }
514 
515   private static ListMultimap<String, TablePermission> parsePermissions(
516       byte[] entryName, Result result) {
517     ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
518     if (result != null && result.size() > 0) {
519       for (Cell kv : result.rawCells()) {
520 
521         Pair<String,TablePermission> permissionsOfUserOnTable =
522             parsePermissionRecord(entryName, kv);
523 
524         if (permissionsOfUserOnTable != null) {
525           String username = permissionsOfUserOnTable.getFirst();
526           TablePermission permissions = permissionsOfUserOnTable.getSecond();
527           perms.put(username, permissions);
528         }
529       }
530     }
531     return perms;
532   }
533 
534   private static Pair<String, TablePermission> parsePermissionRecord(
535       byte[] entryName, Cell kv) {
536     // return X given a set of permissions encoded in the permissionRecord kv.
537     byte[] family = CellUtil.cloneFamily(kv);
538 
539     if (!Bytes.equals(family, ACL_LIST_FAMILY)) {
540       return null;
541     }
542 
543     byte[] key = CellUtil.cloneQualifier(kv);
544     byte[] value = CellUtil.cloneValue(kv);
545     if (LOG.isDebugEnabled()) {
546       LOG.debug("Read acl: kv ["+
547                 Bytes.toStringBinary(key)+": "+
548                 Bytes.toStringBinary(value)+"]");
549     }
550 
551     // check for a column family appended to the key
552     // TODO: avoid the string conversion to make this more efficient
553     String username = Bytes.toString(key);
554 
555     //Handle namespace entry
556     if(isNamespaceEntry(entryName)) {
557       return new Pair<String, TablePermission>(username,
558           new TablePermission(Bytes.toString(fromNamespaceEntry(entryName)), value));
559     }
560 
561     //Handle table and global entry
562     //TODO global entry should be handled differently
563     int idx = username.indexOf(ACL_KEY_DELIMITER);
564     byte[] permFamily = null;
565     byte[] permQualifier = null;
566     if (idx > 0 && idx < username.length()-1) {
567       String remainder = username.substring(idx+1);
568       username = username.substring(0, idx);
569       idx = remainder.indexOf(ACL_KEY_DELIMITER);
570       if (idx > 0 && idx < remainder.length()-1) {
571         permFamily = Bytes.toBytes(remainder.substring(0, idx));
572         permQualifier = Bytes.toBytes(remainder.substring(idx+1));
573       } else {
574         permFamily = Bytes.toBytes(remainder);
575       }
576     }
577 
578     return new Pair<String,TablePermission>(username,
579         new TablePermission(TableName.valueOf(entryName), permFamily, permQualifier, value));
580   }
581 
582   /**
583    * Writes a set of permissions as {@link org.apache.hadoop.io.Writable} instances
584    * and returns the resulting byte array.
585    *
586    * Writes a set of permission [user: table permission]
587    */
588   public static byte[] writePermissionsAsBytes(ListMultimap<String, TablePermission> perms,
589       Configuration conf) {
590     return ProtobufUtil.prependPBMagic(ProtobufUtil.toUserTablePermissions(perms).toByteArray());
591   }
592 
593   /**
594    * Reads a set of permissions as {@link org.apache.hadoop.io.Writable} instances
595    * from the input stream.
596    */
597   public static ListMultimap<String, TablePermission> readPermissions(byte[] data,
598       Configuration conf)
599   throws DeserializationException {
600     if (ProtobufUtil.isPBMagicPrefix(data)) {
601       int pblen = ProtobufUtil.lengthOfPBMagic();
602       try {
603         AccessControlProtos.UsersAndPermissions perms =
604           AccessControlProtos.UsersAndPermissions.newBuilder().mergeFrom(
605             data, pblen, data.length - pblen).build();
606         return ProtobufUtil.toUserTablePermissions(perms);
607       } catch (InvalidProtocolBufferException e) {
608         throw new DeserializationException(e);
609       }
610     } else {
611       ListMultimap<String,TablePermission> perms = ArrayListMultimap.create();
612       try {
613         DataInput in = new DataInputStream(new ByteArrayInputStream(data));
614         int length = in.readInt();
615         for (int i=0; i<length; i++) {
616           String user = Text.readString(in);
617           List<TablePermission> userPerms =
618             (List)HbaseObjectWritableFor96Migration.readObject(in, conf);
619           perms.putAll(user, userPerms);
620         }
621       } catch (IOException e) {
622         throw new DeserializationException(e);
623       }
624       return perms;
625     }
626   }
627 
628   /**
629    * Returns whether or not the given name should be interpreted as a group
630    * principal.  Currently this simply checks if the name starts with the
631    * special group prefix character ("@").
632    */
633   public static boolean isGroupPrincipal(String name) {
634     return name != null && name.startsWith(GROUP_PREFIX);
635   }
636 
637   /**
638    * Returns the actual name for a group principal (stripped of the
639    * group prefix).
640    */
641   public static String getGroupName(String aclKey) {
642     if (!isGroupPrincipal(aclKey)) {
643       return aclKey;
644     }
645 
646     return aclKey.substring(GROUP_PREFIX.length());
647   }
648 
649   public static boolean isNamespaceEntry(String entryName) {
650     return entryName.charAt(0) == NAMESPACE_PREFIX;
651   }
652 
653   public static boolean isNamespaceEntry(byte[] entryName) {
654     return entryName[0] == NAMESPACE_PREFIX;
655   }
656   
657   public static String toNamespaceEntry(String namespace) {
658      return NAMESPACE_PREFIX + namespace;
659    }
660 
661    public static String fromNamespaceEntry(String namespace) {
662      if(namespace.charAt(0) != NAMESPACE_PREFIX)
663        throw new IllegalArgumentException("Argument is not a valid namespace entry");
664      return namespace.substring(1);
665    }
666 
667    public static byte[] toNamespaceEntry(byte[] namespace) {
668      byte[] ret = new byte[namespace.length+1];
669      ret[0] = NAMESPACE_PREFIX;
670      System.arraycopy(namespace, 0, ret, 1, namespace.length);
671      return ret;
672    }
673 
674    public static byte[] fromNamespaceEntry(byte[] namespace) {
675      if(namespace[0] != NAMESPACE_PREFIX) {
676        throw new IllegalArgumentException("Argument is not a valid namespace entry: " +
677            Bytes.toString(namespace));
678      }
679      return Arrays.copyOfRange(namespace, 1, namespace.length);
680    }
681 
682    public static List<Permission> getCellPermissionsForUser(User user, Cell cell)
683        throws IOException {
684      List<Permission> results = Lists.newArrayList();
685      Iterator<Tag> tagsIterator = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
686         cell.getTagsLengthUnsigned());
687      while (tagsIterator.hasNext()) {
688        Tag tag = tagsIterator.next();
689        if (tag.getType() == ACL_TAG_TYPE) {
690          // Deserialize the table permissions from the KV
691          ListMultimap<String,Permission> kvPerms = ProtobufUtil.toUsersAndPermissions(
692            AccessControlProtos.UsersAndPermissions.newBuilder().mergeFrom(
693              tag.getBuffer(), tag.getTagOffset(), tag.getTagLength()).build());
694          // Are there permissions for this user?
695          List<Permission> userPerms = kvPerms.get(user.getShortName());
696          if (userPerms != null) {
697            results.addAll(userPerms);
698          }
699          // Are there permissions for any of the groups this user belongs to?
700          String groupNames[] = user.getGroupNames();
701          if (groupNames != null) {
702            for (String group : groupNames) {
703              List<Permission> groupPerms = kvPerms.get(GROUP_PREFIX + group);
704              if (results != null) {
705                results.addAll(groupPerms);
706              }
707            }
708          }
709        }
710      }
711      return results;
712    }
713 }