From 9c27e4d7db0e2e6a1b458f8545f584fcfaea4ef2 Mon Sep 17 00:00:00 2001
From: "Damien F. Katz"
Date: Mon, 7 Apr 2008 19:51:17 +0000
Subject: Compaction. Works, but still needs queueing and better handling for long reads/writes overlapping the compaction switchover.

git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@645661 13f79535-47bb-0310-9956-ffa450edef68
---
 src/couchdb/couch_db.erl            | 63 ++++++++++++++++++++++++++++---------
 src/couchdb/couch_query_servers.erl | 20 +++---------
 src/couchdb/couch_rep.erl           | 51 +++++------------------------
 src/couchdb/couch_stream.erl        |  2 +-
 src/couchdb/mod_couch.erl           |  6 ++++
 5 files changed, 68 insertions(+), 74 deletions(-)

diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index e242c382..cdb0598e 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -105,6 +105,9 @@ create(DbName, Filepath, Options) when is_list(Options) ->
 open(DbName, Filepath) ->
     start_link(DbName, Filepath, []).
 
+
+% Compaction still needs work. Right now readers and writers can get an error
+% during file compaction changeover. This doesn't need to be the case.
 start_compact(MainPid) ->
     gen_server:cast(MainPid, start_compact).
 
@@ -179,8 +182,8 @@ get_db_info(Db) ->
         {doc_count, Count},
         {doc_del_count, DelCount},
         {update_seq, SeqNum},
-        {compacting, Compactor/=nil},
-        {size, Size}
+        {compact_running, Compactor/=nil},
+        {disk_size, Size}
         ],
     {ok, InfoList}.
 
@@ -253,6 +256,7 @@ update_docs(MainPid, Docs, Options) ->
                 Doc#doc{revs=[integer_to_list(couch_util:rand32()) | Revs]}
             end
         end, Docs),
+    NewRevs = [NewRev || #doc{revs=[NewRev|_]} <- Docs2],
     DocBuckets = group_alike_docs(Docs2),
     Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
     Db = get_db(MainPid),
@@ -275,11 +279,17 @@ update_docs(MainPid, Docs, Options) ->
     % flush unwritten binaries to disk.
     DocBuckets3 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets2],
-
+    
     case gen_server:call(MainPid, {update_docs, DocBuckets3, Options}) of
-    ok ->
-        % return back the new rev ids, in the same order input.
-        {ok, [NewRev || #doc{revs=[NewRev|_]} <- Docs2]};
+    ok -> {ok, NewRevs};
+    retry ->
+        Db2 = get_db(MainPid),
+        DocBuckets4 = [[doc_flush_binaries(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets3],
+        % We only retry once
+        case gen_server:call(MainPid, {update_docs, DocBuckets4, Options}) of
+        ok -> {ok, NewRevs};
+        Else -> throw(Else)
+        end;
     Else->
         throw(Else)
     end.
@@ -477,7 +487,7 @@ start_update_loop(MainPid, {DbName, Filepath, Fd, Options}) ->
     MainPid ! {initialized, Db2},
     update_loop(Db2).
 
-update_loop(#db{name=Name,filepath=Filepath, main_pid=MainPid}=Db) ->
+update_loop(#db{fd=Fd,name=Name,filepath=Filepath, main_pid=MainPid}=Db) ->
     receive
     {OrigFrom, update_docs, DocActions, Options} ->
         case (catch update_docs_int(Db, DocActions, Options)) of
@@ -486,6 +496,9 @@ update_loop(#db{name=Name,filepath=Filepath, main_pid=MainPid}=Db) ->
             gen_server:reply(OrigFrom, ok),
             couch_db_update_notifier:notify({updated, Name}),
             update_loop(Db2);
+        retry ->
+            gen_server:reply(OrigFrom, retry),
+            update_loop(Db);
         conflict ->
             gen_server:reply(OrigFrom, conflict),
             update_loop(Db);
@@ -519,7 +532,17 @@ update_loop(#db{name=Name,filepath=Filepath, main_pid=MainPid}=Db) ->
             doc_count = Db#db.doc_count,
             doc_del_count = Db#db.doc_del_count,
             filepath = Filepath},
-        close_db(Db),
+        
+        couch_stream:close(Db#db.summary_stream),
+        % close file handle async.
+        % wait 5 secs before closing, allowing readers to finish
+        unlink(Fd),
+        spawn_link(fun() ->
+            receive after 5000 -> ok end,
+            couch_file:close(Fd),
+            file:delete(Filepath ++ ".old")
+            end),
+        
         ok = gen_server:call(MainPid, {db_updated, NewDb2}),
         couch_log:info("Compaction for db ~p completed.", [Name]),
         update_loop(NewDb2#db{compactor_pid=nil});
@@ -651,17 +674,29 @@ make_doc(Db, Id, Deleted, SummaryPointer, RevisionPath) ->
 
 flush_trees(_Db, [], AccFlushedTrees) ->
     {ok, lists:reverse(AccFlushedTrees)};
-flush_trees(Db, [Unflushed | RestUnflushed], AccFlushed) ->
+flush_trees(#db{fd=Fd}=Db, [Unflushed | RestUnflushed], AccFlushed) ->
     Flushed = couch_key_tree:map(
         fun(_Rev, Value) ->
             case Value of
             #doc{attachments=Atts,deleted=IsDeleted}=Doc ->
                 % this node value is actually an unwritten document summary,
                 % write to disk.
-
-                % convert bins, removing the FD.
-                % All bins should have been flushed to disk already.
-                Bins = [{BinName, {BinType, BinSp, BinLen}} || {BinName, {BinType, {_Fd, BinSp, BinLen}}} <- Atts],
+                % make sure the Fd in the written bins is the same Fd we are.
+                Bins =
+                case Atts of
+                [] -> [];
+                [{_BName, {_Type, {BinFd, _Sp, _Len}}} | _ ] when BinFd == Fd ->
+                    % convert bins, removing the FD.
+                    % All bins should have been flushed to disk already.
+                    [{BinName, {BinType, BinSp, BinLen}}
+                        || {BinName, {BinType, {_Fd, BinSp, BinLen}}}
+                        <- Atts];
+                _ ->
+                    % BinFd must not equal our Fd. This can happen when a database
+                    % is being updated during a compaction
+                    couch_log:debug("File where the attachments are written has changed. Possibly retrying."),
+                    throw(retry)
+                end,
                 {ok, NewSummaryPointer} = couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}),
                 {IsDeleted, NewSummaryPointer};
             _ ->
@@ -880,7 +915,7 @@ copy_compact_docs(Db, NewDb) ->
         fun(#doc_info{update_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied}) ->
             case couch_util:should_flush() of
             true ->
-                NewDb2 = copy_docs(Db, AccNewDb, lists:reverse(AccUncopied, DocInfo)),
+                NewDb2 = copy_docs(Db, AccNewDb, lists:reverse([DocInfo | AccUncopied])),
                 {ok, {commit_data(NewDb2#db{update_seq=Seq}), []}};
             false ->
                 {ok, {AccNewDb, [DocInfo | AccUncopied]}}
diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl
index 19cba9bd..33962705 100644
--- a/src/couchdb/couch_query_servers.erl
+++ b/src/couchdb/couch_query_servers.erl
@@ -36,8 +36,6 @@ readline(Port) ->
     readline(Port, []).
 
 readline(Port, Acc) ->
-    Timer = erlang:send_after(timeout(), self(), timeout),
-    Result =
     receive
     {Port, {data, {noeol, Data}}} ->
         readline(Port, [Data|Acc]);
@@ -45,20 +43,11 @@ readline(Port, Acc) ->
         lists:flatten(lists:reverse(Acc, Data));
     {Port, Err} ->
         catch port_close(Port),
-        erlang:cancel_timer(Timer),
-        throw({map_process_error, Err});
-    timeout ->
+        throw({map_process_error, Err})
+    after timeout() ->
         catch port_close(Port),
         throw({map_process_error, "map function timed out"})
-    end,
-    case erlang:cancel_timer(Timer) of
-    false ->
-        % message already sent. clear it
-        receive timeout -> ok end;
-    _ ->
-        ok
-    end,
-    Result.
+    end.
 
 read_json(Port) ->
     case cjson:decode(readline(Port)) of
@@ -108,8 +97,7 @@ start_doc_map(Lang, Functions) ->
 
 map_docs({_Lang, Port}, Docs) ->
     % send the documents
-    Results =
-        lists:map(
+    Results = lists:map(
         fun(Doc) ->
             Json = couch_doc:to_json_obj(Doc, []),
             case prompt(Port, {"map_doc", Json}) of
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index 9590d5c1..4a6a415a 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -14,7 +14,7 @@
 
 -include("couch_db.hrl").
 
--export([replicate/2, replicate/3, test/0, test_write_docs/3]).
+-export([replicate/2, replicate/3]).
 
 -record(stats, {
     docs_read=0,
@@ -117,8 +117,7 @@ replicate(Source, Target, Options) ->
     end.
 
 pull_rep(DbTarget, DbSource, SourceSeqNum, Stats) ->
-    {ok, NewSeq} =
-        enum_docs_since(DbSource, SourceSeqNum,
+    {ok, NewSeq} = enum_docs_since(DbSource, SourceSeqNum,
         fun(#doc_info{update_seq=Seq}=SrcDocInfo, _, {_, AccStats}) ->
             Stats2 = maybe_save_docs(DbTarget, DbSource, SrcDocInfo, AccStats),
             {ok, {Seq, Stats2}}
@@ -136,23 +135,15 @@ maybe_save_docs(DbTarget, DbSource,
     [] ->
         Stats;
     _Else ->
-        % the 'ok' below validates no unrecoverable errors (like network failure, etc).
         {ok, DocResults} = open_doc_revs(DbSource, Id, MissingRevs, [latest]),
+        % only save successful reads
+        Docs = [RevDoc || {ok, RevDoc} <- DocResults],
+        ok = save_docs(DbTarget, Docs, []),
 
-        Docs = [RevDoc || {ok, RevDoc} <- DocResults], % only match successful loads
-
-        Stats2 = Stats#stats{
+        Stats#stats{
             docs_read=Stats#stats.docs_read + length(Docs),
-            read_errors=Stats#stats.read_errors + length(DocResults) - length(Docs)},
-
-        case Docs of
-        [] ->
-            Stats2;
-        _ ->
-            % the 'ok' below validates no unrecoverable errors (like network failure, etc).
-            ok = save_docs(DbTarget, Docs, []),
-            Stats2#stats{docs_copied=Stats2#stats.docs_copied+length(Docs)}
-        end
+            read_errors=Stats#stats.read_errors + length(DocResults) - length(Docs),
+            docs_copied=Stats#stats.docs_copied + length(Docs)}
     end.
 
 
@@ -280,29 +271,3 @@ open_doc_revs(Db, DocId, Revs, Options) ->
     couch_db:open_doc_revs(Db, DocId, Revs, Options).
 
-
-
-
-test() ->
-    couch_server:start(),
-    %{ok, LocalA} = couch_server:open("replica_a"),
-    {ok, LocalA} = couch_server:create("replica_a", [overwrite]),
-    {ok, _} = couch_server:create("replica_b", [overwrite]),
-    %DbA = "replica_a",
-    DbA = "http://localhost:5984/replica_a/",
-    %DbB = "replica_b",
-    DbB = "http://localhost:5984/replica_b/",
-    _DocUnids = test_write_docs(10, LocalA, []),
-    replicate(DbA, DbB),
-    %{ok, _Rev} = couch_db:delete_doc(LocalA, lists:nth(1, DocUnids), any),
-    % replicate(DbA, DbB),
-    ok.
-
-test_write_docs(0, _Db, Output) ->
-    lists:reverse(Output);
-test_write_docs(N, Db, Output) ->
-    Doc = #doc{
-        id=integer_to_list(N),
-        body={obj, [{"foo", integer_to_list(N)}, {"num", N}, {"bar", "blah"}]}},
-    couch_db:save_doc(Db, Doc, []),
-    test_write_docs(N-1, Db, [integer_to_list(N) | Output]).
diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl
index 8d5260f1..ca43562a 100644
--- a/src/couchdb/couch_stream.erl
+++ b/src/couchdb/couch_stream.erl
@@ -79,7 +79,7 @@ read(Fd, Sp, Num) ->
     {ok, Bin, Sp2}.
 
 copy_to_new_stream(Src, Sp, Len, DestFd) ->
-    Dest = open(DestFd),
+    {ok, Dest} = open(DestFd),
     {ok, NewSp} = copy(Src, Sp, Len, Dest),
     close(Dest),
     {ok, NewSp}.
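The couch_stream change above is small but load-bearing for compaction: open/1 returns {ok, Stream}, so the old `Dest = open(DestFd)` bound Dest to the whole tuple and the subsequent copy and close calls would have been handed the wrong term. A minimal sketch of how a caller such as the compaction copy path might use the corrected function follows; the helper name copy_attachment/4 and the {Sp, Len} pointer/length pair are illustrative assumptions, not code from this patch.

% Hypothetical helper: copy one attachment's bytes from the source file into a
% destination file, returning the new pointer and length to store in the
% rewritten document summary. Assumes {Sp, Len} locate the binary in SrcFd,
% matching the {BinName, {Type, {Fd, Sp, Len}}} tuples handled in flush_trees/3.
copy_attachment(SrcFd, Sp, Len, DestFd) ->
    {ok, NewSp} = couch_stream:copy_to_new_stream(SrcFd, Sp, Len, DestFd),
    {NewSp, Len}.

Those same pointer tuples are what flush_trees/3 now refuses to write when their Fd no longer matches the database's current file, which is what triggers the retry path added in couch_db.erl.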
diff --git a/src/couchdb/mod_couch.erl b/src/couchdb/mod_couch.erl
index 8373dbe9..0d157b1e 100644
--- a/src/couchdb/mod_couch.erl
+++ b/src/couchdb/mod_couch.erl
@@ -195,6 +195,8 @@ do(#mod{method="POST"}=Mod, #uri_parts{db="_restart", doc=""}) ->
     send_ok(Mod, 201);
 do(#mod{method="POST"}=Mod, #uri_parts{doc="_missing_revs"}=Parts) ->
     handle_missing_revs_request(Mod, Parts);
+do(#mod{method="POST"}=Mod, #uri_parts{doc="_compact"}=Parts) ->
+    handle_compact(Mod, Parts);
 do(#mod{method="PUT"}=Mod, #uri_parts{doc=""}=Parts) ->
     handle_db_create(Mod, Parts);
 do(#mod{method="DELETE"}=Mod, #uri_parts{doc=""}=Parts) ->
@@ -487,6 +489,10 @@ handle_missing_revs_request(#mod{entity_body=RawJson}=Mod, Parts) ->
     JsonResults = [{Id, list_to_tuple(Revs)} || {Id, Revs} <- Results],
     send_json(Mod, 200, {obj, [{missing_revs, {obj, JsonResults}}]}).
 
+handle_compact(Mod, Parts) ->
+    ok = couch_db:start_compact(open_db(Parts)),
+    send_ok(Mod, 202).
+
 handle_replication_request(#mod{entity_body=RawJson}=Mod) ->
     {obj, Props} = cjson:decode(RawJson),
     Src = proplists:get_value("source", Props),
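Taken together, the mod_couch changes expose compaction over HTTP: a POST to a database's _compact resource calls couch_db:start_compact/1 and answers 202 Accepted, while couch_db:get_db_info/1 now reports compact_running and disk_size so progress can be observed. A rough sketch of driving the same calls from an Erlang shell follows; the database name "some_db" and the use of couch_server:open/1 to obtain the db handle are assumptions for illustration, not part of this patch.

% Assumed shell usage against a running node; "some_db" is a placeholder name.
{ok, Db} = couch_server:open("some_db"),
ok = couch_db:start_compact(Db),               % same call handle_compact/2 makes
{ok, Info} = couch_db:get_db_info(Db),
proplists:get_value(compact_running, Info).    % true while the compactor is running

Replying 202 rather than 200 fits the cast-based start_compact/1: the request only schedules compaction, it does not wait for it to finish.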