1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.rest.client;
21
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.util.Collections;
25 import java.util.Map;
26 import java.util.concurrent.ConcurrentHashMap;
27
28 import org.apache.commons.httpclient.Header;
29 import org.apache.commons.httpclient.HttpClient;
30 import org.apache.commons.httpclient.HttpMethod;
31 import org.apache.commons.httpclient.HttpVersion;
32 import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
33 import org.apache.commons.httpclient.URI;
34 import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
35 import org.apache.commons.httpclient.methods.DeleteMethod;
36 import org.apache.commons.httpclient.methods.GetMethod;
37 import org.apache.commons.httpclient.methods.HeadMethod;
38 import org.apache.commons.httpclient.methods.PostMethod;
39 import org.apache.commons.httpclient.methods.PutMethod;
40 import org.apache.commons.httpclient.params.HttpClientParams;
41 import org.apache.commons.httpclient.params.HttpConnectionManagerParams;
42 import org.apache.commons.logging.Log;
43 import org.apache.commons.logging.LogFactory;
44 import org.apache.hadoop.classification.InterfaceAudience;
45 import org.apache.hadoop.classification.InterfaceStability;
46
47
48
49
50
51 @InterfaceAudience.Public
52 @InterfaceStability.Stable
53 public class Client {
54 public static final Header[] EMPTY_HEADER_ARRAY = new Header[0];
55
56 private static final Log LOG = LogFactory.getLog(Client.class);
57
58 private HttpClient httpClient;
59 private Cluster cluster;
60
61 private Map<String, String> extraHeaders;
62
63
64
65
66 public Client() {
67 this(null);
68 }
69
70
71
72
73
74 public Client(Cluster cluster) {
75 this.cluster = cluster;
76 MultiThreadedHttpConnectionManager manager =
77 new MultiThreadedHttpConnectionManager();
78 HttpConnectionManagerParams managerParams = manager.getParams();
79 managerParams.setConnectionTimeout(2000);
80 managerParams.setDefaultMaxConnectionsPerHost(10);
81 managerParams.setMaxTotalConnections(100);
82 extraHeaders = new ConcurrentHashMap<String, String>();
83 this.httpClient = new HttpClient(manager);
84 HttpClientParams clientParams = httpClient.getParams();
85 clientParams.setVersion(HttpVersion.HTTP_1_1);
86 }
87
88
89
90
91 public void shutdown() {
92 MultiThreadedHttpConnectionManager manager =
93 (MultiThreadedHttpConnectionManager) httpClient.getHttpConnectionManager();
94 manager.shutdown();
95 }
96
97
98
99
100 public HttpClient getHttpClient() {
101 return httpClient;
102 }
103
104
105
106
107
108
109 public void addExtraHeader(final String name, final String value) {
110 extraHeaders.put(name, value);
111 }
112
113
114
115
116 public String getExtraHeader(final String name) {
117 return extraHeaders.get(name);
118 }
119
120
121
122
123 public Map<String, String> getExtraHeaders() {
124 return Collections.unmodifiableMap(extraHeaders);
125 }
126
127
128
129
130 public void removeExtraHeader(final String name) {
131 extraHeaders.remove(name);
132 }
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147 public int executePathOnly(Cluster cluster, HttpMethod method,
148 Header[] headers, String path) throws IOException {
149 IOException lastException;
150 if (cluster.nodes.size() < 1) {
151 throw new IOException("Cluster is empty");
152 }
153 int start = (int)Math.round((cluster.nodes.size() - 1) * Math.random());
154 int i = start;
155 do {
156 cluster.lastHost = cluster.nodes.get(i);
157 try {
158 StringBuilder sb = new StringBuilder();
159 sb.append("http://");
160 sb.append(cluster.lastHost);
161 sb.append(path);
162 URI uri = new URI(sb.toString(), true);
163 return executeURI(method, headers, uri.toString());
164 } catch (IOException e) {
165 lastException = e;
166 }
167 } while (++i != start && i < cluster.nodes.size());
168 throw lastException;
169 }
170
171
172
173
174
175
176
177
178
179 public int executeURI(HttpMethod method, Header[] headers, String uri)
180 throws IOException {
181 method.setURI(new URI(uri, true));
182 for (Map.Entry<String, String> e: extraHeaders.entrySet()) {
183 method.addRequestHeader(e.getKey(), e.getValue());
184 }
185 if (headers != null) {
186 for (Header header: headers) {
187 method.addRequestHeader(header);
188 }
189 }
190 long startTime = System.currentTimeMillis();
191 int code = httpClient.executeMethod(method);
192 long endTime = System.currentTimeMillis();
193 if (LOG.isDebugEnabled()) {
194 LOG.debug(method.getName() + " " + uri + " " + code + " " +
195 method.getStatusText() + " in " + (endTime - startTime) + " ms");
196 }
197 return code;
198 }
199
200
201
202
203
204
205
206
207
208
209
210
211 public int execute(Cluster cluster, HttpMethod method, Header[] headers,
212 String path) throws IOException {
213 if (path.startsWith("/")) {
214 return executePathOnly(cluster, method, headers, path);
215 }
216 return executeURI(method, headers, path);
217 }
218
219
220
221
222 public Cluster getCluster() {
223 return cluster;
224 }
225
226
227
228
229 public void setCluster(Cluster cluster) {
230 this.cluster = cluster;
231 }
232
233
234
235
236
237
238
239 public Response head(String path) throws IOException {
240 return head(cluster, path, null);
241 }
242
243
244
245
246
247
248
249
250
251 public Response head(Cluster cluster, String path, Header[] headers)
252 throws IOException {
253 HeadMethod method = new HeadMethod();
254 try {
255 int code = execute(cluster, method, null, path);
256 headers = method.getResponseHeaders();
257 return new Response(code, headers, null);
258 } finally {
259 method.releaseConnection();
260 }
261 }
262
263
264
265
266
267
268
269 public Response get(String path) throws IOException {
270 return get(cluster, path);
271 }
272
273
274
275
276
277
278
279
280 public Response get(Cluster cluster, String path) throws IOException {
281 return get(cluster, path, EMPTY_HEADER_ARRAY);
282 }
283
284
285
286
287
288
289
290
291 public Response get(String path, String accept) throws IOException {
292 return get(cluster, path, accept);
293 }
294
295
296
297
298
299
300
301
302
303 public Response get(Cluster cluster, String path, String accept)
304 throws IOException {
305 Header[] headers = new Header[1];
306 headers[0] = new Header("Accept", accept);
307 return get(cluster, path, headers);
308 }
309
310
311
312
313
314
315
316
317
318 public Response get(String path, Header[] headers) throws IOException {
319 return get(cluster, path, headers);
320 }
321
322
323
324
325
326
327
328
329
330 public Response get(Cluster c, String path, Header[] headers)
331 throws IOException {
332 GetMethod method = new GetMethod();
333 try {
334 int code = execute(c, method, headers, path);
335 headers = method.getResponseHeaders();
336 byte[] body = method.getResponseBody();
337 InputStream in = method.getResponseBodyAsStream();
338 return new Response(code, headers, body, in);
339 } finally {
340 method.releaseConnection();
341 }
342 }
343
344
345
346
347
348
349
350
351
352 public Response put(String path, String contentType, byte[] content)
353 throws IOException {
354 return put(cluster, path, contentType, content);
355 }
356
357
358
359
360
361
362
363
364
365
366 public Response put(Cluster cluster, String path, String contentType,
367 byte[] content) throws IOException {
368 Header[] headers = new Header[1];
369 headers[0] = new Header("Content-Type", contentType);
370 return put(cluster, path, headers, content);
371 }
372
373
374
375
376
377
378
379
380
381
382 public Response put(String path, Header[] headers, byte[] content)
383 throws IOException {
384 return put(cluster, path, headers, content);
385 }
386
387
388
389
390
391
392
393
394
395
396
397 public Response put(Cluster cluster, String path, Header[] headers,
398 byte[] content) throws IOException {
399 PutMethod method = new PutMethod();
400 try {
401 method.setRequestEntity(new ByteArrayRequestEntity(content));
402 int code = execute(cluster, method, headers, path);
403 headers = method.getResponseHeaders();
404 content = method.getResponseBody();
405 return new Response(code, headers, content);
406 } finally {
407 method.releaseConnection();
408 }
409 }
410
411
412
413
414
415
416
417
418
419 public Response post(String path, String contentType, byte[] content)
420 throws IOException {
421 return post(cluster, path, contentType, content);
422 }
423
424
425
426
427
428
429
430
431
432
433 public Response post(Cluster cluster, String path, String contentType,
434 byte[] content) throws IOException {
435 Header[] headers = new Header[1];
436 headers[0] = new Header("Content-Type", contentType);
437 return post(cluster, path, headers, content);
438 }
439
440
441
442
443
444
445
446
447
448
449 public Response post(String path, Header[] headers, byte[] content)
450 throws IOException {
451 return post(cluster, path, headers, content);
452 }
453
454
455
456
457
458
459
460
461
462
463
464 public Response post(Cluster cluster, String path, Header[] headers,
465 byte[] content) throws IOException {
466 PostMethod method = new PostMethod();
467 try {
468 method.setRequestEntity(new ByteArrayRequestEntity(content));
469 int code = execute(cluster, method, headers, path);
470 headers = method.getResponseHeaders();
471 content = method.getResponseBody();
472 return new Response(code, headers, content);
473 } finally {
474 method.releaseConnection();
475 }
476 }
477
478
479
480
481
482
483
484 public Response delete(String path) throws IOException {
485 return delete(cluster, path);
486 }
487
488
489
490
491
492
493
494
495 public Response delete(Cluster cluster, String path) throws IOException {
496 DeleteMethod method = new DeleteMethod();
497 try {
498 int code = execute(cluster, method, null, path);
499 Header[] headers = method.getResponseHeaders();
500 byte[] content = method.getResponseBody();
501 return new Response(code, headers, content);
502 } finally {
503 method.releaseConnection();
504 }
505 }
506 }