summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBrad Anderson <brad@cloudant.com>2010-05-25 12:46:00 -0400
committerBrad Anderson <brad@cloudant.com>2010-05-25 12:46:00 -0400
commitd5fc5305705bd5a308d1c5cab8da0fba1b5d0320 (patch)
tree9922e8eaba140fd2ca873a33b1c3fe7454e52b97 /src
add fabric app, begin moving relevant parts of showroom over to it
Diffstat (limited to 'src')
-rw-r--r--src/fabric.erl6
-rw-r--r--src/fabric_api.erl262
-rw-r--r--src/fabric_create.erl141
3 files changed, 409 insertions, 0 deletions
diff --git a/src/fabric.erl b/src/fabric.erl
new file mode 100644
index 00000000..d588265f
--- /dev/null
+++ b/src/fabric.erl
@@ -0,0 +1,6 @@
+-module(fabric).
+
+-export([create_db/2]).
+
+create_db(DbName, Options) ->
+ fabric_create:create_db(DbName, Options).
diff --git a/src/fabric_api.erl b/src/fabric_api.erl
new file mode 100644
index 00000000..a26aa7de
--- /dev/null
+++ b/src/fabric_api.erl
@@ -0,0 +1,262 @@
+%% This is a raw Erlang API for CouchDB in a Cloudant cluster
+%% It makes use of clustering facilities
+
+-module(fabric_api).
+-author('adam@cloudant.com').
+-author('brad@cloudant.com').
+
+-include("../../couch/src/couch_db.hrl").
+
+-compile(export_all).
+
+-type response() :: any().
+
+%% dialyzer doesn't have recursive types, so this is necessarily wrong
+-type ejson_value() :: true | false | number() | bstring() | list().
+-type ejson() :: {[{bstring(), ejson_value()}]}.
+
+%% Database
+
+-spec db_path(bstring(), bstring()) -> bstring().
+db_path(RawUri, Customer) ->
+ showroom_db:db_path(RawUri, Customer).
+
+-spec all_databases(string()) -> {ok, [bstring()]}.
+all_databases(Customer) ->
+ showroom_db:all_databases(Customer).
+
+-spec create_db(bstring(), [any()]) -> {ok, #db{}} | {error, any()}.
+create_db(DbName, Options) ->
+ fabric:create_db(DbName, Options).
+
+-spec delete_db(bstring(), [any()]) -> ok | not_found | {error, atom()}.
+delete_db(DbName, Options) ->
+ showroom_db:delete_db(DbName, Options).
+
+-spec open_db(bstring(), [any()]) -> {ok, #db{}} | {error, any()}.
+open_db(DbName, Options) ->
+ showroom_db:open_db(DbName, Options).
+
+-spec close_db(#db{}) -> ok.
+close_db(Db) ->
+ showroom_db:close_db(Db).
+
+-spec get_db_info(#db{}, bstring()) -> {ok, [{atom(), any()}]}.
+get_db_info(Db, Customer) ->
+ showroom_db:get_db_info(Db, Customer).
+
+-spec get_committed_update_seq(#db{}) -> update_seq().
+get_committed_update_seq(_Db) ->
+%% couch_db:get_committed_update_seq(Db).
+ not_implemented.
+
+-spec get_purge_seq(#db{}) -> update_seq().
+get_purge_seq(_Db) ->
+%% couch_db:get_purge_seq(Db).
+ not_implemented.
+
+-spec get_update_seq(#db{}) -> update_seq().
+get_update_seq(_Db) ->
+%% couch_db:get_update_seq(Db).
+ not_implemented.
+
+-spec compact_db(#db{}) -> ok.
+compact_db(_Db) ->
+%% couch_db:start_compact(Db).
+ not_implemented.
+
+-spec replicate_db(ejson(), #user_ctx{}) -> {ok, ejson()}.
+replicate_db(PostBody, UserCtx) ->
+ showroom_rep:replicate(PostBody, UserCtx).
+
+-spec ensure_full_commit(#db{}) -> {ok, InstanceStartTime::bstring()}.
+ensure_full_commit(Db) ->
+ showroom_db:ensure_full_commit(Db),
+ {ok, <<"0">>}.
+
+-spec increment_update_seq(#db{}) -> {ok, update_seq()}.
+increment_update_seq(_Db) ->
+%% couch_db:increment_update_seq(Db).
+ not_implemented.
+
+-spec get_admins(#db{}) -> [any()].
+get_admins(_Db) ->
+%% couch_db:get_admins(Db).
+ not_implemented.
+
+-spec set_admins(#db{}, [any()]) -> ok.
+set_admins(_Db, _Admins) ->
+%% couch_db:set_admins(Db, Admins).
+ not_implemented.
+
+-spec get_revs_limit(#db{}) -> pos_integer().
+get_revs_limit(_Db) ->
+%% couch_db:get_revs_limit(Db).
+ not_implemented.
+
+-spec set_revs_limit(#db{}, pos_integer()) -> ok.
+set_revs_limit(_Db, _Limit) ->
+%% couch_db:set_revs_limit(Db, Limit).
+ not_implemented.
+
+
+%% Document
+
+att_receiver(Req, Length) ->
+ showroom_att:receiver(Req, Length).
+
+-spec changes_since(#db{}, main_only | all_docs, update_seq(),
+ fun(([#doc_info{}], Acc) -> {ok, Acc}), Acc) -> {ok, Acc}.
+changes_since(_Db, _Style, _StartSeq, _Fun, _Acc0) ->
+%% couch_db:changes_since(Db, Style, StartSeq, Fun, Acc0).
+ not_implemented.
+
+-spec enum_docs(#db{}, docid(), fwd | rev, fun((#full_doc_info{},
+ Offset::integer(), Acc) -> {ok|stop, Acc}), Acc) -> {ok, Acc}.
+enum_docs(_Db, _StartId, _Dir, _Fun, _Acc0) ->
+%% couch_db:enum_docs(Db, StartId, Dir, Fun, Acc0).
+ not_implemented.
+
+-spec enum_docs_reduce_to_count({any(), any()}) -> non_neg_integer().
+enum_docs_reduce_to_count(_Reductions) ->
+%% couch_db:enum_docs_reduce_to_count(Reductions).
+ not_implemented.
+
+-spec enum_docs_since(#db{}, update_seq(), fwd | rev, fun((#doc_info{},
+ Offset::integer(), Acc) -> {ok|stop, Acc}), Acc) -> {ok, Acc}.
+enum_docs_since(_Db, _StartKey, _Dir, _Fun, _Acc0) ->
+%% couch_db:enum_docs_since(Db, StartKey, Dir, Fun, Acc0).
+ not_implemented.
+
+-spec enum_docs_since_reduce_to_count({any(), any()}) -> non_neg_integer().
+enum_docs_since_reduce_to_count(_Reductions) ->
+%% couch_db:enum_docs_since_reduce_to_count(Reductions).
+ not_implemented.
+
+-spec get_doc_info(#db{}, docid()) -> {ok, #doc_info{}}.
+get_doc_info(_Db, _DocId) ->
+%% couch_db:get_doc_info(Db, DocId).
+ not_implemented.
+
+-spec get_missing_revs(#db{}, [{docid(), [revision()]}]) ->
+ {ok, [{docid(), [revision()]}]}.
+get_missing_revs(Db, IdsRevs) ->
+ showroom_doc:get_missing_revs(Db, IdsRevs).
+
+open_doc(Db, DocId) ->
+ open_doc(Db, DocId, nil, []).
+
+-spec open_doc(#db{}, docid(), [any()]) -> {ok, #doc{}} | {not_found, deleted |
+ missing}.
+open_doc(Db, DocId, Options) ->
+ open_doc(Db, DocId, nil, Options).
+
+open_doc(Db, DocId, Revs, Options) ->
+ showroom_doc:open_doc(Db, DocId, Revs, Options).
+
+-spec open_doc_revs(#db{}, docid(), [revision()], [any()]) -> {ok, [{ok, #doc{}}
+ | {{not_found, deleted | missing}, revision()}]}.
+open_doc_revs(Db, DocId, Revs, Options) ->
+ open_doc(Db, DocId, Revs, Options).
+
+-spec purge_docs(#db{}, [{docid(), [revision()]}]) ->
+ {ok, update_seq(), [{docid(),[revision()]} | {error,purge_during_compaction}]}.
+purge_docs(_Db, _IdsRevs) ->
+%% couch_db:purge_docs(Db, IdsRevs).
+ not_implemented.
+
+update_doc(Db, Doc) ->
+ update_doc(Db, Doc, []).
+
+-spec update_doc(#db{}, #doc{}, [any()]) -> {ok, revision()}.
+update_doc(Db, Doc, Options) ->
+ showroom_doc:update_doc(Db, Doc, Options).
+
+update_docs(Db, Docs, Options) ->
+ update_docs(Db, Docs, Options, interactive_edit).
+
+-spec update_docs(#db{}, [#doc{}], [any()], interactive_edit |
+ replicated_changes) -> {ok, [{ok, revision()}]}.
+update_docs(Db, Docs, Options, Type) ->
+ showroom_doc:update_docs(Db, Docs, Options, Type).
+
+
+%% View
+
+-spec all_docs_view(response(), #db{}, nil | list(), #view_query_args{}) ->
+ {ok, any()}.
+all_docs_view(Resp, Db, Keys, QueryArgs) ->
+ showroom_view:all_docs(Resp, Db, Keys, QueryArgs).
+
+-spec compact_view_group(#db{}, bstring()) -> ok.
+compact_view_group(_Db, _DesignId) ->
+%% couch_view_compactor:start_compact(Db, DesignId).
+ not_implemented.
+
+-spec cleanup_view_index_files(#db{}) -> any().
+cleanup_view_index_files(_Db) ->
+%% couch_view:cleanup_index_files(Db).
+ not_implemented.
+
+-spec design_view(response(), #db{}, bstring(), bstring(), nil | list(),
+ #view_query_args{}) -> any().
+design_view(Resp, Db, Id, Name, Keys, QueryArgs) ->
+ showroom_view:design(Resp, Db, Id, Name, Keys, QueryArgs).
+
+list_view(Req, Db, DesignId, ViewName, Keys, QueryArgs, QueryServer) ->
+ showroom_view:list(Req, Db, DesignId, ViewName, Keys, QueryArgs, QueryServer).
+
+-spec extract_map_view({reduce, any(), bstring(), #view{}}) -> {ok, #view{}}.
+extract_map_view(_ReduceView) ->
+%% couch_view:extract_map_view(ReduceView).
+ not_implemented.
+
+-spec get_map_view(#db{}, bstring(), bstring(), true | false) -> any().
+get_map_view(_Db, _DesignId, _ViewName, _Stale) ->
+%% couch_view:get_map_view(Db, DesignId, ViewName, Stale).
+ not_implemented.
+
+-spec get_reduce_view(#db{}, bstring(), bstring(), true | false) -> any().
+get_reduce_view(_Db, _DesignId, _ViewName, _Stale) ->
+%% couch_view:get_reduce_view(Db, DesignId, ViewName, Stale).
+ not_implemented.
+
+-spec get_row_count(#view{}) -> {ok, non_neg_integer()}.
+get_row_count(_View) ->
+%% couch_view:get_row_count(View).
+ not_implemented.
+
+-spec get_temp_map_view(#db{}, bstring(), [any()], bstring()) -> {ok, #view{},
+ #group{}}.
+get_temp_map_view(_Db, _Language, _DesignOptions, _MapSrc) ->
+%% couch_view:get_temp_map_view(Db, Language, DesignOptions, MapSrc).
+ not_implemented.
+
+-spec get_temp_reduce_view(#db{}, bstring(), [any()], bstring(), bstring()) ->
+ {ok, {temp_reduce, #view{}}, #group{}}.
+get_temp_reduce_view(_Db, _Language, _DesignOptions, _MapSrc, _RedSrc) ->
+%% couch_view:get_temp_reduce_view(Db, Language, DesignOptions, MapSrc, RedSrc).
+ not_implemented.
+
+-spec get_view_group_info(#db{}, bstring()) -> {ok, [{atom(), any()}]}.
+get_view_group_info(Db, DesignId) ->
+ showroom_view:group_info(Db, DesignId).
+
+-spec reduce_to_count({any(), any()}) -> non_neg_integer().
+reduce_to_count(_Reductions) ->
+%% couch_view:reduce_to_count(Reductions).
+ not_implemented.
+
+-spec view_fold(#view{}, any(), fwd | rev, fun(({{Key::any(), docid()},
+ Value::any()}, OffsetReds::any(), Acc) -> {ok|stop, Acc}), Acc) -> {ok, Acc}.
+view_fold(_View, _Start, _Dir, _Fun, _Acc0) ->
+%% couch_view:fold(View, Start, Dir, Fun, Acc0).
+ not_implemented.
+
+-spec view_fold_reduce({reduce | temp_reduce, #view{}}, fwd | rev, any(), any(),
+ fun(({Key1::any(), any()}, {Key2::any(), any()}) -> boolean()),
+ fun(({Key::any(), Reduction::any(), Acc}) -> {ok|stop, Acc}), Acc) ->
+ {ok, Acc}.
+view_fold_reduce(_View, _Dir, _Start, _End, _GroupFun, _ResponseFun, _Acc0) ->
+%% couch_view:fold_reduce(View, Dir, Start, End, GroupFun, ResponseFun, Acc0).
+ not_implemented.
diff --git a/src/fabric_create.erl b/src/fabric_create.erl
new file mode 100644
index 00000000..b47cd768
--- /dev/null
+++ b/src/fabric_create.erl
@@ -0,0 +1,141 @@
+-module(fabric_create).
+-author('Brad Anderson <brad@cloudant.com>').
+
+-include("../../couch/src/couch_db.hrl").
+
+%% api
+-export([create_db/2]).
+
+-type part() :: integer().
+-type ref_node_part() :: {reference(), node(), part()}.
+-type tref() :: reference().
+-type np() :: {node(), part()}.
+-type np_acc() :: [{np(), any()}].
+
+
+%% =====================
+%% api
+%% =====================
+
+%% @doc Create a new database, and all its partition files across the cluster
+%% Options is proplist with user_ctx, n, q
+-spec create_db(binary(), list()) -> {ok, #db{}} | {error, any()}.
+create_db(DbName, Options) ->
+ RefNodePart = send_create_calls(DbName, Options),
+ {ok, Results} = create_db_loop(RefNodePart),
+ case create_results(Results, RefNodePart) of
+ ok -> {ok, #db{name=DbName}};
+ Other -> {error, Other}
+ end.
+
+
+%% =====================
+%% internal
+%% =====================
+
+%% @doc create the partitions on all appropriate nodes (rexi calls)
+-spec send_create_calls(binary(), list()) -> [{reference(), np()}].
+send_create_calls(DbName, Options) ->
+ Fullmap = partitions:fullmap(DbName, Options),
+ lists:map(fun({Node, Part}) ->
+ ShardName = showroom_utils:shard_name(Part, DbName),
+ Ref = rexi:async_server_call({couch_server, Node},
+ {create, ShardName, Options}),
+ {Ref, {Node, Part}}
+ end, Fullmap).
+
+%% @doc set up the receive loop with an overall timeout
+-spec create_db_loop([ref_node_part()]) -> {ok, np_acc()}.
+create_db_loop(RefNodePart) ->
+ TimeoutRef = erlang:make_ref(),
+ {ok, TRef} = timer:send_after(5000, {timeout, TimeoutRef}),
+ Results = create_db_loop(RefNodePart, TimeoutRef, []),
+ timer:cancel(TRef),
+ Results.
+
+%% @doc create_db receive loop
+-spec create_db_loop([ref_node_part()], tref(), np_acc()) ->
+ np_acc() | {ok, np_acc()}.
+create_db_loop(_,_,{ok, Acc}) -> {ok, Acc};
+create_db_loop(RefNodePart, TimeoutRef, AccIn) ->
+ receive
+ {Ref, {ok, MainPid}} when is_reference(Ref) ->
+ % for dev only, close the Fd
+ gen_server:call({couch_server, node(MainPid)}, {force_close, MainPid}),
+
+ AccOut = check_all_parts(Ref, RefNodePart, AccIn, ok),
+ create_db_loop(RefNodePart, TimeoutRef, AccOut);
+ {Ref, Reply} when is_reference(Ref) ->
+ AccOut = check_all_parts(Ref, RefNodePart, AccIn, Reply),
+ create_db_loop(RefNodePart, TimeoutRef, AccOut);
+ {timeout, TimeoutRef} ->
+ {error, timeout}
+ end.
+
+-spec create_results(np_acc(), [ref_node_part()]) -> ok | create_quorum_error.
+create_results(Results, RefNodePart) ->
+ NPs = create_result(Results, []),
+ DistinctNPs = distinct_parts(RefNodePart),
+ if
+ NPs =:= DistinctNPs -> ok;
+ true -> create_quorum_error
+ end.
+
+-spec create_result(np_acc(), [np()]) -> [np()] | file_exists.
+create_result([], Acc) ->
+ Acc;
+create_result([{NP, ok}|Rest], Acc) ->
+ create_result(Rest, [NP|Acc]);
+create_result([{_NP, {error, file_exists}}|_Rest], _Acc) ->
+ {error, file_exists}; % if any replies were file_exists, return that
+create_result([{{_N,_P}, Result}|Rest], Acc) ->
+ showroom_log:message(error, "create_db error: ~p", [Result]),
+ create_result(Rest, Acc).
+
+check_all_parts(Ref, RefNodePart, Acc, Reply) ->
+ case couch_util:get_value(Ref, RefNodePart) of
+ {Node, Part} ->
+ case lists:keyfind(1, {Node, Part}, Acc) of
+ true -> Acc; % already present... that's odd
+ _ ->
+ NewAcc = [{{Node, Part}, Reply} | Acc],
+ case length(NewAcc) >= length(RefNodePart) of
+ true -> {ok, NewAcc};
+ _ -> NewAcc
+ end
+ end;
+ _ -> Acc % ignore a non-matching Ref
+ end.
+
+%% @doc check that we have a good reply from each partition.
+%% If we do, return {ok, Acc}, if we don't, return Acc of partitions
+%% Three 'case' statements and one 'if', a personal best. fml
+%% @end
+% check_distinct_parts(Ref, RefNodePart, Acc, Msg) ->
+% Parts = distinct_parts(RefNodePart),
+% case couch_util:get_value(Ref, RefNodePart) of
+% {Node, Part} ->
+% case lists:member(Part, Acc) of
+% true -> Acc;
+% _ ->
+% case Msg of
+% ok ->
+% NewAcc = lists:usort([Part|Acc]),
+% if
+% Parts =:= NewAcc -> {ok, NewAcc};
+% true -> NewAcc
+% end;
+% _ ->
+% Hex = showroom_utils:int_to_hexstr(Part),
+% showroom_log:message(error,
+% "create_db reply error: ~p from ~p ~p", [Msg, Node, Hex]),
+% Acc
+% end
+% end;
+% _ -> Acc % ignore a non-matching Ref
+% end.
+
+distinct_parts(RefNodePart) ->
+ {_Refs, NPs} = lists:unzip(RefNodePart),
+ {_Nodes, Parts} = lists:unzip(NPs),
+ lists:usort(Parts).