欢迎来到Doc100.Net免费学习资源知识分享平台!
您的位置:首页 > 程序异常 >

hbase 代码阅览笔记-1 - put-2-定位rs和region(基于0.94.12)

更新时间: 2014-01-05 02:56:35 责任编辑: Author_N1

 

HBASE 代码阅读笔记-1 - PUT-2-定位RS和REGION(基于0.94.12)
上一篇http://dennis-lee-gammy.iteye.com/admin/blogs/1972269把put操作的客户端主流程捋了一遍,但是很多比较重要的核心代码还未涉及,会在这一篇或者以后的文章中解析(本篇已完结)

先接着看
HConnectionManager.HConnectionImplementation.processBatchCallback方法吧,主要是完成如何根据tableName和RowKey定位到RS和具体的R
        public HRegionLocation locateRegion(final byte[] tableName,
                                            final byte[] row)
                throws IOException {
            return locateRegion(tableName, row, true, true);
        }
        public HRegionLocation relocateRegion(final byte[] tableName,
                                              final byte[] row)
                throws IOException {

            // Since this is an explicit request not to use any caching, finding
            // disabled tables should not be desirable.  This will ensure that an exception is thrown when
            // the first time a disabled table is interacted with.
            if (isTableDisabled(tableName)) {
                throw new DoNotRetryIOException(Bytes.toString(tableName) + " is disabled.");
            }

            return locateRegion(tableName, row, false, true);
        }

        //这里mark一下,标记为【0】
        private HRegionLocation locateRegion(final byte[] tableName,
                                             final byte[] row, boolean useCache, boolean retry)
                throws IOException {
            // 判断链接已经关闭以及tableName是否为空
            if (this.closed) throw new IOException(toString() + " closed");
            if (tableName == null || tableName.length == 0) {
                throw new IllegalArgumentException(
                        "table name cannot be null or zero length");
            }
            ensureZookeeperTrackers();//MY TODO
            // 如果是查询 -ROOT- 表,会有独立的逻辑进行处理,继续mark为【1】
            if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
                try {
                    ServerName servername = this.rootRegionTracker.waitRootRegionLocation(this.rpcTimeout);
                    LOG.debug("Looked up root region location, connection=" + this +
                            "; serverName=" + ((servername == null) ? "" : servername.toString()));
                    if (servername == null) return null;
                    return new HRegionLocation(HRegionInfo.ROOT_REGIONINFO,
                            servername.getHostname(), servername.getPort());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return null;
                }
            } 
            // 下面是处理.META.和用户表,流程是完全一致的,通过父表查子表
            // 这是一个向上递归的过程,因为首先进入的是用户表,这里mark为【2】
            //,进入mark【4】
            else if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
                return locateRegionInMeta(HConstants.ROOT_TABLE_NAME, tableName, row,
                        useCache, metaRegionLock, retry);
            } else {
                // Region not in the cache - have to go to the meta RS
                // 这里mark为【3】,进入mark【4】
                return locateRegionInMeta(HConstants.META_TABLE_NAME, tableName, row,
                        useCache, userRegionLock, retry);
            }
        } 



