1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master;
20
21 import java.io.IOException;
22 import java.lang.reflect.Constructor;
23 import java.lang.reflect.InvocationTargetException;
24 import java.net.InetAddress;
25 import java.net.InetSocketAddress;
26 import java.net.UnknownHostException;
27 import java.util.ArrayList;
28 import java.util.Collection;
29 import java.util.Collections;
30 import java.util.Comparator;
31 import java.util.HashSet;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Set;
35 import java.util.concurrent.Callable;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.Future;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicReference;
41
42 import javax.management.ObjectName;
43
44 import org.apache.commons.logging.Log;
45 import org.apache.commons.logging.LogFactory;
46 import org.apache.hadoop.classification.InterfaceAudience;
47 import org.apache.hadoop.conf.Configuration;
48 import org.apache.hadoop.fs.Path;
49 import org.apache.hadoop.hbase.Abortable;
50 import org.apache.hadoop.hbase.Chore;
51 import org.apache.hadoop.hbase.ClusterId;
52 import org.apache.hadoop.hbase.ClusterStatus;
53 import org.apache.hadoop.hbase.HBaseIOException;
54 import org.apache.hadoop.hbase.HColumnDescriptor;
55 import org.apache.hadoop.hbase.HConstants;
56 import org.apache.hadoop.hbase.HRegionInfo;
57 import org.apache.hadoop.hbase.HTableDescriptor;
58 import org.apache.hadoop.hbase.HealthCheckChore;
59 import org.apache.hadoop.hbase.MasterNotRunningException;
60 import org.apache.hadoop.hbase.NamespaceDescriptor;
61 import org.apache.hadoop.hbase.NamespaceNotFoundException;
62 import org.apache.hadoop.hbase.PleaseHoldException;
63 import org.apache.hadoop.hbase.Server;
64 import org.apache.hadoop.hbase.ServerLoad;
65 import org.apache.hadoop.hbase.ServerName;
66 import org.apache.hadoop.hbase.TableDescriptors;
67 import org.apache.hadoop.hbase.TableName;
68 import org.apache.hadoop.hbase.TableNotDisabledException;
69 import org.apache.hadoop.hbase.TableNotFoundException;
70 import org.apache.hadoop.hbase.UnknownRegionException;
71 import org.apache.hadoop.hbase.catalog.CatalogTracker;
72 import org.apache.hadoop.hbase.catalog.MetaReader;
73 import org.apache.hadoop.hbase.client.HConnectionManager;
74 import org.apache.hadoop.hbase.client.MetaScanner;
75 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
76 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
77 import org.apache.hadoop.hbase.client.Result;
78 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
79 import org.apache.hadoop.hbase.exceptions.DeserializationException;
80 import org.apache.hadoop.hbase.exceptions.MergeRegionException;
81 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
82 import org.apache.hadoop.hbase.executor.ExecutorService;
83 import org.apache.hadoop.hbase.executor.ExecutorType;
84 import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
85 import org.apache.hadoop.hbase.ipc.RequestContext;
86 import org.apache.hadoop.hbase.ipc.RpcServer;
87 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
88 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
89 import org.apache.hadoop.hbase.ipc.ServerRpcController;
90 import org.apache.hadoop.hbase.master.RegionState.State;
91 import org.apache.hadoop.hbase.master.balancer.BalancerChore;
92 import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
93 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
94 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
95 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
96 import org.apache.hadoop.hbase.master.handler.CreateTableHandler;
97 import org.apache.hadoop.hbase.master.handler.DeleteTableHandler;
98 import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
99 import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler;
100 import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
101 import org.apache.hadoop.hbase.master.handler.ModifyTableHandler;
102 import org.apache.hadoop.hbase.master.handler.TableAddFamilyHandler;
103 import org.apache.hadoop.hbase.master.handler.TableDeleteFamilyHandler;
104 import org.apache.hadoop.hbase.master.handler.TableModifyFamilyHandler;
105 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
106 import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
107 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
108 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
109 import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
110 import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
111 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
112 import org.apache.hadoop.hbase.protobuf.RequestConverter;
113 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
114 import org.apache.hadoop.hbase.protobuf.generated.*;
115 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
116 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
117 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
118 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
119 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
120 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
121 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
122 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
123 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionResponse;
124 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest;
125 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceResponse;
126 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest;
127 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceResponse;
128 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
129 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse;
130 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnRequest;
131 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnResponse;
132 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
133 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
134 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
135 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
136 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableRequest;
137 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse;
138 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableRequest;
139 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableResponse;
140 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
141 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsResponse;
142 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
143 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
144 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableRequest;
145 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableResponse;
146 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest;
147 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusResponse;
148 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
149 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
150 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
151 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
152 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
153 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
154 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
155 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
156 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest;
157 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesResponse;
158 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
159 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
160 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
161 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse;
162 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneRequest;
163 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneResponse;
164 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
165 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
166 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
167 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
168 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
169 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
170 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
171 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
172 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnRequest;
173 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnResponse;
174 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
175 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
176 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableRequest;
177 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableResponse;
178 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionRequest;
179 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionResponse;
180 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionRequest;
181 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionResponse;
182 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
183 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
184 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanRequest;
185 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
186 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
187 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
188 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownRequest;
189 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownResponse;
190 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest;
191 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse;
192 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterRequest;
193 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterResponse;
194 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest;
195 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionResponse;
196 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest;
197 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
198 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
199 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
200 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
201 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
202 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
203 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportResponse;
204 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
205 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
206 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
207 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse;
208 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
209 import org.apache.hadoop.hbase.replication.regionserver.Replication;
210 import org.apache.hadoop.hbase.security.UserProvider;
211 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
212 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
213 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
214 import org.apache.hadoop.hbase.util.Bytes;
215 import org.apache.hadoop.hbase.util.CompressionTest;
216 import org.apache.hadoop.hbase.util.FSTableDescriptors;
217 import org.apache.hadoop.hbase.util.FSUtils;
218 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
219 import org.apache.hadoop.hbase.util.HasThread;
220 import org.apache.hadoop.hbase.util.InfoServer;
221 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
222 import org.apache.hadoop.hbase.util.Pair;
223 import org.apache.hadoop.hbase.util.Sleeper;
224 import org.apache.hadoop.hbase.util.Strings;
225 import org.apache.hadoop.hbase.util.Threads;
226 import org.apache.hadoop.hbase.util.VersionInfo;
227 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
228 import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker;
229 import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
230 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
231 import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
232 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
233 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
234 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
235 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
236 import org.apache.hadoop.metrics.util.MBeanUtil;
237 import org.apache.hadoop.net.DNS;
238 import org.apache.zookeeper.KeeperException;
239 import org.apache.zookeeper.Watcher;
240
241 import com.google.common.collect.Lists;
242 import com.google.common.collect.Maps;
243 import com.google.protobuf.Descriptors;
244 import com.google.protobuf.Message;
245 import com.google.protobuf.RpcCallback;
246 import com.google.protobuf.RpcController;
247 import com.google.protobuf.Service;
248 import com.google.protobuf.ServiceException;
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265 @InterfaceAudience.Private
266 @SuppressWarnings("deprecation")
267 public class HMaster extends HasThread implements MasterProtos.MasterService.BlockingInterface,
268 RegionServerStatusProtos.RegionServerStatusService.BlockingInterface,
269 MasterServices, Server {
270 private static final Log LOG = LogFactory.getLog(HMaster.class.getName());
271
272
273
274 public static final String MASTER = "master";
275
276
277 private final Configuration conf;
278
279 private InfoServer infoServer;
280
281
282 private ZooKeeperWatcher zooKeeper;
283
284 private ActiveMasterManager activeMasterManager;
285
286 RegionServerTracker regionServerTracker;
287
288 private DrainingServerTracker drainingServerTracker;
289
290 private LoadBalancerTracker loadBalancerTracker;
291
292 private MasterAddressTracker masterAddressTracker;
293
294
295 private final RpcServerInterface rpcServer;
296 private JvmPauseMonitor pauseMonitor;
297
298
299 private volatile boolean rpcServerOpen = false;
300
301
302 private TableNamespaceManager tableNamespaceManager;
303 private NamespaceJanitor namespaceJanitorChore;
304
305
306
307
308 private final InetSocketAddress isa;
309
310
311 private final MetricsMaster metricsMaster;
312
313 private MasterFileSystem fileSystemManager;
314
315
316 ServerManager serverManager;
317
318
319 AssignmentManager assignmentManager;
320
321 private CatalogTracker catalogTracker;
322
323 private ClusterStatusTracker clusterStatusTracker;
324
325
326
327
328 private MemoryBoundedLogMessageBuffer rsFatals;
329
330
331
332 private volatile boolean stopped = false;
333
334 private volatile boolean abort = false;
335
336 private volatile boolean isActiveMaster = false;
337
338
339
340 volatile boolean initialized = false;
341
342
343 private volatile boolean serverShutdownHandlerEnabled = false;
344
345
346 ExecutorService executorService;
347
348 private LoadBalancer balancer;
349 private Thread balancerChore;
350 private Thread clusterStatusChore;
351 private ClusterStatusPublisher clusterStatusPublisherChore = null;
352
353 private CatalogJanitor catalogJanitorChore;
354 private LogCleaner logCleaner;
355 private HFileCleaner hfileCleaner;
356
357 private MasterCoprocessorHost cpHost;
358 private final ServerName serverName;
359
360 private TableDescriptors tableDescriptors;
361
362
363 private TableLockManager tableLockManager;
364
365
366 private long masterStartTime;
367 private long masterActiveTime;
368
369
370 private final int msgInterval;
371
372
373
374 private ObjectName mxBean = null;
375
376
377 private final boolean masterCheckCompression;
378
379 private SpanReceiverHost spanReceiverHost;
380
381 private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
382
383
384 private SnapshotManager snapshotManager;
385
386 private MasterProcedureManagerHost mpmHost;
387
388
389 private HealthCheckChore healthCheckChore;
390
391
392 private volatile boolean initializationBeforeMetaAssignment = false;
393
394
395 private List<ZooKeeperListener> registeredZKListenersBeforeRecovery;
396
397
398
399
400
401
402
403
404
405
406
407
408
409 public HMaster(final Configuration conf)
410 throws IOException, KeeperException, InterruptedException {
411 this.conf = new Configuration(conf);
412
413 this.conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
414 FSUtils.setupShortCircuitRead(conf);
415
416 String hostname = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
417 conf.get("hbase.master.dns.interface", "default"),
418 conf.get("hbase.master.dns.nameserver", "default")));
419 int port = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
420
421 InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
422 if (initialIsa.getAddress() == null) {
423 throw new IllegalArgumentException("Failed resolve of hostname " + initialIsa);
424 }
425
426 String bindAddress = conf.get("hbase.master.ipc.address");
427 if (bindAddress != null) {
428 initialIsa = new InetSocketAddress(bindAddress, port);
429 if (initialIsa.getAddress() == null) {
430 throw new IllegalArgumentException("Failed resolve of bind address " + initialIsa);
431 }
432 }
433 String name = "master/" + initialIsa.toString();
434
435 HConnectionManager.setServerSideHConnectionRetries(this.conf, name, LOG);
436 int numHandlers = conf.getInt(HConstants.MASTER_HANDLER_COUNT,
437 conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, HConstants.DEFAULT_MASTER_HANLDER_COUNT));
438 this.rpcServer = new RpcServer(this, name, getServices(),
439 initialIsa,
440 conf,
441 new FifoRpcScheduler(conf, numHandlers));
442
443 this.isa = this.rpcServer.getListenerAddress();
444
445 this.serverName = ServerName.valueOf(hostname, this.isa.getPort(), System.currentTimeMillis());
446 this.rsFatals = new MemoryBoundedLogMessageBuffer(
447 conf.getLong("hbase.master.buffer.for.rs.fatals", 1*1024*1024));
448
449
450 ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file",
451 "hbase.zookeeper.client.kerberos.principal", this.isa.getHostName());
452
453
454 UserProvider provider = UserProvider.instantiate(conf);
455 provider.login("hbase.master.keytab.file",
456 "hbase.master.kerberos.principal", this.isa.getHostName());
457
458 LOG.info("hbase.rootdir=" + FSUtils.getRootDir(this.conf) +
459 ", hbase.cluster.distributed=" + this.conf.getBoolean("hbase.cluster.distributed", false));
460
461
462 setName(MASTER + ":" + this.serverName.toShortString());
463
464 Replication.decorateMasterConfiguration(this.conf);
465
466
467
468 if (this.conf.get("mapred.task.id") == null) {
469 this.conf.set("mapred.task.id", "hb_m_" + this.serverName.toString());
470 }
471
472 this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + isa.getPort(), this, true);
473 this.rpcServer.startThreads();
474 this.pauseMonitor = new JvmPauseMonitor(conf);
475 this.pauseMonitor.start();
476
477
478 this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
479
480
481 this.masterCheckCompression = conf.getBoolean("hbase.master.check.compression", true);
482
483 this.metricsMaster = new MetricsMaster( new MetricsMasterWrapperImpl(this));
484
485
486 int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
487 HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
488 if (isHealthCheckerConfigured()) {
489 healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
490 }
491
492
493 boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED,
494 HConstants.STATUS_PUBLISHED_DEFAULT);
495 Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
496 conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
497 ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
498 ClusterStatusPublisher.Publisher.class);
499
500 if (shouldPublish) {
501 if (publisherClass == null) {
502 LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
503 ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS +
504 " is not set - not publishing status");
505 } else {
506 clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
507 Threads.setDaemonThreadRunning(clusterStatusPublisherChore.getThread());
508 }
509 }
510 }
511
512
513
514
515 private List<BlockingServiceAndInterface> getServices() {
516 List<BlockingServiceAndInterface> bssi = new ArrayList<BlockingServiceAndInterface>(3);
517 bssi.add(new BlockingServiceAndInterface(
518 MasterProtos.MasterService.newReflectiveBlockingService(this),
519 MasterProtos.MasterService.BlockingInterface.class));
520 bssi.add(new BlockingServiceAndInterface(
521 RegionServerStatusProtos.RegionServerStatusService.newReflectiveBlockingService(this),
522 RegionServerStatusProtos.RegionServerStatusService.BlockingInterface.class));
523 return bssi;
524 }
525
526
527
528
529
530
531
532
533 private static void stallIfBackupMaster(final Configuration c,
534 final ActiveMasterManager amm)
535 throws InterruptedException {
536
537 if (!c.getBoolean(HConstants.MASTER_TYPE_BACKUP,
538 HConstants.DEFAULT_MASTER_TYPE_BACKUP)) {
539 return;
540 }
541 LOG.debug("HMaster started in backup mode. " +
542 "Stalling until master znode is written.");
543
544
545 while (!amm.isActiveMaster()) {
546 LOG.debug("Waiting for master address ZNode to be written " +
547 "(Also watching cluster state node)");
548 Thread.sleep(
549 c.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT));
550 }
551
552 }
553
554 MetricsMaster getMetrics() {
555 return metricsMaster;
556 }
557
558
559
560
561
562
563
564
565
566
567 @Override
568 public void run() {
569 MonitoredTask startupStatus =
570 TaskMonitor.get().createStatus("Master startup");
571 startupStatus.setDescription("Master startup");
572 masterStartTime = System.currentTimeMillis();
573 try {
574 this.masterAddressTracker = new MasterAddressTracker(getZooKeeperWatcher(), this);
575 this.masterAddressTracker.start();
576
577
578 int port = this.conf.getInt("hbase.master.info.port", 60010);
579 if (port >= 0) {
580 String a = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
581 this.infoServer = new InfoServer(MASTER, a, port, false, this.conf);
582 this.infoServer.addServlet("status", "/master-status", MasterStatusServlet.class);
583 this.infoServer.addServlet("dump", "/dump", MasterDumpServlet.class);
584 this.infoServer.setAttribute(MASTER, this);
585 this.infoServer.start();
586 }
587
588 this.registeredZKListenersBeforeRecovery = this.zooKeeper.getListeners();
589
590
591
592
593
594
595
596
597
598
599 becomeActiveMaster(startupStatus);
600
601
602 if (!this.stopped) {
603 finishInitialization(startupStatus, false);
604 loop();
605 }
606 } catch (Throwable t) {
607
608 if (t instanceof NoClassDefFoundError &&
609 t.getMessage().contains("org/apache/hadoop/hdfs/protocol/FSConstants$SafeModeAction")) {
610
611 abort("HBase is having a problem with its Hadoop jars. You may need to "
612 + "recompile HBase against Hadoop version "
613 + org.apache.hadoop.util.VersionInfo.getVersion()
614 + " or change your hadoop jars to start properly", t);
615 } else {
616 abort("Unhandled exception. Starting shutdown.", t);
617 }
618 } finally {
619 startupStatus.cleanup();
620
621 stopChores();
622
623
624 if (!this.abort && this.serverManager != null &&
625 this.serverManager.isClusterShutdown()) {
626 this.serverManager.letRegionServersShutdown();
627 }
628 stopServiceThreads();
629
630 if (this.activeMasterManager != null) this.activeMasterManager.stop();
631 if (this.catalogTracker != null) this.catalogTracker.stop();
632 if (this.serverManager != null) this.serverManager.stop();
633 if (this.assignmentManager != null) this.assignmentManager.stop();
634 if (this.fileSystemManager != null) this.fileSystemManager.stop();
635 if (this.mpmHost != null) this.mpmHost.stop("server shutting down.");
636 this.zooKeeper.close();
637 }
638 LOG.info("HMaster main thread exiting");
639 }
640
641
642
643
644
645
646
647 private boolean becomeActiveMaster(MonitoredTask startupStatus)
648 throws InterruptedException {
649
650
651 this.activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName,
652 this);
653 this.zooKeeper.registerListener(activeMasterManager);
654 stallIfBackupMaster(this.conf, this.activeMasterManager);
655
656
657
658
659 this.clusterStatusTracker = new ClusterStatusTracker(getZooKeeper(), this);
660 this.clusterStatusTracker.start();
661 return this.activeMasterManager.blockUntilBecomingActiveMaster(startupStatus);
662 }
663
664
665
666
667
668
669 void initializeZKBasedSystemTrackers() throws IOException,
670 InterruptedException, KeeperException {
671 this.catalogTracker = createCatalogTracker(this.zooKeeper, this.conf, this);
672 this.catalogTracker.start();
673
674 this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
675 this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
676 this.loadBalancerTracker.start();
677 this.assignmentManager = new AssignmentManager(this, serverManager,
678 this.catalogTracker, this.balancer, this.executorService, this.metricsMaster,
679 this.tableLockManager);
680 zooKeeper.registerListenerFirst(assignmentManager);
681
682 this.regionServerTracker = new RegionServerTracker(zooKeeper, this,
683 this.serverManager);
684 this.regionServerTracker.start();
685
686 this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this,
687 this.serverManager);
688 this.drainingServerTracker.start();
689
690
691
692 boolean wasUp = this.clusterStatusTracker.isClusterUp();
693 if (!wasUp) this.clusterStatusTracker.setClusterUp();
694
695 LOG.info("Server active/primary master=" + this.serverName +
696 ", sessionid=0x" +
697 Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
698 ", setting cluster-up flag (Was=" + wasUp + ")");
699
700
701 this.snapshotManager = new SnapshotManager();
702 this.mpmHost = new MasterProcedureManagerHost();
703 this.mpmHost.register(this.snapshotManager);
704 this.mpmHost.loadProcedures(conf);
705 this.mpmHost.initialize(this, this.metricsMaster);
706 }
707
708
709
710
711
712
713
714
715
716
717
718
719
720 CatalogTracker createCatalogTracker(final ZooKeeperWatcher zk,
721 final Configuration conf, Abortable abortable)
722 throws IOException {
723 return new CatalogTracker(zk, conf, abortable);
724 }
725
726
727 private Sleeper stopSleeper = new Sleeper(100, this);
728
729 private void loop() {
730 long lastMsgTs = 0l;
731 long now = 0l;
732 while (!this.stopped) {
733 now = System.currentTimeMillis();
734 if ((now - lastMsgTs) >= this.msgInterval) {
735 doMetrics();
736 lastMsgTs = System.currentTimeMillis();
737 }
738 stopSleeper.sleep();
739 }
740 }
741
742
743
744
745
746 private void doMetrics() {
747 try {
748 this.assignmentManager.updateRegionsInTransitionMetrics();
749 } catch (Throwable e) {
750 LOG.error("Couldn't update metrics: " + e.getMessage());
751 }
752 }
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
/**
 * Runs the start-up sequence for this master after it has become active.
 * The steps run strictly in the order written: file system and table
 * descriptors, cluster id publication, executor/server managers, table locks,
 * ZK trackers, coprocessors and service threads, waiting for region servers,
 * meta assignment, dead-server processing, assignment manager, chores,
 * namespace manager and finally the coprocessor hooks.
 *
 * @param status monitored task whose status string is updated as each phase starts
 * @param masterRecovery when true, the steps guarded by {@code !masterRecovery}
 *          (creating the executor service and server manager, reaping write locks,
 *          creating the coprocessor host, starting service threads and chores,
 *          and the postStartMaster hook) are skipped
 * @throws IOException declared; propagated from the calls made below
 * @throws InterruptedException declared; propagated from the calls made below
 * @throws KeeperException declared; propagated from the calls made below
 */
