diff options
author | Adam Kocoloski <adam@cloudant.com> | 2010-10-25 15:46:05 -0400 |
---|---|---|
committer | Adam Kocoloski <adam@cloudant.com> | 2010-10-25 21:45:32 -0400 |
commit | ebac05f686b56791511cb9b599dfb5a742dcfc96 (patch) | |
tree | 00a789cd058f98fa014d927f094f9e6e9f91f6f2 /apps/mem3/src/mem3_rep.erl | |
parent | 952a85381ff4b5b34426000b1dee73c9e74becdd (diff) |
use get-deps to pull down individual cloudant projects
Diffstat (limited to 'apps/mem3/src/mem3_rep.erl')
-rw-r--r-- | apps/mem3/src/mem3_rep.erl | 106 |
1 files changed, 0 insertions, 106 deletions
diff --git a/apps/mem3/src/mem3_rep.erl b/apps/mem3/src/mem3_rep.erl deleted file mode 100644 index f80eeb3d..00000000 --- a/apps/mem3/src/mem3_rep.erl +++ /dev/null @@ -1,106 +0,0 @@ --module(mem3_rep). - --export([go/2, changes_enumerator/2, make_local_id/2]). - --include("mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - --define(CTX, #user_ctx{roles = [<<"_admin">>]}). - --record(acc, {revcount = 0, infos = [], seq, localid, source, target}). - -go(#shard{} = Source, #shard{} = Target) -> - LocalId = make_local_id(Source, Target), - {ok, Db} = couch_db:open(Source#shard.name, [{user_ctx,?CTX}]), - try go(Db, Target, LocalId) after couch_db:close(Db) end. - -go(#db{} = Db, #shard{} = Target, LocalId) -> - Seq = calculate_start_seq(Db, Target, LocalId), - Acc0 = #acc{source=Db, target=Target, seq=Seq, localid=LocalId}, - Fun = fun ?MODULE:changes_enumerator/2, - {ok, AccOut} = couch_db:changes_since(Db, all_docs, Seq, Fun, [], Acc0), - {ok, _} = replicate_batch(AccOut). - -make_local_id(#shard{node=SourceNode}, #shard{node=TargetNode}) -> - S = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(SourceNode))), - T = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(TargetNode))), - <<"_local/shard-sync-", S/binary, "-", T/binary>>. - -changes_enumerator(DocInfo, #acc{revcount = C} = Acc) when C >= 99 -> - Seq = DocInfo#doc_info.high_seq, - replicate_batch(Acc#acc{seq = Seq, infos = [DocInfo | Acc#acc.infos]}); - -changes_enumerator(DocInfo, #acc{revcount = C, infos = Infos} = Acc) -> - RevCount = C + length(DocInfo#doc_info.revs), - Seq = DocInfo#doc_info.high_seq, - {ok, Acc#acc{seq = Seq, revcount = RevCount, infos = [DocInfo | Infos]}}. - -replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) -> - case find_missing_revs(Acc) of - [] -> - ok; - Missing -> - ok = save_on_target(Node, Name, open_docs(Acc#acc.source, Missing)) - end, - update_locals(Acc), - {ok, Acc#acc{revcount=0, infos=[]}}. - -find_missing_revs(Acc) -> - #acc{target = #shard{node=Node, name=Name}, infos = Infos} = Acc, - IdsRevs = lists:map(fun(#doc_info{id=Id, revs=RevInfos}) -> - {Id, [R || #rev_info{rev=R} <- RevInfos]} - end, Infos), - rexi_call(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs]}). - -open_docs(Db, Missing) -> - lists:flatmap(fun({Id, Revs, _}) -> - {ok, Docs} = couch_db:open_doc_revs(Db, Id, Revs, [latest]), - [Doc || {ok, Doc} <- Docs] - end, Missing). - -save_on_target(Node, Name, Docs) -> - Options = [replicated_changes, full_commit, {user_ctx, ?CTX}], - rexi_call(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}), - ok. - -update_locals(Acc) -> - #acc{seq=Seq, source=Db, target=Target, localid=Id} = Acc, - #shard{name=Name, node=Node} = Target, - Doc = #doc{id = Id, body = {[ - {<<"seq">>, Seq}, - {<<"timestamp">>, list_to_binary(iso8601_timestamp())} - ]}}, - {ok, _} = couch_db:update_doc(Db, Doc, []), - Options = [{user_ctx, ?CTX}], - rexi_call(Node, {fabric_rpc, update_docs, [Name, [Doc], Options]}). - -rexi_call(Node, MFA) -> - Ref = rexi:cast(Node, MFA), - receive {Ref, {ok, Reply}} -> - Reply; - {Ref, Error} -> - erlang:error(Error) - after 60000 -> - erlang:error(timeout) - end. - -calculate_start_seq(Db, #shard{node=Node, name=Name}, LocalId) -> - case couch_db:open_doc(Db, LocalId, []) of - {ok, #doc{body = {SProps}}} -> - try rexi_call(Node, {fabric_rpc, open_doc, [Name, LocalId, []]}) of - #doc{body = {TProps}} -> - SourceSeq = couch_util:get_value(<<"seq">>, SProps, 0), - TargetSeq = couch_util:get_value(<<"seq">>, TProps, 0), - erlang:min(SourceSeq, TargetSeq) - catch error:_ -> - 0 - end; - _ -> - 0 - end. - -iso8601_timestamp() -> - {_,_,Micro} = Now = os:timestamp(), - {{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now), - Format = "~4.10.0B-~2.10.0B-~2.10.0BT~2.10.0B:~2.10.0B:~2.10.0B.~6.10.0BZ", - io_lib:format(Format, [Year, Month, Date, Hour, Minute, Second, Micro]). |