1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.thrift2;
20
21 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.*;
22 import static org.apache.thrift.TBaseHelper.byteBufferToByteArray;
23
24 import java.io.IOException;
25 import java.lang.reflect.InvocationHandler;
26 import java.lang.reflect.InvocationTargetException;
27 import java.lang.reflect.Method;
28 import java.lang.reflect.Proxy;
29 import java.nio.ByteBuffer;
30 import java.util.Collections;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.concurrent.Callable;
34 import java.util.concurrent.ConcurrentHashMap;
35 import java.util.concurrent.ExecutionException;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicInteger;
38
39 import org.apache.commons.logging.Log;
40 import org.apache.commons.logging.LogFactory;
41 import org.apache.hadoop.classification.InterfaceAudience;
42 import org.apache.hadoop.conf.Configuration;
43 import org.apache.hadoop.hbase.client.HTableFactory;
44 import org.apache.hadoop.hbase.client.HTableInterface;
45 import org.apache.hadoop.hbase.client.HTablePool;
46 import org.apache.hadoop.hbase.client.ResultScanner;
47 import org.apache.hadoop.hbase.security.UserProvider;
48 import org.apache.hadoop.hbase.thrift.ThriftMetrics;
49 import org.apache.hadoop.hbase.thrift2.generated.*;
50 import org.apache.hadoop.hbase.util.Bytes;
51 import org.apache.hadoop.hbase.util.ConnectionCache;
52 import org.apache.thrift.TException;
53
54 import com.google.common.cache.Cache;
55 import com.google.common.cache.CacheBuilder;
56
57
58
59
60
61 @InterfaceAudience.Private
62 @SuppressWarnings("deprecation")
63 public class ThriftHBaseServiceHandler implements THBaseService.Iface {
64
65
66 private final Cache<String, HTablePool> htablePools;
67 private final Callable<? extends HTablePool> htablePoolCreater;
68 private static final Log LOG = LogFactory.getLog(ThriftHBaseServiceHandler.class);
69
70
71
72 private final AtomicInteger nextScannerId = new AtomicInteger(0);
73 private final Map<Integer, ResultScanner> scannerMap =
74 new ConcurrentHashMap<Integer, ResultScanner>();
75
76 private final ConnectionCache connectionCache;
77 private final HTableFactory tableFactory;
78 private final int maxPoolSize;
79
80 static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
81 static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
82
83 public static THBaseService.Iface newInstance(
84 THBaseService.Iface handler, ThriftMetrics metrics) {
85 return (THBaseService.Iface) Proxy.newProxyInstance(handler.getClass().getClassLoader(),
86 new Class[] { THBaseService.Iface.class }, new THBaseServiceMetricsProxy(handler, metrics));
87 }
88
89 private static class THBaseServiceMetricsProxy implements InvocationHandler {
90 private final THBaseService.Iface handler;
91 private final ThriftMetrics metrics;
92
93 private THBaseServiceMetricsProxy(THBaseService.Iface handler, ThriftMetrics metrics) {
94 this.handler = handler;
95 this.metrics = metrics;
96 }
97
98 @Override
99 public Object invoke(Object proxy, Method m, Object[] args) throws Throwable {
100 Object result;
101 try {
102 long start = now();
103 result = m.invoke(handler, args);
104 int processTime = (int) (now() - start);
105 metrics.incMethodTime(m.getName(), processTime);
106 } catch (InvocationTargetException e) {
107 throw e.getTargetException();
108 } catch (Exception e) {
109 throw new RuntimeException("unexpected invocation exception: " + e.getMessage());
110 }
111 return result;
112 }
113 }
114
115 private static long now() {
116 return System.nanoTime();
117 }
118
119 ThriftHBaseServiceHandler(final Configuration conf,
120 final UserProvider userProvider) throws IOException {
121 int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
122 int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
123 connectionCache = new ConnectionCache(
124 conf, userProvider, cleanInterval, maxIdleTime);
125 tableFactory = new HTableFactory() {
126 @Override
127 public HTableInterface createHTableInterface(Configuration config,
128 byte[] tableName) {
129 try {
130 return connectionCache.getTable(Bytes.toString(tableName));
131 } catch (IOException ioe) {
132 throw new RuntimeException(ioe);
133 }
134 }
135 };
136 htablePools = CacheBuilder.newBuilder().expireAfterAccess(
137 maxIdleTime, TimeUnit.MILLISECONDS).softValues().concurrencyLevel(4).build();
138 maxPoolSize = conf.getInt("hbase.thrift.htablepool.size.max", 1000);
139 htablePoolCreater = new Callable<HTablePool>() {
140 public HTablePool call() {
141 return new HTablePool(conf, maxPoolSize, tableFactory);
142 }
143 };
144 }
145
146 private HTableInterface getTable(ByteBuffer tableName) {
147 String currentUser = connectionCache.getEffectiveUser();
148 try {
149 HTablePool htablePool = htablePools.get(currentUser, htablePoolCreater);
150 return htablePool.getTable(byteBufferToByteArray(tableName));
151 } catch (ExecutionException ee) {
152 throw new RuntimeException(ee);
153 }
154 }
155
156 private void closeTable(HTableInterface table) throws TIOError {
157 try {
158 table.close();
159 } catch (IOException e) {
160 throw getTIOError(e);
161 }
162 }
163
164 private TIOError getTIOError(IOException e) {
165 TIOError err = new TIOError();
166 err.setMessage(e.getMessage());
167 return err;
168 }
169
170
171
172
173
174
175 private int addScanner(ResultScanner scanner) {
176 int id = nextScannerId.getAndIncrement();
177 scannerMap.put(id, scanner);
178 return id;
179 }
180
181
182
183
184
185
186 private ResultScanner getScanner(int id) {
187 return scannerMap.get(id);
188 }
189
190 void setEffectiveUser(String effectiveUser) {
191 connectionCache.setEffectiveUser(effectiveUser);
192 }
193
194
195
196
197
198
199 protected ResultScanner removeScanner(int id) {
200 return scannerMap.remove(id);
201 }
202
203 @Override
204 public boolean exists(ByteBuffer table, TGet get) throws TIOError, TException {
205 HTableInterface htable = getTable(table);
206 try {
207 return htable.exists(getFromThrift(get));
208 } catch (IOException e) {
209 throw getTIOError(e);
210 } finally {
211 closeTable(htable);
212 }
213 }
214
215 @Override
216 public TResult get(ByteBuffer table, TGet get) throws TIOError, TException {
217 HTableInterface htable = getTable(table);
218 try {
219 return resultFromHBase(htable.get(getFromThrift(get)));
220 } catch (IOException e) {
221 throw getTIOError(e);
222 } finally {
223 closeTable(htable);
224 }
225 }
226
227 @Override
228 public List<TResult> getMultiple(ByteBuffer table, List<TGet> gets) throws TIOError, TException {
229 HTableInterface htable = getTable(table);
230 try {
231 return resultsFromHBase(htable.get(getsFromThrift(gets)));
232 } catch (IOException e) {
233 throw getTIOError(e);
234 } finally {
235 closeTable(htable);
236 }
237 }
238
239 @Override
240 public void put(ByteBuffer table, TPut put) throws TIOError, TException {
241 HTableInterface htable = getTable(table);
242 try {
243 htable.put(putFromThrift(put));
244 } catch (IOException e) {
245 throw getTIOError(e);
246 } finally {
247 closeTable(htable);
248 }
249 }
250
251 @Override
252 public boolean checkAndPut(ByteBuffer table, ByteBuffer row, ByteBuffer family,
253 ByteBuffer qualifier, ByteBuffer value, TPut put) throws TIOError, TException {
254 HTableInterface htable = getTable(table);
255 try {
256 return htable.checkAndPut(byteBufferToByteArray(row), byteBufferToByteArray(family),
257 byteBufferToByteArray(qualifier), (value == null) ? null : byteBufferToByteArray(value),
258 putFromThrift(put));
259 } catch (IOException e) {
260 throw getTIOError(e);
261 } finally {
262 closeTable(htable);
263 }
264 }
265
266 @Override
267 public void putMultiple(ByteBuffer table, List<TPut> puts) throws TIOError, TException {
268 HTableInterface htable = getTable(table);
269 try {
270 htable.put(putsFromThrift(puts));
271 } catch (IOException e) {
272 throw getTIOError(e);
273 } finally {
274 closeTable(htable);
275 }
276 }
277
278 @Override
279 public void deleteSingle(ByteBuffer table, TDelete deleteSingle) throws TIOError, TException {
280 HTableInterface htable = getTable(table);
281 try {
282 htable.delete(deleteFromThrift(deleteSingle));
283 } catch (IOException e) {
284 throw getTIOError(e);
285 } finally {
286 closeTable(htable);
287 }
288 }
289
290 @Override
291 public List<TDelete> deleteMultiple(ByteBuffer table, List<TDelete> deletes) throws TIOError,
292 TException {
293 HTableInterface htable = getTable(table);
294 try {
295 htable.delete(deletesFromThrift(deletes));
296 } catch (IOException e) {
297 throw getTIOError(e);
298 } finally {
299 closeTable(htable);
300 }
301 return Collections.emptyList();
302 }
303
304 @Override
305 public boolean checkAndDelete(ByteBuffer table, ByteBuffer row, ByteBuffer family,
306 ByteBuffer qualifier, ByteBuffer value, TDelete deleteSingle) throws TIOError, TException {
307 HTableInterface htable = getTable(table);
308
309 try {
310 if (value == null) {
311 return htable.checkAndDelete(byteBufferToByteArray(row), byteBufferToByteArray(family),
312 byteBufferToByteArray(qualifier), null, deleteFromThrift(deleteSingle));
313 } else {
314 return htable.checkAndDelete(byteBufferToByteArray(row), byteBufferToByteArray(family),
315 byteBufferToByteArray(qualifier), byteBufferToByteArray(value),
316 deleteFromThrift(deleteSingle));
317 }
318 } catch (IOException e) {
319 throw getTIOError(e);
320 } finally {
321 closeTable(htable);
322 }
323 }
324
325 @Override
326 public TResult increment(ByteBuffer table, TIncrement increment) throws TIOError, TException {
327 HTableInterface htable = getTable(table);
328 try {
329 return resultFromHBase(htable.increment(incrementFromThrift(increment)));
330 } catch (IOException e) {
331 throw getTIOError(e);
332 } finally {
333 closeTable(htable);
334 }
335 }
336
337 @Override
338 public TResult append(ByteBuffer table, TAppend append) throws TIOError, TException {
339 HTableInterface htable = getTable(table);
340 try {
341 return resultFromHBase(htable.append(appendFromThrift(append)));
342 } catch (IOException e) {
343 throw getTIOError(e);
344 } finally {
345 closeTable(htable);
346 }
347 }
348
349 @Override
350 public int openScanner(ByteBuffer table, TScan scan) throws TIOError, TException {
351 HTableInterface htable = getTable(table);
352 ResultScanner resultScanner = null;
353 try {
354 resultScanner = htable.getScanner(scanFromThrift(scan));
355 } catch (IOException e) {
356 throw getTIOError(e);
357 } finally {
358 closeTable(htable);
359 }
360 return addScanner(resultScanner);
361 }
362
363 @Override
364 public List<TResult> getScannerRows(int scannerId, int numRows) throws TIOError,
365 TIllegalArgument, TException {
366 ResultScanner scanner = getScanner(scannerId);
367 if (scanner == null) {
368 TIllegalArgument ex = new TIllegalArgument();
369 ex.setMessage("Invalid scanner Id");
370 throw ex;
371 }
372
373 try {
374 return resultsFromHBase(scanner.next(numRows));
375 } catch (IOException e) {
376 throw getTIOError(e);
377 }
378 }
379
380 @Override
381 public List<TResult> getScannerResults(ByteBuffer table, TScan scan, int numRows)
382 throws TIOError, TException {
383 HTableInterface htable = getTable(table);
384 List<TResult> results = null;
385 ResultScanner scanner = null;
386 try {
387 scanner = htable.getScanner(scanFromThrift(scan));
388 results = resultsFromHBase(scanner.next(numRows));
389 } catch (IOException e) {
390 throw getTIOError(e);
391 } finally {
392 if (scanner != null) {
393 scanner.close();
394 }
395 closeTable(htable);
396 }
397 return results;
398 }
399
400 @Override
401 public void closeScanner(int scannerId) throws TIOError, TIllegalArgument, TException {
402 LOG.debug("scannerClose: id=" + scannerId);
403 ResultScanner scanner = getScanner(scannerId);
404 if (scanner == null) {
405 String message = "scanner ID is invalid";
406 LOG.warn(message);
407 TIllegalArgument ex = new TIllegalArgument();
408 ex.setMessage("Invalid scanner Id");
409 throw ex;
410 }
411 scanner.close();
412 removeScanner(scannerId);
413 }
414
415 @Override
416 public void mutateRow(ByteBuffer table, TRowMutations rowMutations) throws TIOError, TException {
417 HTableInterface htable = getTable(table);
418 try {
419 htable.mutateRow(rowMutationsFromThrift(rowMutations));
420 } catch (IOException e) {
421 throw getTIOError(e);
422 } finally {
423 closeTable(htable);
424 }
425 }
426
427 }