fix(bridge): convert confs for http bridge when creating
This commit is contained in:
parent
f01f9632c1
commit
1e6884ee7d
|
@ -172,7 +172,7 @@ create_bridge(Type, Name, Conf) ->
|
||||||
config => Conf}),
|
config => Conf}),
|
||||||
ResId = resource_id(Type, Name),
|
ResId = resource_id(Type, Name),
|
||||||
case emqx_resource:create(ResId,
|
case emqx_resource:create(ResId,
|
||||||
emqx_bridge:resource_type(Type), Conf) of
|
emqx_bridge:resource_type(Type), parse_confs(Type, Conf)) of
|
||||||
{ok, already_created} ->
|
{ok, already_created} ->
|
||||||
emqx_resource:get_instance(ResId);
|
emqx_resource:get_instance(ResId);
|
||||||
{ok, Data} ->
|
{ok, Data} ->
|
||||||
|
@ -234,6 +234,40 @@ get_matched_bridge_id(#{from_local_topic := Filter}, Topic, BType, BName, Acc) -
|
||||||
false -> Acc
|
false -> Acc
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
parse_confs(http, #{ url := Url
|
||||||
|
, method := Method
|
||||||
|
, body := Body
|
||||||
|
, headers := Headers
|
||||||
|
, request_timeout := ReqTimeout
|
||||||
|
} = Conf) ->
|
||||||
|
{BaseUrl, Path} = parse_url(Url),
|
||||||
|
{ok, BaseUrl2} = emqx_http_lib:uri_parse(BaseUrl),
|
||||||
|
Conf#{ base_url => BaseUrl2
|
||||||
|
, request =>
|
||||||
|
#{ path => Path
|
||||||
|
, method => Method
|
||||||
|
, body => Body
|
||||||
|
, headers => Headers
|
||||||
|
, request_timeout => ReqTimeout
|
||||||
|
}
|
||||||
|
};
|
||||||
|
parse_confs(_Type, Conf) ->
|
||||||
|
Conf.
|
||||||
|
|
||||||
|
parse_url(Url) ->
|
||||||
|
case string:split(Url, "//", leading) of
|
||||||
|
[Scheme, UrlRem] ->
|
||||||
|
case string:split(UrlRem, "/", leading) of
|
||||||
|
[HostPort, Path] ->
|
||||||
|
{iolist_to_binary([Scheme, "//", HostPort]), Path};
|
||||||
|
[HostPort] ->
|
||||||
|
{iolist_to_binary([Scheme, "//", HostPort]), <<>>}
|
||||||
|
end;
|
||||||
|
[Url] ->
|
||||||
|
error({invalid_url, Url})
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
||||||
bin(Bin) when is_binary(Bin) -> Bin;
|
bin(Bin) when is_binary(Bin) -> Bin;
|
||||||
bin(Str) when is_list(Str) -> list_to_binary(Str);
|
bin(Str) when is_list(Str) -> list_to_binary(Str);
|
||||||
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
|
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
|
||||||
|
|
|
@ -67,54 +67,6 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
load_bridges(Configs) ->
|
load_bridges(Configs) ->
|
||||||
lists:foreach(fun({Type, NamedConf}) ->
|
lists:foreach(fun({Type, NamedConf}) ->
|
||||||
lists:foreach(fun({Name, Conf}) ->
|
lists:foreach(fun({Name, Conf}) ->
|
||||||
load_bridge(Type, Name, Conf)
|
emqx_bridge:create_bridge(Type, Name, Conf)
|
||||||
end, maps:to_list(NamedConf))
|
end, maps:to_list(NamedConf))
|
||||||
end, maps:to_list(Configs)).
|
end, maps:to_list(Configs)).
|
||||||
|
|
||||||
%% TODO: move this monitor into emqx_resource
|
|
||||||
%% emqx_resource:check_and_create_local(ResourceId, ResourceType, Config, #{keep_retry => true}).
|
|
||||||
load_bridge(<<"http">>, Name, Config) ->
|
|
||||||
do_load_bridge(<<"http">>, Name, parse_http_confs(Config));
|
|
||||||
load_bridge(Type, Name, Config) ->
|
|
||||||
do_load_bridge(Type, Name, Config).
|
|
||||||
|
|
||||||
do_load_bridge(Type, Name, Config) ->
|
|
||||||
case emqx_resource:check_and_create_local(
|
|
||||||
emqx_bridge:resource_id(Type, Name),
|
|
||||||
emqx_bridge:resource_type(Type), Config) of
|
|
||||||
{ok, already_created} -> ok;
|
|
||||||
{ok, _} -> ok;
|
|
||||||
{error, Reason} ->
|
|
||||||
error({load_bridge, Reason})
|
|
||||||
end.
|
|
||||||
|
|
||||||
parse_http_confs(#{ <<"url">> := Url
|
|
||||||
, <<"method">> := Method
|
|
||||||
, <<"body">> := Body
|
|
||||||
, <<"headers">> := Headers
|
|
||||||
, <<"request_timeout">> := ReqTimeout
|
|
||||||
} = Conf) ->
|
|
||||||
{BaseUrl, Path} = parse_url(Url),
|
|
||||||
Conf#{ <<"base_url">> => BaseUrl
|
|
||||||
, <<"request">> =>
|
|
||||||
#{ <<"path">> => Path
|
|
||||||
, <<"method">> => Method
|
|
||||||
, <<"body">> => Body
|
|
||||||
, <<"headers">> => Headers
|
|
||||||
, <<"request_timeout">> => ReqTimeout
|
|
||||||
}
|
|
||||||
}.
|
|
||||||
|
|
||||||
parse_url(Url) ->
|
|
||||||
case string:split(Url, "//", leading) of
|
|
||||||
[Scheme, UrlRem] ->
|
|
||||||
case string:split(UrlRem, "/", leading) of
|
|
||||||
[HostPort, Path] ->
|
|
||||||
{iolist_to_binary([Scheme, "//", HostPort]), Path};
|
|
||||||
[HostPort] ->
|
|
||||||
{iolist_to_binary([Scheme, "//", HostPort]), <<>>}
|
|
||||||
end;
|
|
||||||
[Url] ->
|
|
||||||
error({invalid_url, Url})
|
|
||||||
end.
|
|
||||||
|
|
||||||
|
|
|
@ -46,22 +46,19 @@ init_per_testcase(_, Config) ->
|
||||||
end_per_testcase(_, _Config) ->
|
end_per_testcase(_, _Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
-define(PATH1, <<"path1">>).
|
-define(URL1, <<"http://localhost:9901/path1">>).
|
||||||
-define(PATH2, <<"path2">>).
|
-define(URL2, <<"http://localhost:9901/path2">>).
|
||||||
-define(HTTP_BRIDGE(PATH),
|
-define(HTTP_BRIDGE(URL),
|
||||||
#{
|
#{
|
||||||
<<"base_url">> => <<"http://localhost:9901">>,
|
<<"url">> => URL,
|
||||||
<<"egress">> => #{
|
|
||||||
<<"a">> => #{
|
|
||||||
<<"from_local_topic">> => <<"emqx_http/#">>,
|
<<"from_local_topic">> => <<"emqx_http/#">>,
|
||||||
<<"method">> => <<"post">>,
|
<<"method">> => <<"post">>,
|
||||||
<<"path">> => PATH,
|
<<"ssl">> => #{<<"enable">> => false},
|
||||||
<<"body">> => <<"${payload}">>,
|
<<"body">> => <<"${payload}">>,
|
||||||
<<"headers">> => #{
|
<<"headers">> => #{
|
||||||
<<"content-type">> => <<"application/json">>
|
<<"content-type">> => <<"application/json">>
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -107,29 +104,25 @@ t_crud_apis(_) ->
|
||||||
%% then we add a http bridge now
|
%% then we add a http bridge now
|
||||||
{200, [Bridge]} = emqx_bridge_api:crud_bridges_cluster(put,
|
{200, [Bridge]} = emqx_bridge_api:crud_bridges_cluster(put,
|
||||||
#{ bindings => #{id => <<"http:test_bridge">>}
|
#{ bindings => #{id => <<"http:test_bridge">>}
|
||||||
, body => ?HTTP_BRIDGE(?PATH1)
|
, body => ?HTTP_BRIDGE(?URL1)
|
||||||
}),
|
}),
|
||||||
%ct:pal("---bridge: ~p", [Bridge]),
|
%ct:pal("---bridge: ~p", [Bridge]),
|
||||||
?assertMatch(#{ id := <<"http:test_bridge">>
|
?assertMatch(#{ id := <<"http:test_bridge">>
|
||||||
, bridge_type := http
|
, bridge_type := http
|
||||||
, is_connected := _
|
, is_connected := _
|
||||||
, node := _
|
, node := _
|
||||||
, <<"egress">> := #{
|
, <<"url">> := ?URL1
|
||||||
<<"a">> := #{<<"path">> := ?PATH1}
|
|
||||||
}
|
|
||||||
}, Bridge),
|
}, Bridge),
|
||||||
|
|
||||||
%% update the request-path of the bridge
|
%% update the request-path of the bridge
|
||||||
{200, [Bridge2]} = emqx_bridge_api:crud_bridges_cluster(put,
|
{200, [Bridge2]} = emqx_bridge_api:crud_bridges_cluster(put,
|
||||||
#{ bindings => #{id => <<"http:test_bridge">>}
|
#{ bindings => #{id => <<"http:test_bridge">>}
|
||||||
, body => ?HTTP_BRIDGE(?PATH2)
|
, body => ?HTTP_BRIDGE(?URL2)
|
||||||
}),
|
}),
|
||||||
?assertMatch(#{ id := <<"http:test_bridge">>
|
?assertMatch(#{ id := <<"http:test_bridge">>
|
||||||
, bridge_type := http
|
, bridge_type := http
|
||||||
, is_connected := _
|
, is_connected := _
|
||||||
, <<"egress">> := #{
|
, <<"url">> := ?URL2
|
||||||
<<"a">> := #{<<"path">> := ?PATH2}
|
|
||||||
}
|
|
||||||
}, Bridge2),
|
}, Bridge2),
|
||||||
|
|
||||||
%% list all bridges again, assert Bridge2 is in it
|
%% list all bridges again, assert Bridge2 is in it
|
||||||
|
|
|
@ -127,6 +127,7 @@ sc(Type, Meta) -> hoconsc:mk(Type, Meta).
|
||||||
ref(Field) -> hoconsc:ref(?MODULE, Field).
|
ref(Field) -> hoconsc:ref(?MODULE, Field).
|
||||||
|
|
||||||
%% ===================================================================
|
%% ===================================================================
|
||||||
|
|
||||||
on_start(InstId, #{base_url := #{scheme := Scheme,
|
on_start(InstId, #{base_url := #{scheme := Scheme,
|
||||||
host := Host,
|
host := Host,
|
||||||
port := Port,
|
port := Port,
|
||||||
|
|
Loading…
Reference in New Issue