Skip to content

Commit f44fb87

Browse files
authored
Merge pull request #547 from Hydrospheredata/chore/handle_errors
Error handing fixes
2 parents e34847f + 4ef03a2 commit f44fb87

File tree

10 files changed

+52
-33
lines changed

10 files changed

+52
-33
lines changed

.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,6 @@ env-*
2525
dist/
2626
build/
2727
__version__.py
28+
.bloop
29+
.metals
30+
.vscode

mist-lib/src/main/python/setup.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,6 @@
3636

3737
packages=find_packages(exclude=['tests']),
3838
setup_requires=['pytest-runner'],
39-
tests_require=['pytest'],
39+
tests_require=['pytest==4.6.4'],
4040
test_suite='tests'
4141
)

mist/master/src/main/scala/io/hydrosphere/mist/master/JobDetailsRequest.scala

+7-2
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,25 @@
11
package io.hydrosphere.mist.master
22

3+
import cats.data.NonEmptyList
4+
import java.io.File
5+
36
sealed trait FilterClause
47

58
object FilterClause {
69
case class ByFunctionId(id: String) extends FilterClause
710
case class ByWorkerId(id: String) extends FilterClause
8-
case class ByStatuses(statuses: Seq[JobDetails.Status]) extends FilterClause
11+
case class ByStatuses(statuses: NonEmptyList[JobDetails.Status]) extends FilterClause
912
}
1013

1114
case class JobDetailsRequest(
1215
limit: Int,
1316
offset: Int,
1417
filters: Seq[FilterClause]
15-
) {
18+
) { self =>
1619

1720
def withFilter(f: FilterClause): JobDetailsRequest = copy(filters = f +: filters)
21+
def withOptFilter(optF: Option[FilterClause]): JobDetailsRequest =
22+
optF.fold(self)(f => self.withFilter(f))
1823
}
1924

