diff --git a/apps/emqx_bridge/etc/emqx_bridge.conf b/apps/emqx_bridge/etc/emqx_bridge.conf index 04f4709b8..de931ae12 100644 --- a/apps/emqx_bridge/etc/emqx_bridge.conf +++ b/apps/emqx_bridge/etc/emqx_bridge.conf @@ -34,8 +34,8 @@ # direction = egress # ## NOTE: we cannot use placehodler variables in the `scheme://host:port` part of the url # url = "http://localhost:9901/messages/${topic}" -# request_timeout = "30s" -# connect_timeout = "30s" +# request_timeout = "15s" +# connect_timeout = "15s" # max_retries = 3 # retry_interval = "10s" # pool_type = "random" diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index a6681d3f1..d46ce217e 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -222,7 +222,10 @@ update(Type, Name, {OldConf, Conf}) -> true -> %% we don't need to recreate the bridge if this config change is only to %% toggole the config 'bridge.{type}.{name}.enable' - ok + case maps:get(enable, Conf, true) of + false -> stop(Type, Name); + true -> start(Type, Name) + end end. recreate(Type, Name) -> diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 0f291ac1a..a5b9aa984 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -158,8 +158,8 @@ method_example(_Type, _Direction, put) -> info_example_basic(http, _) -> #{ url => <<"http://localhost:9901/messages/${topic}">>, - request_timeout => <<"30s">>, - connect_timeout => <<"30s">>, + request_timeout => <<"15s">>, + connect_timeout => <<"15s">>, max_retries => 3, retry_interval => <<"10s">>, pool_type => <<"random">>, @@ -276,7 +276,7 @@ schema("/bridges/:id/operation/:operation") -> '/bridges'(post, #{body := #{<<"type">> := BridgeType} = Conf0}) -> Conf = filter_out_request_body(Conf0), - BridgeName = maps:get(<<"name">>, Conf, emqx_misc:gen_id()), + BridgeName = emqx_misc:gen_id(), case emqx_bridge:lookup(BridgeType, BridgeName) of {ok, _} -> {400, error_msg('ALREADY_EXISTS', <<"bridge already exists">>)}; @@ -356,9 +356,8 @@ operation_to_conf_req(<<"restart">>) -> restart; operation_to_conf_req(_) -> invalid. ensure_bridge_created(BridgeType, BridgeName, Conf) -> - Conf1 = maps:without([<<"type">>, <<"name">>], Conf), case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], - Conf1, #{override_to => cluster}) of + Conf, #{override_to => cluster}) of {ok, _} -> ok; {error, Reason} -> {error, error_msg('BAD_ARG', Reason)} @@ -411,12 +410,12 @@ aggregate_metrics(AllMetrics) -> format_resp(#{id := Id, raw_config := RawConf, resource_data := #{status := Status, metrics := Metrics}}) -> - {Type, Name} = emqx_bridge:parse_bridge_id(Id), + {Type, BridgeName} = emqx_bridge:parse_bridge_id(Id), IsConnected = fun(started) -> connected; (_) -> disconnected end, RawConf#{ id => Id, type => Type, - name => Name, + name => maps:get(<<"name">>, RawConf, BridgeName), node => node(), status => IsConnected(Status), metrics => Metrics @@ -431,8 +430,8 @@ rpc_multicall(Func, Args) -> end. filter_out_request_body(Conf) -> - ExtraConfs = [<<"id">>, <<"status">>, <<"node_status">>, <<"node_metrics">>, - <<"metrics">>, <<"node">>], + ExtraConfs = [<<"id">>, <<"type">>, <<"status">>, <<"node_status">>, + <<"node_metrics">>, <<"metrics">>, <<"node">>], maps:without(ExtraConfs, Conf). rpc_call(Node, Fun, Args) -> diff --git a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl index 540a6a070..494911d21 100644 --- a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl @@ -59,7 +59,7 @@ Template with variables is allowed. """ })} , {request_timeout, mk(emqx_schema:duration_ms(), - #{ default => <<"30s">> + #{ default => <<"15s">> , desc =>""" How long will the HTTP request timeout. """ @@ -84,6 +84,10 @@ basic_config() -> #{ desc => "Enable or disable this bridge" , default => true })} + , {name, + mk(binary(), + #{ desc => "Bridge name, used as a human-readable description of the bridge." + })} , {direction, mk(egress, #{ desc => "The direction of this bridge, MUST be egress" diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/emqx_bridge_schema.erl index 82fc79ebf..00d461098 100644 --- a/apps/emqx_bridge/src/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_schema.erl @@ -43,9 +43,13 @@ http_schema(Method) -> common_bridge_fields() -> [ {enable, mk(boolean(), - #{ desc =>"Enable or disable this bridge" + #{ desc => "Enable or disable this bridge" , default => true })} + , {name, + mk(binary(), + #{ desc => "Bridge name, used as a human-readable description of the bridge." + })} , {connector, mk(binary(), #{ nullable => false diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 7724d467c..807ad32f6 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -23,12 +23,13 @@ -define(CONF_DEFAULT, <<"bridges: {}">>). -define(BRIDGE_TYPE, <<"http">>). -define(BRIDGE_NAME, <<"test_bridge">>). --define(BRIDGE_ID, <<"http:test_bridge">>). -define(URL(PORT, PATH), list_to_binary( io_lib:format("http://localhost:~s/~s", [integer_to_list(PORT), PATH]))). --define(HTTP_BRIDGE(URL), +-define(HTTP_BRIDGE(URL, TYPE, NAME), #{ + <<"type">> => TYPE, + <<"name">> => NAME, <<"url">> => URL, <<"local_topic">> => <<"emqx_http/#">>, <<"method">> => <<"post">>, @@ -145,32 +146,18 @@ t_http_crud_apis(_) -> %% POST /bridges/ will create a bridge URL1 = ?URL(Port, "path1"), {ok, 201, Bridge} = request(post, uri(["bridges"]), - ?HTTP_BRIDGE(URL1)#{ - <<"type">> => ?BRIDGE_TYPE, - <<"name">> => ?BRIDGE_NAME - }), + ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, ?BRIDGE_NAME)), %ct:pal("---bridge: ~p", [Bridge]), - ?assertMatch(#{ <<"id">> := ?BRIDGE_ID - , <<"type">> := ?BRIDGE_TYPE - , <<"name">> := ?BRIDGE_NAME - , <<"status">> := _ - , <<"node_status">> := [_|_] - , <<"metrics">> := _ - , <<"node_metrics">> := [_|_] - , <<"url">> := URL1 - }, jsx:decode(Bridge)), - - %% create a again returns an error - {ok, 400, RetMsg} = request(post, uri(["bridges"]), - ?HTTP_BRIDGE(URL1)#{ - <<"type">> => ?BRIDGE_TYPE, - <<"name">> => ?BRIDGE_NAME - }), - ?assertMatch( - #{ <<"code">> := _ - , <<"message">> := <<"bridge already exists">> - }, jsx:decode(RetMsg)), + #{ <<"id">> := BridgeID + , <<"type">> := ?BRIDGE_TYPE + , <<"name">> := ?BRIDGE_NAME + , <<"status">> := _ + , <<"node_status">> := [_|_] + , <<"metrics">> := _ + , <<"node_metrics">> := [_|_] + , <<"url">> := URL1 + } = jsx:decode(Bridge), %% send an message to emqx and the message should be forwarded to the HTTP server Body = <<"my msg">>, @@ -188,9 +175,9 @@ t_http_crud_apis(_) -> end), %% update the request-path of the bridge URL2 = ?URL(Port, "path2"), - {ok, 200, Bridge2} = request(put, uri(["bridges", ?BRIDGE_ID]), - ?HTTP_BRIDGE(URL2)), - ?assertMatch(#{ <<"id">> := ?BRIDGE_ID + {ok, 200, Bridge2} = request(put, uri(["bridges", BridgeID]), + ?HTTP_BRIDGE(URL2, ?BRIDGE_TYPE, ?BRIDGE_NAME)), + ?assertMatch(#{ <<"id">> := BridgeID , <<"type">> := ?BRIDGE_TYPE , <<"name">> := ?BRIDGE_NAME , <<"status">> := _ @@ -202,7 +189,7 @@ t_http_crud_apis(_) -> %% list all bridges again, assert Bridge2 is in it {ok, 200, Bridge2Str} = request(get, uri(["bridges"]), []), - ?assertMatch([#{ <<"id">> := ?BRIDGE_ID + ?assertMatch([#{ <<"id">> := BridgeID , <<"type">> := ?BRIDGE_TYPE , <<"name">> := ?BRIDGE_NAME , <<"status">> := _ @@ -213,8 +200,8 @@ t_http_crud_apis(_) -> }], jsx:decode(Bridge2Str)), %% get the bridge by id - {ok, 200, Bridge3Str} = request(get, uri(["bridges", ?BRIDGE_ID]), []), - ?assertMatch(#{ <<"id">> := ?BRIDGE_ID + {ok, 200, Bridge3Str} = request(get, uri(["bridges", BridgeID]), []), + ?assertMatch(#{ <<"id">> := BridgeID , <<"type">> := ?BRIDGE_TYPE , <<"name">> := ?BRIDGE_NAME , <<"status">> := _ @@ -238,12 +225,12 @@ t_http_crud_apis(_) -> end), %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), %% update a deleted bridge returns an error - {ok, 404, ErrMsg2} = request(put, uri(["bridges", ?BRIDGE_ID]), - ?HTTP_BRIDGE(URL2)), + {ok, 404, ErrMsg2} = request(put, uri(["bridges", BridgeID]), + ?HTTP_BRIDGE(URL2, ?BRIDGE_TYPE, ?BRIDGE_NAME)), ?assertMatch( #{ <<"code">> := _ , <<"message">> := <<"bridge not found">> @@ -251,52 +238,51 @@ t_http_crud_apis(_) -> ok. t_start_stop_bridges(_) -> + %% assert we there's no bridges at first + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + Port = start_http_server(fun handle_fun_200_ok/2), URL1 = ?URL(Port, "abc"), {ok, 201, Bridge} = request(post, uri(["bridges"]), - ?HTTP_BRIDGE(URL1)#{ - <<"type">> => ?BRIDGE_TYPE, - <<"name">> => ?BRIDGE_NAME - }), + ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, ?BRIDGE_NAME)), %ct:pal("the bridge ==== ~p", [Bridge]), - ?assertMatch( - #{ <<"id">> := ?BRIDGE_ID - , <<"type">> := ?BRIDGE_TYPE - , <<"name">> := ?BRIDGE_NAME - , <<"status">> := _ - , <<"node_status">> := [_|_] - , <<"metrics">> := _ - , <<"node_metrics">> := [_|_] - , <<"url">> := URL1 - }, jsx:decode(Bridge)), + #{ <<"id">> := BridgeID + , <<"type">> := ?BRIDGE_TYPE + , <<"name">> := ?BRIDGE_NAME + , <<"status">> := _ + , <<"node_status">> := [_|_] + , <<"metrics">> := _ + , <<"node_metrics">> := [_|_] + , <<"url">> := URL1 + } = jsx:decode(Bridge), %% stop it - {ok, 200, <<>>} = request(post, operation_path(stop), <<"">>), - {ok, 200, Bridge2} = request(get, uri(["bridges", ?BRIDGE_ID]), []), - ?assertMatch(#{ <<"id">> := ?BRIDGE_ID + {ok, 200, <<>>} = request(post, operation_path(stop, BridgeID), <<"">>), + {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []), + ?assertMatch(#{ <<"id">> := BridgeID , <<"status">> := <<"disconnected">> }, jsx:decode(Bridge2)), %% start again - {ok, 200, <<>>} = request(post, operation_path(start), <<"">>), - {ok, 200, Bridge3} = request(get, uri(["bridges", ?BRIDGE_ID]), []), - ?assertMatch(#{ <<"id">> := ?BRIDGE_ID + {ok, 200, <<>>} = request(post, operation_path(start, BridgeID), <<"">>), + {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), + ?assertMatch(#{ <<"id">> := BridgeID , <<"status">> := <<"connected">> }, jsx:decode(Bridge3)), %% restart an already started bridge - {ok, 200, <<>>} = request(post, operation_path(restart), <<"">>), - {ok, 200, Bridge3} = request(get, uri(["bridges", ?BRIDGE_ID]), []), - ?assertMatch(#{ <<"id">> := ?BRIDGE_ID + {ok, 200, <<>>} = request(post, operation_path(restart, BridgeID), <<"">>), + {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), + ?assertMatch(#{ <<"id">> := BridgeID , <<"status">> := <<"connected">> }, jsx:decode(Bridge3)), %% stop it again - {ok, 200, <<>>} = request(post, operation_path(stop), <<"">>), + {ok, 200, <<>>} = request(post, operation_path(stop, BridgeID), <<"">>), %% restart a stopped bridge - {ok, 200, <<>>} = request(post, operation_path(restart), <<"">>), - {ok, 200, Bridge4} = request(get, uri(["bridges", ?BRIDGE_ID]), []), - ?assertMatch(#{ <<"id">> := ?BRIDGE_ID + {ok, 200, <<>>} = request(post, operation_path(restart, BridgeID), <<"">>), + {ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []), + ?assertMatch(#{ <<"id">> := BridgeID , <<"status">> := <<"connected">> }, jsx:decode(Bridge4)), %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []). %%-------------------------------------------------------------------- @@ -332,5 +318,5 @@ auth_header_() -> {ok, Token} = emqx_dashboard_admin:sign_token(Username, Password), {"Authorization", "Bearer " ++ binary_to_list(Token)}. -operation_path(Oper) -> - uri(["bridges", ?BRIDGE_ID, "operation", Oper]). +operation_path(Oper, BridgeID) -> + uri(["bridges", BridgeID, "operation", Oper]). diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index 5d6bddb6a..4989cf17e 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -107,14 +107,14 @@ info_example_basic(mqtt) -> #{ mode => cluster_shareload, server => <<"127.0.0.1:1883">>, - reconnect_interval => <<"30s">>, + reconnect_interval => <<"15s">>, proto_ver => <<"v4">>, username => <<"foo">>, password => <<"bar">>, clientid => <<"foo">>, clean_start => true, keepalive => <<"300s">>, - retry_interval => <<"30s">>, + retry_interval => <<"15s">>, max_inflight => 100, ssl => #{ enable => false @@ -155,8 +155,7 @@ schema("/connectors") -> }, post => #{ tags => [<<"connectors">>], - description => <<"Create a new connector by given Id
" - "The ID must be of format '{type}:{name}'">>, + description => <<"Create a new connector">>, summary => <<"Create connector">>, requestBody => post_request_body_schema(), responses => #{ @@ -212,13 +211,13 @@ schema("/connectors/:id") -> {200, [format_resp(Conn) || Conn <- emqx_connector:list()]}; '/connectors'(post, #{body := #{<<"type">> := ConnType} = Params}) -> - ConnName = maps:get(<<"name">>, Params, emqx_misc:gen_id()), + ConnName = emqx_misc:gen_id(), case emqx_connector:lookup(ConnType, ConnName) of {ok, _} -> {400, error_msg('ALREADY_EXISTS', <<"connector already exists">>)}; {error, not_found} -> case emqx_connector:update(ConnType, ConnName, - maps:without([<<"type">>, <<"name">>], Params)) of + filter_out_request_body(Params)) of {ok, #{raw_config := RawConf}} -> Id = emqx_connector:connector_id(ConnType, ConnName), {201, format_resp(Id, RawConf)}; @@ -270,16 +269,16 @@ format_resp(#{<<"id">> := Id} = RawConf) -> format_resp(ConnId, RawConf) -> NumOfBridges = length(emqx_bridge:list_bridges_by_connector(ConnId)), - {Type, Name} = emqx_connector:parse_connector_id(ConnId), + {Type, ConnName} = emqx_connector:parse_connector_id(ConnId), RawConf#{ <<"id">> => ConnId, <<"type">> => Type, - <<"name">> => Name, + <<"name">> => maps:get(<<"name">>, RawConf, ConnName), <<"num_of_bridges">> => NumOfBridges }. filter_out_request_body(Conf) -> - ExtraConfs = [<<"num_of_bridges">>, <<"type">>, <<"name">>], + ExtraConfs = [<<"clientid">>, <<"num_of_bridges">>, <<"type">>], maps:without(ExtraConfs, Conf). bin(S) when is_list(S) -> diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index ac0847a91..8b366070d 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -75,7 +75,7 @@ For example: http://localhost:9901/ })} , {connect_timeout, sc(emqx_schema:duration_ms(), - #{ default => "30s" + #{ default => "15s" , desc => "The timeout when connecting to the HTTP server" })} , {max_retries, @@ -201,7 +201,7 @@ on_query(InstId, {KeyOrNum, Method, Request, Timeout}, AfterQuery, #{pool_name := PoolName, base_path := BasePath} = State) -> ?TRACE("QUERY", "http_connector_received", #{request => Request, connector => InstId, state => State}), - NRequest = update_path(BasePath, Request), + NRequest = formalize_request(Method, BasePath, Request), case Result = ehttpc:request(case KeyOrNum of undefined -> PoolName; _ -> {PoolName, KeyOrNum} @@ -211,8 +211,20 @@ on_query(InstId, {KeyOrNum, Method, Request, Timeout}, AfterQuery, request => NRequest, reason => Reason, connector => InstId}), emqx_resource:query_failed(AfterQuery); - _ -> - emqx_resource:query_success(AfterQuery) + {ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 -> + emqx_resource:query_success(AfterQuery); + {ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 -> + emqx_resource:query_success(AfterQuery); + {ok, StatusCode, _} -> + ?SLOG(error, #{msg => "http connector do reqeust, received error response", + request => NRequest, connector => InstId, + status_code => StatusCode}), + emqx_resource:query_failed(AfterQuery); + {ok, StatusCode, _, _} -> + ?SLOG(error, #{msg => "http connector do reqeust, received error response", + request => NRequest, connector => InstId, + status_code => StatusCode}), + emqx_resource:query_failed(AfterQuery) end, Result. @@ -298,10 +310,14 @@ check_ssl_opts(URLFrom, Conf) -> {_, _} -> false end. -update_path(BasePath, {Path, Headers}) -> - {filename:join(BasePath, Path), Headers}; -update_path(BasePath, {Path, Headers, Body}) -> - {filename:join(BasePath, Path), Headers, Body}. +formalize_request(Method, BasePath, {Path, Headers, _Body}) + when Method =:= get; Method =:= delete -> + formalize_request(Method, BasePath, {Path, Headers}); +formalize_request(_Method, BasePath, {Path, Headers, Body}) -> + {filename:join(BasePath, Path), Headers, Body}; + +formalize_request(_Method, BasePath, {Path, Headers}) -> + {filename:join(BasePath, Path), Headers}. bin(Bin) when is_binary(Bin) -> Bin; diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index a9647b2c1..c216e905c 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -29,7 +29,7 @@ , bridges/0 ]). --export([on_message_received/2]). +-export([on_message_received/3]). %% callbacks of behaviour emqx_resource -export([ on_start/2 @@ -105,14 +105,17 @@ drop_bridge(Name) -> case supervisor:terminate_child(?MODULE, Name) of ok -> supervisor:delete_child(?MODULE, Name); + {error, not_found} -> + ok; {error, Error} -> {error, Error} end. %% =================================================================== -%% When use this bridge as a data source, ?MODULE:on_message_received/2 will be called +%% When use this bridge as a data source, ?MODULE:on_message_received will be called %% if the bridge received msgs from the remote broker. -on_message_received(Msg, HookPoint) -> +on_message_received(Msg, HookPoint, InstId) -> + _ = emqx_resource:query(InstId, {message_received, Msg}), emqx:run_hook(HookPoint, [Msg]). %% =================================================================== @@ -123,8 +126,8 @@ on_start(InstId, Conf) -> BasicConf = basic_config(Conf), BridgeConf = BasicConf#{ name => InstanceId, - clientid => clientid(maps:get(clientid, Conf, InstId)), - subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined)), + clientid => clientid(InstId), + subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined), InstId), forwards => make_forward_confs(maps:get(egress, Conf, undefined)) }, case ?MODULE:create_bridge(BridgeConf) of @@ -149,6 +152,9 @@ on_stop(_InstId, #{name := InstanceId}) -> connector => InstanceId, reason => Reason}) end. +on_query(_InstId, {message_received, _Msg}, AfterQuery, _State) -> + emqx_resource:query_success(AfterQuery); + on_query(_InstId, {send_message, Msg}, AfterQuery, #{name := InstanceId}) -> ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg), @@ -166,15 +172,15 @@ ensure_mqtt_worker_started(InstanceId) -> {error, Reason} -> {error, Reason} end. -make_sub_confs(EmptyMap) when map_size(EmptyMap) == 0 -> +make_sub_confs(EmptyMap, _) when map_size(EmptyMap) == 0 -> undefined; -make_sub_confs(undefined) -> +make_sub_confs(undefined, _) -> undefined; -make_sub_confs(SubRemoteConf) -> +make_sub_confs(SubRemoteConf, InstId) -> case maps:take(hookpoint, SubRemoteConf) of error -> SubRemoteConf; {HookPoint, SubConf} -> - MFA = {?MODULE, on_message_received, [HookPoint]}, + MFA = {?MODULE, on_message_received, [HookPoint, InstId]}, SubConf#{on_message_received => MFA} end. diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index 3ab410391..d7abcda84 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -168,7 +168,6 @@ handle_publish(Msg, undefined) -> handle_publish(Msg, Vars) -> ?SLOG(debug, #{msg => "publish_to_local_broker", message => Msg, vars => Vars}), - emqx_metrics:inc('bridge.mqtt.message_received_from_remote', 1), case Vars of #{on_message_received := {Mod, Func, Args}} -> _ = erlang:apply(Mod, Func, [Msg | Args]); diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl index 1357037ee..a0dd9eec1 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl @@ -61,7 +61,7 @@ make_pub_vars(Mountpoint, Conf) when is_map(Conf) -> -> exp_msg(). to_remote_msg(#message{flags = Flags0} = Msg, Vars) -> Retain0 = maps:get(retain, Flags0, false), - MapMsg = maps:put(retain, Retain0, emqx_message:to_map(Msg)), + MapMsg = maps:put(retain, Retain0, emqx_rule_events:eventmsg_publish(Msg)), to_remote_msg(MapMsg, Vars); to_remote_msg(MapMsg, #{remote_topic := TopicToken, payload := PayloadToken, remote_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) when is_map(MapMsg) -> @@ -78,9 +78,10 @@ to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) -> Msg#message{topic = topic(Mountpoint, Topic)}. %% published from remote node over a MQTT connection -to_broker_msg(#{dup := Dup, properties := Props} = MapMsg, +to_broker_msg(#{dup := Dup, properties := Props} = MapMsg0, #{local_topic := TopicToken, payload := PayloadToken, local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) -> + MapMsg = format_msg_received(MapMsg0), Topic = replace_vars_in_str(TopicToken, MapMsg), Payload = process_payload(PayloadToken, MapMsg), QoS = replace_simple_var(QoSToken, MapMsg), @@ -89,6 +90,33 @@ to_broker_msg(#{dup := Dup, properties := Props} = MapMsg, emqx_message:set_flags(#{dup => Dup, retain => Retain}, emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload))). +format_msg_received(#{dup := Dup, payload := Payload, properties := Props, + qos := QoS, retain := Retain, topic := Topic}) -> + #{event => '$bridges/mqtt', + id => emqx_guid:to_hexstr(emqx_guid:gen()), + payload => Payload, + topic => Topic, + qos => QoS, + flags => #{dup => Dup, retain => Retain}, + pub_props => printable_maps(Props), + timestamp => erlang:system_time(millisecond), + node => node() + }. + +printable_maps(undefined) -> #{}; +printable_maps(Headers) -> + maps:fold( + fun ('User-Property', V0, AccIn) when is_list(V0) -> + AccIn#{ + 'User-Property' => maps:from_list(V0), + 'User-Property-Pairs' => [#{ + key => Key, + value => Value + } || {Key, Value} <- V0] + }; + (K, V0, AccIn) -> AccIn#{K => V0} + end, #{}, Headers). + process_payload([], Msg) -> emqx_json:encode(Msg); process_payload(Tks, Msg) -> diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index 6fabb6b5d..b3484f5d9 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -39,7 +39,7 @@ fields("config") -> fields("connector") -> [ {mode, - sc(hoconsc:enum([cluster_singleton, cluster_shareload]), + sc(hoconsc:enum([cluster_shareload]), #{ default => cluster_shareload , desc => """ The mode of the MQTT Bridge. Can be one of 'cluster_singleton' or 'cluster_shareload'
@@ -55,12 +55,17 @@ clientid conflicts between different nodes. And we can only use shared subscript topic filters for 'remote_topic' of ingress connections. """ })} + , {name, + sc(binary(), + #{ nullable => true + , desc => "Connector name, used as a human-readable description of the connector." + })} , {server, sc(emqx_schema:ip_port(), #{ default => "127.0.0.1:1883" , desc => "The host and port of the remote MQTT broker" })} - , {reconnect_interval, mk_duration("reconnect interval", #{default => "30s"})} + , {reconnect_interval, mk_duration("reconnect interval", #{default => "15s"})} , {proto_ver, sc(hoconsc:enum([v3, v4, v5]), #{ default => v4 @@ -76,17 +81,13 @@ topic filters for 'remote_topic' of ingress connections. #{ default => "emqx" , desc => "The password of the MQTT protocol" })} - , {clientid, - sc(binary(), - #{ desc => "The clientid of the MQTT protocol" - })} , {clean_start, sc(boolean(), #{ default => true , desc => "The clean-start or the clean-session of the MQTT protocol" })} , {keepalive, mk_duration("keepalive", #{default => "300s"})} - , {retry_interval, mk_duration("retry interval", #{default => "30s"})} + , {retry_interval, mk_duration("retry interval", #{default => "15s"})} , {max_inflight, sc(integer(), #{ default => 32 diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 307852546..1a96a3596 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -26,11 +26,8 @@ -define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>). -define(CONNECTR_TYPE, <<"mqtt">>). -define(CONNECTR_NAME, <<"test_connector">>). --define(CONNECTR_ID, <<"mqtt:test_connector">>). -define(BRIDGE_NAME_INGRESS, <<"ingress_test_bridge">>). -define(BRIDGE_NAME_EGRESS, <<"egress_test_bridge">>). --define(BRIDGE_ID_INGRESS, <<"mqtt:ingress_test_bridge">>). --define(BRIDGE_ID_EGRESS, <<"mqtt:egress_test_bridge">>). -define(MQTT_CONNECOTR(Username), #{ <<"server">> => <<"127.0.0.1:1883">>, @@ -123,32 +120,21 @@ t_mqtt_crud_apis(_) -> , <<"name">> => ?CONNECTR_NAME }), - %ct:pal("---connector: ~p", [Connector]), - ?assertMatch(#{ <<"id">> := ?CONNECTR_ID - , <<"type">> := ?CONNECTR_TYPE - , <<"name">> := ?CONNECTR_NAME - , <<"server">> := <<"127.0.0.1:1883">> - , <<"username">> := User1 - , <<"password">> := <<"">> - , <<"proto_ver">> := <<"v4">> - , <<"ssl">> := #{<<"enable">> := false} - }, jsx:decode(Connector)), - - %% create a again returns an error - {ok, 400, RetMsg} = request(post, uri(["connectors"]), - ?MQTT_CONNECOTR(User1)#{ <<"type">> => ?CONNECTR_TYPE - , <<"name">> => ?CONNECTR_NAME - }), - ?assertMatch( - #{ <<"code">> := _ - , <<"message">> := <<"connector already exists">> - }, jsx:decode(RetMsg)), + #{ <<"id">> := ConnctorID + , <<"type">> := ?CONNECTR_TYPE + , <<"name">> := ?CONNECTR_NAME + , <<"server">> := <<"127.0.0.1:1883">> + , <<"username">> := User1 + , <<"password">> := <<"">> + , <<"proto_ver">> := <<"v4">> + , <<"ssl">> := #{<<"enable">> := false} + } = jsx:decode(Connector), %% update the request-path of the connector User2 = <<"user2">>, - {ok, 200, Connector2} = request(put, uri(["connectors", ?CONNECTR_ID]), + {ok, 200, Connector2} = request(put, uri(["connectors", ConnctorID]), ?MQTT_CONNECOTR(User2)), - ?assertMatch(#{ <<"id">> := ?CONNECTR_ID + ?assertMatch(#{ <<"id">> := ConnctorID , <<"server">> := <<"127.0.0.1:1883">> , <<"username">> := User2 , <<"password">> := <<"">> @@ -158,7 +144,7 @@ t_mqtt_crud_apis(_) -> %% list all connectors again, assert Connector2 is in it {ok, 200, Connector2Str} = request(get, uri(["connectors"]), []), - ?assertMatch([#{ <<"id">> := ?CONNECTR_ID + ?assertMatch([#{ <<"id">> := ConnctorID , <<"type">> := ?CONNECTR_TYPE , <<"name">> := ?CONNECTR_NAME , <<"server">> := <<"127.0.0.1:1883">> @@ -169,8 +155,8 @@ t_mqtt_crud_apis(_) -> }], jsx:decode(Connector2Str)), %% get the connector by id - {ok, 200, Connector3Str} = request(get, uri(["connectors", ?CONNECTR_ID]), []), - ?assertMatch(#{ <<"id">> := ?CONNECTR_ID + {ok, 200, Connector3Str} = request(get, uri(["connectors", ConnctorID]), []), + ?assertMatch(#{ <<"id">> := ConnctorID , <<"type">> := ?CONNECTR_TYPE , <<"name">> := ?CONNECTR_NAME , <<"server">> := <<"127.0.0.1:1883">> @@ -181,11 +167,11 @@ t_mqtt_crud_apis(_) -> }, jsx:decode(Connector3Str)), %% delete the connector - {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []), + {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []), {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), %% update a deleted connector returns an error - {ok, 404, ErrMsg2} = request(put, uri(["connectors", ?CONNECTR_ID]), + {ok, 404, ErrMsg2} = request(put, uri(["connectors", ConnctorID]), ?MQTT_CONNECOTR(User2)), ?assertMatch( #{ <<"code">> := _ @@ -205,28 +191,28 @@ t_mqtt_conn_bridge_ingress(_) -> , <<"name">> => ?CONNECTR_NAME }), - ?assertMatch(#{ <<"id">> := ?CONNECTR_ID - , <<"server">> := <<"127.0.0.1:1883">> - , <<"num_of_bridges">> := 0 - , <<"username">> := User1 - , <<"password">> := <<"">> - , <<"proto_ver">> := <<"v4">> - , <<"ssl">> := #{<<"enable">> := false} - }, jsx:decode(Connector)), + #{ <<"id">> := ConnctorID + , <<"server">> := <<"127.0.0.1:1883">> + , <<"num_of_bridges">> := 0 + , <<"username">> := User1 + , <<"password">> := <<"">> + , <<"proto_ver">> := <<"v4">> + , <<"ssl">> := #{<<"enable">> := false} + } = jsx:decode(Connector), %% ... and a MQTT bridge, using POST %% we bind this bridge to the connector created just now {ok, 201, Bridge} = request(post, uri(["bridges"]), - ?MQTT_BRIDGE_INGRESS(?CONNECTR_ID)#{ + ?MQTT_BRIDGE_INGRESS(ConnctorID)#{ <<"type">> => ?CONNECTR_TYPE, <<"name">> => ?BRIDGE_NAME_INGRESS }), - ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_INGRESS - , <<"type">> := <<"mqtt">> - , <<"status">> := <<"connected">> - , <<"connector">> := ?CONNECTR_ID - }, jsx:decode(Bridge)), + #{ <<"id">> := BridgeIDIngress + , <<"type">> := <<"mqtt">> + , <<"status">> := <<"connected">> + , <<"connector">> := ConnctorID + } = jsx:decode(Bridge), %% we now test if the bridge works as expected @@ -252,17 +238,17 @@ t_mqtt_conn_bridge_ingress(_) -> end), %% get the connector by id, verify the num_of_bridges now is 1 - {ok, 200, Connector1Str} = request(get, uri(["connectors", ?CONNECTR_ID]), []), - ?assertMatch(#{ <<"id">> := ?CONNECTR_ID + {ok, 200, Connector1Str} = request(get, uri(["connectors", ConnctorID]), []), + ?assertMatch(#{ <<"id">> := ConnctorID , <<"num_of_bridges">> := 1 }, jsx:decode(Connector1Str)), %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_INGRESS]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), %% delete the connector - {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []), + {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []), {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), ok. @@ -279,29 +265,28 @@ t_mqtt_conn_bridge_egress(_) -> }), %ct:pal("---connector: ~p", [Connector]), - ?assertMatch(#{ <<"id">> := ?CONNECTR_ID - , <<"server">> := <<"127.0.0.1:1883">> - , <<"username">> := User1 - , <<"password">> := <<"">> - , <<"proto_ver">> := <<"v4">> - , <<"ssl">> := #{<<"enable">> := false} - }, jsx:decode(Connector)), + #{ <<"id">> := ConnctorID + , <<"server">> := <<"127.0.0.1:1883">> + , <<"username">> := User1 + , <<"password">> := <<"">> + , <<"proto_ver">> := <<"v4">> + , <<"ssl">> := #{<<"enable">> := false} + } = jsx:decode(Connector), %% ... and a MQTT bridge, using POST %% we bind this bridge to the connector created just now {ok, 201, Bridge} = request(post, uri(["bridges"]), - ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{ + ?MQTT_BRIDGE_EGRESS(ConnctorID)#{ <<"type">> => ?CONNECTR_TYPE, <<"name">> => ?BRIDGE_NAME_EGRESS }), - %ct:pal("---bridge: ~p", [Bridge]), - ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS - , <<"type">> := ?CONNECTR_TYPE - , <<"name">> := ?BRIDGE_NAME_EGRESS - , <<"status">> := <<"connected">> - , <<"connector">> := ?CONNECTR_ID - }, jsx:decode(Bridge)), + #{ <<"id">> := BridgeIDEgress + , <<"type">> := ?CONNECTR_TYPE + , <<"name">> := ?BRIDGE_NAME_EGRESS + , <<"status">> := <<"connected">> + , <<"connector">> := ConnctorID + } = jsx:decode(Bridge), %% we now test if the bridge works as expected LocalTopic = <<"local_topic/1">>, @@ -326,19 +311,19 @@ t_mqtt_conn_bridge_egress(_) -> end), %% verify the metrics of the bridge - {ok, 200, BridgeStr} = request(get, uri(["bridges", ?BRIDGE_ID_EGRESS]), []), - ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS + {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), + ?assertMatch(#{ <<"id">> := BridgeIDEgress , <<"metrics">> := ?metrics(1, 1, 0, _, _, _) , <<"node_metrics">> := [#{<<"node">> := _, <<"metrics">> := ?metrics(1, 1, 0, _, _, _)}] }, jsx:decode(BridgeStr)), %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), %% delete the connector - {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []), + {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []), {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), ok. @@ -358,37 +343,37 @@ t_mqtt_conn_update(_) -> }), %ct:pal("---connector: ~p", [Connector]), - ?assertMatch(#{ <<"id">> := ?CONNECTR_ID - , <<"server">> := <<"127.0.0.1:1883">> - }, jsx:decode(Connector)), + #{ <<"id">> := ConnctorID + , <<"server">> := <<"127.0.0.1:1883">> + } = jsx:decode(Connector), %% ... and a MQTT bridge, using POST %% we bind this bridge to the connector created just now {ok, 201, Bridge} = request(post, uri(["bridges"]), - ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{ + ?MQTT_BRIDGE_EGRESS(ConnctorID)#{ <<"type">> => ?CONNECTR_TYPE, <<"name">> => ?BRIDGE_NAME_EGRESS }), - ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS - , <<"type">> := <<"mqtt">> - , <<"name">> := ?BRIDGE_NAME_EGRESS - , <<"status">> := <<"connected">> - , <<"connector">> := ?CONNECTR_ID - }, jsx:decode(Bridge)), + #{ <<"id">> := BridgeIDEgress + , <<"type">> := <<"mqtt">> + , <<"name">> := ?BRIDGE_NAME_EGRESS + , <<"status">> := <<"connected">> + , <<"connector">> := ConnctorID + } = jsx:decode(Bridge), %% then we try to update 'server' of the connector, to an unavailable IP address %% the update should fail because of 'unreachable' or 'connrefused' - {ok, 400, _ErrorMsg} = request(put, uri(["connectors", ?CONNECTR_ID]), + {ok, 400, _ErrorMsg} = request(put, uri(["connectors", ConnctorID]), ?MQTT_CONNECOTR2(<<"127.0.0.1:2603">>)), %% we fix the 'server' parameter to a normal one, it should work - {ok, 200, _} = request(put, uri(["connectors", ?CONNECTR_ID]), + {ok, 200, _} = request(put, uri(["connectors", ConnctorID]), ?MQTT_CONNECOTR2(<<"127.0.0.1 : 1883">>)), %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), %% delete the connector - {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []), + {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []), {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []). t_mqtt_conn_update2(_) -> @@ -404,36 +389,36 @@ t_mqtt_conn_update2(_) -> , <<"name">> => ?CONNECTR_NAME }), - ?assertMatch(#{ <<"id">> := ?CONNECTR_ID - , <<"server">> := <<"127.0.0.1:2603">> - }, jsx:decode(Connector)), + #{ <<"id">> := ConnctorID + , <<"server">> := <<"127.0.0.1:2603">> + } = jsx:decode(Connector), %% ... and a MQTT bridge, using POST %% we bind this bridge to the connector created just now {ok, 201, Bridge} = request(post, uri(["bridges"]), - ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{ + ?MQTT_BRIDGE_EGRESS(ConnctorID)#{ <<"type">> => ?CONNECTR_TYPE, <<"name">> => ?BRIDGE_NAME_EGRESS }), - ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS - , <<"type">> := <<"mqtt">> - , <<"name">> := ?BRIDGE_NAME_EGRESS - , <<"status">> := <<"disconnected">> - , <<"connector">> := ?CONNECTR_ID - }, jsx:decode(Bridge)), + #{ <<"id">> := BridgeIDEgress + , <<"type">> := <<"mqtt">> + , <<"name">> := ?BRIDGE_NAME_EGRESS + , <<"status">> := <<"disconnected">> + , <<"connector">> := ConnctorID + } = jsx:decode(Bridge), %% we fix the 'server' parameter to a normal one, it should work - {ok, 200, _} = request(put, uri(["connectors", ?CONNECTR_ID]), + {ok, 200, _} = request(put, uri(["connectors", ConnctorID]), ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)), - {ok, 200, BridgeStr} = request(get, uri(["bridges", ?BRIDGE_ID_EGRESS]), []), - ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS + {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), + ?assertMatch(#{ <<"id">> := BridgeIDEgress , <<"status">> := <<"connected">> }, jsx:decode(BridgeStr)), %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), %% delete the connector - {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []), + {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []), {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []). t_mqtt_conn_testing(_) -> diff --git a/apps/emqx_machine/src/emqx_machine_boot.erl b/apps/emqx_machine/src/emqx_machine_boot.erl index 24541990b..cffb914d8 100644 --- a/apps/emqx_machine/src/emqx_machine_boot.erl +++ b/apps/emqx_machine/src/emqx_machine_boot.erl @@ -96,7 +96,6 @@ reboot_apps() -> , emqx_resource , emqx_rule_engine , emqx_bridge - , emqx_bridge_mqtt , emqx_plugin_libs , emqx_management , emqx_retainer @@ -112,17 +111,17 @@ sorted_reboot_apps() -> app_deps(App) -> case application:get_key(App, applications) of - undefined -> []; + undefined -> undefined; {ok, List} -> lists:filter(fun(A) -> lists:member(A, reboot_apps()) end, List) end. sorted_reboot_apps(Apps) -> G = digraph:new(), try - lists:foreach(fun({App, Deps}) -> add_app(G, App, Deps) end, Apps), + NoDepApps = add_apps_to_digraph(G, Apps), case digraph_utils:topsort(G) of Sorted when is_list(Sorted) -> - Sorted; + Sorted ++ (NoDepApps -- Sorted); false -> Loops = find_loops(G), error({circular_application_dependency, Loops}) @@ -131,17 +130,29 @@ sorted_reboot_apps(Apps) -> digraph:delete(G) end. -add_app(G, App, undefined) -> +add_apps_to_digraph(G, Apps) -> + lists:foldl(fun + ({App, undefined}, Acc) -> + ?SLOG(debug, #{msg => "app_is_not_loaded", app => App}), + Acc; + ({App, []}, Acc) -> + Acc ++ [App]; %% use '++' to keep the original order + ({App, Deps}, Acc) -> + add_app_deps_to_digraph(G, App, Deps), + Acc + end, [], Apps). + +add_app_deps_to_digraph(G, App, undefined) -> ?SLOG(debug, #{msg => "app_is_not_loaded", app => App}), %% not loaded - add_app(G, App, []); -add_app(_G, _App, []) -> + add_app_deps_to_digraph(G, App, []); +add_app_deps_to_digraph(_G, _App, []) -> ok; -add_app(G, App, [Dep | Deps]) -> +add_app_deps_to_digraph(G, App, [Dep | Deps]) -> digraph:add_vertex(G, App), digraph:add_vertex(G, Dep), digraph:add_edge(G, Dep, App), %% dep -> app as dependency - add_app(G, App, Deps). + add_app_deps_to_digraph(G, App, Deps). find_loops(G) -> lists:filtermap( diff --git a/apps/emqx_machine/test/emqx_machine_SUITE.erl b/apps/emqx_machine/test/emqx_machine_SUITE.erl index 03d9e6ba9..a760d2f5f 100644 --- a/apps/emqx_machine/test/emqx_machine_SUITE.erl +++ b/apps/emqx_machine/test/emqx_machine_SUITE.erl @@ -43,7 +43,7 @@ init_per_suite(Config) -> %% application:unload(emqx_authz), - emqx_common_test_helpers:start_apps([]), + emqx_common_test_helpers:start_apps([emqx_conf]), Config. end_per_suite(_Config) -> diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index 497affa5e..745b8b684 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -149,7 +149,7 @@ do_recreate(InstId, ResourceType, NewConfig, Params) -> TestInstId = iolist_to_binary(emqx_misc:gen_id(16)), case do_create_dry_run(TestInstId, ResourceType, Config) of ok -> - do_remove(ResourceType, InstId, ResourceState), + do_remove(ResourceType, InstId, ResourceState, false), do_create(InstId, ResourceType, Config, #{force_create => true}); Error -> Error @@ -208,15 +208,23 @@ do_remove(InstId) -> end. do_remove(Mod, InstId, ResourceState) -> + do_remove(Mod, InstId, ResourceState, true). + +do_remove(Mod, InstId, ResourceState, ClearMetrics) -> _ = emqx_resource:call_stop(InstId, Mod, ResourceState), ets:delete(emqx_resource_instance, InstId), - ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId), - ok. + case ClearMetrics of + true -> ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId); + false -> ok + end. do_restart(InstId) -> case lookup(InstId) of {ok, #{mod := Mod, state := ResourceState, config := Config} = Data} -> - _ = emqx_resource:call_stop(InstId, Mod, ResourceState), + _ = case ResourceState of + undefined -> ok; + _ -> emqx_resource:call_stop(InstId, Mod, ResourceState) + end, case emqx_resource:call_start(InstId, Mod, Config) of {ok, NewResourceState} -> ets:insert(emqx_resource_instance, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 5316ca5ef..6a579cbb0 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -187,11 +187,11 @@ init([]) -> {ok, #{}}. handle_call({insert_rule, Rule}, _From, State) -> - _ = emqx_plugin_libs_rule:cluster_call(?MODULE, do_insert_rule, [Rule]), + do_insert_rule(Rule), {reply, ok, State}; handle_call({delete_rule, Rule}, _From, State) -> - _ = emqx_plugin_libs_rule:cluster_call(?MODULE, do_delete_rule, [Rule]), + do_delete_rule(Rule), {reply, ok, State}; handle_call(Req, _From, State) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index cbfda16db..d9138ffd1 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -172,7 +172,7 @@ param_path_id() -> {ok, _Rule} -> {400, #{code => 'BAD_ARGS', message => <<"rule id already exists">>}}; not_found -> - case emqx:update_config(ConfPath, Params, #{}) of + case emqx_conf:update(ConfPath, Params, #{}) of {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} -> [Rule] = get_one_rule(AllRules, Id), {201, format_rule_resp(Rule)}; @@ -200,7 +200,7 @@ param_path_id() -> '/rules/:id'(put, #{bindings := #{id := Id}, body := Params0}) -> Params = filter_out_request_body(Params0), ConfPath = emqx_rule_engine:config_key_path() ++ [Id], - case emqx:update_config(ConfPath, Params, #{}) of + case emqx_conf:update(ConfPath, Params, #{}) of {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} -> [Rule] = get_one_rule(AllRules, Id), {200, format_rule_resp(Rule)}; diff --git a/rebar.config b/rebar.config index 7b08d38a0..61a3f4510 100644 --- a/rebar.config +++ b/rebar.config @@ -56,7 +56,7 @@ , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.1"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.9"}}} - , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}} + , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}} , {replayq, "0.3.3"} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.4.3"}}}