diff options
Diffstat (limited to 'deps/fabric/src/fabric.erl')
-rw-r--r-- | deps/fabric/src/fabric.erl | 460 |
1 files changed, 460 insertions, 0 deletions
diff --git a/deps/fabric/src/fabric.erl b/deps/fabric/src/fabric.erl new file mode 100644 index 00000000..a8e3b2ea --- /dev/null +++ b/deps/fabric/src/fabric.erl @@ -0,0 +1,460 @@ +% Copyright 2010 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric). + +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +-define(ADMIN_CTX, {user_ctx, #user_ctx{roles = [<<"_admin">>]}}). + +% DBs +-export([all_dbs/0, all_dbs/1, create_db/1, create_db/2, delete_db/1, + delete_db/2, get_db_info/1, get_doc_count/1, set_revs_limit/3, + set_security/3, get_revs_limit/1, get_security/1, get_security/2]). + +% Documents +-export([open_doc/3, open_revs/4, get_missing_revs/2, get_missing_revs/3, + update_doc/3, update_docs/3, purge_docs/2, att_receiver/2]). + +% Views +-export([all_docs/4, changes/4, query_view/3, query_view/4, query_view/6, + get_view_group_info/2]). + +% miscellany +-export([design_docs/1, reset_validation_funs/1, cleanup_index_files/0, + cleanup_index_files/1]). + +-include("fabric.hrl"). + +-type dbname() :: (iodata() | #db{}). +-type docid() :: iodata(). +-type revision() :: {integer(), binary()}. +-type callback() :: fun((any(), any()) -> {ok | stop, any()}). +-type json_obj() :: {[{binary() | atom(), any()}]}. +-type option() :: atom() | {atom(), any()}. + +%% db operations +%% @equiv all_dbs(<<>>) +all_dbs() -> + all_dbs(<<>>). + +%% @doc returns a list of all database names +-spec all_dbs(Prefix::iodata()) -> {ok, [binary()]}. +all_dbs(Prefix) when is_binary(Prefix) -> + Length = byte_size(Prefix), + MatchingDbs = ets:foldl(fun(#shard{dbname=DbName}, Acc) -> + case DbName of + <<Prefix:Length/binary, _/binary>> -> + [DbName | Acc]; + _ -> + Acc + end + end, [], partitions), + {ok, lists:usort(MatchingDbs)}; + +%% @equiv all_dbs(list_to_binary(Prefix)) +all_dbs(Prefix) when is_list(Prefix) -> + all_dbs(list_to_binary(Prefix)). + +%% @doc returns a property list of interesting properties +%% about the database such as `doc_count', `disk_size', +%% etc. +-spec get_db_info(dbname()) -> + {ok, [ + {instance_start_time, binary()} | + {doc_count, non_neg_integer()} | + {doc_del_count, non_neg_integer()} | + {purge_seq, non_neg_integer()} | + {compact_running, boolean()} | + {disk_size, non_neg_integer()} | + {disk_format_version, pos_integer()} + ]}. +get_db_info(DbName) -> + fabric_db_info:go(dbname(DbName)). + +%% @doc the number of docs in a database +-spec get_doc_count(dbname()) -> {ok, non_neg_integer()}. +get_doc_count(DbName) -> + fabric_db_doc_count:go(dbname(DbName)). + +%% @equiv create_db(DbName, []) +create_db(DbName) -> + create_db(DbName, []). + +%% @doc creates a database with the given name. +%% +%% Options can include values for q and n, +%% for example `{q, "8"}' and `{n, "3"}', which +%% control how many shards to split a database into +%% and how many nodes each doc is copied to respectively. +%% +-spec create_db(dbname(), [option()]) -> ok | accepted | {error, atom()}. +create_db(DbName, Options) -> + fabric_db_create:go(dbname(DbName), opts(Options)). + +%% @equiv delete_db([]) +delete_db(DbName) -> + delete_db(DbName, []). + +%% @doc delete a database +-spec delete_db(dbname(), [option()]) -> ok | accepted | {error, atom()}. +delete_db(DbName, Options) -> + fabric_db_delete:go(dbname(DbName), opts(Options)). + +%% @doc provide an upper bound for the number of tracked document revisions +-spec set_revs_limit(dbname(), pos_integer(), [option()]) -> ok. +set_revs_limit(DbName, Limit, Options) when is_integer(Limit), Limit > 0 -> + fabric_db_meta:set_revs_limit(dbname(DbName), Limit, opts(Options)). + +%% @doc retrieves the maximum number of document revisions +-spec get_revs_limit(dbname()) -> pos_integer() | no_return(). +get_revs_limit(DbName) -> + {ok, Db} = fabric_util:get_db(dbname(DbName), [?ADMIN_CTX]), + try couch_db:get_revs_limit(Db) after catch couch_db:close(Db) end. + +%% @doc sets the readers/writers/admin permissions for a database +-spec set_security(dbname(), SecObj::json_obj(), [option()]) -> ok. +set_security(DbName, SecObj, Options) -> + fabric_db_meta:set_security(dbname(DbName), SecObj, opts(Options)). + +get_security(DbName) -> + get_security(DbName, [?ADMIN_CTX]). + +%% @doc retrieve the security object for a database +-spec get_security(dbname()) -> json_obj() | no_return(). +get_security(DbName, Options) -> + {ok, Db} = fabric_util:get_db(dbname(DbName), opts(Options)), + try couch_db:get_security(Db) after catch couch_db:close(Db) end. + +% doc operations + +%% @doc retrieve the doc with a given id +-spec open_doc(dbname(), docid(), [option()]) -> + {ok, #doc{}} | + {not_found, missing | deleted} | + {timeout, any()} | + {error, any()} | + {error, any() | any()}. +open_doc(DbName, Id, Options) -> + fabric_doc_open:go(dbname(DbName), docid(Id), opts(Options)). + +%% @doc retrieve a collection of revisions, possible all +-spec open_revs(dbname(), docid(), [revision()] | all, [option()]) -> + {ok, [{ok, #doc{}} | {{not_found,missing}, revision()}]} | + {timeout, any()} | + {error, any()} | + {error, any(), any()}. +open_revs(DbName, Id, Revs, Options) -> + fabric_doc_open_revs:go(dbname(DbName), docid(Id), Revs, opts(Options)). + +%% @equiv get_missing_revs(DbName, IdsRevs, []) +get_missing_revs(DbName, IdsRevs) -> + get_missing_revs(DbName, IdsRevs, []). + +%% @doc retrieve missing revisions for a list of `{Id, Revs}' +-spec get_missing_revs(dbname(),[{docid(), [revision()]}], [option()]) -> + {ok, [{docid(), any(), [any()]}]}. +get_missing_revs(DbName, IdsRevs, Options) when is_list(IdsRevs) -> + Sanitized = [idrevs(IdR) || IdR <- IdsRevs], + fabric_doc_missing_revs:go(dbname(DbName), Sanitized, opts(Options)). + +%% @doc update a single doc +%% @equiv update_docs(DbName,[Doc],Options) +-spec update_doc(dbname(), #doc{}, [option()]) -> + {ok, any()} | any(). +update_doc(DbName, Doc, Options) -> + case update_docs(DbName, [Doc], opts(Options)) of + {ok, [{ok, NewRev}]} -> + {ok, NewRev}; + {accepted, [{accepted, NewRev}]} -> + {accepted, NewRev}; + {ok, [{{_Id, _Rev}, Error}]} -> + throw(Error); + {ok, [Error]} -> + throw(Error); + {ok, []} -> + % replication success + #doc{revs = {Pos, [RevId | _]}} = doc(Doc), + {ok, {Pos, RevId}} + end. + +%% @doc update a list of docs +-spec update_docs(dbname(), [#doc{}], [option()]) -> + {ok, any()} | any(). +update_docs(DbName, Docs, Options) -> + try + fabric_doc_update:go(dbname(DbName), docs(Docs), opts(Options)) of + {ok, Results} -> + {ok, Results}; + {accepted, Results} -> + {accepted, Results}; + Error -> + throw(Error) + catch {aborted, PreCommitFailures} -> + {aborted, PreCommitFailures} + end. + +purge_docs(_DbName, _IdsRevs) -> + not_implemented. + +%% @doc spawns a process to upload attachment data and +%% returns a function that shards can use to communicate +%% with the spawned middleman process +-spec att_receiver(#httpd{}, Length :: undefined | chunked | pos_integer() | + {unknown_transfer_encoding, any()}) -> + function() | binary(). +att_receiver(Req, Length) -> + fabric_doc_attachments:receiver(Req, Length). + +%% @doc retrieves all docs. Additional query parameters, such as `limit', +%% `start_key' and `end_key', `descending', and `include_docs', can +%% also be passed to further constrain the query. See <a href= +%% "http://wiki.apache.org/couchdb/HTTP_Document_API#All_Documents"> +%% all_docs</a> for details +-spec all_docs(dbname(), callback(), [] | tuple(), #view_query_args{}) -> + {ok, [any()]}. +all_docs(DbName, Callback, Acc0, #view_query_args{} = QueryArgs) when + is_function(Callback, 2) -> + fabric_view_all_docs:go(dbname(DbName), QueryArgs, Callback, Acc0); + +%% @doc convenience function that takes a keylist rather than a record +%% @equiv all_docs(DbName, Callback, Acc0, kl_to_query_args(QueryArgs)) +all_docs(DbName, Callback, Acc0, QueryArgs) -> + all_docs(DbName, Callback, Acc0, kl_to_query_args(QueryArgs)). + + +-spec changes(dbname(), callback(), any(), #changes_args{} | [{atom(),any()}]) -> + {ok, any()}. +changes(DbName, Callback, Acc0, #changes_args{}=Options) -> + Feed = Options#changes_args.feed, + fabric_view_changes:go(dbname(DbName), Feed, Options, Callback, Acc0); + +%% @doc convenience function, takes keylist instead of record +%% @equiv changes(DbName, Callback, Acc0, kl_to_changes_args(Options)) +changes(DbName, Callback, Acc0, Options) -> + changes(DbName, Callback, Acc0, kl_to_changes_args(Options)). + +%% @equiv query_view(DbName, DesignName, ViewName, #view_query_args{}) +query_view(DbName, DesignName, ViewName) -> + query_view(DbName, DesignName, ViewName, #view_query_args{}). + +%% @equiv query_view(DbName, DesignName, +%% ViewName, fun default_callback/2, [], QueryArgs) +query_view(DbName, DesignName, ViewName, QueryArgs) -> + Callback = fun default_callback/2, + query_view(DbName, DesignName, ViewName, Callback, [], QueryArgs). + +%% @doc execute a given view. +%% There are many additional query args that can be passed to a view, +%% see <a href="http://wiki.apache.org/couchdb/HTTP_view_API#Querying_Options"> +%% query args</a> for details. +-spec query_view(dbname(), #doc{} | binary(), iodata(), callback(), any(), + #view_query_args{}) -> + any(). +query_view(DbName, Design, ViewName, Callback, Acc0, QueryArgs) -> + Db = dbname(DbName), View = name(ViewName), + case is_reduce_view(Db, Design, View, QueryArgs) of + true -> + Mod = fabric_view_reduce; + false -> + Mod = fabric_view_map + end, + Mod:go(Db, Design, View, QueryArgs, Callback, Acc0). + +%% @doc retrieve info about a view group, disk size, language, whether compaction +%% is running and so forth +-spec get_view_group_info(dbname(), #doc{} | docid()) -> + {ok, [ + {signature, binary()} | + {language, binary()} | + {disk_size, non_neg_integer()} | + {compact_running, boolean()} | + {updater_running, boolean()} | + {waiting_commit, boolean()} | + {waiting_clients, non_neg_integer()} | + {update_seq, pos_integer()} | + {purge_seq, non_neg_integer()} + ]}. +get_view_group_info(DbName, DesignId) -> + fabric_group_info:go(dbname(DbName), design_doc(DesignId)). + +%% @doc retrieve all the design docs from a database +-spec design_docs(dbname()) -> {ok, [json_obj()]}. +design_docs(DbName) -> + QueryArgs = #view_query_args{start_key = <<"_design/">>, include_docs=true}, + Callback = fun({total_and_offset, _, _}, []) -> + {ok, []}; + ({row, {Props}}, Acc) -> + case couch_util:get_value(id, Props) of + <<"_design/", _/binary>> -> + {ok, [couch_util:get_value(doc, Props) | Acc]}; + _ -> + {stop, Acc} + end; + (complete, Acc) -> + {ok, lists:reverse(Acc)}; + ({error, Reason}, _Acc) -> + {error, Reason} + end, + fabric:all_docs(dbname(DbName), Callback, [], QueryArgs). + +%% @doc forces a reload of validation functions, this is performed after +%% design docs are update +%% NOTE: This function probably doesn't belong here as part fo the API +-spec reset_validation_funs(dbname()) -> [reference()]. +reset_validation_funs(DbName) -> + [rexi:cast(Node, {fabric_rpc, reset_validation_funs, [Name]}) || + #shard{node=Node, name=Name} <- mem3:shards(DbName)]. + +%% @doc clean up index files for all Dbs +-spec cleanup_index_files() -> [ok]. +cleanup_index_files() -> + {ok, Dbs} = fabric:all_dbs(), + [cleanup_index_files(Db) || Db <- Dbs]. + +%% @doc clean up index files for a specific db +-spec cleanup_index_files(dbname()) -> ok. +cleanup_index_files(DbName) -> + {ok, DesignDocs} = fabric:design_docs(DbName), + + ActiveSigs = lists:map(fun(#doc{id = GroupId}) -> + {ok, Info} = fabric:get_view_group_info(DbName, GroupId), + binary_to_list(couch_util:get_value(signature, Info)) + end, [couch_doc:from_json_obj(DD) || DD <- DesignDocs]), + + FileList = filelib:wildcard([couch_config:get("couchdb", "view_index_dir"), + "/.shards/*/", couch_util:to_list(dbname(DbName)), ".[0-9]*_design/*"]), + + DeleteFiles = if ActiveSigs =:= [] -> FileList; true -> + {ok, RegExp} = re:compile([$(, string:join(ActiveSigs, "|"), $)]), + lists:filter(fun(FilePath) -> + re:run(FilePath, RegExp, [{capture, none}]) == nomatch + end, FileList) + end, + [file:delete(File) || File <- DeleteFiles], + ok. + +%% some simple type validation and transcoding + +dbname(DbName) when is_list(DbName) -> + list_to_binary(DbName); +dbname(DbName) when is_binary(DbName) -> + DbName; +dbname(#db{name=Name}) -> + Name; +dbname(DbName) -> + erlang:error({illegal_database_name, DbName}). + +name(Thing) -> + couch_util:to_binary(Thing). + +docid(DocId) when is_list(DocId) -> + list_to_binary(DocId); +docid(DocId) when is_binary(DocId) -> + DocId; +docid(DocId) -> + erlang:error({illegal_docid, DocId}). + +docs(Docs) when is_list(Docs) -> + [doc(D) || D <- Docs]; +docs(Docs) -> + erlang:error({illegal_docs_list, Docs}). + +doc(#doc{} = Doc) -> + Doc; +doc({_} = Doc) -> + couch_doc:from_json_obj(Doc); +doc(Doc) -> + erlang:error({illegal_doc_format, Doc}). + +design_doc(#doc{} = DDoc) -> + DDoc; +design_doc(DocId) when is_list(DocId) -> + design_doc(list_to_binary(DocId)); +design_doc(<<"_design/", _/binary>> = DocId) -> + DocId; +design_doc(GroupName) -> + <<"_design/", GroupName/binary>>. + +idrevs({Id, Revs}) when is_list(Revs) -> + {docid(Id), [rev(R) || R <- Revs]}. + +rev(Rev) when is_list(Rev); is_binary(Rev) -> + couch_doc:parse_rev(Rev); +rev({Seq, Hash} = Rev) when is_integer(Seq), is_binary(Hash) -> + Rev. + +%% @doc convenience method, useful when testing or calling fabric from the shell +opts(Options) -> + add_option(user_ctx, add_option(io_priority, Options)). + +add_option(Key, Options) -> + case couch_util:get_value(Key, Options) of + undefined -> + case erlang:get(Key) of + undefined -> + Options; + Value -> + [{Key, Value} | Options] + end; + _ -> + Options + end. + +default_callback(complete, Acc) -> + {ok, lists:reverse(Acc)}; +default_callback(Row, Acc) -> + {ok, [Row | Acc]}. + +is_reduce_view(_, _, _, #view_query_args{view_type=Reduce}) -> + Reduce =:= reduce. + +%% @doc convenience method for use in the shell, converts a keylist +%% to a `changes_args' record +kl_to_changes_args(KeyList) -> + kl_to_record(KeyList, changes_args). + +%% @doc convenience method for use in the shell, converts a keylist +%% to a `view_query_args' record +kl_to_query_args(KeyList) -> + kl_to_record(KeyList, view_query_args). + +%% @doc finds the index of the given Key in the record. +%% note that record_info is only known at compile time +%% so the code must be written in this way. For each new +%% record type add a case clause +lookup_index(Key,RecName) -> + Indexes = + case RecName of + changes_args -> + lists:zip(record_info(fields, changes_args), + lists:seq(2, record_info(size, changes_args))); + view_query_args -> + lists:zip(record_info(fields, view_query_args), + lists:seq(2, record_info(size, view_query_args))) + end, + couch_util:get_value(Key, Indexes). + +%% @doc convert a keylist to record with given `RecName' +%% @see lookup_index +kl_to_record(KeyList,RecName) -> + Acc0 = case RecName of + changes_args -> #changes_args{}; + view_query_args -> #view_query_args{} + end, + lists:foldl(fun({Key, Value}, Acc) -> + Index = lookup_index(couch_util:to_existing_atom(Key),RecName), + setelement(Index, Acc, Value) + end, Acc0, KeyList). |