1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGED;
22 import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGING;
23 import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_MERGE;
24
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.ListIterator;
29 import java.util.Map;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.classification.InterfaceAudience;
34 import org.apache.hadoop.fs.Path;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.HRegionInfo;
37 import org.apache.hadoop.hbase.MetaMutationAnnotation;
38 import org.apache.hadoop.hbase.RegionTransition;
39 import org.apache.hadoop.hbase.Server;
40 import org.apache.hadoop.hbase.ServerName;
41 import org.apache.hadoop.hbase.catalog.CatalogTracker;
42 import org.apache.hadoop.hbase.catalog.MetaEditor;
43 import org.apache.hadoop.hbase.catalog.MetaReader;
44 import org.apache.hadoop.hbase.client.Delete;
45 import org.apache.hadoop.hbase.client.HTable;
46 import org.apache.hadoop.hbase.client.Mutation;
47 import org.apache.hadoop.hbase.client.Put;
48 import org.apache.hadoop.hbase.executor.EventType;
49 import org.apache.hadoop.hbase.regionserver.SplitTransaction.LoggingProgressable;
50 import org.apache.hadoop.hbase.util.Bytes;
51 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
52 import org.apache.hadoop.hbase.util.Pair;
53 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
54 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
55 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
56 import org.apache.zookeeper.KeeperException;
57 import org.apache.zookeeper.KeeperException.NodeExistsException;
58 import org.apache.zookeeper.data.Stat;
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88 @InterfaceAudience.Private
89 public class RegionMergeTransaction {
90 private static final Log LOG = LogFactory.getLog(RegionMergeTransaction.class);
91
92
93 private HRegionInfo mergedRegionInfo;
94
95 private final HRegion region_a;
96 private final HRegion region_b;
97
98 private final Path mergesdir;
99 private int znodeVersion = -1;
100
101 private final boolean forcible;
102
103
104
105
106
107 enum JournalEntry {
108
109
110
111 SET_MERGING_IN_ZK,
112
113
114
115 CREATED_MERGE_DIR,
116
117
118
119 CLOSED_REGION_A,
120
121
122
123 OFFLINED_REGION_A,
124
125
126
127 CLOSED_REGION_B,
128
129
130
131 OFFLINED_REGION_B,
132
133
134
135 STARTED_MERGED_REGION_CREATION,
136
137
138
139
140 PONR
141 }
142
143
144
145
146 private final List<JournalEntry> journal = new ArrayList<JournalEntry>();
147
148 private static IOException closedByOtherException = new IOException(
149 "Failed to close region: already closed by another thread");
150
151 private RegionServerCoprocessorHost rsCoprocessorHost = null;
152
153
154
155
156
157
158
159 public RegionMergeTransaction(final HRegion a, final HRegion b,
160 final boolean forcible) {
161 if (a.getRegionInfo().compareTo(b.getRegionInfo()) <= 0) {
162 this.region_a = a;
163 this.region_b = b;
164 } else {
165 this.region_a = b;
166 this.region_b = a;
167 }
168 this.forcible = forcible;
169 this.mergesdir = region_a.getRegionFileSystem().getMergesDir();
170 }
171
172
173
174
175
176
177
178 public boolean prepare(final RegionServerServices services) {
179 if (!region_a.getTableDesc().getTableName()
180 .equals(region_b.getTableDesc().getTableName())) {
181 LOG.info("Can't merge regions " + region_a + "," + region_b
182 + " because they do not belong to the same table");
183 return false;
184 }
185 if (region_a.getRegionInfo().equals(region_b.getRegionInfo())) {
186 LOG.info("Can't merge the same region " + region_a);
187 return false;
188 }
189 if (!forcible && !HRegionInfo.areAdjacent(region_a.getRegionInfo(),
190 region_b.getRegionInfo())) {
191 String msg = "Skip merging " + this.region_a.getRegionNameAsString()
192 + " and " + this.region_b.getRegionNameAsString()
193 + ", because they are not adjacent.";
194 LOG.info(msg);
195 return false;
196 }
197 if (!this.region_a.isMergeable() || !this.region_b.isMergeable()) {
198 return false;
199 }
200 try {
201 boolean regionAHasMergeQualifier = hasMergeQualifierInMeta(services,
202 region_a.getRegionName());
203 if (regionAHasMergeQualifier ||
204 hasMergeQualifierInMeta(services, region_b.getRegionName())) {
205 LOG.debug("Region " + (regionAHasMergeQualifier ? region_a.getRegionNameAsString()
206 : region_b.getRegionNameAsString())
207 + " is not mergeable because it has merge qualifier in META");
208 return false;
209 }
210 } catch (IOException e) {
211 LOG.warn("Failed judging whether merge transaction is available for "
212 + region_a.getRegionNameAsString() + " and "
213 + region_b.getRegionNameAsString(), e);
214 return false;
215 }
216
217
218
219
220
221
222
223
224 this.mergedRegionInfo = getMergedRegionInfo(region_a.getRegionInfo(),
225 region_b.getRegionInfo());
226 return true;
227 }
228
229
230
231
232
233
234
235
236
237
238
239
240 public HRegion execute(final Server server,
241 final RegionServerServices services) throws IOException {
242 if (rsCoprocessorHost == null) {
243 rsCoprocessorHost = server != null ? ((HRegionServer) server).getCoprocessorHost() : null;
244 }
245 HRegion mergedRegion = createMergedRegion(server, services);
246 if (rsCoprocessorHost != null) {
247 rsCoprocessorHost.postMergeCommit(this.region_a, this.region_b, mergedRegion);
248 }
249 return stepsAfterPONR(server, services, mergedRegion);
250 }
251
252 public HRegion stepsAfterPONR(final Server server, final RegionServerServices services,
253 HRegion mergedRegion) throws IOException {
254 openMergedRegion(server, services, mergedRegion);
255 transitionZKNode(server, services, mergedRegion);
256 return mergedRegion;
257 }
258
259
260
261
262
263
264
265
266
267
268 HRegion createMergedRegion(final Server server,
269 final RegionServerServices services) throws IOException {
270 LOG.info("Starting merge of " + region_a + " and "
271 + region_b.getRegionNameAsString() + ", forcible=" + forcible);
272 if ((server != null && server.isStopped())
273 || (services != null && services.isStopping())) {
274 throw new IOException("Server is stopped or stopping");
275 }
276
277 if (rsCoprocessorHost != null) {
278 if (rsCoprocessorHost.preMerge(this.region_a, this.region_b)) {
279 throw new IOException("Coprocessor bypassing regions " + this.region_a + " "
280 + this.region_b + " merge.");
281 }
282 }
283
284
285 boolean testing = server == null ? true : server.getConfiguration()
286 .getBoolean("hbase.testing.nocluster", false);
287
288 HRegion mergedRegion = stepsBeforePONR(server, services, testing);
289
290 @MetaMutationAnnotation
291 List<Mutation> metaEntries = new ArrayList<Mutation>();
292 if (rsCoprocessorHost != null) {
293 if (rsCoprocessorHost.preMergeCommit(this.region_a, this.region_b, metaEntries)) {
294 throw new IOException("Coprocessor bypassing regions " + this.region_a + " "
295 + this.region_b + " merge.");
296 }
297 try {
298 for (Mutation p : metaEntries) {
299 HRegionInfo.parseRegionName(p.getRow());
300 }
301 } catch (IOException e) {
302 LOG.error("Row key of mutation from coprocessor is not parsable as region name."
303 + "Mutations from coprocessor should only be for hbase:meta table.", e);
304 throw e;
305 }
306 }
307
308
309
310
311 this.journal.add(JournalEntry.PONR);
312
313
314
315
316
317
318 if (!testing) {
319 if (metaEntries.isEmpty()) {
320 MetaEditor.mergeRegions(server.getCatalogTracker(), mergedRegion.getRegionInfo(), region_a
321 .getRegionInfo(), region_b.getRegionInfo(), server.getServerName());
322 } else {
323 mergeRegionsAndPutMetaEntries(server.getCatalogTracker(), mergedRegion.getRegionInfo(),
324 region_a.getRegionInfo(), region_b.getRegionInfo(), server.getServerName(), metaEntries);
325 }
326 }
327 return mergedRegion;
328 }
329
330 private void mergeRegionsAndPutMetaEntries(CatalogTracker catalogTracker,
331 HRegionInfo mergedRegion, HRegionInfo regionA, HRegionInfo regionB, ServerName serverName,
332 List<Mutation> metaEntries) throws IOException {
333 prepareMutationsForMerge(mergedRegion, regionA, regionB, serverName, metaEntries);
334 MetaEditor.mutateMetaTable(catalogTracker, metaEntries);
335 }
336
337 public void prepareMutationsForMerge(HRegionInfo mergedRegion, HRegionInfo regionA,
338 HRegionInfo regionB, ServerName serverName, List<Mutation> mutations) throws IOException {
339 HRegionInfo copyOfMerged = new HRegionInfo(mergedRegion);
340
341
342 Put putOfMerged = MetaEditor.makePutFromRegionInfo(copyOfMerged);
343 putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER, regionA.toByteArray());
344 putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER, regionB.toByteArray());
345 mutations.add(putOfMerged);
346
347 Delete deleteA = MetaEditor.makeDeleteFromRegionInfo(regionA);
348 Delete deleteB = MetaEditor.makeDeleteFromRegionInfo(regionB);
349 mutations.add(deleteA);
350 mutations.add(deleteB);
351
352 addLocation(putOfMerged, serverName, 1);
353 }
354
355 public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
356 p.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes
357 .toBytes(sn.getHostAndPort()));
358 p.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER, Bytes.toBytes(sn
359 .getStartcode()));
360 p.add(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER, Bytes.toBytes(openSeqNum));
361 return p;
362 }
363
364 public HRegion stepsBeforePONR(final Server server, final RegionServerServices services,
365 boolean testing) throws IOException {
366
367
368 if (server != null && server.getZooKeeper() != null) {
369 try {
370 createNodeMerging(server.getZooKeeper(), this.mergedRegionInfo,
371 server.getServerName(), region_a.getRegionInfo(), region_b.getRegionInfo());
372 } catch (KeeperException e) {
373 throw new IOException("Failed creating PENDING_MERGE znode on "
374 + this.mergedRegionInfo.getRegionNameAsString(), e);
375 }
376 }
377 this.journal.add(JournalEntry.SET_MERGING_IN_ZK);
378 if (server != null && server.getZooKeeper() != null) {
379
380
381
382 znodeVersion = getZKNode(server, services);
383 }
384
385 this.region_a.getRegionFileSystem().createMergesDir();
386 this.journal.add(JournalEntry.CREATED_MERGE_DIR);
387
388 Map<byte[], List<StoreFile>> hstoreFilesOfRegionA = closeAndOfflineRegion(
389 services, this.region_a, true, testing);
390 Map<byte[], List<StoreFile>> hstoreFilesOfRegionB = closeAndOfflineRegion(
391 services, this.region_b, false, testing);
392
393 assert hstoreFilesOfRegionA != null && hstoreFilesOfRegionB != null;
394
395
396
397
398
399
400 mergeStoreFiles(hstoreFilesOfRegionA, hstoreFilesOfRegionB);
401
402 if (server != null && server.getZooKeeper() != null) {
403 try {
404
405
406 this.znodeVersion = transitionMergingNode(server.getZooKeeper(),
407 this.mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo(),
408 server.getServerName(), this.znodeVersion,
409 RS_ZK_REGION_MERGING, RS_ZK_REGION_MERGING);
410 } catch (KeeperException e) {
411 throw new IOException("Failed setting MERGING znode on "
412 + this.mergedRegionInfo.getRegionNameAsString(), e);
413 }
414 }
415
416
417
418
419
420 this.journal.add(JournalEntry.STARTED_MERGED_REGION_CREATION);
421 HRegion mergedRegion = createMergedRegionFromMerges(this.region_a,
422 this.region_b, this.mergedRegionInfo);
423 return mergedRegion;
424 }
425
426
427
428
429
430
431
432
433
434
435 HRegion createMergedRegionFromMerges(final HRegion a, final HRegion b,
436 final HRegionInfo mergedRegion) throws IOException {
437 return a.createMergedRegionFromMerges(mergedRegion, b);
438 }
439
440
441
442
443
444
445
446
447
448
449 private Map<byte[], List<StoreFile>> closeAndOfflineRegion(
450 final RegionServerServices services, final HRegion region,
451 final boolean isRegionA, final boolean testing) throws IOException {
452 Map<byte[], List<StoreFile>> hstoreFilesToMerge = null;
453 Exception exceptionToThrow = null;
454 try {
455 hstoreFilesToMerge = region.close(false);
456 } catch (Exception e) {
457 exceptionToThrow = e;
458 }
459 if (exceptionToThrow == null && hstoreFilesToMerge == null) {
460
461
462
463
464
465 exceptionToThrow = closedByOtherException;
466 }
467 if (exceptionToThrow != closedByOtherException) {
468 this.journal.add(isRegionA ? JournalEntry.CLOSED_REGION_A
469 : JournalEntry.CLOSED_REGION_B);
470 }
471 if (exceptionToThrow != null) {
472 if (exceptionToThrow instanceof IOException)
473 throw (IOException) exceptionToThrow;
474 throw new IOException(exceptionToThrow);
475 }
476
477 if (!testing) {
478 services.removeFromOnlineRegions(region, null);
479 }
480 this.journal.add(isRegionA ? JournalEntry.OFFLINED_REGION_A
481 : JournalEntry.OFFLINED_REGION_B);
482 return hstoreFilesToMerge;
483 }
484
485
486
487
488
489
490
491 public static HRegionInfo getMergedRegionInfo(final HRegionInfo a,
492 final HRegionInfo b) {
493 long rid = EnvironmentEdgeManager.currentTimeMillis();
494
495
496 if (rid < a.getRegionId() || rid < b.getRegionId()) {
497 LOG.warn("Clock skew; merging regions id are " + a.getRegionId()
498 + " and " + b.getRegionId() + ", but current time here is " + rid);
499 rid = Math.max(a.getRegionId(), b.getRegionId()) + 1;
500 }
501
502 byte[] startKey = null;
503 byte[] endKey = null;
504
505 if (a.compareTo(b) <= 0) {
506 startKey = a.getStartKey();
507 } else {
508 startKey = b.getStartKey();
509 }
510
511 if (Bytes.equals(a.getEndKey(), HConstants.EMPTY_BYTE_ARRAY)
512 || (!Bytes.equals(b.getEndKey(), HConstants.EMPTY_BYTE_ARRAY)
513 && Bytes.compareTo(a.getEndKey(), b.getEndKey()) > 0)) {
514 endKey = a.getEndKey();
515 } else {
516 endKey = b.getEndKey();
517 }
518
519
520 HRegionInfo mergedRegionInfo = new HRegionInfo(a.getTable(), startKey,
521 endKey, false, rid);
522 return mergedRegionInfo;
523 }
524
525
526
527
528
529
530
531
532
533
534 void openMergedRegion(final Server server,
535 final RegionServerServices services, HRegion merged) throws IOException {
536 boolean stopped = server != null && server.isStopped();
537 boolean stopping = services != null && services.isStopping();
538 if (stopped || stopping) {
539 LOG.info("Not opening merged region " + merged.getRegionNameAsString()
540 + " because stopping=" + stopping + ", stopped=" + stopped);
541 return;
542 }
543 HRegionInfo hri = merged.getRegionInfo();
544 LoggingProgressable reporter = server == null ? null
545 : new LoggingProgressable(hri, server.getConfiguration().getLong(
546 "hbase.regionserver.regionmerge.open.log.interval", 10000));
547 merged.openHRegion(reporter);
548
549 if (services != null) {
550 try {
551 services.postOpenDeployTasks(merged, server.getCatalogTracker());
552 services.addToOnlineRegions(merged);
553 } catch (KeeperException ke) {
554 throw new IOException(ke);
555 }
556 }
557
558 }
559
560
561
562
563
564
565
566
567
568 void transitionZKNode(final Server server, final RegionServerServices services,
569 HRegion mergedRegion) throws IOException {
570 if (server == null || server.getZooKeeper() == null) {
571 return;
572 }
573
574
575 try {
576 this.znodeVersion = transitionMergingNode(server.getZooKeeper(),
577 this.mergedRegionInfo, region_a.getRegionInfo(),
578 region_b.getRegionInfo(), server.getServerName(), this.znodeVersion,
579 RS_ZK_REGION_MERGING, RS_ZK_REGION_MERGED);
580
581 long startTime = EnvironmentEdgeManager.currentTimeMillis();
582 int spins = 0;
583
584
585
586 do {
587 if (spins % 10 == 0) {
588 LOG.debug("Still waiting on the master to process the merge for "
589 + this.mergedRegionInfo.getEncodedName() + ", waited "
590 + (EnvironmentEdgeManager.currentTimeMillis() - startTime) + "ms");
591 }
592 Thread.sleep(100);
593
594 this.znodeVersion = transitionMergingNode(server.getZooKeeper(),
595 this.mergedRegionInfo, region_a.getRegionInfo(),
596 region_b.getRegionInfo(), server.getServerName(), this.znodeVersion,
597 RS_ZK_REGION_MERGED, RS_ZK_REGION_MERGED);
598 spins++;
599 } while (this.znodeVersion != -1 && !server.isStopped()
600 && !services.isStopping());
601 } catch (Exception e) {
602 if (e instanceof InterruptedException) {
603 Thread.currentThread().interrupt();
604 }
605 throw new IOException("Failed telling master about merge "
606 + mergedRegionInfo.getEncodedName(), e);
607 }
608
609 if (rsCoprocessorHost != null) {
610 rsCoprocessorHost.postMerge(this.region_a, this.region_b, mergedRegion);
611 }
612
613
614
615
616 }
617
618
619
620
621
622
623
624
625
626 private int getZKNode(final Server server,
627 final RegionServerServices services) throws IOException {
628
629 try {
630 int spins = 0;
631 Stat stat = new Stat();
632 ZooKeeperWatcher zkw = server.getZooKeeper();
633 ServerName expectedServer = server.getServerName();
634 String node = mergedRegionInfo.getEncodedName();
635 while (!(server.isStopped() || services.isStopping())) {
636 if (spins % 5 == 0) {
637 LOG.debug("Still waiting for master to process "
638 + "the pending_merge for " + node);
639 transitionMergingNode(zkw, mergedRegionInfo, region_a.getRegionInfo(),
640 region_b.getRegionInfo(), expectedServer, -1, RS_ZK_REQUEST_REGION_MERGE,
641 RS_ZK_REQUEST_REGION_MERGE);
642 }
643 Thread.sleep(100);
644 spins++;
645 byte [] data = ZKAssign.getDataNoWatch(zkw, node, stat);
646 if (data == null) {
647 throw new IOException("Data is null, merging node "
648 + node + " no longer exists");
649 }
650 RegionTransition rt = RegionTransition.parseFrom(data);
651 EventType et = rt.getEventType();
652 if (et == RS_ZK_REGION_MERGING) {
653 ServerName serverName = rt.getServerName();
654 if (!serverName.equals(expectedServer)) {
655 throw new IOException("Merging node " + node + " is for "
656 + serverName + ", not us " + expectedServer);
657 }
658 byte [] payloadOfMerging = rt.getPayload();
659 List<HRegionInfo> mergingRegions = HRegionInfo.parseDelimitedFrom(
660 payloadOfMerging, 0, payloadOfMerging.length);
661 assert mergingRegions.size() == 3;
662 HRegionInfo a = mergingRegions.get(1);
663 HRegionInfo b = mergingRegions.get(2);
664 HRegionInfo hri_a = region_a.getRegionInfo();
665 HRegionInfo hri_b = region_b.getRegionInfo();
666 if (!(hri_a.equals(a) && hri_b.equals(b))) {
667 throw new IOException("Merging node " + node + " is for " + a + ", "
668 + b + ", not expected regions: " + hri_a + ", " + hri_b);
669 }
670
671 return stat.getVersion();
672 }
673 if (et != RS_ZK_REQUEST_REGION_MERGE) {
674 throw new IOException("Merging node " + node
675 + " moved out of merging to " + et);
676 }
677 }
678
679 throw new IOException("Server is "
680 + (services.isStopping() ? "stopping" : "stopped"));
681 } catch (Exception e) {
682 if (e instanceof InterruptedException) {
683 Thread.currentThread().interrupt();
684 }
685 throw new IOException("Failed getting MERGING znode on "
686 + mergedRegionInfo.getRegionNameAsString(), e);
687 }
688 }
689
690
691
692
693
694
695
696 private void mergeStoreFiles(
697 Map<byte[], List<StoreFile>> hstoreFilesOfRegionA,
698 Map<byte[], List<StoreFile>> hstoreFilesOfRegionB)
699 throws IOException {
700
701 HRegionFileSystem fs_a = this.region_a.getRegionFileSystem();
702 for (Map.Entry<byte[], List<StoreFile>> entry : hstoreFilesOfRegionA
703 .entrySet()) {
704 String familyName = Bytes.toString(entry.getKey());
705 for (StoreFile storeFile : entry.getValue()) {
706 fs_a.mergeStoreFile(this.mergedRegionInfo, familyName, storeFile,
707 this.mergesdir);
708 }
709 }
710
711 HRegionFileSystem fs_b = this.region_b.getRegionFileSystem();
712 for (Map.Entry<byte[], List<StoreFile>> entry : hstoreFilesOfRegionB
713 .entrySet()) {
714 String familyName = Bytes.toString(entry.getKey());
715 for (StoreFile storeFile : entry.getValue()) {
716 fs_b.mergeStoreFile(this.mergedRegionInfo, familyName, storeFile,
717 this.mergesdir);
718 }
719 }
720 }
721
722
723
724
725
726
727
728
729
730 @SuppressWarnings("deprecation")
731 public boolean rollback(final Server server,
732 final RegionServerServices services) throws IOException {
733 assert this.mergedRegionInfo != null;
734
735 if (rsCoprocessorHost != null) {
736 rsCoprocessorHost.preRollBackMerge(this.region_a, this.region_b);
737 }
738
739 boolean result = true;
740 ListIterator<JournalEntry> iterator = this.journal
741 .listIterator(this.journal.size());
742
743 while (iterator.hasPrevious()) {
744 JournalEntry je = iterator.previous();
745 switch (je) {
746
747 case SET_MERGING_IN_ZK:
748 if (server != null && server.getZooKeeper() != null) {
749 cleanZK(server, this.mergedRegionInfo);
750 }
751 break;
752
753 case CREATED_MERGE_DIR:
754 this.region_a.writestate.writesEnabled = true;
755 this.region_b.writestate.writesEnabled = true;
756 this.region_a.getRegionFileSystem().cleanupMergesDir();
757 break;
758
759 case CLOSED_REGION_A:
760 try {
761
762
763
764
765
766 this.region_a.initialize();
767 } catch (IOException e) {
768 LOG.error("Failed rollbacking CLOSED_REGION_A of region "
769 + this.region_a.getRegionNameAsString(), e);
770 throw new RuntimeException(e);
771 }
772 break;
773
774 case OFFLINED_REGION_A:
775 if (services != null)
776 services.addToOnlineRegions(this.region_a);
777 break;
778
779 case CLOSED_REGION_B:
780 try {
781 this.region_b.initialize();
782 } catch (IOException e) {
783 LOG.error("Failed rollbacking CLOSED_REGION_A of region "
784 + this.region_b.getRegionNameAsString(), e);
785 throw new RuntimeException(e);
786 }
787 break;
788
789 case OFFLINED_REGION_B:
790 if (services != null)
791 services.addToOnlineRegions(this.region_b);
792 break;
793
794 case STARTED_MERGED_REGION_CREATION:
795 this.region_a.getRegionFileSystem().cleanupMergedRegion(
796 this.mergedRegionInfo);
797 break;
798
799 case PONR:
800
801
802 return false;
803
804 default:
805 throw new RuntimeException("Unhandled journal entry: " + je);
806 }
807 }
808
809 if (rsCoprocessorHost != null) {
810 rsCoprocessorHost.postRollBackMerge(this.region_a, this.region_b);
811 }
812
813 return result;
814 }
815
816 HRegionInfo getMergedRegionInfo() {
817 return this.mergedRegionInfo;
818 }
819
820
821 Path getMergesDir() {
822 return this.mergesdir;
823 }
824
825 private static void cleanZK(final Server server, final HRegionInfo hri) {
826 try {
827
828 if (!ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
829 RS_ZK_REQUEST_REGION_MERGE, server.getServerName())) {
830 ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
831 RS_ZK_REGION_MERGING, server.getServerName());
832 }
833 } catch (KeeperException.NoNodeException e) {
834 LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
835 } catch (KeeperException e) {
836 server.abort("Failed cleanup zk node of " + hri.getRegionNameAsString(),e);
837 }
838 }
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854 public static void createNodeMerging(final ZooKeeperWatcher zkw, final HRegionInfo region,
855 final ServerName serverName, final HRegionInfo a,
856 final HRegionInfo b) throws KeeperException, IOException {
857 LOG.debug(zkw.prefix("Creating ephemeral node for "
858 + region.getEncodedName() + " in PENDING_MERGE state"));
859 byte [] payload = HRegionInfo.toDelimitedByteArray(region, a, b);
860 RegionTransition rt = RegionTransition.createRegionTransition(
861 RS_ZK_REQUEST_REGION_MERGE, region.getRegionName(), serverName, payload);
862 String node = ZKAssign.getNodeName(zkw, region.getEncodedName());
863 if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, rt.toByteArray())) {
864 throw new IOException("Failed create of ephemeral " + node);
865 }
866 }
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907 public static int transitionMergingNode(ZooKeeperWatcher zkw,
908 HRegionInfo merged, HRegionInfo a, HRegionInfo b, ServerName serverName,
909 final int znodeVersion, final EventType beginState,
910 final EventType endState) throws KeeperException, IOException {
911 byte[] payload = HRegionInfo.toDelimitedByteArray(merged, a, b);
912 return ZKAssign.transitionNode(zkw, merged, serverName,
913 beginState, endState, znodeVersion, payload);
914 }
915
916
917
918
919
920
921
922
923
924 boolean hasMergeQualifierInMeta(final RegionServerServices services,
925 final byte[] regionName) throws IOException {
926 if (services == null) return false;
927
928
929 Pair<HRegionInfo, HRegionInfo> mergeRegions = MetaReader
930 .getRegionsFromMergeQualifier(services.getCatalogTracker(), regionName);
931 if (mergeRegions != null &&
932 (mergeRegions.getFirst() != null || mergeRegions.getSecond() != null)) {
933
934 return true;
935 }
936 return false;
937 }
938 }