diff --git a/.ci/docker-compose-file/kafka/kafka-entrypoint.sh b/.ci/docker-compose-file/kafka/kafka-entrypoint.sh index 336a78e74..987bfbccd 100755 --- a/.ci/docker-compose-file/kafka/kafka-entrypoint.sh +++ b/.ci/docker-compose-file/kafka/kafka-entrypoint.sh @@ -49,6 +49,9 @@ echo "+++++++ Creating Kafka Topics ++++++++" # there seem to be a race condition when creating the topics (too early) env KAFKA_CREATE_TOPICS="$KAFKA_CREATE_TOPICS_NG" KAFKA_PORT="$PORT1" create-topics.sh +# create a topic with max.message.bytes=100 +/opt/kafka/bin/kafka-topics.sh --create --bootstrap-server "${SERVER}:${PORT1}" --topic max-100-bytes --partitions 1 --replication-factor 1 --config max.message.bytes=100 + echo "+++++++ Wait until Kafka ports are down ++++++++" bash -c 'while printf "" 2>>/dev/null >>/dev/tcp/$0/$1; do sleep 1; done' $SERVER $PORT1 diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index 8b0afa0b2..ae4aab097 100644 --- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -85,9 +85,11 @@ end_per_testcase(TestCase, Config) when Nodes = ?config(nodes, Config), emqx_common_test_helpers:call_janitor(60_000), ok = emqx_cth_cluster:stop(Nodes), + snabbkaffe:stop(), ok; end_per_testcase(_TestCase, _Config) -> emqx_common_test_helpers:call_janitor(60_000), + snabbkaffe:stop(), ok. %%------------------------------------------------------------------------------ diff --git a/apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl b/apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl index 4f01ab34a..570eb42ce 100644 --- a/apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl +++ b/apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl @@ -30,15 +30,28 @@ -define(PATH, [authentication]). +-import(emqx_common_test_helpers, [on_exit/1]). + all() -> + AllTCs = emqx_common_test_helpers:all(?MODULE), + TCs = AllTCs -- require_seeds_tests(), [ - {group, require_seeds}, - t_update_with_invalid_config, - t_update_with_bad_config_value + {group, require_seeds} + | TCs ]. groups() -> - [{require_seeds, [], [t_create, t_authenticate, t_update, t_destroy, t_is_superuser]}]. + [{require_seeds, [], require_seeds_tests()}]. + +require_seeds_tests() -> + [ + t_create, + t_authenticate, + t_authenticate_disabled_prepared_statements, + t_update, + t_destroy, + t_is_superuser + ]. init_per_testcase(_, Config) -> emqx_authn_test_lib:delete_authenticators( @@ -47,6 +60,10 @@ init_per_testcase(_, Config) -> ), Config. +end_per_testcase(_TestCase, _Config) -> + emqx_common_test_helpers:call_janitor(), + ok. + init_per_group(require_seeds, Config) -> ok = init_seeds(), Config. @@ -70,7 +87,12 @@ init_per_suite(Config) -> ), [{apps, Apps} | Config]; false -> - {skip, no_pgsql} + case os:getenv("IS_CI") of + "yes" -> + throw(no_postgres); + _ -> + {skip, no_postgres} + end end. end_per_suite(Config) -> @@ -174,6 +196,25 @@ test_user_auth(#{ ?GLOBAL ). 
+t_authenticate_disabled_prepared_statements(Config) -> + ResConfig = maps:merge(pgsql_config(), #{disable_prepared_statements => true}), + {ok, _} = emqx_resource:recreate_local(?PGSQL_RESOURCE, emqx_postgresql, ResConfig), + on_exit(fun() -> + emqx_resource:recreate_local(?PGSQL_RESOURCE, emqx_postgresql, pgsql_config()) + end), + ok = lists:foreach( + fun(Sample0) -> + Sample = maps:update_with( + config_params, + fun(Cfg) -> Cfg#{<<"disable_prepared_statements">> => true} end, + Sample0 + ), + ct:pal("test_user_auth sample: ~p", [Sample]), + test_user_auth(Sample) + end, + user_seeds() + ). + t_destroy(_Config) -> AuthConfig = raw_pgsql_auth_config(), diff --git a/apps/emqx_bridge_azure_event_hub/rebar.config b/apps/emqx_bridge_azure_event_hub/rebar.config index 5236d9a0e..a0cc8def3 100644 --- a/apps/emqx_bridge_azure_event_hub/rebar.config +++ b/apps/emqx_bridge_azure_event_hub/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.5"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, diff --git a/apps/emqx_bridge_confluent/rebar.config b/apps/emqx_bridge_confluent/rebar.config index a3997ada7..f518c8d4f 100644 --- a/apps/emqx_bridge_confluent/rebar.config +++ b/apps/emqx_bridge_confluent/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.5"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, diff --git a/apps/emqx_bridge_kafka/rebar.config b/apps/emqx_bridge_kafka/rebar.config index 78569b321..500c5a394 100644 --- a/apps/emqx_bridge_kafka/rebar.config +++ b/apps/emqx_bridge_kafka/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.5"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index a01d2edf4..c266f14c2 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -490,12 +490,17 @@ do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) -> %% Wolff producer never gives up retrying %% so there can only be 'ok' results. 
on_kafka_ack(_Partition, Offset, {ReplyFn, Args}) when is_integer(Offset) -> - %% the ReplyFn is emqx_resource_buffer_worker:handle_async_reply/2 + %% the ReplyFn is emqx_rule_runtime:inc_action_metrics/2 apply(ReplyFn, Args ++ [ok]); on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) -> - %% wolff should bump the dropped_queue_full counter - %% do not apply the callback (which is basically to bump success or fail counter) - ok. + %% wolff should bump the dropped_queue_full counter in handle_telemetry_event/4 + %% so there is no need to apply the callback here + ok; +on_kafka_ack(_Partition, message_too_large, {ReplyFn, Args}) -> + %% wolff should bump the message 'dropped' counter with handle_telemetry_event/4. + %% however 'dropped' is not mapped to EMQX metrics name + %% so we reply error here + apply(ReplyFn, Args ++ [{error, message_too_large}]). %% Note: since wolff client has its own replayq that is not managed by %% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here. Otherwise, diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index 05f73684b..3caa712ed 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -185,6 +185,10 @@ action_config(ConnectorName, Overrides) -> emqx_utils_maps:deep_merge(Cfg1, Overrides). bridge_v2_config(ConnectorName) -> + KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), + bridge_v2_config(ConnectorName, KafkaTopic). + +bridge_v2_config(ConnectorName, KafkaTopic) -> #{ <<"connector">> => ConnectorName, <<"enable">> => true, @@ -209,9 +213,7 @@ bridge_v2_config(ConnectorName) -> <<"query_mode">> => <<"sync">>, <<"required_acks">> => <<"all_isr">>, <<"sync_query_timeout">> => <<"5s">>, - <<"topic">> => list_to_binary( - emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition() - ) + <<"topic">> => list_to_binary(KafkaTopic) }, <<"local_topic">> => <<"kafka_t/#">>, <<"resource_opts">> => #{ @@ -378,6 +380,28 @@ t_local_topic(_) -> ok = emqx_connector:remove(?TYPE, test_connector), ok. +t_message_too_large(_) -> + BridgeV2Config = bridge_v2_config(<<"test_connector4">>, "max-100-bytes"), + ConnectorConfig = connector_config(), + {ok, _} = emqx_connector:create(?TYPE, test_connector4, ConnectorConfig), + BridgeName = test_bridge4, + {ok, _} = emqx_bridge_v2:create(?TYPE, BridgeName, BridgeV2Config), + BridgeV2Id = emqx_bridge_v2:id(?TYPE, BridgeName), + TooLargePayload = iolist_to_binary(lists:duplicate(100, 100)), + ?assertEqual(0, emqx_resource_metrics:failed_get(BridgeV2Id)), + emqx:publish(emqx_message:make(<<"kafka_t/hej">>, TooLargePayload)), + ?retry( + _Sleep0 = 50, + _Attempts0 = 100, + begin + ?assertEqual(1, emqx_resource_metrics:failed_get(BridgeV2Id)), + ok + end + ), + ok = emqx_bridge_v2:remove(?TYPE, BridgeName), + ok = emqx_connector:remove(?TYPE, test_connector4), + ok. 
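For context, the test above relies on the ack-to-metrics mapping implemented in the new `on_kafka_ack/3` clauses earlier in this diff. An informal summary (counter names as used by `emqx_resource_metrics`):

    %% ack from wolff                   callback applied                                  counter bumped
    %% {_, Offset} (integer)        ->  ReplyFn(Args ++ [ok])                         ->  success
    %% buffer_overflow_discarded    ->  none (wolff telemetry handles it)             ->  dropped.queue_full
    %% message_too_large            ->  ReplyFn(Args ++ [{error, message_too_large}]) ->  failed (asserted above)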
+ t_unknown_topic(_Config) -> ConnectorName = <<"test_connector">>, BridgeName = <<"test_bridge">>, diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl index e2f5ac868..faa470bc6 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl @@ -601,7 +601,7 @@ t_simple_sql_query(Config) -> {ok, _}, create_bridge(Config) ), - Request = {sql, <<"SELECT count(1) AS T">>}, + Request = {query, <<"SELECT count(1) AS T">>}, Result = case QueryMode of sync -> @@ -651,7 +651,7 @@ t_bad_sql_parameter(Config) -> {ok, _}, create_bridge(Config) ), - Request = {sql, <<"">>, [bad_parameter]}, + Request = {query, <<"">>, [bad_parameter]}, Result = case QueryMode of sync -> diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl index 31419daf9..82efd609e 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_v2_pgsql_SUITE.erl @@ -102,6 +102,18 @@ init_per_group(Group, Config) when {connector_type, group_to_type(Group)} | Config ]; +init_per_group(batch_enabled, Config) -> + [ + {batch_size, 10}, + {batch_time, <<"10ms">>} + | Config + ]; +init_per_group(batch_disabled, Config) -> + [ + {batch_size, 1}, + {batch_time, <<"0ms">>} + | Config + ]; init_per_group(_Group, Config) -> Config. @@ -262,16 +274,66 @@ t_start_action_or_source_with_disabled_connector(Config) -> ok. t_disable_prepared_statements(matrix) -> - [[postgres], [timescale], [matrix]]; + [ + [postgres, batch_disabled], + [postgres, batch_enabled], + [timescale, batch_disabled], + [timescale, batch_enabled], + [matrix, batch_disabled], + [matrix, batch_enabled] + ]; t_disable_prepared_statements(Config0) -> + BatchSize = ?config(batch_size, Config0), + BatchTime = ?config(batch_time, Config0), ConnectorConfig0 = ?config(connector_config, Config0), ConnectorConfig = maps:merge(ConnectorConfig0, #{<<"disable_prepared_statements">> => true}), - Config = lists:keyreplace(connector_config, 1, Config0, {connector_config, ConnectorConfig}), - ok = emqx_bridge_v2_testlib:t_sync_query( - Config, - fun make_message/0, - fun(Res) -> ?assertMatch({ok, _}, Res) end, - postgres_bridge_connector_on_query_return + BridgeConfig0 = ?config(bridge_config, Config0), + BridgeConfig = emqx_utils_maps:deep_merge( + BridgeConfig0, + #{ + <<"resource_opts">> => #{ + <<"batch_size">> => BatchSize, + <<"batch_time">> => BatchTime, + <<"query_mode">> => <<"async">> + } + } + ), + Config1 = lists:keyreplace(connector_config, 1, Config0, {connector_config, ConnectorConfig}), + Config = lists:keyreplace(bridge_config, 1, Config1, {bridge_config, BridgeConfig}), + ?check_trace( + #{timetrap => 5_000}, + begin + ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge_api(Config)), + RuleTopic = <<"t/postgres">>, + Type = ?config(bridge_type, Config), + {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(Type, RuleTopic, Config), + ResourceId = emqx_bridge_v2_testlib:resource_id(Config), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + {ok, C} = emqtt:start_link(), + {ok, _} = emqtt:connect(C), + lists:foreach( + fun(N) -> + emqtt:publish(C, RuleTopic, integer_to_binary(N)) + end, + lists:seq(1, BatchSize) + ), + case BatchSize > 1 of + true -> + ?block_until(#{ + ?snk_kind := 
"postgres_success_batch_result", + row_count := BatchSize + }), + ok; + false -> + ok + end, + ok + end, + [] ), emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), ok = emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}), diff --git a/apps/emqx_durable_storage/src/emqx_ds_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_lts.erl index f9f541b8a..be13591e6 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_lts.erl @@ -20,6 +20,7 @@ -export([ trie_create/1, trie_create/0, destroy/1, + trie_dump/2, trie_restore/2, trie_update/2, trie_copy_learned_paths/2, @@ -76,6 +77,8 @@ static_key_size => pos_integer() }. +-type dump() :: [{_Key, _Val}]. + -record(trie, { persist :: persist_callback(), static_key_size :: pos_integer(), @@ -125,12 +128,12 @@ destroy(#trie{trie = Trie, stats = Stats}) -> ok. %% @doc Restore trie from a dump --spec trie_restore(options(), [{_Key, _Val}]) -> trie(). +-spec trie_restore(options(), dump()) -> trie(). trie_restore(Options, Dump) -> trie_update(trie_create(Options), Dump). %% @doc Update a trie with a dump of operations (used for replication) --spec trie_update(trie(), [{_Key, _Val}]) -> trie(). +-spec trie_update(trie(), dump()) -> trie(). trie_update(Trie, Dump) -> lists:foreach( fun({{StateFrom, Token}, StateTo}) -> @@ -140,14 +143,23 @@ trie_update(Trie, Dump) -> ), Trie. +-spec trie_dump(trie(), _Filter :: all | wildcard) -> dump(). +trie_dump(Trie, Filter) -> + case Filter of + all -> + Fun = fun(_) -> true end; + wildcard -> + Fun = fun contains_wildcard/1 + end, + lists:append([P || P <- paths(Trie), Fun(P)]). + -spec trie_copy_learned_paths(trie(), trie()) -> trie(). trie_copy_learned_paths(OldTrie, NewTrie) -> - WildcardPaths = [P || P <- paths(OldTrie), contains_wildcard(P)], lists:foreach( fun({{StateFrom, Token}, StateTo}) -> trie_insert(NewTrie, StateFrom, Token, StateTo) end, - lists:flatten(WildcardPaths) + trie_dump(OldTrie, wildcard) ), NewTrie. diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index e940f2514..c978e416f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -25,7 +25,7 @@ %% behavior callbacks: -export([ - create/4, + create/5, open/5, drop/5, prepare_batch/4, @@ -37,7 +37,6 @@ update_iterator/4, next/6, delete_next/6, - post_creation_actions/1, handle_event/4 ]). @@ -179,10 +178,11 @@ emqx_ds_storage_layer:shard_id(), rocksdb:db_handle(), emqx_ds_storage_layer:gen_id(), - options() + options(), + _PrevGeneration :: s() | undefined ) -> {schema(), emqx_ds_storage_layer:cf_refs()}. 
-create(_ShardId, DBHandle, GenId, Options) -> +create(_ShardId, DBHandle, GenId, Options, SPrev) -> %% Get options: BitsPerTopicLevel = maps:get(bits_per_wildcard_level, Options, 64), TopicIndexBytes = maps:get(topic_index_bytes, Options, 4), @@ -193,6 +193,14 @@ create(_ShardId, DBHandle, GenId, Options) -> TrieCFName = trie_cf(GenId), {ok, DataCFHandle} = rocksdb:create_column_family(DBHandle, DataCFName, []), {ok, TrieCFHandle} = rocksdb:create_column_family(DBHandle, TrieCFName, []), + case SPrev of + #s{trie = TriePrev} -> + ok = copy_previous_trie(DBHandle, TrieCFHandle, TriePrev), + ?tp(bitfield_lts_inherited_trie, #{}), + ok; + undefined -> + ok + end, %% Create schema: Schema = #{ bits_per_wildcard_level => BitsPerTopicLevel, @@ -241,20 +249,6 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) -> gvars = ets:new(?MODULE, [public, set, {read_concurrency, true}]) }. --spec post_creation_actions(emqx_ds_storage_layer:post_creation_context()) -> - s(). -post_creation_actions( - #{ - new_gen_runtime_data := NewGenData, - old_gen_runtime_data := OldGenData - } -) -> - #s{trie = OldTrie} = OldGenData, - #s{trie = NewTrie0} = NewGenData, - NewTrie = copy_previous_trie(OldTrie, NewTrie0), - ?tp(bitfield_lts_inherited_trie, #{}), - NewGenData#s{trie = NewTrie}. - -spec drop( emqx_ds_storage_layer:shard_id(), rocksdb:db_handle(), @@ -905,9 +899,19 @@ restore_trie(TopicIndexBytes, DB, CF) -> rocksdb:iterator_close(IT) end. --spec copy_previous_trie(emqx_ds_lts:trie(), emqx_ds_lts:trie()) -> emqx_ds_lts:trie(). -copy_previous_trie(OldTrie, NewTrie) -> - emqx_ds_lts:trie_copy_learned_paths(OldTrie, NewTrie). +-spec copy_previous_trie(rocksdb:db_handle(), rocksdb:cf_handle(), emqx_ds_lts:trie()) -> + ok. +copy_previous_trie(DB, TrieCF, TriePrev) -> + {ok, Batch} = rocksdb:batch(), + lists:foreach( + fun({Key, Val}) -> + ok = rocksdb:batch_put(Batch, TrieCF, term_to_binary(Key), term_to_binary(Val)) + end, + emqx_ds_lts:trie_dump(TriePrev, wildcard) + ), + Result = rocksdb:write_batch(DB, Batch, []), + rocksdb:release_batch(Batch), + Result. read_persisted_trie(IT, {ok, KeyB, ValB}) -> [ diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index f35792c17..47fe047fc 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -69,7 +69,6 @@ shard_id/0, options/0, prototype/0, - post_creation_context/0, cooked_batch/0 ]). @@ -169,11 +168,14 @@ until := emqx_ds:time() | undefined }. +%% Module-specific runtime data, as instantiated by `Mod:open/5` callback function. +-type generation_data() :: term(). + %% Schema for a generation. Persistent term. -type generation_schema() :: generation(term()). %% Runtime view of generation: --type generation() :: generation(term()). +-type generation() :: generation(generation_data()). %%%% Shard: @@ -194,38 +196,32 @@ -type options() :: map(). --type post_creation_context() :: - #{ - shard_id := emqx_ds_storage_layer:shard_id(), - db := rocksdb:db_handle(), - new_gen_id := emqx_ds_storage_layer:gen_id(), - old_gen_id := emqx_ds_storage_layer:gen_id(), - new_cf_refs := cf_refs(), - old_cf_refs := cf_refs(), - new_gen_runtime_data := _NewData, - old_gen_runtime_data := _OldData - }. 
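The `_PrevGeneration` argument added to `create/5` above replaces the removed `post_creation_actions/1` callback: inheritance now happens at creation time, against the previous generation's runtime data. A layout module with nothing to inherit can simply ignore the argument; a minimal sketch in the spirit of `emqx_ds_storage_reference` (shown later in this diff; `data_cf/1` stands for that module's column-family naming helper, and the empty schema is illustrative):

    create(_ShardId, DBHandle, GenId, _Options, _SPrev) ->
        CFName = data_cf(GenId),
        {ok, CFHandle} = rocksdb:create_column_family(DBHandle, CFName, []),
        {_Schema = #{}, [{CFName, CFHandle}]}.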
- %%================================================================================ %% Generation callbacks %%================================================================================ %% Create the new schema given generation id and the options. %% Create rocksdb column families. --callback create(shard_id(), rocksdb:db_handle(), gen_id(), Options :: map()) -> +-callback create( + shard_id(), + rocksdb:db_handle(), + gen_id(), + Options :: map(), + generation_data() | undefined +) -> {_Schema, cf_refs()}. %% Open the existing schema -callback open(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) -> - _Data. + generation_data(). %% Delete the schema and data --callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _RuntimeData) -> +-callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), generation_data()) -> ok | {error, _Reason}. -callback prepare_batch( shard_id(), - _Data, + generation_data(), [{emqx_ds:time(), emqx_types:message()}, ...], emqx_ds:message_store_opts() ) -> @@ -233,34 +229,44 @@ -callback commit_batch( shard_id(), - _Data, + generation_data(), _CookedBatch ) -> ok | emqx_ds:error(_). --callback get_streams(shard_id(), _Data, emqx_ds:topic_filter(), emqx_ds:time()) -> +-callback get_streams( + shard_id(), generation_data(), emqx_ds:topic_filter(), emqx_ds:time() +) -> [_Stream]. --callback make_iterator(shard_id(), _Data, _Stream, emqx_ds:topic_filter(), emqx_ds:time()) -> +-callback make_iterator( + shard_id(), generation_data(), _Stream, emqx_ds:topic_filter(), emqx_ds:time() +) -> emqx_ds:make_iterator_result(_Iterator). -callback make_delete_iterator( - shard_id(), _Data, _DeleteStream, emqx_ds:topic_filter(), emqx_ds:time() + shard_id(), generation_data(), _DeleteStream, emqx_ds:topic_filter(), emqx_ds:time() ) -> emqx_ds:make_delete_iterator_result(_Iterator). --callback next(shard_id(), _Data, Iter, pos_integer(), emqx_ds:time(), _IsCurrent :: boolean()) -> +-callback next( + shard_id(), generation_data(), Iter, pos_integer(), emqx_ds:time(), _IsCurrent :: boolean() +) -> {ok, Iter, [emqx_types:message()]} | {ok, end_of_stream} | {error, _}. -callback delete_next( - shard_id(), _Data, DeleteIterator, emqx_ds:delete_selector(), pos_integer(), emqx_ds:time() + shard_id(), + generation_data(), + DeleteIterator, + emqx_ds:delete_selector(), + pos_integer(), + emqx_ds:time() ) -> {ok, DeleteIterator, _NDeleted :: non_neg_integer(), _IteratedOver :: non_neg_integer()}. --callback handle_event(shard_id(), _Data, emqx_ds:time(), CustomEvent | tick) -> [CustomEvent]. +-callback handle_event(shard_id(), generation_data(), emqx_ds:time(), CustomEvent | tick) -> + [CustomEvent]. --callback post_creation_actions(post_creation_context()) -> _Data. - --optional_callbacks([post_creation_actions/1, handle_event/4]). +-optional_callbacks([handle_event/4]). %%================================================================================ %% API for the replication layer @@ -686,42 +692,14 @@ open_shard(ShardId, DB, CFRefs, ShardSchema) -> server_state() | {error, overlaps_existing_generations}. 
handle_add_generation(S0, Since) -> #s{shard_id = ShardId, db = DB, schema = Schema0, shard = Shard0, cf_refs = CFRefs0} = S0, - - #{current_generation := OldGenId, prototype := {CurrentMod, _ModConf}} = Schema0, - OldKey = ?GEN_KEY(OldGenId), - #{OldKey := OldGenSchema} = Schema0, - #{cf_refs := OldCFRefs} = OldGenSchema, - #{OldKey := #{module := OldMod, data := OldGenData}} = Shard0, - Schema1 = update_last_until(Schema0, Since), Shard1 = update_last_until(Shard0, Since), - case Schema1 of _Updated = #{} -> - {GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema1, Since), + {GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema1, Shard0, Since), CFRefs = NewCFRefs ++ CFRefs0, Key = ?GEN_KEY(GenId), - Generation0 = - #{data := NewGenData0} = - open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)), - %% When the new generation's module is the same as the last one, we might want to - %% perform actions like inheriting some of the previous (meta)data. - NewGenData = - run_post_creation_actions( - #{ - shard_id => ShardId, - db => DB, - new_gen_id => GenId, - old_gen_id => OldGenId, - new_cf_refs => NewCFRefs, - old_cf_refs => OldCFRefs, - new_gen_runtime_data => NewGenData0, - old_gen_runtime_data => OldGenData, - new_module => CurrentMod, - old_module => OldMod - } - ), - Generation = Generation0#{data := NewGenData}, + Generation = open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)), Shard = Shard1#{current_generation := GenId, Key => Generation}, S0#s{ cf_refs = CFRefs, @@ -834,9 +812,28 @@ create_new_shard_schema(ShardId, DB, CFRefs, Prototype) -> -spec new_generation(shard_id(), rocksdb:db_handle(), shard_schema(), emqx_ds:time()) -> {gen_id(), shard_schema(), cf_refs()}. new_generation(ShardId, DB, Schema0, Since) -> + new_generation(ShardId, DB, Schema0, undefined, Since). + +-spec new_generation( + shard_id(), + rocksdb:db_handle(), + shard_schema(), + shard() | undefined, + emqx_ds:time() +) -> + {gen_id(), shard_schema(), cf_refs()}. +new_generation(ShardId, DB, Schema0, Shard0, Since) -> #{current_generation := PrevGenId, prototype := {Mod, ModConf}} = Schema0, + case Shard0 of + #{?GEN_KEY(PrevGenId) := #{module := Mod} = PrevGen} -> + %% When the new generation's module is the same as the last one, we might want + %% to perform actions like inheriting some of the previous (meta)data. + PrevRuntimeData = maps:get(data, PrevGen); + _ -> + PrevRuntimeData = undefined + end, GenId = next_generation_id(PrevGenId), - {GenData, NewCFRefs} = Mod:create(ShardId, DB, GenId, ModConf), + {GenData, NewCFRefs} = Mod:create(ShardId, DB, GenId, ModConf, PrevRuntimeData), GenSchema = #{ module => Mod, data => GenData, @@ -918,23 +915,6 @@ update_last_until(Schema = #{current_generation := GenId}, Until) -> {error, overlaps_existing_generations} end. -run_post_creation_actions( - #{ - new_module := Mod, - old_module := Mod, - new_gen_runtime_data := NewGenData - } = Context -) -> - case erlang:function_exported(Mod, post_creation_actions, 1) of - true -> - Mod:post_creation_actions(Context); - false -> - NewGenData - end; -run_post_creation_actions(#{new_gen_runtime_data := NewGenData}) -> - %% Different implementation modules - NewGenData. - handle_take_snapshot(#s{db = DB, shard_id = ShardId}) -> Name = integer_to_list(erlang:system_time(millisecond)), Dir = checkpoint_dir(ShardId, Name), @@ -1007,17 +987,17 @@ generation_get(Shard, GenId) -> -spec generations_since(shard_id(), emqx_ds:time()) -> [gen_id()]. 
generations_since(Shard, Since) -> - Schema = get_schema_runtime(Shard), - maps:fold( - fun - (?GEN_KEY(GenId), #{until := Until}, Acc) when Until >= Since -> - [GenId | Acc]; - (_K, _V, Acc) -> - Acc - end, - [], - Schema - ). + Schema = #{current_generation := Current} = get_schema_runtime(Shard), + list_generations_since(Schema, Current, Since). + +list_generations_since(Schema, GenId, Since) -> + case Schema of + #{?GEN_KEY(GenId) := #{until := Until}} when Until > Since -> + [GenId | list_generations_since(Schema, GenId - 1, Since)]; + #{} -> + %% No more live generations. + [] + end. format_state(#s{shard_id = ShardId, db = DB, cf_refs = CFRefs, schema = Schema, shard = Shard}) -> #{ diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index a22f3791a..b4c3ade3f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -28,7 +28,7 @@ %% behavior callbacks: -export([ - create/4, + create/5, open/5, drop/5, prepare_batch/4, @@ -88,7 +88,7 @@ %% behavior callbacks %%================================================================================ -create(_ShardId, DBHandle, GenId, _Options) -> +create(_ShardId, DBHandle, GenId, _Options, _SPrev) -> CFName = data_cf(GenId), {ok, CFHandle} = rocksdb:create_column_family(DBHandle, CFName, []), Schema = #schema{}, diff --git a/apps/emqx_durable_storage/src/emqx_durable_storage.app.src b/apps/emqx_durable_storage/src/emqx_durable_storage.app.src index 3551a40df..7a20577d4 100644 --- a/apps/emqx_durable_storage/src/emqx_durable_storage.app.src +++ b/apps/emqx_durable_storage/src/emqx_durable_storage.app.src @@ -2,7 +2,7 @@ {application, emqx_durable_storage, [ {description, "Message persistence and subscription replays for EMQX"}, % strict semver, bump manually! - {vsn, "0.2.0"}, + {vsn, "0.2.1"}, {modules, []}, {registered, []}, {applications, [kernel, stdlib, rocksdb, gproc, mria, ra, emqx_utils]}, diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index bb6d0f917..004096431 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -177,20 +177,33 @@ t_new_generation_inherit_trie(_Config) -> ?check_trace( begin %% Create a bunch of topics to be learned in the first generation - Timestamps = lists:seq(1, 10_000, 100), - Batch = [ - begin - Topic = emqx_topic:join(["wildcard", integer_to_binary(I), "suffix", Suffix]), - {TS, make_message(TS, Topic, integer_to_binary(TS))} - end + TS1 = 500, + Batch1 = [ + {TS1, make_message(TS1, make_topic([wildcard, I, suffix, Suffix]), bin(I))} || I <- lists:seq(1, 200), - TS <- Timestamps, Suffix <- [<<"foo">>, <<"bar">>] ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []), + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, []), %% Now we create a new generation with the same LTS module. It should inherit the %% learned trie. - ok = emqx_ds_storage_layer:add_generation(?SHARD, _Since = 1000), + ok = emqx_ds_storage_layer:add_generation(?SHARD, _Since = 1_000), + %% Restart the shard, to verify that LTS is persisted. + ok = application:stop(emqx_durable_storage), + ok = application:start(emqx_durable_storage), + ok = emqx_ds:open_db(?FUNCTION_NAME, ?DEFAULT_CONFIG), + %% Store a batch of messages with the same set of topics. 
+ TS2 = 1_500, + Batch2 = [ + {TS2, make_message(TS2, make_topic([wildcard, I, suffix, Suffix]), bin(I))} + || I <- lists:seq(1, 200), + Suffix <- [<<"foo">>, <<"bar">>] + ], + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []), + %% We should get only two streams for wildcard query, for "foo" and for "bar". + ?assertMatch( + [_Foo, _Bar], + emqx_ds_storage_layer:get_streams(?SHARD, [<<"wildcard">>, '#'], 1_000) + ), ok end, fun(Trace) -> @@ -211,10 +224,7 @@ t_replay(_Config) -> ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, []), %% Create wildcard topics `wildcard/+/suffix/foo' and `wildcard/+/suffix/bar': Batch2 = [ - begin - Topic = emqx_topic:join(["wildcard", integer_to_list(I), "suffix", Suffix]), - {TS, make_message(TS, Topic, integer_to_binary(TS))} - end + {TS, make_message(TS, make_topic([wildcard, I, suffix, Suffix]), bin(TS))} || I <- lists:seq(1, 200), TS <- Timestamps, Suffix <- [<<"foo">>, <<"bar">>] ], ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []), @@ -475,6 +485,9 @@ make_message(PublishedAt, Topic, Payload) when is_binary(Topic) -> payload = Payload }. +make_topic(Tokens = [_ | _]) -> + emqx_topic:join([bin(T) || T <- Tokens]). + payloads(Messages) -> lists:map( fun(#message{payload = P}) -> @@ -488,6 +501,9 @@ parse_topic(Topic = [L | _]) when is_binary(L); is_atom(L) -> parse_topic(Topic) -> emqx_topic:words(iolist_to_binary(Topic)). +bin(X) -> + emqx_utils_conv:bin(X). + %% CT callbacks all() -> emqx_common_test_helpers:all(?MODULE). diff --git a/apps/emqx_machine/priv/reboot_lists.eterm b/apps/emqx_machine/priv/reboot_lists.eterm index d9bcf9d25..62d357c19 100644 --- a/apps/emqx_machine/priv/reboot_lists.eterm +++ b/apps/emqx_machine/priv/reboot_lists.eterm @@ -117,6 +117,7 @@ emqx_bridge_oracle, emqx_bridge_rabbitmq, emqx_bridge_azure_event_hub, + emqx_s3, emqx_bridge_s3, emqx_bridge_azure_blob_storage, emqx_schema_registry, diff --git a/apps/emqx_machine/src/emqx_machine.app.src b/apps/emqx_machine/src/emqx_machine.app.src index 228d69463..cf556a2d1 100644 --- a/apps/emqx_machine/src/emqx_machine.app.src +++ b/apps/emqx_machine/src/emqx_machine.app.src @@ -6,7 +6,7 @@ {vsn, "0.3.1"}, {modules, []}, {registered, []}, - {applications, [kernel, stdlib, emqx_ctl]}, + {applications, [kernel, stdlib, emqx_ctl, redbug]}, {mod, {emqx_machine_app, []}}, {env, []}, {licenses, ["Apache-2.0"]}, diff --git a/apps/emqx_machine/src/user_default.erl b/apps/emqx_machine/src/user_default.erl index d98b4ac9f..4235afcc3 100644 --- a/apps/emqx_machine/src/user_default.erl +++ b/apps/emqx_machine/src/user_default.erl @@ -19,21 +19,191 @@ %% Import all the record definitions from the header file into the erlang shell. -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx_conf/include/emqx_conf.hrl"). -include_lib("emqx_dashboard/include/emqx_dashboard.hrl"). %% INCLUDE END +-define(TIME, 3 * 60). +-define(MESSAGE, 512). +-define(GREEN, <<"\e[32;1m">>). +-define(RED, <<"\e[31m">>). +-define(RESET, <<"\e[0m">>). + %% API -export([lock/0, unlock/0]). --export([t/1, t2/1, t/2, t2/2, t/3, t2/3]). +-export([trace/0, t/0, t/1, t/2, t_msg/0, t_msg/1, t_stop/0]). + +-dialyzer({nowarn_function, start_trace/3}). +-dialyzer({no_return, [t/0, t/1, t/2]}). lock() -> emqx_restricted_shell:lock(). unlock() -> emqx_restricted_shell:unlock(). -t(M) -> recon_trace:calls({M, '_', return_trace}, 300). 
-t2(M) -> recon_trace:calls({M, '_', return_trace}, 300, [{args, arity}]).
-t(M, F) -> recon_trace:calls({M, F, return_trace}, 300).
-t2(M, F) -> recon_trace:calls({M, F, return_trace}, 300, [{args, arity}]).
-t(M, F, A) -> recon_trace:calls({M, F, A}, 300).
-t2(M, F, A) -> recon_trace:calls({M, F, A}, 300, [{args, arity}]).
+trace() ->
+    ?ULOG("Trace Usage:~n", []),
+    ?ULOG("  --------------------------------------------------~n", []),
+    ?ULOG("  t(Mod, Func) -> trace a specific function.~n", []),
+    ?ULOG("  t(RTPs)      -> trace using Redbug Trace Patterns.~n", []),
+    ?ULOG("       eg1: t(\"emqx_hooks:run\").~n", []),
+    ?ULOG("       eg2: t(\"emqx_hooks:run/2\").~n", []),
+    ?ULOG("       eg3: t(\"emqx_hooks:run/2 -> return\").~n", []),
+    ?ULOG(
+        "       eg4: t(\"emqx_hooks:run('message.dropped',[_, #{node := N}, _])"
+        " when N =:= 'emqx@127.0.0.1' -> stack,return\").~n",
+        []
+    ),
+    ?ULOG("  t()          -> prompt for the RTPs interactively.~n", []),
+    ?ULOG("  --------------------------------------------------~n", []),
+    ?ULOG("  t_msg(PidOrRegName)   -> trace a pid's/registered name's messages.~n", []),
+    ?ULOG("  t_msg([Pid, RegName]) -> trace a list of processes' messages.~n", []),
+    ?ULOG("  t_msg()               -> prompt for the pids interactively.~n", []),
+    ?ULOG("  --------------------------------------------------~n", []),
+    ?ULOG("  t_stop()              -> stop the running trace.~n", []),
+    ?ULOG("  --------------------------------------------------~n", []),
+    ok.
+
+t_stop() ->
+    ensure_redbug_stop().
+
+t() ->
+    {M, F} = get_rtp_fun(),
+    t(M, F).
+
+t(M) ->
+    t(M, "").
+
+t(M, F) ->
+    ensure_redbug_stop(),
+    RTP = format_rtp(emqx_utils_conv:str(M), emqx_utils_conv:str(F)),
+    Pids = get_procs(erlang:system_info(process_count)),
+    Options = [{time, ?TIME * 1000}, {msgs, ?MESSAGE}, debug, {procs, Pids}],
+    start_trace(RTP, Options, Pids).
+
+t_msg() ->
+    ?ULOG("Tracing specific pids' send/receive messages:~n", []),
+    Pids = get_pids(),
+    t_msg(Pids).
+
+t_msg([]) ->
+    exit("procs can't be empty");
+t_msg(Pids) when is_list(Pids) ->
+    ensure_redbug_stop(),
+    Options = [{time, ?TIME * 1000}, {msgs, ?MESSAGE}, {procs, Pids}],
+    start_trace(['send', 'receive'], Options, Pids);
+t_msg(Pid) ->
+    t_msg([Pid]).
+
+start_trace(RTP, Options, Pids) ->
+    info("~nredbug:start(~0p, ~0p)", [RTP, Options]),
+    case redbug:start(RTP, Options) of
+        {argument_error, no_matching_functions} ->
+            warning("no matching functions for ~p", [RTP]);
+        {argument_error, no_matching_processes} ->
+            case Pids of
+                [Pid] -> warning("~p is dead", [Pid]);
+                _ -> warning("~p are dead", [Pids])
+            end;
+        {argument_error, Reason} ->
+            warning("argument_error:~p~n", [Reason]);
+        normal ->
+            warning("bad RTPs: ~p", [RTP]);
+        {_Name, ProcessCount, 0} ->
+            info(
+                "Tracing (~w) processes matching ~p within ~w seconds",
+                [ProcessCount, RTP, ?TIME]
+            );
+        {_Name, ProcessCount, FunCount} ->
+            info(
+                "Tracing (~w) processes matching ~ts within ~w seconds and ~w functions",
+                [ProcessCount, RTP, ?TIME, FunCount]
+            )
+    end.
+
+get_rtp_fun() ->
+    RTP0 = io:get_line("Module:Function | Module | RTPs:\n"),
+    RTP1 = string:trim(RTP0, both, " \n"),
+    case string:split(RTP1, ":") of
+        [M] -> {M, get_function()};
+        [M, ""] -> {M, get_function()};
+        [M, F] -> {M, F}
+    end.
+
+get_function() ->
+    ?ULOG("Function(func|func/3|func('_', atom, X) when is_integer(X)) :~n", []),
+    F0 = io:get_line(""),
+    string:trim(F0, both, " \n").
+
+format_rtp("", _) ->
+    exit("Module can't be empty");
+format_rtp(M, "") ->
+    add_return(M);
+format_rtp(M, F) ->
+    M ++ ":" ++ add_return(F).
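A typical interactive session with these helpers (illustrative; the traced module and process are placeholders):

    %% 1> t("emqx_hooks:run/2").          % RTP used as-is; add_return/1 appends "-> return"
    %% 2> t(emqx_hooks, run).             % same trace, built from module and function names
    %% 3> t_msg(whereis(emqx_pool_sup)).  % trace one process's send/receive messages
    %% 4> t_stop().                       % stop the running redbug trace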
+
+add_return(M) ->
+    case string:find(M, "->") of
+        nomatch -> M ++ "-> return";
+        _ -> M
+    end.
+
+get_procs(ProcCount) when ProcCount > 2500 ->
+    warning("Tracing all (~w) processes can be very risky", [ProcCount]),
+    get_pids();
+get_procs(_ProcCount) ->
+    all.
+
+get_pids() ->
+    Str = io:get_line("<0.1.0>|<0.1.0>,<0.2.0>|all|new|running|RegName:"),
+    try
+        lists:map(fun parse_pid/1, string:tokens(Str, ", \n"))
+    catch
+        throw:{not_registered, Name} ->
+            warning("~ts not registered~n", [Name]),
+            get_pids();
+        throw:new ->
+            new;
+        throw:running ->
+            running;
+        throw:quit ->
+            throw(quit);
+        throw:all ->
+            all;
+        _:_ ->
+            warning("Invalid pid: ~ts~n", [Str]),
+            get_pids()
+    end.
+
+parse_pid("<0." ++ _ = L) ->
+    list_to_pid(L);
+parse_pid("all") ->
+    throw(all);
+parse_pid("new") ->
+    throw(new);
+parse_pid("running") ->
+    throw(running);
+parse_pid("q") ->
+    throw(quit);
+parse_pid(NameStr) ->
+    case emqx_utils:safe_to_existing_atom(NameStr, utf8) of
+        {ok, Name} ->
+            case whereis(Name) of
+                undefined -> throw({not_registered, NameStr});
+                Pid -> Pid
+            end;
+        {error, _} ->
+            throw({not_registered, NameStr})
+    end.
+
+warning(Fmt, Args) -> ?ELOG("~s" ++ Fmt ++ ".~s~n", [?RED] ++ Args ++ [?RESET]).
+info(Fmt, Args) -> ?ELOG("~s" ++ Fmt ++ ".~s~n", [?GREEN] ++ Args ++ [?RESET]).
+
+ensure_redbug_stop() ->
+    case redbug:stop() of
+        not_started ->
+            ok;
+        stopped ->
+            timer:sleep(80),
+            ok
+    end.
diff --git a/apps/emqx_message_transformation/src/emqx_message_transformation.app.src b/apps/emqx_message_transformation/src/emqx_message_transformation.app.src
index b8289c1f1..7643cbb9f 100644
--- a/apps/emqx_message_transformation/src/emqx_message_transformation.app.src
+++ b/apps/emqx_message_transformation/src/emqx_message_transformation.app.src
@@ -6,7 +6,8 @@
     {applications, [
         kernel,
         stdlib,
-        emqx
+        emqx,
+        emqx_schema_registry
     ]},
     {env, []},
     {modules, []},
diff --git a/apps/emqx_message_transformation/src/emqx_message_transformation.erl b/apps/emqx_message_transformation/src/emqx_message_transformation.erl
index 612a30f78..14f44ec1f 100644
--- a/apps/emqx_message_transformation/src/emqx_message_transformation.erl
+++ b/apps/emqx_message_transformation/src/emqx_message_transformation.erl
@@ -26,20 +26,28 @@
     on_message_publish/1
 ]).
 
+%% Internal exports
+-export([run_transformation/2, trace_failure_context_to_map/1]).
+
 %%------------------------------------------------------------------------------
 %% Type declarations
 %%------------------------------------------------------------------------------
 
 -define(TRACE_TAG, "MESSAGE_TRANSFORMATION").
--define(CONF_ROOT, message_transformation).
--define(CONF_ROOT_BIN, <<"message_transformation">>).
--define(TRANSFORMATIONS_CONF_PATH, [?CONF_ROOT, transformations]).
+
+-record(trace_failure_context, {
+    transformation :: transformation(),
+    tag :: string(),
+    context :: map()
+}).
+-type trace_failure_context() :: #trace_failure_context{}.
 
 -type transformation_name() :: binary().
 %% TODO: make more specific typespec
 -type transformation() :: #{atom() => term()}.
 %% TODO: make more specific typespec
 -type variform() :: any().
+-type failure_action() :: ignore | drop | disconnect.
 -type operation() :: #{key := [binary(), ...], value := variform()}.
 -type qos() :: 0..2.
 -type rendered_value() :: qos() | boolean() | binary().
@@ -62,7 +70,8 @@
 
 -export_type([
     transformation/0,
-    transformation_name/0
+    transformation_name/0,
+    failure_action/0
 ]).
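Callers pattern-match on the `{ok, _} | {failure_action(), trace_failure_context()}` pair returned by the new `run_transformation/2` export; a consumer sketch mirroring `do_run_transformations/2` below:

    case emqx_message_transformation:run_transformation(Transformation, Message) of
        {ok, #message{} = Transformed} ->
            {cont, Transformed};
        {ignore, _FailureContext} ->
            %% traced and counted as failed, but the original message continues
            {cont, Message};
        {FailureAction, _FailureContext} ->
            %% drop | disconnect
            {halt, FailureAction}
    end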
%%------------------------------------------------------------------------------ @@ -125,19 +134,50 @@ on_message_publish(Message = #message{topic = Topic}) -> %% Internal exports %%------------------------------------------------------------------------------ +-spec run_transformation(transformation(), emqx_types:message()) -> + {ok, emqx_types:message()} | {failure_action(), trace_failure_context()}. +run_transformation(Transformation, MessageIn) -> + #{ + operations := Operations, + failure_action := FailureAction, + payload_decoder := PayloadDecoder + } = Transformation, + Fun = fun(Operation, Acc) -> + case eval_operation(Operation, Transformation, Acc) of + {ok, NewAcc} -> {cont, NewAcc}; + {error, TraceFailureContext} -> {halt, {error, TraceFailureContext}} + end + end, + PayloadIn = MessageIn#message.payload, + case decode(PayloadIn, PayloadDecoder, Transformation) of + {ok, InitPayload} -> + InitAcc = message_to_context(MessageIn, InitPayload, Transformation), + case emqx_utils:foldl_while(Fun, InitAcc, Operations) of + #{} = ContextOut -> + context_to_message(MessageIn, ContextOut, Transformation); + {error, TraceFailureContext} -> + {FailureAction, TraceFailureContext} + end; + {error, TraceFailureContext} -> + {FailureAction, TraceFailureContext} + end. + %%------------------------------------------------------------------------------ %% Internal functions %%------------------------------------------------------------------------------ --spec eval_operation(operation(), transformation(), eval_context()) -> {ok, eval_context()} | error. +-spec eval_operation(operation(), transformation(), eval_context()) -> + {ok, eval_context()} | {error, trace_failure_context()}. eval_operation(Operation, Transformation, Context) -> #{key := K, value := V} = Operation, case eval_variform(K, V, Context) of {error, Reason} -> - trace_failure(Transformation, "transformation_eval_operation_failure", #{ - reason => Reason - }), - error; + FailureContext = #trace_failure_context{ + transformation = Transformation, + tag = "transformation_eval_operation_failure", + context = #{reason => Reason} + }, + {error, FailureContext}; {ok, Rendered} -> NewContext = put_value(K, Rendered, Context), {ok, NewContext} @@ -233,14 +273,16 @@ do_run_transformations(Transformations, Message) -> #{name := Name} = Transformation, emqx_message_transformation_registry:inc_matched(Name), case run_transformation(Transformation, MessageAcc) of - #message{} = NewAcc -> + {ok, #message{} = NewAcc} -> emqx_message_transformation_registry:inc_succeeded(Name), {cont, NewAcc}; - ignore -> + {ignore, TraceFailureContext} -> + trace_failure_from_context(TraceFailureContext), emqx_message_transformation_registry:inc_failed(Name), run_message_transformation_failed_hook(Message, Transformation), {cont, MessageAcc}; - FailureAction -> + {FailureAction, TraceFailureContext} -> + trace_failure_from_context(TraceFailureContext), trace_failure(Transformation, "transformation_failed", #{ transformation => Name, action => FailureAction @@ -270,33 +312,6 @@ do_run_transformations(Transformations, Message) -> FailureAction end. 
-run_transformation(Transformation, MessageIn) -> - #{ - operations := Operations, - failure_action := FailureAction, - payload_decoder := PayloadDecoder - } = Transformation, - Fun = fun(Operation, Acc) -> - case eval_operation(Operation, Transformation, Acc) of - {ok, NewAcc} -> {cont, NewAcc}; - error -> {halt, FailureAction} - end - end, - PayloadIn = MessageIn#message.payload, - case decode(PayloadIn, PayloadDecoder, Transformation) of - {ok, InitPayload} -> - InitAcc = message_to_context(MessageIn, InitPayload, Transformation), - case emqx_utils:foldl_while(Fun, InitAcc, Operations) of - #{} = ContextOut -> - context_to_message(MessageIn, ContextOut, Transformation); - _ -> - FailureAction - end; - error -> - %% Error already logged - FailureAction - end. - -spec message_to_context(emqx_types:message(), _Payload, transformation()) -> eval_context(). message_to_context(#message{} = Message, Payload, Transformation) -> #{ @@ -321,7 +336,7 @@ message_to_context(#message{} = Message, Payload, Transformation) -> }. -spec context_to_message(emqx_types:message(), eval_context(), transformation()) -> - {ok, emqx_types:message()} | _TODO. + {ok, emqx_types:message()} | {failure_action(), trace_failure_context()}. context_to_message(Message, Context, Transformation) -> #{ failure_action := FailureAction, @@ -330,9 +345,9 @@ context_to_message(Message, Context, Transformation) -> #{payload := PayloadOut} = Context, case encode(PayloadOut, PayloadEncoder, Transformation) of {ok, Payload} -> - take_from_context(Context#{payload := Payload}, Message); - error -> - FailureAction + {ok, take_from_context(Context#{payload := Payload}, Message)}; + {error, TraceFailureContext} -> + {FailureAction, TraceFailureContext} end. take_from_context(Context, Message) -> @@ -362,31 +377,43 @@ decode(Payload, #{type := json}, Transformation) -> {ok, JSON} -> {ok, JSON}; {error, Reason} -> - trace_failure(Transformation, "payload_decode_failed", #{ - decoder => json, - reason => Reason - }), - error + TraceFailureContext = #trace_failure_context{ + transformation = Transformation, + tag = "payload_decode_failed", + context = #{ + decoder => json, + reason => Reason + } + }, + {error, TraceFailureContext} end; decode(Payload, #{type := avro, schema := SerdeName}, Transformation) -> try {ok, emqx_schema_registry_serde:decode(SerdeName, Payload)} catch error:{serde_not_found, _} -> - trace_failure(Transformation, "payload_decode_schema_not_found", #{ - decoder => avro, - schema_name => SerdeName - }), - error; + TraceFailureContext = #trace_failure_context{ + transformation = Transformation, + tag = "payload_decode_schema_not_found", + context = #{ + decoder => avro, + schema_name => SerdeName + } + }, + {error, TraceFailureContext}; Class:Error:Stacktrace -> - trace_failure(Transformation, "payload_decode_schema_failure", #{ - decoder => avro, - schema_name => SerdeName, - kind => Class, - reason => Error, - stacktrace => Stacktrace - }), - error + TraceFailureContext = #trace_failure_context{ + transformation = Transformation, + tag = "payload_decode_schema_failure", + context = #{ + decoder => avro, + schema_name => SerdeName, + kind => Class, + reason => Error, + stacktrace => Stacktrace + } + }, + {error, TraceFailureContext} end; decode( Payload, #{type := protobuf, schema := SerdeName, message_type := MessageType}, Transformation @@ -395,22 +422,30 @@ decode( {ok, emqx_schema_registry_serde:decode(SerdeName, Payload, [MessageType])} catch error:{serde_not_found, _} -> - trace_failure(Transformation, 
"payload_decode_schema_not_found", #{ - decoder => protobuf, - schema_name => SerdeName, - message_type => MessageType - }), - error; + TraceFailureContext = #trace_failure_context{ + transformation = Transformation, + tag = "payload_decode_schema_not_found", + context = #{ + decoder => protobuf, + schema_name => SerdeName, + message_type => MessageType + } + }, + {error, TraceFailureContext}; Class:Error:Stacktrace -> - trace_failure(Transformation, "payload_decode_schema_failure", #{ - decoder => protobuf, - schema_name => SerdeName, - message_type => MessageType, - kind => Class, - reason => Error, - stacktrace => Stacktrace - }), - error + TraceFailureContext = #trace_failure_context{ + transformation = Transformation, + tag = "payload_decode_schema_failure", + context = #{ + decoder => protobuf, + schema_name => SerdeName, + message_type => MessageType, + kind => Class, + reason => Error, + stacktrace => Stacktrace + } + }, + {error, TraceFailureContext} end. encode(Payload, #{type := none}, _Transformation) -> @@ -420,31 +455,43 @@ encode(Payload, #{type := json}, Transformation) -> {ok, Bin} -> {ok, Bin}; {error, Reason} -> - trace_failure(Transformation, "payload_encode_failed", #{ - encoder => json, - reason => Reason - }), - error + TraceFailureContext = #trace_failure_context{ + transformation = Transformation, + tag = "payload_encode_failed", + context = #{ + encoder => json, + reason => Reason + } + }, + {error, TraceFailureContext} end; encode(Payload, #{type := avro, schema := SerdeName}, Transformation) -> try {ok, emqx_schema_registry_serde:encode(SerdeName, Payload)} catch error:{serde_not_found, _} -> - trace_failure(Transformation, "payload_encode_schema_not_found", #{ - encoder => avro, - schema_name => SerdeName - }), - error; + TraceFailureContext = #trace_failure_context{ + transformation = Transformation, + tag = "payload_encode_schema_not_found", + context = #{ + encoder => avro, + schema_name => SerdeName + } + }, + {error, TraceFailureContext}; Class:Error:Stacktrace -> - trace_failure(Transformation, "payload_encode_schema_failure", #{ - encoder => avro, - schema_name => SerdeName, - kind => Class, - reason => Error, - stacktrace => Stacktrace - }), - error + TraceFailureContext = #trace_failure_context{ + transformation = Transformation, + tag = "payload_encode_schema_failure", + context = #{ + encoder => avro, + schema_name => SerdeName, + kind => Class, + reason => Error, + stacktrace => Stacktrace + } + }, + {error, TraceFailureContext} end; encode( Payload, #{type := protobuf, schema := SerdeName, message_type := MessageType}, Transformation @@ -453,24 +500,50 @@ encode( {ok, emqx_schema_registry_serde:encode(SerdeName, Payload, [MessageType])} catch error:{serde_not_found, _} -> - trace_failure(Transformation, "payload_encode_schema_not_found", #{ - encoder => protobuf, - schema_name => SerdeName, - message_type => MessageType - }), - error; + TraceFailureContext = #trace_failure_context{ + transformation = Transformation, + tag = "payload_encode_schema_failure", + context = #{ + encoder => protobuf, + schema_name => SerdeName, + message_type => MessageType + } + }, + {error, TraceFailureContext}; Class:Error:Stacktrace -> - trace_failure(Transformation, "payload_encode_schema_failure", #{ - encoder => protobuf, - schema_name => SerdeName, - message_type => MessageType, - kind => Class, - reason => Error, - stacktrace => Stacktrace - }), - error + TraceFailureContext = #trace_failure_context{ + transformation = Transformation, + tag = 
"payload_encode_schema_failure", + context = #{ + encoder => protobuf, + schema_name => SerdeName, + message_type => MessageType, + kind => Class, + reason => Error, + stacktrace => Stacktrace + } + }, + {error, TraceFailureContext} end. +trace_failure_from_context( + #trace_failure_context{ + transformation = Transformation, + tag = Tag, + context = Context + } +) -> + trace_failure(Transformation, Tag, Context). + +%% Internal export for HTTP API. +trace_failure_context_to_map( + #trace_failure_context{ + tag = Tag, + context = Context + } +) -> + Context#{msg => list_to_binary(Tag)}. + trace_failure(#{log_failure := #{level := none}} = Transformation, _Msg, _Meta) -> #{ name := _Name, diff --git a/apps/emqx_message_transformation/src/emqx_message_transformation_http_api.erl b/apps/emqx_message_transformation/src/emqx_message_transformation_http_api.erl index 3b3132d0d..1ba5cee8a 100644 --- a/apps/emqx_message_transformation/src/emqx_message_transformation_http_api.erl +++ b/apps/emqx_message_transformation/src/emqx_message_transformation_http_api.erl @@ -8,6 +8,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("emqx_utils/include/emqx_utils_api.hrl"). %% `minirest' and `minirest_trails' API @@ -23,6 +24,7 @@ -export([ '/message_transformations'/2, '/message_transformations/reorder'/2, + '/message_transformations/dryrun'/2, '/message_transformations/transformation/:name'/2, '/message_transformations/transformation/:name/metrics'/2, '/message_transformations/transformation/:name/metrics/reset'/2, @@ -36,6 +38,9 @@ -define(TAGS, [<<"Message Transformation">>]). -define(METRIC_NAME, message_transformation). +-type user_property() :: #{binary() => binary()}. +-reflect_type([user_property/0]). + %%------------------------------------------------------------------------------------------------- %% `minirest' and `minirest_trails' API %%------------------------------------------------------------------------------------------------- @@ -49,6 +54,7 @@ paths() -> [ "/message_transformations", "/message_transformations/reorder", + "/message_transformations/dryrun", "/message_transformations/transformation/:name", "/message_transformations/transformation/:name/metrics", "/message_transformations/transformation/:name/metrics/reset", @@ -143,6 +149,25 @@ schema("/message_transformations/reorder") -> } } }; +schema("/message_transformations/dryrun") -> + #{ + 'operationId' => '/message_transformations/dryrun', + post => #{ + tags => ?TAGS, + summary => <<"Test an input against a configuration">>, + description => ?DESC("dryrun_transformation"), + 'requestBody' => + emqx_dashboard_swagger:schema_with_examples( + ref(dryrun_transformation), + example_input_dryrun_transformation() + ), + responses => + #{ + 200 => <<"TODO">>, + 400 => error_schema('BAD_REQUEST', <<"Bad request">>) + } + } + }; schema("/message_transformations/transformation/:name") -> #{ 'operationId' => '/message_transformations/transformation/:name', @@ -267,6 +292,29 @@ fields(reorder) -> [ {order, mk(array(binary()), #{required => true, in => body})} ]; +fields(dryrun_transformation) -> + [ + {transformation, + mk( + hoconsc:ref(emqx_message_transformation_schema, transformation), + #{required => true, in => body} + )}, + {message, mk(ref(dryrun_input_message), #{required => true, in => body})} + ]; +fields(dryrun_input_message) -> + %% See `emqx_message_transformation:eval_context()'. 
+ [ + {client_attrs, mk(map(), #{default => #{}})}, + {payload, mk(binary(), #{required => true})}, + {qos, mk(range(0, 2), #{default => 0})}, + {retain, mk(boolean(), #{default => false})}, + {topic, mk(binary(), #{required => true})}, + {user_property, + mk( + typerefl:alias("map(binary(), binary())", user_property()), + #{default => #{}} + )} + ]; fields(get_metrics) -> [ {metrics, mk(ref(metrics), #{})}, @@ -343,6 +391,9 @@ fields(node_metrics) -> '/message_transformations/reorder'(post, #{body := #{<<"order">> := Order}}) -> do_reorder(Order). +'/message_transformations/dryrun'(post, #{body := Params}) -> + do_transformation_dryrun(Params). + '/message_transformations/transformation/:name/enable/:enable'(post, #{ bindings := #{name := Name, enable := Enable} }) -> @@ -436,6 +487,17 @@ example_input_reorder() -> } }. +example_input_dryrun_transformation() -> + #{ + <<"test">> => + #{ + summary => <<"Test an input against a configuration">>, + value => #{ + todo => true + } + } + }. + example_return_list() -> OtherVal0 = example_transformation([example_avro_check()]), OtherVal = OtherVal0#{name => <<"other_transformation">>}, @@ -541,6 +603,20 @@ do_reorder(Order) -> ?BAD_REQUEST(Error) end. +do_transformation_dryrun(Params) -> + #{ + transformation := Transformation, + message := Message + } = dryrun_input_message_in(Params), + case emqx_message_transformation:run_transformation(Transformation, Message) of + {ok, #message{} = FinalMessage} -> + MessageOut = dryrun_input_message_out(FinalMessage), + ?OK(MessageOut); + {_FailureAction, TraceFailureContext} -> + Result = trace_failure_context_out(TraceFailureContext), + {400, Result} + end. + do_enable_disable(Transformation, Enable) -> RawTransformation = make_serializable(Transformation), case emqx_message_transformation:update(RawTransformation#{<<"enable">> => Enable}) of @@ -654,3 +730,74 @@ operation_out(Operation0) -> fun(Path) -> iolist_to_binary(lists:join(".", Path)) end, Operation ). + +dryrun_input_message_in(Params) -> + %% We already check the params against the schema at the API boundary, so we can + %% expect it to succeed here. + #{root := Result = #{message := Message0}} = + hocon_tconf:check_plain( + #{roots => [{root, ref(dryrun_transformation)}]}, + #{<<"root">> => Params}, + #{atom_key => true} + ), + #{ + client_attrs := ClientAttrs, + payload := Payload, + qos := QoS, + retain := Retain, + topic := Topic, + user_property := UserProperty0 + } = Message0, + UserProperty = maps:to_list(UserProperty0), + Message1 = #{ + id => emqx_guid:gen(), + timestamp => emqx_message:timestamp_now(), + extra => #{}, + from => <<"test-clientid">>, + + flags => #{retain => Retain}, + qos => QoS, + topic => Topic, + payload => Payload, + headers => #{ + client_attrs => ClientAttrs, + properties => #{'User-Property' => UserProperty} + } + }, + Message = emqx_message:from_map(Message1), + Result#{message := Message}. + +dryrun_input_message_out(#message{} = Message) -> + Retain = emqx_message:get_flag(retain, Message, false), + Props = emqx_message:get_header(properties, Message, #{}), + UserProperty0 = maps:get('User-Property', Props, []), + UserProperty = maps:from_list(UserProperty0), + MessageOut0 = emqx_message:to_map(Message), + MessageOut = maps:with([payload, qos, topic], MessageOut0), + MessageOut#{ + retain => Retain, + user_property => UserProperty + }. 
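Putting the pieces together, an illustrative request body for `POST /message_transformations/dryrun` (field names from `fields(dryrun_input_message)` above; the payload value is a placeholder, mirroring `dryrun_input_message/1` in the test suite):

    #{
        transformation => Transformation, %% same shape as a configured transformation
        message => #{
            client_attrs => #{},
            payload => <<"{\"q\": 1}">>,
            qos => 0,
            retain => false,
            topic => <<"t/u/v">>,
            user_property => #{}
        }
    }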
+ +trace_failure_context_out(TraceFailureContext) -> + Context0 = emqx_message_transformation:trace_failure_context_to_map(TraceFailureContext), + %% Some context keys may not be JSON-encodable. + maps:filtermap( + fun + (reason, Reason) -> + case emqx_utils_json:safe_encode(Reason) of + {ok, _} -> + %% Let minirest encode it if it's structured. + true; + {error, _} -> + %% "Best effort" + {true, iolist_to_binary(io_lib:format("~p", [Reason]))} + end; + (stacktrace, _Stacktrace) -> + %% Log? + false; + (_Key, _Value) -> + true + end, + Context0 + ). diff --git a/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl b/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl index b3b88ac69..5e4d9a0cb 100644 --- a/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl +++ b/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl @@ -140,6 +140,31 @@ topic_operation(VariformExpr) -> operation(Key, VariformExpr) -> {Key, VariformExpr}. +json_serde() -> + #{<<"type">> => <<"json">>}. + +avro_serde(SerdeName) -> + #{<<"type">> => <<"avro">>, <<"schema">> => SerdeName}. + +dryrun_input_message() -> + dryrun_input_message(_Overrides = #{}). + +dryrun_input_message(Overrides) -> + dryrun_input_message(Overrides, _Opts = #{}). + +dryrun_input_message(Overrides, Opts) -> + Encoder = maps:get(encoder, Opts, fun emqx_utils_json:encode/1), + Defaults = #{ + client_attrs => #{}, + payload => #{}, + qos => 2, + retain => true, + topic => <<"t/u/v">>, + user_property => #{} + }, + InputMessage0 = emqx_utils_maps:deep_merge(Defaults, Overrides), + maps:update_with(payload, Encoder, InputMessage0). + api_root() -> "message_transformations". simplify_result(Res) -> @@ -246,6 +271,13 @@ import_backup(BackupName) -> Res = request(post, Path, Body), simplify_result(Res). +dryrun_transformation(Transformation, Message) -> + Path = emqx_mgmt_api_test_util:api_path([api_root(), "dryrun"]), + Params = #{transformation => Transformation, message => Message}, + Res = request(post, Path, Params), + ct:pal("dryrun transformation result:\n ~p", [Res]), + simplify_result(Res). + connect(ClientId) -> connect(ClientId, _IsPersistent = false). @@ -1491,3 +1523,93 @@ t_client_attrs(_Config) -> [] ), ok. + +%% Smoke tests for the dryrun endpoint. 
+t_dryrun_transformation(_Config) -> + ?check_trace( + begin + Name1 = <<"foo">>, + Operations = [ + operation(qos, <<"payload.q">>), + operation(topic, <<"concat([topic, '/', payload.t])">>), + operation(retain, <<"payload.r">>), + operation(<<"user_property.a">>, <<"payload.u.a">>), + operation(<<"payload">>, <<"payload.p.hello">>) + ], + Transformation1 = transformation(Name1, Operations), + + %% Good input + Message1 = dryrun_input_message(#{ + payload => #{ + p => #{<<"hello">> => <<"world">>}, + q => 1, + r => true, + t => <<"t">>, + u => #{a => <<"b">>} + } + }), + ?assertMatch( + {200, #{ + <<"payload">> := <<"\"world\"">>, + <<"qos">> := 1, + <<"retain">> := true, + <<"topic">> := <<"t/u/v/t">>, + <<"user_property">> := #{<<"a">> := <<"b">>} + }}, + dryrun_transformation(Transformation1, Message1) + ), + + %% Bad input: fails to decode + Message2 = dryrun_input_message(#{payload => "{"}, #{encoder => fun(X) -> X end}), + ?assertMatch( + {400, #{ + <<"decoder">> := <<"json">>, + <<"reason">> := <<_/binary>> + }}, + dryrun_transformation(Transformation1, Message2) + ), + + %% Bad output: fails to encode + MissingSerde = <<"missing_serde">>, + Transformation2 = transformation(Name1, [dummy_operation()], #{ + <<"payload_decoder">> => json_serde(), + <<"payload_encoder">> => avro_serde(MissingSerde) + }), + ?assertMatch( + {400, #{ + <<"msg">> := <<"payload_encode_schema_not_found">>, + <<"encoder">> := <<"avro">>, + <<"schema_name">> := MissingSerde + }}, + dryrun_transformation(Transformation2, Message1) + ), + + %% Bad input: unbound var during one of the operations + Message3 = dryrun_input_message(#{ + payload => #{ + p => #{<<"hello">> => <<"world">>}, + q => 1, + %% Missing: + %% r => true, + t => <<"t">>, + u => #{a => <<"b">>} + } + }), + ?assertMatch( + {400, #{ + <<"msg">> := + <<"transformation_eval_operation_failure">>, + <<"reason">> := + #{ + <<"reason">> := <<"var_unbound">>, + <<"var_name">> := <<"payload.r">> + } + }}, + dryrun_transformation(Transformation1, Message3) + ), + + ok + end, + [] + ), + ok. diff --git a/apps/emqx_postgresql/src/emqx_postgresql.erl b/apps/emqx_postgresql/src/emqx_postgresql.erl index 54bce1006..7e64a3e83 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.erl +++ b/apps/emqx_postgresql/src/emqx_postgresql.erl @@ -59,6 +59,9 @@ default_port => ?PGSQL_DEFAULT_PORT }). +-type connector_resource_id() :: binary(). +-type action_resource_id() :: binary(). + -type template() :: {unicode:chardata(), emqx_template_sql:row_template()}. -type state() :: #{ @@ -319,38 +322,40 @@ do_check_channel_sql( on_get_channels(ResId) -> emqx_bridge_v2:get_channels_for_connector(ResId). -on_query(InstId, {TypeOrKey, NameOrSQL}, State) -> - on_query(InstId, {TypeOrKey, NameOrSQL, []}, State); +-spec on_query + %% Called from authn and authz modules + (connector_resource_id(), {prepared_query, binary(), [term()]}, state()) -> + {ok, _} | {error, term()}; + %% Called from bridges + (connector_resource_id(), {action_resource_id(), map()}, state()) -> + {ok, _} | {error, term()}. 
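+%% For illustration, the two call shapes the spec distinguishes (ids and
+%% parameters below are placeholders):
+%%   on_query(ConnResId, {prepared_query, ConnResId, [ClientId]}, State)
+%%     authn/authz: the connector resource id doubles as the statement name.
+%%   on_query(ConnResId, {ActionResId, #{clientid => <<"c1">>}}, State)
+%%     actions/bridges: the action resource id selects the installed channel
+%%     template, and the message map is rendered into a parameter row.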
+on_query(InstId, {TypeOrKey, NameOrMap}, State) -> + on_query(InstId, {TypeOrKey, NameOrMap, []}, State); on_query( InstId, - {TypeOrKey, NameOrSQL, Params}, + {TypeOrKey, NameOrMap, Params}, #{pool_name := PoolName} = State ) -> ?SLOG(debug, #{ msg => "postgresql_connector_received_sql_query", connector => InstId, type => TypeOrKey, - sql => NameOrSQL, + sql => NameOrMap, state => State }), - Type = pgsql_query_type(TypeOrKey, State), - {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State), - Res = on_sql_query(TypeOrKey, InstId, PoolName, Type, NameOrSQL2, Data), + {QueryType, NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrMap, Params, State), + emqx_trace:rendered_action_template( + TypeOrKey, + #{ + statement_type => QueryType, + statement_or_name => NameOrSQL2, + data => Data + } + ), + Res = on_sql_query(InstId, PoolName, QueryType, NameOrSQL2, Data), ?tp(postgres_bridge_connector_on_query_return, #{instance_id => InstId, result => Res}), handle_result(Res). -pgsql_query_type(_TypeOrTag, #{prepares := disabled}) -> - query; -pgsql_query_type(sql, _ConnectorState) -> - query; -pgsql_query_type(query, _ConnectorState) -> - query; -pgsql_query_type(prepared_query, _ConnectorState) -> - prepared_query; -%% for bridge -pgsql_query_type(_, ConnectorState) -> - pgsql_query_type(prepared_query, ConnectorState). - on_batch_query( InstId, [{Key, _} = Request | _] = BatchReq, @@ -370,7 +375,15 @@ on_batch_query( {_Statement, RowTemplate} -> StatementTemplate = get_templated_statement(BinKey, State), Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq], - case on_sql_query(Key, InstId, PoolName, execute_batch, StatementTemplate, Rows) of + emqx_trace:rendered_action_template( + Key, + #{ + statement_type => execute_batch, + statement_or_name => StatementTemplate, + data => Rows + } + ), + case on_sql_query(InstId, PoolName, execute_batch, StatementTemplate, Rows) of {error, _Error} = Result -> handle_result(Result); {_Column, Results} -> @@ -386,25 +399,38 @@ on_batch_query(InstId, BatchReq, State) -> }), {error, {unrecoverable_error, invalid_request}}. -proc_sql_params(query, SQLOrKey, Params, _State) -> - {SQLOrKey, Params}; -proc_sql_params(prepared_query, SQLOrKey, Params, _State) -> - {SQLOrKey, Params}; -proc_sql_params(TypeOrKey, SQLOrData, Params, State) -> - DisablePreparedStatements = maps:get(prepares, State, #{}) =:= disabled, - BinKey = to_bin(TypeOrKey), - case get_template(BinKey, State) of - undefined -> - {SQLOrData, Params}; - {Statement, RowTemplate} -> - Rendered = render_prepare_sql_row(RowTemplate, SQLOrData), - case DisablePreparedStatements of - true -> - {Statement, Rendered}; - false -> - {BinKey, Rendered} - end - end. +proc_sql_params(ActionResId, #{} = Map, [], State) when is_binary(ActionResId) -> + %% When this connector is called from actions/bridges. 
+    DisablePreparedStatements = prepared_statements_disabled(State),
+    {ExprTemplate, RowTemplate} = get_template(ActionResId, State),
+    Rendered = render_prepare_sql_row(RowTemplate, Map),
+    case DisablePreparedStatements of
+        true ->
+            {query, ExprTemplate, Rendered};
+        false ->
+            {prepared_query, ActionResId, Rendered}
+    end;
+proc_sql_params(prepared_query, ConnResId, Params, State) ->
+    %% When this connector is called from authn/authz modules.
+    DisablePreparedStatements = prepared_statements_disabled(State),
+    case DisablePreparedStatements of
+        true ->
+            #{query_templates := #{ConnResId := {ExprTemplate, _VarsTemplate}}} = State,
+            {query, ExprTemplate, Params};
+        false ->
+            %% The connector resource id itself is the prepared statement name.
+            {prepared_query, ConnResId, Params}
+    end;
+proc_sql_params(QueryType, SQL, Params, _State) when
+    is_atom(QueryType) andalso
+        (is_binary(SQL) orelse is_list(SQL)) andalso
+        is_list(Params)
+->
+    %% When called to do ad-hoc commands/queries.
+    {QueryType, SQL, Params}.
+
+prepared_statements_disabled(State) ->
+    maps:get(prepares, State, #{}) =:= disabled.
 
 get_template(Key, #{installed_channels := Channels} = _State) when is_map_key(Key, Channels) ->
     BinKey = to_bin(Key),
@@ -420,21 +446,17 @@ get_templated_statement(Key, #{installed_channels := Channels} = _State) when
 ->
     BinKey = to_bin(Key),
     ChannelState = maps:get(BinKey, Channels),
-    ChannelPreparedStatements = maps:get(prepares, ChannelState),
-    maps:get(BinKey, ChannelPreparedStatements);
+    case ChannelState of
+        #{prepares := disabled, query_templates := #{BinKey := {ExprTemplate, _}}} ->
+            ExprTemplate;
+        #{prepares := #{BinKey := ExprTemplate}} ->
+            ExprTemplate
+    end;
 get_templated_statement(Key, #{prepares := PrepStatements}) ->
     BinKey = to_bin(Key),
     maps:get(BinKey, PrepStatements).
 
-on_sql_query(Key, InstId, PoolName, Type, NameOrSQL, Data) ->
-    emqx_trace:rendered_action_template(
-        Key,
-        #{
-            statement_type => Type,
-            statement_or_name => NameOrSQL,
-            data => Data
-        }
-    ),
+on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
     try ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover) of
         {error, Reason} ->
             ?tp(
@@ -785,6 +807,7 @@ handle_batch_result([{error, Error} | _Rest], _Acc) ->
     TranslatedError = translate_to_log_context(Error),
     {error, {unrecoverable_error, export_error(TranslatedError)}};
 handle_batch_result([], Acc) ->
+    ?tp("postgres_success_batch_result", #{row_count => Acc}),
     {ok, Acc}.
 
 translate_to_log_context({error, Reason}) ->
diff --git a/apps/emqx_utils/src/emqx_variform.erl b/apps/emqx_utils/src/emqx_variform.erl
index 7a0bc8118..1c2064c87 100644
--- a/apps/emqx_utils/src/emqx_variform.erl
+++ b/apps/emqx_utils/src/emqx_variform.erl
@@ -276,7 +276,7 @@ resolve_var_value(VarName, Bindings, _Opts) ->
             Value;
         {error, _Reason} ->
             throw(#{
-                var_name => VarName,
+                var_name => iolist_to_binary(VarName),
                 reason => var_unbound
             })
     end.
diff --git a/changes/ce/feat-13191.en.md b/changes/ce/feat-13191.en.md
new file mode 100644
index 000000000..925b6addf
--- /dev/null
+++ b/changes/ce/feat-13191.en.md
@@ -0,0 +1,22 @@
+Upgrade EMQX Docker images to run on Erlang/OTP 26.
+
+EMQX has been running on Erlang/OTP 26 since 5.5, except for the Docker images, which remained on Erlang/OTP 25.
+Now all releases are on Erlang/OTP 26.
+
+A known issue:
+When an older-version EMQX node joins a cluster with newer-version nodes,
+the older node's schema registry may encounter an issue that emits logs like the following:
+
+```
+Error loading module '$schema_parser___CiYAWBja87PleCyKZ58h__SparkPlug_B_BUILT-IN':,
+This BEAM file was compiled for a later version of the runtime system than the current (Erlang/OTP 25).
+```
+
+This issue is fixed in newer versions; for older versions, however, a manual step is required.
+Execute the following on one of the clustered nodes before the older-version EMQX node joins the cluster:
+
+```shell
+emqx eval 'lists:foreach(fun(Key) -> mnesia:dirty_delete(emqx_ee_schema_registry_protobuf_cache_tab, Key) end, mnesia:dirty_all_keys(emqx_ee_schema_registry_protobuf_cache_tab)).'
+```
+
+If the older-version EMQX node is already in the cluster, execute the above command and then restart that node.
diff --git a/changes/ce/fix-13276.en.md b/changes/ce/fix-13276.en.md
new file mode 100644
index 000000000..66b52e45a
--- /dev/null
+++ b/changes/ce/fix-13276.en.md
@@ -0,0 +1 @@
+Fix an issue with durable message storage where parts of the internal storage state were not persisted during the setup of a new storage generation (a concept used internally for managing message expiration and cleanup). This could manifest as messages being lost after a restart of the broker.
diff --git a/changes/ee/fix-13079.en.md b/changes/ee/fix-13277.en.md
similarity index 100%
rename from changes/ee/fix-13079.en.md
rename to changes/ee/fix-13277.en.md
diff --git a/mix.exs b/mix.exs
index 1dcbd8147..397a8c4b7 100644
--- a/mix.exs
+++ b/mix.exs
@@ -214,7 +214,7 @@ defmodule EMQXUmbrella.MixProject do
       {:hstreamdb_erl,
        github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"},
       {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true},
-      {:wolff, github: "kafka4beam/wolff", tag: "1.10.4"},
+      {:wolff, github: "kafka4beam/wolff", tag: "1.10.5"},
       {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
       {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
       {:brod, github: "kafka4beam/brod", tag: "3.18.0"},
diff --git a/rel/i18n/emqx_message_transformation_http_api.hocon b/rel/i18n/emqx_message_transformation_http_api.hocon
index 038e3e8ca..a40347bf4 100644
--- a/rel/i18n/emqx_message_transformation_http_api.hocon
+++ b/rel/i18n/emqx_message_transformation_http_api.hocon
@@ -18,6 +18,9 @@ emqx_message_transformation_http_api {
 reorder_transformations.desc:
 """Reorder of all transformations"""
 
+dryrun_transformation.desc:
+"""Test an input against a transformation"""
+
 enable_disable_transformation.desc:
 """Enable or disable a particular transformation"""
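
For reference, here is a minimal sketch of exercising the new dryrun endpoint end to end with the suite helpers defined above; the transformation name, the operation, and the asserted topic are illustrative and assume the default JSON payload serde:

```erlang
%% Illustrative sketch: dry-run a transformation that appends to the topic.
Transformation = transformation(<<"demo">>, [
    operation(topic, <<"concat([topic, '/extra'])">>)
]),
%% Defaults from dryrun_input_message/0: topic <<"t/u/v">>, QoS 2,
%% retain true, and an empty JSON object payload.
Message = dryrun_input_message(),
{200, #{<<"topic">> := <<"t/u/v/extra">>}} =
    dryrun_transformation(Transformation, Message).
```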