Merge pull request #6563 from terry-xiaoyu/bridge_bug_fixes_1

Bridge bug fixes 1
This commit is contained in:
Shawn 2021-12-31 11:57:28 +08:00 committed by GitHub
commit 14ee053a0e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 279 additions and 230 deletions

View File

@ -34,8 +34,8 @@
# direction = egress # direction = egress
# ## NOTE: we cannot use placehodler variables in the `scheme://host:port` part of the url # ## NOTE: we cannot use placehodler variables in the `scheme://host:port` part of the url
# url = "http://localhost:9901/messages/${topic}" # url = "http://localhost:9901/messages/${topic}"
# request_timeout = "30s" # request_timeout = "15s"
# connect_timeout = "30s" # connect_timeout = "15s"
# max_retries = 3 # max_retries = 3
# retry_interval = "10s" # retry_interval = "10s"
# pool_type = "random" # pool_type = "random"

View File

@ -222,7 +222,10 @@ update(Type, Name, {OldConf, Conf}) ->
true -> true ->
%% we don't need to recreate the bridge if this config change is only to %% we don't need to recreate the bridge if this config change is only to
%% toggole the config 'bridge.{type}.{name}.enable' %% toggole the config 'bridge.{type}.{name}.enable'
ok case maps:get(enable, Conf, true) of
false -> stop(Type, Name);
true -> start(Type, Name)
end
end. end.
recreate(Type, Name) -> recreate(Type, Name) ->

View File

