1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.lang.Thread.UncaughtExceptionHandler;
24 import java.lang.annotation.Retention;
25 import java.lang.annotation.RetentionPolicy;
26 import java.lang.management.ManagementFactory;
27 import java.lang.management.MemoryUsage;
28 import java.lang.reflect.Constructor;
29 import java.net.BindException;
30 import java.net.InetSocketAddress;
31 import java.util.ArrayList;
32 import java.util.Collection;
33 import java.util.Collections;
34 import java.util.Comparator;
35 import java.util.HashMap;
36 import java.util.HashSet;
37 import java.util.Iterator;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Map.Entry;
41 import java.util.NavigableMap;
42 import java.util.Random;
43 import java.util.Set;
44 import java.util.SortedMap;
45 import java.util.TreeMap;
46 import java.util.TreeSet;
47 import java.util.concurrent.atomic.AtomicLong;
48 import java.util.concurrent.ConcurrentHashMap;
49 import java.util.concurrent.ConcurrentMap;
50 import java.util.concurrent.ConcurrentSkipListMap;
51 import java.util.concurrent.locks.ReentrantReadWriteLock;
52
53 import javax.management.ObjectName;
54
55 import com.google.common.annotations.VisibleForTesting;
56 import org.apache.hadoop.hbase.util.ByteStringer;
57 import org.apache.commons.logging.Log;
58 import org.apache.commons.logging.LogFactory;
59 import org.apache.hadoop.classification.InterfaceAudience;
60 import org.apache.hadoop.conf.Configuration;
61 import org.apache.hadoop.fs.FileSystem;
62 import org.apache.hadoop.fs.Path;
63 import org.apache.hadoop.hbase.Cell;
64 import org.apache.hadoop.hbase.CellScannable;
65 import org.apache.hadoop.hbase.CellScanner;
66 import org.apache.hadoop.hbase.CellUtil;
67 import org.apache.hadoop.hbase.Chore;
68 import org.apache.hadoop.hbase.ClockOutOfSyncException;
69 import org.apache.hadoop.hbase.DoNotRetryIOException;
70 import org.apache.hadoop.hbase.HBaseConfiguration;
71 import org.apache.hadoop.hbase.HBaseIOException;
72 import org.apache.hadoop.hbase.HConstants;
73 import org.apache.hadoop.hbase.HRegionInfo;
74 import org.apache.hadoop.hbase.HTableDescriptor;
75 import org.apache.hadoop.hbase.HealthCheckChore;
76 import org.apache.hadoop.hbase.KeyValueUtil;
77 import org.apache.hadoop.hbase.NotServingRegionException;
78 import org.apache.hadoop.hbase.RemoteExceptionHandler;
79 import org.apache.hadoop.hbase.ServerName;
80 import org.apache.hadoop.hbase.Stoppable;
81 import org.apache.hadoop.hbase.TableDescriptors;
82 import org.apache.hadoop.hbase.TableName;
83 import org.apache.hadoop.hbase.UnknownScannerException;
84 import org.apache.hadoop.hbase.YouAreDeadException;
85 import org.apache.hadoop.hbase.ZNodeClearer;
86 import org.apache.hadoop.hbase.catalog.CatalogTracker;
87 import org.apache.hadoop.hbase.catalog.MetaEditor;
88 import org.apache.hadoop.hbase.catalog.MetaReader;
89 import org.apache.hadoop.hbase.client.Append;
90 import org.apache.hadoop.hbase.client.Delete;
91 import org.apache.hadoop.hbase.client.Get;
92 import org.apache.hadoop.hbase.client.HConnectionManager;
93 import org.apache.hadoop.hbase.client.Increment;
94 import org.apache.hadoop.hbase.client.Mutation;
95 import org.apache.hadoop.hbase.client.Put;
96 import org.apache.hadoop.hbase.client.Result;
97 import org.apache.hadoop.hbase.client.RowMutations;
98 import org.apache.hadoop.hbase.client.Scan;
99 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
100 import org.apache.hadoop.hbase.DroppedSnapshotException;
101 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
102 import org.apache.hadoop.hbase.exceptions.OperationConflictException;
103 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
104 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
105 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
106 import org.apache.hadoop.hbase.executor.ExecutorService;
107 import org.apache.hadoop.hbase.executor.ExecutorType;
108 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
109 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
110 import org.apache.hadoop.hbase.fs.HFileSystem;
111 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
112 import org.apache.hadoop.hbase.io.hfile.HFile;
113 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
114 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
115 import org.apache.hadoop.hbase.ipc.PriorityFunction;
116 import org.apache.hadoop.hbase.ipc.RpcCallContext;
117 import org.apache.hadoop.hbase.ipc.RpcClient;
118 import org.apache.hadoop.hbase.ipc.RpcServer;
119 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
120 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
121 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
122 import org.apache.hadoop.hbase.ipc.ServerRpcController;
123 import org.apache.hadoop.hbase.master.SplitLogManager;
124 import org.apache.hadoop.hbase.master.TableLockManager;
125 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
126 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
127 import org.apache.hadoop.hbase.protobuf.RequestConverter;
128 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
129 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
130 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
131 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
132 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
133 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionResponse;
134 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
135 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
136 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
137 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
138 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
139 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
140 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
141 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
142 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
143 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
144 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest;
145 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse;
146 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
147 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
148 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
149 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
150 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
151 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
152 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
153 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
154 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
155 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse;
156 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
157 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
158 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
159 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
160 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
161 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
162 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
163 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
164 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
165 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
166 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
167 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
168 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
169 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
170 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
171 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
172 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
173 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
174 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
175 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
176 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
177 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
178 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
179 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
180 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
181 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
182 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
183 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
184 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
185 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
186 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
187 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
188 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
189 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
190 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
191 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
192 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
193 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
194 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
195 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
196 import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
197 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
198 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
199 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
200 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
201 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
202 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
203 import org.apache.hadoop.hbase.regionserver.wal.HLog;
204 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
205 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
206 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
207 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
208 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
209 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
210 import org.apache.hadoop.hbase.security.UserProvider;
211 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
212 import org.apache.hadoop.hbase.util.Bytes;
213 import org.apache.hadoop.hbase.util.CompressionTest;
214 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
215 import org.apache.hadoop.hbase.util.FSTableDescriptors;
216 import org.apache.hadoop.hbase.util.FSUtils;
217 import org.apache.hadoop.hbase.util.InfoServer;
218 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
219 import org.apache.hadoop.hbase.util.Pair;
220 import org.apache.hadoop.hbase.util.Sleeper;
221 import org.apache.hadoop.hbase.util.Strings;
222 import org.apache.hadoop.hbase.util.Threads;
223 import org.apache.hadoop.hbase.util.VersionInfo;
224 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
225 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
226 import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
227 import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher;
228 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
229 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
230 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
231 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
232 import org.apache.hadoop.ipc.RemoteException;
233 import org.apache.hadoop.metrics.util.MBeanUtil;
234 import org.apache.hadoop.net.DNS;
235 import org.apache.hadoop.util.ReflectionUtils;
236 import org.apache.hadoop.util.StringUtils;
237 import org.apache.zookeeper.KeeperException;
238 import org.apache.zookeeper.data.Stat;
239 import org.cliffc.high_scale_lib.Counter;
240
241 import com.google.protobuf.BlockingRpcChannel;
242 import com.google.protobuf.ByteString;
243 import com.google.protobuf.Message;
244 import com.google.protobuf.RpcController;
245 import com.google.protobuf.ServiceException;
246 import com.google.protobuf.TextFormat;
247
248
249
250
251
252 @InterfaceAudience.Private
253 @SuppressWarnings("deprecation")
254 public class HRegionServer implements ClientProtos.ClientService.BlockingInterface,
255 AdminProtos.AdminService.BlockingInterface, Runnable, RegionServerServices,
256 HBaseRPCErrorHandler, LastSequenceId {
257
258 public static final Log LOG = LogFactory.getLog(HRegionServer.class);
259
260 private final Random rand;
261
262 private final AtomicLong scannerIdGen = new AtomicLong(0L);
263
264
265
266
267
268 protected static final String OPEN = "OPEN";
269 protected static final String CLOSE = "CLOSE";
270
271
272
273
274 protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
275 new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);
276
277
278 public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
279 "hbase.region.server.rpc.scheduler.factory.class";
280
281 protected long maxScannerResultSize;
282
283
284 protected MemStoreFlusher cacheFlusher;
285
286
287 protected CatalogTracker catalogTracker;
288
289
290 @SuppressWarnings("unused")
291 private RecoveringRegionWatcher recoveringRegionWatcher;
292
293
294
295
296 protected TableDescriptors tableDescriptors;
297
298
299 protected ReplicationSourceService replicationSourceHandler;
300 protected ReplicationSinkService replicationSinkHandler;
301
302
303 public CompactSplitThread compactSplitThread;
304
305 final ConcurrentHashMap<String, RegionScannerHolder> scanners =
306 new ConcurrentHashMap<String, RegionScannerHolder>();
307
308
309
310
311
312 protected final Map<String, HRegion> onlineRegions =
313 new ConcurrentHashMap<String, HRegion>();
314
315
316
317
318
319
320
321
322
323
324 protected final Map<String, InetSocketAddress[]> regionFavoredNodesMap =
325 new ConcurrentHashMap<String, InetSocketAddress[]>();
326
327
328
329
330
331 protected final Map<String, HRegion> recoveringRegions = Collections
332 .synchronizedMap(new HashMap<String, HRegion>());
333
334
335 protected Leases leases;
336
337
338 protected ExecutorService service;
339
340
341 final Counter requestCount = new Counter();
342
343
344 protected volatile boolean fsOk;
345 protected HFileSystem fs;
346
347
348
349
350 protected volatile boolean stopped = false;
351
352
353
354 protected volatile boolean abortRequested;
355
356
357 private RegionServerInfo.Builder rsInfo;
358
359 ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();
360
361
362
363 private boolean stopping = false;
364
365 private volatile boolean killed = false;
366
367 protected final Configuration conf;
368
369 private Path rootDir;
370
371 protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
372
373 final int numRetries;
374 protected final int threadWakeFrequency;
375 private final int msgInterval;
376
377 protected final int numRegionsToReport;
378
379
380 private RegionServerStatusService.BlockingInterface rssStub;
381
382 RpcClient rpcClient;
383
384
385
386 RpcServerInterface rpcServer;
387
388 private final InetSocketAddress isa;
389 private UncaughtExceptionHandler uncaughtExceptionHandler;
390
391
392
393
394 InfoServer infoServer;
395 private JvmPauseMonitor pauseMonitor;
396
397
398 public static final String REGIONSERVER = "regionserver";
399
400
401 public static final String REGIONSERVER_CONF = "regionserver_conf";
402
403 private MetricsRegionServer metricsRegionServer;
404 private SpanReceiverHost spanReceiverHost;
405
406
407
408
409 Chore compactionChecker;
410
411
412
413
414 Chore periodicFlusher;
415
416
417
418 protected volatile HLog hlog;
419
420
421 protected volatile HLog hlogForMeta;
422
423 LogRoller hlogRoller;
424 LogRoller metaHLogRoller;
425
426
427 protected volatile boolean isOnline;
428
429
430 private ZooKeeperWatcher zooKeeper;
431
432
433 private MasterAddressTracker masterAddressTracker;
434
435
436 private ClusterStatusTracker clusterStatusTracker;
437
438
439 private SplitLogWorker splitLogWorker;
440
441
442 private final Sleeper sleeper;
443
444 private final int rpcTimeout;
445
446 private final RegionServerAccounting regionServerAccounting;
447
448
449 final CacheConfig cacheConfig;
450
451
452 private HealthCheckChore healthCheckChore;
453
454
455 private Chore nonceManagerChore;
456
457
458
459
460
461
462
463 private ServerName serverNameFromMasterPOV;
464
465
466
467
468 private final long startcode;
469
470
471
472
473 private String clusterId;
474
475
476
477
478 private ObjectName mxBean = null;
479
480
481
482
483 private MovedRegionsCleaner movedRegionsCleaner;
484
485
486
487
488 private final int scannerLeaseTimeoutPeriod;
489
490
491
492
493 private final PriorityFunction priority;
494
495 private RegionServerCoprocessorHost rsHost;
496
497 private RegionServerProcedureManagerHost rspmHost;
498
499
500 private TableLockManager tableLockManager;
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520 private final ServerNonceManager nonceManager;
521
522 private UserProvider userProvider;
523
524
525
526
527
528
529
530
/**
 * Builds a region server from the given configuration.
 *
 * <p>Reads tunables from {@code conf}, resolves the address to bind to, constructs the
 * {@link RpcServer}, performs the ZooKeeper-client and region-server logins against the
 * bound host name, and records the info port returned by {@code putUpWebUI()}.
 *
 * @param conf configuration consulted for every setting read here
 * @throws IOException if a codec listed under {@code hbase.regionserver.codecs} fails its
 *         check, or if one of the setup calls made here fails with an I/O error
 * @throws InterruptedException declared by this constructor; which call raises it is not
 *         visible from this code
 * @throws IllegalArgumentException if the bind address does not resolve or the configured
 *         RPC scheduler factory cannot be instantiated
 */
