diff --git a/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl b/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl index ee7e8524c..530eb77b2 100644 --- a/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl +++ b/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl @@ -103,13 +103,13 @@ end_per_testcase(_TestCase, _Config) -> %% Helper fns %%------------------------------------------------------------------------------------- -check_send_message_with_action(Topic, ActionName, ConnectorName) -> +check_send_message_with_action(Topic, ActionName, ConnectorName, Expect) -> send_message(Topic), %% ###################################### %% Check if message is sent to es %% ###################################### timer:sleep(500), - check_action_metrics(ActionName, ConnectorName). + check_action_metrics(ActionName, ConnectorName, Expect). send_message(Topic) -> Now = emqx_utils_calendar:now_to_rfc3339(microsecond), @@ -123,7 +123,7 @@ send_message(Topic) -> ok = emqtt:publish(Client, Topic, Payload, [{qos, 0}]), ok. -check_action_metrics(ActionName, ConnectorName) -> +check_action_metrics(ActionName, ConnectorName, Expect) -> ActionId = emqx_bridge_v2:id(?TYPE, ActionName, ConnectorName), Metrics = #{ @@ -134,13 +134,7 @@ check_action_metrics(ActionName, ConnectorName) -> dropped => emqx_resource_metrics:dropped_get(ActionId) }, ?assertEqual( - #{ - match => 1, - success => 1, - dropped => 0, - failed => 0, - queuing => 0 - }, + Expect, Metrics, {ActionName, ConnectorName, ActionId} ). @@ -248,7 +242,7 @@ t_create_remove_list(Config) -> ok. %% Test sending a message to a bridge V2 -t_send_message(Config) -> +t_create_message(Config) -> ConnectorConfig = connector_config(Config), {ok, _} = emqx_connector:create(?TYPE, test_connector2, ConnectorConfig), ActionConfig = action(<<"test_connector2">>), @@ -261,7 +255,8 @@ t_send_message(Config) -> }, {ok, _} = emqx_rule_engine:create_rule(Rule), %% Use the action to send a message - check_send_message_with_action(<<"es/1">>, test_action_1, test_connector2), + Expect = #{match => 1, success => 1, dropped => 0, failed => 0, queuing => 0}, + check_send_message_with_action(<<"es/1">>, test_action_1, test_connector2, Expect), %% Create a few more bridges with the same connector and test them ActionNames1 = lists:foldl( @@ -278,7 +273,7 @@ t_send_message(Config) -> }, {ok, _} = emqx_rule_engine:create_rule(Rule1), Topic = <<"es/", Seq/binary>>, - check_send_message_with_action(Topic, ActionName, test_connector2), + check_send_message_with_action(Topic, ActionName, test_connector2, Expect), [ActionName | Acc] end, [], @@ -293,6 +288,74 @@ t_send_message(Config) -> ActionNames ), emqx_connector:remove(?TYPE, test_connector2), + lists:foreach( + fun(#{id := Id}) -> + emqx_rule_engine:delete_rule(Id) + end, + emqx_rule_engine:get_rules() + ), + ok. + +t_update_message(Config) -> + ConnectorConfig = connector_config(Config), + {ok, _} = emqx_connector:create(?TYPE, update_connector, ConnectorConfig), + ActionConfig0 = action(<<"update_connector">>), + DocId = emqx_guid:to_hexstr(emqx_guid:gen()), + ActionConfig1 = ActionConfig0#{ + <<"parameters">> => #{ + <<"index">> => <<"${payload.index}">>, + <<"id">> => DocId, + <<"max_retries">> => 0, + <<"action">> => <<"update">>, + <<"doc">> => <<"${payload.doc}">> + } + }, + {ok, _} = emqx_bridge_v2:create(?TYPE, update_action, ActionConfig1), + Rule = #{ + id => <<"rule:t_es_1">>, + sql => <<"SELECT\n *\nFROM\n \"es/#\"">>, + actions => [<<"elasticsearch:update_action">>], + description => <<"sink doc to elasticsearch">> + }, + {ok, _} = emqx_rule_engine:create_rule(Rule), + %% failed to update a nonexistent doc + Expect0 = #{match => 1, success => 0, dropped => 0, failed => 1, queuing => 0}, + check_send_message_with_action(<<"es/1">>, update_action, update_connector, Expect0), + %% doc_as_upsert to insert a new doc + ActionConfig2 = ActionConfig1#{ + <<"parameters">> => #{ + <<"index">> => <<"${payload.index}">>, + <<"id">> => DocId, + <<"action">> => <<"update">>, + <<"doc">> => <<"${payload.doc}">>, + <<"doc_as_upsert">> => true, + <<"max_retries">> => 0 + } + }, + {ok, _} = emqx_bridge_v2:create(?TYPE, update_action, ActionConfig2), + Expect1 = #{match => 1, success => 1, dropped => 0, failed => 0, queuing => 0}, + check_send_message_with_action(<<"es/1">>, update_action, update_connector, Expect1), + %% update without doc, use msg as default + ActionConfig3 = ActionConfig1#{ + <<"parameters">> => #{ + <<"index">> => <<"${payload.index}">>, + <<"id">> => DocId, + <<"action">> => <<"update">>, + <<"max_retries">> => 0 + } + }, + {ok, _} = emqx_bridge_v2:create(?TYPE, update_action, ActionConfig3), + Expect2 = #{match => 1, success => 1, dropped => 0, failed => 0, queuing => 0}, + check_send_message_with_action(<<"es/1">>, update_action, update_connector, Expect2), + %% Clean + ok = emqx_bridge_v2:remove(?TYPE, update_action), + emqx_connector:remove(?TYPE, update_connector), + lists:foreach( + fun(#{id := Id}) -> + emqx_rule_engine:delete_rule(Id) + end, + emqx_rule_engine:get_rules() + ), ok. %% Test that we can get the status of the bridge V2