summaryrefslogtreecommitdiff
path: root/deps/fabric/src
diff options
context:
space:
mode:
Diffstat (limited to 'deps/fabric/src')
-rw-r--r-- deps/fabric/src/fabric.app.src 6
-rw-r--r-- deps/fabric/src/fabric.erl 460
-rw-r--r-- deps/fabric/src/fabric_db_create.erl 161
-rw-r--r-- deps/fabric/src/fabric_db_delete.erl 95
-rw-r--r-- deps/fabric/src/fabric_db_doc_count.erl 68
-rw-r--r-- deps/fabric/src/fabric_db_info.erl 104
-rw-r--r-- deps/fabric/src/fabric_db_meta.erl 49
-rw-r--r-- deps/fabric/src/fabric_db_update_listener.erl 114
-rw-r--r-- deps/fabric/src/fabric_dict.erl 51
-rw-r--r-- deps/fabric/src/fabric_doc_attachments.erl 131
-rw-r--r-- deps/fabric/src/fabric_doc_missing_revs.erl 90
-rw-r--r-- deps/fabric/src/fabric_doc_open.erl 139
-rw-r--r-- deps/fabric/src/fabric_doc_open_revs.erl 307
-rw-r--r-- deps/fabric/src/fabric_doc_update.erl 297
-rw-r--r-- deps/fabric/src/fabric_group_info.erl 100
-rw-r--r-- deps/fabric/src/fabric_rpc.erl 485
-rw-r--r-- deps/fabric/src/fabric_util.erl 168
-rw-r--r-- deps/fabric/src/fabric_view.erl 362
-rw-r--r-- deps/fabric/src/fabric_view_all_docs.erl 181
-rw-r--r-- deps/fabric/src/fabric_view_changes.erl 334
-rw-r--r-- deps/fabric/src/fabric_view_map.erl 151
-rw-r--r-- deps/fabric/src/fabric_view_reduce.erl 114
22 files changed, 3967 insertions, 0 deletions
diff --git a/deps/fabric/src/fabric.app.src b/deps/fabric/src/fabric.app.src
new file mode 100644
index 00000000..a1cbb2c8
--- /dev/null
+++ b/deps/fabric/src/fabric.app.src
@@ -0,0 +1,6 @@
+% Application resource term for the `fabric' application.
+{application, fabric, [
+ {description, "Routing and proxying layer for CouchDB cluster"},
+ % NOTE(review): `git' is presumably a placeholder that the build tool
+ % replaces with a version derived from the repository -- confirm
+ {vsn, git},
+ {registered, []},
+ % applications fabric depends on
+ {applications, [kernel, stdlib, couch, rexi, mem3, twig]}
+]}.
diff --git a/deps/fabric/src/fabric.erl b/deps/fabric/src/fabric.erl
new file mode 100644
index 00000000..a8e3b2ea
--- /dev/null
+++ b/deps/fabric/src/fabric.erl
@@ -0,0 +1,460 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric).
+
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-define(ADMIN_CTX, {user_ctx, #user_ctx{roles = [<<"_admin">>]}}).
+
+% DBs
+-export([all_dbs/0, all_dbs/1, create_db/1, create_db/2, delete_db/1,
+ delete_db/2, get_db_info/1, get_doc_count/1, set_revs_limit/3,
+ set_security/3, get_revs_limit/1, get_security/1, get_security/2]).
+
+% Documents
+-export([open_doc/3, open_revs/4, get_missing_revs/2, get_missing_revs/3,
+ update_doc/3, update_docs/3, purge_docs/2, att_receiver/2]).
+
+% Views
+-export([all_docs/4, changes/4, query_view/3, query_view/4, query_view/6,
+ get_view_group_info/2]).
+
+% miscellany
+-export([design_docs/1, reset_validation_funs/1, cleanup_index_files/0,
+ cleanup_index_files/1]).
+
+-include("fabric.hrl").
+
+-type dbname() :: (iodata() | #db{}).
+-type docid() :: iodata().
+-type revision() :: {integer(), binary()}.
+-type callback() :: fun((any(), any()) -> {ok | stop, any()}).
+-type json_obj() :: {[{binary() | atom(), any()}]}.
+-type option() :: atom() | {atom(), any()}.
+
+%% db operations
+%% @equiv all_dbs(<<>>)
+all_dbs() ->
+    all_dbs(<<>>).
+
+%% @doc returns the sorted, duplicate-free list of database names that
+%% start with `Prefix'. A list prefix is converted to a binary first.
+-spec all_dbs(Prefix::iodata()) -> {ok, [binary()]}.
+all_dbs(Prefix) when is_list(Prefix) ->
+    all_dbs(list_to_binary(Prefix));
+all_dbs(Prefix) when is_binary(Prefix) ->
+    PrefixLen = byte_size(Prefix),
+    % walk every shard record in the `partitions' ets table and keep the
+    % dbname of those whose name begins with Prefix
+    Collect = fun(#shard{dbname = Name}, Names) ->
+        case Name of
+            <<Prefix:PrefixLen/binary, _/binary>> -> [Name | Names];
+            _ -> Names
+        end
+    end,
+    Matching = ets:foldl(Collect, [], partitions),
+    {ok, lists:usort(Matching)}.
+
+%% @doc returns a property list describing the database, with entries
+%% such as `doc_count' and `disk_size'. The database name is normalized
+%% with dbname/1 before the request is handed to fabric_db_info.
+-spec get_db_info(dbname()) ->
+    {ok, [
+        {instance_start_time, binary()} |
+        {doc_count, non_neg_integer()} |
+        {doc_del_count, non_neg_integer()} |
+        {purge_seq, non_neg_integer()} |
+        {compact_running, boolean()} |
+        {disk_size, non_neg_integer()} |
+        {disk_format_version, pos_integer()}
+    ]}.
+get_db_info(DbName) ->
+    Name = dbname(DbName),
+    fabric_db_info:go(Name).
+
+%% @doc the number of docs in a database, as reported by
+%% fabric_db_doc_count for the normalized database name
+-spec get_doc_count(dbname()) -> {ok, non_neg_integer()}.
+get_doc_count(DbName) ->
+    Name = dbname(DbName),
+    fabric_db_doc_count:go(Name).
+
+%% @equiv create_db(DbName, [])
+create_db(DbName) ->
+    create_db(DbName, []).
+
+%% @doc creates a database with the given name.
+%%
+%% Options can include values for q and n,
+%% for example `{q, "8"}' and `{n, "3"}', which
+%% control how many shards to split a database into
+%% and how many nodes each doc is copied to respectively.
+%%
+%% The name is normalized with dbname/1 and the options are completed
+%% by opts/1 before the request is passed to fabric_db_create.
+-spec create_db(dbname(), [option()]) -> ok | accepted | {error, atom()}.
+create_db(DbName, Options) ->
+    Name = dbname(DbName),
+    FullOptions = opts(Options),
+    fabric_db_create:go(Name, FullOptions).
+
+%% @equiv delete_db(DbName, [])
+delete_db(DbName) ->
+    delete_db(DbName, []).
+
+%% @doc delete a database
+%%
+%% The name is normalized with dbname/1 and the options are completed
+%% by opts/1 before the request is passed to fabric_db_delete.
+-spec delete_db(dbname(), [option()]) -> ok | accepted | {error, atom()}.
+delete_db(DbName, Options) ->
+    fabric_db_delete:go(dbname(DbName), opts(Options)).
+
+%% @doc provide an upper bound for the number of tracked document revisions.
+%% Only positive integer limits are accepted; any other value fails with
+%% a function_clause error.
+-spec set_revs_limit(dbname(), pos_integer(), [option()]) -> ok.
+set_revs_limit(DbName, Limit, Options) when is_integer(Limit), Limit > 0 ->
+    Name = dbname(DbName),
+    FullOptions = opts(Options),
+    fabric_db_meta:set_revs_limit(Name, Limit, FullOptions).
+
+%% @doc retrieves the maximum number of document revisions.
+%% The database is opened with the admin user context and is closed
+%% again whether or not the lookup succeeds.
+-spec get_revs_limit(dbname()) -> pos_integer() | no_return().
+get_revs_limit(DbName) ->
+    Name = dbname(DbName),
+    {ok, Db} = fabric_util:get_db(Name, [?ADMIN_CTX]),
+    try
+        couch_db:get_revs_limit(Db)
+    after
+        % best-effort close: any failure while closing is discarded
+        catch couch_db:close(Db)
+    end.
+
+%% @doc sets the readers/writers/admin permissions for a database.
+%% The name is normalized with dbname/1 and the options are completed
+%% by opts/1 before the request is passed to fabric_db_meta.
+-spec set_security(dbname(), SecObj::json_obj(), [option()]) -> ok.
+set_security(DbName, SecObj, Options) ->
+    Name = dbname(DbName),
+    FullOptions = opts(Options),
+    fabric_db_meta:set_security(Name, SecObj, FullOptions).
+
+%% @doc retrieve the security object for a database, opening it with the
+%% admin user context (`?ADMIN_CTX')
+-spec get_security(dbname()) -> json_obj() | no_return().
+get_security(DbName) ->
+    get_security(DbName, [?ADMIN_CTX]).
+
+%% @doc retrieve the security object for a database.
+%% The database is opened with the given options (completed by opts/1)
+%% and is closed again whether or not the lookup succeeds.
+-spec get_security(dbname(), [option()]) -> json_obj() | no_return().
+get_security(DbName, Options) ->
+    {ok, Db} = fabric_util:get_db(dbname(DbName), opts(Options)),
+    try
+        couch_db:get_security(Db)
+    after
+        % best-effort close: any failure while closing is discarded
+        catch couch_db:close(Db)
+    end.
+
+% doc operations
+
+%% @doc retrieve the doc with a given id.
+%% The database name, document id and options are normalized with
+%% dbname/1, docid/1 and opts/1 before the request is passed to
+%% fabric_doc_open.
+%% NOTE(review): the last spec alternative used to read
+%% `{error, any() | any()}', which is the same type as `{error, any()}';
+%% it now mirrors the three-element form declared for open_revs/4 --
+%% confirm against fabric_doc_open:go/3.
+-spec open_doc(dbname(), docid(), [option()]) ->
+    {ok, #doc{}} |
+    {not_found, missing | deleted} |
+    {timeout, any()} |
+    {error, any()} |
+    {error, any(), any()}.
+open_doc(DbName, Id, Options) ->
+    fabric_doc_open:go(dbname(DbName), docid(Id), opts(Options)).
+
+%% @doc retrieve a collection of revisions, possibly all.
+%% `Revs' is forwarded unchanged; the database name, document id and
+%% options are normalized with dbname/1, docid/1 and opts/1.
+-spec open_revs(dbname(), docid(), [revision()] | all, [option()]) ->
+    {ok, [{ok, #doc{}} | {{not_found,missing}, revision()}]} |
+    {timeout, any()} |
+    {error, any()} |
+    {error, any(), any()}.
+open_revs(DbName, Id, Revs, Options) ->
+    Name = dbname(DbName),
+    DocId = docid(Id),
+    FullOptions = opts(Options),
+    fabric_doc_open_revs:go(Name, DocId, Revs, FullOptions).
+
+%% @equiv get_missing_revs(DbName, IdsRevs, [])
+get_missing_revs(DbName, IdsRevs) ->
+    get_missing_revs(DbName, IdsRevs, []).
+
+%% @doc retrieve missing revisions for a list of `{Id, Revs}'.
+%% Each pair is sanitized with idrevs/1 before the request is passed to
+%% fabric_doc_missing_revs.
+-spec get_missing_revs(dbname(),[{docid(), [revision()]}], [option()]) ->
+    {ok, [{docid(), any(), [any()]}]}.
+get_missing_revs(DbName, IdsRevs, Options) when is_list(IdsRevs) ->
+    Sanitized = [idrevs(Pair) || Pair <- IdsRevs],
+    Name = dbname(DbName),
+    FullOptions = opts(Options),
+    fabric_doc_missing_revs:go(Name, Sanitized, FullOptions).
+
+%% @doc update a single doc by delegating to update_docs/3 with a
+%% one-element list. A successful or accepted write returns the new
+%% revision; a per-document failure is thrown to the caller.
+%% NOTE(review): update_docs/3 can also return `{aborted, _}', which has
+%% no clause here and would surface as a case_clause error -- confirm
+%% whether that is intended.
+%% @equiv update_docs(DbName,[Doc],Options)
+-spec update_doc(dbname(), #doc{}, [option()]) ->
+    {ok, any()} | any().
+update_doc(DbName, Doc, Options) ->
+    Outcome = update_docs(DbName, [Doc], opts(Options)),
+    case Outcome of
+        {ok, [{ok, Rev}]} ->
+            {ok, Rev};
+        {accepted, [{accepted, Rev}]} ->
+            {accepted, Rev};
+        {ok, [{{_Id, _Rev}, Failure}]} ->
+            throw(Failure);
+        {ok, [Failure]} ->
+            throw(Failure);
+        {ok, []} ->
+            % replication success: no per-doc result is reported, so the
+            % revision is read back from the document that was submitted
+            #doc{revs = {Pos, [RevId | _]}} = doc(Doc),
+            {ok, {Pos, RevId}}
+    end.
+
+%% @doc update a list of docs.
+%% `{ok, Results}' and `{accepted, Results}' are returned as they come
+%% from fabric_doc_update; any other return value is thrown. An
+%% `{aborted, PreCommitFailures}' throw raised by the update itself is
+%% turned into a return value.
+-spec update_docs(dbname(), [#doc{}], [option()]) ->
+    {ok, any()} | any().
+update_docs(DbName, Docs, Options) ->
+    try fabric_doc_update:go(dbname(DbName), docs(Docs), opts(Options)) of
+        {Status, Results} when Status =:= ok; Status =:= accepted ->
+            {Status, Results};
+        Other ->
+            % thrown from the `of' section, so it is not caught below
+            throw(Other)
+    catch
+        throw:{aborted, PreCommitFailures} ->
+            {aborted, PreCommitFailures}
+    end.
+
+%% @doc placeholder: purging is not implemented; both arguments are
+%% ignored and the atom `not_implemented' is returned
+purge_docs(_DbName, _IdsRevs) ->
+ not_implemented.
+
+%% @doc spawns a process to upload attachment data and
+%% returns a function that shards can use to communicate
+%% with the spawned middleman process
+%%
+%% This function only delegates to fabric_doc_attachments:receiver/2;
+%% `Req' and `Length' are passed through unchanged.
+-spec att_receiver(#httpd{}, Length :: undefined | chunked | pos_integer() |
+ {unknown_transfer_encoding, any()}) ->
+ function() | binary().
+att_receiver(Req, Length) ->
+ fabric_doc_attachments:receiver(Req, Length).
+
+%% @doc retrieves all docs. Additional query parameters, such as `limit',
+%% `start_key' and `end_key', `descending', and `include_docs', can
+%% also be passed to further constrain the query. See <a href=
+%% "http://wiki.apache.org/couchdb/HTTP_Document_API#All_Documents">
+%% all_docs</a> for details
+%%
+%% `QueryArgs' may be a `#view_query_args{}' record or, as a convenience,
+%% a keylist that is converted with kl_to_query_args/1 first.
+-spec all_docs(dbname(), callback(), [] | tuple(), #view_query_args{}) ->
+    {ok, [any()]}.
+all_docs(DbName, Callback, Acc0, QueryArgs) ->
+    case QueryArgs of
+        #view_query_args{} when is_function(Callback, 2) ->
+            fabric_view_all_docs:go(dbname(DbName), QueryArgs, Callback, Acc0);
+        _ ->
+            % anything else is treated as a keylist and retried
+            all_docs(DbName, Callback, Acc0, kl_to_query_args(QueryArgs))
+    end.
+
+
+%% @doc runs a changes feed through fabric_view_changes. The feed type is
+%% read from the `feed' field of the `#changes_args{}' record. As a
+%% convenience a keylist is accepted instead of the record and converted
+%% with kl_to_changes_args/1.
+-spec changes(dbname(), callback(), any(), #changes_args{} | [{atom(),any()}]) ->
+    {ok, any()}.
+changes(DbName, Callback, Acc0, #changes_args{feed = Feed} = Args) ->
+    fabric_view_changes:go(dbname(DbName), Feed, Args, Callback, Acc0);
+changes(DbName, Callback, Acc0, KeyList) ->
+    changes(DbName, Callback, Acc0, kl_to_changes_args(KeyList)).
+
+%% @equiv query_view(DbName, DesignName, ViewName, #view_query_args{})
+query_view(DbName, DesignName, ViewName) ->
+    query_view(DbName, DesignName, ViewName, #view_query_args{}).
+
+%% @equiv query_view(DbName, DesignName,
+%%                     ViewName, fun default_callback/2, [], QueryArgs)
+query_view(DbName, DesignName, ViewName, QueryArgs) ->
+    query_view(DbName, DesignName, ViewName, fun default_callback/2, [],
+        QueryArgs).
+
+%% @doc execute a given view.
+%%      There are many additional query args that can be passed to a view,
+%%      see <a href="http://wiki.apache.org/couchdb/HTTP_view_API#Querying_Options">
+%%      query args</a> for details.
+%%
+%%      Reduce queries are dispatched to fabric_view_reduce, all others to
+%%      fabric_view_map.
+-spec query_view(dbname(), #doc{} | binary(), iodata(), callback(), any(),
+        #view_query_args{}) ->
+    any().
+query_view(DbName, Design, ViewName, Callback, Acc0, QueryArgs) ->
+    Db = dbname(DbName),
+    View = name(ViewName),
+    Mod = case is_reduce_view(Db, Design, View, QueryArgs) of
+        true -> fabric_view_reduce;
+        false -> fabric_view_map
+    end,
+    Mod:go(Db, Design, View, QueryArgs, Callback, Acc0).
+
+%% @doc retrieve info about a view group: disk size, language, whether
+%% compaction is running and so forth. The design doc argument is
+%% normalized with design_doc/1.
+-spec get_view_group_info(dbname(), #doc{} | docid()) ->
+    {ok, [
+        {signature, binary()} |
+        {language, binary()} |
+        {disk_size, non_neg_integer()} |
+        {compact_running, boolean()} |
+        {updater_running, boolean()} |
+        {waiting_commit, boolean()} |
+        {waiting_clients, non_neg_integer()} |
+        {update_seq, pos_integer()} |
+        {purge_seq, non_neg_integer()}
+    ]}.
+get_view_group_info(DbName, DesignId) ->
+    Name = dbname(DbName),
+    DDoc = design_doc(DesignId),
+    fabric_group_info:go(Name, DDoc).
+
+%% @doc retrieve all the design docs from a database.
+%% Runs all_docs/4 starting at the key `_design/' with `include_docs' set
+%% and stops at the first row whose id does not start with `_design/'.
+-spec design_docs(dbname()) -> {ok, [json_obj()]}.
+design_docs(DbName) ->
+    Args = #view_query_args{start_key = <<"_design/">>, include_docs=true},
+    Collect = fun
+        ({total_and_offset, _, _}, []) ->
+            {ok, []};
+        ({row, {Props}}, Docs) ->
+            case couch_util:get_value(id, Props) of
+                <<"_design/", _/binary>> ->
+                    {ok, [couch_util:get_value(doc, Props) | Docs]};
+                _ ->
+                    % past the design doc range
+                    {stop, Docs}
+            end;
+        (complete, Docs) ->
+            % rows were accumulated in reverse
+            {ok, lists:reverse(Docs)};
+        ({error, Reason}, _Docs) ->
+            {error, Reason}
+    end,
+    fabric:all_docs(dbname(DbName), Collect, [], Args).
+
+%% @doc forces a reload of validation functions; this is performed after
+%% design docs are updated. One cast is sent per shard of the database
+%% and the results of the rexi:cast/2 calls are returned.
+%% NOTE: This function probably doesn't belong here as part of the API
+-spec reset_validation_funs(dbname()) -> [reference()].
+reset_validation_funs(DbName) ->
+    Targets = [{Node, Name} ||
+        #shard{node=Node, name=Name} <- mem3:shards(DbName)],
+    [rexi:cast(Node, {fabric_rpc, reset_validation_funs, [Name]}) ||
+        {Node, Name} <- Targets].
+
+%% @doc clean up index files for all Dbs returned by all_dbs/0
+-spec cleanup_index_files() -> [ok].
+cleanup_index_files() ->
+    {ok, Dbs} = fabric:all_dbs(),
+    lists:map(fun(Db) -> cleanup_index_files(Db) end, Dbs).
+
+%% @doc clean up index files for a specific db.
+%% Collects the signature of every design doc's view group, lists the
+%% files under the configured `view_index_dir' that belong to this db's
+%% shards, and deletes those whose path matches none of the signatures.
+%% The results of the individual file:delete/1 calls are ignored.
+-spec cleanup_index_files(dbname()) -> ok.
+cleanup_index_files(DbName) ->
+    {ok, DesignDocs} = fabric:design_docs(DbName),
+
+    Docs = [couch_doc:from_json_obj(DD) || DD <- DesignDocs],
+    SignatureOf = fun(#doc{id = GroupId}) ->
+        {ok, Info} = fabric:get_view_group_info(DbName, GroupId),
+        binary_to_list(couch_util:get_value(signature, Info))
+    end,
+    ActiveSigs = lists:map(SignatureOf, Docs),
+
+    FileList = filelib:wildcard([couch_config:get("couchdb", "view_index_dir"),
+        "/.shards/*/", couch_util:to_list(dbname(DbName)), ".[0-9]*_design/*"]),
+
+    DeleteFiles = case ActiveSigs of
+        [] ->
+            % no active signatures: every index file is stale
+            FileList;
+        _ ->
+            {ok, RegExp} = re:compile([$(, string:join(ActiveSigs, "|"), $)]),
+            [Path || Path <- FileList,
+                re:run(Path, RegExp, [{capture, none}]) == nomatch]
+    end,
+    lists:foreach(fun(File) -> file:delete(File) end, DeleteFiles),
+    ok.
+
+%% some simple type validation and transcoding
+
+%% normalizes a database reference to its binary name: binaries pass
+%% through, lists are converted, a #db{} record yields its name field and
+%% anything else raises `{illegal_database_name, DbName}'
+dbname(DbName) when is_binary(DbName) ->
+    DbName;
+dbname(DbName) when is_list(DbName) ->
+    list_to_binary(DbName);
+dbname(#db{name=Name}) ->
+    Name;
+dbname(Other) ->
+    erlang:error({illegal_database_name, Other}).
+
+%% converts `Thing' with couch_util:to_binary/1; used for view names
+name(Thing) ->
+ couch_util:to_binary(Thing).
+
+%% normalizes a document id to a binary: binaries pass through, lists are
+%% converted and anything else raises `{illegal_docid, DocId}'
+docid(DocId) when is_binary(DocId) ->
+    DocId;
+docid(DocId) when is_list(DocId) ->
+    list_to_binary(DocId);
+docid(Other) ->
+    erlang:error({illegal_docid, Other}).
+
+%% normalizes every element of a list with doc/1; a non-list argument
+%% raises `{illegal_docs_list, Docs}'
+docs(Docs) when not is_list(Docs) ->
+    erlang:error({illegal_docs_list, Docs});
+docs(Docs) ->
+    [doc(Each) || Each <- Docs].
+
+%% normalizes a document: a #doc{} record passes through, a one-element
+%% tuple (a JSON object) is converted with couch_doc:from_json_obj/1 and
+%% anything else raises `{illegal_doc_format, Doc}'
+doc(Doc) ->
+    case Doc of
+        #doc{} ->
+            Doc;
+        {_} ->
+            couch_doc:from_json_obj(Doc);
+        _ ->
+            erlang:error({illegal_doc_format, Doc})
+    end.
+
+%% normalizes a design doc reference: a #doc{} record passes through, a
+%% list is converted to a binary first, a binary that already starts with
+%% `_design/' is returned as is and any other binary gets that prefix
+design_doc(DocId) when is_list(DocId) ->
+    design_doc(list_to_binary(DocId));
+design_doc(#doc{} = DDoc) ->
+    DDoc;
+design_doc(<<"_design/", _/binary>> = DocId) ->
+    DocId;
+design_doc(GroupName) ->
+    <<"_design/", GroupName/binary>>.
+
+%% sanitizes one `{Id, Revs}' pair: the id goes through docid/1 and each
+%% revision through rev/1
+idrevs({Id, Revs}) when is_list(Revs) ->
+    DocId = docid(Id),
+    Parsed = [rev(R) || R <- Revs],
+    {DocId, Parsed}.
+
+%% normalizes a revision: list and binary forms are parsed with
+%% couch_doc:parse_rev/1, an `{integer, binary}' pair passes through
+rev(Rev) when is_list(Rev) ->
+    couch_doc:parse_rev(Rev);
+rev(Rev) when is_binary(Rev) ->
+    couch_doc:parse_rev(Rev);
+rev({Seq, Hash} = Rev) when is_integer(Seq), is_binary(Hash) ->
+    Rev.
+
+%% @doc convenience method, useful when testing or calling fabric from the
+%% shell: fills in `io_priority' and then `user_ctx' via add_option/2
+opts(Options) ->
+    WithPriority = add_option(io_priority, Options),
+    add_option(user_ctx, WithPriority).
+
+%% adds `{Key, Value}' to Options when the key is absent from Options but
+%% a value is stored under Key in the process dictionary; otherwise
+%% Options is returned unchanged
+add_option(Key, Options) ->
+    case {couch_util:get_value(Key, Options), erlang:get(Key)} of
+        {undefined, undefined} ->
+            Options;
+        {undefined, Value} ->
+            [{Key, Value} | Options];
+        {_AlreadySet, _} ->
+            Options
+    end.
+
+%% default view callback: prepends each message to the accumulator and,
+%% on `complete', returns the accumulated messages in arrival order
+default_callback(Msg, Acc) ->
+    case Msg of
+        complete ->
+            {ok, lists:reverse(Acc)};
+        Row ->
+            {ok, [Row | Acc]}
+    end.
+
+%% true when the query args' `view_type' field is `reduce'; the first
+%% three arguments are ignored
+is_reduce_view(_, _, _, #view_query_args{view_type=reduce}) ->
+    true;
+is_reduce_view(_, _, _, #view_query_args{}) ->
+    false.
+
+%% @doc convenience method for use in the shell, converts a keylist
+%% to a `changes_args' record
+%% @see kl_to_record/2
+kl_to_changes_args(KeyList) ->
+ kl_to_record(KeyList, changes_args).
+
+%% @doc convenience method for use in the shell, converts a keylist
+%% to a `view_query_args' record
+%% @see kl_to_record/2
+kl_to_query_args(KeyList) ->
+ kl_to_record(KeyList, view_query_args).
+
+%% @doc finds the tuple position of field `Key' in the record `RecName'.
+%% record_info is only available at compile time, so every supported
+%% record needs its own case clause; add one for each new record type.
+%% Positions start at 2 because element 1 of a record tuple is its tag.
+lookup_index(Key,RecName) ->
+    {Fields, Size} =
+        case RecName of
+            changes_args ->
+                {record_info(fields, changes_args),
+                    record_info(size, changes_args)};
+            view_query_args ->
+                {record_info(fields, view_query_args),
+                    record_info(size, view_query_args)}
+        end,
+    Positions = lists:zip(Fields, lists:seq(2, Size)),
+    couch_util:get_value(Key, Positions).
+
+%% @doc convert a keylist to record with given `RecName'.
+%% Starts from a default record and sets each listed field in turn; keys
+%% are converted with couch_util:to_existing_atom/1 before lookup.
+%% @see lookup_index
+kl_to_record(KeyList,RecName) ->
+    Blank = case RecName of
+        changes_args -> #changes_args{};
+        view_query_args -> #view_query_args{}
+    end,
+    SetField = fun({Key, Value}, Rec) ->
+        Pos = lookup_index(couch_util:to_existing_atom(Key),RecName),
+        setelement(Pos, Rec, Value)
+    end,
+    lists:foldl(SetField, Blank, KeyList).
diff --git a/deps/fabric/src/fabric_db_create.erl b/deps/fabric/src/fabric_db_create.erl
new file mode 100644
index 00000000..080517be
--- /dev/null
+++ b/deps/fabric/src/fabric_db_create.erl
@@ -0,0 +1,161 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_db_create).
+-export([go/2]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-define(DBNAME_REGEX, "^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*$").
+
+%% @doc Create a new database, and all its partition files across the cluster
+%%      Options is proplist with user_ctx, n, q, validate_name
+%%
+%%      The shard files are created first and the shard map document
+%%      second; both steps always run once the name has been validated.
+go(DbName, Options) ->
+    case validate_dbname(DbName, Options) of
+        ok ->
+            {Shards, Doc} = generate_shard_map(DbName, Options),
+            FilesResult = create_shard_files(Shards),
+            DocResult = create_shard_db_doc(Doc),
+            case {FilesResult, DocResult} of
+                {ok, {ok, Status}} ->
+                    Status;
+                {file_exists, {ok, _}} ->
+                    {error, file_exists};
+                {_, Error} ->
+                    Error
+            end;
+        Invalid ->
+            Invalid
+    end.
+
+%% checks DbName against ?DBNAME_REGEX unless the `validate_name' option
+%% is false; `_users' and `_replicator' are accepted even though they do
+%% not match the pattern
+validate_dbname(DbName, Options) ->
+    case couch_util:get_value(validate_name, Options, true) of
+        false ->
+            ok;
+        true ->
+            IsSystemDb = (DbName =:= <<"_users">>)
+                orelse (DbName =:= <<"_replicator">>),
+            case re:run(DbName, ?DBNAME_REGEX, [{capture,none}]) of
+                match ->
+                    ok;
+                nomatch when IsSystemDb ->
+                    ok;
+                nomatch ->
+                    {error, illegal_database_name}
+            end
+    end.
+
+%% builds the shard list for a new database and the shard map document
+%% that describes it. The shard suffix is "." followed by the current
+%% time in whole seconds. When a shard map document already exists for
+%% DbName it is returned instead of a freshly built one.
+generate_shard_map(DbName, Options) ->
+    % os:timestamp/0 replaces the deprecated now/0; only the mega-second
+    % and second parts are used
+    {MegaSecs, Secs, _} = os:timestamp(),
+    Suffix = "." ++ integer_to_list(MegaSecs*1000000 + Secs),
+    Shards = mem3:choose_shards(DbName, [{shard_suffix,Suffix} | Options]),
+    Doc = case mem3_util:open_db_doc(DbName) of
+        {ok, Existing} ->
+            % the DB already exists, and may have a different Suffix
+            Existing;
+        {not_found, _} ->
+            make_document(Shards, Suffix)
+    end,
+    {Shards, Doc}.
+
+%% submits a `create_db' job for every shard and waits for the replies;
+%% returns `file_exists' when recv reports `{error, file_exists}' and
+%% `ok' for every other outcome. The rexi monitor is always stopped.
+create_shard_files(Shards) ->
+    Workers = fabric_util:submit_jobs(Shards, create_db, []),
+    RexiMon = fabric_util:create_monitors(Shards),
+    try
+        Handler = fun handle_message/3,
+        case fabric_util:recv(Workers, #shard.ref, Handler, Workers) of
+            {error, file_exists} -> file_exists;
+            _ -> ok
+        end
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+%% recv callback for create_shard_files/1; the accumulator is the list of
+%% workers that have not answered yet
+handle_message(file_exists, _, _) ->
+    {error, file_exists};
+
+handle_message({rexi_DOWN, _, {_, Node}, _}, _, Workers) ->
+    % forget every worker that lives on the node that went down
+    continue_or_stop([S || S <- Workers, S#shard.node =/= Node]);
+
+handle_message(_, Worker, Workers) ->
+    % any other message counts as this worker's answer
+    continue_or_stop(lists:delete(Worker, Workers)).
+
+%% stops with `ok' once no workers remain, otherwise keeps waiting
+continue_or_stop([]) ->
+    {stop, ok};
+continue_or_stop(RemainingWorkers) ->
+    {ok, RemainingWorkers}.
+
+%% sends the shard map document to every node returned by mem3:nodes/0
+%% and collects the replies with handle_db_update/3; a recv timeout is
+%% reported as `{error, timeout}'. The rexi monitor is always stopped.
+create_shard_db_doc(Doc) ->
+    Shards = [#shard{node=Node} || Node <- mem3:nodes()],
+    RexiMon = fabric_util:create_monitors(Shards),
+    Workers = fabric_util:submit_jobs(Shards, create_shard_db_doc, [Doc]),
+    Acc0 = {length(Shards), fabric_dict:init(Workers, nil)},
+    try
+        case fabric_util:recv(Workers, #shard.ref, fun handle_db_update/3, Acc0) of
+            {timeout, _} -> {error, timeout};
+            Result -> Result
+        end
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+%% recv callback for create_shard_db_doc/1; the accumulator is
+%% `{W, Counters}' where W is the number of nodes contacted and Counters
+%% maps each worker to its reply (`nil' until it answers)
+handle_db_update({rexi_DOWN, _, {_, Node}, _}, _Worker, {W, Counters}) ->
+    % drop every worker on the node that went down
+    OnOtherNodes = fun(S, _) -> S#shard.node =/= Node end,
+    maybe_stop(W, fabric_dict:filter(OnOtherNodes, Counters));
+
+handle_db_update({rexi_EXIT, _Reason}, Worker, {W, Counters}) ->
+    Remaining = fabric_dict:erase(Worker, Counters),
+    maybe_stop(W, Remaining);
+
+handle_db_update(conflict, _, _) ->
+    % just fail when we get any conflicts
+    {error, conflict};
+
+handle_db_update(Msg, Worker, {W, Counters}) ->
+    Recorded = fabric_dict:store(Worker, Msg, Counters),
+    maybe_stop(W, Recorded).
+
+%% keeps waiting while any worker is still `nil'. Once all have answered:
+%% `ok' from all W nodes stops with `ok', `ok' from a majority stops with
+%% `accepted', anything less is an internal_server_error.
+maybe_stop(W, Counters) ->
+    case fabric_dict:any(nil, Counters) of
+        true ->
+            {ok, {W, Counters}};
+        false ->
+            NumOk = length([ok || {_, ok} <- Counters]),
+            if
+                NumOk =:= W ->
+                    {stop, ok};
+                NumOk >= (W div 2 + 1) ->
+                    {stop, accepted};
+                true ->
+                    {error, internal_server_error}
+            end
+    end.
+
+%% builds the shard map document for a database. Each shard contributes
+%% an `add' changelog entry plus an entry in the `by_node' and `by_range'
+%% indexes; ranges are written as two hex-encoded 32-bit bounds joined by
+%% "-". The document id is the database name of the first shard.
+make_document([#shard{dbname=DbName}|_] = Shards, Suffix) ->
+    AddShard = fun(#shard{node=N, range=[B,E]}, {Log, NodeIdx, RangeIdx}) ->
+        Range = ?l2b([couch_util:to_hex(<<B:32/integer>>), "-",
+            couch_util:to_hex(<<E:32/integer>>)]),
+        Node = couch_util:to_binary(N),
+        {[[<<"add">>, Range, Node] | Log],
+            orddict:append(Node, Range, NodeIdx),
+            orddict:append(Range, Node, RangeIdx)}
+    end,
+    {Changelog, ByNode, ByRange} = lists:foldl(AddShard, {[], [], []}, Shards),
+    SortValues = fun(Index) -> [{K, lists:sort(V)} || {K, V} <- Index] end,
+    #doc{id=DbName, body = {[
+        {<<"shard_suffix">>, Suffix},
+        {<<"changelog">>, lists:sort(Changelog)},
+        {<<"by_node">>, {SortValues(ByNode)}},
+        {<<"by_range">>, {SortValues(ByRange)}}
+    ]}}.
+
diff --git a/deps/fabric/src/fabric_db_delete.erl b/deps/fabric/src/fabric_db_delete.erl
new file mode 100644
index 00000000..9283d0b2
--- /dev/null
+++ b/deps/fabric/src/fabric_db_delete.erl
@@ -0,0 +1,95 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_db_delete).
+-export([go/2]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+%% @doc Options aren't used at all now in couch on delete but are left here
+%% to be consistent with fabric_db_create for possible future use
+%% @see couch_server:delete_db
+%%
+%% Returns `ok' or `accepted' depending on how many nodes confirmed the
+%% removal of the shard map document, raises `database_does_not_exist'
+%% when every node reported not_found, and returns any other result of
+%% delete_shard_db_doc/1 unchanged.
+go(DbName, _Options) ->
+ % the shard list is looked up before the shard map document is deleted
+ Shards = mem3:shards(DbName),
+ % delete doc from shard_db
+ try delete_shard_db_doc(DbName) of
+ {ok, ok} ->
+ ok;
+ {ok, accepted} ->
+ accepted;
+ {ok, not_found} ->
+ % NOTE(review): erlang:error/2 documents its second argument as the
+ % argument list of the current function; DbName is passed bare here
+ erlang:error(database_does_not_exist, DbName);
+ Error ->
+ Error
+ after
+ % delete the shard files
+ % (runs on every path, including the error raised above)
+ fabric_util:submit_jobs(Shards, delete_db, [])
+ end.
+
+%% asks every node returned by mem3:nodes/0 to delete the shard map
+%% document and collects the replies with handle_db_update/3; a recv
+%% timeout is reported as `{error, timeout}'. The rexi monitor is always
+%% stopped.
+delete_shard_db_doc(Doc) ->
+    Shards = [#shard{node=Node} || Node <- mem3:nodes()],
+    RexiMon = fabric_util:create_monitors(Shards),
+    Workers = fabric_util:submit_jobs(Shards, delete_shard_db_doc, [Doc]),
+    Acc0 = {length(Shards), fabric_dict:init(Workers, nil)},
+    try
+        case fabric_util:recv(Workers, #shard.ref, fun handle_db_update/3, Acc0) of
+            {timeout, _} -> {error, timeout};
+            Result -> Result
+        end
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+%% recv callback for delete_shard_db_doc/1; the accumulator is
+%% `{W, Counters}' where W is the number of nodes contacted and Counters
+%% maps each worker to its reply (`nil' until it answers)
+handle_db_update({rexi_DOWN, _, {_, Node}, _}, _Worker, {W, Counters}) ->
+    % drop every worker on the node that went down
+    OnOtherNodes = fun(S, _) -> S#shard.node =/= Node end,
+    maybe_stop(W, fabric_dict:filter(OnOtherNodes, Counters));
+
+handle_db_update({rexi_EXIT, _Reason}, Worker, {W, Counters}) ->
+    Remaining = fabric_dict:erase(Worker, Counters),
+    maybe_stop(W, Remaining);
+
+handle_db_update(conflict, _, _) ->
+    % just fail when we get any conflicts
+    {error, conflict};
+
+handle_db_update(Msg, Worker, {W, Counters}) ->
+    Recorded = fabric_dict:store(Worker, Msg, Counters),
+    maybe_stop(W, Recorded).
+
+%% decides whether enough replies have arrived. While any worker is still
+%% `nil' the receive loop continues. Afterwards the `ok' and `not_found'
+%% replies are counted (see count_replies/3) and compared with W, the
+%% number of nodes contacted.
+maybe_stop(W, Counters) ->
+ case fabric_dict:any(nil, Counters) of
+ true ->
+ {ok, {W, Counters}};
+ false ->
+ {Ok,NotFound} = fabric_dict:fold(fun count_replies/3, {0,0}, Counters),
+ % W is already bound, so it acts as an equality test in the patterns
+ case {Ok + NotFound, Ok, NotFound} of
+ {W, 0, W} ->
+ % every node answered not_found
+ {#shard{dbname=Name}, _} = hd(Counters),
+ twig:log(warn, "~p not_found ~s", [?MODULE, Name]),
+ {stop, not_found};
+ {W, _, _} ->
+ % every node answered and at least one said ok
+ {stop, ok};
+ {N, M, _} when N >= (W div 2 + 1), M > 0 ->
+ % a majority answered and at least one said ok
+ {stop, accepted};
+ _ ->
+ {error, internal_server_error}
+ end
+ end.
+
+%% fold callback that tallies replies into `{Ok, NotFound}'; replies other
+%% than `ok' and `not_found' leave the tally unchanged
+count_replies(_Worker, Reply, Acc) ->
+    case {Reply, Acc} of
+        {ok, {Ok, NotFound}} ->
+            {Ok + 1, NotFound};
+        {not_found, {Ok, NotFound}} ->
+            {Ok, NotFound + 1};
+        _ ->
+            Acc
+    end.
diff --git a/deps/fabric/src/fabric_db_doc_count.erl b/deps/fabric/src/fabric_db_doc_count.erl
new file mode 100644
index 00000000..107f212a
--- /dev/null
+++ b/deps/fabric/src/fabric_db_doc_count.erl
@@ -0,0 +1,68 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_db_doc_count).
+
+-export([go/1]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+%% submits a `get_doc_count' job to every shard of the database and folds
+%% the replies with handle_message/3, starting from a count of 0. The
+%% rexi monitor is always stopped.
+go(DbName) ->
+    Shards = mem3:shards(DbName),
+    Workers = fabric_util:submit_jobs(Shards, get_doc_count, []),
+    RexiMon = fabric_util:create_monitors(Shards),
+    Pending = fabric_dict:init(Workers, nil),
+    Acc0 = {Pending, 0},
+    try
+        fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0)
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+%% recv callback for go/1; the accumulator is `{Counters, Total}' where
+%% Counters tracks the workers still expected to answer and Total is the
+%% running document count
+handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Total}) ->
+    case fabric_util:remove_down_workers(Counters, NodeRef) of
+        {ok, Remaining} ->
+            {ok, {Remaining, Total}};
+        error ->
+            {error, {nodedown, <<"progress not possible">>}}
+    end;
+
+handle_message({rexi_EXIT, Reason}, Shard, {Counters, Total}) ->
+    Remaining = lists:keydelete(Shard, 1, Counters),
+    case fabric_view:is_progress_possible(Remaining) of
+        true ->
+            {ok, {Remaining, Total}};
+        false ->
+            {error, Reason}
+    end;
+
+handle_message({ok, Count}, Shard, {Counters, Total}) ->
+    case fabric_dict:lookup_element(Shard, Counters) of
+        undefined ->
+            % already heard from someone else in this range
+            {ok, {Counters, Total}};
+        nil ->
+            Marked = fabric_dict:store(Shard, ok, Counters),
+            Pruned = fabric_view:remove_overlapping_shards(Shard, Marked),
+            case fabric_dict:any(nil, Pruned) of
+                true ->
+                    {ok, {Pruned, Count+Total}};
+                false ->
+                    % every remaining range has answered
+                    {stop, Count+Total}
+            end
+    end;
+handle_message(_, _, Acc) ->
+    {ok, Acc}.
+
diff --git a/deps/fabric/src/fabric_db_info.erl b/deps/fabric/src/fabric_db_info.erl
new file mode 100644
index 00000000..63fb44a5
--- /dev/null
+++ b/deps/fabric/src/fabric_db_info.erl
@@ -0,0 +1,104 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_db_info).
+
+-export([go/1]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+%% submits a `get_db_info' job to every shard of the database and folds
+%% the replies with handle_message/3, starting from an empty list of
+%% collected infos. The rexi monitor is always stopped.
+go(DbName) ->
+    Shards = mem3:shards(DbName),
+    Workers = fabric_util:submit_jobs(Shards, get_db_info, []),
+    RexiMon = fabric_util:create_monitors(Shards),
+    Pending = fabric_dict:init(Workers, nil),
+    Acc0 = {Pending, []},
+    try
+        fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0)
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+%% recv callback for go/1; the accumulator is `{Counters, Acc}' where
+%% Counters tracks the workers still expected to answer (`nil' until they
+%% do, then their update_seq) and Acc is the list of infos collected so
+%% far.
+handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) ->
+    case fabric_util:remove_down_workers(Counters, NodeRef) of
+        {ok, NewCounters} ->
+            {ok, {NewCounters, Acc}};
+        error ->
+            {error, {nodedown, <<"progress not possible">>}}
+    end;
+
+handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) ->
+    NewCounters = lists:keydelete(Shard,1,Counters),
+    % progress must be judged on the counters that remain after the
+    % exited worker has been removed, not on the stale ones
+    case fabric_view:is_progress_possible(NewCounters) of
+        true ->
+            {ok, {NewCounters, Acc}};
+        false ->
+            {error, Reason}
+    end;
+
+handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, Acc}) ->
+    case fabric_dict:lookup_element(Shard, Counters) of
+        undefined ->
+            % already heard from someone else in this range
+            {ok, {Counters, Acc}};
+        nil ->
+            Seq = couch_util:get_value(update_seq, Info),
+            C1 = fabric_dict:store(Shard, Seq, Counters),
+            C2 = fabric_view:remove_overlapping_shards(Shard, C1),
+            case fabric_dict:any(nil, C2) of
+                true ->
+                    {ok, {C2, [Info|Acc]}};
+                false ->
+                    % every remaining range has answered: merge the
+                    % per-shard infos into a single result
+                    {stop, [
+                        {db_name,Name},
+                        {update_seq, fabric_view_changes:pack_seqs(C2)} |
+                        merge_results(lists:flatten([Info|Acc]))
+                    ]}
+            end
+    end;
+handle_message(_, _, Acc) ->
+    {ok, Acc}.
+
+%% merges the flattened per-shard info proplists into one proplist.
+%% Counts and sizes are summed, `compact_running' is true when any shard
+%% reports true, `disk_format_version' is the maximum seen, `other' is
+%% merged by merge_other_results/1 and unknown keys are dropped.
+%% `instance_start_time' is always reported as <<"0">>.
+merge_results(Info) ->
+    Grouped = lists:foldl(fun({K,V},D0) -> orddict:append(K,V,D0) end,
+        orddict:new(), Info),
+    Combine = fun
+        (compact_running, Values) ->
+            {keep, lists:member(true, Values)};
+        (disk_format_version, Values) ->
+            {keep, lists:max(Values)};
+        (other, Values) ->
+            {keep, {merge_other_results(Values)}};
+        (Key, Values) when Key =:= doc_count; Key =:= doc_del_count;
+                Key =:= purge_seq; Key =:= disk_size ->
+            {keep, lists:sum(Values)};
+        (_, _) ->
+            drop
+    end,
+    orddict:fold(fun(Key, Values, Acc) ->
+        case Combine(Key, Values) of
+            {keep, Merged} -> [{Key, Merged} | Acc];
+            drop -> Acc
+        end
+    end, [{instance_start_time, <<"0">>}], Grouped).
+
+%% merges the `other' sections of the per-shard infos: the `data_size'
+%% values are summed and every other key is dropped
+merge_other_results(Results) ->
+    AppendProp = fun({K,V},D0) -> orddict:append(K,V,D0) end,
+    Grouped = lists:foldl(fun({Props}, D) ->
+        lists:foldl(AppendProp, D, Props)
+    end, orddict:new(), Results),
+    orddict:fold(fun
+        (data_size, Sizes, Acc) ->
+            [{data_size, lists:sum(Sizes)} | Acc];
+        (_, _, Acc) ->
+            Acc
+    end, [], Grouped).
diff --git a/deps/fabric/src/fabric_db_meta.erl b/deps/fabric/src/fabric_db_meta.erl
new file mode 100644
index 00000000..87721555
--- /dev/null
+++ b/deps/fabric/src/fabric_db_meta.erl
@@ -0,0 +1,49 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_db_meta).
+
+-export([set_revs_limit/3, set_security/3]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+%% Submit a set_revs_limit job to every shard of DbName and wait for the
+%% replies. Returns ok when the receive loop finishes with {ok, ok};
+%% any other result of fabric_util:recv/4 is handed back unchanged.
+set_revs_limit(DbName, Limit, Options) ->
+    Workers = fabric_util:submit_jobs(mem3:shards(DbName), set_revs_limit,
+        [Limit, Options]),
+    % handle_message/3 counts this down by one per ok reply and stops at 0
+    Outstanding = length(Workers) - 1,
+    Handler = fun handle_message/3,
+    case fabric_util:recv(Workers, #shard.ref, Handler, Outstanding) of
+        {ok, ok} ->
+            ok;
+        Failure ->
+            Failure
+    end.
+
+%% Submit a set_security job carrying SecObj to every shard of DbName and
+%% wait for the replies. Returns ok when the receive loop finishes with
+%% {ok, ok}; any other result of fabric_util:recv/4 is returned unchanged.
+set_security(DbName, SecObj, Options) ->
+    Workers = fabric_util:submit_jobs(mem3:shards(DbName), set_security,
+        [SecObj, Options]),
+    % handle_message/3 counts this down by one per ok reply and stops at 0
+    Outstanding = length(Workers) - 1,
+    Handler = fun handle_message/3,
+    case fabric_util:recv(Workers, #shard.ref, Handler, Outstanding) of
+        {ok, ok} ->
+            ok;
+        Failure ->
+            Failure
+    end.
+
+%% Reply handler passed to fabric_util:recv/4 by set_revs_limit/3 and
+%% set_security/3. The accumulator is the number of ok replies still
+%% expected after the current one (callers start it at length(Workers) - 1).
+%% An ok reply with the counter at 0 stops the loop with ok; any earlier ok
+%% decrements the counter; every other message is returned as {error, Msg}.
+handle_message(ok, _, 0) ->
+ {stop, ok};
+handle_message(ok, _, Waiting) ->
+ {ok, Waiting - 1};
+handle_message(Error, _, _Waiting) ->
+ {error, Error}. \ No newline at end of file
diff --git a/deps/fabric/src/fabric_db_update_listener.erl b/deps/fabric/src/fabric_db_update_listener.erl
new file mode 100644
index 00000000..e29f3ec7
--- /dev/null
+++ b/deps/fabric/src/fabric_db_update_listener.erl
@@ -0,0 +1,114 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_db_update_listener).
+
+-export([go/4, start_update_notifier/1, stop/1, wait_db_updated/1]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+%% Entry point of the update listener. Starts one update notifier per shard
+%% of DbName, monitors the rexi servers of the nodes involved and then runs
+%% the receive loop until it finishes. The rexi monitor and the remote
+%% notifiers are always torn down afterwards, even if the loop raises.
+go(Parent, ParentRef, DbName, Timeout) ->
+    Notifiers = start_update_notifiers(DbName),
+    Endpoints = lists:usort([{rexi_server, Node} || {Node, _Ref} <- Notifiers]),
+    Monitor = rexi_monitor:start(Endpoints),
+    % The calling controller sends us messages as well, so it is listed as
+    % a worker alongside the notifiers.
+    AllWorkers = [{Parent, ParentRef} | Notifiers],
+    try
+        receive_results(AllWorkers, {AllWorkers, Parent, unset}, Timeout)
+    after
+        rexi_monitor:stop(Monitor),
+        stop_update_notifiers(Notifiers)
+    end.
+
+%% Cast start_update_notifier/1 to the node of every shard of DbName.
+%% Returns one {Node, Ref} pair per shard, Ref being the value returned by
+%% rexi:cast/2.
+start_update_notifiers(DbName) ->
+    [
+        {S#shard.node, rexi:cast(S#shard.node,
+            {?MODULE, start_update_notifier, [S#shard.name]})}
+     || S <- mem3:shards(DbName)
+    ].
+
+% rexi endpoint
+%% Runs in the remote worker. Reads the caller from the rexi_from process
+%% dictionary entry, installs a supervised handler on the couch_db_update
+%% event manager that sends {Ref, db_updated} to the caller for every event
+%% whose second element equals DbName, and then blocks until the handler
+%% exits, reporting that exit through rexi:reply/1.
+start_update_notifier(DbName) ->
+    {Caller, Ref} = get(rexi_from),
+    Notify = fun
+        ({_, Name}) when Name == DbName ->
+            Caller ! {Ref, db_updated};
+        (_) ->
+            ok
+    end,
+    HandlerId = {couch_db_update_notifier, make_ref()},
+    ok = gen_event:add_sup_handler(couch_db_update, HandlerId, Notify),
+    receive
+        {gen_event_EXIT, HandlerId, Reason} ->
+            rexi:reply({gen_event_EXIT, DbName, Reason})
+    end.
+
+%% Kill every remote notifier started by start_update_notifiers/1.
+%% Notifiers is the list of {Node, Ref} pairs it returned.
+stop_update_notifiers(Notifiers) ->
+ [rexi:kill(Node, Ref) || {Node, Ref} <- Notifiers].
+
+%% Ask the listener process Pid to finish by sending it {Ref, done}
+%% (handled by the `done` clause of handle_message/3).
+stop({Pid, Ref}) ->
+    Pid ! {Ref, done}.
+
+%% Send {Ref, get_state} to the listener process Pid and block until the
+%% next message arrives in the caller's mailbox, which is returned as is.
+%% NOTE(review): the receive is unselective, so any unrelated message
+%% already queued for the caller is returned instead of the listener's
+%% answer - confirm callers keep their mailbox clean.
+wait_db_updated({Pid, Ref}) ->
+    Pid ! {Ref, get_state},
+    receive
+        Reply ->
+            Reply
+    end.
+
+%% Drive rexi_utils:recv/6 with handle_message/3 until it returns something
+%% other than a timeout. On every timeout the parent is sent the atom
+%% `timeout`, a `waiting` state is reset to `unset` (other states are kept)
+%% and the loop restarts with the workers that are still listed.
+%% Any non-timeout result is returned as {ok, NewState}.
+receive_results(Workers, State, Timeout) ->
+    Outcome = rexi_utils:recv(Workers, 2, fun handle_message/3, State,
+        infinity, Timeout),
+    case Outcome of
+        {timeout, {Remaining, Parent, Phase}} ->
+            erlang:send(Parent, timeout),
+            NextPhase = case Phase of
+                waiting -> unset;
+                _ -> Phase
+            end,
+            receive_results(Remaining, {Remaining, Parent, NextPhase}, Timeout);
+        {_, FinalState} ->
+            {ok, FinalState}
+    end.
+
+
+%% Message handler for the receive loop started by receive_results/3.
+%% The accumulator is {Workers, Parent, State}: Workers is the list of
+%% {NodeOrPid, Ref} pairs built in go/4 (the parent plus one entry per
+%% notifier), Parent is the controller process and State is one of
+%% unset | waiting | updated.
+%%
+%%   rexi_DOWN  - drop every worker that lives on the node that went down
+%%   rexi_EXIT  - drop the worker that exited
+%%   db_updated - forward `updated` to the parent if it is waiting,
+%%                otherwise remember the update
+%%   get_state  - the parent asks for news: start waiting if there is none,
+%%                otherwise send the current state and reset it
+%%   done       - stop the loop
+%%
+%% Returns {ok, NewAcc}, {stop, ok} or {error, Reason}.
+handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Worker, {Workers, Parent, State}) ->
+    % Workers are {Node, Ref} pairs and NodeRef is the node that went down,
+    % so it has to be compared with the first element. Comparing it with the
+    % rexi reference never matches and would keep dead workers in the list.
+    NewWorkers = lists:filter(fun({Node, _Ref}) -> Node =/= NodeRef end, Workers),
+    case NewWorkers of
+        [] ->
+            {error, {nodedown, <<"progress not possible">>}};
+        _ ->
+            {ok, {NewWorkers, Parent, State}}
+    end;
+handle_message({rexi_EXIT, Reason}, Worker, {Workers, Parent, State}) ->
+    NewWorkers = lists:delete(Worker, Workers),
+    case NewWorkers of
+        [] ->
+            {error, Reason};
+        _ ->
+            {ok, {NewWorkers, Parent, State}}
+    end;
+handle_message(db_updated, {_Worker, _From}, {Workers, Parent, waiting}) ->
+    % propagate message to calling controller
+    erlang:send(Parent, updated),
+    {ok, {Workers, Parent, unset}};
+handle_message(db_updated, _Worker, {Workers, Parent, State})
+        when State == unset orelse State == updated ->
+    % nobody is waiting yet: remember that an update happened
+    {ok, {Workers, Parent, updated}};
+handle_message(get_state, {_Worker, _From}, {Workers, Parent, unset}) ->
+    % nothing to report yet: the next db_updated is forwarded immediately
+    {ok, {Workers, Parent, waiting}};
+handle_message(get_state, {_Worker, _From}, {Workers, Parent, State}) ->
+    erlang:send(Parent, State),
+    {ok, {Workers, Parent, unset}};
+handle_message(done, _, _) ->
+    {stop, ok}.
+
+
+
diff --git a/deps/fabric/src/fabric_dict.erl b/deps/fabric/src/fabric_dict.erl
new file mode 100644
index 00000000..cea537ca
--- /dev/null
+++ b/deps/fabric/src/fabric_dict.erl
@@ -0,0 +1,51 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_dict).
+-compile(export_all).
+
+% Instead of ets, let's use an ordered keylist. We'll need to revisit if we
+% have >> 100 shards, so a private interface is a good idea. - APK June 2010
+
+%% Build a dictionary (an orddict) mapping every key in Keys to InitialValue.
+init(Keys, InitialValue) ->
+ orddict:from_list([{Key, InitialValue} || Key <- Keys]).
+
+
+%% Subtract 1 from the value of every entry; values must be numbers.
+decrement_all(Dict) ->
+ [{K,V-1} || {K,V} <- Dict].
+
+%% Set Key to Value, replacing any existing entry (orddict:store/3).
+store(Key, Value, Dict) ->
+ orddict:store(Key, Value, Dict).
+
+%% Remove the entry for Key (orddict:erase/2).
+erase(Key, Dict) ->
+ orddict:erase(Key, Dict).
+
+%% Add Incr to the counter stored under Key (orddict:update_counter/3).
+update_counter(Key, Incr, Dict0) ->
+ orddict:update_counter(Key, Incr, Dict0).
+
+
+%% Look up the value stored under Key using couch_util:get_value/2.
+%% NOTE(review): presumably yields `undefined` for a missing key, so a
+%% stored `undefined` cannot be told apart from a miss - confirm against
+%% couch_util.
+lookup_element(Key, Dict) ->
+ couch_util:get_value(Key, Dict).
+
+%% Number of entries in the dictionary (orddict:size/1).
+size(Dict) ->
+ orddict:size(Dict).
+
+%% True if at least one entry has Value as its value, i.e. as the second
+%% element of the {Key, Value} pair.
+any(Value, Dict) ->
+ lists:keymember(Value, 2, Dict).
+
+%% Keep only the entries for which Fun(Key, Value) returns true
+%% (orddict:filter/2).
+filter(Fun, Dict) ->
+ orddict:filter(Fun, Dict).
+
+%% Fold Fun(Key, Value, Acc) over all entries, starting from Acc0
+%% (orddict:fold/3).
+fold(Fun, Acc0, Dict) ->
+ orddict:fold(Fun, Acc0, Dict).
diff --git a/deps/fabric/src/fabric_doc_attachments.erl b/deps/fabric/src/fabric_doc_attachments.erl
new file mode 100644
index 00000000..a45ba7c7
--- /dev/null
+++ b/deps/fabric/src/fabric_doc_attachments.erl
@@ -0,0 +1,131 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_doc_attachments).
+
+-include("fabric.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+%% couch api calls
+-export([receiver/2]).
+
+%% Build the attachment body source for a request, depending on how the
+%% body length was announced:
+%%   undefined or 0                  - an empty binary
+%%   {unknown_transfer_encoding, _}  - exits with that same term
+%%   chunked                         - a 3-arity fun that pulls chunks from
+%%                                     a middleman process and feeds them to
+%%                                     the given chunk fun
+%%   an integer length               - a 0-arity fun that returns the next
+%%                                     piece of data from a middleman process
+%%   anything else                   - exits with {length_not_integer, Length}
+receiver(_Req, undefined) ->
+    <<>>;
+receiver(_Req, {unknown_transfer_encoding, _Unknown} = Reason) ->
+    exit(Reason);
+receiver(Req, chunked) ->
+    Proxy = spawn(fun() -> middleman(Req, chunked) end),
+    fun(4096, ChunkFun, ok) ->
+        write_chunks(Proxy, ChunkFun)
+    end;
+receiver(_Req, 0) ->
+    <<>>;
+receiver(Req, Length) when is_integer(Length) ->
+    maybe_send_continue(Req),
+    Proxy = spawn(fun() -> middleman(Req, Length) end),
+    fun() ->
+        Proxy ! {self(), gimme_data},
+        receive
+            {Proxy, Data} ->
+                Data
+        end
+    end;
+receiver(_Req, Length) ->
+    exit({length_not_integer, Length}).
+
+%%
+%% internal
+%%
+
+%% Start a raw 100 response when the request carries an Expect header
+%% whose value is "100-continue" (compared case-insensitively).
+%% Returns ok in every other case.
+maybe_send_continue(#httpd{mochi_req = MochiReq} = Req) ->
+    Expect = couch_httpd:header_value(Req, "expect"),
+    WantsContinue = Expect =/= undefined
+        andalso string:to_lower(Expect) =:= "100-continue",
+    if
+        WantsContinue ->
+            MochiReq:start_raw_response({100, gb_trees:empty()});
+        true ->
+            ok
+    end.
+
+%% Pull chunks from the middleman process one at a time and pass each to
+%% ChunkFun(ChunkRecord, ok). A chunk record whose first element is 0 ends
+%% the transfer and is not passed on. Blocks until the middleman answers
+%% each request. Returns ok.
+write_chunks(MiddleMan, ChunkFun) ->
+ MiddleMan ! {self(), gimme_data},
+ receive
+ {MiddleMan, {0, _Footers}} ->
+ % MiddleMan ! {self(), done},
+ ok;
+ {MiddleMan, ChunkRecord} ->
+ ChunkFun(ChunkRecord, ok),
+ write_chunks(MiddleMan, ChunkFun)
+ end.
+
+%% Receiver loop for a body of known length. Each time a {MiddleMan, go}
+%% request arrives, read the next piece of the body with
+%% couch_httpd:recv(Req, 0), send it back to the requester and continue
+%% with the remaining length. Returns ok once the length reaches 0.
+receive_unchunked_attachment(_Req, 0) ->
+    ok;
+receive_unchunked_attachment(Req, Length) ->
+    receive
+        {MiddleMan, go} ->
+            Data = couch_httpd:recv(Req, 0),
+            MiddleMan ! {self(), Data},
+            receive_unchunked_attachment(Req, Length - size(Data))
+    end.
+
+%% Body of the middleman process. It spawns a receiver process that reads
+%% the request body (chunked or of a fixed length) and then serves the data
+%% to the writers through middleman_loop/5. The writer count is taken
+%% from the "n" key of the "cluster" config section.
+middleman(Req, chunked) ->
+    % the receiver hands over one chunk per {From, go} request
+    HandOver = fun(ChunkRecord, ok) ->
+        receive
+            {From, go} ->
+                From ! {self(), ChunkRecord}
+        end,
+        ok
+    end,
+    Receiver = spawn(fun() ->
+        couch_httpd:recv_chunked(Req, 4096, HandOver, ok)
+    end),
+
+    % take requests from the DB writers and get data from the receiver
+    Copies = erlang:list_to_integer(couch_config:get("cluster", "n")),
+    middleman_loop(Receiver, Copies, dict:new(), 0, []);
+
+middleman(Req, Length) ->
+    Receiver = spawn(fun() -> receive_unchunked_attachment(Req, Length) end),
+    Copies = erlang:list_to_integer(couch_config:get("cluster", "n")),
+    middleman_loop(Receiver, Copies, dict:new(), 0, []).
+
+%% Serve body chunks to several writers, each of which reads the chunks
+%% in order at its own pace.
+%%   Receiver  - process that produces the next chunk on {self(), go}
+%%   N         - number of writers expected (see the trimming step below)
+%%   Counters  - dict mapping a writer pid to the 1-based index of the next
+%%               chunk it will ask for
+%%   Offset    - number of chunks already dropped from the head of the list
+%%   ChunkList - chunks fetched so far that may still be needed
+%% The loop ends with ok when no request arrives for 10 seconds.
+middleman_loop(Receiver, N, Counters, Offset, ChunkList) ->
+ receive {From, gimme_data} ->
+ % figure out how far along this writer (From) is in the list
+ {NewCounters, WhichChunk} = case dict:find(From, Counters) of
+ {ok, I} ->
+ {dict:update_counter(From, 1, Counters), I};
+ error ->
+ {dict:store(From, 2, Counters), 1}
+ end,
+ % position of the requested chunk within the (possibly trimmed) list
+ ListIndex = WhichChunk - Offset,
+
+ % talk to the receiver to get another chunk if necessary
+ ChunkList1 = if ListIndex > length(ChunkList) ->
+ Receiver ! {self(), go},
+ receive {Receiver, ChunkRecord} -> ChunkList ++ [ChunkRecord] end;
+ true -> ChunkList end,
+
+ % reply to the writer
+ From ! {self(), lists:nth(ListIndex, ChunkList1)},
+
+ % check if we can drop a chunk from the head of the list
+ SmallestIndex = dict:fold(fun(_, Val, Acc) -> lists:min([Val,Acc]) end,
+ WhichChunk+1, NewCounters),
+ Size = dict:size(NewCounters),
+
+ % the head chunk is dropped only once all N writers have been seen and
+ % the slowest of them has moved past it
+ {NewChunkList, NewOffset} =
+ if Size == N andalso (SmallestIndex - Offset) == 2 ->
+ {tl(ChunkList1), Offset+1};
+ true ->
+ {ChunkList1, Offset}
+ end,
+ middleman_loop(Receiver, N, NewCounters, NewOffset, NewChunkList)
+ after 10000 ->
+ ok
+ end.
diff --git a/deps/fabric/src/fabric_doc_missing_revs.erl b/deps/fabric/src/fabric_doc_missing_revs.erl
new file mode 100644
index 00000000..2dd04a70
--- /dev/null
+++ b/deps/fabric/src/fabric_doc_missing_revs.erl
@@ -0,0 +1,90 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_doc_missing_revs).
+
+-export([go/2, go/3]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+%% Same as go/3 with an empty option list.
+go(DbName, AllIdsRevs) ->
+ go(DbName, AllIdsRevs, []).
+
+%% Ask the shards which of the given revisions they are missing.
+%% AllIdsRevs is a list of {Id, Revs} pairs. The pairs are grouped by shard,
+%% one get_missing_revs job is cast per shard, and the replies are collected
+%% by handle_message/3. Every id starts out as {{nil, Revs}, []}, meaning no
+%% worker has reported on it yet. The rexi monitor is always stopped before
+%% returning.
+go(DbName, AllIdsRevs, Options) ->
+    StartWorker = fun({#shard{name = Name, node = Node} = Shard, IdsRevs}) ->
+        Job = {fabric_rpc, get_missing_revs, [Name, IdsRevs, Options]},
+        Shard#shard{ref = rexi:cast(Node, Job)}
+    end,
+    Workers = lists:map(StartWorker, group_idrevs_by_shard(DbName, AllIdsRevs)),
+    Unknown = dict:from_list([{Id, {{nil, Revs}, []}} || {Id, Revs} <- AllIdsRevs]),
+    RexiMon = fabric_util:create_monitors(Workers),
+    InitialAcc = {length(Workers), Unknown, Workers},
+    try
+        fabric_util:recv(Workers, #shard.ref, fun handle_message/3, InitialAcc)
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+%% Reply handler for go/3. The accumulator is
+%% {WaitingCount, ResultDict, Workers}: the number of workers still expected
+%% to answer, the per-id results gathered so far and the remaining workers.
+%%   rexi_DOWN - drop all workers on the dead node and recount
+%%   rexi_EXIT - drop the failed worker
+%%   {ok, Results} from the last worker - reply with whatever is known,
+%%     treating ids nobody reported on as missing (force_reply/3)
+%%   {ok, Results} otherwise - reply early if every id has been reported on,
+%%     cancelling the remaining jobs; else keep waiting
+handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {_WorkerLen, ResultDict, Workers}) ->
+ NewWorkers = [W || #shard{node=Node} = W <- Workers, Node =/= NodeRef],
+ % NewWorkers is a plain list of shards; fabric_dict:size/1 is used here
+ % only to count its elements
+ skip_message({fabric_dict:size(NewWorkers), ResultDict, NewWorkers});
+handle_message({rexi_EXIT, _}, Worker, {W, D, Workers}) ->
+ skip_message({W-1,D,lists:delete(Worker, Workers)});
+handle_message({ok, Results}, _Worker, {1, D0, _}) ->
+ D = update_dict(D0, Results),
+ {stop, dict:fold(fun force_reply/3, [], D)};
+handle_message({ok, Results}, Worker, {WaitingCount, D0, Workers}) ->
+ D = update_dict(D0, Results),
+ case dict:fold(fun maybe_reply/3, {stop, []}, D) of
+ continue ->
+ % still haven't heard about some Ids
+ {ok, {WaitingCount - 1, D, lists:delete(Worker,Workers)}};
+ {stop, FinalReply} ->
+ % finished, stop the rest of the jobs
+ fabric_util:cleanup(lists:delete(Worker,Workers)),
+ {stop, FinalReply}
+ end.
+
+%% dict:fold/3 callback that builds the final reply no matter what.
+%% An id nobody reported on ({nil, Revs}) is assumed to miss all requested
+%% revisions; an id with no missing revisions is left out; otherwise the
+%% reported missing revisions and ancestors are added to the reply.
+force_reply(Id, {Missing, Ancestors}, Acc) ->
+    case Missing of
+        {nil, Revs} ->
+            % never heard about this ID, assume it's missing
+            [{Id, Revs, Ancestors} | Acc];
+        [] ->
+            Acc;
+        Revs ->
+            [{Id, Revs, Ancestors} | Acc]
+    end.
+
+%% dict:fold/3 callback that builds a reply only if every id has been
+%% reported on. As soon as one id is still unreported ({nil, _}) the result
+%% becomes `continue` and stays that way. Ids with no missing revisions
+%% are left out of the reply.
+maybe_reply(_Id, _Entry, continue) ->
+    continue;
+maybe_reply(_Id, {{nil, _Revs}, _Ancestors}, _Acc) ->
+    continue;
+maybe_reply(Id, {Missing, Ancestors}, {stop, Replies}) ->
+    case Missing of
+        [] ->
+            {stop, Replies};
+        _ ->
+            {stop, [{Id, Missing, Ancestors} | Replies]}
+    end.
+
+%% Group the {Id, Revs} pairs by the shards responsible for each id
+%% (mem3:shards/2). Returns a list of {Shard, [{Id, Revs}]} pairs; a pair
+%% appears under every shard returned for its id.
+group_idrevs_by_shard(DbName, IdsRevs) ->
+    AddPair = fun({Id, Revs}, ByShard) ->
+        AddToShard = fun(Shard, Acc) ->
+            dict:append(Shard, {Id, Revs}, Acc)
+        end,
+        lists:foldl(AddToShard, ByShard, mem3:shards(DbName, Id))
+    end,
+    Grouped = lists:foldl(AddPair, dict:new(), IdsRevs),
+    dict:to_list(Grouped).
+
+%% Store every {Id, MissingRevs, Ancestors} triple in the result dict as
+%% Id => {MissingRevs, Ancestors}, overwriting what was there before.
+update_dict(Dict, []) ->
+    Dict;
+update_dict(Dict, [{Id, Missing, Ancestors} | Rest]) ->
+    update_dict(dict:store(Id, {Missing, Ancestors}, Dict), Rest).
+
+%% Continue after a worker was lost. With no workers left to wait for, stop
+%% and reply with what is known (unreported ids count as missing, see
+%% force_reply/3); otherwise keep going with the given accumulator.
+skip_message({0, Dict, _Workers}) ->
+ {stop, dict:fold(fun force_reply/3, [], Dict)};
+skip_message(Acc) ->
+ {ok, Acc}.
diff --git a/deps/fabric/src/fabric_doc_open.erl b/deps/fabric/src/fabric_doc_open.erl
new file mode 100644
index 00000000..9e466b7a
--- /dev/null
+++ b/deps/fabric/src/fabric_doc_open.erl
@@ -0,0 +1,139 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_doc_open).
+
+-export([go/3]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+%% Open document Id by asking the shards that hold it and waiting for a
+%% read quorum. The workers are always asked with the `deleted` option;
+%% unless the caller passed `deleted` itself, a deleted document is
+%% reported as {not_found, deleted}. When the replies disagree, a
+%% read repair is run through fabric:open_revs/4 - in the background if a
+%% usable reply exists, synchronously otherwise.
+go(DbName, Id, Options) ->
+ Workers = fabric_util:submit_jobs(mem3:shards(DbName,Id), open_doc,
+ [Id, [deleted|Options]]),
+ SuppressDeletedDoc = not lists:member(deleted, Options),
+ N = mem3:n(DbName),
+ % the r option is carried as a string; it defaults to the db quorum
+ R = couch_util:get_value(r, Options, integer_to_list(mem3:quorum(DbName))),
+ % repairs read with r = N
+ RepairOpts = [{r, integer_to_list(N)} | Options],
+ % accumulator: {RemainingWorkers, Quorum (capped at N), RepliesSoFar}
+ Acc0 = {Workers, erlang:min(N, list_to_integer(R)), []},
+ RexiMon = fabric_util:create_monitors(Workers),
+ try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
+ {ok, Reply} ->
+ format_reply(Reply, SuppressDeletedDoc);
+ {error, needs_repair, Reply} ->
+ % a quorum reply exists; repair in the background and answer now
+ spawn(fabric, open_revs, [DbName, Id, all, RepairOpts]),
+ format_reply(Reply, SuppressDeletedDoc);
+ {error, needs_repair} ->
+ % we couldn't determine the correct reply, so we'll run a sync repair
+ {ok, Results} = fabric:open_revs(DbName, Id, all, RepairOpts),
+ % split into deleted and live revisions; the greatest element in term
+ % order of the preferred group is returned
+ case lists:partition(fun({ok, #doc{deleted=Del}}) -> Del end, Results) of
+ {[], []} ->
+ {not_found, missing};
+ {_DeletedDocs, []} when SuppressDeletedDoc ->
+ {not_found, deleted};
+ {DeletedDocs, []} ->
+ lists:last(lists:sort(DeletedDocs));
+ {_, LiveDocs} ->
+ lists:last(lists:sort(LiveDocs))
+ end;
+ Error ->
+ Error
+ after
+ rexi_monitor:stop(RexiMon)
+ end.
+
+%% Turn a reply carrying a deleted document into {not_found, deleted} when
+%% deleted documents are to be suppressed (second argument true).
+%% Every other reply is passed through unchanged.
+format_reply(Reply, SuppressDeletedDoc) ->
+    case {Reply, SuppressDeletedDoc} of
+        {{ok, #doc{deleted = true}}, true} ->
+            {not_found, deleted};
+        _ ->
+            Reply
+    end.
+
+%% Reply handler for go/3. The accumulator is {Workers, R, Replies}: the
+%% workers still expected to answer, the read quorum and the replies seen
+%% so far with their counts (maintained by fabric_util:update_counter/3).
+%% Returns {ok, Acc} to keep waiting, {stop, Reply} on full agreement,
+%% {error, needs_repair, Reply} when a quorum reply exists but other
+%% copies lag behind, and {error, needs_repair} when no reply can be trusted.
+handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Worker, {Workers, R, Replies}) ->
+ NewWorkers = lists:keydelete(NodeRef, #shard.node, Workers),
+ case NewWorkers of
+ [] ->
+ {error, needs_repair};
+ _ ->
+ {ok, {NewWorkers, R, Replies}}
+ end;
+handle_message({rexi_EXIT, _Reason}, Worker, Acc0) ->
+ skip_message(Worker, Acc0);
+handle_message(Reply, Worker, {Workers, R, Replies}) ->
+ NewReplies = fabric_util:update_counter(Reply, 1, Replies),
+ % look for the first reply that has been seen at least R times
+ case lists:dropwhile(fun({_,{_, Count}}) -> Count < R end, NewReplies) of
+ [{_,{QuorumReply, _}} | _] ->
+ fabric_util:cleanup(lists:delete(Worker,Workers)),
+ case {NewReplies, fabric_util:remove_ancestors(NewReplies, [])} of
+ {[_], [_]} ->
+ % complete agreement amongst all copies
+ {stop, QuorumReply};
+ {[_|_], [{_, {QuorumReply, _}}]} ->
+ % any divergent replies are ancestors of the QuorumReply
+ {error, needs_repair, QuorumReply};
+ _Else ->
+ % real disagreement amongst the workers, block for the repair
+ {error, needs_repair}
+ end;
+ [] ->
+ % no quorum yet; give up if this was the last worker
+ if length(Workers) =:= 1 ->
+ {error, needs_repair};
+ true ->
+ {ok, {lists:delete(Worker,Workers), R, NewReplies}}
+ end
+ end.
+
+%% Continue after a worker failed. If it was the only worker left, no
+%% quorum can be reached any more and a repair is requested; otherwise the
+%% worker is removed and the loop goes on.
+skip_message(_Worker, {[_LastWorker], _R, _Replies}) ->
+    {error, needs_repair};
+skip_message(Worker, {Workers, R, Replies}) ->
+    Remaining = lists:delete(Worker, Workers),
+    {ok, {Remaining, R, Replies}}.
+
+
+%% EUnit test for handle_message/3 with three workers and a read quorum of
+%% two. The StateN values are accumulators {Workers, R, Replies}; workers
+%% are represented by nil. Covers agreement, disagreement between
+%% unrelated revisions, descendants, and not_found replies.
+open_doc_test() ->
+ Foo1 = {ok, #doc{revs = {1,[<<"foo">>]}}},
+ Foo2 = {ok, #doc{revs = {2,[<<"foo2">>,<<"foo">>]}}},
+ Bar1 = {ok, #doc{revs = {1,[<<"bar">>]}}},
+ Baz1 = {ok, #doc{revs = {1,[<<"baz">>]}}},
+ NF = {not_found, missing},
+ State0 = {[nil, nil, nil], 2, []},
+ State1 = {[nil, nil], 2, [fabric_util:kv(Foo1,1)]},
+ State2 = {[nil], 2, [fabric_util:kv(Bar1,1), fabric_util:kv(Foo1,1)]},
+ State3 = {[nil], 2, [fabric_util:kv(Foo1,1), fabric_util:kv(Foo2,1)]},
+ ?assertEqual({ok, State1}, handle_message(Foo1, nil, State0)),
+
+ % normal case - quorum reached, no disagreement
+ ?assertEqual({stop, Foo1}, handle_message(Foo1, nil, State1)),
+
+ % 2nd worker disagrees, voting continues
+ ?assertEqual({ok, State2}, handle_message(Bar1, nil, State1)),
+
+ % 3rd worker resolves voting, but repair is needed
+ ?assertEqual({error, needs_repair}, handle_message(Foo1, nil, State2)),
+
+ % 2nd worker comes up with descendant of Foo1, voting continues
+ ?assertEqual({ok, State3}, handle_message(Foo2, nil, State1)),
+
+ % 3rd worker is also a descendant so run repair async
+ ?assertEqual({error, needs_repair, Foo2}, handle_message(Foo2, nil,
+ State3)),
+
+ % We only run async repair when every revision is part of the same branch
+ ?assertEqual({error, needs_repair}, handle_message(Bar1, nil, State3)),
+
+ % not_found is considered to be an ancestor of everybody
+ {ok, State4} = handle_message(NF, nil, State1),
+ ?assertEqual({error, needs_repair, Foo1}, handle_message(Foo1, nil,
+ State4)),
+
+ % 3 distinct edit branches result in quorum failure
+ ?assertEqual({error, needs_repair}, handle_message(Baz1, nil, State2)).
diff --git a/deps/fabric/src/fabric_doc_open_revs.erl b/deps/fabric/src/fabric_doc_open_revs.erl
new file mode 100644
index 00000000..395789ca
--- /dev/null
+++ b/deps/fabric/src/fabric_doc_open_revs.erl
@@ -0,0 +1,307 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_doc_open_revs).
+
+-export([go/4]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+%% Accumulator threaded through handle_message/3.
+%%   dbname       - database the revisions are read from
+%%   worker_count - number of workers started by go/4
+%%   workers      - workers that have not answered or failed yet
+%%   reply_count  - number of worker messages processed so far
+%%   r            - read quorum as an integer
+%%   revs         - the atom `all` or the explicit list of requested revs
+%%   latest       - whether the `latest` option was given
+%%   replies      - counted replies: one flat list for revs = all, otherwise
+%%                  one {Rev, Replies} entry per requested revision
+-record(state, {
+ dbname,
+ worker_count,
+ workers,
+ reply_count = 0,
+ r,
+ revs,
+ latest,
+ replies = []
+}).
+
+%% Open the requested revisions of document Id. Revs is either the atom
+%% `all` or a list of revisions. One open_revs job is submitted per shard
+%% holding the document and the replies are merged by handle_message/3.
+%% The read quorum comes from the `r` option (a string) and defaults to the
+%% database quorum. The rexi monitor is always stopped before returning.
+go(DbName, Id, Revs, Options) ->
+    Shards = mem3:shards(DbName, Id),
+    Workers = fabric_util:submit_jobs(Shards, open_revs, [Id, Revs, Options]),
+    R = couch_util:get_value(r, Options, integer_to_list(mem3:quorum(DbName))),
+    WorkerCount = length(Workers),
+    Quorum = list_to_integer(R),
+    Latest = lists:member(latest, Options),
+    % explicit revisions get one (initially empty) reply slot each
+    InitialReplies = case Revs of
+        all -> [];
+        _ -> [{Rev, []} || Rev <- Revs]
+    end,
+    State = #state{
+        dbname = DbName,
+        worker_count = WorkerCount,
+        workers = Workers,
+        r = Quorum,
+        revs = Revs,
+        latest = Latest,
+        replies = InitialReplies
+    },
+    RexiMon = fabric_util:create_monitors(Workers),
+    try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, State) of
+        {ok, {ok, Reply}} ->
+            {ok, Reply};
+        Other ->
+            Other
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+%% Reply handler for go/4. Worker failures (rexi_DOWN, rexi_EXIT) remove
+%% the worker and are then processed like an empty reply through skip/1.
+%% The two {ok, _} clauses handle revs = all and an explicit revision list.
+%% Both count the replies, reduce them with
+%% fabric_util:remove_ancestors/2, decide whether a read repair is needed
+%% and let maybe_reply/5 decide whether to answer now ({stop, Reply}) or
+%% wait for more workers ({ok, NewState}).
+handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Worker, #state{workers=Workers}=State) ->
+ NewWorkers = lists:keydelete(NodeRef, #shard.node, Workers),
+ skip(State#state{workers=NewWorkers});
+handle_message({rexi_EXIT, _}, Worker, #state{workers=Workers}=State) ->
+ skip(State#state{workers=lists:delete(Worker,Workers)});
+handle_message({ok, RawReplies}, Worker, #state{revs = all} = State) ->
+ #state{
+ dbname = DbName,
+ reply_count = ReplyCount,
+ worker_count = WorkerCount,
+ workers = Workers,
+ replies = All0,
+ r = R
+ } = State,
+ All = lists:foldl(fun(Reply,D) -> fabric_util:update_counter(Reply,1,D) end,
+ All0, RawReplies),
+ Reduced = fabric_util:remove_ancestors(All, []),
+ % Complete: this is the last worker message we can expect
+ Complete = (ReplyCount =:= (WorkerCount - 1)),
+ QuorumMet = lists:all(fun({_,{_, C}}) -> C >= R end, Reduced),
+ % no repair only if nothing was reduced away, every reply met quorum and
+ % this is exactly the R-th message
+ case Reduced of All when QuorumMet andalso ReplyCount =:= (R-1) ->
+ Repair = false;
+ _ ->
+ Repair = [D || {_,{{ok,D}, _}} <- Reduced]
+ end,
+ case maybe_reply(DbName, Reduced, Complete, Repair, R) of
+ noreply ->
+ {ok, State#state{replies = All, reply_count = ReplyCount+1,
+ workers = lists:delete(Worker,Workers)}};
+ {reply, FinalReply} ->
+ fabric_util:cleanup(lists:delete(Worker,Workers)),
+ {stop, FinalReply}
+ end;
+handle_message({ok, RawReplies0}, Worker, State) ->
+ % we've got an explicit revision list, but if latest=true the workers may
+ % return a descendant of the requested revision. Take advantage of the
+ % fact that revisions are returned in order to keep track.
+ RawReplies = strip_not_found_missing(RawReplies0),
+ #state{
+ dbname = DbName,
+ reply_count = ReplyCount,
+ worker_count = WorkerCount,
+ workers = Workers,
+ replies = All0,
+ r = R
+ } = State,
+ % replies are matched to the requested revisions by position; `error`
+ % is the placeholder skip/1 inserts for a failed worker
+ All = lists:zipwith(fun({Rev, D}, Reply) ->
+ if Reply =:= error -> {Rev, D}; true ->
+ {Rev, fabric_util:update_counter(Reply, 1, D)}
+ end
+ end, All0, RawReplies),
+ Reduced = [fabric_util:remove_ancestors(X, []) || {_, X} <- All],
+ FinalReplies = [choose_winner(X, R) || X <- Reduced, X =/= []],
+ Complete = (ReplyCount =:= (WorkerCount - 1)),
+ case is_repair_needed(All, FinalReplies) of
+ true ->
+ Repair = [D || {_,{{ok,D}, _}} <- lists:flatten(Reduced)];
+ false ->
+ Repair = false
+ end,
+ case maybe_reply(DbName, FinalReplies, Complete, Repair, R) of
+ noreply ->
+ {ok, State#state{replies = All, reply_count = ReplyCount+1,
+ workers=lists:delete(Worker,Workers)}};
+ {reply, FinalReply} ->
+ fabric_util:cleanup(lists:delete(Worker,Workers)),
+ {stop, FinalReply}
+ end.
+
+%% Account for a worker that failed by feeding handle_message/3 an empty
+%% answer on its behalf: no revisions at all for revs = all, or one
+%% `error` placeholder per requested revision otherwise.
+skip(#state{revs = all} = State) ->
+    handle_message({ok, []}, nil, State);
+skip(#state{revs = Revs} = State) ->
+    Placeholders = lists:map(fun(_Rev) -> error end, Revs),
+    handle_message({ok, Placeholders}, nil, State).
+
+%% Decide whether the caller can be answered now. With no replies and
+%% workers still outstanding there is nothing to say. Otherwise a reply is
+%% produced once all workers have been heard (Complete) or every reply has
+%% reached the quorum R; read repair runs first when RepairDocs is not
+%% `false`. Returns noreply or {reply, Replies}.
+maybe_reply(_, [], false, _, _) ->
+    noreply;
+maybe_reply(DbName, ReplyDict, Complete, RepairDocs, R) ->
+    Ready = Complete orelse
+        lists:all(fun({_, {_, Count}}) -> Count >= R end, ReplyDict),
+    if
+        Ready ->
+            maybe_execute_read_repair(DbName, RepairDocs),
+            {reply, unstrip_not_found_missing(extract_replies(ReplyDict))};
+        true ->
+            noreply
+    end.
+
+%% Strip the bookkeeping from counted replies: every {Key, {Reply, Count}}
+%% entry becomes just Reply, in the same order.
+extract_replies([]) ->
+    [];
+extract_replies([{_Key, {Reply, _Count}} | Rest]) ->
+    [Reply | extract_replies(Rest)].
+
+%% Pick the reply to report for one revision. The first reply seen at least
+%% R times wins. Without such a reply, the greatest {ok, #doc{}} entry in
+%% term order is used, and if there is no document at all, the first
+%% entry. Options must not be empty.
+choose_winner(Options, R) ->
+    BelowQuorum = fun({_, {_Reply, Count}}) -> Count < R end,
+    case lists:dropwhile(BelowQuorum, Options) of
+        [QuorumMet | _] ->
+            QuorumMet;
+        [] ->
+            case [Elem || {_, {{ok, #doc{}}, _}} = Elem <- Options] of
+                [] ->
+                    hd(Options);
+                Docs ->
+                    lists:last(lists:sort(Docs))
+            end
+    end.
+
+% repair needed if any reply other than the winner has been received for a rev
+%% Walks the per-revision replies and the chosen winners in lockstep.
+%% Returns false only if every revision has exactly one reply and that
+%% reply is its winner; any other shape, including lists of different
+%% lengths, means repair is needed.
+is_repair_needed(All, Winners) ->
+    case {All, Winners} of
+        {[], []} ->
+            false;
+        {[{_Rev, [Reply]} | MoreRevs], [Reply | MoreWinners]} ->
+            is_repair_needed(MoreRevs, MoreWinners);
+        _ ->
+            true
+    end.
+
+%% Write the given documents back to the cluster as replicated changes so
+%% that lagging copies catch up, and log the outcome.
+%%   Db   - database name
+%%   Docs - `false` when no repair is needed, otherwise the list of #doc{}
+%%          records to write
+%% Returns ok when there is nothing to repair, otherwise the result of the
+%% twig:log/3 call.
+maybe_execute_read_repair(_Db, false) ->
+    ok;
+maybe_execute_read_repair(_Db, []) ->
+    % Repair was requested but no reply carried a document (for instance
+    % every worker returned an empty revision list). There is nothing to
+    % write, and matching the head of the list below would crash the
+    % coordinator with a badmatch.
+    ok;
+maybe_execute_read_repair(Db, [#doc{id = Id} | _] = Docs) ->
+    % the update runs with the _admin role
+    Ctx = #user_ctx{roles = [<<"_admin">>]},
+    Res = fabric:update_docs(Db, Docs, [replicated_changes, {user_ctx, Ctx}]),
+    twig:log(notice, "read_repair ~s ~s ~p", [Db, Id, Res]).
+
+% hackery required so that not_found sorts first
+%% Rewrite every {{not_found, missing}, Rev} reply to {not_found, Rev};
+%% all other replies are kept as they are.
+strip_not_found_missing(Replies) ->
+    lists:map(
+        fun
+            ({{not_found, missing}, Rev}) -> {not_found, Rev};
+            (Other) -> Other
+        end,
+        Replies).
+
+%% Inverse of strip_not_found_missing/1: rewrite every {not_found, Rev}
+%% reply back to {{not_found, missing}, Rev}; all other replies are kept
+%% as they are.
+unstrip_not_found_missing(Replies) ->
+    lists:map(
+        fun
+            ({not_found, Rev}) -> {{not_found, missing}, Rev};
+            (Other) -> Other
+        end,
+        Replies).
+
+%% EUnit test for handle_message/3 with revs = all, three workers and a
+%% read quorum of two. The fabric module is mocked with meck so that read
+%% repair (fabric:update_docs/3) does not touch a real cluster; workers are
+%% represented by nil.
+all_revs_test() ->
+ couch_config:start_link([]),
+ meck:new(fabric),
+ meck:expect(fabric, dbname, fun(Name) -> Name end),
+ meck:expect(fabric, update_docs, fun(_, _, _) -> {ok, nil} end),
+ State0 = #state{worker_count = 3, workers=[nil,nil,nil], r = 2, revs = all},
+ Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}},
+ Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}},
+ Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}},
+
+ % an empty worker response does not count as meeting quorum
+ ?assertMatch(
+ {ok, #state{workers=[nil,nil]}},
+ handle_message({ok, []}, nil, State0)
+ ),
+
+ ?assertMatch(
+ {ok, #state{workers=[nil, nil]}},
+ handle_message({ok, [Foo1, Bar1]}, nil, State0)
+ ),
+ {ok, State1} = handle_message({ok, [Foo1, Bar1]}, nil, State0),
+
+ % the normal case - workers agree
+ ?assertEqual(
+ {stop, [Bar1, Foo1]},
+ handle_message({ok, [Foo1, Bar1]}, nil, State1)
+ ),
+
+ % a case where the 2nd worker has a newer Foo - currently we're considering
+ % Foo to have reached quorum and execute_read_repair()
+ ?assertEqual(
+ {stop, [Bar1, Foo2]},
+ handle_message({ok, [Foo2, Bar1]}, nil, State1)
+ ),
+
+ % a case where quorum has not yet been reached for Foo
+ ?assertMatch(
+ {ok, #state{}},
+ handle_message({ok, [Bar1]}, nil, State1)
+ ),
+ {ok, State2} = handle_message({ok, [Bar1]}, nil, State1),
+
+ % still no quorum, but all workers have responded. We include Foo1 in the
+ % response and execute_read_repair()
+ ?assertEqual(
+ {stop, [Bar1, Foo1]},
+ handle_message({ok, [Bar1]}, nil, State2)
+ ),
+ % NOTE(review): the cleanup below is skipped if an assertion above fails
+ meck:unload(fabric),
+ couch_config:stop().
+
+%% EUnit test for handle_message/3 with an explicit list of three
+%% revisions, three workers and a read quorum of two. Covers agreement,
+%% latest = true returning a descendant, not_found replies and a failed
+%% worker. The fabric module is mocked with meck; workers are represented
+%% by nil.
+specific_revs_test() ->
+ couch_config:start_link([]),
+ meck:new(fabric),
+ meck:expect(fabric, dbname, fun(Name) -> Name end),
+ meck:expect(fabric, update_docs, fun(_, _, _) -> {ok, nil} end),
+ Revs = [{1,<<"foo">>}, {1,<<"bar">>}, {1,<<"baz">>}],
+ State0 = #state{
+ worker_count = 3,
+ workers = [nil, nil, nil],
+ r = 2,
+ revs = Revs,
+ latest = false,
+ replies = [{Rev,[]} || Rev <- Revs]
+ },
+ Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}},
+ Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}},
+ Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}},
+ Baz1 = {{not_found, missing}, {1,<<"baz">>}},
+ Baz2 = {ok, #doc{revs = {1, [<<"baz">>]}}},
+
+ ?assertMatch(
+ {ok, #state{}},
+ handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State0)
+ ),
+ {ok, State1} = handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State0),
+
+ % the normal case - workers agree
+ ?assertEqual(
+ {stop, [Foo1, Bar1, Baz1]},
+ handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State1)
+ ),
+
+ % latest=true, worker responds with Foo2 and we return it
+ State0L = State0#state{latest = true},
+ ?assertMatch(
+ {ok, #state{}},
+ handle_message({ok, [Foo2, Bar1, Baz1]}, nil, State0L)
+ ),
+ {ok, State1L} = handle_message({ok, [Foo2, Bar1, Baz1]}, nil, State0L),
+ ?assertEqual(
+ {stop, [Foo2, Bar1, Baz1]},
+ handle_message({ok, [Foo2, Bar1, Baz1]}, nil, State1L)
+ ),
+
+ % Foo1 is included in the read quorum for Foo2
+ ?assertEqual(
+ {stop, [Foo2, Bar1, Baz1]},
+ handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State1L)
+ ),
+
+ % {not_found, missing} is included in the quorum for any found revision
+ ?assertEqual(
+ {stop, [Foo2, Bar1, Baz2]},
+ handle_message({ok, [Foo2, Bar1, Baz2]}, nil, State1L)
+ ),
+
+ % a worker failure is skipped
+ ?assertMatch(
+ {ok, #state{}},
+ handle_message({rexi_EXIT, foo}, nil, State1L)
+ ),
+ {ok, State2L} = handle_message({rexi_EXIT, foo}, nil, State1L),
+ ?assertEqual(
+ {stop, [Foo2, Bar1, Baz2]},
+ handle_message({ok, [Foo2, Bar1, Baz2]}, nil, State2L)
+ ),
+ % NOTE(review): the cleanup below is skipped if an assertion above fails
+ meck:unload(fabric),
+ couch_config:stop().
diff --git a/deps/fabric/src/fabric_doc_update.erl b/deps/fabric/src/fabric_doc_update.erl
new file mode 100644
index 00000000..3ac4e185
--- /dev/null
+++ b/deps/fabric/src/fabric_doc_update.erl
@@ -0,0 +1,297 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_doc_update).
+
+-export([go/3]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+% Writes AllDocs to all owning shard copies and waits for a write quorum per doc.
+% Returns {Health, Results}, Health being ok | accepted | error, in AllDocs order.
+go(_, [], _) ->
+    {ok, []};
+go(DbName, AllDocs, Opts) ->
+    validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, Opts)),
+    Options = lists:delete(all_or_nothing, Opts),
+    GroupedDocs = lists:map(fun({#shard{name=Name, node=Node} = Shard, Docs}) ->
+        Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
+        {Shard#shard{ref=Ref}, Docs}
+    end, group_docs_by_shard(DbName, AllDocs)),
+    {Workers, _} = lists:unzip(GroupedDocs),
+    RexiMon = fabric_util:create_monitors(Workers),
+    W = couch_util:get_value(w, Options, integer_to_list(mem3:quorum(DbName))),
+    Acc0 = {length(Workers), length(AllDocs), list_to_integer(W), GroupedDocs,
+        dict:from_list([{Doc,[]} || Doc <- AllDocs])},
+    Sort = fun(Rs) -> [R || R <- couch_util:reorder_results(AllDocs, Rs), R =/= noreply] end,
+    try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
+    {ok, {Health, Results}} when Health =:= ok; Health =:= accepted; Health =:= error ->
+        {Health, Sort(Results)}; % error health is produced by force_reply/3
+    {timeout, {_, _, W1, _, DocReplDict}} ->
+        {Health, _, Resp} = dict:fold(fun force_reply/3, {ok, W1, []}, DocReplDict),
+        {Health, Sort(Resp)};
+    Else -> Else
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+% Folds one worker message into the accumulator {WaitingCount, DocCount, W,
+% GroupedDocs, DocReplyDict}; returns {ok, Acc} or {stop, {Health, Replies}}.
+handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Worker, Acc0) ->
+    {_, LenDocs, W, GroupedDocs, DocReplyDict} = Acc0,
+    NewGrpDocs = [X || {#shard{node=N}, _} = X <- GroupedDocs, N =/= NodeRef],
+    skip_message({length(NewGrpDocs), LenDocs, W, NewGrpDocs, DocReplyDict});
+handle_message({rexi_EXIT, _}, Worker, Acc0) ->
+    {WC,LenDocs,W,GrpDocs,DocReplyDict} = Acc0,
+    skip_message({WC-1,LenDocs,W,lists:keydelete(Worker,1,GrpDocs),DocReplyDict});
+handle_message(internal_server_error, Worker, Acc0) ->
+    % happens when we fail to load validation functions in an RPC worker
+    {WC,LenDocs,W,GrpDocs,DocReplyDict} = Acc0,
+    skip_message({WC-1,LenDocs,W,lists:keydelete(Worker,1,GrpDocs),DocReplyDict});
+handle_message({ok, Replies}, Worker, Acc0) ->
+    {WaitingCount, DocCount, W, GroupedDocs, DocReplyDict0} = Acc0,
+    {value, {_, Docs}, NewGrpDocs} = lists:keytake(Worker, 1, GroupedDocs),
+    DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0),
+    case {WaitingCount, dict:size(DocReplyDict)} of
+    {1, _} ->
+        % last message has arrived, we need to conclude things
+        {Health, W, Reply} = dict:fold(fun force_reply/3, {ok, W, []}, DocReplyDict),
+        {stop, {Health, Reply}};
+    {_, DocCount} ->
+        % we've got at least one reply for each document, let's take a look
+        case dict:fold(fun maybe_reply/3, {stop,W,[]}, DocReplyDict) of
+        continue ->
+            {ok, {WaitingCount - 1, DocCount, W, NewGrpDocs, DocReplyDict}};
+        {stop, W, FinalReplies} ->
+            {stop, {ok, FinalReplies}}
+        end;
+    _ ->
+        % dict holds fewer keys than DocCount (a doc listed twice): keep waiting
+        {ok, {WaitingCount - 1, DocCount, W, NewGrpDocs, DocReplyDict}}
+    end;
+handle_message({missing_stub, Stub}, _, _) ->
+    throw({missing_stub, Stub});
+handle_message({not_found, no_db_file} = X, Worker, Acc0) ->
+    Docs = couch_util:get_value(Worker, element(4, Acc0)),
+    handle_message({ok, [X || _D <- Docs]}, Worker, Acc0).
+
+% dict:fold/3 callback used when no more replies will arrive: settles Doc from
+% the replies received so far and threads {Health, W, Results} along.
+force_reply(Doc, [], {_, W, Acc}) ->
+    {error, W, [{Doc, {error, internal_server_error}} | Acc]};
+force_reply(Doc, [FirstReply|_] = Replies, {Health, W, Acc}) ->
+    case update_quorum_met(W, Replies) of
+    {true, Reply} ->
+        {Health, W, [{Doc,Reply} | Acc]};
+    false ->
+        twig:log(warn, "write quorum (~p) failed for ~s", [W, Doc#doc.id]),
+        case {[Rev || {ok, Rev} <- Replies], Health} of
+        {[AcceptedRev | _], ok} ->
+            {accepted, W, [{Doc, {accepted,AcceptedRev}} | Acc]};
+        {[AcceptedRev | _], _} ->
+            {Health, W, [{Doc, {accepted,AcceptedRev}} | Acc]};
+        {[], _} ->
+            % only errors: keep Health when they are all identical
+            Same = lists:all(fun(E) -> E =:= FirstReply end, Replies),
+            NewHealth = if Same -> Health; true -> error end,
+            {NewHealth, W, [{Doc, FirstReply} | Acc]}
+        end
+    end.
+
+% dict:fold/3 callback: collects the quorum reply of every doc, or collapses
+% to 'continue' as soon as one doc has not reached quorum yet.
+maybe_reply(Doc, Replies, {stop, W, Acc}) ->
+    case update_quorum_met(W, Replies) of
+        {true, Reply} -> {stop, W, [{Doc, Reply} | Acc]};
+        false -> continue
+    end;
+maybe_reply(_Doc, _Replies, continue) ->
+    % an earlier doc already missed quorum, nothing left to decide
+    continue.
+
+% Returns {true, Reply} for the first reply (in term order) that was given at
+% least W times, or false when no reply has been seen that often.
+update_quorum_met(W, Replies) ->
+    Tally = lists:foldl(fun(Reply, Acc) -> orddict:update_counter(Reply, 1, Acc) end,
+        orddict:new(), Replies),
+    case [Reply || {Reply, Count} <- Tally, Count >= W] of
+        [] -> false;
+        [Winner | _] -> {true, Winner}
+    end.
+
+-spec group_docs_by_shard(binary(), [#doc{}]) -> [{#shard{}, [#doc{}]}].
+% Maps every shard returned by mem3:shards/2 for a doc id to the docs it gets.
+group_docs_by_shard(DbName, Docs) ->
+    Pairs = [{Shard, Doc} || Doc <- Docs, Shard <- mem3:shards(DbName, Doc#doc.id)],
+    dict:to_list(lists:foldl(fun({Shard, Doc}, Acc) ->
+        dict:append(Shard, Doc, Acc)
+    end, dict:new(), Pairs)).
+
+% Appends the reply at each doc's position to that doc's entry in the dict.
+append_update_replies([], [], Dict) -> Dict;
+append_update_replies([Doc|Docs], [], Dict) ->
+    % icky, if replicated_changes only errors show up in result
+    append_update_replies(Docs, [], dict:append(Doc, noreply, Dict));
+append_update_replies([Doc|Docs], [Reply|Replies], Dict) ->
+    % TODO what if the same document shows up twice in one update_docs call?
+    append_update_replies(Docs, Replies, dict:append(Doc, Reply, Dict)).
+
+% No workers left (count 0): settle every doc now; otherwise keep waiting.
+skip_message({0, _, W, _, Replies}) ->
+    {Health, W, Results} = dict:fold(fun force_reply/3, {ok, W, []}, Replies),
+    {stop, {Health, Results}};
+skip_message(Acc) -> {ok, Acc}.
+
+% Aborts (throws {aborted, Failures}) whenever all_or_nothing is requested.
+validate_atomic_update(_, _, false) ->
+    ok;
+validate_atomic_update(_DbName, AllDocs, true) ->
+    % TODO actually perform the validation. This requires some hackery, we need
+    % to basically extract the prep_and_validate_updates function from couch_db
+    % and only run that, without actually writing in case of a success.
+    Error = {not_implemented, <<"all_or_nothing is not supported yet">>},
+    LeafRev = fun([]) -> <<>>; ([RevId|_]) -> RevId end,
+    throw({aborted, lists:map(fun(#doc{id=Id, revs = {Pos,Revs}}) ->
+        {{Id, {Pos, LeafRev(Revs)}}, Error}
+    end, AllDocs)}).
+
+% eunits
+doc_update1_test() ->
+    Doc1 = #doc{revs = {1,[<<"foo">>]}},
+    Doc2 = #doc{revs = {1,[<<"bar">>]}},
+    Docs = [Doc1],
+    Docs2 = [Doc2, Doc1],
+    Dict = dict:from_list([{Doc,[]} || Doc <- Docs]),
+    Dict2 = dict:from_list([{Doc,[]} || Doc <- Docs2]),
+
+    Shards =
+        mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
+    GroupedDocs = group_docs_by_shard_hack(<<"foo">>,Shards,Docs),
+
+    % assertions below follow ?assertEqual(Expected, Actual)
+    % test for W = 2
+    AccW2 = {length(Shards), length(Docs), list_to_integer("2"), GroupedDocs,
+        Dict},
+
+    {ok,{WaitingCountW2_1,_,_,_,_}=AccW2_1} =
+        handle_message({ok, [{ok, Doc1}]},hd(Shards),AccW2),
+    ?assertEqual(2, WaitingCountW2_1),
+    {stop, FinalReplyW2 } =
+        handle_message({ok, [{ok, Doc1}]},lists:nth(2,Shards),AccW2_1),
+    ?assertEqual({ok, [{Doc1, {ok,Doc1}}]},FinalReplyW2),
+
+    % test for W = 3
+    AccW3 = {length(Shards), length(Docs), list_to_integer("3"), GroupedDocs,
+        Dict},
+
+    {ok,{WaitingCountW3_1,_,_,_,_}=AccW3_1} =
+        handle_message({ok, [{ok, Doc1}]},hd(Shards),AccW3),
+    ?assertEqual(2, WaitingCountW3_1),
+
+    {ok,{WaitingCountW3_2,_,_,_,_}=AccW3_2} =
+        handle_message({ok, [{ok, Doc1}]},lists:nth(2,Shards),AccW3_1),
+    ?assertEqual(1, WaitingCountW3_2),
+
+    {stop, FinalReplyW3 } =
+        handle_message({ok, [{ok, Doc1}]},lists:nth(3,Shards),AccW3_2),
+    ?assertEqual({ok, [{Doc1, {ok,Doc1}}]},FinalReplyW3),
+
+    % test w quorum > # shards, which should fail immediately
+
+    Shards2 = mem3_util:create_partition_map("foo",1,1,["node1"]),
+    GroupedDocs2 = group_docs_by_shard_hack(<<"foo">>,Shards2,Docs),
+
+    AccW4 =
+        {length(Shards2), length(Docs), list_to_integer("2"), GroupedDocs2, Dict},
+    % a single worker means the first reply is also the last one, so the
+    % coordinator has to stop although W = 2 can never be met; asserting
+    % on the tuple itself shows the actual value when the test fails
+    ?assertMatch(
+        {stop, _Reply},
+        handle_message({ok, [{ok, Doc1}]},hd(Shards2),AccW4)
+    ),
+
+    % Docs with no replies should end up as {error, internal_server_error}
+    SA1 = #shard{node=a, range=1},
+    SB1 = #shard{node=b, range=1},
+    SA2 = #shard{node=a, range=2},
+    SB2 = #shard{node=b, range=2},
+    GroupedDocs3 = [{SA1,[Doc1]}, {SB1,[Doc1]}, {SA2,[Doc2]}, {SB2,[Doc2]}],
+    StW5_0 = {length(GroupedDocs3), length(Docs2), 2, GroupedDocs3, Dict2},
+    {ok, StW5_1} = handle_message({ok, [{ok, "A"}]}, SA1, StW5_0),
+    {ok, StW5_2} = handle_message({rexi_EXIT, nil}, SB1, StW5_1),
+    {ok, StW5_3} = handle_message({rexi_EXIT, nil}, SA2, StW5_2),
+    {stop, ReplyW5} = handle_message({rexi_EXIT, nil}, SB2, StW5_3),
+    ?assertEqual(
+        {error, [{Doc1,{accepted,"A"}},{Doc2,{error,internal_server_error}}]},
+        ReplyW5
+    ).
+
+
+% One ok reply and two worker exits with W = 2: every doc is only accepted.
+doc_update2_test() ->
+    Doc1 = #doc{revs = {1,[<<"foo">>]}},
+    Doc2 = #doc{revs = {1,[<<"bar">>]}},
+    Docs = [Doc2, Doc1],
+    Shards =
+        mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
+    GroupedDocs = group_docs_by_shard_hack(<<"foo">>,Shards,Docs),
+    Acc0 = {length(Shards), length(Docs), list_to_integer("2"), GroupedDocs,
+        dict:from_list([{Doc,[]} || Doc <- Docs])},
+
+    {ok,{WaitingCount1,_,_,_,_}=Acc1} =
+        handle_message({ok, [{ok, Doc1},{ok, Doc2}]},hd(Shards),Acc0),
+    ?assertEqual(2, WaitingCount1),
+
+    {ok,{WaitingCount2,_,_,_,_}=Acc2} =
+        handle_message({rexi_EXIT, 1},lists:nth(2,Shards),Acc1),
+    ?assertEqual(1, WaitingCount2),
+
+    {stop, Reply} =
+        handle_message({rexi_EXIT, 1},lists:nth(3,Shards),Acc2),
+    ?assertEqual({accepted, [{Doc1,{accepted,Doc2}}, {Doc2,{accepted,Doc1}}]},
+        Reply).
+
+% Two ok replies around one worker exit with W = 2: quorum is met for all docs.
+doc_update3_test() ->
+    Doc1 = #doc{revs = {1,[<<"foo">>]}},
+    Doc2 = #doc{revs = {1,[<<"bar">>]}},
+    Docs = [Doc2, Doc1],
+    Shards =
+        mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
+    GroupedDocs = group_docs_by_shard_hack(<<"foo">>,Shards,Docs),
+    Acc0 = {length(Shards), length(Docs), list_to_integer("2"), GroupedDocs,
+        dict:from_list([{Doc,[]} || Doc <- Docs])},
+
+    {ok,{WaitingCount1,_,_,_,_}=Acc1} =
+        handle_message({ok, [{ok, Doc1},{ok, Doc2}]},hd(Shards),Acc0),
+    ?assertEqual(2, WaitingCount1),
+
+    {ok,{WaitingCount2,_,_,_,_}=Acc2} =
+        handle_message({rexi_EXIT, 1},lists:nth(2,Shards),Acc1),
+    ?assertEqual(1, WaitingCount2),
+
+    {stop, Reply} =
+        handle_message({ok, [{ok, Doc1},{ok, Doc2}]},lists:nth(3,Shards),Acc2),
+    ?assertEqual({ok, [{Doc1, {ok, Doc2}},{Doc2, {ok,Doc1}}]},Reply).
+
+% needed for testing to avoid having to start the mem3 application:
+% every doc is simply assigned to every shard in Shards
+group_docs_by_shard_hack(_DbName, Shards, Docs) ->
+    AddToAll = fun(#doc{} = Doc, ByShard) ->
+        lists:foldl(fun(Shard, Acc) -> dict:append(Shard, Doc, Acc) end, ByShard, Shards)
+    end,
+    dict:to_list(lists:foldl(AddToAll, dict:new(), Docs)).
diff --git a/deps/fabric/src/fabric_group_info.erl b/deps/fabric/src/fabric_group_info.erl
new file mode 100644
index 00000000..27b0f839
--- /dev/null
+++ b/deps/fabric/src/fabric_group_info.erl
@@ -0,0 +1,100 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_group_info).
+
+-export([go/2]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+go(DbName, GroupId) when is_binary(GroupId) -> % given a design doc id: open the doc, then continue below
+ {ok, DDoc} = fabric:open_doc(DbName, GroupId, []),
+ go(DbName, DDoc);
+% Submits a group_info job to every shard of DbName and folds the replies with handle_message/3.
+go(DbName, #doc{} = DDoc) ->
+ Group = couch_view_group:design_doc_to_view_group(DDoc),
+ Shards = mem3:shards(DbName),
+ Workers = fabric_util:submit_jobs(Shards, group_info, [Group]),
+ RexiMon = fabric_util:create_monitors(Shards),
+ Acc0 = {fabric_dict:init(Workers, nil), []}, % every worker starts as nil, i.e. not heard from yet
+ try
+ fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0)
+ after
+ rexi_monitor:stop(RexiMon) % monitors are stopped whether or not recv succeeded
+ end.
+
+% recv callback; state is {Counters, Acc}, Counters holding nil per unanswered worker.
+handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) ->
+    case fabric_util:remove_down_workers(Counters, NodeRef) of
+    {ok, NewCounters} ->
+        {ok, {NewCounters, Acc}};
+    error ->
+        {error, {nodedown, <<"progress not possible">>}}
+    end;
+handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) ->
+    NewCounters = lists:keydelete(Shard,1,Counters),
+    % progress has to be judged without the worker that just exited
+    case fabric_view:is_progress_possible(NewCounters) of
+    true ->
+        {ok, {NewCounters, Acc}};
+    false ->
+        {error, Reason}
+    end;
+handle_message({ok, Info}, Shard, {Counters, Acc}) ->
+    case fabric_dict:lookup_element(Shard, Counters) of
+    undefined ->
+        % already heard from someone else in this range
+        {ok, {Counters, Acc}};
+    nil ->
+        C1 = fabric_dict:store(Shard, ok, Counters),
+        C2 = fabric_view:remove_overlapping_shards(Shard, C1),
+        case fabric_dict:any(nil, C2) of
+        true ->
+            {ok, {C2, [Info|Acc]}};
+        false ->
+            % no worker is left unanswered: merge everything collected
+            {stop, merge_results(lists:flatten([Info|Acc]))}
+        end
+    end;
+handle_message(_, _, Acc) -> {ok, Acc}.
+
+% Merges the {Key, Value} pairs reported by all shards into one proplist:
+% first value for signature/language, sums for sizes, counts and seqs,
+% 'any true' for the running/waiting flags; all other keys are dropped.
+merge_results(Info) ->
+    Grouped = lists:foldl(fun({K,V}, Acc) -> orddict:append(K,V,Acc) end,
+        orddict:new(), Info),
+    First = fun([X|_]) -> X end,
+    Sum = fun lists:sum/1,
+    Any = fun(Xs) -> lists:member(true, Xs) end,
+    Mergers = [
+        {signature, First},
+        {language, First},
+        {disk_size, Sum},
+        {data_size, Sum},
+        {compact_running, Any},
+        {updater_running, Any},
+        {waiting_commit, Any},
+        {waiting_clients, Sum},
+        {update_seq, Sum},
+        {purge_seq, Sum}
+    ],
+    orddict:fold(fun(Key, Values, Acc) ->
+        case lists:keyfind(Key, 1, Mergers) of
+            {Key, Merge} -> [{Key, Merge(Values)} | Acc];
+            false -> Acc
+        end
+    end, [], Grouped).
diff --git a/deps/fabric/src/fabric_rpc.erl b/deps/fabric/src/fabric_rpc.erl
new file mode 100644
index 00000000..3f25dfd7
--- /dev/null
+++ b/deps/fabric/src/fabric_rpc.erl
@@ -0,0 +1,485 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_rpc).
+
+-export([get_db_info/1, get_doc_count/1, get_update_seq/1]).
+-export([open_doc/3, open_revs/4, get_missing_revs/2, get_missing_revs/3,
+ update_docs/3]).
+-export([all_docs/2, changes/3, map_view/4, reduce_view/4, group_info/2]).
+-export([create_db/1, delete_db/1, reset_validation_funs/1, set_security/3,
+ set_revs_limit/3, create_shard_db_doc/2, delete_shard_db_doc/2]).
+
+-include("fabric.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-record (view_acc, { % state threaded through the view_fold/3 and reduce_fold/3 callbacks
+ db, % db handle, used by view_fold/3 to open docs for include_docs
+ limit, % rows still to send; callers initialise it to limit+skip
+ include_docs,
+ conflicts,
+ doc_info = nil, % #doc_info{} of the current row, set by the #full_doc_info{} clause of view_fold/3
+ offset = nil, % nil until total_and_offset has been sent to the coordinator
+ total_rows,
+ reduce_fun = fun couch_db:enum_docs_reduce_to_count/1, % applied to the fold's reductions to get the row offset
+ group_level = 0 % 0, exact or an integer level, see reduce_fold/3
+}).
+
+%% rpc endpoints
+%% call to with_db will supply your M:F with a #db{} and then remaining args
+
+all_docs(DbName, #view_query_args{keys=nil} = QueryArgs) -> % only keys=nil queries are handled by this clause
+ {ok, Db} = get_or_create_db(DbName, []),
+ #view_query_args{
+ start_key = StartKey,
+ start_docid = StartDocId,
+ end_key = EndKey,
+ end_docid = EndDocId,
+ limit = Limit,
+ skip = Skip,
+ include_docs = IncludeDocs,
+ conflicts = Conflicts,
+ direction = Dir,
+ inclusive_end = Inclusive,
+ extra = Extra
+ } = QueryArgs,
+ set_io_priority(DbName, Extra),
+ {ok, Total} = couch_db:get_doc_count(Db),
+ Acc0 = #view_acc{
+ db = Db,
+ include_docs = IncludeDocs,
+ conflicts = Conflicts,
+ limit = Limit+Skip, % the fold stops after limit+skip local rows (see view_fold/3)
+ total_rows = Total
+ },
+ EndKeyType = if Inclusive -> end_key; true -> end_key_gt end, % a non-inclusive end is passed as end_key_gt
+ Options = [
+ {dir, Dir},
+ {start_key, if is_binary(StartKey) -> StartKey; true -> StartDocId end}, % non-binary key: fall back to the docid bound
+ {EndKeyType, if is_binary(EndKey) -> EndKey; true -> EndDocId end}
+ ],
+ {ok, _, Acc} = couch_db:enum_docs(Db, fun view_fold/3, Acc0, Options), % rows are sent from inside view_fold/3
+ final_response(Total, Acc#view_acc.offset).
+
+changes(DbName, Args, StartSeq) -> % folds the shard's changes since StartSeq, then replies {complete, LastSeq}
+ erlang:put(io_priority, {interactive, DbName}),
+ #changes_args{dir=Dir} = Args,
+ case get_or_create_db(DbName, []) of
+ {ok, Db} ->
+ Enum = fun changes_enumerator/2,
+ Opts = [{dir,Dir}],
+ Acc0 = {Db, StartSeq, Args},
+ try
+ {ok, {_, LastSeq, _}} =
+ couch_db:changes_since(Db, StartSeq, Enum, Opts, Acc0),
+ rexi:reply({complete, LastSeq})
+ after
+ couch_db:close(Db) % the handle is closed whether or not the fold succeeded
+ end;
+ Error ->
+ rexi:reply(Error) % anything but {ok, Db} is sent back as the reply
+ end.
+
+map_view(DbName, DDoc, ViewName, QueryArgs) -> % sends the matching rows of a map view on this shard via view_fold/3
+ {ok, Db} = get_or_create_db(DbName, []),
+ #view_query_args{
+ limit = Limit,
+ skip = Skip,
+ keys = Keys,
+ include_docs = IncludeDocs,
+ conflicts = Conflicts,
+ stale = Stale,
+ view_type = ViewType,
+ extra = Extra
+ } = QueryArgs,
+ set_io_priority(DbName, Extra),
+ {LastSeq, MinSeq} = calculate_seqs(Db, Stale),
+ Group0 = couch_view_group:design_doc_to_view_group(DDoc),
+ {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
+ {ok, Group} = couch_view_group:request_group(Pid, MinSeq), % MinSeq is 0 for stale requests (see calculate_seqs/2)
+ maybe_update_view_group(Pid, LastSeq, Stale),
+ erlang:monitor(process, Group#group.fd), % NOTE(review): no 'DOWN' handling is visible here - confirm who consumes it
+ View = fabric_view:extract_view(Pid, ViewName, Group#group.views, ViewType),
+ {ok, Total} = couch_view:get_row_count(View),
+ Acc0 = #view_acc{
+ db = Db,
+ include_docs = IncludeDocs,
+ conflicts = Conflicts,
+ limit = Limit+Skip, % the fold stops after limit+skip local rows (see view_fold/3)
+ total_rows = Total,
+ reduce_fun = fun couch_view:reduce_to_count/1
+ },
+ case Keys of
+ nil ->
+ Options = couch_httpd_view:make_key_options(QueryArgs),
+ {ok, _, Acc} = couch_view:fold(View, fun view_fold/3, Acc0, Options);
+ _ ->
+ Acc = lists:foldl(fun(Key, AccIn) -> % one fold per key, with the accumulator carried across keys
+ KeyArgs = QueryArgs#view_query_args{start_key=Key, end_key=Key},
+ Options = couch_httpd_view:make_key_options(KeyArgs),
+ {_Go, _, Out} = couch_view:fold(View, fun view_fold/3, AccIn,
+ Options),
+ Out
+ end, Acc0, Keys)
+ end,
+ final_response(Total, Acc#view_acc.offset).
+
+reduce_view(DbName, Group0, ViewName, QueryArgs) -> % sends reduce rows via reduce_fold/3, then replies complete
+ erlang:put(io_priority, {interactive, DbName}), % in effect while the db is opened; set_io_priority/2 below overwrites it
+ {ok, Db} = get_or_create_db(DbName, []),
+ #view_query_args{
+ group_level = GroupLevel,
+ limit = Limit,
+ skip = Skip,
+ keys = Keys,
+ stale = Stale,
+ extra = Extra
+ } = QueryArgs,
+ set_io_priority(DbName, Extra),
+ GroupFun = group_rows_fun(GroupLevel),
+ {LastSeq, MinSeq} = calculate_seqs(Db, Stale),
+ {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
+ {ok, Group} = couch_view_group:request_group(Pid, MinSeq),
+ maybe_update_view_group(Pid, LastSeq, Stale),
+ #group{views=Views, def_lang=Lang, fd=Fd} = Group,
+ erlang:monitor(process, Fd), % NOTE(review): no 'DOWN' handling is visible here - confirm who consumes it
+ {NthRed, View} = fabric_view:extract_view(Pid, ViewName, Views, reduce),
+ ReduceView = {reduce, NthRed, Lang, View},
+ Acc0 = #view_acc{group_level = GroupLevel, limit = Limit+Skip},
+ case Keys of
+ nil ->
+ Options0 = couch_httpd_view:make_key_options(QueryArgs),
+ Options = [{key_group_fun, GroupFun} | Options0],
+ couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options);
+ _ ->
+ lists:map(fun(Key) -> % every key is folded starting from Acc0 again
+ KeyArgs = QueryArgs#view_query_args{start_key=Key, end_key=Key},
+ Options0 = couch_httpd_view:make_key_options(KeyArgs),
+ Options = [{key_group_fun, GroupFun} | Options0],
+ couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options)
+ end, Keys)
+ end, % the fold results are discarded; rows were already sent by reduce_fold/3
+ rexi:reply(complete).
+
+% Returns {LastSeq, MinSeq}: LastSeq is the db's current update seq; MinSeq
+% is what map_view/4 and reduce_view/4 pass to request_group - 0 when Stale
+% is ok or update_after, LastSeq otherwise.
+calculate_seqs(Db, Stale) ->
+    LastSeq = couch_db:get_update_seq(Db),
+    StaleOk = lists:member(Stale, [ok, update_after]),
+    MinSeq = if StaleOk -> 0; true -> LastSeq end,
+    {LastSeq, MinSeq}.
+
+% Triggers a group update only when the request asked for stale=update_after.
+maybe_update_view_group(GroupPid, LastSeq, update_after) ->
+    couch_view_group:trigger_group_update(GroupPid, LastSeq);
+maybe_update_view_group(_GroupPid, _LastSeq, _Stale) -> ok.
+
+% RPC endpoint: creates the db and replies ok, or whatever create returned.
+create_db(DbName) ->
+    Reply = case couch_server:create(DbName, []) of
+        {ok, _Db} -> ok;
+        Error -> Error
+    end,
+    rexi:reply(Reply).
+
+create_shard_db_doc(_, Doc) -> % first argument is ignored
+ rexi:reply(mem3_util:write_db_doc(Doc)). % the write result is the reply
+
+delete_db(DbName) -> % NOTE(review): unlike create_db/1 the result is not sent with rexi:reply - confirm callers do not wait for it
+ couch_server:delete(DbName, []).
+
+delete_shard_db_doc(_, DocId) -> % first argument is ignored
+ rexi:reply(mem3_util:delete_db_doc(DocId)). % the delete result is the reply
+
+get_db_info(DbName) -> % this and the six endpoints below run one couch_db function through with_db/3, which sends the reply
+ with_db(DbName, [], {couch_db, get_db_info, []}).
+
+get_doc_count(DbName) ->
+ with_db(DbName, [], {couch_db, get_doc_count, []}).
+
+get_update_seq(DbName) ->
+ with_db(DbName, [], {couch_db, get_update_seq, []}).
+
+set_security(DbName, SecObj, Options) ->
+ with_db(DbName, Options, {couch_db, set_security, [SecObj]}).
+
+set_revs_limit(DbName, Limit, Options) ->
+ with_db(DbName, Options, {couch_db, set_revs_limit, [Limit]}).
+
+open_doc(DbName, DocId, Options) -> % Options are used both to open the db and as open_doc options
+ with_db(DbName, Options, {couch_db, open_doc, [DocId, Options]}).
+
+open_revs(DbName, Id, Revs, Options) -> % Options are used both to open the db and as open_doc_revs options
+ with_db(DbName, Options, {couch_db, open_doc_revs, [Id, Revs, Options]}).
+
+get_missing_revs(DbName, IdRevsList) -> % same as get_missing_revs/3 with no options
+ get_missing_revs(DbName, IdRevsList, []).
+% Replies {ok, [{Id, MissingRevs, PossibleAncestors}]} with one entry per {Id, Revs} pair, or the db open error.
+get_missing_revs(DbName, IdRevsList, Options) ->
+ % reimplement here so we get [] for Ids with no missing revs in response
+ set_io_priority(DbName, Options),
+ rexi:reply(case get_or_create_db(DbName, Options) of
+ {ok, Db} ->
+ Ids = [Id1 || {Id1, _Revs} <- IdRevsList],
+ {ok, lists:zipwith(fun({Id, Revs}, FullDocInfoResult) -> % pairs each request with its id-tree lookup result
+ case FullDocInfoResult of
+ {ok, #full_doc_info{rev_tree=RevisionTree} = FullInfo} ->
+ MissingRevs = couch_key_tree:find_missing(RevisionTree, Revs),
+ {Id, MissingRevs, possible_ancestors(FullInfo, MissingRevs)};
+ not_found ->
+ {Id, Revs, []} % unknown doc: every requested rev is missing
+ end
+ end, IdRevsList, couch_btree:lookup(Db#db.id_tree, Ids))};
+ Error ->
+ Error
+ end).
+
+% RPC endpoint: updates Docs0 through couch_db:update_docs, as
+% replicated_changes when that option is true, else as interactive_edit.
+update_docs(DbName, Docs0, Options) ->
+    UpdateType = case proplists:get_value(replicated_changes, Options) of
+        true -> replicated_changes;
+        _ -> interactive_edit
+    end,
+    Docs = make_att_readers(Docs0),
+    with_db(DbName, Options, {couch_db, update_docs, [Docs, Options, UpdateType]}).
+
+group_info(DbName, Group0) -> % replies with the group info of the view group server for Group0
+ {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
+ rexi:reply(couch_view_group:request_group_info(Pid)).
+
+reset_validation_funs(DbName) -> % sends no rexi reply
+ case get_or_create_db(DbName, []) of
+ {ok, #db{main_pid = Pid}} ->
+ gen_server:cast(Pid, {load_validation_funs, undefined}); % asynchronous cast to the db's main process
+ _ ->
+ ok % a db that cannot be opened is silently ignored
+ end.
+
+%%
+%% internal
+%%
+
+with_db(DbName, Options, {M,F,A}) -> % runs M:F(Db, A...) on the opened db and sends the outcome with rexi:reply
+ set_io_priority(DbName, Options),
+ case get_or_create_db(DbName, Options) of
+ {ok, Db} ->
+ rexi:reply(try
+ apply(M, F, [Db | A])
+ catch Exception -> % a thrown term becomes the reply itself
+ Exception;
+ error:Reason -> % runtime errors are logged and replied as {error, Reason}
+ twig:log(error, "rpc ~p:~p/~p ~p ~p", [M, F, length(A)+1, Reason,
+ clean_stack()]),
+ {error, Reason}
+ end); % exits are not caught here
+ Error ->
+ rexi:reply(Error)
+ end.
+
+% Opens the db, creating it (with a warning) when its file does not exist.
+get_or_create_db(DbName, Options) ->
+    case couch_db:open_int(DbName, Options) of
+    {not_found, no_db_file} ->
+        twig:log(warn, "~p creating ~s", [?MODULE, DbName]),
+        couch_server:create(DbName, Options);
+    Other -> Other
+    end.
+
+view_fold(#full_doc_info{} = FullDocInfo, OffsetReds, Acc) ->
+ % matches for _all_docs and translates #full_doc_info{} -> KV pair
+ case couch_doc:to_doc_info(FullDocInfo) of
+ #doc_info{id=Id, revs=[#rev_info{deleted=false, rev=Rev}|_]} = DI ->
+ Value = {[{rev,couch_doc:rev_to_str(Rev)}]},
+ view_fold({{Id,Id}, Value}, OffsetReds, Acc#view_acc{doc_info=DI});
+ #doc_info{revs=[#rev_info{deleted=true}|_]} ->
+ {ok, Acc} % deleted docs are skipped without touching limit or offset
+ end;
+view_fold(KV, OffsetReds, #view_acc{offset=nil, total_rows=Total} = Acc) ->
+ % calculates the offset for this shard
+ #view_acc{reduce_fun=Reduce} = Acc,
+ Offset = Reduce(OffsetReds),
+ case rexi:sync_reply({total_and_offset, Total, Offset}) of
+ ok ->
+ view_fold(KV, OffsetReds, Acc#view_acc{offset=Offset}); % offset is set now, so the row itself is handled next
+ stop ->
+ exit(normal);
+ timeout ->
+ exit(timeout)
+ end;
+view_fold(_KV, _Offset, #view_acc{limit=0} = Acc) ->
+ % we scanned through limit+skip local rows
+ {stop, Acc};
+view_fold({{Key,Id}, Value}, _Offset, Acc) ->
+ % the normal case
+ #view_acc{
+ db = Db,
+ doc_info = DocInfo,
+ limit = Limit,
+ conflicts = Conflicts,
+ include_docs = IncludeDocs
+ } = Acc,
+ case Value of {Props} -> % a value object carrying an <<"_id">> member marks a linked doc
+ LinkedDocs = (couch_util:get_value(<<"_id">>, Props) =/= undefined);
+ _ ->
+ LinkedDocs = false
+ end,
+ if LinkedDocs ->
+ % we'll embed this at a higher level b/c the doc may be non-local
+ Doc = undefined;
+ IncludeDocs ->
+ IdOrInfo = if DocInfo =/= nil -> DocInfo; true -> Id end, % use the #doc_info{} stored by the first clause when there is one
+ Options = if Conflicts -> [conflicts]; true -> [] end,
+ case couch_db:open_doc(Db, IdOrInfo, Options) of
+ {not_found, deleted} ->
+ Doc = null;
+ {not_found, missing} ->
+ Doc = undefined;
+ {ok, Doc0} ->
+ Doc = couch_doc:to_json_obj(Doc0, [])
+ end;
+ true ->
+ Doc = undefined
+ end,
+ case rexi:sync_reply(#view_row{key=Key, id=Id, value=Value, doc=Doc}) of % NOTE(review): no stop clause here, unlike the total_and_offset exchange above and send/3 - confirm rows are never answered with stop
+ ok ->
+ {ok, Acc#view_acc{limit=Limit-1}};
+ timeout ->
+ exit(timeout)
+ end.
+
+% Ends a view request. With the offset still nil, total_and_offset has not
+% been sent yet, so it is sent first, using Total as the offset.
+final_response(Total, nil) ->
+    case rexi:sync_reply({total_and_offset, Total, Total}) of
+        ok -> rexi:reply(complete);
+        stop -> ok;
+        timeout -> exit(timeout)
+    end;
+final_response(_Total, _Offset) ->
+    rexi:reply(complete).
+
+%% Returns the fun that decides whether two {Key, _} rows fall into the same
+%% reduce group. TODO: handle case of bogus group level
+group_rows_fun(0) ->
+    fun(_A, _B) -> true end;
+group_rows_fun(exact) ->
+    fun({KeyA,_}, {KeyB,_}) -> KeyA == KeyB end;
+group_rows_fun(Level) when is_integer(Level) ->
+    Prefix = fun(Key) -> lists:sublist(Key, Level) end,
+    fun({[_|_] = KeyA,_}, {[_|_] = KeyB,_}) -> Prefix(KeyA) == Prefix(KeyB);
+       ({KeyA,_}, {KeyB,_}) -> KeyA == KeyB
+    end.
+
+% Fold callback handed to couch_view:fold_reduce: stops once the limit is
+% used up, otherwise sends the reduction under its group key.
+reduce_fold(_Key, _Red, #view_acc{limit=0} = Acc) ->
+    {stop, Acc};
+reduce_fold(Key, Red, #view_acc{group_level=Level} = Acc) ->
+    send(group_key(Level, Key), Red, Acc).
+group_key(0, _Key) -> null;
+group_key(exact, Key) -> Key;
+group_key(I, Key) when I > 0, is_list(Key) -> lists:sublist(Key, I);
+group_key(I, Key) when I > 0 -> Key.
+
+
+% Sends one reduce row with rexi:sync_reply and acts on the answer: ok
+% decrements the limit and continues the fold, stop ends the worker
+% normally, timeout ends it with reason timeout.
+send(Key, Value, #view_acc{limit=Limit} = Acc) ->
+    case rexi:sync_reply(#view_row{key=Key, value=Value}) of
+        ok -> {ok, Acc#view_acc{limit=Limit-1}};
+        stop -> exit(normal);
+        timeout -> exit(timeout)
+    end.
+
+changes_enumerator(DocInfo, {Db, _Seq, Args}) -> % fold callback used by changes/3; accumulator is {Db, LastSeq, Args}
+ #changes_args{
+ include_docs = IncludeDocs,
+ filter = Acc,
+ conflicts = Conflicts
+ } = Args,
+ #doc_info{high_seq=Seq, revs=[#rev_info{deleted=Del}|_]} = DocInfo,
+ case [X || X <- couch_changes:filter(DocInfo, Acc), X /= null] of % null filter results are dropped
+ [] ->
+ {ok, {Db, Seq, Args}}; % nothing to send, but the sequence still advances
+ Results ->
+ Opts = if Conflicts -> [conflicts]; true -> [] end,
+ ChangesRow = changes_row(Db, DocInfo, Results, Del, IncludeDocs, Opts),
+ Go = rexi:sync_reply(ChangesRow),
+ {Go, {Db, Seq, Args}} % the coordinator's answer is handed back to the fold as its result tag
+ end.
+
+changes_row(Db, #doc_info{id=Id, high_seq=Seq}=DI, Results, Del, true, Opts) -> % include_docs = true: attach the doc
+ Doc = doc_member(Db, DI, Opts),
+ #change{key=Seq, id=Id, value=Results, doc=Doc, deleted=Del};
+changes_row(_, #doc_info{id=Id, high_seq=Seq}, Results, true, _, _) -> % deleted, without doc
+ #change{key=Seq, id=Id, value=Results, deleted=true};
+changes_row(_, #doc_info{id=Id, high_seq=Seq}, Results, _, _, _) -> % neither doc nor deleted flag is set
+ #change{key=Seq, id=Id, value=Results}.
+
+% Opens the doc with the deleted option added and renders it with
+% couch_doc:to_json_obj; any other open result is returned unchanged.
+doc_member(Shard, DocInfo, Opts) ->
+    case couch_db:open_doc(Shard, DocInfo, [deleted | Opts]) of
+        {ok, Doc} -> couch_doc:to_json_obj(Doc, []);
+        Error -> Error
+    end.
+
+% Returns the leaf revisions of FullInfo that may be ancestors of one of
+% MissingRevs: a leaf qualifies when its position is lower than that of
+% at least one missing revision. Leaves come back in reverse leaf order.
+possible_ancestors(_FullInfo, []) ->
+    [];
+possible_ancestors(FullInfo, MissingRevs) ->
+    #doc_info{revs=RevsInfo} = couch_doc:to_doc_info(FullInfo),
+    LeafRevs = [Rev || #rev_info{rev=Rev} <- RevsInfo],
+    IsBelowSomeMissing = fun({LeafPos, _LeafRevId}) ->
+        lists:any(fun({MissingPos, _}) ->
+            LeafPos < MissingPos
+        end, MissingRevs)
+    end,
+    % lists:filter/2 keeps leaf order; reversing it yields the same order
+    % as prepending every match while walking the leaves front to back
+    Candidates = lists:filter(IsBelowSomeMissing, LeafRevs),
+    lists:reverse(Candidates).
+
+% Runs make_att_reader/1 over the data of every attachment of every doc, so
+% that {follows, Parser} data is replaced by a reader fun.
+make_att_readers(Docs) ->
+    lists:map(fun(#doc{atts=Atts0} = Doc) ->
+        Atts = [Att#att{data=make_att_reader(D)} || #att{data=D} = Att <- Atts0],
+        Doc#doc{atts = Atts}
+    end, Docs).
+
+% Wraps {follows, Parser} data in a zero-arity fun that sends get_bytes to
+% Parser and blocks until a {bytes, Bytes} message arrives; any other data
+% is returned unchanged.
+make_att_reader({follows, Parser}) ->
+    fun() -> Parser ! {get_bytes, self()}, receive {bytes, Bytes} -> Bytes end end;
+make_att_reader(Data) ->
+    Data.
+
+clean_stack() -> % current stacktrace with argument lists replaced by their arity
+ lists:map(fun({M,F,A}) when is_list(A) -> {M,F,length(A)}; (X) -> X end, % NOTE(review): only 3-tuple frames are rewritten; other frame shapes pass through untouched
+ erlang:get_stacktrace()).
+
+% Puts io_priority into the process dictionary: from Options, else interactive.
+set_io_priority(DbName, Options) ->
+    Priority = case lists:keyfind(io_priority, 1, Options) of
+        {io_priority, Pri} -> Pri;
+        false -> {interactive, DbName}
+    end,
+    erlang:put(io_priority, Priority).
diff --git a/deps/fabric/src/fabric_util.erl b/deps/fabric/src/fabric_util.erl
new file mode 100644
index 00000000..42fe900f
--- /dev/null
+++ b/deps/fabric/src/fabric_util.erl
@@ -0,0 +1,168 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_util).
+
+-export([submit_jobs/3, cleanup/1, recv/4, get_db/1, get_db/2, error_info/1,
+ update_counter/3, remove_ancestors/2, create_monitors/1, kv/2,
+ remove_down_workers/2]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+%% @doc Drops every worker located on BadNode. Returns {ok, Remaining} when
+%% fabric_view:is_progress_possible/1 accepts the remaining workers, and the
+%% atom `error' otherwise.
+remove_down_workers(Workers, BadNode) ->
+    Survivors = fabric_dict:filter(fun(#shard{node = Node}, _Value) ->
+        Node =/= BadNode
+    end, Workers),
+    case fabric_view:is_progress_possible(Survivors) of
+    false ->
+        error;
+    true ->
+        {ok, Survivors}
+    end.
+
+%% @doc Casts fabric_rpc:EndPoint(ShardName, ExtraArgs...) to the node of
+%% every shard and returns the shards with the rexi reference stored in
+%% their `ref' field.
+submit_jobs(Shards, EndPoint, ExtraArgs) ->
+    Submit = fun(#shard{node=Node, name=ShardName} = Shard) ->
+        MFA = {fabric_rpc, EndPoint, [ShardName | ExtraArgs]},
+        Shard#shard{ref = rexi:cast(Node, MFA)}
+    end,
+    lists:map(Submit, Shards).
+
+%% @doc Calls rexi:kill/2 with the node and reference of every #shard{} in
+%% Workers and returns the list of results.
+cleanup(Workers) ->
+    Kill = fun(W) -> rexi:kill(W#shard.node, W#shard.ref) end,
+    [Kill(W) || #shard{} = W <- Workers].
+
+%% @doc Wrapper around rexi_utils:recv/6 that reads the "request_timeout"
+%% setting of the "fabric" config section (default "60000"; the string
+%% "infinity" maps to the atom infinity) and passes it as the fifth
+%% argument, with `infinity' as the sixth.
+recv(Workers, Keypos, Fun, Acc0) ->
+    Configured = couch_config:get("fabric", "request_timeout", "60000"),
+    ReqTimeout = if
+        Configured =:= "infinity" -> infinity;
+        true -> list_to_integer(Configured)
+    end,
+    rexi_utils:recv(Workers, Keypos, Fun, Acc0, ReqTimeout, infinity).
+
+
+%% @doc Same as get_db(DbName, []).
+get_db(DbName) ->
+    get_db(DbName, []).
+
+%% @doc Opens one shard of DbName via get_shard/3. Shards on the local node
+%% are tried first, followed by shards on currently connected nodes sorted
+%% by shard name; shards on nodes that are not connected are not tried.
+get_db(DbName, Options) ->
+    IsLocal = fun(S) -> S#shard.node =:= node() end,
+    {Local, Remote} = lists:partition(IsLocal, mem3:shards(DbName)),
+    % suppress shards from down nodes
+    Connected = erlang:nodes(),
+    Live = lists:filter(fun(#shard{node = N}) ->
+        lists:member(N, Connected)
+    end, Remote),
+    % sort the live remote shards so that we don't repeatedly try the same node
+    Candidates = Local ++ lists:keysort(#shard.name, Live),
+    get_shard(Candidates, Options, 100).
+
+%% @doc Tries to open the shards in order via rpc and returns the first
+%% {ok, Db}. An {unauthorized, _} result is thrown. After an rpc timeout
+%% exit the next shard is tried with a doubled timeout; after any other
+%% failure the next shard is tried with the same timeout. Raises
+%% internal_server_error when the list is exhausted.
+get_shard([], _Opts, _Timeout) ->
+    erlang:error({internal_server_error, "No DB shards could be opened."});
+get_shard([#shard{node = Node, name = Name} | Rest], Opts, Timeout) ->
+    OpenArgs = [Name, [{timeout, Timeout} | Opts]],
+    case rpc:call(Node, couch_db, open, OpenArgs) of
+    {ok, _} = Opened ->
+        Opened;
+    {unauthorized, _} = Denied ->
+        throw(Denied);
+    {badrpc, {'EXIT', {timeout, _}}} ->
+        get_shard(Rest, Opts, Timeout * 2);
+    _Failed ->
+        get_shard(Rest, Opts, Timeout)
+    end.
+
+%% @doc Normalises a {Reason, Stack} pair. Reduce-overflow and timeout
+%% reasons are returned on their own; any other two-tuple reason becomes
+%% {Error, Detail, Stack}; everything else becomes {Reason, nil, Stack}.
+error_info({Reason, Stack}) ->
+    case Reason of
+    {<<"reduce_overflow_error">>, _} ->
+        Reason;
+    {timeout, _} ->
+        Reason;
+    {Error, Detail} ->
+        {Error, Detail, Stack};
+    _ ->
+        {Reason, nil, Stack}
+    end.
+
+%% @doc Adds Incr to the count stored for Item in the orddict D, keyed by
+%% make_key(Item). A key that is not present yet is inserted as
+%% {Item, Incr}; an existing entry keeps its stored item.
+update_counter(Item, Incr, D) ->
+    Key = make_key(Item),
+    Bump = fun({Existing, Count}) -> {Existing, Count + Incr} end,
+    orddict:update(Key, Bump, {Item, Incr}, D).
+
+%% @doc Reduces a worker reply to the term that update_counter/3 and kv/2
+%% use as its orddict key.
+% an {ok, List} reply is keyed by the key of the inner list
+make_key({ok, L}) when is_list(L) ->
+ make_key(L);
+make_key([]) ->
+ [];
+% a found revision contributes only its position and newest revision id
+make_key([{ok, #doc{revs= {Pos,[RevId | _]}}} | Rest]) ->
+ [{ok, {Pos, RevId}} | make_key(Rest)];
+make_key([{{not_found, missing}, Rev} | Rest]) ->
+ [{not_found, Rev} | make_key(Rest)];
+% a single document reply is keyed by its id and revision path only
+make_key({ok, #doc{id=Id,revs=Revs}}) ->
+ {Id, Revs};
+% any other term (including a list whose head matches none of the clauses
+% above) is used as its own key
+make_key(Else) ->
+ Else.
+
+%% @doc Walks a list of {Key, {Reply, Count}} entries (the shape built by
+%% kv/2 and update_counter/3) and folds the count of every reply that has a
+%% descendant later in the list into that descendant, dropping the ancestor.
+%% Entries without a descendant are kept. Returns the surviving entries in
+%% their original order.
+% this presumes the incoming list is sorted, i.e. shorter revlists come first
+remove_ancestors([], Acc) ->
+ lists:reverse(Acc);
+remove_ancestors([{_, {{not_found, _}, Count}} = Head | Tail], Acc) ->
+ % any document is a descendant
+ case lists:filter(fun({_,{{ok, #doc{}}, _}}) -> true; (_) -> false end, Tail) of
+ [{_,{{ok, #doc{}} = Descendant, _}} | _] ->
+ % credit the first found document with this not_found's count
+ remove_ancestors(update_counter(Descendant, Count, Tail), Acc);
+ [] ->
+ remove_ancestors(Tail, [Head | Acc])
+ end;
+remove_ancestors([{_,{{ok, #doc{revs = {Pos, Revs}}}, Count}} = Head | Tail], Acc) ->
+ % skip entries of Tail until one whose revision path extends this one
+ % NOTE(review): the fun has a single clause, so a Tail entry that is not an
+ % {ok, #doc{}} reply raises function_clause here
+ Descendants = lists:dropwhile(fun
+ ({_,{{ok, #doc{revs = {Pos2, Revs2}}}, _}}) ->
+ case lists:nthtail(erlang:min(Pos2 - Pos, length(Revs2)), Revs2) of
+ [] ->
+ % impossible to tell if Revs2 is a descendant - assume no
+ true;
+ History ->
+ % if Revs2 is a descendant, History is a prefix of Revs
+ not lists:prefix(History, Revs)
+ end
+ end, Tail),
+ case Descendants of [] ->
+ remove_ancestors(Tail, [Head | Acc]);
+ [{Descendant, _} | _] ->
+ % Descendant is bound to the entry's key here; update_counter/3 passes it
+ % through make_key/1 before the orddict lookup
+ remove_ancestors(update_counter(Descendant, Count, Tail), Acc)
+ end;
+% entries of any other shape are kept untouched
+remove_ancestors([Error | Tail], Acc) ->
+ remove_ancestors(Tail, [Error | Acc]).
+
+%% @doc Starts a rexi_monitor for the deduplicated, sorted list of
+%% {rexi_server, Node} targets of the given shards.
+create_monitors(Shards) ->
+    Targets = [{rexi_server, Node} || #shard{node=Node} <- Shards],
+    rexi_monitor:start(lists:usort(Targets)).
+
+%% verify only id and rev are used in key.
+update_counter_test() ->
+    Doc = #doc{id = <<"id">>, revs = <<"rev">>,
+        body = <<"body">>, atts = <<"atts">>},
+    Reply = {ok, Doc},
+    Expected = [{{<<"id">>,<<"rev">>}, {Reply, 1}}],
+    ?assertEqual(Expected, update_counter(Reply, 1, [])).
+
+remove_ancestors_test() ->
+    Found = fun(Pos, Path) -> {ok, #doc{revs = {Pos, Path}}} end,
+    Foo1 = Found(1, [<<"foo">>]),
+    Foo2 = Found(2, [<<"foo2">>, <<"foo">>]),
+    Bar1 = Found(1, [<<"bar">>]),
+    Bar2 = {not_found, {1,<<"bar">>}},
+    % unrelated revisions are both kept
+    Unrelated = [kv(Bar1,1), kv(Foo1,1)],
+    ?assertEqual(Unrelated, remove_ancestors(Unrelated, [])),
+    % the count of an ancestor is folded into its descendant
+    WithDescendant = [kv(Bar1,1), kv(Foo1,1), kv(Foo2,1)],
+    ?assertEqual(
+        [kv(Bar1,1), kv(Foo2,2)],
+        remove_ancestors(WithDescendant, [])
+    ),
+    % a not_found reply is folded into a found document
+    WithMissing = [kv(Bar2,1), kv(Bar1,1)],
+    ?assertEqual(
+        [kv(Bar1,2)],
+        remove_ancestors(WithMissing, [])
+    ).
+
+%% test function: builds the {Key, {Item, Count}} entry that
+%% update_counter/3 would store for Item
+kv(Item, Count) ->
+    Key = make_key(Item),
+    {Key, {Item, Count}}.
diff --git a/deps/fabric/src/fabric_view.erl b/deps/fabric/src/fabric_view.erl
new file mode 100644
index 00000000..fa2127e7
--- /dev/null
+++ b/deps/fabric/src/fabric_view.erl
@@ -0,0 +1,362 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_view).
+
+-export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1,
+ maybe_pause_worker/3, maybe_resume_worker/2, transform_row/1, keydict/1,
+ extract_view/4, get_shards/2, remove_down_shards/2]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+%% @doc Removes the workers running on BadNode from the collector's
+%% counters. When the remaining workers still allow progress the updated
+%% collector is returned as {ok, Collector}; otherwise the user callback is
+%% invoked with a nodedown error and its return value is passed back.
+-spec remove_down_shards(#collector{}, node()) ->
+    {ok, #collector{}} | {error, any()}.
+remove_down_shards(Collector, BadNode) ->
+    #collector{counters=Counters} = Collector,
+    case fabric_util:remove_down_workers(Counters, BadNode) of
+    error ->
+        #collector{callback=Callback, user_acc=Acc} = Collector,
+        Callback({error, {nodedown, <<"progress not possible">>}}, Acc);
+    {ok, Remaining} ->
+        {ok, Collector#collector{counters = Remaining}}
+    end.
+
+%% @doc looks for a fully covered keyrange in the list of counters
+%% Collects the [Start, End] range of every shard key in Counters, sorts the
+%% ranges by start and walks them, checking that they join up without a gap
+%% from 0 through (2 bsl 31) - 1. An empty Counters yields false.
+-spec is_progress_possible([{#shard{}, term()}]) -> boolean().
+is_progress_possible([]) ->
+ false;
+is_progress_possible(Counters) ->
+ Ranges = fabric_dict:fold(fun(#shard{range=[X,Y]}, _, A) -> [{X,Y}|A] end,
+ [], Counters),
+ % ukeysort keeps a single range per distinct start value
+ [{Start, Tail0} | Rest] = lists:ukeysort(1, Ranges),
+ % the fold accumulator is either the highest range end seen so far, or one
+ % of the atoms `complete' / `fail' once the outcome is decided
+ Result = lists:foldl(fun
+ (_, fail) ->
+ % we've already declared failure
+ fail;
+ (_, complete) ->
+ % this is the success condition, we can fast-forward
+ complete;
+ ({X,_}, Tail) when X > (Tail+1) ->
+ % gap in the keyrange, we're dead
+ fail;
+ ({_,Y}, Tail) ->
+ case erlang:max(Tail, Y) of
+ End when (End+1) =:= (2 bsl 31) ->
+ complete;
+ Else ->
+ % the normal condition, adding to the tail
+ Else
+ end
+ end, if (Tail0+1) =:= (2 bsl 31) -> complete; true -> Tail0 end, Rest),
+ % coverage must also begin at the very start of the key space
+ (Start =:= 0) andalso (Result =:= complete).
+
+%% @doc Filters Shards, dropping (and killing the rexi job of) every shard
+%% whose lower or upper range bound falls inside the range [A,B] of Shard0.
+%% Shard0 itself is always kept.
+%%
+%% In the two wrap-around branches (B < A) the alternatives are grouped
+%% explicitly. Written as `B < A, X >= A orelse B < A, X < B' the guard
+%% parses as `B < A, (X >= A orelse B < A), X < B', which never matches the
+%% X >= A case (and likewise for Y > A).
+-spec remove_overlapping_shards(#shard{}, [{#shard{}, any()}]) ->
+    [{#shard{}, any()}].
+remove_overlapping_shards(#shard{range=[A,B]} = Shard0, Shards) ->
+    fabric_dict:filter(fun(#shard{range=[X,Y], node=Node, ref=Ref} = Shard, _) ->
+        if Shard =:= Shard0 ->
+            % we can't remove ourselves
+            true;
+        A < B, X >= A, X < B ->
+            % lower bound is inside our range
+            rexi:kill(Node, Ref),
+            false;
+        A < B, Y > A, Y =< B ->
+            % upper bound is inside our range
+            rexi:kill(Node, Ref),
+            false;
+        B < A, (X >= A orelse X < B) ->
+            % target shard wraps the key range, lower bound is inside
+            rexi:kill(Node, Ref),
+            false;
+        B < A, (Y > A orelse Y =< B) ->
+            % target shard wraps the key range, upper bound is inside
+            rexi:kill(Node, Ref),
+            false;
+        true ->
+            true
+        end
+    end, Shards).
+
+%% @doc When the number of rows buffered for Worker equals the collector's
+%% buffer size, the worker is left waiting and {Worker, From} is recorded in
+%% the blocked list; otherwise the worker is released with an `ok' reply
+%% and the state is returned unchanged.
+maybe_pause_worker(Worker, From, State) ->
+    #collector{
+        buffer_size = BufferSize,
+        counters = Counters,
+        blocked = Blocked
+    } = State,
+    Buffered = fabric_dict:lookup_element(Worker, Counters),
+    if
+    Buffered =:= BufferSize ->
+        State#collector{blocked = [{Worker,From} | Blocked]};
+    true ->
+        gen_server:reply(From, ok),
+        State
+    end.
+
+%% @doc Once the rows buffered for Worker drop below half the buffer size,
+%% a worker recorded in the blocked list is released with an `ok' reply and
+%% removed from that list. In every other case the state is unchanged.
+maybe_resume_worker(Worker, State) ->
+    #collector{buffer_size = Buffer, counters = C, blocked = B} = State,
+    Count = fabric_dict:lookup_element(Worker, C),
+    if
+    Count < Buffer/2 ->
+        case couch_util:get_value(Worker, B) of
+        undefined ->
+            State;
+        From ->
+            gen_server:reply(From, ok),
+            State#collector{blocked = lists:keydelete(Worker, 1, B)}
+        end;
+    true ->
+        State
+    end.
+
+%% @doc Sends as many buffered rows to the user callback as possible.
+%% Nothing is sent while any worker counter is still 0. Returns {ok, State}
+%% when more input is needed, or {stop, State} once the limit is exhausted,
+%% the rows are exhausted, or the callback asks to stop.
+maybe_send_row(#collector{limit=0} = State) ->
+ #collector{counters=Counters, user_acc=AccIn, callback=Callback} = State,
+ case fabric_dict:any(0, Counters) of
+ true ->
+ % we still need to send the total/offset header
+ {ok, State};
+ false ->
+ {_, Acc} = Callback(complete, AccIn),
+ {stop, State#collector{user_acc=Acc}}
+ end;
+maybe_send_row(State) ->
+ #collector{
+ callback = Callback,
+ counters = Counters,
+ skip = Skip,
+ limit = Limit,
+ user_acc = AccIn
+ } = State,
+ case fabric_dict:any(0, Counters) of
+ true ->
+ {ok, State};
+ false ->
+ % get_next_row/1 throws `complete' when there is nothing left to send
+ try get_next_row(State) of
+ {_, NewState} when Skip > 0 ->
+ % the row is consumed but not delivered while skipping
+ maybe_send_row(NewState#collector{skip=Skip-1});
+ {Row, NewState} ->
+ case Callback(transform_row(possibly_embed_doc(NewState,Row)), AccIn) of
+ {stop, Acc} ->
+ {stop, NewState#collector{user_acc=Acc, limit=Limit-1}};
+ {ok, Acc} ->
+ maybe_send_row(NewState#collector{user_acc=Acc, limit=Limit-1})
+ end
+ catch complete ->
+ {_, Acc} = Callback(complete, AccIn),
+ {stop, State#collector{user_acc=Acc}}
+ end
+ end.
+
+%% if include_docs=true is used when keys and
+%% the values contain "_id" then use the "_id"s
+%% to retrieve documents and embed in result
+%%
+%% Reduced rows and rows without a value are returned as they are. For other
+%% rows the document is only looked up when include_docs is set, the value
+%% is a tuple and it carries an "_id" property; an optional "_rev" property
+%% selects a specific revision.
+possibly_embed_doc(_State,
+ #view_row{id=reduced}=Row) ->
+ Row;
+possibly_embed_doc(_State,
+ #view_row{value=undefined}=Row) ->
+ Row;
+possibly_embed_doc(#collector{db_name=DbName, query_args=Args},
+ #view_row{key=_Key, id=_Id, value=Value, doc=_Doc}=Row) ->
+ #view_query_args{include_docs=IncludeDocs} = Args,
+ case IncludeDocs andalso is_tuple(Value) of
+ true ->
+ {Props} = Value,
+ Rev0 = couch_util:get_value(<<"_rev">>, Props),
+ case couch_util:get_value(<<"_id">>,Props) of
+ undefined -> Row;
+ IncId ->
+ % use separate process to call fabric:open_doc
+ % to not interfere with current call
+ % the helper hands back the updated row as its exit reason
+ {Pid, Ref} = spawn_monitor(fun() ->
+ exit(
+ case Rev0 of
+ undefined ->
+ case fabric:open_doc(DbName, IncId, []) of
+ {ok, NewDoc} ->
+ Row#view_row{doc=couch_doc:to_json_obj(NewDoc,[])};
+ {not_found, _} ->
+ Row#view_row{doc=null}
+ end;
+ Rev0 ->
+ Rev = couch_doc:parse_rev(Rev0),
+ case fabric:open_revs(DbName, IncId, [Rev], []) of
+ {ok, [{ok, NewDoc}]} ->
+ Row#view_row{doc=couch_doc:to_json_obj(NewDoc,[])};
+ {ok, [{{not_found, _}, Rev}]} ->
+ Row#view_row{doc=null}
+ end
+ end) end),
+ % NOTE(review): no `after' clause, and if the helper crashes its exit
+ % reason (not a #view_row{}) is what gets returned here
+ receive {'DOWN',Ref,process,Pid, Resp} ->
+ Resp
+ end
+ end;
+ _ -> Row
+ end.
+
+
+%% @doc Maps every key of the list to its zero-based position and returns
+%% the mapping as a dict. A key that occurs more than once keeps the
+%% position of its last occurrence. `nil' yields `undefined'.
+keydict(nil) ->
+    undefined;
+keydict(Keys) ->
+    Positions = lists:seq(0, length(Keys) - 1),
+    dict:from_list(lists:zip(Keys, Positions)).
+
+%% internal %%
+
+%% Takes the next row to deliver out of the collector and returns
+%% {Row, NewState}. Throws `complete' when no rows are buffered.
+get_next_row(#collector{rows = []}) ->
+ throw(complete);
+% reduce views: `rows' is a dict from key to the per-worker rows for that key
+get_next_row(#collector{reducer = RedSrc} = St) when RedSrc =/= undefined ->
+ #collector{
+ query_args = #view_query_args{direction=Dir},
+ keys = Keys,
+ rows = RowDict,
+ os_proc = Proc,
+ counters = Counters0
+ } = St,
+ {Key, RestKeys} = find_next_key(Keys, Dir, RowDict),
+ case dict:find(Key, RowDict) of
+ {ok, Records} ->
+ NewRowDict = dict:erase(Key, RowDict),
+ % every contributing worker has one row less buffered
+ Counters = lists:foldl(fun(#view_row{worker=Worker}, CountersAcc) ->
+ fabric_dict:update_counter(Worker, -1, CountersAcc)
+ end, Counters0, Records),
+ % combine the per-worker values for this key with a rereduce call
+ Wrapped = [[V] || #view_row{value=V} <- Records],
+ {ok, [Reduced]} = couch_query_servers:rereduce(Proc, [RedSrc], Wrapped),
+ NewSt = St#collector{keys=RestKeys, rows=NewRowDict, counters=Counters},
+ NewState = lists:foldl(fun(#view_row{worker=Worker}, StateAcc) ->
+ maybe_resume_worker(Worker, StateAcc)
+ end, NewSt, Records),
+ {#view_row{key=Key, id=reduced, value=Reduced}, NewState};
+ error ->
+ % no rows buffered for this key, move on to the next one
+ get_next_row(St#collector{keys=RestKeys})
+ end;
+% all other collectors: `rows' is a list and its head is the next row
+get_next_row(State) ->
+ #collector{rows = [Row|Rest], counters = Counters0} = State,
+ Worker = Row#view_row.worker,
+ Counters1 = fabric_dict:update_counter(Worker, -1, Counters0),
+ NewState = maybe_resume_worker(Worker, State#collector{counters=Counters1}),
+ {Row, NewState#collector{rows = Rest}}.
+
+%% Picks the key whose rows are emitted next, returning {Key, RestKeys}.
+%% Without an explicit key list (nil) the first key of the row dict in
+%% sort_fun(Dir) order is chosen; with one, its head is chosen. Throws
+%% `complete' when no key is left.
+find_next_key(nil, Dir, RowDict) ->
+    Sorted = lists:sort(sort_fun(Dir), dict:fetch_keys(RowDict)),
+    case Sorted of
+    [First|_] ->
+        {First, nil};
+    [] ->
+        throw(complete)
+    end;
+find_next_key([Next|Remaining], _, _) ->
+    {Next, Remaining};
+find_next_key([], _, _) ->
+    throw(complete).
+
+%% @doc Converts a #view_row{} into the {row, {Props}} term handed to user
+%% callbacks. Reduced rows carry key and value only; rows without an id are
+%% reported as not_found; other rows carry id, key and value, plus either
+%% the embedded doc or the error recorded in its place.
+transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc}) ->
+    Props = case {Id, Doc} of
+    {reduced, _} ->
+        [{key,Key}, {value,Value}];
+    {undefined, _} ->
+        [{key,Key}, {error,not_found}];
+    {_, undefined} ->
+        [{id,Id}, {key,Key}, {value,Value}];
+    {_, {error,Reason}} ->
+        [{id,Id}, {key,Key}, {value,Value}, {error,Reason}];
+    {_, _} ->
+        [{id,Id}, {key,Key}, {value,Value}, {doc,Doc}]
+    end,
+    {row, {Props}}.
+
+
+%% Ordering fun for lists:sort/2 built on couch_view:less_json/2: ascending
+%% for `fwd', descending for `rev'. Identical terms always compare as
+%% ordered.
+sort_fun(fwd) ->
+    fun(A, B) -> A =:= B orelse couch_view:less_json(A, B) end;
+sort_fun(rev) ->
+    fun(A, B) -> A =:= B orelse couch_view:less_json(B, A) end.
+
+%% @doc Finds the first view in the list that defines ViewName for the given
+%% ViewType. For `reduce' the result is {Index, View}, where Index is the
+%% 1-based position of ViewName among the view's reduce functions; for other
+%% types it is the View itself. When no view matches, the error is logged,
+%% Pid is killed and the caller exits with missing_named_view.
+extract_view(Pid, ViewName, [], _ViewType) ->
+    twig:log(error, "missing_named_view ~p", [ViewName]),
+    exit(Pid, kill),
+    exit(missing_named_view);
+extract_view(Pid, ViewName, [View|Rest], ViewType) ->
+    Found = lists:member(ViewName, view_names(View, ViewType)),
+    if
+    not Found ->
+        extract_view(Pid, ViewName, Rest, ViewType);
+    ViewType == reduce ->
+        {index_of(ViewName, view_names(View, reduce)), View};
+    true ->
+        View
+    end.
+
+%% Names defined by a view: the names of its reduce functions for the
+%% `reduce' and `red_map' types, its map names for `map'.
+view_names(View, map) ->
+    View#view.map_names;
+view_names(View, Type) when Type =:= reduce; Type =:= red_map ->
+    Funs = View#view.reduce_funs,
+    [Name || {Name, _} <- Funs].
+
+%% 1-based position of the first element of List that matches X exactly,
+%% or `not_found'.
+index_of(X, List) ->
+    index_of(X, List, 1).
+
+index_of(Needle, [Needle|_], Position) ->
+    Position;
+index_of(Needle, [_|Others], Position) ->
+    index_of(Needle, Others, Position + 1);
+index_of(_Needle, [], _Position) ->
+    not_found.
+
+%% @doc Shards to query for a view request: mem3:shards/1 when stale is
+%% false, mem3:ushards/1 when stale is `ok' or `update_after'.
+get_shards(DbName, #view_query_args{stale=false}) ->
+    mem3:shards(DbName);
+get_shards(DbName, #view_query_args{stale=Stale})
+        when Stale =:= ok; Stale =:= update_after ->
+    mem3:ushards(DbName).
+
+% unit test: is_progress_possible/1 must only accept range sets that cover
+% the key space from 0 through (2 bsl 31) - 1 without a gap.
+% ?assertEqual takes the expected value first, so failures report
+% "expected" and "value" the right way round.
+is_progress_possible_test() ->
+    EndPoint = 2 bsl 31,
+    T1 = [[0, EndPoint-1]],
+    ?assertEqual(true, is_progress_possible(mk_cnts(T1))),
+    T2 = [[0,10],[11,20],[21,EndPoint-1]],
+    ?assertEqual(true, is_progress_possible(mk_cnts(T2))),
+    % gap
+    T3 = [[0,10],[12,EndPoint-1]],
+    ?assertEqual(false, is_progress_possible(mk_cnts(T3))),
+    % outside range
+    T4 = [[1,10],[11,20],[21,EndPoint-1]],
+    ?assertEqual(false, is_progress_possible(mk_cnts(T4))),
+    % outside range
+    T5 = [[0,10],[11,20],[21,EndPoint]],
+    ?assertEqual(false, is_progress_possible(mk_cnts(T5))).
+
+% Three ranges on three nodes give nine shards; keeping the 'node-3' copy
+% of [11,20] must drop the two other copies of that range, leaving seven.
+% ?assertEqual takes the expected value first.
+remove_overlapping_shards_test() ->
+    EndPoint = 2 bsl 31,
+    T1 = [[0,10],[11,20],[21,EndPoint-1]],
+    Shards = mk_cnts(T1,3),
+    Kept = #shard{name='node-3', node='node-3', range=[11,20]},
+    ?assertEqual(7,
+        orddict:size(remove_overlapping_shards(Kept, Shards))).
+
+% test helper: one shard per range, each mapped to nil
+mk_cnts(Ranges) ->
+    orddict:from_list([{#shard{range=Range}, nil} || Range <- Ranges]).
+
+% test helper: NoNodes shards per range (see mk_shards/3), each mapped to nil
+mk_cnts(Ranges, NoNodes) ->
+    PerRange = [mk_shards(NoNodes, Range, []) || Range <- Ranges],
+    Shards = lists:flatten(PerRange),
+    orddict:from_list([{Shard, nil} || Shard <- Shards]).
+
+% test helper: prepends to Shards one shard for Range per node 'node-1' ..
+% 'node-N', using the node name as the shard name too
+mk_shards(NoNodes, Range, Shards) ->
+    Generated = [begin
+        NodeName = list_to_atom("node-" ++ integer_to_list(N)),
+        #shard{name=NodeName, node=NodeName, range=Range}
+    end || N <- lists:seq(1, NoNodes)],
+    Generated ++ Shards.
diff --git a/deps/fabric/src/fabric_view_all_docs.erl b/deps/fabric/src/fabric_view_all_docs.erl
new file mode 100644
index 00000000..a769aedc
--- /dev/null
+++ b/deps/fabric/src/fabric_view_all_docs.erl
@@ -0,0 +1,181 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_view_all_docs).
+
+-export([go/4]).
+-export([open_doc/3]). % exported for spawn
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+%% @doc Runs an _all_docs request against DbName and streams the result to
+%% Callback.
+%%
+%% Without a key list, an all_docs job is submitted to every shard and the
+%% replies are merged through handle_message/3.
+%%
+%% With a key list, one helper process per key opens the document, the doc
+%% count is fetched in parallel, and the rows are delivered in key order
+%% (reversed unless the direction is fwd) by doc_receive_loop/5.
+go(DbName, #view_query_args{keys=nil} = QueryArgs, Callback, Acc0) ->
+    Workers = fabric_util:submit_jobs(mem3:shards(DbName),all_docs,[QueryArgs]),
+    BufferSize = couch_config:get("fabric", "map_buffer_size", "2"),
+    #view_query_args{limit = Limit, skip = Skip} = QueryArgs,
+    State = #collector{
+        query_args = QueryArgs,
+        callback = Callback,
+        buffer_size = list_to_integer(BufferSize),
+        counters = fabric_dict:init(Workers, 0),
+        skip = Skip,
+        limit = Limit,
+        user_acc = Acc0
+    },
+    RexiMon = fabric_util:create_monitors(Workers),
+    try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
+        State, infinity, 5000) of
+    {ok, NewState} ->
+        {ok, NewState#collector.user_acc};
+    {timeout, NewState} ->
+        Callback({error, timeout}, NewState#collector.user_acc);
+    {error, Resp} ->
+        {ok, Resp}
+    after
+        rexi_monitor:stop(RexiMon),
+        fabric_util:cleanup(Workers)
+    end;
+
+
+go(DbName, QueryArgs, Callback, Acc0) ->
+    #view_query_args{
+        direction = Dir,
+        include_docs = IncludeDocs,
+        limit = Limit0,
+        skip = Skip0,
+        keys = Keys
+    } = QueryArgs,
+    {_, Ref0} = spawn_monitor(fun() -> exit(fabric:get_doc_count(DbName)) end),
+    Monitors0 = [spawn_monitor(?MODULE, open_doc, [DbName, Id, IncludeDocs]) ||
+        Id <- Keys],
+    Monitors = if Dir=:=fwd -> Monitors0; true -> lists:reverse(Monitors0) end,
+    receive {'DOWN', Ref0, _, _, {ok, TotalRows}} ->
+        {ok, Acc1} = Callback({total_and_offset, TotalRows, 0}, Acc0),
+        % doc_receive_loop/5 returns the bare atom `timeout' when a lookup
+        % does not report back in time; report it like the doc-count timeout
+        % below instead of crashing on a badmatch
+        case doc_receive_loop(Monitors, Skip0, Limit0, Callback, Acc1) of
+        {ok, Acc2} ->
+            Callback(complete, Acc2);
+        timeout ->
+            Callback(timeout, Acc1)
+        end
+    after 10000 ->
+        Callback(timeout, Acc0)
+    end.
+
+%% Message handler passed to rexi_utils:recv/6 by go/4. Every clause
+%% returns {ok | stop | error, ...} to steer the receive loop.
+
+% a node went down: drop its workers, or report that progress is impossible
+handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
+ fabric_view:remove_down_shards(State, NodeRef);
+
+% a worker died: carry on if the remaining workers still cover the key range
+handle_message({rexi_EXIT, Reason}, Worker, State) ->
+ #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State,
+ Counters = fabric_dict:erase(Worker, Counters0),
+ case fabric_view:is_progress_possible(Counters) of
+ true ->
+ {ok, State#collector{counters = Counters}};
+ false ->
+ {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
+ {error, Resp}
+ end;
+
+% first message of a worker: its total row count and offset
+handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) ->
+ #collector{
+ callback = Callback,
+ counters = Counters0,
+ total_rows = Total0,
+ offset = Offset0,
+ user_acc = AccIn
+ } = State,
+ case fabric_dict:lookup_element(Worker, Counters0) of
+ undefined ->
+ % this worker lost the race with other partition copies, terminate
+ gen_server:reply(From, stop),
+ {ok, State};
+ 0 ->
+ gen_server:reply(From, ok),
+ % mark this worker as having reported and drop overlapping copies
+ Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
+ Counters2 = fabric_view:remove_overlapping_shards(Worker, Counters1),
+ Total = Total0 + Tot,
+ Offset = Offset0 + Off,
+ case fabric_dict:any(0, Counters2) of
+ true ->
+ % still waiting for the header of at least one worker
+ {ok, State#collector{
+ counters = Counters2,
+ total_rows = Total,
+ offset = Offset
+ }};
+ false ->
+ % every remaining worker has reported: send the combined header
+ FinalOffset = erlang:min(Total, Offset+State#collector.skip),
+ {Go, Acc} = Callback({total_and_offset, Total, FinalOffset}, AccIn),
+ {Go, State#collector{
+ counters = fabric_dict:decrement_all(Counters2),
+ total_rows = Total,
+ offset = FinalOffset,
+ user_acc = Acc
+ }}
+ end
+ end;
+
+% a row from a worker: merge it into the buffer by id and try to send rows
+handle_message(#view_row{} = Row, {Worker, From}, State) ->
+ #collector{query_args = Args, counters = Counters0, rows = Rows0} = State,
+ Dir = Args#view_query_args.direction,
+ Rows = merge_row(Dir, Row#view_row{worker=Worker}, Rows0),
+ Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
+ State1 = State#collector{rows=Rows, counters=Counters1},
+ State2 = fabric_view:maybe_pause_worker(Worker, From, State1),
+ fabric_view:maybe_send_row(State2);
+
+% a worker has no more rows: bump its counter so it no longer holds back
+% maybe_send_row/1
+handle_message(complete, Worker, State) ->
+ Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
+ fabric_view:maybe_send_row(State#collector{counters = Counters}).
+
+
+%% Inserts Row into the id-ordered row list: ascending merge for `fwd',
+%% reverse merge for `rev'.
+merge_row(fwd, Row, Rows) ->
+    IdPos = #view_row.id,
+    lists:keymerge(IdPos, [Row], Rows);
+merge_row(rev, Row, Rows) ->
+    IdPos = #view_row.id,
+    lists:rkeymerge(IdPos, [Row], Rows).
+
+%% Collects the rows produced by the open_doc/3 helper processes, in the
+%% order of the {Pid, Ref} list. The first Skip rows are discarded, then up
+%% to Limit rows are passed to Callback. Returns {ok, Acc} when the list or
+%% the limit is exhausted or the callback answers `stop', and the atom
+%% `timeout' when a helper does not deliver its row within 10 seconds.
+%%
+%% Skipped rows do not count against Limit, so `skip=N' combined with
+%% `limit=M' yields up to M rows after the N skipped ones.
+doc_receive_loop([], _, _, _, Acc) ->
+    {ok, Acc};
+doc_receive_loop(_, _, 0, _, Acc) ->
+    {ok, Acc};
+doc_receive_loop([{Pid,Ref}|Rest], Skip, Limit, Callback, Acc) when Skip > 0 ->
+    receive {'DOWN', Ref, process, Pid, #view_row{}} ->
+        % the row is dropped; only the skip counter changes
+        doc_receive_loop(Rest, Skip-1, Limit, Callback, Acc)
+    after 10000 ->
+        timeout
+    end;
+doc_receive_loop([{Pid,Ref}|Rest], 0, Limit, Callback, AccIn) ->
+    receive {'DOWN', Ref, process, Pid, #view_row{} = Row} ->
+        case Callback(fabric_view:transform_row(Row), AccIn) of
+        {ok, Acc} ->
+            doc_receive_loop(Rest, 0, Limit-1, Callback, Acc);
+        {stop, Acc} ->
+            {ok, Acc}
+        end
+    after 10000 ->
+        timeout
+    end.
+
+%% Helper process body (exported for spawn): opens document Id, including
+%% deleted ones, and exits with the matching #view_row{} as its reason.
+%% A missing document yields a row with only the key set; a deleted one
+%% carries {deleted, true} in its value. The doc field is filled in only
+%% when IncludeDocs is true.
+open_doc(DbName, Id, IncludeDocs) ->
+    {Row, Doc} = case fabric:open_doc(DbName, Id, [deleted]) of
+    {not_found, missing} ->
+        {#view_row{key=Id}, undefined};
+    {ok, #doc{deleted=true, revs=Revs}} ->
+        {RevPos, [RevId|_]} = Revs,
+        RevStr = couch_doc:rev_to_str({RevPos, RevId}),
+        Value = {[{rev,RevStr}, {deleted,true}]},
+        {#view_row{key=Id, id=Id, value=Value}, null};
+    {ok, #doc{revs=Revs} = Doc0} ->
+        Json = couch_doc:to_json_obj(Doc0, []),
+        {RevPos, [RevId|_]} = Revs,
+        RevStr = couch_doc:rev_to_str({RevPos, RevId}),
+        {#view_row{key=Id, id=Id, value={[{rev,RevStr}]}}, Json}
+    end,
+    exit(if IncludeDocs -> Row#view_row{doc=Doc}; true -> Row end).
diff --git a/deps/fabric/src/fabric_view_changes.erl b/deps/fabric/src/fabric_view_changes.erl
new file mode 100644
index 00000000..41347095
--- /dev/null
+++ b/deps/fabric/src/fabric_view_changes.erl
@@ -0,0 +1,334 @@
+% Copyright 2012 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_view_changes).
+
+-export([go/5, pack_seqs/1]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-import(fabric_db_update_listener, [wait_db_updated/1, stop/1]).
+
+%% @doc Entry point for the changes feed of DbName. The start sequence is
+%% validated first; a malformed one is reported through Callback. For the
+%% "continuous" and "longpoll" feeds an update listener process is started
+%% and changes are sent until keep_sending_changes/7 returns. For the
+%% "normal" feed the changes are sent once, followed by a {stop, LastSeq}
+%% callback.
+go(DbName, Feed, Options, Callback, Acc0) when Feed == "continuous" orelse
+ Feed == "longpoll" ->
+ Args = make_changes_args(Options),
+ Since = get_start_seq(DbName, Args),
+ case validate_start_seq(DbName, Since) of
+ ok ->
+ {ok, Acc} = Callback(start, Acc0),
+ {Timeout, _} = couch_changes:get_changes_timeout(Args, Callback),
+ Ref = make_ref(),
+ Parent = self(),
+ % the listener is identified by its pid together with Ref
+ UpdateListener = {spawn_link(fabric_db_update_listener, go,
+ [Parent, Ref, DbName, Timeout]),
+ Ref},
+ try
+ keep_sending_changes(
+ DbName,
+ Args,
+ Callback,
+ Since,
+ Acc,
+ Timeout,
+ UpdateListener
+ )
+ after
+ % always shut the listener down, even when sending fails
+ stop(UpdateListener)
+ end;
+ Error ->
+ Callback(Error, Acc0)
+ end;
+
+go(DbName, "normal", Options, Callback, Acc0) ->
+ Args = make_changes_args(Options),
+ Since = get_start_seq(DbName, Args),
+ case validate_start_seq(DbName, Since) of
+ ok ->
+ {ok, Acc} = Callback(start, Acc0),
+ {ok, #collector{counters=Seqs, user_acc=AccOut}} = send_changes(
+ DbName,
+ Args,
+ Callback,
+ Since,
+ Acc,
+ 5000
+ ),
+ Callback({stop, pack_seqs(Seqs)}, AccOut);
+ Error ->
+ Callback(Error, Acc0)
+ end.
+
+%% Sends the changes after Seqs, then either finishes the feed with a
+%% {stop, LastSeq} callback or waits for the update listener and loops with
+%% the remaining limit and the new last sequence.
+keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, UpListen) ->
+ #changes_args{limit=Limit, feed=Feed, heartbeat=Heartbeat} = Args,
+ {ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn, Timeout),
+ #collector{limit=Limit2, counters=NewSeqs, user_acc=AccOut} = Collector,
+ LastSeq = pack_seqs(NewSeqs),
+ % a longpoll feed ends as soon as the limit has been decremented, i.e. at
+ % least one row was sent
+ if Limit > Limit2, Feed == "longpoll" ->
+ Callback({stop, LastSeq}, AccOut);
+ true ->
+ case {Heartbeat, wait_db_updated(UpListen)} of
+ {undefined, timeout} ->
+ % no heartbeat configured and nothing changed in time: finish
+ Callback({stop, LastSeq}, AccOut);
+ _ ->
+ {ok, AccTimeout} = Callback(timeout, AccOut),
+ keep_sending_changes(
+ DbName,
+ Args#changes_args{limit=Limit2},
+ Callback,
+ LastSeq,
+ AccTimeout,
+ Timeout,
+ UpListen
+ )
+ end
+ end.
+
+%% Starts one changes worker per shard named in PackedSeqs, each resuming
+%% from its recorded sequence, and collects their rows through
+%% receive_results/5. Monitors and workers are cleaned up on the way out.
+send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
+ AllShards = mem3:shards(DbName),
+ Seqs = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, Seq}) ->
+ case lists:member(Shard, AllShards) of
+ true ->
+ Ref = rexi:cast(N, {fabric_rpc, changes, [Name,ChangesArgs,Seq]}),
+ [{Shard#shard{ref = Ref}, Seq}];
+ false ->
+ % Find some replacement shards to cover the missing range
+ % TODO It's possible in rare cases of shard merging to end up
+ % with overlapping shard ranges from this technique
+ % replacement shards start over from sequence 0
+ lists:map(fun(#shard{name=Name2, node=N2} = NewShard) ->
+ Ref = rexi:cast(N2, {fabric_rpc, changes, [Name2,ChangesArgs,0]}),
+ {NewShard#shard{ref = Ref}, 0}
+ end, find_replacement_shards(Shard, AllShards))
+ end
+ end, unpack_seqs(PackedSeqs, DbName)),
+ {Workers, _} = lists:unzip(Seqs),
+ RexiMon = fabric_util:create_monitors(Workers),
+ State = #collector{
+ query_args = ChangesArgs,
+ callback = Callback,
+ counters = orddict:from_list(Seqs),
+ user_acc = AccIn,
+ limit = ChangesArgs#changes_args.limit,
+ rows = Seqs % store sequence positions instead
+ },
+ %% TODO: errors need to be handled here
+ try
+ receive_results(Workers, State, Timeout, Callback, AccIn)
+ after
+ rexi_monitor:stop(RexiMon),
+ fabric_util:cleanup(Workers)
+ end.
+
+%% Runs the rexi receive loop over Workers with handle_message/3. Each time
+%% the loop reports a timeout the user callback is told so and the loop is
+%% restarted with the accumulator it returned; any other outcome ends the
+%% loop with {ok, FinalState}.
+receive_results(Workers, State, Timeout, Callback, AccIn) ->
+    Outcome = rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
+        State, infinity, Timeout),
+    case Outcome of
+    {timeout, Stalled} ->
+        {ok, AccOut} = Callback(timeout, Stalled#collector.user_acc),
+        Resumed = Stalled#collector{user_acc = AccOut},
+        receive_results(Workers, Resumed, Timeout, Callback, AccOut);
+    {_, Finished} ->
+        {ok, Finished}
+    end.
+
+%% Message handler passed to rexi_utils:recv/6 by receive_results/5. In this
+%% module the collector's `counters' hold the latest sequence per worker.
+
+% a node went down: drop its workers, or report that progress is impossible
+handle_message({rexi_DOWN, _, {_, NodeRef}, _}, nil, State) ->
+ fabric_view:remove_down_shards(State, NodeRef);
+
+% a worker died: carry on if the remaining workers still cover the key range
+handle_message({rexi_EXIT, Reason}, Worker, State) ->
+ #collector{
+ callback=Callback,
+ counters=Counters0,
+ rows = Seqs0,
+ user_acc=Acc
+ } = State,
+ Counters = fabric_dict:erase(Worker, Counters0),
+ Seqs = fabric_dict:erase(Worker, Seqs0),
+ case fabric_view:is_progress_possible(Counters) of
+ true ->
+ {ok, State#collector{counters = Counters, rows=Seqs}};
+ false ->
+ {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
+ {error, Resp}
+ end;
+
+% the limit is used up: stop, whatever the message is
+handle_message(_, _, #collector{limit=0} = State) ->
+ {stop, State};
+
+% a change row from a worker
+handle_message(#change{key=Seq} = Row0, {Worker, From}, St) ->
+ #collector{
+ query_args = #changes_args{include_docs=IncludeDocs},
+ callback = Callback,
+ counters = S0,
+ limit = Limit,
+ user_acc = AccIn
+ } = St,
+ case fabric_dict:lookup_element(Worker, S0) of
+ undefined ->
+ % this worker lost the race with other partition copies, terminate it
+ gen_server:reply(From, stop),
+ {ok, St};
+ _ ->
+ % record the worker's new position and drop overlapping copies
+ S1 = fabric_dict:store(Worker, Seq, S0),
+ S2 = fabric_view:remove_overlapping_shards(Worker, S1),
+ % the row is sent with the packed sequence of all workers as its key
+ Row = Row0#change{key = pack_seqs(S2)},
+ {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn),
+ gen_server:reply(From, Go),
+ {Go, St#collector{counters=S2, limit=Limit-1, user_acc=Acc}}
+ end;
+
+% a worker has sent all of its changes
+handle_message({complete, EndSeq}, Worker, State) ->
+ #collector{
+ counters = S0,
+ total_rows = Completed % override
+ } = State,
+ case fabric_dict:lookup_element(Worker, S0) of
+ undefined ->
+ {ok, State};
+ _ ->
+ S1 = fabric_dict:store(Worker, EndSeq, S0),
+ % unlikely to have overlaps here, but possible w/ filters
+ S2 = fabric_view:remove_overlapping_shards(Worker, S1),
+ NewState = State#collector{counters=S2, total_rows=Completed+1},
+ % stop once every remaining worker has completed
+ case fabric_dict:size(S2) =:= (Completed+1) of
+ true ->
+ {stop, NewState};
+ false ->
+ {ok, NewState}
+ end
+ end.
+
+%% When no filter is set, the `style' value is used as the filter; otherwise
+%% the arguments are returned unchanged.
+make_changes_args(Args) ->
+    case Args of
+    #changes_args{style=Style, filter=undefined} ->
+        Args#changes_args{filter = Style};
+    _ ->
+        Args
+    end.
+
+%% Sequence the feed starts from: the `since' argument for a forward feed;
+%% for a reverse feed, the packed update sequences gathered from the shards
+%% via get_update_seq jobs.
+get_start_seq(DbName, #changes_args{dir=rev}) ->
+    Workers = fabric_util:submit_jobs(mem3:shards(DbName), get_update_seq, []),
+    Pending = fabric_dict:init(Workers, -1),
+    {ok, Latest} = fabric_util:recv(Workers, #shard.ref,
+        fun collect_update_seqs/3, Pending),
+    Latest;
+get_start_seq(_DbName, #changes_args{dir=fwd, since=Since}) ->
+    Since.
+
+%% Receive callback for get_start_seq/2: records the update sequence that a
+%% shard reported, drops the overlapping copies of that shard, and stops
+%% with the packed sequences once no counter is left at -1.
+collect_update_seqs(Seq, Shard, Counters) when is_integer(Seq) ->
+    case fabric_dict:lookup_element(Shard, Counters) of
+    -1 ->
+        Stored = fabric_dict:store(Shard, Seq, Counters),
+        Pruned = fabric_view:remove_overlapping_shards(Shard, Stored),
+        case fabric_dict:any(-1, Pruned) of
+        false ->
+            {stop, pack_seqs(Pruned)};
+        true ->
+            {ok, Pruned}
+        end;
+    undefined ->
+        % already heard from someone else in this range
+        {ok, Counters}
+    end.
+
+%% @doc Packs a list of {Shard, Seq} pairs into [SeqSum, Opaque]: the sum of
+%% all sequences plus the URL-safe base64 encoding of the compressed
+%% external term format of the {Node, Range, Seq} triples.
+pack_seqs(Workers) ->
+    {_Shards, Seqs} = lists:unzip(Workers),
+    Entries = [{Node, Range, Seq} ||
+        {#shard{node=Node, range=Range}, Seq} <- Workers],
+    Encoded = term_to_binary(Entries, [compressed]),
+    [lists:sum(Seqs), couch_util:encodeBase64Url(Encoded)].
+
+%% Turns a `since' value into a list of {Shard, Seq} pairs. Accepted forms:
+%% the integer 0 or the string "0" (every shard starts at 0), the
+%% [SeqNum, Opaque] list produced by pack_seqs/1, and strings holding either
+%% that list or the older "SeqNum-Opaque" form.
+unpack_seqs(0, DbName) ->
+ fabric_dict:init(mem3:shards(DbName), 0);
+
+unpack_seqs("0", DbName) ->
+ fabric_dict:init(mem3:shards(DbName), 0);
+
+unpack_seqs([_SeqNum, Opaque], DbName) ->
+ do_unpack_seqs(Opaque, DbName);
+
+unpack_seqs(Packed, DbName) ->
+ % string form of the [SeqNum, "Opaque"] list
+ NewPattern = "^\\[[0-9]+,\"(?<opaque>.*)\"\\]$",
+ % older form: an optional "SeqNum-" prefix followed by the opaque part
+ OldPattern = "^([0-9]+-)?(?<opaque>.*)$",
+ Options = [{capture, [opaque], binary}],
+ Opaque = case re:run(Packed, NewPattern, Options) of
+ {match, Match} ->
+ Match;
+ nomatch ->
+ {match, Match} = re:run(Packed, OldPattern, Options),
+ Match
+ end,
+ do_unpack_seqs(Opaque, DbName).
+
+%% Decodes the opaque part of a packed sequence and maps every
+%% {Node, Range, Seq} triple to the matching shard record from the
+%% `partitions' ets table.
+% NOTE(review): Opaque originates from the request's `since' value (see
+% validate_start_seq/2) and is passed to binary_to_term/1 without the `safe'
+% option, so a crafted value can create new atoms. Switching to
+% binary_to_term/2 with [safe] would reject node names not yet known to this
+% VM - confirm that is acceptable before changing it.
+do_unpack_seqs(Opaque, DbName) ->
+ % TODO relies on internal structure of fabric_dict as keylist
+ lists:map(fun({Node, [A,B], Seq}) ->
+ Match = #shard{node=Node, range=[A,B], dbname=DbName, _ = '_'},
+ case ets:match_object(partitions, Match) of
+ [Shard] ->
+ {Shard, Seq};
+ [] ->
+ {Match, Seq} % will be replaced in find_replacement_shards
+ end
+ end, binary_to_term(couch_util:decodeBase64Url(Opaque))).
+
+%% Builds the {change, {Props}} term sent to the user callback. The second
+%% argument is the include_docs flag. Clause order matters: deleted rows are
+%% matched first, then rows whose doc is an error tuple.
+changes_row(#change{key=Seq, id=Id, value=Value, deleted=true, doc=Doc}, true) ->
+ {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}, {doc, Doc}]}};
+changes_row(#change{key=Seq, id=Id, value=Value, deleted=true}, false) ->
+ {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}]}};
+% the doc could not be loaded: report the error in place of the doc
+changes_row(#change{key=Seq, id=Id, value=Value, doc={error,Reason}}, true) ->
+ {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {error,Reason}]}};
+changes_row(#change{key=Seq, id=Id, value=Value, doc=Doc}, true) ->
+ {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {doc,Doc}]}};
+changes_row(#change{key=Seq, id=Id, value=Value}, false) ->
+ {change, {[{seq,Seq}, {id,Id}, {changes,Value}]}}.
+
+%% Returns the shards of AllShards that cover exactly the same range as the
+%% given shard.
+find_replacement_shards(#shard{range=Range}, AllShards) ->
+    % TODO make this moar betta -- we might have split or merged the partition
+    SameRange = fun(Candidate) -> Candidate#shard.range =:= Range end,
+    lists:filter(SameRange, AllShards).
+
+%% Checks that Seq can be unpacked for DbName. Returns `ok', or a
+%% bad_request error tuple when unpacking raises any exception.
+validate_start_seq(DbName, Seq) ->
+    try unpack_seqs(Seq, DbName) of
+    _Unpacked ->
+        ok
+    catch _Class:_Reason ->
+        {error, {bad_request,
+            <<"Malformed sequence supplied in 'since' parameter.">>}}
+    end.
+
+% Checks that every supported textual and list form of a packed sequence
+% unpacks into {#shard{}, Seq} pairs. An empty `partitions' table is created
+% for the lookups done in do_unpack_seqs/2.
+unpack_seqs_test() ->
+ ets:new(partitions, [named_table]),
+
+ % BigCouch 0.3 style.
+ assert_shards("23423-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
+ "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
+ "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA"),
+
+ % BigCouch 0.4 style.
+ assert_shards([23423,<<"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
+ "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
+ "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA">>]),
+
+ % BigCouch 0.4 style (as string).
+ assert_shards("[23423,\"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
+ "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
+ "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"),
+
+ % with internal hyphen
+ assert_shards("651-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwNDLXMwBCwxygOFMiQ"
+ "5L8____sxJTcalIUgCSSfZgReE4FTmAFMWDFYXgVJQAUlQPVuSKS1EeC5BkaABSQHXz8"
+ "VgJUbgAonB_VqIPfoUHIArvE7T6AUQh0I1-WQAzp1XB"),
+ assert_shards([651,"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwNDLXMwBCwxygOFMiQ"
+ "5L8____sxJTcalIUgCSSfZgReE4FTmAFMWDFYXgVJQAUlQPVuSKS1EeC5BkaABSQHXz8"
+ "VgJUbgAonB_VqIPfoUHIArvE7T6AUQh0I1-WQAzp1XB"]),
+
+ % NOTE(review): the table is not deleted when an assertion above fails
+ ets:delete(partitions).
+
+% test helper: Packed must unpack into a non-empty list of {#shard{}, Seq}
+assert_shards(Packed) ->
+    Unpacked = unpack_seqs(Packed, <<"foo">>),
+    ?assertMatch([{#shard{},_}|_], Unpacked).
diff --git a/deps/fabric/src/fabric_view_map.erl b/deps/fabric/src/fabric_view_map.erl
new file mode 100644
index 00000000..96741a8e
--- /dev/null
+++ b/deps/fabric/src/fabric_view_map.erl
@@ -0,0 +1,151 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_view_map).
+
+-export([go/6]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+% Runs a map view query across the shards of DbName and streams the results
+% to Callback.
+%
+% The second argument is either a design document name (binary, without the
+% "_design/" prefix), which is opened via fabric:open_doc/3 first, or an
+% already opened design document.
+%
+% Returns {ok, UserAcc} when collection finishes or when handle_message/3
+% reported an error (in which case the callback has already been told), and
+% whatever Callback({error, timeout}, UserAcc) returns on a receive timeout.
+%
+% The rexi monitor is created immediately after the jobs are submitted and
+% everything else runs inside the try, so that a failure while reading the
+% configuration or building the collector (e.g. a non-integer
+% "map_buffer_size") still stops the monitor and cleans up the workers.
+go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
+    {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
+    go(DbName, DDoc, View, Args, Callback, Acc0);
+
+go(DbName, DDoc, View, Args, Callback, Acc0) ->
+    Shards = fabric_view:get_shards(DbName, Args),
+    Workers = fabric_util:submit_jobs(Shards, map_view, [DDoc, View, Args]),
+    RexiMon = fabric_util:create_monitors(Workers),
+    try
+        BufferSize = couch_config:get("fabric", "map_buffer_size", "2"),
+        #view_query_args{limit = Limit, skip = Skip, keys = Keys} = Args,
+        State = #collector{
+            db_name=DbName,
+            query_args = Args,
+            callback = Callback,
+            buffer_size = list_to_integer(BufferSize),
+            % every worker starts with a counter of 0
+            counters = fabric_dict:init(Workers, 0),
+            skip = Skip,
+            limit = Limit,
+            keys = fabric_view:keydict(Keys),
+            sorted = Args#view_query_args.sorted,
+            user_acc = Acc0
+        },
+        rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
+            State, infinity, 1000 * 60 * 60)
+    of
+        {ok, NewState} ->
+            {ok, NewState#collector.user_acc};
+        {timeout, NewState} ->
+            Callback({error, timeout}, NewState#collector.user_acc);
+        {error, Resp} ->
+            % handle_message/3 already reported the error to the callback
+            {ok, Resp}
+    after
+        rexi_monitor:stop(RexiMon),
+        fabric_util:cleanup(Workers)
+    end.
+
+% Callback passed to rexi_utils:recv/6 by go/6. Receives one message, the
+% worker it relates to and the #collector{} state, and returns
+% {ok | stop | error, ...} (or whatever the fabric_view helpers return).
+
+% A node went down: let fabric_view drop its shards from the state.
+handle_message({rexi_DOWN, _, {_, Node}, _}, _Worker, St) ->
+    fabric_view:remove_down_shards(St, Node);
+
+% A worker exited: forget it, and report an error to the user callback only
+% when the remaining workers can no longer make progress.
+handle_message({rexi_EXIT, Why}, Worker, St) ->
+    Remaining = fabric_dict:erase(Worker, St#collector.counters),
+    case fabric_view:is_progress_possible(Remaining) of
+    false ->
+        Callback = St#collector.callback,
+        Error = {error, fabric_util:error_info(Why)},
+        {ok, Resp} = Callback(Error, St#collector.user_acc),
+        {error, Resp};
+    true ->
+        {ok, St#collector{counters = Remaining}}
+    end;
+
+% A worker reported its total row count and offset.
+handle_message({total_and_offset, Tot, Off}, {Worker, From}, St) ->
+    Counters0 = St#collector.counters,
+    case fabric_dict:lookup_element(Worker, Counters0) of
+    0 ->
+        gen_server:reply(From, ok),
+        Bumped = fabric_dict:update_counter(Worker, 1, Counters0),
+        Counters = fabric_view:remove_overlapping_shards(Worker, Bumped),
+        Summed = St#collector{
+            counters = Counters,
+            total_rows = St#collector.total_rows + Tot,
+            offset = St#collector.offset + Off
+        },
+        case fabric_dict:any(0, Counters) of
+        false ->
+            % no counter is 0 any more: emit the combined totals
+            #collector{
+                callback = Callback,
+                total_rows = Total,
+                offset = Offset,
+                skip = Skip,
+                user_acc = AccIn
+            } = Summed,
+            FinalOffset = erlang:min(Total, Offset + Skip),
+            {Go, Acc} = Callback({total_and_offset, Total, FinalOffset}, AccIn),
+            {Go, Summed#collector{
+                counters = fabric_dict:decrement_all(Counters),
+                offset = FinalOffset,
+                user_acc = Acc
+            }};
+        true ->
+            % some worker still has a counter of 0; keep accumulating
+            {ok, Summed}
+        end;
+    undefined ->
+        % this worker lost the race with other partition copies, terminate
+        gen_server:reply(From, stop),
+        {ok, St}
+    end;
+
+% A row arrived but the limit is exhausted: finish the response and stop.
+handle_message(#view_row{}, {_, _},
+        #collector{limit = 0, callback = Callback, user_acc = AccIn} = St) ->
+    {_, Acc} = Callback(complete, AccIn),
+    {stop, St#collector{user_acc = Acc}};
+
+% Unsorted query: hand the row straight to the callback, then release the
+% worker.
+handle_message(#view_row{} = Row, {_, From},
+        #collector{sorted = false, callback = Callback, user_acc = AccIn,
+            limit = Limit} = St) ->
+    {Go, Acc} = Callback(fabric_view:transform_row(Row), AccIn),
+    gen_server:reply(From, ok),
+    {Go, St#collector{user_acc = Acc, limit = Limit - 1}};
+
+% Sorted query: merge the row into the buffered rows, bump the worker's
+% counter and let fabric_view decide about pausing and sending.
+handle_message(#view_row{} = Row, {Worker, From}, St) ->
+    #collector{
+        query_args = QueryArgs,
+        counters = Counters,
+        rows = Buffered,
+        keys = KeyDict
+    } = St,
+    #view_query_args{direction = Dir} = QueryArgs,
+    Tagged = Row#view_row{worker = Worker},
+    Merged = merge_row(Dir, KeyDict, Tagged, Buffered),
+    Bumped = fabric_dict:update_counter(Worker, 1, Counters),
+    St1 = St#collector{rows = Merged, counters = Bumped},
+    fabric_view:maybe_send_row(fabric_view:maybe_pause_worker(Worker, From, St1));
+
+% A worker has no more rows: bump its counter and try to send.
+handle_message(complete, Worker, St) ->
+    Bumped = fabric_dict:update_counter(Worker, 1, St#collector.counters),
+    fabric_view:maybe_send_row(St#collector{counters = Bumped}).
+
+% Merges a single row into the already ordered list Rows.
+%   fwd / rev with no key dict: order by [Key, Id] using
+%     couch_view:less_json/2, with the operands swapped for rev.
+%   any direction with a key dict: rows with the same key are ordered by id,
+%     otherwise by the value stored in KeyDict for each key.
+merge_row(fwd, undefined, Row, Rows) ->
+    Ascending = fun(#view_row{key=K1, id=Id1}, #view_row{key=K2, id=Id2}) ->
+        couch_view:less_json([K1, Id1], [K2, Id2])
+    end,
+    lists:merge(Ascending, [Row], Rows);
+merge_row(rev, undefined, Row, Rows) ->
+    Descending = fun(#view_row{key=K1, id=Id1}, #view_row{key=K2, id=Id2}) ->
+        couch_view:less_json([K2, Id2], [K1, Id1])
+    end,
+    lists:merge(Descending, [Row], Rows);
+merge_row(_, KeyDict, Row, Rows) ->
+    ByKeyDict = fun(#view_row{key=K1, id=Id1}, #view_row{key=K2, id=Id2}) ->
+        case K1 =:= K2 of
+        true ->
+            Id1 < Id2;
+        false ->
+            dict:fetch(K1, KeyDict) < dict:fetch(K2, KeyDict)
+        end
+    end,
+    lists:merge(ByKeyDict, [Row], Rows).
diff --git a/deps/fabric/src/fabric_view_reduce.erl b/deps/fabric/src/fabric_view_reduce.erl
new file mode 100644
index 00000000..58438573
--- /dev/null
+++ b/deps/fabric/src/fabric_view_reduce.erl
@@ -0,0 +1,114 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_view_reduce).
+
+-export([go/6]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+% Runs a reduce view query across the shards of DbName and streams the
+% results to Callback.
+%
+% The second argument is either a design document name (binary, without the
+% "_design/" prefix), which is opened via fabric:open_doc/3 first, or an
+% already opened design document.
+%
+% Returns {ok, UserAcc} when collection finishes or when handle_message/3
+% reported an error (in which case the callback has already been told), and
+% whatever Callback({error, timeout}, UserAcc) returns on a receive timeout.
+%
+% Resources are released in nested try/after sections: the outer one covers
+% the workers and the rexi monitor, the inner one the OS process. This way a
+% failure while parsing "reduce_buffer_size" or while acquiring the OS
+% process no longer leaves the already started workers without cleanup.
+go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
+    {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
+    go(DbName, DDoc, View, Args, Callback, Acc0);
+
+go(DbName, DDoc, VName, Args, Callback, Acc0) ->
+    #group{def_lang=Lang, views=Views} = Group =
+        couch_view_group:design_doc_to_view_group(DDoc),
+    {NthRed, View} = fabric_view:extract_view(nil, VName, Views, reduce),
+    {VName, RedSrc} = lists:nth(NthRed, View#view.reduce_funs),
+    Workers = lists:map(fun(#shard{name=Name, node=N} = Shard) ->
+        Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,Group,VName,Args]}),
+        Shard#shard{ref = Ref}
+    end, fabric_view:get_shards(DbName, Args)),
+    RexiMon = fabric_util:create_monitors(Workers),
+    try
+        % parse the configuration before taking the OS process, so a bad
+        % value cannot strand it
+        BufferSize = list_to_integer(
+            couch_config:get("fabric", "reduce_buffer_size", "20")),
+        #view_query_args{limit = Limit, skip = Skip} = Args,
+        OsProc = couch_query_servers:get_os_process(Lang),
+        try
+            State = #collector{
+                db_name = DbName,
+                query_args = Args,
+                callback = Callback,
+                buffer_size = BufferSize,
+                % every worker starts with a counter of 0
+                counters = fabric_dict:init(Workers, 0),
+                keys = Args#view_query_args.keys,
+                skip = Skip,
+                limit = Limit,
+                lang = Lang,
+                os_proc = OsProc,
+                reducer = RedSrc,
+                rows = dict:new(),
+                user_acc = Acc0
+            },
+            rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
+                State, infinity, 1000 * 60 * 60)
+        of
+            {ok, NewState} ->
+                {ok, NewState#collector.user_acc};
+            {timeout, NewState} ->
+                Callback({error, timeout}, NewState#collector.user_acc);
+            {error, Resp} ->
+                % handle_message/3 already reported the error to the callback
+                {ok, Resp}
+        after
+            % best effort, as before: a failure to return the OS process
+            % must not mask the query result
+            catch couch_query_servers:ret_os_process(OsProc)
+        end
+    after
+        rexi_monitor:stop(RexiMon),
+        fabric_util:cleanup(Workers)
+    end.
+
+% Callback passed to rexi_utils:recv/6 by go/6. Receives one message, the
+% worker it relates to and the #collector{} state.
+
+% A node went down: let fabric_view drop its shards from the state.
+handle_message({rexi_DOWN, _, {_, Node}, _}, _Worker, St) ->
+    fabric_view:remove_down_shards(St, Node);
+
+% A worker exited: forget it, and report an error to the user callback only
+% when the remaining workers can no longer make progress.
+handle_message({rexi_EXIT, Why}, Worker, St) ->
+    Remaining = fabric_dict:erase(Worker, St#collector.counters),
+    case fabric_view:is_progress_possible(Remaining) of
+    false ->
+        Callback = St#collector.callback,
+        Error = {error, fabric_util:error_info(Why)},
+        {ok, Resp} = Callback(Error, St#collector.user_acc),
+        {error, Resp};
+    true ->
+        {ok, St#collector{counters = Remaining}}
+    end;
+
+% A reduce row arrived: file it under its key, unless the worker is no
+% longer tracked in the counters.
+handle_message(#view_row{key=Key} = Row, {Worker, From}, St) ->
+    Counters0 = St#collector.counters,
+    case fabric_dict:lookup_element(Worker, Counters0) of
+    undefined ->
+        % this worker lost the race with other partition copies, terminate it
+        gen_server:reply(From, stop),
+        {ok, St};
+    _ ->
+        Tagged = Row#view_row{worker = Worker},
+        Rows = dict:append(Key, Tagged, St#collector.rows),
+        Bumped = fabric_dict:update_counter(Worker, 1, Counters0),
+        % TODO time this call, if slow don't do it every time
+        Pruned = fabric_view:remove_overlapping_shards(Worker, Bumped),
+        St1 = St#collector{rows = Rows, counters = Pruned},
+        fabric_view:maybe_send_row(
+            fabric_view:maybe_pause_worker(Worker, From, St1))
+    end;
+
+% A worker has no more rows: bump its counter, drop overlapping shards and
+% try to send.
+handle_message(complete, Worker, St) ->
+    Bumped = fabric_dict:update_counter(Worker, 1, St#collector.counters),
+    Pruned = fabric_view:remove_overlapping_shards(Worker, Bumped),
+    fabric_view:maybe_send_row(St#collector{counters = Pruned}).
+
+% EUnit test: after one worker reports `complete`, the counters are expected
+% to hold two entries fewer than there are workers (presumably the two other
+% copies of the same range, dropped by remove_overlapping_shards -- confirm
+% against fabric_view).
+complete_worker_test() ->
+    Shards =
+        mem3_util:create_partition_map("foo",3,3,[node(),node(),node()]),
+    Workers = lists:map(fun(#shard{} = Shard) ->
+            Ref = make_ref(),
+            Shard#shard{ref = Ref}
+        end,
+        Shards),
+    State = #collector{counters=fabric_dict:init(Workers,0)},
+    {ok, NewState} = handle_message(complete, lists:nth(2,Workers), State),
+    % ?assertEqual takes (Expected, Actual); keeping that order makes the
+    % failure report label the two values correctly
+    ?assertEqual(length(Workers) - 2, orddict:size(NewState#collector.counters)).