-ROOT- 表的定位先放一放,咱们就按实际的处理顺序来看看代码。像幼儿园的小学生一样掰着手指头捋一遍。
首先,进入流程的是用户表,locateRegionInMeta方法负责读取用户表和.META.表的信息。
流程进入mark0,然后判断为非ROOT表和META表,进入mark3

         /*
          * Search one of the meta tables (-ROOT- or .META.) for the HRegionLocation
          * info that contains the table and row we're seeking.
          */
        private HRegionLocation locateRegionInMeta(final byte[] parentTable,
                                                   final byte[] tableName, final byte[] row, boolean useCache,
                                                   Object regionLockObject, boolean retry)
                throws IOException {
            HRegionLocation location;
            // If we are supposed to be using the cache, look in the cache to see if
            // we already have the region.
            // 如果我们已经有了缓存,则直接从缓存中查询,当然,缓存是会失效的
            // 比如遇到region的分割、合并、balance,甚至RS当机
            if (useCache) { // 这里mark为【4】
                location = getCachedLocation(tableName, row);
                if (location != null) {
                    return location;
                }
            }

            int localNumRetries = retry ? numRetries : 1;
            // build the key of the meta region we should be looking for.
            // the extra 9's on the end are necessary to allow "exact" matches
            // without knowing the precise region names.
            // 创建一个key来查询region的meta信息,
            // 用9999999999做id是用于在不知道精确的regionname
            // 的情况下来定位region(目测暂时不知道为什么这么多9可行,请高手提示)
            byte[] metaKey = HRegionInfo.createRegionName(tableName, row,
                    HConstants.NINES, false);
            for (int tries = 0; true; tries++) {
                if (tries >= localNumRetries) {
                    throw new NoServerForRegionException("Unable to find region for "
                            + Bytes.toStringBinary(row) + " after " + numRetries + " tries.");
                }
                // 重试多次仍然无法获取数据则抛出异常
                HRegionLocation metaLocation = null;
                try {
                    // locate the root or meta region
                    // 定位父表(ROOT或者META,回到mark【0】)
                    metaLocation = locateRegion(parentTable, metaKey, true, false);
                    // If null still, go around again.没拿到信息,重试
                    if (metaLocation == null) continue;
                    // 拿到信息,来一个远程调用吧(备注【i】)
                    HRegionInterface server =
                            getHRegionConnection(metaLocation.getHostname(), metaLocation.getPort());

                    Result regionInfoRow = null;
                    // This block guards against two threads trying to load the meta
                    // region at the same time. The first will load the meta region and
                    // the second will use the value that the first one found.

                    synchronized (regionLockObject) {
                        // Check the cache again for a hit in case some other thread made the
                        // same query while we were waiting on the lock.
                        // 类似于一个单例模式的双锁检查
                        // 如果我们在等锁的过程中,已经有其他
                        // 的线程完成了定位工作并将其放入缓存,那么直接读取缓存
                        if (useCache) {
                            location = getCachedLocation(tableName, row);
                            if (location != null) {
                                return location;
                            }
                            // If the parent table is META, we may want to pre-fetch some
                            // region info into the global region cache for this table.
                            // 如果父表是META表(当前为用户表),则将该表的一些信息预抓取到全局缓存中
                            if (Bytes.equals(parentTable, HConstants.META_TABLE_NAME)
                                    && (getRegionCachePrefetch(tableName))) {
                                prefetchRegionCache(tableName, row);//MY TODO
                            }
                            //完事了再试一遍缓存
                            location = getCachedLocation(tableName, row);
                            if (location != null) {
                                return location;
                            }
                        } else {
                            // If we are not supposed to be using the cache, delete any existing cached location
                            // so it won't interfere.
                            // 如果不用缓存就删了,region重定位的时候会走到这里
                            deleteCachedLocation(tableName, row);
                        }

                        // Query the root or meta region for the location of the meta region
                        // 这里才是正儿八经的远程调用获取信息。
                        // MS是坑了,备注【i】处的操作可以放到这里,
                        // 在缓存获取成功的情况下,可以省不少不必要的事。备注【ii】,读操作,后续再看
                        regionInfoRow = server.getClosestRowBefore(
                                metaLocation.getRegionInfo().getRegionName(), metaKey,
                                HConstants.CATALOG_FAMILY);
                    }
                    // 拿不到信息只能异常了
                    if (regionInfoRow == null) {
                        throw new TableNotFoundException(Bytes.toString(tableName));
                    }
                    // info:regionInfo,value就是当前需要获取的region的信息
                    byte[] value = regionInfoRow.getValue(HConstants.CATALOG_FAMILY,
                            HConstants.REGIONINFO_QUALIFIER);
                    if (value == null || value.length == 0) {
                        throw new IOException("HRegionInfo was null or empty in " +
                                Bytes.toString(parentTable) + ", row=" + regionInfoRow);
                    }
                    // convert the row result into the HRegionLocation we need!
                    HRegionInfo regionInfo = (HRegionInfo) Writables.getWritable(
                            value, new HRegionInfo());
                    // possible we got a region of a different table...
                    // 还有可能拿到的region信息跟不属于我们查找的表?
                    // 还有这种情况?再次不明觉厉
                    if (!Bytes.equals(regionInfo.getTableName(), tableName)) {
                        throw new TableNotFoundException(
                                "Table '" + Bytes.toString(tableName) + "' was not found, got: " +
                                        Bytes.toString(regionInfo.getTableName()) + ".");
                    }
                    // 如果region正在split,而两个daughter region还未上线,
                    // 那只能说再见了
                    if (regionInfo.isSplit()) {
                        throw new RegionOfflineException("the only available region for" +
                                " the required row is a split parent," +
                                " the daughters should be online soon: " +
                                regionInfo.getRegionNameAsString());
                    }
                    // region下线,也只能说88  
                    if (regionInfo.isOffline()) {
                        throw new RegionOfflineException("the region is offline, could" +
                                " be caused by a disable table call: " +
                                regionInfo.getRegionNameAsString());
                    }
                    // info:server,拿到RS信息
                    value = regionInfoRow.getValue(HConstants.CATALOG_FAMILY,
                            HConstants.SERVER_QUALIFIER);
                    String hostAndPort = "";
                    if (value != null) {
                        hostAndPort = Bytes.toString(value);
                    }
                    if (hostAndPort.equals("")) {
                        throw new NoServerForRegionException("No server address listed " +
                                "in " + Bytes.toString(parentTable) + " for region " +
                                regionInfo.getRegionNameAsString() + " containing row " +
                                Bytes.toStringBinary(row));
                    }

                    // Instantiate the location
                    String hostname = Addressing.parseHostname(hostAndPort);
                    int port = Addressing.parsePort(hostAndPort);
                    location = new HRegionLocation(regionInfo, hostname, port);
                    cacheLocation(tableName, location);//缓存当前region信息
                    return location;
                } catch (TableNotFoundException e) {
                    // if we got this error, probably means the table just plain doesn't
                    // exist. rethrow the error immediately. this should always be coming
                    // from the HTable constructor.
                    throw e;
                } catch (IOException e) {
                    if (e instanceof RemoteException) {
                        e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
                    }
                    if (tries < numRetries - 1) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("locateRegionInMeta parentTable=" +
                                    Bytes.toString(parentTable) + ", metaLocation=" +
                                    ((metaLocation == null) ? "null" : "{" + metaLocation + "}") +
                                    ", attempt=" + tries + " of " +
                                    this.numRetries + " failed; retrying after sleep of " +
                                    ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
                        }
                    } else {
                        throw e;
                    }
                    // Only relocate the parent region if necessary
                    if (!(e instanceof RegionOfflineException ||
                            e instanceof NoServerForRegionException)) {
                        relocateRegion(parentTable, metaKey); 
                        //如果不是region下线或者找不到RS,都重试,根据上面展示的代码,重试的时候会将usecache置为false,以达到清理错误缓存信息的目的
                    }
                }
                try {
                    Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Giving up trying to location region in " +
                            "meta: thread is interrupted.");
                }
            }
        }