@ -158,8 +158,8 @@ method_example(_Type, _Direction, put) ->
info_example_basic(http, _) -> info_example_basic(http, _) ->
#{ #{
url => <<"http://localhost:9901/messages/${topic}">>, url => <<"http://localhost:9901/messages/${topic}">>,
request_timeout => <<"30s">>, request_timeout => <<"15s">>,
connect_timeout => <<"30s">>, connect_timeout => <<"15s">>,
max_retries => 3, max_retries => 3,
retry_interval => <<"10s">>, retry_interval => <<"10s">>,
pool_type => <<"random">>, pool_type => <<"random">>,
@ -276,7 +276,7 @@ schema("/bridges/:id/operation/:operation") ->
'/bridges'(post, #{body := #{<<"type">> := BridgeType} = Conf0}) -> '/bridges'(post, #{body := #{<<"type">> := BridgeType} = Conf0}) ->
Conf = filter_out_request_body(Conf0), Conf = filter_out_request_body(Conf0),
BridgeName = maps:get(<<"name">>, Conf, emqx_misc:gen_id()), BridgeName = emqx_misc:gen_id(),
case emqx_bridge:lookup(BridgeType, BridgeName) of case emqx_bridge:lookup(BridgeType, BridgeName) of
{ok, _} -> {ok, _} ->
{400, error_msg('ALREADY_EXISTS', <<"bridge already exists">>)}; {400, error_msg('ALREADY_EXISTS', <<"bridge already exists">>)};
@ -356,9 +356,8 @@ operation_to_conf_req(<<"restart">>) -> restart;
operation_to_conf_req(_) -> invalid. operation_to_conf_req(_) -> invalid.
ensure_bridge_created(BridgeType, BridgeName, Conf) -> ensure_bridge_created(BridgeType, BridgeName, Conf) ->
Conf1 = maps:without([<<"type">>, <<"name">>], Conf),
case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
Conf1, #{override_to => cluster}) of Conf, #{override_to => cluster}) of
{ok, _} -> ok; {ok, _} -> ok;
{error, Reason} -> {error, Reason} ->
{error, error_msg('BAD_ARG', Reason)} {error, error_msg('BAD_ARG', Reason)}
@ -411,12 +410,12 @@ aggregate_metrics(AllMetrics) ->
format_resp(#{id := Id, raw_config := RawConf, format_resp(#{id := Id, raw_config := RawConf,
resource_data := #{status := Status, metrics := Metrics}}) -> resource_data := #{status := Status, metrics := Metrics}}) ->
{Type, Name} = emqx_bridge:parse_bridge_id(Id), {Type, BridgeName} = emqx_bridge:parse_bridge_id(Id),
IsConnected = fun(started) -> connected; (_) -> disconnected end, IsConnected = fun(started) -> connected; (_) -> disconnected end,
RawConf#{ RawConf#{
id => Id, id => Id,
type => Type, type => Type,
name => Name, name => maps:get(<<"name">>, RawConf, BridgeName),
node => node(), node => node(),
status => IsConnected(Status), status => IsConnected(Status),
metrics => Metrics metrics => Metrics
@ -431,8 +430,8 @@ rpc_multicall(Func, Args) ->
end. end.
filter_out_request_body(Conf) -> filter_out_request_body(Conf) ->
ExtraConfs = [<<"id">>, <<"status">>, <<"node_status">>, <<"node_metrics">>, ExtraConfs = [<<"id">>, <<"type">>, <<"status">>, <<"node_status">>,
<<"metrics">>, <<"node">>], <<"node_metrics">>, <<"metrics">>, <<"node">>],
maps:without(ExtraConfs, Conf). maps:without(ExtraConfs, Conf).
rpc_call(Node, Fun, Args) -> rpc_call(Node, Fun, Args) ->

View File

@ -59,7 +59,7 @@ Template with variables is allowed.
""" """
})} })}
, {request_timeout, mk(emqx_schema:duration_ms(), , {request_timeout, mk(emqx_schema:duration_ms(),
#{ default => <<"30s">> #{ default => <<"15s">>
, desc =>""" , desc =>"""
How long will the HTTP request timeout. How long will the HTTP request timeout.
""" """
@ -84,6 +84,10 @@ basic_config() ->
#{ desc => "Enable or disable this bridge" #{ desc => "Enable or disable this bridge"
, default => true , default => true
})} })}
, {name,
mk(binary(),
#{ desc => "Bridge name, used as a human-readable description of the bridge."
})}
, {direction, , {direction,
mk(egress, mk(egress,
#{ desc => "The direction of this bridge, MUST be egress" #{ desc => "The direction of this bridge, MUST be egress"

View File

@ -43,9 +43,13 @@ http_schema(Method) ->
common_bridge_fields() -> common_bridge_fields() ->
[ {enable, [ {enable,
mk(boolean(), mk(boolean(),
#{ desc =>"Enable or disable this bridge" #{ desc => "Enable or disable this bridge"
, default => true , default => true
})} })}
, {name,
mk(binary(),
#{ desc => "Bridge name, used as a human-readable description of the bridge."
})}
, {connector, , {connector,
mk(binary(), mk(binary(),
#{ nullable => false #{ nullable => false

View File

@ -23,12 +23,13 @@
-define(CONF_DEFAULT, <<"bridges: {}">>). -define(CONF_DEFAULT, <<"bridges: {}">>).
-define(BRIDGE_TYPE, <<"http">>). -define(BRIDGE_TYPE, <<"http">>).
-define(BRIDGE_NAME, <<"test_bridge">>). -define(BRIDGE_NAME, <<"test_bridge">>).
-define(BRIDGE_ID, <<"http:test_bridge">>).
-define(URL(PORT, PATH), list_to_binary( -define(URL(PORT, PATH), list_to_binary(
io_lib:format("http://localhost:~s/~s", io_lib:format("http://localhost:~s/~s",
[integer_to_list(PORT), PATH]))). [integer_to_list(PORT), PATH]))).
-define(HTTP_BRIDGE(URL), -define(HTTP_BRIDGE(URL, TYPE, NAME),
#{ #{
<<"type">> => TYPE,
<<"name">> => NAME,
<<"url">> => URL, <<"url">> => URL,
<<"local_topic">> => <<"emqx_http/#">>, <<"local_topic">> => <<"emqx_http/#">>,
<<"method">> => <<"post">>, <<"method">> => <<"post">>,
@ -145,13 +146,10 @@ t_http_crud_apis(_) ->
%% POST /bridges/ will create a bridge %% POST /bridges/ will create a bridge
URL1 = ?URL(Port, "path1"), URL1 = ?URL(Port, "path1"),
{ok, 201, Bridge} = request(post, uri(["bridges"]), {ok, 201, Bridge} = request(post, uri(["bridges"]),
?HTTP_BRIDGE(URL1)#{ ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, ?BRIDGE_NAME)),
<<"type">> => ?BRIDGE_TYPE,
<<"name">> => ?BRIDGE_NAME
}),
%ct:pal("---bridge: ~p", [Bridge]), %ct:pal("---bridge: ~p", [Bridge]),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID #{ <<"id">> := BridgeID
, <<"type">> := ?BRIDGE_TYPE , <<"type">> := ?BRIDGE_TYPE
, <<"name">> := ?BRIDGE_NAME , <<"name">> := ?BRIDGE_NAME
, <<"status">> := _ , <<"status">> := _
@ -159,18 +157,7 @@ t_http_crud_apis(_) ->
, <<"metrics">> := _ , <<"metrics">> := _
, <<"node_metrics">> := [_|_] , <<"node_metrics">> := [_|_]
, <<"url">> := URL1 , <<"url">> := URL1
}, jsx:decode(Bridge)), } = jsx:decode(Bridge),
%% create a again returns an error
{ok, 400, RetMsg} = request(post, uri(["bridges"]),
?HTTP_BRIDGE(URL1)#{
<<"type">> => ?BRIDGE_TYPE,
<<"name">> => ?BRIDGE_NAME
}),
?assertMatch(
#{ <<"code">> := _
, <<"message">> := <<"bridge already exists">>
}, jsx:decode(RetMsg)),
%% send an message to emqx and the message should be forwarded to the HTTP server %% send an message to emqx and the message should be forwarded to the HTTP server
Body = <<"my msg">>, Body = <<"my msg">>,
@ -188,9 +175,9 @@ t_http_crud_apis(_) ->
end), end),
%% update the request-path of the bridge %% update the request-path of the bridge
URL2 = ?URL(Port, "path2"), URL2 = ?URL(Port, "path2"),
{ok, 200, Bridge2} = request(put, uri(["bridges", ?BRIDGE_ID]), {ok, 200, Bridge2} = request(put, uri(["bridges", BridgeID]),
?HTTP_BRIDGE(URL2)), ?HTTP_BRIDGE(URL2, ?BRIDGE_TYPE, ?BRIDGE_NAME)),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID ?assertMatch(#{ <<"id">> := BridgeID
, <<"type">> := ?BRIDGE_TYPE , <<"type">> := ?BRIDGE_TYPE
, <<"name">> := ?BRIDGE_NAME , <<"name">> := ?BRIDGE_NAME
, <<"status">> := _ , <<"status">> := _
@ -202,7 +189,7 @@ t_http_crud_apis(_) ->
%% list all bridges again, assert Bridge2 is in it %% list all bridges again, assert Bridge2 is in it
{ok, 200, Bridge2Str} = request(get, uri(["bridges"]), []), {ok, 200, Bridge2Str} = request(get, uri(["bridges"]), []),
?assertMatch([#{ <<"id">> := ?BRIDGE_ID ?assertMatch([#{ <<"id">> := BridgeID
, <<"type">> := ?BRIDGE_TYPE , <<"type">> := ?BRIDGE_TYPE
, <<"name">> := ?BRIDGE_NAME , <<"name">> := ?BRIDGE_NAME
, <<"status">> := _ , <<"status">> := _
@ -213,8 +200,8 @@ t_http_crud_apis(_) ->
}], jsx:decode(Bridge2Str)), }], jsx:decode(Bridge2Str)),
%% get the bridge by id %% get the bridge by id
{ok, 200, Bridge3Str} = request(get, uri(["bridges", ?BRIDGE_ID]), []), {ok, 200, Bridge3Str} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID ?assertMatch(#{ <<"id">> := BridgeID
, <<"type">> := ?BRIDGE_TYPE , <<"type">> := ?BRIDGE_TYPE
, <<"name">> := ?BRIDGE_NAME , <<"name">> := ?BRIDGE_NAME
, <<"status">> := _ , <<"status">> := _
@ -238,12 +225,12 @@ t_http_crud_apis(_) ->
end), end),
%% delete the bridge %% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% update a deleted bridge returns an error %% update a deleted bridge returns an error
{ok, 404, ErrMsg2} = request(put, uri(["bridges", ?BRIDGE_ID]), {ok, 404, ErrMsg2} = request(put, uri(["bridges", BridgeID]),
?HTTP_BRIDGE(URL2)), ?HTTP_BRIDGE(URL2, ?BRIDGE_TYPE, ?BRIDGE_NAME)),
?assertMatch( ?assertMatch(
#{ <<"code">> := _ #{ <<"code">> := _
, <<"message">> := <<"bridge not found">> , <<"message">> := <<"bridge not found">>
@ -251,16 +238,15 @@ t_http_crud_apis(_) ->
ok. ok.
t_start_stop_bridges(_) -> t_start_stop_bridges(_) ->
%% assert we there's no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
Port = start_http_server(fun handle_fun_200_ok/2), Port = start_http_server(fun handle_fun_200_ok/2),
URL1 = ?URL(Port, "abc"), URL1 = ?URL(Port, "abc"),
{ok, 201, Bridge} = request(post, uri(["bridges"]), {ok, 201, Bridge} = request(post, uri(["bridges"]),
?HTTP_BRIDGE(URL1)#{ ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, ?BRIDGE_NAME)),
<<"type">> => ?BRIDGE_TYPE,
<<"name">> => ?BRIDGE_NAME
}),
%ct:pal("the bridge ==== ~p", [Bridge]), %ct:pal("the bridge ==== ~p", [Bridge]),
?assertMatch( #{ <<"id">> := BridgeID
#{ <<"id">> := ?BRIDGE_ID
, <<"type">> := ?BRIDGE_TYPE , <<"type">> := ?BRIDGE_TYPE
, <<"name">> := ?BRIDGE_NAME , <<"name">> := ?BRIDGE_NAME
, <<"status">> := _ , <<"status">> := _
@ -268,35 +254,35 @@ t_start_stop_bridges(_) ->
, <<"metrics">> := _ , <<"metrics">> := _
, <<"node_metrics">> := [_|_] , <<"node_metrics">> := [_|_]
, <<"url">> := URL1 , <<"url">> := URL1
}, jsx:decode(Bridge)), } = jsx:decode(Bridge),
%% stop it %% stop it
{ok, 200, <<>>} = request(post, operation_path(stop), <<"">>), {ok, 200, <<>>} = request(post, operation_path(stop, BridgeID), <<"">>),
{ok, 200, Bridge2} = request(get, uri(["bridges", ?BRIDGE_ID]), []), {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID ?assertMatch(#{ <<"id">> := BridgeID
, <<"status">> := <<"disconnected">> , <<"status">> := <<"disconnected">>
}, jsx:decode(Bridge2)), }, jsx:decode(Bridge2)),
%% start again %% start again
{ok, 200, <<>>} = request(post, operation_path(start), <<"">>), {ok, 200, <<>>} = request(post, operation_path(start, BridgeID), <<"">>),
{ok, 200, Bridge3} = request(get, uri(["bridges", ?BRIDGE_ID]), []), {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID ?assertMatch(#{ <<"id">> := BridgeID
, <<"status">> := <<"connected">> , <<"status">> := <<"connected">>
}, jsx:decode(Bridge3)), }, jsx:decode(Bridge3)),
%% restart an already started bridge %% restart an already started bridge
{ok, 200, <<>>} = request(post, operation_path(restart), <<"">>), {ok, 200, <<>>} = request(post, operation_path(restart, BridgeID), <<"">>),
{ok, 200, Bridge3} = request(get, uri(["bridges", ?BRIDGE_ID]), []), {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID ?assertMatch(#{ <<"id">> := BridgeID
, <<"status">> := <<"connected">> , <<"status">> := <<"connected">>
}, jsx:decode(Bridge3)), }, jsx:decode(Bridge3)),
%% stop it again %% stop it again
{ok, 200, <<>>} = request(post, operation_path(stop), <<"">>), {ok, 200, <<>>} = request(post, operation_path(stop, BridgeID), <<"">>),
%% restart a stopped bridge %% restart a stopped bridge
{ok, 200, <<>>} = request(post, operation_path(restart), <<"">>), {ok, 200, <<>>} = request(post, operation_path(restart, BridgeID), <<"">>),
{ok, 200, Bridge4} = request(get, uri(["bridges", ?BRIDGE_ID]), []), {ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID ?assertMatch(#{ <<"id">> := BridgeID
, <<"status">> := <<"connected">> , <<"status">> := <<"connected">>
}, jsx:decode(Bridge4)), }, jsx:decode(Bridge4)),
%% delete the bridge %% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []). {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -332,5 +318,5 @@ auth_header_() ->
{ok, Token} = emqx_dashboard_admin:sign_token(Username, Password), {ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
{"Authorization", "Bearer " ++ binary_to_list(Token)}. {"Authorization", "Bearer " ++ binary_to_list(Token)}.
operation_path(Oper) -> operation_path(Oper, BridgeID) ->
uri(["bridges", ?BRIDGE_ID, "operation", Oper]). uri(["bridges", BridgeID, "operation", Oper]).

View File

@ -107,14 +107,14 @@ info_example_basic(mqtt) ->
#{ #{
mode => cluster_shareload, mode => cluster_shareload,
server => <<"127.0.0.1:1883">>, server => <<"127.0.0.1:1883">>,
reconnect_interval => <<"30s">>, reconnect_interval => <<"15s">>,
proto_ver => <<"v4">>, proto_ver => <<"v4">>,
username => <<"foo">>, username => <<"foo">>,
password => <<"bar">>, password => <<"bar">>,
clientid => <<"foo">>, clientid => <<"foo">>,
clean_start => true, clean_start => true,
keepalive => <<"300s">>, keepalive => <<"300s">>,
retry_interval => <<"30s">>, retry_interval => <<"15s">>,
max_inflight => 100, max_inflight => 100,
ssl => #{ ssl => #{
enable => false enable => false
@ -155,8 +155,7 @@ schema("/connectors") ->
}, },
post => #{ post => #{
tags => [<<"connectors">>], tags => [<<"connectors">>],
description => <<"Create a new connector by given Id <br>" description => <<"Create a new connector">>,
"The ID must be of format '{type}:{name}'">>,
summary => <<"Create connector">>, summary => <<"Create connector">>,
requestBody => post_request_body_schema(), requestBody => post_request_body_schema(),
responses => #{ responses => #{
@ -212,13 +211,13 @@ schema("/connectors/:id") ->
{200, [format_resp(Conn) || Conn <- emqx_connector:list()]}; {200, [format_resp(Conn) || Conn <- emqx_connector:list()]};
'/connectors'(post, #{body := #{<<"type">> := ConnType} = Params}) -> '/connectors'(post, #{body := #{<<"type">> := ConnType} = Params}) ->
ConnName = maps:get(<<"name">>, Params, emqx_misc:gen_id()), ConnName = emqx_misc:gen_id(),
case emqx_connector:lookup(ConnType, ConnName) of case emqx_connector:lookup(ConnType, ConnName) of
{ok, _} -> {ok, _} ->
{400, error_msg('ALREADY_EXISTS', <<"connector already exists">>)}; {400, error_msg('ALREADY_EXISTS', <<"connector already exists">>)};
{error, not_found} -> {error, not_found} ->
case emqx_connector:update(ConnType, ConnName, case emqx_connector:update(ConnType, ConnName,
maps:without([<<"type">>, <<"name">>], Params)) of filter_out_request_body(Params)) of
{ok, #{raw_config := RawConf}} -> {ok, #{raw_config := RawConf}} ->
Id = emqx_connector:connector_id(ConnType, ConnName), Id = emqx_connector:connector_id(ConnType, ConnName),
{201, format_resp(Id, RawConf)}; {201, format_resp(Id, RawConf)};
@ -270,16 +269,16 @@ format_resp(#{<<"id">> := Id} = RawConf) ->
format_resp(ConnId, RawConf) -> format_resp(ConnId, RawConf) ->
NumOfBridges = length(emqx_bridge:list_bridges_by_connector(ConnId)), NumOfBridges = length(emqx_bridge:list_bridges_by_connector(ConnId)),
{Type, Name} = emqx_connector:parse_connector_id(ConnId), {Type, ConnName} = emqx_connector:parse_connector_id(ConnId),
RawConf#{ RawConf#{
<<"id">> => ConnId, <<"id">> => ConnId,
<<"type">> => Type, <<"type">> => Type,
<<"name">> => Name, <<"name">> => maps:get(<<"name">>, RawConf, ConnName),
<<"num_of_bridges">> => NumOfBridges <<"num_of_bridges">> => NumOfBridges
}. }.
filter_out_request_body(Conf) -> filter_out_request_body(Conf) ->
ExtraConfs = [<<"num_of_bridges">>, <<"type">>, <<"name">>], ExtraConfs = [<<"clientid">>, <<"num_of_bridges">>, <<"type">>],
maps:without(ExtraConfs, Conf). maps:without(ExtraConfs, Conf).
bin(S) when is_list(S) -> bin(S) when is_list(S) ->

View File

@ -75,7 +75,7 @@ For example: http://localhost:9901/
})} })}
, {connect_timeout, , {connect_timeout,
sc(emqx_schema:duration_ms(), sc(emqx_schema:duration_ms(),
#{ default => "30s" #{ default => "15s"
, desc => "The timeout when connecting to the HTTP server" , desc => "The timeout when connecting to the HTTP server"
})} })}
, {max_retries, , {max_retries,
@ -201,7 +201,7 @@ on_query(InstId, {KeyOrNum, Method, Request, Timeout}, AfterQuery,
#{pool_name := PoolName, base_path := BasePath} = State) -> #{pool_name := PoolName, base_path := BasePath} = State) ->
?TRACE("QUERY", "http_connector_received", ?TRACE("QUERY", "http_connector_received",
#{request => Request, connector => InstId, state => State}), #{request => Request, connector => InstId, state => State}),
NRequest = update_path(BasePath, Request), NRequest = formalize_request(Method, BasePath, Request),
case Result = ehttpc:request(case KeyOrNum of case Result = ehttpc:request(case KeyOrNum of
undefined -> PoolName; undefined -> PoolName;
_ -> {PoolName, KeyOrNum} _ -> {PoolName, KeyOrNum}
@ -211,8 +211,20 @@ on_query(InstId, {KeyOrNum, Method, Request, Timeout}, AfterQuery,
request => NRequest, reason => Reason, request => NRequest, reason => Reason,
connector => InstId}), connector => InstId}),
emqx_resource:query_failed(AfterQuery); emqx_resource:query_failed(AfterQuery);
_ -> {ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 ->
emqx_resource:query_success(AfterQuery) emqx_resource:query_success(AfterQuery);
{ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 ->
emqx_resource:query_success(AfterQuery);
{ok, StatusCode, _} ->
?SLOG(error, #{msg => "http connector do reqeust, received error response",
request => NRequest, connector => InstId,
status_code => StatusCode}),
emqx_resource:query_failed(AfterQuery);
{ok, StatusCode, _, _} ->
?SLOG(error, #{msg => "http connector do reqeust, received error response",
request => NRequest, connector => InstId,
status_code => StatusCode}),
emqx_resource:query_failed(AfterQuery)
end, end,
Result. Result.
@ -298,10 +310,14 @@ check_ssl_opts(URLFrom, Conf) ->
{_, _} -> false {_, _} -> false
end. end.
update_path(BasePath, {Path, Headers}) -> formalize_request(Method, BasePath, {Path, Headers, _Body})
{filename:join(BasePath, Path), Headers}; when Method =:= get; Method =:= delete ->
update_path(BasePath, {Path, Headers, Body}) -> formalize_request(Method, BasePath, {Path, Headers});
{filename:join(BasePath, Path), Headers, Body}. formalize_request(_Method, BasePath, {Path, Headers, Body}) ->
{filename:join(BasePath, Path), Headers, Body};
formalize_request(_Method, BasePath, {Path, Headers}) ->
{filename:join(BasePath, Path), Headers}.
bin(Bin) when is_binary(Bin) -> bin(Bin) when is_binary(Bin) ->
Bin; Bin;

View File

@ -29,7 +29,7 @@
, bridges/0 , bridges/0
]). ]).
-export([on_message_received/2]). -export([on_message_received/3]).
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ on_start/2 -export([ on_start/2
@ -105,14 +105,17 @@ drop_bridge(Name) ->
case supervisor:terminate_child(?MODULE, Name) of case supervisor:terminate_child(?MODULE, Name) of
ok -> ok ->
supervisor:delete_child(?MODULE, Name); supervisor:delete_child(?MODULE, Name);
{error, not_found} ->
ok;
{error, Error} -> {error, Error} ->
{error, Error} {error, Error}
end. end.
%% =================================================================== %% ===================================================================
%% When use this bridge as a data source, ?MODULE:on_message_received/2 will be called %% When use this bridge as a data source, ?MODULE:on_message_received will be called
%% if the bridge received msgs from the remote broker. %% if the bridge received msgs from the remote broker.
on_message_received(Msg, HookPoint) -> on_message_received(Msg, HookPoint, InstId) ->
_ = emqx_resource:query(InstId, {message_received, Msg}),
emqx:run_hook(HookPoint, [Msg]). emqx:run_hook(HookPoint, [Msg]).
%% =================================================================== %% ===================================================================
@ -123,8 +126,8 @@ on_start(InstId, Conf) ->
BasicConf = basic_config(Conf), BasicConf = basic_config(Conf),
BridgeConf = BasicConf#{ BridgeConf = BasicConf#{
name => InstanceId, name => InstanceId,
clientid => clientid(maps:get(clientid, Conf, InstId)), clientid => clientid(InstId),
subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined)), subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined), InstId),
forwards => make_forward_confs(maps:get(egress, Conf, undefined)) forwards => make_forward_confs(maps:get(egress, Conf, undefined))
}, },
case ?MODULE:create_bridge(BridgeConf) of case ?MODULE:create_bridge(BridgeConf) of
@ -149,6 +152,9 @@ on_stop(_InstId, #{name := InstanceId}) ->
connector => InstanceId, reason => Reason}) connector => InstanceId, reason => Reason})
end. end.
on_query(_InstId, {message_received, _Msg}, AfterQuery, _State) ->
emqx_resource:query_success(AfterQuery);
on_query(_InstId, {send_message, Msg}, AfterQuery, #{name := InstanceId}) -> on_query(_InstId, {send_message, Msg}, AfterQuery, #{name := InstanceId}) ->
?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg), emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg),
@ -166,15 +172,15 @@ ensure_mqtt_worker_started(InstanceId) ->
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
end. end.
make_sub_confs(EmptyMap) when map_size(EmptyMap) == 0 -> make_sub_confs(EmptyMap, _) when map_size(EmptyMap) == 0 ->
undefined; undefined;
make_sub_confs(undefined) -> make_sub_confs(undefined, _) ->
undefined; undefined;
make_sub_confs(SubRemoteConf) -> make_sub_confs(SubRemoteConf, InstId) ->
case maps:take(hookpoint, SubRemoteConf) of case maps:take(hookpoint, SubRemoteConf) of
error -> SubRemoteConf; error -> SubRemoteConf;
{HookPoint, SubConf} -> {HookPoint, SubConf} ->
MFA = {?MODULE, on_message_received, [HookPoint]}, MFA = {?MODULE, on_message_received, [HookPoint, InstId]},
SubConf#{on_message_received => MFA} SubConf#{on_message_received => MFA}
end. end.

