-
Notifications
You must be signed in to change notification settings - Fork 1.7k
feat(flags): dynamic cohort matching in rust #25776
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 30 commits
ca431b5
e89f169
4f20e07
ed00224
fb8aab8
899a99c
896c31a
eeea8cc
d02baec
39dad2d
db8cd8d
43cda76
8d2ab85
9ccf479
797adbe
71def67
27af814
4c49bc4
d4af2f0
870f719
57d9885
9eb0f18
3cfc590
3e8e5d2
3528b31
77059f3
09317c4
43e8692
3a65683
a5812e6
fd52b24
59f7c10
8066aff
4d5ecd9
4012ebe
8ededb1
fe37b04
bc38940
41d3db3
0a409f4
0dd1c0b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,6 +39,8 @@ health = { path = "../common/health" } | |
common-metrics = { path = "../common/metrics" } | ||
tower = { workspace = true } | ||
derive_builder = "0.20.1" | ||
petgraph = "0.6.5" | ||
moka = { version = "0.12.8", features = ["future"] } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. caching lib with support for TTL and feature weighting There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As a heads up, this is already in the workspace, you can probably pull it in (we're using it in error tracking). |
||
|
||
[lints] | ||
workspace = true | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,217 @@ | ||
use crate::api::FlagError; | ||
use crate::cohort_models::Cohort; | ||
use crate::flag_matching::{PostgresReader, TeamId}; | ||
use moka::future::Cache; | ||
use std::time::Duration; | ||
|
||
/// CohortCache manages the in-memory cache of cohorts using `moka` for caching. | ||
/// | ||
/// Features: | ||
/// - **TTL**: Each cache entry expires after 5 minutes. | ||
/// - **Size-based eviction**: The cache evicts least recently used entries when the maximum capacity is reached. | ||
/// | ||
/// ```text | ||
/// CohortCache { | ||
/// postgres_reader: PostgresReader, | ||
/// per_team_cohorts: Cache<TeamId, Vec<Cohort>> { | ||
/// // Example: | ||
/// 2: [ | ||
/// Cohort { id: 1, name: "Power Users", filters: {...} }, | ||
/// Cohort { id: 2, name: "Churned", filters: {...} } | ||
/// ], | ||
/// 5: [ | ||
/// Cohort { id: 3, name: "Beta Users", filters: {...} } | ||
/// ] | ||
/// } | ||
/// } | ||
/// ``` | ||
/// | ||
/// Caches only successful cohort lists to maintain cache integrity. | ||
#[derive(Clone)] | ||
pub struct CohortCache { | ||
postgres_reader: PostgresReader, | ||
per_team_cohorts: Cache<TeamId, Vec<Cohort>>, | ||
} | ||
|
||
impl CohortCache { | ||
/// Creates a new `CohortCache` with configurable TTL and maximum capacity. | ||
pub fn new( | ||
postgres_reader: PostgresReader, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit, but if the variable name is the same as the type name, I go for stuff like "pr: PostgresReader" - the ide tells me everything I need to know about it anyway. I'd make it |
||
max_capacity: Option<u64>, | ||
ttl_seconds: Option<u64>, | ||
) -> Self { | ||
// We use the size of the cohort list as the weight of the entry | ||
let weigher = | ||
|_: &TeamId, value: &Vec<Cohort>| -> u32 { value.len().try_into().unwrap_or(u32::MAX) }; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just a thought about casts generally, I think this is totally fine and you shouldn't pay the CI time to change it: I'd almost argue for a raw |
||
|
||
// Initialize the Moka cache with TTL and size-based eviction. | ||
let cache = Cache::builder() | ||
.time_to_live(Duration::from_secs(ttl_seconds.unwrap_or(300))) // Default to 5 minutes | ||
.weigher(weigher) | ||
.max_capacity(max_capacity.unwrap_or(10_000)) // Default to 10,000 cohorts | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This default strikes me as quite low, I'd bump it an order of magnitude (or set it an order of magnitude larger) - that's a pure gut feeling though. |
||
.build(); | ||
|
||
Self { | ||
postgres_reader, | ||
per_team_cohorts: cache, | ||
} | ||
} | ||
|
||
/// Retrieves cohorts for a given team. | ||
/// | ||
/// If the cohorts are not present in the cache or have expired, it fetches them from the database, | ||
/// caches the result upon successful retrieval, and then returns it. | ||
pub async fn get_cohorts_for_team(&self, team_id: TeamId) -> Result<Vec<Cohort>, FlagError> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As above, I know it's |
||
if let Some(cached_cohorts) = self.per_team_cohorts.get(&team_id).await { | ||
return Ok(cached_cohorts.clone()); | ||
} | ||
let fetched_cohorts = Cohort::list_from_pg(self.postgres_reader.clone(), team_id).await?; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Only a note: I'm a fan of taking a manager-wide lock here (called, say, |
||
self.per_team_cohorts | ||
.insert(team_id, fetched_cohorts.clone()) | ||
.await; | ||
|
||
Ok(fetched_cohorts) | ||
} | ||
} | ||
|
||
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cohort_models::Cohort;
    use crate::test_utils::{
        insert_cohort_for_team_in_pg, insert_new_team_in_pg, setup_pg_reader_client,
        setup_pg_writer_client,
    };
    use std::sync::Arc;
    use tokio::time::{sleep, Duration};

    /// Helper function to setup a new team for testing.
    async fn setup_test_team(
        writer_client: Arc<dyn crate::database::Client + Send + Sync>,
    ) -> Result<TeamId, anyhow::Error> {
        Ok(crate::test_utils::insert_new_team_in_pg(writer_client, None)
            .await?
            .id)
    }

    /// Helper function to insert a cohort for a team.
    async fn setup_test_cohort(
        writer_client: Arc<dyn crate::database::Client + Send + Sync>,
        team_id: TeamId,
        name: Option<String>,
    ) -> Result<Cohort, anyhow::Error> {
        let filters = serde_json::json!({"properties": {"type": "OR", "values": [{"type": "OR", "values": [{"key": "$active", "type": "person", "value": [true], "negation": false, "operator": "exact"}]}]}});
        insert_cohort_for_team_in_pg(writer_client, team_id, name, filters, false).await
    }

    /// Tests that cache entries expire after the specified TTL.
    #[tokio::test]
    async fn test_cache_expiry() -> Result<(), anyhow::Error> {
        let writer = setup_pg_writer_client(None).await;
        let reader = setup_pg_reader_client(None).await;

        let team_id = setup_test_team(writer.clone()).await?;
        let _cohort = setup_test_cohort(writer.clone(), team_id, None).await?;

        // Use a deliberately short TTL so expiry is observable in the test.
        let cohort_cache = CohortCache::new(
            reader.clone(),
            Some(100),
            Some(1), // 1-second TTL
        );

        let fetched = cohort_cache.get_cohorts_for_team(team_id).await?;
        assert_eq!(fetched.len(), 1);
        assert_eq!(fetched[0].team_id, team_id);

        // The entry should now be in the underlying cache.
        assert!(cohort_cache.per_team_cohorts.get(&team_id).await.is_some());

        // Wait for TTL to expire
        sleep(Duration::from_secs(2)).await;

        // Attempt to retrieve from cache again
        let after_ttl = cohort_cache.per_team_cohorts.get(&team_id).await;
        assert!(after_ttl.is_none(), "Cache entry should have expired");

        Ok(())
    }

    /// Tests that the cache correctly evicts least recently used entries based on the weigher.
    #[tokio::test]
    async fn test_cache_weigher() -> Result<(), anyhow::Error> {
        let writer = setup_pg_writer_client(None).await;
        let reader = setup_pg_reader_client(None).await;

        // Define a smaller max_capacity for testing
        let max_capacity: u64 = 3;

        let cohort_cache = CohortCache::new(reader.clone(), Some(max_capacity), None);

        // Fill the cache to exactly its capacity, one team (one cohort) at a time.
        let mut inserted_team_ids = Vec::new();
        for _ in 0..max_capacity {
            let team_id = insert_new_team_in_pg(writer.clone(), None).await?.id;
            inserted_team_ids.push(team_id);
            setup_test_cohort(writer.clone(), team_id, None).await?;
            cohort_cache.get_cohorts_for_team(team_id).await?;
        }

        cohort_cache.per_team_cohorts.run_pending_tasks().await;
        assert_eq!(
            cohort_cache.per_team_cohorts.entry_count(),
            max_capacity,
            "Cache size should be equal to max_capacity"
        );

        // One more team pushes the cache over capacity and should trigger eviction.
        let new_team_id = insert_new_team_in_pg(writer.clone(), None).await?.id;
        setup_test_cohort(writer.clone(), new_team_id, None).await?;
        cohort_cache.get_cohorts_for_team(new_team_id).await?;

        cohort_cache.per_team_cohorts.run_pending_tasks().await;
        assert_eq!(
            cohort_cache.per_team_cohorts.entry_count(),
            max_capacity,
            "Cache size should remain equal to max_capacity after eviction"
        );

        // The oldest (least recently used) entry is the one that should be gone.
        let evicted = cohort_cache
            .per_team_cohorts
            .get(&inserted_team_ids[0])
            .await;
        assert!(
            evicted.is_none(),
            "Least recently used cache entry should have been evicted"
        );

        assert!(
            cohort_cache
                .per_team_cohorts
                .get(&new_team_id)
                .await
                .is_some(),
            "Newly added cache entry should be present"
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_get_cohorts_for_team() -> Result<(), anyhow::Error> {
        let writer = setup_pg_writer_client(None).await;
        let reader = setup_pg_reader_client(None).await;
        let team_id = setup_test_team(writer.clone()).await?;
        let _cohort = setup_test_cohort(writer.clone(), team_id, None).await?;
        let cohort_cache = CohortCache::new(reader.clone(), None, None);

        // Nothing should be cached before the first fetch.
        assert!(
            cohort_cache.per_team_cohorts.get(&team_id).await.is_none(),
            "Cache should initially be empty"
        );

        let fetched = cohort_cache.get_cohorts_for_team(team_id).await?;
        assert_eq!(fetched.len(), 1);
        assert_eq!(fetched[0].team_id, team_id);

        // A successful fetch populates the cache.
        let cached = cohort_cache.per_team_cohorts.get(&team_id).await.unwrap();
        assert_eq!(cached.len(), 1);
        assert_eq!(cached[0].team_id, team_id);

        Ok(())
    }
}
Uh oh!
There was an error while loading. Please reload this page.