Pre-create certain outputs for upload 2.0 API. #5609

Closed
wants to merge 13 commits into from
6 changes: 2 additions & 4 deletions lib/galaxy/actions/library.py
@@ -249,16 +249,14 @@ def _make_library_uploaded_dataset(self, trans, params, name, path, type, librar
uploaded_dataset.to_posix_lines = params.get('to_posix_lines', None)
uploaded_dataset.space_to_tab = params.get('space_to_tab', None)
uploaded_dataset.tag_using_filenames = params.get('tag_using_filenames', True)
uploaded_dataset.purge_source = getattr(trans.app.config, 'ftp_upload_purge', True)
if in_folder:
uploaded_dataset.in_folder = in_folder
uploaded_dataset.data = upload_common.new_upload(trans, 'api', uploaded_dataset, library_bunch)
uploaded_dataset.link_data_only = link_data_only
uploaded_dataset.uuid = uuid_str
if link_data_only == 'link_to_files':
uploaded_dataset.data.file_name = os.path.abspath(path)
# Since we are not copying the file into Galaxy's managed
# default file location, the dataset should never be purgable.
uploaded_dataset.data.dataset.purgable = False
uploaded_dataset.data.link_to(path)
trans.sa_session.add_all((uploaded_dataset.data, uploaded_dataset.data.dataset))
trans.sa_session.flush()
return uploaded_dataset
4 changes: 4 additions & 0 deletions lib/galaxy/app.py
@@ -12,6 +12,8 @@
from galaxy import config, jobs
from galaxy.jobs import metrics as job_metrics
from galaxy.managers.collections import DatasetCollectionManager
from galaxy.managers.folders import FolderManager
from galaxy.managers.libraries import LibraryManager
from galaxy.managers.tags import GalaxyTagManager
from galaxy.openid.providers import OpenIDProviders
from galaxy.queue_worker import GalaxyQueueWorker
@@ -90,6 +92,8 @@ def __init__(self, **kwargs):
self.tag_handler = GalaxyTagManager(self.model.context)
# Dataset Collection Plugins
self.dataset_collections_service = DatasetCollectionManager(self)
self.library_folder_manager = FolderManager()
self.library_manager = LibraryManager()

# Tool Data Tables
self._configure_tool_data_tables(from_shed_config=False)
7 changes: 7 additions & 0 deletions lib/galaxy/datatypes/sniff.py
@@ -14,6 +14,7 @@
import zipfile

from six import text_type
from six.moves.urllib.request import urlopen

from galaxy import util
from galaxy.util import compression_utils
@@ -39,6 +40,12 @@ def get_test_fname(fname):
return full_path


def stream_url_to_file(path):
page = urlopen(path) # page will be .close()ed in stream_to_file
temp_name = stream_to_file(page, prefix='url_paste', source_encoding=util.get_charset_from_http_headers(page.headers))
return temp_name


def stream_to_open_named_file(stream, fd, filename, source_encoding=None, source_error='strict', target_encoding=None, target_error='strict'):
"""Writes a stream to the provided file descriptor, returns the file name. Closes file descriptor"""
# signature and behavior are somewhat odd, due to backwards compatibility, but this can/should be done better
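
As a quick usage sketch of the new helper (the URL below is invented and Galaxy's lib/ is assumed to be on sys.path; this snippet is not part of the changeset), stream_url_to_file fetches a remote resource into a temp file and returns its path:

    # Hedged sketch: download a URL into a temp file via the new sniff helper.
    from galaxy.datatypes.sniff import stream_url_to_file

    temp_path = stream_url_to_file("https://example.org/datasets/sample.bed")
    print("downloaded to", temp_path)  # a tempfile created with the url_paste prefix
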
47 changes: 47 additions & 0 deletions lib/galaxy/datatypes/upload_util.py
@@ -0,0 +1,47 @@
from galaxy.datatypes import sniff
from galaxy.datatypes.binary import Binary


class UploadProblemException(Exception):

def __init__(self, message):
self.message = message


def handle_unsniffable_binary_check(data_type, ext, path, name, is_binary, requested_ext, check_content, registry):
"""Return modified values of data_type and ext if unsniffable binary encountered.

Raise UploadProblemException if content problems or extension mismatches occur.

Precondition: check_binary has already been called and returned True.
"""
if is_binary or registry.is_extension_unsniffable_binary(requested_ext):
# We have a binary dataset, but it is not Bam, Sff or Pdf
data_type = 'binary'
parts = name.split(".")
if len(parts) > 1:
ext = parts[-1].strip().lower()
is_ext_unsniffable_binary = registry.is_extension_unsniffable_binary(ext)
if check_content and not is_ext_unsniffable_binary:
raise UploadProblemException('The uploaded binary file contains inappropriate content')

elif is_ext_unsniffable_binary and requested_ext != ext:
err_msg = "You must manually set the 'File Format' to '%s' when uploading %s files." % (ext, ext)
raise UploadProblemException(err_msg)
return data_type, ext


def handle_sniffable_binary_check(data_type, ext, path, registry):
"""Return modified values of data_type and ext if sniffable binary encountered.

Precondition: check_binary has already been called and returned True.
"""
# Sniff the data type
guessed_ext = sniff.guess_ext(path, registry.sniff_order)
# Set data_type only if guessed_ext is a binary datatype
datatype = registry.get_datatype_by_extension(guessed_ext)
if isinstance(datatype, Binary):
data_type = guessed_ext
ext = guessed_ext

return data_type, ext
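
As a usage sketch only (the wrapper function and its call order are assumptions, not part of this changeset), the two helpers could be chained once an upfront binary check has flagged the upload as binary:

    # Hypothetical caller: `registry` is Galaxy's datatypes registry and a binary
    # check has already returned True for this upload.
    from galaxy.datatypes.upload_util import (
        handle_sniffable_binary_check,
        handle_unsniffable_binary_check,
    )

    def resolve_binary_upload(registry, path, name, requested_ext, check_content=True):
        data_type, ext = None, requested_ext
        # Let the sniffers try to recognize the binary first (e.g. bam, h5).
        data_type, ext = handle_sniffable_binary_check(data_type, ext, path, registry)
        if data_type is None:
            # Fall back to the suffix-based rules for unsniffable binaries; this may
            # raise UploadProblemException on content or extension problems.
            data_type, ext = handle_unsniffable_binary_check(
                data_type, ext, path, name,
                is_binary=True, requested_ext=requested_ext,
                check_content=check_content, registry=registry,
            )
        return data_type, ext
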
1 change: 1 addition & 0 deletions lib/galaxy/dependencies/pinned-requirements.txt
@@ -15,6 +15,7 @@ uWSGI==2.0.15
pysam==0.14

# pure Python packages
bdbag==1.1.1
bz2file==0.98; python_version < '3.3'
ipaddress==1.0.18; python_version < '3.3'
boltons==17.1.0
2 changes: 1 addition & 1 deletion lib/galaxy/jobs/__init__.py
@@ -1380,7 +1380,7 @@ def path_rewriter(path):
collected_datasets = {
'primary': self.tool.collect_primary_datasets(out_data, self.get_tool_provided_job_metadata(), tool_working_directory, input_ext, input_dbkey)
}
self.tool.collect_dynamic_collections(
self.tool.collect_dynamic_outputs(
out_collections,
self.get_tool_provided_job_metadata(),
job_working_directory=tool_working_directory,
19 changes: 12 additions & 7 deletions lib/galaxy/managers/collections.py
@@ -46,17 +46,22 @@ def __init__(self, app):
self.tag_manager = tags.GalaxyTagManager(app.model.context)
self.ldda_manager = lddas.LDDAManager(app)

def precreate_dataset_collection_instance(self, trans, parent, name, implicit_inputs, implicit_output_name, structure):
def precreate_dataset_collection_instance(self, trans, parent, name, structure, implicit_inputs=None, implicit_output_name=None):
# TODO: prebuild all required HIDs and send them in so no need to flush in between.
dataset_collection = self.precreate_dataset_collection(structure)
dataset_collection = self.precreate_dataset_collection(structure, allow_unitialized_element=implicit_output_name is not None)
instance = self._create_instance_for_collection(
trans, parent, name, dataset_collection, implicit_inputs=implicit_inputs, implicit_output_name=implicit_output_name, flush=False
)
return instance

def precreate_dataset_collection(self, structure):
if structure.is_leaf or not structure.children_known:
return model.DatasetCollectionElement.UNINITIALIZED_ELEMENT
def precreate_dataset_collection(self, structure, allow_unitialized_element=True):
has_structure = not structure.is_leaf and structure.children_known
if not has_structure and allow_unitialized_element:
dataset_collection = model.DatasetCollectionElement.UNINITIALIZED_ELEMENT
elif not has_structure:
collection_type_description = structure.collection_type_description
dataset_collection = model.DatasetCollection(populated=False)
dataset_collection.collection_type = collection_type_description.collection_type
else:
collection_type_description = structure.collection_type_description
dataset_collection = model.DatasetCollection(populated=False)
@@ -67,7 +72,7 @@ def precreate_dataset_collection(self, structure):
if substructure.is_leaf:
element = model.DatasetCollectionElement.UNINITIALIZED_ELEMENT
else:
element = self.precreate_dataset_collection(substructure)
element = self.precreate_dataset_collection(substructure, allow_unitialized_element=allow_unitialized_element)

element = model.DatasetCollectionElement(
element=element,
@@ -78,7 +83,7 @@ def create(self, trans, parent, name, collection_type, element_identifiers=None,
dataset_collection.elements = elements
dataset_collection.element_count = len(elements)

return dataset_collection
return dataset_collection

def create(self, trans, parent, name, collection_type, element_identifiers=None,
elements=None, implicit_collection_info=None, trusted_identifiers=None,
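
A minimal sketch of the distinction the new allow_unitialized_element flag controls (object construction only; Galaxy's lib/ on sys.path is assumed, and no database session is needed just to build the objects):

    from galaxy import model

    # Default (and previous) behavior when the structure's children are unknown:
    # a shared placeholder meaning "element not initialized yet".
    placeholder = model.DatasetCollectionElement.UNINITIALIZED_ELEMENT

    # With allow_unitialized_element=False (the case when there is no implicit
    # output name) a real but unpopulated collection is built instead, so an HDCA
    # can be pre-created now and populated later by the job.
    collection = model.DatasetCollection(populated=False)
    collection.collection_type = "list"  # normally taken from structure.collection_type_description
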
6 changes: 6 additions & 0 deletions lib/galaxy/model/__init__.py
@@ -2035,6 +2035,12 @@ def set_file_name(self, filename):
return self.dataset.set_file_name(filename)
file_name = property(get_file_name, set_file_name)

def link_to(self, path):
self.file_name = os.path.abspath(path)
# Since we are not copying the file into Galaxy's managed
# default file location, the dataset should never be purgable.
self.dataset.purgable = False

@property
def extra_files_path(self):
return self.dataset.extra_files_path
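
For context, a minimal sketch of the new helper in use (the `data` variable and the path are illustrative only, not part of this diff):

    # Assumes an HDA/LDDA instance `data`, e.g. the one returned by
    # upload_common.new_upload in library.py above.
    data.link_to('/data/external/reads.fastq')
    assert data.dataset.purgable is False  # linked data is never purged by Galaxy
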
12 changes: 8 additions & 4 deletions lib/galaxy/tools/__init__.py
@@ -104,6 +104,7 @@
# Tools that require Galaxy's Python environment to be preserved.
GALAXY_LIB_TOOLS_UNVERSIONED = [
"upload1",
"__DATA_FETCH__",
# Legacy tools bundled with Galaxy.
"vcf_to_maf_customtrack1",
"laj_1",
@@ -1063,7 +1064,10 @@ def parse_input_elem(self, page_source, enctypes, context=None):
group.file_type_name = elem.get('file_type_name', group.file_type_name)
group.default_file_type = elem.get('default_file_type', group.default_file_type)
group.metadata_ref = elem.get('metadata_ref', group.metadata_ref)
rval[group.file_type_name].refresh_on_change = True
try:
rval[group.file_type_name].refresh_on_change = True
except KeyError:
pass
group_page_source = XmlPageSource(elem)
group.inputs = self.parse_input_elem(group_page_source, enctypes, context)
rval[group.name] = group
@@ -1614,10 +1618,10 @@ def collect_primary_datasets(self, output, tool_provided_metadata, job_working_d
"""
return output_collect.collect_primary_datasets(self, output, tool_provided_metadata, job_working_directory, input_ext, input_dbkey=input_dbkey)

def collect_dynamic_collections(self, output, tool_provided_metadata, **kwds):
""" Find files corresponding to dynamically structured collections.
def collect_dynamic_outputs(self, output, tool_provided_metadata, **kwds):
"""Collect dynamic outputs associated with a job from this tool.
"""
return output_collect.collect_dynamic_collections(self, output, tool_provided_metadata, **kwds)
return output_collect.collect_dynamic_outputs(self, output, tool_provided_metadata, **kwds)

def to_archive(self):
tool = self
130 changes: 121 additions & 9 deletions lib/galaxy/tools/actions/upload.py
@@ -1,15 +1,20 @@
import json
import logging
import os

from galaxy.dataset_collections.structure import UnitializedTree
from galaxy.exceptions import RequestParameterMissingException
from galaxy.tools.actions import upload_common
from galaxy.util import ExecutionTimer
from galaxy.util.bunch import Bunch
from . import ToolAction

log = logging.getLogger(__name__)


class UploadToolAction(ToolAction):
class BaseUploadToolAction(ToolAction):

def execute(self, tool, trans, incoming={}, set_output_hid=True, history=None, **kwargs):
def execute(self, tool, trans, incoming={}, history=None, **kwargs):
dataset_upload_inputs = []
for input_name, input in tool.inputs.items():
if input.type == "upload_dataset":
@@ -19,18 +24,125 @@ def execute(self, tool, trans, incoming={}, set_output_hid=True, history=None, *
persisting_uploads_timer = ExecutionTimer()
incoming = upload_common.persist_uploads(incoming, trans)
log.debug("Persisted uploads %s" % persisting_uploads_timer)
rval = self._setup_job(tool, trans, incoming, dataset_upload_inputs, history)
return rval

def _setup_job(self, tool, trans, incoming, dataset_upload_inputs, history):
"""Take persisted uploads and create a job for given tool."""

def _create_job(self, *args, **kwds):
"""Wrapper around upload_common.create_job with a timer."""
create_job_timer = ExecutionTimer()
rval = upload_common.create_job(*args, **kwds)
log.debug("Created upload job %s" % create_job_timer)
return rval


class UploadToolAction(BaseUploadToolAction):

def _setup_job(self, tool, trans, incoming, dataset_upload_inputs, history):
check_timer = ExecutionTimer()
# We can pass an empty string as the cntrller here since it is used to check whether we
# are in an admin view, and this tool is currently not used there.
uploaded_datasets = upload_common.get_uploaded_datasets(trans, '', incoming, dataset_upload_inputs, history=history)

if not uploaded_datasets:
return None, 'No data was entered in the upload form, please go back and choose data to upload.'

log.debug("Checked uploads %s" % check_timer)
create_job_timer = ExecutionTimer()
json_file_path = upload_common.create_paramfile(trans, uploaded_datasets)
data_list = [ud.data for ud in uploaded_datasets]
rval = upload_common.create_job(trans, incoming, tool, json_file_path, data_list, history=history)
log.debug("Created upload job %s" % create_job_timer)
return rval
log.debug("Checked uploads %s" % check_timer)
return self._create_job(
trans, incoming, tool, json_file_path, data_list, history=history
)


class FetchUploadToolAction(BaseUploadToolAction):

def _setup_job(self, tool, trans, incoming, dataset_upload_inputs, history):
# Replace references to uploaded files in the request with the persisted upload paths.
files = incoming.get("files", [])
files_iter = iter(files)
request = json.loads(incoming.get("request_json"))

def replace_file_srcs(request_part):
if isinstance(request_part, dict):
if request_part.get("src", None) == "files":
path_def = next(files_iter)
if path_def is None or path_def["file_data"] is None:
raise RequestParameterMissingException("Failed to find uploaded file matching target with src='files'")
request_part["path"] = path_def["file_data"]["local_filename"]
if "name" not in request_part:
request_part["name"] = path_def["file_data"]["filename"]
request_part["src"] = "path"
else:
for key, value in request_part.items():
replace_file_srcs(value)
elif isinstance(request_part, list):
for value in request_part:
replace_file_srcs(value)

replace_file_srcs(request)

outputs = []
for target in request.get("targets", []):
destination = target.get("destination")
destination_type = destination.get("type")
# Start by just pre-creating HDAs.
if destination_type == "hdas":
if target.get("elements_from"):
# Dynamic collection required I think.
continue
_precreate_fetched_hdas(trans, history, target, outputs)

if destination_type == "hdca":
_precreate_fetched_collection_instance(trans, history, target, outputs)

incoming["request_json"] = json.dumps(request)
return self._create_job(
trans, incoming, tool, None, outputs, history=history
)


def _precreate_fetched_hdas(trans, history, target, outputs):
for item in target.get("elements", []):
name = item.get("name", None)
if name is None:
src = item.get("src", None)
if src == "url":
url = item.get("url")
if name is None:
name = url.split("/")[-1]
elif src == "path":
path = item["path"]
if name is None:
name = os.path.basename(path)

file_type = item.get("ext", "auto")
dbkey = item.get("dbkey", "?")
uploaded_dataset = Bunch(
type='file', name=name, file_type=file_type, dbkey=dbkey
)
data = upload_common.new_upload(trans, '', uploaded_dataset, library_bunch=None, history=history)
outputs.append(data)
item["object_id"] = data.id


def _precreate_fetched_collection_instance(trans, history, target, outputs):
collection_type = target.get("collection_type")
if not collection_type:
# Can't precreate collections of unknown type at this time.
return

name = target.get("name")
if not name:
return

collections_service = trans.app.dataset_collections_service
collection_type_description = collections_service.collection_type_descriptions.for_collection_type(collection_type)
structure = UnitializedTree(collection_type_description)
hdca = collections_service.precreate_dataset_collection_instance(
trans, history, name, structure=structure
)
outputs.append(hdca)
# The following flush is needed so the new HDCA gets an ID.
trans.sa_session.flush()
target["destination"]["object_id"] = hdca.id