Merge remote-tracking branch 'origin/develop'
This commit is contained in:
commit
962fb0cec5
|
@ -26,11 +26,11 @@
|
||||||
-spec(authenticate(emqx_types:credentials())
|
-spec(authenticate(emqx_types:credentials())
|
||||||
-> {ok, emqx_types:credentials()} | {error, term()}).
|
-> {ok, emqx_types:credentials()} | {error, term()}).
|
||||||
authenticate(Credentials) ->
|
authenticate(Credentials) ->
|
||||||
case emqx_hooks:run_fold('client.authenticate', [], Credentials#{result => init_result(Credentials)}) of
|
case emqx_hooks:run_fold('client.authenticate', [], Credentials#{auth_result => init_auth_result(Credentials)}) of
|
||||||
#{result := success} = NewCredentials ->
|
#{auth_result := success} = NewCredentials ->
|
||||||
{ok, NewCredentials};
|
{ok, NewCredentials};
|
||||||
NewCredentials ->
|
NewCredentials ->
|
||||||
{error, maps:get(result, NewCredentials, unknown_error)}
|
{error, maps:get(auth_result, NewCredentials, unknown_error)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @doc Check ACL
|
%% @doc Check ACL
|
||||||
|
@ -61,7 +61,7 @@ do_check_acl(#{zone := Zone} = Credentials, PubSub, Topic) ->
|
||||||
reload_acl() ->
|
reload_acl() ->
|
||||||
emqx_mod_acl_internal:reload_acl().
|
emqx_mod_acl_internal:reload_acl().
|
||||||
|
|
||||||
init_result(Credentials) ->
|
init_auth_result(Credentials) ->
|
||||||
case emqx_zone:get_env(maps:get(zone, Credentials, undefined), allow_anonymous, false) of
|
case emqx_zone:get_env(maps:get(zone, Credentials, undefined), allow_anonymous, false) of
|
||||||
true -> success;
|
true -> success;
|
||||||
false -> not_authorized
|
false -> not_authorized
|
||||||
|
|
|
@ -104,7 +104,7 @@ match(Credentials, Topic, [Rule|Rules]) ->
|
||||||
-spec(reload_acl() -> ok | {error, term()}).
|
-spec(reload_acl() -> ok | {error, term()}).
|
||||||
reload_acl() ->
|
reload_acl() ->
|
||||||
try load_rules_from_file(acl_file()) of
|
try load_rules_from_file(acl_file()) of
|
||||||
ok ->
|
_ ->
|
||||||
emqx_logger:info("Reload acl_file ~s successfully", [acl_file()]),
|
emqx_logger:info("Reload acl_file ~s successfully", [acl_file()]),
|
||||||
ok;
|
ok;
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
|
|
|
@ -97,7 +97,8 @@ init([Opts]) ->
|
||||||
{ok, ensure_check_timer(#{cpu_high_watermark => proplists:get_value(cpu_high_watermark, Opts, 0.80),
|
{ok, ensure_check_timer(#{cpu_high_watermark => proplists:get_value(cpu_high_watermark, Opts, 0.80),
|
||||||
cpu_low_watermark => proplists:get_value(cpu_low_watermark, Opts, 0.60),
|
cpu_low_watermark => proplists:get_value(cpu_low_watermark, Opts, 0.60),
|
||||||
cpu_check_interval => proplists:get_value(cpu_check_interval, Opts, 60),
|
cpu_check_interval => proplists:get_value(cpu_check_interval, Opts, 60),
|
||||||
timer => undefined})}.
|
timer => undefined,
|
||||||
|
is_cpu_alarm_set => false})}.
|
||||||
|
|
||||||
handle_call(get_cpu_check_interval, _From, State) ->
|
handle_call(get_cpu_check_interval, _From, State) ->
|
||||||
{reply, maps:get(cpu_check_interval, State, undefined), State};
|
{reply, maps:get(cpu_check_interval, State, undefined), State};
|
||||||
|
@ -122,7 +123,8 @@ handle_cast(_Request, State) ->
|
||||||
|
|
||||||
handle_info({timeout, Timer, check}, State = #{timer := Timer,
|
handle_info({timeout, Timer, check}, State = #{timer := Timer,
|
||||||
cpu_high_watermark := CPUHighWatermark,
|
cpu_high_watermark := CPUHighWatermark,
|
||||||
cpu_low_watermark := CPULowWatermark}) ->
|
cpu_low_watermark := CPULowWatermark,
|
||||||
|
is_cpu_alarm_set := IsCPUAlarmSet}) ->
|
||||||
case cpu_sup:util() of
|
case cpu_sup:util() of
|
||||||
0 ->
|
0 ->
|
||||||
{noreply, State#{timer := undefined}};
|
{noreply, State#{timer := undefined}};
|
||||||
|
@ -131,10 +133,13 @@ handle_info({timeout, Timer, check}, State = #{timer := Timer,
|
||||||
{noreply, ensure_check_timer(State)};
|
{noreply, ensure_check_timer(State)};
|
||||||
Busy when Busy / 100 >= CPUHighWatermark ->
|
Busy when Busy / 100 >= CPUHighWatermark ->
|
||||||
alarm_handler:set_alarm({cpu_high_watermark, Busy}),
|
alarm_handler:set_alarm({cpu_high_watermark, Busy}),
|
||||||
{noreply, ensure_check_timer(State)};
|
{noreply, ensure_check_timer(State#{is_cpu_alarm_set := true})};
|
||||||
Busy when Busy / 100 < CPULowWatermark ->
|
Busy when Busy / 100 < CPULowWatermark ->
|
||||||
alarm_handler:clear_alarm(cpu_high_watermark),
|
case IsCPUAlarmSet of
|
||||||
{noreply, ensure_check_timer(State)}
|
true -> alarm_handler:clear_alarm(cpu_high_watermark);
|
||||||
|
false -> ok
|
||||||
|
end,
|
||||||
|
{noreply, ensure_check_timer(State#{is_cpu_alarm_set := false})}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
terminate(_Reason, #{timer := Timer}) ->
|
terminate(_Reason, #{timer := Timer}) ->
|
||||||
|
|
|
@ -60,14 +60,15 @@ load() ->
|
||||||
load_expand_plugins() ->
|
load_expand_plugins() ->
|
||||||
case emqx_config:get_env(expand_plugins_dir) of
|
case emqx_config:get_env(expand_plugins_dir) of
|
||||||
undefined -> ok;
|
undefined -> ok;
|
||||||
Dir ->
|
ExpandPluginsDir ->
|
||||||
PluginsDir = filelib:wildcard("*", Dir),
|
Plugins = filelib:wildcard("*", ExpandPluginsDir),
|
||||||
lists:foreach(fun(PluginDir) ->
|
lists:foreach(fun(Plugin) ->
|
||||||
case filelib:is_dir(Dir ++ PluginDir) of
|
PluginDir = filename:join(ExpandPluginsDir, Plugin),
|
||||||
true -> load_expand_plugin(Dir ++ PluginDir);
|
case filelib:is_dir(PluginDir) of
|
||||||
|
true -> load_expand_plugin(PluginDir);
|
||||||
false -> ok
|
false -> ok
|
||||||
end
|
end
|
||||||
end, PluginsDir)
|
end, Plugins)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
load_expand_plugin(PluginDir) ->
|
load_expand_plugin(PluginDir) ->
|
||||||
|
@ -98,25 +99,6 @@ init_expand_plugin_config(PluginDir) ->
|
||||||
[application:set_env(AppName, Par, Val) || {Par, Val} <- Envs]
|
[application:set_env(AppName, Par, Val) || {Par, Val} <- Envs]
|
||||||
end, AppsEnv).
|
end, AppsEnv).
|
||||||
|
|
||||||
get_expand_plugin_config() ->
|
|
||||||
case emqx_config:get_env(expand_plugins_dir) of
|
|
||||||
undefined -> ok;
|
|
||||||
Dir ->
|
|
||||||
PluginsDir = filelib:wildcard("*", Dir),
|
|
||||||
lists:foldl(fun(PluginDir, Acc) ->
|
|
||||||
case filelib:is_dir(Dir ++ PluginDir) of
|
|
||||||
true ->
|
|
||||||
Etc = Dir ++ PluginDir ++ "/etc",
|
|
||||||
case filelib:wildcard("*.{conf,config}", Etc) of
|
|
||||||
[] -> Acc;
|
|
||||||
[Conf] -> [Conf | Acc]
|
|
||||||
end;
|
|
||||||
false ->
|
|
||||||
Acc
|
|
||||||
end
|
|
||||||
end, [], PluginsDir)
|
|
||||||
end.
|
|
||||||
|
|
||||||
ensure_file(File) ->
|
ensure_file(File) ->
|
||||||
case filelib:is_file(File) of false -> write_loaded([]); true -> ok end.
|
case filelib:is_file(File) of false -> write_loaded([]); true -> ok end.
|
||||||
|
|
||||||
|
@ -155,23 +137,16 @@ stop_plugins(Names) ->
|
||||||
%% @doc List all available plugins
|
%% @doc List all available plugins
|
||||||
-spec(list() -> [emqx_types:plugin()]).
|
-spec(list() -> [emqx_types:plugin()]).
|
||||||
list() ->
|
list() ->
|
||||||
case emqx_config:get_env(plugins_etc_dir) of
|
StartedApps = names(started_app),
|
||||||
undefined ->
|
lists:map(fun({Name, _, _}) ->
|
||||||
[];
|
Plugin = plugin(Name),
|
||||||
PluginsEtc ->
|
case lists:member(Name, StartedApps) of
|
||||||
CfgFiles = filelib:wildcard("*.{conf,config}", PluginsEtc) ++ get_expand_plugin_config(),
|
true -> Plugin#plugin{active = true};
|
||||||
Plugins = [plugin(CfgFile) || CfgFile <- CfgFiles],
|
false -> Plugin
|
||||||
StartedApps = names(started_app),
|
end
|
||||||
lists:map(fun(Plugin = #plugin{name = Name}) ->
|
end, lists:sort(ekka_boot:all_module_attributes(emqx_plugin))).
|
||||||
case lists:member(Name, StartedApps) of
|
|
||||||
true -> Plugin#plugin{active = true};
|
|
||||||
false -> Plugin
|
|
||||||
end
|
|
||||||
end, Plugins)
|
|
||||||
end.
|
|
||||||
|
|
||||||
plugin(CfgFile) ->
|
plugin(AppName) ->
|
||||||
AppName = app_name(CfgFile),
|
|
||||||
case application:get_all_key(AppName) of
|
case application:get_all_key(AppName) of
|
||||||
{ok, Attrs} ->
|
{ok, Attrs} ->
|
||||||
Ver = proplists:get_value(vsn, Attrs, "0"),
|
Ver = proplists:get_value(vsn, Attrs, "0"),
|
||||||
|
@ -268,10 +243,6 @@ stop_app(App) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
app_name(File) ->
|
|
||||||
[AppName | _] = string:tokens(File, "."), list_to_atom(AppName).
|
|
||||||
|
|
||||||
names(plugin) ->
|
names(plugin) ->
|
||||||
names(list());
|
names(list());
|
||||||
|
|
||||||
|
|
|
@ -555,7 +555,7 @@ do_publish(Packet = ?PUBLISH_PACKET(QoS, PacketId),
|
||||||
PState = #pstate{session = SPid, mountpoint = MountPoint}) ->
|
PState = #pstate{session = SPid, mountpoint = MountPoint}) ->
|
||||||
Msg = emqx_mountpoint:mount(MountPoint,
|
Msg = emqx_mountpoint:mount(MountPoint,
|
||||||
emqx_packet:to_message(credentials(PState), Packet)),
|
emqx_packet:to_message(credentials(PState), Packet)),
|
||||||
puback(QoS, PacketId, emqx_session:publish(SPid, PacketId, Msg), PState).
|
puback(QoS, PacketId, emqx_session:publish(SPid, PacketId, emqx_message:set_flag(dup, false, Msg)), PState).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Puback -> Client
|
%% Puback -> Client
|
||||||
|
|
|
@ -153,4 +153,4 @@ connack_error(server_unavailable) -> ?RC_SERVER_UNAVAILABLE;
|
||||||
connack_error(server_busy) -> ?RC_SERVER_BUSY;
|
connack_error(server_busy) -> ?RC_SERVER_BUSY;
|
||||||
connack_error(banned) -> ?RC_BANNED;
|
connack_error(banned) -> ?RC_BANNED;
|
||||||
connack_error(bad_authentication_method) -> ?RC_BAD_AUTHENTICATION_METHOD;
|
connack_error(bad_authentication_method) -> ?RC_BAD_AUTHENTICATION_METHOD;
|
||||||
connack_error(_) -> ?RC_NOT_AUTHORIZED.
|
connack_error(_) -> ?RC_NOT_AUTHORIZED.
|
||||||
|
|
|
@ -390,7 +390,7 @@ deliver_fun(ConnPid) when node(ConnPid) == node() ->
|
||||||
deliver_fun(ConnPid) ->
|
deliver_fun(ConnPid) ->
|
||||||
Node = node(ConnPid),
|
Node = node(ConnPid),
|
||||||
fun(Packet) ->
|
fun(Packet) ->
|
||||||
emqx_rpc:cast(Node, erlang, send, [ConnPid, {deliver, Packet}])
|
true = emqx_rpc:cast(Node, erlang, send, [ConnPid, {deliver, Packet}]), ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
handle_call(info, _From, State) ->
|
handle_call(info, _From, State) ->
|
||||||
|
@ -941,11 +941,8 @@ enqueue_msg(Msg, State = #state{mqueue = Q, client_id = ClientId, username = Use
|
||||||
%% Deliver
|
%% Deliver
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
redeliver({PacketId, Msg = #message{qos = QoS}}, State) ->
|
redeliver({PacketId, Msg = #message{qos = QoS}}, State) when QoS =/= ?QOS_0 ->
|
||||||
Msg1 = if
|
Msg1 = emqx_message:set_flag(dup, Msg),
|
||||||
QoS =:= ?QOS_2 -> Msg;
|
|
||||||
true -> emqx_message:set_flag(dup, Msg)
|
|
||||||
end,
|
|
||||||
do_deliver(PacketId, Msg1, State);
|
do_deliver(PacketId, Msg1, State);
|
||||||
|
|
||||||
redeliver({pubrel, PacketId}, #state{deliver_fun = DeliverFun}) ->
|
redeliver({pubrel, PacketId}, #state{deliver_fun = DeliverFun}) ->
|
||||||
|
@ -1123,4 +1120,4 @@ do_subscribe(ClientId, Username, Topic, SubOpts, SubMap) ->
|
||||||
emqx_broker:subscribe(Topic, ClientId, SubOpts),
|
emqx_broker:subscribe(Topic, ClientId, SubOpts),
|
||||||
ok = emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => true}]),
|
ok = emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => true}]),
|
||||||
maps:put(Topic, SubOpts, SubMap)
|
maps:put(Topic, SubOpts, SubMap)
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -40,10 +40,19 @@
|
||||||
-type(username() :: maybe(binary())).
|
-type(username() :: maybe(binary())).
|
||||||
-type(password() :: maybe(binary())).
|
-type(password() :: maybe(binary())).
|
||||||
-type(peername() :: {inet:ip_address(), inet:port_number()}).
|
-type(peername() :: {inet:ip_address(), inet:port_number()}).
|
||||||
|
-type(auth_result() :: success
|
||||||
|
| client_identifier_not_valid
|
||||||
|
| bad_username_or_password
|
||||||
|
| not_authorized
|
||||||
|
| server_unavailable
|
||||||
|
| server_busy
|
||||||
|
| banned
|
||||||
|
| bad_authentication_method).
|
||||||
-type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | none | atom()).
|
-type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | none | atom()).
|
||||||
-type(credentials() :: #{client_id := client_id(),
|
-type(credentials() :: #{client_id := client_id(),
|
||||||
username := username(),
|
username := username(),
|
||||||
peername := peername(),
|
peername := peername(),
|
||||||
|
result := auth_result(),
|
||||||
zone => zone(),
|
zone => zone(),
|
||||||
atom() => term()
|
atom() => term()
|
||||||
}).
|
}).
|
||||||
|
|
|
@ -67,7 +67,8 @@ init([Opts]) ->
|
||||||
{ok, ensure_check_timer(#{check_interval => proplists:get_value(check_interval, Opts, 30),
|
{ok, ensure_check_timer(#{check_interval => proplists:get_value(check_interval, Opts, 30),
|
||||||
process_high_watermark => proplists:get_value(process_high_watermark, Opts, 0.70),
|
process_high_watermark => proplists:get_value(process_high_watermark, Opts, 0.70),
|
||||||
process_low_watermark => proplists:get_value(process_low_watermark, Opts, 0.50),
|
process_low_watermark => proplists:get_value(process_low_watermark, Opts, 0.50),
|
||||||
timer => undefined})}.
|
timer => undefined,
|
||||||
|
is_process_alarm_set => false})}.
|
||||||
|
|
||||||
handle_call(get_check_interval, _From, State) ->
|
handle_call(get_check_interval, _From, State) ->
|
||||||
{reply, maps:get(check_interval, State, undefined), State};
|
{reply, maps:get(check_interval, State, undefined), State};
|
||||||
|
@ -92,15 +93,20 @@ handle_cast(_Request, State) ->
|
||||||
|
|
||||||
handle_info({timeout, Timer, check}, State = #{timer := Timer,
|
handle_info({timeout, Timer, check}, State = #{timer := Timer,
|
||||||
process_high_watermark := ProcHighWatermark,
|
process_high_watermark := ProcHighWatermark,
|
||||||
process_low_watermark := ProcLowWatermark}) ->
|
process_low_watermark := ProcLowWatermark,
|
||||||
|
is_process_alarm_set := IsProcessAlarmSet}) ->
|
||||||
ProcessCount = erlang:system_info(process_count),
|
ProcessCount = erlang:system_info(process_count),
|
||||||
case ProcessCount / erlang:system_info(process_limit) of
|
case ProcessCount / erlang:system_info(process_limit) of
|
||||||
Percent when Percent >= ProcHighWatermark ->
|
Percent when Percent >= ProcHighWatermark ->
|
||||||
alarm_handler:set_alarm({too_many_processes, ProcessCount});
|
alarm_handler:set_alarm({too_many_processes, ProcessCount}),
|
||||||
|
{noreply, ensure_check_timer(State#{is_process_alarm_set := true})};
|
||||||
Percent when Percent < ProcLowWatermark ->
|
Percent when Percent < ProcLowWatermark ->
|
||||||
alarm_handler:clear_alarm(too_many_processes)
|
case IsProcessAlarmSet of
|
||||||
end,
|
true -> alarm_handler:clear_alarm(too_many_processes);
|
||||||
{noreply, ensure_check_timer(State)}.
|
false -> ok
|
||||||
|
end,
|
||||||
|
{noreply, ensure_check_timer(State#{is_process_alarm_set := false})}
|
||||||
|
end.
|
||||||
|
|
||||||
terminate(_Reason, #{timer := Timer}) ->
|
terminate(_Reason, #{timer := Timer}) ->
|
||||||
emqx_misc:cancel_timer(Timer).
|
emqx_misc:cancel_timer(Timer).
|
||||||
|
|
Loading…
Reference in New Issue