summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Christopher Anderson <jchris@apache.org>2010-02-26 01:11:02 +0000
committerJohn Christopher Anderson <jchris@apache.org>2010-02-26 01:11:02 +0000
commit2bdd75901fba402068d08e316a3ac32249307e27 (patch)
tree6635fd73f4d3c299f19be391664d7c2b056263c2
parent5fb1e46f115e91f2788d2ac8649106667952ddfe (diff)
fdmananas patch for filtered replication via COUCHDB-673
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@916518 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--share/www/script/test/replication.js77
-rw-r--r--src/couchdb/Makefile.am2
-rw-r--r--src/couchdb/couch_db.hrl12
-rw-r--r--src/couchdb/couch_httpd_db.erl265
-rw-r--r--src/couchdb/couch_query_servers.erl7
-rw-r--r--src/couchdb/couch_rep.erl6
-rw-r--r--src/couchdb/couch_rep_changes_feed.erl136
-rw-r--r--src/couchdb/couch_rep_missing_revs.erl20
-rwxr-xr-xtest/etap/111-replication-changes-feed.t10
9 files changed, 270 insertions, 265 deletions
diff --git a/share/www/script/test/replication.js b/share/www/script/test/replication.js
index a5ed5110..c6f6ff61 100644
--- a/share/www/script/test/replication.js
+++ b/share/www/script/test/replication.js
@@ -377,4 +377,81 @@ couchTests.replication = function(debug) {
T(docFoo666 === null);
}
+ // test filtered replication
+
+ var sourceDb = new CouchDB(
+ "test_suite_filtered_rep_db_a", {"X-Couch-Full-Commit":"false"}
+ );
+
+ sourceDb.deleteDb();
+ sourceDb.createDb();
+
+ T(sourceDb.save({_id:"foo1",value:1}).ok);
+ T(sourceDb.save({_id:"foo2",value:2}).ok);
+ T(sourceDb.save({_id:"foo3",value:3}).ok);
+ T(sourceDb.save({_id:"foo4",value:4}).ok);
+ T(sourceDb.save({
+ "_id": "_design/mydesign",
+ "language" : "javascript",
+ "filters" : {
+ "myfilter" : (function(doc, req) {
+ if (doc.value < Number(req.query.maxvalue)) {
+ return true;
+ } else {
+ return false;
+ }
+ }).toString()
+ }
+ }).ok);
+
+ var dbPairs = [
+ {source:"test_suite_filtered_rep_db_a",
+ target:"test_suite_filtered_rep_db_b"},
+ {source:"test_suite_filtered_rep_db_a",
+ target:"http://" + host + "/test_suite_filtered_rep_db_b"},
+ {source:"http://" + host + "/test_suite_filtered_rep_db_a",
+ target:"test_suite_filtered_rep_db_b"},
+ {source:"http://" + host + "/test_suite_filtered_rep_db_a",
+ target:"http://" + host + "/test_suite_filtered_rep_db_b"}
+ ];
+
+ for (var i = 0; i < dbPairs.length; i++) {
+ var targetDb = new CouchDB("test_suite_filtered_rep_db_b");
+ targetDb.deleteDb();
+ targetDb.createDb();
+
+ var dbA = dbPairs[i].source;
+ var dbB = dbPairs[i].target;
+
+ var repResult = CouchDB.replicate(dbA, dbB, {
+ body: {
+ "filter" : "mydesign/myfilter",
+ "query_params" : {
+ "maxvalue": "3"
+ }
+ }
+ });
+
+ T(repResult.ok);
+ T($.isArray(repResult.history));
+ T(repResult.history.length === 1);
+ T(repResult.history[0].docs_written === 2);
+ T(repResult.history[0].docs_read === 2);
+ T(repResult.history[0].doc_write_failures === 0);
+
+ var docFoo1 = targetDb.open("foo1");
+ T(docFoo1 !== null);
+ T(docFoo1.value === 1);
+
+ var docFoo2 = targetDb.open("foo2");
+ T(docFoo2 !== null);
+ T(docFoo2.value === 2);
+
+ var docFoo3 = targetDb.open("foo3");
+ T(docFoo3 === null);
+
+ var docFoo4 = targetDb.open("foo4");
+ T(docFoo4 === null);
+ }
+
};
diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am
index 9c7e9d4d..b5c84f33 100644
--- a/src/couchdb/Makefile.am
+++ b/src/couchdb/Makefile.am
@@ -30,6 +30,7 @@ source_files = \
couch.erl \
couch_app.erl \
couch_btree.erl \
+ couch_changes.erl \
couch_config.erl \
couch_config_writer.erl \
couch_db.erl \
@@ -86,6 +87,7 @@ compiled_files = \
couch.beam \
couch_app.beam \
couch_btree.beam \
+ couch_changes.beam \
couch_config.beam \
couch_config_writer.beam \
couch_db.beam \
diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl
index f74bade7..bb6b4f73 100644
--- a/src/couchdb/couch_db.hrl
+++ b/src/couchdb/couch_db.hrl
@@ -264,3 +264,15 @@
% small value used in revision trees to indicate the revision isn't stored
-define(REV_MISSING, []).
+
+-record(changes_args, {
+ feed = "normal",
+ dir = fwd,
+ since = 0,
+ limit = 1000000000000000,
+ style = main_only,
+ heartbeat,
+ timeout,
+ filter = "",
+ include_docs = false
+}).
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl
index 9ad34752..1028e857 100644
--- a/src/couchdb/couch_httpd_db.erl
+++ b/src/couchdb/couch_httpd_db.erl
@@ -55,200 +55,62 @@ handle_request(#httpd{path_parts=[DbName|RestParts],method=Method,
do_db_req(Req, Handler)
end.
-get_changes_timeout(Req, Resp) ->
- DefaultTimeout = list_to_integer(
- couch_config:get("httpd", "changes_timeout", "60000")),
- case couch_httpd:qs_value(Req, "heartbeat") of
- undefined ->
- case couch_httpd:qs_value(Req, "timeout") of
- undefined ->
- {DefaultTimeout, fun() -> stop end};
- TimeoutList ->
- {lists:min([DefaultTimeout, list_to_integer(TimeoutList)]),
- fun() -> stop end}
- end;
- "true" ->
- {DefaultTimeout, fun() -> send_chunk(Resp, "\n"), ok end};
- TimeoutList ->
- {lists:min([DefaultTimeout, list_to_integer(TimeoutList)]),
- fun() -> send_chunk(Resp, "\n"), ok end}
- end.
-
-
-start_sending_changes(_Resp, "continuous") ->
- ok;
-start_sending_changes(Resp, _Else) ->
- send_chunk(Resp, "{\"results\":[\n").
-
-handle_changes_req(#httpd{method='GET',path_parts=[DbName|_]}=Req, Db) ->
- FilterFun = make_filter_fun(Req, Db),
- {ok, Info} = couch_db:get_db_info(Db),
- Seq = proplists:get_value(update_seq, Info),
- {Dir, StartSeq} = case couch_httpd:qs_value(Req, "descending", "false") of
- "false" ->
- {fwd, list_to_integer(couch_httpd:qs_value(Req, "since", "0"))};
- "true" ->
- {rev, Seq};
- _Bad -> throw({bad_request, "descending must be true or false"})
+handle_changes_req(#httpd{method='GET'}=Req, Db) ->
+ MakeCallback = fun(Resp) ->
+ fun({change, Change, _}, "continuous") ->
+ send_chunk(Resp, [?JSON_ENCODE(Change) | "\n"]);
+ ({change, Change, Prepend}, _) ->
+ send_chunk(Resp, [Prepend, ?JSON_ENCODE(Change)]);
+ (start, "continuous") ->
+ ok;
+ (start, _) ->
+ send_chunk(Resp, "{\"results\":[\n");
+ ({stop, EndSeq}, "continuous") ->
+ send_chunk(
+ Resp,
+ [?JSON_ENCODE({[{<<"last_seq">>, EndSeq}]}) | "\n"]
+ ),
+ end_json_response(Resp);
+ ({stop, EndSeq}, _) ->
+ send_chunk(
+ Resp,
+ io_lib:format("\n],\n\"last_seq\":~w}\n", [EndSeq])
+ ),
+ end_json_response(Resp);
+ (timeout, _) ->
+ send_chunk(Resp, "\n")
+ end
end,
- Limit = list_to_integer(couch_httpd:qs_value(Req, "limit", "1000000000000000")),
- ResponseType = couch_httpd:qs_value(Req, "feed", "normal"),
- if ResponseType == "continuous" orelse ResponseType == "longpoll" ->
- {ok, Resp} = start_json_response(Req, 200),
- start_sending_changes(Resp, ResponseType),
-
- Self = self(),
- {ok, Notify} = couch_db_update_notifier:start_link(
- fun({_, DbName0}) when DbName0 == DbName ->
- Self ! db_updated;
- (_) ->
- ok
- end),
- {Timeout, TimeoutFun} = get_changes_timeout(Req, Resp),
- couch_stats_collector:track_process_count(Self,
- {httpd, clients_requesting_changes}),
- try
- keep_sending_changes(Req, Resp, Db, StartSeq, <<"">>, Timeout,
- TimeoutFun, ResponseType, Limit, FilterFun)
- after
- couch_db_update_notifier:stop(Notify),
- get_rest_db_updated() % clean out any remaining update messages
- end;
- true ->
+ ChangesArgs = parse_changes_query(Req),
+ ChangesFun = couch_changes:handle_changes(ChangesArgs, Req, Db),
+ WrapperFun = case ChangesArgs#changes_args.feed of
+ "normal" ->
+ {ok, Info} = couch_db:get_db_info(Db),
CurrentEtag = couch_httpd:make_etag(Info),
- couch_httpd:etag_respond(Req, CurrentEtag, fun() ->
- % send the etag
- {ok, Resp} = start_json_response(Req, 200, [{"Etag", CurrentEtag}]),
- start_sending_changes(Resp, ResponseType),
- {ok, {_, LastSeq, _Prepend, _, _, _, _, _}} =
- send_changes(Req, Resp, Db, Dir, StartSeq, <<"">>, "normal",
- Limit, FilterFun),
- end_sending_changes(Resp, LastSeq, ResponseType)
- end)
- end;
+ fun(FeedChangesFun) ->
+ couch_httpd:etag_respond(
+ Req,
+ CurrentEtag,
+ fun() ->
+ {ok, Resp} = couch_httpd:start_json_response(
+ Req, 200, [{"Etag", CurrentEtag}]
+ ),
+ FeedChangesFun(MakeCallback(Resp))
+ end
+ )
+ end;
+ _ ->
+ % "longpoll" or "continuous"
+ {ok, Resp} = couch_httpd:start_json_response(Req, 200),
+ fun(FeedChangesFun) ->
+ FeedChangesFun(MakeCallback(Resp))
+ end
+ end,
+ WrapperFun(ChangesFun);
handle_changes_req(#httpd{path_parts=[_,<<"_changes">>]}=Req, _Db) ->
send_method_not_allowed(Req, "GET,HEAD").
-% waits for a db_updated msg, if there are multiple msgs, collects them.
-wait_db_updated(Timeout, TimeoutFun) ->
- receive db_updated -> get_rest_db_updated()
- after Timeout ->
- case TimeoutFun() of
- ok -> wait_db_updated(Timeout, TimeoutFun);
- stop -> stop
- end
- end.
-
-get_rest_db_updated() ->
- receive db_updated -> get_rest_db_updated()
- after 0 -> updated
- end.
-
-end_sending_changes(Resp, EndSeq, "continuous") ->
- send_chunk(Resp, [?JSON_ENCODE({[{<<"last_seq">>, EndSeq}]}) | "\n"]),
- end_json_response(Resp);
-end_sending_changes(Resp, EndSeq, _Else) ->
- send_chunk(Resp, io_lib:format("\n],\n\"last_seq\":~w}\n", [EndSeq])),
- end_json_response(Resp).
-
-keep_sending_changes(#httpd{user_ctx=UserCtx,path_parts=[DbName|_]}=Req, Resp,
- Db, StartSeq, Prepend, Timeout, TimeoutFun, ResponseType, Limit, Filter) ->
- {ok, {_, EndSeq, Prepend2, _, _, _, NewLimit, _}} = send_changes(Req, Resp, Db, fwd, StartSeq,
- Prepend, ResponseType, Limit, Filter),
- couch_db:close(Db),
- if
- Limit > NewLimit, ResponseType == "longpoll" ->
- end_sending_changes(Resp, EndSeq, ResponseType);
- true ->
- case wait_db_updated(Timeout, TimeoutFun) of
- updated ->
- case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
- {ok, Db2} ->
- keep_sending_changes(Req, Resp, Db2, EndSeq, Prepend2, Timeout,
- TimeoutFun, ResponseType, NewLimit, Filter);
- _Else ->
- end_sending_changes(Resp, EndSeq, ResponseType)
- end;
- stop ->
- end_sending_changes(Resp, EndSeq, ResponseType)
- end
- end.
-
-changes_enumerator(DocInfos, {Db, _, _, FilterFun, Resp, "continuous", Limit, IncludeDocs}) ->
- [#doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}|_] = DocInfos,
- Results0 = FilterFun(DocInfos),
- Results = [Result || Result <- Results0, Result /= null],
- Go = if Limit =< 1 -> stop; true -> ok end,
- case Results of
- [] ->
- {Go, {Db, Seq, nil, FilterFun, Resp, "continuous", Limit, IncludeDocs}};
- _ ->
- send_chunk(Resp, [?JSON_ENCODE(changes_row(Db, Seq, Id, Del, Results, Rev, IncludeDocs))
- |"\n"]),
- {Go, {Db, Seq, nil, FilterFun, Resp, "continuous", Limit-1, IncludeDocs}}
- end;
-changes_enumerator(DocInfos, {Db, _, Prepend, FilterFun, Resp, _, Limit, IncludeDocs}) ->
- [#doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}|_] = DocInfos,
- Results0 = FilterFun(DocInfos),
- Results = [Result || Result <- Results0, Result /= null],
- Go = if Limit =< 1 -> stop; true -> ok end,
- case Results of
- [] ->
- {Go, {Db, Seq, Prepend, FilterFun, Resp, nil, Limit, IncludeDocs}};
- _ ->
- send_chunk(Resp, [Prepend, ?JSON_ENCODE(
- changes_row(Db, Seq, Id, Del, Results, Rev, IncludeDocs))]),
- {Go, {Db, Seq, <<",\n">>, FilterFun, Resp, nil, Limit-1, IncludeDocs}}
- end.
-
-changes_row(Db, Seq, Id, Del, Results, Rev, true) ->
- {[{seq,Seq},{id,Id},{changes,Results}] ++ deleted_item(Del) ++
- couch_httpd_view:doc_member(Db, {Id, Rev})};
-changes_row(_, Seq, Id, Del, Results, _, false) ->
- {[{seq,Seq},{id,Id},{changes,Results}] ++ deleted_item(Del)}.
-
-deleted_item(true) -> [{deleted,true}];
-deleted_item(_) -> [].
-
-send_changes(Req, Resp, Db, Dir, StartSeq, Prepend, ResponseType, Limit, FilterFun) ->
- Style = list_to_existing_atom(
- couch_httpd:qs_value(Req, "style", "main_only")),
- IncludeDocs = list_to_existing_atom(
- couch_httpd:qs_value(Req, "include_docs", "false")),
- couch_db:changes_since(Db, Style, StartSeq, fun changes_enumerator/2,
- [{dir, Dir}], {Db, StartSeq, Prepend, FilterFun, Resp, ResponseType, Limit, IncludeDocs}).
-
-make_filter_fun(Req, Db) ->
- Filter = couch_httpd:qs_value(Req, "filter", ""),
- case [list_to_binary(couch_httpd:unquote(Part))
- || Part <- string:tokens(Filter, "/")] of
- [] ->
- fun(DocInfos) ->
- % doing this as a batch is more efficient for external filters
- [{[{rev, couch_doc:rev_to_str(Rev)}]} ||
- #doc_info{revs=[#rev_info{rev=Rev}|_]} <- DocInfos]
- end;
- [DName, FName] ->
- DesignId = <<"_design/", DName/binary>>,
- DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []),
- % validate that the ddoc has the filter fun
- #doc{body={Props}} = DDoc,
- couch_util:get_nested_json_value({Props}, [<<"filters">>, FName]),
- fun(DocInfos) ->
- Docs = [Doc || {ok, Doc} <- [
- {ok, _Doc} = couch_db:open_doc(Db, DInfo, [deleted, conflicts])
- || DInfo <- DocInfos]],
- {ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs),
- [{[{rev, couch_doc:rev_to_str(Rev)}]}
- || #doc_info{revs=[#rev_info{rev=Rev}|_]} <- DocInfos,
- Pass <- Passes, Pass == true]
- end;
- _Else ->
- throw({bad_request,
- "filter parameter must be of the form `designname/filtername`"})
- end.
-
handle_compact_req(#httpd{method='POST',path_parts=[DbName,_,Id|_]}=Req, _Db) ->
ok = couch_view_compactor:start_compact(DbName, Id),
send_json(Req, 202, {[{ok, true}]});
@@ -1188,6 +1050,33 @@ parse_doc_query(Req) ->
end
end, #doc_query_args{}, couch_httpd:qs(Req)).
+parse_changes_query(Req) ->
+ lists:foldl(fun({Key, Value}, Args) ->
+ case {Key, Value} of
+ {"feed", _} ->
+ Args#changes_args{feed=Value};
+ {"descending", "true"} ->
+ Args#changes_args{dir=rev};
+ {"since", _} ->
+ Args#changes_args{since=list_to_integer(Value)};
+ {"limit", _} ->
+ Args#changes_args{limit=list_to_integer(Value)};
+ {"style", _} ->
+ Args#changes_args{style=list_to_existing_atom(Value)};
+ {"heartbeat", "true"} ->
+ Args#changes_args{heartbeat=true};
+ {"heartbeat", _} ->
+ Args#changes_args{heartbeat=list_to_integer(Value)};
+ {"timeout", _} ->
+ Args#changes_args{timeout=list_to_integer(Value)};
+ {"include_docs", "true"} ->
+ Args#changes_args{include_docs=true};
+ {"filter", _} ->
+ Args#changes_args{filter=Value};
+ _Else -> % unknown key value pair, ignore.
+ Args
+ end
+ end, #changes_args{}, couch_httpd:qs(Req)).
extract_header_rev(Req, ExplicitRev) when is_binary(ExplicitRev) or is_list(ExplicitRev)->
extract_header_rev(Req, couch_doc:parse_rev(ExplicitRev));
diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl
index a90681c5..3095b199 100644
--- a/src/couchdb/couch_query_servers.erl
+++ b/src/couchdb/couch_query_servers.erl
@@ -183,7 +183,12 @@ json_doc(Doc) ->
couch_doc:to_json_obj(Doc, [revs]).
filter_docs(Req, Db, DDoc, FName, Docs) ->
- JsonReq = couch_httpd_external:json_req_obj(Req, Db),
+ JsonReq = case Req of
+ {json_req, JsonObj} ->
+ JsonObj;
+ #httpd{} = HttpReq ->
+ couch_httpd_external:json_req_obj(HttpReq, Db)
+ end,
JsonDocs = [couch_doc:to_json_obj(Doc, [revs]) || Doc <- Docs],
JsonCtx = couch_util:json_user_ctx(Db),
[true, Passes] = ddoc_prompt(DDoc, [<<"filters">>, FName], [JsonDocs, JsonReq, JsonCtx]),
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index 4bad4a9f..c0010a94 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -451,7 +451,11 @@ make_replication_id({Props}, UserCtx) ->
% Port = mochiweb_socket_server:get(couch_httpd, port),
Src = get_rep_endpoint(UserCtx, proplists:get_value(<<"source">>, Props)),
Tgt = get_rep_endpoint(UserCtx, proplists:get_value(<<"target">>, Props)),
- Base = couch_util:to_hex(erlang:md5(term_to_binary([HostName, Src, Tgt]))),
+ Filter = proplists:get_value(<<"filter">>, Props),
+ QueryParams = proplists:get_value(<<"query_params">>, Props),
+ Base = couch_util:to_hex(erlang:md5(
+ term_to_binary([HostName, Src, Tgt, Filter, QueryParams])
+ )),
Extension = maybe_append_options(
[<<"continuous">>, <<"create_target">>], Props),
{Base, Extension}.
diff --git a/src/couchdb/couch_rep_changes_feed.erl b/src/couchdb/couch_rep_changes_feed.erl
index cdfed6a0..67cfabe4 100644
--- a/src/couchdb/couch_rep_changes_feed.erl
+++ b/src/couchdb/couch_rep_changes_feed.erl
@@ -53,11 +53,35 @@ init([_Parent, #http_db{}=Source, Since, PostProps] = Args) ->
true ->
continuous
end,
+ BaseQS = [
+ {"style", all_docs},
+ {"heartbeat", 10000},
+ {"since", Since},
+ {"feed", Feed}
+ ],
+ QS = case proplists:get_value(<<"filter">>, PostProps) of
+ undefined ->
+ BaseQS;
+ FilterName ->
+ {Params} = proplists:get_value(<<"query_params">>, PostProps, {[]}),
+ lists:foldr(
+ fun({K, V}, QSAcc) ->
+ Ks = couch_util:to_list(K),
+ case proplists:is_defined(Ks, QSAcc) of
+ true ->
+ QSAcc;
+ false ->
+ [{Ks, V} | QSAcc]
+ end
+ end,
+ [{"filter", FilterName} | BaseQS],
+ Params
+ )
+ end,
Pid = couch_rep_httpc:spawn_link_worker_process(Source),
Req = Source#http_db{
resource = "_changes",
- qs = [{style, all_docs}, {heartbeat, 10000}, {since, Since},
- {feed, Feed}],
+ qs = QS,
conn = Pid,
options = [{stream_to, {self(), once}}, {response_format, binary},
{inactivity_timeout, 31000}], % miss 3 heartbeats, assume death
@@ -94,20 +118,54 @@ init([_Parent, #http_db{}=Source, Since, PostProps] = Args) ->
init([_Parent, Source, Since, PostProps] = InitArgs) ->
process_flag(trap_exit, true),
Server = self(),
- ChangesPid =
- case proplists:get_value(<<"continuous">>, PostProps, false) of
- false ->
- spawn_link(fun() -> send_local_changes_once(Server, Source, Since) end);
- true ->
- spawn_link(fun() ->
- Self = self(),
- {ok, _} = couch_db_update_notifier:start_link(fun(Msg) ->
- local_update_notification(Self, Source#db.name, Msg) end),
- send_local_changes_forever(Server, Source, Since)
+ ChangesArgs = #changes_args{
+ style = all_docs,
+ since = Since,
+ filter = ?b2l(proplists:get_value(<<"filter">>, PostProps, <<>>)),
+ feed = case proplists:get_value(<<"continuous">>, PostProps, false) of
+ true ->
+ "continuous";
+ false ->
+ "normal"
+ end
+ },
+ ChangesPid = spawn_link(fun() ->
+ ChangesFeedFun = couch_changes:handle_changes(
+ ChangesArgs,
+ {json_req, filter_json_req(Source, PostProps)},
+ Source
+ ),
+ ChangesFeedFun(fun({change, Change, _}, _) ->
+ gen_server:call(Server, {add_change, Change}, infinity);
+ (_, _) ->
+ ok
end)
- end,
+ end),
{ok, #state{changes_loop=ChangesPid, init_args=InitArgs}}.
+filter_json_req(Db, PostProps) ->
+ case proplists:get_value(<<"filter">>, PostProps) of
+ undefined ->
+ {[]};
+ FilterName ->
+ {Query} = proplists:get_value(<<"query_params">>, PostProps, {[]}),
+ {ok, Info} = couch_db:get_db_info(Db),
+ % simulate a request to db_name/_changes
+ {[
+ {<<"info">>, {Info}},
+ {<<"id">>, null},
+ {<<"method">>, 'GET'},
+ {<<"path">>, [couch_db:name(Db), <<"_changes">>]},
+ {<<"query">>, {[{<<"filter">>, FilterName} | Query]}},
+ {<<"headers">>, []},
+ {<<"body">>, []},
+ {<<"peer">>, <<"replicator">>},
+ {<<"form">>, []},
+ {<<"cookie">>, []},
+ {<<"userCtx">>, couch_util:json_user_ctx(Db)}
+ ]}
+ end.
+
handle_call({add_change, Row}, From, State) ->
handle_add_change(Row, From, State);
@@ -302,23 +360,7 @@ by_seq_loop(Server, Source, StartSeq) ->
decode_row(<<",", Rest/binary>>) ->
decode_row(Rest);
decode_row(Row) ->
- {Props} = ?JSON_DECODE(Row),
- % [Seq, Id, {<<"changes">>,C}]
- Seq = proplists:get_value(<<"seq">>, Props),
- Id = proplists:get_value(<<"id">>, Props),
- C = proplists:get_value(<<"changes">>, Props),
- C2 = [{[{<<"rev">>,couch_doc:parse_rev(R)}]} || {[{<<"rev">>,R}]} <- C],
- {[{<<"seq">>, Seq}, {<<"id">>,Id}, {<<"changes">>,C2}]}.
-
-flush_updated_messages() ->
- receive updated -> flush_updated_messages()
- after 0 -> ok
- end.
-
-local_update_notification(Self, DbName, {updated, DbName}) ->
- Self ! updated;
-local_update_notification(_, _, _) ->
- ok.
+ ?JSON_DECODE(Row).
maybe_stream_next(#state{reqid=nil}) ->
ok;
@@ -327,35 +369,6 @@ maybe_stream_next(#state{complete=false, count=N} = S) when N < ?BUFFER_SIZE ->
maybe_stream_next(_) ->
ok.
-send_local_changes_forever(Server, Db, Since) ->
- #db{name = DbName, user_ctx = UserCtx} = Db,
- {ok, NewSeq} = send_local_changes_once(Server, Db, Since),
- couch_db:close(Db),
- ok = wait_db_updated(),
- {ok, NewDb} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
- send_local_changes_forever(Server, NewDb, NewSeq).
-
-send_local_changes_once(Server, Db, Since) ->
- FilterFun =
- fun(#doc_info{revs=[#rev_info{rev=Rev}|_]}) ->
- {[{<<"rev">>, Rev}]}
- end,
-
- ChangesFun =
- fun([#doc_info{id=Id, high_seq=Seq}|_]=DocInfos, _) ->
- Results0 = [FilterFun(DocInfo) || DocInfo <- DocInfos],
- Results = [Result || Result <- Results0, Result /= null],
- if Results /= [] ->
- Change = {[{<<"seq">>,Seq}, {<<"id">>,Id}, {<<"changes">>,Results}]},
- gen_server:call(Server, {add_change, Change}, infinity);
- true ->
- ok
- end,
- {ok, Seq}
- end,
-
- couch_db:changes_since(Db, all_docs, Since, ChangesFun, Since).
-
start_http_request(RawUrl) ->
Url = ibrowse_lib:parse_url(RawUrl),
{ok, Pid} = ibrowse:spawn_link_worker_process(Url#url.host, Url#url.port),
@@ -367,8 +380,3 @@ start_http_request(RawUrl) ->
{ibrowse_req_id, Id} =
ibrowse:send_req_direct(Pid, RawUrl, [], get, [], Opts, infinity),
{Pid, Id}.
-
-wait_db_updated() ->
- receive updated ->
- flush_updated_messages()
- end.
diff --git a/src/couchdb/couch_rep_missing_revs.erl b/src/couchdb/couch_rep_missing_revs.erl
index 5790dd71..7b4956e2 100644
--- a/src/couchdb/couch_rep_missing_revs.erl
+++ b/src/couchdb/couch_rep_missing_revs.erl
@@ -144,11 +144,15 @@ changes_loop(OurServer, SourceChangesServer, Target) ->
changes_loop(OurServer, SourceChangesServer, Target).
get_missing_revs(#http_db{}=Target, Changes) ->
- Transform = fun({[{<<"seq">>,_}, {<<"id">>,Id}, {<<"changes">>,C}]}) ->
- {Id, [couch_doc:rev_to_str(R) || {[{<<"rev">>, R}]} <- C]} end,
+ Transform = fun({Props}) ->
+ C = proplists:get_value(<<"changes">>, Props),
+ Id = proplists:get_value(<<"id">>, Props),
+ {Id, [R || {[{<<"rev">>, R}]} <- C]}
+ end,
IdRevsList = [Transform(Change) || Change <- Changes],
SeqDict = changes_dictionary(Changes),
- {[{<<"seq">>, HighSeq}, _, _]} = lists:last(Changes),
+ {LastProps} = lists:last(Changes),
+ HighSeq = proplists:get_value(<<"seq">>, LastProps),
Request = Target#http_db{
resource = "_missing_revs",
method = post,
@@ -165,11 +169,15 @@ get_missing_revs(#http_db{}=Target, Changes) ->
end;
get_missing_revs(Target, Changes) ->
- Transform = fun({[{<<"seq">>,_}, {<<"id">>,Id}, {<<"changes">>,C}]}) ->
- {Id, [R || {[{<<"rev">>, R}]} <- C]} end,
+ Transform = fun({Props}) ->
+ C = proplists:get_value(<<"changes">>, Props),
+ Id = proplists:get_value(<<"id">>, Props),
+ {Id, [couch_doc:parse_rev(R) || {[{<<"rev">>, R}]} <- C]}
+ end,
IdRevsList = [Transform(Change) || Change <- Changes],
SeqDict = changes_dictionary(Changes),
- {[{<<"seq">>, HighSeq}, _, _]} = lists:last(Changes),
+ {LastProps} = lists:last(Changes),
+ HighSeq = proplists:get_value(<<"seq">>, LastProps),
{ok, Results} = couch_db:get_missing_revs(Target, IdRevsList),
{HighSeq, [{Id, dict:fetch(Id, SeqDict), Revs} || {Id, Revs, _} <- Results]}.
diff --git a/test/etap/111-replication-changes-feed.t b/test/etap/111-replication-changes-feed.t
index ccd64047..e92889e9 100755
--- a/test/etap/111-replication-changes-feed.t
+++ b/test/etap/111-replication-changes-feed.t
@@ -157,7 +157,7 @@ test_deleted_conflicts(Type) ->
[Win, {[{<<"rev">>, Lose}]}] = proplists:get_value(<<"changes">>, ExpectProps),
Doc = couch_doc:from_json_obj({[
{<<"_id">>, Id},
- {<<"_rev">>, couch_doc:rev_to_str(Lose)},
+ {<<"_rev">>, Lose},
{<<"_deleted">>, true}
]}),
Db = get_db(),
@@ -167,7 +167,7 @@ test_deleted_conflicts(Type) ->
Expect = {[
{<<"seq">>, get_update_seq()},
{<<"id">>, Id},
- {<<"changes">>, [Win, {[{<<"rev">>, Rev}]}]}
+ {<<"changes">>, [Win, {[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}]}
]},
{ok, Pid} = start_changes_feed(Type, Since, false),
@@ -210,7 +210,7 @@ generate_change(Id, EJson) ->
{[
{<<"seq">>, get_update_seq()},
{<<"id">>, Id},
- {<<"changes">>, [{[{<<"rev">>, Rev}]}]}
+ {<<"changes">>, [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}]}
]}.
generate_conflict() ->
@@ -220,9 +220,9 @@ generate_conflict() ->
Doc2 = (couch_doc:from_json_obj({[<<"foo">>, <<"baz">>]}))#doc{id = Id},
{ok, Rev1} = couch_db:update_doc(Db, Doc1, [full_commit]),
{ok, Rev2} = couch_db:update_doc(Db, Doc2, [full_commit, all_or_nothing]),
-
+
%% relies on undocumented CouchDB conflict winner algo and revision sorting!
- RevList = [{[{<<"rev">>, R}]} || R
+ RevList = [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} || R
<- lists:sort(fun(A,B) -> B<A end, [Rev1,Rev2])],
{[
{<<"seq">>, get_update_seq()},