Skip to content

Fix workflow presenter to run things from threads #514

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions src/snapred/ui/presenter/WorkflowPresenter.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Callable, List
from typing import Any, Callable, List, Tuple

from qtpy.QtCore import QObject, Signal, Slot
from qtpy.QtWidgets import QMainWindow, QMessageBox
Expand Down Expand Up @@ -164,10 +164,26 @@ def verifyAndContinue():
return model.continueAction(self)

# do action
self.worker = self.worker_pool.createWorker(target=verifyAndContinue, args=None)
continueOnSuccess = lambda success: self.advanceWorkflow() if success else None # noqa E731
self.handleAction(verifyAndContinue, None, continueOnSuccess)

def handleAction(
self,
action: Callable[[Any], Any],
args: Tuple[Any, ...] | Any | None,
onSuccess: Callable[[None], None],
):
"""
Send front-end task to a separate worker to complete.
@param action : a Callable to be called on worker
@param args : the argument action is to be called with
@param onSuccess : another Callable, called on completion, must take no parameters
"""
# do action
self.worker = self.worker_pool.createWorker(target=action, args=args)
self.worker.finished.connect(lambda: self._enableButtons(True)) # re-enable panel buttons on finish
self.worker.result.connect(self._handleComplications)
self.worker.success.connect(lambda success: self.advanceWorkflow() if success else None)
self.worker.success.connect(onSuccess)
self.worker_pool.submitWorker(self.worker)
self.actionCompleted.emit()

Expand Down
4 changes: 3 additions & 1 deletion src/snapred/ui/threading/worker_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ def __init__(self, target, args=None):
def run(self):
"""Long-running task."""
try:
if self.args is not None:
if isinstance(self.args, tuple):
results = self.target(*self.args)
elif self.args is not None:
results = self.target(self.args)
else:
results = self.target()
Expand Down
257 changes: 132 additions & 125 deletions src/snapred/ui/workflow/DiffCalWorkflow.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
from mantid.simpleapi import mtd
from qtpy.QtCore import Slot
from qtpy.QtWidgets import (
QMessageBox,
)

from snapred.backend.dao import RunConfig
from snapred.backend.dao.indexing.IndexEntry import IndexEntry
Expand All @@ -21,7 +18,7 @@
HasStateRequest,
SimpleDiffCalRequest,
)
from snapred.backend.dao.SNAPResponse import SNAPResponse
from snapred.backend.dao.SNAPResponse import ResponseCode, SNAPResponse
from snapred.backend.error.ContinueWarning import ContinueWarning
from snapred.backend.log.logger import snapredLogger
from snapred.backend.recipe.algorithm.FitMultiplePeaksAlgorithm import FitOutputEnum
Expand Down Expand Up @@ -136,41 +133,45 @@ def _continueAnywayHandlerTweak(self, continueInfo: ContinueWarning.Model): # n
self._continueAnywayHandler(continueInfo)
self._tweakPeakView.updateContinueAnyway(True)

def __setInteraction(self, state: bool):
self._requestView.litemodeToggle.setEnabled(state)
self._requestView.groupingFileDropdown.setEnabled(state)

@ExceptionToErrLog
@Slot()
def _populateGroupingDropdown(self):
# when the run number is updated, freeze the drop down to populate it
runNumber = self._requestView.runNumberField.text()
useLiteMode = self._requestView.litemodeToggle.field.getState()

self._requestView.groupingFileDropdown.setEnabled(False)
self._requestView.litemodeToggle.setEnabled(False)
# TODO: Use threads, account for fail cases
try:
# check if the state exists -- if so load its grouping map
payload = HasStateRequest(
runId=runNumber,
useLiteMode=useLiteMode,
)
hasState = self.request(path="calibration/hasState", payload=payload.json()).data
if hasState:
self.groupingMap = self.request(path="config/groupingMap", payload=runNumber).data
else:
self.groupingMap = self.defaultGroupingMap
self.focusGroups = self.groupingMap.getMap(useLiteMode)
self.__setInteraction(False)
self.workflow.presenter.handleAction(
self.handleDropdown,
args=(runNumber, useLiteMode),
onSuccess=lambda: self.__setInteraction(True),
)

# populate and re-enable the drop down
self._requestView.populateGroupingDropdown(list(self.focusGroups.keys()))
except Exception as e: # noqa BLE001
print(e)
def handleDropdown(self, runNumber, useLiteMode):
# check if the state exists -- if so load its grouping map
payload = HasStateRequest(
runId=runNumber,
useLiteMode=useLiteMode,
)
hasState = self.request(path="calibration/hasState", payload=payload.json()).data
if hasState:
self.groupingMap = self.request(path="config/groupingMap", payload=runNumber).data
else:
self.groupingMap = self.defaultGroupingMap
self.focusGroups = self.groupingMap.getMap(useLiteMode)

self._requestView.groupingFileDropdown.setEnabled(True)
self._requestView.litemodeToggle.setEnabled(True)
# populate and re-enable the drop down
self._requestView.populateGroupingDropdown(list(self.focusGroups.keys()))
return SNAPResponse(code=ResponseCode.OK)

