fix(ds): Address code review remarks
This commit is contained in:
parent
c030188eb7
commit
a1cdbaa76d
2
Makefile
2
Makefile
|
@ -85,7 +85,7 @@ $(REL_PROFILES:%=%-compile): $(REBAR) merge-config
|
||||||
|
|
||||||
.PHONY: ct
|
.PHONY: ct
|
||||||
ct: $(REBAR) merge-config
|
ct: $(REBAR) merge-config
|
||||||
@ENABLE_COVER_COMPILE=1 $(REBAR) ct --name $(CT_NODE_NAME) -c -v --cover_export_name $(CT_COVER_EXPORT_PREFIX)-ct
|
@$(REBAR) ct --name $(CT_NODE_NAME) -c -v --cover_export_name $(CT_COVER_EXPORT_PREFIX)-ct
|
||||||
|
|
||||||
## only check bpapi for enterprise profile because it's a super-set.
|
## only check bpapi for enterprise profile because it's a super-set.
|
||||||
.PHONY: static_checks
|
.PHONY: static_checks
|
||||||
|
|
|
@ -605,9 +605,11 @@ session_read_subscriptions(DSSessionId) ->
|
||||||
),
|
),
|
||||||
mnesia:select(?SESSION_SUBSCRIPTIONS_TAB, MS, read).
|
mnesia:select(?SESSION_SUBSCRIPTIONS_TAB, MS, read).
|
||||||
|
|
||||||
-spec new_subscription_id(id(), topic_filter()) -> {subscription_id(), emqx_ds:time()}.
|
-spec new_subscription_id(id(), topic_filter()) -> {subscription_id(), integer()}.
|
||||||
new_subscription_id(DSSessionId, TopicFilter) ->
|
new_subscription_id(DSSessionId, TopicFilter) ->
|
||||||
NowMS = erlang:system_time(microsecond),
|
%% Note: here we use _milliseconds_ to match with the timestamp
|
||||||
|
%% field of `#message' record.
|
||||||
|
NowMS = erlang:system_time(millisecond),
|
||||||
DSSubId = {DSSessionId, TopicFilter},
|
DSSubId = {DSSessionId, TopicFilter},
|
||||||
{DSSubId, NowMS}.
|
{DSSubId, NowMS}.
|
||||||
|
|
||||||
|
@ -662,8 +664,7 @@ renew_streams(Id, ExistingStreams, TopicFilter, StartTime) ->
|
||||||
ok;
|
ok;
|
||||||
false ->
|
false ->
|
||||||
mnesia:write(?SESSION_STREAM_TAB, Rec, write),
|
mnesia:write(?SESSION_STREAM_TAB, Rec, write),
|
||||||
% StartTime),
|
{ok, Iterator} = emqx_ds:make_iterator(Stream, TopicFilter, StartTime),
|
||||||
{ok, Iterator} = emqx_ds:make_iterator(Stream, TopicFilter, 0),
|
|
||||||
IterRec = #ds_iter{id = {Id, Stream}, iter = Iterator},
|
IterRec = #ds_iter{id = {Id, Stream}, iter = Iterator},
|
||||||
mnesia:write(?SESSION_ITER_TAB, IterRec, write)
|
mnesia:write(?SESSION_ITER_TAB, IterRec, write)
|
||||||
end
|
end
|
||||||
|
|
|
@ -512,22 +512,24 @@ make_bitfilter(Keymapper = #keymapper{dim_sizeof = DimSizeof}, Ranges) ->
|
||||||
{Bitmask, Bitfilter} = lists:unzip(L),
|
{Bitmask, Bitfilter} = lists:unzip(L),
|
||||||
{vector_to_key(Keymapper, Bitmask), vector_to_key(Keymapper, Bitfilter)}.
|
{vector_to_key(Keymapper, Bitmask), vector_to_key(Keymapper, Bitfilter)}.
|
||||||
|
|
||||||
%% Transform inequalities into a list of closed intervals that the
|
%% Transform constraints into a list of closed intervals that the
|
||||||
%% vector elements should lie in.
|
%% vector elements should lie in.
|
||||||
constraints_to_ranges(#keymapper{dim_sizeof = DimSizeof}, Filter) ->
|
constraints_to_ranges(#keymapper{dim_sizeof = DimSizeof}, Filter) ->
|
||||||
lists:zipwith(
|
lists:zipwith(
|
||||||
fun
|
fun(Constraint, Bitsize) ->
|
||||||
(any, Bitsize) ->
|
Max = ones(Bitsize),
|
||||||
{0, ones(Bitsize)};
|
case Constraint of
|
||||||
({'=', infinity}, Bitsize) ->
|
any ->
|
||||||
Val = ones(Bitsize),
|
{0, Max};
|
||||||
{Val, Val};
|
{'=', infinity} ->
|
||||||
({'=', Val}, _Bitsize) ->
|
{Max, Max};
|
||||||
{Val, Val};
|
{'=', Val} when Val =< Max ->
|
||||||
({'>=', Val}, Bitsize) ->
|
{Val, Val};
|
||||||
{Val, ones(Bitsize)};
|
{'>=', Val} when Val =< Max ->
|
||||||
({Min, '..', Max}, _Bitsize) ->
|
{Val, Max};
|
||||||
{Min, Max}
|
{A, '..', B} when A =< Max, B =< Max ->
|
||||||
|
{A, B}
|
||||||
|
end
|
||||||
end,
|
end,
|
||||||
Filter,
|
Filter,
|
||||||
DimSizeof
|
DimSizeof
|
||||||
|
|
|
@ -27,7 +27,7 @@
|
||||||
-export([create/4, open/5, store_batch/4, get_streams/4, make_iterator/5, next/4]).
|
-export([create/4, open/5, store_batch/4, get_streams/4, make_iterator/5, next/4]).
|
||||||
|
|
||||||
%% internal exports:
|
%% internal exports:
|
||||||
-export([format_key/2, format_keyfilter/1]).
|
-export([format_key/2]).
|
||||||
|
|
||||||
-export_type([options/0]).
|
-export_type([options/0]).
|
||||||
|
|
||||||
|
@ -321,11 +321,6 @@ format_key(KeyMapper, Key) ->
|
||||||
Vec = [integer_to_list(I, 16) || I <- emqx_ds_bitmask_keymapper:key_to_vector(KeyMapper, Key)],
|
Vec = [integer_to_list(I, 16) || I <- emqx_ds_bitmask_keymapper:key_to_vector(KeyMapper, Key)],
|
||||||
lists:flatten(io_lib:format("~.16B (~s)", [Key, string:join(Vec, ",")])).
|
lists:flatten(io_lib:format("~.16B (~s)", [Key, string:join(Vec, ",")])).
|
||||||
|
|
||||||
format_keyfilter(any) ->
|
|
||||||
any;
|
|
||||||
format_keyfilter({Op, Val}) ->
|
|
||||||
{Op, integer_to_list(Val, 16)}.
|
|
||||||
|
|
||||||
-spec make_key(s(), emqx_types:message()) -> {binary(), [binary()]}.
|
-spec make_key(s(), emqx_types:message()) -> {binary(), [binary()]}.
|
||||||
make_key(#s{keymappers = KeyMappers, trie = Trie}, #message{timestamp = Timestamp, topic = TopicBin}) ->
|
make_key(#s{keymappers = KeyMappers, trie = Trie}, #message{timestamp = Timestamp, topic = TopicBin}) ->
|
||||||
Tokens = emqx_topic:tokens(TopicBin),
|
Tokens = emqx_topic:tokens(TopicBin),
|
||||||
|
|
Loading…
Reference in New Issue