summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_db_updater.erl
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2010-08-11 15:22:33 -0400
committerAdam Kocoloski <adam@cloudant.com>2010-08-11 17:39:37 -0400
commit81bdbed444df2cbcf3cdb32f7d4a74019de06454 (patch)
treeeade7d0d9bb4cac01b55fd8642adfe0f7da35161 /src/couchdb/couch_db_updater.erl
parentcc1910f73fbd20c5ffc94bd61e7701d7f5e4c92a (diff)
reorganize couch .erl and driver code into rebar layout
Diffstat (limited to 'src/couchdb/couch_db_updater.erl')
-rw-r--r--src/couchdb/couch_db_updater.erl879
1 files changed, 0 insertions, 879 deletions
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
deleted file mode 100644
index 19a4c165..00000000
--- a/src/couchdb/couch_db_updater.erl
+++ /dev/null
@@ -1,879 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_db_updater).
--behaviour(gen_server).
-
--export([btree_by_id_reduce/2,btree_by_seq_reduce/2]).
--export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
-
--include("couch_db.hrl").
-
-
-init({MainPid, DbName, Filepath, Fd, Options}) ->
- process_flag(trap_exit, true),
- case lists:member(create, Options) of
- true ->
- % create a new header and writes it to the file
- Header = #db_header{},
- ok = couch_file:write_header(Fd, Header),
- % delete any old compaction files that might be hanging around
- RootDir = couch_config:get("couchdb", "database_dir", "."),
- couch_file:delete(RootDir, Filepath ++ ".compact");
- false ->
- ok = couch_file:upgrade_old_header(Fd, <<$g, $m, $k, 0>>), % 09 UPGRADE CODE
- case couch_file:read_header(Fd) of
- {ok, Header} ->
- ok;
- no_valid_header ->
- % create a new header and writes it to the file
- Header = #db_header{},
- ok = couch_file:write_header(Fd, Header),
- % delete any old compaction files that might be hanging around
- file:delete(Filepath ++ ".compact")
- end
- end,
-
- Db = init_db(DbName, Filepath, Fd, Header),
- Db2 = refresh_validate_doc_funs(Db),
- {ok, Db2#db{main_pid = MainPid, is_sys_db = lists:member(sys_db, Options)}}.
-
-
-terminate(_Reason, Db) ->
- couch_file:close(Db#db.fd),
- couch_util:shutdown_sync(Db#db.compactor_pid),
- couch_util:shutdown_sync(Db#db.fd_ref_counter),
- ok.
-
-handle_call(get_db, _From, Db) ->
- {reply, {ok, Db}, Db};
-handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
- {reply, ok, Db}; % no data waiting, return ok immediately
-handle_call(full_commit, _From, Db) ->
- {reply, ok, commit_data(Db)}; % commit the data and return ok
-handle_call(increment_update_seq, _From, Db) ->
- Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
- ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
- couch_db_update_notifier:notify({updated, Db#db.name}),
- {reply, {ok, Db2#db.update_seq}, Db2};
-
-handle_call({set_security, NewSec}, _From, Db) ->
- {ok, Ptr} = couch_file:append_term(Db#db.fd, NewSec),
- Db2 = commit_data(Db#db{security=NewSec, security_ptr=Ptr,
- update_seq=Db#db.update_seq+1}),
- ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
- {reply, ok, Db2};
-
-handle_call({set_revs_limit, Limit}, _From, Db) ->
- Db2 = commit_data(Db#db{revs_limit=Limit,
- update_seq=Db#db.update_seq+1}),
- ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
- {reply, ok, Db2};
-
-handle_call({purge_docs, _IdRevs}, _From,
- #db{compactor_pid=Pid}=Db) when Pid /= nil ->
- {reply, {error, purge_during_compaction}, Db};
-handle_call({purge_docs, IdRevs}, _From, Db) ->
- #db{
- fd=Fd,
- fulldocinfo_by_id_btree = DocInfoByIdBTree,
- docinfo_by_seq_btree = DocInfoBySeqBTree,
- update_seq = LastSeq,
- header = Header = #db_header{purge_seq=PurgeSeq}
- } = Db,
- DocLookups = couch_btree:lookup(DocInfoByIdBTree,
- [Id || {Id, _Revs} <- IdRevs]),
-
- NewDocInfos = lists:zipwith(
- fun({_Id, Revs}, {ok, #full_doc_info{rev_tree=Tree}=FullDocInfo}) ->
- case couch_key_tree:remove_leafs(Tree, Revs) of
- {_, []=_RemovedRevs} -> % no change
- nil;
- {NewTree, RemovedRevs} ->
- {FullDocInfo#full_doc_info{rev_tree=NewTree},RemovedRevs}
- end;
- (_, not_found) ->
- nil
- end,
- IdRevs, DocLookups),
-
- SeqsToRemove = [Seq
- || {#full_doc_info{update_seq=Seq},_} <- NewDocInfos],
-
- FullDocInfoToUpdate = [FullInfo
- || {#full_doc_info{rev_tree=Tree}=FullInfo,_}
- <- NewDocInfos, Tree /= []],
-
- IdRevsPurged = [{Id, Revs}
- || {#full_doc_info{id=Id}, Revs} <- NewDocInfos],
-
- {DocInfoToUpdate, NewSeq} = lists:mapfoldl(
- fun(#full_doc_info{rev_tree=Tree}=FullInfo, SeqAcc) ->
- Tree2 = couch_key_tree:map_leafs( fun(RevInfo) ->
- RevInfo#rev_info{seq=SeqAcc + 1}
- end, Tree),
- {couch_doc:to_doc_info(FullInfo#full_doc_info{rev_tree=Tree2}),
- SeqAcc + 1}
- end, LastSeq, FullDocInfoToUpdate),
-
- IdsToRemove = [Id || {#full_doc_info{id=Id,rev_tree=[]},_}
- <- NewDocInfos],
-
- {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree,
- DocInfoToUpdate, SeqsToRemove),
- {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree,
- FullDocInfoToUpdate, IdsToRemove),
- {ok, Pointer} = couch_file:append_term(Fd, IdRevsPurged),
-
- Db2 = commit_data(
- Db#db{
- fulldocinfo_by_id_btree = DocInfoByIdBTree2,
- docinfo_by_seq_btree = DocInfoBySeqBTree2,
- update_seq = NewSeq + 1,
- header=Header#db_header{purge_seq=PurgeSeq+1, purged_docs=Pointer}}),
-
- ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
- couch_db_update_notifier:notify({updated, Db#db.name}),
- {reply, {ok, (Db2#db.header)#db_header.purge_seq, IdRevsPurged}, Db2};
-handle_call(start_compact, _From, Db) ->
- case Db#db.compactor_pid of
- nil ->
- ?LOG_INFO("Starting compaction for db \"~s\"", [Db#db.name]),
- Pid = spawn_link(fun() -> start_copy_compact(Db) end),
- Db2 = Db#db{compactor_pid=Pid},
- ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
- {reply, ok, Db2};
- _ ->
- % compact currently running, this is a no-op
- {reply, ok, Db}
- end.
-
-
-
-handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
- {ok, NewFd} = couch_file:open(CompactFilepath),
- {ok, NewHeader} = couch_file:read_header(NewFd),
- #db{update_seq=NewSeq} = NewDb =
- init_db(Db#db.name, Filepath, NewFd, NewHeader),
- unlink(NewFd),
- case Db#db.update_seq == NewSeq of
- true ->
- % suck up all the local docs into memory and write them to the new db
- {ok, _, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree,
- fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
- {ok, NewLocalBtree} = couch_btree:add(NewDb#db.local_docs_btree, LocalDocs),
-
- NewDb2 = commit_data(NewDb#db{
- local_docs_btree = NewLocalBtree,
- main_pid = Db#db.main_pid,
- filepath = Filepath,
- instance_start_time = Db#db.instance_start_time,
- revs_limit = Db#db.revs_limit
- }),
-
- ?LOG_DEBUG("CouchDB swapping files ~s and ~s.",
- [Filepath, CompactFilepath]),
- RootDir = couch_config:get("couchdb", "database_dir", "."),
- couch_file:delete(RootDir, Filepath),
- ok = file:rename(CompactFilepath, Filepath),
- close_db(Db),
- ok = gen_server:call(Db#db.main_pid, {db_updated, NewDb2}),
- ?LOG_INFO("Compaction for db \"~s\" completed.", [Db#db.name]),
- {noreply, NewDb2#db{compactor_pid=nil}};
- false ->
- ?LOG_INFO("Compaction file still behind main file "
- "(update seq=~p. compact update seq=~p). Retrying.",
- [Db#db.update_seq, NewSeq]),
- close_db(NewDb),
- Pid = spawn_link(fun() -> start_copy_compact(Db) end),
- Db2 = Db#db{compactor_pid=Pid},
- {noreply, Db2}
- end.
-
-
-handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts,
- FullCommit}, Db) ->
- GroupedDocs2 = [[{Client, D} || D <- DocGroup] || DocGroup <- GroupedDocs],
- if NonRepDocs == [] ->
- {GroupedDocs3, Clients, FullCommit2} = collect_updates(GroupedDocs2,
- [Client], MergeConflicts, FullCommit);
- true ->
- GroupedDocs3 = GroupedDocs2,
- FullCommit2 = FullCommit,
- Clients = [Client]
- end,
- NonRepDocs2 = [{Client, NRDoc} || NRDoc <- NonRepDocs],
- try update_docs_int(Db, GroupedDocs3, NonRepDocs2, MergeConflicts,
- FullCommit2) of
- {ok, Db2} ->
- ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
- if Db2#db.update_seq /= Db#db.update_seq ->
- couch_db_update_notifier:notify({updated, Db2#db.name});
- true -> ok
- end,
- [catch(ClientPid ! {done, self()}) || ClientPid <- Clients],
- {noreply, Db2}
- catch
- throw: retry ->
- [catch(ClientPid ! {retry, self()}) || ClientPid <- Clients],
- {noreply, Db}
- end;
-handle_info(delayed_commit, #db{waiting_delayed_commit=nil}=Db) ->
- %no outstanding delayed commits, ignore
- {noreply, Db};
-handle_info(delayed_commit, Db) ->
- case commit_data(Db) of
- Db ->
- {noreply, Db};
- Db2 ->
- ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
- {noreply, Db2}
- end;
-handle_info({'EXIT', _Pid, normal}, Db) ->
- {noreply, Db};
-handle_info({'EXIT', _Pid, Reason}, Db) ->
- {stop, Reason, Db}.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-
-merge_updates([], RestB, AccOutGroups) ->
- lists:reverse(AccOutGroups, RestB);
-merge_updates(RestA, [], AccOutGroups) ->
- lists:reverse(AccOutGroups, RestA);
-merge_updates([[{_, #doc{id=IdA}}|_]=GroupA | RestA],
- [[{_, #doc{id=IdB}}|_]=GroupB | RestB], AccOutGroups) ->
- if IdA == IdB ->
- merge_updates(RestA, RestB, [GroupA ++ GroupB | AccOutGroups]);
- IdA < IdB ->
- merge_updates(RestA, [GroupB | RestB], [GroupA | AccOutGroups]);
- true ->
- merge_updates([GroupA | RestA], RestB, [GroupB | AccOutGroups])
- end.
-
-collect_updates(GroupedDocsAcc, ClientsAcc, MergeConflicts, FullCommit) ->
- receive
- % Only collect updates with the same MergeConflicts flag and without
- % local docs. It's easier to just avoid multiple _local doc
- % updaters than deal with their possible conflicts, and local docs
- % writes are relatively rare. Can be optmized later if really needed.
- {update_docs, Client, GroupedDocs, [], MergeConflicts, FullCommit2} ->
- GroupedDocs2 = [[{Client, Doc} || Doc <- DocGroup]
- || DocGroup <- GroupedDocs],
- GroupedDocsAcc2 =
- merge_updates(GroupedDocsAcc, GroupedDocs2, []),
- collect_updates(GroupedDocsAcc2, [Client | ClientsAcc],
- MergeConflicts, (FullCommit or FullCommit2))
- after 0 ->
- {GroupedDocsAcc, ClientsAcc, FullCommit}
- end.
-
-
-btree_by_seq_split(#doc_info{id=Id, high_seq=KeySeq, revs=Revs}) ->
- RevInfos = [{Rev, Seq, Bp} ||
- #rev_info{rev=Rev,seq=Seq,deleted=false,body_sp=Bp} <- Revs],
- DeletedRevInfos = [{Rev, Seq, Bp} ||
- #rev_info{rev=Rev,seq=Seq,deleted=true,body_sp=Bp} <- Revs],
- {KeySeq,{Id, RevInfos, DeletedRevInfos}}.
-
-btree_by_seq_join(KeySeq, {Id, RevInfos, DeletedRevInfos}) ->
- #doc_info{
- id = Id,
- high_seq=KeySeq,
- revs =
- [#rev_info{rev=Rev,seq=Seq,deleted=false,body_sp = Bp} ||
- {Rev, Seq, Bp} <- RevInfos] ++
- [#rev_info{rev=Rev,seq=Seq,deleted=true,body_sp = Bp} ||
- {Rev, Seq, Bp} <- DeletedRevInfos]};
-btree_by_seq_join(KeySeq,{Id, Rev, Bp, Conflicts, DelConflicts, Deleted}) ->
- % 09 UPGRADE CODE
- % this is the 0.9.0 and earlier by_seq record. It's missing the body pointers
- % and individual seq nums for conflicts that are currently in the index,
- % meaning the filtered _changes api will not work except for on main docs.
- % Simply compact a 0.9.0 database to upgrade the index.
- #doc_info{
- id=Id,
- high_seq=KeySeq,
- revs = [#rev_info{rev=Rev,seq=KeySeq,deleted=Deleted,body_sp=Bp}] ++
- [#rev_info{rev=Rev1,seq=KeySeq,deleted=false} || Rev1 <- Conflicts] ++
- [#rev_info{rev=Rev2,seq=KeySeq,deleted=true} || Rev2 <- DelConflicts]}.
-
-btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq,
- deleted=Deleted, rev_tree=Tree}) ->
- DiskTree =
- couch_key_tree:map(
- fun(_RevId, {IsDeleted, BodyPointer, UpdateSeq}) ->
- {if IsDeleted -> 1; true -> 0 end, BodyPointer, UpdateSeq};
- (_RevId, ?REV_MISSING) ->
- ?REV_MISSING
- end, Tree),
- {Id, {Seq, if Deleted -> 1; true -> 0 end, DiskTree}}.
-
-btree_by_id_join(Id, {HighSeq, Deleted, DiskTree}) ->
- Tree =
- couch_key_tree:map(
- fun(_RevId, {IsDeleted, BodyPointer, UpdateSeq}) ->
- {IsDeleted == 1, BodyPointer, UpdateSeq};
- (_RevId, ?REV_MISSING) ->
- ?REV_MISSING;
- (_RevId, {IsDeleted, BodyPointer}) ->
- % 09 UPGRADE CODE
- % this is the 0.9.0 and earlier rev info record. It's missing the seq
- % nums, which means couchdb will sometimes reexamine unchanged
- % documents with the _changes API.
- % This is fixed by compacting the database.
- {IsDeleted == 1, BodyPointer, HighSeq}
- end, DiskTree),
-
- #full_doc_info{id=Id, update_seq=HighSeq, deleted=Deleted==1, rev_tree=Tree}.
-
-btree_by_id_reduce(reduce, FullDocInfos) ->
- % count the number of not deleted documents
- {length([1 || #full_doc_info{deleted=false} <- FullDocInfos]),
- length([1 || #full_doc_info{deleted=true} <- FullDocInfos])};
-btree_by_id_reduce(rereduce, Reds) ->
- {lists:sum([Count || {Count,_} <- Reds]),
- lists:sum([DelCount || {_, DelCount} <- Reds])}.
-
-btree_by_seq_reduce(reduce, DocInfos) ->
- % count the number of documents
- length(DocInfos);
-btree_by_seq_reduce(rereduce, Reds) ->
- lists:sum(Reds).
-
-simple_upgrade_record(Old, New) when tuple_size(Old) =:= tuple_size(New) ->
- Old;
-simple_upgrade_record(Old, New) when tuple_size(Old) < tuple_size(New) ->
- OldSz = tuple_size(Old),
- NewValuesTail =
- lists:sublist(tuple_to_list(New), OldSz + 1, tuple_size(New) - OldSz),
- list_to_tuple(tuple_to_list(Old) ++ NewValuesTail).
-
-
-init_db(DbName, Filepath, Fd, Header0) ->
- Header1 = simple_upgrade_record(Header0, #db_header{}),
- Header =
- case element(2, Header1) of
- 1 -> Header1#db_header{unused = 0, security_ptr = nil}; % 0.9
- 2 -> Header1#db_header{unused = 0, security_ptr = nil}; % post 0.9 and pre 0.10
- 3 -> Header1#db_header{security_ptr = nil}; % post 0.9 and pre 0.10
- 4 -> Header1#db_header{security_ptr = nil}; % 0.10 and pre 0.11
- ?LATEST_DISK_VERSION -> Header1;
- _ -> throw({database_disk_version_error, "Incorrect disk header version"})
- end,
-
- {ok, FsyncOptions} = couch_util:parse_term(
- couch_config:get("couchdb", "fsync_options",
- "[before_header, after_header, on_file_open]")),
-
- case lists:member(on_file_open, FsyncOptions) of
- true -> ok = couch_file:sync(Fd);
- _ -> ok
- end,
-
- {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
- [{split, fun(X) -> btree_by_id_split(X) end},
- {join, fun(X,Y) -> btree_by_id_join(X,Y) end},
- {reduce, fun(X,Y) -> btree_by_id_reduce(X,Y) end}]),
- {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd,
- [{split, fun(X) -> btree_by_seq_split(X) end},
- {join, fun(X,Y) -> btree_by_seq_join(X,Y) end},
- {reduce, fun(X,Y) -> btree_by_seq_reduce(X,Y) end}]),
- {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd),
- case Header#db_header.security_ptr of
- nil ->
- Security = [],
- SecurityPtr = nil;
- SecurityPtr ->
- {ok, Security} = couch_file:pread_term(Fd, SecurityPtr)
- end,
- % convert start time tuple to microsecs and store as a binary string
- {MegaSecs, Secs, MicroSecs} = now(),
- StartTime = ?l2b(io_lib:format("~p",
- [(MegaSecs*1000000*1000000) + (Secs*1000000) + MicroSecs])),
- {ok, RefCntr} = couch_ref_counter:start([Fd]),
- #db{
- update_pid=self(),
- fd=Fd,
- fd_ref_counter = RefCntr,
- header=Header,
- fulldocinfo_by_id_btree = IdBtree,
- docinfo_by_seq_btree = SeqBtree,
- local_docs_btree = LocalDocsBtree,
- committed_update_seq = Header#db_header.update_seq,
- update_seq = Header#db_header.update_seq,
- name = DbName,
- filepath = Filepath,
- security = Security,
- security_ptr = SecurityPtr,
- instance_start_time = StartTime,
- revs_limit = Header#db_header.revs_limit,
- fsync_options = FsyncOptions
- }.
-
-
-close_db(#db{fd_ref_counter = RefCntr}) ->
- couch_ref_counter:drop(RefCntr).
-
-
-refresh_validate_doc_funs(Db) ->
- {ok, DesignDocs} = couch_db:get_design_docs(Db),
- ProcessDocFuns = lists:flatmap(
- fun(DesignDoc) ->
- case couch_doc:get_validate_doc_fun(DesignDoc) of
- nil -> [];
- Fun -> [Fun]
- end
- end, DesignDocs),
- Db#db{validate_doc_funs=ProcessDocFuns}.
-
-% rev tree functions
-
-flush_trees(_Db, [], AccFlushedTrees) ->
- {ok, lists:reverse(AccFlushedTrees)};
-flush_trees(#db{fd=Fd,header=Header}=Db,
- [InfoUnflushed | RestUnflushed], AccFlushed) ->
- #full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed,
- Flushed = couch_key_tree:map(
- fun(_Rev, Value) ->
- case Value of
- #doc{atts=Atts,deleted=IsDeleted}=Doc ->
- % this node value is actually an unwritten document summary,
- % write to disk.
- % make sure the Fd in the written bins is the same Fd we are
- % and convert bins, removing the FD.
- % All bins should have been written to disk already.
- DiskAtts =
- case Atts of
- [] -> [];
- [#att{data={BinFd, _Sp}} | _ ] when BinFd == Fd ->
- [{N,T,P,AL,DL,R,M,E}
- || #att{name=N,type=T,data={_,P},md5=M,revpos=R,
- att_len=AL,disk_len=DL,encoding=E}
- <- Atts];
- _ ->
- % BinFd must not equal our Fd. This can happen when a database
- % is being switched out during a compaction
- ?LOG_DEBUG("File where the attachments are written has"
- " changed. Possibly retrying.", []),
- throw(retry)
- end,
- {ok, NewSummaryPointer} =
- case Header#db_header.disk_version < 4 of
- true ->
- couch_file:append_term(Fd, {Doc#doc.body, DiskAtts});
- false ->
- couch_file:append_term_md5(Fd, {Doc#doc.body, DiskAtts})
- end,
- {IsDeleted, NewSummaryPointer, UpdateSeq};
- _ ->
- Value
- end
- end, Unflushed),
- flush_trees(Db, RestUnflushed, [InfoUnflushed#full_doc_info{rev_tree=Flushed} | AccFlushed]).
-
-
-send_result(Client, Id, OriginalRevs, NewResult) ->
- % used to send a result to the client
- catch(Client ! {result, self(), {{Id, OriginalRevs}, NewResult}}).
-
-merge_rev_trees(_MergeConflicts, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) ->
- {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq};
-merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList],
- [OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccSeq) ->
- #full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted,update_seq=OldSeq}
- = OldDocInfo,
- NewRevTree = lists:foldl(
- fun({Client, #doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc}, AccTree) ->
- if not MergeConflicts ->
- case couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]) of
- {_NewTree, conflicts} when (not OldDeleted) ->
- send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
- AccTree;
- {NewTree, conflicts} when PrevRevs /= [] ->
- % Check to be sure if prev revision was specified, it's
- % a leaf node in the tree
- Leafs = couch_key_tree:get_all_leafs(AccTree),
- IsPrevLeaf = lists:any(fun({_, {LeafPos, [LeafRevId|_]}}) ->
- {LeafPos, LeafRevId} == {Pos-1, hd(PrevRevs)}
- end, Leafs),
- if IsPrevLeaf ->
- NewTree;
- true ->
- send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
- AccTree
- end;
- {NewTree, no_conflicts} when AccTree == NewTree ->
- % the tree didn't change at all
- % meaning we are saving a rev that's already
- % been editted again.
- if (Pos == 1) and OldDeleted ->
- % this means we are recreating a brand new document
- % into a state that already existed before.
- % put the rev into a subsequent edit of the deletion
- #doc_info{revs=[#rev_info{rev={OldPos,OldRev}}|_]} =
- couch_doc:to_doc_info(OldDocInfo),
- NewRevId = couch_db:new_revid(
- NewDoc#doc{revs={OldPos, [OldRev]}}),
- NewDoc2 = NewDoc#doc{revs={OldPos + 1, [NewRevId, OldRev]}},
- {NewTree2, _} = couch_key_tree:merge(AccTree,
- [couch_db:doc_to_tree(NewDoc2)]),
- % we changed the rev id, this tells the caller we did
- send_result(Client, Id, {Pos-1,PrevRevs},
- {ok, {OldPos + 1, NewRevId}}),
- NewTree2;
- true ->
- send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
- AccTree
- end;
- {NewTree, _} ->
- NewTree
- end;
- true ->
- {NewTree, _} = couch_key_tree:merge(AccTree,
- [couch_db:doc_to_tree(NewDoc)]),
- NewTree
- end
- end,
- OldTree, NewDocs),
- if NewRevTree == OldTree ->
- % nothing changed
- merge_rev_trees(MergeConflicts, RestDocsList, RestOldInfo, AccNewInfos,
- AccRemoveSeqs, AccSeq);
- true ->
- % we have updated the document, give it a new seq #
- NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree},
- RemoveSeqs = case OldSeq of
- 0 -> AccRemoveSeqs;
- _ -> [OldSeq | AccRemoveSeqs]
- end,
- merge_rev_trees(MergeConflicts, RestDocsList, RestOldInfo,
- [NewInfo|AccNewInfos], RemoveSeqs, AccSeq+1)
- end.
-
-
-
-new_index_entries([], AccById, AccBySeq) ->
- {AccById, AccBySeq};
-new_index_entries([FullDocInfo|RestInfos], AccById, AccBySeq) ->
- #doc_info{revs=[#rev_info{deleted=Deleted}|_]} = DocInfo =
- couch_doc:to_doc_info(FullDocInfo),
- new_index_entries(RestInfos,
- [FullDocInfo#full_doc_info{deleted=Deleted}|AccById],
- [DocInfo|AccBySeq]).
-
-
-stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) ->
- [Info#full_doc_info{rev_tree=couch_key_tree:stem(Tree, Limit)} ||
- #full_doc_info{rev_tree=Tree}=Info <- DocInfos].
-
-update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
- #db{
- fulldocinfo_by_id_btree = DocInfoByIdBTree,
- docinfo_by_seq_btree = DocInfoBySeqBTree,
- update_seq = LastSeq
- } = Db,
- Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList],
- % lookup up the old documents, if they exist.
- OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
- OldDocInfos = lists:zipwith(
- fun(_Id, {ok, FullDocInfo}) ->
- FullDocInfo;
- (Id, not_found) ->
- #full_doc_info{id=Id}
- end,
- Ids, OldDocLookups),
- % Merge the new docs into the revision trees.
- {ok, NewDocInfos0, RemoveSeqs, NewSeq} = merge_rev_trees(
- MergeConflicts, DocsList, OldDocInfos, [], [], LastSeq),
-
- NewFullDocInfos = stem_full_doc_infos(Db, NewDocInfos0),
-
- % All documents are now ready to write.
-
- {ok, Db2} = update_local_docs(Db, NonRepDocs),
-
- % Write out the document summaries (the bodies are stored in the nodes of
- % the trees, the attachments are already written to disk)
- {ok, FlushedFullDocInfos} = flush_trees(Db2, NewFullDocInfos, []),
-
- {IndexFullDocInfos, IndexDocInfos} =
- new_index_entries(FlushedFullDocInfos, [], []),
-
- % and the indexes
- {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, IndexFullDocInfos, []),
- {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, IndexDocInfos, RemoveSeqs),
-
- Db3 = Db2#db{
- fulldocinfo_by_id_btree = DocInfoByIdBTree2,
- docinfo_by_seq_btree = DocInfoBySeqBTree2,
- update_seq = NewSeq},
-
- % Check if we just updated any design documents, and update the validation
- % funs if we did.
- case [1 || <<"_design/",_/binary>> <- Ids] of
- [] ->
- Db4 = Db3;
- _ ->
- Db4 = refresh_validate_doc_funs(Db3)
- end,
-
- {ok, commit_data(Db4, not FullCommit)}.
-
-
-update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
- Ids = [Id || {_Client, #doc{id=Id}} <- Docs],
- OldDocLookups = couch_btree:lookup(Btree, Ids),
- BtreeEntries = lists:zipwith(
- fun({Client, #doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}}, OldDocLookup) ->
- case PrevRevs of
- [RevStr|_] ->
- PrevRev = list_to_integer(?b2l(RevStr));
- [] ->
- PrevRev = 0
- end,
- OldRev =
- case OldDocLookup of
- {ok, {_, {OldRev0, _}}} -> OldRev0;
- not_found -> 0
- end,
- case OldRev == PrevRev of
- true ->
- case Delete of
- false ->
- send_result(Client, Id, {0, PrevRevs}, {ok,
- {0, ?l2b(integer_to_list(PrevRev + 1))}}),
- {update, {Id, {PrevRev + 1, Body}}};
- true ->
- send_result(Client, Id, {0, PrevRevs},
- {ok, {0, <<"0">>}}),
- {remove, Id}
- end;
- false ->
- send_result(Client, Id, {0, PrevRevs}, conflict),
- ignore
- end
- end, Docs, OldDocLookups),
-
- BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
- BtreeIdsUpdate = [{Key, Val} || {update, {Key, Val}} <- BtreeEntries],
-
- {ok, Btree2} =
- couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
-
- {ok, Db#db{local_docs_btree = Btree2}}.
-
-
-commit_data(Db) ->
- commit_data(Db, false).
-
-db_to_header(Db, Header) ->
- Header#db_header{
- update_seq = Db#db.update_seq,
- docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
- fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
- local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
- security_ptr = Db#db.security_ptr,
- revs_limit = Db#db.revs_limit}.
-
-commit_data(#db{waiting_delayed_commit=nil} = Db, true) ->
- Db#db{waiting_delayed_commit=erlang:send_after(1000,self(),delayed_commit)};
-commit_data(Db, true) ->
- Db;
-commit_data(Db, _) ->
- #db{
- fd = Fd,
- filepath = Filepath,
- header = OldHeader,
- fsync_options = FsyncOptions,
- waiting_delayed_commit = Timer
- } = Db,
- if is_reference(Timer) -> erlang:cancel_timer(Timer); true -> ok end,
- case db_to_header(Db, OldHeader) of
- OldHeader ->
- Db#db{waiting_delayed_commit=nil};
- Header ->
- case lists:member(before_header, FsyncOptions) of
- true -> ok = couch_file:sync(Filepath);
- _ -> ok
- end,
-
- ok = couch_file:write_header(Fd, Header),
-
- case lists:member(after_header, FsyncOptions) of
- true -> ok = couch_file:sync(Filepath);
- _ -> ok
- end,
-
- Db#db{waiting_delayed_commit=nil,
- header=Header,
- committed_update_seq=Db#db.update_seq}
- end.
-
-
-copy_doc_attachments(#db{fd=SrcFd}=SrcDb, {Pos,_RevId}, SrcSp, DestFd) ->
- {ok, {BodyData, BinInfos}} = couch_db:read_doc(SrcDb, SrcSp),
- % copy the bin values
- NewBinInfos = lists:map(
- fun({Name, {Type, BinSp, AttLen}}) when is_tuple(BinSp) orelse BinSp == null ->
- % 09 UPGRADE CODE
- {NewBinSp, AttLen, AttLen, Md5, _IdentityMd5} =
- couch_stream:old_copy_to_new_stream(SrcFd, BinSp, AttLen, DestFd),
- {Name, Type, NewBinSp, AttLen, AttLen, Pos, Md5, identity};
- ({Name, {Type, BinSp, AttLen}}) ->
- % 09 UPGRADE CODE
- {NewBinSp, AttLen, AttLen, Md5, _IdentityMd5} =
- couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
- {Name, Type, NewBinSp, AttLen, AttLen, Pos, Md5, identity};
- ({Name, Type, BinSp, AttLen, _RevPos, <<>>}) when
- is_tuple(BinSp) orelse BinSp == null ->
- % 09 UPGRADE CODE
- {NewBinSp, AttLen, AttLen, Md5, _IdentityMd5} =
- couch_stream:old_copy_to_new_stream(SrcFd, BinSp, AttLen, DestFd),
- {Name, Type, NewBinSp, AttLen, AttLen, AttLen, Md5, identity};
- ({Name, Type, BinSp, AttLen, RevPos, Md5}) ->
- % 010 UPGRADE CODE
- {NewBinSp, AttLen, AttLen, Md5, _IdentityMd5} =
- couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
- {Name, Type, NewBinSp, AttLen, AttLen, RevPos, Md5, identity};
- ({Name, Type, BinSp, AttLen, DiskLen, RevPos, Md5, Enc1}) ->
- {NewBinSp, AttLen, _, Md5, _IdentityMd5} =
- couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
- Enc = case Enc1 of
- true ->
- % 0110 UPGRADE CODE
- gzip;
- false ->
- % 0110 UPGRADE CODE
- identity;
- _ ->
- Enc1
- end,
- {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, Md5, Enc}
- end, BinInfos),
- {BodyData, NewBinInfos}.
-
-copy_rev_tree_attachments(SrcDb, DestFd, Tree) ->
- couch_key_tree:map(
- fun(Rev, {IsDel, Sp, Seq}, leaf) ->
- DocBody = copy_doc_attachments(SrcDb, Rev, Sp, DestFd),
- {IsDel, DocBody, Seq};
- (_, _, branch) ->
- ?REV_MISSING
- end, Tree).
-
-
-copy_docs(Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
- Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
- LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
-
- % write out the attachments
- NewFullDocInfos0 = lists:map(
- fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
- Info#full_doc_info{rev_tree=copy_rev_tree_attachments(Db, DestFd, RevTree)}
- end, LookupResults),
- % write out the docs
- % we do this in 2 stages so the docs are written out contiguously, making
- % view indexing and replication faster.
- NewFullDocInfos1 = lists:map(
- fun(#full_doc_info{rev_tree=RevTree}=Info) ->
- Info#full_doc_info{rev_tree=couch_key_tree:map_leafs(
- fun(_Key, {IsDel, DocBody, Seq}) ->
- {ok, Pos} = couch_file:append_term_md5(DestFd, DocBody),
- {IsDel, Pos, Seq}
- end, RevTree)}
- end, NewFullDocInfos0),
-
- NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos1),
- NewDocInfos = [couch_doc:to_doc_info(Info) || Info <- NewFullDocInfos],
- RemoveSeqs =
- case Retry of
- false ->
- [];
- true ->
- % We are retrying a compaction, meaning the documents we are copying may
- % already exist in our file and must be removed from the by_seq index.
- Existing = couch_btree:lookup(NewDb#db.fulldocinfo_by_id_btree, Ids),
- [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing]
- end,
-
- {ok, DocInfoBTree} = couch_btree:add_remove(
- NewDb#db.docinfo_by_seq_btree, NewDocInfos, RemoveSeqs),
- {ok, FullDocInfoBTree} = couch_btree:add_remove(
- NewDb#db.fulldocinfo_by_id_btree, NewFullDocInfos, []),
- NewDb#db{ fulldocinfo_by_id_btree=FullDocInfoBTree,
- docinfo_by_seq_btree=DocInfoBTree}.
-
-
-
-copy_compact(Db, NewDb0, Retry) ->
- FsyncOptions = [Op || Op <- NewDb0#db.fsync_options, Op == before_header],
- NewDb = NewDb0#db{fsync_options=FsyncOptions},
- TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
- EnumBySeqFun =
- fun(#doc_info{high_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied, TotalCopied}) ->
- couch_task_status:update("Copied ~p of ~p changes (~p%)",
- [TotalCopied, TotalChanges, (TotalCopied*100) div TotalChanges]),
- if TotalCopied rem 1000 =:= 0 ->
- NewDb2 = copy_docs(Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
- if TotalCopied rem 10000 =:= 0 ->
- {ok, {commit_data(NewDb2#db{update_seq=Seq}), [], TotalCopied + 1}};
- true ->
- {ok, {NewDb2#db{update_seq=Seq}, [], TotalCopied + 1}}
- end;
- true ->
- {ok, {AccNewDb, [DocInfo | AccUncopied], TotalCopied + 1}}
- end
- end,
-
- couch_task_status:set_update_frequency(500),
-
- {ok, _, {NewDb2, Uncopied, TotalChanges}} =
- couch_btree:foldl(Db#db.docinfo_by_seq_btree, EnumBySeqFun,
- {NewDb, [], 0},
- [{start_key, NewDb#db.update_seq + 1}]),
-
- couch_task_status:update("Flushing"),
-
- NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
-
- % copy misc header values
- if NewDb3#db.security /= Db#db.security ->
- {ok, Ptr} = couch_file:append_term(NewDb3#db.fd, Db#db.security),
- NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr};
- true ->
- NewDb4 = NewDb3
- end,
-
- commit_data(NewDb4#db{update_seq=Db#db.update_seq}).
-
-start_copy_compact(#db{name=Name,filepath=Filepath}=Db) ->
- CompactFile = Filepath ++ ".compact",
- ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]),
- case couch_file:open(CompactFile) of
- {ok, Fd} ->
- couch_task_status:add_task(<<"Database Compaction">>, <<Name/binary, " retry">>, <<"Starting">>),
- Retry = true,
- {ok, Header} = couch_file:read_header(Fd);
- {error, enoent} ->
- couch_task_status:add_task(<<"Database Compaction">>, Name, <<"Starting">>),
- {ok, Fd} = couch_file:open(CompactFile, [create]),
- Retry = false,
- ok = couch_file:write_header(Fd, Header=#db_header{})
- end,
- NewDb = init_db(Name, CompactFile, Fd, Header),
- unlink(Fd),
- NewDb2 = copy_compact(Db, NewDb, Retry),
- close_db(NewDb2),
- gen_server:cast(Db#db.update_pid, {compact_done, CompactFile}).
-