diff --git a/apps/emqx_bridge/etc/emqx_bridge.conf b/apps/emqx_bridge/etc/emqx_bridge.conf index e1d2d4be7..632a0a406 100644 --- a/apps/emqx_bridge/etc/emqx_bridge.conf +++ b/apps/emqx_bridge/etc/emqx_bridge.conf @@ -2,9 +2,11 @@ ## EMQ X Bridge ##-------------------------------------------------------------------- -#bridges.mqtt.my_mqtt_bridge_to_aws { +## MQTT bridges to/from another MQTT broker +#bridges.mqtt.my_mqtt_bridge_from_aws { # server = "127.0.0.1:1883" # proto_ver = "v4" +# clientid = "my_mqtt_bridge_from_aws" # username = "username1" # password = "" # clean_start = true @@ -25,50 +27,75 @@ # certfile = "{{ platform_etc_dir }}/certs/client-cert.pem" # cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" # } -# ## We will create one MQTT connection for each element of the `ingress_channels` -# ## Syntax: ingress_channels. -# ingress_channels.pull_msgs_from_aws { -# subscribe_remote_topic = "aws/#" -# subscribe_qos = 1 -# local_topic = "from_aws/${topic}" -# payload = "${payload}" -# qos = "${qos}" -# retain = "${retain}" -# } -# ## We will create one MQTT connection for each element of the `egress_channels` -# ## Syntax: egress_channels. -# egress_channels.push_msgs_to_aws { -# subscribe_local_topic = "emqx/#" -# remote_topic = "from_emqx/${topic}" -# payload = "${payload}" -# qos = 1 -# retain = false -# } +# +# ## topic mappings for this bridge +# direction = in +# from_remote_topic = "aws/#" +# subscribe_qos = 1 +# to_local_topic = "from_aws/${topic}" +# payload = "${payload}" +# qos = "${qos}" +# retain = "${retain}" #} # -#bridges.http.my_http_bridge { -# base_url: "http://localhost:9901" -# connect_timeout: "30s" -# max_retries: 3 -# retry_interval = "10s" -# pool_type = "hash" -# pool_size = 4 -# enable_pipelining = true +#bridges.mqtt.my_mqtt_bridge_to_aws { +# server = "127.0.0.1:1883" +# proto_ver = "v4" +# clientid = "my_mqtt_bridge_to_aws" +# username = "username1" +# password = "" +# clean_start = true +# keepalive = 300 +# retry_interval = "30s" +# max_inflight = 32 +# reconnect_interval = "30s" +# bridge_mode = true +# replayq { +# dir = "{{ platform_data_dir }}/replayq/bridge_mqtt/" +# seg_bytes = "100MB" +# offload = false +# max_total_bytes = "1GB" +# } # ssl { # enable = false # keyfile = "{{ platform_etc_dir }}/certs/client-key.pem" # certfile = "{{ platform_etc_dir }}/certs/client-cert.pem" # cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" # } -# egress_channels.post_messages { -# subscribe_local_topic = "emqx_http/#" -# request_timeout: "30s" -# ## following config entries can use placehodler variables -# method = post -# path = "/messages/${topic}" -# body = "${payload}" -# headers { -# "content-type": "application/json" -# } -# } +# +# ## topic mappings for this bridge +# direction = out +# from_local_topic = "emqx/#" +# to_remote_topic = "from_emqx/${topic}" +# payload = "${payload}" +# qos = 1 +# retain = false #} + +## HTTP bridges to a http server +bridges.http.my_http_bridge { + ## NOTE: we cannot use placehodler variables in the `scheme://host:port` part of the url string + url = "http://localhost:9901/messages/${topic}" + request_timeout = "30s" + connect_timeout = "30s" + max_retries = 3 + retry_interval = "10s" + pool_type = "random" + pool_size = 4 + enable_pipelining = true + ssl { + enable = false + keyfile = "{{ platform_etc_dir }}/certs/client-key.pem" + certfile = "{{ platform_etc_dir }}/certs/client-cert.pem" + cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" + } + + from_local_topic = "emqx_http/#" + ## the following config entries can use placehodler variables: + ## url, method, body, headers + method = post + body = "${payload}" + headers { + "content-type": "application/json" + } +} diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index c07a5b842..8fbd87c64 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -45,8 +45,6 @@ , resource_id/1 , resource_id/2 , parse_bridge_id/1 - , channel_id/4 - , parse_channel_id/1 ]). reload_hook() -> @@ -58,11 +56,8 @@ reload_hook() -> end, maps:to_list(Bridge)) end, maps:to_list(Bridges)). -load_hook(#{egress_channels := Channels}) -> - case has_subscribe_local_topic(Channels) of - true -> ok; - false -> emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}) - end; +load_hook(#{from_local_topic := _}) -> + emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}); load_hook(_Conf) -> ok. unload_hook() -> @@ -71,28 +66,25 @@ unload_hook() -> on_message_publish(Message = #message{topic = Topic, flags = Flags}) -> case maps:get(sys, Flags, false) of false -> - ChannelIds = get_matched_channels(Topic), - lists:foreach(fun(ChannelId) -> - send_message(ChannelId, emqx_message:to_map(Message)) - end, ChannelIds); + lists:foreach(fun (Id) -> + send_message(Id, emqx_message:to_map(Message)) + end, get_matched_bridges(Topic)); true -> ok end, {ok, Message}. -%% TODO: remove this clause, treat mqtt bridges the same as other bridges -send_message(ChannelId, Message) -> - {BridgeType, BridgeName, _, _} = parse_channel_id(ChannelId), +send_message(BridgeId, Message) -> + {BridgeType, BridgeName} = parse_bridge_id(BridgeId), ResId = emqx_bridge:resource_id(BridgeType, BridgeName), - do_send_message(ResId, ChannelId, Message). - -do_send_message(ResId, ChannelId, Message) -> - emqx_resource:query(ResId, {send_message, ChannelId, Message}). + emqx_resource:query(ResId, {send_message, BridgeId, Message}). config_key_path() -> [bridges]. resource_type(mqtt) -> emqx_connector_mqtt; -resource_type(http) -> emqx_connector_http. +resource_type(<<"mqtt">>) -> emqx_connector_mqtt; +resource_type(http) -> emqx_connector_http; +resource_type(<<"http">>) -> emqx_connector_http. bridge_type(emqx_connector_mqtt) -> mqtt; bridge_type(emqx_connector_http) -> http. @@ -104,7 +96,8 @@ post_config_update(_Req, NewConf, OldConf, _AppEnv) -> {fun remove_bridge/3, Removed}, {fun create_bridge/3, Added}, {fun update_bridge/3, Updated} - ]). + ]), + reload_hook(). perform_bridge_changes(Tasks) -> perform_bridge_changes(Tasks, ok). @@ -145,20 +138,6 @@ parse_bridge_id(BridgeId) -> _ -> error({invalid_bridge_id, BridgeId}) end. -channel_id(BridgeType, BridgeName, ChannelType, ChannelName) -> - BType = bin(BridgeType), - BName = bin(BridgeName), - CType = bin(ChannelType), - CName = bin(ChannelName), - <>. - -parse_channel_id(ChannelId) -> - case string:split(bin(ChannelId), ":", all) of - [BridgeType, BridgeName, ChannelType, ChannelName] -> - {BridgeType, BridgeName, ChannelType, ChannelName}; - _ -> error({invalid_bridge_id, ChannelId}) - end. - list_bridges() -> lists:foldl(fun({Type, NameAndConf}, Bridges) -> lists:foldl(fun({Name, RawConf}, Acc) -> @@ -167,7 +146,7 @@ list_bridges() -> {ok, Res} -> [Res | Acc] end end, Bridges, maps:to_list(NameAndConf)) - end, [], maps:to_list(emqx:get_raw_config([bridges]))). + end, [], maps:to_list(emqx:get_raw_config([bridges], #{}))). get_bridge(Type, Name) -> RawConf = emqx:get_raw_config([bridges, Type, Name], #{}), @@ -205,11 +184,11 @@ create_bridge(Type, Name, Conf) -> update_bridge(Type, Name, {_OldConf, Conf}) -> %% TODO: sometimes its not necessary to restart the bridge connection. %% - %% - if the connection related configs like `username` is updated, we should restart/start + %% - if the connection related configs like `servers` is updated, we should restart/start %% or stop bridges according to the change. - %% - if the connection related configs are not update, but channel configs `ingress_channels` or - %% `egress_channels` are changed, then we should not restart the bridge, we only restart/start - %% the channels. + %% - if the connection related configs are not update, only non-connection configs like + %% the `method` or `headers` of a HTTP bridge is changed, then the bridge can be updated + %% without restarting the bridge. %% ?SLOG(info, #{msg => "update bridge", type => Type, name => Name, config => Conf}), @@ -238,35 +217,19 @@ flatten_confs(Conf0) -> do_flatten_confs(Type, Conf0) -> [{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)]. -has_subscribe_local_topic(Channels) -> - lists:any(fun (#{subscribe_local_topic := _}) -> true; - (_) -> false - end, maps:to_list(Channels)). - -get_matched_channels(Topic) -> - Bridges = emqx_conf:get([bridges], #{}), - maps:fold(fun - %% TODO: also trigger 'message.publish' for mqtt bridges. - (mqtt, _Conf, Acc0) -> Acc0; - (BType, Conf, Acc0) -> - maps:fold(fun - (BName, #{egress_channels := Channels}, Acc1) -> - do_get_matched_channels(Topic, Channels, BType, BName, egress_channels) - ++ Acc1; - (_Name, _BridgeConf, Acc1) -> Acc1 - end, Acc0, Conf) +get_matched_bridges(Topic) -> + Bridges = emqx:get_config([bridges], #{}), + maps:fold(fun (BType, Conf, Acc0) -> + maps:fold(fun + (BName, #{from_local_topic := Filter}, Acc1) -> + case emqx_topic:match(Topic, Filter) of + true -> [bridge_id(BType, BName) | Acc1]; + false -> Acc1 + end; + (_Name, _BridgeConf, Acc1) -> Acc1 + end, Acc0, Conf) end, [], Bridges). -do_get_matched_channels(Topic, Channels, BType, BName, CType) -> - maps:fold(fun - (ChannName, #{subscribe_local_topic := Filter}, Acc) -> - case emqx_topic:match(Topic, Filter) of - true -> [channel_id(BType, BName, CType, ChannName) | Acc]; - false -> Acc - end; - (_ChannName, _ChannConf, Acc) -> Acc - end, [], Channels). - bin(Bin) when is_binary(Bin) -> Bin; bin(Str) when is_list(Str) -> list_to_binary(Str); bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). diff --git a/apps/emqx_bridge/src/emqx_bridge_monitor.erl b/apps/emqx_bridge/src/emqx_bridge_monitor.erl index 3136a74c9..78af4ba41 100644 --- a/apps/emqx_bridge/src/emqx_bridge_monitor.erl +++ b/apps/emqx_bridge/src/emqx_bridge_monitor.erl @@ -67,14 +67,18 @@ code_change(_OldVsn, State, _Extra) -> load_bridges(Configs) -> lists:foreach(fun({Type, NamedConf}) -> lists:foreach(fun({Name, Conf}) -> - load_bridge(Name, Type, Conf) + load_bridge(Type, Name, Conf) end, maps:to_list(NamedConf)) end, maps:to_list(Configs)). %% TODO: move this monitor into emqx_resource %% emqx_resource:check_and_create_local(ResourceId, ResourceType, Config, #{keep_retry => true}). -load_bridge(Name, Type, Config) -> - case emqx_resource:create_local( +load_bridge(<<"http">>, Name, Config) -> + Config1 = parse_http_confs(Config), + do_load_bridge(<<"http">>, Name, Config1). + +do_load_bridge(Type, Name, Config) -> + case emqx_resource:check_and_create_local( emqx_bridge:resource_id(Type, Name), emqx_bridge:resource_type(Type), Config) of {ok, already_created} -> ok; @@ -82,3 +86,28 @@ load_bridge(Name, Type, Config) -> {error, Reason} -> error({load_bridge, Reason}) end. + +parse_http_confs(#{ <<"url">> := Url + , <<"method">> := Method + , <<"body">> := Body + , <<"headers">> := Headers + } = Conf) -> + {BaseUrl, Path} = parse_url(Url), + Conf#{ <<"base_url">> => BaseUrl + , <<"preprocessed_request">> => + emqx_connector_http:preprocess_request(Method, Path, Body, Headers) + }. + +parse_url(Url) -> + case string:split(Url, "//", leading) of + [Scheme, UrlRem] -> + case string:split(UrlRem, "/", leading) of + [HostPort, Path] -> + {iolist_to_binary([Scheme, "//", HostPort]), Path}; + [HostPort] -> + {iolist_to_binary([Scheme, "//", HostPort]), <<>>} + end; + [Url] -> + error({invalid_url, Url}) + end. + diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/emqx_bridge_schema.erl index 2072d15ec..86a34699d 100644 --- a/apps/emqx_bridge/src/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_schema.erl @@ -18,8 +18,24 @@ fields("mqtt_bridge") -> emqx_connector_mqtt:fields("config"); fields("http_bridge") -> - emqx_connector_http:fields(config) ++ http_channels(). + basic_config_http() ++ + [ {url, hoconsc:mk(binary())} + , {from_local_topic, hoconsc:mk(binary())} + , {method, hoconsc:mk(method(), #{default => post})} + , {headers, hoconsc:mk(map(), + #{default => #{ + <<"accept">> => <<"application/json">>, + <<"cache-control">> => <<"no-cache">>, + <<"connection">> => <<"keep-alive">>, + <<"content-type">> => <<"application/json">>, + <<"keep-alive">> => <<"timeout=5">>}}) + } + , {body, hoconsc:mk(binary(), #{default => <<"${payload}">>})} + , {request_timeout, hoconsc:mk(emqx_schema:duration_ms(), #{default => <<"30s">>})} + ]. -http_channels() -> - [{egress_channels, hoconsc:mk(hoconsc:map(id, - hoconsc:ref(emqx_connector_http, "http_request")))}]. +basic_config_http() -> + proplists:delete(base_url, emqx_connector_http:fields(config)). + +method() -> + hoconsc:enum([post, put, get, delete]). diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl new file mode 100644 index 000000000..23506669c --- /dev/null +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -0,0 +1,144 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_api_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-define(CONF_DEFAULT, <<"bridges: {}">>). + +all() -> + emqx_ct:all(?MODULE). + +groups() -> + []. + +init_per_suite(Config) -> + application:load(emqx_machine), + ok = ekka:start(), + ok = emqx_ct_helpers:start_apps([emqx_bridge]), + ok = emqx_config:init_load(emqx_bridge_schema, ?CONF_DEFAULT), + Config. + +end_per_suite(_Config) -> + ok = ekka:stop(), + emqx_ct_helpers:stop_apps([emqx_bridge]), + ok. + +init_per_testcase(_, Config) -> + {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), + Config. +end_per_testcase(_, _Config) -> + ok. + +-define(PATH1, <<"path1">>). +-define(PATH2, <<"path2">>). +-define(HTTP_BRIDGE(PATH), +#{ + <<"base_url">> => <<"http://localhost:9901">>, + <<"egress_channels">> => #{ + <<"a">> => #{ + <<"subscribe_local_topic">> => <<"emqx_http/#">>, + <<"method">> => <<"post">>, + <<"path">> => PATH, + <<"body">> => <<"${payload}">>, + <<"headers">> => #{ + <<"content-type">> => <<"application/json">> + } + } + } +}). + +%%------------------------------------------------------------------------------ +%% HTTP server for testing +%%------------------------------------------------------------------------------ +start_http_server(Port, HandleFun) -> + spawn_link(fun() -> + {ok, Sock} = gen_tcp:listen(Port, [{active, false}]), + loop(Sock, HandleFun) + end). + +loop(Sock, HandleFun) -> + {ok, Conn} = gen_tcp:accept(Sock), + Handler = spawn(fun () -> HandleFun(Conn) end), + gen_tcp:controlling_process(Conn, Handler), + loop(Sock, HandleFun). + +make_response(CodeStr, Str) -> + B = iolist_to_binary(Str), + iolist_to_binary( + io_lib:fwrite( + "HTTP/1.0 ~s\nContent-Type: text/html\nContent-Length: ~p\n\n~s", + [CodeStr, size(B), B])). + +handle_fun_200_ok(Conn) -> + case gen_tcp:recv(Conn, 0) of + {ok, Request} -> + gen_tcp:send(Conn, make_response("200 OK", "Request OK")), + self() ! {http_server, received, Request}, + handle_fun_200_ok(Conn); + {error, closed} -> + gen_tcp:close(Conn) + end. + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_crud_apis(_) -> + %% assert we there's no bridges at first + {200, []} = emqx_bridge_api:list_bridges(get, #{}), + + %% then we add a http bridge now + {200, [Bridge]} = emqx_bridge_api:crud_bridges_cluster(put, + #{ bindings => #{id => <<"http:test_bridge">>} + , body => ?HTTP_BRIDGE(?PATH1) + }), + %ct:pal("---bridge: ~p", [Bridge]), + ?assertMatch(#{ id := <<"http:test_bridge">> + , bridge_type := http + , is_connected := _ + , node := _ + , <<"egress_channels">> := #{ + <<"a">> := #{<<"path">> := ?PATH1} + } + }, Bridge), + + %% update the request-path of the bridge + {200, [Bridge2]} = emqx_bridge_api:crud_bridges_cluster(put, + #{ bindings => #{id => <<"http:test_bridge">>} + , body => ?HTTP_BRIDGE(?PATH2) + }), + ?assertMatch(#{ id := <<"http:test_bridge">> + , bridge_type := http + , is_connected := _ + , <<"egress_channels">> := #{ + <<"a">> := #{<<"path">> := ?PATH2} + } + }, Bridge2), + + %% list all bridges again, assert Bridge2 is in it + {200, [Bridge2]} = emqx_bridge_api:list_bridges(get, #{}), + + %% delete teh bridge + {200} = emqx_bridge_api:crud_bridges_cluster(delete, + #{ bindings => #{id => <<"http:test_bridge">>} + }), + {200, []} = emqx_bridge_api:list_bridges(get, #{}), + ok. + diff --git a/apps/emqx_connector/src/emqx_connector.app.src b/apps/emqx_connector/src/emqx_connector.app.src index 3e59d3528..fe8bb6c97 100644 --- a/apps/emqx_connector/src/emqx_connector.app.src +++ b/apps/emqx_connector/src/emqx_connector.app.src @@ -14,6 +14,7 @@ epgsql, mysql, mongodb, + ehttpc, emqx, emqtt ]}, diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 61fcb1c67..0c5f377ba 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -38,7 +38,9 @@ , fields/1 , validations/0]). --export([ check_ssl_opts/2 ]). +-export([ check_ssl_opts/2 + , preprocess_request/4 + ]). -type connect_timeout() :: emqx_schema:duration() | infinity. -type pool_type() :: random | hash. @@ -50,23 +52,7 @@ %%===================================================================== %% Hocon schema roots() -> - [{config, #{type => hoconsc:ref(?MODULE, config)}}]. - -fields("http_request") -> - [ {subscribe_local_topic, hoconsc:mk(binary())} - , {method, hoconsc:mk(method(), #{default => post})} - , {path, hoconsc:mk(binary(), #{default => <<"">>})} - , {headers, hoconsc:mk(map(), - #{default => #{ - <<"accept">> => <<"application/json">>, - <<"cache-control">> => <<"no-cache">>, - <<"connection">> => <<"keep-alive">>, - <<"content-type">> => <<"application/json">>, - <<"keep-alive">> => <<"timeout=5">>}}) - } - , {body, hoconsc:mk(binary(), #{default => <<"${payload}">>})} - , {request_timeout, hoconsc:mk(emqx_schema:duration_ms(), #{default => <<"30s">>})} - ]; + fields(config). fields(config) -> [ {base_url, fun base_url/1} @@ -76,11 +62,9 @@ fields(config) -> , {pool_type, fun pool_type/1} , {pool_size, fun pool_size/1} , {enable_pipelining, fun enable_pipelining/1} + , {preprocessed_request, hoconsc:mk(map())} ] ++ emqx_connector_schema_lib:ssl_fields(). -method() -> - hoconsc:enum([post, put, get, delete]). - validations() -> [ {check_ssl_opts, fun check_ssl_opts/1} ]. @@ -152,8 +136,7 @@ on_start(InstId, #{base_url := #{scheme := Scheme, pool_name => PoolName, host => Host, port => Port, - base_path => BasePath, - channels => preproc_channels(InstId, Config) + base_path => BasePath }, case ehttpc_sup:start_pool(PoolName, PoolOpts) of {ok, _} -> {ok, State}; @@ -167,12 +150,12 @@ on_stop(InstId, #{pool_name := PoolName}) -> connector => InstId}), ehttpc_sup:stop_pool(PoolName). -on_query(InstId, {send_message, ChannelId, Msg}, AfterQuery, #{channels := Channels} = State) -> - case maps:find(ChannelId, Channels) of - error -> ?SLOG(error, #{msg => "channel not found", channel_id => ChannelId}); - {ok, ChannConf} -> +on_query(InstId, {send_message, BridgeId, Msg}, AfterQuery, State) -> + case maps:find(preprocessed_request, State) of + error -> ?SLOG(error, #{msg => "preprocessed_request found", bridge_id => BridgeId}); + {ok, Request} -> #{method := Method, path := Path, body := Body, headers := Headers, - request_timeout := Timeout} = proc_channel_conf(ChannConf, Msg), + request_timeout := Timeout} = process_request(Request, Msg), on_query(InstId, {Method, {Path, Headers, Body}, Timeout}, AfterQuery, State) end; on_query(InstId, {Method, Request}, AfterQuery, State) -> @@ -211,26 +194,12 @@ on_health_check(_InstId, #{host := Host, port := Port} = State) -> %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- - -preproc_channels(<<"bridge:", BridgeId/binary>>, Config) -> - {BridgeType, BridgeName} = emqx_bridge:parse_bridge_id(BridgeId), - maps:fold(fun(ChannName, ChannConf, Acc) -> - Acc#{emqx_bridge:channel_id(BridgeType, BridgeName, egress_channels, ChannName) => - preproc_channel_conf(ChannConf)} - end, #{}, maps:get(egress_channels, Config, #{})); -preproc_channels(_InstId, _Config) -> - #{}. - -preproc_channel_conf(#{ - method := Method, - path := Path, - body := Body, - headers := Headers} = Conf) -> - Conf#{ method => emqx_plugin_libs_rule:preproc_tmpl(bin(Method)) - , path => emqx_plugin_libs_rule:preproc_tmpl(Path) - , body => emqx_plugin_libs_rule:preproc_tmpl(Body) - , headers => preproc_headers(Headers) - }. +preprocess_request(Method, Path, Body, Headers) -> + #{ method => emqx_plugin_libs_rule:preproc_tmpl(bin(Method)) + , path => emqx_plugin_libs_rule:preproc_tmpl(Path) + , body => emqx_plugin_libs_rule:preproc_tmpl(Body) + , headers => preproc_headers(Headers) + }. preproc_headers(Headers) -> maps:fold(fun(K, V, Acc) -> @@ -238,7 +207,7 @@ preproc_headers(Headers) -> emqx_plugin_libs_rule:preproc_tmpl(bin(V))} end, #{}, Headers). -proc_channel_conf(#{ +process_request(#{ method := MethodTks, path := PathTks, body := BodyTks, @@ -264,7 +233,7 @@ check_ssl_opts(Conf) -> check_ssl_opts("base_url", Conf). check_ssl_opts(URLFrom, Conf) -> - #{schema := Scheme} = hocon_schema:get_value(URLFrom, Conf), + #{scheme := Scheme} = hocon_schema:get_value(URLFrom, Conf), SSL= hocon_schema:get_value("ssl", Conf), case {Scheme, maps:get(enable, SSL, false)} of {http, false} -> true;