fix(bridges): start mqtt bridge failed

This commit is contained in:
Shawn 2021-09-06 19:09:48 +08:00
parent 9ed90ba7a9
commit 39bb1b8d9d
10 changed files with 170 additions and 151 deletions

View File

@ -26,20 +26,19 @@ bridges.mqtt.my_mqtt_bridge {
certfile = "{{ platform_etc_dir }}/certs/client-cert.pem"
cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
}
in [{
in: [{
from_remote_topic = "msg/#"
to_local_topic = "from_aws/${topic}"
payload_template = "${message}"
qos = 1
}]
out [{
out: [{
from_local_topic = "msg/#"
to_remote_topic = "from_emqx/${topic}"
payload_template = "${message}"
}]
}
# {name: "mysql_bridge_1"
# type: mysql
# config: {

View File

@ -6,7 +6,8 @@
{applications,
[kernel,
stdlib,
emqx
emqx,
emqx_connector
]},
{env,[]},
{modules, []},

View File

@ -45,7 +45,8 @@ bridge_type(emqx_connector_redis) -> redis;
bridge_type(emqx_connector_ldap) -> ldap.
name_to_resource_id(BridgeName) ->
<<"bridge:", BridgeName/binary>>.
Name = bin(BridgeName),
<<"bridge:", Name/binary>>.
resource_id_to_name(<<"bridge:", BridgeName/binary>> = _ResourceId) ->
BridgeName.
@ -63,3 +64,7 @@ config_key_path() ->
update_config(ConfigReq) ->
emqx:update_config(config_key_path(), ConfigReq).
bin(Bin) when is_binary(Bin) -> Bin;
bin(Str) when is_list(Str) -> list_to_binary(Str);
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).

View File

@ -65,8 +65,8 @@ code_change(_OldVsn, State, _Extra) ->
%%============================================================================
load_bridges(Configs) ->
lists:foreach(fun(Type, NamedConf) ->
lists:foreach(fun(Name, Conf) ->
lists:foreach(fun({Type, NamedConf}) ->
lists:foreach(fun({Name, Conf}) ->
load_bridge(Name, Type, Conf)
end, maps:to_list(NamedConf))
end, maps:to_list(Configs)).

View File

@ -8,10 +8,10 @@
roots() -> ["bridges"].
fields("bridges") ->
[{mqtt, hoconsc:ref("mqtt")}];
[{mqtt, hoconsc:ref(?MODULE, "mqtt")}];
fields("mqtt") ->
[{"?name"}, hoconsc:ref("mqtt_briage")];
[{"$name", hoconsc:ref(?MODULE, "mqtt_bridge")}];
fields("mqtt_briage") ->
fields("mqtt_bridge") ->
emqx_connector_mqtt:fields("config").

View File

@ -21,6 +21,7 @@
-export([ start/1
, send/2
, stop/1
, ping/1
]).
-export([ ensure_subscribed/3
@ -86,6 +87,9 @@ stop(#{client_pid := Pid}) ->
safe_stop(Pid, fun() -> emqtt:stop(Pid) end, 1000),
ok.
ping(#{client_pid := Pid}) ->
emqtt:ping(Pid).
ensure_subscribed(#{client_pid := Pid, subscriptions := Subs} = Conn, Topic, QoS) when is_pid(Pid) ->
case emqtt:subscribe(Pid, Topic, QoS) of
{ok, _, _} -> Conn#{subscriptions => [{Topic, QoS}|Subs]};

View File

@ -45,8 +45,7 @@ init([]) ->
{ok, {SupFlag, []}}.
bridge_spec(Config) ->
Name = list_to_atom(maps:get(name, Config)),
#{id => Name,
#{id => maps:get(name, Config),
start => {emqx_bridge_worker, start_link, [Config]},
restart => permanent,
shutdown => 5000,

View File

@ -86,6 +86,7 @@
-export([ ensure_started/1
, ensure_stopped/1
, status/1
, ping/1
]).
-export([ get_forwards/1
@ -169,6 +170,11 @@ status(Pid) when is_pid(Pid) ->
status(Name) ->
gen_statem:call(name(Name), status).
ping(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, ping);
ping(Name) ->
gen_statem:call(name(Name), ping).
%% @doc Return all forwards (local subscriptions).
-spec get_forwards(id()) -> [topic()].
get_forwards(Name) -> gen_statem:call(name(Name), get_forwards, timer:seconds(1000)).
@ -311,6 +317,10 @@ connected(Type, Content, State) ->
%% Common handlers
common(StateName, {call, From}, status, _State) ->
{keep_state_and_data, [{reply, From, StateName}]};
common(_StateName, {call, From}, ping, #{connection := Conn,
connect_module := ConnectModule} =_State) ->
Reply = ConnectModule:ping(Conn),
{keep_state_and_data, [{reply, From, Reply}]};
common(_StateName, {call, From}, ensure_stopped, #{connection := undefined} = _State) ->
{keep_state_and_data, [{reply, From, ok}]};
common(_StateName, {call, From}, ensure_stopped, #{connection := Conn,

View File

@ -36,7 +36,7 @@ roots() ->
[{config, #{type => hoconsc:ref(?MODULE, "config")}}].
fields("config") ->
[ {server, emqx_schema:t(string(), undefined, "127.0.0.1:1883")}
[ {server, emqx_schema:t(emqx_schema:ip_port(), undefined, "127.0.0.1:1883")}
, {reconnect_interval, emqx_schema:t(emqx_schema:duration_ms(), undefined, "30s")}
, {proto_ver, fun proto_ver/1}
, {bridge_mode, emqx_schema:t(boolean(), undefined, true)}
@ -48,8 +48,8 @@ fields("config") ->
, {retry_interval, emqx_schema:t(emqx_schema:duration_ms(), undefined, "30s")}
, {max_inflight, emqx_schema:t(integer(), undefined, 32)}
, {replayq, emqx_schema:t(hoconsc:ref(?MODULE, "replayq"))}
, {in, hoconsc:array("in")}
, {out, hoconsc:array("out")}
, {in, hoconsc:array(hoconsc:ref(?MODULE, "in"))}
, {out, hoconsc:array(hoconsc:ref(?MODULE, "out"))}
] ++ emqx_connector_schema_lib:ssl_fields();
fields("in") ->
@ -68,7 +68,7 @@ fields("out") ->
fields("replayq") ->
[ {dir, hoconsc:union([boolean(), string()])}
, {seg_bytes, emqx_schema:t(emqx_schema:bytesize(), undefined, "100MB")}
, {offload_mode, emqx_schema:t(boolean(), undefined, false)}
, {offload, emqx_schema:t(boolean(), undefined, false)}
, {max_total_bytes, emqx_schema:t(emqx_schema:bytesize(), undefined, "1024MB")}
].
@ -93,8 +93,9 @@ on_start(InstId, #{server := Server,
out := Out,
ssl := #{enable := EnableSsl} = Ssl} = Conf) ->
logger:info("starting mqtt connector: ~p, ~p", [InstId, Conf]),
BridgeName = binary_to_atom(InstId, latin1),
BridgeConf = Conf#{
name => InstId,
name => BridgeName,
config => #{
conn_type => mqtt,
subscriptions => In,
@ -119,9 +120,9 @@ on_start(InstId, #{server := Server,
},
case emqx_bridge_mqtt_sup:create_bridge(BridgeConf) of
{ok, _Pid} ->
{ok, #{}};
start_bridge(BridgeName);
{error, {already_started, _Pid}} ->
{ok, #{}};
start_bridge(BridgeName);
{error, Reason} ->
{error, Reason}
end.
@ -137,5 +138,11 @@ on_query(InstId, {publish_to_local, Msg}, _AfterQuery, _State) ->
on_query(InstId, {publish_to_remote, Msg}, _AfterQuery, _State) ->
logger:debug("publish to remote node, connector: ~p, msg: ~p", [InstId, Msg]).
on_health_check(InstId, #{}) ->
emqx_bridge_mqtt_sup:try_ping(InstId).
on_health_check(_InstId, #{bridge_worker := Worker}) ->
{ok, emqx_bridge_worker:ping(Worker)}.
start_bridge(Name) ->
case emqx_bridge_worker:ensure_started(Name) of
ok -> {ok, #{bridge_name => Name}};
{error, Reason} -> {error, Reason}
end.

View File

@ -53,24 +53,18 @@
-export([roots/0, fields/1]).
roots() -> [ssl_on, ssl_off].
roots() -> ["ssl"].
fields(ssl_on) ->
[ {enable, #{type => true}}
fields("ssl") ->
[ {enable, #{type => boolean(), default => false}}
, {cacertfile, fun cacertfile/1}
, {keyfile, fun keyfile/1}
, {certfile, fun certfile/1}
, {verify, fun verify/1}
];
fields(ssl_off) ->
[ {enable, #{type => false}} ].
].
ssl_fields() ->
[ {ssl, #{type => hoconsc:union(
[ hoconsc:ref(?MODULE, ssl_on)
, hoconsc:ref(?MODULE, ssl_off)
]),
[ {ssl, #{type => hoconsc:ref(?MODULE, "ssl"),
default => #{<<"enable">> => false}
}
}