diff --git a/etc/emqx.conf b/etc/emqx.conf index c71d2250e..e5b52617b 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1399,18 +1399,6 @@ listener.ws.external.access.1 = allow all ## Value: on | off listener.ws.external.verify_protocol_header = on -## Use X-Forwarded-For header for real source IP if the EMQ X cluster is -## deployed behind NGINX or HAProxy. -## -## Value: String -## listener.ws.external.proxy_address_header = X-Forwarded-For - -## Use X-Forwarded-Port header for real source port if the EMQ X cluster is -## deployed behind NGINX or HAProxy. -## -## Value: String -## listener.ws.external.proxy_port_header = X-Forwarded-Port - ## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed behind ## HAProxy or Nginx. ## @@ -1830,15 +1818,14 @@ listener.wss.external.send_timeout_close = on ##-------------------------------------------------------------------- ## Modules ##-------------------------------------------------------------------- +## The file to store loaded module names. +## +## Value: File +modules.loaded_file = {{ platform_data_dir }}/loaded_modules ##-------------------------------------------------------------------- ## Presence Module -## Enable Presence Module. -## -## Value: on | off -module.presence = on - ## Sets the QoS for presence MQTT message. ## ## Value: 0 | 1 | 2 @@ -1847,11 +1834,6 @@ module.presence.qos = 1 ##-------------------------------------------------------------------- ## Subscription Module -## Enable Subscription Module. -## -## Value: on | off -module.subscription = off - ## Subscribe the Topics automatically when client connected. ## ## Value: String @@ -1887,31 +1869,10 @@ module.subscription = off ##-------------------------------------------------------------------- ## Rewrite Module -## Enable Rewrite Module. -## -## Value: on | off -module.rewrite = off - ## {rewrite, Topic, Re, Dest} ## module.rewrite.rule.1 = x/# ^x/y/(.+)$ z/y/$1 ## module.rewrite.rule.2 = y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2 -##-------------------------------------------------------------------- -## Topic Metrics Module - -## Enable Topic Metrics Module. -## -## Value: on | off -module.topic_metrics = off - -##-------------------------------------------------------------------- -## Delayed Module - -## Enable Delayed Module. -## -## Value: on | off -module.delayed = off - ##------------------------------------------------------------------- ## Plugins ##------------------------------------------------------------------- diff --git a/priv/emqx.schema b/priv/emqx.schema index 5b1b44df3..f3bf3fb3f 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1310,16 +1310,6 @@ end}. {datatype, flag} ]}. -{mapping, "listener.ws.$name.proxy_address_header", "emqx.listeners", [ - {datatype, string}, - hidden -]}. - -{mapping, "listener.ws.$name.proxy_port_header", "emqx.listeners", [ - {datatype, string}, - hidden -]}. - {mapping, "listener.ws.$name.proxy_protocol", "emqx.listeners", [ {datatype, flag} ]}. @@ -1467,16 +1457,6 @@ end}. {datatype, string} ]}. -{mapping, "listener.wss.$name.proxy_address_header", "emqx.listeners", [ - {datatype, string}, - hidden -]}. - -{mapping, "listener.wss.$name.proxy_port_header", "emqx.listeners", [ - {datatype, string}, - hidden -]}. - {mapping, "listener.wss.$name.proxy_protocol", "emqx.listeners", [ {datatype, flag} ]}. @@ -1681,11 +1661,9 @@ end}. {proxy_protocol_timeout, cuttlefish:conf_get(Prefix ++ ".proxy_protocol_timeout", Conf, undefined)}, {verify_protocol_header, cuttlefish:conf_get(Prefix ++ ".verify_protocol_header", Conf, undefined)}, {peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)}, - {proxy_port_header, cuttlefish:conf_get(Prefix ++ ".proxy_port_header", Conf, undefined)}, {compress, cuttlefish:conf_get(Prefix ++ ".compress", Conf, undefined)}, {idle_timeout, cuttlefish:conf_get(Prefix ++ ".idle_timeout", Conf, undefined)}, - {max_frame_size, cuttlefish:conf_get(Prefix ++ ".max_frame_size", Conf, undefined)}, - {proxy_address_header, cuttlefish:conf_get(Prefix ++ ".proxy_address_header", Conf, undefined)} | AccOpts(Prefix)]) + {max_frame_size, cuttlefish:conf_get(Prefix ++ ".max_frame_size", Conf, undefined)} | AccOpts(Prefix)]) end, DeflateOpts = fun(Prefix) -> Filter([{level, cuttlefish:conf_get(Prefix ++ ".deflate_opts.level", Conf, undefined)}, @@ -1788,9 +1766,8 @@ end}. %% Modules %%-------------------------------------------------------------------- -{mapping, "module.presence", "emqx.modules", [ - {default, off}, - {datatype, flag} +{mapping, "modules.loaded_file", "emqx.modules_loaded_file", [ + {datatype, string} ]}. {mapping, "module.presence.qos", "emqx.modules", [ @@ -1799,11 +1776,6 @@ end}. {validators, ["range:0-2"]} ]}. -{mapping, "module.subscription", "emqx.modules", [ - {default, off}, - {datatype, flag} -]}. - {mapping, "module.subscription.$id.topic", "emqx.modules", [ {datatype, string} ]}. @@ -1832,25 +1804,10 @@ end}. {validators, ["range:0-2"]} ]}. -{mapping, "module.rewrite", "emqx.modules", [ - {default, off}, - {datatype, flag} -]}. - {mapping, "module.rewrite.rule.$id", "emqx.modules", [ {datatype, string} ]}. -{mapping, "module.topic_metrics", "emqx.modules", [ - {default, off}, - {datatype, flag} -]}. - -{mapping, "module.delayed", "emqx.modules", [ - {default, off}, - {datatype, flag} -]}. - {translation, "emqx.modules", fun(Conf) -> Subscriptions = fun() -> List = cuttlefish_variable:filter_by_prefix("module.subscription", Conf), @@ -1869,26 +1826,12 @@ end}. end, Rules) end, lists:append([ - case cuttlefish:conf_get("module.presence", Conf) of %% Presence - true -> [{emqx_mod_presence, [{qos, cuttlefish:conf_get("module.presence.qos", Conf, 1)}]}]; - false -> [] - end, - case cuttlefish:conf_get("module.subscription", Conf) of %% Subscription - true -> [{emqx_mod_subscription, Subscriptions()}]; - false -> [] - end, - case cuttlefish:conf_get("module.rewrite", Conf) of %% Rewrite - true -> [{emqx_mod_rewrite, Rewrites()}]; - false -> [] - end, - case cuttlefish:conf_get("module.topic_metrics", Conf) of %% Topic Metrics - true -> [{emqx_mod_topic_metrics, []}]; - false -> [] - end, - case cuttlefish:conf_get("module.delayed", Conf) of %% Delayed - true -> [{emqx_mod_delayed, []}]; - false -> [] - end + [{emqx_mod_presence, [{qos, cuttlefish:conf_get("module.presence.qos", Conf, 1)}]}], + [{emqx_mod_subscription, Subscriptions()}], + [{emqx_mod_rewrite, Rewrites()}], + [{emqx_mod_topic_metrics, []}], + [{emqx_mod_delayed, []}], + [{emqx_mod_acl_internal, []}] ]) end}. diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index e891ca275..32383b3f0 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -21,7 +21,6 @@ -export([authenticate/1]). -export([ check_acl/3 - , reload_acl/0 ]). -type(result() :: #{auth_result := emqx_types:auth_result(), @@ -67,11 +66,6 @@ do_check_acl(ClientInfo = #{zone := Zone}, PubSub, Topic) -> _Other -> deny end. --spec(reload_acl() -> ok | {error, term()}). -reload_acl() -> - emqx_acl_cache:is_enabled() andalso emqx_acl_cache:empty_acl_cache(), - emqx_mod_acl_internal:reload_acl(). - default_auth_result(Zone) -> case emqx_zone:get_env(Zone, allow_anonymous, false) of true -> #{auth_result => success, anonymous => true}; diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 1a8839891..4490ca40f 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -71,6 +71,8 @@ topic_aliases :: emqx_types:topic_aliases(), %% MQTT Topic Alias Maximum alias_maximum :: maybe(map()), + %% Authentication Data Cache + auth_cache :: maybe(map()), %% Timers timers :: #{atom() => disabled | maybe(reference())}, %% Conn State @@ -185,6 +187,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port}, topic_aliases = #{inbound => #{}, outbound => #{} }, + auth_cache = #{}, timers = #{}, conn_state = idle, takeover = false, @@ -216,10 +219,41 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) -> fun check_banned/2, fun auth_connect/2 ], ConnPkt, Channel#channel{conn_state = connecting}) of - {ok, NConnPkt, NChannel} -> - process_connect(NConnPkt, NChannel); + {ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} -> + NChannel1 = NChannel#channel{ + will_msg = emqx_packet:will_msg(NConnPkt), + alias_maximum = init_alias_maximum(NConnPkt, ClientInfo) + }, + case enhanced_auth(?CONNECT_PACKET(NConnPkt), NChannel1) of + {ok, Properties, NChannel2} -> + process_connect(Properties, ensure_connected(NChannel2)); + {continue, Properties, NChannel2} -> + handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, Properties}, NChannel2); + {error, ReasonCode, NChannel2} -> + handle_out(connack, ReasonCode, NChannel2) + end; {error, ReasonCode, NChannel} -> - handle_out(connack, {ReasonCode, ConnPkt}, NChannel) + handle_out(connack, ReasonCode, NChannel) + end; + +handle_in(Packet = ?AUTH_PACKET(?RC_CONTINUE_AUTHENTICATION, _Properties), Channel) -> + case enhanced_auth(Packet, Channel) of + {ok, NProperties, NChannel} -> + process_connect(NProperties, ensure_connected(NChannel)); + {continue, NProperties, NChannel} -> + handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, NProperties}, NChannel); + {error, NReasonCode, NChannel} -> + handle_out(connack, NReasonCode, NChannel) + end; + +handle_in(Packet = ?AUTH_PACKET(?RC_RE_AUTHENTICATE, _Properties), Channel) -> + case enhanced_auth(Packet, Channel) of + {ok, NProperties, NChannel} -> + handle_out(auth, {?RC_SUCCESS, NProperties}, NChannel); + {continue, NProperties, NChannel} -> + handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, NProperties}, NChannel); + {error, NReasonCode, NChannel} -> + handle_out(disconnect, NReasonCode, NChannel) end; handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) -> @@ -362,24 +396,23 @@ handle_in(Packet, Channel) -> %% Process Connect %%-------------------------------------------------------------------- -process_connect(ConnPkt = #mqtt_packet_connect{clean_start = CleanStart}, - Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> +process_connect(AckProps, Channel = #channel{conninfo = #{clean_start := CleanStart} = ConnInfo, clientinfo = ClientInfo}) -> case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of {ok, #{session := Session, present := false}} -> NChannel = Channel#channel{session = Session}, - handle_out(connack, {?RC_SUCCESS, sp(false), ConnPkt}, NChannel); + handle_out(connack, {?RC_SUCCESS, sp(false), AckProps}, NChannel); {ok, #{session := Session, present := true, pendings := Pendings}} -> Pendings1 = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())), NChannel = Channel#channel{session = Session, resuming = true, pendings = Pendings1 }, - handle_out(connack, {?RC_SUCCESS, sp(true), ConnPkt}, NChannel); + handle_out(connack, {?RC_SUCCESS, sp(true), AckProps}, NChannel); {error, client_id_unavailable} -> - handle_out(connack, {?RC_CLIENT_IDENTIFIER_NOT_VALID, ConnPkt}, Channel); + handle_out(connack, ?RC_CLIENT_IDENTIFIER_NOT_VALID, Channel); {error, Reason} -> ?LOG(error, "Failed to open session due to ~p", [Reason]), - handle_out(connack, {?RC_UNSPECIFIED_ERROR, ConnPkt}, Channel) + handle_out(connack, ?RC_UNSPECIFIED_ERROR, Channel) end. %%-------------------------------------------------------------------- @@ -579,18 +612,17 @@ not_nacked({deliver, _Topic, Msg}) -> | {ok, replies(), channel()} | {shutdown, Reason :: term(), channel()} | {shutdown, Reason :: term(), replies(), channel()}). -handle_out(connack, {?RC_SUCCESS, SP, ConnPkt}, Channel = #channel{conninfo = ConnInfo}) -> +handle_out(connack, {?RC_SUCCESS, SP, Props}, Channel = #channel{conninfo = ConnInfo}) -> AckProps = run_fold([fun enrich_connack_caps/2, fun enrich_server_keepalive/2, fun enrich_assigned_clientid/2 - ], #{}, Channel), + ], Props, Channel), NAckProps = run_hooks('client.connack', [ConnInfo, emqx_reason_codes:name(?RC_SUCCESS)], AckProps), return_connack(?CONNACK_PACKET(?RC_SUCCESS, SP, NAckProps), - ensure_keepalive(NAckProps, - ensure_connected(ConnPkt, Channel))); + ensure_keepalive(NAckProps, Channel)); -handle_out(connack, {ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnInfo}) -> +handle_out(connack, ReasonCode, Channel = #channel{conninfo = ConnInfo}) -> Reason = emqx_reason_codes:name(ReasonCode), AckProps = run_hooks('client.connack', [ConnInfo, Reason], emqx_mqtt_props:new()), AckPacket = ?CONNACK_PACKET(case maps:get(proto_ver, ConnInfo) of @@ -643,6 +675,9 @@ handle_out(disconnect, {ReasonCode, ReasonName}, Channel = ?IS_MQTT_V5) -> handle_out(disconnect, {_ReasonCode, ReasonName}, Channel) -> {ok, {close, ReasonName}, Channel}; +handle_out(auth, {ReasonCode, Properties}, Channel) -> + {ok, ?AUTH_PACKET(ReasonCode, Properties), Channel}; + handle_out(Type, Data, Channel) -> ?LOG(error, "Unexpected outgoing: ~s, ~p", [Type, Data]), {ok, Channel}. @@ -1069,6 +1104,55 @@ auth_connect(#mqtt_packet_connect{clientid = ClientId, is_anonymous(#{anonymous := true}) -> true; is_anonymous(_AuthResult) -> false. +%%-------------------------------------------------------------------- +%% Enhanced Authentication + +enhanced_auth(?CONNECT_PACKET(#mqtt_packet_connect{ + proto_ver = Protover, + properties = Properties + }), Channel) -> + case Protover of + ?MQTT_PROTO_V5 -> + AuthMethod = emqx_mqtt_props:get('Authentication-Method', Properties, undefined), + AuthData = emqx_mqtt_props:get('Authentication-Data', Properties, undefined), + do_enhanced_auth(AuthMethod, AuthData, Channel); + _ -> + {ok, #{}, Channel} + end; + +enhanced_auth(?AUTH_PACKET(_ReasonCode, Properties), Channel = #channel{conninfo = ConnInfo}) -> + AuthMethod = maps:get('Authentication-Method', maps:get(conn_props, ConnInfo), undefined), + NAuthMethod = emqx_mqtt_props:get('Authentication-Method', Properties, undefined), + AuthData = emqx_mqtt_props:get('Authentication-Data', Properties, undefined), + case NAuthMethod =:= undefined orelse NAuthMethod =/= AuthMethod of + true -> + {error, emqx_reason_codes:connack_error(bad_authentication_method), Channel}; + false -> + do_enhanced_auth(AuthMethod, AuthData, Channel) + end. + +do_enhanced_auth(undefined, undefined, Channel) -> + {ok, #{}, Channel}; +do_enhanced_auth(undefined, _AuthData, Channel) -> + {error, emqx_reason_codes:connack_error(not_authorized), Channel}; +do_enhanced_auth(_AuthMethod, undefined, Channel) -> + {error, emqx_reason_codes:connack_error(not_authorized), Channel}; +do_enhanced_auth(AuthMethod, AuthData, Channel = #channel{auth_cache = Cache}) -> + case do_auth_check(AuthMethod, AuthData, Cache) of + ok -> {ok, #{}, Channel#channel{auth_cache = #{}}}; + {ok, NAuthData} -> + NProperties = #{'Authentication-Method' => AuthMethod, 'Authentication-Data' => NAuthData}, + {ok, NProperties, Channel#channel{auth_cache = #{}}}; + {continue, NAuthData, NCache} -> + NProperties = #{'Authentication-Method' => AuthMethod, 'Authentication-Data' => NAuthData}, + {continue, NProperties, Channel#channel{auth_cache = NCache}}; + {error, _Reason} -> + {error, emqx_reason_codes:connack_error(not_authorized), Channel} + end. + +do_auth_check(_AuthMethod, _AuthData, _AuthDataCache) -> + {error, not_authorized}. + %%-------------------------------------------------------------------- %% Process Topic Alias @@ -1259,14 +1343,12 @@ enrich_assigned_clientid(AckProps, #channel{conninfo = ConnInfo, %%-------------------------------------------------------------------- %% Ensure connected -ensure_connected(ConnPkt, Channel = #channel{conninfo = ConnInfo, - clientinfo = ClientInfo}) -> +ensure_connected(Channel = #channel{conninfo = ConnInfo, + clientinfo = ClientInfo}) -> NConnInfo = ConnInfo#{connected_at => erlang:system_time(second)}, ok = run_hooks('client.connected', [ClientInfo, NConnInfo]), Channel#channel{conninfo = NConnInfo, - conn_state = connected, - will_msg = emqx_packet:will_msg(ConnPkt), - alias_maximum = init_alias_maximum(ConnPkt, ClientInfo) + conn_state = connected }. %%-------------------------------------------------------------------- diff --git a/src/emqx_gen_mod.erl b/src/emqx_gen_mod.erl index 533f5fa64..e320ec877 100644 --- a/src/emqx_gen_mod.erl +++ b/src/emqx_gen_mod.erl @@ -22,6 +22,8 @@ -callback(unload(State :: term()) -> term()). +-callback(description() -> any()). + -else. -export([behaviour_info/1]). diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index 948c8e8d5..ce256d147 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -44,17 +44,16 @@ start() -> lists:foreach(fun start_listener/1, emqx:get_env(listeners, [])). --spec(start_listener(listener()) -> {ok, pid()} | {error, term()}). +-spec(start_listener(listener()) -> ok). start_listener({Proto, ListenOn, Options}) -> - StartRet = start_listener(Proto, ListenOn, Options), - case StartRet of + case start_listener(Proto, ListenOn, Options) of {ok, _} -> io:format("Start mqtt:~s listener on ~s successfully.~n", [Proto, format(ListenOn)]); {error, Reason} -> io:format(standard_error, "Failed to start mqtt:~s listener on ~s - ~0p~n!", - [Proto, format(ListenOn), Reason]) - end, - StartRet. + [Proto, format(ListenOn), Reason]), + error(Reason) + end. %% Start MQTT/TCP listener -spec(start_listener(esockd:proto(), esockd:listen_on(), [esockd:option()]) diff --git a/src/emqx_mod_acl_internal.erl b/src/emqx_mod_acl_internal.erl index e429d6899..7b5b5c2a7 100644 --- a/src/emqx_mod_acl_internal.erl +++ b/src/emqx_mod_acl_internal.erl @@ -24,14 +24,13 @@ -logger_header("[ACL_INTERNAL]"). %% APIs --export([ all_rules/0 - , check_acl/5 - , reload_acl/0 - ]). +-export([check_acl/5]). %% emqx_gen_mod callbacks -export([ load/1 , unload/1 + , reload/1 + , description/0 ]). -define(MFA(M, F, A), {M, F, A}). @@ -44,18 +43,19 @@ %%-------------------------------------------------------------------- load(_Env) -> - Rules = rules_from_file(acl_file()), + Rules = rules_from_file(emqx:get_env(acl_file)), emqx_hooks:add('client.check_acl', ?MFA(?MODULE, check_acl, [Rules]), -1). unload(_Env) -> - Rules = rules_from_file(acl_file()), + Rules = rules_from_file(emqx:get_env(acl_file)), emqx_hooks:del('client.check_acl', ?MFA(?MODULE, check_acl, [Rules])). -%% @doc Read all rules --spec(all_rules() -> list(emqx_access_rule:rule())). -all_rules() -> - rules_from_file(acl_file()). +reload(_Env) -> + emqx_acl_cache:is_enabled() andalso emqx_acl_cache:empty_acl_cache(), + unload([]), load([]). +description() -> + "EMQ X Internal ACL Module". %%-------------------------------------------------------------------- %% ACL callbacks %%-------------------------------------------------------------------- @@ -71,16 +71,9 @@ check_acl(Client, PubSub, Topic, _AclResult, Rules) -> nomatch -> ok end. --spec(reload_acl() -> ok | {error, term()}). -reload_acl() -> - unload([]), load([]). - %%-------------------------------------------------------------------- %% Internal Functions %%-------------------------------------------------------------------- - -acl_file() -> emqx:get_env(acl_file). - lookup(PubSub, Rules) -> maps:get(PubSub, Rules, []). diff --git a/src/emqx_mod_delayed.erl b/src/emqx_mod_delayed.erl index c88523f59..f9a1c4756 100644 --- a/src/emqx_mod_delayed.erl +++ b/src/emqx_mod_delayed.erl @@ -17,6 +17,7 @@ -module(emqx_mod_delayed). -behaviour(gen_server). +-behaviour(emqx_gen_mod). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -24,6 +25,7 @@ %% emqx_gen_mod callbacks -export([ load/1 , unload/1 + , description/0 ]). -export([ start_link/0 @@ -62,6 +64,8 @@ unload(_Env) -> emqx:unhook('message.publish', {?MODULE, on_message_publish, []}), emqx_mod_sup:stop_child(?MODULE). +description() -> + "EMQ X Delayed Publish Module". %%-------------------------------------------------------------------- %% Hooks %%-------------------------------------------------------------------- diff --git a/src/emqx_mod_presence.erl b/src/emqx_mod_presence.erl index 1abfaf73a..f5aa2279e 100644 --- a/src/emqx_mod_presence.erl +++ b/src/emqx_mod_presence.erl @@ -26,6 +26,7 @@ %% emqx_gen_mod callbacks -export([ load/1 , unload/1 + , description/0 ]). -export([ on_client_connected/3 @@ -44,6 +45,8 @@ unload(_Env) -> emqx_hooks:del('client.connected', {?MODULE, on_client_connected}), emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected}). +description() -> + "EMQ X Presence Module". %%-------------------------------------------------------------------- %% Callbacks %%-------------------------------------------------------------------- diff --git a/src/emqx_mod_rewrite.erl b/src/emqx_mod_rewrite.erl index 8098c9595..9702b7350 100644 --- a/src/emqx_mod_rewrite.erl +++ b/src/emqx_mod_rewrite.erl @@ -16,7 +16,7 @@ -module(emqx_mod_rewrite). --behavior(emqx_gen_mod). +-behaviour(emqx_gen_mod). -include_lib("emqx.hrl"). -include_lib("emqx_mqtt.hrl"). @@ -36,6 +36,7 @@ %% emqx_gen_mod callbacks -export([ load/1 , unload/1 + , description/0 ]). %%-------------------------------------------------------------------- @@ -62,6 +63,8 @@ unload(_) -> emqx_hooks:del('client.unsubscribe', {?MODULE, rewrite_unsubscribe}), emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}). +description() -> + "EMQ X Topic Rewrite Module". %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- diff --git a/src/emqx_mod_subscription.erl b/src/emqx_mod_subscription.erl index 79bb8dc63..b6d04528b 100644 --- a/src/emqx_mod_subscription.erl +++ b/src/emqx_mod_subscription.erl @@ -24,6 +24,7 @@ %% emqx_gen_mod callbacks -export([ load/1 , unload/1 + , description/0 ]). %% APIs @@ -49,6 +50,8 @@ on_client_connected(#{clientid := ClientId, username := Username}, _ConnInfo = # unload(_) -> emqx_hooks:del('client.connected', {?MODULE, on_client_connected}). +description() -> + "EMQ X Subscription Module". %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- diff --git a/src/emqx_mod_sup.erl b/src/emqx_mod_sup.erl index 74caa419b..b5512013c 100644 --- a/src/emqx_mod_sup.erl +++ b/src/emqx_mod_sup.erl @@ -58,5 +58,6 @@ stop_child(ChildId) -> %%-------------------------------------------------------------------- init([]) -> + ok = emqx_tables:new(emqx_modules, [set, public, {write_concurrency, true}]), {ok, {{one_for_one, 10, 100}, []}}. diff --git a/src/emqx_mod_topic_metrics.erl b/src/emqx_mod_topic_metrics.erl index 53dde662e..5deb42b92 100644 --- a/src/emqx_mod_topic_metrics.erl +++ b/src/emqx_mod_topic_metrics.erl @@ -17,6 +17,7 @@ -module(emqx_mod_topic_metrics). -behaviour(gen_server). +-behaviour(emqx_gen_mod). -include("emqx.hrl"). -include("logger.hrl"). @@ -26,6 +27,7 @@ -export([ load/1 , unload/1 + , description/0 ]). -export([ on_message_publish/1 @@ -104,6 +106,9 @@ unload(_Env) -> emqx:unhook('message.delivered', fun ?MODULE:on_message_delivered/2), emqx_mod_sup:stop_child(?MODULE). +description() -> + "EMQ X Topic Metrics Module". + on_message_publish(#message{topic = Topic, qos = QoS}) -> case is_registered(Topic) of true -> diff --git a/src/emqx_modules.erl b/src/emqx_modules.erl index 0e7e931d0..948359aa7 100644 --- a/src/emqx_modules.erl +++ b/src/emqx_modules.erl @@ -20,28 +20,149 @@ -logger_header("[Modules]"). --export([ load/0 +-export([ list/0 + , load/0 + , load/1 , unload/0 + , unload/1 + , reload/1 + , load_module/2 ]). +%% @doc List all available plugins +-spec(list() -> [{atom(), boolean()}]). +list() -> + ets:tab2list(?MODULE). + %% @doc Load all the extended modules. -spec(load() -> ok). load() -> - ok = emqx_mod_acl_internal:load([]), - lists:foreach(fun load/1, modules()). + case emqx:get_env(modules_loaded_file) of + undefined -> ignore; + File -> + load_modules(File) + end. -load({Mod, Env}) -> - ok = Mod:load(Env), - ?LOG(info, "Load ~s module successfully.", [Mod]). - -modules() -> emqx:get_env(modules, []). +load(ModuleName) -> + case find_module(ModuleName) of + [] -> + ?LOG(alert, "Module ~s not found, cannot load it", [ModuleName]), + {error, not_found}; + [{ModuleName, true}] -> + ?LOG(notice, "Module ~s is already started", [ModuleName]), + {error, already_started}; + [{ModuleName, false}] -> + emqx_modules:load_module(ModuleName, true) + end. %% @doc Unload all the extended modules. -spec(unload() -> ok). unload() -> - ok = emqx_mod_acl_internal:unload([]), - lists:foreach(fun unload/1, modules()). + case emqx:get_env(modules_loaded_file) of + undefined -> ignore; + File -> + unload_modules(File) + end. -unload({Mod, Env}) -> - Mod:unload(Env). +unload(ModuleName) -> + case find_module(ModuleName) of + [] -> + ?LOG(alert, "Module ~s not found, cannot load it", [ModuleName]), + {error, not_found}; + [{ModuleName, false}] -> + ?LOG(error, "Module ~s is not started", [ModuleName]), + {error, not_started}; + [{ModuleName, true}] -> + unload_module(ModuleName, true) + end. +reload(emqx_mod_acl_internal) -> + Modules = emqx:get_env(modules, []), + Env = proplists:get_value(emqx_mod_acl_internal, Modules, undefined), + case emqx_mod_acl_internal:reload(Env) of + ok -> + ?LOG(info, "Reload ~s module successfully.", [emqx_mod_acl_internal]); + {error, Error} -> + ?LOG(error, "Reload module ~s failed, cannot start for ~0p", [emqx_mod_acl_internal, Error]) + end; +reload(_) -> + ignore. + +find_module(ModuleName) -> + ets:lookup(?MODULE, ModuleName). + +filter_module(ModuleNames) -> + filter_module(ModuleNames, emqx:get_env(modules, [])). +filter_module([], Acc) -> + Acc; +filter_module([{ModuleName, true} | ModuleNames], Acc) -> + filter_module(ModuleNames, lists:keydelete(ModuleName, 1, Acc)); +filter_module([{_, false} | ModuleNames], Acc) -> + filter_module(ModuleNames, Acc). + +load_modules(File) -> + case file:consult(File) of + {ok, ModuleNames} -> + lists:foreach(fun({ModuleName, _}) -> + ets:insert(?MODULE, {ModuleName, false}) + end, filter_module(ModuleNames)), + lists:foreach(fun load_module/1, ModuleNames); + {error, Error} -> + ?LOG(alert, "Failed to read: ~p, error: ~p", [File, Error]) + end. + +load_module({ModuleName, true}) -> + emqx_modules:load_module(ModuleName, false); +load_module({ModuleName, false}) -> + ets:insert(?MODULE, {ModuleName, false}); +load_module(ModuleName) -> + load_module({ModuleName, true}). + +load_module(ModuleName, Persistent) -> + Modules = emqx:get_env(modules, []), + Env = proplists:get_value(ModuleName, Modules, undefined), + case ModuleName:load(Env) of + ok -> + ets:insert(?MODULE, {ModuleName, true}), + write_loaded(Persistent), + ?LOG(info, "Load ~s module successfully.", [ModuleName]); + {error, Error} -> + ?LOG(error, "Load module ~s failed, cannot load for ~0p", [ModuleName, Error]), + {error, Error} + end. + +unload_modules(File) -> + case file:consult(File) of + {ok, ModuleNames} -> + lists:foreach(fun unload_module/1, ModuleNames); + {error, Error} -> + ?LOG(alert, "Failed to read: ~p, error: ~p", [File, Error]) + end. +unload_module({ModuleName, true}) -> + unload_module(ModuleName, false); +unload_module({ModuleName, false}) -> + ets:insert(?MODULE, {ModuleName, false}); +unload_module(ModuleName) -> + unload_module({ModuleName, true}). + +unload_module(ModuleName, Persistent) -> + Modules = emqx:get_env(modules, []), + Env = proplists:get_value(ModuleName, Modules, undefined), + case ModuleName:unload(Env) of + ok -> + ets:insert(?MODULE, {ModuleName, false}), + write_loaded(Persistent), + ?LOG(info, "Unload ~s module successfully.", [ModuleName]); + {error, Error} -> + ?LOG(error, "Unload module ~s failed, cannot unload for ~0p", [ModuleName, Error]) + end. + +write_loaded(true) -> + FilePath = emqx:get_env(modules_loaded_file), + case file:write_file(FilePath, [io_lib:format("~p.~n", [Name]) || Name <- list()]) of + ok -> ok; + {error, Error} -> + ?LOG(error, "Write File ~p Error: ~p", [FilePath, Error]), + {error, Error} + end; +write_loaded(false) -> ok. \ No newline at end of file diff --git a/src/emqx_reason_codes.erl b/src/emqx_reason_codes.erl index c3ee2b87e..ece285767 100644 --- a/src/emqx_reason_codes.erl +++ b/src/emqx_reason_codes.erl @@ -169,6 +169,7 @@ compat(_Other, _Code) -> undefined. frame_error(frame_too_large) -> ?RC_PACKET_TOO_LARGE; frame_error(_) -> ?RC_MALFORMED_PACKET. +connack_error(protocol_error) -> ?RC_PROTOCOL_ERROR; connack_error(client_identifier_not_valid) -> ?RC_CLIENT_IDENTIFIER_NOT_VALID; connack_error(bad_username_or_password) -> ?RC_BAD_USER_NAME_OR_PASSWORD; connack_error(bad_clientid_or_password) -> ?RC_BAD_USER_NAME_OR_PASSWORD; diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index f26bfb70e..1b09cf9e6 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -191,7 +191,13 @@ init(Req, Opts) -> end. websocket_init([Req, Opts]) -> - Peername = cowboy_req:peer(Req), + Peername = case proplists:get_bool(proxy_protocol, Opts) + andalso maps:get(proxy_header, Req) of + #{src_address := SrcAddr, src_port := SrcPort} -> + {SrcAddr, SrcPort}; + _ -> + cowboy_req:peer(Req) + end, Sockname = cowboy_req:sock(Req), Peercert = cowboy_req:cert(Req), WsCookie = try cowboy_req:parse_cookies(Req) diff --git a/test/emqx_SUITE_data/loaded_modules b/test/emqx_SUITE_data/loaded_modules new file mode 100644 index 000000000..49effa0c5 --- /dev/null +++ b/test/emqx_SUITE_data/loaded_modules @@ -0,0 +1,2 @@ +{emqx_mod_acl_internal, true}. +{emqx_mod_presence, true}. \ No newline at end of file diff --git a/test/emqx_access_SUITE.erl b/test/emqx_access_SUITE.erl deleted file mode 100644 index 7cc808b5a..000000000 --- a/test/emqx_access_SUITE.erl +++ /dev/null @@ -1,378 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_access_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include("emqx.hrl"). - --include_lib("eunit/include/eunit.hrl"). --include_lib("common_test/include/ct.hrl"). - --define(AC, emqx_access_control). --define(CACHE, emqx_acl_cache). - --import(emqx_access_rule, - [ compile/1 - , match/3 - ]). - -all() -> - [{group, access_control}, - {group, acl_cache}, - {group, access_control_cache_mode}, - {group, access_rule} - ]. - -groups() -> - [{access_control, [sequence], - [t_reload_acl, - t_check_acl_1, - t_check_acl_2]}, - {access_control_cache_mode, [sequence], - [t_acl_cache_basic, - t_acl_cache_expiry, - t_acl_cache_cleanup, - t_acl_cache_full]}, - {acl_cache, [sequence], - [t_put_get_del_cache, - t_cache_update, - t_cache_expiry, - t_cache_replacement, - t_cache_cleanup, - t_cache_auto_emtpy, - t_cache_auto_cleanup]}, - {access_rule, [parallel], - [t_compile_rule, - t_match_rule] - }]. - -init_per_suite(Config) -> - emqx_ct_helpers:boot_modules([router, broker]), - emqx_ct_helpers:start_apps([]), - Config. - -end_per_suite(_Config) -> - emqx_ct_helpers:stop_apps([]). - -init_per_group(Group, Config) when Group =:= access_control; - Group =:= access_control_cache_mode -> - prepare_config(Group), - application:load(emqx), - Config; -init_per_group(_Group, Config) -> - Config. - -prepare_config(Group = access_control) -> - set_acl_config_file(Group), - application:set_env(emqx, enable_acl_cache, false); -prepare_config(Group = access_control_cache_mode) -> - set_acl_config_file(Group), - application:set_env(emqx, enable_acl_cache, true), - application:set_env(emqx, acl_cache_max_size, 100). - -set_acl_config_file(_Group) -> - Rules = [{allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]}, - {allow, {user, "testuser"}, subscribe, ["a/b/c", "d/e/f/#"]}, - {allow, {user, "admin"}, pubsub, ["a/b/c", "d/e/f/#"]}, - {allow, {client, "testClient"}, subscribe, ["testTopics/testClient"]}, - {allow, all, subscribe, ["clients/%c"]}, - {allow, all, pubsub, ["users/%u/#"]}, - {deny, all, subscribe, ["$SYS/#", "#"]}, - {deny, all}], - write_config("access_SUITE_acl.conf", Rules), - application:set_env(emqx, acl_file, "access_SUITE_acl.conf"). - -write_config(Filename, Terms) -> - file:write_file(Filename, [io_lib:format("~tp.~n", [Term]) || Term <- Terms]). - -end_per_group(_Group, Config) -> - Config. - -%%-------------------------------------------------------------------- -%% emqx_access_control -%%-------------------------------------------------------------------- - -t_reload_acl(_) -> - ok = ?AC:reload_acl(). - -t_check_acl_1(_) -> - Client = #{zone => external, - clientid => <<"client1">>, - username => <<"testuser">> - }, - allow = ?AC:check_acl(Client, subscribe, <<"users/testuser/1">>), - allow = ?AC:check_acl(Client, subscribe, <<"clients/client1">>), - deny = ?AC:check_acl(Client, subscribe, <<"clients/client1/x/y">>), - allow = ?AC:check_acl(Client, publish, <<"users/testuser/1">>), - allow = ?AC:check_acl(Client, subscribe, <<"a/b/c">>). - -t_check_acl_2(_) -> - Client = #{zone => external, - clientid => <<"client2">>, - username => <<"xyz">> - }, - deny = ?AC:check_acl(Client, subscribe, <<"a/b/c">>). - -t_acl_cache_basic(_) -> - Client = #{zone => external, - clientid => <<"client1">>, - username => <<"testuser">> - }, - not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), - not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), - - allow = ?AC:check_acl(Client, subscribe, <<"users/testuser/1">>), - allow = ?AC:check_acl(Client, subscribe, <<"clients/client1">>), - - allow = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), - allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>). - -t_acl_cache_expiry(_) -> - application:set_env(emqx, acl_cache_ttl, 100), - Client = #{zone => external, - clientid => <<"client1">>, - username => <<"testuser">> - }, - allow = ?AC:check_acl(Client, subscribe, <<"clients/client1">>), - allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), - ct:sleep(150), - not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>). - -t_acl_cache_full(_) -> - application:set_env(emqx, acl_cache_max_size, 1), - Client = #{zone => external, - clientid => <<"client1">>, - username => <<"testuser">> - }, - allow = ?AC:check_acl(Client, subscribe, <<"users/testuser/1">>), - allow = ?AC:check_acl(Client, subscribe, <<"clients/client1">>), - - %% the older ones (the <<"users/testuser/1">>) will be evicted first - not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), - allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>). - -t_acl_cache_cleanup(_) -> - %% The acl cache will try to evict memory, if the size is full and the newest - %% cache entry is expired - application:set_env(emqx, acl_cache_ttl, 100), - application:set_env(emqx, acl_cache_max_size, 2), - Client = #{zone => external, - clientid => <<"client1">>, - username => <<"testuser">> - }, - allow = ?AC:check_acl(Client, subscribe, <<"users/testuser/1">>), - allow = ?AC:check_acl(Client, subscribe, <<"clients/client1">>), - - allow = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), - allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), - - ct:sleep(150), - %% now the cache is full and the newest one - "clients/client1" - %% should be expired, so we'll empty the cache before putting - %% the next cache entry - deny = ?AC:check_acl(Client, subscribe, <<"#">>), - - not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), - not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), - deny = ?CACHE:get_acl_cache(subscribe, <<"#">>). - -t_put_get_del_cache(_) -> - application:set_env(emqx, acl_cache_ttl, 300000), - application:set_env(emqx, acl_cache_max_size, 30), - - not_found = ?CACHE:get_acl_cache(publish, <<"a">>), - ok = ?CACHE:put_acl_cache(publish, <<"a">>, allow), - allow = ?CACHE:get_acl_cache(publish, <<"a">>), - - not_found = ?CACHE:get_acl_cache(subscribe, <<"b">>), - ok = ?CACHE:put_acl_cache(subscribe, <<"b">>, deny), - deny = ?CACHE:get_acl_cache(subscribe, <<"b">>), - - 2 = ?CACHE:get_cache_size(), - ?assertEqual(?CACHE:cache_k(subscribe, <<"b">>), ?CACHE:get_newest_key()). - -t_cache_expiry(_) -> - application:set_env(emqx, acl_cache_ttl, 100), - application:set_env(emqx, acl_cache_max_size, 30), - ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), - allow = ?CACHE:get_acl_cache(subscribe, <<"a">>), - - ct:sleep(150), - not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>), - - ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, deny), - deny = ?CACHE:get_acl_cache(subscribe, <<"a">>), - - ct:sleep(150), - not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>). - -t_cache_update(_) -> - application:set_env(emqx, acl_cache_ttl, 300000), - application:set_env(emqx, acl_cache_max_size, 30), - [] = ?CACHE:dump_acl_cache(), - - ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), - ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), - ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), - 3 = ?CACHE:get_cache_size(), - ?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_newest_key()), - - %% update the 2nd one - ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), - ct:pal("dump acl cache: ~p~n", [?CACHE:dump_acl_cache()]), - - 3 = ?CACHE:get_cache_size(), - ?assertEqual(?CACHE:cache_k(publish, <<"b">>), ?CACHE:get_newest_key()), - ?assertEqual(?CACHE:cache_k(subscribe, <<"a">>), ?CACHE:get_oldest_key()). - -t_cache_replacement(_) -> - application:set_env(emqx, acl_cache_ttl, 300000), - application:set_env(emqx, acl_cache_max_size, 3), - ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), - ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), - ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), - allow = ?CACHE:get_acl_cache(subscribe, <<"a">>), - allow = ?CACHE:get_acl_cache(publish, <<"b">>), - allow = ?CACHE:get_acl_cache(publish, <<"c">>), - 3 = ?CACHE:get_cache_size(), - ?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_newest_key()), - - ok = ?CACHE:put_acl_cache(publish, <<"d">>, deny), - 3 = ?CACHE:get_cache_size(), - ?assertEqual(?CACHE:cache_k(publish, <<"d">>), ?CACHE:get_newest_key()), - ?assertEqual(?CACHE:cache_k(publish, <<"b">>), ?CACHE:get_oldest_key()), - - ok = ?CACHE:put_acl_cache(publish, <<"e">>, deny), - 3 = ?CACHE:get_cache_size(), - ?assertEqual(?CACHE:cache_k(publish, <<"e">>), ?CACHE:get_newest_key()), - ?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_oldest_key()), - - not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>), - not_found = ?CACHE:get_acl_cache(publish, <<"b">>), - allow = ?CACHE:get_acl_cache(publish, <<"c">>). - -t_cache_cleanup(_) -> - application:set_env(emqx, acl_cache_ttl, 100), - application:set_env(emqx, acl_cache_max_size, 30), - ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), - ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), - ct:sleep(150), - ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), - 3 = ?CACHE:get_cache_size(), - - ?CACHE:cleanup_acl_cache(), - ?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_oldest_key()), - 1 = ?CACHE:get_cache_size(). - -t_cache_auto_emtpy(_) -> - %% verify cache is emptied when cache full and even the newest - %% one is expired. - application:set_env(emqx, acl_cache_ttl, 100), - application:set_env(emqx, acl_cache_max_size, 3), - ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), - ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), - ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), - 3 = ?CACHE:get_cache_size(), - - ct:sleep(150), - ok = ?CACHE:put_acl_cache(subscribe, <<"d">>, deny), - 1 = ?CACHE:get_cache_size(). - -t_cache_auto_cleanup(_) -> - %% verify we'll cleanup expired entries when we got a exipired acl - %% from cache. - application:set_env(emqx, acl_cache_ttl, 100), - application:set_env(emqx, acl_cache_max_size, 30), - ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), - ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), - ct:sleep(150), - ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), - ok = ?CACHE:put_acl_cache(publish, <<"d">>, deny), - 4 = ?CACHE:get_cache_size(), - - %% "a" and "b" expires, while "c" and "d" not - not_found = ?CACHE:get_acl_cache(publish, <<"b">>), - 2 = ?CACHE:get_cache_size(), - - ct:sleep(150), %% now "c" and "d" expires - not_found = ?CACHE:get_acl_cache(publish, <<"c">>), - 0 = ?CACHE:get_cache_size(). - -%%-------------------------------------------------------------------- -%% emqx_access_rule -%%-------------------------------------------------------------------- - -t_compile_rule(_) -> - {allow, {'and', [{ipaddr, {{127,0,0,1}, {127,0,0,1}, 32}}, - {user, <<"user">>}]}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} = - compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"user">>}]}, subscribe, ["$SYS/#", "#"]}), - {allow, {'or', [{ipaddr, {{127,0,0,1}, {127,0,0,1}, 32}}, - {user, <<"user">>}]}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} = - compile({allow, {'or', [{ipaddr, "127.0.0.1"}, {user, <<"user">>}]}, subscribe, ["$SYS/#", "#"]}), - - {allow, {ipaddr, {{127,0,0,1}, {127,0,0,1}, 32}}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} = - compile({allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]}), - {allow, {user, <<"testuser">>}, subscribe, [ [<<"a">>, <<"b">>, <<"c">>], [<<"d">>, <<"e">>, <<"f">>, '#'] ]} = - compile({allow, {user, "testuser"}, subscribe, ["a/b/c", "d/e/f/#"]}), - {allow, {user, <<"admin">>}, pubsub, [ [<<"d">>, <<"e">>, <<"f">>, '#'] ]} = - compile({allow, {user, "admin"}, pubsub, ["d/e/f/#"]}), - {allow, {client, <<"testClient">>}, publish, [ [<<"testTopics">>, <<"testClient">>] ]} = - compile({allow, {client, "testClient"}, publish, ["testTopics/testClient"]}), - {allow, all, pubsub, [{pattern, [<<"clients">>, <<"%c">>]}]} = - compile({allow, all, pubsub, ["clients/%c"]}), - {allow, all, subscribe, [{pattern, [<<"users">>, <<"%u">>, '#']}]} = - compile({allow, all, subscribe, ["users/%u/#"]}), - {deny, all, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} = - compile({deny, all, subscribe, ["$SYS/#", "#"]}), - {allow, all} = compile({allow, all}), - {deny, all} = compile({deny, all}). - -t_match_rule(_) -> - ClientInfo1 = #{zone => external, - clientid => <<"testClient">>, - username => <<"TestUser">>, - peerhost => {127,0,0,1} - }, - ClientInfo2 = #{zone => external, - clientid => <<"testClient">>, - username => <<"TestUser">>, - peerhost => {192,168,0,10} - }, - {matched, allow} = match(ClientInfo1, <<"Test/Topic">>, {allow, all}), - {matched, deny} = match(ClientInfo1, <<"Test/Topic">>, {deny, all}), - {matched, allow} = match(ClientInfo1, <<"Test/Topic">>, - compile({allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]})), - {matched, allow} = match(ClientInfo2, <<"Test/Topic">>, - compile({allow, {ipaddr, "192.168.0.1/24"}, subscribe, ["$SYS/#", "#"]})), - {matched, allow} = match(ClientInfo1, <<"d/e/f/x">>, - compile({allow, {user, "TestUser"}, subscribe, ["a/b/c", "d/e/f/#"]})), - nomatch = match(ClientInfo1, <<"d/e/f/x">>, compile({allow, {user, "admin"}, pubsub, ["d/e/f/#"]})), - {matched, allow} = match(ClientInfo1, <<"testTopics/testClient">>, - compile({allow, {client, "testClient"}, publish, ["testTopics/testClient"]})), - {matched, allow} = match(ClientInfo1, <<"clients/testClient">>, compile({allow, all, pubsub, ["clients/%c"]})), - {matched, allow} = match(#{username => <<"user2">>}, <<"users/user2/abc/def">>, - compile({allow, all, subscribe, ["users/%u/#"]})), - {matched, deny} = match(ClientInfo1, <<"d/e/f">>, compile({deny, all, subscribe, ["$SYS/#", "#"]})), - Rule = compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"WrongUser">>}]}, publish, <<"Topic">>}), - nomatch = match(ClientInfo1, <<"Topic">>, Rule), - AndRule = compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"TestUser">>}]}, publish, <<"Topic">>}), - {matched, allow} = match(ClientInfo1, <<"Topic">>, AndRule), - OrRule = compile({allow, {'or', [{ipaddr, "127.0.0.1"}, {user, <<"WrongUser">>}]}, publish, ["Topic"]}), - {matched, allow} = match(ClientInfo1, <<"Topic">>, OrRule). - diff --git a/test/emqx_access_control_SUITE.erl b/test/emqx_access_control_SUITE.erl index ca4db1fbb..a327eea6d 100644 --- a/test/emqx_access_control_SUITE.erl +++ b/test/emqx_access_control_SUITE.erl @@ -49,9 +49,6 @@ t_check_acl(_) -> Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>), ?assertEqual(allow, emqx_access_control:check_acl(clientinfo(), Publish, <<"t">>)). -t_reload_acl(_) -> - ?assertEqual(ok, emqx_access_control:reload_acl()). - t_bypass_auth_plugins(_) -> AuthFun = fun(#{zone := bypass_zone}, AuthRes) -> {stop, AuthRes#{auth_result => password_error}}; diff --git a/test/emqx_acl_test_mod.erl b/test/emqx_acl_test_mod.erl index e698efc4d..5d36cce78 100644 --- a/test/emqx_acl_test_mod.erl +++ b/test/emqx_acl_test_mod.erl @@ -19,7 +19,6 @@ %% ACL callbacks -export([ init/1 , check_acl/2 - , reload_acl/1 , description/0 ]). @@ -29,9 +28,6 @@ init(AclOpts) -> check_acl({_User, _PubSub, _Topic}, _State) -> allow. -reload_acl(_State) -> - ok. - description() -> "Test ACL Mod". diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index 342ba8bae..a94095b2b 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -112,6 +112,43 @@ t_handle_in_unexpected_connect_packet(_) -> {ok, [{outgoing, Packet}, {close, protocol_error}], Channel} = emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), Channel). +t_handle_in_connect_auth_failed(_) -> + ConnPkt = #mqtt_packet_connect{ + proto_name = <<"MQTT">>, + proto_ver = ?MQTT_PROTO_V5, + is_bridge = false, + clean_start = true, + keepalive = 30, + properties = #{ + 'Authentication-Method' => "failed_auth_method", + 'Authentication-Data' => <<"failed_auth_data">> + }, + clientid = <<"clientid">>, + username = <<"username">> + }, + {shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _} = + emqx_channel:handle_in(?CONNECT_PACKET(ConnPkt), channel(#{conn_state => idle})). + +t_handle_in_continue_auth(_) -> + Properties = #{ + 'Authentication-Method' => "failed_auth_method", + 'Authentication-Data' => <<"failed_auth_data">> + }, + {shutdown, bad_authentication_method, ?CONNACK_PACKET(?RC_BAD_AUTHENTICATION_METHOD), _} = + emqx_channel:handle_in(?AUTH_PACKET(?RC_CONTINUE_AUTHENTICATION,Properties), channel()), + {shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _} = + emqx_channel:handle_in(?AUTH_PACKET(?RC_CONTINUE_AUTHENTICATION,Properties), channel(#{conninfo => #{proto_ver => ?MQTT_PROTO_V5, conn_props => Properties}})). + +t_handle_in_re_auth(_) -> + Properties = #{ + 'Authentication-Method' => "failed_auth_method", + 'Authentication-Data' => <<"failed_auth_data">> + }, + {ok, [{outgoing, ?DISCONNECT_PACKET(?RC_BAD_AUTHENTICATION_METHOD)}, {close, bad_authentication_method}], _} = + emqx_channel:handle_in(?AUTH_PACKET(?RC_RE_AUTHENTICATE,Properties), channel()), + {ok, [{outgoing, ?DISCONNECT_PACKET(?RC_NOT_AUTHORIZED)}, {close, not_authorized}], _} = + emqx_channel:handle_in(?AUTH_PACKET(?RC_RE_AUTHENTICATE,Properties), channel(#{conninfo => #{proto_ver => ?MQTT_PROTO_V5, conn_props => Properties}})). + t_handle_in_qos0_publish(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), Channel = channel(#{conn_state => connected}), @@ -286,7 +323,7 @@ t_process_connect(_) -> {ok, #{session => session(), present => false}} end), {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS)}], _Chan} = - emqx_channel:process_connect(connpkt(), channel(#{conn_state => idle})). + emqx_channel:process_connect(#{}, channel(#{conn_state => idle})). t_process_publish_qos0(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), @@ -346,12 +383,12 @@ t_handle_out_publish_nl(_) -> t_handle_out_connack_sucess(_) -> {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Channel} = - emqx_channel:handle_out(connack, {?RC_SUCCESS, 0, connpkt()}, channel()), + emqx_channel:handle_out(connack, {?RC_SUCCESS, 0, #{}}, channel()), ?assertEqual(connected, emqx_channel:info(conn_state, Channel)). t_handle_out_connack_failure(_) -> {shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _Chan} = - emqx_channel:handle_out(connack, {?RC_NOT_AUTHORIZED, connpkt()}, channel()). + emqx_channel:handle_out(connack, ?RC_NOT_AUTHORIZED, channel()). t_handle_out_puback(_) -> Channel = channel(#{conn_state => connected}), diff --git a/test/emqx_listeners_SUITE.erl b/test/emqx_listeners_SUITE.erl index bc09a26bb..3bf53e683 100644 --- a/test/emqx_listeners_SUITE.erl +++ b/test/emqx_listeners_SUITE.erl @@ -21,6 +21,7 @@ -include("emqx.hrl"). -include("emqx_mqtt.hrl"). +-include_lib("eunit/include/eunit.hrl"). all() -> emqx_ct:all(?MODULE). @@ -37,7 +38,7 @@ end_per_suite(_Config) -> t_start_stop_listeners(_) -> ok = emqx_listeners:start(), - {error, _} = emqx_listeners:start_listener({ws,{"127.0.0.1", 8083}, []}), + ?assertException(error, _, emqx_listeners:start_listener({ws,{"127.0.0.1", 8083}, []})), ok = emqx_listeners:stop(). t_restart_listeners(_) -> @@ -90,4 +91,4 @@ get_base_dir(Module) -> get_base_dir() -> get_base_dir(?MODULE). - \ No newline at end of file + diff --git a/test/emqx_mod_acl_internal_SUITE.erl b/test/emqx_mod_acl_internal_SUITE.erl index 4849fab7d..bd9a0bf5a 100644 --- a/test/emqx_mod_acl_internal_SUITE.erl +++ b/test/emqx_mod_acl_internal_SUITE.erl @@ -33,16 +33,9 @@ end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([]). t_load_unload(_) -> - ?assertEqual({error,already_exists}, emqx_mod_acl_internal:load([])), ?assertEqual(ok, emqx_mod_acl_internal:unload([])), - ?assertEqual(ok, emqx_mod_acl_internal:load([])). - -t_all_rules(_) -> - application:set_env(emqx, acl_file, ""), - ?assertMatch(#{}, emqx_mod_acl_internal:all_rules()), - - application:set_env(emqx, acl_file, emqx_ct_helpers:deps_path(emqx, "etc/acl.conf")), - ?assertMatch(#{publish := _, subscribe := _}, emqx_mod_acl_internal:all_rules()). + ?assertEqual(ok, emqx_mod_acl_internal:load([])), + ?assertEqual({error,already_exists}, emqx_mod_acl_internal:load([])). t_check_acl(_) -> Rules=#{publish => [{allow,all}], subscribe => [{deny, all}]}, @@ -51,7 +44,7 @@ t_check_acl(_) -> ?assertEqual(ok, emqx_mod_acl_internal:check_acl(clientinfo(), connect, <<"t">>, [], Rules)). t_reload_acl(_) -> - ?assertEqual(ok, emqx_mod_acl_internal:reload_acl()). + ?assertEqual(ok, emqx_mod_acl_internal:reload([])). %%-------------------------------------------------------------------- %% Helper functions diff --git a/test/emqx_mod_delayed_SUITE.erl b/test/emqx_mod_delayed_SUITE.erl index 99883eb16..3759e8c4d 100644 --- a/test/emqx_mod_delayed_SUITE.erl +++ b/test/emqx_mod_delayed_SUITE.erl @@ -44,9 +44,7 @@ end_per_suite(_) -> set_special_configs(emqx) -> application:set_env(emqx, modules, [{emqx_mod_delayed, []}]), application:set_env(emqx, allow_anonymous, false), - application:set_env(emqx, enable_acl_cache, false), - application:set_env(emqx, plugins_loaded_file, - emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins")); + application:set_env(emqx, enable_acl_cache, false); set_special_configs(_App) -> ok. @@ -55,8 +53,6 @@ set_special_configs(_App) -> %%-------------------------------------------------------------------- t_load_case(_) -> - ok = emqx_mod_delayed:unload([]), - timer:sleep(100), UnHooks = emqx_hooks:lookup('message.publish'), ?assertEqual([], UnHooks), ok = emqx_mod_delayed:load([]), @@ -65,6 +61,7 @@ t_load_case(_) -> ok. t_delayed_message(_) -> + ok = emqx_mod_delayed:load([]), DelayedMsg = emqx_message:make(?MODULE, 1, <<"$delayed/1/publish">>, <<"delayed_m">>), ?assertEqual({stop, DelayedMsg#message{topic = <<"publish">>, headers = #{allow_publish => false}}}, on_message_publish(DelayedMsg)), @@ -77,4 +74,5 @@ t_delayed_message(_) -> timer:sleep(5000), EmptyKey = mnesia:dirty_all_keys(emqx_mod_delayed), - ?assertEqual([], EmptyKey). + ?assertEqual([], EmptyKey), + ok = emqx_mod_delayed:unload([]). diff --git a/test/emqx_modules_SUITE.erl b/test/emqx_modules_SUITE.erl new file mode 100644 index 000000000..eb161d02b --- /dev/null +++ b/test/emqx_modules_SUITE.erl @@ -0,0 +1,49 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_modules_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + + emqx_ct_helpers:boot_modules([]), + emqx_ct_helpers:start_apps([], fun set_sepecial_cfg/1), + Config. + +set_sepecial_cfg(_) -> + application:set_env(emqx, modules_loaded_file, emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_modules")), + ok. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([]). + +t_load(_) -> + ?assertEqual(ok, emqx_modules:unload()), + ?assertEqual(ok, emqx_modules:load()), + ?assertEqual({error, not_found}, emqx_modules:load(not_existed_module)), + ?assertEqual({error, not_started}, emqx_modules:unload(emqx_mod_rewrite)), + ?assertEqual(ignore, emqx_modules:reload(emqx_mod_rewrite)), + ?assertEqual(ok, emqx_modules:reload(emqx_mod_acl_internal)). + +t_list(_) -> + ?assertMatch([{_, _} | _ ], emqx_modules:list()). +