author | Robert Newson <rnewson@apache.org> | 2011-05-17 11:15:14 +0000 |
---|---|---|
committer | Robert Newson <rnewson@apache.org> | 2011-05-17 11:15:14 +0000 |
commit | e8e4b0d293021fe90326a85828f3cfb087bf18b7 | |
tree | 986f544eac623ec23b769b36828894f93a173aa3 /1.1.x/src/couchdb/couch_query_servers.erl | |
parent | da6a5322b0b8084f434752060caa8be214c6f4fa | |
tagging 1.1.0
git-svn-id: https://svn.apache.org/repos/asf/couchdb/tags/1.1.0@1104149 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to '1.1.x/src/couchdb/couch_query_servers.erl')
-rw-r--r-- | 1.1.x/src/couchdb/couch_query_servers.erl | 589 |
1 file changed, 589 insertions, 0 deletions
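For context when reading `init/1` in the diff below: the server builds its language table from two config sections, `query_servers` (an OS command line per language) and `native_query_servers` (an Erlang `{Mod, Fun, Arg}` spec per language). The sketch below shows only the data shapes the code expects; the command path and module name are illustrative placeholders, not values taken from this commit, and the calls assume a running CouchDB 1.1.x node.

```erlang
%% Illustrative only -- the concrete command line and native module are
%% hypothetical examples; the real entries come from the node's .ini files.
QueryServers = couch_config:get("query_servers"),
%% e.g. [{"javascript", "/usr/local/bin/couchjs /path/to/main.js"}]
%% init/1 stores each entry as {Lang, couch_os_process, start_link, [Command]}.
NativeQueryServers = couch_config:get("native_query_servers"),
%% e.g. [{"erlang", "{couch_native_process, start_link, []}"}]
%% each value is parsed with couch_util:parse_term/1 into {Mod, Fun, Arg}.
```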
diff --git a/1.1.x/src/couchdb/couch_query_servers.erl b/1.1.x/src/couchdb/couch_query_servers.erl
new file mode 100644
index 00000000..b0e46937
--- /dev/null
+++ b/1.1.x/src/couchdb/couch_query_servers.erl
@@ -0,0 +1,589 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_query_servers).
+-behaviour(gen_server).
+
+-export([start_link/0]).
+
+-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3]).
+-export([start_doc_map/3, map_docs/2, stop_doc_map/1]).
+-export([reduce/3, rereduce/3,validate_doc_update/5]).
+-export([filter_docs/5]).
+
+-export([with_ddoc_proc/2, proc_prompt/2, ddoc_prompt/3, ddoc_proc_prompt/3, json_doc/1]).
+
+% -export([test/0]).
+
+-include("couch_db.hrl").
+
+-record(proc, {
+    pid,
+    lang,
+    ddoc_keys = [],
+    prompt_fun,
+    set_timeout_fun,
+    stop_fun
+}).
+
+-record(qserver, {
+    langs, % Keyed by language name, value is {Mod,Func,Arg}
+    pid_procs, % Keyed by PID, value is a #proc record.
+    lang_procs, % Keyed by language name, value is a #proc record
+    lang_limits, % Keyed by language name, value is {Lang, Limit, Current}
+    waitlist = [],
+    config
+}).
+
+start_link() ->
+    gen_server:start_link({local, couch_query_servers}, couch_query_servers, [], []).
+
+start_doc_map(Lang, Functions, Lib) ->
+    Proc = get_os_process(Lang),
+    case Lib of
+    {[]} -> ok;
+    Lib ->
+        true = proc_prompt(Proc, [<<"add_lib">>, Lib])
+    end,
+    lists:foreach(fun(FunctionSource) ->
+        true = proc_prompt(Proc, [<<"add_fun">>, FunctionSource])
+    end, Functions),
+    {ok, Proc}.
+
+map_docs(Proc, Docs) ->
+    % send the documents
+    Results = lists:map(
+        fun(Doc) ->
+            Json = couch_doc:to_json_obj(Doc, []),
+
+            FunsResults = proc_prompt(Proc, [<<"map_doc">>, Json]),
+            % the results are a json array of function map yields like this:
+            % [FunResults1, FunResults2 ...]
+            % where FunResults are json arrays of key value pairs:
+            % [[Key1, Value1], [Key2, Value2]]
+            % Convert the key, value pairs to tuples like
+            % [{Key1, Value1}, {Key2, Value2}]
+            lists:map(
+                fun(FunRs) ->
+                    [list_to_tuple(FunResult) || FunResult <- FunRs]
+                end,
+            FunsResults)
+        end,
+        Docs),
+    {ok, Results}.
+
+
+stop_doc_map(nil) ->
+    ok;
+stop_doc_map(Proc) ->
+    ok = ret_os_process(Proc).
+
+group_reductions_results([]) ->
+    [];
+group_reductions_results(List) ->
+    {Heads, Tails} = lists:foldl(
+        fun([H|T], {HAcc,TAcc}) ->
+            {[H|HAcc], [T|TAcc]}
+        end, {[], []}, List),
+    case Tails of
+    [[]|_] -> % no tails left
+        [Heads];
+    _ ->
+        [Heads | group_reductions_results(Tails)]
+    end.
+
+rereduce(_Lang, [], _ReducedValues) ->
+    {ok, []};
+rereduce(Lang, RedSrcs, ReducedValues) ->
+    Grouped = group_reductions_results(ReducedValues),
+    Results = lists:zipwith(
+        fun
+        (<<"_", _/binary>> = FunSrc, Values) ->
+            {ok, [Result]} = builtin_reduce(rereduce, [FunSrc], [[[], V] || V <- Values], []),
+            Result;
+        (FunSrc, Values) ->
+            os_rereduce(Lang, [FunSrc], Values)
+        end, RedSrcs, Grouped),
+    {ok, Results}.
+
+reduce(_Lang, [], _KVs) ->
+    {ok, []};
+reduce(Lang, RedSrcs, KVs) ->
+    {OsRedSrcs, BuiltinReds} = lists:partition(fun
+        (<<"_", _/binary>>) -> false;
+        (_OsFun) -> true
+    end, RedSrcs),
+    {ok, OsResults} = os_reduce(Lang, OsRedSrcs, KVs),
+    {ok, BuiltinResults} = builtin_reduce(reduce, BuiltinReds, KVs, []),
+    recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, []).
+
+recombine_reduce_results([], [], [], Acc) ->
+    {ok, lists:reverse(Acc)};
+recombine_reduce_results([<<"_", _/binary>>|RedSrcs], OsResults, [BRes|BuiltinResults], Acc) ->
+    recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, [BRes|Acc]);
+recombine_reduce_results([_OsFun|RedSrcs], [OsR|OsResults], BuiltinResults, Acc) ->
+    recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, [OsR|Acc]).
+
+os_reduce(_Lang, [], _KVs) ->
+    {ok, []};
+os_reduce(Lang, OsRedSrcs, KVs) ->
+    Proc = get_os_process(Lang),
+    OsResults = try proc_prompt(Proc, [<<"reduce">>, OsRedSrcs, KVs]) of
+        [true, Reductions] -> Reductions
+    after
+        ok = ret_os_process(Proc)
+    end,
+    {ok, OsResults}.
+
+os_rereduce(Lang, OsRedSrcs, KVs) ->
+    Proc = get_os_process(Lang),
+    try proc_prompt(Proc, [<<"rereduce">>, OsRedSrcs, KVs]) of
+        [true, [Reduction]] -> Reduction
+    after
+        ok = ret_os_process(Proc)
+    end.
+
+
+builtin_reduce(_Re, [], _KVs, Acc) ->
+    {ok, lists:reverse(Acc)};
+builtin_reduce(Re, [<<"_sum",_/binary>>|BuiltinReds], KVs, Acc) ->
+    Sum = builtin_sum_rows(KVs),
+    builtin_reduce(Re, BuiltinReds, KVs, [Sum|Acc]);
+builtin_reduce(reduce, [<<"_count",_/binary>>|BuiltinReds], KVs, Acc) ->
+    Count = length(KVs),
+    builtin_reduce(reduce, BuiltinReds, KVs, [Count|Acc]);
+builtin_reduce(rereduce, [<<"_count",_/binary>>|BuiltinReds], KVs, Acc) ->
+    Count = builtin_sum_rows(KVs),
+    builtin_reduce(rereduce, BuiltinReds, KVs, [Count|Acc]);
+builtin_reduce(Re, [<<"_stats",_/binary>>|BuiltinReds], KVs, Acc) ->
+    Stats = builtin_stats(Re, KVs),
+    builtin_reduce(Re, BuiltinReds, KVs, [Stats|Acc]).
+
+builtin_sum_rows(KVs) ->
+    lists:foldl(fun
+        ([_Key, Value], Acc) when is_number(Value), is_number(Acc) ->
+            Acc + Value;
+        ([_Key, Value], Acc) when is_list(Value), is_list(Acc) ->
+            sum_terms(Acc, Value);
+        ([_Key, Value], Acc) when is_number(Value), is_list(Acc) ->
+            sum_terms(Acc, [Value]);
+        ([_Key, Value], Acc) when is_list(Value), is_number(Acc) ->
+            sum_terms([Acc], Value);
+        (_Else, _Acc) ->
+            throw({invalid_value, <<"builtin _sum function requires map values to be numbers or lists of numbers">>})
+    end, 0, KVs).
+
+sum_terms([], []) ->
+    [];
+sum_terms([_|_]=Xs, []) ->
+    Xs;
+sum_terms([], [_|_]=Ys) ->
+    Ys;
+sum_terms([X|Xs], [Y|Ys]) when is_number(X), is_number(Y) ->
+    [X+Y | sum_terms(Xs,Ys)];
+sum_terms(_, _) ->
+    throw({invalid_value, <<"builtin _sum function requires map values to be numbers or lists of numbers">>}).
+
+builtin_stats(reduce, [[_,First]|Rest]) when is_number(First) ->
+    Stats = lists:foldl(fun([_K,V], {S,C,Mi,Ma,Sq}) when is_number(V) ->
+        {S+V, C+1, lists:min([Mi, V]), lists:max([Ma, V]), Sq+(V*V)};
+    (_, _) ->
+        throw({invalid_value,
+            <<"builtin _stats function requires map values to be numbers">>})
+    end, {First,1,First,First,First*First}, Rest),
+    {Sum, Cnt, Min, Max, Sqr} = Stats,
+    {[{sum,Sum}, {count,Cnt}, {min,Min}, {max,Max}, {sumsqr,Sqr}]};
+
+builtin_stats(rereduce, [[_,First]|Rest]) ->
+    {[{sum,Sum0}, {count,Cnt0}, {min,Min0}, {max,Max0}, {sumsqr,Sqr0}]} = First,
+    Stats = lists:foldl(fun([_K,Red], {S,C,Mi,Ma,Sq}) ->
+        {[{sum,Sum}, {count,Cnt}, {min,Min}, {max,Max}, {sumsqr,Sqr}]} = Red,
+        {Sum+S, Cnt+C, lists:min([Min, Mi]), lists:max([Max, Ma]), Sqr+Sq}
+    end, {Sum0,Cnt0,Min0,Max0,Sqr0}, Rest),
+    {Sum, Cnt, Min, Max, Sqr} = Stats,
+    {[{sum,Sum}, {count,Cnt}, {min,Min}, {max,Max}, {sumsqr,Sqr}]}.
+
+% use the function stored in ddoc.validate_doc_update to test an update.
+validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx, SecObj) ->
+    JsonEditDoc = couch_doc:to_json_obj(EditDoc, [revs]),
+    JsonDiskDoc = json_doc(DiskDoc),
+    case ddoc_prompt(DDoc, [<<"validate_doc_update">>], [JsonEditDoc, JsonDiskDoc, Ctx, SecObj]) of
+        1 ->
+            ok;
+        {[{<<"forbidden">>, Message}]} ->
+            throw({forbidden, Message});
+        {[{<<"unauthorized">>, Message}]} ->
+            throw({unauthorized, Message})
+    end.
+
+json_doc(nil) -> null;
+json_doc(Doc) ->
+    couch_doc:to_json_obj(Doc, [revs]).
+
+filter_docs(Req, Db, DDoc, FName, Docs) ->
+    JsonReq = case Req of
+    {json_req, JsonObj} ->
+        JsonObj;
+    #httpd{} = HttpReq ->
+        couch_httpd_external:json_req_obj(HttpReq, Db)
+    end,
+    JsonDocs = [couch_doc:to_json_obj(Doc, [revs]) || Doc <- Docs],
+    [true, Passes] = ddoc_prompt(DDoc, [<<"filters">>, FName], [JsonDocs, JsonReq]),
+    {ok, Passes}.
+
+ddoc_proc_prompt({Proc, DDocId}, FunPath, Args) ->
+    proc_prompt(Proc, [<<"ddoc">>, DDocId, FunPath, Args]).
+
+ddoc_prompt(DDoc, FunPath, Args) ->
+    with_ddoc_proc(DDoc, fun({Proc, DDocId}) ->
+        proc_prompt(Proc, [<<"ddoc">>, DDocId, FunPath, Args])
+    end).
+
+with_ddoc_proc(#doc{id=DDocId,revs={Start, [DiskRev|_]}}=DDoc, Fun) ->
+    Rev = couch_doc:rev_to_str({Start, DiskRev}),
+    DDocKey = {DDocId, Rev},
+    Proc = get_ddoc_process(DDoc, DDocKey),
+    try Fun({Proc, DDocId})
+    after
+        ok = ret_os_process(Proc)
+    end.
+
+init([]) ->
+    % read config and register for configuration changes
+
+    % just stop if one of the config settings change. couch_server_sup
+    % will restart us and then we will pick up the new settings.
+
+    ok = couch_config:register(
+        fun("query_servers" ++ _, _) ->
+            supervisor:terminate_child(couch_secondary_services, query_servers),
+            supervisor:restart_child(couch_secondary_services, query_servers)
+        end),
+    ok = couch_config:register(
+        fun("native_query_servers" ++ _, _) ->
+            supervisor:terminate_child(couch_secondary_services, query_servers),
+            [supervisor:restart_child(couch_secondary_services, query_servers)]
+        end),
+    ok = couch_config:register(
+        fun("query_server_config" ++ _, _) ->
+            supervisor:terminate_child(couch_secondary_services, query_servers),
+            supervisor:restart_child(couch_secondary_services, query_servers)
+        end),
+
+    Langs = ets:new(couch_query_server_langs, [set, private]),
+    LangLimits = ets:new(couch_query_server_lang_limits, [set, private]),
+    PidProcs = ets:new(couch_query_server_pid_langs, [set, private]),
+    LangProcs = ets:new(couch_query_server_procs, [set, private]),
+
+    ProcTimeout = list_to_integer(couch_config:get(
+        "couchdb", "os_process_timeout", "5000")),
+    ReduceLimit = list_to_atom(
+        couch_config:get("query_server_config","reduce_limit","true")),
+    OsProcLimit = list_to_integer(
+        couch_config:get("query_server_config","os_process_limit","10")),
+
+    % 'query_servers' specifies an OS command-line to execute.
+    lists:foreach(fun({Lang, Command}) ->
+        true = ets:insert(LangLimits, {?l2b(Lang), OsProcLimit, 0}),
+        true = ets:insert(Langs, {?l2b(Lang),
+            couch_os_process, start_link, [Command]})
+    end, couch_config:get("query_servers")),
+    % 'native_query_servers' specifies a {Module, Func, Arg} tuple.
+    lists:foreach(fun({Lang, SpecStr}) ->
+        {ok, {Mod, Fun, SpecArg}} = couch_util:parse_term(SpecStr),
+        true = ets:insert(LangLimits, {?l2b(Lang), 0, 0}), % 0 means no limit
+        true = ets:insert(Langs, {?l2b(Lang),
+            Mod, Fun, SpecArg})
+    end, couch_config:get("native_query_servers")),
+
+
+    process_flag(trap_exit, true),
+    {ok, #qserver{
+        langs = Langs, % Keyed by language name, value is {Mod,Func,Arg}
+        pid_procs = PidProcs, % Keyed by PID, value is a #proc record.
+        lang_procs = LangProcs, % Keyed by language name, value is a #proc record
+        lang_limits = LangLimits, % Keyed by language name, value is {Lang, Limit, Current}
+        config = {[{<<"reduce_limit">>, ReduceLimit},{<<"timeout">>, ProcTimeout}]}
+    }}.
+
+terminate(_Reason, #qserver{pid_procs=PidProcs}) ->
+    [couch_util:shutdown_sync(P) || {P,_} <- ets:tab2list(PidProcs)],
+    ok.
+
+handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, Server) ->
+    Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
+    case lang_proc(Lang, Server, fun(Procs) ->
+            % find a proc in the set that has the DDoc
+            proc_with_ddoc(DDoc, DDocKey, Procs)
+        end) of
+    {ok, Proc} ->
+        {reply, {ok, Proc, Server#qserver.config}, Server};
+    wait ->
+        {noreply, add_to_waitlist({DDoc, DDocKey}, From, Server)};
+    Error ->
+        {reply, Error, Server}
+    end;
+handle_call({get_proc, Lang}, From, Server) ->
+    case lang_proc(Lang, Server, fun([P|_Procs]) ->
+            {ok, P}
+        end) of
+    {ok, Proc} ->
+        {reply, {ok, Proc, Server#qserver.config}, Server};
+    wait ->
+        {noreply, add_to_waitlist({Lang}, From, Server)};
+    Error ->
+        {reply, Error, Server}
+    end;
+handle_call({unlink_proc, Pid}, _From, #qserver{pid_procs=PidProcs}=Server) ->
+    rem_value(PidProcs, Pid),
+    unlink(Pid),
+    {reply, ok, Server};
+handle_call({ret_proc, Proc}, _From, #qserver{
+        pid_procs=PidProcs,
+        lang_procs=LangProcs}=Server) ->
+    % Along with max process limit, here we should check
+    % if we're over the limit and discard when we are.
+    add_value(PidProcs, Proc#proc.pid, Proc),
+    add_to_list(LangProcs, Proc#proc.lang, Proc),
+    link(Proc#proc.pid),
+    {reply, true, service_waitlist(Server)}.
+
+handle_cast(_Whatever, Server) ->
+    {noreply, Server}.
+
+handle_info({'EXIT', Pid, Status}, #qserver{
+        pid_procs=PidProcs,
+        lang_procs=LangProcs,
+        lang_limits=LangLimits}=Server) ->
+    case ets:lookup(PidProcs, Pid) of
+    [{Pid, Proc}] ->
+        case Status of
+        normal -> ok;
+        _ -> ?LOG_DEBUG("Linked process died abnormally: ~p (reason: ~p)", [Pid, Status])
+        end,
+        rem_value(PidProcs, Pid),
+        catch rem_from_list(LangProcs, Proc#proc.lang, Proc),
+        [{Lang, Lim, Current}] = ets:lookup(LangLimits, Proc#proc.lang),
+        true = ets:insert(LangLimits, {Lang, Lim, Current-1}),
+        {noreply, service_waitlist(Server)};
+    [] ->
+        case Status of
+        normal ->
+            {noreply, Server};
+        _ ->
+            {stop, Status, Server}
+        end
+    end.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+% Private API
+
+add_to_waitlist(Info, From, #qserver{waitlist=Waitlist}=Server) ->
+    Server#qserver{waitlist=[{Info, From}|Waitlist]}.
+
+service_waitlist(#qserver{waitlist=[]}=Server) ->
+    Server;
+service_waitlist(#qserver{waitlist=Waitlist}=Server) ->
+    [Oldest|RevWList] = lists:reverse(Waitlist),
+    case service_waiting(Oldest, Server) of
+    ok ->
+        Server#qserver{waitlist=lists:reverse(RevWList)};
+    wait ->
+        Server#qserver{waitlist=Waitlist}
+    end.
+
+% todo get rid of duplication
+service_waiting({{#doc{body={Props}}=DDoc, DDocKey}, From}, Server) ->
+    Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
+    case lang_proc(Lang, Server, fun(Procs) ->
+            % find a proc in the set that has the DDoc
+            proc_with_ddoc(DDoc, DDocKey, Procs)
+        end) of
+    {ok, Proc} ->
+        gen_server:reply(From, {ok, Proc, Server#qserver.config}),
+        ok;
+    wait -> % this should never happen
+        wait;
+    Error ->
+        gen_server:reply(From, Error),
+        ok
+    end;
+service_waiting({{Lang}, From}, Server) ->
+    case lang_proc(Lang, Server, fun([P|_Procs]) ->
+            {ok, P}
+        end) of
+    {ok, Proc} ->
+        gen_server:reply(From, {ok, Proc, Server#qserver.config}),
+        ok;
+    wait -> % this should never happen
+        wait;
+    Error ->
+        gen_server:reply(From, Error),
+        ok
+    end.
+
+lang_proc(Lang, #qserver{
+        langs=Langs,
+        pid_procs=PidProcs,
+        lang_procs=LangProcs,
+        lang_limits=LangLimits}, PickFun) ->
+    % Note to future self. Add max process limit.
+    case ets:lookup(LangProcs, Lang) of
+    [{Lang, [P|Procs]}] ->
+        {ok, Proc} = PickFun([P|Procs]),
+        rem_from_list(LangProcs, Lang, Proc),
+        {ok, Proc};
+    _ ->
+        case (catch new_process(Langs, LangLimits, Lang)) of
+        {ok, Proc} ->
+            add_value(PidProcs, Proc#proc.pid, Proc),
+            PickFun([Proc]);
+        ErrorOrWait ->
+            ErrorOrWait
+        end
+    end.
+
+new_process(Langs, LangLimits, Lang) ->
+    [{Lang, Lim, Current}] = ets:lookup(LangLimits, Lang),
+    if (Lim == 0) or (Current < Lim) -> % Lim == 0 means no limit
+        % we are below the limit for our language, make a new one
+        case ets:lookup(Langs, Lang) of
+        [{Lang, Mod, Func, Arg}] ->
+            {ok, Pid} = apply(Mod, Func, Arg),
+            true = ets:insert(LangLimits, {Lang, Lim, Current+1}),
+            {ok, #proc{lang=Lang,
+                       pid=Pid,
+                       % Called via proc_prompt, proc_set_timeout, and proc_stop
+                       prompt_fun={Mod, prompt},
+                       set_timeout_fun={Mod, set_timeout},
+                       stop_fun={Mod, stop}}};
+        _ ->
+            {unknown_query_language, Lang}
+        end;
+    true ->
+        wait
+    end.
+
+proc_with_ddoc(DDoc, DDocKey, LangProcs) ->
+    DDocProcs = lists:filter(fun(#proc{ddoc_keys=Keys}) ->
+            lists:any(fun(Key) ->
+                Key == DDocKey
+            end, Keys)
+        end, LangProcs),
+    case DDocProcs of
+    [DDocProc|_] ->
+        ?LOG_DEBUG("DDocProc found for DDocKey: ~p",[DDocKey]),
+        {ok, DDocProc};
+    [] ->
+        [TeachProc|_] = LangProcs,
+        ?LOG_DEBUG("Teach ddoc to new proc ~p with DDocKey: ~p",[TeachProc, DDocKey]),
+        {ok, SmartProc} = teach_ddoc(DDoc, DDocKey, TeachProc),
+        {ok, SmartProc}
+    end.
+
+proc_prompt(Proc, Args) ->
+    {Mod, Func} = Proc#proc.prompt_fun,
+    apply(Mod, Func, [Proc#proc.pid, Args]).
+
+proc_stop(Proc) ->
+    {Mod, Func} = Proc#proc.stop_fun,
+    apply(Mod, Func, [Proc#proc.pid]).
+
+proc_set_timeout(Proc, Timeout) ->
+    {Mod, Func} = Proc#proc.set_timeout_fun,
+    apply(Mod, Func, [Proc#proc.pid, Timeout]).
+
+teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc{ddoc_keys=Keys}=Proc) ->
+    % send ddoc over the wire
+    % we only share the rev with the client we know to update code
+    % but it only keeps the latest copy, per each ddoc, around.
+    true = proc_prompt(Proc, [<<"ddoc">>, <<"new">>, DDocId, couch_doc:to_json_obj(DDoc, [])]),
+    % we should remove any other ddocs keys for this docid
+    % because the query server overwrites without the rev
+    Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId],
+    % add ddoc to the proc
+    {ok, Proc#proc{ddoc_keys=[DDocKey|Keys2]}}.
+
+get_ddoc_process(#doc{} = DDoc, DDocKey) ->
+    % remove this case statement
+    case gen_server:call(couch_query_servers, {get_proc, DDoc, DDocKey}) of
+    {ok, Proc, {QueryConfig}} ->
+        % process knows the ddoc
+        case (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])) of
+        true ->
+            proc_set_timeout(Proc, couch_util:get_value(<<"timeout">>, QueryConfig)),
+            link(Proc#proc.pid),
+            gen_server:call(couch_query_servers, {unlink_proc, Proc#proc.pid}),
+            Proc;
+        _ ->
+            catch proc_stop(Proc),
+            get_ddoc_process(DDoc, DDocKey)
+        end;
+    Error ->
+        throw(Error)
+    end.
+
+get_os_process(Lang) ->
+    case gen_server:call(couch_query_servers, {get_proc, Lang}) of
+    {ok, Proc, {QueryConfig}} ->
+        case (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])) of
+        true ->
+            proc_set_timeout(Proc, couch_util:get_value(<<"timeout">>, QueryConfig)),
+            link(Proc#proc.pid),
+            gen_server:call(couch_query_servers, {unlink_proc, Proc#proc.pid}),
+            Proc;
+        _ ->
+            catch proc_stop(Proc),
+            get_os_process(Lang)
+        end;
+    Error ->
+        throw(Error)
+    end.
+
+ret_os_process(Proc) ->
+    true = gen_server:call(couch_query_servers, {ret_proc, Proc}),
+    catch unlink(Proc#proc.pid),
+    ok.
+
+add_value(Tid, Key, Value) ->
+    true = ets:insert(Tid, {Key, Value}).
+
+rem_value(Tid, Key) ->
+    true = ets:delete(Tid, Key).
+
+add_to_list(Tid, Key, Value) ->
+    case ets:lookup(Tid, Key) of
+    [{Key, Vals}] ->
+        true = ets:insert(Tid, {Key, [Value|Vals]});
+    [] ->
+        true = ets:insert(Tid, {Key, [Value]})
+    end.
+
+rem_from_list(Tid, Key, Value) when is_record(Value, proc)->
+    Pid = Value#proc.pid,
+    case ets:lookup(Tid, Key) of
+    [{Key, Vals}] ->
+        % make a new values list that doesn't include the Value arg
+        NewValues = [Val || #proc{pid=P}=Val <- Vals, P /= Pid],
+        ets:insert(Tid, {Key, NewValues});
+    [] -> ok
+    end;
+rem_from_list(Tid, Key, Value) ->
+    case ets:lookup(Tid, Key) of
+    [{Key, Vals}] ->
+        % make a new values list that doesn't include the Value arg
+        NewValues = [Val || Val <- Vals, Val /= Value],
+        ets:insert(Tid, {Key, NewValues});
+    [] -> ok
+    end.
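The patch itself is the only documentation in this commit, so here is a small, hypothetical shell session sketching how the built-in reducers defined above behave. It assumes this module is compiled and loaded (for example in a CouchDB 1.1.x node); the keys and values are invented for illustration. Because every reduce source here starts with `_`, `os_reduce/3` receives an empty source list and no external query-server process is spawned:

```erlang
%% Hypothetical input: KVs as a map function would emit them, i.e. [Key, Value] pairs.
KVs = [[<<"a">>, 1], [<<"b">>, 2], [<<"c">>, 3]],

%% All three sources are built-ins, so builtin_reduce/4 handles them directly.
{ok, [Sum, Count, Stats]} = couch_query_servers:reduce(
    <<"javascript">>, [<<"_sum">>, <<"_count">>, <<"_stats">>], KVs),

%% Expected results, per the clauses above:
%%   Sum   =:= 6
%%   Count =:= 3
%%   Stats =:= {[{sum,6}, {count,3}, {min,1}, {max,3}, {sumsqr,14}]}
```

At rereduce time the same `_sum`/`_count`/`_stats` names go through `rereduce/3`, which re-wraps the previously reduced values as `[[], V]` pairs before handing them back to `builtin_reduce/4`.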