定位region的主流程到这里就完结了,整个是一个向上递归的过程通过user表-meta表-root表-meta表-user表的流程。每一次都会尝试从缓存里面获取信息,并根据处理结果缓存信息或者清理失效的老信息。

接着看一下如何从缓存中获取信息吧
     HRegionLocation getCachedLocation(final byte[] tableName,
                                          final byte[] row) {
            SoftValueSortedMap<byte[], HRegionLocation> tableLocations =
                    getTableLocations(tableName);
            // 这里仅仅是根据tablename把缓存的该table的region信息都拿出来  
            // start to examine the cache. we can only do cache actions
            // if there's something in the cache for this table.
            if (tableLocations.isEmpty()) {
                return null;
            }
            // 如果已经有缓存的region信息,再直接用当前rowkey当key取换取region信息
            // region的存储索引格式为[startKey,endKey),缓存里面存的rowkey为startkey
            // 所以这一步的命中率其实是非常低的
            HRegionLocation possibleRegion = tableLocations.get(row);
            if (possibleRegion != null) {
                return possibleRegion;
            }
            // 直接获取失败,在返回一个key严格小于当前rowkey的region信息,即startKey < rowkey
            // 如果所有的starkKey都比rowkey大,那就说明当前rowkey对应的region信息没有被缓存
            // 因为region的存储索引格式为[startKey,endKey)
            possibleRegion = tableLocations.lowerValueByKey(row);
            if (possibleRegion == null) {
                return null;
            }

            // make sure that the end key is greater than the row we're looking
            // for, otherwise the row actually belongs in the next region, not
            // this one. the exception case is when the endkey is
            // HConstants.EMPTY_END_ROW, signifying that the region we're
            // checking is actually the last region in the table.
            // 如果拿到一个startKey < rowkey
            // 则当前rowkey就有可能在当前region中,再检查rowkey < endKey 即可
            // 因为region的存储索引格式为[startKey,endKey) 
            byte[] endKey = possibleRegion.getRegionInfo().getEndKey();
            if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) ||
                    KeyValue.getRowComparator(tableName).compareRows(
                            endKey, 0, endKey.length, row, 0, row.length) > 0) {
                return possibleRegion;
            }

            // Passed all the way through, so we got nothin - complete cache miss
            return null;
        }
        // 读取该table所有的region信息(已缓存的)
        private SoftValueSortedMap<byte[], HRegionLocation> getTableLocations(
                final byte[] tableName) {
            // find the map of cached locations for this table
            Integer key = Bytes.mapKey(tableName);//hash一下
            SoftValueSortedMap<byte[], HRegionLocation> result;
            synchronized (this.cachedRegionLocations) {
                result = this.cachedRegionLocations.get(key);
                // if tableLocations for this table isn't built yet, make one
                if (result == null) {
                    result = new SoftValueSortedMap<byte[], HRegionLocation>(
                            Bytes.BYTES_COMPARATOR);
                    this.cachedRegionLocations.put(key, result);//没有的话先创建好缓存对象,然后以当前rowkey为key先塞进去
                }
            }
            return result;
        }