private void finishInitialization(MonitoredTask status, boolean masterRecovery)
throws IOException, InterruptedException, KeeperException {

  // From here on this instance considers itself the active master.
  isActiveMaster = true;

  status.setStatus("Initializing Master file system");

  // Record the moment this master became active.
  this.masterActiveTime = System.currentTimeMillis();

  this.fileSystemManager = new MasterFileSystem(this, this, masterRecovery);

  // Table descriptors are read from the master file system's root directory.
  this.tableDescriptors =
    new FSTableDescriptors(this.fileSystemManager.getFileSystem(),
      this.fileSystemManager.getRootDir());

  // Publish the cluster id obtained from the file system manager into ZooKeeper.
  status.setStatus("Publishing Cluster ID in ZooKeeper");
  ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());

  if (!masterRecovery) {
    this.executorService = new ExecutorService(getServerName().toShortString());
    this.serverManager = createServerManager(this, this);
  }

  // The table lock manager is (re)created on every call; write locks are only
  // reaped when this is not a recovery.
  this.tableLockManager = TableLockManager.createTableLockManager(conf, zooKeeper, serverName);
  if (!masterRecovery) {
    this.tableLockManager.reapWriteLocks();
  }

  status.setStatus("Initializing ZK system trackers");
  initializeZKBasedSystemTrackers();

  if (!masterRecovery) {
    // Coprocessor host, span receivers and service threads are set up once.
    status.setStatus("Initializing master coprocessors");
    this.cpHost = new MasterCoprocessorHost(this, this.conf);

    spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());

    // Starts executor pools, cleaners and opens the RPC server.
    status.setStatus("Initializing master service threads");
    startServiceThreads();
  }

  // Block until the server manager decides enough region servers have checked in.
  this.serverManager.waitForRegionServers(status);

  // Register servers that the region server tracker reports as online but that
  // the server manager does not yet know about, using an empty load.
  for (ServerName sn: this.regionServerTracker.getOnlineServers()) {
    if (!this.serverManager.isServerOnline(sn)
        && serverManager.checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
      LOG.info("Registered server found up in zk but who has not yet reported in: " + sn);
    }
  }

  if (!masterRecovery) {
    this.assignmentManager.startTimeOutMonitor();
  }

  // Servers that still have log folders are treated as previously failed.
  Set<ServerName> previouslyFailedServers = this.fileSystemManager
      .getFailedServersFromLogFolders();

  // Clean up recovering-region entries in ZK using the failed-server set.
  this.fileSystemManager.removeStaleRecoveringRegionsFromZK(previouslyFailedServers);

  // If the last known meta location is one of the failed servers, deal with its
  // meta log before meta gets assigned.
  ServerName oldMetaServerLocation = this.catalogTracker.getMetaLocation();
  if (oldMetaServerLocation != null && previouslyFailedServers.contains(oldMetaServerLocation)) {
    splitMetaLogBeforeAssignment(oldMetaServerLocation);
  }
  Set<ServerName> previouslyFailedMetaRSs = getPreviouselyFailedMetaServersFromZK();
  // The meta-specific set is widened to include every previously failed server.
  previouslyFailedMetaRSs.addAll(previouslyFailedServers);

  this.initializationBeforeMetaAssignment = true;

  // The balancer is initialized before meta is assigned.
  this.balancer.setClusterStatus(getClusterStatus());
  this.balancer.setMasterServices(this);
  this.balancer.initialize();

  status.setStatus("Assigning Meta Region");
  assignMeta(status, previouslyFailedMetaRSs);

  // Bail out early if the master was stopped while meta was being assigned.
  if(this.stopped) return;

  status.setStatus("Submitting log splitting work for previously failed region servers");

  // Hand every previously failed server to the server manager for processing.
  for (ServerName tmpServer : previouslyFailedServers) {
    this.serverManager.processDeadServer(tmpServer, true);
  }

  // Runs the meta migration helper before the assignment manager joins the cluster.
  org.apache.hadoop.hbase.catalog.MetaMigrationConvertingToPB
    .updateMetaIfNecessary(this);

  status.setStatus("Starting assignment manager");
  this.assignmentManager.joinCluster();

  // Refresh the balancer's cluster status now that the assignment manager has joined.
  this.balancer.setClusterStatus(getClusterStatus());

  if (!masterRecovery) {
    status.setStatus("Starting balancer and catalog janitor");
    this.clusterStatusChore = getAndStartClusterStatusChore(this);
    this.balancerChore = getAndStartBalancerChore(this);
    this.catalogJanitorChore = new CatalogJanitor(this, this);
    startCatalogJanitorChore();
  }

  status.setStatus("Starting namespace manager");
  initNamespace();

  // A failing preMasterInitialization hook is logged and does not abort start-up.
  if (this.cpHost != null) {
    try {
      this.cpHost.preMasterInitialization();
    } catch (IOException e) {
      LOG.error("Coprocessor preMasterInitialization() hook failed", e);
    }
  }

  status.markComplete("Initialization successful");
  LOG.info("Master has completed initialization");
  initialized = true;

  this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();

  if (!masterRecovery) {
    if (this.cpHost != null) {
      // A failing postStartMaster hook is logged and otherwise ignored.
      try {
        this.cpHost.postStartMaster();
      } catch (IOException ioe) {
        LOG.error("Coprocessor postStartMaster() hook failed", ioe);
      }
    }
  }
}
943
944
945
946
947
948 protected void startCatalogJanitorChore() {
949 Threads.setDaemonThreadRunning(catalogJanitorChore.getThread());
950 }
951
952
953
954
955
956 protected void startNamespaceJanitorChore() {
957 Threads.setDaemonThreadRunning(namespaceJanitorChore.getThread());
958 }
959
960
961
962
963
964
965
966
967
968 ServerManager createServerManager(final Server master,
969 final MasterServices services)
970 throws IOException {
971
972
973 return new ServerManager(master, services);
974 }
975
976
977
978
979
980
981
982
983
/**
 * Verifies the current hbase:meta location and, if it cannot be verified,
 * (re)assigns hbase:meta. Afterwards marks the meta table enabled, optionally
 * splits meta logs for failed servers when log replay is in use, and enables
 * the server shutdown handler.
 *
 * @param status monitored task whose status string is updated
 * @param previouslyFailedMetaRSs servers considered to have failed while carrying
 *          meta; this set is mutated (the current meta server may be added)
 * @throws InterruptedException declared; propagated from the calls made below
 * @throws IOException declared; propagated from the calls made below
 * @throws KeeperException declared; propagated from the calls made below
 */
