diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index ed87ce6c2..91936c8f0 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -94,7 +94,8 @@ fields(client_bucket) -> , {initial, sc(initial(), #{default => "0"})} %% low_water_mark add for emqx_channel and emqx_session %% both modules consume first and then check - %% so we need to use this value to prevent excessive consumption (e.g, consumption from an empty bucket) + %% so we need to use this value to prevent excessive consumption + %% (e.g, consumption from an empty bucket) , {low_water_mark, sc(initial(), #{desc => "if the remaining tokens are lower than this value, the check/consume will succeed, but it will be forced to hang for a short period of time", diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index 5e189a8b1..1bccbf2a0 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -89,6 +89,8 @@ -export_type([index/0]). -import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]). +-elvis([{elvis_style, no_if_expression, disable}]). + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- @@ -337,8 +339,9 @@ longitudinal(#{id := Id, case lists:min([ShouldAlloc, Flow, Capacity]) of Avaiable when Avaiable > 0 -> - %% XXX if capacity is infinity, and flow always > 0, the value in counter - %% will be overflow at some point in the future, do we need to deal with this situation??? + %% XXX if capacity is infinity, and flow always > 0, the value in + %% counter will be overflow at some point in the future, do we need + %% to deal with this situation??? {Inc, Node2} = emqx_limiter_correction:add(Avaiable, Node), counters:add(Counter, Index, Inc), diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index 9e64e70c9..19bc2b3c3 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -923,7 +923,12 @@ t_ws_cookie_init(_) -> conn_mod => emqx_ws_connection, ws_cookie => WsCookie }, - Channel = emqx_channel:init(ConnInfo, #{zone => default, limiter => limiter_cfg(), listener => {tcp, default}}), + Channel = emqx_channel:init( + ConnInfo, + #{zone => default, + limiter => limiter_cfg(), + listener => {tcp, default} + }), ?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)). %%-------------------------------------------------------------------- @@ -948,7 +953,12 @@ channel(InitFields) -> maps:fold(fun(Field, Value, Channel) -> emqx_channel:set_field(Field, Value, Channel) end, - emqx_channel:init(ConnInfo, #{zone => default, limiter => limiter_cfg(), listener => {tcp, default}}), + emqx_channel:init( + ConnInfo, + #{zone => default, + limiter => limiter_cfg(), + listener => {tcp, default} + }), maps:merge(#{clientinfo => clientinfo(), session => session(), conn_state => connected diff --git a/apps/emqx_exhook/src/emqx_exhook_api.erl b/apps/emqx_exhook/src/emqx_exhook_api.erl index bd3351616..7581dd17b 100644 --- a/apps/emqx_exhook/src/emqx_exhook_api.erl +++ b/apps/emqx_exhook/src/emqx_exhook_api.erl @@ -77,7 +77,8 @@ schema("/exhooks/:name") -> description => <<"Delete the server">>, parameters => params_server_name_in_path(), responses => #{204 => <<>>, - 500 => error_codes([?BAD_RPC], <<"Bad RPC">>) } + 500 => error_codes([?BAD_RPC], <<"Bad RPC">>) + } } }; @@ -176,7 +177,8 @@ action_with_name(put, #{bindings := #{name := Name}, body := Body}) -> }}; {ok, {error, Reason}} -> {400, #{code => <<"BAD_REQUEST">>, - message => unicode:characters_to_binary(io_lib:format("Error Reason:~p~n", [Reason])) + message => unicode:characters_to_binary( + io_lib:format("Error Reason:~p~n", [Reason])) }}; {ok, _} -> {200}; diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index ed81ece82..3f43c1597 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -461,11 +461,9 @@ check_timer(Timer, _, _) -> -spec get_backend_module() -> backend(). get_backend_module() -> - #{type := Backend} = emqx:get_config([retainer, config]), - ModName = if Backend =:= built_in_database -> - mnesia; - true -> - Backend + ModName = case emqx:get_config([retainer, config]) of + #{type := built_in_database} -> mnesia; + #{type := Backend} -> Backend end, erlang:list_to_existing_atom(io_lib:format("~ts_~ts", [?APP, ModName])). diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 6bb0ae340..28be3aa55 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -95,7 +95,11 @@ store_retained(_, Msg =#message{topic = Topic}) -> end, case mria:transaction(?RETAINER_SHARD, Fun) of {atomic, ok} -> ok; - {aborted, Reason} -> ?SLOG(error, #{msg => "failed_to_retain_message", topic => Topic, reason => Reason}) + {aborted, Reason} -> + ?SLOG(error, #{ msg => "failed_to_retain_message" + , topic => Topic + , reason => Reason + }) end end. diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index e51417e6e..54e19e42c 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -90,7 +90,10 @@ t_store_and_clean(_) -> {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), - emqtt:publish(C1, <<"retained">>, <<"this is a retained message">>, [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"retained">>, + <<"this is a retained message">>, + [{qos, 0}, {retain, true}]), timer:sleep(100), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]), ?assertEqual(1, length(receive_messages(1))), @@ -118,7 +121,10 @@ t_retain_handling(_) -> ?assertEqual(0, length(receive_messages(1))), {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/#">>), - emqtt:publish(C1, <<"retained">>, <<"this is a retained message">>, [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"retained">>, + <<"this is a retained message">>, + [{qos, 0}, {retain, true}]), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]), ?assertEqual(1, length(receive_messages(1))), @@ -148,9 +154,18 @@ t_retain_handling(_) -> t_wildcard_subscription(_) -> {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), - emqtt:publish(C1, <<"retained/0">>, <<"this is a retained message 0">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"retained/1">>, <<"this is a retained message 1">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"retained/a/b/c">>, <<"this is a retained message 2">>, [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"retained/0">>, + <<"this is a retained message 0">>, + [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"retained/1">>, + <<"this is a retained message 1">>, + [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"retained/a/b/c">>, + <<"this is a retained message 2">>, + [{qos, 0}, {retain, true}]), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+/b/#">>, 0), @@ -165,11 +180,26 @@ t_message_expiry(_) -> {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), - emqtt:publish(C1, <<"retained/0">>, #{'Message-Expiry-Interval' => 0}, <<"don't expire">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"retained/1">>, #{'Message-Expiry-Interval' => 2}, <<"expire">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"retained/2">>, #{'Message-Expiry-Interval' => 5}, <<"don't expire">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"retained/3">>, <<"don't expire">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"$SYS/retained/4">>, <<"don't expire">>, [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"retained/0">>, #{'Message-Expiry-Interval' => 0}, + <<"don't expire">>, + [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"retained/1">>, #{'Message-Expiry-Interval' => 2}, + <<"expire">>, + [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"retained/2">>, #{'Message-Expiry-Interval' => 5}, + <<"don't expire">>, + [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"retained/3">>, + <<"don't expire">>, + [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"$SYS/retained/4">>, + <<"don't expire">>, + [{qos, 0}, {retain, true}]), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0), @@ -210,9 +240,18 @@ t_message_expiry_2(_) -> t_clean(_) -> {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), - emqtt:publish(C1, <<"retained/0">>, <<"this is a retained message 0">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"retained/1">>, <<"this is a retained message 1">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"retained/test/0">>, <<"this is a retained message 2">>, [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"retained/0">>, + <<"this is a retained message 0">>, + [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"retained/1">>, + <<"this is a retained message 1">>, + [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"retained/test/0">>, + <<"this is a retained message 2">>, + [{qos, 0}, {retain, true}]), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), ?assertEqual(3, length(receive_messages(3))), @@ -227,7 +266,11 @@ t_stop_publish_clear_msg(_) -> emqx_retainer:update_config(#{<<"stop_publish_clear_msg">> => true}), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), - emqtt:publish(C1, <<"retained/0">>, <<"this is a retained message 0">>, [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"retained/0">>, + <<"this is a retained message 0">>, + [{qos, 0}, {retain, true}] + ), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), ?assertEqual(1, length(receive_messages(1))), @@ -239,14 +282,27 @@ t_stop_publish_clear_msg(_) -> ok = emqtt:disconnect(C1). t_flow_control(_) -> - emqx_retainer:update_config(#{<<"flow_control">> => #{<<"max_read_number">> => 1, - <<"msg_deliver_quota">> => 1, - <<"quota_release_interval">> => <<"1s">>}}), + emqx_retainer:update_config(#{<<"flow_control">> => + #{<<"max_read_number">> => 1, + <<"msg_deliver_quota">> => 1, + <<"quota_release_interval">> => <<"1s">>}}), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), - emqtt:publish(C1, <<"retained/0">>, <<"this is a retained message 0">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"retained/1">>, <<"this is a retained message 1">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"retained/3">>, <<"this is a retained message 3">>, [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"retained/0">>, + <<"this is a retained message 0">>, + [{qos, 0}, {retain, true}] + ), + emqtt:publish( + C1, <<"retained/1">>, + <<"this is a retained message 1">>, + [{qos, 0}, {retain, true}] + ), + emqtt:publish( + C1, <<"retained/3">>, + <<"this is a retained message 3">>, + [{qos, 0}, {retain, true}] + ), Begin = erlang:system_time(millisecond), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), ?assertEqual(3, length(receive_messages(3))), diff --git a/apps/emqx_retainer/test/emqx_retainer_mqtt_v5_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_mqtt_v5_SUITE.erl index 9a887d2d3..c5c828e55 100644 --- a/apps/emqx_retainer/test/emqx_retainer_mqtt_v5_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_mqtt_v5_SUITE.erl @@ -94,13 +94,23 @@ t_publish_retain_message(_) -> {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, _} = emqtt:connect(Client1), - {ok, _} = emqtt:publish(Client1, Topic, #{}, <<"retained message">>, [{qos, 2}, {retain, true}]), - {ok, _} = emqtt:publish(Client1, Topic, #{}, <<"new retained message">>, [{qos, 2}, {retain, true}]), - {ok, _} = emqtt:publish(Client1, Topic, #{}, <<"not retained message">>, [{qos, 2}, {retain, false}]), + {ok, _} = emqtt:publish( + Client1, Topic, #{}, + <<"retained message">>, + [{qos, 2}, {retain, true}]), + {ok, _} = emqtt:publish( + Client1, Topic, #{}, + <<"new retained message">>, + [{qos, 2}, {retain, true}]), + {ok, _} = emqtt:publish( + Client1, Topic, #{}, + <<"not retained message">>, + [{qos, 2}, {retain, false}]), {ok, _, [2]} = emqtt:subscribe(Client1, Topic, 2), [Msg] = receive_messages(1), - ?assertEqual(<<"new retained message">>, maps:get(payload, Msg)), %% [MQTT-3.3.1-5] [MQTT-3.3.1-8] + %% [MQTT-3.3.1-5] [MQTT-3.3.1-8] + ?assertEqual(<<"new retained message">>, maps:get(payload, Msg)), {ok, _, [0]} = emqtt:unsubscribe(Client1, Topic), {ok, _} = emqtt:publish(Client1, Topic, #{}, <<"">>, [{qos, 2}, {retain, true}]), @@ -113,16 +123,33 @@ t_publish_retain_message(_) -> t_publish_message_expiry_interval(_) -> {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, _} = emqtt:connect(Client1), - {ok, _} = emqtt:publish(Client1, <<"topic/A">>, #{'Message-Expiry-Interval' => 1}, <<"retained message">>, [{qos, 1}, {retain, true}]), - {ok, _} = emqtt:publish(Client1, <<"topic/B">>, #{'Message-Expiry-Interval' => 1}, <<"retained message">>, [{qos, 2}, {retain, true}]), - {ok, _} = emqtt:publish(Client1, <<"topic/C">>, #{'Message-Expiry-Interval' => 10}, <<"retained message">>, [{qos, 1}, {retain, true}]), - {ok, _} = emqtt:publish(Client1, <<"topic/D">>, #{'Message-Expiry-Interval' => 10}, <<"retained message">>, [{qos, 2}, {retain, true}]), + {ok, _} = emqtt:publish( + Client1, <<"topic/A">>, #{'Message-Expiry-Interval' => 1}, + <<"retained message">>, + [{qos, 1}, {retain, true}]), + {ok, _} = emqtt:publish( + Client1, <<"topic/B">>, #{'Message-Expiry-Interval' => 1}, + <<"retained message">>, + [{qos, 2}, {retain, true}]), + {ok, _} = emqtt:publish( + Client1, <<"topic/C">>, #{'Message-Expiry-Interval' => 10}, + <<"retained message">>, + [{qos, 1}, {retain, true}]), + {ok, _} = emqtt:publish( + Client1, <<"topic/D">>, #{'Message-Expiry-Interval' => 10}, + <<"retained message">>, + [{qos, 2}, {retain, true}]), timer:sleep(1500), {ok, _, [2]} = emqtt:subscribe(Client1, <<"topic/+">>, 2), Msgs = receive_messages(4), ?assertEqual(2, length(Msgs)), %% [MQTT-3.3.2-5] - L = lists:map(fun(Msg) -> MessageExpiryInterval = maps:get('Message-Expiry-Interval', maps:get(properties, Msg)), MessageExpiryInterval < 10 end, Msgs), + L = lists:map( + fun(Msg) -> + MessageExpiryInterval = maps:get('Message-Expiry-Interval', + maps:get(properties, Msg)), + MessageExpiryInterval < 10 + end, Msgs), ?assertEqual(2, length(L)), %% [MQTT-3.3.2-6] ok = emqtt:disconnect(Client1), @@ -137,9 +164,21 @@ t_publish_message_expiry_interval(_) -> t_subscribe_retain_handing(_) -> {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, _} = emqtt:connect(Client1), - ok = emqtt:publish(Client1, <<"topic/A">>, #{}, <<"retained message">>, [{qos, 0}, {retain, true}]), - {ok, _} = emqtt:publish(Client1, <<"topic/B">>, #{}, <<"retained message">>, [{qos, 1}, {retain, true}]), - {ok, _} = emqtt:publish(Client1, <<"topic/C">>, #{}, <<"retained message">>, [{qos, 2}, {retain, true}]), + ok = emqtt:publish( + Client1, <<"topic/A">>, #{}, + <<"retained message">>, + [{qos, 0}, {retain, true}] + ), + {ok, _} = emqtt:publish( + Client1, <<"topic/B">>, #{}, + <<"retained message">>, + [{qos, 1}, {retain, true}] + ), + {ok, _} = emqtt:publish( + Client1, <<"topic/C">>, #{}, + <<"retained message">>, + [{qos, 2}, {retain, true}] + ), {ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{<<"topic/+">>, [{rh, 1}, {qos, 2}]}]), ?assertEqual(3, length(receive_messages(3))), %% [MQTT-3.3.1-10] diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs.erl b/apps/emqx_slow_subs/src/emqx_slow_subs.erl index 370f250fb..027b304b1 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs.erl @@ -104,7 +104,8 @@ on_stats_update(#{clientid := ClientId, case ets:lookup(?TOPK_TAB, LastIndex) of [#top_k{index = Index}] -> %% if last value == the new value, update the type and last_update_time - %% XXX for clients whose latency are stable for a long time, is it possible to reduce updates? + %% XXX for clients whose latency are stable for a long time, is it + %% possible to reduce updates? ets:insert(?TOPK_TAB, #top_k{index = Index, type = Type, last_update_time = Ts}); [_] -> diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl b/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl index 5ad7e207b..ee2016268 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl @@ -72,11 +72,16 @@ schema("/slow_subscriptions/settings") -> }. fields(record) -> - [ - {clientid, mk(string(), #{desc => <<"the clientid">>})}, - {latency, mk(integer(), #{desc => <<"average time for message delivery or time for message expire">>})}, - {type, mk(string(), #{desc => <<"type of the latency, could be average or expire">>})}, - {last_update_time, mk(integer(), #{desc => <<"the timestamp of last update">>})} + [ {clientid, + mk(string(), #{desc => <<"the clientid">>})}, + {latency, + mk(integer(), + #{desc => <<"average time for message delivery or time for message expire">>})}, + {type, + mk(string(), + #{desc => <<"type of the latency, could be average or expire">>})}, + {last_update_time, + mk(integer(), #{desc => <<"the timestamp of last update">>})} ]. conf_schema() -> diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl b/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl index 4a802eb4c..2eca0e730 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl @@ -23,8 +23,8 @@ fields("slow_subs") -> , {notice_interval, sc(emqx_schema:duration_ms(), "0s", - "The interval for pushing statistics table records to the system topic. When set to 0, push is disabled" - "publish topk list to $SYS/brokers/${node}/slow_subs per notice_interval" + "The interval for pushing statistics table records to the system topic. " + "publish topk list to $SYS/brokers/${node}/slow_subs per notice_interval. " "publish is disabled if set to 0s." )} , {notice_qos,