View File

@ -168,7 +168,6 @@ handle_publish(Msg, undefined) ->
handle_publish(Msg, Vars) -> handle_publish(Msg, Vars) ->
?SLOG(debug, #{msg => "publish_to_local_broker", ?SLOG(debug, #{msg => "publish_to_local_broker",
message => Msg, vars => Vars}), message => Msg, vars => Vars}),
emqx_metrics:inc('bridge.mqtt.message_received_from_remote', 1),
case Vars of case Vars of
#{on_message_received := {Mod, Func, Args}} -> #{on_message_received := {Mod, Func, Args}} ->
_ = erlang:apply(Mod, Func, [Msg | Args]); _ = erlang:apply(Mod, Func, [Msg | Args]);

View File

@ -61,7 +61,7 @@ make_pub_vars(Mountpoint, Conf) when is_map(Conf) ->
-> exp_msg(). -> exp_msg().
to_remote_msg(#message{flags = Flags0} = Msg, Vars) -> to_remote_msg(#message{flags = Flags0} = Msg, Vars) ->
Retain0 = maps:get(retain, Flags0, false), Retain0 = maps:get(retain, Flags0, false),
MapMsg = maps:put(retain, Retain0, emqx_message:to_map(Msg)), MapMsg = maps:put(retain, Retain0, emqx_rule_events:eventmsg_publish(Msg)),
to_remote_msg(MapMsg, Vars); to_remote_msg(MapMsg, Vars);
to_remote_msg(MapMsg, #{remote_topic := TopicToken, payload := PayloadToken, to_remote_msg(MapMsg, #{remote_topic := TopicToken, payload := PayloadToken,
remote_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) when is_map(MapMsg) -> remote_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) when is_map(MapMsg) ->
@ -78,9 +78,10 @@ to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) ->
Msg#message{topic = topic(Mountpoint, Topic)}. Msg#message{topic = topic(Mountpoint, Topic)}.
%% published from remote node over a MQTT connection %% published from remote node over a MQTT connection
to_broker_msg(#{dup := Dup, properties := Props} = MapMsg, to_broker_msg(#{dup := Dup, properties := Props} = MapMsg0,
#{local_topic := TopicToken, payload := PayloadToken, #{local_topic := TopicToken, payload := PayloadToken,
local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) -> local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) ->
MapMsg = format_msg_received(MapMsg0),
Topic = replace_vars_in_str(TopicToken, MapMsg), Topic = replace_vars_in_str(TopicToken, MapMsg),
Payload = process_payload(PayloadToken, MapMsg), Payload = process_payload(PayloadToken, MapMsg),
QoS = replace_simple_var(QoSToken, MapMsg), QoS = replace_simple_var(QoSToken, MapMsg),
@ -89,6 +90,33 @@ to_broker_msg(#{dup := Dup, properties := Props} = MapMsg,
emqx_message:set_flags(#{dup => Dup, retain => Retain}, emqx_message:set_flags(#{dup => Dup, retain => Retain},
emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload))). emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload))).
format_msg_received(#{dup := Dup, payload := Payload, properties := Props,
qos := QoS, retain := Retain, topic := Topic}) ->
#{event => '$bridges/mqtt',
id => emqx_guid:to_hexstr(emqx_guid:gen()),
payload => Payload,
topic => Topic,
qos => QoS,
flags => #{dup => Dup, retain => Retain},
pub_props => printable_maps(Props),
timestamp => erlang:system_time(millisecond),
node => node()
}.
printable_maps(undefined) -> #{};
printable_maps(Headers) ->
maps:fold(
fun ('User-Property', V0, AccIn) when is_list(V0) ->
AccIn#{
'User-Property' => maps:from_list(V0),
'User-Property-Pairs' => [#{
key => Key,
value => Value
} || {Key, Value} <- V0]
};
(K, V0, AccIn) -> AccIn#{K => V0}
end, #{}, Headers).
process_payload([], Msg) -> process_payload([], Msg) ->
emqx_json:encode(Msg); emqx_json:encode(Msg);
process_payload(Tks, Msg) -> process_payload(Tks, Msg) ->

