-
Notifications
You must be signed in to change notification settings - Fork 164
[RORDEV-1410] Data stream audit sink setup improvements #1089
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: epic/RORDEV-1263
Are you sure you want to change the base?
Conversation
I added changes for the es816x module. I will port to other modules when the solution is accepted |
import tech.beshu.ror.implicits.* | ||
import tech.beshu.ror.utils.RefinedUtils.* | ||
|
||
import java.util.concurrent.TimeUnit | ||
|
||
final class AuditDataStreamCreator(services: NonEmptyList[DataStreamService]) extends Logging { | ||
|
||
def createIfNotExists(dataStreamName: RorAuditDataStream): Task[Unit] = { | ||
services.toList.traverse(createIfNotExists(_, dataStreamName)).map((_: List[Unit]) => ()) | ||
def createIfNotExists(dataStreamName: RorAuditDataStream): Task[Either[String, Unit]] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's create at least type def for the String (aka Message or sth like that)
.toList | ||
.map(createIfNotExists(_, dataStreamName)) | ||
.sequence | ||
.map(_.sequence.map((_: List[Unit]) => ())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see any monoid here, so I guess we miss potential error messages (when more than one service fails)
.map((_: Unit) => new EsDataStreamBasedAuditSink(serializer, rorAuditDataStream, auditSinkService)) | ||
.flatMap { | ||
case Right(()) => Task.delay(new EsDataStreamBasedAuditSink(serializer, rorAuditDataStream, auditSinkService)) | ||
case Left(errorMsg) => Task.raiseError(new IllegalStateException(errorMsg)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it an illegal state? I think it's possible case
_ <- createDataStream(settings.dataStreamName) | ||
} yield () | ||
} | ||
_ <- createIfAbsent( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we can format it like that:
_ <- createIfAbsent(
checkIfResourceExists = checkIndexLifecyclePolicyExists(settings.lifecyclePolicy.id),
createResource = createIndexLifecyclePolicy(settings.lifecyclePolicy),
onNotAcknowledged = Failure(s"Unable to determine if the index lifecycle policy with ID '${settings.lifecyclePolicy.id.show}' has been created")
)
|
||
def checkDataStreamExists(dataStreamName: DataStreamName.Full): Task[Boolean] | ||
|
||
protected def createDataStream(dataStreamName: DataStreamName.Full): Task[CreationResult] | ||
|
||
protected def checkIndexLifecyclePolicyExists(policyId: NonEmptyString): Task[Boolean] = Task.pure(false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why the default implementation? I'm not sure if we need it
(here, and below too)
"A ReadonlyREST data stream service" when { | ||
"fully setup data stream called" should { | ||
"not attempt to create data stream when one exists" in { | ||
tryToCreateDataStream(DataStreamsMocks.alreadyExists) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The tests are great, but IMO we should do sth to improve the readability. Now, it's hard for the reader to grasp what each test does without looking into runTests
, testSuccessfulDataStreamSetup
, and other method implementations.
Maybe we should try to maintain the given-when-then style? And try to explicitly show what the mock returns, what is called, and how we assert the result.
Obviously, we will have to repeat ourselves many times, but maybe we will be able to achieve a state when the reader can read the test, and without looking into some private methods used in it, they will be able to understand what the test does and how.
WDYT?
.asInstanceOf[java.util.List[Object]] | ||
.asScala | ||
.map { obj => | ||
val policy = ReflecUtils.invokeMethod(obj, obj.getClass, "getLifecyclePolicy") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's not use ReflecUtils. We should consider it as deprecated. Let's use org.joor.Reflect
instead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this file, we use Instant.now
. Let's inject a clock to the class and use it instead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private def failure(message: String) = {
Task.raiseError(new IllegalStateException(message))
}
it's not an invalid state, isn't it?
val policies = response.entityJson.obj.keySet | ||
Task.pure(policies.contains(policy)) | ||
case response => | ||
failure(s"Cannot get ILM policy [$policy] - response code: ${response.statusCode}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It'd be nice to add the response body too.
I just wonder if we should add to the message or as a debug log.
No description provided.