diff options
Diffstat (limited to 'apps/fabric/src')
-rw-r--r-- | apps/fabric/src/fabric.erl | 225 | ||||
-rw-r--r-- | apps/fabric/src/fabric_db_create.erl | 65 | ||||
-rw-r--r-- | apps/fabric/src/fabric_db_delete.erl | 41 | ||||
-rw-r--r-- | apps/fabric/src/fabric_db_doc_count.erl | 32 | ||||
-rw-r--r-- | apps/fabric/src/fabric_db_info.erl | 52 | ||||
-rw-r--r-- | apps/fabric/src/fabric_db_meta.erl | 35 | ||||
-rw-r--r-- | apps/fabric/src/fabric_dict.erl | 37 | ||||
-rw-r--r-- | apps/fabric/src/fabric_doc_attachments.erl | 102 | ||||
-rw-r--r-- | apps/fabric/src/fabric_doc_missing_revs.erl | 64 | ||||
-rw-r--r-- | apps/fabric/src/fabric_doc_open.erl | 66 | ||||
-rw-r--r-- | apps/fabric/src/fabric_doc_open_revs.erl | 65 | ||||
-rw-r--r-- | apps/fabric/src/fabric_doc_update.erl | 127 | ||||
-rw-r--r-- | apps/fabric/src/fabric_group_info.erl | 52 | ||||
-rw-r--r-- | apps/fabric/src/fabric_rpc.erl | 388 | ||||
-rw-r--r-- | apps/fabric/src/fabric_util.erl | 89 | ||||
-rw-r--r-- | apps/fabric/src/fabric_view.erl | 218 | ||||
-rw-r--r-- | apps/fabric/src/fabric_view_all_docs.erl | 167 | ||||
-rw-r--r-- | apps/fabric/src/fabric_view_changes.erl | 251 | ||||
-rw-r--r-- | apps/fabric/src/fabric_view_map.erl | 138 | ||||
-rw-r--r-- | apps/fabric/src/fabric_view_reduce.erl | 85 |
20 files changed, 2299 insertions, 0 deletions
diff --git a/apps/fabric/src/fabric.erl b/apps/fabric/src/fabric.erl new file mode 100644 index 00000000..1be97a98 --- /dev/null +++ b/apps/fabric/src/fabric.erl @@ -0,0 +1,225 @@ +-module(fabric). + +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +% DBs +-export([all_dbs/0, all_dbs/1, create_db/1, create_db/2, delete_db/1, + delete_db/2, get_db_info/1, get_doc_count/1, set_revs_limit/3, + set_security/3, get_revs_limit/1, get_security/1]). + +% Documents +-export([open_doc/3, open_revs/4, get_missing_revs/2, update_doc/3, + update_docs/3, att_receiver/2]). + +% Views +-export([all_docs/4, changes/4, query_view/3, query_view/4, query_view/6, + get_view_group_info/2]). + +% miscellany +-export([design_docs/1, reset_validation_funs/1]). + +-include("fabric.hrl"). + +% db operations + +all_dbs() -> + all_dbs(<<>>). + +all_dbs(Prefix) when is_list(Prefix) -> + all_dbs(list_to_binary(Prefix)); +all_dbs(Prefix) when is_binary(Prefix) -> + Length = byte_size(Prefix), + MatchingDbs = ets:foldl(fun(#shard{dbname=DbName}, Acc) -> + case DbName of + <<Prefix:Length/binary, _/binary>> -> + [DbName | Acc]; + _ -> + Acc + end + end, [], partitions), + {ok, lists:usort(MatchingDbs)}. + +get_db_info(DbName) -> + fabric_db_info:go(dbname(DbName)). + +get_doc_count(DbName) -> + fabric_db_doc_count:go(dbname(DbName)). + +create_db(DbName) -> + create_db(DbName, []). + +create_db(DbName, Options) -> + fabric_db_create:go(dbname(DbName), opts(Options)). + +delete_db(DbName) -> + delete_db(DbName, []). + +delete_db(DbName, Options) -> + fabric_db_delete:go(dbname(DbName), opts(Options)). + +set_revs_limit(DbName, Limit, Options) when is_integer(Limit), Limit > 0 -> + fabric_db_meta:set_revs_limit(dbname(DbName), Limit, opts(Options)). + +get_revs_limit(DbName) -> + {ok, Db} = fabric_util:get_db(dbname(DbName)), + try couch_db:get_revs_limit(Db) after catch couch_db:close(Db) end. + +set_security(DbName, SecObj, Options) -> + fabric_db_meta:set_security(dbname(DbName), SecObj, opts(Options)). + +get_security(DbName) -> + {ok, Db} = fabric_util:get_db(dbname(DbName)), + try couch_db:get_security(Db) after catch couch_db:close(Db) end. + +% doc operations +open_doc(DbName, Id, Options) -> + fabric_doc_open:go(dbname(DbName), docid(Id), opts(Options)). + +open_revs(DbName, Id, Revs, Options) -> + fabric_doc_open_revs:go(dbname(DbName), docid(Id), Revs, opts(Options)). + +get_missing_revs(DbName, IdsRevs) when is_list(IdsRevs) -> + Sanitized = [idrevs(IdR) || IdR <- IdsRevs], + fabric_doc_missing_revs:go(dbname(DbName), Sanitized). + +update_doc(DbName, Doc, Options) -> + case update_docs(DbName, [Doc], opts(Options)) of + {ok, [{ok, NewRev}]} -> + {ok, NewRev}; + {ok, [Error]} -> + throw(Error); + {ok, []} -> + % replication success + #doc{revs = {Pos, [RevId | _]}} = doc(Doc), + {ok, {Pos, RevId}} + end. + +update_docs(DbName, Docs, Options) -> + try fabric_doc_update:go(dbname(DbName), docs(Docs), opts(Options)) + catch {aborted, PreCommitFailures} -> + {aborted, PreCommitFailures} + end. + +att_receiver(Req, Length) -> + fabric_doc_attachments:receiver(Req, Length). + +all_docs(DbName, Callback, Acc0, #view_query_args{} = QueryArgs) when + is_function(Callback, 2) -> + fabric_view_all_docs:go(dbname(DbName), QueryArgs, Callback, Acc0). + +changes(DbName, Callback, Acc0, Options) -> + % TODO use a keylist for Options instead of #changes_args, BugzID 10281 + Feed = Options#changes_args.feed, + fabric_view_changes:go(dbname(DbName), Feed, Options, Callback, Acc0). + +query_view(DbName, DesignName, ViewName) -> + query_view(DbName, DesignName, ViewName, #view_query_args{}). + +query_view(DbName, DesignName, ViewName, QueryArgs) -> + Callback = fun default_callback/2, + query_view(DbName, DesignName, ViewName, Callback, [], QueryArgs). + +query_view(DbName, Design, ViewName, Callback, Acc0, QueryArgs) -> + Db = dbname(DbName), View = name(ViewName), + case is_reduce_view(Db, Design, View, QueryArgs) of + true -> + Mod = fabric_view_reduce; + false -> + Mod = fabric_view_map + end, + Mod:go(Db, Design, View, QueryArgs, Callback, Acc0). + +get_view_group_info(DbName, DesignId) -> + fabric_group_info:go(dbname(DbName), design_doc(DesignId)). + +design_docs(DbName) -> + QueryArgs = #view_query_args{start_key = <<"_design/">>, include_docs=true}, + Callback = fun({total_and_offset, _, _}, []) -> + {ok, []}; + ({row, {Props}}, Acc) -> + case couch_util:get_value(id, Props) of + <<"_design/", _/binary>> -> + {ok, [couch_util:get_value(doc, Props) | Acc]}; + _ -> + {stop, Acc} + end; + (complete, Acc) -> + {ok, lists:reverse(Acc)} + end, + fabric:all_docs(dbname(DbName), Callback, [], QueryArgs). + +reset_validation_funs(DbName) -> + [rexi:cast(Node, {fabric_rpc, reset_validation_funs, [Name]}) || + #shard{node=Node, name=Name} <- mem3:shards(DbName)]. + +%% some simple type validation and transcoding + +dbname(DbName) when is_list(DbName) -> + list_to_binary(DbName); +dbname(DbName) when is_binary(DbName) -> + DbName; +dbname(#db{name=Name}) -> + Name; +dbname(DbName) -> + erlang:error({illegal_database_name, DbName}). + +name(Thing) -> + couch_util:to_binary(Thing). + +docid(DocId) when is_list(DocId) -> + list_to_binary(DocId); +docid(DocId) when is_binary(DocId) -> + DocId; +docid(DocId) -> + erlang:error({illegal_docid, DocId}). + +docs(Docs) when is_list(Docs) -> + [doc(D) || D <- Docs]; +docs(Docs) -> + erlang:error({illegal_docs_list, Docs}). + +doc(#doc{} = Doc) -> + Doc; +doc({_} = Doc) -> + couch_doc:from_json_obj(Doc); +doc(Doc) -> + erlang:error({illegal_doc_format, Doc}). + +design_doc(#doc{} = DDoc) -> + DDoc; +design_doc(DocId) when is_list(DocId) -> + design_doc(list_to_binary(DocId)); +design_doc(<<"_design/", _/binary>> = DocId) -> + DocId; +design_doc(GroupName) -> + <<"_design/", GroupName/binary>>. + +idrevs({Id, Revs}) when is_list(Revs) -> + {docid(Id), [rev(R) || R <- Revs]}. + +rev(Rev) when is_list(Rev); is_binary(Rev) -> + couch_doc:parse_rev(Rev); +rev({Seq, Hash} = Rev) when is_integer(Seq), is_binary(Hash) -> + Rev. + +opts(Options) -> + case couch_util:get_value(user_ctx, Options) of + undefined -> + case erlang:get(user_ctx) of + #user_ctx{} = Ctx -> + [{user_ctx, Ctx} | Options]; + _ -> + Options + end; + _ -> + Options + end. + +default_callback(complete, Acc) -> + {ok, lists:reverse(Acc)}; +default_callback(Row, Acc) -> + {ok, [Row | Acc]}. + +is_reduce_view(_, _, _, #view_query_args{view_type=Reduce}) -> + Reduce =:= reduce. diff --git a/apps/fabric/src/fabric_db_create.erl b/apps/fabric/src/fabric_db_create.erl new file mode 100644 index 00000000..d10bcc22 --- /dev/null +++ b/apps/fabric/src/fabric_db_create.erl @@ -0,0 +1,65 @@ +-module(fabric_db_create). +-export([go/2]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +-define(DBNAME_REGEX, "^[a-z][a-z0-9\\_\\$()\\+\\-\\/\\s.]*$"). + +%% @doc Create a new database, and all its partition files across the cluster +%% Options is proplist with user_ctx, n, q +go(DbName, Options) -> + case re:run(DbName, ?DBNAME_REGEX, [{capture,none}]) of + match -> + Shards = mem3:choose_shards(DbName, Options), + Doc = make_document(Shards), + Workers = fabric_util:submit_jobs(Shards, create_db, [Options, Doc]), + Acc0 = fabric_dict:init(Workers, nil), + case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of + {ok, _} -> + ok; + Else -> + Else + end; + nomatch -> + {error, illegal_database_name} + end. + +handle_message(Msg, Shard, Counters) -> + C1 = fabric_dict:store(Shard, Msg, Counters), + case fabric_dict:any(nil, C1) of + true -> + {ok, C1}; + false -> + final_answer(C1) + end. + +make_document([#shard{dbname=DbName}|_] = Shards) -> + {RawOut, ByNodeOut, ByRangeOut} = + lists:foldl(fun(#shard{node=N, range=[B,E]}, {Raw, ByNode, ByRange}) -> + Range = ?l2b([couch_util:to_hex(<<B:32/integer>>), "-", + couch_util:to_hex(<<E:32/integer>>)]), + Node = couch_util:to_binary(N), + {[[<<"add">>, Range, Node] | Raw], orddict:append(Node, Range, ByNode), + orddict:append(Range, Node, ByRange)} + end, {[], [], []}, Shards), + #doc{id=DbName, body = {[ + {<<"changelog">>, lists:sort(RawOut)}, + {<<"by_node">>, {[{K,lists:sort(V)} || {K,V} <- ByNodeOut]}}, + {<<"by_range">>, {[{K,lists:sort(V)} || {K,V} <- ByRangeOut]}} + ]}}. + +final_answer(Counters) -> + Successes = [X || {_, M} = X <- Counters, M == ok orelse M == file_exists], + case fabric_view:is_progress_possible(Successes) of + true -> + case lists:keymember(file_exists, 2, Successes) of + true -> + {error, file_exists}; + false -> + {stop, ok} + end; + false -> + {error, internal_server_error} + end. diff --git a/apps/fabric/src/fabric_db_delete.erl b/apps/fabric/src/fabric_db_delete.erl new file mode 100644 index 00000000..57eefa9e --- /dev/null +++ b/apps/fabric/src/fabric_db_delete.erl @@ -0,0 +1,41 @@ +-module(fabric_db_delete). +-export([go/2]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). + +go(DbName, Options) -> + Shards = mem3:shards(DbName), + Workers = fabric_util:submit_jobs(Shards, delete_db, [Options, DbName]), + Acc0 = fabric_dict:init(Workers, nil), + case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of + {ok, ok} -> + ok; + {ok, not_found} -> + erlang:error(database_does_not_exist); + Error -> + Error + end. + +handle_message(Msg, Shard, Counters) -> + C1 = fabric_dict:store(Shard, Msg, Counters), + case fabric_dict:any(nil, C1) of + true -> + {ok, C1}; + false -> + final_answer(C1) + end. + +final_answer(Counters) -> + Successes = [X || {_, M} = X <- Counters, M == ok orelse M == not_found], + case fabric_view:is_progress_possible(Successes) of + true -> + case lists:keymember(ok, 2, Successes) of + true -> + {stop, ok}; + false -> + {stop, not_found} + end; + false -> + {error, internal_server_error} + end. diff --git a/apps/fabric/src/fabric_db_doc_count.erl b/apps/fabric/src/fabric_db_doc_count.erl new file mode 100644 index 00000000..12d5cbf8 --- /dev/null +++ b/apps/fabric/src/fabric_db_doc_count.erl @@ -0,0 +1,32 @@ +-module(fabric_db_doc_count). + +-export([go/1]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +go(DbName) -> + Shards = mem3:shards(DbName), + Workers = fabric_util:submit_jobs(Shards, get_doc_count, []), + Acc0 = {fabric_dict:init(Workers, nil), 0}, + fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0). + +handle_message({ok, Count}, Shard, {Counters, Acc}) -> + case fabric_dict:lookup_element(Shard, Counters) of + undefined -> + % already heard from someone else in this range + {ok, {Counters, Acc}}; + nil -> + C1 = fabric_dict:store(Shard, ok, Counters), + C2 = fabric_view:remove_overlapping_shards(Shard, C1), + case fabric_dict:any(nil, C2) of + true -> + {ok, {C2, Count+Acc}}; + false -> + {stop, Count+Acc} + end + end; +handle_message(_, _, Acc) -> + {ok, Acc}. + diff --git a/apps/fabric/src/fabric_db_info.erl b/apps/fabric/src/fabric_db_info.erl new file mode 100644 index 00000000..3758c5c3 --- /dev/null +++ b/apps/fabric/src/fabric_db_info.erl @@ -0,0 +1,52 @@ +-module(fabric_db_info). + +-export([go/1]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). + +go(DbName) -> + Shards = mem3:shards(DbName), + Workers = fabric_util:submit_jobs(Shards, get_db_info, []), + Acc0 = {fabric_dict:init(Workers, nil), []}, + fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0). + +handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, Acc}) -> + case fabric_dict:lookup_element(Shard, Counters) of + undefined -> + % already heard from someone else in this range + {ok, {Counters, Acc}}; + nil -> + C1 = fabric_dict:store(Shard, ok, Counters), + C2 = fabric_view:remove_overlapping_shards(Shard, C1), + case fabric_dict:any(nil, C2) of + true -> + {ok, {C2, [Info|Acc]}}; + false -> + {stop, [{db_name,Name}|merge_results(lists:flatten([Info|Acc]))]} + end + end; +handle_message(_, _, Acc) -> + {ok, Acc}. + +merge_results(Info) -> + Dict = lists:foldl(fun({K,V},D0) -> orddict:append(K,V,D0) end, + orddict:new(), Info), + orddict:fold(fun + (doc_count, X, Acc) -> + [{doc_count, lists:sum(X)} | Acc]; + (doc_del_count, X, Acc) -> + [{doc_del_count, lists:sum(X)} | Acc]; + (update_seq, X, Acc) -> + [{update_seq, lists:sum(X)} | Acc]; + (purge_seq, X, Acc) -> + [{purge_seq, lists:sum(X)} | Acc]; + (compact_running, X, Acc) -> + [{compact_running, lists:member(true, X)} | Acc]; + (disk_size, X, Acc) -> + [{disk_size, lists:sum(X)} | Acc]; + (disk_format_version, X, Acc) -> + [{disk_format_version, lists:max(X)} | Acc]; + (_, _, Acc) -> + Acc + end, [{instance_start_time, <<"0">>}], Dict). diff --git a/apps/fabric/src/fabric_db_meta.erl b/apps/fabric/src/fabric_db_meta.erl new file mode 100644 index 00000000..ee15fc72 --- /dev/null +++ b/apps/fabric/src/fabric_db_meta.erl @@ -0,0 +1,35 @@ +-module(fabric_db_meta). + +-export([set_revs_limit/3, set_security/3]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). + +set_revs_limit(DbName, Limit, Options) -> + Shards = mem3:shards(DbName), + Workers = fabric_util:submit_jobs(Shards, set_revs_limit, [Limit, Options]), + Waiting = length(Workers) - 1, + case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Waiting) of + {ok, ok} -> + ok; + Error -> + Error + end. + +set_security(DbName, SecObj, Options) -> + Shards = mem3:shards(DbName), + Workers = fabric_util:submit_jobs(Shards, set_security, [SecObj, Options]), + Waiting = length(Workers) - 1, + case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Waiting) of + {ok, ok} -> + ok; + Error -> + Error + end. + +handle_message(ok, _, 0) -> + {stop, ok}; +handle_message(ok, _, Waiting) -> + {ok, Waiting - 1}; +handle_message(Error, _, _Waiting) -> + {error, Error}.
\ No newline at end of file diff --git a/apps/fabric/src/fabric_dict.erl b/apps/fabric/src/fabric_dict.erl new file mode 100644 index 00000000..42d46b34 --- /dev/null +++ b/apps/fabric/src/fabric_dict.erl @@ -0,0 +1,37 @@ +-module(fabric_dict). +-compile(export_all). + +% Instead of ets, let's use an ordered keylist. We'll need to revisit if we +% have >> 100 shards, so a private interface is a good idea. - APK June 2010 + +init(Keys, InitialValue) -> + orddict:from_list([{Key, InitialValue} || Key <- Keys]). + + +decrement_all(Dict) -> + [{K,V-1} || {K,V} <- Dict]. + +store(Key, Value, Dict) -> + orddict:store(Key, Value, Dict). + +erase(Key, Dict) -> + orddict:erase(Key, Dict). + +update_counter(Key, Incr, Dict0) -> + orddict:update_counter(Key, Incr, Dict0). + + +lookup_element(Key, Dict) -> + couch_util:get_value(Key, Dict). + +size(Dict) -> + orddict:size(Dict). + +any(Value, Dict) -> + lists:keymember(Value, 2, Dict). + +filter(Fun, Dict) -> + orddict:filter(Fun, Dict). + +fold(Fun, Acc0, Dict) -> + orddict:fold(Fun, Acc0, Dict). diff --git a/apps/fabric/src/fabric_doc_attachments.erl b/apps/fabric/src/fabric_doc_attachments.erl new file mode 100644 index 00000000..aecdaaef --- /dev/null +++ b/apps/fabric/src/fabric_doc_attachments.erl @@ -0,0 +1,102 @@ +-module(fabric_doc_attachments). + +-include("fabric.hrl"). + +%% couch api calls +-export([receiver/2]). + +receiver(_Req, undefined) -> + <<"">>; +receiver(_Req, {unknown_transfer_encoding, Unknown}) -> + exit({unknown_transfer_encoding, Unknown}); +receiver(Req, chunked) -> + MiddleMan = spawn(fun() -> middleman(Req, chunked) end), + fun(4096, ChunkFun, ok) -> + write_chunks(MiddleMan, ChunkFun) + end; +receiver(_Req, 0) -> + <<"">>; +receiver(Req, Length) when is_integer(Length) -> + Middleman = spawn(fun() -> middleman(Req, Length) end), + fun() -> + Middleman ! {self(), gimme_data}, + receive {Middleman, Data} -> Data end + end; +receiver(_Req, Length) -> + exit({length_not_integer, Length}). + +%% +%% internal +%% + +write_chunks(MiddleMan, ChunkFun) -> + MiddleMan ! {self(), gimme_data}, + receive + {MiddleMan, {0, _Footers}} -> + % MiddleMan ! {self(), done}, + ok; + {MiddleMan, ChunkRecord} -> + ChunkFun(ChunkRecord, ok), + write_chunks(MiddleMan, ChunkFun) + end. + +receive_unchunked_attachment(_Req, 0) -> + ok; +receive_unchunked_attachment(Req, Length) -> + receive {MiddleMan, go} -> + Data = couch_httpd:recv(Req, 0), + MiddleMan ! {self(), Data} + end, + receive_unchunked_attachment(Req, Length - size(Data)). + +middleman(Req, chunked) -> + % spawn a process to actually receive the uploaded data + RcvFun = fun(ChunkRecord, ok) -> + receive {From, go} -> From ! {self(), ChunkRecord} end, ok + end, + Receiver = spawn(fun() -> couch_httpd:recv_chunked(Req,4096,RcvFun,ok) end), + + % take requests from the DB writers and get data from the receiver + N = erlang:list_to_integer(couch_config:get("cluster","n")), + middleman_loop(Receiver, N, dict:new(), 0, []); + +middleman(Req, Length) -> + Receiver = spawn(fun() -> receive_unchunked_attachment(Req, Length) end), + N = erlang:list_to_integer(couch_config:get("cluster","n")), + middleman_loop(Receiver, N, dict:new(), 0, []). + +middleman_loop(Receiver, N, Counters, Offset, ChunkList) -> + receive {From, gimme_data} -> + % figure out how far along this writer (From) is in the list + {NewCounters, WhichChunk} = case dict:find(From, Counters) of + {ok, I} -> + {dict:update_counter(From, 1, Counters), I}; + error -> + {dict:store(From, 2, Counters), 1} + end, + ListIndex = WhichChunk - Offset, + + % talk to the receiver to get another chunk if necessary + ChunkList1 = if ListIndex > length(ChunkList) -> + Receiver ! {self(), go}, + receive {Receiver, ChunkRecord} -> ChunkList ++ [ChunkRecord] end; + true -> ChunkList end, + + % reply to the writer + From ! {self(), lists:nth(ListIndex, ChunkList1)}, + + % check if we can drop a chunk from the head of the list + SmallestIndex = dict:fold(fun(_, Val, Acc) -> lists:min([Val,Acc]) end, + WhichChunk+1, NewCounters), + Size = dict:size(NewCounters), + + {NewChunkList, NewOffset} = + if Size == N andalso (SmallestIndex - Offset) == 2 -> + {tl(ChunkList1), Offset+1}; + true -> + {ChunkList1, Offset} + end, + middleman_loop(Receiver, N, NewCounters, NewOffset, NewChunkList) + after 10000 -> + ok + end. diff --git a/apps/fabric/src/fabric_doc_missing_revs.erl b/apps/fabric/src/fabric_doc_missing_revs.erl new file mode 100644 index 00000000..9a368783 --- /dev/null +++ b/apps/fabric/src/fabric_doc_missing_revs.erl @@ -0,0 +1,64 @@ +-module(fabric_doc_missing_revs). + +-export([go/2]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). + +go(DbName, AllIdsRevs) -> + Workers = lists:map(fun({#shard{name=Name, node=Node} = Shard, IdsRevs}) -> + Ref = rexi:cast(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs]}), + Shard#shard{ref=Ref} + end, group_idrevs_by_shard(DbName, AllIdsRevs)), + ResultDict = dict:from_list([{Id, {nil,Revs}} || {Id, Revs} <- AllIdsRevs]), + Acc0 = {length(Workers), ResultDict}, + fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0). + +handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) -> + skip_message(Acc0); +handle_message({rexi_EXIT, _, _, _}, _Worker, Acc0) -> + skip_message(Acc0); +handle_message({ok, Results}, _Worker, {1, D0}) -> + D = update_dict(D0, Results), + {stop, dict:fold(fun force_reply/3, [], D)}; +handle_message({ok, Results}, _Worker, {WaitingCount, D0}) -> + D = update_dict(D0, Results), + case dict:fold(fun maybe_reply/3, {stop, []}, D) of + continue -> + % still haven't heard about some Ids + {ok, {WaitingCount - 1, D}}; + {stop, FinalReply} -> + {stop, FinalReply} + end. + +force_reply(Id, {nil,Revs}, Acc) -> + % never heard about this ID, assume it's missing + [{Id, Revs} | Acc]; +force_reply(_, [], Acc) -> + Acc; +force_reply(Id, Revs, Acc) -> + [{Id, Revs} | Acc]. + +maybe_reply(_, _, continue) -> + continue; +maybe_reply(_, {nil, _}, _) -> + continue; +maybe_reply(_, [], {stop, Acc}) -> + {stop, Acc}; +maybe_reply(Id, Revs, {stop, Acc}) -> + {stop, [{Id, Revs} | Acc]}. + +group_idrevs_by_shard(DbName, IdsRevs) -> + dict:to_list(lists:foldl(fun({Id, Revs}, D0) -> + lists:foldl(fun(Shard, D1) -> + dict:append(Shard, {Id, Revs}, D1) + end, D0, mem3:shards(DbName,Id)) + end, dict:new(), IdsRevs)). + +update_dict(D0, KVs) -> + lists:foldl(fun({K,V,_}, D1) -> dict:store(K, V, D1) end, D0, KVs). + +skip_message({1, Dict}) -> + {stop, dict:fold(fun force_reply/3, [], Dict)}; +skip_message({WaitingCount, Dict}) -> + {ok, {WaitingCount-1, Dict}}. diff --git a/apps/fabric/src/fabric_doc_open.erl b/apps/fabric/src/fabric_doc_open.erl new file mode 100644 index 00000000..5c5699c3 --- /dev/null +++ b/apps/fabric/src/fabric_doc_open.erl @@ -0,0 +1,66 @@ +-module(fabric_doc_open). + +-export([go/3]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +go(DbName, Id, Options) -> + Workers = fabric_util:submit_jobs(mem3:shards(DbName,Id), open_doc, + [Id, [deleted|Options]]), + SuppressDeletedDoc = not lists:member(deleted, Options), + R = couch_util:get_value(r, Options, couch_config:get("cluster","r","2")), + Acc0 = {length(Workers), list_to_integer(R), []}, + case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of + {ok, {ok, #doc{deleted=true}}} when SuppressDeletedDoc -> + {not_found, deleted}; + {ok, Else} -> + Else; + Error -> + Error + end. + +handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) -> + skip_message(Acc0); +handle_message({rexi_EXIT, _Reason}, _Worker, Acc0) -> + skip_message(Acc0); +handle_message(Reply, _Worker, {WaitingCount, R, Replies}) -> + case merge_read_reply(make_key(Reply), Reply, Replies) of + {_, KeyCount} when KeyCount =:= R -> + {stop, Reply}; + {NewReplies, KeyCount} when KeyCount < R -> + if WaitingCount =:= 1 -> + % last message arrived, but still no quorum + repair_read_quorum_failure(NewReplies); + true -> + {ok, {WaitingCount-1, R, NewReplies}} + end + end. + +skip_message({1, _R, Replies}) -> + repair_read_quorum_failure(Replies); +skip_message({WaitingCount, R, Replies}) -> + {ok, {WaitingCount-1, R, Replies}}. + +merge_read_reply(Key, Reply, Replies) -> + case lists:keyfind(Key, 1, Replies) of + false -> + {[{Key, Reply, 1} | Replies], 1}; + {Key, _, N} -> + {lists:keyreplace(Key, 1, Replies, {Key, Reply, N+1}), N+1} + end. + +make_key({ok, #doc{id=Id, revs=Revs}}) -> + {Id, Revs}; +make_key(Else) -> + Else. + +repair_read_quorum_failure(Replies) -> + case [Doc || {_Key, {ok, Doc}, _Count} <- Replies] of + [] -> + {stop, {not_found, missing}}; + [Doc|_] -> + % TODO merge docs to find the winner as determined by replication + {stop, {ok, Doc}} + end.
\ No newline at end of file diff --git a/apps/fabric/src/fabric_doc_open_revs.erl b/apps/fabric/src/fabric_doc_open_revs.erl new file mode 100644 index 00000000..61ff466f --- /dev/null +++ b/apps/fabric/src/fabric_doc_open_revs.erl @@ -0,0 +1,65 @@ +-module(fabric_doc_open_revs). + +-export([go/4]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +go(DbName, Id, Revs, Options) -> + Workers = fabric_util:submit_jobs(mem3:shards(DbName,Id), open_revs, + [Id, Revs, Options]), + R = couch_util:get_value(r, Options, couch_config:get("cluster","r","2")), + Acc0 = {length(Workers), list_to_integer(R), []}, + case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of + {ok, {ok, Reply}} -> + {ok, Reply}; + Else -> + Else + end. + +handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) -> + skip_message(Acc0); +handle_message({rexi_EXIT, _}, _Worker, Acc0) -> + skip_message(Acc0); +handle_message(Reply, _Worker, {WaitingCount, R, Replies}) -> + case merge_read_reply(make_key(Reply), Reply, Replies) of + {_, KeyCount} when KeyCount =:= R -> + {stop, Reply}; + {NewReplies, KeyCount} when KeyCount < R -> + if WaitingCount =:= 1 -> + % last message arrived, but still no quorum + repair_read_quorum_failure(NewReplies); + true -> + {ok, {WaitingCount-1, R, NewReplies}} + end + end. + +skip_message({1, _R, Replies}) -> + repair_read_quorum_failure(Replies); +skip_message({WaitingCount, R, Replies}) -> + {ok, {WaitingCount-1, R, Replies}}. + +merge_read_reply(Key, Reply, Replies) -> + case lists:keyfind(Key, 1, Replies) of + false -> + {[{Key, Reply, 1} | Replies], 1}; + {Key, _, N} -> + {lists:keyreplace(Key, 1, Replies, {Key, Reply, N+1}), N+1} + end. + +make_key({ok, #doc{id=Id, revs=Revs}}) -> + {Id, Revs}; +make_key(Else) -> + Else. + +repair_read_quorum_failure(Replies) -> + case [Doc || {_Key, {ok, Doc}, _Count} <- Replies] of + [] -> + {stop, {not_found, missing}}; + [Doc|_] -> + % TODO merge docs to find the winner as determined by replication + {stop, {ok, Doc}} + end. + +
\ No newline at end of file diff --git a/apps/fabric/src/fabric_doc_update.erl b/apps/fabric/src/fabric_doc_update.erl new file mode 100644 index 00000000..f0fcf112 --- /dev/null +++ b/apps/fabric/src/fabric_doc_update.erl @@ -0,0 +1,127 @@ +-module(fabric_doc_update). + +-export([go/3]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +go(_, [], _) -> + {ok, []}; +go(DbName, AllDocs, Opts) -> + validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, Opts)), + Options = lists:delete(all_or_nothing, Opts), + GroupedDocs = lists:map(fun({#shard{name=Name, node=Node} = Shard, Docs}) -> + Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}), + {Shard#shard{ref=Ref}, Docs} + end, group_docs_by_shard(DbName, AllDocs)), + {Workers, _} = lists:unzip(GroupedDocs), + W = couch_util:get_value(w, Options, couch_config:get("cluster","w","2")), + Acc0 = {length(Workers), length(AllDocs), list_to_integer(W), GroupedDocs, + dict:from_list([{Doc,[]} || Doc <- AllDocs])}, + case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of + {ok, Results} -> + Reordered = couch_util:reorder_results(AllDocs, Results), + {ok, [R || R <- Reordered, R =/= noreply]}; + Else -> + Else + end. + +handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) -> + skip_message(Acc0); +handle_message({rexi_EXIT, _}, _Worker, Acc0) -> + skip_message(Acc0); +handle_message({ok, Replies}, Worker, Acc0) -> + {WaitingCount, DocCount, W, GroupedDocs, DocReplyDict0} = Acc0, + Docs = couch_util:get_value(Worker, GroupedDocs), + DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0), + case {WaitingCount, dict:size(DocReplyDict)} of + {1, _} -> + % last message has arrived, we need to conclude things + {W, Reply} = dict:fold(fun force_reply/3, {W,[]}, DocReplyDict), + {stop, Reply}; + {_, DocCount} -> + % we've got at least one reply for each document, let's take a look + case dict:fold(fun maybe_reply/3, {stop,W,[]}, DocReplyDict) of + continue -> + {ok, {WaitingCount - 1, DocCount, W, GroupedDocs, DocReplyDict}}; + {stop, W, FinalReplies} -> + {stop, FinalReplies} + end; + {_, N} when N < DocCount -> + % no point in trying to finalize anything yet + {ok, {WaitingCount - 1, DocCount, W, GroupedDocs, DocReplyDict}} + end; +handle_message({missing_stub, Stub}, _, _) -> + throw({missing_stub, Stub}); +handle_message({not_found, no_db_file} = X, Worker, Acc0) -> + {_, _, _, GroupedDocs, _} = Acc0, + Docs = couch_util:get_value(Worker, GroupedDocs), + handle_message({ok, [X || _D <- Docs]}, Worker, Acc0). + +force_reply(Doc, [], {W, Acc}) -> + {W, [{Doc, {error, internal_server_error}} | Acc]}; +force_reply(Doc, [FirstReply|_] = Replies, {W, Acc}) -> + case update_quorum_met(W, Replies) of + {true, Reply} -> + {W, [{Doc,Reply} | Acc]}; + false -> + ?LOG_ERROR("write quorum (~p) failed, reply ~p", [W, FirstReply]), + % TODO make a smarter choice than just picking the first reply + {W, [{Doc,FirstReply} | Acc]} + end. + +maybe_reply(_, _, continue) -> + % we didn't meet quorum for all docs, so we're fast-forwarding the fold + continue; +maybe_reply(Doc, Replies, {stop, W, Acc}) -> + case update_quorum_met(W, Replies) of + {true, Reply} -> + {stop, W, [{Doc, Reply} | Acc]}; + false -> + continue + end. + +update_quorum_met(W, Replies) -> + Counters = lists:foldl(fun(R,D) -> orddict:update_counter(R,1,D) end, + orddict:new(), Replies), + case lists:dropwhile(fun({_, Count}) -> Count < W end, Counters) of + [] -> + false; + [{FinalReply, _} | _] -> + {true, FinalReply} + end. + +-spec group_docs_by_shard(binary(), [#doc{}]) -> [{#shard{}, [#doc{}]}]. +group_docs_by_shard(DbName, Docs) -> + dict:to_list(lists:foldl(fun(#doc{id=Id} = Doc, D0) -> + lists:foldl(fun(Shard, D1) -> + dict:append(Shard, Doc, D1) + end, D0, mem3:shards(DbName,Id)) + end, dict:new(), Docs)). + +append_update_replies([], [], DocReplyDict) -> + DocReplyDict; +append_update_replies([Doc|Rest], [], Dict0) -> + % icky, if replicated_changes only errors show up in result + append_update_replies(Rest, [], dict:append(Doc, noreply, Dict0)); +append_update_replies([Doc|Rest1], [Reply|Rest2], Dict0) -> + % TODO what if the same document shows up twice in one update_docs call? + append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)). + +skip_message(Acc0) -> + % TODO fix this + {ok, Acc0}. + +validate_atomic_update(_, _, false) -> + ok; +validate_atomic_update(_DbName, AllDocs, true) -> + % TODO actually perform the validation. This requires some hackery, we need + % to basically extract the prep_and_validate_updates function from couch_db + % and only run that, without actually writing in case of a success. + Error = {not_implemented, <<"all_or_nothing is not supported yet">>}, + PreCommitFailures = lists:map(fun(#doc{id=Id, revs = {Pos,Revs}}) -> + case Revs of [] -> RevId = <<>>; [RevId|_] -> ok end, + {{Id, {Pos, RevId}}, Error} + end, AllDocs), + throw({aborted, PreCommitFailures}). diff --git a/apps/fabric/src/fabric_group_info.erl b/apps/fabric/src/fabric_group_info.erl new file mode 100644 index 00000000..04605a66 --- /dev/null +++ b/apps/fabric/src/fabric_group_info.erl @@ -0,0 +1,52 @@ +-module(fabric_group_info). + +-export([go/2]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +go(DbName, GroupId) when is_binary(GroupId) -> + {ok, DDoc} = fabric:open_doc(DbName, GroupId, []), + go(DbName, DDoc); + +go(DbName, #doc{} = DDoc) -> + Group = couch_view_group:design_doc_to_view_group(#db{name=DbName}, DDoc), + Shards = mem3:shards(DbName), + Workers = fabric_util:submit_jobs(Shards, group_info, [Group]), + Acc0 = {fabric_dict:init(Workers, nil), []}, + fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0). + +handle_message({ok, Info}, Shard, {Counters, Acc}) -> + case fabric_dict:lookup_element(Shard, Counters) of + undefined -> + % already heard from someone else in this range + {ok, {Counters, Acc}}; + nil -> + C1 = fabric_dict:store(Shard, ok, Counters), + C2 = fabric_view:remove_overlapping_shards(Shard, C1), + case fabric_dict:any(nil, C2) of + true -> + {ok, {C2, [Info|Acc]}}; + false -> + {stop, merge_results(lists:flatten([Info|Acc]))} + end + end; +handle_message(_, _, Acc) -> + {ok, Acc}. + +merge_results(Info) -> + Dict = lists:foldl(fun({K,V},D0) -> orddict:append(K,V,D0) end, + orddict:new(), Info), + orddict:fold(fun + (signature, [X|_], Acc) -> + [{signature, X} | Acc]; + (language, [X|_], Acc) -> + [{language, X} | Acc]; + (disk_size, X, Acc) -> + [{disk_size, lists:sum(X)} | Acc]; + (compact_running, X, Acc) -> + [{compact_running, lists:member(true, X)} | Acc]; + (_, _, Acc) -> + Acc + end, [], Dict). diff --git a/apps/fabric/src/fabric_rpc.erl b/apps/fabric/src/fabric_rpc.erl new file mode 100644 index 00000000..f56e3f68 --- /dev/null +++ b/apps/fabric/src/fabric_rpc.erl @@ -0,0 +1,388 @@ +-module(fabric_rpc). + +-export([get_db_info/1, get_doc_count/1, get_update_seq/1]). +-export([open_doc/3, open_revs/4, get_missing_revs/2, update_docs/3]). +-export([all_docs/2, changes/3, map_view/4, reduce_view/4, group_info/2]). +-export([create_db/3, delete_db/3, reset_validation_funs/1, set_security/3, + set_revs_limit/3]). + +-include("fabric.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +-record (view_acc, { + db, + limit, + include_docs, + offset = nil, + total_rows, + reduce_fun = fun couch_db:enum_docs_reduce_to_count/1, + group_level = 0 +}). + +%% rpc endpoints +%% call to with_db will supply your M:F with a #db{} and then remaining args + +all_docs(DbName, #view_query_args{keys=nil} = QueryArgs) -> + {ok, Db} = couch_db:open(DbName, []), + #view_query_args{ + start_key = StartKey, + start_docid = StartDocId, + end_key = EndKey, + end_docid = EndDocId, + limit = Limit, + skip = Skip, + include_docs = IncludeDocs, + direction = Dir, + inclusive_end = Inclusive + } = QueryArgs, + {ok, Total} = couch_db:get_doc_count(Db), + Acc0 = #view_acc{ + db = Db, + include_docs = IncludeDocs, + limit = Limit+Skip, + total_rows = Total + }, + EndKeyType = if Inclusive -> end_key; true -> end_key_gt end, + Options = [ + {dir, Dir}, + {start_key, if is_binary(StartKey) -> StartKey; true -> StartDocId end}, + {EndKeyType, if is_binary(EndKey) -> EndKey; true -> EndDocId end} + ], + {ok, _, Acc} = couch_db:enum_docs(Db, fun view_fold/3, Acc0, Options), + final_response(Total, Acc#view_acc.offset). + +changes(DbName, Args, StartSeq) -> + #changes_args{style=Style, dir=Dir} = Args, + case couch_db:open(DbName, []) of + {ok, Db} -> + Enum = fun changes_enumerator/2, + Opts = [{dir,Dir}], + Acc0 = {Db, StartSeq, Args}, + try + {ok, {_, LastSeq, _}} = + couch_db:changes_since(Db, Style, StartSeq, Enum, Opts, Acc0), + rexi:reply({complete, LastSeq}) + after + couch_db:close(Db) + end; + Error -> + rexi:reply(Error) + end. + +map_view(DbName, DDoc, ViewName, QueryArgs) -> + {ok, Db} = couch_db:open(DbName, []), + #view_query_args{ + limit = Limit, + skip = Skip, + keys = Keys, + include_docs = IncludeDocs, + stale = Stale, + view_type = ViewType + } = QueryArgs, + MinSeq = if Stale == ok -> 0; true -> couch_db:get_update_seq(Db) end, + Group0 = couch_view_group:design_doc_to_view_group(Db, DDoc), + {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), + {ok, Group} = couch_view_group:request_group(Pid, MinSeq), + View = fabric_view:extract_view(Pid, ViewName, Group#group.views, ViewType), + {ok, Total} = couch_view:get_row_count(View), + Acc0 = #view_acc{ + db = Db, + include_docs = IncludeDocs, + limit = Limit+Skip, + total_rows = Total, + reduce_fun = fun couch_view:reduce_to_count/1 + }, + case Keys of + nil -> + Options = couch_httpd_view:make_key_options(QueryArgs), + {ok, _, Acc} = couch_view:fold(View, fun view_fold/3, Acc0, Options); + _ -> + Acc = lists:foldl(fun(Key, AccIn) -> + KeyArgs = QueryArgs#view_query_args{start_key=Key, end_key=Key}, + Options = couch_httpd_view:make_key_options(KeyArgs), + {_Go, _, Out} = couch_view:fold(View, fun view_fold/3, AccIn, + Options), + Out + end, Acc0, Keys) + end, + final_response(Total, Acc#view_acc.offset). + +reduce_view(DbName, Group0, ViewName, QueryArgs) -> + {ok, Db} = couch_db:open(DbName, []), + #view_query_args{ + group_level = GroupLevel, + limit = Limit, + skip = Skip, + keys = Keys, + stale = Stale + } = QueryArgs, + GroupFun = group_rows_fun(GroupLevel), + MinSeq = if Stale == ok -> 0; true -> couch_db:get_update_seq(Db) end, + {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), + {ok, #group{views=Views, def_lang=Lang}} = couch_view_group:request_group( + Pid, MinSeq), + {NthRed, View} = fabric_view:extract_view(Pid, ViewName, Views, reduce), + ReduceView = {reduce, NthRed, Lang, View}, + Acc0 = #view_acc{group_level = GroupLevel, limit = Limit+Skip}, + case Keys of + nil -> + Options0 = couch_httpd_view:make_key_options(QueryArgs), + Options = [{key_group_fun, GroupFun} | Options0], + couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options); + _ -> + lists:map(fun(Key) -> + KeyArgs = QueryArgs#view_query_args{start_key=Key, end_key=Key}, + Options0 = couch_httpd_view:make_key_options(KeyArgs), + Options = [{key_group_fun, GroupFun} | Options0], + couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options) + end, Keys) + end, + rexi:reply(complete). + +create_db(DbName, Options, Doc) -> + mem3_util:write_db_doc(Doc), + rexi:reply(case couch_server:create(DbName, Options) of + {ok, _} -> + ok; + Error -> + Error + end). + +delete_db(DbName, Options, DocId) -> + mem3_util:delete_db_doc(DocId), + rexi:reply(couch_server:delete(DbName, Options)). + +get_db_info(DbName) -> + with_db(DbName, [], {couch_db, get_db_info, []}). + +get_doc_count(DbName) -> + with_db(DbName, [], {couch_db, get_doc_count, []}). + +get_update_seq(DbName) -> + with_db(DbName, [], {couch_db, get_update_seq, []}). + +set_security(DbName, SecObj, Options) -> + with_db(DbName, Options, {couch_db, set_security, [SecObj]}). + +set_revs_limit(DbName, Limit, Options) -> + with_db(DbName, Options, {couch_db, set_revs_limit, [Limit]}). + +open_doc(DbName, DocId, Options) -> + with_db(DbName, Options, {couch_db, open_doc, [DocId, Options]}). + +open_revs(DbName, Id, Revs, Options) -> + with_db(DbName, Options, {couch_db, open_doc_revs, [Id, Revs, Options]}). + +get_missing_revs(DbName, IdRevsList) -> + % reimplement here so we get [] for Ids with no missing revs in response + rexi:reply(case couch_db:open(DbName, []) of + {ok, Db} -> + Ids = [Id1 || {Id1, _Revs} <- IdRevsList], + {ok, lists:zipwith(fun({Id, Revs}, FullDocInfoResult) -> + case FullDocInfoResult of + {ok, #full_doc_info{rev_tree=RevisionTree} = FullInfo} -> + MissingRevs = couch_key_tree:find_missing(RevisionTree, Revs), + {Id, MissingRevs, possible_ancestors(FullInfo, MissingRevs)}; + not_found -> + {Id, Revs, []} + end + end, IdRevsList, couch_btree:lookup(Db#db.id_tree, Ids))}; + Error -> + Error + end). + +update_docs(DbName, Docs0, Options) -> + case proplists:get_value(replicated_changes, Options) of + true -> + X = replicated_changes; + _ -> + X = interactive_edit + end, + Docs = make_att_readers(Docs0), + with_db(DbName, Options, {couch_db, update_docs, [Docs, Options, X]}). + +group_info(DbName, Group0) -> + {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), + rexi:reply(couch_view_group:request_group_info(Pid)). + +reset_validation_funs(DbName) -> + case couch_db:open(DbName, []) of + {ok, #db{main_pid = Pid}} -> + gen_server:cast(Pid, {load_validation_funs, undefined}); + _ -> + ok + end. + +%% +%% internal +%% + +with_db(DbName, Options, {M,F,A}) -> + case couch_db:open(DbName, Options) of + {ok, Db} -> + rexi:reply(try + apply(M, F, [Db | A]) + catch Exception -> + Exception; + error:Reason -> + ?LOG_ERROR("~p ~p ~p~n~p", [?MODULE, {M,F}, Reason, + erlang:get_stacktrace()]), + {error, Reason} + end); + Error -> + rexi:reply(Error) + end. + +view_fold(#full_doc_info{} = FullDocInfo, OffsetReds, Acc) -> + % matches for _all_docs and translates #full_doc_info{} -> KV pair + case couch_doc:to_doc_info(FullDocInfo) of + #doc_info{revs=[#rev_info{deleted=false, rev=Rev}|_]} -> + Id = FullDocInfo#full_doc_info.id, + Value = {[{rev,couch_doc:rev_to_str(Rev)}]}, + view_fold({{Id,Id}, Value}, OffsetReds, Acc); + #doc_info{revs=[#rev_info{deleted=true}|_]} -> + {ok, Acc} + end; +view_fold(KV, OffsetReds, #view_acc{offset=nil, total_rows=Total} = Acc) -> + % calculates the offset for this shard + #view_acc{reduce_fun=Reduce} = Acc, + Offset = Reduce(OffsetReds), + case rexi:sync_reply({total_and_offset, Total, Offset}) of + ok -> + view_fold(KV, OffsetReds, Acc#view_acc{offset=Offset}); + stop -> + exit(normal); + timeout -> + exit(timeout) + end; +view_fold(_KV, _Offset, #view_acc{limit=0} = Acc) -> + % we scanned through limit+skip local rows + {stop, Acc}; +view_fold({{Key,Id}, Value}, _Offset, Acc) -> + % the normal case + #view_acc{ + db = Db, + limit = Limit, + include_docs = IncludeDocs + } = Acc, + Doc = if not IncludeDocs -> undefined; true -> + case couch_db:open_doc(Db, Id, []) of + {not_found, deleted} -> + null; + {not_found, missing} -> + undefined; + {ok, Doc0} -> + couch_doc:to_json_obj(Doc0, []) + end + end, + case rexi:sync_reply(#view_row{key=Key, id=Id, value=Value, doc=Doc}) of + ok -> + {ok, Acc#view_acc{limit=Limit-1}}; + timeout -> + exit(timeout) + end. + +final_response(Total, nil) -> + case rexi:sync_reply({total_and_offset, Total, Total}) of ok -> + rexi:reply(complete); + stop -> + ok; + timeout -> + exit(timeout) + end; +final_response(_Total, _Offset) -> + rexi:reply(complete). + +group_rows_fun(exact) -> + fun({Key1,_}, {Key2,_}) -> Key1 == Key2 end; +group_rows_fun(0) -> + fun(_A, _B) -> true end; +group_rows_fun(GroupLevel) when is_integer(GroupLevel) -> + fun({[_|_] = Key1,_}, {[_|_] = Key2,_}) -> + lists:sublist(Key1, GroupLevel) == lists:sublist(Key2, GroupLevel); + ({Key1,_}, {Key2,_}) -> + Key1 == Key2 + end. + +reduce_fold(_Key, _Red, #view_acc{limit=0} = Acc) -> + {stop, Acc}; +reduce_fold(_Key, Red, #view_acc{group_level=0} = Acc) -> + send(null, Red, Acc); +reduce_fold(Key, Red, #view_acc{group_level=exact} = Acc) -> + send(Key, Red, Acc); +reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0, is_list(K) -> + send(lists:sublist(K, I), Red, Acc). + +send(Key, Value, #view_acc{limit=Limit} = Acc) -> + case rexi:sync_reply(#view_row{key=Key, value=Value}) of + ok -> + {ok, Acc#view_acc{limit=Limit-1}}; + stop -> + exit(normal); + timeout -> + exit(timeout) + end. + +changes_enumerator(DocInfo, {Db, _Seq, Args}) -> + #changes_args{include_docs=IncludeDocs, filter=FilterFun} = Args, + #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]} + = DocInfo, + case [Result || Result <- FilterFun(DocInfo), Result /= null] of + [] -> + {ok, {Db, Seq, Args}}; + Results -> + ChangesRow = changes_row(Db, Seq, Id, Results, Rev, Del, IncludeDocs), + Go = rexi:sync_reply(ChangesRow), + {Go, {Db, Seq, Args}} + end. + +changes_row(_, Seq, Id, Results, _, true, true) -> + #view_row{key=Seq, id=Id, value=Results, doc=deleted}; +changes_row(_, Seq, Id, Results, _, true, false) -> + #view_row{key=Seq, id=Id, value=Results, doc=deleted}; +changes_row(Db, Seq, Id, Results, Rev, false, true) -> + #view_row{key=Seq, id=Id, value=Results, doc=doc_member(Db, Id, Rev)}; +changes_row(_, Seq, Id, Results, _, false, false) -> + #view_row{key=Seq, id=Id, value=Results}. + +doc_member(Shard, Id, Rev) -> + case couch_db:open_doc_revs(Shard, Id, [Rev], []) of + {ok, [{ok,Doc}]} -> + couch_doc:to_json_obj(Doc, []); + Error -> + Error + end. + +possible_ancestors(_FullInfo, []) -> + []; +possible_ancestors(FullInfo, MissingRevs) -> + #doc_info{revs=RevsInfo} = couch_doc:to_doc_info(FullInfo), + LeafRevs = [Rev || #rev_info{rev=Rev} <- RevsInfo], + % Find the revs that are possible parents of this rev + lists:foldl(fun({LeafPos, LeafRevId}, Acc) -> + % this leaf is a "possible ancenstor" of the missing + % revs if this LeafPos lessthan any of the missing revs + case lists:any(fun({MissingPos, _}) -> + LeafPos < MissingPos end, MissingRevs) of + true -> + [{LeafPos, LeafRevId} | Acc]; + false -> + Acc + end + end, [], LeafRevs). + +make_att_readers([]) -> + []; +make_att_readers([#doc{atts=Atts0} = Doc | Rest]) -> + % % go through the attachments looking for 'follows' in the data, + % % replace with function that reads the data from MIME stream. + Atts = [Att#att{data=make_att_reader(D)} || #att{data=D} = Att <- Atts0], + [Doc#doc{atts = Atts} | make_att_readers(Rest)]. + +make_att_reader({follows, Parser}) -> + fun() -> + Parser ! {get_bytes, self()}, + receive {bytes, Bytes} -> Bytes end + end; +make_att_reader(Else) -> + Else. diff --git a/apps/fabric/src/fabric_util.erl b/apps/fabric/src/fabric_util.erl new file mode 100644 index 00000000..639a32e7 --- /dev/null +++ b/apps/fabric/src/fabric_util.erl @@ -0,0 +1,89 @@ +-module(fabric_util). + +-export([submit_jobs/3, cleanup/1, recv/4, receive_loop/4, receive_loop/6, + get_db/1]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). + +submit_jobs(Shards, EndPoint, ExtraArgs) -> + lists:map(fun(#shard{node=Node, name=ShardName} = Shard) -> + Ref = rexi:cast(Node, {fabric_rpc, EndPoint, [ShardName | ExtraArgs]}), + Shard#shard{ref = Ref} + end, Shards). + +cleanup(Workers) -> + [rexi:kill(Node, Ref) || #shard{node=Node, ref=Ref} <- Workers]. + +recv(Workers, Keypos, Fun, Acc0) -> + receive_loop(Workers, Keypos, Fun, Acc0). + +receive_loop(Workers, Keypos, Fun, Acc0) -> + case couch_config:get("fabric", "request_timeout", "60000") of + "infinity" -> + Timeout = infinity; + N -> + Timeout = list_to_integer(N) + end, + receive_loop(Workers, Keypos, Fun, Acc0, Timeout, infinity). + +%% @doc set up the receive loop with an overall timeout +-spec receive_loop([any()], integer(), function(), any(), timeout(), timeout()) -> + {ok, any()} | timeout | {error, any()}. +receive_loop(RefPartMap, Keypos, Fun, Acc0, infinity, PerMsgTO) -> + process_mailbox(RefPartMap, Keypos, Fun, Acc0, nil, PerMsgTO); +receive_loop(RefPartMap, Keypos, Fun, Acc0, GlobalTimeout, PerMsgTO) -> + TimeoutRef = erlang:make_ref(), + {ok, TRef} = timer:send_after(GlobalTimeout, {timeout, TimeoutRef}), + try + process_mailbox(RefPartMap, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) + after + timer:cancel(TRef) + end. + +process_mailbox(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) -> + case process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) of + {ok, Acc} -> + process_mailbox(RefList, Keypos, Fun, Acc, TimeoutRef, PerMsgTO); + {stop, Acc} -> + {ok, Acc}; + Error -> + Error + end. + +process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) -> + receive + {timeout, TimeoutRef} -> + timeout; + {Ref, Msg} -> + case lists:keyfind(Ref, Keypos, RefList) of + false -> + % this was some non-matching message which we will ignore + {ok, Acc0}; + Worker -> + Fun(Msg, Worker, Acc0) + end; + {Ref, From, Msg} -> + case lists:keyfind(Ref, Keypos, RefList) of + false -> + {ok, Acc0}; + Worker -> + Fun(Msg, {Worker, From}, Acc0) + end; + {rexi_DOWN, _RexiMonPid, ServerPid, Reason} = Msg -> + showroom_log:message(alert, "rexi_DOWN ~p ~p", [ServerPid, Reason]), + Fun(Msg, nil, Acc0) + after PerMsgTO -> + timeout + end. + +get_db(DbName) -> + Shards = mem3:shards(DbName), + case lists:partition(fun(#shard{node = N}) -> N =:= node() end, Shards) of + {[#shard{name = ShardName}|_], _} -> + % prefer node-local DBs + couch_db:open(ShardName, []); + {[], [#shard{node = Node, name = ShardName}|_]} -> + % but don't require them + rpc:call(Node, couch_db, open, [ShardName, []]) + end. diff --git a/apps/fabric/src/fabric_view.erl b/apps/fabric/src/fabric_view.erl new file mode 100644 index 00000000..49a3a55a --- /dev/null +++ b/apps/fabric/src/fabric_view.erl @@ -0,0 +1,218 @@ +-module(fabric_view). + +-export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1, + maybe_pause_worker/3, maybe_resume_worker/2, transform_row/1, keydict/1, + extract_view/4]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +%% @doc looks for a fully covered keyrange in the list of counters +-spec is_progress_possible([{#shard{}, term()}]) -> boolean(). +is_progress_possible([]) -> + false; +is_progress_possible(Counters) -> + Ranges = fabric_dict:fold(fun(#shard{range=[X,Y]}, _, A) -> [{X,Y}|A] end, + [], Counters), + [{Start, Tail0} | Rest] = lists:ukeysort(1, Ranges), + Result = lists:foldl(fun + (_, fail) -> + % we've already declared failure + fail; + (_, complete) -> + % this is the success condition, we can fast-forward + complete; + ({X,_}, Tail) when X > (Tail+1) -> + % gap in the keyrange, we're dead + fail; + ({_,Y}, Tail) -> + case erlang:max(Tail, Y) of + End when (End+1) =:= (2 bsl 31) -> + complete; + Else -> + % the normal condition, adding to the tail + Else + end + end, if (Tail0+1) =:= (2 bsl 31) -> complete; true -> Tail0 end, Rest), + (Start =:= 0) andalso (Result =:= complete). + +-spec remove_overlapping_shards(#shard{}, [{#shard{}, any()}]) -> + [{#shard{}, any()}]. +remove_overlapping_shards(#shard{range=[A,B]} = Shard0, Shards) -> + fabric_dict:filter(fun(#shard{range=[X,Y]} = Shard, _Value) -> + if Shard =:= Shard0 -> + % we can't remove ourselves + true; + A < B, X >= A, X < B -> + % lower bound is inside our range + false; + A < B, Y > A, Y =< B -> + % upper bound is inside our range + false; + B < A, X >= A orelse B < A, X < B -> + % target shard wraps the key range, lower bound is inside + false; + B < A, Y > A orelse B < A, Y =< B -> + % target shard wraps the key range, upper bound is inside + false; + true -> + true + end + end, Shards). + +maybe_pause_worker(Worker, From, State) -> + #collector{buffer_size = BufferSize, counters = Counters} = State, + case fabric_dict:lookup_element(Worker, Counters) of + BufferSize -> + State#collector{blocked = [{Worker,From} | State#collector.blocked]}; + _Count -> + gen_server:reply(From, ok), + State + end. + +maybe_resume_worker(Worker, State) -> + #collector{buffer_size = Buffer, counters = C, blocked = B} = State, + case fabric_dict:lookup_element(Worker, C) of + Count when Count < Buffer/2 -> + case couch_util:get_value(Worker, B) of + undefined -> + State; + From -> + gen_server:reply(From, ok), + State#collector{blocked = lists:keydelete(Worker, 1, B)} + end; + _Other -> + State + end. + +maybe_send_row(#collector{limit=0} = State) -> + #collector{user_acc=AccIn, callback=Callback} = State, + {_, Acc} = Callback(complete, AccIn), + {stop, State#collector{user_acc=Acc}}; +maybe_send_row(State) -> + #collector{ + callback = Callback, + counters = Counters, + skip = Skip, + limit = Limit, + user_acc = AccIn + } = State, + case fabric_dict:any(0, Counters) of + true -> + {ok, State}; + false -> + try get_next_row(State) of + {_, NewState} when Skip > 0 -> + maybe_send_row(NewState#collector{skip=Skip-1, limit=Limit-1}); + {Row, NewState} -> + case Callback(transform_row(Row), AccIn) of + {stop, Acc} -> + {stop, NewState#collector{user_acc=Acc, limit=Limit-1}}; + {ok, Acc} -> + maybe_send_row(NewState#collector{user_acc=Acc, limit=Limit-1}) + end + catch complete -> + {_, Acc} = Callback(complete, AccIn), + {stop, State#collector{user_acc=Acc}} + end + end. + +keydict(nil) -> + undefined; +keydict(Keys) -> + {Dict,_} = lists:foldl(fun(K, {D,I}) -> {dict:store(K,I,D), I+1} end, + {dict:new(),0}, Keys), + Dict. + +%% internal %% + +get_next_row(#collector{rows = []}) -> + throw(complete); +get_next_row(#collector{reducer = RedSrc} = St) when RedSrc =/= undefined -> + #collector{ + query_args = #view_query_args{direction=Dir}, + keys = Keys, + rows = RowDict, + os_proc = Proc, + counters = Counters0 + } = St, + {Key, RestKeys} = find_next_key(Keys, Dir, RowDict), + case dict:find(Key, RowDict) of + {ok, Records} -> + NewRowDict = dict:erase(Key, RowDict), + Counters = lists:foldl(fun(#view_row{worker=Worker}, CountersAcc) -> + fabric_dict:update_counter(Worker, -1, CountersAcc) + end, Counters0, Records), + Wrapped = [[V] || #view_row{value=V} <- Records], + {ok, [Reduced]} = couch_query_servers:rereduce(Proc, [RedSrc], Wrapped), + NewSt = St#collector{keys=RestKeys, rows=NewRowDict, counters=Counters}, + {#view_row{key=Key, id=reduced, value=Reduced}, NewSt}; + error -> + get_next_row(St#collector{keys=RestKeys}) + end; +get_next_row(State) -> + #collector{rows = [Row|Rest], counters = Counters0} = State, + Worker = Row#view_row.worker, + Counters1 = fabric_dict:update_counter(Worker, -1, Counters0), + NewState = maybe_resume_worker(Worker, State#collector{counters=Counters1}), + {Row, NewState#collector{rows = Rest}}. + +find_next_key(nil, Dir, RowDict) -> + case lists:sort(sort_fun(Dir), dict:fetch_keys(RowDict)) of + [] -> + throw(complete); + [Key|_] -> + {Key, nil} + end; +find_next_key([], _, _) -> + throw(complete); +find_next_key([Key|Rest], _, _) -> + {Key, Rest}. + +transform_row(#view_row{key=Key, id=reduced, value=Value}) -> + {row, {[{key,Key}, {value,Value}]}}; +transform_row(#view_row{key=Key, id=undefined}) -> + {row, {[{key,Key}, {error,not_found}]}}; +transform_row(#view_row{key=Key, id=Id, value=Value, doc=undefined}) -> + {row, {[{id,Id}, {key,Key}, {value,Value}]}}; +transform_row(#view_row{key=Key, id=Id, value=Value, doc={error,Reason}}) -> + {row, {[{id,Id}, {key,Key}, {value,Value}, {error,Reason}]}}; +transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc}) -> + {row, {[{id,Id}, {key,Key}, {value,Value}, {doc,Doc}]}}. + +sort_fun(fwd) -> + fun(A,A) -> true; (A,B) -> couch_view:less_json(A,B) end; +sort_fun(rev) -> + fun(A,A) -> true; (A,B) -> couch_view:less_json(B,A) end. + +extract_view(Pid, ViewName, [], _ViewType) -> + ?LOG_ERROR("missing_named_view ~p", [ViewName]), + exit(Pid, kill), + exit(missing_named_view); +extract_view(Pid, ViewName, [View|Rest], ViewType) -> + case lists:member(ViewName, view_names(View, ViewType)) of + true -> + if ViewType == reduce -> + {index_of(ViewName, view_names(View, reduce)), View}; + true -> + View + end; + false -> + extract_view(Pid, ViewName, Rest, ViewType) + end. + +view_names(View, Type) when Type == red_map; Type == reduce -> + [Name || {Name, _} <- View#view.reduce_funs]; +view_names(View, map) -> + View#view.map_names. + +index_of(X, List) -> + index_of(X, List, 1). + +index_of(_X, [], _I) -> + not_found; +index_of(X, [X|_Rest], I) -> + I; +index_of(X, [_|Rest], I) -> + index_of(X, Rest, I+1). diff --git a/apps/fabric/src/fabric_view_all_docs.erl b/apps/fabric/src/fabric_view_all_docs.erl new file mode 100644 index 00000000..d51a2831 --- /dev/null +++ b/apps/fabric/src/fabric_view_all_docs.erl @@ -0,0 +1,167 @@ +-module(fabric_view_all_docs). + +-export([go/4]). +-export([open_doc/3]). % exported for spawn + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +go(DbName, #view_query_args{keys=nil} = QueryArgs, Callback, Acc0) -> + Workers = lists:map(fun(#shard{name=Name, node=Node} = Shard) -> + Ref = rexi:cast(Node, {fabric_rpc, all_docs, [Name, QueryArgs]}), + Shard#shard{ref = Ref} + end, mem3:shards(DbName)), + BufferSize = couch_config:get("fabric", "map_buffer_size", "2"), + #view_query_args{limit = Limit, skip = Skip} = QueryArgs, + State = #collector{ + query_args = QueryArgs, + callback = Callback, + buffer_size = list_to_integer(BufferSize), + counters = fabric_dict:init(Workers, 0), + skip = Skip, + limit = Limit, + user_acc = Acc0 + }, + try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3, + State, infinity, 5000) of + {ok, NewState} -> + {ok, NewState#collector.user_acc}; + Error -> + Error + after + fabric_util:cleanup(Workers) + end; + +go(DbName, QueryArgs, Callback, Acc0) -> + #view_query_args{ + direction = Dir, + include_docs = IncludeDocs, + limit = Limit0, + skip = Skip0, + keys = Keys + } = QueryArgs, + {_, Ref0} = spawn_monitor(fun() -> exit(fabric:get_doc_count(DbName)) end), + Monitors0 = [spawn_monitor(?MODULE, open_doc, [DbName, Id, IncludeDocs]) || + Id <- Keys], + Monitors = if Dir=:=fwd -> Monitors0; true -> lists:reverse(Monitors0) end, + receive {'DOWN', Ref0, _, _, {ok, TotalRows}} -> + {ok, Acc1} = Callback({total_and_offset, TotalRows, 0}, Acc0), + {ok, Acc2} = doc_receive_loop(Monitors, Skip0, Limit0, Callback, Acc1), + Callback(complete, Acc2) + after 10000 -> + Callback(timeout, Acc0) + end. + +handle_message({rexi_DOWN, _, _, _}, nil, State) -> + % TODO see if progress can be made here, possibly by removing all shards + % from that node and checking is_progress_possible + {ok, State}; + +handle_message({rexi_EXIT, _}, Worker, State) -> + #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State, + Counters = fabric_dict:erase(Worker, Counters0), + case fabric_view:is_progress_possible(Counters) of + true -> + {ok, State#collector{counters = Counters}}; + false -> + Callback({error, dead_shards}, Acc), + {error, dead_shards} + end; + +handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) -> + #collector{ + callback = Callback, + counters = Counters0, + total_rows = Total0, + offset = Offset0, + user_acc = AccIn + } = State, + case fabric_dict:lookup_element(Worker, Counters0) of + undefined -> + % this worker lost the race with other partition copies, terminate + gen_server:reply(From, stop), + {ok, State}; + 0 -> + gen_server:reply(From, ok), + Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), + Counters2 = fabric_view:remove_overlapping_shards(Worker, Counters1), + Total = Total0 + Tot, + Offset = Offset0 + Off, + case fabric_dict:any(0, Counters2) of + true -> + {ok, State#collector{ + counters = Counters2, + total_rows = Total, + offset = Offset + }}; + false -> + FinalOffset = erlang:min(Total, Offset+State#collector.skip), + {Go, Acc} = Callback({total_and_offset, Total, FinalOffset}, AccIn), + {Go, State#collector{ + counters = fabric_dict:decrement_all(Counters2), + total_rows = Total, + offset = FinalOffset, + user_acc = Acc + }} + end + end; + +handle_message(#view_row{} = Row, {Worker, From}, State) -> + #collector{query_args = Args, counters = Counters0, rows = Rows0} = State, + Dir = Args#view_query_args.direction, + Rows = merge_row(Dir, Row#view_row{worker=Worker}, Rows0), + Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), + State1 = State#collector{rows=Rows, counters=Counters1}, + State2 = fabric_view:maybe_pause_worker(Worker, From, State1), + fabric_view:maybe_send_row(State2); + +handle_message(complete, Worker, State) -> + Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters), + fabric_view:maybe_send_row(State#collector{counters = Counters}). + + +merge_row(fwd, Row, Rows) -> + lists:keymerge(#view_row.id, [Row], Rows); +merge_row(rev, Row, Rows) -> + lists:rkeymerge(#view_row.id, [Row], Rows). + +doc_receive_loop([], _, _, _, Acc) -> + {ok, Acc}; +doc_receive_loop(_, _, 0, _, Acc) -> + {ok, Acc}; +doc_receive_loop([{Pid,Ref}|Rest], Skip, Limit, Callback, Acc) when Skip > 0 -> + receive {'DOWN', Ref, process, Pid, #view_row{}} -> + doc_receive_loop(Rest, Skip-1, Limit-1, Callback, Acc) + after 10000 -> + timeout + end; +doc_receive_loop([{Pid,Ref}|Rest], 0, Limit, Callback, AccIn) -> + receive {'DOWN', Ref, process, Pid, #view_row{} = Row} -> + case Callback(fabric_view:transform_row(Row), AccIn) of + {ok, Acc} -> + doc_receive_loop(Rest, 0, Limit-1, Callback, Acc); + {stop, Acc} -> + {ok, Acc} + end + after 10000 -> + timeout + end. + +open_doc(DbName, Id, IncludeDocs) -> + Row = case fabric:open_doc(DbName, Id, [deleted]) of + {not_found, missing} -> + Doc = undefined, + #view_row{key=Id}; + {ok, #doc{deleted=true, revs=Revs}} -> + Doc = null, + {RevPos, [RevId|_]} = Revs, + Value = {[{rev,couch_doc:rev_to_str({RevPos, RevId})}, {deleted,true}]}, + #view_row{key=Id, id=Id, value=Value}; + {ok, #doc{revs=Revs} = Doc0} -> + Doc = couch_doc:to_json_obj(Doc0, []), + {RevPos, [RevId|_]} = Revs, + Value = {[{rev,couch_doc:rev_to_str({RevPos, RevId})}]}, + #view_row{key=Id, id=Id, value=Value} + end, + exit(if IncludeDocs -> Row#view_row{doc=Doc}; true -> Row end). diff --git a/apps/fabric/src/fabric_view_changes.erl b/apps/fabric/src/fabric_view_changes.erl new file mode 100644 index 00000000..6030df1d --- /dev/null +++ b/apps/fabric/src/fabric_view_changes.erl @@ -0,0 +1,251 @@ +-module(fabric_view_changes). + +-export([go/5, start_update_notifier/1]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +go(DbName, Feed, Options, Callback, Acc0) when Feed == "continuous" orelse + Feed == "longpoll" -> + Args = make_changes_args(Options), + {ok, Acc} = Callback(start, Acc0), + Notifiers = start_update_notifiers(DbName), + {Timeout, TimeoutFun} = couch_changes:get_changes_timeout(Args, Callback), + try + keep_sending_changes( + DbName, + Args, + Callback, + get_start_seq(DbName, Args), + Acc, + Timeout, + TimeoutFun + ) + after + stop_update_notifiers(Notifiers), + couch_changes:get_rest_db_updated() + end; + +go(DbName, "normal", Options, Callback, Acc0) -> + Args = make_changes_args(Options), + {ok, Acc} = Callback(start, Acc0), + {ok, #collector{counters=Seqs, user_acc=AccOut}} = send_changes( + DbName, + Args, + Callback, + get_start_seq(DbName, Args), + Acc + ), + Callback({stop, pack_seqs(Seqs)}, AccOut). + +keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, TFun) -> + #changes_args{limit=Limit, feed=Feed} = Args, + {ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn), + #collector{limit=Limit2, counters=NewSeqs, user_acc=AccOut} = Collector, + LastSeq = pack_seqs(NewSeqs), + if Limit > Limit2, Feed == "longpoll" -> + Callback({stop, LastSeq}, AccOut); + true -> + case couch_changes:wait_db_updated(Timeout, TFun) of + updated -> + keep_sending_changes( + DbName, + Args#changes_args{limit=Limit2}, + Callback, + LastSeq, + AccIn, + Timeout, + TFun + ); + stop -> + Callback({stop, LastSeq}, AccOut) + end + end. + +send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn) -> + AllShards = mem3:shards(DbName), + Seqs = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, Seq}) -> + case lists:member(Shard, AllShards) of + true -> + Ref = rexi:cast(N, {fabric_rpc, changes, [Name,ChangesArgs,Seq]}), + [{Shard#shard{ref = Ref}, Seq}]; + false -> + % Find some replacement shards to cover the missing range + % TODO It's possible in rare cases of shard merging to end up + % with overlapping shard ranges from this technique + lists:map(fun(#shard{name=Name2, node=N2} = NewShard) -> + Ref = rexi:cast(N2, {fabric_rpc, changes, [Name2,ChangesArgs,0]}), + {NewShard#shard{ref = Ref}, 0} + end, find_replacement_shards(Shard, AllShards)) + end + end, unpack_seqs(PackedSeqs, DbName)), + {Workers, _} = lists:unzip(Seqs), + State = #collector{ + query_args = ChangesArgs, + callback = Callback, + counters = fabric_dict:init(Workers, 0), + user_acc = AccIn, + limit = ChangesArgs#changes_args.limit, + rows = Seqs % store sequence positions instead + }, + try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3, + State, infinity, 5000) + after + fabric_util:cleanup(Workers) + end. + +handle_message({rexi_DOWN, _, _, _}, nil, State) -> + % TODO see if progress can be made here, possibly by removing all shards + % from that node and checking is_progress_possible + {ok, State}; + +handle_message({rexi_EXIT, Reason}, Worker, State) -> + ?LOG_ERROR("~p rexi_EXIT ~p", [?MODULE, Reason]), + #collector{ + callback=Callback, + counters=Counters0, + rows = Seqs0, + user_acc=Acc + } = State, + Counters = fabric_dict:erase(Worker, Counters0), + Seqs = fabric_dict:erase(Worker, Seqs0), + case fabric_view:is_progress_possible(Counters) of + true -> + {ok, State#collector{counters = Counters, rows=Seqs}}; + false -> + Callback({error, dead_shards}, Acc), + {error, dead_shards} + end; + +handle_message(_, _, #collector{limit=0} = State) -> + {stop, State}; + +handle_message(#view_row{key=Seq} = Row0, {Worker, From}, St) -> + #collector{ + query_args = #changes_args{include_docs=IncludeDocs}, + callback = Callback, + counters = S0, + limit = Limit, + user_acc = AccIn + } = St, + case fabric_dict:lookup_element(Worker, S0) of + undefined -> + % this worker lost the race with other partition copies, terminate it + gen_server:reply(From, stop), + {ok, St}; + _ -> + S1 = fabric_dict:store(Worker, Seq, S0), + S2 = fabric_view:remove_overlapping_shards(Worker, S1), + Row = Row0#view_row{key = pack_seqs(S2)}, + {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn), + gen_server:reply(From, Go), + {Go, St#collector{counters=S2, limit=Limit-1, user_acc=Acc}} + end; + +handle_message({complete, EndSeq}, Worker, State) -> + #collector{ + counters = S0, + total_rows = Completed % override + } = State, + case fabric_dict:lookup_element(Worker, S0) of + undefined -> + {ok, State}; + _ -> + S1 = fabric_dict:store(Worker, EndSeq, S0), + % unlikely to have overlaps here, but possible w/ filters + S2 = fabric_view:remove_overlapping_shards(Worker, S1), + NewState = State#collector{counters=S2, total_rows=Completed+1}, + case fabric_dict:size(S2) =:= (Completed+1) of + true -> + {stop, NewState}; + false -> + {ok, NewState} + end + end. + +make_changes_args(#changes_args{style=main_only, filter=undefined}=Args) -> + Args#changes_args{filter = fun couch_changes:main_only_filter/1}; +make_changes_args(#changes_args{style=all_docs, filter=undefined}=Args) -> + Args#changes_args{filter = fun couch_changes:all_docs_filter/1}; +make_changes_args(Args) -> + Args. + +get_start_seq(_DbName, #changes_args{dir=fwd, since=Since}) -> + Since; +get_start_seq(DbName, #changes_args{dir=rev}) -> + Shards = mem3:shards(DbName), + Workers = fabric_util:submit_jobs(Shards, get_update_seq, []), + {ok, Since} = fabric_util:recv(Workers, #shard.ref, + fun collect_update_seqs/3, fabric_dict:init(Workers, -1)), + Since. + +collect_update_seqs(Seq, Shard, Counters) when is_integer(Seq) -> + case fabric_dict:lookup_element(Shard, Counters) of + undefined -> + % already heard from someone else in this range + {ok, Counters}; + -1 -> + C1 = fabric_dict:store(Shard, Seq, Counters), + C2 = fabric_view:remove_overlapping_shards(Shard, C1), + case fabric_dict:any(-1, C2) of + true -> + {ok, C2}; + false -> + {stop, pack_seqs(C2)} + end + end. + +pack_seqs(Workers) -> + SeqList = [{N,R,S} || {#shard{node=N, range=R}, S} <- Workers], + SeqSum = lists:sum(element(2, lists:unzip(Workers))), + Opaque = couch_util:encodeBase64Url(term_to_binary(SeqList, [compressed])), + list_to_binary([integer_to_list(SeqSum), $-, Opaque]). + +unpack_seqs(0, DbName) -> + fabric_dict:init(mem3:shards(DbName), 0); + +unpack_seqs("0", DbName) -> + fabric_dict:init(mem3:shards(DbName), 0); + +unpack_seqs(Packed, DbName) -> + {match, [Opaque]} = re:run(Packed, "^([0-9]+-)?(?<opaque>.*)", [{capture, + [opaque], binary}]), + % TODO relies on internal structure of fabric_dict as keylist + lists:map(fun({Node, [A,B], Seq}) -> + Shard = #shard{node=Node, range=[A,B], dbname=DbName}, + {mem3_util:name_shard(Shard), Seq} + end, binary_to_term(couch_util:decodeBase64Url(Opaque))). + +start_update_notifiers(DbName) -> + lists:map(fun(#shard{node=Node, name=Name}) -> + {Node, rexi:cast(Node, {?MODULE, start_update_notifier, [Name]})} + end, mem3:shards(DbName)). + +% rexi endpoint +start_update_notifier(DbName) -> + {Caller, _} = get(rexi_from), + Fun = fun({_, X}) when X == DbName -> Caller ! db_updated; (_) -> ok end, + Id = {couch_db_update_notifier, make_ref()}, + ok = gen_event:add_sup_handler(couch_db_update, Id, Fun), + receive {gen_event_EXIT, Id, Reason} -> + rexi:reply({gen_event_EXIT, DbName, Reason}) + end. + +stop_update_notifiers(Notifiers) -> + [rexi:kill(Node, Ref) || {Node, Ref} <- Notifiers]. + +changes_row(#view_row{key=Seq, id=Id, value=Value, doc=deleted}, true) -> + {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}, {doc, null}]}}; +changes_row(#view_row{key=Seq, id=Id, value=Value, doc=deleted}, false) -> + {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}]}}; +changes_row(#view_row{key=Seq, id=Id, value=Value, doc={error,Reason}}, true) -> + {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {error,Reason}]}}; +changes_row(#view_row{key=Seq, id=Id, value=Value, doc=Doc}, true) -> + {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {doc,Doc}]}}; +changes_row(#view_row{key=Seq, id=Id, value=Value}, false) -> + {change, {[{seq,Seq}, {id,Id}, {changes,Value}]}}. + +find_replacement_shards(#shard{range=Range}, AllShards) -> + % TODO make this moar betta -- we might have split or merged the partition + [Shard || Shard <- AllShards, Shard#shard.range =:= Range]. diff --git a/apps/fabric/src/fabric_view_map.erl b/apps/fabric/src/fabric_view_map.erl new file mode 100644 index 00000000..ce8dd625 --- /dev/null +++ b/apps/fabric/src/fabric_view_map.erl @@ -0,0 +1,138 @@ +-module(fabric_view_map). + +-export([go/6]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) -> + {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []), + go(DbName, DDoc, View, Args, Callback, Acc0); + +go(DbName, DDoc, View, Args, Callback, Acc0) -> + Workers = lists:map(fun(#shard{name=Name, node=Node} = Shard) -> + Ref = rexi:cast(Node, {fabric_rpc, map_view, [Name, DDoc, View, Args]}), + Shard#shard{ref = Ref} + end, mem3:shards(DbName)), + BufferSize = couch_config:get("fabric", "map_buffer_size", "2"), + #view_query_args{limit = Limit, skip = Skip, keys = Keys} = Args, + State = #collector{ + query_args = Args, + callback = Callback, + buffer_size = list_to_integer(BufferSize), + counters = fabric_dict:init(Workers, 0), + skip = Skip, + limit = Limit, + keys = fabric_view:keydict(Keys), + sorted = Args#view_query_args.sorted, + user_acc = Acc0 + }, + try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3, + State, infinity, 1000 * 60 * 60) of + {ok, NewState} -> + {ok, NewState#collector.user_acc}; + Error -> + Error + after + fabric_util:cleanup(Workers) + end. + +handle_message({rexi_DOWN, _, _, _}, nil, State) -> + % TODO see if progress can be made here, possibly by removing all shards + % from that node and checking is_progress_possible + {ok, State}; + +handle_message({rexi_EXIT, Reason}, Worker, State) -> + ?LOG_ERROR("~p rexi_EXIT ~p", [?MODULE, Reason]), + #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State, + Counters = fabric_dict:erase(Worker, Counters0), + case fabric_view:is_progress_possible(Counters) of + true -> + {ok, State#collector{counters = Counters}}; + false -> + Callback({error, dead_shards}, Acc), + {error, dead_shards} + end; + +handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) -> + #collector{ + callback = Callback, + counters = Counters0, + total_rows = Total0, + offset = Offset0, + user_acc = AccIn + } = State, + case fabric_dict:lookup_element(Worker, Counters0) of + undefined -> + % this worker lost the race with other partition copies, terminate + gen_server:reply(From, stop), + {ok, State}; + 0 -> + gen_server:reply(From, ok), + Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), + Counters2 = fabric_view:remove_overlapping_shards(Worker, Counters1), + Total = Total0 + Tot, + Offset = Offset0 + Off, + case fabric_dict:any(0, Counters2) of + true -> + {ok, State#collector{ + counters = Counters2, + total_rows = Total, + offset = Offset + }}; + false -> + FinalOffset = erlang:min(Total, Offset+State#collector.skip), + {Go, Acc} = Callback({total_and_offset, Total, FinalOffset}, AccIn), + {Go, State#collector{ + counters = fabric_dict:decrement_all(Counters2), + total_rows = Total, + offset = FinalOffset, + user_acc = Acc + }} + end + end; + +handle_message(#view_row{}, {_, _}, #collector{limit=0} = State) -> + #collector{callback=Callback} = State, + {_, Acc} = Callback(complete, State#collector.user_acc), + {stop, State#collector{user_acc=Acc}}; + +handle_message(#view_row{} = Row, {_,From}, #collector{sorted=false} = St) -> + #collector{callback=Callback, user_acc=AccIn, limit=Limit} = St, + {Go, Acc} = Callback(fabric_view:transform_row(Row), AccIn), + gen_server:reply(From, ok), + {Go, St#collector{user_acc=Acc, limit=Limit-1}}; + +handle_message(#view_row{} = Row, {Worker, From}, State) -> + #collector{ + query_args = #view_query_args{direction=Dir}, + counters = Counters0, + rows = Rows0, + keys = KeyDict + } = State, + Rows = merge_row(Dir, KeyDict, Row#view_row{worker=Worker}, Rows0), + Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), + State1 = State#collector{rows=Rows, counters=Counters1}, + State2 = fabric_view:maybe_pause_worker(Worker, From, State1), + fabric_view:maybe_send_row(State2); + +handle_message(complete, Worker, State) -> + Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters), + fabric_view:maybe_send_row(State#collector{counters = Counters}). + +merge_row(fwd, undefined, Row, Rows) -> + lists:merge(fun(#view_row{key=KeyA, id=IdA}, #view_row{key=KeyB, id=IdB}) -> + couch_view:less_json([KeyA, IdA], [KeyB, IdB]) + end, [Row], Rows); +merge_row(rev, undefined, Row, Rows) -> + lists:merge(fun(#view_row{key=KeyA, id=IdA}, #view_row{key=KeyB, id=IdB}) -> + couch_view:less_json([KeyB, IdB], [KeyA, IdA]) + end, [Row], Rows); +merge_row(_, KeyDict, Row, Rows) -> + lists:merge(fun(#view_row{key=A, id=IdA}, #view_row{key=B, id=IdB}) -> + if A =:= B -> IdA < IdB; true -> + dict:fetch(A, KeyDict) < dict:fetch(B, KeyDict) + end + end, [Row], Rows). + diff --git a/apps/fabric/src/fabric_view_reduce.erl b/apps/fabric/src/fabric_view_reduce.erl new file mode 100644 index 00000000..ddde9f22 --- /dev/null +++ b/apps/fabric/src/fabric_view_reduce.erl @@ -0,0 +1,85 @@ +-module(fabric_view_reduce). + +-export([go/6]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) -> + {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []), + go(DbName, DDoc, View, Args, Callback, Acc0); + +go(DbName, DDoc, VName, Args, Callback, Acc0) -> + #group{def_lang=Lang, views=Views} = Group = + couch_view_group:design_doc_to_view_group(#db{name=DbName}, DDoc), + {NthRed, View} = fabric_view:extract_view(nil, VName, Views, reduce), + {VName, RedSrc} = lists:nth(NthRed, View#view.reduce_funs), + Workers = lists:map(fun(#shard{name=Name, node=N} = Shard) -> + Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,Group,VName,Args]}), + Shard#shard{ref = Ref} + end, mem3:shards(DbName)), + BufferSize = couch_config:get("fabric", "reduce_buffer_size", "20"), + #view_query_args{limit = Limit, skip = Skip} = Args, + State = #collector{ + query_args = Args, + callback = Callback, + buffer_size = list_to_integer(BufferSize), + counters = fabric_dict:init(Workers, 0), + keys = Args#view_query_args.keys, + skip = Skip, + limit = Limit, + lang = Group#group.def_lang, + os_proc = couch_query_servers:get_os_process(Lang), + reducer = RedSrc, + rows = dict:new(), + user_acc = Acc0 + }, + try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3, + State, infinity, 1000 * 60 * 60) of + {ok, NewState} -> + {ok, NewState#collector.user_acc}; + Error -> + Error + after + fabric_util:cleanup(Workers), + catch couch_query_servers:ret_os_process(State#collector.os_proc) + end. + +handle_message({rexi_DOWN, _, _, _}, nil, State) -> + % TODO see if progress can be made here, possibly by removing all shards + % from that node and checking is_progress_possible + {ok, State}; + +handle_message({rexi_EXIT, Reason}, Worker, State) -> + ?LOG_ERROR("~p rexi_EXIT ~p", [?MODULE, Reason]), + #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State, + Counters = fabric_dict:erase(Worker, Counters0), + case fabric_view:is_progress_possible(Counters) of + true -> + {ok, State#collector{counters = Counters}}; + false -> + Callback({error, dead_shards}, Acc), + {error, dead_shards} + end; + +handle_message(#view_row{key=Key} = Row, {Worker, From}, State) -> + #collector{counters = Counters0, rows = Rows0} = State, + case fabric_dict:lookup_element(Worker, Counters0) of + undefined -> + % this worker lost the race with other partition copies, terminate it + gen_server:reply(From, stop), + {ok, State}; + _ -> + Rows = dict:append(Key, Row#view_row{worker=Worker}, Rows0), + C1 = fabric_dict:update_counter(Worker, 1, Counters0), + % TODO time this call, if slow don't do it every time + C2 = fabric_view:remove_overlapping_shards(Worker, C1), + State1 = State#collector{rows=Rows, counters=C2}, + State2 = fabric_view:maybe_pause_worker(Worker, From, State1), + fabric_view:maybe_send_row(State2) + end; + +handle_message(complete, Worker, State) -> + Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters), + fabric_view:maybe_send_row(State#collector{counters = Counters}). |