View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import static org.junit.Assert.assertTrue;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Arrays;
24  import java.util.Collections;
25  import java.util.List;
26  import java.util.Random;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.hbase.Cell;
32  import org.apache.hadoop.hbase.CellUtil;
33  import org.apache.hadoop.hbase.HBaseTestingUtility;
34  import org.apache.hadoop.hbase.HColumnDescriptor;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.HTableDescriptor;
37  import org.apache.hadoop.hbase.LargeTests;
38  import org.apache.hadoop.hbase.TableName;
39  import org.apache.hadoop.hbase.client.Durability;
40  import org.apache.hadoop.hbase.client.Get;
41  import org.apache.hadoop.hbase.client.HBaseAdmin;
42  import org.apache.hadoop.hbase.client.HTable;
43  import org.apache.hadoop.hbase.client.Put;
44  import org.apache.hadoop.hbase.client.Result;
45  import org.apache.hadoop.hbase.regionserver.HRegionServer;
46  import org.apache.hadoop.hbase.util.Bytes;
47  import org.apache.hadoop.hbase.util.Threads;
48  import org.apache.hadoop.hbase.zookeeper.ZKAssign;
49  import org.junit.After;
50  import org.junit.AfterClass;
51  import org.junit.Before;
52  import org.junit.BeforeClass;
53  import org.junit.Test;
54  import org.junit.experimental.categories.Category;
55  
56  /**
57   * Tests changing data block encoding settings of a column family.
58   */
59  @Category(LargeTests.class)
60  public class TestChangingEncoding {
61    private static final Log LOG = LogFactory.getLog(TestChangingEncoding.class);
62    static final String CF = "EncodingTestCF";
63    static final byte[] CF_BYTES = Bytes.toBytes(CF);
64  
65    private static final int NUM_ROWS_PER_BATCH = 100;
66    private static final int NUM_COLS_PER_ROW = 20;
67  
68    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
69    private static final Configuration conf = TEST_UTIL.getConfiguration();
70  
71    private static final int TIMEOUT_MS = 600000;
72  
73    private HBaseAdmin admin;
74    private HColumnDescriptor hcd;
75  
76    private String tableName;
77    private static final List<DataBlockEncoding> ENCODINGS_TO_ITERATE =
78        createEncodingsToIterate();
79  
80    private static final List<DataBlockEncoding> createEncodingsToIterate() {
81      List<DataBlockEncoding> encodings = new ArrayList<DataBlockEncoding>(
82          Arrays.asList(DataBlockEncoding.values()));
83      encodings.add(DataBlockEncoding.NONE);
84      return Collections.unmodifiableList(encodings);
85    }
86  
87    /** A zero-based index of the current batch of test data being written */
88    private int numBatchesWritten;
89  
90    private void prepareTest(String testId) throws IOException {
91      tableName = "test_table_" + testId;
92      HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
93      hcd = new HColumnDescriptor(CF);
94      htd.addFamily(hcd);
95      admin.createTable(htd);
96      numBatchesWritten = 0;
97    }
98  
99    @BeforeClass
100   public static void setUpBeforeClass() throws Exception {
101     // Use a small flush size to create more HFiles.
102     conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
103     // ((Log4JLogger)RpcServerImplementation.LOG).getLogger().setLevel(Level.TRACE);
104     // ((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.TRACE);
105     conf.setBoolean("hbase.online.schema.update.enable", true);
106     TEST_UTIL.startMiniCluster();
107   }
108 
109   @AfterClass
110   public static void tearDownAfterClass() throws Exception {
111     TEST_UTIL.shutdownMiniCluster();
112   }
113 
114   @Before
115   public void setUp() throws Exception {
116     admin = new HBaseAdmin(conf);
117   }
118 
119   @After
120   public void tearDown() throws IOException {
121     admin.close();
122   }
123 
124   private static byte[] getRowKey(int batchId, int i) {
125     return Bytes.toBytes("batch" + batchId + "_row" + i);
126   }
127 
128   private static byte[] getQualifier(int j) {
129     return Bytes.toBytes("col" + j);
130   }
131 
132   private static byte[] getValue(int batchId, int i, int j) {
133     return Bytes.toBytes("value_for_" + Bytes.toString(getRowKey(batchId, i))
134         + "_col" + j);
135   }
136 
137   static void writeTestDataBatch(Configuration conf, String tableName,
138       int batchId) throws Exception {
139     LOG.debug("Writing test data batch " + batchId);
140     HTable table = new HTable(conf, tableName);
141     table.setAutoFlushTo(false);
142     for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
143       Put put = new Put(getRowKey(batchId, i));
144       for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
145         put.add(CF_BYTES, getQualifier(j),
146             getValue(batchId, i, j));
147       }
148       put.setDurability(Durability.SKIP_WAL);
149       table.put(put);
150     }
151     table.flushCommits();
152     table.close();
153   }
154 
155   static void verifyTestDataBatch(Configuration conf, String tableName,
156       int batchId) throws Exception {
157     LOG.debug("Verifying test data batch " + batchId);
158     HTable table = new HTable(conf, tableName);
159     for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
160       Get get = new Get(getRowKey(batchId, i));
161       Result result = table.get(get);
162       for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
163         Cell kv = result.getColumnLatestCell(CF_BYTES, getQualifier(j));
164         assertTrue(CellUtil.matchingValue(kv, getValue(batchId, i, j)));
165       }
166     }
167     table.close();
168   }
169 
170   private void writeSomeNewData() throws Exception {
171     writeTestDataBatch(conf, tableName, numBatchesWritten);
172     ++numBatchesWritten;
173   }
174 
175   private void verifyAllData() throws Exception {
176     for (int i = 0; i < numBatchesWritten; ++i) {
177       verifyTestDataBatch(conf, tableName, i);
178     }
179   }
180 
181   private void setEncodingConf(DataBlockEncoding encoding,
182       boolean onlineChange) throws Exception {
183     LOG.debug("Setting CF encoding to " + encoding + " (ordinal="
184       + encoding.ordinal() + "), onlineChange=" + onlineChange);
185     hcd.setDataBlockEncoding(encoding);
186     if (!onlineChange) {
187       admin.disableTable(tableName);
188     }
189     admin.modifyColumn(tableName, hcd);
190     if (!onlineChange) {
191       admin.enableTable(tableName);
192     }
193     // This is a unit test, not integration test. So let's
194     // wait for regions out of transition. Otherwise, for online
195     // encoding change, verification phase may be flaky because
196     // regions could be still in transition.
197     ZKAssign.blockUntilNoRIT(TEST_UTIL.getZooKeeperWatcher());
198   }
199 
200   @Test(timeout=TIMEOUT_MS)
201   public void testChangingEncoding() throws Exception {
202     prepareTest("ChangingEncoding");
203     for (boolean onlineChange : new boolean[]{false, true}) {
204       for (DataBlockEncoding encoding : ENCODINGS_TO_ITERATE) {
205         setEncodingConf(encoding, onlineChange);
206         writeSomeNewData();
207         verifyAllData();
208       }
209     }
210   }
211 
212   @Test(timeout=TIMEOUT_MS)
213   public void testChangingEncodingWithCompaction() throws Exception {
214     prepareTest("ChangingEncodingWithCompaction");
215     for (boolean onlineChange : new boolean[]{false, true}) {
216       for (DataBlockEncoding encoding : ENCODINGS_TO_ITERATE) {
217         setEncodingConf(encoding, onlineChange);
218         writeSomeNewData();
219         verifyAllData();
220         compactAndWait();
221         verifyAllData();
222       }
223     }
224   }
225 
226   private void compactAndWait() throws IOException, InterruptedException {
227     LOG.debug("Compacting table " + tableName);
228     HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
229     admin.majorCompact(tableName);
230 
231     // Waiting for the compaction to start, at least .5s.
232     final long maxWaitime = System.currentTimeMillis() + 500;
233     boolean cont;
234     do {
235       cont = rs.compactSplitThread.getCompactionQueueSize() == 0;
236       Threads.sleep(1);
237     } while (cont && System.currentTimeMillis() < maxWaitime);
238 
239     while (rs.compactSplitThread.getCompactionQueueSize() > 0) {
240       Threads.sleep(1);
241     }
242     LOG.debug("Compaction queue size reached 0, continuing");
243   }
244 
245   @Test
246   public void testCrazyRandomChanges() throws Exception {
247     prepareTest("RandomChanges");
248     Random rand = new Random(2934298742974297L);
249     for (int i = 0; i < 20; ++i) {
250       int encodingOrdinal = rand.nextInt(DataBlockEncoding.values().length);
251       DataBlockEncoding encoding = DataBlockEncoding.values()[encodingOrdinal];
252       setEncodingConf(encoding, rand.nextBoolean());
253       writeSomeNewData();
254       verifyAllData();
255     }
256   }
257 
258 }