diff --git a/Makefile b/Makefile index 8e8f4b493..254a4b0f9 100644 --- a/Makefile +++ b/Makefile @@ -85,7 +85,7 @@ $(REL_PROFILES:%=%-compile): $(REBAR) merge-config .PHONY: ct ct: $(REBAR) merge-config - @ENABLE_COVER_COMPILE=1 $(REBAR) ct --name $(CT_NODE_NAME) -c -v --cover_export_name $(CT_COVER_EXPORT_PREFIX)-ct + @$(REBAR) ct --name $(CT_NODE_NAME) -c -v --cover_export_name $(CT_COVER_EXPORT_PREFIX)-ct ## only check bpapi for enterprise profile because it's a super-set. .PHONY: static_checks diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 9a9e05a7a..f3027f500 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -605,9 +605,11 @@ session_read_subscriptions(DSSessionId) -> ), mnesia:select(?SESSION_SUBSCRIPTIONS_TAB, MS, read). --spec new_subscription_id(id(), topic_filter()) -> {subscription_id(), emqx_ds:time()}. +-spec new_subscription_id(id(), topic_filter()) -> {subscription_id(), integer()}. new_subscription_id(DSSessionId, TopicFilter) -> - NowMS = erlang:system_time(microsecond), + %% Note: here we use _milliseconds_ to match with the timestamp + %% field of `#message' record. + NowMS = erlang:system_time(millisecond), DSSubId = {DSSessionId, TopicFilter}, {DSSubId, NowMS}. @@ -662,8 +664,7 @@ renew_streams(Id, ExistingStreams, TopicFilter, StartTime) -> ok; false -> mnesia:write(?SESSION_STREAM_TAB, Rec, write), - % StartTime), - {ok, Iterator} = emqx_ds:make_iterator(Stream, TopicFilter, 0), + {ok, Iterator} = emqx_ds:make_iterator(Stream, TopicFilter, StartTime), IterRec = #ds_iter{id = {Id, Stream}, iter = Iterator}, mnesia:write(?SESSION_ITER_TAB, IterRec, write) end diff --git a/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl b/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl index 5666b45ae..a3b65c7e6 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl @@ -512,22 +512,24 @@ make_bitfilter(Keymapper = #keymapper{dim_sizeof = DimSizeof}, Ranges) -> {Bitmask, Bitfilter} = lists:unzip(L), {vector_to_key(Keymapper, Bitmask), vector_to_key(Keymapper, Bitfilter)}. -%% Transform inequalities into a list of closed intervals that the +%% Transform constraints into a list of closed intervals that the %% vector elements should lie in. constraints_to_ranges(#keymapper{dim_sizeof = DimSizeof}, Filter) -> lists:zipwith( - fun - (any, Bitsize) -> - {0, ones(Bitsize)}; - ({'=', infinity}, Bitsize) -> - Val = ones(Bitsize), - {Val, Val}; - ({'=', Val}, _Bitsize) -> - {Val, Val}; - ({'>=', Val}, Bitsize) -> - {Val, ones(Bitsize)}; - ({Min, '..', Max}, _Bitsize) -> - {Min, Max} + fun(Constraint, Bitsize) -> + Max = ones(Bitsize), + case Constraint of + any -> + {0, Max}; + {'=', infinity} -> + {Max, Max}; + {'=', Val} when Val =< Max -> + {Val, Val}; + {'>=', Val} when Val =< Max -> + {Val, Max}; + {A, '..', B} when A =< Max, B =< Max -> + {A, B} + end end, Filter, DimSizeof diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index d8352df18..d57d8013c 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -27,7 +27,7 @@ -export([create/4, open/5, store_batch/4, get_streams/4, make_iterator/5, next/4]). %% internal exports: --export([format_key/2, format_keyfilter/1]). +-export([format_key/2]). -export_type([options/0]). @@ -321,11 +321,6 @@ format_key(KeyMapper, Key) -> Vec = [integer_to_list(I, 16) || I <- emqx_ds_bitmask_keymapper:key_to_vector(KeyMapper, Key)], lists:flatten(io_lib:format("~.16B (~s)", [Key, string:join(Vec, ",")])). -format_keyfilter(any) -> - any; -format_keyfilter({Op, Val}) -> - {Op, integer_to_list(Val, 16)}. - -spec make_key(s(), emqx_types:message()) -> {binary(), [binary()]}. make_key(#s{keymappers = KeyMappers, trie = Trie}, #message{timestamp = Timestamp, topic = TopicBin}) -> Tokens = emqx_topic:tokens(TopicBin),