View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements. See the NOTICE file distributed with this
6    * work for additional information regarding copyright ownership. The ASF
7    * licenses this file to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16   * License for the specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGED;
22  import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGING;
23  import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_MERGE;
24  
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.List;
28  import java.util.ListIterator;
29  import java.util.Map;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.classification.InterfaceAudience;
34  import org.apache.hadoop.fs.Path;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.HRegionInfo;
37  import org.apache.hadoop.hbase.MetaMutationAnnotation;
38  import org.apache.hadoop.hbase.RegionTransition;
39  import org.apache.hadoop.hbase.Server;
40  import org.apache.hadoop.hbase.ServerName;
41  import org.apache.hadoop.hbase.catalog.CatalogTracker;
42  import org.apache.hadoop.hbase.catalog.MetaEditor;
43  import org.apache.hadoop.hbase.catalog.MetaReader;
44  import org.apache.hadoop.hbase.client.Delete;
45  import org.apache.hadoop.hbase.client.HTable;
46  import org.apache.hadoop.hbase.client.Mutation;
47  import org.apache.hadoop.hbase.client.Put;
48  import org.apache.hadoop.hbase.executor.EventType;
49  import org.apache.hadoop.hbase.regionserver.SplitTransaction.LoggingProgressable;
50  import org.apache.hadoop.hbase.util.Bytes;
51  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
52  import org.apache.hadoop.hbase.util.Pair;
53  import org.apache.hadoop.hbase.zookeeper.ZKAssign;
54  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
55  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
56  import org.apache.zookeeper.KeeperException;
57  import org.apache.zookeeper.KeeperException.NodeExistsException;
58  import org.apache.zookeeper.data.Stat;
59  
/**
 * Executes region merge as a "transaction". It is similar to
 * SplitTransaction. Call {@link #prepare(RegionServerServices)} to set up the
 * transaction, {@link #execute(Server, RegionServerServices)} to run the
 * transaction and {@link #rollback(Server, RegionServerServices)} to clean up if
 * execute fails.
 *
 * <p>
 * Here is an example of how you would use this class:
 *
 * <pre>
 *  RegionMergeTransaction mt = new RegionMergeTransaction(regionA, regionB, forcible);
 *  if (!mt.prepare(services)) return;
 *  try {
 *    mt.execute(server, services);
 *  } catch (IOException ioe) {
 *    try {
 *      mt.rollback(server, services);
 *      return;
 *    } catch (RuntimeException e) {
 *      myAbortable.abort("Failed merge, abort");
 *    }
 *  }
 * </pre>
 * <p>
 * This class is not thread safe. The caller needs to ensure the merge is run by
 * one thread only.
 */
