Ecto - Using select_merge for flexible aggregates
Problem
PostgreSQL is a rock-solid database, and I've been using its aggregate expressions to build features in various languages over the past few years. Paired with Elixir and Ecto, they give you a lot of flexibility in query generation. Typically the data you retrieve via a select is fairly static, or there are only one or two variations of the select you need.
@doc """
Returns an aggregation of user activity
"""
@spec user_activity_history() :: UserActivityHistory
def user_activity_history() do
query =
from(
u in User,
select: %UserActivityHistory{
captured_at: fragment("now()"),
hour_1: filter(count(u.id), fragment("? >= now() - interval '1 hour'", u.last_active)),
hour_6: filter(count(u.id), fragment("? >= now() - interval '6 hour'", u.last_active)),
hour_12: filter(count(u.id), fragment("? >= now() - interval '12 hour'", u.last_active)),
hour_24: filter(count(u.id), fragment("? >= now() - interval '24 hour'", u.last_active)),
day_1: filter(count(u.id), fragment("? >= now() - interval '1 day'", u.last_active)),
day_7: filter(count(u.id), fragment("? >= now() - interval '7 day'", u.last_active)),
day_30: filter(count(u.id), fragment("? >= now() - interval '30 day'", u.last_active))
}
)
Repo.replica().one(query)
end
The above code generates a rollup of user activity, but what if we wanted a custom interval, or only a subset of these intervals? This is a great use case for Ecto's select_merge. I'm not going to dive deeply into this example; instead, I'll show a more advanced case where select_merge really shines.
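To sketch the idea quickly (this is not the example I'll dig into below, and the intervals argument and plain-map result here are my own assumptions for illustration), the rollup above could be built dynamically by reducing over select_merge:

import Ecto.Query

# A minimal sketch, assuming the same User schema as above. Each requested
# interval adds one filtered count to the select, keyed by the interval string.
def user_activity_rollup(intervals \\ ["1 hour", "1 day", "7 day"]) do
  base = from(u in User, select: %{captured_at: fragment("now()")})

  query =
    Enum.reduce(intervals, base, fn interval, query ->
      select_merge(query, [u], %{
        ^interval =>
          filter(
            count(u.id),
            fragment("? >= now() - (?)::interval", u.last_active, ^interval)
          )
      })
    end)

  Repo.replica().one(query)
end

The rest of this post applies the same reduce-over-select_merge pattern to a more interesting problem.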
At SwayDM, we have the concept of Members, Vendors, and Redemptions. Members perform activities on the platform to earn SwayCash (SC). Vendors offer Redemptions to Members for a set amount of SwayCash.
Example Redemptions:
- 20% off a Yeti cooler (50 SC)
- One free coffee (8 SC)
Vendors want to set limits on their redemptions to ensure they are not abused.
Example Redemption Limits:
- 20% off a Yeti cooler (50 SC)
  - A Member can only redeem the Yeti cooler deal once over the life of the redemption.
  - The Vendor only wants 5 Members to use this deal per week.
  - The Vendor only wants 100 Members to use this deal overall.
- One free coffee (8 SC)
  - A Member can redeem a free coffee once per week.
  - A Member can redeem a free coffee 10 times over the life of the redemption.
  - The Vendor wants a global limit of 50 free coffees.
Representing Limits
First, let's take a look at how the limits are represented in the
database. There is a `redemptions` table, which embeds a list of
`limits`.
# boilerplate code not included
schema "redemptions" do
  # ... all other redemption fields, such as name, price, etc.
  embeds_many :limits, RedemptionLimit, on_replace: :delete
end

# In the RedemptionLimit embedded schema module
embedded_schema do
  field :limit_scope, Ecto.Enum, values: [:global, :user]
  field :limit_type, Ecto.Enum, values: [:count, :rate_limit]
  field :limit, :integer
  field :interval, :integer
  field :unit, Ecto.Enum, values: [:year, :month, :week, :day, :hour, :minute, :second]
  field :end_of_day, :boolean, default: false
end
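To make the schema concrete, here is one plausible way the Yeti cooler limits from the example above could map onto this embedded schema (the mapping is my interpretation, not data from the original post):

# Hypothetical RedemptionLimit structs for the Yeti cooler example:
# once per Member overall, 5 redemptions per week, 100 redemptions overall.
[
  %RedemptionLimit{limit_scope: :user, limit_type: :count, limit: 1},
  %RedemptionLimit{limit_scope: :global, limit_type: :rate_limit, limit: 5, interval: 1, unit: :week},
  %RedemptionLimit{limit_scope: :global, limit_type: :count, limit: 100}
]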
Storing Redemption History
We store records of successful redemptions in a `redemptions_history` table. This records the datetime, user, and redemption that occurred. The Member is the user type that redeems, and the Vendor is the user type that offers the redemption.
schema "redemptions_history" do
belongs_to :redemption, Redemption
belongs_to :member, Member
belongs_to :vendor, Vendor
timestamps(updated_at: false)
end
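For reference, a hypothetical migration for this table might look like the following; the module name, referenced table names, key types, and the composite index are all assumptions on my part (the index is chosen because the limit queries below filter by redemption, member, and inserted_at):

defmodule MyApp.Repo.Migrations.CreateRedemptionsHistory do
  use Ecto.Migration

  def change do
    # Adjust key types (e.g. :binary_id) to match your schemas.
    create table(:redemptions_history) do
      add :redemption_id, references(:redemptions), null: false
      add :member_id, references(:members), null: false
      add :vendor_id, references(:vendors), null: false

      timestamps(updated_at: false)
    end

    # Assumed index to keep the per-redemption, per-member, time-windowed
    # aggregates in the queries below fast.
    create index(:redemptions_history, [:redemption_id, :member_id, :inserted_at])
  end
end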
Showing and Evaluating Limits
We have two main scenarios where we want to show, and potentially evaluate, limits. The first is when displaying available redemptions to the user: we want them to see whether a redemption they are interested in has a limit. In this case we typically do not need to evaluate the limits.
The second is the redemption detail view, where we do want them to see the current state of the global and user-based limits. For example, a Member might see that they have hit their daily limit for a redemption but still have 9 redemptions remaining overall.
Solutions
Inefficient Solutions
Retrieve History: One potential solution would be to retrieve all redemption history records
from the DB, and evaluate the limits in memory. It would be inefficient to retrieve all history
records every time we want to evaluate a limit, and the more history records that exist for the redemption,
the slower it will become. To optimize this, we'd probably have to start caching these history
records, which would be tedious to keep consistent.
Query per Limit: A better solution would be to run one query per limit. Here is an example:
defmodule RedemptionLimits do
  @type limit_result :: map()

  @doc """
  Evaluate redemption limits

  ## Scope
    * global - Limit is applied across all users.
    * user - Limit is applied per user.

  ## Type
    * count - Limit is based on the number of redemptions.
    * rate_limit - Limit is based on the rate of redemptions.

  ## Opts
    * member_id :: String.t() (Default nil) - If provided, user based limits will be evaluated.
    * as_of :: DateTime.t() (Default now()) - The reference point in time to evaluate rate limits.
  """
  @spec evaluate!(Redemption.t(), Keyword.t()) :: [limit_result]
  def evaluate!(%Redemption{id: id, limits: limits}, opts \\ []) do
    # base_query/1 sets up the named bindings :redemption and
    # :redemption_history used in the select expressions below.
    query = base_query(id)

    for limit <- limits do
      evaluate_limit(limit, query, opts)
    end
  end

  # Global rate limit
  def evaluate_limit(%{id: id, limit_scope: :global, limit_type: :rate_limit, limit: limit} = l, query, opts) do
    as_of = Keyword.get(opts, :as_of, DateTime.utc_now())
    interval = l.interval
    unit = Atom.to_string(l.unit)

    query =
      from(
        [redemption: r, redemption_history: rh] in query,
        select: %{
          id: ^id,
          limit_scope: :global,
          limit_type: :rate_limit,
          interval: type(^interval, :integer),
          unit: type(^unit, :string),
          limit: type(^limit, :integer),
          blocking:
            filter(
              count(rh.id),
              rh.inserted_at >= datetime_add(^as_of, -1 * ^interval, ^unit)
            ) >= type(^limit, :integer),
          count:
            filter(
              count(rh.id),
              rh.inserted_at >= datetime_add(^as_of, -1 * ^interval, ^unit)
            )
        }
      )

    Repo.one(query)
  end

  # ... Functions for all other limit types

  def evaluate_limit(_, _, _), do: %{}
end
This approach is an improvement, but still requires multiple queries.
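Both this version and the select_merge version below also rely on a base_query/1 helper that isn't shown in the post. Here is a minimal sketch of what it might look like, assuming it scopes to a single redemption, left joins the history rows, and exposes the named bindings :redemption and :redemption_history used in the selects (the RedemptionHistory module name is an assumption of mine):

# Hypothetical base_query/1: scope to one redemption, left join its history
# rows, and expose the named bindings used in the select expressions.
defp base_query(redemption_id) do
  from(r in Redemption,
    as: :redemption,
    left_join: rh in RedemptionHistory,
    as: :redemption_history,
    on: rh.redemption_id == r.id,
    where: r.id == ^redemption_id,
    group_by: r.id,
    select: %{redemption_id: r.id}
  )
end

For the query-per-limit version above, which builds its own select, the select here would be omitted (or stripped with Ecto.Query.exclude(query, :select)). For the select_merge version that follows, the initial map select gives select_merge something to merge into, which is also where the :redemption_id key in the test further down comes from.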
One Query using Select Merge: A solid solution, in my opinion, is to use Ecto's select_merge, which requires only one query. We can use the list of RedemptionLimits to generate a single query that evaluates all the limits at the DB level.
defmodule RedemptionLimits do
  @type limit_result :: map()

  @doc """
  Evaluate redemption limits

  ## Scope
    * global - Limit is applied across all users.
    * user - Limit is applied per user.

  ## Type
    * count - Limit is based on the number of redemptions.
    * rate_limit - Limit is based on the rate of redemptions.

  ## Opts
    * member_id :: String.t() (Default nil) - If provided, user based limits will be evaluated.
    * as_of :: DateTime.t() (Default now()) - The reference point in time to evaluate rate limits.
  """
  @spec evaluate!(Redemption.t(), Keyword.t()) :: limit_result
  def evaluate!(%Redemption{id: id, limits: limits}, opts \\ []) do
    query = base_query(id)
    opts = Map.new(opts)

    query = Enum.reduce(limits, query, &gen_select(&1, &2, opts))

    Repo.one(query)
  end

  @doc """
  Merge a select into an existing query based on the limit configuration.

  ## Opts
    * member_id :: String.t() (Default nil) - If provided, user based limits will be evaluated.
    * as_of :: DateTime.t() (Default now()) - The reference point in time to evaluate rate limits.
  """
  @spec gen_select(limit :: RedemptionLimit.t(), query :: Ecto.Query.t(), opts :: map()) :: Ecto.Query.t()
  def gen_select(limit, query, opts \\ %{})

  # Global count limit
  def gen_select(%{id: id, limit_scope: :global, limit_type: :count, limit: limit}, query, _opts) do
    select_merge(query, [redemption: r, redemption_history: rh], %{
      ^id => %{
        id: ^id,
        limit_scope: :global,
        limit_type: :count,
        limit: type(^limit, :integer),
        blocking: count(rh.id) >= type(^limit, :integer),
        count: count(rh.id)
      }
    })
  end

  # User count limit
  def gen_select(%{id: id, limit_scope: :user, limit_type: :count, limit: limit}, query, %{member_id: member_id})
      when is_binary(member_id) do
    select_merge(query, [redemption: r, redemption_history: rh], %{
      ^id => %{
        id: ^id,
        limit_scope: :user,
        limit_type: :count,
        limit: type(^limit, :integer),
        blocking: filter(count(rh.id), rh.member_id == ^member_id) >= type(^limit, :integer),
        count: filter(count(rh.id), rh.member_id == ^member_id)
      }
    })
  end

  # Global rate limit
  def gen_select(%{id: id, limit_scope: :global, limit_type: :rate_limit, limit: limit} = l, query, opts) do
    as_of = Map.get(opts, :as_of, DateTime.utc_now())
    interval = l.interval
    unit = Atom.to_string(l.unit)

    select_merge(query, [redemption: r, redemption_history: rh], %{
      ^id => %{
        id: ^id,
        limit_scope: :global,
        limit_type: :rate_limit,
        interval: type(^interval, :integer),
        unit: type(^unit, :string),
        limit: type(^limit, :integer),
        blocking:
          filter(
            count(rh.id),
            rh.inserted_at >= datetime_add(^as_of, -1 * ^interval, ^unit)
          ) >= type(^limit, :integer),
        count:
          filter(
            count(rh.id),
            rh.inserted_at >= datetime_add(^as_of, -1 * ^interval, ^unit)
          )
      }
    })
  end

  # ... Functions for all other limit types

  def gen_select(_limit, query, _opts), do: query
end
How does this work?
When you use select_merge, the select expression must evaluate to a map (or struct). Ecto merges each select_merge expression into the query's existing select, so the database executes a single query and returns one merged result.
In the example above, each limit merges a nested map into the select, keyed by the redemption limit's ID. This makes it very easy to pick an evaluated limit out of the result map. See the example test below.
assert %{
         :redemption_id => ^rid,
         ^global_limit_id => %{
           blocking: false,
           count: 5,
           limit: 6,
           limit_scope: :global,
           limit_type: :count
         },
         ^user_limit_id => %{
           blocking: true,
           count: 3,
           limit: 3,
           limit_scope: :user,
           limit_type: :count
         }
       } = RedemptionLimits.evaluate!(redemption, member_id: member.id)
Using this approach with aggregates is beneficial because Postgres can retrieve and aggregate the data more efficiently in one query than across multiple queries.
Adding a new limit type is easy: we simply add a new gen_select clause for it.
Evaluating a subset of limits is also straightforward: we just filter the list before generating the query, as sketched below.
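For example, a hypothetical evaluate_global!/1 variant that only evaluates a redemption's global limits could filter the embedded limits before the reduce:

# Hypothetical variant: evaluate only the :global limits of a redemption by
# filtering the embedded limits before building the merged selects.
def evaluate_global!(%Redemption{id: id, limits: limits}) do
  query = base_query(id)

  limits
  |> Enum.filter(&(&1.limit_scope == :global))
  |> Enum.reduce(query, &gen_select(&1, &2, %{}))
  |> Repo.one()
end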
Conclusion
I hope you found this post useful. There are better introductions to select_merge out there, but this is a real-life use case that I thought others would find interesting and useful.