summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/couchdb/couch_rep_changes_feed.erl427
1 files changed, 211 insertions, 216 deletions
diff --git a/src/couchdb/couch_rep_changes_feed.erl b/src/couchdb/couch_rep_changes_feed.erl
index 3de178a5..7c2d05b0 100644
--- a/src/couchdb/couch_rep_changes_feed.erl
+++ b/src/couchdb/couch_rep_changes_feed.erl
@@ -15,17 +15,21 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([start/2, start_link/2, all/1, next/1, stop/1]).
+-export([start_link/4, next/1, stop/1]).
--define(MIN_BUFFER_SIZE, 100).
+-define(BUFFER_SIZE, 1000).
-include("couch_db.hrl").
-include("../ibrowse/ibrowse.hrl").
--record (remote, {
- conn,
- reqid,
+-record (state, {
+ changes_from = nil,
+ changes_loop = nil,
last_seq,
+ conn = nil,
+ reqid = nil,
+ by_seq_from = nil,
+ by_seq_loop = nil,
complete = false,
count = 0,
partial_chunk = nil,
@@ -33,194 +37,110 @@
rows = queue:new()
}).
--record (local, {
- changes_from = nil,
- changes_pid = nil,
- complete = false,
- count = 0,
- reply_to = nil,
- rows = queue:new()
-}).
-
-start(Url, Options) ->
- gen_server:start(?MODULE, [Url, Options], []).
-
-start_link(Url, Options) ->
- gen_server:start_link(?MODULE, [Url, Options], []).
+start_link(Parent, Source, StartSeq, PostProps) ->
+ gen_server:start_link(?MODULE, [Parent, Source, StartSeq, PostProps], []).
-%% @doc does not block
-all(Server) ->
- gen_server:call(Server, all_changes).
-
-%% @doc returns the next change from the feed, blocking if necessary
next(Server) ->
- gen_server:call(Server, next_change, infinity).
+ gen_server:call(Server, next_changes, infinity).
stop(Server) ->
gen_server:call(Server, stop).
-init([{remote, Url}, Options]) ->
- Since = proplists:get_value(since, Options, 0),
- Continuous = proplists:get_value(continuous, Options, false),
- {Pid, ReqId} = start_http_request(lists:concat([Url, "/_changes",
- "?style=all_docs", "&since=", Since, "&continuous=", Continuous])),
- {ok, #remote{conn=Pid, last_seq=Since, reqid=ReqId}};
-
-init([{local, DbName}, Options]) when is_list(DbName) ->
- init([{local, ?l2b(DbName)}, Options]);
-init([{local, DbName}, Options]) ->
- ?LOG_DEBUG("initializing local changes feed for ~s with ~p", [DbName, Options]),
+init([_Parent, #http_db{}=Source, Since, PostProps]) ->
+ Feed = case proplists:get_value(<<"continuous">>, PostProps, false) of
+ false ->
+ normal;
+ true ->
+ continuous
+ end,
+ Pid = couch_rep_httpc:spawn_link_worker_process(Source),
+ Req = Source#http_db{
+ resource = "_changes",
+ qs = [{style, all_docs}, {heartbeat, true}, {since, Since},
+ {feed, Feed}],
+ conn = Pid,
+ options = [{stream_to, {self(), once}}, {response_format, binary}],
+ headers = Source#http_db.headers -- [{"Accept-Encoding", "gzip"}]
+ },
+ {ibrowse_req_id, ReqId} = couch_rep_httpc:request(Req),
+
+ receive
+ {ibrowse_async_headers, ReqId, "200", _} ->
+ ibrowse:stream_next(ReqId),
+ {ok, #state{conn=Pid, last_seq=Since, reqid=ReqId}};
+ {ibrowse_async_headers, ReqId, "301", Hdrs} ->
+ catch ibrowse:stop_worker_process(Pid),
+ Url2 = mochiweb_headers:get_value("Location", mochiweb_headers:make(Hdrs)),
+ %% TODO use couch_httpc:request instead of start_http_request
+ {Pid2, ReqId2} = start_http_request(Url2),
+ receive {ibrowse_async_headers, ReqId2, "200", _} ->
+ {ok, #state{conn=Pid2, last_seq=Since, reqid=ReqId2}}
+ after 30000 ->
+ {stop, changes_timeout}
+ end;
+ {ibrowse_async_headers, ReqId, "404", _} ->
+ catch ibrowse:stop_worker_process(Pid),
+ ?LOG_INFO("source doesn't have _changes, trying _all_docs_by_seq", []),
+ Self = self(),
+ BySeqPid = spawn_link(fun() -> by_seq_loop(Self, Source, Since) end),
+ {ok, #state{last_seq=Since, by_seq_loop=BySeqPid}};
+ {ibrowse_async_headers, ReqId, Code, _} ->
+ {stop, {changes_error_code, list_to_integer(Code)}}
+ after 10000 ->
+ {stop, changes_timeout}
+ end;
+
+init([_Parent, Source, Since, PostProps]) ->
process_flag(trap_exit, true),
Server = self(),
- Since = proplists:get_value(since, Options, 0),
ChangesPid =
- case proplists:get_value(continuous, Options, false) of
+ case proplists:get_value(<<"continuous">>, PostProps, false) of
false ->
- spawn_link(fun() -> send_local_changes_once(Server, DbName, Since) end);
+ spawn_link(fun() -> send_local_changes_once(Server, Source, Since) end);
true ->
- spawn_link(fun() -> send_local_changes_forever(Server, DbName, Since) end)
+ spawn_link(fun() -> send_local_changes_forever(Server, Source, Since) end)
end,
- {ok, #local{changes_pid=ChangesPid}}.
-
-handle_call({add, Row}, _From, #local{count=Count, rows=Rows}=State)
- when Count < ?MIN_BUFFER_SIZE->
- case State of
- #local{reply_to=nil} ->
- {reply, ok, State#local{count=Count+1, rows = queue:in(Row, Rows)}};
- #local{count=0, reply_to=Requestor}->
- gen_server:reply(Requestor, Row),
- {reply, ok, State#local{reply_to=nil}}
- end;
-handle_call({add, Row}, From, #local{}=State) ->
- #local{
- count = Count,
- rows = Rows
- } = State,
- {noreply, State#local{count=Count+1, changes_from=From, rows=queue:in(Row,Rows)}};
+ {ok, #state{changes_loop=ChangesPid}}.
-handle_call(all_changes, _From, #local{complete=Complete, count=Count}=State)
- when Complete =:= false; Count > 0 ->
- #local{
- changes_from = ChangesFrom,
- rows = Rows
- } = State,
- if Count < ?MIN_BUFFER_SIZE, ChangesFrom =/= nil ->
- gen_server:reply(ChangesFrom, ok);
- true -> ok end,
- {reply, queue:to_list(Rows), State#local{count=0, rows=queue:new()}};
-handle_call(all_changes, _From, #local{}=State) ->
- {stop, normal, complete, State};
-
-handle_call(all_changes, _From, #remote{complete=Complete, count=Count}=State)
- when Complete =:= false; Count > 0 ->
- #remote{
- reqid = Id,
- rows = Rows
- } = State,
- ok = maybe_stream_next(Complete, 0, Id),
- {reply, queue:to_list(Rows), State#remote{count=0, rows=queue:new()}};
-handle_call(all_changes, _From, #remote{}=State) ->
- {stop, normal, complete, State};
-
-handle_call(next_change, From, #local{count=0}=State) ->
- if State#local.complete ->
- {stop, normal, complete, State};
- true ->
- {noreply, State#local{reply_to=From}}
- end;
-handle_call(next_change, _From, #local{}=State) ->
- #local{
- count = Count,
- changes_from = ChangesFrom,
- rows = Rows
- } = State,
- {{value, Row}, NewRows} = queue:out(Rows),
- if Count =:= ?MIN_BUFFER_SIZE, ChangesFrom =/= nil ->
- gen_server:reply(ChangesFrom, ok),
- {reply, Row, State#local{count=Count-1, changes_from=nil, rows=NewRows}};
- true ->
- {reply, Row, State#local{count=Count-1, rows=NewRows}}
- end;
+handle_call({add_change, Row}, From, State) ->
+ handle_add_change(Row, From, State);
-handle_call(next_change, From, #remote{count=0}=State) ->
- if State#remote.complete ->
- {stop, normal, complete, State};
- true ->
- {noreply, State#remote{reply_to=From}}
- end;
-handle_call(next_change, _From, #remote{}=State) ->
- #remote{
- reqid = Id,
- complete = Complete,
- count = Count,
- rows = Rows
+handle_call(next_changes, From, State) ->
+ handle_next_changes(From, State);
+
+handle_call(stop, _From, State) ->
+ #state{
+ changes_loop = ChangesPid,
+ conn = Conn
} = State,
- ok = maybe_stream_next(Complete, Count, Id),
- {{value, Row}, NewRows} = queue:out(Rows),
- {reply, Row, State#remote{count=Count-1, rows=NewRows}};
-
-handle_call(stop, _From, #local{changes_pid=ChangesPid} = State) ->
- exit(ChangesPid, stop),
- {stop, normal, ok, State};
-
-handle_call(stop, _From, #remote{conn=Conn} = State) ->
- catch ibrowse:stop_worker_process(Conn),
+ if is_pid(ChangesPid) -> exit(ChangesPid, stop); true -> ok end,
+ if is_pid(Conn) -> catch ibrowse:stop_worker_process(Conn); true -> ok end,
{stop, normal, ok, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info({ibrowse_async_headers, Id, "200", _}, #remote{reqid=Id}=State) ->
- #remote{
- complete = Complete,
- count = Count
- } = State,
- ?LOG_DEBUG("~p reqid ~p ibrowse_async_headers 200", [?MODULE, Id]),
- ok = maybe_stream_next(Complete, Count, Id),
- {noreply, State};
-handle_info({ibrowse_async_headers, Id, "301", Hdrs}, #remote{reqid=Id}=State) ->
- ?LOG_DEBUG("~p reqid ~p ibrowse_async_headers 301", [?MODULE, Id]),
- catch ibrowse:stop_worker_process(State#remote.conn),
- Url = mochiweb_headers:get_value("Location", mochiweb_headers:make(Hdrs)),
- {Pid, ReqId} = start_http_request(Url),
- {noreply, State#remote{conn=Pid, reqid=ReqId}};
-handle_info({ibrowse_async_headers, Id, Code, Hdrs}, #remote{reqid=Id}=State) ->
- ?LOG_ERROR("replicator changes feed failed with code ~s and Headers ~n~p",
- [Code,Hdrs]),
- {stop, {error, list_to_integer(Code)}, State};
+handle_info({ibrowse_async_headers, Id, Code, Hdrs}, #state{reqid=Id}=State) ->
+ handle_headers(list_to_integer(Code), Hdrs, State);
-handle_info({ibrowse_async_response, _, {error, Reason}}, State) ->
- {stop, {error, Reason}, State};
-handle_info({ibrowse_async_response, Id, Msg}, #remote{reqid=Id} = State) ->
- ?LOG_DEBUG("~p reqid ~p ibrowse_async_response ~p", [?MODULE, Id, Msg]),
- {noreply, process_response(Msg, Id, State)};
-
-handle_info({ibrowse_async_response_end, Id}, #remote{reqid=Id} = State) ->
- ?LOG_DEBUG("got ibrowse_async_response_end ~p", [State#remote.reply_to]),
- case State of
- #remote{reply_to=nil} ->
- {noreply, State#remote{complete=true}};
- #remote{count=0, reply_to=From}->
- gen_server:reply(From, complete),
- {stop, normal, State}
- end;
+handle_info({ibrowse_async_response, Id, Msg}, #state{reqid=Id} = State) ->
+ handle_response(Msg, State);
-handle_info({'EXIT', From, normal}, #local{changes_pid=From} = State) ->
- if State#local.reply_to =/= nil ->
- gen_server:reply(State#local.reply_to, complete),
- {stop, normal, State};
- true ->
- {noreply, State#local{complete=true}}
- end;
-handle_info({'EXIT', From, Reason}, #local{changes_pid=From} = State) ->
- ?LOG_ERROR("changes_pid died with reason ~p", [Reason]),
- {stop, changes_pid_died, State};
+handle_info({ibrowse_async_response_end, Id}, #state{reqid=Id} = State) ->
+ handle_feed_completion(State);
+
+handle_info({'EXIT', From, normal}, #state{changes_loop=From} = State) ->
+ handle_feed_completion(State);
+
+handle_info({'EXIT', From, Reason}, #state{changes_loop=From} = State) ->
+ ?LOG_ERROR("changes_loop died with reason ~p", [Reason]),
+ {stop, changes_loop_died, State};
handle_info(Msg, State) ->
?LOG_INFO("unexpected message ~p", [Msg]),
{noreply, State}.
-terminate(_Reason, #remote{conn=Pid}) when is_pid(Pid) ->
+terminate(_Reason, #state{conn=Pid}) when is_pid(Pid) ->
catch ibrowse:stop_worker_process(Pid),
ok;
terminate(_Reason, _State) ->
@@ -231,66 +151,141 @@ code_change(_OldVsn, State, _Extra) ->
%internal funs
-process_response(<<"{\"results\":[\n">>, Id, State) ->
- #remote{
- complete = Complete,
- count = Count
+handle_add_change(Row, From, #state{reply_to=nil} = State) ->
+ #state{
+ count = Count,
+ rows = Rows
} = State,
- ok = maybe_stream_next(Complete, Count, Id),
- State;
-process_response(<<"\n],\n\"last_seq\":", LastSeqStr/binary>>, _, State) ->
+ NewState = State#state{count=Count+1, rows=queue:in(Row,Rows)},
+ if Count < ?BUFFER_SIZE ->
+ {reply, ok, NewState};
+ true ->
+ {noreply, NewState#state{changes_from=From}}
+ end;
+handle_add_change(Row, _From, #state{count=0} = State) ->
+ gen_server:reply(State#state.reply_to, [Row]),
+ {reply, ok, State#state{reply_to=nil}}.
+
+handle_next_changes(From, #state{count=0}=State) ->
+ if State#state.complete ->
+ {stop, normal, complete, State};
+ true ->
+ {noreply, State#state{reply_to=From}}
+ end;
+handle_next_changes(_From, State) ->
+ #state{
+ changes_from = ChangesFrom,
+ rows = Rows
+ } = State,
+ NewState = State#state{count=0, changes_from=nil, rows=queue:new()},
+ ok = maybe_stream_next(NewState),
+ if ChangesFrom =/= nil -> gen_server:reply(ChangesFrom, ok); true -> ok end,
+ {reply, queue:to_list(Rows), NewState}.
+
+handle_headers(200, _, State) ->
+ ok = maybe_stream_next(State),
+ {noreply, State};
+handle_headers(301, Hdrs, State) ->
+ catch ibrowse:stop_worker_process(State#state.conn),
+ Url = mochiweb_headers:get_value("Location", mochiweb_headers:make(Hdrs)),
+ %% TODO use couch_httpc:request instead of start_http_request
+ {Pid, ReqId} = start_http_request(Url),
+ {noreply, State#state{conn=Pid, reqid=ReqId}};
+handle_headers(Code, Hdrs, State) ->
+ ?LOG_ERROR("replicator changes feed failed with code ~s and Headers ~n~p",
+ [Code,Hdrs]),
+ {stop, {error, Code}, State}.
+
+handle_response({error, Reason}, State) ->
+ {stop, {error, Reason}, State};
+handle_response(<<"\n">>, State) ->
+ ?LOG_DEBUG("got a heartbeat from the remote server", []),
+ {noreply, State};
+handle_response(<<"{\"results\":[\n">>, State) ->
+ ok = maybe_stream_next(State),
+ {noreply, State};
+handle_response(<<"\n],\n\"last_seq\":", LastSeqStr/binary>>, State) ->
LastSeq = list_to_integer(?b2l(hd(re:split(LastSeqStr, "}")))),
- State#remote{last_seq = LastSeq};
-process_response(Chunk, Id, #remote{partial_chunk=nil} = State) ->
- #remote{
- complete = Complete,
+ {noreply, State#state{last_seq = LastSeq}};
+handle_response(Chunk, #state{partial_chunk=nil} = State) ->
+ #state{
count = Count,
rows = Rows
} = State,
+ ok = maybe_stream_next(State),
try
Row = decode_row(Chunk),
- ok = maybe_stream_next(Complete, Count+1, Id),
case State of
- #remote{reply_to=nil} ->
- State#remote{count=Count+1, rows = queue:in(Row, Rows)};
- #remote{count=0, reply_to=From}->
- gen_server:reply(From, Row),
- State#remote{reply_to=nil}
+ #state{reply_to=nil} ->
+ {noreply, State#state{count=Count+1, rows = queue:in(Row, Rows)}};
+ #state{count=0, reply_to=From}->
+ gen_server:reply(From, [Row]),
+ {noreply, State#state{reply_to=nil}}
end
catch
throw:{invalid_json, Bad} ->
- ?LOG_DEBUG("got invalid_json ~p", [Bad]),
- ok = maybe_stream_next(Complete, Count, Id),
- State#remote{partial_chunk = Bad}
+ {noreply, State#state{partial_chunk = Bad}}
end;
-process_response(Chunk, Id, State) ->
- #remote{
- complete = Complete,
+handle_response(Chunk, State) ->
+ #state{
count = Count,
partial_chunk = Partial,
rows = Rows
} = State,
+ ok = maybe_stream_next(State),
try
Row = decode_row(<<Partial/binary, Chunk/binary>>),
- ok = maybe_stream_next(Complete, Count+1, Id),
- case State of
- #remote{reply_to=nil} ->
- State#remote{count=Count+1, partial_chunk=nil, rows=queue:in(Row,Rows)};
- #remote{count=0, reply_to=From}->
- gen_server:reply(From, Row),
- State#remote{reply_to=nil, partial_chunk=nil}
- end
+ {noreply, case State of
+ #state{reply_to=nil} ->
+ State#state{count=Count+1, partial_chunk=nil, rows=queue:in(Row,Rows)};
+ #state{count=0, reply_to=From}->
+ gen_server:reply(From, [Row]),
+ State#state{reply_to=nil, partial_chunk=nil}
+ end}
catch
throw:{invalid_json, Bad} ->
- ?LOG_DEBUG("got invalid_json ~p", [Bad]),
- ok = maybe_stream_next(Complete, Count, Id),
- State#remote{partial_chunk = Bad}
+ {noreply, State#state{partial_chunk = Bad}}
end.
-
+
+handle_feed_completion(#state{reply_to=nil} = State)->
+ {noreply, State#state{complete=true}};
+handle_feed_completion(#state{count=0} = State) ->
+ gen_server:reply(State#state.reply_to, complete),
+ {stop, normal, State}.
+
+by_seq_loop(Server, Source, StartSeq) ->
+ Req = Source#http_db{
+ resource = "_all_docs_by_seq",
+ qs = [{limit, 1000}, {startkey, StartSeq}]
+ },
+ {Results} = couch_rep_httpc:request(Req),
+ if Results =:= [] -> exit(normal); true -> ok end,
+ EndSeq = lists:foldl(fun({RowInfoList}, _) ->
+ Id = proplists:get_value(<<"id">>, RowInfoList),
+ Seq = proplists:get_value(<<"key">>, RowInfoList),
+ {RowProps} = proplists:get_value(<<"value">>, RowInfoList),
+ RawRevs = [
+ proplists:get_value(<<"rev">>, RowProps),
+ proplists:get_value(<<"conflicts">>, RowProps, []),
+ proplists:get_value(<<"deleted_conflicts">>, RowProps, [])
+ ],
+ ParsedRevs = couch_doc:parse_revs(lists:flatten(RawRevs)),
+ Change = {[
+ {<<"seq">>, Seq},
+ {<<"Id">>, Id},
+ {<<"changes">>, [{[{<<"rev">>,R}]} || R <- ParsedRevs]}
+ ]},
+ gen_server:call(Server, {add_change, Change}),
+ Seq
+ end, 0, proplists:get_value(<<"rows">>, Results)),
+ by_seq_loop(Server, Source, EndSeq+1).
+
decode_row(<<",\n", Rest/binary>>) ->
decode_row(Rest);
decode_row(Row) ->
- ?JSON_DECODE(Row).
+ {[Seq, Id, {<<"changes">>,C}]} = ?JSON_DECODE(Row),
+ C2 = [{[{<<"rev">>,couch_doc:parse_rev(R)}]} || {[{<<"rev">>,R}]} <- C],
+ {[Seq, Id, {<<"changes">>,C2}]}.
flush_updated_messages() ->
receive updated -> flush_updated_messages()
@@ -304,27 +299,28 @@ local_update_notification(Self, DbName, {deleted, DbName}) ->
local_update_notification(_, _, _) ->
ok.
-maybe_stream_next(false, Count, Id) when Count < ?MIN_BUFFER_SIZE ->
- ?LOG_DEBUG("~p reqid ~p streaming next chunk", [?MODULE, Id]),
- ibrowse:stream_next(Id);
-maybe_stream_next(_Complete, _Count, Id) ->
- ?LOG_DEBUG("~p reqid ~p not streaming", [?MODULE, Id]),
+maybe_stream_next(#state{reqid=nil}) ->
+ ok;
+maybe_stream_next(#state{complete=false, count=N} = S) when N < ?BUFFER_SIZE ->
+ ibrowse:stream_next(S#state.reqid);
+maybe_stream_next(_) ->
ok.
-send_local_changes_forever(Server, DbName, Since) ->
+send_local_changes_forever(Server, Db, Since) ->
+ #db{name = DbName, user_ctx = UserCtx} = Db,
Self = self(),
{ok, _} = couch_db_update_notifier:start_link(
fun(Msg) -> local_update_notification(Self, DbName, Msg) end),
- {ok, NewSeq} = send_local_changes_once(Server, DbName, Since),
+ {ok, NewSeq} = send_local_changes_once(Server, Db, Since),
+ couch_db:close(Db),
ok = wait_db_updated(),
- send_local_changes_forever(Server, DbName, NewSeq).
-
-send_local_changes_once(Server, DbName, Since) ->
- {ok, Db} = couch_db:open(DbName, []),
+ {ok, NewDb} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
+ send_local_changes_forever(Server, NewDb, NewSeq).
+send_local_changes_once(Server, Db, Since) ->
FilterFun =
fun(#doc_info{revs=[#rev_info{rev=Rev}|_]}) ->
- {[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}
+ {[{<<"rev">>, Rev}]}
end,
ChangesFun =
@@ -333,9 +329,8 @@ send_local_changes_once(Server, DbName, Since) ->
Results = [Result || Result <- Results0, Result /= null],
if Results /= [] ->
Change = {[{<<"seq">>,Seq}, {<<"id">>,Id}, {<<"changes">>,Results}]},
- gen_server:call(Server, {add, Change}, infinity);
+ gen_server:call(Server, {add_change, Change}, infinity);
true ->
- ?LOG_DEBUG("Results was empty ~p", [Results0]),
ok
end,
{ok, Seq}