Skip to content

Async jobs add endtime #15

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,7 @@ public class ApiConstants {
public static final String IPSEC_PSK = "ipsecpsk";
public static final String GUEST_IP = "guestip";
public static final String REMOVED = "removed";
public static final String END_TIME = "endtime";
public static final String IKE_POLICY = "ikepolicy";
public static final String ESP_POLICY = "esppolicy";
public static final String IKE_LIFETIME = "ikelifetime";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ public class AsyncJobResponse extends BaseResponse {
@Param(description = " the created date of the job")
private Date created;

@SerializedName(ApiConstants.END_TIME)
@Param(description = " the removed date of the job")
private Date removed;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider - finished, completed?


public void setAccountId(String accountId) {
this.accountId = accountId;
}
Expand Down Expand Up @@ -119,4 +123,8 @@ public void setJobInstanceId(String jobInstanceId) {
public void setCreated(Date created) {
this.created = created;
}

public void setRemoved(final Date removed) {
this.removed = removed;
}
}
2 changes: 2 additions & 0 deletions api/src/main/java/org/apache/cloudstack/jobs/JobInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public boolean done() {

Date getCreated();

Date getRemoved();

Date getLastUpdated();

Date getLastPolled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Date;
import java.util.List;

import org.apache.cloudstack.api.ApiConstants;
import org.apache.log4j.Logger;

import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
Expand Down Expand Up @@ -71,7 +72,7 @@ public AsyncJobDaoImpl() {
expiringUnfinishedAsyncJobSearch.done();

expiringCompletedAsyncJobSearch = createSearchBuilder();
expiringCompletedAsyncJobSearch.and("created", expiringCompletedAsyncJobSearch.entity().getCreated(), SearchCriteria.Op.LTEQ);
expiringCompletedAsyncJobSearch.and(ApiConstants.REMOVED, expiringCompletedAsyncJobSearch.entity().getRemoved(), SearchCriteria.Op.LTEQ);
expiringCompletedAsyncJobSearch.and("completeMsId", expiringCompletedAsyncJobSearch.entity().getCompleteMsid(), SearchCriteria.Op.NNULL);
expiringCompletedAsyncJobSearch.and("jobStatus", expiringCompletedAsyncJobSearch.entity().getStatus(), SearchCriteria.Op.NEQ);
expiringCompletedAsyncJobSearch.done();
Expand Down Expand Up @@ -168,11 +169,11 @@ public List<AsyncJobVO> getExpiredUnfinishedJobs(Date cutTime, int limit) {
}

@Override
public List<AsyncJobVO> getExpiredCompletedJobs(Date cutTime, int limit) {
SearchCriteria<AsyncJobVO> sc = expiringCompletedAsyncJobSearch.create();
sc.setParameters("created", cutTime);
public List<AsyncJobVO> getExpiredCompletedJobs(final Date cutTime, final int limit) {
final SearchCriteria<AsyncJobVO> sc = expiringCompletedAsyncJobSearch.create();
sc.setParameters(ApiConstants.REMOVED, cutTime);
sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
Filter filter = new Filter(AsyncJobVO.class, "created", true, 0L, (long)limit);
final Filter filter = new Filter(AsyncJobVO.class, ApiConstants.REMOVED, true, 0L, (long)limit);
return listIncludingRemovedBy(sc, filter);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,9 @@ public void completeAsyncJob(final long jobId, final Status jobStatus, final int
if (s_logger.isDebugEnabled()) {
s_logger.debug("Wake up jobs related to job-" + jobId);
}
List<Long> wakeupList = Transaction.execute(new TransactionCallback<List<Long>>() {
final List<Long> wakeupList = Transaction.execute(new TransactionCallback<List<Long>>() {
@Override
public List<Long> doInTransaction(TransactionStatus status) {
public List<Long> doInTransaction(final TransactionStatus status) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Update db status for job-" + jobId);
}
Expand All @@ -302,14 +302,16 @@ public List<Long> doInTransaction(TransactionStatus status) {
job.setResult(null);
}

job.setLastUpdated(DateUtil.currentGMTTime());
final Date currentGMTTime = DateUtil.currentGMTTime();
job.setLastUpdated(currentGMTTime);
job.setRemoved(currentGMTTime);
job.setExecutingMsid(null);
_jobDao.update(jobId, job);

if (s_logger.isDebugEnabled()) {
s_logger.debug("Wake up jobs joined with job-" + jobId + " and disjoin all subjobs created from job- " + jobId);
}
List<Long> wakeupList = wakeupByJoinedJobCompletion(jobId);
final List<Long> wakeupList = wakeupByJoinedJobCompletion(jobId);
_joinMapDao.disjoinAllJobs(jobId);

// purge the job sync item from queue
Expand Down Expand Up @@ -445,8 +447,8 @@ public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObj
}

@Override
public AsyncJob queryJob(long jobId, boolean updatePollTime) {
AsyncJobVO job = _jobDao.findById(jobId);
public AsyncJob queryJob(final long jobId, final boolean updatePollTime) {
final AsyncJobVO job = _jobDao.findByIdIncludingRemoved(jobId);

if (updatePollTime) {
job.setLastPolled(DateUtil.currentGMTTime());
Expand Down Expand Up @@ -1025,15 +1027,16 @@ public void doInTransactionWithoutResult(TransactionStatus status) {
// purge sync queue item running on this ms node
_queueMgr.cleanupActiveQueueItems(msid, true);
// reset job status for all jobs running on this ms node
List<AsyncJobVO> jobs = _jobDao.getResetJobs(msid);
for (AsyncJobVO job : jobs) {
final List<AsyncJobVO> jobs = _jobDao.getResetJobs(msid);
for (final AsyncJobVO job : jobs) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Cancel left-over job-" + job.getId());
}
job.setStatus(JobInfo.Status.FAILED);
job.setResultCode(ApiErrorCode.INTERNAL_ERROR.getHttpCode());
job.setResult("job cancelled because of management server restart or shutdown");
job.setCompleteMsid(msid);
job.setRemoved(DateUtil.currentGMTTime());
_jobDao.update(job.getId(), job);
if (s_logger.isDebugEnabled()) {
s_logger.debug("Purge queue item for cancelled job-" + job.getId());
Expand All @@ -1049,8 +1052,8 @@ public void doInTransactionWithoutResult(TransactionStatus status) {
}
}
}
List<SnapshotDetailsVO> snapshotList = _snapshotDetailsDao.findDetails(AsyncJob.Constants.MS_ID, Long.toString(msid), false);
for (SnapshotDetailsVO snapshotDetailsVO : snapshotList) {
final List<SnapshotDetailsVO> snapshotList = _snapshotDetailsDao.findDetails(AsyncJob.Constants.MS_ID, Long.toString(msid), false);
for (final SnapshotDetailsVO snapshotDetailsVO : snapshotList) {
SnapshotInfo snapshot = snapshotFactory.getSnapshot(snapshotDetailsVO.getResourceId(), DataStoreRole.Primary);
snapshotSrv.processEventOnSnapshotObject(snapshot, Snapshot.Event.OperationFailed);
_snapshotDetailsDao.removeDetail(snapshotDetailsVO.getResourceId(), AsyncJob.Constants.MS_ID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,15 @@ public void setUuid(String uuid) {
this.uuid = uuid;
}

@Override
public Date getRemoved() {
return removed;
}

public void setRemoved(final Date removed) {
this.removed = removed;
}

@Override
public String toString() {
StringBuffer sb = new StringBuffer();
Expand All @@ -392,6 +401,7 @@ public String toString() {
sb.append(", lastUpdated: ").append(getLastUpdated());
sb.append(", lastPolled: ").append(getLastPolled());
sb.append(", created: ").append(getCreated());
sb.append(", removed: ").append(getRemoved());
sb.append("}");
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.framework.jobs;

import com.cloud.storage.dao.SnapshotDetailsDao;
import com.cloud.storage.dao.VolumeDao;
import com.cloud.storage.dao.VolumeDetailsDao;
import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotDataFactory;
import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotService;
import org.apache.cloudstack.framework.jobs.dao.AsyncJobDao;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobManagerImpl;
import org.apache.cloudstack.framework.jobs.impl.SyncQueueManager;
import org.apache.log4j.Logger;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.powermock.modules.junit4.PowerMockRunner;

import static org.junit.Assert.assertFalse;

@RunWith(PowerMockRunner.class)
public class AsyncJobManagerImplTest {

@InjectMocks
private AsyncJobManagerImpl asyncJobManager;

@Mock
private SyncQueueManager _queueMgr;

@Mock
private AsyncJobDao _jobDao;

@Mock
private VolumeDetailsDao _volumeDetailsDao;

@Mock
private VolumeDao _volsDao;

@Mock
private SnapshotDetailsDao _snapshotDetailsDao;

@Mock
private SnapshotDataFactory snapshotFactory;

@Mock
private SnapshotService snapshotSrv;

@Mock
private Logger s_logger;

// @Before
// public void setup() {
// asyncJobManager = spy(AsyncJobManagerImpl.class);
// }

@Test
public void testcleanupLeftOverJobs() throws Exception {
// Whitebox.invokeMethod(asyncJobManager, "cleanupLeftOverJobs", 1L);
// verify(_queueMgr).cleanupActiveQueueItems(1L, true);
// boolean result = Whitebox.invokeMethod(asyncJobManager, "stop");
assertFalse(false);
}
}
10 changes: 5 additions & 5 deletions server/src/main/java/com/cloud/api/ApiResponseHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -1808,16 +1808,16 @@ public TemplatePermissionsResponse createTemplatePermissionsResponse(ResponseVie
}

@Override
public AsyncJobResponse queryJobResult(QueryAsyncJobResultCmd cmd) {
Account caller = CallContext.current().getCallingAccount();
public AsyncJobResponse queryJobResult(final QueryAsyncJobResultCmd cmd) {
final Account caller = CallContext.current().getCallingAccount();

AsyncJob job = _entityMgr.findById(AsyncJob.class, cmd.getId());
final AsyncJob job = _entityMgr.findByIdIncludingRemoved(AsyncJob.class, cmd.getId());
if (job == null) {
throw new InvalidParameterValueException("Unable to find a job by id " + cmd.getId());
}

User userJobOwner = _accountMgr.getUserIncludingRemoved(job.getUserId());
Account jobOwner = _accountMgr.getAccount(userJobOwner.getAccountId());
final User userJobOwner = _accountMgr.getUserIncludingRemoved(job.getUserId());
final Account jobOwner = _accountMgr.getAccount(userJobOwner.getAccountId());

//check permissions
if (_accountMgr.isNormalUser(caller.getId())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,13 @@ protected AsyncJobJoinDaoImpl() {
}

@Override
public AsyncJobResponse newAsyncJobResponse(AsyncJobJoinVO job) {
AsyncJobResponse jobResponse = new AsyncJobResponse();
public AsyncJobResponse newAsyncJobResponse(final AsyncJobJoinVO job) {
final AsyncJobResponse jobResponse = new AsyncJobResponse();
jobResponse.setAccountId(job.getAccountUuid());
jobResponse.setUserId(job.getUserUuid());
jobResponse.setCmd(job.getCmd());
jobResponse.setCreated(job.getCreated());
jobResponse.setRemoved(job.getRemoved());
jobResponse.setJobId(job.getUuid());
jobResponse.setJobStatus(job.getStatus());
jobResponse.setJobProcStatus(job.getProcessStatus());
Expand All @@ -68,15 +69,15 @@ public AsyncJobResponse newAsyncJobResponse(AsyncJobJoinVO job) {
}
jobResponse.setJobResultCode(job.getResultCode());

boolean savedValue = SerializationContext.current().getUuidTranslation();
final boolean savedValue = SerializationContext.current().getUuidTranslation();
SerializationContext.current().setUuidTranslation(false);

Object resultObject = ApiSerializerHelper.fromSerializedString(job.getResult());
final Object resultObject = ApiSerializerHelper.fromSerializedString(job.getResult());
jobResponse.setJobResult((ResponseObject)resultObject);
SerializationContext.current().setUuidTranslation(savedValue);

if (resultObject != null) {
Class<?> clz = resultObject.getClass();
final Class<?> clz = resultObject.getClass();
if (clz.isPrimitive() || clz.getSuperclass() == Number.class || clz == String.class || clz == Date.class) {
jobResponse.setJobResultType("text");
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.cloud.storage.dao;

import com.cloud.api.query.dao.AsyncJobJoinDaoImpl;
import com.cloud.api.query.vo.AsyncJobJoinVO;
import org.apache.cloudstack.api.ApiCommandJobType;
import org.apache.cloudstack.api.response.AsyncJobResponse;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.runners.MockitoJUnitRunner;
import org.springframework.test.util.ReflectionTestUtils;

import java.util.Date;

@RunWith(MockitoJUnitRunner.class)
public class AsyncJobJoinDaoTest {

@InjectMocks
AsyncJobJoinDaoImpl dao;

@Test
public void testNewAsyncJobResponseValidValues() {
final AsyncJobJoinVO job = new AsyncJobJoinVO();
ReflectionTestUtils.setField(job,"uuid","a2b22932-1b61-4406-8e89-4ae19968e8d3");
ReflectionTestUtils.setField(job,"accountUuid","4dea2836-72cc-11e8-b2de-107b4429825a");
ReflectionTestUtils.setField(job,"domainUuid","4dea136b-72cc-11e8-b2de-107b4429825a");
ReflectionTestUtils.setField(job,"userUuid","4decc724-72cc-11e8-b2de-107b4429825a");
ReflectionTestUtils.setField(job,"cmd","org.apache.cloudstack.api.command.admin.vm.StartVMCmdByAdmin");
ReflectionTestUtils.setField(job,"status",0);
ReflectionTestUtils.setField(job,"resultCode",0);
ReflectionTestUtils.setField(job,"result",null);
ReflectionTestUtils.setField(job,"created",new Date());
ReflectionTestUtils.setField(job,"removed",new Date());
ReflectionTestUtils.setField(job,"instanceType",ApiCommandJobType.VirtualMachine);
ReflectionTestUtils.setField(job,"instanceId",3L);
final AsyncJobResponse response = dao.newAsyncJobResponse(job);
Assert.assertEquals(job.getUuid(),response.getJobId());
Assert.assertEquals(job.getAccountUuid(), ReflectionTestUtils.getField(response, "accountId"));
Assert.assertEquals(job.getUserUuid(), ReflectionTestUtils.getField(response, "userId"));
Assert.assertEquals(job.getCmd(), ReflectionTestUtils.getField(response, "cmd"));
Assert.assertEquals(job.getStatus(), ReflectionTestUtils.getField(response, "jobStatus"));
Assert.assertEquals(job.getStatus(), ReflectionTestUtils.getField(response, "jobProcStatus"));
Assert.assertEquals(job.getResultCode(), ReflectionTestUtils.getField(response, "jobResultCode"));
Assert.assertEquals(null, ReflectionTestUtils.getField(response, "jobResultType"));
Assert.assertEquals(job.getResult(), ReflectionTestUtils.getField(response, "jobResult"));
Assert.assertEquals(job.getInstanceType().toString(), ReflectionTestUtils.getField(response, "jobInstanceType"));
Assert.assertEquals(job.getInstanceUuid(), ReflectionTestUtils.getField(response, "jobInstanceId"));
Assert.assertEquals(job.getCreated(), ReflectionTestUtils.getField(response, "created"));
Assert.assertEquals(job.getRemoved(), ReflectionTestUtils.getField(response, "removed"));
}

@Test
public void testNewAsyncJobResponseNullValues() {
final AsyncJobJoinVO job = new AsyncJobJoinVO();
final AsyncJobResponse response = dao.newAsyncJobResponse(job);
Assert.assertEquals(job.getUuid(),response.getJobId());
Assert.assertEquals(job.getAccountUuid(), ReflectionTestUtils.getField(response, "accountId"));
Assert.assertEquals(job.getUserUuid(), ReflectionTestUtils.getField(response, "userId"));
Assert.assertEquals(job.getCmd(), ReflectionTestUtils.getField(response, "cmd"));
Assert.assertEquals(job.getStatus(), ReflectionTestUtils.getField(response, "jobStatus"));
Assert.assertEquals(job.getStatus(), ReflectionTestUtils.getField(response, "jobProcStatus"));
Assert.assertEquals(job.getResultCode(), ReflectionTestUtils.getField(response, "jobResultCode"));
Assert.assertEquals(null, ReflectionTestUtils.getField(response, "jobResultType"));
Assert.assertEquals(job.getResult(), ReflectionTestUtils.getField(response, "jobResult"));
Assert.assertEquals(job.getInstanceType(), ReflectionTestUtils.getField(response, "jobInstanceType"));
Assert.assertEquals(job.getInstanceUuid(), ReflectionTestUtils.getField(response, "jobInstanceId"));
Assert.assertEquals(job.getCreated(), ReflectionTestUtils.getField(response, "created"));
Assert.assertEquals(job.getRemoved(), ReflectionTestUtils.getField(response, "removed"));
}
}