author     Damien F. Katz <damien@apache.org>  2008-04-07 19:51:17 +0000
committer  Damien F. Katz <damien@apache.org>  2008-04-07 19:51:17 +0000
commit     9c27e4d7db0e2e6a1b458f8545f584fcfaea4ef2 (patch)
tree       94cf96af696ab3042d2afd1f4e4e7d83a98b1568 /src/couchdb/couch_db.erl
parent     4708e70c612a797b5d15774149ed589996d0c2e3 (diff)
Compaction. Works, but still needs queueing and better handling for long reads/writes overlapping the compaction switchover.
git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@645661 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_db.erl')
-rw-r--r--  src/couchdb/couch_db.erl  63
1 file changed, 49 insertions(+), 14 deletions(-)
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index e242c382..cdb0598e 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -105,6 +105,9 @@ create(DbName, Filepath, Options) when is_list(Options) ->
open(DbName, Filepath) ->
start_link(DbName, Filepath, []).
+
+% Compaction still needs work. Right now readers and writers can get an error
+% during the file compaction changeover. This doesn't need to be the case.
start_compact(MainPid) ->
gen_server:cast(MainPid, start_compact).
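
The new start_compact/1 entry point is a plain gen_server cast, so the call returns immediately and compaction proceeds in the background with no completion signal to the caller. A minimal usage sketch (database name and path are hypothetical; open/2 returns {ok, Pid} via start_link as shown above):

    {ok, MainPid} = couch_db:open("test", "/tmp/test.couch"),
    % returns ok at once; watch compact_running in get_db_info/1 to
    % observe progress
    ok = couch_db:start_compact(MainPid).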
@@ -179,8 +182,8 @@ get_db_info(Db) ->
{doc_count, Count},
{doc_del_count, DelCount},
{update_seq, SeqNum},
- {compacting, Compactor/=nil},
- {size, Size}
+ {compact_running, Compactor/=nil},
+ {disk_size, Size}
],
{ok, InfoList}.
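
Callers must track the key renames here: compacting becomes compact_running and size becomes disk_size. A hedged consumer sketch using the stdlib proplists module (Db is assumed to be whatever handle get_db_info/1 expects):

    {ok, Info} = couch_db:get_db_info(Db),
    % compact_running is the boolean Compactor /= nil computed above
    Running = proplists:get_value(compact_running, Info),
    % disk_size is the Size value from the same info list
    DiskSize = proplists:get_value(disk_size, Info).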
@@ -253,6 +256,7 @@ update_docs(MainPid, Docs, Options) ->
Doc#doc{revs=[integer_to_list(couch_util:rand32()) | Revs]}
end
end, Docs),
+ NewRevs = [NewRev || #doc{revs=[NewRev|_]} <- Docs2],
DocBuckets = group_alike_docs(Docs2),
Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
Db = get_db(MainPid),
@@ -275,11 +279,17 @@ update_docs(MainPid, Docs, Options) ->
% flush unwritten binaries to disk.
DocBuckets3 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets2],
-
+
case gen_server:call(MainPid, {update_docs, DocBuckets3, Options}) of
- ok ->
- % return back the new rev ids, in the same order input.
- {ok, [NewRev || #doc{revs=[NewRev|_]} <- Docs2]};
+ ok -> {ok, NewRevs};
+ retry ->
+ Db2 = get_db(MainPid),
+ DocBuckets4 = [[doc_flush_binaries(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets3],
+ % We only retry once
+ case gen_server:call(MainPid, {update_docs, DocBuckets4, Options}) of
+ ok -> {ok, NewRevs};
+ Else -> throw(Else)
+ end;
Else->
throw(Else)
end.
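
The retry arm handles a compaction switchover that swaps the database fd between flushing binaries and committing the update: the caller re-fetches the db, re-flushes against the new fd, and tries once more. A single retry is presumably enough because a second switchover within one update is not expected. The control flow, extracted as an illustrative helper (not part of couch_db; names hypothetical):

    % PrepareFun rebuilds fd-dependent state (e.g. re-flushed doc buckets);
    % CallFun submits it and returns ok | retry | Error.
    call_with_one_retry(PrepareFun, CallFun) ->
        case CallFun(PrepareFun()) of
        ok -> ok;
        retry ->
            % the fd changed under us once; rebuild state and try again
            case CallFun(PrepareFun()) of
            ok -> ok;
            Else -> throw(Else)
            end;
        Else -> throw(Else)
        end.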
@@ -477,7 +487,7 @@ start_update_loop(MainPid, {DbName, Filepath, Fd, Options}) ->
MainPid ! {initialized, Db2},
update_loop(Db2).
-update_loop(#db{name=Name,filepath=Filepath, main_pid=MainPid}=Db) ->
+update_loop(#db{fd=Fd,name=Name,filepath=Filepath, main_pid=MainPid}=Db) ->
receive
{OrigFrom, update_docs, DocActions, Options} ->
case (catch update_docs_int(Db, DocActions, Options)) of
@@ -486,6 +496,9 @@ update_loop(#db{name=Name,filepath=Filepath, main_pid=MainPid}=Db) ->
gen_server:reply(OrigFrom, ok),
couch_db_update_notifier:notify({updated, Name}),
update_loop(Db2);
+ retry ->
+ gen_server:reply(OrigFrom, retry),
+ update_loop(Db);
conflict ->
gen_server:reply(OrigFrom, conflict),
update_loop(Db);
@@ -519,7 +532,17 @@ update_loop(#db{name=Name,filepath=Filepath, main_pid=MainPid}=Db) ->
doc_count = Db#db.doc_count,
doc_del_count = Db#db.doc_del_count,
filepath = Filepath},
- close_db(Db),
+
+    couch_stream:close(Db#db.summary_stream),
+    % close file handle async.
+    % wait 5 secs before closing, allowing readers to finish
+    unlink(Fd),
+    spawn_link(fun() ->
+        receive after 5000 -> ok end,
+        couch_file:close(Fd),
+        file:delete(Filepath ++ ".old")
+    end),
+
ok = gen_server:call(MainPid, {db_updated, NewDb2}),
couch_log:info("Compaction for db ~p completed.", [Name]),
update_loop(NewDb2#db{compactor_pid=nil});
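
Rather than closing the old fd synchronously at switchover, the updater unlinks it and hands it to a short-lived process that sleeps five seconds before closing it and deleting the .old file, giving readers already holding the old fd a grace window; anything still reading after that errors, which is the outstanding handling the commit message flags. The same idiom as a standalone sketch (helper name and Delay parameter are hypothetical):

    close_file_later(Fd, Filepath, Delay) ->
        % unlink so the fd no longer shares fate with the updater
        unlink(Fd),
        spawn_link(fun() ->
            % plain receive-after sleep, no timer dependency
            receive after Delay -> ok end,
            couch_file:close(Fd),
            file:delete(Filepath ++ ".old")
        end).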
@@ -651,17 +674,29 @@ make_doc(Db, Id, Deleted, SummaryPointer, RevisionPath) ->
flush_trees(_Db, [], AccFlushedTrees) ->
{ok, lists:reverse(AccFlushedTrees)};
-flush_trees(Db, [Unflushed | RestUnflushed], AccFlushed) ->
+flush_trees(#db{fd=Fd}=Db, [Unflushed | RestUnflushed], AccFlushed) ->
Flushed = couch_key_tree:map(
fun(_Rev, Value) ->
case Value of
#doc{attachments=Atts,deleted=IsDeleted}=Doc ->
% this node value is actually an unwritten document summary,
% write to disk.
-
- % convert bins, removing the FD.
- % All bins should have been flushed to disk already.
- Bins = [{BinName, {BinType, BinSp, BinLen}} || {BinName, {BinType, {_Fd, BinSp, BinLen}}} <- Atts],
+            % make sure the Fd in the written bins is the same Fd we are using.
+            Bins =
+            case Atts of
+            [] -> [];
+            [{_BName, {_Type, {BinFd, _Sp, _Len}}} | _] when BinFd == Fd ->
+                % convert bins, removing the FD.
+                % All bins should have been flushed to disk already.
+                [{BinName, {BinType, BinSp, BinLen}}
+                    || {BinName, {BinType, {_Fd, BinSp, BinLen}}}
+                    <- Atts];
+            _ ->
+                % BinFd doesn't match our Fd. This can happen when a database
+                % is being updated during a compaction.
+                couch_log:debug("File where the attachments are written has changed. Possibly retrying."),
+                throw(retry)
+            end,
{ok, NewSummaryPointer} = couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}),
{IsDeleted, NewSummaryPointer};
_ ->
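
The fd check above relies on every flushed attachment tuple carrying the fd it was written through, and on doc_flush_binaries/2 flushing all of a doc's bins through a single fd, so inspecting the first attachment suffices. An illustrative predicate capturing the same test (name hypothetical):

    % true when the bins were flushed through the db's current fd
    bins_match_fd([], _Fd) ->
        true;
    bins_match_fd([{_Name, {_Type, {BinFd, _Sp, _Len}}} | _], Fd) ->
        BinFd == Fd.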
@@ -880,7 +915,7 @@ copy_compact_docs(Db, NewDb) ->
fun(#doc_info{update_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied}) ->
case couch_util:should_flush() of
true ->
- NewDb2 = copy_docs(Db, AccNewDb, lists:reverse(AccUncopied, DocInfo)),
+ NewDb2 = copy_docs(Db, AccNewDb, lists:reverse([DocInfo | AccUncopied])),
{ok, {commit_data(NewDb2#db{update_seq=Seq}), []}};
false ->
{ok, {AccNewDb, [DocInfo | AccUncopied]}}
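
The one-line change in copy_compact_docs is a genuine bug fix: lists:reverse/2 appends its second argument as the tail of the reversed list, so the old call produced an improper list ending in a bare #doc_info{} record rather than appending it as the final element. A shell sketch of the difference:

    1> lists:reverse([b, a], c).   % old shape: c becomes the tail
    [a,b|c]
    2> lists:reverse([c, b, a]).   % new shape: c is consed on, then reversed
    [a,b,c]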