Merge pull request #5968 from terry-xiaoyu/new_bridge_config_structs

refactor(bridge): the configs for http bridges
This commit is contained in:
Shawn 2021-11-22 19:54:42 +08:00 committed by GitHub
commit c137c2eab5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 1971 additions and 668 deletions

View File

@ -79,8 +79,8 @@
]).
%% proxy callback
-export([ pre_config_update/2
, post_config_update/4
-export([ pre_config_update/3
, post_config_update/5
]).
-export_type([ authenticator_id/0
@ -238,11 +238,11 @@ get_enabled(Authenticators) ->
%% APIs
%%------------------------------------------------------------------------------
pre_config_update(UpdateReq, OldConfig) ->
emqx_authentication_config:pre_config_update(UpdateReq, OldConfig).
pre_config_update(Path, UpdateReq, OldConfig) ->
emqx_authentication_config:pre_config_update(Path, UpdateReq, OldConfig).
post_config_update(UpdateReq, NewConfig, OldConfig, AppEnvs) ->
emqx_authentication_config:post_config_update(UpdateReq, NewConfig, OldConfig, AppEnvs).
post_config_update(Path, UpdateReq, NewConfig, OldConfig, AppEnvs) ->
emqx_authentication_config:post_config_update(Path, UpdateReq, NewConfig, OldConfig, AppEnvs).
%% @doc Get all registered authentication providers.
get_providers() ->

View File

@ -19,8 +19,8 @@
-behaviour(emqx_config_handler).
-export([ pre_config_update/2
, post_config_update/4
-export([ pre_config_update/3
, post_config_update/5
]).
-export([ authenticator_id/1
@ -53,9 +53,9 @@
%% Callbacks of config handler
%%------------------------------------------------------------------------------
-spec pre_config_update(update_request(), emqx_config:raw_config())
-spec pre_config_update(list(atom()), update_request(), emqx_config:raw_config())
-> {ok, map() | list()} | {error, term()}.
pre_config_update(UpdateReq, OldConfig) ->
pre_config_update(_, UpdateReq, OldConfig) ->
try do_pre_config_update(UpdateReq, to_list(OldConfig)) of
{error, Reason} -> {error, Reason};
{ok, NewConfig} -> {ok, return_map(NewConfig)}
@ -102,9 +102,9 @@ do_pre_config_update({move_authenticator, _ChainName, AuthenticatorID, Position}
end
end.
-spec post_config_update(update_request(), map() | list(), emqx_config:raw_config(), emqx_config:app_envs())
-spec post_config_update(list(atom()), update_request(), map() | list(), emqx_config:raw_config(), emqx_config:app_envs())
-> ok | {ok, map()} | {error, term()}.
post_config_update(UpdateReq, NewConfig, OldConfig, AppEnvs) ->
post_config_update(_, UpdateReq, NewConfig, OldConfig, AppEnvs) ->
do_post_config_update(UpdateReq, check_configs(to_list(NewConfig)), OldConfig, AppEnvs).
do_post_config_update({create_authenticator, ChainName, Config}, _NewConfig, _OldConfig, _AppEnvs) ->

View File

@ -45,14 +45,14 @@
-type handler_name() :: module().
-type handlers() :: #{emqx_config:config_key() => handlers(), ?MOD => handler_name()}.
-optional_callbacks([ pre_config_update/2
, post_config_update/4
-optional_callbacks([ pre_config_update/3
, post_config_update/5
]).
-callback pre_config_update(emqx_config:update_request(), emqx_config:raw_config()) ->
-callback pre_config_update([atom()], emqx_config:update_request(), emqx_config:raw_config()) ->
{ok, emqx_config:update_request()} | {error, term()}.
-callback post_config_update(emqx_config:update_request(), emqx_config:config(),
-callback post_config_update([atom()], emqx_config:update_request(), emqx_config:config(),
emqx_config:config(), emqx_config:app_envs()) ->
ok | {ok, Result::any()} | {error, Reason::term()}.
@ -181,14 +181,20 @@ process_update_request(ConfKeyPath, Handlers, {{update, UpdateReq}, Opts}) ->
Error -> Error
end.
do_update_config([], Handlers, OldRawConf, UpdateReq) ->
call_pre_config_update(Handlers, OldRawConf, UpdateReq);
do_update_config([ConfKey | ConfKeyPath], Handlers, OldRawConf, UpdateReq) ->
do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq) ->
do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq, []).
do_update_config([], Handlers, OldRawConf, UpdateReq, ConfKeyPath) ->
call_pre_config_update(Handlers, OldRawConf, UpdateReq, ConfKeyPath);
do_update_config([ConfKey | SubConfKeyPath], Handlers, OldRawConf,
UpdateReq, ConfKeyPath0) ->
ConfKeyPath = ConfKeyPath0 ++ [ConfKey],
SubOldRawConf = get_sub_config(bin(ConfKey), OldRawConf),
SubHandlers = get_sub_handlers(ConfKey, Handlers),
case do_update_config(ConfKeyPath, SubHandlers, SubOldRawConf, UpdateReq) of
case do_update_config(SubConfKeyPath, SubHandlers, SubOldRawConf, UpdateReq, ConfKeyPath) of
{ok, NewUpdateReq} ->
call_pre_config_update(Handlers, OldRawConf, #{bin(ConfKey) => NewUpdateReq});
call_pre_config_update(Handlers, OldRawConf, #{bin(ConfKey) => NewUpdateReq},
ConfKeyPath);
Error ->
Error
end.
@ -211,18 +217,25 @@ check_and_save_configs(SchemaModule, ConfKeyPath, Handlers, NewRawConf, Override
Error -> Error
end.
do_post_config_update([], Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, Result) ->
call_post_config_update(Handlers, OldConf, NewConf, AppEnvs, up_req(UpdateArgs), Result);
do_post_config_update([ConfKey | ConfKeyPath], Handlers, OldConf, NewConf, AppEnvs, UpdateArgs,
Result) ->
do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, Result) ->
do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs,
Result, []).
do_post_config_update([], Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, Result,
ConfKeyPath) ->
call_post_config_update(Handlers, OldConf, NewConf, AppEnvs, up_req(UpdateArgs),
Result, ConfKeyPath);
do_post_config_update([ConfKey | SubConfKeyPath], Handlers, OldConf, NewConf, AppEnvs,
UpdateArgs, Result, ConfKeyPath0) ->
ConfKeyPath = ConfKeyPath0 ++ [ConfKey],
SubOldConf = get_sub_config(ConfKey, OldConf),
SubNewConf = get_sub_config(ConfKey, NewConf),
SubHandlers = get_sub_handlers(ConfKey, Handlers),
case do_post_config_update(ConfKeyPath, SubHandlers, SubOldConf, SubNewConf, AppEnvs,
UpdateArgs, Result) of
case do_post_config_update(SubConfKeyPath, SubHandlers, SubOldConf, SubNewConf, AppEnvs,
UpdateArgs, Result, ConfKeyPath) of
{ok, Result1} ->
call_post_config_update(Handlers, OldConf, NewConf, AppEnvs, up_req(UpdateArgs),
Result1);
Result1, ConfKeyPath);
Error -> Error
end.
@ -237,22 +250,23 @@ get_sub_config(ConfKey, Conf) when is_map(Conf) ->
get_sub_config(_, _Conf) -> %% the Conf is a primitive
undefined.
call_pre_config_update(Handlers, OldRawConf, UpdateReq) ->
call_pre_config_update(Handlers, OldRawConf, UpdateReq, ConfKeyPath) ->
HandlerName = maps:get(?MOD, Handlers, undefined),
case erlang:function_exported(HandlerName, pre_config_update, 2) of
case erlang:function_exported(HandlerName, pre_config_update, 3) of
true ->
case HandlerName:pre_config_update(UpdateReq, OldRawConf) of
case HandlerName:pre_config_update(ConfKeyPath, UpdateReq, OldRawConf) of
{ok, NewUpdateReq} -> {ok, NewUpdateReq};
{error, Reason} -> {error, {pre_config_update, HandlerName, Reason}}
end;
false -> merge_to_old_config(UpdateReq, OldRawConf)
end.
call_post_config_update(Handlers, OldConf, NewConf, AppEnvs, UpdateReq, Result) ->
call_post_config_update(Handlers, OldConf, NewConf, AppEnvs, UpdateReq, Result, ConfKeyPath) ->
HandlerName = maps:get(?MOD, Handlers, undefined),
case erlang:function_exported(HandlerName, post_config_update, 4) of
case erlang:function_exported(HandlerName, post_config_update, 5) of
true ->
case HandlerName:post_config_update(UpdateReq, NewConf, OldConf, AppEnvs) of
case HandlerName:post_config_update(ConfKeyPath, UpdateReq, NewConf, OldConf,
AppEnvs) of
ok -> {ok, Result};
{ok, Result1} ->
{ok, Result#{HandlerName => Result1}};

View File

@ -46,7 +46,7 @@
, parse_listener_id/1
]).
-export([post_config_update/4]).
-export([post_config_update/5]).
-define(CONF_KEY_PATH, [listeners]).
-define(TYPES_STRING, ["tcp","ssl","ws","wss","quic"]).
@ -272,7 +272,7 @@ delete_authentication(Type, ListenerName, _Conf) ->
emqx_authentication:delete_chain(listener_id(Type, ListenerName)).
%% Update the listeners at runtime
post_config_update(_Req, NewListeners, OldListeners, _AppEnvs) ->
post_config_update(_, _Req, NewListeners, OldListeners, _AppEnvs) ->
#{added := Added, removed := Removed, changed := Updated}
= diff_listeners(NewListeners, OldListeners),
perform_listener_changes(fun stop_listener/3, Removed),

View File

@ -70,7 +70,7 @@
, stop_log_handler/1
]).
-export([post_config_update/4]).
-export([post_config_update/5]).
-type(peername_str() :: list()).
-type(logger_dst() :: file:filename() | console | unknown).
@ -123,7 +123,7 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
%% emqx_config_handler callbacks
%%--------------------------------------------------------------------
post_config_update(_Req, _NewConf, _OldConf, AppEnvs) ->
post_config_update(_, _Req, _NewConf, _OldConf, AppEnvs) ->
gen_server:call(?MODULE, {update_config, AppEnvs}, 5000).
%%--------------------------------------------------------------------

View File

@ -292,6 +292,7 @@ subscribers(Group, Topic) ->
%%--------------------------------------------------------------------
init([]) ->
ok = mria:wait_for_tables([?TAB]),
{ok, _} = mnesia:subscribe({table, ?TAB, simple}),
{atomic, PMon} = mria:transaction(?SHARED_SUB_SHARD, fun init_monitors/0),
ok = emqx_tables:new(?SHARED_SUBS, [protected, bag]),

View File

@ -896,7 +896,7 @@ serialize_error({bad_ssl_config, Details}) ->
message => binfmt("bad_ssl_config ~p", [Details])}};
serialize_error({missing_parameter, Detail}) ->
{400, #{code => <<"MISSING_PARAMETER">>,
message => binfmt("Missing required parameter", [Detail])}};
message => binfmt("Missing required parameter: ~p", [Detail])}};
serialize_error({invalid_parameter, Name}) ->
{400, #{code => <<"INVALID_PARAMETER">>,
message => binfmt("Invalid value for '~p'", [Name])}};

View File