void assignMeta(MonitoredTask status, Set<ServerName> previouslyFailedMetaRSs)
throws InterruptedException, IOException, KeeperException {

  // Counts how many times meta was found unverified; only ever 0 or 1 here.
  int assigned = 0;
  long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000);
  status.setStatus("Assigning hbase:meta region");

  RegionStates regionStates = assignmentManager.getRegionStates();
  regionStates.createRegionState(HRegionInfo.FIRST_META_REGIONINFO);
  // rit: result of processing meta as a region in transition (blocks until assigned).
  boolean rit = this.assignmentManager
    .processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.FIRST_META_REGIONINFO);
  boolean metaRegionLocation = this.catalogTracker.verifyMetaRegionLocation(timeout);
  ServerName currentMetaServer = this.catalogTracker.getMetaLocation();
  if (!metaRegionLocation) {
    // Meta location could not be verified within the timeout.
    assigned++;
    if (!rit) {
      // Meta was not handled as a region in transition, so assign it here.
      if (currentMetaServer != null) {
        // A location is recorded but did not verify: expire that server if the
        // server manager still thinks it is online, then handle its meta log
        // and remember it as a failed meta server.
        if (serverManager.isServerOnline(currentMetaServer)) {
          LOG.info("Forcing expire of " + currentMetaServer);
          serverManager.expireServer(currentMetaServer);
        }
        splitMetaLogBeforeAssignment(currentMetaServer);
        previouslyFailedMetaRSs.add(currentMetaServer);
      }
      assignmentManager.assignMeta();
    }
  } else {
    // Meta location verified: record it as OPEN on the current server.
    regionStates.updateRegionState(
      HRegionInfo.FIRST_META_REGIONINFO, State.OPEN, currentMetaServer);
    this.assignmentManager.regionOnline(
      HRegionInfo.FIRST_META_REGIONINFO, currentMetaServer);
  }

  enableMeta(TableName.META_TABLE_NAME);

  // In LOG_REPLAY mode, meta logs of the failed servers are split after meta is assigned.
  if ((RecoveryMode.LOG_REPLAY == this.getMasterFileSystem().getLogRecoveryMode())
      && (!previouslyFailedMetaRSs.isEmpty())) {
    status.setStatus("replaying log for Meta Region");
    this.fileSystemManager.splitMetaLog(previouslyFailedMetaRSs);
  }

  // Only wait for meta inside the handler-enable step when meta was unverified above.
  enableServerShutdownHandler(assigned != 0);

  LOG.info("hbase:meta assigned=" + assigned + ", rit=" + rit +
    ", location=" + catalogTracker.getMetaLocation());
  status.setStatus("META assigned.");
}
1047
1048 void initNamespace() throws IOException {
1049
1050 tableNamespaceManager = new TableNamespaceManager(this);
1051 tableNamespaceManager.start();
1052 }
1053
1054 private void splitMetaLogBeforeAssignment(ServerName currentMetaServer) throws IOException {
1055 if (RecoveryMode.LOG_REPLAY == this.getMasterFileSystem().getLogRecoveryMode()) {
1056
1057 Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
1058 regions.add(HRegionInfo.FIRST_META_REGIONINFO);
1059 this.fileSystemManager.prepareLogReplay(currentMetaServer, regions);
1060 } else {
1061
1062 this.fileSystemManager.splitMetaLog(currentMetaServer);
1063 }
1064 }
1065
1066 private void enableServerShutdownHandler(
1067 final boolean waitForMeta) throws IOException, InterruptedException {
1068
1069
1070
1071
1072
1073 if (!serverShutdownHandlerEnabled) {
1074 serverShutdownHandlerEnabled = true;
1075 this.serverManager.processQueuedDeadServers();
1076 }
1077
1078 if (waitForMeta) {
1079 this.catalogTracker.waitForMeta();
1080
1081
1082 this.assignmentManager.waitForAssignment(HRegionInfo.FIRST_META_REGIONINFO);
1083 }
1084 }
1085
1086 private void enableMeta(TableName metaTableName) {
1087 if (!this.assignmentManager.getZKTable().isEnabledTable(metaTableName)) {
1088 this.assignmentManager.setEnabledTable(metaTableName);
1089 }
1090 }
1091
1092
1093
1094
1095
1096
1097 private Set<ServerName> getPreviouselyFailedMetaServersFromZK() throws KeeperException {
1098 Set<ServerName> result = new HashSet<ServerName>();
1099 String metaRecoveringZNode = ZKUtil.joinZNode(zooKeeper.recoveringRegionsZNode,
1100 HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
1101 List<String> regionFailedServers = ZKUtil.listChildrenNoWatch(zooKeeper, metaRecoveringZNode);
1102 if (regionFailedServers == null) return result;
1103
1104 for(String failedServer : regionFailedServers) {
1105 ServerName server = ServerName.parseServerName(failedServer);
1106 result.add(server);
1107 }
1108 return result;
1109 }
1110
1111 @Override
1112 public TableDescriptors getTableDescriptors() {
1113 return this.tableDescriptors;
1114 }
1115
1116
1117 public InfoServer getInfoServer() {
1118 return this.infoServer;
1119 }
1120
1121 @Override
1122 public Configuration getConfiguration() {
1123 return this.conf;
1124 }
1125
1126 @Override
1127 public ServerManager getServerManager() {
1128 return this.serverManager;
1129 }
1130
1131 @Override
1132 public ExecutorService getExecutorService() {
1133 return this.executorService;
1134 }
1135
1136 @Override
1137 public MasterFileSystem getMasterFileSystem() {
1138 return this.fileSystemManager;
1139 }
1140
1141
1142
1143
1144
1145 public ZooKeeperWatcher getZooKeeperWatcher() {
1146 return this.zooKeeper;
1147 }
1148
1149 public ActiveMasterManager getActiveMasterManager() {
1150 return this.activeMasterManager;
1151 }
1152
1153 public MasterAddressTracker getMasterAddressTracker() {
1154 return this.masterAddressTracker;
1155 }
1156
1157
1158
1159
1160
1161
1162
1163
1164 void startServiceThreads() throws IOException{
1165
1166 this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
1167 conf.getInt("hbase.master.executor.openregion.threads", 5));
1168 this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
1169 conf.getInt("hbase.master.executor.closeregion.threads", 5));
1170 this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
1171 conf.getInt("hbase.master.executor.serverops.threads", 5));
1172 this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
1173 conf.getInt("hbase.master.executor.serverops.threads", 5));
1174 this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
1175 conf.getInt("hbase.master.executor.logreplayops.threads", 10));
1176
1177
1178
1179
1180
1181
1182 this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
1183
1184
1185 String n = Thread.currentThread().getName();
1186 int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
1187 this.logCleaner =
1188 new LogCleaner(cleanerInterval,
1189 this, conf, getMasterFileSystem().getFileSystem(),
1190 getMasterFileSystem().getOldLogDir());
1191 Threads.setDaemonThreadRunning(logCleaner.getThread(), n + ".oldLogCleaner");
1192
1193
1194 Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
1195 this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem()
1196 .getFileSystem(), archiveDir);
1197 Threads.setDaemonThreadRunning(hfileCleaner.getThread(), n + ".archivedHFileCleaner");
1198
1199
1200 if (this.healthCheckChore != null) {
1201 Threads.setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker");
1202 }
1203
1204
1205 this.rpcServer.openServer();
1206 this.rpcServerOpen = true;
1207 if (LOG.isTraceEnabled()) {
1208 LOG.trace("Started service threads");
1209 }
1210 }
1211
1212
1213
1214
1215
1216 boolean isRpcServerOpen() {
1217 return this.rpcServerOpen;
1218 }
1219
1220 private void stopServiceThreads() {
1221 if (LOG.isDebugEnabled()) {
1222 LOG.debug("Stopping service threads");
1223 }
1224 if (this.rpcServer != null) this.rpcServer.stop();
1225 this.rpcServerOpen = false;
1226
1227 if (this.logCleaner!= null) this.logCleaner.interrupt();
1228 if (this.hfileCleaner != null) this.hfileCleaner.interrupt();
1229
1230 if (this.infoServer != null) {
1231 LOG.info("Stopping infoServer");
1232 try {
1233 this.infoServer.stop();
1234 } catch (Exception ex) {
1235 ex.printStackTrace();
1236 }
1237 }
1238 if (this.executorService != null) this.executorService.shutdown();
1239 if (this.healthCheckChore != null) {
1240 this.healthCheckChore.interrupt();
1241 }
1242 if (this.pauseMonitor != null) {
1243 this.pauseMonitor.stop();
1244 }
1245 }
1246
1247 private static Thread getAndStartClusterStatusChore(HMaster master) {
1248 if (master == null || master.balancer == null) {
1249 return null;
1250 }
1251 Chore chore = new ClusterStatusChore(master, master.balancer);
1252 return Threads.setDaemonThreadRunning(chore.getThread());
1253 }
1254
1255 private static Thread getAndStartBalancerChore(final HMaster master) {
1256
1257 Chore chore = new BalancerChore(master);
1258 return Threads.setDaemonThreadRunning(chore.getThread());
1259 }
1260
1261 private void stopChores() {
1262 if (this.balancerChore != null) {
1263 this.balancerChore.interrupt();
1264 }
1265 if (this.clusterStatusChore != null) {
1266 this.clusterStatusChore.interrupt();
1267 }
1268 if (this.catalogJanitorChore != null) {
1269 this.catalogJanitorChore.interrupt();
1270 }
1271 if (this.clusterStatusPublisherChore != null){
1272 clusterStatusPublisherChore.interrupt();
1273 }
1274 if (this.namespaceJanitorChore != null){
1275 namespaceJanitorChore.interrupt();
1276 }
1277 }
1278
1279 @Override
1280 public RegionServerStartupResponse regionServerStartup(
1281 RpcController controller, RegionServerStartupRequest request) throws ServiceException {
1282
1283 try {
1284 InetAddress ia = getRemoteInetAddress(request.getPort(), request.getServerStartCode());
1285 ServerName rs = this.serverManager.regionServerStartup(ia, request.getPort(),
1286 request.getServerStartCode(), request.getServerCurrentTime());
1287
1288
1289 RegionServerStartupResponse.Builder resp = createConfigurationSubset();
1290 NameStringPair.Builder entry = NameStringPair.newBuilder()
1291 .setName(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)
1292 .setValue(rs.getHostname());
1293 resp.addMapEntries(entry.build());
1294
1295 return resp.build();
1296 } catch (IOException ioe) {
1297 throw new ServiceException(ioe);
1298 }
1299 }
1300
1301
1302
1303
1304
1305 InetAddress getRemoteInetAddress(final int port, final long serverStartCode)
1306 throws UnknownHostException {
1307
1308
1309 return RpcServer.getRemoteIp();
1310 }
1311
1312
1313
1314
1315
1316 protected RegionServerStartupResponse.Builder createConfigurationSubset() {
1317 RegionServerStartupResponse.Builder resp = addConfig(
1318 RegionServerStartupResponse.newBuilder(), HConstants.HBASE_DIR);
1319 resp = addConfig(resp, "fs.default.name");
1320 return addConfig(resp, "hbase.master.info.port");
1321 }
1322
1323 private RegionServerStartupResponse.Builder addConfig(
1324 final RegionServerStartupResponse.Builder resp, final String key) {
1325 NameStringPair.Builder entry = NameStringPair.newBuilder()
1326 .setName(key)
1327 .setValue(this.conf.get(key));
1328 resp.addMapEntries(entry.build());
1329 return resp;
1330 }
1331
1332 @Override
1333 public GetLastFlushedSequenceIdResponse getLastFlushedSequenceId(RpcController controller,
1334 GetLastFlushedSequenceIdRequest request) throws ServiceException {
1335 byte[] regionName = request.getRegionName().toByteArray();
1336 long seqId = serverManager.getLastFlushedSequenceId(regionName);
1337 return ResponseConverter.buildGetLastFlushedSequenceIdResponse(seqId);
1338 }
1339
1340 @Override
1341 public RegionServerReportResponse regionServerReport(
1342 RpcController controller, RegionServerReportRequest request) throws ServiceException {
1343 try {
1344 ClusterStatusProtos.ServerLoad sl = request.getLoad();
1345 ServerName serverName = ProtobufUtil.toServerName(request.getServer());
1346 ServerLoad oldLoad = serverManager.getLoad(serverName);
1347 this.serverManager.regionServerReport(serverName, new ServerLoad(sl));
1348 if (sl != null && this.metricsMaster != null) {
1349
1350 this.metricsMaster.incrementRequests(sl.getTotalNumberOfRequests()
1351 - (oldLoad != null ? oldLoad.getTotalNumberOfRequests() : 0));
1352 }
1353 } catch (IOException ioe) {
1354 throw new ServiceException(ioe);
1355 }
1356
1357 return RegionServerReportResponse.newBuilder().build();
1358 }
1359
1360 @Override
1361 public ReportRSFatalErrorResponse reportRSFatalError(
1362 RpcController controller, ReportRSFatalErrorRequest request) throws ServiceException {
1363 String errorText = request.getErrorMessage();
1364 ServerName sn = ProtobufUtil.toServerName(request.getServer());
1365 String msg = "Region server " + sn +
1366 " reported a fatal error:\n" + errorText;
1367 LOG.error(msg);
1368 rsFatals.add(msg);
1369
1370 return ReportRSFatalErrorResponse.newBuilder().build();
1371 }
1372
1373 public boolean isMasterRunning() {
1374 return !isStopped();
1375 }
1376
1377 @Override
1378 public IsMasterRunningResponse isMasterRunning(RpcController c, IsMasterRunningRequest req)
1379 throws ServiceException {
1380 return IsMasterRunningResponse.newBuilder().setIsMasterRunning(isMasterRunning()).build();
1381 }
1382
1383 @Override
1384 public RunCatalogScanResponse runCatalogScan(RpcController c,
1385 RunCatalogScanRequest req) throws ServiceException {
1386 try {
1387 return ResponseConverter.buildRunCatalogScanResponse(catalogJanitorChore.scan());
1388 } catch (IOException ioe) {
1389 throw new ServiceException(ioe);
1390 }
1391 }
1392
1393 @Override
1394 public EnableCatalogJanitorResponse enableCatalogJanitor(RpcController c,
1395 EnableCatalogJanitorRequest req) throws ServiceException {
1396 return EnableCatalogJanitorResponse.newBuilder().
1397 setPrevValue(catalogJanitorChore.setEnabled(req.getEnable())).build();
1398 }
1399
1400 @Override
1401 public IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(RpcController c,
1402 IsCatalogJanitorEnabledRequest req) throws ServiceException {
1403 boolean isEnabled = catalogJanitorChore != null ? catalogJanitorChore.getEnabled() : false;
1404 return IsCatalogJanitorEnabledResponse.newBuilder().setValue(isEnabled).build();
1405 }
1406
1407
1408
1409
1410 private int getBalancerCutoffTime() {
1411 int balancerCutoffTime =
1412 getConfiguration().getInt("hbase.balancer.max.balancing", -1);
1413 if (balancerCutoffTime == -1) {
1414
1415 int balancerPeriod =
1416 getConfiguration().getInt("hbase.balancer.period", 300000);
1417 balancerCutoffTime = balancerPeriod;
1418
1419 if (balancerCutoffTime <= 0) balancerCutoffTime = balancerPeriod;
1420 }
1421 return balancerCutoffTime;
1422 }
1423
/**
 * Runs one balancing pass: computes region plans per table and executes them
 * until either all plans ran or the cutoff time would be exceeded.
 * <p>
 * The pass is skipped (returning false) when the master is not initialized,
 * the balancer switch is off, regions are in transition, dead servers are being
 * processed, or the coprocessor preBalance hook bypasses or fails.
 *
 * @return false when the pass was skipped for one of the reasons above; otherwise
 *         the value of {@code plans != null}, which is always true because
 *         {@code plans} is created locally
 * @throws HBaseIOException declared; not thrown directly by the statements below
 */
