summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/couchdb/Makefile.am2
-rw-r--r--src/couchdb/couch_rep_changes_feed.erl174
-rw-r--r--test/etap/110-replication-changes-feed.t185
3 files changed, 361 insertions, 0 deletions
diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am
index 4b4f4ab2..80f4b541 100644
--- a/src/couchdb/Makefile.am
+++ b/src/couchdb/Makefile.am
@@ -72,6 +72,7 @@ source_files = \
couch_query_servers.erl \
couch_ref_counter.erl \
couch_rep.erl \
+ couch_rep_changes_feed.erl \
couch_rep_sup.erl \
couch_server.erl \
couch_server_sup.erl \
@@ -116,6 +117,7 @@ compiled_files = \
couch_query_servers.beam \
couch_ref_counter.beam \
couch_rep.beam \
+ couch_rep_changes_feed.beam \
couch_rep_sup.beam \
couch_server.beam \
couch_server_sup.beam \
diff --git a/src/couchdb/couch_rep_changes_feed.erl b/src/couchdb/couch_rep_changes_feed.erl
new file mode 100644
index 00000000..bf69e100
--- /dev/null
+++ b/src/couchdb/couch_rep_changes_feed.erl
@@ -0,0 +1,174 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_rep_changes_feed).
+-behaviour(gen_server).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([start/2, start_link/2, next/1, stop/1]).
+
+-define(MIN_BUFFER_SIZE, 100).
+
+-include("couch_db.hrl").
+-include("../ibrowse/ibrowse.hrl").
+
+-record (state, {
+ conn = nil,
+ reqid = nil,
+ count = 0,
+ reply_to = nil,
+ rows = queue:new(),
+ complete = false
+}).
+
+start(Url, Options) ->
+ gen_server:start(?MODULE, [Url, Options], []).
+
+start_link(Url, Options) ->
+ gen_server:start_link(?MODULE, [Url, Options], []).
+
+next(Server) ->
+ ?LOG_DEBUG("~p at ~p received next_change call", [?MODULE, Server]),
+ gen_server:call(Server, next_change, infinity).
+
+stop(Server) ->
+ gen_server:call(Server, stop).
+
+init([{remote, Url}, Options]) ->
+ Since = proplists:get_value(since, Options, 0),
+ Continuous = proplists:get_value(continuous, Options, false),
+ {Pid, ReqId} = start_http_request(lists:concat([Url, "/_changes",
+ "?style=all_docs", "&since=", Since, "&continuous=", Continuous])),
+ {ok, #state{conn=Pid, reqid=ReqId}};
+
+init([{local, DbName}, Options]) ->
+ init([{remote, "http://" ++ couch_config:get("httpd", "bind_address") ++ ":"
+ ++ couch_config:get("httpd", "port") ++ "/" ++ DbName}, Options]).
+
+handle_call(next_change, From, State) ->
+ #state{
+ reqid = Id,
+ complete = Complete,
+ count = Count,
+ rows = Rows
+ } = State,
+
+ ok = maybe_stream_next(Complete, Count, Id),
+
+ case queue:out(Rows) of
+ {{value, Row}, NewRows} ->
+ {reply, Row, State#state{count=Count-1, rows=NewRows}};
+ {empty, Rows} ->
+ if State#state.complete ->
+ {stop, normal, complete, State};
+ % State#state.waiting_on_headers ->
+ % {noreply, State#state{reply_to=From}};
+ true ->
+ {noreply, State#state{reply_to=From}}
+ end
+ end;
+
+handle_call(stop, _From, State) ->
+ catch ibrowse:stop_worker_process(State#state.conn),
+ {stop, normal, ok, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({ibrowse_async_headers, Id, "200", _}, #state{reqid=Id}=State) ->
+ #state{
+ complete = Complete,
+ count = Count
+ } = State,
+ ?LOG_DEBUG("~p reqid ~p ibrowse_async_headers 200", [?MODULE, Id]),
+ ok = maybe_stream_next(Complete, Count, Id),
+ {noreply, State};
+handle_info({ibrowse_async_headers, Id, "301", Hdrs}, #state{reqid=Id}=State) ->
+ ?LOG_DEBUG("~p reqid ~p ibrowse_async_headers 301", [?MODULE, Id]),
+ catch ibrowse:stop_worker_process(State#state.conn),
+ Url = mochiweb_headers:get_value("Location", mochiweb_headers:make(Hdrs)),
+ {Pid, ReqId} = start_http_request(Url),
+ {noreply, State#state{conn=Pid, reqid=ReqId}};
+handle_info({ibrowse_async_headers, Id, Code, Hdrs}, #state{reqid=Id}=State) ->
+ ?LOG_ERROR("replicator changes feed failed with code ~s and Headers ~n~p",
+ [Code,Hdrs]),
+ {stop, {error, list_to_integer(Code)}, State};
+
+handle_info({ibrowse_async_response, Id, Msg}, #state{reqid=Id} = State) ->
+ ?LOG_DEBUG("~p reqid ~p ibrowse_async_response ~p", [?MODULE, Id, Msg]),
+ #state{
+ complete = Complete,
+ count = Count,
+ rows = Rows
+ } = State,
+ try
+ Row = decode_row(Msg),
+ case State of
+ #state{reply_to=nil} ->
+ {noreply, State#state{count=Count+1, rows = queue:in(Row, Rows)}};
+ #state{count=0, reply_to=From}->
+ gen_server:reply(From, Row),
+ {noreply, State#state{reply_to=nil}}
+ end
+ catch
+ throw:{invalid_json, Msg} ->
+ ?LOG_DEBUG("got invalid_json ~p", [Msg]),
+ ok = maybe_stream_next(Complete, Count, Id),
+ {noreply, State}
+ end;
+
+handle_info({ibrowse_async_response_end, Id}, #state{reqid=Id} = State) ->
+ ?LOG_DEBUG("got ibrowse_async_response_end ~p", [State#state.reply_to]),
+ case State of
+ #state{reply_to=nil} ->
+ {noreply, State#state{complete=true}};
+ #state{count=0, reply_to=From}->
+ gen_server:reply(From, complete),
+ {stop, normal, State}
+ end;
+
+handle_info(Msg, State) ->
+ ?LOG_INFO("unexpected message ~p", [Msg]),
+ {noreply, State}.
+
+terminate(_Reason, #state{conn=Pid}) when is_pid(Pid) ->
+ catch ibrowse:stop_worker_process(Pid),
+ ok;
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+decode_row([$,, $\n | Rest]) ->
+ decode_row(Rest);
+decode_row(Row) ->
+ ?JSON_DECODE(Row).
+
+maybe_stream_next(false, Count, Id) when Count < ?MIN_BUFFER_SIZE ->
+ ?LOG_DEBUG("~p reqid ~p streaming next chunk", [?MODULE, Id]),
+ ibrowse:stream_next(Id);
+maybe_stream_next(_Complete, _Count, Id) ->
+ ?LOG_DEBUG("~p reqid ~p not streaming", [?MODULE, Id]),
+ ok.
+
+start_http_request(RawUrl) ->
+ Url = ibrowse_lib:parse_url(RawUrl),
+ {ok, Pid} = ibrowse:spawn_link_worker_process(Url#url.host, Url#url.port),
+ Opts = [
+ {stream_to, {self(), once}},
+ {inactivity_timeout, 30000}
+ ],
+ {ibrowse_req_id, Id} =
+ ibrowse:send_req_direct(Pid, RawUrl, [], get, [], Opts, infinity),
+ {Pid, Id}.
diff --git a/test/etap/110-replication-changes-feed.t b/test/etap/110-replication-changes-feed.t
new file mode 100644
index 00000000..467e43dd
--- /dev/null
+++ b/test/etap/110-replication-changes-feed.t
@@ -0,0 +1,185 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% XXX: Figure out how to -include("couch_db.hrl")
+-record(doc, {id= <<"">>, revs={0, []}, body={[]},
+ attachments=[], deleted=false, meta=[]}).
+
+main(_) ->
+ code:add_pathz("src/couchdb"),
+ code:add_pathz("src/ibrowse"),
+ code:add_pathz("src/mochiweb"),
+
+ etap:plan(6),
+ case (catch test()) of
+ ok ->
+ etap:end_tests();
+ Other ->
+ etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
+ etap:bail(Other)
+ end,
+ ok.
+
+test() ->
+ couch_server:start(
+ ["etc/couchdb/default_dev.ini", "etc/couchdb/local_dev.ini"]
+ ),
+ ibrowse:start(),
+ crypto:start(),
+
+ couch_server:delete(<<"etap-test-db">>, []),
+ {ok, Db} = couch_db:create(<<"etap-test-db">>, []),
+
+ test_unchanged_db(),
+ test_simple_change(),
+ test_since_parameter(),
+ test_continuous_parameter(),
+ test_conflicts(),
+ test_deleted_conflicts(),
+
+ couch_db:close(Db),
+ couch_server:delete(<<"etap-test-db">>, []),
+ ok.
+
+test_unchanged_db() ->
+ {ok, Pid} = couch_rep_changes_feed:start({local, "etap-test-db"}, []),
+ etap:is(
+ couch_rep_changes_feed:next(Pid),
+ complete,
+ "changes feed for unchanged DB is automatically complete"
+ ).
+
+test_simple_change() ->
+ Expect = generate_change(),
+ {ok, Pid} = couch_rep_changes_feed:start({local, "etap-test-db"}, []),
+ etap:is(
+ {couch_rep_changes_feed:next(Pid), couch_rep_changes_feed:next(Pid)},
+ {Expect, complete},
+ "change one document, get one row"
+ ).
+
+test_since_parameter() ->
+ {ok, Pid} = couch_rep_changes_feed:start({local, "etap-test-db"},
+ [{since, get_update_seq()}]),
+ etap:is(
+ couch_rep_changes_feed:next(Pid),
+ complete,
+ "since query-string parameter allows us to skip changes"
+ ).
+
+test_continuous_parameter() ->
+ {ok, Pid} = couch_rep_changes_feed:start({local, "etap-test-db"},
+ [{since, get_update_seq()}, {continuous, true}]),
+
+ % make the changes_feed request before the next update
+ Self = self(),
+ spawn(fun() ->
+ Change = couch_rep_changes_feed:next(Pid),
+ Self ! {actual, Change}
+ end),
+
+ Expect = generate_change(),
+ etap:is(
+ receive {actual, Actual} -> Actual end,
+ Expect,
+ "continuous query-string parameter picks up new changes"
+ ),
+
+ ok = couch_rep_changes_feed:stop(Pid).
+
+test_conflicts() ->
+ Since = get_update_seq(),
+ Expect = generate_conflict(),
+ {ok, Pid} = couch_rep_changes_feed:start({local, "etap-test-db"},
+ [{since, Since}]),
+ etap:is(
+ {couch_rep_changes_feed:next(Pid), couch_rep_changes_feed:next(Pid)},
+ {Expect, complete},
+ "conflict revisions show up in feed"
+ ).
+
+test_deleted_conflicts() ->
+ Since = get_update_seq(),
+ {ExpectProps} = generate_conflict(),
+
+ %% delete the conflict revision
+ Id = proplists:get_value(<<"id">>, ExpectProps),
+ [Win, {[{<<"rev">>, Lose}]}] = proplists:get_value(<<"changes">>, ExpectProps),
+ Doc = couch_doc:from_json_obj({[
+ {<<"_id">>, Id},
+ {<<"_rev">>, Lose},
+ {<<"_deleted">>, true}
+ ]}),
+ Db = get_db(),
+ {ok, Rev} = couch_db:update_doc(Db, Doc, [full_commit]),
+ couch_db:close(Db),
+
+ Expect = {[
+ {<<"seq">>, get_update_seq()},
+ {<<"id">>, Id},
+ {<<"changes">>, [Win, {[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}]}
+ ]},
+
+ {ok, Pid} = couch_rep_changes_feed:start({local, "etap-test-db"},
+ [{since, Since}]),
+ etap:is(
+ {couch_rep_changes_feed:next(Pid), couch_rep_changes_feed:next(Pid)},
+ {Expect, complete},
+ "deleted conflict revisions show up in feed"
+ ).
+
+generate_change() ->
+ generate_change(couch_util:new_uuid()).
+
+generate_change(Id) ->
+ generate_change(Id, {[]}).
+
+generate_change(Id, EJson) ->
+ Doc = couch_doc:from_json_obj(EJson),
+ Db = get_db(),
+ {ok, Rev} = couch_db:update_doc(Db, Doc#doc{id = Id}, [full_commit]),
+ couch_db:close(Db),
+ {[
+ {<<"seq">>, get_update_seq()},
+ {<<"id">>, Id},
+ {<<"changes">>, [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}]}
+ ]}.
+
+generate_conflict() ->
+ Id = couch_util:new_uuid(),
+ Db = get_db(),
+ Doc1 = (couch_doc:from_json_obj({[<<"foo">>, <<"bar">>]}))#doc{id = Id},
+ Doc2 = (couch_doc:from_json_obj({[<<"foo">>, <<"baz">>]}))#doc{id = Id},
+ {ok, Rev1} = couch_db:update_doc(Db, Doc1, [full_commit]),
+ {ok, Rev2} = couch_db:update_doc(Db, Doc2, [full_commit, all_or_nothing]),
+
+ %% relies on undocumented CouchDB conflict winner algo and revision sorting!
+ RevList = [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} || R
+ <- lists:sort(fun(A,B) -> B<A end, [Rev1,Rev2])],
+ {[
+ {<<"seq">>, get_update_seq()},
+ {<<"id">>, Id},
+ {<<"changes">>, RevList}
+ ]}.
+
+get_db() ->
+ {ok, Db} = couch_db:open(<<"etap-test-db">>, []),
+ Db.
+
+get_update_seq() ->
+ Db = get_db(),
+ Seq = couch_db:get_update_seq(Db),
+ couch_db:close(Db),
+ Seq.