diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/couchdb/couch_rep.erl | 31 | ||||
-rw-r--r-- | src/couchdb/couch_rep_reader.erl | 1 |
2 files changed, 27 insertions, 5 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index 95a23869..9107918c 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -15,7 +15,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([replicate/2]). +-export([replicate/2, checkpoint/1]). -include("couch_db.hrl"). @@ -28,6 +28,7 @@ source, target, init_args, + checkpoint_scheduled = nil, start_seq, history, @@ -73,6 +74,9 @@ replicate({Props}=PostBody, UserCtx) -> get_result(Server, PostBody, UserCtx) end. +checkpoint(Server) -> + gen_server:cast(Server, do_checkpoint). + get_result(Server, PostBody, UserCtx) -> try gen_server:call(Server, get_result, infinity) of retry -> replicate(PostBody, UserCtx); @@ -137,6 +141,7 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> target = Target, init_args = InitArgs, stats = Stats, + checkpoint_scheduled = nil, start_seq = StartSeq, history = History, @@ -154,19 +159,22 @@ handle_call(get_result, From, State) -> Listeners = State#state.listeners, {noreply, State#state{listeners=[From|Listeners]}}. +handle_cast(do_checkpoint, State) -> + {noreply, do_checkpoint(State)}; + handle_cast(_Msg, State) -> {noreply, State}. handle_info({missing_revs_checkpoint, SourceSeq}, State) -> couch_task_status:update("MR Processed source update #~p", [SourceSeq]), - {noreply, do_checkpoint(State#state{committed_seq = SourceSeq})}; + {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})}; handle_info({writer_checkpoint, SourceSeq}, #state{committed_seq=N} = State) when SourceSeq > N -> MissingRevs = State#state.missing_revs, ok = gen_server:cast(MissingRevs, {update_committed_seq, SourceSeq}), couch_task_status:update("W Processed source update #~p", [SourceSeq]), - {noreply, do_checkpoint(State#state{committed_seq = SourceSeq})}; + {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})}; handle_info({writer_checkpoint, _}, State) -> {noreply, State}; @@ -190,6 +198,7 @@ handle_info({'EXIT', Pid, Reason}, State) -> terminate(normal, State) -> #state{ + checkpoint_scheduled = TRef, checkpoint_history = CheckpointHistory, committed_seq = NewSeq, listeners = Listeners, @@ -197,7 +206,8 @@ terminate(normal, State) -> target = Target, stats = Stats, source_log = #doc{body={OldHistory}} - } = State, + } = do_checkpoint(State), + timer:cancel(TRef), couch_task_status:update("Finishing"), ets:delete(Stats), close_db(Target), @@ -414,6 +424,18 @@ open_db(<<DbName/binary>>, UserCtx) -> {not_found, no_db_file} -> throw({db_not_found, DbName}) end. +schedule_checkpoint(#state{checkpoint_scheduled = nil} = State) -> + Server = self(), + case timer:apply_after(5000, couch_rep, checkpoint, [Server]) of + {ok, TRef} -> + State#state{checkpoint_scheduled = TRef}; + Error -> + ?LOG_ERROR("tried to schedule a checkpoint but got ~p", [Error]), + State + end; +schedule_checkpoint(State) -> + State. + do_checkpoint(State) -> #state{ source = Source, @@ -466,6 +488,7 @@ do_checkpoint(State) -> {TgtRevPos,TgtRevId} = update_local_doc(Target, TargetLog#doc{body=NewRepHistory}), State#state{ + checkpoint_scheduled = nil, checkpoint_history = NewRepHistory, source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}}, target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}} diff --git a/src/couchdb/couch_rep_reader.erl b/src/couchdb/couch_rep_reader.erl index 4feb3804..735ab742 100644 --- a/src/couchdb/couch_rep_reader.erl +++ b/src/couchdb/couch_rep_reader.erl @@ -130,7 +130,6 @@ handle_next_docs(_From, State) -> gen_server:reply(ReaderFrom, ok); true -> ok end, NewState = State#state{count=0, reader_from=nil, docs=queue:new()}, - ?LOG_INFO("replying to next_docs with HighSeq ~p", [calculate_new_high_seq(State)]), {reply, {calculate_new_high_seq(State), queue:to_list(Docs)}, NewState}. handle_open_remote_doc(Id, Seq, Revs, From, #state{monitor_count=N} = State) |