1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.lang.reflect.Constructor;
24 import java.lang.reflect.InvocationTargetException;
25 import java.net.InetAddress;
26 import java.net.InetSocketAddress;
27 import java.net.UnknownHostException;
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.Collections;
31 import java.util.Comparator;
32 import java.util.HashSet;
33 import java.util.Iterator;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.Set;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicReference;
39 import java.util.regex.Pattern;
40
41 import javax.servlet.ServletException;
42 import javax.servlet.http.HttpServlet;
43 import javax.servlet.http.HttpServletRequest;
44 import javax.servlet.http.HttpServletResponse;
45
46 import org.apache.commons.logging.Log;
47 import org.apache.commons.logging.LogFactory;
48 import org.apache.hadoop.conf.Configuration;
49 import org.apache.hadoop.fs.Path;
50 import org.apache.hadoop.hbase.ClusterStatus;
51 import org.apache.hadoop.hbase.CoordinatedStateException;
52 import org.apache.hadoop.hbase.CoordinatedStateManager;
53 import org.apache.hadoop.hbase.DoNotRetryIOException;
54 import org.apache.hadoop.hbase.HBaseIOException;
55 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
56 import org.apache.hadoop.hbase.HColumnDescriptor;
57 import org.apache.hadoop.hbase.HConstants;
58 import org.apache.hadoop.hbase.HRegionInfo;
59 import org.apache.hadoop.hbase.HTableDescriptor;
60 import org.apache.hadoop.hbase.MasterNotRunningException;
61 import org.apache.hadoop.hbase.MetaMigrationConvertingToPB;
62 import org.apache.hadoop.hbase.MetaTableAccessor;
63 import org.apache.hadoop.hbase.NamespaceDescriptor;
64 import org.apache.hadoop.hbase.NamespaceNotFoundException;
65 import org.apache.hadoop.hbase.PleaseHoldException;
66 import org.apache.hadoop.hbase.ProcedureInfo;
67 import org.apache.hadoop.hbase.RegionStateListener;
68 import org.apache.hadoop.hbase.Server;
69 import org.apache.hadoop.hbase.ServerLoad;
70 import org.apache.hadoop.hbase.ServerName;
71 import org.apache.hadoop.hbase.TableDescriptors;
72 import org.apache.hadoop.hbase.TableName;
73 import org.apache.hadoop.hbase.TableNotDisabledException;
74 import org.apache.hadoop.hbase.TableNotFoundException;
75 import org.apache.hadoop.hbase.UnknownRegionException;
76 import org.apache.hadoop.hbase.classification.InterfaceAudience;
77 import org.apache.hadoop.hbase.client.MetaScanner;
78 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
79 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
80 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
81 import org.apache.hadoop.hbase.client.Result;
82 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
83 import org.apache.hadoop.hbase.exceptions.DeserializationException;
84 import org.apache.hadoop.hbase.executor.ExecutorType;
85 import org.apache.hadoop.hbase.ipc.RpcServer;
86 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
87 import org.apache.hadoop.hbase.master.MasterRpcServices.BalanceSwitchMode;
88 import org.apache.hadoop.hbase.master.RegionState.State;
89 import org.apache.hadoop.hbase.master.balancer.BalancerChore;
90 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
91 import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
92 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
93 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
94 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
95 import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler;
96 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
97 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
98 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
99 import org.apache.hadoop.hbase.master.procedure.AddColumnFamilyProcedure;
100 import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
101 import org.apache.hadoop.hbase.master.procedure.DeleteColumnFamilyProcedure;
102 import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
103 import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
104 import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
105 import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
106 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
107 import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
108 import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure;
109 import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
110 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
111 import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
112 import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
113 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
114 import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
115 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
116 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
117 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
118 import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
119 import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
120 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
121 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
122 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
123 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
124 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
125 import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
126 import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
127 import org.apache.hadoop.hbase.regionserver.HRegionServer;
128 import org.apache.hadoop.hbase.regionserver.HStore;
129 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
130 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
131 import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
132 import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
133 import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
134 import org.apache.hadoop.hbase.replication.regionserver.Replication;
135 import org.apache.hadoop.hbase.security.UserProvider;
136 import org.apache.hadoop.hbase.util.Addressing;
137 import org.apache.hadoop.hbase.util.Bytes;
138 import org.apache.hadoop.hbase.util.CompressionTest;
139 import org.apache.hadoop.hbase.util.ConfigUtil;
140 import org.apache.hadoop.hbase.util.EncryptionTest;
141 import org.apache.hadoop.hbase.util.FSUtils;
142 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
143 import org.apache.hadoop.hbase.util.HasThread;
144 import org.apache.hadoop.hbase.util.ModifyRegionUtils;
145 import org.apache.hadoop.hbase.util.Pair;
146 import org.apache.hadoop.hbase.util.Threads;
147 import org.apache.hadoop.hbase.util.VersionInfo;
148 import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker;
149 import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
150 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
151 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
152 import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
153 import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
154 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
155 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
156 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
157 import org.apache.zookeeper.KeeperException;
158 import org.mortbay.jetty.Connector;
159 import org.mortbay.jetty.nio.SelectChannelConnector;
160 import org.mortbay.jetty.servlet.Context;
161
162 import com.google.common.annotations.VisibleForTesting;
163 import com.google.common.collect.Maps;
164 import com.google.protobuf.Descriptors;
165 import com.google.protobuf.Service;
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
183 @SuppressWarnings("deprecation")
184 public class HMaster extends HRegionServer implements MasterServices, Server {
185 private static final Log LOG = LogFactory.getLog(HMaster.class.getName());
186
187
188
189
190
191 private static class InitializationMonitor extends HasThread {
192
193 public static final String TIMEOUT_KEY = "hbase.master.initializationmonitor.timeout";
194 public static final long TIMEOUT_DEFAULT = TimeUnit.MILLISECONDS.convert(15, TimeUnit.MINUTES);
195
196
197
198
199
200 public static final String HALT_KEY = "hbase.master.initializationmonitor.haltontimeout";
201 public static final boolean HALT_DEFAULT = false;
202
203 private final HMaster master;
204 private final long timeout;
205 private final boolean haltOnTimeout;
206
207
208 InitializationMonitor(HMaster master) {
209 super("MasterInitializationMonitor");
210 this.master = master;
211 this.timeout = master.getConfiguration().getLong(TIMEOUT_KEY, TIMEOUT_DEFAULT);
212 this.haltOnTimeout = master.getConfiguration().getBoolean(HALT_KEY, HALT_DEFAULT);
213 this.setDaemon(true);
214 }
215
216 @Override
217 public void run() {
218 try {
219 while (!master.isStopped() && master.isActiveMaster()) {
220 Thread.sleep(timeout);
221 if (master.isInitialized()) {
222 LOG.debug("Initialization completed within allotted tolerance. Monitor exiting.");
223 } else {
224 LOG.error("Master failed to complete initialization after " + timeout + "ms. Please"
225 + " consider submitting a bug report including a thread dump of this process.");
226 if (haltOnTimeout) {
227 LOG.error("Zombie Master exiting. Thread dump to stdout");
228 Threads.printThreadInfo(System.out, "Zombie HMaster");
229 System.exit(-1);
230 }
231 }
232 }
233 } catch (InterruptedException ie) {
234 LOG.trace("InitMonitor thread interrupted. Existing.");
235 }
236 }
237 }
238
239
240
241 public static final String MASTER = "master";
242
243
244 private final ActiveMasterManager activeMasterManager;
245
246 RegionServerTracker regionServerTracker;
247
248 private DrainingServerTracker drainingServerTracker;
249
250 LoadBalancerTracker loadBalancerTracker;
251
252
253 private RegionNormalizerTracker regionNormalizerTracker;
254
255
256 private TableNamespaceManager tableNamespaceManager;
257
258
259 final MetricsMaster metricsMaster;
260
261 private MasterFileSystem fileSystemManager;
262
263
264 volatile ServerManager serverManager;
265
266
267 AssignmentManager assignmentManager;
268
269
270
271
272 MemoryBoundedLogMessageBuffer rsFatals;
273
274
275 private volatile boolean isActiveMaster = false;
276
277
278
279 private final ProcedureEvent initialized = new ProcedureEvent("master initialized");
280
281
282
283 volatile boolean serviceStarted = false;
284
285
286 private final ProcedureEvent serverCrashProcessingEnabled =
287 new ProcedureEvent("server crash processing");
288
289 LoadBalancer balancer;
290 private RegionNormalizer normalizer;
291 private BalancerChore balancerChore;
292 private RegionNormalizerChore normalizerChore;
293 private ClusterStatusChore clusterStatusChore;
294 private ClusterStatusPublisher clusterStatusPublisherChore = null;
295
296 CatalogJanitor catalogJanitorChore;
297 private LogCleaner logCleaner;
298 private HFileCleaner hfileCleaner;
299
300 MasterCoprocessorHost cpHost;
301
302 private final boolean preLoadTableDescriptors;
303
304
305 private long masterActiveTime;
306
307
308 private final boolean masterCheckCompression;
309
310
311 private final boolean masterCheckEncryption;
312
313 Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
314
315
316 SnapshotManager snapshotManager;
317
318 MasterProcedureManagerHost mpmHost;
319
320
321 private volatile MasterQuotaManager quotaManager;
322
323 private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
324 private WALProcedureStore procedureStore;
325
326
327 private volatile boolean initializationBeforeMetaAssignment = false;
328
329
330 private org.mortbay.jetty.Server masterJettyServer;
331
332 public static class RedirectServlet extends HttpServlet {
333 private static final long serialVersionUID = 2894774810058302472L;
334 private static int regionServerInfoPort;
335
336 @Override
337 public void doGet(HttpServletRequest request,
338 HttpServletResponse response) throws ServletException, IOException {
339 String redirectUrl = request.getScheme() + "://"
340 + request.getServerName() + ":" + regionServerInfoPort
341 + request.getRequestURI();
342 response.sendRedirect(redirectUrl);
343 }
344 }
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362 public HMaster(final Configuration conf, CoordinatedStateManager csm)
363 throws IOException, KeeperException, InterruptedException {
364 super(conf, csm);
365 this.rsFatals = new MemoryBoundedLogMessageBuffer(
366 conf.getLong("hbase.master.buffer.for.rs.fatals", 1*1024*1024));
367
368 LOG.info("hbase.rootdir=" + FSUtils.getRootDir(this.conf) +
369 ", hbase.cluster.distributed=" + this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false));
370
371
372 this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
373
374 Replication.decorateMasterConfiguration(this.conf);
375
376
377
378 if (this.conf.get("mapreduce.task.attempt.id") == null) {
379 this.conf.set("mapreduce.task.attempt.id", "hb_m_" + this.serverName.toString());
380 }
381
382
383 this.masterCheckCompression = conf.getBoolean("hbase.master.check.compression", true);
384
385
386 this.masterCheckEncryption = conf.getBoolean("hbase.master.check.encryption", true);
387
388 this.metricsMaster = new MetricsMaster(new MetricsMasterWrapperImpl(this));
389
390
391 this.preLoadTableDescriptors = conf.getBoolean("hbase.master.preload.tabledescriptors", true);
392
393
394
395 boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED,
396 HConstants.STATUS_PUBLISHED_DEFAULT);
397 Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
398 conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
399 ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
400 ClusterStatusPublisher.Publisher.class);
401
402 if (shouldPublish) {
403 if (publisherClass == null) {
404 LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
405 ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS +
406 " is not set - not publishing status");
407 } else {
408 clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
409 getChoreService().scheduleChore(clusterStatusPublisherChore);
410 }
411 }
412
413
414 if (!conf.getBoolean("hbase.testing.nocluster", false)) {
415 activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this);
416 int infoPort = putUpJettyServer();
417 startActiveMasterManager(infoPort);
418 } else {
419 activeMasterManager = null;
420 }
421 }
422
423
424 private int putUpJettyServer() throws IOException {
425 if (!conf.getBoolean("hbase.master.infoserver.redirect", true)) {
426 return -1;
427 }
428 int infoPort = conf.getInt("hbase.master.info.port.orig",
429 HConstants.DEFAULT_MASTER_INFOPORT);
430
431 if (infoPort < 0 || infoServer == null) {
432 return -1;
433 }
434 String addr = conf.get("hbase.master.info.bindAddress", "0.0.0.0");
435 if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
436 String msg =
437 "Failed to start redirecting jetty server. Address " + addr
438 + " does not belong to this host. Correct configuration parameter: "
439 + "hbase.master.info.bindAddress";
440 LOG.error(msg);
441 throw new IOException(msg);
442 }
443
444 RedirectServlet.regionServerInfoPort = infoServer.getPort();
445 if(RedirectServlet.regionServerInfoPort == infoPort) {
446 return infoPort;
447 }
448 masterJettyServer = new org.mortbay.jetty.Server();
449 Connector connector = new SelectChannelConnector();
450 connector.setHost(addr);
451 connector.setPort(infoPort);
452 masterJettyServer.addConnector(connector);
453 masterJettyServer.setStopAtShutdown(true);
454 Context context = new Context(masterJettyServer, "/", Context.NO_SESSIONS);
455 context.addServlet(RedirectServlet.class, "/*");
456 try {
457 masterJettyServer.start();
458 } catch (Exception e) {
459 throw new IOException("Failed to start redirecting jetty server", e);
460 }
461 return connector.getLocalPort();
462 }
463
464
465
466
467 @Override
468 protected void login(UserProvider user, String host) throws IOException {
469 try {
470 super.login(user, host);
471 } catch (IOException ie) {
472 user.login("hbase.master.keytab.file",
473 "hbase.master.kerberos.principal", host);
474 }
475 }
476
477
478
479
480
481
482 @Override
483 protected void waitForMasterActive(){
484 boolean tablesOnMaster = BaseLoadBalancer.tablesOnMaster(conf);
485 while (!(tablesOnMaster && isActiveMaster)
486 && !isStopped() && !isAborted()) {
487 sleeper.sleep();
488 }
489 }
490
491 @VisibleForTesting
492 public MasterRpcServices getMasterRpcServices() {
493 return (MasterRpcServices)rpcServices;
494 }
495
496 public boolean balanceSwitch(final boolean b) throws IOException {
497 return getMasterRpcServices().switchBalancer(b, BalanceSwitchMode.ASYNC);
498 }
499
500 @Override
501 protected String getProcessName() {
502 return MASTER;
503 }
504
505 @Override
506 protected boolean canCreateBaseZNode() {
507 return true;
508 }
509
510 @Override
511 protected boolean canUpdateTableDescriptor() {
512 return true;
513 }
514
515 @Override
516 protected RSRpcServices createRpcServices() throws IOException {
517 return new MasterRpcServices(this);
518 }
519
520 @Override
521 protected void configureInfoServer() {
522 infoServer.addServlet("master-status", "/master-status", MasterStatusServlet.class);
523 infoServer.setAttribute(MASTER, this);
524 if (BaseLoadBalancer.tablesOnMaster(conf)) {
525 super.configureInfoServer();
526 }
527 }
528
529 @Override
530 protected Class<? extends HttpServlet> getDumpServlet() {
531 return MasterDumpServlet.class;
532 }
533
534
535
536
537
538 @Override
539 protected void doMetrics() {
540 try {
541 if (assignmentManager != null) {
542 assignmentManager.updateRegionsInTransitionMetrics();
543 }
544 } catch (Throwable e) {
545 LOG.error("Couldn't update metrics: " + e.getMessage());
546 }
547 }
548
549 MetricsMaster getMasterMetrics() {
550 return metricsMaster;
551 }
552
553
554
555
556
557
558
559
560 void initializeZKBasedSystemTrackers() throws IOException,
561 InterruptedException, KeeperException, CoordinatedStateException {
562 this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
563 this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
564 this.normalizer.setMasterServices(this);
565 this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
566 this.loadBalancerTracker.start();
567 this.regionNormalizerTracker = new RegionNormalizerTracker(zooKeeper, this);
568 this.regionNormalizerTracker.start();
569 this.assignmentManager = new AssignmentManager(this, serverManager,
570 this.balancer, this.service, this.metricsMaster,
571 this.tableLockManager);
572 zooKeeper.registerListenerFirst(assignmentManager);
573
574 this.regionServerTracker = new RegionServerTracker(zooKeeper, this,
575 this.serverManager);
576 this.regionServerTracker.start();
577
578 this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this,
579 this.serverManager);
580 this.drainingServerTracker.start();
581
582
583
584 boolean wasUp = this.clusterStatusTracker.isClusterUp();
585 if (!wasUp) this.clusterStatusTracker.setClusterUp();
586
587 LOG.info("Server active/primary master=" + this.serverName +
588 ", sessionid=0x" +
589 Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
590 ", setting cluster-up flag (Was=" + wasUp + ")");
591
592
593 this.snapshotManager = new SnapshotManager();
594 this.mpmHost = new MasterProcedureManagerHost();
595 this.mpmHost.register(this.snapshotManager);
596 this.mpmHost.register(new MasterFlushTableProcedureManager());
597 this.mpmHost.loadProcedures(conf);
598 this.mpmHost.initialize(this, this.metricsMaster);
599 }
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621 private void finishActiveMasterInitialization(MonitoredTask status)
622 throws IOException, InterruptedException, KeeperException, CoordinatedStateException {
623
624 isActiveMaster = true;
625 Thread zombieDetector = new Thread(new InitializationMonitor(this));
626 zombieDetector.start();
627
628
629
630
631
632
633
634 status.setStatus("Initializing Master file system");
635
636 this.masterActiveTime = System.currentTimeMillis();
637
638 this.fileSystemManager = new MasterFileSystem(this, this);
639
640
641 this.tableDescriptors.setCacheOn();
642
643 this.tableDescriptors.get(TableName.META_TABLE_NAME).setRegionReplication(
644 conf.getInt(HConstants.META_REPLICAS_NUM, HConstants.DEFAULT_META_REPLICA_NUM));
645
646 if (preLoadTableDescriptors) {
647 status.setStatus("Pre-loading table descriptors");
648 this.tableDescriptors.getAll();
649 }
650
651
652 status.setStatus("Publishing Cluster ID in ZooKeeper");
653 ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
654 this.serverManager = createServerManager(this, this);
655
656 setupClusterConnection();
657
658
659 this.tableLockManager.reapWriteLocks();
660
661 status.setStatus("Initializing ZK system trackers");
662 initializeZKBasedSystemTrackers();
663
664
665 status.setStatus("Initializing master coprocessors");
666 this.cpHost = new MasterCoprocessorHost(this, this.conf);
667
668
669 status.setStatus("Initializing master service threads");
670 startServiceThreads();
671
672
673 sleeper.skipSleepCycle();
674
675
676 this.serverManager.waitForRegionServers(status);
677
678 for (ServerName sn: this.regionServerTracker.getOnlineServers()) {
679
680 if (!this.serverManager.isServerOnline(sn)
681 && serverManager.checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
682 LOG.info("Registered server found up in zk but who has not yet reported in: " + sn);
683 }
684 }
685
686
687
688
689 Set<ServerName> previouslyFailedServers =
690 this.fileSystemManager.getFailedServersFromLogFolders();
691
692
693 ServerName oldMetaServerLocation = metaTableLocator.getMetaRegionLocation(this.getZooKeeper());
694 if (oldMetaServerLocation != null && previouslyFailedServers.contains(oldMetaServerLocation)) {
695 splitMetaLogBeforeAssignment(oldMetaServerLocation);
696
697
698 }
699 Set<ServerName> previouslyFailedMetaRSs = getPreviouselyFailedMetaServersFromZK();
700
701
702
703
704
705
706
707
708 previouslyFailedMetaRSs.addAll(previouslyFailedServers);
709
710 this.initializationBeforeMetaAssignment = true;
711
712
713 if (BaseLoadBalancer.tablesOnMaster(conf)) {
714 waitForServerOnline();
715 }
716
717
718 this.balancer.setClusterStatus(getClusterStatus());
719 this.balancer.setMasterServices(this);
720 this.balancer.initialize();
721
722
723
724 if (isStopped()) return;
725
726
727 status.setStatus("Assigning Meta Region");
728 assignMeta(status, previouslyFailedMetaRSs, HRegionInfo.DEFAULT_REPLICA_ID);
729
730
731 if (isStopped()) return;
732
733 status.setStatus("Submitting log splitting work for previously failed region servers");
734
735
736 for (ServerName tmpServer : previouslyFailedServers) {
737 this.serverManager.processDeadServer(tmpServer, true);
738 }
739
740
741
742 if (this.conf.getBoolean("hbase.MetaMigrationConvertingToPB", true)) {
743 MetaMigrationConvertingToPB.updateMetaIfNecessary(this);
744 }
745
746
747 status.setStatus("Starting assignment manager");
748 this.assignmentManager.joinCluster();
749
750
751 this.balancer.setClusterStatus(getClusterStatus());
752
753
754 status.setStatus("Starting balancer and catalog janitor");
755 this.clusterStatusChore = new ClusterStatusChore(this, balancer);
756 getChoreService().scheduleChore(clusterStatusChore);
757 this.balancerChore = new BalancerChore(this);
758 getChoreService().scheduleChore(balancerChore);
759 this.normalizerChore = new RegionNormalizerChore(this);
760 getChoreService().scheduleChore(normalizerChore);
761 this.catalogJanitorChore = new CatalogJanitor(this, this);
762 getChoreService().scheduleChore(catalogJanitorChore);
763
764 status.setStatus("Starting namespace manager");
765 initNamespace();
766
767 if (this.cpHost != null) {
768 try {
769 this.cpHost.preMasterInitialization();
770 } catch (IOException e) {
771 LOG.error("Coprocessor preMasterInitialization() hook failed", e);
772 }
773 }
774
775 status.markComplete("Initialization successful");
776 LOG.info("Master has completed initialization");
777 configurationManager.registerObserver(this.balancer);
778
779
780 setInitialized(true);
781
782 status.setStatus("Starting quota manager");
783 initQuotaManager();
784
785
786 Set<ServerName> EMPTY_SET = new HashSet<ServerName>();
787 int numReplicas = conf.getInt(HConstants.META_REPLICAS_NUM,
788 HConstants.DEFAULT_META_REPLICA_NUM);
789 for (int i = 1; i < numReplicas; i++) {
790 assignMeta(status, EMPTY_SET, i);
791 }
792 unassignExcessMetaReplica(zooKeeper, numReplicas);
793
794
795
796
797 this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();
798
799
800 status.setStatus("Checking ZNode ACLs");
801 zooKeeper.checkAndSetZNodeAcls();
802
803 status.setStatus("Calling postStartMaster coprocessors");
804 if (this.cpHost != null) {
805
806 try {
807 this.cpHost.postStartMaster();
808 } catch (IOException ioe) {
809 LOG.error("Coprocessor postStartMaster() hook failed", ioe);
810 }
811 }
812
813 zombieDetector.interrupt();
814 }
815
816 private void initQuotaManager() throws IOException {
817 quotaManager = new MasterQuotaManager(this);
818 this.assignmentManager.setRegionStateListener((RegionStateListener) quotaManager);
819 quotaManager.start();
820 }
821
822
823
824
825
826
827
828
829
830 ServerManager createServerManager(final Server master,
831 final MasterServices services)
832 throws IOException {
833
834
835 return new ServerManager(master, services);
836 }
837
838 private void unassignExcessMetaReplica(ZooKeeperWatcher zkw, int numMetaReplicasConfigured) {
839
840
841 try {
842 List<String> metaReplicaZnodes = zooKeeper.getMetaReplicaNodes();
843 for (String metaReplicaZnode : metaReplicaZnodes) {
844 int replicaId = zooKeeper.getMetaReplicaIdFromZnode(metaReplicaZnode);
845 if (replicaId >= numMetaReplicasConfigured) {
846 RegionState r = MetaTableLocator.getMetaRegionState(zkw, replicaId);
847 LOG.info("Closing excess replica of meta region " + r.getRegion());
848
849 ServerManager.closeRegionSilentlyAndWait(getConnection(), r.getServerName(),
850 r.getRegion(), 30000);
851 ZKUtil.deleteNode(zkw, zkw.getZNodeForReplica(replicaId));
852 }
853 }
854 } catch (Exception ex) {
855
856
857 LOG.warn("Ignoring exception " + ex);
858 }
859 }
860
861
862
863
864
865
866
867
868
869
870 void assignMeta(MonitoredTask status, Set<ServerName> previouslyFailedMetaRSs, int replicaId)
871 throws InterruptedException, IOException, KeeperException {
872
873 int assigned = 0;
874 long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000);
875 if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
876 status.setStatus("Assigning hbase:meta region");
877 } else {
878 status.setStatus("Assigning hbase:meta region, replicaId " + replicaId);
879 }
880
881 RegionStates regionStates = assignmentManager.getRegionStates();
882 RegionState metaState = MetaTableLocator.getMetaRegionState(getZooKeeper(), replicaId);
883 HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO,
884 replicaId);
885 ServerName currentMetaServer = metaState.getServerName();
886 if (!ConfigUtil.useZKForAssignment(conf)) {
887 regionStates.createRegionState(hri, metaState.getState(),
888 currentMetaServer, null);
889 } else {
890 regionStates.createRegionState(hri);
891 }
892 boolean rit = this.assignmentManager.
893 processRegionInTransitionAndBlockUntilAssigned(hri);
894 boolean metaRegionLocation = metaTableLocator.verifyMetaRegionLocation(
895 this.getConnection(), this.getZooKeeper(), timeout, replicaId);
896 if (!metaRegionLocation || !metaState.isOpened()) {
897
898
899 assigned++;
900 if (!ConfigUtil.useZKForAssignment(conf)) {
901 assignMetaZkLess(regionStates, metaState, timeout, previouslyFailedMetaRSs);
902 } else if (!rit) {
903
904 if (currentMetaServer != null) {
905
906
907
908
909
910
911
912 if (serverManager.isServerOnline(currentMetaServer)) {
913 LOG.info("Forcing expire of " + currentMetaServer);
914 serverManager.expireServer(currentMetaServer);
915 }
916 if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
917 splitMetaLogBeforeAssignment(currentMetaServer);
918 previouslyFailedMetaRSs.add(currentMetaServer);
919 }
920 }
921 assignmentManager.assignMeta(hri);
922 }
923 } else {
924
925 regionStates.updateRegionState(
926 HRegionInfo.FIRST_META_REGIONINFO, State.OPEN, currentMetaServer);
927 this.assignmentManager.regionOnline(
928 HRegionInfo.FIRST_META_REGIONINFO, currentMetaServer);
929 }
930
931 if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) enableMeta(TableName.META_TABLE_NAME);
932
933 if ((RecoveryMode.LOG_REPLAY == this.getMasterFileSystem().getLogRecoveryMode())
934 && (!previouslyFailedMetaRSs.isEmpty())) {
935
936 status.setStatus("replaying log for Meta Region");
937 this.fileSystemManager.splitMetaLog(previouslyFailedMetaRSs);
938 }
939
940
941
942
943
944 if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) enableCrashedServerProcessing(assigned != 0);
945 LOG.info("hbase:meta with replicaId " + replicaId + " assigned=" + assigned + ", rit=" + rit +
946 ", location=" + metaTableLocator.getMetaRegionLocation(this.getZooKeeper(), replicaId));
947 status.setStatus("META assigned.");
948 }
949
950 private void assignMetaZkLess(RegionStates regionStates, RegionState regionState, long timeout,
951 Set<ServerName> previouslyFailedRs) throws IOException, KeeperException {
952 ServerName currentServer = regionState.getServerName();
953 if (serverManager.isServerOnline(currentServer)) {
954 LOG.info("Meta was in transition on " + currentServer);
955 assignmentManager.processRegionInTransitionZkLess();
956 } else {
957 if (currentServer != null) {
958 if (regionState.getRegion().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
959 splitMetaLogBeforeAssignment(currentServer);
960 regionStates.logSplit(HRegionInfo.FIRST_META_REGIONINFO);
961 previouslyFailedRs.add(currentServer);
962 }
963 }
964 LOG.info("Re-assigning hbase:meta, it was on " + currentServer);
965 regionStates.updateRegionState(regionState.getRegion(), State.OFFLINE);
966 assignmentManager.assignMeta(regionState.getRegion());
967 }
968 }
969
970 void initNamespace() throws IOException {
971
972 tableNamespaceManager = new TableNamespaceManager(this);
973 tableNamespaceManager.start();
974 }
975
976 boolean isCatalogJanitorEnabled() {
977 return catalogJanitorChore != null ?
978 catalogJanitorChore.getEnabled() : false;
979 }
980
981 private void splitMetaLogBeforeAssignment(ServerName currentMetaServer) throws IOException {
982 if (RecoveryMode.LOG_REPLAY == this.getMasterFileSystem().getLogRecoveryMode()) {
983
984 Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
985 regions.add(HRegionInfo.FIRST_META_REGIONINFO);
986 this.fileSystemManager.prepareLogReplay(currentMetaServer, regions);
987 } else {
988
989 this.fileSystemManager.splitMetaLog(currentMetaServer);
990 }
991 }
992
993 private void enableCrashedServerProcessing(final boolean waitForMeta)
994 throws IOException, InterruptedException {
995
996
997
998
999 if (!isServerCrashProcessingEnabled()) {
1000 setServerCrashProcessingEnabled(true);
1001 this.serverManager.processQueuedDeadServers();
1002 }
1003
1004 if (waitForMeta) {
1005 metaTableLocator.waitMetaRegionLocation(this.getZooKeeper());
1006
1007
1008 this.assignmentManager.waitForAssignment(HRegionInfo.FIRST_META_REGIONINFO);
1009 }
1010 }
1011
1012 private void enableMeta(TableName metaTableName) {
1013 if (!this.assignmentManager.getTableStateManager().isTableState(metaTableName,
1014 ZooKeeperProtos.Table.State.ENABLED)) {
1015 this.assignmentManager.setEnabledTable(metaTableName);
1016 }
1017 }
1018
1019
1020
1021
1022
1023
1024 private Set<ServerName> getPreviouselyFailedMetaServersFromZK() throws KeeperException {
1025 Set<ServerName> result = new HashSet<ServerName>();
1026 String metaRecoveringZNode = ZKUtil.joinZNode(zooKeeper.recoveringRegionsZNode,
1027 HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
1028 List<String> regionFailedServers = ZKUtil.listChildrenNoWatch(zooKeeper, metaRecoveringZNode);
1029 if (regionFailedServers == null) return result;
1030
1031 for(String failedServer : regionFailedServers) {
1032 ServerName server = ServerName.parseServerName(failedServer);
1033 result.add(server);
1034 }
1035 return result;
1036 }
1037
1038 @Override
1039 public TableDescriptors getTableDescriptors() {
1040 return this.tableDescriptors;
1041 }
1042
1043 @Override
1044 public ServerManager getServerManager() {
1045 return this.serverManager;
1046 }
1047
1048 @Override
1049 public MasterFileSystem getMasterFileSystem() {
1050 return this.fileSystemManager;
1051 }
1052
1053
1054
1055
1056
1057
1058
1059
1060 private void startServiceThreads() throws IOException{
1061
1062 this.service.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
1063 conf.getInt("hbase.master.executor.openregion.threads", 5));
1064 this.service.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
1065 conf.getInt("hbase.master.executor.closeregion.threads", 5));
1066 this.service.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
1067 conf.getInt("hbase.master.executor.serverops.threads", 5));
1068 this.service.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
1069 conf.getInt("hbase.master.executor.serverops.threads", 5));
1070 this.service.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
1071 conf.getInt("hbase.master.executor.logreplayops.threads", 10));
1072
1073
1074
1075
1076
1077
1078 this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
1079 startProcedureExecutor();
1080
1081
1082 int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
1083 this.logCleaner =
1084 new LogCleaner(cleanerInterval,
1085 this, conf, getMasterFileSystem().getFileSystem(),
1086 getMasterFileSystem().getOldLogDir());
1087 getChoreService().scheduleChore(logCleaner);
1088
1089
1090 Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
1091 this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem()
1092 .getFileSystem(), archiveDir);
1093 getChoreService().scheduleChore(hfileCleaner);
1094 serviceStarted = true;
1095 if (LOG.isTraceEnabled()) {
1096 LOG.trace("Started service threads");
1097 }
1098 }
1099
1100 @Override
1101 protected void sendShutdownInterrupt() {
1102 super.sendShutdownInterrupt();
1103 stopProcedureExecutor();
1104 }
1105
1106 @Override
1107 protected void stopServiceThreads() {
1108 if (masterJettyServer != null) {
1109 LOG.info("Stopping master jetty server");
1110 try {
1111 masterJettyServer.stop();
1112 } catch (Exception e) {
1113 LOG.error("Failed to stop master jetty server", e);
1114 }
1115 }
1116 super.stopServiceThreads();
1117 stopChores();
1118
1119
1120
1121 if (!isAborted() && this.serverManager != null &&
1122 this.serverManager.isClusterShutdown()) {
1123 this.serverManager.letRegionServersShutdown();
1124 }
1125 if (LOG.isDebugEnabled()) {
1126 LOG.debug("Stopping service threads");
1127 }
1128
1129 if (this.logCleaner != null) this.logCleaner.cancel(true);
1130 if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
1131 if (this.quotaManager != null) this.quotaManager.stop();
1132 if (this.activeMasterManager != null) this.activeMasterManager.stop();
1133 if (this.serverManager != null) this.serverManager.stop();
1134 if (this.assignmentManager != null) this.assignmentManager.stop();
1135 if (this.fileSystemManager != null) this.fileSystemManager.stop();
1136 if (this.mpmHost != null) this.mpmHost.stop("server shutting down.");
1137 }
1138
1139 private void startProcedureExecutor() throws IOException {
1140 final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
1141 final Path logDir = new Path(fileSystemManager.getRootDir(),
1142 MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
1143
1144 procedureStore = new WALProcedureStore(conf, fileSystemManager.getFileSystem(), logDir,
1145 new MasterProcedureEnv.WALStoreLeaseRecovery(this));
1146 procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
1147 procedureExecutor = new ProcedureExecutor(conf, procEnv, procedureStore,
1148 procEnv.getProcedureQueue());
1149
1150 final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
1151 Math.max(Runtime.getRuntime().availableProcessors(),
1152 MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
1153 final boolean abortOnCorruption = conf.getBoolean(
1154 MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
1155 MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
1156 procedureStore.start(numThreads);
1157 procedureExecutor.start(numThreads, abortOnCorruption);
1158 }
1159
1160 private void stopProcedureExecutor() {
1161 if (procedureExecutor != null) {
1162 procedureExecutor.stop();
1163 }
1164
1165 if (procedureStore != null) {
1166 procedureStore.stop(isAborted());
1167 }
1168 }
1169
1170 private void stopChores() {
1171 if (this.balancerChore != null) {
1172 this.balancerChore.cancel(true);
1173 }
1174 if (this.normalizerChore != null) {
1175 this.normalizerChore.cancel(true);
1176 }
1177 if (this.clusterStatusChore != null) {
1178 this.clusterStatusChore.cancel(true);
1179 }
1180 if (this.catalogJanitorChore != null) {
1181 this.catalogJanitorChore.cancel(true);
1182 }
1183 if (this.clusterStatusPublisherChore != null){
1184 clusterStatusPublisherChore.cancel(true);
1185 }
1186 }
1187
1188
1189
1190
1191
1192 InetAddress getRemoteInetAddress(final int port,
1193 final long serverStartCode) throws UnknownHostException {
1194
1195
1196 InetAddress ia = RpcServer.getRemoteIp();
1197
1198
1199
1200 if (ia == null && serverStartCode == startcode) {
1201 InetSocketAddress isa = rpcServices.getSocketAddress();
1202 if (isa != null && isa.getPort() == port) {
1203 ia = isa.getAddress();
1204 }
1205 }
1206 return ia;
1207 }
1208
1209
1210
1211
1212 private int getBalancerCutoffTime() {
1213 int balancerCutoffTime =
1214 getConfiguration().getInt("hbase.balancer.max.balancing", -1);
1215 if (balancerCutoffTime == -1) {
1216
1217 int balancerPeriod =
1218 getConfiguration().getInt("hbase.balancer.period", 300000);
1219 balancerCutoffTime = balancerPeriod;
1220
1221 if (balancerCutoffTime <= 0) balancerCutoffTime = balancerPeriod;
1222 }
1223 return balancerCutoffTime;
1224 }
1225
1226 public boolean balance() throws IOException {
1227
1228 if (!isInitialized()) {
1229 LOG.debug("Master has not been initialized, don't run balancer.");
1230 return false;
1231 }
1232
1233 int maximumBalanceTime = getBalancerCutoffTime();
1234 synchronized (this.balancer) {
1235
1236 if (!this.loadBalancerTracker.isBalancerOn()) return false;
1237
1238 if (this.assignmentManager.getRegionStates().isRegionsInTransition()) {
1239 Map<String, RegionState> regionsInTransition =
1240 this.assignmentManager.getRegionStates().getRegionsInTransition();
1241 LOG.debug("Not running balancer because " + regionsInTransition.size() +
1242 " region(s) in transition: " + org.apache.commons.lang.StringUtils.
1243 abbreviate(regionsInTransition.toString(), 256));
1244 return false;
1245 }
1246 if (this.serverManager.areDeadServersInProgress()) {
1247 LOG.debug("Not running balancer because processing dead regionserver(s): " +
1248 this.serverManager.getDeadServers());
1249 return false;
1250 }
1251
1252 if (this.cpHost != null) {
1253 try {
1254 if (this.cpHost.preBalance()) {
1255 LOG.debug("Coprocessor bypassing balancer request");
1256 return false;
1257 }
1258 } catch (IOException ioe) {
1259 LOG.error("Error invoking master coprocessor preBalance()", ioe);
1260 return false;
1261 }
1262 }
1263
1264 Map<TableName, Map<ServerName, List<HRegionInfo>>> assignmentsByTable =
1265 this.assignmentManager.getRegionStates().getAssignmentsByTable();
1266
1267 List<RegionPlan> plans = new ArrayList<RegionPlan>();
1268
1269 this.balancer.setClusterStatus(getClusterStatus());
1270 for (Map<ServerName, List<HRegionInfo>> assignments : assignmentsByTable.values()) {
1271 List<RegionPlan> partialPlans = this.balancer.balanceCluster(assignments);
1272 if (partialPlans != null) plans.addAll(partialPlans);
1273 }
1274 long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
1275 int rpCount = 0;
1276 long totalRegPlanExecTime = 0;
1277 if (plans != null && !plans.isEmpty()) {
1278 for (RegionPlan plan: plans) {
1279 LOG.info("balance " + plan);
1280 long balStartTime = System.currentTimeMillis();
1281
1282 this.assignmentManager.balance(plan);
1283 totalRegPlanExecTime += System.currentTimeMillis()-balStartTime;
1284 rpCount++;
1285 if (rpCount < plans.size() &&
1286
1287 (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) {
1288
1289 LOG.debug("No more balancing till next balance run; maximumBalanceTime=" +
1290 maximumBalanceTime);
1291 break;
1292 }
1293 }
1294 }
1295 if (this.cpHost != null) {
1296 try {
1297 this.cpHost.postBalance(rpCount < plans.size() ? plans.subList(0, rpCount) : plans);
1298 } catch (IOException ioe) {
1299
1300 LOG.error("Error invoking master coprocessor postBalance()", ioe);
1301 }
1302 }
1303 }
1304
1305
1306 return true;
1307 }
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318 public boolean normalizeRegions() throws IOException, CoordinatedStateException {
1319 if (!isInitialized()) {
1320 LOG.debug("Master has not been initialized, don't run region normalizer.");
1321 return false;
1322 }
1323
1324 if (!this.regionNormalizerTracker.isNormalizerOn()) {
1325 LOG.debug("Region normalization is disabled, don't run region normalizer.");
1326 return false;
1327 }
1328
1329 synchronized (this.normalizer) {
1330
1331 List<TableName> allEnabledTables = new ArrayList<>(
1332 this.assignmentManager.getTableStateManager().getTablesInStates(
1333 ZooKeeperProtos.Table.State.ENABLED));
1334
1335 Collections.shuffle(allEnabledTables);
1336
1337 for (TableName table : allEnabledTables) {
1338 if (quotaManager.getNamespaceQuotaManager() != null &&
1339 quotaManager.getNamespaceQuotaManager().getState(table.getNamespaceAsString()) != null){
1340 LOG.debug("Skipping normalizing " + table + " since its namespace has quota");
1341 continue;
1342 }
1343 if (table.isSystemTable() || (getTableDescriptors().get(table) != null &&
1344 !getTableDescriptors().get(table).isNormalizationEnabled())) {
1345 LOG.debug("Skipping normalization for table: " + table + ", as it's either system"
1346 + " table or doesn't have auto normalization turned on");
1347 continue;
1348 }
1349 List<NormalizationPlan> plans = this.normalizer.computePlanForTable(table);
1350 if (plans != null) {
1351 for (NormalizationPlan plan : plans) {
1352 plan.execute(clusterConnection.getAdmin());
1353 }
1354 }
1355 }
1356 }
1357
1358
1359 return true;
1360 }
1361
1362
1363
1364
1365 String getClientIdAuditPrefix() {
1366 return "Client=" + RpcServer.getRequestUserName() + "/" + RpcServer.getRemoteAddress();
1367 }
1368
1369
1370
1371
1372
1373
1374
1375 public void setCatalogJanitorEnabled(final boolean b) {
1376 this.catalogJanitorChore.setEnabled(b);
1377 }
1378
1379 @Override
1380 public void dispatchMergingRegions(final HRegionInfo region_a,
1381 final HRegionInfo region_b, final boolean forcible) throws IOException {
1382 checkInitialized();
1383 this.service.submit(new DispatchMergingRegionHandler(this,
1384 this.catalogJanitorChore, region_a, region_b, forcible));
1385 }
1386
1387 void move(final byte[] encodedRegionName,
1388 final byte[] destServerName) throws HBaseIOException {
1389 RegionState regionState = assignmentManager.getRegionStates().
1390 getRegionState(Bytes.toString(encodedRegionName));
1391 if (regionState == null) {
1392 throw new UnknownRegionException(Bytes.toStringBinary(encodedRegionName));
1393 }
1394
1395 HRegionInfo hri = regionState.getRegion();
1396 ServerName dest;
1397 if (destServerName == null || destServerName.length == 0) {
1398 LOG.info("Passed destination servername is null/empty so " +
1399 "choosing a server at random");
1400 final List<ServerName> destServers = this.serverManager.createDestinationServersList(
1401 regionState.getServerName());
1402 dest = balancer.randomAssignment(hri, destServers);
1403 if (dest == null) {
1404 LOG.debug("Unable to determine a plan to assign " + hri);
1405 return;
1406 }
1407 } else {
1408 dest = ServerName.valueOf(Bytes.toString(destServerName));
1409 if (dest.equals(serverName) && balancer instanceof BaseLoadBalancer
1410 && !((BaseLoadBalancer)balancer).shouldBeOnMaster(hri)) {
1411
1412
1413
1414 LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
1415 + " to avoid unnecessary region moving later by load balancer,"
1416 + " because it should not be on master");
1417 return;
1418 }
1419 }
1420
1421 if (dest.equals(regionState.getServerName())) {
1422 LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
1423 + " because region already assigned to the same server " + dest + ".");
1424 return;
1425 }
1426
1427
1428 RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), dest);
1429
1430 try {
1431 checkInitialized();
1432 if (this.cpHost != null) {
1433 if (this.cpHost.preMove(hri, rp.getSource(), rp.getDestination())) {
1434 return;
1435 }
1436 }
1437
1438
1439
1440 serverManager.sendRegionWarmup(rp.getDestination(), hri);
1441
1442 LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
1443 this.assignmentManager.balance(rp);
1444 if (this.cpHost != null) {
1445 this.cpHost.postMove(hri, rp.getSource(), rp.getDestination());
1446 }
1447 } catch (IOException ioe) {
1448 if (ioe instanceof HBaseIOException) {
1449 throw (HBaseIOException)ioe;
1450 }
1451 throw new HBaseIOException(ioe);
1452 }
1453 }
1454
1455 @Override
1456 public long createTable(
1457 final HTableDescriptor hTableDescriptor,
1458 final byte [][] splitKeys,
1459 final long nonceGroup,
1460 final long nonce) throws IOException {
1461 if (isStopped()) {
1462 throw new MasterNotRunningException();
1463 }
1464
1465 String namespace = hTableDescriptor.getTableName().getNamespaceAsString();
1466 ensureNamespaceExists(namespace);
1467
1468 HRegionInfo[] newRegions = ModifyRegionUtils.createHRegionInfos(hTableDescriptor, splitKeys);
1469 checkInitialized();
1470 sanityCheckTableDescriptor(hTableDescriptor);
1471
1472 if (cpHost != null) {
1473 cpHost.preCreateTable(hTableDescriptor, newRegions);
1474 }
1475 LOG.info(getClientIdAuditPrefix() + " create " + hTableDescriptor);
1476
1477
1478
1479 ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
1480 long procId = this.procedureExecutor.submitProcedure(
1481 new CreateTableProcedure(
1482 procedureExecutor.getEnvironment(), hTableDescriptor, newRegions, latch),
1483 nonceGroup,
1484 nonce);
1485 latch.await();
1486
1487 if (cpHost != null) {
1488 cpHost.postCreateTable(hTableDescriptor, newRegions);
1489 }
1490
1491 return procId;
1492 }
1493
1494
1495
1496
1497
1498
1499 private void sanityCheckTableDescriptor(final HTableDescriptor htd) throws IOException {
1500 final String CONF_KEY = "hbase.table.sanity.checks";
1501 boolean logWarn = false;
1502 if (!conf.getBoolean(CONF_KEY, true)) {
1503 logWarn = true;
1504 }
1505 String tableVal = htd.getConfigurationValue(CONF_KEY);
1506 if (tableVal != null && !Boolean.valueOf(tableVal)) {
1507 logWarn = true;
1508 }
1509
1510
1511 long maxFileSizeLowerLimit = 2 * 1024 * 1024L;
1512 long maxFileSize = htd.getMaxFileSize();
1513 if (maxFileSize < 0) {
1514 maxFileSize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, maxFileSizeLowerLimit);
1515 }
1516 if (maxFileSize < conf.getLong("hbase.hregion.max.filesize.limit", maxFileSizeLowerLimit)) {
1517 String message = "MAX_FILESIZE for table descriptor or "
1518 + "\"hbase.hregion.max.filesize\" (" + maxFileSize
1519 + ") is too small, which might cause over splitting into unmanageable "
1520 + "number of regions.";
1521 warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1522 }
1523
1524
1525 long flushSizeLowerLimit = 1024 * 1024L;
1526 long flushSize = htd.getMemStoreFlushSize();
1527 if (flushSize < 0) {
1528 flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeLowerLimit);
1529 }
1530 if (flushSize < conf.getLong("hbase.hregion.memstore.flush.size.limit", flushSizeLowerLimit)) {
1531 String message = "MEMSTORE_FLUSHSIZE for table descriptor or "
1532 + "\"hbase.hregion.memstore.flush.size\" ("+flushSize+") is too small, which might cause"
1533 + " very frequent flushing.";
1534 warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1535 }
1536
1537
1538 try {
1539 checkClassLoading(conf, htd);
1540 } catch (Exception ex) {
1541 warnOrThrowExceptionForFailure(logWarn, CONF_KEY, ex.getMessage(), null);
1542 }
1543
1544
1545 try {
1546 checkCompression(htd);
1547 } catch (IOException e) {
1548 warnOrThrowExceptionForFailure(logWarn, CONF_KEY, e.getMessage(), e);
1549 }
1550
1551
1552 try {
1553 checkEncryption(conf, htd);
1554 } catch (IOException e) {
1555 warnOrThrowExceptionForFailure(logWarn, CONF_KEY, e.getMessage(), e);
1556 }
1557
1558 try{
1559 checkCompactionPolicy(conf, htd);
1560 } catch(IOException e){
1561 warnOrThrowExceptionForFailure(false, CONF_KEY, e.getMessage(), e);
1562 }
1563
1564 if (htd.getColumnFamilies().length == 0) {
1565 String message = "Table should have at least one column family.";
1566 warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1567 }
1568
1569 for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1570 if (hcd.getTimeToLive() <= 0) {
1571 String message = "TTL for column family " + hcd.getNameAsString() + " must be positive.";
1572 warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1573 }
1574
1575
1576 if (hcd.getBlocksize() < 1024 || hcd.getBlocksize() > 16 * 1024 * 1024) {
1577 String message = "Block size for column family " + hcd.getNameAsString()
1578 + " must be between 1K and 16MB.";
1579 warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1580 }
1581
1582
1583 if (hcd.getMinVersions() < 0) {
1584 String message = "Min versions for column family " + hcd.getNameAsString()
1585 + " must be positive.";
1586 warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1587 }
1588
1589
1590
1591
1592
1593 if (hcd.getMinVersions() > hcd.getMaxVersions()) {
1594 String message = "Min versions for column family " + hcd.getNameAsString()
1595 + " must be less than the Max versions.";
1596 warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1597 }
1598
1599
1600 if (hcd.getScope() < 0) {
1601 String message = "Replication scope for column family "
1602 + hcd.getNameAsString() + " must be positive.";
1603 warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1604 }
1605
1606
1607
1608 if (hcd.getDFSReplication() < 0) {
1609 String message = "HFile Replication for column family " + hcd.getNameAsString()
1610 + " must be greater than zero.";
1611 warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1612 }
1613
1614
1615 }
1616 }
1617
1618 private void checkCompactionPolicy(Configuration conf, HTableDescriptor htd)
1619 throws IOException {
1620
1621
1622 String className =
1623 htd.getConfigurationValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY);
1624 if (className == null) {
1625 className =
1626 conf.get(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
1627 ExploringCompactionPolicy.class.getName());
1628 }
1629
1630 int blockingFileCount = HStore.DEFAULT_BLOCKING_STOREFILE_COUNT;
1631 String sv = htd.getConfigurationValue(HStore.BLOCKING_STOREFILES_KEY);
1632 if (sv != null) {
1633 blockingFileCount = Integer.parseInt(sv);
1634 } else {
1635 blockingFileCount = conf.getInt(HStore.BLOCKING_STOREFILES_KEY, blockingFileCount);
1636 }
1637
1638 for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1639 String compactionPolicy =
1640 hcd.getConfigurationValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY);
1641 if (compactionPolicy == null) {
1642 compactionPolicy = className;
1643 }
1644 if (!compactionPolicy.equals(FIFOCompactionPolicy.class.getName())) {
1645 continue;
1646 }
1647
1648 String message = null;
1649
1650
1651 if (hcd.getTimeToLive() == HColumnDescriptor.DEFAULT_TTL) {
1652 message = "Default TTL is not supported for FIFO compaction";
1653 throw new IOException(message);
1654 }
1655
1656
1657 if (hcd.getMinVersions() > 0) {
1658 message = "MIN_VERSION > 0 is not supported for FIFO compaction";
1659 throw new IOException(message);
1660 }
1661
1662
1663 String sbfc = htd.getConfigurationValue(HStore.BLOCKING_STOREFILES_KEY);
1664 if (sbfc != null) {
1665 blockingFileCount = Integer.parseInt(sbfc);
1666 }
1667 if (blockingFileCount < 1000) {
1668 message =
1669 "blocking file count '" + HStore.BLOCKING_STOREFILES_KEY + "' " + blockingFileCount
1670 + " is below recommended minimum of 1000";
1671 throw new IOException(message);
1672 }
1673 }
1674 }
1675
1676
1677 private static void warnOrThrowExceptionForFailure(boolean logWarn, String confKey,
1678 String message, Exception cause) throws IOException {
1679 if (!logWarn) {
1680 throw new DoNotRetryIOException(message + " Set " + confKey +
1681 " to false at conf or table descriptor if you want to bypass sanity checks", cause);
1682 }
1683 LOG.warn(message);
1684 }
1685
1686 private void startActiveMasterManager(int infoPort) throws KeeperException {
1687 String backupZNode = ZKUtil.joinZNode(
1688 zooKeeper.backupMasterAddressesZNode, serverName.toString());
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699 LOG.info("Adding backup master ZNode " + backupZNode);
1700 if (!MasterAddressTracker.setMasterAddress(zooKeeper, backupZNode,
1701 serverName, infoPort)) {
1702 LOG.warn("Failed create of " + backupZNode + " by " + serverName);
1703 }
1704
1705 activeMasterManager.setInfoPort(infoPort);
1706
1707 Threads.setDaemonThreadRunning(new Thread(new Runnable() {
1708 @Override
1709 public void run() {
1710 int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT,
1711 HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
1712
1713 if (conf.getBoolean(HConstants.MASTER_TYPE_BACKUP,
1714 HConstants.DEFAULT_MASTER_TYPE_BACKUP)) {
1715 LOG.debug("HMaster started in backup mode. "
1716 + "Stalling until master znode is written.");
1717
1718
1719 while (!activeMasterManager.hasActiveMaster()) {
1720 LOG.debug("Waiting for master address ZNode to be written "
1721 + "(Also watching cluster state node)");
1722 Threads.sleep(timeout);
1723 }
1724 }
1725 MonitoredTask status = TaskMonitor.get().createStatus("Master startup");
1726 status.setDescription("Master startup");
1727 try {
1728 if (activeMasterManager.blockUntilBecomingActiveMaster(timeout, status)) {
1729 finishActiveMasterInitialization(status);
1730 }
1731 } catch (Throwable t) {
1732 status.setStatus("Failed to become active: " + t.getMessage());
1733 LOG.fatal("Failed to become active master", t);
1734
1735 if (t instanceof NoClassDefFoundError &&
1736 t.getMessage()
1737 .contains("org/apache/hadoop/hdfs/protocol/HdfsConstants$SafeModeAction")) {
1738
1739 abort("HBase is having a problem with its Hadoop jars. You may need to "
1740 + "recompile HBase against Hadoop version "
1741 + org.apache.hadoop.util.VersionInfo.getVersion()
1742 + " or change your hadoop jars to start properly", t);
1743 } else {
1744 abort("Unhandled exception. Starting shutdown.", t);
1745 }
1746 } finally {
1747 status.cleanup();
1748 }
1749 }
1750 }, getServerName().toShortString() + ".activeMasterManager"));
1751 }
1752
1753 private void checkCompression(final HTableDescriptor htd)
1754 throws IOException {
1755 if (!this.masterCheckCompression) return;
1756 for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1757 checkCompression(hcd);
1758 }
1759 }
1760
1761 private void checkCompression(final HColumnDescriptor hcd)
1762 throws IOException {
1763 if (!this.masterCheckCompression) return;
1764 CompressionTest.testCompression(hcd.getCompression());
1765 CompressionTest.testCompression(hcd.getCompactionCompression());
1766 }
1767
1768 private void checkEncryption(final Configuration conf, final HTableDescriptor htd)
1769 throws IOException {
1770 if (!this.masterCheckEncryption) return;
1771 for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1772 checkEncryption(conf, hcd);
1773 }
1774 }
1775
1776 private void checkEncryption(final Configuration conf, final HColumnDescriptor hcd)
1777 throws IOException {
1778 if (!this.masterCheckEncryption) return;
1779 EncryptionTest.testEncryption(conf, hcd.getEncryptionType(), hcd.getEncryptionKey());
1780 }
1781
1782 private void checkClassLoading(final Configuration conf, final HTableDescriptor htd)
1783 throws IOException {
1784 RegionSplitPolicy.getSplitPolicyClass(htd, conf);
1785 RegionCoprocessorHost.testTableCoprocessorAttrs(conf, htd);
1786 }
1787
1788 private static boolean isCatalogTable(final TableName tableName) {
1789 return tableName.equals(TableName.META_TABLE_NAME);
1790 }
1791
1792 @Override
1793 public long deleteTable(
1794 final TableName tableName,
1795 final long nonceGroup,
1796 final long nonce) throws IOException {
1797 checkInitialized();
1798 if (cpHost != null) {
1799 cpHost.preDeleteTable(tableName);
1800 }
1801 LOG.info(getClientIdAuditPrefix() + " delete " + tableName);
1802
1803
1804 ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
1805 long procId = this.procedureExecutor.submitProcedure(
1806 new DeleteTableProcedure(procedureExecutor.getEnvironment(), tableName, latch),
1807 nonceGroup,
1808 nonce);
1809 latch.await();
1810
1811 if (cpHost != null) {
1812 cpHost.postDeleteTable(tableName);
1813 }
1814
1815 return procId;
1816 }
1817
1818 @Override
1819 public void truncateTable(
1820 final TableName tableName,
1821 final boolean preserveSplits,
1822 final long nonceGroup,
1823 final long nonce) throws IOException {
1824 checkInitialized();
1825 if (cpHost != null) {
1826 cpHost.preTruncateTable(tableName);
1827 }
1828 LOG.info(getClientIdAuditPrefix() + " truncate " + tableName);
1829
1830 long procId = this.procedureExecutor.submitProcedure(
1831 new TruncateTableProcedure(procedureExecutor.getEnvironment(), tableName, preserveSplits),
1832 nonceGroup,
1833 nonce);
1834 ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
1835
1836 if (cpHost != null) {
1837 cpHost.postTruncateTable(tableName);
1838 }
1839 }
1840
1841 @Override
1842 public void addColumn(
1843 final TableName tableName,
1844 final HColumnDescriptor columnDescriptor,
1845 final long nonceGroup,
1846 final long nonce)
1847 throws IOException {
1848 checkInitialized();
1849 checkCompression(columnDescriptor);
1850 checkEncryption(conf, columnDescriptor);
1851 if (cpHost != null) {
1852 if (cpHost.preAddColumn(tableName, columnDescriptor)) {
1853 return;
1854 }
1855 }
1856
1857 long procId = this.procedureExecutor.submitProcedure(
1858 new AddColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName, columnDescriptor),
1859 nonceGroup,
1860 nonce);
1861 ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
1862 if (cpHost != null) {
1863 cpHost.postAddColumn(tableName, columnDescriptor);
1864 }
1865 }
1866
1867 @Override
1868 public void modifyColumn(
1869 final TableName tableName,
1870 final HColumnDescriptor descriptor,
1871 final long nonceGroup,
1872 final long nonce)
1873 throws IOException {
1874 checkInitialized();
1875 checkCompression(descriptor);
1876 checkEncryption(conf, descriptor);
1877 if (cpHost != null) {
1878 if (cpHost.preModifyColumn(tableName, descriptor)) {
1879 return;
1880 }
1881 }
1882 LOG.info(getClientIdAuditPrefix() + " modify " + descriptor);
1883
1884
1885 long procId = this.procedureExecutor.submitProcedure(
1886 new ModifyColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName, descriptor),
1887 nonceGroup,
1888 nonce);
1889 ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
1890
1891 if (cpHost != null) {
1892 cpHost.postModifyColumn(tableName, descriptor);
1893 }
1894 }
1895
1896 @Override
1897 public void deleteColumn(
1898 final TableName tableName,
1899 final byte[] columnName,
1900 final long nonceGroup,
1901 final long nonce)
1902 throws IOException {
1903 checkInitialized();
1904 if (cpHost != null) {
1905 if (cpHost.preDeleteColumn(tableName, columnName)) {
1906 return;
1907 }
1908 }
1909 LOG.info(getClientIdAuditPrefix() + " delete " + Bytes.toString(columnName));
1910
1911
1912 long procId = this.procedureExecutor.submitProcedure(
1913 new DeleteColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName, columnName),
1914 nonceGroup,
1915 nonce);
1916 ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
1917
1918 if (cpHost != null) {
1919 cpHost.postDeleteColumn(tableName, columnName);
1920 }
1921 }
1922
1923 @Override
1924 public long enableTable(
1925 final TableName tableName,
1926 final long nonceGroup,
1927 final long nonce) throws IOException {
1928 checkInitialized();
1929 if (cpHost != null) {
1930 cpHost.preEnableTable(tableName);
1931 }
1932 LOG.info(getClientIdAuditPrefix() + " enable " + tableName);
1933
1934
1935 final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
1936 long procId = this.procedureExecutor.submitProcedure(
1937 new EnableTableProcedure(procedureExecutor.getEnvironment(), tableName, false, prepareLatch),
1938 nonceGroup,
1939 nonce);
1940
1941
1942
1943
1944 prepareLatch.await();
1945
1946 if (cpHost != null) {
1947 cpHost.postEnableTable(tableName);
1948 }
1949
1950 return procId;
1951 }
1952
1953 @Override
1954 public long disableTable(
1955 final TableName tableName,
1956 final long nonceGroup,
1957 final long nonce) throws IOException {
1958 checkInitialized();
1959 if (cpHost != null) {
1960 cpHost.preDisableTable(tableName);
1961 }
1962 LOG.info(getClientIdAuditPrefix() + " disable " + tableName);
1963
1964
1965 final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
1966
1967 long procId = this.procedureExecutor.submitProcedure(
1968 new DisableTableProcedure(procedureExecutor.getEnvironment(), tableName, false, prepareLatch),
1969 nonceGroup,
1970 nonce);
1971
1972
1973
1974
1975 prepareLatch.await();
1976
1977 if (cpHost != null) {
1978 cpHost.postDisableTable(tableName);
1979 }
1980
1981 return procId;
1982 }
1983
1984
1985
1986
1987
1988
1989
1990 @VisibleForTesting
1991 Pair<HRegionInfo, ServerName> getTableRegionForRow(
1992 final TableName tableName, final byte [] rowKey)
1993 throws IOException {
1994 final AtomicReference<Pair<HRegionInfo, ServerName>> result =
1995 new AtomicReference<Pair<HRegionInfo, ServerName>>(null);
1996
1997 MetaScannerVisitor visitor =
1998 new MetaScannerVisitorBase() {
1999 @Override
2000 public boolean processRow(Result data) throws IOException {
2001 if (data == null || data.size() <= 0) {
2002 return true;
2003 }
2004 Pair<HRegionInfo, ServerName> pair = HRegionInfo.getHRegionInfoAndServerName(data);
2005 if (pair == null) {
2006 return false;
2007 }
2008 if (!pair.getFirst().getTable().equals(tableName)) {
2009 return false;
2010 }
2011 result.set(pair);
2012 return true;
2013 }
2014 };
2015
2016 MetaScanner.metaScan(clusterConnection, visitor, tableName, rowKey, 1);
2017 return result.get();
2018 }
2019
2020 @Override
2021 public void modifyTable(
2022 final TableName tableName,
2023 final HTableDescriptor descriptor,
2024 final long nonceGroup,
2025 final long nonce)
2026 throws IOException {
2027 checkInitialized();
2028 sanityCheckTableDescriptor(descriptor);
2029 if (cpHost != null) {
2030 cpHost.preModifyTable(tableName, descriptor);
2031 }
2032
2033 LOG.info(getClientIdAuditPrefix() + " modify " + tableName);
2034
2035
2036 long procId = this.procedureExecutor.submitProcedure(
2037 new ModifyTableProcedure(procedureExecutor.getEnvironment(), descriptor),
2038 nonceGroup,
2039 nonce);
2040
2041 ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
2042
2043 if (cpHost != null) {
2044 cpHost.postModifyTable(tableName, descriptor);
2045 }
2046 }
2047
2048 @Override
2049 public void checkTableModifiable(final TableName tableName)
2050 throws IOException, TableNotFoundException, TableNotDisabledException {
2051 if (isCatalogTable(tableName)) {
2052 throw new IOException("Can't modify catalog tables");
2053 }
2054 if (!MetaTableAccessor.tableExists(getConnection(), tableName)) {
2055 throw new TableNotFoundException(tableName);
2056 }
2057 if (!getAssignmentManager().getTableStateManager().
2058 isTableState(tableName, ZooKeeperProtos.Table.State.DISABLED)) {
2059 throw new TableNotDisabledException(tableName);
2060 }
2061 }
2062
2063
2064
2065
2066 public ClusterStatus getClusterStatus() throws InterruptedIOException {
2067
2068 List<String> backupMasterStrings;
2069 try {
2070 backupMasterStrings = ZKUtil.listChildrenNoWatch(this.zooKeeper,
2071 this.zooKeeper.backupMasterAddressesZNode);
2072 } catch (KeeperException e) {
2073 LOG.warn(this.zooKeeper.prefix("Unable to list backup servers"), e);
2074 backupMasterStrings = null;
2075 }
2076
2077 List<ServerName> backupMasters = null;
2078 if (backupMasterStrings != null && !backupMasterStrings.isEmpty()) {
2079 backupMasters = new ArrayList<ServerName>(backupMasterStrings.size());
2080 for (String s: backupMasterStrings) {
2081 try {
2082 byte [] bytes;
2083 try {
2084 bytes = ZKUtil.getData(this.zooKeeper, ZKUtil.joinZNode(
2085 this.zooKeeper.backupMasterAddressesZNode, s));
2086 } catch (InterruptedException e) {
2087 throw new InterruptedIOException();
2088 }
2089 if (bytes != null) {
2090 ServerName sn;
2091 try {
2092 sn = ServerName.parseFrom(bytes);
2093 } catch (DeserializationException e) {
2094 LOG.warn("Failed parse, skipping registering backup server", e);
2095 continue;
2096 }
2097 backupMasters.add(sn);
2098 }
2099 } catch (KeeperException e) {
2100 LOG.warn(this.zooKeeper.prefix("Unable to get information about " +
2101 "backup servers"), e);
2102 }
2103 }
2104 Collections.sort(backupMasters, new Comparator<ServerName>() {
2105 @Override
2106 public int compare(ServerName s1, ServerName s2) {
2107 return s1.getServerName().compareTo(s2.getServerName());
2108 }});
2109 }
2110
2111 String clusterId = fileSystemManager != null ?
2112 fileSystemManager.getClusterId().toString() : null;
2113 Map<String, RegionState> regionsInTransition = assignmentManager != null ?
2114 assignmentManager.getRegionStates().getRegionsInTransition() : null;
2115 String[] coprocessors = cpHost != null ? getMasterCoprocessors() : null;
2116 boolean balancerOn = loadBalancerTracker != null ?
2117 loadBalancerTracker.isBalancerOn() : false;
2118 Map<ServerName, ServerLoad> onlineServers = null;
2119 Set<ServerName> deadServers = null;
2120 if (serverManager != null) {
2121 deadServers = serverManager.getDeadServers().copyServerNames();
2122 onlineServers = serverManager.getOnlineServers();
2123 }
2124 return new ClusterStatus(VersionInfo.getVersion(), clusterId,
2125 onlineServers, deadServers, serverName, backupMasters,
2126 regionsInTransition, coprocessors, balancerOn);
2127 }
2128
2129
2130
2131
2132
2133
2134
2135
2136 public static String getLoadedCoprocessors() {
2137 return CoprocessorHost.getLoadedCoprocessors().toString();
2138 }
2139
2140
2141
2142
2143 public long getMasterStartTime() {
2144 return startcode;
2145 }
2146
2147
2148
2149
2150 public long getMasterActiveTime() {
2151 return masterActiveTime;
2152 }
2153
2154 public int getRegionServerInfoPort(final ServerName sn) {
2155 RegionServerInfo info = this.regionServerTracker.getRegionServerInfo(sn);
2156 if (info == null || info.getInfoPort() == 0) {
2157 return conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
2158 HConstants.DEFAULT_REGIONSERVER_INFOPORT);
2159 }
2160 return info.getInfoPort();
2161 }
2162
2163 public String getRegionServerVersion(final ServerName sn) {
2164 RegionServerInfo info = this.regionServerTracker.getRegionServerInfo(sn);
2165 if (info != null && info.hasVersionInfo()) {
2166 return info.getVersionInfo().getVersion();
2167 }
2168 return "Unknown";
2169 }
2170
2171
2172
2173
2174 public String[] getMasterCoprocessors() {
2175 Set<String> masterCoprocessors = getMasterCoprocessorHost().getCoprocessors();
2176 return masterCoprocessors.toArray(new String[masterCoprocessors.size()]);
2177 }
2178
2179 @Override
2180 public void abort(final String msg, final Throwable t) {
2181 if (isAborted() || isStopped()) {
2182 return;
2183 }
2184 if (cpHost != null) {
2185
2186 LOG.fatal("Master server abort: loaded coprocessors are: " +
2187 getLoadedCoprocessors());
2188 }
2189 if (t != null) LOG.fatal(msg, t);
2190 stop(msg);
2191 }
2192
2193 @Override
2194 public ZooKeeperWatcher getZooKeeper() {
2195 return zooKeeper;
2196 }
2197
2198 @Override
2199 public MasterCoprocessorHost getMasterCoprocessorHost() {
2200 return cpHost;
2201 }
2202
2203 @Override
2204 public MasterQuotaManager getMasterQuotaManager() {
2205 return quotaManager;
2206 }
2207
2208 @Override
2209 public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
2210 return procedureExecutor;
2211 }
2212
2213 @Override
2214 public ServerName getServerName() {
2215 return this.serverName;
2216 }
2217
2218 @Override
2219 public AssignmentManager getAssignmentManager() {
2220 return this.assignmentManager;
2221 }
2222
2223 public MemoryBoundedLogMessageBuffer getRegionServerFatalLogBuffer() {
2224 return rsFatals;
2225 }
2226
2227 public void shutdown() {
2228 if (cpHost != null) {
2229 try {
2230 cpHost.preShutdown();
2231 } catch (IOException ioe) {
2232 LOG.error("Error call master coprocessor preShutdown()", ioe);
2233 }
2234 }
2235
2236 if (this.serverManager != null) {
2237 this.serverManager.shutdownCluster();
2238 }
2239 if (this.clusterStatusTracker != null){
2240 try {
2241 this.clusterStatusTracker.setClusterDown();
2242 } catch (KeeperException e) {
2243 LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e);
2244 }
2245 }
2246 }
2247
2248 public void stopMaster() {
2249 if (cpHost != null) {
2250 try {
2251 cpHost.preStopMaster();
2252 } catch (IOException ioe) {
2253 LOG.error("Error call master coprocessor preStopMaster()", ioe);
2254 }
2255 }
2256 stop("Stopped by " + Thread.currentThread().getName());
2257 }
2258
2259 void checkServiceStarted() throws ServerNotRunningYetException {
2260 if (!serviceStarted) {
2261 throw new ServerNotRunningYetException("Server is not running yet");
2262 }
2263 }
2264
2265 void checkInitialized() throws PleaseHoldException, ServerNotRunningYetException {
2266 checkServiceStarted();
2267 if (!isInitialized()) {
2268 throw new PleaseHoldException("Master is initializing");
2269 }
2270 }
2271
2272 void checkNamespaceManagerReady() throws IOException {
2273 checkInitialized();
2274 if (tableNamespaceManager == null ||
2275 !tableNamespaceManager.isTableAvailableAndInitialized()) {
2276 throw new IOException("Table Namespace Manager not ready yet, try again later");
2277 }
2278 }
2279
2280
2281
2282
2283
2284
2285
2286
2287 public boolean isActiveMaster() {
2288 return isActiveMaster;
2289 }
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300 @Override
2301 public boolean isInitialized() {
2302 return initialized.isReady();
2303 }
2304
2305 @VisibleForTesting
2306 public void setInitialized(boolean isInitialized) {
2307 procedureExecutor.getEnvironment().setEventReady(initialized, isInitialized);
2308 }
2309
2310 public ProcedureEvent getInitializedEvent() {
2311 return initialized;
2312 }
2313
2314
2315
2316
2317
2318
2319 @Override
2320 public boolean isServerCrashProcessingEnabled() {
2321 return serverCrashProcessingEnabled.isReady();
2322 }
2323
2324 @VisibleForTesting
2325 public void setServerCrashProcessingEnabled(final boolean b) {
2326 procedureExecutor.getEnvironment().setEventReady(serverCrashProcessingEnabled, b);
2327 }
2328
2329 public ProcedureEvent getServerCrashProcessingEnabledEvent() {
2330 return serverCrashProcessingEnabled;
2331 }
2332
2333
2334
2335
2336
2337 public boolean isInitializationStartsMetaRegionAssignment() {
2338 return this.initializationBeforeMetaAssignment;
2339 }
2340
2341 public void assignRegion(HRegionInfo hri) {
2342 assignmentManager.assign(hri, true);
2343 }
2344
2345
2346
2347
2348
2349
2350
2351 public double getAverageLoad() {
2352 if (this.assignmentManager == null) {
2353 return 0;
2354 }
2355
2356 RegionStates regionStates = this.assignmentManager.getRegionStates();
2357 if (regionStates == null) {
2358 return 0;
2359 }
2360 return regionStates.getAverageLoad();
2361 }
2362
2363 @Override
2364 public boolean registerService(Service instance) {
2365
2366
2367
2368 Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
2369 if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
2370 LOG.error("Coprocessor service "+serviceDesc.getFullName()+
2371 " already registered, rejecting request from "+instance
2372 );
2373 return false;
2374 }
2375
2376 coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
2377 if (LOG.isDebugEnabled()) {
2378 LOG.debug("Registered master coprocessor service: service="+serviceDesc.getFullName());
2379 }
2380 return true;
2381 }
2382
2383
2384
2385
2386
2387
2388
2389 public static HMaster constructMaster(Class<? extends HMaster> masterClass,
2390 final Configuration conf, final CoordinatedStateManager cp) {
2391 try {
2392 Constructor<? extends HMaster> c =
2393 masterClass.getConstructor(Configuration.class, CoordinatedStateManager.class);
2394 return c.newInstance(conf, cp);
2395 } catch(Exception e) {
2396 Throwable error = e;
2397 if (e instanceof InvocationTargetException &&
2398 ((InvocationTargetException)e).getTargetException() != null) {
2399 error = ((InvocationTargetException)e).getTargetException();
2400 }
2401 throw new RuntimeException("Failed construction of Master: " + masterClass.toString() + ". "
2402 , error);
2403 }
2404 }
2405
2406
2407
2408
2409 public static void main(String [] args) {
2410 VersionInfo.logVersion();
2411 new HMasterCommandLine(HMaster.class).doMain(args);
2412 }
2413
2414 public HFileCleaner getHFileCleaner() {
2415 return this.hfileCleaner;
2416 }
2417
2418
2419
2420
2421
2422 public SnapshotManager getSnapshotManagerForTesting() {
2423 return this.snapshotManager;
2424 }
2425
2426 @Override
2427 public void createNamespace(NamespaceDescriptor descriptor) throws IOException {
2428 TableName.isLegalNamespaceName(Bytes.toBytes(descriptor.getName()));
2429 checkNamespaceManagerReady();
2430 if (cpHost != null) {
2431 if (cpHost.preCreateNamespace(descriptor)) {
2432 return;
2433 }
2434 }
2435 LOG.info(getClientIdAuditPrefix() + " creating " + descriptor);
2436 tableNamespaceManager.create(descriptor);
2437 if (cpHost != null) {
2438 cpHost.postCreateNamespace(descriptor);
2439 }
2440 }
2441
2442 @Override
2443 public void modifyNamespace(NamespaceDescriptor descriptor) throws IOException {
2444 TableName.isLegalNamespaceName(Bytes.toBytes(descriptor.getName()));
2445 checkNamespaceManagerReady();
2446 if (cpHost != null) {
2447 if (cpHost.preModifyNamespace(descriptor)) {
2448 return;
2449 }
2450 }
2451 LOG.info(getClientIdAuditPrefix() + " modify " + descriptor);
2452 tableNamespaceManager.update(descriptor);
2453 if (cpHost != null) {
2454 cpHost.postModifyNamespace(descriptor);
2455 }
2456 }
2457
2458 @Override
2459 public void deleteNamespace(String name) throws IOException {
2460 checkNamespaceManagerReady();
2461 if (cpHost != null) {
2462 if (cpHost.preDeleteNamespace(name)) {
2463 return;
2464 }
2465 }
2466 LOG.info(getClientIdAuditPrefix() + " delete " + name);
2467 tableNamespaceManager.remove(name);
2468 if (cpHost != null) {
2469 cpHost.postDeleteNamespace(name);
2470 }
2471 }
2472
2473
2474
2475
2476
2477
2478
2479
2480 private void ensureNamespaceExists(final String name)
2481 throws IOException, NamespaceNotFoundException {
2482 checkNamespaceManagerReady();
2483 NamespaceDescriptor nsd = tableNamespaceManager.get(name);
2484 if (nsd == null) {
2485 throw new NamespaceNotFoundException(name);
2486 }
2487 }
2488
2489 @Override
2490 public NamespaceDescriptor getNamespaceDescriptor(String name) throws IOException {
2491 checkNamespaceManagerReady();
2492
2493 if (cpHost != null) {
2494 cpHost.preGetNamespaceDescriptor(name);
2495 }
2496
2497 NamespaceDescriptor nsd = tableNamespaceManager.get(name);
2498 if (nsd == null) {
2499 throw new NamespaceNotFoundException(name);
2500 }
2501
2502 if (cpHost != null) {
2503 cpHost.postGetNamespaceDescriptor(nsd);
2504 }
2505
2506 return nsd;
2507 }
2508
2509 @Override
2510 public List<NamespaceDescriptor> listNamespaceDescriptors() throws IOException {
2511 checkNamespaceManagerReady();
2512
2513 final List<NamespaceDescriptor> descriptors = new ArrayList<NamespaceDescriptor>();
2514 boolean bypass = false;
2515 if (cpHost != null) {
2516 bypass = cpHost.preListNamespaceDescriptors(descriptors);
2517 }
2518
2519 if (!bypass) {
2520 descriptors.addAll(tableNamespaceManager.list());
2521
2522 if (cpHost != null) {
2523 cpHost.postListNamespaceDescriptors(descriptors);
2524 }
2525 }
2526 return descriptors;
2527 }
2528
2529 @Override
2530 public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning)
2531 throws IOException {
2532 if (cpHost != null) {
2533 cpHost.preAbortProcedure(this.procedureExecutor, procId);
2534 }
2535
2536 final boolean result = this.procedureExecutor.abort(procId, mayInterruptIfRunning);
2537
2538 if (cpHost != null) {
2539 cpHost.postAbortProcedure();
2540 }
2541
2542 return result;
2543 }
2544
2545 @Override
2546 public List<ProcedureInfo> listProcedures() throws IOException {
2547 if (cpHost != null) {
2548 cpHost.preListProcedures();
2549 }
2550
2551 final List<ProcedureInfo> procInfoList = this.procedureExecutor.listProcedures();
2552
2553 if (cpHost != null) {
2554 cpHost.postListProcedures(procInfoList);
2555 }
2556
2557 return procInfoList;
2558 }
2559
2560 @Override
2561 public List<HTableDescriptor> listTableDescriptorsByNamespace(String name) throws IOException {
2562 ensureNamespaceExists(name);
2563 return listTableDescriptors(name, null, null, true);
2564 }
2565
2566 @Override
2567 public List<TableName> listTableNamesByNamespace(String name) throws IOException {
2568 ensureNamespaceExists(name);
2569 return listTableNames(name, null, true);
2570 }
2571
2572
2573
2574
2575
2576
2577
2578
2579
2580
2581 public List<HTableDescriptor> listTableDescriptors(final String namespace, final String regex,
2582 final List<TableName> tableNameList, final boolean includeSysTables)
2583 throws IOException {
2584 final List<HTableDescriptor> descriptors = new ArrayList<HTableDescriptor>();
2585
2586 boolean bypass = false;
2587 if (cpHost != null) {
2588 bypass = cpHost.preGetTableDescriptors(tableNameList, descriptors);
2589
2590 bypass |= cpHost.preGetTableDescriptors(tableNameList, descriptors, regex);
2591 }
2592
2593 if (!bypass) {
2594 if (tableNameList == null || tableNameList.size() == 0) {
2595
2596 Collection<HTableDescriptor> htds;
2597 if (namespace != null && namespace.length() > 0) {
2598 htds = tableDescriptors.getByNamespace(namespace).values();
2599 } else {
2600 htds = tableDescriptors.getAll().values();
2601 }
2602
2603 for (HTableDescriptor desc: htds) {
2604 if (includeSysTables || !desc.getTableName().isSystemTable()) {
2605 descriptors.add(desc);
2606 }
2607 }
2608 } else {
2609 for (TableName s: tableNameList) {
2610 HTableDescriptor desc = tableDescriptors.get(s);
2611 if (desc != null) {
2612 descriptors.add(desc);
2613 }
2614 }
2615 }
2616
2617
2618 if (regex != null) {
2619 filterTablesByRegex(descriptors, Pattern.compile(regex));
2620 }
2621
2622 if (cpHost != null) {
2623 cpHost.postGetTableDescriptors(descriptors);
2624
2625 cpHost.postGetTableDescriptors(tableNameList, descriptors, regex);
2626 }
2627 }
2628 return descriptors;
2629 }
2630
2631
2632
2633
2634
2635
2636
2637
2638 public List<TableName> listTableNames(final String namespace, final String regex,
2639 final boolean includeSysTables) throws IOException {
2640 final List<HTableDescriptor> descriptors = new ArrayList<HTableDescriptor>();
2641
2642 boolean bypass = false;
2643 if (cpHost != null) {
2644 bypass = cpHost.preGetTableNames(descriptors, regex);
2645 }
2646
2647 if (!bypass) {
2648
2649 Collection<HTableDescriptor> htds;
2650 if (namespace != null && namespace.length() > 0) {
2651 htds = tableDescriptors.getByNamespace(namespace).values();
2652 } else {
2653 htds = tableDescriptors.getAll().values();
2654 }
2655
2656 for (HTableDescriptor htd: htds) {
2657 if (includeSysTables || !htd.getTableName().isSystemTable()) {
2658 descriptors.add(htd);
2659 }
2660 }
2661
2662
2663 if (regex != null) {
2664 filterTablesByRegex(descriptors, Pattern.compile(regex));
2665 }
2666
2667 if (cpHost != null) {
2668 cpHost.postGetTableNames(descriptors, regex);
2669 }
2670 }
2671
2672 List<TableName> result = new ArrayList<TableName>(descriptors.size());
2673 for (HTableDescriptor htd: descriptors) {
2674 result.add(htd.getTableName());
2675 }
2676 return result;
2677 }
2678
2679
2680
2681
2682
2683
2684
2685 private static void filterTablesByRegex(final Collection<HTableDescriptor> descriptors,
2686 final Pattern pattern) {
2687 final String defaultNS = NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR;
2688 Iterator<HTableDescriptor> itr = descriptors.iterator();
2689 while (itr.hasNext()) {
2690 HTableDescriptor htd = itr.next();
2691 String tableName = htd.getTableName().getNameAsString();
2692 boolean matched = pattern.matcher(tableName).matches();
2693 if (!matched && htd.getTableName().getNamespaceAsString().equals(defaultNS)) {
2694 matched = pattern.matcher(defaultNS + TableName.NAMESPACE_DELIM + tableName).matches();
2695 }
2696 if (!matched) {
2697 itr.remove();
2698 }
2699 }
2700 }
2701
2702 @Override
2703 public long getLastMajorCompactionTimestamp(TableName table) throws IOException {
2704 return getClusterStatus().getLastMajorCompactionTsForTable(table);
2705 }
2706
2707 @Override
2708 public long getLastMajorCompactionTimestampForRegion(byte[] regionName) throws IOException {
2709 return getClusterStatus().getLastMajorCompactionTsForRegion(regionName);
2710 }
2711
2712
2713
2714
2715
2716
2717
2718 public boolean isBalancerOn() {
2719 if (null == loadBalancerTracker) return false;
2720 return loadBalancerTracker.isBalancerOn();
2721 }
2722
2723
2724
2725
2726
2727 public boolean isNormalizerOn() {
2728 if (null == regionNormalizerTracker) {
2729 return false;
2730 }
2731 return regionNormalizerTracker.isNormalizerOn();
2732 }
2733
2734
2735
2736
2737
2738
2739 public String getLoadBalancerClassName() {
2740 return conf.get(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, LoadBalancerFactory
2741 .getDefaultLoadBalancerClass().getName());
2742 }
2743
2744
2745
2746
2747 public RegionNormalizerTracker getRegionNormalizerTracker() {
2748 return regionNormalizerTracker;
2749 }
2750 }