View File

@ -39,7 +39,7 @@ fields("config") ->
fields("connector") -> fields("connector") ->
[ {mode, [ {mode,
sc(hoconsc:enum([cluster_singleton, cluster_shareload]), sc(hoconsc:enum([cluster_shareload]),
#{ default => cluster_shareload #{ default => cluster_shareload
, desc => """ , desc => """
The mode of the MQTT Bridge. Can be one of 'cluster_singleton' or 'cluster_shareload'<br> The mode of the MQTT Bridge. Can be one of 'cluster_singleton' or 'cluster_shareload'<br>
@ -55,12 +55,17 @@ clientid conflicts between different nodes. And we can only use shared subscript
topic filters for 'remote_topic' of ingress connections. topic filters for 'remote_topic' of ingress connections.
""" """
})} })}
, {name,
sc(binary(),
#{ nullable => true
, desc => "Connector name, used as a human-readable description of the connector."
})}
, {server, , {server,
sc(emqx_schema:ip_port(), sc(emqx_schema:ip_port(),
#{ default => "127.0.0.1:1883" #{ default => "127.0.0.1:1883"
, desc => "The host and port of the remote MQTT broker" , desc => "The host and port of the remote MQTT broker"
})} })}
, {reconnect_interval, mk_duration("reconnect interval", #{default => "30s"})} , {reconnect_interval, mk_duration("reconnect interval", #{default => "15s"})}
, {proto_ver, , {proto_ver,
sc(hoconsc:enum([v3, v4, v5]), sc(hoconsc:enum([v3, v4, v5]),
#{ default => v4 #{ default => v4
@ -76,17 +81,13 @@ topic filters for 'remote_topic' of ingress connections.
#{ default => "emqx" #{ default => "emqx"
, desc => "The password of the MQTT protocol" , desc => "The password of the MQTT protocol"
})} })}
, {clientid,
sc(binary(),
#{ desc => "The clientid of the MQTT protocol"
})}
, {clean_start, , {clean_start,
sc(boolean(), sc(boolean(),
#{ default => true #{ default => true
, desc => "The clean-start or the clean-session of the MQTT protocol" , desc => "The clean-start or the clean-session of the MQTT protocol"
})} })}
, {keepalive, mk_duration("keepalive", #{default => "300s"})} , {keepalive, mk_duration("keepalive", #{default => "300s"})}
, {retry_interval, mk_duration("retry interval", #{default => "30s"})} , {retry_interval, mk_duration("retry interval", #{default => "15s"})}
, {max_inflight, , {max_inflight,
sc(integer(), sc(integer(),
#{ default => 32 #{ default => 32

View File

@ -26,11 +26,8 @@
-define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>). -define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>).
-define(CONNECTR_TYPE, <<"mqtt">>). -define(CONNECTR_TYPE, <<"mqtt">>).
-define(CONNECTR_NAME, <<"test_connector">>). -define(CONNECTR_NAME, <<"test_connector">>).
-define(CONNECTR_ID, <<"mqtt:test_connector">>).
-define(BRIDGE_NAME_INGRESS, <<"ingress_test_bridge">>). -define(BRIDGE_NAME_INGRESS, <<"ingress_test_bridge">>).
-define(BRIDGE_NAME_EGRESS, <<"egress_test_bridge">>). -define(BRIDGE_NAME_EGRESS, <<"egress_test_bridge">>).
-define(BRIDGE_ID_INGRESS, <<"mqtt:ingress_test_bridge">>).
-define(BRIDGE_ID_EGRESS, <<"mqtt:egress_test_bridge">>).
-define(MQTT_CONNECOTR(Username), -define(MQTT_CONNECOTR(Username),
#{ #{
<<"server">> => <<"127.0.0.1:1883">>, <<"server">> => <<"127.0.0.1:1883">>,
@ -123,8 +120,7 @@ t_mqtt_crud_apis(_) ->
, <<"name">> => ?CONNECTR_NAME , <<"name">> => ?CONNECTR_NAME
}), }),
%ct:pal("---connector: ~p", [Connector]), #{ <<"id">> := ConnctorID
?assertMatch(#{ <<"id">> := ?CONNECTR_ID
, <<"type">> := ?CONNECTR_TYPE , <<"type">> := ?CONNECTR_TYPE
, <<"name">> := ?CONNECTR_NAME , <<"name">> := ?CONNECTR_NAME
, <<"server">> := <<"127.0.0.1:1883">> , <<"server">> := <<"127.0.0.1:1883">>
@ -132,23 +128,13 @@ t_mqtt_crud_apis(_) ->
, <<"password">> := <<"">> , <<"password">> := <<"">>
, <<"proto_ver">> := <<"v4">> , <<"proto_ver">> := <<"v4">>
, <<"ssl">> := #{<<"enable">> := false} , <<"ssl">> := #{<<"enable">> := false}
}, jsx:decode(Connector)), } = jsx:decode(Connector),
%% create a again returns an error
{ok, 400, RetMsg} = request(post, uri(["connectors"]),
?MQTT_CONNECOTR(User1)#{ <<"type">> => ?CONNECTR_TYPE
, <<"name">> => ?CONNECTR_NAME
}),
?assertMatch(
#{ <<"code">> := _
, <<"message">> := <<"connector already exists">>
}, jsx:decode(RetMsg)),
%% update the request-path of the connector %% update the request-path of the connector
User2 = <<"user2">>, User2 = <<"user2">>,
{ok, 200, Connector2} = request(put, uri(["connectors", ?CONNECTR_ID]), {ok, 200, Connector2} = request(put, uri(["connectors", ConnctorID]),
?MQTT_CONNECOTR(User2)), ?MQTT_CONNECOTR(User2)),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID ?assertMatch(#{ <<"id">> := ConnctorID
, <<"server">> := <<"127.0.0.1:1883">> , <<"server">> := <<"127.0.0.1:1883">>
, <<"username">> := User2 , <<"username">> := User2
, <<"password">> := <<"">> , <<"password">> := <<"">>
@ -158,7 +144,7 @@ t_mqtt_crud_apis(_) ->
%% list all connectors again, assert Connector2 is in it %% list all connectors again, assert Connector2 is in it
{ok, 200, Connector2Str} = request(get, uri(["connectors"]), []), {ok, 200, Connector2Str} = request(get, uri(["connectors"]), []),
?assertMatch([#{ <<"id">> := ?CONNECTR_ID ?assertMatch([#{ <<"id">> := ConnctorID
, <<"type">> := ?CONNECTR_TYPE , <<"type">> := ?CONNECTR_TYPE
, <<"name">> := ?CONNECTR_NAME , <<"name">> := ?CONNECTR_NAME
, <<"server">> := <<"127.0.0.1:1883">> , <<"server">> := <<"127.0.0.1:1883">>
@ -169,8 +155,8 @@ t_mqtt_crud_apis(_) ->
}], jsx:decode(Connector2Str)), }], jsx:decode(Connector2Str)),
%% get the connector by id %% get the connector by id
{ok, 200, Connector3Str} = request(get, uri(["connectors", ?CONNECTR_ID]), []), {ok, 200, Connector3Str} = request(get, uri(["connectors", ConnctorID]), []),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID ?assertMatch(#{ <<"id">> := ConnctorID
, <<"type">> := ?CONNECTR_TYPE , <<"type">> := ?CONNECTR_TYPE
, <<"name">> := ?CONNECTR_NAME , <<"name">> := ?CONNECTR_NAME
, <<"server">> := <<"127.0.0.1:1883">> , <<"server">> := <<"127.0.0.1:1883">>
@ -181,11 +167,11 @@ t_mqtt_crud_apis(_) ->
}, jsx:decode(Connector3Str)), }, jsx:decode(Connector3Str)),
%% delete the connector %% delete the connector
{ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []), {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
%% update a deleted connector returns an error %% update a deleted connector returns an error
{ok, 404, ErrMsg2} = request(put, uri(["connectors", ?CONNECTR_ID]), {ok, 404, ErrMsg2} = request(put, uri(["connectors", ConnctorID]),
?MQTT_CONNECOTR(User2)), ?MQTT_CONNECOTR(User2)),
?assertMatch( ?assertMatch(
#{ <<"code">> := _ #{ <<"code">> := _
@ -205,28 +191,28 @@ t_mqtt_conn_bridge_ingress(_) ->
, <<"name">> => ?CONNECTR_NAME , <<"name">> => ?CONNECTR_NAME
}), }),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID #{ <<"id">> := ConnctorID
, <<"server">> := <<"127.0.0.1:1883">> , <<"server">> := <<"127.0.0.1:1883">>
, <<"num_of_bridges">> := 0 , <<"num_of_bridges">> := 0
, <<"username">> := User1 , <<"username">> := User1
, <<"password">> := <<"">> , <<"password">> := <<"">>
, <<"proto_ver">> := <<"v4">> , <<"proto_ver">> := <<"v4">>
, <<"ssl">> := #{<<"enable">> := false} , <<"ssl">> := #{<<"enable">> := false}
}, jsx:decode(Connector)), } = jsx:decode(Connector),
%% ... and a MQTT bridge, using POST %% ... and a MQTT bridge, using POST
%% we bind this bridge to the connector created just now %% we bind this bridge to the connector created just now
{ok, 201, Bridge} = request(post, uri(["bridges"]), {ok, 201, Bridge} = request(post, uri(["bridges"]),
?MQTT_BRIDGE_INGRESS(?CONNECTR_ID)#{ ?MQTT_BRIDGE_INGRESS(ConnctorID)#{
<<"type">> => ?CONNECTR_TYPE, <<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_INGRESS <<"name">> => ?BRIDGE_NAME_INGRESS
}), }),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID_INGRESS #{ <<"id">> := BridgeIDIngress
, <<"type">> := <<"mqtt">> , <<"type">> := <<"mqtt">>
, <<"status">> := <<"connected">> , <<"status">> := <<"connected">>
, <<"connector">> := ?CONNECTR_ID , <<"connector">> := ConnctorID
}, jsx:decode(Bridge)), } = jsx:decode(Bridge),
%% we now test if the bridge works as expected %% we now test if the bridge works as expected
@ -252,17 +238,17 @@ t_mqtt_conn_bridge_ingress(_) ->
end), end),
%% get the connector by id, verify the num_of_bridges now is 1 %% get the connector by id, verify the num_of_bridges now is 1
{ok, 200, Connector1Str} = request(get, uri(["connectors", ?CONNECTR_ID]), []), {ok, 200, Connector1Str} = request(get, uri(["connectors", ConnctorID]), []),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID ?assertMatch(#{ <<"id">> := ConnctorID
, <<"num_of_bridges">> := 1 , <<"num_of_bridges">> := 1
}, jsx:decode(Connector1Str)), }, jsx:decode(Connector1Str)),
%% delete the bridge %% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_INGRESS]), []), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% delete the connector %% delete the connector
{ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []), {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
ok. ok.
@ -279,29 +265,28 @@ t_mqtt_conn_bridge_egress(_) ->
}), }),
%ct:pal("---connector: ~p", [Connector]), %ct:pal("---connector: ~p", [Connector]),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID #{ <<"id">> := ConnctorID
, <<"server">> := <<"127.0.0.1:1883">> , <<"server">> := <<"127.0.0.1:1883">>
, <<"username">> := User1 , <<"username">> := User1
, <<"password">> := <<"">> , <<"password">> := <<"">>
, <<"proto_ver">> := <<"v4">> , <<"proto_ver">> := <<"v4">>
, <<"ssl">> := #{<<"enable">> := false} , <<"ssl">> := #{<<"enable">> := false}
}, jsx:decode(Connector)), } = jsx:decode(Connector),
%% ... and a MQTT bridge, using POST %% ... and a MQTT bridge, using POST
%% we bind this bridge to the connector created just now %% we bind this bridge to the connector created just now
{ok, 201, Bridge} = request(post, uri(["bridges"]), {ok, 201, Bridge} = request(post, uri(["bridges"]),
?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{ ?MQTT_BRIDGE_EGRESS(ConnctorID)#{
<<"type">> => ?CONNECTR_TYPE, <<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_EGRESS <<"name">> => ?BRIDGE_NAME_EGRESS
}), }),
%ct:pal("---bridge: ~p", [Bridge]), #{ <<"id">> := BridgeIDEgress
?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS
, <<"type">> := ?CONNECTR_TYPE , <<"type">> := ?CONNECTR_TYPE
, <<"name">> := ?BRIDGE_NAME_EGRESS , <<"name">> := ?BRIDGE_NAME_EGRESS
, <<"status">> := <<"connected">> , <<"status">> := <<"connected">>
, <<"connector">> := ?CONNECTR_ID , <<"connector">> := ConnctorID
}, jsx:decode(Bridge)), } = jsx:decode(Bridge),
%% we now test if the bridge works as expected %% we now test if the bridge works as expected
LocalTopic = <<"local_topic/1">>, LocalTopic = <<"local_topic/1">>,
@ -326,19 +311,19 @@ t_mqtt_conn_bridge_egress(_) ->
end), end),
%% verify the metrics of the bridge %% verify the metrics of the bridge
{ok, 200, BridgeStr} = request(get, uri(["bridges", ?BRIDGE_ID_EGRESS]), []), {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS ?assertMatch(#{ <<"id">> := BridgeIDEgress
, <<"metrics">> := ?metrics(1, 1, 0, _, _, _) , <<"metrics">> := ?metrics(1, 1, 0, _, _, _)
, <<"node_metrics">> := , <<"node_metrics">> :=
[#{<<"node">> := _, <<"metrics">> := ?metrics(1, 1, 0, _, _, _)}] [#{<<"node">> := _, <<"metrics">> := ?metrics(1, 1, 0, _, _, _)}]
}, jsx:decode(BridgeStr)), }, jsx:decode(BridgeStr)),
%% delete the bridge %% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% delete the connector %% delete the connector
{ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []), {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
ok. ok.
@ -358,37 +343,37 @@ t_mqtt_conn_update(_) ->
}), }),
%ct:pal("---connector: ~p", [Connector]), %ct:pal("---connector: ~p", [Connector]),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID #{ <<"id">> := ConnctorID
, <<"server">> := <<"127.0.0.1:1883">> , <<"server">> := <<"127.0.0.1:1883">>
}, jsx:decode(Connector)), } = jsx:decode(Connector),
%% ... and a MQTT bridge, using POST %% ... and a MQTT bridge, using POST
%% we bind this bridge to the connector created just now %% we bind this bridge to the connector created just now
{ok, 201, Bridge} = request(post, uri(["bridges"]), {ok, 201, Bridge} = request(post, uri(["bridges"]),
?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{ ?MQTT_BRIDGE_EGRESS(ConnctorID)#{
<<"type">> => ?CONNECTR_TYPE, <<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_EGRESS <<"name">> => ?BRIDGE_NAME_EGRESS
}), }),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS #{ <<"id">> := BridgeIDEgress
, <<"type">> := <<"mqtt">> , <<"type">> := <<"mqtt">>
, <<"name">> := ?BRIDGE_NAME_EGRESS , <<"name">> := ?BRIDGE_NAME_EGRESS
, <<"status">> := <<"connected">> , <<"status">> := <<"connected">>
, <<"connector">> := ?CONNECTR_ID , <<"connector">> := ConnctorID
}, jsx:decode(Bridge)), } = jsx:decode(Bridge),
%% then we try to update 'server' of the connector, to an unavailable IP address %% then we try to update 'server' of the connector, to an unavailable IP address
%% the update should fail because of 'unreachable' or 'connrefused' %% the update should fail because of 'unreachable' or 'connrefused'
{ok, 400, _ErrorMsg} = request(put, uri(["connectors", ?CONNECTR_ID]), {ok, 400, _ErrorMsg} = request(put, uri(["connectors", ConnctorID]),
?MQTT_CONNECOTR2(<<"127.0.0.1:2603">>)), ?MQTT_CONNECOTR2(<<"127.0.0.1:2603">>)),
%% we fix the 'server' parameter to a normal one, it should work %% we fix the 'server' parameter to a normal one, it should work
{ok, 200, _} = request(put, uri(["connectors", ?CONNECTR_ID]), {ok, 200, _} = request(put, uri(["connectors", ConnctorID]),
?MQTT_CONNECOTR2(<<"127.0.0.1 : 1883">>)), ?MQTT_CONNECOTR2(<<"127.0.0.1 : 1883">>)),
%% delete the bridge %% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% delete the connector %% delete the connector
{ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []), {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []). {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []).
t_mqtt_conn_update2(_) -> t_mqtt_conn_update2(_) ->
@ -404,36 +389,36 @@ t_mqtt_conn_update2(_) ->
, <<"name">> => ?CONNECTR_NAME , <<"name">> => ?CONNECTR_NAME
}), }),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID #{ <<"id">> := ConnctorID
, <<"server">> := <<"127.0.0.1:2603">> , <<"server">> := <<"127.0.0.1:2603">>
}, jsx:decode(Connector)), } = jsx:decode(Connector),
%% ... and a MQTT bridge, using POST %% ... and a MQTT bridge, using POST
%% we bind this bridge to the connector created just now %% we bind this bridge to the connector created just now
{ok, 201, Bridge} = request(post, uri(["bridges"]), {ok, 201, Bridge} = request(post, uri(["bridges"]),
?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{ ?MQTT_BRIDGE_EGRESS(ConnctorID)#{
<<"type">> => ?CONNECTR_TYPE, <<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_EGRESS <<"name">> => ?BRIDGE_NAME_EGRESS
}), }),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS #{ <<"id">> := BridgeIDEgress
, <<"type">> := <<"mqtt">> , <<"type">> := <<"mqtt">>
, <<"name">> := ?BRIDGE_NAME_EGRESS , <<"name">> := ?BRIDGE_NAME_EGRESS
, <<"status">> := <<"disconnected">> , <<"status">> := <<"disconnected">>
, <<"connector">> := ?CONNECTR_ID , <<"connector">> := ConnctorID
}, jsx:decode(Bridge)), } = jsx:decode(Bridge),
%% we fix the 'server' parameter to a normal one, it should work %% we fix the 'server' parameter to a normal one, it should work
{ok, 200, _} = request(put, uri(["connectors", ?CONNECTR_ID]), {ok, 200, _} = request(put, uri(["connectors", ConnctorID]),
?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)), ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)),
{ok, 200, BridgeStr} = request(get, uri(["bridges", ?BRIDGE_ID_EGRESS]), []), {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS ?assertMatch(#{ <<"id">> := BridgeIDEgress
, <<"status">> := <<"connected">> , <<"status">> := <<"connected">>
}, jsx:decode(BridgeStr)), }, jsx:decode(BridgeStr)),
%% delete the bridge %% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% delete the connector %% delete the connector
{ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []), {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []). {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []).
t_mqtt_conn_testing(_) -> t_mqtt_conn_testing(_) ->