读取缓存的就完事了,然后看看写入和删除

先看看写入吧
      private void cacheLocation(final byte[] tableName,
                                   final HRegionLocation location) {
            byte[] startKey = location.getRegionInfo().getStartKey();
            Map<byte[], HRegionLocation> tableLocations =
                    getTableLocations(tableName);//还是先把已经该table已经缓存的region信息拿出来
            boolean hasNewCache = false;
            synchronized (this.cachedRegionLocations) {
                cachedServers.add(location.getHostnamePort());
                hasNewCache = (tableLocations.put(startKey, location) == null);
            }
            if (hasNewCache) {
                LOG.debug("Cached location for " +
                        location.getRegionInfo().getRegionNameAsString() +
                        " is " + location.getHostnamePort());
            }
        }
    //SoftValueSortedMap<K, V> implements SortedMap<K, V> 提供,key就不说了,value是使用软引用实现的,   
    public V put(K key, V value) {
        synchronized (sync) {
            checkReferences();
            SoftValue<K, V> oldValue = this.internalMap.put(key,
                    new SoftValue<K, V>(key, value, this.rq));
            return oldValue == null ? null : oldValue.get();
        }
    }
    // 软引用会在GC的时候在没有其他强引用的时候可能被清除,可以提供一个引用队列,
    // 如果软引用所引用的对象被垃圾回收器回收,Java虚拟机就会把这个软引用加入到与之关联的引用队列中。
    // 有关强引用、软引用、若引用以及幽灵引用和GC的知识这里就不关注了先
    // 这里就是检查已经被GC的缓存的对象,然后清除
    private int checkReferences() {
        int i = 0;
        for (Reference<? extends V> ref; (ref = this.rq.poll()) != null; ) {
            i++;
            this.internalMap.remove(((SoftValue<K, V>) ref).key);
        }
        return i;
    }


写入这就完事了,最后再看看删除,其实就是调用一遍读流程,不为空的话则删除
      void deleteCachedLocation(final byte[] tableName, final byte[] row) {
            synchronized (this.cachedRegionLocations) {
                Map<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);
                if (!tableLocations.isEmpty()) {
                    // start to examine the cache. we can only do cache actions
                    // if there's something in the cache for this table.
                    HRegionLocation rl = getCachedLocation(tableName, row);
                    if (rl != null) {
                        tableLocations.remove(rl.getRegionInfo().getStartKey());
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Removed " +
                                    rl.getRegionInfo().getRegionNameAsString() +
                                    " for tableName=" + Bytes.toString(tableName) +
                                    " from cache " + "because of " + Bytes.toStringBinary(row));
                        }
                    }
                }
            }
        }


最后还有一个小尾巴,先看看上面的注释:
// 如果父表是META表(当前为用户表),则将该表的一些信息预抓取到全局缓存中


