1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver.wal;
20
21 import static junit.framework.Assert.assertEquals;
22
23 import java.io.IOException;
24 import java.util.List;
25 import java.util.Random;
26 import java.util.SortedMap;
27 import java.util.TreeMap;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.TableName;
32 import org.apache.hadoop.hbase.HBaseTestingUtility;
33 import org.apache.hadoop.hbase.MediumTests;
34 import org.apache.hadoop.hbase.client.Delete;
35 import org.apache.hadoop.hbase.client.HTable;
36 import org.apache.hadoop.hbase.client.Put;
37 import org.apache.hadoop.hbase.master.HMaster;
38 import org.apache.hadoop.hbase.protobuf.RequestConverter;
39 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
40 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
41 import org.apache.hadoop.hbase.regionserver.HRegion;
42 import org.apache.hadoop.hbase.regionserver.HRegionServer;
43 import org.apache.hadoop.hbase.util.Bytes;
44 import org.junit.After;
45 import org.junit.Before;
46 import org.junit.Test;
47 import org.junit.experimental.categories.Category;
48
49 import com.google.common.collect.Lists;
50 import com.google.protobuf.ServiceException;
51
52 @Category(MediumTests.class)
53 public class TestHLogFiltering {
54 private static final Log LOG = LogFactory.getLog(TestHLogFiltering.class);
55
56 private static final int NUM_MASTERS = 1;
57 private static final int NUM_RS = 4;
58
59 private static final TableName TABLE_NAME =
60 TableName.valueOf("TestHLogFiltering");
61 private static final byte[] CF1 = Bytes.toBytes("MyCF1");
62 private static final byte[] CF2 = Bytes.toBytes("MyCF2");
63 private static final byte[][] FAMILIES = { CF1, CF2 };
64
65 private HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
66
67 @Before
68 public void setUp() throws Exception {
69 TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
70 fillTable();
71 }
72
73 @After
74 public void tearDown() throws Exception {
75 TEST_UTIL.shutdownMiniCluster();
76 }
77
78 private void fillTable() throws IOException, InterruptedException {
79 HTable table = TEST_UTIL.createTable(TABLE_NAME, FAMILIES, 3,
80 Bytes.toBytes("row0"), Bytes.toBytes("row99"), NUM_RS);
81 Random rand = new Random(19387129L);
82 for (int iStoreFile = 0; iStoreFile < 4; ++iStoreFile) {
83 for (int iRow = 0; iRow < 100; ++iRow) {
84 final byte[] row = Bytes.toBytes("row" + iRow);
85 Put put = new Put(row);
86 Delete del = new Delete(row);
87 for (int iCol = 0; iCol < 10; ++iCol) {
88 final byte[] cf = rand.nextBoolean() ? CF1 : CF2;
89 final long ts = Math.abs(rand.nextInt());
90 final byte[] qual = Bytes.toBytes("col" + iCol);
91 if (rand.nextBoolean()) {
92 final byte[] value = Bytes.toBytes("value_for_row_" + iRow +
93 "_cf_" + Bytes.toStringBinary(cf) + "_col_" + iCol + "_ts_" +
94 ts + "_random_" + rand.nextLong());
95 put.add(cf, qual, ts, value);
96 } else if (rand.nextDouble() < 0.8) {
97 del.deleteColumn(cf, qual, ts);
98 } else {
99 del.deleteColumns(cf, qual, ts);
100 }
101 }
102 table.put(put);
103 table.delete(del);
104 table.flushCommits();
105 }
106 }
107 TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
108 }
109
110 @Test
111 public void testFlushedSequenceIdsSentToHMaster()
112 throws IOException, InterruptedException, ServiceException {
113 SortedMap<byte[], Long> allFlushedSequenceIds =
114 new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
115 for (int i = 0; i < NUM_RS; ++i) {
116 flushAllRegions(i);
117 }
118 Thread.sleep(10000);
119 HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
120 for (int i = 0; i < NUM_RS; ++i) {
121 for (byte[] regionName : getRegionsByServer(i)) {
122 if (allFlushedSequenceIds.containsKey(regionName)) {
123 GetLastFlushedSequenceIdRequest req =
124 RequestConverter.buildGetLastFlushedSequenceIdRequest(regionName);
125
126 assertEquals((long)allFlushedSequenceIds.get(regionName),
127 master.getLastFlushedSequenceId(null, req).getLastFlushedSequenceId());
128 }
129 }
130 }
131 }
132
133 private List<byte[]> getRegionsByServer(int rsId) throws IOException {
134 List<byte[]> regionNames = Lists.newArrayList();
135 HRegionServer hrs = getRegionServer(rsId);
136 for (HRegion r : hrs.getOnlineRegions(TABLE_NAME)) {
137 regionNames.add(r.getRegionName());
138 }
139 return regionNames;
140 }
141
142 private HRegionServer getRegionServer(int rsId) {
143 return TEST_UTIL.getMiniHBaseCluster().getRegionServer(rsId);
144 }
145
146 private void flushAllRegions(int rsId)
147 throws ServiceException, IOException {
148 HRegionServer hrs = getRegionServer(rsId);
149 for (byte[] regionName : getRegionsByServer(rsId)) {
150 FlushRegionRequest request =
151 RequestConverter.buildFlushRegionRequest(regionName);
152 hrs.flushRegion(null, request);
153 }
154 }
155
156 }