Skip to content

Better control on running for duplicate job constraint #219

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,16 @@

#### New features

- Job callback (#217)
Add a global listener from SwiftQueueManager and get notification for job run and completion

- Job status listener (#217)
- Allow a queue to run multiple jobs in parallel (#215)
For a better support of concurrency, you can now define how many jobs schedule in a queue can run in parallel.

#### Breaking changes
- Rename synchronous to initInBackground (#213)
- Rename group() to parallel() (#212)

#### Enhancement

- Better control on running for duplicate job constraint (#219)
- Add no logger by default (#211)


Expand Down
15 changes: 11 additions & 4 deletions Sources/SwiftQueue/Constraint+UniqueUUID.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,21 @@ internal final class UniqueUUIDConstraint: JobConstraint {

func willSchedule(queue: SqOperationQueue, operation: SqOperation) throws {
for ope in queue.operations where ope.name == operation.info.uuid {
if operation.info.override {
ope.cancel()
} else {
throw SwiftQueueError.duplicate
if shouldAbort(ope: ope, operation: operation) {
if operation.info.override {
ope.cancel()
break
} else {
throw SwiftQueueError.duplicate
}
}
}
}

private func shouldAbort(ope: Operation, operation: SqOperation) -> Bool {
return (ope.isExecuting && operation.info.includeExecutingJob) || !ope.isExecuting
}

func willRun(operation: SqOperation) throws {
// Nothing to check
}
Expand Down
5 changes: 3 additions & 2 deletions Sources/SwiftQueue/JobBuilder.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ public final class JobBuilder {
self.info = JobInfo(type: type)
}

/// Allow only 1 job at the time with this ID scheduled or running
/// Allow only 1 job at the time with this ID scheduled or running if includeExecutingJob is true
/// Same job scheduled with same id will result in onRemove(SwiftQueueError.duplicate) if override = false
/// If override = true the previous job will be canceled and the new job will be scheduled
public func singleInstance(forId: String, override: Bool = false) -> Self {
public func singleInstance(forId: String, override: Bool = false, includeExecutingJob: Bool = true) -> Self {
assertNotEmptyString(forId)
info.uuid = forId
info.override = override
info.includeExecutingJob = includeExecutingJob
return self
}

Expand Down
56 changes: 41 additions & 15 deletions Sources/SwiftQueue/JobInfo.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ public struct JobInfo {
/// Override job when scheduling a job with same uuid
var override: Bool

//// Including job that are executing when scheduling with same uuid
var includeExecutingJob: Bool

/// Set of identifiers
var tags: Set<String>

Expand Down Expand Up @@ -71,27 +74,50 @@ public struct JobInfo {
/// Current number of repetition. Transient value
var currentRepetition: Int

init(type: String) {
self.init(
type: type,
queueName: "GLOBAL",
uuid: UUID().uuidString,
override: false,
includeExecutingJob: true,
tags: Set<String>(),
delay: nil,
deadline: nil,
requireNetwork: NetworkType.any,
isPersisted: false,
params: [:],
createTime: Date(),
interval: -1.0,
maxRun: .limited(0),
retries: .limited(0),
runCount: 0,
requireCharging: false)
}

init(type: String,
queueName: String = "GLOBAL",
uuid: String = UUID().uuidString,
override: Bool = false,
tags: Set<String> = Set<String>(),
delay: TimeInterval? = nil,
deadline: Date? = nil,
requireNetwork: NetworkType = NetworkType.any,
isPersisted: Bool = false,
params: [String: Any] = [:],
createTime: Date = Date(),
interval: TimeInterval = -1.0,
maxRun: Limit = .limited(0),
retries: Limit = .limited(0),
runCount: Double = 0,
requireCharging: Bool = false) {
queueName: String,
uuid: String,
override: Bool,
includeExecutingJob: Bool,
tags: Set<String>,
delay: TimeInterval?,
deadline: Date?,
requireNetwork: NetworkType,
isPersisted: Bool,
params: [String: Any],
createTime: Date,
interval: TimeInterval,
maxRun: Limit,
retries: Limit,
runCount: Double,
requireCharging: Bool) {

self.type = type
self.queueName = queueName
self.uuid = uuid
self.override = override
self.includeExecutingJob = includeExecutingJob
self.tags = tags
self.delay = delay
self.deadline = deadline
Expand Down
4 changes: 4 additions & 0 deletions Sources/SwiftQueue/JobInfoSerializer+Decodable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ extension JobInfo: Decodable {
case type = "type"
case uuid = "uuid"
case override = "override"
case includeExecutingJob = "includeExecutingJob"
case queueName = "group"
case tags = "tags"
case delay = "delay"
Expand All @@ -79,6 +80,7 @@ extension JobInfo: Decodable {
let type: String = try container.decode(String.self, forKey: .type)
let uuid: String = try container.decode(String.self, forKey: .uuid)
let override: Bool = try container.decode(Bool.self, forKey: .override)
let includeExecutingJob: Bool = try container.decode(Bool.self, forKey: .includeExecutingJob)
let queueName: String = try container.decode(String.self, forKey: .queueName)
let tags: Set<String> = try container.decode(Set.self, forKey: .tags)
let delay: TimeInterval? = try container.decodeIfPresent(TimeInterval.self, forKey: .delay)
Expand All @@ -98,6 +100,7 @@ extension JobInfo: Decodable {
queueName: queueName,
uuid: uuid,
override: override,
includeExecutingJob: includeExecutingJob,
tags: tags,
delay: delay,
deadline: deadline,
Expand All @@ -120,6 +123,7 @@ extension JobInfo: Encodable {
try container.encode(type, forKey: .type)
try container.encode(uuid, forKey: .uuid)
try container.encode(override, forKey: .override)
try container.encode(includeExecutingJob, forKey: .includeExecutingJob)
try container.encode(queueName, forKey: .queueName)
try container.encode(tags, forKey: .tags)
try container.encode(delay, forKey: .delay)
Expand Down
34 changes: 18 additions & 16 deletions Sources/SwiftQueue/JobInfoSerializer+V1.swift
Original file line number Diff line number Diff line change
Expand Up @@ -73,28 +73,30 @@ internal extension JobInfo {

func toDictionary() -> [String: Any] {
var dict = [String: Any]()
dict[JobInfoKeys.type.stringValue] = self.type
dict[JobInfoKeys.uuid.stringValue] = self.uuid
dict[JobInfoKeys.override.stringValue] = self.override
dict[JobInfoKeys.queueName.stringValue] = self.queueName
dict[JobInfoKeys.tags.stringValue] = Array(self.tags)
dict[JobInfoKeys.delay.stringValue] = self.delay
dict[JobInfoKeys.deadline.stringValue] = self.deadline.map(dateFormatter.string)
dict[JobInfoKeys.requireNetwork.stringValue] = self.requireNetwork.rawValue
dict[JobInfoKeys.isPersisted.stringValue] = self.isPersisted
dict[JobInfoKeys.params.stringValue] = self.params
dict[JobInfoKeys.createTime.stringValue] = dateFormatter.string(from: self.createTime)
dict[JobInfoKeys.runCount.stringValue] = self.runCount
dict[JobInfoKeys.maxRun.stringValue] = self.maxRun.rawValue
dict[JobInfoKeys.retries.stringValue] = self.retries.rawValue
dict[JobInfoKeys.interval.stringValue] = self.interval
dict[JobInfoKeys.requireCharging.stringValue] = self.requireCharging
dict[JobInfoKeys.type.stringValue] = self.type
dict[JobInfoKeys.uuid.stringValue] = self.uuid
dict[JobInfoKeys.override.stringValue] = self.override
dict[JobInfoKeys.includeExecutingJob.stringValue] = self.includeExecutingJob
dict[JobInfoKeys.queueName.stringValue] = self.queueName
dict[JobInfoKeys.tags.stringValue] = Array(self.tags)
dict[JobInfoKeys.delay.stringValue] = self.delay
dict[JobInfoKeys.deadline.stringValue] = self.deadline.map(dateFormatter.string)
dict[JobInfoKeys.requireNetwork.stringValue] = self.requireNetwork.rawValue
dict[JobInfoKeys.isPersisted.stringValue] = self.isPersisted
dict[JobInfoKeys.params.stringValue] = self.params
dict[JobInfoKeys.createTime.stringValue] = dateFormatter.string(from: self.createTime)
dict[JobInfoKeys.runCount.stringValue] = self.runCount
dict[JobInfoKeys.maxRun.stringValue] = self.maxRun.rawValue
dict[JobInfoKeys.retries.stringValue] = self.retries.rawValue
dict[JobInfoKeys.interval.stringValue] = self.interval
dict[JobInfoKeys.requireCharging.stringValue] = self.requireCharging
return dict
}

mutating func bind(dictionary: [String: Any]) throws {
dictionary.assign(JobInfoKeys.uuid.stringValue, &self.uuid)
dictionary.assign(JobInfoKeys.override.stringValue, &self.override)
dictionary.assign(JobInfoKeys.includeExecutingJob.stringValue, &self.includeExecutingJob)
dictionary.assign(JobInfoKeys.queueName.stringValue, &self.queueName)
dictionary.assign(JobInfoKeys.tags.stringValue, &self.tags) { (array: [String]) -> Set<String> in Set(array) }
dictionary.assign(JobInfoKeys.delay.stringValue, &self.delay)
Expand Down
2 changes: 0 additions & 2 deletions Sources/SwiftQueue/SwiftQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public protocol QueueCreator {

}


/// Method to implement to have a custom persister
public protocol JobPersister {

Expand Down Expand Up @@ -148,7 +147,6 @@ extension BasicQueue: Queue {

}


/// Listen from job status
public protocol JobListener {

Expand Down
25 changes: 25 additions & 0 deletions Tests/SwiftQueueTests/ConstraintUniqueUUIDTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,29 @@ class ConstraintUniqueUUIDTests: XCTestCase {
manager.waitUntilAllOperationsAreFinished()
}

func testUniqueIdConstraintShouldNotCancelRunningJob() {
let (type1, job1) = (UUID().uuidString, TestJob())
let (type2, job2) = (UUID().uuidString, TestJob())
let (type3, job3) = (UUID().uuidString, TestJob())

let id = UUID().uuidString

let creator = TestCreator([type1: job1, type2: job2, type3: job3])

let manager = SwiftQueueManagerBuilder(creator: creator).set(persister: NoSerializer.shared).build()
JobBuilder(type: type1)
.singleInstance(forId: id)
.delay(time: 3600)
.schedule(manager: manager)

JobBuilder(type: type2).singleInstance(forId: id, includeExecutingJob: false).schedule(manager: manager)
JobBuilder(type: type3).singleInstance(forId: id, includeExecutingJob: false).schedule(manager: manager)

job3.awaitForRemoval()
job3.assertRemovedBeforeRun(reason: .duplicate)

manager.cancelAllOperations()
manager.waitUntilAllOperationsAreFinished()
}

}
1 change: 0 additions & 1 deletion Tests/SwiftQueueTests/SwiftQueueManagerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ class SwiftQueueManagerTests: XCTestCase {
job.assertSingleCompletion()
}


func testJobListener() {
let (type, job) = (UUID().uuidString, TestJob())

Expand Down