1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.hadoop.hbase.io.encoding;
18
19 import static org.junit.Assert.assertTrue;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Arrays;
24 import java.util.Collections;
25 import java.util.List;
26 import java.util.Random;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.Cell;
32 import org.apache.hadoop.hbase.CellUtil;
33 import org.apache.hadoop.hbase.HBaseTestingUtility;
34 import org.apache.hadoop.hbase.HColumnDescriptor;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.HTableDescriptor;
37 import org.apache.hadoop.hbase.testclassification.LargeTests;
38 import org.apache.hadoop.hbase.TableName;
39 import org.apache.hadoop.hbase.client.Admin;
40 import org.apache.hadoop.hbase.client.Connection;
41 import org.apache.hadoop.hbase.client.ConnectionFactory;
42 import org.apache.hadoop.hbase.client.Durability;
43 import org.apache.hadoop.hbase.client.Get;
44 import org.apache.hadoop.hbase.client.HBaseAdmin;
45 import org.apache.hadoop.hbase.client.HTable;
46 import org.apache.hadoop.hbase.client.Put;
47 import org.apache.hadoop.hbase.client.Result;
48 import org.apache.hadoop.hbase.client.Table;
49 import org.apache.hadoop.hbase.regionserver.HRegionServer;
50 import org.apache.hadoop.hbase.util.Bytes;
51 import org.apache.hadoop.hbase.util.Threads;
52 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
53 import org.junit.AfterClass;
54 import org.junit.BeforeClass;
55 import org.junit.Test;
56 import org.junit.experimental.categories.Category;
57
58
59
60
61 @Category(LargeTests.class)
62 public class TestChangingEncoding {
63 private static final Log LOG = LogFactory.getLog(TestChangingEncoding.class);
64 static final String CF = "EncodingTestCF";
65 static final byte[] CF_BYTES = Bytes.toBytes(CF);
66
67 private static final int NUM_ROWS_PER_BATCH = 100;
68 private static final int NUM_COLS_PER_ROW = 20;
69
70 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
71 private static final Configuration conf = TEST_UTIL.getConfiguration();
72
73 private static final int TIMEOUT_MS = 600000;
74
75 private HColumnDescriptor hcd;
76
77 private TableName tableName;
78 private static final List<DataBlockEncoding> ENCODINGS_TO_ITERATE =
79 createEncodingsToIterate();
80
81 private static final List<DataBlockEncoding> createEncodingsToIterate() {
82 List<DataBlockEncoding> encodings = new ArrayList<DataBlockEncoding>(
83 Arrays.asList(DataBlockEncoding.values()));
84 encodings.add(DataBlockEncoding.NONE);
85 return Collections.unmodifiableList(encodings);
86 }
87
88
89 private int numBatchesWritten;
90
91 private void prepareTest(String testId) throws IOException {
92 tableName = TableName.valueOf("test_table_" + testId);
93 HTableDescriptor htd = new HTableDescriptor(tableName);
94 hcd = new HColumnDescriptor(CF);
95 htd.addFamily(hcd);
96 try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
97 admin.createTable(htd);
98 }
99 numBatchesWritten = 0;
100 }
101
102 @BeforeClass
103 public static void setUpBeforeClass() throws Exception {
104
105 conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
106
107
108 conf.setBoolean("hbase.online.schema.update.enable", true);
109 TEST_UTIL.startMiniCluster();
110 }
111
112 @AfterClass
113 public static void tearDownAfterClass() throws Exception {
114 TEST_UTIL.shutdownMiniCluster();
115 }
116
117 private static byte[] getRowKey(int batchId, int i) {
118 return Bytes.toBytes("batch" + batchId + "_row" + i);
119 }
120
121 private static byte[] getQualifier(int j) {
122 return Bytes.toBytes("col" + j);
123 }
124
125 private static byte[] getValue(int batchId, int i, int j) {
126 return Bytes.toBytes("value_for_" + Bytes.toString(getRowKey(batchId, i))
127 + "_col" + j);
128 }
129
130 static void writeTestDataBatch(Configuration conf, TableName tableName,
131 int batchId) throws Exception {
132 LOG.debug("Writing test data batch " + batchId);
133 List<Put> puts = new ArrayList<>();
134 for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
135 Put put = new Put(getRowKey(batchId, i));
136 for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
137 put.add(CF_BYTES, getQualifier(j),
138 getValue(batchId, i, j));
139 }
140 put.setDurability(Durability.SKIP_WAL);
141 puts.add(put);
142 }
143 try (Connection conn = ConnectionFactory.createConnection(conf);
144 Table table = conn.getTable(tableName)) {
145 table.put(puts);
146 }
147 }
148
149 static void verifyTestDataBatch(Configuration conf, TableName tableName,
150 int batchId) throws Exception {
151 LOG.debug("Verifying test data batch " + batchId);
152 Table table = new HTable(conf, tableName);
153 for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
154 Get get = new Get(getRowKey(batchId, i));
155 Result result = table.get(get);
156 for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
157 Cell kv = result.getColumnLatestCell(CF_BYTES, getQualifier(j));
158 assertTrue(CellUtil.matchingValue(kv, getValue(batchId, i, j)));
159 }
160 }
161 table.close();
162 }
163
164 private void writeSomeNewData() throws Exception {
165 writeTestDataBatch(conf, tableName, numBatchesWritten);
166 ++numBatchesWritten;
167 }
168
169 private void verifyAllData() throws Exception {
170 for (int i = 0; i < numBatchesWritten; ++i) {
171 verifyTestDataBatch(conf, tableName, i);
172 }
173 }
174
175 private void setEncodingConf(DataBlockEncoding encoding,
176 boolean onlineChange) throws Exception {
177 LOG.debug("Setting CF encoding to " + encoding + " (ordinal="
178 + encoding.ordinal() + "), onlineChange=" + onlineChange);
179 hcd.setDataBlockEncoding(encoding);
180 try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
181 if (!onlineChange) {
182 admin.disableTable(tableName);
183 }
184 admin.modifyColumn(tableName, hcd);
185 if (!onlineChange) {
186 admin.enableTable(tableName);
187 }
188 }
189
190
191
192
193 ZKAssign.blockUntilNoRIT(TEST_UTIL.getZooKeeperWatcher());
194 }
195
196 @Test(timeout=TIMEOUT_MS)
197 public void testChangingEncoding() throws Exception {
198 prepareTest("ChangingEncoding");
199 for (boolean onlineChange : new boolean[]{false, true}) {
200 for (DataBlockEncoding encoding : ENCODINGS_TO_ITERATE) {
201 setEncodingConf(encoding, onlineChange);
202 writeSomeNewData();
203 verifyAllData();
204 }
205 }
206 }
207
208 @Test(timeout=TIMEOUT_MS)
209 public void testChangingEncodingWithCompaction() throws Exception {
210 prepareTest("ChangingEncodingWithCompaction");
211 for (boolean onlineChange : new boolean[]{false, true}) {
212 for (DataBlockEncoding encoding : ENCODINGS_TO_ITERATE) {
213 setEncodingConf(encoding, onlineChange);
214 writeSomeNewData();
215 verifyAllData();
216 compactAndWait();
217 verifyAllData();
218 }
219 }
220 }
221
222 private void compactAndWait() throws IOException, InterruptedException {
223 LOG.debug("Compacting table " + tableName);
224 HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
225 HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
226 admin.majorCompact(tableName);
227
228
229 final long maxWaitime = System.currentTimeMillis() + 500;
230 boolean cont;
231 do {
232 cont = rs.compactSplitThread.getCompactionQueueSize() == 0;
233 Threads.sleep(1);
234 } while (cont && System.currentTimeMillis() < maxWaitime);
235
236 while (rs.compactSplitThread.getCompactionQueueSize() > 0) {
237 Threads.sleep(1);
238 }
239 LOG.debug("Compaction queue size reached 0, continuing");
240 }
241
242 @Test
243 public void testCrazyRandomChanges() throws Exception {
244 prepareTest("RandomChanges");
245 Random rand = new Random(2934298742974297L);
246 for (int i = 0; i < 20; ++i) {
247 int encodingOrdinal = rand.nextInt(DataBlockEncoding.values().length);
248 DataBlockEncoding encoding = DataBlockEncoding.values()[encodingOrdinal];
249 setEncodingConf(encoding, rand.nextBoolean());
250 writeSomeNewData();
251 verifyAllData();
252 }
253 }
254
255 }