summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--apps/fabric/ebin/fabric.app28
-rw-r--r--apps/fabric/ebin/fabric.appup3
-rw-r--r--apps/fabric/include/fabric.hrl22
-rw-r--r--apps/fabric/src/fabric.erl225
-rw-r--r--apps/fabric/src/fabric_db_create.erl65
-rw-r--r--apps/fabric/src/fabric_db_delete.erl41
-rw-r--r--apps/fabric/src/fabric_db_doc_count.erl32
-rw-r--r--apps/fabric/src/fabric_db_info.erl52
-rw-r--r--apps/fabric/src/fabric_db_meta.erl35
-rw-r--r--apps/fabric/src/fabric_dict.erl37
-rw-r--r--apps/fabric/src/fabric_doc_attachments.erl102
-rw-r--r--apps/fabric/src/fabric_doc_missing_revs.erl64
-rw-r--r--apps/fabric/src/fabric_doc_open.erl66
-rw-r--r--apps/fabric/src/fabric_doc_open_revs.erl65
-rw-r--r--apps/fabric/src/fabric_doc_update.erl127
-rw-r--r--apps/fabric/src/fabric_group_info.erl52
-rw-r--r--apps/fabric/src/fabric_rpc.erl388
-rw-r--r--apps/fabric/src/fabric_util.erl89
-rw-r--r--apps/fabric/src/fabric_view.erl218
-rw-r--r--apps/fabric/src/fabric_view_all_docs.erl167
-rw-r--r--apps/fabric/src/fabric_view_changes.erl251
-rw-r--r--apps/fabric/src/fabric_view_map.erl138
-rw-r--r--apps/fabric/src/fabric_view_reduce.erl85
23 files changed, 2352 insertions, 0 deletions
diff --git a/apps/fabric/ebin/fabric.app b/apps/fabric/ebin/fabric.app
new file mode 100644
index 00000000..8a565d8a
--- /dev/null
+++ b/apps/fabric/ebin/fabric.app
@@ -0,0 +1,28 @@
+{application, fabric, [
+ {description, "Routing and proxying layer for CouchDB cluster"},
+ {vsn, "1.0.3"},
+ {modules, [
+ fabric,
+ fabric_db_create,
+ fabric_db_delete,
+ fabric_db_doc_count,
+ fabric_db_info,
+ fabric_db_meta,
+ fabric_dict,
+ fabric_doc_attachments,
+ fabric_doc_missing_revs,
+ fabric_doc_open,
+ fabric_doc_open_revs,
+ fabric_doc_update,
+ fabric_group_info,
+ fabric_rpc,
+ fabric_util,
+ fabric_view,
+ fabric_view_all_docs,
+ fabric_view_changes,
+ fabric_view_map,
+ fabric_view_reduce
+ ]},
+ {registered, []},
+ {applications, [kernel, stdlib, couch, rexi, mem3]}
+]}.
diff --git a/apps/fabric/ebin/fabric.appup b/apps/fabric/ebin/fabric.appup
new file mode 100644
index 00000000..ef5dc496
--- /dev/null
+++ b/apps/fabric/ebin/fabric.appup
@@ -0,0 +1,3 @@
+{"1.0.3",[{"1.0.2",[
+ {load_module, fabric_view_changes}
+]}],[{"1.0.2",[]}]}.
diff --git a/apps/fabric/include/fabric.hrl b/apps/fabric/include/fabric.hrl
new file mode 100644
index 00000000..6ec17b34
--- /dev/null
+++ b/apps/fabric/include/fabric.hrl
@@ -0,0 +1,22 @@
+-include_lib("eunit/include/eunit.hrl").
+
+-record(collector, {
+ query_args,
+ callback,
+ counters,
+ buffer_size,
+ blocked = [],
+ total_rows = 0,
+ offset = 0,
+ rows = [],
+ skip,
+ limit,
+ keys,
+ os_proc,
+ reducer,
+ lang,
+ sorted,
+ user_acc
+}).
+
+-record(view_row, {key, id, value, doc, worker}).
diff --git a/apps/fabric/src/fabric.erl b/apps/fabric/src/fabric.erl
new file mode 100644
index 00000000..1be97a98
--- /dev/null
+++ b/apps/fabric/src/fabric.erl
@@ -0,0 +1,225 @@
+-module(fabric).
+
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+% DBs
+-export([all_dbs/0, all_dbs/1, create_db/1, create_db/2, delete_db/1,
+ delete_db/2, get_db_info/1, get_doc_count/1, set_revs_limit/3,
+ set_security/3, get_revs_limit/1, get_security/1]).
+
+% Documents
+-export([open_doc/3, open_revs/4, get_missing_revs/2, update_doc/3,
+ update_docs/3, att_receiver/2]).
+
+% Views
+-export([all_docs/4, changes/4, query_view/3, query_view/4, query_view/6,
+ get_view_group_info/2]).
+
+% miscellany
+-export([design_docs/1, reset_validation_funs/1]).
+
+-include("fabric.hrl").
+
+% db operations
+
+all_dbs() ->
+ all_dbs(<<>>).
+
+all_dbs(Prefix) when is_list(Prefix) ->
+ all_dbs(list_to_binary(Prefix));
+all_dbs(Prefix) when is_binary(Prefix) ->
+ Length = byte_size(Prefix),
+ MatchingDbs = ets:foldl(fun(#shard{dbname=DbName}, Acc) ->
+ case DbName of
+ <<Prefix:Length/binary, _/binary>> ->
+ [DbName | Acc];
+ _ ->
+ Acc
+ end
+ end, [], partitions),
+ {ok, lists:usort(MatchingDbs)}.
+
+get_db_info(DbName) ->
+ fabric_db_info:go(dbname(DbName)).
+
+get_doc_count(DbName) ->
+ fabric_db_doc_count:go(dbname(DbName)).
+
+create_db(DbName) ->
+ create_db(DbName, []).
+
+create_db(DbName, Options) ->
+ fabric_db_create:go(dbname(DbName), opts(Options)).
+
+delete_db(DbName) ->
+ delete_db(DbName, []).
+
+delete_db(DbName, Options) ->
+ fabric_db_delete:go(dbname(DbName), opts(Options)).
+
+set_revs_limit(DbName, Limit, Options) when is_integer(Limit), Limit > 0 ->
+ fabric_db_meta:set_revs_limit(dbname(DbName), Limit, opts(Options)).
+
+get_revs_limit(DbName) ->
+ {ok, Db} = fabric_util:get_db(dbname(DbName)),
+ try couch_db:get_revs_limit(Db) after catch couch_db:close(Db) end.
+
+set_security(DbName, SecObj, Options) ->
+ fabric_db_meta:set_security(dbname(DbName), SecObj, opts(Options)).
+
+get_security(DbName) ->
+ {ok, Db} = fabric_util:get_db(dbname(DbName)),
+ try couch_db:get_security(Db) after catch couch_db:close(Db) end.
+
+% doc operations
+open_doc(DbName, Id, Options) ->
+ fabric_doc_open:go(dbname(DbName), docid(Id), opts(Options)).
+
+open_revs(DbName, Id, Revs, Options) ->
+ fabric_doc_open_revs:go(dbname(DbName), docid(Id), Revs, opts(Options)).
+
+get_missing_revs(DbName, IdsRevs) when is_list(IdsRevs) ->
+ Sanitized = [idrevs(IdR) || IdR <- IdsRevs],
+ fabric_doc_missing_revs:go(dbname(DbName), Sanitized).
+
+update_doc(DbName, Doc, Options) ->
+ case update_docs(DbName, [Doc], opts(Options)) of
+ {ok, [{ok, NewRev}]} ->
+ {ok, NewRev};
+ {ok, [Error]} ->
+ throw(Error);
+ {ok, []} ->
+ % replication success
+ #doc{revs = {Pos, [RevId | _]}} = doc(Doc),
+ {ok, {Pos, RevId}}
+ end.
+
+update_docs(DbName, Docs, Options) ->
+ try fabric_doc_update:go(dbname(DbName), docs(Docs), opts(Options))
+ catch {aborted, PreCommitFailures} ->
+ {aborted, PreCommitFailures}
+ end.
+
+att_receiver(Req, Length) ->
+ fabric_doc_attachments:receiver(Req, Length).
+
+all_docs(DbName, Callback, Acc0, #view_query_args{} = QueryArgs) when
+ is_function(Callback, 2) ->
+ fabric_view_all_docs:go(dbname(DbName), QueryArgs, Callback, Acc0).
+
+changes(DbName, Callback, Acc0, Options) ->
+ % TODO use a keylist for Options instead of #changes_args, BugzID 10281
+ Feed = Options#changes_args.feed,
+ fabric_view_changes:go(dbname(DbName), Feed, Options, Callback, Acc0).
+
+query_view(DbName, DesignName, ViewName) ->
+ query_view(DbName, DesignName, ViewName, #view_query_args{}).
+
+query_view(DbName, DesignName, ViewName, QueryArgs) ->
+ Callback = fun default_callback/2,
+ query_view(DbName, DesignName, ViewName, Callback, [], QueryArgs).
+
+query_view(DbName, Design, ViewName, Callback, Acc0, QueryArgs) ->
+ Db = dbname(DbName), View = name(ViewName),
+ case is_reduce_view(Db, Design, View, QueryArgs) of
+ true ->
+ Mod = fabric_view_reduce;
+ false ->
+ Mod = fabric_view_map
+ end,
+ Mod:go(Db, Design, View, QueryArgs, Callback, Acc0).
+
+get_view_group_info(DbName, DesignId) ->
+ fabric_group_info:go(dbname(DbName), design_doc(DesignId)).
+
+design_docs(DbName) ->
+ QueryArgs = #view_query_args{start_key = <<"_design/">>, include_docs=true},
+ Callback = fun({total_and_offset, _, _}, []) ->
+ {ok, []};
+ ({row, {Props}}, Acc) ->
+ case couch_util:get_value(id, Props) of
+ <<"_design/", _/binary>> ->
+ {ok, [couch_util:get_value(doc, Props) | Acc]};
+ _ ->
+ {stop, Acc}
+ end;
+ (complete, Acc) ->
+ {ok, lists:reverse(Acc)}
+ end,
+ fabric:all_docs(dbname(DbName), Callback, [], QueryArgs).
+
+reset_validation_funs(DbName) ->
+ [rexi:cast(Node, {fabric_rpc, reset_validation_funs, [Name]}) ||
+ #shard{node=Node, name=Name} <- mem3:shards(DbName)].
+
+%% some simple type validation and transcoding
+
+dbname(DbName) when is_list(DbName) ->
+ list_to_binary(DbName);
+dbname(DbName) when is_binary(DbName) ->
+ DbName;
+dbname(#db{name=Name}) ->
+ Name;
+dbname(DbName) ->
+ erlang:error({illegal_database_name, DbName}).
+
+name(Thing) ->
+ couch_util:to_binary(Thing).
+
+docid(DocId) when is_list(DocId) ->
+ list_to_binary(DocId);
+docid(DocId) when is_binary(DocId) ->
+ DocId;
+docid(DocId) ->
+ erlang:error({illegal_docid, DocId}).
+
+docs(Docs) when is_list(Docs) ->
+ [doc(D) || D <- Docs];
+docs(Docs) ->
+ erlang:error({illegal_docs_list, Docs}).
+
+doc(#doc{} = Doc) ->
+ Doc;
+doc({_} = Doc) ->
+ couch_doc:from_json_obj(Doc);
+doc(Doc) ->
+ erlang:error({illegal_doc_format, Doc}).
+
+design_doc(#doc{} = DDoc) ->
+ DDoc;
+design_doc(DocId) when is_list(DocId) ->
+ design_doc(list_to_binary(DocId));
+design_doc(<<"_design/", _/binary>> = DocId) ->
+ DocId;
+design_doc(GroupName) ->
+ <<"_design/", GroupName/binary>>.
+
+idrevs({Id, Revs}) when is_list(Revs) ->
+ {docid(Id), [rev(R) || R <- Revs]}.
+
+rev(Rev) when is_list(Rev); is_binary(Rev) ->
+ couch_doc:parse_rev(Rev);
+rev({Seq, Hash} = Rev) when is_integer(Seq), is_binary(Hash) ->
+ Rev.
+
+opts(Options) ->
+ case couch_util:get_value(user_ctx, Options) of
+ undefined ->
+ case erlang:get(user_ctx) of
+ #user_ctx{} = Ctx ->
+ [{user_ctx, Ctx} | Options];
+ _ ->
+ Options
+ end;
+ _ ->
+ Options
+ end.
+
+default_callback(complete, Acc) ->
+ {ok, lists:reverse(Acc)};
+default_callback(Row, Acc) ->
+ {ok, [Row | Acc]}.
+
+is_reduce_view(_, _, _, #view_query_args{view_type=Reduce}) ->
+ Reduce =:= reduce.
diff --git a/apps/fabric/src/fabric_db_create.erl b/apps/fabric/src/fabric_db_create.erl
new file mode 100644
index 00000000..d10bcc22
--- /dev/null
+++ b/apps/fabric/src/fabric_db_create.erl
@@ -0,0 +1,65 @@
+-module(fabric_db_create).
+-export([go/2]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-define(DBNAME_REGEX, "^[a-z][a-z0-9\\_\\$()\\+\\-\\/\\s.]*$").
+
+%% @doc Create a new database, and all its partition files across the cluster
+%% Options is proplist with user_ctx, n, q
+go(DbName, Options) ->
+ case re:run(DbName, ?DBNAME_REGEX, [{capture,none}]) of
+ match ->
+ Shards = mem3:choose_shards(DbName, Options),
+ Doc = make_document(Shards),
+ Workers = fabric_util:submit_jobs(Shards, create_db, [Options, Doc]),
+ Acc0 = fabric_dict:init(Workers, nil),
+ case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
+ {ok, _} ->
+ ok;
+ Else ->
+ Else
+ end;
+ nomatch ->
+ {error, illegal_database_name}
+ end.
+
+handle_message(Msg, Shard, Counters) ->
+ C1 = fabric_dict:store(Shard, Msg, Counters),
+ case fabric_dict:any(nil, C1) of
+ true ->
+ {ok, C1};
+ false ->
+ final_answer(C1)
+ end.
+
+make_document([#shard{dbname=DbName}|_] = Shards) ->
+ {RawOut, ByNodeOut, ByRangeOut} =
+ lists:foldl(fun(#shard{node=N, range=[B,E]}, {Raw, ByNode, ByRange}) ->
+ Range = ?l2b([couch_util:to_hex(<<B:32/integer>>), "-",
+ couch_util:to_hex(<<E:32/integer>>)]),
+ Node = couch_util:to_binary(N),
+ {[[<<"add">>, Range, Node] | Raw], orddict:append(Node, Range, ByNode),
+ orddict:append(Range, Node, ByRange)}
+ end, {[], [], []}, Shards),
+ #doc{id=DbName, body = {[
+ {<<"changelog">>, lists:sort(RawOut)},
+ {<<"by_node">>, {[{K,lists:sort(V)} || {K,V} <- ByNodeOut]}},
+ {<<"by_range">>, {[{K,lists:sort(V)} || {K,V} <- ByRangeOut]}}
+ ]}}.
+
+final_answer(Counters) ->
+ Successes = [X || {_, M} = X <- Counters, M == ok orelse M == file_exists],
+ case fabric_view:is_progress_possible(Successes) of
+ true ->
+ case lists:keymember(file_exists, 2, Successes) of
+ true ->
+ {error, file_exists};
+ false ->
+ {stop, ok}
+ end;
+ false ->
+ {error, internal_server_error}
+ end.
diff --git a/apps/fabric/src/fabric_db_delete.erl b/apps/fabric/src/fabric_db_delete.erl
new file mode 100644
index 00000000..57eefa9e
--- /dev/null
+++ b/apps/fabric/src/fabric_db_delete.erl
@@ -0,0 +1,41 @@
+-module(fabric_db_delete).
+-export([go/2]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+go(DbName, Options) ->
+ Shards = mem3:shards(DbName),
+ Workers = fabric_util:submit_jobs(Shards, delete_db, [Options, DbName]),
+ Acc0 = fabric_dict:init(Workers, nil),
+ case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
+ {ok, ok} ->
+ ok;
+ {ok, not_found} ->
+ erlang:error(database_does_not_exist);
+ Error ->
+ Error
+ end.
+
+handle_message(Msg, Shard, Counters) ->
+ C1 = fabric_dict:store(Shard, Msg, Counters),
+ case fabric_dict:any(nil, C1) of
+ true ->
+ {ok, C1};
+ false ->
+ final_answer(C1)
+ end.
+
+final_answer(Counters) ->
+ Successes = [X || {_, M} = X <- Counters, M == ok orelse M == not_found],
+ case fabric_view:is_progress_possible(Successes) of
+ true ->
+ case lists:keymember(ok, 2, Successes) of
+ true ->
+ {stop, ok};
+ false ->
+ {stop, not_found}
+ end;
+ false ->
+ {error, internal_server_error}
+ end.
diff --git a/apps/fabric/src/fabric_db_doc_count.erl b/apps/fabric/src/fabric_db_doc_count.erl
new file mode 100644
index 00000000..12d5cbf8
--- /dev/null
+++ b/apps/fabric/src/fabric_db_doc_count.erl
@@ -0,0 +1,32 @@
+-module(fabric_db_doc_count).
+
+-export([go/1]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+go(DbName) ->
+ Shards = mem3:shards(DbName),
+ Workers = fabric_util:submit_jobs(Shards, get_doc_count, []),
+ Acc0 = {fabric_dict:init(Workers, nil), 0},
+ fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0).
+
+handle_message({ok, Count}, Shard, {Counters, Acc}) ->
+ case fabric_dict:lookup_element(Shard, Counters) of
+ undefined ->
+ % already heard from someone else in this range
+ {ok, {Counters, Acc}};
+ nil ->
+ C1 = fabric_dict:store(Shard, ok, Counters),
+ C2 = fabric_view:remove_overlapping_shards(Shard, C1),
+ case fabric_dict:any(nil, C2) of
+ true ->
+ {ok, {C2, Count+Acc}};
+ false ->
+ {stop, Count+Acc}
+ end
+ end;
+handle_message(_, _, Acc) ->
+ {ok, Acc}.
+
diff --git a/apps/fabric/src/fabric_db_info.erl b/apps/fabric/src/fabric_db_info.erl
new file mode 100644
index 00000000..3758c5c3
--- /dev/null
+++ b/apps/fabric/src/fabric_db_info.erl
@@ -0,0 +1,52 @@
+-module(fabric_db_info).
+
+-export([go/1]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+go(DbName) ->
+ Shards = mem3:shards(DbName),
+ Workers = fabric_util:submit_jobs(Shards, get_db_info, []),
+ Acc0 = {fabric_dict:init(Workers, nil), []},
+ fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0).
+
+handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, Acc}) ->
+ case fabric_dict:lookup_element(Shard, Counters) of
+ undefined ->
+ % already heard from someone else in this range
+ {ok, {Counters, Acc}};
+ nil ->
+ C1 = fabric_dict:store(Shard, ok, Counters),
+ C2 = fabric_view:remove_overlapping_shards(Shard, C1),
+ case fabric_dict:any(nil, C2) of
+ true ->
+ {ok, {C2, [Info|Acc]}};
+ false ->
+ {stop, [{db_name,Name}|merge_results(lists:flatten([Info|Acc]))]}
+ end
+ end;
+handle_message(_, _, Acc) ->
+ {ok, Acc}.
+
+merge_results(Info) ->
+ Dict = lists:foldl(fun({K,V},D0) -> orddict:append(K,V,D0) end,
+ orddict:new(), Info),
+ orddict:fold(fun
+ (doc_count, X, Acc) ->
+ [{doc_count, lists:sum(X)} | Acc];
+ (doc_del_count, X, Acc) ->
+ [{doc_del_count, lists:sum(X)} | Acc];
+ (update_seq, X, Acc) ->
+ [{update_seq, lists:sum(X)} | Acc];
+ (purge_seq, X, Acc) ->
+ [{purge_seq, lists:sum(X)} | Acc];
+ (compact_running, X, Acc) ->
+ [{compact_running, lists:member(true, X)} | Acc];
+ (disk_size, X, Acc) ->
+ [{disk_size, lists:sum(X)} | Acc];
+ (disk_format_version, X, Acc) ->
+ [{disk_format_version, lists:max(X)} | Acc];
+ (_, _, Acc) ->
+ Acc
+ end, [{instance_start_time, <<"0">>}], Dict).
diff --git a/apps/fabric/src/fabric_db_meta.erl b/apps/fabric/src/fabric_db_meta.erl
new file mode 100644
index 00000000..ee15fc72
--- /dev/null
+++ b/apps/fabric/src/fabric_db_meta.erl
@@ -0,0 +1,35 @@
+-module(fabric_db_meta).
+
+-export([set_revs_limit/3, set_security/3]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+set_revs_limit(DbName, Limit, Options) ->
+ Shards = mem3:shards(DbName),
+ Workers = fabric_util:submit_jobs(Shards, set_revs_limit, [Limit, Options]),
+ Waiting = length(Workers) - 1,
+ case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Waiting) of
+ {ok, ok} ->
+ ok;
+ Error ->
+ Error
+ end.
+
+set_security(DbName, SecObj, Options) ->
+ Shards = mem3:shards(DbName),
+ Workers = fabric_util:submit_jobs(Shards, set_security, [SecObj, Options]),
+ Waiting = length(Workers) - 1,
+ case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Waiting) of
+ {ok, ok} ->
+ ok;
+ Error ->
+ Error
+ end.
+
+handle_message(ok, _, 0) ->
+ {stop, ok};
+handle_message(ok, _, Waiting) ->
+ {ok, Waiting - 1};
+handle_message(Error, _, _Waiting) ->
+ {error, Error}. \ No newline at end of file
diff --git a/apps/fabric/src/fabric_dict.erl b/apps/fabric/src/fabric_dict.erl
new file mode 100644
index 00000000..42d46b34
--- /dev/null
+++ b/apps/fabric/src/fabric_dict.erl
@@ -0,0 +1,37 @@
+-module(fabric_dict).
+-compile(export_all).
+
+% Instead of ets, let's use an ordered keylist. We'll need to revisit if we
+% have >> 100 shards, so a private interface is a good idea. - APK June 2010
+
+init(Keys, InitialValue) ->
+ orddict:from_list([{Key, InitialValue} || Key <- Keys]).
+
+
+decrement_all(Dict) ->
+ [{K,V-1} || {K,V} <- Dict].
+
+store(Key, Value, Dict) ->
+ orddict:store(Key, Value, Dict).
+
+erase(Key, Dict) ->
+ orddict:erase(Key, Dict).
+
+update_counter(Key, Incr, Dict0) ->
+ orddict:update_counter(Key, Incr, Dict0).
+
+
+lookup_element(Key, Dict) ->
+ couch_util:get_value(Key, Dict).
+
+size(Dict) ->
+ orddict:size(Dict).
+
+any(Value, Dict) ->
+ lists:keymember(Value, 2, Dict).
+
+filter(Fun, Dict) ->
+ orddict:filter(Fun, Dict).
+
+fold(Fun, Acc0, Dict) ->
+ orddict:fold(Fun, Acc0, Dict).
diff --git a/apps/fabric/src/fabric_doc_attachments.erl b/apps/fabric/src/fabric_doc_attachments.erl
new file mode 100644
index 00000000..aecdaaef
--- /dev/null
+++ b/apps/fabric/src/fabric_doc_attachments.erl
@@ -0,0 +1,102 @@
+-module(fabric_doc_attachments).
+
+-include("fabric.hrl").
+
+%% couch api calls
+-export([receiver/2]).
+
+receiver(_Req, undefined) ->
+ <<"">>;
+receiver(_Req, {unknown_transfer_encoding, Unknown}) ->
+ exit({unknown_transfer_encoding, Unknown});
+receiver(Req, chunked) ->
+ MiddleMan = spawn(fun() -> middleman(Req, chunked) end),
+ fun(4096, ChunkFun, ok) ->
+ write_chunks(MiddleMan, ChunkFun)
+ end;
+receiver(_Req, 0) ->
+ <<"">>;
+receiver(Req, Length) when is_integer(Length) ->
+ Middleman = spawn(fun() -> middleman(Req, Length) end),
+ fun() ->
+ Middleman ! {self(), gimme_data},
+ receive {Middleman, Data} -> Data end
+ end;
+receiver(_Req, Length) ->
+ exit({length_not_integer, Length}).
+
+%%
+%% internal
+%%
+
+write_chunks(MiddleMan, ChunkFun) ->
+ MiddleMan ! {self(), gimme_data},
+ receive
+ {MiddleMan, {0, _Footers}} ->
+ % MiddleMan ! {self(), done},
+ ok;
+ {MiddleMan, ChunkRecord} ->
+ ChunkFun(ChunkRecord, ok),
+ write_chunks(MiddleMan, ChunkFun)
+ end.
+
+receive_unchunked_attachment(_Req, 0) ->
+ ok;
+receive_unchunked_attachment(Req, Length) ->
+ receive {MiddleMan, go} ->
+ Data = couch_httpd:recv(Req, 0),
+ MiddleMan ! {self(), Data}
+ end,
+ receive_unchunked_attachment(Req, Length - size(Data)).
+
+middleman(Req, chunked) ->
+ % spawn a process to actually receive the uploaded data
+ RcvFun = fun(ChunkRecord, ok) ->
+ receive {From, go} -> From ! {self(), ChunkRecord} end, ok
+ end,
+ Receiver = spawn(fun() -> couch_httpd:recv_chunked(Req,4096,RcvFun,ok) end),
+
+ % take requests from the DB writers and get data from the receiver
+ N = erlang:list_to_integer(couch_config:get("cluster","n")),
+ middleman_loop(Receiver, N, dict:new(), 0, []);
+
+middleman(Req, Length) ->
+ Receiver = spawn(fun() -> receive_unchunked_attachment(Req, Length) end),
+ N = erlang:list_to_integer(couch_config:get("cluster","n")),
+ middleman_loop(Receiver, N, dict:new(), 0, []).
+
+middleman_loop(Receiver, N, Counters, Offset, ChunkList) ->
+ receive {From, gimme_data} ->
+ % figure out how far along this writer (From) is in the list
+ {NewCounters, WhichChunk} = case dict:find(From, Counters) of
+ {ok, I} ->
+ {dict:update_counter(From, 1, Counters), I};
+ error ->
+ {dict:store(From, 2, Counters), 1}
+ end,
+ ListIndex = WhichChunk - Offset,
+
+ % talk to the receiver to get another chunk if necessary
+ ChunkList1 = if ListIndex > length(ChunkList) ->
+ Receiver ! {self(), go},
+ receive {Receiver, ChunkRecord} -> ChunkList ++ [ChunkRecord] end;
+ true -> ChunkList end,
+
+ % reply to the writer
+ From ! {self(), lists:nth(ListIndex, ChunkList1)},
+
+ % check if we can drop a chunk from the head of the list
+ SmallestIndex = dict:fold(fun(_, Val, Acc) -> lists:min([Val,Acc]) end,
+ WhichChunk+1, NewCounters),
+ Size = dict:size(NewCounters),
+
+ {NewChunkList, NewOffset} =
+ if Size == N andalso (SmallestIndex - Offset) == 2 ->
+ {tl(ChunkList1), Offset+1};
+ true ->
+ {ChunkList1, Offset}
+ end,
+ middleman_loop(Receiver, N, NewCounters, NewOffset, NewChunkList)
+ after 10000 ->
+ ok
+ end.
diff --git a/apps/fabric/src/fabric_doc_missing_revs.erl b/apps/fabric/src/fabric_doc_missing_revs.erl
new file mode 100644
index 00000000..9a368783
--- /dev/null
+++ b/apps/fabric/src/fabric_doc_missing_revs.erl
@@ -0,0 +1,64 @@
+-module(fabric_doc_missing_revs).
+
+-export([go/2]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+go(DbName, AllIdsRevs) ->
+ Workers = lists:map(fun({#shard{name=Name, node=Node} = Shard, IdsRevs}) ->
+ Ref = rexi:cast(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs]}),
+ Shard#shard{ref=Ref}
+ end, group_idrevs_by_shard(DbName, AllIdsRevs)),
+ ResultDict = dict:from_list([{Id, {nil,Revs}} || {Id, Revs} <- AllIdsRevs]),
+ Acc0 = {length(Workers), ResultDict},
+ fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0).
+
+handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) ->
+ skip_message(Acc0);
+handle_message({rexi_EXIT, _, _, _}, _Worker, Acc0) ->
+ skip_message(Acc0);
+handle_message({ok, Results}, _Worker, {1, D0}) ->
+ D = update_dict(D0, Results),
+ {stop, dict:fold(fun force_reply/3, [], D)};
+handle_message({ok, Results}, _Worker, {WaitingCount, D0}) ->
+ D = update_dict(D0, Results),
+ case dict:fold(fun maybe_reply/3, {stop, []}, D) of
+ continue ->
+ % still haven't heard about some Ids
+ {ok, {WaitingCount - 1, D}};
+ {stop, FinalReply} ->
+ {stop, FinalReply}
+ end.
+
+force_reply(Id, {nil,Revs}, Acc) ->
+ % never heard about this ID, assume it's missing
+ [{Id, Revs} | Acc];
+force_reply(_, [], Acc) ->
+ Acc;
+force_reply(Id, Revs, Acc) ->
+ [{Id, Revs} | Acc].
+
+maybe_reply(_, _, continue) ->
+ continue;
+maybe_reply(_, {nil, _}, _) ->
+ continue;
+maybe_reply(_, [], {stop, Acc}) ->
+ {stop, Acc};
+maybe_reply(Id, Revs, {stop, Acc}) ->
+ {stop, [{Id, Revs} | Acc]}.
+
+group_idrevs_by_shard(DbName, IdsRevs) ->
+ dict:to_list(lists:foldl(fun({Id, Revs}, D0) ->
+ lists:foldl(fun(Shard, D1) ->
+ dict:append(Shard, {Id, Revs}, D1)
+ end, D0, mem3:shards(DbName,Id))
+ end, dict:new(), IdsRevs)).
+
+update_dict(D0, KVs) ->
+ lists:foldl(fun({K,V,_}, D1) -> dict:store(K, V, D1) end, D0, KVs).
+
+skip_message({1, Dict}) ->
+ {stop, dict:fold(fun force_reply/3, [], Dict)};
+skip_message({WaitingCount, Dict}) ->
+ {ok, {WaitingCount-1, Dict}}.
diff --git a/apps/fabric/src/fabric_doc_open.erl b/apps/fabric/src/fabric_doc_open.erl
new file mode 100644
index 00000000..5c5699c3
--- /dev/null
+++ b/apps/fabric/src/fabric_doc_open.erl
@@ -0,0 +1,66 @@
+-module(fabric_doc_open).
+
+-export([go/3]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+go(DbName, Id, Options) ->
+ Workers = fabric_util:submit_jobs(mem3:shards(DbName,Id), open_doc,
+ [Id, [deleted|Options]]),
+ SuppressDeletedDoc = not lists:member(deleted, Options),
+ R = couch_util:get_value(r, Options, couch_config:get("cluster","r","2")),
+ Acc0 = {length(Workers), list_to_integer(R), []},
+ case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
+ {ok, {ok, #doc{deleted=true}}} when SuppressDeletedDoc ->
+ {not_found, deleted};
+ {ok, Else} ->
+ Else;
+ Error ->
+ Error
+ end.
+
+handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) ->
+ skip_message(Acc0);
+handle_message({rexi_EXIT, _Reason}, _Worker, Acc0) ->
+ skip_message(Acc0);
+handle_message(Reply, _Worker, {WaitingCount, R, Replies}) ->
+ case merge_read_reply(make_key(Reply), Reply, Replies) of
+ {_, KeyCount} when KeyCount =:= R ->
+ {stop, Reply};
+ {NewReplies, KeyCount} when KeyCount < R ->
+ if WaitingCount =:= 1 ->
+ % last message arrived, but still no quorum
+ repair_read_quorum_failure(NewReplies);
+ true ->
+ {ok, {WaitingCount-1, R, NewReplies}}
+ end
+ end.
+
+skip_message({1, _R, Replies}) ->
+ repair_read_quorum_failure(Replies);
+skip_message({WaitingCount, R, Replies}) ->
+ {ok, {WaitingCount-1, R, Replies}}.
+
+merge_read_reply(Key, Reply, Replies) ->
+ case lists:keyfind(Key, 1, Replies) of
+ false ->
+ {[{Key, Reply, 1} | Replies], 1};
+ {Key, _, N} ->
+ {lists:keyreplace(Key, 1, Replies, {Key, Reply, N+1}), N+1}
+ end.
+
+make_key({ok, #doc{id=Id, revs=Revs}}) ->
+ {Id, Revs};
+make_key(Else) ->
+ Else.
+
+repair_read_quorum_failure(Replies) ->
+ case [Doc || {_Key, {ok, Doc}, _Count} <- Replies] of
+ [] ->
+ {stop, {not_found, missing}};
+ [Doc|_] ->
+ % TODO merge docs to find the winner as determined by replication
+ {stop, {ok, Doc}}
+ end. \ No newline at end of file
diff --git a/apps/fabric/src/fabric_doc_open_revs.erl b/apps/fabric/src/fabric_doc_open_revs.erl
new file mode 100644
index 00000000..61ff466f
--- /dev/null
+++ b/apps/fabric/src/fabric_doc_open_revs.erl
@@ -0,0 +1,65 @@
+-module(fabric_doc_open_revs).
+
+-export([go/4]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+go(DbName, Id, Revs, Options) ->
+ Workers = fabric_util:submit_jobs(mem3:shards(DbName,Id), open_revs,
+ [Id, Revs, Options]),
+ R = couch_util:get_value(r, Options, couch_config:get("cluster","r","2")),
+ Acc0 = {length(Workers), list_to_integer(R), []},
+ case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
+ {ok, {ok, Reply}} ->
+ {ok, Reply};
+ Else ->
+ Else
+ end.
+
+handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) ->
+ skip_message(Acc0);
+handle_message({rexi_EXIT, _}, _Worker, Acc0) ->
+ skip_message(Acc0);
+handle_message(Reply, _Worker, {WaitingCount, R, Replies}) ->
+ case merge_read_reply(make_key(Reply), Reply, Replies) of
+ {_, KeyCount} when KeyCount =:= R ->
+ {stop, Reply};
+ {NewReplies, KeyCount} when KeyCount < R ->
+ if WaitingCount =:= 1 ->
+ % last message arrived, but still no quorum
+ repair_read_quorum_failure(NewReplies);
+ true ->
+ {ok, {WaitingCount-1, R, NewReplies}}
+ end
+ end.
+
+skip_message({1, _R, Replies}) ->
+ repair_read_quorum_failure(Replies);
+skip_message({WaitingCount, R, Replies}) ->
+ {ok, {WaitingCount-1, R, Replies}}.
+
+merge_read_reply(Key, Reply, Replies) ->
+ case lists:keyfind(Key, 1, Replies) of
+ false ->
+ {[{Key, Reply, 1} | Replies], 1};
+ {Key, _, N} ->
+ {lists:keyreplace(Key, 1, Replies, {Key, Reply, N+1}), N+1}
+ end.
+
+make_key({ok, #doc{id=Id, revs=Revs}}) ->
+ {Id, Revs};
+make_key(Else) ->
+ Else.
+
+repair_read_quorum_failure(Replies) ->
+ case [Doc || {_Key, {ok, Doc}, _Count} <- Replies] of
+ [] ->
+ {stop, {not_found, missing}};
+ [Doc|_] ->
+ % TODO merge docs to find the winner as determined by replication
+ {stop, {ok, Doc}}
+ end.
+
+ \ No newline at end of file
diff --git a/apps/fabric/src/fabric_doc_update.erl b/apps/fabric/src/fabric_doc_update.erl
new file mode 100644
index 00000000..f0fcf112
--- /dev/null
+++ b/apps/fabric/src/fabric_doc_update.erl
@@ -0,0 +1,127 @@
+-module(fabric_doc_update).
+
+-export([go/3]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+go(_, [], _) ->
+ {ok, []};
+go(DbName, AllDocs, Opts) ->
+ validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, Opts)),
+ Options = lists:delete(all_or_nothing, Opts),
+ GroupedDocs = lists:map(fun({#shard{name=Name, node=Node} = Shard, Docs}) ->
+ Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
+ {Shard#shard{ref=Ref}, Docs}
+ end, group_docs_by_shard(DbName, AllDocs)),
+ {Workers, _} = lists:unzip(GroupedDocs),
+ W = couch_util:get_value(w, Options, couch_config:get("cluster","w","2")),
+ Acc0 = {length(Workers), length(AllDocs), list_to_integer(W), GroupedDocs,
+ dict:from_list([{Doc,[]} || Doc <- AllDocs])},
+ case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
+ {ok, Results} ->
+ Reordered = couch_util:reorder_results(AllDocs, Results),
+ {ok, [R || R <- Reordered, R =/= noreply]};
+ Else ->
+ Else
+ end.
+
+handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) ->
+ skip_message(Acc0);
+handle_message({rexi_EXIT, _}, _Worker, Acc0) ->
+ skip_message(Acc0);
+handle_message({ok, Replies}, Worker, Acc0) ->
+ {WaitingCount, DocCount, W, GroupedDocs, DocReplyDict0} = Acc0,
+ Docs = couch_util:get_value(Worker, GroupedDocs),
+ DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0),
+ case {WaitingCount, dict:size(DocReplyDict)} of
+ {1, _} ->
+ % last message has arrived, we need to conclude things
+ {W, Reply} = dict:fold(fun force_reply/3, {W,[]}, DocReplyDict),
+ {stop, Reply};
+ {_, DocCount} ->
+ % we've got at least one reply for each document, let's take a look
+ case dict:fold(fun maybe_reply/3, {stop,W,[]}, DocReplyDict) of
+ continue ->
+ {ok, {WaitingCount - 1, DocCount, W, GroupedDocs, DocReplyDict}};
+ {stop, W, FinalReplies} ->
+ {stop, FinalReplies}
+ end;
+ {_, N} when N < DocCount ->
+ % no point in trying to finalize anything yet
+ {ok, {WaitingCount - 1, DocCount, W, GroupedDocs, DocReplyDict}}
+ end;
+handle_message({missing_stub, Stub}, _, _) ->
+ throw({missing_stub, Stub});
+handle_message({not_found, no_db_file} = X, Worker, Acc0) ->
+ {_, _, _, GroupedDocs, _} = Acc0,
+ Docs = couch_util:get_value(Worker, GroupedDocs),
+ handle_message({ok, [X || _D <- Docs]}, Worker, Acc0).
+
+force_reply(Doc, [], {W, Acc}) ->
+ {W, [{Doc, {error, internal_server_error}} | Acc]};
+force_reply(Doc, [FirstReply|_] = Replies, {W, Acc}) ->
+ case update_quorum_met(W, Replies) of
+ {true, Reply} ->
+ {W, [{Doc,Reply} | Acc]};
+ false ->
+ ?LOG_ERROR("write quorum (~p) failed, reply ~p", [W, FirstReply]),
+ % TODO make a smarter choice than just picking the first reply
+ {W, [{Doc,FirstReply} | Acc]}
+ end.
+
+maybe_reply(_, _, continue) ->
+ % we didn't meet quorum for all docs, so we're fast-forwarding the fold
+ continue;
+maybe_reply(Doc, Replies, {stop, W, Acc}) ->
+ case update_quorum_met(W, Replies) of
+ {true, Reply} ->
+ {stop, W, [{Doc, Reply} | Acc]};
+ false ->
+ continue
+ end.
+
+update_quorum_met(W, Replies) ->
+ Counters = lists:foldl(fun(R,D) -> orddict:update_counter(R,1,D) end,
+ orddict:new(), Replies),
+ case lists:dropwhile(fun({_, Count}) -> Count < W end, Counters) of
+ [] ->
+ false;
+ [{FinalReply, _} | _] ->
+ {true, FinalReply}
+ end.
+
+-spec group_docs_by_shard(binary(), [#doc{}]) -> [{#shard{}, [#doc{}]}].
+group_docs_by_shard(DbName, Docs) ->
+ dict:to_list(lists:foldl(fun(#doc{id=Id} = Doc, D0) ->
+ lists:foldl(fun(Shard, D1) ->
+ dict:append(Shard, Doc, D1)
+ end, D0, mem3:shards(DbName,Id))
+ end, dict:new(), Docs)).
+
+append_update_replies([], [], DocReplyDict) ->
+ DocReplyDict;
+append_update_replies([Doc|Rest], [], Dict0) ->
+ % icky, if replicated_changes only errors show up in result
+ append_update_replies(Rest, [], dict:append(Doc, noreply, Dict0));
+append_update_replies([Doc|Rest1], [Reply|Rest2], Dict0) ->
+ % TODO what if the same document shows up twice in one update_docs call?
+ append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)).
+
+skip_message(Acc0) ->
+ % TODO fix this
+ {ok, Acc0}.
+
+validate_atomic_update(_, _, false) ->
+ ok;
+validate_atomic_update(_DbName, AllDocs, true) ->
+ % TODO actually perform the validation. This requires some hackery, we need
+ % to basically extract the prep_and_validate_updates function from couch_db
+ % and only run that, without actually writing in case of a success.
+ Error = {not_implemented, <<"all_or_nothing is not supported yet">>},
+ PreCommitFailures = lists:map(fun(#doc{id=Id, revs = {Pos,Revs}}) ->
+ case Revs of [] -> RevId = <<>>; [RevId|_] -> ok end,
+ {{Id, {Pos, RevId}}, Error}
+ end, AllDocs),
+ throw({aborted, PreCommitFailures}).
diff --git a/apps/fabric/src/fabric_group_info.erl b/apps/fabric/src/fabric_group_info.erl
new file mode 100644
index 00000000..04605a66
--- /dev/null
+++ b/apps/fabric/src/fabric_group_info.erl
@@ -0,0 +1,52 @@
+-module(fabric_group_info).
+
+-export([go/2]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+go(DbName, GroupId) when is_binary(GroupId) ->
+ {ok, DDoc} = fabric:open_doc(DbName, GroupId, []),
+ go(DbName, DDoc);
+
+go(DbName, #doc{} = DDoc) ->
+ Group = couch_view_group:design_doc_to_view_group(#db{name=DbName}, DDoc),
+ Shards = mem3:shards(DbName),
+ Workers = fabric_util:submit_jobs(Shards, group_info, [Group]),
+ Acc0 = {fabric_dict:init(Workers, nil), []},
+ fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0).
+
+handle_message({ok, Info}, Shard, {Counters, Acc}) ->
+ case fabric_dict:lookup_element(Shard, Counters) of
+ undefined ->
+ % already heard from someone else in this range
+ {ok, {Counters, Acc}};
+ nil ->
+ C1 = fabric_dict:store(Shard, ok, Counters),
+ C2 = fabric_view:remove_overlapping_shards(Shard, C1),
+ case fabric_dict:any(nil, C2) of
+ true ->
+ {ok, {C2, [Info|Acc]}};
+ false ->
+ {stop, merge_results(lists:flatten([Info|Acc]))}
+ end
+ end;
+handle_message(_, _, Acc) ->
+ {ok, Acc}.
+
+merge_results(Info) ->
+ Dict = lists:foldl(fun({K,V},D0) -> orddict:append(K,V,D0) end,
+ orddict:new(), Info),
+ orddict:fold(fun
+ (signature, [X|_], Acc) ->
+ [{signature, X} | Acc];
+ (language, [X|_], Acc) ->
+ [{language, X} | Acc];
+ (disk_size, X, Acc) ->
+ [{disk_size, lists:sum(X)} | Acc];
+ (compact_running, X, Acc) ->
+ [{compact_running, lists:member(true, X)} | Acc];
+ (_, _, Acc) ->
+ Acc
+ end, [], Dict).
diff --git a/apps/fabric/src/fabric_rpc.erl b/apps/fabric/src/fabric_rpc.erl
new file mode 100644
index 00000000..f56e3f68
--- /dev/null
+++ b/apps/fabric/src/fabric_rpc.erl
@@ -0,0 +1,388 @@
+-module(fabric_rpc).
+
+-export([get_db_info/1, get_doc_count/1, get_update_seq/1]).
+-export([open_doc/3, open_revs/4, get_missing_revs/2, update_docs/3]).
+-export([all_docs/2, changes/3, map_view/4, reduce_view/4, group_info/2]).
+-export([create_db/3, delete_db/3, reset_validation_funs/1, set_security/3,
+ set_revs_limit/3]).
+
+-include("fabric.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-record (view_acc, {
+ db,
+ limit,
+ include_docs,
+ offset = nil,
+ total_rows,
+ reduce_fun = fun couch_db:enum_docs_reduce_to_count/1,
+ group_level = 0
+}).
+
+%% rpc endpoints
+%% call to with_db will supply your M:F with a #db{} and then remaining args
+
+all_docs(DbName, #view_query_args{keys=nil} = QueryArgs) ->
+ {ok, Db} = couch_db:open(DbName, []),
+ #view_query_args{
+ start_key = StartKey,
+ start_docid = StartDocId,
+ end_key = EndKey,
+ end_docid = EndDocId,
+ limit = Limit,
+ skip = Skip,
+ include_docs = IncludeDocs,
+ direction = Dir,
+ inclusive_end = Inclusive
+ } = QueryArgs,
+ {ok, Total} = couch_db:get_doc_count(Db),
+ Acc0 = #view_acc{
+ db = Db,
+ include_docs = IncludeDocs,
+ limit = Limit+Skip,
+ total_rows = Total
+ },
+ EndKeyType = if Inclusive -> end_key; true -> end_key_gt end,
+ Options = [
+ {dir, Dir},
+ {start_key, if is_binary(StartKey) -> StartKey; true -> StartDocId end},
+ {EndKeyType, if is_binary(EndKey) -> EndKey; true -> EndDocId end}
+ ],
+ {ok, _, Acc} = couch_db:enum_docs(Db, fun view_fold/3, Acc0, Options),
+ final_response(Total, Acc#view_acc.offset).
+
+changes(DbName, Args, StartSeq) ->
+ #changes_args{style=Style, dir=Dir} = Args,
+ case couch_db:open(DbName, []) of
+ {ok, Db} ->
+ Enum = fun changes_enumerator/2,
+ Opts = [{dir,Dir}],
+ Acc0 = {Db, StartSeq, Args},
+ try
+ {ok, {_, LastSeq, _}} =
+ couch_db:changes_since(Db, Style, StartSeq, Enum, Opts, Acc0),
+ rexi:reply({complete, LastSeq})
+ after
+ couch_db:close(Db)
+ end;
+ Error ->
+ rexi:reply(Error)
+ end.
+
+map_view(DbName, DDoc, ViewName, QueryArgs) ->
+ {ok, Db} = couch_db:open(DbName, []),
+ #view_query_args{
+ limit = Limit,
+ skip = Skip,
+ keys = Keys,
+ include_docs = IncludeDocs,
+ stale = Stale,
+ view_type = ViewType
+ } = QueryArgs,
+ MinSeq = if Stale == ok -> 0; true -> couch_db:get_update_seq(Db) end,
+ Group0 = couch_view_group:design_doc_to_view_group(Db, DDoc),
+ {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
+ {ok, Group} = couch_view_group:request_group(Pid, MinSeq),
+ View = fabric_view:extract_view(Pid, ViewName, Group#group.views, ViewType),
+ {ok, Total} = couch_view:get_row_count(View),
+ Acc0 = #view_acc{
+ db = Db,
+ include_docs = IncludeDocs,
+ limit = Limit+Skip,
+ total_rows = Total,
+ reduce_fun = fun couch_view:reduce_to_count/1
+ },
+ case Keys of
+ nil ->
+ Options = couch_httpd_view:make_key_options(QueryArgs),
+ {ok, _, Acc} = couch_view:fold(View, fun view_fold/3, Acc0, Options);
+ _ ->
+ Acc = lists:foldl(fun(Key, AccIn) ->
+ KeyArgs = QueryArgs#view_query_args{start_key=Key, end_key=Key},
+ Options = couch_httpd_view:make_key_options(KeyArgs),
+ {_Go, _, Out} = couch_view:fold(View, fun view_fold/3, AccIn,
+ Options),
+ Out
+ end, Acc0, Keys)
+ end,
+ final_response(Total, Acc#view_acc.offset).
+
+reduce_view(DbName, Group0, ViewName, QueryArgs) ->
+ {ok, Db} = couch_db:open(DbName, []),
+ #view_query_args{
+ group_level = GroupLevel,
+ limit = Limit,
+ skip = Skip,
+ keys = Keys,
+ stale = Stale
+ } = QueryArgs,
+ GroupFun = group_rows_fun(GroupLevel),
+ MinSeq = if Stale == ok -> 0; true -> couch_db:get_update_seq(Db) end,
+ {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
+ {ok, #group{views=Views, def_lang=Lang}} = couch_view_group:request_group(
+ Pid, MinSeq),
+ {NthRed, View} = fabric_view:extract_view(Pid, ViewName, Views, reduce),
+ ReduceView = {reduce, NthRed, Lang, View},
+ Acc0 = #view_acc{group_level = GroupLevel, limit = Limit+Skip},
+ case Keys of
+ nil ->
+ Options0 = couch_httpd_view:make_key_options(QueryArgs),
+ Options = [{key_group_fun, GroupFun} | Options0],
+ couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options);
+ _ ->
+ lists:map(fun(Key) ->
+ KeyArgs = QueryArgs#view_query_args{start_key=Key, end_key=Key},
+ Options0 = couch_httpd_view:make_key_options(KeyArgs),
+ Options = [{key_group_fun, GroupFun} | Options0],
+ couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options)
+ end, Keys)
+ end,
+ rexi:reply(complete).
+
+create_db(DbName, Options, Doc) ->
+ mem3_util:write_db_doc(Doc),
+ rexi:reply(case couch_server:create(DbName, Options) of
+ {ok, _} ->
+ ok;
+ Error ->
+ Error
+ end).
+
+delete_db(DbName, Options, DocId) ->
+ mem3_util:delete_db_doc(DocId),
+ rexi:reply(couch_server:delete(DbName, Options)).
+
+get_db_info(DbName) ->
+ with_db(DbName, [], {couch_db, get_db_info, []}).
+
+get_doc_count(DbName) ->
+ with_db(DbName, [], {couch_db, get_doc_count, []}).
+
+get_update_seq(DbName) ->
+ with_db(DbName, [], {couch_db, get_update_seq, []}).
+
+set_security(DbName, SecObj, Options) ->
+ with_db(DbName, Options, {couch_db, set_security, [SecObj]}).
+
+set_revs_limit(DbName, Limit, Options) ->
+ with_db(DbName, Options, {couch_db, set_revs_limit, [Limit]}).
+
+open_doc(DbName, DocId, Options) ->
+ with_db(DbName, Options, {couch_db, open_doc, [DocId, Options]}).
+
+open_revs(DbName, Id, Revs, Options) ->
+ with_db(DbName, Options, {couch_db, open_doc_revs, [Id, Revs, Options]}).
+
+get_missing_revs(DbName, IdRevsList) ->
+ % reimplement here so we get [] for Ids with no missing revs in response
+ rexi:reply(case couch_db:open(DbName, []) of
+ {ok, Db} ->
+ Ids = [Id1 || {Id1, _Revs} <- IdRevsList],
+ {ok, lists:zipwith(fun({Id, Revs}, FullDocInfoResult) ->
+ case FullDocInfoResult of
+ {ok, #full_doc_info{rev_tree=RevisionTree} = FullInfo} ->
+ MissingRevs = couch_key_tree:find_missing(RevisionTree, Revs),
+ {Id, MissingRevs, possible_ancestors(FullInfo, MissingRevs)};
+ not_found ->
+ {Id, Revs, []}
+ end
+ end, IdRevsList, couch_btree:lookup(Db#db.id_tree, Ids))};
+ Error ->
+ Error
+ end).
+
+update_docs(DbName, Docs0, Options) ->
+ case proplists:get_value(replicated_changes, Options) of
+ true ->
+ X = replicated_changes;
+ _ ->
+ X = interactive_edit
+ end,
+ Docs = make_att_readers(Docs0),
+ with_db(DbName, Options, {couch_db, update_docs, [Docs, Options, X]}).
+
+group_info(DbName, Group0) ->
+ {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
+ rexi:reply(couch_view_group:request_group_info(Pid)).
+
+reset_validation_funs(DbName) ->
+ case couch_db:open(DbName, []) of
+ {ok, #db{main_pid = Pid}} ->
+ gen_server:cast(Pid, {load_validation_funs, undefined});
+ _ ->
+ ok
+ end.
+
+%%
+%% internal
+%%
+
+with_db(DbName, Options, {M,F,A}) ->
+ case couch_db:open(DbName, Options) of
+ {ok, Db} ->
+ rexi:reply(try
+ apply(M, F, [Db | A])
+ catch Exception ->
+ Exception;
+ error:Reason ->
+ ?LOG_ERROR("~p ~p ~p~n~p", [?MODULE, {M,F}, Reason,
+ erlang:get_stacktrace()]),
+ {error, Reason}
+ end);
+ Error ->
+ rexi:reply(Error)
+ end.
+
+view_fold(#full_doc_info{} = FullDocInfo, OffsetReds, Acc) ->
+ % matches for _all_docs and translates #full_doc_info{} -> KV pair
+ case couch_doc:to_doc_info(FullDocInfo) of
+ #doc_info{revs=[#rev_info{deleted=false, rev=Rev}|_]} ->
+ Id = FullDocInfo#full_doc_info.id,
+ Value = {[{rev,couch_doc:rev_to_str(Rev)}]},
+ view_fold({{Id,Id}, Value}, OffsetReds, Acc);
+ #doc_info{revs=[#rev_info{deleted=true}|_]} ->
+ {ok, Acc}
+ end;
+view_fold(KV, OffsetReds, #view_acc{offset=nil, total_rows=Total} = Acc) ->
+ % calculates the offset for this shard
+ #view_acc{reduce_fun=Reduce} = Acc,
+ Offset = Reduce(OffsetReds),
+ case rexi:sync_reply({total_and_offset, Total, Offset}) of
+ ok ->
+ view_fold(KV, OffsetReds, Acc#view_acc{offset=Offset});
+ stop ->
+ exit(normal);
+ timeout ->
+ exit(timeout)
+ end;
+view_fold(_KV, _Offset, #view_acc{limit=0} = Acc) ->
+ % we scanned through limit+skip local rows
+ {stop, Acc};
+view_fold({{Key,Id}, Value}, _Offset, Acc) ->
+ % the normal case
+ #view_acc{
+ db = Db,
+ limit = Limit,
+ include_docs = IncludeDocs
+ } = Acc,
+ Doc = if not IncludeDocs -> undefined; true ->
+ case couch_db:open_doc(Db, Id, []) of
+ {not_found, deleted} ->
+ null;
+ {not_found, missing} ->
+ undefined;
+ {ok, Doc0} ->
+ couch_doc:to_json_obj(Doc0, [])
+ end
+ end,
+ case rexi:sync_reply(#view_row{key=Key, id=Id, value=Value, doc=Doc}) of
+ ok ->
+ {ok, Acc#view_acc{limit=Limit-1}};
+ timeout ->
+ exit(timeout)
+ end.
+
+final_response(Total, nil) ->
+ case rexi:sync_reply({total_and_offset, Total, Total}) of ok ->
+ rexi:reply(complete);
+ stop ->
+ ok;
+ timeout ->
+ exit(timeout)
+ end;
+final_response(_Total, _Offset) ->
+ rexi:reply(complete).
+
+group_rows_fun(exact) ->
+ fun({Key1,_}, {Key2,_}) -> Key1 == Key2 end;
+group_rows_fun(0) ->
+ fun(_A, _B) -> true end;
+group_rows_fun(GroupLevel) when is_integer(GroupLevel) ->
+ fun({[_|_] = Key1,_}, {[_|_] = Key2,_}) ->
+ lists:sublist(Key1, GroupLevel) == lists:sublist(Key2, GroupLevel);
+ ({Key1,_}, {Key2,_}) ->
+ Key1 == Key2
+ end.
+
+reduce_fold(_Key, _Red, #view_acc{limit=0} = Acc) ->
+ {stop, Acc};
+reduce_fold(_Key, Red, #view_acc{group_level=0} = Acc) ->
+ send(null, Red, Acc);
+reduce_fold(Key, Red, #view_acc{group_level=exact} = Acc) ->
+ send(Key, Red, Acc);
+reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0, is_list(K) ->
+ send(lists:sublist(K, I), Red, Acc).
+
+send(Key, Value, #view_acc{limit=Limit} = Acc) ->
+ case rexi:sync_reply(#view_row{key=Key, value=Value}) of
+ ok ->
+ {ok, Acc#view_acc{limit=Limit-1}};
+ stop ->
+ exit(normal);
+ timeout ->
+ exit(timeout)
+ end.
+
+changes_enumerator(DocInfo, {Db, _Seq, Args}) ->
+ #changes_args{include_docs=IncludeDocs, filter=FilterFun} = Args,
+ #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}
+ = DocInfo,
+ case [Result || Result <- FilterFun(DocInfo), Result /= null] of
+ [] ->
+ {ok, {Db, Seq, Args}};
+ Results ->
+ ChangesRow = changes_row(Db, Seq, Id, Results, Rev, Del, IncludeDocs),
+ Go = rexi:sync_reply(ChangesRow),
+ {Go, {Db, Seq, Args}}
+ end.
+
+changes_row(_, Seq, Id, Results, _, true, true) ->
+ #view_row{key=Seq, id=Id, value=Results, doc=deleted};
+changes_row(_, Seq, Id, Results, _, true, false) ->
+ #view_row{key=Seq, id=Id, value=Results, doc=deleted};
+changes_row(Db, Seq, Id, Results, Rev, false, true) ->
+ #view_row{key=Seq, id=Id, value=Results, doc=doc_member(Db, Id, Rev)};
+changes_row(_, Seq, Id, Results, _, false, false) ->
+ #view_row{key=Seq, id=Id, value=Results}.
+
+doc_member(Shard, Id, Rev) ->
+ case couch_db:open_doc_revs(Shard, Id, [Rev], []) of
+ {ok, [{ok,Doc}]} ->
+ couch_doc:to_json_obj(Doc, []);
+ Error ->
+ Error
+ end.
+
+possible_ancestors(_FullInfo, []) ->
+ [];
+possible_ancestors(FullInfo, MissingRevs) ->
+ #doc_info{revs=RevsInfo} = couch_doc:to_doc_info(FullInfo),
+ LeafRevs = [Rev || #rev_info{rev=Rev} <- RevsInfo],
+ % Find the revs that are possible parents of this rev
+ lists:foldl(fun({LeafPos, LeafRevId}, Acc) ->
+ % this leaf is a "possible ancenstor" of the missing
+ % revs if this LeafPos lessthan any of the missing revs
+ case lists:any(fun({MissingPos, _}) ->
+ LeafPos < MissingPos end, MissingRevs) of
+ true ->
+ [{LeafPos, LeafRevId} | Acc];
+ false ->
+ Acc
+ end
+ end, [], LeafRevs).
+
+make_att_readers([]) ->
+ [];
+make_att_readers([#doc{atts=Atts0} = Doc | Rest]) ->
+ % % go through the attachments looking for 'follows' in the data,
+ % % replace with function that reads the data from MIME stream.
+ Atts = [Att#att{data=make_att_reader(D)} || #att{data=D} = Att <- Atts0],
+ [Doc#doc{atts = Atts} | make_att_readers(Rest)].
+
+make_att_reader({follows, Parser}) ->
+ fun() ->
+ Parser ! {get_bytes, self()},
+ receive {bytes, Bytes} -> Bytes end
+ end;
+make_att_reader(Else) ->
+ Else.
diff --git a/apps/fabric/src/fabric_util.erl b/apps/fabric/src/fabric_util.erl
new file mode 100644
index 00000000..639a32e7
--- /dev/null
+++ b/apps/fabric/src/fabric_util.erl
@@ -0,0 +1,89 @@
+-module(fabric_util).
+
+-export([submit_jobs/3, cleanup/1, recv/4, receive_loop/4, receive_loop/6,
+ get_db/1]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+submit_jobs(Shards, EndPoint, ExtraArgs) ->
+ lists:map(fun(#shard{node=Node, name=ShardName} = Shard) ->
+ Ref = rexi:cast(Node, {fabric_rpc, EndPoint, [ShardName | ExtraArgs]}),
+ Shard#shard{ref = Ref}
+ end, Shards).
+
+cleanup(Workers) ->
+ [rexi:kill(Node, Ref) || #shard{node=Node, ref=Ref} <- Workers].
+
+recv(Workers, Keypos, Fun, Acc0) ->
+ receive_loop(Workers, Keypos, Fun, Acc0).
+
+receive_loop(Workers, Keypos, Fun, Acc0) ->
+ case couch_config:get("fabric", "request_timeout", "60000") of
+ "infinity" ->
+ Timeout = infinity;
+ N ->
+ Timeout = list_to_integer(N)
+ end,
+ receive_loop(Workers, Keypos, Fun, Acc0, Timeout, infinity).
+
+%% @doc set up the receive loop with an overall timeout
+-spec receive_loop([any()], integer(), function(), any(), timeout(), timeout()) ->
+ {ok, any()} | timeout | {error, any()}.
+receive_loop(RefPartMap, Keypos, Fun, Acc0, infinity, PerMsgTO) ->
+ process_mailbox(RefPartMap, Keypos, Fun, Acc0, nil, PerMsgTO);
+receive_loop(RefPartMap, Keypos, Fun, Acc0, GlobalTimeout, PerMsgTO) ->
+ TimeoutRef = erlang:make_ref(),
+ {ok, TRef} = timer:send_after(GlobalTimeout, {timeout, TimeoutRef}),
+ try
+ process_mailbox(RefPartMap, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO)
+ after
+ timer:cancel(TRef)
+ end.
+
+process_mailbox(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
+ case process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) of
+ {ok, Acc} ->
+ process_mailbox(RefList, Keypos, Fun, Acc, TimeoutRef, PerMsgTO);
+ {stop, Acc} ->
+ {ok, Acc};
+ Error ->
+ Error
+ end.
+
+process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
+ receive
+ {timeout, TimeoutRef} ->
+ timeout;
+ {Ref, Msg} ->
+ case lists:keyfind(Ref, Keypos, RefList) of
+ false ->
+ % this was some non-matching message which we will ignore
+ {ok, Acc0};
+ Worker ->
+ Fun(Msg, Worker, Acc0)
+ end;
+ {Ref, From, Msg} ->
+ case lists:keyfind(Ref, Keypos, RefList) of
+ false ->
+ {ok, Acc0};
+ Worker ->
+ Fun(Msg, {Worker, From}, Acc0)
+ end;
+ {rexi_DOWN, _RexiMonPid, ServerPid, Reason} = Msg ->
+ showroom_log:message(alert, "rexi_DOWN ~p ~p", [ServerPid, Reason]),
+ Fun(Msg, nil, Acc0)
+ after PerMsgTO ->
+ timeout
+ end.
+
+get_db(DbName) ->
+ Shards = mem3:shards(DbName),
+ case lists:partition(fun(#shard{node = N}) -> N =:= node() end, Shards) of
+ {[#shard{name = ShardName}|_], _} ->
+ % prefer node-local DBs
+ couch_db:open(ShardName, []);
+ {[], [#shard{node = Node, name = ShardName}|_]} ->
+ % but don't require them
+ rpc:call(Node, couch_db, open, [ShardName, []])
+ end.
diff --git a/apps/fabric/src/fabric_view.erl b/apps/fabric/src/fabric_view.erl
new file mode 100644
index 00000000..49a3a55a
--- /dev/null
+++ b/apps/fabric/src/fabric_view.erl
@@ -0,0 +1,218 @@
+-module(fabric_view).
+
+-export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1,
+ maybe_pause_worker/3, maybe_resume_worker/2, transform_row/1, keydict/1,
+ extract_view/4]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+%% @doc looks for a fully covered keyrange in the list of counters
+-spec is_progress_possible([{#shard{}, term()}]) -> boolean().
+is_progress_possible([]) ->
+ false;
+is_progress_possible(Counters) ->
+ Ranges = fabric_dict:fold(fun(#shard{range=[X,Y]}, _, A) -> [{X,Y}|A] end,
+ [], Counters),
+ [{Start, Tail0} | Rest] = lists:ukeysort(1, Ranges),
+ Result = lists:foldl(fun
+ (_, fail) ->
+ % we've already declared failure
+ fail;
+ (_, complete) ->
+ % this is the success condition, we can fast-forward
+ complete;
+ ({X,_}, Tail) when X > (Tail+1) ->
+ % gap in the keyrange, we're dead
+ fail;
+ ({_,Y}, Tail) ->
+ case erlang:max(Tail, Y) of
+ End when (End+1) =:= (2 bsl 31) ->
+ complete;
+ Else ->
+ % the normal condition, adding to the tail
+ Else
+ end
+ end, if (Tail0+1) =:= (2 bsl 31) -> complete; true -> Tail0 end, Rest),
+ (Start =:= 0) andalso (Result =:= complete).
+
+-spec remove_overlapping_shards(#shard{}, [{#shard{}, any()}]) ->
+ [{#shard{}, any()}].
+remove_overlapping_shards(#shard{range=[A,B]} = Shard0, Shards) ->
+ fabric_dict:filter(fun(#shard{range=[X,Y]} = Shard, _Value) ->
+ if Shard =:= Shard0 ->
+ % we can't remove ourselves
+ true;
+ A < B, X >= A, X < B ->
+ % lower bound is inside our range
+ false;
+ A < B, Y > A, Y =< B ->
+ % upper bound is inside our range
+ false;
+ B < A, X >= A orelse B < A, X < B ->
+ % target shard wraps the key range, lower bound is inside
+ false;
+ B < A, Y > A orelse B < A, Y =< B ->
+ % target shard wraps the key range, upper bound is inside
+ false;
+ true ->
+ true
+ end
+ end, Shards).
+
+maybe_pause_worker(Worker, From, State) ->
+ #collector{buffer_size = BufferSize, counters = Counters} = State,
+ case fabric_dict:lookup_element(Worker, Counters) of
+ BufferSize ->
+ State#collector{blocked = [{Worker,From} | State#collector.blocked]};
+ _Count ->
+ gen_server:reply(From, ok),
+ State
+ end.
+
+maybe_resume_worker(Worker, State) ->
+ #collector{buffer_size = Buffer, counters = C, blocked = B} = State,
+ case fabric_dict:lookup_element(Worker, C) of
+ Count when Count < Buffer/2 ->
+ case couch_util:get_value(Worker, B) of
+ undefined ->
+ State;
+ From ->
+ gen_server:reply(From, ok),
+ State#collector{blocked = lists:keydelete(Worker, 1, B)}
+ end;
+ _Other ->
+ State
+ end.
+
+maybe_send_row(#collector{limit=0} = State) ->
+ #collector{user_acc=AccIn, callback=Callback} = State,
+ {_, Acc} = Callback(complete, AccIn),
+ {stop, State#collector{user_acc=Acc}};
+maybe_send_row(State) ->
+ #collector{
+ callback = Callback,
+ counters = Counters,
+ skip = Skip,
+ limit = Limit,
+ user_acc = AccIn
+ } = State,
+ case fabric_dict:any(0, Counters) of
+ true ->
+ {ok, State};
+ false ->
+ try get_next_row(State) of
+ {_, NewState} when Skip > 0 ->
+ maybe_send_row(NewState#collector{skip=Skip-1, limit=Limit-1});
+ {Row, NewState} ->
+ case Callback(transform_row(Row), AccIn) of
+ {stop, Acc} ->
+ {stop, NewState#collector{user_acc=Acc, limit=Limit-1}};
+ {ok, Acc} ->
+ maybe_send_row(NewState#collector{user_acc=Acc, limit=Limit-1})
+ end
+ catch complete ->
+ {_, Acc} = Callback(complete, AccIn),
+ {stop, State#collector{user_acc=Acc}}
+ end
+ end.
+
+keydict(nil) ->
+ undefined;
+keydict(Keys) ->
+ {Dict,_} = lists:foldl(fun(K, {D,I}) -> {dict:store(K,I,D), I+1} end,
+ {dict:new(),0}, Keys),
+ Dict.
+
+%% internal %%
+
+get_next_row(#collector{rows = []}) ->
+ throw(complete);
+get_next_row(#collector{reducer = RedSrc} = St) when RedSrc =/= undefined ->
+ #collector{
+ query_args = #view_query_args{direction=Dir},
+ keys = Keys,
+ rows = RowDict,
+ os_proc = Proc,
+ counters = Counters0
+ } = St,
+ {Key, RestKeys} = find_next_key(Keys, Dir, RowDict),
+ case dict:find(Key, RowDict) of
+ {ok, Records} ->
+ NewRowDict = dict:erase(Key, RowDict),
+ Counters = lists:foldl(fun(#view_row{worker=Worker}, CountersAcc) ->
+ fabric_dict:update_counter(Worker, -1, CountersAcc)
+ end, Counters0, Records),
+ Wrapped = [[V] || #view_row{value=V} <- Records],
+ {ok, [Reduced]} = couch_query_servers:rereduce(Proc, [RedSrc], Wrapped),
+ NewSt = St#collector{keys=RestKeys, rows=NewRowDict, counters=Counters},
+ {#view_row{key=Key, id=reduced, value=Reduced}, NewSt};
+ error ->
+ get_next_row(St#collector{keys=RestKeys})
+ end;
+get_next_row(State) ->
+ #collector{rows = [Row|Rest], counters = Counters0} = State,
+ Worker = Row#view_row.worker,
+ Counters1 = fabric_dict:update_counter(Worker, -1, Counters0),
+ NewState = maybe_resume_worker(Worker, State#collector{counters=Counters1}),
+ {Row, NewState#collector{rows = Rest}}.
+
+find_next_key(nil, Dir, RowDict) ->
+ case lists:sort(sort_fun(Dir), dict:fetch_keys(RowDict)) of
+ [] ->
+ throw(complete);
+ [Key|_] ->
+ {Key, nil}
+ end;
+find_next_key([], _, _) ->
+ throw(complete);
+find_next_key([Key|Rest], _, _) ->
+ {Key, Rest}.
+
+transform_row(#view_row{key=Key, id=reduced, value=Value}) ->
+ {row, {[{key,Key}, {value,Value}]}};
+transform_row(#view_row{key=Key, id=undefined}) ->
+ {row, {[{key,Key}, {error,not_found}]}};
+transform_row(#view_row{key=Key, id=Id, value=Value, doc=undefined}) ->
+ {row, {[{id,Id}, {key,Key}, {value,Value}]}};
+transform_row(#view_row{key=Key, id=Id, value=Value, doc={error,Reason}}) ->
+ {row, {[{id,Id}, {key,Key}, {value,Value}, {error,Reason}]}};
+transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc}) ->
+ {row, {[{id,Id}, {key,Key}, {value,Value}, {doc,Doc}]}}.
+
+sort_fun(fwd) ->
+ fun(A,A) -> true; (A,B) -> couch_view:less_json(A,B) end;
+sort_fun(rev) ->
+ fun(A,A) -> true; (A,B) -> couch_view:less_json(B,A) end.
+
+extract_view(Pid, ViewName, [], _ViewType) ->
+ ?LOG_ERROR("missing_named_view ~p", [ViewName]),
+ exit(Pid, kill),
+ exit(missing_named_view);
+extract_view(Pid, ViewName, [View|Rest], ViewType) ->
+ case lists:member(ViewName, view_names(View, ViewType)) of
+ true ->
+ if ViewType == reduce ->
+ {index_of(ViewName, view_names(View, reduce)), View};
+ true ->
+ View
+ end;
+ false ->
+ extract_view(Pid, ViewName, Rest, ViewType)
+ end.
+
+view_names(View, Type) when Type == red_map; Type == reduce ->
+ [Name || {Name, _} <- View#view.reduce_funs];
+view_names(View, map) ->
+ View#view.map_names.
+
+index_of(X, List) ->
+ index_of(X, List, 1).
+
+index_of(_X, [], _I) ->
+ not_found;
+index_of(X, [X|_Rest], I) ->
+ I;
+index_of(X, [_|Rest], I) ->
+ index_of(X, Rest, I+1).
diff --git a/apps/fabric/src/fabric_view_all_docs.erl b/apps/fabric/src/fabric_view_all_docs.erl
new file mode 100644
index 00000000..d51a2831
--- /dev/null
+++ b/apps/fabric/src/fabric_view_all_docs.erl
@@ -0,0 +1,167 @@
+-module(fabric_view_all_docs).
+
+-export([go/4]).
+-export([open_doc/3]). % exported for spawn
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+go(DbName, #view_query_args{keys=nil} = QueryArgs, Callback, Acc0) ->
+ Workers = lists:map(fun(#shard{name=Name, node=Node} = Shard) ->
+ Ref = rexi:cast(Node, {fabric_rpc, all_docs, [Name, QueryArgs]}),
+ Shard#shard{ref = Ref}
+ end, mem3:shards(DbName)),
+ BufferSize = couch_config:get("fabric", "map_buffer_size", "2"),
+ #view_query_args{limit = Limit, skip = Skip} = QueryArgs,
+ State = #collector{
+ query_args = QueryArgs,
+ callback = Callback,
+ buffer_size = list_to_integer(BufferSize),
+ counters = fabric_dict:init(Workers, 0),
+ skip = Skip,
+ limit = Limit,
+ user_acc = Acc0
+ },
+ try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3,
+ State, infinity, 5000) of
+ {ok, NewState} ->
+ {ok, NewState#collector.user_acc};
+ Error ->
+ Error
+ after
+ fabric_util:cleanup(Workers)
+ end;
+
+go(DbName, QueryArgs, Callback, Acc0) ->
+ #view_query_args{
+ direction = Dir,
+ include_docs = IncludeDocs,
+ limit = Limit0,
+ skip = Skip0,
+ keys = Keys
+ } = QueryArgs,
+ {_, Ref0} = spawn_monitor(fun() -> exit(fabric:get_doc_count(DbName)) end),
+ Monitors0 = [spawn_monitor(?MODULE, open_doc, [DbName, Id, IncludeDocs]) ||
+ Id <- Keys],
+ Monitors = if Dir=:=fwd -> Monitors0; true -> lists:reverse(Monitors0) end,
+ receive {'DOWN', Ref0, _, _, {ok, TotalRows}} ->
+ {ok, Acc1} = Callback({total_and_offset, TotalRows, 0}, Acc0),
+ {ok, Acc2} = doc_receive_loop(Monitors, Skip0, Limit0, Callback, Acc1),
+ Callback(complete, Acc2)
+ after 10000 ->
+ Callback(timeout, Acc0)
+ end.
+
+handle_message({rexi_DOWN, _, _, _}, nil, State) ->
+ % TODO see if progress can be made here, possibly by removing all shards
+ % from that node and checking is_progress_possible
+ {ok, State};
+
+handle_message({rexi_EXIT, _}, Worker, State) ->
+ #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State,
+ Counters = fabric_dict:erase(Worker, Counters0),
+ case fabric_view:is_progress_possible(Counters) of
+ true ->
+ {ok, State#collector{counters = Counters}};
+ false ->
+ Callback({error, dead_shards}, Acc),
+ {error, dead_shards}
+ end;
+
+handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) ->
+ #collector{
+ callback = Callback,
+ counters = Counters0,
+ total_rows = Total0,
+ offset = Offset0,
+ user_acc = AccIn
+ } = State,
+ case fabric_dict:lookup_element(Worker, Counters0) of
+ undefined ->
+ % this worker lost the race with other partition copies, terminate
+ gen_server:reply(From, stop),
+ {ok, State};
+ 0 ->
+ gen_server:reply(From, ok),
+ Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
+ Counters2 = fabric_view:remove_overlapping_shards(Worker, Counters1),
+ Total = Total0 + Tot,
+ Offset = Offset0 + Off,
+ case fabric_dict:any(0, Counters2) of
+ true ->
+ {ok, State#collector{
+ counters = Counters2,
+ total_rows = Total,
+ offset = Offset
+ }};
+ false ->
+ FinalOffset = erlang:min(Total, Offset+State#collector.skip),
+ {Go, Acc} = Callback({total_and_offset, Total, FinalOffset}, AccIn),
+ {Go, State#collector{
+ counters = fabric_dict:decrement_all(Counters2),
+ total_rows = Total,
+ offset = FinalOffset,
+ user_acc = Acc
+ }}
+ end
+ end;
+
+handle_message(#view_row{} = Row, {Worker, From}, State) ->
+ #collector{query_args = Args, counters = Counters0, rows = Rows0} = State,
+ Dir = Args#view_query_args.direction,
+ Rows = merge_row(Dir, Row#view_row{worker=Worker}, Rows0),
+ Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
+ State1 = State#collector{rows=Rows, counters=Counters1},
+ State2 = fabric_view:maybe_pause_worker(Worker, From, State1),
+ fabric_view:maybe_send_row(State2);
+
+handle_message(complete, Worker, State) ->
+ Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
+ fabric_view:maybe_send_row(State#collector{counters = Counters}).
+
+
+merge_row(fwd, Row, Rows) ->
+ lists:keymerge(#view_row.id, [Row], Rows);
+merge_row(rev, Row, Rows) ->
+ lists:rkeymerge(#view_row.id, [Row], Rows).
+
+doc_receive_loop([], _, _, _, Acc) ->
+ {ok, Acc};
+doc_receive_loop(_, _, 0, _, Acc) ->
+ {ok, Acc};
+doc_receive_loop([{Pid,Ref}|Rest], Skip, Limit, Callback, Acc) when Skip > 0 ->
+ receive {'DOWN', Ref, process, Pid, #view_row{}} ->
+ doc_receive_loop(Rest, Skip-1, Limit-1, Callback, Acc)
+ after 10000 ->
+ timeout
+ end;
+doc_receive_loop([{Pid,Ref}|Rest], 0, Limit, Callback, AccIn) ->
+ receive {'DOWN', Ref, process, Pid, #view_row{} = Row} ->
+ case Callback(fabric_view:transform_row(Row), AccIn) of
+ {ok, Acc} ->
+ doc_receive_loop(Rest, 0, Limit-1, Callback, Acc);
+ {stop, Acc} ->
+ {ok, Acc}
+ end
+ after 10000 ->
+ timeout
+ end.
+
+open_doc(DbName, Id, IncludeDocs) ->
+ Row = case fabric:open_doc(DbName, Id, [deleted]) of
+ {not_found, missing} ->
+ Doc = undefined,
+ #view_row{key=Id};
+ {ok, #doc{deleted=true, revs=Revs}} ->
+ Doc = null,
+ {RevPos, [RevId|_]} = Revs,
+ Value = {[{rev,couch_doc:rev_to_str({RevPos, RevId})}, {deleted,true}]},
+ #view_row{key=Id, id=Id, value=Value};
+ {ok, #doc{revs=Revs} = Doc0} ->
+ Doc = couch_doc:to_json_obj(Doc0, []),
+ {RevPos, [RevId|_]} = Revs,
+ Value = {[{rev,couch_doc:rev_to_str({RevPos, RevId})}]},
+ #view_row{key=Id, id=Id, value=Value}
+ end,
+ exit(if IncludeDocs -> Row#view_row{doc=Doc}; true -> Row end).
diff --git a/apps/fabric/src/fabric_view_changes.erl b/apps/fabric/src/fabric_view_changes.erl
new file mode 100644
index 00000000..6030df1d
--- /dev/null
+++ b/apps/fabric/src/fabric_view_changes.erl
@@ -0,0 +1,251 @@
+-module(fabric_view_changes).
+
+-export([go/5, start_update_notifier/1]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+go(DbName, Feed, Options, Callback, Acc0) when Feed == "continuous" orelse
+ Feed == "longpoll" ->
+ Args = make_changes_args(Options),
+ {ok, Acc} = Callback(start, Acc0),
+ Notifiers = start_update_notifiers(DbName),
+ {Timeout, TimeoutFun} = couch_changes:get_changes_timeout(Args, Callback),
+ try
+ keep_sending_changes(
+ DbName,
+ Args,
+ Callback,
+ get_start_seq(DbName, Args),
+ Acc,
+ Timeout,
+ TimeoutFun
+ )
+ after
+ stop_update_notifiers(Notifiers),
+ couch_changes:get_rest_db_updated()
+ end;
+
+go(DbName, "normal", Options, Callback, Acc0) ->
+ Args = make_changes_args(Options),
+ {ok, Acc} = Callback(start, Acc0),
+ {ok, #collector{counters=Seqs, user_acc=AccOut}} = send_changes(
+ DbName,
+ Args,
+ Callback,
+ get_start_seq(DbName, Args),
+ Acc
+ ),
+ Callback({stop, pack_seqs(Seqs)}, AccOut).
+
+keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, TFun) ->
+ #changes_args{limit=Limit, feed=Feed} = Args,
+ {ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn),
+ #collector{limit=Limit2, counters=NewSeqs, user_acc=AccOut} = Collector,
+ LastSeq = pack_seqs(NewSeqs),
+ if Limit > Limit2, Feed == "longpoll" ->
+ Callback({stop, LastSeq}, AccOut);
+ true ->
+ case couch_changes:wait_db_updated(Timeout, TFun) of
+ updated ->
+ keep_sending_changes(
+ DbName,
+ Args#changes_args{limit=Limit2},
+ Callback,
+ LastSeq,
+ AccIn,
+ Timeout,
+ TFun
+ );
+ stop ->
+ Callback({stop, LastSeq}, AccOut)
+ end
+ end.
+
+send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn) ->
+ AllShards = mem3:shards(DbName),
+ Seqs = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, Seq}) ->
+ case lists:member(Shard, AllShards) of
+ true ->
+ Ref = rexi:cast(N, {fabric_rpc, changes, [Name,ChangesArgs,Seq]}),
+ [{Shard#shard{ref = Ref}, Seq}];
+ false ->
+ % Find some replacement shards to cover the missing range
+ % TODO It's possible in rare cases of shard merging to end up
+ % with overlapping shard ranges from this technique
+ lists:map(fun(#shard{name=Name2, node=N2} = NewShard) ->
+ Ref = rexi:cast(N2, {fabric_rpc, changes, [Name2,ChangesArgs,0]}),
+ {NewShard#shard{ref = Ref}, 0}
+ end, find_replacement_shards(Shard, AllShards))
+ end
+ end, unpack_seqs(PackedSeqs, DbName)),
+ {Workers, _} = lists:unzip(Seqs),
+ State = #collector{
+ query_args = ChangesArgs,
+ callback = Callback,
+ counters = fabric_dict:init(Workers, 0),
+ user_acc = AccIn,
+ limit = ChangesArgs#changes_args.limit,
+ rows = Seqs % store sequence positions instead
+ },
+ try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3,
+ State, infinity, 5000)
+ after
+ fabric_util:cleanup(Workers)
+ end.
+
+handle_message({rexi_DOWN, _, _, _}, nil, State) ->
+ % TODO see if progress can be made here, possibly by removing all shards
+ % from that node and checking is_progress_possible
+ {ok, State};
+
+handle_message({rexi_EXIT, Reason}, Worker, State) ->
+ ?LOG_ERROR("~p rexi_EXIT ~p", [?MODULE, Reason]),
+ #collector{
+ callback=Callback,
+ counters=Counters0,
+ rows = Seqs0,
+ user_acc=Acc
+ } = State,
+ Counters = fabric_dict:erase(Worker, Counters0),
+ Seqs = fabric_dict:erase(Worker, Seqs0),
+ case fabric_view:is_progress_possible(Counters) of
+ true ->
+ {ok, State#collector{counters = Counters, rows=Seqs}};
+ false ->
+ Callback({error, dead_shards}, Acc),
+ {error, dead_shards}
+ end;
+
+handle_message(_, _, #collector{limit=0} = State) ->
+ {stop, State};
+
+handle_message(#view_row{key=Seq} = Row0, {Worker, From}, St) ->
+ #collector{
+ query_args = #changes_args{include_docs=IncludeDocs},
+ callback = Callback,
+ counters = S0,
+ limit = Limit,
+ user_acc = AccIn
+ } = St,
+ case fabric_dict:lookup_element(Worker, S0) of
+ undefined ->
+ % this worker lost the race with other partition copies, terminate it
+ gen_server:reply(From, stop),
+ {ok, St};
+ _ ->
+ S1 = fabric_dict:store(Worker, Seq, S0),
+ S2 = fabric_view:remove_overlapping_shards(Worker, S1),
+ Row = Row0#view_row{key = pack_seqs(S2)},
+ {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn),
+ gen_server:reply(From, Go),
+ {Go, St#collector{counters=S2, limit=Limit-1, user_acc=Acc}}
+ end;
+
+handle_message({complete, EndSeq}, Worker, State) ->
+ #collector{
+ counters = S0,
+ total_rows = Completed % override
+ } = State,
+ case fabric_dict:lookup_element(Worker, S0) of
+ undefined ->
+ {ok, State};
+ _ ->
+ S1 = fabric_dict:store(Worker, EndSeq, S0),
+ % unlikely to have overlaps here, but possible w/ filters
+ S2 = fabric_view:remove_overlapping_shards(Worker, S1),
+ NewState = State#collector{counters=S2, total_rows=Completed+1},
+ case fabric_dict:size(S2) =:= (Completed+1) of
+ true ->
+ {stop, NewState};
+ false ->
+ {ok, NewState}
+ end
+ end.
+
+make_changes_args(#changes_args{style=main_only, filter=undefined}=Args) ->
+ Args#changes_args{filter = fun couch_changes:main_only_filter/1};
+make_changes_args(#changes_args{style=all_docs, filter=undefined}=Args) ->
+ Args#changes_args{filter = fun couch_changes:all_docs_filter/1};
+make_changes_args(Args) ->
+ Args.
+
+get_start_seq(_DbName, #changes_args{dir=fwd, since=Since}) ->
+ Since;
+get_start_seq(DbName, #changes_args{dir=rev}) ->
+ Shards = mem3:shards(DbName),
+ Workers = fabric_util:submit_jobs(Shards, get_update_seq, []),
+ {ok, Since} = fabric_util:recv(Workers, #shard.ref,
+ fun collect_update_seqs/3, fabric_dict:init(Workers, -1)),
+ Since.
+
+collect_update_seqs(Seq, Shard, Counters) when is_integer(Seq) ->
+ case fabric_dict:lookup_element(Shard, Counters) of
+ undefined ->
+ % already heard from someone else in this range
+ {ok, Counters};
+ -1 ->
+ C1 = fabric_dict:store(Shard, Seq, Counters),
+ C2 = fabric_view:remove_overlapping_shards(Shard, C1),
+ case fabric_dict:any(-1, C2) of
+ true ->
+ {ok, C2};
+ false ->
+ {stop, pack_seqs(C2)}
+ end
+ end.
+
+pack_seqs(Workers) ->
+ SeqList = [{N,R,S} || {#shard{node=N, range=R}, S} <- Workers],
+ SeqSum = lists:sum(element(2, lists:unzip(Workers))),
+ Opaque = couch_util:encodeBase64Url(term_to_binary(SeqList, [compressed])),
+ list_to_binary([integer_to_list(SeqSum), $-, Opaque]).
+
+unpack_seqs(0, DbName) ->
+ fabric_dict:init(mem3:shards(DbName), 0);
+
+unpack_seqs("0", DbName) ->
+ fabric_dict:init(mem3:shards(DbName), 0);
+
+unpack_seqs(Packed, DbName) ->
+ {match, [Opaque]} = re:run(Packed, "^([0-9]+-)?(?<opaque>.*)", [{capture,
+ [opaque], binary}]),
+ % TODO relies on internal structure of fabric_dict as keylist
+ lists:map(fun({Node, [A,B], Seq}) ->
+ Shard = #shard{node=Node, range=[A,B], dbname=DbName},
+ {mem3_util:name_shard(Shard), Seq}
+ end, binary_to_term(couch_util:decodeBase64Url(Opaque))).
+
+start_update_notifiers(DbName) ->
+ lists:map(fun(#shard{node=Node, name=Name}) ->
+ {Node, rexi:cast(Node, {?MODULE, start_update_notifier, [Name]})}
+ end, mem3:shards(DbName)).
+
+% rexi endpoint
+start_update_notifier(DbName) ->
+ {Caller, _} = get(rexi_from),
+ Fun = fun({_, X}) when X == DbName -> Caller ! db_updated; (_) -> ok end,
+ Id = {couch_db_update_notifier, make_ref()},
+ ok = gen_event:add_sup_handler(couch_db_update, Id, Fun),
+ receive {gen_event_EXIT, Id, Reason} ->
+ rexi:reply({gen_event_EXIT, DbName, Reason})
+ end.
+
+stop_update_notifiers(Notifiers) ->
+ [rexi:kill(Node, Ref) || {Node, Ref} <- Notifiers].
+
+changes_row(#view_row{key=Seq, id=Id, value=Value, doc=deleted}, true) ->
+ {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}, {doc, null}]}};
+changes_row(#view_row{key=Seq, id=Id, value=Value, doc=deleted}, false) ->
+ {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}]}};
+changes_row(#view_row{key=Seq, id=Id, value=Value, doc={error,Reason}}, true) ->
+ {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {error,Reason}]}};
+changes_row(#view_row{key=Seq, id=Id, value=Value, doc=Doc}, true) ->
+ {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {doc,Doc}]}};
+changes_row(#view_row{key=Seq, id=Id, value=Value}, false) ->
+ {change, {[{seq,Seq}, {id,Id}, {changes,Value}]}}.
+
+find_replacement_shards(#shard{range=Range}, AllShards) ->
+ % TODO make this moar betta -- we might have split or merged the partition
+ [Shard || Shard <- AllShards, Shard#shard.range =:= Range].
diff --git a/apps/fabric/src/fabric_view_map.erl b/apps/fabric/src/fabric_view_map.erl
new file mode 100644
index 00000000..ce8dd625
--- /dev/null
+++ b/apps/fabric/src/fabric_view_map.erl
@@ -0,0 +1,138 @@
+-module(fabric_view_map).
+
+-export([go/6]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
+ {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
+ go(DbName, DDoc, View, Args, Callback, Acc0);
+
+go(DbName, DDoc, View, Args, Callback, Acc0) ->
+ Workers = lists:map(fun(#shard{name=Name, node=Node} = Shard) ->
+ Ref = rexi:cast(Node, {fabric_rpc, map_view, [Name, DDoc, View, Args]}),
+ Shard#shard{ref = Ref}
+ end, mem3:shards(DbName)),
+ BufferSize = couch_config:get("fabric", "map_buffer_size", "2"),
+ #view_query_args{limit = Limit, skip = Skip, keys = Keys} = Args,
+ State = #collector{
+ query_args = Args,
+ callback = Callback,
+ buffer_size = list_to_integer(BufferSize),
+ counters = fabric_dict:init(Workers, 0),
+ skip = Skip,
+ limit = Limit,
+ keys = fabric_view:keydict(Keys),
+ sorted = Args#view_query_args.sorted,
+ user_acc = Acc0
+ },
+ try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3,
+ State, infinity, 1000 * 60 * 60) of
+ {ok, NewState} ->
+ {ok, NewState#collector.user_acc};
+ Error ->
+ Error
+ after
+ fabric_util:cleanup(Workers)
+ end.
+
+handle_message({rexi_DOWN, _, _, _}, nil, State) ->
+ % TODO see if progress can be made here, possibly by removing all shards
+ % from that node and checking is_progress_possible
+ {ok, State};
+
+handle_message({rexi_EXIT, Reason}, Worker, State) ->
+ ?LOG_ERROR("~p rexi_EXIT ~p", [?MODULE, Reason]),
+ #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State,
+ Counters = fabric_dict:erase(Worker, Counters0),
+ case fabric_view:is_progress_possible(Counters) of
+ true ->
+ {ok, State#collector{counters = Counters}};
+ false ->
+ Callback({error, dead_shards}, Acc),
+ {error, dead_shards}
+ end;
+
+handle_message({total_and_offset, Tot, Off}, {Worker, From}, State) ->
+ #collector{
+ callback = Callback,
+ counters = Counters0,
+ total_rows = Total0,
+ offset = Offset0,
+ user_acc = AccIn
+ } = State,
+ case fabric_dict:lookup_element(Worker, Counters0) of
+ undefined ->
+ % this worker lost the race with other partition copies, terminate
+ gen_server:reply(From, stop),
+ {ok, State};
+ 0 ->
+ gen_server:reply(From, ok),
+ Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
+ Counters2 = fabric_view:remove_overlapping_shards(Worker, Counters1),
+ Total = Total0 + Tot,
+ Offset = Offset0 + Off,
+ case fabric_dict:any(0, Counters2) of
+ true ->
+ {ok, State#collector{
+ counters = Counters2,
+ total_rows = Total,
+ offset = Offset
+ }};
+ false ->
+ FinalOffset = erlang:min(Total, Offset+State#collector.skip),
+ {Go, Acc} = Callback({total_and_offset, Total, FinalOffset}, AccIn),
+ {Go, State#collector{
+ counters = fabric_dict:decrement_all(Counters2),
+ total_rows = Total,
+ offset = FinalOffset,
+ user_acc = Acc
+ }}
+ end
+ end;
+
+handle_message(#view_row{}, {_, _}, #collector{limit=0} = State) ->
+ #collector{callback=Callback} = State,
+ {_, Acc} = Callback(complete, State#collector.user_acc),
+ {stop, State#collector{user_acc=Acc}};
+
+handle_message(#view_row{} = Row, {_,From}, #collector{sorted=false} = St) ->
+ #collector{callback=Callback, user_acc=AccIn, limit=Limit} = St,
+ {Go, Acc} = Callback(fabric_view:transform_row(Row), AccIn),
+ gen_server:reply(From, ok),
+ {Go, St#collector{user_acc=Acc, limit=Limit-1}};
+
+handle_message(#view_row{} = Row, {Worker, From}, State) ->
+ #collector{
+ query_args = #view_query_args{direction=Dir},
+ counters = Counters0,
+ rows = Rows0,
+ keys = KeyDict
+ } = State,
+ Rows = merge_row(Dir, KeyDict, Row#view_row{worker=Worker}, Rows0),
+ Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
+ State1 = State#collector{rows=Rows, counters=Counters1},
+ State2 = fabric_view:maybe_pause_worker(Worker, From, State1),
+ fabric_view:maybe_send_row(State2);
+
+handle_message(complete, Worker, State) ->
+ Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
+ fabric_view:maybe_send_row(State#collector{counters = Counters}).
+
+merge_row(fwd, undefined, Row, Rows) ->
+ lists:merge(fun(#view_row{key=KeyA, id=IdA}, #view_row{key=KeyB, id=IdB}) ->
+ couch_view:less_json([KeyA, IdA], [KeyB, IdB])
+ end, [Row], Rows);
+merge_row(rev, undefined, Row, Rows) ->
+ lists:merge(fun(#view_row{key=KeyA, id=IdA}, #view_row{key=KeyB, id=IdB}) ->
+ couch_view:less_json([KeyB, IdB], [KeyA, IdA])
+ end, [Row], Rows);
+merge_row(_, KeyDict, Row, Rows) ->
+ lists:merge(fun(#view_row{key=A, id=IdA}, #view_row{key=B, id=IdB}) ->
+ if A =:= B -> IdA < IdB; true ->
+ dict:fetch(A, KeyDict) < dict:fetch(B, KeyDict)
+ end
+ end, [Row], Rows).
+
diff --git a/apps/fabric/src/fabric_view_reduce.erl b/apps/fabric/src/fabric_view_reduce.erl
new file mode 100644
index 00000000..ddde9f22
--- /dev/null
+++ b/apps/fabric/src/fabric_view_reduce.erl
@@ -0,0 +1,85 @@
+-module(fabric_view_reduce).
+
+-export([go/6]).
+
+-include("fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
+ {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
+ go(DbName, DDoc, View, Args, Callback, Acc0);
+
+go(DbName, DDoc, VName, Args, Callback, Acc0) ->
+ #group{def_lang=Lang, views=Views} = Group =
+ couch_view_group:design_doc_to_view_group(#db{name=DbName}, DDoc),
+ {NthRed, View} = fabric_view:extract_view(nil, VName, Views, reduce),
+ {VName, RedSrc} = lists:nth(NthRed, View#view.reduce_funs),
+ Workers = lists:map(fun(#shard{name=Name, node=N} = Shard) ->
+ Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,Group,VName,Args]}),
+ Shard#shard{ref = Ref}
+ end, mem3:shards(DbName)),
+ BufferSize = couch_config:get("fabric", "reduce_buffer_size", "20"),
+ #view_query_args{limit = Limit, skip = Skip} = Args,
+ State = #collector{
+ query_args = Args,
+ callback = Callback,
+ buffer_size = list_to_integer(BufferSize),
+ counters = fabric_dict:init(Workers, 0),
+ keys = Args#view_query_args.keys,
+ skip = Skip,
+ limit = Limit,
+ lang = Group#group.def_lang,
+ os_proc = couch_query_servers:get_os_process(Lang),
+ reducer = RedSrc,
+ rows = dict:new(),
+ user_acc = Acc0
+ },
+ try fabric_util:receive_loop(Workers, #shard.ref, fun handle_message/3,
+ State, infinity, 1000 * 60 * 60) of
+ {ok, NewState} ->
+ {ok, NewState#collector.user_acc};
+ Error ->
+ Error
+ after
+ fabric_util:cleanup(Workers),
+ catch couch_query_servers:ret_os_process(State#collector.os_proc)
+ end.
+
+handle_message({rexi_DOWN, _, _, _}, nil, State) ->
+ % TODO see if progress can be made here, possibly by removing all shards
+ % from that node and checking is_progress_possible
+ {ok, State};
+
+handle_message({rexi_EXIT, Reason}, Worker, State) ->
+ ?LOG_ERROR("~p rexi_EXIT ~p", [?MODULE, Reason]),
+ #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State,
+ Counters = fabric_dict:erase(Worker, Counters0),
+ case fabric_view:is_progress_possible(Counters) of
+ true ->
+ {ok, State#collector{counters = Counters}};
+ false ->
+ Callback({error, dead_shards}, Acc),
+ {error, dead_shards}
+ end;
+
+handle_message(#view_row{key=Key} = Row, {Worker, From}, State) ->
+ #collector{counters = Counters0, rows = Rows0} = State,
+ case fabric_dict:lookup_element(Worker, Counters0) of
+ undefined ->
+ % this worker lost the race with other partition copies, terminate it
+ gen_server:reply(From, stop),
+ {ok, State};
+ _ ->
+ Rows = dict:append(Key, Row#view_row{worker=Worker}, Rows0),
+ C1 = fabric_dict:update_counter(Worker, 1, Counters0),
+ % TODO time this call, if slow don't do it every time
+ C2 = fabric_view:remove_overlapping_shards(Worker, C1),
+ State1 = State#collector{rows=Rows, counters=C2},
+ State2 = fabric_view:maybe_pause_worker(Worker, From, State1),
+ fabric_view:maybe_send_row(State2)
+ end;
+
+handle_message(complete, Worker, State) ->
+ Counters = fabric_dict:update_counter(Worker, 1, State#collector.counters),
+ fabric_view:maybe_send_row(State#collector{counters = Counters}).