summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_db.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_db.erl')
-rw-r--r--src/couchdb/couch_db.erl757
1 files changed, 757 insertions, 0 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
new file mode 100644
index 00000000..e567d27b
--- /dev/null
+++ b/src/couchdb/couch_db.erl
@@ -0,0 +1,757 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_db).
+-behaviour(gen_server).
+
+-export([open/2,create/2,create/3,get_doc_info/2]).
+-export([save_docs/2, save_docs/3, get_db_info/1, update_doc/3, update_docs/2, update_docs/3]).
+-export([delete_doc/3,open_doc/2,open_doc/3,close/1,enum_docs_since/4,enum_docs_since/5]).
+-export([enum_docs/4,enum_docs/5, open_doc_revs/4, get_missing_revs/2]).
+-export([start_update_loop/1]).
+-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
+
+-include("couch_db.hrl").
+
+-record(db_header,
+ {write_version = 0,
+ last_update_seq = 0,
+ summary_stream_state = nil,
+ docinfo_by_Id_btree_state = nil,
+ docinfo_by_seq_btree_state = nil,
+ local_docs_btree_state = nil,
+ doc_count=0,
+ doc_del_count=0
+ }).
+
+-record(db,
+ {main_pid,
+ update_pid,
+ fd,
+ header = #db_header{},
+ summary_stream,
+ docinfo_by_Id_btree,
+ docinfo_by_seq_btree,
+ local_docs_btree,
+ last_update_seq,
+ doc_count,
+ doc_del_count,
+ name
+ }).
+
+start_link(DbName, Filepath, Options) ->
+ case couch_file:open(Filepath, Options) of
+ {ok, Fd} ->
+ Result = gen_server:start_link(couch_db, {DbName, Fd, Options}, []),
+ unlink(Fd),
+ Result;
+ {error, enoent} ->
+ % couldn't find file
+ {error, not_found};
+ Else ->
+ Else
+ end.
+
+%%% Interface functions %%%
+
+create(Filepath, Options) ->
+ create(Filepath, Filepath, Options).
+
+create(DbName, Filepath, Options) when is_list(Options) ->
+ start_link(DbName, Filepath, [create | Options]).
+
+open(DbName, Filepath) ->
+ start_link(DbName, Filepath, []).
+
+delete_doc(MainPid, Id, Revisions) ->
+ DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions],
+ {ok, [Result]} = update_docs(MainPid, DeletedDocs, [new_edits]),
+ {ok, Result}.
+
+open_doc(MainPid, IdOrDocInfo) ->
+ open_doc(MainPid, IdOrDocInfo, []).
+
+open_doc(MainPid, Id, Options) ->
+ case open_doc_int(get_db(MainPid), Id, Options) of
+ {ok, #doc{deleted=true}=Doc} ->
+ case lists:member(deleted, Options) of
+ true ->
+ {ok, Doc};
+ false ->
+ {not_found, deleted}
+ end;
+ Else ->
+ Else
+ end.
+
+open_doc_revs(MainPid, Id, Revs, Options) ->
+ open_doc_revs_int(get_db(MainPid), Id, Revs, Options).
+
+get_missing_revs(MainPid, IdRevsList) ->
+ Ids = [Id1 || {Id1, _Revs} <- IdRevsList],
+ FullDocInfoResults = get_full_doc_infos(MainPid, Ids),
+ Results = lists:zipwith(
+ fun({Id, Revs}, FullDocInfoResult) ->
+ case FullDocInfoResult of
+ {ok, #full_doc_info{rev_tree=RevisionTree}} ->
+ {Id, couch_key_tree:find_missing(RevisionTree, Revs)};
+ not_found ->
+ {Id, Revs}
+ end
+ end,
+ IdRevsList, FullDocInfoResults),
+ {ok, Results}.
+
+get_doc_info(Db, Id) ->
+ case get_full_doc_info(Db, Id) of
+ {ok, DocInfo} ->
+ {ok, couch_doc:to_doc_info(DocInfo)};
+ Else ->
+ Else
+ end.
+
+% returns {ok, DocInfo} or not_found
+get_full_doc_info(Db, Id) ->
+ [Result] = get_full_doc_infos(Db, [Id]),
+ Result.
+
+
+get_full_doc_infos(MainPid, Ids) when is_pid(MainPid) ->
+ get_full_doc_infos(get_db(MainPid), Ids);
+get_full_doc_infos(#db{}=Db, Ids) ->
+ couch_btree:lookup(Db#db.docinfo_by_Id_btree, Ids).
+
+get_db_info(MainPid) when is_pid(MainPid) ->
+ get_db_info(get_db(MainPid));
+get_db_info(#db{doc_count=Count, doc_del_count=DelCount, last_update_seq=SeqNum}) ->
+ InfoList = [
+ {doc_count, Count},
+ {doc_del_count, DelCount},
+ {last_update_seq, SeqNum}
+ ],
+ {ok, InfoList}.
+
+update_doc(MainPid, Doc, Options) ->
+ {ok, [NewRev]} = update_docs(MainPid, [Doc], Options),
+ {ok, NewRev}.
+
+update_docs(MainPid, Docs) ->
+ update_docs(MainPid, Docs, []).
+
+% group_alike_docs groups the sorted documents into sublist buckets, by id.
+% ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]]
+group_alike_docs(Docs) ->
+ Sorted = lists:sort(fun(#doc{id=A},#doc{id=B})-> A < B end, Docs),
+ group_alike_docs(Sorted, []).
+
+group_alike_docs([], Buckets) ->
+ lists:reverse(Buckets);
+group_alike_docs([Doc|Rest], []) ->
+ group_alike_docs(Rest, [[Doc]]);
+group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) ->
+ [#doc{id=BucketId}|_] = Bucket,
+ case Doc#doc.id == BucketId of
+ true ->
+ % add to existing bucket
+ group_alike_docs(Rest, [[Doc|Bucket]|RestBuckets]);
+ false ->
+ % add to new bucket
+ group_alike_docs(Rest, [[Doc]|[Bucket|RestBuckets]])
+ end.
+
+
+prepare_doc_for_new_edit(Db, #doc{id=Id,revs=[NewRev|PrevRevs]}=Doc, OldFullDocInfo, LeafRevsDict) ->
+ case PrevRevs of
+ [PrevRev|_] ->
+ case dict:find(PrevRev, LeafRevsDict) of
+ {ok, {Deleted, Sp, DiskRevs}} ->
+ case couch_doc:has_stubs(Doc) of
+ true ->
+ DiskDoc = make_doc(Db, Id, Deleted, Sp, DiskRevs),
+ Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
+ Doc2#doc{revs=[NewRev|DiskRevs]};
+ false ->
+ Doc#doc{revs=[NewRev|DiskRevs]}
+ end;
+ error ->
+ throw(conflict)
+ end;
+ [] ->
+ % new doc, and we have existing revs.
+ OldDocInfo = couch_doc:to_doc_info(OldFullDocInfo),
+ if OldDocInfo#doc_info.deleted ->
+ % existing doc is a deleton
+ % allow this new doc to be a later revision.
+ {_Deleted, _Sp, Revs} = dict:fetch(OldDocInfo#doc_info.rev, LeafRevsDict),
+ Doc#doc{revs=[NewRev|Revs]};
+ true ->
+ throw(conflict)
+ end
+ end.
+
+update_docs(MainPid, Docs, Options) ->
+ Docs2 = lists:map(
+ fun(#doc{id=Id,revs=Revs}=Doc) ->
+ case Id of
+ ?LOCAL_DOC_PREFIX ++ _ ->
+ Rev = case Revs of [] -> 0; [Rev0|_] -> list_to_integer(Rev0) end,
+ Doc#doc{revs=[integer_to_list(Rev + 1)]};
+ _ ->
+ Doc#doc{revs=[integer_to_list(couch_util:rand32()) | Revs]}
+ end
+ end, Docs),
+ DocBuckets = group_alike_docs(Docs2),
+ Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
+ Db = get_db(MainPid),
+
+ % first things first, lookup the doc by id and get the most recent
+
+ ExistingDocs = get_full_doc_infos(Db, Ids),
+
+ DocBuckets2 = lists:zipwith(
+ fun(Bucket, not_found) ->
+ % no existing revs, make sure no old revision is specified.
+ [throw(conflict) || #doc{revs=[_NewRev, _OldRev | _]} <- Bucket],
+ Bucket;
+ (Bucket, {ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}) ->
+ Leafs = couch_key_tree:get_all_leafs(OldRevTree),
+ LeafRevsDict = dict:from_list([{Rev, {Deleted, Sp, Revs}} || {Rev, {Deleted, Sp}, Revs} <- Leafs]),
+ [prepare_doc_for_new_edit(Db, Doc, OldFullDocInfo, LeafRevsDict) || Doc <- Bucket]
+ end,
+ DocBuckets, ExistingDocs),
+
+ % flush unwritten binaries to disk.
+ DocBuckets3 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets2],
+
+ case gen_server:call(MainPid, {update_docs, DocBuckets3, Options}) of
+ ok ->
+ % return back the new rev ids, in the same order input.
+ {ok, [NewRev || #doc{revs=[NewRev|_]} <- Docs2]};
+ Else->
+ throw(Else)
+ end.
+
+save_docs(MainPid, Docs) ->
+ save_docs(MainPid, Docs, []).
+
+save_docs(MainPid, Docs, Options) ->
+ % flush unwritten binaries to disk.
+ Db = get_db(MainPid),
+ DocBuckets = group_alike_docs(Docs),
+ DocBuckets2 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
+ ok = gen_server:call(MainPid, {update_docs, DocBuckets2, Options}).
+
+
+doc_flush_binaries(Doc, Fd) ->
+ % calc size of binaries to write out
+ Bins = Doc#doc.attachments,
+ PreAllocSize = lists:foldl(
+ fun(BinValue, SizeAcc) ->
+ case BinValue of
+ {_Key, {_Type, {Fd0, _StreamPointer, _Len}}} when Fd0 == Fd ->
+ % already written to our file, nothing to write
+ SizeAcc;
+ {_Key, {_Type, {_OtherFd, _StreamPointer, Len}}} ->
+ % written to a different file
+ SizeAcc + Len;
+ {_Key, {_Type, Bin}} when is_binary(Bin) ->
+ SizeAcc + size(Bin)
+ end
+ end,
+ 0, Bins),
+
+ {ok, OutputStream} = couch_stream:open(Fd),
+ ok = couch_stream:ensure_buffer(OutputStream, PreAllocSize),
+
+ NewBins = lists:map(
+ fun({Key, {Type, BinValue}}) ->
+ NewBinValue =
+ case BinValue of
+ {Fd0, StreamPointer, Len} when Fd0 == Fd ->
+ % already written to our file, nothing to write
+ {Fd, StreamPointer, Len};
+ {OtherFd, StreamPointer, Len} ->
+ % written to a different file (or a closed file
+ % instance, which will cause an error)
+ {ok, {NewStreamPointer, Len}, _EndSp} =
+ couch_stream:foldl(OtherFd, StreamPointer, Len,
+ fun(Bin, {BeginPointer, SizeAcc}) ->
+ {ok, Pointer} = couch_stream:write(OutputStream, Bin),
+ case SizeAcc of
+ 0 -> % this was the first write, record the pointer
+ {ok, {Pointer, size(Bin)}};
+ _ ->
+ {ok, {BeginPointer, SizeAcc + size(Bin)}}
+ end
+ end,
+ {{0,0}, 0}),
+ {Fd, NewStreamPointer, Len};
+ Bin when is_binary(Bin), size(Bin) > 0 ->
+ {ok, StreamPointer} = couch_stream:write(OutputStream, Bin),
+ {Fd, StreamPointer, size(Bin)}
+ end,
+ {Key, {Type, NewBinValue}}
+ end, Bins),
+
+ {ok, _FinalPos} = couch_stream:close(OutputStream),
+
+ Doc#doc{attachments = NewBins}.
+
+enum_docs_since(MainPid, SinceSeq, Direction, InFun, Ctx) ->
+ Db = get_db(MainPid),
+ couch_btree:fold(Db#db.docinfo_by_seq_btree, SinceSeq + 1, Direction, InFun, Ctx).
+
+enum_docs_since(MainPid, SinceSeq, InFun, Acc) ->
+ enum_docs_since(MainPid, SinceSeq, fwd, InFun, Acc).
+
+enum_docs(MainPid, StartId, Direction, InFun, InAcc) ->
+ Db = get_db(MainPid),
+ couch_btree:fold(Db#db.docinfo_by_Id_btree, StartId, Direction, InFun, InAcc).
+
+enum_docs(MainPid, StartId, InFun, Ctx) ->
+ enum_docs(MainPid, StartId, fwd, InFun, Ctx).
+
+close(MainPid) ->
+ Ref = erlang:monitor(process, MainPid),
+ unlink(MainPid),
+ exit(MainPid, normal),
+ receive
+ {'DOWN', Ref, process, MainPid, _Reason} ->
+ ok
+ end.
+
+
+% server functions
+
+init({DbName, Fd, Options}) ->
+ link(Fd),
+ case lists:member(create, Options) of
+ true ->
+ % create a new header and writes it to the file
+ Header = #db_header{},
+ ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header),
+ ok = couch_file:sync(Fd),
+ init_main(DbName, Fd, Header);
+ false ->
+ {ok, Header} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>),
+ init_main(DbName, Fd, Header)
+ end.
+
+btree_by_seq_split(DocInfo) ->
+ #doc_info{
+ id = Id,
+ rev = Rev,
+ update_seq = Seq,
+ summary_pointer = Sp,
+ conflict_revs = Conflicts,
+ deleted_conflict_revs = DelConflicts,
+ deleted = Deleted} = DocInfo,
+ {Seq,{Id, Rev, Sp, Conflicts, DelConflicts, Deleted}}.
+
+btree_by_seq_join(Seq,{Id, Rev, Sp, Conflicts, DelConflicts, Deleted}) ->
+ #doc_info{
+ id = Id,
+ rev = Rev,
+ update_seq = Seq,
+ summary_pointer = Sp,
+ conflict_revs = Conflicts,
+ deleted_conflict_revs = DelConflicts,
+ deleted = Deleted}.
+
+btree_by_name_split(#full_doc_info{id=Id, update_seq=Seq, rev_tree=Tree}) ->
+ {Id, {Seq, Tree}}.
+
+btree_by_name_join(Id, {Seq, Tree}) ->
+ #full_doc_info{id=Id, update_seq=Seq, rev_tree=Tree}.
+
+
+init_main(DbName, Fd, Header) ->
+ {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd),
+ ok = couch_stream:set_min_buffer(SummaryStream, 10000),
+ {ok, IdBtree} = couch_btree:open(Header#db_header.docinfo_by_Id_btree_state, Fd,
+ [{split, fun(V) -> btree_by_name_split(V) end},
+ {join, fun(K,V) -> btree_by_name_join(K,V) end}] ),
+ {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd,
+ [{split, fun(V) -> btree_by_seq_split(V) end},
+ {join, fun(K,V) -> btree_by_seq_join(K,V) end}] ),
+ {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd),
+
+ Db = #db{
+ main_pid=self(),
+ fd=Fd,
+ header=Header,
+ summary_stream = SummaryStream,
+ docinfo_by_Id_btree = IdBtree,
+ docinfo_by_seq_btree = SeqBtree,
+ local_docs_btree = LocalDocsBtree,
+ last_update_seq = Header#db_header.last_update_seq,
+ doc_count = Header#db_header.doc_count,
+ doc_del_count = Header#db_header.doc_del_count,
+ name = DbName
+ },
+
+ UpdatePid = spawn_link(couch_db, start_update_loop, [Db]),
+
+ {ok, Db#db{update_pid=UpdatePid}}.
+
+terminate(_Reason, Db) ->
+ Db#db.update_pid ! close,
+ couch_file:close(Db#db.fd).
+
+handle_call({update_docs, DocActions, Options}, From, #db{update_pid=Updater}=Db) ->
+ Updater ! {From, update_docs, DocActions, Options},
+ {noreply, Db};
+handle_call(get_db, _From, Db) ->
+ {reply, {ok, Db}, Db};
+handle_call({db_updated, NewDb}, _From, _OldDb) ->
+ {reply, ok, NewDb}.
+
+
+handle_cast(foo, Main) ->
+ {noreply, Main}.
+
+%%% Internal function %%%
+
+start_update_loop(Db) ->
+ update_loop(Db#db{update_pid=self()}).
+
+update_loop(Db) ->
+ receive
+ {OrigFrom, update_docs, DocActions, Options} ->
+ case (catch update_docs_int(Db, DocActions, Options)) of
+ {ok, Db2} ->
+ ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+ gen_server:reply(OrigFrom, ok),
+ couch_db_update_notifier:notify({updated, Db2#db.name}),
+ update_loop(Db2);
+ conflict ->
+ gen_server:reply(OrigFrom, conflict),
+ update_loop(Db);
+ Error ->
+ exit(Error) % we crashed
+ end;
+ close ->
+ % terminate loop
+ exit(normal)
+ end.
+
+get_db(MainPid) ->
+ {ok, Db} = gen_server:call(MainPid, get_db),
+ Db.
+
+open_doc_revs_int(Db, Id, Revs, Options) ->
+ case get_full_doc_info(Db, Id) of
+ {ok, #full_doc_info{rev_tree=RevTree}} ->
+ {FoundRevs, MissingRevs} =
+ case Revs of
+ all ->
+ {couch_key_tree:get_all_leafs(RevTree), []};
+ _ ->
+ case lists:member(latest, Options) of
+ true ->
+ couch_key_tree:get_key_leafs(RevTree, Revs);
+ false ->
+ couch_key_tree:get(RevTree, Revs)
+ end
+ end,
+ FoundResults =
+ lists:map(fun({Rev, Value, FoundRevPath}) ->
+ case Value of
+ 0 ->
+ % we have the rev in our list but know nothing about it
+ {{not_found, missing}, Rev};
+ {IsDeleted, SummaryPtr} ->
+ {ok, make_doc(Db, Id, IsDeleted, SummaryPtr, FoundRevPath)}
+ end
+ end, FoundRevs),
+ Results = FoundResults ++ [{{not_found, missing}, MissingRev} || MissingRev <- MissingRevs],
+ {ok, Results};
+ not_found when Revs == all ->
+ {ok, []};
+ not_found ->
+ {ok, [{{not_found, missing}, Rev} || Rev <- Revs]}
+ end.
+
+open_doc_int(Db, ?LOCAL_DOC_PREFIX ++ _ = Id, _Options) ->
+ case couch_btree:lookup(Db#db.local_docs_btree, [Id]) of
+ [{ok, {_, {Rev, BodyData}}}] ->
+ {ok, #doc{id=Id, revs=[integer_to_list(Rev)], body=BodyData}};
+ [not_found] ->
+ {not_found, missing}
+ end;
+open_doc_int(Db, #doc_info{id=Id,rev=Rev,deleted=IsDeleted,summary_pointer=Sp}=DocInfo, Options) ->
+ Doc = make_doc(Db, Id, IsDeleted, Sp, [Rev]),
+ {ok, Doc#doc{meta=doc_meta_info(DocInfo, [], Options)}};
+open_doc_int(Db, #full_doc_info{id=Id,rev_tree=RevTree}=FullDocInfo, Options) ->
+ #doc_info{deleted=IsDeleted,rev=Rev, summary_pointer=Sp} = DocInfo =
+ couch_doc:to_doc_info(FullDocInfo),
+ {[{_Rev,_Value, Revs}], []} = couch_key_tree:get(RevTree, [Rev]),
+ Doc = make_doc(Db, Id, IsDeleted, Sp, Revs),
+ {ok, Doc#doc{meta=doc_meta_info(DocInfo, RevTree, Options)}};
+open_doc_int(Db, Id, Options) ->
+ case get_full_doc_info(Db, Id) of
+ {ok, FullDocInfo} ->
+ open_doc_int(Db, FullDocInfo, Options);
+ not_found ->
+ throw({not_found, missing})
+ end.
+
+doc_meta_info(DocInfo, RevTree, Options) ->
+ case lists:member(revs_info, Options) of
+ false -> [];
+ true ->
+ {[RevPath],[]} =
+ couch_key_tree:get_full_key_paths(RevTree, [DocInfo#doc_info.rev]),
+ [{revs_info, [{Rev, Deleted} || {Rev, {Deleted, _Sp0}} <- RevPath]}]
+ end ++
+ case lists:member(conflicts, Options) of
+ false -> [];
+ true ->
+ case DocInfo#doc_info.conflict_revs of
+ [] -> [];
+ _ -> [{conflicts, DocInfo#doc_info.conflict_revs}]
+ end
+ end ++
+ case lists:member(deleted_conflicts, Options) of
+ false -> [];
+ true ->
+ case DocInfo#doc_info.deleted_conflict_revs of
+ [] -> [];
+ _ -> [{deleted_conflicts, DocInfo#doc_info.deleted_conflict_revs}]
+ end
+ end.
+
+% rev tree functions
+
+doc_to_tree(Doc) ->
+ doc_to_tree(Doc, lists:reverse(Doc#doc.revs)).
+
+doc_to_tree(Doc, [RevId]) ->
+ [{RevId, Doc, []}];
+doc_to_tree(Doc, [RevId | Rest]) ->
+ [{RevId, [], doc_to_tree(Doc, Rest)}].
+
+make_doc(Db, Id, Deleted, SummaryPointer, RevisionPath) ->
+ {BodyData, BinValues} =
+ case SummaryPointer of
+ nil ->
+ {[], []};
+ _ ->
+ {ok, {BodyData0, BinValues0}} = couch_stream:read_term(Db#db.summary_stream, SummaryPointer),
+ {BodyData0, [{Name, {Type, {Db#db.fd, Sp, Len}}} || {Name, {Type, Sp, Len}} <- BinValues0]}
+ end,
+ #doc{
+ id = Id,
+ revs = RevisionPath,
+ body = BodyData,
+ attachments = BinValues,
+ deleted = Deleted
+ }.
+
+flush_trees(_Db, [], AccFlushedTrees) ->
+ {ok, lists:reverse(AccFlushedTrees)};
+flush_trees(Db, [Unflushed | RestUnflushed], AccFlushed) ->
+ Flushed = couch_key_tree:map(
+ fun(_Rev, Value) ->
+ case Value of
+ #doc{attachments=Atts,deleted=IsDeleted}=Doc ->
+ % this node value is actually an unwritten document summary,
+ % write to disk.
+
+ % convert bins, removing the FD.
+ % All bins should have been flushed to disk already.
+ Bins = [{BinName, {BinType, BinSp, BinLen}} || {BinName, {BinType, {_Fd, BinSp, BinLen}}} <- Atts],
+ {ok, NewSummaryPointer} = couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}),
+ {IsDeleted, NewSummaryPointer};
+ _ ->
+ Value
+ end
+ end, Unflushed),
+ flush_trees(Db, RestUnflushed, [Flushed | AccFlushed]).
+
+merge_rev_trees(_NoConflicts, [], [], AccNewTrees) ->
+ {ok, lists:reverse(AccNewTrees)};
+merge_rev_trees(NoConflicts, [NewDocs | RestDocsList],
+ [OldTree | RestOldTrees], AccNewTrees) ->
+ UpdatesRevTree = lists:foldl(
+ fun(NewDoc, AccTree) ->
+ couch_key_tree:merge(AccTree, doc_to_tree(NewDoc))
+ end,
+ [], NewDocs),
+ NewRevTree = couch_key_tree:merge(OldTree, UpdatesRevTree),
+ if NoConflicts andalso OldTree == [] ->
+ OldConflicts = couch_key_tree:count_leafs(OldTree),
+ NewConflicts = couch_key_tree:count_leafs(NewRevTree),
+ if NewConflicts > OldConflicts ->
+ throw(conflict);
+ true -> ok
+ end;
+ true -> ok
+ end,
+ merge_rev_trees(NoConflicts, RestDocsList, RestOldTrees, [NewRevTree | AccNewTrees]).
+
+new_index_entries([], [], Seq, DocCount, DelCount, AccById, AccBySeq) ->
+ {ok, Seq, DocCount, DelCount, AccById, AccBySeq};
+new_index_entries([Id|RestIds], [RevTree|RestTrees], Seq0, DocCount, DelCount, AccById, AccBySeq) ->
+ Seq = Seq0 + 1,
+ FullDocInfo = #full_doc_info{id=Id, update_seq=Seq, rev_tree=RevTree},
+ #doc_info{deleted=Deleted} = DocInfo = couch_doc:to_doc_info(FullDocInfo),
+ {DocCount2, DelCount2} =
+ if Deleted -> {DocCount, DelCount + 1};
+ true -> {DocCount + 1, DelCount}
+ end,
+ new_index_entries(RestIds, RestTrees, Seq, DocCount2, DelCount2, [FullDocInfo|AccById], [DocInfo|AccBySeq]).
+
+update_docs_int(Db, DocsList, Options) ->
+ #db{
+ docinfo_by_Id_btree = DocInfoByIdBTree,
+ docinfo_by_seq_btree = DocInfoBySeqBTree,
+ last_update_seq = LastSeq,
+ doc_count = FullDocCount,
+ doc_del_count = FullDelCount
+ } = Db,
+
+ % separate out the NonRep documents from the rest of the documents
+ {DocsList2, NonRepDocs} = lists:foldl(
+ fun([#doc{id=Id}=Doc | Rest]=Docs, {DocsListAcc, NonRepDocsAcc}) ->
+ case Id of
+ ?LOCAL_DOC_PREFIX ++ _ when Rest==[] ->
+ % when saving NR (non rep) documents, you can only save a single rev
+ {DocsListAcc, [Doc | NonRepDocsAcc]};
+ Id->
+ {[Docs | DocsListAcc], NonRepDocsAcc}
+ end
+ end, {[], []}, DocsList),
+
+ Ids = [Id || [#doc{id=Id}|_] <- DocsList2],
+
+ % lookup up the existing documents, if they exist.
+ OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
+ OldDocTrees = lists:map(
+ fun({ok, #full_doc_info{rev_tree=OldRevTree}}) ->
+ OldRevTree;
+ (not_found) ->
+ []
+ end,
+ OldDocLookups),
+
+ {OldCount, OldDelCount} = lists:foldl(
+ fun({ok, FullDocInfo}, {OldCountAcc, OldDelCountAcc}) ->
+ case couch_doc:to_doc_info(FullDocInfo) of
+ #doc_info{deleted=false} ->
+ {OldCountAcc + 1, OldDelCountAcc};
+ _ ->
+ {OldCountAcc , OldDelCountAcc + 1}
+ end;
+ (not_found, Acc) ->
+ Acc
+ end, {0, 0}, OldDocLookups),
+
+ % Merge the new docs into the revision trees.
+ NoConflicts = lists:member(no_conflicts, Options),
+ {ok, NewRevTrees} = merge_rev_trees(NoConflicts, DocsList2, OldDocTrees, []),
+
+ RemoveSeqs = [ OldSeq || {ok, #full_doc_info{update_seq=OldSeq}} <- OldDocLookups],
+
+ % All regular documents are now ready to write.
+
+ % Try to write the local documents first, a conflict might be generated
+ {ok, Db2} = update_local_docs(Db, NonRepDocs),
+
+ % Write out the documents summaries (they are stored in the nodes of the rev trees)
+ {ok, FlushedRevTrees} = flush_trees(Db2, NewRevTrees, []),
+
+ {ok, NewSeq, NewDocsCount, NewDelCount, InfoById, InfoBySeq} =
+ new_index_entries(Ids, FlushedRevTrees, LastSeq, 0, 0, [], []),
+
+ % and the indexes to the documents
+ {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, InfoBySeq, RemoveSeqs),
+ {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, InfoById, []),
+
+ Db3 = Db2#db{
+ docinfo_by_Id_btree = DocInfoByIdBTree2,
+ docinfo_by_seq_btree = DocInfoBySeqBTree2,
+ last_update_seq = NewSeq,
+ doc_count = FullDocCount + NewDocsCount - OldCount,
+ doc_del_count = FullDelCount + NewDelCount - OldDelCount
+ },
+
+ case lists:member(delay_commit, Options) of
+ true ->
+ {ok, Db3};
+ false ->
+ commit_outstanding(Db3)
+ end.
+
+update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
+ Ids = [Id || #doc{id=Id} <- Docs],
+ OldDocLookups = couch_btree:lookup(Btree, Ids),
+ BtreeEntries = lists:zipwith(
+ fun(#doc{id=Id,deleted=Delete,revs=Revs,body=Body}, OldDocLookup) ->
+ BasedOnRev =
+ case Revs of
+ [] -> 0;
+ [RevStr|_] -> list_to_integer(RevStr) - 1
+ end,
+ OldRev =
+ case OldDocLookup of
+ {ok, {_, {OldRev0, _}}} -> OldRev0;
+ not_found -> 0
+ end,
+ case OldRev == BasedOnRev of
+ true ->
+ case Delete of
+ false -> {update, {Id, {OldRev+1, Body}}};
+ true -> {remove, Id}
+ end;
+ false ->
+ throw(conflict)
+ end
+
+ end, Docs, OldDocLookups),
+
+ BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
+ BtreeIdsUpdate = [ByIdDocInfo || {update, ByIdDocInfo} <- BtreeEntries],
+
+ {ok, Btree2} =
+ couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
+
+ {ok, Db#db{local_docs_btree = Btree2}}.
+
+
+
+commit_outstanding(#db{fd=Fd, header=Header} = Db) ->
+ ok = couch_file:sync(Fd), % commit outstanding data
+ Header2 = Header#db_header{
+ last_update_seq = Db#db.last_update_seq,
+ summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
+ docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
+ docinfo_by_Id_btree_state = couch_btree:get_state(Db#db.docinfo_by_Id_btree),
+ local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
+ doc_count = Db#db.doc_count,
+ doc_del_count = Db#db.doc_del_count
+ },
+ ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header2),
+ ok = couch_file:sync(Fd), % commit header to disk
+ Db2 = Db#db{
+ header = Header2
+ },
+ {ok, Db2}.
+
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+