diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 7db5882b2..6fd70ac49 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -172,7 +172,7 @@ create_bridge(Type, Name, Conf) -> config => Conf}), ResId = resource_id(Type, Name), case emqx_resource:create(ResId, - emqx_bridge:resource_type(Type), Conf) of + emqx_bridge:resource_type(Type), parse_confs(Type, Conf)) of {ok, already_created} -> emqx_resource:get_instance(ResId); {ok, Data} -> @@ -234,6 +234,40 @@ get_matched_bridge_id(#{from_local_topic := Filter}, Topic, BType, BName, Acc) - false -> Acc end. +parse_confs(http, #{ url := Url + , method := Method + , body := Body + , headers := Headers + , request_timeout := ReqTimeout + } = Conf) -> + {BaseUrl, Path} = parse_url(Url), + {ok, BaseUrl2} = emqx_http_lib:uri_parse(BaseUrl), + Conf#{ base_url => BaseUrl2 + , request => + #{ path => Path + , method => Method + , body => Body + , headers => Headers + , request_timeout => ReqTimeout + } + }; +parse_confs(_Type, Conf) -> + Conf. + +parse_url(Url) -> + case string:split(Url, "//", leading) of + [Scheme, UrlRem] -> + case string:split(UrlRem, "/", leading) of + [HostPort, Path] -> + {iolist_to_binary([Scheme, "//", HostPort]), Path}; + [HostPort] -> + {iolist_to_binary([Scheme, "//", HostPort]), <<>>} + end; + [Url] -> + error({invalid_url, Url}) + end. + + bin(Bin) when is_binary(Bin) -> Bin; bin(Str) when is_list(Str) -> list_to_binary(Str); bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). diff --git a/apps/emqx_bridge/src/emqx_bridge_monitor.erl b/apps/emqx_bridge/src/emqx_bridge_monitor.erl index c0068f34c..9c7024c51 100644 --- a/apps/emqx_bridge/src/emqx_bridge_monitor.erl +++ b/apps/emqx_bridge/src/emqx_bridge_monitor.erl @@ -67,54 +67,6 @@ code_change(_OldVsn, State, _Extra) -> load_bridges(Configs) -> lists:foreach(fun({Type, NamedConf}) -> lists:foreach(fun({Name, Conf}) -> - load_bridge(Type, Name, Conf) + emqx_bridge:create_bridge(Type, Name, Conf) end, maps:to_list(NamedConf)) end, maps:to_list(Configs)). - -%% TODO: move this monitor into emqx_resource -%% emqx_resource:check_and_create_local(ResourceId, ResourceType, Config, #{keep_retry => true}). -load_bridge(<<"http">>, Name, Config) -> - do_load_bridge(<<"http">>, Name, parse_http_confs(Config)); -load_bridge(Type, Name, Config) -> - do_load_bridge(Type, Name, Config). - -do_load_bridge(Type, Name, Config) -> - case emqx_resource:check_and_create_local( - emqx_bridge:resource_id(Type, Name), - emqx_bridge:resource_type(Type), Config) of - {ok, already_created} -> ok; - {ok, _} -> ok; - {error, Reason} -> - error({load_bridge, Reason}) - end. - -parse_http_confs(#{ <<"url">> := Url - , <<"method">> := Method - , <<"body">> := Body - , <<"headers">> := Headers - , <<"request_timeout">> := ReqTimeout - } = Conf) -> - {BaseUrl, Path} = parse_url(Url), - Conf#{ <<"base_url">> => BaseUrl - , <<"request">> => - #{ <<"path">> => Path - , <<"method">> => Method - , <<"body">> => Body - , <<"headers">> => Headers - , <<"request_timeout">> => ReqTimeout - } - }. - -parse_url(Url) -> - case string:split(Url, "//", leading) of - [Scheme, UrlRem] -> - case string:split(UrlRem, "/", leading) of - [HostPort, Path] -> - {iolist_to_binary([Scheme, "//", HostPort]), Path}; - [HostPort] -> - {iolist_to_binary([Scheme, "//", HostPort]), <<>>} - end; - [Url] -> - error({invalid_url, Url}) - end. - diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 65f612f21..d0a7aeec6 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -46,22 +46,19 @@ init_per_testcase(_, Config) -> end_per_testcase(_, _Config) -> ok. --define(PATH1, <<"path1">>). --define(PATH2, <<"path2">>). --define(HTTP_BRIDGE(PATH), +-define(URL1, <<"http://localhost:9901/path1">>). +-define(URL2, <<"http://localhost:9901/path2">>). +-define(HTTP_BRIDGE(URL), #{ - <<"base_url">> => <<"http://localhost:9901">>, - <<"egress">> => #{ - <<"a">> => #{ - <<"from_local_topic">> => <<"emqx_http/#">>, - <<"method">> => <<"post">>, - <<"path">> => PATH, - <<"body">> => <<"${payload}">>, - <<"headers">> => #{ - <<"content-type">> => <<"application/json">> - } - } + <<"url">> => URL, + <<"from_local_topic">> => <<"emqx_http/#">>, + <<"method">> => <<"post">>, + <<"ssl">> => #{<<"enable">> => false}, + <<"body">> => <<"${payload}">>, + <<"headers">> => #{ + <<"content-type">> => <<"application/json">> } + }). %%------------------------------------------------------------------------------ @@ -107,29 +104,25 @@ t_crud_apis(_) -> %% then we add a http bridge now {200, [Bridge]} = emqx_bridge_api:crud_bridges_cluster(put, #{ bindings => #{id => <<"http:test_bridge">>} - , body => ?HTTP_BRIDGE(?PATH1) + , body => ?HTTP_BRIDGE(?URL1) }), %ct:pal("---bridge: ~p", [Bridge]), ?assertMatch(#{ id := <<"http:test_bridge">> , bridge_type := http , is_connected := _ , node := _ - , <<"egress">> := #{ - <<"a">> := #{<<"path">> := ?PATH1} - } + , <<"url">> := ?URL1 }, Bridge), %% update the request-path of the bridge {200, [Bridge2]} = emqx_bridge_api:crud_bridges_cluster(put, #{ bindings => #{id => <<"http:test_bridge">>} - , body => ?HTTP_BRIDGE(?PATH2) + , body => ?HTTP_BRIDGE(?URL2) }), ?assertMatch(#{ id := <<"http:test_bridge">> , bridge_type := http , is_connected := _ - , <<"egress">> := #{ - <<"a">> := #{<<"path">> := ?PATH2} - } + , <<"url">> := ?URL2 }, Bridge2), %% list all bridges again, assert Bridge2 is in it diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 388e56919..87ebf3863 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -127,6 +127,7 @@ sc(Type, Meta) -> hoconsc:mk(Type, Meta). ref(Field) -> hoconsc:ref(?MODULE, Field). %% =================================================================== + on_start(InstId, #{base_url := #{scheme := Scheme, host := Host, port := Port,