From 2bdd75901fba402068d08e316a3ac32249307e27 Mon Sep 17 00:00:00 2001 From: John Christopher Anderson Date: Fri, 26 Feb 2010 01:11:02 +0000 Subject: fdmananas patch for filtered replication via COUCHDB-673 git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@916518 13f79535-47bb-0310-9956-ffa450edef68 --- share/www/script/test/replication.js | 77 +++++++++ src/couchdb/Makefile.am | 2 + src/couchdb/couch_db.hrl | 12 ++ src/couchdb/couch_httpd_db.erl | 265 +++++++++---------------------- src/couchdb/couch_query_servers.erl | 7 +- src/couchdb/couch_rep.erl | 6 +- src/couchdb/couch_rep_changes_feed.erl | 136 ++++++++-------- src/couchdb/couch_rep_missing_revs.erl | 20 ++- test/etap/111-replication-changes-feed.t | 10 +- 9 files changed, 270 insertions(+), 265 deletions(-) diff --git a/share/www/script/test/replication.js b/share/www/script/test/replication.js index a5ed5110..c6f6ff61 100644 --- a/share/www/script/test/replication.js +++ b/share/www/script/test/replication.js @@ -377,4 +377,81 @@ couchTests.replication = function(debug) { T(docFoo666 === null); } + // test filtered replication + + var sourceDb = new CouchDB( + "test_suite_filtered_rep_db_a", {"X-Couch-Full-Commit":"false"} + ); + + sourceDb.deleteDb(); + sourceDb.createDb(); + + T(sourceDb.save({_id:"foo1",value:1}).ok); + T(sourceDb.save({_id:"foo2",value:2}).ok); + T(sourceDb.save({_id:"foo3",value:3}).ok); + T(sourceDb.save({_id:"foo4",value:4}).ok); + T(sourceDb.save({ + "_id": "_design/mydesign", + "language" : "javascript", + "filters" : { + "myfilter" : (function(doc, req) { + if (doc.value < Number(req.query.maxvalue)) { + return true; + } else { + return false; + } + }).toString() + } + }).ok); + + var dbPairs = [ + {source:"test_suite_filtered_rep_db_a", + target:"test_suite_filtered_rep_db_b"}, + {source:"test_suite_filtered_rep_db_a", + target:"http://" + host + "/test_suite_filtered_rep_db_b"}, + {source:"http://" + host + "/test_suite_filtered_rep_db_a", + target:"test_suite_filtered_rep_db_b"}, + {source:"http://" + host + "/test_suite_filtered_rep_db_a", + target:"http://" + host + "/test_suite_filtered_rep_db_b"} + ]; + + for (var i = 0; i < dbPairs.length; i++) { + var targetDb = new CouchDB("test_suite_filtered_rep_db_b"); + targetDb.deleteDb(); + targetDb.createDb(); + + var dbA = dbPairs[i].source; + var dbB = dbPairs[i].target; + + var repResult = CouchDB.replicate(dbA, dbB, { + body: { + "filter" : "mydesign/myfilter", + "query_params" : { + "maxvalue": "3" + } + } + }); + + T(repResult.ok); + T($.isArray(repResult.history)); + T(repResult.history.length === 1); + T(repResult.history[0].docs_written === 2); + T(repResult.history[0].docs_read === 2); + T(repResult.history[0].doc_write_failures === 0); + + var docFoo1 = targetDb.open("foo1"); + T(docFoo1 !== null); + T(docFoo1.value === 1); + + var docFoo2 = targetDb.open("foo2"); + T(docFoo2 !== null); + T(docFoo2.value === 2); + + var docFoo3 = targetDb.open("foo3"); + T(docFoo3 === null); + + var docFoo4 = targetDb.open("foo4"); + T(docFoo4 === null); + } + }; diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am index 9c7e9d4d..b5c84f33 100644 --- a/src/couchdb/Makefile.am +++ b/src/couchdb/Makefile.am @@ -30,6 +30,7 @@ source_files = \ couch.erl \ couch_app.erl \ couch_btree.erl \ + couch_changes.erl \ couch_config.erl \ couch_config_writer.erl \ couch_db.erl \ @@ -86,6 +87,7 @@ compiled_files = \ couch.beam \ couch_app.beam \ couch_btree.beam \ + couch_changes.beam \ couch_config.beam \ couch_config_writer.beam \ couch_db.beam \ diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl index f74bade7..bb6b4f73 100644 --- a/src/couchdb/couch_db.hrl +++ b/src/couchdb/couch_db.hrl @@ -264,3 +264,15 @@ % small value used in revision trees to indicate the revision isn't stored -define(REV_MISSING, []). + +-record(changes_args, { + feed = "normal", + dir = fwd, + since = 0, + limit = 1000000000000000, + style = main_only, + heartbeat, + timeout, + filter = "", + include_docs = false +}). diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl index 9ad34752..1028e857 100644 --- a/src/couchdb/couch_httpd_db.erl +++ b/src/couchdb/couch_httpd_db.erl @@ -55,200 +55,62 @@ handle_request(#httpd{path_parts=[DbName|RestParts],method=Method, do_db_req(Req, Handler) end. -get_changes_timeout(Req, Resp) -> - DefaultTimeout = list_to_integer( - couch_config:get("httpd", "changes_timeout", "60000")), - case couch_httpd:qs_value(Req, "heartbeat") of - undefined -> - case couch_httpd:qs_value(Req, "timeout") of - undefined -> - {DefaultTimeout, fun() -> stop end}; - TimeoutList -> - {lists:min([DefaultTimeout, list_to_integer(TimeoutList)]), - fun() -> stop end} - end; - "true" -> - {DefaultTimeout, fun() -> send_chunk(Resp, "\n"), ok end}; - TimeoutList -> - {lists:min([DefaultTimeout, list_to_integer(TimeoutList)]), - fun() -> send_chunk(Resp, "\n"), ok end} - end. - - -start_sending_changes(_Resp, "continuous") -> - ok; -start_sending_changes(Resp, _Else) -> - send_chunk(Resp, "{\"results\":[\n"). - -handle_changes_req(#httpd{method='GET',path_parts=[DbName|_]}=Req, Db) -> - FilterFun = make_filter_fun(Req, Db), - {ok, Info} = couch_db:get_db_info(Db), - Seq = proplists:get_value(update_seq, Info), - {Dir, StartSeq} = case couch_httpd:qs_value(Req, "descending", "false") of - "false" -> - {fwd, list_to_integer(couch_httpd:qs_value(Req, "since", "0"))}; - "true" -> - {rev, Seq}; - _Bad -> throw({bad_request, "descending must be true or false"}) +handle_changes_req(#httpd{method='GET'}=Req, Db) -> + MakeCallback = fun(Resp) -> + fun({change, Change, _}, "continuous") -> + send_chunk(Resp, [?JSON_ENCODE(Change) | "\n"]); + ({change, Change, Prepend}, _) -> + send_chunk(Resp, [Prepend, ?JSON_ENCODE(Change)]); + (start, "continuous") -> + ok; + (start, _) -> + send_chunk(Resp, "{\"results\":[\n"); + ({stop, EndSeq}, "continuous") -> + send_chunk( + Resp, + [?JSON_ENCODE({[{<<"last_seq">>, EndSeq}]}) | "\n"] + ), + end_json_response(Resp); + ({stop, EndSeq}, _) -> + send_chunk( + Resp, + io_lib:format("\n],\n\"last_seq\":~w}\n", [EndSeq]) + ), + end_json_response(Resp); + (timeout, _) -> + send_chunk(Resp, "\n") + end end, - Limit = list_to_integer(couch_httpd:qs_value(Req, "limit", "1000000000000000")), - ResponseType = couch_httpd:qs_value(Req, "feed", "normal"), - if ResponseType == "continuous" orelse ResponseType == "longpoll" -> - {ok, Resp} = start_json_response(Req, 200), - start_sending_changes(Resp, ResponseType), - - Self = self(), - {ok, Notify} = couch_db_update_notifier:start_link( - fun({_, DbName0}) when DbName0 == DbName -> - Self ! db_updated; - (_) -> - ok - end), - {Timeout, TimeoutFun} = get_changes_timeout(Req, Resp), - couch_stats_collector:track_process_count(Self, - {httpd, clients_requesting_changes}), - try - keep_sending_changes(Req, Resp, Db, StartSeq, <<"">>, Timeout, - TimeoutFun, ResponseType, Limit, FilterFun) - after - couch_db_update_notifier:stop(Notify), - get_rest_db_updated() % clean out any remaining update messages - end; - true -> + ChangesArgs = parse_changes_query(Req), + ChangesFun = couch_changes:handle_changes(ChangesArgs, Req, Db), + WrapperFun = case ChangesArgs#changes_args.feed of + "normal" -> + {ok, Info} = couch_db:get_db_info(Db), CurrentEtag = couch_httpd:make_etag(Info), - couch_httpd:etag_respond(Req, CurrentEtag, fun() -> - % send the etag - {ok, Resp} = start_json_response(Req, 200, [{"Etag", CurrentEtag}]), - start_sending_changes(Resp, ResponseType), - {ok, {_, LastSeq, _Prepend, _, _, _, _, _}} = - send_changes(Req, Resp, Db, Dir, StartSeq, <<"">>, "normal", - Limit, FilterFun), - end_sending_changes(Resp, LastSeq, ResponseType) - end) - end; + fun(FeedChangesFun) -> + couch_httpd:etag_respond( + Req, + CurrentEtag, + fun() -> + {ok, Resp} = couch_httpd:start_json_response( + Req, 200, [{"Etag", CurrentEtag}] + ), + FeedChangesFun(MakeCallback(Resp)) + end + ) + end; + _ -> + % "longpoll" or "continuous" + {ok, Resp} = couch_httpd:start_json_response(Req, 200), + fun(FeedChangesFun) -> + FeedChangesFun(MakeCallback(Resp)) + end + end, + WrapperFun(ChangesFun); handle_changes_req(#httpd{path_parts=[_,<<"_changes">>]}=Req, _Db) -> send_method_not_allowed(Req, "GET,HEAD"). -% waits for a db_updated msg, if there are multiple msgs, collects them. -wait_db_updated(Timeout, TimeoutFun) -> - receive db_updated -> get_rest_db_updated() - after Timeout -> - case TimeoutFun() of - ok -> wait_db_updated(Timeout, TimeoutFun); - stop -> stop - end - end. - -get_rest_db_updated() -> - receive db_updated -> get_rest_db_updated() - after 0 -> updated - end. - -end_sending_changes(Resp, EndSeq, "continuous") -> - send_chunk(Resp, [?JSON_ENCODE({[{<<"last_seq">>, EndSeq}]}) | "\n"]), - end_json_response(Resp); -end_sending_changes(Resp, EndSeq, _Else) -> - send_chunk(Resp, io_lib:format("\n],\n\"last_seq\":~w}\n", [EndSeq])), - end_json_response(Resp). - -keep_sending_changes(#httpd{user_ctx=UserCtx,path_parts=[DbName|_]}=Req, Resp, - Db, StartSeq, Prepend, Timeout, TimeoutFun, ResponseType, Limit, Filter) -> - {ok, {_, EndSeq, Prepend2, _, _, _, NewLimit, _}} = send_changes(Req, Resp, Db, fwd, StartSeq, - Prepend, ResponseType, Limit, Filter), - couch_db:close(Db), - if - Limit > NewLimit, ResponseType == "longpoll" -> - end_sending_changes(Resp, EndSeq, ResponseType); - true -> - case wait_db_updated(Timeout, TimeoutFun) of - updated -> - case couch_db:open(DbName, [{user_ctx, UserCtx}]) of - {ok, Db2} -> - keep_sending_changes(Req, Resp, Db2, EndSeq, Prepend2, Timeout, - TimeoutFun, ResponseType, NewLimit, Filter); - _Else -> - end_sending_changes(Resp, EndSeq, ResponseType) - end; - stop -> - end_sending_changes(Resp, EndSeq, ResponseType) - end - end. - -changes_enumerator(DocInfos, {Db, _, _, FilterFun, Resp, "continuous", Limit, IncludeDocs}) -> - [#doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}|_] = DocInfos, - Results0 = FilterFun(DocInfos), - Results = [Result || Result <- Results0, Result /= null], - Go = if Limit =< 1 -> stop; true -> ok end, - case Results of - [] -> - {Go, {Db, Seq, nil, FilterFun, Resp, "continuous", Limit, IncludeDocs}}; - _ -> - send_chunk(Resp, [?JSON_ENCODE(changes_row(Db, Seq, Id, Del, Results, Rev, IncludeDocs)) - |"\n"]), - {Go, {Db, Seq, nil, FilterFun, Resp, "continuous", Limit-1, IncludeDocs}} - end; -changes_enumerator(DocInfos, {Db, _, Prepend, FilterFun, Resp, _, Limit, IncludeDocs}) -> - [#doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}|_] = DocInfos, - Results0 = FilterFun(DocInfos), - Results = [Result || Result <- Results0, Result /= null], - Go = if Limit =< 1 -> stop; true -> ok end, - case Results of - [] -> - {Go, {Db, Seq, Prepend, FilterFun, Resp, nil, Limit, IncludeDocs}}; - _ -> - send_chunk(Resp, [Prepend, ?JSON_ENCODE( - changes_row(Db, Seq, Id, Del, Results, Rev, IncludeDocs))]), - {Go, {Db, Seq, <<",\n">>, FilterFun, Resp, nil, Limit-1, IncludeDocs}} - end. - -changes_row(Db, Seq, Id, Del, Results, Rev, true) -> - {[{seq,Seq},{id,Id},{changes,Results}] ++ deleted_item(Del) ++ - couch_httpd_view:doc_member(Db, {Id, Rev})}; -changes_row(_, Seq, Id, Del, Results, _, false) -> - {[{seq,Seq},{id,Id},{changes,Results}] ++ deleted_item(Del)}. - -deleted_item(true) -> [{deleted,true}]; -deleted_item(_) -> []. - -send_changes(Req, Resp, Db, Dir, StartSeq, Prepend, ResponseType, Limit, FilterFun) -> - Style = list_to_existing_atom( - couch_httpd:qs_value(Req, "style", "main_only")), - IncludeDocs = list_to_existing_atom( - couch_httpd:qs_value(Req, "include_docs", "false")), - couch_db:changes_since(Db, Style, StartSeq, fun changes_enumerator/2, - [{dir, Dir}], {Db, StartSeq, Prepend, FilterFun, Resp, ResponseType, Limit, IncludeDocs}). - -make_filter_fun(Req, Db) -> - Filter = couch_httpd:qs_value(Req, "filter", ""), - case [list_to_binary(couch_httpd:unquote(Part)) - || Part <- string:tokens(Filter, "/")] of - [] -> - fun(DocInfos) -> - % doing this as a batch is more efficient for external filters - [{[{rev, couch_doc:rev_to_str(Rev)}]} || - #doc_info{revs=[#rev_info{rev=Rev}|_]} <- DocInfos] - end; - [DName, FName] -> - DesignId = <<"_design/", DName/binary>>, - DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []), - % validate that the ddoc has the filter fun - #doc{body={Props}} = DDoc, - couch_util:get_nested_json_value({Props}, [<<"filters">>, FName]), - fun(DocInfos) -> - Docs = [Doc || {ok, Doc} <- [ - {ok, _Doc} = couch_db:open_doc(Db, DInfo, [deleted, conflicts]) - || DInfo <- DocInfos]], - {ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs), - [{[{rev, couch_doc:rev_to_str(Rev)}]} - || #doc_info{revs=[#rev_info{rev=Rev}|_]} <- DocInfos, - Pass <- Passes, Pass == true] - end; - _Else -> - throw({bad_request, - "filter parameter must be of the form `designname/filtername`"}) - end. - handle_compact_req(#httpd{method='POST',path_parts=[DbName,_,Id|_]}=Req, _Db) -> ok = couch_view_compactor:start_compact(DbName, Id), send_json(Req, 202, {[{ok, true}]}); @@ -1188,6 +1050,33 @@ parse_doc_query(Req) -> end end, #doc_query_args{}, couch_httpd:qs(Req)). +parse_changes_query(Req) -> + lists:foldl(fun({Key, Value}, Args) -> + case {Key, Value} of + {"feed", _} -> + Args#changes_args{feed=Value}; + {"descending", "true"} -> + Args#changes_args{dir=rev}; + {"since", _} -> + Args#changes_args{since=list_to_integer(Value)}; + {"limit", _} -> + Args#changes_args{limit=list_to_integer(Value)}; + {"style", _} -> + Args#changes_args{style=list_to_existing_atom(Value)}; + {"heartbeat", "true"} -> + Args#changes_args{heartbeat=true}; + {"heartbeat", _} -> + Args#changes_args{heartbeat=list_to_integer(Value)}; + {"timeout", _} -> + Args#changes_args{timeout=list_to_integer(Value)}; + {"include_docs", "true"} -> + Args#changes_args{include_docs=true}; + {"filter", _} -> + Args#changes_args{filter=Value}; + _Else -> % unknown key value pair, ignore. + Args + end + end, #changes_args{}, couch_httpd:qs(Req)). extract_header_rev(Req, ExplicitRev) when is_binary(ExplicitRev) or is_list(ExplicitRev)-> extract_header_rev(Req, couch_doc:parse_rev(ExplicitRev)); diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl index a90681c5..3095b199 100644 --- a/src/couchdb/couch_query_servers.erl +++ b/src/couchdb/couch_query_servers.erl @@ -183,7 +183,12 @@ json_doc(Doc) -> couch_doc:to_json_obj(Doc, [revs]). filter_docs(Req, Db, DDoc, FName, Docs) -> - JsonReq = couch_httpd_external:json_req_obj(Req, Db), + JsonReq = case Req of + {json_req, JsonObj} -> + JsonObj; + #httpd{} = HttpReq -> + couch_httpd_external:json_req_obj(HttpReq, Db) + end, JsonDocs = [couch_doc:to_json_obj(Doc, [revs]) || Doc <- Docs], JsonCtx = couch_util:json_user_ctx(Db), [true, Passes] = ddoc_prompt(DDoc, [<<"filters">>, FName], [JsonDocs, JsonReq, JsonCtx]), diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index 4bad4a9f..c0010a94 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -451,7 +451,11 @@ make_replication_id({Props}, UserCtx) -> % Port = mochiweb_socket_server:get(couch_httpd, port), Src = get_rep_endpoint(UserCtx, proplists:get_value(<<"source">>, Props)), Tgt = get_rep_endpoint(UserCtx, proplists:get_value(<<"target">>, Props)), - Base = couch_util:to_hex(erlang:md5(term_to_binary([HostName, Src, Tgt]))), + Filter = proplists:get_value(<<"filter">>, Props), + QueryParams = proplists:get_value(<<"query_params">>, Props), + Base = couch_util:to_hex(erlang:md5( + term_to_binary([HostName, Src, Tgt, Filter, QueryParams]) + )), Extension = maybe_append_options( [<<"continuous">>, <<"create_target">>], Props), {Base, Extension}. diff --git a/src/couchdb/couch_rep_changes_feed.erl b/src/couchdb/couch_rep_changes_feed.erl index cdfed6a0..67cfabe4 100644 --- a/src/couchdb/couch_rep_changes_feed.erl +++ b/src/couchdb/couch_rep_changes_feed.erl @@ -53,11 +53,35 @@ init([_Parent, #http_db{}=Source, Since, PostProps] = Args) -> true -> continuous end, + BaseQS = [ + {"style", all_docs}, + {"heartbeat", 10000}, + {"since", Since}, + {"feed", Feed} + ], + QS = case proplists:get_value(<<"filter">>, PostProps) of + undefined -> + BaseQS; + FilterName -> + {Params} = proplists:get_value(<<"query_params">>, PostProps, {[]}), + lists:foldr( + fun({K, V}, QSAcc) -> + Ks = couch_util:to_list(K), + case proplists:is_defined(Ks, QSAcc) of + true -> + QSAcc; + false -> + [{Ks, V} | QSAcc] + end + end, + [{"filter", FilterName} | BaseQS], + Params + ) + end, Pid = couch_rep_httpc:spawn_link_worker_process(Source), Req = Source#http_db{ resource = "_changes", - qs = [{style, all_docs}, {heartbeat, 10000}, {since, Since}, - {feed, Feed}], + qs = QS, conn = Pid, options = [{stream_to, {self(), once}}, {response_format, binary}, {inactivity_timeout, 31000}], % miss 3 heartbeats, assume death @@ -94,20 +118,54 @@ init([_Parent, #http_db{}=Source, Since, PostProps] = Args) -> init([_Parent, Source, Since, PostProps] = InitArgs) -> process_flag(trap_exit, true), Server = self(), - ChangesPid = - case proplists:get_value(<<"continuous">>, PostProps, false) of - false -> - spawn_link(fun() -> send_local_changes_once(Server, Source, Since) end); - true -> - spawn_link(fun() -> - Self = self(), - {ok, _} = couch_db_update_notifier:start_link(fun(Msg) -> - local_update_notification(Self, Source#db.name, Msg) end), - send_local_changes_forever(Server, Source, Since) + ChangesArgs = #changes_args{ + style = all_docs, + since = Since, + filter = ?b2l(proplists:get_value(<<"filter">>, PostProps, <<>>)), + feed = case proplists:get_value(<<"continuous">>, PostProps, false) of + true -> + "continuous"; + false -> + "normal" + end + }, + ChangesPid = spawn_link(fun() -> + ChangesFeedFun = couch_changes:handle_changes( + ChangesArgs, + {json_req, filter_json_req(Source, PostProps)}, + Source + ), + ChangesFeedFun(fun({change, Change, _}, _) -> + gen_server:call(Server, {add_change, Change}, infinity); + (_, _) -> + ok end) - end, + end), {ok, #state{changes_loop=ChangesPid, init_args=InitArgs}}. +filter_json_req(Db, PostProps) -> + case proplists:get_value(<<"filter">>, PostProps) of + undefined -> + {[]}; + FilterName -> + {Query} = proplists:get_value(<<"query_params">>, PostProps, {[]}), + {ok, Info} = couch_db:get_db_info(Db), + % simulate a request to db_name/_changes + {[ + {<<"info">>, {Info}}, + {<<"id">>, null}, + {<<"method">>, 'GET'}, + {<<"path">>, [couch_db:name(Db), <<"_changes">>]}, + {<<"query">>, {[{<<"filter">>, FilterName} | Query]}}, + {<<"headers">>, []}, + {<<"body">>, []}, + {<<"peer">>, <<"replicator">>}, + {<<"form">>, []}, + {<<"cookie">>, []}, + {<<"userCtx">>, couch_util:json_user_ctx(Db)} + ]} + end. + handle_call({add_change, Row}, From, State) -> handle_add_change(Row, From, State); @@ -302,23 +360,7 @@ by_seq_loop(Server, Source, StartSeq) -> decode_row(<<",", Rest/binary>>) -> decode_row(Rest); decode_row(Row) -> - {Props} = ?JSON_DECODE(Row), - % [Seq, Id, {<<"changes">>,C}] - Seq = proplists:get_value(<<"seq">>, Props), - Id = proplists:get_value(<<"id">>, Props), - C = proplists:get_value(<<"changes">>, Props), - C2 = [{[{<<"rev">>,couch_doc:parse_rev(R)}]} || {[{<<"rev">>,R}]} <- C], - {[{<<"seq">>, Seq}, {<<"id">>,Id}, {<<"changes">>,C2}]}. - -flush_updated_messages() -> - receive updated -> flush_updated_messages() - after 0 -> ok - end. - -local_update_notification(Self, DbName, {updated, DbName}) -> - Self ! updated; -local_update_notification(_, _, _) -> - ok. + ?JSON_DECODE(Row). maybe_stream_next(#state{reqid=nil}) -> ok; @@ -327,35 +369,6 @@ maybe_stream_next(#state{complete=false, count=N} = S) when N < ?BUFFER_SIZE -> maybe_stream_next(_) -> ok. -send_local_changes_forever(Server, Db, Since) -> - #db{name = DbName, user_ctx = UserCtx} = Db, - {ok, NewSeq} = send_local_changes_once(Server, Db, Since), - couch_db:close(Db), - ok = wait_db_updated(), - {ok, NewDb} = couch_db:open(DbName, [{user_ctx, UserCtx}]), - send_local_changes_forever(Server, NewDb, NewSeq). - -send_local_changes_once(Server, Db, Since) -> - FilterFun = - fun(#doc_info{revs=[#rev_info{rev=Rev}|_]}) -> - {[{<<"rev">>, Rev}]} - end, - - ChangesFun = - fun([#doc_info{id=Id, high_seq=Seq}|_]=DocInfos, _) -> - Results0 = [FilterFun(DocInfo) || DocInfo <- DocInfos], - Results = [Result || Result <- Results0, Result /= null], - if Results /= [] -> - Change = {[{<<"seq">>,Seq}, {<<"id">>,Id}, {<<"changes">>,Results}]}, - gen_server:call(Server, {add_change, Change}, infinity); - true -> - ok - end, - {ok, Seq} - end, - - couch_db:changes_since(Db, all_docs, Since, ChangesFun, Since). - start_http_request(RawUrl) -> Url = ibrowse_lib:parse_url(RawUrl), {ok, Pid} = ibrowse:spawn_link_worker_process(Url#url.host, Url#url.port), @@ -367,8 +380,3 @@ start_http_request(RawUrl) -> {ibrowse_req_id, Id} = ibrowse:send_req_direct(Pid, RawUrl, [], get, [], Opts, infinity), {Pid, Id}. - -wait_db_updated() -> - receive updated -> - flush_updated_messages() - end. diff --git a/src/couchdb/couch_rep_missing_revs.erl b/src/couchdb/couch_rep_missing_revs.erl index 5790dd71..7b4956e2 100644 --- a/src/couchdb/couch_rep_missing_revs.erl +++ b/src/couchdb/couch_rep_missing_revs.erl @@ -144,11 +144,15 @@ changes_loop(OurServer, SourceChangesServer, Target) -> changes_loop(OurServer, SourceChangesServer, Target). get_missing_revs(#http_db{}=Target, Changes) -> - Transform = fun({[{<<"seq">>,_}, {<<"id">>,Id}, {<<"changes">>,C}]}) -> - {Id, [couch_doc:rev_to_str(R) || {[{<<"rev">>, R}]} <- C]} end, + Transform = fun({Props}) -> + C = proplists:get_value(<<"changes">>, Props), + Id = proplists:get_value(<<"id">>, Props), + {Id, [R || {[{<<"rev">>, R}]} <- C]} + end, IdRevsList = [Transform(Change) || Change <- Changes], SeqDict = changes_dictionary(Changes), - {[{<<"seq">>, HighSeq}, _, _]} = lists:last(Changes), + {LastProps} = lists:last(Changes), + HighSeq = proplists:get_value(<<"seq">>, LastProps), Request = Target#http_db{ resource = "_missing_revs", method = post, @@ -165,11 +169,15 @@ get_missing_revs(#http_db{}=Target, Changes) -> end; get_missing_revs(Target, Changes) -> - Transform = fun({[{<<"seq">>,_}, {<<"id">>,Id}, {<<"changes">>,C}]}) -> - {Id, [R || {[{<<"rev">>, R}]} <- C]} end, + Transform = fun({Props}) -> + C = proplists:get_value(<<"changes">>, Props), + Id = proplists:get_value(<<"id">>, Props), + {Id, [couch_doc:parse_rev(R) || {[{<<"rev">>, R}]} <- C]} + end, IdRevsList = [Transform(Change) || Change <- Changes], SeqDict = changes_dictionary(Changes), - {[{<<"seq">>, HighSeq}, _, _]} = lists:last(Changes), + {LastProps} = lists:last(Changes), + HighSeq = proplists:get_value(<<"seq">>, LastProps), {ok, Results} = couch_db:get_missing_revs(Target, IdRevsList), {HighSeq, [{Id, dict:fetch(Id, SeqDict), Revs} || {Id, Revs, _} <- Results]}. diff --git a/test/etap/111-replication-changes-feed.t b/test/etap/111-replication-changes-feed.t index ccd64047..e92889e9 100755 --- a/test/etap/111-replication-changes-feed.t +++ b/test/etap/111-replication-changes-feed.t @@ -157,7 +157,7 @@ test_deleted_conflicts(Type) -> [Win, {[{<<"rev">>, Lose}]}] = proplists:get_value(<<"changes">>, ExpectProps), Doc = couch_doc:from_json_obj({[ {<<"_id">>, Id}, - {<<"_rev">>, couch_doc:rev_to_str(Lose)}, + {<<"_rev">>, Lose}, {<<"_deleted">>, true} ]}), Db = get_db(), @@ -167,7 +167,7 @@ test_deleted_conflicts(Type) -> Expect = {[ {<<"seq">>, get_update_seq()}, {<<"id">>, Id}, - {<<"changes">>, [Win, {[{<<"rev">>, Rev}]}]} + {<<"changes">>, [Win, {[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}]} ]}, {ok, Pid} = start_changes_feed(Type, Since, false), @@ -210,7 +210,7 @@ generate_change(Id, EJson) -> {[ {<<"seq">>, get_update_seq()}, {<<"id">>, Id}, - {<<"changes">>, [{[{<<"rev">>, Rev}]}]} + {<<"changes">>, [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}]} ]}. generate_conflict() -> @@ -220,9 +220,9 @@ generate_conflict() -> Doc2 = (couch_doc:from_json_obj({[<<"foo">>, <<"baz">>]}))#doc{id = Id}, {ok, Rev1} = couch_db:update_doc(Db, Doc1, [full_commit]), {ok, Rev2} = couch_db:update_doc(Db, Doc2, [full_commit, all_or_nothing]), - + %% relies on undocumented CouchDB conflict winner algo and revision sorting! - RevList = [{[{<<"rev">>, R}]} || R + RevList = [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} || R <- lists:sort(fun(A,B) -> B>, get_update_seq()}, -- cgit v1.2.3