好吧,这个尾巴其实不小
private void prefetchRegionCache(final byte[] tableName,
                                         final byte[] row) {
            // Implement a new visitor for MetaScanner, and use it to walk through
            // the .META.
            // 生成一个MetaScannerVisitor用来遍历META表
            MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
                public boolean processRow(Result result) throws IOException {
                    try {
                        // info:regioninfo
                        // 这里的流程跟之前获取用户表的region信息几乎是一模一样的
                        // 从value中获取region信息,然后检查region对应表是否正确,是否下线。然后缓存起来
                        // 不过目测没有检查是否在做split,META表是可以被split的,不明为啥,求解释
                        byte[] value = result.getValue(HConstants.CATALOG_FAMILY,
                                HConstants.REGIONINFO_QUALIFIER);
                        HRegionInfo regionInfo = null;

                        if (value != null) {
                            // convert the row result into the HRegionLocation we need!
                            regionInfo = Writables.getHRegionInfo(value);

                            // possible we got a region of a different table...
                            if (!Bytes.equals(regionInfo.getTableName(),
                                    tableName)) {
                                return false; // stop scanning
                            }
                            if (regionInfo.isOffline()) {
                                // don't cache offline regions
                                return true;
                            }
                            value = result.getValue(HConstants.CATALOG_FAMILY,
                                    HConstants.SERVER_QUALIFIER);//info:server
                            if (value == null) {
                                return true;  // don't cache it
                            }
                            final String hostAndPort = Bytes.toString(value);
                            String hostname = Addressing.parseHostname(hostAndPort);
                            int port = Addressing.parsePort(hostAndPort);
                            value = result.getValue(HConstants.CATALOG_FAMILY,
                                    HConstants.STARTCODE_QUALIFIER);//info:serverStartCode,我去,这里取出来了怎么没用呢?
                            // instantiate the location
                            HRegionLocation loc =
                                    new HRegionLocation(regionInfo, hostname, port);
                            // cache this meta entry
                            cacheLocation(tableName, loc);
                        }
                        return true;
                    } catch (RuntimeException e) {
                        throw new IOException(e);
                    }
                }
            };
            try {
                // pre-fetch certain number of regions info at region cache.
                MetaScanner.metaScan(conf, this, visitor, tableName, row,
                        this.prefetchRegionLimit, HConstants.META_TABLE_NAME);
            } catch (IOException e) {
                LOG.warn("Encountered problems when prefetch META table: ", e);
            }
        }

看看这个MetaScannerVisitor 在整个scan过程中起了什么作用呢?MetaScanner.metaScan。
代码量看着不小,其实就是scan了一遍meta表,每扫描到一个rowkey就调用一次visitor。
作为一个客户端scan的过程,扫描游标大小同样受hbase.client.prefetch.limit配置限制,默认是10。
扫描的过程就是一个非常标准的读过程,这里不再继续深究了,等到研究读代码的时候再说
   public static void metaScan(Configuration configuration, HConnection connection,
                                MetaScannerVisitor visitor, byte[] tableName, byte[] row,
                                int rowLimit, final byte[] metaTableName)
            throws IOException {
        HTable metaTable = null;
        try {
            if (connection == null) {
                metaTable = new HTable(configuration, HConstants.META_TABLE_NAME, null);
            } else {
                metaTable = new HTable(HConstants.META_TABLE_NAME, connection, null);
            }
            int rowUpperLimit = rowLimit > 0 ? rowLimit : Integer.MAX_VALUE;

            // if row is not null, we want to use the startKey of the row's region as
            // the startRow for the meta scan.
            byte[] startRow;
            if (row != null) {
                // Scan starting at a particular row in a particular table
                assert tableName != null;
                byte[] searchRow =
                        HRegionInfo.createRegionName(tableName, row, HConstants.NINES,
                                false);
                Result startRowResult = metaTable.getRowOrBefore(searchRow,
                        HConstants.CATALOG_FAMILY);
                if (startRowResult == null) {
                    throw new TableNotFoundException("Cannot find row in .META. for table: "
                            + Bytes.toString(tableName) + ", row=" + Bytes.toStringBinary(searchRow));
                }
                byte[] value = startRowResult.getValue(HConstants.CATALOG_FAMILY,
                        HConstants.REGIONINFO_QUALIFIER);
                if (value == null || value.length == 0) {
                    throw new IOException("HRegionInfo was null or empty in Meta for " +
                            Bytes.toString(tableName) + ", row=" + Bytes.toStringBinary(searchRow));
                }
                HRegionInfo regionInfo = Writables.getHRegionInfo(value);

                byte[] rowBefore = regionInfo.getStartKey();
                startRow = HRegionInfo.createRegionName(tableName, rowBefore,
                        HConstants.ZEROES, false);
            } else if (tableName == null || tableName.length == 0) {
                // Full META scan
                startRow = HConstants.EMPTY_START_ROW;
            } else {
                // Scan META for an entire table
                startRow = HRegionInfo.createRegionName(
                        tableName, HConstants.EMPTY_START_ROW, HConstants.ZEROES, false);
            }

            // Scan over each meta region
            ScannerCallable callable;
            int rows = Math.min(rowLimit, configuration.getInt(
                    HConstants.HBASE_META_SCANNER_CACHING,
                    HConstants.DEFAULT_HBASE_META_SCANNER_CACHING));
            do {
                final Scan scan = new Scan(startRow).addFamily(HConstants.CATALOG_FAMILY);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Scanning " + Bytes.toString(metaTableName) +
                            " starting at row=" + Bytes.toStringBinary(startRow) + " for max=" +
                            rowUpperLimit + " rows using " + metaTable.getConnection().toString());
                }
                callable = new ScannerCallable(metaTable.getConnection(), metaTableName, scan, null);
                // Open scanner
                callable.withRetries();

                int processedRows = 0;
                try {
                    callable.setCaching(rows);
                    done:
                    do {
                        if (processedRows >= rowUpperLimit) {
                            break;
                        }
                        //we have all the rows here
                        Result[] rrs = callable.withRetries();
                        if (rrs == null || rrs.length == 0 || rrs[0].size() == 0) {
                            break; //exit completely
                        }
                        for (Result rr : rrs) {
                            if (processedRows >= rowUpperLimit) {
                                break done;
                            }
                            if (!visitor.processRow(rr))
                                break done; //exit completely
                            processedRows++;
                        }
                        //here, we didn't break anywhere. Check if we have more rows
                    } while (true);
                    // Advance the startRow to the end key of the current region
                    startRow = callable.getHRegionInfo().getEndKey();
                } finally {
                    // Close scanner
                    callable.setClose();
                    callable.withRetries();
                }
            } while (Bytes.compareTo(startRow, HConstants.LAST_ROW) != 0);
        } finally {
            visitor.close();
            if (metaTable != null) {
                metaTable.close();
            }
        }
    }



