author    Damien F. Katz <damien@apache.org>    2009-01-09 22:20:48 +0000
committer Damien F. Katz <damien@apache.org>    2009-01-09 22:20:48 +0000
commit    87f45e73df3e37fbb631bcb14871c621ee77489b (patch)
tree      0e33d44479bc94dd8a28387d771ba5feb037441d
parent    f6664de58f489627fee6e4283a1d17e0c6a99433 (diff)
Added support so clients can detect whether a server has potentially lost commits after multiple updates (for example, during bulk imports), and so the replicator can detect lost commits on remote replications.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@733174 13f79535-47bb-0310-9956-ffa450edef68
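
In outline, the new protocol is: record the database's instance_start_time before a batch of updates, flush with _ensure_full_commit afterwards, and compare the start time the flush returns. A minimal sketch using the test suite's CouchDB client helper (names as used in couch_tests.js below; the database name is made up):

    var db = new CouchDB("some_db");
    var startTime = db.info().instance_start_time;

    // a batch of updates; by default these are only delayed-committed
    db.save({_id: "a", val: 1});
    db.save({_id: "b", val: 2});

    // flush to disk, then compare instance start times
    var result = db.ensureFullCommit();
    if (result.ok && result.instance_start_time == startTime) {
      // same instance: all of the updates above are durably on disk
    } else {
      // the server restarted in between: recent updates may be lost,
      // so re-check (or re-send) the affected documents
    }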
-rw-r--r--  share/www/script/couch_tests.js   | 305
-rw-r--r--  src/couchdb/couch_db.erl          |  11
-rw-r--r--  src/couchdb/couch_db.hrl          |   1
-rw-r--r--  src/couchdb/couch_db_updater.erl  |  10
-rw-r--r--  src/couchdb/couch_file.erl        |   6
-rw-r--r--  src/couchdb/couch_httpd_db.erl    |   5
-rw-r--r--  src/couchdb/couch_rep.erl         |  43
-rw-r--r--  src/couchdb/couch_view_group.erl  |   2
8 files changed, 286 insertions(+), 97 deletions(-)
diff --git a/share/www/script/couch_tests.js b/share/www/script/couch_tests.js
index 20d8733f..3263c532 100644
--- a/share/www/script/couch_tests.js
+++ b/share/www/script/couch_tests.js
@@ -136,6 +136,89 @@ var tests = {
// make sure we can still open
T(db.open(existingDoc._id, {rev: existingDoc._rev}) != null);
},
+
+ delayed_commits: function(debug) {
+ var db = new CouchDB("test_suite_db");
+ db.deleteDb();
+ db.createDb();
+ if (debug) debugger;
+
+ // By default, CouchDB doesn't fully commit documents to disk right away;
+ // it waits about a second so it can batch the full commit flush along
+ // with any other updates. If it crashes or is restarted, you may lose
+ // the most recent updates.
+
+ T(db.save({_id:"1",a:2,b:4}).ok);
+ T(db.open("1") != null);
+
+ restartServer();
+
+ T(db.open("1") == null); // lost the update.
+ // note if we waited > 1 sec before the restart, the doc would likely
+ // commit.
+
+
+ // Retry the same thing but with full commits on.
+
+ var db2 = new CouchDB("test_suite_db", {"X-Couch-Full-Commit":"true"});
+
+ T(db2.save({_id:"1",a:2,b:4}).ok);
+ T(db2.open("1") != null);
+
+ restartServer();
+
+ T(db2.open("1") != null);
+
+ // You can update without committing immediately, and then ensure
+ // everything is committed in a final step.
+
+ T(db.save({_id:"2",a:2,b:4}).ok);
+ T(db.open("2") != null);
+ T(db.ensureFullCommit().ok);
+ restartServer();
+
+ T(db.open("2") != null);
+
+ // However, even with a flush it's possible the server crashed between
+ // the update and the commit, and you don't want to have to check every
+ // doc you updated to make sure it actually made it to disk. So record
+ // the database's instance start time before the updates, then check it
+ // again after the flush (the instance start time is returned by the
+ // flush operation). If the two match, we know everything was committed
+ // safely.
+
+ // First try it with a crash.
+
+ var instanceStartTime = db.info().instance_start_time;
+
+ T(db.save({_id:"3",a:2,b:4}).ok);
+ T(db.open("3") != null);
+
+ restartServer();
+
+ var commitResult = db.ensureFullCommit();
+ T(commitResult.ok && commitResult.instance_start_time != instanceStartTime);
+ // start times don't match, meaning the server lost our change
+
+ T(db.open("3") == null); // yup lost it
+
+ // retry with no server restart
+
+ var instanceStartTime = db.info().instance_start_time;
+
+ T(db.save({_id:"4",a:2,b:4}).ok);
+ T(db.open("4") != null);
+
+ var commitResult = db.ensureFullCommit();
+ T(commitResult.ok && commitResult.instance_start_time == instanceStartTime);
+ // Successful commit, start times match!
+
+ restartServer();
+
+ T(db.open("4") != null);
+
+ },
+
all_docs: function(debug) {
var db = new CouchDB("test_suite_db");
db.deleteDb();
@@ -1881,97 +1964,153 @@ var tests = {
dbA.createDb();
dbB.deleteDb();
dbB.createDb();
-
- var docs = makeDocs(0, numDocs);
- T(dbA.bulkSave(docs).ok);
-
- T(CouchDB.replicate(A, B).ok);
-
- for (var j = 0; j < numDocs; j++) {
- docA = dbA.open("" + j);
- docB = dbB.open("" + j);
- T(docA._rev == docB._rev);
- }
-
- // check documents with a '/' in the ID
- // need to re-encode the slash when replicating from a remote source
- dbA.save({ _id:"abc/def", val:"one" });
- T(CouchDB.replicate(A, B).ok);
- T(CouchDB.replicate(B, A).ok);
+ var repTests = {
+ // Copy and paste this template and fill in your code; delete any
+ // unused steps. (A usage sketch follows this file's diff.)
+ test_template: new function () {
+ this.init = function(dbA, dbB) {
+ // before anything has happened
+ }
+ this.afterAB1 = function(dbA, dbB) {
+ // called after replicating src=A tgt=B first time.
+ };
+ this.afterBA1 = function(dbA, dbB) {
+ // called after replicating src=B tgt=A first time.
+ };
+ this.afterAB2 = function(dbA, dbB) {
+ // called after replicating src=A tgt=B second time.
+ };
+ this.afterBA2 = function(dbA, dbB) {
+ // etc...
+ };
+ },
+
+ simple_test: new function () {
+ this.init = function(dbA, dbB) {
+ var docs = makeDocs(0, numDocs);
+ T(dbA.bulkSave(docs).ok);
+ };
+
+ this.afterAB1 = function(dbA, dbB) {
+ for (var j = 0; j < numDocs; j++) {
+ var docA = dbA.open("" + j);
+ var docB = dbB.open("" + j);
+ T(docA._rev == docB._rev);
+ }
+ };
+ },
- docA = dbA.open("abc/def");
- docB = dbB.open("abc/def");
- T(docA._rev == docB._rev);
+ deletes_test: new function () {
+ this.init = function(dbA, dbB) {
+ T(dbA.save({_id:"foo1",value:"a"}).ok);
+ };
+
+ this.afterAB1 = function(dbA, dbB) {
+ var docA = dbA.open("foo1");
+ var docB = dbB.open("foo1");
+ T(docA._rev == docB._rev);
+
+ dbA.deleteDoc(docA);
+ };
+
+ this.afterAB2 = function(dbA, dbB) {
+ T(dbA.open("foo1") == null);
+ T(dbB.open("foo1") == null);
+ };
+ },
+
+ slashes_in_ids_test: new function () {
+ // make sure docs with slashes in id replicate properly
+ this.init = function(dbA, dbB) {
+ dbA.save({ _id:"abc/def", val:"one" });
+ };
+
+ this.afterAB1 = function(dbA, dbB) {
+ var docA = dbA.open("abc/def");
+ var docB = dbB.open("abc/def");
+ T(docA._rev == docB._rev);
+ };
+ },
- // now check binary attachments
- var binDoc = {
- _id:"bin_doc",
- _attachments:{
- "foo.txt": {
- "type":"base64",
- "data": "VGhpcyBpcyBhIGJhc2U2NCBlbmNvZGVkIHRleHQ="
- }
+ attachments_test: new function () {
+ // Test attachments
+ this.init = function(dbA, dbB) {
+ dbA.save({
+ _id:"bin_doc",
+ _attachments:{
+ "foo.txt": {
+ "type":"base64",
+ "data": "VGhpcyBpcyBhIGJhc2U2NCBlbmNvZGVkIHRleHQ="
+ }
+ }
+ });
+ };
+
+ this.afterAB1 = function(dbA, dbB) {
+ var xhr = CouchDB.request("GET", "/test_suite_db_a/bin_doc/foo.txt");
+ T(xhr.responseText == "This is a base64 encoded text");
+
+ xhr = CouchDB.request("GET", "/test_suite_db_b/bin_doc/foo.txt");
+ T(xhr.responseText == "This is a base64 encoded text");
+ };
+ },
+
+ conflicts_test: new function () {
+ // test conflicts
+ this.init = function(dbA, dbB) {
+ dbA.save({_id:"foo",value:"a"});
+ dbB.save({_id:"foo",value:"b"});
+ };
+
+ this.afterBA1 = function(dbA, dbB) {
+ var docA = dbA.open("foo", {conflicts: true});
+ var docB = dbB.open("foo", {conflicts: true});
+
+ // make sure the same rev is in each db
+ T(docA._rev === docB._rev);
+
+ // make sure the conflicts are the same in each db
+ T(docA._conflicts[0] === docB._conflicts[0]);
+
+ // delete a conflict.
+ dbA.deleteDoc({_id:"foo", _rev:docA._conflicts[0]});
+ };
+
+ this.afterBA2 = function(dbA, dbB) {
+ // open documents and include the conflict meta data
+ var docA = dbA.open("foo", {conflicts: true});
+ var docB = dbB.open("foo", {conflicts: true});
+
+ // We should have no conflicts this time
+ T(docA._conflicts === undefined);
+ T(docB._conflicts === undefined);
+ };
}
- }
-
- dbA.save(binDoc);
-
- T(CouchDB.replicate(A, B).ok);
- T(CouchDB.replicate(B, A).ok);
-
- xhr = CouchDB.request("GET", "/test_suite_db_a/bin_doc/foo.txt");
- T(xhr.responseText == "This is a base64 encoded text")
-
- xhr = CouchDB.request("GET", "/test_suite_db_b/bin_doc/foo.txt");
- T(xhr.responseText == "This is a base64 encoded text")
-
- dbA.save({_id:"foo1",value:"a"});
-
- T(CouchDB.replicate(A, B).ok);
- T(CouchDB.replicate(B, A).ok);
-
- docA = dbA.open("foo1");
- docB = dbB.open("foo1");
- T(docA._rev == docB._rev);
-
- dbA.deleteDoc(docA);
-
+ };
+ var test;
+ for(test in repTests)
+ if(repTests[test].init) repTests[test].init(dbA, dbB);
+
T(CouchDB.replicate(A, B).ok);
+
+ for(test in repTests)
+ if(repTests[test].afterAB1) repTests[test].afterAB1(dbA, dbB);
+
T(CouchDB.replicate(B, A).ok);
-
- T(dbA.open("foo1") == null);
- T(dbB.open("foo1") == null);
-
- dbA.save({_id:"foo",value:"a"});
- dbB.save({_id:"foo",value:"b"});
-
+
+ for(test in repTests)
+ if(repTests[test].afterBA1) repTests[test].afterBA1(dbA, dbB);
+
T(CouchDB.replicate(A, B).ok);
+
+ for(test in repTests)
+ if(repTests[test].afterAB2) repTests[test].afterAB2(dbA, dbB);
+
T(CouchDB.replicate(B, A).ok);
-
- // open documents and include the conflict meta data
- docA = dbA.open("foo", {conflicts: true});
- docB = dbB.open("foo", {conflicts: true});
-
- // make sure the same rev is in each db
- T(docA._rev === docB._rev);
-
- // make sure the conflicts are the same in each db
- T(docA._conflicts[0] === docB._conflicts[0]);
-
- // delete a conflict.
- dbA.deleteDoc({_id:"foo", _rev:docA._conflicts[0]});
-
- // replicate the change
- T(CouchDB.replicate(A, B).ok);
-
- // open documents and include the conflict meta data
- docA = dbA.open("foo", {conflicts: true});
- docB = dbB.open("foo", {conflicts: true});
-
- // We should have no conflicts this time
- T(docA._conflicts === undefined)
- T(docB._conflicts === undefined);
+
+ for(test in repTests)
+ if(repTests[test].afterBA2) repTests[test].afterBA2(dbA, dbB);
+
}
},
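
For reference, a new case could be added to the repTests table above like this; the test name and documents are made up for illustration and are not part of this patch:

    update_test: new function () {
      this.init = function(dbA, dbB) {
        T(dbA.save({_id:"upd", value:"first"}).ok);
      };
      this.afterAB1 = function(dbA, dbB) {
        // the doc replicated A -> B; change it on B
        var doc = dbB.open("upd");
        T(doc.value == "first");
        doc.value = "second";
        T(dbB.save(doc).ok);
      };
      this.afterBA1 = function(dbA, dbB) {
        // the change replicated back B -> A
        T(dbA.open("upd").value == "second");
      };
    },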
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index c9546240..3011d744 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -64,8 +64,9 @@ create(DbName, Options) ->
open(DbName, Options) ->
couch_server:open(DbName, Options).
-ensure_full_commit(#db{update_pid=UpdatePid}) ->
- gen_server:call(UpdatePid, full_commit, infinity).
+ensure_full_commit(#db{update_pid=UpdatePid,instance_start_time=StartTime}) ->
+ ok = gen_server:call(UpdatePid, full_commit, infinity),
+ {ok, StartTime}.
close(#db{fd=Fd}) ->
couch_file:drop_ref(Fd).
@@ -166,7 +167,8 @@ get_db_info(Db) ->
compactor_pid=Compactor,
update_seq=SeqNum,
name=Name,
- fulldocinfo_by_id_btree=FullDocBtree} = Db,
+ fulldocinfo_by_id_btree=FullDocBtree,
+ instance_start_time=StartTime} = Db,
{ok, Size} = couch_file:bytes(Fd),
{ok, {Count, DelCount}} = couch_btree:full_reduce(FullDocBtree),
InfoList = [
@@ -176,7 +178,8 @@ get_db_info(Db) ->
{update_seq, SeqNum},
{purge_seq, couch_db:get_purge_seq(Db)},
{compact_running, Compactor/=nil},
- {disk_size, Size}
+ {disk_size, Size},
+ {instance_start_time, StartTime}
],
{ok, InfoList}.
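
With instance_start_time added to the info list above, a GET on the database (db.info() in the test suite) returns something like the following; the values shown are made up:

    {
      "db_name": "some_db",
      "doc_count": 4,
      "doc_del_count": 0,
      "update_seq": 4,
      "purge_seq": 0,
      "compact_running": false,
      "disk_size": 16473,
      "instance_start_time": "1231539648123456"
    }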
diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl
index 830f2f9a..238025a2 100644
--- a/src/couchdb/couch_db.hrl
+++ b/src/couchdb/couch_db.hrl
@@ -120,6 +120,7 @@
{main_pid=nil,
update_pid=nil,
compactor_pid=nil,
+ instance_start_time, % microseconds since Jan 1, 1970, as a binary string
fd,
header = #db_header{},
summary_stream,
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index 94605a3c..cbeda223 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -279,6 +279,12 @@ init_db(DbName, Filepath, Fd, Header0) ->
AdminsPtr ->
{ok, Admins} = couch_file:pread_term(Fd, AdminsPtr)
end,
+
+ % convert start time tuple to microsecs and store as a binary string
+ {MegaSecs, Secs, MicroSecs} = now(),
+ StartTime = ?l2b(io_lib:format("~p",
+ [(MegaSecs*1000000*1000000) + (Secs*1000000) + MicroSecs])),
+
#db{
update_pid=self(),
fd=Fd,
@@ -291,7 +297,9 @@ init_db(DbName, Filepath, Fd, Header0) ->
name = DbName,
filepath = Filepath,
admins = Admins,
- admins_ptr = AdminsPtr}.
+ admins_ptr = AdminsPtr,
+ instance_start_time = StartTime
+ }.
close_db(#db{fd=Fd,summary_stream=Ss}) ->
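
For illustration, the conversion above collapses Erlang's now() triple {MegaSecs, Secs, MicroSecs} into a single microsecond count; with made-up values:

    // e.g. now() = {1231, 539648, 123456}
    var megaSecs = 1231, secs = 539648, microSecs = 123456;
    var startTime = megaSecs * 1000000 * 1000000
                  + secs * 1000000
                  + microSecs;
    // startTime == 1231539648123456; the updater stores it as the
    // binary string "1231539648123456"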
diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl
index 9e60eb09..b29f45d2 100644
--- a/src/couchdb/couch_file.erl
+++ b/src/couchdb/couch_file.erl
@@ -170,7 +170,8 @@ close(Fd) ->
Result.
close_maybe(Fd) ->
- gen_server:cast(Fd, {close_maybe, self()}).
+ catch unlink(Fd),
+ catch gen_server:cast(Fd, close_maybe).
drop_ref(Fd) ->
drop_ref(Fd, self()).
@@ -372,8 +373,7 @@ handle_call(num_refs, _From, Fd) ->
handle_cast(close, Fd) ->
{stop,normal,Fd};
-handle_cast({close_maybe, Pid}, Fd) ->
- catch unlink(Pid),
+handle_cast(close_maybe, Fd) ->
maybe_close_async(Fd);
handle_cast({drop_ref, Pid}, Fd) ->
case get(Pid) of
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl
index 5366da1e..0519061b 100644
--- a/src/couchdb/couch_httpd_db.erl
+++ b/src/couchdb/couch_httpd_db.erl
@@ -90,9 +90,10 @@ db_req(#httpd{path_parts=[_DbName]}=Req, _Db) ->
send_method_not_allowed(Req, "DELETE,GET,HEAD,POST");
db_req(#httpd{method='POST',path_parts=[_,<<"_ensure_full_commit">>]}=Req, Db) ->
- ok = couch_db:ensure_full_commit(Db),
+ {ok, DbStartTime} = couch_db:ensure_full_commit(Db),
send_json(Req, 201, {[
- {ok, true}
+ {ok, true},
+ {instance_start_time, DbStartTime}
]});
db_req(#httpd{path_parts=[_,<<"_ensure_full_commit">>]}=Req, _Db) ->
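
With the handler changed as above, a full-commit request now echoes the instance start time; a sketch of the exchange (the timestamp value is made up):

    // POST /some_db/_ensure_full_commit
    // => 201 {"ok": true, "instance_start_time": "1231539648123456"}
    var result = db.ensureFullCommit();
    T(result.ok);
    T(typeof result.instance_start_time == "string");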
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index 881525f0..29f1fc80 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -69,7 +69,10 @@ replicate2(Source, DbSrc, Target, DbTgt, Options) ->
RepRecKey = <<?LOCAL_DOC_PREFIX, HostNameBin/binary,
":", Source/binary, ":", Target/binary>>,
- StartTime = httpd_util:rfc1123_date(),
+ ReplicationStartTime = httpd_util:rfc1123_date(),
+
+ {ok, SrcInstanceStartTime} = get_db_info(DbSrc),
+ {ok, TgtInstanceStartTime} = get_db_info(DbTgt),
case proplists:get_value(full, Options, false)
orelse proplists:get_value("full", Options, false) of
@@ -115,9 +118,28 @@ replicate2(Source, DbSrc, Target, DbTgt, Options) ->
% nothing changed, don't record results
{ok, {OldRepHistoryProps}};
false ->
+ % Commit changes to both the source and the target. The source too,
+ % because if the changes we replicated are lost, we'd record a sequence
+ % number ahead of what was actually committed and therefore lose future
+ % changes with the same sequence numbers.
+
+ {ok, SrcInstanceStartTime2} = ensure_full_commit(DbSrc),
+ {ok, TgtInstanceStartTime2} = ensure_full_commit(DbTgt),
+
+ RecordSeqNum =
+ if SrcInstanceStartTime2 == SrcInstanceStartTime andalso
+ TgtInstanceStartTime2 == TgtInstanceStartTime ->
+ NewSeqNum;
+ true ->
+ ?LOG_INFO("A server has restarted sinced replication start. "
+ "Not recording the new sequence number to ensure the "
+ "replication is redone and documents reexamined.", []),
+ SeqNum
+ end,
+
HistEntries =[
{
- [{<<"start_time">>, list_to_binary(StartTime)},
+ [{<<"start_time">>, list_to_binary(ReplicationStartTime)},
{<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
{<<"start_last_seq">>, SeqNum},
{<<"end_last_seq">>, NewSeqNum} | Stats]}
@@ -126,7 +148,7 @@ replicate2(Source, DbSrc, Target, DbTgt, Options) ->
NewRepHistory =
{
[{<<"session_id">>, couch_util:new_uuid()},
- {<<"source_last_seq">>, NewSeqNum},
+ {<<"source_last_seq">>, RecordSeqNum},
{<<"history">>, lists:sublist(HistEntries, 50)}]},
{ok, _} = update_doc(DbSrc, RepRecSrc#doc{body=NewRepHistory}, []),
@@ -276,6 +298,21 @@ close_db(#http_db{})->
close_db(Db)->
couch_db:close(Db).
+get_db_info(#http_db{uri=DbUrl, headers=Headers}) ->
+ {DbProps} = do_http_request(DbUrl, get, Headers),
+ {ok, [{list_to_existing_atom(?b2l(K)), V} || {K,V} <- DbProps]};
+get_db_info(Db) ->
+ couch_db:get_db_info(Db).
+
+
+ensure_full_commit(#http_db{uri=DbUrl, headers=Headers}) ->
+ {ResultProps} = do_http_request(DbUrl ++ "_ensure_full_commit", post, Headers, true),
+ true = proplists:get_value(<<"ok">>, ResultProps),
+ {ok, proplists:get_value(<<"instance_start_time">>, ResultProps)};
+ensure_full_commit(Db) ->
+ couch_db:ensure_full_commit(Db).
+
+
get_doc_info_list(#http_db{uri=DbUrl, headers=Headers}, StartSeq) ->
Url = DbUrl ++ "_all_docs_by_seq?limit=100&startkey="
++ integer_to_list(StartSeq),
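
Restating the checkpoint rule above as a sketch against the HTTP API; getInfo and fullCommit are hypothetical helpers, not functions in couch_rep:

    var srcStart = getInfo(srcUrl).instance_start_time;
    var tgtStart = getInfo(tgtUrl).instance_start_time;

    // ... replicate changed docs, advancing newSeqNum ...

    var srcStart2 = fullCommit(srcUrl).instance_start_time;
    var tgtStart2 = fullCommit(tgtUrl).instance_start_time;

    // Only record the new source_last_seq if neither server restarted
    // mid-replication; otherwise keep the old seq so the next run
    // reexamines the same documents.
    var recordSeqNum =
        (srcStart == srcStart2 && tgtStart == tgtStart2) ? newSeqNum
                                                         : seqNum;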
diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl
index 45e92dce..68c6c5cb 100644
--- a/src/couchdb/couch_view_group.erl
+++ b/src/couchdb/couch_view_group.erl
@@ -149,7 +149,7 @@ handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) ->
{noreply, State#group_state{waiting_commit=false}};
true ->
% We can't commit the header because the database seq that's fully
- % committed to disk is still behind us. It we committed now and the
+ % committed to disk is still behind us. If we committed now and the
% database lost those changes our view could be forever out of sync
% with the database. But a crash before we commit these changes, no big
% deal, we only lose incremental changes since last committal.