/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.util.TreeMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;

/**
 * Implementation of {@link WALProvider.Writer} that delegates to
 * SequenceFile.Writer. A legacy implementation, used only for compatibility
 * tests.
 *
 * Note that because this class writes to the legacy, Hadoop-specific
 * SequenceFile format, users of it must write {@link HLogKey} keys and not
 * arbitrary {@link WALKey}s, because the latter are not Writables (nor are
 * they made to work with Hadoop serialization).
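 *
 * <p>A minimal usage sketch (the path is illustrative; {@code fs}, {@code conf}
 * and {@code entry} are assumed to be in scope, with {@code entry.getKey()} an
 * {@link HLogKey}):
 * <pre>{@code
 *   SequenceFileLogWriter writer = new SequenceFileLogWriter();
 *   writer.init(fs, new Path("/hbase/oldWALs/example"), conf, true);
 *   writer.append(entry);
 *   writer.sync();
 *   writer.close();
 * }</pre>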
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class SequenceFileLogWriter extends WriterBase {
  private static final Log LOG = LogFactory.getLog(SequenceFileLogWriter.class);
  // The sequence file we delegate to.
  private SequenceFile.Writer writer;
  // This is the FSDataOutputStream instance that is the 'out' instance
  // in the SequenceFile.Writer 'writer' instance above.
  private FSDataOutputStream writer_out;

  // Legacy metadata keys from the pre-protobuf (pre-PB) WAL format.
  private static final Text WAL_VERSION_KEY = new Text("version");
  private static final Text WAL_COMPRESSION_TYPE_KEY = new Text("compression.type");
  private static final Text DICTIONARY_COMPRESSION_TYPE = new Text("dictionary");

  /**
   * Default constructor.
   */
  public SequenceFileLogWriter() {
    super();
  }

  /**
   * Create sequence file Metadata for our WAL file with version and
   * compression type (if any).
   * @param conf configuration; currently unused by this method
   * @param compress whether dictionary compression is enabled for this WAL
   * @return Metadata instance.
   */
  private static Metadata createMetadata(final Configuration conf,
      final boolean compress) {
    TreeMap<Text, Text> metaMap = new TreeMap<Text, Text>();
    metaMap.put(WAL_VERSION_KEY, new Text("1"));
    if (compress) {
      // Currently we only do one compression type.
      metaMap.put(WAL_COMPRESSION_TYPE_KEY, DICTIONARY_COMPRESSION_TYPE);
    }
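    // The resulting metadata is {version=1} when uncompressed, or
    // {version=1, compression.type=dictionary} when compression is enabled.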
    return new Metadata(metaMap);
  }

  @Override
  public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable)
  throws IOException {
    super.init(fs, path, conf, overwritable);
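    // initializeCompressionContext (inherited from WriterBase) sets up the
    // dictionary-compression context for this WAL when compression is
    // configured, and reports whether it is enabled.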
    boolean compress = initializeCompressionContext(conf, path);
    // Create a SF.Writer instance.
    try {
      // reflection for a version of SequenceFile.createWriter that doesn't
      // automatically create the parent directory (see HBASE-2312)
      this.writer = (SequenceFile.Writer) SequenceFile.class
        .getMethod("createWriter", new Class[] {FileSystem.class,
            Configuration.class, Path.class, Class.class, Class.class,
            Integer.TYPE, Short.TYPE, Long.TYPE, Boolean.TYPE,
            CompressionType.class, CompressionCodec.class, Metadata.class})
        .invoke(null, new Object[] {fs, conf, path, HLogKey.class, WALEdit.class,
            Integer.valueOf(FSUtils.getDefaultBufferSize(fs)),
            Short.valueOf((short)
              conf.getInt("hbase.regionserver.hlog.replication",
              FSUtils.getDefaultReplication(fs, path))),
            Long.valueOf(conf.getLong("hbase.regionserver.hlog.blocksize",
                FSUtils.getDefaultBlockSize(fs, path))),
            Boolean.valueOf(false) /*createParent*/,
            SequenceFile.CompressionType.NONE, new DefaultCodec(),
            createMetadata(conf, compress)
            });
    } catch (InvocationTargetException ite) {
      // The method was invoked, but threw its own exception; rethrow the cause.
      throw new IOException(ite.getCause());
    } catch (Exception e) {
      // Ignore other reflection failures (e.g. NoSuchMethodException on a
      // Hadoop release without HADOOP-6840); the fallback below handles them.
    }

    // If reflection failed, use the old createWriter.
    if (this.writer == null) {
      LOG.debug("new createWriter -- HADOOP-6840 -- not available");
      this.writer = SequenceFile.createWriter(fs, conf, path,
        HLogKey.class, WALEdit.class,
        FSUtils.getDefaultBufferSize(fs),
        (short) conf.getInt("hbase.regionserver.hlog.replication",
          FSUtils.getDefaultReplication(fs, path)),
        conf.getLong("hbase.regionserver.hlog.blocksize",
          FSUtils.getDefaultBlockSize(fs, path)),
        SequenceFile.CompressionType.NONE,
        new DefaultCodec(),
        null,
        createMetadata(conf, compress));
    } else {
      if (LOG.isTraceEnabled()) LOG.trace("Using new createWriter -- HADOOP-6840");
    }

    this.writer_out = getSequenceFilePrivateFSDataOutputStreamAccessible();
    if (LOG.isTraceEnabled()) LOG.trace("Path=" + path + ", compression=" + compress);
  }

  // Get at the private FSDataOutputStream inside SequenceFile.Writer so we
  // can call sync on it.  Make it accessible.
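  // Note this relies on SequenceFile.Writer internals: if the Hadoop version
  // in use names its stream field something other than 'out', the lookup
  // below finds nothing and we return null.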
  private FSDataOutputStream getSequenceFilePrivateFSDataOutputStreamAccessible()
  throws IOException {
    FSDataOutputStream out = null;
    final Field[] fields = this.writer.getClass().getDeclaredFields();
    final String fieldName = "out";
    for (int i = 0; i < fields.length; ++i) {
      if (fieldName.equals(fields[i].getName())) {
        try {
          // Make the private 'out' field in SequenceFile.Writer accessible.
          fields[i].setAccessible(true);
          out = (FSDataOutputStream) fields[i].get(this.writer);
          break;
        } catch (IllegalAccessException ex) {
          throw new IOException("Accessing " + fieldName, ex);
        } catch (SecurityException e) {
          LOG.warn("Cannot access the 'out' field of SequenceFile.Writer", e);
        }
      }
    }
    return out;
  }

  @Override
  public void append(WAL.Entry entry) throws IOException {
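    // Hand the shared dictionary-compression context to the entry so its key
    // and edit can compress themselves during serialization; the context is
    // null when WAL compression is disabled.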
    entry.setCompressionContext(compressionContext);
    try {
      this.writer.append(entry.getKey(), entry.getEdit());
    } catch (NullPointerException npe) {
      // Concurrent close...
      throw new IOException(npe);
    }
  }

  @Override
  public void close() throws IOException {
    if (this.writer != null) {
      try {
        this.writer.close();
      } catch (NullPointerException npe) {
        // An NPE can surface from down in DFSClient$DFSOutputStream#close.
        LOG.warn(npe);
      }
      this.writer = null;
    }
  }

  @Override
  public void sync() throws IOException {
    try {
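      // syncFs flushes the underlying output stream so that appended edits
      // reach the filesystem (hflush-style durability on HDFS).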
      this.writer.syncFs();
    } catch (NullPointerException npe) {
      // Concurrent close...
      throw new IOException(npe);
    }
  }

  @Override
  public long getLength() throws IOException {
    try {
      return this.writer.getLength();
    } catch (NullPointerException npe) {
      // Concurrent close...
      throw new IOException(npe);
    }
  }

  /**
   * @return The underlying FSDataOutputStream from inside SequenceFile.Writer,
   * made accessible via reflection, or null if it could not be obtained.
   */
  public FSDataOutputStream getWriterFSDataOutputStream() {
    return this.writer_out;
  }
}