public HRegionServer(Configuration conf)
    throws IOException, InterruptedException {
  this.fsOk = true;
  this.conf = conf;
  this.isOnline = false;
  checkCodecs(conf);
  this.userProvider = UserProvider.instantiate(conf);

  FSUtils.setupShortCircuitRead(conf);

  // Plain configuration-driven settings.
  this.numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
  this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
  this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);

  this.sleeper = new Sleeper(this.msgInterval, this);

  // The nonce manager only exists when nonces are enabled; otherwise it stays null.
  if (conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true)) {
    this.nonceManager = new ServerNonceManager(conf);
  } else {
    this.nonceManager = null;
  }

  this.maxScannerResultSize = conf.getLong(
      HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);

  this.numRegionsToReport = conf.getInt("hbase.regionserver.numregionstoreport", 10);

  this.rpcTimeout = conf.getInt(
      HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
      HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);

  this.abortRequested = false;
  this.stopped = false;

  this.scannerLeaseTimeoutPeriod = HBaseConfiguration.getInt(conf,
      HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);

  // Work out the address to bind to: an explicit ipc address wins, otherwise the
  // DNS-derived default host for the configured interface/nameserver is used.
  String dnsInterface = conf.get("hbase.regionserver.dns.interface", "default");
  String dnsNameserver = conf.get("hbase.regionserver.dns.nameserver", "default");
  String defaultHost = Strings.domainNamePointerToHostName(
      DNS.getDefaultHost(dnsInterface, dnsNameserver));
  String bindHost = conf.get("hbase.regionserver.ipc.address", defaultHost);
  int bindPort = conf.getInt(HConstants.REGIONSERVER_PORT,
      HConstants.DEFAULT_REGIONSERVER_PORT);

  InetSocketAddress requestedAddress = new InetSocketAddress(bindHost, bindPort);
  if (requestedAddress.getAddress() == null) {
    throw new IllegalArgumentException("Failed resolve of " + requestedAddress);
  }
  this.rand = new Random(requestedAddress.hashCode());
  String serverLabel = "regionserver/" + requestedAddress;

  HConnectionManager.setServerSideHConnectionRetries(conf, serverLabel, LOG);
  this.priority = new AnnotationReadingPriorityFunction(this);

  // Instantiate the configured RPC scheduler factory through its no-arg constructor.
  Class<?> schedulerFactoryClass = conf.getClass(
      REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
      SimpleRpcSchedulerFactory.class);
  RpcSchedulerFactory schedulerFactory;
  try {
    schedulerFactory = (RpcSchedulerFactory) schedulerFactoryClass.newInstance();
  } catch (InstantiationException e) {
    throw new IllegalArgumentException(e);
  } catch (IllegalAccessException e) {
    throw new IllegalArgumentException(e);
  }

  List<BlockingServiceAndInterface> rpcServices = getServices();
  this.rpcServer = new RpcServer(this, serverLabel, rpcServices,
      requestedAddress,
      conf,
      schedulerFactory.create(conf, this));

  // From here on use the address the RPC server reports, not the requested one.
  this.isa = this.rpcServer.getListenerAddress();

  this.rpcServer.setErrorHandler(this);
  this.startcode = System.currentTimeMillis();

  // Log in the ZooKeeper client, then the region server itself, using the bound host name.
  ZKUtil.loginClient(conf, "hbase.zookeeper.client.keytab.file",
      "hbase.zookeeper.client.kerberos.principal", this.isa.getHostName());

  this.userProvider.login("hbase.regionserver.keytab.file",
      "hbase.regionserver.kerberos.principal", this.isa.getHostName());

  this.regionServerAccounting = new RegionServerAccounting();
  this.cacheConfig = new CacheConfig(conf);

  // Any uncaught exception in a thread using this handler aborts the server.
  this.uncaughtExceptionHandler = new UncaughtExceptionHandler() {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
      abort("Uncaught exception in service thread " + t.getName(), e);
    }
  };

  this.rsInfo = RegionServerInfo.newBuilder();

  // Record the port returned by putUpWebUI() in the server info being built.
  this.rsInfo.setInfoPort(putUpWebUI());
}
632
633
634
635
636 private List<BlockingServiceAndInterface> getServices() {
637 List<BlockingServiceAndInterface> bssi = new ArrayList<BlockingServiceAndInterface>(2);
638 bssi.add(new BlockingServiceAndInterface(
639 ClientProtos.ClientService.newReflectiveBlockingService(this),
640 ClientProtos.ClientService.BlockingInterface.class));
641 bssi.add(new BlockingServiceAndInterface(
642 AdminProtos.AdminService.newReflectiveBlockingService(this),
643 AdminProtos.AdminService.BlockingInterface.class));
644 return bssi;
645 }
646
647
648
649
650
651
652 private static void checkCodecs(final Configuration c) throws IOException {
653
654 String [] codecs = c.getStrings("hbase.regionserver.codecs", (String[])null);
655 if (codecs == null) return;
656 for (String codec : codecs) {
657 if (!CompressionTest.testCompression(codec)) {
658 throw new IOException("Compression codec " + codec +
659 " not supported, aborting RS construction");
660 }
661 }
662 }
663
664 String getClusterId() {
665 return this.clusterId;
666 }
667
668 @Override
669 public int getPriority(RequestHeader header, Message param) {
670 return priority.getPriority(header, param);
671 }
672
673 @Retention(RetentionPolicy.RUNTIME)
674 protected @interface QosPriority {
675 int priority() default 0;
676 }
677
678 PriorityFunction getPriority() {
679 return priority;
680 }
681
682 RegionScanner getScanner(long scannerId) {
683 String scannerIdString = Long.toString(scannerId);
684 RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
685 if (scannerHolder != null) {
686 return scannerHolder.s;
687 }
688 return null;
689 }
690
691
692
693
694
695
696
697 private void preRegistrationInitialization(){
698 try {
699 initializeZooKeeper();
700 initializeThreads();
701 } catch (Throwable t) {
702
703
704 this.rpcServer.stop();
705 abort("Initialization of RS failed. Hence aborting RS.", t);
706 }
707 }
708
709
710
711
712
713
714
715
716
717 private void initializeZooKeeper() throws IOException, InterruptedException {
718
719 this.zooKeeper = new ZooKeeperWatcher(conf, REGIONSERVER + ":" +
720 this.isa.getPort(), this);
721
722
723
724
725 this.masterAddressTracker = new MasterAddressTracker(this.zooKeeper, this);
726 this.masterAddressTracker.start();
727 blockAndCheckIfStopped(this.masterAddressTracker);
728
729
730
731 this.clusterStatusTracker = new ClusterStatusTracker(this.zooKeeper, this);
732 this.clusterStatusTracker.start();
733 blockAndCheckIfStopped(this.clusterStatusTracker);
734
735
736 this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf, this);
737 catalogTracker.start();
738
739
740
741
742 try {
743 clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
744 if (clusterId == null) {
745 this.abort("Cluster ID has not been set");
746 }
747 LOG.info("ClusterId : "+clusterId);
748 } catch (KeeperException e) {
749 this.abort("Failed to retrieve Cluster ID",e);
750 }
751
752
753 try {
754 rspmHost = new RegionServerProcedureManagerHost();
755 rspmHost.loadProcedures(conf);
756 rspmHost.initialize(this);
757 } catch (KeeperException e) {
758 this.abort("Failed to reach zk cluster when creating procedure handler.", e);
759 }
760 this.tableLockManager = TableLockManager.createTableLockManager(conf, zooKeeper,
761 ServerName.valueOf(isa.getHostName(), isa.getPort(), startcode));
762
763
764 this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
765 }
766
767
768
769
770
771
772
773
774 private void blockAndCheckIfStopped(ZooKeeperNodeTracker tracker)
775 throws IOException, InterruptedException {
776 while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
777 if (this.stopped) {
778 throw new IOException("Received the shutdown message while waiting.");
779 }
780 }
781 }
782
783
784
785
786 private boolean isClusterUp() {
787 return this.clusterStatusTracker.isClusterUp();
788 }
789
790 private void initializeThreads() throws IOException {
791
792 this.cacheFlusher = new MemStoreFlusher(conf, this);
793
794
795 this.compactSplitThread = new CompactSplitThread(this);
796
797
798
799 this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
800 this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
801
802 int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
803 HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
804 if (isHealthCheckerConfigured()) {
805 healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
806 }
807
808 this.leases = new Leases(this.threadWakeFrequency);
809
810
811 movedRegionsCleaner = MovedRegionsCleaner.createAndStart(this);
812
813 if (this.nonceManager != null) {
814
815 nonceManagerChore = this.nonceManager.createCleanupChore(this);
816 }
817
818
819 rpcClient = new RpcClient(conf, clusterId, new InetSocketAddress(
820 this.isa.getAddress(), 0));
821 this.pauseMonitor = new JvmPauseMonitor(conf);
822 pauseMonitor.start();
823 }
824
825
826
827
828 @Override
829 public void run() {
830 try {
831
832 preRegistrationInitialization();
833 } catch (Throwable e) {
834 abort("Fatal exception during initialization", e);
835 }
836
837 try {
838
839
840 while (keepLooping()) {
841 RegionServerStartupResponse w = reportForDuty();
842 if (w == null) {
843 LOG.warn("reportForDuty failed; sleeping and then retrying.");
844 this.sleeper.sleep();
845 } else {
846 handleReportForDutyResponse(w);
847 break;
848 }
849 }
850
851
852
853
854 this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
855
856 if (!this.stopped && isHealthy()){
857
858
859 rspmHost.start();
860 }
861
862
863 long lastMsg = 0;
864 long oldRequestCount = -1;
865
866 while (!this.stopped && isHealthy()) {
867 if (!isClusterUp()) {
868 if (isOnlineRegionsEmpty()) {
869 stop("Exiting; cluster shutdown set and not carrying any regions");
870 } else if (!this.stopping) {
871 this.stopping = true;
872 LOG.info("Closing user regions");
873 closeUserRegions(this.abortRequested);
874 } else if (this.stopping) {
875 boolean allUserRegionsOffline = areAllUserRegionsOffline();
876 if (allUserRegionsOffline) {
877
878
879
880 if (oldRequestCount == getWriteRequestCount()) {
881 stop("Stopped; only catalog regions remaining online");
882 break;
883 }
884 oldRequestCount = getWriteRequestCount();
885 } else {
886
887
888
889 closeUserRegions(this.abortRequested);
890 }
891 LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
892 }
893 }
894 long now = System.currentTimeMillis();
895 if ((now - lastMsg) >= msgInterval) {
896 tryRegionServerReport(lastMsg, now);
897 lastMsg = System.currentTimeMillis();
898 }
899 if (!this.stopped) this.sleeper.sleep();
900 }
901 } catch (Throwable t) {
902 if (!checkOOME(t)) {
903 String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
904 abort(prefix + t.getMessage(), t);
905 }
906 }
907
908 if (mxBean != null) {
909 MBeanUtil.unregisterMBean(mxBean);
910 mxBean = null;
911 }
912 if (this.leases != null) this.leases.closeAfterLeasesExpire();
913 this.rpcServer.stop();
914 if (this.splitLogWorker != null) {
915 splitLogWorker.stop();
916 }
917 if (this.infoServer != null) {
918 LOG.info("Stopping infoServer");
919 try {
920 this.infoServer.stop();
921 } catch (Exception e) {
922 e.printStackTrace();
923 }
924 }
925
926 if (cacheConfig.isBlockCacheEnabled()) {
927 cacheConfig.getBlockCache().shutdown();
928 }
929
930 if (movedRegionsCleaner != null) {
931 movedRegionsCleaner.stop("Region Server stopping");
932 }
933
934
935
936 if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
937 if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
938 if (this.hlogRoller != null) this.hlogRoller.interruptIfNecessary();
939 if (this.metaHLogRoller != null) this.metaHLogRoller.interruptIfNecessary();
940 if (this.compactionChecker != null)
941 this.compactionChecker.interrupt();
942 if (this.healthCheckChore != null) {
943 this.healthCheckChore.interrupt();
944 }
945 if (this.nonceManagerChore != null) {
946 this.nonceManagerChore.interrupt();
947 }
948
949
950 if (rspmHost != null) {
951 rspmHost.stop(this.abortRequested || this.killed);
952 }
953
954 if (this.killed) {
955
956 } else if (abortRequested) {
957 if (this.fsOk) {
958 closeUserRegions(abortRequested);
959 }
960 LOG.info("aborting server " + this.serverNameFromMasterPOV);
961 } else {
962 closeUserRegions(abortRequested);
963 closeAllScanners();
964 LOG.info("stopping server " + this.serverNameFromMasterPOV);
965 }
966
967
968 if (this.catalogTracker != null) this.catalogTracker.stop();
969
970
971 if (!this.killed && containsMetaTableRegions()) {
972 if (!abortRequested || this.fsOk) {
973 if (this.compactSplitThread != null) {
974 this.compactSplitThread.join();
975 this.compactSplitThread = null;
976 }
977 closeMetaTableRegions(abortRequested);
978 }
979 }
980
981 if (!this.killed && this.fsOk) {
982 waitOnAllRegionsToClose(abortRequested);
983 LOG.info("stopping server " + this.serverNameFromMasterPOV +
984 "; all regions closed.");
985 }
986
987
988 if (this.fsOk) {
989 closeWAL(!abortRequested);
990 }
991
992
993 if (this.rssStub != null) {
994 this.rssStub = null;
995 }
996 if (this.rpcClient != null) {
997 this.rpcClient.stop();
998 }
999 if (this.leases != null) {
1000 this.leases.close();
1001 }
1002 if (this.pauseMonitor != null) {
1003 this.pauseMonitor.stop();
1004 }
1005
1006 if (!killed) {
1007 join();
1008 }
1009
1010 try {
1011 deleteMyEphemeralNode();
1012 } catch (KeeperException e) {
1013 LOG.warn("Failed deleting my ephemeral node", e);
1014 }
1015
1016
1017 ZNodeClearer.deleteMyEphemeralNodeOnDisk();
1018 if (this.zooKeeper != null) {
1019 this.zooKeeper.close();
1020 }
1021 LOG.info("stopping server " + this.serverNameFromMasterPOV +
1022 "; zookeeper connection closed.");
1023
1024 LOG.info(Thread.currentThread().getName() + " exiting");
1025 }
1026
1027 private boolean containsMetaTableRegions() {
1028 return onlineRegions.containsKey(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
1029 }
1030
1031 private boolean areAllUserRegionsOffline() {
1032 if (getNumberOfOnlineRegions() > 2) return false;
1033 boolean allUserRegionsOffline = true;
1034 for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
1035 if (!e.getValue().getRegionInfo().isMetaTable()) {
1036 allUserRegionsOffline = false;
1037 break;
1038 }
1039 }
1040 return allUserRegionsOffline;
1041 }
1042
1043
1044
1045
1046 private long getWriteRequestCount() {
1047 int writeCount = 0;
1048 for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
1049 writeCount += e.getValue().getWriteRequestsCount();
1050 }
1051 return writeCount;
1052 }
1053
1054 @VisibleForTesting
1055 protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
1056 throws IOException {
1057 if (this.rssStub == null) {
1058
1059 return;
1060 }
1061 ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
1062 try {
1063 RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
1064 ServerName sn = ServerName.parseVersionedServerName(
1065 this.serverNameFromMasterPOV.getVersionedBytes());
1066 request.setServer(ProtobufUtil.toServerName(sn));
1067 request.setLoad(sl);
1068 this.rssStub.regionServerReport(null, request.build());
1069 } catch (ServiceException se) {
1070 IOException ioe = ProtobufUtil.getRemoteException(se);
1071 if (ioe instanceof YouAreDeadException) {
1072
1073 throw ioe;
1074 }
1075
1076
1077 Pair<ServerName, RegionServerStatusService.BlockingInterface> p =
1078 createRegionServerStatusStub();
1079 this.rssStub = p.getSecond();
1080 }
1081 }
1082
1083 ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime) {
1084
1085
1086
1087
1088
1089
1090
1091 MetricsRegionServerWrapper regionServerWrapper = this.metricsRegionServer.getRegionServerWrapper();
1092 Collection<HRegion> regions = getOnlineRegionsLocalContext();
1093 MemoryUsage memory =
1094 ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
1095
1096 ClusterStatusProtos.ServerLoad.Builder serverLoad =
1097 ClusterStatusProtos.ServerLoad.newBuilder();
1098 serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1099 serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount());
1100 serverLoad.setUsedHeapMB((int)(memory.getUsed() / 1024 / 1024));
1101 serverLoad.setMaxHeapMB((int) (memory.getMax() / 1024 / 1024));
1102 Set<String> coprocessors = this.hlog.getCoprocessorHost().getCoprocessors();
1103 for (String coprocessor : coprocessors) {
1104 serverLoad.addCoprocessors(
1105 Coprocessor.newBuilder().setName(coprocessor).build());
1106 }
1107 RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
1108 RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1109 for (HRegion region : regions) {
1110 serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
1111 }
1112 serverLoad.setReportStartTime(reportStartTime);
1113 serverLoad.setReportEndTime(reportEndTime);
1114 if (this.infoServer != null) {
1115 serverLoad.setInfoServerPort(this.infoServer.getPort());
1116 } else {
1117 serverLoad.setInfoServerPort(-1);
1118 }
1119 return serverLoad.build();
1120 }
1121
1122 String getOnlineRegionsAsPrintableString() {
1123 StringBuilder sb = new StringBuilder();
1124 for (HRegion r: this.onlineRegions.values()) {
1125 if (sb.length() > 0) sb.append(", ");
1126 sb.append(r.getRegionInfo().getEncodedName());
1127 }
1128 return sb.toString();
1129 }
1130
1131
1132
1133
/**
 * Loops until no regions are online, or until no region is in transition any more.
 * On each pass it requests a close for every online region that is not in transition and
 * has not already been asked to close by this method, then sleeps 200 ms.
 *
 * @param abort passed through to {@code closeRegionIgnoreErrors}
 */
private void waitOnAllRegionsToClose(final boolean abort) {
  // Last region count that was logged; -1 forces a log on the first pass.
  int lastCount = -1;
  long previousLogTime = 0;
  // Encoded names of regions this method has already requested a close for.
  Set<String> closedRegions = new HashSet<String>();
  while (!isOnlineRegionsEmpty()) {
    int count = getNumberOfOnlineRegions();
    // Only log when the count changed ...
    if (count != lastCount) {
      // ... and at most once per second.
      if (System.currentTimeMillis() > (previousLogTime + 1000)) {
        previousLogTime = System.currentTimeMillis();
        lastCount = count;
        LOG.info("Waiting on " + count + " regions to close");
        // Dump the remaining regions only when there are few of them and debug is enabled.
        if (count < 10 && LOG.isDebugEnabled()) {
          LOG.debug(this.onlineRegions);
        }
      }
    }
    // Request a close for each online region that is neither in transition nor already
    // handled by an earlier pass of this loop.
    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
      HRegionInfo hri = e.getValue().getRegionInfo();
      if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
          && !closedRegions.contains(hri.getEncodedName())) {
        closedRegions.add(hri.getEncodedName());
        // Errors are ignored by the helper, so a failed close does not stop the loop.
        closeRegionIgnoreErrors(hri, abort);
      }
    }
    // Nothing is in transition: stop waiting even if some regions are still online.
    if (this.regionsInTransitionInRS.isEmpty()) {
      if (!isOnlineRegionsEmpty()) {
        LOG.info("We were exiting though online regions are not empty," +
            " because some regions failed closing");
      }
      break;
    }
    Threads.sleep(200);
  }
}
1178
1179 private void closeWAL(final boolean delete) {
1180 if (this.hlogForMeta != null) {
1181
1182
1183
1184
1185 try {
1186 this.hlogForMeta.close();
1187 } catch (Throwable e) {
1188 LOG.error("Metalog close and delete failed", RemoteExceptionHandler.checkThrowable(e));
1189 }
1190 }
1191 if (this.hlog != null) {
1192 try {
1193 if (delete) {
1194 hlog.closeAndDelete();
1195 } else {
1196 hlog.close();
1197 }
1198 } catch (Throwable e) {
1199 LOG.error("Close and delete failed", RemoteExceptionHandler.checkThrowable(e));
1200 }
1201 }
1202 }
1203
1204 private void closeAllScanners() {
1205
1206
1207 for (Map.Entry<String, RegionScannerHolder> e : this.scanners.entrySet()) {
1208 try {
1209 e.getValue().s.close();
1210 } catch (IOException ioe) {
1211 LOG.warn("Closing scanner " + e.getKey(), ioe);
1212 }
1213 }
1214 }
1215
1216
1217
1218
1219
1220
/**
 * Applies the master's startup response and finishes bringing this region server online:
 * copies the returned key/value pairs into the configuration, adopts the hostname the
 * master reports for this server, creates the ephemeral znode, sets up the filesystem,
 * table descriptors, WAL and metrics, and starts the service threads.
 *
 * @param c the startup response received from the master
 * @throws IOException if any step fails; the server is marked offline and asked to stop
 *         before the converted exception is thrown
 */
protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
    throws IOException {
  try {
    for (NameStringPair e : c.getMapEntriesList()) {
      String key = e.getName();
      // This entry is not a configuration value: it carries the hostname the master
      // sees for this server, which is used to build the server name.
      if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
        String hostnameFromMasterPOV = e.getValue();
        this.serverNameFromMasterPOV = ServerName.valueOf(hostnameFromMasterPOV,
            this.isa.getPort(), this.startcode);
        if (!hostnameFromMasterPOV.equals(this.isa.getHostName())) {
          LOG.info("Master passed us a different hostname to use; was=" +
              this.isa.getHostName() + ", but now=" + hostnameFromMasterPOV);
        }
        continue;
      }
      // Every other entry overrides the local configuration.
      String value = e.getValue();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Config from master: " + key + "=" + value);
      }
      this.conf.set(key, value);
    }

    // Set a default "mapred.task.id" derived from the server name when none is configured.
    if (this.conf.get("mapred.task.id") == null) {
      this.conf.set("mapred.task.id", "hb_rs_" +
          this.serverNameFromMasterPOV.toString());
    }
    // Register this server in ZooKeeper under its ephemeral node.
    createMyEphemeralNode();

    // Record the ephemeral node path on local disk as well.
    ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());

    // Point the default filesystem at the HBase root directory; this runs before the
    // filesystem object is created below.
    FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));

    // Checksum verification defaults to enabled when the key is absent.
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    this.fs = new HFileSystem(this.conf, useHBaseChecksum);
    this.rootDir = FSUtils.getRootDir(this.conf);
    this.tableDescriptors = new FSTableDescriptors(this.fs, this.rootDir, true);
    this.hlog = setupWALAndReplication();
    // Metrics are created after the WAL has been set up.
    this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));

    spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());

    startServiceThreads();
    LOG.info("Serving as " + this.serverNameFromMasterPOV +
        ", RpcServer on " + this.isa +
        ", sessionid=0x" +
        Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
    isOnline = true;
  } catch (Throwable e) {
    // Any failure during initialization takes the server offline and requests a stop.
    this.isOnline = false;
    stop("Failed initialization");
    throw convertThrowableToIOE(cleanup(e, "Failed init"),
        "Region server startup failed");
  } finally {
    // Runs on both success and failure.
    sleeper.skipSleepCycle();
  }
}
1289
1290 private void createMyEphemeralNode() throws KeeperException, IOException {
1291 byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1292 ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper,
1293 getMyEphemeralNodePath(), data);
1294 }
1295
1296 private void deleteMyEphemeralNode() throws KeeperException {
1297 ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1298 }
1299
1300 @Override
1301 public RegionServerAccounting getRegionServerAccounting() {
1302 return regionServerAccounting;
1303 }
1304
1305 @Override
1306 public TableLockManager getTableLockManager() {
1307 return tableLockManager;
1308 }
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318 private RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr,
1319 RegionSpecifier.Builder regionSpecifier) {
1320 byte[] name = r.getRegionName();
1321 int stores = 0;
1322 int storefiles = 0;
1323 int storeUncompressedSizeMB = 0;
1324 int storefileSizeMB = 0;
1325 int memstoreSizeMB = (int) (r.memstoreSize.get() / 1024 / 1024);
1326 int storefileIndexSizeMB = 0;
1327 int rootIndexSizeKB = 0;
1328 int totalStaticIndexSizeKB = 0;
1329 int totalStaticBloomSizeKB = 0;
1330 long totalCompactingKVs = 0;
1331 long currentCompactedKVs = 0;
1332 synchronized (r.stores) {
1333 stores += r.stores.size();
1334 for (Store store : r.stores.values()) {
1335 storefiles += store.getStorefilesCount();
1336 storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed()
1337 / 1024 / 1024);
1338 storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
1339 storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
1340 CompactionProgress progress = store.getCompactionProgress();
1341 if (progress != null) {
1342 totalCompactingKVs += progress.totalCompactingKVs;
1343 currentCompactedKVs += progress.currentCompactedKVs;
1344 }
1345
1346 rootIndexSizeKB +=
1347 (int) (store.getStorefilesIndexSize() / 1024);
1348
1349 totalStaticIndexSizeKB +=
1350 (int) (store.getTotalStaticIndexSize() / 1024);
1351
1352 totalStaticBloomSizeKB +=
1353 (int) (store.getTotalStaticBloomSize() / 1024);
1354 }
1355 }
1356 if (regionLoadBldr == null) {
1357 regionLoadBldr = RegionLoad.newBuilder();
1358 }
1359 if (regionSpecifier == null) {
1360 regionSpecifier = RegionSpecifier.newBuilder();
1361 }
1362 regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1363 regionSpecifier.setValue(ByteStringer.wrap(name));
1364 regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
1365 .setStores(stores)
1366 .setStorefiles(storefiles)
1367 .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
1368 .setStorefileSizeMB(storefileSizeMB)
1369 .setMemstoreSizeMB(memstoreSizeMB)
1370 .setStorefileIndexSizeMB(storefileIndexSizeMB)
1371 .setRootIndexSizeKB(rootIndexSizeKB)
1372 .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1373 .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1374 .setReadRequestsCount(r.readRequestsCount.get())
1375 .setWriteRequestsCount(r.writeRequestsCount.get())
1376 .setTotalCompactingKVs(totalCompactingKVs)
1377 .setCurrentCompactedKVs(currentCompactedKVs)
1378 .setCompleteSequenceId(r.completeSequenceId);
1379
1380 return regionLoadBldr.build();
1381 }
1382
1383
1384
1385
1386
1387 public RegionLoad createRegionLoad(final String encodedRegionName) {
1388 HRegion r = null;
1389 r = this.onlineRegions.get(encodedRegionName);
1390 return r != null ? createRegionLoad(r, null, null) : null;
1391 }
1392
1393
1394
1395
1396 private static class CompactionChecker extends Chore {
1397 private final HRegionServer instance;
1398 private final int majorCompactPriority;
1399 private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1400 private long iteration = 0;
1401
1402 CompactionChecker(final HRegionServer h, final int sleepTime,
1403 final Stoppable stopper) {
1404 super("CompactionChecker", sleepTime, h);
1405 this.instance = h;
1406 LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));
1407
1408
1409
1410
1411 this.majorCompactPriority = this.instance.conf.
1412 getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1413 DEFAULT_PRIORITY);
1414 }
1415
1416 @Override
1417 protected void chore() {
1418 for (HRegion r : this.instance.onlineRegions.values()) {
1419 if (r == null)
1420 continue;
1421 for (Store s : r.getStores().values()) {
1422 try {
1423 long multiplier = s.getCompactionCheckMultiplier();
1424 assert multiplier > 0;
1425 if (iteration % multiplier != 0) continue;
1426 if (s.needsCompaction()) {
1427
1428 this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
1429 + " requests compaction");
1430 } else if (s.isMajorCompaction()) {
1431 if (majorCompactPriority == DEFAULT_PRIORITY
1432 || majorCompactPriority > r.getCompactPriority()) {
1433 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1434 + " requests major compaction; use default priority", null);
1435 } else {
1436 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1437 + " requests major compaction; use configured priority",
1438 this.majorCompactPriority, null);
1439 }
1440 }
1441 } catch (IOException e) {
1442 LOG.warn("Failed major compaction check on " + r, e);
1443 }
1444 }
1445 }
1446 iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1447 }
1448 }
1449
1450 class PeriodicMemstoreFlusher extends Chore {
1451 final HRegionServer server;
1452 final static int RANGE_OF_DELAY = 20000;
1453 final static int MIN_DELAY_TIME = 3000;
1454 public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1455 super(server.getServerName() + "-MemstoreFlusherChore", cacheFlushInterval, server);
1456 this.server = server;
1457 }
1458
1459 @Override
1460 protected void chore() {
1461 for (HRegion r : this.server.onlineRegions.values()) {
1462 if (r == null)
1463 continue;
1464 if (r.shouldFlush()) {
1465 FlushRequester requester = server.getFlushRequester();
1466 if (requester != null) {
1467 long randomDelay = rand.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
1468 LOG.info(getName() + " requesting flush for region " + r.getRegionNameAsString() +
1469 " after a delay of " + randomDelay);
1470
1471
1472
1473 requester.requestDelayedFlush(r, randomDelay);
1474 }
1475 }
1476 }
1477 }
1478 }
1479
1480
1481
1482
1483
1484
1485
1486
1487 public boolean isOnline() {
1488 return isOnline;
1489 }
1490
1491
1492
1493
1494
1495
1496
1497 private HLog setupWALAndReplication() throws IOException {
1498 final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1499 final String logName
1500 = HLogUtil.getHLogDirectoryName(this.serverNameFromMasterPOV.toString());
1501
1502 Path logdir = new Path(rootDir, logName);
1503 if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
1504 if (this.fs.exists(logdir)) {
1505 throw new RegionServerRunningException("Region server has already " +
1506 "created directory at " + this.serverNameFromMasterPOV.toString());
1507 }
1508
1509
1510
1511 createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
1512
1513 return instantiateHLog(rootDir, logName);
1514 }
1515
1516 private HLog getMetaWAL() throws IOException {
1517 if (this.hlogForMeta != null) return this.hlogForMeta;
1518 final String logName = HLogUtil.getHLogDirectoryName(this.serverNameFromMasterPOV.toString());
1519 Path logdir = new Path(rootDir, logName);
1520 if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
1521 this.hlogForMeta = HLogFactory.createMetaHLog(this.fs.getBackingFs(), rootDir, logName,
1522 this.conf, getMetaWALActionListeners(), this.serverNameFromMasterPOV.toString());
1523 return this.hlogForMeta;
1524 }
1525
1526
1527
1528
1529
1530
1531
1532
1533 protected HLog instantiateHLog(Path rootdir, String logName) throws IOException {
1534 return HLogFactory.createHLog(this.fs.getBackingFs(), rootdir, logName, this.conf,
1535 getWALActionListeners(), this.serverNameFromMasterPOV.toString());
1536 }
1537
1538
1539
1540
1541
1542
1543
1544 protected List<WALActionsListener> getWALActionListeners() {
1545 List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
1546
1547 this.hlogRoller = new LogRoller(this, this);
1548 listeners.add(this.hlogRoller);
1549 if (this.replicationSourceHandler != null &&
1550 this.replicationSourceHandler.getWALActionsListener() != null) {
1551
1552 listeners.add(this.replicationSourceHandler.getWALActionsListener());
1553 }
1554 return listeners;
1555 }
1556
1557 protected List<WALActionsListener> getMetaWALActionListeners() {
1558 List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
1559
1560
1561 MetaLogRoller tmpLogRoller = new MetaLogRoller(this, this);
1562 String n = Thread.currentThread().getName();
1563 Threads.setDaemonThreadRunning(tmpLogRoller.getThread(),
1564 n + "-MetaLogRoller", uncaughtExceptionHandler);
1565 this.metaHLogRoller = tmpLogRoller;
1566 tmpLogRoller = null;
1567 listeners.add(this.metaHLogRoller);
1568 return listeners;
1569 }
1570
1571 protected LogRoller getLogRoller() {
1572 return hlogRoller;
1573 }
1574
1575 public MetricsRegionServer getMetrics() {
1576 return this.metricsRegionServer;
1577 }
1578
1579
1580
1581
1582 public MasterAddressTracker getMasterAddressTracker() {
1583 return this.masterAddressTracker;
1584 }
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
/**
 * Starts the executor pools, the background threads (log roller, cache flusher, chores,
 * lease checker), the replication services, the RPC server and finally the split log
 * worker. Thread names are derived from the name of the calling thread.
 *
 * @throws IOException declared by this method
 */
private void startServiceThreads() throws IOException {
  // Prefix used for the names of the threads started below.
  String n = Thread.currentThread().getName();
  // Executor pools; each pool size is configurable with the default shown.
  this.service = new ExecutorService(getServerName().toShortString());
  this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
      conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
  this.service.startExecutorService(ExecutorType.RS_OPEN_META,
      conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
  this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
      conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
  this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
      conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
  // The parallel-seek pool is only started when the feature is enabled (off by default).
  if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
    this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
        conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
  }
  this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS,
      conf.getInt("hbase.regionserver.wal.max.splitters", SplitLogWorker.DEFAULT_MAX_SPLITTERS));

  // Background daemon threads, all sharing the same uncaught-exception handler.
  Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller",
      uncaughtExceptionHandler);
  this.cacheFlusher.start(uncaughtExceptionHandler);
  Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n +
      ".compactionChecker", uncaughtExceptionHandler);
  Threads.setDaemonThreadRunning(this.periodicFlusher.getThread(), n +
      ".periodicFlusher", uncaughtExceptionHandler);
  // The health check and nonce cleaner chores are optional.
  if (this.healthCheckChore != null) {
    Threads.setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker",
        uncaughtExceptionHandler);
  }
  if (this.nonceManagerChore != null) {
    Threads.setDaemonThreadRunning(this.nonceManagerChore.getThread(), n + ".nonceCleaner",
        uncaughtExceptionHandler);
  }

  // The lease checker is named and started directly rather than through Threads.
  this.leases.setName(n + ".leaseChecker");
  this.leases.start();

  // When source and sink are the same object, start the replication service only once.
  if (this.replicationSourceHandler == this.replicationSinkHandler &&
      this.replicationSourceHandler != null) {
    this.replicationSourceHandler.startReplicationService();
  } else {
    if (this.replicationSourceHandler != null) {
      this.replicationSourceHandler.startReplicationService();
    }
    if (this.replicationSinkHandler != null) {
      this.replicationSinkHandler.startReplicationService();
    }
  }

  // The RPC server is started after the threads and services above.
  this.rpcServer.start();

  // The split log worker gets its own copy of the configuration with dedicated retry
  // count, RPC timeout and server-side retry multiplier.
  Configuration sinkConf = HBaseConfiguration.create(conf);
  sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
  sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
      conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
  sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
  this.splitLogWorker = new SplitLogWorker(this.zooKeeper, sinkConf, this, this);
  splitLogWorker.start();
}
1667
1668
1669
1670
1671
1672
1673 private int putUpWebUI() throws IOException {
1674 int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT, 60030);
1675
1676 if (port < 0) return port;
1677 String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
1678
1679 boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO,
1680 false);
1681 while (true) {
1682 try {
1683 this.infoServer = new InfoServer("regionserver", addr, port, false, this.conf);
1684 this.infoServer.addServlet("status", "/rs-status", RSStatusServlet.class);
1685 this.infoServer.addServlet("dump", "/dump", RSDumpServlet.class);
1686 this.infoServer.setAttribute(REGIONSERVER, this);
1687 this.infoServer.setAttribute(REGIONSERVER_CONF, conf);
1688 this.infoServer.start();
1689 break;
1690 } catch (BindException e) {
1691 if (!auto) {
1692
1693 LOG.error("Failed binding http info server to port: " + port);
1694 throw e;
1695 }
1696
1697 LOG.info("Failed binding http info server to port: " + port);
1698 port++;
1699 }
1700 }
1701 return this.infoServer.getPort();
1702 }
1703
1704
1705
1706
1707 private boolean isHealthy() {
1708 if (!fsOk) {
1709
1710 return false;
1711 }
1712
1713 if (!(leases.isAlive()
1714 && cacheFlusher.isAlive() && hlogRoller.isAlive()
1715 && this.compactionChecker.isAlive()
1716 && this.periodicFlusher.isAlive())) {
1717 stop("One or more threads are no longer alive -- stop");
1718 return false;
1719 }
1720 if (metaHLogRoller != null && !metaHLogRoller.isAlive()) {
1721 stop("Meta HLog roller thread is no longer alive -- stop");
1722 return false;
1723 }
1724 return true;
1725 }
1726
1727 public HLog getWAL() {
1728 try {
1729 return getWAL(null);
1730 } catch (IOException e) {
1731 LOG.warn("getWAL threw exception " + e);
1732 return null;
1733 }
1734 }
1735
1736 @Override
1737 public HLog getWAL(HRegionInfo regionInfo) throws IOException {
1738
1739
1740
1741
1742 if (regionInfo != null && regionInfo.isMetaTable()) {
1743 return getMetaWAL();
1744 }
1745 return this.hlog;
1746 }
1747
1748 @Override
1749 public CatalogTracker getCatalogTracker() {
1750 return this.catalogTracker;
1751 }
1752
1753 @Override
1754 public void stop(final String msg) {
1755 if (!this.stopped) {
1756 try {
1757 if (this.rsHost != null) {
1758 this.rsHost.preStop(msg);
1759 }
1760 this.stopped = true;
1761 LOG.info("STOPPED: " + msg);
1762
1763 sleeper.skipSleepCycle();
1764 } catch (IOException exp) {
1765 LOG.warn("The region server did not stop", exp);
1766 }
1767 }
1768 }
1769
1770 public void waitForServerOnline(){
1771 while (!isOnline() && !isStopped()){
1772 sleeper.sleep();
1773 }
1774 }
1775
1776 @Override
1777 public void postOpenDeployTasks(final HRegion r, final CatalogTracker ct)
1778 throws KeeperException, IOException {
1779 checkOpen();
1780 LOG.info("Post open deploy tasks for region=" + r.getRegionNameAsString());
1781
1782 for (Store s : r.getStores().values()) {
1783 if (s.hasReferences() || s.needsCompaction()) {
1784 this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
1785 }
1786 }
1787 long openSeqNum = r.getOpenSeqNum();
1788 if (openSeqNum == HConstants.NO_SEQNUM) {
1789
1790 LOG.error("No sequence number found when opening " + r.getRegionNameAsString());
1791 openSeqNum = 0;
1792 }
1793
1794
1795 updateRecoveringRegionLastFlushedSequenceId(r);
1796
1797
1798 if (r.getRegionInfo().isMetaRegion()) {
1799 MetaRegionTracker.setMetaLocation(getZooKeeper(),
1800 this.serverNameFromMasterPOV);
1801 } else {
1802 MetaEditor.updateRegionLocation(ct, r.getRegionInfo(),
1803 this.serverNameFromMasterPOV, openSeqNum);
1804 }
1805 LOG.info("Finished post open deploy task for " + r.getRegionNameAsString());
1806
1807 }
1808
1809 @Override
1810 public RpcServerInterface getRpcServer() {
1811 return rpcServer;
1812 }
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
/**
 * Aborts this region server: logs the reason at fatal level, sets the abort flag, makes a
 * best-effort attempt to report the error to the master, and then requests a stop.
 *
 * @param reason why the server is aborting
 * @param cause the throwable that triggered the abort, or null
 */
