refactor(bridge): the configs for http bridges
This commit is contained in:
parent
90a65b8d04
commit
dd9e2c4b24
|
@ -2,9 +2,11 @@
|
||||||
## EMQ X Bridge
|
## EMQ X Bridge
|
||||||
##--------------------------------------------------------------------
|
##--------------------------------------------------------------------
|
||||||
|
|
||||||
#bridges.mqtt.my_mqtt_bridge_to_aws {
|
## MQTT bridges to/from another MQTT broker
|
||||||
|
#bridges.mqtt.my_mqtt_bridge_from_aws {
|
||||||
# server = "127.0.0.1:1883"
|
# server = "127.0.0.1:1883"
|
||||||
# proto_ver = "v4"
|
# proto_ver = "v4"
|
||||||
|
# clientid = "my_mqtt_bridge_from_aws"
|
||||||
# username = "username1"
|
# username = "username1"
|
||||||
# password = ""
|
# password = ""
|
||||||
# clean_start = true
|
# clean_start = true
|
||||||
|
@ -25,50 +27,75 @@
|
||||||
# certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
|
# certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
|
||||||
# cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
|
# cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
|
||||||
# }
|
# }
|
||||||
# ## We will create one MQTT connection for each element of the `ingress_channels`
|
#
|
||||||
# ## Syntax: ingress_channels.<id>
|
# ## topic mappings for this bridge
|
||||||
# ingress_channels.pull_msgs_from_aws {
|
# direction = in
|
||||||
# subscribe_remote_topic = "aws/#"
|
# from_remote_topic = "aws/#"
|
||||||
# subscribe_qos = 1
|
# subscribe_qos = 1
|
||||||
# local_topic = "from_aws/${topic}"
|
# to_local_topic = "from_aws/${topic}"
|
||||||
# payload = "${payload}"
|
# payload = "${payload}"
|
||||||
# qos = "${qos}"
|
# qos = "${qos}"
|
||||||
# retain = "${retain}"
|
# retain = "${retain}"
|
||||||
# }
|
|
||||||
# ## We will create one MQTT connection for each element of the `egress_channels`
|
|
||||||
# ## Syntax: egress_channels.<id>
|
|
||||||
# egress_channels.push_msgs_to_aws {
|
|
||||||
# subscribe_local_topic = "emqx/#"
|
|
||||||
# remote_topic = "from_emqx/${topic}"
|
|
||||||
# payload = "${payload}"
|
|
||||||
# qos = 1
|
|
||||||
# retain = false
|
|
||||||
# }
|
|
||||||
#}
|
#}
|
||||||
#
|
#
|
||||||
#bridges.http.my_http_bridge {
|
#bridges.mqtt.my_mqtt_bridge_to_aws {
|
||||||
# base_url: "http://localhost:9901"
|
# server = "127.0.0.1:1883"
|
||||||
# connect_timeout: "30s"
|
# proto_ver = "v4"
|
||||||
# max_retries: 3
|
# clientid = "my_mqtt_bridge_to_aws"
|
||||||
# retry_interval = "10s"
|
# username = "username1"
|
||||||
# pool_type = "hash"
|
# password = ""
|
||||||
# pool_size = 4
|
# clean_start = true
|
||||||
# enable_pipelining = true
|
# keepalive = 300
|
||||||
|
# retry_interval = "30s"
|
||||||
|
# max_inflight = 32
|
||||||
|
# reconnect_interval = "30s"
|
||||||
|
# bridge_mode = true
|
||||||
|
# replayq {
|
||||||
|
# dir = "{{ platform_data_dir }}/replayq/bridge_mqtt/"
|
||||||
|
# seg_bytes = "100MB"
|
||||||
|
# offload = false
|
||||||
|
# max_total_bytes = "1GB"
|
||||||
|
# }
|
||||||
# ssl {
|
# ssl {
|
||||||
# enable = false
|
# enable = false
|
||||||
# keyfile = "{{ platform_etc_dir }}/certs/client-key.pem"
|
# keyfile = "{{ platform_etc_dir }}/certs/client-key.pem"
|
||||||
# certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
|
# certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
|
||||||
# cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
|
# cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
|
||||||
# }
|
# }
|
||||||
# egress_channels.post_messages {
|
#
|
||||||
# subscribe_local_topic = "emqx_http/#"
|
# ## topic mappings for this bridge
|
||||||
# request_timeout: "30s"
|
# direction = out
|
||||||
# ## following config entries can use placehodler variables
|
# from_local_topic = "emqx/#"
|
||||||
# method = post
|
# to_remote_topic = "from_emqx/${topic}"
|
||||||
# path = "/messages/${topic}"
|
# payload = "${payload}"
|
||||||
# body = "${payload}"
|
# qos = 1
|
||||||
# headers {
|
# retain = false
|
||||||
# "content-type": "application/json"
|
|
||||||
# }
|
|
||||||
# }
|
|
||||||
#}
|
#}
|
||||||
|
|
||||||
|
## HTTP bridges to a http server
|
||||||
|
bridges.http.my_http_bridge {
|
||||||
|
## NOTE: we cannot use placehodler variables in the `scheme://host:port` part of the url string
|
||||||
|
url = "http://localhost:9901/messages/${topic}"
|
||||||
|
request_timeout = "30s"
|
||||||
|
connect_timeout = "30s"
|
||||||
|
max_retries = 3
|
||||||
|
retry_interval = "10s"
|
||||||
|
pool_type = "random"
|
||||||
|
pool_size = 4
|
||||||
|
enable_pipelining = true
|
||||||
|
ssl {
|
||||||
|
enable = false
|
||||||
|
keyfile = "{{ platform_etc_dir }}/certs/client-key.pem"
|
||||||
|
certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
|
||||||
|
cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
|
||||||
|
}
|
||||||
|
|
||||||
|
from_local_topic = "emqx_http/#"
|
||||||
|
## the following config entries can use placehodler variables:
|
||||||
|
## url, method, body, headers
|
||||||
|
method = post
|
||||||
|
body = "${payload}"
|
||||||
|
headers {
|
||||||
|
"content-type": "application/json"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -45,8 +45,6 @@
|
||||||
, resource_id/1
|
, resource_id/1
|
||||||
, resource_id/2
|
, resource_id/2
|
||||||
, parse_bridge_id/1
|
, parse_bridge_id/1
|
||||||
, channel_id/4
|
|
||||||
, parse_channel_id/1
|
|
||||||
]).
|
]).
|
||||||
|
|
||||||
reload_hook() ->
|
reload_hook() ->
|
||||||
|
@ -58,11 +56,8 @@ reload_hook() ->
|
||||||
end, maps:to_list(Bridge))
|
end, maps:to_list(Bridge))
|
||||||
end, maps:to_list(Bridges)).
|
end, maps:to_list(Bridges)).
|
||||||
|
|
||||||
load_hook(#{egress_channels := Channels}) ->
|
load_hook(#{from_local_topic := _}) ->
|
||||||
case has_subscribe_local_topic(Channels) of
|
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []});
|
||||||
true -> ok;
|
|
||||||
false -> emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []})
|
|
||||||
end;
|
|
||||||
load_hook(_Conf) -> ok.
|
load_hook(_Conf) -> ok.
|
||||||
|
|
||||||
unload_hook() ->
|
unload_hook() ->
|
||||||
|
@ -71,28 +66,25 @@ unload_hook() ->
|
||||||
on_message_publish(Message = #message{topic = Topic, flags = Flags}) ->
|
on_message_publish(Message = #message{topic = Topic, flags = Flags}) ->
|
||||||
case maps:get(sys, Flags, false) of
|
case maps:get(sys, Flags, false) of
|
||||||
false ->
|
false ->
|
||||||
ChannelIds = get_matched_channels(Topic),
|
lists:foreach(fun (Id) ->
|
||||||
lists:foreach(fun(ChannelId) ->
|
send_message(Id, emqx_message:to_map(Message))
|
||||||
send_message(ChannelId, emqx_message:to_map(Message))
|
end, get_matched_bridges(Topic));
|
||||||
end, ChannelIds);
|
|
||||||
true -> ok
|
true -> ok
|
||||||
end,
|
end,
|
||||||
{ok, Message}.
|
{ok, Message}.
|
||||||
|
|
||||||
%% TODO: remove this clause, treat mqtt bridges the same as other bridges
|
send_message(BridgeId, Message) ->
|
||||||
send_message(ChannelId, Message) ->
|
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
|
||||||
{BridgeType, BridgeName, _, _} = parse_channel_id(ChannelId),
|
|
||||||
ResId = emqx_bridge:resource_id(BridgeType, BridgeName),
|
ResId = emqx_bridge:resource_id(BridgeType, BridgeName),
|
||||||
do_send_message(ResId, ChannelId, Message).
|
emqx_resource:query(ResId, {send_message, BridgeId, Message}).
|
||||||
|
|
||||||
do_send_message(ResId, ChannelId, Message) ->
|
|
||||||
emqx_resource:query(ResId, {send_message, ChannelId, Message}).
|
|
||||||
|
|
||||||
config_key_path() ->
|
config_key_path() ->
|
||||||
[bridges].
|
[bridges].
|
||||||
|
|
||||||
resource_type(mqtt) -> emqx_connector_mqtt;
|
resource_type(mqtt) -> emqx_connector_mqtt;
|
||||||
resource_type(http) -> emqx_connector_http.
|
resource_type(<<"mqtt">>) -> emqx_connector_mqtt;
|
||||||
|
resource_type(http) -> emqx_connector_http;
|
||||||
|
resource_type(<<"http">>) -> emqx_connector_http.
|
||||||
|
|
||||||
bridge_type(emqx_connector_mqtt) -> mqtt;
|
bridge_type(emqx_connector_mqtt) -> mqtt;
|
||||||
bridge_type(emqx_connector_http) -> http.
|
bridge_type(emqx_connector_http) -> http.
|
||||||
|
@ -104,7 +96,8 @@ post_config_update(_Req, NewConf, OldConf, _AppEnv) ->
|
||||||
{fun remove_bridge/3, Removed},
|
{fun remove_bridge/3, Removed},
|
||||||
{fun create_bridge/3, Added},
|
{fun create_bridge/3, Added},
|
||||||
{fun update_bridge/3, Updated}
|
{fun update_bridge/3, Updated}
|
||||||
]).
|
]),
|
||||||
|
reload_hook().
|
||||||
|
|
||||||
perform_bridge_changes(Tasks) ->
|
perform_bridge_changes(Tasks) ->
|
||||||
perform_bridge_changes(Tasks, ok).
|
perform_bridge_changes(Tasks, ok).
|
||||||
|
@ -145,20 +138,6 @@ parse_bridge_id(BridgeId) ->
|
||||||
_ -> error({invalid_bridge_id, BridgeId})
|
_ -> error({invalid_bridge_id, BridgeId})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
channel_id(BridgeType, BridgeName, ChannelType, ChannelName) ->
|
|
||||||
BType = bin(BridgeType),
|
|
||||||
BName = bin(BridgeName),
|
|
||||||
CType = bin(ChannelType),
|
|
||||||
CName = bin(ChannelName),
|
|
||||||
<<BType/binary, ":", BName/binary, ":", CType/binary, ":", CName/binary>>.
|
|
||||||
|
|
||||||
parse_channel_id(ChannelId) ->
|
|
||||||
case string:split(bin(ChannelId), ":", all) of
|
|
||||||
[BridgeType, BridgeName, ChannelType, ChannelName] ->
|
|
||||||
{BridgeType, BridgeName, ChannelType, ChannelName};
|
|
||||||
_ -> error({invalid_bridge_id, ChannelId})
|
|
||||||
end.
|
|
||||||
|
|
||||||
list_bridges() ->
|
list_bridges() ->
|
||||||
lists:foldl(fun({Type, NameAndConf}, Bridges) ->
|
lists:foldl(fun({Type, NameAndConf}, Bridges) ->
|
||||||
lists:foldl(fun({Name, RawConf}, Acc) ->
|
lists:foldl(fun({Name, RawConf}, Acc) ->
|
||||||
|
@ -167,7 +146,7 @@ list_bridges() ->
|
||||||
{ok, Res} -> [Res | Acc]
|
{ok, Res} -> [Res | Acc]
|
||||||
end
|
end
|
||||||
end, Bridges, maps:to_list(NameAndConf))
|
end, Bridges, maps:to_list(NameAndConf))
|
||||||
end, [], maps:to_list(emqx:get_raw_config([bridges]))).
|
end, [], maps:to_list(emqx:get_raw_config([bridges], #{}))).
|
||||||
|
|
||||||
get_bridge(Type, Name) ->
|
get_bridge(Type, Name) ->
|
||||||
RawConf = emqx:get_raw_config([bridges, Type, Name], #{}),
|
RawConf = emqx:get_raw_config([bridges, Type, Name], #{}),
|
||||||
|
@ -205,11 +184,11 @@ create_bridge(Type, Name, Conf) ->
|
||||||
update_bridge(Type, Name, {_OldConf, Conf}) ->
|
update_bridge(Type, Name, {_OldConf, Conf}) ->
|
||||||
%% TODO: sometimes its not necessary to restart the bridge connection.
|
%% TODO: sometimes its not necessary to restart the bridge connection.
|
||||||
%%
|
%%
|
||||||
%% - if the connection related configs like `username` is updated, we should restart/start
|
%% - if the connection related configs like `servers` is updated, we should restart/start
|
||||||
%% or stop bridges according to the change.
|
%% or stop bridges according to the change.
|
||||||
%% - if the connection related configs are not update, but channel configs `ingress_channels` or
|
%% - if the connection related configs are not update, only non-connection configs like
|
||||||
%% `egress_channels` are changed, then we should not restart the bridge, we only restart/start
|
%% the `method` or `headers` of a HTTP bridge is changed, then the bridge can be updated
|
||||||
%% the channels.
|
%% without restarting the bridge.
|
||||||
%%
|
%%
|
||||||
?SLOG(info, #{msg => "update bridge", type => Type, name => Name,
|
?SLOG(info, #{msg => "update bridge", type => Type, name => Name,
|
||||||
config => Conf}),
|
config => Conf}),
|
||||||
|
@ -238,35 +217,19 @@ flatten_confs(Conf0) ->
|
||||||
do_flatten_confs(Type, Conf0) ->
|
do_flatten_confs(Type, Conf0) ->
|
||||||
[{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)].
|
[{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)].
|
||||||
|
|
||||||
has_subscribe_local_topic(Channels) ->
|
get_matched_bridges(Topic) ->
|
||||||
lists:any(fun (#{subscribe_local_topic := _}) -> true;
|
Bridges = emqx:get_config([bridges], #{}),
|
||||||
(_) -> false
|
maps:fold(fun (BType, Conf, Acc0) ->
|
||||||
end, maps:to_list(Channels)).
|
|
||||||
|
|
||||||
get_matched_channels(Topic) ->
|
|
||||||
Bridges = emqx_conf:get([bridges], #{}),
|
|
||||||
maps:fold(fun
|
maps:fold(fun
|
||||||
%% TODO: also trigger 'message.publish' for mqtt bridges.
|
(BName, #{from_local_topic := Filter}, Acc1) ->
|
||||||
(mqtt, _Conf, Acc0) -> Acc0;
|
case emqx_topic:match(Topic, Filter) of
|
||||||
(BType, Conf, Acc0) ->
|
true -> [bridge_id(BType, BName) | Acc1];
|
||||||
maps:fold(fun
|
false -> Acc1
|
||||||
(BName, #{egress_channels := Channels}, Acc1) ->
|
end;
|
||||||
do_get_matched_channels(Topic, Channels, BType, BName, egress_channels)
|
|
||||||
++ Acc1;
|
|
||||||
(_Name, _BridgeConf, Acc1) -> Acc1
|
(_Name, _BridgeConf, Acc1) -> Acc1
|
||||||
end, Acc0, Conf)
|
end, Acc0, Conf)
|
||||||
end, [], Bridges).
|
end, [], Bridges).
|
||||||
|
|
||||||
do_get_matched_channels(Topic, Channels, BType, BName, CType) ->
|
|
||||||
maps:fold(fun
|
|
||||||
(ChannName, #{subscribe_local_topic := Filter}, Acc) ->
|
|
||||||
case emqx_topic:match(Topic, Filter) of
|
|
||||||
true -> [channel_id(BType, BName, CType, ChannName) | Acc];
|
|
||||||
false -> Acc
|
|
||||||
end;
|
|
||||||
(_ChannName, _ChannConf, Acc) -> Acc
|
|
||||||
end, [], Channels).
|
|
||||||
|
|
||||||
bin(Bin) when is_binary(Bin) -> Bin;
|
bin(Bin) when is_binary(Bin) -> Bin;
|
||||||
bin(Str) when is_list(Str) -> list_to_binary(Str);
|
bin(Str) when is_list(Str) -> list_to_binary(Str);
|
||||||
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
|
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
|
||||||
|
|
|
@ -67,14 +67,18 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
load_bridges(Configs) ->
|
load_bridges(Configs) ->
|
||||||
lists:foreach(fun({Type, NamedConf}) ->
|
lists:foreach(fun({Type, NamedConf}) ->
|
||||||
lists:foreach(fun({Name, Conf}) ->
|
lists:foreach(fun({Name, Conf}) ->
|
||||||
load_bridge(Name, Type, Conf)
|
load_bridge(Type, Name, Conf)
|
||||||
end, maps:to_list(NamedConf))
|
end, maps:to_list(NamedConf))
|
||||||
end, maps:to_list(Configs)).
|
end, maps:to_list(Configs)).
|
||||||
|
|
||||||
%% TODO: move this monitor into emqx_resource
|
%% TODO: move this monitor into emqx_resource
|
||||||
%% emqx_resource:check_and_create_local(ResourceId, ResourceType, Config, #{keep_retry => true}).
|
%% emqx_resource:check_and_create_local(ResourceId, ResourceType, Config, #{keep_retry => true}).
|
||||||
load_bridge(Name, Type, Config) ->
|
load_bridge(<<"http">>, Name, Config) ->
|
||||||
case emqx_resource:create_local(
|
Config1 = parse_http_confs(Config),
|
||||||
|
do_load_bridge(<<"http">>, Name, Config1).
|
||||||
|
|
||||||
|
do_load_bridge(Type, Name, Config) ->
|
||||||
|
case emqx_resource:check_and_create_local(
|
||||||
emqx_bridge:resource_id(Type, Name),
|
emqx_bridge:resource_id(Type, Name),
|
||||||
emqx_bridge:resource_type(Type), Config) of
|
emqx_bridge:resource_type(Type), Config) of
|
||||||
{ok, already_created} -> ok;
|
{ok, already_created} -> ok;
|
||||||
|
@ -82,3 +86,28 @@ load_bridge(Name, Type, Config) ->
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
error({load_bridge, Reason})
|
error({load_bridge, Reason})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
parse_http_confs(#{ <<"url">> := Url
|
||||||
|
, <<"method">> := Method
|
||||||
|
, <<"body">> := Body
|
||||||
|
, <<"headers">> := Headers
|
||||||
|
} = Conf) ->
|
||||||
|
{BaseUrl, Path} = parse_url(Url),
|
||||||
|
Conf#{ <<"base_url">> => BaseUrl
|
||||||
|
, <<"preprocessed_request">> =>
|
||||||
|
emqx_connector_http:preprocess_request(Method, Path, Body, Headers)
|
||||||
|
}.
|
||||||
|
|
||||||
|
parse_url(Url) ->
|
||||||
|
case string:split(Url, "//", leading) of
|
||||||
|
[Scheme, UrlRem] ->
|
||||||
|
case string:split(UrlRem, "/", leading) of
|
||||||
|
[HostPort, Path] ->
|
||||||
|
{iolist_to_binary([Scheme, "//", HostPort]), Path};
|
||||||
|
[HostPort] ->
|
||||||
|
{iolist_to_binary([Scheme, "//", HostPort]), <<>>}
|
||||||
|
end;
|
||||||
|
[Url] ->
|
||||||
|
error({invalid_url, Url})
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
|
@ -18,8 +18,24 @@ fields("mqtt_bridge") ->
|
||||||
emqx_connector_mqtt:fields("config");
|
emqx_connector_mqtt:fields("config");
|
||||||
|
|
||||||
fields("http_bridge") ->
|
fields("http_bridge") ->
|
||||||
emqx_connector_http:fields(config) ++ http_channels().
|
basic_config_http() ++
|
||||||
|
[ {url, hoconsc:mk(binary())}
|
||||||
|
, {from_local_topic, hoconsc:mk(binary())}
|
||||||
|
, {method, hoconsc:mk(method(), #{default => post})}
|
||||||
|
, {headers, hoconsc:mk(map(),
|
||||||
|
#{default => #{
|
||||||
|
<<"accept">> => <<"application/json">>,
|
||||||
|
<<"cache-control">> => <<"no-cache">>,
|
||||||
|
<<"connection">> => <<"keep-alive">>,
|
||||||
|
<<"content-type">> => <<"application/json">>,
|
||||||
|
<<"keep-alive">> => <<"timeout=5">>}})
|
||||||
|
}
|
||||||
|
, {body, hoconsc:mk(binary(), #{default => <<"${payload}">>})}
|
||||||
|
, {request_timeout, hoconsc:mk(emqx_schema:duration_ms(), #{default => <<"30s">>})}
|
||||||
|
].
|
||||||
|
|
||||||
http_channels() ->
|
basic_config_http() ->
|
||||||
[{egress_channels, hoconsc:mk(hoconsc:map(id,
|
proplists:delete(base_url, emqx_connector_http:fields(config)).
|
||||||
hoconsc:ref(emqx_connector_http, "http_request")))}].
|
|
||||||
|
method() ->
|
||||||
|
hoconsc:enum([post, put, get, delete]).
|
||||||
|
|
|
@ -0,0 +1,144 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_bridge_api_SUITE).
|
||||||
|
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
-compile(export_all).
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
-define(CONF_DEFAULT, <<"bridges: {}">>).
|
||||||
|
|
||||||
|
all() ->
|
||||||
|
emqx_ct:all(?MODULE).
|
||||||
|
|
||||||
|
groups() ->
|
||||||
|
[].
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
application:load(emqx_machine),
|
||||||
|
ok = ekka:start(),
|
||||||
|
ok = emqx_ct_helpers:start_apps([emqx_bridge]),
|
||||||
|
ok = emqx_config:init_load(emqx_bridge_schema, ?CONF_DEFAULT),
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_suite(_Config) ->
|
||||||
|
ok = ekka:stop(),
|
||||||
|
emqx_ct_helpers:stop_apps([emqx_bridge]),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
init_per_testcase(_, Config) ->
|
||||||
|
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
|
||||||
|
Config.
|
||||||
|
end_per_testcase(_, _Config) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
-define(PATH1, <<"path1">>).
|
||||||
|
-define(PATH2, <<"path2">>).
|
||||||
|
-define(HTTP_BRIDGE(PATH),
|
||||||
|
#{
|
||||||
|
<<"base_url">> => <<"http://localhost:9901">>,
|
||||||
|
<<"egress_channels">> => #{
|
||||||
|
<<"a">> => #{
|
||||||
|
<<"subscribe_local_topic">> => <<"emqx_http/#">>,
|
||||||
|
<<"method">> => <<"post">>,
|
||||||
|
<<"path">> => PATH,
|
||||||
|
<<"body">> => <<"${payload}">>,
|
||||||
|
<<"headers">> => #{
|
||||||
|
<<"content-type">> => <<"application/json">>
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% HTTP server for testing
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
start_http_server(Port, HandleFun) ->
|
||||||
|
spawn_link(fun() ->
|
||||||
|
{ok, Sock} = gen_tcp:listen(Port, [{active, false}]),
|
||||||
|
loop(Sock, HandleFun)
|
||||||
|
end).
|
||||||
|
|
||||||
|
loop(Sock, HandleFun) ->
|
||||||
|
{ok, Conn} = gen_tcp:accept(Sock),
|
||||||
|
Handler = spawn(fun () -> HandleFun(Conn) end),
|
||||||
|
gen_tcp:controlling_process(Conn, Handler),
|
||||||
|
loop(Sock, HandleFun).
|
||||||
|
|
||||||
|
make_response(CodeStr, Str) ->
|
||||||
|
B = iolist_to_binary(Str),
|
||||||
|
iolist_to_binary(
|
||||||
|
io_lib:fwrite(
|
||||||
|
"HTTP/1.0 ~s\nContent-Type: text/html\nContent-Length: ~p\n\n~s",
|
||||||
|
[CodeStr, size(B), B])).
|
||||||
|
|
||||||
|
handle_fun_200_ok(Conn) ->
|
||||||
|
case gen_tcp:recv(Conn, 0) of
|
||||||
|
{ok, Request} ->
|
||||||
|
gen_tcp:send(Conn, make_response("200 OK", "Request OK")),
|
||||||
|
self() ! {http_server, received, Request},
|
||||||
|
handle_fun_200_ok(Conn);
|
||||||
|
{error, closed} ->
|
||||||
|
gen_tcp:close(Conn)
|
||||||
|
end.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Testcases
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
t_crud_apis(_) ->
|
||||||
|
%% assert we there's no bridges at first
|
||||||
|
{200, []} = emqx_bridge_api:list_bridges(get, #{}),
|
||||||
|
|
||||||
|
%% then we add a http bridge now
|
||||||
|
{200, [Bridge]} = emqx_bridge_api:crud_bridges_cluster(put,
|
||||||
|
#{ bindings => #{id => <<"http:test_bridge">>}
|
||||||
|
, body => ?HTTP_BRIDGE(?PATH1)
|
||||||
|
}),
|
||||||
|
%ct:pal("---bridge: ~p", [Bridge]),
|
||||||
|
?assertMatch(#{ id := <<"http:test_bridge">>
|
||||||
|
, bridge_type := http
|
||||||
|
, is_connected := _
|
||||||
|
, node := _
|
||||||
|
, <<"egress_channels">> := #{
|
||||||
|
<<"a">> := #{<<"path">> := ?PATH1}
|
||||||
|
}
|
||||||
|
}, Bridge),
|
||||||
|
|
||||||
|
%% update the request-path of the bridge
|
||||||
|
{200, [Bridge2]} = emqx_bridge_api:crud_bridges_cluster(put,
|
||||||
|
#{ bindings => #{id => <<"http:test_bridge">>}
|
||||||
|
, body => ?HTTP_BRIDGE(?PATH2)
|
||||||
|
}),
|
||||||
|
?assertMatch(#{ id := <<"http:test_bridge">>
|
||||||
|
, bridge_type := http
|
||||||
|
, is_connected := _
|
||||||
|
, <<"egress_channels">> := #{
|
||||||
|
<<"a">> := #{<<"path">> := ?PATH2}
|
||||||
|
}
|
||||||
|
}, Bridge2),
|
||||||
|
|
||||||
|
%% list all bridges again, assert Bridge2 is in it
|
||||||
|
{200, [Bridge2]} = emqx_bridge_api:list_bridges(get, #{}),
|
||||||
|
|
||||||
|
%% delete teh bridge
|
||||||
|
{200} = emqx_bridge_api:crud_bridges_cluster(delete,
|
||||||
|
#{ bindings => #{id => <<"http:test_bridge">>}
|
||||||
|
}),
|
||||||
|
{200, []} = emqx_bridge_api:list_bridges(get, #{}),
|
||||||
|
ok.
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
epgsql,
|
epgsql,
|
||||||
mysql,
|
mysql,
|
||||||
mongodb,
|
mongodb,
|
||||||
|
ehttpc,
|
||||||
emqx,
|
emqx,
|
||||||
emqtt
|
emqtt
|
||||||
]},
|
]},
|
||||||
|
|
|
@ -38,7 +38,9 @@
|
||||||
, fields/1
|
, fields/1
|
||||||
, validations/0]).
|
, validations/0]).
|
||||||
|
|
||||||
-export([ check_ssl_opts/2 ]).
|
-export([ check_ssl_opts/2
|
||||||
|
, preprocess_request/4
|
||||||
|
]).
|
||||||
|
|
||||||
-type connect_timeout() :: emqx_schema:duration() | infinity.
|
-type connect_timeout() :: emqx_schema:duration() | infinity.
|
||||||
-type pool_type() :: random | hash.
|
-type pool_type() :: random | hash.
|
||||||
|
@ -50,23 +52,7 @@
|
||||||
%%=====================================================================
|
%%=====================================================================
|
||||||
%% Hocon schema
|
%% Hocon schema
|
||||||
roots() ->
|
roots() ->
|
||||||
[{config, #{type => hoconsc:ref(?MODULE, config)}}].
|
fields(config).
|
||||||
|
|
||||||
fields("http_request") ->
|
|
||||||
[ {subscribe_local_topic, hoconsc:mk(binary())}
|
|
||||||
, {method, hoconsc:mk(method(), #{default => post})}
|
|
||||||
, {path, hoconsc:mk(binary(), #{default => <<"">>})}
|
|
||||||
, {headers, hoconsc:mk(map(),
|
|
||||||
#{default => #{
|
|
||||||
<<"accept">> => <<"application/json">>,
|
|
||||||
<<"cache-control">> => <<"no-cache">>,
|
|
||||||
<<"connection">> => <<"keep-alive">>,
|
|
||||||
<<"content-type">> => <<"application/json">>,
|
|
||||||
<<"keep-alive">> => <<"timeout=5">>}})
|
|
||||||
}
|
|
||||||
, {body, hoconsc:mk(binary(), #{default => <<"${payload}">>})}
|
|
||||||
, {request_timeout, hoconsc:mk(emqx_schema:duration_ms(), #{default => <<"30s">>})}
|
|
||||||
];
|
|
||||||
|
|
||||||
fields(config) ->
|
fields(config) ->
|
||||||
[ {base_url, fun base_url/1}
|
[ {base_url, fun base_url/1}
|
||||||
|
@ -76,11 +62,9 @@ fields(config) ->
|
||||||
, {pool_type, fun pool_type/1}
|
, {pool_type, fun pool_type/1}
|
||||||
, {pool_size, fun pool_size/1}
|
, {pool_size, fun pool_size/1}
|
||||||
, {enable_pipelining, fun enable_pipelining/1}
|
, {enable_pipelining, fun enable_pipelining/1}
|
||||||
|
, {preprocessed_request, hoconsc:mk(map())}
|
||||||
] ++ emqx_connector_schema_lib:ssl_fields().
|
] ++ emqx_connector_schema_lib:ssl_fields().
|
||||||
|
|
||||||
method() ->
|
|
||||||
hoconsc:enum([post, put, get, delete]).
|
|
||||||
|
|
||||||
validations() ->
|
validations() ->
|
||||||
[ {check_ssl_opts, fun check_ssl_opts/1} ].
|
[ {check_ssl_opts, fun check_ssl_opts/1} ].
|
||||||
|
|
||||||
|
@ -152,8 +136,7 @@ on_start(InstId, #{base_url := #{scheme := Scheme,
|
||||||
pool_name => PoolName,
|
pool_name => PoolName,
|
||||||
host => Host,
|
host => Host,
|
||||||
port => Port,
|
port => Port,
|
||||||
base_path => BasePath,
|
base_path => BasePath
|
||||||
channels => preproc_channels(InstId, Config)
|
|
||||||
},
|
},
|
||||||
case ehttpc_sup:start_pool(PoolName, PoolOpts) of
|
case ehttpc_sup:start_pool(PoolName, PoolOpts) of
|
||||||
{ok, _} -> {ok, State};
|
{ok, _} -> {ok, State};
|
||||||
|
@ -167,12 +150,12 @@ on_stop(InstId, #{pool_name := PoolName}) ->
|
||||||
connector => InstId}),
|
connector => InstId}),
|
||||||
ehttpc_sup:stop_pool(PoolName).
|
ehttpc_sup:stop_pool(PoolName).
|
||||||
|
|
||||||
on_query(InstId, {send_message, ChannelId, Msg}, AfterQuery, #{channels := Channels} = State) ->
|
on_query(InstId, {send_message, BridgeId, Msg}, AfterQuery, State) ->
|
||||||
case maps:find(ChannelId, Channels) of
|
case maps:find(preprocessed_request, State) of
|
||||||
error -> ?SLOG(error, #{msg => "channel not found", channel_id => ChannelId});
|
error -> ?SLOG(error, #{msg => "preprocessed_request found", bridge_id => BridgeId});
|
||||||
{ok, ChannConf} ->
|
{ok, Request} ->
|
||||||
#{method := Method, path := Path, body := Body, headers := Headers,
|
#{method := Method, path := Path, body := Body, headers := Headers,
|
||||||
request_timeout := Timeout} = proc_channel_conf(ChannConf, Msg),
|
request_timeout := Timeout} = process_request(Request, Msg),
|
||||||
on_query(InstId, {Method, {Path, Headers, Body}, Timeout}, AfterQuery, State)
|
on_query(InstId, {Method, {Path, Headers, Body}, Timeout}, AfterQuery, State)
|
||||||
end;
|
end;
|
||||||
on_query(InstId, {Method, Request}, AfterQuery, State) ->
|
on_query(InstId, {Method, Request}, AfterQuery, State) ->
|
||||||
|
@ -211,22 +194,8 @@ on_health_check(_InstId, #{host := Host, port := Port} = State) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
preprocess_request(Method, Path, Body, Headers) ->
|
||||||
preproc_channels(<<"bridge:", BridgeId/binary>>, Config) ->
|
#{ method => emqx_plugin_libs_rule:preproc_tmpl(bin(Method))
|
||||||
{BridgeType, BridgeName} = emqx_bridge:parse_bridge_id(BridgeId),
|
|
||||||
maps:fold(fun(ChannName, ChannConf, Acc) ->
|
|
||||||
Acc#{emqx_bridge:channel_id(BridgeType, BridgeName, egress_channels, ChannName) =>
|
|
||||||
preproc_channel_conf(ChannConf)}
|
|
||||||
end, #{}, maps:get(egress_channels, Config, #{}));
|
|
||||||
preproc_channels(_InstId, _Config) ->
|
|
||||||
#{}.
|
|
||||||
|
|
||||||
preproc_channel_conf(#{
|
|
||||||
method := Method,
|
|
||||||
path := Path,
|
|
||||||
body := Body,
|
|
||||||
headers := Headers} = Conf) ->
|
|
||||||
Conf#{ method => emqx_plugin_libs_rule:preproc_tmpl(bin(Method))
|
|
||||||
, path => emqx_plugin_libs_rule:preproc_tmpl(Path)
|
, path => emqx_plugin_libs_rule:preproc_tmpl(Path)
|
||||||
, body => emqx_plugin_libs_rule:preproc_tmpl(Body)
|
, body => emqx_plugin_libs_rule:preproc_tmpl(Body)
|
||||||
, headers => preproc_headers(Headers)
|
, headers => preproc_headers(Headers)
|
||||||
|
@ -238,7 +207,7 @@ preproc_headers(Headers) ->
|
||||||
emqx_plugin_libs_rule:preproc_tmpl(bin(V))}
|
emqx_plugin_libs_rule:preproc_tmpl(bin(V))}
|
||||||
end, #{}, Headers).
|
end, #{}, Headers).
|
||||||
|
|
||||||
proc_channel_conf(#{
|
process_request(#{
|
||||||
method := MethodTks,
|
method := MethodTks,
|
||||||
path := PathTks,
|
path := PathTks,
|
||||||
body := BodyTks,
|
body := BodyTks,
|
||||||
|
@ -264,7 +233,7 @@ check_ssl_opts(Conf) ->
|
||||||
check_ssl_opts("base_url", Conf).
|
check_ssl_opts("base_url", Conf).
|
||||||
|
|
||||||
check_ssl_opts(URLFrom, Conf) ->
|
check_ssl_opts(URLFrom, Conf) ->
|
||||||
#{schema := Scheme} = hocon_schema:get_value(URLFrom, Conf),
|
#{scheme := Scheme} = hocon_schema:get_value(URLFrom, Conf),
|
||||||
SSL= hocon_schema:get_value("ssl", Conf),
|
SSL= hocon_schema:get_value("ssl", Conf),
|
||||||
case {Scheme, maps:get(enable, SSL, false)} of
|
case {Scheme, maps:get(enable, SSL, false)} of
|
||||||
{http, false} -> true;
|
{http, false} -> true;
|
||||||
|
|
Loading…
Reference in New Issue