@ExceptionToErrLog
@Slot()
def _switchLiteNativeGroups(self):
# when the run number is updated, freeze the drop down to populate it
# determine resolution mode
useLiteMode = self._requestView.litemodeToggle.field.getState()
self._tweakPeakView.litemodeToggle.field.setState(useLiteMode)

Expand All @@ -180,14 +181,17 @@ def _switchLiteNativeGroups(self):
self._requestView.skipPixelCalToggle.field.setState(not useLiteMode)

self._requestView.groupingFileDropdown.setEnabled(False)
# TODO: Use threads, account for fail cases
try:
self.focusGroups = self.groupingMap.getMap(useLiteMode)
self._requestView.populateGroupingDropdown(list(self.focusGroups.keys()))
except Exception as e: # noqa BLE001
print(e)

self._requestView.groupingFileDropdown.setEnabled(True)
self.workflow.presenter.handleAction(
self.handleSwitchLiteNative,
args=(useLiteMode,),
onSuccess=lambda: self._requestView.groupingFileDropdown.setEnabled(True),
)

def handleSwitchLiteNative(self, useLiteMode):
self.focusGroups = self.groupingMap.getMap(useLiteMode)
self._requestView.populateGroupingDropdown(list(self.focusGroups.keys()))
return SNAPResponse(code=ResponseCode.OK)

@Slot(WorkflowPresenter, result=SNAPResponse)
def _specifyRun(self, workflowPresenter):
Expand Down Expand Up @@ -260,59 +264,61 @@ def _specifyRun(self, workflowPresenter):

@ExceptionToErrLog
@Slot(int, float, float, SymmetricPeakEnum, Pair, float)
def onValueChange(self, groupingIndex, xtalDMin, xtalDMax, peakFunction, fwhm, maxChiSq):
def onValueChange(self, *args):
self._tweakPeakView.disableRecalculateButton()
# TODO: This is a temporary solution,
# this should have never been setup to all run on the same thread.
# It assumed an exception would never be tossed and thus
# would never enable the recalc button again if one did
try:
self.focusGroupPath = list(self.focusGroups.items())[groupingIndex][0]

# if peaks will change, redo only the smoothing
if (
xtalDMin != self.prevXtalDMin
or xtalDMax != self.prevXtalDMax
or peakFunction != self.peakFunction
or fwhm != self.prevFWHM
or maxChiSq != self.maxChiSq
or self.peaksWerePurged
):
self._renewIngredients(xtalDMin, xtalDMax, peakFunction, fwhm, maxChiSq)
self._renewFitPeaks(peakFunction)
self._calculateResidual()
self.peaksWerePurged = False

# if the grouping file changes, load new grouping and refocus
if groupingIndex != self.prevGroupingIndex:
self._renewIngredients(xtalDMin, xtalDMax, peakFunction, fwhm, maxChiSq)
self._renewFocus(groupingIndex)
self._renewFitPeaks(peakFunction)
self._calculateResidual()

# NOTE it was determined pixel calibration NOT need to be re-calculated when peak params change.
# However, if this requirement changes, the if at L282 should be combined with the if at 269,
# and the order should be _renewIngredients --> _renewPixelCal --> _renewFocus --> _renewFitPeaks

self._tweakPeakView.updateGraphs(
self.focusedWorkspace,
self.ingredients.groupedPeakLists,
self.fitPeaksDiagnostic,
self.residualWorkspace,
)
self.workflow.presenter.handleAction(
self.renewWhenRecalculate,
args=args,
onSuccess=self._tweakPeakView.enableRecalculateButton,
)

def renewWhenRecalculate(self, groupingIndex, xtalDMin, xtalDMax, peakFunction, fwhm, maxChiSq):
self._tweakPeakView.disableRecalculateButton()

self.focusGroupPath = list(self.focusGroups.items())[groupingIndex][0]

# if peaks will change, redo only the smoothing
if (
xtalDMin != self.prevXtalDMin
or xtalDMax != self.prevXtalDMax
or peakFunction != self.peakFunction
or fwhm != self.prevFWHM
or maxChiSq != self.maxChiSq
or self.peaksWerePurged
):
self._renewIngredients(xtalDMin, xtalDMax, peakFunction, fwhm, maxChiSq)
self._renewFitPeaks(peakFunction)
self._calculateResidual()

# if the grouping file changes, load new grouping and refocus
if groupingIndex != self.prevGroupingIndex:
self._renewIngredients(xtalDMin, xtalDMax, peakFunction, fwhm, maxChiSq)
self._renewFocus(groupingIndex)
self._renewFitPeaks(peakFunction)
self._calculateResidual()

self.peaksWerePurged = False

# update the values for next call to this method
self.prevXtalDMin = xtalDMin
self.prevXtalDMax = xtalDMax
self.prevFWHM = fwhm
self.peakFunction = peakFunction
self.prevGroupingIndex = groupingIndex
self.maxChiSq = maxChiSq
except Exception as e: # noqa BLE001
print(e)
# NOTE it was determined pixel calibration NOT need to be re-calculated when peak params change.
# However, if this requirement changes, the if at L282 should be combined with the if at 269,
# and the order should be _renewIngredients --> _renewPixelCal --> _renewFocus --> _renewFitPeaks

