summaryrefslogtreecommitdiff
path: root/apps/fabric
diff options
context:
space:
mode:
Diffstat (limited to 'apps/fabric')
-rw-r--r--apps/fabric/README.md27
-rw-r--r--apps/fabric/ebin/fabric.appup3
-rw-r--r--apps/fabric/include/fabric.hrl36
-rw-r--r--apps/fabric/src/fabric.app.src6
-rw-r--r--apps/fabric/src/fabric.erl264
-rw-r--r--apps/fabric/src/fabric_db_create.erl79
-rw-r--r--apps/fabric/src/fabric_db_delete.erl55
-rw-r--r--apps/fabric/src/fabric_db_doc_count.erl46
-rw-r--r--apps/fabric/src/fabric_db_info.erl69
-rw-r--r--apps/fabric/src/fabric_db_meta.erl49
-rw-r--r--apps/fabric/src/fabric_dict.erl51
-rw-r--r--apps/fabric/src/fabric_doc_attachments.erl116
-rw-r--r--apps/fabric/src/fabric_doc_missing_revs.erl78
-rw-r--r--apps/fabric/src/fabric_doc_open.erl120
-rw-r--r--apps/fabric/src/fabric_doc_open_revs.erl284
-rw-r--r--apps/fabric/src/fabric_doc_update.erl147
-rw-r--r--apps/fabric/src/fabric_group_info.erl66
-rw-r--r--apps/fabric/src/fabric_rpc.erl402
-rw-r--r--apps/fabric/src/fabric_util.erl97
-rw-r--r--apps/fabric/src/fabric_view.erl235
-rw-r--r--apps/fabric/src/fabric_view_all_docs.erl181
-rw-r--r--apps/fabric/src/fabric_view_changes.erl271
-rw-r--r--apps/fabric/src/fabric_view_map.erl151
-rw-r--r--apps/fabric/src/fabric_view_reduce.erl99
24 files changed, 0 insertions, 2932 deletions
diff --git a/apps/fabric/README.md b/apps/fabric/README.md
deleted file mode 100644
index 2d7f1ae2..00000000
--- a/apps/fabric/README.md
+++ /dev/null
@@ -1,27 +0,0 @@
-## fabric
-
-Fabric is a collection of proxy functions for [CouchDB][1] operations in a cluster. These functions are used in [BigCouch][2] as the remote procedure endpoints on each of the cluster nodes.
-
-For example, creating a database is a straightforward task in standalone CouchDB, but for BigCouch, each node that will store a shard/partition for the database needs to receive and execute a fabric function. The node handling the request also needs to compile the results from each of the nodes and respond accordingly to the client.
-
-Fabric is used in conjunction with 'Rexi' which is also an application within BigCouch.
-
-### Getting Started
-Dependencies:
- * Erlang R13B-03 (or higher)
-
-Build with rebar:
- make
-
-### License
-[Apache 2.0][3]
-
-### Contact
- * [http://cloudant.com][4]
- * [info@cloudant.com][5]
-
-[1]: http://couchdb.apache.org
-[2]: http://github.com/cloudant/bigcouch
-[3]: http://www.apache.org/licenses/LICENSE-2.0.html
-[4]: http://cloudant.com
-[5]: mailto:info@cloudant.com
diff --git a/apps/fabric/ebin/fabric.appup b/apps/fabric/ebin/fabric.appup
deleted file mode 100644
index ef5dc496..00000000
--- a/apps/fabric/ebin/fabric.appup
+++ /dev/null
@@ -1,3 +0,0 @@
-{"1.0.3",[{"1.0.2",[
- {load_module, fabric_view_changes}
-]}],[{"1.0.2",[]}]}.
diff --git a/apps/fabric/include/fabric.hrl b/apps/fabric/include/fabric.hrl
deleted file mode 100644
index 5e10f5cd..00000000
--- a/apps/fabric/include/fabric.hrl
+++ /dev/null
@@ -1,36 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--include_lib("eunit/include/eunit.hrl").
-
--record(collector, {
- query_args,
- callback,
- counters,
- buffer_size,
- blocked = [],
- total_rows = 0,
- offset = 0,
- rows = [],
- skip,
- limit,
- keys,
- os_proc,
- reducer,
- lang,
- sorted,
- user_acc
-}).
-
--record(view_row, {key, id, value, doc, worker}).
diff --git a/apps/fabric/src/fabric.app.src b/apps/fabric/src/fabric.app.src
deleted file mode 100644
index 0b626ba7..00000000
--- a/apps/fabric/src/fabric.app.src
+++ /dev/null
@@ -1,6 +0,0 @@
-{application, fabric, [
- {description, "Routing and proxying layer for CouchDB cluster"},
- {vsn, "1.0.3"},
- {registered, []},
- {applications, [kernel, stdlib, couch, rexi, mem3]}
-]}.
diff --git a/apps/fabric/src/fabric.erl b/apps/fabric/src/fabric.erl
deleted file mode 100644
index 9f9db032..00000000
--- a/apps/fabric/src/fabric.erl
+++ /dev/null
@@ -1,264 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric).
-
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-% DBs
--export([all_dbs/0, all_dbs/1, create_db/1, create_db/2, delete_db/1,
- delete_db/2, get_db_info/1, get_doc_count/1, set_revs_limit/3,
- set_security/3, get_revs_limit/1, get_security/1]).
-
-% Documents
--export([open_doc/3, open_revs/4, get_missing_revs/2, update_doc/3,
- update_docs/3, att_receiver/2]).
-
-% Views
--export([all_docs/4, changes/4, query_view/3, query_view/4, query_view/6,
- get_view_group_info/2]).
-
-% miscellany
--export([design_docs/1, reset_validation_funs/1, cleanup_index_files/0,
- cleanup_index_files/1]).
-
--include("fabric.hrl").
-
-% db operations
-
-all_dbs() ->
- all_dbs(<<>>).
-
-all_dbs(Prefix) when is_list(Prefix) ->
- all_dbs(list_to_binary(Prefix));
-all_dbs(Prefix) when is_binary(Prefix) ->
- Length = byte_size(Prefix),
- MatchingDbs = ets:foldl(fun(#shard{dbname=DbName}, Acc) ->
- case DbName of
- <<Prefix:Length/binary, _/binary>> ->
- [DbName | Acc];
- _ ->
- Acc
- end
- end, [], partitions),
- {ok, lists:usort(MatchingDbs)}.
-
-get_db_info(DbName) ->
- fabric_db_info:go(dbname(DbName)).
-
-get_doc_count(DbName) ->
- fabric_db_doc_count:go(dbname(DbName)).
-
-create_db(DbName) ->
- create_db(DbName, []).
-
-create_db(DbName, Options) ->
- fabric_db_create:go(dbname(DbName), opts(Options)).
-
-delete_db(DbName) ->
- delete_db(DbName, []).
-
-delete_db(DbName, Options) ->
- fabric_db_delete:go(dbname(DbName), opts(Options)).
-
-set_revs_limit(DbName, Limit, Options) when is_integer(Limit), Limit > 0 ->
- fabric_db_meta:set_revs_limit(dbname(DbName), Limit, opts(Options)).
-
-get_revs_limit(DbName) ->
- {ok, Db} = fabric_util:get_db(dbname(DbName)),
- try couch_db:get_revs_limit(Db) after catch couch_db:close(Db) end.
-
-set_security(DbName, SecObj, Options) ->
- fabric_db_meta:set_security(dbname(DbName), SecObj, opts(Options)).
-
-get_security(DbName) ->
- {ok, Db} = fabric_util:get_db(dbname(DbName)),
- try couch_db:get_security(Db) after catch couch_db:close(Db) end.
-
-% doc operations
-open_doc(DbName, Id, Options) ->
- fabric_doc_open:go(dbname(DbName), docid(Id), opts(Options)).
-
-open_revs(DbName, Id, Revs, Options) ->
- fabric_doc_open_revs:go(dbname(DbName), docid(Id), Revs, opts(Options)).
-
-get_missing_revs(DbName, IdsRevs) when is_list(IdsRevs) ->
- Sanitized = [idrevs(IdR) || IdR <- IdsRevs],
- fabric_doc_missing_revs:go(dbname(DbName), Sanitized).
-
-update_doc(DbName, Doc, Options) ->
- case update_docs(DbName, [Doc], opts(Options)) of
- {ok, [{ok, NewRev}]} ->
- {ok, NewRev};
- {ok, [Error]} ->
- throw(Error);
- {ok, []} ->
- % replication success
- #doc{revs = {Pos, [RevId | _]}} = doc(Doc),
- {ok, {Pos, RevId}}
- end.
-
-update_docs(DbName, Docs, Options) ->
- try fabric_doc_update:go(dbname(DbName), docs(Docs), opts(Options))
- catch {aborted, PreCommitFailures} ->
- {aborted, PreCommitFailures}
- end.
-
-att_receiver(Req, Length) ->
- fabric_doc_attachments:receiver(Req, Length).
-
-all_docs(DbName, Callback, Acc0, #view_query_args{} = QueryArgs) when
- is_function(Callback, 2) ->
- fabric_view_all_docs:go(dbname(DbName), QueryArgs, Callback, Acc0).
-
-changes(DbName, Callback, Acc0, Options) ->
- % TODO use a keylist for Options instead of #changes_args, BugzID 10281
- Feed = Options#changes_args.feed,
- fabric_view_changes:go(dbname(DbName), Feed, Options, Callback, Acc0).
-
-query_view(DbName, DesignName, ViewName) ->
- query_view(DbName, DesignName, ViewName, #view_query_args{}).
-
-query_view(DbName, DesignName, ViewName, QueryArgs) ->
- Callback = fun default_callback/2,
- query_view(DbName, DesignName, ViewName, Callback, [], QueryArgs).
-
-query_view(DbName, Design, ViewName, Callback, Acc0, QueryArgs) ->
- Db = dbname(DbName), View = name(ViewName),
- case is_reduce_view(Db, Design, View, QueryArgs) of
- true ->
- Mod = fabric_view_reduce;
- false ->
- Mod = fabric_view_map
- end,
- Mod:go(Db, Design, View, QueryArgs, Callback, Acc0).
-
-get_view_group_info(DbName, DesignId) ->
- fabric_group_info:go(dbname(DbName), design_doc(DesignId)).
-
-design_docs(DbName) ->
- QueryArgs = #view_query_args{start_key = <<"_design/">>, include_docs=true},
- Callback = fun({total_and_offset, _, _}, []) ->
- {ok, []};
- ({row, {Props}}, Acc) ->
- case couch_util:get_value(id, Props) of
- <<"_design/", _/binary>> ->
- {ok, [couch_util:get_value(doc, Props) | Acc]};
- _ ->
- {stop, Acc}
- end;
- (complete, Acc) ->
- {ok, lists:reverse(Acc)}
- end,
- fabric:all_docs(dbname(DbName), Callback, [], QueryArgs).
-
-reset_validation_funs(DbName) ->
- [rexi:cast(Node, {fabric_rpc, reset_validation_funs, [Name]}) ||
- #shard{node=Node, name=Name} <- mem3:shards(DbName)].
-
-cleanup_index_files() ->
- {ok, DbNames} = fabric:all_dbs(),
- [cleanup_index_files(Name) || Name <- DbNames].
-
-cleanup_index_files(DbName) ->
- {ok, DesignDocs} = fabric:design_docs(DbName),
-
- ActiveSigs = lists:map(fun(#doc{id = GroupId}) ->
- {ok, Info} = fabric:get_view_group_info(DbName, GroupId),
- binary_to_list(couch_util:get_value(signature, Info))
- end, [couch_doc:from_json_obj(DD) || DD <- DesignDocs]),
-
- FileList = filelib:wildcard([couch_config:get("couchdb", "view_index_dir"),
- "/.shards/*/", couch_util:to_list(DbName), "_design/*"]),
-
- DeleteFiles = if ActiveSigs =:= [] -> FileList; true ->
- {ok, RegExp} = re:compile([$(, string:join(ActiveSigs, "|"), $)]),
- lists:filter(fun(FilePath) ->
- re:run(FilePath, RegExp, [{capture, none}]) == nomatch
- end, FileList)
- end,
- [file:delete(File) || File <- DeleteFiles],
- ok.
-
-%% some simple type validation and transcoding
-
-dbname(DbName) when is_list(DbName) ->
- list_to_binary(DbName);
-dbname(DbName) when is_binary(DbName) ->
- DbName;
-dbname(#db{name=Name}) ->
- Name;
-dbname(DbName) ->
- erlang:error({illegal_database_name, DbName}).
-
-name(Thing) ->
- couch_util:to_binary(Thing).
-
-docid(DocId) when is_list(DocId) ->
- list_to_binary(DocId);
-docid(DocId) when is_binary(DocId) ->
- DocId;
-docid(DocId) ->
- erlang:error({illegal_docid, DocId}).
-
-docs(Docs) when is_list(Docs) ->
- [doc(D) || D <- Docs];
-docs(Docs) ->
- erlang:error({illegal_docs_list, Docs}).
-
-doc(#doc{} = Doc) ->
- Doc;
-doc({_} = Doc) ->
- couch_doc:from_json_obj(Doc);
-doc(Doc) ->
- erlang:error({illegal_doc_format, Doc}).
-
-design_doc(#doc{} = DDoc) ->
- DDoc;
-design_doc(DocId) when is_list(DocId) ->
- design_doc(list_to_binary(DocId));
-design_doc(<<"_design/", _/binary>> = DocId) ->
- DocId;
-design_doc(GroupName) ->
- <<"_design/", GroupName/binary>>.
-
-idrevs({Id, Revs}) when is_list(Revs) ->
- {docid(Id), [rev(R) || R <- Revs]}.
-
-rev(Rev) when is_list(Rev); is_binary(Rev) ->
- couch_doc:parse_rev(Rev);
-rev({Seq, Hash} = Rev) when is_integer(Seq), is_binary(Hash) ->
- Rev.
-
-opts(Options) ->
- case couch_util:get_value(user_ctx, Options) of
- undefined ->
- case erlang:get(user_ctx) of
- #user_ctx{} = Ctx ->
- [{user_ctx, Ctx} | Options];
- _ ->
- Options
- end;
- _ ->
- Options
- end.
-
-default_callback(complete, Acc) ->
- {ok, lists:reverse(Acc)};
-default_callback(Row, Acc) ->
- {ok, [Row | Acc]}.
-
-is_reduce_view(_, _, _, #view_query_args{view_type=Reduce}) ->
- Reduce =:= reduce.
diff --git a/apps/fabric/src/fabric_db_create.erl b/apps/fabric/src/fabric_db_create.erl
deleted file mode 100644
index ccea943d..00000000
--- a/apps/fabric/src/fabric_db_create.erl
+++ /dev/null
@@ -1,79 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_db_create).
--export([go/2]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
--define(DBNAME_REGEX, "^[a-z][a-z0-9\\_\\$()\\+\\-\\/\\s.]*$").
-
-%% @doc Create a new database, and all its partition files across the cluster
-%% Options is proplist with user_ctx, n, q
-go(DbName, Options) ->
- case re:run(DbName, ?DBNAME_REGEX, [{capture,none}]) of
- match ->
- Shards = mem3:choose_shards(DbName, Options),
- Doc = make_document(Shards),
- Workers = fabric_util:submit_jobs(Shards, create_db, [Options, Doc]),
- Acc0 = fabric_dict:init(Workers, nil),
- case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
- {ok, _} ->
- ok;
- Else ->
- Else
- end;
- nomatch ->
- {error, illegal_database_name}
- end.
-
-handle_message(Msg, Shard, Counters) ->
- C1 = fabric_dict:store(Shard, Msg, Counters),
- case fabric_dict:any(nil, C1) of
- true ->
- {ok, C1};
- false ->
- final_answer(C1)
- end.
-
-make_document([#shard{dbname=DbName}|_] = Shards) ->
- {RawOut, ByNodeOut, ByRangeOut} =
- lists:foldl(fun(#shard{node=N, range=[B,E]}, {Raw, ByNode, ByRange}) ->
- Range = ?l2b([couch_util:to_hex(<<B:32/integer>>), "-",
- couch_util:to_hex(<<E:32/integer>>)]),
- Node = couch_util:to_binary(N),
- {[[<<"add">>, Range, Node] | Raw], orddict:append(Node, Range, ByNode),
- orddict:append(Range, Node, ByRange)}
- end, {[], [], []}, Shards),
- #doc{id=DbName, body = {[
- {<<"changelog">>, lists:sort(RawOut)},
- {<<"by_node">>, {[{K,lists:sort(V)} || {K,V} <- ByNodeOut]}},
- {<<"by_range">>, {[{K,lists:sort(V)} || {K,V} <- ByRangeOut]}}
- ]}}.
-
-final_answer(Counters) ->
- Successes = [X || {_, M} = X <- Counters, M == ok orelse M == file_exists],
- case fabric_view:is_progress_possible(Successes) of
- true ->
- case lists:keymember(file_exists, 2, Successes) of
- true ->
- {error, file_exists};
- false ->
- {stop, ok}
- end;
- false ->
- {error, internal_server_error}
- end.
diff --git a/apps/fabric/src/fabric_db_delete.erl b/apps/fabric/src/fabric_db_delete.erl
deleted file mode 100644
index c3000e57..00000000
--- a/apps/fabric/src/fabric_db_delete.erl
+++ /dev/null
@@ -1,55 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_db_delete).
--export([go/2]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
-
-go(DbName, Options) ->
- Shards = mem3:shards(DbName),
- Workers = fabric_util:submit_jobs(Shards, delete_db, [Options, DbName]),
- Acc0 = fabric_dict:init(Workers, nil),
- case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
- {ok, ok} ->
- ok;
- {ok, not_found} ->
- erlang:error(database_does_not_exist);
- Error ->
- Error
- end.
-
-handle_message(Msg, Shard, Counters) ->
- C1 = fabric_dict:store(Shard, Msg, Counters),
- case fabric_dict:any(nil, C1) of
- true ->
- {ok, C1};
- false ->
- final_answer(C1)
- end.
-
-final_answer(Counters) ->
- Successes = [X || {_, M} = X <- Counters, M == ok orelse M == not_found],
- case fabric_view:is_progress_possible(Successes) of
- true ->
- case lists:keymember(ok, 2, Successes) of
- true ->
- {stop, ok};
- false ->
- {stop, not_found}
- end;
- false ->
- {error, internal_server_error}
- end.
diff --git a/apps/fabric/src/fabric_db_doc_count.erl b/apps/fabric/src/fabric_db_doc_count.erl
deleted file mode 100644
index faa755e6..00000000
--- a/apps/fabric/src/fabric_db_doc_count.erl
+++ /dev/null
@@ -1,46 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_db_doc_count).
-
--export([go/1]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-go(DbName) ->
- Shards = mem3:shards(DbName),
- Workers = fabric_util:submit_jobs(Shards, get_doc_count, []),
- Acc0 = {fabric_dict:init(Workers, nil), 0},
- fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0).
-
-handle_message({ok, Count}, Shard, {Counters, Acc}) ->
- case fabric_dict:lookup_element(Shard, Counters) of
- undefined ->
- % already heard from someone else in this range
- {ok, {Counters, Acc}};
- nil ->
- C1 = fabric_dict:store(Shard, ok, Counters),
- C2 = fabric_view:remove_overlapping_shards(Shard, C1),
- case fabric_dict:any(nil, C2) of
- true ->
- {ok, {C2, Count+Acc}};
- false ->
- {stop, Count+Acc}
- end
- end;
-handle_message(_, _, Acc) ->
- {ok, Acc}.
-
diff --git a/apps/fabric/src/fabric_db_info.erl b/apps/fabric/src/fabric_db_info.erl
deleted file mode 100644
index a0acb379..00000000
--- a/apps/fabric/src/fabric_db_info.erl
+++ /dev/null
@@ -1,69 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_db_info).
-
--export([go/1]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
-
-go(DbName) ->
- Shards = mem3:shards(DbName),
- Workers = fabric_util:submit_jobs(Shards, get_db_info, []),
- Acc0 = {fabric_dict:init(Workers, nil), []},
- fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0).
-
-handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, Acc}) ->
- case fabric_dict:lookup_element(Shard, Counters) of
- undefined ->
- % already heard from someone else in this range
- {ok, {Counters, Acc}};
- nil ->
- Seq = couch_util:get_value(update_seq, Info),
- C1 = fabric_dict:store(Shard, Seq, Counters),
- C2 = fabric_view:remove_overlapping_shards(Shard, C1),
- case fabric_dict:any(nil, C2) of
- true ->
- {ok, {C2, [Info|Acc]}};
- false ->
- {stop, [
- {db_name,Name},
- {update_seq, fabric_view_changes:pack_seqs(C2)} |
- merge_results(lists:flatten([Info|Acc]))
- ]}
- end
- end;
-handle_message(_, _, Acc) ->
- {ok, Acc}.
-
-merge_results(Info) ->
- Dict = lists:foldl(fun({K,V},D0) -> orddict:append(K,V,D0) end,
- orddict:new(), Info),
- orddict:fold(fun
- (doc_count, X, Acc) ->
- [{doc_count, lists:sum(X)} | Acc];
- (doc_del_count, X, Acc) ->
- [{doc_del_count, lists:sum(X)} | Acc];
- (purge_seq, X, Acc) ->
- [{purge_seq, lists:sum(X)} | Acc];
- (compact_running, X, Acc) ->
- [{compact_running, lists:member(true, X)} | Acc];
- (disk_size, X, Acc) ->
- [{disk_size, lists:sum(X)} | Acc];
- (disk_format_version, X, Acc) ->
- [{disk_format_version, lists:max(X)} | Acc];
- (_, _, Acc) ->
- Acc
- end, [{instance_start_time, <<"0">>}], Dict).
diff --git a/apps/fabric/src/fabric_db_meta.erl b/apps/fabric/src/fabric_db_meta.erl
deleted file mode 100644
index cb46f380..00000000
--- a/apps/fabric/src/fabric_db_meta.erl
+++ /dev/null
@@ -1,49 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_db_meta).
-
--export([set_revs_limit/3, set_security/3]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
-
-set_revs_limit(DbName, Limit, Options) ->
- Shards = mem3:shards(DbName),
- Workers = fabric_util:submit_jobs(Shards, set_revs_limit, [Limit, Options]),
- Waiting = length(Workers) - 1,
- case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Waiting) of
- {ok, ok} ->
- ok;
- Error ->
- Error
- end.
-
-set_security(DbName, SecObj, Options) ->
- Shards = mem3:shards(DbName),
- Workers = fabric_util:submit_jobs(Shards, set_security, [SecObj, Options]),
- Waiting = length(Workers) - 1,
- case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Waiting) of
- {ok, ok} ->
- ok;
- Error ->
- Error
- end.
-
-handle_message(ok, _, 0) ->
- {stop, ok};
-handle_message(ok, _, Waiting) ->
- {ok, Waiting - 1};
-handle_message(Error, _, _Waiting) ->
- {error, Error}. \ No newline at end of file
diff --git a/apps/fabric/src/fabric_dict.erl b/apps/fabric/src/fabric_dict.erl
deleted file mode 100644
index 7db98923..00000000
--- a/apps/fabric/src/fabric_dict.erl
+++ /dev/null
@@ -1,51 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_dict).
--compile(export_all).
-
-% Instead of ets, let's use an ordered keylist. We'll need to revisit if we
-% have >> 100 shards, so a private interface is a good idea. - APK June 2010
-
-init(Keys, InitialValue) ->
- orddict:from_list([{Key, InitialValue} || Key <- Keys]).
-
-
-decrement_all(Dict) ->
- [{K,V-1} || {K,V} <- Dict].
-
-store(Key, Value, Dict) ->
- orddict:store(Key, Value, Dict).
-
-erase(Key, Dict) ->
- orddict:erase(Key, Dict).
-
-update_counter(Key, Incr, Dict0) ->
- orddict:update_counter(Key, Incr, Dict0).
-
-
-lookup_element(Key, Dict) ->
- couch_util:get_value(Key, Dict).
-
-size(Dict) ->
- orddict:size(Dict).
-
-any(Value, Dict) ->
- lists:keymember(Value, 2, Dict).
-
-filter(Fun, Dict) ->
- orddict:filter(Fun, Dict).
-
-fold(Fun, Acc0, Dict) ->
- orddict:fold(Fun, Acc0, Dict).
diff --git a/apps/fabric/src/fabric_doc_attachments.erl b/apps/fabric/src/fabric_doc_attachments.erl
deleted file mode 100644
index b66e2ae4..00000000
--- a/apps/fabric/src/fabric_doc_attachments.erl
+++ /dev/null
@@ -1,116 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_doc_attachments).
-
--include("fabric.hrl").
-
-%% couch api calls
--export([receiver/2]).
-
-receiver(_Req, undefined) ->
- <<"">>;
-receiver(_Req, {unknown_transfer_encoding, Unknown}) ->
- exit({unknown_transfer_encoding, Unknown});
-receiver(Req, chunked) ->
- MiddleMan = spawn(fun() -> middleman(Req, chunked) end),
- fun(4096, ChunkFun, ok) ->
- write_chunks(MiddleMan, ChunkFun)
- end;
-receiver(_Req, 0) ->
- <<"">>;
-receiver(Req, Length) when is_integer(Length) ->
- Middleman = spawn(fun() -> middleman(Req, Length) end),
- fun() ->
- Middleman ! {self(), gimme_data},
- receive {Middleman, Data} -> Data end
- end;
-receiver(_Req, Length) ->
- exit({length_not_integer, Length}).
-
-%%
-%% internal
-%%
-
-write_chunks(MiddleMan, ChunkFun) ->
- MiddleMan ! {self(), gimme_data},
- receive
- {MiddleMan, {0, _Footers}} ->
- % MiddleMan ! {self(), done},
- ok;
- {MiddleMan, ChunkRecord} ->
- ChunkFun(ChunkRecord, ok),
- write_chunks(MiddleMan, ChunkFun)
- end.
-
-receive_unchunked_attachment(_Req, 0) ->
- ok;
-receive_unchunked_attachment(Req, Length) ->
- receive {MiddleMan, go} ->
- Data = couch_httpd:recv(Req, 0),
- MiddleMan ! {self(), Data}
- end,
- receive_unchunked_attachment(Req, Length - size(Data)).
-
-middleman(Req, chunked) ->
- % spawn a process to actually receive the uploaded data
- RcvFun = fun(ChunkRecord, ok) ->
- receive {From, go} -> From ! {self(), ChunkRecord} end, ok
- end,
- Receiver = spawn(fun() -> couch_httpd:recv_chunked(Req,4096,RcvFun,ok) end),
-
- % take requests from the DB writers and get data from the receiver
- N = erlang:list_to_integer(couch_config:get("cluster","n")),
- middleman_loop(Receiver, N, dict:new(), 0, []);
-
-middleman(Req, Length) ->
- Receiver = spawn(fun() -> receive_unchunked_attachment(Req, Length) end),
- N = erlang:list_to_integer(couch_config:get("cluster","n")),
- middleman_loop(Receiver, N, dict:new(), 0, []).
-
-middleman_loop(Receiver, N, Counters, Offset, ChunkList) ->
- receive {From, gimme_data} ->
- % figure out how far along this writer (From) is in the list
- {NewCounters, WhichChunk} = case dict:find(From, Counters) of
- {ok, I} ->
- {dict:update_counter(From, 1, Counters), I};
- error ->
- {dict:store(From, 2, Counters), 1}
- end,
- ListIndex = WhichChunk - Offset,
-
- % talk to the receiver to get another chunk if necessary
- ChunkList1 = if ListIndex > length(ChunkList) ->
- Receiver ! {self(), go},
- receive {Receiver, ChunkRecord} -> ChunkList ++ [ChunkRecord] end;
- true -> ChunkList end,
-
- % reply to the writer
- From ! {self(), lists:nth(ListIndex, ChunkList1)},
-
- % check if we can drop a chunk from the head of the list
- SmallestIndex = dict:fold(fun(_, Val, Acc) -> lists:min([Val,Acc]) end,
- WhichChunk+1, NewCounters),
- Size = dict:size(NewCounters),
-
- {NewChunkList, NewOffset} =
- if Size == N andalso (SmallestIndex - Offset) == 2 ->
- {tl(ChunkList1), Offset+1};
- true ->
- {ChunkList1, Offset}
- end,
- middleman_loop(Receiver, N, NewCounters, NewOffset, NewChunkList)
- after 10000 ->
- ok
- end.
diff --git a/apps/fabric/src/fabric_doc_missing_revs.erl b/apps/fabric/src/fabric_doc_missing_revs.erl
deleted file mode 100644
index a4d54192..00000000
--- a/apps/fabric/src/fabric_doc_missing_revs.erl
+++ /dev/null
@@ -1,78 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_doc_missing_revs).
-
--export([go/2]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
-
-go(DbName, AllIdsRevs) ->
- Workers = lists:map(fun({#shard{name=Name, node=Node} = Shard, IdsRevs}) ->
- Ref = rexi:cast(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs]}),
- Shard#shard{ref=Ref}
- end, group_idrevs_by_shard(DbName, AllIdsRevs)),
- ResultDict = dict:from_list([{Id, {nil,Revs}} || {Id, Revs} <- AllIdsRevs]),
- Acc0 = {length(Workers), ResultDict},
- fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0).
-
-handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) ->
- skip_message(Acc0);
-handle_message({rexi_EXIT, _, _, _}, _Worker, Acc0) ->
- skip_message(Acc0);
-handle_message({ok, Results}, _Worker, {1, D0}) ->
- D = update_dict(D0, Results),
- {stop, dict:fold(fun force_reply/3, [], D)};
-handle_message({ok, Results}, _Worker, {WaitingCount, D0}) ->
- D = update_dict(D0, Results),
- case dict:fold(fun maybe_reply/3, {stop, []}, D) of
- continue ->
- % still haven't heard about some Ids
- {ok, {WaitingCount - 1, D}};
- {stop, FinalReply} ->
- {stop, FinalReply}
- end.
-
-force_reply(Id, {nil,Revs}, Acc) ->
- % never heard about this ID, assume it's missing
- [{Id, Revs} | Acc];
-force_reply(_, [], Acc) ->
- Acc;
-force_reply(Id, Revs, Acc) ->
- [{Id, Revs} | Acc].
-
-maybe_reply(_, _, continue) ->
- continue;
-maybe_reply(_, {nil, _}, _) ->
- continue;
-maybe_reply(_, [], {stop, Acc}) ->
- {stop, Acc};
-maybe_reply(Id, Revs, {stop, Acc}) ->
- {stop, [{Id, Revs} | Acc]}.
-
-group_idrevs_by_shard(DbName, IdsRevs) ->
- dict:to_list(lists:foldl(fun({Id, Revs}, D0) ->
- lists:foldl(fun(Shard, D1) ->
- dict:append(Shard, {Id, Revs}, D1)
- end, D0, mem3:shards(DbName,Id))
- end, dict:new(), IdsRevs)).
-
-update_dict(D0, KVs) ->
- lists:foldl(fun({K,V,_}, D1) -> dict:store(K, V, D1) end, D0, KVs).
-
-skip_message({1, Dict}) ->
- {stop, dict:fold(fun force_reply/3, [], Dict)};
-skip_message({WaitingCount, Dict}) ->
- {ok, {WaitingCount-1, Dict}}.
diff --git a/apps/fabric/src/fabric_doc_open.erl b/apps/fabric/src/fabric_doc_open.erl
deleted file mode 100644
index dd4917b9..00000000
--- a/apps/fabric/src/fabric_doc_open.erl
+++ /dev/null
@@ -1,120 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_doc_open).
-
--export([go/3]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-go(DbName, Id, Options) ->
- Workers = fabric_util:submit_jobs(mem3:shards(DbName,Id), open_doc,
- [Id, [deleted|Options]]),
- SuppressDeletedDoc = not lists:member(deleted, Options),
- R = couch_util:get_value(r, Options, couch_config:get("cluster","r","2")),
- RepairOpts = [{r, integer_to_list(mem3:n(DbName))} | Options],
- Acc0 = {length(Workers), list_to_integer(R), []},
- case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
- {ok, Reply} ->
- format_reply(Reply, SuppressDeletedDoc);
- {error, needs_repair, Reply} ->
- spawn(fabric, open_revs, [DbName, Id, all, RepairOpts]),
- format_reply(Reply, SuppressDeletedDoc);
- {error, needs_repair} ->
- % we couldn't determine the correct reply, so we'll run a sync repair
- {ok, Results} = fabric:open_revs(DbName, Id, all, RepairOpts),
- case lists:partition(fun({ok, #doc{deleted=Del}}) -> Del end, Results) of
- {[], []} ->
- {not_found, missing};
- {_DeletedDocs, []} when SuppressDeletedDoc ->
- {not_found, deleted};
- {DeletedDocs, []} ->
- lists:last(lists:sort(DeletedDocs));
- {_, LiveDocs} ->
- lists:last(lists:sort(LiveDocs))
- end;
- Error ->
- Error
- end.
-
-format_reply({ok, #doc{deleted=true}}, true) ->
- {not_found, deleted};
-format_reply(Else, _) ->
- Else.
-
-handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) ->
- skip_message(Acc0);
-handle_message({rexi_EXIT, _Reason}, _Worker, Acc0) ->
- skip_message(Acc0);
-handle_message(Reply, _Worker, {WaitingCount, R, Replies}) ->
- NewReplies = orddict:update_counter(Reply, 1, Replies),
- Reduced = fabric_util:remove_ancestors(NewReplies, []),
- case lists:dropwhile(fun({_, Count}) -> Count < R end, Reduced) of
- [{QuorumReply, _} | _] ->
- if length(NewReplies) =:= 1 ->
- {stop, QuorumReply};
- true ->
- % we had some disagreement amongst the workers, so repair is useful
- {error, needs_repair, QuorumReply}
- end;
- [] ->
- if WaitingCount =:= 1 ->
- {error, needs_repair};
- true ->
- {ok, {WaitingCount-1, R, NewReplies}}
- end
- end.
-
-skip_message({1, _R, _Replies}) ->
- {error, needs_repair};
-skip_message({WaitingCount, R, Replies}) ->
- {ok, {WaitingCount-1, R, Replies}}.
-
-
-open_doc_test() ->
- Foo1 = {ok, #doc{revs = {1,[<<"foo">>]}}},
- Foo2 = {ok, #doc{revs = {2,[<<"foo2">>,<<"foo">>]}}},
- Bar1 = {ok, #doc{revs = {1,[<<"bar">>]}}},
- Baz1 = {ok, #doc{revs = {1,[<<"baz">>]}}},
- NF = {not_found, missing},
- State0 = {3, 2, []},
- State1 = {2, 2, [{Foo1,1}]},
- State2 = {1, 2, [{Bar1,1}, {Foo1,1}]},
- ?assertEqual({ok, State1}, handle_message(Foo1, nil, State0)),
-
- % normal case - quorum reached, no disagreement
- ?assertEqual({stop, Foo1}, handle_message(Foo1, nil, State1)),
-
- % 2nd worker disagrees, voting continues
- ?assertEqual({ok, State2}, handle_message(Bar1, nil, State1)),
-
- % 3rd worker resolves voting, but repair is needed
- ?assertEqual({error, needs_repair, Foo1}, handle_message(Foo1, nil, State2)),
-
- % 2nd worker comes up with descendant of Foo1, voting resolved, run repair
- ?assertEqual({error, needs_repair, Foo2}, handle_message(Foo2, nil, State1)),
-
- % not_found is considered to be an ancestor of everybody
- ?assertEqual({error, needs_repair, Foo1}, handle_message(NF, nil, State1)),
-
- % 3 distinct edit branches result in quorum failure
- ?assertEqual({error, needs_repair}, handle_message(Baz1, nil, State2)),
-
- % bad node concludes voting w/o success, run sync repair to get the result
- ?assertEqual(
- {error, needs_repair},
- handle_message({rexi_DOWN, 1, 2, 3}, nil, State2)
- ).
diff --git a/apps/fabric/src/fabric_doc_open_revs.erl b/apps/fabric/src/fabric_doc_open_revs.erl
deleted file mode 100644
index d0aec6e4..00000000
--- a/apps/fabric/src/fabric_doc_open_revs.erl
+++ /dev/null
@@ -1,284 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_doc_open_revs).
-
--export([go/4]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
--include_lib("eunit/include/eunit.hrl").
-
--record(state, {
- dbname,
- worker_count,
- reply_count = 0,
- r,
- revs,
- latest,
- replies = []
-}).
-
-go(DbName, Id, Revs, Options) ->
- Workers = fabric_util:submit_jobs(mem3:shards(DbName,Id), open_revs,
- [Id, Revs, Options]),
- R = couch_util:get_value(r, Options, couch_config:get("cluster","r","2")),
- State = #state{
- dbname = DbName,
- worker_count = length(Workers),
- r = list_to_integer(R),
- revs = Revs,
- latest = lists:member(latest, Options),
- replies = case Revs of all -> []; Revs -> [{Rev,[]} || Rev <- Revs] end
- },
- case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, State) of
- {ok, {ok, Reply}} ->
- {ok, Reply};
- Else ->
- Else
- end.
-
-handle_message({rexi_DOWN, _, _, _}, _Worker, State) ->
- skip(State);
-handle_message({rexi_EXIT, _}, _Worker, State) ->
- skip(State);
-handle_message({ok, RawReplies}, _Worker, #state{revs = all} = State) ->
- #state{
- dbname = DbName,
- reply_count = ReplyCount,
- worker_count = WorkerCount,
- replies = All0,
- r = R
- } = State,
- All = lists:foldl(fun(Reply,D) -> orddict:update_counter(Reply,1,D) end,
- All0, RawReplies),
- Reduced = fabric_util:remove_ancestors(All, []),
- Complete = (ReplyCount =:= (WorkerCount - 1)),
- QuorumMet = lists:all(fun({_, C}) -> C >= R end, Reduced),
- case Reduced of All when QuorumMet andalso ReplyCount =:= (R-1) ->
- Repair = false;
- _ ->
- Repair = [D || {{ok,D}, _} <- Reduced]
- end,
- case maybe_reply(DbName, Reduced, Complete, Repair, R) of
- noreply ->
- {ok, State#state{replies = All, reply_count = ReplyCount+1}};
- {reply, FinalReply} ->
- {stop, FinalReply}
- end;
-handle_message({ok, RawReplies0}, _Worker, State) ->
- % we've got an explicit revision list, but if latest=true the workers may
- % return a descendant of the requested revision. Take advantage of the
- % fact that revisions are returned in order to keep track.
- RawReplies = strip_not_found_missing(RawReplies0),
- #state{
- dbname = DbName,
- reply_count = ReplyCount,
- worker_count = WorkerCount,
- replies = All0,
- r = R
- } = State,
- All = lists:zipwith(fun({Rev, D}, Reply) ->
- if Reply =:= error -> {Rev, D}; true ->
- {Rev, orddict:update_counter(Reply, 1, D)}
- end
- end, All0, RawReplies),
- Reduced = [fabric_util:remove_ancestors(X, []) || {_, X} <- All],
- FinalReplies = [choose_winner(X, R) || X <- Reduced],
- Complete = (ReplyCount =:= (WorkerCount - 1)),
- case is_repair_needed(All, FinalReplies) of
- true ->
- Repair = [D || {{ok,D}, _} <- lists:flatten(Reduced)];
- false ->
- Repair = false
- end,
- case maybe_reply(DbName, FinalReplies, Complete, Repair, R) of
- noreply ->
- {ok, State#state{replies = All, reply_count = ReplyCount+1}};
- {reply, FinalReply} ->
- {stop, FinalReply}
- end.
-
-skip(#state{revs=all} = State) ->
- handle_message({ok, []}, nil, State);
-skip(#state{revs=Revs} = State) ->
- handle_message({ok, [error || _Rev <- Revs]}, nil, State).
-
-maybe_reply(_, [], false, _, _) ->
- noreply;
-maybe_reply(DbName, ReplyDict, IsComplete, RepairDocs, R) ->
- case lists:all(fun({_, C}) -> C >= R end, ReplyDict) of
- true ->
- maybe_execute_read_repair(DbName, RepairDocs),
- {reply, unstrip_not_found_missing(orddict:fetch_keys(ReplyDict))};
- false ->
- case IsComplete of false -> noreply; true ->
- maybe_execute_read_repair(DbName, RepairDocs),
- {reply, unstrip_not_found_missing(orddict:fetch_keys(ReplyDict))}
- end
- end.
-
-choose_winner(Options, R) ->
- case lists:dropwhile(fun({_Reply, C}) -> C < R end, Options) of
- [] ->
- case [Elem || {{ok, #doc{}}, _} = Elem <- Options] of
- [] ->
- hd(Options);
- Docs ->
- lists:last(lists:sort(Docs))
- end;
- [QuorumMet | _] ->
- QuorumMet
- end.
-
-% repair needed if any reply other than the winner has been received for a rev
-is_repair_needed([], []) ->
- false;
-is_repair_needed([{_Rev, [Reply]} | Tail1], [Reply | Tail2]) ->
- is_repair_needed(Tail1, Tail2);
-is_repair_needed(_, _) ->
- true.
-
-maybe_execute_read_repair(_Db, false) ->
- ok;
-maybe_execute_read_repair(Db, Docs) ->
- spawn(fun() ->
- [#doc{id=Id} | _] = Docs,
- Ctx = #user_ctx{roles=[<<"_admin">>]},
- Res = fabric:update_docs(Db, Docs, [replicated_changes, {user_ctx,Ctx}]),
- ?LOG_INFO("read_repair ~s ~s ~p", [Db, Id, Res])
- end).
-
-% hackery required so that not_found sorts first
-strip_not_found_missing([]) ->
- [];
-strip_not_found_missing([{{not_found, missing}, Rev} | Rest]) ->
- [{not_found, Rev} | strip_not_found_missing(Rest)];
-strip_not_found_missing([Else | Rest]) ->
- [Else | strip_not_found_missing(Rest)].
-
-unstrip_not_found_missing([]) ->
- [];
-unstrip_not_found_missing([{not_found, Rev} | Rest]) ->
- [{{not_found, missing}, Rev} | unstrip_not_found_missing(Rest)];
-unstrip_not_found_missing([Else | Rest]) ->
- [Else | unstrip_not_found_missing(Rest)].
-
-all_revs_test() ->
- State0 = #state{worker_count = 3, r = 2, revs = all},
- Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}},
- Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}},
- Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}},
-
- % an empty worker response does not count as meeting quorum
- ?assertMatch(
- {ok, #state{}},
- handle_message({ok, []}, nil, State0)
- ),
-
- ?assertMatch(
- {ok, #state{}},
- handle_message({ok, [Foo1, Bar1]}, nil, State0)
- ),
- {ok, State1} = handle_message({ok, [Foo1, Bar1]}, nil, State0),
-
- % the normal case - workers agree
- ?assertEqual(
- {stop, [Bar1, Foo1]},
- handle_message({ok, [Foo1, Bar1]}, nil, State1)
- ),
-
- % a case where the 2nd worker has a newer Foo - currently we're considering
- % Foo to have reached quorum and execute_read_repair()
- ?assertEqual(
- {stop, [Bar1, Foo2]},
- handle_message({ok, [Foo2, Bar1]}, nil, State1)
- ),
-
- % a case where quorum has not yet been reached for Foo
- ?assertMatch(
- {ok, #state{}},
- handle_message({ok, [Bar1]}, nil, State1)
- ),
- {ok, State2} = handle_message({ok, [Bar1]}, nil, State1),
-
- % still no quorum, but all workers have responded. We include Foo1 in the
- % response and execute_read_repair()
- ?assertEqual(
- {stop, [Bar1, Foo1]},
- handle_message({ok, [Bar1]}, nil, State2)
- ).
-
-specific_revs_test() ->
- Revs = [{1,<<"foo">>}, {1,<<"bar">>}, {1,<<"baz">>}],
- State0 = #state{
- worker_count = 3,
- r = 2,
- revs = Revs,
- latest = false,
- replies = [{Rev,[]} || Rev <- Revs]
- },
- Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}},
- Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}},
- Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}},
- Baz1 = {{not_found, missing}, {1,<<"baz">>}},
- Baz2 = {ok, #doc{revs = {1, [<<"baz">>]}}},
-
- ?assertMatch(
- {ok, #state{}},
- handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State0)
- ),
- {ok, State1} = handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State0),
-
- % the normal case - workers agree
- ?assertEqual(
- {stop, [Foo1, Bar1, Baz1]},
- handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State1)
- ),
-
- % latest=true, worker responds with Foo2 and we return it
- State0L = State0#state{latest = true},
- ?assertMatch(
- {ok, #state{}},
- handle_message({ok, [Foo2, Bar1, Baz1]}, nil, State0L)
- ),
- {ok, State1L} = handle_message({ok, [Foo2, Bar1, Baz1]}, nil, State0L),
- ?assertEqual(
- {stop, [Foo2, Bar1, Baz1]},
- handle_message({ok, [Foo2, Bar1, Baz1]}, nil, State1L)
- ),
-
- % Foo1 is included in the read quorum for Foo2
- ?assertEqual(
- {stop, [Foo2, Bar1, Baz1]},
- handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State1L)
- ),
-
- % {not_found, missing} is included in the quorum for any found revision
- ?assertEqual(
- {stop, [Foo2, Bar1, Baz2]},
- handle_message({ok, [Foo2, Bar1, Baz2]}, nil, State1L)
- ),
-
- % a worker failure is skipped
- ?assertMatch(
- {ok, #state{}},
- handle_message({rexi_EXIT, foo}, nil, State1L)
- ),
- {ok, State2L} = handle_message({rexi_EXIT, foo}, nil, State1L),
- ?assertEqual(
- {stop, [Foo2, Bar1, Baz2]},
- handle_message({ok, [Foo2, Bar1, Baz2]}, nil, State2L)
- ).
diff --git a/apps/fabric/src/fabric_doc_update.erl b/apps/fabric/src/fabric_doc_update.erl
deleted file mode 100644
index 50d02888..00000000
--- a/apps/fabric/src/fabric_doc_update.erl
+++ /dev/null
@@ -1,147 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_doc_update).
-
--export([go/3]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-go(_, [], _) ->
- {ok, []};
-go(DbName, AllDocs, Opts) ->
- validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, Opts)),
- Options = lists:delete(all_or_nothing, Opts),
- GroupedDocs = lists:map(fun({#shard{name=Name, node=Node} = Shard, Docs}) ->
- Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
- {Shard#shard{ref=Ref}, Docs}
- end, group_docs_by_shard(DbName, AllDocs)),
- {Workers, _} = lists:unzip(GroupedDocs),
- W = couch_util:get_value(w, Options, couch_config:get("cluster","w","2")),
- Acc0 = {length(Workers), length(AllDocs), list_to_integer(W), GroupedDocs,
- dict:from_list([{Doc,[]} || Doc <- AllDocs])},
- case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
- {ok, Results} ->
- Reordered = couch_util:reorder_results(AllDocs, Results),
- {ok, [R || R <- Reordered, R =/= noreply]};
- Else ->
- Else
- end.
-
-handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) ->
- skip_message(Acc0);
-handle_message({rexi_EXIT, _}, _Worker, Acc0) ->
- {WaitingCount, _, W, _, DocReplyDict} = Acc0,
- if WaitingCount =:= 1 ->
- {W, Reply} = dict:fold(fun force_reply/3, {W,[]}, DocReplyDict),
- {stop, Reply};
- true ->
- {ok, setelement(1, Acc0, WaitingCount-1)}
- end;
-handle_message({ok, Replies}, Worker, Acc0) ->
- {WaitingCount, DocCount, W, GroupedDocs, DocReplyDict0} = Acc0,
- Docs = couch_util:get_value(Worker, GroupedDocs),
- DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0),
- case {WaitingCount, dict:size(DocReplyDict)} of
- {1, _} ->
- % last message has arrived, we need to conclude things
- {W, Reply} = dict:fold(fun force_reply/3, {W,[]}, DocReplyDict),
- {stop, Reply};
- {_, DocCount} ->
- % we've got at least one reply for each document, let's take a look
- case dict:fold(fun maybe_reply/3, {stop,W,[]}, DocReplyDict) of
- continue ->
- {ok, {WaitingCount - 1, DocCount, W, GroupedDocs, DocReplyDict}};
- {stop, W, FinalReplies} ->
- {stop, FinalReplies}
- end;
- {_, N} when N < DocCount ->
- % no point in trying to finalize anything yet
- {ok, {WaitingCount - 1, DocCount, W, GroupedDocs, DocReplyDict}}
- end;
-handle_message({missing_stub, Stub}, _, _) ->
- throw({missing_stub, Stub});
-handle_message({not_found, no_db_file} = X, Worker, Acc0) ->
- {_, _, _, GroupedDocs, _} = Acc0,
- Docs = couch_util:get_value(Worker, GroupedDocs),
- handle_message({ok, [X || _D <- Docs]}, Worker, Acc0).
-
-force_reply(Doc, [], {W, Acc}) ->
- {W, [{Doc, {error, internal_server_error}} | Acc]};
-force_reply(Doc, [FirstReply|_] = Replies, {W, Acc}) ->
- case update_quorum_met(W, Replies) of
- {true, Reply} ->
- {W, [{Doc,Reply} | Acc]};
- false ->
- ?LOG_ERROR("write quorum (~p) failed, reply ~p", [W, FirstReply]),
- % TODO make a smarter choice than just picking the first reply
- {W, [{Doc,FirstReply} | Acc]}
- end.
-
-maybe_reply(_, _, continue) ->
- % we didn't meet quorum for all docs, so we're fast-forwarding the fold
- continue;
-maybe_reply(Doc, Replies, {stop, W, Acc}) ->
- case update_quorum_met(W, Replies) of
- {true, Reply} ->
- {stop, W, [{Doc, Reply} | Acc]};
- false ->
- continue
- end.
-
-update_quorum_met(W, Replies) ->
- Counters = lists:foldl(fun(R,D) -> orddict:update_counter(R,1,D) end,
- orddict:new(), Replies),
- case lists:dropwhile(fun({_, Count}) -> Count < W end, Counters) of
- [] ->
- false;
- [{FinalReply, _} | _] ->
- {true, FinalReply}
- end.
-
--spec group_docs_by_shard(binary(), [#doc{}]) -> [{#shard{}, [#doc{}]}].
-group_docs_by_shard(DbName, Docs) ->
- dict:to_list(lists:foldl(fun(#doc{id=Id} = Doc, D0) ->
- lists:foldl(fun(Shard, D1) ->
- dict:append(Shard, Doc, D1)
- end, D0, mem3:shards(DbName,Id))
- end, dict:new(), Docs)).
-
-append_update_replies([], [], DocReplyDict) ->
- DocReplyDict;
-append_update_replies([Doc|Rest], [], Dict0) ->
- % icky, if replicated_changes only errors show up in result
- append_update_replies(Rest, [], dict:append(Doc, noreply, Dict0));
-append_update_replies([Doc|Rest1], [Reply|Rest2], Dict0) ->
- % TODO what if the same document shows up twice in one update_docs call?
- append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)).
-
-skip_message(Acc0) ->
- % TODO fix this
- {ok, Acc0}.
-
-validate_atomic_update(_, _, false) ->
- ok;
-validate_atomic_update(_DbName, AllDocs, true) ->
- % TODO actually perform the validation. This requires some hackery, we need
- % to basically extract the prep_and_validate_updates function from couch_db
- % and only run that, without actually writing in case of a success.
- Error = {not_implemented, <<"all_or_nothing is not supported yet">>},
- PreCommitFailures = lists:map(fun(#doc{id=Id, revs = {Pos,Revs}}) ->
- case Revs of [] -> RevId = <<>>; [RevId|_] -> ok end,
- {{Id, {Pos, RevId}}, Error}
- end, AllDocs),
- throw({aborted, PreCommitFailures}).
diff --git a/apps/fabric/src/fabric_group_info.erl b/apps/fabric/src/fabric_group_info.erl
deleted file mode 100644
index d5260271..00000000
--- a/apps/fabric/src/fabric_group_info.erl
+++ /dev/null
@@ -1,66 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_group_info).
-
--export([go/2]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-go(DbName, GroupId) when is_binary(GroupId) ->
- {ok, DDoc} = fabric:open_doc(DbName, GroupId, []),
- go(DbName, DDoc);
-
-go(DbName, #doc{} = DDoc) ->
- Group = couch_view_group:design_doc_to_view_group(DDoc),
- Shards = mem3:shards(DbName),
- Workers = fabric_util:submit_jobs(Shards, group_info, [Group]),
- Acc0 = {fabric_dict:init(Workers, nil), []},
- fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0).
-
-handle_message({ok, Info}, Shard, {Counters, Acc}) ->
- case fabric_dict:lookup_element(Shard, Counters) of
- undefined ->
- % already heard from someone else in this range
- {ok, {Counters, Acc}};
- nil ->
- C1 = fabric_dict:store(Shard, ok, Counters),
- C2 = fabric_view:remove_overlapping_shards(Shard, C1),
- case fabric_dict:any(nil, C2) of
- true ->
- {ok, {C2, [Info|Acc]}};
- false ->
- {stop, merge_results(lists:flatten([Info|Acc]))}
- end
- end;
-handle_message(_, _, Acc) ->
- {ok, Acc}.
-
-merge_results(Info) ->
- Dict = lists:foldl(fun({K,V},D0) -> orddict:append(K,V,D0) end,
- orddict:new(), Info),
- orddict:fold(fun
- (signature, [X|_], Acc) ->
- [{signature, X} | Acc];
- (language, [X|_], Acc) ->
- [{language, X} | Acc];
- (disk_size, X, Acc) ->
- [{disk_size, lists:sum(X)} | Acc];
- (compact_running, X, Acc) ->
- [{compact_running, lists:member(true, X)} | Acc];
- (_, _, Acc) ->
- Acc
- end, [], Dict).
diff --git a/apps/fabric/src/fabric_rpc.erl b/apps/fabric/src/fabric_rpc.erl
deleted file mode 100644
index a7d555e0..00000000
--- a/apps/fabric/src/fabric_rpc.erl
+++ /dev/null
@@ -1,402 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_rpc).
-
--export([get_db_info/1, get_doc_count/1, get_update_seq/1]).
--export([open_doc/3, open_revs/4, get_missing_revs/2, update_docs/3]).
--export([all_docs/2, changes/3, map_view/4, reduce_view/4, group_info/2]).
--export([create_db/3, delete_db/3, reset_validation_funs/1, set_security/3,
- set_revs_limit/3]).
-
--include("fabric.hrl").
--include_lib("couch/include/couch_db.hrl").
-
--record (view_acc, {
- db,
- limit,
- include_docs,
- offset = nil,
- total_rows,
- reduce_fun = fun couch_db:enum_docs_reduce_to_count/1,
- group_level = 0
-}).
-
-%% rpc endpoints
-%% call to with_db will supply your M:F with a #db{} and then remaining args
-
-all_docs(DbName, #view_query_args{keys=nil} = QueryArgs) ->
- {ok, Db} = couch_db:open(DbName, []),
- #view_query_args{
- start_key = StartKey,
- start_docid = StartDocId,
- end_key = EndKey,
- end_docid = EndDocId,
- limit = Limit,
- skip = Skip,
- include_docs = IncludeDocs,
- direction = Dir,
- inclusive_end = Inclusive
- } = QueryArgs,
- {ok, Total} = couch_db:get_doc_count(Db),
- Acc0 = #view_acc{
- db = Db,
- include_docs = IncludeDocs,
- limit = Limit+Skip,
- total_rows = Total
- },
- EndKeyType = if Inclusive -> end_key; true -> end_key_gt end,
- Options = [
- {dir, Dir},
- {start_key, if is_binary(StartKey) -> StartKey; true -> StartDocId end},
- {EndKeyType, if is_binary(EndKey) -> EndKey; true -> EndDocId end}
- ],
- {ok, _, Acc} = couch_db:enum_docs(Db, fun view_fold/3, Acc0, Options),
- final_response(Total, Acc#view_acc.offset).
-
-changes(DbName, Args, StartSeq) ->
- #changes_args{style=Style, dir=Dir} = Args,
- case couch_db:open(DbName, []) of
- {ok, Db} ->
- Enum = fun changes_enumerator/2,
- Opts = [{dir,Dir}],
- Acc0 = {Db, StartSeq, Args},
- try
- {ok, {_, LastSeq, _}} =
- couch_db:changes_since(Db, Style, StartSeq, Enum, Opts, Acc0),
- rexi:reply({complete, LastSeq})
- after
- couch_db:close(Db)
- end;
- Error ->
- rexi:reply(Error)
- end.
-
-map_view(DbName, DDoc, ViewName, QueryArgs) ->
- {ok, Db} = couch_db:open(DbName, []),
- #view_query_args{
- limit = Limit,
- skip = Skip,
- keys = Keys,
- include_docs = IncludeDocs,
- stale = Stale,
- view_type = ViewType
- } = QueryArgs,
- MinSeq = if Stale == ok -> 0; true -> couch_db:get_update_seq(Db) end,
- Group0 = couch_view_group:design_doc_to_view_group(DDoc),
- {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
- {ok, Group} = couch_view_group:request_group(Pid, MinSeq),
- View = fabric_view:extract_view(Pid, ViewName, Group#group.views, ViewType),
- {ok, Total} = couch_view:get_row_count(View),
- Acc0 = #view_acc{
- db = Db,
- include_docs = IncludeDocs,
- limit = Limit+Skip,
- total_rows = Total,
- reduce_fun = fun couch_view:reduce_to_count/1
- },
- case Keys of
- nil ->
- Options = couch_httpd_view:make_key_options(QueryArgs),
- {ok, _, Acc} = couch_view:fold(View, fun view_fold/3, Acc0, Options);
- _ ->
- Acc = lists:foldl(fun(Key, AccIn) ->
- KeyArgs = QueryArgs#view_query_args{start_key=Key, end_key=Key},
- Options = couch_httpd_view:make_key_options(KeyArgs),
- {_Go, _, Out} = couch_view:fold(View, fun view_fold/3, AccIn,
- Options),
- Out
- end, Acc0, Keys)
- end,
- final_response(Total, Acc#view_acc.offset).
-
-reduce_view(DbName, Group0, ViewName, QueryArgs) ->
- {ok, Db} = couch_db:open(DbName, []),
- #view_query_args{
- group_level = GroupLevel,
- limit = Limit,
- skip = Skip,
- keys = Keys,
- stale = Stale
- } = QueryArgs,
- GroupFun = group_rows_fun(GroupLevel),
- MinSeq = if Stale == ok -> 0; true -> couch_db:get_update_seq(Db) end,
- {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
- {ok, #group{views=Views, def_lang=Lang}} = couch_view_group:request_group(
- Pid, MinSeq),
- {NthRed, View} = fabric_view:extract_view(Pid, ViewName, Views, reduce),
- ReduceView = {reduce, NthRed, Lang, View},
- Acc0 = #view_acc{group_level = GroupLevel, limit = Limit+Skip},
- case Keys of
- nil ->
- Options0 = couch_httpd_view:make_key_options(QueryArgs),
- Options = [{key_group_fun, GroupFun} | Options0],
- couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options);
- _ ->
- lists:map(fun(Key) ->
- KeyArgs = QueryArgs#view_query_args{start_key=Key, end_key=Key},
- Options0 = couch_httpd_view:make_key_options(KeyArgs),
- Options = [{key_group_fun, GroupFun} | Options0],
- couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options)
- end, Keys)
- end,
- rexi:reply(complete).
-
-create_db(DbName, Options, Doc) ->
- mem3_util:write_db_doc(Doc),
- rexi:reply(case couch_server:create(DbName, Options) of
- {ok, _} ->
- ok;
- Error ->
- Error
- end).
-
-delete_db(DbName, Options, DocId) ->
- mem3_util:delete_db_doc(DocId),
- rexi:reply(couch_server:delete(DbName, Options)).
-
-get_db_info(DbName) ->
- with_db(DbName, [], {couch_db, get_db_info, []}).
-
-get_doc_count(DbName) ->
- with_db(DbName, [], {couch_db, get_doc_count, []}).
-
-get_update_seq(DbName) ->
- with_db(DbName, [], {couch_db, get_update_seq, []}).
-
-set_security(DbName, SecObj, Options) ->
- with_db(DbName, Options, {couch_db, set_security, [SecObj]}).
-
-set_revs_limit(DbName, Limit, Options) ->
- with_db(DbName, Options, {couch_db, set_revs_limit, [Limit]}).
-
-open_doc(DbName, DocId, Options) ->
- with_db(DbName, Options, {couch_db, open_doc, [DocId, Options]}).
-
-open_revs(DbName, Id, Revs, Options) ->
- with_db(DbName, Options, {couch_db, open_doc_revs, [Id, Revs, Options]}).
-
-get_missing_revs(DbName, IdRevsList) ->
- % reimplement here so we get [] for Ids with no missing revs in response
- rexi:reply(case couch_db:open(DbName, []) of
- {ok, Db} ->
- Ids = [Id1 || {Id1, _Revs} <- IdRevsList],
- {ok, lists:zipwith(fun({Id, Revs}, FullDocInfoResult) ->
- case FullDocInfoResult of
- {ok, #full_doc_info{rev_tree=RevisionTree} = FullInfo} ->
- MissingRevs = couch_key_tree:find_missing(RevisionTree, Revs),
- {Id, MissingRevs, possible_ancestors(FullInfo, MissingRevs)};
- not_found ->
- {Id, Revs, []}
- end
- end, IdRevsList, couch_btree:lookup(Db#db.id_tree, Ids))};
- Error ->
- Error
- end).
-
-update_docs(DbName, Docs0, Options) ->
- case proplists:get_value(replicated_changes, Options) of
- true ->
- X = replicated_changes;
- _ ->
- X = interactive_edit
- end,
- Docs = make_att_readers(Docs0),
- with_db(DbName, Options, {couch_db, update_docs, [Docs, Options, X]}).
-
-group_info(DbName, Group0) ->
- {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
- rexi:reply(couch_view_group:request_group_info(Pid)).
-
-reset_validation_funs(DbName) ->
- case couch_db:open(DbName, []) of
- {ok, #db{main_pid = Pid}} ->
- gen_server:cast(Pid, {load_validation_funs, undefined});
- _ ->
- ok
- end.
-
-%%
-%% internal
-%%
-
-with_db(DbName, Options, {M,F,A}) ->
- case couch_db:open(DbName, Options) of
- {ok, Db} ->
- rexi:reply(try
- apply(M, F, [Db | A])
- catch Exception ->
- Exception;
- error:Reason ->
- ?LOG_ERROR("~p ~p ~p~n~p", [?MODULE, {M,F}, Reason,
- erlang:get_stacktrace()]),
- {error, Reason}
- end);
- Error ->
- rexi:reply(Error)
- end.
-
-view_fold(#full_doc_info{} = FullDocInfo, OffsetReds, Acc) ->
- % matches for _all_docs and translates #full_doc_info{} -> KV pair
- case couch_doc:to_doc_info(FullDocInfo) of
- #doc_info{revs=[#rev_info{deleted=false, rev=Rev}|_]} ->
- Id = FullDocInfo#full_doc_info.id,
- Value = {[{rev,couch_doc:rev_to_str(Rev)}]},
- view_fold({{Id,Id}, Value}, OffsetReds, Acc);
- #doc_info{revs=[#rev_info{deleted=true}|_]} ->
- {ok, Acc}
- end;
-view_fold(KV, OffsetReds, #view_acc{offset=nil, total_rows=Total} = Acc) ->
- % calculates the offset for this shard
- #view_acc{reduce_fun=Reduce} = Acc,
- Offset = Reduce(OffsetReds),
- case rexi:sync_reply({total_and_offset, Total, Offset}) of
- ok ->
- view_fold(KV, OffsetReds, Acc#view_acc{offset=Offset});
- stop ->
- exit(normal);
- timeout ->
- exit(timeout)
- end;
-view_fold(_KV, _Offset, #view_acc{limit=0} = Acc) ->
- % we scanned through limit+skip local rows
- {stop, Acc};
-view_fold({{Key,Id}, Value}, _Offset, Acc) ->
- % the normal case
- #view_acc{
- db = Db,
- limit = Limit,
- include_docs = IncludeDocs
- } = Acc,
- Doc = if not IncludeDocs -> undefined; true ->
- case couch_db:open_doc(Db, Id, []) of
- {not_found, deleted} ->
- null;
- {not_found, missing} ->
- undefined;
- {ok, Doc0} ->
- couch_doc:to_json_obj(Doc0, [])
- end
- end,
- case rexi:sync_reply(#view_row{key=Key, id=Id, value=Value, doc=Doc}) of
- ok ->
- {ok, Acc#view_acc{limit=Limit-1}};
- timeout ->
- exit(timeout)
- end.
-
-final_response(Total, nil) ->
- case rexi:sync_reply({total_and_offset, Total, Total}) of ok ->
- rexi:reply(complete);
- stop ->
- ok;
- timeout ->
- exit(timeout)
- end;
-final_response(_Total, _Offset) ->
- rexi:reply(complete).
-
-group_rows_fun(exact) ->
- fun({Key1,_}, {Key2,_}) -> Key1 == Key2 end;
-group_rows_fun(0) ->
- fun(_A, _B) -> true end;
-group_rows_fun(GroupLevel) when is_integer(GroupLevel) ->
- fun({[_|_] = Key1,_}, {[_|_] = Key2,_}) ->
- lists:sublist(Key1, GroupLevel) == lists:sublist(Key2, GroupLevel);
- ({Key1,_}, {Key2,_}) ->
- Key1 == Key2
- end.
-
-reduce_fold(_Key, _Red, #view_acc{limit=0} = Acc) ->
- {stop, Acc};
-reduce_fold(_Key, Red, #view_acc{group_level=0} = Acc) ->
- send(null, Red, Acc);
-reduce_fold(Key, Red, #view_acc{group_level=exact} = Acc) ->
- send(Key, Red, Acc);
-reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0, is_list(K) ->
- send(lists:sublist(K, I), Red, Acc).
-
-send(Key, Value, #view_acc{limit=Limit} = Acc) ->
- case rexi:sync_reply(#view_row{key=Key, value=Value}) of
- ok ->
- {ok, Acc#view_acc{limit=Limit-1}};
- stop ->
- exit(normal);
- timeout ->
- exit(timeout)
- end.
-
-changes_enumerator(DocInfo, {Db, _Seq, Args}) ->
- #changes_args{include_docs=IncludeDocs, filter=Acc} = Args,
- #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}
- = DocInfo,
- case [X || X <- couch_changes:filter(DocInfo, Acc), X /= null] of
- [] ->
- {ok, {Db, Seq, Args}};
- Results ->
- ChangesRow = changes_row(Db, Seq, Id, Results, Rev, Del, IncludeDocs),
- Go = rexi:sync_reply(ChangesRow),
- {Go, {Db, Seq, Args}}
- end.
-
-changes_row(_, Seq, Id, Results, _, true, true) ->
- #view_row{key=Seq, id=Id, value=Results, doc=deleted};
-changes_row(_, Seq, Id, Results, _, true, false) ->
- #view_row{key=Seq, id=Id, value=Results, doc=deleted};
-changes_row(Db, Seq, Id, Results, Rev, false, true) ->
- #view_row{key=Seq, id=Id, value=Results, doc=doc_member(Db, Id, Rev)};
-changes_row(_, Seq, Id, Results, _, false, false) ->
- #view_row{key=Seq, id=Id, value=Results}.
-
-doc_member(Shard, Id, Rev) ->
- case couch_db:open_doc_revs(Shard, Id, [Rev], []) of
- {ok, [{ok,Doc}]} ->
- couch_doc:to_json_obj(Doc, []);
- Error ->
- Error
- end.
-
-possible_ancestors(_FullInfo, []) ->
- [];
-possible_ancestors(FullInfo, MissingRevs) ->
- #doc_info{revs=RevsInfo} = couch_doc:to_doc_info(FullInfo),
- LeafRevs = [Rev || #rev_info{rev=Rev} <- RevsInfo],
- % Find the revs that are possible parents of this rev
- lists:foldl(fun({LeafPos, LeafRevId}, Acc) ->
- % this leaf is a "possible ancenstor" of the missing
- % revs if this LeafPos lessthan any of the missing revs
- case lists:any(fun({MissingPos, _}) ->
- LeafPos < MissingPos end, MissingRevs) of
- true ->
- [{LeafPos, LeafRevId} | Acc];
- false ->
- Acc
- end
- end, [], LeafRevs).
-
-make_att_readers([]) ->
- [];
-make_att_readers([#doc{atts=Atts0} = Doc | Rest]) ->
- % % go through the attachments looking for 'follows' in the data,
- % % replace with function that reads the data from MIME stream.
- Atts = [Att#att{data=make_att_reader(D)} || #att{data=D} = Att <- Atts0],
- [Doc#doc{atts = Atts} | make_att_readers(Rest)].
-
-make_att_reader({follows, Parser}) ->
- fun() ->
- Parser ! {get_bytes, self()},
- receive {bytes, Bytes} -> Bytes end
- end;
-make_att_reader(Else) ->
- Else.
diff --git a/apps/fabric/src/fabric_util.erl b/apps/fabric/src/fabric_util.erl
deleted file mode 100644
index 4ae8e126..00000000
--- a/apps/fabric/src/fabric_util.erl
+++ /dev/null
@@ -1,97 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_util).
-
--export([submit_jobs/3, cleanup/1, recv/4, get_db/1, remove_ancestors/2]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
--include_lib("eunit/include/eunit.hrl").
-
-submit_jobs(Shards, EndPoint, ExtraArgs) ->
- lists:map(fun(#shard{node=Node, name=ShardName} = Shard) ->
- Ref = rexi:cast(Node, {fabric_rpc, EndPoint, [ShardName | ExtraArgs]}),
- Shard#shard{ref = Ref}
- end, Shards).
-
-cleanup(Workers) ->
- [rexi:kill(Node, Ref) || #shard{node=Node, ref=Ref} <- Workers].
-
-recv(Workers, Keypos, Fun, Acc0) ->
- Timeout = case couch_config:get("fabric", "request_timeout", "60000") of
- "infinity" -> infinity;
- N -> list_to_integer(N)
- end,
- rexi_utils:recv(Workers, Keypos, Fun, Acc0, Timeout, infinity).
-
-
-get_db(DbName) ->
- Shards = mem3:shards(DbName),
- case lists:partition(fun(#shard{node = N}) -> N =:= node() end, Shards) of
- {[#shard{name = ShardName}|_], _} ->
- % prefer node-local DBs
- couch_db:open(ShardName, []);
- {[], [#shard{node = Node, name = ShardName}|_]} ->
- % but don't require them
- rpc:call(Node, couch_db, open, [ShardName, []])
- end.
-
-% this presumes the incoming list is sorted, i.e. shorter revlists come first
-remove_ancestors([], Acc) ->
- lists:reverse(Acc);
-remove_ancestors([{{not_found, _}, Count} = Head | Tail], Acc) ->
- % any document is a descendant
- case lists:filter(fun({{ok, #doc{}}, _}) -> true; (_) -> false end, Tail) of
- [{{ok, #doc{}} = Descendant, _} | _] ->
- remove_ancestors(orddict:update_counter(Descendant, Count, Tail), Acc);
- [] ->
- remove_ancestors(Tail, [Head | Acc])
- end;
-remove_ancestors([{{ok, #doc{revs = {Pos, Revs}}}, Count} = Head | Tail], Acc) ->
- Descendants = lists:dropwhile(fun
- ({{ok, #doc{revs = {Pos2, Revs2}}}, _}) ->
- case lists:nthtail(Pos2 - Pos, Revs2) of
- [] ->
- % impossible to tell if Revs2 is a descendant - assume no
- true;
- History ->
- % if Revs2 is a descendant, History is a prefix of Revs
- not lists:prefix(History, Revs)
- end
- end, Tail),
- case Descendants of [] ->
- remove_ancestors(Tail, [Head | Acc]);
- [{Descendant, _} | _] ->
- remove_ancestors(orddict:update_counter(Descendant, Count, Tail), Acc)
- end.
-
-remove_ancestors_test() ->
- Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}},
- Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}},
- Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}},
- Bar2 = {not_found, {1,<<"bar">>}},
- ?assertEqual(
- [{Bar1,1}, {Foo1,1}],
- remove_ancestors([{Bar1,1}, {Foo1,1}], [])
- ),
- ?assertEqual(
- [{Bar1,1}, {Foo2,2}],
- remove_ancestors([{Bar1,1}, {Foo1,1}, {Foo2,1}], [])
- ),
- ?assertEqual(
- [{Bar1,2}],
- remove_ancestors([{Bar2,1}, {Bar1,1}], [])
- ).
diff --git a/apps/fabric/src/fabric_view.erl b/apps/fabric/src/fabric_view.erl
deleted file mode 100644
index e5f19b73..00000000
--- a/apps/fabric/src/fabric_view.erl
+++ /dev/null
@@ -1,235 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_view).
-
--export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1,
- maybe_pause_worker/3, maybe_resume_worker/2, transform_row/1, keydict/1,
- extract_view/4]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-%% @doc looks for a fully covered keyrange in the list of counters
--spec is_progress_possible([{#shard{}, term()}]) -> boolean().
-is_progress_possible([]) ->
- false;
-is_progress_possible(Counters) ->
- Ranges = fabric_dict:fold(fun(#shard{range=[X,Y]}, _, A) -> [{X,Y}|A] end,
- [], Counters),
- [{Start, Tail0} | Rest] = lists:ukeysort(1, Ranges),
- Result = lists:foldl(fun
- (_, fail) ->
- % we've already declared failure
- fail;
- (_, complete) ->
- % this is the success condition, we can fast-forward
- complete;
- ({X,_}, Tail) when X > (Tail+1) ->
- % gap in the keyrange, we're dead
- fail;
- ({_,Y}, Tail) ->
- case erlang:max(Tail, Y) of
- End when (End+1) =:= (2 bsl 31) ->
- complete;
- Else ->
- % the normal condition, adding to the tail
- Else
- end
- end, if (Tail0+1) =:= (2 bsl 31) -> complete; true -> Tail0 end, Rest),
- (Start =:= 0) andalso (Result =:= complete).
-
--spec remove_overlapping_shards(#shard{}, [{#shard{}, any()}]) ->
- [{#shard{}, any()}].
-remove_overlapping_shards(#shard{range=[A,B]} = Shard0, Shards) ->
- fabric_dict:filter(fun(#shard{range=[X,Y]} = Shard, _Value) ->
- if Shard =:= Shard0 ->
- % we can't remove ourselves
- true;
- A < B, X >= A, X < B ->
- % lower bound is inside our range
- false;
- A < B, Y > A, Y =< B ->
- % upper bound is inside our range
- false;
- B < A, X >= A orelse B < A, X < B ->
- % target shard wraps the key range, lower bound is inside
- false;
- B < A, Y > A orelse B < A, Y =< B ->
- % target shard wraps the key range, upper bound is inside
- false;
- true ->
- true
- end
- end, Shards).
-
-maybe_pause_worker(Worker, From, State) ->
- #collector{buffer_size = BufferSize, counters = Counters} = State,
- case fabric_dict:lookup_element(Worker, Counters) of
- BufferSize ->
- State#collector{blocked = [{Worker,From} | State#collector.blocked]};
- _Count ->
- gen_server:reply(From, ok),
- State
- end.
-
-maybe_resume_worker(Worker, State) ->
- #collector{buffer_size = Buffer, counters = C, blocked = B} = State,
- case fabric_dict:lookup_element(Worker, C) of
- Count when Count < Buffer/2 ->
- case couch_util:get_value(Worker, B) of
- undefined ->
- State;
- From ->
- gen_server:reply(From, ok),
- State#collector{blocked = lists:keydelete(Worker, 1, B)}
- end;
- _Other ->
- State
- end.
-
-maybe_send_row(#collector{limit=0} = State) ->
- #collector{user_acc=AccIn, callback=Callback} = State,
- {_, Acc} = Callback(complete, AccIn),
- {stop, State#collector{user_acc=Acc}};
-maybe_send_row(State) ->
- #collector{
- callback = Callback,
- counters = Counters,
- skip = Skip,
- limit = Limit,
- user_acc = AccIn
- } = State,
- case fabric_dict:any(0, Counters) of
- true ->
- {ok, State};
- false ->
- try get_next_row(State) of
- {_, NewState} when Skip > 0 ->
- maybe_send_row(NewState#collector{skip=Skip-1, limit=Limit-1});
- {Row, NewState} ->
- case Callback(transform_row(Row), AccIn) of
- {stop, Acc} ->
- {stop, NewState#collector{user_acc=Acc, limit=Limit-1}};
- {ok, Acc} ->
- maybe_send_row(NewState#collector{user_acc=Acc, limit=Limit-1})
- end
- catch complete ->
- {_, Acc} = Callback(complete, AccIn),
- {stop, State#collector{user_acc=Acc}}
- end
- end.
-
-keydict(nil) ->
- undefined;
-keydict(Keys) ->
- {Dict,_} = lists:foldl(fun(K, {D,I}) -> {dict:store(K,I,D), I+1} end,
- {dict:new(),0}, Keys),
- Dict.
-
-%% internal %%
-
-get_next_row(#collector{rows = []}) ->
- throw(complete);
-get_next_row(#collector{reducer = RedSrc} = St) when RedSrc =/= undefined ->
- #collector{
- query_args = #view_query_args{direction=Dir},
- keys = Keys,
- rows = RowDict,
- os_proc = Proc,
- counters = Counters0
- } = St,
- {Key, RestKeys} = find_next_key(Keys, Dir, RowDict),
- case dict:find(Key, RowDict) of
- {ok, Records} ->
- NewRowDict = dict:erase(Key, RowDict),
- Counters = lists:foldl(fun(#view_row{worker=Worker}, CountersAcc) ->
- fabric_dict:update_counter(Worker, -1, CountersAcc)
- end, Counters0, Records),
- Wrapped = [[V] || #view_row{value=V} <- Records],
- {ok, [Reduced]} = couch_query_servers:rereduce(Proc, [RedSrc], Wrapped),
- NewSt = St#collector{keys=RestKeys, rows=NewRowDict, counters=Counters},
- NewState = lists:foldl(fun(#view_row{worker=Worker}, StateAcc) ->
- maybe_resume_worker(Worker, StateAcc)
- end, NewSt, Records),
- {#view_row{key=Key, id=reduced, value=Reduced}, NewState};
- error ->
- get_next_row(St#collector{keys=RestKeys})
- end;
-get_next_row(State) ->
- #collector{rows = [Row|Rest], counters = Counters0} = State,
- Worker = Row#view_row.worker,
- Counters1 = fabric_dict:update_counter(Worker, -1, Counters0),
- NewState = maybe_resume_worker(Worker, State#collector{counters=Counters1}),
- {Row, NewState#collector{rows = Rest}}.
-
-find_next_key(nil, Dir, RowDict) ->
- case lists:sort(sort_fun(Dir), dict:fetch_keys(RowDict)) of
- [] ->
- throw(complete);
- [Key|_] ->
- {Key, nil}
- end;
-find_next_key([], _, _) ->
- throw(complete);
-find_next_key([Key|Rest], _, _) ->
- {Key, Rest}.
-
-transform_row(#view_row{key=Key, id=reduced, value=Value}) ->
- {row, {[{key,Key}, {value,Value}]}};
-transform_row(#view_row{key=Key, id=undefined}) ->
- {row, {[{key,Key}, {error,not_found}]}};
-transform_row(#view_row{key=Key, id=Id, value=Value, doc=undefined}) ->
- {row, {[{id,Id}, {key,Key}, {value,Value}]}};
-transform_row(#view_row{key=Key, id=Id, value=Value, doc={error,Reason}}) ->
- {row, {[{id,Id}, {key,Key}, {value,Value}, {error,Reason}]}};
-transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc}) ->
- {row, {[{id,Id}, {key,Key}, {value,Value}, {doc,Doc}]}}.
-
-sort_fun(fwd) ->
- fun(A,A) -> true; (A,B) -> couch_view:less_json(A,B) end;
-sort_fun(rev) ->
- fun(A,A) -> true; (A,B) -> couch_view:less_json(B,A) end.
-
-extract_view(Pid, ViewName, [], _ViewType) ->
- ?LOG_ERROR("missing_named_view ~p", [ViewName]),
- exit(Pid, kill),
- exit(missing_named_view);
-extract_view(Pid, ViewName, [View|Rest], ViewType) ->
- case lists:member(ViewName, view_names(View, ViewType)) of
- true ->
- if ViewType == reduce ->
- {index_of(ViewName, view_names(View, reduce)), View};
- true ->
- View
- end;
- false ->
- extract_view(Pid, ViewName, Rest, ViewType)
- end.
-
-view_names(View, Type) when Type == red_map; Type == reduce ->
- [Name || {Name, _} <- View#view.reduce_funs];
-view_names(View, map) ->
- View#view.map_names.
-
-index_of(X, List) ->
- index_of(X, List, 1).
-
-index_of(_X, [], _I) ->
- not_found;
-index_of(X, [X|_Rest], I) ->
- I;
-index_of(X, [_|Rest], I) ->
- index_of(X, Rest, I+1).
diff --git a/apps/fabric/src/fabric_view_all_docs.erl b/apps/fabric/src/fabric_view_all_docs.erl
deleted file mode 100644
index b3436171..00000000
--- a/apps/fabric/src/fabric_view_all_docs.erl
+++ /dev/null
@@ -1,181 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_view_all_docs).
-
--export([go/4]).
--export([open_doc/3]). % exported for spawn
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-go(DbName, #view_query_args{keys=nil} = QueryArgs, Callback, Acc0) ->
- Workers = lists:map(fun(#shard{name=Name, node=Node} = Shard) ->
- Ref = rexi:cast(Node, {fabric_rpc, all_docs, [Name, QueryArgs]}),
- Shard#shard{ref = Ref}
- end, mem3:shards(DbName)),
- BufferSize = couch_config:get("fabric", "map_buffer_size", "2"),
- #view_query_args{limit = Limit, skip = Skip} = QueryArgs,
- State = #collector{
- query_args = QueryArgs,
- callback = Callback,
- buffer_size = list_to_integer(BufferSize),
- counters = fabric_dict:init(Workers, 0),
- skip = Skip,
- limit = Limit,
- user_acc = Acc0
- },
- try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
- State, infinity, 5000) of
- {ok, NewState} ->
- {ok, NewState#collector.user_acc};
- Error ->
- Error
- after
- fabric_util:cleanup(Workers)
- end;
-
-go(DbName, QueryArgs, Callback, Acc0) ->
- #view_query_args{
- direction = Dir,
- include_docs = IncludeDocs,
- limit = Limit0,
- skip = Skip0,
- keys = Keys
- } = QueryArgs,
- {_, Ref0} = spawn_monitor(fun() -> exit(fabric:get_doc_count(DbName)) end),
- Monitors0 = [spawn_monitor(?MODULE, open_doc, [DbName, Id, IncludeDocs]) ||
- Id <- Keys],
- Monitors = if Dir=:=fwd -> Monitors0; true -> lists:reverse(Monitors0) end,
- receive {'DOWN', Ref0, _, _, {ok, TotalRows}} ->
- {ok, Acc1} = Callback({total_and_offset, TotalRows, 0}, Acc0),
- {ok, Acc2} = doc_receive_loop(Monitors, Skip0, Limit0, Callback, Acc1),
- Callback(complete, Acc2)
- after 10000 ->
- Callback(timeout, Acc0)
- end.
-
-handle_message({rexi_DOWN, _, _, _}, nil, State) ->
- % TODO see if progress can be made here, possibly by removing all shards
- % from that node and checking is_progress_possible
- {ok, State};
-
-handle_message({rexi_EXIT, _}, Worker, State) ->
- #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State,
- Counters = fabric_dict:erase(Worker, Counters0),
- case fabric_view:is_progress_possible(Counters) of
- true ->
- {ok, State#collector{counters = Counters}};
- false ->
- Callback({error, dead_shards}, Acc),
- {error, dead_shards}
- end;
-
-handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) ->
- #collector{
- callback = Callback,
- counters = Counters0,
- total_rows = Total0,
- offset = Offset0,
- user_acc = AccIn
- } = State,
- case fabric_dict:lookup_element(Worker, Counters0) of
- undefined ->
- % this worker lost the race with other partition copies, terminate
- gen_server:reply(From, stop),
- {ok, State};
- 0 ->
- gen_server:reply(From, ok),
- Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
- Counters2 = fabric_view:remove_overlapping_shards(Worker, Counters1),
- Total = Total0 + Tot,
- Offset = Offset0 + Off,
- case fabric_dict:any(0, Counters2) of
- true ->
- {ok, State#collector{
- counters = Counters2,
- total_rows = Total,
- offset = Offset
- }};
- false ->
- FinalOffset = erlang:min(Total, Offset+State#collector.skip),
- {Go, Acc} = Callback({total_and_offset, Total, FinalOffset}, AccIn),
- {Go, State#collector{
- counters = fabric_dict:decrement_all(Counters2),
- total_rows = Total,
- offset = FinalOffset,
- user_acc = Acc
- }}
- end
- end;
-
-handle_message(#view_row{} = Row, {Worker, From}, State) ->
- #collector{query_args = Args, counters = Counters0, rows = Rows0} = State,
- Dir = Args#view_query_args.direction,
- Rows = merge_row(Dir, Row#view_row{worker=Worker}, Rows0),
- Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
- State1 = State#collector{rows=Rows, counters=Counters1},
- State2 = fabric_view:maybe_pause_worker(Worker, From, State1),
- fabric_view:maybe_send_row(State2);
-
-handle_message(complete, Worker, State) ->
- Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
- fabric_view:maybe_send_row(State#collector{counters = Counters}).
-
-
-merge_row(fwd, Row, Rows) ->
- lists:keymerge(#view_row.id, [Row], Rows);
-merge_row(rev, Row, Rows) ->
- lists:rkeymerge(#view_row.id, [Row], Rows).
-
-doc_receive_loop([], _, _, _, Acc) ->
- {ok, Acc};
-doc_receive_loop(_, _, 0, _, Acc) ->
- {ok, Acc};
-doc_receive_loop([{Pid,Ref}|Rest], Skip, Limit, Callback, Acc) when Skip > 0 ->
- receive {'DOWN', Ref, process, Pid, #view_row{}} ->
- doc_receive_loop(Rest, Skip-1, Limit-1, Callback, Acc)
- after 10000 ->
- timeout
- end;
-doc_receive_loop([{Pid,Ref}|Rest], 0, Limit, Callback, AccIn) ->
- receive {'DOWN', Ref, process, Pid, #view_row{} = Row} ->
- case Callback(fabric_view:transform_row(Row), AccIn) of
- {ok, Acc} ->
- doc_receive_loop(Rest, 0, Limit-1, Callback, Acc);
- {stop, Acc} ->
- {ok, Acc}
- end
- after 10000 ->
- timeout
- end.
-
-open_doc(DbName, Id, IncludeDocs) ->
- Row = case fabric:open_doc(DbName, Id, [deleted]) of
- {not_found, missing} ->
- Doc = undefined,
- #view_row{key=Id};
- {ok, #doc{deleted=true, revs=Revs}} ->
- Doc = null,
- {RevPos, [RevId|_]} = Revs,
- Value = {[{rev,couch_doc:rev_to_str({RevPos, RevId})}, {deleted,true}]},
- #view_row{key=Id, id=Id, value=Value};
- {ok, #doc{revs=Revs} = Doc0} ->
- Doc = couch_doc:to_json_obj(Doc0, []),
- {RevPos, [RevId|_]} = Revs,
- Value = {[{rev,couch_doc:rev_to_str({RevPos, RevId})}]},
- #view_row{key=Id, id=Id, value=Value}
- end,
- exit(if IncludeDocs -> Row#view_row{doc=Doc}; true -> Row end).
diff --git a/apps/fabric/src/fabric_view_changes.erl b/apps/fabric/src/fabric_view_changes.erl
deleted file mode 100644
index a4421a92..00000000
--- a/apps/fabric/src/fabric_view_changes.erl
+++ /dev/null
@@ -1,271 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_view_changes).
-
--export([go/5, start_update_notifier/1, pack_seqs/1]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-go(DbName, Feed, Options, Callback, Acc0) when Feed == "continuous" orelse
- Feed == "longpoll" ->
- Args = make_changes_args(Options),
- {ok, Acc} = Callback(start, Acc0),
- Notifiers = start_update_notifiers(DbName),
- {Timeout, _} = couch_changes:get_changes_timeout(Args, Callback),
- try
- keep_sending_changes(
- DbName,
- Args,
- Callback,
- get_start_seq(DbName, Args),
- Acc,
- Timeout
- )
- after
- stop_update_notifiers(Notifiers),
- couch_changes:get_rest_db_updated()
- end;
-
-go(DbName, "normal", Options, Callback, Acc0) ->
- Args = make_changes_args(Options),
- {ok, Acc} = Callback(start, Acc0),
- {ok, #collector{counters=Seqs, user_acc=AccOut}} = send_changes(
- DbName,
- Args,
- Callback,
- get_start_seq(DbName, Args),
- Acc
- ),
- Callback({stop, pack_seqs(Seqs)}, AccOut).
-
-keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout) ->
- #changes_args{limit=Limit, feed=Feed, heartbeat=Heartbeat} = Args,
- {ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn),
- #collector{limit=Limit2, counters=NewSeqs, user_acc=AccOut} = Collector,
- LastSeq = pack_seqs(NewSeqs),
- if Limit > Limit2, Feed == "longpoll" ->
- Callback({stop, LastSeq}, AccOut);
- true ->
- case wait_db_updated(Timeout) of
- updated ->
- keep_sending_changes(
- DbName,
- Args#changes_args{limit=Limit2},
- Callback,
- LastSeq,
- AccOut,
- Timeout
- );
- timeout ->
- case Heartbeat of undefined ->
- Callback({stop, LastSeq}, AccOut);
- _ ->
- {ok, AccTimeout} = Callback(timeout, AccOut),
- keep_sending_changes(DbName, Args#changes_args{limit=Limit2},
- Callback, LastSeq, AccTimeout, Timeout)
- end
- end
- end.
-
-send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn) ->
- AllShards = mem3:shards(DbName),
- Seqs = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, Seq}) ->
- case lists:member(Shard, AllShards) of
- true ->
- Ref = rexi:cast(N, {fabric_rpc, changes, [Name,ChangesArgs,Seq]}),
- [{Shard#shard{ref = Ref}, Seq}];
- false ->
- % Find some replacement shards to cover the missing range
- % TODO It's possible in rare cases of shard merging to end up
- % with overlapping shard ranges from this technique
- lists:map(fun(#shard{name=Name2, node=N2} = NewShard) ->
- Ref = rexi:cast(N2, {fabric_rpc, changes, [Name2,ChangesArgs,0]}),
- {NewShard#shard{ref = Ref}, 0}
- end, find_replacement_shards(Shard, AllShards))
- end
- end, unpack_seqs(PackedSeqs, DbName)),
- {Workers, _} = lists:unzip(Seqs),
- State = #collector{
- query_args = ChangesArgs,
- callback = Callback,
- counters = fabric_dict:init(Workers, 0),
- user_acc = AccIn,
- limit = ChangesArgs#changes_args.limit,
- rows = Seqs % store sequence positions instead
- },
- try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
- State, infinity, 5000)
- after
- fabric_util:cleanup(Workers)
- end.
-
-handle_message({rexi_DOWN, _, _, _}, nil, State) ->
- % TODO see if progress can be made here, possibly by removing all shards
- % from that node and checking is_progress_possible
- {ok, State};
-
-handle_message({rexi_EXIT, Reason}, Worker, State) ->
- ?LOG_ERROR("~p rexi_EXIT ~p", [?MODULE, Reason]),
- #collector{
- callback=Callback,
- counters=Counters0,
- rows = Seqs0,
- user_acc=Acc
- } = State,
- Counters = fabric_dict:erase(Worker, Counters0),
- Seqs = fabric_dict:erase(Worker, Seqs0),
- case fabric_view:is_progress_possible(Counters) of
- true ->
- {ok, State#collector{counters = Counters, rows=Seqs}};
- false ->
- Callback({error, dead_shards}, Acc),
- {error, dead_shards}
- end;
-
-handle_message(_, _, #collector{limit=0} = State) ->
- {stop, State};
-
-handle_message(#view_row{key=Seq} = Row0, {Worker, From}, St) ->
- #collector{
- query_args = #changes_args{include_docs=IncludeDocs},
- callback = Callback,
- counters = S0,
- limit = Limit,
- user_acc = AccIn
- } = St,
- case fabric_dict:lookup_element(Worker, S0) of
- undefined ->
- % this worker lost the race with other partition copies, terminate it
- gen_server:reply(From, stop),
- {ok, St};
- _ ->
- S1 = fabric_dict:store(Worker, Seq, S0),
- S2 = fabric_view:remove_overlapping_shards(Worker, S1),
- Row = Row0#view_row{key = pack_seqs(S2)},
- {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn),
- gen_server:reply(From, Go),
- {Go, St#collector{counters=S2, limit=Limit-1, user_acc=Acc}}
- end;
-
-handle_message({complete, EndSeq}, Worker, State) ->
- #collector{
- counters = S0,
- total_rows = Completed % override
- } = State,
- case fabric_dict:lookup_element(Worker, S0) of
- undefined ->
- {ok, State};
- _ ->
- S1 = fabric_dict:store(Worker, EndSeq, S0),
- % unlikely to have overlaps here, but possible w/ filters
- S2 = fabric_view:remove_overlapping_shards(Worker, S1),
- NewState = State#collector{counters=S2, total_rows=Completed+1},
- case fabric_dict:size(S2) =:= (Completed+1) of
- true ->
- {stop, NewState};
- false ->
- {ok, NewState}
- end
- end.
-
-make_changes_args(#changes_args{style=Style, filter=undefined}=Args) ->
- Args#changes_args{filter = Style};
-make_changes_args(Args) ->
- Args.
-
-get_start_seq(_DbName, #changes_args{dir=fwd, since=Since}) ->
- Since;
-get_start_seq(DbName, #changes_args{dir=rev}) ->
- Shards = mem3:shards(DbName),
- Workers = fabric_util:submit_jobs(Shards, get_update_seq, []),
- {ok, Since} = fabric_util:recv(Workers, #shard.ref,
- fun collect_update_seqs/3, fabric_dict:init(Workers, -1)),
- Since.
-
-collect_update_seqs(Seq, Shard, Counters) when is_integer(Seq) ->
- case fabric_dict:lookup_element(Shard, Counters) of
- undefined ->
- % already heard from someone else in this range
- {ok, Counters};
- -1 ->
- C1 = fabric_dict:store(Shard, Seq, Counters),
- C2 = fabric_view:remove_overlapping_shards(Shard, C1),
- case fabric_dict:any(-1, C2) of
- true ->
- {ok, C2};
- false ->
- {stop, pack_seqs(C2)}
- end
- end.
-
-pack_seqs(Workers) ->
- SeqList = [{N,R,S} || {#shard{node=N, range=R}, S} <- Workers],
- SeqSum = lists:sum(element(2, lists:unzip(Workers))),
- Opaque = couch_util:encodeBase64Url(term_to_binary(SeqList, [compressed])),
- list_to_binary([integer_to_list(SeqSum), $-, Opaque]).
-
-unpack_seqs(0, DbName) ->
- fabric_dict:init(mem3:shards(DbName), 0);
-
-unpack_seqs("0", DbName) ->
- fabric_dict:init(mem3:shards(DbName), 0);
-
-unpack_seqs(Packed, DbName) ->
- {match, [Opaque]} = re:run(Packed, "^([0-9]+-)?(?<opaque>.*)", [{capture,
- [opaque], binary}]),
- % TODO relies on internal structure of fabric_dict as keylist
- lists:map(fun({Node, [A,B], Seq}) ->
- Shard = #shard{node=Node, range=[A,B], dbname=DbName},
- {mem3_util:name_shard(Shard), Seq}
- end, binary_to_term(couch_util:decodeBase64Url(Opaque))).
-
-start_update_notifiers(DbName) ->
- lists:map(fun(#shard{node=Node, name=Name}) ->
- {Node, rexi:cast(Node, {?MODULE, start_update_notifier, [Name]})}
- end, mem3:shards(DbName)).
-
-% rexi endpoint
-start_update_notifier(DbName) ->
- {Caller, _} = get(rexi_from),
- Fun = fun({_, X}) when X == DbName -> Caller ! db_updated; (_) -> ok end,
- Id = {couch_db_update_notifier, make_ref()},
- ok = gen_event:add_sup_handler(couch_db_update, Id, Fun),
- receive {gen_event_EXIT, Id, Reason} ->
- rexi:reply({gen_event_EXIT, DbName, Reason})
- end.
-
-stop_update_notifiers(Notifiers) ->
- [rexi:kill(Node, Ref) || {Node, Ref} <- Notifiers].
-
-changes_row(#view_row{key=Seq, id=Id, value=Value, doc=deleted}, true) ->
- {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}, {doc, null}]}};
-changes_row(#view_row{key=Seq, id=Id, value=Value, doc=deleted}, false) ->
- {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}]}};
-changes_row(#view_row{key=Seq, id=Id, value=Value, doc={error,Reason}}, true) ->
- {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {error,Reason}]}};
-changes_row(#view_row{key=Seq, id=Id, value=Value, doc=Doc}, true) ->
- {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {doc,Doc}]}};
-changes_row(#view_row{key=Seq, id=Id, value=Value}, false) ->
- {change, {[{seq,Seq}, {id,Id}, {changes,Value}]}}.
-
-find_replacement_shards(#shard{range=Range}, AllShards) ->
- % TODO make this moar betta -- we might have split or merged the partition
- [Shard || Shard <- AllShards, Shard#shard.range =:= Range].
-
-wait_db_updated(Timeout) ->
- receive db_updated -> couch_changes:get_rest_db_updated()
- after Timeout -> timeout end.
diff --git a/apps/fabric/src/fabric_view_map.erl b/apps/fabric/src/fabric_view_map.erl
deleted file mode 100644
index e1210a84..00000000
--- a/apps/fabric/src/fabric_view_map.erl
+++ /dev/null
@@ -1,151 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_view_map).
-
--export([go/6]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
- {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
- go(DbName, DDoc, View, Args, Callback, Acc0);
-
-go(DbName, DDoc, View, Args, Callback, Acc0) ->
- Workers = lists:map(fun(#shard{name=Name, node=Node} = Shard) ->
- Ref = rexi:cast(Node, {fabric_rpc, map_view, [Name, DDoc, View, Args]}),
- Shard#shard{ref = Ref}
- end, mem3:shards(DbName)),
- BufferSize = couch_config:get("fabric", "map_buffer_size", "2"),
- #view_query_args{limit = Limit, skip = Skip, keys = Keys} = Args,
- State = #collector{
- query_args = Args,
- callback = Callback,
- buffer_size = list_to_integer(BufferSize),
- counters = fabric_dict:init(Workers, 0),
- skip = Skip,
- limit = Limit,
- keys = fabric_view:keydict(Keys),
- sorted = Args#view_query_args.sorted,
- user_acc = Acc0
- },
- try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
- State, infinity, 1000 * 60 * 60) of
- {ok, NewState} ->
- {ok, NewState#collector.user_acc};
- Error ->
- Error
- after
- fabric_util:cleanup(Workers)
- end.
-
-handle_message({rexi_DOWN, _, _, _}, nil, State) ->
- % TODO see if progress can be made here, possibly by removing all shards
- % from that node and checking is_progress_possible
- {ok, State};
-
-handle_message({rexi_EXIT, Reason}, Worker, State) ->
- ?LOG_ERROR("~p rexi_EXIT ~p", [?MODULE, Reason]),
- #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State,
- Counters = fabric_dict:erase(Worker, Counters0),
- case fabric_view:is_progress_possible(Counters) of
- true ->
- {ok, State#collector{counters = Counters}};
- false ->
- Callback({error, dead_shards}, Acc),
- {error, dead_shards}
- end;
-
-handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) ->
- #collector{
- callback = Callback,
- counters = Counters0,
- total_rows = Total0,
- offset = Offset0,
- user_acc = AccIn
- } = State,
- case fabric_dict:lookup_element(Worker, Counters0) of
- undefined ->
- % this worker lost the race with other partition copies, terminate
- gen_server:reply(From, stop),
- {ok, State};
- 0 ->
- gen_server:reply(From, ok),
- Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
- Counters2 = fabric_view:remove_overlapping_shards(Worker, Counters1),
- Total = Total0 + Tot,
- Offset = Offset0 + Off,
- case fabric_dict:any(0, Counters2) of
- true ->
- {ok, State#collector{
- counters = Counters2,
- total_rows = Total,
- offset = Offset
- }};
- false ->
- FinalOffset = erlang:min(Total, Offset+State#collector.skip),
- {Go, Acc} = Callback({total_and_offset, Total, FinalOffset}, AccIn),
- {Go, State#collector{
- counters = fabric_dict:decrement_all(Counters2),
- total_rows = Total,
- offset = FinalOffset,
- user_acc = Acc
- }}
- end
- end;
-
-handle_message(#view_row{}, {_, _}, #collector{limit=0} = State) ->
- #collector{callback=Callback} = State,
- {_, Acc} = Callback(complete, State#collector.user_acc),
- {stop, State#collector{user_acc=Acc}};
-
-handle_message(#view_row{} = Row, {_,From}, #collector{sorted=false} = St) ->
- #collector{callback=Callback, user_acc=AccIn, limit=Limit} = St,
- {Go, Acc} = Callback(fabric_view:transform_row(Row), AccIn),
- gen_server:reply(From, ok),
- {Go, St#collector{user_acc=Acc, limit=Limit-1}};
-
-handle_message(#view_row{} = Row, {Worker, From}, State) ->
- #collector{
- query_args = #view_query_args{direction=Dir},
- counters = Counters0,
- rows = Rows0,
- keys = KeyDict
- } = State,
- Rows = merge_row(Dir, KeyDict, Row#view_row{worker=Worker}, Rows0),
- Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
- State1 = State#collector{rows=Rows, counters=Counters1},
- State2 = fabric_view:maybe_pause_worker(Worker, From, State1),
- fabric_view:maybe_send_row(State2);
-
-handle_message(complete, Worker, State) ->
- Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
- fabric_view:maybe_send_row(State#collector{counters = Counters}).
-
-merge_row(fwd, undefined, Row, Rows) ->
- lists:merge(fun(#view_row{key=KeyA, id=IdA}, #view_row{key=KeyB, id=IdB}) ->
- couch_view:less_json([KeyA, IdA], [KeyB, IdB])
- end, [Row], Rows);
-merge_row(rev, undefined, Row, Rows) ->
- lists:merge(fun(#view_row{key=KeyA, id=IdA}, #view_row{key=KeyB, id=IdB}) ->
- couch_view:less_json([KeyB, IdB], [KeyA, IdA])
- end, [Row], Rows);
-merge_row(_, KeyDict, Row, Rows) ->
- lists:merge(fun(#view_row{key=A, id=IdA}, #view_row{key=B, id=IdB}) ->
- if A =:= B -> IdA < IdB; true ->
- dict:fetch(A, KeyDict) < dict:fetch(B, KeyDict)
- end
- end, [Row], Rows).
diff --git a/apps/fabric/src/fabric_view_reduce.erl b/apps/fabric/src/fabric_view_reduce.erl
deleted file mode 100644
index 6ae564cc..00000000
--- a/apps/fabric/src/fabric_view_reduce.erl
+++ /dev/null
@@ -1,99 +0,0 @@
-% Copyright 2010 Cloudant
-%
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_view_reduce).
-
--export([go/6]).
-
--include("fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
- {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
- go(DbName, DDoc, View, Args, Callback, Acc0);
-
-go(DbName, DDoc, VName, Args, Callback, Acc0) ->
- #group{def_lang=Lang, views=Views} = Group =
- couch_view_group:design_doc_to_view_group(DDoc),
- {NthRed, View} = fabric_view:extract_view(nil, VName, Views, reduce),
- {VName, RedSrc} = lists:nth(NthRed, View#view.reduce_funs),
- Workers = lists:map(fun(#shard{name=Name, node=N} = Shard) ->
- Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,Group,VName,Args]}),
- Shard#shard{ref = Ref}
- end, mem3:shards(DbName)),
- BufferSize = couch_config:get("fabric", "reduce_buffer_size", "20"),
- #view_query_args{limit = Limit, skip = Skip} = Args,
- State = #collector{
- query_args = Args,
- callback = Callback,
- buffer_size = list_to_integer(BufferSize),
- counters = fabric_dict:init(Workers, 0),
- keys = Args#view_query_args.keys,
- skip = Skip,
- limit = Limit,
- lang = Group#group.def_lang,
- os_proc = couch_query_servers:get_os_process(Lang),
- reducer = RedSrc,
- rows = dict:new(),
- user_acc = Acc0
- },
- try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
- State, infinity, 1000 * 60 * 60) of
- {ok, NewState} ->
- {ok, NewState#collector.user_acc};
- Error ->
- Error
- after
- fabric_util:cleanup(Workers),
- catch couch_query_servers:ret_os_process(State#collector.os_proc)
- end.
-
-handle_message({rexi_DOWN, _, _, _}, nil, State) ->
- % TODO see if progress can be made here, possibly by removing all shards
- % from that node and checking is_progress_possible
- {ok, State};
-
-handle_message({rexi_EXIT, Reason}, Worker, State) ->
- ?LOG_ERROR("~p rexi_EXIT ~p", [?MODULE, Reason]),
- #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State,
- Counters = fabric_dict:erase(Worker, Counters0),
- case fabric_view:is_progress_possible(Counters) of
- true ->
- {ok, State#collector{counters = Counters}};
- false ->
- Callback({error, dead_shards}, Acc),
- {error, dead_shards}
- end;
-
-handle_message(#view_row{key=Key} = Row, {Worker, From}, State) ->
- #collector{counters = Counters0, rows = Rows0} = State,
- case fabric_dict:lookup_element(Worker, Counters0) of
- undefined ->
- % this worker lost the race with other partition copies, terminate it
- gen_server:reply(From, stop),
- {ok, State};
- _ ->
- Rows = dict:append(Key, Row#view_row{worker=Worker}, Rows0),
- C1 = fabric_dict:update_counter(Worker, 1, Counters0),
- % TODO time this call, if slow don't do it every time
- C2 = fabric_view:remove_overlapping_shards(Worker, C1),
- State1 = State#collector{rows=Rows, counters=C2},
- State2 = fabric_view:maybe_pause_worker(Worker, From, State1),
- fabric_view:maybe_send_row(State2)
- end;
-
-handle_message(complete, Worker, State) ->
- Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
- fabric_view:maybe_send_row(State#collector{counters = Counters}).