Merge pull request #9687 from emqx/function_clause_when_send_msg_to_bridges

fix: function_clause when sending messages to bridges
This commit is contained in:
Xinyu Liu 2023-01-09 09:23:17 +08:00 committed by GitHub
commit 336b310d56
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 60 additions and 4 deletions

View File

@ -363,10 +363,13 @@ get_matched_egress_bridges(Topic) ->
get_matched_bridge_id(_BType, #{enable := false}, _Topic, _BName, Acc) ->
Acc;
get_matched_bridge_id(BType, #{local_topic := Filter}, Topic, BName, Acc) when
?EGRESS_DIR_BRIDGES(BType)
->
do_get_matched_bridge_id(Topic, Filter, BType, BName, Acc);
get_matched_bridge_id(BType, Conf, Topic, BName, Acc) when ?EGRESS_DIR_BRIDGES(BType) ->
case maps:get(local_topic, Conf, undefined) of
undefined ->
Acc;
Filter ->
do_get_matched_bridge_id(Topic, Filter, BType, BName, Acc)
end;
get_matched_bridge_id(mqtt, #{egress := #{local := #{topic := Filter}}}, Topic, BName, Acc) ->
do_get_matched_bridge_id(Topic, Filter, mqtt, BName, Acc);
get_matched_bridge_id(kafka, #{producer := #{mqtt := #{topic := Filter}}}, Topic, BName, Acc) ->

View File

@ -305,6 +305,55 @@ t_http_crud_apis(Config) ->
),
ok.
t_http_bridges_local_topic(Config) ->
Port = ?config(port, Config),
%% assert we there's no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% then we add a webhook bridge, using POST
%% POST /bridges/ will create a bridge
URL1 = ?URL(Port, "path1"),
Name1 = <<"t_http_bridges_with_local_topic1">>,
Name2 = <<"t_http_bridges_without_local_topic1">>,
%% create one http bridge with local_topic
{ok, 201, _} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name1)
),
%% and we create another one without local_topic
{ok, 201, _} = request(
post,
uri(["bridges"]),
maps:remove(<<"local_topic">>, ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name2))
),
BridgeID1 = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name1),
BridgeID2 = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name2),
%% Send an message to emqx and the message should be forwarded to the HTTP server.
%% This is to verify we can have 2 bridges with and without local_topic fields
%% at the same time.
Body = <<"my msg">>,
emqx:publish(emqx_message:make(<<"emqx_webhook/1">>, Body)),
?assert(
receive
{http_server, received, #{
method := <<"POST">>,
path := <<"/path1">>,
body := Body
}} ->
true;
Msg ->
ct:pal("error: http got unexpected request: ~p", [Msg]),
false
after 100 ->
false
end
),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID1]), []),
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID2]), []),
ok.
t_check_dependent_actions_on_delete(Config) ->
Port = ?config(port, Config),
%% assert we there's no bridges at first

View File

@ -0,0 +1,2 @@
Fix the problem that sending messages to data-bridges failed because of incorrect handling of some data-bridges without `local_topic` field configured.
Before this change, if some bridges have configured the `local_topic` field but others have not, a `function_clause` error will occur when forwarding messages to the data-bridges.

View File

@ -0,0 +1,2 @@
修复由于某些数据桥接未配置 `local_topic` 字段,导致的所有数据桥接无法发送消息。
在此改动之前,如果有些桥接设置了 `local_topic` 字段而有些没有设置,数据桥接转发消息时会出现 `function_clause` 的错误。