/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.util.TreeMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;

/**
 * Implementation of {@link HLog.Writer} that delegates to
 * SequenceFile.Writer. A legacy implementation, used only for
 * compatibility tests.
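 *
 * <p>A minimal usage sketch (illustrative only; {@code fs}, {@code path},
 * {@code conf} and {@code entry} are assumed to already exist):
 * <pre>
 *   SequenceFileLogWriter writer = new SequenceFileLogWriter();
 *   writer.init(fs, path, conf, true);  // overwritable = true
 *   writer.append(entry);               // entry is an HLog.Entry
 *   writer.sync();
 *   writer.close();
 * </pre>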
 */
@InterfaceAudience.Private
public class SequenceFileLogWriter extends WriterBase {
  private final Log LOG = LogFactory.getLog(this.getClass());
  // The sequence file we delegate to.
  private SequenceFile.Writer writer;
  // This is the FSDataOutputStream instance that is the 'out' instance
  // in the SequenceFile.Writer 'writer' instance above.
  private FSDataOutputStream writer_out;

  // Legacy stuff from pre-PB WAL metadata.
  private static final Text WAL_VERSION_KEY = new Text("version");
  private static final Text WAL_COMPRESSION_TYPE_KEY = new Text("compression.type");
  private static final Text DICTIONARY_COMPRESSION_TYPE = new Text("dictionary");

  /**
   * Default constructor.
   */
  public SequenceFileLogWriter() {
    super();
  }

  /**
   * Create sequence file Metadata for our WAL file with version and compression
   * type (if any).
   * @param conf configuration (currently unused here)
   * @param compress whether dictionary compression is enabled
   * @return Metadata instance.
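   * For example, the returned metadata maps {@code version=1}, plus
   * {@code compression.type=dictionary} when compression is enabled.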
   */
  private static Metadata createMetadata(final Configuration conf,
      final boolean compress) {
    TreeMap<Text, Text> metaMap = new TreeMap<Text, Text>();
    metaMap.put(WAL_VERSION_KEY, new Text("1"));
    if (compress) {
      // Currently we only do one compression type.
      metaMap.put(WAL_COMPRESSION_TYPE_KEY, DICTIONARY_COMPRESSION_TYPE);
    }
    return new Metadata(metaMap);
  }

  @Override
  public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable)
  throws IOException {
    super.init(fs, path, conf, overwritable);
    boolean compress = initializeCompressionContext(conf, path);
    // Create a SF.Writer instance.
    try {
      // reflection for a version of SequenceFile.createWriter that doesn't
      // automatically create the parent directory (see HBASE-2312)
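      //
      // A sketch of the equivalent direct call (the names bufferSize,
      // replication, blockSize and metadata are shorthand for the expressions
      // passed below, not variables in this method):
      //   SequenceFile.createWriter(fs, conf, path, HLogKey.class, WALEdit.class,
      //       bufferSize, replication, blockSize, false /*createParent*/,
      //       CompressionType.NONE, new DefaultCodec(), metadata);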
      this.writer = (SequenceFile.Writer) SequenceFile.class
        .getMethod("createWriter", new Class[] {FileSystem.class,
            Configuration.class, Path.class, Class.class, Class.class,
            Integer.TYPE, Short.TYPE, Long.TYPE, Boolean.TYPE,
            CompressionType.class, CompressionCodec.class, Metadata.class})
        .invoke(null, new Object[] {fs, conf, path, HLogKey.class, WALEdit.class,
            Integer.valueOf(FSUtils.getDefaultBufferSize(fs)),
            Short.valueOf((short)
              conf.getInt("hbase.regionserver.hlog.replication",
              FSUtils.getDefaultReplication(fs, path))),
            Long.valueOf(conf.getLong("hbase.regionserver.hlog.blocksize",
                FSUtils.getDefaultBlockSize(fs, path))),
            Boolean.valueOf(false) /*createParent*/,
            SequenceFile.CompressionType.NONE, new DefaultCodec(),
            createMetadata(conf, compress)
            });
    } catch (InvocationTargetException ite) {
      // function was properly called, but threw its own exception
      throw new IOException(ite.getCause());
    } catch (Exception e) {
      // ignore all other exceptions; they indicate the reflective lookup
      // failed, so fall through to the older createWriter below
    }

    // if reflection failed, use the old createWriter
    if (this.writer == null) {
      LOG.debug("new createWriter -- HADOOP-6840 -- not available");
      this.writer = SequenceFile.createWriter(fs, conf, path,
        HLogKey.class, WALEdit.class,
        FSUtils.getDefaultBufferSize(fs),
        (short) conf.getInt("hbase.regionserver.hlog.replication",
          FSUtils.getDefaultReplication(fs, path)),
        conf.getLong("hbase.regionserver.hlog.blocksize",
          FSUtils.getDefaultBlockSize(fs, path)),
        SequenceFile.CompressionType.NONE,
        new DefaultCodec(),
        null,
        createMetadata(conf, compress));
    } else {
      if (LOG.isTraceEnabled()) LOG.trace("Using new createWriter -- HADOOP-6840");
    }

    this.writer_out = getSequenceFilePrivateFSDataOutputStreamAccessible();
    if (LOG.isTraceEnabled()) LOG.trace("Path=" + path + ", compression=" + compress);
  }

  // Get at the private FSDataOutputStream inside SequenceFile.Writer so we
  // can call sync on it.  Make it accessible.
  private FSDataOutputStream getSequenceFilePrivateFSDataOutputStreamAccessible()
  throws IOException {
    FSDataOutputStream out = null;
    final Field fields [] = this.writer.getClass().getDeclaredFields();
    final String fieldName = "out";
    for (int i = 0; i < fields.length; ++i) {
      if (fieldName.equals(fields[i].getName())) {
        try {
          // Make the 'out' field up in SF.Writer accessible.
          fields[i].setAccessible(true);
          out = (FSDataOutputStream)fields[i].get(this.writer);
          break;
        } catch (IllegalAccessException ex) {
          throw new IOException("Accessing " + fieldName, ex);
        } catch (SecurityException e) {
          LOG.warn("No access to the 'out' field in SequenceFile.Writer",
              e);
        }
      }
    }
    return out;
  }

  @Override
  public void append(HLog.Entry entry) throws IOException {
    entry.setCompressionContext(compressionContext);
    try {
      this.writer.append(entry.getKey(), entry.getEdit());
    } catch (NullPointerException npe) {
      // Concurrent close...
      throw new IOException(npe);
    }
  }

  @Override
  public void close() throws IOException {
    if (this.writer != null) {
      try {
        this.writer.close();
      } catch (NullPointerException npe) {
        // Can get a NPE coming up from down in DFSClient$DFSOutputStream#close
        LOG.warn(npe);
      }
      this.writer = null;
    }
  }

  @Override
  public void sync() throws IOException {
    try {
      this.writer.syncFs();
    } catch (NullPointerException npe) {
      // Concurrent close...
      throw new IOException(npe);
    }
  }

  @Override
  public long getLength() throws IOException {
    try {
      return this.writer.getLength();
    } catch (NullPointerException npe) {
      // Concurrent close...
      throw new IOException(npe);
    }
  }

  /**
   * @return The dfsclient out stream inside SF.Writer, made accessible via
   * reflection, or null if not available.
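   * For example (illustrative only; note {@code getPos} may throw
   * {@code IOException}):
   * <pre>
   *   FSDataOutputStream out = writer.getWriterFSDataOutputStream();
   *   long pos = (out != null) ? out.getPos() : -1;  // current write position
   * </pre>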
   */
  public FSDataOutputStream getWriterFSDataOutputStream() {
    return this.writer_out;
  }

  /**
   * This method is a no-op; a trailer is written only by the Protobuf-based
   * hlog readers/writers.
   */
  @Override
  public void setWALTrailer(WALTrailer walTrailer) {
  }
}