[RORDEV-1410] Data stream audit sink setup improvements #1089

Open: wants to merge 2 commits into base: epic/RORDEV-1263

mateuszkp96
Collaborator

No description provided.

@mateuszkp96 mateuszkp96 changed the base branch from develop to epic/RORDEV-1263 March 25, 2025 16:41
@mateuszkp96
Collaborator Author

I added the changes for the es816x module. I will port them to the other modules once the solution is accepted.

@mateuszkp96 mateuszkp96 requested a review from coutoPL March 25, 2025 18:05
import tech.beshu.ror.implicits.*
import tech.beshu.ror.utils.RefinedUtils.*

import java.util.concurrent.TimeUnit

final class AuditDataStreamCreator(services: NonEmptyList[DataStreamService]) extends Logging {

def createIfNotExists(dataStreamName: RorAuditDataStream): Task[Unit] = {
services.toList.traverse(createIfNotExists(_, dataStreamName)).map((_: List[Unit]) => ())
def createIfNotExists(dataStreamName: RorAuditDataStream): Task[Either[String, Unit]] = {
Collaborator

Let's at least create a type alias for the String (e.g. Message or something like that).
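
A minimal sketch of what such an alias could look like. The names `ErrorMessage` and `AuditSinkTypes` are hypothetical, and the effect type (`Task`) is left out so the snippet stays self-contained:

```scala
object AuditSinkTypes {
  // Hypothetical alias: it names the error channel without changing the runtime type.
  type ErrorMessage = String

  // Simplified stand-in for the PR's createIfNotExists, without the Task wrapper.
  // `simulateFailure` exists only to make the example observable.
  def createIfNotExists(dataStreamName: String, simulateFailure: Boolean): Either[ErrorMessage, Unit] =
    if (simulateFailure) Left(s"Unable to create data stream '$dataStreamName'")
    else Right(())
}
```

A plain alias only improves readability of signatures. If stronger guarantees were wanted, a wrapper case class would be the next step, at the cost of wrapping and unwrapping.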

.toList
.map(createIfNotExists(_, dataStreamName))
.sequence
.map(_.sequence.map((_: List[Unit]) => ()))
Collaborator

I don't see any monoid here, so I guess we lose potential error messages (when more than one service fails).
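
To illustrate the concern: sequencing a list of `Either` values stops at the first `Left`, so later failures are dropped. A rough sketch of accumulating all of them with the standard library only (the names `ErrorAccumulation` and `combineResults` are hypothetical, and this is not necessarily how the PR should solve it):

```scala
object ErrorAccumulation {
  type ErrorMessage = String

  // Collects every Left instead of keeping only the first one.
  def combineResults(results: List[Either[ErrorMessage, Unit]]): Either[List[ErrorMessage], Unit] = {
    // partitionMap splits the list into all Left values and all Right values.
    val (errors, _) = results.partitionMap(identity)
    if (errors.isEmpty) Right(()) else Left(errors)
  }
}
```

With two failing services, both messages survive, e.g. `combineResults(List(Left("a"), Right(()), Left("b")))` yields `Left(List("a", "b"))`.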

.map((_: Unit) => new EsDataStreamBasedAuditSink(serializer, rorAuditDataStream, auditSinkService))
.flatMap {
case Right(()) => Task.delay(new EsDataStreamBasedAuditSink(serializer, rorAuditDataStream, auditSinkService))
case Left(errorMsg) => Task.raiseError(new IllegalStateException(errorMsg))
Collaborator

Is it an illegal state? I think it's a possible case.
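
One possible direction, sketched under assumptions: model the failure as a dedicated error type rather than `IllegalStateException`. `AuditSinkCreationError` and `toSink` are hypothetical names, and a plain `String` stands in for the real sink instance:

```scala
object AuditSinkErrors {
  // Hypothetical error type: signals an expected setup failure, not a programming bug.
  final case class AuditSinkCreationError(message: String) extends Exception(message)

  // Simplified stand-in for the flatMap in the diff, without the Task wrapper.
  def toSink(result: Either[String, Unit]): Either[AuditSinkCreationError, String] =
    result match {
      case Right(()) => Right("sink") // placeholder for the real sink instance
      case Left(msg) => Left(AuditSinkCreationError(msg))
    }
}
```

Callers can then pattern match on the specific error instead of catching a generic exception.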

_ <- createDataStream(settings.dataStreamName)
} yield ()
}
_ <- createIfAbsent(
Collaborator

maybe we can format it like that:

    _ <- createIfAbsent(
      checkIfResourceExists = checkIndexLifecyclePolicyExists(settings.lifecyclePolicy.id),
      createResource = createIndexLifecyclePolicy(settings.lifecyclePolicy),
      onNotAcknowledged = Failure(s"Unable to determine if the index lifecycle policy with ID '${settings.lifecyclePolicy.id.show}' has been created")
    )


def checkDataStreamExists(dataStreamName: DataStreamName.Full): Task[Boolean]

protected def createDataStream(dataStreamName: DataStreamName.Full): Task[CreationResult]

protected def checkIndexLifecyclePolicyExists(policyId: NonEmptyString): Task[Boolean] = Task.pure(false)
Collaborator

Why the default implementation? I'm not sure we need it
(here and below, too).

"A ReadonlyREST data stream service" when {
"fully setup data stream called" should {
"not attempt to create data stream when one exists" in {
tryToCreateDataStream(DataStreamsMocks.alreadyExists)
Collaborator

The tests are great, but IMO we should do something to improve the readability. Right now, it's hard for the reader to grasp what each test does without looking into runTests, testSuccessfulDataStreamSetup, and other method implementations.

Maybe we should try to maintain the given-when-then style? And try to explicitly show what the mock returns, what is called, and how we assert the result.

Obviously, we will have to repeat ourselves many times, but maybe we will be able to reach a state where the reader can read the test and, without looking into the private methods used in it, understand what the test does and how.

WDYT?
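
A rough sketch of the given-when-then shape, assuming a hand-written stub rather than the mocks used in the PR. Everything here (`StubDataStreamService`, `createIfAbsent`, `notCreateWhenExists`) is hypothetical and simplified to plain values so it runs without a test framework:

```scala
object GivenWhenThenSketch {
  // Hypothetical stub standing in for the mocked data stream service.
  final class StubDataStreamService(exists: Boolean) {
    var createCalls: Int = 0
    def checkDataStreamExists(name: String): Boolean = exists
    def createDataStream(name: String): Unit = createCalls += 1
  }

  // Simplified version of the logic under test.
  def createIfAbsent(service: StubDataStreamService, name: String): Unit =
    if (!service.checkDataStreamExists(name)) service.createDataStream(name)

  // "not attempt to create data stream when one exists"
  def notCreateWhenExists(): Int = {
    // given: the data stream already exists
    val service = new StubDataStreamService(exists = true)
    // when: the setup is run
    createIfAbsent(service, "audit")
    // then: the caller asserts that no creation was attempted
    service.createCalls
  }
}
```

The point is that the stubbed answer, the call, and the asserted value are all visible in the test body itself, even if that means some repetition across tests.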

.asInstanceOf[java.util.List[Object]]
.asScala
.map { obj =>
val policy = ReflecUtils.invokeMethod(obj, obj.getClass, "getLifecyclePolicy")
Collaborator

Let's not use ReflecUtils. We should consider it deprecated. Let's use org.joor.Reflect instead.

Collaborator

In this file, we use Instant.now. Let's inject a clock into the class and use it instead.
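
A minimal sketch of the idea using `java.time.Clock`. The class name `TimestampProvider` is hypothetical, since the file in question is not shown here:

```scala
import java.time.{Clock, Instant, ZoneOffset}

object ClockInjection {
  // Hypothetical class: it receives the clock instead of calling Instant.now() directly.
  final class TimestampProvider(clock: Clock) {
    def now(): Instant = Instant.now(clock)
  }

  // In tests, a fixed clock makes the timestamp deterministic.
  val fixedClock: Clock = Clock.fixed(Instant.parse("2025-03-25T16:41:00Z"), ZoneOffset.UTC)
}
```

Production code could pass `Clock.systemUTC()`, while tests pass `fixedClock` and assert on exact timestamps.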

Collaborator

private def failure(message: String) = {
  Task.raiseError(new IllegalStateException(message))
}

It's not an invalid state, is it?

val policies = response.entityJson.obj.keySet
Task.pure(policies.contains(policy))
case response =>
failure(s"Cannot get ILM policy [$policy] - response code: ${response.statusCode}")
Collaborator

It'd be nice to add the response body too.
I just wonder whether we should add it to the message or emit it as a debug log.
