1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver.wal;
20
21 import java.io.DataInput;
22 import java.io.DataOutput;
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.NavigableMap;
27 import java.util.TreeMap;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.classification.InterfaceAudience;
32 import org.apache.hadoop.hbase.codec.Codec;
33 import org.apache.hadoop.hbase.io.HeapSize;
34 import org.apache.hadoop.hbase.Cell;
35 import org.apache.hadoop.hbase.CellUtil;
36 import org.apache.hadoop.hbase.HRegionInfo;
37 import org.apache.hadoop.hbase.KeyValue;
38 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
39 import org.apache.hadoop.hbase.util.Bytes;
40 import org.apache.hadoop.hbase.util.ClassSize;
41 import org.apache.hadoop.io.Writable;
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78 @InterfaceAudience.Private
79 public class WALEdit implements Writable, HeapSize {
80 public static final Log LOG = LogFactory.getLog(WALEdit.class);
81
82
83 public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
84 static final byte [] METAROW = Bytes.toBytes("METAROW");
85 static final byte[] COMPLETE_CACHE_FLUSH = Bytes.toBytes("HBASE::CACHEFLUSH");
86 static final byte[] COMPACTION = Bytes.toBytes("HBASE::COMPACTION");
87 private final int VERSION_2 = -1;
88 private final boolean isReplay;
89
90 private final ArrayList<KeyValue> kvs = new ArrayList<KeyValue>(1);
91
92
93 @Deprecated
94 private NavigableMap<byte[], Integer> scopes;
95
96 private CompressionContext compressionContext;
97
98 public WALEdit() {
99 this(false);
100 }
101
102 public WALEdit(boolean isReplay) {
103 this.isReplay = isReplay;
104 }
105
106
107
108
109
110 public static boolean isMetaEditFamily(final byte [] f) {
111 return Bytes.equals(METAFAMILY, f);
112 }
113
114
115
116
117
118 public boolean isReplay() {
119 return this.isReplay;
120 }
121
122 public void setCompressionContext(final CompressionContext compressionContext) {
123 this.compressionContext = compressionContext;
124 }
125
126 public WALEdit add(KeyValue kv) {
127 this.kvs.add(kv);
128 return this;
129 }
130
131 public boolean isEmpty() {
132 return kvs.isEmpty();
133 }
134
135 public int size() {
136 return kvs.size();
137 }
138
139 public ArrayList<KeyValue> getKeyValues() {
140 return kvs;
141 }
142
143 public NavigableMap<byte[], Integer> getAndRemoveScopes() {
144 NavigableMap<byte[], Integer> result = scopes;
145 scopes = null;
146 return result;
147 }
148
149 @Override
150 public void readFields(DataInput in) throws IOException {
151 kvs.clear();
152 if (scopes != null) {
153 scopes.clear();
154 }
155 int versionOrLength = in.readInt();
156
157 if (versionOrLength == VERSION_2) {
158
159 int numEdits = in.readInt();
160 for (int idx = 0; idx < numEdits; idx++) {
161 if (compressionContext != null) {
162 this.add(KeyValueCompression.readKV(in, compressionContext));
163 } else {
164 this.add(KeyValue.create(in));
165 }
166 }
167 int numFamilies = in.readInt();
168 if (numFamilies > 0) {
169 if (scopes == null) {
170 scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
171 }
172 for (int i = 0; i < numFamilies; i++) {
173 byte[] fam = Bytes.readByteArray(in);
174 int scope = in.readInt();
175 scopes.put(fam, scope);
176 }
177 }
178 } else {
179
180
181 this.add(KeyValue.create(versionOrLength, in));
182 }
183 }
184
185 @Override
186 public void write(DataOutput out) throws IOException {
187 LOG.warn("WALEdit is being serialized to writable - only expected in test code");
188 out.writeInt(VERSION_2);
189 out.writeInt(kvs.size());
190
191 for (KeyValue kv : kvs) {
192 if (compressionContext != null) {
193 KeyValueCompression.writeKV(out, kv, compressionContext);
194 } else{
195 KeyValue.write(kv, out);
196 }
197 }
198 if (scopes == null) {
199 out.writeInt(0);
200 } else {
201 out.writeInt(scopes.size());
202 for (byte[] key : scopes.keySet()) {
203 Bytes.writeByteArray(out, key);
204 out.writeInt(scopes.get(key));
205 }
206 }
207 }
208
209
210
211
212
213
214
215 public int readFromCells(Codec.Decoder cellDecoder, int expectedCount) throws IOException {
216 kvs.clear();
217 kvs.ensureCapacity(expectedCount);
218 while (kvs.size() < expectedCount && cellDecoder.advance()) {
219 Cell cell = cellDecoder.current();
220 if (!(cell instanceof KeyValue)) {
221 throw new IOException("WAL edit only supports KVs as cells");
222 }
223 kvs.add((KeyValue)cell);
224 }
225 return kvs.size();
226 }
227
228 @Override
229 public long heapSize() {
230 long ret = ClassSize.ARRAYLIST;
231 for (KeyValue kv : kvs) {
232 ret += kv.heapSize();
233 }
234 if (scopes != null) {
235 ret += ClassSize.TREEMAP;
236 ret += ClassSize.align(scopes.size() * ClassSize.MAP_ENTRY);
237
238 }
239 return ret;
240 }
241
242 @Override
243 public String toString() {
244 StringBuilder sb = new StringBuilder();
245
246 sb.append("[#edits: " + kvs.size() + " = <");
247 for (KeyValue kv : kvs) {
248 sb.append(kv.toString());
249 sb.append("; ");
250 }
251 if (scopes != null) {
252 sb.append(" scopes: " + scopes.toString());
253 }
254 sb.append(">]");
255 return sb.toString();
256 }
257
258
259
260
261
262
263 public static WALEdit createCompaction(final HRegionInfo hri, final CompactionDescriptor c) {
264 byte [] pbbytes = c.toByteArray();
265 KeyValue kv = new KeyValue(getRowForRegion(hri), METAFAMILY, COMPACTION, System.currentTimeMillis(), pbbytes);
266 return new WALEdit().add(kv);
267 }
268
269 private static byte[] getRowForRegion(HRegionInfo hri) {
270 byte[] startKey = hri.getStartKey();
271 if (startKey.length == 0) {
272
273
274 return new byte[] {0};
275 }
276 return startKey;
277 }
278
279
280
281
282
283
284 public static CompactionDescriptor getCompaction(Cell kv) throws IOException {
285 if (CellUtil.matchingColumn(kv, METAFAMILY, COMPACTION)) {
286 return CompactionDescriptor.parseFrom(kv.getValue());
287 }
288 return null;
289 }
290 }
291