1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication.regionserver;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.apache.hadoop.classification.InterfaceAudience;
24 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
25 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
26
27
28
29
30
31 @InterfaceAudience.Private
32 public class MetricsSource {
33
34 public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
35 public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";
36 public static final String SOURCE_LOG_EDITS_READ = "source.logEditsRead";
37 public static final String SOURCE_LOG_EDITS_FILTERED = "source.logEditsFiltered";
38 public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches";
39 public static final String SOURCE_SHIPPED_KBS = "source.shippedKBs";
40 public static final String SOURCE_SHIPPED_OPS = "source.shippedOps";
41 public static final String SOURCE_LOG_READ_IN_BYTES = "source.logReadInBytes";
42
43 public static final Log LOG = LogFactory.getLog(MetricsSource.class);
44 private String id;
45
46 private long lastTimestamp = 0;
47 private int lastQueueSize = 0;
48
49 private String sizeOfLogQueKey;
50 private String ageOfLastShippedOpKey;
51 private String logEditsReadKey;
52 private String logEditsFilteredKey;
53 private final String shippedBatchesKey;
54 private final String shippedOpsKey;
55 private final String shippedKBsKey;
56 private final String logReadInBytesKey;
57
58 private MetricsReplicationSource rms;
59
60
61
62
63
64
65 public MetricsSource(String id) {
66 this.id = id;
67
68 sizeOfLogQueKey = "source." + id + ".sizeOfLogQueue";
69 ageOfLastShippedOpKey = "source." + id + ".ageOfLastShippedOp";
70 logEditsReadKey = "source." + id + ".logEditsRead";
71 logEditsFilteredKey = "source." + id + ".logEditsFiltered";
72 shippedBatchesKey = "source." + this.id + ".shippedBatches";
73 shippedOpsKey = "source." + this.id + ".shippedOps";
74 shippedKBsKey = "source." + this.id + ".shippedKBs";
75 logReadInBytesKey = "source." + this.id + ".logReadInBytes";
76 rms = CompatibilitySingletonFactory.getInstance(MetricsReplicationSource.class);
77 }
78
79
80
81
82
83
84 public void setAgeOfLastShippedOp(long timestamp) {
85 long age = EnvironmentEdgeManager.currentTimeMillis() - timestamp;
86 rms.setGauge(ageOfLastShippedOpKey, age);
87 rms.setGauge(SOURCE_AGE_OF_LAST_SHIPPED_OP, age);
88 this.lastTimestamp = timestamp;
89 }
90
91
92
93
94
95 public void refreshAgeOfLastShippedOp() {
96 if (this.lastTimestamp > 0) {
97 setAgeOfLastShippedOp(this.lastTimestamp);
98 }
99 }
100
101
102
103
104
105
106 public void setSizeOfLogQueue(int size) {
107 rms.setGauge(sizeOfLogQueKey, size);
108 rms.incGauge(SOURCE_SIZE_OF_LOG_QUEUE, size - lastQueueSize);
109 lastQueueSize = size;
110 }
111
112
113
114
115
116
117 private void incrLogEditsRead(long delta) {
118 rms.incCounters(logEditsReadKey, delta);
119 rms.incCounters(SOURCE_LOG_EDITS_READ, delta);
120 }
121
122
123 public void incrLogEditsRead() {
124 incrLogEditsRead(1);
125 }
126
127
128
129
130
131
132 private void incrLogEditsFiltered(long delta) {
133 rms.incCounters(logEditsFilteredKey, delta);
134 rms.incCounters(SOURCE_LOG_EDITS_FILTERED, delta);
135 }
136
137
138 public void incrLogEditsFiltered() {
139 incrLogEditsFiltered(1);
140 }
141
142
143
144
145
146
147 public void shipBatch(long batchSize, int sizeInKB) {
148 rms.incCounters(shippedBatchesKey, 1);
149 rms.incCounters(SOURCE_SHIPPED_BATCHES, 1);
150 rms.incCounters(shippedOpsKey, batchSize);
151 rms.incCounters(SOURCE_SHIPPED_OPS, batchSize);
152 rms.incCounters(shippedKBsKey, sizeInKB);
153 rms.incCounters(SOURCE_SHIPPED_KBS, sizeInKB);
154 }
155
156
157 public void incrLogReadInBytes(long readInBytes) {
158 rms.incCounters(logReadInBytesKey, readInBytes);
159 rms.incCounters(SOURCE_LOG_READ_IN_BYTES, readInBytes);
160 }
161
162
163 public void clear() {
164 rms.removeMetric(sizeOfLogQueKey);
165 rms.decGauge(SOURCE_SIZE_OF_LOG_QUEUE, lastQueueSize);
166 lastQueueSize = 0;
167 rms.removeMetric(ageOfLastShippedOpKey);
168
169 rms.removeMetric(logEditsFilteredKey);
170 rms.removeMetric(logEditsReadKey);
171
172 }
173 }