The version mechanism in ES

Author: Internet

ES manages document versions in LiveVersionMap. Each entry mainly carries the following four pieces of information:

1. version

2. seqNo

3. term

4. translogLocation

    IndexVersionValue(Translog.Location translogLocation, long version, long seqNo, long term) {
        super(version, seqNo, term);
        this.translogLocation = translogLocation;
    }
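The four fields can be pictured as a small class hierarchy. What follows is a minimal sketch, not the Elasticsearch source: `SketchVersionValue`, `SketchIndexVersionValue`, and `SketchLocation` are hypothetical names, and the two fields of `SketchLocation` are assumptions made only for illustration.

```java
// Hypothetical stand-in for Translog.Location: where an operation sits in the translog.
final class SketchLocation {
    final long generation; // assumed field: which translog file holds the operation
    final long offset;     // assumed field: byte offset inside that file

    SketchLocation(long generation, long offset) {
        this.generation = generation;
        this.offset = offset;
    }
}

// Base entry: the three values that are also persisted as doc values.
class SketchVersionValue {
    final long version;
    final long seqNo;
    final long term;

    SketchVersionValue(long version, long seqNo, long term) {
        this.version = version;
        this.seqNo = seqNo;
        this.term = term;
    }
}

// Entry for an index operation: adds the translog location on top of the base entry.
final class SketchIndexVersionValue extends SketchVersionValue {
    // May be null, for example when the entry is rebuilt from a Lucene commit.
    final SketchLocation translogLocation;

    SketchIndexVersionValue(SketchLocation translogLocation, long version, long seqNo, long term) {
        super(version, seqNo, term);
        this.translogLocation = translogLocation;
    }
}
```

Keeping the translog location in the subclass mirrors the constructor above, which passes only version, seqNo, and term to the parent.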

The first three are persisted in the following doc values:

    private final NumericDocValues versionDV;
    private final NumericDocValues seqNoDV;
    private final NumericDocValues primaryTermDV;

LiveVersionMap is updated when InternalEngine is initialized and when a document is indexed:

    /**
     * Restores the live version map and local checkpoint of this engine using documents (including soft-deleted)
     * after the local checkpoint in the safe commit. This step ensures the live version map and checkpoint tracker
     * are in sync with the Lucene commit.
     */
    private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryReader) throws IOException {
        final IndexSearcher searcher = new IndexSearcher(directoryReader);
        searcher.setQueryCache(null);
        final Query query = new BooleanQuery.Builder()
            .add(LongPoint.newRangeQuery(
                    SeqNoFieldMapper.NAME, getPersistedLocalCheckpoint() + 1, Long.MAX_VALUE), BooleanClause.Occur.MUST)
            // exclude non-root nested documents
            .add(new DocValuesFieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME), BooleanClause.Occur.MUST)
            .build();
        final Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1.0f);
        for (LeafReaderContext leaf : directoryReader.leaves()) {
            final Scorer scorer = weight.scorer(leaf);
            if (scorer == null) {
                continue;
            }
            final CombinedDocValues dv = new CombinedDocValues(leaf.reader());
            final IdOnlyFieldVisitor idFieldVisitor = new IdOnlyFieldVisitor();
            final DocIdSetIterator iterator = scorer.iterator();
            int docId;
            while ((docId = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
                final long primaryTerm = dv.docPrimaryTerm(docId);
                final long seqNo = dv.docSeqNo(docId);
                localCheckpointTracker.markSeqNoAsProcessed(seqNo);
                localCheckpointTracker.markSeqNoAsPersisted(seqNo);
                idFieldVisitor.reset();
                leaf.reader().document(docId, idFieldVisitor);
                if (idFieldVisitor.getId() == null) {
                    assert dv.isTombstone(docId);
                    continue;
                }
                final BytesRef uid = new Term(IdFieldMapper.NAME, Uid.encodeId(idFieldVisitor.getId())).bytes();
                try (Releasable ignored = versionMap.acquireLock(uid)) {
                    final VersionValue curr = versionMap.getUnderLock(uid);
                    if (curr == null ||
                        compareOpToVersionMapOnSeqNo(idFieldVisitor.getId(), seqNo, primaryTerm, curr) == OpVsLuceneDocStatus.OP_NEWER) {
                        if (dv.isTombstone(docId)) {
                            // use 0L for the start time so we can prune this delete tombstone quickly
                            // when the local checkpoint advances (i.e., after a recovery completed).
                            final long startTime = 0L;
                            versionMap.putDeleteUnderLock(uid, new DeleteVersionValue(dv.docVersion(docId), seqNo, primaryTerm, startTime));
                        } else {
                            versionMap.putIndexUnderLock(uid, new IndexVersionValue(null, dv.docVersion(docId), seqNo, primaryTerm));
                        }
                    }
                }
            }
        }
        // remove live entries in the version map
        refresh("restore_version_map_and_checkpoint_tracker", SearcherScope.INTERNAL, true);
    }

    /**
     * Adds this uid/version to the pending adds map iff the map needs safe access.
     */
    void maybePutIndexUnderLock(BytesRef uid, IndexVersionValue version) {
        assert assertKeyedLockHeldByCurrentThread(uid);
        Maps maps = this.maps;
        if (maps.isSafeAccessMode()) {
            putIndexUnderLock(uid, version);
        } else {
            // Even though we don't store a record of the indexing operation (and mark as unsafe),
            // we should still remove any previous delete for this uuid (avoid accidental accesses).
            // Note that this should not hurt performance because the tombstone is small (or empty) when unsafe is relevant.
            removeTombstoneUnderLock(uid);
            maps.current.markAsUnsafe();
            assert putAssertionMap(uid, version);
        }
    }
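The restore step above can be reduced to a simple rule: scan the documents whose seqNo lies beyond the local checkpoint and keep, per id, the entry with the newest seqNo. Below is a minimal sketch of that rule under stated assumptions. `RestoreSketch` and `DocMeta` are hypothetical names, a plain `HashMap` stands in for the version map and its per-uid locks, and "newer" is assumed to mean a strictly larger seqNo.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RestoreSketch {
    // Hypothetical flattened view of the metadata read from one Lucene document.
    static final class DocMeta {
        final String id;
        final long version;
        final long seqNo;
        final long term;
        final boolean tombstone; // true when the document records a delete

        DocMeta(String id, long version, long seqNo, long term, boolean tombstone) {
            this.id = id;
            this.version = version;
            this.seqNo = seqNo;
            this.term = term;
            this.tombstone = tombstone;
        }
    }

    // Returns id -> newest DocMeta among the documents with seqNo > localCheckpoint.
    static Map<String, DocMeta> restore(List<DocMeta> docs, long localCheckpoint) {
        Map<String, DocMeta> versionMap = new HashMap<>();
        for (DocMeta doc : docs) {
            if (doc.seqNo <= localCheckpoint) {
                continue; // at or below the checkpoint: not part of the restore range
            }
            DocMeta curr = versionMap.get(doc.id);
            // Assumption: an operation is newer when its seqNo is strictly larger.
            if (curr == null || doc.seqNo > curr.seqNo) {
                versionMap.put(doc.id, doc);
            }
        }
        return versionMap;
    }
}
```

In this sketch deletes and index operations share one map and are told apart by the `tombstone` flag, whereas the method above stores them through `putDeleteUnderLock` and `putIndexUnderLock` respectively.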

When a write carries a version, ES checks whether that version conflicts with the document's current version, which gives optimistic locking:

    else if (index.versionType().isVersionConflictForWrites(
            currentVersion, index.version(), currentNotFoundOrDeleted)) {
        final VersionConflictEngineException e =
            new VersionConflictEngineException(shardId, index, currentVersion, currentNotFoundOrDeleted);
        plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion);
    }

On a conflict, ES reports a VersionConflictEngineException.
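The optimistic-locking idea can be shown in isolation. The following is a minimal sketch, not the ES implementation: `OptimisticLockSketch`, `VersionConflictException`, and the `NOT_FOUND` sentinel are hypothetical, and the conflict rule is assumed to be a plain equality check between the expected and the stored version.

```java
import java.util.HashMap;
import java.util.Map;

final class OptimisticLockSketch {
    // Hypothetical exception, standing in for VersionConflictEngineException.
    static final class VersionConflictException extends RuntimeException {
        VersionConflictException(String id, long current, long expected) {
            super("[" + id + "]: current version [" + current
                + "] differs from the provided version [" + expected + "]");
        }
    }

    // Assumed sentinel meaning "no such document".
    static final long NOT_FOUND = -1L;

    private final Map<String, Long> versions = new HashMap<>();

    // A write succeeds only when the caller's expected version matches the stored one.
    synchronized long write(String id, long expectedVersion) {
        long current = versions.getOrDefault(id, NOT_FOUND);
        if (current != expectedVersion) {
            throw new VersionConflictException(id, current, expectedVersion);
        }
        long next = (current == NOT_FOUND) ? 1L : current + 1L;
        versions.put(id, next);
        return next;
    }

    public static void main(String[] args) {
        OptimisticLockSketch store = new OptimisticLockSketch();
        long v1 = store.write("doc", NOT_FOUND); // create: no previous version expected
        long v2 = store.write("doc", v1);        // update based on the version just read
        try {
            store.write("doc", v1);              // stale version: another write got in first
        } catch (VersionConflictException e) {
            System.out.println("conflict after version " + v2 + ": " + e.getMessage());
        }
    }
}
```

No lock is held between reading a version and writing with it. A writer that loses the race gets the exception and can re-read the document and retry.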

Source: https://blog.csdn.net/chuanyangwang/article/details/120575983