1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.replication;
19
20 import static org.junit.Assert.assertArrayEquals;
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.fail;
23
24 import java.io.IOException;
25 import java.util.ArrayList;
26 import java.util.List;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.Cell;
32 import org.apache.hadoop.hbase.HBaseConfiguration;
33 import org.apache.hadoop.hbase.HBaseTestingUtility;
34 import org.apache.hadoop.hbase.HColumnDescriptor;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.HTableDescriptor;
37 import org.apache.hadoop.hbase.KeyValue;
38 import org.apache.hadoop.hbase.KeyValueUtil;
39 import org.apache.hadoop.hbase.LargeTests;
40 import org.apache.hadoop.hbase.TableName;
41 import org.apache.hadoop.hbase.Tag;
42 import org.apache.hadoop.hbase.client.Durability;
43 import org.apache.hadoop.hbase.client.Get;
44 import org.apache.hadoop.hbase.client.HBaseAdmin;
45 import org.apache.hadoop.hbase.client.HTable;
46 import org.apache.hadoop.hbase.client.Put;
47 import org.apache.hadoop.hbase.client.Result;
48 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
49 import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
50 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
51 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
52 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
53 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
54 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
55 import org.apache.hadoop.hbase.util.Bytes;
56 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
57 import org.junit.AfterClass;
58 import org.junit.BeforeClass;
59 import org.junit.Test;
60 import org.junit.experimental.categories.Category;
61
62 @Category(LargeTests.class)
63 public class TestReplicationWithTags {
64
65 private static final Log LOG = LogFactory.getLog(TestReplicationWithTags.class);
66 private static final byte TAG_TYPE = 1;
67
68 private static Configuration conf1 = HBaseConfiguration.create();
69 private static Configuration conf2;
70
71 private static ReplicationAdmin replicationAdmin;
72
73 private static HTable htable1;
74 private static HTable htable2;
75
76 private static HBaseTestingUtility utility1;
77 private static HBaseTestingUtility utility2;
78 private static final long SLEEP_TIME = 500;
79 private static final int NB_RETRIES = 10;
80
81 private static final byte[] TABLE_NAME = Bytes.toBytes("TestReplicationWithTags");
82 private static final byte[] FAMILY = Bytes.toBytes("f");
83 private static final byte[] ROW = Bytes.toBytes("row");
84
85 @BeforeClass
86 public static void setUpBeforeClass() throws Exception {
87 conf1.setInt("hfile.format.version", 3);
88 conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
89 conf1.setInt("replication.source.size.capacity", 10240);
90 conf1.setLong("replication.source.sleepforretries", 100);
91 conf1.setInt("hbase.regionserver.maxlogs", 10);
92 conf1.setLong("hbase.master.logcleaner.ttl", 10);
93 conf1.setInt("zookeeper.recovery.retry", 1);
94 conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
95 conf1.setBoolean("dfs.support.append", true);
96 conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
97 conf1.setInt("replication.stats.thread.period.seconds", 5);
98 conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
99 conf1.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName());
100 conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
101 TestCoprocessorForTagsAtSource.class.getName());
102
103 utility1 = new HBaseTestingUtility(conf1);
104 utility1.startMiniZKCluster();
105 MiniZooKeeperCluster miniZK = utility1.getZkCluster();
106
107
108 conf1 = utility1.getConfiguration();
109 replicationAdmin = new ReplicationAdmin(conf1);
110 LOG.info("Setup first Zk");
111
112
113 conf2 = HBaseConfiguration.create(conf1);
114 conf2.setInt("hfile.format.version", 3);
115 conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
116 conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
117 conf2.setBoolean("dfs.support.append", true);
118 conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
119 conf2.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName());
120 conf2.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
121 TestCoprocessorForTagsAtSink.class.getName());
122
123 utility2 = new HBaseTestingUtility(conf2);
124 utility2.setZkCluster(miniZK);
125
126 replicationAdmin.addPeer("2", utility2.getClusterKey());
127
128 LOG.info("Setup second Zk");
129 utility1.startMiniCluster(2);
130 utility2.startMiniCluster(2);
131
132 HTableDescriptor table = new HTableDescriptor(TableName.valueOf(TABLE_NAME));
133 HColumnDescriptor fam = new HColumnDescriptor(FAMILY);
134 fam.setMaxVersions(3);
135 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
136 table.addFamily(fam);
137 HBaseAdmin admin = null;
138 try {
139 admin = new HBaseAdmin(conf1);
140 admin.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
141 } finally {
142 if (admin != null) {
143 admin.close();
144 }
145 }
146 try {
147 admin = new HBaseAdmin(conf2);
148 admin.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
149 } finally {
150 if(admin != null){
151 admin.close();
152 }
153 }
154 htable1 = new HTable(conf1, TABLE_NAME);
155 htable1.setWriteBufferSize(1024);
156 htable2 = new HTable(conf2, TABLE_NAME);
157 }
158
159
160
161
162 @AfterClass
163 public static void tearDownAfterClass() throws Exception {
164 utility2.shutdownMiniCluster();
165 utility1.shutdownMiniCluster();
166 }
167
168 @Test(timeout = 300000)
169 public void testReplicationWithCellTags() throws Exception {
170 LOG.info("testSimplePutDelete");
171 Put put = new Put(ROW);
172 put.setAttribute("visibility", Bytes.toBytes("myTag3"));
173 put.add(FAMILY, ROW, ROW);
174
175 htable1 = new HTable(conf1, TABLE_NAME);
176 htable1.put(put);
177
178 Get get = new Get(ROW);
179 try {
180 for (int i = 0; i < NB_RETRIES; i++) {
181 if (i == NB_RETRIES - 1) {
182 fail("Waited too much time for put replication");
183 }
184 Result res = htable2.get(get);
185 if (res.size() == 0) {
186 LOG.info("Row not available");
187 Thread.sleep(SLEEP_TIME);
188 } else {
189 assertArrayEquals(res.value(), ROW);
190 assertEquals(1, TestCoprocessorForTagsAtSink.tags.size());
191 Tag tag = TestCoprocessorForTagsAtSink.tags.get(0);
192 assertEquals(TAG_TYPE, tag.getType());
193 break;
194 }
195 }
196 } finally {
197 TestCoprocessorForTagsAtSink.tags = null;
198 }
199 }
200
201 public static class TestCoprocessorForTagsAtSource extends BaseRegionObserver {
202 @Override
203 public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
204 final WALEdit edit, final Durability durability) throws IOException {
205 byte[] attribute = put.getAttribute("visibility");
206 byte[] cf = null;
207 List<Cell> updatedCells = new ArrayList<Cell>();
208 if (attribute != null) {
209 for (List<? extends Cell> edits : put.getFamilyCellMap().values()) {
210 for (Cell cell : edits) {
211 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
212 if (cf == null) {
213 cf = kv.getFamily();
214 }
215 Tag tag = new Tag(TAG_TYPE, attribute);
216 List<Tag> tagList = new ArrayList<Tag>();
217 tagList.add(tag);
218
219 KeyValue newKV = new KeyValue(kv.getRow(), 0, kv.getRowLength(), kv.getFamily(), 0,
220 kv.getFamilyLength(), kv.getQualifier(), 0, kv.getQualifierLength(),
221 kv.getTimestamp(), KeyValue.Type.codeToType(kv.getType()), kv.getValue(), 0,
222 kv.getValueLength(), tagList);
223 ((List<Cell>) updatedCells).add(newKV);
224 }
225 }
226 put.getFamilyCellMap().remove(cf);
227
228 put.getFamilyCellMap().put(cf, updatedCells);
229 }
230 }
231 }
232
233 public static class TestCoprocessorForTagsAtSink extends BaseRegionObserver {
234 public static List<Tag> tags = null;
235
236 @Override
237 public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
238 List<Cell> results) throws IOException {
239 if (results.size() > 0) {
240
241 if (!results.isEmpty()) {
242 Cell cell = results.get(0);
243 tags = Tag
244 .asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLengthUnsigned());
245 }
246 }
247 }
248 }
249 }