diff options
-rw-r--r-- | share/www/script/test/replicator_db.js | 67 | ||||
-rw-r--r-- | src/couchdb/couch_rep.erl | 45 | ||||
-rw-r--r-- | src/couchdb/couch_replication_manager.erl | 41 |
3 files changed, 142 insertions, 11 deletions
diff --git a/share/www/script/test/replicator_db.js b/share/www/script/test/replicator_db.js index 4434124e..058b6a7a 100644 --- a/share/www/script/test/replicator_db.js +++ b/share/www/script/test/replicator_db.js @@ -1279,6 +1279,68 @@ couchTests.replicator_db = function(debug) { } + function test_invalid_filter() { + // COUCHDB-1199 - replication document with a filter field that was invalid + // crashed the CouchDB server. + var repDoc1 = { + _id: "rep1", + source: "couch_foo_test_db", + target: "couch_bar_test_db", + filter: "test/foofilter" + }; + + TEquals(true, repDb.save(repDoc1).ok); + + waitForRep(repDb, repDoc1, "error"); + repDoc1 = repDb.open(repDoc1._id); + TEquals("undefined", typeof repDoc1._replication_id); + TEquals("error", repDoc1._replication_state); + + populate_db(dbA, docs1); + populate_db(dbB, []); + + var repDoc2 = { + _id: "rep2", + source: dbA.name, + target: dbB.name, + filter: "test/foofilter" + }; + + TEquals(true, repDb.save(repDoc2).ok); + + waitForRep(repDb, repDoc2, "error"); + repDoc2 = repDb.open(repDoc2._id); + TEquals("undefined", typeof repDoc2._replication_id); + TEquals("error", repDoc2._replication_state); + + var ddoc = { + _id: "_design/mydesign", + language : "javascript", + filters : { + myfilter : (function(doc, req) { + return true; + }).toString() + } + }; + + TEquals(true, dbA.save(ddoc).ok); + + var repDoc3 = { + _id: "rep3", + source: dbA.name, + target: dbB.name, + filter: "mydesign/myfilter" + }; + + TEquals(true, repDb.save(repDoc3).ok); + + waitForRep(repDb, repDoc3, "completed"); + repDoc3 = repDb.open(repDoc3._id); + TEquals("string", typeof repDoc3._replication_id); + TEquals("completed", repDoc3._replication_state); + } + + // run all the tests var server_config = [ { @@ -1355,6 +1417,11 @@ couchTests.replicator_db = function(debug) { restartServer(); run_on_modified_server(server_config, rep_doc_field_validation); + + repDb.deleteDb(); + restartServer(); + run_on_modified_server(server_config, test_invalid_filter); + /* * Disabled, since error state would be set on the document only after * the exponential backoff retry done by the replicator database listener diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index 6e4295ea..9d86e7a5 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -486,7 +486,17 @@ make_replication_id(RepProps, UserCtx) -> % add a new clause and increase ?REP_ID_VERSION at the top make_replication_id({Props}, UserCtx, 2) -> {ok, HostName} = inet:gethostname(), - Port = mochiweb_socket_server:get(couch_httpd, port), + Port = case (catch mochiweb_socket_server:get(couch_httpd, port)) of + P when is_number(P) -> + P; + _ -> + % On restart we might be called before the couch_httpd process is + % started. + % TODO: we might be under an SSL socket server only, or both under + % SSL and a non-SSL socket. + % ... mochiweb_socket_server:get(https, port) + list_to_integer(couch_config:get("httpd", "port", "5984")) + end, Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)), Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)), maybe_append_filters({Props}, [HostName, Port, Src, Tgt], UserCtx); @@ -513,16 +523,37 @@ maybe_append_filters({Props}, Base, UserCtx) -> couch_util:to_hex(couch_util:md5(term_to_binary(Base2))). filter_code(Filter, Props, UserCtx) -> - {match, [DDocName, FilterName]} = - re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]), + {DDocName, FilterName} = + case re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]) of + {match, [DDocName0, FilterName0]} -> + {DDocName0, FilterName0}; + _ -> + throw({error, <<"Invalid filter. Must match `ddocname/filtername`.">>}) + end, ProxyParams = parse_proxy_params( couch_util:get_value(<<"proxy">>, Props, [])), - Source = open_db( - couch_util:get_value(<<"source">>, Props), UserCtx, ProxyParams), + DbName = couch_util:get_value(<<"source">>, Props), + Source = try + open_db(DbName, UserCtx, ProxyParams) + catch + _Tag:DbError -> + DbErrorMsg = io_lib:format("Could not open source database `~s`: ~s", + [couch_util:url_strip_password(DbName), couch_util:to_binary(DbError)]), + throw({error, iolist_to_binary(DbErrorMsg)}) + end, try - {ok, DDoc} = open_doc(Source, <<"_design/", DDocName/binary>>), + Body = case (catch open_doc(Source, <<"_design/", DDocName/binary>>)) of + {ok, #doc{body = Body0}} -> + Body0; + DocError -> + DocErrorMsg = io_lib:format( + "Couldn't open document `_design/~s` from source " + "database `~s`: ~s", + [dbname(Source), DDocName, couch_util:to_binary(DocError)]), + throw({error, iolist_to_binary(DocErrorMsg)}) + end, Code = couch_util:get_nested_json_value( - DDoc#doc.body, [<<"filters">>, FilterName]), + Body, [<<"filters">>, FilterName]), re:replace(Code, "^\s*(.*?)\s*$", "\\1", [{return, binary}]) after close_db(Source) diff --git a/src/couchdb/couch_replication_manager.erl b/src/couchdb/couch_replication_manager.erl index 6537c8b2..e3d97c37 100644 --- a/src/couchdb/couch_replication_manager.erl +++ b/src/couchdb/couch_replication_manager.erl @@ -32,7 +32,8 @@ -import(couch_util, [ get_value/2, - get_value/3 + get_value/3, + to_binary/1 ]). @@ -62,8 +63,16 @@ init(_) -> }}. -handle_call({rep_db_update, Change}, _From, State) -> - {reply, ok, process_update(State, Change)}; +handle_call({rep_db_update, {ChangeProps} = Change}, _From, State) -> + NewState = try + process_update(State, Change) + catch + _Tag:Error -> + JsonRepDoc = get_value(doc, ChangeProps), + rep_db_update_error(Error, JsonRepDoc), + State + end, + {reply, ok, NewState}; handle_call({triggered, {BaseId, _}}, _From, State) -> [{BaseId, {DocId, true}}] = ets:lookup(?REP_ID_TO_DOC_ID, BaseId), @@ -250,6 +259,19 @@ process_update(State, {Change}) -> end. +rep_db_update_error(Error, {Props} = JsonRepDoc) -> + case Error of + {bad_rep_doc, Reason} -> + ok; + _ -> + Reason = to_binary(Error) + end, + ?LOG_ERROR("Replication manager, error processing document `~s`: ~s", + [get_value(<<"_id">>, Props), Reason]), + couch_rep:update_rep_doc( + JsonRepDoc, [{<<"_replication_state">>, <<"error">>}]). + + rep_user_ctx({RepDoc}) -> case get_value(<<"user_ctx">>, RepDoc) of undefined -> @@ -265,7 +287,7 @@ rep_user_ctx({RepDoc}) -> maybe_start_replication(#state{max_retries = MaxRetries} = State, DocId, JsonRepDoc) -> UserCtx = rep_user_ctx(JsonRepDoc), - {BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx), + {BaseId, _} = RepId = make_rep_id(JsonRepDoc, UserCtx), case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of [] -> true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, {DocId, true}}), @@ -290,6 +312,17 @@ maybe_start_replication(#state{max_retries = MaxRetries} = State, end. +make_rep_id(RepDoc, UserCtx) -> + try + couch_rep:make_replication_id(RepDoc, UserCtx) + catch + throw:{error, Reason} -> + throw({bad_rep_doc, Reason}); + Tag:Err -> + throw({bad_rep_doc, to_binary({Tag, Err})}) + end. + + maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) -> case get_value(<<"_replication_id">>, Props) of RepId -> |