@ -136,12 +136,10 @@ test_authenticators(PathPrefix) ->
test_authenticator(PathPrefix) ->
ValidConfig0 = emqx_authn_test_lib:http_example(),
{ok, 200, _} = request(
post,
uri(PathPrefix ++ ["authentication"]),
ValidConfig0),
{ok, 200, _} = request(
get,
uri(PathPrefix ++ ["authentication", "password-based:http"])),
@ -262,9 +260,7 @@ test_authenticator_user(PathPrefix) ->
fun(UserUpdate) -> {ok, 200, _} = request(put, UsersUri ++ "/u1", UserUpdate) end,
ValidUserUpdates),
InvalidUserUpdates = [
#{user_id => <<"u1">>, password => <<"p1">>},
#{is_superuser => true}],
InvalidUserUpdates = [#{user_id => <<"u1">>, password => <<"p1">>}],
lists:foreach(
fun(UserUpdate) -> {ok, 400, _} = request(put, UsersUri ++ "/u1", UserUpdate) end,

View File

@ -36,7 +36,7 @@
, authorize/5
]).
-export([post_config_update/4, pre_config_update/2]).
-export([post_config_update/5, pre_config_update/3]).
-export([acl_conf_file/0]).
@ -127,13 +127,13 @@ do_update({_, Sources}, _Conf) when is_list(Sources)->
do_update({Op, Sources}, Conf) ->
error({bad_request, #{op => Op, sources => Sources, conf => Conf}}).
pre_config_update(Cmd, Conf) ->
pre_config_update(_, Cmd, Conf) ->
{ok, do_update(Cmd, Conf)}.
post_config_update(_, undefined, _Conf, _AppEnvs) ->
post_config_update(_, _, undefined, _Conf, _AppEnvs) ->
ok;
post_config_update(Cmd, NewSources, _OldSource, _AppEnvs) ->
post_config_update(_, Cmd, NewSources, _OldSource, _AppEnvs) ->
ok = do_post_update(Cmd, NewSources),
ok = emqx_authz_cache:drain_cache().

View File

@ -2,56 +2,39 @@
## EMQ X Bridge
##--------------------------------------------------------------------
#bridges.mqtt.my_mqtt_bridge_to_aws {
# server = "127.0.0.1:1883"
# proto_ver = "v4"
# username = "username1"
# password = ""
# clean_start = true
# keepalive = 300
# retry_interval = "30s"
# max_inflight = 32
# reconnect_interval = "30s"
# bridge_mode = true
# replayq {
# dir = "{{ platform_data_dir }}/replayq/bridge_mqtt/"
# seg_bytes = "100MB"
# offload = false
# max_total_bytes = "1GB"
# }
# ssl {
# enable = false
# keyfile = "{{ platform_etc_dir }}/certs/client-key.pem"
# certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
# cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
# }
# ## We will create one MQTT connection for each element of the `ingress_channels`
# ## Syntax: ingress_channels.<id>
# ingress_channels.pull_msgs_from_aws {
# subscribe_remote_topic = "aws/#"
## MQTT bridges to/from another MQTT broker
#bridges.mqtt.my_ingress_mqtt_bridge {
# connector = "mqtt:my_mqtt_connector"
# direction = ingress
# ## topic mappings for this bridge
# from_remote_topic = "aws/#"
# subscribe_qos = 1
# local_topic = "from_aws/${topic}"
# to_local_topic = "from_aws/${topic}"
# payload = "${payload}"
# qos = "${qos}"
# retain = "${retain}"
#}
# ## We will create one MQTT connection for each element of the `egress_channels`
# ## Syntax: egress_channels.<id>
# egress_channels.push_msgs_to_aws {
# subscribe_local_topic = "emqx/#"
# remote_topic = "from_emqx/${topic}"
#
#bridges.mqtt.my_egress_mqtt_bridge {
# connector = "mqtt:my_mqtt_connector"
# direction = egress
# ## topic mappings for this bridge
# from_local_topic = "emqx/#"
# to_remote_topic = "from_emqx/${topic}"
# payload = "${payload}"
# qos = 1
# retain = false
#}
#}
#
## HTTP bridges to an HTTP server
#bridges.http.my_http_bridge {
# base_url: "http://localhost:9901"
# connect_timeout: "30s"
# max_retries: 3
# ## NOTE: we cannot use placehodler variables in the `scheme://host:port` part of the url
# url = "http://localhost:9901/messages/${topic}"
# request_timeout = "30s"
# connect_timeout = "30s"
# max_retries = 3
# retry_interval = "10s"
# pool_type = "hash"
# pool_type = "random"
# pool_size = 4
# enable_pipelining = true
# ssl {
@ -60,15 +43,13 @@
# certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
# cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
# }
# egress_channels.post_messages {
# subscribe_local_topic = "emqx_http/#"
# request_timeout: "30s"
# ## following config entries can use placehodler variables
#
# from_local_topic = "emqx_http/#"
# ## the following config entries can use placehodler variables:
# ## url, method, body, headers
# method = post
# path = "/messages/${topic}"
# body = "${payload}"
# headers {
# "content-type": "application/json"
# }
#}
#}

View File

@ -18,51 +18,58 @@
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-export([post_config_update/4]).
-export([post_config_update/5]).
-export([reload_hook/0, unload_hook/0]).
-export([ load_hook/0
, reload_hook/0
, unload_hook/0
]).
-export([on_message_publish/1]).
-export([ load_bridges/0
, get_bridge/2
, get_bridge/3
, list_bridges/0
, create_bridge/3
, remove_bridge/3
, update_bridge/3
, start_bridge/2
, stop_bridge/2
, restart_bridge/2
, send_message/2
]).
-export([ config_key_path/0
]).
-export([ resource_type/1
, bridge_type/1
, resource_id/1
, resource_id/2
, parse_bridge_id/1
, channel_id/4
, parse_channel_id/1
]).
-export([ load/0
, lookup/2
, lookup/3
, list/0
, create/3
, recreate/2
, recreate/3
, create_dry_run/2
, remove/3
, update/3
, start/2
, stop/2
, restart/2
]).
-export([ send_message/2
]).
-export([ config_key_path/0
]).
reload_hook() ->
unload_hook(),
Bridges = emqx_conf:get([bridges], #{}),
load_hook().
load_hook() ->
Bridges = emqx:get_config([bridges], #{}),
lists:foreach(fun({_Type, Bridge}) ->
lists:foreach(fun({_Name, BridgeConf}) ->
load_hook(BridgeConf)
end, maps:to_list(Bridge))
end, maps:to_list(Bridges)).
load_hook(#{egress_channels := Channels}) ->
case has_subscribe_local_topic(Channels) of
true -> ok;
false -> emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []})
end;
load_hook(#{from_local_topic := _}) ->
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}),
ok;
load_hook(_Conf) -> ok.
unload_hook() ->
@ -71,40 +78,39 @@ unload_hook() ->
on_message_publish(Message = #message{topic = Topic, flags = Flags}) ->
case maps:get(sys, Flags, false) of
false ->
ChannelIds = get_matched_channels(Topic),
lists:foreach(fun(ChannelId) ->
send_message(ChannelId, emqx_message:to_map(Message))
end, ChannelIds);
lists:foreach(fun (Id) ->
send_message(Id, emqx_rule_events:eventmsg_publish(Message))
end, get_matched_bridges(Topic));
true -> ok
end,
{ok, Message}.
%% TODO: remove this clause, treat mqtt bridges the same as other bridges
send_message(ChannelId, Message) ->
{BridgeType, BridgeName, _, _} = parse_channel_id(ChannelId),
send_message(BridgeId, Message) ->
{BridgeType, BridgeName} = parse_bridge_id(BridgeId),
ResId = emqx_bridge:resource_id(BridgeType, BridgeName),
do_send_message(ResId, ChannelId, Message).
do_send_message(ResId, ChannelId, Message) ->
emqx_resource:query(ResId, {send_message, ChannelId, Message}).
emqx_resource:query(ResId, {send_message, Message}).
config_key_path() ->
[bridges].
resource_type(<<"mqtt">>) -> emqx_connector_mqtt;
resource_type(mqtt) -> emqx_connector_mqtt;
resource_type(<<"http">>) -> emqx_connector_http;
resource_type(http) -> emqx_connector_http.
bridge_type(emqx_connector_mqtt) -> mqtt;
bridge_type(emqx_connector_http) -> http.
post_config_update(_Req, NewConf, OldConf, _AppEnv) ->
post_config_update(_, _Req, NewConf, OldConf, _AppEnv) ->
#{added := Added, removed := Removed, changed := Updated}
= diff_confs(NewConf, OldConf),
perform_bridge_changes([
{fun remove_bridge/3, Removed},
{fun create_bridge/3, Added},
{fun update_bridge/3, Updated}
]).
Result = perform_bridge_changes([
{fun remove/3, Removed},
{fun create/3, Added},
{fun update/3, Updated}
]),
ok = reload_hook(),
Result.
perform_bridge_changes(Tasks) ->
perform_bridge_changes(Tasks, ok).
@ -123,8 +129,8 @@ perform_bridge_changes([{Action, MapConfs} | Tasks], Result0) ->
end, Result0, MapConfs),
perform_bridge_changes(Tasks, Result).
load_bridges() ->
Bridges = emqx_conf:get([bridges], #{}),
load() ->
Bridges = emqx:get_config([bridges], #{}),
emqx_bridge_monitor:ensure_all_started(Bridges).
resource_id(BridgeId) when is_binary(BridgeId) ->
@ -145,55 +151,41 @@ parse_bridge_id(BridgeId) ->
_ -> error({invalid_bridge_id, BridgeId})
end.
channel_id(BridgeType, BridgeName, ChannelType, ChannelName) ->
BType = bin(BridgeType),
BName = bin(BridgeName),
CType = bin(ChannelType),
CName = bin(ChannelName),
<<BType/binary, ":", BName/binary, ":", CType/binary, ":", CName/binary>>.
parse_channel_id(ChannelId) ->
case string:split(bin(ChannelId), ":", all) of
[BridgeType, BridgeName, ChannelType, ChannelName] ->
{BridgeType, BridgeName, ChannelType, ChannelName};
_ -> error({invalid_bridge_id, ChannelId})
end.
list_bridges() ->
list() ->
lists:foldl(fun({Type, NameAndConf}, Bridges) ->
lists:foldl(fun({Name, RawConf}, Acc) ->
case get_bridge(Type, Name, RawConf) of
case lookup(Type, Name, RawConf) of
{error, not_found} -> Acc;
{ok, Res} -> [Res | Acc]
end
end, Bridges, maps:to_list(NameAndConf))
end, [], maps:to_list(emqx:get_raw_config([bridges]))).
end, [], maps:to_list(emqx:get_raw_config([bridges], #{}))).
get_bridge(Type, Name) ->
lookup(Type, Name) ->
RawConf = emqx:get_raw_config([bridges, Type, Name], #{}),
get_bridge(Type, Name, RawConf).
get_bridge(Type, Name, RawConf) ->
lookup(Type, Name, RawConf).
lookup(Type, Name, RawConf) ->
case emqx_resource:get_instance(resource_id(Type, Name)) of
{error, not_found} -> {error, not_found};
{ok, Data} -> {ok, #{id => bridge_id(Type, Name), resource_data => Data,
raw_config => RawConf}}
end.
start_bridge(Type, Name) ->
restart_bridge(Type, Name).
start(Type, Name) ->
restart(Type, Name).
stop_bridge(Type, Name) ->
stop(Type, Name) ->
emqx_resource:stop(resource_id(Type, Name)).
restart_bridge(Type, Name) ->
restart(Type, Name) ->
emqx_resource:restart(resource_id(Type, Name)).
create_bridge(Type, Name, Conf) ->
create(Type, Name, Conf) ->
?SLOG(info, #{msg => "create bridge", type => Type, name => Name,
config => Conf}),
ResId = resource_id(Type, Name),
case emqx_resource:create(ResId,
emqx_bridge:resource_type(Type), Conf) of
case emqx_resource:create_local(ResId,
emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf)) of
{ok, already_created} ->
emqx_resource:get_instance(ResId);
{ok, Data} ->
@ -202,23 +194,38 @@ create_bridge(Type, Name, Conf) ->
{error, Reason}
end.
update_bridge(Type, Name, {_OldConf, Conf}) ->
update(Type, Name, {_OldConf, Conf}) ->
%% TODO: sometimes its not necessary to restart the bridge connection.
%%
%% - if the connection related configs like `username` is updated, we should restart/start
%% - if the connection related configs like `servers` is updated, we should restart/start
%% or stop bridges according to the change.
%% - if the connection related configs are not update, but channel configs `ingress_channels` or
%% `egress_channels` are changed, then we should not restart the bridge, we only restart/start
%% the channels.
%% - if the connection related configs are not update, only non-connection configs like
%% the `method` or `headers` of a HTTP bridge is changed, then the bridge can be updated
%% without restarting the bridge.
%%
?SLOG(info, #{msg => "update bridge", type => Type, name => Name,
config => Conf}),
emqx_resource:recreate(resource_id(Type, Name),
emqx_bridge:resource_type(Type), Conf, []).
recreate(Type, Name, Conf).
remove_bridge(Type, Name, _Conf) ->
recreate(Type, Name) ->
recreate(Type, Name, emqx:get_raw_config([bridges, Type, Name])).
recreate(Type, Name, Conf) ->
emqx_resource:recreate_local(resource_id(Type, Name),
emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), []).
create_dry_run(Type, Conf) ->
Conf0 = Conf#{<<"ingress">> => #{<<"from_remote_topic">> => <<"t">>}},
case emqx_resource:check_config(emqx_bridge:resource_type(Type), Conf0) of
{ok, Conf1} ->
emqx_resource:create_dry_run_local(emqx_bridge:resource_type(Type), Conf1);
{error, _} = Error ->
Error
end.
remove(Type, Name, _Conf) ->
?SLOG(info, #{msg => "remove bridge", type => Type, name => Name}),
case emqx_resource:remove(resource_id(Type, Name)) of
case emqx_resource:remove_local(resource_id(Type, Name)) of
ok -> ok;
{error, not_found} -> ok;
{error, Reason} ->
@ -238,34 +245,83 @@ flatten_confs(Conf0) ->
do_flatten_confs(Type, Conf0) ->
[{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)].
has_subscribe_local_topic(Channels) ->
lists:any(fun (#{subscribe_local_topic := _}) -> true;
(_) -> false
end, maps:to_list(Channels)).
get_matched_channels(Topic) ->
Bridges = emqx_conf:get([bridges], #{}),
get_matched_bridges(Topic) ->
Bridges = emqx:get_config([bridges], #{}),
maps:fold(fun (BType, Conf, Acc0) ->
maps:fold(fun
%% TODO: also trigger 'message.publish' for mqtt bridges.
(mqtt, _Conf, Acc0) -> Acc0;
(BType, Conf, Acc0) ->
maps:fold(fun
(BName, #{egress_channels := Channels}, Acc1) ->
do_get_matched_channels(Topic, Channels, BType, BName, egress_channels)
++ Acc1;
(_Name, _BridgeConf, Acc1) -> Acc1
%% Confs for MQTT, Kafka bridges have the `direction` flag
(_BName, #{direction := ingress}, Acc1) ->
Acc1;
(BName, #{direction := egress} = Egress, Acc1) ->
get_matched_bridge_id(Egress, Topic, BType, BName, Acc1);
%% HTTP, MySQL bridges only have egress direction
(BName, BridgeConf, Acc1) ->
get_matched_bridge_id(BridgeConf, Topic, BType, BName, Acc1)
end, Acc0, Conf)
end, [], Bridges).
do_get_matched_channels(Topic, Channels, BType, BName, CType) ->
maps:fold(fun
(ChannName, #{subscribe_local_topic := Filter}, Acc) ->
get_matched_bridge_id(#{from_local_topic := Filter}, Topic, BType, BName, Acc) ->
case emqx_topic:match(Topic, Filter) of
true -> [channel_id(BType, BName, CType, ChannName) | Acc];
true -> [bridge_id(BType, BName) | Acc];
false -> Acc
end.
parse_confs(http, _Name,
#{ url := Url
, method := Method
, body := Body
, headers := Headers
, request_timeout := ReqTimeout
} = Conf) ->
{BaseUrl, Path} = parse_url(Url),
{ok, BaseUrl2} = emqx_http_lib:uri_parse(BaseUrl),
Conf#{ base_url => BaseUrl2
, request =>
#{ path => Path
, method => Method
, body => Body
, headers => Headers
, request_timeout => ReqTimeout
}
};
parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf)
when is_binary(ConnId) ->
case emqx_connector:parse_connector_id(ConnId) of
{Type, ConnName} ->
ConnectorConfs = emqx:get_config([connectors, Type, ConnName]),
make_resource_confs(Direction, ConnectorConfs,
maps:without([connector, direction], Conf), Name);
{_ConnType, _ConnName} ->
error({cannot_use_connector_with_different_type, ConnId})
end;
(_ChannName, _ChannConf, Acc) -> Acc
end, [], Channels).
parse_confs(_Type, Name, #{connector := ConnectorConfs, direction := Direction} = Conf)
when is_map(ConnectorConfs) ->
make_resource_confs(Direction, ConnectorConfs,
maps:without([connector, direction], Conf), Name).
make_resource_confs(ingress, ConnectorConfs, BridgeConf, Name) ->
BName = bin(Name),
ConnectorConfs#{
ingress => BridgeConf#{hookpoint => <<"$bridges/", BName/binary>>}
};
make_resource_confs(egress, ConnectorConfs, BridgeConf, _Name) ->
ConnectorConfs#{
egress => BridgeConf
}.
parse_url(Url) ->
case string:split(Url, "//", leading) of
[Scheme, UrlRem] ->
case string:split(UrlRem, "/", leading) of
[HostPort, Path] ->
{iolist_to_binary([Scheme, "//", HostPort]), Path};
[HostPort] ->
{iolist_to_binary([Scheme, "//", HostPort]), <<>>}
end;
[Url] ->
error({invalid_url, Url})
end.
bin(Bin) when is_binary(Bin) -> Bin;
bin(Str) when is_list(Str) -> list_to_binary(Str);

View File

@ -19,23 +19,40 @@
-export([api_spec/0]).
-export([ list_bridges/2
-export([ list_create_bridges_in_cluster/2
, list_local_bridges/1
, crud_bridges_cluster/2
, crud_bridges/3
, crud_bridges_in_cluster/2
, manage_bridges/2
, lookup_from_local_node/2
]).
-define(TYPES, [mqtt]).
-define(BRIDGE(N, T, C), #{<<"id">> => N, <<"type">> => T, <<"config">> => C}).
-define(TYPES, [mqtt, http]).
-define(TRY_PARSE_ID(ID, EXPR),
try emqx_bridge:parse_bridge_id(Id) of
{BridgeType, BridgeName} -> EXPR
catch
error:{invalid_bridge_id, Id0} ->
{400, #{code => 102, message => <<"invalid_bridge_id: ", Id0/binary>>}}
{400, #{code => 'INVALID_ID', message => <<"invalid_bridge_id: ", Id0/binary,
". Bridge Ids must be of format <bridge_type>:<name>">>}}
end).
-define(METRICS(SUCC, FAILED, RATE, RATE_5, RATE_MAX),
#{
success => SUCC,
failed => FAILED,
rate => RATE,
rate_last5m => RATE_5,
rate_max => RATE_MAX
}).
-define(metrics(SUCC, FAILED, RATE, RATE_5, RATE_MAX),
#{
success := SUCC,
failed := FAILED,
rate := RATE,
rate_last5m := RATE_5,
rate_max := RATE_MAX
}).
req_schema() ->
Schema = [
case maps:to_list(emqx:get_raw_config([bridges, T], #{})) of
@ -47,14 +64,50 @@ req_schema() ->
|| T <- ?TYPES],
#{'oneOf' => Schema}.
node_schema() ->
#{type => string, example => "emqx@127.0.0.1"}.
status_schema() ->
#{type => string, enum => [connected, disconnected]}.
metrics_schema() ->
#{ type => object
, properties => #{
success => #{type => integer, example => "0"},
failed => #{type => integer, example => "0"},
rate => #{type => number, format => float, example => "0.0"},
rate_last5m => #{type => number, format => float, example => "0.0"},
rate_max => #{type => number, format => float, example => "0.0"}
}
}.
per_node_schema(Key, Schema) ->
#{
type => array,
items => #{
type => object,
properties => #{
node => node_schema(),
Key => Schema
}
}
}.
resp_schema() ->
#{'oneOf' := Schema} = req_schema(),
AddMetadata = fun(Prop) ->
Prop#{is_connected => #{type => boolean},
id => #{type => string},
Prop#{status => status_schema(),
node_status => per_node_schema(status, status_schema()),
metrics => metrics_schema(),
node_metrics => per_node_schema(metrics, metrics_schema()),
id => #{type => string, example => "http:my_http_bridge"},
bridge_type => #{type => string, enum => ?TYPES},
node => #{type => string}}
node => node_schema()
}
end,
more_props_resp_schema(AddMetadata).
more_props_resp_schema(AddMetadata) ->
#{oneOf := Schema} = req_schema(),
Schema1 = [S#{properties => AddMetadata(Prop)}
|| S = #{properties := Prop} <- Schema],
#{'oneOf' => Schema1}.
@ -66,6 +119,10 @@ bridge_apis() ->
[list_all_bridges_api(), crud_bridges_apis(), operation_apis()].
list_all_bridges_api() ->
ReqSchema = more_props_resp_schema(fun(Prop) ->
Prop#{id => #{type => string, required => true}}
end),
RespSchema = resp_schema(),
Metadata = #{
get => #{
description => <<"List all created bridges">>,
@ -73,9 +130,18 @@ list_all_bridges_api() ->
<<"200">> => emqx_mgmt_util:array_schema(resp_schema(),
<<"A list of the bridges">>)
}
},
post => #{
description => <<"Create a new bridge">>,
'requestBody' => emqx_mgmt_util:schema(ReqSchema),
responses => #{
<<"201">> => emqx_mgmt_util:schema(RespSchema, <<"Bridge created">>),
<<"400">> => emqx_mgmt_util:error_schema(<<"Create bridge failed">>,
['UPDATE_FAILED'])
}
}
},
{"/bridges/", Metadata, list_bridges}.
{"/bridges/", Metadata, list_create_bridges_in_cluster}.
crud_bridges_apis() ->
ReqSchema = req_schema(),
@ -91,7 +157,7 @@ crud_bridges_apis() ->
}
},
put => #{
description => <<"Create or update a bridge">>,
description => <<"Update a bridge">>,
parameters => [param_path_id()],
'requestBody' => emqx_mgmt_util:schema(ReqSchema),
responses => #{
@ -109,7 +175,7 @@ crud_bridges_apis() ->
}
}
},
{"/bridges/:id", Metadata, crud_bridges_cluster}.
{"/bridges/:id", Metadata, crud_bridges_in_cluster}.
operation_apis() ->
Metadata = #{
@ -153,62 +219,69 @@ param_path_operation()->
example => restart
}.
list_bridges(get, _Params) ->
{200, lists:append([list_local_bridges(Node) || Node <- mria_mnesia:running_nodes()])}.
list_create_bridges_in_cluster(post, #{body := #{<<"id">> := Id} = Conf}) ->
?TRY_PARSE_ID(Id,
case emqx_bridge:lookup(BridgeType, BridgeName) of
{ok, _} -> {400, #{code => 'ALREADY_EXISTS', message => <<"bridge already exists">>}};
{error, not_found} ->
case ensure_bridge(BridgeType, BridgeName, maps:remove(<<"id">>, Conf)) of
ok -> lookup_from_all_nodes(Id, BridgeType, BridgeName, 201);
{error, Error} -> {400, Error}
end
end);
list_create_bridges_in_cluster(get, _Params) ->
{200, zip_bridges([list_local_bridges(Node) || Node <- mria_mnesia:running_nodes()])}.
list_local_bridges(Node) when Node =:= node() ->
[format_resp(Data) || Data <- emqx_bridge:list_bridges()];
[format_resp(Data) || Data <- emqx_bridge:list()];
list_local_bridges(Node) ->
rpc_call(Node, list_local_bridges, [Node]).
crud_bridges_cluster(Method, Params) ->
Results = [crud_bridges(Node, Method, Params) || Node <- mria_mnesia:running_nodes()],
case lists:filter(fun({200}) -> false; ({200, _}) -> false; (_) -> true end, Results) of
[] ->
case Results of
[{200} | _] -> {200};
_ -> {200, [Res || {200, Res} <- Results]}
crud_bridges_in_cluster(get, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID(Id, lookup_from_all_nodes(Id, BridgeType, BridgeName, 200));
crud_bridges_in_cluster(put, #{bindings := #{id := Id}, body := Conf}) ->
?TRY_PARSE_ID(Id,
case emqx_bridge:lookup(BridgeType, BridgeName) of
{ok, _} ->
case ensure_bridge(BridgeType, BridgeName, Conf) of
ok -> lookup_from_all_nodes(Id, BridgeType, BridgeName, 200);
{error, Error} -> {400, Error}
end;
Errors ->
hd(Errors)
end.
crud_bridges(Node, Method, Params) when Node =/= node() ->
rpc_call(Node, crud_bridges, [Node, Method, Params]);
crud_bridges(_, get, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID(Id, case emqx_bridge:get_bridge(BridgeType, BridgeName) of
{ok, Data} -> {200, format_resp(Data)};
{error, not_found} ->
{404, #{code => 102, message => <<"not_found: ", Id/binary>>}}
{404, #{code => 'NOT_FOUND', message => <<"bridge not found">>}}
end);
crud_bridges(_, put, #{bindings := #{id := Id}, body := Conf}) ->
crud_bridges_in_cluster(delete, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID(Id,
case emqx:update_config(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], Conf,
#{rawconf_with_defaults => true}) of
{ok, #{raw_config := RawConf, post_config_update := #{emqx_bridge := Data}}} ->
{200, format_resp(#{id => Id, raw_config => RawConf, resource_data => Data})};
{ok, _} -> %% the bridge already exits
{ok, Data} = emqx_bridge:get_bridge(BridgeType, BridgeName),
{200, format_resp(Data)};
{error, Reason} ->
{500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
end);
crud_bridges(_, delete, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID(Id,
case emqx:remove_config(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName]) of
{ok, _} -> {200};
case emqx_conf:remove(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
#{override_to => cluster}) of
{ok, _} -> {204};
{error, Reason} ->
{500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
end).
lookup_from_all_nodes(Id, BridgeType, BridgeName, SuccCode) ->
case rpc_multicall(lookup_from_local_node, [BridgeType, BridgeName]) of
{ok, [{ok, _} | _] = Results} ->
{SuccCode, format_bridge_info([R || {ok, R} <- Results])};
{ok, [{error, not_found} | _]} ->
{404, error_msg('NOT_FOUND', <<"not_found: ", Id/binary>>)};
{error, ErrL} ->
{500, error_msg('UNKNOWN_ERROR', ErrL)}
end.
lookup_from_local_node(BridgeType, BridgeName) ->
case emqx_bridge:lookup(BridgeType, BridgeName) of
{ok, Res} -> {ok, format_resp(Res)};
Error -> Error
end.
manage_bridges(post, #{bindings := #{node := Node, id := Id, operation := Op}}) ->
OperFun =
fun (<<"start">>) -> start_bridge;
(<<"stop">>) -> stop_bridge;
(<<"restart">>) -> restart_bridge
fun (<<"start">>) -> start;
(<<"stop">>) -> stop;
(<<"restart">>) -> restart
end,
?TRY_PARSE_ID(Id,
case rpc_call(binary_to_atom(Node, latin1), emqx_bridge, OperFun(Op),
@ -218,15 +291,77 @@ manage_bridges(post, #{bindings := #{node := Node, id := Id, operation := Op}})
{500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
end).
ensure_bridge(BridgeType, BridgeName, Conf) ->
case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], Conf,
#{override_to => cluster}) of
{ok, _} -> ok;
{error, Reason} ->
{error, error_msg('BAD_ARG', Reason)}
end.
zip_bridges([BridgesFirstNode | _] = BridgesAllNodes) ->
lists:foldl(fun(#{id := Id}, Acc) ->
Bridges = pick_bridges_by_id(Id, BridgesAllNodes),
[format_bridge_info(Bridges) | Acc]
end, [], BridgesFirstNode).
pick_bridges_by_id(Id, BridgesAllNodes) ->
lists:foldl(fun(BridgesOneNode, Acc) ->
[BridgeInfo] = [Bridge || Bridge = #{id := Id0} <- BridgesOneNode, Id0 == Id],
[BridgeInfo | Acc]
end, [], BridgesAllNodes).
format_bridge_info([FirstBridge | _] = Bridges) ->
Res = maps:remove(node, FirstBridge),
NodeStatus = collect_status(Bridges),
NodeMetrics = collect_metrics(Bridges),
Res#{ status => aggregate_status(NodeStatus)
, node_status => NodeStatus
, metrics => aggregate_metrics(NodeMetrics)
, node_metrics => NodeMetrics
}.
collect_status(Bridges) ->
[maps:with([node, status], B) || B <- Bridges].
aggregate_status(AllStatus) ->
AllConnected = lists:all(fun (#{status := connected}) -> true;
(_) -> false
end, AllStatus),
case AllConnected of
true -> connected;
false -> disconnected
end.
collect_metrics(Bridges) ->
[maps:with([node, metrics], B) || B <- Bridges].
aggregate_metrics(AllMetrics) ->
InitMetrics = ?METRICS(0,0,0,0,0),
lists:foldl(fun(#{metrics := ?metrics(Succ1, Failed1, Rate1, Rate5m1, RateMax1)},
?metrics(Succ0, Failed0, Rate0, Rate5m0, RateMax0)) ->
?METRICS(Succ1 + Succ0, Failed1 + Failed0,
Rate1 + Rate0, Rate5m1 + Rate5m0, RateMax1 + RateMax0)
end, InitMetrics, AllMetrics).
format_resp(#{id := Id, raw_config := RawConf, resource_data := #{mod := Mod, status := Status}}) ->
IsConnected = fun(started) -> true; (_) -> false end,
IsConnected = fun(started) -> connected; (_) -> disconnected end,
RawConf#{
id => Id,
node => node(),
bridge_type => emqx_bridge:bridge_type(Mod),
is_connected => IsConnected(Status)
status => IsConnected(Status),
metrics => ?METRICS(0,0,0,0,0)
}.
rpc_multicall(Func, Args) ->
Nodes = mria_mnesia:running_nodes(),
ResL = erpc:multicall(Nodes, ?MODULE, Func, Args, 15000),
case lists:filter(fun({ok, _}) -> false; (_) -> true end, ResL) of
[] -> {ok, [Res || {ok, Res} <- ResL]};
ErrL -> {error, ErrL}
end.
rpc_call(Node, Fun, Args) ->
rpc_call(Node, ?MODULE, Fun, Args).
@ -237,3 +372,8 @@ rpc_call(Node, Mod, Fun, Args) ->
{badrpc, Reason} -> {error, Reason};
Res -> Res
end.
error_msg(Code, Msg) when is_binary(Msg) ->
#{code => Code, message => Msg};
error_msg(Code, Msg) ->
#{code => Code, message => list_to_binary(io_lib:format("~p", [Msg]))}.

View File

@ -21,9 +21,9 @@
start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_bridge_sup:start_link(),
ok = emqx_bridge:load_bridges(),
ok = emqx_bridge:reload_hook(),
emqx_conf:add_handler(emqx_bridge:config_key_path(), emqx_bridge),
ok = emqx_bridge:load(),
ok = emqx_bridge:load_hook(),
emqx_config_handler:add_handler(emqx_bridge:config_key_path(), emqx_bridge),
{ok, Sup}.
stop(_State) ->

View File

@ -67,18 +67,6 @@ code_change(_OldVsn, State, _Extra) ->
load_bridges(Configs) ->
lists:foreach(fun({Type, NamedConf}) ->
lists:foreach(fun({Name, Conf}) ->
load_bridge(Name, Type, Conf)
emqx_bridge:create(Type, Name, Conf)
end, maps:to_list(NamedConf))
end, maps:to_list(Configs)).
%% TODO: move this monitor into emqx_resource
%% emqx_resource:check_and_create_local(ResourceId, ResourceType, Config, #{keep_retry => true}).
load_bridge(Name, Type, Config) ->
case emqx_resource:create_local(
emqx_bridge:resource_id(Type, Name),
emqx_bridge:resource_type(Type), Config) of
{ok, already_created} -> ok;
{ok, _} -> ok;
{error, Reason} ->
error({load_bridge, Reason})
end.

View File

@ -10,16 +10,114 @@
roots() -> [bridges].
fields(bridges) ->
[ {mqtt, hoconsc:mk(hoconsc:map(name, hoconsc:ref(?MODULE, "mqtt_bridge")))}
, {http, hoconsc:mk(hoconsc:map(name, hoconsc:ref(?MODULE, "http_bridge")))}
[ {mqtt,
sc(hoconsc:map(name, hoconsc:union([ ref("ingress_mqtt_bridge")
, ref("egress_mqtt_bridge")
])),
#{ desc => "MQTT bridges"
})}
, {http,
sc(hoconsc:map(name, ref("http_bridge")),
#{ desc => "HTTP bridges"
})}
];
fields("mqtt_bridge") ->
emqx_connector_mqtt:fields("config");
fields("ingress_mqtt_bridge") ->
[ direction(ingress, emqx_connector_mqtt_schema:ingress_desc())
, connector_name()
] ++ proplists:delete(hookpoint, emqx_connector_mqtt_schema:fields("ingress"));
fields("egress_mqtt_bridge") ->
[ direction(egress, emqx_connector_mqtt_schema:egress_desc())
, connector_name()
] ++ emqx_connector_mqtt_schema:fields("egress");
fields("http_bridge") ->
emqx_connector_http:fields(config) ++ http_channels().
basic_config_http() ++
[ {url,
sc(binary(),
#{ nullable => false
, desc =>"""
The URL of the HTTP Bridge.<br>
Template with variables is allowed in the path, but variables cannot be used in the scheme, host,
or port part.<br>
For example, <code> http://localhost:9901/${topic} </code> is allowed, but
<code> http://${host}:9901/message </code> or <code> http://localhost:${port}/message </code>
is not allowed.
"""
})}
, {from_local_topic,
sc(binary(),
#{ desc =>"""
The MQTT topic filter to be forwarded to the HTTP server. All MQTT PUBLISH messages which topic
match the from_local_topic will be forwarded.<br>
NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also from_local_topic is configured, then both the data got from the rule and the MQTT messages that matches
from_local_topic will be forwarded.
"""
})}
, {method,
sc(method(),
#{ default => post
, desc =>"""
The method of the HTTP request. All the available methods are: post, put, get, delete.<br>
Template with variables is allowed.<br>
"""
})}
, {headers,
sc(map(),
#{ default => #{
<<"accept">> => <<"application/json">>,
<<"cache-control">> => <<"no-cache">>,
<<"connection">> => <<"keep-alive">>,
<<"content-type">> => <<"application/json">>,
<<"keep-alive">> => <<"timeout=5">>}
, desc =>"""
The headers of the HTTP request.<br>
Template with variables is allowed.
"""
})
}
, {body,
sc(binary(),
#{ default => <<"${payload}">>
, desc =>"""
The body of the HTTP request.<br>
Template with variables is allowed.
"""
})}
, {request_timeout,
sc(emqx_schema:duration_ms(),
#{ default => <<"30s">>
, desc =>"""
How long will the HTTP request timeout.
"""
})}
].
http_channels() ->
[{egress_channels, hoconsc:mk(hoconsc:map(id,
hoconsc:ref(emqx_connector_http, "http_request")))}].
direction(Dir, Desc) ->
{direction,
sc(Dir,
#{ nullable => false
, desc => "The direction of the bridge. Can be one of 'ingress' or 'egress'.<br>" ++
Desc
})}.
connector_name() ->
{connector,
sc(binary(),
#{ nullable => false
, desc =>"""
The connector name to be used for this bridge.
Connectors are configured by 'connectors.<type>.<name>
"""
})}.
basic_config_http() ->
proplists:delete(base_url, emqx_connector_http:fields(config)).
method() ->
hoconsc:enum([post, put, get, delete]).
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
ref(Field) -> hoconsc:ref(?MODULE, Field).

View File

@ -0,0 +1,292 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_api_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(CONF_DEFAULT, <<"bridges: {}">>).
-define(TEST_ID, <<"http:test_bridge">>).
-define(URL(PORT, PATH), list_to_binary(
io_lib:format("http://localhost:~s/~s",
[integer_to_list(PORT), PATH]))).
-define(HTTP_BRIDGE(URL),
#{
<<"url">> => URL,
<<"from_local_topic">> => <<"emqx_http/#">>,
<<"method">> => <<"post">>,
<<"ssl">> => #{<<"enable">> => false},
<<"body">> => <<"${payload}">>,
<<"headers">> => #{
<<"content-type">> => <<"application/json">>
}
}).
all() ->
emqx_common_test_helpers:all(?MODULE).
groups() ->
[].
suite() ->
[{timetrap,{seconds,30}}].
init_per_suite(Config) ->
ok = emqx_config:put([emqx_dashboard], #{
default_username => <<"admin">>,
default_password => <<"public">>,
listeners => [#{
protocol => http,
port => 18083
}]
}),
_ = application:load(emqx_conf),
%% some testcases (may from other app) already get emqx_connector started
_ = application:stop(emqx_resource),
_ = application:stop(emqx_connector),
ok = emqx_common_test_helpers:start_apps([emqx_bridge, emqx_dashboard]),
ok = emqx_config:init_load(emqx_bridge_schema, ?CONF_DEFAULT),
Config.
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_dashboard]),
ok.
init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
Config.
end_per_testcase(_, _Config) ->
ok.
%%------------------------------------------------------------------------------
%% HTTP server for testing
%%------------------------------------------------------------------------------
start_http_server(HandleFun) ->
Parent = self(),
spawn_link(fun() ->
{Port, Sock} = listen_on_random_port(),
Parent ! {port, Port},
loop(Sock, HandleFun)
end),
receive
{port, Port} -> Port
after
2000 -> error({timeout, start_http_server})
end.
listen_on_random_port() ->
Min = 1024, Max = 65000,
Port = rand:uniform(Max - Min) + Min,
case gen_tcp:listen(Port, [{active, false}, {reuseaddr, true}]) of
{ok, Sock} -> {Port, Sock};
{error, eaddrinuse} -> listen_on_random_port()
end.
loop(Sock, HandleFun) ->
{ok, Conn} = gen_tcp:accept(Sock),
Handler = spawn(fun () -> HandleFun(Conn) end),
gen_tcp:controlling_process(Conn, Handler),
loop(Sock, HandleFun).
make_response(CodeStr, Str) ->
B = iolist_to_binary(Str),
iolist_to_binary(
io_lib:fwrite(
"HTTP/1.0 ~s\nContent-Type: text/html\nContent-Length: ~p\n\n~s",
[CodeStr, size(B), B])).
handle_fun_200_ok(Conn) ->
case gen_tcp:recv(Conn, 0) of
{ok, Request} ->
gen_tcp:send(Conn, make_response("200 OK", "Request OK")),
self() ! {http_server, received, Request},
handle_fun_200_ok(Conn);
{error, closed} ->
gen_tcp:close(Conn)
end.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
t_http_crud_apis(_) ->
Port = start_http_server(fun handle_fun_200_ok/1),
%% assert we there's no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% then we add a http bridge, using POST
%% POST /bridges/ will create a bridge
URL1 = ?URL(Port, "path1"),
{ok, 201, Bridge} = request(post, uri(["bridges"]),
?HTTP_BRIDGE(URL1)#{<<"id">> => ?TEST_ID}),
%ct:pal("---bridge: ~p", [Bridge]),
?assertMatch(#{ <<"id">> := ?TEST_ID
, <<"bridge_type">> := <<"http">>
, <<"status">> := _
, <<"node_status">> := [_|_]
, <<"metrics">> := _
, <<"node_metrics">> := [_|_]
, <<"url">> := URL1
}, jsx:decode(Bridge)),
%% create a again returns an error
{ok, 400, RetMsg} = request(post, uri(["bridges"]),
?HTTP_BRIDGE(URL1)#{<<"id">> => ?TEST_ID}),
?assertMatch(
#{ <<"code">> := _
, <<"message">> := <<"bridge already exists">>
}, jsx:decode(RetMsg)),
%% update the request-path of the bridge
URL2 = ?URL(Port, "path2"),
{ok, 200, Bridge2} = request(put, uri(["bridges", ?TEST_ID]),
?HTTP_BRIDGE(URL2)),
?assertMatch(#{ <<"id">> := ?TEST_ID
, <<"bridge_type">> := <<"http">>
, <<"status">> := _
, <<"node_status">> := [_|_]
, <<"metrics">> := _
, <<"node_metrics">> := [_|_]
, <<"url">> := URL2
}, jsx:decode(Bridge2)),
%% list all bridges again, assert Bridge2 is in it
{ok, 200, Bridge2Str} = request(get, uri(["bridges"]), []),
?assertMatch([#{ <<"id">> := ?TEST_ID
, <<"bridge_type">> := <<"http">>
, <<"status">> := _
, <<"node_status">> := [_|_]
, <<"metrics">> := _
, <<"node_metrics">> := [_|_]
, <<"url">> := URL2
}], jsx:decode(Bridge2Str)),
%% get the bridge by id
{ok, 200, Bridge3Str} = request(get, uri(["bridges", ?TEST_ID]), []),
?assertMatch(#{ <<"id">> := ?TEST_ID
, <<"bridge_type">> := <<"http">>
, <<"status">> := _
, <<"node_status">> := [_|_]
, <<"metrics">> := _
, <<"node_metrics">> := [_|_]
, <<"url">> := URL2
}, jsx:decode(Bridge3Str)),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", ?TEST_ID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% update a deleted bridge returns an error
{ok, 404, ErrMsg2} = request(put, uri(["bridges", ?TEST_ID]),
?HTTP_BRIDGE(URL2)),
?assertMatch(
#{ <<"code">> := _
, <<"message">> := <<"bridge not found">>
}, jsx:decode(ErrMsg2)),
ok.
t_start_stop_bridges(_) ->
Port = start_http_server(fun handle_fun_200_ok/1),
URL1 = ?URL(Port, "abc"),
{ok, 201, Bridge} = request(post, uri(["bridges"]),
?HTTP_BRIDGE(URL1)#{<<"id">> => ?TEST_ID}),
%ct:pal("the bridge ==== ~p", [Bridge]),
?assertMatch(
#{ <<"id">> := ?TEST_ID
, <<"bridge_type">> := <<"http">>
, <<"status">> := _
, <<"node_status">> := [_|_]
, <<"metrics">> := _
, <<"node_metrics">> := [_|_]
, <<"url">> := URL1
}, jsx:decode(Bridge)),
%% stop it
{ok, 200, <<>>} = request(post,
uri(["nodes", node(), "bridges", ?TEST_ID, "operation", "stop"]),
<<"">>),
{ok, 200, Bridge2} = request(get, uri(["bridges", ?TEST_ID]), []),
?assertMatch(#{ <<"id">> := ?TEST_ID
, <<"status">> := <<"disconnected">>
}, jsx:decode(Bridge2)),
%% start again
{ok, 200, <<>>} = request(post,
uri(["nodes", node(), "bridges", ?TEST_ID, "operation", "start"]),
<<"">>),
{ok, 200, Bridge3} = request(get, uri(["bridges", ?TEST_ID]), []),
?assertMatch(#{ <<"id">> := ?TEST_ID
, <<"status">> := <<"connected">>
}, jsx:decode(Bridge3)),
%% restart an already started bridge
{ok, 200, <<>>} = request(post,
uri(["nodes", node(), "bridges", ?TEST_ID, "operation", "restart"]),
<<"">>),
{ok, 200, Bridge3} = request(get, uri(["bridges", ?TEST_ID]), []),
?assertMatch(#{ <<"id">> := ?TEST_ID
, <<"status">> := <<"connected">>
}, jsx:decode(Bridge3)),
%% stop it again
{ok, 200, <<>>} = request(post,
uri(["nodes", node(), "bridges", ?TEST_ID, "operation", "stop"]),
<<"">>),
%% restart a stopped bridge
{ok, 200, <<>>} = request(post,
uri(["nodes", node(), "bridges", ?TEST_ID, "operation", "restart"]),
<<"">>),
{ok, 200, Bridge4} = request(get, uri(["bridges", ?TEST_ID]), []),
?assertMatch(#{ <<"id">> := ?TEST_ID
, <<"status">> := <<"connected">>
}, jsx:decode(Bridge4)),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", ?TEST_ID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []).
%%--------------------------------------------------------------------
%% HTTP Request
%%--------------------------------------------------------------------
-define(HOST, "http://127.0.0.1:18083/").
-define(API_VERSION, "v5").
-define(BASE_PATH, "api").
request(Method, Url, Body) ->
Request = case Body of
[] -> {Url, [auth_header_()]};
_ -> {Url, [auth_header_()], "application/json", jsx:encode(Body)}
end,
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
case httpc:request(Method, Request, [], [{body_format, binary}]) of
{error, socket_closed_remotely} ->
{error, socket_closed_remotely};
{ok, {{"HTTP/1.1", Code, _}, _Headers, Return} } ->
{ok, Code, Return};
{ok, {Reason, _, _}} ->
{error, Reason}
end.
uri() -> uri([]).
uri(Parts) when is_list(Parts) ->
NParts = [E || E <- Parts],
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION | NParts]).
auth_header_() ->
Username = <<"admin">>,
Password = <<"public">>,
{ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
{"Authorization", "Bearer " ++ binary_to_list(Token)}.

View File

@ -56,6 +56,7 @@
, emqx_exhook_schema
, emqx_psk_schema
, emqx_limiter_schema
, emqx_connector_schema
]).
namespace() -> undefined.

View File

@ -0,0 +1,23 @@
#connectors.mqtt.my_mqtt_connector {
# server = "127.0.0.1:1883"
# proto_ver = "v4"
# username = "username1"
# password = ""
# clean_start = true
# keepalive = 300
# retry_interval = "30s"
# max_inflight = 32
# reconnect_interval = "30s"
# bridge_mode = true
# replayq {
# dir = "{{ platform_data_dir }}/replayq/bridge_mqtt/"
# seg_bytes = "100MB"
# offload = false
# }
# ssl {
# enable = false
# keyfile = "{{ platform_etc_dir }}/certs/client-key.pem"
# certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
# cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
# }
#}

View File

@ -14,6 +14,7 @@
epgsql,
mysql,
mongodb,
ehttpc,
emqx,
emqtt
]},

View File

@ -14,3 +14,101 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector).
-export([config_key_path/0]).
-export([ parse_connector_id/1
, connector_id/2
]).
-export([ list/0
, lookup/1
, lookup/2
, create_dry_run/2
, update/2
, update/3
, delete/1
, delete/2
]).
-export([ post_config_update/5
]).
config_key_path() ->
[connectors].
post_config_update([connectors, Type, Name], '$remove', _, _OldConf, _AppEnvs) ->
ConnId = connector_id(Type, Name),
LinkedBridgeIds = lists:foldl(fun
(#{id := BId, raw_config := #{<<"connector">> := ConnId0}}, Acc)
when ConnId0 == ConnId ->
[BId | Acc];
(_, Acc) -> Acc
end, [], emqx_bridge:list()),
case LinkedBridgeIds of
[] -> ok;
_ -> {error, {dependency_bridges_exist, LinkedBridgeIds}}
end;
post_config_update([connectors, Type, Name], _Req, NewConf, _OldConf, _AppEnvs) ->
ConnId = connector_id(Type, Name),
lists:foreach(fun
(#{id := BId, raw_config := #{<<"connector">> := ConnId0}}) when ConnId0 == ConnId ->
{BType, BName} = emqx_bridge:parse_bridge_id(BId),
BridgeConf = emqx:get_config([bridges, BType, BName]),
case emqx_bridge:recreate(BType, BName, BridgeConf#{connector => NewConf}) of
{ok, _} -> ok;
{error, Reason} -> error({update_bridge_error, Reason})
end;
(_) ->
ok
end, emqx_bridge:list()).
connector_id(Type0, Name0) ->
Type = bin(Type0),
Name = bin(Name0),
<<Type/binary, ":", Name/binary>>.
parse_connector_id(ConnectorId) ->
case string:split(bin(ConnectorId), ":", all) of
[Type, Name] -> {binary_to_atom(Type, utf8), binary_to_atom(Name, utf8)};
_ -> error({invalid_connector_id, ConnectorId})
end.
list() ->
lists:foldl(fun({Type, NameAndConf}, Connectors) ->
lists:foldl(fun({Name, RawConf}, Acc) ->
[RawConf#{<<"id">> => connector_id(Type, Name)} | Acc]
end, Connectors, maps:to_list(NameAndConf))
end, [], maps:to_list(emqx:get_raw_config(config_key_path(), #{}))).
lookup(Id) when is_binary(Id) ->
{Type, Name} = parse_connector_id(Id),
lookup(Type, Name).
lookup(Type, Name) ->
Id = connector_id(Type, Name),
case emqx:get_raw_config(config_key_path() ++ [Type, Name], not_found) of
not_found -> {error, not_found};
Conf -> {ok, Conf#{<<"id">> => Id}}
end.
create_dry_run(Type, Conf) ->
emqx_bridge:create_dry_run(Type, Conf).
update(Id, Conf) when is_binary(Id) ->
{Type, Name} = parse_connector_id(Id),
update(Type, Name, Conf).
update(Type, Name, Conf) ->
emqx_conf:update(config_key_path() ++ [Type, Name], Conf, #{override_to => cluster}).
delete(Id) when is_binary(Id) ->
{Type, Name} = parse_connector_id(Id),
delete(Type, Name).
delete(Type, Name) ->
emqx_conf:remove(config_key_path() ++ [Type, Name], #{override_to => cluster}).
bin(Bin) when is_binary(Bin) -> Bin;
bin(Str) when is_list(Str) -> list_to_binary(Str);
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).

View File

@ -0,0 +1,203 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_api).
-behaviour(minirest_api).
-include("emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl").
-import(hoconsc, [mk/2, ref/2, array/1, enum/1]).
%% Swagger specs from hocon schema
-export([api_spec/0, paths/0, schema/1, namespace/0]).
%% API callbacks
-export(['/connectors_test'/2, '/connectors'/2, '/connectors/:id'/2]).
-define(TRY_PARSE_ID(ID, EXPR),
try emqx_connector:parse_connector_id(Id) of
{ConnType, ConnName} ->
_ = ConnName,
EXPR
catch
error:{invalid_bridge_id, Id0} ->
{400, #{code => 'INVALID_ID', message => <<"invalid_bridge_id: ", Id0/binary,
". Bridge Ids must be of format <bridge_type>:<name>">>}}
end).
namespace() -> "connector".
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}).
paths() -> ["/connectors_test", "/connectors", "/connectors/:id"].
error_schema(Code, Message) ->
[ {code, mk(string(), #{example => Code})}
, {message, mk(string(), #{example => Message})}
].
connector_info() ->
hoconsc:union([ ref(emqx_connector_schema, "mqtt_connector_info")
]).
connector_test_info() ->
hoconsc:union([ ref(emqx_connector_schema, "mqtt_connector_test_info")
]).
connector_req() ->
hoconsc:union([ ref(emqx_connector_schema, "mqtt_connector")
]).
param_path_id() ->
[{id, mk(binary(), #{in => path, example => <<"mqtt:my_mqtt_connector">>})}].
schema("/connectors_test") ->
#{
operationId => '/connectors_test',
post => #{
tags => [<<"connectors">>],
description => <<"Test creating a new connector by given Id <br>"
"The Id must be of format <type>:<name>">>,
summary => <<"Test creating connector">>,
requestBody => connector_test_info(),
responses => #{
200 => <<"Test connector OK">>,
400 => error_schema('TEST_FAILED', "connector test failed")
}
}
};
schema("/connectors") ->
#{
operationId => '/connectors',
get => #{
tags => [<<"connectors">>],
description => <<"List all connectors">>,
summary => <<"List connectors">>,
responses => #{
200 => mk(array(connector_info()), #{desc => "List of connectors"})
}
},
post => #{
tags => [<<"connectors">>],
description => <<"Create a new connector by given Id <br>"
"The Id must be of format <type>:<name>">>,
summary => <<"Create connector">>,
requestBody => connector_info(),
responses => #{
201 => connector_info(),
400 => error_schema('ALREADY_EXISTS', "connector already exists")
}
}
};
schema("/connectors/:id") ->
#{
operationId => '/connectors/:id',
get => #{
tags => [<<"connectors">>],
description => <<"Get the connector by Id">>,
summary => <<"Get connector">>,
parameters => param_path_id(),
responses => #{
200 => connector_info(),
404 => error_schema('NOT_FOUND', "Connector not found")
}
},
put => #{
tags => [<<"connectors">>],
description => <<"Update an existing connector by Id">>,
summary => <<"Update connector">>,
parameters => param_path_id(),
requestBody => connector_req(),
responses => #{
200 => <<"Update connector successfully">>,
400 => error_schema('UPDATE_FAIL', "Update failed"),
404 => error_schema('NOT_FOUND', "Connector not found")
}},
delete => #{
tags => [<<"connectors">>],
description => <<"Delete a connector by Id">>,
summary => <<"Delete connector">>,
parameters => param_path_id(),
responses => #{
204 => <<"Delete connector successfully">>,
400 => error_schema('DELETE_FAIL', "Delete failed")
}}
}.
'/connectors_test'(post, #{body := #{<<"bridge_type">> := ConnType} = Params}) ->
case emqx_connector:create_dry_run(ConnType, maps:remove(<<"bridge_type">>, Params)) of
ok -> {200};
{error, Error} ->
{400, error_msg('BAD_ARG', Error)}
end.
'/connectors'(get, _Request) ->
{200, emqx_connector:list()};
'/connectors'(post, #{body := #{<<"id">> := Id} = Params}) ->
?TRY_PARSE_ID(Id,
case emqx_connector:lookup(ConnType, ConnName) of
{ok, _} ->
{400, error_msg('ALREADY_EXISTS', <<"connector already exists">>)};
{error, not_found} ->
case emqx_connector:update(ConnType, ConnName, maps:remove(<<"id">>, Params)) of
{ok, #{raw_config := RawConf}} -> {201, RawConf#{<<"id">> => Id}};
{error, Error} -> {400, error_msg('BAD_ARG', Error)}
end
end).
'/connectors/:id'(get, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID(Id,
case emqx_connector:lookup(ConnType, ConnName) of
{ok, Conf} -> {200, Conf#{<<"id">> => Id}};
{error, not_found} ->
{404, error_msg('NOT_FOUND', <<"connector not found">>)}
end);
'/connectors/:id'(put, #{bindings := #{id := Id}, body := Params}) ->
?TRY_PARSE_ID(Id,
case emqx_connector:lookup(ConnType, ConnName) of
{ok, _} ->
case emqx_connector:update(ConnType, ConnName, Params) of
{ok, #{raw_config := RawConf}} -> {200, RawConf#{<<"id">> => Id}};
{error, Error} -> {400, error_msg('BAD_ARG', Error)}
end;
{error, not_found} ->
{404, error_msg('NOT_FOUND', <<"connector not found">>)}
end);
'/connectors/:id'(delete, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID(Id,
case emqx_connector:lookup(ConnType, ConnName) of
{ok, _} ->
case emqx_connector:delete(ConnType, ConnName) of
{ok, _} -> {204};
{error, Error} -> {400, error_msg('BAD_ARG', Error)}
end;
{error, not_found} ->
{404, error_msg('NOT_FOUND', <<"connector not found">>)}
end).
error_msg(Code, Msg) when is_binary(Msg) ->
#{code => Code, message => Msg};
error_msg(Code, Msg) ->
#{code => Code, message => list_to_binary(io_lib:format("~p", [Msg]))}.

View File

@ -20,11 +20,15 @@
-export([start/2, stop/1]).
-define(CONF_HDLR_PATH, (emqx_connector:config_key_path() ++ ['?', '?'])).
start(_StartType, _StartArgs) ->
ok = emqx_config_handler:add_handler(?CONF_HDLR_PATH, emqx_connector),
emqx_connector_mqtt_worker:register_metrics(),
emqx_connector_sup:start_link().
stop(_State) ->
emqx_config_handler:remove_handler(?CONF_HDLR_PATH),
ok.
%% internal functions

View File

@ -38,7 +38,8 @@
, fields/1
, validations/0]).
-export([ check_ssl_opts/2 ]).
-export([ check_ssl_opts/2
]).
-type connect_timeout() :: emqx_schema:duration() | infinity.
-type pool_type() :: random | hash.
@ -50,73 +51,84 @@
%%=====================================================================
%% Hocon schema
roots() ->
[{config, #{type => hoconsc:ref(?MODULE, config)}}].
fields("http_request") ->
[ {subscribe_local_topic, hoconsc:mk(binary())}
, {method, hoconsc:mk(method(), #{default => post})}
, {path, hoconsc:mk(binary(), #{default => <<"">>})}
, {headers, hoconsc:mk(map(),
#{default => #{
<<"accept">> => <<"application/json">>,
<<"cache-control">> => <<"no-cache">>,
<<"connection">> => <<"keep-alive">>,
<<"content-type">> => <<"application/json">>,
<<"keep-alive">> => <<"timeout=5">>}})
}
, {body, hoconsc:mk(binary(), #{default => <<"${payload}">>})}
, {request_timeout, hoconsc:mk(emqx_schema:duration_ms(), #{default => <<"30s">>})}
];
fields(config).
fields(config) ->
[ {base_url, fun base_url/1}
, {connect_timeout, fun connect_timeout/1}
, {max_retries, fun max_retries/1}
, {retry_interval, fun retry_interval/1}
, {pool_type, fun pool_type/1}
, {pool_size, fun pool_size/1}
, {enable_pipelining, fun enable_pipelining/1}
] ++ emqx_connector_schema_lib:ssl_fields().
[ {base_url,
sc(url(),
#{ nullable => false
, validator => fun(#{query := _Query}) ->
{error, "There must be no query in the base_url"};
(_) -> ok
end
, desc => """
The base URL is the URL includes only the scheme, host and port.<br>
When send an HTTP request, the real URL to be used is the concatenation of the base URL and the
path parameter (passed by the emqx_resource:query/2,3 or provided by the request parameter).<br>
For example: http://localhost:9901/
"""
})}
, {connect_timeout,
sc(emqx_schema:duration_ms(),
#{ default => "30s"
, desc => "The timeout when connecting to the HTTP server"
})}
, {max_retries,
sc(non_neg_integer(),
#{ default => 5
, desc => "Max retry times if error on sending request"
})}
, {retry_interval,
sc(emqx_schema:duration(),
#{ default => "1s"
, desc => "Interval before next retry if error on sending request"
})}
, {pool_type,
sc(pool_type(),
#{ default => random
, desc => "The type of the pool. Canbe one of random, hash"
})}
, {pool_size,
sc(non_neg_integer(),
#{ default => 8
, desc => "The pool size"
})}
, {enable_pipelining,
sc(boolean(),
#{ default => true
, desc => "Enable the HTTP pipeline"
})}
, {request, hoconsc:mk(
ref("request"),
#{ default => undefined
, nullable => true
, desc => """
If the request is provided, the caller can send HTTP requests via
<code>emqx_resource:query(ResourceId, {send_message, BridgeId, Message})</code>
"""
})}
] ++ emqx_connector_schema_lib:ssl_fields();
method() ->
hoconsc:enum([post, put, get, delete]).
fields("request") ->
[ {method, hoconsc:mk(hoconsc:enum([post, put, get, delete]), #{nullable => true})}
, {path, hoconsc:mk(binary(), #{nullable => true})}
, {body, hoconsc:mk(binary(), #{nullable => true})}
, {headers, hoconsc:mk(map(), #{nullable => true})}
, {request_timeout,
sc(emqx_schema:duration_ms(),
#{ nullable => true
, desc => "The timeout when sending request to the HTTP server"
})}
].
validations() ->
[ {check_ssl_opts, fun check_ssl_opts/1} ].
base_url(type) -> url();
base_url(nullable) -> false;
base_url(validator) -> fun(#{query := _Query}) ->
{error, "There must be no query in the base_url"};
(_) -> ok
end;
base_url(_) -> undefined.
connect_timeout(type) -> emqx_schema:duration_ms();
connect_timeout(default) -> <<"5s">>;
connect_timeout(_) -> undefined.
max_retries(type) -> non_neg_integer();
max_retries(default) -> 5;
max_retries(_) -> undefined.
retry_interval(type) -> emqx_schema:duration();
retry_interval(default) -> <<"1s">>;
retry_interval(_) -> undefined.
pool_type(type) -> pool_type();
pool_type(default) -> hash;
pool_type(_) -> undefined.
pool_size(type) -> non_neg_integer();
pool_size(default) -> 8;
pool_size(_) -> undefined.
enable_pipelining(type) -> boolean();
enable_pipelining(default) -> true;
enable_pipelining(_) -> undefined.
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
ref(Field) -> hoconsc:ref(?MODULE, Field).
%% ===================================================================
on_start(InstId, #{base_url := #{scheme := Scheme,
host := Host,
port := Port,
@ -153,7 +165,7 @@ on_start(InstId, #{base_url := #{scheme := Scheme,
host => Host,
port => Port,
base_path => BasePath,
channels => preproc_channels(InstId, Config)
request => preprocess_request(maps:get(request, Config, undefined))
},
case ehttpc_sup:start_pool(PoolName, PoolOpts) of
{ok, _} -> {ok, State};
@ -167,12 +179,12 @@ on_stop(InstId, #{pool_name := PoolName}) ->
connector => InstId}),
ehttpc_sup:stop_pool(PoolName).
on_query(InstId, {send_message, ChannelId, Msg}, AfterQuery, #{channels := Channels} = State) ->
case maps:find(ChannelId, Channels) of
error -> ?SLOG(error, #{msg => "channel not found", channel_id => ChannelId});
{ok, ChannConf} ->
on_query(InstId, {send_message, Msg}, AfterQuery, State) ->
case maps:get(request, State, undefined) of
undefined -> ?SLOG(error, #{msg => "request not found", connector => InstId});
Request ->
#{method := Method, path := Path, body := Body, headers := Headers,
request_timeout := Timeout} = proc_channel_conf(ChannConf, Msg),
request_timeout := Timeout} = process_request(Request, Msg),
on_query(InstId, {Method, {Path, Headers, Body}, Timeout}, AfterQuery, State)
end;
on_query(InstId, {Method, Request}, AfterQuery, State) ->
@ -212,24 +224,21 @@ on_health_check(_InstId, #{host := Host, port := Port} = State) ->
%% Internal functions
%%--------------------------------------------------------------------
preproc_channels(<<"bridge:", BridgeId/binary>>, Config) ->
{BridgeType, BridgeName} = emqx_bridge:parse_bridge_id(BridgeId),
maps:fold(fun(ChannName, ChannConf, Acc) ->
Acc#{emqx_bridge:channel_id(BridgeType, BridgeName, egress_channels, ChannName) =>
preproc_channel_conf(ChannConf)}
end, #{}, maps:get(egress_channels, Config, #{}));
preproc_channels(_InstId, _Config) ->
#{}.
preproc_channel_conf(#{
preprocess_request(undefined) ->
undefined;
preprocess_request(Req) when map_size(Req) == 0 ->
undefined;
preprocess_request(#{
method := Method,
path := Path,
body := Body,
headers := Headers} = Conf) ->
Conf#{ method => emqx_plugin_libs_rule:preproc_tmpl(bin(Method))
headers := Headers
} = Req) ->
#{ method => emqx_plugin_libs_rule:preproc_tmpl(bin(Method))
, path => emqx_plugin_libs_rule:preproc_tmpl(Path)
, body => emqx_plugin_libs_rule:preproc_tmpl(Body)
, headers => preproc_headers(Headers)
, request_timeout => maps:get(request_timeout, Req, 30000)
}.
preproc_headers(Headers) ->
@ -238,15 +247,18 @@ preproc_headers(Headers) ->
emqx_plugin_libs_rule:preproc_tmpl(bin(V))}
end, #{}, Headers).
proc_channel_conf(#{
process_request(#{
method := MethodTks,
path := PathTks,
body := BodyTks,
headers := HeadersTks} = Conf, Msg) ->
headers := HeadersTks,
request_timeout := ReqTimeout
} = Conf, Msg) ->
Conf#{ method => make_method(emqx_plugin_libs_rule:proc_tmpl(MethodTks, Msg))
, path => emqx_plugin_libs_rule:proc_tmpl(PathTks, Msg)
, body => emqx_plugin_libs_rule:proc_tmpl(BodyTks, Msg)
, headers => maps:to_list(proc_headers(HeadersTks, Msg))
, request_timeout => ReqTimeout
}.
proc_headers(HeaderTks, Msg) ->
@ -264,7 +276,7 @@ check_ssl_opts(Conf) ->
check_ssl_opts("base_url", Conf).
check_ssl_opts(URLFrom, Conf) ->
#{schema := Scheme} = hocon_schema:get_value(URLFrom, Conf),
#{scheme := Scheme} = hocon_schema:get_value(URLFrom, Conf),
SSL= hocon_schema:get_value("ssl", Conf),
case {Scheme, maps:get(enable, SSL, false)} of
{http, false} -> true;

View File

@ -46,7 +46,7 @@
%%=====================================================================
%% Hocon schema
roots() ->
[{config, #{type => hoconsc:ref(?MODULE, "config")}}].
fields("config").
fields("config") ->
emqx_connector_mqtt_schema:fields("config").
@ -89,111 +89,74 @@ drop_bridge(Name) ->
%% ===================================================================
%% When use this bridge as a data source, ?MODULE:on_message_received/2 will be called
%% if the bridge received msgs from the remote broker.
on_message_received(Msg, ChannId) ->
Name = atom_to_binary(ChannId, utf8),
emqx:run_hook(<<"$bridges/", Name/binary>>, [Msg]).
on_message_received(Msg, HookPoint) ->
emqx:run_hook(HookPoint, [Msg]).
%% ===================================================================
on_start(InstId, Conf) ->
InstanceId = binary_to_atom(InstId, utf8),
?SLOG(info, #{msg => "starting mqtt connector",
connector => InstId, config => Conf}),
"bridge:" ++ NamePrefix = binary_to_list(InstId),
connector => InstanceId, config => Conf}),
BasicConf = basic_config(Conf),
InitRes = {ok, #{name_prefix => NamePrefix, baisc_conf => BasicConf, channels => []}},
InOutConfigs = taged_map_list(ingress_channels, maps:get(ingress_channels, Conf, #{}))
++ taged_map_list(egress_channels, maps:get(egress_channels, Conf, #{})),
lists:foldl(fun
(_InOutConf, {error, Reason}) ->
{error, Reason};
(InOutConf, {ok, #{channels := SubBridges} = Res}) ->
case create_channel(InOutConf, NamePrefix, BasicConf) of
{error, Reason} -> {error, Reason};
{ok, Name} -> {ok, Res#{channels => [Name | SubBridges]}}
end
end, InitRes, InOutConfigs).
on_stop(InstId, #{channels := NameList}) ->
?SLOG(info, #{msg => "stopping mqtt connector",
connector => InstId}),
lists:foreach(fun(Name) ->
remove_channel(Name)
end, NameList).
%% TODO: let the emqx_resource trigger on_query/4 automatically according to the
%% `ingress_channels` and `egress_channels` config
on_query(_InstId, {create_channel, Conf}, _AfterQuery, #{name_prefix := Prefix,
baisc_conf := BasicConf}) ->
create_channel(Conf, Prefix, BasicConf);
on_query(_InstId, {send_message, ChannelId, Msg}, _AfterQuery, _State) ->
?SLOG(debug, #{msg => "send msg to remote node", message => Msg,
channel_id => ChannelId}),
emqx_connector_mqtt_worker:send_to_remote(ChannelId, Msg).
on_health_check(_InstId, #{channels := NameList} = State) ->
Results = [{Name, emqx_connector_mqtt_worker:ping(Name)} || Name <- NameList],
case lists:all(fun({_, pong}) -> true; ({_, _}) -> false end, Results) of
true -> {ok, State};
false -> {error, {some_channel_down, Results}, State}
end.
create_channel({{ingress_channels, Id}, #{subscribe_remote_topic := RemoteT} = Conf},
NamePrefix, BasicConf) ->
LocalT = maps:get(local_topic, Conf, undefined),
ChannId = ingress_channel_id(NamePrefix, Id),
?SLOG(info, #{msg => "creating ingress channel",
remote_topic => RemoteT,
local_topic => LocalT,
channel_id => ChannId}),
do_create_channel(BasicConf#{
name => ChannId,
clientid => clientid(ChannId),
subscriptions => Conf#{
local_topic => LocalT,
on_message_received => {fun ?MODULE:on_message_received/2, [ChannId]}
BridgeConf = BasicConf#{
name => InstanceId,
clientid => clientid(InstanceId),
subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined)),
forwards => make_forward_confs(maps:get(egress, Conf, undefined))
},
forwards => undefined});
create_channel({{egress_channels, Id}, #{remote_topic := RemoteT} = Conf},
NamePrefix, BasicConf) ->
LocalT = maps:get(subscribe_local_topic, Conf, undefined),
ChannId = egress_channel_id(NamePrefix, Id),
?SLOG(info, #{msg => "creating egress channel",
remote_topic => RemoteT,
local_topic => LocalT,
channel_id => ChannId}),
do_create_channel(BasicConf#{
name => ChannId,
clientid => clientid(ChannId),
subscriptions => undefined,
forwards => Conf#{subscribe_local_topic => LocalT}}).
remove_channel(ChannId) ->
?SLOG(info, #{msg => "removing channel",
channel_id => ChannId}),
case ?MODULE:drop_bridge(ChannId) of
ok -> ok;
{error, not_found} -> ok;
{error, Reason} ->
?SLOG(error, #{msg => "stop channel failed",
channel_id => ChannId, reason => Reason})
end.
do_create_channel(#{name := Name} = Conf) ->
case ?MODULE:create_bridge(Conf) of
case ?MODULE:create_bridge(BridgeConf) of
{ok, _Pid} ->
start_channel(Name);
case emqx_connector_mqtt_worker:ensure_started(InstanceId) of
ok -> {ok, #{name => InstanceId}};
{error, Reason} -> {error, Reason}
end;
{error, {already_started, _Pid}} ->
{ok, Name};
{ok, #{name => InstanceId}};
{error, Reason} ->
{error, Reason}
end.
start_channel(Name) ->
case emqx_connector_mqtt_worker:ensure_started(Name) of
ok -> {ok, Name};
{error, Reason} -> {error, Reason}
on_stop(_InstId, #{name := InstanceId}) ->
?SLOG(info, #{msg => "stopping mqtt connector",
connector => InstanceId}),
case ?MODULE:drop_bridge(InstanceId) of
ok -> ok;
{error, not_found} -> ok;
{error, Reason} ->
?SLOG(error, #{msg => "stop mqtt connector",
connector => InstanceId, reason => Reason})
end.
on_query(_InstId, {send_message, Msg}, _AfterQuery, #{name := InstanceId}) ->
?SLOG(debug, #{msg => "send msg to remote node", message => Msg,
connector => InstanceId}),
emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg).
on_health_check(_InstId, #{name := InstanceId} = State) ->
case emqx_connector_mqtt_worker:ping(InstanceId) of
pong -> {ok, State};
_ -> {error, {connector_down, InstanceId}, State}
end.
make_sub_confs(EmptyMap) when map_size(EmptyMap) == 0 ->
undefined;
make_sub_confs(undefined) ->
undefined;
make_sub_confs(SubRemoteConf) ->
case maps:take(hookpoint, SubRemoteConf) of
error -> SubRemoteConf;
{HookPoint, SubConf} ->
MFA = {?MODULE, on_message_received, [HookPoint]},
SubConf#{on_message_received => MFA}
end.
make_forward_confs(EmptyMap) when map_size(EmptyMap) == 0 ->
undefined;
make_forward_confs(undefined) ->
undefined;
make_forward_confs(FrowardConf) ->
FrowardConf.
basic_config(#{
server := Server,
reconnect_interval := ReconnIntv,
@ -225,23 +188,5 @@ basic_config(#{
if_record_metrics => true
}.
taged_map_list(Tag, Map) ->
[{{Tag, K}, V} || {K, V} <- maps:to_list(Map)].
ingress_channel_id(Prefix, Id) ->
channel_name("ingress_channels", Prefix, Id).
egress_channel_id(Prefix, Id) ->
channel_name("egress_channels", Prefix, Id).
channel_name(Type, Prefix, Id) ->
list_to_atom(str(Prefix) ++ ":" ++ Type ++ ":" ++ str(Id)).
clientid(Id) ->
list_to_binary(str(Id) ++ ":" ++ emqx_misc:gen_id(8)).
str(A) when is_atom(A) ->
atom_to_list(A);
str(B) when is_binary(B) ->
binary_to_list(B);
str(S) when is_list(S) ->
S.
list_to_binary(lists:concat([Id, ":", node()])).

View File

@ -0,0 +1,36 @@
-module(emqx_connector_schema).
-behaviour(hocon_schema).
-include_lib("typerefl/include/types.hrl").
-export([roots/0, fields/1]).
%%======================================================================================
%% Hocon Schema Definitions
roots() -> ["connectors"].
fields("connectors") ->
[ {mqtt,
sc(hoconsc:map(name,
hoconsc:union([ ref("mqtt_connector")
])),
#{ desc => "MQTT bridges"
})}
];
fields("mqtt_connector") ->
emqx_connector_mqtt_schema:fields("connector");
fields("mqtt_connector_info") ->
[{id, sc(binary(), #{desc => "The connector Id"})}]
++ fields("mqtt_connector");
fields("mqtt_connector_test_info") ->
[{bridge_type, sc(mqtt, #{desc => "The Bridge Type"})}]
++ fields("mqtt_connector").
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
ref(Field) -> hoconsc:ref(?MODULE, Field).

View File

@ -65,7 +65,7 @@ start(Config) ->
case emqtt:connect(Pid) of
{ok, _} ->
try
ok = subscribe_remote_topics(Pid, Subscriptions),
ok = from_remote_topics(Pid, Subscriptions),
{ok, #{client_pid => Pid, subscriptions => Subscriptions}}
catch
throw : Reason ->
@ -160,14 +160,18 @@ handle_puback(#{packet_id := PktId, reason_code := RC}, _Parent) ->
handle_publish(Msg, undefined) ->
?SLOG(error, #{msg => "cannot publish to local broker as"
" ingress_channles' is not configured",
" 'ingress' is not configured",
message => Msg});
handle_publish(Msg, #{on_message_received := {OnMsgRcvdFunc, Args}} = Vars) ->
handle_publish(Msg, Vars) ->
?SLOG(debug, #{msg => "publish to local broker",
message => Msg, vars => Vars}),
emqx_metrics:inc('bridge.mqtt.message_received_from_remote', 1),
_ = erlang:apply(OnMsgRcvdFunc, [Msg | Args]),
case maps:get(local_topic, Vars, undefined) of
case Vars of
#{on_message_received := {Mod, Func, Args}} ->
_ = erlang:apply(Mod, Func, [Msg | Args]);
_ -> ok
end,
case maps:get(to_local_topic, Vars, undefined) of
undefined -> ok;
_Topic ->
emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars))
@ -182,8 +186,8 @@ make_hdlr(Parent, Vars) ->
disconnected => {fun ?MODULE:handle_disconnected/2, [Parent]}
}.
subscribe_remote_topics(_ClientPid, undefined) -> ok;
subscribe_remote_topics(ClientPid, #{subscribe_remote_topic := FromTopic, subscribe_qos := QoS}) ->
from_remote_topics(_ClientPid, undefined) -> ok;
from_remote_topics(ClientPid, #{from_remote_topic := FromTopic, subscribe_qos := QoS}) ->
case emqtt:subscribe(ClientPid, FromTopic, QoS) of
{ok, _, _} -> ok;
Error -> throw(Error)

View File

@ -36,7 +36,7 @@
-type variables() :: #{
mountpoint := undefined | binary(),
remote_topic := binary(),
to_remote_topic := binary(),
qos := original | integer(),
retain := original | boolean(),
payload := binary()
@ -59,7 +59,7 @@ to_remote_msg(#message{flags = Flags0} = Msg, Vars) ->
Retain0 = maps:get(retain, Flags0, false),
MapMsg = maps:put(retain, Retain0, emqx_message:to_map(Msg)),
to_remote_msg(MapMsg, Vars);
to_remote_msg(MapMsg, #{remote_topic := TopicToken, payload := PayloadToken,
to_remote_msg(MapMsg, #{to_remote_topic := TopicToken, payload := PayloadToken,
qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) when is_map(MapMsg) ->
Topic = replace_vars_in_str(TopicToken, MapMsg),
Payload = replace_vars_in_str(PayloadToken, MapMsg),
@ -75,7 +75,7 @@ to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) ->
%% published from remote node over a MQTT connection
to_broker_msg(#{dup := Dup, properties := Props} = MapMsg,
#{local_topic := TopicToken, payload := PayloadToken,
#{to_local_topic := TopicToken, payload := PayloadToken,
qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) ->
Topic = replace_vars_in_str(TopicToken, MapMsg),
Payload = replace_vars_in_str(PayloadToken, MapMsg),

View File

@ -21,58 +21,203 @@
-behaviour(hocon_schema).
-export([ roots/0
, fields/1]).
, fields/1
]).
-export([ ingress_desc/0
, egress_desc/0
]).
-import(emqx_schema, [mk_duration/2]).
roots() ->
[{config, #{type => hoconsc:ref(?MODULE, "config")}}].
fields("config").
fields("config") ->
[ {server, hoconsc:mk(emqx_schema:ip_port(), #{default => "127.0.0.1:1883"})}
fields("connector") ++
topic_mappings();
fields("connector") ->
[ {server,
sc(emqx_schema:ip_port(),
#{ default => "127.0.0.1:1883"
, desc => "The host and port of the remote MQTT broker"
})}
, {reconnect_interval, mk_duration("reconnect interval", #{default => "30s"})}
, {proto_ver, fun proto_ver/1}
, {bridge_mode, hoconsc:mk(boolean(), #{default => true})}
, {username, hoconsc:mk(string())}
, {password, hoconsc:mk(string())}
, {clean_start, hoconsc:mk(boolean(), #{default => true})}
, {proto_ver,
sc(hoconsc:enum([v3, v4, v5]),
#{ default => v4
, desc => "The MQTT protocol version"
})}
, {bridge_mode,
sc(boolean(),
#{ default => true
, desc => "The bridge mode of the MQTT protocol"
})}
, {username,
sc(binary(),
#{ default => "emqx"
, desc => "The username of the MQTT protocol"
})}
, {password,
sc(binary(),
#{ default => "emqx"
, desc => "The password of the MQTT protocol"
})}
, {clientid,
sc(binary(),
#{ default => "emqx_${nodename}"
, desc => "The clientid of the MQTT protocol"
})}
, {clean_start,
sc(boolean(),
#{ default => true
, desc => "The clean-start or the clean-session of the MQTT protocol"
})}
, {keepalive, mk_duration("keepalive", #{default => "300s"})}
, {retry_interval, mk_duration("retry interval", #{default => "30s"})}
, {max_inflight, hoconsc:mk(integer(), #{default => 32})}
, {replayq, hoconsc:mk(hoconsc:ref(?MODULE, "replayq"))}
, {ingress_channels, hoconsc:mk(hoconsc:map(id, hoconsc:ref(?MODULE, "ingress_channels")), #{default => []})}
, {egress_channels, hoconsc:mk(hoconsc:map(id, hoconsc:ref(?MODULE, "egress_channels")), #{default => []})}
, {max_inflight,
sc(integer(),
#{ default => 32
, desc => "Max inflight messages (sent but ACK has not received) of the MQTT protocol"
})}
, {replayq,
sc(ref("replayq"),
#{ desc => """
Queue messages in disk files.
"""
})}
] ++ emqx_connector_schema_lib:ssl_fields();
fields("ingress_channels") ->
%% the message maybe subscribed by rules, in this case 'local_topic' is not necessary
[ {subscribe_remote_topic, hoconsc:mk(binary(), #{nullable => false})}
, {local_topic, hoconsc:mk(binary())}
, {subscribe_qos, hoconsc:mk(qos(), #{default => 1})}
fields("ingress") ->
%% the message maybe subscribed by rules, in this case 'to_local_topic' is not necessary
[ {from_remote_topic,
sc(binary(),
#{ nullable => false
, desc => "Receive messages from which topic of the remote broker"
})}
, {subscribe_qos,
sc(qos(),
#{ default => 1
, desc => "The QoS level to be used when subscribing to the remote broker"
})}
, {to_local_topic,
sc(binary(),
#{ desc => """
Send messages to which topic of the local broker.<br>
Template with variables is allowed.
"""
})}
, {hookpoint,
sc(binary(),
#{ desc => """
The hookpoint will be triggered when there's any message received from the remote broker.
"""
})}
] ++ common_inout_confs();
fields("egress_channels") ->
%% the message maybe sent from rules, in this case 'subscribe_local_topic' is not necessary
[ {subscribe_local_topic, hoconsc:mk(binary())}
, {remote_topic, hoconsc:mk(binary(), #{default => <<"${topic}">>})}
fields("egress") ->
%% the message maybe sent from rules, in this case 'from_local_topic' is not necessary
[ {from_local_topic,
sc(binary(),
#{ desc => "The local topic to be forwarded to the remote broker"
})}
, {to_remote_topic,
sc(binary(),
#{ default => <<"${topic}">>
, desc => """
Forward to which topic of the remote broker.<br>
Template with variables is allowed.
"""
})}
] ++ common_inout_confs();
fields("replayq") ->
[ {dir, hoconsc:union([boolean(), string()])}
, {seg_bytes, hoconsc:mk(emqx_schema:bytesize(), #{default => "100MB"})}
, {offload, hoconsc:mk(boolean(), #{default => false})}
, {max_total_bytes, hoconsc:mk(emqx_schema:bytesize(), #{default => "1024MB"})}
[ {dir,
sc(hoconsc:union([boolean(), string()]),
#{ desc => """
The dir where the replayq file saved.<br>
Set to 'false' disables the replayq feature.
"""
})}
, {seg_bytes,
sc(emqx_schema:bytesize(),
#{ default => "100MB"
, desc => """
The size in bytes of a single segment.<br>
A segment is mapping to a file in the replayq dir. If the current segment is full, a new segment
(file) will be opened to write.
"""
})}
, {offload,
sc(boolean(),
#{ default => false
, desc => """
In offload mode, the disk queue is only used to offload queue tail segments.<br>
The messages are cached in the memory first, then it write to the replayq files after the size of
the memory cache reaches 'seg_bytes'.
"""
})}
].
topic_mappings() ->
[ {ingress,
sc(ref("ingress"),
#{ default => #{}
, desc => ingress_desc()
})}
, {egress,
sc(ref("egress"),
#{ default => #{}
, desc => egress_desc()
})}
].
ingress_desc() -> """
The ingress config defines how this bridge receive messages from the remote MQTT broker, and then
send them to the local broker.<br>
Template with variables is allowed in 'to_local_topic', 'subscribe_qos', 'qos', 'retain',
'payload'.<br>
NOTE: if this bridge is used as the input of a rule (emqx rule engine), and also to_local_topic is
configured, then messages got from the remote broker will be sent to both the 'to_local_topic' and
the rule.
""".
egress_desc() -> """
The egress config defines how this bridge forwards messages from the local broker to the remote
broker.<br>
Template with variables is allowed in 'to_remote_topic', 'qos', 'retain', 'payload'.<br>
NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also from_local_topic
is configured, then both the data got from the rule and the MQTT messages that matches
from_local_topic will be forwarded.
""".
common_inout_confs() ->
[ {qos, hoconsc:mk(qos(), #{default => <<"${qos}">>})}
, {retain, hoconsc:mk(hoconsc:union([boolean(), binary()]), #{default => <<"${retain}">>})}
, {payload, hoconsc:mk(binary(), #{default => <<"${payload}">>})}
[ {qos,
sc(qos(),
#{ default => <<"${qos}">>
, desc => """
The QoS of the MQTT message to be sent.<br>
Template with variables is allowed."""
})}
, {retain,
sc(hoconsc:union([boolean(), binary()]),
#{ default => <<"${retain}">>
, desc => """
The retain flag of the MQTT message to be sent.<br>
Template with variables is allowed."""
})}
, {payload,
sc(binary(),
#{ default => <<"${payload}">>
, desc => """
The payload of the MQTT message to be sent.<br>
Template with variables is allowed."""
})}
].
qos() ->
hoconsc:union([typerefl:integer(0), typerefl:integer(1), typerefl:integer(2), binary()]).
proto_ver(type) -> hoconsc:enum([v3, v4, v5]);
proto_ver(default) -> v4;
proto_ver(_) -> undefined.
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
ref(Field) -> hoconsc:ref(?MODULE, Field).

View File

@ -101,14 +101,12 @@
-export([msg_marshaller/1]).
-export_type([ config/0
, batch/0
, ack_ref/0
]).
-type id() :: atom() | string() | pid().
-type qos() :: emqx_types:qos().
-type config() :: map().
-type batch() :: [emqx_connector_mqtt_msg:exp_msg()].
-type ack_ref() :: term().
-type topic() :: emqx_types:topic().
@ -117,7 +115,7 @@
%% same as default in-flight limit for emqtt
-define(DEFAULT_BATCH_SIZE, 32).
-define(DEFAULT_INFLIGHT_SIZE, 32).
-define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)).
-define(DEFAULT_SEG_BYTES, (1 bsl 20)).
-define(DEFAULT_MAX_TOTAL_SIZE, (1 bsl 31)).
@ -205,12 +203,10 @@ init_state(Opts) ->
ReconnDelayMs = maps:get(reconnect_interval, Opts, ?DEFAULT_RECONNECT_DELAY_MS),
StartType = maps:get(start_type, Opts, manual),
Mountpoint = maps:get(forward_mountpoint, Opts, undefined),
MaxInflightSize = maps:get(max_inflight, Opts, ?DEFAULT_BATCH_SIZE),
BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE),
MaxInflightSize = maps:get(max_inflight, Opts, ?DEFAULT_INFLIGHT_SIZE),
Name = maps:get(name, Opts, undefined),
#{start_type => StartType,
reconnect_interval => ReconnDelayMs,
batch_size => BatchSize,
mountpoint => format_mountpoint(Mountpoint),
inflight => [],
max_inflight => MaxInflightSize,
@ -235,8 +231,8 @@ pre_process_opts(#{subscriptions := InConf, forwards := OutConf} = ConnectOpts)
pre_process_in_out(undefined) -> undefined;
pre_process_in_out(Conf) when is_map(Conf) ->
Conf1 = pre_process_conf(local_topic, Conf),
Conf2 = pre_process_conf(remote_topic, Conf1),
Conf1 = pre_process_conf(to_local_topic, Conf),
Conf2 = pre_process_conf(to_remote_topic, Conf1),
Conf3 = pre_process_conf(payload, Conf2),
Conf4 = pre_process_conf(qos, Conf3),
pre_process_conf(retain, Conf4).
@ -327,10 +323,6 @@ common(_StateName, {call, From}, get_forwards, #{connect_opts := #{forwards := F
{keep_state_and_data, [{reply, From, Forwards}]};
common(_StateName, {call, From}, get_subscriptions, #{connection := Connection}) ->
{keep_state_and_data, [{reply, From, maps:get(subscriptions, Connection, #{})}]};
common(_StateName, info, {deliver, _, Msg}, State = #{replayq := Q}) ->
Msgs = collect([Msg]),
NewQ = replayq:append(Q, Msgs),
{keep_state, State#{replayq => NewQ}, {next_event, internal, maybe_send}};
common(_StateName, info, {'EXIT', _, _}, State) ->
{keep_state, State};
common(_StateName, cast, {send_to_remote, Msg}, #{replayq := Q} = State) ->
@ -342,13 +334,9 @@ common(StateName, Type, Content, #{name := Name} = State) ->
content => Content}),
{keep_state, State}.
do_connect(#{connect_opts := ConnectOpts = #{forwards := Forwards},
do_connect(#{connect_opts := ConnectOpts,
inflight := Inflight,
name := Name} = State) ->
case Forwards of
undefined -> ok;
#{subscribe_local_topic := Topic} -> subscribe_local_topic(Topic, Name)
end,
case emqx_connector_mqtt_mod:start(ConnectOpts) of
{ok, Conn} ->
?tp(info, connected, #{name => Name, inflight => length(Inflight)}),
@ -360,19 +348,10 @@ do_connect(#{connect_opts := ConnectOpts = #{forwards := Forwards},
{error, Reason, State}
end.
collect(Acc) ->
receive
{deliver, _, Msg} ->
collect([Msg | Acc])
after
0 ->
lists:reverse(Acc)
end.
%% Retry all inflight (previously sent but not acked) batches.
retry_inflight(State, []) -> {ok, State};
retry_inflight(State, [#{q_ack_ref := QAckRef, batch := Batch} | Rest] = OldInf) ->
case do_send(State, QAckRef, Batch) of
retry_inflight(State, [#{q_ack_ref := QAckRef, msg := Msg} | Rest] = OldInf) ->
case do_send(State, QAckRef, Msg) of
{ok, State1} ->
retry_inflight(State1, Rest);
{error, #{inflight := NewInf} = State1} ->
@ -393,34 +372,33 @@ pop_and_send_loop(#{replayq := Q} = State, N) ->
false ->
BatchSize = 1,
Opts = #{count_limit => BatchSize, bytes_limit => 999999999},
{Q1, QAckRef, Batch} = replayq:pop(Q, Opts),
case do_send(State#{replayq := Q1}, QAckRef, Batch) of
{Q1, QAckRef, [Msg]} = replayq:pop(Q, Opts),
case do_send(State#{replayq := Q1}, QAckRef, Msg) of
{ok, NewState} -> pop_and_send_loop(NewState, N - 1);
{error, NewState} -> {error, NewState}
end
end.
%% Assert non-empty batch because we have a is_empty check earlier.
do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Batch) ->
do_send(#{connect_opts := #{forwards := undefined}}, _QAckRef, Msg) ->
?SLOG(error, #{msg => "cannot forward messages to remote broker"
" as egress_channel is not configured",
messages => Batch});
" as 'egress' is not configured",
messages => Msg});
do_send(#{inflight := Inflight,
connection := Connection,
mountpoint := Mountpoint,
connect_opts := #{forwards := Forwards}} = State, QAckRef, [_ | _] = Batch) ->
connect_opts := #{forwards := Forwards}} = State, QAckRef, Msg) ->
Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards),
ExportMsg = fun(Message) ->
emqx_metrics:inc('bridge.mqtt.message_sent_to_remote'),
emqx_connector_mqtt_msg:to_remote_msg(Message, Vars)
end,
?SLOG(debug, #{msg => "publish to remote broker",
message => Batch, vars => Vars}),
case emqx_connector_mqtt_mod:send(Connection, [ExportMsg(M) || M <- Batch]) of
message => Msg, vars => Vars}),
case emqx_connector_mqtt_mod:send(Connection, [ExportMsg(Msg)]) of
{ok, Refs} ->
{ok, State#{inflight := Inflight ++ [#{q_ack_ref => QAckRef,
send_ack_ref => map_set(Refs),
batch => Batch}]}};
msg => Msg}]}};
{error, Reason} ->
?SLOG(info, #{msg => "mqtt_bridge_produce_failed",
reason => Reason}),
@ -473,27 +451,6 @@ drop_acked_batches(Q, [#{send_ack_ref := Refs,
All
end.
subscribe_local_topic(undefined, _Name) ->
ok;
subscribe_local_topic(Topic, Name) ->
do_subscribe(Topic, Name).
topic(T) -> iolist_to_binary(T).
validate(RawTopic) ->
Topic = topic(RawTopic),
try emqx_topic:validate(Topic) of
_Success -> Topic
catch
error:Reason ->
error({bad_topic, Topic, Reason})
end.
do_subscribe(RawTopic, Name) ->
TopicFilter = validate(RawTopic),
{Topic, SubOpts} = emqx_topic:parse(TopicFilter, #{qos => ?QOS_2}),
emqx_broker:subscribe(Topic, Name, SubOpts).
disconnect(#{connection := Conn} = State) when Conn =/= undefined ->
emqx_connector_mqtt_mod:stop(Conn),
State#{connection => undefined};

View File

@ -0,0 +1,310 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_api_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(CONF_DEFAULT, <<"connectors: {}">>).
-define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>).
-define(CONNECTR_ID, <<"mqtt:test_connector">>).
-define(BRIDGE_ID, <<"mqtt:test_bridge">>).
-define(MQTT_CONNECOTR(Username),
#{
<<"server">> => <<"127.0.0.1:1883">>,
<<"username">> => Username,
<<"password">> => <<"">>,
<<"proto_ver">> => <<"v4">>,
<<"ssl">> => #{<<"enable">> => false}
}).
-define(MQTT_CONNECOTR2(Server),
?MQTT_CONNECOTR(<<"user1">>)#{<<"server">> => Server}).
-define(MQTT_BRIDGE(ID),
#{
<<"connector">> => ID,
<<"direction">> => <<"ingress">>,
<<"from_remote_topic">> => <<"remote_topic/#">>,
<<"to_local_topic">> => <<"local_topic/${topic}">>,
<<"subscribe_qos">> => 1,
<<"payload">> => <<"${payload}">>,
<<"qos">> => <<"${qos}">>,
<<"retain">> => <<"${retain}">>
}).
all() ->
emqx_common_test_helpers:all(?MODULE).
groups() ->
[].
suite() ->
[{timetrap,{seconds,30}}].
init_per_suite(Config) ->
ok = emqx_config:put([emqx_dashboard], #{
default_username => <<"admin">>,
default_password => <<"public">>,
listeners => [#{
protocol => http,
port => 18083
}]
}),
_ = application:load(emqx_conf),
%% some testcases (may from other app) already get emqx_connector started
_ = application:stop(emqx_resource),
_ = application:stop(emqx_connector),
ok = emqx_common_test_helpers:start_apps([emqx_connector, emqx_bridge, emqx_dashboard]),
ok = emqx_config:init_load(emqx_connector_schema, ?CONF_DEFAULT),
ok = emqx_config:init_load(emqx_bridge_schema, ?BRIDGE_CONF_DEFAULT),
Config.
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_connector, emqx_bridge, emqx_dashboard]),
ok.
init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
Config.
end_per_testcase(_, _Config) ->
ok.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
t_mqtt_crud_apis(_) ->
%% assert we there's no connectors at first
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
%% then we add a mqtt connector, using POST
%% POST /connectors/ will create a connector
User1 = <<"user1">>,
{ok, 201, Connector} = request(post, uri(["connectors"]),
?MQTT_CONNECOTR(User1)#{<<"id">> => ?CONNECTR_ID}),
%ct:pal("---connector: ~p", [Connector]),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID
, <<"server">> := <<"127.0.0.1:1883">>
, <<"username">> := User1
, <<"password">> := <<"">>
, <<"proto_ver">> := <<"v4">>
, <<"ssl">> := #{<<"enable">> := false}
}, jsx:decode(Connector)),
%% create a again returns an error
{ok, 400, RetMsg} = request(post, uri(["connectors"]),
?MQTT_CONNECOTR(User1)#{<<"id">> => ?CONNECTR_ID}),
?assertMatch(
#{ <<"code">> := _
, <<"message">> := <<"connector already exists">>
}, jsx:decode(RetMsg)),
%% update the request-path of the connector
User2 = <<"user2">>,
{ok, 200, Connector2} = request(put, uri(["connectors", ?CONNECTR_ID]),
?MQTT_CONNECOTR(User2)),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID
, <<"server">> := <<"127.0.0.1:1883">>
, <<"username">> := User2
, <<"password">> := <<"">>
, <<"proto_ver">> := <<"v4">>
, <<"ssl">> := #{<<"enable">> := false}
}, jsx:decode(Connector2)),
%% list all connectors again, assert Connector2 is in it
{ok, 200, Connector2Str} = request(get, uri(["connectors"]), []),
?assertMatch([#{ <<"id">> := ?CONNECTR_ID
, <<"server">> := <<"127.0.0.1:1883">>
, <<"username">> := User2
, <<"password">> := <<"">>
, <<"proto_ver">> := <<"v4">>
, <<"ssl">> := #{<<"enable">> := false}
}], jsx:decode(Connector2Str)),
%% get the connector by id
{ok, 200, Connector3Str} = request(get, uri(["connectors", ?CONNECTR_ID]), []),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID
, <<"server">> := <<"127.0.0.1:1883">>
, <<"username">> := User2
, <<"password">> := <<"">>
, <<"proto_ver">> := <<"v4">>
, <<"ssl">> := #{<<"enable">> := false}
}, jsx:decode(Connector3Str)),
%% delete the connector
{ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
%% update a deleted connector returns an error
{ok, 404, ErrMsg2} = request(put, uri(["connectors", ?CONNECTR_ID]),
?MQTT_CONNECOTR(User2)),
?assertMatch(
#{ <<"code">> := _
, <<"message">> := <<"connector not found">>
}, jsx:decode(ErrMsg2)),
ok.
t_mqtt_conn_bridge(_) ->
%% assert we there's no connectors and no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% then we add a mqtt connector, using POST
User1 = <<"user1">>,
{ok, 201, Connector} = request(post, uri(["connectors"]),
?MQTT_CONNECOTR(User1)#{<<"id">> => ?CONNECTR_ID}),
%ct:pal("---connector: ~p", [Connector]),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID
, <<"server">> := <<"127.0.0.1:1883">>
, <<"username">> := User1
, <<"password">> := <<"">>
, <<"proto_ver">> := <<"v4">>
, <<"ssl">> := #{<<"enable">> := false}
}, jsx:decode(Connector)),
%% ... and a MQTT bridge, using POST
%% we bind this bridge to the connector created just now
{ok, 201, Bridge} = request(post, uri(["bridges"]),
?MQTT_BRIDGE(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID}),
%ct:pal("---bridge: ~p", [Bridge]),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
, <<"bridge_type">> := <<"mqtt">>
, <<"status">> := <<"connected">>
, <<"connector">> := ?CONNECTR_ID
}, jsx:decode(Bridge)),
%% we now test if the bridge works as expected
RemoteTopic = <<"remote_topic/1">>,
LocalTopic = <<"local_topic/", RemoteTopic/binary>>,
Payload = <<"hello">>,
emqx:subscribe(LocalTopic),
%% PUBLISH a message to the 'remote' broker, as we have only one broker,
%% the remote broker is also the local one.
emqx:publish(emqx_message:make(RemoteTopic, Payload)),
%% we should receive a message on the local broker, with specified topic
?assert(
receive
{deliver, LocalTopic, #message{payload = Payload}} ->
ct:pal("local broker got message: ~p on topic ~p", [Payload, LocalTopic]),
true;
Msg ->
ct:pal("Msg: ~p", [Msg]),
false
after 100 ->
false
end),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% delete the connector
{ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
ok.
%% t_mqtt_conn_update:
%% - update a connector should also update all of the the bridges
%% - cannot delete a connector that is used by at least one bridge
t_mqtt_conn_update(_) ->
%% assert we there's no connectors and no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% then we add a mqtt connector, using POST
{ok, 201, Connector} = request(post, uri(["connectors"]),
?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)#{<<"id">> => ?CONNECTR_ID}),
%ct:pal("---connector: ~p", [Connector]),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID
, <<"server">> := <<"127.0.0.1:1883">>
}, jsx:decode(Connector)),
%% ... and a MQTT bridge, using POST
%% we bind this bridge to the connector created just now
{ok, 201, Bridge} = request(post, uri(["bridges"]),
?MQTT_BRIDGE(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID}),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
, <<"bridge_type">> := <<"mqtt">>
, <<"status">> := <<"connected">>
, <<"connector">> := ?CONNECTR_ID
}, jsx:decode(Bridge)),
%% then we try to update 'server' of the connector, to an unavailable IP address
%% the update should fail because of 'unreachable' or 'connrefused'
{ok, 400, _ErrorMsg} = request(put, uri(["connectors", ?CONNECTR_ID]),
?MQTT_CONNECOTR2(<<"127.0.0.1:2883">>)),
%% we fix the 'server' parameter to a normal one, it should work
{ok, 200, _} = request(put, uri(["connectors", ?CONNECTR_ID]),
?MQTT_CONNECOTR2(<<"127.0.0.1 : 1883">>)),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% delete the connector
{ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []).
t_mqtt_conn_testing(_) ->
%% APIs for testing the connectivity
%% then we add a mqtt connector, using POST
{ok, 200, <<>>} = request(post, uri(["connectors_test"]),
?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)#{<<"bridge_type">> => <<"mqtt">>}),
{ok, 400, _} = request(post, uri(["connectors_test"]),
?MQTT_CONNECOTR2(<<"127.0.0.1:2883">>)#{<<"bridge_type">> => <<"mqtt">>}).
%%--------------------------------------------------------------------
%% HTTP Request
%%--------------------------------------------------------------------
-define(HOST, "http://127.0.0.1:18083/").
-define(API_VERSION, "v5").
-define(BASE_PATH, "api").
request(Method, Url, Body) ->
Request = case Body of
[] -> {Url, [auth_header_()]};
_ -> {Url, [auth_header_()], "application/json", jsx:encode(Body)}
end,
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
case httpc:request(Method, Request, [], [{body_format, binary}]) of
{error, socket_closed_remotely} ->
{error, socket_closed_remotely};
{ok, {{"HTTP/1.1", Code, _}, _Headers, Return} } ->
{ok, Code, Return};
{ok, {Reason, _, _}} ->
{error, Reason}
end.
uri() -> uri([]).
uri(Parts) when is_list(Parts) ->
NParts = [E || E <- Parts],
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION | NParts]).
auth_header_() ->
Username = <<"admin">>,
Password = <<"public">>,
{ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
{"Authorization", "Bearer " ++ binary_to_list(Token)}.

View File

@ -9,12 +9,12 @@
-export([schema_with_example/2, schema_with_examples/2]).
-export([error_codes/1, error_codes/2]).
-export([filter_check_request/2, filter_check_request_and_translate_body/2]).
-ifdef(TEST).
-export([
parse_spec_ref/2,
components/1,
filter_check_request/2,
filter_check_request_and_translate_body/2]).
-export([ parse_spec_ref/2
, components/1
]).
-endif.
-define(METHODS, [get, post, put, head, delete, patch, options, trace]).
@ -137,9 +137,9 @@ check_only(Schema, Map, Opts) ->
Map.
support_check_schema(#{check_schema := true, translate_body := true}) ->
#{filter => fun filter_check_request_and_translate_body/2};
#{filter => fun ?MODULE:filter_check_request_and_translate_body/2};
support_check_schema(#{check_schema := true}) ->
#{filter => fun filter_check_request/2};
#{filter => fun ?MODULE:filter_check_request/2};
support_check_schema(#{check_schema := Filter}) when is_function(Filter, 2) ->
#{filter => Filter};
support_check_schema(_) ->
@ -200,7 +200,7 @@ check_request_body(#{body := Body}, Schema, Module, CheckFun, true) ->
_ -> Type0
end,
NewSchema = ?INIT_SCHEMA#{roots => [{root, Type}]},
Option = #{override_env => false},
Option = #{override_env => false, nullable => true},
#{<<"root">> := NewBody} = CheckFun(NewSchema, #{<<"root">> => Body}, Option),
NewBody;
%% TODO not support nest object check yet, please use ref!

View File

@ -52,8 +52,8 @@
]).
%% callbacks for emqx_config_handler
-export([ pre_config_update/2
, post_config_update/4
-export([ pre_config_update/3
, post_config_update/5
]).
-type atom_or_bin() :: atom() | binary().
@ -246,10 +246,11 @@ bin(B) when is_binary(B) ->
%% Config Handler
%%--------------------------------------------------------------------
-spec pre_config_update(emqx_config:update_request(),
-spec pre_config_update(list(atom()),
emqx_config:update_request(),
emqx_config:raw_config()) ->
{ok, emqx_config:update_request()} | {error, term()}.
pre_config_update({load_gateway, GwName, Conf}, RawConf) ->
pre_config_update(_, {load_gateway, GwName, Conf}, RawConf) ->
case maps:get(GwName, RawConf, undefined) of
undefined ->
NConf = tune_gw_certs(fun convert_certs/2, GwName, Conf),
@ -257,7 +258,7 @@ pre_config_update({load_gateway, GwName, Conf}, RawConf) ->
_ ->
{error, already_exist}
end;
pre_config_update({update_gateway, GwName, Conf}, RawConf) ->
pre_config_update(_, {update_gateway, GwName, Conf}, RawConf) ->
case maps:get(GwName, RawConf, undefined) of
undefined ->
{error, not_found};
@ -266,14 +267,14 @@ pre_config_update({update_gateway, GwName, Conf}, RawConf) ->
<<"authentication">>], Conf),
{ok, emqx_map_lib:deep_merge(RawConf, #{GwName => NConf})}
end;
pre_config_update({unload_gateway, GwName}, RawConf) ->
pre_config_update(_, {unload_gateway, GwName}, RawConf) ->
_ = tune_gw_certs(fun clear_certs/2,
GwName,
maps:get(GwName, RawConf, #{})
),
{ok, maps:remove(GwName, RawConf)};
pre_config_update({add_listener, GwName, {LType, LName}, Conf}, RawConf) ->
pre_config_update(_, {add_listener, GwName, {LType, LName}, Conf}, RawConf) ->
case emqx_map_lib:deep_get(
[GwName, <<"listeners">>, LType, LName], RawConf, undefined) of
undefined ->
@ -285,7 +286,7 @@ pre_config_update({add_listener, GwName, {LType, LName}, Conf}, RawConf) ->
_ ->
{error, already_exist}
end;
pre_config_update({update_listener, GwName, {LType, LName}, Conf}, RawConf) ->
pre_config_update(_, {update_listener, GwName, {LType, LName}, Conf}, RawConf) ->
case emqx_map_lib:deep_get(
[GwName, <<"listeners">>, LType, LName], RawConf, undefined) of
undefined ->
@ -298,7 +299,7 @@ pre_config_update({update_listener, GwName, {LType, LName}, Conf}, RawConf) ->
#{GwName => #{<<"listeners">> => NListener}})}
end;
pre_config_update({remove_listener, GwName, {LType, LName}}, RawConf) ->
pre_config_update(_, {remove_listener, GwName, {LType, LName}}, RawConf) ->
Path = [GwName, <<"listeners">>, LType, LName],
case emqx_map_lib:deep_get(Path, RawConf, undefined) of
undefined ->
@ -308,7 +309,7 @@ pre_config_update({remove_listener, GwName, {LType, LName}}, RawConf) ->
{ok, emqx_map_lib:deep_remove(Path, RawConf)}
end;
pre_config_update({add_authn, GwName, Conf}, RawConf) ->
pre_config_update(_, {add_authn, GwName, Conf}, RawConf) ->
case emqx_map_lib:deep_get(
[GwName, <<"authentication">>], RawConf, undefined) of
undefined ->
@ -318,7 +319,7 @@ pre_config_update({add_authn, GwName, Conf}, RawConf) ->
_ ->
{error, already_exist}
end;
pre_config_update({add_authn, GwName, {LType, LName}, Conf}, RawConf) ->
pre_config_update(_, {add_authn, GwName, {LType, LName}, Conf}, RawConf) ->
case emqx_map_lib:deep_get(
[GwName, <<"listeners">>, LType, LName],
RawConf, undefined) of
@ -336,7 +337,7 @@ pre_config_update({add_authn, GwName, {LType, LName}, Conf}, RawConf) ->
{error, already_exist}
end
end;
pre_config_update({update_authn, GwName, Conf}, RawConf) ->
pre_config_update(_, {update_authn, GwName, Conf}, RawConf) ->
case emqx_map_lib:deep_get(
[GwName, <<"authentication">>], RawConf, undefined) of
undefined ->
@ -346,7 +347,7 @@ pre_config_update({update_authn, GwName, Conf}, RawConf) ->
RawConf,
#{GwName => #{<<"authentication">> => Conf}})}
end;
pre_config_update({update_authn, GwName, {LType, LName}, Conf}, RawConf) ->
pre_config_update(_, {update_authn, GwName, {LType, LName}, Conf}, RawConf) ->
case emqx_map_lib:deep_get(
[GwName, <<"listeners">>, LType, LName],
RawConf, undefined) of
@ -368,22 +369,24 @@ pre_config_update({update_authn, GwName, {LType, LName}, Conf}, RawConf) ->
{ok, emqx_map_lib:deep_merge(RawConf, NGateway)}
end
end;
pre_config_update({remove_authn, GwName}, RawConf) ->
pre_config_update(_, {remove_authn, GwName}, RawConf) ->
{ok, emqx_map_lib:deep_remove(
[GwName, <<"authentication">>], RawConf)};
pre_config_update({remove_authn, GwName, {LType, LName}}, RawConf) ->
pre_config_update(_, {remove_authn, GwName, {LType, LName}}, RawConf) ->
Path = [GwName, <<"listeners">>, LType, LName, <<"authentication">>],
{ok, emqx_map_lib:deep_remove(Path, RawConf)};
pre_config_update(UnknownReq, _RawConf) ->
pre_config_update(_, UnknownReq, _RawConf) ->
logger:error("Unknown configuration update request: ~0p", [UnknownReq]),
{error, badreq}.
-spec post_config_update(emqx_config:update_request(), emqx_config:config(),
-spec post_config_update(list(atom()),
emqx_config:update_request(),
emqx_config:config(),
emqx_config:config(), emqx_config:app_envs())
-> ok | {ok, Result::any()} | {error, Reason::term()}.
post_config_update(Req, NewConfig, OldConfig, _AppEnvs) when is_tuple(Req) ->
post_config_update(_, Req, NewConfig, OldConfig, _AppEnvs) when is_tuple(Req) ->
[_Tag, GwName0 | _] = tuple_to_list(Req),
GwName = binary_to_existing_atom(GwName0),
@ -398,7 +401,7 @@ post_config_update(Req, NewConfig, OldConfig, _AppEnvs) when is_tuple(Req) ->
{New, Old} when is_map(New), is_map(Old) ->
emqx_gateway:update(GwName, New)
end;
post_config_update(_Req, _NewConfig, _OldConfig, _AppEnvs) ->
post_config_update(_, _Req, _NewConfig, _OldConfig, _AppEnvs) ->
ok.
%%--------------------------------------------------------------------

View File

@ -84,7 +84,7 @@
% , inc_counter/3 %% increment the counter by a given integer
]).
-define(HOCON_CHECK_OPTS, #{atom_key => true, nullable => false}).
-define(HOCON_CHECK_OPTS, #{atom_key => true, nullable => true}).
-optional_callbacks([ on_query/4
, on_health_check/2

View File

@ -25,7 +25,7 @@
-export([start_link/0]).
-export([ post_config_update/4
-export([ post_config_update/5
, config_key_path/0
]).
@ -81,7 +81,7 @@ start_link() ->
%%------------------------------------------------------------------------------
%% The config handler for emqx_rule_engine
%%------------------------------------------------------------------------------
post_config_update(_Req, NewRules, OldRules, _AppEnvs) ->
post_config_update(_, _Req, NewRules, OldRules, _AppEnvs) ->
#{added := Added, removed := Removed, changed := Updated}
= emqx_map_lib:diff_maps(NewRules, OldRules),
maps_foreach(fun({Id, {_Old, New}}) ->

View File

@ -68,7 +68,7 @@ reload() ->
ok = emqx_rule_engine:load_hooks_for_rule(Rule)
end, emqx_rule_engine:get_rules()).
load(<<"$bridges/", _ChannelId/binary>> = BridgeTopic) ->
load(<<"$bridges/", _BridgeId/binary>> = BridgeTopic) ->
emqx_hooks:put(BridgeTopic, {?MODULE, on_bridge_message_received,
[#{bridge_topic => BridgeTopic}]});
load(Topic) ->
@ -114,11 +114,15 @@ on_client_disconnected(ClientInfo, Reason, ConnInfo, Env) ->
on_session_subscribed(ClientInfo, Topic, SubOpts, Env) ->
apply_event('session.subscribed',
fun() -> eventmsg_sub_or_unsub('session.subscribed', ClientInfo, Topic, SubOpts) end, Env).
fun() ->
eventmsg_sub_or_unsub('session.subscribed', ClientInfo, Topic, SubOpts)
end, Env).
on_session_unsubscribed(ClientInfo, Topic, SubOpts, Env) ->
apply_event('session.unsubscribed',
fun() -> eventmsg_sub_or_unsub('session.unsubscribed', ClientInfo, Topic, SubOpts) end, Env).
fun() ->
eventmsg_sub_or_unsub('session.unsubscribed', ClientInfo, Topic, SubOpts)
end, Env).
on_message_dropped(Message, _, Reason, Env) ->
case ignore_sys_message(Message) of
@ -151,7 +155,8 @@ on_message_acked(ClientInfo, Message, Env) ->
%% Event Messages
%%--------------------------------------------------------------------
eventmsg_publish(Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags, topic = Topic, headers = Headers, payload = Payload, timestamp = Timestamp}) ->
eventmsg_publish(Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags,
topic = Topic, headers = Headers, payload = Payload, timestamp = Timestamp}) ->
with_basic_columns('message.publish',
#{id => emqx_guid:to_hexstr(Id),
clientid => ClientId,
@ -236,7 +241,8 @@ eventmsg_sub_or_unsub(Event, _ClientInfo = #{
qos => QoS
}).
eventmsg_dropped(Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags, topic = Topic, headers = Headers, payload = Payload, timestamp = Timestamp}, Reason) ->
eventmsg_dropped(Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags,
topic = Topic, headers = Headers, payload = Payload, timestamp = Timestamp}, Reason) ->
with_basic_columns('message.dropped',
#{id => emqx_guid:to_hexstr(Id),
reason => Reason,
@ -257,7 +263,9 @@ eventmsg_delivered(_ClientInfo = #{
peerhost := PeerHost,
clientid := ReceiverCId,
username := ReceiverUsername
}, Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags, topic = Topic, headers = Headers, payload = Payload, timestamp = Timestamp}) ->
}, Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags,
topic = Topic, headers = Headers, payload = Payload,
timestamp = Timestamp}) ->
with_basic_columns('message.delivered',
#{id => emqx_guid:to_hexstr(Id),
from_clientid => ClientId,
@ -279,7 +287,10 @@ eventmsg_acked(_ClientInfo = #{
peerhost := PeerHost,
clientid := ReceiverCId,
username := ReceiverUsername
}, Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags, topic = Topic, headers = Headers, payload = Payload, timestamp = Timestamp}) ->
},
Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags,
topic = Topic, headers = Headers, payload = Payload,
timestamp = Timestamp}) ->
with_basic_columns('message.acked',
#{id => emqx_guid:to_hexstr(Id),
from_clientid => ClientId,
@ -455,37 +466,9 @@ columns_with_exam('message.publish') ->
, {<<"node">>, node()}
];
columns_with_exam('message.delivered') ->
[ {<<"event">>, 'message.delivered'}
, {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}
, {<<"from_clientid">>, <<"c_emqx_1">>}
, {<<"from_username">>, <<"u_emqx_1">>}
, {<<"clientid">>, <<"c_emqx_2">>}
, {<<"username">>, <<"u_emqx_2">>}
, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
, {<<"peerhost">>, <<"192.168.0.10">>}
, {<<"topic">>, <<"t/a">>}
, {<<"qos">>, 1}
, {<<"flags">>, #{}}
, {<<"publish_received_at">>, erlang:system_time(millisecond)}
, {<<"timestamp">>, erlang:system_time(millisecond)}
, {<<"node">>, node()}
];
columns_message_ack_delivered('message.delivered');
columns_with_exam('message.acked') ->
[ {<<"event">>, 'message.acked'}
, {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}
, {<<"from_clientid">>, <<"c_emqx_1">>}
, {<<"from_username">>, <<"u_emqx_1">>}
, {<<"clientid">>, <<"c_emqx_2">>}
, {<<"username">>, <<"u_emqx_2">>}
, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
, {<<"peerhost">>, <<"192.168.0.10">>}
, {<<"topic">>, <<"t/a">>}
, {<<"qos">>, 1}
, {<<"flags">>, #{}}
, {<<"publish_received_at">>, erlang:system_time(millisecond)}
, {<<"timestamp">>, erlang:system_time(millisecond)}
, {<<"node">>, node()}
];
columns_message_ack_delivered('message.acked');
columns_with_exam('message.dropped') ->
[ {<<"event">>, 'message.dropped'}
, {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}
@ -530,7 +513,12 @@ columns_with_exam('client.disconnected') ->
, {<<"node">>, node()}
];
columns_with_exam('session.subscribed') ->
[ {<<"event">>, 'session.subscribed'}
columns_message_sub_unsub('session.subscribed');
columns_with_exam('session.unsubscribed') ->
columns_message_sub_unsub('session.unsubscribed').
columns_message_sub_unsub(EventName) ->
[ {<<"event">>, EventName}
, {<<"clientid">>, <<"c_emqx">>}
, {<<"username">>, <<"u_emqx">>}
, {<<"peerhost">>, <<"192.168.0.10">>}
@ -538,14 +526,21 @@ columns_with_exam('session.subscribed') ->
, {<<"qos">>, 1}
, {<<"timestamp">>, erlang:system_time(millisecond)}
, {<<"node">>, node()}
];
columns_with_exam('session.unsubscribed') ->
[ {<<"event">>, 'session.unsubscribed'}
, {<<"clientid">>, <<"c_emqx">>}
, {<<"username">>, <<"u_emqx">>}
].
columns_message_ack_delivered(EventName) ->
[ {<<"event">>, EventName}
, {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}
, {<<"from_clientid">>, <<"c_emqx_1">>}
, {<<"from_username">>, <<"u_emqx_1">>}
, {<<"clientid">>, <<"c_emqx_2">>}
, {<<"username">>, <<"u_emqx_2">>}
, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
, {<<"peerhost">>, <<"192.168.0.10">>}
, {<<"topic">>, <<"t/a">>}
, {<<"qos">>, 1}
, {<<"flags">>, #{}}
, {<<"publish_received_at">>, erlang:system_time(millisecond)}
, {<<"timestamp">>, erlang:system_time(millisecond)}
, {<<"node">>, node()}
].