diff --git a/rel/files/emqttd.config.development b/rel/files/emqttd.config.development index cd1dd471f..32320d50d 100644 --- a/rel/files/emqttd.config.development +++ b/rel/files/emqttd.config.development @@ -278,14 +278,12 @@ %% Erlang System Monitor {sysmon, [ - %% Log file. - {logfile, "log/emqttd_sysmon.log"}, %% Long GC {long_gc, 100}, %% Long Schedule(ms) - {long_schedule, 50}, + {long_schedule, 100}, %% 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM. %% 8 * 1024 * 1024 diff --git a/rel/files/emqttd.config.production b/rel/files/emqttd.config.production index 66b57865b..987ab4876 100644 --- a/rel/files/emqttd.config.production +++ b/rel/files/emqttd.config.production @@ -270,22 +270,20 @@ %% Erlang System Monitor {sysmon, [ - %% Log file. - {logfile, "log/emqttd_sysmon.log"}, %% Long GC, don't monitor in production mode for: %% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421 {long_gc, false}, %% Long Schedule(ms) - {long_schedule, 100}, + {long_schedule, 240}, %% 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM. %% 8 * 1024 * 1024 {large_heap, 8388608}, %% Busy Port - {busy_port, true}, + {busy_port, false}, %% Busy Dist Port {busy_dist_port, true} diff --git a/src/emqttd_access_control.erl b/src/emqttd_access_control.erl index fdc5b7fba..c04cd920b 100644 --- a/src/emqttd_access_control.erl +++ b/src/emqttd_access_control.erl @@ -157,7 +157,8 @@ stop() -> %%%============================================================================= init([Opts]) -> - ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected]), + ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected, {read_concurrency, true}]), + ets:insert(?ACCESS_CONTROL_TAB, {auth_modules, init_mods(auth, proplists:get_value(auth, Opts))}), ets:insert(?ACCESS_CONTROL_TAB, {acl_modules, init_mods(acl, proplists:get_value(acl, Opts))}), {ok, state}. diff --git a/src/emqttd_acl_internal.erl b/src/emqttd_acl_internal.erl index fd5118cb6..ab0f3c0ad 100644 --- a/src/emqttd_acl_internal.erl +++ b/src/emqttd_acl_internal.erl @@ -63,7 +63,7 @@ all_rules() -> %%------------------------------------------------------------------------------ -spec init(AclOpts :: list()) -> {ok, State :: any()}. init(AclOpts) -> - ets:new(?ACL_RULE_TAB, [set, public, named_table]), + ets:new(?ACL_RULE_TAB, [set, public, named_table, {read_concurrency, true}]), AclFile = proplists:get_value(file, AclOpts), Default = proplists:get_value(nomatch, AclOpts, allow), State = #state{acl_file = AclFile, nomatch = Default}, diff --git a/src/emqttd_bridge.erl b/src/emqttd_bridge.erl index a51ad4cf6..cfdfa2863 100644 --- a/src/emqttd_bridge.erl +++ b/src/emqttd_bridge.erl @@ -116,10 +116,10 @@ handle_call(Req, _From, State) -> handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). -handle_info({dispatch, Msg}, State = #state{mqueue = MQ, status = down}) -> +handle_info({dispatch, _Topic, Msg}, State = #state{mqueue = MQ, status = down}) -> {noreply, State#state{mqueue = emqttd_mqueue:in(Msg, MQ)}}; -handle_info({dispatch, Msg}, State = #state{node = Node, status = up}) -> +handle_info({dispatch, _Topic, Msg}, State = #state{node = Node, status = up}) -> rpc:cast(Node, emqttd_pubsub, publish, [transform(Msg, State)]), {noreply, State, hibernate}; diff --git a/src/emqttd_mnesia.erl b/src/emqttd_mnesia.erl index 06e2bea7e..7c86b6086 100644 --- a/src/emqttd_mnesia.erl +++ b/src/emqttd_mnesia.erl @@ -119,8 +119,7 @@ copy_table(Table) -> %% @end %%------------------------------------------------------------------------------ wait_for_tables() -> - %%TODO: is not right? - io:format("mnesia wait_for_tables: ~p~n", [mnesia:system_info(local_tables)]), + %% io:format("mnesia wait_for_tables: ~p~n", [mnesia:system_info(local_tables)]), mnesia:wait_for_tables(mnesia:system_info(local_tables), infinity). %%------------------------------------------------------------------------------ diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 235d78be3..96851ae79 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -211,35 +211,35 @@ cast(Msg) -> -spec publish(Msg :: mqtt_message()) -> ok. publish(Msg = #mqtt_message{from = From}) -> trace(publish, From, Msg), - Msg1 = #mqtt_message{topic = Topic} + Msg1 = #mqtt_message{topic = To} = emqttd_broker:foldl_hooks('message.publish', [], Msg), %% Retain message first. Don't create retained topic. case emqttd_retainer:retain(Msg1) of ok -> %% TODO: why unset 'retain' flag? - publish(Topic, emqttd_message:unset_flag(Msg1)); + publish(To, emqttd_message:unset_flag(Msg1)); ignore -> - publish(Topic, Msg1) + publish(To, Msg1) end. -publish(Topic, Msg) when is_binary(Topic) -> - lists:foreach(fun(#mqtt_topic{topic = Name, node = Node}) -> +publish(To, Msg) -> + lists:foreach(fun(#mqtt_topic{topic = Topic, node = Node}) -> case Node =:= node() of - true -> ?ROUTER:route(Name, Msg); - false -> rpc:cast(Node, ?ROUTER, route, [Name, Msg]) + true -> ?ROUTER:route(Topic, Msg); + false -> rpc:cast(Node, ?ROUTER, route, [Topic, Msg]) end - end, match(Topic)). + end, match(To)). %%------------------------------------------------------------------------------ %% @doc Match Topic Name with Topic Filters %% @end %%------------------------------------------------------------------------------ --spec match(Topic :: binary()) -> [mqtt_topic()]. -match(Topic) when is_binary(Topic) -> - MatchedTopics = mnesia:async_dirty(fun emqttd_trie:match/1, [Topic]), - %% ets:lookup for topic table will be copied. - lists:append([ets:lookup(topic, Name) || Name <- MatchedTopics]). +-spec match(binary()) -> [mqtt_topic()]. +match(To) -> + MatchedTopics = mnesia:async_dirty(fun emqttd_trie:match/1, [To]), + %% ets:lookup for topic table will be replicated. + lists:append([ets:lookup(topic, Topic) || Topic <- MatchedTopics]). %%%============================================================================= %%% gen_server callbacks @@ -252,13 +252,16 @@ init([Pool, Id, StatsFun, Opts]) -> handle_call({subscribe, {SubId, SubPid}, TopicTable}, _From, State = #state{statsfun = StatsFun}) -> + + Topics = [Topic || {Topic, _Qos} <- TopicTable], + %% Add routes first - ?ROUTER:add_routes(TopicTable, SubPid), + ?ROUTER:add_routes(Topics, SubPid), - %% Add topics - Topics = [#mqtt_topic{topic = Topic, node = node()} || {Topic, _Qos} <- TopicTable], + %% Insert topic records to global topic table + Records = [#mqtt_topic{topic = Topic, node = node()} || Topic <- Topics], - case mnesia:transaction(fun add_topics/1, [Topics]) of + case mnesia:transaction(fun add_topics/1, [Records]) of {atomic, _} -> StatsFun(topic), if_subscription( @@ -268,6 +271,7 @@ handle_call({subscribe, {SubId, SubPid}, TopicTable}, _From, emqttd_pooler:async_submit({mnesia, async_dirty, Args}), StatsFun(subscription) end), + %% Grant all qos... {reply, {ok, [Qos || {_Topic, Qos} <- TopicTable]}, State}; {aborted, Error} -> {reply, {error, Error}, State} @@ -293,12 +297,13 @@ handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) -> + Routes = ?ROUTER:lookup_routes(DownPid), %% Delete all routes of the process ?ROUTER:delete_routes(DownPid), - ?HELPER:aging([Topic || {Topic, _Qos} <- Routes, not ?ROUTER:has_route(Topic)]), + ?HELPER:aging([Topic || Topic <- Routes, not ?ROUTER:has_route(Topic)]), {noreply, State, hibernate}; diff --git a/src/emqttd_retainer.erl b/src/emqttd_retainer.erl index 1b033f5fd..00523f259 100644 --- a/src/emqttd_retainer.erl +++ b/src/emqttd_retainer.erl @@ -142,7 +142,7 @@ dispatch(Topic, CPid) when is_binary(Topic) -> end, mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], retained]) end, - lists:foreach(fun(Msg) -> CPid ! {dispatch, Msg} end, lists:reverse(Msgs)). + lists:foreach(fun(Msg) -> CPid ! {dispatch, Topic, Msg} end, lists:reverse(Msgs)). %%%============================================================================= %%% gen_server callbacks diff --git a/src/emqttd_router.erl b/src/emqttd_router.erl index 3d1489a5c..3eed025b4 100644 --- a/src/emqttd_router.erl +++ b/src/emqttd_router.erl @@ -23,11 +23,11 @@ %%% %%% Route Table: %%% -%%% Topic -> {Pid1, Qos}, {Pid2, Qos}, ... +%%% Topic -> Pid1, Pid2, ... %%% %%% Reverse Route Table: %%% -%%% Pid -> {Topic1, Qos}, {Topic2, Qos}, ... +%%% Pid -> Topic1, Topic2, ... %%% %%% @end %%% @@ -72,17 +72,15 @@ ensure_tab(Tab, Opts) -> %% @doc Add Routes. %% @end %%------------------------------------------------------------------------------ --spec add_routes(list({binary(), mqtt_qos()}), pid()) -> ok. -add_routes(TopicTable, Pid) when is_pid(Pid) -> +-spec add_routes(list(binary()), pid()) -> ok. +add_routes(Topics, Pid) when is_pid(Pid) -> with_stats(fun() -> case lookup_routes(Pid) of [] -> erlang:monitor(process, Pid), - insert_routes(TopicTable, Pid); - TopicInEts -> - {NewTopics, UpdatedTopics} = diff(TopicTable, TopicInEts), - update_routes(UpdatedTopics, Pid), - insert_routes(NewTopics, Pid) + insert_routes(Topics, Pid); + InEts -> + insert_routes(Topics -- InEts, Pid) end end). @@ -90,9 +88,9 @@ add_routes(TopicTable, Pid) when is_pid(Pid) -> %% @doc Lookup Routes %% @end %%------------------------------------------------------------------------------ --spec lookup_routes(pid()) -> list({binary(), mqtt_qos()}). +-spec lookup_routes(pid()) -> list(binary()). lookup_routes(Pid) when is_pid(Pid) -> - [{Topic, Qos} || {_, Topic, Qos} <- ets:lookup(reverse_route, Pid)]. + [Topic || {_, Topic} <- ets:lookup(reverse_route, Pid)]. %%------------------------------------------------------------------------------ %% @doc Has Route? @@ -116,7 +114,7 @@ delete_routes(Topics, Pid) -> -spec delete_routes(pid()) -> ok. delete_routes(Pid) when is_pid(Pid) -> with_stats(fun() -> - Routes = [{Topic, Pid} || {Topic, _Qos} <- lookup_routes(Pid)], + Routes = [{Topic, Pid} || Topic <- lookup_routes(Pid)], ets:delete(reverse_route, Pid), lists:foreach(fun delete_route_only/1, Routes) end). @@ -132,8 +130,8 @@ route(Queue = <<"$Q/", _Q>>, Msg) -> emqttd_metrics:inc('messages/dropped'); Routes -> Idx = crypto:rand_uniform(1, length(Routes) + 1), - {_, SubPid, SubQos} = lists:nth(Idx, Routes), - SubPid ! {dispatch, tune_qos(SubQos, Msg)} + {_, SubPid} = lists:nth(Idx, Routes), + dispatch(SubPid, Queue, Msg) end; route(Topic, Msg) -> @@ -141,69 +139,33 @@ route(Topic, Msg) -> [] -> emqttd_metrics:inc('messages/dropped'); Routes -> - lists:foreach( - fun({_Topic, SubPid, SubQos}) -> - SubPid ! {dispatch, tune_qos(SubQos, Msg)} - end, Routes) + lists:foreach(fun({_Topic, SubPid}) -> + dispatch(SubPid, Topic, Msg) + end, Routes) end. -tune_qos(SubQos, Msg = #mqtt_message{qos = PubQos}) when PubQos > SubQos -> - Msg#mqtt_message{qos = SubQos}; -tune_qos(_SubQos, Msg) -> - Msg. +dispatch(SubPid, Topic, Msg) -> SubPid ! {dispatch, Topic, Msg}. %%%============================================================================= %%% Internal Functions %%%============================================================================= -diff(TopicTable, TopicInEts) -> - diff(TopicTable, TopicInEts, [], []). - -diff([], _TopicInEts, NewAcc, UpAcc) -> - {NewAcc, UpAcc}; - -diff([{Topic, Qos}|TopicTable], TopicInEts, NewAcc, UpAcc) -> - case lists:keyfind(Topic, 1, TopicInEts) of - {Topic, Qos} -> - diff(TopicTable, TopicInEts, NewAcc, UpAcc); - {Topic, _Qos} -> - diff(TopicTable, TopicInEts, NewAcc, [{Topic, Qos}|UpAcc]); - false -> - diff(TopicTable, TopicInEts, [{Topic, Qos}|NewAcc], UpAcc) - end. - insert_routes([], _Pid) -> ok; -insert_routes(TopicTable, Pid) -> - {Routes, ReverseRoutes} = routes(TopicTable, Pid), +insert_routes(Topics, Pid) -> + {Routes, ReverseRoutes} = routes(Topics, Pid), ets:insert(route, Routes), ets:insert(reverse_route, ReverseRoutes). -update_routes([], _Pid) -> - ok; -update_routes(TopicTable, Pid) -> - {Routes, ReverseRoutes} = routes(TopicTable, Pid), - lists:foreach(fun update_route/1, Routes), - lists:foreach(fun update_reverse_route/1, ReverseRoutes). - -update_route(Route = {Topic, Pid, _Qos}) -> - ets:match_delete(route, {Topic, Pid, '_'}), - ets:insert(route, Route). - -update_reverse_route(RevRoute = {Pid, Topic, _Qos}) -> - ets:match_delete(reverse_route, {Pid, Topic, '_'}), - ets:insert(reverse_route, RevRoute). - -routes(TopicTable, Pid) -> - F = fun(Topic, Qos) -> {{Topic, Pid, Qos}, {Pid, Topic, Qos}} end, - lists:unzip([F(Topic, Qos) || {Topic, Qos} <- TopicTable]). +routes(Topics, Pid) -> + lists:unzip([{{Topic, Pid}, {Pid, Topic}} || Topic <- Topics]). delete_route({Topic, Pid}) -> - ets:match_delete(reverse_route, {Pid, Topic, '_'}), - ets:match_delete(route, {Topic, Pid, '_'}). + ets:delete_object(reverse_route, {Pid, Topic}), + ets:delete_object(route, {Topic, Pid}). delete_route_only({Topic, Pid}) -> - ets:match_delete(route, {Topic, Pid, '_'}). + ets:delete_object(route, {Topic, Pid}). with_stats(Fun) -> Ok = Fun(), setstats(), Ok. diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 07b9b0359..a2289920d 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -85,9 +85,8 @@ %% Last packet id of the session packet_id = 1, - %%TODO: Removed?? %% Client’s subscriptions. - subscriptions :: list(), + subscriptions :: dict:dict(), %% Inflight qos1, qos2 messages sent to the client but unacked, %% QoS 1 and QoS 2 messages which have been sent to the Client, @@ -246,7 +245,7 @@ init([CleanSess, ClientId, ClientPid]) -> clean_sess = CleanSess, client_id = ClientId, client_pid = ClientPid, - subscriptions = [], + subscriptions = dict:new(), inflight_queue = [], max_inflight = emqttd_opts:g(max_inflight, SessEnv, 0), message_queue = emqttd_mqueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()), @@ -284,12 +283,12 @@ prioritise_cast(Msg, _Len, _State) -> prioritise_info(Msg, _Len, _State) -> case Msg of - {'EXIT', _, _} -> 10; - expired -> 10; - {timeout, _, _} -> 5; - collect_info -> 2; - {dispatch, _} -> 1; - _ -> 0 + {'EXIT', _, _} -> 10; + expired -> 10; + {timeout, _, _} -> 5; + collect_info -> 2; + {dispatch, _, _} -> 1; + _ -> 0 end. handle_call(info, _From, State) -> @@ -316,7 +315,7 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = Cli TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0), - case TopicTable -- Subscriptions of + case TopicTable -- dict:to_list(Subscriptions) of [] -> AckFun([Qos || {_, Qos} <- TopicTable]), hibernate(Session); @@ -331,21 +330,22 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = Cli ?LOG(info, "Subscribe ~p, Granted QoS: ~p", [TopicTable, GrantedQos], Session), Subscriptions1 = - lists:foldl(fun({Topic, Qos}, Acc) -> - case lists:keyfind(Topic, 1, Acc) of - {Topic, Qos} -> + lists:foldl(fun({Topic, Qos}, Dict) -> + case dict:find(Topic, Dict) of + {ok, Qos} -> ?LOG(warning, "resubscribe ~s, qos = ~w", [Topic, Qos], Session), - Acc; - {Topic, OldQos} -> + Dict; + {ok, OldQos} -> ?LOG(warning, "resubscribe ~s, old qos=~w, new qos=~w", [Topic, OldQos, Qos], Session), - lists:keyreplace(Topic, 1, Acc, {Topic, Qos}); - false -> + dict:store(Topic, Qos, Dict); + error -> %%TODO: the design is ugly, rewrite later...:( %% : 3.8.4 %% Where the Topic Filter is not identical to any existing Subscription’s filter, %% a new Subscription is created and all matching retained messages are sent. emqttd_retainer:dispatch(Topic, self()), - [{Topic, Qos} | Acc] + + dict:store(Topic, Qos, Dict) end end, Subscriptions, TopicTable), hibernate(Session#session{subscriptions = Subscriptions1}) @@ -362,13 +362,8 @@ handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId, ?LOG(info, "unsubscribe ~p", [Topics], Session), Subscriptions1 = - lists:foldl(fun(Topic, Acc) -> - case lists:keyfind(Topic, 1, Acc) of - {Topic, _Qos} -> - lists:keydelete(Topic, 1, Acc); - false -> - Acc - end + lists:foldl(fun(Topic, Dict) -> + dict:erase(Topic, Dict) end, Subscriptions, Topics), hibernate(Session#session{subscriptions = Subscriptions1}); @@ -485,28 +480,10 @@ handle_cast({pubcomp, PktId}, Session = #session{awaiting_comp = AwaitingComp}) handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). -%% Queue messages when client is offline -handle_info({dispatch, Msg}, Session = #session{client_pid = undefined, - message_queue = Q}) +%% Dispatch Message +handle_info({dispatch, Topic, Msg}, Session = #session{subscriptions = Subscriptions}) when is_record(Msg, mqtt_message) -> - hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, Q)}); - -%% Dispatch qos0 message directly to client -handle_info({dispatch, Msg = #mqtt_message{qos = ?QOS_0}}, - Session = #session{client_pid = ClientPid}) -> - ClientPid ! {deliver, Msg}, - hibernate(Session); - -handle_info({dispatch, Msg = #mqtt_message{qos = QoS}}, - Session = #session{message_queue = MsgQ}) - when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> - - case check_inflight(Session) of - true -> - noreply(deliver(Msg, Session)); - false -> - hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}) - end; + dispatch(fixqos(Topic, Msg, Subscriptions), Session); handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = undefined, awaiting_ack = AwaitingAck}) -> @@ -604,6 +581,38 @@ kick(ClientId, OldPid, Pid) -> %% Clean noproc receive {'EXIT', OldPid, _} -> ok after 0 -> ok end. +%%------------------------------------------------------------------------------ +%% Dispatch Messages +%%------------------------------------------------------------------------------ + +%% Queue message if client disconnected +dispatch(Msg, Session = #session{client_pid = undefined, message_queue = Q}) -> + hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, Q)}); + +%% Deliver qos0 message directly to client +dispatch(Msg = #mqtt_message{qos = ?QOS0}, Session = #session{client_pid = ClientPid}) -> + ClientPid ! {deliver, Msg}, + hibernate(Session); + +dispatch(Msg = #mqtt_message{qos = QoS}, Session = #session{message_queue = MsgQ}) + when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 -> + case check_inflight(Session) of + true -> + noreply(deliver(Msg, Session)); + false -> + hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}) + end. + +fixqos(Topic, Msg = #mqtt_message{qos = PubQos}, Subscriptions) -> + case dict:find(Topic, Subscriptions) of + {ok, SubQos} when PubQos > SubQos -> + Msg#mqtt_message{qos = SubQos}; + {ok, _SubQos} -> + Msg; + error -> + Msg + end. + %%------------------------------------------------------------------------------ %% Check inflight and awaiting_rel %%------------------------------------------------------------------------------ @@ -723,7 +732,7 @@ sess_info(#session{clean_sess = CleanSess, timestamp = CreatedAt}) -> Stats = emqttd_mqueue:stats(MessageQueue), [{clean_sess, CleanSess}, - {subscriptions, Subscriptions}, + {subscriptions, dict:to_list(Subscriptions)}, {max_inflight, MaxInflight}, {inflight_queue, length(InflightQueue)}, {message_queue, proplists:get_value(len, Stats)}, diff --git a/src/emqttd_sysmon.erl b/src/emqttd_sysmon.erl index 166bda674..bf18c31ca 100644 --- a/src/emqttd_sysmon.erl +++ b/src/emqttd_sysmon.erl @@ -60,8 +60,9 @@ start_link(Opts) -> init([Opts]) -> erlang:system_monitor(self(), parse_opt(Opts)), {ok, TRef} = timer:send_interval(timer:seconds(1), reset), - {ok, TraceLog} = start_tracelog(proplists:get_value(logfile, Opts)), - {ok, #state{tickref = TRef, tracelog = TraceLog}}. + %%TODO: don't trace for performance issue. + %%{ok, TraceLog} = start_tracelog(proplists:get_value(logfile, Opts)), + {ok, #state{tickref = TRef}}. parse_opt(Opts) -> parse_opt(Opts, []). @@ -71,6 +72,8 @@ parse_opt([{long_gc, false}|Opts], Acc) -> parse_opt(Opts, Acc); parse_opt([{long_gc, Ms}|Opts], Acc) when is_integer(Ms) -> parse_opt(Opts, [{long_gc, Ms}|Acc]); +parse_opt([{long_schedule, false}|Opts], Acc) -> + parse_opt(Opts, Acc); parse_opt([{long_schedule, Ms}|Opts], Acc) when is_integer(Ms) -> parse_opt(Opts, [{long_schedule, Ms}|Acc]); parse_opt([{large_heap, Size}|Opts], Acc) when is_integer(Size) -> @@ -83,7 +86,7 @@ parse_opt([{busy_dist_port, true}|Opts], Acc) -> parse_opt(Opts, [busy_dist_port|Acc]); parse_opt([{busy_dist_port, false}|Opts], Acc) -> parse_opt(Opts, Acc); -parse_opt([{logfile, _}|Opts], Acc) -> +parse_opt([_Opt|Opts], Acc) -> parse_opt(Opts, Acc). handle_call(Req, _From, State) ->