public boolean balance() throws HBaseIOException {
  // Balancing makes no sense before initialization has completed.
  if (!this.initialized) {
    LOG.debug("Master has not been initialized, don't run balancer.");
    return false;
  }

  int maximumBalanceTime = getBalancerCutoffTime();
  boolean balancerRan;
  // The whole pass runs while holding the balancer's monitor; switchBalancer in
  // SYNC mode takes the same monitor.
  synchronized (this.balancer) {
    // Respect the balancer on/off switch.
    if (!this.loadBalancerTracker.isBalancerOn()) return false;

    // Do not balance while any region is in transition.
    if (this.assignmentManager.getRegionStates().isRegionsInTransition()) {
      Map<String, RegionState> regionsInTransition =
        this.assignmentManager.getRegionStates().getRegionsInTransition();
      LOG.debug("Not running balancer because " + regionsInTransition.size() +
        " region(s) in transition: " + org.apache.commons.lang.StringUtils.
          abbreviate(regionsInTransition.toString(), 256));
      return false;
    }
    // Do not balance while dead servers are still being processed.
    if (this.serverManager.areDeadServersInProgress()) {
      LOG.debug("Not running balancer because processing dead regionserver(s): " +
        this.serverManager.getDeadServers());
      return false;
    }

    // Give coprocessors a chance to bypass; a failing hook also cancels the pass.
    if (this.cpHost != null) {
      try {
        if (this.cpHost.preBalance()) {
          LOG.debug("Coprocessor bypassing balancer request");
          return false;
        }
      } catch (IOException ioe) {
        LOG.error("Error invoking master coprocessor preBalance()", ioe);
        return false;
      }
    }

    Map<TableName, Map<ServerName, List<HRegionInfo>>> assignmentsByTable =
      this.assignmentManager.getRegionStates().getAssignmentsByTable();

    List<RegionPlan> plans = new ArrayList<RegionPlan>();
    // Give the balancer a fresh view of the cluster, then collect plans table by table.
    this.balancer.setClusterStatus(getClusterStatus());
    for (Map<ServerName, List<HRegionInfo>> assignments : assignmentsByTable.values()) {
      List<RegionPlan> partialPlans = this.balancer.balanceCluster(assignments);
      if (partialPlans != null) plans.addAll(partialPlans);
    }
    long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
    // rpCount: number of plans executed so far.
    int rpCount = 0;
    long totalRegPlanExecTime = 0;
    // NOTE(review): plans is never null here, so this is always true.
    balancerRan = plans != null;
    if (plans != null && !plans.isEmpty()) {
      for (RegionPlan plan: plans) {
        LOG.info("balance " + plan);
        long balStartTime = System.currentTimeMillis();
        this.assignmentManager.balance(plan);
        totalRegPlanExecTime += System.currentTimeMillis()-balStartTime;
        rpCount++;
        // Stop early when, based on the average plan time so far, one more plan
        // would push past the cutoff.
        if (rpCount < plans.size() &&
            (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) {
          LOG.debug("No more balancing till next balance run; maximumBalanceTime=" +
            maximumBalanceTime);
          break;
        }
      }
    }
    if (this.cpHost != null) {
      try {
        // Only the plans that were actually executed are passed to the hook.
        this.cpHost.postBalance(rpCount < plans.size() ? plans.subList(0, rpCount) : plans);
      } catch (IOException ioe) {
        // The balancing already happened; a failing post hook is only logged.
        LOG.error("Error invoking master coprocessor postBalance()", ioe);
      }
    }
  }
  return balancerRan;
}
1506
1507 @Override
1508 public BalanceResponse balance(RpcController c, BalanceRequest request) throws ServiceException {
1509 try {
1510 return BalanceResponse.newBuilder().setBalancerRan(balance()).build();
1511 } catch (HBaseIOException ex) {
1512 throw new ServiceException(ex);
1513 }
1514 }
1515
/**
 * How {@code switchBalancer} flips the balancer switch: {@code SYNC} updates it
 * while holding the balancer's monitor, {@code ASYNC} updates it without taking
 * that monitor.
 */
enum BalanceSwitchMode { SYNC, ASYNC }
1520
1521
1522
1523
1524
1525
1526
1527 public boolean switchBalancer(final boolean b, BalanceSwitchMode mode) throws IOException {
1528 boolean oldValue = this.loadBalancerTracker.isBalancerOn();
1529 boolean newValue = b;
1530 try {
1531 if (this.cpHost != null) {
1532 newValue = this.cpHost.preBalanceSwitch(newValue);
1533 }
1534 try {
1535 if (mode == BalanceSwitchMode.SYNC) {
1536 synchronized (this.balancer) {
1537 this.loadBalancerTracker.setBalancerOn(newValue);
1538 }
1539 } else {
1540 this.loadBalancerTracker.setBalancerOn(newValue);
1541 }
1542 } catch (KeeperException ke) {
1543 throw new IOException(ke);
1544 }
1545 LOG.info(getClientIdAuditPrefix() + " set balanceSwitch=" + newValue);
1546 if (this.cpHost != null) {
1547 this.cpHost.postBalanceSwitch(oldValue, newValue);
1548 }
1549 } catch (IOException ioe) {
1550 LOG.warn("Error flipping balance switch", ioe);
1551 }
1552 return oldValue;
1553 }
1554
1555
1556
1557
1558 String getClientIdAuditPrefix() {
1559 return "Client=" + RequestContext.getRequestUserName() + "/" +
1560 RequestContext.get().getRemoteAddress();
1561 }
1562
1563 public boolean synchronousBalanceSwitch(final boolean b) throws IOException {
1564 return switchBalancer(b, BalanceSwitchMode.SYNC);
1565 }
1566
1567 public boolean balanceSwitch(final boolean b) throws IOException {
1568 return switchBalancer(b, BalanceSwitchMode.ASYNC);
1569 }
1570
1571 @Override
1572 public SetBalancerRunningResponse setBalancerRunning(
1573 RpcController controller, SetBalancerRunningRequest req) throws ServiceException {
1574 try {
1575 boolean prevValue = (req.getSynchronous())?
1576 synchronousBalanceSwitch(req.getOn()):balanceSwitch(req.getOn());
1577 return SetBalancerRunningResponse.newBuilder().setPrevBalanceValue(prevValue).build();
1578 } catch (IOException ioe) {
1579 throw new ServiceException(ioe);
1580 }
1581 }
1582
1583
1584
1585
1586
1587
1588
1589 public void setCatalogJanitorEnabled(final boolean b) {
1590 this.catalogJanitorChore.setEnabled(b);
1591 }
1592
1593 @Override
1594 public DispatchMergingRegionsResponse dispatchMergingRegions(
1595 RpcController controller, DispatchMergingRegionsRequest request)
1596 throws ServiceException {
1597 final byte[] encodedNameOfRegionA = request.getRegionA().getValue()
1598 .toByteArray();
1599 final byte[] encodedNameOfRegionB = request.getRegionB().getValue()
1600 .toByteArray();
1601 final boolean forcible = request.getForcible();
1602 if (request.getRegionA().getType() != RegionSpecifierType.ENCODED_REGION_NAME
1603 || request.getRegionB().getType() != RegionSpecifierType.ENCODED_REGION_NAME) {
1604 LOG.warn("mergeRegions specifier type: expected: "
1605 + RegionSpecifierType.ENCODED_REGION_NAME + " actual: region_a="
1606 + request.getRegionA().getType() + ", region_b="
1607 + request.getRegionB().getType());
1608 }
1609 RegionState regionStateA = assignmentManager.getRegionStates()
1610 .getRegionState(Bytes.toString(encodedNameOfRegionA));
1611 RegionState regionStateB = assignmentManager.getRegionStates()
1612 .getRegionState(Bytes.toString(encodedNameOfRegionB));
1613 if (regionStateA == null || regionStateB == null) {
1614 throw new ServiceException(new UnknownRegionException(
1615 Bytes.toStringBinary(regionStateA == null ? encodedNameOfRegionA
1616 : encodedNameOfRegionB)));
1617 }
1618
1619 if (!regionStateA.isOpened() || !regionStateB.isOpened()) {
1620 throw new ServiceException(new MergeRegionException(
1621 "Unable to merge regions not online " + regionStateA + ", " + regionStateB));
1622 }
1623
1624 HRegionInfo regionInfoA = regionStateA.getRegion();
1625 HRegionInfo regionInfoB = regionStateB.getRegion();
1626 if (regionInfoA.compareTo(regionInfoB) == 0) {
1627 throw new ServiceException(new MergeRegionException(
1628 "Unable to merge a region to itself " + regionInfoA + ", " + regionInfoB));
1629 }
1630
1631 if (!forcible && !HRegionInfo.areAdjacent(regionInfoA, regionInfoB)) {
1632 throw new ServiceException(new MergeRegionException(
1633 "Unable to merge not adjacent regions "
1634 + regionInfoA.getRegionNameAsString() + ", "
1635 + regionInfoB.getRegionNameAsString()
1636 + " where forcible = " + forcible));
1637 }
1638
1639 try {
1640 dispatchMergingRegions(regionInfoA, regionInfoB, forcible);
1641 } catch (IOException ioe) {
1642 throw new ServiceException(ioe);
1643 }
1644
1645 return DispatchMergingRegionsResponse.newBuilder().build();
1646 }
1647
1648 @Override
1649 public void dispatchMergingRegions(final HRegionInfo region_a,
1650 final HRegionInfo region_b, final boolean forcible) throws IOException {
1651 checkInitialized();
1652 this.executorService.submit(new DispatchMergingRegionHandler(this,
1653 this.catalogJanitorChore, region_a, region_b, forcible));
1654 }
1655
1656 @Override
1657 public MoveRegionResponse moveRegion(RpcController controller, MoveRegionRequest req)
1658 throws ServiceException {
1659 final byte [] encodedRegionName = req.getRegion().getValue().toByteArray();
1660 RegionSpecifierType type = req.getRegion().getType();
1661 final byte [] destServerName = (req.hasDestServerName())?
1662 Bytes.toBytes(ProtobufUtil.toServerName(req.getDestServerName()).getServerName()):null;
1663 MoveRegionResponse mrr = MoveRegionResponse.newBuilder().build();
1664
1665 if (type != RegionSpecifierType.ENCODED_REGION_NAME) {
1666 LOG.warn("moveRegion specifier type: expected: " + RegionSpecifierType.ENCODED_REGION_NAME
1667 + " actual: " + type);
1668 }
1669
1670 try {
1671 move(encodedRegionName, destServerName);
1672 } catch (HBaseIOException ioe) {
1673 throw new ServiceException(ioe);
1674 }
1675 return mrr;
1676 }
1677
1678 void move(final byte[] encodedRegionName,
1679 final byte[] destServerName) throws HBaseIOException {
1680 RegionState regionState = assignmentManager.getRegionStates().
1681 getRegionState(Bytes.toString(encodedRegionName));
1682 if (regionState == null) {
1683 throw new UnknownRegionException(Bytes.toStringBinary(encodedRegionName));
1684 }
1685
1686 HRegionInfo hri = regionState.getRegion();
1687 ServerName dest;
1688 if (destServerName == null || destServerName.length == 0) {
1689 LOG.info("Passed destination servername is null/empty so " +
1690 "choosing a server at random");
1691 final List<ServerName> destServers = this.serverManager.createDestinationServersList(
1692 regionState.getServerName());
1693 dest = balancer.randomAssignment(hri, destServers);
1694 } else {
1695 dest = ServerName.valueOf(Bytes.toString(destServerName));
1696 if (dest.equals(regionState.getServerName())) {
1697 LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
1698 + " because region already assigned to the same server " + dest + ".");
1699 return;
1700 }
1701 }
1702
1703
1704 RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), dest);
1705
1706 try {
1707 checkInitialized();
1708 if (this.cpHost != null) {
1709 if (this.cpHost.preMove(hri, rp.getSource(), rp.getDestination())) {
1710 return;
1711 }
1712 }
1713 LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
1714 this.assignmentManager.balance(rp);
1715 if (this.cpHost != null) {
1716 this.cpHost.postMove(hri, rp.getSource(), rp.getDestination());
1717 }
1718 } catch (IOException ioe) {
1719 if (ioe instanceof HBaseIOException) {
1720 throw (HBaseIOException)ioe;
1721 }
1722 throw new HBaseIOException(ioe);
1723 }
1724 }
1725
1726 @Override
1727 public void createTable(HTableDescriptor hTableDescriptor,
1728 byte [][] splitKeys)
1729 throws IOException {
1730 if (!isMasterRunning()) {
1731 throw new MasterNotRunningException();
1732 }
1733
1734 String namespace = hTableDescriptor.getTableName().getNamespaceAsString();
1735 getNamespaceDescriptor(namespace);
1736
1737 HRegionInfo[] newRegions = getHRegionInfos(hTableDescriptor, splitKeys);
1738 checkInitialized();
1739 checkCompression(hTableDescriptor);
1740 if (cpHost != null) {
1741 cpHost.preCreateTable(hTableDescriptor, newRegions);
1742 }
1743 LOG.info(getClientIdAuditPrefix() + " create " + hTableDescriptor);
1744 this.executorService.submit(new CreateTableHandler(this,
1745 this.fileSystemManager, hTableDescriptor, conf,
1746 newRegions, this).prepare());
1747 if (cpHost != null) {
1748 cpHost.postCreateTable(hTableDescriptor, newRegions);
1749 }
1750
1751 }
1752
1753 private void checkCompression(final HTableDescriptor htd)
1754 throws IOException {
1755 if (!this.masterCheckCompression) return;
1756 for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1757 checkCompression(hcd);
1758 }
1759 }
1760
1761 private void checkCompression(final HColumnDescriptor hcd)
1762 throws IOException {
1763 if (!this.masterCheckCompression) return;
1764 CompressionTest.testCompression(hcd.getCompression());
1765 CompressionTest.testCompression(hcd.getCompactionCompression());
1766 }
1767
1768 @Override
1769 public CreateTableResponse createTable(RpcController controller, CreateTableRequest req)
1770 throws ServiceException {
1771 HTableDescriptor hTableDescriptor = HTableDescriptor.convert(req.getTableSchema());
1772 byte [][] splitKeys = ProtobufUtil.getSplitKeysArray(req);
1773 try {
1774 createTable(hTableDescriptor,splitKeys);
1775 } catch (IOException ioe) {
1776 throw new ServiceException(ioe);
1777 }
1778 return CreateTableResponse.newBuilder().build();
1779 }
1780
1781 private HRegionInfo[] getHRegionInfos(HTableDescriptor hTableDescriptor,
1782 byte[][] splitKeys) {
1783 HRegionInfo[] hRegionInfos = null;
1784 if (splitKeys == null || splitKeys.length == 0) {
1785 hRegionInfos = new HRegionInfo[]{
1786 new HRegionInfo(hTableDescriptor.getTableName(), null, null)};
1787 } else {
1788 int numRegions = splitKeys.length + 1;
1789 hRegionInfos = new HRegionInfo[numRegions];
1790 byte[] startKey = null;
1791 byte[] endKey = null;
1792 for (int i = 0; i < numRegions; i++) {
1793 endKey = (i == splitKeys.length) ? null : splitKeys[i];
1794 hRegionInfos[i] =
1795 new HRegionInfo(hTableDescriptor.getTableName(), startKey, endKey);
1796 startKey = endKey;
1797 }
1798 }
1799 return hRegionInfos;
1800 }
1801
1802 private static boolean isCatalogTable(final TableName tableName) {
1803 return tableName.equals(TableName.META_TABLE_NAME);
1804 }
1805
1806 @Override
1807 public void deleteTable(final TableName tableName) throws IOException {
1808 checkInitialized();
1809 if (cpHost != null) {
1810 cpHost.preDeleteTable(tableName);
1811 }
1812 LOG.info(getClientIdAuditPrefix() + " delete " + tableName);
1813 this.executorService.submit(new DeleteTableHandler(tableName, this, this).prepare());
1814 if (cpHost != null) {
1815 cpHost.postDeleteTable(tableName);
1816 }
1817 }
1818
1819 @Override
1820 public DeleteTableResponse deleteTable(RpcController controller, DeleteTableRequest request)
1821 throws ServiceException {
1822 try {
1823 deleteTable(ProtobufUtil.toTableName(request.getTableName()));
1824 } catch (IOException ioe) {
1825 throw new ServiceException(ioe);
1826 }
1827 return DeleteTableResponse.newBuilder().build();
1828 }
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838 @Override
1839 public GetSchemaAlterStatusResponse getSchemaAlterStatus(
1840 RpcController controller, GetSchemaAlterStatusRequest req) throws ServiceException {
1841
1842
1843
1844
1845 TableName tableName = ProtobufUtil.toTableName(req.getTableName());
1846
1847 try {
1848 Pair<Integer,Integer> pair = this.assignmentManager.getReopenStatus(tableName);
1849 GetSchemaAlterStatusResponse.Builder ret = GetSchemaAlterStatusResponse.newBuilder();
1850 ret.setYetToUpdateRegions(pair.getFirst());
1851 ret.setTotalRegions(pair.getSecond());
1852 return ret.build();
1853 } catch (IOException ioe) {
1854 throw new ServiceException(ioe);
1855 }
1856 }
1857
1858 @Override
1859 public void addColumn(final TableName tableName, final HColumnDescriptor column)
1860 throws IOException {
1861 checkInitialized();
1862 if (cpHost != null) {
1863 if (cpHost.preAddColumn(tableName, column)) {
1864 return;
1865 }
1866 }
1867
1868 new TableAddFamilyHandler(tableName, column, this, this).prepare().process();
1869 if (cpHost != null) {
1870 cpHost.postAddColumn(tableName, column);
1871 }
1872 }
1873
1874 @Override
1875 public AddColumnResponse addColumn(RpcController controller, AddColumnRequest req)
1876 throws ServiceException {
1877 try {
1878 addColumn(ProtobufUtil.toTableName(req.getTableName()),
1879 HColumnDescriptor.convert(req.getColumnFamilies()));
1880 } catch (IOException ioe) {
1881 throw new ServiceException(ioe);
1882 }
1883 return AddColumnResponse.newBuilder().build();
1884 }
1885
1886 @Override
1887 public void modifyColumn(TableName tableName, HColumnDescriptor descriptor)
1888 throws IOException {
1889 checkInitialized();
1890 checkCompression(descriptor);
1891 if (cpHost != null) {
1892 if (cpHost.preModifyColumn(tableName, descriptor)) {
1893 return;
1894 }
1895 }
1896 LOG.info(getClientIdAuditPrefix() + " modify " + descriptor);
1897 new TableModifyFamilyHandler(tableName, descriptor, this, this)
1898 .prepare().process();
1899 if (cpHost != null) {
1900 cpHost.postModifyColumn(tableName, descriptor);
1901 }
1902 }
1903
1904 @Override
1905 public ModifyColumnResponse modifyColumn(RpcController controller, ModifyColumnRequest req)
1906 throws ServiceException {
1907 try {
1908 modifyColumn(ProtobufUtil.toTableName(req.getTableName()),
1909 HColumnDescriptor.convert(req.getColumnFamilies()));
1910 } catch (IOException ioe) {
1911 throw new ServiceException(ioe);
1912 }
1913 return ModifyColumnResponse.newBuilder().build();
1914 }
1915
1916 @Override
1917 public void deleteColumn(final TableName tableName, final byte[] columnName)
1918 throws IOException {
1919 checkInitialized();
1920 if (cpHost != null) {
1921 if (cpHost.preDeleteColumn(tableName, columnName)) {
1922 return;
1923 }
1924 }
1925 LOG.info(getClientIdAuditPrefix() + " delete " + Bytes.toString(columnName));
1926 new TableDeleteFamilyHandler(tableName, columnName, this, this).prepare().process();
1927 if (cpHost != null) {
1928 cpHost.postDeleteColumn(tableName, columnName);
1929 }
1930 }
1931
1932 @Override
1933 public DeleteColumnResponse deleteColumn(RpcController controller, DeleteColumnRequest req)
1934 throws ServiceException {
1935 try {
1936 deleteColumn(ProtobufUtil.toTableName(req.getTableName()),
1937 req.getColumnName().toByteArray());
1938 } catch (IOException ioe) {
1939 throw new ServiceException(ioe);
1940 }
1941 return DeleteColumnResponse.newBuilder().build();
1942 }
1943
1944 @Override
1945 public void enableTable(final TableName tableName) throws IOException {
1946 checkInitialized();
1947 if (cpHost != null) {
1948 cpHost.preEnableTable(tableName);
1949 }
1950 LOG.info(getClientIdAuditPrefix() + " enable " + tableName);
1951 this.executorService.submit(new EnableTableHandler(this, tableName,
1952 catalogTracker, assignmentManager, tableLockManager, false).prepare());
1953 if (cpHost != null) {
1954 cpHost.postEnableTable(tableName);
1955 }
1956 }
1957
1958 @Override
1959 public EnableTableResponse enableTable(RpcController controller, EnableTableRequest request)
1960 throws ServiceException {
1961 try {
1962 enableTable(ProtobufUtil.toTableName(request.getTableName()));
1963 } catch (IOException ioe) {
1964 throw new ServiceException(ioe);
1965 }
1966 return EnableTableResponse.newBuilder().build();
1967 }
1968
1969 @Override
1970 public void disableTable(final TableName tableName) throws IOException {
1971 checkInitialized();
1972 if (cpHost != null) {
1973 cpHost.preDisableTable(tableName);
1974 }
1975 LOG.info(getClientIdAuditPrefix() + " disable " + tableName);
1976 this.executorService.submit(new DisableTableHandler(this, tableName,
1977 catalogTracker, assignmentManager, tableLockManager, false).prepare());
1978 if (cpHost != null) {
1979 cpHost.postDisableTable(tableName);
1980 }
1981 }
1982
1983 @Override
1984 public DisableTableResponse disableTable(RpcController controller, DisableTableRequest request)
1985 throws ServiceException {
1986 try {
1987 disableTable(ProtobufUtil.toTableName(request.getTableName()));
1988 } catch (IOException ioe) {
1989 throw new ServiceException(ioe);
1990 }
1991 return DisableTableResponse.newBuilder().build();
1992 }
1993
1994
1995
1996
1997
1998
1999
2000 Pair<HRegionInfo, ServerName> getTableRegionForRow(
2001 final TableName tableName, final byte [] rowKey)
2002 throws IOException {
2003 final AtomicReference<Pair<HRegionInfo, ServerName>> result =
2004 new AtomicReference<Pair<HRegionInfo, ServerName>>(null);
2005
2006 MetaScannerVisitor visitor =
2007 new MetaScannerVisitorBase() {
2008 @Override
2009 public boolean processRow(Result data) throws IOException {
2010 if (data == null || data.size() <= 0) {
2011 return true;
2012 }
2013 Pair<HRegionInfo, ServerName> pair = HRegionInfo.getHRegionInfoAndServerName(data);
2014 if (pair == null) {
2015 return false;
2016 }
2017 if (!pair.getFirst().getTable().equals(tableName)) {
2018 return false;
2019 }
2020 result.set(pair);
2021 return true;
2022 }
2023 };
2024
2025 MetaScanner.metaScan(conf, visitor, tableName, rowKey, 1);
2026 return result.get();
2027 }
2028
2029 @Override
2030 public void modifyTable(final TableName tableName, final HTableDescriptor descriptor)
2031 throws IOException {
2032 checkInitialized();
2033 checkCompression(descriptor);
2034 if (cpHost != null) {
2035 cpHost.preModifyTable(tableName, descriptor);
2036 }
2037 LOG.info(getClientIdAuditPrefix() + " modify " + tableName);
2038 new ModifyTableHandler(tableName, descriptor, this, this).prepare().process();
2039 if (cpHost != null) {
2040 cpHost.postModifyTable(tableName, descriptor);
2041 }
2042 }
2043
2044 @Override
2045 public ModifyTableResponse modifyTable(RpcController controller, ModifyTableRequest req)
2046 throws ServiceException {
2047 try {
2048 modifyTable(ProtobufUtil.toTableName(req.getTableName()),
2049 HTableDescriptor.convert(req.getTableSchema()));
2050 } catch (IOException ioe) {
2051 throw new ServiceException(ioe);
2052 }
2053 return ModifyTableResponse.newBuilder().build();
2054 }
2055
2056 @Override
2057 public void checkTableModifiable(final TableName tableName)
2058 throws IOException, TableNotFoundException, TableNotDisabledException {
2059 if (isCatalogTable(tableName)) {
2060 throw new IOException("Can't modify catalog tables");
2061 }
2062 if (!MetaReader.tableExists(getCatalogTracker(), tableName)) {
2063 throw new TableNotFoundException(tableName);
2064 }
2065 if (!getAssignmentManager().getZKTable().
2066 isDisabledTable(tableName)) {
2067 throw new TableNotDisabledException(tableName);
2068 }
2069 }
2070
2071 @Override
2072 public GetClusterStatusResponse getClusterStatus(RpcController controller,
2073 GetClusterStatusRequest req)
2074 throws ServiceException {
2075 GetClusterStatusResponse.Builder response = GetClusterStatusResponse.newBuilder();
2076 response.setClusterStatus(getClusterStatus().convert());
2077 return response.build();
2078 }
2079
2080
2081
2082
2083 public ClusterStatus getClusterStatus() {
2084
2085 List<String> backupMasterStrings;
2086 try {
2087 backupMasterStrings = ZKUtil.listChildrenNoWatch(this.zooKeeper,
2088 this.zooKeeper.backupMasterAddressesZNode);
2089 } catch (KeeperException e) {
2090 LOG.warn(this.zooKeeper.prefix("Unable to list backup servers"), e);
2091 backupMasterStrings = new ArrayList<String>(0);
2092 }
2093 List<ServerName> backupMasters = new ArrayList<ServerName>(
2094 backupMasterStrings.size());
2095 for (String s: backupMasterStrings) {
2096 try {
2097 byte [] bytes =
2098 ZKUtil.getData(this.zooKeeper, ZKUtil.joinZNode(
2099 this.zooKeeper.backupMasterAddressesZNode, s));
2100 if (bytes != null) {
2101 ServerName sn;
2102 try {
2103 sn = ServerName.parseFrom(bytes);
2104 } catch (DeserializationException e) {
2105 LOG.warn("Failed parse, skipping registering backup server", e);
2106 continue;
2107 }
2108 backupMasters.add(sn);
2109 }
2110 } catch (KeeperException e) {
2111 LOG.warn(this.zooKeeper.prefix("Unable to get information about " +
2112 "backup servers"), e);
2113 }
2114 }
2115 Collections.sort(backupMasters, new Comparator<ServerName>() {
2116 @Override
2117 public int compare(ServerName s1, ServerName s2) {
2118 return s1.getServerName().compareTo(s2.getServerName());
2119 }});
2120
2121 return new ClusterStatus(VersionInfo.getVersion(),
2122 this.fileSystemManager.getClusterId().toString(),
2123 this.serverManager.getOnlineServers(),
2124 this.serverManager.getDeadServers().copyServerNames(),
2125 this.serverName,
2126 backupMasters,
2127 this.assignmentManager.getRegionStates().getRegionsInTransition(),
2128 this.getCoprocessors(), this.loadBalancerTracker.isBalancerOn());
2129 }
2130
2131 public String getClusterId() {
2132 if (fileSystemManager == null) {
2133 return "";
2134 }
2135 ClusterId id = fileSystemManager.getClusterId();
2136 if (id == null) {
2137 return "";
2138 }
2139 return id.toString();
2140 }
2141
2142
2143
2144
2145
2146
2147
2148
2149 public static String getLoadedCoprocessors() {
2150 return CoprocessorHost.getLoadedCoprocessors().toString();
2151 }
2152
2153
2154
2155
2156 public long getMasterStartTime() {
2157 return masterStartTime;
2158 }
2159
2160
2161
2162
2163 public long getMasterActiveTime() {
2164 return masterActiveTime;
2165 }
2166
2167 public int getRegionServerInfoPort(final ServerName sn) {
2168 RegionServerInfo info = this.regionServerTracker.getRegionServerInfo(sn);
2169 if (info == null || info.getInfoPort() == 0) {
2170 return conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
2171 HConstants.DEFAULT_REGIONSERVER_INFOPORT);
2172 }
2173 return info.getInfoPort();
2174 }
2175
2176
2177
2178
2179 public String[] getCoprocessors() {
2180 Set<String> masterCoprocessors =
2181 getCoprocessorHost().getCoprocessors();
2182 return masterCoprocessors.toArray(new String[masterCoprocessors.size()]);
2183 }
2184
2185 @Override
2186 public void abort(final String msg, final Throwable t) {
2187 if (cpHost != null) {
2188
2189 LOG.fatal("Master server abort: loaded coprocessors are: " +
2190 getLoadedCoprocessors());
2191 }
2192
2193 if (abortNow(msg, t)) {
2194 if (t != null) LOG.fatal(msg, t);
2195 else LOG.fatal(msg);
2196 this.abort = true;
2197 stop("Aborting");
2198 }
2199 }
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
/**
 * Attempts to recover this master after its ZooKeeper session has expired.
 * <p>
 * All current listeners are unregistered, the listeners saved in
 * {@code registeredZKListenersBeforeRecovery} (if any) are registered again, and the
 * watcher is asked to reconnect. The master then tries, on a separate single-thread
 * executor, to become the active master again and to re-run initialization. The wait
 * for that task is bounded by {@code hbase.master.zksession.recover.timeout}
 * (milliseconds, default 300000).
 *
 * @return the task's result when it finishes within the timeout; false when it does
 *         not finish in time
 * @throws InterruptedException if interrupted while waiting for the recovery task
 * @throws IOException declared for the operations performed here
 * @throws KeeperException declared for the ZooKeeper operations performed here
 * @throws ExecutionException if the recovery task itself threw
 */
