summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--share/server/main.js2
-rw-r--r--share/www/script/couch.js11
-rw-r--r--share/www/script/couch_tests.js27
-rw-r--r--src/couchdb/couch_db.erl63
-rw-r--r--src/couchdb/couch_query_servers.erl20
-rw-r--r--src/couchdb/couch_rep.erl51
-rw-r--r--src/couchdb/couch_stream.erl2
-rw-r--r--src/couchdb/mod_couch.erl6
8 files changed, 106 insertions, 76 deletions
diff --git a/share/server/main.js b/share/server/main.js
index 91e13742..5cc854aa 100644
--- a/share/server/main.js
+++ b/share/server/main.js
@@ -68,7 +68,7 @@ while (cmd = eval(readline())) {
// [
// [["Key","Value"]], <- fun 1 returned 1 key value
// [], <- fun 2 returned 0 key values
- // [["Key1","Value1"],["Key2","Value2"]],<- fun 3 returned 2 key values
+ // [["Key1","Value1"],["Key2","Value2"]] <- fun 3 returned 2 key values
// ]
//
var doc = cmd[1];
diff --git a/share/www/script/couch.js b/share/www/script/couch.js
index 9815882a..f1544893 100644
--- a/share/www/script/couch.js
+++ b/share/www/script/couch.js
@@ -87,6 +87,9 @@ function CouchDB(name) {
var result = JSON.parse(req.responseText);
if (req.status != 201)
throw result;
+ for(i in docs) {
+ docs[i]._rev = result.new_revs[i].rev;
+ }
return result;
}
@@ -130,6 +133,14 @@ function CouchDB(name) {
throw result;
return result;
}
+
+ this.compact = function() {
+ var req = request("POST", this.uri + "_compact");
+ var result = JSON.parse(req.responseText);
+ if (req.status != 202)
+ throw result;
+ return result;
+ }
// Convert a options object to an url query string.
// ex: {key:'value',key2:'value2'} becomes '?key="value"&key2="value2"'
diff --git a/share/www/script/couch_tests.js b/share/www/script/couch_tests.js
index 986a90b2..70879479 100644
--- a/share/www/script/couch_tests.js
+++ b/share/www/script/couch_tests.js
@@ -839,8 +839,33 @@ var tests = {
headers: {"if-match": etag}
});
T(xhr.status == 202)
- }
+ },
+ compact: function(debug) {
+ var db = new CouchDB("test_suite_db");
+ db.deleteDb();
+ db.createDb();
+ if (debug) debugger;
+ var docs = makeDocs(0, 10);
+ var saveResult = db.bulkSave(docs);
+ T(saveResult.ok);
+ var originalsize = db.info().disk_size;
+
+ for(var i in docs) {
+ db.deleteDoc(docs[i]);
+ }
+ var deletesize = db.info().disk_size;
+ T(deletesize > originalsize);
+
+ var xhr = CouchDB.request("POST", "/test_suite_db/_compact");
+ T(xhr.status == 202);
+ //compaction isn't instantaneous, loop until done
+ while(db.info().compact_running) {};
+
+ var compactedsize = db.info().disk_size;
+
+ T(deletesize > originalsize);
+ }
};
function makeDocs(start, end, templateDoc) {
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index e242c382..cdb0598e 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -105,6 +105,9 @@ create(DbName, Filepath, Options) when is_list(Options) ->
open(DbName, Filepath) ->
start_link(DbName, Filepath, []).
+
+% Compaction still needs work. Right now readers and writers can get an error
+% file compaction changeover. This doesn't need to be the case.
start_compact(MainPid) ->
gen_server:cast(MainPid, start_compact).
@@ -179,8 +182,8 @@ get_db_info(Db) ->
{doc_count, Count},
{doc_del_count, DelCount},
{update_seq, SeqNum},
- {compacting, Compactor/=nil},
- {size, Size}
+ {compact_running, Compactor/=nil},
+ {disk_size, Size}
],
{ok, InfoList}.
@@ -253,6 +256,7 @@ update_docs(MainPid, Docs, Options) ->
Doc#doc{revs=[integer_to_list(couch_util:rand32()) | Revs]}
end
end, Docs),
+ NewRevs = [NewRev || #doc{revs=[NewRev|_]} <- Docs2],
DocBuckets = group_alike_docs(Docs2),
Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
Db = get_db(MainPid),
@@ -275,11 +279,17 @@ update_docs(MainPid, Docs, Options) ->
% flush unwritten binaries to disk.
DocBuckets3 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets2],
-
+
case gen_server:call(MainPid, {update_docs, DocBuckets3, Options}) of
- ok ->
- % return back the new rev ids, in the same order input.
- {ok, [NewRev || #doc{revs=[NewRev|_]} <- Docs2]};
+ ok -> {ok, NewRevs};
+ retry ->
+ Db2 = get_db(MainPid),
+ DocBuckets4 = [[doc_flush_binaries(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets3],
+ % We only retry once
+ case gen_server:call(MainPid, {update_docs, DocBuckets4, Options}) of
+ ok -> {ok, NewRevs};
+ Else -> throw(Else)
+ end;
Else->
throw(Else)
end.
@@ -477,7 +487,7 @@ start_update_loop(MainPid, {DbName, Filepath, Fd, Options}) ->
MainPid ! {initialized, Db2},
update_loop(Db2).
-update_loop(#db{name=Name,filepath=Filepath, main_pid=MainPid}=Db) ->
+update_loop(#db{fd=Fd,name=Name,filepath=Filepath, main_pid=MainPid}=Db) ->
receive
{OrigFrom, update_docs, DocActions, Options} ->
case (catch update_docs_int(Db, DocActions, Options)) of
@@ -486,6 +496,9 @@ update_loop(#db{name=Name,filepath=Filepath, main_pid=MainPid}=Db) ->
gen_server:reply(OrigFrom, ok),
couch_db_update_notifier:notify({updated, Name}),
update_loop(Db2);
+ retry ->
+ gen_server:reply(OrigFrom, retry),
+ update_loop(Db);
conflict ->
gen_server:reply(OrigFrom, conflict),
update_loop(Db);
@@ -519,7 +532,17 @@ update_loop(#db{name=Name,filepath=Filepath, main_pid=MainPid}=Db) ->
doc_count = Db#db.doc_count,
doc_del_count = Db#db.doc_del_count,
filepath = Filepath},
- close_db(Db),
+
+ couch_stream:close(Db#db.summary_stream),
+ % close file handle async.
+ % wait 5 secs before closing, allowing readers to finish
+ unlink(Fd),
+ spawn_link(fun() ->
+ receive after 5000 -> ok end,
+ couch_file:close(Fd),
+ file:delete(Filepath ++ ".old")
+ end),
+
ok = gen_server:call(MainPid, {db_updated, NewDb2}),
couch_log:info("Compaction for db ~p completed.", [Name]),
update_loop(NewDb2#db{compactor_pid=nil});
@@ -651,17 +674,29 @@ make_doc(Db, Id, Deleted, SummaryPointer, RevisionPath) ->
flush_trees(_Db, [], AccFlushedTrees) ->
{ok, lists:reverse(AccFlushedTrees)};
-flush_trees(Db, [Unflushed | RestUnflushed], AccFlushed) ->
+flush_trees(#db{fd=Fd}=Db, [Unflushed | RestUnflushed], AccFlushed) ->
Flushed = couch_key_tree:map(
fun(_Rev, Value) ->
case Value of
#doc{attachments=Atts,deleted=IsDeleted}=Doc ->
% this node value is actually an unwritten document summary,
% write to disk.
-
- % convert bins, removing the FD.
- % All bins should have been flushed to disk already.
- Bins = [{BinName, {BinType, BinSp, BinLen}} || {BinName, {BinType, {_Fd, BinSp, BinLen}}} <- Atts],
+ % make sure the Fd in the written bins is the same Fd we are.
+ Bins =
+ case Atts of
+ [] -> [];
+ [{_BName, {_Type, {BinFd, _Sp, _Len}}} | _ ] when BinFd == Fd ->
+ % convert bins, removing the FD.
+ % All bins should have been flushed to disk already.
+ [{BinName, {BinType, BinSp, BinLen}}
+ || {BinName, {BinType, {_Fd, BinSp, BinLen}}}
+ <- Atts];
+ _ ->
+ % BinFd must not equal our Fd. This can happen when a database
+ % is being updated during a compaction
+ couch_log:debug("File where the attachments are written has changed. Possibly retrying."),
+ throw(retry)
+ end,
{ok, NewSummaryPointer} = couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}),
{IsDeleted, NewSummaryPointer};
_ ->
@@ -880,7 +915,7 @@ copy_compact_docs(Db, NewDb) ->
fun(#doc_info{update_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied}) ->
case couch_util:should_flush() of
true ->
- NewDb2 = copy_docs(Db, AccNewDb, lists:reverse(AccUncopied, DocInfo)),
+ NewDb2 = copy_docs(Db, AccNewDb, lists:reverse([DocInfo | AccUncopied])),
{ok, {commit_data(NewDb2#db{update_seq=Seq}), []}};
false ->
{ok, {AccNewDb, [DocInfo | AccUncopied]}}
diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl
index 19cba9bd..33962705 100644
--- a/src/couchdb/couch_query_servers.erl
+++ b/src/couchdb/couch_query_servers.erl
@@ -36,8 +36,6 @@ readline(Port) ->
readline(Port, []).
readline(Port, Acc) ->
- Timer = erlang:send_after(timeout(), self(), timeout),
- Result =
receive
{Port, {data, {noeol, Data}}} ->
readline(Port, [Data|Acc]);
@@ -45,20 +43,11 @@ readline(Port, Acc) ->
lists:flatten(lists:reverse(Acc, Data));
{Port, Err} ->
catch port_close(Port),
- erlang:cancel_timer(Timer),
- throw({map_process_error, Err});
- timeout ->
+ throw({map_process_error, Err})
+ after timeout() ->
catch port_close(Port),
throw({map_process_error, "map function timed out"})
- end,
- case erlang:cancel_timer(Timer) of
- false ->
- % message already sent. clear it
- receive timeout -> ok end;
- _ ->
- ok
- end,
- Result.
+ end.
read_json(Port) ->
case cjson:decode(readline(Port)) of
@@ -108,8 +97,7 @@ start_doc_map(Lang, Functions) ->
map_docs({_Lang, Port}, Docs) ->
% send the documents
- Results =
- lists:map(
+ Results = lists:map(
fun(Doc) ->
Json = couch_doc:to_json_obj(Doc, []),
case prompt(Port, {"map_doc", Json}) of
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index 9590d5c1..4a6a415a 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -14,7 +14,7 @@
-include("couch_db.hrl").
--export([replicate/2, replicate/3, test/0, test_write_docs/3]).
+-export([replicate/2, replicate/3]).
-record(stats, {
docs_read=0,
@@ -117,8 +117,7 @@ replicate(Source, Target, Options) ->
end.
pull_rep(DbTarget, DbSource, SourceSeqNum, Stats) ->
- {ok, NewSeq} =
- enum_docs_since(DbSource, SourceSeqNum,
+ {ok, NewSeq} = enum_docs_since(DbSource, SourceSeqNum,
fun(#doc_info{update_seq=Seq}=SrcDocInfo, _, {_, AccStats}) ->
Stats2 = maybe_save_docs(DbTarget, DbSource, SrcDocInfo, AccStats),
{ok, {Seq, Stats2}}
@@ -136,23 +135,15 @@ maybe_save_docs(DbTarget, DbSource,
[] ->
Stats;
_Else ->
- % the 'ok' below validates no unrecoverable errors (like network failure, etc).
{ok, DocResults} = open_doc_revs(DbSource, Id, MissingRevs, [latest]),
+ % only save successful reads
+ Docs = [RevDoc || {ok, RevDoc} <- DocResults],
+ ok = save_docs(DbTarget, Docs, []),
- Docs = [RevDoc || {ok, RevDoc} <- DocResults], % only match successful loads
-
- Stats2 = Stats#stats{
+ Stats#stats{
docs_read=Stats#stats.docs_read + length(Docs),
- read_errors=Stats#stats.read_errors + length(DocResults) - length(Docs)},
-
- case Docs of
- [] ->
- Stats2;
- _ ->
- % the 'ok' below validates no unrecoverable errors (like network failure, etc).
- ok = save_docs(DbTarget, Docs, []),
- Stats2#stats{docs_copied=Stats2#stats.docs_copied+length(Docs)}
- end
+ read_errors=Stats#stats.read_errors + length(DocResults) - length(Docs),
+ docs_copied=Stats#stats.docs_copied + length(Docs)}
end.
@@ -280,29 +271,3 @@ open_doc_revs(Db, DocId, Revs, Options) ->
couch_db:open_doc_revs(Db, DocId, Revs, Options).
-
-
-
-test() ->
- couch_server:start(),
- %{ok, LocalA} = couch_server:open("replica_a"),
- {ok, LocalA} = couch_server:create("replica_a", [overwrite]),
- {ok, _} = couch_server:create("replica_b", [overwrite]),
- %DbA = "replica_a",
- DbA = "http://localhost:5984/replica_a/",
- %DbB = "replica_b",
- DbB = "http://localhost:5984/replica_b/",
- _DocUnids = test_write_docs(10, LocalA, []),
- replicate(DbA, DbB),
- %{ok, _Rev} = couch_db:delete_doc(LocalA, lists:nth(1, DocUnids), any),
- % replicate(DbA, DbB),
- ok.
-
-test_write_docs(0, _Db, Output) ->
- lists:reverse(Output);
-test_write_docs(N, Db, Output) ->
- Doc = #doc{
- id=integer_to_list(N),
- body={obj, [{"foo", integer_to_list(N)}, {"num", N}, {"bar", "blah"}]}},
- couch_db:save_doc(Db, Doc, []),
- test_write_docs(N-1, Db, [integer_to_list(N) | Output]).
diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl
index 8d5260f1..ca43562a 100644
--- a/src/couchdb/couch_stream.erl
+++ b/src/couchdb/couch_stream.erl
@@ -79,7 +79,7 @@ read(Fd, Sp, Num) ->
{ok, Bin, Sp2}.
copy_to_new_stream(Src, Sp, Len, DestFd) ->
- Dest = open(DestFd),
+ {ok, Dest} = open(DestFd),
{ok, NewSp} = copy(Src, Sp, Len, Dest),
close(Dest),
{ok, NewSp}.
diff --git a/src/couchdb/mod_couch.erl b/src/couchdb/mod_couch.erl
index 8373dbe9..0d157b1e 100644
--- a/src/couchdb/mod_couch.erl
+++ b/src/couchdb/mod_couch.erl
@@ -195,6 +195,8 @@ do(#mod{method="POST"}=Mod, #uri_parts{db="_restart", doc=""}) ->
send_ok(Mod, 201);
do(#mod{method="POST"}=Mod, #uri_parts{doc="_missing_revs"}=Parts) ->
handle_missing_revs_request(Mod, Parts);
+do(#mod{method="POST"}=Mod, #uri_parts{doc="_compact"}=Parts) ->
+ handle_compact(Mod, Parts);
do(#mod{method="PUT"}=Mod, #uri_parts{doc=""}=Parts) ->
handle_db_create(Mod, Parts);
do(#mod{method="DELETE"}=Mod, #uri_parts{doc=""}=Parts) ->
@@ -487,6 +489,10 @@ handle_missing_revs_request(#mod{entity_body=RawJson}=Mod, Parts) ->
JsonResults = [{Id, list_to_tuple(Revs)} || {Id, Revs} <- Results],
send_json(Mod, 200, {obj, [{missing_revs, {obj, JsonResults}}]}).
+handle_compact(Mod, Parts) ->
+ ok = couch_db:start_compact(open_db(Parts)),
+ send_ok(Mod, 202).
+
handle_replication_request(#mod{entity_body=RawJson}=Mod) ->
{obj, Props} = cjson:decode(RawJson),
Src = proplists:get_value("source", Props),