
HBase code reading notes 1 - put, part 1: main flow (based on 0.94.12)

Updated: 2014-01-05 02:56:39   Editor: Author_N1

 

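I have had some free time lately and have been reading the HBase source code. To reinforce my understanding and memory, I jotted down some notes here. This is a first attempt at organizing them, so both the content and the line of thought are somewhat scattered. (This post is complete.)
The code below is based on HBase 0.94.12.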
HTable
    @Override
    public void put(final List<Put> puts) throws IOException {
        for (Put put : puts) {
            doPut(put);
        }
        if (autoFlush) {
            flushCommits();
        }
    }

    private void doPut(Put put) throws IOException {
        validatePut(put);
        writeBuffer.add(put);
        currentWriteBufferSize += put.heapSize();
        if (currentWriteBufferSize > writeBufferSize) {
            flushCommits();
        }
    }

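This is where puts are executed. A batch put simply loops over the single-put operation; afterwards, if autoFlush is on, it calls flushCommits() once more.
A single put first validates the Put (simple enough that it is not covered here). It then appends the Put to a pending queue (writeBuffer) and adds its size to the client-side buffer total (currentWriteBufferSize). If that total exceeds the limit (2 MB by default, configured by hbase.client.write.buffer), the buffered puts are submitted to the server.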
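To make the buffering rule concrete, here is a small self-contained Java sketch. It is not HBase code: `WriteBufferSketch` is a hypothetical class that models each Put only by its heap size, and its flush simply empties the buffer as if every put had succeeded. It follows the same rule as doPut (accumulate sizes, flush once the running total strictly exceeds the limit) plus the final flush that put(List) performs when autoFlush is on.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for HTable's client-side write buffer (not HBase code).
// Each Put is modeled only by its heap size in bytes.
class WriteBufferSketch {
    private final long writeBufferSize;   // plays the role of hbase.client.write.buffer
    private final boolean autoFlush;
    private final List<Long> writeBuffer = new ArrayList<>();
    private long currentWriteBufferSize = 0;
    private int flushCount = 0;           // how many times the buffer was "sent"

    WriteBufferSketch(long writeBufferSize, boolean autoFlush) {
        this.writeBufferSize = writeBufferSize;
        this.autoFlush = autoFlush;
    }

    // Mirrors put(List<Put>): buffer each put, then flush at the end if autoFlush is on.
    void putAll(List<Long> heapSizes) {
        for (long size : heapSizes) {
            put(size);
        }
        if (autoFlush) {
            flushCommits();
        }
    }

    // Mirrors doPut(): queue the put, grow the size counter, flush once over the limit.
    void put(long heapSize) {
        writeBuffer.add(heapSize);
        currentWriteBufferSize += heapSize;
        if (currentWriteBufferSize > writeBufferSize) {
            flushCommits();
        }
    }

    // Simplification: pretend every buffered put succeeded, so everything is dropped.
    void flushCommits() {
        flushCount++;
        writeBuffer.clear();
        currentWriteBufferSize = 0;
    }

    int flushCount() { return flushCount; }
    long bufferedBytes() { return currentWriteBufferSize; }
    int bufferedPuts() { return writeBuffer.size(); }
}
```

With a 100-byte limit and autoFlush off, two 40-byte puts stay buffered (80 bytes); a third pushes the total to 120, which triggers a flush.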

    @Override
    public void flushCommits() throws IOException {
        try {
            Object[] results = new Object[writeBuffer.size()];
            try {
                this.connection.processBatch(writeBuffer, tableName, pool, results); // this is the key operation
            } catch (InterruptedException e) {
                throw new IOException(e);
            } finally {
                
                // Mutate the list so that it is empty on complete success, or contains
                // only the failed records. Results are returned in the same order as the
                // requests in the list, so walk the list backwards: that way we can remove
                // from the list without shifting the indexes of earlier members.
                // In short: drop the operations that have already succeeded.
                for (int i = results.length - 1; i >= 0; i--) {
                    if (results[i] instanceof Result) {
                        // successful Puts are removed from the list here.
                        writeBuffer.remove(i);
                    }
                }
            }
        } finally {
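            // In clearBufferOnFail mode, clear the pending queue and the size counter
            // whether or not the flush succeeded. By default the flag follows autoFlush
            // (setAutoFlush(boolean) sets both to the same value).
            if (clearBufferOnFail) {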
                writeBuffer.clear();
                currentWriteBufferSize = 0;
            } else {
                // the write buffer was adjusted by processBatchOfPuts
                currentWriteBufferSize = 0;
                for (Put aPut : writeBuffer) {
                    currentWriteBufferSize += aPut.heapSize();
                } // recompute the buffer size from the puts that remain
            }
        }
    }
    public void setAutoFlush(boolean autoFlush) {
        setAutoFlush(autoFlush, autoFlush);
    }
    public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
        this.autoFlush = autoFlush;
        this.clearBufferOnFail = autoFlush || clearBufferOnFail;
    }
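The backwards walk in flushCommits() and the flag logic in setAutoFlush() can be checked in isolation with the following sketch. It is not HBase code: `FlushCleanupSketch` and its nested `Result` marker class are hypothetical stand-ins (the marker replaces HBase's `Result`), and the buffer holds plain strings instead of Puts. Walking from the end means removing index i never shifts an index that has not been visited yet.

```java
import java.util.List;

// Illustrative sketch (not HBase code) of the cleanup in flushCommits().
// results[i] holds the outcome of writeBuffer.get(i); a Result instance marks success.
class FlushCleanupSketch {
    // Hypothetical marker for a successful put, standing in for HBase's Result.
    static final class Result {}

    // Walk backwards so that remove(i) never shifts indexes still to be visited.
    static <T> void removeSucceeded(List<T> writeBuffer, Object[] results) {
        for (int i = results.length - 1; i >= 0; i--) {
            if (results[i] instanceof Result) {
                writeBuffer.remove(i);   // remove(int index), not remove(Object)
            }
        }
    }

    // Mirrors setAutoFlush(autoFlush, clearBufferOnFail): autoFlush=true forces the flag on.
    static boolean effectiveClearBufferOnFail(boolean autoFlush, boolean clearBufferOnFail) {
        return autoFlush || clearBufferOnFail;
    }
}
```

Only the failed entries (here a null result and an exception) survive in the buffer, in their original order.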


   
HConnectionManager.HConnectionImplementation: the class that actually implements HTable's internal HConnection member. Below is the code for the "key operation" mentioned above.
public void processBatch(List<? extends Row> list,
                                 final byte[] tableName,
                                 ExecutorService pool,
                                 Object[] results) throws IOException, InterruptedException {
            // This belongs in HTable!!! Not in here.  St.Ack

            // results must be the same size as list
            if (results.length != list.size()) {
                throw new IllegalArgumentException("argument results must be the same size as argument list");
            }

            processBatchCallback(list, tableName, pool, results, null);
        }

   
public <R> void processBatchCallback(
                List<? extends Row> list, // the list of put operations
                byte[] tableName, // table name
                ExecutorService pool, // HTable's ExecutorService member
                Object[] results, // the returned results
                Batch.Callback<R> callback) // callback; processBatch passes null
                throws IOException, InterruptedException {
            // This belongs in HTable!!! Not in here.  St.Ack

            // results must be the same size as list
            if (results.length != list.size()) {
                throw new IllegalArgumentException(
                        "argument results must be the same size as argument list");
            }
            if (list.isEmpty()) {
                return;
            }

            // Keep track of the most recent server for each item for better
            // exception reporting. We keep HRegionLocation to save on parsing;
            // later, when lastServers is used, we pull what we need straight
            // from it.
            HRegionLocation[] lastServers = new HRegionLocation[results.length];
            List<Row> workingList = new ArrayList<Row>(list);
            boolean retry = true;
            // count that helps presize actions array
            int actionCount = 0;
            // numRetries bounds the number of attempts; it is configured by
            // hbase.client.retries.number (default 10)
            for (int tries = 0; tries < numRetries && retry; ++tries) {

                // sleep first, if this is a retry
                if (tries >= 1) {
                    long sleepTime = ConnectionUtils.getPauseTime(this.pause, tries);
                    LOG.debug("Retry " + tries + ", sleep for " + sleepTime + "ms!");
                    Thread.sleep(sleepTime);
                }
                // The loop body is long, so each step is pulled out and read separately below
                // step 1: break up into regionserver-sized chunks and build the data structs
                

                // step 2: make the requests

                

                // step 3: collect the failures and successes and prepare for retry

                

                // step 4: identify failures and prep for a retry (if applicable).

                
            }
            // After success, or once the retries are used up, check whether any tasks still failed
            List<Throwable> exceptions = new ArrayList<Throwable>(actionCount);
            List<Row> actions = new ArrayList<Row>(actionCount);
            List<String> addresses = new ArrayList<String>(actionCount);

            for (int i = 0; i < results.length; i++) {
                if (results[i] == null || results[i] instanceof Throwable) {
                    exceptions.add((Throwable) results[i]);
                    actions.add(list.get(i));
                    addresses.add(lastServers[i].getHostnamePort());
                }
            }

            if (!exceptions.isEmpty()) {
                throw new RetriesExhaustedWithDetailsException(exceptions,
                        actions,
                        addresses);
            }
        }
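The outer structure of processBatchCallback (make an attempt, sleep before each retry, then report whatever is still unfinished) can be sketched as below. This is not HBase code, and several things are assumptions: the backoff multipliers are illustrative placeholders rather than the values ConnectionUtils.getPauseTime uses in 0.94; `Attempt` is a hypothetical callback that fills results[]; and to stay short, the sketch keeps retrying while any slot is still null and ignores the DoNotRetryIOException rule from step 4. An interrupt ends the loop early instead of propagating InterruptedException.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch (not HBase code) of the processBatchCallback retry skeleton.
class RetryLoopSketch {
    // ASSUMPTION: illustrative backoff multipliers; HBase has its own table,
    // whose values may differ from these.
    static final int[] BACKOFF = {1, 1, 1, 2, 2, 4, 4, 8, 16, 32};

    // Pause before retry number `tries`; past the end of the table the last multiplier is reused.
    static long pauseTime(long pause, int tries) {
        int idx = Math.min(tries, BACKOFF.length - 1);
        return pause * BACKOFF[idx];
    }

    // Hypothetical attempt: fills results[i] for whichever items it manages to process.
    interface Attempt {
        void run(int tries, Object[] results);
    }

    // Runs up to numRetries attempts; returns the indexes that end with no result or a Throwable.
    static List<Integer> run(int size, int numRetries, long pause, Attempt attempt) {
        Object[] results = new Object[size];
        boolean retry = true;
        for (int tries = 0; tries < numRetries && retry; ++tries) {
            if (tries >= 1) {               // sleep first if this is a retry
                try {
                    Thread.sleep(pauseTime(pause, tries));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;                  // stop retrying; leftovers are reported below
                }
            }
            attempt.run(tries, results);
            retry = false;
            for (Object r : results) {      // simplified: retry while any slot is still empty
                if (r == null) {
                    retry = true;
                    break;
                }
            }
        }
        List<Integer> failed = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            if (results[i] == null || results[i] instanceof Throwable) {
                failed.add(i);
            }
        }
        return failed;
    }
}
```

If the first attempt fills slot 0 and the second fills slots 1 and 2 (slot 2 with an exception), no slot is empty after the second attempt, so the loop stops and index 2 is reported, mirroring how the real method gathers leftovers into RetriesExhaustedWithDetailsException.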

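Step 1: group the requests by region and package them. One important operation here is locating the region and region server (RS) that each rowkey belongs to; for details see http://dennis-lee-gammy.iteye.com/admin/blogs/1972477
         // A MultiAction is effectively a collection of Actions (in fact it is an ordered collection),
         // and an Action wraps a Get, Put, Delete, etc.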
         Map<HRegionLocation, MultiAction<R>> actionsByServer =
                        new HashMap<HRegionLocation, MultiAction<R>>();
                for (int i = 0; i < workingList.size(); i++) {
                    Row row = workingList.get(i);
                    if (row != null) {
                        HRegionLocation loc = locateRegion(tableName, row.getRow());
                        // Locate the region for each rowkey. This is the heavy part; for details see
                        // http://dance-lee-163-com.iteye.com/admin/blogs/1972477
                        byte[] regionName = loc.getRegionInfo().getRegionName();

                        MultiAction<R> actions = actionsByServer.get(loc);
                        if (actions == null) {
                            actions = new MultiAction<R>();
                            actionsByServer.put(loc, actions);
                        } // keep the mapping from region location to its action list

                        Action<R> action = new Action<R>(row, i);
                        lastServers[i] = loc;
                        actions.add(regionName, action);
                        // added under its region name (MultiAction maps region name -> list of actions)
                    }
                }
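Step 1 can be modeled as a two-level grouping. In this sketch (not HBase code) the outer key is a server address and the inner key a region name, on the assumption, suggested by the name actionsByServer, that all regions hosted on one server share one MultiAction. `Location` and the `locate` function are hypothetical stand-ins for HRegionLocation and locateRegion. Each row is recorded by its original index, as `new Action<R>(row, i)` does, and null rows are skipped just like the null placeholders step 4 leaves in workingList.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Illustrative sketch (not HBase code) of step 1: group row indexes by server, then by region.
class GroupByServerSketch {
    // Hypothetical minimal location: server address plus region name.
    static final class Location {
        final String server;
        final String region;
        Location(String server, String region) { this.server = server; this.region = region; }
    }

    static Map<String, Map<String, List<Integer>>> group(List<String> rows,
                                                         Function<String, Location> locate) {
        Map<String, Map<String, List<Integer>>> byServer = new LinkedHashMap<>();
        for (int i = 0; i < rows.size(); i++) {
            String row = rows.get(i);
            if (row == null) {
                continue;                    // placeholder for a row that needs no retry
            }
            Location loc = locate.apply(row);
            byServer.computeIfAbsent(loc.server, k -> new LinkedHashMap<>())
                    .computeIfAbsent(loc.region, k -> new ArrayList<>())
                    .add(i);                 // keep the original index, like new Action<R>(row, i)
        }
        return byServer;
    }
}
```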

Step 2: send the requests.
Map<HRegionLocation, Future<MultiResponse>> futures =
                        new HashMap<HRegionLocation, Future<MultiResponse>>(
                                actionsByServer.size());

                for (Entry<HRegionLocation, MultiAction<R>> e : actionsByServer.entrySet()) {
                    futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
                    // Straightforward: create one request task per server and submit it
                    // to HTable's thread pool. createCallable is covered later.
                }
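Step 2 is the standard submit-then-wait pattern from java.util.concurrent. The sketch below (not HBase code) submits one task per server and later waits on every Future; `rpc` is a hypothetical function standing in for server.multi(multi). As in step 3, a task that threw (surfacing as an ExecutionException) is recorded as a null response, meaning the entire server failed, instead of aborting the whole batch.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;

// Illustrative sketch (not HBase code) of steps 2 and 3: one task per server, one Future per server.
class SubmitPerServerSketch {
    static Map<String, Future<Integer>> submitAll(Map<String, List<Integer>> actionsByServer,
                                                  ExecutorService pool,
                                                  Function<List<Integer>, Integer> rpc) {
        Map<String, Future<Integer>> futures = new LinkedHashMap<>();
        for (Map.Entry<String, List<Integer>> e : actionsByServer.entrySet()) {
            final List<Integer> batch = e.getValue();
            Callable<Integer> task = () -> rpc.apply(batch);   // hypothetical "multi" RPC
            futures.put(e.getKey(), pool.submit(task));
        }
        return futures;
    }

    // Wait for every Future; a failed server maps to null ("entire server failed").
    static Map<String, Integer> awaitAll(Map<String, Future<Integer>> futures) {
        Map<String, Integer> out = new LinkedHashMap<>();
        for (Map.Entry<String, Future<Integer>> e : futures.entrySet()) {
            try {
                out.put(e.getKey(), e.getValue().get());
            } catch (ExecutionException ex) {
                out.put(e.getKey(), null);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                out.put(e.getKey(), null);
            }
        }
        return out;
    }
}
```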

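Step 3: collect the execution results.
MultiResponse holds a map from regionName to a list of responses. Each element of that list is a Pair: first is the index of the corresponding action in the original action list, and second is the actual result.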
for (Entry<HRegionLocation, Future<MultiResponse>> responsePerServer
                        : futures.entrySet()) {
                    HRegionLocation loc = responsePerServer.getKey();

                    try {
                        Future<MultiResponse> future = responsePerServer.getValue();
                        MultiResponse resp = future.get();

                        if (resp == null) {
                            // Entire server failed
                            LOG.debug("Failed all for server: " + loc.getHostnamePort() +
                                    ", removing from cache");
                            continue;
                        }

                        for (Entry<byte[], List<Pair<Integer, Object>>> e : resp.getResults().entrySet()) {
                            byte[] regionName = e.getKey();
                            List<Pair<Integer, Object>> regionResults = e.getValue();
                            for (Pair<Integer, Object> regionResult : regionResults) {
                                if (regionResult == null) {
                                    // if the first/only record is 'null' the entire region failed.
                                    LOG.debug("Failures for region: " +
                                            Bytes.toStringBinary(regionName) +
                                            ", removing from cache");
                                } else {
                                    // Result might be an Exception, including DNRIOE
                                    results[regionResult.getFirst()] = regionResult.getSecond();
                                    if (callback != null && !(regionResult.getSecond() instanceof Throwable)) {
                                        callback.update(e.getKey(),
                                                list.get(regionResult.getFirst()).getRow(),
                                                (R) regionResult.getSecond());
                                    }
                                }
                            }
                        }
                    } catch (ExecutionException e) {
                        LOG.warn("Failed all from " + loc, e);
                    }
                }
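The bookkeeping in step 3 boils down to copying each outcome into results[] at its original index. The sketch below is not HBase code: it uses java.util.Map.Entry (key = original index, value = outcome) in place of HBase's Pair, a String region name in place of byte[], and leaves out the optional callback. A null pair, or a null response for the whole server, leaves the affected slots untouched, so step 4 will pick them up for a retry.

```java
import java.util.List;
import java.util.Map;

// Illustrative sketch (not HBase code) of step 3: scatter per-region outcomes back into results[].
class CollectResultsSketch {
    // response models MultiResponse: region name -> list of (original index, outcome) pairs.
    static void collect(Map<String, List<Map.Entry<Integer, Object>>> response, Object[] results) {
        if (response == null) {
            return;                           // entire server failed: nothing to record
        }
        for (Map.Entry<String, List<Map.Entry<Integer, Object>>> e : response.entrySet()) {
            for (Map.Entry<Integer, Object> pair : e.getValue()) {
                if (pair == null) {
                    continue;                 // whole region failed; its slots stay as they were
                }
                results[pair.getKey()] = pair.getValue();   // may be a Throwable
            }
        }
    }
}
```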

Step 4: handle the failures in one place.
// Find failures (i.e. null Result), and add them to the workingList (in
                // order), so they can be retried.
                retry = false;
                workingList.clear();
                actionCount = 0;
                for (int i = 0; i < results.length; i++) {
                    // if null (fail) or instanceof Throwable && not instanceof DNRIOE
                    // then retry that row. else dont.
                    if (results[i] == null ||
                            (results[i] instanceof Throwable &&
                                    !(results[i] instanceof DoNotRetryIOException))) {
                        // If the result is null, or is an exception other than
                        // DoNotRetryIOException, put the request back into the working
                        // list and drop the cached region location for that row.
                        retry = true;
                        actionCount++;
                        Row row = list.get(i);
                        workingList.add(row);
                        deleteCachedLocation(tableName, row.getRow());
                    } else {
                        if (results[i] != null && results[i] instanceof Throwable) {
                            actionCount++;
                        }
                        // add null to workingList, so the order remains consistent with the original list argument.
                        workingList.add(null);
                    }
                }
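The retry rule in step 4 is small enough to isolate. In this sketch (not HBase code) `DoNotRetry` is a hypothetical local class standing in for HBase's DoNotRetryIOException, and the deleteCachedLocation() call is omitted. The null placeholders keep each working-list index aligned with its slot in results[], which is why step 1 skips null rows.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch (not HBase code) of step 4: rebuild the working list for the next attempt.
class RetryDecisionSketch {
    // Hypothetical stand-in for DoNotRetryIOException.
    static class DoNotRetry extends IOException {
        DoNotRetry(String msg) { super(msg); }
    }

    // Retry if there is no outcome yet, or the outcome is a Throwable other than DoNotRetry.
    static boolean shouldRetry(Object result) {
        return result == null
                || (result instanceof Throwable && !(result instanceof DoNotRetry));
    }

    // Keep the original order; finished rows become null so indexes still match results[].
    static <T> List<T> nextWorkingList(List<T> original, Object[] results) {
        List<T> working = new ArrayList<>(original.size());
        for (int i = 0; i < results.length; i++) {
            working.add(shouldRetry(results[i]) ? original.get(i) : null);
        }
        return working;
    }
}
```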


Finally, the method that builds the request. Its call method only creates a ServerCallable object and then calls that object's withoutRetries method.
      private <R> Callable<MultiResponse> createCallable(
                         final HRegionLocation loc,
                         final MultiAction<R> multi, final byte[] tableName) {
            // TODO: This does not belong in here!!! St.Ack  HConnections should
            // not be dealing in Callables; Callables have HConnections, not other
            // way around.
            final HConnection connection = this;
            return new Callable<MultiResponse>() {
                public MultiResponse call() throws IOException {
                    ServerCallable<MultiResponse> callable =
                            new ServerCallable<MultiResponse>(connection, tableName, null) {
                                // look here: this is the actual call to the region server
                                public MultiResponse call() throws IOException {
                                    return server.multi(multi);
                                }

                                @Override
                                public void connect(boolean reload) throws IOException {
                                    server = connection.getHRegionConnection(loc.getHostname(), loc.getPort());
                                }
                            };
                    return callable.withoutRetries();
                }
            };
        }
        
        public T withoutRetries()
            throws IOException, RuntimeException {
        globalStartTime = EnvironmentEdgeManager.currentTimeMillis();
        try {
            beforeCall(); // record and compute timing
            connect(false);
            return call();
        } catch (Throwable t) {
            Throwable t2 = translateException(t);
            if (t2 instanceof IOException) {
                throw (IOException) t2;
            } else {
                throw new RuntimeException(t2);
            }
        } finally {
            afterCall(); // record and compute timing
        }
    }
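withoutRetries() follows a common one-shot pattern: time the call, make a single attempt, rethrow IOExceptions unchanged, and wrap anything else in a RuntimeException. The sketch below (not HBase code) shows that pattern with a java.util.concurrent.Callable standing in for connect(false) plus call(); it skips translateException() and uses System.currentTimeMillis() instead of EnvironmentEdgeManager.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Illustrative sketch (not HBase code) of the single-attempt pattern in withoutRetries().
class WithoutRetriesSketch {
    long startMillis;
    long elapsedMillis = -1;

    <T> T withoutRetries(Callable<T> call) throws IOException {
        startMillis = System.currentTimeMillis();      // like beforeCall()
        try {
            return call.call();                        // connect(false) + call() in HBase
        } catch (Throwable t) {
            // HBase first passes t through translateException(); this sketch does not.
            if (t instanceof IOException) {
                throw (IOException) t;
            }
            throw new RuntimeException(t);
        } finally {
            elapsedMillis = System.currentTimeMillis() - startMillis;   // like afterCall()
        }
    }
}
```

Because there is no retry inside, any retrying is left to the caller, which here is the loop in processBatchCallback.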