diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index bb3478702..8a73e5dca 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -26,11 +26,11 @@ -spec(authenticate(emqx_types:credentials()) -> {ok, emqx_types:credentials()} | {error, term()}). authenticate(Credentials) -> - case emqx_hooks:run_fold('client.authenticate', [], Credentials#{result => init_result(Credentials)}) of - #{result := success} = NewCredentials -> + case emqx_hooks:run_fold('client.authenticate', [], Credentials#{auth_result => init_auth_result(Credentials)}) of + #{auth_result := success} = NewCredentials -> {ok, NewCredentials}; NewCredentials -> - {error, maps:get(result, NewCredentials, unknown_error)} + {error, maps:get(auth_result, NewCredentials, unknown_error)} end. %% @doc Check ACL @@ -61,7 +61,7 @@ do_check_acl(#{zone := Zone} = Credentials, PubSub, Topic) -> reload_acl() -> emqx_mod_acl_internal:reload_acl(). -init_result(Credentials) -> +init_auth_result(Credentials) -> case emqx_zone:get_env(maps:get(zone, Credentials, undefined), allow_anonymous, false) of true -> success; false -> not_authorized diff --git a/src/emqx_mod_acl_internal.erl b/src/emqx_mod_acl_internal.erl index 49d919258..0c4152c4e 100644 --- a/src/emqx_mod_acl_internal.erl +++ b/src/emqx_mod_acl_internal.erl @@ -104,7 +104,7 @@ match(Credentials, Topic, [Rule|Rules]) -> -spec(reload_acl() -> ok | {error, term()}). reload_acl() -> try load_rules_from_file(acl_file()) of - ok -> + _ -> emqx_logger:info("Reload acl_file ~s successfully", [acl_file()]), ok; {error, Error} -> diff --git a/src/emqx_os_mon.erl b/src/emqx_os_mon.erl index ba2cc35da..3d186e93a 100644 --- a/src/emqx_os_mon.erl +++ b/src/emqx_os_mon.erl @@ -97,7 +97,8 @@ init([Opts]) -> {ok, ensure_check_timer(#{cpu_high_watermark => proplists:get_value(cpu_high_watermark, Opts, 0.80), cpu_low_watermark => proplists:get_value(cpu_low_watermark, Opts, 0.60), cpu_check_interval => proplists:get_value(cpu_check_interval, Opts, 60), - timer => undefined})}. + timer => undefined, + is_cpu_alarm_set => false})}. handle_call(get_cpu_check_interval, _From, State) -> {reply, maps:get(cpu_check_interval, State, undefined), State}; @@ -122,7 +123,8 @@ handle_cast(_Request, State) -> handle_info({timeout, Timer, check}, State = #{timer := Timer, cpu_high_watermark := CPUHighWatermark, - cpu_low_watermark := CPULowWatermark}) -> + cpu_low_watermark := CPULowWatermark, + is_cpu_alarm_set := IsCPUAlarmSet}) -> case cpu_sup:util() of 0 -> {noreply, State#{timer := undefined}}; @@ -131,10 +133,13 @@ handle_info({timeout, Timer, check}, State = #{timer := Timer, {noreply, ensure_check_timer(State)}; Busy when Busy / 100 >= CPUHighWatermark -> alarm_handler:set_alarm({cpu_high_watermark, Busy}), - {noreply, ensure_check_timer(State)}; + {noreply, ensure_check_timer(State#{is_cpu_alarm_set := true})}; Busy when Busy / 100 < CPULowWatermark -> - alarm_handler:clear_alarm(cpu_high_watermark), - {noreply, ensure_check_timer(State)} + case IsCPUAlarmSet of + true -> alarm_handler:clear_alarm(cpu_high_watermark); + false -> ok + end, + {noreply, ensure_check_timer(State#{is_cpu_alarm_set := false})} end. terminate(_Reason, #{timer := Timer}) -> diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index c108f8b33..164655d47 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -60,14 +60,15 @@ load() -> load_expand_plugins() -> case emqx_config:get_env(expand_plugins_dir) of undefined -> ok; - Dir -> - PluginsDir = filelib:wildcard("*", Dir), - lists:foreach(fun(PluginDir) -> - case filelib:is_dir(Dir ++ PluginDir) of - true -> load_expand_plugin(Dir ++ PluginDir); + ExpandPluginsDir -> + Plugins = filelib:wildcard("*", ExpandPluginsDir), + lists:foreach(fun(Plugin) -> + PluginDir = filename:join(ExpandPluginsDir, Plugin), + case filelib:is_dir(PluginDir) of + true -> load_expand_plugin(PluginDir); false -> ok end - end, PluginsDir) + end, Plugins) end. load_expand_plugin(PluginDir) -> @@ -98,25 +99,6 @@ init_expand_plugin_config(PluginDir) -> [application:set_env(AppName, Par, Val) || {Par, Val} <- Envs] end, AppsEnv). -get_expand_plugin_config() -> - case emqx_config:get_env(expand_plugins_dir) of - undefined -> ok; - Dir -> - PluginsDir = filelib:wildcard("*", Dir), - lists:foldl(fun(PluginDir, Acc) -> - case filelib:is_dir(Dir ++ PluginDir) of - true -> - Etc = Dir ++ PluginDir ++ "/etc", - case filelib:wildcard("*.{conf,config}", Etc) of - [] -> Acc; - [Conf] -> [Conf | Acc] - end; - false -> - Acc - end - end, [], PluginsDir) - end. - ensure_file(File) -> case filelib:is_file(File) of false -> write_loaded([]); true -> ok end. @@ -155,23 +137,16 @@ stop_plugins(Names) -> %% @doc List all available plugins -spec(list() -> [emqx_types:plugin()]). list() -> - case emqx_config:get_env(plugins_etc_dir) of - undefined -> - []; - PluginsEtc -> - CfgFiles = filelib:wildcard("*.{conf,config}", PluginsEtc) ++ get_expand_plugin_config(), - Plugins = [plugin(CfgFile) || CfgFile <- CfgFiles], - StartedApps = names(started_app), - lists:map(fun(Plugin = #plugin{name = Name}) -> - case lists:member(Name, StartedApps) of - true -> Plugin#plugin{active = true}; - false -> Plugin - end - end, Plugins) - end. + StartedApps = names(started_app), + lists:map(fun({Name, _, _}) -> + Plugin = plugin(Name), + case lists:member(Name, StartedApps) of + true -> Plugin#plugin{active = true}; + false -> Plugin + end + end, lists:sort(ekka_boot:all_module_attributes(emqx_plugin))). -plugin(CfgFile) -> - AppName = app_name(CfgFile), +plugin(AppName) -> case application:get_all_key(AppName) of {ok, Attrs} -> Ver = proplists:get_value(vsn, Attrs, "0"), @@ -268,10 +243,6 @@ stop_app(App) -> %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- - -app_name(File) -> - [AppName | _] = string:tokens(File, "."), list_to_atom(AppName). - names(plugin) -> names(list()); diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index aa4ad5c8a..4548821b0 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -555,7 +555,7 @@ do_publish(Packet = ?PUBLISH_PACKET(QoS, PacketId), PState = #pstate{session = SPid, mountpoint = MountPoint}) -> Msg = emqx_mountpoint:mount(MountPoint, emqx_packet:to_message(credentials(PState), Packet)), - puback(QoS, PacketId, emqx_session:publish(SPid, PacketId, Msg), PState). + puback(QoS, PacketId, emqx_session:publish(SPid, PacketId, emqx_message:set_flag(dup, false, Msg)), PState). %%------------------------------------------------------------------------------ %% Puback -> Client diff --git a/src/emqx_reason_codes.erl b/src/emqx_reason_codes.erl index 62fa8a6c2..ebe992ffe 100644 --- a/src/emqx_reason_codes.erl +++ b/src/emqx_reason_codes.erl @@ -153,4 +153,4 @@ connack_error(server_unavailable) -> ?RC_SERVER_UNAVAILABLE; connack_error(server_busy) -> ?RC_SERVER_BUSY; connack_error(banned) -> ?RC_BANNED; connack_error(bad_authentication_method) -> ?RC_BAD_AUTHENTICATION_METHOD; -connack_error(_) -> ?RC_NOT_AUTHORIZED. \ No newline at end of file +connack_error(_) -> ?RC_NOT_AUTHORIZED. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index a3023b382..8be299ced 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -390,7 +390,7 @@ deliver_fun(ConnPid) when node(ConnPid) == node() -> deliver_fun(ConnPid) -> Node = node(ConnPid), fun(Packet) -> - emqx_rpc:cast(Node, erlang, send, [ConnPid, {deliver, Packet}]) + true = emqx_rpc:cast(Node, erlang, send, [ConnPid, {deliver, Packet}]), ok end. handle_call(info, _From, State) -> @@ -941,11 +941,8 @@ enqueue_msg(Msg, State = #state{mqueue = Q, client_id = ClientId, username = Use %% Deliver %%------------------------------------------------------------------------------ -redeliver({PacketId, Msg = #message{qos = QoS}}, State) -> - Msg1 = if - QoS =:= ?QOS_2 -> Msg; - true -> emqx_message:set_flag(dup, Msg) - end, +redeliver({PacketId, Msg = #message{qos = QoS}}, State) when QoS =/= ?QOS_0 -> + Msg1 = emqx_message:set_flag(dup, Msg), do_deliver(PacketId, Msg1, State); redeliver({pubrel, PacketId}, #state{deliver_fun = DeliverFun}) -> @@ -1123,4 +1120,4 @@ do_subscribe(ClientId, Username, Topic, SubOpts, SubMap) -> emqx_broker:subscribe(Topic, ClientId, SubOpts), ok = emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => true}]), maps:put(Topic, SubOpts, SubMap) - end. \ No newline at end of file + end. diff --git a/src/emqx_types.erl b/src/emqx_types.erl index 2fe34e853..586a1aef0 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -40,10 +40,19 @@ -type(username() :: maybe(binary())). -type(password() :: maybe(binary())). -type(peername() :: {inet:ip_address(), inet:port_number()}). +-type(auth_result() :: success + | client_identifier_not_valid + | bad_username_or_password + | not_authorized + | server_unavailable + | server_busy + | banned + | bad_authentication_method). -type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | none | atom()). -type(credentials() :: #{client_id := client_id(), username := username(), peername := peername(), + result := auth_result(), zone => zone(), atom() => term() }). diff --git a/src/emqx_vm_mon.erl b/src/emqx_vm_mon.erl index a4562198e..be4f5749e 100644 --- a/src/emqx_vm_mon.erl +++ b/src/emqx_vm_mon.erl @@ -67,7 +67,8 @@ init([Opts]) -> {ok, ensure_check_timer(#{check_interval => proplists:get_value(check_interval, Opts, 30), process_high_watermark => proplists:get_value(process_high_watermark, Opts, 0.70), process_low_watermark => proplists:get_value(process_low_watermark, Opts, 0.50), - timer => undefined})}. + timer => undefined, + is_process_alarm_set => false})}. handle_call(get_check_interval, _From, State) -> {reply, maps:get(check_interval, State, undefined), State}; @@ -92,15 +93,20 @@ handle_cast(_Request, State) -> handle_info({timeout, Timer, check}, State = #{timer := Timer, process_high_watermark := ProcHighWatermark, - process_low_watermark := ProcLowWatermark}) -> + process_low_watermark := ProcLowWatermark, + is_process_alarm_set := IsProcessAlarmSet}) -> ProcessCount = erlang:system_info(process_count), case ProcessCount / erlang:system_info(process_limit) of Percent when Percent >= ProcHighWatermark -> - alarm_handler:set_alarm({too_many_processes, ProcessCount}); + alarm_handler:set_alarm({too_many_processes, ProcessCount}), + {noreply, ensure_check_timer(State#{is_process_alarm_set := true})}; Percent when Percent < ProcLowWatermark -> - alarm_handler:clear_alarm(too_many_processes) - end, - {noreply, ensure_check_timer(State)}. + case IsProcessAlarmSet of + true -> alarm_handler:clear_alarm(too_many_processes); + false -> ok + end, + {noreply, ensure_check_timer(State#{is_process_alarm_set := false})} + end. terminate(_Reason, #{timer := Timer}) -> emqx_misc:cancel_timer(Timer).