private boolean tryRecoveringExpiredZKSession() throws InterruptedException,
    IOException, KeeperException, ExecutionException {

  // Drop every listener currently registered on the watcher.
  this.zooKeeper.unregisterAllListeners();

  // Put back the listeners that were saved for recovery, if there are any.
  if (this.registeredZKListenersBeforeRecovery != null) {
    for (ZooKeeperListener curListener : this.registeredZKListenersBeforeRecovery) {
      this.zooKeeper.registerListener(curListener);
    }
  }

  this.zooKeeper.reconnectAfterExpiration();

  Callable<Boolean> callable = new Callable<Boolean> () {
    @Override
    public Boolean call() throws InterruptedException,
        IOException, KeeperException {
      MonitoredTask status =
        TaskMonitor.get().createStatus("Recovering expired ZK session");
      try {
        // Give up right away if this master could not become active again.
        if (!becomeActiveMaster(status)) {
          return Boolean.FALSE;
        }
        // Reset the initialization flags before re-running initialization.
        serverShutdownHandlerEnabled = false;
        initialized = false;
        finishInitialization(status, true);
        // Recovery counts as successful only if the master was not stopped meanwhile.
        return !stopped;
      } finally {
        status.cleanup();
      }
    }
  };

  long timeout =
    conf.getLong("hbase.master.zksession.recover.timeout", 300000);
  java.util.concurrent.ExecutorService executor =
    Executors.newSingleThreadExecutor();
  Future<Boolean> result = executor.submit(callable);
  // No further tasks are accepted; the submitted one keeps running.
  executor.shutdown();
  if (executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)
      && result.isDone()) {
    Boolean recovered = result.get();
    if (recovered != null) {
      return recovered.booleanValue();
    }
  }
  // Timed out (or no result): try to cancel the recovery task and report failure.
  executor.shutdownNow();
  return false;
}
2268
2269
2270
2271
2272
2273
2274
2275
2276
2277 private boolean abortNow(final String msg, final Throwable t) {
2278 if (!this.isActiveMaster || this.stopped) {
2279 return true;
2280 }
2281
2282 boolean failFast = conf.getBoolean("fail.fast.expired.active.master", false);
2283 if (t != null && t instanceof KeeperException.SessionExpiredException
2284 && !failFast) {
2285 try {
2286 LOG.info("Primary Master trying to recover from ZooKeeper session " +
2287 "expiry.");
2288 return !tryRecoveringExpiredZKSession();
2289 } catch (Throwable newT) {
2290 LOG.error("Primary master encountered unexpected exception while " +
2291 "trying to recover from ZooKeeper session" +
2292 " expiry. Proceeding with server abort.", newT);
2293 }
2294 }
2295 return true;
2296 }
2297
2298 @Override
2299 public ZooKeeperWatcher getZooKeeper() {
2300 return zooKeeper;
2301 }
2302
2303 @Override
2304 public MasterCoprocessorHost getCoprocessorHost() {
2305 return cpHost;
2306 }
2307
2308 @Override
2309 public ServerName getServerName() {
2310 return this.serverName;
2311 }
2312
2313 @Override
2314 public CatalogTracker getCatalogTracker() {
2315 return catalogTracker;
2316 }
2317
2318 @Override
2319 public AssignmentManager getAssignmentManager() {
2320 return this.assignmentManager;
2321 }
2322
2323 @Override
2324 public TableLockManager getTableLockManager() {
2325 return this.tableLockManager;
2326 }
2327
2328 public MemoryBoundedLogMessageBuffer getRegionServerFatalLogBuffer() {
2329 return rsFatals;
2330 }
2331
2332 public void shutdown() {
2333 if (spanReceiverHost != null) {
2334 spanReceiverHost.closeReceivers();
2335 }
2336 if (cpHost != null) {
2337 try {
2338 cpHost.preShutdown();
2339 } catch (IOException ioe) {
2340 LOG.error("Error call master coprocessor preShutdown()", ioe);
2341 }
2342 }
2343 if (mxBean != null) {
2344 MBeanUtil.unregisterMBean(mxBean);
2345 mxBean = null;
2346 }
2347 if (this.assignmentManager != null) this.assignmentManager.shutdown();
2348 if (this.serverManager != null) this.serverManager.shutdownCluster();
2349 try {
2350 if (this.clusterStatusTracker != null){
2351 this.clusterStatusTracker.setClusterDown();
2352 }
2353 } catch (KeeperException e) {
2354 LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e);
2355 }
2356 }
2357
2358 @Override
2359 public ShutdownResponse shutdown(RpcController controller, ShutdownRequest request)
2360 throws ServiceException {
2361 LOG.info(getClientIdAuditPrefix() + " shutdown");
2362 shutdown();
2363 return ShutdownResponse.newBuilder().build();
2364 }
2365
2366 public void stopMaster() {
2367 if (cpHost != null) {
2368 try {
2369 cpHost.preStopMaster();
2370 } catch (IOException ioe) {
2371 LOG.error("Error call master coprocessor preStopMaster()", ioe);
2372 }
2373 }
2374 stop("Stopped by " + Thread.currentThread().getName());
2375 }
2376
2377 @Override
2378 public StopMasterResponse stopMaster(RpcController controller, StopMasterRequest request)
2379 throws ServiceException {
2380 LOG.info(getClientIdAuditPrefix() + " stop");
2381 stopMaster();
2382 return StopMasterResponse.newBuilder().build();
2383 }
2384
2385 @Override
2386 public void stop(final String why) {
2387 LOG.info(why);
2388 this.stopped = true;
2389
2390 stopSleeper.skipSleepCycle();
2391
2392 if (this.activeMasterManager != null) {
2393 synchronized (this.activeMasterManager.clusterHasActiveMaster) {
2394 this.activeMasterManager.clusterHasActiveMaster.notifyAll();
2395 }
2396 }
2397
2398
2399 if (this.catalogTracker != null && this.serverManager.getOnlineServers().isEmpty()) {
2400 this.catalogTracker.stop();
2401 }
2402 }
2403
2404 @Override
2405 public boolean isStopped() {
2406 return this.stopped;
2407 }
2408
2409 @Override
2410 public boolean isAborted() {
2411 return this.abort;
2412 }
2413
2414 void checkInitialized() throws PleaseHoldException {
2415 if (!this.initialized) {
2416 throw new PleaseHoldException("Master is initializing");
2417 }
2418 }
2419
2420
2421
2422
2423
2424
2425
2426
2427
2428 public boolean isActiveMaster() {
2429 return isActiveMaster;
2430 }
2431
2432
2433
2434
2435
2436
2437
2438
2439
2440
2441 @Override
2442 public boolean isInitialized() {
2443 return initialized;
2444 }
2445
2446
2447
2448
2449
2450
2451 @Override
2452 public boolean isServerShutdownHandlerEnabled() {
2453 return this.serverShutdownHandlerEnabled;
2454 }
2455
2456
2457
2458
2459
2460 public boolean isInitializationStartsMetaRegionAssignment() {
2461 return this.initializationBeforeMetaAssignment;
2462 }
2463
2464 @Override
2465 public AssignRegionResponse assignRegion(RpcController controller, AssignRegionRequest req)
2466 throws ServiceException {
2467 try {
2468 final byte [] regionName = req.getRegion().getValue().toByteArray();
2469 RegionSpecifierType type = req.getRegion().getType();
2470 AssignRegionResponse arr = AssignRegionResponse.newBuilder().build();
2471
2472 checkInitialized();
2473 if (type != RegionSpecifierType.REGION_NAME) {
2474 LOG.warn("assignRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME
2475 + " actual: " + type);
2476 }
2477 HRegionInfo regionInfo = assignmentManager.getRegionStates().getRegionInfo(regionName);
2478 if (regionInfo == null) throw new UnknownRegionException(Bytes.toString(regionName));
2479 if (cpHost != null) {
2480 if (cpHost.preAssign(regionInfo)) {
2481 return arr;
2482 }
2483 }
2484 LOG.info(getClientIdAuditPrefix() + " assign " + regionInfo.getRegionNameAsString());
2485 assignmentManager.assign(regionInfo, true, true);
2486 if (cpHost != null) {
2487 cpHost.postAssign(regionInfo);
2488 }
2489
2490 return arr;
2491 } catch (IOException ioe) {
2492 throw new ServiceException(ioe);
2493 }
2494 }
2495
2496 public void assignRegion(HRegionInfo hri) {
2497 assignmentManager.assign(hri, true);
2498 }
2499
2500 @Override
2501 public UnassignRegionResponse unassignRegion(RpcController controller, UnassignRegionRequest req)
2502 throws ServiceException {
2503 try {
2504 final byte [] regionName = req.getRegion().getValue().toByteArray();
2505 RegionSpecifierType type = req.getRegion().getType();
2506 final boolean force = req.getForce();
2507 UnassignRegionResponse urr = UnassignRegionResponse.newBuilder().build();
2508
2509 checkInitialized();
2510 if (type != RegionSpecifierType.REGION_NAME) {
2511 LOG.warn("unassignRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME
2512 + " actual: " + type);
2513 }
2514 Pair<HRegionInfo, ServerName> pair =
2515 MetaReader.getRegion(this.catalogTracker, regionName);
2516 if (pair == null) throw new UnknownRegionException(Bytes.toString(regionName));
2517 HRegionInfo hri = pair.getFirst();
2518 if (cpHost != null) {
2519 if (cpHost.preUnassign(hri, force)) {
2520 return urr;
2521 }
2522 }
2523 LOG.debug(getClientIdAuditPrefix() + " unassign " + hri.getRegionNameAsString()
2524 + " in current location if it is online and reassign.force=" + force);
2525 this.assignmentManager.unassign(hri, force);
2526 if (this.assignmentManager.getRegionStates().isRegionOffline(hri)) {
2527 LOG.debug("Region " + hri.getRegionNameAsString()
2528 + " is not online on any region server, reassigning it.");
2529 assignRegion(hri);
2530 }
2531 if (cpHost != null) {
2532 cpHost.postUnassign(hri, force);
2533 }
2534
2535 return urr;
2536 } catch (IOException ioe) {
2537 throw new ServiceException(ioe);
2538 }
2539 }
2540
2541
2542
2543
2544
2545
2546
2547
2548
2549 @Override
2550 public GetTableDescriptorsResponse getTableDescriptors(
2551 RpcController controller, GetTableDescriptorsRequest req) throws ServiceException {
2552 List<HTableDescriptor> descriptors = new ArrayList<HTableDescriptor>();
2553 List<TableName> tableNameList = new ArrayList<TableName>();
2554 for(HBaseProtos.TableName tableNamePB: req.getTableNamesList()) {
2555 tableNameList.add(ProtobufUtil.toTableName(tableNamePB));
2556 }
2557 boolean bypass = false;
2558 if (this.cpHost != null) {
2559 try {
2560 bypass = this.cpHost.preGetTableDescriptors(tableNameList, descriptors);
2561 } catch (IOException ioe) {
2562 throw new ServiceException(ioe);
2563 }
2564 }
2565
2566 if (!bypass) {
2567 if (req.getTableNamesCount() == 0) {
2568
2569 Map<String, HTableDescriptor> descriptorMap = null;
2570 try {
2571 descriptorMap = this.tableDescriptors.getAll();
2572 } catch (IOException e) {
2573 LOG.warn("Failed getting all descriptors", e);
2574 }
2575 if (descriptorMap != null) {
2576 for(HTableDescriptor desc: descriptorMap.values()) {
2577 if(!desc.getTableName().isSystemTable()) {
2578 descriptors.add(desc);
2579 }
2580 }
2581 }
2582 } else {
2583 for (TableName s: tableNameList) {
2584 try {
2585 HTableDescriptor desc = this.tableDescriptors.get(s);
2586 if (desc != null) {
2587 descriptors.add(desc);
2588 }
2589 } catch (IOException e) {
2590 LOG.warn("Failed getting descriptor for " + s, e);
2591 }
2592 }
2593 }
2594
2595 if (this.cpHost != null) {
2596 try {
2597 this.cpHost.postGetTableDescriptors(descriptors);
2598 } catch (IOException ioe) {
2599 throw new ServiceException(ioe);
2600 }
2601 }
2602 }
2603
2604 GetTableDescriptorsResponse.Builder builder = GetTableDescriptorsResponse.newBuilder();
2605 for (HTableDescriptor htd: descriptors) {
2606 builder.addTableSchema(htd.convert());
2607 }
2608 return builder.build();
2609 }
2610
2611
2612
2613
2614
2615
2616
2617
2618 @Override
2619 public GetTableNamesResponse getTableNames(
2620 RpcController controller, GetTableNamesRequest req) throws ServiceException {
2621 try {
2622 Collection<HTableDescriptor> descriptors = this.tableDescriptors.getAll().values();
2623 GetTableNamesResponse.Builder builder = GetTableNamesResponse.newBuilder();
2624 for (HTableDescriptor descriptor: descriptors) {
2625 if (descriptor.getTableName().isSystemTable()) {
2626 continue;
2627 }
2628 builder.addTableNames(ProtobufUtil.toProtoTableName(descriptor.getTableName()));
2629 }
2630 return builder.build();
2631 } catch (IOException e) {
2632 throw new ServiceException(e);
2633 }
2634 }
2635
2636
2637
2638
2639
2640
2641
2642 public double getAverageLoad() {
2643 if (this.assignmentManager == null) {
2644 return 0;
2645 }
2646
2647 RegionStates regionStates = this.assignmentManager.getRegionStates();
2648 if (regionStates == null) {
2649 return 0;
2650 }
2651 return regionStates.getAverageLoad();
2652 }
2653
2654
2655
2656
2657
2658
2659
2660
2661 @Override
2662 public OfflineRegionResponse offlineRegion(RpcController controller, OfflineRegionRequest request)
2663 throws ServiceException {
2664 final byte [] regionName = request.getRegion().getValue().toByteArray();
2665 RegionSpecifierType type = request.getRegion().getType();
2666 if (type != RegionSpecifierType.REGION_NAME) {
2667 LOG.warn("moveRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME
2668 + " actual: " + type);
2669 }
2670
2671 try {
2672 Pair<HRegionInfo, ServerName> pair =
2673 MetaReader.getRegion(this.catalogTracker, regionName);
2674 if (pair == null) throw new UnknownRegionException(Bytes.toStringBinary(regionName));
2675 HRegionInfo hri = pair.getFirst();
2676 if (cpHost != null) {
2677 cpHost.preRegionOffline(hri);
2678 }
2679 LOG.info(getClientIdAuditPrefix() + " offline " + hri.getRegionNameAsString());
2680 this.assignmentManager.regionOffline(hri);
2681 if (cpHost != null) {
2682 cpHost.postRegionOffline(hri);
2683 }
2684 } catch (IOException ioe) {
2685 throw new ServiceException(ioe);
2686 }
2687 return OfflineRegionResponse.newBuilder().build();
2688 }
2689
2690 @Override
2691 public boolean registerService(Service instance) {
2692
2693
2694
2695 Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
2696 if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
2697 LOG.error("Coprocessor service "+serviceDesc.getFullName()+
2698 " already registered, rejecting request from "+instance
2699 );
2700 return false;
2701 }
2702
2703 coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
2704 if (LOG.isDebugEnabled()) {
2705 LOG.debug("Registered master coprocessor service: service="+serviceDesc.getFullName());
2706 }
2707 return true;
2708 }
2709
2710 @Override
2711 public ClientProtos.CoprocessorServiceResponse execMasterService(final RpcController controller,
2712 final ClientProtos.CoprocessorServiceRequest request) throws ServiceException {
2713 try {
2714 ServerRpcController execController = new ServerRpcController();
2715
2716 ClientProtos.CoprocessorServiceCall call = request.getCall();
2717 String serviceName = call.getServiceName();
2718 String methodName = call.getMethodName();
2719 if (!coprocessorServiceHandlers.containsKey(serviceName)) {
2720 throw new UnknownProtocolException(null,
2721 "No registered master coprocessor service found for name "+serviceName);
2722 }
2723
2724 Service service = coprocessorServiceHandlers.get(serviceName);
2725 Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
2726 Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
2727 if (methodDesc == null) {
2728 throw new UnknownProtocolException(service.getClass(),
2729 "Unknown method "+methodName+" called on master service "+serviceName);
2730 }
2731
2732
2733 Message execRequest = service.getRequestPrototype(methodDesc).newBuilderForType()
2734 .mergeFrom(call.getRequest()).build();
2735 final Message.Builder responseBuilder =
2736 service.getResponsePrototype(methodDesc).newBuilderForType();
2737 service.callMethod(methodDesc, execController, execRequest, new RpcCallback<Message>() {
2738 @Override
2739 public void run(Message message) {
2740 if (message != null) {
2741 responseBuilder.mergeFrom(message);
2742 }
2743 }
2744 });
2745 Message execResult = responseBuilder.build();
2746
2747 if (execController.getFailedOn() != null) {
2748 throw execController.getFailedOn();
2749 }
2750 ClientProtos.CoprocessorServiceResponse.Builder builder =
2751 ClientProtos.CoprocessorServiceResponse.newBuilder();
2752 builder.setRegion(RequestConverter.buildRegionSpecifier(
2753 RegionSpecifierType.REGION_NAME, HConstants.EMPTY_BYTE_ARRAY));
2754 builder.setValue(
2755 builder.getValueBuilder().setName(execResult.getClass().getName())
2756 .setValue(execResult.toByteString()));
2757 return builder.build();
2758 } catch (IOException ie) {
2759 throw new ServiceException(ie);
2760 }
2761 }
2762
2763
2764
2765
2766
2767
2768
2769 public static HMaster constructMaster(Class<? extends HMaster> masterClass,
2770 final Configuration conf) {
2771 try {
2772 Constructor<? extends HMaster> c =
2773 masterClass.getConstructor(Configuration.class);
2774 return c.newInstance(conf);
2775 } catch (InvocationTargetException ite) {
2776 Throwable target = ite.getTargetException() != null?
2777 ite.getTargetException(): ite;
2778 if (target.getCause() != null) target = target.getCause();
2779 throw new RuntimeException("Failed construction of Master: " +
2780 masterClass.toString(), target);
2781 } catch (Exception e) {
2782 throw new RuntimeException("Failed construction of Master: " +
2783 masterClass.toString() + ((e.getCause() != null)?
2784 e.getCause().getMessage(): ""), e);
2785 }
2786 }
2787
2788
2789
2790
2791 public static void main(String [] args) {
2792 VersionInfo.logVersion();
2793 new HMasterCommandLine(HMaster.class).doMain(args);
2794 }
2795
2796 public HFileCleaner getHFileCleaner() {
2797 return this.hfileCleaner;
2798 }
2799
2800
2801
2802
2803
2804 public SnapshotManager getSnapshotManagerForTesting() {
2805 return this.snapshotManager;
2806 }
2807
2808
2809
2810
2811
2812 @Override
2813 public SnapshotResponse snapshot(RpcController controller, SnapshotRequest request)
2814 throws ServiceException {
2815 try {
2816 this.snapshotManager.checkSnapshotSupport();
2817 } catch (UnsupportedOperationException e) {
2818 throw new ServiceException(e);
2819 }
2820
2821 LOG.info(getClientIdAuditPrefix() + " snapshot request for:" +
2822 ClientSnapshotDescriptionUtils.toString(request.getSnapshot()));
2823
2824 SnapshotDescription snapshot = SnapshotDescriptionUtils.validate(request.getSnapshot(),
2825 this.conf);
2826 try {
2827 snapshotManager.takeSnapshot(snapshot);
2828 } catch (IOException e) {
2829 throw new ServiceException(e);
2830 }
2831
2832
2833 long waitTime = SnapshotDescriptionUtils.getMaxMasterTimeout(conf, snapshot.getType(),
2834 SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME);
2835 return SnapshotResponse.newBuilder().setExpectedTimeout(waitTime).build();
2836 }
2837
2838
2839
2840
2841 @Override
2842 public GetCompletedSnapshotsResponse getCompletedSnapshots(RpcController controller,
2843 GetCompletedSnapshotsRequest request) throws ServiceException {
2844 try {
2845 GetCompletedSnapshotsResponse.Builder builder = GetCompletedSnapshotsResponse.newBuilder();
2846 List<SnapshotDescription> snapshots = snapshotManager.getCompletedSnapshots();
2847
2848
2849 for (SnapshotDescription snapshot : snapshots) {
2850 builder.addSnapshots(snapshot);
2851 }
2852 return builder.build();
2853 } catch (IOException e) {
2854 throw new ServiceException(e);
2855 }
2856 }
2857
2858
2859
2860
2861
2862
2863
2864
2865 @Override
2866 public DeleteSnapshotResponse deleteSnapshot(RpcController controller,
2867 DeleteSnapshotRequest request) throws ServiceException {
2868 try {
2869 this.snapshotManager.checkSnapshotSupport();
2870 } catch (UnsupportedOperationException e) {
2871 throw new ServiceException(e);
2872 }
2873
2874 try {
2875 LOG.info(getClientIdAuditPrefix() + " delete " + request.getSnapshot());
2876 snapshotManager.deleteSnapshot(request.getSnapshot());
2877 return DeleteSnapshotResponse.newBuilder().build();
2878 } catch (IOException e) {
2879 throw new ServiceException(e);
2880 }
2881 }
2882
2883
2884
2885
2886
2887
2888
2889
2890 @Override
2891 public IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
2892 IsSnapshotDoneRequest request) throws ServiceException {
2893 LOG.debug("Checking to see if snapshot from request:" +
2894 ClientSnapshotDescriptionUtils.toString(request.getSnapshot()) + " is done");
2895 try {
2896 IsSnapshotDoneResponse.Builder builder = IsSnapshotDoneResponse.newBuilder();
2897 boolean done = snapshotManager.isSnapshotDone(request.getSnapshot());
2898 builder.setDone(done);
2899 return builder.build();
2900 } catch (IOException e) {
2901 throw new ServiceException(e);
2902 }
2903 }
2904
2905
2906
2907
2908
2909
2910
2911
2912
2913
2914
2915
2916
2917
2918 @Override
2919 public RestoreSnapshotResponse restoreSnapshot(RpcController controller,
2920 RestoreSnapshotRequest request) throws ServiceException {
2921 try {
2922 this.snapshotManager.checkSnapshotSupport();
2923 } catch (UnsupportedOperationException e) {
2924 throw new ServiceException(e);
2925 }
2926
2927
2928 try {
2929 TableName dstTable = TableName.valueOf(request.getSnapshot().getTable());
2930 getNamespaceDescriptor(dstTable.getNamespaceAsString());
2931 } catch (IOException ioe) {
2932 throw new ServiceException(ioe);
2933 }
2934
2935 try {
2936 SnapshotDescription reqSnapshot = request.getSnapshot();
2937 snapshotManager.restoreSnapshot(reqSnapshot);
2938 return RestoreSnapshotResponse.newBuilder().build();
2939 } catch (IOException e) {
2940 throw new ServiceException(e);
2941 }
2942 }
2943
2944
2945
2946
2947
2948
2949
2950
2951
2952
2953
2954 @Override
2955 public IsRestoreSnapshotDoneResponse isRestoreSnapshotDone(RpcController controller,
2956 IsRestoreSnapshotDoneRequest request) throws ServiceException {
2957 try {
2958 SnapshotDescription snapshot = request.getSnapshot();
2959 IsRestoreSnapshotDoneResponse.Builder builder = IsRestoreSnapshotDoneResponse.newBuilder();
2960 boolean done = snapshotManager.isRestoreDone(snapshot);
2961 builder.setDone(done);
2962 return builder.build();
2963 } catch (IOException e) {
2964 throw new ServiceException(e);
2965 }
2966 }
2967
2968
2969
2970
2971
2972 @Override
2973 public ExecProcedureResponse execProcedure(RpcController controller,
2974 ExecProcedureRequest request) throws ServiceException {
2975 ProcedureDescription desc = request.getProcedure();
2976 MasterProcedureManager mpm = this.mpmHost.getProcedureManager(desc
2977 .getSignature());
2978 if (mpm == null) {
2979 throw new ServiceException("The procedure is not registered: "
2980 + desc.getSignature());
2981 }
2982
2983 LOG.info(getClientIdAuditPrefix() + " procedure request for: "
2984 + desc.getSignature());
2985
2986 try {
2987 mpm.execProcedure(desc);
2988 } catch (IOException e) {
2989 throw new ServiceException(e);
2990 }
2991
2992
2993
2994 long waitTime = SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME;
2995 return ExecProcedureResponse.newBuilder().setExpectedTimeout(waitTime)
2996 .build();
2997 }
2998
2999
3000
3001
3002
3003
3004
3005
3006 @Override
3007 public IsProcedureDoneResponse isProcedureDone(RpcController controller,
3008 IsProcedureDoneRequest request) throws ServiceException {
3009 ProcedureDescription desc = request.getProcedure();
3010 MasterProcedureManager mpm = this.mpmHost.getProcedureManager(desc
3011 .getSignature());
3012 if (mpm == null) {
3013 throw new ServiceException("The procedure is not registered: "
3014 + desc.getSignature());
3015 }
3016 LOG.debug("Checking to see if procedure from request:"
3017 + desc.getSignature() + " is done");
3018
3019 try {
3020 IsProcedureDoneResponse.Builder builder = IsProcedureDoneResponse
3021 .newBuilder();
3022 boolean done = mpm.isProcedureDone(desc);
3023 builder.setDone(done);
3024 return builder.build();
3025 } catch (IOException e) {
3026 throw new ServiceException(e);
3027 }
3028 }
3029
3030 @Override
3031 public ModifyNamespaceResponse modifyNamespace(RpcController controller,
3032 ModifyNamespaceRequest request) throws ServiceException {
3033 try {
3034 modifyNamespace(ProtobufUtil.toNamespaceDescriptor(request.getNamespaceDescriptor()));
3035 return ModifyNamespaceResponse.getDefaultInstance();
3036 } catch (IOException e) {
3037 throw new ServiceException(e);
3038 }
3039 }
3040
3041 @Override
3042 public CreateNamespaceResponse createNamespace(RpcController controller,
3043 CreateNamespaceRequest request) throws ServiceException {
3044 try {
3045 createNamespace(ProtobufUtil.toNamespaceDescriptor(request.getNamespaceDescriptor()));
3046 return CreateNamespaceResponse.getDefaultInstance();
3047 } catch (IOException e) {
3048 throw new ServiceException(e);
3049 }
3050 }
3051
3052 @Override
3053 public DeleteNamespaceResponse deleteNamespace(RpcController controller,
3054 DeleteNamespaceRequest request) throws ServiceException {
3055 try {
3056 deleteNamespace(request.getNamespaceName());
3057 return DeleteNamespaceResponse.getDefaultInstance();
3058 } catch (IOException e) {
3059 throw new ServiceException(e);
3060 }
3061 }
3062
3063 @Override
3064 public GetNamespaceDescriptorResponse getNamespaceDescriptor(
3065 RpcController controller, GetNamespaceDescriptorRequest request)
3066 throws ServiceException {
3067 try {
3068 return GetNamespaceDescriptorResponse.newBuilder()
3069 .setNamespaceDescriptor(
3070 ProtobufUtil.toProtoNamespaceDescriptor(getNamespaceDescriptor(request.getNamespaceName())))
3071 .build();
3072 } catch (IOException e) {
3073 throw new ServiceException(e);
3074 }
3075 }
3076
3077 @Override
3078 public ListNamespaceDescriptorsResponse listNamespaceDescriptors(
3079 RpcController controller, ListNamespaceDescriptorsRequest request)
3080 throws ServiceException {
3081 try {
3082 ListNamespaceDescriptorsResponse.Builder response =
3083 ListNamespaceDescriptorsResponse.newBuilder();
3084 for(NamespaceDescriptor ns: listNamespaceDescriptors()) {
3085 response.addNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(ns));
3086 }
3087 return response.build();
3088 } catch (IOException e) {
3089 throw new ServiceException(e);
3090 }
3091 }
3092
3093 @Override
3094 public ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace(
3095 RpcController controller, ListTableDescriptorsByNamespaceRequest request)
3096 throws ServiceException {
3097 try {
3098 ListTableDescriptorsByNamespaceResponse.Builder b =
3099 ListTableDescriptorsByNamespaceResponse.newBuilder();
3100 for(HTableDescriptor htd: listTableDescriptorsByNamespace(request.getNamespaceName())) {
3101 b.addTableSchema(htd.convert());
3102 }
3103 return b.build();
3104 } catch (IOException e) {
3105 throw new ServiceException(e);
3106 }
3107 }
3108
3109 @Override
3110 public ListTableNamesByNamespaceResponse listTableNamesByNamespace(
3111 RpcController controller, ListTableNamesByNamespaceRequest request)
3112 throws ServiceException {
3113 try {
3114 ListTableNamesByNamespaceResponse.Builder b =
3115 ListTableNamesByNamespaceResponse.newBuilder();
3116 for (TableName tableName: listTableNamesByNamespace(request.getNamespaceName())) {
3117 b.addTableName(ProtobufUtil.toProtoTableName(tableName));
3118 }
3119 return b.build();
3120 } catch (IOException e) {
3121 throw new ServiceException(e);
3122 }
3123 }
3124
3125 private boolean isHealthCheckerConfigured() {
3126 String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3127 return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
3128 }
3129
3130 @Override
3131 public void createNamespace(NamespaceDescriptor descriptor) throws IOException {
3132 TableName.isLegalNamespaceName(Bytes.toBytes(descriptor.getName()));
3133 if (cpHost != null) {
3134 if (cpHost.preCreateNamespace(descriptor)) {
3135 return;
3136 }
3137 }
3138 LOG.info(getClientIdAuditPrefix() + " creating " + descriptor);
3139 tableNamespaceManager.create(descriptor);
3140 if (cpHost != null) {
3141 cpHost.postCreateNamespace(descriptor);
3142 }
3143 }
3144
3145 @Override
3146 public void modifyNamespace(NamespaceDescriptor descriptor) throws IOException {
3147 TableName.isLegalNamespaceName(Bytes.toBytes(descriptor.getName()));
3148 if (cpHost != null) {
3149 if (cpHost.preModifyNamespace(descriptor)) {
3150 return;
3151 }
3152 }
3153 LOG.info(getClientIdAuditPrefix() + " modify " + descriptor);
3154 tableNamespaceManager.update(descriptor);
3155 if (cpHost != null) {
3156 cpHost.postModifyNamespace(descriptor);
3157 }
3158 }
3159
3160 @Override
3161 public void deleteNamespace(String name) throws IOException {
3162 if (cpHost != null) {
3163 if (cpHost.preDeleteNamespace(name)) {
3164 return;
3165 }
3166 }
3167 LOG.info(getClientIdAuditPrefix() + " delete " + name);
3168 tableNamespaceManager.remove(name);
3169 if (cpHost != null) {
3170 cpHost.postDeleteNamespace(name);
3171 }
3172 }
3173
3174 @Override
3175 public NamespaceDescriptor getNamespaceDescriptor(String name) throws IOException {
3176 boolean ready = tableNamespaceManager != null &&
3177 tableNamespaceManager.isTableAvailableAndInitialized();
3178 if (!ready) {
3179 throw new IOException("Table Namespace Manager not ready yet, try again later");
3180 }
3181 NamespaceDescriptor nsd = tableNamespaceManager.get(name);
3182 if (nsd == null) {
3183 throw new NamespaceNotFoundException(name);
3184 }
3185 return nsd;
3186 }
3187
3188 @Override
3189 public List<NamespaceDescriptor> listNamespaceDescriptors() throws IOException {
3190 return Lists.newArrayList(tableNamespaceManager.list());
3191 }
3192
3193 @Override
3194 public List<HTableDescriptor> listTableDescriptorsByNamespace(String name) throws IOException {
3195 getNamespaceDescriptor(name);
3196 return Lists.newArrayList(tableDescriptors.getByNamespace(name).values());
3197 }
3198
3199 @Override
3200 public List<TableName> listTableNamesByNamespace(String name) throws IOException {
3201 List<TableName> tableNames = Lists.newArrayList();
3202 getNamespaceDescriptor(name);
3203 for (HTableDescriptor descriptor: tableDescriptors.getByNamespace(name).values()) {
3204 tableNames.add(descriptor.getTableName());
3205 }
3206 return tableNames;
3207 }
3208
3209 }