diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 84281a500..9b31795f2 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1003,6 +1003,17 @@ handle_call({quota, Policy}, Channel) -> Quota = emqx_limiter:init(Zone, Policy), reply(ok, Channel#channel{quota = Quota}); +handle_call({keepalive, Interval}, Channel = #channel{keepalive = KeepAlive, + conninfo = ConnInfo}) -> + ClientId = info(clientid, Channel), + NKeepalive = emqx_keepalive:set(interval, Interval * 1000, KeepAlive), + NConnInfo = maps:put(keepalive, Interval, ConnInfo), + NChannel = Channel#channel{keepalive = NKeepalive, conninfo = NConnInfo}, + SockInfo = maps:get(sockinfo, emqx_cm:get_chan_info(ClientId), #{}), + ChanInfo1 = info(NChannel), + emqx_cm:set_chan_info(ClientId, ChanInfo1#{sockinfo => SockInfo}), + reply(ok, reset_timer(alive_timer, NChannel)); + handle_call(Req, Channel) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), reply(ignored, Channel). diff --git a/apps/emqx/src/emqx_keepalive.erl b/apps/emqx/src/emqx_keepalive.erl index 8fba00f50..7ec424d1d 100644 --- a/apps/emqx/src/emqx_keepalive.erl +++ b/apps/emqx/src/emqx_keepalive.erl @@ -20,8 +20,11 @@ , info/1 , info/2 , check/2 + , set/3 ]). +-elvis([{elvis_style, no_if_expression, disable}]). + -export_type([keepalive/0]). -record(keepalive, { @@ -49,7 +52,7 @@ info(#keepalive{interval = Interval, repeat => Repeat }. --spec(info(interval|statval|repeat, keepalive()) +-spec(info(interval | statval | repeat, keepalive()) -> non_neg_integer()). info(interval, #keepalive{interval = Interval}) -> Interval; @@ -71,3 +74,7 @@ check(NewVal, KeepAlive = #keepalive{statval = OldVal, true -> {error, timeout} end. +%% @doc Update keepalive's interval +-spec(set(interval, non_neg_integer(), keepalive()) -> keepalive()). +set(interval, Interval, KeepAlive) -> + KeepAlive#keepalive{interval = Interval}. diff --git a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl index 89a44dfee..993ab3dc5 100644 --- a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl @@ -122,6 +122,7 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) -> emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), ets:new(test, [named_table, public]), + ets:insert(test, {other_mfa_result, failed}), ct:pal("111:~p~n", [ets:tab2list(cluster_rpc_commit)]), {M, F, A} = {?MODULE, failed_on_other_recover_after_retry, [erlang:whereis(?NODE1)]}, {ok, 1, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), @@ -129,6 +130,7 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) -> ct:pal("333:~p~n", [emqx_cluster_rpc:status()]), {atomic, [_Status|L]} = emqx_cluster_rpc:status(), ?assertEqual([], L), + ets:insert(test, {other_mfa_result, ok}), {ok, 2, ok} = emqx_cluster_rpc:multicall(io, format, ["test"], 1, 1000), ct:sleep(1000), {atomic, NewStatus} = emqx_cluster_rpc:status(), @@ -239,12 +241,9 @@ failed_on_node_by_odd(Pid) -> end. failed_on_other_recover_after_retry(Pid) -> - Counter = ets:update_counter(test, counter, 1, {counter, 0}), case Pid =:= self() of true -> ok; false -> - case Counter < 4 of - true -> "MFA return not ok"; - false -> ok - end + [{_, Res}] = ets:lookup(test, other_mfa_result), + Res end. diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 720aa0a52..f82b881a8 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -17,6 +17,8 @@ -module(emqx_mgmt). -include("emqx_mgmt.hrl"). +-elvis([{elvis_style, invalid_dynamic_call, disable}]). +-elvis([{elvis_style, god_modules, disable}]). -include_lib("stdlib/include/qlc.hrl"). -include_lib("emqx/include/emqx.hrl"). @@ -51,6 +53,7 @@ , clean_authz_cache_all/1 , set_ratelimit_policy/2 , set_quota_policy/2 + , set_keepalive/2 ]). %% Internal funcs @@ -149,6 +152,7 @@ node_info(Node) when Node =:= node() -> memory_used => proplists:get_value(total, Memory), process_available => erlang:system_info(process_limit), process_used => erlang:system_info(process_count), + max_fds => proplists:get_value( max_fds, lists:usort(lists:flatten(erlang:system_info(check_io)))), connections => ets:info(emqx_channel, size), @@ -235,6 +239,7 @@ nodes_info_count(PropList) -> %%-------------------------------------------------------------------- lookup_client({clientid, ClientId}, FormatFun) -> + lists:append([lookup_client(Node, {clientid, ClientId}, FormatFun) || Node <- mria_mnesia:running_nodes()]); @@ -300,6 +305,7 @@ clean_authz_cache(ClientId) -> Results = [clean_authz_cache(Node, ClientId) || Node <- mria_mnesia:running_nodes()], check_results(Results). + clean_authz_cache(Node, ClientId) when Node =:= node() -> case emqx_cm:lookup_channels(ClientId) of [] -> @@ -330,6 +336,9 @@ set_ratelimit_policy(ClientId, Policy) -> set_quota_policy(ClientId, Policy) -> call_client(ClientId, {quota, Policy}). +set_keepalive(ClientId, Interval) -> + call_client(ClientId, {keepalive, Interval}). + %% @private call_client(ClientId, Req) -> Results = [call_client(Node, ClientId, Req) || Node <- mria_mnesia:running_nodes()], @@ -373,6 +382,7 @@ list_subscriptions_via_topic(Topic, FormatFun) -> lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun) || Node <- mria_mnesia:running_nodes()]). + list_subscriptions_via_topic(Node, Topic, {M,F}) when Node =:= node() -> MatchSpec = [{{{'_', '$1'}, '_'}, [{'=:=','$1', Topic}], ['$_']}], erlang:apply(M, F, [ets:select(emqx_suboption, MatchSpec)]); @@ -502,6 +512,7 @@ listener_id_filter(Id, Listeners) -> Filter = fun(#{id := Id0}) -> Id0 =:= Id end, lists:filter(Filter, Listeners). + -spec manage_listener( Operation :: start_listener | stop_listener | restart_listener @@ -576,6 +587,7 @@ add_duration_field([], _Now, Acc) -> Acc; add_duration_field([Alarm = #{activated := true, activate_at := ActivateAt} | Rest], Now, Acc) -> add_duration_field(Rest, Now, [Alarm#{duration => Now - ActivateAt} | Acc]); + add_duration_field( [Alarm = #{ activated := false , activate_at := ActivateAt , deactivate_at := DeactivateAt} | Rest] @@ -638,3 +650,4 @@ max_row_limit() -> ?MAX_ROW_LIMIT. table_size(Tab) -> ets:info(Tab, size). + diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index d95ab7e91..ff429a9a6 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -34,6 +34,7 @@ , subscribe/2 , unsubscribe/2 , subscribe_batch/2 + , set_keepalive/2 ]). -export([ query/4 @@ -82,7 +83,9 @@ apis() -> , clients_authz_cache_api() , clients_subscriptions_api() , subscribe_api() - , unsubscribe_api()]. + , unsubscribe_api() + , keepalive_api() + ]. schemas() -> Client = #{ @@ -435,6 +438,27 @@ subscribe_api() -> <<"200">> => emqx_mgmt_util:schema(<<"Subscribe ok">>)}}}, {"/clients/:clientid/subscribe", Metadata, subscribe}. +keepalive_api() -> + Metadata = #{ + put => #{ + description => <<"set the online client keepalive by second ">>, + parameters => [#{ + name => clientid, + in => path, + schema => #{type => string}, + required => true + }, + #{ + name => interval, + in => query, + schema => #{type => integer}, + required => true + } + ], + responses => #{ + <<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>), + <<"200">> => emqx_mgmt_util:schema(<<"ok">>)}}}, + {"/clients/:clientid/keepalive", Metadata, set_keepalive}. %%%============================================================================================== %% parameters trans clients(get, #{query_string := Qs}) -> @@ -478,6 +502,17 @@ subscriptions(get, #{bindings := #{clientid := ClientID}}) -> end, Subs0), {200, Subs}. +set_keepalive(put, #{bindings := #{clientid := ClientID}, query_string := Query}) -> + case maps:find(<<"interval">>, Query) of + error -> {404, "Interval Not Found"}; + {ok, Interval0} -> + Interval = binary_to_integer(Interval0), + case emqx_mgmt:set_keepalive(emqx_mgmt_util:urldecode(ClientID), Interval) of + ok -> {200}; + {error, not_found} ->{404, ?CLIENT_ID_NOT_FOUND} + end + end. + %%%============================================================================================== %% api apply diff --git a/apps/emqx_management/test/emqx_mgmt_clients_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_clients_api_SUITE.erl index 615445f66..691828ffd 100644 --- a/apps/emqx_management/test/emqx_mgmt_clients_api_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_clients_api_SUITE.erl @@ -77,22 +77,27 @@ t_clients(_) -> ?assertEqual({error, {"HTTP/1.1", 404, "Not Found"}}, AfterKickoutResponse2), %% get /clients/:clientid/authz_cache should has no authz cache - Client1AuthzCachePath = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId1), "authz_cache"]), + Client1AuthzCachePath = emqx_mgmt_api_test_util:api_path(["clients", + binary_to_list(ClientId1), "authz_cache"]), {ok, Client1AuthzCache} = emqx_mgmt_api_test_util:request_api(get, Client1AuthzCachePath), ?assertEqual("[]", Client1AuthzCache), %% post /clients/:clientid/subscribe SubscribeBody = #{topic => Topic, qos => Qos}, - SubscribePath = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId1), "subscribe"]), - {ok, _} = emqx_mgmt_api_test_util:request_api(post, SubscribePath, "", AuthHeader, SubscribeBody), + SubscribePath = emqx_mgmt_api_test_util:api_path(["clients", + binary_to_list(ClientId1), "subscribe"]), + {ok, _} = emqx_mgmt_api_test_util:request_api(post, SubscribePath, + "", AuthHeader, SubscribeBody), timer:sleep(100), [{{_, AfterSubTopic}, #{qos := AfterSubQos}}] = emqx_mgmt:lookup_subscriptions(ClientId1), ?assertEqual(AfterSubTopic, Topic), ?assertEqual(AfterSubQos, Qos), %% post /clients/:clientid/unsubscribe - UnSubscribePath = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId1), "unsubscribe"]), - {ok, _} = emqx_mgmt_api_test_util:request_api(post, UnSubscribePath, "", AuthHeader, SubscribeBody), + UnSubscribePath = emqx_mgmt_api_test_util:api_path(["clients", + binary_to_list(ClientId1), "unsubscribe"]), + {ok, _} = emqx_mgmt_api_test_util:request_api(post, UnSubscribePath, + "", AuthHeader, SubscribeBody), timer:sleep(100), ?assertEqual([], emqx_mgmt:lookup_subscriptions(Client1)), @@ -123,7 +128,8 @@ t_query_clients_with_time(_) -> %% get /clients with time(rfc3339) NowTimeStampInt = erlang:system_time(millisecond), %% Do not uri_encode `=` to `%3D` - Rfc3339String = emqx_http_lib:uri_encode(binary:bin_to_list(emqx_mgmt_api_clients:unix_ts_to_rfc3339_bin(NowTimeStampInt))), + Rfc3339String = emqx_http_lib:uri_encode(binary:bin_to_list( + emqx_mgmt_api_clients:unix_ts_to_rfc3339_bin(NowTimeStampInt))), TimeStampString = emqx_http_lib:uri_encode(integer_to_list(NowTimeStampInt)), LteKeys = ["lte_created_at=", "lte_connected_at="], @@ -133,8 +139,10 @@ t_query_clients_with_time(_) -> GteParamRfc3339 = [Param ++ Rfc3339String || Param <- GteKeys], GteParamStamp = [Param ++ TimeStampString || Param <- GteKeys], - RequestResults = [emqx_mgmt_api_test_util:request_api(get, ClientsPath, Param, AuthHeader) - || Param <- LteParamRfc3339 ++ LteParamStamp ++ GteParamRfc3339 ++ GteParamStamp], + RequestResults = + [emqx_mgmt_api_test_util:request_api(get, ClientsPath, Param, AuthHeader) + || Param <- LteParamRfc3339 ++ LteParamStamp + ++ GteParamRfc3339 ++ GteParamStamp], DecodedResults = [emqx_json:decode(Response, [return_maps]) || {ok, Response} <- RequestResults], {LteResponseDecodeds, GteResponseDecodeds} = lists:split(4, DecodedResults), @@ -153,3 +161,22 @@ t_query_clients_with_time(_) -> Client2Path = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId2)]), {ok, _} = emqx_mgmt_api_test_util:request_api(delete, Client1Path), {ok, _} = emqx_mgmt_api_test_util:request_api(delete, Client2Path). + +t_keepalive(_Config) -> + Username = "user_keepalive", + ClientId = "client_keepalive", + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "keepalive"]), + Query = "interval=11", + {error,{"HTTP/1.1",404,"Not Found"}} = + emqx_mgmt_api_test_util:request_api(put, Path, Query, AuthHeader, <<"">>), + {ok, C1} = emqtt:start_link(#{username => Username, clientid => ClientId}), + {ok, _} = emqtt:connect(C1), + {ok, Ok} = emqx_mgmt_api_test_util:request_api(put, Path, Query, AuthHeader, <<"">>), + ?assertEqual("", Ok), + [Pid] = emqx_cm:lookup_channels(list_to_binary(ClientId)), + State = sys:get_state(Pid), + ct:pal("~p~n", [State]), + ?assertEqual(11000, element(2, element(5, element(11, State)))), + emqtt:disconnect(C1), + ok.