From 99c9374a9ce5edab421ca78e039e9efe5a4b1f9a Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Fri, 12 Feb 2021 14:47:22 +0100 Subject: [PATCH 1/4] chore(ci): delete usage of EMQX_DEPS_DEFAULT_VSN EMQX_DEPS_DEFAULT_VSN is the old package version number used in emqx-rel project. We no longer use it in emqx.git umbrella project --- .ci/build_packages/tests.sh | 23 +++++------------------ deploy/packages/README.md | 24 ------------------------ 2 files changed, 5 insertions(+), 42 deletions(-) delete mode 100644 deploy/packages/README.md diff --git a/.ci/build_packages/tests.sh b/.ci/build_packages/tests.sh index a6bb1d765..0ad17adb2 100755 --- a/.ci/build_packages/tests.sh +++ b/.ci/build_packages/tests.sh @@ -27,13 +27,6 @@ emqx_test(){ sed -i "/mqtt.max_topic_alias/c mqtt.max_topic_alias = 10" "${PACKAGE_PATH}"/emqx/etc/emqx.conf sed -i '/emqx_telemetry/d' "${PACKAGE_PATH}"/emqx/data/loaded_plugins - if echo "${EMQX_DEPS_DEFAULT_VSN#v}" | grep -qE "[0-9]+\.[0-9]+(\.[0-9]+)?-(alpha|beta|rc)\.[0-9]"; then - if [ ! -d "${PACKAGE_PATH}/emqx/lib/emqx-${EMQX_DEPS_DEFAULT_VSN#v}" ] || [ ! -d "${PACKAGE_PATH}/emqx/releases/${EMQX_DEPS_DEFAULT_VSN#v}" ] ;then - echo "emqx zip version error" - exit 1 - fi - fi - echo "running ${packagename} start" "${PACKAGE_PATH}"/emqx/bin/emqx start || tail "${PACKAGE_PATH}"/emqx/log/erlang.log.1 IDLE_TIME=0 @@ -103,13 +96,6 @@ emqx_test(){ } running_test(){ - if echo "${EMQX_DEPS_DEFAULT_VSN#v}" | grep -qE "[0-9]+\.[0-9]+(\.[0-9]+)?-(alpha|beta|rc)\.[0-9]"; then - if [ ! -d /usr/lib/emqx/lib/emqx-"${EMQX_DEPS_DEFAULT_VSN#v}" ] || [ ! -d /usr/lib/emqx/releases/"${EMQX_DEPS_DEFAULT_VSN#v}" ];then - echo "emqx package version error" - exit 1 - fi - fi - sed -i "/zone.external.server_keepalive/c zone.external.server_keepalive = 60" /etc/emqx/emqx.conf sed -i "/mqtt.max_topic_alias/c mqtt.max_topic_alias = 10" /etc/emqx/emqx.conf sed -i '/emqx_telemetry/d' /var/lib/emqx/loaded_plugins @@ -150,6 +136,7 @@ running_test(){ } relup_test(){ + TARGET_VERSION="$1" if [ -d "${RELUP_PACKAGE_PATH}" ];then cd "${RELUP_PACKAGE_PATH }" @@ -159,9 +146,9 @@ relup_test(){ ./emqx/bin/emqx start ./emqx/bin/emqx_ctl status ./emqx/bin/emqx versions - cp "${PACKAGE_PATH}/${EMQX_NAME}"-*-"${EMQX_DEPS_DEFAULT_VSN#v}-$(uname -m)".zip ./emqx/releases - ./emqx/bin/emqx install "${EMQX_DEPS_DEFAULT_VSN#v}" - [ "$(./emqx/bin/emqx versions |grep permanent | grep -oE "[0-9].[0-9].[0-9]")" = "${EMQX_DEPS_DEFAULT_VSN#v}" ] || exit 1 + cp "${PACKAGE_PATH}/${EMQX_NAME}"-*-"${TARGET_VERSION}-$(uname -m)".zip ./emqx/releases + ./emqx/bin/emqx install "${TARGET_VERSION}" + [ "$(./emqx/bin/emqx versions |grep permanent | grep -oE "[0-9].[0-9].[0-9]")" = "${TARGET_VERSION}" ] || exit 1 ./emqx/bin/emqx_ctl status ./emqx/bin/emqx stop rm -rf emqx @@ -171,4 +158,4 @@ relup_test(){ emqx_prepare emqx_test -relup_test +# relup_test diff --git a/deploy/packages/README.md b/deploy/packages/README.md deleted file mode 100644 index cdb3384fb..000000000 --- a/deploy/packages/README.md +++ /dev/null @@ -1,24 +0,0 @@ -emqx-packages -============= - -EMQ X RPM/Debian Packages - -NOTICE: Requires Erlang/OTP R21+ to build since 3.0 release. - -How to use ----------------------------- - -``` -cd project-root-directory-path -EMQX_DEPS_DEFAULT_VSN=${version} make emqx-pkg -``` - -License -------- - -Apache License Version 2.0 - -Author ------- - -EMQ X Team. From 9b3a6aa635bd3f10a37b28183b22cf035bdcabe3 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 1 Feb 2021 11:13:47 +0800 Subject: [PATCH 2/4] fix(mgmt): fix client formating crash see: https://github.com/emqx/emqx/issues/3868 --- lib-opensource/emqx_management/src/emqx_mgmt.erl | 4 ++-- .../emqx_management/src/emqx_mgmt_api_clients.erl | 10 +++++++++- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/lib-opensource/emqx_management/src/emqx_mgmt.erl b/lib-opensource/emqx_management/src/emqx_mgmt.erl index 4174f3224..d4f75ca94 100644 --- a/lib-opensource/emqx_management/src/emqx_mgmt.erl +++ b/lib-opensource/emqx_management/src/emqx_mgmt.erl @@ -919,7 +919,7 @@ get_telemetry_data() -> item(client, {ClientId, ChanPid}) -> Attrs = case emqx_cm:get_chan_info(ClientId, ChanPid) of - undefined -> #{}; + undefined -> throw(gone); Attrs0 -> Attrs0 end, Stats = case emqx_cm:get_chan_stats(ClientId, ChanPid) of @@ -933,7 +933,7 @@ item(client, {ClientId, ChanPid}) -> _Sess -> _Sess end, SessCreated = maps:get(created_at, Session, maps:get(connected_at, ConnInfo)), - Connected = case maps:get(conn_state, Attrs) of + Connected = case maps:get(conn_state, Attrs, connected) of connected -> true; _ -> false end, diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_api_clients.erl b/lib-opensource/emqx_management/src/emqx_mgmt_api_clients.erl index db0884383..e9cd83139 100644 --- a/lib-opensource/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/lib-opensource/emqx_management/src/emqx_mgmt_api_clients.erl @@ -250,7 +250,15 @@ parse_ratelimit_str(S) -> %% Format format(Items) when is_list(Items) -> - [format(Item) || Item <- Items]; + lists:foldr( + fun(Item, Acc) -> + try + [format(Item) | Acc] + catch + throw:gone:_Stk -> + Acc + end + end, [], Items); format(Key) when is_tuple(Key) -> format(emqx_mgmt:item(client, Key)); format(Data) when is_map(Data)-> From fcbf2539bcaeb34e28eb889af146362270a442bc Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 12 Feb 2021 16:32:39 +0800 Subject: [PATCH 3/4] refactor(mgmt): move the fomating codes to emqx_mgmt_api_clients.erl --- .../emqx_management/src/emqx_mgmt.erl | 44 +---------- .../src/emqx_mgmt_api_clients.erl | 75 ++++++++++++------- 2 files changed, 49 insertions(+), 70 deletions(-) diff --git a/lib-opensource/emqx_management/src/emqx_mgmt.erl b/lib-opensource/emqx_management/src/emqx_mgmt.erl index d4f75ca94..aebb43f69 100644 --- a/lib-opensource/emqx_management/src/emqx_mgmt.erl +++ b/lib-opensource/emqx_management/src/emqx_mgmt.erl @@ -302,14 +302,14 @@ lookup_client({username, Username}, FormatFun) -> lists:append([lookup_client(Node, {username, Username}, FormatFun) || Node <- ekka_mnesia:running_nodes()]). lookup_client(Node, {clientid, ClientId}, {M,F}) when Node =:= node() -> - M:F(ets:lookup(emqx_channel, ClientId)); + lists:map(fun(E) -> M:F(E) end, ets:lookup(emqx_channel, ClientId)); lookup_client(Node, {clientid, ClientId}, FormatFun) -> rpc_call(Node, lookup_client, [Node, {clientid, ClientId}, FormatFun]); lookup_client(Node, {username, Username}, {M,F}) when Node =:= node() -> MatchSpec = [{{'$1', #{clientinfo => #{username => '$2'}}, '_'}, [{'=:=','$2', Username}], ['$1']}], - M:F(ets:select(emqx_channel_info, MatchSpec)); + lists:map(fun(E) -> M:F(E) end, ets:select(emqx_channel_info, MatchSpec)); lookup_client(Node, {username, Username}, FormatFun) -> rpc_call(Node, lookup_client, [Node, {username, Username}, FormatFun]). @@ -917,46 +917,6 @@ get_telemetry_data() -> %% Common Table API %%-------------------------------------------------------------------- -item(client, {ClientId, ChanPid}) -> - Attrs = case emqx_cm:get_chan_info(ClientId, ChanPid) of - undefined -> throw(gone); - Attrs0 -> Attrs0 - end, - Stats = case emqx_cm:get_chan_stats(ClientId, ChanPid) of - undefined -> #{}; - Stats0 -> maps:from_list(Stats0) - end, - ClientInfo = maps:get(clientinfo, Attrs, #{}), - ConnInfo = maps:get(conninfo, Attrs, #{}), - Session = case maps:get(session, Attrs, #{}) of - undefined -> #{}; - _Sess -> _Sess - end, - SessCreated = maps:get(created_at, Session, maps:get(connected_at, ConnInfo)), - Connected = case maps:get(conn_state, Attrs, connected) of - connected -> true; - _ -> false - end, - NStats = Stats#{max_subscriptions => maps:get(subscriptions_max, Stats, 0), - max_inflight => maps:get(inflight_max, Stats, 0), - max_awaiting_rel => maps:get(awaiting_rel_max, Stats, 0), - max_mqueue => maps:get(mqueue_max, Stats, 0), - inflight => maps:get(inflight_cnt, Stats, 0), - awaiting_rel => maps:get(awaiting_rel_cnt, Stats, 0)}, - lists:foldl(fun(Items, Acc) -> - maps:merge(Items, Acc) - end, #{connected => Connected}, - [maps:with([ subscriptions_cnt, max_subscriptions, - inflight, max_inflight, awaiting_rel, - max_awaiting_rel, mqueue_len, mqueue_dropped, - max_mqueue, heap_size, reductions, mailbox_len, - recv_cnt, recv_msg, recv_oct, recv_pkt, send_cnt, - send_msg, send_oct, send_pkt], NStats), - maps:with([clientid, username, mountpoint, is_bridge, zone], ClientInfo), - maps:with([clean_start, keepalive, expiry_interval, proto_name, - proto_ver, peername, connected_at, disconnected_at], ConnInfo), - #{created_at => SessCreated}]); - item(subscription, {{Topic, ClientId}, Options}) -> #{topic => Topic, clientid => ClientId, options => Options}; diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_api_clients.erl b/lib-opensource/emqx_management/src/emqx_mgmt_api_clients.erl index e9cd83139..18420cfb4 100644 --- a/lib-opensource/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/lib-opensource/emqx_management/src/emqx_mgmt_api_clients.erl @@ -139,11 +139,11 @@ ]). -export([ query/3 - , format/1 + , format_channel_info/1 ]). -define(query_fun, {?MODULE, query}). --define(format_fun, {?MODULE, format}). +-define(format_fun, {?MODULE, format_channel_info}). list(Bindings, Params) when map_size(Bindings) == 0 -> return({ok, emqx_mgmt_api:cluster_query(Params, ?CLIENT_QS_SCHEMA, ?query_fun)}); @@ -249,18 +249,44 @@ parse_ratelimit_str(S) -> %%-------------------------------------------------------------------- %% Format -format(Items) when is_list(Items) -> - lists:foldr( - fun(Item, Acc) -> - try - [format(Item) | Acc] - catch - throw:gone:_Stk -> - Acc - end - end, [], Items); -format(Key) when is_tuple(Key) -> - format(emqx_mgmt:item(client, Key)); +format_channel_info(Key = {_ClientId, _Pid}) -> + [E] = ets:lookup(emqx_channel_info, Key), + format_channel_info(E); + +format_channel_info({_Key, Info, Stats0}) -> + Stats = maps:from_list(Stats0), + ClientInfo = maps:get(clientinfo, Info, #{}), + ConnInfo = maps:get(conninfo, Info, #{}), + Session = case maps:get(session, Info, #{}) of + undefined -> #{}; + _Sess -> _Sess + end, + SessCreated = maps:get(created_at, Session, maps:get(connected_at, ConnInfo)), + Connected = case maps:get(conn_state, Info, connected) of + connected -> true; + _ -> false + end, + NStats = Stats#{max_subscriptions => maps:get(subscriptions_max, Stats, 0), + max_inflight => maps:get(inflight_max, Stats, 0), + max_awaiting_rel => maps:get(awaiting_rel_max, Stats, 0), + max_mqueue => maps:get(mqueue_max, Stats, 0), + inflight => maps:get(inflight_cnt, Stats, 0), + awaiting_rel => maps:get(awaiting_rel_cnt, Stats, 0)}, + format( + lists:foldl(fun(Items, Acc) -> + maps:merge(Items, Acc) + end, #{connected => Connected}, + [maps:with([ subscriptions_cnt, max_subscriptions, + inflight, max_inflight, awaiting_rel, + max_awaiting_rel, mqueue_len, mqueue_dropped, + max_mqueue, heap_size, reductions, mailbox_len, + recv_cnt, recv_msg, recv_oct, recv_pkt, send_cnt, + send_msg, send_oct, send_pkt], NStats), + maps:with([clientid, username, mountpoint, is_bridge, zone], ClientInfo), + maps:with([clean_start, keepalive, expiry_interval, proto_name, + proto_ver, peername, connected_at, disconnected_at], ConnInfo), + #{created_at => SessCreated}])). + format(Data) when is_map(Data)-> {IpAddr, Port} = maps:get(peername, Data), ConnectedAt = maps:get(connected_at, Data), @@ -287,13 +313,13 @@ format_acl_cache({{PubSub, Topic}, {AclResult, Timestamp}}) -> %%-------------------------------------------------------------------- query({Qs, []}, Start, Limit) -> - Ms = qs2ms_k(Qs), - emqx_mgmt_api:select_table(emqx_channel_info, Ms, Start, Limit, fun format/1); + Ms = qs2ms(Qs), + emqx_mgmt_api:select_table(emqx_channel_info, Ms, Start, Limit, fun format_channel_info/1); query({Qs, Fuzzy}, Start, Limit) -> Ms = qs2ms(Qs), MatchFun = match_fun(Ms, Fuzzy), - emqx_mgmt_api:traverse_table(emqx_channel_info, MatchFun, Start, Limit, fun format/1). + emqx_mgmt_api:traverse_table(emqx_channel_info, MatchFun, Start, Limit, fun format_channel_info/1). %%-------------------------------------------------------------------- %% Match funcs @@ -308,11 +334,9 @@ match_fun(Ms, Fuzzy) -> case ets:match_spec_run(Rows, MsC) of [] -> []; Ls -> - lists:filtermap(fun(E) -> - case run_fuzzy_match(E, REFuzzy) of - false -> false; - true -> {true, element(1, E)} - end end, Ls) + lists:filter(fun(E) -> + run_fuzzy_match(E, REFuzzy) + end, Ls) end end. @@ -333,10 +357,6 @@ qs2ms(Qs) -> {MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}), [{{'$1', MtchHead, '_'}, Conds, ['$_']}]. -qs2ms_k(Qs) -> - {MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}), - [{{'$1', MtchHead, '_'}, Conds, ['$1']}]. - qs2ms([], _, {MtchHead, Conds}) -> {MtchHead, lists:reverse(Conds)}; @@ -421,7 +441,6 @@ params2qs_test() -> ?assertEqual(ExpectedMtchHead, MtchHead), ?assertEqual(ExpectedCondi, Condi), - [{{'$1', #{}, '_'}, [], ['$_']}] = qs2ms([]), - [{{'$1', #{}, '_'}, [], ['$1']}] = qs2ms_k([]). + [{{'$1', #{}, '_'}, [], ['$_']}] = qs2ms([]). -endif. From 56a75d0d473aaccac766b48751ae99ecb107fa4e Mon Sep 17 00:00:00 2001 From: JianBo He Date: Sat, 13 Feb 2021 10:09:14 +0800 Subject: [PATCH 4/4] fix(mgmt): make the format function clearer Co-authored-by: Zaiming Shi --- lib-opensource/emqx_management/src/emqx_mgmt.erl | 12 +++++++++--- .../emqx_management/src/emqx_mgmt_api_clients.erl | 4 ---- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/lib-opensource/emqx_management/src/emqx_mgmt.erl b/lib-opensource/emqx_management/src/emqx_mgmt.erl index aebb43f69..26b55b249 100644 --- a/lib-opensource/emqx_management/src/emqx_mgmt.erl +++ b/lib-opensource/emqx_management/src/emqx_mgmt.erl @@ -302,14 +302,20 @@ lookup_client({username, Username}, FormatFun) -> lists:append([lookup_client(Node, {username, Username}, FormatFun) || Node <- ekka_mnesia:running_nodes()]). lookup_client(Node, {clientid, ClientId}, {M,F}) when Node =:= node() -> - lists:map(fun(E) -> M:F(E) end, ets:lookup(emqx_channel, ClientId)); + lists:append(lists:map( + fun(Key) -> + lists:map(fun M:F/1, ets:lookup(emqx_channel_info, Key)) + end, ets:lookup(emqx_channel, ClientId))); lookup_client(Node, {clientid, ClientId}, FormatFun) -> rpc_call(Node, lookup_client, [Node, {clientid, ClientId}, FormatFun]); lookup_client(Node, {username, Username}, {M,F}) when Node =:= node() -> - MatchSpec = [{{'$1', #{clientinfo => #{username => '$2'}}, '_'}, [{'=:=','$2', Username}], ['$1']}], - lists:map(fun(E) -> M:F(E) end, ets:select(emqx_channel_info, MatchSpec)); + MatchSpec = [{ {'_', #{clientinfo => #{username => '$1'}}, '_'} + , [{'=:=','$1', Username}] + , ['$_'] + }], + lists:map(fun M:F/1, ets:select(emqx_channel_info, MatchSpec)); lookup_client(Node, {username, Username}, FormatFun) -> rpc_call(Node, lookup_client, [Node, {username, Username}, FormatFun]). diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_api_clients.erl b/lib-opensource/emqx_management/src/emqx_mgmt_api_clients.erl index 18420cfb4..7a4e4b0b7 100644 --- a/lib-opensource/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/lib-opensource/emqx_management/src/emqx_mgmt_api_clients.erl @@ -249,10 +249,6 @@ parse_ratelimit_str(S) -> %%-------------------------------------------------------------------- %% Format -format_channel_info(Key = {_ClientId, _Pid}) -> - [E] = ets:lookup(emqx_channel_info, Key), - format_channel_info(E); - format_channel_info({_Key, Info, Stats0}) -> Stats = maps:from_list(Stats0), ClientInfo = maps:get(clientinfo, Info, #{}),