1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.wal;
19
20 import static org.junit.Assert.assertFalse;
21
22 import java.io.IOException;
23 import java.util.concurrent.atomic.AtomicLong;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.fs.FileSystem;
28 import org.apache.hadoop.fs.Path;
29 import org.apache.hadoop.hbase.HBaseTestingUtility;
30 import org.apache.hadoop.hbase.HRegionInfo;
31 import org.apache.hadoop.hbase.HTableDescriptor;
32 import org.apache.hadoop.hbase.KeyValue;
33 import org.apache.hadoop.hbase.SmallTests;
34 import org.apache.hadoop.hbase.util.Bytes;
35 import org.junit.Test;
36 import org.junit.experimental.categories.Category;
37
38
39
40
41 @Category(SmallTests.class)
42 public class TestLogRollingNoCluster {
43 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
44 private final static byte [] EMPTY_1K_ARRAY = new byte[1024];
45 private static final int THREAD_COUNT = 100;
46
47
48
49
50
51
52
53 @Test
54 public void testContendedLogRolling() throws IOException, InterruptedException {
55 FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
56 Path dir = TEST_UTIL.getDataTestDir();
57 HLog wal = HLogFactory.createHLog(fs, dir, "logs",
58 TEST_UTIL.getConfiguration());
59
60 Appender [] appenders = null;
61
62 final int count = THREAD_COUNT;
63 appenders = new Appender[count];
64 try {
65 for (int i = 0; i < count; i++) {
66
67 appenders[i] = new Appender(wal, i, count);
68 }
69 for (int i = 0; i < count; i++) {
70 appenders[i].start();
71 }
72 for (int i = 0; i < count; i++) {
73
74 appenders[i].join();
75 }
76 } finally {
77 wal.close();
78 }
79 for (int i = 0; i < count; i++) {
80 assertFalse(appenders[i].isException());
81 }
82 }
83
84
85
86
87 static class Appender extends Thread {
88 private final Log log;
89 private final HLog wal;
90 private final int count;
91 private Exception e = null;
92
93 Appender(final HLog wal, final int index, final int count) {
94 super("" + index);
95 this.wal = wal;
96 this.count = count;
97 this.log = LogFactory.getLog("Appender:" + getName());
98 }
99
100
101
102
103 boolean isException() {
104 return !isAlive() && this.e != null;
105 }
106
107 Exception getException() {
108 return this.e;
109 }
110
111 @Override
112 public void run() {
113 this.log.info(getName() +" started");
114 final AtomicLong sequenceId = new AtomicLong(1);
115 try {
116 for (int i = 0; i < this.count; i++) {
117 long now = System.currentTimeMillis();
118
119 if (i % 10 == 0 && ((FSHLog) this.wal).getNumEntries() > 0) {
120 this.wal.rollWriter();
121 }
122 WALEdit edit = new WALEdit();
123 byte[] bytes = Bytes.toBytes(i);
124 edit.add(new KeyValue(bytes, bytes, bytes, now, EMPTY_1K_ARRAY));
125
126 this.wal.append(HRegionInfo.FIRST_META_REGIONINFO,
127 HTableDescriptor.META_TABLEDESC.getTableName(),
128 edit, now, HTableDescriptor.META_TABLEDESC, sequenceId);
129 }
130 String msg = getName() + " finished";
131 if (isException())
132 this.log.info(msg, getException());
133 else
134 this.log.info(msg);
135 } catch (Exception e) {
136 this.e = e;
137 log.info("Caught exception from Appender:" + getName(), e);
138 }
139 }
140 }
141
142
143
144
145 }