diff options
Diffstat (limited to 'src/mochiweb/mochiweb_socket_server.erl')
-rw-r--r-- | src/mochiweb/mochiweb_socket_server.erl | 190 |
1 files changed, 107 insertions, 83 deletions
diff --git a/src/mochiweb/mochiweb_socket_server.erl b/src/mochiweb/mochiweb_socket_server.erl index 7aafe290..1aae09ac 100644 --- a/src/mochiweb/mochiweb_socket_server.erl +++ b/src/mochiweb/mochiweb_socket_server.erl @@ -7,22 +7,28 @@ -author('bob@mochimedia.com'). -behaviour(gen_server). +-include("internal.hrl"). + -export([start/1, stop/1]). -export([init/1, handle_call/3, handle_cast/2, terminate/2, code_change/3, handle_info/2]). -export([get/2]). --export([acceptor_loop/1]). - -record(mochiweb_socket_server, {port, loop, name=undefined, + %% NOTE: This is currently ignored. max=2048, ip=any, listen=null, - acceptor=null, - backlog=128}). + nodelay=false, + backlog=128, + active_sockets=0, + acceptor_pool_size=16, + ssl=false, + ssl_opts=[{ssl_imp, new}], + acceptor_pool=sets:new()}). start(State=#mochiweb_socket_server{}) -> start_server(State); @@ -54,6 +60,8 @@ parse_options([], State) -> parse_options([{name, L} | Rest], State) when is_list(L) -> Name = {local, list_to_atom(L)}, parse_options(Rest, State#mochiweb_socket_server{name=Name}); +parse_options([{name, A} | Rest], State) when A =:= undefined -> + parse_options(Rest, State#mochiweb_socket_server{name=A}); parse_options([{name, A} | Rest], State) when is_atom(A) -> Name = {local, A}, parse_options(Rest, State#mochiweb_socket_server{name=Name}); @@ -79,16 +87,32 @@ parse_options([{loop, Loop} | Rest], State) -> parse_options(Rest, State#mochiweb_socket_server{loop=Loop}); parse_options([{backlog, Backlog} | Rest], State) -> parse_options(Rest, State#mochiweb_socket_server{backlog=Backlog}); +parse_options([{nodelay, NoDelay} | Rest], State) -> + parse_options(Rest, State#mochiweb_socket_server{nodelay=NoDelay}); +parse_options([{acceptor_pool_size, Max} | Rest], State) -> + MaxInt = ensure_int(Max), + parse_options(Rest, + State#mochiweb_socket_server{acceptor_pool_size=MaxInt}); parse_options([{max, Max} | Rest], State) -> - MaxInt = case Max of - Max when is_list(Max) -> - list_to_integer(Max); - Max when is_integer(Max) -> - Max - end, - parse_options(Rest, State#mochiweb_socket_server{max=MaxInt}). - -start_server(State=#mochiweb_socket_server{name=Name}) -> + error_logger:info_report([{warning, "TODO: max is currently unsupported"}, + {max, Max}]), + MaxInt = ensure_int(Max), + parse_options(Rest, State#mochiweb_socket_server{max=MaxInt}); +parse_options([{ssl, Ssl} | Rest], State) when is_boolean(Ssl) -> + parse_options(Rest, State#mochiweb_socket_server{ssl=Ssl}); +parse_options([{ssl_opts, SslOpts} | Rest], State) when is_list(SslOpts) -> + SslOpts1 = [{ssl_imp, new} | proplists:delete(ssl_imp, SslOpts)], + parse_options(Rest, State#mochiweb_socket_server{ssl_opts=SslOpts1}). + +start_server(State=#mochiweb_socket_server{ssl=Ssl, name=Name}) -> + case Ssl of + true -> + application:start(crypto), + application:start(public_key), + application:start(ssl); + false -> + void + end, case Name of undefined -> gen_server:start_link(?MODULE, State, []); @@ -96,6 +120,11 @@ start_server(State=#mochiweb_socket_server{name=Name}) -> gen_server:start_link(Name, ?MODULE, State, []) end. +ensure_int(N) when is_integer(N) -> + N; +ensure_int(S) when is_list(S) -> + integer_to_list(S). + ipv6_supported() -> case (catch inet:getaddr("localhost", inet6)) of {ok, _Addr} -> @@ -104,15 +133,15 @@ ipv6_supported() -> false end. -init(State=#mochiweb_socket_server{ip=Ip, port=Port, backlog=Backlog}) -> +init(State=#mochiweb_socket_server{ip=Ip, port=Port, backlog=Backlog, nodelay=NoDelay}) -> process_flag(trap_exit, true), BaseOpts = [binary, {reuseaddr, true}, {packet, 0}, {backlog, Backlog}, - {recbuf, 8192}, + {recbuf, ?RECBUF_SIZE}, {active, false}, - {nodelay, true}], + {nodelay, NoDelay}], Opts = case Ip of any -> case ipv6_supported() of % IPv4, and IPv6 if supported @@ -124,7 +153,7 @@ init(State=#mochiweb_socket_server{ip=Ip, port=Port, backlog=Backlog}) -> {_, _, _, _, _, _, _, _} -> % IPv6 [inet6, {ip, Ip} | BaseOpts] end, - case gen_tcp_listen(Port, Opts, State) of + case listen(Port, Opts, State) of {stop, eacces} -> case Port < 1024 of true -> @@ -132,7 +161,7 @@ init(State=#mochiweb_socket_server{ip=Ip, port=Port, backlog=Backlog}) -> {ok, _} -> case fdsrv:bind_socket(tcp, Port) of {ok, Fd} -> - gen_tcp_listen(Port, [{fd, Fd} | Opts], State); + listen(Port, [{fd, Fd} | Opts], State); _ -> {stop, fdsrv_bind_failed} end; @@ -146,47 +175,33 @@ init(State=#mochiweb_socket_server{ip=Ip, port=Port, backlog=Backlog}) -> Other end. -gen_tcp_listen(Port, Opts, State) -> - case gen_tcp:listen(Port, Opts) of +new_acceptor_pool(Listen, + State=#mochiweb_socket_server{acceptor_pool=Pool, + acceptor_pool_size=Size, + loop=Loop}) -> + F = fun (_, S) -> + Pid = mochiweb_acceptor:start_link(self(), Listen, Loop), + sets:add_element(Pid, S) + end, + Pool1 = lists:foldl(F, Pool, lists:seq(1, Size)), + State#mochiweb_socket_server{acceptor_pool=Pool1}. + +listen(Port, Opts, State=#mochiweb_socket_server{ssl=Ssl, ssl_opts=SslOpts}) -> + case mochiweb_socket:listen(Ssl, Port, Opts, SslOpts) of {ok, Listen} -> - {ok, ListenPort} = inet:port(Listen), - {ok, new_acceptor(State#mochiweb_socket_server{listen=Listen, - port=ListenPort})}; + {ok, ListenPort} = mochiweb_socket:port(Listen), + {ok, new_acceptor_pool( + Listen, + State#mochiweb_socket_server{listen=Listen, + port=ListenPort})}; {error, Reason} -> {stop, Reason} end. -new_acceptor(State=#mochiweb_socket_server{max=0}) -> - io:format("Not accepting new connections~n"), - State#mochiweb_socket_server{acceptor=null}; -new_acceptor(State=#mochiweb_socket_server{listen=Listen,loop=Loop}) -> - Pid = proc_lib:spawn_link(?MODULE, acceptor_loop, - [{self(), Listen, Loop}]), - State#mochiweb_socket_server{acceptor=Pid}. - -call_loop({M, F}, Socket) -> - M:F(Socket); -call_loop(Loop, Socket) -> - Loop(Socket). - -acceptor_loop({Server, Listen, Loop}) -> - case catch gen_tcp:accept(Listen) of - {ok, Socket} -> - gen_server:cast(Server, {accepted, self()}), - call_loop(Loop, Socket); - {error, closed} -> - exit({error, closed}); - Other -> - error_logger:error_report( - [{application, mochiweb}, - "Accept failed error", - lists:flatten(io_lib:format("~p", [Other]))]), - exit({error, accept_failed}) - end. - - do_get(port, #mochiweb_socket_server{port=Port}) -> - Port. + Port; +do_get(active_sockets, #mochiweb_socket_server{active_sockets=ActiveSockets}) -> + ActiveSockets. handle_call({get, Property}, _From, State) -> Res = do_get(Property, State), @@ -195,16 +210,15 @@ handle_call(_Message, _From, State) -> Res = error, {reply, Res, State}. -handle_cast({accepted, Pid}, - State=#mochiweb_socket_server{acceptor=Pid, max=Max}) -> - % io:format("accepted ~p~n", [Pid]), - State1 = State#mochiweb_socket_server{max=Max - 1}, - {noreply, new_acceptor(State1)}; +handle_cast({accepted, Pid, _Timing}, + State=#mochiweb_socket_server{active_sockets=ActiveSockets}) -> + State1 = State#mochiweb_socket_server{active_sockets=1 + ActiveSockets}, + {noreply, recycle_acceptor(Pid, State1)}; handle_cast(stop, State) -> {stop, normal, State}. terminate(_Reason, #mochiweb_socket_server{listen=Listen, port=Port}) -> - gen_tcp:close(Listen), + mochiweb_socket:close(Listen), case Port < 1024 of true -> catch fdsrv:stop(), @@ -216,33 +230,43 @@ terminate(_Reason, #mochiweb_socket_server{listen=Listen, port=Port}) -> code_change(_OldVsn, State, _Extra) -> State. -handle_info({'EXIT', Pid, normal}, - State=#mochiweb_socket_server{acceptor=Pid}) -> - % io:format("normal acceptor down~n"), - {noreply, new_acceptor(State)}; +recycle_acceptor(Pid, State=#mochiweb_socket_server{ + acceptor_pool=Pool, + listen=Listen, + loop=Loop, + active_sockets=ActiveSockets}) -> + case sets:is_element(Pid, Pool) of + true -> + Acceptor = mochiweb_acceptor:start_link(self(), Listen, Loop), + Pool1 = sets:add_element(Acceptor, sets:del_element(Pid, Pool)), + State#mochiweb_socket_server{acceptor_pool=Pool1}; + false -> + State#mochiweb_socket_server{active_sockets=ActiveSockets - 1} + end. + +handle_info({'EXIT', Pid, normal}, State) -> + {noreply, recycle_acceptor(Pid, State)}; handle_info({'EXIT', Pid, Reason}, - State=#mochiweb_socket_server{acceptor=Pid}) -> - error_logger:error_report({?MODULE, ?LINE, - {acceptor_error, Reason}}), - timer:sleep(100), - {noreply, new_acceptor(State)}; -handle_info({'EXIT', _LoopPid, Reason}, - State=#mochiweb_socket_server{acceptor=Pid, max=Max}) -> - case Reason of - normal -> - ok; - _ -> + State=#mochiweb_socket_server{acceptor_pool=Pool}) -> + case sets:is_element(Pid, Pool) of + true -> + %% If there was an unexpected error accepting, log and sleep. error_logger:error_report({?MODULE, ?LINE, - {child_error, Reason}}) + {acceptor_error, Reason}}), + timer:sleep(100); + false -> + ok end, - State1 = State#mochiweb_socket_server{max=Max + 1}, - State2 = case Pid of - null -> - new_acceptor(State1); - _ -> - State1 - end, - {noreply, State2}; + {noreply, recycle_acceptor(Pid, State)}; handle_info(Info, State) -> error_logger:info_report([{'INFO', Info}, {'State', State}]), {noreply, State}. + + + +%% +%% Tests +%% +-include_lib("eunit/include/eunit.hrl"). +-ifdef(TEST). +-endif. |