From e2721c144c5c5d0505f0ca5f727c0db88e32a0b2 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 27 Sep 2021 00:26:34 +0800 Subject: [PATCH] feat(bridge): support http bridge --- apps/emqx/src/emqx_config_handler.erl | 2 +- apps/emqx_bridge/etc/emqx_bridge.conf | 27 ++++ apps/emqx_bridge/src/emqx_bridge.erl | 118 ++++++++++++++---- apps/emqx_bridge/src/emqx_bridge_app.erl | 2 + apps/emqx_bridge/src/emqx_bridge_schema.erl | 15 ++- .../src/emqx_connector_http.erl | 110 ++++++++++++++-- .../src/emqx_connector_mqtt.erl | 6 +- .../src/emqx_rule_runtime.erl | 4 +- 8 files changed, 245 insertions(+), 39 deletions(-) diff --git a/apps/emqx/src/emqx_config_handler.erl b/apps/emqx/src/emqx_config_handler.erl index e47bb489e..db2376784 100644 --- a/apps/emqx/src/emqx_config_handler.erl +++ b/apps/emqx/src/emqx_config_handler.erl @@ -77,7 +77,7 @@ stop() -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. update_config(SchemaModule, ConfKeyPath, UpdateArgs) -> ?ATOM_CONF_PATH(ConfKeyPath, gen_server:call(?MODULE, {change_config, SchemaModule, - AtomKeyPath, UpdateArgs}), {error, ConfKeyPath}). + AtomKeyPath, UpdateArgs}), {error, {not_found, ConfKeyPath}}). -spec add_handler(emqx_config:config_key_path(), handler_name()) -> ok. add_handler(ConfKeyPath, HandlerName) -> diff --git a/apps/emqx_bridge/etc/emqx_bridge.conf b/apps/emqx_bridge/etc/emqx_bridge.conf index e8af40341..f26172ef6 100644 --- a/apps/emqx_bridge/etc/emqx_bridge.conf +++ b/apps/emqx_bridge/etc/emqx_bridge.conf @@ -45,3 +45,30 @@ # retain = false # } #} +# +#bridges.http.my_http_bridge { +# base_url: "http://localhost:9901" +# connect_timeout: "30s" +# max_retries: 3 +# retry_interval = "10s" +# pool_type = "random" +# pool_size = 4 +# enable_pipelining = true +# ssl { +# enable = false +# keyfile = "{{ platform_etc_dir }}/certs/client-key.pem" +# certfile = "{{ platform_etc_dir }}/certs/client-cert.pem" +# cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" +# } +# egress_channels.post_messages { +# subscribe_local_topic = "emqx_http/#" +# request_timeout: "30s" +# ## following config entries can use placehodler variables +# method = post +# path = "/messages/${topic}" +# body = "${payload}" +# headers { +# "content-type": "application/json" +# } +# } +#} diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index e3458adca..402c4f597 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -15,9 +15,15 @@ %%-------------------------------------------------------------------- -module(emqx_bridge). -behaviour(emqx_config_handler). +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). -export([post_config_update/4]). +-export([reload_hook/0, unload_hook/0]). + +-export([on_message_publish/1]). + -export([ load_bridges/0 , get_bridge/2 , get_bridge/3 @@ -28,6 +34,7 @@ , start_bridge/2 , stop_bridge/2 , restart_bridge/2 + , send_message/2 ]). -export([ config_key_path/0 @@ -38,24 +45,57 @@ , resource_id/1 , resource_id/2 , parse_bridge_id/1 + , channel_id/4 + , parse_channel_id/1 ]). +reload_hook() -> + unload_hook(), + Bridges = emqx:get_config([bridges], #{}), + lists:foreach(fun({_Type, Bridge}) -> + lists:foreach(fun({_Name, BridgeConf}) -> + load_hook(BridgeConf) + end, maps:to_list(Bridge)) + end, maps:to_list(Bridges)). + +load_hook(#{egress_channels := Channels}) -> + case has_subscribe_local_topic(Channels) of + true -> ok; + false -> emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}) + end; +load_hook(_Conf) -> ok. + +unload_hook() -> + ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}). + +on_message_publish(Message = #message{topic = Topic, flags = Flags}) -> + case maps:get(sys, Flags, false) of + false -> + ChannelIds = get_matched_channels(Topic), + lists:foreach(fun(ChannelId) -> + send_message(ChannelId, emqx_message:to_map(Message)) + end, ChannelIds); + true -> ok + end, + {ok, Message}. + +%% TODO: remove this clause, treat mqtt bridges the same as other bridges +send_message(ChannelId, Message) -> + {BridgeType, BridgeName, _, _} = parse_channel_id(ChannelId), + ResId = emqx_bridge:resource_id(BridgeType, BridgeName), + do_send_message(ResId, ChannelId, Message). + +do_send_message(ResId, ChannelId, Message) -> + emqx_resource:query(ResId, {send_message, ChannelId, Message}). + config_key_path() -> [bridges]. resource_type(mqtt) -> emqx_connector_mqtt; -resource_type(mysql) -> emqx_connector_mysql; -resource_type(pgsql) -> emqx_connector_pgsql; -resource_type(mongo) -> emqx_connector_mongo; -resource_type(redis) -> emqx_connector_redis; -resource_type(ldap) -> emqx_connector_ldap. +resource_type(http) -> emqx_connector_http. bridge_type(emqx_connector_mqtt) -> mqtt; -bridge_type(emqx_connector_mysql) -> mysql; -bridge_type(emqx_connector_pgsql) -> pgsql; -bridge_type(emqx_connector_mongo) -> mongo; -bridge_type(emqx_connector_redis) -> redis; -bridge_type(emqx_connector_ldap) -> ldap. +bridge_type(emqx_connector_http) -> http. post_config_update(_Req, NewConf, OldConf, _AppEnv) -> #{added := Added, removed := Removed, changed := Updated} @@ -100,11 +140,23 @@ bridge_id(BridgeType, BridgeName) -> <>. parse_bridge_id(BridgeId) -> - try - [Type, Name] = string:split(str(BridgeId), ":", leading), - {list_to_existing_atom(Type), list_to_atom(Name)} - catch - _ : _ -> error({invalid_bridge_id, BridgeId}) + case string:split(bin(BridgeId), ":", all) of + [Type, Name] -> {binary_to_atom(Type, utf8), binary_to_atom(Name, utf8)}; + _ -> error({invalid_bridge_id, BridgeId}) + end. + +channel_id(BridgeType, BridgeName, ChannelType, ChannelName) -> + BType = bin(BridgeType), + BName = bin(BridgeName), + CType = bin(ChannelType), + CName = bin(ChannelName), + <>. + +parse_channel_id(ChannelId) -> + case string:split(bin(ChannelId), ":", all) of + [BridgeType, BridgeName, ChannelType, ChannelName] -> + {BridgeType, BridgeName, ChannelType, ChannelName}; + _ -> error({invalid_bridge_id, ChannelId}) end. list_bridges() -> @@ -184,13 +236,35 @@ flatten_confs(Conf0) -> do_flatten_confs(Type, Conf0) -> [{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)]. +has_subscribe_local_topic(Channels) -> + lists:any(fun (#{subscribe_local_topic := _}) -> true; + (_) -> false + end, maps:to_list(Channels)). + +get_matched_channels(Topic) -> + Bridges = emqx:get_config([bridges], #{}), + maps:fold(fun + %% TODO: also trigger 'message.publish' for mqtt bridges. + (mqtt, _Conf, Acc0) -> Acc0; + (BType, Conf, Acc0) -> + maps:fold(fun + (BName, #{egress_channels := Channels}, Acc1) -> + do_get_matched_channels(Topic, Channels, BType, BName, egress_channels) + ++ Acc1; + (_Name, _BridgeConf, Acc1) -> Acc1 + end, Acc0, Conf) + end, [], Bridges). + +do_get_matched_channels(Topic, Channels, BType, BName, CType) -> + maps:fold(fun + (ChannName, #{subscribe_local_topic := Filter}, Acc) -> + case emqx_topic:match(Topic, Filter) of + true -> [channel_id(BType, BName, CType, ChannName) | Acc]; + false -> Acc + end; + (_ChannName, _ChannConf, Acc) -> Acc + end, [], Channels). + bin(Bin) when is_binary(Bin) -> Bin; bin(Str) when is_list(Str) -> list_to_binary(Str); bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). - -str(A) when is_atom(A) -> - atom_to_list(A); -str(B) when is_binary(B) -> - binary_to_list(B); -str(S) when is_list(S) -> - S. diff --git a/apps/emqx_bridge/src/emqx_bridge_app.erl b/apps/emqx_bridge/src/emqx_bridge_app.erl index 004b32787..8cb325e20 100644 --- a/apps/emqx_bridge/src/emqx_bridge_app.erl +++ b/apps/emqx_bridge/src/emqx_bridge_app.erl @@ -22,10 +22,12 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqx_bridge_sup:start_link(), ok = emqx_bridge:load_bridges(), + ok = emqx_bridge:reload_hook(), emqx_config_handler:add_handler(emqx_bridge:config_key_path(), emqx_bridge), {ok, Sup}. stop(_State) -> + ok = emqx_bridge:unload_hook(), ok. %% internal functions \ No newline at end of file diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/emqx_bridge_schema.erl index 87eb40372..2072d15ec 100644 --- a/apps/emqx_bridge/src/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_schema.erl @@ -1,5 +1,7 @@ -module(emqx_bridge_schema). +-include_lib("typerefl/include/types.hrl"). + -export([roots/0, fields/1]). %%====================================================================================== @@ -8,7 +10,16 @@ roots() -> [bridges]. fields(bridges) -> - [{mqtt, hoconsc:mk(hoconsc:map(name, hoconsc:ref(?MODULE, "mqtt_bridge")))}]; + [ {mqtt, hoconsc:mk(hoconsc:map(name, hoconsc:ref(?MODULE, "mqtt_bridge")))} + , {http, hoconsc:mk(hoconsc:map(name, hoconsc:ref(?MODULE, "http_bridge")))} + ]; fields("mqtt_bridge") -> - emqx_connector_mqtt:fields("config"). + emqx_connector_mqtt:fields("config"); + +fields("http_bridge") -> + emqx_connector_http:fields(config) ++ http_channels(). + +http_channels() -> + [{egress_channels, hoconsc:mk(hoconsc:map(id, + hoconsc:ref(emqx_connector_http, "http_request")))}]. diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 272f24556..2f4aa2af4 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -21,6 +21,8 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("emqx_resource/include/emqx_resource_behaviour.hrl"). +-include_lib("emqx/include/logger.hrl"). + %% callbacks of behaviour emqx_resource -export([ on_start/2 , on_stop/2 @@ -38,7 +40,7 @@ -export([ check_ssl_opts/2 ]). --type connect_timeout() :: non_neg_integer() | infinity. +-type connect_timeout() :: emqx_schema:duration() | infinity. -type pool_type() :: random | hash. -reflect_type([ connect_timeout/0 @@ -50,6 +52,22 @@ roots() -> [{config, #{type => hoconsc:ref(?MODULE, config)}}]. +fields("http_request") -> + [ {subscribe_local_topic, hoconsc:mk(binary())} + , {method, hoconsc:mk(method(), #{default => post})} + , {path, hoconsc:mk(binary(), #{default => <<"">>})} + , {headers, hoconsc:mk(map(), + #{default => #{ + <<"accept">> => <<"application/json">>, + <<"cache-control">> => <<"no-cache">>, + <<"connection">> => <<"keep-alive">>, + <<"content-type">> => <<"application/json">>, + <<"keep-alive">> => <<"timeout=5">>}}) + } + , {body, hoconsc:mk(binary(), #{default => <<"${payload}">>})} + , {request_timeout, hoconsc:mk(emqx_schema:duration_ms(), #{default => <<"30s">>})} + ]; + fields(config) -> [ {base_url, fun base_url/1} , {connect_timeout, fun connect_timeout/1} @@ -60,6 +78,13 @@ fields(config) -> , {enable_pipelining, fun enable_pipelining/1} ] ++ emqx_connector_schema_lib:ssl_fields(). +method() -> + hoconsc:union([ typerefl:atom(post) + , typerefl:atom(put) + , typerefl:atom(get) + , typerefl:atom(delete) + ]). + validations() -> [ {check_ssl_opts, fun check_ssl_opts/1} ]. @@ -79,7 +104,7 @@ max_retries(type) -> non_neg_integer(); max_retries(default) -> 5; max_retries(_) -> undefined. -retry_interval(type) -> emqx_schema:duration_ms(); +retry_interval(type) -> emqx_schema:duration(); retry_interval(default) -> "1s"; retry_interval(_) -> undefined. @@ -111,7 +136,7 @@ on_start(InstId, #{base_url := #{scheme := Scheme, {tcp, []}; https -> SSLOpts = emqx_plugin_libs_ssl:save_files_return_opts( - maps:get(ssl_opts, Config), "connectors", InstId), + maps:get(ssl, Config), "connectors", InstId), {tls, SSLOpts} end, NTransportOpts = emqx_misc:ipv6_probe(TransportOpts), @@ -126,16 +151,32 @@ on_start(InstId, #{base_url := #{scheme := Scheme, , {transport, Transport} , {transport_opts, NTransportOpts}], PoolName = emqx_plugin_libs_pool:pool_name(InstId), - {ok, _} = ehttpc_sup:start_pool(PoolName, PoolOpts), - {ok, #{pool_name => PoolName, - host => Host, - port => Port, - base_path => BasePath}}. + State = #{ + pool_name => PoolName, + host => Host, + port => Port, + base_path => BasePath, + channels => preproc_channels(InstId, Config) + }, + case ehttpc_sup:start_pool(PoolName, PoolOpts) of + {ok, _} -> {ok, State}; + {error, {already_started, _}} -> {ok, State}; + {error, Reason} -> + {error, Reason} + end. on_stop(InstId, #{pool_name := PoolName}) -> logger:info("stopping http connector: ~p", [InstId]), ehttpc_sup:stop_pool(PoolName). +on_query(InstId, {send_message, ChannelId, Msg}, AfterQuery, #{channels := Channels} = State) -> + case maps:find(ChannelId, Channels) of + error -> ?SLOG(error, #{msg => "channel not found", channel_id => ChannelId}); + {ok, ChannConf} -> + #{method := Method, path := Path, body := Body, headers := Headers, + request_timeout := Timeout} = proc_channel_conf(ChannConf, Msg), + on_query(InstId, {Method, {Path, Headers, Body}, Timeout}, AfterQuery, State) + end; on_query(InstId, {Method, Request}, AfterQuery, State) -> on_query(InstId, {undefined, Method, Request, 5000}, AfterQuery, State); on_query(InstId, {Method, Request, Timeout}, AfterQuery, State) -> @@ -169,6 +210,52 @@ on_health_check(_InstId, #{host := Host, port := Port} = State) -> %% Internal functions %%-------------------------------------------------------------------- +preproc_channels(<<"bridge:", BridgeId/binary>>, Config) -> + {BridgeType, BridgeName} = emqx_bridge:parse_bridge_id(BridgeId), + maps:fold(fun(ChannName, ChannConf, Acc) -> + Acc#{emqx_bridge:channel_id(BridgeType, BridgeName, egress_channels, ChannName) => + preproc_channel_conf(ChannConf)} + end, #{}, maps:get(egress_channels, Config, #{})). + +preproc_channel_conf(#{ + method := Method, + path := Path, + body := Body, + headers := Headers} = Conf) -> + Conf#{ method => emqx_plugin_libs_rule:preproc_tmpl(bin(Method)) + , path => emqx_plugin_libs_rule:preproc_tmpl(Path) + , body => emqx_plugin_libs_rule:preproc_tmpl(Body) + , headers => preproc_headers(Headers) + }. + +preproc_headers(Headers) -> + maps:fold(fun(K, V, Acc) -> + Acc#{emqx_plugin_libs_rule:preproc_tmpl(bin(K)) => + emqx_plugin_libs_rule:preproc_tmpl(bin(V))} + end, #{}, Headers). + +proc_channel_conf(#{ + method := MethodTks, + path := PathTks, + body := BodyTks, + headers := HeadersTks} = Conf, Msg) -> + Conf#{ method => make_method(emqx_plugin_libs_rule:proc_tmpl(MethodTks, Msg)) + , path => emqx_plugin_libs_rule:proc_tmpl(PathTks, Msg) + , body => emqx_plugin_libs_rule:proc_tmpl(BodyTks, Msg) + , headers => maps:to_list(proc_headers(HeadersTks, Msg)) + }. + +proc_headers(HeaderTks, Msg) -> + maps:fold(fun(K, V, Acc) -> + Acc#{emqx_plugin_libs_rule:proc_tmpl(K, Msg) => + emqx_plugin_libs_rule:proc_tmpl(V, Msg)} + end, #{}, HeaderTks). + +make_method(M) when M == <<"POST">>; M == <<"post">> -> post; +make_method(M) when M == <<"PUT">>; M == <<"put">> -> put; +make_method(M) when M == <<"GET">>; M == <<"get">> -> get; +make_method(M) when M == <<"DELETE">>; M == <<"delete">> -> delete. + check_ssl_opts(Conf) -> check_ssl_opts("base_url", Conf). @@ -185,3 +272,10 @@ update_path(BasePath, {Path, Headers}) -> {filename:join(BasePath, Path), Headers}; update_path(BasePath, {Path, Headers, Body}) -> {filename:join(BasePath, Path), Headers, Body}. + +bin(Bin) when is_binary(Bin) -> + Bin; +bin(Str) when is_list(Str) -> + list_to_binary(Str); +bin(Atom) when is_atom(Atom) -> + atom_to_binary(Atom, utf8). diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 431e94b1e..424933ae4 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -121,9 +121,9 @@ on_stop(InstId, #{channels := NameList}) -> on_query(_InstId, {create_channel, Conf}, _AfterQuery, #{name_prefix := Prefix, baisc_conf := BasicConf}) -> create_channel(Conf, Prefix, BasicConf); -on_query(_InstId, {send_to_remote, ChannelName, Msg}, _AfterQuery, _State) -> - logger:debug("send msg to remote node on channel: ~p, msg: ~p", [ChannelName, Msg]), - emqx_connector_mqtt_worker:send_to_remote(ChannelName, Msg). +on_query(_InstId, {send_message, ChannelId, Msg}, _AfterQuery, _State) -> + logger:debug("send msg to remote node on channel: ~p, msg: ~p", [ChannelId, Msg]), + emqx_connector_mqtt_worker:send_to_remote(ChannelId, Msg). on_health_check(_InstId, #{channels := NameList} = State) -> Results = [{Name, emqx_connector_mqtt_worker:ping(Name)} || Name <- NameList], diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index a7be19b54..836b545a2 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -240,9 +240,7 @@ handle_output(OutId, Selected, Envs) -> do_handle_output(#{type := bridge, target := ChannelId}, Selected, _Envs) -> ?LOG(debug, "output to bridge: ~p", [ChannelId]), - [Type, BridgeName | _] = string:split(ChannelId, ":", all), - ResId = emqx_bridge:resource_id(<>), - emqx_resource:query(ResId, {send_to_remote, ChannelId, Selected}); + emqx_bridge:send_message(ChannelId, Selected); do_handle_output(#{type := func, target := Func} = Out, Selected, Envs) -> erlang:apply(Func, [Selected, Envs, maps:get(args, Out, #{})]); do_handle_output(#{type := builtin, target := Output} = Out, Selected, Envs)