From afaa5d561826ccf7cab4fde2af9ad39d32ea4d0d Mon Sep 17 00:00:00 2001
From: "Damien F. Katz"
Date: Fri, 4 Apr 2008 03:10:34 +0000
Subject: compaction code, not hooked up to webserver yet

git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@644593 13f79535-47bb-0310-9956-ffa450edef68
---
 src/couchdb/couch_db.erl | 219 ++++++++++++++++++++++++++++-------------------
 1 file changed, 133 insertions(+), 86 deletions(-)

diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 51d55822..6f8a4ac6 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -17,9 +17,9 @@
 -export([save_docs/2, save_docs/3, get_db_info/1, update_doc/3, update_docs/2, update_docs/3]).
 -export([delete_doc/3,open_doc/2,open_doc/3,enum_docs_since/4,enum_docs_since/5]).
 -export([enum_docs/4,enum_docs/5, open_doc_revs/4, get_missing_revs/2]).
--export([start_update_loop/1]).
+-export([start_update_loop/2]).
 -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
--export([start_copy_compact_int/1,continue_copy_compact_int/2]).
+-export([start_copy_compact_int/2]).
 
 -include("couch_db.hrl").
 
@@ -47,24 +47,52 @@
         update_seq,
         doc_count,
         doc_del_count,
-        name
+        name,
+        filepath
     }).
 
 % small value used in revision trees to indicate the revision isn't stored
 -define(REV_MISSING, []).
 
 start_link(DbName, Filepath, Options) ->
+    catch start_link0(DbName, Filepath, Options).
+
+start_link0(DbName, Filepath, Options) ->
+    % if we crashed during a previous compaction's file switch, recover here
+    Fd =
     case couch_file:open(Filepath, Options) of
-    {ok, Fd} ->
-        Result = gen_server:start_link(couch_db, {DbName, Fd, Options}, []),
-        unlink(Fd),
-        Result;
+    {ok, Fd0} ->
+        Fd0;
     {error, enoent} ->
-        % couldn't find file
-        {error, not_found};
+        % couldn't find file. Is there a compact version? This can happen
+        % if we crashed during the file switch.
+        case couch_file:open(Filepath ++ ".compact") of
+        {ok, Fd0} ->
+            couch_log:info("Found ~s~s compaction file, using as primary storage.", [Filepath, ".compact"]),
+            ok = file:rename(Filepath ++ ".compact", Filepath),
+            Fd0;
+        {error, enoent} ->
+            throw({error, not_found})
+        end;
     Else ->
-        Else
-    end.
+        throw(Else)
+    end,
+
+    StartResult = gen_server:start_link(couch_db, {DbName, Filepath, Fd, Options}, []),
+    unlink(Fd),
+    case StartResult of
+    {ok, _} ->
+        % We successfully opened the db, delete old storage files if around
+        case file:delete(Filepath ++ ".old") of
+        ok ->
+            couch_log:info("Deleted old storage file ~s~s", [Filepath, ".old"]);
+        {error, enoent} ->
+            ok % normal result
+        end;
+    _ ->
+        ok
+    end,
+    StartResult.
 
 
 %%% Interface functions %%%
@@ -146,12 +174,13 @@ get_db_info(Db) ->
         doc_count=Count,
         doc_del_count=DelCount,
         update_seq=SeqNum} = Db,
+    {ok, Size} = couch_file:bytes(Fd),
     InfoList = [
        {doc_count, Count},
        {doc_del_count, DelCount},
-       {last_update_seq, SeqNum},
-       {compacting, Compactor==nil},
-       {size, couch_file:bytes(Fd)}
+       {update_seq, SeqNum},
+       {compacting, Compactor/=nil},
+       {size, Size}
        ],
    {ok, InfoList}.
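
A quick illustration of the get_db_info/1 change above: the sequence number is now reported as update_seq rather than last_update_seq, and the compacting flag, previously inverted (Compactor==nil), is now correct. A hypothetical shell interaction, assuming Db is a #db{} handle already opened through couch_server (all values made up):

    %% Hypothetical values; only the shape and key names are meaningful.
    {ok, Info} = couch_db:get_db_info(Db),
    %% Info = [{doc_count, 42},
    %%         {doc_del_count, 3},
    %%         {update_seq, 107},     % renamed from last_update_seq
    %%         {compacting, false},   % true only while a compactor runs
    %%         {size, 1048576}]       % couch_file:bytes/1 on the fd
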
@@ -337,23 +366,12 @@ enum_docs(MainPid, StartId, InFun, Ctx) ->
 
 % server functions
 
-init({DbName, Fd, Options}) ->
-    link(Fd),
-    case lists:member(create, Options) of
-    true ->
-        % create a new header and writes it to the file
-        Header = #db_header{},
-        ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header),
-        ok = couch_file:sync(Fd);
-    false ->
-        {ok, Header} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>)
-    end,
-
-    Db = init_db(DbName, Fd, Header),
-
-    UpdatePid = spawn_link(couch_db, start_update_loop, [Db]),
-
-    {ok, Db#db{update_pid=UpdatePid}}.
+init(InitArgs) ->
+    spawn_link(couch_db, start_update_loop, [self(), InitArgs]),
+    receive
+    {initialized, Db} ->
+        {ok, Db}
+    end.
 
 btree_by_seq_split(DocInfo) ->
     #doc_info{
@@ -383,7 +401,7 @@ btree_by_name_join(Id, {Seq, Tree}) ->
     #full_doc_info{id=Id, update_seq=Seq, rev_tree=Tree}.
 
 
-init_db(DbName, Fd, Header) ->
+init_db(DbName, Filepath, Fd, Header) ->
     {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd),
     ok = couch_stream:set_min_buffer(SummaryStream, 10000),
     {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
@@ -395,7 +413,7 @@ init_db(DbName, Fd, Header) ->
     {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd),
 
     #db{
-        main_pid=self(),
+        update_pid=self(),
         fd=Fd,
         header=Header,
         summary_stream = SummaryStream,
@@ -405,12 +423,16 @@ init_db(DbName, Fd, Header) ->
         update_seq = Header#db_header.update_seq,
         doc_count = Header#db_header.doc_count,
         doc_del_count = Header#db_header.doc_del_count,
-        name = DbName
+        name = DbName,
+        filepath=Filepath
         }.
 
+close_db(#db{fd=Fd,summary_stream=Ss}) ->
+    couch_file:close(Fd),
+    couch_stream:close(Ss).
+
 terminate(_Reason, Db) ->
-    exit(Db#db.update_pid, kill),
-    couch_file:close(Db#db.fd).
+    exit(Db#db.update_pid, kill).
 
 handle_call({update_docs, DocActions, Options}, From, #db{update_pid=Updater}=Db) ->
     Updater ! {From, update_docs, DocActions, Options},
@@ -435,17 +457,34 @@ handle_info(Msg, Db) ->
 
 %%% Internal function %%%
 
-start_update_loop(Db) ->
-    update_loop(Db#db{update_pid=self()}).
+start_update_loop(MainPid, {DbName, Filepath, Fd, Options}) ->
+    link(Fd),
+
+    case lists:member(create, Options) of
+    true ->
+        % create a new header and write it to the file
+        Header = #db_header{},
+        ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header),
+        % delete any old compaction files that might be hanging around
+        file:delete(Filepath ++ ".compact"),
+        file:delete(Filepath ++ ".old");
+    false ->
+        {ok, Header} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>)
+    end,
+
+    Db = init_db(DbName, Filepath, Fd, Header),
+    Db2 = Db#db{main_pid=MainPid},
+    MainPid ! {initialized, Db2},
+    update_loop(Db2).
 
-update_loop(Db) ->
+update_loop(#db{name=Name,filepath=Filepath, main_pid=MainPid}=Db) ->
     receive
     {OrigFrom, update_docs, DocActions, Options} ->
         case (catch update_docs_int(Db, DocActions, Options)) of
         {ok, Db2} ->
-            ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+            ok = gen_server:call(MainPid, {db_updated, Db2}),
             gen_server:reply(OrigFrom, ok),
-            couch_db_update_notifier:notify({updated, Db2#db.name}),
+            couch_db_update_notifier:notify({updated, Name}),
             update_loop(Db2);
         conflict ->
             gen_server:reply(OrigFrom, conflict),
@@ -456,22 +495,40 @@
     compact ->
         case Db#db.compactor_pid of
         nil ->
-            Pid = spawn_link(couch_db, start_copy_compact_int, [Db]),
+            couch_log:info("Starting compaction for db \"~s\"", [Name]),
+            Pid = spawn_link(couch_db, start_copy_compact_int, [Db, true]),
             Db2 = Db#db{compactor_pid=Pid},
-            ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
+            ok = gen_server:call(MainPid, {db_updated, Db2}),
             update_loop(Db2);
         _ ->
             update_loop(Db) % already started
         end;
-    {compact_done, #db{update_seq=CompactSeq}=NewDb} ->
-        case CompactSeq == Db#db.update_seq of
+    {compact_done, CompactFilepath} ->
+        {ok, NewFd} = couch_file:open(CompactFilepath),
+        {ok, NewHeader} = couch_file:read_header(NewFd, <<$g, $m, $k, 0>>),
+        #db{update_seq=NewSeq} = NewDb =
+                init_db(Name, CompactFilepath, NewFd, NewHeader),
+        case Db#db.update_seq == NewSeq of
         true ->
-            NewDb2 = swap_files(Db, NewDb),
+            couch_log:debug("CouchDB swapping files ~s and ~s.", [Filepath, CompactFilepath]),
+            ok = file:rename(Filepath, Filepath ++ ".old"),
+            ok = file:rename(CompactFilepath, Filepath),
+
+            NewDb2 = NewDb#db{
+                main_pid = Db#db.main_pid,
+                doc_count = Db#db.doc_count,
+                doc_del_count = Db#db.doc_del_count,
+                filepath = Filepath},
+            close_db(Db),
+            ok = gen_server:call(MainPid, {db_updated, NewDb2}),
+            couch_log:info("Compaction for db ~p completed.", [Name]),
             update_loop(NewDb2#db{compactor_pid=nil});
         false ->
-            Pid = spawn_link(couch_db, continue_copy_compact_int, [Db, NewDb]),
+            couch_log:info("Compaction file still behind main file "
+                "(update seq=~p. compact update seq=~p). Retrying.",
+                [Db#db.update_seq, NewSeq]),
+            Pid = spawn_link(couch_db, start_copy_compact_int, [Db, false]),
             Db2 = Db#db{compactor_pid=Pid},
-            ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
             update_loop(Db2)
         end;
     Else ->
@@ -479,14 +536,6 @@
         exit({error, Else})
     end.
 
-swap_files(#db{fd=OldFd, name=Name}=_DbOld, DbNew) ->
-    NormalFilename = couch_server:get_filename(Name),
-    true = file:rename(NormalFilename, NormalFilename ++ ".old"),
-    true = file:rename(NormalFilename ++ ".compact", NormalFilename),
-    couch_file:close(OldFd),
-    file:delete(NormalFilename ++ ".old"),
-    DbNew.
-
 get_db(MainPid) ->
     {ok, Db} = gen_server:call(MainPid, get_db),
     Db.
@@ -773,7 +822,6 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
 
 
 commit_data(#db{fd=Fd, header=Header} = Db) ->
-    ok = couch_file:sync(Fd), % commit outstanding data
     Header2 = Header#db_header{
         update_seq = Db#db.update_seq,
         summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
@@ -787,7 +835,6 @@ commit_data(#db{fd=Fd, header=Header} = Db) ->
         Db; % unchanged. nothing to do
     true ->
         ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header2),
-        ok = couch_file:sync(Fd), % commit header to disk
         Db#db{header = Header2}
     end.
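
A note on the init/1 rewrite above: the gen_server no longer constructs the #db{} state itself. It spawns the update loop, which opens the header, builds the state via init_db/4, and hands it back in an {initialized, Db} message; the gen_server blocks in receive until then. A self-contained sketch of the pattern (hypothetical module, not part of this patch):

    -module(handoff_sketch).
    -export([init/1, worker/2]).

    % The gen_server delegates state construction to a linked worker
    % and blocks until the worker reports the initial state.
    init(Args) ->
        spawn_link(?MODULE, worker, [self(), Args]),
        receive
        {initialized, State} ->
            {ok, State}
        end.

    worker(MainPid, Args) ->
        State = {db, Args},               % stands in for init_db/4
        MainPid ! {initialized, State},
        loop(State).

    loop(State) ->
        receive
        stop -> ok;
        _Other -> loop(State)
        end.

One consequence of this handoff: since the updater now owns the fd (it calls link(Fd)), terminate/2 only needs to kill the update_pid, and close_db/1 closes the fd and summary stream when files are swapped.
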
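The {compact_done, CompactFilepath} clause is the heart of the scheme: writes that arrive while the compactor runs leave the .compact file behind the live file, so the updater compares sequence numbers and either swaps files or spawns another copy pass. A minimal sketch of just that decision (names here are hypothetical, not part of the patch):

    % LiveSeq: update_seq of the file still taking writes.
    % CompactSeq: update_seq the compactor managed to copy.
    on_compact_done(LiveSeq, CompactSeq) when CompactSeq == LiveSeq ->
        swap_files;    % rename live -> .old, then .compact -> live
    on_compact_done(LiveSeq, CompactSeq) when CompactSeq < LiveSeq ->
        retry_copy.    % start_copy_compact_int(Db, false) copies the delta

Because the swap is two renames, a crash between them leaves either the original layout or a missing primary file alongside a complete .compact file, which is exactly the case start_link0 recovers from; a leftover .old file is deleted on the next successful open.
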
@@ -795,21 +842,22 @@ copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) ->
     {ok, {BodyData, BinInfos}} = couch_stream:read_term(SrcFd, SrcSp),
     % copy the bin values
     NewBinInfos = lists:map(fun({Name, {Type, BinSp, Len}}) ->
-        {ok, NewBinSp} = couch_stream:copy_stream(SrcFd, BinSp, Len, DestFd),
+        {ok, NewBinSp} = couch_stream:copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
         {Name, {Type, NewBinSp, Len}}
         end, BinInfos),
     % now write the document summary
-    {ok, _SummaryPointer} = couch_stream:write_term(DestStream, {BodyData, NewBinInfos}).
+    {ok, Sp} = couch_stream:write_term(DestStream, {BodyData, NewBinInfos}),
+    Sp.
 
 copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) ->
     [];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp}, []} | RestTrees]) ->
+copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp}, []} | RestTree]) ->
     % This is a leaf node, copy it over
     NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream),
-    [{RevId, {IsDel, NewSp}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTrees)];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTrees} | RestTrees]) ->
+    [{RevId, {IsDel, NewSp}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)];
+copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTree} | RestTree]) ->
     % inner node, only copy info/data from leaf nodes
-    [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTrees)} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTrees)].
+    [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTree)} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)].
 
 copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq) ->
     Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
@@ -849,34 +897,33 @@ copy_compact_docs(Db, NewDb) ->
         NewDb2
     end.
 
-start_copy_compact_int(#db{name=Name}=Db) ->
-    couch_log:debug("New compaction process spawned for db \"%s\"", [Name]),
-    Filename = couch_server:get_compaction_filename(Name),
-    case couch_file:open(Filename) of
+start_copy_compact_int(#db{name=Name,filepath=Filepath}=Db, CopyLocal) ->
+    CompactFile = Filepath ++ ".compact",
+    couch_log:debug("Compaction process spawned for db \"~s\"", [Name]),
+    case couch_file:open(CompactFile) of
     {ok, Fd} ->
-        couch_log:debug("Found existing compaction file for db \"%s\"", [Name]),
+        couch_log:debug("Found existing compaction file for db \"~s\"", [Name]),
         {ok, Header} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>);
     {error, enoent} -> %
-        {ok, Fd} = couch_file:open(Filename, [create]),
+        {ok, Fd} = couch_file:open(CompactFile, [create]),
         Header = #db_header{},
-        ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header),
-        ok = couch_file:sync(Fd)
+        ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header)
     end,
-    NewDb = init_db(Name, Fd, Header),
+    NewDb = init_db(Name, CompactFile, Fd, Header),
     NewDb2 = copy_compact_docs(Db, NewDb),
-
-    % suck up all the local docs into memory and write them to the new db
-    {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree,
-            fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
-    {ok, NewLocalBtree} = couch_btree:add(NewDb2#db.local_docs_btree, LocalDocs),
-    NewDb3 = commit_data(NewDb2#db{local_docs_btree=NewLocalBtree}),
-
-    NewDb3#db.update_pid ! {compact_done, NewDb3}.
-
-continue_copy_compact_int(#db{name=Name}=Db, NewDb) ->
-    couch_log:debug("Continued compaction process spawned for db \"%s\"", [Name]),
-    NewDb2 = copy_compact_docs(Db, NewDb),
-    NewDb2#db.update_pid ! {compact_done, NewDb2}.
+    NewDb3 =
+    case CopyLocal of
+    true ->
+        % suck up all the local docs into memory and write them to the new db
+        {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree,
+                fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
+        {ok, NewLocalBtree} = couch_btree:add(NewDb2#db.local_docs_btree, LocalDocs),
+        commit_data(NewDb2#db{local_docs_btree=NewLocalBtree});
+    _ ->
+        NewDb2
+    end,
+    close_db(NewDb3),
+    Db#db.update_pid ! {compact_done, CompactFile}.
\ No newline at end of file
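
On copy_rev_tree/4 above: only leaf revisions carry document summaries into the compacted file; interior revisions are rewritten as ?REV_MISSING stubs, which is where compaction reclaims the space held by superseded revision bodies. The same recursion with the file copy replaced by tags (a stand-alone sketch for illustration, not CouchDB code):

    % Nodes are {RevId, Value, SubTree}; [] marks a leaf's empty subtree.
    copy_tree([]) ->
        [];
    copy_tree([{RevId, _Sp, []} | Rest]) ->
        % leaf revision: its summary is copied to the new file
        [{RevId, copied, []} | copy_tree(Rest)];
    copy_tree([{RevId, _Sp, SubTree} | Rest]) ->
        % interior revision: body dropped, only the tree shape survives
        [{RevId, rev_missing, copy_tree(SubTree)} | copy_tree(Rest)].
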
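Finally, the CopyLocal flag: the bulk copy of local (non-replicated) docs happens only on the first pass, while the catch-up passes spawned from {compact_done, ...} copy only the by-sequence delta, which local docs do not appear in. As the subject line says, nothing exposes this over HTTP yet; the only trigger is the compact message handled by update_loop, so exercising it means messaging the updater directly (hypothetical shell one-liner, assuming access to the #db{} record):

    % `compact' starts a compactor if none is running; the clause is a
    % no-op when compactor_pid is already set.
    Db#db.update_pid ! compact.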