1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.procedure;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.concurrent.ThreadPoolExecutor;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.ServerName;
29 import org.apache.hadoop.hbase.errorhandling.ForeignException;
30 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
31 import org.apache.hadoop.hbase.master.MasterServices;
32 import org.apache.hadoop.hbase.master.MetricsMaster;
33 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
34 import org.apache.zookeeper.KeeperException;
35
36 public class SimpleMasterProcedureManager extends MasterProcedureManager {
37
38 public static final String SIMPLE_SIGNATURE = "simle_test";
39 public static final String SIMPLE_DATA = "simple_test_data";
40
41 private static final Log LOG = LogFactory.getLog(SimpleMasterProcedureManager.class);
42
43 private MasterServices master;
44 private ProcedureCoordinator coordinator;
45
46 private boolean done;
47
48 @Override
49 public void stop(String why) {
50 LOG.info("stop: " + why);
51 }
52
53 @Override
54 public boolean isStopped() {
55 return false;
56 }
57
58 @Override
59 public void initialize(MasterServices master, MetricsMaster metricsMaster)
60 throws KeeperException, IOException, UnsupportedOperationException {
61 this.master = master;
62 this.done = false;
63
64
65 String name = master.getServerName().toString();
66 ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, 1);
67 ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
68 master.getZooKeeper(), getProcedureSignature(), name);
69
70 this.coordinator = new ProcedureCoordinator(comms, tpool);
71 }
72
73 @Override
74 public String getProcedureSignature() {
75 return SIMPLE_SIGNATURE;
76 }
77
78 @Override
79 public byte[] execProcedureWithRet(ProcedureDescription desc) throws IOException {
80 this.done = false;
81
82 ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance());
83
84 List<ServerName> serverNames = master.getServerManager().getOnlineServersList();
85 List<String> servers = new ArrayList<String>();
86 for (ServerName sn : serverNames) {
87 servers.add(sn.toString());
88 }
89 Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(), new byte[0], servers);
90 if (proc == null) {
91 String msg = "Failed to submit distributed procedure for '"
92 + getProcedureSignature() + "'";
93 LOG.error(msg);
94 throw new IOException(msg);
95 }
96
97 HashMap<String, byte[]> returnData = null;
98 try {
99
100
101 returnData = proc.waitForCompletedWithRet();
102 LOG.info("Done waiting - exec procedure for " + desc.getInstance());
103 this.done = true;
104 } catch (InterruptedException e) {
105 ForeignException ee =
106 new ForeignException("Interrupted while waiting for procdure to finish", e);
107 monitor.receive(ee);
108 Thread.currentThread().interrupt();
109 } catch (ForeignException e) {
110 monitor.receive(e);
111 }
112
113 return returnData.values().iterator().next();
114 }
115
116 @Override
117 public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
118 return done;
119 }
120
121 }