@Override
public void abort(String reason, Throwable cause) {
  String msg = "ABORTING region server " + this + ": " + reason;
  if (cause != null) {
    LOG.fatal(msg, cause);
  } else {
    LOG.fatal(msg);
  }
  // The flag is set before the master is contacted and before stop() is called.
  this.abortRequested = true;
  // Record which coprocessors were loaded at the time of the abort.
  LOG.fatal("RegionServer abort: loaded coprocessors are: " +
      CoprocessorHost.getLoadedCoprocessors());
  // Best effort: any failure while reporting is caught below and only logged.
  try {
    if (cause != null) {
      msg += "\nCause:\n" + StringUtils.stringifyException(cause);
    }
    // Report only when there is a stub and the master-assigned server name is known.
    if (rssStub != null && this.serverNameFromMasterPOV != null) {
      ReportRSFatalErrorRequest.Builder builder =
          ReportRSFatalErrorRequest.newBuilder();
      ServerName sn =
          ServerName.parseVersionedServerName(this.serverNameFromMasterPOV.getVersionedBytes());
      builder.setServer(ProtobufUtil.toServerName(sn));
      builder.setErrorMessage(msg);
      rssStub.reportRSFatalError(null, builder.build());
    }
  } catch (Throwable t) {
    LOG.warn("Unable to report fatal error to master", t);
  }
  stop(reason);
}
1858
1859
1860
1861
1862 public void abort(String reason) {
1863 abort(reason, null);
1864 }
1865
1866 @Override
1867 public boolean isAborted() {
1868 return this.abortRequested;
1869 }
1870
1871
1872
1873
1874
1875
1876 protected void kill() {
1877 this.killed = true;
1878 abort("Simulated kill");
1879 }
1880
1881
1882
1883
1884
/**
 * Shuts down or joins the service threads and helpers of this region server. Every
 * component is null-checked, so components that were never created are skipped.
 */
