From ebac05f686b56791511cb9b599dfb5a742dcfc96 Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Mon, 25 Oct 2010 15:46:05 -0400 Subject: use get-deps to pull down individual cloudant projects --- apps/fabric/src/fabric.app.src | 6 - apps/fabric/src/fabric.erl | 264 ------------------ apps/fabric/src/fabric_db_create.erl | 79 ------ apps/fabric/src/fabric_db_delete.erl | 55 ---- apps/fabric/src/fabric_db_doc_count.erl | 46 ---- apps/fabric/src/fabric_db_info.erl | 69 ----- apps/fabric/src/fabric_db_meta.erl | 49 ---- apps/fabric/src/fabric_dict.erl | 51 ---- apps/fabric/src/fabric_doc_attachments.erl | 116 -------- apps/fabric/src/fabric_doc_missing_revs.erl | 78 ------ apps/fabric/src/fabric_doc_open.erl | 120 --------- apps/fabric/src/fabric_doc_open_revs.erl | 284 -------------------- apps/fabric/src/fabric_doc_update.erl | 147 ---------- apps/fabric/src/fabric_group_info.erl | 66 ----- apps/fabric/src/fabric_rpc.erl | 402 ---------------------------- apps/fabric/src/fabric_util.erl | 97 ------- apps/fabric/src/fabric_view.erl | 235 ---------------- apps/fabric/src/fabric_view_all_docs.erl | 181 ------------- apps/fabric/src/fabric_view_changes.erl | 271 ------------------- apps/fabric/src/fabric_view_map.erl | 151 ----------- apps/fabric/src/fabric_view_reduce.erl | 99 ------- 21 files changed, 2866 deletions(-) delete mode 100644 apps/fabric/src/fabric.app.src delete mode 100644 apps/fabric/src/fabric.erl delete mode 100644 apps/fabric/src/fabric_db_create.erl delete mode 100644 apps/fabric/src/fabric_db_delete.erl delete mode 100644 apps/fabric/src/fabric_db_doc_count.erl delete mode 100644 apps/fabric/src/fabric_db_info.erl delete mode 100644 apps/fabric/src/fabric_db_meta.erl delete mode 100644 apps/fabric/src/fabric_dict.erl delete mode 100644 apps/fabric/src/fabric_doc_attachments.erl delete mode 100644 apps/fabric/src/fabric_doc_missing_revs.erl delete mode 100644 apps/fabric/src/fabric_doc_open.erl delete mode 100644 apps/fabric/src/fabric_doc_open_revs.erl delete mode 100644 apps/fabric/src/fabric_doc_update.erl delete mode 100644 apps/fabric/src/fabric_group_info.erl delete mode 100644 apps/fabric/src/fabric_rpc.erl delete mode 100644 apps/fabric/src/fabric_util.erl delete mode 100644 apps/fabric/src/fabric_view.erl delete mode 100644 apps/fabric/src/fabric_view_all_docs.erl delete mode 100644 apps/fabric/src/fabric_view_changes.erl delete mode 100644 apps/fabric/src/fabric_view_map.erl delete mode 100644 apps/fabric/src/fabric_view_reduce.erl (limited to 'apps/fabric/src') diff --git a/apps/fabric/src/fabric.app.src b/apps/fabric/src/fabric.app.src deleted file mode 100644 index 0b626ba7..00000000 --- a/apps/fabric/src/fabric.app.src +++ /dev/null @@ -1,6 +0,0 @@ -{application, fabric, [ - {description, "Routing and proxying layer for CouchDB cluster"}, - {vsn, "1.0.3"}, - {registered, []}, - {applications, [kernel, stdlib, couch, rexi, mem3]} -]}. diff --git a/apps/fabric/src/fabric.erl b/apps/fabric/src/fabric.erl deleted file mode 100644 index 9f9db032..00000000 --- a/apps/fabric/src/fabric.erl +++ /dev/null @@ -1,264 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric). - --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - -% DBs --export([all_dbs/0, all_dbs/1, create_db/1, create_db/2, delete_db/1, - delete_db/2, get_db_info/1, get_doc_count/1, set_revs_limit/3, - set_security/3, get_revs_limit/1, get_security/1]). - -% Documents --export([open_doc/3, open_revs/4, get_missing_revs/2, update_doc/3, - update_docs/3, att_receiver/2]). - -% Views --export([all_docs/4, changes/4, query_view/3, query_view/4, query_view/6, - get_view_group_info/2]). - -% miscellany --export([design_docs/1, reset_validation_funs/1, cleanup_index_files/0, - cleanup_index_files/1]). - --include("fabric.hrl"). - -% db operations - -all_dbs() -> - all_dbs(<<>>). - -all_dbs(Prefix) when is_list(Prefix) -> - all_dbs(list_to_binary(Prefix)); -all_dbs(Prefix) when is_binary(Prefix) -> - Length = byte_size(Prefix), - MatchingDbs = ets:foldl(fun(#shard{dbname=DbName}, Acc) -> - case DbName of - <> -> - [DbName | Acc]; - _ -> - Acc - end - end, [], partitions), - {ok, lists:usort(MatchingDbs)}. - -get_db_info(DbName) -> - fabric_db_info:go(dbname(DbName)). - -get_doc_count(DbName) -> - fabric_db_doc_count:go(dbname(DbName)). - -create_db(DbName) -> - create_db(DbName, []). - -create_db(DbName, Options) -> - fabric_db_create:go(dbname(DbName), opts(Options)). - -delete_db(DbName) -> - delete_db(DbName, []). - -delete_db(DbName, Options) -> - fabric_db_delete:go(dbname(DbName), opts(Options)). - -set_revs_limit(DbName, Limit, Options) when is_integer(Limit), Limit > 0 -> - fabric_db_meta:set_revs_limit(dbname(DbName), Limit, opts(Options)). - -get_revs_limit(DbName) -> - {ok, Db} = fabric_util:get_db(dbname(DbName)), - try couch_db:get_revs_limit(Db) after catch couch_db:close(Db) end. - -set_security(DbName, SecObj, Options) -> - fabric_db_meta:set_security(dbname(DbName), SecObj, opts(Options)). - -get_security(DbName) -> - {ok, Db} = fabric_util:get_db(dbname(DbName)), - try couch_db:get_security(Db) after catch couch_db:close(Db) end. - -% doc operations -open_doc(DbName, Id, Options) -> - fabric_doc_open:go(dbname(DbName), docid(Id), opts(Options)). - -open_revs(DbName, Id, Revs, Options) -> - fabric_doc_open_revs:go(dbname(DbName), docid(Id), Revs, opts(Options)). - -get_missing_revs(DbName, IdsRevs) when is_list(IdsRevs) -> - Sanitized = [idrevs(IdR) || IdR <- IdsRevs], - fabric_doc_missing_revs:go(dbname(DbName), Sanitized). - -update_doc(DbName, Doc, Options) -> - case update_docs(DbName, [Doc], opts(Options)) of - {ok, [{ok, NewRev}]} -> - {ok, NewRev}; - {ok, [Error]} -> - throw(Error); - {ok, []} -> - % replication success - #doc{revs = {Pos, [RevId | _]}} = doc(Doc), - {ok, {Pos, RevId}} - end. - -update_docs(DbName, Docs, Options) -> - try fabric_doc_update:go(dbname(DbName), docs(Docs), opts(Options)) - catch {aborted, PreCommitFailures} -> - {aborted, PreCommitFailures} - end. - -att_receiver(Req, Length) -> - fabric_doc_attachments:receiver(Req, Length). - -all_docs(DbName, Callback, Acc0, #view_query_args{} = QueryArgs) when - is_function(Callback, 2) -> - fabric_view_all_docs:go(dbname(DbName), QueryArgs, Callback, Acc0). - -changes(DbName, Callback, Acc0, Options) -> - % TODO use a keylist for Options instead of #changes_args, BugzID 10281 - Feed = Options#changes_args.feed, - fabric_view_changes:go(dbname(DbName), Feed, Options, Callback, Acc0). - -query_view(DbName, DesignName, ViewName) -> - query_view(DbName, DesignName, ViewName, #view_query_args{}). - -query_view(DbName, DesignName, ViewName, QueryArgs) -> - Callback = fun default_callback/2, - query_view(DbName, DesignName, ViewName, Callback, [], QueryArgs). - -query_view(DbName, Design, ViewName, Callback, Acc0, QueryArgs) -> - Db = dbname(DbName), View = name(ViewName), - case is_reduce_view(Db, Design, View, QueryArgs) of - true -> - Mod = fabric_view_reduce; - false -> - Mod = fabric_view_map - end, - Mod:go(Db, Design, View, QueryArgs, Callback, Acc0). - -get_view_group_info(DbName, DesignId) -> - fabric_group_info:go(dbname(DbName), design_doc(DesignId)). - -design_docs(DbName) -> - QueryArgs = #view_query_args{start_key = <<"_design/">>, include_docs=true}, - Callback = fun({total_and_offset, _, _}, []) -> - {ok, []}; - ({row, {Props}}, Acc) -> - case couch_util:get_value(id, Props) of - <<"_design/", _/binary>> -> - {ok, [couch_util:get_value(doc, Props) | Acc]}; - _ -> - {stop, Acc} - end; - (complete, Acc) -> - {ok, lists:reverse(Acc)} - end, - fabric:all_docs(dbname(DbName), Callback, [], QueryArgs). - -reset_validation_funs(DbName) -> - [rexi:cast(Node, {fabric_rpc, reset_validation_funs, [Name]}) || - #shard{node=Node, name=Name} <- mem3:shards(DbName)]. - -cleanup_index_files() -> - {ok, DbNames} = fabric:all_dbs(), - [cleanup_index_files(Name) || Name <- DbNames]. - -cleanup_index_files(DbName) -> - {ok, DesignDocs} = fabric:design_docs(DbName), - - ActiveSigs = lists:map(fun(#doc{id = GroupId}) -> - {ok, Info} = fabric:get_view_group_info(DbName, GroupId), - binary_to_list(couch_util:get_value(signature, Info)) - end, [couch_doc:from_json_obj(DD) || DD <- DesignDocs]), - - FileList = filelib:wildcard([couch_config:get("couchdb", "view_index_dir"), - "/.shards/*/", couch_util:to_list(DbName), "_design/*"]), - - DeleteFiles = if ActiveSigs =:= [] -> FileList; true -> - {ok, RegExp} = re:compile([$(, string:join(ActiveSigs, "|"), $)]), - lists:filter(fun(FilePath) -> - re:run(FilePath, RegExp, [{capture, none}]) == nomatch - end, FileList) - end, - [file:delete(File) || File <- DeleteFiles], - ok. - -%% some simple type validation and transcoding - -dbname(DbName) when is_list(DbName) -> - list_to_binary(DbName); -dbname(DbName) when is_binary(DbName) -> - DbName; -dbname(#db{name=Name}) -> - Name; -dbname(DbName) -> - erlang:error({illegal_database_name, DbName}). - -name(Thing) -> - couch_util:to_binary(Thing). - -docid(DocId) when is_list(DocId) -> - list_to_binary(DocId); -docid(DocId) when is_binary(DocId) -> - DocId; -docid(DocId) -> - erlang:error({illegal_docid, DocId}). - -docs(Docs) when is_list(Docs) -> - [doc(D) || D <- Docs]; -docs(Docs) -> - erlang:error({illegal_docs_list, Docs}). - -doc(#doc{} = Doc) -> - Doc; -doc({_} = Doc) -> - couch_doc:from_json_obj(Doc); -doc(Doc) -> - erlang:error({illegal_doc_format, Doc}). - -design_doc(#doc{} = DDoc) -> - DDoc; -design_doc(DocId) when is_list(DocId) -> - design_doc(list_to_binary(DocId)); -design_doc(<<"_design/", _/binary>> = DocId) -> - DocId; -design_doc(GroupName) -> - <<"_design/", GroupName/binary>>. - -idrevs({Id, Revs}) when is_list(Revs) -> - {docid(Id), [rev(R) || R <- Revs]}. - -rev(Rev) when is_list(Rev); is_binary(Rev) -> - couch_doc:parse_rev(Rev); -rev({Seq, Hash} = Rev) when is_integer(Seq), is_binary(Hash) -> - Rev. - -opts(Options) -> - case couch_util:get_value(user_ctx, Options) of - undefined -> - case erlang:get(user_ctx) of - #user_ctx{} = Ctx -> - [{user_ctx, Ctx} | Options]; - _ -> - Options - end; - _ -> - Options - end. - -default_callback(complete, Acc) -> - {ok, lists:reverse(Acc)}; -default_callback(Row, Acc) -> - {ok, [Row | Acc]}. - -is_reduce_view(_, _, _, #view_query_args{view_type=Reduce}) -> - Reduce =:= reduce. diff --git a/apps/fabric/src/fabric_db_create.erl b/apps/fabric/src/fabric_db_create.erl deleted file mode 100644 index ccea943d..00000000 --- a/apps/fabric/src/fabric_db_create.erl +++ /dev/null @@ -1,79 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_db_create). --export([go/2]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - --define(DBNAME_REGEX, "^[a-z][a-z0-9\\_\\$()\\+\\-\\/\\s.]*$"). - -%% @doc Create a new database, and all its partition files across the cluster -%% Options is proplist with user_ctx, n, q -go(DbName, Options) -> - case re:run(DbName, ?DBNAME_REGEX, [{capture,none}]) of - match -> - Shards = mem3:choose_shards(DbName, Options), - Doc = make_document(Shards), - Workers = fabric_util:submit_jobs(Shards, create_db, [Options, Doc]), - Acc0 = fabric_dict:init(Workers, nil), - case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of - {ok, _} -> - ok; - Else -> - Else - end; - nomatch -> - {error, illegal_database_name} - end. - -handle_message(Msg, Shard, Counters) -> - C1 = fabric_dict:store(Shard, Msg, Counters), - case fabric_dict:any(nil, C1) of - true -> - {ok, C1}; - false -> - final_answer(C1) - end. - -make_document([#shard{dbname=DbName}|_] = Shards) -> - {RawOut, ByNodeOut, ByRangeOut} = - lists:foldl(fun(#shard{node=N, range=[B,E]}, {Raw, ByNode, ByRange}) -> - Range = ?l2b([couch_util:to_hex(<>), "-", - couch_util:to_hex(<>)]), - Node = couch_util:to_binary(N), - {[[<<"add">>, Range, Node] | Raw], orddict:append(Node, Range, ByNode), - orddict:append(Range, Node, ByRange)} - end, {[], [], []}, Shards), - #doc{id=DbName, body = {[ - {<<"changelog">>, lists:sort(RawOut)}, - {<<"by_node">>, {[{K,lists:sort(V)} || {K,V} <- ByNodeOut]}}, - {<<"by_range">>, {[{K,lists:sort(V)} || {K,V} <- ByRangeOut]}} - ]}}. - -final_answer(Counters) -> - Successes = [X || {_, M} = X <- Counters, M == ok orelse M == file_exists], - case fabric_view:is_progress_possible(Successes) of - true -> - case lists:keymember(file_exists, 2, Successes) of - true -> - {error, file_exists}; - false -> - {stop, ok} - end; - false -> - {error, internal_server_error} - end. diff --git a/apps/fabric/src/fabric_db_delete.erl b/apps/fabric/src/fabric_db_delete.erl deleted file mode 100644 index c3000e57..00000000 --- a/apps/fabric/src/fabric_db_delete.erl +++ /dev/null @@ -1,55 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_db_delete). --export([go/2]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). - -go(DbName, Options) -> - Shards = mem3:shards(DbName), - Workers = fabric_util:submit_jobs(Shards, delete_db, [Options, DbName]), - Acc0 = fabric_dict:init(Workers, nil), - case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of - {ok, ok} -> - ok; - {ok, not_found} -> - erlang:error(database_does_not_exist); - Error -> - Error - end. - -handle_message(Msg, Shard, Counters) -> - C1 = fabric_dict:store(Shard, Msg, Counters), - case fabric_dict:any(nil, C1) of - true -> - {ok, C1}; - false -> - final_answer(C1) - end. - -final_answer(Counters) -> - Successes = [X || {_, M} = X <- Counters, M == ok orelse M == not_found], - case fabric_view:is_progress_possible(Successes) of - true -> - case lists:keymember(ok, 2, Successes) of - true -> - {stop, ok}; - false -> - {stop, not_found} - end; - false -> - {error, internal_server_error} - end. diff --git a/apps/fabric/src/fabric_db_doc_count.erl b/apps/fabric/src/fabric_db_doc_count.erl deleted file mode 100644 index faa755e6..00000000 --- a/apps/fabric/src/fabric_db_doc_count.erl +++ /dev/null @@ -1,46 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_db_doc_count). - --export([go/1]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - -go(DbName) -> - Shards = mem3:shards(DbName), - Workers = fabric_util:submit_jobs(Shards, get_doc_count, []), - Acc0 = {fabric_dict:init(Workers, nil), 0}, - fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0). - -handle_message({ok, Count}, Shard, {Counters, Acc}) -> - case fabric_dict:lookup_element(Shard, Counters) of - undefined -> - % already heard from someone else in this range - {ok, {Counters, Acc}}; - nil -> - C1 = fabric_dict:store(Shard, ok, Counters), - C2 = fabric_view:remove_overlapping_shards(Shard, C1), - case fabric_dict:any(nil, C2) of - true -> - {ok, {C2, Count+Acc}}; - false -> - {stop, Count+Acc} - end - end; -handle_message(_, _, Acc) -> - {ok, Acc}. - diff --git a/apps/fabric/src/fabric_db_info.erl b/apps/fabric/src/fabric_db_info.erl deleted file mode 100644 index a0acb379..00000000 --- a/apps/fabric/src/fabric_db_info.erl +++ /dev/null @@ -1,69 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_db_info). - --export([go/1]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). - -go(DbName) -> - Shards = mem3:shards(DbName), - Workers = fabric_util:submit_jobs(Shards, get_db_info, []), - Acc0 = {fabric_dict:init(Workers, nil), []}, - fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0). - -handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, Acc}) -> - case fabric_dict:lookup_element(Shard, Counters) of - undefined -> - % already heard from someone else in this range - {ok, {Counters, Acc}}; - nil -> - Seq = couch_util:get_value(update_seq, Info), - C1 = fabric_dict:store(Shard, Seq, Counters), - C2 = fabric_view:remove_overlapping_shards(Shard, C1), - case fabric_dict:any(nil, C2) of - true -> - {ok, {C2, [Info|Acc]}}; - false -> - {stop, [ - {db_name,Name}, - {update_seq, fabric_view_changes:pack_seqs(C2)} | - merge_results(lists:flatten([Info|Acc])) - ]} - end - end; -handle_message(_, _, Acc) -> - {ok, Acc}. - -merge_results(Info) -> - Dict = lists:foldl(fun({K,V},D0) -> orddict:append(K,V,D0) end, - orddict:new(), Info), - orddict:fold(fun - (doc_count, X, Acc) -> - [{doc_count, lists:sum(X)} | Acc]; - (doc_del_count, X, Acc) -> - [{doc_del_count, lists:sum(X)} | Acc]; - (purge_seq, X, Acc) -> - [{purge_seq, lists:sum(X)} | Acc]; - (compact_running, X, Acc) -> - [{compact_running, lists:member(true, X)} | Acc]; - (disk_size, X, Acc) -> - [{disk_size, lists:sum(X)} | Acc]; - (disk_format_version, X, Acc) -> - [{disk_format_version, lists:max(X)} | Acc]; - (_, _, Acc) -> - Acc - end, [{instance_start_time, <<"0">>}], Dict). diff --git a/apps/fabric/src/fabric_db_meta.erl b/apps/fabric/src/fabric_db_meta.erl deleted file mode 100644 index cb46f380..00000000 --- a/apps/fabric/src/fabric_db_meta.erl +++ /dev/null @@ -1,49 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_db_meta). - --export([set_revs_limit/3, set_security/3]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). - -set_revs_limit(DbName, Limit, Options) -> - Shards = mem3:shards(DbName), - Workers = fabric_util:submit_jobs(Shards, set_revs_limit, [Limit, Options]), - Waiting = length(Workers) - 1, - case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Waiting) of - {ok, ok} -> - ok; - Error -> - Error - end. - -set_security(DbName, SecObj, Options) -> - Shards = mem3:shards(DbName), - Workers = fabric_util:submit_jobs(Shards, set_security, [SecObj, Options]), - Waiting = length(Workers) - 1, - case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Waiting) of - {ok, ok} -> - ok; - Error -> - Error - end. - -handle_message(ok, _, 0) -> - {stop, ok}; -handle_message(ok, _, Waiting) -> - {ok, Waiting - 1}; -handle_message(Error, _, _Waiting) -> - {error, Error}. \ No newline at end of file diff --git a/apps/fabric/src/fabric_dict.erl b/apps/fabric/src/fabric_dict.erl deleted file mode 100644 index 7db98923..00000000 --- a/apps/fabric/src/fabric_dict.erl +++ /dev/null @@ -1,51 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_dict). --compile(export_all). - -% Instead of ets, let's use an ordered keylist. We'll need to revisit if we -% have >> 100 shards, so a private interface is a good idea. - APK June 2010 - -init(Keys, InitialValue) -> - orddict:from_list([{Key, InitialValue} || Key <- Keys]). - - -decrement_all(Dict) -> - [{K,V-1} || {K,V} <- Dict]. - -store(Key, Value, Dict) -> - orddict:store(Key, Value, Dict). - -erase(Key, Dict) -> - orddict:erase(Key, Dict). - -update_counter(Key, Incr, Dict0) -> - orddict:update_counter(Key, Incr, Dict0). - - -lookup_element(Key, Dict) -> - couch_util:get_value(Key, Dict). - -size(Dict) -> - orddict:size(Dict). - -any(Value, Dict) -> - lists:keymember(Value, 2, Dict). - -filter(Fun, Dict) -> - orddict:filter(Fun, Dict). - -fold(Fun, Acc0, Dict) -> - orddict:fold(Fun, Acc0, Dict). diff --git a/apps/fabric/src/fabric_doc_attachments.erl b/apps/fabric/src/fabric_doc_attachments.erl deleted file mode 100644 index b66e2ae4..00000000 --- a/apps/fabric/src/fabric_doc_attachments.erl +++ /dev/null @@ -1,116 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_doc_attachments). - --include("fabric.hrl"). - -%% couch api calls --export([receiver/2]). - -receiver(_Req, undefined) -> - <<"">>; -receiver(_Req, {unknown_transfer_encoding, Unknown}) -> - exit({unknown_transfer_encoding, Unknown}); -receiver(Req, chunked) -> - MiddleMan = spawn(fun() -> middleman(Req, chunked) end), - fun(4096, ChunkFun, ok) -> - write_chunks(MiddleMan, ChunkFun) - end; -receiver(_Req, 0) -> - <<"">>; -receiver(Req, Length) when is_integer(Length) -> - Middleman = spawn(fun() -> middleman(Req, Length) end), - fun() -> - Middleman ! {self(), gimme_data}, - receive {Middleman, Data} -> Data end - end; -receiver(_Req, Length) -> - exit({length_not_integer, Length}). - -%% -%% internal -%% - -write_chunks(MiddleMan, ChunkFun) -> - MiddleMan ! {self(), gimme_data}, - receive - {MiddleMan, {0, _Footers}} -> - % MiddleMan ! {self(), done}, - ok; - {MiddleMan, ChunkRecord} -> - ChunkFun(ChunkRecord, ok), - write_chunks(MiddleMan, ChunkFun) - end. - -receive_unchunked_attachment(_Req, 0) -> - ok; -receive_unchunked_attachment(Req, Length) -> - receive {MiddleMan, go} -> - Data = couch_httpd:recv(Req, 0), - MiddleMan ! {self(), Data} - end, - receive_unchunked_attachment(Req, Length - size(Data)). - -middleman(Req, chunked) -> - % spawn a process to actually receive the uploaded data - RcvFun = fun(ChunkRecord, ok) -> - receive {From, go} -> From ! {self(), ChunkRecord} end, ok - end, - Receiver = spawn(fun() -> couch_httpd:recv_chunked(Req,4096,RcvFun,ok) end), - - % take requests from the DB writers and get data from the receiver - N = erlang:list_to_integer(couch_config:get("cluster","n")), - middleman_loop(Receiver, N, dict:new(), 0, []); - -middleman(Req, Length) -> - Receiver = spawn(fun() -> receive_unchunked_attachment(Req, Length) end), - N = erlang:list_to_integer(couch_config:get("cluster","n")), - middleman_loop(Receiver, N, dict:new(), 0, []). - -middleman_loop(Receiver, N, Counters, Offset, ChunkList) -> - receive {From, gimme_data} -> - % figure out how far along this writer (From) is in the list - {NewCounters, WhichChunk} = case dict:find(From, Counters) of - {ok, I} -> - {dict:update_counter(From, 1, Counters), I}; - error -> - {dict:store(From, 2, Counters), 1} - end, - ListIndex = WhichChunk - Offset, - - % talk to the receiver to get another chunk if necessary - ChunkList1 = if ListIndex > length(ChunkList) -> - Receiver ! {self(), go}, - receive {Receiver, ChunkRecord} -> ChunkList ++ [ChunkRecord] end; - true -> ChunkList end, - - % reply to the writer - From ! {self(), lists:nth(ListIndex, ChunkList1)}, - - % check if we can drop a chunk from the head of the list - SmallestIndex = dict:fold(fun(_, Val, Acc) -> lists:min([Val,Acc]) end, - WhichChunk+1, NewCounters), - Size = dict:size(NewCounters), - - {NewChunkList, NewOffset} = - if Size == N andalso (SmallestIndex - Offset) == 2 -> - {tl(ChunkList1), Offset+1}; - true -> - {ChunkList1, Offset} - end, - middleman_loop(Receiver, N, NewCounters, NewOffset, NewChunkList) - after 10000 -> - ok - end. diff --git a/apps/fabric/src/fabric_doc_missing_revs.erl b/apps/fabric/src/fabric_doc_missing_revs.erl deleted file mode 100644 index a4d54192..00000000 --- a/apps/fabric/src/fabric_doc_missing_revs.erl +++ /dev/null @@ -1,78 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_doc_missing_revs). - --export([go/2]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). - -go(DbName, AllIdsRevs) -> - Workers = lists:map(fun({#shard{name=Name, node=Node} = Shard, IdsRevs}) -> - Ref = rexi:cast(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs]}), - Shard#shard{ref=Ref} - end, group_idrevs_by_shard(DbName, AllIdsRevs)), - ResultDict = dict:from_list([{Id, {nil,Revs}} || {Id, Revs} <- AllIdsRevs]), - Acc0 = {length(Workers), ResultDict}, - fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0). - -handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) -> - skip_message(Acc0); -handle_message({rexi_EXIT, _, _, _}, _Worker, Acc0) -> - skip_message(Acc0); -handle_message({ok, Results}, _Worker, {1, D0}) -> - D = update_dict(D0, Results), - {stop, dict:fold(fun force_reply/3, [], D)}; -handle_message({ok, Results}, _Worker, {WaitingCount, D0}) -> - D = update_dict(D0, Results), - case dict:fold(fun maybe_reply/3, {stop, []}, D) of - continue -> - % still haven't heard about some Ids - {ok, {WaitingCount - 1, D}}; - {stop, FinalReply} -> - {stop, FinalReply} - end. - -force_reply(Id, {nil,Revs}, Acc) -> - % never heard about this ID, assume it's missing - [{Id, Revs} | Acc]; -force_reply(_, [], Acc) -> - Acc; -force_reply(Id, Revs, Acc) -> - [{Id, Revs} | Acc]. - -maybe_reply(_, _, continue) -> - continue; -maybe_reply(_, {nil, _}, _) -> - continue; -maybe_reply(_, [], {stop, Acc}) -> - {stop, Acc}; -maybe_reply(Id, Revs, {stop, Acc}) -> - {stop, [{Id, Revs} | Acc]}. - -group_idrevs_by_shard(DbName, IdsRevs) -> - dict:to_list(lists:foldl(fun({Id, Revs}, D0) -> - lists:foldl(fun(Shard, D1) -> - dict:append(Shard, {Id, Revs}, D1) - end, D0, mem3:shards(DbName,Id)) - end, dict:new(), IdsRevs)). - -update_dict(D0, KVs) -> - lists:foldl(fun({K,V,_}, D1) -> dict:store(K, V, D1) end, D0, KVs). - -skip_message({1, Dict}) -> - {stop, dict:fold(fun force_reply/3, [], Dict)}; -skip_message({WaitingCount, Dict}) -> - {ok, {WaitingCount-1, Dict}}. diff --git a/apps/fabric/src/fabric_doc_open.erl b/apps/fabric/src/fabric_doc_open.erl deleted file mode 100644 index dd4917b9..00000000 --- a/apps/fabric/src/fabric_doc_open.erl +++ /dev/null @@ -1,120 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_doc_open). - --export([go/3]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - -go(DbName, Id, Options) -> - Workers = fabric_util:submit_jobs(mem3:shards(DbName,Id), open_doc, - [Id, [deleted|Options]]), - SuppressDeletedDoc = not lists:member(deleted, Options), - R = couch_util:get_value(r, Options, couch_config:get("cluster","r","2")), - RepairOpts = [{r, integer_to_list(mem3:n(DbName))} | Options], - Acc0 = {length(Workers), list_to_integer(R), []}, - case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of - {ok, Reply} -> - format_reply(Reply, SuppressDeletedDoc); - {error, needs_repair, Reply} -> - spawn(fabric, open_revs, [DbName, Id, all, RepairOpts]), - format_reply(Reply, SuppressDeletedDoc); - {error, needs_repair} -> - % we couldn't determine the correct reply, so we'll run a sync repair - {ok, Results} = fabric:open_revs(DbName, Id, all, RepairOpts), - case lists:partition(fun({ok, #doc{deleted=Del}}) -> Del end, Results) of - {[], []} -> - {not_found, missing}; - {_DeletedDocs, []} when SuppressDeletedDoc -> - {not_found, deleted}; - {DeletedDocs, []} -> - lists:last(lists:sort(DeletedDocs)); - {_, LiveDocs} -> - lists:last(lists:sort(LiveDocs)) - end; - Error -> - Error - end. - -format_reply({ok, #doc{deleted=true}}, true) -> - {not_found, deleted}; -format_reply(Else, _) -> - Else. - -handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) -> - skip_message(Acc0); -handle_message({rexi_EXIT, _Reason}, _Worker, Acc0) -> - skip_message(Acc0); -handle_message(Reply, _Worker, {WaitingCount, R, Replies}) -> - NewReplies = orddict:update_counter(Reply, 1, Replies), - Reduced = fabric_util:remove_ancestors(NewReplies, []), - case lists:dropwhile(fun({_, Count}) -> Count < R end, Reduced) of - [{QuorumReply, _} | _] -> - if length(NewReplies) =:= 1 -> - {stop, QuorumReply}; - true -> - % we had some disagreement amongst the workers, so repair is useful - {error, needs_repair, QuorumReply} - end; - [] -> - if WaitingCount =:= 1 -> - {error, needs_repair}; - true -> - {ok, {WaitingCount-1, R, NewReplies}} - end - end. - -skip_message({1, _R, _Replies}) -> - {error, needs_repair}; -skip_message({WaitingCount, R, Replies}) -> - {ok, {WaitingCount-1, R, Replies}}. - - -open_doc_test() -> - Foo1 = {ok, #doc{revs = {1,[<<"foo">>]}}}, - Foo2 = {ok, #doc{revs = {2,[<<"foo2">>,<<"foo">>]}}}, - Bar1 = {ok, #doc{revs = {1,[<<"bar">>]}}}, - Baz1 = {ok, #doc{revs = {1,[<<"baz">>]}}}, - NF = {not_found, missing}, - State0 = {3, 2, []}, - State1 = {2, 2, [{Foo1,1}]}, - State2 = {1, 2, [{Bar1,1}, {Foo1,1}]}, - ?assertEqual({ok, State1}, handle_message(Foo1, nil, State0)), - - % normal case - quorum reached, no disagreement - ?assertEqual({stop, Foo1}, handle_message(Foo1, nil, State1)), - - % 2nd worker disagrees, voting continues - ?assertEqual({ok, State2}, handle_message(Bar1, nil, State1)), - - % 3rd worker resolves voting, but repair is needed - ?assertEqual({error, needs_repair, Foo1}, handle_message(Foo1, nil, State2)), - - % 2nd worker comes up with descendant of Foo1, voting resolved, run repair - ?assertEqual({error, needs_repair, Foo2}, handle_message(Foo2, nil, State1)), - - % not_found is considered to be an ancestor of everybody - ?assertEqual({error, needs_repair, Foo1}, handle_message(NF, nil, State1)), - - % 3 distinct edit branches result in quorum failure - ?assertEqual({error, needs_repair}, handle_message(Baz1, nil, State2)), - - % bad node concludes voting w/o success, run sync repair to get the result - ?assertEqual( - {error, needs_repair}, - handle_message({rexi_DOWN, 1, 2, 3}, nil, State2) - ). diff --git a/apps/fabric/src/fabric_doc_open_revs.erl b/apps/fabric/src/fabric_doc_open_revs.erl deleted file mode 100644 index d0aec6e4..00000000 --- a/apps/fabric/src/fabric_doc_open_revs.erl +++ /dev/null @@ -1,284 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_doc_open_revs). - --export([go/4]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). --include_lib("eunit/include/eunit.hrl"). - --record(state, { - dbname, - worker_count, - reply_count = 0, - r, - revs, - latest, - replies = [] -}). - -go(DbName, Id, Revs, Options) -> - Workers = fabric_util:submit_jobs(mem3:shards(DbName,Id), open_revs, - [Id, Revs, Options]), - R = couch_util:get_value(r, Options, couch_config:get("cluster","r","2")), - State = #state{ - dbname = DbName, - worker_count = length(Workers), - r = list_to_integer(R), - revs = Revs, - latest = lists:member(latest, Options), - replies = case Revs of all -> []; Revs -> [{Rev,[]} || Rev <- Revs] end - }, - case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, State) of - {ok, {ok, Reply}} -> - {ok, Reply}; - Else -> - Else - end. - -handle_message({rexi_DOWN, _, _, _}, _Worker, State) -> - skip(State); -handle_message({rexi_EXIT, _}, _Worker, State) -> - skip(State); -handle_message({ok, RawReplies}, _Worker, #state{revs = all} = State) -> - #state{ - dbname = DbName, - reply_count = ReplyCount, - worker_count = WorkerCount, - replies = All0, - r = R - } = State, - All = lists:foldl(fun(Reply,D) -> orddict:update_counter(Reply,1,D) end, - All0, RawReplies), - Reduced = fabric_util:remove_ancestors(All, []), - Complete = (ReplyCount =:= (WorkerCount - 1)), - QuorumMet = lists:all(fun({_, C}) -> C >= R end, Reduced), - case Reduced of All when QuorumMet andalso ReplyCount =:= (R-1) -> - Repair = false; - _ -> - Repair = [D || {{ok,D}, _} <- Reduced] - end, - case maybe_reply(DbName, Reduced, Complete, Repair, R) of - noreply -> - {ok, State#state{replies = All, reply_count = ReplyCount+1}}; - {reply, FinalReply} -> - {stop, FinalReply} - end; -handle_message({ok, RawReplies0}, _Worker, State) -> - % we've got an explicit revision list, but if latest=true the workers may - % return a descendant of the requested revision. Take advantage of the - % fact that revisions are returned in order to keep track. - RawReplies = strip_not_found_missing(RawReplies0), - #state{ - dbname = DbName, - reply_count = ReplyCount, - worker_count = WorkerCount, - replies = All0, - r = R - } = State, - All = lists:zipwith(fun({Rev, D}, Reply) -> - if Reply =:= error -> {Rev, D}; true -> - {Rev, orddict:update_counter(Reply, 1, D)} - end - end, All0, RawReplies), - Reduced = [fabric_util:remove_ancestors(X, []) || {_, X} <- All], - FinalReplies = [choose_winner(X, R) || X <- Reduced], - Complete = (ReplyCount =:= (WorkerCount - 1)), - case is_repair_needed(All, FinalReplies) of - true -> - Repair = [D || {{ok,D}, _} <- lists:flatten(Reduced)]; - false -> - Repair = false - end, - case maybe_reply(DbName, FinalReplies, Complete, Repair, R) of - noreply -> - {ok, State#state{replies = All, reply_count = ReplyCount+1}}; - {reply, FinalReply} -> - {stop, FinalReply} - end. - -skip(#state{revs=all} = State) -> - handle_message({ok, []}, nil, State); -skip(#state{revs=Revs} = State) -> - handle_message({ok, [error || _Rev <- Revs]}, nil, State). - -maybe_reply(_, [], false, _, _) -> - noreply; -maybe_reply(DbName, ReplyDict, IsComplete, RepairDocs, R) -> - case lists:all(fun({_, C}) -> C >= R end, ReplyDict) of - true -> - maybe_execute_read_repair(DbName, RepairDocs), - {reply, unstrip_not_found_missing(orddict:fetch_keys(ReplyDict))}; - false -> - case IsComplete of false -> noreply; true -> - maybe_execute_read_repair(DbName, RepairDocs), - {reply, unstrip_not_found_missing(orddict:fetch_keys(ReplyDict))} - end - end. - -choose_winner(Options, R) -> - case lists:dropwhile(fun({_Reply, C}) -> C < R end, Options) of - [] -> - case [Elem || {{ok, #doc{}}, _} = Elem <- Options] of - [] -> - hd(Options); - Docs -> - lists:last(lists:sort(Docs)) - end; - [QuorumMet | _] -> - QuorumMet - end. - -% repair needed if any reply other than the winner has been received for a rev -is_repair_needed([], []) -> - false; -is_repair_needed([{_Rev, [Reply]} | Tail1], [Reply | Tail2]) -> - is_repair_needed(Tail1, Tail2); -is_repair_needed(_, _) -> - true. - -maybe_execute_read_repair(_Db, false) -> - ok; -maybe_execute_read_repair(Db, Docs) -> - spawn(fun() -> - [#doc{id=Id} | _] = Docs, - Ctx = #user_ctx{roles=[<<"_admin">>]}, - Res = fabric:update_docs(Db, Docs, [replicated_changes, {user_ctx,Ctx}]), - ?LOG_INFO("read_repair ~s ~s ~p", [Db, Id, Res]) - end). - -% hackery required so that not_found sorts first -strip_not_found_missing([]) -> - []; -strip_not_found_missing([{{not_found, missing}, Rev} | Rest]) -> - [{not_found, Rev} | strip_not_found_missing(Rest)]; -strip_not_found_missing([Else | Rest]) -> - [Else | strip_not_found_missing(Rest)]. - -unstrip_not_found_missing([]) -> - []; -unstrip_not_found_missing([{not_found, Rev} | Rest]) -> - [{{not_found, missing}, Rev} | unstrip_not_found_missing(Rest)]; -unstrip_not_found_missing([Else | Rest]) -> - [Else | unstrip_not_found_missing(Rest)]. - -all_revs_test() -> - State0 = #state{worker_count = 3, r = 2, revs = all}, - Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}}, - Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}}, - Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}}, - - % an empty worker response does not count as meeting quorum - ?assertMatch( - {ok, #state{}}, - handle_message({ok, []}, nil, State0) - ), - - ?assertMatch( - {ok, #state{}}, - handle_message({ok, [Foo1, Bar1]}, nil, State0) - ), - {ok, State1} = handle_message({ok, [Foo1, Bar1]}, nil, State0), - - % the normal case - workers agree - ?assertEqual( - {stop, [Bar1, Foo1]}, - handle_message({ok, [Foo1, Bar1]}, nil, State1) - ), - - % a case where the 2nd worker has a newer Foo - currently we're considering - % Foo to have reached quorum and execute_read_repair() - ?assertEqual( - {stop, [Bar1, Foo2]}, - handle_message({ok, [Foo2, Bar1]}, nil, State1) - ), - - % a case where quorum has not yet been reached for Foo - ?assertMatch( - {ok, #state{}}, - handle_message({ok, [Bar1]}, nil, State1) - ), - {ok, State2} = handle_message({ok, [Bar1]}, nil, State1), - - % still no quorum, but all workers have responded. We include Foo1 in the - % response and execute_read_repair() - ?assertEqual( - {stop, [Bar1, Foo1]}, - handle_message({ok, [Bar1]}, nil, State2) - ). - -specific_revs_test() -> - Revs = [{1,<<"foo">>}, {1,<<"bar">>}, {1,<<"baz">>}], - State0 = #state{ - worker_count = 3, - r = 2, - revs = Revs, - latest = false, - replies = [{Rev,[]} || Rev <- Revs] - }, - Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}}, - Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}}, - Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}}, - Baz1 = {{not_found, missing}, {1,<<"baz">>}}, - Baz2 = {ok, #doc{revs = {1, [<<"baz">>]}}}, - - ?assertMatch( - {ok, #state{}}, - handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State0) - ), - {ok, State1} = handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State0), - - % the normal case - workers agree - ?assertEqual( - {stop, [Foo1, Bar1, Baz1]}, - handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State1) - ), - - % latest=true, worker responds with Foo2 and we return it - State0L = State0#state{latest = true}, - ?assertMatch( - {ok, #state{}}, - handle_message({ok, [Foo2, Bar1, Baz1]}, nil, State0L) - ), - {ok, State1L} = handle_message({ok, [Foo2, Bar1, Baz1]}, nil, State0L), - ?assertEqual( - {stop, [Foo2, Bar1, Baz1]}, - handle_message({ok, [Foo2, Bar1, Baz1]}, nil, State1L) - ), - - % Foo1 is included in the read quorum for Foo2 - ?assertEqual( - {stop, [Foo2, Bar1, Baz1]}, - handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State1L) - ), - - % {not_found, missing} is included in the quorum for any found revision - ?assertEqual( - {stop, [Foo2, Bar1, Baz2]}, - handle_message({ok, [Foo2, Bar1, Baz2]}, nil, State1L) - ), - - % a worker failure is skipped - ?assertMatch( - {ok, #state{}}, - handle_message({rexi_EXIT, foo}, nil, State1L) - ), - {ok, State2L} = handle_message({rexi_EXIT, foo}, nil, State1L), - ?assertEqual( - {stop, [Foo2, Bar1, Baz2]}, - handle_message({ok, [Foo2, Bar1, Baz2]}, nil, State2L) - ). diff --git a/apps/fabric/src/fabric_doc_update.erl b/apps/fabric/src/fabric_doc_update.erl deleted file mode 100644 index 50d02888..00000000 --- a/apps/fabric/src/fabric_doc_update.erl +++ /dev/null @@ -1,147 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_doc_update). - --export([go/3]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - -go(_, [], _) -> - {ok, []}; -go(DbName, AllDocs, Opts) -> - validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, Opts)), - Options = lists:delete(all_or_nothing, Opts), - GroupedDocs = lists:map(fun({#shard{name=Name, node=Node} = Shard, Docs}) -> - Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}), - {Shard#shard{ref=Ref}, Docs} - end, group_docs_by_shard(DbName, AllDocs)), - {Workers, _} = lists:unzip(GroupedDocs), - W = couch_util:get_value(w, Options, couch_config:get("cluster","w","2")), - Acc0 = {length(Workers), length(AllDocs), list_to_integer(W), GroupedDocs, - dict:from_list([{Doc,[]} || Doc <- AllDocs])}, - case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of - {ok, Results} -> - Reordered = couch_util:reorder_results(AllDocs, Results), - {ok, [R || R <- Reordered, R =/= noreply]}; - Else -> - Else - end. - -handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) -> - skip_message(Acc0); -handle_message({rexi_EXIT, _}, _Worker, Acc0) -> - {WaitingCount, _, W, _, DocReplyDict} = Acc0, - if WaitingCount =:= 1 -> - {W, Reply} = dict:fold(fun force_reply/3, {W,[]}, DocReplyDict), - {stop, Reply}; - true -> - {ok, setelement(1, Acc0, WaitingCount-1)} - end; -handle_message({ok, Replies}, Worker, Acc0) -> - {WaitingCount, DocCount, W, GroupedDocs, DocReplyDict0} = Acc0, - Docs = couch_util:get_value(Worker, GroupedDocs), - DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0), - case {WaitingCount, dict:size(DocReplyDict)} of - {1, _} -> - % last message has arrived, we need to conclude things - {W, Reply} = dict:fold(fun force_reply/3, {W,[]}, DocReplyDict), - {stop, Reply}; - {_, DocCount} -> - % we've got at least one reply for each document, let's take a look - case dict:fold(fun maybe_reply/3, {stop,W,[]}, DocReplyDict) of - continue -> - {ok, {WaitingCount - 1, DocCount, W, GroupedDocs, DocReplyDict}}; - {stop, W, FinalReplies} -> - {stop, FinalReplies} - end; - {_, N} when N < DocCount -> - % no point in trying to finalize anything yet - {ok, {WaitingCount - 1, DocCount, W, GroupedDocs, DocReplyDict}} - end; -handle_message({missing_stub, Stub}, _, _) -> - throw({missing_stub, Stub}); -handle_message({not_found, no_db_file} = X, Worker, Acc0) -> - {_, _, _, GroupedDocs, _} = Acc0, - Docs = couch_util:get_value(Worker, GroupedDocs), - handle_message({ok, [X || _D <- Docs]}, Worker, Acc0). - -force_reply(Doc, [], {W, Acc}) -> - {W, [{Doc, {error, internal_server_error}} | Acc]}; -force_reply(Doc, [FirstReply|_] = Replies, {W, Acc}) -> - case update_quorum_met(W, Replies) of - {true, Reply} -> - {W, [{Doc,Reply} | Acc]}; - false -> - ?LOG_ERROR("write quorum (~p) failed, reply ~p", [W, FirstReply]), - % TODO make a smarter choice than just picking the first reply - {W, [{Doc,FirstReply} | Acc]} - end. - -maybe_reply(_, _, continue) -> - % we didn't meet quorum for all docs, so we're fast-forwarding the fold - continue; -maybe_reply(Doc, Replies, {stop, W, Acc}) -> - case update_quorum_met(W, Replies) of - {true, Reply} -> - {stop, W, [{Doc, Reply} | Acc]}; - false -> - continue - end. - -update_quorum_met(W, Replies) -> - Counters = lists:foldl(fun(R,D) -> orddict:update_counter(R,1,D) end, - orddict:new(), Replies), - case lists:dropwhile(fun({_, Count}) -> Count < W end, Counters) of - [] -> - false; - [{FinalReply, _} | _] -> - {true, FinalReply} - end. - --spec group_docs_by_shard(binary(), [#doc{}]) -> [{#shard{}, [#doc{}]}]. -group_docs_by_shard(DbName, Docs) -> - dict:to_list(lists:foldl(fun(#doc{id=Id} = Doc, D0) -> - lists:foldl(fun(Shard, D1) -> - dict:append(Shard, Doc, D1) - end, D0, mem3:shards(DbName,Id)) - end, dict:new(), Docs)). - -append_update_replies([], [], DocReplyDict) -> - DocReplyDict; -append_update_replies([Doc|Rest], [], Dict0) -> - % icky, if replicated_changes only errors show up in result - append_update_replies(Rest, [], dict:append(Doc, noreply, Dict0)); -append_update_replies([Doc|Rest1], [Reply|Rest2], Dict0) -> - % TODO what if the same document shows up twice in one update_docs call? - append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)). - -skip_message(Acc0) -> - % TODO fix this - {ok, Acc0}. - -validate_atomic_update(_, _, false) -> - ok; -validate_atomic_update(_DbName, AllDocs, true) -> - % TODO actually perform the validation. This requires some hackery, we need - % to basically extract the prep_and_validate_updates function from couch_db - % and only run that, without actually writing in case of a success. - Error = {not_implemented, <<"all_or_nothing is not supported yet">>}, - PreCommitFailures = lists:map(fun(#doc{id=Id, revs = {Pos,Revs}}) -> - case Revs of [] -> RevId = <<>>; [RevId|_] -> ok end, - {{Id, {Pos, RevId}}, Error} - end, AllDocs), - throw({aborted, PreCommitFailures}). diff --git a/apps/fabric/src/fabric_group_info.erl b/apps/fabric/src/fabric_group_info.erl deleted file mode 100644 index d5260271..00000000 --- a/apps/fabric/src/fabric_group_info.erl +++ /dev/null @@ -1,66 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_group_info). - --export([go/2]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - -go(DbName, GroupId) when is_binary(GroupId) -> - {ok, DDoc} = fabric:open_doc(DbName, GroupId, []), - go(DbName, DDoc); - -go(DbName, #doc{} = DDoc) -> - Group = couch_view_group:design_doc_to_view_group(DDoc), - Shards = mem3:shards(DbName), - Workers = fabric_util:submit_jobs(Shards, group_info, [Group]), - Acc0 = {fabric_dict:init(Workers, nil), []}, - fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0). - -handle_message({ok, Info}, Shard, {Counters, Acc}) -> - case fabric_dict:lookup_element(Shard, Counters) of - undefined -> - % already heard from someone else in this range - {ok, {Counters, Acc}}; - nil -> - C1 = fabric_dict:store(Shard, ok, Counters), - C2 = fabric_view:remove_overlapping_shards(Shard, C1), - case fabric_dict:any(nil, C2) of - true -> - {ok, {C2, [Info|Acc]}}; - false -> - {stop, merge_results(lists:flatten([Info|Acc]))} - end - end; -handle_message(_, _, Acc) -> - {ok, Acc}. - -merge_results(Info) -> - Dict = lists:foldl(fun({K,V},D0) -> orddict:append(K,V,D0) end, - orddict:new(), Info), - orddict:fold(fun - (signature, [X|_], Acc) -> - [{signature, X} | Acc]; - (language, [X|_], Acc) -> - [{language, X} | Acc]; - (disk_size, X, Acc) -> - [{disk_size, lists:sum(X)} | Acc]; - (compact_running, X, Acc) -> - [{compact_running, lists:member(true, X)} | Acc]; - (_, _, Acc) -> - Acc - end, [], Dict). diff --git a/apps/fabric/src/fabric_rpc.erl b/apps/fabric/src/fabric_rpc.erl deleted file mode 100644 index a7d555e0..00000000 --- a/apps/fabric/src/fabric_rpc.erl +++ /dev/null @@ -1,402 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_rpc). - --export([get_db_info/1, get_doc_count/1, get_update_seq/1]). --export([open_doc/3, open_revs/4, get_missing_revs/2, update_docs/3]). --export([all_docs/2, changes/3, map_view/4, reduce_view/4, group_info/2]). --export([create_db/3, delete_db/3, reset_validation_funs/1, set_security/3, - set_revs_limit/3]). - --include("fabric.hrl"). --include_lib("couch/include/couch_db.hrl"). - --record (view_acc, { - db, - limit, - include_docs, - offset = nil, - total_rows, - reduce_fun = fun couch_db:enum_docs_reduce_to_count/1, - group_level = 0 -}). - -%% rpc endpoints -%% call to with_db will supply your M:F with a #db{} and then remaining args - -all_docs(DbName, #view_query_args{keys=nil} = QueryArgs) -> - {ok, Db} = couch_db:open(DbName, []), - #view_query_args{ - start_key = StartKey, - start_docid = StartDocId, - end_key = EndKey, - end_docid = EndDocId, - limit = Limit, - skip = Skip, - include_docs = IncludeDocs, - direction = Dir, - inclusive_end = Inclusive - } = QueryArgs, - {ok, Total} = couch_db:get_doc_count(Db), - Acc0 = #view_acc{ - db = Db, - include_docs = IncludeDocs, - limit = Limit+Skip, - total_rows = Total - }, - EndKeyType = if Inclusive -> end_key; true -> end_key_gt end, - Options = [ - {dir, Dir}, - {start_key, if is_binary(StartKey) -> StartKey; true -> StartDocId end}, - {EndKeyType, if is_binary(EndKey) -> EndKey; true -> EndDocId end} - ], - {ok, _, Acc} = couch_db:enum_docs(Db, fun view_fold/3, Acc0, Options), - final_response(Total, Acc#view_acc.offset). - -changes(DbName, Args, StartSeq) -> - #changes_args{style=Style, dir=Dir} = Args, - case couch_db:open(DbName, []) of - {ok, Db} -> - Enum = fun changes_enumerator/2, - Opts = [{dir,Dir}], - Acc0 = {Db, StartSeq, Args}, - try - {ok, {_, LastSeq, _}} = - couch_db:changes_since(Db, Style, StartSeq, Enum, Opts, Acc0), - rexi:reply({complete, LastSeq}) - after - couch_db:close(Db) - end; - Error -> - rexi:reply(Error) - end. - -map_view(DbName, DDoc, ViewName, QueryArgs) -> - {ok, Db} = couch_db:open(DbName, []), - #view_query_args{ - limit = Limit, - skip = Skip, - keys = Keys, - include_docs = IncludeDocs, - stale = Stale, - view_type = ViewType - } = QueryArgs, - MinSeq = if Stale == ok -> 0; true -> couch_db:get_update_seq(Db) end, - Group0 = couch_view_group:design_doc_to_view_group(DDoc), - {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), - {ok, Group} = couch_view_group:request_group(Pid, MinSeq), - View = fabric_view:extract_view(Pid, ViewName, Group#group.views, ViewType), - {ok, Total} = couch_view:get_row_count(View), - Acc0 = #view_acc{ - db = Db, - include_docs = IncludeDocs, - limit = Limit+Skip, - total_rows = Total, - reduce_fun = fun couch_view:reduce_to_count/1 - }, - case Keys of - nil -> - Options = couch_httpd_view:make_key_options(QueryArgs), - {ok, _, Acc} = couch_view:fold(View, fun view_fold/3, Acc0, Options); - _ -> - Acc = lists:foldl(fun(Key, AccIn) -> - KeyArgs = QueryArgs#view_query_args{start_key=Key, end_key=Key}, - Options = couch_httpd_view:make_key_options(KeyArgs), - {_Go, _, Out} = couch_view:fold(View, fun view_fold/3, AccIn, - Options), - Out - end, Acc0, Keys) - end, - final_response(Total, Acc#view_acc.offset). - -reduce_view(DbName, Group0, ViewName, QueryArgs) -> - {ok, Db} = couch_db:open(DbName, []), - #view_query_args{ - group_level = GroupLevel, - limit = Limit, - skip = Skip, - keys = Keys, - stale = Stale - } = QueryArgs, - GroupFun = group_rows_fun(GroupLevel), - MinSeq = if Stale == ok -> 0; true -> couch_db:get_update_seq(Db) end, - {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), - {ok, #group{views=Views, def_lang=Lang}} = couch_view_group:request_group( - Pid, MinSeq), - {NthRed, View} = fabric_view:extract_view(Pid, ViewName, Views, reduce), - ReduceView = {reduce, NthRed, Lang, View}, - Acc0 = #view_acc{group_level = GroupLevel, limit = Limit+Skip}, - case Keys of - nil -> - Options0 = couch_httpd_view:make_key_options(QueryArgs), - Options = [{key_group_fun, GroupFun} | Options0], - couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options); - _ -> - lists:map(fun(Key) -> - KeyArgs = QueryArgs#view_query_args{start_key=Key, end_key=Key}, - Options0 = couch_httpd_view:make_key_options(KeyArgs), - Options = [{key_group_fun, GroupFun} | Options0], - couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options) - end, Keys) - end, - rexi:reply(complete). - -create_db(DbName, Options, Doc) -> - mem3_util:write_db_doc(Doc), - rexi:reply(case couch_server:create(DbName, Options) of - {ok, _} -> - ok; - Error -> - Error - end). - -delete_db(DbName, Options, DocId) -> - mem3_util:delete_db_doc(DocId), - rexi:reply(couch_server:delete(DbName, Options)). - -get_db_info(DbName) -> - with_db(DbName, [], {couch_db, get_db_info, []}). - -get_doc_count(DbName) -> - with_db(DbName, [], {couch_db, get_doc_count, []}). - -get_update_seq(DbName) -> - with_db(DbName, [], {couch_db, get_update_seq, []}). - -set_security(DbName, SecObj, Options) -> - with_db(DbName, Options, {couch_db, set_security, [SecObj]}). - -set_revs_limit(DbName, Limit, Options) -> - with_db(DbName, Options, {couch_db, set_revs_limit, [Limit]}). - -open_doc(DbName, DocId, Options) -> - with_db(DbName, Options, {couch_db, open_doc, [DocId, Options]}). - -open_revs(DbName, Id, Revs, Options) -> - with_db(DbName, Options, {couch_db, open_doc_revs, [Id, Revs, Options]}). - -get_missing_revs(DbName, IdRevsList) -> - % reimplement here so we get [] for Ids with no missing revs in response - rexi:reply(case couch_db:open(DbName, []) of - {ok, Db} -> - Ids = [Id1 || {Id1, _Revs} <- IdRevsList], - {ok, lists:zipwith(fun({Id, Revs}, FullDocInfoResult) -> - case FullDocInfoResult of - {ok, #full_doc_info{rev_tree=RevisionTree} = FullInfo} -> - MissingRevs = couch_key_tree:find_missing(RevisionTree, Revs), - {Id, MissingRevs, possible_ancestors(FullInfo, MissingRevs)}; - not_found -> - {Id, Revs, []} - end - end, IdRevsList, couch_btree:lookup(Db#db.id_tree, Ids))}; - Error -> - Error - end). - -update_docs(DbName, Docs0, Options) -> - case proplists:get_value(replicated_changes, Options) of - true -> - X = replicated_changes; - _ -> - X = interactive_edit - end, - Docs = make_att_readers(Docs0), - with_db(DbName, Options, {couch_db, update_docs, [Docs, Options, X]}). - -group_info(DbName, Group0) -> - {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), - rexi:reply(couch_view_group:request_group_info(Pid)). - -reset_validation_funs(DbName) -> - case couch_db:open(DbName, []) of - {ok, #db{main_pid = Pid}} -> - gen_server:cast(Pid, {load_validation_funs, undefined}); - _ -> - ok - end. - -%% -%% internal -%% - -with_db(DbName, Options, {M,F,A}) -> - case couch_db:open(DbName, Options) of - {ok, Db} -> - rexi:reply(try - apply(M, F, [Db | A]) - catch Exception -> - Exception; - error:Reason -> - ?LOG_ERROR("~p ~p ~p~n~p", [?MODULE, {M,F}, Reason, - erlang:get_stacktrace()]), - {error, Reason} - end); - Error -> - rexi:reply(Error) - end. - -view_fold(#full_doc_info{} = FullDocInfo, OffsetReds, Acc) -> - % matches for _all_docs and translates #full_doc_info{} -> KV pair - case couch_doc:to_doc_info(FullDocInfo) of - #doc_info{revs=[#rev_info{deleted=false, rev=Rev}|_]} -> - Id = FullDocInfo#full_doc_info.id, - Value = {[{rev,couch_doc:rev_to_str(Rev)}]}, - view_fold({{Id,Id}, Value}, OffsetReds, Acc); - #doc_info{revs=[#rev_info{deleted=true}|_]} -> - {ok, Acc} - end; -view_fold(KV, OffsetReds, #view_acc{offset=nil, total_rows=Total} = Acc) -> - % calculates the offset for this shard - #view_acc{reduce_fun=Reduce} = Acc, - Offset = Reduce(OffsetReds), - case rexi:sync_reply({total_and_offset, Total, Offset}) of - ok -> - view_fold(KV, OffsetReds, Acc#view_acc{offset=Offset}); - stop -> - exit(normal); - timeout -> - exit(timeout) - end; -view_fold(_KV, _Offset, #view_acc{limit=0} = Acc) -> - % we scanned through limit+skip local rows - {stop, Acc}; -view_fold({{Key,Id}, Value}, _Offset, Acc) -> - % the normal case - #view_acc{ - db = Db, - limit = Limit, - include_docs = IncludeDocs - } = Acc, - Doc = if not IncludeDocs -> undefined; true -> - case couch_db:open_doc(Db, Id, []) of - {not_found, deleted} -> - null; - {not_found, missing} -> - undefined; - {ok, Doc0} -> - couch_doc:to_json_obj(Doc0, []) - end - end, - case rexi:sync_reply(#view_row{key=Key, id=Id, value=Value, doc=Doc}) of - ok -> - {ok, Acc#view_acc{limit=Limit-1}}; - timeout -> - exit(timeout) - end. - -final_response(Total, nil) -> - case rexi:sync_reply({total_and_offset, Total, Total}) of ok -> - rexi:reply(complete); - stop -> - ok; - timeout -> - exit(timeout) - end; -final_response(_Total, _Offset) -> - rexi:reply(complete). - -group_rows_fun(exact) -> - fun({Key1,_}, {Key2,_}) -> Key1 == Key2 end; -group_rows_fun(0) -> - fun(_A, _B) -> true end; -group_rows_fun(GroupLevel) when is_integer(GroupLevel) -> - fun({[_|_] = Key1,_}, {[_|_] = Key2,_}) -> - lists:sublist(Key1, GroupLevel) == lists:sublist(Key2, GroupLevel); - ({Key1,_}, {Key2,_}) -> - Key1 == Key2 - end. - -reduce_fold(_Key, _Red, #view_acc{limit=0} = Acc) -> - {stop, Acc}; -reduce_fold(_Key, Red, #view_acc{group_level=0} = Acc) -> - send(null, Red, Acc); -reduce_fold(Key, Red, #view_acc{group_level=exact} = Acc) -> - send(Key, Red, Acc); -reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0, is_list(K) -> - send(lists:sublist(K, I), Red, Acc). - -send(Key, Value, #view_acc{limit=Limit} = Acc) -> - case rexi:sync_reply(#view_row{key=Key, value=Value}) of - ok -> - {ok, Acc#view_acc{limit=Limit-1}}; - stop -> - exit(normal); - timeout -> - exit(timeout) - end. - -changes_enumerator(DocInfo, {Db, _Seq, Args}) -> - #changes_args{include_docs=IncludeDocs, filter=Acc} = Args, - #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]} - = DocInfo, - case [X || X <- couch_changes:filter(DocInfo, Acc), X /= null] of - [] -> - {ok, {Db, Seq, Args}}; - Results -> - ChangesRow = changes_row(Db, Seq, Id, Results, Rev, Del, IncludeDocs), - Go = rexi:sync_reply(ChangesRow), - {Go, {Db, Seq, Args}} - end. - -changes_row(_, Seq, Id, Results, _, true, true) -> - #view_row{key=Seq, id=Id, value=Results, doc=deleted}; -changes_row(_, Seq, Id, Results, _, true, false) -> - #view_row{key=Seq, id=Id, value=Results, doc=deleted}; -changes_row(Db, Seq, Id, Results, Rev, false, true) -> - #view_row{key=Seq, id=Id, value=Results, doc=doc_member(Db, Id, Rev)}; -changes_row(_, Seq, Id, Results, _, false, false) -> - #view_row{key=Seq, id=Id, value=Results}. - -doc_member(Shard, Id, Rev) -> - case couch_db:open_doc_revs(Shard, Id, [Rev], []) of - {ok, [{ok,Doc}]} -> - couch_doc:to_json_obj(Doc, []); - Error -> - Error - end. - -possible_ancestors(_FullInfo, []) -> - []; -possible_ancestors(FullInfo, MissingRevs) -> - #doc_info{revs=RevsInfo} = couch_doc:to_doc_info(FullInfo), - LeafRevs = [Rev || #rev_info{rev=Rev} <- RevsInfo], - % Find the revs that are possible parents of this rev - lists:foldl(fun({LeafPos, LeafRevId}, Acc) -> - % this leaf is a "possible ancenstor" of the missing - % revs if this LeafPos lessthan any of the missing revs - case lists:any(fun({MissingPos, _}) -> - LeafPos < MissingPos end, MissingRevs) of - true -> - [{LeafPos, LeafRevId} | Acc]; - false -> - Acc - end - end, [], LeafRevs). - -make_att_readers([]) -> - []; -make_att_readers([#doc{atts=Atts0} = Doc | Rest]) -> - % % go through the attachments looking for 'follows' in the data, - % % replace with function that reads the data from MIME stream. - Atts = [Att#att{data=make_att_reader(D)} || #att{data=D} = Att <- Atts0], - [Doc#doc{atts = Atts} | make_att_readers(Rest)]. - -make_att_reader({follows, Parser}) -> - fun() -> - Parser ! {get_bytes, self()}, - receive {bytes, Bytes} -> Bytes end - end; -make_att_reader(Else) -> - Else. diff --git a/apps/fabric/src/fabric_util.erl b/apps/fabric/src/fabric_util.erl deleted file mode 100644 index 4ae8e126..00000000 --- a/apps/fabric/src/fabric_util.erl +++ /dev/null @@ -1,97 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_util). - --export([submit_jobs/3, cleanup/1, recv/4, get_db/1, remove_ancestors/2]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). --include_lib("eunit/include/eunit.hrl"). - -submit_jobs(Shards, EndPoint, ExtraArgs) -> - lists:map(fun(#shard{node=Node, name=ShardName} = Shard) -> - Ref = rexi:cast(Node, {fabric_rpc, EndPoint, [ShardName | ExtraArgs]}), - Shard#shard{ref = Ref} - end, Shards). - -cleanup(Workers) -> - [rexi:kill(Node, Ref) || #shard{node=Node, ref=Ref} <- Workers]. - -recv(Workers, Keypos, Fun, Acc0) -> - Timeout = case couch_config:get("fabric", "request_timeout", "60000") of - "infinity" -> infinity; - N -> list_to_integer(N) - end, - rexi_utils:recv(Workers, Keypos, Fun, Acc0, Timeout, infinity). - - -get_db(DbName) -> - Shards = mem3:shards(DbName), - case lists:partition(fun(#shard{node = N}) -> N =:= node() end, Shards) of - {[#shard{name = ShardName}|_], _} -> - % prefer node-local DBs - couch_db:open(ShardName, []); - {[], [#shard{node = Node, name = ShardName}|_]} -> - % but don't require them - rpc:call(Node, couch_db, open, [ShardName, []]) - end. - -% this presumes the incoming list is sorted, i.e. shorter revlists come first -remove_ancestors([], Acc) -> - lists:reverse(Acc); -remove_ancestors([{{not_found, _}, Count} = Head | Tail], Acc) -> - % any document is a descendant - case lists:filter(fun({{ok, #doc{}}, _}) -> true; (_) -> false end, Tail) of - [{{ok, #doc{}} = Descendant, _} | _] -> - remove_ancestors(orddict:update_counter(Descendant, Count, Tail), Acc); - [] -> - remove_ancestors(Tail, [Head | Acc]) - end; -remove_ancestors([{{ok, #doc{revs = {Pos, Revs}}}, Count} = Head | Tail], Acc) -> - Descendants = lists:dropwhile(fun - ({{ok, #doc{revs = {Pos2, Revs2}}}, _}) -> - case lists:nthtail(Pos2 - Pos, Revs2) of - [] -> - % impossible to tell if Revs2 is a descendant - assume no - true; - History -> - % if Revs2 is a descendant, History is a prefix of Revs - not lists:prefix(History, Revs) - end - end, Tail), - case Descendants of [] -> - remove_ancestors(Tail, [Head | Acc]); - [{Descendant, _} | _] -> - remove_ancestors(orddict:update_counter(Descendant, Count, Tail), Acc) - end. - -remove_ancestors_test() -> - Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}}, - Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}}, - Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}}, - Bar2 = {not_found, {1,<<"bar">>}}, - ?assertEqual( - [{Bar1,1}, {Foo1,1}], - remove_ancestors([{Bar1,1}, {Foo1,1}], []) - ), - ?assertEqual( - [{Bar1,1}, {Foo2,2}], - remove_ancestors([{Bar1,1}, {Foo1,1}, {Foo2,1}], []) - ), - ?assertEqual( - [{Bar1,2}], - remove_ancestors([{Bar2,1}, {Bar1,1}], []) - ). diff --git a/apps/fabric/src/fabric_view.erl b/apps/fabric/src/fabric_view.erl deleted file mode 100644 index e5f19b73..00000000 --- a/apps/fabric/src/fabric_view.erl +++ /dev/null @@ -1,235 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_view). - --export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1, - maybe_pause_worker/3, maybe_resume_worker/2, transform_row/1, keydict/1, - extract_view/4]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - -%% @doc looks for a fully covered keyrange in the list of counters --spec is_progress_possible([{#shard{}, term()}]) -> boolean(). -is_progress_possible([]) -> - false; -is_progress_possible(Counters) -> - Ranges = fabric_dict:fold(fun(#shard{range=[X,Y]}, _, A) -> [{X,Y}|A] end, - [], Counters), - [{Start, Tail0} | Rest] = lists:ukeysort(1, Ranges), - Result = lists:foldl(fun - (_, fail) -> - % we've already declared failure - fail; - (_, complete) -> - % this is the success condition, we can fast-forward - complete; - ({X,_}, Tail) when X > (Tail+1) -> - % gap in the keyrange, we're dead - fail; - ({_,Y}, Tail) -> - case erlang:max(Tail, Y) of - End when (End+1) =:= (2 bsl 31) -> - complete; - Else -> - % the normal condition, adding to the tail - Else - end - end, if (Tail0+1) =:= (2 bsl 31) -> complete; true -> Tail0 end, Rest), - (Start =:= 0) andalso (Result =:= complete). - --spec remove_overlapping_shards(#shard{}, [{#shard{}, any()}]) -> - [{#shard{}, any()}]. -remove_overlapping_shards(#shard{range=[A,B]} = Shard0, Shards) -> - fabric_dict:filter(fun(#shard{range=[X,Y]} = Shard, _Value) -> - if Shard =:= Shard0 -> - % we can't remove ourselves - true; - A < B, X >= A, X < B -> - % lower bound is inside our range - false; - A < B, Y > A, Y =< B -> - % upper bound is inside our range - false; - B < A, X >= A orelse B < A, X < B -> - % target shard wraps the key range, lower bound is inside - false; - B < A, Y > A orelse B < A, Y =< B -> - % target shard wraps the key range, upper bound is inside - false; - true -> - true - end - end, Shards). - -maybe_pause_worker(Worker, From, State) -> - #collector{buffer_size = BufferSize, counters = Counters} = State, - case fabric_dict:lookup_element(Worker, Counters) of - BufferSize -> - State#collector{blocked = [{Worker,From} | State#collector.blocked]}; - _Count -> - gen_server:reply(From, ok), - State - end. - -maybe_resume_worker(Worker, State) -> - #collector{buffer_size = Buffer, counters = C, blocked = B} = State, - case fabric_dict:lookup_element(Worker, C) of - Count when Count < Buffer/2 -> - case couch_util:get_value(Worker, B) of - undefined -> - State; - From -> - gen_server:reply(From, ok), - State#collector{blocked = lists:keydelete(Worker, 1, B)} - end; - _Other -> - State - end. - -maybe_send_row(#collector{limit=0} = State) -> - #collector{user_acc=AccIn, callback=Callback} = State, - {_, Acc} = Callback(complete, AccIn), - {stop, State#collector{user_acc=Acc}}; -maybe_send_row(State) -> - #collector{ - callback = Callback, - counters = Counters, - skip = Skip, - limit = Limit, - user_acc = AccIn - } = State, - case fabric_dict:any(0, Counters) of - true -> - {ok, State}; - false -> - try get_next_row(State) of - {_, NewState} when Skip > 0 -> - maybe_send_row(NewState#collector{skip=Skip-1, limit=Limit-1}); - {Row, NewState} -> - case Callback(transform_row(Row), AccIn) of - {stop, Acc} -> - {stop, NewState#collector{user_acc=Acc, limit=Limit-1}}; - {ok, Acc} -> - maybe_send_row(NewState#collector{user_acc=Acc, limit=Limit-1}) - end - catch complete -> - {_, Acc} = Callback(complete, AccIn), - {stop, State#collector{user_acc=Acc}} - end - end. - -keydict(nil) -> - undefined; -keydict(Keys) -> - {Dict,_} = lists:foldl(fun(K, {D,I}) -> {dict:store(K,I,D), I+1} end, - {dict:new(),0}, Keys), - Dict. - -%% internal %% - -get_next_row(#collector{rows = []}) -> - throw(complete); -get_next_row(#collector{reducer = RedSrc} = St) when RedSrc =/= undefined -> - #collector{ - query_args = #view_query_args{direction=Dir}, - keys = Keys, - rows = RowDict, - os_proc = Proc, - counters = Counters0 - } = St, - {Key, RestKeys} = find_next_key(Keys, Dir, RowDict), - case dict:find(Key, RowDict) of - {ok, Records} -> - NewRowDict = dict:erase(Key, RowDict), - Counters = lists:foldl(fun(#view_row{worker=Worker}, CountersAcc) -> - fabric_dict:update_counter(Worker, -1, CountersAcc) - end, Counters0, Records), - Wrapped = [[V] || #view_row{value=V} <- Records], - {ok, [Reduced]} = couch_query_servers:rereduce(Proc, [RedSrc], Wrapped), - NewSt = St#collector{keys=RestKeys, rows=NewRowDict, counters=Counters}, - NewState = lists:foldl(fun(#view_row{worker=Worker}, StateAcc) -> - maybe_resume_worker(Worker, StateAcc) - end, NewSt, Records), - {#view_row{key=Key, id=reduced, value=Reduced}, NewState}; - error -> - get_next_row(St#collector{keys=RestKeys}) - end; -get_next_row(State) -> - #collector{rows = [Row|Rest], counters = Counters0} = State, - Worker = Row#view_row.worker, - Counters1 = fabric_dict:update_counter(Worker, -1, Counters0), - NewState = maybe_resume_worker(Worker, State#collector{counters=Counters1}), - {Row, NewState#collector{rows = Rest}}. - -find_next_key(nil, Dir, RowDict) -> - case lists:sort(sort_fun(Dir), dict:fetch_keys(RowDict)) of - [] -> - throw(complete); - [Key|_] -> - {Key, nil} - end; -find_next_key([], _, _) -> - throw(complete); -find_next_key([Key|Rest], _, _) -> - {Key, Rest}. - -transform_row(#view_row{key=Key, id=reduced, value=Value}) -> - {row, {[{key,Key}, {value,Value}]}}; -transform_row(#view_row{key=Key, id=undefined}) -> - {row, {[{key,Key}, {error,not_found}]}}; -transform_row(#view_row{key=Key, id=Id, value=Value, doc=undefined}) -> - {row, {[{id,Id}, {key,Key}, {value,Value}]}}; -transform_row(#view_row{key=Key, id=Id, value=Value, doc={error,Reason}}) -> - {row, {[{id,Id}, {key,Key}, {value,Value}, {error,Reason}]}}; -transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc}) -> - {row, {[{id,Id}, {key,Key}, {value,Value}, {doc,Doc}]}}. - -sort_fun(fwd) -> - fun(A,A) -> true; (A,B) -> couch_view:less_json(A,B) end; -sort_fun(rev) -> - fun(A,A) -> true; (A,B) -> couch_view:less_json(B,A) end. - -extract_view(Pid, ViewName, [], _ViewType) -> - ?LOG_ERROR("missing_named_view ~p", [ViewName]), - exit(Pid, kill), - exit(missing_named_view); -extract_view(Pid, ViewName, [View|Rest], ViewType) -> - case lists:member(ViewName, view_names(View, ViewType)) of - true -> - if ViewType == reduce -> - {index_of(ViewName, view_names(View, reduce)), View}; - true -> - View - end; - false -> - extract_view(Pid, ViewName, Rest, ViewType) - end. - -view_names(View, Type) when Type == red_map; Type == reduce -> - [Name || {Name, _} <- View#view.reduce_funs]; -view_names(View, map) -> - View#view.map_names. - -index_of(X, List) -> - index_of(X, List, 1). - -index_of(_X, [], _I) -> - not_found; -index_of(X, [X|_Rest], I) -> - I; -index_of(X, [_|Rest], I) -> - index_of(X, Rest, I+1). diff --git a/apps/fabric/src/fabric_view_all_docs.erl b/apps/fabric/src/fabric_view_all_docs.erl deleted file mode 100644 index b3436171..00000000 --- a/apps/fabric/src/fabric_view_all_docs.erl +++ /dev/null @@ -1,181 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_view_all_docs). - --export([go/4]). --export([open_doc/3]). % exported for spawn - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - -go(DbName, #view_query_args{keys=nil} = QueryArgs, Callback, Acc0) -> - Workers = lists:map(fun(#shard{name=Name, node=Node} = Shard) -> - Ref = rexi:cast(Node, {fabric_rpc, all_docs, [Name, QueryArgs]}), - Shard#shard{ref = Ref} - end, mem3:shards(DbName)), - BufferSize = couch_config:get("fabric", "map_buffer_size", "2"), - #view_query_args{limit = Limit, skip = Skip} = QueryArgs, - State = #collector{ - query_args = QueryArgs, - callback = Callback, - buffer_size = list_to_integer(BufferSize), - counters = fabric_dict:init(Workers, 0), - skip = Skip, - limit = Limit, - user_acc = Acc0 - }, - try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, - State, infinity, 5000) of - {ok, NewState} -> - {ok, NewState#collector.user_acc}; - Error -> - Error - after - fabric_util:cleanup(Workers) - end; - -go(DbName, QueryArgs, Callback, Acc0) -> - #view_query_args{ - direction = Dir, - include_docs = IncludeDocs, - limit = Limit0, - skip = Skip0, - keys = Keys - } = QueryArgs, - {_, Ref0} = spawn_monitor(fun() -> exit(fabric:get_doc_count(DbName)) end), - Monitors0 = [spawn_monitor(?MODULE, open_doc, [DbName, Id, IncludeDocs]) || - Id <- Keys], - Monitors = if Dir=:=fwd -> Monitors0; true -> lists:reverse(Monitors0) end, - receive {'DOWN', Ref0, _, _, {ok, TotalRows}} -> - {ok, Acc1} = Callback({total_and_offset, TotalRows, 0}, Acc0), - {ok, Acc2} = doc_receive_loop(Monitors, Skip0, Limit0, Callback, Acc1), - Callback(complete, Acc2) - after 10000 -> - Callback(timeout, Acc0) - end. - -handle_message({rexi_DOWN, _, _, _}, nil, State) -> - % TODO see if progress can be made here, possibly by removing all shards - % from that node and checking is_progress_possible - {ok, State}; - -handle_message({rexi_EXIT, _}, Worker, State) -> - #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State, - Counters = fabric_dict:erase(Worker, Counters0), - case fabric_view:is_progress_possible(Counters) of - true -> - {ok, State#collector{counters = Counters}}; - false -> - Callback({error, dead_shards}, Acc), - {error, dead_shards} - end; - -handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) -> - #collector{ - callback = Callback, - counters = Counters0, - total_rows = Total0, - offset = Offset0, - user_acc = AccIn - } = State, - case fabric_dict:lookup_element(Worker, Counters0) of - undefined -> - % this worker lost the race with other partition copies, terminate - gen_server:reply(From, stop), - {ok, State}; - 0 -> - gen_server:reply(From, ok), - Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), - Counters2 = fabric_view:remove_overlapping_shards(Worker, Counters1), - Total = Total0 + Tot, - Offset = Offset0 + Off, - case fabric_dict:any(0, Counters2) of - true -> - {ok, State#collector{ - counters = Counters2, - total_rows = Total, - offset = Offset - }}; - false -> - FinalOffset = erlang:min(Total, Offset+State#collector.skip), - {Go, Acc} = Callback({total_and_offset, Total, FinalOffset}, AccIn), - {Go, State#collector{ - counters = fabric_dict:decrement_all(Counters2), - total_rows = Total, - offset = FinalOffset, - user_acc = Acc - }} - end - end; - -handle_message(#view_row{} = Row, {Worker, From}, State) -> - #collector{query_args = Args, counters = Counters0, rows = Rows0} = State, - Dir = Args#view_query_args.direction, - Rows = merge_row(Dir, Row#view_row{worker=Worker}, Rows0), - Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), - State1 = State#collector{rows=Rows, counters=Counters1}, - State2 = fabric_view:maybe_pause_worker(Worker, From, State1), - fabric_view:maybe_send_row(State2); - -handle_message(complete, Worker, State) -> - Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters), - fabric_view:maybe_send_row(State#collector{counters = Counters}). - - -merge_row(fwd, Row, Rows) -> - lists:keymerge(#view_row.id, [Row], Rows); -merge_row(rev, Row, Rows) -> - lists:rkeymerge(#view_row.id, [Row], Rows). - -doc_receive_loop([], _, _, _, Acc) -> - {ok, Acc}; -doc_receive_loop(_, _, 0, _, Acc) -> - {ok, Acc}; -doc_receive_loop([{Pid,Ref}|Rest], Skip, Limit, Callback, Acc) when Skip > 0 -> - receive {'DOWN', Ref, process, Pid, #view_row{}} -> - doc_receive_loop(Rest, Skip-1, Limit-1, Callback, Acc) - after 10000 -> - timeout - end; -doc_receive_loop([{Pid,Ref}|Rest], 0, Limit, Callback, AccIn) -> - receive {'DOWN', Ref, process, Pid, #view_row{} = Row} -> - case Callback(fabric_view:transform_row(Row), AccIn) of - {ok, Acc} -> - doc_receive_loop(Rest, 0, Limit-1, Callback, Acc); - {stop, Acc} -> - {ok, Acc} - end - after 10000 -> - timeout - end. - -open_doc(DbName, Id, IncludeDocs) -> - Row = case fabric:open_doc(DbName, Id, [deleted]) of - {not_found, missing} -> - Doc = undefined, - #view_row{key=Id}; - {ok, #doc{deleted=true, revs=Revs}} -> - Doc = null, - {RevPos, [RevId|_]} = Revs, - Value = {[{rev,couch_doc:rev_to_str({RevPos, RevId})}, {deleted,true}]}, - #view_row{key=Id, id=Id, value=Value}; - {ok, #doc{revs=Revs} = Doc0} -> - Doc = couch_doc:to_json_obj(Doc0, []), - {RevPos, [RevId|_]} = Revs, - Value = {[{rev,couch_doc:rev_to_str({RevPos, RevId})}]}, - #view_row{key=Id, id=Id, value=Value} - end, - exit(if IncludeDocs -> Row#view_row{doc=Doc}; true -> Row end). diff --git a/apps/fabric/src/fabric_view_changes.erl b/apps/fabric/src/fabric_view_changes.erl deleted file mode 100644 index a4421a92..00000000 --- a/apps/fabric/src/fabric_view_changes.erl +++ /dev/null @@ -1,271 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_view_changes). - --export([go/5, start_update_notifier/1, pack_seqs/1]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - -go(DbName, Feed, Options, Callback, Acc0) when Feed == "continuous" orelse - Feed == "longpoll" -> - Args = make_changes_args(Options), - {ok, Acc} = Callback(start, Acc0), - Notifiers = start_update_notifiers(DbName), - {Timeout, _} = couch_changes:get_changes_timeout(Args, Callback), - try - keep_sending_changes( - DbName, - Args, - Callback, - get_start_seq(DbName, Args), - Acc, - Timeout - ) - after - stop_update_notifiers(Notifiers), - couch_changes:get_rest_db_updated() - end; - -go(DbName, "normal", Options, Callback, Acc0) -> - Args = make_changes_args(Options), - {ok, Acc} = Callback(start, Acc0), - {ok, #collector{counters=Seqs, user_acc=AccOut}} = send_changes( - DbName, - Args, - Callback, - get_start_seq(DbName, Args), - Acc - ), - Callback({stop, pack_seqs(Seqs)}, AccOut). - -keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout) -> - #changes_args{limit=Limit, feed=Feed, heartbeat=Heartbeat} = Args, - {ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn), - #collector{limit=Limit2, counters=NewSeqs, user_acc=AccOut} = Collector, - LastSeq = pack_seqs(NewSeqs), - if Limit > Limit2, Feed == "longpoll" -> - Callback({stop, LastSeq}, AccOut); - true -> - case wait_db_updated(Timeout) of - updated -> - keep_sending_changes( - DbName, - Args#changes_args{limit=Limit2}, - Callback, - LastSeq, - AccOut, - Timeout - ); - timeout -> - case Heartbeat of undefined -> - Callback({stop, LastSeq}, AccOut); - _ -> - {ok, AccTimeout} = Callback(timeout, AccOut), - keep_sending_changes(DbName, Args#changes_args{limit=Limit2}, - Callback, LastSeq, AccTimeout, Timeout) - end - end - end. - -send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn) -> - AllShards = mem3:shards(DbName), - Seqs = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, Seq}) -> - case lists:member(Shard, AllShards) of - true -> - Ref = rexi:cast(N, {fabric_rpc, changes, [Name,ChangesArgs,Seq]}), - [{Shard#shard{ref = Ref}, Seq}]; - false -> - % Find some replacement shards to cover the missing range - % TODO It's possible in rare cases of shard merging to end up - % with overlapping shard ranges from this technique - lists:map(fun(#shard{name=Name2, node=N2} = NewShard) -> - Ref = rexi:cast(N2, {fabric_rpc, changes, [Name2,ChangesArgs,0]}), - {NewShard#shard{ref = Ref}, 0} - end, find_replacement_shards(Shard, AllShards)) - end - end, unpack_seqs(PackedSeqs, DbName)), - {Workers, _} = lists:unzip(Seqs), - State = #collector{ - query_args = ChangesArgs, - callback = Callback, - counters = fabric_dict:init(Workers, 0), - user_acc = AccIn, - limit = ChangesArgs#changes_args.limit, - rows = Seqs % store sequence positions instead - }, - try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, - State, infinity, 5000) - after - fabric_util:cleanup(Workers) - end. - -handle_message({rexi_DOWN, _, _, _}, nil, State) -> - % TODO see if progress can be made here, possibly by removing all shards - % from that node and checking is_progress_possible - {ok, State}; - -handle_message({rexi_EXIT, Reason}, Worker, State) -> - ?LOG_ERROR("~p rexi_EXIT ~p", [?MODULE, Reason]), - #collector{ - callback=Callback, - counters=Counters0, - rows = Seqs0, - user_acc=Acc - } = State, - Counters = fabric_dict:erase(Worker, Counters0), - Seqs = fabric_dict:erase(Worker, Seqs0), - case fabric_view:is_progress_possible(Counters) of - true -> - {ok, State#collector{counters = Counters, rows=Seqs}}; - false -> - Callback({error, dead_shards}, Acc), - {error, dead_shards} - end; - -handle_message(_, _, #collector{limit=0} = State) -> - {stop, State}; - -handle_message(#view_row{key=Seq} = Row0, {Worker, From}, St) -> - #collector{ - query_args = #changes_args{include_docs=IncludeDocs}, - callback = Callback, - counters = S0, - limit = Limit, - user_acc = AccIn - } = St, - case fabric_dict:lookup_element(Worker, S0) of - undefined -> - % this worker lost the race with other partition copies, terminate it - gen_server:reply(From, stop), - {ok, St}; - _ -> - S1 = fabric_dict:store(Worker, Seq, S0), - S2 = fabric_view:remove_overlapping_shards(Worker, S1), - Row = Row0#view_row{key = pack_seqs(S2)}, - {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn), - gen_server:reply(From, Go), - {Go, St#collector{counters=S2, limit=Limit-1, user_acc=Acc}} - end; - -handle_message({complete, EndSeq}, Worker, State) -> - #collector{ - counters = S0, - total_rows = Completed % override - } = State, - case fabric_dict:lookup_element(Worker, S0) of - undefined -> - {ok, State}; - _ -> - S1 = fabric_dict:store(Worker, EndSeq, S0), - % unlikely to have overlaps here, but possible w/ filters - S2 = fabric_view:remove_overlapping_shards(Worker, S1), - NewState = State#collector{counters=S2, total_rows=Completed+1}, - case fabric_dict:size(S2) =:= (Completed+1) of - true -> - {stop, NewState}; - false -> - {ok, NewState} - end - end. - -make_changes_args(#changes_args{style=Style, filter=undefined}=Args) -> - Args#changes_args{filter = Style}; -make_changes_args(Args) -> - Args. - -get_start_seq(_DbName, #changes_args{dir=fwd, since=Since}) -> - Since; -get_start_seq(DbName, #changes_args{dir=rev}) -> - Shards = mem3:shards(DbName), - Workers = fabric_util:submit_jobs(Shards, get_update_seq, []), - {ok, Since} = fabric_util:recv(Workers, #shard.ref, - fun collect_update_seqs/3, fabric_dict:init(Workers, -1)), - Since. - -collect_update_seqs(Seq, Shard, Counters) when is_integer(Seq) -> - case fabric_dict:lookup_element(Shard, Counters) of - undefined -> - % already heard from someone else in this range - {ok, Counters}; - -1 -> - C1 = fabric_dict:store(Shard, Seq, Counters), - C2 = fabric_view:remove_overlapping_shards(Shard, C1), - case fabric_dict:any(-1, C2) of - true -> - {ok, C2}; - false -> - {stop, pack_seqs(C2)} - end - end. - -pack_seqs(Workers) -> - SeqList = [{N,R,S} || {#shard{node=N, range=R}, S} <- Workers], - SeqSum = lists:sum(element(2, lists:unzip(Workers))), - Opaque = couch_util:encodeBase64Url(term_to_binary(SeqList, [compressed])), - list_to_binary([integer_to_list(SeqSum), $-, Opaque]). - -unpack_seqs(0, DbName) -> - fabric_dict:init(mem3:shards(DbName), 0); - -unpack_seqs("0", DbName) -> - fabric_dict:init(mem3:shards(DbName), 0); - -unpack_seqs(Packed, DbName) -> - {match, [Opaque]} = re:run(Packed, "^([0-9]+-)?(?.*)", [{capture, - [opaque], binary}]), - % TODO relies on internal structure of fabric_dict as keylist - lists:map(fun({Node, [A,B], Seq}) -> - Shard = #shard{node=Node, range=[A,B], dbname=DbName}, - {mem3_util:name_shard(Shard), Seq} - end, binary_to_term(couch_util:decodeBase64Url(Opaque))). - -start_update_notifiers(DbName) -> - lists:map(fun(#shard{node=Node, name=Name}) -> - {Node, rexi:cast(Node, {?MODULE, start_update_notifier, [Name]})} - end, mem3:shards(DbName)). - -% rexi endpoint -start_update_notifier(DbName) -> - {Caller, _} = get(rexi_from), - Fun = fun({_, X}) when X == DbName -> Caller ! db_updated; (_) -> ok end, - Id = {couch_db_update_notifier, make_ref()}, - ok = gen_event:add_sup_handler(couch_db_update, Id, Fun), - receive {gen_event_EXIT, Id, Reason} -> - rexi:reply({gen_event_EXIT, DbName, Reason}) - end. - -stop_update_notifiers(Notifiers) -> - [rexi:kill(Node, Ref) || {Node, Ref} <- Notifiers]. - -changes_row(#view_row{key=Seq, id=Id, value=Value, doc=deleted}, true) -> - {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}, {doc, null}]}}; -changes_row(#view_row{key=Seq, id=Id, value=Value, doc=deleted}, false) -> - {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}]}}; -changes_row(#view_row{key=Seq, id=Id, value=Value, doc={error,Reason}}, true) -> - {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {error,Reason}]}}; -changes_row(#view_row{key=Seq, id=Id, value=Value, doc=Doc}, true) -> - {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {doc,Doc}]}}; -changes_row(#view_row{key=Seq, id=Id, value=Value}, false) -> - {change, {[{seq,Seq}, {id,Id}, {changes,Value}]}}. - -find_replacement_shards(#shard{range=Range}, AllShards) -> - % TODO make this moar betta -- we might have split or merged the partition - [Shard || Shard <- AllShards, Shard#shard.range =:= Range]. - -wait_db_updated(Timeout) -> - receive db_updated -> couch_changes:get_rest_db_updated() - after Timeout -> timeout end. diff --git a/apps/fabric/src/fabric_view_map.erl b/apps/fabric/src/fabric_view_map.erl deleted file mode 100644 index e1210a84..00000000 --- a/apps/fabric/src/fabric_view_map.erl +++ /dev/null @@ -1,151 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_view_map). - --export([go/6]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - -go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) -> - {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []), - go(DbName, DDoc, View, Args, Callback, Acc0); - -go(DbName, DDoc, View, Args, Callback, Acc0) -> - Workers = lists:map(fun(#shard{name=Name, node=Node} = Shard) -> - Ref = rexi:cast(Node, {fabric_rpc, map_view, [Name, DDoc, View, Args]}), - Shard#shard{ref = Ref} - end, mem3:shards(DbName)), - BufferSize = couch_config:get("fabric", "map_buffer_size", "2"), - #view_query_args{limit = Limit, skip = Skip, keys = Keys} = Args, - State = #collector{ - query_args = Args, - callback = Callback, - buffer_size = list_to_integer(BufferSize), - counters = fabric_dict:init(Workers, 0), - skip = Skip, - limit = Limit, - keys = fabric_view:keydict(Keys), - sorted = Args#view_query_args.sorted, - user_acc = Acc0 - }, - try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, - State, infinity, 1000 * 60 * 60) of - {ok, NewState} -> - {ok, NewState#collector.user_acc}; - Error -> - Error - after - fabric_util:cleanup(Workers) - end. - -handle_message({rexi_DOWN, _, _, _}, nil, State) -> - % TODO see if progress can be made here, possibly by removing all shards - % from that node and checking is_progress_possible - {ok, State}; - -handle_message({rexi_EXIT, Reason}, Worker, State) -> - ?LOG_ERROR("~p rexi_EXIT ~p", [?MODULE, Reason]), - #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State, - Counters = fabric_dict:erase(Worker, Counters0), - case fabric_view:is_progress_possible(Counters) of - true -> - {ok, State#collector{counters = Counters}}; - false -> - Callback({error, dead_shards}, Acc), - {error, dead_shards} - end; - -handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) -> - #collector{ - callback = Callback, - counters = Counters0, - total_rows = Total0, - offset = Offset0, - user_acc = AccIn - } = State, - case fabric_dict:lookup_element(Worker, Counters0) of - undefined -> - % this worker lost the race with other partition copies, terminate - gen_server:reply(From, stop), - {ok, State}; - 0 -> - gen_server:reply(From, ok), - Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), - Counters2 = fabric_view:remove_overlapping_shards(Worker, Counters1), - Total = Total0 + Tot, - Offset = Offset0 + Off, - case fabric_dict:any(0, Counters2) of - true -> - {ok, State#collector{ - counters = Counters2, - total_rows = Total, - offset = Offset - }}; - false -> - FinalOffset = erlang:min(Total, Offset+State#collector.skip), - {Go, Acc} = Callback({total_and_offset, Total, FinalOffset}, AccIn), - {Go, State#collector{ - counters = fabric_dict:decrement_all(Counters2), - total_rows = Total, - offset = FinalOffset, - user_acc = Acc - }} - end - end; - -handle_message(#view_row{}, {_, _}, #collector{limit=0} = State) -> - #collector{callback=Callback} = State, - {_, Acc} = Callback(complete, State#collector.user_acc), - {stop, State#collector{user_acc=Acc}}; - -handle_message(#view_row{} = Row, {_,From}, #collector{sorted=false} = St) -> - #collector{callback=Callback, user_acc=AccIn, limit=Limit} = St, - {Go, Acc} = Callback(fabric_view:transform_row(Row), AccIn), - gen_server:reply(From, ok), - {Go, St#collector{user_acc=Acc, limit=Limit-1}}; - -handle_message(#view_row{} = Row, {Worker, From}, State) -> - #collector{ - query_args = #view_query_args{direction=Dir}, - counters = Counters0, - rows = Rows0, - keys = KeyDict - } = State, - Rows = merge_row(Dir, KeyDict, Row#view_row{worker=Worker}, Rows0), - Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), - State1 = State#collector{rows=Rows, counters=Counters1}, - State2 = fabric_view:maybe_pause_worker(Worker, From, State1), - fabric_view:maybe_send_row(State2); - -handle_message(complete, Worker, State) -> - Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters), - fabric_view:maybe_send_row(State#collector{counters = Counters}). - -merge_row(fwd, undefined, Row, Rows) -> - lists:merge(fun(#view_row{key=KeyA, id=IdA}, #view_row{key=KeyB, id=IdB}) -> - couch_view:less_json([KeyA, IdA], [KeyB, IdB]) - end, [Row], Rows); -merge_row(rev, undefined, Row, Rows) -> - lists:merge(fun(#view_row{key=KeyA, id=IdA}, #view_row{key=KeyB, id=IdB}) -> - couch_view:less_json([KeyB, IdB], [KeyA, IdA]) - end, [Row], Rows); -merge_row(_, KeyDict, Row, Rows) -> - lists:merge(fun(#view_row{key=A, id=IdA}, #view_row{key=B, id=IdB}) -> - if A =:= B -> IdA < IdB; true -> - dict:fetch(A, KeyDict) < dict:fetch(B, KeyDict) - end - end, [Row], Rows). diff --git a/apps/fabric/src/fabric_view_reduce.erl b/apps/fabric/src/fabric_view_reduce.erl deleted file mode 100644 index 6ae564cc..00000000 --- a/apps/fabric/src/fabric_view_reduce.erl +++ /dev/null @@ -1,99 +0,0 @@ -% Copyright 2010 Cloudant -% -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_view_reduce). - --export([go/6]). - --include("fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). - -go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) -> - {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []), - go(DbName, DDoc, View, Args, Callback, Acc0); - -go(DbName, DDoc, VName, Args, Callback, Acc0) -> - #group{def_lang=Lang, views=Views} = Group = - couch_view_group:design_doc_to_view_group(DDoc), - {NthRed, View} = fabric_view:extract_view(nil, VName, Views, reduce), - {VName, RedSrc} = lists:nth(NthRed, View#view.reduce_funs), - Workers = lists:map(fun(#shard{name=Name, node=N} = Shard) -> - Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,Group,VName,Args]}), - Shard#shard{ref = Ref} - end, mem3:shards(DbName)), - BufferSize = couch_config:get("fabric", "reduce_buffer_size", "20"), - #view_query_args{limit = Limit, skip = Skip} = Args, - State = #collector{ - query_args = Args, - callback = Callback, - buffer_size = list_to_integer(BufferSize), - counters = fabric_dict:init(Workers, 0), - keys = Args#view_query_args.keys, - skip = Skip, - limit = Limit, - lang = Group#group.def_lang, - os_proc = couch_query_servers:get_os_process(Lang), - reducer = RedSrc, - rows = dict:new(), - user_acc = Acc0 - }, - try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, - State, infinity, 1000 * 60 * 60) of - {ok, NewState} -> - {ok, NewState#collector.user_acc}; - Error -> - Error - after - fabric_util:cleanup(Workers), - catch couch_query_servers:ret_os_process(State#collector.os_proc) - end. - -handle_message({rexi_DOWN, _, _, _}, nil, State) -> - % TODO see if progress can be made here, possibly by removing all shards - % from that node and checking is_progress_possible - {ok, State}; - -handle_message({rexi_EXIT, Reason}, Worker, State) -> - ?LOG_ERROR("~p rexi_EXIT ~p", [?MODULE, Reason]), - #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State, - Counters = fabric_dict:erase(Worker, Counters0), - case fabric_view:is_progress_possible(Counters) of - true -> - {ok, State#collector{counters = Counters}}; - false -> - Callback({error, dead_shards}, Acc), - {error, dead_shards} - end; - -handle_message(#view_row{key=Key} = Row, {Worker, From}, State) -> - #collector{counters = Counters0, rows = Rows0} = State, - case fabric_dict:lookup_element(Worker, Counters0) of - undefined -> - % this worker lost the race with other partition copies, terminate it - gen_server:reply(From, stop), - {ok, State}; - _ -> - Rows = dict:append(Key, Row#view_row{worker=Worker}, Rows0), - C1 = fabric_dict:update_counter(Worker, 1, Counters0), - % TODO time this call, if slow don't do it every time - C2 = fabric_view:remove_overlapping_shards(Worker, C1), - State1 = State#collector{rows=Rows, counters=C2}, - State2 = fabric_view:maybe_pause_worker(Worker, From, State1), - fabric_view:maybe_send_row(State2) - end; - -handle_message(complete, Worker, State) -> - Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters), - fabric_view:maybe_send_row(State#collector{counters = Counters}). -- cgit v1.2.3