author    Damien F. Katz <damien@apache.org>  2008-04-07 19:51:17 +0000
committer Damien F. Katz <damien@apache.org>  2008-04-07 19:51:17 +0000
commit    9c27e4d7db0e2e6a1b458f8545f584fcfaea4ef2 (patch)
tree      94cf96af696ab3042d2afd1f4e4e7d83a98b1568 /src
parent    4708e70c612a797b5d15774149ed589996d0c2e3 (diff)
Compaction. Works, but still needs queueing and better handling for long reads/writes overlapping the compaction switchover.
git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@645661 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src')
-rw-r--r--  src/couchdb/couch_db.erl             63
-rw-r--r--  src/couchdb/couch_query_servers.erl  20
-rw-r--r--  src/couchdb/couch_rep.erl            51
-rw-r--r--  src/couchdb/couch_stream.erl          2
-rw-r--r--  src/couchdb/mod_couch.erl             6
5 files changed, 68 insertions(+), 74 deletions(-)
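The bulk of the change is in couch_db.erl: a compactor bulk-copies the live documents into a fresh file while updates continue, and the update loop then swaps the files over. A rough sketch of the moving parts (the numbered steps paraphrase the hunks below; the function name is illustrative):

    % 1. a client casts start_compact to the database's main process;
    % 2. the update loop spawns a compactor that copies doc infos into a
    %    new file, committing periodically (copy_compact_docs);
    % 3. when the copy catches up, the loop installs the new #db{} and
    %    closes the old fd after a grace period for in-flight readers;
    % 4. writers that race the switchover get 'retry' and resubmit once.
    trigger_compaction(MainPid) ->
        gen_server:cast(MainPid, start_compact).  % fire-and-forget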
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index e242c382..cdb0598e 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -105,6 +105,9 @@ create(DbName, Filepath, Options) when is_list(Options) ->
open(DbName, Filepath) ->
start_link(DbName, Filepath, []).
+
+% Compaction still needs work. Right now readers and writers can get an error
+% during the file compaction changeover. This doesn't need to be the case.
start_compact(MainPid) ->
gen_server:cast(MainPid, start_compact).
@@ -179,8 +182,8 @@ get_db_info(Db) ->
{doc_count, Count},
{doc_del_count, DelCount},
{update_seq, SeqNum},
- {compacting, Compactor/=nil},
- {size, Size}
+ {compact_running, Compactor/=nil},
+ {disk_size, Size}
],
{ok, InfoList}.
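Callers read the renamed keys out of the returned proplist; the new names, compact_running and disk_size, are more self-describing than compacting and size. A usage sketch (database name illustrative, couch_server:open/1 as used elsewhere in this tree):

    {ok, Db} = couch_server:open("mydb"),
    {ok, Info} = couch_db:get_db_info(Db),
    Running = proplists:get_value(compact_running, Info),  % boolean()
    DiskBytes = proplists:get_value(disk_size, Info).      % bytes on disk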
@@ -253,6 +256,7 @@ update_docs(MainPid, Docs, Options) ->
Doc#doc{revs=[integer_to_list(couch_util:rand32()) | Revs]}
end
end, Docs),
+ NewRevs = [NewRev || #doc{revs=[NewRev|_]} <- Docs2],
DocBuckets = group_alike_docs(Docs2),
Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
Db = get_db(MainPid),
@@ -275,11 +279,17 @@ update_docs(MainPid, Docs, Options) ->
% flush unwritten binaries to disk.
DocBuckets3 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets2],
-
+
case gen_server:call(MainPid, {update_docs, DocBuckets3, Options}) of
- ok ->
- % return back the new rev ids, in the same order input.
- {ok, [NewRev || #doc{revs=[NewRev|_]} <- Docs2]};
+ ok -> {ok, NewRevs};
+ retry ->
+ Db2 = get_db(MainPid),
+ DocBuckets4 = [[doc_flush_binaries(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets3],
+ % We only retry once
+ case gen_server:call(MainPid, {update_docs, DocBuckets4, Options}) of
+ ok -> {ok, NewRevs};
+ Else -> throw(Else)
+ end;
Else->
throw(Else)
end.
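A single retry suffices because retry is only signalled when the database file was swapped between flushing the attachment binaries and the update call, and a given compaction switches files exactly once; re-reading the #db{} picks up the new fd, the binaries are re-flushed against it, and the second call proceeds. The generic shape of the protocol (a sketch; Refresh stands for rebuilding the request against freshly read state):

    call_retry_once(Pid, Request, Refresh) ->
        case gen_server:call(Pid, Request) of
            ok -> ok;
            retry ->
                % state changed under us (e.g. new fd); rebuild, retry once
                case gen_server:call(Pid, Refresh()) of
                    ok -> ok;
                    Else -> throw(Else)
                end;
            Else ->
                throw(Else)
        end.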
@@ -477,7 +487,7 @@ start_update_loop(MainPid, {DbName, Filepath, Fd, Options}) ->
MainPid ! {initialized, Db2},
update_loop(Db2).
-update_loop(#db{name=Name,filepath=Filepath, main_pid=MainPid}=Db) ->
+update_loop(#db{fd=Fd,name=Name,filepath=Filepath, main_pid=MainPid}=Db) ->
receive
{OrigFrom, update_docs, DocActions, Options} ->
case (catch update_docs_int(Db, DocActions, Options)) of
@@ -486,6 +496,9 @@ update_loop(#db{name=Name,filepath=Filepath, main_pid=MainPid}=Db) ->
gen_server:reply(OrigFrom, ok),
couch_db_update_notifier:notify({updated, Name}),
update_loop(Db2);
+ retry ->
+ gen_server:reply(OrigFrom, retry),
+ update_loop(Db);
conflict ->
gen_server:reply(OrigFrom, conflict),
update_loop(Db);
@@ -519,7 +532,17 @@ update_loop(#db{name=Name,filepath=Filepath, main_pid=MainPid}=Db) ->
doc_count = Db#db.doc_count,
doc_del_count = Db#db.doc_del_count,
filepath = Filepath},
- close_db(Db),
+
+ couch_stream:close(Db#db.summary_stream),
+ % close file handle async.
+ % wait 5 secs before closing, allowing readers to finish
+ unlink(Fd),
+ spawn_link(fun() ->
+ receive after 5000 -> ok end,
+ couch_file:close(Fd),
+ file:delete(Filepath ++ ".old")
+ end),
+
ok = gen_server:call(MainPid, {db_updated, NewDb2}),
couch_log:info("Compaction for db ~p completed.", [Name]),
update_loop(NewDb2#db{compactor_pid=nil});
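The delayed close is what lets most readers survive the switchover: anything still holding the old fd gets five seconds to finish before the handle is closed and the renamed .old file is deleted. The pattern in isolation (a sketch, reusing couch_file:close/1 from above):

    close_fd_later(Fd, Filepath) ->
        unlink(Fd),  % detach so the old fd no longer lives or dies with us
        spawn_link(fun() ->
            receive after 5000 -> ok end,  % plain sleep; no timer to manage
            couch_file:close(Fd),
            file:delete(Filepath ++ ".old")
        end).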
@@ -651,17 +674,29 @@ make_doc(Db, Id, Deleted, SummaryPointer, RevisionPath) ->
flush_trees(_Db, [], AccFlushedTrees) ->
{ok, lists:reverse(AccFlushedTrees)};
-flush_trees(Db, [Unflushed | RestUnflushed], AccFlushed) ->
+flush_trees(#db{fd=Fd}=Db, [Unflushed | RestUnflushed], AccFlushed) ->
Flushed = couch_key_tree:map(
fun(_Rev, Value) ->
case Value of
#doc{attachments=Atts,deleted=IsDeleted}=Doc ->
% this node value is actually an unwritten document summary,
% write to disk.
-
- % convert bins, removing the FD.
- % All bins should have been flushed to disk already.
- Bins = [{BinName, {BinType, BinSp, BinLen}} || {BinName, {BinType, {_Fd, BinSp, BinLen}}} <- Atts],
+ % make sure the Fd in the written bins is the same Fd we are.
+ Bins =
+ case Atts of
+ [] -> [];
+ [{_BName, {_Type, {BinFd, _Sp, _Len}}} | _ ] when BinFd == Fd ->
+ % convert bins, removing the FD.
+ % All bins should have been flushed to disk already.
+ [{BinName, {BinType, BinSp, BinLen}}
+ || {BinName, {BinType, {_Fd, BinSp, BinLen}}}
+ <- Atts];
+ _ ->
+ % BinFd must not equal our Fd. This can happen when a database
+ % is being updated during a compaction
+ couch_log:debug("File where the attachments are written has changed. Possibly retrying.", []),
+ throw(retry)
+ end,
{ok, NewSummaryPointer} = couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}),
{IsDeleted, NewSummaryPointer};
_ ->
@@ -880,7 +915,7 @@ copy_compact_docs(Db, NewDb) ->
fun(#doc_info{update_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied}) ->
case couch_util:should_flush() of
true ->
- NewDb2 = copy_docs(Db, AccNewDb, lists:reverse(AccUncopied, DocInfo)),
+ NewDb2 = copy_docs(Db, AccNewDb, lists:reverse([DocInfo | AccUncopied])),
{ok, {commit_data(NewDb2#db{update_seq=Seq}), []}};
false ->
{ok, {AccNewDb, [DocInfo | AccUncopied]}}
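Two details close out this file. In flush_trees, attachment pointers are reused only when they were written through the current fd; pointers into the pre-compaction file throw retry, which becomes the retry reply handled by update_docs above. And the copy_compact_docs change fixes a misuse of lists:reverse/2, whose second argument becomes the tail of the result and must itself be a list:

    1> lists:reverse([b, a], [c]).  % reverse/2 appends its 2nd arg as the tail
    [a,b,c]
    2> lists:reverse([b, a], c).    % a bare term yields an improper list (the old bug)
    [a,b|c]
    3> lists:reverse([c, b, a]).    % the fix: cons DocInfo on first, then reverse/1
    [a,b,c]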
diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl
index 19cba9bd..33962705 100644
--- a/src/couchdb/couch_query_servers.erl
+++ b/src/couchdb/couch_query_servers.erl
@@ -36,8 +36,6 @@ readline(Port) ->
readline(Port, []).
readline(Port, Acc) ->
- Timer = erlang:send_after(timeout(), self(), timeout),
- Result =
receive
{Port, {data, {noeol, Data}}} ->
readline(Port, [Data|Acc]);
@@ -45,20 +43,11 @@ readline(Port, Acc) ->
lists:flatten(lists:reverse(Acc, Data));
{Port, Err} ->
catch port_close(Port),
- erlang:cancel_timer(Timer),
- throw({map_process_error, Err});
- timeout ->
+ throw({map_process_error, Err})
+ after timeout() ->
catch port_close(Port),
throw({map_process_error, "map function timed out"})
- end,
- case erlang:cancel_timer(Timer) of
- false ->
- % message already sent. clear it
- receive timeout -> ok end;
- _ ->
- ok
- end,
- Result.
+ end.
read_json(Port) ->
case cjson:decode(readline(Port)) of
@@ -108,8 +97,7 @@ start_doc_map(Lang, Functions) ->
map_docs({_Lang, Port}, Docs) ->
% send the documents
- Results =
- lists:map(
+ Results = lists:map(
fun(Doc) ->
Json = couch_doc:to_json_obj(Doc, []),
case prompt(Port, {"map_doc", Json}) of
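The readline cleanup trades a manually managed timer (arm it, cancel it, then drain the timeout message that may already have been delivered) for a receive ... after clause, which scopes the timeout to the receive itself and leaves nothing to clean up. The shape in isolation (a sketch):

    read_or_timeout(Port, Timeout) ->
        receive
            {Port, {data, Data}} ->
                {ok, Data}
        after Timeout ->
            % fires only if no clause matched within Timeout ms; there is
            % no timer to cancel and no stray 'timeout' message to drain
            catch port_close(Port),
            {error, timeout}
        end.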
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index 9590d5c1..4a6a415a 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -14,7 +14,7 @@
-include("couch_db.hrl").
--export([replicate/2, replicate/3, test/0, test_write_docs/3]).
+-export([replicate/2, replicate/3]).
-record(stats, {
docs_read=0,
@@ -117,8 +117,7 @@ replicate(Source, Target, Options) ->
end.
pull_rep(DbTarget, DbSource, SourceSeqNum, Stats) ->
- {ok, NewSeq} =
- enum_docs_since(DbSource, SourceSeqNum,
+ {ok, NewSeq} = enum_docs_since(DbSource, SourceSeqNum,
fun(#doc_info{update_seq=Seq}=SrcDocInfo, _, {_, AccStats}) ->
Stats2 = maybe_save_docs(DbTarget, DbSource, SrcDocInfo, AccStats),
{ok, {Seq, Stats2}}
@@ -136,23 +135,15 @@ maybe_save_docs(DbTarget, DbSource,
[] ->
Stats;
_Else ->
- % the 'ok' below validates no unrecoverable errors (like network failure, etc).
{ok, DocResults} = open_doc_revs(DbSource, Id, MissingRevs, [latest]),
+ % only save successful reads
+ Docs = [RevDoc || {ok, RevDoc} <- DocResults],
+ ok = save_docs(DbTarget, Docs, []),
- Docs = [RevDoc || {ok, RevDoc} <- DocResults], % only match successful loads
-
- Stats2 = Stats#stats{
+ Stats#stats{
docs_read=Stats#stats.docs_read + length(Docs),
- read_errors=Stats#stats.read_errors + length(DocResults) - length(Docs)},
-
- case Docs of
- [] ->
- Stats2;
- _ ->
- % the 'ok' below validates no unrecoverable errors (like network failure, etc).
- ok = save_docs(DbTarget, Docs, []),
- Stats2#stats{docs_copied=Stats2#stats.docs_copied+length(Docs)}
- end
+ read_errors=Stats#stats.read_errors + length(DocResults) - length(Docs),
+ docs_copied=Stats#stats.docs_copied + length(Docs)}
end.
@@ -280,29 +271,3 @@ open_doc_revs(Db, DocId, Revs, Options) ->
couch_db:open_doc_revs(Db, DocId, Revs, Options).
-
-
-
-test() ->
- couch_server:start(),
- %{ok, LocalA} = couch_server:open("replica_a"),
- {ok, LocalA} = couch_server:create("replica_a", [overwrite]),
- {ok, _} = couch_server:create("replica_b", [overwrite]),
- %DbA = "replica_a",
- DbA = "http://localhost:5984/replica_a/",
- %DbB = "replica_b",
- DbB = "http://localhost:5984/replica_b/",
- _DocUnids = test_write_docs(10, LocalA, []),
- replicate(DbA, DbB),
- %{ok, _Rev} = couch_db:delete_doc(LocalA, lists:nth(1, DocUnids), any),
- % replicate(DbA, DbB),
- ok.
-
-test_write_docs(0, _Db, Output) ->
- lists:reverse(Output);
-test_write_docs(N, Db, Output) ->
- Doc = #doc{
- id=integer_to_list(N),
- body={obj, [{"foo", integer_to_list(N)}, {"num", N}, {"bar", "blah"}]}},
- couch_db:save_doc(Db, Doc, []),
- test_write_docs(N-1, Db, [integer_to_list(N) | Output]).
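maybe_save_docs now filters read results and updates all three counters in one pass: a comprehension matching {ok, _} keeps only the successful reads, and read_errors falls out as the length difference. For example (the failure-tuple shape is illustrative):

    1> DocResults = [{ok, doc_a}, {{not_found, missing}, "1-abc"}, {ok, doc_b}].
    [{ok,doc_a},{{not_found,missing},"1-abc"},{ok,doc_b}]
    2> Docs = [Doc || {ok, Doc} <- DocResults].  % non-matching elements are skipped
    [doc_a,doc_b]
    3> length(DocResults) - length(Docs).        % becomes read_errors
    1

Note the rewrite also calls save_docs/3 unconditionally, relying on it accepting an empty list, where the old code special-cased Docs == [].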
diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl
index 8d5260f1..ca43562a 100644
--- a/src/couchdb/couch_stream.erl
+++ b/src/couchdb/couch_stream.erl
@@ -79,7 +79,7 @@ read(Fd, Sp, Num) ->
{ok, Bin, Sp2}.
copy_to_new_stream(Src, Sp, Len, DestFd) ->
- Dest = open(DestFd),
+ {ok, Dest} = open(DestFd),
{ok, NewSp} = copy(Src, Sp, Len, Dest),
close(Dest),
{ok, NewSp}.
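A one-line latent-bug fix: open/1 returns {ok, Stream}, so the old code bound the whole tuple to Dest and passed it on to copy/4. Matching on {ok, Dest} extracts the stream and asserts success in one step; any other return crashes with badmatch at the call site rather than failing somewhere downstream. The same idiom against a stdlib call:

    open_or_crash(Path) ->
        {ok, IoDev} = file:open(Path, [read, binary]),  % badmatch on any error
        IoDev.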
diff --git a/src/couchdb/mod_couch.erl b/src/couchdb/mod_couch.erl
index 8373dbe9..0d157b1e 100644
--- a/src/couchdb/mod_couch.erl
+++ b/src/couchdb/mod_couch.erl
@@ -195,6 +195,8 @@ do(#mod{method="POST"}=Mod, #uri_parts{db="_restart", doc=""}) ->
send_ok(Mod, 201);
do(#mod{method="POST"}=Mod, #uri_parts{doc="_missing_revs"}=Parts) ->
handle_missing_revs_request(Mod, Parts);
+do(#mod{method="POST"}=Mod, #uri_parts{doc="_compact"}=Parts) ->
+ handle_compact(Mod, Parts);
do(#mod{method="PUT"}=Mod, #uri_parts{doc=""}=Parts) ->
handle_db_create(Mod, Parts);
do(#mod{method="DELETE"}=Mod, #uri_parts{doc=""}=Parts) ->
@@ -487,6 +489,10 @@ handle_missing_revs_request(#mod{entity_body=RawJson}=Mod, Parts) ->
JsonResults = [{Id, list_to_tuple(Revs)} || {Id, Revs} <- Results],
send_json(Mod, 200, {obj, [{missing_revs, {obj, JsonResults}}]}).
+handle_compact(Mod, Parts) ->
+ ok = couch_db:start_compact(open_db(Parts)),
+ send_ok(Mod, 202).
+
handle_replication_request(#mod{entity_body=RawJson}=Mod) ->
{obj, Props} = cjson:decode(RawJson),
Src = proplists:get_value("source", Props),
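The new route wires compaction into the HTTP API. It answers 202 Accepted rather than 200 because start_compact/1 is a gen_server:cast: the request only queues the work, and callers can poll the compact_running flag from get_db_info to watch progress. A hypothetical exchange (host, port, and database name illustrative):

    POST /mydb/_compact HTTP/1.1
    Host: localhost:5984

    HTTP/1.1 202 Accepted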