From 39bb1b8d9dd6ef89666e8af396cf6e6befb1615a Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 6 Sep 2021 19:09:48 +0800 Subject: [PATCH] fix(bridges): start mqtt bridge failed --- apps/emqx_bridge/etc/emqx_bridge.conf | 243 +++++++++--------- apps/emqx_bridge/src/emqx_bridge.app.src | 3 +- apps/emqx_bridge/src/emqx_bridge.erl | 7 +- apps/emqx_bridge/src/emqx_bridge_monitor.erl | 4 +- apps/emqx_bridge/src/emqx_bridge_schema.erl | 6 +- .../emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl | 4 + .../src/emqx_bridge_mqtt_sup.erl | 3 +- .../src/emqx_bridge_worker.erl | 10 + .../src/emqx_connector_mqtt.erl | 25 +- .../src/emqx_connector_schema_lib.erl | 16 +- 10 files changed, 170 insertions(+), 151 deletions(-) diff --git a/apps/emqx_bridge/etc/emqx_bridge.conf b/apps/emqx_bridge/etc/emqx_bridge.conf index 663ae6586..7a2a3512e 100644 --- a/apps/emqx_bridge/etc/emqx_bridge.conf +++ b/apps/emqx_bridge/etc/emqx_bridge.conf @@ -26,137 +26,136 @@ bridges.mqtt.my_mqtt_bridge { certfile = "{{ platform_etc_dir }}/certs/client-cert.pem" cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" } - in [{ + in: [{ from_remote_topic = "msg/#" to_local_topic = "from_aws/${topic}" payload_template = "${message}" qos = 1 }] - out [{ + out: [{ from_local_topic = "msg/#" to_remote_topic = "from_emqx/${topic}" payload_template = "${message}" }] } - # {name: "mysql_bridge_1" - # type: mysql - # config: { - # server: "192.168.0.172:3306" - # database: mqtt - # pool_size: 1 - # username: root - # password: public - # auto_reconnect: true - # ssl: false - # } - # } - # , {name: "pgsql_bridge_1" - # type: pgsql - # config: { - # server: "192.168.0.172:5432" - # database: mqtt - # pool_size: 1 - # username: root - # password: public - # auto_reconnect: true - # ssl: false - # } - # } - # , {name: "mongodb_bridge_single" - # type: mongo - # config: { - # servers: "192.168.0.172:27017" - # mongo_type: single - # pool_size: 1 - # login: root - # password: public - # auth_source: mqtt - # database: mqtt - # ssl: false - # } - # } - # ,{name: "mongodb_bridge_rs" - # type: mongo - # config: { - # servers: "127.0.0.1:27017" - # mongo_type: rs - # rs_set_name: rs_name - # pool_size: 1 - # login: root - # password: public - # auth_source: mqtt - # database: mqtt - # ssl: false - # } - # } - # ,{name: "mongodb_bridge_shared" - # type: mongo - # config: { - # servers: "127.0.0.1:27017" - # mongo_type: shared - # pool_size: 1 - # login: root - # password: public - # auth_source: mqtt - # database: mqtt - # ssl: false - # max_overflow: 1 - # overflow_ttl: - # overflow_check_period: 10s - # local_threshold_ms: 10s - # connect_timeout_ms: 10s - # socket_timeout_ms: 10s - # server_selection_timeout_ms: 10s - # wait_queue_timeout_ms: 10s - # heartbeat_frequency_ms: 10s - # min_heartbeat_frequency_ms: 10s - # } - # } - # , {name: "redis_bridge_single" - # type: redis - # config: { - # servers: "192.168.0.172:6379" - # redis_type: single - # pool_size: 1 - # database: 0 - # password: public - # auto_reconnect: true - # ssl: false - # } - # } - # ,{name: "redis_bridge_sentinel" - # type: redis - # config: { - # servers: "127.0.0.1:6379, 127.0.0.2:6379, 127.0.0.3:6379" - # redis_type: sentinel - # sentinel_name: mymaster - # pool_size: 1 - # database: 0 - # ssl: false - # } - # } - # ,{name: "redis_bridge_cluster" - # type: redis - # config: { - # servers: "127.0.0.1:6379, 127.0.0.2:6379, 127.0.0.3:6379" - # redis_type: cluster - # pool_size: 1 - # database: 0 - # password: "public" - # ssl: false - # } - # } - # , {name: "ldap_bridge_1" - # type: ldap - # config: { - # servers: "192.168.0.172" - # port: 389 - # bind_dn: "cn=root,dc=emqx,dc=io" - # bind_password: "public" - # timeout: 30s - # pool_size: 1 - # ssl: false - # } - # } \ No newline at end of file +# type: mysql +# config: { +# server: "192.168.0.172:3306" +# database: mqtt +# pool_size: 1 +# username: root +# password: public +# auto_reconnect: true +# ssl: false +# } +# } +# , {name: "pgsql_bridge_1" +# type: pgsql +# config: { +# server: "192.168.0.172:5432" +# database: mqtt +# pool_size: 1 +# username: root +# password: public +# auto_reconnect: true +# ssl: false +# } +# } +# , {name: "mongodb_bridge_single" +# type: mongo +# config: { +# servers: "192.168.0.172:27017" +# mongo_type: single +# pool_size: 1 +# login: root +# password: public +# auth_source: mqtt +# database: mqtt +# ssl: false +# } +# } +# ,{name: "mongodb_bridge_rs" +# type: mongo +# config: { +# servers: "127.0.0.1:27017" +# mongo_type: rs +# rs_set_name: rs_name +# pool_size: 1 +# login: root +# password: public +# auth_source: mqtt +# database: mqtt +# ssl: false +# } +# } +# ,{name: "mongodb_bridge_shared" +# type: mongo +# config: { +# servers: "127.0.0.1:27017" +# mongo_type: shared +# pool_size: 1 +# login: root +# password: public +# auth_source: mqtt +# database: mqtt +# ssl: false +# max_overflow: 1 +# overflow_ttl: +# overflow_check_period: 10s +# local_threshold_ms: 10s +# connect_timeout_ms: 10s +# socket_timeout_ms: 10s +# server_selection_timeout_ms: 10s +# wait_queue_timeout_ms: 10s +# heartbeat_frequency_ms: 10s +# min_heartbeat_frequency_ms: 10s +# } +# } +# , {name: "redis_bridge_single" +# type: redis +# config: { +# servers: "192.168.0.172:6379" +# redis_type: single +# pool_size: 1 +# database: 0 +# password: public +# auto_reconnect: true +# ssl: false +# } +# } +# ,{name: "redis_bridge_sentinel" +# type: redis +# config: { +# servers: "127.0.0.1:6379, 127.0.0.2:6379, 127.0.0.3:6379" +# redis_type: sentinel +# sentinel_name: mymaster +# pool_size: 1 +# database: 0 +# ssl: false +# } +# } +# ,{name: "redis_bridge_cluster" +# type: redis +# config: { +# servers: "127.0.0.1:6379, 127.0.0.2:6379, 127.0.0.3:6379" +# redis_type: cluster +# pool_size: 1 +# database: 0 +# password: "public" +# ssl: false +# } +# } +# , {name: "ldap_bridge_1" +# type: ldap +# config: { +# servers: "192.168.0.172" +# port: 389 +# bind_dn: "cn=root,dc=emqx,dc=io" +# bind_password: "public" +# timeout: 30s +# pool_size: 1 +# ssl: false +# } +# } diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index 42fc245f5..9c0f6b779 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -6,7 +6,8 @@ {applications, [kernel, stdlib, - emqx + emqx, + emqx_connector ]}, {env,[]}, {modules, []}, diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 4e05f8e96..75ebfac0c 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -45,7 +45,8 @@ bridge_type(emqx_connector_redis) -> redis; bridge_type(emqx_connector_ldap) -> ldap. name_to_resource_id(BridgeName) -> - <<"bridge:", BridgeName/binary>>. + Name = bin(BridgeName), + <<"bridge:", Name/binary>>. resource_id_to_name(<<"bridge:", BridgeName/binary>> = _ResourceId) -> BridgeName. @@ -63,3 +64,7 @@ config_key_path() -> update_config(ConfigReq) -> emqx:update_config(config_key_path(), ConfigReq). + +bin(Bin) when is_binary(Bin) -> Bin; +bin(Str) when is_list(Str) -> list_to_binary(Str); +bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). diff --git a/apps/emqx_bridge/src/emqx_bridge_monitor.erl b/apps/emqx_bridge/src/emqx_bridge_monitor.erl index 4b3695615..d76af5fb9 100644 --- a/apps/emqx_bridge/src/emqx_bridge_monitor.erl +++ b/apps/emqx_bridge/src/emqx_bridge_monitor.erl @@ -65,8 +65,8 @@ code_change(_OldVsn, State, _Extra) -> %%============================================================================ load_bridges(Configs) -> - lists:foreach(fun(Type, NamedConf) -> - lists:foreach(fun(Name, Conf) -> + lists:foreach(fun({Type, NamedConf}) -> + lists:foreach(fun({Name, Conf}) -> load_bridge(Name, Type, Conf) end, maps:to_list(NamedConf)) end, maps:to_list(Configs)). diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/emqx_bridge_schema.erl index f651ce189..beb0f282c 100644 --- a/apps/emqx_bridge/src/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_schema.erl @@ -8,10 +8,10 @@ roots() -> ["bridges"]. fields("bridges") -> - [{mqtt, hoconsc:ref("mqtt")}]; + [{mqtt, hoconsc:ref(?MODULE, "mqtt")}]; fields("mqtt") -> - [{"?name"}, hoconsc:ref("mqtt_briage")]; + [{"$name", hoconsc:ref(?MODULE, "mqtt_bridge")}]; -fields("mqtt_briage") -> +fields("mqtt_bridge") -> emqx_connector_mqtt:fields("config"). diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl index 2fd5447a5..5678c73ef 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl @@ -21,6 +21,7 @@ -export([ start/1 , send/2 , stop/1 + , ping/1 ]). -export([ ensure_subscribed/3 @@ -86,6 +87,9 @@ stop(#{client_pid := Pid}) -> safe_stop(Pid, fun() -> emqtt:stop(Pid) end, 1000), ok. +ping(#{client_pid := Pid}) -> + emqtt:ping(Pid). + ensure_subscribed(#{client_pid := Pid, subscriptions := Subs} = Conn, Topic, QoS) when is_pid(Pid) -> case emqtt:subscribe(Pid, Topic, QoS) of {ok, _, _} -> Conn#{subscriptions => [{Topic, QoS}|Subs]}; diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_sup.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_sup.erl index 23ecdb5f9..26e6ee7c3 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_sup.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_sup.erl @@ -45,8 +45,7 @@ init([]) -> {ok, {SupFlag, []}}. bridge_spec(Config) -> - Name = list_to_atom(maps:get(name, Config)), - #{id => Name, + #{id => maps:get(name, Config), start => {emqx_bridge_worker, start_link, [Config]}, restart => permanent, shutdown => 5000, diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl index 11d9182d9..319d7054e 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl @@ -86,6 +86,7 @@ -export([ ensure_started/1 , ensure_stopped/1 , status/1 + , ping/1 ]). -export([ get_forwards/1 @@ -169,6 +170,11 @@ status(Pid) when is_pid(Pid) -> status(Name) -> gen_statem:call(name(Name), status). +ping(Pid) when is_pid(Pid) -> + gen_statem:call(Pid, ping); +ping(Name) -> + gen_statem:call(name(Name), ping). + %% @doc Return all forwards (local subscriptions). -spec get_forwards(id()) -> [topic()]. get_forwards(Name) -> gen_statem:call(name(Name), get_forwards, timer:seconds(1000)). @@ -311,6 +317,10 @@ connected(Type, Content, State) -> %% Common handlers common(StateName, {call, From}, status, _State) -> {keep_state_and_data, [{reply, From, StateName}]}; +common(_StateName, {call, From}, ping, #{connection := Conn, + connect_module := ConnectModule} =_State) -> + Reply = ConnectModule:ping(Conn), + {keep_state_and_data, [{reply, From, Reply}]}; common(_StateName, {call, From}, ensure_stopped, #{connection := undefined} = _State) -> {keep_state_and_data, [{reply, From, ok}]}; common(_StateName, {call, From}, ensure_stopped, #{connection := Conn, diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 15137039b..4210b66cf 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -36,7 +36,7 @@ roots() -> [{config, #{type => hoconsc:ref(?MODULE, "config")}}]. fields("config") -> - [ {server, emqx_schema:t(string(), undefined, "127.0.0.1:1883")} + [ {server, emqx_schema:t(emqx_schema:ip_port(), undefined, "127.0.0.1:1883")} , {reconnect_interval, emqx_schema:t(emqx_schema:duration_ms(), undefined, "30s")} , {proto_ver, fun proto_ver/1} , {bridge_mode, emqx_schema:t(boolean(), undefined, true)} @@ -48,8 +48,8 @@ fields("config") -> , {retry_interval, emqx_schema:t(emqx_schema:duration_ms(), undefined, "30s")} , {max_inflight, emqx_schema:t(integer(), undefined, 32)} , {replayq, emqx_schema:t(hoconsc:ref(?MODULE, "replayq"))} - , {in, hoconsc:array("in")} - , {out, hoconsc:array("out")} + , {in, hoconsc:array(hoconsc:ref(?MODULE, "in"))} + , {out, hoconsc:array(hoconsc:ref(?MODULE, "out"))} ] ++ emqx_connector_schema_lib:ssl_fields(); fields("in") -> @@ -68,7 +68,7 @@ fields("out") -> fields("replayq") -> [ {dir, hoconsc:union([boolean(), string()])} , {seg_bytes, emqx_schema:t(emqx_schema:bytesize(), undefined, "100MB")} - , {offload_mode, emqx_schema:t(boolean(), undefined, false)} + , {offload, emqx_schema:t(boolean(), undefined, false)} , {max_total_bytes, emqx_schema:t(emqx_schema:bytesize(), undefined, "1024MB")} ]. @@ -93,8 +93,9 @@ on_start(InstId, #{server := Server, out := Out, ssl := #{enable := EnableSsl} = Ssl} = Conf) -> logger:info("starting mqtt connector: ~p, ~p", [InstId, Conf]), + BridgeName = binary_to_atom(InstId, latin1), BridgeConf = Conf#{ - name => InstId, + name => BridgeName, config => #{ conn_type => mqtt, subscriptions => In, @@ -119,9 +120,9 @@ on_start(InstId, #{server := Server, }, case emqx_bridge_mqtt_sup:create_bridge(BridgeConf) of {ok, _Pid} -> - {ok, #{}}; + start_bridge(BridgeName); {error, {already_started, _Pid}} -> - {ok, #{}}; + start_bridge(BridgeName); {error, Reason} -> {error, Reason} end. @@ -137,5 +138,11 @@ on_query(InstId, {publish_to_local, Msg}, _AfterQuery, _State) -> on_query(InstId, {publish_to_remote, Msg}, _AfterQuery, _State) -> logger:debug("publish to remote node, connector: ~p, msg: ~p", [InstId, Msg]). -on_health_check(InstId, #{}) -> - emqx_bridge_mqtt_sup:try_ping(InstId). +on_health_check(_InstId, #{bridge_worker := Worker}) -> + {ok, emqx_bridge_worker:ping(Worker)}. + +start_bridge(Name) -> + case emqx_bridge_worker:ensure_started(Name) of + ok -> {ok, #{bridge_name => Name}}; + {error, Reason} -> {error, Reason} + end. diff --git a/apps/emqx_connector/src/emqx_connector_schema_lib.erl b/apps/emqx_connector/src/emqx_connector_schema_lib.erl index a6d33ffca..6dcc564af 100644 --- a/apps/emqx_connector/src/emqx_connector_schema_lib.erl +++ b/apps/emqx_connector/src/emqx_connector_schema_lib.erl @@ -53,24 +53,18 @@ -export([roots/0, fields/1]). -roots() -> [ssl_on, ssl_off]. +roots() -> ["ssl"]. -fields(ssl_on) -> - [ {enable, #{type => true}} +fields("ssl") -> + [ {enable, #{type => boolean(), default => false}} , {cacertfile, fun cacertfile/1} , {keyfile, fun keyfile/1} , {certfile, fun certfile/1} , {verify, fun verify/1} - ]; - -fields(ssl_off) -> - [ {enable, #{type => false}} ]. + ]. ssl_fields() -> - [ {ssl, #{type => hoconsc:union( - [ hoconsc:ref(?MODULE, ssl_on) - , hoconsc:ref(?MODULE, ssl_off) - ]), + [ {ssl, #{type => hoconsc:ref(?MODULE, "ssl"), default => #{<<"enable">> => false} } }