protected void join() {
  if (this.nonceManagerChore != null) {
    Threads.shutdown(this.nonceManagerChore.getThread());
  }
  if (this.compactionChecker != null) {
    Threads.shutdown(this.compactionChecker.getThread());
  }
  if (this.periodicFlusher != null) {
    Threads.shutdown(this.periodicFlusher.getThread());
  }
  if (this.cacheFlusher != null) {
    this.cacheFlusher.join();
  }
  if (this.healthCheckChore != null) {
    Threads.shutdown(this.healthCheckChore.getThread());
  }
  if (this.spanReceiverHost != null) {
    this.spanReceiverHost.closeReceivers();
  }
  if (this.hlogRoller != null) {
    Threads.shutdown(this.hlogRoller.getThread());
  }
  if (this.metaHLogRoller != null) {
    Threads.shutdown(this.metaHLogRoller.getThread());
  }
  if (this.compactSplitThread != null) {
    this.compactSplitThread.join();
  }
  if (this.service != null) this.service.shutdown();
  // When source and sink are the same object, stop the replication service only once.
  if (this.replicationSourceHandler != null &&
      this.replicationSourceHandler == this.replicationSinkHandler) {
    this.replicationSourceHandler.stopReplicationService();
  } else {
    if (this.replicationSourceHandler != null) {
      this.replicationSourceHandler.stopReplicationService();
    }
    if (this.replicationSinkHandler != null) {
      this.replicationSinkHandler.stopReplicationService();
    }
  }
}
1926
1927
1928
1929
1930
1931 ReplicationSourceService getReplicationSourceService() {
1932 return replicationSourceHandler;
1933 }
1934
1935
1936
1937
1938
1939 ReplicationSinkService getReplicationSinkService() {
1940 return replicationSinkHandler;
1941 }
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951 private Pair<ServerName, RegionServerStatusService.BlockingInterface>
1952 createRegionServerStatusStub() {
1953 ServerName sn = null;
1954 long previousLogTime = 0;
1955 RegionServerStatusService.BlockingInterface master = null;
1956 boolean refresh = false;
1957 RegionServerStatusService.BlockingInterface intf = null;
1958 while (keepLooping() && master == null) {
1959 sn = this.masterAddressTracker.getMasterAddress(refresh);
1960 if (sn == null) {
1961 if (!keepLooping()) {
1962
1963 LOG.debug("No master found and cluster is stopped; bailing out");
1964 return null;
1965 }
1966 LOG.debug("No master found; retry");
1967 previousLogTime = System.currentTimeMillis();
1968 refresh = true;
1969 sleeper.sleep();
1970 continue;
1971 }
1972
1973 new InetSocketAddress(sn.getHostname(), sn.getPort());
1974 try {
1975 BlockingRpcChannel channel =
1976 this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(), this.rpcTimeout);
1977 intf = RegionServerStatusService.newBlockingStub(channel);
1978 break;
1979 } catch (IOException e) {
1980 e = e instanceof RemoteException ?
1981 ((RemoteException)e).unwrapRemoteException() : e;
1982 if (e instanceof ServerNotRunningYetException) {
1983 if (System.currentTimeMillis() > (previousLogTime+1000)){
1984 LOG.info("Master isn't available yet, retrying");
1985 previousLogTime = System.currentTimeMillis();
1986 }
1987 } else {
1988 if (System.currentTimeMillis() > (previousLogTime + 1000)) {
1989 LOG.warn("Unable to connect to master. Retrying. Error was:", e);
1990 previousLogTime = System.currentTimeMillis();
1991 }
1992 }
1993 try {
1994 Thread.sleep(200);
1995 } catch (InterruptedException ignored) {
1996 }
1997 }
1998 }
1999 return new Pair<ServerName, RegionServerStatusService.BlockingInterface>(sn, intf);
2000 }
2001
2002
2003
2004
2005
2006 private boolean keepLooping() {
2007 return !this.stopped && isClusterUp();
2008 }
2009
2010
2011
2012
2013
2014
2015
2016
2017 private RegionServerStartupResponse reportForDuty() throws IOException {
2018 RegionServerStartupResponse result = null;
2019 Pair<ServerName, RegionServerStatusService.BlockingInterface> p =
2020 createRegionServerStatusStub();
2021 this.rssStub = p.getSecond();
2022 ServerName masterServerName = p.getFirst();
2023 if (masterServerName == null) return result;
2024 try {
2025 this.requestCount.set(0);
2026 LOG.info("reportForDuty to master=" + masterServerName + " with port=" + this.isa.getPort() +
2027 ", startcode=" + this.startcode);
2028 long now = EnvironmentEdgeManager.currentTimeMillis();
2029 int port = this.isa.getPort();
2030 RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2031 request.setPort(port);
2032 request.setServerStartCode(this.startcode);
2033 request.setServerCurrentTime(now);
2034 result = this.rssStub.regionServerStartup(null, request.build());
2035 } catch (ServiceException se) {
2036 IOException ioe = ProtobufUtil.getRemoteException(se);
2037 if (ioe instanceof ClockOutOfSyncException) {
2038 LOG.fatal("Master rejected startup because clock is out of sync", ioe);
2039
2040 throw ioe;
2041 } else if (ioe instanceof ServerNotRunningYetException) {
2042 LOG.debug("Master is not running yet");
2043 } else {
2044 LOG.warn("error telling master we are up", se);
2045 }
2046 }
2047 return result;
2048 }
2049
2050 @Override
2051 public long getLastSequenceId(byte[] region) {
2052 Long lastFlushedSequenceId = -1l;
2053 try {
2054 GetLastFlushedSequenceIdRequest req = RequestConverter
2055 .buildGetLastFlushedSequenceIdRequest(region);
2056 lastFlushedSequenceId = rssStub.getLastFlushedSequenceId(null, req)
2057 .getLastFlushedSequenceId();
2058 } catch (ServiceException e) {
2059 lastFlushedSequenceId = -1l;
2060 LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id", e);
2061 }
2062 return lastFlushedSequenceId;
2063 }
2064
2065
2066
2067
2068
2069
2070 protected void closeAllRegions(final boolean abort) {
2071 closeUserRegions(abort);
2072 closeMetaTableRegions(abort);
2073 }
2074
2075
2076
2077
2078
2079 void closeMetaTableRegions(final boolean abort) {
2080 HRegion meta = null;
2081 this.lock.writeLock().lock();
2082 try {
2083 for (Map.Entry<String, HRegion> e: onlineRegions.entrySet()) {
2084 HRegionInfo hri = e.getValue().getRegionInfo();
2085 if (hri.isMetaRegion()) {
2086 meta = e.getValue();
2087 }
2088 if (meta != null) break;
2089 }
2090 } finally {
2091 this.lock.writeLock().unlock();
2092 }
2093 if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2094 }
2095
2096
2097
2098
2099
2100
2101
2102 void closeUserRegions(final boolean abort) {
2103 this.lock.writeLock().lock();
2104 try {
2105 for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
2106 HRegion r = e.getValue();
2107 if (!r.getRegionInfo().isMetaTable() && r.isAvailable()) {
2108
2109 closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2110 }
2111 }
2112 } finally {
2113 this.lock.writeLock().unlock();
2114 }
2115 }
2116
2117
2118 public InfoServer getInfoServer() {
2119 return infoServer;
2120 }
2121
2122
2123
2124
2125 @Override
2126 public boolean isStopped() {
2127 return this.stopped;
2128 }
2129
2130 @Override
2131 public boolean isStopping() {
2132 return this.stopping;
2133 }
2134
2135 @Override
2136 public Map<String, HRegion> getRecoveringRegions() {
2137 return this.recoveringRegions;
2138 }
2139
2140
2141
2142
2143
2144 @Override
2145 public Configuration getConfiguration() {
2146 return conf;
2147 }
2148
2149
2150 ReentrantReadWriteLock.WriteLock getWriteLock() {
2151 return lock.writeLock();
2152 }
2153
2154 public int getNumberOfOnlineRegions() {
2155 return this.onlineRegions.size();
2156 }
2157
2158 boolean isOnlineRegionsEmpty() {
2159 return this.onlineRegions.isEmpty();
2160 }
2161
2162
2163
2164
2165
2166
2167 public Collection<HRegion> getOnlineRegionsLocalContext() {
2168 Collection<HRegion> regions = this.onlineRegions.values();
2169 return Collections.unmodifiableCollection(regions);
2170 }
2171
2172 @Override
2173 public void addToOnlineRegions(HRegion region) {
2174 this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2175 }
2176
2177
2178
2179
2180
2181
2182 SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedBySize() {
2183
2184 SortedMap<Long, HRegion> sortedRegions = new TreeMap<Long, HRegion>(
2185 new Comparator<Long>() {
2186 @Override
2187 public int compare(Long a, Long b) {
2188 return -1 * a.compareTo(b);
2189 }
2190 });
2191
2192 for (HRegion region : this.onlineRegions.values()) {
2193 sortedRegions.put(region.memstoreSize.get(), region);
2194 }
2195 return sortedRegions;
2196 }
2197
2198
2199
2200
2201 public long getStartcode() {
2202 return this.startcode;
2203 }
2204
2205
2206 @Override
2207 public FlushRequester getFlushRequester() {
2208 return this.cacheFlusher;
2209 }
2210
2211
2212
2213
2214
2215
2216
2217 protected HRegionInfo[] getMostLoadedRegions() {
2218 ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2219 for (HRegion r : onlineRegions.values()) {
2220 if (!r.isAvailable()) {
2221 continue;
2222 }
2223 if (regions.size() < numRegionsToReport) {
2224 regions.add(r.getRegionInfo());
2225 } else {
2226 break;
2227 }
2228 }
2229 return regions.toArray(new HRegionInfo[regions.size()]);
2230 }
2231
2232 @Override
2233 public Leases getLeases() {
2234 return leases;
2235 }
2236
2237
2238
2239
2240 protected Path getRootDir() {
2241 return rootDir;
2242 }
2243
2244
2245
2246
2247 @Override
2248 public FileSystem getFileSystem() {
2249 return fs;
2250 }
2251
2252 @Override
2253 public String toString() {
2254 return getServerName().toString();
2255 }
2256
2257
2258
2259
2260
2261
2262 public int getThreadWakeFrequency() {
2263 return threadWakeFrequency;
2264 }
2265
2266 @Override
2267 public ZooKeeperWatcher getZooKeeper() {
2268 return zooKeeper;
2269 }
2270
2271 @Override
2272 public ServerName getServerName() {
2273
2274 return this.serverNameFromMasterPOV == null?
2275 ServerName.valueOf(this.isa.getHostName(), this.isa.getPort(), this.startcode) :
2276 this.serverNameFromMasterPOV;
2277 }
2278
2279 @Override
2280 public CompactionRequestor getCompactionRequester() {
2281 return this.compactSplitThread;
2282 }
2283
2284 public ZooKeeperWatcher getZooKeeperWatcher() {
2285 return this.zooKeeper;
2286 }
2287
2288 public RegionServerCoprocessorHost getCoprocessorHost(){
2289 return this.rsHost;
2290 }
2291
2292 @Override
2293 public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2294 return this.regionsInTransitionInRS;
2295 }
2296
2297 @Override
2298 public ExecutorService getExecutorService() {
2299 return service;
2300 }
2301
2302
2303
2304
2305
2306
2307
2308
2309 static private void createNewReplicationInstance(Configuration conf,
2310 HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{
2311
2312
2313 if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
2314 HConstants.REPLICATION_ENABLE_DEFAULT)) {
2315 return;
2316 }
2317
2318
2319 String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2320 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2321
2322
2323 String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2324 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2325
2326
2327
2328 if (sourceClassname.equals(sinkClassname)) {
2329 server.replicationSourceHandler = (ReplicationSourceService)
2330 newReplicationInstance(sourceClassname,
2331 conf, server, fs, logDir, oldLogDir);
2332 server.replicationSinkHandler = (ReplicationSinkService)
2333 server.replicationSourceHandler;
2334 } else {
2335 server.replicationSourceHandler = (ReplicationSourceService)
2336 newReplicationInstance(sourceClassname,
2337 conf, server, fs, logDir, oldLogDir);
2338 server.replicationSinkHandler = (ReplicationSinkService)
2339 newReplicationInstance(sinkClassname,
2340 conf, server, fs, logDir, oldLogDir);
2341 }
2342 }
2343
2344 static private ReplicationService newReplicationInstance(String classname,
2345 Configuration conf, HRegionServer server, FileSystem fs, Path logDir,
2346 Path oldLogDir) throws IOException{
2347
2348 Class<?> clazz = null;
2349 try {
2350 ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2351 clazz = Class.forName(classname, true, classLoader);
2352 } catch (java.lang.ClassNotFoundException nfe) {
2353 throw new IOException("Could not find class for " + classname);
2354 }
2355
2356
2357 ReplicationService service = (ReplicationService)
2358 ReflectionUtils.newInstance(clazz, conf);
2359 service.initialize(server, fs, logDir, oldLogDir);
2360 return service;
2361 }
2362
2363
2364
2365
2366
2367
2368 public static Thread startRegionServer(final HRegionServer hrs)
2369 throws IOException {
2370 return startRegionServer(hrs, "regionserver" + hrs.isa.getPort());
2371 }
2372
2373
2374
2375
2376
2377
2378
2379 public static Thread startRegionServer(final HRegionServer hrs,
2380 final String name) throws IOException {
2381 Thread t = new Thread(hrs);
2382 t.setName(name);
2383 t.start();
2384
2385
2386 ShutdownHook.install(hrs.getConfiguration(), FileSystem.get(hrs
2387 .getConfiguration()), hrs, t);
2388 return t;
2389 }
2390
2391
2392
2393
2394
2395
2396
2397
2398 public static HRegionServer constructRegionServer(
2399 Class<? extends HRegionServer> regionServerClass,
2400 final Configuration conf2) {
2401 try {
2402 Constructor<? extends HRegionServer> c = regionServerClass
2403 .getConstructor(Configuration.class);
2404 return c.newInstance(conf2);
2405 } catch (Exception e) {
2406 throw new RuntimeException("Failed construction of " + "Regionserver: "
2407 + regionServerClass.toString(), e);
2408 }
2409 }
2410
2411
2412
2413
2414 public static void main(String[] args) throws Exception {
2415 VersionInfo.logVersion();
2416 Configuration conf = HBaseConfiguration.create();
2417 @SuppressWarnings("unchecked")
2418 Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2419 .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2420
2421 new HRegionServerCommandLine(regionServerClass).doMain(args);
2422 }
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
2434 @Override
2435 public List<HRegion> getOnlineRegions(TableName tableName) {
2436 List<HRegion> tableRegions = new ArrayList<HRegion>();
2437 synchronized (this.onlineRegions) {
2438 for (HRegion region: this.onlineRegions.values()) {
2439 HRegionInfo regionInfo = region.getRegionInfo();
2440 if(regionInfo.getTable().equals(tableName)) {
2441 tableRegions.add(region);
2442 }
2443 }
2444 }
2445 return tableRegions;
2446 }
2447
2448
2449 public String[] getCoprocessors() {
2450 TreeSet<String> coprocessors = new TreeSet<String>(
2451 this.hlog.getCoprocessorHost().getCoprocessors());
2452 Collection<HRegion> regions = getOnlineRegionsLocalContext();
2453 for (HRegion region: regions) {
2454 coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
2455 }
2456 return coprocessors.toArray(new String[coprocessors.size()]);
2457 }
2458
2459
2460
2461
2462
2463 private class ScannerListener implements LeaseListener {
2464 private final String scannerName;
2465
2466 ScannerListener(final String n) {
2467 this.scannerName = n;
2468 }
2469
2470 @Override
2471 public void leaseExpired() {
2472 RegionScannerHolder rsh = scanners.remove(this.scannerName);
2473 if (rsh != null) {
2474 RegionScanner s = rsh.s;
2475 LOG.info("Scanner " + this.scannerName + " lease expired on region "
2476 + s.getRegionInfo().getRegionNameAsString());
2477 try {
2478 HRegion region = getRegion(s.getRegionInfo().getRegionName());
2479 if (region != null && region.getCoprocessorHost() != null) {
2480 region.getCoprocessorHost().preScannerClose(s);
2481 }
2482
2483 s.close();
2484 if (region != null && region.getCoprocessorHost() != null) {
2485 region.getCoprocessorHost().postScannerClose(s);
2486 }
2487 } catch (IOException e) {
2488 LOG.error("Closing scanner for "
2489 + s.getRegionInfo().getRegionNameAsString(), e);
2490 }
2491 } else {
2492 LOG.info("Scanner " + this.scannerName + " lease expired");
2493 }
2494 }
2495 }
2496
2497
2498
2499
2500
2501
2502 protected void checkOpen() throws IOException {
2503 if (this.stopped || this.abortRequested) {
2504 throw new RegionServerStoppedException("Server " + getServerName() +
2505 " not running" + (this.abortRequested ? ", aborting" : ""));
2506 }
2507 if (!fsOk) {
2508 throw new RegionServerStoppedException("File system not available");
2509 }
2510 }
2511
2512
2513
2514
2515
2516
2517 private void closeRegionIgnoreErrors(HRegionInfo region, final boolean abort) {
2518 try {
2519 if (!closeRegion(region.getEncodedName(), abort, false, -1, null)) {
2520 LOG.warn("Failed to close " + region.getRegionNameAsString() +
2521 " - ignoring and continuing");
2522 }
2523 } catch (IOException e) {
2524 LOG.warn("Failed to close " + region.getRegionNameAsString() +
2525 " - ignoring and continuing", e);
2526 }
2527 }
2528
2529
2530
2531
2532
2533
2534
2535
2536
2537
2538
2539
2540
2541
2542
2543
2544
2545
2546
2547
2548
2549
2550
2551
2552
2553 protected boolean closeRegion(String encodedName, final boolean abort,
2554 final boolean zk, final int versionOfClosingNode, final ServerName sn)
2555 throws NotServingRegionException, RegionAlreadyInTransitionException {
2556
2557 HRegion actualRegion = this.getFromOnlineRegions(encodedName);
2558 if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
2559 try {
2560 actualRegion.getCoprocessorHost().preClose(false);
2561 } catch (IOException exp) {
2562 LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
2563 return false;
2564 }
2565 }
2566
2567 final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(encodedName.getBytes(),
2568 Boolean.FALSE);
2569
2570 if (Boolean.TRUE.equals(previous)) {
2571 LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already " +
2572 "trying to OPEN. Cancelling OPENING.");
2573 if (!regionsInTransitionInRS.replace(encodedName.getBytes(), previous, Boolean.FALSE)){
2574
2575
2576 LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
2577 " Doing a standard close now");
2578 return closeRegion(encodedName, abort, zk, versionOfClosingNode, sn);
2579 }
2580
2581 actualRegion = this.getFromOnlineRegions(encodedName);
2582 if (actualRegion == null) {
2583 LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
2584
2585 throw new NotServingRegionException("The region " + encodedName +
2586 " was opening but not yet served. Opening is cancelled.");
2587 }
2588 } else if (Boolean.FALSE.equals(previous)) {
2589 LOG.info("Received CLOSE for the region: " + encodedName +
2590 " ,which we are already trying to CLOSE, but not completed yet");
2591
2592
2593
2594
2595
2596
2597 throw new RegionAlreadyInTransitionException("The region " + encodedName +
2598 " was already closing. New CLOSE request is ignored.");
2599 }
2600
2601 if (actualRegion == null) {
2602 LOG.error("Received CLOSE for a region which is not online, and we're not opening.");
2603 this.regionsInTransitionInRS.remove(encodedName.getBytes());
2604
2605 throw new NotServingRegionException("The region " + encodedName +
2606 " is not online, and is not opening.");
2607 }
2608
2609 CloseRegionHandler crh;
2610 final HRegionInfo hri = actualRegion.getRegionInfo();
2611 if (hri.isMetaRegion()) {
2612 crh = new CloseMetaHandler(this, this, hri, abort, zk, versionOfClosingNode);
2613 } else {
2614 crh = new CloseRegionHandler(this, this, hri, abort, zk, versionOfClosingNode, sn);
2615 }
2616 this.service.submit(crh);
2617 return true;
2618 }
2619
2620
2621
2622
2623
2624
2625 public HRegion getOnlineRegion(final byte[] regionName) {
2626 String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2627 return this.onlineRegions.get(encodedRegionName);
2628 }
2629
2630 public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) {
2631 return this.regionFavoredNodesMap.get(encodedRegionName);
2632 }
2633
2634 @Override
2635 public HRegion getFromOnlineRegions(final String encodedRegionName) {
2636 return this.onlineRegions.get(encodedRegionName);
2637 }
2638
2639
2640 @Override
2641 public boolean removeFromOnlineRegions(final HRegion r, ServerName destination) {
2642 HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
2643
2644 if (destination != null) {
2645 HLog wal = getWAL();
2646 long closeSeqNum = wal.getEarliestMemstoreSeqNum(r.getRegionInfo().getEncodedNameAsBytes());
2647 if (closeSeqNum == HConstants.NO_SEQNUM) {
2648
2649 closeSeqNum = r.getOpenSeqNum();
2650 if (closeSeqNum == HConstants.NO_SEQNUM) {
2651 closeSeqNum = 0;
2652 }
2653 }
2654 addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
2655 }
2656 this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
2657 return toReturn != null;
2658 }
2659
2660
2661
2662
2663
2664
2665
2666
2667
2668 protected HRegion getRegion(final byte[] regionName)
2669 throws NotServingRegionException {
2670 String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
2671 return getRegionByEncodedName(regionName, encodedRegionName);
2672 }
2673
2674 protected HRegion getRegionByEncodedName(String encodedRegionName)
2675 throws NotServingRegionException {
2676 return getRegionByEncodedName(null, encodedRegionName);
2677 }
2678
2679 protected HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
2680 throws NotServingRegionException {
2681 HRegion region = this.onlineRegions.get(encodedRegionName);
2682 if (region == null) {
2683 MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
2684 if (moveInfo != null) {
2685 throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
2686 }
2687 Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
2688 String regionNameStr = regionName == null?
2689 encodedRegionName: Bytes.toStringBinary(regionName);
2690 if (isOpening != null && isOpening.booleanValue()) {
2691 throw new RegionOpeningException("Region " + regionNameStr +
2692 " is opening on " + this.serverNameFromMasterPOV);
2693 }
2694 throw new NotServingRegionException("Region " + regionNameStr +
2695 " is not online on " + this.serverNameFromMasterPOV);
2696 }
2697 return region;
2698 }
2699
2700
2701
2702
2703
2704
2705
2706
2707
2708 protected Throwable cleanup(final Throwable t) {
2709 return cleanup(t, null);
2710 }
2711
2712
2713
2714
2715
2716
2717
2718
2719
2720
2721
2722 protected Throwable cleanup(final Throwable t, final String msg) {
2723
2724 if (t instanceof NotServingRegionException) {
2725 LOG.debug("NotServingRegionException; " + t.getMessage());
2726 return t;
2727 }
2728 if (msg == null) {
2729 LOG.error("", RemoteExceptionHandler.checkThrowable(t));
2730 } else {
2731 LOG.error(msg, RemoteExceptionHandler.checkThrowable(t));
2732 }
2733 if (!checkOOME(t)) {
2734 checkFileSystem();
2735 }
2736 return t;
2737 }
2738
2739
2740
2741
2742
2743
2744
2745
2746 protected IOException convertThrowableToIOE(final Throwable t, final String msg) {
2747 return (t instanceof IOException ? (IOException) t : msg == null
2748 || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
2749 }
2750
2751
2752
2753
2754
2755
2756
2757
2758 @Override
2759 public boolean checkOOME(final Throwable e) {
2760 boolean stop = false;
2761 try {
2762 if (e instanceof OutOfMemoryError
2763 || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
2764 || (e.getMessage() != null && e.getMessage().contains(
2765 "java.lang.OutOfMemoryError"))) {
2766 stop = true;
2767 LOG.fatal(
2768 "Run out of memory; HRegionServer will abort itself immediately", e);
2769 }
2770 } finally {
2771 if (stop) {
2772 Runtime.getRuntime().halt(1);
2773 }
2774 }
2775 return stop;
2776 }
2777
2778
2779
2780
2781
2782
2783
2784 public boolean checkFileSystem() {
2785 if (this.fsOk && this.fs != null) {
2786 try {
2787 FSUtils.checkFileSystemAvailable(this.fs);
2788 } catch (IOException e) {
2789 abort("File System not available", e);
2790 this.fsOk = false;
2791 }
2792 }
2793 return this.fsOk;
2794 }
2795
2796 protected long addScanner(RegionScanner s, HRegion r) throws LeaseStillHeldException {
2797 long scannerId = this.scannerIdGen.incrementAndGet();
2798 String scannerName = String.valueOf(scannerId);
2799
2800 RegionScannerHolder existing =
2801 scanners.putIfAbsent(scannerName, new RegionScannerHolder(s, r));
2802 assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
2803
2804 this.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
2805 new ScannerListener(scannerName));
2806
2807 return scannerId;
2808 }
2809
2810
2811
2812
2813
2814
2815
2816
2817
2818
2819 @Override
2820 public GetResponse get(final RpcController controller,
2821 final GetRequest request) throws ServiceException {
2822 long before = EnvironmentEdgeManager.currentTimeMillis();
2823 try {
2824 checkOpen();
2825 requestCount.increment();
2826 HRegion region = getRegion(request.getRegion());
2827
2828 GetResponse.Builder builder = GetResponse.newBuilder();
2829 ClientProtos.Get get = request.getGet();
2830 Boolean existence = null;
2831 Result r = null;
2832
2833 if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
2834 if (get.getColumnCount() != 1) {
2835 throw new DoNotRetryIOException(
2836 "get ClosestRowBefore supports one and only one family now, not "
2837 + get.getColumnCount() + " families");
2838 }
2839 byte[] row = get.getRow().toByteArray();
2840 byte[] family = get.getColumn(0).getFamily().toByteArray();
2841 r = region.getClosestRowBefore(row, family);
2842 } else {
2843 Get clientGet = ProtobufUtil.toGet(get);
2844 if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
2845 existence = region.getCoprocessorHost().preExists(clientGet);
2846 }
2847 if (existence == null) {
2848 r = region.get(clientGet);
2849 if (get.getExistenceOnly()) {
2850 boolean exists = r.getExists();
2851 if (region.getCoprocessorHost() != null) {
2852 exists = region.getCoprocessorHost().postExists(clientGet, exists);
2853 }
2854 existence = exists;
2855 }
2856 }
2857 }
2858 if (existence != null){
2859 ClientProtos.Result pbr = ProtobufUtil.toResult(existence);
2860 builder.setResult(pbr);
2861 } else if (r != null) {
2862 ClientProtos.Result pbr = ProtobufUtil.toResult(r);
2863 builder.setResult(pbr);
2864 }
2865 return builder.build();
2866 } catch (IOException ie) {
2867 throw new ServiceException(ie);
2868 } finally {
2869 metricsRegionServer.updateGet(EnvironmentEdgeManager.currentTimeMillis() - before);
2870 }
2871 }
2872
2873
2874
2875
2876
2877
2878
2879
2880
2881 @Override
2882 public MutateResponse mutate(final RpcController rpcc,
2883 final MutateRequest request) throws ServiceException {
2884
2885
2886 PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
2887 CellScanner cellScanner = controller != null? controller.cellScanner(): null;
2888
2889 if (controller != null) controller.setCellScanner(null);
2890 try {
2891 checkOpen();
2892 requestCount.increment();
2893 HRegion region = getRegion(request.getRegion());
2894 MutateResponse.Builder builder = MutateResponse.newBuilder();
2895 MutationProto mutation = request.getMutation();
2896 if (!region.getRegionInfo().isMetaTable()) {
2897 cacheFlusher.reclaimMemStoreMemory();
2898 }
2899 long nonceGroup = request.hasNonceGroup()
2900 ? request.getNonceGroup() : HConstants.NO_NONCE;
2901 Result r = null;
2902 Boolean processed = null;
2903 MutationType type = mutation.getMutateType();
2904 switch (type) {
2905 case APPEND:
2906
2907 r = append(region, mutation, cellScanner, nonceGroup);
2908 break;
2909 case INCREMENT:
2910
2911 r = increment(region, mutation, cellScanner, nonceGroup);
2912 break;
2913 case PUT:
2914 Put put = ProtobufUtil.toPut(mutation, cellScanner);
2915 if (request.hasCondition()) {
2916 Condition condition = request.getCondition();
2917 byte[] row = condition.getRow().toByteArray();
2918 byte[] family = condition.getFamily().toByteArray();
2919 byte[] qualifier = condition.getQualifier().toByteArray();
2920 CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2921 ByteArrayComparable comparator =
2922 ProtobufUtil.toComparator(condition.getComparator());
2923 if (region.getCoprocessorHost() != null) {
2924 processed = region.getCoprocessorHost().preCheckAndPut(
2925 row, family, qualifier, compareOp, comparator, put);
2926 }
2927 if (processed == null) {
2928 boolean result = region.checkAndMutate(row, family,
2929 qualifier, compareOp, comparator, put, true);
2930 if (region.getCoprocessorHost() != null) {
2931 result = region.getCoprocessorHost().postCheckAndPut(row, family,
2932 qualifier, compareOp, comparator, put, result);
2933 }
2934 processed = result;
2935 }
2936 } else {
2937 region.put(put);
2938 processed = Boolean.TRUE;
2939 }
2940 break;
2941 case DELETE:
2942 Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
2943 if (request.hasCondition()) {
2944 Condition condition = request.getCondition();
2945 byte[] row = condition.getRow().toByteArray();
2946 byte[] family = condition.getFamily().toByteArray();
2947 byte[] qualifier = condition.getQualifier().toByteArray();
2948 CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2949 ByteArrayComparable comparator =
2950 ProtobufUtil.toComparator(condition.getComparator());
2951 if (region.getCoprocessorHost() != null) {
2952 processed = region.getCoprocessorHost().preCheckAndDelete(
2953 row, family, qualifier, compareOp, comparator, delete);
2954 }
2955 if (processed == null) {
2956 boolean result = region.checkAndMutate(row, family,
2957 qualifier, compareOp, comparator, delete, true);
2958 if (region.getCoprocessorHost() != null) {
2959 result = region.getCoprocessorHost().postCheckAndDelete(row, family,
2960 qualifier, compareOp, comparator, delete, result);
2961 }
2962 processed = result;
2963 }
2964 } else {
2965 region.delete(delete);
2966 processed = Boolean.TRUE;
2967 }
2968 break;
2969 default:
2970 throw new DoNotRetryIOException(
2971 "Unsupported mutate type: " + type.name());
2972 }
2973 if (processed != null) builder.setProcessed(processed.booleanValue());
2974 addResult(builder, r, controller);
2975 return builder.build();
2976 } catch (IOException ie) {
2977 checkFileSystem();
2978 throw new ServiceException(ie);
2979 }
2980 }
2981
2982
2983
2984
2985
2986 private boolean isClientCellBlockSupport() {
2987 RpcCallContext context = RpcServer.getCurrentCall();
2988 return context != null && context.isClientCellBlockSupport();
2989 }
2990
2991 private void addResult(final MutateResponse.Builder builder,
2992 final Result result, final PayloadCarryingRpcController rpcc) {
2993 if (result == null) return;
2994 if (isClientCellBlockSupport()) {
2995 builder.setResult(ProtobufUtil.toResultNoData(result));
2996 rpcc.setCellScanner(result.cellScanner());
2997 } else {
2998 ClientProtos.Result pbr = ProtobufUtil.toResult(result);
2999 builder.setResult(pbr);
3000 }
3001 }
3002
3003
3004
3005
3006
3007
3008
3009
3010
3011
3012
3013
3014 @Override
3015 public ScanResponse scan(final RpcController controller, final ScanRequest request)
3016 throws ServiceException {
3017 Leases.Lease lease = null;
3018 String scannerName = null;
3019 try {
3020 if (!request.hasScannerId() && !request.hasScan()) {
3021 throw new DoNotRetryIOException(
3022 "Missing required input: scannerId or scan");
3023 }
3024 long scannerId = -1;
3025 if (request.hasScannerId()) {
3026 scannerId = request.getScannerId();
3027 scannerName = String.valueOf(scannerId);
3028 }
3029 try {
3030 checkOpen();
3031 } catch (IOException e) {
3032
3033
3034 if (scannerName != null) {
3035 try {
3036 leases.cancelLease(scannerName);
3037 } catch (LeaseException le) {
3038 LOG.info("Server shutting down and client tried to access missing scanner " +
3039 scannerName);
3040 }
3041 }
3042 throw e;
3043 }
3044 requestCount.increment();
3045
3046 int ttl = 0;
3047 HRegion region = null;
3048 RegionScanner scanner = null;
3049 RegionScannerHolder rsh = null;
3050 boolean moreResults = true;
3051 boolean closeScanner = false;
3052 ScanResponse.Builder builder = ScanResponse.newBuilder();
3053 if (request.hasCloseScanner()) {
3054 closeScanner = request.getCloseScanner();
3055 }
3056 int rows = closeScanner ? 0 : 1;
3057 if (request.hasNumberOfRows()) {
3058 rows = request.getNumberOfRows();
3059 }
3060 if (request.hasScannerId()) {
3061 rsh = scanners.get(scannerName);
3062 if (rsh == null) {
3063 LOG.info("Client tried to access missing scanner " + scannerName);
3064 throw new UnknownScannerException(
3065 "Name: " + scannerName + ", already closed?");
3066 }
3067 scanner = rsh.s;
3068 HRegionInfo hri = scanner.getRegionInfo();
3069 region = getRegion(hri.getRegionName());
3070 if (region != rsh.r) {
3071 throw new NotServingRegionException("Region was re-opened after the scanner"
3072 + scannerName + " was created: " + hri.getRegionNameAsString());
3073 }
3074 } else {
3075 region = getRegion(request.getRegion());
3076 ClientProtos.Scan protoScan = request.getScan();
3077 boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
3078 Scan scan = ProtobufUtil.toScan(protoScan);
3079
3080 if (!isLoadingCfsOnDemandSet) {
3081 scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
3082 }
3083 scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
3084 region.prepareScanner(scan);
3085 if (region.getCoprocessorHost() != null) {
3086 scanner = region.getCoprocessorHost().preScannerOpen(scan);
3087 }
3088 if (scanner == null) {
3089 scanner = region.getScanner(scan);
3090 }
3091 if (region.getCoprocessorHost() != null) {
3092 scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
3093 }
3094 scannerId = addScanner(scanner, region);
3095 scannerName = String.valueOf(scannerId);
3096 ttl = this.scannerLeaseTimeoutPeriod;
3097 }
3098
3099 if (rows > 0) {
3100
3101
3102
3103 if (request.hasNextCallSeq()) {
3104 if (rsh == null) {
3105 rsh = scanners.get(scannerName);
3106 }
3107 if (rsh != null) {
3108 if (request.getNextCallSeq() != rsh.nextCallSeq) {
3109 throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq
3110 + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
3111 "; request=" + TextFormat.shortDebugString(request));
3112 }
3113
3114 rsh.nextCallSeq++;
3115 }
3116 }
3117 try {
3118
3119
3120 lease = leases.removeLease(scannerName);
3121 List<Result> results = new ArrayList<Result>(rows);
3122 long currentScanResultSize = 0;
3123
3124 boolean done = false;
3125
3126 if (region != null && region.getCoprocessorHost() != null) {
3127 Boolean bypass = region.getCoprocessorHost().preScannerNext(
3128 scanner, results, rows);
3129 if (!results.isEmpty()) {
3130 for (Result r : results) {
3131 if (maxScannerResultSize < Long.MAX_VALUE){
3132 for (Cell kv : r.rawCells()) {
3133
3134 currentScanResultSize += KeyValueUtil.ensureKeyValue(kv).heapSize();
3135 }
3136 }
3137 }
3138 }
3139 if (bypass != null && bypass.booleanValue()) {
3140 done = true;
3141 }
3142 }
3143
3144 if (!done) {
3145 long maxResultSize = scanner.getMaxResultSize();
3146 if (maxResultSize <= 0) {
3147 maxResultSize = maxScannerResultSize;
3148 }
3149 List<Cell> values = new ArrayList<Cell>();
3150 region.startRegionOperation(Operation.SCAN);
3151 try {
3152 int i = 0;
3153 synchronized(scanner) {
3154 for (; i < rows
3155 && currentScanResultSize < maxResultSize; ) {
3156
3157 boolean moreRows = scanner.nextRaw(values);
3158 if (!values.isEmpty()) {
3159 if (maxScannerResultSize < Long.MAX_VALUE){
3160 for (Cell kv : values) {
3161 currentScanResultSize += KeyValueUtil.ensureKeyValue(kv).heapSize();
3162 }
3163 }
3164 results.add(Result.create(values));
3165 i++;
3166 }
3167 if (!moreRows) {
3168 break;
3169 }
3170 values.clear();
3171 }
3172 }
3173 region.readRequestsCount.add(i);
3174 } finally {
3175 region.closeRegionOperation();
3176 }
3177
3178
3179 if (region != null && region.getCoprocessorHost() != null) {
3180 region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
3181 }
3182 }
3183
3184
3185
3186
3187 if (scanner.isFilterDone() && results.isEmpty()) {
3188 moreResults = false;
3189 results = null;
3190 } else {
3191 addResults(builder, results, controller);
3192 }
3193 } finally {
3194
3195
3196 if (scanners.containsKey(scannerName)) {
3197 if (lease != null) leases.addLease(lease);
3198 ttl = this.scannerLeaseTimeoutPeriod;
3199 }
3200 }
3201 }
3202
3203 if (!moreResults || closeScanner) {
3204 ttl = 0;
3205 moreResults = false;
3206 if (region != null && region.getCoprocessorHost() != null) {
3207 if (region.getCoprocessorHost().preScannerClose(scanner)) {
3208 return builder.build();
3209 }
3210 }
3211 rsh = scanners.remove(scannerName);
3212 if (rsh != null) {
3213 scanner = rsh.s;
3214 scanner.close();
3215 leases.cancelLease(scannerName);
3216 if (region != null && region.getCoprocessorHost() != null) {
3217 region.getCoprocessorHost().postScannerClose(scanner);
3218 }
3219 }
3220 }
3221
3222 if (ttl > 0) {
3223 builder.setTtl(ttl);
3224 }
3225 builder.setScannerId(scannerId);
3226 builder.setMoreResults(moreResults);
3227 return builder.build();
3228 } catch (IOException ie) {
3229 if (scannerName != null && ie instanceof NotServingRegionException) {
3230 RegionScannerHolder rsh = scanners.remove(scannerName);
3231 if (rsh != null) {
3232 try {
3233 RegionScanner scanner = rsh.s;
3234 scanner.close();
3235 leases.cancelLease(scannerName);
3236 } catch (IOException e) {}
3237 }
3238 }
3239 throw new ServiceException(ie);
3240 }
3241 }
3242
3243 private void addResults(final ScanResponse.Builder builder, final List<Result> results,
3244 final RpcController controller) {
3245 if (results == null || results.isEmpty()) return;
3246 if (isClientCellBlockSupport()) {
3247 for (Result res : results) {
3248 builder.addCellsPerResult(res.size());
3249 }
3250 ((PayloadCarryingRpcController)controller).
3251 setCellScanner(CellUtil.createCellScanner(results));
3252 } else {
3253 for (Result res: results) {
3254 ClientProtos.Result pbr = ProtobufUtil.toResult(res);
3255 builder.addResults(pbr);
3256 }
3257 }
3258 }
3259
3260
3261
3262
3263
3264
3265 @Override
3266 public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
3267 final BulkLoadHFileRequest request) throws ServiceException {
3268 try {
3269 checkOpen();
3270 requestCount.increment();
3271 HRegion region = getRegion(request.getRegion());
3272 List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
3273 for (FamilyPath familyPath: request.getFamilyPathList()) {
3274 familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(),
3275 familyPath.getPath()));
3276 }
3277 boolean bypass = false;
3278 if (region.getCoprocessorHost() != null) {
3279 bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
3280 }
3281 boolean loaded = false;
3282 if (!bypass) {
3283 loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum());
3284 }
3285 if (region.getCoprocessorHost() != null) {
3286 loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
3287 }
3288 BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
3289 builder.setLoaded(loaded);
3290 return builder.build();
3291 } catch (IOException ie) {
3292 throw new ServiceException(ie);
3293 }
3294 }
3295
3296 @Override
3297 public CoprocessorServiceResponse execService(final RpcController controller,
3298 final CoprocessorServiceRequest request) throws ServiceException {
3299 try {
3300 checkOpen();
3301 requestCount.increment();
3302 HRegion region = getRegion(request.getRegion());
3303 Message result = execServiceOnRegion(region, request.getCall());
3304 CoprocessorServiceResponse.Builder builder =
3305 CoprocessorServiceResponse.newBuilder();
3306 builder.setRegion(RequestConverter.buildRegionSpecifier(
3307 RegionSpecifierType.REGION_NAME, region.getRegionName()));
3308 builder.setValue(
3309 builder.getValueBuilder().setName(result.getClass().getName())
3310 .setValue(result.toByteString()));
3311 return builder.build();
3312 } catch (IOException ie) {
3313 throw new ServiceException(ie);
3314 }
3315 }
3316
3317 private Message execServiceOnRegion(HRegion region,
3318 final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
3319
3320 ServerRpcController execController = new ServerRpcController();
3321 Message result = region.execService(execController, serviceCall);
3322 if (execController.getFailedOn() != null) {
3323 throw execController.getFailedOn();
3324 }
3325 return result;
3326 }
3327
3328
3329
3330
3331
3332
3333
3334
3335 @Override
3336 public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
3337 throws ServiceException {
3338 try {
3339 checkOpen();
3340 } catch (IOException ie) {
3341 throw new ServiceException(ie);
3342 }
3343
3344
3345
3346 PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
3347 CellScanner cellScanner = controller != null ? controller.cellScanner(): null;
3348 if (controller != null) controller.setCellScanner(null);
3349
3350 long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
3351
3352
3353 List<CellScannable> cellsToReturn = null;
3354 MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
3355 RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
3356
3357 for (RegionAction regionAction : request.getRegionActionList()) {
3358 this.requestCount.add(regionAction.getActionCount());
3359 HRegion region;
3360 regionActionResultBuilder.clear();
3361 try {
3362 region = getRegion(regionAction.getRegion());
3363 } catch (IOException e) {
3364 regionActionResultBuilder.setException(ResponseConverter.buildException(e));
3365 responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
3366 continue;
3367 }
3368
3369 if (regionAction.hasAtomic() && regionAction.getAtomic()) {
3370
3371
3372 try {
3373 mutateRows(region, regionAction.getActionList(), cellScanner);
3374 } catch (IOException e) {
3375
3376 regionActionResultBuilder.setException(ResponseConverter.buildException(e));
3377 }
3378 } else {
3379
3380 cellsToReturn = doNonAtomicRegionMutation(region, regionAction, cellScanner,
3381 regionActionResultBuilder, cellsToReturn, nonceGroup);
3382 }
3383 responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
3384 }
3385
3386 if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
3387 controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
3388 }
3389 return responseBuilder.build();
3390 }
3391
3392
3393
3394
3395
3396
3397
3398
3399
3400
3401
3402
3403 private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
3404 final RegionAction actions, final CellScanner cellScanner,
3405 final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup) {
3406
3407
3408
3409
3410 List<ClientProtos.Action> mutations = null;
3411 for (ClientProtos.Action action: actions.getActionList()) {
3412 ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
3413 try {
3414 Result r = null;
3415 if (action.hasGet()) {
3416 Get get = ProtobufUtil.toGet(action.getGet());
3417 r = region.get(get);
3418 } else if (action.hasServiceCall()) {
3419 resultOrExceptionBuilder = ResultOrException.newBuilder();
3420 try {
3421 Message result = execServiceOnRegion(region, action.getServiceCall());
3422 ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
3423 ClientProtos.CoprocessorServiceResult.newBuilder();
3424 resultOrExceptionBuilder.setServiceResult(
3425 serviceResultBuilder.setValue(
3426 serviceResultBuilder.getValueBuilder()
3427 .setName(result.getClass().getName())
3428 .setValue(result.toByteString())));
3429 } catch (IOException ioe) {
3430 resultOrExceptionBuilder.setException(ResponseConverter.buildException(ioe));
3431 }
3432 } else if (action.hasMutation()) {
3433 MutationType type = action.getMutation().getMutateType();
3434 if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
3435 !mutations.isEmpty()) {
3436
3437 doBatchOp(builder, region, mutations, cellScanner);
3438 mutations.clear();
3439 }
3440 switch (type) {
3441 case APPEND:
3442 r = append(region, action.getMutation(), cellScanner, nonceGroup);
3443 break;
3444 case INCREMENT:
3445 r = increment(region, action.getMutation(), cellScanner, nonceGroup);
3446 break;
3447 case PUT:
3448 case DELETE:
3449
3450 if (mutations == null) {
3451 mutations = new ArrayList<ClientProtos.Action>(actions.getActionCount());
3452 }
3453 mutations.add(action);
3454 break;
3455 default:
3456 throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
3457 }
3458 } else {
3459 throw new HBaseIOException("Unexpected Action type");
3460 }
3461 if (r != null) {
3462 ClientProtos.Result pbResult = null;
3463 if (isClientCellBlockSupport()) {
3464 pbResult = ProtobufUtil.toResultNoData(r);
3465
3466 if (cellsToReturn == null) cellsToReturn = new ArrayList<CellScannable>();
3467 cellsToReturn.add(r);
3468 } else {
3469 pbResult = ProtobufUtil.toResult(r);
3470 }
3471 resultOrExceptionBuilder =
3472 ClientProtos.ResultOrException.newBuilder().setResult(pbResult);
3473 }
3474
3475
3476
3477
3478 } catch (IOException ie) {
3479 resultOrExceptionBuilder = ResultOrException.newBuilder().
3480 setException(ResponseConverter.buildException(ie));
3481 }
3482 if (resultOrExceptionBuilder != null) {
3483
3484 resultOrExceptionBuilder.setIndex(action.getIndex());
3485 builder.addResultOrException(resultOrExceptionBuilder.build());
3486 }
3487 }
3488
3489 if (mutations != null && !mutations.isEmpty()) {
3490 doBatchOp(builder, region, mutations, cellScanner);
3491 }
3492 return cellsToReturn;
3493 }
3494
3495
3496
3497
3498 @Override
3499 @QosPriority(priority=HConstants.HIGH_QOS)
3500 public GetRegionInfoResponse getRegionInfo(final RpcController controller,
3501 final GetRegionInfoRequest request) throws ServiceException {
3502 try {
3503 checkOpen();
3504 requestCount.increment();
3505 HRegion region = getRegion(request.getRegion());
3506 HRegionInfo info = region.getRegionInfo();
3507 GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
3508 builder.setRegionInfo(HRegionInfo.convert(info));
3509 if (request.hasCompactionState() && request.getCompactionState()) {
3510 builder.setCompactionState(region.getCompactionState());
3511 }
3512 builder.setIsRecovering(region.isRecovering());
3513 return builder.build();
3514 } catch (IOException ie) {
3515 throw new ServiceException(ie);
3516 }
3517 }
3518
3519 @Override
3520 public GetStoreFileResponse getStoreFile(final RpcController controller,
3521 final GetStoreFileRequest request) throws ServiceException {
3522 try {
3523 checkOpen();
3524 HRegion region = getRegion(request.getRegion());
3525 requestCount.increment();
3526 Set<byte[]> columnFamilies;
3527 if (request.getFamilyCount() == 0) {
3528 columnFamilies = region.getStores().keySet();
3529 } else {
3530 columnFamilies = new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
3531 for (ByteString cf: request.getFamilyList()) {
3532 columnFamilies.add(cf.toByteArray());
3533 }
3534 }
3535 int nCF = columnFamilies.size();
3536 List<String> fileList = region.getStoreFileList(
3537 columnFamilies.toArray(new byte[nCF][]));
3538 GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
3539 builder.addAllStoreFile(fileList);
3540 return builder.build();
3541 } catch (IOException ie) {
3542 throw new ServiceException(ie);
3543 }
3544 }
3545
3546 @Override
3547 @QosPriority(priority=HConstants.HIGH_QOS)
3548 public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
3549 final GetOnlineRegionRequest request) throws ServiceException {
3550 try {
3551 checkOpen();
3552 requestCount.increment();
3553 List<HRegionInfo> list = new ArrayList<HRegionInfo>(onlineRegions.size());
3554 for (HRegion region: this.onlineRegions.values()) {
3555 list.add(region.getRegionInfo());
3556 }
3557 Collections.sort(list);
3558 return ResponseConverter.buildGetOnlineRegionResponse(list);
3559 } catch (IOException ie) {
3560 throw new ServiceException(ie);
3561 }
3562 }
3563
3564
3565
3566
3567
3568
3569
3570
3571
3572
3573
3574
3575
3576
3577
3578
3579
3580
3581
3582
3583
3584
3585
3586
3587
3588
3589 @Override
3590 @QosPriority(priority=HConstants.HIGH_QOS)
3591 public OpenRegionResponse openRegion(final RpcController controller,
3592 final OpenRegionRequest request) throws ServiceException {
3593 try {
3594 checkOpen();
3595 } catch (IOException ie) {
3596 throw new ServiceException(ie);
3597 }
3598 requestCount.increment();
3599 if (request.hasServerStartCode() && this.serverNameFromMasterPOV != null) {
3600
3601 long serverStartCode = request.getServerStartCode();
3602 if (this.serverNameFromMasterPOV.getStartcode() != serverStartCode) {
3603 throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
3604 "different server with startCode: " + serverStartCode + ", this server is: "
3605 + this.serverNameFromMasterPOV));
3606 }
3607 }
3608 OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
3609 final int regionCount = request.getOpenInfoCount();
3610 final Map<TableName, HTableDescriptor> htds =
3611 new HashMap<TableName, HTableDescriptor>(regionCount);
3612 final boolean isBulkAssign = regionCount > 1;
3613 for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
3614 final HRegionInfo region = HRegionInfo.convert(regionOpenInfo.getRegion());
3615
3616 int versionOfOfflineNode = -1;
3617 if (regionOpenInfo.hasVersionOfOfflineNode()) {
3618 versionOfOfflineNode = regionOpenInfo.getVersionOfOfflineNode();
3619 }
3620 HTableDescriptor htd;
3621 try {
3622 final HRegion onlineRegion = getFromOnlineRegions(region.getEncodedName());
3623 if (onlineRegion != null) {
3624
3625 if (onlineRegion.getCoprocessorHost() != null) {
3626 onlineRegion.getCoprocessorHost().preOpen();
3627 }
3628
3629
3630 Pair<HRegionInfo, ServerName> p = MetaReader.getRegion(
3631 this.catalogTracker, region.getRegionName());
3632 if (this.getServerName().equals(p.getSecond())) {
3633 Boolean closing = regionsInTransitionInRS.get(region.getEncodedNameAsBytes());
3634
3635
3636
3637
3638
3639 if (!Boolean.FALSE.equals(closing)
3640 && getFromOnlineRegions(region.getEncodedName()) != null) {
3641 LOG.warn("Attempted open of " + region.getEncodedName()
3642 + " but already online on this server");
3643 builder.addOpeningState(RegionOpeningState.ALREADY_OPENED);
3644 continue;
3645 }
3646 } else {
3647 LOG.warn("The region " + region.getEncodedName() + " is online on this server" +
3648 " but hbase:meta does not have this server - continue opening.");
3649 removeFromOnlineRegions(onlineRegion, null);
3650 }
3651 }
3652 LOG.info("Open " + region.getRegionNameAsString());
3653 htd = htds.get(region.getTable());
3654 if (htd == null) {
3655 htd = this.tableDescriptors.get(region.getTable());
3656 htds.put(region.getTable(), htd);
3657 }
3658
3659 final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(
3660 region.getEncodedNameAsBytes(), Boolean.TRUE);
3661
3662 if (Boolean.FALSE.equals(previous)) {
3663
3664 OpenRegionHandler.
3665 tryTransitionFromOfflineToFailedOpen(this, region, versionOfOfflineNode);
3666
3667 throw new RegionAlreadyInTransitionException("Received OPEN for the region:" +
3668 region.getRegionNameAsString() + " , which we are already trying to CLOSE ");
3669 }
3670
3671 if (Boolean.TRUE.equals(previous)) {
3672
3673 LOG.info("Receiving OPEN for the region:" +
3674 region.getRegionNameAsString() + " , which we are already trying to OPEN" +
3675 " - ignoring this new request for this region.");
3676 }
3677
3678
3679
3680 removeFromMovedRegions(region.getEncodedName());
3681
3682 if (previous == null) {
3683
3684 if (SplitLogManager.isRegionMarkedRecoveringInZK(this.zooKeeper,
3685 region.getEncodedName())) {
3686
3687
3688 if (!regionOpenInfo.hasOpenForDistributedLogReplay()
3689 || regionOpenInfo.getOpenForDistributedLogReplay()) {
3690 this.recoveringRegions.put(region.getEncodedName(), null);
3691 } else {
3692
3693
3694 List<String> tmpRegions = new ArrayList<String>();
3695 tmpRegions.add(region.getEncodedName());
3696 SplitLogManager.deleteRecoveringRegionZNodes(this.zooKeeper, tmpRegions);
3697 }
3698 }
3699
3700
3701 if (region.isMetaRegion()) {
3702 this.service.submit(new OpenMetaHandler(this, this, region, htd,
3703 versionOfOfflineNode));
3704 } else {
3705 updateRegionFavoredNodesMapping(region.getEncodedName(),
3706 regionOpenInfo.getFavoredNodesList());
3707 this.service.submit(new OpenRegionHandler(this, this, region, htd,
3708 versionOfOfflineNode));
3709 }
3710 }
3711
3712 builder.addOpeningState(RegionOpeningState.OPENED);
3713
3714 } catch (KeeperException zooKeeperEx) {
3715 LOG.error("Can't retrieve recovering state from zookeeper", zooKeeperEx);
3716 throw new ServiceException(zooKeeperEx);
3717 } catch (IOException ie) {
3718 LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
3719 if (isBulkAssign) {
3720 builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
3721 } else {
3722 throw new ServiceException(ie);
3723 }
3724 }
3725 }
3726
3727 return builder.build();
3728 }
3729
3730 @Override
3731 public void updateRegionFavoredNodesMapping(String encodedRegionName,
3732 List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
3733 InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
3734
3735
3736 for (int i = 0; i < favoredNodes.size(); i++) {
3737 addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
3738 favoredNodes.get(i).getPort());
3739 }
3740 regionFavoredNodesMap.put(encodedRegionName, addr);
3741 }
3742
3743
3744
3745
3746
3747
3748
3749 @Override
3750 public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
3751 return regionFavoredNodesMap.get(encodedRegionName);
3752 }
3753
3754
3755
3756
3757
3758
3759
3760
3761 @Override
3762 @QosPriority(priority=HConstants.HIGH_QOS)
3763 public CloseRegionResponse closeRegion(final RpcController controller,
3764 final CloseRegionRequest request) throws ServiceException {
3765 int versionOfClosingNode = -1;
3766 if (request.hasVersionOfClosingNode()) {
3767 versionOfClosingNode = request.getVersionOfClosingNode();
3768 }
3769 boolean zk = request.getTransitionInZK();
3770 final ServerName sn = (request.hasDestinationServer() ?
3771 ProtobufUtil.toServerName(request.getDestinationServer()) : null);
3772
3773 try {
3774 checkOpen();
3775 if (request.hasServerStartCode() && this.serverNameFromMasterPOV != null) {
3776
3777 long serverStartCode = request.getServerStartCode();
3778 if (this.serverNameFromMasterPOV.getStartcode() != serverStartCode) {
3779 throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
3780 "different server with startCode: " + serverStartCode + ", this server is: "
3781 + this.serverNameFromMasterPOV));
3782 }
3783 }
3784 final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
3785
3786
3787 final HRegion region = this.getFromOnlineRegions(encodedRegionName);
3788 if ((region != null) && (region .getCoprocessorHost() != null)) {
3789 region.getCoprocessorHost().preClose(false);
3790 }
3791
3792 requestCount.increment();
3793 LOG.info("Close " + encodedRegionName + ", via zk=" + (zk ? "yes" : "no") +
3794 ", znode version=" + versionOfClosingNode + ", on " + sn);
3795
3796 boolean closed = closeRegion(encodedRegionName, false, zk, versionOfClosingNode, sn);
3797 CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
3798 return builder.build();
3799 } catch (IOException ie) {
3800 throw new ServiceException(ie);
3801 }
3802 }
3803
3804
3805
3806
3807
3808
3809
3810
3811 @Override
3812 @QosPriority(priority=HConstants.HIGH_QOS)
3813 public FlushRegionResponse flushRegion(final RpcController controller,
3814 final FlushRegionRequest request) throws ServiceException {
3815 try {
3816 checkOpen();
3817 requestCount.increment();
3818 HRegion region = getRegion(request.getRegion());
3819 LOG.info("Flushing " + region.getRegionNameAsString());
3820 boolean shouldFlush = true;
3821 if (request.hasIfOlderThanTs()) {
3822 shouldFlush = region.getLastFlushTime() < request.getIfOlderThanTs();
3823 }
3824 FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
3825 if (shouldFlush) {
3826 boolean result = region.flushcache().isCompactionNeeded();
3827 if (result) {
3828 this.compactSplitThread.requestSystemCompaction(region,
3829 "Compaction through user triggered flush");
3830 }
3831 builder.setFlushed(result);
3832 }
3833 builder.setLastFlushTime(region.getLastFlushTime());
3834 return builder.build();
3835 } catch (DroppedSnapshotException ex) {
3836
3837
3838
3839
3840 abort("Replay of HLog required. Forcing server shutdown", ex);
3841 throw new ServiceException(ex);
3842 } catch (IOException ie) {
3843 throw new ServiceException(ie);
3844 }
3845 }
3846
3847
3848
3849
3850
3851
3852
3853
3854 @Override
3855 @QosPriority(priority=HConstants.HIGH_QOS)
3856 public SplitRegionResponse splitRegion(final RpcController controller,
3857 final SplitRegionRequest request) throws ServiceException {
3858 try {
3859 checkOpen();
3860 requestCount.increment();
3861 HRegion region = getRegion(request.getRegion());
3862 region.startRegionOperation(Operation.SPLIT_REGION);
3863 LOG.info("Splitting " + region.getRegionNameAsString());
3864 region.flushcache();
3865 byte[] splitPoint = null;
3866 if (request.hasSplitPoint()) {
3867 splitPoint = request.getSplitPoint().toByteArray();
3868 }
3869 region.forceSplit(splitPoint);
3870 compactSplitThread.requestSplit(region, region.checkSplit());
3871 return SplitRegionResponse.newBuilder().build();
3872 } catch (IOException ie) {
3873 throw new ServiceException(ie);
3874 }
3875 }
3876
3877
3878
3879
3880
3881
3882
3883
3884
3885 @Override
3886 @QosPriority(priority = HConstants.HIGH_QOS)
3887 public MergeRegionsResponse mergeRegions(final RpcController controller,
3888 final MergeRegionsRequest request) throws ServiceException {
3889 try {
3890 checkOpen();
3891 requestCount.increment();
3892 HRegion regionA = getRegion(request.getRegionA());
3893 HRegion regionB = getRegion(request.getRegionB());
3894 boolean forcible = request.getForcible();
3895 regionA.startRegionOperation(Operation.MERGE_REGION);
3896 regionB.startRegionOperation(Operation.MERGE_REGION);
3897 LOG.info("Receiving merging request for " + regionA + ", " + regionB
3898 + ",forcible=" + forcible);
3899 regionA.flushcache();
3900 regionB.flushcache();
3901 compactSplitThread.requestRegionsMerge(regionA, regionB, forcible);
3902 return MergeRegionsResponse.newBuilder().build();
3903 } catch (IOException ie) {
3904 throw new ServiceException(ie);
3905 }
3906 }
3907
3908
3909
3910
3911
3912
3913
3914
3915 @Override
3916 @QosPriority(priority=HConstants.HIGH_QOS)
3917 public CompactRegionResponse compactRegion(final RpcController controller,
3918 final CompactRegionRequest request) throws ServiceException {
3919 try {
3920 checkOpen();
3921 requestCount.increment();
3922 HRegion region = getRegion(request.getRegion());
3923 region.startRegionOperation(Operation.COMPACT_REGION);
3924 LOG.info("Compacting " + region.getRegionNameAsString());
3925 boolean major = false;
3926 byte [] family = null;
3927 Store store = null;
3928 if (request.hasFamily()) {
3929 family = request.getFamily().toByteArray();
3930 store = region.getStore(family);
3931 if (store == null) {
3932 throw new ServiceException(new IOException("column family " + Bytes.toString(family) +
3933 " does not exist in region " + region.getRegionNameAsString()));
3934 }
3935 }
3936 if (request.hasMajor()) {
3937 major = request.getMajor();
3938 }
3939 if (major) {
3940 if (family != null) {
3941 store.triggerMajorCompaction();
3942 } else {
3943 region.triggerMajorCompaction();
3944 }
3945 }
3946
3947 String familyLogMsg = (family != null)?" for column family: " + Bytes.toString(family):"";
3948 LOG.trace("User-triggered compaction requested for region " +
3949 region.getRegionNameAsString() + familyLogMsg);
3950 String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
3951 if(family != null) {
3952 compactSplitThread.requestCompaction(region, store, log,
3953 Store.PRIORITY_USER, null);
3954 } else {
3955 compactSplitThread.requestCompaction(region, log,
3956 Store.PRIORITY_USER, null);
3957 }
3958 return CompactRegionResponse.newBuilder().build();
3959 } catch (IOException ie) {
3960 throw new ServiceException(ie);
3961 }
3962 }
3963
3964
3965
3966
3967
3968
3969
3970
3971 @Override
3972 @QosPriority(priority=HConstants.REPLICATION_QOS)
3973 public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
3974 final ReplicateWALEntryRequest request)
3975 throws ServiceException {
3976 try {
3977 if (replicationSinkHandler != null) {
3978 checkOpen();
3979 requestCount.increment();
3980 this.replicationSinkHandler.replicateLogEntries(request.getEntryList(),
3981 ((PayloadCarryingRpcController)controller).cellScanner());
3982 }
3983 return ReplicateWALEntryResponse.newBuilder().build();
3984 } catch (IOException ie) {
3985 throw new ServiceException(ie);
3986 }
3987 }
3988
3989
3990
3991
3992
3993
3994
3995
3996
3997 @Override
3998 @QosPriority(priority = HConstants.REPLAY_QOS)
3999 public ReplicateWALEntryResponse replay(final RpcController controller,
4000 final ReplicateWALEntryRequest request) throws ServiceException {
4001 long before = EnvironmentEdgeManager.currentTimeMillis();
4002 CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
4003 try {
4004 checkOpen();
4005 List<WALEntry> entries = request.getEntryList();
4006 if (entries == null || entries.isEmpty()) {
4007
4008 return ReplicateWALEntryResponse.newBuilder().build();
4009 }
4010 HRegion region = this.getRegionByEncodedName(
4011 entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
4012 RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
4013 List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>();
4014 List<HLogSplitter.MutationReplay> mutations = new ArrayList<HLogSplitter.MutationReplay>();
4015
4016 boolean needAddReplayTag = (HFile.getFormatVersion(this.conf) >= 3);
4017 for (WALEntry entry : entries) {
4018 if (nonceManager != null) {
4019 long nonceGroup = entry.getKey().hasNonceGroup()
4020 ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
4021 long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
4022 nonceManager.reportOperationFromWal(nonceGroup, nonce, entry.getKey().getWriteTime());
4023 }
4024 Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
4025 new Pair<HLogKey, WALEdit>();
4026 List<HLogSplitter.MutationReplay> edits = HLogSplitter.getMutationsFromWALEntry(entry,
4027 cells, walEntry, needAddReplayTag);
4028 if (coprocessorHost != null) {
4029
4030
4031 if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
4032 walEntry.getSecond())) {
4033
4034 continue;
4035 }
4036 walEntries.add(walEntry);
4037 }
4038 mutations.addAll(edits);
4039 }
4040
4041 if (!mutations.isEmpty()) {
4042 OperationStatus[] result = doReplayBatchOp(region, mutations);
4043
4044 for (int i = 0; result != null && i < result.length; i++) {
4045 if (result[i] != OperationStatus.SUCCESS) {
4046 throw new IOException(result[i].getExceptionMsg());
4047 }
4048 }
4049 }
4050 if (coprocessorHost != null) {
4051 for (Pair<HLogKey, WALEdit> wal : walEntries) {
4052 coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(),
4053 wal.getSecond());
4054 }
4055 }
4056 return ReplicateWALEntryResponse.newBuilder().build();
4057 } catch (IOException ie) {
4058 throw new ServiceException(ie);
4059 } finally {
4060 metricsRegionServer.updateReplay(EnvironmentEdgeManager.currentTimeMillis() - before);
4061 }
4062 }
4063
4064
4065
4066
4067
4068
4069
4070 @Override
4071 public RollWALWriterResponse rollWALWriter(final RpcController controller,
4072 final RollWALWriterRequest request) throws ServiceException {
4073 try {
4074 checkOpen();
4075 requestCount.increment();
4076 HLog wal = this.getWAL();
4077 byte[][] regionsToFlush = wal.rollWriter(true);
4078 RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
4079 if (regionsToFlush != null) {
4080 for (byte[] region: regionsToFlush) {
4081 builder.addRegionToFlush(ByteStringer.wrap(region));
4082 }
4083 }
4084 return builder.build();
4085 } catch (IOException ie) {
4086 throw new ServiceException(ie);
4087 }
4088 }
4089
4090
4091
4092
4093
4094
4095
4096
4097 @Override
4098 public StopServerResponse stopServer(final RpcController controller,
4099 final StopServerRequest request) throws ServiceException {
4100 requestCount.increment();
4101 String reason = request.getReason();
4102 stop(reason);
4103 return StopServerResponse.newBuilder().build();
4104 }
4105
4106
4107
4108
4109
4110
4111
4112
4113 @Override
4114 public GetServerInfoResponse getServerInfo(final RpcController controller,
4115 final GetServerInfoRequest request) throws ServiceException {
4116 try {
4117 checkOpen();
4118 } catch (IOException ie) {
4119 throw new ServiceException(ie);
4120 }
4121 ServerName serverName = getServerName();
4122 requestCount.increment();
4123 return ResponseConverter.buildGetServerInfoResponse(serverName, rsInfo.getInfoPort());
4124 }
4125
4126
4127
4128
4129
4130
4131
4132
4133
4134
4135
4136 protected HRegion getRegion(
4137 final RegionSpecifier regionSpecifier) throws IOException {
4138 return getRegionByEncodedName(regionSpecifier.getValue().toByteArray(),
4139 ProtobufUtil.getRegionEncodedName(regionSpecifier));
4140 }
4141
4142
4143
4144
4145
4146
4147
4148
4149
4150
4151
4152 protected Result append(final HRegion region,
4153 final MutationProto m, final CellScanner cellScanner, long nonceGroup) throws IOException {
4154 long before = EnvironmentEdgeManager.currentTimeMillis();
4155 Append append = ProtobufUtil.toAppend(m, cellScanner);
4156 Result r = null;
4157 if (region.getCoprocessorHost() != null) {
4158 r = region.getCoprocessorHost().preAppend(append);
4159 }
4160 if (r == null) {
4161 long nonce = startNonceOperation(m, nonceGroup);
4162 boolean success = false;
4163 try {
4164 r = region.append(append, nonceGroup, nonce);
4165 success = true;
4166 } finally {
4167 endNonceOperation(m, nonceGroup, success);
4168 }
4169 if (region.getCoprocessorHost() != null) {
4170 region.getCoprocessorHost().postAppend(append, r);
4171 }
4172 }
4173 metricsRegionServer.updateAppend(EnvironmentEdgeManager.currentTimeMillis() - before);
4174 return r;
4175 }
4176
4177
4178
4179
4180
4181
4182
4183
4184
4185 protected Result increment(final HRegion region, final MutationProto mutation,
4186 final CellScanner cells, long nonceGroup) throws IOException {
4187 long before = EnvironmentEdgeManager.currentTimeMillis();
4188 Increment increment = ProtobufUtil.toIncrement(mutation, cells);
4189 Result r = null;
4190 if (region.getCoprocessorHost() != null) {
4191 r = region.getCoprocessorHost().preIncrement(increment);
4192 }
4193 if (r == null) {
4194 long nonce = startNonceOperation(mutation, nonceGroup);
4195 boolean success = false;
4196 try {
4197 r = region.increment(increment, nonceGroup, nonce);
4198 success = true;
4199 } finally {
4200 endNonceOperation(mutation, nonceGroup, success);
4201 }
4202 if (region.getCoprocessorHost() != null) {
4203 r = region.getCoprocessorHost().postIncrement(increment, r);
4204 }
4205 }
4206 metricsRegionServer.updateIncrement(EnvironmentEdgeManager.currentTimeMillis() - before);
4207 return r;
4208 }
4209
4210
4211
4212
4213
4214
4215
4216 private long startNonceOperation(final MutationProto mutation, long nonceGroup)
4217 throws IOException, OperationConflictException {
4218 if (nonceManager == null || !mutation.hasNonce()) return HConstants.NO_NONCE;
4219 boolean canProceed = false;
4220 try {
4221 canProceed = nonceManager.startOperation(nonceGroup, mutation.getNonce(), this);
4222 } catch (InterruptedException ex) {
4223 throw new InterruptedIOException("Nonce start operation interrupted");
4224 }
4225 if (!canProceed) {
4226
4227 String message = "The operation with nonce {" + nonceGroup + ", " + mutation.getNonce()
4228 + "} on row [" + Bytes.toString(mutation.getRow().toByteArray())
4229 + "] may have already completed";
4230 throw new OperationConflictException(message);
4231 }
4232 return mutation.getNonce();
4233 }
4234
4235
4236
4237
4238
4239
4240
4241 private void endNonceOperation(final MutationProto mutation, long nonceGroup,
4242 boolean success) {
4243 if (nonceManager == null || !mutation.hasNonce()) return;
4244 nonceManager.endOperation(nonceGroup, mutation.getNonce(), success);
4245 }
4246
4247 @Override
4248 public ServerNonceManager getNonceManager() {
4249 return this.nonceManager;
4250 }
4251
4252
4253
4254
4255
4256
4257
4258
4259 protected void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
4260 final List<ClientProtos.Action> mutations, final CellScanner cells) {
4261 Mutation[] mArray = new Mutation[mutations.size()];
4262 long before = EnvironmentEdgeManager.currentTimeMillis();
4263 boolean batchContainsPuts = false, batchContainsDelete = false;
4264 try {
4265 int i = 0;
4266 for (ClientProtos.Action action: mutations) {
4267 MutationProto m = action.getMutation();
4268 Mutation mutation;
4269 if (m.getMutateType() == MutationType.PUT) {
4270 mutation = ProtobufUtil.toPut(m, cells);
4271 batchContainsPuts = true;
4272 } else {
4273 mutation = ProtobufUtil.toDelete(m, cells);
4274 batchContainsDelete = true;
4275 }
4276 mArray[i++] = mutation;
4277 }
4278
4279 if (!region.getRegionInfo().isMetaTable()) {
4280 cacheFlusher.reclaimMemStoreMemory();
4281 }
4282
4283 OperationStatus codes[] = region.batchMutate(mArray);
4284 for (i = 0; i < codes.length; i++) {
4285 int index = mutations.get(i).getIndex();
4286 Exception e = null;
4287 switch (codes[i].getOperationStatusCode()) {
4288 case BAD_FAMILY:
4289 e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
4290 builder.addResultOrException(getResultOrException(e, index));
4291 break;
4292
4293 case SANITY_CHECK_FAILURE:
4294 e = new FailedSanityCheckException(codes[i].getExceptionMsg());
4295 builder.addResultOrException(getResultOrException(e, index));
4296 break;
4297
4298 default:
4299 e = new DoNotRetryIOException(codes[i].getExceptionMsg());
4300 builder.addResultOrException(getResultOrException(e, index));
4301 break;
4302
4303 case SUCCESS:
4304 builder.addResultOrException(getResultOrException(ClientProtos.Result.getDefaultInstance(), index));
4305 break;
4306 }
4307 }
4308 } catch (IOException ie) {
4309 for (int i = 0; i < mutations.size(); i++) {
4310 builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex()));
4311 }
4312 }
4313 long after = EnvironmentEdgeManager.currentTimeMillis();
4314 if (batchContainsPuts) {
4315 metricsRegionServer.updatePut(after - before);
4316 }
4317 if (batchContainsDelete) {
4318 metricsRegionServer.updateDelete(after - before);
4319 }
4320 }
4321 private static ResultOrException getResultOrException(final ClientProtos.Result r,
4322 final int index) {
4323 return getResultOrException(ResponseConverter.buildActionResult(r), index);
4324 }
4325 private static ResultOrException getResultOrException(final Exception e, final int index) {
4326 return getResultOrException(ResponseConverter.buildActionResult(e), index);
4327 }
4328
4329 private static ResultOrException getResultOrException(final ResultOrException.Builder builder,
4330 final int index) {
4331 return builder.setIndex(index).build();
4332 }
4333
4334
4335
4336
4337
4338
4339
4340
4341
4342
4343 protected OperationStatus [] doReplayBatchOp(final HRegion region,
4344 final List<HLogSplitter.MutationReplay> mutations) throws IOException {
4345
4346 long before = EnvironmentEdgeManager.currentTimeMillis();
4347 boolean batchContainsPuts = false, batchContainsDelete = false;
4348 try {
4349 for (Iterator<HLogSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
4350 HLogSplitter.MutationReplay m = it.next();
4351 if (m.type == MutationType.PUT) {
4352 batchContainsPuts = true;
4353 } else {
4354 batchContainsDelete = true;
4355 }
4356 NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
4357 List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
4358 if (metaCells != null && !metaCells.isEmpty()) {
4359 for (Cell metaCell : metaCells) {
4360 CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
4361 if (compactionDesc != null) {
4362 region.completeCompactionMarker(compactionDesc);
4363 }
4364 }
4365 it.remove();
4366 }
4367 }
4368 requestCount.add(mutations.size());
4369 if (!region.getRegionInfo().isMetaTable()) {
4370 cacheFlusher.reclaimMemStoreMemory();
4371 }
4372 return region.batchReplay(mutations.toArray(
4373 new HLogSplitter.MutationReplay[mutations.size()]));
4374 } finally {
4375 long after = EnvironmentEdgeManager.currentTimeMillis();
4376 if (batchContainsPuts) {
4377 metricsRegionServer.updatePut(after - before);
4378 }
4379 if (batchContainsDelete) {
4380 metricsRegionServer.updateDelete(after - before);
4381 }
4382 }
4383 }
4384
4385
4386
4387
4388
4389
4390
4391
4392
4393 protected void mutateRows(final HRegion region, final List<ClientProtos.Action> actions,
4394 final CellScanner cellScanner)
4395 throws IOException {
4396 if (!region.getRegionInfo().isMetaTable()) {
4397 cacheFlusher.reclaimMemStoreMemory();
4398 }
4399 RowMutations rm = null;
4400 for (ClientProtos.Action action: actions) {
4401 if (action.hasGet()) {
4402 throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
4403 action.getGet());
4404 }
4405 MutationType type = action.getMutation().getMutateType();
4406 if (rm == null) {
4407 rm = new RowMutations(action.getMutation().getRow().toByteArray());
4408 }
4409 switch (type) {
4410 case PUT:
4411 rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
4412 break;
4413 case DELETE:
4414 rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
4415 break;
4416 default:
4417 throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
4418 }
4419 }
4420 region.mutateRow(rm);
4421 }
4422
4423 private static class MovedRegionInfo {
4424 private final ServerName serverName;
4425 private final long seqNum;
4426 private final long ts;
4427
4428 public MovedRegionInfo(ServerName serverName, long closeSeqNum) {
4429 this.serverName = serverName;
4430 this.seqNum = closeSeqNum;
4431 ts = EnvironmentEdgeManager.currentTimeMillis();
4432 }
4433
4434 public ServerName getServerName() {
4435 return serverName;
4436 }
4437
4438 public long getSeqNum() {
4439 return seqNum;
4440 }
4441
4442 public long getMoveTime() {
4443 return ts;
4444 }
4445 }
4446
4447
4448
/**
 * Moved-region records keyed by encoded region name. Each value holds the destination server,
 * a sequence number and the time the record was created. The map is created with an initial
 * capacity of 3000.
 */
