Skip to content

Commit 9f77d47

Browse files
committed
feat(changes): Add IDocumentChanges + RavenDB implementation
- [Changes API: How to Subscribe to Document Changes | RavenDB 5.3 Documentation](https://ravendb.net/docs/article-page/5.3/csharp/client-api/changes/how-to-subscribe-to-document-changes#fordocumentsincollection) - [Async / Await with IObserver<T> · Issue #459 · dotnet/reactive](dotnet/reactive#459) - [Change Streams](https://mongodb.github.io/mongo-csharp-driver/2.9/reference/driver/change_streams/)
1 parent 3195df0 commit 9f77d47

File tree

6 files changed

+137
-2
lines changed

6 files changed

+137
-2
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
namespace IntBasis.DocumentOriented.RavenDB.Tests;
2+
3+
public class RavenDbDocumentChangesTest
4+
{
5+
// Separate entity type for change tests so we don't pick up changes from other collections
6+
class TestBook : IDocumentEntity
7+
{
8+
public string? Id {get;set;}
9+
public string? Title { get; set; }
10+
public int PageCount { get; set; }
11+
}
12+
13+
[Theory(DisplayName = "Subscribe nothing changed"), Integration]
14+
public async Task NoChanges(IDocumentChanges subject)
15+
{
16+
var invoked = false;
17+
var subscription = subject.Subscribe<TestBook>(() =>
18+
{
19+
invoked = true;
20+
return Task.CompletedTask;
21+
});
22+
await Task.Delay(100);
23+
subscription.Dispose();
24+
invoked.Should().BeFalse();
25+
}
26+
27+
[Theory(DisplayName = "Subscribe 1 Change"), Integration]
28+
public async Task OneChange(IDocumentChanges subject, IDocumentStorage documentStorage)
29+
{
30+
var entity = new TestBook();
31+
await documentStorage.Store(entity);
32+
var invoked = false;
33+
using var subscription = subject.Subscribe<TestBook>(() =>
34+
{
35+
invoked = true;
36+
return Task.CompletedTask;
37+
});
38+
39+
entity.Title = "New title";
40+
await documentStorage.Store(entity);
41+
42+
await Task.Delay(100);
43+
invoked.Should().BeTrue();
44+
}
45+
46+
[Theory(DisplayName = "Subscribe 5 Changes"), Integration]
47+
public async Task FiveChanges(IDocumentChanges subject, IDocumentStorage documentStorage)
48+
{
49+
var entity = new TestBook();
50+
await documentStorage.Store(entity);
51+
var invoked = 0;
52+
using var subscription = subject.Subscribe<TestBook>(() =>
53+
{
54+
invoked++;
55+
return Task.CompletedTask;
56+
});
57+
58+
for (int i = 0; i < 5; i++)
59+
{
60+
entity.PageCount = i;
61+
await documentStorage.Store(entity);
62+
}
63+
64+
await Task.Delay(500);
65+
invoked.Should().BeInRange(5, 6); // TODO: Figure out why sometimes 6
66+
}
67+
68+
[Theory(DisplayName = "Disposal ends subscription"), Integration]
69+
public async Task Disposal(IDocumentChanges subject, IDocumentStorage documentStorage)
70+
{
71+
var invoked = false;
72+
var subscription = subject.Subscribe<TestBook>(() =>
73+
{
74+
invoked = true;
75+
return Task.CompletedTask;
76+
});
77+
subscription.Dispose();
78+
79+
await documentStorage.Store(new TestBook());
80+
81+
await Task.Delay(200);
82+
invoked.Should().BeFalse();
83+
}
84+
}
85+
86+
//- [Change Streams] (https://mongodb.github.io/mongo-csharp-driver/2.9/reference/driver/change_streams/)

IntBasis.DocumentOriented.RavenDB.Tests/RavenDbExampleTest.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ public class RavenDbExampleTest
4747
// store.Maintenance.Server.Send<DatabasePutResult>(createOperation);
4848
//}
4949

50-
RavenDbConfiguration TestConfig => new("Test", "http://127.0.0.1:8080");
51-
IDocumentStore DocumentStore() => RavenDbInitialization.InitializeDocumentStore(TestConfig);
50+
static RavenDbConfiguration TestConfig => new("Test", "http://127.0.0.1:8080");
51+
static IDocumentStore DocumentStore() => RavenDbInitialization.InitializeDocumentStore(TestConfig);
5252

5353
[Theory(DisplayName = "RavenDB Store"), Integration]
5454
public void Storage(IDocumentSession session)

IntBasis.DocumentOriented.RavenDB/IntBasis.DocumentOriented.RavenDB.csproj

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
<ItemGroup>
1212
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.0" />
1313
<PackageReference Include="RavenDB.Client" Version="5.3.103" />
14+
<PackageReference Include="System.Reactive.Core" Version="5.0.0" />
1415
</ItemGroup>
1516

1617
<ItemGroup>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
using Raven.Client.Documents;
2+
using System;
3+
using System.Reactive.Linq;
4+
using System.Threading.Tasks;
5+
6+
namespace IntBasis.DocumentOriented.RavenDB;
7+
8+
public class RavenDbDocumentChanges : IDocumentChanges
9+
{
10+
private readonly IDocumentStore documentStore;
11+
12+
public RavenDbDocumentChanges(IDocumentStore documentStore)
13+
{
14+
this.documentStore = documentStore ?? throw new ArgumentNullException(nameof(documentStore));
15+
}
16+
17+
/// <inheritdoc/>
18+
public IDisposable Subscribe<T>(Func<Task> observer) where T : IDocumentEntity
19+
{
20+
// [Changes API: How to Subscribe to Document Changes](https://ravendb.net/docs/article-page/5.3/csharp/client-api/changes/how-to-subscribe-to-entity-changes#fordocumentsincollection)
21+
// [Async / Await with IObserver](https://github.com/dotnet/reactive/issues/459)
22+
var observable = documentStore.Changes()
23+
.ForDocumentsInCollection<T>();
24+
return observable.Select(document => Observable.FromAsync(observer))
25+
.Concat()
26+
.Subscribe();
27+
}
28+
}

IntBasis.DocumentOriented.RavenDB/ServiceCollectionExtensions.cs

+2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ public static class ServiceCollectionExtensions
1414
/// External:
1515
/// <list type="bullet">
1616
/// <item> <see cref="IDocumentStorage"/> </item>
17+
/// <item> <see cref="IDocumentChanges"/> </item>
1718
/// </list>
1819
/// <para/>
1920
/// Internal:
@@ -27,6 +28,7 @@ public static IServiceCollection AddDocumentOrientedRavenDb(this IServiceCollect
2728
RavenDbConfiguration configuration)
2829
{
2930
services.AddTransient<IDocumentStorage, RavenDbDocumentStorage>();
31+
services.AddTransient<IDocumentChanges, RavenDbDocumentChanges>();
3032
// A single instance of the Document Store (Singleton Pattern)
3133
// should be created per cluster per the lifetime of your application.
3234
// See https://ravendb.net/docs/article-page/5.3/csharp/client-api/what-is-a-document-store
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
namespace IntBasis.DocumentOriented;
2+
3+
/// <summary>
4+
/// Provides a way to subscribe to notifications of changes to Document Entities
5+
/// </summary>
6+
public interface IDocumentChanges
7+
{
8+
/// <summary>
9+
/// Subscribes to changes to Document Entities of type <typeparamref name="T"/>
10+
/// and invokes <paramref name="observer"/> for each change.
11+
/// <para/>
12+
/// The subscription is closed when the returned object is disposed.
13+
/// </summary>
14+
/// <typeparam name="T">The Document Entity type</typeparam>
15+
/// <param name="observer">The delegate that is called for each change</param>
16+
/// <returns>A subscription which can be closed by calling <see cref="IDisposable.Dispose"/></returns>
17+
IDisposable Subscribe<T>(Func<Task> observer) where T : IDocumentEntity;
18+
}

0 commit comments

Comments
 (0)