于是乎,region的定位代码这里就OK了。

补充一段备注处的代码,看看是否值得将那里的操作后置
        public HRegionInterface getHRegionConnection(final String hostname,
                                                     final int port)
                throws IOException {
            return getHRegionConnection(hostname, port, false);
        }

        public HRegionInterface getHRegionConnection(final String hostname,
                                                     final int port, final boolean master)
                throws IOException {
            return getHRegionConnection(hostname, port, null, master);
        }

        HRegionInterface getHRegionConnection(final String hostname, final int port,
                                              final InetSocketAddress isa, final boolean master)
                throws IOException {
            if (master) getMaster();
            HRegionInterface server;
            String rsName = null;
            if (isa != null) {
                rsName = Addressing.createHostAndPortStr(isa.getHostName(),
                        isa.getPort());
            } else {
                rsName = Addressing.createHostAndPortStr(hostname, port);
            }
            ensureZookeeperTrackers();
            // See if we already have a connection (common case)
            // 这是一个HRegionInterface
            server = this.servers.get(rsName);
            if (server == null) {
                // create a unique lock for this RS (if necessary)
                this.connectionLock.putIfAbsent(rsName, rsName);
                // get the RS lock
                synchronized (this.connectionLock.get(rsName)) {
                    // 双锁检查,很严谨 
                    // do one more lookup in case we were stalled above
                    server = this.servers.get(rsName);
                    if (server == null) {
                        try {
                            // Only create isa when we need to.
                            InetSocketAddress address = isa != null ? isa :
                                    new InetSocketAddress(hostname, port);
                            // definitely a cache miss. establish an RPC for this RS
                            // 关键的代码在这里,标记一下吧,研究到RPC的时候再继续了
                            server = HBaseRPC.waitForProxy(this.rpcEngine,
                                    serverInterfaceClass, HRegionInterface.VERSION,
                                    address, this.conf,
                                    this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);
                            this.servers.put(Addressing.createHostAndPortStr(
                                    address.getHostName(), address.getPort()), server);
                        } catch (RemoteException e) {
                            LOG.warn("RemoteException connecting to RS", e);
                            // Throw what the RemoteException was carrying.
                            throw e.unwrapRemoteException();
                        }
                    }
                }
            }
            return server;
        }
上一篇:上一篇
下一篇:下一篇

 

随机推荐程序问答结果

 

 

如对文章有任何疑问请提交到问题反馈,或者您对内容不满意,请您反馈给我们DOC100.NET论坛发贴求解。
DOC100.NET资源网,机器学习分类整理更新日期::2014-01-05 02:56:35
如需转载,请注明文章出处和来源网址:http://www.doc100.net/bugs/t/16424/
本文WWW.DOC100.NET DOC100.NET版权所有。