diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 2a809f88d..a0f2b1e7d 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -49,7 +49,8 @@ -export([ listener_id/2, parse_listener_id/1, - ensure_override_limiter_conf/2 + ensure_override_limiter_conf/2, + esockd_access_rules/1 ]). -export([pre_config_update/3, post_config_update/5]). @@ -497,17 +498,28 @@ ip_port({Addr, Port}) -> [{ip, Addr}, {port, Port}]. esockd_access_rules(StrRules) -> - Access = fun(S) -> + Access = fun(S, Acc) -> [A, CIDR] = string:tokens(S, " "), - { - list_to_atom(A), - case CIDR of - "all" -> all; - _ -> CIDR - end - } + %% esockd rules only use words 'allow' and 'deny', both are existing + %% comparison of strings may be better, but there is a loss of backward compatibility + case emqx_misc:safe_to_existing_atom(A) of + {ok, Action} -> + [ + { + Action, + case CIDR of + "all" -> all; + _ -> CIDR + end + } + | Acc + ]; + _ -> + ?SLOG(warning, #{msg => "invalid esockd access rule", rule => S}), + Acc + end end, - [Access(R) || R <- StrRules]. + lists:foldr(Access, [], StrRules). merge_default(Options) -> case lists:keytake(tcp_options, 1, Options) of diff --git a/apps/emqx/src/emqx_misc.erl b/apps/emqx/src/emqx_misc.erl index 54157da78..674465e6a 100644 --- a/apps/emqx/src/emqx_misc.erl +++ b/apps/emqx/src/emqx_misc.erl @@ -469,9 +469,9 @@ safe_to_existing_atom(In) -> safe_to_existing_atom(In, utf8). safe_to_existing_atom(Bin, Encoding) when is_binary(Bin) -> - try_to_existing_atom(fun erlang:binary_to_existing_atom/2, [Bin, Encoding]); -safe_to_existing_atom(List, _Encoding) when is_list(List) -> - try_to_existing_atom(fun erlang:list_to_existing_atom/1, [List]); + try_to_existing_atom(fun erlang:binary_to_existing_atom/2, Bin, Encoding); +safe_to_existing_atom(List, Encoding) when is_list(List) -> + try_to_existing_atom(fun(In, _) -> erlang:list_to_existing_atom(In) end, List, Encoding); safe_to_existing_atom(Atom, _Encoding) when is_atom(Atom) -> {ok, Atom}; safe_to_existing_atom(_Any, _Encoding) -> @@ -547,8 +547,8 @@ readable_error_msg(Error) -> end end. -try_to_existing_atom(Fun, Args) -> - try erlang:apply(Fun, Args) of +try_to_existing_atom(Convert, Data, Encoding) -> + try Convert(Data, Encoding) of Atom -> {ok, Atom} catch diff --git a/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl b/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl index 9c12e3343..8587dc1dc 100644 --- a/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl +++ b/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl @@ -121,13 +121,7 @@ apply_publish_opts(Msg, MQTTMsg) -> maps:fold( fun (<<"retain">>, V, Acc) -> - Val = - case emqx_misc:safe_to_existing_atom(V) of - {ok, true} -> - true; - _ -> - false - end, + Val = V =:= <<"true">>, emqx_message:set_flag(retain, Val, Acc); (<<"expiry">>, V, Acc) -> Val = erlang:binary_to_integer(V), diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index 8df7d84c0..68fce7589 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -70,6 +70,8 @@ default_subopts/0 ]). +-import(emqx_listeners, [esockd_access_rules/1]). + -define(ACTIVE_N, 100). -define(DEFAULT_IDLE_TIMEOUT, 30000). -define(DEFAULT_GC_OPTS, #{count => 1000, bytes => 1024 * 1024}). @@ -443,19 +445,6 @@ esockd_opts(Type, Opts0) -> end ). -esockd_access_rules(StrRules) -> - Access = fun(S) -> - [A, CIDR] = string:tokens(S, " "), - { - list_to_atom(A), - case CIDR of - "all" -> all; - _ -> CIDR - end - } - end, - [Access(R) || R <- StrRules]. - ssl_opts(Name, Opts) -> Type = case Name of diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 60b2f3b15..6a0f8e8be 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -356,15 +356,26 @@ mnesia(_) -> %% @doc Logger Command log(["set-level", Level]) -> - case emqx_logger:set_log_level(list_to_atom(Level)) of - ok -> emqx_ctl:print("~ts~n", [Level]); - Error -> emqx_ctl:print("[error] set overall log level failed: ~p~n", [Error]) + case emqx_misc:safe_to_existing_atom(Level) of + {ok, Level1} -> + case emqx_logger:set_log_level(Level1) of + ok -> emqx_ctl:print("~ts~n", [Level]); + Error -> emqx_ctl:print("[error] set overall log level failed: ~p~n", [Error]) + end; + _ -> + emqx_ctl:print("[error] invalid level: ~p~n", [Level]) end; log(["primary-level"]) -> Level = emqx_logger:get_primary_log_level(), emqx_ctl:print("~ts~n", [Level]); log(["primary-level", Level]) -> - _ = emqx_logger:set_primary_log_level(list_to_atom(Level)), + case emqx_misc:safe_to_existing_atom(Level) of + {ok, Level1} -> + _ = emqx_logger:set_primary_log_level(Level1), + ok; + _ -> + emqx_ctl:print("[error] invalid level: ~p~n", [Level]) + end, emqx_ctl:print("~ts~n", [emqx_logger:get_primary_log_level()]); log(["handlers", "list"]) -> _ = [ @@ -381,26 +392,50 @@ log(["handlers", "list"]) -> ], ok; log(["handlers", "start", HandlerId]) -> - case emqx_logger:start_log_handler(list_to_atom(HandlerId)) of - ok -> - emqx_ctl:print("log handler ~ts started~n", [HandlerId]); - {error, Reason} -> - emqx_ctl:print("[error] failed to start log handler ~ts: ~p~n", [HandlerId, Reason]) + case emqx_misc:safe_to_existing_atom(HandlerId) of + {ok, HandlerId1} -> + case emqx_logger:start_log_handler(HandlerId1) of + ok -> + emqx_ctl:print("log handler ~ts started~n", [HandlerId]); + {error, Reason} -> + emqx_ctl:print("[error] failed to start log handler ~ts: ~p~n", [ + HandlerId, Reason + ]) + end; + _ -> + emqx_ctl:print("[error] invalid handler:~ts~n", [HandlerId]) end; log(["handlers", "stop", HandlerId]) -> - case emqx_logger:stop_log_handler(list_to_atom(HandlerId)) of - ok -> - emqx_ctl:print("log handler ~ts stopped~n", [HandlerId]); - {error, Reason} -> - emqx_ctl:print("[error] failed to stop log handler ~ts: ~p~n", [HandlerId, Reason]) + case emqx_misc:safe_to_existing_atom(HandlerId) of + {ok, HandlerId1} -> + case emqx_logger:stop_log_handler(HandlerId1) of + ok -> + emqx_ctl:print("log handler ~ts stopped~n", [HandlerId1]); + {error, Reason} -> + emqx_ctl:print("[error] failed to stop log handler ~ts: ~p~n", [ + HandlerId1, Reason + ]) + end; + _ -> + emqx_ctl:print("[error] invalid handler:~ts~n", [HandlerId]) end; log(["handlers", "set-level", HandlerId, Level]) -> - case emqx_logger:set_log_handler_level(list_to_atom(HandlerId), list_to_atom(Level)) of - ok -> - #{level := NewLevel} = emqx_logger:get_log_handler(list_to_atom(HandlerId)), - emqx_ctl:print("~ts~n", [NewLevel]); - {error, Error} -> - emqx_ctl:print("[error] ~p~n", [Error]) + case emqx_misc:safe_to_existing_atom(HandlerId) of + {ok, HandlerId1} -> + case emqx_misc:safe_to_existing_atom(Level) of + {ok, Level1} -> + case emqx_logger:set_log_handler_level(HandlerId1, Level1) of + ok -> + #{level := NewLevel} = emqx_logger:get_log_handler(HandlerId1), + emqx_ctl:print("~ts~n", [NewLevel]); + {error, Error} -> + emqx_ctl:print("[error] ~p~n", [Error]) + end; + _ -> + emqx_ctl:print("[error] invalid level:~p~n", [Level]) + end; + _ -> + emqx_ctl:print("[error] invalid handler:~ts~n", [HandlerId]) end; log(_) -> emqx_ctl:usage( @@ -593,25 +628,40 @@ listeners([]) -> emqx_listeners:list() ); listeners(["stop", ListenerId]) -> - case emqx_listeners:stop_listener(list_to_atom(ListenerId)) of - ok -> - emqx_ctl:print("Stop ~ts listener successfully.~n", [ListenerId]); - {error, Error} -> - emqx_ctl:print("Failed to stop ~ts listener: ~0p~n", [ListenerId, Error]) + case emqx_misc:safe_to_existing_atom(ListenerId) of + {ok, ListenerId1} -> + case emqx_listeners:stop_listener(ListenerId1) of + ok -> + emqx_ctl:print("Stop ~ts listener successfully.~n", [ListenerId]); + {error, Error} -> + emqx_ctl:print("Failed to stop ~ts listener: ~0p~n", [ListenerId, Error]) + end; + _ -> + emqx_ctl:print("Invalid listener: ~0p~n", [ListenerId]) end; listeners(["start", ListenerId]) -> - case emqx_listeners:start_listener(list_to_atom(ListenerId)) of - ok -> - emqx_ctl:print("Started ~ts listener successfully.~n", [ListenerId]); - {error, Error} -> - emqx_ctl:print("Failed to start ~ts listener: ~0p~n", [ListenerId, Error]) + case emqx_misc:safe_to_existing_atom(ListenerId) of + {ok, ListenerId1} -> + case emqx_listeners:start_listener(ListenerId1) of + ok -> + emqx_ctl:print("Started ~ts listener successfully.~n", [ListenerId]); + {error, Error} -> + emqx_ctl:print("Failed to start ~ts listener: ~0p~n", [ListenerId, Error]) + end; + _ -> + emqx_ctl:print("Invalid listener: ~0p~n", [ListenerId]) end; listeners(["restart", ListenerId]) -> - case emqx_listeners:restart_listener(list_to_atom(ListenerId)) of - ok -> - emqx_ctl:print("Restarted ~ts listener successfully.~n", [ListenerId]); - {error, Error} -> - emqx_ctl:print("Failed to restart ~ts listener: ~0p~n", [ListenerId, Error]) + case emqx_misc:safe_to_existing_atom(ListenerId) of + {ok, ListenerId1} -> + case emqx_listeners:restart_listener(ListenerId1) of + ok -> + emqx_ctl:print("Restarted ~ts listener successfully.~n", [ListenerId]); + {error, Error} -> + emqx_ctl:print("Failed to restart ~ts listener: ~0p~n", [ListenerId, Error]) + end; + _ -> + emqx_ctl:print("Invalid listener: ~0p~n", [ListenerId]) end; listeners(_) -> emqx_ctl:usage([