From fd7eacb953820f346c761e04d29ea344a03c1137 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 4 Mar 2024 15:05:41 -0300 Subject: [PATCH 01/82] test(elastic_search): fix flaky test --- .../docker-compose-elastic-search-tls.yaml | 4 ++- .../test/emqx_bridge_es_SUITE.erl | 36 +++++++++---------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml b/.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml index 50491a88a..c68efb6af 100644 --- a/.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml +++ b/.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml @@ -1,5 +1,7 @@ version: "3.9" +# hint: run the following if the container fails to start locally +# sysctl -w vm.max_map_count=262144 services: setup: image: public.ecr.aws/elastic/elasticsearch:${ELASTIC_TAG} @@ -54,7 +56,7 @@ services: - xpack.security.http.ssl.certificate=certs/es01/es01.crt - xpack.security.http.ssl.certificate_authorities=certs/ca/ca.crt - xpack.license.self_generated.type=${LICENSE} - mem_limit: 1073741824 + mem_limit: 4G ulimits: memlock: soft: -1 diff --git a/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl b/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl index 530eb77b2..76bf5d217 100644 --- a/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl +++ b/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl @@ -36,6 +36,7 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> + emqx_common_test_helpers:clear_screen(), ProxyName = "elasticsearch", ESHost = os:getenv("ELASTICSEARCH_HOST", "elasticsearch"), ESPort = list_to_integer(os:getenv("ELASTICSEARCH_PORT", "9200")), @@ -82,9 +83,6 @@ wait_until_elasticsearch_is_up(Count, Host, Port) -> end_per_suite(Config) -> Apps = ?config(apps, Config), - %ProxyHost = ?config(proxy_host, Config), - %ProxyPort = ?config(proxy_port, Config), - %emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), emqx_cth_suite:stop(Apps), ok. @@ -92,9 +90,6 @@ init_per_testcase(_TestCase, Config) -> Config. end_per_testcase(_TestCase, _Config) -> - %ProxyHost = ?config(proxy_host, Config), - %ProxyPort = ?config(proxy_port, Config), - %emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), emqx_common_test_helpers:call_janitor(60_000), ok. @@ -125,18 +120,20 @@ send_message(Topic) -> check_action_metrics(ActionName, ConnectorName, Expect) -> ActionId = emqx_bridge_v2:id(?TYPE, ActionName, ConnectorName), - Metrics = - #{ - match => emqx_resource_metrics:matched_get(ActionId), - success => emqx_resource_metrics:success_get(ActionId), - failed => emqx_resource_metrics:failed_get(ActionId), - queuing => emqx_resource_metrics:queuing_get(ActionId), - dropped => emqx_resource_metrics:dropped_get(ActionId) - }, - ?assertEqual( - Expect, - Metrics, - {ActionName, ConnectorName, ActionId} + ?retry( + 300, + 20, + ?assertEqual( + Expect, + #{ + match => emqx_resource_metrics:matched_get(ActionId), + success => emqx_resource_metrics:success_get(ActionId), + failed => emqx_resource_metrics:failed_get(ActionId), + queuing => emqx_resource_metrics:queuing_get(ActionId), + dropped => emqx_resource_metrics:dropped_get(ActionId) + }, + {ActionName, ConnectorName, ActionId} + ) ). 
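Both flaky-test fixes in this series follow the same recipe: shorten `metrics_flush_interval` so counters converge quickly, then poll the assertion instead of firing it once. A minimal sketch of the polling semantics, assuming the `?retry(SleepMs, Attempts, Expr)` argument order used above (the macro itself ships with EMQX's common test helpers):

    %% Sketch only: re-evaluate Fun up to Attempts times, sleeping
    %% SleepMs between tries; the final attempt lets the failure escape.
    retry(_SleepMs, 1, Fun) ->
        Fun();
    retry(SleepMs, Attempts, Fun) when Attempts > 1 ->
        try
            Fun()
        catch
            _:_ ->
                timer:sleep(SleepMs),
                retry(SleepMs, Attempts - 1, Fun)
        end.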
action_config(ConnectorName) -> @@ -159,7 +156,8 @@ action(ConnectorName) -> <<"connector">> => ConnectorName, <<"resource_opts">> => #{ <<"health_check_interval">> => <<"30s">>, - <<"query_mode">> => <<"sync">> + <<"query_mode">> => <<"sync">>, + <<"metrics_flush_interval">> => <<"300ms">> } }. From eacd803a375555af68511f6260c83e4c8cca13a5 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 6 Mar 2024 12:04:29 -0300 Subject: [PATCH 02/82] test(pulsar): fix flaky test --- .../test/emqx_bridge_pulsar_v2_SUITE.erl | 30 +++++++++++-------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl index 69b384ab8..0636806de 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_v2_SUITE.erl @@ -212,20 +212,25 @@ t_action(Config) -> ?assertEqual(ReqPayload, emqx_utils_json:decode(RespPayload)), ok = emqtt:disconnect(C1), InstanceId = instance_id(actions, Name), - #{counters := Counters} = emqx_resource:get_metrics(InstanceId), + ?retry( + 100, + 20, + ?assertMatch( + #{ + counters := #{ + dropped := 0, + success := 1, + matched := 1, + failed := 0, + received := 0 + } + }, + emqx_resource:get_metrics(InstanceId) + ) + ), ok = delete_action(Name), ActionsAfterDelete = emqx_bridge_v2:list(actions), ?assertNot(lists:any(Any, ActionsAfterDelete), ActionsAfterDelete), - ?assertMatch( - #{ - dropped := 0, - success := 1, - matched := 1, - failed := 0, - received := 0 - }, - Counters - ), ok. %%------------------------------------------------------------------------------ @@ -292,7 +297,8 @@ pulsar_action(Config) -> <<"pulsar_topic">> => ?config(pulsar_topic, Config) }, <<"resource_opts">> => #{ - <<"health_check_interval">> => <<"1s">> + <<"health_check_interval">> => <<"1s">>, + <<"metrics_flush_interval">> => <<"300ms">> } } } From dc16e59f2c4ec143fe4815cb961d94e6d896e120 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 6 Mar 2024 11:29:50 -0300 Subject: [PATCH 03/82] fix(gcp_pubsub_producer): check for topic existence when creating action Fixes https://emqx.atlassian.net/browse/EMQX-11949 --- .../src/emqx_bridge_gcp_pubsub_client.erl | 6 +- .../emqx_bridge_gcp_pubsub_impl_producer.erl | 43 ++-- .../emqx_bridge_gcp_pubsub_producer_SUITE.erl | 155 ++++++++----- ...qx_bridge_v2_gcp_pubsub_producer_SUITE.erl | 215 ++++++++++++++++++ changes/ee/fix-12656.en.md | 1 + 5 files changed, 349 insertions(+), 71 deletions(-) create mode 100644 apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_v2_gcp_pubsub_producer_SUITE.erl create mode 100644 changes/ee/fix-12656.en.md diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl index f27aab422..67218fcf0 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl @@ -198,13 +198,13 @@ get_status(#{connect_timeout := Timeout, pool_name := PoolName} = State) -> %%------------------------------------------------------------------------------------------------- -spec get_topic(topic(), state(), request_opts()) -> {ok, map()} | {error, term()}. 
-get_topic(Topic, ConnectorState, ReqOpts) -> - #{project_id := ProjectId} = ConnectorState, +get_topic(Topic, ClientState, ReqOpts) -> + #{project_id := ProjectId} = ClientState, Method = get, Path = <<"/v1/projects/", ProjectId/binary, "/topics/", Topic/binary>>, Body = <<>>, PreparedRequest = {prepared_request, {Method, Path, Body}, ReqOpts}, - ?MODULE:query_sync(PreparedRequest, ConnectorState). + ?MODULE:query_sync(PreparedRequest, ClientState). %%------------------------------------------------------------------------------------------------- %% Helper fns diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl index 299b90226..13040dccf 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl @@ -186,10 +186,14 @@ on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, ConnectorState) -> {ok, connector_state()}. on_add_channel(_ConnectorResId, ConnectorState0, ActionId, ActionConfig) -> #{installed_actions := InstalledActions0} = ConnectorState0, - ChannelState = install_channel(ActionConfig), - InstalledActions = InstalledActions0#{ActionId => ChannelState}, - ConnectorState = ConnectorState0#{installed_actions := InstalledActions}, - {ok, ConnectorState}. + case install_channel(ActionConfig, ConnectorState0) of + {ok, ChannelState} -> + InstalledActions = InstalledActions0#{ActionId => ChannelState}, + ConnectorState = ConnectorState0#{installed_actions := InstalledActions}, + {ok, ConnectorState}; + Error = {error, _} -> + Error + end. -spec on_remove_channel( connector_resource_id(), @@ -218,8 +222,7 @@ on_get_channel_status(_ConnectorResId, _ChannelId, _ConnectorState) -> %% Helper fns %%------------------------------------------------------------------------------------------------- -%% TODO: check if topic exists ("unhealthy target") -install_channel(ActionConfig) -> +install_channel(ActionConfig, ConnectorState) -> #{ parameters := #{ attributes_template := AttributesTemplate, @@ -231,13 +234,27 @@ install_channel(ActionConfig) -> request_ttl := RequestTTL } } = ActionConfig, - #{ - attributes_template => preproc_attributes(AttributesTemplate), - ordering_key_template => emqx_placeholder:preproc_tmpl(OrderingKeyTemplate), - payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate), - pubsub_topic => PubSubTopic, - request_ttl => RequestTTL - }. + #{client := Client} = ConnectorState, + case + emqx_bridge_gcp_pubsub_client:get_topic(PubSubTopic, Client, #{request_ttl => RequestTTL}) + of + {error, #{status_code := 404}} -> + {error, {unhealthy_target, <<"Topic does not exist">>}}; + {error, #{status_code := 403}} -> + {error, {unhealthy_target, <<"Permission denied for topic">>}}; + {error, #{status_code := 401}} -> + {error, {unhealthy_target, <<"Bad credentials">>}}; + {error, Reason} -> + {error, Reason}; + {ok, _} -> + {ok, #{ + attributes_template => preproc_attributes(AttributesTemplate), + ordering_key_template => emqx_placeholder:preproc_tmpl(OrderingKeyTemplate), + payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate), + pubsub_topic => PubSubTopic, + request_ttl => RequestTTL + }} + end. 
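The probe above is what turns a missing or inaccessible topic into an actionable status: `install_channel/2` (below) calls `get_topic/3` and maps the HTTP status code to an `{unhealthy_target, _}` error, which the probe API later surfaces as a 400 response mentioning "unhealthy_target". Condensed to a standalone sketch (the `topic_check/1` helper is hypothetical; the real logic lives inline in `install_channel/2`):

    %% Sketch of the status-code mapping: unhealthy_target marks the
    %% action as unusable rather than merely disconnected.
    topic_check({ok, _Topic}) ->
        ok;
    topic_check({error, #{status_code := 404}}) ->
        {error, {unhealthy_target, <<"Topic does not exist">>}};
    topic_check({error, #{status_code := 403}}) ->
        {error, {unhealthy_target, <<"Permission denied for topic">>}};
    topic_check({error, #{status_code := 401}}) ->
        {error, {unhealthy_target, <<"Bad credentials">>}};
    topic_check({error, Reason}) ->
        {error, Reason}.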
-spec do_send_requests_sync( connector_state(), diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl index 4acc5ff3c..6666a3fd0 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl @@ -76,6 +76,7 @@ only_sync_tests() -> [t_query_sync]. init_per_suite(Config) -> + emqx_common_test_helpers:clear_screen(), Apps = emqx_cth_suite:start( [ emqx, @@ -257,20 +258,31 @@ create_rule_and_action_http(Config) -> success_http_handler() -> TestPid = self(), fun(Req0, State) -> - {ok, Body, Req} = cowboy_req:read_body(Req0), - TestPid ! {http, cowboy_req:headers(Req), Body}, - Rep = cowboy_req:reply( - 200, - #{<<"content-type">> => <<"application/json">>}, - emqx_utils_json:encode(#{messageIds => [<<"6058891368195201">>]}), - Req - ), - {ok, Rep, State} + case {cowboy_req:method(Req0), cowboy_req:path(Req0)} of + {<<"GET">>, <<"/v1/projects/myproject/topics/", _/binary>>} -> + Rep = cowboy_req:reply( + 200, + #{<<"content-type">> => <<"application/json">>}, + <<"{}">>, + Req0 + ), + {ok, Rep, State}; + _ -> + {ok, Body, Req} = cowboy_req:read_body(Req0), + TestPid ! {http, cowboy_req:headers(Req), Body}, + Rep = cowboy_req:reply( + 200, + #{<<"content-type">> => <<"application/json">>}, + emqx_utils_json:encode(#{messageIds => [<<"6058891368195201">>]}), + Req + ), + {ok, Rep, State} + end end. start_echo_http_server() -> HTTPHost = "localhost", - HTTPPath = <<"/v1/projects/myproject/topics/mytopic:publish">>, + HTTPPath = '_', ServerSSLOpts = [ {verify, verify_none}, @@ -656,6 +668,20 @@ wait_n_events(TelemetryTable, ResourceId, NEvents, Timeout, EventName) -> error({timeout_waiting_for_telemetry, EventName}) end. +kill_gun_process(EhttpcPid) -> + State = ehttpc:get_state(EhttpcPid, minimal), + GunPid = maps:get(client, State), + true = is_pid(GunPid), + _ = exit(GunPid, kill), + ok. + +kill_gun_processes(ConnectorResourceId) -> + Pool = ehttpc:workers(ConnectorResourceId), + Workers = lists:map(fun({_, Pid}) -> Pid end, Pool), + %% assert there is at least one pool member + ?assertMatch([_ | _], Workers), + lists:foreach(fun(Pid) -> kill_gun_process(Pid) end, Workers). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -1343,15 +1369,26 @@ t_failure_with_body(Config) -> TestPid = self(), FailureWithBodyHandler = fun(Req0, State) -> - {ok, Body, Req} = cowboy_req:read_body(Req0), - TestPid ! {http, cowboy_req:headers(Req), Body}, - Rep = cowboy_req:reply( - 400, - #{<<"content-type">> => <<"application/json">>}, - emqx_utils_json:encode(#{}), - Req - ), - {ok, Rep, State} + case {cowboy_req:method(Req0), cowboy_req:path(Req0)} of + {<<"GET">>, <<"/v1/projects/myproject/topics/", _/binary>>} -> + Rep = cowboy_req:reply( + 200, + #{<<"content-type">> => <<"application/json">>}, + <<"{}">>, + Req0 + ), + {ok, Rep, State}; + _ -> + {ok, Body, Req} = cowboy_req:read_body(Req0), + TestPid ! 
{http, cowboy_req:headers(Req), Body}, + Rep = cowboy_req:reply( + 400, + #{<<"content-type">> => <<"application/json">>}, + emqx_utils_json:encode(#{}), + Req + ), + {ok, Rep, State} + end end, ok = emqx_bridge_http_connector_test_server:set_handler(FailureWithBodyHandler), Topic = <<"t/topic">>, @@ -1381,15 +1418,26 @@ t_failure_no_body(Config) -> TestPid = self(), FailureNoBodyHandler = fun(Req0, State) -> - {ok, Body, Req} = cowboy_req:read_body(Req0), - TestPid ! {http, cowboy_req:headers(Req), Body}, - Rep = cowboy_req:reply( - 400, - #{<<"content-type">> => <<"application/json">>}, - <<>>, - Req - ), - {ok, Rep, State} + case {cowboy_req:method(Req0), cowboy_req:path(Req0)} of + {<<"GET">>, <<"/v1/projects/myproject/topics/", _/binary>>} -> + Rep = cowboy_req:reply( + 200, + #{<<"content-type">> => <<"application/json">>}, + <<"{}">>, + Req0 + ), + {ok, Rep, State}; + _ -> + {ok, Body, Req} = cowboy_req:read_body(Req0), + TestPid ! {http, cowboy_req:headers(Req), Body}, + Rep = cowboy_req:reply( + 400, + #{<<"content-type">> => <<"application/json">>}, + <<>>, + Req + ), + {ok, Rep, State} + end end, ok = emqx_bridge_http_connector_test_server:set_handler(FailureNoBodyHandler), Topic = <<"t/topic">>, @@ -1415,20 +1463,6 @@ t_failure_no_body(Config) -> ), ok. -kill_gun_process(EhttpcPid) -> - State = ehttpc:get_state(EhttpcPid, minimal), - GunPid = maps:get(client, State), - true = is_pid(GunPid), - _ = exit(GunPid, kill), - ok. - -kill_gun_processes(ConnectorResourceId) -> - Pool = ehttpc:workers(ConnectorResourceId), - Workers = lists:map(fun({_, Pid}) -> Pid end, Pool), - %% assert there is at least one pool member - ?assertMatch([_ | _], Workers), - lists:foreach(fun(Pid) -> kill_gun_process(Pid) end, Workers). - t_unrecoverable_error(Config) -> ActionResourceId = ?config(action_resource_id, Config), ConnectorResourceId = ?config(connector_resource_id, Config), @@ -1436,19 +1470,30 @@ t_unrecoverable_error(Config) -> TestPid = self(), FailureNoBodyHandler = fun(Req0, State) -> - {ok, Body, Req} = cowboy_req:read_body(Req0), - TestPid ! {http, cowboy_req:headers(Req), Body}, - %% kill the gun process while it's waiting for the - %% response so we provoke an `{error, _}' response from - %% ehttpc. - ok = kill_gun_processes(ConnectorResourceId), - Rep = cowboy_req:reply( - 200, - #{<<"content-type">> => <<"application/json">>}, - <<>>, - Req - ), - {ok, Rep, State} + case {cowboy_req:method(Req0), cowboy_req:path(Req0)} of + {<<"GET">>, <<"/v1/projects/myproject/topics/", _/binary>>} -> + Rep = cowboy_req:reply( + 200, + #{<<"content-type">> => <<"application/json">>}, + <<"{}">>, + Req0 + ), + {ok, Rep, State}; + _ -> + {ok, Body, Req} = cowboy_req:read_body(Req0), + TestPid ! {http, cowboy_req:headers(Req), Body}, + %% kill the gun process while it's waiting for the + %% response so we provoke an `{error, _}' response from + %% ehttpc. 
+ ok = kill_gun_processes(ConnectorResourceId), + Rep = cowboy_req:reply( + 200, + #{<<"content-type">> => <<"application/json">>}, + <<>>, + Req + ), + {ok, Rep, State} + end end, ok = emqx_bridge_http_connector_test_server:set_handler(FailureNoBodyHandler), Topic = <<"t/topic">>, diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_v2_gcp_pubsub_producer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_v2_gcp_pubsub_producer_SUITE.erl new file mode 100644 index 000000000..f2255c343 --- /dev/null +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_v2_gcp_pubsub_producer_SUITE.erl @@ -0,0 +1,215 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_v2_gcp_pubsub_producer_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-define(CONNECTOR_TYPE_BIN, <<"gcp_pubsub_producer">>). +-define(ACTION_TYPE_BIN, <<"gcp_pubsub_producer">>). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + emqx_common_test_helpers:clear_screen(), + emqx_bridge_gcp_pubsub_consumer_SUITE:init_per_suite(Config). + +end_per_suite(Config) -> + emqx_bridge_gcp_pubsub_consumer_SUITE:end_per_suite(Config). + +init_per_testcase(TestCase, Config) -> + common_init_per_testcase(TestCase, Config). + +common_init_per_testcase(TestCase, Config0) -> + ct:timetrap(timer:seconds(60)), + ServiceAccountJSON = + #{<<"project_id">> := ProjectId} = + emqx_bridge_gcp_pubsub_utils:generate_service_account_json(), + UniqueNum = integer_to_binary(erlang:unique_integer()), + Name = <<(atom_to_binary(TestCase))/binary, UniqueNum/binary>>, + ConnectorConfig = connector_config(Name, ServiceAccountJSON), + PubsubTopic = Name, + ActionConfig = action_config(#{ + connector => Name, + parameters => #{pubsub_topic => PubsubTopic} + }), + Config = [ + {bridge_kind, action}, + {action_type, ?ACTION_TYPE_BIN}, + {action_name, Name}, + {action_config, ActionConfig}, + {connector_name, Name}, + {connector_type, ?CONNECTOR_TYPE_BIN}, + {connector_config, ConnectorConfig}, + {service_account_json, ServiceAccountJSON}, + {project_id, ProjectId}, + {pubsub_topic, PubsubTopic} + | Config0 + ], + ok = emqx_bridge_gcp_pubsub_consumer_SUITE:ensure_topic(Config, PubsubTopic), + Config. + +end_per_testcase(_Testcase, Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), + emqx_common_test_helpers:call_janitor(60_000), + ok = snabbkaffe:stop(), + ok. 
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+connector_config(Name, ServiceAccountJSON) ->
+    InnerConfigMap0 =
+        #{
+            <<"enable">> => true,
+            <<"tags">> => [<<"bridge">>],
+            <<"description">> => <<"my cool bridge">>,
+            <<"connect_timeout">> => <<"5s">>,
+            <<"pool_size">> => 8,
+            <<"pipelining">> => <<"100">>,
+            <<"max_retries">> => <<"2">>,
+            <<"service_account_json">> => ServiceAccountJSON,
+            <<"resource_opts">> =>
+                #{
+                    <<"health_check_interval">> => <<"1s">>,
+                    <<"start_after_created">> => true,
+                    <<"start_timeout">> => <<"5s">>
+                }
+        },
+    emqx_bridge_v2_testlib:parse_and_check_connector(?ACTION_TYPE_BIN, Name, InnerConfigMap0).
+
+action_config(Overrides0) ->
+    Overrides = emqx_utils_maps:binary_key_map(Overrides0),
+    CommonConfig =
+        #{
+            <<"enable">> => true,
+            <<"connector">> => <<"please override">>,
+            <<"parameters">> =>
+                #{
+                    <<"pubsub_topic">> => <<"please override">>
+                },
+            <<"resource_opts">> => #{
+                <<"batch_size">> => 1,
+                <<"batch_time">> => <<"0ms">>,
+                <<"buffer_mode">> => <<"memory_only">>,
+                <<"buffer_seg_bytes">> => <<"10MB">>,
+                <<"health_check_interval">> => <<"15s">>,
+                <<"inflight_window">> => 100,
+                <<"max_buffer_bytes">> => <<"256MB">>,
+                <<"metrics_flush_interval">> => <<"1s">>,
+                <<"query_mode">> => <<"sync">>,
+                <<"request_ttl">> => <<"45s">>,
+                <<"resume_interval">> => <<"15s">>,
+                <<"worker_pool_size">> => <<"1">>
+            }
+        },
+    maps:merge(CommonConfig, Overrides).
+
+assert_persisted_service_account_json_is_binary(ConnectorName) ->
+    %% ensure cluster.hocon has a binary encoded json string as the value
+    {ok, Hocon} = hocon:files([application:get_env(emqx, cluster_hocon_file, undefined)]),
+    ?assertMatch(
+        Bin when is_binary(Bin),
+        emqx_utils_maps:deep_get(
+            [
+                <<"connectors">>,
+                <<"gcp_pubsub_producer">>,
+                ConnectorName,
+                <<"service_account_json">>
+            ],
+            Hocon
+        )
+    ),
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+t_start_stop(Config) ->
+    ok = emqx_bridge_v2_testlib:t_start_stop(Config, gcp_pubsub_stop),
+    ok.
+
+t_create_via_http(Config) ->
+    ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
+    ok.
+
+t_create_via_http_json_object_service_account(Config0) ->
+    %% After the config goes through the roundtrip with `hocon_tconf:check_plain', service
+    %% account json comes back as a binary even if the input is a json object.
+    ConnectorName = ?config(connector_name, Config0),
+    ConnConfig0 = ?config(connector_config, Config0),
+    Config1 = proplists:delete(connector_config, Config0),
+    ConnConfig1 = maps:update_with(
+        <<"service_account_json">>,
+        fun(X) ->
+            ?assert(is_binary(X), #{json => X}),
+            JSON = emqx_utils_json:decode(X, [return_maps]),
+            ?assert(is_map(JSON)),
+            JSON
+        end,
+        ConnConfig0
+    ),
+    Config = [{connector_config, ConnConfig1} | Config1],
+    ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
+    assert_persisted_service_account_json_is_binary(ConnectorName),
+    ok.
+
+%% Check that creating an action (V2) with a non-existent topic returns an error.
+t_bad_topic(Config) ->
+    ?check_trace(
+        begin
+            %% Should it really be 201 here?
+ ?assertMatch( + {ok, {{_, 201, _}, _, #{}}}, + emqx_bridge_v2_testlib:create_bridge_api( + Config, + #{<<"parameters">> => #{<<"pubsub_topic">> => <<"i-dont-exist">>}} + ) + ), + #{ + kind := Kind, + type := Type, + name := Name + } = emqx_bridge_v2_testlib:get_common_values(Config), + ActionConfig0 = emqx_bridge_v2_testlib:get_value(action_config, Config), + ProbeRes = emqx_bridge_v2_testlib:probe_bridge_api( + Kind, + Type, + Name, + emqx_utils_maps:deep_merge( + ActionConfig0, + #{<<"parameters">> => #{<<"pubsub_topic">> => <<"i-dont-exist">>}} + ) + ), + ?assertMatch( + {error, {{_, 400, _}, _, _}}, + ProbeRes + ), + {error, {{_, 400, _}, _, #{<<"message">> := Msg}}} = ProbeRes, + ?assertMatch(match, re:run(Msg, <<"unhealthy_target">>, [{capture, none}]), #{ + msg => Msg + }), + ?assertMatch(match, re:run(Msg, <<"Topic does not exist">>, [{capture, none}]), #{ + msg => Msg + }), + ok + end, + [] + ), + ok. diff --git a/changes/ee/fix-12656.en.md b/changes/ee/fix-12656.en.md new file mode 100644 index 000000000..85e2a7e3b --- /dev/null +++ b/changes/ee/fix-12656.en.md @@ -0,0 +1 @@ +Added a topic check when creating a GCP PubSub Producer action, so it now fails when the topic does not exist or the provided credentials do not have enough permissions to use it. From a2e761681e5442197b6afb977ad5b686dc88ef50 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Wed, 21 Feb 2024 16:49:17 +0200 Subject: [PATCH 04/82] feat: add client mqueue/inflight messages API --- apps/emqx/src/emqx_channel.erl | 4 + apps/emqx/src/emqx_inflight.erl | 47 +++- apps/emqx/src/emqx_mqueue.erl | 52 +++- apps/emqx/src/emqx_session.erl | 2 +- apps/emqx/src/emqx_session_mem.erl | 5 + apps/emqx/test/emqx_inflight_SUITE.erl | 82 +++++- apps/emqx/test/emqx_mqueue_SUITE.erl | 68 +++++ .../src/emqx_dashboard_swagger.erl | 30 ++- apps/emqx_management/src/emqx_mgmt.erl | 7 + apps/emqx_management/src/emqx_mgmt_api.erl | 50 ++++ .../src/emqx_mgmt_api_clients.erl | 221 +++++++++++++++- .../test/emqx_mgmt_api_clients_SUITE.erl | 237 +++++++++++++++++- changes/ce/feat-12561.en.md | 21 ++ rel/i18n/emqx_mgmt_api_clients.hocon | 51 ++++ 14 files changed, 866 insertions(+), 11 deletions(-) create mode 100644 changes/ce/feat-12561.en.md diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 51b66f4f9..94497ef46 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1210,6 +1210,10 @@ handle_call( ChanInfo1 = info(NChannel), emqx_cm:set_chan_info(ClientId, ChanInfo1#{sockinfo => SockInfo}), reply(ok, reset_timer(keepalive, NChannel)); +handle_call({Type, _Meta} = MsgsReq, Channel = #channel{session = Session}) when + Type =:= mqueue_msgs; Type =:= inflight_msgs +-> + {reply, emqx_session:info(MsgsReq, Session), Channel}; handle_call(Req, Channel) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), reply(ignored, Channel). diff --git a/apps/emqx/src/emqx_inflight.erl b/apps/emqx/src/emqx_inflight.erl index c342a846f..1f4433e57 100644 --- a/apps/emqx/src/emqx_inflight.erl +++ b/apps/emqx/src/emqx_inflight.erl @@ -36,7 +36,8 @@ max_size/1, is_full/1, is_empty/1, - window/1 + window/1, + query/2 ]). -export_type([inflight/0]). @@ -138,3 +139,47 @@ size(?INFLIGHT(Tree)) -> -spec max_size(inflight()) -> non_neg_integer(). max_size(?INFLIGHT(MaxSize, _Tree)) -> MaxSize. 
+ +-spec query(inflight(), #{continuation => Cont, limit := L}) -> + {[{key(), term()}], #{continuation := Cont, count := C}} +when + Cont :: none | end_of_data | key(), + L :: non_neg_integer(), + C :: non_neg_integer(). +query(?INFLIGHT(Tree), #{limit := Limit} = Pager) -> + Count = gb_trees:size(Tree), + ContKey = maps:get(continuation, Pager, none), + {List, NextCont} = sublist(iterator_from(ContKey, Tree), Limit), + {List, #{continuation => NextCont, count => Count}}. + +iterator_from(none, Tree) -> + gb_trees:iterator(Tree); +iterator_from(ContKey, Tree) -> + It = gb_trees:iterator_from(ContKey, Tree), + case gb_trees:next(It) of + {ContKey, _Val, ItNext} -> ItNext; + _ -> It + end. + +sublist(_It, 0) -> + {[], none}; +sublist(It, Len) -> + {ListAcc, HasNext} = sublist(It, Len, []), + {lists:reverse(ListAcc), next_cont(ListAcc, HasNext)}. + +sublist(It, 0, Acc) -> + {Acc, gb_trees:next(It) =/= none}; +sublist(It, Len, Acc) -> + case gb_trees:next(It) of + none -> + {Acc, false}; + {Key, Val, ItNext} -> + sublist(ItNext, Len - 1, [{Key, Val} | Acc]) + end. + +next_cont(_Acc, false) -> + end_of_data; +next_cont([{LastKey, _LastVal} | _Acc], _HasNext) -> + LastKey; +next_cont([], _HasNext) -> + end_of_data. diff --git a/apps/emqx/src/emqx_mqueue.erl b/apps/emqx/src/emqx_mqueue.erl index d085a196b..e3e54cdc9 100644 --- a/apps/emqx/src/emqx_mqueue.erl +++ b/apps/emqx/src/emqx_mqueue.erl @@ -68,7 +68,8 @@ stats/1, dropped/1, to_list/1, - filter/2 + filter/2, + query/2 ]). -define(NO_PRIORITY_TABLE, disabled). @@ -171,6 +172,55 @@ filter(Pred, #mqueue{q = Q, len = Len, dropped = Droppend} = MQ) -> MQ#mqueue{q = Q2, len = Len2, dropped = Droppend + Diff} end. +-spec query(mqueue(), #{continuation => ContMsgId, limit := L}) -> + {[message()], #{continuation := ContMsgId, count := C}} +when + ContMsgId :: none | end_of_data | binary(), + C :: non_neg_integer(), + L :: non_neg_integer(). +query(MQ, #{limit := Limit} = Pager) -> + ContMsgId = maps:get(continuation, Pager, none), + {List, NextCont} = sublist(skip_until(MQ, ContMsgId), Limit), + {List, #{continuation => NextCont, count => len(MQ)}}. + +skip_until(MQ, none = _MsgId) -> + MQ; +skip_until(MQ, MsgId) -> + do_skip_until(MQ, MsgId). + +do_skip_until(MQ, MsgId) -> + case out(MQ) of + {empty, MQ} -> + MQ; + {{value, #message{id = MsgId}}, Q1} -> + Q1; + {{value, _Msg}, Q1} -> + do_skip_until(Q1, MsgId) + end. + +sublist(_MQ, 0) -> + {[], none}; +sublist(MQ, Len) -> + {ListAcc, HasNext} = sublist(MQ, Len, []), + {lists:reverse(ListAcc), next_cont(ListAcc, HasNext)}. + +sublist(MQ, 0, Acc) -> + {Acc, element(1, out(MQ)) =/= empty}; +sublist(MQ, Len, Acc) -> + case out(MQ) of + {empty, _MQ} -> + {Acc, false}; + {{value, Msg}, Q1} -> + sublist(Q1, Len - 1, [Msg | Acc]) + end. + +next_cont(_Acc, false) -> + end_of_data; +next_cont([#message{id = Id} | _Acc], _HasNext) -> + Id; +next_cont([], _HasNext) -> + end_of_data. + to_list(MQ, Acc) -> case out(MQ) of {empty, _MQ} -> diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index a84ed4d83..de9af5388 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -527,7 +527,7 @@ info(Session) -> -spec info ([atom()], t()) -> [{atom(), _Value}]; - (atom(), t()) -> _Value. + (atom() | {atom(), _Meta}, t()) -> _Value. 
info(Keys, Session) when is_list(Keys) -> [{Key, info(Key, Session)} || Key <- Keys]; info(impl, Session) -> diff --git a/apps/emqx/src/emqx_session_mem.erl b/apps/emqx/src/emqx_session_mem.erl index e5e60583f..dbb440f41 100644 --- a/apps/emqx/src/emqx_session_mem.erl +++ b/apps/emqx/src/emqx_session_mem.erl @@ -268,6 +268,9 @@ info(inflight_cnt, #session{inflight = Inflight}) -> emqx_inflight:size(Inflight); info(inflight_max, #session{inflight = Inflight}) -> emqx_inflight:max_size(Inflight); +info({inflight_msgs, PagerParams}, #session{inflight = Inflight}) -> + {InflightList, Meta} = emqx_inflight:query(Inflight, PagerParams), + {[I#inflight_data.message || {_, I} <- InflightList], Meta}; info(retry_interval, #session{retry_interval = Interval}) -> Interval; info(mqueue, #session{mqueue = MQueue}) -> @@ -278,6 +281,8 @@ info(mqueue_max, #session{mqueue = MQueue}) -> emqx_mqueue:max_len(MQueue); info(mqueue_dropped, #session{mqueue = MQueue}) -> emqx_mqueue:dropped(MQueue); +info({mqueue_msgs, PagerParams}, #session{mqueue = MQueue}) -> + emqx_mqueue:query(MQueue, PagerParams); info(next_pkt_id, #session{next_pkt_id = PacketId}) -> PacketId; info(awaiting_rel, #session{awaiting_rel = AwaitingRel}) -> diff --git a/apps/emqx/test/emqx_inflight_SUITE.erl b/apps/emqx/test/emqx_inflight_SUITE.erl index c3b7ca6fc..a220129af 100644 --- a/apps/emqx/test/emqx_inflight_SUITE.erl +++ b/apps/emqx/test/emqx_inflight_SUITE.erl @@ -116,5 +116,83 @@ t_window(_) -> ), ?assertEqual([a, b], emqx_inflight:window(Inflight)). -% t_to_list(_) -> -% error('TODO'). +t_to_list(_) -> + Inflight = lists:foldl( + fun(Seq, InflightAcc) -> + emqx_inflight:insert(Seq, integer_to_binary(Seq), InflightAcc) + end, + emqx_inflight:new(100), + [1, 6, 2, 3, 10, 7, 9, 8, 4, 5] + ), + ExpList = [{Seq, integer_to_binary(Seq)} || Seq <- lists:seq(1, 10)], + ?assertEqual(ExpList, emqx_inflight:to_list(Inflight)). 
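With `query/2` in place, a consumer pages through the inflight window by feeding each returned continuation back in until it hits `end_of_data`. A usage sketch (the `drain/1` helper is hypothetical, not part of the patch):

    %% Collect the whole inflight window page by page via the new
    %% cursor-based emqx_inflight:query/2.
    drain(Inflight) ->
        drain(Inflight, none, []).

    drain(Inflight, Cont, Acc) ->
        {Page, #{continuation := Next}} =
            emqx_inflight:query(Inflight, #{continuation => Cont, limit => 10}),
        case Next of
            end_of_data -> lists:append(lists:reverse([Page | Acc]));
            _ -> drain(Inflight, Next, [Page | Acc])
        end.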
+ +t_query(_) -> + EmptyInflight = emqx_inflight:new(500), + ?assertMatch( + {[], #{continuation := end_of_data}}, emqx_inflight:query(EmptyInflight, #{limit => 50}) + ), + ?assertMatch( + {[], #{continuation := end_of_data}}, + emqx_inflight:query(EmptyInflight, #{continuation => <<"empty">>, limit => 50}) + ), + ?assertMatch( + {[], #{continuation := end_of_data}}, + emqx_inflight:query(EmptyInflight, #{continuation => none, limit => 50}) + ), + + Inflight = lists:foldl( + fun(Seq, QAcc) -> + emqx_inflight:insert(Seq, integer_to_binary(Seq), QAcc) + end, + EmptyInflight, + lists:reverse(lists:seq(1, 114)) + ), + + LastCont = lists:foldl( + fun(PageSeq, Cont) -> + Limit = 10, + PagerParams = #{continuation => Cont, limit => Limit}, + {Page, #{continuation := NextCont} = Meta} = emqx_inflight:query(Inflight, PagerParams), + ?assertEqual(10, length(Page)), + ExpFirst = PageSeq * Limit - Limit + 1, + ExpLast = PageSeq * Limit, + ?assertEqual({ExpFirst, integer_to_binary(ExpFirst)}, lists:nth(1, Page)), + ?assertEqual({ExpLast, integer_to_binary(ExpLast)}, lists:nth(10, Page)), + ?assertMatch( + #{count := 114, continuation := IntCont} when is_integer(IntCont), + Meta + ), + NextCont + end, + none, + lists:seq(1, 11) + ), + {LastPartialPage, LastMeta} = emqx_inflight:query(Inflight, #{ + continuation => LastCont, limit => 10 + }), + ?assertEqual(4, length(LastPartialPage)), + ?assertEqual({111, <<"111">>}, lists:nth(1, LastPartialPage)), + ?assertEqual({114, <<"114">>}, lists:nth(4, LastPartialPage)), + ?assertMatch(#{continuation := end_of_data, count := 114}, LastMeta), + + ?assertMatch( + {[], #{continuation := end_of_data}}, + emqx_inflight:query(Inflight, #{continuation => <<"not-existing-cont-id">>, limit => 10}) + ), + + {LargePage, LargeMeta} = emqx_inflight:query(Inflight, #{limit => 1000}), + ?assertEqual(114, length(LargePage)), + ?assertEqual({1, <<"1">>}, hd(LargePage)), + ?assertEqual({114, <<"114">>}, lists:last(LargePage)), + ?assertMatch(#{continuation := end_of_data}, LargeMeta), + + {FullPage, FullMeta} = emqx_inflight:query(Inflight, #{limit => 114}), + ?assertEqual(114, length(FullPage)), + ?assertEqual({1, <<"1">>}, hd(FullPage)), + ?assertEqual({114, <<"114">>}, lists:last(FullPage)), + ?assertMatch(#{continuation := end_of_data}, FullMeta), + + {EmptyPage, EmptyMeta} = emqx_inflight:query(Inflight, #{limit => 0}), + ?assertEqual([], EmptyPage), + ?assertMatch(#{continuation := none, count := 114}, EmptyMeta). diff --git a/apps/emqx/test/emqx_mqueue_SUITE.erl b/apps/emqx/test/emqx_mqueue_SUITE.erl index 51db4b98a..f3e1629a7 100644 --- a/apps/emqx/test/emqx_mqueue_SUITE.erl +++ b/apps/emqx/test/emqx_mqueue_SUITE.erl @@ -282,6 +282,74 @@ t_dropped(_) -> {Msg, Q2} = ?Q:in(Msg, Q1), ?assertEqual(1, ?Q:dropped(Q2)). 
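One behavior the mqueue test below pins down: a continuation is the id of the last message already returned, and `query/2` replays the queue with `skip_until/2`, so a continuation id that is no longer queued (e.g. the message was delivered in the meantime) simply exhausts the scan. Illustrative check, assuming `Q` is an mqueue populated as in `t_query/1`:

    %% A stale continuation falls through skip_until/2 and yields an
    %% empty page with continuation = end_of_data.
    stale_cont_returns_empty(Q) ->
        ?assertMatch(
            {[], #{continuation := end_of_data}},
            emqx_mqueue:query(Q, #{continuation => <<"stale-msg-id">>, limit => 10})
        ).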
+t_query(_) -> + EmptyQ = ?Q:init(#{max_len => 500, store_qos0 => true}), + ?assertMatch({[], #{continuation := end_of_data}}, ?Q:query(EmptyQ, #{limit => 50})), + ?assertMatch( + {[], #{continuation := end_of_data}}, + ?Q:query(EmptyQ, #{continuation => <<"empty">>, limit => 50}) + ), + ?assertMatch( + {[], #{continuation := end_of_data}}, ?Q:query(EmptyQ, #{continuation => none, limit => 50}) + ), + + Q = lists:foldl( + fun(Seq, QAcc) -> + Msg = emqx_message:make(<<"t">>, integer_to_binary(Seq)), + {_, QAcc1} = ?Q:in(Msg, QAcc), + QAcc1 + end, + EmptyQ, + lists:seq(1, 114) + ), + + LastCont = lists:foldl( + fun(PageSeq, Cont) -> + Limit = 10, + PagerParams = #{continuation => Cont, limit => Limit}, + {Page, #{continuation := NextCont} = Meta} = ?Q:query(Q, PagerParams), + ?assertEqual(10, length(Page)), + ExpFirstPayload = integer_to_binary(PageSeq * Limit - Limit + 1), + ExpLastPayload = integer_to_binary(PageSeq * Limit), + ?assertEqual( + ExpFirstPayload, + emqx_message:payload(lists:nth(1, Page)), + #{page_seq => PageSeq, page => Page, meta => Meta} + ), + ?assertEqual(ExpLastPayload, emqx_message:payload(lists:nth(10, Page))), + ?assertMatch(#{count := 114, continuation := <<_/binary>>}, Meta), + NextCont + end, + none, + lists:seq(1, 11) + ), + {LastPartialPage, LastMeta} = ?Q:query(Q, #{continuation => LastCont, limit => 10}), + ?assertEqual(4, length(LastPartialPage)), + ?assertEqual(<<"111">>, emqx_message:payload(lists:nth(1, LastPartialPage))), + ?assertEqual(<<"114">>, emqx_message:payload(lists:nth(4, LastPartialPage))), + ?assertMatch(#{continuation := end_of_data, count := 114}, LastMeta), + + ?assertMatch( + {[], #{continuation := end_of_data}}, + ?Q:query(Q, #{continuation => <<"not-existing-cont-id">>, limit => 10}) + ), + + {LargePage, LargeMeta} = ?Q:query(Q, #{limit => 1000}), + ?assertEqual(114, length(LargePage)), + ?assertEqual(<<"1">>, emqx_message:payload(hd(LargePage))), + ?assertEqual(<<"114">>, emqx_message:payload(lists:last(LargePage))), + ?assertMatch(#{continuation := end_of_data}, LargeMeta), + + {FullPage, FullMeta} = ?Q:query(Q, #{limit => 114}), + ?assertEqual(114, length(FullPage)), + ?assertEqual(<<"1">>, emqx_message:payload(hd(FullPage))), + ?assertEqual(<<"114">>, emqx_message:payload(lists:last(FullPage))), + ?assertMatch(#{continuation := end_of_data}, FullMeta), + + {EmptyPage, EmptyMeta} = ?Q:query(Q, #{limit => 0}), + ?assertEqual([], EmptyPage), + ?assertMatch(#{continuation := none, count := 114}, EmptyMeta). + conservation_prop() -> ?FORALL( {Priorities, Messages}, diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index 27b1ef2fc..719f62690 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -178,8 +178,36 @@ fields(hasnext) -> >>, Meta = #{desc => Desc, required => true}, [{hasnext, hoconsc:mk(boolean(), Meta)}]; +fields('after') -> + Desc = << + "The value of \"last\" field returned in the previous response. It can then be used" + " in subsequent requests to get the next chunk of results.
" + "It is used instead of \"page\" parameter to traverse volatile data.
" + "Can be omitted or set to \"none\" to get the first chunk of data.
" + "\last\" = end_of_data\" is returned, if there is no more data.
" + "Sending \"after=end_of_table\" back to the server will result in \"400 Bad Request\"" + " error response." + >>, + Meta = #{ + in => query, desc => Desc, required => false, example => <<"AAYS53qRa0n07AAABFIACg">> + }, + [{'after', hoconsc:mk(hoconsc:union([none, end_of_data, binary()]), Meta)}]; +fields(last) -> + Desc = << + "An opaque token that can then be in subsequent requests to get " + " the next chunk of results: \"?after={last}\"
" + "if there is no more data, \"last\" = end_of_data\" is returned.
" + "Sending \"after=end_of_table\" back to the server will result in \"400 Bad Request\"" + " error response." + >>, + Meta = #{ + desc => Desc, required => true, example => <<"AAYS53qRa0n07AAABFIACg">> + }, + [{last, hoconsc:mk(hoconsc:union([none, end_of_data, binary()]), Meta)}]; fields(meta) -> - fields(page) ++ fields(limit) ++ fields(count) ++ fields(hasnext). + fields(page) ++ fields(limit) ++ fields(count) ++ fields(hasnext); +fields(continuation_meta) -> + fields(last) ++ fields(count). -spec schema_with_example(hocon_schema:type(), term()) -> hocon_schema:field_schema(). schema_with_example(Type, Example) -> diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index a1ab0bc3f..e470805d8 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -52,6 +52,7 @@ kickout_clients/1, list_authz_cache/1, list_client_subscriptions/1, + list_client_msgs/3, client_subscriptions/2, clean_authz_cache/1, clean_authz_cache/2, @@ -417,6 +418,12 @@ list_client_subscriptions_mem(ClientId) -> end end. +list_client_msgs(MsgsType, ClientId, PagerParams) when + MsgsType =:= inflight_msgs; + MsgsType =:= mqueue_msgs +-> + call_client(ClientId, {MsgsType, PagerParams}). + client_subscriptions(Node, ClientId) -> {Node, unwrap_rpc(emqx_broker_proto_v1:list_client_subscriptions(Node, ClientId))}. diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index be8f24bc3..bd3e5723c 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -22,6 +22,8 @@ -define(LONG_QUERY_TIMEOUT, 50000). +-define(CONT_BASE64_OPTS, #{mode => urlsafe, padding => false}). + -export([ paginate/3 ]). @@ -37,6 +39,8 @@ -export([ parse_pager_params/1, + parse_cont_pager_params/2, + encode_cont_pager_params/2, parse_qstring/2, init_query_result/0, init_query_state/5, @@ -134,6 +138,33 @@ page(Params) -> limit(Params) when is_map(Params) -> maps:get(<<"limit">>, Params, emqx_mgmt:default_row_limit()). +continuation(Params, Encoding) -> + try + decode_continuation(maps:get(<<"after">>, Params, none), Encoding) + catch + _:_ -> + error + end. + +decode_continuation(none, _Encoding) -> + none; +decode_continuation(end_of_data, _Encoding) -> + %% Clients should not send "after=end_of_data" back to the server + error; +decode_continuation(Cont, none) -> + Cont; +decode_continuation(Cont, base64) -> + base64:decode(Cont, ?CONT_BASE64_OPTS). + +encode_continuation(none, _Encoding) -> + none; +encode_continuation(end_of_data, _Encoding) -> + end_of_data; +encode_continuation(Cont, none) -> + emqx_utils_conv:bin(Cont); +encode_continuation(Cont, base64) -> + base64:encode(emqx_utils_conv:bin(Cont), ?CONT_BASE64_OPTS). + %%-------------------------------------------------------------------- %% Node Query %%-------------------------------------------------------------------- @@ -632,6 +663,25 @@ parse_pager_params(Params) -> false end. +-spec parse_cont_pager_params(map(), none | base64) -> + #{limit := pos_integer(), continuation := none | end_of_table | binary()} | false. +parse_cont_pager_params(Params, Encoding) -> + Cont = continuation(Params, Encoding), + Limit = b2i(limit(Params)), + case Limit > 0 andalso Cont =/= error of + true -> + #{continuation => Cont, limit => Limit}; + false -> + false + end. + +-spec encode_cont_pager_params(map(), none | base64) -> map(). 
+encode_cont_pager_params(#{continuation := Cont} = Meta, ContEncoding) -> + Meta1 = maps:remove(continuation, Meta), + Meta1#{last => encode_continuation(Cont, ContEncoding)}; +encode_cont_pager_params(Meta, _ContEncoding) -> + Meta. + %%-------------------------------------------------------------------- %% Types %%-------------------------------------------------------------------- diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index bc07d38bf..3555b5df6 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -22,8 +22,8 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_cm.hrl"). -include_lib("hocon/include/hoconsc.hrl"). - -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_utils/include/emqx_utils_api.hrl"). -include("emqx_mgmt.hrl"). @@ -47,7 +47,9 @@ unsubscribe/2, unsubscribe_batch/2, set_keepalive/2, - sessions_count/2 + sessions_count/2, + inflight_msgs/2, + mqueue_msgs/2 ]). -export([ @@ -101,6 +103,8 @@ paths() -> "/clients/:clientid/unsubscribe", "/clients/:clientid/unsubscribe/bulk", "/clients/:clientid/keepalive", + "/clients/:clientid/mqueue_messages", + "/clients/:clientid/inflight_messages", "/sessions_count" ]. @@ -391,6 +395,14 @@ schema("/clients/:clientid/keepalive") -> } } }; +schema("/clients/:clientid/mqueue_messages") -> + ContExample = <<"AAYS53qRa0n07AAABFIACg">>, + RespSchema = ?R_REF(mqueue_messages), + client_msgs_schema(mqueue_msgs, ?DESC(get_client_mqueue_msgs), ContExample, RespSchema); +schema("/clients/:clientid/inflight_messages") -> + ContExample = <<"10">>, + RespSchema = ?R_REF(inflight_messages), + client_msgs_schema(inflight_msgs, ?DESC(get_client_inflight_msgs), ContExample, RespSchema); schema("/sessions_count") -> #{ 'operationId' => sessions_count, @@ -621,6 +633,26 @@ fields(subscribe) -> fields(unsubscribe) -> [ {topic, hoconsc:mk(binary(), #{desc => <<"Topic">>, example => <<"testtopic/#">>})} + ]; +fields(mqueue_messages) -> + [ + {data, hoconsc:mk(hoconsc:array(?REF(message)), #{desc => ?DESC(mqueue_msgs_list)})}, + {meta, hoconsc:mk(hoconsc:ref(emqx_dashboard_swagger, continuation_meta), #{})} + ]; +fields(inflight_messages) -> + [ + {data, hoconsc:mk(hoconsc:array(?REF(message)), #{desc => ?DESC(inflight_msgs_list)})}, + {meta, hoconsc:mk(hoconsc:ref(emqx_dashboard_swagger, continuation_meta), #{})} + ]; +fields(message) -> + [ + {msgid, hoconsc:mk(binary(), #{desc => ?DESC(msg_id)})}, + {topic, hoconsc:mk(binary(), #{desc => ?DESC(msg_topic)})}, + {qos, hoconsc:mk(emqx_schema:qos(), #{desc => ?DESC(msg_qos)})}, + {publish_at, hoconsc:mk(integer(), #{desc => ?DESC(msg_publish_at)})}, + {from_clientid, hoconsc:mk(binary(), #{desc => ?DESC(msg_from_clientid)})}, + {from_username, hoconsc:mk(binary(), #{desc => ?DESC(msg_from_username)})}, + {payload, hoconsc:mk(binary(), #{desc => ?DESC(msg_payload)})} ]. %%%============================================================================================== @@ -693,6 +725,15 @@ set_keepalive(put, #{bindings := #{clientid := ClientID}, body := Body}) -> end end. +mqueue_msgs(get, #{bindings := #{clientid := ClientID}, query_string := QString}) -> + list_client_msgs(mqueue_msgs, ClientID, QString). + +inflight_msgs(get, #{ + bindings := #{clientid := ClientID}, + query_string := QString +}) -> + list_client_msgs(inflight_msgs, ClientID, QString). 
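Seen from the outside, the two handlers above implement plain cursor pagination. A hedged sketch of walking one of the endpoints with `httpc` (assumes `inets` is started, the default dashboard listener on port 18083, and a prepared `{"authorization", Token}` header; the `fetch_all/2` helper is hypothetical):

    %% Page through a client's mqueue until meta.last = "end_of_data".
    fetch_all(ClientId, AuthHeader) ->
        fetch_all(ClientId, AuthHeader, "none", []).

    fetch_all(ClientId, AuthHeader, After, Acc) ->
        Url =
            "http://127.0.0.1:18083/api/v5/clients/" ++ ClientId ++
                "/mqueue_messages?limit=100&after=" ++ After,
        {ok, {{_, 200, _}, _, Body}} =
            httpc:request(get, {Url, [AuthHeader]}, [], [{body_format, binary}]),
        #{<<"data">> := Msgs, <<"meta">> := #{<<"last">> := Last}} =
            emqx_utils_json:decode(Body),
        case Last of
            <<"end_of_data">> -> lists:append(lists:reverse([Msgs | Acc]));
            _ -> fetch_all(ClientId, AuthHeader, binary_to_list(Last), [Msgs | Acc])
        end.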
+ %%%============================================================================================== %% api apply @@ -825,6 +866,62 @@ unsubscribe_batch(#{clientid := ClientID, topics := Topics}) -> %%-------------------------------------------------------------------- %% internal function +client_msgs_schema(OpId, Desc, ContExample, RespSchema) -> + #{ + 'operationId' => OpId, + get => #{ + description => Desc, + tags => ?TAGS, + parameters => client_msgs_params(), + responses => #{ + 200 => + emqx_dashboard_swagger:schema_with_example(RespSchema, #{ + <<"data">> => [message_example()], + <<"meta">> => #{ + <<"count">> => 100, + <<"last">> => ContExample + } + }), + 400 => + emqx_dashboard_swagger:error_codes( + ['INVALID_PARAMETER'], <<"Invalid parameters">> + ), + 404 => emqx_dashboard_swagger:error_codes( + ['CLIENTID_NOT_FOUND'], <<"Client ID not found">> + ) + } + } + }. + +client_msgs_params() -> + [ + {clientid, hoconsc:mk(binary(), #{in => path})}, + {payload, + hoconsc:mk(hoconsc:enum([none, base64, plain]), #{ + in => query, + default => base64, + desc => << + "Client's inflight/mqueue messages payload encoding." + " If set to `none`, no payload is returned in the response." + >> + })}, + {max_payload_bytes, + hoconsc:mk(emqx_schema:bytesize(), #{ + in => query, + default => <<"1MB">>, + desc => << + "Client's inflight/mqueue messages payload limit." + " The total payload size of all messages in the response will not exceed this value." + " Messages beyond the limit will be silently omitted in the response." + " The only exception to this rule is when the first message payload" + " is already larger than the limit." + " In this case, the first message will be returned in the response." + >> + })}, + hoconsc:ref(emqx_dashboard_swagger, 'after'), + hoconsc:ref(emqx_dashboard_swagger, limit) + ]. + do_subscribe(ClientID, Topic0, Options) -> try emqx_topic:parse(Topic0, Options) of {Topic, Opts} -> @@ -1037,6 +1134,42 @@ remove_live_sessions(Rows) -> Rows ). +list_client_msgs(MsgType, ClientID, QString) -> + case parse_cont_pager_params(QString, MsgType) of + false -> + {400, #{code => <<"INVALID_PARAMETER">>, message => <<"after_limit_invalid">>}}; + PagerParams = #{} -> + case emqx_mgmt:list_client_msgs(MsgType, ClientID, PagerParams) of + {error, not_found} -> + {404, ?CLIENTID_NOT_FOUND}; + {Msgs, Meta = #{}} when is_list(Msgs) -> + format_msgs_resp(MsgType, Msgs, Meta, QString) + end + end. + +parse_cont_pager_params(QString, MsgType) -> + case emqx_mgmt_api:parse_cont_pager_params(QString, cont_encoding(MsgType)) of + false -> + false; + PagerParams -> + maybe_cast_cont(MsgType, PagerParams) + end. + +maybe_cast_cont(inflight_msgs, #{continuation := Cont} = PagerParams) when is_binary(Cont) -> + try + PagerParams#{continuation => emqx_utils_conv:int(Cont)} + catch + _:_ -> + false + end; +maybe_cast_cont(_, PagerParams) -> + PagerParams. + +%% integer packet id +cont_encoding(inflight_msgs) -> none; +%% binary message id +cont_encoding(mqueue_msgs) -> base64. + %%-------------------------------------------------------------------- %% QueryString to Match Spec @@ -1197,6 +1330,79 @@ format_persistent_session_info(ClientId, PSInfo0) -> ), result_format_undefined_to_null(PSInfo). 
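The payload budgeting implemented by `format_msgs/3` below follows a fold-with-throw pattern: accumulate until the byte budget is exceeded, abort early, but always keep the first element so a single oversized payload can still be returned. The pattern in isolation (hypothetical `take_within_budget/3`, with the `catch` parenthesized for portability):

    take_within_budget([], _SizeOf, _Budget) ->
        [];
    take_within_budget([First | Rest], SizeOf, Budget) ->
        {Taken, _Used} =
            (catch lists:foldl(
                fun(X, {Acc, Used} = St) ->
                    case Used + SizeOf(X) of
                        Used1 when Used1 =< Budget -> {[X | Acc], Used1};
                        %% throw the accumulator gathered so far
                        _ -> throw(St)
                    end
                end,
                {[First], SizeOf(First)},
                Rest
            )),
        lists:reverse(Taken).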
+format_msgs_resp(MsgType, Msgs, Meta, QString) -> + #{ + <<"payload">> := PayloadFmt, + <<"max_payload_bytes">> := MaxBytes + } = QString, + Meta1 = emqx_mgmt_api:encode_cont_pager_params(Meta, cont_encoding(MsgType)), + Resp = #{meta => Meta1, data => format_msgs(Msgs, PayloadFmt, MaxBytes)}, + %% Make sure minirest won't set another content-type for self-encoded JSON response body + Headers = #{<<"content-type">> => <<"application/json">>}, + case emqx_utils_json:safe_encode(Resp) of + {ok, RespBin} -> + {200, Headers, RespBin}; + _Error when PayloadFmt =:= plain -> + ?BAD_REQUEST( + <<"INVALID_PARAMETER">>, + <<"Some message payloads are not JSON serializable">> + ); + %% Unexpected internal error + Error -> + ?INTERNAL_ERROR(Error) + end. + +format_msgs([FirstMsg | Msgs], PayloadFmt, MaxBytes) -> + %% Always include at least one message payload, even if it exceeds the limit + {FirstMsg1, PayloadSize0} = format_msg(FirstMsg, PayloadFmt), + {Msgs1, _} = + catch lists:foldl( + fun(Msg, {MsgsAcc, SizeAcc} = Acc) -> + {Msg1, PayloadSize} = format_msg(Msg, PayloadFmt), + case SizeAcc + PayloadSize of + SizeAcc1 when SizeAcc1 =< MaxBytes -> + {[Msg1 | MsgsAcc], SizeAcc1}; + _ -> + throw(Acc) + end + end, + {[FirstMsg1], PayloadSize0}, + Msgs + ), + lists:reverse(Msgs1); +format_msgs([], _PayloadFmt, _MaxBytes) -> + []. + +format_msg( + #message{ + id = ID, + qos = Qos, + topic = Topic, + from = From, + timestamp = Timestamp, + headers = Headers, + payload = Payload + }, + PayloadFmt +) -> + Msg = #{ + msgid => emqx_guid:to_hexstr(ID), + qos => Qos, + topic => Topic, + publish_at => Timestamp, + from_clientid => emqx_utils_conv:bin(From), + from_username => maps:get(username, Headers, <<>>) + }, + format_payload(PayloadFmt, Msg, Payload). + +format_payload(none, Msg, _Payload) -> + {Msg, 0}; +format_payload(base64, Msg, Payload) -> + Payload1 = base64:encode(Payload), + {Msg#{payload => Payload1}, erlang:byte_size(Payload1)}; +format_payload(plain, Msg, Payload) -> + {Msg#{payload => Payload}, erlang:iolist_size(Payload)}. + %% format func helpers take_maps_from_inner(_Key, Value, Current) when is_map(Value) -> maps:merge(Current, Value); @@ -1298,6 +1504,17 @@ client_example() -> <<"recv_msg.qos0">> => 0 }. +message_example() -> + #{ + <<"msgid">> => <<"000611F460D57FA9F44500000D360002">>, + <<"topic">> => <<"t/test">>, + <<"qos">> => 0, + <<"publish_at">> => 1709055346487, + <<"from_clientid">> => <<"mqttx_59ac0a87">>, + <<"from_username">> => <<"test-user">>, + <<"payload">> => <<"eyJmb28iOiAiYmFyIn0=">> + }. + sessions_count(get, #{query_string := QString}) -> Since = maps:get(<<"since">>, QString, 0), Count = emqx_cm_registry_keeper:count(Since), diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index e4ad37e04..a007de829 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -23,16 +23,23 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx/include/asserts.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). all() -> AllTCs = emqx_common_test_helpers:all(?MODULE), [ - {group, persistent_sessions} - | AllTCs -- persistent_session_testcases() + {group, persistent_sessions}, + {group, msgs_base64_encoding}, + {group, msgs_plain_encoding} + | AllTCs -- (persistent_session_testcases() ++ client_msgs_testcases()) ]. 
groups() -> - [{persistent_sessions, persistent_session_testcases()}]. + [ + {persistent_sessions, persistent_session_testcases()}, + {msgs_base64_encoding, client_msgs_testcases()}, + {msgs_plain_encoding, client_msgs_testcases()} + ]. persistent_session_testcases() -> [ @@ -42,12 +49,19 @@ persistent_session_testcases() -> t_persistent_sessions4, t_persistent_sessions5 ]. +client_msgs_testcases() -> + [ + t_inflight_messages, + t_mqueue_messages + ]. init_per_suite(Config) -> + ok = snabbkaffe:start_trace(), emqx_mgmt_api_test_util:init_suite(), Config. end_per_suite(_) -> + ok = snabbkaffe:stop(), emqx_mgmt_api_test_util:end_suite(). init_per_group(persistent_sessions, Config) -> @@ -67,6 +81,10 @@ init_per_group(persistent_sessions, Config) -> #{work_dir => emqx_cth_suite:work_dir(Config)} ), [{nodes, Nodes} | Config]; +init_per_group(msgs_base64_encoding, Config) -> + [{payload_encoding, base64} | Config]; +init_per_group(msgs_plain_encoding, Config) -> + [{payload_encoding, plain} | Config]; init_per_group(_Group, Config) -> Config. @@ -77,6 +95,21 @@ end_per_group(persistent_sessions, Config) -> end_per_group(_Group, _Config) -> ok. +end_per_testcase(TC, _Config) when + TC =:= t_inflight_messages; + TC =:= t_mqueue_messages +-> + ClientId = atom_to_binary(TC), + lists:foreach(fun(P) -> exit(P, kill) end, emqx_cm:lookup_channels(local, ClientId)), + ok = emqx_common_test_helpers:wait_for( + ?FUNCTION_NAME, + ?LINE, + fun() -> [] =:= emqx_cm:lookup_channels(local, ClientId) end, + 5000 + ); +end_per_testcase(_TC, _Config) -> + ok. + t_clients(_) -> process_flag(trap_exit, true), @@ -759,8 +792,206 @@ t_client_id_not_found(_Config) -> ?assertMatch({error, {Http, _, Body}}, PostFun(post, PathFun(["unsubscribe"]), UnsubBody)), ?assertMatch( {error, {Http, _, Body}}, PostFun(post, PathFun(["unsubscribe", "bulk"]), [UnsubBody]) + ), + %% Mqueue messages + ?assertMatch({error, {Http, _, Body}}, ReqFun(get, PathFun(["mqueue_messages"]))), + %% Inflight messages + ?assertMatch({error, {Http, _, Body}}, ReqFun(get, PathFun(["inflight_messages"]))). + +t_mqueue_messages(Config) -> + ClientId = atom_to_binary(?FUNCTION_NAME), + Topic = <<"t/test_mqueue_msgs">>, + Count = emqx_mgmt:default_row_limit(), + {ok, _Client} = client_with_mqueue(ClientId, Topic, Count), + Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "mqueue_messages"]), + ?assert(Count =< emqx:get_config([mqtt, max_mqueue_len])), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + test_messages(Path, Topic, Count, AuthHeader, ?config(payload_encoding, Config)), + + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api( + get, Path, "limit=10&after=not-base64%23%21", AuthHeader + ) + ), + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api( + get, Path, "limit=-5&after=not-base64%23%21", AuthHeader + ) ). 
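The `after` token rejected above (`not-base64%23%21`) fails because mqueue continuations travel as urlsafe, unpadded base64 over the internal message id, per `?CONT_BASE64_OPTS` in `emqx_mgmt_api`. A round-trip sketch (`base64:encode/2` with an options map needs OTP 26+):

    cont_roundtrip(MsgId) when is_binary(MsgId) ->
        Opts = #{mode => urlsafe, padding => false},
        Token = base64:encode(MsgId, Opts),
        MsgId = base64:decode(Token, Opts),
        Token.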
+t_inflight_messages(Config) -> + ClientId = atom_to_binary(?FUNCTION_NAME), + Topic = <<"t/test_inflight_msgs">>, + PubCount = emqx_mgmt:default_row_limit(), + {ok, Client} = client_with_inflight(ClientId, Topic, PubCount), + Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "inflight_messages"]), + InflightLimit = emqx:get_config([mqtt, max_inflight]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + test_messages(Path, Topic, InflightLimit, AuthHeader, ?config(payload_encoding, Config)), + + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api( + get, Path, "limit=10&after=not-int", AuthHeader + ) + ), + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api( + get, Path, "limit=-5&after=invalid-int", AuthHeader + ) + ), + emqtt:stop(Client). + +client_with_mqueue(ClientId, Topic, Count) -> + {ok, Client} = emqtt:start_link([ + {proto_ver, v5}, + {clientid, ClientId}, + {clean_start, false}, + {properties, #{'Session-Expiry-Interval' => 120}} + ]), + {ok, _} = emqtt:connect(Client), + {ok, _, _} = emqtt:subscribe(Client, Topic, 1), + ok = emqtt:disconnect(Client), + publish_msgs(Topic, Count), + {ok, Client}. + +client_with_inflight(ClientId, Topic, Count) -> + {ok, Client} = emqtt:start_link([ + {proto_ver, v5}, + {clientid, ClientId}, + {clean_start, true}, + {auto_ack, never} + ]), + {ok, _} = emqtt:connect(Client), + {ok, _, _} = emqtt:subscribe(Client, Topic, 1), + publish_msgs(Topic, Count), + {ok, Client}. + +publish_msgs(Topic, Count) -> + lists:foreach( + fun(Seq) -> + emqx_broker:publish(emqx_message:make(undefined, ?QOS_1, Topic, integer_to_binary(Seq))) + end, + lists:seq(1, Count) + ). + +test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding) -> + Qs0 = io_lib:format("payload=~s", [PayloadEncoding]), + {ok, MsgsResp} = emqx_mgmt_api_test_util:request_api(get, Path, Qs0, AuthHeader), + #{<<"meta">> := Meta, <<"data">> := Msgs} = emqx_utils_json:decode(MsgsResp), + + ?assertMatch( + #{ + <<"last">> := <<"end_of_data">>, + <<"count">> := Count + }, + Meta + ), + ?assertEqual(length(Msgs), Count), + lists:foreach( + fun({Seq, #{<<"payload">> := P} = M}) -> + ?assertEqual(Seq, binary_to_integer(decode_payload(P, PayloadEncoding))), + ?assertMatch( + #{ + <<"msgid">> := _, + <<"topic">> := Topic, + <<"qos">> := _, + <<"publish_at">> := _, + <<"from_clientid">> := _, + <<"from_username">> := _ + }, + M + ) + end, + lists:zip(lists:seq(1, Count), Msgs) + ), + + %% The first message payload is <<"1">>, + %% and when it is urlsafe base64 encoded (with no padding), it's <<"MQ">>, + %% so we cover both cases: + %% - when total payload size exceeds the limit, + %% - when the first message payload already exceeds the limit but is still returned in the response. 
+ QsPayloadLimit = io_lib:format("payload=~s&max_payload_bytes=1", [PayloadEncoding]), + {ok, LimitedMsgsResp} = emqx_mgmt_api_test_util:request_api( + get, Path, QsPayloadLimit, AuthHeader + ), + #{<<"meta">> := _, <<"data">> := FirstMsgOnly} = emqx_utils_json:decode(LimitedMsgsResp), + ct:pal("~p", [FirstMsgOnly]), + ?assertEqual(1, length(FirstMsgOnly)), + ?assertEqual( + <<"1">>, decode_payload(maps:get(<<"payload">>, hd(FirstMsgOnly)), PayloadEncoding) + ), + + Limit = 19, + LastCont = lists:foldl( + fun(PageSeq, Cont) -> + Qs = io_lib:format("payload=~s&after=~s&limit=~p", [PayloadEncoding, Cont, Limit]), + {ok, MsgsRespP} = emqx_mgmt_api_test_util:request_api(get, Path, Qs, AuthHeader), + #{ + <<"meta">> := #{<<"last">> := NextCont} = MetaP, + <<"data">> := MsgsP + } = emqx_utils_json:decode(MsgsRespP), + ?assertMatch(#{<<"count">> := Count}, MetaP), + ?assertNotEqual(<<"end_of_data">>, NextCont), + ?assertEqual(length(MsgsP), Limit), + ExpFirstPayload = integer_to_binary(PageSeq * Limit - Limit + 1), + ExpLastPayload = integer_to_binary(PageSeq * Limit), + ?assertEqual( + ExpFirstPayload, decode_payload(maps:get(<<"payload">>, hd(MsgsP)), PayloadEncoding) + ), + ?assertEqual( + ExpLastPayload, + decode_payload(maps:get(<<"payload">>, lists:last(MsgsP)), PayloadEncoding) + ), + NextCont + end, + none, + lists:seq(1, Count div 19) + ), + LastPartialPage = Count div 19 + 1, + LastQs = io_lib:format("payload=~s&after=~s&limit=~p", [PayloadEncoding, LastCont, Limit]), + {ok, MsgsRespLastP} = emqx_mgmt_api_test_util:request_api(get, Path, LastQs, AuthHeader), + #{<<"meta">> := #{<<"last">> := EmptyCont} = MetaLastP, <<"data">> := MsgsLastP} = emqx_utils_json:decode( + MsgsRespLastP + ), + ?assertEqual(<<"end_of_data">>, EmptyCont), + ?assertMatch(#{<<"count">> := Count}, MetaLastP), + + ?assertEqual( + integer_to_binary(LastPartialPage * Limit - Limit + 1), + decode_payload(maps:get(<<"payload">>, hd(MsgsLastP)), PayloadEncoding) + ), + ?assertEqual( + integer_to_binary(Count), + decode_payload(maps:get(<<"payload">>, lists:last(MsgsLastP)), PayloadEncoding) + ), + + ExceedQs = io_lib:format("payload=~s&after=~s&limit=~p", [ + PayloadEncoding, EmptyCont, Limit + ]), + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api(get, Path, ExceedQs, AuthHeader) + ), + + %% Invalid common page params + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api(get, Path, "limit=0", AuthHeader) + ), + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api(get, Path, "limit=limit", AuthHeader) + ). + +decode_payload(Payload, base64) -> + base64:decode(Payload); +decode_payload(Payload, _) -> + Payload. + t_subscribe_shared_topic(_Config) -> ClientId = <<"client_subscribe_shared">>, diff --git a/changes/ce/feat-12561.en.md b/changes/ce/feat-12561.en.md new file mode 100644 index 000000000..072a71373 --- /dev/null +++ b/changes/ce/feat-12561.en.md @@ -0,0 +1,21 @@ +Implement HTTP APIs to get the list of client's inflight and mqueue messages. 
+
+To get the first chunk of data:
+ - GET /clients/{clientid}/mqueue_messages?limit=100
+ - GET /clients/{clientid}/inflight_messages?limit=100
+
+Alternatively:
+ - GET /clients/{clientid}/mqueue_messages?limit=100&after=none
+ - GET /clients/{clientid}/inflight_messages?limit=100&after=none
+
+To get the next chunk of data:
+ - GET /clients/{clientid}/mqueue_messages?limit=100&after={last}
+ - GET /clients/{clientid}/inflight_messages?limit=100&after={last}
+
+ Where {last} is the value (an opaque string token) of the "meta.last" field from the previous response.
+
+ If there is no more data, "last" = "end_of_data" is returned.
+ If a subsequent request is attempted with "after=end_of_data", a "400 Bad Request" error response will be received.
+
+Mqueue messages are returned in queue (FIFO) order.
+Inflight messages are ordered by MQTT Packet Id, which may not reflect the chronological order of the messages.
diff --git a/rel/i18n/emqx_mgmt_api_clients.hocon b/rel/i18n/emqx_mgmt_api_clients.hocon
index 2431c09ec..3175715e0 100644
--- a/rel/i18n/emqx_mgmt_api_clients.hocon
+++ b/rel/i18n/emqx_mgmt_api_clients.hocon
@@ -35,6 +35,57 @@ get_client_subs.desc:
 get_client_subs.label:
 """Get client subscriptions"""
 
+get_client_mqueue_msgs.desc:
+"""Get client mqueue messages"""
+get_client_mqueue_msgs.label:
+"""Get client mqueue messages"""
+
+get_client_inflight_msgs.desc:
+"""Get client inflight messages"""
+get_client_inflight_msgs.label:
+"""Get client inflight messages"""
+
+mqueue_msgs_list.desc:
+"""Client's mqueue messages list. The queue (FIFO) ordering is preserved."""
+mqueue_msgs_list.label:
+"""Client's mqueue messages"""
+
+inflight_msgs_list.desc:
+"""Client's inflight messages list.
+Ordered by MQTT Packet Id, which may not reflect the chronological order of the messages."""
+inflight_msgs_list.label:
+"""Client's inflight messages"""
+
+msg_id.desc:
+"""Message ID."""
+msg_id.label:
+"""Message ID"""
+
+msg_topic.desc:
+"""Message topic."""
+msg_topic.label:
+"""Message Topic"""
+
+msg_qos.desc:
+"""Message QoS."""
+msg_qos.label:
+"""Message QoS"""
+
+msg_publish_at.desc:
+"""Message publish time, a millisecond precision Unix epoch timestamp."""
+msg_publish_at.label:
+"""Message Publish Time"""
+
+msg_from_clientid.desc:
+"""Message publisher's client ID."""
+msg_from_clientid.label:
+"""Message Publisher's Client ID"""
+
+msg_from_username.desc:
+"""Message publisher's username."""
+msg_from_username.label:
+"""Message Publisher's Username"""
+
 subscribe.desc:
 """Subscribe"""
 subscribe.label:
 """Subscribe"""

From c62776edafa1829c7c9ddfbc88deb6eae452ce87 Mon Sep 17 00:00:00 2001
From: ieQu1 <99872536+ieQu1@users.noreply.github.com>
Date: Thu, 7 Mar 2024 12:19:53 +0100
Subject: [PATCH 05/82] fix(sessds): Prevent hot update of session_persistence.enable config

---
 apps/emqx/src/emqx_config.erl             |  1 +
 apps/emqx/src/emqx_persistent_message.erl | 18 ++++++++++++++++++
 2 files changed, 19 insertions(+)

diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl
index 5147f2b6d..9da453260 100644
--- a/apps/emqx/src/emqx_config.erl
+++ b/apps/emqx/src/emqx_config.erl
@@ -715,6 +715,7 @@ add_handlers() ->
     ok = emqx_config_logger:add_handler(),
     ok = emqx_config_zones:add_handler(),
     emqx_sys_mon:add_handler(),
+    emqx_persistent_message:add_handler(),
     ok.
remove_handlers() -> diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index 9787dfd9a..61a1568ba 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -16,11 +16,16 @@ -module(emqx_persistent_message). +-behaviour(emqx_config_handler). + -include("emqx.hrl"). -export([init/0]). -export([is_persistence_enabled/0, force_ds/0]). +%% Config handler +-export([add_handler/0, pre_config_update/3]). + %% Message persistence -export([ persist/1 @@ -66,6 +71,19 @@ storage_backend(Path) -> %%-------------------------------------------------------------------- +-spec add_handler() -> ok. +add_handler() -> + emqx_conf:add_handler([session_persistence], ?MODULE). + +pre_config_update([session_persistence], #{<<"enable">> := New}, #{<<"enable">> := Old}) when + New =/= Old +-> + {error, "Hot update of session_persistence.enable parameter is currently not supported"}; +pre_config_update(_Root, _NewConf, _OldConf) -> + ok. + +%%-------------------------------------------------------------------- + -spec persist(emqx_types:message()) -> ok | {skipped, _Reason} | {error, _TODO}. persist(Msg) -> From 2146d9e1fea090577ed36de59afb635ff2fb24d5 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 1 Mar 2024 19:10:25 +0100 Subject: [PATCH 06/82] feat(ds): introduce error classes in critical API functions For now, only recoverable / unrecoverable errors are introduced. --- apps/emqx_durable_storage/src/emqx_ds.erl | 10 ++- .../src/emqx_ds_replication_layer.erl | 62 +++++++++++-------- .../src/emqx_ds_storage_bitfield_lts.erl | 23 ++++--- .../src/emqx_ds_storage_layer.erl | 12 ++-- .../test/emqx_ds_SUITE.erl | 25 +++++--- 5 files changed, 75 insertions(+), 57 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index c7fa3552b..4143c9ffd 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -68,6 +68,8 @@ make_iterator_result/1, make_iterator_result/0, make_delete_iterator_result/1, make_delete_iterator_result/0, + error/1, + ds_specific_stream/0, ds_specific_iterator/0, ds_specific_generation_rank/0, @@ -118,14 +120,14 @@ -type message_key() :: binary(). --type store_batch_result() :: ok | {error, _}. +-type store_batch_result() :: ok | error(_). --type make_iterator_result(Iterator) :: {ok, Iterator} | {error, _}. +-type make_iterator_result(Iterator) :: {ok, Iterator} | error(_). -type make_iterator_result() :: make_iterator_result(iterator()). -type next_result(Iterator) :: - {ok, Iterator, [{message_key(), emqx_types:message()}]} | {ok, end_of_stream} | {error, _}. + {ok, Iterator, [{message_key(), emqx_types:message()}]} | {ok, end_of_stream} | error(_). -type next_result() :: next_result(iterator()). @@ -142,6 +144,8 @@ -type delete_next_result() :: delete_next_result(delete_iterator()). +-type error(Reason) :: {error, recoverable | unrecoverable, Reason}. + %% Timestamp %% Earliest possible timestamp is 0. %% TODO granularity? 
Currently, we should always use milliseconds, as that's the unit we diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index ed3a93212..49d7f95a2 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -171,7 +171,14 @@ drop_db(DB) -> -spec store_batch(emqx_ds:db(), [emqx_types:message(), ...], emqx_ds:message_store_opts()) -> emqx_ds:store_batch_result(). store_batch(DB, Messages, Opts) -> - emqx_ds_replication_layer_egress:store_batch(DB, Messages, Opts). + case emqx_ds_replication_layer_egress:store_batch(DB, Messages, Opts) of + ok -> + ok; + Error = {error, _, _} -> + Error; + RPCError = {badrpc, _} -> + {error, recoverable, RPCError} + end. -spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) -> [{emqx_ds:stream_rank(), stream()}]. @@ -180,7 +187,14 @@ get_streams(DB, TopicFilter, StartTime) -> lists:flatmap( fun(Shard) -> Node = node_of_shard(DB, Shard), - Streams = emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, StartTime), + Streams = + try + emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, StartTime) + catch + error:{erpc, _} -> + %% TODO: log? + [] + end, lists:map( fun({RankY, StorageLayerStream}) -> RankX = Shard, @@ -198,35 +212,29 @@ get_streams(DB, TopicFilter, StartTime) -> make_iterator(DB, Stream, TopicFilter, StartTime) -> ?stream_v2(Shard, StorageStream) = Stream, Node = node_of_shard(DB, Shard), - case emqx_ds_proto_v4:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of + try emqx_ds_proto_v4:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of {ok, Iter} -> {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}}; - Err = {error, _} -> - Err + Error = {error, _, _} -> + Error + catch + error:RPCError = {erpc, _} -> + {error, recoverable, RPCError} end. --spec update_iterator( - emqx_ds:db(), - iterator(), - emqx_ds:message_key() -) -> +-spec update_iterator(emqx_ds:db(), iterator(), emqx_ds:message_key()) -> emqx_ds:make_iterator_result(iterator()). update_iterator(DB, OldIter, DSKey) -> #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter, Node = node_of_shard(DB, Shard), - case - emqx_ds_proto_v4:update_iterator( - Node, - DB, - Shard, - StorageIter, - DSKey - ) - of + try emqx_ds_proto_v4:update_iterator(Node, DB, Shard, StorageIter, DSKey) of {ok, Iter} -> {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}}; - Err = {error, _} -> - Err + Error = {error, _, _} -> + Error + catch + error:RPCError = {erpc, _} -> + {error, recoverable, RPCError} end. -spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()). @@ -245,8 +253,12 @@ next(DB, Iter0, BatchSize) -> {ok, StorageIter, Batch} -> Iter = Iter0#{?enc := StorageIter}, {ok, Iter, Batch}; - Other -> - Other + Ok = {ok, _} -> + Ok; + Error = {error, _, _} -> + Error; + RPCError = {badrpc, _} -> + {error, recoverable, RPCError} end. -spec node_of_shard(emqx_ds:db(), shard_id()) -> node(). @@ -337,7 +349,7 @@ do_get_streams_v2(DB, Shard, TopicFilter, StartTime) -> emqx_ds:topic_filter(), emqx_ds:time() ) -> - {ok, emqx_ds_storage_layer:iterator()} | {error, _}. + emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()). do_make_iterator_v1(_DB, _Shard, _Stream, _TopicFilter, _StartTime) -> error(obsolete_api). 
@@ -348,7 +360,7 @@ do_make_iterator_v1(_DB, _Shard, _Stream, _TopicFilter, _StartTime) -> emqx_ds:topic_filter(), emqx_ds:time() ) -> - {ok, emqx_ds_storage_layer:iterator()} | {error, _}. + emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()). do_make_iterator_v2(DB, Shard, Stream, TopicFilter, StartTime) -> emqx_ds_storage_layer:make_iterator({DB, Shard}, Stream, TopicFilter, StartTime). diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index d265d8fec..8de64e313 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -230,7 +230,7 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{}) -> emqx_ds_storage_layer:shard_id(), s(), [emqx_types:message()], emqx_ds:message_store_opts() ) -> emqx_ds:store_batch_result(). -store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options = #{atomic := true}) -> +store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) -> {ok, Batch} = rocksdb:batch(), lists:foreach( fun(Msg) -> @@ -240,18 +240,17 @@ store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options = #{atomi end, Messages ), - Res = rocksdb:write_batch(DB, Batch, _WriteOptions = []), + Result = rocksdb:write_batch(DB, Batch, []), rocksdb:release_batch(Batch), - Res; -store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) -> - lists:foreach( - fun(Msg) -> - {Key, _} = make_key(S, Msg), - Val = serialize(Msg), - rocksdb:put(DB, Data, Key, Val, []) - end, - Messages - ). + %% NOTE + %% Strictly speaking, `{error, incomplete}` is a valid result but should be impossible to + %% observe until there's `{no_slowdown, true}` in write options. + case Result of + ok -> + ok; + {error, {error, Reason}} -> + {error, unrecoverable, {rocksdb, Reason}} + end. -spec get_streams( emqx_ds_storage_layer:shard_id(), diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index e0bf1fa1b..7cb1b67f7 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -256,12 +256,10 @@ make_iterator( Err end; {error, not_found} -> - {error, end_of_stream} + {error, unrecoverable, generation_not_found} end. --spec update_iterator( - shard_id(), iterator(), emqx_ds:message_key() -) -> +-spec update_iterator(shard_id(), iterator(), emqx_ds:message_key()) -> emqx_ds:make_iterator_result(iterator()). update_iterator( Shard, @@ -281,7 +279,7 @@ update_iterator( Err end; {error, not_found} -> - {error, end_of_stream} + {error, unrecoverable, generation_not_found} end. -spec next(shard_id(), iterator(), pos_integer()) -> @@ -298,12 +296,12 @@ next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, Batch {ok, end_of_stream}; {ok, GenIter, Batch} -> {ok, Iter#{?enc := GenIter}, Batch}; - Error = {error, _} -> + Error = {error, _, _} -> Error end; {error, not_found} -> %% generation was possibly dropped by GC - {ok, end_of_stream} + {error, unrecoverable, generation_not_found} end. -spec update_config(shard_id(), emqx_ds:create_db_opts()) -> ok. 
diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index a0dae0e6f..b491657b0 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -404,7 +404,10 @@ t_drop_generation_with_never_used_iterator(_Config) -> ], ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs1)), - ?assertMatch({ok, end_of_stream, []}, iterate(DB, Iter0, 1)), + ?assertMatch( + {error, unrecoverable, generation_not_found, []}, + iterate(DB, Iter0, 1) + ), %% New iterator for the new stream will only see the later messages. [{_, Stream1}] = emqx_ds:get_streams(DB, TopicFilter, StartTime), @@ -453,9 +456,10 @@ t_drop_generation_with_used_once_iterator(_Config) -> ], ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs1)), - ?assertMatch({ok, end_of_stream, []}, iterate(DB, Iter1, 1)), - - ok. + ?assertMatch( + {error, unrecoverable, generation_not_found, []}, + iterate(DB, Iter1, 1) + ). t_drop_generation_update_iterator(_Config) -> %% This checks the behavior of `emqx_ds:update_iterator' after the generation @@ -481,9 +485,10 @@ t_drop_generation_update_iterator(_Config) -> ok = emqx_ds:add_generation(DB), ok = emqx_ds:drop_generation(DB, GenId0), - ?assertEqual({error, end_of_stream}, emqx_ds:update_iterator(DB, Iter1, Key2)), - - ok. + ?assertEqual( + {error, unrecoverable, generation_not_found}, + emqx_ds:update_iterator(DB, Iter1, Key2) + ). t_make_iterator_stale_stream(_Config) -> %% This checks the behavior of `emqx_ds:make_iterator' after the generation underlying @@ -507,7 +512,7 @@ t_make_iterator_stale_stream(_Config) -> ok = emqx_ds:drop_generation(DB, GenId0), ?assertEqual( - {error, end_of_stream}, + {error, unrecoverable, generation_not_found}, emqx_ds:make_iterator(DB, Stream0, TopicFilter, StartTime) ), @@ -605,8 +610,8 @@ iterate(DB, It0, BatchSize, Acc) -> iterate(DB, It, BatchSize, Acc ++ Msgs); {ok, end_of_stream} -> {ok, end_of_stream, Acc}; - Ret -> - Ret + {error, Class, Reason} -> + {error, Class, Reason, Acc} end. %% CT callbacks From 1cf672e78dbc96c6f1ed83fd9c8512a4c2f71285 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 1 Mar 2024 19:11:40 +0100 Subject: [PATCH 07/82] feat(sessds): handle recoverable errors during replay --- apps/emqx/src/emqx_persistent_session_ds.erl | 108 +++++++++++++------ 1 file changed, 77 insertions(+), 31 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 2cbf65b47..3517d6b73 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -123,7 +123,12 @@ -define(TIMER_PULL, timer_pull). -define(TIMER_GET_STREAMS, timer_get_streams). -define(TIMER_BUMP_LAST_ALIVE_AT, timer_bump_last_alive_at). --type timer() :: ?TIMER_PULL | ?TIMER_GET_STREAMS | ?TIMER_BUMP_LAST_ALIVE_AT. +-define(TIMER_RETRY_REPLAY, timer_retry_replay). + +-type timer() :: ?TIMER_PULL | ?TIMER_GET_STREAMS | ?TIMER_BUMP_LAST_ALIVE_AT | ?TIMER_RETRY_REPLAY. + +%% TODO: Needs configuration? +-define(TIMEOUT_RETRY_REPLAY, 1000). -type session() :: #{ %% Client ID @@ -134,6 +139,8 @@ s := emqx_persistent_session_ds_state:t(), %% Buffer: inflight := emqx_persistent_session_ds_inflight:t(), + %% In-progress replay: + replay => [{_StreamKey, stream_state()}, ...], %% Timers: timer() => reference() }. 
@@ -454,7 +461,7 @@ handle_timeout( ClientInfo, ?TIMER_PULL, Session0 -) -> +) when not is_map_key(replay, Session0) -> {Publishes, Session1} = drain_buffer(fetch_new_messages(Session0, ClientInfo)), Timeout = case Publishes of @@ -465,6 +472,12 @@ handle_timeout( end, Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session1), {ok, Publishes, Session}; +handle_timeout(ClientInfo, ?TIMER_PULL, Session0 = #{replay := [_ | _]}) -> + Session = replay_streams(Session0, ClientInfo), + {ok, [], Session}; +handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session0) -> + Session = replay_streams(Session0, ClientInfo), + {ok, [], Session}; handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) -> S1 = emqx_persistent_session_ds_subs:gc(S0), S = emqx_persistent_session_ds_stream_scheduler:renew_streams(S1), @@ -503,30 +516,44 @@ bump_last_alive(S0) -> {ok, replies(), session()}. replay(ClientInfo, [], Session0 = #{s := S0}) -> Streams = emqx_persistent_session_ds_stream_scheduler:find_replay_streams(S0), - Session = lists:foldl( - fun({_StreamKey, Stream}, SessionAcc) -> - replay_batch(Stream, SessionAcc, ClientInfo) - end, - Session0, - Streams - ), + Session = replay_streams(Session0#{replay => Streams}, ClientInfo), + {ok, [], Session}. + +replay_streams(Session0 = #{replay := [{_StreamKey, Srs0} | Rest]}, ClientInfo) -> + case replay_batch(Srs0, Session0, ClientInfo) of + Session = #{} -> + replay_streams(Session#{replay := Rest}, ClientInfo); + {error, _, _} -> + emqx_session:ensure_timer(?TIMER_RETRY_REPLAY, ?TIMEOUT_RETRY_REPLAY, Session0) + end; +replay_streams(Session0 = #{replay := []}, _ClientInfo) -> + Session = maps:remove(replay, Session0), %% Note: we filled the buffer with the historical messages, and %% from now on we'll rely on the normal inflight/flow control %% mechanisms to replay them: - {ok, [], pull_now(Session)}. + pull_now(Session). -spec replay_batch(stream_state(), session(), clientinfo()) -> session(). -replay_batch(Srs0, Session, ClientInfo) -> +replay_batch(Srs0, Session0, ClientInfo) -> #srs{batch_size = BatchSize} = Srs0, - %% TODO: retry on errors: - {Srs, Inflight} = enqueue_batch(true, BatchSize, Srs0, Session, ClientInfo), - %% Assert: - Srs =:= Srs0 orelse - ?tp(warning, emqx_persistent_session_ds_replay_inconsistency, #{ - expected => Srs0, - got => Srs - }), - Session#{inflight => Inflight}. + case enqueue_batch(true, BatchSize, Srs0, Session0, ClientInfo) of + {ok, Srs, Session} -> + %% Assert: + Srs =:= Srs0 orelse + ?tp(warning, emqx_persistent_session_ds_replay_inconsistency, #{ + expected => Srs0, + got => Srs + }), + Session; + {error, recoverable, Reason} = Error -> + ?SLOG(warning, #{ + msg => "failed_to_fetch_replay_batch", + stream => Srs0, + reason => Reason, + class => recoverable + }), + Error + end. %%-------------------------------------------------------------------- @@ -743,7 +770,7 @@ fetch_new_messages([I | Streams], Session0 = #{inflight := Inflight}, ClientInfo fetch_new_messages(Streams, Session, ClientInfo) end. 
-new_batch({StreamKey, Srs0}, BatchSize, Session = #{s := S0}, ClientInfo) -> +new_batch({StreamKey, Srs0}, BatchSize, Session0 = #{s := S0}, ClientInfo) -> SN1 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_1), S0), SN2 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_2), S0), Srs1 = Srs0#srs{ @@ -753,11 +780,30 @@ new_batch({StreamKey, Srs0}, BatchSize, Session = #{s := S0}, ClientInfo) -> last_seqno_qos1 = SN1, last_seqno_qos2 = SN2 }, - {Srs, Inflight} = enqueue_batch(false, BatchSize, Srs1, Session, ClientInfo), - S1 = emqx_persistent_session_ds_state:put_seqno(?next(?QOS_1), Srs#srs.last_seqno_qos1, S0), - S2 = emqx_persistent_session_ds_state:put_seqno(?next(?QOS_2), Srs#srs.last_seqno_qos2, S1), - S = emqx_persistent_session_ds_state:put_stream(StreamKey, Srs, S2), - Session#{s => S, inflight => Inflight}. + case enqueue_batch(false, BatchSize, Srs1, Session0, ClientInfo) of + {ok, Srs, Session} -> + S1 = emqx_persistent_session_ds_state:put_seqno( + ?next(?QOS_1), + Srs#srs.last_seqno_qos1, + S0 + ), + S2 = emqx_persistent_session_ds_state:put_seqno( + ?next(?QOS_2), + Srs#srs.last_seqno_qos2, + S1 + ), + S = emqx_persistent_session_ds_state:put_stream(StreamKey, Srs, S2), + Session#{s => S}; + {error, Class, Reason} -> + %% TODO: Handle unrecoverable error. + ?SLOG(info, #{ + msg => "failed_to_fetch_batch", + stream => Srs1, + reason => Reason, + class => Class + }), + Session0 + end. enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0}, ClientInfo) -> #srs{ @@ -786,13 +832,13 @@ enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0}, Cli last_seqno_qos1 = LastSeqnoQos1, last_seqno_qos2 = LastSeqnoQos2 }, - {Srs, Inflight}; + {ok, Srs, Session#{inflight := Inflight}}; {ok, end_of_stream} -> %% No new messages; just update the end iterator: - {Srs0#srs{it_begin = ItBegin, it_end = end_of_stream, batch_size = 0}, Inflight0}; - {error, _} when not IsReplay -> - ?SLOG(info, #{msg => "failed_to_fetch_batch", iterator => ItBegin}), - {Srs0, Inflight0} + Srs = Srs0#srs{it_begin = ItBegin, it_end = end_of_stream, batch_size = 0}, + {ok, Srs, Session#{inflight := Inflight0}}; + {error, _, _} = Error -> + Error end. %% key_of_iter(#{3 := #{3 := #{5 := K}}}) -> From b39c710ec21f02184986a7df2fb6a88d45eb485e Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Sun, 3 Mar 2024 22:20:16 +0100 Subject: [PATCH 08/82] fix(ds): tidy up few typespecs --- apps/emqx/src/emqx_persistent_session_ds.erl | 2 +- apps/emqx/src/emqx_rpc.erl | 1 + .../src/emqx_ds_replication_layer.erl | 11 +++++------ .../src/proto/emqx_ds_proto_v4.erl | 8 +++----- 4 files changed, 10 insertions(+), 12 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 3517d6b73..790e2d477 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -533,7 +533,7 @@ replay_streams(Session0 = #{replay := []}, _ClientInfo) -> %% mechanisms to replay them: pull_now(Session). --spec replay_batch(stream_state(), session(), clientinfo()) -> session(). +-spec replay_batch(stream_state(), session(), clientinfo()) -> session() | emqx_ds:error(_). 
replay_batch(Srs0, Session0, ClientInfo) -> #srs{batch_size = BatchSize} = Srs0, case enqueue_batch(true, BatchSize, Srs0, Session0, ClientInfo) of diff --git a/apps/emqx/src/emqx_rpc.erl b/apps/emqx/src/emqx_rpc.erl index e6ce5002a..61aa2a8ca 100644 --- a/apps/emqx/src/emqx_rpc.erl +++ b/apps/emqx/src/emqx_rpc.erl @@ -35,6 +35,7 @@ -export_type([ badrpc/0, + call_result/1, call_result/0, cast_result/0, multicall_result/1, diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 49d7f95a2..1b5f21a11 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -171,13 +171,12 @@ drop_db(DB) -> -spec store_batch(emqx_ds:db(), [emqx_types:message(), ...], emqx_ds:message_store_opts()) -> emqx_ds:store_batch_result(). store_batch(DB, Messages, Opts) -> - case emqx_ds_replication_layer_egress:store_batch(DB, Messages, Opts) of + try emqx_ds_replication_layer_egress:store_batch(DB, Messages, Opts) of ok -> - ok; - Error = {error, _, _} -> - Error; - RPCError = {badrpc, _} -> - {error, recoverable, RPCError} + ok + catch + error:{Reason, _Call} when Reason == timeout; Reason == noproc -> + {error, recoverable, Reason} end. -spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) -> diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v4.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v4.erl index fcab12507..36a983ce9 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v4.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v4.erl @@ -64,7 +64,7 @@ get_streams(Node, DB, Shard, TopicFilter, Time) -> emqx_ds:topic_filter(), emqx_ds:time() ) -> - {ok, emqx_ds_storage_layer:iterator()} | {error, _}. + emqx_ds:make_iterator_result(). make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) -> erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v2, [ DB, Shard, Stream, TopicFilter, StartTime @@ -77,9 +77,7 @@ make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) -> emqx_ds_storage_layer:iterator(), pos_integer() ) -> - {ok, emqx_ds_storage_layer:iterator(), [{emqx_ds:message_key(), [emqx_types:message()]}]} - | {ok, end_of_stream} - | {error, _}. + emqx_rpc:call_result(emqx_ds:next_result()). next(Node, DB, Shard, Iter, BatchSize) -> emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_next_v1, [DB, Shard, Iter, BatchSize]). @@ -103,7 +101,7 @@ store_batch(Node, DB, Shard, Batch, Options) -> emqx_ds_storage_layer:iterator(), emqx_ds:message_key() ) -> - {ok, emqx_ds_storage_layer:iterator()} | {error, _}. + emqx_ds:make_iterator_result(). update_iterator(Node, DB, Shard, OldIter, DSKey) -> erpc:call(Node, emqx_ds_replication_layer, do_update_iterator_v2, [ DB, Shard, OldIter, DSKey From 3f3e33b2cb36763100b5ca4a8fa0ef3a9a705f6d Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 5 Mar 2024 20:16:52 +0100 Subject: [PATCH 09/82] fix(sessds): untangle pull and replay retry timers And restore the convention that timer handler always manages only its own timers. 
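A minimal sketch of the restored convention (the timer names and macros below
are the real ones from emqx_persistent_session_ds; the pull-side body and the
100 ms timeout are illustrative only):

    %% Each handle_timeout/3 clause re-arms at most its own timer:
    handle_timeout(ClientInfo, ?TIMER_PULL, Session0) ->
        {Publishes, Session1} = pull_new_messages(ClientInfo, Session0),
        {ok, Publishes, emqx_session:ensure_timer(?TIMER_PULL, 100, Session1)};
    handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session0) ->
        Session1 = replay_streams(Session0, ClientInfo),
        %% Only this clause may re-arm ?TIMER_RETRY_REPLAY:
        Session =
            case ?IS_REPLAY_ONGOING(Session1) of
                true ->
                    emqx_session:ensure_timer(
                        ?TIMER_RETRY_REPLAY, ?TIMEOUT_RETRY_REPLAY, Session1
                    );
                false ->
                    Session1
            end,
        {ok, [], Session}.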
--- apps/emqx/src/emqx_persistent_session_ds.erl | 31 +++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 790e2d477..4c42b2415 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -140,11 +140,14 @@ %% Buffer: inflight := emqx_persistent_session_ds_inflight:t(), %% In-progress replay: + %% List of stream replay states to be added to the inflight buffer. replay => [{_StreamKey, stream_state()}, ...], %% Timers: timer() => reference() }. +-define(IS_REPLAY_ONGOING(SESS), is_map_key(replay, SESS)). + -record(req_sync, { from :: pid(), ref :: reference() @@ -457,12 +460,14 @@ deliver(ClientInfo, Delivers, Session0) -> -spec handle_timeout(clientinfo(), _Timeout, session()) -> {ok, replies(), session()} | {ok, replies(), timeout(), session()}. -handle_timeout( - ClientInfo, - ?TIMER_PULL, - Session0 -) when not is_map_key(replay, Session0) -> - {Publishes, Session1} = drain_buffer(fetch_new_messages(Session0, ClientInfo)), +handle_timeout(ClientInfo, ?TIMER_PULL, Session0) -> + {Publishes, Session1} = + case ?IS_REPLAY_ONGOING(Session0) of + false -> + drain_buffer(fetch_new_messages(Session0, ClientInfo)); + true -> + {[], Session0} + end, Timeout = case Publishes of [] -> @@ -472,11 +477,15 @@ handle_timeout( end, Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session1), {ok, Publishes, Session}; -handle_timeout(ClientInfo, ?TIMER_PULL, Session0 = #{replay := [_ | _]}) -> - Session = replay_streams(Session0, ClientInfo), - {ok, [], Session}; handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session0) -> - Session = replay_streams(Session0, ClientInfo), + Session1 = replay_streams(Session0, ClientInfo), + Session = + case ?IS_REPLAY_ONGOING(Session1) of + true -> + emqx_session:ensure_timer(?TIMER_RETRY_REPLAY, ?TIMEOUT_RETRY_REPLAY, Session1); + false -> + Session1 + end, {ok, [], Session}; handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) -> S1 = emqx_persistent_session_ds_subs:gc(S0), @@ -524,7 +533,7 @@ replay_streams(Session0 = #{replay := [{_StreamKey, Srs0} | Rest]}, ClientInfo) Session = #{} -> replay_streams(Session#{replay := Rest}, ClientInfo); {error, _, _} -> - emqx_session:ensure_timer(?TIMER_RETRY_REPLAY, ?TIMEOUT_RETRY_REPLAY, Session0) + Session0 end; replay_streams(Session0 = #{replay := []}, _ClientInfo) -> Session = maps:remove(replay, Session0), From b604c3dbd4f4da271083c2d211200739bc9b8057 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 5 Mar 2024 20:26:18 +0100 Subject: [PATCH 10/82] refactor(sessds): make replay error handling a bit more clear Also leave a forgotten TODO. 
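For context, a hedged sketch of how a caller is expected to branch on the
`emqx_ds` error classes used here (`emqx_ds:next/3` and the class atoms are
real; `handle_batch/2`, `retry_later/2` and `give_up/1` are placeholders):

    case emqx_ds:next(DB, It0, BatchSize) of
        {ok, It, Batch} ->
            %% Deliver the batch and continue from the advanced iterator.
            handle_batch(It, Batch);
        {ok, end_of_stream} ->
            %% The stream is fully consumed.
            done;
        {error, recoverable, Reason} ->
            %% E.g. a transient RPC failure: keep It0 and retry later.
            retry_later(It0, Reason);
        {error, unrecoverable, Reason} ->
            %% E.g. generation_not_found after GC: the iterator is useless now.
            give_up(Reason)
    end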
--- apps/emqx/src/emqx_persistent_session_ds.erl | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 4c42b2415..5c5fe5b82 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -532,8 +532,15 @@ replay_streams(Session0 = #{replay := [{_StreamKey, Srs0} | Rest]}, ClientInfo) case replay_batch(Srs0, Session0, ClientInfo) of Session = #{} -> replay_streams(Session#{replay := Rest}, ClientInfo); - {error, _, _} -> + {error, recoverable, Reason} -> + ?SLOG(warning, #{ + msg => "failed_to_fetch_replay_batch", + stream => Srs0, + reason => Reason, + class => recoverable + }), Session0 + %% TODO: Handle unrecoverable errors. end; replay_streams(Session0 = #{replay := []}, _ClientInfo) -> Session = maps:remove(replay, Session0), @@ -554,13 +561,7 @@ replay_batch(Srs0, Session0, ClientInfo) -> got => Srs }), Session; - {error, recoverable, Reason} = Error -> - ?SLOG(warning, #{ - msg => "failed_to_fetch_replay_batch", - stream => Srs0, - reason => Reason, - class => recoverable - }), + {error, _, _} = Error -> Error end. From 09905d78cd86f459c1b1cf6958ee230f748c0419 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 5 Mar 2024 20:26:53 +0100 Subject: [PATCH 11/82] chore(ds): make error handling slightly simpler Co-Authored-By: Thales Macedo Garitezi --- apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 1b5f21a11..1738867b5 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -171,9 +171,8 @@ drop_db(DB) -> -spec store_batch(emqx_ds:db(), [emqx_types:message(), ...], emqx_ds:message_store_opts()) -> emqx_ds:store_batch_result(). 
store_batch(DB, Messages, Opts) -> - try emqx_ds_replication_layer_egress:store_batch(DB, Messages, Opts) of - ok -> - ok + try + emqx_ds_replication_layer_egress:store_batch(DB, Messages, Opts) catch error:{Reason, _Call} when Reason == timeout; Reason == noproc -> {error, recoverable, Reason} From e7e8771277a759731f179680dd38137991a51d8a Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 7 Mar 2024 12:45:10 +0100 Subject: [PATCH 12/82] fix(sessds): set replay retry timer if initial `replay/3` fails --- apps/emqx/src/emqx_persistent_session_ds.erl | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 5c5fe5b82..4757e8670 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -478,14 +478,7 @@ handle_timeout(ClientInfo, ?TIMER_PULL, Session0) -> Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session1), {ok, Publishes, Session}; handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session0) -> - Session1 = replay_streams(Session0, ClientInfo), - Session = - case ?IS_REPLAY_ONGOING(Session1) of - true -> - emqx_session:ensure_timer(?TIMER_RETRY_REPLAY, ?TIMEOUT_RETRY_REPLAY, Session1); - false -> - Session1 - end, + Session = replay_streams(Session0, ClientInfo), {ok, [], Session}; handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) -> S1 = emqx_persistent_session_ds_subs:gc(S0), @@ -533,13 +526,15 @@ replay_streams(Session0 = #{replay := [{_StreamKey, Srs0} | Rest]}, ClientInfo) Session = #{} -> replay_streams(Session#{replay := Rest}, ClientInfo); {error, recoverable, Reason} -> + RetryTimeout = ?TIMEOUT_RETRY_REPLAY, ?SLOG(warning, #{ msg => "failed_to_fetch_replay_batch", stream => Srs0, reason => Reason, - class => recoverable + class => recoverable, + retry_in_ms => RetryTimeout }), - Session0 + emqx_session:ensure_timer(?TIMER_RETRY_REPLAY, RetryTimeout, Session0) %% TODO: Handle unrecoverable errors. end; replay_streams(Session0 = #{replay := []}, _ClientInfo) -> From 69427dc42d5da3a0217923ee24de6cf417198561 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 7 Mar 2024 12:47:05 +0100 Subject: [PATCH 13/82] test(ds): add tests for error mapping and replay recovery --- apps/emqx/include/asserts.hrl | 21 +++++ .../emqx_persistent_session_ds_SUITE.erl | 72 ++++++++++++++- apps/emqx/test/emqx_common_test_helpers.erl | 11 +++ .../test/emqx_ds_SUITE.erl | 91 ++++++++++++++++++- .../test/emqx_ds_test_helpers.erl | 58 ++++++++++++ 5 files changed, 251 insertions(+), 2 deletions(-) create mode 100644 apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl diff --git a/apps/emqx/include/asserts.hrl b/apps/emqx/include/asserts.hrl index a200394e4..5744fdbf3 100644 --- a/apps/emqx/include/asserts.hrl +++ b/apps/emqx/include/asserts.hrl @@ -83,6 +83,27 @@ end)() ). +-define(assertMatchOneOf(PAT1, PAT2, EXPR), + (fun() -> + case (X__V = (EXPR)) of + PAT1 -> + X__V; + PAT2 -> + X__V; + _ -> + erlang:error( + {assertMatch, [ + {module, ?MODULE}, + {line, ?LINE}, + {expression, (??EXPR)}, + {pattern, "one of [ " ++ (??PAT1) ++ ", " ++ (??PAT2) ++ " ]"}, + {value, X__V} + ]} + ) + end + end)() +). 
+ -define(assertExceptionOneOf(CT1, CT2, EXPR), (fun() -> X__Attrs = [ diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index c0631e7ab..c928f10da 100644 --- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -118,7 +118,6 @@ app_specs() -> app_specs(Opts) -> ExtraEMQXConf = maps:get(extra_emqx_conf, Opts, ""), [ - emqx_durable_storage, {emqx, "session_persistence = {enable = true}" ++ ExtraEMQXConf} ]. @@ -154,6 +153,14 @@ start_client(Opts0 = #{}) -> on_exit(fun() -> catch emqtt:stop(Client) end), Client. +start_connect_client(Opts = #{}) -> + Client = start_client(Opts), + ?assertMatch({ok, _}, emqtt:connect(Client)), + Client. + +mk_clientid(Prefix, ID) -> + iolist_to_binary(io_lib:format("~p/~p", [Prefix, ID])). + restart_node(Node, NodeSpec) -> ?tp(will_restart_node, #{}), emqx_cth_cluster:restart(Node, NodeSpec), @@ -599,3 +606,66 @@ t_session_gc(Config) -> [] ), ok. + +t_session_replay_retry(_Config) -> + %% Verify that the session recovers smoothly from transient errors during + %% replay. + + ok = emqx_ds_test_helpers:mock_rpc(), + + NClients = 10, + ClientSubOpts = #{ + clientid => mk_clientid(?FUNCTION_NAME, sub), + auto_ack => never + }, + ClientSub = start_connect_client(ClientSubOpts), + ?assertMatch( + {ok, _, [?RC_GRANTED_QOS_1]}, + emqtt:subscribe(ClientSub, <<"t/#">>, ?QOS_1) + ), + + ClientsPub = [ + start_connect_client(#{ + clientid => mk_clientid(?FUNCTION_NAME, I), + properties => #{'Session-Expiry-Interval' => 0} + }) + || I <- lists:seq(1, NClients) + ], + lists:foreach( + fun(Client) -> + Index = integer_to_binary(rand:uniform(NClients)), + Topic = <<"t/", Index/binary>>, + ?assertMatch({ok, #{}}, emqtt:publish(Client, Topic, Index, 1)) + end, + ClientsPub + ), + + Pubs0 = emqx_common_test_helpers:wait_publishes(NClients, 5_000), + NPubs = length(Pubs0), + ?assertEqual(NClients, NPubs, ?drainMailbox()), + + ok = emqtt:stop(ClientSub), + + %% Make `emqx_ds` believe that roughly half of the shards are unavailable. + ok = emqx_ds_test_helpers:mock_rpc_result( + fun(_Node, emqx_ds_replication_layer, _Function, [_DB, Shard | _]) -> + case erlang:phash2(Shard) rem 2 of + 0 -> unavailable; + 1 -> passthrough + end + end + ), + + _ClientSub = start_connect_client(ClientSubOpts#{clean_start => false}), + + Pubs1 = emqx_common_test_helpers:wait_publishes(NPubs, 5_000), + ?assert(length(Pubs1) < length(Pubs0), Pubs1), + + %% "Recover" the shards. + emqx_ds_test_helpers:unmock_rpc(), + + Pubs2 = emqx_common_test_helpers:wait_publishes(NPubs - length(Pubs1), 5_000), + ?assertEqual( + [maps:with([topic, payload, qos], P) || P <- Pubs0], + [maps:with([topic, payload, qos], P) || P <- Pubs1 ++ Pubs2] + ). diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index a383e0b2c..7a25e925d 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -61,6 +61,7 @@ read_schema_configs/2, render_config_file/2, wait_for/4, + wait_publishes/2, wait_mqtt_payload/1, select_free_port/1 ]). @@ -426,6 +427,16 @@ wait_for(Fn, Ln, F, Timeout) -> {Pid, Mref} = erlang:spawn_monitor(fun() -> wait_loop(F, catch_call(F)) end), wait_for_down(Fn, Ln, Timeout, Pid, Mref, false). 
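+%% Receives up to Count `{publish, Msg}` messages from the caller's mailbox;
+%% gives up (returning what was collected so far) once no message arrives
+%% within Timeout milliseconds.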
+wait_publishes(0, _Timeout) -> + []; +wait_publishes(Count, Timeout) -> + receive + {publish, Msg} -> + [Msg | wait_publishes(Count - 1, Timeout)] + after Timeout -> + [] + end. + flush() -> flush([]). diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index b491657b0..002f5c557 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -21,6 +21,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("stdlib/include/assert.hrl"). +-include_lib("emqx/include/asserts.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(N_SHARDS, 1). @@ -553,9 +554,93 @@ t_get_streams_concurrently_with_drop_generation(_Config) -> ok end, [] + ). + +t_error_mapping_replication_layer(_Config) -> + %% This checks that the replication layer maps recoverable errors correctly. + + ok = emqx_ds_test_helpers:mock_rpc(), + ok = snabbkaffe:start_trace(), + + DB = ?FUNCTION_NAME, + ?assertMatch(ok, emqx_ds:open_db(DB, (opts())#{n_shards => 2})), + [Shard1, Shard2] = emqx_ds_replication_layer_meta:shards(DB), + + TopicFilter = emqx_topic:words(<<"foo/#">>), + Msgs = [ + message(<<"C1">>, <<"foo/bar">>, <<"1">>, 0), + message(<<"C1">>, <<"foo/baz">>, <<"2">>, 1), + message(<<"C2">>, <<"foo/foo">>, <<"3">>, 2), + message(<<"C3">>, <<"foo/xyz">>, <<"4">>, 3), + message(<<"C4">>, <<"foo/bar">>, <<"5">>, 4), + message(<<"C5">>, <<"foo/oof">>, <<"6">>, 5) + ], + + ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)), + + ?block_until(#{?snk_kind := emqx_ds_replication_layer_egress_flush, shard := Shard1}), + ?block_until(#{?snk_kind := emqx_ds_replication_layer_egress_flush, shard := Shard2}), + + Streams0 = emqx_ds:get_streams(DB, TopicFilter, 0), + Iterators0 = lists:map( + fun({_Rank, S}) -> + {ok, Iter} = emqx_ds:make_iterator(DB, S, TopicFilter, 0), + Iter + end, + Streams0 ), - ok. + %% Disrupt the link to the second shard. + ok = emqx_ds_test_helpers:mock_rpc_result( + fun(_Node, emqx_ds_replication_layer, _Function, Args) -> + case Args of + [DB, Shard1 | _] -> passthrough; + [DB, Shard2 | _] -> unavailable + end + end + ), + + %% Result of `emqx_ds:get_streams/3` will just contain partial results, not an error. + Streams1 = emqx_ds:get_streams(DB, TopicFilter, 0), + ?assert( + length(Streams1) > 0 andalso length(Streams1) =< length(Streams0), + Streams1 + ), + + %% At least one of `emqx_ds:make_iterator/4` will end in an error. + Results1 = lists:map( + fun({_Rank, S}) -> + ?assertMatchOneOf( + {ok, _Iter}, + {error, recoverable, {erpc, _}}, + emqx_ds:make_iterator(DB, S, TopicFilter, 0) + ) + end, + Streams0 + ), + ?assert( + length([error || {error, _, _} <- Results1]) > 0, + Results1 + ), + + %% At least one of `emqx_ds:next/3` over initial set of iterators will end in an error. + Results2 = lists:map( + fun(Iter) -> + ?assertMatchOneOf( + {ok, _Iter, [_ | _]}, + {error, recoverable, {badrpc, _}}, + emqx_ds:next(DB, Iter, _BatchSize = 42) + ) + end, + Iterators0 + ), + ?assert( + length([error || {error, _, _} <- Results2]) > 0, + Results2 + ), + + snabbkaffe:stop(), + meck:unload(). update_data_set() -> [ @@ -591,6 +676,10 @@ fetch_all(DB, TopicFilter, StartTime) -> Streams ). +message(ClientId, Topic, Payload, PublishedAt) -> + Msg = message(Topic, Payload, PublishedAt), + Msg#message{from = ClientId}. 
+ message(Topic, Payload, PublishedAt) -> #message{ topic = Topic, diff --git a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl new file mode 100644 index 000000000..d26c6dd30 --- /dev/null +++ b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl @@ -0,0 +1,58 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_ds_test_helpers). + +-compile(export_all). +-compile(nowarn_export_all). + +%% RPC mocking + +mock_rpc() -> + ok = meck:new(erpc, [passthrough, no_history, unstick]), + ok = meck:new(gen_rpc, [passthrough, no_history]). + +unmock_rpc() -> + catch meck:unload(erpc), + catch meck:unload(gen_rpc). + +mock_rpc_result(ExpectFun) -> + mock_rpc_result(erpc, ExpectFun), + mock_rpc_result(gen_rpc, ExpectFun). + +mock_rpc_result(erpc, ExpectFun) -> + ok = meck:expect(erpc, call, fun(Node, Mod, Function, Args) -> + case ExpectFun(Node, Mod, Function, Args) of + passthrough -> + meck:passthrough([Node, Mod, Function, Args]); + unavailable -> + meck:exception(error, {erpc, noconnection}); + {timeout, Timeout} -> + ok = timer:sleep(Timeout), + meck:exception(error, {erpc, timeout}) + end + end); +mock_rpc_result(gen_rpc, ExpectFun) -> + ok = meck:expect(gen_rpc, call, fun(Dest = {Node, _}, Mod, Function, Args) -> + case ExpectFun(Node, Mod, Function, Args) of + passthrough -> + meck:passthrough([Dest, Mod, Function, Args]); + unavailable -> + {badtcp, econnrefused}; + {timeout, Timeout} -> + ok = timer:sleep(Timeout), + {badrpc, timeout} + end + end). From f7e3afde16893c93ce08e586453a5edccd404b8a Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 7 Mar 2024 16:48:33 +0100 Subject: [PATCH 14/82] test(ds): avoid introducing new macros --- apps/emqx/include/asserts.hrl | 21 --------------- .../test/emqx_ds_SUITE.erl | 26 ++++++++++++------- 2 files changed, 16 insertions(+), 31 deletions(-) diff --git a/apps/emqx/include/asserts.hrl b/apps/emqx/include/asserts.hrl index 5744fdbf3..a200394e4 100644 --- a/apps/emqx/include/asserts.hrl +++ b/apps/emqx/include/asserts.hrl @@ -83,27 +83,6 @@ end)() ). --define(assertMatchOneOf(PAT1, PAT2, EXPR), - (fun() -> - case (X__V = (EXPR)) of - PAT1 -> - X__V; - PAT2 -> - X__V; - _ -> - erlang:error( - {assertMatch, [ - {module, ?MODULE}, - {line, ?LINE}, - {expression, (??EXPR)}, - {pattern, "one of [ " ++ (??PAT1) ++ ", " ++ (??PAT2) ++ " ]"}, - {value, X__V} - ]} - ) - end - end)() -). 
- -define(assertExceptionOneOf(CT1, CT2, EXPR), (fun() -> X__Attrs = [ diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index 002f5c557..5ba616012 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -610,11 +610,14 @@ t_error_mapping_replication_layer(_Config) -> %% At least one of `emqx_ds:make_iterator/4` will end in an error. Results1 = lists:map( fun({_Rank, S}) -> - ?assertMatchOneOf( - {ok, _Iter}, - {error, recoverable, {erpc, _}}, - emqx_ds:make_iterator(DB, S, TopicFilter, 0) - ) + case emqx_ds:make_iterator(DB, S, TopicFilter, 0) of + Ok = {ok, _Iter} -> + Ok; + Error = {error, recoverable, {erpc, _}} -> + Error; + Other -> + ct:fail({unexpected_result, Other}) + end end, Streams0 ), @@ -626,11 +629,14 @@ t_error_mapping_replication_layer(_Config) -> %% At least one of `emqx_ds:next/3` over initial set of iterators will end in an error. Results2 = lists:map( fun(Iter) -> - ?assertMatchOneOf( - {ok, _Iter, [_ | _]}, - {error, recoverable, {badrpc, _}}, - emqx_ds:next(DB, Iter, _BatchSize = 42) - ) + case emqx_ds:next(DB, Iter, _BatchSize = 42) of + Ok = {ok, _Iter, [_ | _]} -> + Ok; + Error = {error, recoverable, {badrpc, _}} -> + Error; + Other -> + ct:fail({unexpected_result, Other}) + end end, Iterators0 ), From 963e0de0c311bae752c6b6ee9141643d062b9b99 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 6 Mar 2024 10:42:50 -0300 Subject: [PATCH 15/82] fix(kafka_consumer): check client connectivity Fixes https://emqx.atlassian.net/browse/EMQX-11945 --- .../test/emqx_bridge_v2_testlib.erl | 18 +++++++ .../src/emqx_bridge_kafka_impl_consumer.erl | 48 +++++++++++++++++-- .../emqx_bridge_kafka_impl_consumer_SUITE.erl | 1 + .../emqx_bridge_v2_kafka_consumer_SUITE.erl | 12 +++++ 4 files changed, 75 insertions(+), 4 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl index 9def284d9..99011deea 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl @@ -454,6 +454,24 @@ probe_bridge_api(Kind, BridgeType, BridgeName, BridgeConfig) -> ct:pal("bridge probe (~s, http) result:\n ~p", [Kind, Res]), Res. +probe_connector_api(Config) -> + probe_connector_api(Config, _Overrides = #{}). + +probe_connector_api(Config, Overrides) -> + #{ + connector_type := Type, + connector_name := Name + } = get_common_values(Config), + ConnectorConfig0 = get_value(connector_config, Config), + ConnectorConfig1 = emqx_utils_maps:deep_merge(ConnectorConfig0, Overrides), + Params = ConnectorConfig1#{<<"type">> => Type, <<"name">> => Name}, + Path = emqx_mgmt_api_test_util:api_path(["connectors_probe"]), + ct:pal("probing connector (~s, http):\n ~p", [Type, Params]), + Method = post, + Res = request(Method, Path, Params), + ct:pal("probing connector (~s, http) result:\n ~p", [Type, Res]), + Res. 
+ list_bridges_http_api_v1() -> Path = emqx_mgmt_api_test_util:api_path(["bridges"]), ct:pal("list bridges (http v1)"), diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index 6cfcf7d5d..c4f66dfff 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -220,10 +220,17 @@ on_stop(ConnectorResId, State) -> -spec on_get_status(connector_resource_id(), connector_state()) -> ?status_connected | ?status_disconnected. -on_get_status(_ConnectorResId, _State = #{kafka_client_id := ClientID}) -> - case brod_sup:find_client(ClientID) of - [_Pid] -> ?status_connected; - _ -> ?status_disconnected +on_get_status(_ConnectorResId, State = #{kafka_client_id := ClientID}) -> + case whereis(ClientID) of + Pid when is_pid(Pid) -> + case check_client_connectivity(Pid) of + {Status, Reason} -> + {Status, State, Reason}; + Status -> + Status + end; + _ -> + ?status_disconnected end; on_get_status(_ConnectorResId, _State) -> ?status_disconnected. @@ -631,6 +638,39 @@ is_dry_run(ConnectorResId) -> string:equal(TestIdStart, ConnectorResId) end. +-spec check_client_connectivity(pid()) -> + ?status_connected + | ?status_disconnected + | {?status_disconnected, term()}. +check_client_connectivity(ClientPid) -> + %% We use a fake group id just to probe the connection, as `get_group_coordinator' + %% will ensure a connection to the broker. + FakeGroupId = <<"____emqx_consumer_probe">>, + case brod_client:get_group_coordinator(ClientPid, FakeGroupId) of + {error, client_down} -> + ?status_disconnected; + {error, {client_down, Reason}} -> + %% `brod' should have already logged the client being down. + {?status_disconnected, maybe_clean_error(Reason)}; + {error, Reason} -> + %% `brod' should have already logged the client being down. + {?status_disconnected, maybe_clean_error(Reason)}; + {ok, _Metadata} -> + ?status_connected + end. + +%% Attempt to make the returned error a bit more friendly. +maybe_clean_error(Reason) -> + case Reason of + [{{Host, Port}, {nxdomain, _Stacktrace}} | _] when is_integer(Port) -> + HostPort = iolist_to_binary([Host, ":", integer_to_binary(Port)]), + {HostPort, nxdomain}; + [{error_code, Code}, {error_msg, Msg} | _] -> + {Code, Msg}; + _ -> + Reason + end. + -spec make_client_id(connector_resource_id(), binary(), atom() | binary()) -> atom(). 
make_client_id(ConnectorResId, BridgeType, BridgeName) -> case is_dry_run(ConnectorResId) of diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 23a8b4828..402841f99 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -2071,6 +2071,7 @@ t_begin_offset_earliest(Config) -> {ok, _} = create_bridge(Config, #{ <<"kafka">> => #{<<"offset_reset_policy">> => <<"earliest">>} }), + ?retry(500, 20, ?assertEqual({ok, connected}, health_check(Config))), #{num_published => NumMessages} end, diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl index 02a7a6279..8568e2f62 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl @@ -339,3 +339,15 @@ t_update_topic(Config) -> emqx_bridge_v2_testlib:get_source_api(?SOURCE_TYPE_BIN, Name) ), ok. + +t_bad_bootstrap_host(Config) -> + ?assertMatch( + {error, {{_, 400, _}, _, _}}, + emqx_bridge_v2_testlib:probe_connector_api( + Config, + #{ + <<"bootstrap_hosts">> => <<"bad_host:9999">> + } + ) + ), + ok. From 163d095dca4a8c406066e5070cc4d4fd7ef76469 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 8 Mar 2024 10:36:49 +0800 Subject: [PATCH 16/82] fix: port the changes for date_to_unix_ts SQL fun from 4.4 --- apps/emqx_rule_engine/src/emqx_rule_funcs.erl | 2 +- .../test/emqx_rule_funcs_SUITE.erl | 118 ++++++++++++++++-- apps/emqx_utils/src/emqx_utils_calendar.erl | 102 ++++++++------- changes/ce/fix-12668.en.md | 2 + 4 files changed, 157 insertions(+), 67 deletions(-) create mode 100644 changes/ce/fix-12668.en.md diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index 735025e2b..3f2bb99a0 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -1181,7 +1181,7 @@ format_date(TimeUnit, Offset, FormatString, TimeEpoch) -> date_to_unix_ts(TimeUnit, FormatString, InputString) -> Unit = time_unit(TimeUnit), - emqx_utils_calendar:parse(InputString, Unit, FormatString). + emqx_utils_calendar:formatted_datetime_to_system_time(InputString, Unit, FormatString). date_to_unix_ts(TimeUnit, Offset, FormatString, InputString) -> Unit = time_unit(TimeUnit), diff --git a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl index 5bcb48417..d74055a66 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl @@ -1143,6 +1143,50 @@ timezone_to_offset_seconds_helper(FunctionName) -> apply_func(FunctionName, [local]), ok. 
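+%% Each case below cross-checks date_to_unix_ts/4 against OTP's
+%% calendar:rfc3339_to_system_time/2 as the reference implementation.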
+t_date_to_unix_ts(_) -> + TestTab = [ + {{"2024-03-01T10:30:38+08:00", second}, [ + <<"second">>, <<"+08:00">>, <<"%Y-%m-%d %H-%M-%S">>, <<"2024-03-01 10:30:38">> + ]}, + {{"2024-03-01T10:30:38.333+08:00", second}, [ + <<"second">>, <<"+08:00">>, <<"%Y-%m-%d %H-%M-%S.%3N">>, <<"2024-03-01 10:30:38.333">> + ]}, + {{"2024-03-01T10:30:38.333+08:00", millisecond}, [ + <<"millisecond">>, + <<"+08:00">>, + <<"%Y-%m-%d %H-%M-%S.%3N">>, + <<"2024-03-01 10:30:38.333">> + ]}, + {{"2024-03-01T10:30:38.333+08:00", microsecond}, [ + <<"microsecond">>, + <<"+08:00">>, + <<"%Y-%m-%d %H-%M-%S.%3N">>, + <<"2024-03-01 10:30:38.333">> + ]}, + {{"2024-03-01T10:30:38.333+08:00", nanosecond}, [ + <<"nanosecond">>, + <<"+08:00">>, + <<"%Y-%m-%d %H-%M-%S.%3N">>, + <<"2024-03-01 10:30:38.333">> + ]}, + {{"2024-03-01T10:30:38.333444+08:00", microsecond}, [ + <<"microsecond">>, + <<"+08:00">>, + <<"%Y-%m-%d %H-%M-%S.%6N">>, + <<"2024-03-01 10:30:38.333444">> + ]} + ], + lists:foreach( + fun({{DateTime3339, Unit}, DateToTsArgs}) -> + ?assertEqual( + calendar:rfc3339_to_system_time(DateTime3339, [{unit, Unit}]), + apply_func(date_to_unix_ts, DateToTsArgs), + "Failed on test: " ++ DateTime3339 ++ "/" ++ atom_to_list(Unit) + ) + end, + TestTab + ). + t_parse_date_errors(_) -> ?assertError( bad_formatter_or_date, @@ -1154,6 +1198,37 @@ t_parse_date_errors(_) -> bad_formatter_or_date, emqx_rule_funcs:date_to_unix_ts(second, <<"%y-%m-%d %H:%M:%S">>, <<"2022-05-26 10:40:12">>) ), + %% invalid formats + ?assertThrow( + {missing_date_part, month}, + emqx_rule_funcs:date_to_unix_ts( + second, <<"%Y-%d %H:%M:%S">>, <<"2022-32 10:40:12">> + ) + ), + ?assertThrow( + {missing_date_part, year}, + emqx_rule_funcs:date_to_unix_ts( + second, <<"%H:%M:%S">>, <<"10:40:12">> + ) + ), + ?assertError( + _, + emqx_rule_funcs:date_to_unix_ts( + second, <<"%Y-%m-%d %H:%M:%S">>, <<"2022-05-32 10:40:12">> + ) + ), + ?assertError( + _, + emqx_rule_funcs:date_to_unix_ts( + second, <<"%Y-%m-%d %H:%M:%S">>, <<"2023-02-29 10:40:12">> + ) + ), + ?assertError( + _, + emqx_rule_funcs:date_to_unix_ts( + second, <<"%Y-%m-%d %H:%M:%S">>, <<"2024-02-30 10:40:12">> + ) + ), %% Compatibility test %% UTC+0 @@ -1173,25 +1248,42 @@ t_parse_date_errors(_) -> emqx_rule_funcs:date_to_unix_ts(second, <<"%Y-%m-%d %H:%M:%S">>, <<"2022-05-26 10-40-12">>) ), - %% UTC+0 - UnixTsLeap0 = 1582986700, + %% leap year checks ?assertEqual( - UnixTsLeap0, - emqx_rule_funcs:date_to_unix_ts(second, <<"%Y-%m-%d %H:%M:%S">>, <<"2020-02-29 14:31:40">>) + %% UTC+0 + 1709217100, + emqx_rule_funcs:date_to_unix_ts(second, <<"%Y-%m-%d %H:%M:%S">>, <<"2024-02-29 14:31:40">>) ), - - %% UTC+0 - UnixTsLeap1 = 1709297071, ?assertEqual( - UnixTsLeap1, + %% UTC+0 + 1709297071, emqx_rule_funcs:date_to_unix_ts(second, <<"%Y-%m-%d %H:%M:%S">>, <<"2024-03-01 12:44:31">>) ), - - %% UTC+0 - UnixTsLeap2 = 1709535387, ?assertEqual( - UnixTsLeap2, - emqx_rule_funcs:date_to_unix_ts(second, <<"%Y-%m-%d %H:%M:%S">>, <<"2024-03-04 06:56:27">>) + %% UTC+0 + 4107588271, + emqx_rule_funcs:date_to_unix_ts(second, <<"%Y-%m-%d %H:%M:%S">>, <<"2100-03-01 12:44:31">>) + ), + ?assertEqual( + %% UTC+8 + 1709188300, + emqx_rule_funcs:date_to_unix_ts( + second, <<"+08:00">>, <<"%Y-%m-%d %H:%M:%S">>, <<"2024-02-29 14:31:40">> + ) + ), + ?assertEqual( + %% UTC+8 + 1709268271, + emqx_rule_funcs:date_to_unix_ts( + second, <<"+08:00">>, <<"%Y-%m-%d %H:%M:%S">>, <<"2024-03-01 12:44:31">> + ) + ), + ?assertEqual( + %% UTC+8 + 4107559471, + emqx_rule_funcs:date_to_unix_ts( + second, <<"+08:00">>, <<"%Y-%m-%d %H:%M:%S">>, 
<<"2100-03-01 12:44:31">> + ) ), %% None zero zone shift with millisecond level precision diff --git a/apps/emqx_utils/src/emqx_utils_calendar.erl b/apps/emqx_utils/src/emqx_utils_calendar.erl index a3c1450cd..3c98c2828 100644 --- a/apps/emqx_utils/src/emqx_utils_calendar.erl +++ b/apps/emqx_utils/src/emqx_utils_calendar.erl @@ -22,7 +22,7 @@ formatter/1, format/3, format/4, - parse/3, + formatted_datetime_to_system_time/3, offset_second/1 ]). @@ -48,8 +48,9 @@ -define(DAYS_PER_YEAR, 365). -define(DAYS_PER_LEAP_YEAR, 366). -define(DAYS_FROM_0_TO_1970, 719528). --define(SECONDS_FROM_0_TO_1970, (?DAYS_FROM_0_TO_1970 * ?SECONDS_PER_DAY)). - +-define(DAYS_FROM_0_TO_10000, 2932897). +-define(SECONDS_FROM_0_TO_1970, ?DAYS_FROM_0_TO_1970 * ?SECONDS_PER_DAY). +-define(SECONDS_FROM_0_TO_10000, (?DAYS_FROM_0_TO_10000 * ?SECONDS_PER_DAY)). %% the maximum value is the SECONDS_FROM_0_TO_10000 in the calendar.erl, %% here minus SECONDS_PER_DAY to tolerate timezone time offset, %% so the maximum date can reach 9999-12-31 which is ample. @@ -171,10 +172,10 @@ format(Time, Unit, Offset, FormatterBin) when is_binary(FormatterBin) -> format(Time, Unit, Offset, Formatter) -> do_format(Time, time_unit(Unit), offset_second(Offset), Formatter). -parse(DateStr, Unit, FormatterBin) when is_binary(FormatterBin) -> - parse(DateStr, Unit, formatter(FormatterBin)); -parse(DateStr, Unit, Formatter) -> - do_parse(DateStr, Unit, Formatter). +formatted_datetime_to_system_time(DateStr, Unit, FormatterBin) when is_binary(FormatterBin) -> + formatted_datetime_to_system_time(DateStr, Unit, formatter(FormatterBin)); +formatted_datetime_to_system_time(DateStr, Unit, Formatter) -> + do_formatted_datetime_to_system_time(DateStr, Unit, Formatter). %%-------------------------------------------------------------------- %% Time unit @@ -467,56 +468,51 @@ padding(Data, _Len) -> Data. %%-------------------------------------------------------------------- -%% internal: parse part +%% internal: formatted_datetime_to_system_time part %%-------------------------------------------------------------------- -do_parse(DateStr, Unit, Formatter) -> +do_formatted_datetime_to_system_time(DateStr, Unit, Formatter) -> DateInfo = do_parse_date_str(DateStr, Formatter, #{}), - {Precise, PrecisionUnit} = precision(DateInfo), - Counter = - fun - (year, V, Res) -> - Res + dy(V) * ?SECONDS_PER_DAY * Precise - (?SECONDS_FROM_0_TO_1970 * Precise); - (month, V, Res) -> - Dm = dym(maps:get(year, DateInfo, 0), V), - Res + Dm * ?SECONDS_PER_DAY * Precise; - (day, V, Res) -> - Res + (V * ?SECONDS_PER_DAY * Precise); - (hour, V, Res) -> - Res + (V * ?SECONDS_PER_HOUR * Precise); - (minute, V, Res) -> - Res + (V * ?SECONDS_PER_MINUTE * Precise); - (second, V, Res) -> - Res + V * Precise; - (millisecond, V, Res) -> - case PrecisionUnit of - millisecond -> - Res + V; - microsecond -> - Res + (V * 1000); - nanosecond -> - Res + (V * 1000000) - end; - (microsecond, V, Res) -> - case PrecisionUnit of - microsecond -> - Res + V; - nanosecond -> - Res + (V * 1000) - end; - (nanosecond, V, Res) -> - Res + V; - (parsed_offset, V, Res) -> - Res - V * Precise - end, - Count = maps:fold(Counter, 0, DateInfo) - (?SECONDS_PER_DAY * Precise), - erlang:convert_time_unit(Count, PrecisionUnit, Unit). 
+    PrecisionUnit = precision(DateInfo),
+    ToPrecisionUnit = fun(Time, FromUnit) ->
+        erlang:convert_time_unit(Time, FromUnit, PrecisionUnit)
+    end,
+    GetRequiredPart = fun(Key) ->
+        case maps:get(Key, DateInfo, undefined) of
+            undefined -> throw({missing_date_part, Key});
+            Value -> Value
+        end
+    end,
+    GetOptionalPart = fun(Key) -> maps:get(Key, DateInfo, 0) end,
+    Year = GetRequiredPart(year),
+    Month = GetRequiredPart(month),
+    Day = GetRequiredPart(day),
+    Hour = GetRequiredPart(hour),
+    Min = GetRequiredPart(minute),
+    Sec = GetRequiredPart(second),
+    DateTime = {{Year, Month, Day}, {Hour, Min, Sec}},
+    TotalSecs = datetime_to_system_time(DateTime) - GetOptionalPart(parsed_offset),
+    check(TotalSecs, DateStr, Unit),
+    TotalTime =
+        ToPrecisionUnit(TotalSecs, second) +
+            ToPrecisionUnit(GetOptionalPart(millisecond), millisecond) +
+            ToPrecisionUnit(GetOptionalPart(microsecond), microsecond) +
+            ToPrecisionUnit(GetOptionalPart(nanosecond), nanosecond),
+    erlang:convert_time_unit(TotalTime, PrecisionUnit, Unit).

-precision(#{nanosecond := _}) -> {1000_000_000, nanosecond};
-precision(#{microsecond := _}) -> {1000_000, microsecond};
-precision(#{millisecond := _}) -> {1000, millisecond};
-precision(#{second := _}) -> {1, second};
-precision(_) -> {1, second}.
+check(Secs, _, _) when Secs >= -?SECONDS_FROM_0_TO_1970, Secs < ?SECONDS_FROM_0_TO_10000 ->
+    ok;
+check(_Secs, DateStr, Unit) ->
+    throw({bad_format, #{date_string => DateStr, to_unit => Unit}}).
+
+datetime_to_system_time(DateTime) ->
+    calendar:datetime_to_gregorian_seconds(DateTime) - ?SECONDS_FROM_0_TO_1970.
+
+precision(#{nanosecond := _}) -> nanosecond;
+precision(#{microsecond := _}) -> microsecond;
+precision(#{millisecond := _}) -> millisecond;
+precision(#{second := _}) -> second;
+precision(_) -> second.

 do_parse_date_str(<<>>, _, Result) ->
     Result;
diff --git a/changes/ce/fix-12668.en.md b/changes/ce/fix-12668.en.md
new file mode 100644
index 000000000..c8ff95f9e
--- /dev/null
+++ b/changes/ce/fix-12668.en.md
@@ -0,0 +1,2 @@
+Refactor the SQL function `date_to_unix_ts()` to use `calendar:datetime_to_gregorian_seconds/1`.
+This change also adds validation for the input date format.

From 29111a2192c1c26bf2d4752a5e0c2d31326de470 Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Fri, 8 Mar 2024 14:25:19 +0800
Subject: [PATCH 17/82] fix: dialyzer problems

---
 apps/emqx_utils/src/emqx_utils_calendar.erl | 21 ---------------------
 1 file changed, 21 deletions(-)

diff --git a/apps/emqx_utils/src/emqx_utils_calendar.erl b/apps/emqx_utils/src/emqx_utils_calendar.erl
index 3c98c2828..b9da2bfd5 100644
--- a/apps/emqx_utils/src/emqx_utils_calendar.erl
+++ b/apps/emqx_utils/src/emqx_utils_calendar.erl
@@ -560,27 +560,6 @@ date_size(timezone) -> 5;
 date_size(timezone1) -> 6;
 date_size(timezone2) -> 9.
 
-dym(Y, M) ->
-    case is_leap_year(Y) of
-        true when M > 2 ->
-            dm(M) + 1;
-        _ ->
-            dm(M)
-    end.
-
-dm(1) -> 0;
-dm(2) -> 31;
-dm(3) -> 59;
-dm(4) -> 90;
-dm(5) -> 120;
-dm(6) -> 151;
-dm(7) -> 181;
-dm(8) -> 212;
-dm(9) -> 243;
-dm(10) -> 273;
-dm(11) -> 304;
-dm(12) -> 334.
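%% The dym/dm month-offset tables above were left unreferenced by the
%% gregorian-seconds rewrite, hence the dialyzer warning this patch fixes.
%% A condensed sketch of the resulting behavior, with values copied from the
%% new suite cases earlier in the series (the catch wrapper is illustrative
%% only, not taken from any suite):
%%
%%   1709217100 = emqx_rule_funcs:date_to_unix_ts(
%%       second, <<"%Y-%m-%d %H:%M:%S">>, <<"2024-02-29 14:31:40">>
%%   ),
%%   {missing_date_part, year} =
%%       (catch emqx_rule_funcs:date_to_unix_ts(
%%           second, <<"%H:%M:%S">>, <<"10:40:12">>
%%       )).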
-
 str_to_int_or_error(Str, Error) ->
     case string:to_integer(Str) of
         {Int, []} ->

From e0b64190abec1b5fa3a650ba9d81d8d46e39d6dd Mon Sep 17 00:00:00 2001
From: zmstone
Date: Tue, 5 Mar 2024 15:16:49 +0100
Subject: [PATCH 18/82] chore: rename DB_ROLE to ROLE

---
 bin/emqx | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bin/emqx b/bin/emqx
index c7ec11c3b..4699bf880 100755
--- a/bin/emqx
+++ b/bin/emqx
@@ -986,7 +986,7 @@ if [[ "$IS_BOOT_COMMAND" == 'yes' && "$(get_boot_config 'node.db_backend')" == "
     if ! (echo -e "$COMPATIBILITY_INFO" | $GREP -q 'MNESIA_OK'); then
         logwarn "DB Backend is RLOG, but an incompatible OTP version has been detected. Falling back to using Mnesia DB backend."
         export EMQX_NODE__DB_BACKEND=mnesia
-        export EMQX_NODE__DB_ROLE=core
+        export EMQX_NODE__ROLE=core
     fi
 fi

From 62ebcd71ef7a575f5ce8fa2bb7d76a40b80b1e2d Mon Sep 17 00:00:00 2001
From: zmstone
Date: Fri, 8 Mar 2024 11:28:51 +0100
Subject: [PATCH 19/82] fix: load cluster.hocon when generating app.