diff options
-rw-r--r-- | share/www/script/couch_tests.js | 305 | ||||
-rw-r--r-- | src/couchdb/couch_db.erl | 11 | ||||
-rw-r--r-- | src/couchdb/couch_db.hrl | 1 | ||||
-rw-r--r-- | src/couchdb/couch_db_updater.erl | 10 | ||||
-rw-r--r-- | src/couchdb/couch_file.erl | 6 | ||||
-rw-r--r-- | src/couchdb/couch_httpd_db.erl | 5 | ||||
-rw-r--r-- | src/couchdb/couch_rep.erl | 43 | ||||
-rw-r--r-- | src/couchdb/couch_view_group.erl | 2 |
8 files changed, 286 insertions, 97 deletions
diff --git a/share/www/script/couch_tests.js b/share/www/script/couch_tests.js index 20d8733f..3263c532 100644 --- a/share/www/script/couch_tests.js +++ b/share/www/script/couch_tests.js @@ -136,6 +136,89 @@ var tests = { // make sure we can still open T(db.open(existingDoc._id, {rev: existingDoc._rev}) != null); }, + + delayed_commits: function(debug) { + var db = new CouchDB("test_suite_db"); + db.deleteDb(); + db.createDb(); + if (debug) debugger; + + // By default, couchdb doesn't fully commit documents to disk right away, + // it waits about a second to batch the full commit flush along with any + // other updates. If it crashes or is restarted you may lose the most + // recent commits. + + T(db.save({_id:"1",a:2,b:4}).ok); + T(db.open("1") != null); + + restartServer(); + + T(db.open("1") == null); // lost the update. + // note if we waited > 1 sec before the restart, the doc would likely + // commit. + + + // Retry the same thing but with full commits on. + + var db2 = new CouchDB("test_suite_db", {"X-Couch-Full-Commit":"true"}); + + T(db2.save({_id:"1",a:2,b:4}).ok); + T(db2.open("1") != null); + + restartServer(); + + T(db2.open("1") != null); + + // You can update but without committing immediately, and then ensure + // everything is commited in the last step. + + T(db.save({_id:"2",a:2,b:4}).ok); + T(db.open("2") != null); + T(db.ensureFullCommit().ok); + restartServer(); + + T(db.open("2") != null); + + // However, it's possible even when flushed, that the server crashed between + // the update and the commit, and you don't want to check to make sure + // every doc you updated actually made it to disk. So record the instance + // start time of the database before the updates and then check it again + // after the flush (the instance start time is returned by the flush + // operation). if they are the same, we know everything was updated + // safely. + + // First try it with a crash. + + var instanceStartTime = db.info().instance_start_time; + + T(db.save({_id:"3",a:2,b:4}).ok); + T(db.open("3") != null); + + restartServer(); + + var commitResult = db.ensureFullCommit(); + T(commitResult.ok && commitResult.instance_start_time != instanceStartTime); + // start times don't match, meaning the server lost our change + + T(db.open("3") == null); // yup lost it + + // retry with no server restart + + var instanceStartTime = db.info().instance_start_time; + + T(db.save({_id:"4",a:2,b:4}).ok); + T(db.open("4") != null); + + var commitResult = db.ensureFullCommit(); + T(commitResult.ok && commitResult.instance_start_time == instanceStartTime); + // Successful commit, start times match! + + restartServer(); + + T(db.open("4") != null); + + }, + all_docs: function(debug) { var db = new CouchDB("test_suite_db"); db.deleteDb(); @@ -1881,97 +1964,153 @@ var tests = { dbA.createDb(); dbB.deleteDb(); dbB.createDb(); - - var docs = makeDocs(0, numDocs); - T(dbA.bulkSave(docs).ok); - - T(CouchDB.replicate(A, B).ok); - - for (var j = 0; j < numDocs; j++) { - docA = dbA.open("" + j); - docB = dbB.open("" + j); - T(docA._rev == docB._rev); - } - - // check documents with a '/' in the ID - // need to re-encode the slash when replicating from a remote source - dbA.save({ _id:"abc/def", val:"one" }); - T(CouchDB.replicate(A, B).ok); - T(CouchDB.replicate(B, A).ok); + var repTests = { + // copy and paste and put your code in. delete unused steps. + test_template: new function () { + this.init = function(dbA, dbB) { + // before anything has happened + } + this.afterAB1 = function(dbA, dbB) { + // called after replicating src=A tgt=B first time. + }; + this.afterBA1 = function(dbA, dbB) { + // called after replicating src=B tgt=A first time. + }; + this.afterAB2 = function(dbA, dbB) { + // called after replicating src=A tgt=B second time. + }; + this.afterBA2 = function(dbA, dbB) { + // etc... + }; + }, + + simple_test: new function () { + this.init = function(dbA, dbB) { + var docs = makeDocs(0, numDocs); + T(dbA.bulkSave(docs).ok); + }; + + this.afterAB1 = function(dbA, dbB) { + for (var j = 0; j < numDocs; j++) { + var docA = dbA.open("" + j); + var docB = dbB.open("" + j); + T(docA._rev == docB._rev); + } + }; + }, - docA = dbA.open("abc/def"); - docB = dbB.open("abc/def"); - T(docA._rev == docB._rev); + deletes_test: new function () { + this.init = function(dbA, dbB) { + T(dbA.save({_id:"foo1",value:"a"}).ok); + }; + + this.afterAB1 = function(dbA, dbB) { + var docA = dbA.open("foo1"); + var docB = dbB.open("foo1"); + T(docA._rev == docB._rev); + + dbA.deleteDoc(docA); + }; + + this.afterAB2 = function(dbA, dbB) { + T(dbA.open("foo1") == null); + T(dbB.open("foo1") == null); + }; + }, + + slashes_in_ids_test: new function () { + // make sure docs with slashes in id replicate properly + this.init = function(dbA, dbB) { + dbA.save({ _id:"abc/def", val:"one" }); + }; + + this.afterAB1 = function(dbA, dbB) { + var docA = dbA.open("abc/def"); + var docB = dbB.open("abc/def"); + T(docA._rev == docB._rev); + }; + }, - // now check binary attachments - var binDoc = { - _id:"bin_doc", - _attachments:{ - "foo.txt": { - "type":"base64", - "data": "VGhpcyBpcyBhIGJhc2U2NCBlbmNvZGVkIHRleHQ=" - } + attachments_test: new function () { + // Test attachments + this.init = function(dbA, dbB) { + dbA.save({ + _id:"bin_doc", + _attachments:{ + "foo.txt": { + "type":"base64", + "data": "VGhpcyBpcyBhIGJhc2U2NCBlbmNvZGVkIHRleHQ=" + } + } + }); + }; + + this.afterAB1 = function(dbA, dbB) { + var xhr = CouchDB.request("GET", "/test_suite_db_a/bin_doc/foo.txt"); + T(xhr.responseText == "This is a base64 encoded text") + + xhr = CouchDB.request("GET", "/test_suite_db_b/bin_doc/foo.txt"); + T(xhr.responseText == "This is a base64 encoded text") + }; + }, + + conflicts_test: new function () { + // test conflicts + this.init = function(dbA, dbB) { + dbA.save({_id:"foo",value:"a"}); + dbB.save({_id:"foo",value:"b"}); + }; + + this.afterBA1 = function(dbA, dbB) { + var docA = dbA.open("foo", {conflicts: true}); + var docB = dbB.open("foo", {conflicts: true}); + + // make sure the same rev is in each db + T(docA._rev === docB._rev); + + // make sure the conflicts are the same in each db + T(docA._conflicts[0] === docB._conflicts[0]); + + // delete a conflict. + dbA.deleteDoc({_id:"foo", _rev:docA._conflicts[0]}); + }; + + this.afterBA2 = function(dbA, dbB) { + // open documents and include the conflict meta data + var docA = dbA.open("foo", {conflicts: true}); + var docB = dbB.open("foo", {conflicts: true}); + + // We should have no conflicts this time + T(docA._conflicts === undefined) + T(docB._conflicts === undefined); + }; } - } - - dbA.save(binDoc); - - T(CouchDB.replicate(A, B).ok); - T(CouchDB.replicate(B, A).ok); - - xhr = CouchDB.request("GET", "/test_suite_db_a/bin_doc/foo.txt"); - T(xhr.responseText == "This is a base64 encoded text") - - xhr = CouchDB.request("GET", "/test_suite_db_b/bin_doc/foo.txt"); - T(xhr.responseText == "This is a base64 encoded text") - - dbA.save({_id:"foo1",value:"a"}); - - T(CouchDB.replicate(A, B).ok); - T(CouchDB.replicate(B, A).ok); - - docA = dbA.open("foo1"); - docB = dbB.open("foo1"); - T(docA._rev == docB._rev); - - dbA.deleteDoc(docA); - + }; + var test; + for(test in repTests) + if(repTests[test].init) repTests[test].init(dbA, dbB); + T(CouchDB.replicate(A, B).ok); + + for(test in repTests) + if(repTests[test].afterAB1) repTests[test].afterAB1(dbA, dbB); + T(CouchDB.replicate(B, A).ok); - - T(dbA.open("foo1") == null); - T(dbB.open("foo1") == null); - - dbA.save({_id:"foo",value:"a"}); - dbB.save({_id:"foo",value:"b"}); - + + for(test in repTests) + if(repTests[test].afterBA1) repTests[test].afterBA1(dbA, dbB); + T(CouchDB.replicate(A, B).ok); + + for(test in repTests) + if(repTests[test].afterAB2) repTests[test].afterAB2(dbA, dbB); + T(CouchDB.replicate(B, A).ok); - - // open documents and include the conflict meta data - docA = dbA.open("foo", {conflicts: true}); - docB = dbB.open("foo", {conflicts: true}); - - // make sure the same rev is in each db - T(docA._rev === docB._rev); - - // make sure the conflicts are the same in each db - T(docA._conflicts[0] === docB._conflicts[0]); - - // delete a conflict. - dbA.deleteDoc({_id:"foo", _rev:docA._conflicts[0]}); - - // replicate the change - T(CouchDB.replicate(A, B).ok); - - // open documents and include the conflict meta data - docA = dbA.open("foo", {conflicts: true}); - docB = dbB.open("foo", {conflicts: true}); - - // We should have no conflicts this time - T(docA._conflicts === undefined) - T(docB._conflicts === undefined); + + for(test in repTests) + if(repTests[test].afterBA2) repTests[test].afterBA2(dbA, dbB); + } }, diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index c9546240..3011d744 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -64,8 +64,9 @@ create(DbName, Options) -> open(DbName, Options) -> couch_server:open(DbName, Options). -ensure_full_commit(#db{update_pid=UpdatePid}) -> - gen_server:call(UpdatePid, full_commit, infinity). +ensure_full_commit(#db{update_pid=UpdatePid,instance_start_time=StartTime}) -> + ok = gen_server:call(UpdatePid, full_commit, infinity), + {ok, StartTime}. close(#db{fd=Fd}) -> couch_file:drop_ref(Fd). @@ -166,7 +167,8 @@ get_db_info(Db) -> compactor_pid=Compactor, update_seq=SeqNum, name=Name, - fulldocinfo_by_id_btree=FullDocBtree} = Db, + fulldocinfo_by_id_btree=FullDocBtree, + instance_start_time=StartTime} = Db, {ok, Size} = couch_file:bytes(Fd), {ok, {Count, DelCount}} = couch_btree:full_reduce(FullDocBtree), InfoList = [ @@ -176,7 +178,8 @@ get_db_info(Db) -> {update_seq, SeqNum}, {purge_seq, couch_db:get_purge_seq(Db)}, {compact_running, Compactor/=nil}, - {disk_size, Size} + {disk_size, Size}, + {instance_start_time, StartTime} ], {ok, InfoList}. diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl index 830f2f9a..238025a2 100644 --- a/src/couchdb/couch_db.hrl +++ b/src/couchdb/couch_db.hrl @@ -120,6 +120,7 @@ {main_pid=nil, update_pid=nil, compactor_pid=nil, + instance_start_time, % number of microsecs since jan 1 1970 as a binary string fd, header = #db_header{}, summary_stream, diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl index 94605a3c..cbeda223 100644 --- a/src/couchdb/couch_db_updater.erl +++ b/src/couchdb/couch_db_updater.erl @@ -279,6 +279,12 @@ init_db(DbName, Filepath, Fd, Header0) -> AdminsPtr -> {ok, Admins} = couch_file:pread_term(Fd, AdminsPtr) end, + + % convert start time tuple to microsecs and store as a binary string + {MegaSecs, Secs, MicroSecs} = now(), + StartTime = ?l2b(io_lib:format("~p", + [(MegaSecs*1000000*1000000) + (Secs*1000000) + MicroSecs])), + #db{ update_pid=self(), fd=Fd, @@ -291,7 +297,9 @@ init_db(DbName, Filepath, Fd, Header0) -> name = DbName, filepath = Filepath, admins = Admins, - admins_ptr = AdminsPtr}. + admins_ptr = AdminsPtr, + instance_start_time = StartTime + }. close_db(#db{fd=Fd,summary_stream=Ss}) -> diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl index 9e60eb09..b29f45d2 100644 --- a/src/couchdb/couch_file.erl +++ b/src/couchdb/couch_file.erl @@ -170,7 +170,8 @@ close(Fd) -> Result. close_maybe(Fd) -> - gen_server:cast(Fd, {close_maybe, self()}). + catch unlink(Fd), + catch gen_server:cast(Fd, close_maybe). drop_ref(Fd) -> drop_ref(Fd, self()). @@ -372,8 +373,7 @@ handle_call(num_refs, _From, Fd) -> handle_cast(close, Fd) -> {stop,normal,Fd}; -handle_cast({close_maybe, Pid}, Fd) -> - catch unlink(Pid), +handle_cast(close_maybe, Fd) -> maybe_close_async(Fd); handle_cast({drop_ref, Pid}, Fd) -> case get(Pid) of diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl index 5366da1e..0519061b 100644 --- a/src/couchdb/couch_httpd_db.erl +++ b/src/couchdb/couch_httpd_db.erl @@ -90,9 +90,10 @@ db_req(#httpd{path_parts=[_DbName]}=Req, _Db) -> send_method_not_allowed(Req, "DELETE,GET,HEAD,POST"); db_req(#httpd{method='POST',path_parts=[_,<<"_ensure_full_commit">>]}=Req, Db) -> - ok = couch_db:ensure_full_commit(Db), + {ok, DbStartTime} = couch_db:ensure_full_commit(Db), send_json(Req, 201, {[ - {ok, true} + {ok, true}, + {instance_start_time, DbStartTime} ]}); db_req(#httpd{path_parts=[_,<<"_ensure_full_commit">>]}=Req, _Db) -> diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index 881525f0..29f1fc80 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -69,7 +69,10 @@ replicate2(Source, DbSrc, Target, DbTgt, Options) -> RepRecKey = <<?LOCAL_DOC_PREFIX, HostNameBin/binary, ":", Source/binary, ":", Target/binary>>, - StartTime = httpd_util:rfc1123_date(), + ReplicationStartTime = httpd_util:rfc1123_date(), + + {ok, SrcInstanceStartTime} = get_db_info(DbSrc), + {ok, TgtInstanceStartTime} = get_db_info(DbTgt), case proplists:get_value(full, Options, false) orelse proplists:get_value("full", Options, false) of @@ -115,9 +118,28 @@ replicate2(Source, DbSrc, Target, DbTgt, Options) -> % nothing changed, don't record results {ok, {OldRepHistoryProps}}; false -> + % commit changes to both src and tgt. The src because if changes + % we replicated are lost, we'll record the a seq number of ahead + % of what was committed and therefore lose future changes with the + % same seq nums. + + {ok, SrcInstanceStartTime2} = ensure_full_commit(DbSrc), + {ok, TgtInstanceStartTime2} = ensure_full_commit(DbTgt), + + RecordSeqNum = + if SrcInstanceStartTime2 == SrcInstanceStartTime andalso + TgtInstanceStartTime2 == TgtInstanceStartTime -> + NewSeqNum; + true -> + ?LOG_INFO("A server has restarted sinced replication start. " + "Not recording the new sequence number to ensure the " + "replication is redone and documents reexamined.", []), + SeqNum + end, + HistEntries =[ { - [{<<"start_time">>, list_to_binary(StartTime)}, + [{<<"start_time">>, list_to_binary(ReplicationStartTime)}, {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())}, {<<"start_last_seq">>, SeqNum}, {<<"end_last_seq">>, NewSeqNum} | Stats]} @@ -126,7 +148,7 @@ replicate2(Source, DbSrc, Target, DbTgt, Options) -> NewRepHistory = { [{<<"session_id">>, couch_util:new_uuid()}, - {<<"source_last_seq">>, NewSeqNum}, + {<<"source_last_seq">>, RecordSeqNum}, {<<"history">>, lists:sublist(HistEntries, 50)}]}, {ok, _} = update_doc(DbSrc, RepRecSrc#doc{body=NewRepHistory}, []), @@ -276,6 +298,21 @@ close_db(#http_db{})-> close_db(Db)-> couch_db:close(Db). +get_db_info(#http_db{uri=DbUrl, headers=Headers}) -> + {DbProps} = do_http_request(DbUrl, get, Headers), + {ok, [{list_to_existing_atom(?b2l(K)), V} || {K,V} <- DbProps]}; +get_db_info(Db) -> + couch_db:get_db_info(Db). + + +ensure_full_commit(#http_db{uri=DbUrl, headers=Headers}) -> + {ResultProps} = do_http_request(DbUrl ++ "_ensure_full_commit", post, Headers, true), + true = proplists:get_value(<<"ok">>, ResultProps), + {ok, proplists:get_value(<<"instance_start_time">>, ResultProps)}; +ensure_full_commit(Db) -> + couch_db:ensure_full_commit(Db). + + get_doc_info_list(#http_db{uri=DbUrl, headers=Headers}, StartSeq) -> Url = DbUrl ++ "_all_docs_by_seq?limit=100&startkey=" ++ integer_to_list(StartSeq), diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl index 45e92dce..68c6c5cb 100644 --- a/src/couchdb/couch_view_group.erl +++ b/src/couchdb/couch_view_group.erl @@ -149,7 +149,7 @@ handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) -> {noreply, State#group_state{waiting_commit=false}}; true -> % We can't commit the header because the database seq that's fully - % committed to disk is still behind us. It we committed now and the + % committed to disk is still behind us. If we committed now and the % database lost those changes our view could be forever out of sync % with the database. But a crash before we commit these changes, no big % deal, we only lose incremental changes since last committal. |