1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.replication;
19
20 import static org.junit.Assert.assertArrayEquals;
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.fail;
23
24 import java.io.IOException;
25 import java.util.ArrayList;
26 import java.util.List;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.Cell;
32 import org.apache.hadoop.hbase.HBaseConfiguration;
33 import org.apache.hadoop.hbase.HBaseTestingUtility;
34 import org.apache.hadoop.hbase.HColumnDescriptor;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.HTableDescriptor;
37 import org.apache.hadoop.hbase.KeyValue;
38 import org.apache.hadoop.hbase.KeyValueUtil;
39 import org.apache.hadoop.hbase.testclassification.LargeTests;
40 import org.apache.hadoop.hbase.TableName;
41 import org.apache.hadoop.hbase.Tag;
42 import org.apache.hadoop.hbase.client.Admin;
43 import org.apache.hadoop.hbase.client.Connection;
44 import org.apache.hadoop.hbase.client.ConnectionFactory;
45 import org.apache.hadoop.hbase.client.Durability;
46 import org.apache.hadoop.hbase.client.Get;
47 import org.apache.hadoop.hbase.client.HBaseAdmin;
48 import org.apache.hadoop.hbase.client.HTable;
49 import org.apache.hadoop.hbase.client.Put;
50 import org.apache.hadoop.hbase.client.Result;
51 import org.apache.hadoop.hbase.client.Table;
52 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
53 import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
54 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
55 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
56 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
57 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
58 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
59 import org.apache.hadoop.hbase.util.Bytes;
60 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
61 import org.junit.AfterClass;
62 import org.junit.BeforeClass;
63 import org.junit.Test;
64 import org.junit.experimental.categories.Category;
65
66 @Category(LargeTests.class)
67 public class TestReplicationWithTags {
68
69 private static final Log LOG = LogFactory.getLog(TestReplicationWithTags.class);
70 private static final byte TAG_TYPE = 1;
71
72 private static Configuration conf1 = HBaseConfiguration.create();
73 private static Configuration conf2;
74
75 private static ReplicationAdmin replicationAdmin;
76
77 private static Connection connection1;
78 private static Connection connection2;
79
80 private static Table htable1;
81 private static Table htable2;
82
83 private static HBaseTestingUtility utility1;
84 private static HBaseTestingUtility utility2;
85 private static final long SLEEP_TIME = 500;
86 private static final int NB_RETRIES = 10;
87
88 private static final TableName TABLE_NAME = TableName.valueOf("TestReplicationWithTags");
89 private static final byte[] FAMILY = Bytes.toBytes("f");
90 private static final byte[] ROW = Bytes.toBytes("row");
91
92 @BeforeClass
93 public static void setUpBeforeClass() throws Exception {
94 conf1.setInt("hfile.format.version", 3);
95 conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
96 conf1.setInt("replication.source.size.capacity", 10240);
97 conf1.setLong("replication.source.sleepforretries", 100);
98 conf1.setInt("hbase.regionserver.maxlogs", 10);
99 conf1.setLong("hbase.master.logcleaner.ttl", 10);
100 conf1.setInt("zookeeper.recovery.retry", 1);
101 conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
102 conf1.setBoolean("dfs.support.append", true);
103 conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
104 conf1.setInt("replication.stats.thread.period.seconds", 5);
105 conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
106 conf1.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName());
107 conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
108 TestCoprocessorForTagsAtSource.class.getName());
109
110 utility1 = new HBaseTestingUtility(conf1);
111 utility1.startMiniZKCluster();
112 MiniZooKeeperCluster miniZK = utility1.getZkCluster();
113
114
115 conf1 = utility1.getConfiguration();
116 replicationAdmin = new ReplicationAdmin(conf1);
117 LOG.info("Setup first Zk");
118
119
120 conf2 = HBaseConfiguration.create(conf1);
121 conf2.setInt("hfile.format.version", 3);
122 conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
123 conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
124 conf2.setBoolean("dfs.support.append", true);
125 conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
126 conf2.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName());
127 conf2.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
128 TestCoprocessorForTagsAtSink.class.getName());
129
130 utility2 = new HBaseTestingUtility(conf2);
131 utility2.setZkCluster(miniZK);
132
133 replicationAdmin.addPeer("2", utility2.getClusterKey());
134
135 LOG.info("Setup second Zk");
136 utility1.startMiniCluster(2);
137 utility2.startMiniCluster(2);
138
139 HTableDescriptor table = new HTableDescriptor(TABLE_NAME);
140 HColumnDescriptor fam = new HColumnDescriptor(FAMILY);
141 fam.setMaxVersions(3);
142 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
143 table.addFamily(fam);
144 try (Connection conn = ConnectionFactory.createConnection(conf1);
145 Admin admin = conn.getAdmin()) {
146 admin.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
147 }
148 try (Connection conn = ConnectionFactory.createConnection(conf2);
149 Admin admin = conn.getAdmin()) {
150 admin.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
151 }
152 htable1 = utility1.getConnection().getTable(TABLE_NAME);
153 htable2 = utility2.getConnection().getTable(TABLE_NAME);
154 }
155
156
157
158
159 @AfterClass
160 public static void tearDownAfterClass() throws Exception {
161 utility2.shutdownMiniCluster();
162 utility1.shutdownMiniCluster();
163 }
164
165 @Test(timeout = 300000)
166 public void testReplicationWithCellTags() throws Exception {
167 LOG.info("testSimplePutDelete");
168 Put put = new Put(ROW);
169 put.setAttribute("visibility", Bytes.toBytes("myTag3"));
170 put.add(FAMILY, ROW, ROW);
171
172 htable1 = new HTable(conf1, TABLE_NAME);
173 htable1.put(put);
174
175 Get get = new Get(ROW);
176 try {
177 for (int i = 0; i < NB_RETRIES; i++) {
178 if (i == NB_RETRIES - 1) {
179 fail("Waited too much time for put replication");
180 }
181 Result res = htable2.get(get);
182 if (res.size() == 0) {
183 LOG.info("Row not available");
184 Thread.sleep(SLEEP_TIME);
185 } else {
186 assertArrayEquals(res.value(), ROW);
187 assertEquals(1, TestCoprocessorForTagsAtSink.tags.size());
188 Tag tag = TestCoprocessorForTagsAtSink.tags.get(0);
189 assertEquals(TAG_TYPE, tag.getType());
190 break;
191 }
192 }
193 } finally {
194 TestCoprocessorForTagsAtSink.tags = null;
195 }
196 }
197
198 public static class TestCoprocessorForTagsAtSource extends BaseRegionObserver {
199 @Override
200 public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
201 final WALEdit edit, final Durability durability) throws IOException {
202 byte[] attribute = put.getAttribute("visibility");
203 byte[] cf = null;
204 List<Cell> updatedCells = new ArrayList<Cell>();
205 if (attribute != null) {
206 for (List<? extends Cell> edits : put.getFamilyCellMap().values()) {
207 for (Cell cell : edits) {
208 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
209 if (cf == null) {
210 cf = kv.getFamily();
211 }
212 Tag tag = new Tag(TAG_TYPE, attribute);
213 List<Tag> tagList = new ArrayList<Tag>();
214 tagList.add(tag);
215
216 KeyValue newKV = new KeyValue(kv.getRow(), 0, kv.getRowLength(), kv.getFamily(), 0,
217 kv.getFamilyLength(), kv.getQualifier(), 0, kv.getQualifierLength(),
218 kv.getTimestamp(), KeyValue.Type.codeToType(kv.getType()), kv.getValue(), 0,
219 kv.getValueLength(), tagList);
220 ((List<Cell>) updatedCells).add(newKV);
221 }
222 }
223 put.getFamilyCellMap().remove(cf);
224
225 put.getFamilyCellMap().put(cf, updatedCells);
226 }
227 }
228 }
229
230 public static class TestCoprocessorForTagsAtSink extends BaseRegionObserver {
231 public static List<Tag> tags = null;
232
233 @Override
234 public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
235 List<Cell> results) throws IOException {
236 if (results.size() > 0) {
237
238 if (!results.isEmpty()) {
239 Cell cell = results.get(0);
240 tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
241 }
242 }
243 }
244 }
245 }