diff --git a/.ci/build_packages/tests.sh b/.ci/build_packages/tests.sh index a6bb1d765..0ad17adb2 100755 --- a/.ci/build_packages/tests.sh +++ b/.ci/build_packages/tests.sh @@ -27,13 +27,6 @@ emqx_test(){ sed -i "/mqtt.max_topic_alias/c mqtt.max_topic_alias = 10" "${PACKAGE_PATH}"/emqx/etc/emqx.conf sed -i '/emqx_telemetry/d' "${PACKAGE_PATH}"/emqx/data/loaded_plugins - if echo "${EMQX_DEPS_DEFAULT_VSN#v}" | grep -qE "[0-9]+\.[0-9]+(\.[0-9]+)?-(alpha|beta|rc)\.[0-9]"; then - if [ ! -d "${PACKAGE_PATH}/emqx/lib/emqx-${EMQX_DEPS_DEFAULT_VSN#v}" ] || [ ! -d "${PACKAGE_PATH}/emqx/releases/${EMQX_DEPS_DEFAULT_VSN#v}" ] ;then - echo "emqx zip version error" - exit 1 - fi - fi - echo "running ${packagename} start" "${PACKAGE_PATH}"/emqx/bin/emqx start || tail "${PACKAGE_PATH}"/emqx/log/erlang.log.1 IDLE_TIME=0 @@ -103,13 +96,6 @@ emqx_test(){ } running_test(){ - if echo "${EMQX_DEPS_DEFAULT_VSN#v}" | grep -qE "[0-9]+\.[0-9]+(\.[0-9]+)?-(alpha|beta|rc)\.[0-9]"; then - if [ ! -d /usr/lib/emqx/lib/emqx-"${EMQX_DEPS_DEFAULT_VSN#v}" ] || [ ! -d /usr/lib/emqx/releases/"${EMQX_DEPS_DEFAULT_VSN#v}" ];then - echo "emqx package version error" - exit 1 - fi - fi - sed -i "/zone.external.server_keepalive/c zone.external.server_keepalive = 60" /etc/emqx/emqx.conf sed -i "/mqtt.max_topic_alias/c mqtt.max_topic_alias = 10" /etc/emqx/emqx.conf sed -i '/emqx_telemetry/d' /var/lib/emqx/loaded_plugins @@ -150,6 +136,7 @@ running_test(){ } relup_test(){ + TARGET_VERSION="$1" if [ -d "${RELUP_PACKAGE_PATH}" ];then cd "${RELUP_PACKAGE_PATH }" @@ -159,9 +146,9 @@ relup_test(){ ./emqx/bin/emqx start ./emqx/bin/emqx_ctl status ./emqx/bin/emqx versions - cp "${PACKAGE_PATH}/${EMQX_NAME}"-*-"${EMQX_DEPS_DEFAULT_VSN#v}-$(uname -m)".zip ./emqx/releases - ./emqx/bin/emqx install "${EMQX_DEPS_DEFAULT_VSN#v}" - [ "$(./emqx/bin/emqx versions |grep permanent | grep -oE "[0-9].[0-9].[0-9]")" = "${EMQX_DEPS_DEFAULT_VSN#v}" ] || exit 1 + cp "${PACKAGE_PATH}/${EMQX_NAME}"-*-"${TARGET_VERSION}-$(uname -m)".zip ./emqx/releases + ./emqx/bin/emqx install "${TARGET_VERSION}" + [ "$(./emqx/bin/emqx versions |grep permanent | grep -oE "[0-9].[0-9].[0-9]")" = "${TARGET_VERSION}" ] || exit 1 ./emqx/bin/emqx_ctl status ./emqx/bin/emqx stop rm -rf emqx @@ -171,4 +158,4 @@ relup_test(){ emqx_prepare emqx_test -relup_test +# relup_test diff --git a/deploy/packages/README.md b/deploy/packages/README.md deleted file mode 100644 index cdb3384fb..000000000 --- a/deploy/packages/README.md +++ /dev/null @@ -1,24 +0,0 @@ -emqx-packages -============= - -EMQ X RPM/Debian Packages - -NOTICE: Requires Erlang/OTP R21+ to build since 3.0 release. - -How to use ----------------------------- - -``` -cd project-root-directory-path -EMQX_DEPS_DEFAULT_VSN=${version} make emqx-pkg -``` - -License -------- - -Apache License Version 2.0 - -Author ------- - -EMQ X Team. diff --git a/lib-opensource/emqx_management/src/emqx_mgmt.erl b/lib-opensource/emqx_management/src/emqx_mgmt.erl index 4174f3224..26b55b249 100644 --- a/lib-opensource/emqx_management/src/emqx_mgmt.erl +++ b/lib-opensource/emqx_management/src/emqx_mgmt.erl @@ -302,14 +302,20 @@ lookup_client({username, Username}, FormatFun) -> lists:append([lookup_client(Node, {username, Username}, FormatFun) || Node <- ekka_mnesia:running_nodes()]). lookup_client(Node, {clientid, ClientId}, {M,F}) when Node =:= node() -> - M:F(ets:lookup(emqx_channel, ClientId)); + lists:append(lists:map( + fun(Key) -> + lists:map(fun M:F/1, ets:lookup(emqx_channel_info, Key)) + end, ets:lookup(emqx_channel, ClientId))); lookup_client(Node, {clientid, ClientId}, FormatFun) -> rpc_call(Node, lookup_client, [Node, {clientid, ClientId}, FormatFun]); lookup_client(Node, {username, Username}, {M,F}) when Node =:= node() -> - MatchSpec = [{{'$1', #{clientinfo => #{username => '$2'}}, '_'}, [{'=:=','$2', Username}], ['$1']}], - M:F(ets:select(emqx_channel_info, MatchSpec)); + MatchSpec = [{ {'_', #{clientinfo => #{username => '$1'}}, '_'} + , [{'=:=','$1', Username}] + , ['$_'] + }], + lists:map(fun M:F/1, ets:select(emqx_channel_info, MatchSpec)); lookup_client(Node, {username, Username}, FormatFun) -> rpc_call(Node, lookup_client, [Node, {username, Username}, FormatFun]). @@ -917,46 +923,6 @@ get_telemetry_data() -> %% Common Table API %%-------------------------------------------------------------------- -item(client, {ClientId, ChanPid}) -> - Attrs = case emqx_cm:get_chan_info(ClientId, ChanPid) of - undefined -> #{}; - Attrs0 -> Attrs0 - end, - Stats = case emqx_cm:get_chan_stats(ClientId, ChanPid) of - undefined -> #{}; - Stats0 -> maps:from_list(Stats0) - end, - ClientInfo = maps:get(clientinfo, Attrs, #{}), - ConnInfo = maps:get(conninfo, Attrs, #{}), - Session = case maps:get(session, Attrs, #{}) of - undefined -> #{}; - _Sess -> _Sess - end, - SessCreated = maps:get(created_at, Session, maps:get(connected_at, ConnInfo)), - Connected = case maps:get(conn_state, Attrs) of - connected -> true; - _ -> false - end, - NStats = Stats#{max_subscriptions => maps:get(subscriptions_max, Stats, 0), - max_inflight => maps:get(inflight_max, Stats, 0), - max_awaiting_rel => maps:get(awaiting_rel_max, Stats, 0), - max_mqueue => maps:get(mqueue_max, Stats, 0), - inflight => maps:get(inflight_cnt, Stats, 0), - awaiting_rel => maps:get(awaiting_rel_cnt, Stats, 0)}, - lists:foldl(fun(Items, Acc) -> - maps:merge(Items, Acc) - end, #{connected => Connected}, - [maps:with([ subscriptions_cnt, max_subscriptions, - inflight, max_inflight, awaiting_rel, - max_awaiting_rel, mqueue_len, mqueue_dropped, - max_mqueue, heap_size, reductions, mailbox_len, - recv_cnt, recv_msg, recv_oct, recv_pkt, send_cnt, - send_msg, send_oct, send_pkt], NStats), - maps:with([clientid, username, mountpoint, is_bridge, zone], ClientInfo), - maps:with([clean_start, keepalive, expiry_interval, proto_name, - proto_ver, peername, connected_at, disconnected_at], ConnInfo), - #{created_at => SessCreated}]); - item(subscription, {{Topic, ClientId}, Options}) -> #{topic => Topic, clientid => ClientId, options => Options}; diff --git a/lib-opensource/emqx_management/src/emqx_mgmt_api_clients.erl b/lib-opensource/emqx_management/src/emqx_mgmt_api_clients.erl index db0884383..7a4e4b0b7 100644 --- a/lib-opensource/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/lib-opensource/emqx_management/src/emqx_mgmt_api_clients.erl @@ -139,11 +139,11 @@ ]). -export([ query/3 - , format/1 + , format_channel_info/1 ]). -define(query_fun, {?MODULE, query}). --define(format_fun, {?MODULE, format}). +-define(format_fun, {?MODULE, format_channel_info}). list(Bindings, Params) when map_size(Bindings) == 0 -> return({ok, emqx_mgmt_api:cluster_query(Params, ?CLIENT_QS_SCHEMA, ?query_fun)}); @@ -249,10 +249,40 @@ parse_ratelimit_str(S) -> %%-------------------------------------------------------------------- %% Format -format(Items) when is_list(Items) -> - [format(Item) || Item <- Items]; -format(Key) when is_tuple(Key) -> - format(emqx_mgmt:item(client, Key)); +format_channel_info({_Key, Info, Stats0}) -> + Stats = maps:from_list(Stats0), + ClientInfo = maps:get(clientinfo, Info, #{}), + ConnInfo = maps:get(conninfo, Info, #{}), + Session = case maps:get(session, Info, #{}) of + undefined -> #{}; + _Sess -> _Sess + end, + SessCreated = maps:get(created_at, Session, maps:get(connected_at, ConnInfo)), + Connected = case maps:get(conn_state, Info, connected) of + connected -> true; + _ -> false + end, + NStats = Stats#{max_subscriptions => maps:get(subscriptions_max, Stats, 0), + max_inflight => maps:get(inflight_max, Stats, 0), + max_awaiting_rel => maps:get(awaiting_rel_max, Stats, 0), + max_mqueue => maps:get(mqueue_max, Stats, 0), + inflight => maps:get(inflight_cnt, Stats, 0), + awaiting_rel => maps:get(awaiting_rel_cnt, Stats, 0)}, + format( + lists:foldl(fun(Items, Acc) -> + maps:merge(Items, Acc) + end, #{connected => Connected}, + [maps:with([ subscriptions_cnt, max_subscriptions, + inflight, max_inflight, awaiting_rel, + max_awaiting_rel, mqueue_len, mqueue_dropped, + max_mqueue, heap_size, reductions, mailbox_len, + recv_cnt, recv_msg, recv_oct, recv_pkt, send_cnt, + send_msg, send_oct, send_pkt], NStats), + maps:with([clientid, username, mountpoint, is_bridge, zone], ClientInfo), + maps:with([clean_start, keepalive, expiry_interval, proto_name, + proto_ver, peername, connected_at, disconnected_at], ConnInfo), + #{created_at => SessCreated}])). + format(Data) when is_map(Data)-> {IpAddr, Port} = maps:get(peername, Data), ConnectedAt = maps:get(connected_at, Data), @@ -279,13 +309,13 @@ format_acl_cache({{PubSub, Topic}, {AclResult, Timestamp}}) -> %%-------------------------------------------------------------------- query({Qs, []}, Start, Limit) -> - Ms = qs2ms_k(Qs), - emqx_mgmt_api:select_table(emqx_channel_info, Ms, Start, Limit, fun format/1); + Ms = qs2ms(Qs), + emqx_mgmt_api:select_table(emqx_channel_info, Ms, Start, Limit, fun format_channel_info/1); query({Qs, Fuzzy}, Start, Limit) -> Ms = qs2ms(Qs), MatchFun = match_fun(Ms, Fuzzy), - emqx_mgmt_api:traverse_table(emqx_channel_info, MatchFun, Start, Limit, fun format/1). + emqx_mgmt_api:traverse_table(emqx_channel_info, MatchFun, Start, Limit, fun format_channel_info/1). %%-------------------------------------------------------------------- %% Match funcs @@ -300,11 +330,9 @@ match_fun(Ms, Fuzzy) -> case ets:match_spec_run(Rows, MsC) of [] -> []; Ls -> - lists:filtermap(fun(E) -> - case run_fuzzy_match(E, REFuzzy) of - false -> false; - true -> {true, element(1, E)} - end end, Ls) + lists:filter(fun(E) -> + run_fuzzy_match(E, REFuzzy) + end, Ls) end end. @@ -325,10 +353,6 @@ qs2ms(Qs) -> {MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}), [{{'$1', MtchHead, '_'}, Conds, ['$_']}]. -qs2ms_k(Qs) -> - {MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}), - [{{'$1', MtchHead, '_'}, Conds, ['$1']}]. - qs2ms([], _, {MtchHead, Conds}) -> {MtchHead, lists:reverse(Conds)}; @@ -413,7 +437,6 @@ params2qs_test() -> ?assertEqual(ExpectedMtchHead, MtchHead), ?assertEqual(ExpectedCondi, Condi), - [{{'$1', #{}, '_'}, [], ['$_']}] = qs2ms([]), - [{{'$1', #{}, '_'}, [], ['$1']}] = qs2ms_k([]). + [{{'$1', #{}, '_'}, [], ['$_']}] = qs2ms([]). -endif.