1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client.coprocessor;
20
21 import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
22 import static org.apache.hadoop.hbase.HConstants.LAST_ROW;
23
24 import org.apache.hadoop.hbase.util.ByteStringer;
25
26 import org.apache.hadoop.classification.InterfaceAudience;
27 import org.apache.hadoop.fs.Path;
28 import org.apache.hadoop.hbase.TableName;
29 import org.apache.hadoop.hbase.client.HTable;
30 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
31 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
32 import org.apache.hadoop.hbase.ipc.ServerRpcController;
33 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
34 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
35 import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos;
36 import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
37 import org.apache.hadoop.hbase.util.Pair;
38 import org.apache.hadoop.security.token.Token;
39
40 import java.io.IOException;
41 import java.util.ArrayList;
42 import java.util.List;
43
44
45
46
47
48 @InterfaceAudience.Private
49 public class SecureBulkLoadClient {
50 private HTable table;
51
52 public SecureBulkLoadClient(HTable table) {
53 this.table = table;
54 }
55
56 public String prepareBulkLoad(final TableName tableName) throws IOException {
57 try {
58 return
59 table.coprocessorService(SecureBulkLoadProtos.SecureBulkLoadService.class,
60 EMPTY_START_ROW,
61 LAST_ROW,
62 new Batch.Call<SecureBulkLoadProtos.SecureBulkLoadService,String>() {
63 @Override
64 public String call(SecureBulkLoadProtos.SecureBulkLoadService instance) throws IOException {
65 ServerRpcController controller = new ServerRpcController();
66
67 BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse> rpcCallback =
68 new BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse>();
69
70 SecureBulkLoadProtos.PrepareBulkLoadRequest request =
71 SecureBulkLoadProtos.PrepareBulkLoadRequest.newBuilder()
72 .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
73
74 instance.prepareBulkLoad(controller,
75 request,
76 rpcCallback);
77
78 SecureBulkLoadProtos.PrepareBulkLoadResponse response = rpcCallback.get();
79 if (controller.failedOnException()) {
80 throw controller.getFailedOn();
81 }
82 return response.getBulkToken();
83 }
84 }).entrySet().iterator().next().getValue();
85 } catch (Throwable throwable) {
86 throw new IOException(throwable);
87 }
88 }
89
90 public void cleanupBulkLoad(final String bulkToken) throws IOException {
91 try {
92 table.coprocessorService(SecureBulkLoadProtos.SecureBulkLoadService.class,
93 EMPTY_START_ROW,
94 LAST_ROW,
95 new Batch.Call<SecureBulkLoadProtos.SecureBulkLoadService, String>() {
96
97 @Override
98 public String call(SecureBulkLoadProtos.SecureBulkLoadService instance) throws IOException {
99 ServerRpcController controller = new ServerRpcController();
100
101 BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse> rpcCallback =
102 new BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse>();
103
104 SecureBulkLoadProtos.CleanupBulkLoadRequest request =
105 SecureBulkLoadProtos.CleanupBulkLoadRequest.newBuilder()
106 .setBulkToken(bulkToken).build();
107
108 instance.cleanupBulkLoad(controller,
109 request,
110 rpcCallback);
111
112 if (controller.failedOnException()) {
113 throw controller.getFailedOn();
114 }
115 return null;
116 }
117 });
118 } catch (Throwable throwable) {
119 throw new IOException(throwable);
120 }
121 }
122
123 public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths,
124 final Token<?> userToken,
125 final String bulkToken,
126 final byte[] startRow) throws IOException {
127
128
129 try {
130 CoprocessorRpcChannel channel = table.coprocessorService(startRow);
131 SecureBulkLoadProtos.SecureBulkLoadService instance =
132 ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
133
134 SecureBulkLoadProtos.DelegationToken protoDT =
135 SecureBulkLoadProtos.DelegationToken.newBuilder().build();
136 if(userToken != null) {
137 protoDT =
138 SecureBulkLoadProtos.DelegationToken.newBuilder()
139 .setIdentifier(ByteStringer.wrap(userToken.getIdentifier()))
140 .setPassword(ByteStringer.wrap(userToken.getPassword()))
141 .setKind(userToken.getKind().toString())
142 .setService(userToken.getService().toString()).build();
143 }
144
145 List<ClientProtos.BulkLoadHFileRequest.FamilyPath> protoFamilyPaths =
146 new ArrayList<ClientProtos.BulkLoadHFileRequest.FamilyPath>();
147 for(Pair<byte[], String> el: familyPaths) {
148 protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder()
149 .setFamily(ByteStringer.wrap(el.getFirst()))
150 .setPath(el.getSecond()).build());
151 }
152
153 SecureBulkLoadProtos.SecureBulkLoadHFilesRequest request =
154 SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder()
155 .setFsToken(protoDT)
156 .addAllFamilyPath(protoFamilyPaths)
157 .setBulkToken(bulkToken).build();
158
159 ServerRpcController controller = new ServerRpcController();
160 BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse> rpcCallback =
161 new BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse>();
162 instance.secureBulkLoadHFiles(controller,
163 request,
164 rpcCallback);
165
166 SecureBulkLoadProtos.SecureBulkLoadHFilesResponse response = rpcCallback.get();
167 if (controller.failedOnException()) {
168 throw controller.getFailedOn();
169 }
170 return response.getLoaded();
171 } catch (Throwable throwable) {
172 throw new IOException(throwable);
173 }
174 }
175
176 public Path getStagingPath(String bulkToken, byte[] family) throws IOException {
177 return SecureBulkLoadUtil.getStagingPath(table.getConfiguration(), bulkToken, family);
178 }
179 }