diff options
Diffstat (limited to 'deps/fabric')
26 files changed, 4047 insertions, 0 deletions
diff --git a/deps/fabric/README.md b/deps/fabric/README.md new file mode 100644 index 00000000..ab710ca9 --- /dev/null +++ b/deps/fabric/README.md @@ -0,0 +1,24 @@ +## fabric + +Fabric is a collection of proxy functions for [CouchDB][1] operations in a cluster. These functions are used in [BigCouch][2] as the remote procedure endpoints on each of the cluster nodes. + +For example, creating a database is a straightforward task in standalone CouchDB, but for BigCouch, each node that will store a shard/partition for the database needs to receive and execute a fabric function. The node handling the request also needs to compile the results from each of the nodes and respond accordingly to the client. + +Fabric is used in conjunction with 'Rexi' which is also an application within BigCouch. + +### Getting Started +Fabric requires R13B03 or higher and can be built with [rebar][6], which comes bundled in the repository. + +### License +[Apache 2.0][3] + +### Contact + * [http://cloudant.com][4] + * [info@cloudant.com][5] + +[1]: http://couchdb.apache.org +[2]: http://github.com/cloudant/bigcouch +[3]: http://www.apache.org/licenses/LICENSE-2.0.html +[4]: http://cloudant.com +[5]: mailto:info@cloudant.com +[6]: http://github.com/basho/rebar diff --git a/deps/fabric/include/fabric.hrl b/deps/fabric/include/fabric.hrl new file mode 100644 index 00000000..c8de0f8f --- /dev/null +++ b/deps/fabric/include/fabric.hrl @@ -0,0 +1,38 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-include_lib("eunit/include/eunit.hrl"). + +-record(collector, { + db_name=nil, + query_args, + callback, + counters, + buffer_size, + blocked = [], + total_rows = 0, + offset = 0, + rows = [], + skip, + limit, + keys, + os_proc, + reducer, + lang, + sorted, + user_acc +}). + +-record(view_row, {key, id, value, doc, worker}). +-record(change, {key, id, value, deleted=false, doc, worker}). diff --git a/deps/fabric/rebar b/deps/fabric/rebar Binary files differnew file mode 100755 index 00000000..30c43ba5 --- /dev/null +++ b/deps/fabric/rebar diff --git a/deps/fabric/rebar.config b/deps/fabric/rebar.config new file mode 100644 index 00000000..bb4d2fe9 --- /dev/null +++ b/deps/fabric/rebar.config @@ -0,0 +1,18 @@ +% Copyright 2011 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +{deps, [ + {meck, ".*", {git, "https://github.com/eproxus/meck.git", {tag, "0.7.2"}}}, + {twig, ".*", {git, "https://github.com/cloudant/twig.git", {tag, "0.2.1"}}} +]}. diff --git a/deps/fabric/src/fabric.app.src b/deps/fabric/src/fabric.app.src new file mode 100644 index 00000000..a1cbb2c8 --- /dev/null +++ b/deps/fabric/src/fabric.app.src @@ -0,0 +1,6 @@ +{application, fabric, [ + {description, "Routing and proxying layer for CouchDB cluster"}, + {vsn, git}, + {registered, []}, + {applications, [kernel, stdlib, couch, rexi, mem3, twig]} +]}. diff --git a/deps/fabric/src/fabric.erl b/deps/fabric/src/fabric.erl new file mode 100644 index 00000000..a8e3b2ea --- /dev/null +++ b/deps/fabric/src/fabric.erl @@ -0,0 +1,460 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric). + +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +-define(ADMIN_CTX, {user_ctx, #user_ctx{roles = [<<"_admin">>]}}). + +% DBs +-export([all_dbs/0, all_dbs/1, create_db/1, create_db/2, delete_db/1, + delete_db/2, get_db_info/1, get_doc_count/1, set_revs_limit/3, + set_security/3, get_revs_limit/1, get_security/1, get_security/2]). + +% Documents +-export([open_doc/3, open_revs/4, get_missing_revs/2, get_missing_revs/3, + update_doc/3, update_docs/3, purge_docs/2, att_receiver/2]). + +% Views +-export([all_docs/4, changes/4, query_view/3, query_view/4, query_view/6, + get_view_group_info/2]). + +% miscellany +-export([design_docs/1, reset_validation_funs/1, cleanup_index_files/0, + cleanup_index_files/1]). + +-include("fabric.hrl"). + +-type dbname() :: (iodata() | #db{}). +-type docid() :: iodata(). +-type revision() :: {integer(), binary()}. +-type callback() :: fun((any(), any()) -> {ok | stop, any()}). +-type json_obj() :: {[{binary() | atom(), any()}]}. +-type option() :: atom() | {atom(), any()}. + +%% db operations +%% @equiv all_dbs(<<>>) +all_dbs() -> + all_dbs(<<>>). + +%% @doc returns a list of all database names +-spec all_dbs(Prefix::iodata()) -> {ok, [binary()]}. +all_dbs(Prefix) when is_binary(Prefix) -> + Length = byte_size(Prefix), + MatchingDbs = ets:foldl(fun(#shard{dbname=DbName}, Acc) -> + case DbName of + <<Prefix:Length/binary, _/binary>> -> + [DbName | Acc]; + _ -> + Acc + end + end, [], partitions), + {ok, lists:usort(MatchingDbs)}; + +%% @equiv all_dbs(list_to_binary(Prefix)) +all_dbs(Prefix) when is_list(Prefix) -> + all_dbs(list_to_binary(Prefix)). + +%% @doc returns a property list of interesting properties +%% about the database such as `doc_count', `disk_size', +%% etc. +-spec get_db_info(dbname()) -> + {ok, [ + {instance_start_time, binary()} | + {doc_count, non_neg_integer()} | + {doc_del_count, non_neg_integer()} | + {purge_seq, non_neg_integer()} | + {compact_running, boolean()} | + {disk_size, non_neg_integer()} | + {disk_format_version, pos_integer()} + ]}. +get_db_info(DbName) -> + fabric_db_info:go(dbname(DbName)). + +%% @doc the number of docs in a database +-spec get_doc_count(dbname()) -> {ok, non_neg_integer()}. +get_doc_count(DbName) -> + fabric_db_doc_count:go(dbname(DbName)). + +%% @equiv create_db(DbName, []) +create_db(DbName) -> + create_db(DbName, []). + +%% @doc creates a database with the given name. +%% +%% Options can include values for q and n, +%% for example `{q, "8"}' and `{n, "3"}', which +%% control how many shards to split a database into +%% and how many nodes each doc is copied to respectively. +%% +-spec create_db(dbname(), [option()]) -> ok | accepted | {error, atom()}. +create_db(DbName, Options) -> + fabric_db_create:go(dbname(DbName), opts(Options)). + +%% @equiv delete_db([]) +delete_db(DbName) -> + delete_db(DbName, []). + +%% @doc delete a database +-spec delete_db(dbname(), [option()]) -> ok | accepted | {error, atom()}. +delete_db(DbName, Options) -> + fabric_db_delete:go(dbname(DbName), opts(Options)). + +%% @doc provide an upper bound for the number of tracked document revisions +-spec set_revs_limit(dbname(), pos_integer(), [option()]) -> ok. +set_revs_limit(DbName, Limit, Options) when is_integer(Limit), Limit > 0 -> + fabric_db_meta:set_revs_limit(dbname(DbName), Limit, opts(Options)). + +%% @doc retrieves the maximum number of document revisions +-spec get_revs_limit(dbname()) -> pos_integer() | no_return(). +get_revs_limit(DbName) -> + {ok, Db} = fabric_util:get_db(dbname(DbName), [?ADMIN_CTX]), + try couch_db:get_revs_limit(Db) after catch couch_db:close(Db) end. + +%% @doc sets the readers/writers/admin permissions for a database +-spec set_security(dbname(), SecObj::json_obj(), [option()]) -> ok. +set_security(DbName, SecObj, Options) -> + fabric_db_meta:set_security(dbname(DbName), SecObj, opts(Options)). + +get_security(DbName) -> + get_security(DbName, [?ADMIN_CTX]). + +%% @doc retrieve the security object for a database +-spec get_security(dbname()) -> json_obj() | no_return(). +get_security(DbName, Options) -> + {ok, Db} = fabric_util:get_db(dbname(DbName), opts(Options)), + try couch_db:get_security(Db) after catch couch_db:close(Db) end. + +% doc operations + +%% @doc retrieve the doc with a given id +-spec open_doc(dbname(), docid(), [option()]) -> + {ok, #doc{}} | + {not_found, missing | deleted} | + {timeout, any()} | + {error, any()} | + {error, any() | any()}. +open_doc(DbName, Id, Options) -> + fabric_doc_open:go(dbname(DbName), docid(Id), opts(Options)). + +%% @doc retrieve a collection of revisions, possible all +-spec open_revs(dbname(), docid(), [revision()] | all, [option()]) -> + {ok, [{ok, #doc{}} | {{not_found,missing}, revision()}]} | + {timeout, any()} | + {error, any()} | + {error, any(), any()}. +open_revs(DbName, Id, Revs, Options) -> + fabric_doc_open_revs:go(dbname(DbName), docid(Id), Revs, opts(Options)). + +%% @equiv get_missing_revs(DbName, IdsRevs, []) +get_missing_revs(DbName, IdsRevs) -> + get_missing_revs(DbName, IdsRevs, []). + +%% @doc retrieve missing revisions for a list of `{Id, Revs}' +-spec get_missing_revs(dbname(),[{docid(), [revision()]}], [option()]) -> + {ok, [{docid(), any(), [any()]}]}. +get_missing_revs(DbName, IdsRevs, Options) when is_list(IdsRevs) -> + Sanitized = [idrevs(IdR) || IdR <- IdsRevs], + fabric_doc_missing_revs:go(dbname(DbName), Sanitized, opts(Options)). + +%% @doc update a single doc +%% @equiv update_docs(DbName,[Doc],Options) +-spec update_doc(dbname(), #doc{}, [option()]) -> + {ok, any()} | any(). +update_doc(DbName, Doc, Options) -> + case update_docs(DbName, [Doc], opts(Options)) of + {ok, [{ok, NewRev}]} -> + {ok, NewRev}; + {accepted, [{accepted, NewRev}]} -> + {accepted, NewRev}; + {ok, [{{_Id, _Rev}, Error}]} -> + throw(Error); + {ok, [Error]} -> + throw(Error); + {ok, []} -> + % replication success + #doc{revs = {Pos, [RevId | _]}} = doc(Doc), + {ok, {Pos, RevId}} + end. + +%% @doc update a list of docs +-spec update_docs(dbname(), [#doc{}], [option()]) -> + {ok, any()} | any(). +update_docs(DbName, Docs, Options) -> + try + fabric_doc_update:go(dbname(DbName), docs(Docs), opts(Options)) of + {ok, Results} -> + {ok, Results}; + {accepted, Results} -> + {accepted, Results}; + Error -> + throw(Error) + catch {aborted, PreCommitFailures} -> + {aborted, PreCommitFailures} + end. + +purge_docs(_DbName, _IdsRevs) -> + not_implemented. + +%% @doc spawns a process to upload attachment data and +%% returns a function that shards can use to communicate +%% with the spawned middleman process +-spec att_receiver(#httpd{}, Length :: undefined | chunked | pos_integer() | + {unknown_transfer_encoding, any()}) -> + function() | binary(). +att_receiver(Req, Length) -> + fabric_doc_attachments:receiver(Req, Length). + +%% @doc retrieves all docs. Additional query parameters, such as `limit', +%% `start_key' and `end_key', `descending', and `include_docs', can +%% also be passed to further constrain the query. See <a href= +%% "http://wiki.apache.org/couchdb/HTTP_Document_API#All_Documents"> +%% all_docs</a> for details +-spec all_docs(dbname(), callback(), [] | tuple(), #view_query_args{}) -> + {ok, [any()]}. +all_docs(DbName, Callback, Acc0, #view_query_args{} = QueryArgs) when + is_function(Callback, 2) -> + fabric_view_all_docs:go(dbname(DbName), QueryArgs, Callback, Acc0); + +%% @doc convenience function that takes a keylist rather than a record +%% @equiv all_docs(DbName, Callback, Acc0, kl_to_query_args(QueryArgs)) +all_docs(DbName, Callback, Acc0, QueryArgs) -> + all_docs(DbName, Callback, Acc0, kl_to_query_args(QueryArgs)). + + +-spec changes(dbname(), callback(), any(), #changes_args{} | [{atom(),any()}]) -> + {ok, any()}. +changes(DbName, Callback, Acc0, #changes_args{}=Options) -> + Feed = Options#changes_args.feed, + fabric_view_changes:go(dbname(DbName), Feed, Options, Callback, Acc0); + +%% @doc convenience function, takes keylist instead of record +%% @equiv changes(DbName, Callback, Acc0, kl_to_changes_args(Options)) +changes(DbName, Callback, Acc0, Options) -> + changes(DbName, Callback, Acc0, kl_to_changes_args(Options)). + +%% @equiv query_view(DbName, DesignName, ViewName, #view_query_args{}) +query_view(DbName, DesignName, ViewName) -> + query_view(DbName, DesignName, ViewName, #view_query_args{}). + +%% @equiv query_view(DbName, DesignName, +%% ViewName, fun default_callback/2, [], QueryArgs) +query_view(DbName, DesignName, ViewName, QueryArgs) -> + Callback = fun default_callback/2, + query_view(DbName, DesignName, ViewName, Callback, [], QueryArgs). + +%% @doc execute a given view. +%% There are many additional query args that can be passed to a view, +%% see <a href="http://wiki.apache.org/couchdb/HTTP_view_API#Querying_Options"> +%% query args</a> for details. +-spec query_view(dbname(), #doc{} | binary(), iodata(), callback(), any(), + #view_query_args{}) -> + any(). +query_view(DbName, Design, ViewName, Callback, Acc0, QueryArgs) -> + Db = dbname(DbName), View = name(ViewName), + case is_reduce_view(Db, Design, View, QueryArgs) of + true -> + Mod = fabric_view_reduce; + false -> + Mod = fabric_view_map + end, + Mod:go(Db, Design, View, QueryArgs, Callback, Acc0). + +%% @doc retrieve info about a view group, disk size, language, whether compaction +%% is running and so forth +-spec get_view_group_info(dbname(), #doc{} | docid()) -> + {ok, [ + {signature, binary()} | + {language, binary()} | + {disk_size, non_neg_integer()} | + {compact_running, boolean()} | + {updater_running, boolean()} | + {waiting_commit, boolean()} | + {waiting_clients, non_neg_integer()} | + {update_seq, pos_integer()} | + {purge_seq, non_neg_integer()} + ]}. +get_view_group_info(DbName, DesignId) -> + fabric_group_info:go(dbname(DbName), design_doc(DesignId)). + +%% @doc retrieve all the design docs from a database +-spec design_docs(dbname()) -> {ok, [json_obj()]}. +design_docs(DbName) -> + QueryArgs = #view_query_args{start_key = <<"_design/">>, include_docs=true}, + Callback = fun({total_and_offset, _, _}, []) -> + {ok, []}; + ({row, {Props}}, Acc) -> + case couch_util:get_value(id, Props) of + <<"_design/", _/binary>> -> + {ok, [couch_util:get_value(doc, Props) | Acc]}; + _ -> + {stop, Acc} + end; + (complete, Acc) -> + {ok, lists:reverse(Acc)}; + ({error, Reason}, _Acc) -> + {error, Reason} + end, + fabric:all_docs(dbname(DbName), Callback, [], QueryArgs). + +%% @doc forces a reload of validation functions, this is performed after +%% design docs are update +%% NOTE: This function probably doesn't belong here as part fo the API +-spec reset_validation_funs(dbname()) -> [reference()]. +reset_validation_funs(DbName) -> + [rexi:cast(Node, {fabric_rpc, reset_validation_funs, [Name]}) || + #shard{node=Node, name=Name} <- mem3:shards(DbName)]. + +%% @doc clean up index files for all Dbs +-spec cleanup_index_files() -> [ok]. +cleanup_index_files() -> + {ok, Dbs} = fabric:all_dbs(), + [cleanup_index_files(Db) || Db <- Dbs]. + +%% @doc clean up index files for a specific db +-spec cleanup_index_files(dbname()) -> ok. +cleanup_index_files(DbName) -> + {ok, DesignDocs} = fabric:design_docs(DbName), + + ActiveSigs = lists:map(fun(#doc{id = GroupId}) -> + {ok, Info} = fabric:get_view_group_info(DbName, GroupId), + binary_to_list(couch_util:get_value(signature, Info)) + end, [couch_doc:from_json_obj(DD) || DD <- DesignDocs]), + + FileList = filelib:wildcard([couch_config:get("couchdb", "view_index_dir"), + "/.shards/*/", couch_util:to_list(dbname(DbName)), ".[0-9]*_design/*"]), + + DeleteFiles = if ActiveSigs =:= [] -> FileList; true -> + {ok, RegExp} = re:compile([$(, string:join(ActiveSigs, "|"), $)]), + lists:filter(fun(FilePath) -> + re:run(FilePath, RegExp, [{capture, none}]) == nomatch + end, FileList) + end, + [file:delete(File) || File <- DeleteFiles], + ok. + +%% some simple type validation and transcoding + +dbname(DbName) when is_list(DbName) -> + list_to_binary(DbName); +dbname(DbName) when is_binary(DbName) -> + DbName; +dbname(#db{name=Name}) -> + Name; +dbname(DbName) -> + erlang:error({illegal_database_name, DbName}). + +name(Thing) -> + couch_util:to_binary(Thing). + +docid(DocId) when is_list(DocId) -> + list_to_binary(DocId); +docid(DocId) when is_binary(DocId) -> + DocId; +docid(DocId) -> + erlang:error({illegal_docid, DocId}). + +docs(Docs) when is_list(Docs) -> + [doc(D) || D <- Docs]; +docs(Docs) -> + erlang:error({illegal_docs_list, Docs}). + +doc(#doc{} = Doc) -> + Doc; +doc({_} = Doc) -> + couch_doc:from_json_obj(Doc); +doc(Doc) -> + erlang:error({illegal_doc_format, Doc}). + +design_doc(#doc{} = DDoc) -> + DDoc; +design_doc(DocId) when is_list(DocId) -> + design_doc(list_to_binary(DocId)); +design_doc(<<"_design/", _/binary>> = DocId) -> + DocId; +design_doc(GroupName) -> + <<"_design/", GroupName/binary>>. + +idrevs({Id, Revs}) when is_list(Revs) -> + {docid(Id), [rev(R) || R <- Revs]}. + +rev(Rev) when is_list(Rev); is_binary(Rev) -> + couch_doc:parse_rev(Rev); +rev({Seq, Hash} = Rev) when is_integer(Seq), is_binary(Hash) -> + Rev. + +%% @doc convenience method, useful when testing or calling fabric from the shell +opts(Options) -> + add_option(user_ctx, add_option(io_priority, Options)). + +add_option(Key, Options) -> + case couch_util:get_value(Key, Options) of + undefined -> + case erlang:get(Key) of + undefined -> + Options; + Value -> + [{Key, Value} | Options] + end; + _ -> + Options + end. + +default_callback(complete, Acc) -> + {ok, lists:reverse(Acc)}; +default_callback(Row, Acc) -> + {ok, [Row | Acc]}. + +is_reduce_view(_, _, _, #view_query_args{view_type=Reduce}) -> + Reduce =:= reduce. + +%% @doc convenience method for use in the shell, converts a keylist +%% to a `changes_args' record +kl_to_changes_args(KeyList) -> + kl_to_record(KeyList, changes_args). + +%% @doc convenience method for use in the shell, converts a keylist +%% to a `view_query_args' record +kl_to_query_args(KeyList) -> + kl_to_record(KeyList, view_query_args). + +%% @doc finds the index of the given Key in the record. +%% note that record_info is only known at compile time +%% so the code must be written in this way. For each new +%% record type add a case clause +lookup_index(Key,RecName) -> + Indexes = + case RecName of + changes_args -> + lists:zip(record_info(fields, changes_args), + lists:seq(2, record_info(size, changes_args))); + view_query_args -> + lists:zip(record_info(fields, view_query_args), + lists:seq(2, record_info(size, view_query_args))) + end, + couch_util:get_value(Key, Indexes). + +%% @doc convert a keylist to record with given `RecName' +%% @see lookup_index +kl_to_record(KeyList,RecName) -> + Acc0 = case RecName of + changes_args -> #changes_args{}; + view_query_args -> #view_query_args{} + end, + lists:foldl(fun({Key, Value}, Acc) -> + Index = lookup_index(couch_util:to_existing_atom(Key),RecName), + setelement(Index, Acc, Value) + end, Acc0, KeyList). diff --git a/deps/fabric/src/fabric_db_create.erl b/deps/fabric/src/fabric_db_create.erl new file mode 100644 index 00000000..080517be --- /dev/null +++ b/deps/fabric/src/fabric_db_create.erl @@ -0,0 +1,161 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_db_create). +-export([go/2]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +-define(DBNAME_REGEX, "^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*$"). + +%% @doc Create a new database, and all its partition files across the cluster +%% Options is proplist with user_ctx, n, q, validate_name +go(DbName, Options) -> + case validate_dbname(DbName, Options) of + ok -> + {Shards, Doc} = generate_shard_map(DbName, Options), + case {create_shard_files(Shards), create_shard_db_doc(Doc)} of + {ok, {ok, Status}} -> + Status; + {file_exists, {ok, _}} -> + {error, file_exists}; + {_, Error} -> + Error + end; + Error -> + Error + end. + +validate_dbname(DbName, Options) -> + case couch_util:get_value(validate_name, Options, true) of + false -> + ok; + true -> + case re:run(DbName, ?DBNAME_REGEX, [{capture,none}]) of + match -> + ok; + nomatch when DbName =:= <<"_users">> -> + ok; + nomatch when DbName =:= <<"_replicator">> -> + ok; + nomatch -> + {error, illegal_database_name} + end + end. + +generate_shard_map(DbName, Options) -> + {MegaSecs, Secs, _} = now(), + Suffix = "." ++ integer_to_list(MegaSecs*1000000 + Secs), + Shards = mem3:choose_shards(DbName, [{shard_suffix,Suffix} | Options]), + case mem3_util:open_db_doc(DbName) of + {ok, Doc} -> + % the DB already exists, and may have a different Suffix + ok; + {not_found, _} -> + Doc = make_document(Shards, Suffix) + end, + {Shards, Doc}. + +create_shard_files(Shards) -> + Workers = fabric_util:submit_jobs(Shards, create_db, []), + RexiMon = fabric_util:create_monitors(Shards), + try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Workers) of + {error, file_exists} -> + file_exists; + _ -> + ok + after + rexi_monitor:stop(RexiMon) + end. + +handle_message(file_exists, _, _) -> + {error, file_exists}; + +handle_message({rexi_DOWN, _, {_, Node}, _}, _, Workers) -> + case lists:filter(fun(S) -> S#shard.node =/= Node end, Workers) of + [] -> + {stop, ok}; + RemainingWorkers -> + {ok, RemainingWorkers} + end; + +handle_message(_, Worker, Workers) -> + case lists:delete(Worker, Workers) of + [] -> + {stop, ok}; + RemainingWorkers -> + {ok, RemainingWorkers} + end. + +create_shard_db_doc(Doc) -> + Shards = [#shard{node=N} || N <- mem3:nodes()], + RexiMon = fabric_util:create_monitors(Shards), + Workers = fabric_util:submit_jobs(Shards, create_shard_db_doc, [Doc]), + Acc0 = {length(Shards), fabric_dict:init(Workers, nil)}, + try fabric_util:recv(Workers, #shard.ref, fun handle_db_update/3, Acc0) of + {timeout, _} -> + {error, timeout}; + Else -> + Else + after + rexi_monitor:stop(RexiMon) + end. + +handle_db_update({rexi_DOWN, _, {_, Node}, _}, _Worker, {W, Counters}) -> + New = fabric_dict:filter(fun(S, _) -> S#shard.node =/= Node end, Counters), + maybe_stop(W, New); + +handle_db_update({rexi_EXIT, _Reason}, Worker, {W, Counters}) -> + maybe_stop(W, fabric_dict:erase(Worker, Counters)); + +handle_db_update(conflict, _, _) -> + % just fail when we get any conflicts + {error, conflict}; + +handle_db_update(Msg, Worker, {W, Counters}) -> + maybe_stop(W, fabric_dict:store(Worker, Msg, Counters)). + +maybe_stop(W, Counters) -> + case fabric_dict:any(nil, Counters) of + true -> + {ok, {W, Counters}}; + false -> + case lists:sum([1 || {_, ok} <- Counters]) of + W -> + {stop, ok}; + NumOk when NumOk >= (W div 2 + 1) -> + {stop, accepted}; + _ -> + {error, internal_server_error} + end + end. + +make_document([#shard{dbname=DbName}|_] = Shards, Suffix) -> + {RawOut, ByNodeOut, ByRangeOut} = + lists:foldl(fun(#shard{node=N, range=[B,E]}, {Raw, ByNode, ByRange}) -> + Range = ?l2b([couch_util:to_hex(<<B:32/integer>>), "-", + couch_util:to_hex(<<E:32/integer>>)]), + Node = couch_util:to_binary(N), + {[[<<"add">>, Range, Node] | Raw], orddict:append(Node, Range, ByNode), + orddict:append(Range, Node, ByRange)} + end, {[], [], []}, Shards), + #doc{id=DbName, body = {[ + {<<"shard_suffix">>, Suffix}, + {<<"changelog">>, lists:sort(RawOut)}, + {<<"by_node">>, {[{K,lists:sort(V)} || {K,V} <- ByNodeOut]}}, + {<<"by_range">>, {[{K,lists:sort(V)} || {K,V} <- ByRangeOut]}} + ]}}. + diff --git a/deps/fabric/src/fabric_db_delete.erl b/deps/fabric/src/fabric_db_delete.erl new file mode 100644 index 00000000..9283d0b2 --- /dev/null +++ b/deps/fabric/src/fabric_db_delete.erl @@ -0,0 +1,95 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_db_delete). +-export([go/2]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). + +%% @doc Options aren't used at all now in couch on delete but are left here +%% to be consistent with fabric_db_create for possible future use +%% @see couch_server:delete_db +%% +go(DbName, _Options) -> + Shards = mem3:shards(DbName), + % delete doc from shard_db + try delete_shard_db_doc(DbName) of + {ok, ok} -> + ok; + {ok, accepted} -> + accepted; + {ok, not_found} -> + erlang:error(database_does_not_exist, DbName); + Error -> + Error + after + % delete the shard files + fabric_util:submit_jobs(Shards, delete_db, []) + end. + +delete_shard_db_doc(Doc) -> + Shards = [#shard{node=N} || N <- mem3:nodes()], + RexiMon = fabric_util:create_monitors(Shards), + Workers = fabric_util:submit_jobs(Shards, delete_shard_db_doc, [Doc]), + Acc0 = {length(Shards), fabric_dict:init(Workers, nil)}, + try fabric_util:recv(Workers, #shard.ref, fun handle_db_update/3, Acc0) of + {timeout, _} -> + {error, timeout}; + Else -> + Else + after + rexi_monitor:stop(RexiMon) + end. + +handle_db_update({rexi_DOWN, _, {_, Node}, _}, _Worker, {W, Counters}) -> + New = fabric_dict:filter(fun(S, _) -> S#shard.node =/= Node end, Counters), + maybe_stop(W, New); + +handle_db_update({rexi_EXIT, _Reason}, Worker, {W, Counters}) -> + maybe_stop(W, fabric_dict:erase(Worker, Counters)); + +handle_db_update(conflict, _, _) -> + % just fail when we get any conflicts + {error, conflict}; + +handle_db_update(Msg, Worker, {W, Counters}) -> + maybe_stop(W, fabric_dict:store(Worker, Msg, Counters)). + +maybe_stop(W, Counters) -> + case fabric_dict:any(nil, Counters) of + true -> + {ok, {W, Counters}}; + false -> + {Ok,NotFound} = fabric_dict:fold(fun count_replies/3, {0,0}, Counters), + case {Ok + NotFound, Ok, NotFound} of + {W, 0, W} -> + {#shard{dbname=Name}, _} = hd(Counters), + twig:log(warn, "~p not_found ~s", [?MODULE, Name]), + {stop, not_found}; + {W, _, _} -> + {stop, ok}; + {N, M, _} when N >= (W div 2 + 1), M > 0 -> + {stop, accepted}; + _ -> + {error, internal_server_error} + end + end. + +count_replies(_, ok, {Ok, NotFound}) -> + {Ok+1, NotFound}; +count_replies(_, not_found, {Ok, NotFound}) -> + {Ok, NotFound+1}; +count_replies(_, _, Acc) -> + Acc. diff --git a/deps/fabric/src/fabric_db_doc_count.erl b/deps/fabric/src/fabric_db_doc_count.erl new file mode 100644 index 00000000..107f212a --- /dev/null +++ b/deps/fabric/src/fabric_db_doc_count.erl @@ -0,0 +1,68 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_db_doc_count). + +-export([go/1]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +go(DbName) -> + Shards = mem3:shards(DbName), + Workers = fabric_util:submit_jobs(Shards, get_doc_count, []), + RexiMon = fabric_util:create_monitors(Shards), + Acc0 = {fabric_dict:init(Workers, nil), 0}, + try + fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) + after + rexi_monitor:stop(RexiMon) + end. + +handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) -> + case fabric_util:remove_down_workers(Counters, NodeRef) of + {ok, NewCounters} -> + {ok, {NewCounters, Acc}}; + error -> + {error, {nodedown, <<"progress not possible">>}} + end; + +handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) -> + NewCounters = lists:keydelete(Shard, 1, Counters), + case fabric_view:is_progress_possible(NewCounters) of + true -> + {ok, {NewCounters, Acc}}; + false -> + {error, Reason} + end; + +handle_message({ok, Count}, Shard, {Counters, Acc}) -> + case fabric_dict:lookup_element(Shard, Counters) of + undefined -> + % already heard from someone else in this range + {ok, {Counters, Acc}}; + nil -> + C1 = fabric_dict:store(Shard, ok, Counters), + C2 = fabric_view:remove_overlapping_shards(Shard, C1), + case fabric_dict:any(nil, C2) of + true -> + {ok, {C2, Count+Acc}}; + false -> + {stop, Count+Acc} + end + end; +handle_message(_, _, Acc) -> + {ok, Acc}. + diff --git a/deps/fabric/src/fabric_db_info.erl b/deps/fabric/src/fabric_db_info.erl new file mode 100644 index 00000000..63fb44a5 --- /dev/null +++ b/deps/fabric/src/fabric_db_info.erl @@ -0,0 +1,104 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_db_info). + +-export([go/1]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). + +go(DbName) -> + Shards = mem3:shards(DbName), + Workers = fabric_util:submit_jobs(Shards, get_db_info, []), + RexiMon = fabric_util:create_monitors(Shards), + Acc0 = {fabric_dict:init(Workers, nil), []}, + try + fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) + after + rexi_monitor:stop(RexiMon) + end. + +handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) -> + case fabric_util:remove_down_workers(Counters, NodeRef) of + {ok, NewCounters} -> + {ok, {NewCounters, Acc}}; + error -> + {error, {nodedown, <<"progress not possible">>}} + end; + +handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) -> + NewCounters = lists:keydelete(Shard,1,Counters), + case fabric_view:is_progress_possible(Counters) of + true -> + {ok, {NewCounters, Acc}}; + false -> + {error, Reason} + end; + +handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, Acc}) -> + case fabric_dict:lookup_element(Shard, Counters) of + undefined -> + % already heard from someone else in this range + {ok, {Counters, Acc}}; + nil -> + Seq = couch_util:get_value(update_seq, Info), + C1 = fabric_dict:store(Shard, Seq, Counters), + C2 = fabric_view:remove_overlapping_shards(Shard, C1), + case fabric_dict:any(nil, C2) of + true -> + {ok, {C2, [Info|Acc]}}; + false -> + {stop, [ + {db_name,Name}, + {update_seq, fabric_view_changes:pack_seqs(C2)} | + merge_results(lists:flatten([Info|Acc])) + ]} + end + end; +handle_message(_, _, Acc) -> + {ok, Acc}. + +merge_results(Info) -> + Dict = lists:foldl(fun({K,V},D0) -> orddict:append(K,V,D0) end, + orddict:new(), Info), + orddict:fold(fun + (doc_count, X, Acc) -> + [{doc_count, lists:sum(X)} | Acc]; + (doc_del_count, X, Acc) -> + [{doc_del_count, lists:sum(X)} | Acc]; + (purge_seq, X, Acc) -> + [{purge_seq, lists:sum(X)} | Acc]; + (compact_running, X, Acc) -> + [{compact_running, lists:member(true, X)} | Acc]; + (disk_size, X, Acc) -> + [{disk_size, lists:sum(X)} | Acc]; + (other, X, Acc) -> + [{other, {merge_other_results(X)}} | Acc]; + (disk_format_version, X, Acc) -> + [{disk_format_version, lists:max(X)} | Acc]; + (_, _, Acc) -> + Acc + end, [{instance_start_time, <<"0">>}], Dict). + +merge_other_results(Results) -> + Dict = lists:foldl(fun({Props}, D) -> + lists:foldl(fun({K,V},D0) -> orddict:append(K,V,D0) end, D, Props) + end, orddict:new(), Results), + orddict:fold(fun + (data_size, X, Acc) -> + [{data_size, lists:sum(X)} | Acc]; + (_, _, Acc) -> + Acc + end, [], Dict). diff --git a/deps/fabric/src/fabric_db_meta.erl b/deps/fabric/src/fabric_db_meta.erl new file mode 100644 index 00000000..87721555 --- /dev/null +++ b/deps/fabric/src/fabric_db_meta.erl @@ -0,0 +1,49 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_db_meta). + +-export([set_revs_limit/3, set_security/3]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). + +set_revs_limit(DbName, Limit, Options) -> + Shards = mem3:shards(DbName), + Workers = fabric_util:submit_jobs(Shards, set_revs_limit, [Limit, Options]), + Waiting = length(Workers) - 1, + case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Waiting) of + {ok, ok} -> + ok; + Error -> + Error + end. + +set_security(DbName, SecObj, Options) -> + Shards = mem3:shards(DbName), + Workers = fabric_util:submit_jobs(Shards, set_security, [SecObj, Options]), + Waiting = length(Workers) - 1, + case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Waiting) of + {ok, ok} -> + ok; + Error -> + Error + end. + +handle_message(ok, _, 0) -> + {stop, ok}; +handle_message(ok, _, Waiting) -> + {ok, Waiting - 1}; +handle_message(Error, _, _Waiting) -> + {error, Error}.
\ No newline at end of file diff --git a/deps/fabric/src/fabric_db_update_listener.erl b/deps/fabric/src/fabric_db_update_listener.erl new file mode 100644 index 00000000..e29f3ec7 --- /dev/null +++ b/deps/fabric/src/fabric_db_update_listener.erl @@ -0,0 +1,114 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_db_update_listener). + +-export([go/4, start_update_notifier/1, stop/1, wait_db_updated/1]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). + +go(Parent, ParentRef, DbName, Timeout) -> + Notifiers = start_update_notifiers(DbName), + MonRefs = lists:usort([{rexi_server, Node} || {Node, _Ref} <- Notifiers]), + RexiMon = rexi_monitor:start(MonRefs), + %% Add calling controller node as rexi end point as this controller will + %% receive messages from it + Workers = [{Parent, ParentRef} | Notifiers], + try + receive_results(Workers, {Workers, Parent, unset}, Timeout) + after + rexi_monitor:stop(RexiMon), + stop_update_notifiers(Notifiers) + end. + +start_update_notifiers(DbName) -> + lists:map(fun(#shard{node=Node, name=Name}) -> + {Node, rexi:cast(Node, {?MODULE, start_update_notifier, [Name]})} + end, mem3:shards(DbName)). + +% rexi endpoint +start_update_notifier(DbName) -> + {Caller, Ref} = get(rexi_from), + Fun = fun({_, X}) when X == DbName -> + erlang:send(Caller, {Ref, db_updated}); (_) -> ok end, + Id = {couch_db_update_notifier, make_ref()}, + ok = gen_event:add_sup_handler(couch_db_update, Id, Fun), + receive {gen_event_EXIT, Id, Reason} -> + rexi:reply({gen_event_EXIT, DbName, Reason}) + end. + +stop_update_notifiers(Notifiers) -> + [rexi:kill(Node, Ref) || {Node, Ref} <- Notifiers]. + +stop({Pid, Ref}) -> + erlang:send(Pid, {Ref, done}). + +wait_db_updated({Pid, Ref}) -> + erlang:send(Pid, {Ref, get_state}), + receive + Any -> + Any + end. + +receive_results(Workers, State, Timeout) -> + case rexi_utils:recv(Workers, 2, fun handle_message/3, State, + infinity, Timeout) of + {timeout, {NewWorkers, Parent, State1}} -> + erlang:send(Parent, timeout), + State2 = + case State1 of + waiting -> + unset; + Any -> Any + end, + receive_results(NewWorkers, {NewWorkers, Parent, State2}, Timeout); + {_, NewState} -> + {ok, NewState} + end. + + +handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Worker, {Workers, Parent, State}) -> + NewWorkers = lists:filter(fun({_Node, Ref}) -> NodeRef =/= Ref end, Workers), + case NewWorkers of + [] -> + {error, {nodedown, <<"progress not possible">>}}; + _ -> + {ok, {NewWorkers, Parent, State}} + end; +handle_message({rexi_EXIT, Reason}, Worker, {Workers, Parent, State}) -> + NewWorkers = lists:delete(Worker,Workers), + case NewWorkers of + [] -> + {error, Reason}; + _ -> + {ok, {NewWorkers, Parent, State}} + end; +handle_message(db_updated, {_Worker, _From}, {Workers, Parent, waiting}) -> + % propagate message to calling controller + erlang:send(Parent, updated), + {ok, {Workers, Parent, unset}}; +handle_message(db_updated, _Worker, {Workers, Parent, State}) + when State == unset orelse State == updated -> + {ok, {Workers, Parent, updated}}; +handle_message(get_state, {_Worker, _From}, {Workers, Parent, unset}) -> + {ok, {Workers, Parent, waiting}}; +handle_message(get_state, {_Worker, _From}, {Workers, Parent, State}) -> + erlang:send(Parent, State), + {ok, {Workers, Parent, unset}}; +handle_message(done, _, _) -> + {stop, ok}. + + + diff --git a/deps/fabric/src/fabric_dict.erl b/deps/fabric/src/fabric_dict.erl new file mode 100644 index 00000000..cea537ca --- /dev/null +++ b/deps/fabric/src/fabric_dict.erl @@ -0,0 +1,51 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_dict). +-compile(export_all). + +% Instead of ets, let's use an ordered keylist. We'll need to revisit if we +% have >> 100 shards, so a private interface is a good idea. - APK June 2010 + +init(Keys, InitialValue) -> + orddict:from_list([{Key, InitialValue} || Key <- Keys]). + + +decrement_all(Dict) -> + [{K,V-1} || {K,V} <- Dict]. + +store(Key, Value, Dict) -> + orddict:store(Key, Value, Dict). + +erase(Key, Dict) -> + orddict:erase(Key, Dict). + +update_counter(Key, Incr, Dict0) -> + orddict:update_counter(Key, Incr, Dict0). + + +lookup_element(Key, Dict) -> + couch_util:get_value(Key, Dict). + +size(Dict) -> + orddict:size(Dict). + +any(Value, Dict) -> + lists:keymember(Value, 2, Dict). + +filter(Fun, Dict) -> + orddict:filter(Fun, Dict). + +fold(Fun, Acc0, Dict) -> + orddict:fold(Fun, Acc0, Dict). diff --git a/deps/fabric/src/fabric_doc_attachments.erl b/deps/fabric/src/fabric_doc_attachments.erl new file mode 100644 index 00000000..a45ba7c7 --- /dev/null +++ b/deps/fabric/src/fabric_doc_attachments.erl @@ -0,0 +1,131 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_doc_attachments). + +-include("fabric.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +%% couch api calls +-export([receiver/2]). + +receiver(_Req, undefined) -> + <<"">>; +receiver(_Req, {unknown_transfer_encoding, Unknown}) -> + exit({unknown_transfer_encoding, Unknown}); +receiver(Req, chunked) -> + MiddleMan = spawn(fun() -> middleman(Req, chunked) end), + fun(4096, ChunkFun, ok) -> + write_chunks(MiddleMan, ChunkFun) + end; +receiver(_Req, 0) -> + <<"">>; +receiver(Req, Length) when is_integer(Length) -> + maybe_send_continue(Req), + Middleman = spawn(fun() -> middleman(Req, Length) end), + fun() -> + Middleman ! {self(), gimme_data}, + receive {Middleman, Data} -> Data end + end; +receiver(_Req, Length) -> + exit({length_not_integer, Length}). + +%% +%% internal +%% + +maybe_send_continue(#httpd{mochi_req = MochiReq} = Req) -> + case couch_httpd:header_value(Req, "expect") of + undefined -> + ok; + Expect -> + case string:to_lower(Expect) of + "100-continue" -> + MochiReq:start_raw_response({100, gb_trees:empty()}); + _ -> + ok + end + end. + +write_chunks(MiddleMan, ChunkFun) -> + MiddleMan ! {self(), gimme_data}, + receive + {MiddleMan, {0, _Footers}} -> + % MiddleMan ! {self(), done}, + ok; + {MiddleMan, ChunkRecord} -> + ChunkFun(ChunkRecord, ok), + write_chunks(MiddleMan, ChunkFun) + end. + +receive_unchunked_attachment(_Req, 0) -> + ok; +receive_unchunked_attachment(Req, Length) -> + receive {MiddleMan, go} -> + Data = couch_httpd:recv(Req, 0), + MiddleMan ! {self(), Data} + end, + receive_unchunked_attachment(Req, Length - size(Data)). + +middleman(Req, chunked) -> + % spawn a process to actually receive the uploaded data + RcvFun = fun(ChunkRecord, ok) -> + receive {From, go} -> From ! {self(), ChunkRecord} end, ok + end, + Receiver = spawn(fun() -> couch_httpd:recv_chunked(Req,4096,RcvFun,ok) end), + + % take requests from the DB writers and get data from the receiver + N = erlang:list_to_integer(couch_config:get("cluster","n")), + middleman_loop(Receiver, N, dict:new(), 0, []); + +middleman(Req, Length) -> + Receiver = spawn(fun() -> receive_unchunked_attachment(Req, Length) end), + N = erlang:list_to_integer(couch_config:get("cluster","n")), + middleman_loop(Receiver, N, dict:new(), 0, []). + +middleman_loop(Receiver, N, Counters, Offset, ChunkList) -> + receive {From, gimme_data} -> + % figure out how far along this writer (From) is in the list + {NewCounters, WhichChunk} = case dict:find(From, Counters) of + {ok, I} -> + {dict:update_counter(From, 1, Counters), I}; + error -> + {dict:store(From, 2, Counters), 1} + end, + ListIndex = WhichChunk - Offset, + + % talk to the receiver to get another chunk if necessary + ChunkList1 = if ListIndex > length(ChunkList) -> + Receiver ! {self(), go}, + receive {Receiver, ChunkRecord} -> ChunkList ++ [ChunkRecord] end; + true -> ChunkList end, + + % reply to the writer + From ! {self(), lists:nth(ListIndex, ChunkList1)}, + + % check if we can drop a chunk from the head of the list + SmallestIndex = dict:fold(fun(_, Val, Acc) -> lists:min([Val,Acc]) end, + WhichChunk+1, NewCounters), + Size = dict:size(NewCounters), + + {NewChunkList, NewOffset} = + if Size == N andalso (SmallestIndex - Offset) == 2 -> + {tl(ChunkList1), Offset+1}; + true -> + {ChunkList1, Offset} + end, + middleman_loop(Receiver, N, NewCounters, NewOffset, NewChunkList) + after 10000 -> + ok + end. diff --git a/deps/fabric/src/fabric_doc_missing_revs.erl b/deps/fabric/src/fabric_doc_missing_revs.erl new file mode 100644 index 00000000..2dd04a70 --- /dev/null +++ b/deps/fabric/src/fabric_doc_missing_revs.erl @@ -0,0 +1,90 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_doc_missing_revs). + +-export([go/2, go/3]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). + +go(DbName, AllIdsRevs) -> + go(DbName, AllIdsRevs, []). + +go(DbName, AllIdsRevs, Options) -> + Workers = lists:map(fun({#shard{name=Name, node=Node} = Shard, IdsRevs}) -> + Ref = rexi:cast(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs, + Options]}), + Shard#shard{ref=Ref} + end, group_idrevs_by_shard(DbName, AllIdsRevs)), + ResultDict = dict:from_list([{Id, {{nil,Revs},[]}} || {Id, Revs} <- AllIdsRevs]), + RexiMon = fabric_util:create_monitors(Workers), + Acc0 = {length(Workers), ResultDict, Workers}, + try + fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) + after + rexi_monitor:stop(RexiMon) + end. + +handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {_WorkerLen, ResultDict, Workers}) -> + NewWorkers = [W || #shard{node=Node} = W <- Workers, Node =/= NodeRef], + skip_message({fabric_dict:size(NewWorkers), ResultDict, NewWorkers}); +handle_message({rexi_EXIT, _}, Worker, {W, D, Workers}) -> + skip_message({W-1,D,lists:delete(Worker, Workers)}); +handle_message({ok, Results}, _Worker, {1, D0, _}) -> + D = update_dict(D0, Results), + {stop, dict:fold(fun force_reply/3, [], D)}; +handle_message({ok, Results}, Worker, {WaitingCount, D0, Workers}) -> + D = update_dict(D0, Results), + case dict:fold(fun maybe_reply/3, {stop, []}, D) of + continue -> + % still haven't heard about some Ids + {ok, {WaitingCount - 1, D, lists:delete(Worker,Workers)}}; + {stop, FinalReply} -> + % finished, stop the rest of the jobs + fabric_util:cleanup(lists:delete(Worker,Workers)), + {stop, FinalReply} + end. + +force_reply(Id, {{nil,Revs}, Anc}, Acc) -> + % never heard about this ID, assume it's missing + [{Id, Revs, Anc} | Acc]; +force_reply(_, {[], _}, Acc) -> + Acc; +force_reply(Id, {Revs, Anc}, Acc) -> + [{Id, Revs, Anc} | Acc]. + +maybe_reply(_, _, continue) -> + continue; +maybe_reply(_, {{nil, _}, _}, _) -> + continue; +maybe_reply(_, {[], _}, {stop, Acc}) -> + {stop, Acc}; +maybe_reply(Id, {Revs, Anc}, {stop, Acc}) -> + {stop, [{Id, Revs, Anc} | Acc]}. + +group_idrevs_by_shard(DbName, IdsRevs) -> + dict:to_list(lists:foldl(fun({Id, Revs}, D0) -> + lists:foldl(fun(Shard, D1) -> + dict:append(Shard, {Id, Revs}, D1) + end, D0, mem3:shards(DbName,Id)) + end, dict:new(), IdsRevs)). + +update_dict(D0, KVs) -> + lists:foldl(fun({K,V,A}, D1) -> dict:store(K, {V,A}, D1) end, D0, KVs). + +skip_message({0, Dict, _Workers}) -> + {stop, dict:fold(fun force_reply/3, [], Dict)}; +skip_message(Acc) -> + {ok, Acc}. diff --git a/deps/fabric/src/fabric_doc_open.erl b/deps/fabric/src/fabric_doc_open.erl new file mode 100644 index 00000000..9e466b7a --- /dev/null +++ b/deps/fabric/src/fabric_doc_open.erl @@ -0,0 +1,139 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_doc_open). + +-export([go/3]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +go(DbName, Id, Options) -> + Workers = fabric_util:submit_jobs(mem3:shards(DbName,Id), open_doc, + [Id, [deleted|Options]]), + SuppressDeletedDoc = not lists:member(deleted, Options), + N = mem3:n(DbName), + R = couch_util:get_value(r, Options, integer_to_list(mem3:quorum(DbName))), + RepairOpts = [{r, integer_to_list(N)} | Options], + Acc0 = {Workers, erlang:min(N, list_to_integer(R)), []}, + RexiMon = fabric_util:create_monitors(Workers), + try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of + {ok, Reply} -> + format_reply(Reply, SuppressDeletedDoc); + {error, needs_repair, Reply} -> + spawn(fabric, open_revs, [DbName, Id, all, RepairOpts]), + format_reply(Reply, SuppressDeletedDoc); + {error, needs_repair} -> + % we couldn't determine the correct reply, so we'll run a sync repair + {ok, Results} = fabric:open_revs(DbName, Id, all, RepairOpts), + case lists:partition(fun({ok, #doc{deleted=Del}}) -> Del end, Results) of + {[], []} -> + {not_found, missing}; + {_DeletedDocs, []} when SuppressDeletedDoc -> + {not_found, deleted}; + {DeletedDocs, []} -> + lists:last(lists:sort(DeletedDocs)); + {_, LiveDocs} -> + lists:last(lists:sort(LiveDocs)) + end; + Error -> + Error + after + rexi_monitor:stop(RexiMon) + end. + +format_reply({ok, #doc{deleted=true}}, true) -> + {not_found, deleted}; +format_reply(Else, _) -> + Else. + +handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Worker, {Workers, R, Replies}) -> + NewWorkers = lists:keydelete(NodeRef, #shard.node, Workers), + case NewWorkers of + [] -> + {error, needs_repair}; + _ -> + {ok, {NewWorkers, R, Replies}} + end; +handle_message({rexi_EXIT, _Reason}, Worker, Acc0) -> + skip_message(Worker, Acc0); +handle_message(Reply, Worker, {Workers, R, Replies}) -> + NewReplies = fabric_util:update_counter(Reply, 1, Replies), + case lists:dropwhile(fun({_,{_, Count}}) -> Count < R end, NewReplies) of + [{_,{QuorumReply, _}} | _] -> + fabric_util:cleanup(lists:delete(Worker,Workers)), + case {NewReplies, fabric_util:remove_ancestors(NewReplies, [])} of + {[_], [_]} -> + % complete agreement amongst all copies + {stop, QuorumReply}; + {[_|_], [{_, {QuorumReply, _}}]} -> + % any divergent replies are ancestors of the QuorumReply + {error, needs_repair, QuorumReply}; + _Else -> + % real disagreement amongst the workers, block for the repair + {error, needs_repair} + end; + [] -> + if length(Workers) =:= 1 -> + {error, needs_repair}; + true -> + {ok, {lists:delete(Worker,Workers), R, NewReplies}} + end + end. + +skip_message(_Worker, {Workers, _R, _Replies}) when length(Workers) =:= 1 -> + {error, needs_repair}; +skip_message(Worker, {Workers, R, Replies}) -> + {ok, {lists:delete(Worker,Workers), R, Replies}}. + + +open_doc_test() -> + Foo1 = {ok, #doc{revs = {1,[<<"foo">>]}}}, + Foo2 = {ok, #doc{revs = {2,[<<"foo2">>,<<"foo">>]}}}, + Bar1 = {ok, #doc{revs = {1,[<<"bar">>]}}}, + Baz1 = {ok, #doc{revs = {1,[<<"baz">>]}}}, + NF = {not_found, missing}, + State0 = {[nil, nil, nil], 2, []}, + State1 = {[nil, nil], 2, [fabric_util:kv(Foo1,1)]}, + State2 = {[nil], 2, [fabric_util:kv(Bar1,1), fabric_util:kv(Foo1,1)]}, + State3 = {[nil], 2, [fabric_util:kv(Foo1,1), fabric_util:kv(Foo2,1)]}, + ?assertEqual({ok, State1}, handle_message(Foo1, nil, State0)), + + % normal case - quorum reached, no disagreement + ?assertEqual({stop, Foo1}, handle_message(Foo1, nil, State1)), + + % 2nd worker disagrees, voting continues + ?assertEqual({ok, State2}, handle_message(Bar1, nil, State1)), + + % 3rd worker resolves voting, but repair is needed + ?assertEqual({error, needs_repair}, handle_message(Foo1, nil, State2)), + + % 2nd worker comes up with descendant of Foo1, voting continues + ?assertEqual({ok, State3}, handle_message(Foo2, nil, State1)), + + % 3rd worker is also a descendant so run repair async + ?assertEqual({error, needs_repair, Foo2}, handle_message(Foo2, nil, + State3)), + + % We only run async repair when every revision is part of the same branch + ?assertEqual({error, needs_repair}, handle_message(Bar1, nil, State3)), + + % not_found is considered to be an ancestor of everybody + {ok, State4} = handle_message(NF, nil, State1), + ?assertEqual({error, needs_repair, Foo1}, handle_message(Foo1, nil, + State4)), + + % 3 distinct edit branches result in quorum failure + ?assertEqual({error, needs_repair}, handle_message(Baz1, nil, State2)). diff --git a/deps/fabric/src/fabric_doc_open_revs.erl b/deps/fabric/src/fabric_doc_open_revs.erl new file mode 100644 index 00000000..395789ca --- /dev/null +++ b/deps/fabric/src/fabric_doc_open_revs.erl @@ -0,0 +1,307 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_doc_open_revs). + +-export([go/4]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-record(state, { + dbname, + worker_count, + workers, + reply_count = 0, + r, + revs, + latest, + replies = [] +}). + +go(DbName, Id, Revs, Options) -> + Workers = fabric_util:submit_jobs(mem3:shards(DbName,Id), open_revs, + [Id, Revs, Options]), + R = couch_util:get_value(r, Options, integer_to_list(mem3:quorum(DbName))), + State = #state{ + dbname = DbName, + worker_count = length(Workers), + workers = Workers, + r = list_to_integer(R), + revs = Revs, + latest = lists:member(latest, Options), + replies = case Revs of all -> []; Revs -> [{Rev,[]} || Rev <- Revs] end + }, + RexiMon = fabric_util:create_monitors(Workers), + try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, State) of + {ok, {ok, Reply}} -> + {ok, Reply}; + Else -> + Else + after + rexi_monitor:stop(RexiMon) + end. + +handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Worker, #state{workers=Workers}=State) -> + NewWorkers = lists:keydelete(NodeRef, #shard.node, Workers), + skip(State#state{workers=NewWorkers}); +handle_message({rexi_EXIT, _}, Worker, #state{workers=Workers}=State) -> + skip(State#state{workers=lists:delete(Worker,Workers)}); +handle_message({ok, RawReplies}, Worker, #state{revs = all} = State) -> + #state{ + dbname = DbName, + reply_count = ReplyCount, + worker_count = WorkerCount, + workers = Workers, + replies = All0, + r = R + } = State, + All = lists:foldl(fun(Reply,D) -> fabric_util:update_counter(Reply,1,D) end, + All0, RawReplies), + Reduced = fabric_util:remove_ancestors(All, []), + Complete = (ReplyCount =:= (WorkerCount - 1)), + QuorumMet = lists:all(fun({_,{_, C}}) -> C >= R end, Reduced), + case Reduced of All when QuorumMet andalso ReplyCount =:= (R-1) -> + Repair = false; + _ -> + Repair = [D || {_,{{ok,D}, _}} <- Reduced] + end, + case maybe_reply(DbName, Reduced, Complete, Repair, R) of + noreply -> + {ok, State#state{replies = All, reply_count = ReplyCount+1, + workers = lists:delete(Worker,Workers)}}; + {reply, FinalReply} -> + fabric_util:cleanup(lists:delete(Worker,Workers)), + {stop, FinalReply} + end; +handle_message({ok, RawReplies0}, Worker, State) -> + % we've got an explicit revision list, but if latest=true the workers may + % return a descendant of the requested revision. Take advantage of the + % fact that revisions are returned in order to keep track. + RawReplies = strip_not_found_missing(RawReplies0), + #state{ + dbname = DbName, + reply_count = ReplyCount, + worker_count = WorkerCount, + workers = Workers, + replies = All0, + r = R + } = State, + All = lists:zipwith(fun({Rev, D}, Reply) -> + if Reply =:= error -> {Rev, D}; true -> + {Rev, fabric_util:update_counter(Reply, 1, D)} + end + end, All0, RawReplies), + Reduced = [fabric_util:remove_ancestors(X, []) || {_, X} <- All], + FinalReplies = [choose_winner(X, R) || X <- Reduced, X =/= []], + Complete = (ReplyCount =:= (WorkerCount - 1)), + case is_repair_needed(All, FinalReplies) of + true -> + Repair = [D || {_,{{ok,D}, _}} <- lists:flatten(Reduced)]; + false -> + Repair = false + end, + case maybe_reply(DbName, FinalReplies, Complete, Repair, R) of + noreply -> + {ok, State#state{replies = All, reply_count = ReplyCount+1, + workers=lists:delete(Worker,Workers)}}; + {reply, FinalReply} -> + fabric_util:cleanup(lists:delete(Worker,Workers)), + {stop, FinalReply} + end. + +skip(#state{revs=all} = State) -> + handle_message({ok, []}, nil, State); +skip(#state{revs=Revs} = State) -> + handle_message({ok, [error || _Rev <- Revs]}, nil, State). + +maybe_reply(_, [], false, _, _) -> + noreply; +maybe_reply(DbName, ReplyDict, Complete, RepairDocs, R) -> + case Complete orelse lists:all(fun({_,{_, C}}) -> C >= R end, ReplyDict) of + true -> + maybe_execute_read_repair(DbName, RepairDocs), + {reply, unstrip_not_found_missing(extract_replies(ReplyDict))}; + false -> + noreply + end. + +extract_replies(Replies) -> + lists:map(fun({_,{Reply,_}}) -> Reply end, Replies). + +choose_winner(Options, R) -> + case lists:dropwhile(fun({_,{_Reply, C}}) -> C < R end, Options) of + [] -> + case [Elem || {_,{{ok, #doc{}}, _}} = Elem <- Options] of + [] -> + hd(Options); + Docs -> + lists:last(lists:sort(Docs)) + end; + [QuorumMet | _] -> + QuorumMet + end. + +% repair needed if any reply other than the winner has been received for a rev +is_repair_needed([], []) -> + false; +is_repair_needed([{_Rev, [Reply]} | Tail1], [Reply | Tail2]) -> + is_repair_needed(Tail1, Tail2); +is_repair_needed(_, _) -> + true. + +maybe_execute_read_repair(_Db, false) -> + ok; +maybe_execute_read_repair(Db, Docs) -> + [#doc{id=Id} | _] = Docs, + Ctx = #user_ctx{roles=[<<"_admin">>]}, + Res = fabric:update_docs(Db, Docs, [replicated_changes, {user_ctx,Ctx}]), + twig:log(notice, "read_repair ~s ~s ~p", [Db, Id, Res]). + +% hackery required so that not_found sorts first +strip_not_found_missing([]) -> + []; +strip_not_found_missing([{{not_found, missing}, Rev} | Rest]) -> + [{not_found, Rev} | strip_not_found_missing(Rest)]; +strip_not_found_missing([Else | Rest]) -> + [Else | strip_not_found_missing(Rest)]. + +unstrip_not_found_missing([]) -> + []; +unstrip_not_found_missing([{not_found, Rev} | Rest]) -> + [{{not_found, missing}, Rev} | unstrip_not_found_missing(Rest)]; +unstrip_not_found_missing([Else | Rest]) -> + [Else | unstrip_not_found_missing(Rest)]. + +all_revs_test() -> + couch_config:start_link([]), + meck:new(fabric), + meck:expect(fabric, dbname, fun(Name) -> Name end), + meck:expect(fabric, update_docs, fun(_, _, _) -> {ok, nil} end), + State0 = #state{worker_count = 3, workers=[nil,nil,nil], r = 2, revs = all}, + Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}}, + Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}}, + Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}}, + + % an empty worker response does not count as meeting quorum + ?assertMatch( + {ok, #state{workers=[nil,nil]}}, + handle_message({ok, []}, nil, State0) + ), + + ?assertMatch( + {ok, #state{workers=[nil, nil]}}, + handle_message({ok, [Foo1, Bar1]}, nil, State0) + ), + {ok, State1} = handle_message({ok, [Foo1, Bar1]}, nil, State0), + + % the normal case - workers agree + ?assertEqual( + {stop, [Bar1, Foo1]}, + handle_message({ok, [Foo1, Bar1]}, nil, State1) + ), + + % a case where the 2nd worker has a newer Foo - currently we're considering + % Foo to have reached quorum and execute_read_repair() + ?assertEqual( + {stop, [Bar1, Foo2]}, + handle_message({ok, [Foo2, Bar1]}, nil, State1) + ), + + % a case where quorum has not yet been reached for Foo + ?assertMatch( + {ok, #state{}}, + handle_message({ok, [Bar1]}, nil, State1) + ), + {ok, State2} = handle_message({ok, [Bar1]}, nil, State1), + + % still no quorum, but all workers have responded. We include Foo1 in the + % response and execute_read_repair() + ?assertEqual( + {stop, [Bar1, Foo1]}, + handle_message({ok, [Bar1]}, nil, State2) + ), + meck:unload(fabric), + couch_config:stop(). + +specific_revs_test() -> + couch_config:start_link([]), + meck:new(fabric), + meck:expect(fabric, dbname, fun(Name) -> Name end), + meck:expect(fabric, update_docs, fun(_, _, _) -> {ok, nil} end), + Revs = [{1,<<"foo">>}, {1,<<"bar">>}, {1,<<"baz">>}], + State0 = #state{ + worker_count = 3, + workers = [nil, nil, nil], + r = 2, + revs = Revs, + latest = false, + replies = [{Rev,[]} || Rev <- Revs] + }, + Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}}, + Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}}, + Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}}, + Baz1 = {{not_found, missing}, {1,<<"baz">>}}, + Baz2 = {ok, #doc{revs = {1, [<<"baz">>]}}}, + + ?assertMatch( + {ok, #state{}}, + handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State0) + ), + {ok, State1} = handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State0), + + % the normal case - workers agree + ?assertEqual( + {stop, [Foo1, Bar1, Baz1]}, + handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State1) + ), + + % latest=true, worker responds with Foo2 and we return it + State0L = State0#state{latest = true}, + ?assertMatch( + {ok, #state{}}, + handle_message({ok, [Foo2, Bar1, Baz1]}, nil, State0L) + ), + {ok, State1L} = handle_message({ok, [Foo2, Bar1, Baz1]}, nil, State0L), + ?assertEqual( + {stop, [Foo2, Bar1, Baz1]}, + handle_message({ok, [Foo2, Bar1, Baz1]}, nil, State1L) + ), + + % Foo1 is included in the read quorum for Foo2 + ?assertEqual( + {stop, [Foo2, Bar1, Baz1]}, + handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State1L) + ), + + % {not_found, missing} is included in the quorum for any found revision + ?assertEqual( + {stop, [Foo2, Bar1, Baz2]}, + handle_message({ok, [Foo2, Bar1, Baz2]}, nil, State1L) + ), + + % a worker failure is skipped + ?assertMatch( + {ok, #state{}}, + handle_message({rexi_EXIT, foo}, nil, State1L) + ), + {ok, State2L} = handle_message({rexi_EXIT, foo}, nil, State1L), + ?assertEqual( + {stop, [Foo2, Bar1, Baz2]}, + handle_message({ok, [Foo2, Bar1, Baz2]}, nil, State2L) + ), + meck:unload(fabric), + couch_config:stop(). diff --git a/deps/fabric/src/fabric_doc_update.erl b/deps/fabric/src/fabric_doc_update.erl new file mode 100644 index 00000000..3ac4e185 --- /dev/null +++ b/deps/fabric/src/fabric_doc_update.erl @@ -0,0 +1,297 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_doc_update). + +-export([go/3]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +go(_, [], _) -> + {ok, []}; +go(DbName, AllDocs, Opts) -> + validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, Opts)), + Options = lists:delete(all_or_nothing, Opts), + GroupedDocs = lists:map(fun({#shard{name=Name, node=Node} = Shard, Docs}) -> + Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}), + {Shard#shard{ref=Ref}, Docs} + end, group_docs_by_shard(DbName, AllDocs)), + {Workers, _} = lists:unzip(GroupedDocs), + RexiMon = fabric_util:create_monitors(Workers), + W = couch_util:get_value(w, Options, integer_to_list(mem3:quorum(DbName))), + Acc0 = {length(Workers), length(AllDocs), list_to_integer(W), GroupedDocs, + dict:from_list([{Doc,[]} || Doc <- AllDocs])}, + try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of + {ok, {Health, Results}} when Health =:= ok; Health =:= accepted -> + {Health, [R || R <- couch_util:reorder_results(AllDocs, Results), R =/= noreply]}; + {timeout, Acc} -> + {_, _, W1, _, DocReplDict} = Acc, + {Health, _, Resp} = dict:fold(fun force_reply/3, {ok, W1, []}, + DocReplDict), + {Health, [R || R <- couch_util:reorder_results(AllDocs, Resp), R =/= noreply]}; + Else -> + Else + after + rexi_monitor:stop(RexiMon) + end. + +handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Worker, Acc0) -> + {_, LenDocs, W, GroupedDocs, DocReplyDict} = Acc0, + NewGrpDocs = [X || {#shard{node=N}, _} = X <- GroupedDocs, N =/= NodeRef], + skip_message({length(NewGrpDocs), LenDocs, W, NewGrpDocs, DocReplyDict}); + +handle_message({rexi_EXIT, _}, Worker, Acc0) -> + {WC,LenDocs,W,GrpDocs,DocReplyDict} = Acc0, + NewGrpDocs = lists:keydelete(Worker,1,GrpDocs), + skip_message({WC-1,LenDocs,W,NewGrpDocs,DocReplyDict}); +handle_message(internal_server_error, Worker, Acc0) -> + % happens when we fail to load validation functions in an RPC worker + {WC,LenDocs,W,GrpDocs,DocReplyDict} = Acc0, + NewGrpDocs = lists:keydelete(Worker,1,GrpDocs), + skip_message({WC-1,LenDocs,W,NewGrpDocs,DocReplyDict}); +handle_message({ok, Replies}, Worker, Acc0) -> + {WaitingCount, DocCount, W, GroupedDocs, DocReplyDict0} = Acc0, + {value, {_, Docs}, NewGrpDocs} = lists:keytake(Worker, 1, GroupedDocs), + DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0), + case {WaitingCount, dict:size(DocReplyDict)} of + {1, _} -> + % last message has arrived, we need to conclude things + {Health, W, Reply} = dict:fold(fun force_reply/3, {ok, W, []}, + DocReplyDict), + {stop, {Health, Reply}}; + {_, DocCount} -> + % we've got at least one reply for each document, let's take a look + case dict:fold(fun maybe_reply/3, {stop,W,[]}, DocReplyDict) of + continue -> + {ok, {WaitingCount - 1, DocCount, W, NewGrpDocs, DocReplyDict}}; + {stop, W, FinalReplies} -> + {stop, {ok, FinalReplies}} + end + end; +handle_message({missing_stub, Stub}, _, _) -> + throw({missing_stub, Stub}); +handle_message({not_found, no_db_file} = X, Worker, Acc0) -> + {_, _, _, GroupedDocs, _} = Acc0, + Docs = couch_util:get_value(Worker, GroupedDocs), + handle_message({ok, [X || _D <- Docs]}, Worker, Acc0). + +force_reply(Doc, [], {_, W, Acc}) -> + {error, W, [{Doc, {error, internal_server_error}} | Acc]}; +force_reply(Doc, [FirstReply|_] = Replies, {Health, W, Acc}) -> + case update_quorum_met(W, Replies) of + {true, Reply} -> + {Health, W, [{Doc,Reply} | Acc]}; + false -> + twig:log(warn, "write quorum (~p) failed for ~s", [W, Doc#doc.id]), + case [Reply || {ok, Reply} <- Replies] of + [] -> + % check if all errors are identical, if so inherit health + case lists:all(fun(E) -> E =:= FirstReply end, Replies) of + true -> + {Health, W, [{Doc, FirstReply} | Acc]}; + false -> + {error, W, [{Doc, FirstReply} | Acc]} + end; + [AcceptedRev | _] -> + NewHealth = case Health of ok -> accepted; _ -> Health end, + {NewHealth, W, [{Doc, {accepted,AcceptedRev}} | Acc]} + end + end. + +maybe_reply(_, _, continue) -> + % we didn't meet quorum for all docs, so we're fast-forwarding the fold + continue; +maybe_reply(Doc, Replies, {stop, W, Acc}) -> + case update_quorum_met(W, Replies) of + {true, Reply} -> + {stop, W, [{Doc, Reply} | Acc]}; + false -> + continue + end. + +update_quorum_met(W, Replies) -> + Counters = lists:foldl(fun(R,D) -> orddict:update_counter(R,1,D) end, + orddict:new(), Replies), + case lists:dropwhile(fun({_, Count}) -> Count < W end, Counters) of + [] -> + false; + [{FinalReply, _} | _] -> + {true, FinalReply} + end. + +-spec group_docs_by_shard(binary(), [#doc{}]) -> [{#shard{}, [#doc{}]}]. +group_docs_by_shard(DbName, Docs) -> + dict:to_list(lists:foldl(fun(#doc{id=Id} = Doc, D0) -> + lists:foldl(fun(Shard, D1) -> + dict:append(Shard, Doc, D1) + end, D0, mem3:shards(DbName,Id)) + end, dict:new(), Docs)). + +append_update_replies([], [], DocReplyDict) -> + DocReplyDict; +append_update_replies([Doc|Rest], [], Dict0) -> + % icky, if replicated_changes only errors show up in result + append_update_replies(Rest, [], dict:append(Doc, noreply, Dict0)); +append_update_replies([Doc|Rest1], [Reply|Rest2], Dict0) -> + % TODO what if the same document shows up twice in one update_docs call? + append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)). + +skip_message({0, _, W, _, DocReplyDict}) -> + {Health, W, Reply} = dict:fold(fun force_reply/3, {ok, W, []}, DocReplyDict), + {stop, {Health, Reply}}; +skip_message(Acc0) -> + {ok, Acc0}. + +validate_atomic_update(_, _, false) -> + ok; +validate_atomic_update(_DbName, AllDocs, true) -> + % TODO actually perform the validation. This requires some hackery, we need + % to basically extract the prep_and_validate_updates function from couch_db + % and only run that, without actually writing in case of a success. + Error = {not_implemented, <<"all_or_nothing is not supported yet">>}, + PreCommitFailures = lists:map(fun(#doc{id=Id, revs = {Pos,Revs}}) -> + case Revs of [] -> RevId = <<>>; [RevId|_] -> ok end, + {{Id, {Pos, RevId}}, Error} + end, AllDocs), + throw({aborted, PreCommitFailures}). + +% eunits +doc_update1_test() -> + Doc1 = #doc{revs = {1,[<<"foo">>]}}, + Doc2 = #doc{revs = {1,[<<"bar">>]}}, + Docs = [Doc1], + Docs2 = [Doc2, Doc1], + Dict = dict:from_list([{Doc,[]} || Doc <- Docs]), + Dict2 = dict:from_list([{Doc,[]} || Doc <- Docs2]), + + Shards = + mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]), + GroupedDocs = group_docs_by_shard_hack(<<"foo">>,Shards,Docs), + + + % test for W = 2 + AccW2 = {length(Shards), length(Docs), list_to_integer("2"), GroupedDocs, + Dict}, + + {ok,{WaitingCountW2_1,_,_,_,_}=AccW2_1} = + handle_message({ok, [{ok, Doc1}]},hd(Shards),AccW2), + ?assertEqual(WaitingCountW2_1,2), + {stop, FinalReplyW2 } = + handle_message({ok, [{ok, Doc1}]},lists:nth(2,Shards),AccW2_1), + ?assertEqual({ok, [{Doc1, {ok,Doc1}}]},FinalReplyW2), + + % test for W = 3 + AccW3 = {length(Shards), length(Docs), list_to_integer("3"), GroupedDocs, + Dict}, + + {ok,{WaitingCountW3_1,_,_,_,_}=AccW3_1} = + handle_message({ok, [{ok, Doc1}]},hd(Shards),AccW3), + ?assertEqual(WaitingCountW3_1,2), + + {ok,{WaitingCountW3_2,_,_,_,_}=AccW3_2} = + handle_message({ok, [{ok, Doc1}]},lists:nth(2,Shards),AccW3_1), + ?assertEqual(WaitingCountW3_2,1), + + {stop, FinalReplyW3 } = + handle_message({ok, [{ok, Doc1}]},lists:nth(3,Shards),AccW3_2), + ?assertEqual({ok, [{Doc1, {ok,Doc1}}]},FinalReplyW3), + + % test w quorum > # shards, which should fail immediately + + Shards2 = mem3_util:create_partition_map("foo",1,1,["node1"]), + GroupedDocs2 = group_docs_by_shard_hack(<<"foo">>,Shards2,Docs), + + AccW4 = + {length(Shards2), length(Docs), list_to_integer("2"), GroupedDocs2, Dict}, + Bool = + case handle_message({ok, [{ok, Doc1}]},hd(Shards2),AccW4) of + {stop, _Reply} -> + true; + _ -> false + end, + ?assertEqual(Bool,true), + + % Docs with no replies should end up as {error, internal_server_error} + SA1 = #shard{node=a, range=1}, + SB1 = #shard{node=b, range=1}, + SA2 = #shard{node=a, range=2}, + SB2 = #shard{node=b, range=2}, + GroupedDocs3 = [{SA1,[Doc1]}, {SB1,[Doc1]}, {SA2,[Doc2]}, {SB2,[Doc2]}], + StW5_0 = {length(GroupedDocs3), length(Docs2), 2, GroupedDocs3, Dict2}, + {ok, StW5_1} = handle_message({ok, [{ok, "A"}]}, SA1, StW5_0), + {ok, StW5_2} = handle_message({rexi_EXIT, nil}, SB1, StW5_1), + {ok, StW5_3} = handle_message({rexi_EXIT, nil}, SA2, StW5_2), + {stop, ReplyW5} = handle_message({rexi_EXIT, nil}, SB2, StW5_3), + ?assertEqual( + {error, [{Doc1,{accepted,"A"}},{Doc2,{error,internal_server_error}}]}, + ReplyW5 + ). + + +doc_update2_test() -> + Doc1 = #doc{revs = {1,[<<"foo">>]}}, + Doc2 = #doc{revs = {1,[<<"bar">>]}}, + Docs = [Doc2, Doc1], + Shards = + mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]), + GroupedDocs = group_docs_by_shard_hack(<<"foo">>,Shards,Docs), + Acc0 = {length(Shards), length(Docs), list_to_integer("2"), GroupedDocs, + dict:from_list([{Doc,[]} || Doc <- Docs])}, + + {ok,{WaitingCount1,_,_,_,_}=Acc1} = + handle_message({ok, [{ok, Doc1},{ok, Doc2}]},hd(Shards),Acc0), + ?assertEqual(WaitingCount1,2), + + {ok,{WaitingCount2,_,_,_,_}=Acc2} = + handle_message({rexi_EXIT, 1},lists:nth(2,Shards),Acc1), + ?assertEqual(WaitingCount2,1), + + {stop, Reply} = + handle_message({rexi_EXIT, 1},lists:nth(3,Shards),Acc2), + + ?assertEqual({accepted, [{Doc1,{accepted,Doc2}}, {Doc2,{accepted,Doc1}}]}, + Reply). + +doc_update3_test() -> + Doc1 = #doc{revs = {1,[<<"foo">>]}}, + Doc2 = #doc{revs = {1,[<<"bar">>]}}, + Docs = [Doc2, Doc1], + Shards = + mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]), + GroupedDocs = group_docs_by_shard_hack(<<"foo">>,Shards,Docs), + Acc0 = {length(Shards), length(Docs), list_to_integer("2"), GroupedDocs, + dict:from_list([{Doc,[]} || Doc <- Docs])}, + + {ok,{WaitingCount1,_,_,_,_}=Acc1} = + handle_message({ok, [{ok, Doc1},{ok, Doc2}]},hd(Shards),Acc0), + ?assertEqual(WaitingCount1,2), + + {ok,{WaitingCount2,_,_,_,_}=Acc2} = + handle_message({rexi_EXIT, 1},lists:nth(2,Shards),Acc1), + ?assertEqual(WaitingCount2,1), + + {stop, Reply} = + handle_message({ok, [{ok, Doc1},{ok, Doc2}]},lists:nth(3,Shards),Acc2), + + ?assertEqual({ok, [{Doc1, {ok, Doc2}},{Doc2, {ok,Doc1}}]},Reply). + +% needed for testing to avoid having to start the mem3 application +group_docs_by_shard_hack(_DbName, Shards, Docs) -> + dict:to_list(lists:foldl(fun(#doc{id=_Id} = Doc, D0) -> + lists:foldl(fun(Shard, D1) -> + dict:append(Shard, Doc, D1) + end, D0, Shards) + end, dict:new(), Docs)). diff --git a/deps/fabric/src/fabric_group_info.erl b/deps/fabric/src/fabric_group_info.erl new file mode 100644 index 00000000..27b0f839 --- /dev/null +++ b/deps/fabric/src/fabric_group_info.erl @@ -0,0 +1,100 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_group_info). + +-export([go/2]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +go(DbName, GroupId) when is_binary(GroupId) -> + {ok, DDoc} = fabric:open_doc(DbName, GroupId, []), + go(DbName, DDoc); + +go(DbName, #doc{} = DDoc) -> + Group = couch_view_group:design_doc_to_view_group(DDoc), + Shards = mem3:shards(DbName), + Workers = fabric_util:submit_jobs(Shards, group_info, [Group]), + RexiMon = fabric_util:create_monitors(Shards), + Acc0 = {fabric_dict:init(Workers, nil), []}, + try + fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) + after + rexi_monitor:stop(RexiMon) + end. + +handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) -> + case fabric_util:remove_down_workers(Counters, NodeRef) of + {ok, NewCounters} -> + {ok, {NewCounters, Acc}}; + error -> + {error, {nodedown, <<"progress not possible">>}} + end; + +handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) -> + NewCounters = lists:keydelete(Shard,1,Counters), + case fabric_view:is_progress_possible(Counters) of + true -> + {ok, {NewCounters, Acc}}; + false -> + {error, Reason} + end; + +handle_message({ok, Info}, Shard, {Counters, Acc}) -> + case fabric_dict:lookup_element(Shard, Counters) of + undefined -> + % already heard from someone else in this range + {ok, {Counters, Acc}}; + nil -> + C1 = fabric_dict:store(Shard, ok, Counters), + C2 = fabric_view:remove_overlapping_shards(Shard, C1), + case fabric_dict:any(nil, C2) of + true -> + {ok, {C2, [Info|Acc]}}; + false -> + {stop, merge_results(lists:flatten([Info|Acc]))} + end + end; +handle_message(_, _, Acc) -> + {ok, Acc}. + +merge_results(Info) -> + Dict = lists:foldl(fun({K,V},D0) -> orddict:append(K,V,D0) end, + orddict:new(), Info), + orddict:fold(fun + (signature, [X|_], Acc) -> + [{signature, X} | Acc]; + (language, [X|_], Acc) -> + [{language, X} | Acc]; + (disk_size, X, Acc) -> + [{disk_size, lists:sum(X)} | Acc]; + (data_size, X, Acc) -> + [{data_size, lists:sum(X)} | Acc]; + (compact_running, X, Acc) -> + [{compact_running, lists:member(true, X)} | Acc]; + (updater_running, X, Acc) -> + [{updater_running, lists:member(true, X)} | Acc]; + (waiting_commit, X, Acc) -> + [{waiting_commit, lists:member(true, X)} | Acc]; + (waiting_clients, X, Acc) -> + [{waiting_clients, lists:sum(X)} | Acc]; + (update_seq, X, Acc) -> + [{update_seq, lists:sum(X)} | Acc]; + (purge_seq, X, Acc) -> + [{purge_seq, lists:sum(X)} | Acc]; + (_, _, Acc) -> + Acc + end, [], Dict). diff --git a/deps/fabric/src/fabric_rpc.erl b/deps/fabric/src/fabric_rpc.erl new file mode 100644 index 00000000..3f25dfd7 --- /dev/null +++ b/deps/fabric/src/fabric_rpc.erl @@ -0,0 +1,485 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_rpc). + +-export([get_db_info/1, get_doc_count/1, get_update_seq/1]). +-export([open_doc/3, open_revs/4, get_missing_revs/2, get_missing_revs/3, + update_docs/3]). +-export([all_docs/2, changes/3, map_view/4, reduce_view/4, group_info/2]). +-export([create_db/1, delete_db/1, reset_validation_funs/1, set_security/3, + set_revs_limit/3, create_shard_db_doc/2, delete_shard_db_doc/2]). + +-include("fabric.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +-record (view_acc, { + db, + limit, + include_docs, + conflicts, + doc_info = nil, + offset = nil, + total_rows, + reduce_fun = fun couch_db:enum_docs_reduce_to_count/1, + group_level = 0 +}). + +%% rpc endpoints +%% call to with_db will supply your M:F with a #db{} and then remaining args + +all_docs(DbName, #view_query_args{keys=nil} = QueryArgs) -> + {ok, Db} = get_or_create_db(DbName, []), + #view_query_args{ + start_key = StartKey, + start_docid = StartDocId, + end_key = EndKey, + end_docid = EndDocId, + limit = Limit, + skip = Skip, + include_docs = IncludeDocs, + conflicts = Conflicts, + direction = Dir, + inclusive_end = Inclusive, + extra = Extra + } = QueryArgs, + set_io_priority(DbName, Extra), + {ok, Total} = couch_db:get_doc_count(Db), + Acc0 = #view_acc{ + db = Db, + include_docs = IncludeDocs, + conflicts = Conflicts, + limit = Limit+Skip, + total_rows = Total + }, + EndKeyType = if Inclusive -> end_key; true -> end_key_gt end, + Options = [ + {dir, Dir}, + {start_key, if is_binary(StartKey) -> StartKey; true -> StartDocId end}, + {EndKeyType, if is_binary(EndKey) -> EndKey; true -> EndDocId end} + ], + {ok, _, Acc} = couch_db:enum_docs(Db, fun view_fold/3, Acc0, Options), + final_response(Total, Acc#view_acc.offset). + +changes(DbName, Args, StartSeq) -> + erlang:put(io_priority, {interactive, DbName}), + #changes_args{dir=Dir} = Args, + case get_or_create_db(DbName, []) of + {ok, Db} -> + Enum = fun changes_enumerator/2, + Opts = [{dir,Dir}], + Acc0 = {Db, StartSeq, Args}, + try + {ok, {_, LastSeq, _}} = + couch_db:changes_since(Db, StartSeq, Enum, Opts, Acc0), + rexi:reply({complete, LastSeq}) + after + couch_db:close(Db) + end; + Error -> + rexi:reply(Error) + end. + +map_view(DbName, DDoc, ViewName, QueryArgs) -> + {ok, Db} = get_or_create_db(DbName, []), + #view_query_args{ + limit = Limit, + skip = Skip, + keys = Keys, + include_docs = IncludeDocs, + conflicts = Conflicts, + stale = Stale, + view_type = ViewType, + extra = Extra + } = QueryArgs, + set_io_priority(DbName, Extra), + {LastSeq, MinSeq} = calculate_seqs(Db, Stale), + Group0 = couch_view_group:design_doc_to_view_group(DDoc), + {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), + {ok, Group} = couch_view_group:request_group(Pid, MinSeq), + maybe_update_view_group(Pid, LastSeq, Stale), + erlang:monitor(process, Group#group.fd), + View = fabric_view:extract_view(Pid, ViewName, Group#group.views, ViewType), + {ok, Total} = couch_view:get_row_count(View), + Acc0 = #view_acc{ + db = Db, + include_docs = IncludeDocs, + conflicts = Conflicts, + limit = Limit+Skip, + total_rows = Total, + reduce_fun = fun couch_view:reduce_to_count/1 + }, + case Keys of + nil -> + Options = couch_httpd_view:make_key_options(QueryArgs), + {ok, _, Acc} = couch_view:fold(View, fun view_fold/3, Acc0, Options); + _ -> + Acc = lists:foldl(fun(Key, AccIn) -> + KeyArgs = QueryArgs#view_query_args{start_key=Key, end_key=Key}, + Options = couch_httpd_view:make_key_options(KeyArgs), + {_Go, _, Out} = couch_view:fold(View, fun view_fold/3, AccIn, + Options), + Out + end, Acc0, Keys) + end, + final_response(Total, Acc#view_acc.offset). + +reduce_view(DbName, Group0, ViewName, QueryArgs) -> + erlang:put(io_priority, {interactive, DbName}), + {ok, Db} = get_or_create_db(DbName, []), + #view_query_args{ + group_level = GroupLevel, + limit = Limit, + skip = Skip, + keys = Keys, + stale = Stale, + extra = Extra + } = QueryArgs, + set_io_priority(DbName, Extra), + GroupFun = group_rows_fun(GroupLevel), + {LastSeq, MinSeq} = calculate_seqs(Db, Stale), + {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), + {ok, Group} = couch_view_group:request_group(Pid, MinSeq), + maybe_update_view_group(Pid, LastSeq, Stale), + #group{views=Views, def_lang=Lang, fd=Fd} = Group, + erlang:monitor(process, Fd), + {NthRed, View} = fabric_view:extract_view(Pid, ViewName, Views, reduce), + ReduceView = {reduce, NthRed, Lang, View}, + Acc0 = #view_acc{group_level = GroupLevel, limit = Limit+Skip}, + case Keys of + nil -> + Options0 = couch_httpd_view:make_key_options(QueryArgs), + Options = [{key_group_fun, GroupFun} | Options0], + couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options); + _ -> + lists:map(fun(Key) -> + KeyArgs = QueryArgs#view_query_args{start_key=Key, end_key=Key}, + Options0 = couch_httpd_view:make_key_options(KeyArgs), + Options = [{key_group_fun, GroupFun} | Options0], + couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options) + end, Keys) + end, + rexi:reply(complete). + +calculate_seqs(Db, Stale) -> + LastSeq = couch_db:get_update_seq(Db), + if + Stale == ok orelse Stale == update_after -> + {LastSeq, 0}; + true -> + {LastSeq, LastSeq} + end. + +maybe_update_view_group(GroupPid, LastSeq, update_after) -> + couch_view_group:trigger_group_update(GroupPid, LastSeq); +maybe_update_view_group(_, _, _) -> + ok. + +create_db(DbName) -> + rexi:reply(case couch_server:create(DbName, []) of + {ok, _} -> + ok; + Error -> + Error + end). + +create_shard_db_doc(_, Doc) -> + rexi:reply(mem3_util:write_db_doc(Doc)). + +delete_db(DbName) -> + couch_server:delete(DbName, []). + +delete_shard_db_doc(_, DocId) -> + rexi:reply(mem3_util:delete_db_doc(DocId)). + +get_db_info(DbName) -> + with_db(DbName, [], {couch_db, get_db_info, []}). + +get_doc_count(DbName) -> + with_db(DbName, [], {couch_db, get_doc_count, []}). + +get_update_seq(DbName) -> + with_db(DbName, [], {couch_db, get_update_seq, []}). + +set_security(DbName, SecObj, Options) -> + with_db(DbName, Options, {couch_db, set_security, [SecObj]}). + +set_revs_limit(DbName, Limit, Options) -> + with_db(DbName, Options, {couch_db, set_revs_limit, [Limit]}). + +open_doc(DbName, DocId, Options) -> + with_db(DbName, Options, {couch_db, open_doc, [DocId, Options]}). + +open_revs(DbName, Id, Revs, Options) -> + with_db(DbName, Options, {couch_db, open_doc_revs, [Id, Revs, Options]}). + +get_missing_revs(DbName, IdRevsList) -> + get_missing_revs(DbName, IdRevsList, []). + +get_missing_revs(DbName, IdRevsList, Options) -> + % reimplement here so we get [] for Ids with no missing revs in response + set_io_priority(DbName, Options), + rexi:reply(case get_or_create_db(DbName, Options) of + {ok, Db} -> + Ids = [Id1 || {Id1, _Revs} <- IdRevsList], + {ok, lists:zipwith(fun({Id, Revs}, FullDocInfoResult) -> + case FullDocInfoResult of + {ok, #full_doc_info{rev_tree=RevisionTree} = FullInfo} -> + MissingRevs = couch_key_tree:find_missing(RevisionTree, Revs), + {Id, MissingRevs, possible_ancestors(FullInfo, MissingRevs)}; + not_found -> + {Id, Revs, []} + end + end, IdRevsList, couch_btree:lookup(Db#db.id_tree, Ids))}; + Error -> + Error + end). + +update_docs(DbName, Docs0, Options) -> + case proplists:get_value(replicated_changes, Options) of + true -> + X = replicated_changes; + _ -> + X = interactive_edit + end, + Docs = make_att_readers(Docs0), + with_db(DbName, Options, {couch_db, update_docs, [Docs, Options, X]}). + +group_info(DbName, Group0) -> + {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}), + rexi:reply(couch_view_group:request_group_info(Pid)). + +reset_validation_funs(DbName) -> + case get_or_create_db(DbName, []) of + {ok, #db{main_pid = Pid}} -> + gen_server:cast(Pid, {load_validation_funs, undefined}); + _ -> + ok + end. + +%% +%% internal +%% + +with_db(DbName, Options, {M,F,A}) -> + set_io_priority(DbName, Options), + case get_or_create_db(DbName, Options) of + {ok, Db} -> + rexi:reply(try + apply(M, F, [Db | A]) + catch Exception -> + Exception; + error:Reason -> + twig:log(error, "rpc ~p:~p/~p ~p ~p", [M, F, length(A)+1, Reason, + clean_stack()]), + {error, Reason} + end); + Error -> + rexi:reply(Error) + end. + +get_or_create_db(DbName, Options) -> + case couch_db:open_int(DbName, Options) of + {not_found, no_db_file} -> + twig:log(warn, "~p creating ~s", [?MODULE, DbName]), + couch_server:create(DbName, Options); + Else -> + Else + end. + +view_fold(#full_doc_info{} = FullDocInfo, OffsetReds, Acc) -> + % matches for _all_docs and translates #full_doc_info{} -> KV pair + case couch_doc:to_doc_info(FullDocInfo) of + #doc_info{id=Id, revs=[#rev_info{deleted=false, rev=Rev}|_]} = DI -> + Value = {[{rev,couch_doc:rev_to_str(Rev)}]}, + view_fold({{Id,Id}, Value}, OffsetReds, Acc#view_acc{doc_info=DI}); + #doc_info{revs=[#rev_info{deleted=true}|_]} -> + {ok, Acc} + end; +view_fold(KV, OffsetReds, #view_acc{offset=nil, total_rows=Total} = Acc) -> + % calculates the offset for this shard + #view_acc{reduce_fun=Reduce} = Acc, + Offset = Reduce(OffsetReds), + case rexi:sync_reply({total_and_offset, Total, Offset}) of + ok -> + view_fold(KV, OffsetReds, Acc#view_acc{offset=Offset}); + stop -> + exit(normal); + timeout -> + exit(timeout) + end; +view_fold(_KV, _Offset, #view_acc{limit=0} = Acc) -> + % we scanned through limit+skip local rows + {stop, Acc}; +view_fold({{Key,Id}, Value}, _Offset, Acc) -> + % the normal case + #view_acc{ + db = Db, + doc_info = DocInfo, + limit = Limit, + conflicts = Conflicts, + include_docs = IncludeDocs + } = Acc, + case Value of {Props} -> + LinkedDocs = (couch_util:get_value(<<"_id">>, Props) =/= undefined); + _ -> + LinkedDocs = false + end, + if LinkedDocs -> + % we'll embed this at a higher level b/c the doc may be non-local + Doc = undefined; + IncludeDocs -> + IdOrInfo = if DocInfo =/= nil -> DocInfo; true -> Id end, + Options = if Conflicts -> [conflicts]; true -> [] end, + case couch_db:open_doc(Db, IdOrInfo, Options) of + {not_found, deleted} -> + Doc = null; + {not_found, missing} -> + Doc = undefined; + {ok, Doc0} -> + Doc = couch_doc:to_json_obj(Doc0, []) + end; + true -> + Doc = undefined + end, + case rexi:sync_reply(#view_row{key=Key, id=Id, value=Value, doc=Doc}) of + ok -> + {ok, Acc#view_acc{limit=Limit-1}}; + timeout -> + exit(timeout) + end. + +final_response(Total, nil) -> + case rexi:sync_reply({total_and_offset, Total, Total}) of ok -> + rexi:reply(complete); + stop -> + ok; + timeout -> + exit(timeout) + end; +final_response(_Total, _Offset) -> + rexi:reply(complete). + +%% TODO: handle case of bogus group level +group_rows_fun(exact) -> + fun({Key1,_}, {Key2,_}) -> Key1 == Key2 end; +group_rows_fun(0) -> + fun(_A, _B) -> true end; +group_rows_fun(GroupLevel) when is_integer(GroupLevel) -> + fun({[_|_] = Key1,_}, {[_|_] = Key2,_}) -> + lists:sublist(Key1, GroupLevel) == lists:sublist(Key2, GroupLevel); + ({Key1,_}, {Key2,_}) -> + Key1 == Key2 + end. + +reduce_fold(_Key, _Red, #view_acc{limit=0} = Acc) -> + {stop, Acc}; +reduce_fold(_Key, Red, #view_acc{group_level=0} = Acc) -> + send(null, Red, Acc); +reduce_fold(Key, Red, #view_acc{group_level=exact} = Acc) -> + send(Key, Red, Acc); +reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0, is_list(K) -> + send(lists:sublist(K, I), Red, Acc); +reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0 -> + send(K, Red, Acc). + + +send(Key, Value, #view_acc{limit=Limit} = Acc) -> + case rexi:sync_reply(#view_row{key=Key, value=Value}) of + ok -> + {ok, Acc#view_acc{limit=Limit-1}}; + stop -> + exit(normal); + timeout -> + exit(timeout) + end. + +changes_enumerator(DocInfo, {Db, _Seq, Args}) -> + #changes_args{ + include_docs = IncludeDocs, + filter = Acc, + conflicts = Conflicts + } = Args, + #doc_info{high_seq=Seq, revs=[#rev_info{deleted=Del}|_]} = DocInfo, + case [X || X <- couch_changes:filter(DocInfo, Acc), X /= null] of + [] -> + {ok, {Db, Seq, Args}}; + Results -> + Opts = if Conflicts -> [conflicts]; true -> [] end, + ChangesRow = changes_row(Db, DocInfo, Results, Del, IncludeDocs, Opts), + Go = rexi:sync_reply(ChangesRow), + {Go, {Db, Seq, Args}} + end. + +changes_row(Db, #doc_info{id=Id, high_seq=Seq}=DI, Results, Del, true, Opts) -> + Doc = doc_member(Db, DI, Opts), + #change{key=Seq, id=Id, value=Results, doc=Doc, deleted=Del}; +changes_row(_, #doc_info{id=Id, high_seq=Seq}, Results, true, _, _) -> + #change{key=Seq, id=Id, value=Results, deleted=true}; +changes_row(_, #doc_info{id=Id, high_seq=Seq}, Results, _, _, _) -> + #change{key=Seq, id=Id, value=Results}. + +doc_member(Shard, DocInfo, Opts) -> + case couch_db:open_doc(Shard, DocInfo, [deleted | Opts]) of + {ok, Doc} -> + couch_doc:to_json_obj(Doc, []); + Error -> + Error + end. + +possible_ancestors(_FullInfo, []) -> + []; +possible_ancestors(FullInfo, MissingRevs) -> + #doc_info{revs=RevsInfo} = couch_doc:to_doc_info(FullInfo), + LeafRevs = [Rev || #rev_info{rev=Rev} <- RevsInfo], + % Find the revs that are possible parents of this rev + lists:foldl(fun({LeafPos, LeafRevId}, Acc) -> + % this leaf is a "possible ancenstor" of the missing + % revs if this LeafPos lessthan any of the missing revs + case lists:any(fun({MissingPos, _}) -> + LeafPos < MissingPos end, MissingRevs) of + true -> + [{LeafPos, LeafRevId} | Acc]; + false -> + Acc + end + end, [], LeafRevs). + +make_att_readers([]) -> + []; +make_att_readers([#doc{atts=Atts0} = Doc | Rest]) -> + % % go through the attachments looking for 'follows' in the data, + % % replace with function that reads the data from MIME stream. + Atts = [Att#att{data=make_att_reader(D)} || #att{data=D} = Att <- Atts0], + [Doc#doc{atts = Atts} | make_att_readers(Rest)]. + +make_att_reader({follows, Parser}) -> + fun() -> + Parser ! {get_bytes, self()}, + receive {bytes, Bytes} -> Bytes end + end; +make_att_reader(Else) -> + Else. + +clean_stack() -> + lists:map(fun({M,F,A}) when is_list(A) -> {M,F,length(A)}; (X) -> X end, + erlang:get_stacktrace()). + +set_io_priority(DbName, Options) -> + case lists:keyfind(io_priority, 1, Options) of + {io_priority, Pri} -> + erlang:put(io_priority, Pri); + false -> + erlang:put(io_priority, {interactive, DbName}) + end. diff --git a/deps/fabric/src/fabric_util.erl b/deps/fabric/src/fabric_util.erl new file mode 100644 index 00000000..42fe900f --- /dev/null +++ b/deps/fabric/src/fabric_util.erl @@ -0,0 +1,168 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_util). + +-export([submit_jobs/3, cleanup/1, recv/4, get_db/1, get_db/2, error_info/1, + update_counter/3, remove_ancestors/2, create_monitors/1, kv/2, + remove_down_workers/2]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +remove_down_workers(Workers, BadNode) -> + Filter = fun(#shard{node = Node}, _) -> Node =/= BadNode end, + NewWorkers = fabric_dict:filter(Filter, Workers), + case fabric_view:is_progress_possible(NewWorkers) of + true -> + {ok, NewWorkers}; + false -> + error + end. + +submit_jobs(Shards, EndPoint, ExtraArgs) -> + lists:map(fun(#shard{node=Node, name=ShardName} = Shard) -> + Ref = rexi:cast(Node, {fabric_rpc, EndPoint, [ShardName | ExtraArgs]}), + Shard#shard{ref = Ref} + end, Shards). + +cleanup(Workers) -> + [rexi:kill(Node, Ref) || #shard{node=Node, ref=Ref} <- Workers]. + +recv(Workers, Keypos, Fun, Acc0) -> + Timeout = case couch_config:get("fabric", "request_timeout", "60000") of + "infinity" -> infinity; + N -> list_to_integer(N) + end, + rexi_utils:recv(Workers, Keypos, Fun, Acc0, Timeout, infinity). + + +get_db(DbName) -> + get_db(DbName, []). + +get_db(DbName, Options) -> + % prefer local shards + {Local, Remote} = lists:partition(fun(S) -> S#shard.node =:= node() end, + mem3:shards(DbName)), + % suppress shards from down nodes + Nodes = erlang:nodes(), + Live = [S || #shard{node = N} = S <- Remote, lists:member(N, Nodes)], + % sort the live remote shards so that we don't repeatedly try the same node + get_shard(Local ++ lists:keysort(#shard.name, Live), Options, 100). + +get_shard([], _Opts, _Timeout) -> + erlang:error({internal_server_error, "No DB shards could be opened."}); +get_shard([#shard{node = Node, name = Name} | Rest], Opts, Timeout) -> + case rpc:call(Node, couch_db, open, [Name, [{timeout, Timeout} | Opts]]) of + {ok, Db} -> + {ok, Db}; + {unauthorized, _} = Error -> + throw(Error); + {badrpc, {'EXIT', {timeout, _}}} -> + get_shard(Rest, Opts, 2*Timeout); + _Else -> + get_shard(Rest, Opts, Timeout) + end. + +error_info({{<<"reduce_overflow_error">>, _} = Error, _Stack}) -> + Error; +error_info({{timeout, _} = Error, _Stack}) -> + Error; +error_info({{Error, Reason}, Stack}) -> + {Error, Reason, Stack}; +error_info({Error, Stack}) -> + {Error, nil, Stack}. + +update_counter(Item, Incr, D) -> + UpdateFun = fun ({Old, Count}) -> {Old, Count + Incr} end, + orddict:update(make_key(Item), UpdateFun, {Item, Incr}, D). + +make_key({ok, L}) when is_list(L) -> + make_key(L); +make_key([]) -> + []; +make_key([{ok, #doc{revs= {Pos,[RevId | _]}}} | Rest]) -> + [{ok, {Pos, RevId}} | make_key(Rest)]; +make_key([{{not_found, missing}, Rev} | Rest]) -> + [{not_found, Rev} | make_key(Rest)]; +make_key({ok, #doc{id=Id,revs=Revs}}) -> + {Id, Revs}; +make_key(Else) -> + Else. + +% this presumes the incoming list is sorted, i.e. shorter revlists come first +remove_ancestors([], Acc) -> + lists:reverse(Acc); +remove_ancestors([{_, {{not_found, _}, Count}} = Head | Tail], Acc) -> + % any document is a descendant + case lists:filter(fun({_,{{ok, #doc{}}, _}}) -> true; (_) -> false end, Tail) of + [{_,{{ok, #doc{}} = Descendant, _}} | _] -> + remove_ancestors(update_counter(Descendant, Count, Tail), Acc); + [] -> + remove_ancestors(Tail, [Head | Acc]) + end; +remove_ancestors([{_,{{ok, #doc{revs = {Pos, Revs}}}, Count}} = Head | Tail], Acc) -> + Descendants = lists:dropwhile(fun + ({_,{{ok, #doc{revs = {Pos2, Revs2}}}, _}}) -> + case lists:nthtail(erlang:min(Pos2 - Pos, length(Revs2)), Revs2) of + [] -> + % impossible to tell if Revs2 is a descendant - assume no + true; + History -> + % if Revs2 is a descendant, History is a prefix of Revs + not lists:prefix(History, Revs) + end + end, Tail), + case Descendants of [] -> + remove_ancestors(Tail, [Head | Acc]); + [{Descendant, _} | _] -> + remove_ancestors(update_counter(Descendant, Count, Tail), Acc) + end; +remove_ancestors([Error | Tail], Acc) -> + remove_ancestors(Tail, [Error | Acc]). + +create_monitors(Shards) -> + MonRefs = lists:usort([{rexi_server, N} || #shard{node=N} <- Shards]), + rexi_monitor:start(MonRefs). + +%% verify only id and rev are used in key. +update_counter_test() -> + Reply = {ok, #doc{id = <<"id">>, revs = <<"rev">>, + body = <<"body">>, atts = <<"atts">>}}, + ?assertEqual([{{<<"id">>,<<"rev">>}, {Reply, 1}}], + update_counter(Reply, 1, [])). + +remove_ancestors_test() -> + Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}}, + Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}}, + Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}}, + Bar2 = {not_found, {1,<<"bar">>}}, + ?assertEqual( + [kv(Bar1,1), kv(Foo1,1)], + remove_ancestors([kv(Bar1,1), kv(Foo1,1)], []) + ), + ?assertEqual( + [kv(Bar1,1), kv(Foo2,2)], + remove_ancestors([kv(Bar1,1), kv(Foo1,1), kv(Foo2,1)], []) + ), + ?assertEqual( + [kv(Bar1,2)], + remove_ancestors([kv(Bar2,1), kv(Bar1,1)], []) + ). + +%% test function +kv(Item, Count) -> + {make_key(Item), {Item,Count}}. diff --git a/deps/fabric/src/fabric_view.erl b/deps/fabric/src/fabric_view.erl new file mode 100644 index 00000000..fa2127e7 --- /dev/null +++ b/deps/fabric/src/fabric_view.erl @@ -0,0 +1,362 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_view). + +-export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1, + maybe_pause_worker/3, maybe_resume_worker/2, transform_row/1, keydict/1, + extract_view/4, get_shards/2, remove_down_shards/2]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +-spec remove_down_shards(#collector{}, node()) -> + {ok, #collector{}} | {error, any()}. +remove_down_shards(Collector, BadNode) -> + #collector{callback=Callback, counters=Counters, user_acc=Acc} = Collector, + case fabric_util:remove_down_workers(Counters, BadNode) of + {ok, NewCounters} -> + {ok, Collector#collector{counters = NewCounters}}; + error -> + Reason = {nodedown, <<"progress not possible">>}, + Callback({error, Reason}, Acc) + end. + +%% @doc looks for a fully covered keyrange in the list of counters +-spec is_progress_possible([{#shard{}, term()}]) -> boolean(). +is_progress_possible([]) -> + false; +is_progress_possible(Counters) -> + Ranges = fabric_dict:fold(fun(#shard{range=[X,Y]}, _, A) -> [{X,Y}|A] end, + [], Counters), + [{Start, Tail0} | Rest] = lists:ukeysort(1, Ranges), + Result = lists:foldl(fun + (_, fail) -> + % we've already declared failure + fail; + (_, complete) -> + % this is the success condition, we can fast-forward + complete; + ({X,_}, Tail) when X > (Tail+1) -> + % gap in the keyrange, we're dead + fail; + ({_,Y}, Tail) -> + case erlang:max(Tail, Y) of + End when (End+1) =:= (2 bsl 31) -> + complete; + Else -> + % the normal condition, adding to the tail + Else + end + end, if (Tail0+1) =:= (2 bsl 31) -> complete; true -> Tail0 end, Rest), + (Start =:= 0) andalso (Result =:= complete). + +-spec remove_overlapping_shards(#shard{}, [{#shard{}, any()}]) -> + [{#shard{}, any()}]. +remove_overlapping_shards(#shard{range=[A,B]} = Shard0, Shards) -> + fabric_dict:filter(fun(#shard{range=[X,Y], node=Node, ref=Ref} = Shard, _) -> + if Shard =:= Shard0 -> + % we can't remove ourselves + true; + A < B, X >= A, X < B -> + % lower bound is inside our range + rexi:kill(Node, Ref), + false; + A < B, Y > A, Y =< B -> + % upper bound is inside our range + rexi:kill(Node, Ref), + false; + B < A, X >= A orelse B < A, X < B -> + % target shard wraps the key range, lower bound is inside + rexi:kill(Node, Ref), + false; + B < A, Y > A orelse B < A, Y =< B -> + % target shard wraps the key range, upper bound is inside + rexi:kill(Node, Ref), + false; + true -> + true + end + end, Shards). + +maybe_pause_worker(Worker, From, State) -> + #collector{buffer_size = BufferSize, counters = Counters} = State, + case fabric_dict:lookup_element(Worker, Counters) of + BufferSize -> + State#collector{blocked = [{Worker,From} | State#collector.blocked]}; + _Count -> + gen_server:reply(From, ok), + State + end. + +maybe_resume_worker(Worker, State) -> + #collector{buffer_size = Buffer, counters = C, blocked = B} = State, + case fabric_dict:lookup_element(Worker, C) of + Count when Count < Buffer/2 -> + case couch_util:get_value(Worker, B) of + undefined -> + State; + From -> + gen_server:reply(From, ok), + State#collector{blocked = lists:keydelete(Worker, 1, B)} + end; + _Other -> + State + end. + +maybe_send_row(#collector{limit=0} = State) -> + #collector{counters=Counters, user_acc=AccIn, callback=Callback} = State, + case fabric_dict:any(0, Counters) of + true -> + % we still need to send the total/offset header + {ok, State}; + false -> + {_, Acc} = Callback(complete, AccIn), + {stop, State#collector{user_acc=Acc}} + end; +maybe_send_row(State) -> + #collector{ + callback = Callback, + counters = Counters, + skip = Skip, + limit = Limit, + user_acc = AccIn + } = State, + case fabric_dict:any(0, Counters) of + true -> + {ok, State}; + false -> + try get_next_row(State) of + {_, NewState} when Skip > 0 -> + maybe_send_row(NewState#collector{skip=Skip-1}); + {Row, NewState} -> + case Callback(transform_row(possibly_embed_doc(NewState,Row)), AccIn) of + {stop, Acc} -> + {stop, NewState#collector{user_acc=Acc, limit=Limit-1}}; + {ok, Acc} -> + maybe_send_row(NewState#collector{user_acc=Acc, limit=Limit-1}) + end + catch complete -> + {_, Acc} = Callback(complete, AccIn), + {stop, State#collector{user_acc=Acc}} + end + end. + +%% if include_docs=true is used when keys and +%% the values contain "_id" then use the "_id"s +%% to retrieve documents and embed in result +possibly_embed_doc(_State, + #view_row{id=reduced}=Row) -> + Row; +possibly_embed_doc(_State, + #view_row{value=undefined}=Row) -> + Row; +possibly_embed_doc(#collector{db_name=DbName, query_args=Args}, + #view_row{key=_Key, id=_Id, value=Value, doc=_Doc}=Row) -> + #view_query_args{include_docs=IncludeDocs} = Args, + case IncludeDocs andalso is_tuple(Value) of + true -> + {Props} = Value, + Rev0 = couch_util:get_value(<<"_rev">>, Props), + case couch_util:get_value(<<"_id">>,Props) of + undefined -> Row; + IncId -> + % use separate process to call fabric:open_doc + % to not interfere with current call + {Pid, Ref} = spawn_monitor(fun() -> + exit( + case Rev0 of + undefined -> + case fabric:open_doc(DbName, IncId, []) of + {ok, NewDoc} -> + Row#view_row{doc=couch_doc:to_json_obj(NewDoc,[])}; + {not_found, _} -> + Row#view_row{doc=null} + end; + Rev0 -> + Rev = couch_doc:parse_rev(Rev0), + case fabric:open_revs(DbName, IncId, [Rev], []) of + {ok, [{ok, NewDoc}]} -> + Row#view_row{doc=couch_doc:to_json_obj(NewDoc,[])}; + {ok, [{{not_found, _}, Rev}]} -> + Row#view_row{doc=null} + end + end) end), + receive {'DOWN',Ref,process,Pid, Resp} -> + Resp + end + end; + _ -> Row + end. + + +keydict(nil) -> + undefined; +keydict(Keys) -> + {Dict,_} = lists:foldl(fun(K, {D,I}) -> {dict:store(K,I,D), I+1} end, + {dict:new(),0}, Keys), + Dict. + +%% internal %% + +get_next_row(#collector{rows = []}) -> + throw(complete); +get_next_row(#collector{reducer = RedSrc} = St) when RedSrc =/= undefined -> + #collector{ + query_args = #view_query_args{direction=Dir}, + keys = Keys, + rows = RowDict, + os_proc = Proc, + counters = Counters0 + } = St, + {Key, RestKeys} = find_next_key(Keys, Dir, RowDict), + case dict:find(Key, RowDict) of + {ok, Records} -> + NewRowDict = dict:erase(Key, RowDict), + Counters = lists:foldl(fun(#view_row{worker=Worker}, CountersAcc) -> + fabric_dict:update_counter(Worker, -1, CountersAcc) + end, Counters0, Records), + Wrapped = [[V] || #view_row{value=V} <- Records], + {ok, [Reduced]} = couch_query_servers:rereduce(Proc, [RedSrc], Wrapped), + NewSt = St#collector{keys=RestKeys, rows=NewRowDict, counters=Counters}, + NewState = lists:foldl(fun(#view_row{worker=Worker}, StateAcc) -> + maybe_resume_worker(Worker, StateAcc) + end, NewSt, Records), + {#view_row{key=Key, id=reduced, value=Reduced}, NewState}; + error -> + get_next_row(St#collector{keys=RestKeys}) + end; +get_next_row(State) -> + #collector{rows = [Row|Rest], counters = Counters0} = State, + Worker = Row#view_row.worker, + Counters1 = fabric_dict:update_counter(Worker, -1, Counters0), + NewState = maybe_resume_worker(Worker, State#collector{counters=Counters1}), + {Row, NewState#collector{rows = Rest}}. + +find_next_key(nil, Dir, RowDict) -> + case lists:sort(sort_fun(Dir), dict:fetch_keys(RowDict)) of + [] -> + throw(complete); + [Key|_] -> + {Key, nil} + end; +find_next_key([], _, _) -> + throw(complete); +find_next_key([Key|Rest], _, _) -> + {Key, Rest}. + +transform_row(#view_row{key=Key, id=reduced, value=Value}) -> + {row, {[{key,Key}, {value,Value}]}}; +transform_row(#view_row{key=Key, id=undefined}) -> + {row, {[{key,Key}, {error,not_found}]}}; +transform_row(#view_row{key=Key, id=Id, value=Value, doc=undefined}) -> + {row, {[{id,Id}, {key,Key}, {value,Value}]}}; +transform_row(#view_row{key=Key, id=Id, value=Value, doc={error,Reason}}) -> + {row, {[{id,Id}, {key,Key}, {value,Value}, {error,Reason}]}}; +transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc}) -> + {row, {[{id,Id}, {key,Key}, {value,Value}, {doc,Doc}]}}. + + +sort_fun(fwd) -> + fun(A,A) -> true; (A,B) -> couch_view:less_json(A,B) end; +sort_fun(rev) -> + fun(A,A) -> true; (A,B) -> couch_view:less_json(B,A) end. + +extract_view(Pid, ViewName, [], _ViewType) -> + twig:log(error, "missing_named_view ~p", [ViewName]), + exit(Pid, kill), + exit(missing_named_view); +extract_view(Pid, ViewName, [View|Rest], ViewType) -> + case lists:member(ViewName, view_names(View, ViewType)) of + true -> + if ViewType == reduce -> + {index_of(ViewName, view_names(View, reduce)), View}; + true -> + View + end; + false -> + extract_view(Pid, ViewName, Rest, ViewType) + end. + +view_names(View, Type) when Type == red_map; Type == reduce -> + [Name || {Name, _} <- View#view.reduce_funs]; +view_names(View, map) -> + View#view.map_names. + +index_of(X, List) -> + index_of(X, List, 1). + +index_of(_X, [], _I) -> + not_found; +index_of(X, [X|_Rest], I) -> + I; +index_of(X, [_|Rest], I) -> + index_of(X, Rest, I+1). + +get_shards(DbName, #view_query_args{stale=Stale}) + when Stale == ok orelse Stale == update_after -> + mem3:ushards(DbName); +get_shards(DbName, #view_query_args{stale=false}) -> + mem3:shards(DbName). + +% unit test +is_progress_possible_test() -> + EndPoint = 2 bsl 31, + T1 = [[0, EndPoint-1]], + ?assertEqual(is_progress_possible(mk_cnts(T1)),true), + T2 = [[0,10],[11,20],[21,EndPoint-1]], + ?assertEqual(is_progress_possible(mk_cnts(T2)),true), + % gap + T3 = [[0,10],[12,EndPoint-1]], + ?assertEqual(is_progress_possible(mk_cnts(T3)),false), + % outside range + T4 = [[1,10],[11,20],[21,EndPoint-1]], + ?assertEqual(is_progress_possible(mk_cnts(T4)),false), + % outside range + T5 = [[0,10],[11,20],[21,EndPoint]], + ?assertEqual(is_progress_possible(mk_cnts(T5)),false). + +remove_overlapping_shards_test() -> + EndPoint = 2 bsl 31, + T1 = [[0,10],[11,20],[21,EndPoint-1]], + Shards = mk_cnts(T1,3), + ?assertEqual(orddict:size( + remove_overlapping_shards(#shard{name=list_to_atom("node-3"), + node=list_to_atom("node-3"), + range=[11,20]}, + Shards)),7). + +mk_cnts(Ranges) -> + Shards = lists:map(fun(Range) -> + #shard{range=Range} + end, + Ranges), + orddict:from_list([{Shard,nil} || Shard <- Shards]). + +mk_cnts(Ranges, NoNodes) -> + orddict:from_list([{Shard,nil} + || Shard <- + lists:flatten(lists:map( + fun(Range) -> + mk_shards(NoNodes,Range,[]) + end, Ranges))] + ). + +mk_shards(0,_Range,Shards) -> + Shards; +mk_shards(NoNodes,Range,Shards) -> + NodeName = list_to_atom("node-" ++ integer_to_list(NoNodes)), + mk_shards(NoNodes-1,Range, + [#shard{name=NodeName, node=NodeName, range=Range} | Shards]). diff --git a/deps/fabric/src/fabric_view_all_docs.erl b/deps/fabric/src/fabric_view_all_docs.erl new file mode 100644 index 00000000..a769aedc --- /dev/null +++ b/deps/fabric/src/fabric_view_all_docs.erl @@ -0,0 +1,181 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_view_all_docs). + +-export([go/4]). +-export([open_doc/3]). % exported for spawn + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +go(DbName, #view_query_args{keys=nil} = QueryArgs, Callback, Acc0) -> + Workers = fabric_util:submit_jobs(mem3:shards(DbName),all_docs,[QueryArgs]), + BufferSize = couch_config:get("fabric", "map_buffer_size", "2"), + #view_query_args{limit = Limit, skip = Skip} = QueryArgs, + State = #collector{ + query_args = QueryArgs, + callback = Callback, + buffer_size = list_to_integer(BufferSize), + counters = fabric_dict:init(Workers, 0), + skip = Skip, + limit = Limit, + user_acc = Acc0 + }, + RexiMon = fabric_util:create_monitors(Workers), + try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, + State, infinity, 5000) of + {ok, NewState} -> + {ok, NewState#collector.user_acc}; + {timeout, NewState} -> + Callback({error, timeout}, NewState#collector.user_acc); + {error, Resp} -> + {ok, Resp} + after + rexi_monitor:stop(RexiMon), + fabric_util:cleanup(Workers) + end; + + +go(DbName, QueryArgs, Callback, Acc0) -> + #view_query_args{ + direction = Dir, + include_docs = IncludeDocs, + limit = Limit0, + skip = Skip0, + keys = Keys + } = QueryArgs, + {_, Ref0} = spawn_monitor(fun() -> exit(fabric:get_doc_count(DbName)) end), + Monitors0 = [spawn_monitor(?MODULE, open_doc, [DbName, Id, IncludeDocs]) || + Id <- Keys], + Monitors = if Dir=:=fwd -> Monitors0; true -> lists:reverse(Monitors0) end, + receive {'DOWN', Ref0, _, _, {ok, TotalRows}} -> + {ok, Acc1} = Callback({total_and_offset, TotalRows, 0}, Acc0), + {ok, Acc2} = doc_receive_loop(Monitors, Skip0, Limit0, Callback, Acc1), + Callback(complete, Acc2) + after 10000 -> + Callback(timeout, Acc0) + end. + +handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) -> + fabric_view:remove_down_shards(State, NodeRef); + +handle_message({rexi_EXIT, Reason}, Worker, State) -> + #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State, + Counters = fabric_dict:erase(Worker, Counters0), + case fabric_view:is_progress_possible(Counters) of + true -> + {ok, State#collector{counters = Counters}}; + false -> + {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc), + {error, Resp} + end; + +handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) -> + #collector{ + callback = Callback, + counters = Counters0, + total_rows = Total0, + offset = Offset0, + user_acc = AccIn + } = State, + case fabric_dict:lookup_element(Worker, Counters0) of + undefined -> + % this worker lost the race with other partition copies, terminate + gen_server:reply(From, stop), + {ok, State}; + 0 -> + gen_server:reply(From, ok), + Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), + Counters2 = fabric_view:remove_overlapping_shards(Worker, Counters1), + Total = Total0 + Tot, + Offset = Offset0 + Off, + case fabric_dict:any(0, Counters2) of + true -> + {ok, State#collector{ + counters = Counters2, + total_rows = Total, + offset = Offset + }}; + false -> + FinalOffset = erlang:min(Total, Offset+State#collector.skip), + {Go, Acc} = Callback({total_and_offset, Total, FinalOffset}, AccIn), + {Go, State#collector{ + counters = fabric_dict:decrement_all(Counters2), + total_rows = Total, + offset = FinalOffset, + user_acc = Acc + }} + end + end; + +handle_message(#view_row{} = Row, {Worker, From}, State) -> + #collector{query_args = Args, counters = Counters0, rows = Rows0} = State, + Dir = Args#view_query_args.direction, + Rows = merge_row(Dir, Row#view_row{worker=Worker}, Rows0), + Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), + State1 = State#collector{rows=Rows, counters=Counters1}, + State2 = fabric_view:maybe_pause_worker(Worker, From, State1), + fabric_view:maybe_send_row(State2); + +handle_message(complete, Worker, State) -> + Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters), + fabric_view:maybe_send_row(State#collector{counters = Counters}). + + +merge_row(fwd, Row, Rows) -> + lists:keymerge(#view_row.id, [Row], Rows); +merge_row(rev, Row, Rows) -> + lists:rkeymerge(#view_row.id, [Row], Rows). + +doc_receive_loop([], _, _, _, Acc) -> + {ok, Acc}; +doc_receive_loop(_, _, 0, _, Acc) -> + {ok, Acc}; +doc_receive_loop([{Pid,Ref}|Rest], Skip, Limit, Callback, Acc) when Skip > 0 -> + receive {'DOWN', Ref, process, Pid, #view_row{}} -> + doc_receive_loop(Rest, Skip-1, Limit-1, Callback, Acc) + after 10000 -> + timeout + end; +doc_receive_loop([{Pid,Ref}|Rest], 0, Limit, Callback, AccIn) -> + receive {'DOWN', Ref, process, Pid, #view_row{} = Row} -> + case Callback(fabric_view:transform_row(Row), AccIn) of + {ok, Acc} -> + doc_receive_loop(Rest, 0, Limit-1, Callback, Acc); + {stop, Acc} -> + {ok, Acc} + end + after 10000 -> + timeout + end. + +open_doc(DbName, Id, IncludeDocs) -> + Row = case fabric:open_doc(DbName, Id, [deleted]) of + {not_found, missing} -> + Doc = undefined, + #view_row{key=Id}; + {ok, #doc{deleted=true, revs=Revs}} -> + Doc = null, + {RevPos, [RevId|_]} = Revs, + Value = {[{rev,couch_doc:rev_to_str({RevPos, RevId})}, {deleted,true}]}, + #view_row{key=Id, id=Id, value=Value}; + {ok, #doc{revs=Revs} = Doc0} -> + Doc = couch_doc:to_json_obj(Doc0, []), + {RevPos, [RevId|_]} = Revs, + Value = {[{rev,couch_doc:rev_to_str({RevPos, RevId})}]}, + #view_row{key=Id, id=Id, value=Value} + end, + exit(if IncludeDocs -> Row#view_row{doc=Doc}; true -> Row end). diff --git a/deps/fabric/src/fabric_view_changes.erl b/deps/fabric/src/fabric_view_changes.erl new file mode 100644 index 00000000..41347095 --- /dev/null +++ b/deps/fabric/src/fabric_view_changes.erl @@ -0,0 +1,334 @@ +% Copyright 2012 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_view_changes). + +-export([go/5, pack_seqs/1]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-import(fabric_db_update_listener, [wait_db_updated/1, stop/1]). + +go(DbName, Feed, Options, Callback, Acc0) when Feed == "continuous" orelse + Feed == "longpoll" -> + Args = make_changes_args(Options), + Since = get_start_seq(DbName, Args), + case validate_start_seq(DbName, Since) of + ok -> + {ok, Acc} = Callback(start, Acc0), + {Timeout, _} = couch_changes:get_changes_timeout(Args, Callback), + Ref = make_ref(), + Parent = self(), + UpdateListener = {spawn_link(fabric_db_update_listener, go, + [Parent, Ref, DbName, Timeout]), + Ref}, + try + keep_sending_changes( + DbName, + Args, + Callback, + Since, + Acc, + Timeout, + UpdateListener + ) + after + stop(UpdateListener) + end; + Error -> + Callback(Error, Acc0) + end; + +go(DbName, "normal", Options, Callback, Acc0) -> + Args = make_changes_args(Options), + Since = get_start_seq(DbName, Args), + case validate_start_seq(DbName, Since) of + ok -> + {ok, Acc} = Callback(start, Acc0), + {ok, #collector{counters=Seqs, user_acc=AccOut}} = send_changes( + DbName, + Args, + Callback, + Since, + Acc, + 5000 + ), + Callback({stop, pack_seqs(Seqs)}, AccOut); + Error -> + Callback(Error, Acc0) + end. + +keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, UpListen) -> + #changes_args{limit=Limit, feed=Feed, heartbeat=Heartbeat} = Args, + {ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn, Timeout), + #collector{limit=Limit2, counters=NewSeqs, user_acc=AccOut} = Collector, + LastSeq = pack_seqs(NewSeqs), + if Limit > Limit2, Feed == "longpoll" -> + Callback({stop, LastSeq}, AccOut); + true -> + case {Heartbeat, wait_db_updated(UpListen)} of + {undefined, timeout} -> + Callback({stop, LastSeq}, AccOut); + _ -> + {ok, AccTimeout} = Callback(timeout, AccOut), + keep_sending_changes( + DbName, + Args#changes_args{limit=Limit2}, + Callback, + LastSeq, + AccTimeout, + Timeout, + UpListen + ) + end + end. + +send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) -> + AllShards = mem3:shards(DbName), + Seqs = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, Seq}) -> + case lists:member(Shard, AllShards) of + true -> + Ref = rexi:cast(N, {fabric_rpc, changes, [Name,ChangesArgs,Seq]}), + [{Shard#shard{ref = Ref}, Seq}]; + false -> + % Find some replacement shards to cover the missing range + % TODO It's possible in rare cases of shard merging to end up + % with overlapping shard ranges from this technique + lists:map(fun(#shard{name=Name2, node=N2} = NewShard) -> + Ref = rexi:cast(N2, {fabric_rpc, changes, [Name2,ChangesArgs,0]}), + {NewShard#shard{ref = Ref}, 0} + end, find_replacement_shards(Shard, AllShards)) + end + end, unpack_seqs(PackedSeqs, DbName)), + {Workers, _} = lists:unzip(Seqs), + RexiMon = fabric_util:create_monitors(Workers), + State = #collector{ + query_args = ChangesArgs, + callback = Callback, + counters = orddict:from_list(Seqs), + user_acc = AccIn, + limit = ChangesArgs#changes_args.limit, + rows = Seqs % store sequence positions instead + }, + %% TODO: errors need to be handled here + try + receive_results(Workers, State, Timeout, Callback, AccIn) + after + rexi_monitor:stop(RexiMon), + fabric_util:cleanup(Workers) + end. + +receive_results(Workers, State, Timeout, Callback, AccIn) -> + case rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, State, + infinity, Timeout) of + {timeout, NewState0} -> + {ok, AccOut} = Callback(timeout, NewState0#collector.user_acc), + NewState = NewState0#collector{user_acc = AccOut}, + receive_results(Workers, NewState, Timeout, Callback, AccOut); + {_, NewState} -> + {ok, NewState} + end. + +handle_message({rexi_DOWN, _, {_, NodeRef}, _}, nil, State) -> + fabric_view:remove_down_shards(State, NodeRef); + +handle_message({rexi_EXIT, Reason}, Worker, State) -> + #collector{ + callback=Callback, + counters=Counters0, + rows = Seqs0, + user_acc=Acc + } = State, + Counters = fabric_dict:erase(Worker, Counters0), + Seqs = fabric_dict:erase(Worker, Seqs0), + case fabric_view:is_progress_possible(Counters) of + true -> + {ok, State#collector{counters = Counters, rows=Seqs}}; + false -> + {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc), + {error, Resp} + end; + +handle_message(_, _, #collector{limit=0} = State) -> + {stop, State}; + +handle_message(#change{key=Seq} = Row0, {Worker, From}, St) -> + #collector{ + query_args = #changes_args{include_docs=IncludeDocs}, + callback = Callback, + counters = S0, + limit = Limit, + user_acc = AccIn + } = St, + case fabric_dict:lookup_element(Worker, S0) of + undefined -> + % this worker lost the race with other partition copies, terminate it + gen_server:reply(From, stop), + {ok, St}; + _ -> + S1 = fabric_dict:store(Worker, Seq, S0), + S2 = fabric_view:remove_overlapping_shards(Worker, S1), + Row = Row0#change{key = pack_seqs(S2)}, + {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn), + gen_server:reply(From, Go), + {Go, St#collector{counters=S2, limit=Limit-1, user_acc=Acc}} + end; + +handle_message({complete, EndSeq}, Worker, State) -> + #collector{ + counters = S0, + total_rows = Completed % override + } = State, + case fabric_dict:lookup_element(Worker, S0) of + undefined -> + {ok, State}; + _ -> + S1 = fabric_dict:store(Worker, EndSeq, S0), + % unlikely to have overlaps here, but possible w/ filters + S2 = fabric_view:remove_overlapping_shards(Worker, S1), + NewState = State#collector{counters=S2, total_rows=Completed+1}, + case fabric_dict:size(S2) =:= (Completed+1) of + true -> + {stop, NewState}; + false -> + {ok, NewState} + end + end. + +make_changes_args(#changes_args{style=Style, filter=undefined}=Args) -> + Args#changes_args{filter = Style}; +make_changes_args(Args) -> + Args. + +get_start_seq(_DbName, #changes_args{dir=fwd, since=Since}) -> + Since; +get_start_seq(DbName, #changes_args{dir=rev}) -> + Shards = mem3:shards(DbName), + Workers = fabric_util:submit_jobs(Shards, get_update_seq, []), + {ok, Since} = fabric_util:recv(Workers, #shard.ref, + fun collect_update_seqs/3, fabric_dict:init(Workers, -1)), + Since. + +collect_update_seqs(Seq, Shard, Counters) when is_integer(Seq) -> + case fabric_dict:lookup_element(Shard, Counters) of + undefined -> + % already heard from someone else in this range + {ok, Counters}; + -1 -> + C1 = fabric_dict:store(Shard, Seq, Counters), + C2 = fabric_view:remove_overlapping_shards(Shard, C1), + case fabric_dict:any(-1, C2) of + true -> + {ok, C2}; + false -> + {stop, pack_seqs(C2)} + end + end. + +pack_seqs(Workers) -> + SeqList = [{N,R,S} || {#shard{node=N, range=R}, S} <- Workers], + SeqSum = lists:sum(element(2, lists:unzip(Workers))), + Opaque = couch_util:encodeBase64Url(term_to_binary(SeqList, [compressed])), + [SeqSum, Opaque]. + +unpack_seqs(0, DbName) -> + fabric_dict:init(mem3:shards(DbName), 0); + +unpack_seqs("0", DbName) -> + fabric_dict:init(mem3:shards(DbName), 0); + +unpack_seqs([_SeqNum, Opaque], DbName) -> + do_unpack_seqs(Opaque, DbName); + +unpack_seqs(Packed, DbName) -> + NewPattern = "^\\[[0-9]+,\"(?<opaque>.*)\"\\]$", + OldPattern = "^([0-9]+-)?(?<opaque>.*)$", + Options = [{capture, [opaque], binary}], + Opaque = case re:run(Packed, NewPattern, Options) of + {match, Match} -> + Match; + nomatch -> + {match, Match} = re:run(Packed, OldPattern, Options), + Match + end, + do_unpack_seqs(Opaque, DbName). + +do_unpack_seqs(Opaque, DbName) -> + % TODO relies on internal structure of fabric_dict as keylist + lists:map(fun({Node, [A,B], Seq}) -> + Match = #shard{node=Node, range=[A,B], dbname=DbName, _ = '_'}, + case ets:match_object(partitions, Match) of + [Shard] -> + {Shard, Seq}; + [] -> + {Match, Seq} % will be replaced in find_replacement_shards + end + end, binary_to_term(couch_util:decodeBase64Url(Opaque))). + +changes_row(#change{key=Seq, id=Id, value=Value, deleted=true, doc=Doc}, true) -> + {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}, {doc, Doc}]}}; +changes_row(#change{key=Seq, id=Id, value=Value, deleted=true}, false) -> + {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}]}}; +changes_row(#change{key=Seq, id=Id, value=Value, doc={error,Reason}}, true) -> + {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {error,Reason}]}}; +changes_row(#change{key=Seq, id=Id, value=Value, doc=Doc}, true) -> + {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {doc,Doc}]}}; +changes_row(#change{key=Seq, id=Id, value=Value}, false) -> + {change, {[{seq,Seq}, {id,Id}, {changes,Value}]}}. + +find_replacement_shards(#shard{range=Range}, AllShards) -> + % TODO make this moar betta -- we might have split or merged the partition + [Shard || Shard <- AllShards, Shard#shard.range =:= Range]. + +validate_start_seq(DbName, Seq) -> + try unpack_seqs(Seq, DbName) of _Any -> + ok + catch _:_ -> + Reason = <<"Malformed sequence supplied in 'since' parameter.">>, + {error, {bad_request, Reason}} + end. + +unpack_seqs_test() -> + ets:new(partitions, [named_table]), + + % BigCouch 0.3 style. + assert_shards("23423-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND" + "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee" + "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA"), + + % BigCouch 0.4 style. + assert_shards([23423,<<"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND" + "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee" + "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA">>]), + + % BigCouch 0.4 style (as string). + assert_shards("[23423,\"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND" + "LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee" + "C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"), + + % with internal hypen + assert_shards("651-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwNDLXMwBCwxygOFMiQ" + "5L8____sxJTcalIUgCSSfZgReE4FTmAFMWDFYXgVJQAUlQPVuSKS1EeC5BkaABSQHXz8" + "VgJUbgAonB_VqIPfoUHIArvE7T6AUQh0I1-WQAzp1XB"), + assert_shards([651,"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwNDLXMwBCwxygOFMiQ" + "5L8____sxJTcalIUgCSSfZgReE4FTmAFMWDFYXgVJQAUlQPVuSKS1EeC5BkaABSQHXz8" + "VgJUbgAonB_VqIPfoUHIArvE7T6AUQh0I1-WQAzp1XB"]), + + ets:delete(partitions). + +assert_shards(Packed) -> + ?assertMatch([{#shard{},_}|_], unpack_seqs(Packed, <<"foo">>)). diff --git a/deps/fabric/src/fabric_view_map.erl b/deps/fabric/src/fabric_view_map.erl new file mode 100644 index 00000000..96741a8e --- /dev/null +++ b/deps/fabric/src/fabric_view_map.erl @@ -0,0 +1,151 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_view_map). + +-export([go/6]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) -> + {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []), + go(DbName, DDoc, View, Args, Callback, Acc0); + +go(DbName, DDoc, View, Args, Callback, Acc0) -> + Shards = fabric_view:get_shards(DbName, Args), + Workers = fabric_util:submit_jobs(Shards, map_view, [DDoc, View, Args]), + BufferSize = couch_config:get("fabric", "map_buffer_size", "2"), + #view_query_args{limit = Limit, skip = Skip, keys = Keys} = Args, + State = #collector{ + db_name=DbName, + query_args = Args, + callback = Callback, + buffer_size = list_to_integer(BufferSize), + counters = fabric_dict:init(Workers, 0), + skip = Skip, + limit = Limit, + keys = fabric_view:keydict(Keys), + sorted = Args#view_query_args.sorted, + user_acc = Acc0 + }, + RexiMon = fabric_util:create_monitors(Workers), + try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, + State, infinity, 1000 * 60 * 60) of + {ok, NewState} -> + {ok, NewState#collector.user_acc}; + {timeout, NewState} -> + Callback({error, timeout}, NewState#collector.user_acc); + {error, Resp} -> + {ok, Resp} + after + rexi_monitor:stop(RexiMon), + fabric_util:cleanup(Workers) + end. + +handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) -> + fabric_view:remove_down_shards(State, NodeRef); + +handle_message({rexi_EXIT, Reason}, Worker, State) -> + #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State, + Counters = fabric_dict:erase(Worker, Counters0), + case fabric_view:is_progress_possible(Counters) of + true -> + {ok, State#collector{counters = Counters}}; + false -> + {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc), + {error, Resp} + end; + +handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) -> + #collector{ + callback = Callback, + counters = Counters0, + total_rows = Total0, + offset = Offset0, + user_acc = AccIn + } = State, + case fabric_dict:lookup_element(Worker, Counters0) of + undefined -> + % this worker lost the race with other partition copies, terminate + gen_server:reply(From, stop), + {ok, State}; + 0 -> + gen_server:reply(From, ok), + Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), + Counters2 = fabric_view:remove_overlapping_shards(Worker, Counters1), + Total = Total0 + Tot, + Offset = Offset0 + Off, + case fabric_dict:any(0, Counters2) of + true -> + {ok, State#collector{ + counters = Counters2, + total_rows = Total, + offset = Offset + }}; + false -> + FinalOffset = erlang:min(Total, Offset+State#collector.skip), + {Go, Acc} = Callback({total_and_offset, Total, FinalOffset}, AccIn), + {Go, State#collector{ + counters = fabric_dict:decrement_all(Counters2), + total_rows = Total, + offset = FinalOffset, + user_acc = Acc + }} + end + end; + +handle_message(#view_row{}, {_, _}, #collector{limit=0} = State) -> + #collector{callback=Callback} = State, + {_, Acc} = Callback(complete, State#collector.user_acc), + {stop, State#collector{user_acc=Acc}}; + +handle_message(#view_row{} = Row, {_,From}, #collector{sorted=false} = St) -> + #collector{callback=Callback, user_acc=AccIn, limit=Limit} = St, + {Go, Acc} = Callback(fabric_view:transform_row(Row), AccIn), + gen_server:reply(From, ok), + {Go, St#collector{user_acc=Acc, limit=Limit-1}}; + +handle_message(#view_row{} = Row, {Worker, From}, State) -> + #collector{ + query_args = #view_query_args{direction=Dir}, + counters = Counters0, + rows = Rows0, + keys = KeyDict + } = State, + Rows = merge_row(Dir, KeyDict, Row#view_row{worker=Worker}, Rows0), + Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), + State1 = State#collector{rows=Rows, counters=Counters1}, + State2 = fabric_view:maybe_pause_worker(Worker, From, State1), + fabric_view:maybe_send_row(State2); + +handle_message(complete, Worker, State) -> + Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters), + fabric_view:maybe_send_row(State#collector{counters = Counters}). + +merge_row(fwd, undefined, Row, Rows) -> + lists:merge(fun(#view_row{key=KeyA, id=IdA}, #view_row{key=KeyB, id=IdB}) -> + couch_view:less_json([KeyA, IdA], [KeyB, IdB]) + end, [Row], Rows); +merge_row(rev, undefined, Row, Rows) -> + lists:merge(fun(#view_row{key=KeyA, id=IdA}, #view_row{key=KeyB, id=IdB}) -> + couch_view:less_json([KeyB, IdB], [KeyA, IdA]) + end, [Row], Rows); +merge_row(_, KeyDict, Row, Rows) -> + lists:merge(fun(#view_row{key=A, id=IdA}, #view_row{key=B, id=IdB}) -> + if A =:= B -> IdA < IdB; true -> + dict:fetch(A, KeyDict) < dict:fetch(B, KeyDict) + end + end, [Row], Rows). diff --git a/deps/fabric/src/fabric_view_reduce.erl b/deps/fabric/src/fabric_view_reduce.erl new file mode 100644 index 00000000..58438573 --- /dev/null +++ b/deps/fabric/src/fabric_view_reduce.erl @@ -0,0 +1,114 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_view_reduce). + +-export([go/6]). + +-include("fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) -> + {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []), + go(DbName, DDoc, View, Args, Callback, Acc0); + +go(DbName, DDoc, VName, Args, Callback, Acc0) -> + #group{def_lang=Lang, views=Views} = Group = + couch_view_group:design_doc_to_view_group(DDoc), + {NthRed, View} = fabric_view:extract_view(nil, VName, Views, reduce), + {VName, RedSrc} = lists:nth(NthRed, View#view.reduce_funs), + Workers = lists:map(fun(#shard{name=Name, node=N} = Shard) -> + Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,Group,VName,Args]}), + Shard#shard{ref = Ref} + end, fabric_view:get_shards(DbName, Args)), + RexiMon = fabric_util:create_monitors(Workers), + BufferSize = couch_config:get("fabric", "reduce_buffer_size", "20"), + #view_query_args{limit = Limit, skip = Skip} = Args, + State = #collector{ + db_name = DbName, + query_args = Args, + callback = Callback, + buffer_size = list_to_integer(BufferSize), + counters = fabric_dict:init(Workers, 0), + keys = Args#view_query_args.keys, + skip = Skip, + limit = Limit, + lang = Group#group.def_lang, + os_proc = couch_query_servers:get_os_process(Lang), + reducer = RedSrc, + rows = dict:new(), + user_acc = Acc0 + }, + try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, + State, infinity, 1000 * 60 * 60) of + {ok, NewState} -> + {ok, NewState#collector.user_acc}; + {timeout, NewState} -> + Callback({error, timeout}, NewState#collector.user_acc); + {error, Resp} -> + {ok, Resp} + after + rexi_monitor:stop(RexiMon), + fabric_util:cleanup(Workers), + catch couch_query_servers:ret_os_process(State#collector.os_proc) + end. + +handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) -> + fabric_view:remove_down_shards(State, NodeRef); + +handle_message({rexi_EXIT, Reason}, Worker, State) -> + #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State, + Counters = fabric_dict:erase(Worker, Counters0), + case fabric_view:is_progress_possible(Counters) of + true -> + {ok, State#collector{counters = Counters}}; + false -> + {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc), + {error, Resp} + end; + +handle_message(#view_row{key=Key} = Row, {Worker, From}, State) -> + #collector{counters = Counters0, rows = Rows0} = State, + case fabric_dict:lookup_element(Worker, Counters0) of + undefined -> + % this worker lost the race with other partition copies, terminate it + gen_server:reply(From, stop), + {ok, State}; + _ -> + Rows = dict:append(Key, Row#view_row{worker=Worker}, Rows0), + C1 = fabric_dict:update_counter(Worker, 1, Counters0), + % TODO time this call, if slow don't do it every time + C2 = fabric_view:remove_overlapping_shards(Worker, C1), + State1 = State#collector{rows=Rows, counters=C2}, + State2 = fabric_view:maybe_pause_worker(Worker, From, State1), + fabric_view:maybe_send_row(State2) + end; + +handle_message(complete, Worker, State) -> + C1 = fabric_dict:update_counter(Worker, 1, State#collector.counters), + C2 = fabric_view:remove_overlapping_shards(Worker, C1), + fabric_view:maybe_send_row(State#collector{counters = C2}). + +complete_worker_test() -> + Shards = + mem3_util:create_partition_map("foo",3,3,[node(),node(),node()]), + Workers = lists:map(fun(#shard{} = Shard) -> + Ref = make_ref(), + Shard#shard{ref = Ref} + end, + Shards), + State = #collector{counters=fabric_dict:init(Workers,0)}, + {ok, NewState} = handle_message(complete, lists:nth(2,Workers), State), + ?assertEqual(orddict:size(NewState#collector.counters),length(Workers) - 2). |