diff options
Diffstat (limited to 'src/couchdb/couch_db.erl')
-rw-r--r-- | src/couchdb/couch_db.erl | 259 |
1 files changed, 192 insertions, 67 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index e567d27b..51d55822 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -13,20 +13,21 @@ -module(couch_db). -behaviour(gen_server). --export([open/2,create/2,create/3,get_doc_info/2]). +-export([open/2,create/2,create/3,get_doc_info/2,start_compact/1]). -export([save_docs/2, save_docs/3, get_db_info/1, update_doc/3, update_docs/2, update_docs/3]). --export([delete_doc/3,open_doc/2,open_doc/3,close/1,enum_docs_since/4,enum_docs_since/5]). +-export([delete_doc/3,open_doc/2,open_doc/3,enum_docs_since/4,enum_docs_since/5]). -export([enum_docs/4,enum_docs/5, open_doc_revs/4, get_missing_revs/2]). -export([start_update_loop/1]). -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]). +-export([start_copy_compact_int/1,continue_copy_compact_int/2]). -include("couch_db.hrl"). -record(db_header, {write_version = 0, - last_update_seq = 0, + update_seq = 0, summary_stream_state = nil, - docinfo_by_Id_btree_state = nil, + fulldocinfo_by_id_btree_state = nil, docinfo_by_seq_btree_state = nil, local_docs_btree_state = nil, doc_count=0, @@ -34,20 +35,24 @@ }). -record(db, - {main_pid, - update_pid, + {main_pid=nil, + update_pid=nil, + compactor_pid=nil, fd, header = #db_header{}, summary_stream, - docinfo_by_Id_btree, + fulldocinfo_by_id_btree, docinfo_by_seq_btree, local_docs_btree, - last_update_seq, + update_seq, doc_count, doc_del_count, name }). +% small value used in revision trees to indicate the revision isn't stored +-define(REV_MISSING, []). + start_link(DbName, Filepath, Options) -> case couch_file:open(Filepath, Options) of {ok, Fd} -> @@ -72,6 +77,9 @@ create(DbName, Filepath, Options) when is_list(Options) -> open(DbName, Filepath) -> start_link(DbName, Filepath, []). +start_compact(MainPid) -> + gen_server:cast(MainPid, start_compact). + delete_doc(MainPid, Id, Revisions) -> DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions], {ok, [Result]} = update_docs(MainPid, DeletedDocs, [new_edits]), @@ -128,15 +136,22 @@ get_full_doc_info(Db, Id) -> get_full_doc_infos(MainPid, Ids) when is_pid(MainPid) -> get_full_doc_infos(get_db(MainPid), Ids); get_full_doc_infos(#db{}=Db, Ids) -> - couch_btree:lookup(Db#db.docinfo_by_Id_btree, Ids). + couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids). get_db_info(MainPid) when is_pid(MainPid) -> get_db_info(get_db(MainPid)); -get_db_info(#db{doc_count=Count, doc_del_count=DelCount, last_update_seq=SeqNum}) -> +get_db_info(Db) -> + #db{fd=Fd, + compactor_pid=Compactor, + doc_count=Count, + doc_del_count=DelCount, + update_seq=SeqNum} = Db, InfoList = [ {doc_count, Count}, {doc_del_count, DelCount}, - {last_update_seq, SeqNum} + {last_update_seq, SeqNum}, + {compacting, Compactor==nil}, + {size, couch_file:bytes(Fd)} ], {ok, InfoList}. @@ -315,21 +330,11 @@ enum_docs_since(MainPid, SinceSeq, InFun, Acc) -> enum_docs(MainPid, StartId, Direction, InFun, InAcc) -> Db = get_db(MainPid), - couch_btree:fold(Db#db.docinfo_by_Id_btree, StartId, Direction, InFun, InAcc). + couch_btree:fold(Db#db.fulldocinfo_by_id_btree, StartId, Direction, InFun, InAcc). enum_docs(MainPid, StartId, InFun, Ctx) -> enum_docs(MainPid, StartId, fwd, InFun, Ctx). -close(MainPid) -> - Ref = erlang:monitor(process, MainPid), - unlink(MainPid), - exit(MainPid, normal), - receive - {'DOWN', Ref, process, MainPid, _Reason} -> - ok - end. - - % server functions init({DbName, Fd, Options}) -> @@ -339,12 +344,16 @@ init({DbName, Fd, Options}) -> % create a new header and writes it to the file Header = #db_header{}, ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header), - ok = couch_file:sync(Fd), - init_main(DbName, Fd, Header); + ok = couch_file:sync(Fd); false -> - {ok, Header} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>), - init_main(DbName, Fd, Header) - end. + {ok, Header} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>) + end, + + Db = init_db(DbName, Fd, Header), + + UpdatePid = spawn_link(couch_db, start_update_loop, [Db]), + + {ok, Db#db{update_pid=UpdatePid}}. btree_by_seq_split(DocInfo) -> #doc_info{ @@ -374,10 +383,10 @@ btree_by_name_join(Id, {Seq, Tree}) -> #full_doc_info{id=Id, update_seq=Seq, rev_tree=Tree}. -init_main(DbName, Fd, Header) -> +init_db(DbName, Fd, Header) -> {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd), ok = couch_stream:set_min_buffer(SummaryStream, 10000), - {ok, IdBtree} = couch_btree:open(Header#db_header.docinfo_by_Id_btree_state, Fd, + {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd, [{split, fun(V) -> btree_by_name_split(V) end}, {join, fun(K,V) -> btree_by_name_join(K,V) end}] ), {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd, @@ -385,26 +394,22 @@ init_main(DbName, Fd, Header) -> {join, fun(K,V) -> btree_by_seq_join(K,V) end}] ), {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd), - Db = #db{ + #db{ main_pid=self(), fd=Fd, header=Header, summary_stream = SummaryStream, - docinfo_by_Id_btree = IdBtree, + fulldocinfo_by_id_btree = IdBtree, docinfo_by_seq_btree = SeqBtree, local_docs_btree = LocalDocsBtree, - last_update_seq = Header#db_header.last_update_seq, + update_seq = Header#db_header.update_seq, doc_count = Header#db_header.doc_count, doc_del_count = Header#db_header.doc_del_count, name = DbName - }, - - UpdatePid = spawn_link(couch_db, start_update_loop, [Db]), - - {ok, Db#db{update_pid=UpdatePid}}. + }. terminate(_Reason, Db) -> - Db#db.update_pid ! close, + exit(Db#db.update_pid, kill), couch_file:close(Db#db.fd). handle_call({update_docs, DocActions, Options}, From, #db{update_pid=Updater}=Db) -> @@ -416,8 +421,17 @@ handle_call({db_updated, NewDb}, _From, _OldDb) -> {reply, ok, NewDb}. -handle_cast(foo, Main) -> - {noreply, Main}. +handle_cast(start_compact, #db{update_pid=Updater}=Db) -> + Updater ! compact, + {noreply, Db}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +handle_info(Msg, Db) -> + couch_log:error("Bad message received for db ~s: ~p", [Db#db.name, Msg]), + exit({error, Msg}). + %%% Internal function %%% @@ -439,11 +453,40 @@ update_loop(Db) -> Error -> exit(Error) % we crashed end; - close -> - % terminate loop - exit(normal) + compact -> + case Db#db.compactor_pid of + nil -> + Pid = spawn_link(couch_db, start_copy_compact_int, [Db]), + Db2 = Db#db{compactor_pid=Pid}, + ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}), + update_loop(Db2); + _ -> + update_loop(Db) % already started + end; + {compact_done, #db{update_seq=CompactSeq}=NewDb} -> + case CompactSeq == Db#db.update_seq of + true -> + NewDb2 = swap_files(Db, NewDb), + update_loop(NewDb2#db{compactor_pid=nil}); + false -> + Pid = spawn_link(couch_db, continue_copy_compact_int, [Db, NewDb]), + Db2 = Db#db{compactor_pid=Pid}, + ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}), + update_loop(Db2) + end; + Else -> + couch_log:error("Unknown message received in db ~s:~p", [Db#db.name, Else]), + exit({error, Else}) end. +swap_files(#db{fd=OldFd, name=Name}=_DbOld, DbNew) -> + NormalFilename = couch_server:get_filename(Name), + true = file:rename(NormalFilename, NormalFilename ++ ".old"), + true = file:rename(NormalFilename ++ ".compact", NormalFilename), + couch_file:close(OldFd), + file:delete(NormalFilename ++ ".old"), + DbNew. + get_db(MainPid) -> {ok, Db} = gen_server:call(MainPid, get_db), Db. @@ -466,7 +509,7 @@ open_doc_revs_int(Db, Id, Revs, Options) -> FoundResults = lists:map(fun({Rev, Value, FoundRevPath}) -> case Value of - 0 -> + ?REV_MISSING -> % we have the rev in our list but know nothing about it {{not_found, missing}, Rev}; {IsDeleted, SummaryPtr} -> @@ -538,7 +581,7 @@ doc_to_tree(Doc) -> doc_to_tree(Doc, [RevId]) -> [{RevId, Doc, []}]; doc_to_tree(Doc, [RevId | Rest]) -> - [{RevId, [], doc_to_tree(Doc, Rest)}]. + [{RevId, ?REV_MISSING, doc_to_tree(Doc, Rest)}]. make_doc(Db, Id, Deleted, SummaryPointer, RevisionPath) -> {BodyData, BinValues} = @@ -613,9 +656,9 @@ new_index_entries([Id|RestIds], [RevTree|RestTrees], Seq0, DocCount, DelCount, A update_docs_int(Db, DocsList, Options) -> #db{ - docinfo_by_Id_btree = DocInfoByIdBTree, + fulldocinfo_by_id_btree = DocInfoByIdBTree, docinfo_by_seq_btree = DocInfoBySeqBTree, - last_update_seq = LastSeq, + update_seq = LastSeq, doc_count = FullDocCount, doc_del_count = FullDelCount } = Db, @@ -678,9 +721,9 @@ update_docs_int(Db, DocsList, Options) -> {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, InfoById, []), Db3 = Db2#db{ - docinfo_by_Id_btree = DocInfoByIdBTree2, + fulldocinfo_by_id_btree = DocInfoByIdBTree2, docinfo_by_seq_btree = DocInfoBySeqBTree2, - last_update_seq = NewSeq, + update_seq = NewSeq, doc_count = FullDocCount + NewDocsCount - OldCount, doc_del_count = FullDelCount + NewDelCount - OldDelCount }, @@ -689,7 +732,7 @@ update_docs_int(Db, DocsList, Options) -> true -> {ok, Db3}; false -> - commit_outstanding(Db3) + {ok, commit_data(Db3)} end. update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> @@ -697,20 +740,20 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> OldDocLookups = couch_btree:lookup(Btree, Ids), BtreeEntries = lists:zipwith( fun(#doc{id=Id,deleted=Delete,revs=Revs,body=Body}, OldDocLookup) -> - BasedOnRev = + NewRev = case Revs of [] -> 0; - [RevStr|_] -> list_to_integer(RevStr) - 1 + [RevStr|_] -> list_to_integer(RevStr) end, OldRev = case OldDocLookup of {ok, {_, {OldRev0, _}}} -> OldRev0; not_found -> 0 end, - case OldRev == BasedOnRev of + case OldRev + 1 == NewRev of true -> case Delete of - false -> {update, {Id, {OldRev+1, Body}}}; + false -> {update, {Id, {NewRev, Body}}}; true -> {remove, Id} end; false -> @@ -729,29 +772,111 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> -commit_outstanding(#db{fd=Fd, header=Header} = Db) -> +commit_data(#db{fd=Fd, header=Header} = Db) -> ok = couch_file:sync(Fd), % commit outstanding data Header2 = Header#db_header{ - last_update_seq = Db#db.last_update_seq, + update_seq = Db#db.update_seq, summary_stream_state = couch_stream:get_state(Db#db.summary_stream), docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree), - docinfo_by_Id_btree_state = couch_btree:get_state(Db#db.docinfo_by_Id_btree), + fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree), local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree), doc_count = Db#db.doc_count, doc_del_count = Db#db.doc_del_count }, - ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header2), - ok = couch_file:sync(Fd), % commit header to disk - Db2 = Db#db{ - header = Header2 - }, - {ok, Db2}. + if Header == Header2 -> + Db; % unchanged. nothing to do + true -> + ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header2), + ok = couch_file:sync(Fd), % commit header to disk + Db#db{header = Header2} + end. +copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) -> + {ok, {BodyData, BinInfos}} = couch_stream:read_term(SrcFd, SrcSp), + % copy the bin values + NewBinInfos = lists:map(fun({Name, {Type, BinSp, Len}}) -> + {ok, NewBinSp} = couch_stream:copy_stream(SrcFd, BinSp, Len, DestFd), + {Name, {Type, NewBinSp, Len}} + end, BinInfos), + % now write the document summary + {ok, _SummaryPointer} = couch_stream:write_term(DestStream, {BodyData, NewBinInfos}). + +copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) -> + []; +copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp}, []} | RestTrees]) -> + % This is a leaf node, copy it over + NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream), + [{RevId, {IsDel, NewSp}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTrees)]; +copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTrees} | RestTrees]) -> + % inner node, only copy info/data from leaf nodes + [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTrees)} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTrees)]. + +copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq) -> + Ids = [Id || #doc_info{id=Id} <- InfoBySeq], + LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids), + NewFullDocInfos = lists:map( + fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) -> + Info#full_doc_info{rev_tree=copy_rev_tree(SrcFd, DestFd, DestStream, RevTree)} + end, LookupResults), + NewDocInfos = [couch_doc:to_doc_info(FullDocInfo) || FullDocInfo <- NewFullDocInfos], + {ok, DocInfoBTree} = + couch_btree:add_remove(NewDb#db.docinfo_by_seq_btree, NewDocInfos, []), + {ok, FullDocInfoBTree} = + couch_btree:add_remove(NewDb#db.fulldocinfo_by_id_btree, NewFullDocInfos, []), + NewDb#db{fulldocinfo_by_id_btree=FullDocInfoBTree, docinfo_by_seq_btree=DocInfoBTree}. + + + +copy_compact_docs(Db, NewDb) -> + EnumBySeqFun = + fun(#doc_info{update_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied}) -> + case couch_util:should_flush() of + true -> + NewDb2 = copy_docs(Db, AccNewDb, lists:reverse(AccUncopied, DocInfo)), + {ok, {commit_data(NewDb2#db{update_seq=Seq}), []}}; + false -> + {ok, {AccNewDb, [DocInfo | AccUncopied]}} + end + end, + {ok, {NewDb2, Uncopied}} = + couch_btree:foldl(Db#db.docinfo_by_seq_btree, NewDb#db.update_seq + 1, EnumBySeqFun, {NewDb, []}), -code_change(_OldVsn, State, _Extra) -> - {ok, State}. + case Uncopied of + [#doc_info{update_seq=LastSeq} | _] -> + commit_data( copy_docs(Db, NewDb2#db{update_seq=LastSeq}, + lists:reverse(Uncopied))); + [] -> + NewDb2 + end. -handle_info(_Info, State) -> - {noreply, State}. +start_copy_compact_int(#db{name=Name}=Db) -> + couch_log:debug("New compaction process spawned for db \"%s\"", [Name]), + Filename = couch_server:get_compaction_filename(Name), + case couch_file:open(Filename) of + {ok, Fd} -> + couch_log:debug("Found existing compaction file for db \"%s\"", [Name]), + {ok, Header} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>); + {error, enoent} -> % + {ok, Fd} = couch_file:open(Filename, [create]), + Header = #db_header{}, + ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header), + ok = couch_file:sync(Fd) + end, + NewDb = init_db(Name, Fd, Header), + NewDb2 = copy_compact_docs(Db, NewDb), + + % suck up all the local docs into memory and write them to the new db + {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree, + fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []), + {ok, NewLocalBtree} = couch_btree:add(NewDb2#db.local_docs_btree, LocalDocs), + NewDb3 = commit_data(NewDb2#db{local_docs_btree=NewLocalBtree}), + + NewDb3#db.update_pid ! {compact_done, NewDb3}. + +continue_copy_compact_int(#db{name=Name}=Db, NewDb) -> + couch_log:debug("Continued compaction process spawned for db \"%s\"", [Name]), + NewDb2 = copy_compact_docs(Db, NewDb), + NewDb2#db.update_pid ! {compact_done, NewDb2}. +
\ No newline at end of file |