summaryrefslogtreecommitdiff
path: root/apps/mem3/src/mem3_rep.erl
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2010-10-25 15:46:05 -0400
committerAdam Kocoloski <adam@cloudant.com>2010-10-25 21:45:32 -0400
commitebac05f686b56791511cb9b599dfb5a742dcfc96 (patch)
tree00a789cd058f98fa014d927f094f9e6e9f91f6f2 /apps/mem3/src/mem3_rep.erl
parent952a85381ff4b5b34426000b1dee73c9e74becdd (diff)
use get-deps to pull down individual cloudant projects
Diffstat (limited to 'apps/mem3/src/mem3_rep.erl')
-rw-r--r--apps/mem3/src/mem3_rep.erl106
1 files changed, 0 insertions, 106 deletions
diff --git a/apps/mem3/src/mem3_rep.erl b/apps/mem3/src/mem3_rep.erl
deleted file mode 100644
index f80eeb3d..00000000
--- a/apps/mem3/src/mem3_rep.erl
+++ /dev/null
@@ -1,106 +0,0 @@
--module(mem3_rep).
-
--export([go/2, changes_enumerator/2, make_local_id/2]).
-
--include("mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
--define(CTX, #user_ctx{roles = [<<"_admin">>]}).
-
--record(acc, {revcount = 0, infos = [], seq, localid, source, target}).
-
-go(#shard{} = Source, #shard{} = Target) ->
- LocalId = make_local_id(Source, Target),
- {ok, Db} = couch_db:open(Source#shard.name, [{user_ctx,?CTX}]),
- try go(Db, Target, LocalId) after couch_db:close(Db) end.
-
-go(#db{} = Db, #shard{} = Target, LocalId) ->
- Seq = calculate_start_seq(Db, Target, LocalId),
- Acc0 = #acc{source=Db, target=Target, seq=Seq, localid=LocalId},
- Fun = fun ?MODULE:changes_enumerator/2,
- {ok, AccOut} = couch_db:changes_since(Db, all_docs, Seq, Fun, [], Acc0),
- {ok, _} = replicate_batch(AccOut).
-
-make_local_id(#shard{node=SourceNode}, #shard{node=TargetNode}) ->
- S = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(SourceNode))),
- T = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(TargetNode))),
- <<"_local/shard-sync-", S/binary, "-", T/binary>>.
-
-changes_enumerator(DocInfo, #acc{revcount = C} = Acc) when C >= 99 ->
- Seq = DocInfo#doc_info.high_seq,
- replicate_batch(Acc#acc{seq = Seq, infos = [DocInfo | Acc#acc.infos]});
-
-changes_enumerator(DocInfo, #acc{revcount = C, infos = Infos} = Acc) ->
- RevCount = C + length(DocInfo#doc_info.revs),
- Seq = DocInfo#doc_info.high_seq,
- {ok, Acc#acc{seq = Seq, revcount = RevCount, infos = [DocInfo | Infos]}}.
-
-replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) ->
- case find_missing_revs(Acc) of
- [] ->
- ok;
- Missing ->
- ok = save_on_target(Node, Name, open_docs(Acc#acc.source, Missing))
- end,
- update_locals(Acc),
- {ok, Acc#acc{revcount=0, infos=[]}}.
-
-find_missing_revs(Acc) ->
- #acc{target = #shard{node=Node, name=Name}, infos = Infos} = Acc,
- IdsRevs = lists:map(fun(#doc_info{id=Id, revs=RevInfos}) ->
- {Id, [R || #rev_info{rev=R} <- RevInfos]}
- end, Infos),
- rexi_call(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs]}).
-
-open_docs(Db, Missing) ->
- lists:flatmap(fun({Id, Revs, _}) ->
- {ok, Docs} = couch_db:open_doc_revs(Db, Id, Revs, [latest]),
- [Doc || {ok, Doc} <- Docs]
- end, Missing).
-
-save_on_target(Node, Name, Docs) ->
- Options = [replicated_changes, full_commit, {user_ctx, ?CTX}],
- rexi_call(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
- ok.
-
-update_locals(Acc) ->
- #acc{seq=Seq, source=Db, target=Target, localid=Id} = Acc,
- #shard{name=Name, node=Node} = Target,
- Doc = #doc{id = Id, body = {[
- {<<"seq">>, Seq},
- {<<"timestamp">>, list_to_binary(iso8601_timestamp())}
- ]}},
- {ok, _} = couch_db:update_doc(Db, Doc, []),
- Options = [{user_ctx, ?CTX}],
- rexi_call(Node, {fabric_rpc, update_docs, [Name, [Doc], Options]}).
-
-rexi_call(Node, MFA) ->
- Ref = rexi:cast(Node, MFA),
- receive {Ref, {ok, Reply}} ->
- Reply;
- {Ref, Error} ->
- erlang:error(Error)
- after 60000 ->
- erlang:error(timeout)
- end.
-
-calculate_start_seq(Db, #shard{node=Node, name=Name}, LocalId) ->
- case couch_db:open_doc(Db, LocalId, []) of
- {ok, #doc{body = {SProps}}} ->
- try rexi_call(Node, {fabric_rpc, open_doc, [Name, LocalId, []]}) of
- #doc{body = {TProps}} ->
- SourceSeq = couch_util:get_value(<<"seq">>, SProps, 0),
- TargetSeq = couch_util:get_value(<<"seq">>, TProps, 0),
- erlang:min(SourceSeq, TargetSeq)
- catch error:_ ->
- 0
- end;
- _ ->
- 0
- end.
-
-iso8601_timestamp() ->
- {_,_,Micro} = Now = os:timestamp(),
- {{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now),
- Format = "~4.10.0B-~2.10.0B-~2.10.0BT~2.10.0B:~2.10.0B:~2.10.0B.~6.10.0BZ",
- io_lib:format(Format, [Year, Month, Date, Hour, Minute, Second, Micro]).