88  @InterfaceAudience.Private
89  public class RegionMergeTransaction {
90    private static final Log LOG = LogFactory.getLog(RegionMergeTransaction.class);
91  
92    // Merged region info
93    private HRegionInfo mergedRegionInfo;
94    // region_a sorts before region_b
95    private final HRegion region_a;
96    private final HRegion region_b;
97    // merges dir is under region_a
98    private final Path mergesdir;
99    private int znodeVersion = -1;
100   // We only merge adjacent regions if forcible is false
101   private final boolean forcible;
102 
103   /**
104    * Types to add to the transaction journal. Each enum is a step in the merge
105    * transaction. Used to figure how much we need to rollback.
106    */
107   enum JournalEntry {
108     /**
109      * Set region as in transition, set it into MERGING state.
110      */
111     SET_MERGING_IN_ZK,
112     /**
113      * We created the temporary merge data directory.
114      */
115     CREATED_MERGE_DIR,
116     /**
117      * Closed the merging region A.
118      */
119     CLOSED_REGION_A,
120     /**
121      * The merging region A has been taken out of the server's online regions list.
122      */
123     OFFLINED_REGION_A,
124     /**
125      * Closed the merging region B.
126      */
127     CLOSED_REGION_B,
128     /**
129      * The merging region B has been taken out of the server's online regions list.
130      */
131     OFFLINED_REGION_B,
132     /**
133      * Started in on creation of the merged region.
134      */
135     STARTED_MERGED_REGION_CREATION,
136     /**
137      * Point of no return. If we got here, then transaction is not recoverable
138      * other than by crashing out the regionserver.
139      */
140     PONR
141   }
142 
143   /*
144    * Journal of how far the merge transaction has progressed.
145    */
146   private final List<JournalEntry> journal = new ArrayList<JournalEntry>();
147 
148   private static IOException closedByOtherException = new IOException(
149       "Failed to close region: already closed by another thread");
150 
151   private RegionServerCoprocessorHost rsCoprocessorHost = null;
152 
153   /**
154    * Constructor
155    * @param a region a to merge
156    * @param b region b to merge
157    * @param forcible if false, we will only merge adjacent regions
158    */
159   public RegionMergeTransaction(final HRegion a, final HRegion b,
160       final boolean forcible) {
161     if (a.getRegionInfo().compareTo(b.getRegionInfo()) <= 0) {
162       this.region_a = a;
163       this.region_b = b;
164     } else {
165       this.region_a = b;
166       this.region_b = a;
167     }
168     this.forcible = forcible;
169     this.mergesdir = region_a.getRegionFileSystem().getMergesDir();
170   }
171 
172   /**
173    * Does checks on merge inputs.
174    * @param services
175    * @return <code>true</code> if the regions are mergeable else
176    *         <code>false</code> if they are not (e.g. its already closed, etc.).
177    */
178   public boolean prepare(final RegionServerServices services) {
179     if (!region_a.getTableDesc().getTableName()
180         .equals(region_b.getTableDesc().getTableName())) {
181       LOG.info("Can't merge regions " + region_a + "," + region_b
182           + " because they do not belong to the same table");
183       return false;
184     }
185     if (region_a.getRegionInfo().equals(region_b.getRegionInfo())) {
186       LOG.info("Can't merge the same region " + region_a);
187       return false;
188     }
189     if (!forcible && !HRegionInfo.areAdjacent(region_a.getRegionInfo(),
190             region_b.getRegionInfo())) {
191       String msg = "Skip merging " + this.region_a.getRegionNameAsString()
192           + " and " + this.region_b.getRegionNameAsString()
193           + ", because they are not adjacent.";
194       LOG.info(msg);
195       return false;
196     }
197     if (!this.region_a.isMergeable() || !this.region_b.isMergeable()) {
198       return false;
199     }
200     try {
201       boolean regionAHasMergeQualifier = hasMergeQualifierInMeta(services,
202           region_a.getRegionName());
203       if (regionAHasMergeQualifier ||
204           hasMergeQualifierInMeta(services, region_b.getRegionName())) {
205         LOG.debug("Region " + (regionAHasMergeQualifier ? region_a.getRegionNameAsString()
206                 : region_b.getRegionNameAsString())
207             + " is not mergeable because it has merge qualifier in META");
208         return false;
209       }
210     } catch (IOException e) {
211       LOG.warn("Failed judging whether merge transaction is available for "
212               + region_a.getRegionNameAsString() + " and "
213               + region_b.getRegionNameAsString(), e);
214       return false;
215     }
216 
217     // WARN: make sure there is no parent region of the two merging regions in
218     // hbase:meta If exists, fixing up daughters would cause daughter regions(we
219     // have merged one) online again when we restart master, so we should clear
220     // the parent region to prevent the above case
221     // Since HBASE-7721, we don't need fix up daughters any more. so here do
222     // nothing
223 
224     this.mergedRegionInfo = getMergedRegionInfo(region_a.getRegionInfo(),
225         region_b.getRegionInfo());
226     return true;
227   }
228 
229   /**
230    * Run the transaction.
231    * @param server Hosting server instance. Can be null when testing (won't try
232    *          and update in zk if a null server)
233    * @param services Used to online/offline regions.
234    * @throws IOException If thrown, transaction failed. Call
235    *           {@link #rollback(Server, RegionServerServices)}
236    * @return merged region
237    * @throws IOException
238    * @see #rollback(Server, RegionServerServices)
239    */
240   public HRegion execute(final Server server,
241       final RegionServerServices services) throws IOException {
242     if (rsCoprocessorHost == null) {
243       rsCoprocessorHost = server != null ? ((HRegionServer) server).getCoprocessorHost() : null;
244     }
245     HRegion mergedRegion = createMergedRegion(server, services);
246     if (rsCoprocessorHost != null) {
247       rsCoprocessorHost.postMergeCommit(this.region_a, this.region_b, mergedRegion);
248     }
249     return stepsAfterPONR(server, services, mergedRegion);
250   }
251 
252   public HRegion stepsAfterPONR(final Server server, final RegionServerServices services,
253       HRegion mergedRegion) throws IOException {
254     openMergedRegion(server, services, mergedRegion);
255     transitionZKNode(server, services, mergedRegion);
256     return mergedRegion;
257   }
258 
259   /**
260    * Prepare the merged region and region files.
261    * @param server Hosting server instance. Can be null when testing (won't try
262    *          and update in zk if a null server)
263    * @param services Used to online/offline regions.
264    * @return merged region
265    * @throws IOException If thrown, transaction failed. Call
266    *           {@link #rollback(Server, RegionServerServices)}
267    */
268   HRegion createMergedRegion(final Server server,
269       final RegionServerServices services) throws IOException {
270     LOG.info("Starting merge of " + region_a + " and "
271         + region_b.getRegionNameAsString() + ", forcible=" + forcible);
272     if ((server != null && server.isStopped())
273         || (services != null && services.isStopping())) {
274       throw new IOException("Server is stopped or stopping");
275     }
276 
277     if (rsCoprocessorHost != null) {
278       if (rsCoprocessorHost.preMerge(this.region_a, this.region_b)) {
279         throw new IOException("Coprocessor bypassing regions " + this.region_a + " "
280             + this.region_b + " merge.");
281       }
282     }
283 
284     // If true, no cluster to write meta edits to or to update znodes in.
285     boolean testing = server == null ? true : server.getConfiguration()
286         .getBoolean("hbase.testing.nocluster", false);
287 
288     HRegion mergedRegion = stepsBeforePONR(server, services, testing);
289 
290     @MetaMutationAnnotation
291     List<Mutation> metaEntries = new ArrayList<Mutation>();
292     if (rsCoprocessorHost != null) {
293       if (rsCoprocessorHost.preMergeCommit(this.region_a, this.region_b, metaEntries)) {
294         throw new IOException("Coprocessor bypassing regions " + this.region_a + " "
295             + this.region_b + " merge.");
296       }
297       try {
298         for (Mutation p : metaEntries) {
299           HRegionInfo.parseRegionName(p.getRow());
300         }
301       } catch (IOException e) {
302         LOG.error("Row key of mutation from coprocessor is not parsable as region name."
303             + "Mutations from coprocessor should only be for hbase:meta table.", e);
304         throw e;
305       }
306     }
307 
308     // This is the point of no return. Similar with SplitTransaction.
309     // IF we reach the PONR then subsequent failures need to crash out this
310     // regionserver
311     this.journal.add(JournalEntry.PONR);
312 
313     // Add merged region and delete region_a and region_b
314     // as an atomic update. See HBASE-7721. This update to hbase:meta makes the region
315     // will determine whether the region is merged or not in case of failures.
316     // If it is successful, master will roll-forward, if not, master will
317     // rollback
318     if (!testing) {
319       if (metaEntries.isEmpty()) {
320         MetaEditor.mergeRegions(server.getCatalogTracker(), mergedRegion.getRegionInfo(), region_a
321             .getRegionInfo(), region_b.getRegionInfo(), server.getServerName());
322       } else {
323         mergeRegionsAndPutMetaEntries(server.getCatalogTracker(), mergedRegion.getRegionInfo(),
324           region_a.getRegionInfo(), region_b.getRegionInfo(), server.getServerName(), metaEntries);
325       }
326     }
327     return mergedRegion;
328   }
329 
330   private void mergeRegionsAndPutMetaEntries(CatalogTracker catalogTracker,
331       HRegionInfo mergedRegion, HRegionInfo regionA, HRegionInfo regionB, ServerName serverName,
332       List<Mutation> metaEntries) throws IOException {
333     prepareMutationsForMerge(mergedRegion, regionA, regionB, serverName, metaEntries);
334     MetaEditor.mutateMetaTable(catalogTracker, metaEntries);
335   }
336 
337   public void prepareMutationsForMerge(HRegionInfo mergedRegion, HRegionInfo regionA,
338       HRegionInfo regionB, ServerName serverName, List<Mutation> mutations) throws IOException {
339     HRegionInfo copyOfMerged = new HRegionInfo(mergedRegion);
340 
341     // Put for parent
342     Put putOfMerged = MetaEditor.makePutFromRegionInfo(copyOfMerged);
343     putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER, regionA.toByteArray());
344     putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER, regionB.toByteArray());
345     mutations.add(putOfMerged);
346     // Deletes for merging regions
347     Delete deleteA = MetaEditor.makeDeleteFromRegionInfo(regionA);
348     Delete deleteB = MetaEditor.makeDeleteFromRegionInfo(regionB);
349     mutations.add(deleteA);
350     mutations.add(deleteB);
351     // The merged is a new region, openSeqNum = 1 is fine.
352     addLocation(putOfMerged, serverName, 1);
353   }
354 
355   public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
356     p.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes
357         .toBytes(sn.getHostAndPort()));
358     p.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER, Bytes.toBytes(sn
359         .getStartcode()));
360     p.add(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER, Bytes.toBytes(openSeqNum));
361     return p;
362   }
363 
364   public HRegion stepsBeforePONR(final Server server, final RegionServerServices services,
365       boolean testing) throws IOException {
366     // Set ephemeral MERGING znode up in zk. Mocked servers sometimes don't
367     // have zookeeper so don't do zk stuff if server or zookeeper is null
368     if (server != null && server.getZooKeeper() != null) {
369       try {
370         createNodeMerging(server.getZooKeeper(), this.mergedRegionInfo,
371           server.getServerName(), region_a.getRegionInfo(), region_b.getRegionInfo());
372       } catch (KeeperException e) {
373         throw new IOException("Failed creating PENDING_MERGE znode on "
374             + this.mergedRegionInfo.getRegionNameAsString(), e);
375       }
376     }
377     this.journal.add(JournalEntry.SET_MERGING_IN_ZK);
378     if (server != null && server.getZooKeeper() != null) {
379       // After creating the merge node, wait for master to transition it
380       // from PENDING_MERGE to MERGING so that we can move on. We want master
381       // knows about it and won't transition any region which is merging.
382       znodeVersion = getZKNode(server, services);
383     }
384 
385     this.region_a.getRegionFileSystem().createMergesDir();
386     this.journal.add(JournalEntry.CREATED_MERGE_DIR);
387 
388     Map<byte[], List<StoreFile>> hstoreFilesOfRegionA = closeAndOfflineRegion(
389         services, this.region_a, true, testing);
390     Map<byte[], List<StoreFile>> hstoreFilesOfRegionB = closeAndOfflineRegion(
391         services, this.region_b, false, testing);
392 
393     assert hstoreFilesOfRegionA != null && hstoreFilesOfRegionB != null;
394 
395 
396     //
397     // mergeStoreFiles creates merged region dirs under the region_a merges dir
398     // Nothing to unroll here if failure -- clean up of CREATE_MERGE_DIR will
399     // clean this up.
400     mergeStoreFiles(hstoreFilesOfRegionA, hstoreFilesOfRegionB);
401 
402     if (server != null && server.getZooKeeper() != null) {
403       try {
404         // Do one more check on the merging znode (before it is too late) in case
405         // any merging region is moved somehow. If so, the znode transition will fail.
406         this.znodeVersion = transitionMergingNode(server.getZooKeeper(),
407           this.mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo(),
408           server.getServerName(), this.znodeVersion,
409           RS_ZK_REGION_MERGING, RS_ZK_REGION_MERGING);
410       } catch (KeeperException e) {
411         throw new IOException("Failed setting MERGING znode on "
412             + this.mergedRegionInfo.getRegionNameAsString(), e);
413       }
414     }
415 
416     // Log to the journal that we are creating merged region. We could fail
417     // halfway through. If we do, we could have left
418     // stuff in fs that needs cleanup -- a storefile or two. Thats why we
419     // add entry to journal BEFORE rather than AFTER the change.
420     this.journal.add(JournalEntry.STARTED_MERGED_REGION_CREATION);
421     HRegion mergedRegion = createMergedRegionFromMerges(this.region_a,
422         this.region_b, this.mergedRegionInfo);
423     return mergedRegion;
424   }
425 
426   /**
427    * Create a merged region from the merges directory under region a. In order
428    * to mock it for tests, place it with a new method.
429    * @param a hri of region a
430    * @param b hri of region b
431    * @param mergedRegion hri of merged region
432    * @return merged HRegion.
433    * @throws IOException
434    */
435   HRegion createMergedRegionFromMerges(final HRegion a, final HRegion b,
436       final HRegionInfo mergedRegion) throws IOException {
437     return a.createMergedRegionFromMerges(mergedRegion, b);
438   }
439 
440   /**
441    * Close the merging region and offline it in regionserver
442    * @param services
443    * @param region
444    * @param isRegionA true if it is merging region a, false if it is region b
445    * @param testing true if it is testing
446    * @return a map of family name to list of store files
447    * @throws IOException
448    */
449   private Map<byte[], List<StoreFile>> closeAndOfflineRegion(
450       final RegionServerServices services, final HRegion region,
451       final boolean isRegionA, final boolean testing) throws IOException {
452     Map<byte[], List<StoreFile>> hstoreFilesToMerge = null;
453     Exception exceptionToThrow = null;
454     try {
455       hstoreFilesToMerge = region.close(false);
456     } catch (Exception e) {
457       exceptionToThrow = e;
458     }
459     if (exceptionToThrow == null && hstoreFilesToMerge == null) {
460       // The region was closed by a concurrent thread. We can't continue
461       // with the merge, instead we must just abandon the merge. If we
462       // reopen or merge this could cause problems because the region has
463       // probably already been moved to a different server, or is in the
464       // process of moving to a different server.
465       exceptionToThrow = closedByOtherException;
466     }
467     if (exceptionToThrow != closedByOtherException) {
468       this.journal.add(isRegionA ? JournalEntry.CLOSED_REGION_A
469           : JournalEntry.CLOSED_REGION_B);
470     }
471     if (exceptionToThrow != null) {
472       if (exceptionToThrow instanceof IOException)
473         throw (IOException) exceptionToThrow;
474       throw new IOException(exceptionToThrow);
475     }
476 
477     if (!testing) {
478       services.removeFromOnlineRegions(region, null);
479     }
480     this.journal.add(isRegionA ? JournalEntry.OFFLINED_REGION_A
481         : JournalEntry.OFFLINED_REGION_B);
482     return hstoreFilesToMerge;
483   }
484 
485   /**
486    * Get merged region info through the specified two regions
487    * @param a merging region A
488    * @param b merging region B
489    * @return the merged region info
490    */
491   public static HRegionInfo getMergedRegionInfo(final HRegionInfo a,
492       final HRegionInfo b) {
493     long rid = EnvironmentEdgeManager.currentTimeMillis();
494     // Regionid is timestamp. Merged region's id can't be less than that of
495     // merging regions else will insert at wrong location in hbase:meta
496     if (rid < a.getRegionId() || rid < b.getRegionId()) {
497       LOG.warn("Clock skew; merging regions id are " + a.getRegionId()
498           + " and " + b.getRegionId() + ", but current time here is " + rid);
499       rid = Math.max(a.getRegionId(), b.getRegionId()) + 1;
500     }
501 
502     byte[] startKey = null;
503     byte[] endKey = null;
504     // Choose the smaller as start key
505     if (a.compareTo(b) <= 0) {
506       startKey = a.getStartKey();
507     } else {
508       startKey = b.getStartKey();
509     }
510     // Choose the bigger as end key
511     if (Bytes.equals(a.getEndKey(), HConstants.EMPTY_BYTE_ARRAY)
512         || (!Bytes.equals(b.getEndKey(), HConstants.EMPTY_BYTE_ARRAY)
513             && Bytes.compareTo(a.getEndKey(), b.getEndKey()) > 0)) {
514       endKey = a.getEndKey();
515     } else {
516       endKey = b.getEndKey();
517     }
518 
519     // Merged region is sorted between two merging regions in META
520     HRegionInfo mergedRegionInfo = new HRegionInfo(a.getTable(), startKey,
521         endKey, false, rid);
522     return mergedRegionInfo;
523   }
524 
525   /**
526    * Perform time consuming opening of the merged region.
527    * @param server Hosting server instance. Can be null when testing (won't try
528    *          and update in zk if a null server)
529    * @param services Used to online/offline regions.
530    * @param merged the merged region
531    * @throws IOException If thrown, transaction failed. Call
532    *           {@link #rollback(Server, RegionServerServices)}
533    */
534   void openMergedRegion(final Server server,
535       final RegionServerServices services, HRegion merged) throws IOException {
536     boolean stopped = server != null && server.isStopped();
537     boolean stopping = services != null && services.isStopping();
538     if (stopped || stopping) {
539       LOG.info("Not opening merged region  " + merged.getRegionNameAsString()
540           + " because stopping=" + stopping + ", stopped=" + stopped);
541       return;
542     }
543     HRegionInfo hri = merged.getRegionInfo();
544     LoggingProgressable reporter = server == null ? null
545         : new LoggingProgressable(hri, server.getConfiguration().getLong(
546             "hbase.regionserver.regionmerge.open.log.interval", 10000));
547     merged.openHRegion(reporter);
548 
549     if (services != null) {
550       try {
551         services.postOpenDeployTasks(merged, server.getCatalogTracker());
552         services.addToOnlineRegions(merged);
553       } catch (KeeperException ke) {
554         throw new IOException(ke);
555       }
556     }
557 
558   }
559 
560   /**
561    * Finish off merge transaction, transition the zknode
562    * @param server Hosting server instance. Can be null when testing (won't try
563    *          and update in zk if a null server)
564    * @param services Used to online/offline regions.
565    * @throws IOException If thrown, transaction failed. Call
566    *           {@link #rollback(Server, RegionServerServices)}
567    */
568   void transitionZKNode(final Server server, final RegionServerServices services,
569       HRegion mergedRegion) throws IOException {
570     if (server == null || server.getZooKeeper() == null) {
571       return;
572     }
573 
574     // Tell master about merge by updating zk. If we fail, abort.
575     try {
576       this.znodeVersion = transitionMergingNode(server.getZooKeeper(),
577         this.mergedRegionInfo, region_a.getRegionInfo(),
578         region_b.getRegionInfo(), server.getServerName(), this.znodeVersion,
579         RS_ZK_REGION_MERGING, RS_ZK_REGION_MERGED);
580 
581       long startTime = EnvironmentEdgeManager.currentTimeMillis();
582       int spins = 0;
583       // Now wait for the master to process the merge. We know it's done
584       // when the znode is deleted. The reason we keep tickling the znode is
585       // that it's possible for the master to miss an event.
586       do {
587         if (spins % 10 == 0) {
588           LOG.debug("Still waiting on the master to process the merge for "
589               + this.mergedRegionInfo.getEncodedName() + ", waited "
590               + (EnvironmentEdgeManager.currentTimeMillis() - startTime) + "ms");
591         }
592         Thread.sleep(100);
593         // When this returns -1 it means the znode doesn't exist
594         this.znodeVersion = transitionMergingNode(server.getZooKeeper(),
595           this.mergedRegionInfo, region_a.getRegionInfo(),
596           region_b.getRegionInfo(), server.getServerName(), this.znodeVersion,
597           RS_ZK_REGION_MERGED, RS_ZK_REGION_MERGED);
598         spins++;
599       } while (this.znodeVersion != -1 && !server.isStopped()
600           && !services.isStopping());
601     } catch (Exception e) {
602       if (e instanceof InterruptedException) {
603         Thread.currentThread().interrupt();
604       }
605       throw new IOException("Failed telling master about merge "
606           + mergedRegionInfo.getEncodedName(), e);
607     }
608 
609     if (rsCoprocessorHost != null) {
610       rsCoprocessorHost.postMerge(this.region_a, this.region_b, mergedRegion);
611     }
612 
613     // Leaving here, the mergedir with its dross will be in place but since the
614     // merge was successful, just leave it; it'll be cleaned when region_a is
615     // cleaned up by CatalogJanitor on master
616   }
617 
618   /**
619    * Wait for the merging node to be transitioned from pending_merge
620    * to merging by master. That's how we are sure master has processed
621    * the event and is good with us to move on. If we don't get any update,
622    * we periodically transition the node so that master gets the callback.
623    * If the node is removed or is not in pending_merge state any more,
624    * we abort the merge.
625    */
626   private int getZKNode(final Server server,
627       final RegionServerServices services) throws IOException {
628     // Wait for the master to process the pending_merge.
629     try {
630       int spins = 0;
631       Stat stat = new Stat();
632       ZooKeeperWatcher zkw = server.getZooKeeper();
633       ServerName expectedServer = server.getServerName();
634       String node = mergedRegionInfo.getEncodedName();
635       while (!(server.isStopped() || services.isStopping())) {
636         if (spins % 5 == 0) {
637           LOG.debug("Still waiting for master to process "
638             + "the pending_merge for " + node);
639           transitionMergingNode(zkw, mergedRegionInfo, region_a.getRegionInfo(),
640             region_b.getRegionInfo(), expectedServer, -1, RS_ZK_REQUEST_REGION_MERGE,
641             RS_ZK_REQUEST_REGION_MERGE);
642         }
643         Thread.sleep(100);
644         spins++;
645         byte [] data = ZKAssign.getDataNoWatch(zkw, node, stat);
646         if (data == null) {
647           throw new IOException("Data is null, merging node "
648             + node + " no longer exists");
649         }
650         RegionTransition rt = RegionTransition.parseFrom(data);
651         EventType et = rt.getEventType();
652         if (et == RS_ZK_REGION_MERGING) {
653           ServerName serverName = rt.getServerName();
654           if (!serverName.equals(expectedServer)) {
655             throw new IOException("Merging node " + node + " is for "
656               + serverName + ", not us " + expectedServer);
657           }
658           byte [] payloadOfMerging = rt.getPayload();
659           List<HRegionInfo> mergingRegions = HRegionInfo.parseDelimitedFrom(
660             payloadOfMerging, 0, payloadOfMerging.length);
661           assert mergingRegions.size() == 3;
662           HRegionInfo a = mergingRegions.get(1);
663           HRegionInfo b = mergingRegions.get(2);
664           HRegionInfo hri_a = region_a.getRegionInfo();
665           HRegionInfo hri_b = region_b.getRegionInfo();
666           if (!(hri_a.equals(a) && hri_b.equals(b))) {
667             throw new IOException("Merging node " + node + " is for " + a + ", "
668               + b + ", not expected regions: " + hri_a + ", " + hri_b);
669           }
670           // Master has processed it.
671           return stat.getVersion();
672         }
673         if (et != RS_ZK_REQUEST_REGION_MERGE) {
674           throw new IOException("Merging node " + node
675             + " moved out of merging to " + et);
676         }
677       }
678       // Server is stopping/stopped
679       throw new IOException("Server is "
680         + (services.isStopping() ? "stopping" : "stopped"));
681     } catch (Exception e) {
682       if (e instanceof InterruptedException) {
683         Thread.currentThread().interrupt();
684       }
685       throw new IOException("Failed getting MERGING znode on "
686         + mergedRegionInfo.getRegionNameAsString(), e);
687     }
688   }
689 
690   /**
691    * Create reference file(s) of merging regions under the region_a merges dir
692    * @param hstoreFilesOfRegionA
693    * @param hstoreFilesOfRegionB
694    * @throws IOException
695    */
696   private void mergeStoreFiles(
697       Map<byte[], List<StoreFile>> hstoreFilesOfRegionA,
698       Map<byte[], List<StoreFile>> hstoreFilesOfRegionB)
699       throws IOException {
700     // Create reference file(s) of region A in mergdir
701     HRegionFileSystem fs_a = this.region_a.getRegionFileSystem();
702     for (Map.Entry<byte[], List<StoreFile>> entry : hstoreFilesOfRegionA
703         .entrySet()) {
704       String familyName = Bytes.toString(entry.getKey());
705       for (StoreFile storeFile : entry.getValue()) {
706         fs_a.mergeStoreFile(this.mergedRegionInfo, familyName, storeFile,
707             this.mergesdir);
708       }
709     }
710     // Create reference file(s) of region B in mergedir
711     HRegionFileSystem fs_b = this.region_b.getRegionFileSystem();
712     for (Map.Entry<byte[], List<StoreFile>> entry : hstoreFilesOfRegionB
713         .entrySet()) {
714       String familyName = Bytes.toString(entry.getKey());
715       for (StoreFile storeFile : entry.getValue()) {
716         fs_b.mergeStoreFile(this.mergedRegionInfo, familyName, storeFile,
717             this.mergesdir);
718       }
719     }
720   }
721 
722   /**
723    * @param server Hosting server instance (May be null when testing).
724    * @param services Services of regionserver, used to online regions.
725    * @throws IOException If thrown, rollback failed. Take drastic action.
726    * @return True if we successfully rolled back, false if we got to the point
727    *         of no return and so now need to abort the server to minimize
728    *         damage.
729    */
730   @SuppressWarnings("deprecation")
731   public boolean rollback(final Server server,
732       final RegionServerServices services) throws IOException {
733     assert this.mergedRegionInfo != null;
734     // Coprocessor callback
735     if (rsCoprocessorHost != null) {
736       rsCoprocessorHost.preRollBackMerge(this.region_a, this.region_b);
737     }
738 
739     boolean result = true;
740     ListIterator<JournalEntry> iterator = this.journal
741         .listIterator(this.journal.size());
742     // Iterate in reverse.
743     while (iterator.hasPrevious()) {
744       JournalEntry je = iterator.previous();
745       switch (je) {
746 
747         case SET_MERGING_IN_ZK:
748           if (server != null && server.getZooKeeper() != null) {
749             cleanZK(server, this.mergedRegionInfo);
750           }
751           break;
752 
753         case CREATED_MERGE_DIR:
754           this.region_a.writestate.writesEnabled = true;
755           this.region_b.writestate.writesEnabled = true;
756           this.region_a.getRegionFileSystem().cleanupMergesDir();
757           break;
758 
759         case CLOSED_REGION_A:
760           try {
761             // So, this returns a seqid but if we just closed and then reopened,
762             // we should be ok. On close, we flushed using sequenceid obtained
763             // from hosting regionserver so no need to propagate the sequenceid
764             // returned out of initialize below up into regionserver as we
765             // normally do.
766             this.region_a.initialize();
767           } catch (IOException e) {
768             LOG.error("Failed rollbacking CLOSED_REGION_A of region "
769                 + this.region_a.getRegionNameAsString(), e);
770             throw new RuntimeException(e);
771           }
772           break;
773 
774         case OFFLINED_REGION_A:
775           if (services != null)
776             services.addToOnlineRegions(this.region_a);
777           break;
778 
779         case CLOSED_REGION_B:
780           try {
781             this.region_b.initialize();
782           } catch (IOException e) {
783             LOG.error("Failed rollbacking CLOSED_REGION_A of region "
784                 + this.region_b.getRegionNameAsString(), e);
785             throw new RuntimeException(e);
786           }
787           break;
788 
789         case OFFLINED_REGION_B:
790           if (services != null)
791             services.addToOnlineRegions(this.region_b);
792           break;
793 
794         case STARTED_MERGED_REGION_CREATION:
795           this.region_a.getRegionFileSystem().cleanupMergedRegion(
796               this.mergedRegionInfo);
797           break;
798 
799         case PONR:
800           // We got to the point-of-no-return so we need to just abort. Return
801           // immediately. Do not clean up created merged regions.
802           return false;
803 
804         default:
805           throw new RuntimeException("Unhandled journal entry: " + je);
806       }
807     }
808     // Coprocessor callback
809     if (rsCoprocessorHost != null) {
810       rsCoprocessorHost.postRollBackMerge(this.region_a, this.region_b);
811     }
812 
813     return result;
814   }
815 
816   HRegionInfo getMergedRegionInfo() {
817     return this.mergedRegionInfo;
818   }
819 
820   // For unit testing.
821   Path getMergesDir() {
822     return this.mergesdir;
823   }
824 
825   private static void cleanZK(final Server server, final HRegionInfo hri) {
826     try {
827       // Only delete if its in expected state; could have been hijacked.
828       if (!ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
829           RS_ZK_REQUEST_REGION_MERGE, server.getServerName())) {
830         ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
831           RS_ZK_REGION_MERGING, server.getServerName());
832       }
833     } catch (KeeperException.NoNodeException e) {
834       LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
835     } catch (KeeperException e) {
836       server.abort("Failed cleanup zk node of " + hri.getRegionNameAsString(),e);
837     }
838   }
839 
840   /**
841    * Creates a new ephemeral node in the PENDING_MERGE state for the merged region.
842    * Create it ephemeral in case regionserver dies mid-merge.
843    *
844    * <p>
845    * Does not transition nodes from other states. If a node already exists for
846    * this region, a {@link NodeExistsException} will be thrown.
847    *
848    * @param zkw zk reference
849    * @param region region to be created as offline
850    * @param serverName server event originates from
851    * @throws KeeperException
852    * @throws IOException
853    */
854   public static void createNodeMerging(final ZooKeeperWatcher zkw, final HRegionInfo region,
855       final ServerName serverName, final HRegionInfo a,
856       final HRegionInfo b) throws KeeperException, IOException {
857     LOG.debug(zkw.prefix("Creating ephemeral node for "
858       + region.getEncodedName() + " in PENDING_MERGE state"));
859     byte [] payload = HRegionInfo.toDelimitedByteArray(region, a, b);
860     RegionTransition rt = RegionTransition.createRegionTransition(
861       RS_ZK_REQUEST_REGION_MERGE, region.getRegionName(), serverName, payload);
862     String node = ZKAssign.getNodeName(zkw, region.getEncodedName());
863     if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, rt.toByteArray())) {
864       throw new IOException("Failed create of ephemeral " + node);
865     }
866   }
867 
868   /**
869    * Transitions an existing ephemeral node for the specified region which is
870    * currently in the begin state to be in the end state. Master cleans up the
871    * final MERGE znode when it reads it (or if we crash, zk will clean it up).
872    *
873    * <p>
874    * Does not transition nodes from other states. If for some reason the node
875    * could not be transitioned, the method returns -1. If the transition is
876    * successful, the version of the node after transition is returned.
877    *
878    * <p>
879    * This method can fail and return false for three different reasons:
880    * <ul>
881    * <li>Node for this region does not exist</li>
882    * <li>Node for this region is not in the begin state</li>
883    * <li>After verifying the begin state, update fails because of wrong version
884    * (this should never actually happen since an RS only does this transition
885    * following a transition to the begin state. If two RS are conflicting, one would
886    * fail the original transition to the begin state and not this transition)</li>
887    * </ul>
888    *
889    * <p>
890    * Does not set any watches.
891    *
892    * <p>
893    * This method should only be used by a RegionServer when merging two regions.
894    *
895    * @param zkw zk reference
896    * @param merged region to be transitioned to opened
897    * @param a merging region A
898    * @param b merging region B
899    * @param serverName server event originates from
900    * @param znodeVersion expected version of data before modification
901    * @param beginState the expected current state the znode should be
902    * @param endState the state to be transition to
903    * @return version of node after transition, -1 if unsuccessful transition
904    * @throws KeeperException if unexpected zookeeper exception
905    * @throws IOException
906    */
907   public static int transitionMergingNode(ZooKeeperWatcher zkw,
908       HRegionInfo merged, HRegionInfo a, HRegionInfo b, ServerName serverName,
909       final int znodeVersion, final EventType beginState,
910       final EventType endState) throws KeeperException, IOException {
911     byte[] payload = HRegionInfo.toDelimitedByteArray(merged, a, b);
912     return ZKAssign.transitionNode(zkw, merged, serverName,
913       beginState, endState, znodeVersion, payload);
914   }
915 
916   /**
917    * Checks if the given region has merge qualifier in hbase:meta
918    * @param services
919    * @param regionName name of specified region
920    * @return true if the given region has merge qualifier in META.(It will be
921    *         cleaned by CatalogJanitor)
922    * @throws IOException
923    */
924   boolean hasMergeQualifierInMeta(final RegionServerServices services,
925       final byte[] regionName) throws IOException {
926     if (services == null) return false;
927     // Get merge regions if it is a merged region and already has merge
928     // qualifier
929     Pair<HRegionInfo, HRegionInfo> mergeRegions = MetaReader
930         .getRegionsFromMergeQualifier(services.getCatalogTracker(), regionName);
931     if (mergeRegions != null &&
932         (mergeRegions.getFirst() != null || mergeRegions.getSecond() != null)) {
933       // It has merge qualifier
934       return true;
935     }
936     return false;
937   }
938 }