View File

@ -96,7 +96,6 @@ reboot_apps() ->
, emqx_resource , emqx_resource
, emqx_rule_engine , emqx_rule_engine
, emqx_bridge , emqx_bridge
, emqx_bridge_mqtt
, emqx_plugin_libs , emqx_plugin_libs
, emqx_management , emqx_management
, emqx_retainer , emqx_retainer
@ -112,17 +111,17 @@ sorted_reboot_apps() ->
app_deps(App) -> app_deps(App) ->
case application:get_key(App, applications) of case application:get_key(App, applications) of
undefined -> []; undefined -> undefined;
{ok, List} -> lists:filter(fun(A) -> lists:member(A, reboot_apps()) end, List) {ok, List} -> lists:filter(fun(A) -> lists:member(A, reboot_apps()) end, List)
end. end.
sorted_reboot_apps(Apps) -> sorted_reboot_apps(Apps) ->
G = digraph:new(), G = digraph:new(),
try try
lists:foreach(fun({App, Deps}) -> add_app(G, App, Deps) end, Apps), NoDepApps = add_apps_to_digraph(G, Apps),
case digraph_utils:topsort(G) of case digraph_utils:topsort(G) of
Sorted when is_list(Sorted) -> Sorted when is_list(Sorted) ->
Sorted; Sorted ++ (NoDepApps -- Sorted);
false -> false ->
Loops = find_loops(G), Loops = find_loops(G),
error({circular_application_dependency, Loops}) error({circular_application_dependency, Loops})
@ -131,17 +130,29 @@ sorted_reboot_apps(Apps) ->
digraph:delete(G) digraph:delete(G)
end. end.
add_app(G, App, undefined) -> add_apps_to_digraph(G, Apps) ->
lists:foldl(fun
({App, undefined}, Acc) ->
?SLOG(debug, #{msg => "app_is_not_loaded", app => App}),
Acc;
({App, []}, Acc) ->
Acc ++ [App]; %% use '++' to keep the original order
({App, Deps}, Acc) ->
add_app_deps_to_digraph(G, App, Deps),
Acc
end, [], Apps).
add_app_deps_to_digraph(G, App, undefined) ->
?SLOG(debug, #{msg => "app_is_not_loaded", app => App}), ?SLOG(debug, #{msg => "app_is_not_loaded", app => App}),
%% not loaded %% not loaded
add_app(G, App, []); add_app_deps_to_digraph(G, App, []);
add_app(_G, _App, []) -> add_app_deps_to_digraph(_G, _App, []) ->
ok; ok;
add_app(G, App, [Dep | Deps]) -> add_app_deps_to_digraph(G, App, [Dep | Deps]) ->
digraph:add_vertex(G, App), digraph:add_vertex(G, App),
digraph:add_vertex(G, Dep), digraph:add_vertex(G, Dep),
digraph:add_edge(G, Dep, App), %% dep -> app as dependency digraph:add_edge(G, Dep, App), %% dep -> app as dependency
add_app(G, App, Deps). add_app_deps_to_digraph(G, App, Deps).
find_loops(G) -> find_loops(G) ->
lists:filtermap( lists:filtermap(

View File

@ -43,7 +43,7 @@ init_per_suite(Config) ->
%% %%
application:unload(emqx_authz), application:unload(emqx_authz),
emqx_common_test_helpers:start_apps([]), emqx_common_test_helpers:start_apps([emqx_conf]),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->

View File

@ -149,7 +149,7 @@ do_recreate(InstId, ResourceType, NewConfig, Params) ->
TestInstId = iolist_to_binary(emqx_misc:gen_id(16)), TestInstId = iolist_to_binary(emqx_misc:gen_id(16)),
case do_create_dry_run(TestInstId, ResourceType, Config) of case do_create_dry_run(TestInstId, ResourceType, Config) of
ok -> ok ->
do_remove(ResourceType, InstId, ResourceState), do_remove(ResourceType, InstId, ResourceState, false),
do_create(InstId, ResourceType, Config, #{force_create => true}); do_create(InstId, ResourceType, Config, #{force_create => true});
Error -> Error ->
Error Error
@ -208,15 +208,23 @@ do_remove(InstId) ->
end. end.
do_remove(Mod, InstId, ResourceState) -> do_remove(Mod, InstId, ResourceState) ->
do_remove(Mod, InstId, ResourceState, true).
do_remove(Mod, InstId, ResourceState, ClearMetrics) ->
_ = emqx_resource:call_stop(InstId, Mod, ResourceState), _ = emqx_resource:call_stop(InstId, Mod, ResourceState),
ets:delete(emqx_resource_instance, InstId), ets:delete(emqx_resource_instance, InstId),
ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId), case ClearMetrics of
ok. true -> ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId);
false -> ok
end.
do_restart(InstId) -> do_restart(InstId) ->
case lookup(InstId) of case lookup(InstId) of
{ok, #{mod := Mod, state := ResourceState, config := Config} = Data} -> {ok, #{mod := Mod, state := ResourceState, config := Config} = Data} ->
_ = emqx_resource:call_stop(InstId, Mod, ResourceState), _ = case ResourceState of
undefined -> ok;
_ -> emqx_resource:call_stop(InstId, Mod, ResourceState)
end,
case emqx_resource:call_start(InstId, Mod, Config) of case emqx_resource:call_start(InstId, Mod, Config) of
{ok, NewResourceState} -> {ok, NewResourceState} ->
ets:insert(emqx_resource_instance, ets:insert(emqx_resource_instance,

View File

@ -187,11 +187,11 @@ init([]) ->
{ok, #{}}. {ok, #{}}.
handle_call({insert_rule, Rule}, _From, State) -> handle_call({insert_rule, Rule}, _From, State) ->
_ = emqx_plugin_libs_rule:cluster_call(?MODULE, do_insert_rule, [Rule]), do_insert_rule(Rule),
{reply, ok, State}; {reply, ok, State};
handle_call({delete_rule, Rule}, _From, State) -> handle_call({delete_rule, Rule}, _From, State) ->
_ = emqx_plugin_libs_rule:cluster_call(?MODULE, do_delete_rule, [Rule]), do_delete_rule(Rule),
{reply, ok, State}; {reply, ok, State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->

View File

@ -172,7 +172,7 @@ param_path_id() ->
{ok, _Rule} -> {ok, _Rule} ->
{400, #{code => 'BAD_ARGS', message => <<"rule id already exists">>}}; {400, #{code => 'BAD_ARGS', message => <<"rule id already exists">>}};
not_found -> not_found ->
case emqx:update_config(ConfPath, Params, #{}) of case emqx_conf:update(ConfPath, Params, #{}) of
{ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} -> {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
[Rule] = get_one_rule(AllRules, Id), [Rule] = get_one_rule(AllRules, Id),
{201, format_rule_resp(Rule)}; {201, format_rule_resp(Rule)};
@ -200,7 +200,7 @@ param_path_id() ->
'/rules/:id'(put, #{bindings := #{id := Id}, body := Params0}) -> '/rules/:id'(put, #{bindings := #{id := Id}, body := Params0}) ->
Params = filter_out_request_body(Params0), Params = filter_out_request_body(Params0),
ConfPath = emqx_rule_engine:config_key_path() ++ [Id], ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
case emqx:update_config(ConfPath, Params, #{}) of case emqx_conf:update(ConfPath, Params, #{}) of
{ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} -> {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
[Rule] = get_one_rule(AllRules, Id), [Rule] = get_one_rule(AllRules, Id),
{200, format_rule_resp(Rule)}; {200, format_rule_resp(Rule)};

View File

@ -56,7 +56,7 @@
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.1"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.1"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.9"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.9"}}}
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}}
, {replayq, "0.3.3"} , {replayq, "0.3.3"}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
, {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.4.3"}}} , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.4.3"}}}