2025
object JobDetailsRequest {

mist/master/src/main/scala/io/hydrosphere/mist/master/execution/JobActor.scala

+2
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ class JobActor(
9595
case JobStarted(id, time) =>
9696
report.reportPlain(StartedEvent(id, time))
9797
context become completion(callback, connection)
98+
99+
case JobFailure(_, err) => completeFailure(err, Some(connection))
98100
}
99101

100102
private def completion(callback: ActorRef, connection: PerJobConnection): Receive = {

mist/master/src/main/scala/io/hydrosphere/mist/master/interfaces/http/HttpV2Routes.scala

+23-13
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package io.hydrosphere.mist.master.interfaces.http
22

33
import akka.http.scaladsl.Http
4-
import java.io.File
4+
import java.io.{PrintWriter, StringWriter, File}
55

66
import akka.http.scaladsl.marshalling.{ToEntityMarshaller, ToResponseMarshallable}
77
import akka.http.scaladsl.model._
@@ -30,6 +30,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
3030
import scala.concurrent.Future
3131
import scala.concurrent.duration.Duration
3232
import scala.util._
33+
import cats.data.NonEmptyList
3334

3435
case class JobRunQueryParams(
3536
force: Boolean,
@@ -111,11 +112,11 @@ object HttpV2Base {
111112
}
112113
}
113114

114-
val statusesQuery: Directive1[Seq[JobDetails.Status]] = {
115+
val statusesQuery: Directive1[Option[NonEmptyList[JobDetails.Status]]] = {
115116
parameter('status *).flatMap(raw => {
116117
toStatuses(raw) match {
117118
case Left(errors) => reject(ValidationRejection(s"Unknown statuses: ${errors.mkString(",")}"))
118-
case Right(validated) => provide(validated)
119+
case Right(validated) => provide(NonEmptyList.fromList(validated))
119120
}
120121
})
121122
}
@@ -170,9 +171,9 @@ object HttpV2Routes extends Logger {
170171
}
171172
} ~
172173
path( root / "workers"/ Segment / "jobs") { workerId =>
173-
get { (paginationQuery & statusesQuery) { (pagination, statuses) =>
174+
get { (paginationQuery & statusesQuery) { (pagination, maybeStatuses) =>
174175
val req = JobDetailsRequest(pagination.limit, pagination.offset)
175-
.withFilter(FilterClause.ByStatuses(statuses))
176+
.withOptFilter(maybeStatuses.map(FilterClause.ByStatuses))
176177
.withFilter(FilterClause.ByWorkerId(workerId))
177178

178179
onSuccess(jobService.getHistory(req))(rsp => {
@@ -231,9 +232,9 @@ object HttpV2Routes extends Logger {
231232

232233
def functionsJobs(main: MainService): Route = {
233234
path( root / "functions" / Segment / "jobs" ) { functionId =>
234-
get { (paginationQuery & statusesQuery) { (pagination, statuses) =>
235+
get { (paginationQuery & statusesQuery) { (pagination, maybeStatuses) =>
235236
val req = JobDetailsRequest(pagination.limit, pagination.offset)
236-
.withFilter(FilterClause.ByStatuses(statuses))
237+
.withOptFilter(maybeStatuses.map(FilterClause.ByStatuses))
237238
.withFilter(FilterClause.ByFunctionId(functionId))
238239

239240
onSuccess(main.execution.getHistory(req))(rsp => {
@@ -343,9 +344,9 @@ object HttpV2Routes extends Logger {
343344
def jobsRoutes(master: MainService): Route = {
344345
pathPrefix( root / "jobs") {
345346
pathEnd {
346-
get { (paginationQuery & statusesQuery) { (pagination, statuses) =>
347+
get { (paginationQuery & statusesQuery) { (pagination, maybeStatuses) =>
347348
val req = JobDetailsRequest(pagination.limit, pagination.offset)
348-
.withFilter(FilterClause.ByStatuses(statuses))
349+
.withOptFilter(maybeStatuses.map(FilterClause.ByStatuses))
349350

350351
onSuccess(master.execution.getHistory(req))(rsp => {
351352
if (pagination.paginate)
@@ -432,13 +433,22 @@ object HttpV2Routes extends Logger {
432433
}
433434

434435
def apiRoutes(main: MainService, artifacts: ArtifactRepository, mistHome: String): Route = {
435-
val exceptionHandler =
436-
ExceptionHandler {
436+
val exceptionHandler = ExceptionHandler({ case e => {
437+
val (status, msg) = e match {
437438
case ex @ (_: IllegalArgumentException | _: IllegalStateException) =>
438-
complete((StatusCodes.BadRequest, s"Bad request: ${ex.getMessage}"))
439+
(StatusCodes.BadRequest, "Bad request")
439440
case ex =>
440-
complete(HttpResponse(StatusCodes.InternalServerError, entity = s"Server error: ${ex.getMessage}"))
441+
(StatusCodes.InternalServerError, "Server error")
441442
}
443+
val errorMsg = {
444+
val writer = new StringWriter()
445+
e.printStackTrace(new PrintWriter(writer))
446+
writer.toString
447+
}
448+
val fullMsg = msg + ":\n" + errorMsg
449+
complete(HttpResponse(status, entity = fullMsg))
450+
}})
451+
442452
handleExceptions(exceptionHandler) {
443453
functionAllRoutes(main) ~
444454
jobsRoutes(main) ~

mist/master/src/main/scala/io/hydrosphere/mist/master/store/JobRequestSql.scala

+9-13
Original file line numberDiff line numberDiff line change
@@ -93,25 +93,21 @@ trait JobRequestSql {
9393
def clear = sql"truncate table job_details"
9494

9595
def filteredByStatuses(statuses: Seq[JobDetails.Status]): Fragment = {
96-
if (statuses.nonEmpty) {
97-
beginSql ++ sql"where " ++ Fragments.in(fr"status", jobDetailStatusToList(statuses))
98-
} else {
99-
beginSql
100-
}
96+
beginSql ++ statusFr(fr"where ", statuses)
10197
}
10298

10399
def getAll(limit: Int, offset: Int, statuses: Seq[JobDetails.Status]): Fragment = {
104100
val endOfSQL = sql" limit $limit offset $offset"
105101

106-
if (statuses.nonEmpty) {
107-
beginSql ++ sql"where " ++ Fragments.in(fr"status", jobDetailStatusToList(statuses)) ++ endOfSQL
108-
} else {
109-
beginSql ++ endOfSQL
110-
}
102+
beginSql ++ statusFr(fr"where ", statuses) ++ endOfSQL
111103
}
112104

113-
def jobDetailStatusToList(seq: Seq[JobDetails.Status]): NonEmptyList[String] =
114-
NonEmptyList.fromList(seq.map(_.toString).toList).get
105+
def statusFr(prefix: Fragment, statuses: Seq[JobDetails.Status]): Fragment = {
106+
NonEmptyList.fromList(statuses.toList) match {
107+
case None => Fragment.empty
108+
case Some(inSt) => prefix ++ Fragments.in(fr"status", inSt.map(_.toString))
109+
}
110+
}
115111

116112
/**
117113
* Generates Doobie sql fragment with JobDetailsRequest
@@ -121,7 +117,7 @@ trait JobRequestSql {
121117
if (req.filters.isEmpty) sql"" else {
122118
fr"where" ++ req.filters.map {
123119
case ByFunctionId(id) => fr"function = $id"
124-
case ByStatuses(statuses) => Fragments.in(fr"status", jobDetailStatusToList(statuses))
120+
case ByStatuses(statuses) => Fragments.in(fr"status", statuses.map(_.toString))
125121
case ByWorkerId(id) => fr"worker_id = $id"
126122
}.reduce((a, b) => a ++ fr"and" ++ b)
127123
}

mist/master/src/test/scala/io/hydrosphere/mist/master/interfaces/http/HttpApiV2Spec.scala

-1
Original file line numberDiff line numberDiff line change
@@ -536,7 +536,6 @@ class HttpApiV2Spec extends FunSpec
536536

537537
val route = HttpV2Routes.apiRoutes(master, mock[ArtifactRepository], "")
538538
Post(s"/v2/api/functions/x/jobs", JsMap("1" -> "Hello".js)) ~> route ~> check {
539-
responseAs[String] shouldBe "Bad request: argument missing"
540539
status shouldBe StatusCodes.BadRequest
541540
}
542541
}

mist/master/src/test/scala/io/hydrosphere/mist/master/store/JobRequestsSqlSpec.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import doobie.implicits._
88
import io.hydrosphere.mist.core.CommonData.{Action, JobParams}
99
import io.hydrosphere.mist.master.JobDetails.Source
1010
import mist.api.data.JsMap
11+
import cats.data.NonEmptyList
1112

1213
class JobRequestsSqlSpec extends FunSpec with Matchers{
1314

@@ -36,13 +37,13 @@ class JobRequestsSqlSpec extends FunSpec with Matchers{
3637
}
3738

3839
it("sql filter by status") {
39-
val sql = jobRequestSql.generateSqlByJobDetailsRequest(JobDetailsRequest(0, 0).withFilter(ByStatuses(Seq(Initialized, Queued))))
40+
val sql = jobRequestSql.generateSqlByJobDetailsRequest(JobDetailsRequest(0, 0).withFilter(ByStatuses(NonEmptyList.of(Initialized, Queued))))
4041
sql.toString() shouldEqual sql"select * from job_details where status IN (?, ?) order by create_time desc limit ? offset ? ".toString()
4142
}
4243

4344
it("sql complex filter") {
4445
val sql = jobRequestSql.generateSqlByJobDetailsRequest(JobDetailsRequest(0, 0).
45-
withFilter(ByStatuses(Seq(Initialized, Queued))).
46+
withFilter(ByStatuses(NonEmptyList.of(Initialized, Queued))).
4647
withFilter(ByWorkerId("ID")).
4748
withFilter(ByFunctionId("ID")))
4849
sql.toString() shouldEqual sql"select * from job_details where function = ? and worker_id = ? and status IN (?, ?) order by create_time desc limit ? offset ? ".toString()

mist/worker/src/main/scala/io/hydrosphere/mist/worker/WorkerActor.scala

+3
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ import io.hydrosphere.mist.core.CommonData._
88
import io.hydrosphere.mist.worker.runners._
99
import mist.api.data.JsData
1010
import org.apache.spark.streaming.StreamingContext
11+
12+
import scala.concurrent.Future
13+
1114
import RequestSetup._
1215

1316
class WorkerActor(

project/Ui.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ object Ui {
1515

1616
lazy val settings = Seq(
1717
uiUrl := { (s: String) => s"https://github.com/Hydrospheredata/mist-ui/releases/download/v$s/mist-ui-$s.tar.gz" },
18-
uiVersion := "2.2.0",
18+
uiVersion := "2.2.1",
1919
uiCheckoutDir := "ui_local",
2020
ui := {
2121
val local = baseDirectory.value / uiCheckoutDir.value

0 commit comments

Comments
 (0)