When you need to transform large amounts of data in Elixir (for example, by mapping over it), you might run out of memory or lose the performance that Phoenix and Elixir can otherwise provide.
Elixir’s answer to this is Streams. A Stream lets you build a pipeline of operations (like map, filter, etc.) that only runs on demand. Instead of every step producing a complete intermediate list, each element flows through the whole pipeline as it’s consumed. This is really beneficial with larger amounts of data, especially when the source itself is read in chunks, because the whole set is never loaded into memory at once, but rather “streamed” one piece after another. Simply speaking, Streams are “lazy” enumerables.
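Here’s a small plain-Elixir sketch of that difference (no Phoenix or Ecto involved; the traced and drain helpers and their :trace messages are made up purely for illustration). It logs every map and filter call: Enum runs each whole step over the list before starting the next one, while Stream pushes each element through the entire pipeline, and nothing runs at all until the stream is consumed.

```elixir
# Illustrative helper (not part of Elixir): wraps `fun` so that every call
# also logs {tag, argument} as a message to the current process.
traced = fn tag, fun ->
  fn x ->
    send(self(), {:trace, tag, x})
    fun.(x)
  end
end

# Reads back the {:trace, ...} messages logged so far, in the order they were sent.
drain = fn ->
  Stream.repeatedly(fn ->
    receive do
      {:trace, tag, x} -> {tag, x}
    after
      0 -> :empty
    end
  end)
  |> Enum.take_while(&(&1 != :empty))
end

double = traced.(:map, &(&1 * 2))
keep = traced.(:filter, &(&1 > 2))

# Eager: Enum.map builds a full intermediate list before Enum.filter starts.
eager = [1, 2, 3] |> Enum.map(double) |> Enum.filter(keep)
eager_order = drain.()

# Lazy: building the pipeline only records the steps; nothing is called yet.
lazy = [1, 2, 3] |> Stream.map(double) |> Stream.filter(keep)
[] = drain.()

# Enumerating the stream runs map and filter element by element.
lazy_result = Enum.to_list(lazy)
lazy_order = drain.()

IO.inspect(eager_order, label: "Enum")
IO.inspect(lazy_order, label: "Stream")
```

Both pipelines return [4, 6], but eager_order is [map: 1, map: 2, map: 3, filter: 2, filter: 4, filter: 6], while lazy_order is [map: 1, filter: 2, map: 2, filter: 4, map: 3, filter: 6].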
This concept would work perfectly with large sets of entities from the database, wouldn’t it? Ecto’s developers thought so too, and thanks to them we have Ecto’s Repo.stream/1.
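Conceptually, Repo.stream/1 reads the query’s rows from the database in batches (500 rows at a time by default, tunable with the :max_rows option) and emits them one by one. The sketch below imitates that behaviour with Stream.resource/3 over an in-memory list. ChunkedSource and its :fetched/:closed messages are hypothetical, and this is not how Ecto is implemented; it only shows that a consumer which stops early never triggers the remaining fetches.

```elixir
defmodule ChunkedSource do
  # Hypothetical stand-in for a database cursor (NOT Ecto's implementation):
  # serves `rows` in batches of `max_rows` and logs every "fetch" and the
  # final "close" as messages to the consuming process.
  def stream(rows, max_rows) do
    Stream.resource(
      # start_fun: "open the cursor"
      fn -> rows end,
      # next_fun: "fetch" the next batch, or halt when nothing is left
      fn
        [] ->
          {:halt, []}

        remaining ->
          {batch, rest} = Enum.split(remaining, max_rows)
          send(self(), {:fetched, length(batch)})
          {batch, rest}
      end,
      # after_fun: "close the cursor"; runs on normal end and on early halt
      fn _ -> send(self(), :closed) end
    )
  end

  # Returns the log messages received so far, oldest first.
  def drain_log(acc \\ []) do
    receive do
      {:fetched, _} = msg -> drain_log([msg | acc])
      :closed -> drain_log([:closed | acc])
    after
      0 -> Enum.reverse(acc)
    end
  end
end

# 10 rows, "fetched" 2 at a time, but we only need the first 3 results.
first_three =
  ChunkedSource.stream(Enum.to_list(1..10), 2)
  |> Stream.map(&(&1 * 10))
  |> Enum.take(3)

log = ChunkedSource.drain_log()
```

first_three is [10, 20, 30], and only two of the five batches are ever fetched before the source is closed: log ends up as [{:fetched, 2}, {:fetched, 2}, :closed].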
Example
Let’s take a look at a real example: simplified code that prepares and renders a sitemap with Phoenix, first without and then with Ecto’s Repo.stream/1:
```elixir
# Phoenix controller code; assumes `import Ecto.Query` and aliases
# for Repo, User, Post and SitemapUrl.
def fetch(conn, _params) do
  render(conn, "sitemap.xml",
    sitemap_items: get_sitemap_paths(),
    host: "https://appunite.com"
  )
end

def get_sitemap_paths do
  sitemap_profiles() ++ sitemap_posts()
end

defp sitemap_profiles do
  # Repo.all/1 loads every matching user into memory at once
  from(u in User, where: is_nil(u.deleted_at))
  |> Repo.all()
  |> Enum.map(fn profile ->
    %SitemapUrl{
      url: "/profiles/#{profile.slug}",
      lastmod: profile.updated_at
    }
  end)
end

defp sitemap_posts do
  from(p in Post,
    where: p.private == false and is_nil(p.deleted_at)
  )
  |> Repo.all()
  |> Enum.map(fn post ->
    %SitemapUrl{
      url: "/posts/#{post.slug}",
      lastmod: post.updated_at
    }
  end)
end
```
And then with Repo.stream/1:
```elixir
def fetch(conn, _params) do
  # Repo.stream/1 only works inside a transaction, and render/3 has to
  # enumerate the stream before that transaction ends.
  {:ok, conn} =
    Repo.transaction(fn ->
      render(conn, "sitemap.xml",
        sitemap_items: get_sitemap_paths(),
        host: "https://appunite.com"
      )
    end)

  conn
end

def get_sitemap_paths do
  # ++ only works on lists; Stream.concat/2 joins the two streams lazily
  Stream.concat(sitemap_profiles(), sitemap_posts())
end

defp sitemap_profiles do
  from(u in User, where: is_nil(u.deleted_at))
  |> Repo.stream()
  |> Stream.map(fn profile ->
    %SitemapUrl{
      url: "/profiles/#{profile.slug}",
      lastmod: profile.updated_at
    }
  end)
end

defp sitemap_posts do
  from(p in Post,
    where: p.private == false and is_nil(p.deleted_at)
  )
  |> Repo.stream()
  |> Stream.map(fn post ->
    %SitemapUrl{
      url: "/posts/#{post.slug}",
      lastmod: post.updated_at
    }
  end)
end
```
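Why does render/3 sit inside Repo.transaction/1 instead of just building the stream there? Because a Stream is lazy: no rows are read until something enumerates it. The sketch below fakes both functions to show what happens when a stream escapes the transaction unconsumed. FakeRepo, its process-dictionary flag and its error message are all hypothetical, not Ecto’s API; the real Repo.stream/1 on Postgres or MySQL also raises an error when enumerated outside a transaction.

```elixir
defmodule FakeRepo do
  # Hypothetical stand-ins for Repo.transaction/1 and Repo.stream/1, using the
  # process dictionary as a fake "inside a transaction" flag.
  def transaction(fun) do
    Process.put(:in_transaction, true)

    try do
      {:ok, fun.()}
    after
      Process.delete(:in_transaction)
    end
  end

  def stream(rows) do
    Stream.resource(
      fn ->
        # Like a real cursor, this can only be opened inside a transaction.
        unless Process.get(:in_transaction),
          do: raise("stream enumerated outside of a transaction")

        rows
      end,
      fn
        [] -> {:halt, []}
        remaining -> {remaining, []}
      end,
      fn _ -> :ok end
    )
  end
end

to_url = fn slug -> "/posts/" <> slug end

# OK: the stream is consumed (here by Enum.to_list/1, in the article by
# render/3) while the transaction is still open.
{:ok, urls} =
  FakeRepo.transaction(fn ->
    FakeRepo.stream(["a", "b"]) |> Stream.map(to_url) |> Enum.to_list()
  end)

# Broken: only the lazy stream leaves the transaction; nothing has been read
# yet, so enumerating it afterwards fails.
{:ok, lazy_urls} =
  FakeRepo.transaction(fn ->
    FakeRepo.stream(["a", "b"]) |> Stream.map(to_url)
  end)

outcome =
  try do
    Enum.to_list(lazy_urls)
  rescue
    e in RuntimeError -> {:error, e.message}
  end
```

Here urls is ["/posts/a", "/posts/b"], while outcome is {:error, "stream enumerated outside of a transaction"}.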
Differences
- We replaced Repo.all/1 with (surprisingly) Repo.stream/1. It still lets us work with all the records from the query, with one difference: it returns a lazy Stream instead of a list.
- We had to wrap the whole part where we work with the Stream inside a database transaction. It’s a required step: Repo.stream/1 reads the rows through a database cursor, which adapters such as Postgres and MySQL only allow inside a transaction. Remember that only a chunk of the query’s rows (500 by default, configurable with the :max_rows option) is in our app’s memory at once, so the cursor has to stay open for the whole lifespan of the Stream. That’s also why render/3 is called inside the transaction: it’s what actually enumerates the Stream.
- To work with the Stream, we avoid the typical Enum functions, which would consume it eagerly and build a full list. Instead we use the Stream module, but it’s not a big deal: it has the functions you would need, like map, filter, etc. Also take a look at how we’ve composed the two streams: the ++ operator only works on lists, so the Stream.concat/2 function had to be used instead.
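Stream.concat/2 keeps the combined sequence lazy: the second stream is only started once the first one is exhausted, and not at all if the consumer stops early. The sketch below shows this with two hypothetical traced sources built with Stream.resource/3, standing in for the profile and post queries (the source and opened helpers and the {:opened, name} messages are made up for illustration). With ++, both sides would have to be fully materialized lists first.

```elixir
# Hypothetical source: logs {:opened, name} when it starts, then emits `items`.
source = fn name, items ->
  Stream.resource(
    fn ->
      send(self(), {:opened, name})
      items
    end,
    fn
      [] -> {:halt, []}
      remaining -> {remaining, []}
    end,
    fn _ -> :ok end
  )
end

# Returns the names of the sources opened so far, in order.
opened = fn ->
  Stream.repeatedly(fn ->
    receive do
      {:opened, name} -> name
    after
      0 -> nil
    end
  end)
  |> Enum.take_while(&(&1 != nil))
end

profiles = source.(:profiles, ["/profiles/ann", "/profiles/bob"])
posts = source.(:posts, ["/posts/hello"])
sitemap = Stream.concat(profiles, posts)

# Taking only two items never opens the posts source.
first_two = Enum.take(sitemap, 2)
opened_early = opened.()

# Enumerating everything opens both sources, one after the other.
everything = Enum.to_list(sitemap)
opened_all = opened.()
```

first_two only touches the profiles source (opened_early is [:profiles]), while the full enumeration opens both (opened_all is [:profiles, :posts]).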
Read more
- https://elixir-lang.org/getting-started/enumerables-and-streams.html#streams
- https://hexdocs.pm/ecto/Ecto.Repo.html#c:stream/2
- https://hexdocs.pm/elixir/1.12/Stream.html
Special thanks to Kacper Latuszewski ;)