Merge pull request #8757 from zhongwencool/copy-of-main-v4.3

sync main-v4.3 to main-v4.4
This commit is contained in:
zhongwencool 2022-08-19 14:05:58 +08:00 committed by GitHub
commit 49dec82928
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
62 changed files with 1124 additions and 327 deletions

View File

@ -20,7 +20,6 @@ jobs:
runs-on: ubuntu-20.04
# prepare source with any OTP version, no need for a matrix
container: ghcr.io/emqx/emqx-builder/4.4-19:24.1.5-3-ubuntu20.04
outputs:
profiles: ${{ steps.set_profile.outputs.profiles}}
@ -316,7 +315,7 @@ jobs:
id: meta
with:
images: ${{ matrix.registry }}/${{ github.repository_owner }}/${{ matrix.profile }}
## only stable tag is latest
## only 5.0 is latest
flavor: |
latest=false # latest is now 5.0
tags: |

View File

@ -8,7 +8,27 @@ File format:
- Use weight-2 heading for releases
- One list item per change topic
Change log ends with a list of github PRs
Change log ends with a list of GitHub PRs
## v4.3.19
### Enhancements
- Improve error message for LwM2M plugin when object ID is not valid [#8654](https://github.com/emqx/emqx/pull/8654).
- Add tzdata apk package to alpine docker image. [#8671](https://github.com/emqx/emqx/pull/8671)
- Add node evacuation and cluster rebalancing features [#8597](https://github.com/emqx/emqx/pull/8597)
- Refine Rule Engine error log. RuleId will be logged when take action failed. [#8737](https://github.com/emqx/emqx/pull/8737)
- Improved jwt authentication module initialization process.[#8736](https://github.com/emqx/emqx/pull/8736)
### Bug fixes
- Fix rule SQL compare to null values always returns false. [#8743](https://github.com/emqx/emqx/pull/8743)
Before this change, the following SQL failed to match on the WHERE clause (`clientid != foo` returns false):
`SELECT 'some_var' as clientid FROM "t" WHERE clientid != foo`.
The `foo` variable is a null value, so `clientid != foo` should be evaluated as true.
- Fix GET `/auth_clientid` and `/auth_username` counts. [#8655](https://github.com/emqx/emqx/pull/8655)
- Add an idle timer for ExProto UDP client to avoid client leaking [#8628](https://github.com/emqx/emqx/pull/8628)
- Fix GET `/listeners/` crashes when listener is not ready. [#8752](https://github.com/emqx/emqx/pull/8752)
## v4.3.18

View File

@ -1,6 +1,6 @@
{application, emqx_auth_jwt,
[{description, "EMQ X Authentication with JWT"},
{vsn, "4.4.3"}, % strict semver, bump manually!
{vsn, "4.4.4"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_auth_jwt_sup]},
{applications, [kernel,stdlib,jose]},

View File

@ -1,11 +1,13 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.4.2",[{load_module,emqx_auth_jwt_svr,brutal_purge,soft_purge,[]},
[{"4.4.3",[{load_module,emqx_auth_jwt_svr,brutal_purge,soft_purge,[]}]},
{"4.4.2",[{load_module,emqx_auth_jwt_svr,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_jwt,brutal_purge,soft_purge,[]}]},
{<<"4\\.4\\.[0-1]">>,[{restart_application,emqx_auth_jwt}]},
{<<".*">>,[]}],
[{"4.4.2",[{load_module,emqx_auth_jwt_svr,brutal_purge,soft_purge,[]},
[{"4.4.3",[{load_module,emqx_auth_jwt_svr,brutal_purge,soft_purge,[]}]},
{"4.4.2",[{load_module,emqx_auth_jwt_svr,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_jwt,brutal_purge,soft_purge,[]}]},
{<<"4\\.4\\.[0-1]">>,[{restart_application,emqx_auth_jwt}]},
{<<".*">>,[]}]}.

View File

@ -73,8 +73,9 @@ verify(JwsCompacted) when is_binary(JwsCompacted) ->
init([Options]) ->
ok = jose:json_module(jiffy),
_ = ets:new(?TAB, [set, protected, named_table]),
{Static, Remote} = do_init_jwks(Options),
true = ets:insert(?TAB, [{static, Static}, {remote, Remote}]),
Static = do_init_jwks(Options),
to_request_jwks(Options),
true = ets:insert(?TAB, [{static, Static}, {remote, undefined}]),
Intv = proplists:get_value(interval, Options, ?INTERVAL),
{ok, reset_timer(
#state{
@ -83,29 +84,13 @@ init([Options]) ->
%% @private
do_init_jwks(Options) ->
K2J = fun(K, F) ->
case proplists:get_value(K, Options) of
undefined -> undefined;
V ->
try F(V) of
{error, Reason} ->
?LOG(warning, "Build ~p JWK ~p failed: {error, ~p}~n",
[K, V, Reason]),
undefined;
J -> J
catch T:R ->
?LOG(warning, "Build ~p JWK ~p failed: {~p, ~p}~n",
[K, V, T, R]),
undefined
end
end
end,
OctJwk = K2J(secret, fun(V) ->
jose_jwk:from_oct(list_to_binary(V))
end),
PemJwk = K2J(pubkey, fun jose_jwk:from_pem_file/1),
Remote = K2J(jwks_addr, fun request_jwks/1),
{[J ||J <- [OctJwk, PemJwk], J /= undefined], Remote}.
OctJwk = key2jwt_value(secret,
fun(V) ->
jose_jwk:from_oct(list_to_binary(V))
end,
Options),
PemJwk = key2jwt_value(pubkey, fun jose_jwk:from_pem_file/1, Options),
[J ||J <- [OctJwk, PemJwk], J /= undefined].
handle_call(_Req, _From, State) ->
{reply, ok, State}.
@ -122,6 +107,11 @@ handle_info({timeout, _TRef, refresh}, State = #state{addr = Addr}) ->
end,
{noreply, reset_timer(NState)};
handle_info({request_jwks, Options}, State) ->
Remote = key2jwt_value(jwks_addr, fun request_jwks/1, Options),
true = ets:insert(?TAB, {remote, Remote}),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
@ -249,3 +239,23 @@ do_check_claim([{K, F}|More], Claims) ->
_ ->
do_check_claim(More, Claims)
end.
to_request_jwks(Options) ->
erlang:send(self(), {request_jwks, Options}).
key2jwt_value(Key, Func, Options) ->
case proplists:get_value(Key, Options) of
undefined -> undefined;
V ->
try Func(V) of
{error, Reason} ->
?LOG(warning, "Build ~p JWK ~p failed: {error, ~p}~n",
[Key, V, Reason]),
undefined;
J -> J
catch T:R ->
?LOG(warning, "Build ~p JWK ~p failed: {~p, ~p}~n",
[Key, V, T, R]),
undefined
end
end.

View File

@ -1,6 +1,6 @@
{application, emqx_auth_mnesia,
[{description, "EMQ X Authentication with Mnesia"},
{vsn, "4.3.7"}, % strict semver, bump manually
{vsn, "4.3.8"}, % strict semver, bump manually
{modules, []},
{registered, []},
{applications, [kernel,stdlib,mnesia]},

View File

@ -1,7 +1,9 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{<<"4\\.3\\.[5-6]">>,
[{<<"4\\.3\\.7">>,
[{load_module,emqx_auth_mnesia_api,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[5-6]">>,
[{load_module,emqx_auth_mnesia_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mnesia,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mnesia_api,brutal_purge,soft_purge,[]},
@ -28,7 +30,8 @@
{load_module,emqx_acl_mnesia,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mnesia_app,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{<<"4\\.3\\.[5-6]">>,
[{"4.3.7",[{load_module,emqx_auth_mnesia_api,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[5-6]">>,
[{load_module,emqx_auth_mnesia_app,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mnesia,brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mnesia_api,brutal_purge,soft_purge,[]},

View File

@ -132,7 +132,11 @@
list_clientid(_Bindings, Params) ->
SortFun = fun(#{created_at := C1}, #{created_at := C2}) -> C1 > C2 end,
return({ok, emqx_mgmt_api:node_query(node(), Params, ?CLIENTID_SCHEMA, ?query_clientid, SortFun)}).
CountFun = fun() ->
MatchSpec = [{{?TABLE, {clientid, '_'}, '_', '_'}, [], [true]}],
ets:select_count(?TABLE, MatchSpec)
end,
return({ok, emqx_mgmt_api:node_query(node(), Params, ?CLIENTID_SCHEMA, ?query_clientid, SortFun, CountFun)}).
lookup_clientid(#{clientid := Clientid}, _Params) ->
return({ok, format(emqx_auth_mnesia_cli:lookup_user({clientid, urldecode(Clientid)}))}).
@ -182,7 +186,11 @@ delete_clientid(#{clientid := Clientid}, _) ->
list_username(_Bindings, Params) ->
SortFun = fun(#{created_at := C1}, #{created_at := C2}) -> C1 > C2 end,
return({ok, emqx_mgmt_api:node_query(node(), Params, ?USERNAME_SCHEMA, ?query_username, SortFun)}).
CountFun = fun() ->
MatchSpec = [{{?TABLE, {username, '_'}, '_', '_'}, [], [true]}],
ets:select_count(?TABLE, MatchSpec)
end,
return({ok, emqx_mgmt_api:node_query(node(), Params, ?USERNAME_SCHEMA, ?query_username, SortFun, CountFun)}).
lookup_username(#{username := Username}, _Params) ->
return({ok, format(emqx_auth_mnesia_cli:lookup_user({username, urldecode(Username)}))}).

View File

@ -272,7 +272,8 @@ t_clientid_rest_api(_Config) ->
clean_all_users(),
{ok, Result1} = request_http_rest_list(["auth_clientid"]),
[] = get_http_data(Result1),
?assertMatch(#{<<"data">> := [], <<"meta">> := #{<<"count">> := 0}},
emqx_json:decode(Result1, [return_maps])),
Params1 = #{<<"clientid">> => ?CLIENTID, <<"password">> => ?PASSWORD},
{ok, _} = request_http_rest_add(["auth_clientid"], Params1),
@ -295,8 +296,28 @@ t_clientid_rest_api(_Config) ->
}, get_http_data(Result3)),
{ok, Result4} = request_http_rest_list(["auth_clientid"]),
#{<<"data">> := Data4, <<"meta">> := #{<<"count">> := Count4}}
= emqx_json:decode(Result4, [return_maps]),
?assertEqual(3, length(get_http_data(Result4))),
?assertEqual(3, Count4),
?assertEqual([<<"client2">>, <<"clientid1">>, ?CLIENTID],
lists:sort(lists:map(fun(#{<<"clientid">> := C}) -> C end, Data4))),
UserNameParams = [#{<<"username">> => <<"username1">>, <<"password">> => ?PASSWORD}
, #{<<"username">> => <<"username2">>, <<"password">> => ?PASSWORD}
],
{ok, _} = request_http_rest_add(["auth_username"], UserNameParams),
{ok, Result41} = request_http_rest_list(["auth_clientid"]),
%% the count clientid is not affected by username count.
?assertEqual(Result4, Result41),
{ok, Result42} = request_http_rest_list(["auth_username"]),
#{<<"data">> := Data42, <<"meta">> := #{<<"count">> := Count42}}
= emqx_json:decode(Result42, [return_maps]),
?assertEqual(2, Count42),
?assertEqual([<<"username1">>, <<"username2">>],
lists:sort(lists:map(fun(#{<<"username">> := U}) -> U end, Data42))),
{ok, Result5} = request_http_rest_list(["auth_clientid?_like_clientid=id"]),
?assertEqual(2, length(get_http_data(Result5))),

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_mqtt,
[{description, "EMQ X Bridge to MQTT Broker"},
{vsn, "4.3.5"}, % strict semver, bump manually!
{vsn, "4.3.6"}, % strict semver, bump manually!
{modules, []},
{registered, []},
{applications, [kernel,stdlib,replayq,emqtt]},

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.3.4",
[{<<"4\\.3\\.[4-5]">>,
[{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
{"4.3.3",
[{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]},
@ -14,7 +14,7 @@
{load_module,emqx_bridge_worker,brutal_purge,soft_purge,[]},
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.3.4",
[{<<"4\\.3\\.[4-5]">>,
[{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
{"4.3.3",
[{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]},

View File

@ -433,7 +433,7 @@ test_resource_status(PoolName) ->
try
Status = [
receive {Pid, R} -> R
after 1000 -> %% get_worker_status/1 should be a quick operation
after 10000 -> %% get_worker_status/1 should be a quick operation
throw({timeout, Pid})
end || Pid <- Pids],
lists:any(fun(St) -> St =:= true end, Status)
@ -444,13 +444,28 @@ test_resource_status(PoolName) ->
false
end.
-define(RETRY_TIMES, 4).
get_worker_status(Worker) ->
get_worker_status(Worker, ?RETRY_TIMES).
get_worker_status(_Worker, 0) ->
false;
get_worker_status(Worker, Times) ->
case ecpool_worker:client(Worker) of
{ok, Bridge} ->
try emqx_bridge_worker:status(Bridge) of
connected -> true;
_ -> false
catch _Error:_Reason ->
connected ->
true;
idle ->
?LOG(info, "MQTT Bridge get status idle. Should not ignore this."),
timer:sleep(100),
get_worker_status(Worker, Times - 1);
ErrorStatus ->
?LOG(error, "MQTT Bridge get status ~p", [ErrorStatus]),
false
catch Error:Reason:ST ->
?LOG(error, "MQTT Bridge get status error: ~p reason: ~p stacktrace: ~p", [Error, Reason, ST]),
false
end;
{error, _} ->

View File

@ -1,6 +1,6 @@
{application, emqx_exproto,
[{description, "EMQ X Extension for Protocol"},
{vsn, "4.3.9"}, %% 4.3.3 is used by ee
{vsn, "4.3.10"}, %% 4.3.3 is used by ee
{modules, []},
{registered, []},
{mod, {emqx_exproto_app, []}},

View File

@ -1,7 +1,9 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{<<"4\\.3\\.[2-8]">>,
[{"4.3.9",
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[2-8]">>,
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[0-1]">>,
@ -10,7 +12,9 @@
{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{<<"4\\.3\\.[2-8]">>,
[{"4.3.9",
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[2-8]">>,
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[0-1]">>,

View File

@ -76,7 +76,8 @@
-define(TIMER_TABLE, #{
alive_timer => keepalive,
force_timer => force_close
force_timer => force_close,
idle_timer => force_close_idle
}).
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
@ -94,6 +95,8 @@
awaiting_rel_max
]).
-define(DEFAULT_IDLE_TIMEOUT, 30000).
%%--------------------------------------------------------------------
%% Info, Attrs and Caps
%%--------------------------------------------------------------------
@ -149,8 +152,12 @@ init(ConnInfo = #{socktype := Socktype,
GRpcChann = proplists:get_value(handler, Options),
NConnInfo = default_conninfo(ConnInfo),
ClientInfo = default_clientinfo(ConnInfo),
IdleTimeout = proplists:get_value(idle_timeout, Options, ?DEFAULT_IDLE_TIMEOUT),
NConnInfo1 = NConnInfo#{idle_timeout => IdleTimeout},
Channel = #channel{gcli = #{channel => GRpcChann},
conninfo = NConnInfo,
conninfo = NConnInfo1,
clientinfo = ClientInfo,
conn_state = accepted,
timers = #{}
@ -165,7 +172,8 @@ init(ConnInfo = #{socktype := Socktype,
#{socktype => socktype(Socktype),
peername => address(Peername),
sockname => address(Sockname)})},
try_dispatch(on_socket_created, wrap(Req), Channel)
start_idle_checking_timer(
try_dispatch(on_socket_created, wrap(Req), Channel))
end.
register_the_anonymous_client(ClientInfo, ConnInfo) ->
@ -184,6 +192,12 @@ register_the_anonymous_client(ClientInfo, ConnInfo) ->
unregister_the_anonymous_client(ClientId) ->
emqx_cm:unregister_channel(ClientId).
start_idle_checking_timer(Channel = #channel{conninfo = #{socktype := udp}}) ->
ensure_timer(idle_timer, Channel);
start_idle_checking_timer(Channel) ->
Channel.
%% @private
peercert(NoSsl, ConnInfo) when NoSsl == nossl;
NoSsl == undefined ->
@ -267,6 +281,9 @@ handle_timeout(_TRef, {keepalive, StatVal},
handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) ->
{shutdown, Reason, Channel};
handle_timeout(_TRef, force_close_idle, Channel) ->
{shutdown, idle_timeout, Channel};
handle_timeout(_TRef, Msg, Channel) ->
?WARN("Unexpected timeout: ~p", [Msg]),
{ok, Channel}.
@ -328,7 +345,8 @@ handle_call({start_timer, keepalive, Interval},
NConnInfo = ConnInfo#{keepalive => Interval},
NClientInfo = ClientInfo#{keepalive => Interval},
NChannel = Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo},
{reply, ok, [{event, updated}], ensure_keepalive(NChannel)};
{reply, ok, [{event, updated}],
ensure_keepalive(cancel_timer(idle_timer, NChannel))};
handle_call({subscribe, TopicFilter, Qos},
Channel = #channel{
@ -561,6 +579,12 @@ reset_timer(Name, Channel) ->
clean_timer(Name, Channel = #channel{timers = Timers}) ->
Channel#channel{timers = maps:remove(Name, Timers)}.
cancel_timer(Name, Channel = #channel{timers = Timers}) ->
emqx_misc:cancel_timer(maps:get(Name, Timers, undefined)),
clean_timer(Name, Channel).
interval(idle_timer, #channel{conninfo = #{idle_timeout := IdleTimeout}}) ->
IdleTimeout;
interval(force_timer, _) ->
15000;
interval(alive_timer, #channel{keepalive = Keepalive}) ->

View File

@ -1,6 +1,6 @@
{application,emqx_lwm2m,
[{description,"EMQ X LwM2M Gateway"},
{vsn, "4.3.7"}, % strict semver, bump manually!
{vsn, "4.3.8"}, % strict semver, bump manually!
{modules,[]},
{registered,[emqx_lwm2m_sup]},
{applications,[kernel,stdlib,lwm2m_coap]},

View File

@ -1,29 +1,75 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{<<"4\\.3\\.[0-1]">>,
[{restart_application,emqx_lwm2m}]},
[{"4.3.7",
[{load_module,emqx_lwm2m_xml_object_db,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_xml_object,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_message,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_json,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_cmd_handler,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[0-1]">>,[{restart_application,emqx_lwm2m}]},
{"4.3.2",
[{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]},
[{load_module,emqx_lwm2m_xml_object_db,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_xml_object,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_json,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_cmd_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_message,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[3-4]">>,
[{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]},
[{load_module,emqx_lwm2m_xml_object_db,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_xml_object,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_message,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_json,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_cmd_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]},
{"4.3.5",
[{load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]},
[{load_module,emqx_lwm2m_xml_object_db,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_xml_object,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_message,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_json,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_cmd_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]},
{"4.3.6",
[ %% There are only changes to the schema file, so we don't need any
%% commands here
]}],
[{<<"4\\.3\\.[0-1]">>,
[{restart_application,emqx_lwm2m}]},
[{load_module,emqx_lwm2m_xml_object_db,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_xml_object,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_message,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_json,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_cmd_handler,brutal_purge,soft_purge,[]}]}],
[{"4.3.7",
[{load_module,emqx_lwm2m_xml_object_db,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_xml_object,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_message,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_json,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_cmd_handler,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[0-1]">>,[{restart_application,emqx_lwm2m}]},
{"4.3.2",
[{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]},
[{load_module,emqx_lwm2m_xml_object_db,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_xml_object,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_json,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_cmd_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_message,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[3-4]">>,
[{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]},
[{load_module,emqx_lwm2m_xml_object_db,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_xml_object,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_message,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_json,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_cmd_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]},
{"4.3.5",
[{load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]},
{"4.3.6", []}]}.
[{load_module,emqx_lwm2m_xml_object_db,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_xml_object,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_message,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_json,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_cmd_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_api,brutal_purge,soft_purge,[]}]},
{"4.3.6",
[{load_module,emqx_lwm2m_xml_object_db,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_xml_object,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_message,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_json,brutal_purge,soft_purge,[]},
{load_module,emqx_lwm2m_cmd_handler,brutal_purge,soft_purge,[]}]}]}.

View File

@ -106,9 +106,11 @@ coap_read_to_mqtt({ok, SuccessCode}, CoapPayload, Format, Ref) ->
Result = coap_content_to_mqtt_payload(CoapPayload, Format, Ref),
make_response(SuccessCode, Ref, Format, Result)
catch
error:not_implemented -> make_response(not_implemented, Ref);
throw : {bad_request, Reason} ->
?LOG(error, "bad_request, reason=~p, payload=~p", [Reason, CoapPayload]),
make_response(bad_request, Ref);
C:R:Stack ->
?LOG(error, "~p, bad payload format: ~p, stacktrace: ~p", [{C, R}, CoapPayload, Stack]),
?LOG(error, "bad_request, error=~p, stacktrace=~p~npayload=~p", [{C, R}, Stack, CoapPayload]),
make_response(bad_request, Ref)
end.

View File

@ -29,7 +29,7 @@
tlv_to_json(BaseName, TlvData) ->
DecodedTlv = emqx_lwm2m_tlv:parse(TlvData),
ObjectId = object_id(BaseName),
ObjDefinition = emqx_lwm2m_xml_object:get_obj_def(ObjectId, true),
ObjDefinition = emqx_lwm2m_xml_object:get_obj_def_assertive(ObjectId, true),
case DecodedTlv of
[#{tlv_resource_with_value:=Id, value:=Value}] ->
TrueBaseName = basename(BaseName, undefined, undefined, Id, 3),
@ -315,7 +315,7 @@ encode_int(Int) -> binary:encode_unsigned(Int).
text_to_json(BaseName, Text) ->
{ObjectId, ResourceId} = object_resource_id(BaseName),
ObjDefinition = emqx_lwm2m_xml_object:get_obj_def(ObjectId, true),
ObjDefinition = emqx_lwm2m_xml_object:get_obj_def_assertive(ObjectId, true),
{K, V} = text_value(Text, ResourceId, ObjDefinition),
#{bn=>BaseName, e=>[#{K=>V}]}.

View File

@ -33,7 +33,7 @@
tlv_to_json(BaseName, TlvData) ->
DecodedTlv = emqx_lwm2m_tlv:parse(TlvData),
ObjectId = object_id(BaseName),
ObjDefinition = emqx_lwm2m_xml_object:get_obj_def(ObjectId, true),
ObjDefinition = emqx_lwm2m_xml_object:get_obj_def_assertive(ObjectId, true),
case DecodedTlv of
[#{tlv_resource_with_value:=Id, value:=Value}] ->
TrueBaseName = basename(BaseName, undefined, undefined, Id, 3),
@ -289,7 +289,7 @@ path([H|T], Acc) ->
text_to_json(BaseName, Text) ->
{ObjectId, ResourceId} = object_resource_id(BaseName),
ObjDefinition = emqx_lwm2m_xml_object:get_obj_def(ObjectId, true),
ObjDefinition = emqx_lwm2m_xml_object:get_obj_def_assertive(ObjectId, true),
Val = text_value(Text, ResourceId, ObjDefinition),
[#{path => BaseName, value => Val}].

View File

@ -20,6 +20,7 @@
-include_lib("xmerl/include/xmerl.hrl").
-export([ get_obj_def/2
, get_obj_def_assertive/2
, get_object_id/1
, get_object_name/1
, get_object_and_resource_id/2
@ -31,15 +32,19 @@
-define(LOG(Level, Format, Args),
logger:Level("LWM2M-OBJ: " ++ Format, Args)).
% This module is for future use. Disabled now.
get_obj_def_assertive(ObjectId, IsInt) ->
case get_obj_def(ObjectId, IsInt) of
{error, no_xml_definition} ->
erlang:throw({bad_request, {unknown_object_id, ObjectId}});
Xml ->
Xml
end.
get_obj_def(ObjectIdInt, true) ->
emqx_lwm2m_xml_object_db:find_objectid(ObjectIdInt);
get_obj_def(ObjectNameStr, false) ->
emqx_lwm2m_xml_object_db:find_name(ObjectNameStr).
get_object_id(ObjDefinition) ->
[#xmlText{value=ObjectId}] = xmerl_xpath:string("ObjectID/text()", ObjDefinition),
ObjectId.

View File

@ -69,12 +69,9 @@ find_name(Name) ->
end,
case ets:lookup(?LWM2M_OBJECT_NAME_TO_ID_TAB, NameBinary) of
[] ->
undefined;
{error, no_xml_definition};
[{NameBinary, ObjectId}] ->
case ets:lookup(?LWM2M_OBJECT_DEF_TAB, ObjectId) of
[] -> undefined;
[{ObjectId, Xml}] -> Xml
end
find_objectid(ObjectId)
end.
stop() ->

View File

@ -689,6 +689,60 @@ case10_read(Config) ->
}),
?assertEqual(ReadResult, test_recv_mqtt_response(RespTopic)).
case10_read_bad_request(Config) ->
UdpSock = ?config(sock, Config),
Epn = "urn:oma:lwm2m:oma:3",
MsgId1 = 15,
RespTopic = list_to_binary("lwm2m/"++Epn++"/up/resp"),
emqtt:subscribe(?config(emqx_c, Config), RespTopic, qos0),
timer:sleep(200),
% step 1, device register ...
test_send_coap_request( UdpSock,
post,
sprintf("coap://127.0.0.1:~b/rd?ep=~s&lt=345&lwm2m=1", [?PORT, Epn]),
#coap_content{content_format = <<"text/plain">>,
payload = <<"</lwm2m>;rt=\"oma.lwm2m\";ct=11543,</lwm2m/1/0>,</lwm2m/2/0>,</lwm2m/3/0>">>},
[],
MsgId1),
#coap_message{method = Method1} = test_recv_coap_response(UdpSock),
?assertEqual({ok,created}, Method1),
test_recv_mqtt_response(RespTopic),
% step2, send a READ command to device
CmdId = 206,
CommandTopic = <<"lwm2m/", (list_to_binary(Epn))/binary, "/dn/dm">>,
Command = #{
<<"requestID">> => CmdId, <<"cacheID">> => CmdId,
<<"msgType">> => <<"read">>,
<<"data">> => #{
<<"path">> => <<"/3333/0/0">>
}
},
CommandJson = emqx_json:encode(Command),
?LOGT("CommandJson=~p", [CommandJson]),
test_mqtt_broker:publish(CommandTopic, CommandJson, 0),
timer:sleep(50),
Request2 = test_recv_coap_request(UdpSock),
#coap_message{method = Method2, payload=Payload2} = Request2,
?LOGT("LwM2M client got ~p", [Request2]),
?assertEqual(get, Method2),
?assertEqual(<<>>, Payload2),
timer:sleep(50),
test_send_coap_response(UdpSock, "127.0.0.1", ?PORT, {ok, content}, #coap_content{content_format = <<"text/plain">>, payload = <<"EMQ">>}, Request2, true),
timer:sleep(100),
ReadResult = emqx_json:encode(#{ <<"requestID">> => CmdId, <<"cacheID">> => CmdId,
<<"msgType">> => <<"read">>,
<<"data">> => #{
<<"code">> => <<"4.00">>,
<<"codeMsg">> => <<"bad_request">>,
<<"reqPath">> => <<"/3333/0/0">>
}
}),
?assertEqual(ReadResult, test_recv_mqtt_response(RespTopic)).
case10_read_separate_ack(Config) ->
UdpSock = ?config(sock, Config),
Epn = "urn:oma:lwm2m:oma:3",

View File

@ -24,6 +24,7 @@
-export([ params2qs/2
, node_query/4
, node_query/5
, node_query/6
, cluster_query/3
, traverse_table/5
, select_table/5
@ -75,6 +76,11 @@ query_handle(Tables) ->
Handles = lists:foldl(Fold, [], Tables),
qlc:append(lists:reverse(Handles)).
count_size(Table, undefined) ->
count(Table);
count_size(_Table, CountFun) ->
CountFun().
count(Table) when is_atom(Table) ->
ets:info(Table, size);
@ -112,9 +118,12 @@ limit(Params) ->
%%--------------------------------------------------------------------
node_query(Node, Params, {Tab, QsSchema}, QueryFun) ->
node_query(Node, Params, {Tab, QsSchema}, QueryFun, undefined).
node_query(Node, Params, {Tab, QsSchema}, QueryFun, undefined, undefined).
node_query(Node, Params, {Tab, QsSchema}, QueryFun, SortFun) ->
node_query(Node, Params, {Tab, QsSchema}, QueryFun, SortFun, undefined).
node_query(Node, Params, {Tab, QsSchema}, QueryFun, SortFun, CountFun) ->
{CodCnt, Qs} = params2qs(Params, QsSchema),
Limit = limit(Params),
Page = page(Params),
@ -124,7 +133,7 @@ node_query(Node, Params, {Tab, QsSchema}, QueryFun, SortFun) ->
{_, Rows} = do_query(Node, Qs, QueryFun, Start, Limit+1),
Meta = #{page => Page, limit => Limit},
NMeta = case CodCnt =:= 0 of
true -> Meta#{count => count(Tab), hasnext => length(Rows) > Limit};
true -> Meta#{count => count_size(Tab, CountFun), hasnext => length(Rows) > Limit};
_ -> Meta#{count => -1, hasnext => length(Rows) > Limit}
end,
Data0 = lists:sublist(Rows, Limit),

View File

@ -72,5 +72,4 @@ format(Listeners) when is_list(Listeners) ->
[ Info#{listen_on => list_to_binary(esockd:to_string(ListenOn))}
|| Info = #{listen_on := ListenOn} <- Listeners ];
format({error, Reason}) -> [{error, Reason}].
format({error, Reason}) -> [{error, iolist_to_binary(io_lib:format("~p", [Reason]))}].

View File

@ -70,4 +70,10 @@ remove_all_users_and_acl() ->
mnesia:delete_table(emqx_user),
mnesia:delete_table(emqx_acl).
-else.
%% opensource edition
all() -> [].
-endif.

View File

@ -25,15 +25,12 @@
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx_management/include/emqx_mgmt.hrl").
-define(CONTENT_TYPE, "application/x-www-form-urlencoded").
-define(HOST, "http://127.0.0.1:8081/").
-elvis([{elvis_style, line_length, disable}]).
-define(API_VERSION, "v4").
-define(BASE_PATH, "api").
-import(emqx_mgmt_api_test_helpers,
[request_api/3,
request_api/4,
request_api/5,
auth_header_/0,
api_path/1]).
all() ->
emqx_ct:all(?MODULE).
@ -790,49 +787,6 @@ t_keepalive(_Config) ->
application:stop(emqx_dashboard),
ok.
request_api(Method, Url, Auth) ->
request_api(Method, Url, [], Auth, []).
request_api(Method, Url, QueryParams, Auth) ->
request_api(Method, Url, QueryParams, Auth, []).
request_api(Method, Url, QueryParams, Auth, []) ->
NewUrl = case QueryParams of
"" -> Url;
_ -> Url ++ "?" ++ QueryParams
end,
do_request_api(Method, {NewUrl, [Auth]});
request_api(Method, Url, QueryParams, Auth, Body) ->
NewUrl = case QueryParams of
"" -> Url;
_ -> Url ++ "?" ++ QueryParams
end,
do_request_api(Method, {NewUrl, [Auth], "application/json", emqx_json:encode(Body)}).
do_request_api(Method, Request)->
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
case httpc:request(Method, Request, [], []) of
{error, socket_closed_remotely} ->
{error, socket_closed_remotely};
{ok, {{"HTTP/1.1", Code, _}, _, Return} }
when Code =:= 200 orelse Code =:= 201 ->
{ok, Return};
{ok, {Reason, _, _}} ->
{error, Reason}
end.
auth_header_() ->
AppId = <<"admin">>,
AppSecret = <<"public">>,
auth_header_(binary_to_list(AppId), binary_to_list(AppSecret)).
auth_header_(User, Pass) ->
Encoded = base64:encode_to_string(lists:append([User,":",Pass])),
{"Authorization","Basic " ++ Encoded}.
api_path(Parts)->
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION] ++ Parts).
filter(List, Key, Value) ->
lists:filter(fun(Item) ->
maps:get(Key, Item) == Value

View File

@ -0,0 +1,69 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_api_test_helpers).
-compile(export_all).
-compile(nowarn_export_all).
-define(HOST, "http://127.0.0.1:8081/").
-define(API_VERSION, "v4").
-define(BASE_PATH, "api").
request_api(Method, Url, Auth) ->
request_api(Method, Url, [], Auth, []).
request_api(Method, Url, QueryParams, Auth) ->
request_api(Method, Url, QueryParams, Auth, []).
request_api(Method, Url, QueryParams, Auth, []) ->
NewUrl = case QueryParams of
"" -> Url;
_ -> Url ++ "?" ++ QueryParams
end,
do_request_api(Method, {NewUrl, [Auth]});
request_api(Method, Url, QueryParams, Auth, Body) ->
NewUrl = case QueryParams of
"" -> Url;
_ -> Url ++ "?" ++ QueryParams
end,
do_request_api(Method, {NewUrl, [Auth], "application/json", emqx_json:encode(Body)}).
do_request_api(Method, Request)->
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
case httpc:request(Method, Request, [], []) of
{error, socket_closed_remotely} ->
{error, socket_closed_remotely};
{ok, {{"HTTP/1.1", Code, _}, _, Return} }
when Code =:= 200 orelse Code =:= 201 ->
{ok, Return};
{ok, {Reason, _, _}} ->
{error, Reason}
end.
auth_header_() ->
AppId = <<"admin">>,
AppSecret = <<"public">>,
auth_header_(binary_to_list(AppId), binary_to_list(AppSecret)).
auth_header_(User, Pass) ->
Encoded = base64:encode_to_string(lists:append([User,":",Pass])),
{"Authorization","Basic " ++ Encoded}.
api_path(Parts)->
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION] ++ Parts).

View File

@ -1,6 +1,6 @@
{application, emqx_retainer,
[{description, "EMQ X Retainer"},
{vsn, "4.4.1"}, % strict semver, bump manually!
{vsn, "4.4.2"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_retainer_sup]},
{applications, [kernel,stdlib]},

View File

@ -1,7 +1,12 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.4.0",[{load_module,emqx_retainer_cli,brutal_purge,soft_purge,[]}]},
[{"4.4.1",[{load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}]},
{"4.4.0",[{load_module,emqx_retainer_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.4.0",[{load_module,emqx_retainer_cli,brutal_purge,soft_purge,[]}]},
[{"4.4.1",[{load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]}]},
{"4.4.0",[{load_module,emqx_retainer_sup,brutal_purge,soft_purge,[]},
{load_module,emqx_retainer_cli,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}]
}.

View File

@ -32,5 +32,8 @@ init([Env]) ->
restart => permanent,
shutdown => 5000,
type => worker,
modules => [emqx_retainer]}]}}.
modules => [emqx_retainer]} || not is_managed_by_modules()]}}.
is_managed_by_modules() ->
%% always false for opensource edition
false.

View File

@ -31,11 +31,12 @@ all() -> emqx_ct:all(?MODULE).
%%--------------------------------------------------------------------
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([emqx_retainer]),
emqx_retainer_ct_helper:ensure_start(),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([emqx_retainer]).
emqx_retainer_ct_helper:ensure_stop(),
ok.
init_per_testcase(TestCase, Config) ->
emqx_retainer:clean(<<"#">>),
@ -207,4 +208,3 @@ receive_messages(Count, Msgs) ->
after 2000 ->
Msgs
end.

View File

@ -24,11 +24,12 @@
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([emqx_retainer]),
emqx_retainer_ct_helper:ensure_start(),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([emqx_retainer]).
emqx_retainer_ct_helper:ensure_stop(),
ok.
init_per_testcase(_TestCase, Config) ->
Config.

View File

@ -0,0 +1,46 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_retainer_ct_helper).
%% API
-export([ensure_start/0, ensure_stop/0]).
-ifdef(EMQX_ENTERPRISE).
ensure_start() ->
application:stop(emqx_modules),
init_conf(),
emqx_ct_helpers:start_apps([emqx_retainer]),
ok.
-else.
ensure_start() ->
init_conf(),
emqx_ct_helpers:start_apps([emqx_retainer]),
ok.
-endif.
ensure_stop() ->
emqx_ct_helpers:stop_apps([emqx_retainer]).
init_conf() ->
application:set_env(emqx_retainer, expiry_interval, 0),
application:set_env(emqx_retainer, max_payload_size, 1024000),
application:set_env(emqx_retainer, max_retained_messages, 0),
application:set_env(emqx_retainer, storage_type, ram),
ok.

View File

@ -27,12 +27,13 @@ init_per_suite(Config) ->
%% Meck emqtt
ok = meck:new(emqtt, [non_strict, passthrough, no_history, no_link]),
%% Start Apps
emqx_ct_helpers:start_apps([emqx_retainer]),
emqx_retainer_ct_helper:ensure_start(),
Config.
end_per_suite(_Config) ->
ok = meck:unload(emqtt),
emqx_ct_helpers:stop_apps([emqx_retainer]).
emqx_retainer_ct_helper:ensure_stop().
%%--------------------------------------------------------------------
%% Helpers
@ -107,7 +108,7 @@ t_publish_message_expiry_interval(_) ->
Msgs = receive_messages(4),
?assertEqual(2, length(Msgs)), %% [MQTT-3.3.2-5]
L = lists:map(fun(Msg) -> MessageExpiryInterval = maps:get('Message-Expiry-Interval', maps:get(properties, Msg)), MessageExpiryInterval < 10 end, Msgs),
L = lists:map(fun(Msg) -> MessageExpiryInterval = maps:get('Message-Expiry-Interval', maps:get(properties, Msg)), MessageExpiryInterval < 10 end, Msgs),
?assertEqual(2, length(L)), %% [MQTT-3.3.2-6]
ok = emqtt:disconnect(Client1),

View File

@ -1,3 +1,19 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-compile({parse_transform, emqx_rule_actions_trans}).
-type selected_data() :: map().
@ -6,6 +22,9 @@
-define(BINDING_KEYS, '__bindings__').
-define(LOG_RULE_ACTION(Level, Metadata, Fmt, Args),
emqx_rule_utils:log_action(Level, Metadata, Fmt, Args)).
-define(bound_v(Key, ENVS0),
maps:get(Key,
maps:get(?BINDING_KEYS, ENVS0, #{}))).

View File

@ -171,9 +171,13 @@ on_action_create_republish(Id, Params = #{
on_action_republish(_Selected, Envs = #{
topic := Topic,
headers := #{republish_by := ActId},
?BINDING_KEYS := #{'Id' := ActId}
?BINDING_KEYS := #{'Id' := ActId},
metadata := Metadata
}) ->
?LOG(error, "[republish] recursively republish detected, msg topic: ~p, target topic: ~p",
?LOG_RULE_ACTION(
error,
Metadata,
"[republish] recursively republish detected, msg topic: ~p, target topic: ~p",
[Topic, ?bound_v('TargetTopic', Envs)]),
emqx_rule_metrics:inc_actions_error(?bound_v('Id', Envs)),
{badact, recursively_republish};
@ -186,8 +190,9 @@ on_action_republish(Selected, _Envs = #{
'TargetQoS' := TargetQoS,
'TopicTks' := TopicTks,
'PayloadTks' := PayloadTks
} = Bindings}) ->
?LOG(debug, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]),
} = Bindings,
metadata := Metadata}) ->
?LOG_RULE_ACTION(debug, Metadata, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]),
TargetRetain = maps:get('TargetRetain', Bindings, false),
Message =
#message{
@ -210,8 +215,9 @@ on_action_republish(Selected, _Envs = #{
'TargetQoS' := TargetQoS,
'TopicTks' := TopicTks,
'PayloadTks' := PayloadTks
} = Bindings}) ->
?LOG(debug, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]),
} = Bindings,
metadata := Metadata}) ->
?LOG_RULE_ACTION(debug, Metadata, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]),
TargetRetain = maps:get('TargetRetain', Bindings, false),
Message =
#message{

View File

@ -1,10 +1,16 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.4.7",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.4.6",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
[{<<"4\\.4\\.[6-7]">>,
[{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.4.5",
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.4.4",
@ -73,11 +79,17 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.4.7",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.4.6",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
[{<<"4\\.4\\.[6-7]">>,
[{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.4.5",
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.4.4",
[{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},

View File

@ -17,6 +17,7 @@
-module(emqx_rule_runtime).
-include("rule_engine.hrl").
-include("rule_actions.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
@ -54,36 +55,37 @@ apply_rules([Rule|More], Input) ->
apply_rule(Rule, Input),
apply_rules(More, Input).
apply_rule(Rule = #rule{id = RuleID}, Input) ->
apply_rule(Rule = #rule{id = RuleId}, Input) ->
clear_rule_payload(),
ok = emqx_rule_metrics:inc_rules_matched(RuleID),
try do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID}))
ok = emqx_rule_metrics:inc_rules_matched(RuleId),
%% Add metadata here caused we need support `metadata` and `rule_id` in SQL
try do_apply_rule(Rule, emqx_rule_utils:add_metadata(Input, #{rule_id => RuleId}))
catch
%% ignore the errors if select or match failed
_:Reason = {select_and_transform_error, Error} ->
emqx_rule_metrics:inc_rules_exception(RuleID),
emqx_rule_metrics:inc_rules_exception(RuleId),
?LOG(warning, "SELECT clause exception for ~s failed: ~p",
[RuleID, Error]),
[RuleId, Error]),
{error, Reason};
_:Reason = {match_conditions_error, Error} ->
emqx_rule_metrics:inc_rules_exception(RuleID),
emqx_rule_metrics:inc_rules_exception(RuleId),
?LOG(warning, "WHERE clause exception for ~s failed: ~p",
[RuleID, Error]),
[RuleId, Error]),
{error, Reason};
_:Reason = {select_and_collect_error, Error} ->
emqx_rule_metrics:inc_rules_exception(RuleID),
emqx_rule_metrics:inc_rules_exception(RuleId),
?LOG(warning, "FOREACH clause exception for ~s failed: ~p",
[RuleID, Error]),
[RuleId, Error]),
{error, Reason};
_:Reason = {match_incase_error, Error} ->
emqx_rule_metrics:inc_rules_exception(RuleID),
emqx_rule_metrics:inc_rules_exception(RuleId),
?LOG(warning, "INCASE clause exception for ~s failed: ~p",
[RuleID, Error]),
[RuleId, Error]),
{error, Reason};
_:Error:StkTrace ->
emqx_rule_metrics:inc_rules_exception(RuleID),
emqx_rule_metrics:inc_rules_exception(RuleId),
?LOG(error, "Apply rule ~s failed: ~p. Stacktrace:~n~p",
[RuleID, Error, StkTrace]),
[RuleId, Error, StkTrace]),
{error, {Error, StkTrace}}
end.
@ -216,10 +218,8 @@ match_conditions({}, _Data) ->
true.
%% comparing numbers against strings
compare(Op, undefined, undefined) ->
do_compare(Op, undefined, undefined);
compare(_Op, L, R) when L == undefined; R == undefined ->
false;
compare(Op, L, R) when L == undefined; R == undefined ->
do_compare(Op, L, R);
compare(Op, L, R) when is_number(L), is_binary(R) ->
do_compare(Op, L, number(R));
compare(Op, L, R) when is_binary(L), is_number(R) ->
@ -232,10 +232,14 @@ compare(Op, L, R) ->
do_compare(Op, L, R).
do_compare('=', L, R) -> L == R;
do_compare('>', L, R) when L == undefined; R == undefined -> false;
do_compare('>', L, R) -> L > R;
do_compare('<', L, R) when L == undefined; R == undefined -> false;
do_compare('<', L, R) -> L < R;
do_compare('<=', L, R) -> L =< R;
do_compare('>=', L, R) -> L >= R;
do_compare('<=', L, R) ->
do_compare('=', L, R) orelse do_compare('<', L, R);
do_compare('>=', L, R) ->
do_compare('=', L, R) orelse do_compare('>', L, R);
do_compare('<>', L, R) -> L /= R;
do_compare('!=', L, R) -> L /= R;
do_compare('=~', T, F) -> emqx_topic:match(T, F).
@ -245,9 +249,10 @@ number(Bin) ->
catch error:badarg -> binary_to_float(Bin)
end.
%% Step3 -> Take actions
%% %% Step3 -> Take actions
%% fallback actions already have `rule_id` in `metadata`
take_actions(Actions, Selected, Envs, OnFailed) ->
[take_action(ActInst, Selected, Envs, OnFailed, ?ActionMaxRetry)
[take_action(ActInst, Selected, emqx_rule_utils:add_metadata(Envs, ActInst), OnFailed, ?ActionMaxRetry)
|| ActInst <- Actions].
take_action(#action_instance{id = Id, name = ActName, fallbacks = Fallbacks} = ActInst,
@ -312,12 +317,12 @@ wait_action_on(Id, RetryN) ->
end
end.
handle_action_failure(continue, Id, Fallbacks, Selected, Envs, Reason) ->
?LOG(error, "Take action ~p failed, continue next action, reason: ~0p", [Id, Reason]),
handle_action_failure(continue, _Id, Fallbacks, Selected, Envs = #{metadata := Metadata}, Reason) ->
?LOG_RULE_ACTION(error, Metadata, "Continue next action, reason: ~0p", [Reason]),
_ = take_actions(Fallbacks, Selected, Envs, continue),
failed;
handle_action_failure(stop, Id, Fallbacks, Selected, Envs, Reason) ->
?LOG(error, "Take action ~p failed, skip all actions, reason: ~0p", [Id, Reason]),
handle_action_failure(stop, Id, Fallbacks, Selected, Envs = #{metadata := Metadata}, Reason) ->
?LOG_RULE_ACTION(error, Metadata, "Skip all actions, reason: ~0p", [Reason]),
_ = take_actions(Fallbacks, Selected, Envs, continue),
error({take_action_failed, {Id, Reason}}).
@ -429,10 +434,6 @@ do_apply_func(Name, Args, Input) ->
Result -> Result
end.
add_metadata(Input, Metadata) when is_map(Input), is_map(Metadata) ->
NewMetadata = maps:merge(maps:get(metadata, Input, #{}), Metadata),
Input#{metadata => NewMetadata}.
%%------------------------------------------------------------------------------
%% Internal Functions
%%------------------------------------------------------------------------------

View File

@ -16,6 +16,9 @@
-module(emqx_rule_utils).
-include("rule_engine.hrl").
-include_lib("emqx/include/logger.hrl").
-export([ replace_var/2
]).
@ -59,6 +62,10 @@
, can_topic_match_oneof/2
]).
-export([ add_metadata/2
, log_action/4
]).
-compile({no_auto_import,
[ float/1
]}).
@ -371,3 +378,30 @@ can_topic_match_oneof(Topic, Filters) ->
lists:any(fun(Fltr) ->
emqx_topic:match(Topic, Fltr)
end, Filters).
add_metadata(Envs, Metadata) when is_map(Envs), is_map(Metadata) ->
NMetadata = maps:merge(maps:get(metadata, Envs, #{}), Metadata),
Envs#{metadata => NMetadata};
add_metadata(Envs, Action) when is_map(Envs), is_record(Action, action_instance)->
Metadata = gen_metadata_from_action(Action),
NMetadata = maps:merge(maps:get(metadata, Envs, #{}), Metadata),
Envs#{metadata => NMetadata}.
gen_metadata_from_action(#action_instance{name = Name, args = undefined}) ->
#{action_name => Name, resource_id => undefined};
gen_metadata_from_action(#action_instance{name = Name, args = Args})
when is_map(Args) ->
#{action_name => Name, resource_id => maps:get(<<"$resource">>, Args, undefined)};
gen_metadata_from_action(#action_instance{name = Name}) ->
#{action_name => Name, resource_id => undefined}.
log_action(Level, Metadata, Fmt, Args) ->
?LOG(Level,
"Rule: ~p; Action: ~p; Resource: ~p. " ++ Fmt,
metadata_values(Metadata) ++ Args).
metadata_values(Metadata) ->
RuleId = maps:get(rule_id, Metadata, undefined),
ActionName = maps:get(action_name, Metadata, undefined),
ResourceName = maps:get(resource_id, Metadata, undefined),
[RuleId, ActionName, ResourceName].

View File

@ -126,6 +126,11 @@ groups() ->
t_sqlparse_array_range_1,
t_sqlparse_array_range_2,
t_sqlparse_true_false,
t_sqlparse_compare_undefined,
t_sqlparse_compare_null_null,
t_sqlparse_compare_null_notnull,
t_sqlparse_compare_notnull_null,
t_sqlparse_compare,
t_sqlparse_new_map,
t_sqlparse_invalid_json
]},
@ -2514,6 +2519,235 @@ t_sqlparse_true_false(_Config) ->
<<"c">> := [true]
}, Res00).
-define(TEST_SQL(SQL),
emqx_rule_sqltester:test(
#{<<"rawsql">> => SQL,
<<"ctx">> => #{<<"payload">> => <<"{}">>,
<<"topic">> => <<"t/a">>}})).
t_sqlparse_compare_undefined(_Config) ->
Sql00 = "select "
" * "
"from \"t/#\" "
"where dev != undefined ",
%% no match
?assertMatch({error, nomatch}, ?TEST_SQL(Sql00)),
Sql01 = "select "
" 'd' as dev "
"from \"t/#\" "
"where dev != undefined ",
{ok, Res01} = ?TEST_SQL(Sql01),
%% pass
?assertMatch(#{}, Res01),
Sql02 = "select "
" * "
"from \"t/#\" "
"where dev != 'undefined' ",
{ok, Res02} = ?TEST_SQL(Sql02),
%% pass
?assertMatch(#{}, Res02).
t_sqlparse_compare_null_null(_Config) ->
%% test undefined == undefined
Sql00 = "select "
" a = b as c "
"from \"t/#\" ",
{ok, Res00} = ?TEST_SQL(Sql00),
?assertMatch(#{<<"c">> := true
}, Res00),
%% test undefined != undefined
Sql01 = "select "
" a != b as c "
"from \"t/#\" ",
{ok, Res01} = ?TEST_SQL(Sql01),
?assertMatch(#{<<"c">> := false
}, Res01),
%% test undefined > undefined
Sql02 = "select "
" a > b as c "
"from \"t/#\" ",
{ok, Res02} = ?TEST_SQL(Sql02),
?assertMatch(#{<<"c">> := false
}, Res02),
%% test undefined < undefined
Sql03 = "select "
" a < b as c "
"from \"t/#\" ",
{ok, Res03} = ?TEST_SQL(Sql03),
?assertMatch(#{<<"c">> := false
}, Res03),
%% test undefined <= undefined
Sql04 = "select "
" a <= b as c "
"from \"t/#\" ",
{ok, Res04} = ?TEST_SQL(Sql04),
?assertMatch(#{<<"c">> := true
}, Res04),
%% test undefined >= undefined
Sql05 = "select "
" a >= b as c "
"from \"t/#\" ",
{ok, Res05} = ?TEST_SQL(Sql05),
?assertMatch(#{<<"c">> := true
}, Res05).
t_sqlparse_compare_null_notnull(_Config) ->
%% test undefined == b
Sql00 = "select "
" 'b' as b, a = b as c "
"from \"t/#\" ",
{ok, Res00} = ?TEST_SQL(Sql00),
?assertMatch(#{<<"c">> := false
}, Res00),
%% test undefined != b
Sql01 = "select "
" 'b' as b, a != b as c "
"from \"t/#\" ",
{ok, Res01} = ?TEST_SQL(Sql01),
?assertMatch(#{<<"c">> := true
}, Res01),
%% test undefined > b
Sql02 = "select "
" 'b' as b, a > b as c "
"from \"t/#\" ",
{ok, Res02} = ?TEST_SQL(Sql02),
?assertMatch(#{<<"c">> := false
}, Res02),
%% test undefined < b
Sql03 = "select "
" 'b' as b, a < b as c "
"from \"t/#\" ",
{ok, Res03} = ?TEST_SQL(Sql03),
?assertMatch(#{<<"c">> := false
}, Res03),
%% test undefined <= b
Sql04 = "select "
" 'b' as b, a <= b as c "
"from \"t/#\" ",
{ok, Res04} = ?TEST_SQL(Sql04),
?assertMatch(#{<<"c">> := false
}, Res04),
%% test undefined >= b
Sql05 = "select "
" 'b' as b, a >= b as c "
"from \"t/#\" ",
{ok, Res05} = ?TEST_SQL(Sql05),
?assertMatch(#{<<"c">> := false
}, Res05).
t_sqlparse_compare_notnull_null(_Config) ->
%% test 'a' == undefined
Sql00 = "select "
" 'a' as a, a = b as c "
"from \"t/#\" ",
{ok, Res00} = ?TEST_SQL(Sql00),
?assertMatch(#{<<"c">> := false
}, Res00),
%% test 'a' != undefined
Sql01 = "select "
" 'a' as a, a != b as c "
"from \"t/#\" ",
{ok, Res01} = ?TEST_SQL(Sql01),
?assertMatch(#{<<"c">> := true
}, Res01),
%% test 'a' > undefined
Sql02 = "select "
" 'a' as a, a > b as c "
"from \"t/#\" ",
{ok, Res02} = ?TEST_SQL(Sql02),
?assertMatch(#{<<"c">> := false
}, Res02),
%% test 'a' < undefined
Sql03 = "select "
" 'a' as a, a < b as c "
"from \"t/#\" ",
{ok, Res03} = ?TEST_SQL(Sql03),
?assertMatch(#{<<"c">> := false
}, Res03),
%% test 'a' <= undefined
Sql04 = "select "
" 'a' as a, a <= b as c "
"from \"t/#\" ",
{ok, Res04} = ?TEST_SQL(Sql04),
?assertMatch(#{<<"c">> := false
}, Res04),
%% test 'a' >= undefined
Sql05 = "select "
" 'a' as a, a >= b as c "
"from \"t/#\" ",
{ok, Res05} = ?TEST_SQL(Sql05),
?assertMatch(#{<<"c">> := false
}, Res05).
t_sqlparse_compare(_Config) ->
Sql00 = "select "
" 'a' as a, 'a' as b, a = b as c "
"from \"t/#\" ",
{ok, Res00} = ?TEST_SQL(Sql00),
?assertMatch(#{<<"c">> := true
}, Res00),
Sql01 = "select "
" is_null(a) as c "
"from \"t/#\" ",
{ok, Res01} = ?TEST_SQL(Sql01),
?assertMatch(#{<<"c">> := true
}, Res01),
Sql02 = "select "
" 1 as a, 2 as b, a < b as c "
"from \"t/#\" ",
{ok, Res02} = ?TEST_SQL(Sql02),
?assertMatch(#{<<"c">> := true
}, Res02),
Sql03 = "select "
" 1 as a, 2 as b, a > b as c "
"from \"t/#\" ",
{ok, Res03} = ?TEST_SQL(Sql03),
?assertMatch(#{<<"c">> := false
}, Res03),
Sql04 = "select "
" 1 as a, 2 as b, a = b as c "
"from \"t/#\" ",
{ok, Res04} = ?TEST_SQL(Sql04),
?assertMatch(#{<<"c">> := false
}, Res04),
%% test 'a' >= undefined
Sql05 = "select "
" 1 as a, 2 as b, a >= b as c "
"from \"t/#\" ",
{ok, Res05} = ?TEST_SQL(Sql05),
?assertMatch(#{<<"c">> := false
}, Res05),
%% test 'a' >= undefined
Sql06 = "select "
" 1 as a, 2 as b, a <= b as c "
"from \"t/#\" ",
{ok, Res06} = ?TEST_SQL(Sql06),
?assertMatch(#{<<"c">> := true
}, Res06).
t_sqlparse_new_map(_Config) ->
%% construct a range without 'as'
Sql00 = "select "

View File

@ -6,6 +6,7 @@
-define(PORT, 1884).
-export([start/0]).
-export([gen_register_packet/2]).
start() ->
io:format("start to connect ~p:~p~n", [?HOST, ?PORT]),

View File

@ -6,6 +6,7 @@
-define(PORT, 1884).
-export([start/0]).
-export([gen_register_packet/2]).
start() ->
io:format("start to connect ~p:~p~n", [?HOST, ?PORT]),

View File

@ -5,7 +5,7 @@
-define(HOST, {127,0,0,1}).
-define(PORT, 1884).
-export([start/0]).
-export([start/1]).
start(LoopTimes) ->
io:format("start to connect ~p:~p~n", [?HOST, ?PORT]),

View File

@ -1,6 +1,6 @@
{application, emqx_web_hook,
[{description, "EMQ X WebHook Plugin"},
{vsn, "4.3.13"}, % strict semver, bump manually!
{vsn, "4.3.14"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_web_hook_sup]},
{applications, [kernel,stdlib,ehttpc]},

View File

@ -1,12 +1,7 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{<<"4\\.3\\.[0-2]">>,
[{apply,{application,stop,[emqx_web_hook]}},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[3-7]">>,
[{<<"4\\.3\\.[0-7]">>,
[{apply,{application,stop,[emqx_web_hook]}},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
@ -26,15 +21,10 @@
{"4.3.11",
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
{"4.3.12",
{<<"4\\.3\\.1[2-3]">>,
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{<<"4\\.3\\.[0-2]">>,
[{apply,{application,stop,[emqx_web_hook]}},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[3-7]">>,
[{<<"4\\.3\\.[0-7]">>,
[{apply,{application,stop,[emqx_web_hook]}},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
@ -54,6 +44,6 @@
{"4.3.11",
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
{"4.3.12",
{<<"4\\.3\\.1[2-3]">>,
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}]}.

View File

@ -259,25 +259,27 @@ on_action_data_to_webserver(Selected, _Envs =
'BodyTokens' := BodyTokens,
'Pool' := Pool,
'RequestTimeout' := RequestTimeout},
clientid := ClientID}) ->
clientid := ClientID,
metadata := Metadata}) ->
NBody = format_msg(BodyTokens, clear_user_property_header(Selected)),
NPath = emqx_rule_utils:proc_tmpl(PathTokens, Selected),
Req = create_req(Method, NPath, Headers, NBody),
case ehttpc:request({Pool, ClientID}, Method, Req, RequestTimeout) of
{ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 ->
?LOG_RULE_ACTION(debug, Metadata, "HTTP Request succeeded with path: ~p status code ~p", [NPath, StatusCode]),
emqx_rule_metrics:inc_actions_success(Id);
{ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 ->
emqx_rule_metrics:inc_actions_success(Id);
{ok, StatusCode, _} ->
?LOG(warning, "HTTP request failed with path: ~p status code: ~p", [NPath, StatusCode]),
?LOG_RULE_ACTION(warning, Metadata, "HTTP request failed with path: ~p status code: ~p", [NPath, StatusCode]),
emqx_rule_metrics:inc_actions_error(Id),
{badact, StatusCode};
{ok, StatusCode, _, _} ->
?LOG(warning, "HTTP request failed with path: ~p status code: ~p", [NPath, StatusCode]),
?LOG_RULE_ACTION(warning, Metadata, "HTTP request failed with path: ~p status code: ~p", [NPath, StatusCode]),
emqx_rule_metrics:inc_actions_error(Id),
{badact, StatusCode};
{error, Reason} ->
?LOG(error, "HTTP request failed path: ~p error: ~p", [NPath, Reason]),
?LOG_RULE_ACTION(error, Metadata, "HTTP request failed path: ~p error: ~p", [NPath, Reason]),
emqx_rule_metrics:inc_actions_error(Id),
{badact, Reason}
end.

View File

@ -38,9 +38,20 @@ export PROGNAME="erl"
DYNLIBS_DIR="$RUNNER_ROOT_DIR/dynlibs"
ERTS_LIB_DIR="$ERTS_DIR/../lib"
# Fix bin permission for all erts bin files
# the 'x' attributes may get lost if the files are extracted from a relup package
find "$BINDIR" -exec chmod a+x {} \;
# Echo to stderr on errors
echoerr() { echo "$*" 1>&2; }
die() {
set +x
echoerr "ERROR: $1"
errno=${2:-1}
exit "$errno"
}
assert_node_alive() {
if ! relx_nodetool "ping" > /dev/null; then
die "node_is_not_running!" 1
@ -48,9 +59,8 @@ assert_node_alive() {
}
check_erlang_start() {
# Fix bin permission
find "$BINDIR" ! -executable -exec chmod a+x {} \;
"$BINDIR/$PROGNAME" -boot "$REL_DIR/start_clean" -eval "crypto:start(),halt()"
# set ERL_CRASH_DUMP_BYTES to zero so it will not write a crash dump file
env ERL_CRASH_DUMP_BYTES=0 "$BINDIR/$PROGNAME" -boot "$REL_DIR/start_clean" -eval "crypto:start(),halt()"
}
if ! check_erlang_start >/dev/null 2>&1; then

View File

@ -16,6 +16,7 @@ RUN apk add --no-cache \
libc-dev \
libstdc++ \
bash \
tzdata \
jq
COPY . /emqx
@ -37,7 +38,7 @@ COPY deploy/docker/docker-entrypoint.sh /usr/bin/
COPY --from=builder /emqx/_build/$EMQX_NAME/rel/emqx /opt/emqx
RUN ln -s /opt/emqx/bin/* /usr/local/bin/
RUN apk add --no-cache curl ncurses-libs openssl sudo libstdc++ bash
RUN apk add --no-cache curl ncurses-libs openssl sudo libstdc++ bash tzdata
WORKDIR /opt/emqx

View File

@ -29,7 +29,7 @@
-ifndef(EMQX_ENTERPRISE).
-define(EMQX_RELEASE, {opensource, "4.4.8-beta.1"}).
-define(EMQX_RELEASE, {opensource, "4.4.8-beta.2"}).
-else.

View File

@ -1,6 +1,6 @@
{application, emqx_dashboard,
[{description, "EMQX Web Dashboard"},
{vsn, "4.4.6"}, % strict semver, bump manually!
{vsn, "4.4.7"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_dashboard_sup]},
{applications, [kernel,stdlib,mnesia,minirest]},

View File

@ -41,7 +41,7 @@
[ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
, {redbug, "2.0.7"}
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.2.0"}}}
, {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.7.2"}}}
, {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.7.3"}}}
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}

View File

@ -51,6 +51,8 @@ overrides() ->
[ {add, [ {extra_src_dirs, [{"etc", [{recursive,true}]}]}
, {erl_opts, [{compile_info, [{emqx_vsn, get_vsn()}]}]}
]}
, {add, relx, [{erl_opts, [{d, 'RLX_LOG', rlx_log}]}]}
, {add, snabbkaffe,
[{erl_opts, common_compile_opts()}]}
] ++ community_plugin_overrides().
@ -88,7 +90,7 @@ project_app_dirs() ->
["apps/*", alternative_lib_dir() ++ "/*", "."].
plugins(HasElixir) ->
[ {relup_helper,{git,"https://github.com/emqx/relup_helper", {tag, "2.0.0"}}}
[ {relup_helper,{git,"https://github.com/emqx/relup_helper", {tag, "2.1.0"}}}
, {er_coap_client, {git, "https://github.com/emqx/er_coap_client", {tag, "v1.0.4"}}}
%% emqx main project does not require port-compiler
%% pin at root level for deterministic

View File

@ -29,6 +29,14 @@ check_apps() {
app_path="."
fi
src_file="$app_path/src/$(basename "$app").app.src"
old_app_exists=0
git show "$latest_release":"$src_file" >/dev/null 2>&1 || old_app_exists="$?"
if [ "$old_app_exists" != "0" ]; then
echo "$app is new, skipping version check"
continue
fi
old_app_version="$(git show "$latest_release":"$src_file" | grep vsn | grep -oE '"[0-9]+\.[0-9]+\.[0-9]+"' | tr -d '"')"
now_app_version=$(grep -E 'vsn' "$src_file" | grep -oE '"[0-9]+\.[0-9]+\.[0-9]+"' | tr -d '"')
if [ "$old_app_version" = "$now_app_version" ]; then
@ -58,7 +66,7 @@ check_apps() {
now_app_version_semver=($(parse_semver "$now_app_version"))
if [ "${old_app_version_semver[0]}" = "${now_app_version_semver[0]}" ] && \
[ "${old_app_version_semver[1]}" = "${now_app_version_semver[1]}" ] && \
[ "$(( "${old_app_version_semver[2]}" + 1 ))" = "${now_app_version_semver[2]}" ]; then
[ "$(( old_app_version_semver[2] + 1 ))" = "${now_app_version_semver[2]}" ]; then
true
else
echo "$src_file: non-strict semver version bump from $old_app_version to $now_app_version"

View File

@ -2,21 +2,39 @@
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.4.7",
[{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_relup,brutal_purge,soft_purge,[]}]},
{"4.4.6",
[{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_relup,brutal_purge,soft_purge,[]}]},
{"4.4.5",
[{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.4.6",
[{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]}]},
{"4.4.5",
[{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{update,emqx_broker_sup,supervisor},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]}]},
{"4.4.4",
[{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
[{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{update,emqx_broker_sup,supervisor},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_broker,brutal_purge,soft_purge,[]},
@ -32,7 +50,8 @@
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]}]},
{"4.4.3",
[{add_module,emqx_calendar},
[{load_module,emqx,brutal_purge,soft_purge,[]},
{add_module,emqx_calendar},
{update,emqx_broker_sup,supervisor},
{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
@ -47,6 +66,7 @@
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
@ -79,6 +99,7 @@
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_sys,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
@ -139,7 +160,6 @@
{update,emqx_os_mon,{advanced,[]}},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
@ -152,6 +172,7 @@
{add_module,emqx_relup},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
@ -166,24 +187,43 @@
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.4.7",
[{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_relup,brutal_purge,soft_purge,[]}]},
[{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.4.6",
[{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_relup,brutal_purge,soft_purge,[]}]},
{"4.4.5",
[{update,emqx_broker_sup,supervisor},
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
[{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]}]},
{"4.4.5",
[{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{update,emqx_broker_sup,supervisor},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_relup,brutal_purge,soft_purge,[]}]},
{"4.4.4",
[{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}]},
{"4.4.4",
[{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{update,emqx_broker_sup,supervisor},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
@ -193,10 +233,10 @@
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]}]},
{"4.4.3",
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
[{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{update,emqx_broker_sup,supervisor},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
@ -211,12 +251,13 @@
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{update,emqx_os_mon,{advanced,[]}},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics, brutal_purge,soft_purge,[]},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_relup}]},
@ -241,6 +282,7 @@
{update,emqx_os_mon,{advanced,[]}},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_sys,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
@ -321,7 +363,6 @@
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{delete_module,emqx_relup}]},
{<<".*">>,[]}]}.

View File

@ -22,6 +22,8 @@
-include("logger.hrl").
-include("types.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-logger_header("[Channel]").
-ifdef(TEST).
@ -316,7 +318,7 @@ handle_in(?CONNECT_PACKET(ConnPkt) = Packet, Channel) ->
},
case enhanced_auth(?CONNECT_PACKET(NConnPkt), NChannel1) of
{ok, Properties, NChannel2} ->
process_connect(Properties, ensure_connected(NChannel2));
process_connect(Properties, NChannel2);
{continue, Properties, NChannel2} ->
handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, Properties}, NChannel2);
{error, ReasonCode, NChannel2} ->
@ -332,7 +334,7 @@ handle_in(Packet = ?AUTH_PACKET(?RC_CONTINUE_AUTHENTICATION, _Properties),
{ok, NProperties, NChannel} ->
case ConnState of
connecting ->
process_connect(NProperties, ensure_connected(NChannel));
process_connect(NProperties, NChannel);
connected ->
handle_out(auth, {?RC_SUCCESS, NProperties}, NChannel);
_ ->
@ -522,14 +524,14 @@ process_connect(AckProps, Channel = #channel{conninfo = ConnInfo,
case emqx_cm:open_session(CleanStart, ClientInfo, ConnInfo) of
{ok, #{session := Session, present := false}} ->
NChannel = Channel#channel{session = Session},
handle_out(connack, {?RC_SUCCESS, sp(false), AckProps}, NChannel);
handle_out(connack, {?RC_SUCCESS, sp(false), AckProps}, ensure_connected(NChannel));
{ok, #{session := Session, present := true, pendings := Pendings}} ->
Pendings1 = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())),
NChannel = Channel#channel{session = Session,
resuming = true,
pendings = Pendings1
},
handle_out(connack, {?RC_SUCCESS, sp(true), AckProps}, NChannel);
handle_out(connack, {?RC_SUCCESS, sp(true), AckProps}, ensure_connected(NChannel));
{error, client_id_unavailable} ->
handle_out(connack, ?RC_CLIENT_IDENTIFIER_NOT_VALID, Channel);
{error, Reason} ->
@ -875,11 +877,14 @@ handle_out(disconnect, ReasonCode, Channel) when is_integer(ReasonCode) ->
ReasonName = disconnect_reason(ReasonCode),
handle_out(disconnect, {ReasonCode, ReasonName}, Channel);
handle_out(disconnect, {ReasonCode, ReasonName}, Channel = ?IS_MQTT_V5) ->
Packet = ?DISCONNECT_PACKET(ReasonCode),
handle_out(disconnect, {ReasonCode, ReasonName}, Channel) ->
handle_out(disconnect, {ReasonCode, ReasonName, #{}}, Channel);
handle_out(disconnect, {ReasonCode, ReasonName, Props}, Channel = ?IS_MQTT_V5) ->
Packet = ?DISCONNECT_PACKET(ReasonCode, Props),
{ok, [{outgoing, Packet}, {close, ReasonName}], Channel};
handle_out(disconnect, {_ReasonCode, ReasonName}, Channel) ->
handle_out(disconnect, {_ReasonCode, ReasonName, _Props}, Channel) ->
{ok, {close, ReasonName}, Channel};
handle_out(auth, {ReasonCode, Properties}, Channel) ->
@ -979,11 +984,15 @@ handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) ->
reply(Session, Channel#channel{takeover = true});
handle_call({takeover, 'end'}, Channel = #channel{session = Session,
pendings = Pendings}) ->
pendings = Pendings,
conninfo = #{clientid := ClientId}}) ->
ok = emqx_session:takeover(Session),
%% TODO: Should not drain deliver here (side effect)
Delivers = emqx_misc:drain_deliver(),
AllPendings = lists:append(Delivers, Pendings),
?tp(debug,
emqx_channel_takeover_end,
#{clientid => ClientId}),
disconnect_and_shutdown(takeovered, AllPendings, Channel);
handle_call(list_acl_cache, Channel) ->
@ -1057,6 +1066,9 @@ handle_info(clean_acl_cache, Channel) ->
ok = emqx_acl_cache:empty_acl_cache(),
{ok, Channel};
handle_info({disconnect, ReasonCode, ReasonName, Props}, Channel) ->
handle_out(disconnect, {ReasonCode, ReasonName, Props}, Channel);
handle_info(Info, Channel) ->
?LOG(error, "Unexpected info: ~p", [Info]),
{ok, Channel}.

View File

@ -22,6 +22,8 @@
-include("emqx.hrl").
-include("logger.hrl").
-include("types.hrl").
-include_lib("stdlib/include/qlc.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -61,7 +63,9 @@
, lookup_channels/2
]).
-export([all_channels/0]).
-export([all_channels/0,
channel_with_session_table/0,
live_connection_table/0]).
%% gen_server callbacks
-export([ init/1
@ -158,8 +162,11 @@ connection_closed(ClientId) ->
connection_closed(ClientId, self()).
-spec(connection_closed(emqx_types:clientid(), chan_pid()) -> true).
connection_closed(ClientId, ChanPid) ->
ets:delete_object(?CHAN_CONN_TAB, {ClientId, ChanPid}).
connection_closed(_ClientId, _ChanPid) ->
%% We can't clean CHAN_CONN_TAB because records for dead connections
%% are required for `get_chann_conn_mod/1` function, and `get_chann_conn_mod/1`
%% is used for takeover.
true.
%% @doc Get info of a channel.
-spec(get_chan_info(emqx_types:clientid()) -> maybe(emqx_types:infos())).
@ -435,6 +442,38 @@ all_channels() ->
Pat = [{{'_', '$1'}, [], ['$1']}],
ets:select(?CHAN_TAB, Pat).
%% @doc Get clientinfo for all clients with sessions
channel_with_session_table() ->
Ms = ets:fun2ms(
fun({{ClientId, _ChanPid},
Info,
_Stats}) ->
{ClientId, Info}
end),
Table = ets:table(?CHAN_INFO_TAB, [{traverse, {select, Ms}}]),
qlc:q([ {ClientId, ConnState, ConnInfo, ClientInfo}
|| {ClientId,
#{conn_state := ConnState,
clientinfo := ClientInfo,
conninfo := #{clean_start := false} = ConnInfo}} <- Table
]).
%% @doc Get all local connection query handle
live_connection_table() ->
Ms = ets:fun2ms(
fun({{ClientId, ChanPid}, _}) ->
{ClientId, ChanPid}
end),
Table = ets:table(?CHAN_CONN_TAB, [{traverse, {select, Ms}}]),
qlc:q([{ClientId, ChanPid} || {ClientId, ChanPid} <- Table, is_channel_connected(ClientId, ChanPid)]).
is_channel_connected(ClientId, ChanPid) when node(ChanPid) =:= node() ->
case get_chan_info(ClientId, ChanPid) of
#{conn_state := disconnected} -> false;
_ -> true
end;
is_channel_connected(_ClientId, _ChanPid) -> false.
%% @doc Lookup channels.
-spec(lookup_channels(emqx_types:clientid()) -> list(chan_pid())).
lookup_channels(ClientId) ->

View File

@ -217,20 +217,46 @@ load_plugin_conf(AppName, PluginDir) ->
ensure_file(File) ->
case filelib:is_file(File) of
false ->
DefaultPlugins = [ {emqx_management, true}
, {emqx_dashboard, true}
, {emqx_modules, false}
, {emqx_recon, true}
, {emqx_retainer, true}
, {emqx_telemetry, true}
, {emqx_rule_engine, true}
, {emqx_bridge_mqtt, false}
],
DefaultPlugins = default_plugins(),
write_loaded(DefaultPlugins);
true ->
ok
end.
-ifndef(EMQX_ENTERPRISE).
%% default plugins see rebar.config.erl
default_plugins() ->
[
{emqx_management, true},
{emqx_dashboard, true},
%% emqx_modules is not a plugin, but a normal application starting when boots.
{emqx_modules, false},
{emqx_retainer, true},
{emqx_recon, true},
{emqx_telemetry, true},
{emqx_rule_engine, true},
{emqx_bridge_mqtt, false}
].
-else.
default_plugins() ->
[
{emqx_management, true},
{emqx_dashboard, true},
%% enterprise version of emqx_modules is a plugin
{emqx_modules, true},
%% retainer is managed by emqx_modules.
%% default is true in data/load_modules. **NOT HERE**
{emqx_retainer, false},
{emqx_recon, true},
{emqx_telemetry, true},
{emqx_rule_engine, true},
{emqx_bridge_mqtt, false}
].
-endif.
with_loaded_file(File, SuccFun) ->
case read_loaded(File) of
{ok, Names0} ->
@ -265,7 +291,7 @@ load_plugins(Names, Persistent) ->
[] -> ok;
NotFound -> ?LOG(alert, "cannot_find_plugins: ~p", [NotFound])
end,
NeedToLoad = Names -- NotFound -- names(started_app),
NeedToLoad = (Names -- NotFound) -- names(started_app),
lists:foreach(fun(Name) ->
Plugin = find_plugin(Name, Plugins),
load_plugin(Plugin#plugin.name, Persistent)

View File

@ -61,5 +61,5 @@ t_restart_shared_sub(Config) when is_list(Config) ->
after 2000 ->
false
end);
t_restart_shared_sub({'end', Config}) ->
t_restart_shared_sub({'end', _Config}) ->
emqx:unsubscribe(<<"$share/grpa/t/a">>).

View File

@ -0,0 +1,83 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_node_helpers).
-include_lib("eunit/include/eunit.hrl").
-define(SLAVE_START_APPS, [emqx]).
-export([start_slave/1,
start_slave/2,
stop_slave/1]).
start_slave(Name) ->
start_slave(Name, #{}).
start_slave(Name, Opts) ->
{ok, Node} = ct_slave:start(list_to_atom(atom_to_list(Name) ++ "@" ++ host()),
[{kill_if_fail, true},
{monitor_master, true},
{init_timeout, 10000},
{startup_timeout, 10000},
{erl_flags, ebin_path()}]),
pong = net_adm:ping(Node),
setup_node(Node, Opts),
Node.
stop_slave(Node) ->
rpc:call(Node, ekka, leave, []),
ct_slave:stop(Node).
host() ->
[_, Host] = string:tokens(atom_to_list(node()), "@"), Host.
ebin_path() ->
string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " ").
is_lib(Path) ->
string:prefix(Path, code:lib_dir()) =:= nomatch.
setup_node(Node, #{} = Opts) ->
Listeners = maps:get(listeners, Opts, []),
StartApps = maps:get(start_apps, Opts, ?SLAVE_START_APPS),
DefaultEnvHandler =
fun(emqx) ->
application:set_env(
emqx,
listeners,
Listeners),
application:set_env(gen_rpc, port_discovery, stateless),
ok;
(_) ->
ok
end,
EnvHandler = maps:get(env_handler, Opts, DefaultEnvHandler),
[ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx]],
ok = rpc:call(Node, emqx_ct_helpers, start_apps, [StartApps, EnvHandler]),
rpc:call(Node, ekka, join, [node()]),
%% Sanity check. Assert that `gen_rpc' is set up correctly:
?assertEqual( Node
, gen_rpc:call(Node, erlang, node, [])
),
?assertEqual( node()
, gen_rpc:call(Node, gen_rpc, call, [node(), erlang, node, []])
),
ok.

View File

@ -89,6 +89,34 @@ t_load(_) ->
?assertEqual(ignore, emqx_plugins:load()),
?assertEqual(ignore, emqx_plugins:unload()).
-ifndef(EMQX_ENTERPRISE).
default_plugins() ->
[
{emqx_bridge_mqtt, false},
{emqx_dashboard, true},
{emqx_management, true},
{emqx_modules, false},
{emqx_recon, true},
{emqx_retainer, true},
{emqx_rule_engine, true},
{emqx_telemetry, true}
].
-else.
default_plugins() ->
[
{emqx_bridge_mqtt, false},
{emqx_dashboard, true},
{emqx_management, true},
{emqx_modules, true},
{emqx_recon, true},
{emqx_retainer, false},
{emqx_rule_engine, true},
{emqx_telemetry, true}
].
-endif.
t_ensure_default_loaded_plugins_file(Config) ->
%% this will trigger it to write the default plugins to the
%% inexistent file; but it won't truly load them in this test
@ -96,17 +124,8 @@ t_ensure_default_loaded_plugins_file(Config) ->
TmpFilepath = ?config(tmp_filepath, Config),
ok = emqx_plugins:load(),
{ok, Contents} = file:consult(TmpFilepath),
?assertEqual(
[ {emqx_bridge_mqtt, false}
, {emqx_dashboard, true}
, {emqx_management, true}
, {emqx_modules, false}
, {emqx_recon, true}
, {emqx_retainer, true}
, {emqx_rule_engine, true}
, {emqx_telemetry, true}
],
lists:sort(Contents)),
DefaultPlugins = default_plugins(),
?assertEqual(DefaultPlugins, lists:sort(Contents)),
ok.
t_init_config(_) ->

View File

@ -380,7 +380,7 @@ t_local(_) ->
emqtt:stop(ConnPid1),
emqtt:stop(ConnPid2),
stop_slave(Node),
emqx_node_helpers:stop_slave(Node),
?assertEqual(local, emqx_shared_sub:strategy(<<"local_group">>)),
?assertEqual(local, RemoteLocalGroupStrategy),
@ -415,7 +415,7 @@ t_local_fallback(_) ->
{true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1]),
emqtt:stop(ConnPid1),
stop_slave(Node),
emqx_node_helpers:stop_slave(Node),
?assertEqual(UsedSubPid1, UsedSubPid2),
ok.
@ -536,55 +536,8 @@ recv_msgs(Count, Msgs) ->
end.
start_slave(Name, Port) ->
{ok, Node} = ct_slave:start(list_to_atom(atom_to_list(Name) ++ "@" ++ host()),
[{kill_if_fail, true},
{monitor_master, true},
{init_timeout, 10000},
{startup_timeout, 10000},
{erl_flags, ebin_path()}]),
pong = net_adm:ping(Node),
ok = setup_node(Node, Port),
Node.
stop_slave(Node) ->
rpc:call(Node, ekka, leave, []),
ct_slave:stop(Node).
host() ->
[_, Host] = string:tokens(atom_to_list(node()), "@"), Host.
ebin_path() ->
string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " ").
is_lib(Path) ->
string:prefix(Path, code:lib_dir()) =:= nomatch.
setup_node(Node, Port) ->
EnvHandler =
fun(emqx) ->
application:set_env(
emqx,
listeners,
[#{listen_on => {{127,0,0,1},Port},
name => "internal",
opts => [{zone,internal}],
proto => tcp}]),
application:set_env(gen_rpc, port_discovery, stateless),
ok;
(_) ->
ok
end,
[ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx]],
ok = rpc:call(Node, emqx_ct_helpers, start_apps, [[emqx], EnvHandler]),
rpc:call(Node, ekka, join, [node()]),
%% Sanity check. Assert that `gen_rpc' is set up correctly:
?assertEqual( Node
, gen_rpc:call(Node, erlang, node, [])
),
?assertEqual( node()
, gen_rpc:call(Node, gen_rpc, call, [node(), erlang, node, []])
),
ok.
Listeners = [#{listen_on => {{127,0,0,1}, Port},
name => "internal",
opts => [{zone,internal}],
proto => tcp}],
emqx_node_helpers:start_slave(Name, #{listeners => Listeners}).