Merge pull request #3381 from emqx/master
Auto-pull-request-by-2020-04-11
This commit is contained in:
commit
455a04e92f
|
@ -1399,18 +1399,6 @@ listener.ws.external.access.1 = allow all
|
||||||
## Value: on | off
|
## Value: on | off
|
||||||
listener.ws.external.verify_protocol_header = on
|
listener.ws.external.verify_protocol_header = on
|
||||||
|
|
||||||
## Use X-Forwarded-For header for real source IP if the EMQ X cluster is
|
|
||||||
## deployed behind NGINX or HAProxy.
|
|
||||||
##
|
|
||||||
## Value: String
|
|
||||||
## listener.ws.external.proxy_address_header = X-Forwarded-For
|
|
||||||
|
|
||||||
## Use X-Forwarded-Port header for real source port if the EMQ X cluster is
|
|
||||||
## deployed behind NGINX or HAProxy.
|
|
||||||
##
|
|
||||||
## Value: String
|
|
||||||
## listener.ws.external.proxy_port_header = X-Forwarded-Port
|
|
||||||
|
|
||||||
## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed behind
|
## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed behind
|
||||||
## HAProxy or Nginx.
|
## HAProxy or Nginx.
|
||||||
##
|
##
|
||||||
|
@ -1830,15 +1818,14 @@ listener.wss.external.send_timeout_close = on
|
||||||
##--------------------------------------------------------------------
|
##--------------------------------------------------------------------
|
||||||
## Modules
|
## Modules
|
||||||
##--------------------------------------------------------------------
|
##--------------------------------------------------------------------
|
||||||
|
## The file to store loaded module names.
|
||||||
|
##
|
||||||
|
## Value: File
|
||||||
|
modules.loaded_file = {{ platform_data_dir }}/loaded_modules
|
||||||
|
|
||||||
##--------------------------------------------------------------------
|
##--------------------------------------------------------------------
|
||||||
## Presence Module
|
## Presence Module
|
||||||
|
|
||||||
## Enable Presence Module.
|
|
||||||
##
|
|
||||||
## Value: on | off
|
|
||||||
module.presence = on
|
|
||||||
|
|
||||||
## Sets the QoS for presence MQTT message.
|
## Sets the QoS for presence MQTT message.
|
||||||
##
|
##
|
||||||
## Value: 0 | 1 | 2
|
## Value: 0 | 1 | 2
|
||||||
|
@ -1847,11 +1834,6 @@ module.presence.qos = 1
|
||||||
##--------------------------------------------------------------------
|
##--------------------------------------------------------------------
|
||||||
## Subscription Module
|
## Subscription Module
|
||||||
|
|
||||||
## Enable Subscription Module.
|
|
||||||
##
|
|
||||||
## Value: on | off
|
|
||||||
module.subscription = off
|
|
||||||
|
|
||||||
## Subscribe the Topics automatically when client connected.
|
## Subscribe the Topics automatically when client connected.
|
||||||
##
|
##
|
||||||
## Value: String
|
## Value: String
|
||||||
|
@ -1887,31 +1869,10 @@ module.subscription = off
|
||||||
##--------------------------------------------------------------------
|
##--------------------------------------------------------------------
|
||||||
## Rewrite Module
|
## Rewrite Module
|
||||||
|
|
||||||
## Enable Rewrite Module.
|
|
||||||
##
|
|
||||||
## Value: on | off
|
|
||||||
module.rewrite = off
|
|
||||||
|
|
||||||
## {rewrite, Topic, Re, Dest}
|
## {rewrite, Topic, Re, Dest}
|
||||||
## module.rewrite.rule.1 = x/# ^x/y/(.+)$ z/y/$1
|
## module.rewrite.rule.1 = x/# ^x/y/(.+)$ z/y/$1
|
||||||
## module.rewrite.rule.2 = y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2
|
## module.rewrite.rule.2 = y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2
|
||||||
|
|
||||||
##--------------------------------------------------------------------
|
|
||||||
## Topic Metrics Module
|
|
||||||
|
|
||||||
## Enable Topic Metrics Module.
|
|
||||||
##
|
|
||||||
## Value: on | off
|
|
||||||
module.topic_metrics = off
|
|
||||||
|
|
||||||
##--------------------------------------------------------------------
|
|
||||||
## Delayed Module
|
|
||||||
|
|
||||||
## Enable Delayed Module.
|
|
||||||
##
|
|
||||||
## Value: on | off
|
|
||||||
module.delayed = off
|
|
||||||
|
|
||||||
##-------------------------------------------------------------------
|
##-------------------------------------------------------------------
|
||||||
## Plugins
|
## Plugins
|
||||||
##-------------------------------------------------------------------
|
##-------------------------------------------------------------------
|
||||||
|
|
|
@ -1310,16 +1310,6 @@ end}.
|
||||||
{datatype, flag}
|
{datatype, flag}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{mapping, "listener.ws.$name.proxy_address_header", "emqx.listeners", [
|
|
||||||
{datatype, string},
|
|
||||||
hidden
|
|
||||||
]}.
|
|
||||||
|
|
||||||
{mapping, "listener.ws.$name.proxy_port_header", "emqx.listeners", [
|
|
||||||
{datatype, string},
|
|
||||||
hidden
|
|
||||||
]}.
|
|
||||||
|
|
||||||
{mapping, "listener.ws.$name.proxy_protocol", "emqx.listeners", [
|
{mapping, "listener.ws.$name.proxy_protocol", "emqx.listeners", [
|
||||||
{datatype, flag}
|
{datatype, flag}
|
||||||
]}.
|
]}.
|
||||||
|
@ -1467,16 +1457,6 @@ end}.
|
||||||
{datatype, string}
|
{datatype, string}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{mapping, "listener.wss.$name.proxy_address_header", "emqx.listeners", [
|
|
||||||
{datatype, string},
|
|
||||||
hidden
|
|
||||||
]}.
|
|
||||||
|
|
||||||
{mapping, "listener.wss.$name.proxy_port_header", "emqx.listeners", [
|
|
||||||
{datatype, string},
|
|
||||||
hidden
|
|
||||||
]}.
|
|
||||||
|
|
||||||
{mapping, "listener.wss.$name.proxy_protocol", "emqx.listeners", [
|
{mapping, "listener.wss.$name.proxy_protocol", "emqx.listeners", [
|
||||||
{datatype, flag}
|
{datatype, flag}
|
||||||
]}.
|
]}.
|
||||||
|
@ -1681,11 +1661,9 @@ end}.
|
||||||
{proxy_protocol_timeout, cuttlefish:conf_get(Prefix ++ ".proxy_protocol_timeout", Conf, undefined)},
|
{proxy_protocol_timeout, cuttlefish:conf_get(Prefix ++ ".proxy_protocol_timeout", Conf, undefined)},
|
||||||
{verify_protocol_header, cuttlefish:conf_get(Prefix ++ ".verify_protocol_header", Conf, undefined)},
|
{verify_protocol_header, cuttlefish:conf_get(Prefix ++ ".verify_protocol_header", Conf, undefined)},
|
||||||
{peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)},
|
{peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)},
|
||||||
{proxy_port_header, cuttlefish:conf_get(Prefix ++ ".proxy_port_header", Conf, undefined)},
|
|
||||||
{compress, cuttlefish:conf_get(Prefix ++ ".compress", Conf, undefined)},
|
{compress, cuttlefish:conf_get(Prefix ++ ".compress", Conf, undefined)},
|
||||||
{idle_timeout, cuttlefish:conf_get(Prefix ++ ".idle_timeout", Conf, undefined)},
|
{idle_timeout, cuttlefish:conf_get(Prefix ++ ".idle_timeout", Conf, undefined)},
|
||||||
{max_frame_size, cuttlefish:conf_get(Prefix ++ ".max_frame_size", Conf, undefined)},
|
{max_frame_size, cuttlefish:conf_get(Prefix ++ ".max_frame_size", Conf, undefined)} | AccOpts(Prefix)])
|
||||||
{proxy_address_header, cuttlefish:conf_get(Prefix ++ ".proxy_address_header", Conf, undefined)} | AccOpts(Prefix)])
|
|
||||||
end,
|
end,
|
||||||
DeflateOpts = fun(Prefix) ->
|
DeflateOpts = fun(Prefix) ->
|
||||||
Filter([{level, cuttlefish:conf_get(Prefix ++ ".deflate_opts.level", Conf, undefined)},
|
Filter([{level, cuttlefish:conf_get(Prefix ++ ".deflate_opts.level", Conf, undefined)},
|
||||||
|
@ -1788,9 +1766,8 @@ end}.
|
||||||
%% Modules
|
%% Modules
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
{mapping, "module.presence", "emqx.modules", [
|
{mapping, "modules.loaded_file", "emqx.modules_loaded_file", [
|
||||||
{default, off},
|
{datatype, string}
|
||||||
{datatype, flag}
|
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{mapping, "module.presence.qos", "emqx.modules", [
|
{mapping, "module.presence.qos", "emqx.modules", [
|
||||||
|
@ -1799,11 +1776,6 @@ end}.
|
||||||
{validators, ["range:0-2"]}
|
{validators, ["range:0-2"]}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{mapping, "module.subscription", "emqx.modules", [
|
|
||||||
{default, off},
|
|
||||||
{datatype, flag}
|
|
||||||
]}.
|
|
||||||
|
|
||||||
{mapping, "module.subscription.$id.topic", "emqx.modules", [
|
{mapping, "module.subscription.$id.topic", "emqx.modules", [
|
||||||
{datatype, string}
|
{datatype, string}
|
||||||
]}.
|
]}.
|
||||||
|
@ -1832,25 +1804,10 @@ end}.
|
||||||
{validators, ["range:0-2"]}
|
{validators, ["range:0-2"]}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{mapping, "module.rewrite", "emqx.modules", [
|
|
||||||
{default, off},
|
|
||||||
{datatype, flag}
|
|
||||||
]}.
|
|
||||||
|
|
||||||
{mapping, "module.rewrite.rule.$id", "emqx.modules", [
|
{mapping, "module.rewrite.rule.$id", "emqx.modules", [
|
||||||
{datatype, string}
|
{datatype, string}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{mapping, "module.topic_metrics", "emqx.modules", [
|
|
||||||
{default, off},
|
|
||||||
{datatype, flag}
|
|
||||||
]}.
|
|
||||||
|
|
||||||
{mapping, "module.delayed", "emqx.modules", [
|
|
||||||
{default, off},
|
|
||||||
{datatype, flag}
|
|
||||||
]}.
|
|
||||||
|
|
||||||
{translation, "emqx.modules", fun(Conf) ->
|
{translation, "emqx.modules", fun(Conf) ->
|
||||||
Subscriptions = fun() ->
|
Subscriptions = fun() ->
|
||||||
List = cuttlefish_variable:filter_by_prefix("module.subscription", Conf),
|
List = cuttlefish_variable:filter_by_prefix("module.subscription", Conf),
|
||||||
|
@ -1869,26 +1826,12 @@ end}.
|
||||||
end, Rules)
|
end, Rules)
|
||||||
end,
|
end,
|
||||||
lists:append([
|
lists:append([
|
||||||
case cuttlefish:conf_get("module.presence", Conf) of %% Presence
|
[{emqx_mod_presence, [{qos, cuttlefish:conf_get("module.presence.qos", Conf, 1)}]}],
|
||||||
true -> [{emqx_mod_presence, [{qos, cuttlefish:conf_get("module.presence.qos", Conf, 1)}]}];
|
[{emqx_mod_subscription, Subscriptions()}],
|
||||||
false -> []
|
[{emqx_mod_rewrite, Rewrites()}],
|
||||||
end,
|
[{emqx_mod_topic_metrics, []}],
|
||||||
case cuttlefish:conf_get("module.subscription", Conf) of %% Subscription
|
[{emqx_mod_delayed, []}],
|
||||||
true -> [{emqx_mod_subscription, Subscriptions()}];
|
[{emqx_mod_acl_internal, []}]
|
||||||
false -> []
|
|
||||||
end,
|
|
||||||
case cuttlefish:conf_get("module.rewrite", Conf) of %% Rewrite
|
|
||||||
true -> [{emqx_mod_rewrite, Rewrites()}];
|
|
||||||
false -> []
|
|
||||||
end,
|
|
||||||
case cuttlefish:conf_get("module.topic_metrics", Conf) of %% Topic Metrics
|
|
||||||
true -> [{emqx_mod_topic_metrics, []}];
|
|
||||||
false -> []
|
|
||||||
end,
|
|
||||||
case cuttlefish:conf_get("module.delayed", Conf) of %% Delayed
|
|
||||||
true -> [{emqx_mod_delayed, []}];
|
|
||||||
false -> []
|
|
||||||
end
|
|
||||||
])
|
])
|
||||||
end}.
|
end}.
|
||||||
|
|
||||||
|
|
|
@ -21,7 +21,6 @@
|
||||||
-export([authenticate/1]).
|
-export([authenticate/1]).
|
||||||
|
|
||||||
-export([ check_acl/3
|
-export([ check_acl/3
|
||||||
, reload_acl/0
|
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-type(result() :: #{auth_result := emqx_types:auth_result(),
|
-type(result() :: #{auth_result := emqx_types:auth_result(),
|
||||||
|
@ -67,11 +66,6 @@ do_check_acl(ClientInfo = #{zone := Zone}, PubSub, Topic) ->
|
||||||
_Other -> deny
|
_Other -> deny
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec(reload_acl() -> ok | {error, term()}).
|
|
||||||
reload_acl() ->
|
|
||||||
emqx_acl_cache:is_enabled() andalso emqx_acl_cache:empty_acl_cache(),
|
|
||||||
emqx_mod_acl_internal:reload_acl().
|
|
||||||
|
|
||||||
default_auth_result(Zone) ->
|
default_auth_result(Zone) ->
|
||||||
case emqx_zone:get_env(Zone, allow_anonymous, false) of
|
case emqx_zone:get_env(Zone, allow_anonymous, false) of
|
||||||
true -> #{auth_result => success, anonymous => true};
|
true -> #{auth_result => success, anonymous => true};
|
||||||
|
|
|
@ -71,6 +71,8 @@
|
||||||
topic_aliases :: emqx_types:topic_aliases(),
|
topic_aliases :: emqx_types:topic_aliases(),
|
||||||
%% MQTT Topic Alias Maximum
|
%% MQTT Topic Alias Maximum
|
||||||
alias_maximum :: maybe(map()),
|
alias_maximum :: maybe(map()),
|
||||||
|
%% Authentication Data Cache
|
||||||
|
auth_cache :: maybe(map()),
|
||||||
%% Timers
|
%% Timers
|
||||||
timers :: #{atom() => disabled | maybe(reference())},
|
timers :: #{atom() => disabled | maybe(reference())},
|
||||||
%% Conn State
|
%% Conn State
|
||||||
|
@ -185,6 +187,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port},
|
||||||
topic_aliases = #{inbound => #{},
|
topic_aliases = #{inbound => #{},
|
||||||
outbound => #{}
|
outbound => #{}
|
||||||
},
|
},
|
||||||
|
auth_cache = #{},
|
||||||
timers = #{},
|
timers = #{},
|
||||||
conn_state = idle,
|
conn_state = idle,
|
||||||
takeover = false,
|
takeover = false,
|
||||||
|
@ -216,10 +219,41 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
|
||||||
fun check_banned/2,
|
fun check_banned/2,
|
||||||
fun auth_connect/2
|
fun auth_connect/2
|
||||||
], ConnPkt, Channel#channel{conn_state = connecting}) of
|
], ConnPkt, Channel#channel{conn_state = connecting}) of
|
||||||
{ok, NConnPkt, NChannel} ->
|
{ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} ->
|
||||||
process_connect(NConnPkt, NChannel);
|
NChannel1 = NChannel#channel{
|
||||||
|
will_msg = emqx_packet:will_msg(NConnPkt),
|
||||||
|
alias_maximum = init_alias_maximum(NConnPkt, ClientInfo)
|
||||||
|
},
|
||||||
|
case enhanced_auth(?CONNECT_PACKET(NConnPkt), NChannel1) of
|
||||||
|
{ok, Properties, NChannel2} ->
|
||||||
|
process_connect(Properties, ensure_connected(NChannel2));
|
||||||
|
{continue, Properties, NChannel2} ->
|
||||||
|
handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, Properties}, NChannel2);
|
||||||
|
{error, ReasonCode, NChannel2} ->
|
||||||
|
handle_out(connack, ReasonCode, NChannel2)
|
||||||
|
end;
|
||||||
{error, ReasonCode, NChannel} ->
|
{error, ReasonCode, NChannel} ->
|
||||||
handle_out(connack, {ReasonCode, ConnPkt}, NChannel)
|
handle_out(connack, ReasonCode, NChannel)
|
||||||
|
end;
|
||||||
|
|
||||||
|
handle_in(Packet = ?AUTH_PACKET(?RC_CONTINUE_AUTHENTICATION, _Properties), Channel) ->
|
||||||
|
case enhanced_auth(Packet, Channel) of
|
||||||
|
{ok, NProperties, NChannel} ->
|
||||||
|
process_connect(NProperties, ensure_connected(NChannel));
|
||||||
|
{continue, NProperties, NChannel} ->
|
||||||
|
handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, NProperties}, NChannel);
|
||||||
|
{error, NReasonCode, NChannel} ->
|
||||||
|
handle_out(connack, NReasonCode, NChannel)
|
||||||
|
end;
|
||||||
|
|
||||||
|
handle_in(Packet = ?AUTH_PACKET(?RC_RE_AUTHENTICATE, _Properties), Channel) ->
|
||||||
|
case enhanced_auth(Packet, Channel) of
|
||||||
|
{ok, NProperties, NChannel} ->
|
||||||
|
handle_out(auth, {?RC_SUCCESS, NProperties}, NChannel);
|
||||||
|
{continue, NProperties, NChannel} ->
|
||||||
|
handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, NProperties}, NChannel);
|
||||||
|
{error, NReasonCode, NChannel} ->
|
||||||
|
handle_out(disconnect, NReasonCode, NChannel)
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
|
handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
|
||||||
|
@ -362,24 +396,23 @@ handle_in(Packet, Channel) ->
|
||||||
%% Process Connect
|
%% Process Connect
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
process_connect(ConnPkt = #mqtt_packet_connect{clean_start = CleanStart},
|
process_connect(AckProps, Channel = #channel{conninfo = #{clean_start := CleanStart} = ConnInfo, clientinfo = ClientInfo}) ->
|
||||||
Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) ->
|
|
||||||
case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of
|
case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of
|
||||||
{ok, #{session := Session, present := false}} ->
|
{ok, #{session := Session, present := false}} ->
|
||||||
NChannel = Channel#channel{session = Session},
|
NChannel = Channel#channel{session = Session},
|
||||||
handle_out(connack, {?RC_SUCCESS, sp(false), ConnPkt}, NChannel);
|
handle_out(connack, {?RC_SUCCESS, sp(false), AckProps}, NChannel);
|
||||||
{ok, #{session := Session, present := true, pendings := Pendings}} ->
|
{ok, #{session := Session, present := true, pendings := Pendings}} ->
|
||||||
Pendings1 = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())),
|
Pendings1 = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())),
|
||||||
NChannel = Channel#channel{session = Session,
|
NChannel = Channel#channel{session = Session,
|
||||||
resuming = true,
|
resuming = true,
|
||||||
pendings = Pendings1
|
pendings = Pendings1
|
||||||
},
|
},
|
||||||
handle_out(connack, {?RC_SUCCESS, sp(true), ConnPkt}, NChannel);
|
handle_out(connack, {?RC_SUCCESS, sp(true), AckProps}, NChannel);
|
||||||
{error, client_id_unavailable} ->
|
{error, client_id_unavailable} ->
|
||||||
handle_out(connack, {?RC_CLIENT_IDENTIFIER_NOT_VALID, ConnPkt}, Channel);
|
handle_out(connack, ?RC_CLIENT_IDENTIFIER_NOT_VALID, Channel);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?LOG(error, "Failed to open session due to ~p", [Reason]),
|
?LOG(error, "Failed to open session due to ~p", [Reason]),
|
||||||
handle_out(connack, {?RC_UNSPECIFIED_ERROR, ConnPkt}, Channel)
|
handle_out(connack, ?RC_UNSPECIFIED_ERROR, Channel)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -579,18 +612,17 @@ not_nacked({deliver, _Topic, Msg}) ->
|
||||||
| {ok, replies(), channel()}
|
| {ok, replies(), channel()}
|
||||||
| {shutdown, Reason :: term(), channel()}
|
| {shutdown, Reason :: term(), channel()}
|
||||||
| {shutdown, Reason :: term(), replies(), channel()}).
|
| {shutdown, Reason :: term(), replies(), channel()}).
|
||||||
handle_out(connack, {?RC_SUCCESS, SP, ConnPkt}, Channel = #channel{conninfo = ConnInfo}) ->
|
handle_out(connack, {?RC_SUCCESS, SP, Props}, Channel = #channel{conninfo = ConnInfo}) ->
|
||||||
AckProps = run_fold([fun enrich_connack_caps/2,
|
AckProps = run_fold([fun enrich_connack_caps/2,
|
||||||
fun enrich_server_keepalive/2,
|
fun enrich_server_keepalive/2,
|
||||||
fun enrich_assigned_clientid/2
|
fun enrich_assigned_clientid/2
|
||||||
], #{}, Channel),
|
], Props, Channel),
|
||||||
NAckProps = run_hooks('client.connack', [ConnInfo, emqx_reason_codes:name(?RC_SUCCESS)], AckProps),
|
NAckProps = run_hooks('client.connack', [ConnInfo, emqx_reason_codes:name(?RC_SUCCESS)], AckProps),
|
||||||
|
|
||||||
return_connack(?CONNACK_PACKET(?RC_SUCCESS, SP, NAckProps),
|
return_connack(?CONNACK_PACKET(?RC_SUCCESS, SP, NAckProps),
|
||||||
ensure_keepalive(NAckProps,
|
ensure_keepalive(NAckProps, Channel));
|
||||||
ensure_connected(ConnPkt, Channel)));
|
|
||||||
|
|
||||||
handle_out(connack, {ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnInfo}) ->
|
handle_out(connack, ReasonCode, Channel = #channel{conninfo = ConnInfo}) ->
|
||||||
Reason = emqx_reason_codes:name(ReasonCode),
|
Reason = emqx_reason_codes:name(ReasonCode),
|
||||||
AckProps = run_hooks('client.connack', [ConnInfo, Reason], emqx_mqtt_props:new()),
|
AckProps = run_hooks('client.connack', [ConnInfo, Reason], emqx_mqtt_props:new()),
|
||||||
AckPacket = ?CONNACK_PACKET(case maps:get(proto_ver, ConnInfo) of
|
AckPacket = ?CONNACK_PACKET(case maps:get(proto_ver, ConnInfo) of
|
||||||
|
@ -643,6 +675,9 @@ handle_out(disconnect, {ReasonCode, ReasonName}, Channel = ?IS_MQTT_V5) ->
|
||||||
handle_out(disconnect, {_ReasonCode, ReasonName}, Channel) ->
|
handle_out(disconnect, {_ReasonCode, ReasonName}, Channel) ->
|
||||||
{ok, {close, ReasonName}, Channel};
|
{ok, {close, ReasonName}, Channel};
|
||||||
|
|
||||||
|
handle_out(auth, {ReasonCode, Properties}, Channel) ->
|
||||||
|
{ok, ?AUTH_PACKET(ReasonCode, Properties), Channel};
|
||||||
|
|
||||||
handle_out(Type, Data, Channel) ->
|
handle_out(Type, Data, Channel) ->
|
||||||
?LOG(error, "Unexpected outgoing: ~s, ~p", [Type, Data]),
|
?LOG(error, "Unexpected outgoing: ~s, ~p", [Type, Data]),
|
||||||
{ok, Channel}.
|
{ok, Channel}.
|
||||||
|
@ -1069,6 +1104,55 @@ auth_connect(#mqtt_packet_connect{clientid = ClientId,
|
||||||
is_anonymous(#{anonymous := true}) -> true;
|
is_anonymous(#{anonymous := true}) -> true;
|
||||||
is_anonymous(_AuthResult) -> false.
|
is_anonymous(_AuthResult) -> false.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Enhanced Authentication
|
||||||
|
|
||||||
|
enhanced_auth(?CONNECT_PACKET(#mqtt_packet_connect{
|
||||||
|
proto_ver = Protover,
|
||||||
|
properties = Properties
|
||||||
|
}), Channel) ->
|
||||||
|
case Protover of
|
||||||
|
?MQTT_PROTO_V5 ->
|
||||||
|
AuthMethod = emqx_mqtt_props:get('Authentication-Method', Properties, undefined),
|
||||||
|
AuthData = emqx_mqtt_props:get('Authentication-Data', Properties, undefined),
|
||||||
|
do_enhanced_auth(AuthMethod, AuthData, Channel);
|
||||||
|
_ ->
|
||||||
|
{ok, #{}, Channel}
|
||||||
|
end;
|
||||||
|
|
||||||
|
enhanced_auth(?AUTH_PACKET(_ReasonCode, Properties), Channel = #channel{conninfo = ConnInfo}) ->
|
||||||
|
AuthMethod = maps:get('Authentication-Method', maps:get(conn_props, ConnInfo), undefined),
|
||||||
|
NAuthMethod = emqx_mqtt_props:get('Authentication-Method', Properties, undefined),
|
||||||
|
AuthData = emqx_mqtt_props:get('Authentication-Data', Properties, undefined),
|
||||||
|
case NAuthMethod =:= undefined orelse NAuthMethod =/= AuthMethod of
|
||||||
|
true ->
|
||||||
|
{error, emqx_reason_codes:connack_error(bad_authentication_method), Channel};
|
||||||
|
false ->
|
||||||
|
do_enhanced_auth(AuthMethod, AuthData, Channel)
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_enhanced_auth(undefined, undefined, Channel) ->
|
||||||
|
{ok, #{}, Channel};
|
||||||
|
do_enhanced_auth(undefined, _AuthData, Channel) ->
|
||||||
|
{error, emqx_reason_codes:connack_error(not_authorized), Channel};
|
||||||
|
do_enhanced_auth(_AuthMethod, undefined, Channel) ->
|
||||||
|
{error, emqx_reason_codes:connack_error(not_authorized), Channel};
|
||||||
|
do_enhanced_auth(AuthMethod, AuthData, Channel = #channel{auth_cache = Cache}) ->
|
||||||
|
case do_auth_check(AuthMethod, AuthData, Cache) of
|
||||||
|
ok -> {ok, #{}, Channel#channel{auth_cache = #{}}};
|
||||||
|
{ok, NAuthData} ->
|
||||||
|
NProperties = #{'Authentication-Method' => AuthMethod, 'Authentication-Data' => NAuthData},
|
||||||
|
{ok, NProperties, Channel#channel{auth_cache = #{}}};
|
||||||
|
{continue, NAuthData, NCache} ->
|
||||||
|
NProperties = #{'Authentication-Method' => AuthMethod, 'Authentication-Data' => NAuthData},
|
||||||
|
{continue, NProperties, Channel#channel{auth_cache = NCache}};
|
||||||
|
{error, _Reason} ->
|
||||||
|
{error, emqx_reason_codes:connack_error(not_authorized), Channel}
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_auth_check(_AuthMethod, _AuthData, _AuthDataCache) ->
|
||||||
|
{error, not_authorized}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Process Topic Alias
|
%% Process Topic Alias
|
||||||
|
|
||||||
|
@ -1259,14 +1343,12 @@ enrich_assigned_clientid(AckProps, #channel{conninfo = ConnInfo,
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Ensure connected
|
%% Ensure connected
|
||||||
|
|
||||||
ensure_connected(ConnPkt, Channel = #channel{conninfo = ConnInfo,
|
ensure_connected(Channel = #channel{conninfo = ConnInfo,
|
||||||
clientinfo = ClientInfo}) ->
|
clientinfo = ClientInfo}) ->
|
||||||
NConnInfo = ConnInfo#{connected_at => erlang:system_time(second)},
|
NConnInfo = ConnInfo#{connected_at => erlang:system_time(second)},
|
||||||
ok = run_hooks('client.connected', [ClientInfo, NConnInfo]),
|
ok = run_hooks('client.connected', [ClientInfo, NConnInfo]),
|
||||||
Channel#channel{conninfo = NConnInfo,
|
Channel#channel{conninfo = NConnInfo,
|
||||||
conn_state = connected,
|
conn_state = connected
|
||||||
will_msg = emqx_packet:will_msg(ConnPkt),
|
|
||||||
alias_maximum = init_alias_maximum(ConnPkt, ClientInfo)
|
|
||||||
}.
|
}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -22,6 +22,8 @@
|
||||||
|
|
||||||
-callback(unload(State :: term()) -> term()).
|
-callback(unload(State :: term()) -> term()).
|
||||||
|
|
||||||
|
-callback(description() -> any()).
|
||||||
|
|
||||||
-else.
|
-else.
|
||||||
|
|
||||||
-export([behaviour_info/1]).
|
-export([behaviour_info/1]).
|
||||||
|
|
|
@ -44,17 +44,16 @@
|
||||||
start() ->
|
start() ->
|
||||||
lists:foreach(fun start_listener/1, emqx:get_env(listeners, [])).
|
lists:foreach(fun start_listener/1, emqx:get_env(listeners, [])).
|
||||||
|
|
||||||
-spec(start_listener(listener()) -> {ok, pid()} | {error, term()}).
|
-spec(start_listener(listener()) -> ok).
|
||||||
start_listener({Proto, ListenOn, Options}) ->
|
start_listener({Proto, ListenOn, Options}) ->
|
||||||
StartRet = start_listener(Proto, ListenOn, Options),
|
case start_listener(Proto, ListenOn, Options) of
|
||||||
case StartRet of
|
|
||||||
{ok, _} -> io:format("Start mqtt:~s listener on ~s successfully.~n",
|
{ok, _} -> io:format("Start mqtt:~s listener on ~s successfully.~n",
|
||||||
[Proto, format(ListenOn)]);
|
[Proto, format(ListenOn)]);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
io:format(standard_error, "Failed to start mqtt:~s listener on ~s - ~0p~n!",
|
io:format(standard_error, "Failed to start mqtt:~s listener on ~s - ~0p~n!",
|
||||||
[Proto, format(ListenOn), Reason])
|
[Proto, format(ListenOn), Reason]),
|
||||||
end,
|
error(Reason)
|
||||||
StartRet.
|
end.
|
||||||
|
|
||||||
%% Start MQTT/TCP listener
|
%% Start MQTT/TCP listener
|
||||||
-spec(start_listener(esockd:proto(), esockd:listen_on(), [esockd:option()])
|
-spec(start_listener(esockd:proto(), esockd:listen_on(), [esockd:option()])
|
||||||
|
|
|
@ -24,14 +24,13 @@
|
||||||
-logger_header("[ACL_INTERNAL]").
|
-logger_header("[ACL_INTERNAL]").
|
||||||
|
|
||||||
%% APIs
|
%% APIs
|
||||||
-export([ all_rules/0
|
-export([check_acl/5]).
|
||||||
, check_acl/5
|
|
||||||
, reload_acl/0
|
|
||||||
]).
|
|
||||||
|
|
||||||
%% emqx_gen_mod callbacks
|
%% emqx_gen_mod callbacks
|
||||||
-export([ load/1
|
-export([ load/1
|
||||||
, unload/1
|
, unload/1
|
||||||
|
, reload/1
|
||||||
|
, description/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(MFA(M, F, A), {M, F, A}).
|
-define(MFA(M, F, A), {M, F, A}).
|
||||||
|
@ -44,18 +43,19 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
load(_Env) ->
|
load(_Env) ->
|
||||||
Rules = rules_from_file(acl_file()),
|
Rules = rules_from_file(emqx:get_env(acl_file)),
|
||||||
emqx_hooks:add('client.check_acl', ?MFA(?MODULE, check_acl, [Rules]), -1).
|
emqx_hooks:add('client.check_acl', ?MFA(?MODULE, check_acl, [Rules]), -1).
|
||||||
|
|
||||||
unload(_Env) ->
|
unload(_Env) ->
|
||||||
Rules = rules_from_file(acl_file()),
|
Rules = rules_from_file(emqx:get_env(acl_file)),
|
||||||
emqx_hooks:del('client.check_acl', ?MFA(?MODULE, check_acl, [Rules])).
|
emqx_hooks:del('client.check_acl', ?MFA(?MODULE, check_acl, [Rules])).
|
||||||
|
|
||||||
%% @doc Read all rules
|
reload(_Env) ->
|
||||||
-spec(all_rules() -> list(emqx_access_rule:rule())).
|
emqx_acl_cache:is_enabled() andalso emqx_acl_cache:empty_acl_cache(),
|
||||||
all_rules() ->
|
unload([]), load([]).
|
||||||
rules_from_file(acl_file()).
|
|
||||||
|
|
||||||
|
description() ->
|
||||||
|
"EMQ X Internal ACL Module".
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% ACL callbacks
|
%% ACL callbacks
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -71,16 +71,9 @@ check_acl(Client, PubSub, Topic, _AclResult, Rules) ->
|
||||||
nomatch -> ok
|
nomatch -> ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec(reload_acl() -> ok | {error, term()}).
|
|
||||||
reload_acl() ->
|
|
||||||
unload([]), load([]).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal Functions
|
%% Internal Functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
acl_file() -> emqx:get_env(acl_file).
|
|
||||||
|
|
||||||
lookup(PubSub, Rules) ->
|
lookup(PubSub, Rules) ->
|
||||||
maps:get(PubSub, Rules, []).
|
maps:get(PubSub, Rules, []).
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
-module(emqx_mod_delayed).
|
-module(emqx_mod_delayed).
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
-behaviour(emqx_gen_mod).
|
||||||
|
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
@ -24,6 +25,7 @@
|
||||||
%% emqx_gen_mod callbacks
|
%% emqx_gen_mod callbacks
|
||||||
-export([ load/1
|
-export([ load/1
|
||||||
, unload/1
|
, unload/1
|
||||||
|
, description/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ start_link/0
|
-export([ start_link/0
|
||||||
|
@ -62,6 +64,8 @@ unload(_Env) ->
|
||||||
emqx:unhook('message.publish', {?MODULE, on_message_publish, []}),
|
emqx:unhook('message.publish', {?MODULE, on_message_publish, []}),
|
||||||
emqx_mod_sup:stop_child(?MODULE).
|
emqx_mod_sup:stop_child(?MODULE).
|
||||||
|
|
||||||
|
description() ->
|
||||||
|
"EMQ X Delayed Publish Module".
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Hooks
|
%% Hooks
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -26,6 +26,7 @@
|
||||||
%% emqx_gen_mod callbacks
|
%% emqx_gen_mod callbacks
|
||||||
-export([ load/1
|
-export([ load/1
|
||||||
, unload/1
|
, unload/1
|
||||||
|
, description/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ on_client_connected/3
|
-export([ on_client_connected/3
|
||||||
|
@ -44,6 +45,8 @@ unload(_Env) ->
|
||||||
emqx_hooks:del('client.connected', {?MODULE, on_client_connected}),
|
emqx_hooks:del('client.connected', {?MODULE, on_client_connected}),
|
||||||
emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected}).
|
emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected}).
|
||||||
|
|
||||||
|
description() ->
|
||||||
|
"EMQ X Presence Module".
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Callbacks
|
%% Callbacks
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
|
|
||||||
-module(emqx_mod_rewrite).
|
-module(emqx_mod_rewrite).
|
||||||
|
|
||||||
-behavior(emqx_gen_mod).
|
-behaviour(emqx_gen_mod).
|
||||||
|
|
||||||
-include_lib("emqx.hrl").
|
-include_lib("emqx.hrl").
|
||||||
-include_lib("emqx_mqtt.hrl").
|
-include_lib("emqx_mqtt.hrl").
|
||||||
|
@ -36,6 +36,7 @@
|
||||||
%% emqx_gen_mod callbacks
|
%% emqx_gen_mod callbacks
|
||||||
-export([ load/1
|
-export([ load/1
|
||||||
, unload/1
|
, unload/1
|
||||||
|
, description/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -62,6 +63,8 @@ unload(_) ->
|
||||||
emqx_hooks:del('client.unsubscribe', {?MODULE, rewrite_unsubscribe}),
|
emqx_hooks:del('client.unsubscribe', {?MODULE, rewrite_unsubscribe}),
|
||||||
emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}).
|
emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}).
|
||||||
|
|
||||||
|
description() ->
|
||||||
|
"EMQ X Topic Rewrite Module".
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -24,6 +24,7 @@
|
||||||
%% emqx_gen_mod callbacks
|
%% emqx_gen_mod callbacks
|
||||||
-export([ load/1
|
-export([ load/1
|
||||||
, unload/1
|
, unload/1
|
||||||
|
, description/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% APIs
|
%% APIs
|
||||||
|
@ -49,6 +50,8 @@ on_client_connected(#{clientid := ClientId, username := Username}, _ConnInfo = #
|
||||||
unload(_) ->
|
unload(_) ->
|
||||||
emqx_hooks:del('client.connected', {?MODULE, on_client_connected}).
|
emqx_hooks:del('client.connected', {?MODULE, on_client_connected}).
|
||||||
|
|
||||||
|
description() ->
|
||||||
|
"EMQ X Subscription Module".
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -58,5 +58,6 @@ stop_child(ChildId) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
|
ok = emqx_tables:new(emqx_modules, [set, public, {write_concurrency, true}]),
|
||||||
{ok, {{one_for_one, 10, 100}, []}}.
|
{ok, {{one_for_one, 10, 100}, []}}.
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
-module(emqx_mod_topic_metrics).
|
-module(emqx_mod_topic_metrics).
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
-behaviour(emqx_gen_mod).
|
||||||
|
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
-include("logger.hrl").
|
-include("logger.hrl").
|
||||||
|
@ -26,6 +27,7 @@
|
||||||
|
|
||||||
-export([ load/1
|
-export([ load/1
|
||||||
, unload/1
|
, unload/1
|
||||||
|
, description/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ on_message_publish/1
|
-export([ on_message_publish/1
|
||||||
|
@ -104,6 +106,9 @@ unload(_Env) ->
|
||||||
emqx:unhook('message.delivered', fun ?MODULE:on_message_delivered/2),
|
emqx:unhook('message.delivered', fun ?MODULE:on_message_delivered/2),
|
||||||
emqx_mod_sup:stop_child(?MODULE).
|
emqx_mod_sup:stop_child(?MODULE).
|
||||||
|
|
||||||
|
description() ->
|
||||||
|
"EMQ X Topic Metrics Module".
|
||||||
|
|
||||||
on_message_publish(#message{topic = Topic, qos = QoS}) ->
|
on_message_publish(#message{topic = Topic, qos = QoS}) ->
|
||||||
case is_registered(Topic) of
|
case is_registered(Topic) of
|
||||||
true ->
|
true ->
|
||||||
|
|
|
@ -20,28 +20,149 @@
|
||||||
|
|
||||||
-logger_header("[Modules]").
|
-logger_header("[Modules]").
|
||||||
|
|
||||||
-export([ load/0
|
-export([ list/0
|
||||||
|
, load/0
|
||||||
|
, load/1
|
||||||
, unload/0
|
, unload/0
|
||||||
|
, unload/1
|
||||||
|
, reload/1
|
||||||
|
, load_module/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
%% @doc List all available plugins
|
||||||
|
-spec(list() -> [{atom(), boolean()}]).
|
||||||
|
list() ->
|
||||||
|
ets:tab2list(?MODULE).
|
||||||
|
|
||||||
%% @doc Load all the extended modules.
|
%% @doc Load all the extended modules.
|
||||||
-spec(load() -> ok).
|
-spec(load() -> ok).
|
||||||
load() ->
|
load() ->
|
||||||
ok = emqx_mod_acl_internal:load([]),
|
case emqx:get_env(modules_loaded_file) of
|
||||||
lists:foreach(fun load/1, modules()).
|
undefined -> ignore;
|
||||||
|
File ->
|
||||||
|
load_modules(File)
|
||||||
|
end.
|
||||||
|
|
||||||
load({Mod, Env}) ->
|
load(ModuleName) ->
|
||||||
ok = Mod:load(Env),
|
case find_module(ModuleName) of
|
||||||
?LOG(info, "Load ~s module successfully.", [Mod]).
|
[] ->
|
||||||
|
?LOG(alert, "Module ~s not found, cannot load it", [ModuleName]),
|
||||||
modules() -> emqx:get_env(modules, []).
|
{error, not_found};
|
||||||
|
[{ModuleName, true}] ->
|
||||||
|
?LOG(notice, "Module ~s is already started", [ModuleName]),
|
||||||
|
{error, already_started};
|
||||||
|
[{ModuleName, false}] ->
|
||||||
|
emqx_modules:load_module(ModuleName, true)
|
||||||
|
end.
|
||||||
|
|
||||||
%% @doc Unload all the extended modules.
|
%% @doc Unload all the extended modules.
|
||||||
-spec(unload() -> ok).
|
-spec(unload() -> ok).
|
||||||
unload() ->
|
unload() ->
|
||||||
ok = emqx_mod_acl_internal:unload([]),
|
case emqx:get_env(modules_loaded_file) of
|
||||||
lists:foreach(fun unload/1, modules()).
|
undefined -> ignore;
|
||||||
|
File ->
|
||||||
|
unload_modules(File)
|
||||||
|
end.
|
||||||
|
|
||||||
unload({Mod, Env}) ->
|
unload(ModuleName) ->
|
||||||
Mod:unload(Env).
|
case find_module(ModuleName) of
|
||||||
|
[] ->
|
||||||
|
?LOG(alert, "Module ~s not found, cannot load it", [ModuleName]),
|
||||||
|
{error, not_found};
|
||||||
|
[{ModuleName, false}] ->
|
||||||
|
?LOG(error, "Module ~s is not started", [ModuleName]),
|
||||||
|
{error, not_started};
|
||||||
|
[{ModuleName, true}] ->
|
||||||
|
unload_module(ModuleName, true)
|
||||||
|
end.
|
||||||
|
|
||||||
|
reload(emqx_mod_acl_internal) ->
|
||||||
|
Modules = emqx:get_env(modules, []),
|
||||||
|
Env = proplists:get_value(emqx_mod_acl_internal, Modules, undefined),
|
||||||
|
case emqx_mod_acl_internal:reload(Env) of
|
||||||
|
ok ->
|
||||||
|
?LOG(info, "Reload ~s module successfully.", [emqx_mod_acl_internal]);
|
||||||
|
{error, Error} ->
|
||||||
|
?LOG(error, "Reload module ~s failed, cannot start for ~0p", [emqx_mod_acl_internal, Error])
|
||||||
|
end;
|
||||||
|
reload(_) ->
|
||||||
|
ignore.
|
||||||
|
|
||||||
|
find_module(ModuleName) ->
|
||||||
|
ets:lookup(?MODULE, ModuleName).
|
||||||
|
|
||||||
|
filter_module(ModuleNames) ->
|
||||||
|
filter_module(ModuleNames, emqx:get_env(modules, [])).
|
||||||
|
filter_module([], Acc) ->
|
||||||
|
Acc;
|
||||||
|
filter_module([{ModuleName, true} | ModuleNames], Acc) ->
|
||||||
|
filter_module(ModuleNames, lists:keydelete(ModuleName, 1, Acc));
|
||||||
|
filter_module([{_, false} | ModuleNames], Acc) ->
|
||||||
|
filter_module(ModuleNames, Acc).
|
||||||
|
|
||||||
|
load_modules(File) ->
|
||||||
|
case file:consult(File) of
|
||||||
|
{ok, ModuleNames} ->
|
||||||
|
lists:foreach(fun({ModuleName, _}) ->
|
||||||
|
ets:insert(?MODULE, {ModuleName, false})
|
||||||
|
end, filter_module(ModuleNames)),
|
||||||
|
lists:foreach(fun load_module/1, ModuleNames);
|
||||||
|
{error, Error} ->
|
||||||
|
?LOG(alert, "Failed to read: ~p, error: ~p", [File, Error])
|
||||||
|
end.
|
||||||
|
|
||||||
|
load_module({ModuleName, true}) ->
|
||||||
|
emqx_modules:load_module(ModuleName, false);
|
||||||
|
load_module({ModuleName, false}) ->
|
||||||
|
ets:insert(?MODULE, {ModuleName, false});
|
||||||
|
load_module(ModuleName) ->
|
||||||
|
load_module({ModuleName, true}).
|
||||||
|
|
||||||
|
load_module(ModuleName, Persistent) ->
|
||||||
|
Modules = emqx:get_env(modules, []),
|
||||||
|
Env = proplists:get_value(ModuleName, Modules, undefined),
|
||||||
|
case ModuleName:load(Env) of
|
||||||
|
ok ->
|
||||||
|
ets:insert(?MODULE, {ModuleName, true}),
|
||||||
|
write_loaded(Persistent),
|
||||||
|
?LOG(info, "Load ~s module successfully.", [ModuleName]);
|
||||||
|
{error, Error} ->
|
||||||
|
?LOG(error, "Load module ~s failed, cannot load for ~0p", [ModuleName, Error]),
|
||||||
|
{error, Error}
|
||||||
|
end.
|
||||||
|
|
||||||
|
unload_modules(File) ->
|
||||||
|
case file:consult(File) of
|
||||||
|
{ok, ModuleNames} ->
|
||||||
|
lists:foreach(fun unload_module/1, ModuleNames);
|
||||||
|
{error, Error} ->
|
||||||
|
?LOG(alert, "Failed to read: ~p, error: ~p", [File, Error])
|
||||||
|
end.
|
||||||
|
unload_module({ModuleName, true}) ->
|
||||||
|
unload_module(ModuleName, false);
|
||||||
|
unload_module({ModuleName, false}) ->
|
||||||
|
ets:insert(?MODULE, {ModuleName, false});
|
||||||
|
unload_module(ModuleName) ->
|
||||||
|
unload_module({ModuleName, true}).
|
||||||
|
|
||||||
|
unload_module(ModuleName, Persistent) ->
|
||||||
|
Modules = emqx:get_env(modules, []),
|
||||||
|
Env = proplists:get_value(ModuleName, Modules, undefined),
|
||||||
|
case ModuleName:unload(Env) of
|
||||||
|
ok ->
|
||||||
|
ets:insert(?MODULE, {ModuleName, false}),
|
||||||
|
write_loaded(Persistent),
|
||||||
|
?LOG(info, "Unload ~s module successfully.", [ModuleName]);
|
||||||
|
{error, Error} ->
|
||||||
|
?LOG(error, "Unload module ~s failed, cannot unload for ~0p", [ModuleName, Error])
|
||||||
|
end.
|
||||||
|
|
||||||
|
write_loaded(true) ->
|
||||||
|
FilePath = emqx:get_env(modules_loaded_file),
|
||||||
|
case file:write_file(FilePath, [io_lib:format("~p.~n", [Name]) || Name <- list()]) of
|
||||||
|
ok -> ok;
|
||||||
|
{error, Error} ->
|
||||||
|
?LOG(error, "Write File ~p Error: ~p", [FilePath, Error]),
|
||||||
|
{error, Error}
|
||||||
|
end;
|
||||||
|
write_loaded(false) -> ok.
|
|
@ -169,6 +169,7 @@ compat(_Other, _Code) -> undefined.
|
||||||
frame_error(frame_too_large) -> ?RC_PACKET_TOO_LARGE;
|
frame_error(frame_too_large) -> ?RC_PACKET_TOO_LARGE;
|
||||||
frame_error(_) -> ?RC_MALFORMED_PACKET.
|
frame_error(_) -> ?RC_MALFORMED_PACKET.
|
||||||
|
|
||||||
|
connack_error(protocol_error) -> ?RC_PROTOCOL_ERROR;
|
||||||
connack_error(client_identifier_not_valid) -> ?RC_CLIENT_IDENTIFIER_NOT_VALID;
|
connack_error(client_identifier_not_valid) -> ?RC_CLIENT_IDENTIFIER_NOT_VALID;
|
||||||
connack_error(bad_username_or_password) -> ?RC_BAD_USER_NAME_OR_PASSWORD;
|
connack_error(bad_username_or_password) -> ?RC_BAD_USER_NAME_OR_PASSWORD;
|
||||||
connack_error(bad_clientid_or_password) -> ?RC_BAD_USER_NAME_OR_PASSWORD;
|
connack_error(bad_clientid_or_password) -> ?RC_BAD_USER_NAME_OR_PASSWORD;
|
||||||
|
|
|
@ -191,7 +191,13 @@ init(Req, Opts) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
websocket_init([Req, Opts]) ->
|
websocket_init([Req, Opts]) ->
|
||||||
Peername = cowboy_req:peer(Req),
|
Peername = case proplists:get_bool(proxy_protocol, Opts)
|
||||||
|
andalso maps:get(proxy_header, Req) of
|
||||||
|
#{src_address := SrcAddr, src_port := SrcPort} ->
|
||||||
|
{SrcAddr, SrcPort};
|
||||||
|
_ ->
|
||||||
|
cowboy_req:peer(Req)
|
||||||
|
end,
|
||||||
Sockname = cowboy_req:sock(Req),
|
Sockname = cowboy_req:sock(Req),
|
||||||
Peercert = cowboy_req:cert(Req),
|
Peercert = cowboy_req:cert(Req),
|
||||||
WsCookie = try cowboy_req:parse_cookies(Req)
|
WsCookie = try cowboy_req:parse_cookies(Req)
|
||||||
|
|
|
@ -0,0 +1,2 @@
|
||||||
|
{emqx_mod_acl_internal, true}.
|
||||||
|
{emqx_mod_presence, true}.
|
|
@ -1,378 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqx_access_SUITE).
|
|
||||||
|
|
||||||
-compile(export_all).
|
|
||||||
-compile(nowarn_export_all).
|
|
||||||
|
|
||||||
-include("emqx.hrl").
|
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
|
||||||
-include_lib("common_test/include/ct.hrl").
|
|
||||||
|
|
||||||
-define(AC, emqx_access_control).
|
|
||||||
-define(CACHE, emqx_acl_cache).
|
|
||||||
|
|
||||||
-import(emqx_access_rule,
|
|
||||||
[ compile/1
|
|
||||||
, match/3
|
|
||||||
]).
|
|
||||||
|
|
||||||
all() ->
|
|
||||||
[{group, access_control},
|
|
||||||
{group, acl_cache},
|
|
||||||
{group, access_control_cache_mode},
|
|
||||||
{group, access_rule}
|
|
||||||
].
|
|
||||||
|
|
||||||
groups() ->
|
|
||||||
[{access_control, [sequence],
|
|
||||||
[t_reload_acl,
|
|
||||||
t_check_acl_1,
|
|
||||||
t_check_acl_2]},
|
|
||||||
{access_control_cache_mode, [sequence],
|
|
||||||
[t_acl_cache_basic,
|
|
||||||
t_acl_cache_expiry,
|
|
||||||
t_acl_cache_cleanup,
|
|
||||||
t_acl_cache_full]},
|
|
||||||
{acl_cache, [sequence],
|
|
||||||
[t_put_get_del_cache,
|
|
||||||
t_cache_update,
|
|
||||||
t_cache_expiry,
|
|
||||||
t_cache_replacement,
|
|
||||||
t_cache_cleanup,
|
|
||||||
t_cache_auto_emtpy,
|
|
||||||
t_cache_auto_cleanup]},
|
|
||||||
{access_rule, [parallel],
|
|
||||||
[t_compile_rule,
|
|
||||||
t_match_rule]
|
|
||||||
}].
|
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
|
||||||
emqx_ct_helpers:boot_modules([router, broker]),
|
|
||||||
emqx_ct_helpers:start_apps([]),
|
|
||||||
Config.
|
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
|
||||||
emqx_ct_helpers:stop_apps([]).
|
|
||||||
|
|
||||||
init_per_group(Group, Config) when Group =:= access_control;
|
|
||||||
Group =:= access_control_cache_mode ->
|
|
||||||
prepare_config(Group),
|
|
||||||
application:load(emqx),
|
|
||||||
Config;
|
|
||||||
init_per_group(_Group, Config) ->
|
|
||||||
Config.
|
|
||||||
|
|
||||||
prepare_config(Group = access_control) ->
|
|
||||||
set_acl_config_file(Group),
|
|
||||||
application:set_env(emqx, enable_acl_cache, false);
|
|
||||||
prepare_config(Group = access_control_cache_mode) ->
|
|
||||||
set_acl_config_file(Group),
|
|
||||||
application:set_env(emqx, enable_acl_cache, true),
|
|
||||||
application:set_env(emqx, acl_cache_max_size, 100).
|
|
||||||
|
|
||||||
set_acl_config_file(_Group) ->
|
|
||||||
Rules = [{allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]},
|
|
||||||
{allow, {user, "testuser"}, subscribe, ["a/b/c", "d/e/f/#"]},
|
|
||||||
{allow, {user, "admin"}, pubsub, ["a/b/c", "d/e/f/#"]},
|
|
||||||
{allow, {client, "testClient"}, subscribe, ["testTopics/testClient"]},
|
|
||||||
{allow, all, subscribe, ["clients/%c"]},
|
|
||||||
{allow, all, pubsub, ["users/%u/#"]},
|
|
||||||
{deny, all, subscribe, ["$SYS/#", "#"]},
|
|
||||||
{deny, all}],
|
|
||||||
write_config("access_SUITE_acl.conf", Rules),
|
|
||||||
application:set_env(emqx, acl_file, "access_SUITE_acl.conf").
|
|
||||||
|
|
||||||
write_config(Filename, Terms) ->
|
|
||||||
file:write_file(Filename, [io_lib:format("~tp.~n", [Term]) || Term <- Terms]).
|
|
||||||
|
|
||||||
end_per_group(_Group, Config) ->
|
|
||||||
Config.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% emqx_access_control
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
t_reload_acl(_) ->
|
|
||||||
ok = ?AC:reload_acl().
|
|
||||||
|
|
||||||
t_check_acl_1(_) ->
|
|
||||||
Client = #{zone => external,
|
|
||||||
clientid => <<"client1">>,
|
|
||||||
username => <<"testuser">>
|
|
||||||
},
|
|
||||||
allow = ?AC:check_acl(Client, subscribe, <<"users/testuser/1">>),
|
|
||||||
allow = ?AC:check_acl(Client, subscribe, <<"clients/client1">>),
|
|
||||||
deny = ?AC:check_acl(Client, subscribe, <<"clients/client1/x/y">>),
|
|
||||||
allow = ?AC:check_acl(Client, publish, <<"users/testuser/1">>),
|
|
||||||
allow = ?AC:check_acl(Client, subscribe, <<"a/b/c">>).
|
|
||||||
|
|
||||||
t_check_acl_2(_) ->
|
|
||||||
Client = #{zone => external,
|
|
||||||
clientid => <<"client2">>,
|
|
||||||
username => <<"xyz">>
|
|
||||||
},
|
|
||||||
deny = ?AC:check_acl(Client, subscribe, <<"a/b/c">>).
|
|
||||||
|
|
||||||
t_acl_cache_basic(_) ->
|
|
||||||
Client = #{zone => external,
|
|
||||||
clientid => <<"client1">>,
|
|
||||||
username => <<"testuser">>
|
|
||||||
},
|
|
||||||
not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>),
|
|
||||||
not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
|
|
||||||
|
|
||||||
allow = ?AC:check_acl(Client, subscribe, <<"users/testuser/1">>),
|
|
||||||
allow = ?AC:check_acl(Client, subscribe, <<"clients/client1">>),
|
|
||||||
|
|
||||||
allow = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>),
|
|
||||||
allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>).
|
|
||||||
|
|
||||||
t_acl_cache_expiry(_) ->
|
|
||||||
application:set_env(emqx, acl_cache_ttl, 100),
|
|
||||||
Client = #{zone => external,
|
|
||||||
clientid => <<"client1">>,
|
|
||||||
username => <<"testuser">>
|
|
||||||
},
|
|
||||||
allow = ?AC:check_acl(Client, subscribe, <<"clients/client1">>),
|
|
||||||
allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
|
|
||||||
ct:sleep(150),
|
|
||||||
not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>).
|
|
||||||
|
|
||||||
t_acl_cache_full(_) ->
|
|
||||||
application:set_env(emqx, acl_cache_max_size, 1),
|
|
||||||
Client = #{zone => external,
|
|
||||||
clientid => <<"client1">>,
|
|
||||||
username => <<"testuser">>
|
|
||||||
},
|
|
||||||
allow = ?AC:check_acl(Client, subscribe, <<"users/testuser/1">>),
|
|
||||||
allow = ?AC:check_acl(Client, subscribe, <<"clients/client1">>),
|
|
||||||
|
|
||||||
%% the older ones (the <<"users/testuser/1">>) will be evicted first
|
|
||||||
not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>),
|
|
||||||
allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>).
|
|
||||||
|
|
||||||
t_acl_cache_cleanup(_) ->
|
|
||||||
%% The acl cache will try to evict memory, if the size is full and the newest
|
|
||||||
%% cache entry is expired
|
|
||||||
application:set_env(emqx, acl_cache_ttl, 100),
|
|
||||||
application:set_env(emqx, acl_cache_max_size, 2),
|
|
||||||
Client = #{zone => external,
|
|
||||||
clientid => <<"client1">>,
|
|
||||||
username => <<"testuser">>
|
|
||||||
},
|
|
||||||
allow = ?AC:check_acl(Client, subscribe, <<"users/testuser/1">>),
|
|
||||||
allow = ?AC:check_acl(Client, subscribe, <<"clients/client1">>),
|
|
||||||
|
|
||||||
allow = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>),
|
|
||||||
allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
|
|
||||||
|
|
||||||
ct:sleep(150),
|
|
||||||
%% now the cache is full and the newest one - "clients/client1"
|
|
||||||
%% should be expired, so we'll empty the cache before putting
|
|
||||||
%% the next cache entry
|
|
||||||
deny = ?AC:check_acl(Client, subscribe, <<"#">>),
|
|
||||||
|
|
||||||
not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>),
|
|
||||||
not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
|
|
||||||
deny = ?CACHE:get_acl_cache(subscribe, <<"#">>).
|
|
||||||
|
|
||||||
t_put_get_del_cache(_) ->
|
|
||||||
application:set_env(emqx, acl_cache_ttl, 300000),
|
|
||||||
application:set_env(emqx, acl_cache_max_size, 30),
|
|
||||||
|
|
||||||
not_found = ?CACHE:get_acl_cache(publish, <<"a">>),
|
|
||||||
ok = ?CACHE:put_acl_cache(publish, <<"a">>, allow),
|
|
||||||
allow = ?CACHE:get_acl_cache(publish, <<"a">>),
|
|
||||||
|
|
||||||
not_found = ?CACHE:get_acl_cache(subscribe, <<"b">>),
|
|
||||||
ok = ?CACHE:put_acl_cache(subscribe, <<"b">>, deny),
|
|
||||||
deny = ?CACHE:get_acl_cache(subscribe, <<"b">>),
|
|
||||||
|
|
||||||
2 = ?CACHE:get_cache_size(),
|
|
||||||
?assertEqual(?CACHE:cache_k(subscribe, <<"b">>), ?CACHE:get_newest_key()).
|
|
||||||
|
|
||||||
t_cache_expiry(_) ->
|
|
||||||
application:set_env(emqx, acl_cache_ttl, 100),
|
|
||||||
application:set_env(emqx, acl_cache_max_size, 30),
|
|
||||||
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow),
|
|
||||||
allow = ?CACHE:get_acl_cache(subscribe, <<"a">>),
|
|
||||||
|
|
||||||
ct:sleep(150),
|
|
||||||
not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>),
|
|
||||||
|
|
||||||
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, deny),
|
|
||||||
deny = ?CACHE:get_acl_cache(subscribe, <<"a">>),
|
|
||||||
|
|
||||||
ct:sleep(150),
|
|
||||||
not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>).
|
|
||||||
|
|
||||||
t_cache_update(_) ->
|
|
||||||
application:set_env(emqx, acl_cache_ttl, 300000),
|
|
||||||
application:set_env(emqx, acl_cache_max_size, 30),
|
|
||||||
[] = ?CACHE:dump_acl_cache(),
|
|
||||||
|
|
||||||
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow),
|
|
||||||
ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow),
|
|
||||||
ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow),
|
|
||||||
3 = ?CACHE:get_cache_size(),
|
|
||||||
?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_newest_key()),
|
|
||||||
|
|
||||||
%% update the 2nd one
|
|
||||||
ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow),
|
|
||||||
ct:pal("dump acl cache: ~p~n", [?CACHE:dump_acl_cache()]),
|
|
||||||
|
|
||||||
3 = ?CACHE:get_cache_size(),
|
|
||||||
?assertEqual(?CACHE:cache_k(publish, <<"b">>), ?CACHE:get_newest_key()),
|
|
||||||
?assertEqual(?CACHE:cache_k(subscribe, <<"a">>), ?CACHE:get_oldest_key()).
|
|
||||||
|
|
||||||
t_cache_replacement(_) ->
|
|
||||||
application:set_env(emqx, acl_cache_ttl, 300000),
|
|
||||||
application:set_env(emqx, acl_cache_max_size, 3),
|
|
||||||
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow),
|
|
||||||
ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow),
|
|
||||||
ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow),
|
|
||||||
allow = ?CACHE:get_acl_cache(subscribe, <<"a">>),
|
|
||||||
allow = ?CACHE:get_acl_cache(publish, <<"b">>),
|
|
||||||
allow = ?CACHE:get_acl_cache(publish, <<"c">>),
|
|
||||||
3 = ?CACHE:get_cache_size(),
|
|
||||||
?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_newest_key()),
|
|
||||||
|
|
||||||
ok = ?CACHE:put_acl_cache(publish, <<"d">>, deny),
|
|
||||||
3 = ?CACHE:get_cache_size(),
|
|
||||||
?assertEqual(?CACHE:cache_k(publish, <<"d">>), ?CACHE:get_newest_key()),
|
|
||||||
?assertEqual(?CACHE:cache_k(publish, <<"b">>), ?CACHE:get_oldest_key()),
|
|
||||||
|
|
||||||
ok = ?CACHE:put_acl_cache(publish, <<"e">>, deny),
|
|
||||||
3 = ?CACHE:get_cache_size(),
|
|
||||||
?assertEqual(?CACHE:cache_k(publish, <<"e">>), ?CACHE:get_newest_key()),
|
|
||||||
?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_oldest_key()),
|
|
||||||
|
|
||||||
not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>),
|
|
||||||
not_found = ?CACHE:get_acl_cache(publish, <<"b">>),
|
|
||||||
allow = ?CACHE:get_acl_cache(publish, <<"c">>).
|
|
||||||
|
|
||||||
t_cache_cleanup(_) ->
|
|
||||||
application:set_env(emqx, acl_cache_ttl, 100),
|
|
||||||
application:set_env(emqx, acl_cache_max_size, 30),
|
|
||||||
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow),
|
|
||||||
ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow),
|
|
||||||
ct:sleep(150),
|
|
||||||
ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow),
|
|
||||||
3 = ?CACHE:get_cache_size(),
|
|
||||||
|
|
||||||
?CACHE:cleanup_acl_cache(),
|
|
||||||
?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_oldest_key()),
|
|
||||||
1 = ?CACHE:get_cache_size().
|
|
||||||
|
|
||||||
t_cache_auto_emtpy(_) ->
|
|
||||||
%% verify cache is emptied when cache full and even the newest
|
|
||||||
%% one is expired.
|
|
||||||
application:set_env(emqx, acl_cache_ttl, 100),
|
|
||||||
application:set_env(emqx, acl_cache_max_size, 3),
|
|
||||||
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow),
|
|
||||||
ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow),
|
|
||||||
ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow),
|
|
||||||
3 = ?CACHE:get_cache_size(),
|
|
||||||
|
|
||||||
ct:sleep(150),
|
|
||||||
ok = ?CACHE:put_acl_cache(subscribe, <<"d">>, deny),
|
|
||||||
1 = ?CACHE:get_cache_size().
|
|
||||||
|
|
||||||
t_cache_auto_cleanup(_) ->
|
|
||||||
%% verify we'll cleanup expired entries when we got a exipired acl
|
|
||||||
%% from cache.
|
|
||||||
application:set_env(emqx, acl_cache_ttl, 100),
|
|
||||||
application:set_env(emqx, acl_cache_max_size, 30),
|
|
||||||
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow),
|
|
||||||
ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow),
|
|
||||||
ct:sleep(150),
|
|
||||||
ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow),
|
|
||||||
ok = ?CACHE:put_acl_cache(publish, <<"d">>, deny),
|
|
||||||
4 = ?CACHE:get_cache_size(),
|
|
||||||
|
|
||||||
%% "a" and "b" expires, while "c" and "d" not
|
|
||||||
not_found = ?CACHE:get_acl_cache(publish, <<"b">>),
|
|
||||||
2 = ?CACHE:get_cache_size(),
|
|
||||||
|
|
||||||
ct:sleep(150), %% now "c" and "d" expires
|
|
||||||
not_found = ?CACHE:get_acl_cache(publish, <<"c">>),
|
|
||||||
0 = ?CACHE:get_cache_size().
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% emqx_access_rule
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
t_compile_rule(_) ->
|
|
||||||
{allow, {'and', [{ipaddr, {{127,0,0,1}, {127,0,0,1}, 32}},
|
|
||||||
{user, <<"user">>}]}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} =
|
|
||||||
compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"user">>}]}, subscribe, ["$SYS/#", "#"]}),
|
|
||||||
{allow, {'or', [{ipaddr, {{127,0,0,1}, {127,0,0,1}, 32}},
|
|
||||||
{user, <<"user">>}]}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} =
|
|
||||||
compile({allow, {'or', [{ipaddr, "127.0.0.1"}, {user, <<"user">>}]}, subscribe, ["$SYS/#", "#"]}),
|
|
||||||
|
|
||||||
{allow, {ipaddr, {{127,0,0,1}, {127,0,0,1}, 32}}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} =
|
|
||||||
compile({allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]}),
|
|
||||||
{allow, {user, <<"testuser">>}, subscribe, [ [<<"a">>, <<"b">>, <<"c">>], [<<"d">>, <<"e">>, <<"f">>, '#'] ]} =
|
|
||||||
compile({allow, {user, "testuser"}, subscribe, ["a/b/c", "d/e/f/#"]}),
|
|
||||||
{allow, {user, <<"admin">>}, pubsub, [ [<<"d">>, <<"e">>, <<"f">>, '#'] ]} =
|
|
||||||
compile({allow, {user, "admin"}, pubsub, ["d/e/f/#"]}),
|
|
||||||
{allow, {client, <<"testClient">>}, publish, [ [<<"testTopics">>, <<"testClient">>] ]} =
|
|
||||||
compile({allow, {client, "testClient"}, publish, ["testTopics/testClient"]}),
|
|
||||||
{allow, all, pubsub, [{pattern, [<<"clients">>, <<"%c">>]}]} =
|
|
||||||
compile({allow, all, pubsub, ["clients/%c"]}),
|
|
||||||
{allow, all, subscribe, [{pattern, [<<"users">>, <<"%u">>, '#']}]} =
|
|
||||||
compile({allow, all, subscribe, ["users/%u/#"]}),
|
|
||||||
{deny, all, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} =
|
|
||||||
compile({deny, all, subscribe, ["$SYS/#", "#"]}),
|
|
||||||
{allow, all} = compile({allow, all}),
|
|
||||||
{deny, all} = compile({deny, all}).
|
|
||||||
|
|
||||||
t_match_rule(_) ->
|
|
||||||
ClientInfo1 = #{zone => external,
|
|
||||||
clientid => <<"testClient">>,
|
|
||||||
username => <<"TestUser">>,
|
|
||||||
peerhost => {127,0,0,1}
|
|
||||||
},
|
|
||||||
ClientInfo2 = #{zone => external,
|
|
||||||
clientid => <<"testClient">>,
|
|
||||||
username => <<"TestUser">>,
|
|
||||||
peerhost => {192,168,0,10}
|
|
||||||
},
|
|
||||||
{matched, allow} = match(ClientInfo1, <<"Test/Topic">>, {allow, all}),
|
|
||||||
{matched, deny} = match(ClientInfo1, <<"Test/Topic">>, {deny, all}),
|
|
||||||
{matched, allow} = match(ClientInfo1, <<"Test/Topic">>,
|
|
||||||
compile({allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]})),
|
|
||||||
{matched, allow} = match(ClientInfo2, <<"Test/Topic">>,
|
|
||||||
compile({allow, {ipaddr, "192.168.0.1/24"}, subscribe, ["$SYS/#", "#"]})),
|
|
||||||
{matched, allow} = match(ClientInfo1, <<"d/e/f/x">>,
|
|
||||||
compile({allow, {user, "TestUser"}, subscribe, ["a/b/c", "d/e/f/#"]})),
|
|
||||||
nomatch = match(ClientInfo1, <<"d/e/f/x">>, compile({allow, {user, "admin"}, pubsub, ["d/e/f/#"]})),
|
|
||||||
{matched, allow} = match(ClientInfo1, <<"testTopics/testClient">>,
|
|
||||||
compile({allow, {client, "testClient"}, publish, ["testTopics/testClient"]})),
|
|
||||||
{matched, allow} = match(ClientInfo1, <<"clients/testClient">>, compile({allow, all, pubsub, ["clients/%c"]})),
|
|
||||||
{matched, allow} = match(#{username => <<"user2">>}, <<"users/user2/abc/def">>,
|
|
||||||
compile({allow, all, subscribe, ["users/%u/#"]})),
|
|
||||||
{matched, deny} = match(ClientInfo1, <<"d/e/f">>, compile({deny, all, subscribe, ["$SYS/#", "#"]})),
|
|
||||||
Rule = compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"WrongUser">>}]}, publish, <<"Topic">>}),
|
|
||||||
nomatch = match(ClientInfo1, <<"Topic">>, Rule),
|
|
||||||
AndRule = compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"TestUser">>}]}, publish, <<"Topic">>}),
|
|
||||||
{matched, allow} = match(ClientInfo1, <<"Topic">>, AndRule),
|
|
||||||
OrRule = compile({allow, {'or', [{ipaddr, "127.0.0.1"}, {user, <<"WrongUser">>}]}, publish, ["Topic"]}),
|
|
||||||
{matched, allow} = match(ClientInfo1, <<"Topic">>, OrRule).
|
|
||||||
|
|
|
@ -49,9 +49,6 @@ t_check_acl(_) ->
|
||||||
Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>),
|
Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>),
|
||||||
?assertEqual(allow, emqx_access_control:check_acl(clientinfo(), Publish, <<"t">>)).
|
?assertEqual(allow, emqx_access_control:check_acl(clientinfo(), Publish, <<"t">>)).
|
||||||
|
|
||||||
t_reload_acl(_) ->
|
|
||||||
?assertEqual(ok, emqx_access_control:reload_acl()).
|
|
||||||
|
|
||||||
t_bypass_auth_plugins(_) ->
|
t_bypass_auth_plugins(_) ->
|
||||||
AuthFun = fun(#{zone := bypass_zone}, AuthRes) ->
|
AuthFun = fun(#{zone := bypass_zone}, AuthRes) ->
|
||||||
{stop, AuthRes#{auth_result => password_error}};
|
{stop, AuthRes#{auth_result => password_error}};
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
%% ACL callbacks
|
%% ACL callbacks
|
||||||
-export([ init/1
|
-export([ init/1
|
||||||
, check_acl/2
|
, check_acl/2
|
||||||
, reload_acl/1
|
|
||||||
, description/0
|
, description/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -29,9 +28,6 @@ init(AclOpts) ->
|
||||||
check_acl({_User, _PubSub, _Topic}, _State) ->
|
check_acl({_User, _PubSub, _Topic}, _State) ->
|
||||||
allow.
|
allow.
|
||||||
|
|
||||||
reload_acl(_State) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
description() ->
|
description() ->
|
||||||
"Test ACL Mod".
|
"Test ACL Mod".
|
||||||
|
|
||||||
|
|
|
@ -112,6 +112,43 @@ t_handle_in_unexpected_connect_packet(_) ->
|
||||||
{ok, [{outgoing, Packet}, {close, protocol_error}], Channel} =
|
{ok, [{outgoing, Packet}, {close, protocol_error}], Channel} =
|
||||||
emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), Channel).
|
emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), Channel).
|
||||||
|
|
||||||
|
t_handle_in_connect_auth_failed(_) ->
|
||||||
|
ConnPkt = #mqtt_packet_connect{
|
||||||
|
proto_name = <<"MQTT">>,
|
||||||
|
proto_ver = ?MQTT_PROTO_V5,
|
||||||
|
is_bridge = false,
|
||||||
|
clean_start = true,
|
||||||
|
keepalive = 30,
|
||||||
|
properties = #{
|
||||||
|
'Authentication-Method' => "failed_auth_method",
|
||||||
|
'Authentication-Data' => <<"failed_auth_data">>
|
||||||
|
},
|
||||||
|
clientid = <<"clientid">>,
|
||||||
|
username = <<"username">>
|
||||||
|
},
|
||||||
|
{shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _} =
|
||||||
|
emqx_channel:handle_in(?CONNECT_PACKET(ConnPkt), channel(#{conn_state => idle})).
|
||||||
|
|
||||||
|
t_handle_in_continue_auth(_) ->
|
||||||
|
Properties = #{
|
||||||
|
'Authentication-Method' => "failed_auth_method",
|
||||||
|
'Authentication-Data' => <<"failed_auth_data">>
|
||||||
|
},
|
||||||
|
{shutdown, bad_authentication_method, ?CONNACK_PACKET(?RC_BAD_AUTHENTICATION_METHOD), _} =
|
||||||
|
emqx_channel:handle_in(?AUTH_PACKET(?RC_CONTINUE_AUTHENTICATION,Properties), channel()),
|
||||||
|
{shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _} =
|
||||||
|
emqx_channel:handle_in(?AUTH_PACKET(?RC_CONTINUE_AUTHENTICATION,Properties), channel(#{conninfo => #{proto_ver => ?MQTT_PROTO_V5, conn_props => Properties}})).
|
||||||
|
|
||||||
|
t_handle_in_re_auth(_) ->
|
||||||
|
Properties = #{
|
||||||
|
'Authentication-Method' => "failed_auth_method",
|
||||||
|
'Authentication-Data' => <<"failed_auth_data">>
|
||||||
|
},
|
||||||
|
{ok, [{outgoing, ?DISCONNECT_PACKET(?RC_BAD_AUTHENTICATION_METHOD)}, {close, bad_authentication_method}], _} =
|
||||||
|
emqx_channel:handle_in(?AUTH_PACKET(?RC_RE_AUTHENTICATE,Properties), channel()),
|
||||||
|
{ok, [{outgoing, ?DISCONNECT_PACKET(?RC_NOT_AUTHORIZED)}, {close, not_authorized}], _} =
|
||||||
|
emqx_channel:handle_in(?AUTH_PACKET(?RC_RE_AUTHENTICATE,Properties), channel(#{conninfo => #{proto_ver => ?MQTT_PROTO_V5, conn_props => Properties}})).
|
||||||
|
|
||||||
t_handle_in_qos0_publish(_) ->
|
t_handle_in_qos0_publish(_) ->
|
||||||
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
|
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
|
||||||
Channel = channel(#{conn_state => connected}),
|
Channel = channel(#{conn_state => connected}),
|
||||||
|
@ -286,7 +323,7 @@ t_process_connect(_) ->
|
||||||
{ok, #{session => session(), present => false}}
|
{ok, #{session => session(), present => false}}
|
||||||
end),
|
end),
|
||||||
{ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS)}], _Chan} =
|
{ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS)}], _Chan} =
|
||||||
emqx_channel:process_connect(connpkt(), channel(#{conn_state => idle})).
|
emqx_channel:process_connect(#{}, channel(#{conn_state => idle})).
|
||||||
|
|
||||||
t_process_publish_qos0(_) ->
|
t_process_publish_qos0(_) ->
|
||||||
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
|
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
|
||||||
|
@ -346,12 +383,12 @@ t_handle_out_publish_nl(_) ->
|
||||||
|
|
||||||
t_handle_out_connack_sucess(_) ->
|
t_handle_out_connack_sucess(_) ->
|
||||||
{ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Channel} =
|
{ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Channel} =
|
||||||
emqx_channel:handle_out(connack, {?RC_SUCCESS, 0, connpkt()}, channel()),
|
emqx_channel:handle_out(connack, {?RC_SUCCESS, 0, #{}}, channel()),
|
||||||
?assertEqual(connected, emqx_channel:info(conn_state, Channel)).
|
?assertEqual(connected, emqx_channel:info(conn_state, Channel)).
|
||||||
|
|
||||||
t_handle_out_connack_failure(_) ->
|
t_handle_out_connack_failure(_) ->
|
||||||
{shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _Chan} =
|
{shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _Chan} =
|
||||||
emqx_channel:handle_out(connack, {?RC_NOT_AUTHORIZED, connpkt()}, channel()).
|
emqx_channel:handle_out(connack, ?RC_NOT_AUTHORIZED, channel()).
|
||||||
|
|
||||||
t_handle_out_puback(_) ->
|
t_handle_out_puback(_) ->
|
||||||
Channel = channel(#{conn_state => connected}),
|
Channel = channel(#{conn_state => connected}),
|
||||||
|
|
|
@ -21,6 +21,7 @@
|
||||||
|
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
-include("emqx_mqtt.hrl").
|
-include("emqx_mqtt.hrl").
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
all() -> emqx_ct:all(?MODULE).
|
all() -> emqx_ct:all(?MODULE).
|
||||||
|
|
||||||
|
@ -37,7 +38,7 @@ end_per_suite(_Config) ->
|
||||||
|
|
||||||
t_start_stop_listeners(_) ->
|
t_start_stop_listeners(_) ->
|
||||||
ok = emqx_listeners:start(),
|
ok = emqx_listeners:start(),
|
||||||
{error, _} = emqx_listeners:start_listener({ws,{"127.0.0.1", 8083}, []}),
|
?assertException(error, _, emqx_listeners:start_listener({ws,{"127.0.0.1", 8083}, []})),
|
||||||
ok = emqx_listeners:stop().
|
ok = emqx_listeners:stop().
|
||||||
|
|
||||||
t_restart_listeners(_) ->
|
t_restart_listeners(_) ->
|
||||||
|
|
|
@ -33,16 +33,9 @@ end_per_suite(_Config) ->
|
||||||
emqx_ct_helpers:stop_apps([]).
|
emqx_ct_helpers:stop_apps([]).
|
||||||
|
|
||||||
t_load_unload(_) ->
|
t_load_unload(_) ->
|
||||||
?assertEqual({error,already_exists}, emqx_mod_acl_internal:load([])),
|
|
||||||
?assertEqual(ok, emqx_mod_acl_internal:unload([])),
|
?assertEqual(ok, emqx_mod_acl_internal:unload([])),
|
||||||
?assertEqual(ok, emqx_mod_acl_internal:load([])).
|
?assertEqual(ok, emqx_mod_acl_internal:load([])),
|
||||||
|
?assertEqual({error,already_exists}, emqx_mod_acl_internal:load([])).
|
||||||
t_all_rules(_) ->
|
|
||||||
application:set_env(emqx, acl_file, ""),
|
|
||||||
?assertMatch(#{}, emqx_mod_acl_internal:all_rules()),
|
|
||||||
|
|
||||||
application:set_env(emqx, acl_file, emqx_ct_helpers:deps_path(emqx, "etc/acl.conf")),
|
|
||||||
?assertMatch(#{publish := _, subscribe := _}, emqx_mod_acl_internal:all_rules()).
|
|
||||||
|
|
||||||
t_check_acl(_) ->
|
t_check_acl(_) ->
|
||||||
Rules=#{publish => [{allow,all}], subscribe => [{deny, all}]},
|
Rules=#{publish => [{allow,all}], subscribe => [{deny, all}]},
|
||||||
|
@ -51,7 +44,7 @@ t_check_acl(_) ->
|
||||||
?assertEqual(ok, emqx_mod_acl_internal:check_acl(clientinfo(), connect, <<"t">>, [], Rules)).
|
?assertEqual(ok, emqx_mod_acl_internal:check_acl(clientinfo(), connect, <<"t">>, [], Rules)).
|
||||||
|
|
||||||
t_reload_acl(_) ->
|
t_reload_acl(_) ->
|
||||||
?assertEqual(ok, emqx_mod_acl_internal:reload_acl()).
|
?assertEqual(ok, emqx_mod_acl_internal:reload([])).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Helper functions
|
%% Helper functions
|
||||||
|
|
|
@ -44,9 +44,7 @@ end_per_suite(_) ->
|
||||||
set_special_configs(emqx) ->
|
set_special_configs(emqx) ->
|
||||||
application:set_env(emqx, modules, [{emqx_mod_delayed, []}]),
|
application:set_env(emqx, modules, [{emqx_mod_delayed, []}]),
|
||||||
application:set_env(emqx, allow_anonymous, false),
|
application:set_env(emqx, allow_anonymous, false),
|
||||||
application:set_env(emqx, enable_acl_cache, false),
|
application:set_env(emqx, enable_acl_cache, false);
|
||||||
application:set_env(emqx, plugins_loaded_file,
|
|
||||||
emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins"));
|
|
||||||
set_special_configs(_App) ->
|
set_special_configs(_App) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
@ -55,8 +53,6 @@ set_special_configs(_App) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
t_load_case(_) ->
|
t_load_case(_) ->
|
||||||
ok = emqx_mod_delayed:unload([]),
|
|
||||||
timer:sleep(100),
|
|
||||||
UnHooks = emqx_hooks:lookup('message.publish'),
|
UnHooks = emqx_hooks:lookup('message.publish'),
|
||||||
?assertEqual([], UnHooks),
|
?assertEqual([], UnHooks),
|
||||||
ok = emqx_mod_delayed:load([]),
|
ok = emqx_mod_delayed:load([]),
|
||||||
|
@ -65,6 +61,7 @@ t_load_case(_) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_delayed_message(_) ->
|
t_delayed_message(_) ->
|
||||||
|
ok = emqx_mod_delayed:load([]),
|
||||||
DelayedMsg = emqx_message:make(?MODULE, 1, <<"$delayed/1/publish">>, <<"delayed_m">>),
|
DelayedMsg = emqx_message:make(?MODULE, 1, <<"$delayed/1/publish">>, <<"delayed_m">>),
|
||||||
?assertEqual({stop, DelayedMsg#message{topic = <<"publish">>, headers = #{allow_publish => false}}}, on_message_publish(DelayedMsg)),
|
?assertEqual({stop, DelayedMsg#message{topic = <<"publish">>, headers = #{allow_publish => false}}}, on_message_publish(DelayedMsg)),
|
||||||
|
|
||||||
|
@ -77,4 +74,5 @@ t_delayed_message(_) ->
|
||||||
timer:sleep(5000),
|
timer:sleep(5000),
|
||||||
|
|
||||||
EmptyKey = mnesia:dirty_all_keys(emqx_mod_delayed),
|
EmptyKey = mnesia:dirty_all_keys(emqx_mod_delayed),
|
||||||
?assertEqual([], EmptyKey).
|
?assertEqual([], EmptyKey),
|
||||||
|
ok = emqx_mod_delayed:unload([]).
|
||||||
|
|
|
@ -0,0 +1,49 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_modules_SUITE).
|
||||||
|
|
||||||
|
-compile(export_all).
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
all() -> emqx_ct:all(?MODULE).
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
|
||||||
|
emqx_ct_helpers:boot_modules([]),
|
||||||
|
emqx_ct_helpers:start_apps([], fun set_sepecial_cfg/1),
|
||||||
|
Config.
|
||||||
|
|
||||||
|
set_sepecial_cfg(_) ->
|
||||||
|
application:set_env(emqx, modules_loaded_file, emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_modules")),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
end_per_suite(_Config) ->
|
||||||
|
emqx_ct_helpers:stop_apps([]).
|
||||||
|
|
||||||
|
t_load(_) ->
|
||||||
|
?assertEqual(ok, emqx_modules:unload()),
|
||||||
|
?assertEqual(ok, emqx_modules:load()),
|
||||||
|
?assertEqual({error, not_found}, emqx_modules:load(not_existed_module)),
|
||||||
|
?assertEqual({error, not_started}, emqx_modules:unload(emqx_mod_rewrite)),
|
||||||
|
?assertEqual(ignore, emqx_modules:reload(emqx_mod_rewrite)),
|
||||||
|
?assertEqual(ok, emqx_modules:reload(emqx_mod_acl_internal)).
|
||||||
|
|
||||||
|
t_list(_) ->
|
||||||
|
?assertMatch([{_, _} | _ ], emqx_modules:list()).
|
||||||
|
|
Loading…
Reference in New Issue