protected Map<String, MovedRegionInfo> movedRegions =
    new ConcurrentHashMap<String, MovedRegionInfo>(3000);
4451
4452
4453
/**
 * Age, in milliseconds (two minutes), after which a moved-region record is treated as expired.
 * It is compared against millisecond timestamps and is also passed as the second constructor
 * argument of the {@code Chore} that backs {@code MovedRegionsCleaner}.
 */
private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
4455
4456 protected void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) {
4457 if (ServerName.isSameHostnameAndPort(destination, this.getServerName())) {
4458 LOG.warn("Not adding moved region record: " + encodedName + " to self.");
4459 return;
4460 }
4461 LOG.info("Adding moved region record: " + encodedName + " to "
4462 + destination.getServerName() + ":" + destination.getPort()
4463 + " as of " + closeSeqNum);
4464 movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
4465 }
4466
4467 private void removeFromMovedRegions(String encodedName) {
4468 movedRegions.remove(encodedName);
4469 }
4470
4471 private MovedRegionInfo getMovedRegion(final String encodedRegionName) {
4472 MovedRegionInfo dest = movedRegions.get(encodedRegionName);
4473
4474 long now = EnvironmentEdgeManager.currentTimeMillis();
4475 if (dest != null) {
4476 if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) {
4477 return dest;
4478 } else {
4479 movedRegions.remove(encodedRegionName);
4480 }
4481 }
4482
4483 return null;
4484 }
4485
4486
4487
4488
4489 protected void cleanMovedRegions() {
4490 final long cutOff = System.currentTimeMillis() - TIMEOUT_REGION_MOVED;
4491 Iterator<Entry<String, MovedRegionInfo>> it = movedRegions.entrySet().iterator();
4492
4493 while (it.hasNext()){
4494 Map.Entry<String, MovedRegionInfo> e = it.next();
4495 if (e.getValue().getMoveTime() < cutOff) {
4496 it.remove();
4497 }
4498 }
4499 }
4500
4501
4502
4503
4504 protected static class MovedRegionsCleaner extends Chore implements Stoppable {
4505 private HRegionServer regionServer;
4506 Stoppable stoppable;
4507
4508 private MovedRegionsCleaner(
4509 HRegionServer regionServer, Stoppable stoppable){
4510 super("MovedRegionsCleaner for region "+regionServer, TIMEOUT_REGION_MOVED, stoppable);
4511 this.regionServer = regionServer;
4512 this.stoppable = stoppable;
4513 }
4514
4515 static MovedRegionsCleaner createAndStart(HRegionServer rs){
4516 Stoppable stoppable = new Stoppable() {
4517 private volatile boolean isStopped = false;
4518 @Override public void stop(String why) { isStopped = true;}
4519 @Override public boolean isStopped() {return isStopped;}
4520 };
4521
4522 return new MovedRegionsCleaner(rs, stoppable);
4523 }
4524
4525 @Override
4526 protected void chore() {
4527 regionServer.cleanMovedRegions();
4528 }
4529
4530 @Override
4531 public void stop(String why) {
4532 stoppable.stop(why);
4533 }
4534
4535 @Override
4536 public boolean isStopped() {
4537 return stoppable.isStopped();
4538 }
4539 }
4540
4541 private String getMyEphemeralNodePath() {
4542 return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
4543 }
4544
4545
4546
4547
4548 private static class RegionScannerHolder {
4549 private RegionScanner s;
4550 private long nextCallSeq = 0L;
4551 private HRegion r;
4552
4553 public RegionScannerHolder(RegionScanner s, HRegion r) {
4554 this.s = s;
4555 this.r = r;
4556 }
4557 }
4558
4559 private boolean isHealthCheckerConfigured() {
4560 String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
4561 return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
4562 }
4563
4564
4565
4566
4567 public CompactSplitThread getCompactSplitThread() {
4568 return this.compactSplitThread;
4569 }
4570
4571
4572
4573
4574
4575
4576
4577
4578 private void updateRecoveringRegionLastFlushedSequenceId(HRegion r) throws KeeperException,
4579 IOException {
4580 if (!r.isRecovering()) {
4581
4582 return;
4583 }
4584
4585 HRegionInfo region = r.getRegionInfo();
4586 ZooKeeperWatcher zkw = getZooKeeper();
4587 String previousRSName = this.getLastFailedRSFromZK(region.getEncodedName());
4588 Map<byte[], Long> maxSeqIdInStores = r.getMaxStoreSeqIdForLogReplay();
4589 long minSeqIdForLogReplay = -1;
4590 for (Long storeSeqIdForReplay : maxSeqIdInStores.values()) {
4591 if (minSeqIdForLogReplay == -1 || storeSeqIdForReplay < minSeqIdForLogReplay) {
4592 minSeqIdForLogReplay = storeSeqIdForReplay;
4593 }
4594 }
4595 long lastRecordedFlushedSequenceId = -1;
4596 String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode,
4597 region.getEncodedName());
4598
4599 byte[] data = ZKUtil.getData(zkw, nodePath);
4600 if (data != null) {
4601 lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data);
4602 }
4603 if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) {
4604 ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay));
4605 }
4606 if (previousRSName != null) {
4607
4608 nodePath = ZKUtil.joinZNode(nodePath, previousRSName);
4609 ZKUtil.setData(zkw, nodePath,
4610 ZKUtil.regionSequenceIdsToByteArray(minSeqIdForLogReplay, maxSeqIdInStores));
4611 LOG.debug("Update last flushed sequence id of region " + region.getEncodedName() + " for "
4612 + previousRSName);
4613 } else {
4614 LOG.warn("Can't find failed region server for recovering region " + region.getEncodedName());
4615 }
4616 }
4617
4618
4619
4620
4621
4622
4623 private String getLastFailedRSFromZK(String encodedRegionName) throws KeeperException {
4624 String result = null;
4625 long maxZxid = 0;
4626 ZooKeeperWatcher zkw = this.getZooKeeper();
4627 String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
4628 List<String> failedServers = ZKUtil.listChildrenNoWatch(zkw, nodePath);
4629 if (failedServers == null || failedServers.isEmpty()) {
4630 return result;
4631 }
4632 for (String failedServer : failedServers) {
4633 String rsPath = ZKUtil.joinZNode(nodePath, failedServer);
4634 Stat stat = new Stat();
4635 ZKUtil.getDataNoWatch(zkw, rsPath, stat);
4636 if (maxZxid < stat.getCzxid()) {
4637 maxZxid = stat.getCzxid();
4638 result = failedServer;
4639 }
4640 }
4641 return result;
4642 }
4643
4644 @Override
4645 public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
4646 UpdateFavoredNodesRequest request) throws ServiceException {
4647 List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
4648 UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
4649 for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
4650 HRegionInfo hri = HRegionInfo.convert(regionUpdateInfo.getRegion());
4651 updateRegionFavoredNodesMapping(hri.getEncodedName(),
4652 regionUpdateInfo.getFavoredNodesList());
4653 }
4654 respBuilder.setResponse(openInfoList.size());
4655 return respBuilder.build();
4656 }
4657
4658
4659
4660
4661 public CacheConfig getCacheConfig() {
4662 return this.cacheConfig;
4663 }
4664 }