8f2508f5acd88cf80c346db4474aa43fd5306229
[cloudant_bigcouch.git] / apps / couch / src / couch_rep.erl
1 % Licensed under the Apache License, Version 2.0 (the "License"); you may not
2 % use this file except in compliance with the License. You may obtain a copy of
3 % the License at
4 %
5 %   http://www.apache.org/licenses/LICENSE-2.0
6 %
7 % Unless required by applicable law or agreed to in writing, software
8 % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9 % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10 % License for the specific language governing permissions and limitations under
11 % the License.
12
13 -module(couch_rep).
14 -behaviour(gen_server).
15 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
16     code_change/3]).
17
18 -export([replicate/2, replicate/3, checkpoint/1]).
19 -export([make_replication_id/2]).
20 -export([start_replication/4, end_replication/1, get_result/4]).
21
22 -include("couch_db.hrl").
23 -include("couch_js_functions.hrl").
24 -include_lib("ibrowse/include/ibrowse.hrl").
25
26 -define(REP_ID_VERSION, 2).
27
28 -record(state, {
29     changes_feed,
30     missing_revs,
31     reader,
32     writer,
33
34     source,
35     target,
36     continuous,
37     create_target,
38     init_args,
39     checkpoint_scheduled = nil,
40
41     start_seq,
42     history,
43     session_id,
44     source_log,
45     target_log,
46     rep_starttime,
47     src_starttime,
48     tgt_starttime,
49     checkpoint_history = nil,
50
51     listeners = [],
52     complete = false,
53     committed_seq = 0,
54
55     stats = nil,
56     source_db_update_notifier = nil,
57     target_db_update_notifier = nil
58 }).
59
60 %% convenience function to do a simple replication from the shell
61 replicate(Source, Target) when is_list(Source) ->
62     replicate(?l2b(Source), Target);
63 replicate(Source, Target) when is_binary(Source), is_list(Target) ->
64     replicate(Source, ?l2b(Target));
65 replicate(Source, Target) when is_binary(Source), is_binary(Target) ->
66     replicate({[{<<"source">>, Source}, {<<"target">>, Target}]}, #user_ctx{});
67
68 %% function handling POST to _replicate
69 replicate(PostBody, UserCtx) ->
70     replicate(PostBody, UserCtx, couch_replication_manager).
71
72 replicate({Props}=PostBody, UserCtx, Module) ->
73     RepId = make_replication_id(PostBody, UserCtx),
74     case couch_util:get_value(<<"cancel">>, Props, false) of
75     true ->
76         end_replication(RepId);
77     false ->
78         Server = start_replication(PostBody, RepId, UserCtx, Module),
79         get_result(Server, RepId, PostBody, UserCtx)
80     end.
81
82 end_replication({BaseId, Extension}) ->
83     RepId = BaseId ++ Extension,
84     case supervisor:terminate_child(couch_rep_sup, RepId) of
85     {error, not_found} = R ->
86         R;
87     ok ->
88         case supervisor:delete_child(couch_rep_sup, RepId) of
89             ok ->
90                 {ok, {cancelled, ?l2b(BaseId)}};
91             {error, not_found} ->
92                 {ok, {cancelled, ?l2b(BaseId)}};
93             {error, _} = Error ->
94                 Error
95         end
96     end.
97
98 start_replication(RepDoc, {BaseId, Extension} = RepId, UserCtx, Module) ->
99     Replicator = {
100         BaseId ++ Extension,
101         {gen_server, start_link,
102             [?MODULE, [RepId, RepDoc, UserCtx, Module], []]},
103         temporary,
104         1,
105         worker,
106         [?MODULE]
107     },
108     start_replication_server(Replicator).
109
110 checkpoint(Server) ->
111     gen_server:cast(Server, do_checkpoint).
112
113 get_result(Server, {BaseId, _Extension}, {Props} = PostBody, UserCtx) ->
114     case couch_util:get_value(<<"continuous">>, Props, false) of
115     true ->
116         {ok, {continuous, ?l2b(BaseId)}};
117     false ->
118         try gen_server:call(Server, get_result, infinity) of
119         retry -> replicate(PostBody, UserCtx);
120         Else -> Else
121         catch
122         exit:{noproc, {gen_server, call, [Server, get_result, infinity]}} ->
123             %% oops, this replication just finished -- restart it.
124             replicate(PostBody, UserCtx);
125         exit:{normal, {gen_server, call, [Server, get_result, infinity]}} ->
126             %% we made the call during terminate
127             replicate(PostBody, UserCtx)
128         end
129     end.
130
131 init(InitArgs) ->
132     try
133         do_init(InitArgs)
134     catch
135     throw:Error ->
136         {stop, Error}
137     end.
138
139 do_init([{BaseId, _Ext} = RepId, {PostProps}, UserCtx, Module] = InitArgs) ->
140     process_flag(trap_exit, true),
141
142     SourceProps = couch_util:get_value(<<"source">>, PostProps),
143     TargetProps = couch_util:get_value(<<"target">>, PostProps),
144
145     Continuous = couch_util:get_value(<<"continuous">>, PostProps, false),
146     CreateTarget = couch_util:get_value(<<"create_target">>, PostProps, false),
147
148     ProxyParams = parse_proxy_params(
149         couch_util:get_value(<<"proxy">>, PostProps, [])),
150     Source = open_db(SourceProps, UserCtx, ProxyParams),
151     Target = open_db(TargetProps, UserCtx, ProxyParams, CreateTarget),
152
153     SourceInfo = dbinfo(Source),
154     TargetInfo = dbinfo(Target),
155
156     [SourceLog, TargetLog] = find_replication_logs(
157         [Source, Target], BaseId, {PostProps}, UserCtx),
158     {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
159
160     {ok, ChangesFeed} =
161         couch_rep_changes_feed:start_link(self(), Source, StartSeq, PostProps),
162     {ok, MissingRevs} =
163         couch_rep_missing_revs:start_link(self(), Target, ChangesFeed, PostProps),
164     {ok, Reader} =
165         couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps),
166     {ok, Writer} =
167     couch_rep_writer:start_link(self(), Target, Reader, PostProps),
168
169     Stats = ets:new(replication_stats, [set, private]),
170     ets:insert(Stats, {total_revs,0}),
171     ets:insert(Stats, {missing_revs, 0}),
172     ets:insert(Stats, {docs_read, 0}),
173     ets:insert(Stats, {docs_written, 0}),
174     ets:insert(Stats, {doc_write_failures, 0}),
175
176     {ShortId, _} = lists:split(6, BaseId),
177     couch_task_status:add_task("Replication", io_lib:format("~s: ~s -> ~s",
178         [ShortId, dbname(Source), dbname(Target)]), "Starting"),
179
180     Module:replication_started(RepId),
181
182     State = #state{
183         changes_feed = ChangesFeed,
184         missing_revs = MissingRevs,
185         reader = Reader,
186         writer = Writer,
187
188         source = Source,
189         target = Target,
190         continuous = Continuous,
191         create_target = CreateTarget,
192         init_args = InitArgs,
193         stats = Stats,
194         checkpoint_scheduled = nil,
195
196         start_seq = StartSeq,
197         history = History,
198         session_id = couch_uuids:random(),
199         source_log = SourceLog,
200         target_log = TargetLog,
201         rep_starttime = httpd_util:rfc1123_date(),
202         src_starttime = couch_util:get_value(instance_start_time, SourceInfo),
203         tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo),
204         source_db_update_notifier = source_db_update_notifier(Source),
205         target_db_update_notifier = target_db_update_notifier(Target)
206     },
207     {ok, State}.
208
209 handle_call(get_result, From, #state{complete=true, listeners=[]} = State) ->
210     {stop, normal, State#state{listeners=[From]}};
211 handle_call(get_result, From, State) ->
212     Listeners = State#state.listeners,
213     {noreply, State#state{listeners=[From|Listeners]}};
214
215 handle_call(get_source_db, _From, #state{source = Source} = State) ->
216     {reply, {ok, Source}, State};
217
218 handle_call(get_target_db, _From, #state{target = Target} = State) ->
219     {reply, {ok, Target}, State}.
220
221 handle_cast(reopen_source_db, #state{source = Source} = State) ->
222     {ok, NewSource} = couch_db:reopen(Source),
223     {noreply, State#state{source = NewSource}};
224
225 handle_cast(reopen_target_db, #state{target = Target} = State) ->
226     {ok, NewTarget} = couch_db:reopen(Target),
227     {noreply, State#state{target = NewTarget}};
228
229 handle_cast(do_checkpoint, State) ->
230     {noreply, do_checkpoint(State)};
231
232 handle_cast(_Msg, State) ->
233     {noreply, State}.
234
235 handle_info({missing_revs_checkpoint, SourceSeq}, State) ->
236     couch_task_status:update("MR Processed source update #~p", [SourceSeq]),
237     {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})};
238
239 handle_info({writer_checkpoint, SourceSeq}, #state{committed_seq=N} = State)
240         when SourceSeq > N ->
241     MissingRevs = State#state.missing_revs,
242     ok = gen_server:cast(MissingRevs, {update_committed_seq, SourceSeq}),
243     couch_task_status:update("W Processed source update #~p", [SourceSeq]),
244     {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})};
245 handle_info({writer_checkpoint, _}, State) ->
246     {noreply, State};
247
248 handle_info({update_stats, Key, N}, State) ->
249     ets:update_counter(State#state.stats, Key, N),
250     {noreply, State};
251
252 handle_info({'DOWN', _, _, _, _}, State) ->
253     ?LOG_INFO("replication terminating because local DB is shutting down", []),
254     timer:cancel(State#state.checkpoint_scheduled),
255     {stop, shutdown, State};
256
257 handle_info({'EXIT', Writer, normal}, #state{writer=Writer} = State) ->
258     case State#state.listeners of
259     [] ->
260         {noreply, State#state{complete = true}};
261     _Else ->
262         {stop, normal, State}
263     end;
264
265 handle_info({'EXIT', _, normal}, State) ->
266     {noreply, State};
267 handle_info({'EXIT', _Pid, {Err, Reason}}, State) when Err == source_error;
268         Err == target_error ->
269     ?LOG_INFO("replication terminating due to ~p: ~p", [Err, Reason]),
270     timer:cancel(State#state.checkpoint_scheduled),
271     {stop, shutdown, State};
272 handle_info({'EXIT', _Pid, Reason}, State) ->
273     {stop, Reason, State}.
274
275 terminate(normal, #state{checkpoint_scheduled=nil, init_args=[RepId, _, _, Module]} = State) ->
276     do_terminate(State),
277     Module:replication_completed(RepId);
278     
279 terminate(normal, #state{init_args=[RepId, _, _, Module]} = State) ->
280     timer:cancel(State#state.checkpoint_scheduled),
281     do_terminate(do_checkpoint(State)),
282     Module:replication_completed(RepId);
283
284 terminate(shutdown, #state{listeners = Listeners} = State) ->
285     % continuous replication stopped
286     [gen_server:reply(L, {ok, stopped}) || L <- Listeners],
287     terminate_cleanup(State);
288
289 terminate(Reason, #state{listeners = Listeners, init_args=[RepId, _, _, Module]} = State) ->
290     [gen_server:reply(L, {error, Reason}) || L <- Listeners],
291     terminate_cleanup(State),
292     Module:replication_error(RepId, Reason).
293
294 code_change(_OldVsn, State, _Extra) ->
295     {ok, State}.
296
297 % internal funs
298
299 start_replication_server(Replicator) ->
300     RepId = element(1, Replicator),
301     case supervisor:start_child(couch_rep_sup, Replicator) of
302     {ok, Pid} ->
303         ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]),
304         Pid;
305     {error, already_present} ->
306         case supervisor:restart_child(couch_rep_sup, RepId) of
307         {ok, Pid} ->
308             ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]),
309             Pid;
310         {error, running} ->
311             %% this error occurs if multiple replicators are racing
312             %% each other to start and somebody else won.  Just grab
313             %% the Pid by calling start_child again.
314             {error, {already_started, Pid}} =
315                 supervisor:start_child(couch_rep_sup, Replicator),
316             ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]),
317             Pid;
318         {error, {db_not_found, DbUrl}} ->
319             throw({db_not_found, <<"could not open ", DbUrl/binary>>});
320         {error, {unauthorized, DbUrl}} ->
321             throw({unauthorized,
322                 <<"unauthorized to access or create database ", DbUrl/binary>>});
323         {error, {'EXIT', {badarg,
324             [{erlang, apply, [gen_server, start_link, undefined]} | _]}}} ->
325             % Clause to deal with a change in the supervisor module introduced
326             % in R14B02. For more details consult the thread at:
327             %     http://erlang.org/pipermail/erlang-bugs/2011-March/002273.html
328             _ = supervisor:delete_child(couch_rep_sup, RepId),
329             start_replication_server(Replicator)
330         end;
331     {error, {already_started, Pid}} ->
332         ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]),
333         Pid;
334     {error, {{db_not_found, DbUrl}, _}} ->
335         throw({db_not_found, <<"could not open ", DbUrl/binary>>});
336     {error, {{unauthorized, DbUrl}, _}} ->
337         throw({unauthorized,
338             <<"unauthorized to access or create database ", DbUrl/binary>>})
339     end.
340
341 compare_replication_logs(SrcDoc, TgtDoc) ->
342     #doc{body={RepRecProps}} = SrcDoc,
343     #doc{body={RepRecPropsTgt}} = TgtDoc,
344     case couch_util:get_value(<<"session_id">>, RepRecProps) ==
345             couch_util:get_value(<<"session_id">>, RepRecPropsTgt) of
346     true ->
347         % if the records have the same session id,
348         % then we have a valid replication history
349         OldSeqNum = couch_util:get_value(<<"source_last_seq">>, RepRecProps, 0),
350         OldHistory = couch_util:get_value(<<"history">>, RepRecProps, []),
351         {OldSeqNum, OldHistory};
352     false ->
353         SourceHistory = couch_util:get_value(<<"history">>, RepRecProps, []),
354         TargetHistory = couch_util:get_value(<<"history">>, RepRecPropsTgt, []),
355         ?LOG_INFO("Replication records differ. "
356                 "Scanning histories to find a common ancestor.", []),
357         ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
358                 [RepRecProps, RepRecPropsTgt]),
359         compare_rep_history(SourceHistory, TargetHistory)
360     end.
361
362 compare_rep_history(S, T) when S =:= [] orelse T =:= [] ->
363     ?LOG_INFO("no common ancestry -- performing full replication", []),
364     {0, []};
365 compare_rep_history([{S}|SourceRest], [{T}|TargetRest]=Target) ->
366     SourceId = couch_util:get_value(<<"session_id">>, S),
367     case has_session_id(SourceId, Target) of
368     true ->
369         RecordSeqNum = couch_util:get_value(<<"recorded_seq">>, S, 0),
370         ?LOG_INFO("found a common replication record with source_seq ~p",
371             [RecordSeqNum]),
372         {RecordSeqNum, SourceRest};
373     false ->
374         TargetId = couch_util:get_value(<<"session_id">>, T),
375         case has_session_id(TargetId, SourceRest) of
376         true ->
377             RecordSeqNum = couch_util:get_value(<<"recorded_seq">>, T, 0),
378             ?LOG_INFO("found a common replication record with source_seq ~p",
379                 [RecordSeqNum]),
380             {RecordSeqNum, TargetRest};
381         false ->
382             compare_rep_history(SourceRest, TargetRest)
383         end
384     end.
385
386 close_db(#http_db{}) ->
387     ok;
388 close_db(Db) ->
389     couch_db:close(Db).
390
391 dbname(#http_db{url = Url}) ->
392     couch_util:url_strip_password(Url);
393 dbname(#db{name = Name}) ->
394     Name.
395
396 dbinfo(#http_db{} = Db) ->
397     {DbProps} = couch_rep_httpc:request(Db),
398     [{couch_util:to_existing_atom(K), V} || {K,V} <- DbProps];
399 dbinfo(Db) ->
400     {ok, Info} = couch_db:get_db_info(Db),
401     Info.
402
403 do_terminate(State) ->
404     #state{
405         checkpoint_history = CheckpointHistory,
406         committed_seq = NewSeq,
407         listeners = Listeners,
408         source = Source,
409         continuous = Continuous,
410         source_log = #doc{body={OldHistory}}
411     } = State,
412     
413     NewRepHistory = case CheckpointHistory of
414     nil ->
415         {[{<<"no_changes">>, true} | OldHistory]};
416     _Else ->
417         CheckpointHistory
418     end,
419
420     %% reply to original requester
421     OtherListeners = case Continuous of
422     true ->
423         []; % continuous replications have no listeners
424     _ ->
425         [Original|Rest] = lists:reverse(Listeners),
426         gen_server:reply(Original, {ok, NewRepHistory}),
427         Rest
428     end,
429
430     %% maybe trigger another replication. If this replicator uses a local
431     %% source Db, changes to that Db since we started will not be included in
432     %% this pass.
433     case up_to_date(Source, NewSeq) of
434         true ->
435             [gen_server:reply(R, {ok, NewRepHistory}) || R <- OtherListeners];
436         false ->
437             [gen_server:reply(R, retry) || R <- OtherListeners]
438     end,
439     couch_task_status:update("Finishing"),
440     terminate_cleanup(State).
441
442 terminate_cleanup(State) ->
443     close_db(State#state.source),
444     close_db(State#state.target),
445     stop_db_update_notifier(State#state.source_db_update_notifier),
446     stop_db_update_notifier(State#state.target_db_update_notifier),
447     ets:delete(State#state.stats).
448
449 stop_db_update_notifier(nil) ->
450     ok;
451 stop_db_update_notifier(Notifier) ->
452     couch_db_update_notifier:stop(Notifier).
453
454 has_session_id(_SessionId, []) ->
455     false;
456 has_session_id(SessionId, [{Props} | Rest]) ->
457     case couch_util:get_value(<<"session_id">>, Props, nil) of
458     SessionId ->
459         true;
460     _Else ->
461         has_session_id(SessionId, Rest)
462     end.
463
464 maybe_append_options(Options, {Props}) ->
465     lists:foldl(fun(Option, Acc) ->
466         Acc ++
467         case couch_util:get_value(Option, Props, false) of
468         true ->
469             "+" ++ ?b2l(Option);
470         false ->
471             ""
472         end
473     end, [], Options).
474
475 make_replication_id(RepProps, UserCtx) ->
476     BaseId = make_replication_id(RepProps, UserCtx, ?REP_ID_VERSION),
477     Extension = maybe_append_options(
478                   [<<"continuous">>, <<"create_target">>], RepProps),
479     {BaseId, Extension}.
480
481 % Versioned clauses for generating replication ids
482 % If a change is made to how replications are identified
483 % add a new clause and increase ?REP_ID_VERSION at the top
484 make_replication_id({Props}, UserCtx, 2) ->
485     {ok, HostName} = inet:gethostname(),
486     Port = case (catch mochiweb_socket_server:get(couch_httpd, port)) of
487     P when is_number(P) ->
488         P;
489     _ ->
490         % On restart we might be called before the couch_httpd process is
491         % started.
492         % TODO: we might be under an SSL socket server only, or both under
493         % SSL and a non-SSL socket.
494         % ... mochiweb_socket_server:get(https, port)
495         list_to_integer(couch_config:get("httpd", "port", "5984"))
496     end,
497     Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)),
498     Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)),
499     maybe_append_filters({Props}, [HostName, Port, Src, Tgt], UserCtx);
500 make_replication_id({Props}, UserCtx, 1) ->
501     {ok, HostName} = inet:gethostname(),
502     Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)),
503     Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)),
504     maybe_append_filters({Props}, [HostName, Src, Tgt], UserCtx).
505
506 maybe_append_filters({Props}, Base, UserCtx) ->
507     Base2 = Base ++ 
508         case couch_util:get_value(<<"filter">>, Props) of
509         undefined ->
510             case couch_util:get_value(<<"doc_ids">>, Props) of
511             undefined ->
512                 [];
513             DocIds ->
514                 [DocIds]
515             end;
516         Filter ->
517             [filter_code(Filter, Props, UserCtx),
518                 couch_util:get_value(<<"query_params">>, Props, {[]})]
519         end,
520     couch_util:to_hex(couch_util:md5(term_to_binary(Base2))).
521
522 filter_code(Filter, Props, UserCtx) ->
523     {DDocName, FilterName} =
524     case re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]) of
525     {match, [DDocName0, FilterName0]} ->
526         {DDocName0, FilterName0};
527     _ ->
528         throw({error, <<"Invalid filter. Must match `ddocname/filtername`.">>})
529     end,
530     ProxyParams = parse_proxy_params(
531         couch_util:get_value(<<"proxy">>, Props, [])),
532     DbName = couch_util:get_value(<<"source">>, Props),
533     Source = try
534         open_db(DbName, UserCtx, ProxyParams)
535     catch
536     _Tag:DbError ->
537         DbErrorMsg = io_lib:format("Could not open source database `~s`: ~s",
538            [couch_util:url_strip_password(DbName), couch_util:to_binary(DbError)]),
539         throw({error, iolist_to_binary(DbErrorMsg)})
540     end,
541     try
542         Body = case (catch open_doc(Source, <<"_design/", DDocName/binary>>)) of
543         {ok, #doc{body = Body0}} ->
544             Body0;
545         DocError ->
546             DocErrorMsg = io_lib:format(
547                 "Couldn't open document `_design/~s` from source "
548                 "database `~s`: ~s",
549                 [DDocName, dbname(Source), couch_util:to_binary(DocError)]),
550             throw({error, iolist_to_binary(DocErrorMsg)})
551         end,
552         Code = couch_util:get_nested_json_value(
553             Body, [<<"filters">>, FilterName]),
554         re:replace(Code, "^\s*(.*?)\s*$", "\\1", [{return, binary}])
555     after
556         close_db(Source)
557     end.
558
559 maybe_add_trailing_slash(Url) ->
560     re:replace(Url, "[^/]$", "&/", [{return, list}]).
561
562 get_rep_endpoint(_UserCtx, {Props}) ->
563     Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)),
564     {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}),
565     {Auth} = couch_util:get_value(<<"auth">>, Props, {[]}),
566     case couch_util:get_value(<<"oauth">>, Auth) of
567     undefined ->
568         {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders]};
569     {OAuth} ->
570         {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], OAuth}
571     end;
572 get_rep_endpoint(_UserCtx, <<"http://",_/binary>>=Url) ->
573     {remote, maybe_add_trailing_slash(Url), []};
574 get_rep_endpoint(_UserCtx, <<"https://",_/binary>>=Url) ->
575     {remote, maybe_add_trailing_slash(Url), []};
576 get_rep_endpoint(UserCtx, <<DbName/binary>>) ->
577     {local, DbName, UserCtx}.
578
579 find_replication_logs(DbList, RepId, RepProps, UserCtx) ->
580     LogId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
581     fold_replication_logs(DbList, ?REP_ID_VERSION,
582         LogId, LogId, RepProps, UserCtx, []).
583
584 % Accumulate the replication logs
585 % Falls back to older log document ids and migrates them
586 fold_replication_logs([], _Vsn, _LogId, _NewId, _RepProps, _UserCtx, Acc) ->
587     lists:reverse(Acc);
588 fold_replication_logs([Db|Rest]=Dbs, Vsn, LogId, NewId,
589         RepProps, UserCtx, Acc) ->
590     case open_replication_log(Db, LogId) of
591     {error, not_found} when Vsn > 1 ->
592         OldRepId = make_replication_id(RepProps, UserCtx, Vsn - 1),
593         fold_replication_logs(Dbs, Vsn - 1,
594             ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, RepProps, UserCtx, Acc);
595     {error, not_found} ->
596         fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId,
597             RepProps, UserCtx, [#doc{id=NewId}|Acc]);
598     {ok, Doc} when LogId =:= NewId ->
599         fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId,
600             RepProps, UserCtx, [Doc|Acc]);
601     {ok, Doc} ->
602         MigratedLog = #doc{id=NewId,body=Doc#doc.body},
603         fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId,
604             RepProps, UserCtx, [MigratedLog|Acc])
605     end.
606
607 open_replication_log(Db, DocId) ->
608     case open_doc(Db, DocId) of
609     {ok, Doc} ->
610         ?LOG_DEBUG("found a replication log for ~s", [dbname(Db)]),
611         {ok, Doc};
612     _ ->
613         ?LOG_DEBUG("didn't find a replication log for ~s", [dbname(Db)]),
614         {error, not_found}
615     end.
616
617 open_doc(#http_db{} = Db, DocId) ->
618     Req = Db#http_db{resource = couch_util:encode_doc_id(DocId)},
619     case couch_rep_httpc:request(Req) of
620     {[{<<"error">>, _}, {<<"reason">>, _}]} ->
621         {error, not_found};
622     Doc ->
623         {ok, couch_doc:from_json_obj(Doc)}
624     end;
625 open_doc(Db, DocId) ->
626     couch_db:open_doc(Db, DocId).
627
628 open_db(Props, UserCtx, ProxyParams) ->
629     open_db(Props, UserCtx, ProxyParams, false).
630
631 open_db({Props}, _UserCtx, ProxyParams, CreateTarget) ->
632     Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)),
633     {AuthProps} = couch_util:get_value(<<"auth">>, Props, {[]}),
634     {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}),
635     Headers = [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders],
636     DefaultHeaders = (#http_db{})#http_db.headers,
637     Db1 = #http_db{
638         url = Url,
639         auth = AuthProps,
640         headers = lists:ukeymerge(1, Headers, DefaultHeaders)
641     },
642     Db = Db1#http_db{
643         options = Db1#http_db.options ++ ProxyParams ++
644             couch_rep_httpc:ssl_options(Db1)
645     },
646     couch_rep_httpc:db_exists(Db, CreateTarget);
647 open_db(<<"http://",_/binary>>=Url, _, ProxyParams, CreateTarget) ->
648     open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget);
649 open_db(<<"https://",_/binary>>=Url, _, ProxyParams, CreateTarget) ->
650     open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget);
651 open_db(<<DbName/binary>>, UserCtx, _ProxyParams, CreateTarget) ->
652     try
653         case CreateTarget of
654         true ->
655             ok = couch_httpd:verify_is_server_admin(UserCtx),
656             couch_server:create(DbName, [{user_ctx, UserCtx}]);
657         false ->
658             ok
659         end,
660
661         case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
662         {ok, Db} ->
663             couch_db:monitor(Db),
664             Db;
665         {not_found, no_db_file} ->
666             throw({db_not_found, DbName})
667         end
668     catch throw:{unauthorized, _} ->
669         throw({unauthorized, DbName})
670     end.
671
672 schedule_checkpoint(#state{checkpoint_scheduled = nil} = State) ->
673     Server = self(),
674     case timer:apply_after(5000, couch_rep, checkpoint, [Server]) of
675     {ok, TRef} ->
676         State#state{checkpoint_scheduled = TRef};
677     Error ->
678         ?LOG_ERROR("tried to schedule a checkpoint but got ~p", [Error]),
679         State
680     end;
681 schedule_checkpoint(State) ->
682     State.
683
684 do_checkpoint(State) ->
685     #state{
686         source = Source,
687         target = Target,
688         committed_seq = NewSeqNum,
689         start_seq = StartSeqNum,
690         history = OldHistory,
691         session_id = SessionId,
692         source_log = SourceLog,
693         target_log = TargetLog,
694         rep_starttime = ReplicationStartTime,
695         src_starttime = SrcInstanceStartTime,
696         tgt_starttime = TgtInstanceStartTime,
697         stats = Stats,
698         init_args = [_RepId, {RepDoc} | _]
699     } = State,
700     case commit_to_both(Source, Target, NewSeqNum) of
701     {SrcInstanceStartTime, TgtInstanceStartTime} ->
702         ?LOG_INFO("recording a checkpoint for ~s -> ~s at source update_seq ~p",
703             [dbname(Source), dbname(Target), NewSeqNum]),
704         EndTime = ?l2b(httpd_util:rfc1123_date()),
705         StartTime = ?l2b(ReplicationStartTime),
706         DocsRead = ets:lookup_element(Stats, docs_read, 2),
707         DocsWritten = ets:lookup_element(Stats, docs_written, 2),
708         DocWriteFailures = ets:lookup_element(Stats, doc_write_failures, 2),
709         NewHistoryEntry = {[
710             {<<"session_id">>, SessionId},
711             {<<"start_time">>, StartTime},
712             {<<"end_time">>, EndTime},
713             {<<"start_last_seq">>, StartSeqNum},
714             {<<"end_last_seq">>, NewSeqNum},
715             {<<"recorded_seq">>, NewSeqNum},
716             {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
717             {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
718             {<<"docs_read">>, DocsRead},
719             {<<"docs_written">>, DocsWritten},
720             {<<"doc_write_failures">>, DocWriteFailures}
721         ]},
722         BaseHistory = [
723             {<<"session_id">>, SessionId},
724             {<<"source_last_seq">>, NewSeqNum},
725             {<<"replication_id_version">>, ?REP_ID_VERSION}
726         ] ++ case couch_util:get_value(<<"doc_ids">>, RepDoc) of
727         undefined ->
728             [];
729         DocIds when is_list(DocIds) ->
730             % backwards compatibility with the result of a replication by
731             % doc IDs in versions 0.11.x and 1.0.x
732             [
733                 {<<"start_time">>, StartTime},
734                 {<<"end_time">>, EndTime},
735                 {<<"docs_read">>, DocsRead},
736                 {<<"docs_written">>, DocsWritten},
737                 {<<"doc_write_failures">>, DocWriteFailures}
738             ]
739         end,
740         % limit history to 50 entries
741         NewRepHistory = {
742             BaseHistory ++
743             [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}]
744         },
745
746         try
747         {SrcRevPos,SrcRevId} =
748             update_local_doc(Source, SourceLog#doc{body=NewRepHistory}),
749         {TgtRevPos,TgtRevId} =
750             update_local_doc(Target, TargetLog#doc{body=NewRepHistory}),
751         State#state{
752             checkpoint_scheduled = nil,
753             checkpoint_history = NewRepHistory,
754             source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
755             target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
756         }
757         catch throw:conflict ->
758         ?LOG_ERROR("checkpoint failure: conflict (are you replicating to "
759             "yourself?)", []),
760         State
761         end;
762     _Else ->
763         ?LOG_INFO("rebooting ~s -> ~s from last known replication checkpoint",
764             [dbname(Source), dbname(Target)]),
765         #state{
766             changes_feed = CF,
767             missing_revs = MR,
768             reader = Reader,
769             writer = Writer
770         } = State,
771         Pids = [Writer, Reader, MR, CF],
772         [unlink(Pid) || Pid <- Pids],
773         [exit(Pid, shutdown) || Pid <- Pids],
774         close_db(Target),
775         close_db(Source),
776         {ok, NewState} = init(State#state.init_args),
777         NewState#state{listeners=State#state.listeners}
778     end.
779
780 commit_to_both(Source, Target, RequiredSeq) ->
781     % commit the src async
782     ParentPid = self(),
783     SrcCommitPid = spawn_link(fun() ->
784             ParentPid ! {self(), ensure_full_commit(Source, RequiredSeq)} end),
785
786     % commit tgt sync
787     TargetStartTime = ensure_full_commit(Target),
788
789     SourceStartTime =
790     receive
791     {SrcCommitPid, Timestamp} ->
792         Timestamp;
793     {'EXIT', SrcCommitPid, {http_request_failed, _}} ->
794         exit(replication_link_failure)
795     end,
796     {SourceStartTime, TargetStartTime}.
797     
798 ensure_full_commit(#http_db{headers = Headers} = Target) ->
799     Headers1 = [
800         {"Content-Length", 0} |
801         couch_util:proplist_apply_field(
802             {"Content-Type", "application/json"}, Headers)
803     ],
804     Req = Target#http_db{
805         resource = "_ensure_full_commit",
806         method = post,
807         headers = Headers1
808     },
809     {ResultProps} = couch_rep_httpc:request(Req),
810     true = couch_util:get_value(<<"ok">>, ResultProps),
811     couch_util:get_value(<<"instance_start_time">>, ResultProps);
812 ensure_full_commit(Target) ->
813     {ok, NewDb} = couch_db:open_int(Target#db.name, []),
814     UpdateSeq = couch_db:get_update_seq(Target),
815     CommitSeq = couch_db:get_committed_update_seq(NewDb),
816     InstanceStartTime = NewDb#db.instance_start_time,
817     couch_db:close(NewDb),
818     if UpdateSeq > CommitSeq ->
819         ?LOG_DEBUG("target needs a full commit: update ~p commit ~p",
820             [UpdateSeq, CommitSeq]),
821         {ok, DbStartTime} = couch_db:ensure_full_commit(Target),
822         DbStartTime;
823     true ->
824         ?LOG_DEBUG("target doesn't need a full commit", []),
825         InstanceStartTime
826     end.
827
828 ensure_full_commit(#http_db{headers = Headers} = Source, RequiredSeq) ->
829     Headers1 = [
830         {"Content-Length", 0} |
831         couch_util:proplist_apply_field(
832             {"Content-Type", "application/json"}, Headers)
833     ],
834     Req = Source#http_db{
835         resource = "_ensure_full_commit",
836         method = post,
837         qs = [{seq, iolist_to_binary(?JSON_ENCODE(RequiredSeq))}],
838         headers = Headers1
839     },
840     {ResultProps} = couch_rep_httpc:request(Req),
841     case couch_util:get_value(<<"ok">>, ResultProps) of
842     true ->
843         couch_util:get_value(<<"instance_start_time">>, ResultProps);
844     undefined -> nil end;
845 ensure_full_commit(Source, RequiredSeq) ->
846     {ok, NewDb} = couch_db:open_int(Source#db.name, []),
847     CommitSeq = couch_db:get_committed_update_seq(NewDb),
848     InstanceStartTime = NewDb#db.instance_start_time,
849     couch_db:close(NewDb),
850     if RequiredSeq > CommitSeq ->
851         ?LOG_DEBUG("source needs a full commit: required ~p committed ~p",
852             [RequiredSeq, CommitSeq]),
853         {ok, DbStartTime} = couch_db:ensure_full_commit(Source),
854         DbStartTime;
855     true ->
856         ?LOG_DEBUG("source doesn't need a full commit", []),
857         InstanceStartTime
858     end.
859
860 update_local_doc(#http_db{} = Db, Doc) ->
861     Req = Db#http_db{
862         resource = couch_util:encode_doc_id(Doc),
863         method = put,
864         body = couch_doc:to_json_obj(Doc, [attachments]),
865         headers = [{"x-couch-full-commit", "false"} | Db#http_db.headers]
866     },
867     {ResponseMembers} = couch_rep_httpc:request(Req),
868     Rev = couch_util:get_value(<<"rev">>, ResponseMembers),
869     couch_doc:parse_rev(Rev);
870 update_local_doc(Db, Doc) ->
871     {ok, Result} = couch_db:update_doc(Db, Doc, [delay_commit]),
872     Result.
873
874 up_to_date(#http_db{}, _Seq) ->
875     true;
876 up_to_date(Source, Seq) ->
877     {ok, NewDb} = couch_db:open_int(Source#db.name, []),
878     T = NewDb#db.update_seq == Seq,
879     couch_db:close(NewDb),
880     T.
881
882 parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
883     parse_proxy_params(?b2l(ProxyUrl));
884 parse_proxy_params([]) ->
885     [];
886 parse_proxy_params(ProxyUrl) ->
887     #url{
888         host = Host,
889         port = Port,
890         username = User,
891         password = Passwd
892     } = ibrowse_lib:parse_url(ProxyUrl),
893     [{proxy_host, Host}, {proxy_port, Port}] ++
894         case is_list(User) andalso is_list(Passwd) of
895         false ->
896             [];
897         true ->
898             [{proxy_user, User}, {proxy_password, Passwd}]
899         end.
900
901 source_db_update_notifier(#db{name = DbName}) ->
902     Server = self(),
903     {ok, Notifier} = couch_db_update_notifier:start_link(
904         fun({compacted, DbName1}) when DbName1 =:= DbName ->
905                 ok = gen_server:cast(Server, reopen_source_db);
906             (_) ->
907                 ok
908         end),
909     Notifier;
910 source_db_update_notifier(_) ->
911     nil.
912
913 target_db_update_notifier(#db{name = DbName}) ->
914     Server = self(),
915     {ok, Notifier} = couch_db_update_notifier:start_link(
916         fun({compacted, DbName1}) when DbName1 =:= DbName ->
917                 ok = gen_server:cast(Server, reopen_target_db);
918             (_) ->
919                 ok
920         end),
921     Notifier;
922 target_db_update_notifier(_) ->
923     nil.