# renable button when graph is updated
self._tweakPeakView.enableRecalculateButton()
self._tweakPeakView.updateGraphs(
self.focusedWorkspace,
self.ingredients.groupedPeakLists,
self.fitPeaksDiagnostic,
self.residualWorkspace,
)

# update the values for next call to this method
self.prevXtalDMin = xtalDMin
self.prevXtalDMax = xtalDMax
self.prevFWHM = fwhm
self.peakFunction = peakFunction
self.prevGroupingIndex = groupingIndex
self.maxChiSq = maxChiSq

return SNAPResponse(code=ResponseCode.OK)

def _createDiffCalRequest(self, xtalDMin, xtalDMax, peakFunction, fwhm, maxChiSq) -> DiffractionCalibrationRequest:
"""
Expand Down Expand Up @@ -389,49 +395,50 @@ def _calculateResidual(self):
@Slot(float)
def purgeBadPeaks(self, maxChiSq):
self._tweakPeakView.disableRecalculateButton()
try:
# update the max chi sq
self.maxChiSq = maxChiSq
allPeaks = self.ingredients.groupedPeakLists
param_table = mtd[self.fitPeaksDiagnostic].getItem(FitOutputEnum.Parameters.value).toDict()
index = param_table["wsindex"]
allChi2 = param_table["chi2"]
goodPeaks = []
for wkspIndex, groupPeaks in enumerate(allPeaks):
peaks = groupPeaks.peaks
# collect the fit chi-sq parameters for this spectrum, and the fits
chi2 = [x2 for i, x2 in zip(index, allChi2) if i == wkspIndex]
goodPeaks.append([peak for x2, peak in zip(chi2, peaks) if x2 < maxChiSq])
too_fews = [goodPeak for goodPeak in goodPeaks if len(goodPeak) < 2]
if too_fews != []:
QMessageBox.critical(
self._tweakPeakView,
"Too Few Peaks",
"Purging would result in fewer than the required 2 peaks for calibration. "
+ "The current set of peaks will be retained.",
QMessageBox.Ok,
)
else:
for wkspIndex, groupPeaks in enumerate(allPeaks):
groupPeaks.peaks = goodPeaks[wkspIndex]
self.peaksWerePurged = True

# renew the fits to the peaks
self._renewFitPeaks(self.peakFunction)

# update graph with reduced peak list
self._tweakPeakView.updateGraphs(
self.focusedWorkspace,
self.ingredients.groupedPeakLists,
self.fitPeaksDiagnostic,
)
self.workflow.presenter.handleAction(
self._purgeBadPeaks,
args=(maxChiSq,),
onSuccess=self._tweakPeakView.enableRecalculateButton,
)

except Exception as e: # noqa BLE001
# NOTE this has the same issue as onValueChange
print(e)
def _purgeBadPeaks(self, maxChiSq):
# update the max chi sq
self.maxChiSq = maxChiSq
allPeaks = self.ingredients.groupedPeakLists
param_table = mtd[self.fitPeaksDiagnostic].getItem(FitOutputEnum.Parameters.value).toDict()
index = param_table["wsindex"]
allChi2 = param_table["chi2"]
goodPeaks = []
for wkspIndex, groupPeaks in enumerate(allPeaks):
peaks = groupPeaks.peaks
# collect the fit chi-sq parameters for this spectrum, and the fits
chi2 = [x2 for i, x2 in zip(index, allChi2) if i == wkspIndex]
goodPeaks.append([peak for x2, peak in zip(chi2, peaks) if x2 < maxChiSq])
too_fews = [goodPeak for goodPeak in goodPeaks if len(goodPeak) < 2]
if too_fews != []:
msg = """
Too Few Peaks
Purging would result in fewer than the required 2 peaks for calibration.
The current set of peaks will be retained.
"""
raise RuntimeError(msg)
else:
for wkspIndex, originalGroupPeaks in enumerate(allPeaks):
originalGroupPeaks.peaks = goodPeaks[wkspIndex]
self.ingredients.groupedPeakLists = allPeaks
self.peaksWerePurged = True

# renew the fits to the peaks
self._renewFitPeaks(self.peakFunction)

# renable button when graph is updated
self._tweakPeakView.enableRecalculateButton()
# update graph with reduced peak list
self._tweakPeakView.updateGraphs(
self.focusedWorkspace,
self.ingredients.groupedPeakLists,
self.fitPeaksDiagnostic,
self.residualWorkspace,
)
return SNAPResponse(code=ResponseCode.OK)

@Slot(WorkflowPresenter, result=SNAPResponse)
def _triggerDiffractionCalibration(self, workflowPresenter):
Expand Down Expand Up @@ -468,7 +475,7 @@ def _triggerDiffractionCalibration(self, workflowPresenter):
maxChiSq=self.maxChiSq,
)

response = self.request(path="calibration/assessment", payload=payload.json())
response = self.request(path="calibration/assessment", payload=payload)
assessmentResponse = response.data
self.calibrationRecord = assessmentResponse.record

Expand Down
Loading
Loading