Merge remote-tracking branch 'origin/main-v4.3' into release-v43
This commit is contained in:
commit
dd95a26270
|
@ -46,11 +46,11 @@ check_acl(ClientInfo, PubSub, Topic, _AclResult, #{acl := ACLParams = #{path :=
|
||||||
{ok, 200, <<"ignore">>} -> ok;
|
{ok, 200, <<"ignore">>} -> ok;
|
||||||
{ok, 200, _Body} -> {stop, allow};
|
{ok, 200, _Body} -> {stop, allow};
|
||||||
{ok, Code, _Body} ->
|
{ok, Code, _Body} ->
|
||||||
?LOG(error, "Deny ~s to topic ~ts, username: ~ts, http response code: ~p",
|
?LOG(warning, "Deny ~s to topic ~ts, username: ~ts, http response code: ~p",
|
||||||
[PubSub, Topic, Username, Code]),
|
[PubSub, Topic, Username, Code]),
|
||||||
{stop, deny};
|
{stop, deny};
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
?LOG(error, "Deny ~s to topic ~ts, username: ~ts, due to request "
|
?LOG(warning, "Deny ~s to topic ~ts, username: ~ts, due to request "
|
||||||
"http server failure, path: ~p, error: ~0p",
|
"http server failure, path: ~p, error: ~0p",
|
||||||
[PubSub, Topic, Username, Path, Error]),
|
[PubSub, Topic, Username, Path, Error]),
|
||||||
ok
|
ok
|
||||||
|
|
|
@ -47,15 +47,14 @@ check(ClientInfo, AuthResult, #{auth := AuthParms = #{path := Path},
|
||||||
anonymous => false,
|
anonymous => false,
|
||||||
mountpoint => mountpoint(Body, ClientInfo)}};
|
mountpoint => mountpoint(Body, ClientInfo)}};
|
||||||
{ok, Code, _Body} ->
|
{ok, Code, _Body} ->
|
||||||
?LOG(error, "Deny connection from path: ~s, username: ~ts, http "
|
?LOG(warning, "Deny connection from path: ~s, username: ~ts, http "
|
||||||
"response code: ~p",
|
"response code: ~p",
|
||||||
[Path, Username, Code]),
|
[Path, Username, Code]),
|
||||||
{stop, AuthResult#{auth_result => http_to_connack_error(Code),
|
{stop, AuthResult#{auth_result => http_to_connack_error(Code),
|
||||||
anonymous => false}};
|
anonymous => false}};
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
?LOG_SENSITIVE(error, "Deny connection from path: ~s, username: ~ts, due to "
|
?LOG_SENSITIVE(warning, "Deny connection from path: ~s, username: ~ts, due to "
|
||||||
"request http-server failed: ~0p",
|
"request http-server failed: ~0p", [Path, Username, Error]),
|
||||||
[Path, Username, Error]),
|
|
||||||
%%FIXME later: server_unavailable is not right.
|
%%FIXME later: server_unavailable is not right.
|
||||||
{stop, AuthResult#{auth_result => server_unavailable,
|
{stop, AuthResult#{auth_result => server_unavailable,
|
||||||
anonymous => false}}
|
anonymous => false}}
|
||||||
|
@ -89,9 +88,12 @@ is_superuser(SuperParams =
|
||||||
timeout := Timeout}, ClientInfo) ->
|
timeout := Timeout}, ClientInfo) ->
|
||||||
Retry = maps:get(retry_times, SuperParams, ?DEFAULT_RETRY_TIMES),
|
Retry = maps:get(retry_times, SuperParams, ?DEFAULT_RETRY_TIMES),
|
||||||
case request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout, Retry) of
|
case request(PoolName, Method, Path, Headers, feedvar(Params, ClientInfo), Timeout, Retry) of
|
||||||
{ok, 200, _Body} -> true;
|
{ok, 200, _Body} ->
|
||||||
{ok, _Code, _Body} -> false;
|
true;
|
||||||
{error, Error} -> ?LOG_SENSITIVE(error, "Request superuser path ~s, error: ~p", [Path, Error]),
|
{ok, _Code, _Body} ->
|
||||||
|
false;
|
||||||
|
{error, Error} ->
|
||||||
|
?LOG_SENSITIVE(warning, "Request superuser path ~s, error: ~p", [Path, Error]),
|
||||||
false
|
false
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -250,7 +250,8 @@ do_create_rule2(ParsedParams) ->
|
||||||
return({error, 400, ?ERR_BADARGS(Reason)})
|
return({error, 400, ?ERR_BADARGS(Reason)})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
update_rule(#{id := Id}, Params) ->
|
update_rule(#{id := Id0}, Params) ->
|
||||||
|
Id = urldecode(Id0),
|
||||||
case parse_rule_params(Params, #{id => Id}) of
|
case parse_rule_params(Params, #{id => Id}) of
|
||||||
{ok, ParsedParams} ->
|
{ok, ParsedParams} ->
|
||||||
case emqx_rule_engine:update_rule(ParsedParams) of
|
case emqx_rule_engine:update_rule(ParsedParams) of
|
||||||
|
@ -275,16 +276,21 @@ list_rules(_Bindings, Params) ->
|
||||||
return_all(emqx_rule_registry:get_rules_ordered_by_ts())
|
return_all(emqx_rule_registry:get_rules_ordered_by_ts())
|
||||||
end.
|
end.
|
||||||
|
|
||||||
show_rule(#{id := Id}, _Params) ->
|
show_rule(#{id := Id0}, _Params) ->
|
||||||
|
Id = urldecode(Id0),
|
||||||
reply_with(fun emqx_rule_registry:get_rule/1, Id).
|
reply_with(fun emqx_rule_registry:get_rule/1, Id).
|
||||||
|
|
||||||
delete_rule(#{id := Id}, _Params) ->
|
delete_rule(#{id := Id0}, _Params) ->
|
||||||
|
Id = urldecode(Id0),
|
||||||
ok = emqx_rule_engine:delete_rule(Id),
|
ok = emqx_rule_engine:delete_rule(Id),
|
||||||
return(ok).
|
return(ok).
|
||||||
|
|
||||||
reset_metrics_local(Id) -> emqx_rule_metrics:reset_metrics(Id).
|
reset_metrics_local(Id0) ->
|
||||||
|
Id = urldecode(Id0),
|
||||||
|
emqx_rule_metrics:reset_metrics(Id).
|
||||||
|
|
||||||
reset_metrics(#{id := Id}, _Params) ->
|
reset_metrics(#{id := Id0}, _Params) ->
|
||||||
|
Id = urldecode(Id0),
|
||||||
_ = ?CLUSTER_CALL(reset_metrics_local, [Id]),
|
_ = ?CLUSTER_CALL(reset_metrics_local, [Id]),
|
||||||
return(ok).
|
return(ok).
|
||||||
|
|
||||||
|
@ -350,7 +356,8 @@ list_resources(#{}, _Params) ->
|
||||||
list_resources_by_type(#{type := Type}, _Params) ->
|
list_resources_by_type(#{type := Type}, _Params) ->
|
||||||
return_all(emqx_rule_registry:get_resources_by_type(Type)).
|
return_all(emqx_rule_registry:get_resources_by_type(Type)).
|
||||||
|
|
||||||
show_resource(#{id := Id}, _Params) ->
|
show_resource(#{id := Id0}, _Params) ->
|
||||||
|
Id = urldecode(Id0),
|
||||||
case emqx_rule_registry:find_resource(Id) of
|
case emqx_rule_registry:find_resource(Id) of
|
||||||
{ok, R} ->
|
{ok, R} ->
|
||||||
StatusFun =
|
StatusFun =
|
||||||
|
@ -366,7 +373,8 @@ show_resource(#{id := Id}, _Params) ->
|
||||||
return({error, 404, <<"Not Found">>})
|
return({error, 404, <<"Not Found">>})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
get_resource_status(#{id := Id}, _Params) ->
|
get_resource_status(#{id := Id0}, _Params) ->
|
||||||
|
Id = urldecode(Id0),
|
||||||
case emqx_rule_engine:get_resource_status(Id) of
|
case emqx_rule_engine:get_resource_status(Id) of
|
||||||
{ok, Status} ->
|
{ok, Status} ->
|
||||||
return({ok, Status});
|
return({ok, Status});
|
||||||
|
@ -374,7 +382,8 @@ get_resource_status(#{id := Id}, _Params) ->
|
||||||
return({error, 400, ?ERR_NO_RESOURCE(Id)})
|
return({error, 400, ?ERR_NO_RESOURCE(Id)})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
start_resource(#{id := Id}, _Params) ->
|
start_resource(#{id := Id0}, _Params) ->
|
||||||
|
Id = urldecode(Id0),
|
||||||
case emqx_rule_engine:start_resource(Id) of
|
case emqx_rule_engine:start_resource(Id) of
|
||||||
ok ->
|
ok ->
|
||||||
return(ok);
|
return(ok);
|
||||||
|
@ -385,7 +394,8 @@ start_resource(#{id := Id}, _Params) ->
|
||||||
return({error, 400, ?ERR_BADARGS(Reason)})
|
return({error, 400, ?ERR_BADARGS(Reason)})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
update_resource(#{id := Id}, NewParams) ->
|
update_resource(#{id := Id0}, NewParams) ->
|
||||||
|
Id = urldecode(Id0),
|
||||||
P1 = case proplists:get_value(<<"description">>, NewParams) of
|
P1 = case proplists:get_value(<<"description">>, NewParams) of
|
||||||
undefined -> #{};
|
undefined -> #{};
|
||||||
Value -> #{<<"description">> => Value}
|
Value -> #{<<"description">> => Value}
|
||||||
|
@ -409,7 +419,8 @@ update_resource(#{id := Id}, NewParams) ->
|
||||||
return({error, 400, ?ERR_BADARGS(Reason)})
|
return({error, 400, ?ERR_BADARGS(Reason)})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
delete_resource(#{id := Id}, _Params) ->
|
delete_resource(#{id := Id0}, _Params) ->
|
||||||
|
Id = urldecode(Id0),
|
||||||
case emqx_rule_engine:delete_resource(Id) of
|
case emqx_rule_engine:delete_resource(Id) of
|
||||||
ok -> return(ok);
|
ok -> return(ok);
|
||||||
{error, not_found} -> return(ok);
|
{error, not_found} -> return(ok);
|
||||||
|
@ -660,3 +671,6 @@ run_fuzzy_match(E = #rule{for = Topics}, [{for, like, Pattern}|Fuzzy]) ->
|
||||||
lists:any(fun(For) -> binary:match(For, Pattern) /= nomatch end, Topics)
|
lists:any(fun(For) -> binary:match(For, Pattern) /= nomatch end, Topics)
|
||||||
andalso run_fuzzy_match(E, Fuzzy);
|
andalso run_fuzzy_match(E, Fuzzy);
|
||||||
run_fuzzy_match(_E, [{_Key, like, _SubStr}| _Fuzzy]) -> false.
|
run_fuzzy_match(_E, [{_Key, like, _SubStr}| _Fuzzy]) -> false.
|
||||||
|
|
||||||
|
urldecode(S) ->
|
||||||
|
emqx_http_lib:uri_decode(S).
|
||||||
|
|
|
@ -28,6 +28,14 @@
|
||||||
-include("emqx_rule_test.hrl").
|
-include("emqx_rule_test.hrl").
|
||||||
-import(emqx_rule_test_lib, [make_simple_resource_type/1]).
|
-import(emqx_rule_test_lib, [make_simple_resource_type/1]).
|
||||||
|
|
||||||
|
%% API request funcs
|
||||||
|
-import(emqx_rule_test_lib,
|
||||||
|
[ request_api/4
|
||||||
|
, request_api/5
|
||||||
|
, auth_header_/0
|
||||||
|
, api_path/1
|
||||||
|
]).
|
||||||
|
|
||||||
%%-define(PROPTEST(M,F), true = proper:quickcheck(M:F())).
|
%%-define(PROPTEST(M,F), true = proper:quickcheck(M:F())).
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
|
@ -62,6 +70,7 @@ groups() ->
|
||||||
]},
|
]},
|
||||||
{api, [],
|
{api, [],
|
||||||
[t_crud_rule_api,
|
[t_crud_rule_api,
|
||||||
|
t_rule_api_unicode_ids,
|
||||||
t_list_actions_api,
|
t_list_actions_api,
|
||||||
t_show_action_api,
|
t_show_action_api,
|
||||||
t_crud_resources_api,
|
t_crud_resources_api,
|
||||||
|
@ -227,6 +236,10 @@ init_per_testcase(Test, Config)
|
||||||
{conn_event, TriggerConnEvent},
|
{conn_event, TriggerConnEvent},
|
||||||
{connsql, SQL}
|
{connsql, SQL}
|
||||||
| Config];
|
| Config];
|
||||||
|
init_per_testcase(t_rule_api_unicode_ids, Config) ->
|
||||||
|
ok = emqx_dashboard_admin:mnesia(boot),
|
||||||
|
emqx_ct_helpers:start_apps([emqx_management, emqx_dashboard]),
|
||||||
|
Config;
|
||||||
init_per_testcase(_TestCase, Config) ->
|
init_per_testcase(_TestCase, Config) ->
|
||||||
ok = emqx_rule_registry:register_resource_types(
|
ok = emqx_rule_registry:register_resource_types(
|
||||||
[make_simple_debug_resource_type()]),
|
[make_simple_debug_resource_type()]),
|
||||||
|
@ -249,6 +262,10 @@ end_per_testcase(Test, Config)
|
||||||
emqtt:stop(?config(subclient, Config)),
|
emqtt:stop(?config(subclient, Config)),
|
||||||
emqtt:stop(?config(connclient, Config)),
|
emqtt:stop(?config(connclient, Config)),
|
||||||
Config;
|
Config;
|
||||||
|
end_per_testcase(t_rule_api_unicode_ids, _Config) ->
|
||||||
|
application:stop(emqx_dashboard),
|
||||||
|
application:stop(emqx_management),
|
||||||
|
ok;
|
||||||
end_per_testcase(_TestCase, _Config) ->
|
end_per_testcase(_TestCase, _Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
@ -523,6 +540,46 @@ t_crud_rule_api(_Config) ->
|
||||||
?assertMatch({ok, #{code := 404, message := _Message}}, NotFound),
|
?assertMatch({ok, #{code := 404, message := _Message}}, NotFound),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
-define(PRED(Elem), fun(Elem) -> true; (_) -> false end).
|
||||||
|
|
||||||
|
t_rule_api_unicode_ids(_Config) ->
|
||||||
|
UData =
|
||||||
|
fun(Description) ->
|
||||||
|
#{<<"name">> => <<"debug-rule">>,
|
||||||
|
<<"rawsql">> => <<"select * from \"t/a\"">>,
|
||||||
|
<<"actions">> => [#{<<"name">> => <<"do_nothing">>,
|
||||||
|
<<"params">> => []}
|
||||||
|
],
|
||||||
|
<<"description">> => Description}
|
||||||
|
end,
|
||||||
|
CData = fun(Id, Description) -> maps:put(<<"id">>, Id, UData(Description)) end,
|
||||||
|
|
||||||
|
CDes = <<"Creating rules description">>,
|
||||||
|
UDes = <<"Updating rules description">>,
|
||||||
|
|
||||||
|
%% create rule
|
||||||
|
CFun = fun(Id) -> {ok, Return} = request_api(post, api_path(["rules"]), [], auth_header_(), CData(Id, CDes)), Return end,
|
||||||
|
%% update rule
|
||||||
|
UFun = fun(Id) -> {ok, Return} = request_api(put, api_path(["rules", cow_uri:urlencode(Id)]), [], auth_header_(), UData(UDes)), Return end,
|
||||||
|
%% show rule
|
||||||
|
SFun = fun(Id) -> {ok, Return} = request_api(get, api_path(["rules", cow_uri:urlencode(Id)]), [], auth_header_()), Return end,
|
||||||
|
%% delete rule
|
||||||
|
DFun = fun(Id) -> {ok, Return} = request_api(delete, api_path(["rules", cow_uri:urlencode(Id)]), [], auth_header_()), Return end,
|
||||||
|
|
||||||
|
Ids = [unicode:characters_to_binary([Char]) || Char <- lists:seq(0, 1000) -- [46]] ++ [<<"%2e">>],
|
||||||
|
|
||||||
|
Ress = [begin
|
||||||
|
{?assertMatch(#{<<"code">> := 0, <<"data">> := #{<<"description">> := CDes}}, decode_to_map(CFun(Id))),
|
||||||
|
?assertMatch(#{<<"code">> := 0}, decode_to_map(UFun(Id))),
|
||||||
|
?assertMatch(#{<<"code">> := 0, <<"data">> := #{<<"description">> := UDes}}, decode_to_map(SFun(Id))),
|
||||||
|
?assertMatch(#{<<"code">> := 0}, decode_to_map(DFun(Id)))}
|
||||||
|
end || Id <- Ids],
|
||||||
|
|
||||||
|
?assertEqual(true, lists:all(?PRED(true), [?PRED({ok, ok, ok, ok})(Res) || Res <- Ress ])).
|
||||||
|
|
||||||
|
decode_to_map(ResponseBody) ->
|
||||||
|
jiffy:decode(list_to_binary(ResponseBody), [return_maps]).
|
||||||
|
|
||||||
t_list_rule_api(_Config) ->
|
t_list_rule_api(_Config) ->
|
||||||
AddIds =
|
AddIds =
|
||||||
lists:map(fun(Seq) ->
|
lists:map(fun(Seq) ->
|
||||||
|
|
|
@ -128,6 +128,67 @@ make_simple_resource_type(ResTypeName) ->
|
||||||
init_events_counters() ->
|
init_events_counters() ->
|
||||||
ets:new(events_record_tab, [named_table, bag, public]).
|
ets:new(events_record_tab, [named_table, bag, public]).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% rule test helper funcs
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
-define(HOST, "http://127.0.0.1:18083/").
|
||||||
|
|
||||||
|
-define(API_VERSION, "v4").
|
||||||
|
|
||||||
|
-define(BASE_PATH, "api").
|
||||||
|
|
||||||
|
request_api(Method, Url, Auth) ->
|
||||||
|
request_api(Method, Url, [], Auth, []).
|
||||||
|
|
||||||
|
request_api(Method, Url, QueryParams, Auth) ->
|
||||||
|
request_api(Method, Url, QueryParams, Auth, []).
|
||||||
|
|
||||||
|
request_api(Method, Url, QueryParams, Auth, []) ->
|
||||||
|
NewUrl = case QueryParams of
|
||||||
|
"" -> Url;
|
||||||
|
_ -> Url ++ "?" ++ QueryParams
|
||||||
|
end,
|
||||||
|
Headers = case Auth of
|
||||||
|
no_auth -> [];
|
||||||
|
Header -> [Header]
|
||||||
|
end,
|
||||||
|
do_request_api(Method, {NewUrl, Headers});
|
||||||
|
request_api(Method, Url, QueryParams, Auth, Body) ->
|
||||||
|
NewUrl = case QueryParams of
|
||||||
|
"" -> Url;
|
||||||
|
_ -> Url ++ "?" ++ QueryParams
|
||||||
|
end,
|
||||||
|
Headers = case Auth of
|
||||||
|
no_auth -> [];
|
||||||
|
Header -> [Header]
|
||||||
|
end,
|
||||||
|
do_request_api(Method, {NewUrl, Headers, "application/json", emqx_json:encode(Body)}).
|
||||||
|
|
||||||
|
do_request_api(Method, Request)->
|
||||||
|
%% ct:pal("Method: ~p, Request: ~p", [Method, Request]),
|
||||||
|
case httpc:request(Method, Request, [], []) of
|
||||||
|
{error, socket_closed_remotely} ->
|
||||||
|
{error, socket_closed_remotely};
|
||||||
|
{ok, {{"HTTP/1.1", Code, _}, _, Return} }
|
||||||
|
when Code =:= 200 orelse Code =:= 201 ->
|
||||||
|
{ok, Return};
|
||||||
|
{ok, {Reason, _, _}} ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
auth_header_() ->
|
||||||
|
AppId = <<"admin">>,
|
||||||
|
AppSecret = <<"public">>,
|
||||||
|
auth_header_(binary_to_list(AppId), binary_to_list(AppSecret)).
|
||||||
|
|
||||||
|
auth_header_(User, Pass) ->
|
||||||
|
Encoded = base64:encode_to_string(lists:append([User,":",Pass])),
|
||||||
|
{"Authorization","Basic " ++ Encoded}.
|
||||||
|
|
||||||
|
api_path(Parts)->
|
||||||
|
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION] ++ Parts).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Internal helper funcs
|
%% Internal helper funcs
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
|
@ -39,3 +39,7 @@
|
||||||
For rule-engine's input events like `$events/message_delivered`, and `$events/message_dropped`,
|
For rule-engine's input events like `$events/message_delivered`, and `$events/message_dropped`,
|
||||||
if the message was delivered to a shared-subscription, the encoding (to JSON) of the event will fail.
|
if the message was delivered to a shared-subscription, the encoding (to JSON) of the event will fail.
|
||||||
Affected versions: `v4.3.21`, `v4.4.10`, `e4.3.16` and `e4.4.10`.
|
Affected versions: `v4.3.21`, `v4.4.10`, `e4.3.16` and `e4.4.10`.
|
||||||
|
|
||||||
|
- Make sure Rule-Engine API supports Percent-encoding `rule_id` and `resource_id` in HTTP request path [#9190](https://github.com/emqx/emqx/pull/9190).
|
||||||
|
Note that the `id` in `POST /api/v4/rules` should be literals (not encoded) when creating a `rule` or `resource`.
|
||||||
|
See docs [Create Rule](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-rules) [Create Resource](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-resources).
|
||||||
|
|
|
@ -39,3 +39,7 @@
|
||||||
带消息的规则引擎事件,例如 `$events/message_delivered` 和 `$events/message_dropped`,
|
带消息的规则引擎事件,例如 `$events/message_delivered` 和 `$events/message_dropped`,
|
||||||
如果消息事件是共享订阅产生的,在编码(到 JSON 格式)过程中会失败。
|
如果消息事件是共享订阅产生的,在编码(到 JSON 格式)过程中会失败。
|
||||||
影响到的版本:`v4.3.21`, `v4.4.10`, `e4.3.16` 和 `e4.4.10`。
|
影响到的版本:`v4.3.21`, `v4.4.10`, `e4.3.16` 和 `e4.4.10`。
|
||||||
|
|
||||||
|
- 使规则引擎 API 在 HTTP 请求路径中支持百分号编码的 `rule_id` 及 `resource_id` [#9190](https://github.com/emqx/emqx/pull/9190)。
|
||||||
|
注意在创建规则或资源时,HTTP body 中的 `id` 字段仍为字面值,而不是编码之后的值。
|
||||||
|
详情请参考 [创建规则](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-rules) 和 [创建资源](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-resources)。
|
||||||
|
|
|
@ -63,8 +63,8 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([all_channels/0,
|
-export([all_channels/0,
|
||||||
channel_with_session_table/0,
|
channel_with_session_table/1,
|
||||||
live_connection_table/0]).
|
live_connection_table/1]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
-export([ init/1
|
-export([ init/1
|
||||||
|
@ -433,7 +433,7 @@ all_channels() ->
|
||||||
ets:select(?CHAN_TAB, Pat).
|
ets:select(?CHAN_TAB, Pat).
|
||||||
|
|
||||||
%% @doc Get clientinfo for all clients with sessions
|
%% @doc Get clientinfo for all clients with sessions
|
||||||
channel_with_session_table() ->
|
channel_with_session_table(ConnModules) ->
|
||||||
Ms = ets:fun2ms(
|
Ms = ets:fun2ms(
|
||||||
fun({{ClientId, _ChanPid},
|
fun({{ClientId, _ChanPid},
|
||||||
Info,
|
Info,
|
||||||
|
@ -441,22 +441,25 @@ channel_with_session_table() ->
|
||||||
{ClientId, Info}
|
{ClientId, Info}
|
||||||
end),
|
end),
|
||||||
Table = ets:table(?CHAN_INFO_TAB, [{traverse, {select, Ms}}]),
|
Table = ets:table(?CHAN_INFO_TAB, [{traverse, {select, Ms}}]),
|
||||||
|
ConnModuleMap = maps:from_list([{Mod, true} || Mod <- ConnModules]),
|
||||||
qlc:q([ {ClientId, ConnState, ConnInfo, ClientInfo}
|
qlc:q([ {ClientId, ConnState, ConnInfo, ClientInfo}
|
||||||
|| {ClientId,
|
|| {ClientId,
|
||||||
#{conn_state := ConnState,
|
#{conn_state := ConnState,
|
||||||
clientinfo := ClientInfo,
|
clientinfo := ClientInfo,
|
||||||
conninfo := #{clean_start := false} = ConnInfo}} <- Table
|
conninfo := #{clean_start := false, conn_mod := ConnModule} = ConnInfo}}
|
||||||
|
<- Table,
|
||||||
|
maps:is_key(ConnModule, ConnModuleMap)
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% @doc Get all local connection query handle
|
%% @doc Get all local connection query handle
|
||||||
live_connection_table() ->
|
live_connection_table(ConnModules) ->
|
||||||
Ms = ets:fun2ms(
|
Ms = lists:map(fun live_connection_ms/1, ConnModules),
|
||||||
fun({{ClientId, ChanPid}, _}) ->
|
|
||||||
{ClientId, ChanPid}
|
|
||||||
end),
|
|
||||||
Table = ets:table(?CHAN_CONN_TAB, [{traverse, {select, Ms}}]),
|
Table = ets:table(?CHAN_CONN_TAB, [{traverse, {select, Ms}}]),
|
||||||
qlc:q([{ClientId, ChanPid} || {ClientId, ChanPid} <- Table, is_channel_connected(ClientId, ChanPid)]).
|
qlc:q([{ClientId, ChanPid} || {ClientId, ChanPid} <- Table, is_channel_connected(ClientId, ChanPid)]).
|
||||||
|
|
||||||
|
live_connection_ms(ConnModule) ->
|
||||||
|
{{{'$1','$2'},ConnModule},[],[{{'$1','$2'}}]}.
|
||||||
|
|
||||||
is_channel_connected(ClientId, ChanPid) when node(ChanPid) =:= node() ->
|
is_channel_connected(ClientId, ChanPid) when node(ChanPid) =:= node() ->
|
||||||
case get_chan_info(ClientId, ChanPid) of
|
case get_chan_info(ClientId, ChanPid) of
|
||||||
#{conn_state := disconnected} -> false;
|
#{conn_state := disconnected} -> false;
|
||||||
|
|
Loading…
Reference in New Issue