1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver.handler;
20
21 import java.io.IOException;
22 import java.util.concurrent.atomic.AtomicBoolean;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.HRegionInfo;
28 import org.apache.hadoop.hbase.HTableDescriptor;
29 import org.apache.hadoop.hbase.Server;
30 import org.apache.hadoop.hbase.executor.EventHandler;
31 import org.apache.hadoop.hbase.executor.EventType;
32 import org.apache.hadoop.hbase.master.AssignmentManager;
33 import org.apache.hadoop.hbase.regionserver.HRegion;
34 import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
35 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
36 import org.apache.hadoop.hbase.util.CancelableProgressable;
37 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
38 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
39 import org.apache.zookeeper.KeeperException;
40
41
42
43
44
45 @InterfaceAudience.Private
46 public class OpenRegionHandler extends EventHandler {
47 private static final Log LOG = LogFactory.getLog(OpenRegionHandler.class);
48
49 protected final RegionServerServices rsServices;
50
51 private final HRegionInfo regionInfo;
52 private final HTableDescriptor htd;
53
54 private boolean tomActivated;
55 private int assignmentTimeout;
56
57
58
59
60 private volatile int version = -1;
61
62 private volatile int versionOfOfflineNode = -1;
63
/**
 * Creates a handler for an {@link EventType#M_RS_OPEN_REGION} event, passing
 * {@code -1} as the version of the region's OFFLINE znode.
 *
 * @param server the server this handler runs on
 * @param rsServices the region server services used while opening the region
 * @param regionInfo the region to open
 * @param htd the descriptor of the table the region belongs to
 */
public OpenRegionHandler(final Server server,
    final RegionServerServices rsServices, HRegionInfo regionInfo,
    HTableDescriptor htd) {
  // The five-argument public constructor supplies EventType.M_RS_OPEN_REGION itself.
  this(server, rsServices, regionInfo, htd, -1);
}
/**
 * Creates a handler for an {@link EventType#M_RS_OPEN_REGION} event that carries
 * the given version of the region's OFFLINE znode.
 *
 * @param server the server this handler runs on
 * @param rsServices the region server services used while opening the region
 * @param regionInfo the region to open
 * @param htd the descriptor of the table the region belongs to
 * @param versionOfOfflineNode the version passed to ZooKeeper when transitioning
 *          the znode out of the OFFLINE state
 */
public OpenRegionHandler(final Server server,
    final RegionServerServices rsServices, HRegionInfo regionInfo,
    HTableDescriptor htd, int versionOfOfflineNode) {
  this(
      server,
      rsServices,
      regionInfo,
      htd,
      EventType.M_RS_OPEN_REGION,
      versionOfOfflineNode);
}
75
/**
 * Full constructor: stores the collaborators and reads the assignment-timeout
 * settings from the server configuration.
 *
 * @param server the server this handler runs on
 * @param rsServices the region server services used while opening the region
 * @param regionInfo the region to open
 * @param htd the descriptor of the table the region belongs to
 * @param eventType the event type handed to the {@link EventHandler} superclass
 * @param versionOfOfflineNode the version passed to ZooKeeper when transitioning
 *          the znode out of the OFFLINE state
 */
protected OpenRegionHandler(final Server server,
    final RegionServerServices rsServices, final HRegionInfo regionInfo,
    final HTableDescriptor htd, EventType eventType,
    final int versionOfOfflineNode) {
  super(server, eventType);
  this.versionOfOfflineNode = versionOfOfflineNode;
  this.htd = htd;
  this.regionInfo = regionInfo;
  this.rsServices = rsServices;
  // Whether timeout management is active; forwarded to ZKAssign when refreshing OPENING.
  this.tomActivated = this.server.getConfiguration().getBoolean(
      AssignmentManager.ASSIGNMENT_TIMEOUT_MANAGEMENT,
      AssignmentManager.DEFAULT_ASSIGNMENT_TIMEOUT_MANAGEMENT);
  // Basis for the wait budget and tickle period used in updateMeta().
  this.assignmentTimeout = this.server.getConfiguration().getInt(
      AssignmentManager.ASSIGNMENT_TIMEOUT,
      AssignmentManager.DEFAULT_ASSIGNMENT_TIMEOUT_DEFAULT);
}
92
93 public HRegionInfo getRegionInfo() {
94 return regionInfo;
95 }
96
97 @Override
98 public void process() throws IOException {
99 boolean openSuccessful = false;
100 boolean transitionedToOpening = false;
101 final String regionName = regionInfo.getRegionNameAsString();
102 HRegion region = null;
103
104 try {
105 if (this.server.isStopped() || this.rsServices.isStopping()) {
106 return;
107 }
108 final String encodedName = regionInfo.getEncodedName();
109
110
111
112
113
114
115
116 if (this.rsServices.getFromOnlineRegions(encodedName) != null) {
117 LOG.error("Region " + encodedName +
118 " was already online when we started processing the opening. " +
119 "Marking this new attempt as failed");
120 return;
121 }
122
123
124
125
126 if (!isRegionStillOpening()){
127 LOG.error("Region " + encodedName + " opening cancelled");
128 return;
129 }
130
131 if (!transitionZookeeperOfflineToOpening(encodedName, versionOfOfflineNode)) {
132 LOG.warn("Region was hijacked? Opening cancelled for encodedName=" + encodedName);
133
134 return;
135 }
136 transitionedToOpening = true;
137
138
139 region = openRegion();
140 if (region == null) {
141 return;
142 }
143
144 boolean failed = true;
145 if (tickleOpening("post_region_open")) {
146 if (updateMeta(region)) {
147 failed = false;
148 }
149 }
150 if (failed || this.server.isStopped() ||
151 this.rsServices.isStopping()) {
152 return;
153 }
154
155
156 if (!isRegionStillOpening() || !transitionToOpened(region)) {
157
158
159
160
161
162 return;
163 }
164
165
166
167
168
169
170
171
172
173
174
175 this.rsServices.addToOnlineRegions(region);
176 openSuccessful = true;
177
178
179 LOG.debug("Opened " + regionName + " on " +
180 this.server.getServerName());
181
182
183 } finally {
184
185 if (!openSuccessful) {
186 doCleanUpOnFailedOpen(region, transitionedToOpening);
187 }
188 final Boolean current = this.rsServices.getRegionsInTransitionInRS().
189 remove(this.regionInfo.getEncodedNameAsBytes());
190
191
192
193
194
195
196
197
198 if (openSuccessful) {
199 if (current == null) {
200 LOG.error("Bad state: we've just opened a region that was NOT in transition. Region="
201 + regionName);
202 } else if (Boolean.FALSE.equals(current)) {
203
204 LOG.error("Race condition: we've finished to open a region, while a close was requested "
205 + " on region=" + regionName + ". It can be a critical error, as a region that"
206 + " should be closed is now opened. Closing it now");
207 cleanupFailedOpen(region);
208 }
209 }
210 }
211 }
212
213 private void doCleanUpOnFailedOpen(HRegion region, boolean transitionedToOpening)
214 throws IOException {
215 if (transitionedToOpening) {
216 try {
217 if (region != null) {
218 cleanupFailedOpen(region);
219 }
220 } finally {
221
222
223 tryTransitionFromOpeningToFailedOpen(regionInfo);
224 }
225 } else {
226
227
228 tryTransitionFromOfflineToFailedOpen(this.rsServices, regionInfo, versionOfOfflineNode);
229 }
230 }
231
232
233
234
235
236
237
238
239 boolean updateMeta(final HRegion r) {
240 if (this.server.isStopped() || this.rsServices.isStopping()) {
241 return false;
242 }
243
244
245 final AtomicBoolean signaller = new AtomicBoolean(false);
246 PostOpenDeployTasksThread t = new PostOpenDeployTasksThread(r,
247 this.server, this.rsServices, signaller);
248 t.start();
249
250
251 long timeout = assignmentTimeout * 10;
252 long now = System.currentTimeMillis();
253 long endTime = now + timeout;
254
255
256 long period = Math.max(1, assignmentTimeout/ 3);
257 long lastUpdate = now;
258 boolean tickleOpening = true;
259 while (!signaller.get() && t.isAlive() && !this.server.isStopped() &&
260 !this.rsServices.isStopping() && (endTime > now)) {
261 long elapsed = now - lastUpdate;
262 if (elapsed > period) {
263
264 lastUpdate = now;
265 tickleOpening = tickleOpening("post_open_deploy");
266 }
267 synchronized (signaller) {
268 try {
269 if (!signaller.get()) signaller.wait(period);
270 } catch (InterruptedException e) {
271
272 }
273 }
274 now = System.currentTimeMillis();
275 }
276
277
278 if (t.isAlive()) {
279 if (!signaller.get()) {
280
281 LOG.debug("Interrupting thread " + t);
282 t.interrupt();
283 }
284 try {
285 t.join();
286 } catch (InterruptedException ie) {
287 LOG.warn("Interrupted joining " +
288 r.getRegionInfo().getRegionNameAsString(), ie);
289 Thread.currentThread().interrupt();
290 }
291 }
292
293
294
295
296 return ((!Thread.interrupted() && t.getException() == null) && tickleOpening);
297 }
298
299
300
301
302
303
304
305
306 static class PostOpenDeployTasksThread extends Thread {
307 private Exception exception = null;
308 private final Server server;
309 private final RegionServerServices services;
310 private final HRegion region;
311 private final AtomicBoolean signaller;
312
313 PostOpenDeployTasksThread(final HRegion region, final Server server,
314 final RegionServerServices services, final AtomicBoolean signaller) {
315 super("PostOpenDeployTasks:" + region.getRegionInfo().getEncodedName());
316 this.setDaemon(true);
317 this.server = server;
318 this.services = services;
319 this.region = region;
320 this.signaller = signaller;
321 }
322
323 public void run() {
324 try {
325 this.services.postOpenDeployTasks(this.region,
326 this.server.getCatalogTracker());
327 } catch (KeeperException e) {
328 server.abort("Exception running postOpenDeployTasks; region=" +
329 this.region.getRegionInfo().getEncodedName(), e);
330 } catch (Exception e) {
331 LOG.warn("Exception running postOpenDeployTasks; region=" +
332 this.region.getRegionInfo().getEncodedName(), e);
333 this.exception = e;
334 }
335
336 this.signaller.set(true);
337 synchronized (this.signaller) {
338 this.signaller.notify();
339 }
340 }
341
342
343
344
345 Exception getException() {
346 return this.exception;
347 }
348 }
349
350
351
352
353
354
355
356 boolean transitionToOpened(final HRegion r) throws IOException {
357 boolean result = false;
358 HRegionInfo hri = r.getRegionInfo();
359 final String name = hri.getRegionNameAsString();
360
361 try {
362 if (ZKAssign.transitionNodeOpened(this.server.getZooKeeper(), hri,
363 this.server.getServerName(), this.version) == -1) {
364 String warnMsg = "Completed the OPEN of region " + name +
365 " but when transitioning from " + " OPENING to OPENED ";
366 try {
367 String node = ZKAssign.getNodeName(this.server.getZooKeeper(), hri.getEncodedName());
368 if (ZKUtil.checkExists(this.server.getZooKeeper(), node) < 0) {
369
370 rsServices.abort(warnMsg + "the znode disappeared", null);
371 } else {
372 LOG.warn(warnMsg + "got a version mismatch, someone else clashed; " +
373 "so now unassigning -- closing region on server: " + this.server.getServerName());
374 }
375 } catch (KeeperException ke) {
376 rsServices.abort(warnMsg, ke);
377 }
378 } else {
379 LOG.debug("Transitioned " + r.getRegionInfo().getEncodedName() +
380 " to OPENED in zk on " + this.server.getServerName());
381 result = true;
382 }
383 } catch (KeeperException e) {
384 LOG.error("Failed transitioning node " + name +
385 " from OPENING to OPENED -- closing region", e);
386 }
387 return result;
388 }
389
390
391
392
393
394
395 private boolean tryTransitionFromOpeningToFailedOpen(final HRegionInfo hri) {
396 boolean result = false;
397 final String name = hri.getRegionNameAsString();
398 try {
399 LOG.info("Opening of region " + hri + " failed, transitioning" +
400 " from OPENING to FAILED_OPEN in ZK, expecting version " + this.version);
401 if (ZKAssign.transitionNode(
402 this.server.getZooKeeper(), hri,
403 this.server.getServerName(),
404 EventType.RS_ZK_REGION_OPENING,
405 EventType.RS_ZK_REGION_FAILED_OPEN,
406 this.version) == -1) {
407 LOG.warn("Unable to mark region " + hri + " as FAILED_OPEN. " +
408 "It's likely that the master already timed out this open " +
409 "attempt, and thus another RS already has the region.");
410 } else {
411 result = true;
412 }
413 } catch (KeeperException e) {
414 LOG.error("Failed transitioning node " + name +
415 " from OPENING to FAILED_OPEN", e);
416 }
417 return result;
418 }
419
420
421
422
423
424
425
426
427
428
429
430
431 public static boolean tryTransitionFromOfflineToFailedOpen(RegionServerServices rsServices,
432 final HRegionInfo hri, final int versionOfOfflineNode) {
433 boolean result = false;
434 final String name = hri.getRegionNameAsString();
435 try {
436 LOG.info("Opening of region " + hri + " failed, transitioning" +
437 " from OFFLINE to FAILED_OPEN in ZK, expecting version " + versionOfOfflineNode);
438 if (ZKAssign.transitionNode(
439 rsServices.getZooKeeper(), hri,
440 rsServices.getServerName(),
441 EventType.M_ZK_REGION_OFFLINE,
442 EventType.RS_ZK_REGION_FAILED_OPEN,
443 versionOfOfflineNode) == -1) {
444 LOG.warn("Unable to mark region " + hri + " as FAILED_OPEN. " +
445 "It's likely that the master already timed out this open " +
446 "attempt, and thus another RS already has the region.");
447 } else {
448 result = true;
449 }
450 } catch (KeeperException e) {
451 LOG.error("Failed transitioning node " + name + " from OFFLINE to FAILED_OPEN", e);
452 }
453 return result;
454 }
455
456
457
458
459
460 HRegion openRegion() {
461 HRegion region = null;
462 try {
463
464
465 region = HRegion.openHRegion(this.regionInfo, this.htd,
466 this.rsServices.getWAL(this.regionInfo),
467 this.server.getConfiguration(),
468 this.rsServices,
469 new CancelableProgressable() {
470 public boolean progress() {
471
472
473
474 return tickleOpening("open_region_progress");
475 }
476 });
477 } catch (Throwable t) {
478
479
480
481 LOG.error(
482 "Failed open of region=" + this.regionInfo.getRegionNameAsString()
483 + ", starting to roll back the global memstore size.", t);
484
485 if (this.rsServices != null) {
486 RegionServerAccounting rsAccounting =
487 this.rsServices.getRegionServerAccounting();
488 if (rsAccounting != null) {
489 rsAccounting.rollbackRegionReplayEditsSize(this.regionInfo.getRegionName());
490 }
491 }
492 }
493 return region;
494 }
495
496 void cleanupFailedOpen(final HRegion region) throws IOException {
497 if (region != null) {
498 this.rsServices.removeFromOnlineRegions(region, null);
499 region.close();
500 }
501 }
502
503 private boolean isRegionStillOpening() {
504 byte[] encodedName = regionInfo.getEncodedNameAsBytes();
505 Boolean action = rsServices.getRegionsInTransitionInRS().get(encodedName);
506 return Boolean.TRUE.equals(action);
507 }
508
509
510
511
512
513
514
515
516
517 boolean transitionZookeeperOfflineToOpening(final String encodedName,
518 int versionOfOfflineNode) {
519
520 try {
521
522 this.version = ZKAssign.transitionNode(server.getZooKeeper(), regionInfo,
523 server.getServerName(), EventType.M_ZK_REGION_OFFLINE,
524 EventType.RS_ZK_REGION_OPENING, versionOfOfflineNode);
525 } catch (KeeperException e) {
526 LOG.error("Error transition from OFFLINE to OPENING for region=" +
527 encodedName, e);
528 this.version = -1;
529 return false;
530 }
531 boolean b = isGoodVersion();
532 if (!b) {
533 LOG.warn("Failed transition from OFFLINE to OPENING for region=" +
534 encodedName);
535 }
536 return b;
537 }
538
539
540
541
542
543
544
545 boolean tickleOpening(final String context) {
546 if (!isRegionStillOpening()) {
547 LOG.warn("Open region aborted since it isn't opening any more");
548 return false;
549 }
550
551 if (!isGoodVersion()) return false;
552 String encodedName = this.regionInfo.getEncodedName();
553 try {
554 this.version =
555 ZKAssign.retransitionNodeOpening(server.getZooKeeper(),
556 this.regionInfo, this.server.getServerName(), this.version, tomActivated);
557 } catch (KeeperException e) {
558 server.abort("Exception refreshing OPENING; region=" + encodedName +
559 ", context=" + context, e);
560 this.version = -1;
561 return false;
562 }
563 boolean b = isGoodVersion();
564 if (!b) {
565 LOG.warn("Failed refreshing OPENING; region=" + encodedName +
566 ", context=" + context);
567 }
568 return b;
569 }
570
571 private boolean isGoodVersion() {
572 return this.version != -1;
573 }
574 }